direct-io: Implement generic deferred AIO completions
[deliverable/linux.git] / fs / direct-io.c
index 7ab90f5081eebc4ab8b0de88bef8d0b6310ed113..8b31b9f449f4603f0a68336f3f716aab751a6231 100644 (file)
@@ -127,6 +127,7 @@ struct dio {
        spinlock_t bio_lock;            /* protects BIO fields below */
        int page_errors;                /* errno from get_user_pages() */
        int is_async;                   /* is IO async ? */
+       bool defer_completion;          /* defer AIO completion to workqueue? */
        int io_error;                   /* IO error in completion path */
        unsigned long refcount;         /* direct_io_worker() and bios */
        struct bio *bio_list;           /* singly linked via bi_private */
@@ -141,7 +142,10 @@ struct dio {
         * allocation time.  Don't add new fields after pages[] unless you
         * wish that they not be zeroed.
         */
-       struct page *pages[DIO_PAGES];  /* page buffer */
+       union {
+               struct page *pages[DIO_PAGES];  /* page buffer */
+               struct work_struct complete_work;/* deferred AIO completion */
+       };
 } ____cacheline_aligned_in_smp;
 
 static struct kmem_cache *dio_cache __read_mostly;
@@ -221,16 +225,16 @@ static inline struct page *dio_get_page(struct dio *dio,
  * dio_complete() - called when all DIO BIO I/O has been completed
  * @offset: the byte offset in the file of the completed operation
  *
- * This releases locks as dictated by the locking type, lets interested parties
- * know that a DIO operation has completed, and calculates the resulting return
- * code for the operation.
+ * This drops i_dio_count, lets interested parties know that a DIO operation
+ * has completed, and calculates the resulting return code for the operation.
  *
  * It lets the filesystem know if it registered an interest earlier via
  * get_block.  Pass the private field of the map buffer_head so that
  * filesystems can use it to hold additional state between get_block calls and
  * dio_complete.
  */
-static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async)
+static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret,
+               bool is_async)
 {
        ssize_t transferred = 0;
 
@@ -258,19 +262,26 @@ static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is
        if (ret == 0)
                ret = transferred;
 
-       if (dio->end_io && dio->result) {
-               dio->end_io(dio->iocb, offset, transferred,
-                           dio->private, ret, is_async);
-       } else {
-               inode_dio_done(dio->inode);
-               if (is_async)
-                       aio_complete(dio->iocb, ret, 0);
-       }
+       if (dio->end_io && dio->result)
+               dio->end_io(dio->iocb, offset, transferred, dio->private);
+
+       inode_dio_done(dio->inode);
+       if (is_async)
+               aio_complete(dio->iocb, ret, 0);
 
+       kmem_cache_free(dio_cache, dio);
        return ret;
 }
 
+static void dio_aio_complete_work(struct work_struct *work)
+{
+       struct dio *dio = container_of(work, struct dio, complete_work);
+
+       dio_complete(dio, dio->iocb->ki_pos, 0, true);
+}
+
 static int dio_bio_complete(struct dio *dio, struct bio *bio);
+
 /*
  * Asynchronous IO callback. 
  */
@@ -290,8 +301,13 @@ static void dio_bio_end_aio(struct bio *bio, int error)
        spin_unlock_irqrestore(&dio->bio_lock, flags);
 
        if (remaining == 0) {
-               dio_complete(dio, dio->iocb->ki_pos, 0, true);
-               kmem_cache_free(dio_cache, dio);
+               if (dio->result && dio->defer_completion) {
+                       INIT_WORK(&dio->complete_work, dio_aio_complete_work);
+                       queue_work(dio->inode->i_sb->s_dio_done_wq,
+                                  &dio->complete_work);
+               } else {
+                       dio_complete(dio, dio->iocb->ki_pos, 0, true);
+               }
        }
 }
 
@@ -510,6 +526,41 @@ static inline int dio_bio_reap(struct dio *dio, struct dio_submit *sdio)
        return ret;
 }
 
+/*
+ * Create workqueue for deferred direct IO completions. We allocate the
+ * workqueue when it's first needed. This avoids creating workqueue for
+ * filesystems that don't need it and also allows us to create the workqueue
+ * late enough so the we can include s_id in the name of the workqueue.
+ */
+static int sb_init_dio_done_wq(struct super_block *sb)
+{
+       struct workqueue_struct *wq = alloc_workqueue("dio/%s",
+                                                     WQ_MEM_RECLAIM, 0,
+                                                     sb->s_id);
+       if (!wq)
+               return -ENOMEM;
+       /*
+        * This has to be atomic as more DIOs can race to create the workqueue
+        */
+       cmpxchg(&sb->s_dio_done_wq, NULL, wq);
+       /* Someone created workqueue before us? Free ours... */
+       if (wq != sb->s_dio_done_wq)
+               destroy_workqueue(wq);
+       return 0;
+}
+
+static int dio_set_defer_completion(struct dio *dio)
+{
+       struct super_block *sb = dio->inode->i_sb;
+
+       if (dio->defer_completion)
+               return 0;
+       dio->defer_completion = true;
+       if (!sb->s_dio_done_wq)
+               return sb_init_dio_done_wq(sb);
+       return 0;
+}
+
 /*
  * Call into the fs to map some more disk blocks.  We record the current number
  * of available blocks at sdio->blocks_available.  These are in units of the
@@ -581,6 +632,9 @@ static int get_more_blocks(struct dio *dio, struct dio_submit *sdio,
 
                /* Store for completion */
                dio->private = map_bh->b_private;
+
+               if (ret == 0 && buffer_defer_completion(map_bh))
+                       ret = dio_set_defer_completion(dio);
        }
        return ret;
 }
@@ -1269,7 +1323,6 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
 
        if (drop_refcount(dio) == 0) {
                retval = dio_complete(dio, offset, retval, false);
-               kmem_cache_free(dio_cache, dio);
        } else
                BUG_ON(retval != -EIOCBQUEUED);
 
This page took 0.03133 seconds and 5 git commands to generate.