NFS: rewrite directio read to use async coalesce code
[deliverable/linux.git] / fs / nfs / direct.c
index 22a40c4084494d2b03e9daca0f1b4c53183b7da7..4ba9a2c839bbf62d707fc06d758dd5d9f1ca7d74 100644 (file)
@@ -124,22 +124,6 @@ ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_
        return -EINVAL;
 }
 
-static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
-{
-       unsigned int npages;
-       unsigned int i;
-
-       if (count == 0)
-               return;
-       pages += (pgbase >> PAGE_SHIFT);
-       npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
-       for (i = 0; i < npages; i++) {
-               struct page *page = pages[i];
-               if (!PageCompound(page))
-                       set_page_dirty(page);
-       }
-}
-
 static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
 {
        unsigned int i;
@@ -226,58 +210,92 @@ static void nfs_direct_complete(struct nfs_direct_req *dreq)
        nfs_direct_req_release(dreq);
 }
 
-/*
- * We must hold a reference to all the pages in this direct read request
- * until the RPCs complete.  This could be long *after* we are woken up in
- * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
- */
-static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
+void nfs_direct_readpage_release(struct nfs_page *req)
 {
-       struct nfs_read_data *data = calldata;
-
-       nfs_readpage_result(task, data);
+       dprintk("NFS: direct read done (%s/%lld %d@%lld)\n",
+               req->wb_context->dentry->d_inode->i_sb->s_id,
+               (long long)NFS_FILEID(req->wb_context->dentry->d_inode),
+               req->wb_bytes,
+               (long long)req_offset(req));
+       nfs_release_request(req);
 }
 
-static void nfs_direct_read_release(void *calldata)
+static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
 {
+       unsigned long bytes = 0;
+       struct nfs_direct_req *dreq = hdr->dreq;
 
-       struct nfs_read_data *data = calldata;
-       struct nfs_direct_req *dreq = (struct nfs_direct_req *)data->header->req;
-       int status = data->task.tk_status;
+       if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
+               goto out_put;
 
        spin_lock(&dreq->lock);
-       if (unlikely(status < 0)) {
-               dreq->error = status;
-               spin_unlock(&dreq->lock);
+       if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && (hdr->good_bytes == 0))
+               dreq->error = hdr->error;
+       else
+               dreq->count += hdr->good_bytes;
+       spin_unlock(&dreq->lock);
+
+       if (!test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
+               while (!list_empty(&hdr->pages)) {
+                       struct nfs_page *req = nfs_list_entry(hdr->pages.next);
+                       struct page *page = req->wb_page;
+
+                       if (test_bit(NFS_IOHDR_EOF, &hdr->flags)) {
+                               if (bytes > hdr->good_bytes)
+                                       zero_user(page, 0, PAGE_SIZE);
+                               else if (hdr->good_bytes - bytes < PAGE_SIZE)
+                                       zero_user_segment(page,
+                                               hdr->good_bytes & ~PAGE_MASK,
+                                               PAGE_SIZE);
+                       }
+                       bytes += req->wb_bytes;
+                       nfs_list_remove_request(req);
+                       nfs_direct_readpage_release(req);
+                       if (!PageCompound(page))
+                               set_page_dirty(page);
+                       page_cache_release(page);
+               }
        } else {
-               dreq->count += data->res.count;
-               spin_unlock(&dreq->lock);
-               nfs_direct_dirty_pages(data->pages.pagevec,
-                               data->args.pgbase,
-                               data->res.count);
+               while (!list_empty(&hdr->pages)) {
+                       struct nfs_page *req = nfs_list_entry(hdr->pages.next);
+
+                       if (bytes < hdr->good_bytes)
+                               if (!PageCompound(req->wb_page))
+                                       set_page_dirty(req->wb_page);
+                       bytes += req->wb_bytes;
+                       page_cache_release(req->wb_page);
+                       nfs_list_remove_request(req);
+                       nfs_direct_readpage_release(req);
+               }
        }
-       nfs_direct_release_pages(data->pages.pagevec, data->pages.npages);
-
+out_put:
        if (put_dreq(dreq))
                nfs_direct_complete(dreq);
-       nfs_readdata_release(data);
+       hdr->release(hdr);
 }
 
