staging/lustre/clio: optimize read ahead code
authorJinshan Xiong <jinshan.xiong@intel.com>
Wed, 30 Mar 2016 23:48:33 +0000 (19:48 -0400)
committerGreg Kroah-Hartman <gregkh@linuxfoundation.org>
Thu, 31 Mar 2016 04:38:13 +0000 (21:38 -0700)
It used to check each page in the readahead window is covered by
a lock underneath, now cpo_page_is_under_lock() provides @max_index
to help decide the maximum ra window. @max_index can be modified by
OSC to extend the maximum lock region, to align stripe boundary at
LOV, and to make sure the readahead region at least covers read
region at LLITE layer.

After this is done, usually readahead code calls
cpo_page_is_under_lock() for each stripe.

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Reviewed-on: http://review.whamcloud.com/8523
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-3321
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Signed-off-by: Oleg Drokin <green@linuxhacker.ru>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
18 files changed:
drivers/staging/lustre/lustre/include/cl_object.h
drivers/staging/lustre/lustre/include/lclient.h
drivers/staging/lustre/lustre/llite/lcommon_cl.c
drivers/staging/lustre/lustre/llite/llite_internal.h
drivers/staging/lustre/lustre/llite/lproc_llite.c
drivers/staging/lustre/lustre/llite/rw.c
drivers/staging/lustre/lustre/llite/vvp_io.c
drivers/staging/lustre/lustre/llite/vvp_page.c
drivers/staging/lustre/lustre/lov/lov_cl_internal.h
drivers/staging/lustre/lustre/lov/lov_internal.h
drivers/staging/lustre/lustre/lov/lov_io.c
drivers/staging/lustre/lustre/lov/lov_offset.c
drivers/staging/lustre/lustre/lov/lov_page.c
drivers/staging/lustre/lustre/lov/lovsub_page.c
drivers/staging/lustre/lustre/obdclass/cl_io.c
drivers/staging/lustre/lustre/obdclass/cl_page.c
drivers/staging/lustre/lustre/obdecho/echo_client.c
drivers/staging/lustre/lustre/osc/osc_page.c

index 5b65854834f58899226b7c60cd471a283b42858e..69b40f50df7017a9638cff2470514debd854661c 100644 (file)
@@ -935,7 +935,7 @@ struct cl_page_operations {
         */
        int (*cpo_is_under_lock)(const struct lu_env *env,
                                 const struct cl_page_slice *slice,
-                                struct cl_io *io);
+                                struct cl_io *io, pgoff_t *max);
 
        /**
         * Optional debugging helper. Prints given page slice.
@@ -2674,7 +2674,7 @@ static inline void cl_device_fini(struct cl_device *d)
 }
 
 void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
-                      struct cl_object *obj,
+                      struct cl_object *obj, pgoff_t index,
                       const struct cl_page_operations *ops);
 void cl_lock_slice_add(struct cl_lock *lock, struct cl_lock_slice *slice,
                       struct cl_object *obj,
@@ -2826,7 +2826,7 @@ void cl_page_delete(const struct lu_env *env, struct cl_page *pg);
 int cl_page_is_vmlocked(const struct lu_env *env, const struct cl_page *pg);
 void cl_page_export(const struct lu_env *env, struct cl_page *pg, int uptodate);
 int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
-                         struct cl_page *page);
+                         struct cl_page *page, pgoff_t *max_index);
 loff_t cl_offset(const struct cl_object *obj, pgoff_t idx);
 pgoff_t cl_index(const struct cl_object *obj, loff_t offset);
 int cl_page_size(const struct cl_object *obj);
index c91fb0151a52221c8f2897b52e3c25df1c5454b7..a8c8788ebc07a5501c2d3c34a464a6a75d7b229b 100644 (file)
@@ -299,8 +299,6 @@ int ccc_lock_init(const struct lu_env *env, struct cl_object *obj,
                  const struct cl_lock_operations *lkops);
 int ccc_object_glimpse(const struct lu_env *env,
                       const struct cl_object *obj, struct ost_lvb *lvb);
-int ccc_page_is_under_lock(const struct lu_env *env,
-                          const struct cl_page_slice *slice, struct cl_io *io);
 int ccc_fail(const struct lu_env *env, const struct cl_page_slice *slice);
 int ccc_transient_page_prep(const struct lu_env *env,
                            const struct cl_page_slice *slice,
index 55fa0da80b815cf88857e130eb2ae1a1aa7860ec..e34d8323944d0ccdfd43a0c41f050acf193ade10 100644 (file)
@@ -452,34 +452,6 @@ static void ccc_object_size_unlock(struct cl_object *obj)
  *
  */
 
-int ccc_page_is_under_lock(const struct lu_env *env,
-                          const struct cl_page_slice *slice,
-                          struct cl_io *io)
-{
-       struct ccc_io   *cio  = ccc_env_io(env);
-       struct cl_lock_descr *desc = &ccc_env_info(env)->cti_descr;
-       struct cl_page       *page = slice->cpl_page;
-
-       int result;
-
-       if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
-           io->ci_type == CIT_FAULT) {
-               if (cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED) {
-                       result = -EBUSY;
-               } else {
-                       desc->cld_start = ccc_index(cl2ccc_page(slice));
-                       desc->cld_end   = ccc_index(cl2ccc_page(slice));
-                       desc->cld_obj   = page->cp_obj;
-                       desc->cld_mode  = CLM_READ;
-                       result = cl_queue_match(&io->ci_lockset.cls_done,
-                                               desc) ? -EBUSY : 0;
-               }
-       } else {
-               result = 0;
-       }
-       return result;
-}
-
 int ccc_fail(const struct lu_env *env, const struct cl_page_slice *slice)
 {
        /*
index bc831472b3ed29eeef1967bd63c9bf289655f556..cd691734f2429ba9291b507d4c11fc0a1079514a 100644 (file)
@@ -328,6 +328,7 @@ enum ra_stat {
        RA_STAT_EOF,
        RA_STAT_MAX_IN_FLIGHT,
        RA_STAT_WRONG_GRAB_PAGE,
+       RA_STAT_FAILED_REACH_END,
        _NR_RA_STAT,
 };
 
@@ -702,8 +703,8 @@ int ll_writepages(struct address_space *, struct writeback_control *wbc);
 int ll_readpage(struct file *file, struct page *page);
 void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras);
 int ll_readahead(const struct lu_env *env, struct cl_io *io,
-                struct ll_readahead_state *ras, struct address_space *mapping,
-                struct cl_page_list *queue, int flags);
+                struct cl_page_list *queue, struct ll_readahead_state *ras,
+                bool hit);
 int vvp_io_write_commit(const struct lu_env *env, struct cl_io *io);
 struct ll_cl_context *ll_cl_init(struct file *file, struct page *vmpage);
 void ll_cl_fini(struct ll_cl_context *lcc);
@@ -1074,7 +1075,7 @@ void ras_update(struct ll_sb_info *sbi, struct inode *inode,
                struct ll_readahead_state *ras, unsigned long index,
                unsigned hit);
 void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len);
-void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which);
+void ll_ra_stats_inc(struct inode *inode, enum ra_stat which);
 
 /* llite/llite_rmtacl.c */
 #ifdef CONFIG_FS_POSIX_ACL
index 9e8e61a730b7d28f05eb62f058c1d49376c39298..091144fa97dd32d2eaaa2719d1d4064a2988dbd3 100644 (file)
@@ -960,6 +960,7 @@ static const char *ra_stat_string[] = {
        [RA_STAT_EOF] = "read-ahead to EOF",
        [RA_STAT_MAX_IN_FLIGHT] = "hit max r-a issue",
        [RA_STAT_WRONG_GRAB_PAGE] = "wrong page from grab_cache_page",
+       [RA_STAT_FAILED_REACH_END] = "failed to reach end"
 };
 
 int ldebugfs_register_mountpoint(struct dentry *parent,
index b1375f1719e79a7640fc825f4ac05f061af65c49..ad15058c2ddb8e14bd99527ef6afadf327a26028 100644 (file)
@@ -166,7 +166,7 @@ static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which);
  */
 static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
                                     struct ra_io_arg *ria,
-                                    unsigned long pages)
+                                    unsigned long pages, unsigned long min)
 {
        struct ll_ra_info *ra = &sbi->ll_ra_info;
        long ret;
@@ -206,6 +206,11 @@ static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
        }
 
 out:
+       if (ret < min) {
+               /* override ra limit for maximum performance */
+               atomic_add(min - ret, &ra->ra_cur_pages);
+               ret = min;
+       }
        return ret;
 }
 
@@ -222,9 +227,9 @@ static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
        lprocfs_counter_incr(sbi->ll_ra_stats, which);
 }
 
-void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which)
+void ll_ra_stats_inc(struct inode *inode, enum ra_stat which)
 {
-       struct ll_sb_info *sbi = ll_i2sbi(mapping->host);
+       struct ll_sb_info *sbi = ll_i2sbi(inode);
 
        ll_ra_stats_inc_sbi(sbi, which);
 }
@@ -290,7 +295,7 @@ void ll_ra_read_ex(struct file *f, struct ll_ra_read *rar)
 
 static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
                              struct cl_page_list *queue, struct cl_page *page,