-static const struct rpc_call_ops nfs_read_direct_ops = {
-       .rpc_call_prepare = nfs_read_prepare,
-       .rpc_call_done = nfs_direct_read_result,
-       .rpc_release = nfs_direct_read_release,
-};
-
-static void nfs_direct_readhdr_release(struct nfs_read_header *rhdr)
+static void nfs_sync_pgio_error(struct list_head *head)
 {
-       struct nfs_read_data *data = &rhdr->rpc_data;
+       struct nfs_page *req;
 
-       if (data->pages.pagevec != data->pages.page_array)
-               kfree(data->pages.pagevec);
-       nfs_readhdr_free(&rhdr->header);
+       while (!list_empty(head)) {
+               req = nfs_list_entry(head->next);
+               nfs_list_remove_request(req);
+               nfs_release_request(req);
+       }
 }
 
+static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
+{
+       get_dreq(hdr->dreq);
+}
+
+static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
+       .error_cleanup = nfs_sync_pgio_error,
+       .init_hdr = nfs_direct_pgio_init,
+       .completion = nfs_direct_read_completion,
+};
+
 /*
  * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
  * operation.  If nfs_readdata_alloc() or get_user_pages() fails,
@@ -285,118 +303,85 @@ static void nfs_direct_readhdr_release(struct nfs_read_header *rhdr)
  * handled automatically by nfs_direct_read_result().  Otherwise, if
  * no requests have been sent, just return an error.
  */