-                             struct cl_object *clob)
+                             struct cl_object *clob, pgoff_t *max_index)
 {
        struct page *vmpage = page->cp_vmpage;
        struct ccc_page *cp;
@@ -301,8 +306,11 @@ static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
        lu_ref_add(&page->cp_reference, "ra", current);
        cp = cl2ccc_page(cl_object_page_slice(clob, page));
        if (!cp->cpg_defer_uptodate && !PageUptodate(vmpage)) {
-               rc = cl_page_is_under_lock(env, io, page);
-               if (rc == -EBUSY) {
+               CDEBUG(D_READA, "page index %lu, max_index: %lu\n",
+                      ccc_index(cp), *max_index);
+               if (*max_index == 0 || ccc_index(cp) > *max_index)
+                       rc = cl_page_is_under_lock(env, io, page, max_index);
+               if (rc == 0) {
                        cp->cpg_defer_uptodate = 1;
                        cp->cpg_ra_used = 0;
                        cl_page_list_add(queue, page);
@@ -332,24 +340,25 @@ static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
  */
 static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
                              struct cl_page_list *queue,
-                             pgoff_t index, struct address_space *mapping)
+                             pgoff_t index, pgoff_t *max_index)
 {
+       struct cl_object *clob  = io->ci_obj;
+       struct inode     *inode = ccc_object_inode(clob);
        struct page      *vmpage;
-       struct cl_object *clob  = ll_i2info(mapping->host)->lli_clob;
        struct cl_page   *page;
        enum ra_stat      which = _NR_RA_STAT; /* keep gcc happy */
        int            rc    = 0;
        const char       *msg   = NULL;
 
-       vmpage = grab_cache_page_nowait(mapping, index);
+       vmpage = grab_cache_page_nowait(inode->i_mapping, index);
        if (vmpage) {
                /* Check if vmpage was truncated or reclaimed */
-               if (vmpage->mapping == mapping) {
+               if (vmpage->mapping == inode->i_mapping) {
                        page = cl_page_find(env, clob, vmpage->index,
                                            vmpage, CPT_CACHEABLE);
                        if (!IS_ERR(page)) {
                                rc = cl_read_ahead_page(env, io, queue,
-                                                       page, clob);
+                                                       page, clob, max_index);
                                if (rc == -ENOLCK) {
                                        which = RA_STAT_FAILED_MATCH;
                                        msg   = "lock match failed";
@@ -370,7 +379,7 @@ static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
                msg   = "g_c_p_n failed";
        }
        if (msg) {
-               ll_ra_stats_inc(mapping, which);
+               ll_ra_stats_inc(inode, which);
                CDEBUG(D_READA, "%s\n", msg);
        }
        return rc;
@@ -482,11 +491,12 @@ static int ll_read_ahead_pages(const struct lu_env *env,
                               struct cl_io *io, struct cl_page_list *queue,
                               struct ra_io_arg *ria,
                               unsigned long *reserved_pages,
-                              struct address_space *mapping,
                               unsigned long *ra_end)
 {
-       int rc, count = 0, stride_ria;
-       unsigned long page_idx;
+       int rc, count = 0;
+       bool stride_ria;
+       pgoff_t page_idx;
+       pgoff_t max_index = 0;
 
        LASSERT(ria);
        RIA_DEBUG(ria);
@@ -497,7 +507,7 @@ static int ll_read_ahead_pages(const struct lu_env *env,
                if (ras_inside_ra_window(page_idx, ria)) {
                        /* If the page is inside the read-ahead window*/
                        rc = ll_read_ahead_page(env, io, queue,
-                                               page_idx, mapping);
+                                               page_idx, &max_index);
                        if (rc == 1) {
                                (*reserved_pages)--;
                                count++;
@@ -532,25 +542,23 @@ static int ll_read_ahead_pages(const struct lu_env *env,
 }
 
 int ll_readahead(const struct lu_env *env, struct cl_io *io,
-                struct ll_readahead_state *ras, struct address_space *mapping,
-                struct cl_page_list *queue, int flags)
+                struct cl_page_list *queue, struct ll_readahead_state *ras,
+                bool hit)
 {
        struct vvp_io *vio = vvp_env_io(env);
        struct vvp_thread_info *vti = vvp_env_info(env);
        struct cl_attr *attr = ccc_env_thread_attr(env);
        unsigned long start = 0, end = 0, reserved;
-       unsigned long ra_end, len;
+       unsigned long ra_end, len, mlen = 0;
        struct inode *inode;
        struct ll_ra_read *bead;
        struct ra_io_arg *ria = &vti->vti_ria;
-       struct ll_inode_info *lli;
        struct cl_object *clob;
        int ret = 0;
        __u64 kms;
 
-       inode = mapping->host;
-       lli = ll_i2info(inode);
-       clob = lli->lli_clob;
+       clob = io->ci_obj;
+       inode = ccc_object_inode(clob);
 
        memset(ria, 0, sizeof(*ria));
 
@@ -562,7 +570,7 @@ int ll_readahead(const struct lu_env *env, struct cl_io *io,
                return ret;
        kms = attr->cat_kms;
        if (kms == 0) {
-               ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN);
+               ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN);
                return 0;
        }
 
@@ -621,29 +629,48 @@ int ll_readahead(const struct lu_env *env, struct cl_io *io,
        spin_unlock(&ras->ras_lock);
 
        if (end == 0) {
-               ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
+               ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
                return 0;
        }
        len = ria_page_count(ria);
-       if (len == 0)
+       if (len == 0) {
+               ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
                return 0;
+       }
+
+       CDEBUG(D_READA, DFID ": ria: %lu/%lu, bead: %lu/%lu, hit: %d\n",
+              PFID(lu_object_fid(&clob->co_lu)),
+              ria->ria_start, ria->ria_end,
+              !bead ? 0 : bead->lrr_start,
+              !bead ? 0 : bead->lrr_count,
+              hit);
+
+       /* at least to extend the readahead window to cover current read */
+       if (!hit && bead &&
+           bead->lrr_start + bead->lrr_count > ria->ria_start) {
+               /* to the end of current read window. */
+               mlen = bead->lrr_start + bead->lrr_count - ria->ria_start;
+               /* trim to RPC boundary */
+               start = ria->ria_start & (PTLRPC_MAX_BRW_PAGES - 1);
+               mlen = min(mlen, PTLRPC_MAX_BRW_PAGES - start);
+       }
 
-       reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len);
+       reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len, mlen);
        if (reserved < len)
-               ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);
+               ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
 
-       CDEBUG(D_READA, "reserved page %lu ra_cur %d ra_max %lu\n", reserved,
+       CDEBUG(D_READA, "reserved pages %lu/%lu/%lu, ra_cur %d, ra_max %lu\n",
+              reserved, len, mlen,
               atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
               ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
 
-       ret = ll_read_ahead_pages(env, io, queue,
-                                 ria, &reserved, mapping, &ra_end);
+       ret = ll_read_ahead_pages(env, io, queue, ria, &reserved, &ra_end);
 
        if (reserved != 0)
                ll_ra_count_put(ll_i2sbi(inode), reserved);
 
        if (ra_end == end + 1 && ra_end == (kms >> PAGE_CACHE_SHIFT))
-               ll_ra_stats_inc(mapping, RA_STAT_EOF);
+               ll_ra_stats_inc(inode, RA_STAT_EOF);
 
        /* if we didn't get to the end of the region we reserved from
         * the ras we need to go back and update the ras so that the
@@ -655,6 +682,7 @@ int ll_readahead(const struct lu_env *env, struct cl_io *io,
               ra_end, end, ria->ria_end);
 
        if (ra_end != end + 1) {
+               ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
                spin_lock(&ras->ras_lock);
                if (ra_end < ras->ras_next_readahead &&
                    index_in_window(ra_end, ras->ras_window_start, 0,
@@ -925,15 +953,18 @@ void ras_update(struct ll_sb_info *sbi, struct inode *inode,
        ras->ras_last_readpage = index;
        ras_set_start(inode, ras, index);
 
-       if (stride_io_mode(ras))
+       if (stride_io_mode(ras)) {
                /* Since stride readahead is sensitive to the offset
                 * of read-ahead, so we use original offset here,
                 * instead of ras_window_start, which is RPC aligned
                 */
                ras->ras_next_readahead = max(index, ras->ras_next_readahead);
-       else
-               ras->ras_next_readahead = max(ras->ras_window_start,
-                                             ras->ras_next_readahead);
+       } else {
+               if (ras->ras_next_readahead < ras->ras_window_start)
+                       ras->ras_next_readahead = ras->ras_window_start;
+               if (!hit)
+                       ras->ras_next_readahead = index + 1;
+       }
        RAS_CDEBUG(ras);
 
        /* Trigger RA in the mmap case where ras_consecutive_requests
index ac9d615b78d90e6eb3a92209bdc1a72732c0e408..18127d3dc493a49eba2b047548f8c75669324fee 100644 (file)
@@ -1052,35 +1052,19 @@ static int vvp_io_read_page(const struct lu_env *env,
                            const struct cl_page_slice *slice)
 {
        struct cl_io          *io     = ios->cis_io;
-       struct cl_object          *obj    = slice->cpl_obj;
        struct ccc_page    *cp     = cl2ccc_page(slice);
        struct cl_page      *page   = slice->cpl_page;
-       struct inode          *inode  = ccc_object_inode(obj);
+       struct inode          *inode  = ccc_object_inode(slice->cpl_obj);
        struct ll_sb_info        *sbi    = ll_i2sbi(inode);
        struct ll_file_data       *fd     = cl2ccc_io(env, ios)->cui_fd;
        struct ll_readahead_state *ras    = &fd->fd_ras;
-       struct page             *vmpage = cp->cpg_page;
        struct cl_2queue          *queue  = &io->ci_queue;
-       int rc;
-
-       CLOBINVRNT(env, obj, ccc_object_invariant(obj));
-       LASSERT(slice->cpl_obj == obj);
 
        if (sbi->ll_ra_info.ra_max_pages_per_file &&
            sbi->ll_ra_info.ra_max_pages)
                ras_update(sbi, inode, ras, ccc_index(cp),
                           cp->cpg_defer_uptodate);
 
-       /* Sanity check whether the page is protected by a lock. */
-       rc = cl_page_is_under_lock(env, io, page);
-       if (rc != -EBUSY) {
-               CL_PAGE_HEADER(D_WARNING, env, page, "%s: %d\n",
-                              rc == -ENODATA ? "without a lock" :
-                              "match failed", rc);
-               if (rc != -ENODATA)
-                       return rc;
-       }
-
        if (cp->cpg_defer_uptodate) {
                cp->cpg_ra_used = 1;
                cl_page_export(env, page, 1);
@@ -1089,11 +1073,12 @@ static int vvp_io_read_page(const struct lu_env *env,
         * Add page into the queue even when it is marked uptodate above.
         * this will unlock it automatically as part of cl_page_list_disown().
         */
+
        cl_page_list_add(&queue->c2_qin, page);
        if (sbi->ll_ra_info.ra_max_pages_per_file &&
            sbi->ll_ra_info.ra_max_pages)
-               ll_readahead(env, io, ras,
-                            vmpage->mapping, &queue->c2_qin, fd->fd_flags);
+               ll_readahead(env, io, &queue->c2_qin, ras,
+                            cp->cpg_defer_uptodate);
 
        return 0;
 }
index d9f13c31091a79fb4e95c34676cbb55cff6b9d52..3c6b72398d626961ad511e5aee8aba047cf8760a 100644 (file)
@@ -142,7 +142,7 @@ static void vvp_page_discard(const struct lu_env *env,
        LASSERT(PageLocked(vmpage));
 
        if (cpg->cpg_defer_uptodate && !cpg->cpg_ra_used)
-               ll_ra_stats_inc(vmpage->mapping, RA_STAT_DISCARDED);
+               ll_ra_stats_inc(vmpage->mapping->host, RA_STAT_DISCARDED);
 
        ll_invalidate_page(vmpage);
 }
@@ -357,6 +357,20 @@ static int vvp_page_make_ready(const struct lu_env *env,
        return result;
 }
 
+static int vvp_page_is_under_lock(const struct lu_env *env,
+                                 const struct cl_page_slice *slice,
+                                 struct cl_io *io, pgoff_t *max_index)
+{
+       if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
+           io->ci_type == CIT_FAULT) {
+               struct ccc_io *cio = ccc_env_io(env);
+
+               if (unlikely(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED))
+                       *max_index = CL_PAGE_EOF;
+       }
+       return 0;
+}
+
 static int vvp_page_print(const struct lu_env *env,
                          const struct cl_page_slice *slice,
                          void *cookie, lu_printer_t printer)
@@ -389,7 +403,7 @@ static const struct cl_page_operations vvp_page_ops = {
        .cpo_is_vmlocked   = vvp_page_is_vmlocked,
        .cpo_fini         = vvp_page_fini,
        .cpo_print       = vvp_page_print,
-       .cpo_is_under_lock = ccc_page_is_under_lock,
+       .cpo_is_under_lock = vvp_page_is_under_lock,
        .io = {
                [CRT_READ] = {
                        .cpo_prep       = vvp_page_prep_read,
@@ -495,7 +509,7 @@ static const struct cl_page_operations vvp_transient_page_ops = {
        .cpo_fini         = vvp_transient_page_fini,
        .cpo_is_vmlocked   = vvp_transient_page_is_vmlocked,
        .cpo_print       = vvp_page_print,
-       .cpo_is_under_lock = ccc_page_is_under_lock,
+       .cpo_is_under_lock      = vvp_page_is_under_lock,
        .io = {
                [CRT_READ] = {
                        .cpo_prep       = ccc_transient_page_prep,
@@ -516,7 +530,6 @@ int vvp_page_init(const struct lu_env *env, struct cl_object *obj,
 
        CLOBINVRNT(env, obj, ccc_object_invariant(obj));
 
-       cpg->cpg_cl.cpl_index = index;
        cpg->cpg_page = vmpage;
        page_cache_get(vmpage);
 
@@ -526,12 +539,13 @@ int vvp_page_init(const struct lu_env *env, struct cl_object *obj,
                atomic_inc(&page->cp_ref);
                SetPagePrivate(vmpage);
                vmpage->private = (unsigned long)page;
-               cl_page_slice_add(page, &cpg->cpg_cl, obj, &vvp_page_ops);
+               cl_page_slice_add(page, &cpg->cpg_cl, obj, index,
+                                 &vvp_page_ops);
        } else {
                struct ccc_object *clobj = cl2ccc(obj);
 
                LASSERT(!inode_trylock(clobj->cob_inode));
-               cl_page_slice_add(page, &cpg->cpg_cl, obj,
+               cl_page_slice_add(page, &cpg->cpg_cl, obj, index,
                                  &vvp_transient_page_ops);
                clobj->cob_transient_pages++;
        }
index b8e2315d521606cfb1583b645ff51f2d91feef82..9b3d13bf2a46955ef7e7473beeefe00478ac9811 100644 (file)
@@ -632,6 +632,7 @@ struct lov_lock_link *lov_lock_link_find(const struct lu_env *env,
                                         struct lovsub_lock *sub);
 struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
                                  const struct cl_page_slice *slice);
+int lov_page_stripe(const struct cl_page *page);
 
 #define lov_foreach_target(lov, var)               \
        for (var = 0; var < lov_targets_nr(lov); ++var)
index 590f9326af3787be99674a97591dec53091d671d..9985855c4e06ec03caaf128aa30ea5e27bc6cc88 100644 (file)
@@ -146,6 +146,8 @@ int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno,
                          u64 start, u64 end,
                          u64 *obd_start, u64 *obd_end);
 int lov_stripe_number(struct lov_stripe_md *lsm, u64 lov_off);
+pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index,
+                        int stripe);
 
 /* lov_qos.c */
 #define LOV_USES_ASSIGNED_STRIPE       0
index e5b2cfc5b662d630cf2670ca5a0a5ef8ba1e5468..ba79955f54bbd201d0963f182e071949f5bcca5f 100644 (file)
@@ -245,7 +245,7 @@ void lov_sub_put(struct lov_io_sub *sub)
  *
  */
 
-static int lov_page_stripe(const struct cl_page *page)
+int lov_page_stripe(const struct cl_page *page)
 {
        struct lovsub_object *subobj;
        const struct cl_page_slice *slice;
index ae83eb0f6f3625ddd8a7e7caa304e36138a5dd76..cb7b5161749840f396cb23763be7c6059d063541 100644 (file)
@@ -66,6 +66,19 @@ u64 lov_stripe_size(struct lov_stripe_md *lsm, u64 ost_size, int stripeno)
        return lov_size;
 }
 
+/**
+ * Compute file level page index by stripe level page offset
+ */
+pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index,
+                        int stripe)
+{
+       loff_t offset;
+
+       offset = lov_stripe_size(lsm, stripe_index << PAGE_CACHE_SHIFT,
+                                stripe);
+       return offset >> PAGE_CACHE_SHIFT;
+}
+
 /* we have an offset in file backed by an lov and want to find out where
  * that offset lands in our given stripe of the file.  for the easy
  * case where the offset is within the stripe, we just have to scale the
index 0c508bd0f8ad2b2851c3a780342c9ba65f891d5b..9634c13a574d8b0155b76a7efeb3f6b3c1fc8968 100644 (file)
  * Lov page operations.
  *
  */
-static int lov_page_print(const struct lu_env *env,
-                         const struct cl_page_slice *slice,
-                         void *cookie, lu_printer_t printer)
+
+/**
+ * Adjust the stripe index by layout of raid0. @max_index is the maximum
+ * page index covered by an underlying DLM lock.
+ * This function converts max_index from stripe level to file level, and make
+ * sure it's not beyond one stripe.
+ */
+static int lov_raid0_page_is_under_lock(const struct lu_env *env,
+                                       const struct cl_page_slice *slice,
+                                       struct cl_io *unused,
+                                       pgoff_t *max_index)
+{
+       struct lov_object *loo = cl2lov(slice->cpl_obj);
+       struct lov_layout_raid0 *r0 = lov_r0(loo);
+       pgoff_t index = *max_index;
+       unsigned int pps; /* pages per stripe */
+
+       CDEBUG(D_READA, "*max_index = %lu, nr = %d\n", index, r0->lo_nr);
+       if (index == 0) /* the page is not covered by any lock */
+               return 0;
+
+       if (r0->lo_nr == 1) /* single stripe file */
+               return 0;
+
+       /* max_index is stripe level, convert it into file level */
+       if (index != CL_PAGE_EOF) {
+               int stripeno = lov_page_stripe(slice->cpl_page);
+               *max_index = lov_stripe_pgoff(loo->lo_lsm, index, stripeno);
+       }
+
+       /* calculate the end of current stripe */
+       pps = loo->lo_lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
+       index = ((slice->cpl_index + pps) & ~(pps - 1)) - 1;
+
+       /* never exceed the end of the stripe */
+       *max_index = min_t(pgoff_t, *max_index, index);
+       return 0;
+}
+
+static int lov_raid0_page_print(const struct lu_env *env,
+                               const struct cl_page_slice *slice,
+                               void *cookie, lu_printer_t printer)
 {
        struct lov_page *lp = cl2lov_page(slice);
 
-       return (*printer)(env, cookie, LUSTRE_LOV_NAME"-page@%p\n", lp);
+       return (*printer)(env, cookie, LUSTRE_LOV_NAME "-page@%p, raid0\n", lp);
 }
 
-static const struct cl_page_operations lov_page_ops = {
-       .cpo_print  = lov_page_print
+static const struct cl_page_operations lov_raid0_page_ops = {
+       .cpo_is_under_lock = lov_raid0_page_is_under_lock,
+       .cpo_print  = lov_raid0_page_print
 };
 
 int lov_page_init_raid0(const struct lu_env *env, struct cl_object *obj,
@@ -86,7 +126,7 @@ int lov_page_init_raid0(const struct lu_env *env, struct cl_object *obj,
        rc = lov_stripe_offset(loo->lo_lsm, offset, stripe, &suboff);
        LASSERT(rc == 0);
 
-       cl_page_slice_add(page, &lpg->lps_cl, obj, &lov_page_ops);
+       cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_raid0_page_ops);
 
        sub = lov_sub_get(env, lio, stripe);
        if (IS_ERR(sub))
@@ -107,7 +147,7 @@ int lov_page_init_raid0(const struct lu_env *env, struct cl_object *obj,
        return rc;
 }
 
-static int lov_page_empty_print(const struct lu_env *env,
+static int lov_empty_page_print(const struct lu_env *env,
                                const struct cl_page_slice *slice,
                                void *cookie, lu_printer_t printer)
 {
@@ -118,7 +158,7 @@ static int lov_page_empty_print(const struct lu_env *env,
 }
 
 static const struct cl_page_operations lov_empty_page_ops = {
-       .cpo_print = lov_page_empty_print
+       .cpo_print = lov_empty_page_print
 };
 
 int lov_page_init_empty(const struct lu_env *env, struct cl_object *obj,
@@ -127,7 +167,7 @@ int lov_page_init_empty(const struct lu_env *env, struct cl_object *obj,
        struct lov_page *lpg = cl_object_page_slice(obj, page);
        void *addr;
 
-       cl_page_slice_add(page, &lpg->lps_cl, obj, &lov_empty_page_ops);
+       cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_empty_page_ops);
        addr = kmap(page->cp_vmpage);
        memset(addr, 0, cl_page_size(obj));
        kunmap(page->cp_vmpage);
index fb4c0ccee30c7ccad7173bd74ef0f75e373ea3f9..9badedcce2bfe20352481922e08d7ac55e585a17 100644 (file)
@@ -60,11 +60,11 @@ static const struct cl_page_operations lovsub_page_ops = {
 };
 
 int lovsub_page_init(const struct lu_env *env, struct cl_object *obj,
-                    struct cl_page *page, pgoff_t ind)
+                    struct cl_page *page, pgoff_t index)
 {
        struct lovsub_page *lsb = cl_object_page_slice(obj, page);
 
-       cl_page_slice_add(page, &lsb->lsb_cl, obj, &lovsub_page_ops);
+       cl_page_slice_add(page, &lsb->lsb_cl, obj, index, &lovsub_page_ops);
        return 0;
 }
 
index 86591ceac9c1831856a7f78f540b4390630f40db..65d6cee5232bd705424a81878fc268646555f25e 100644 (file)
@@ -733,7 +733,7 @@ int cl_io_read_page(const struct lu_env *env, struct cl_io *io,
                                break;
                }
        }
-       if (result == 0)
+       if (result == 0 && queue->c2_qin.pl_nr > 0)
                result = cl_io_submit_rw(env, io, CRT_READ, queue);
        /*
         * Unlock unsent pages in case of error.
index cb156739b25403e5aebb3effe8fac497ac20f178..506a9f94e5ba7302a01d6f25d911e553b37a31b2 100644 (file)
@@ -401,6 +401,30 @@ EXPORT_SYMBOL(cl_page_at);
        __result;                                                      \
 })
 
+#define CL_PAGE_INVOKE_REVERSE(_env, _page, _op, _proto, ...)          \
+({                                                                     \
+       const struct lu_env        *__env  = (_env);                    \
+       struct cl_page             *__page = (_page);                   \
+       const struct cl_page_slice *__scan;                             \
+       int                         __result;                           \
+       ptrdiff_t                   __op   = (_op);                     \
+       int                       (*__method)_proto;                    \
+                                                                       \
+       __result = 0;                                                   \
+       list_for_each_entry_reverse(__scan, &__page->cp_layers,         \
+                                       cpl_linkage) {                  \
+               __method = *(void **)((char *)__scan->cpl_ops +  __op); \
+               if (__method) {                                         \
+                       __result = (*__method)(__env, __scan, ## __VA_ARGS__); \
+                       if (__result != 0)                              \
+                               break;                                  \
+               }                                                       \
+       }                                                               \
+       if (__result > 0)                                               \
+               __result = 0;                                           \
+       __result;                                                       \
+})
+
 #define CL_PAGE_INVOID(_env, _page, _op, _proto, ...)             \
 do {                                                               \
        const struct lu_env     *__env  = (_env);                   \
@@ -928,17 +952,17 @@ EXPORT_SYMBOL(cl_page_flush);
  * \see cl_page_operations::cpo_is_under_lock()
  */
 int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
-                         struct cl_page *page)
+                         struct cl_page *page, pgoff_t *max_index)
 {
        int rc;
 
        PINVRNT(env, page, cl_page_invariant(page));
 
-       rc = CL_PAGE_INVOKE(env, page, CL_PAGE_OP(cpo_is_under_lock),
-                           (const struct lu_env *,
-                            const struct cl_page_slice *, struct cl_io *),
-                           io);
-       PASSERT(env, page, rc != 0);
+       rc = CL_PAGE_INVOKE_REVERSE(env, page, CL_PAGE_OP(cpo_is_under_lock),
+                                   (const struct lu_env *,
+                                    const struct cl_page_slice *,
+                                     struct cl_io *, pgoff_t *),
+                                   io, max_index);
        return rc;
 }
 EXPORT_SYMBOL(cl_page_is_under_lock);
@@ -1041,11 +1065,12 @@ EXPORT_SYMBOL(cl_page_size);
  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
  */
 void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
-                      struct cl_object *obj,
+                      struct cl_object *obj, pgoff_t index,
                       const struct cl_page_operations *ops)
 {
        list_add_tail(&slice->cpl_linkage, &page->cp_layers);
        slice->cpl_obj  = obj;
+       slice->cpl_index = index;
        slice->cpl_ops  = ops;
        slice->cpl_page = page;
 }
index db56081330e9a22403882e0697000792f897bad4..0d84d04c12de4c0c92940ccc0e091e0f07d4ef64 100644 (file)
@@ -365,7 +365,7 @@ static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
 
        page_cache_get(page->cp_vmpage);
        mutex_init(&ep->ep_lock);
-       cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
+       cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
        atomic_inc(&eco->eo_npages);
        return 0;
 }
index 3e0a8c3f4844e66b354a8ae6d962e3f45927bf1c..e02dd33b637c4c5cbcc16fb4baf1e205606295cb 100644 (file)
@@ -132,17 +132,19 @@ void osc_index2policy(ldlm_policy_data_t *policy, const struct cl_object *obj,
 
 static int osc_page_is_under_lock(const struct lu_env *env,
                                  const struct cl_page_slice *slice,
-                                 struct cl_io *unused)
+                                 struct cl_io *unused, pgoff_t *max_index)
 {
        struct osc_page *opg = cl2osc_page(slice);
        struct cl_lock *lock;
        int result = -ENODATA;
 
+       *max_index = 0;
        lock = cl_lock_at_pgoff(env, slice->cpl_obj, osc_index(opg),
                                NULL, 1, 0);
        if (lock) {
+               *max_index = lock->cll_descr.cld_end;
                cl_lock_put(env, lock);
-               result = -EBUSY;
+               result = 0;
        }
        return result;
 }
@@ -308,7 +310,6 @@ int osc_page_init(const struct lu_env *env, struct cl_object *obj,
 
        opg->ops_from = 0;
        opg->ops_to = PAGE_CACHE_SIZE;
-       opg->ops_cl.cpl_index = index;
 
        result = osc_prep_async_page(osc, opg, page->cp_vmpage,
                                     cl_offset(obj, index));
@@ -316,7 +317,8 @@ int osc_page_init(const struct lu_env *env, struct cl_object *obj,
                struct osc_io *oio = osc_env_io(env);
 
                opg->ops_srvlock = osc_io_srvlock(oio);
-               cl_page_slice_add(page, &opg->ops_cl, obj, &osc_page_ops);
+               cl_page_slice_add(page, &opg->ops_cl, obj, index,
+                                 &osc_page_ops);
        }
        /*
         * Cannot assert osc_page_protected() here as read-ahead
This page took 0.040767 seconds and 5 git commands to generate.