-static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
+static ssize_t nfs_direct_read_schedule_segment(struct nfs_pageio_descriptor *desc,
                                                const struct iovec *iov,
                                                loff_t pos)
 {
+       struct nfs_direct_req *dreq = desc->pg_dreq;
        struct nfs_open_context *ctx = dreq->ctx;
        struct inode *inode = ctx->dentry->d_inode;
        unsigned long user_addr = (unsigned long)iov->iov_base;
        size_t count = iov->iov_len;
        size_t rsize = NFS_SERVER(inode)->rsize;
-       struct rpc_task *task;
-       struct rpc_message msg = {
-               .rpc_cred = ctx->cred,
-       };
-       struct rpc_task_setup task_setup_data = {
-               .rpc_client = NFS_CLIENT(inode),
-               .rpc_message = &msg,
-               .callback_ops = &nfs_read_direct_ops,
-               .workqueue = nfsiod_workqueue,
-               .flags = RPC_TASK_ASYNC,
-       };
        unsigned int pgbase;
        int result;
        ssize_t started = 0;
+       struct page **pagevec = NULL;
+       unsigned int npages;
 
        do {
-               struct nfs_read_header *rhdr;
-               struct nfs_read_data *data;
-               struct nfs_page_array *pages;
                size_t bytes;
+               int i;
 
                pgbase = user_addr & ~PAGE_MASK;
-               bytes = min(rsize,count);
+               bytes = min(max(rsize, PAGE_SIZE), count);
 
                result = -ENOMEM;
-               rhdr = nfs_readhdr_alloc();
-               if (unlikely(!rhdr))
-                       break;
-               data = nfs_readdata_alloc(&rhdr->header, nfs_page_array_len(pgbase, bytes));
-               if (!data) {
-                       nfs_readhdr_free(&rhdr->header);
+               npages = nfs_page_array_len(pgbase, bytes);
+               if (!pagevec)
+                       pagevec = kmalloc(npages * sizeof(struct page *),
+                                         GFP_KERNEL);
+               if (!pagevec)
                        break;
-               }
-               data->header = &rhdr->header;
-               atomic_inc(&data->header->refcnt);
-               pages = &data->pages;
-
                down_read(&current->mm->mmap_sem);
                result = get_user_pages(current, current->mm, user_addr,
-                                       pages->npages, 1, 0, pages->pagevec, NULL);
+                                       npages, 1, 0, pagevec, NULL);
                up_read(&current->mm->mmap_sem);
-               if (result < 0) {
-                       nfs_direct_readhdr_release(rhdr);
+               if (result < 0)
                        break;
-               }
-               if ((unsigned)result < pages->npages) {
+               if ((unsigned)result < npages) {
                        bytes = result * PAGE_SIZE;
                        if (bytes <= pgbase) {
-                               nfs_direct_release_pages(pages->pagevec, result);
-                               nfs_direct_readhdr_release(rhdr);
+                               nfs_direct_release_pages(pagevec, result);
                                break;
                        }
                        bytes -= pgbase;
-                       pages->npages = result;
+                       npages = result;
                }
 
-               get_dreq(dreq);
-
-               rhdr->header.req = (struct nfs_page *) dreq;
-               rhdr->header.inode = inode;
-               rhdr->header.cred = msg.rpc_cred;
-               data->args.fh = NFS_FH(inode);
-               data->args.context = get_nfs_open_context(ctx);
-               data->args.lock_context = dreq->l_ctx;
-               data->args.offset = pos;
-               data->args.pgbase = pgbase;
-               data->args.pages = pages->pagevec;
-               data->args.count = bytes;
-               data->res.fattr = &data->fattr;
-               data->res.eof = 0;
-               data->res.count = bytes;
-               nfs_fattr_init(&data->fattr);
-               msg.rpc_argp = &data->args;
-               msg.rpc_resp = &data->res;
-
-               task_setup_data.task = &data->task;
-               task_setup_data.callback_data = data;
-               NFS_PROTO(inode)->read_setup(data, &msg);
-
-               task = rpc_run_task(&task_setup_data);
-               if (IS_ERR(task))
-                       break;
-
-               dprintk("NFS: %5u initiated direct read call "
-                       "(req %s/%Ld, %zu bytes @ offset %Lu)\n",
-                               task->tk_pid,
-                               inode->i_sb->s_id,
-                               (long long)NFS_FILEID(inode),
-                               bytes,
-                               (unsigned long long)data->args.offset);
-               rpc_put_task(task);
-
-               started += bytes;
-               user_addr += bytes;
-               pos += bytes;
-               /* FIXME: Remove this unnecessary math from final patch */
-               pgbase += bytes;
-               pgbase &= ~PAGE_MASK;
-               BUG_ON(pgbase != (user_addr & ~PAGE_MASK));
-
-               count -= bytes;
+               for (i = 0; i < npages; i++) {
+                       struct nfs_page *req;
+                       unsigned int req_len = min(bytes, PAGE_SIZE - pgbase);
+                       /* XXX do we need to do the eof zeroing found in async_filler? */
+                       req = nfs_create_request(dreq->ctx, dreq->inode,
+                                                pagevec[i],
+                                                pgbase, req_len);
+                       if (IS_ERR(req)) {
+                               nfs_direct_release_pages(pagevec + i,
+                                                        npages - i);
+                               result = PTR_ERR(req);
+                               break;
+                       }
+                       req->wb_index = pos >> PAGE_SHIFT;
+                       req->wb_offset = pos & ~PAGE_MASK;
+                       if (!nfs_pageio_add_request(desc, req)) {
+                               result = desc->pg_error;
+                               nfs_release_request(req);
+                               nfs_direct_release_pages(pagevec + i,
+                                                        npages - i);
+                               break;
+                       }
+                       pgbase = 0;
+                       bytes -= req_len;
+                       started += req_len;
+                       user_addr += req_len;
+                       pos += req_len;
+                       count -= req_len;
+               }
        } while (count != 0);
 
+       kfree(pagevec);
+
        if (started)
                return started;
        return result < 0 ? (ssize_t) result : -EFAULT;
@@ -407,15 +392,19 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
                                              unsigned long nr_segs,
                                              loff_t pos)
 {
+       struct nfs_pageio_descriptor desc;
        ssize_t result = -EINVAL;
        size_t requested_bytes = 0;
        unsigned long seg;
 
+       nfs_pageio_init_read(&desc, dreq->inode,
+                            &nfs_direct_read_completion_ops);
        get_dreq(dreq);
+       desc.pg_dreq = dreq;
 
        for (seg = 0; seg < nr_segs; seg++) {
                const struct iovec *vec = &iov[seg];
-               result = nfs_direct_read_schedule_segment(dreq, vec, pos);
+               result = nfs_direct_read_schedule_segment(&desc, vec, pos);
                if (result < 0)
                        break;
                requested_bytes += result;
@@ -424,6 +413,8 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
                pos += vec->iov_len;
        }
 
+       nfs_pageio_complete(&desc);
+
        /*
         * If no bytes were started, return the error, and let the
         * generic layer handle the completion.
This page took 0.04415 seconds and 5 git commands to generate.