staging/lustre/osc: glimpse lock should match only with granted locks
diff --git a/drivers/staging/lustre/lustre/osc/osc_request.c b/drivers/staging/lustre/lustre/osc/osc_request.c
index 30526ebcad04e52e8e6830fc2d1952c19e3ebd69..536b868ff776b8ad38f93c23f628e4e875991785 100644
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -92,12 +88,13 @@ struct osc_fsync_args {
 
 struct osc_enqueue_args {
        struct obd_export       *oa_exp;
+       enum ldlm_type          oa_type;
+       enum ldlm_mode          oa_mode;
        __u64               *oa_flags;
-       obd_enqueue_update_f      oa_upcall;
+       osc_enqueue_upcall_f    oa_upcall;
        void                 *oa_cookie;
        struct ost_lvb     *oa_lvb;
-       struct lustre_handle     *oa_lockh;
-       struct ldlm_enqueue_info *oa_ei;
+       struct lustre_handle    oa_lockh;
        unsigned int          oa_agl:1;
 };
 
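The new oa_type/oa_mode fields and the embedded oa_lockh replace pointers into caller-owned state (oa_ei, *oa_lockh), so the async args stay valid after the caller returns. The replacement upcall type is defined elsewhere in the patch (osc_internal.h, not part of this file); judging from its call sites further down, (*upcall)(cookie, lockh, errcode), it presumably looks like:

	/* Sketch of the new upcall signature, inferred from its call
	 * sites in this file; the actual typedef lives in osc_internal.h.
	 */
	typedef int (*osc_enqueue_upcall_f)(void *cookie,
					    struct lustre_handle *lockh,
					    int rc);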
@@ -473,7 +470,8 @@ static int osc_real_create(struct obd_export *exp, struct obdo *oa,
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
-               req->rq_no_resend = req->rq_no_delay = 1;
+               req->rq_no_resend = 1;
+               req->rq_no_delay = 1;
        }
 
        rc = ptlrpc_queue_wait(req);
@@ -801,21 +799,24 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
        LASSERT(!(oa->o_valid & bits));
 
        oa->o_valid |= bits;
-       client_obd_list_lock(&cli->cl_loi_list_lock);
+       spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                     cli->cl_dirty_max)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
-       } else if (unlikely(atomic_read(&obd_dirty_pages) -
+       } else if (unlikely(atomic_read(&obd_unstable_pages) +
+                           atomic_read(&obd_dirty_pages) -
                            atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1).
                 */
-               CERROR("dirty %d - %d > system dirty_max %d\n",
+               CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
+                      cli->cl_import->imp_obd->obd_name,
+                      atomic_read(&obd_unstable_pages),
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
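The fudge-factor comment above can be made concrete with a small userspace sketch (C11 atomics, not kernel code): two writers may both pass the limit check before either increment lands, so an unlocked reader can transiently see the limit exceeded by one even though neither writer misbehaved.

	#include <stdatomic.h>

	static atomic_int dirty_pages;
	static const int dirty_max = 100;

	static int try_dirty_one_page(void)
	{
		if (atomic_load(&dirty_pages) >= dirty_max)
			return -1;	/* at the limit, back off */
		/* A second writer can pass the same check here before
		 * either increment is visible; both then increment and
		 * a reader briefly observes dirty_max + 1.
		 */
		atomic_fetch_add(&dirty_pages, 1);
		return 0;
	}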
@@ -833,10 +834,9 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
-
 }
 
 void osc_update_next_shrink(struct client_obd *cli)
@@ -849,9 +849,9 @@ void osc_update_next_shrink(struct client_obd *cli)
 
 static void __osc_update_grant(struct client_obd *cli, u64 grant)
 {
-       client_obd_list_lock(&cli->cl_loi_list_lock);
+       spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       spin_unlock(&cli->cl_loi_list_lock);
 }
 
 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
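These conversions, here and throughout the file, are mechanical: cl_loi_list_lock is now a bare spinlock_t, so the client_obd_list_lock()/client_obd_list_unlock() wrappers are dropped. The removed wrapper lived in the old staging headers and is not shown in this diff; it was essentially a spinlock plus holder tracking for debugging, roughly (field names from memory, may not be exact):

	typedef struct {
		spinlock_t		lock;
		unsigned long		time;	/* when it was taken */
		struct task_struct	*task;	/* current holder */
		const char		*func;	/* holder's function */
		int			line;
	} client_obd_lock_t;

	static inline void client_obd_list_lock(client_obd_lock_t *lock)
	{
		spin_lock(&lock->lock);	/* real version also records holder */
	}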
@@ -889,10 +889,10 @@ out:
 
 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
 {
-       client_obd_list_lock(&cli->cl_loi_list_lock);
+       spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
@@ -911,10 +911,10 @@ static int osc_shrink_grant(struct client_obd *cli)
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
 
-       client_obd_list_lock(&cli->cl_loi_list_lock);
+       spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       spin_unlock(&cli->cl_loi_list_lock);
 
        return osc_shrink_grant_to_target(cli, target_bytes);
 }
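For scale (illustrative numbers, assuming the common defaults of cl_max_rpcs_in_flight = 8 and cl_max_pages_per_rpc = 256 with 4 KiB pages): the initial target is (8 + 1) * (256 << 12) = 9 MiB of grant, i.e. one max-sized RPC's worth of grant per allowed in-flight RPC, plus one spare.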
@@ -924,7 +924,7 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
        int rc = 0;
        struct ost_body *body;
 
-       client_obd_list_lock(&cli->cl_loi_list_lock);
+       spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance.
@@ -933,10 +933,10 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
 
        if (target_bytes >= cli->cl_avail_grant) {
-               client_obd_list_unlock(&cli->cl_loi_list_lock);
+               spin_unlock(&cli->cl_loi_list_lock);
                return 0;
        }
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       spin_unlock(&cli->cl_loi_list_lock);
 
        body = kzalloc(sizeof(*body), GFP_NOFS);
        if (!body)
@@ -944,10 +944,10 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
 
        osc_announce_cached(cli, &body->oa, 0);
 
-       client_obd_list_lock(&cli->cl_loi_list_lock);
+       spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
@@ -1035,7 +1035,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty must be 0 already.
         */
-       client_obd_list_lock(&cli->cl_loi_list_lock);
+       spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
@@ -1053,7 +1053,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
 
        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_SHIFT, ocd->ocd_blocksize);
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       spin_unlock(&cli->cl_loi_list_lock);
 
        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
               cli->cl_import->imp_obd->obd_name,
@@ -1082,7 +1082,7 @@ static void handle_short_read(int nob_read, u32 page_count,
                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
-                               (pga[i]->off & ~CFS_PAGE_MASK);
+                               (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
@@ -1097,7 +1097,7 @@ static void handle_short_read(int nob_read, u32 page_count,
 
        /* zero remaining pages */
        while (page_count-- > 0) {
-               ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
+               ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
@@ -1144,7 +1144,8 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
 {
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
-                                 OBD_BRW_SYNC | OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
+                                 OBD_BRW_SYNC | OBD_BRW_ASYNC |
+                                 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
 
                /* warn if we try to combine flags that we don't know to be
                 * safe to combine
@@ -1188,32 +1189,29 @@ static u32 osc_checksum_bulk(int nob, u32 pg_count,
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
-                       int off = pga[i]->off & ~CFS_PAGE_MASK;
+                       int off = pga[i]->off & ~PAGE_MASK;
 
                        memcpy(ptr + off, "bad1", min(4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
-                                           pga[i]->off & ~CFS_PAGE_MASK,
+                                           pga[i]->off & ~PAGE_MASK,
                                  count);
                CDEBUG(D_PAGE,
                       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
                       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
                       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
                       page_private(pga[i]->pg),
-                      (int)(pga[i]->off & ~CFS_PAGE_MASK));
+                      (int)(pga[i]->off & ~PAGE_MASK));
 
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
 
-       bufsize = 4;
+       bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
 
-       if (err)
-               cfs_crypto_hash_final(hdesc, NULL, NULL);
-
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo
         */
@@ -1312,7 +1310,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
-               int poff = pg->off & ~CFS_PAGE_MASK;
+               int poff = pg->off & ~PAGE_MASK;
 
                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
@@ -1658,6 +1656,7 @@ static int osc_brw_redo_request(struct ptlrpc_request *request,
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
+       new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply())
         */
@@ -1737,7 +1736,6 @@ static int brw_interpret(const struct lu_env *env,
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
-       struct cl_object *obj = NULL;
        struct client_obd *cli = aa->aa_cli;
 
        rc = osc_brw_fini_request(req, rc);
@@ -1766,24 +1764,17 @@ static int brw_interpret(const struct lu_env *env,
                        rc = -EIO;
        }
 
-       list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
-               if (!obj && rc == 0) {
-                       obj = osc2cl(ext->oe_obj);
-                       cl_object_get(obj);
-               }
-
-               list_del_init(&ext->oe_link);
-               osc_extent_finish(env, ext, 1, rc);
-       }
-       LASSERT(list_empty(&aa->aa_exts));
-       LASSERT(list_empty(&aa->aa_oaps));
-
-       if (obj) {
+       if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
+               struct cl_object *obj;
+               struct osc_async_page *last;
 
-               LASSERT(rc == 0);
+               last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
+               obj = osc2cl(last->oap_obj);
+
+               cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
@@ -1800,21 +1791,45 @@ static int brw_interpret(const struct lu_env *env,
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }
-               if (valid != 0) {
-                       cl_object_attr_lock(obj);
-                       cl_object_attr_set(env, obj, attr, valid);
-                       cl_object_attr_unlock(obj);
+
+               if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
+                       struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
+                       loff_t last_off = last->oap_count + last->oap_obj_off;
+
+                       /* Change file size if this is an out of quota or
+                        * direct IO write and it extends the file size
+                        */
+                       if (loi->loi_lvb.lvb_size < last_off) {
+                               attr->cat_size = last_off;
+                               valid |= CAT_SIZE;
+                       }
+                       /* Extend KMS if it's not a lockless write */
+                       if (loi->loi_kms < last_off &&
+                           oap2osc_page(last)->ops_srvlock == 0) {
+                               attr->cat_kms = last_off;
+                               valid |= CAT_KMS;
+                       }
                }
-               cl_object_put(env, obj);
+
+               if (valid != 0)
+                       cl_object_attr_set(env, obj, attr, valid);
+               cl_object_attr_unlock(obj);
        }
        kmem_cache_free(obdo_cachep, aa->aa_oa);
 
+       list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
+               list_del_init(&ext->oe_link);
+               osc_extent_finish(env, ext, 1, rc);
+       }
+       LASSERT(list_empty(&aa->aa_exts));
+       LASSERT(list_empty(&aa->aa_oaps));
+
        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
 
-       client_obd_list_lock(&cli->cl_loi_list_lock);
+       spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete
@@ -1824,12 +1839,31 @@ static int brw_interpret(const struct lu_env *env,
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       spin_unlock(&cli->cl_loi_list_lock);
 
        osc_io_unplug(env, cli, NULL);
        return rc;
 }
 
+static void brw_commit(struct ptlrpc_request *req)
+{
+       spin_lock(&req->rq_lock);
+       /*
+        * If osc_inc_unstable_pages (via osc_extent_finish) races with
+        * this function, called via rq_commit_cb, we need to ensure
+        * osc_dec_unstable_pages is still called. Otherwise unstable
+        * pages may be leaked.
+        */
+       if (req->rq_unstable) {
+               spin_unlock(&req->rq_lock);
+               osc_dec_unstable_pages(req);
+               spin_lock(&req->rq_lock);
+       } else {
+               req->rq_committed = 1;
+       }
+       spin_unlock(&req->rq_lock);
+}
+
 /**
  * Build an RPC by the list of extent @ext_list. The caller must ensure
  * that the total pages in this list are NOT over max pages per RPC.
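brw_commit() above is installed as rq_commit_cb in the next hunk. Its race comment implies a matching protocol on the increment side; a hedged sketch of that counterpart follows (the real logic lives in the unstable-page accounting in osc_cache.c, outside this file -- the name and details here are illustrative only):

	/* Illustrative counterpart: the increment side either marks the
	 * request unstable, or, if brw_commit() already ran and set
	 * rq_committed, performs the decrement itself so the count is
	 * not leaked.
	 */
	static void osc_inc_unstable_pages_sketch(struct ptlrpc_request *req,
						  int page_count)
	{
		atomic_add(page_count, &obd_unstable_pages);

		spin_lock(&req->rq_lock);
		if (req->rq_committed) {
			spin_unlock(&req->rq_lock);
			osc_dec_unstable_pages(req);
		} else {
			req->rq_unstable = 1;
			spin_unlock(&req->rq_lock);
		}
	}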
@@ -1920,7 +1954,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
-                      pga[i]->pg, page_index(oap->oap_page), oap,
+                      pga[i]->pg, oap->oap_page->index, oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
@@ -1949,6 +1983,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                goto out;
        }
 
+       req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;
 
        if (mem_tight != 0)
@@ -1992,7 +2027,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        if (tmp)
                tmp->oap_request = ptlrpc_request_addref(req);
 
-       client_obd_list_lock(&cli->cl_loi_list_lock);
+       spin_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
@@ -2007,7 +2042,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       spin_unlock(&cli->cl_loi_list_lock);
 
        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight,
@@ -2055,14 +2090,12 @@ static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
        LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
 
        lock_res_and_lock(lock);
-       spin_lock(&osc_ast_guard);
 
        if (!lock->l_ast_data)
                lock->l_ast_data = data;
        if (lock->l_ast_data == data)
                set = 1;
 
-       spin_unlock(&osc_ast_guard);
        unlock_res_and_lock(lock);
 
        return set;
@@ -2104,36 +2137,38 @@ static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
        return rc;
 }
 
-static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
-                           obd_enqueue_update_f upcall, void *cookie,
-                           __u64 *flags, int agl, int rc)
+static int osc_enqueue_fini(struct ptlrpc_request *req,
+                           osc_enqueue_upcall_f upcall, void *cookie,
+                           struct lustre_handle *lockh, enum ldlm_mode mode,
+                           __u64 *flags, int agl, int errcode)
 {
-       int intent = *flags & LDLM_FL_HAS_INTENT;
-
-       if (intent) {
-               /* The request was created before ldlm_cli_enqueue call. */
-               if (rc == ELDLM_LOCK_ABORTED) {
-                       struct ldlm_reply *rep;
+       bool intent = *flags & LDLM_FL_HAS_INTENT;
+       int rc;
 
-                       rep = req_capsule_server_get(&req->rq_pill,
-                                                    &RMF_DLM_REP);
+       /* The request was created before ldlm_cli_enqueue call. */
+       if (intent && errcode == ELDLM_LOCK_ABORTED) {
+               struct ldlm_reply *rep;
 
-                       rep->lock_policy_res1 =
-                               ptlrpc_status_ntoh(rep->lock_policy_res1);
-                       if (rep->lock_policy_res1)
-                               rc = rep->lock_policy_res1;
-               }
-       }
+               rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
 
-       if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
-           (rc == 0)) {
+               rep->lock_policy_res1 =
+                       ptlrpc_status_ntoh(rep->lock_policy_res1);
+               if (rep->lock_policy_res1)
+                       errcode = rep->lock_policy_res1;
+               if (!agl)
+                       *flags |= LDLM_FL_LVB_READY;
+       } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
-               CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
-                      lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
        }
 
        /* Call the update callback. */
-       rc = (*upcall)(cookie, rc);
+       rc = (*upcall)(cookie, lockh, errcode);
+       /* release the reference taken in ldlm_cli_enqueue() */
+       if (errcode == ELDLM_LOCK_MATCHED)
+               errcode = ELDLM_OK;
+       if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
+               ldlm_lock_decref(lockh, mode);
+
        return rc;
 }
 
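Since the lock handle now reaches the upcall before osc_enqueue_fini() drops the enqueue reference, an upcall that wants to keep the lock must take its own reference. A hypothetical upcall under the new convention (my_lock_ctx and its field are invented for illustration):

	static int my_enqueue_upcall(void *cookie, struct lustre_handle *lockh,
				     int errcode)
	{
		struct my_lock_ctx *ctx = cookie;	/* hypothetical */

		/* The handle is usable on success or on a match, and
		 * ldlm_handle2lock() takes a reference of its own.
		 */
		if (errcode == ELDLM_OK || errcode == ELDLM_LOCK_MATCHED)
			ctx->mlc_lock = ldlm_handle2lock(lockh);

		return errcode == ELDLM_LOCK_MATCHED ? ELDLM_OK : errcode;
	}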
@@ -2142,62 +2177,50 @@ static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct osc_enqueue_args *aa, int rc)
 {
        struct ldlm_lock *lock;
-       struct lustre_handle handle;
-       __u32 mode;
-       struct ost_lvb *lvb;
-       __u32 lvb_len;
-       __u64 *flags = aa->oa_flags;
-
-       /* Make a local copy of a lock handle and a mode, because aa->oa_*
-        * might be freed anytime after lock upcall has been called.
-        */
-       lustre_handle_copy(&handle, aa->oa_lockh);
-       mode = aa->oa_ei->ei_mode;
+       struct lustre_handle *lockh = &aa->oa_lockh;
+       enum ldlm_mode mode = aa->oa_mode;
+       struct ost_lvb *lvb = aa->oa_lvb;
+       __u32 lvb_len = sizeof(*lvb);
+       __u64 flags = 0;
 
        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid.
         */
-       lock = ldlm_handle2lock(&handle);
+       lock = ldlm_handle2lock(lockh);
+       LASSERTF(lock, "lockh %llx, req %p, aa %p - client evicted?\n",
+                lockh->cookie, req, aa);
 
        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini().
         */
-       ldlm_lock_addref(&handle, mode);
+       ldlm_lock_addref(lockh, mode);
+
+       /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
 
        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
 
-       if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
-               lvb = NULL;
-               lvb_len = 0;
-       } else {
-               lvb = aa->oa_lvb;
-               lvb_len = sizeof(*aa->oa_lvb);
+       if (aa->oa_agl) {
+               LASSERT(!aa->oa_lvb);
+               LASSERT(!aa->oa_flags);
+               aa->oa_flags = &flags;
        }
 
        /* Complete obtaining the lock procedure. */
-       rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
-                                  mode, flags, lvb, lvb_len, &handle, rc);
+       rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
+                                  aa->oa_mode, aa->oa_flags, lvb, lvb_len,
+                                  lockh, rc);
        /* Complete osc stuff. */
-       rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
-                             flags, aa->oa_agl, rc);
+       rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
+                             aa->oa_flags, aa->oa_agl, rc);
 
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
 
-       /* Release the lock for async request. */
-       if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
-               /*
-                * Releases a reference taken by ldlm_cli_enqueue(), if it is
-                * not already released by
-                * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
-                */
-               ldlm_lock_decref(&handle, mode);
-
-       LASSERTF(lock, "lockh %p, req %p, aa %p - client evicted?\n",
-                aa->oa_lockh, req, aa);
-       ldlm_lock_decref(&handle, mode);
+       ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
 }
@@ -2209,29 +2232,29 @@ struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
  * other synchronous requests, however keeping some locks and trying to obtain
  * others may take a considerable amount of time in a case of ost failure; and
  * when other sync requests do not get released lock from a client, the client
- * is excluded from the cluster -- such scenarious make the life difficult, so
+ * is evicted from the cluster -- such scenarios make life difficult, so
  * release locks just after they are obtained.
  */
 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
-                    obd_enqueue_update_f upcall, void *cookie,
+                    osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
-                    struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
 {
        struct obd_device *obd = exp->exp_obd;
+       struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
-       __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
+       __u64 match_flags = *flags;
        enum ldlm_mode mode;
        int rc;
 
        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.
         */
-       policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
-       policy->l_extent.end |= ~CFS_PAGE_MASK;
+       policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
+       policy->l_extent.end |= ~PAGE_MASK;
 
        /*
         * kms is not valid when either object is completely fresh (so that no
@@ -2258,65 +2281,51 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
-       mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
-                              einfo->ei_type, policy, mode, lockh, 0);
+       if (agl == 0)
+               match_flags |= LDLM_FL_LVB_READY;
+       if (intent != 0)
+               match_flags |= LDLM_FL_BLOCK_GRANTED;
+       mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
+                              einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
-               struct ldlm_lock *matched = ldlm_handle2lock(lockh);
+               struct ldlm_lock *matched;
 
-               if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
-                       /* For AGL, if enqueue RPC is sent but the lock is not
-                        * granted, then skip to process this strpe.
-                        * Return -ECANCELED to tell the caller.
+               if (*flags & LDLM_FL_TEST_LOCK)
+                       return ELDLM_OK;
+
+               matched = ldlm_handle2lock(&lockh);
+               if (agl) {
+                       /* AGL enqueues DLM locks speculatively. Therefore if
+                        * it already exists a DLM lock, it wll just inform the
+                        * caller to cancel the AGL process for this stripe.
                         */
-                       ldlm_lock_decref(lockh, mode);
+                       ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        return -ECANCELED;
-               }
-
-               if (osc_set_lock_data_with_check(matched, einfo)) {
+               } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
-                       /* addref the lock only if not async requests and PW
-                        * lock is matched whereas we asked for PR.
-                        */
-                       if (!rqset && einfo->ei_mode != mode)
-                               ldlm_lock_addref(lockh, LCK_PR);
-                       if (intent) {
-                               /* I would like to be able to ASSERT here that
-                                * rss <= kms, but I can't, for reasons which
-                                * are explained in lov_enqueue()
-                                */
-                       }
-
-                       /* We already have a lock, and it's referenced.
-                        *
-                        * At this point, the cl_lock::cll_state is CLS_QUEUING,
-                        * AGL upcall may change it to CLS_HELD directly.
-                        */
-                       (*upcall)(cookie, ELDLM_OK);
+                       /* We already have a lock, and it's referenced. */
+                       (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
 
-                       if (einfo->ei_mode != mode)
-                               ldlm_lock_decref(lockh, LCK_PW);
-                       else if (rqset)
-                               /* For async requests, decref the lock. */
-                               ldlm_lock_decref(lockh, einfo->ei_mode);
+                       ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        return ELDLM_OK;
+               } else {
+                       ldlm_lock_decref(&lockh, mode);
+                       LDLM_LOCK_PUT(matched);
                }
-
-               ldlm_lock_decref(lockh, mode);
-               LDLM_LOCK_PUT(matched);
        }
 
- no_match:
+no_match:
+       if (*flags & LDLM_FL_TEST_LOCK)
+               return -ENOLCK;
        if (intent) {
-               LIST_HEAD(cancels);
-
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (!req)
                        return -ENOMEM;
 
-               rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
+               rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        return rc;
@@ -2331,21 +2340,31 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        *flags &= ~LDLM_FL_BLOCK_GRANTED;
 
        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
-                             sizeof(*lvb), LVB_T_OST, lockh, async);
-       if (rqset) {
+                             sizeof(*lvb), LVB_T_OST, &lockh, async);
+       if (async) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
 
-                       CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
+                       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
-                       aa->oa_ei = einfo;
                        aa->oa_exp = exp;
-                       aa->oa_flags  = flags;
+                       aa->oa_mode = einfo->ei_mode;
+                       aa->oa_type = einfo->ei_type;
+                       lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
-                       aa->oa_lvb    = lvb;
-                       aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;
+                       if (!agl) {
+                               aa->oa_flags = flags;
+                               aa->oa_lvb = lvb;
+                       } else {
+                               /* AGL is essentially to enqueue a DLM lock
+                                * in advance, so we don't care about the
+                                * result of the AGL enqueue.
+                                */
+                               aa->oa_lvb = NULL;
+                               aa->oa_flags = NULL;
+                       }
 
                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
@@ -2359,7 +2378,8 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                return rc;
        }
 
-       rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
+       rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
+                             flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);
 
@@ -2381,8 +2401,8 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother
         */
-       policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
-       policy->l_extent.end |= ~CFS_PAGE_MASK;
+       policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
+       policy->l_extent.end |= ~PAGE_MASK;
 
        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
@@ -2493,7 +2513,7 @@ static int osc_statfs_async(struct obd_export *exp,
        }
 
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
-       CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
+       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;
 
@@ -2756,7 +2776,8 @@ static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);
 
-               req->rq_no_delay = req->rq_no_resend = 1;
+               req->rq_no_delay = 1;
+               req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
@@ -2787,7 +2808,7 @@ out:
                        goto skip_locking;
 
                policy.l_extent.start = fm_key->fiemap.fm_start &
-                                               CFS_PAGE_MASK;
+                                               PAGE_MASK;
 
                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
                    fm_key->fiemap.fm_start + PAGE_SIZE - 1)
@@ -2795,7 +2816,7 @@ out:
                else
                        policy.l_extent.end = (fm_key->fiemap.fm_start +
                                fm_key->fiemap.fm_length +
-                               PAGE_SIZE - 1) & CFS_PAGE_MASK;
+                               PAGE_SIZE - 1) & PAGE_MASK;
 
                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
@@ -2896,7 +2917,7 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
 
                LASSERT(!cli->cl_cache); /* only once */
                cli->cl_cache = val;
-               atomic_inc(&cli->cl_cache->ccc_users);
+               cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
 
                /* add this osc into entity list */
@@ -2913,7 +2934,7 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
                int target = *(int *)val;
 
-               nr = osc_lru_shrink(cli, min(nr, target));
+               nr = osc_lru_shrink(env, cli, min(nr, target), true);
                *(int *)val -= nr;
                return 0;
        }
@@ -2992,12 +3013,12 @@ static int osc_reconnect(const struct lu_env *env,
        if (data && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                long lost_grant;
 
-               client_obd_list_lock(&cli->cl_loi_list_lock);
+               spin_lock(&cli->cl_loi_list_lock);
                data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
                                2 * cli_brw_size(obd);
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
-               client_obd_list_unlock(&cli->cl_loi_list_lock);
+               spin_unlock(&cli->cl_loi_list_lock);
 
                CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
                       data->ocd_connect_flags,
@@ -3047,10 +3068,10 @@ static int osc_import_event(struct obd_device *obd,
        switch (event) {
        case IMP_EVENT_DISCON: {
                cli = &obd->u.cli;
-               client_obd_list_lock(&cli->cl_loi_list_lock);
+               spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
-               client_obd_list_unlock(&cli->cl_loi_list_lock);
+               spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
@@ -3073,8 +3094,9 @@ static int osc_import_event(struct obd_device *obd,
 
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
-               } else
+               } else {
                        rc = PTR_ERR(env);
+               }
                break;
        }
        case IMP_EVENT_ACTIVE: {
@@ -3116,20 +3138,14 @@ static int osc_import_event(struct obd_device *obd,
  * \retval zero the lock can't be canceled
  * \retval other ok to cancel
  */
-static int osc_cancel_for_recovery(struct ldlm_lock *lock)
+static int osc_cancel_weight(struct ldlm_lock *lock)
 {
-       check_res_locked(lock->l_resource);
-
        /*
-        * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
-        *
-        * XXX as a future improvement, we can also cancel unused write lock
-        * if it doesn't have dirty data and active mmaps.
+        * Cancel all unused and granted extent locks.
         */
        if (lock->l_resource->lr_type == LDLM_EXTENT &&
-           (lock->l_granted_mode == LCK_PR ||
-            lock->l_granted_mode == LCK_CR) &&
-           (osc_dlm_lock_pageref(lock) == 0))
+           lock->l_granted_mode == lock->l_req_mode &&
+           osc_ldlm_weigh_ast(lock) == 0)
                return 1;
 
        return 0;
@@ -3170,6 +3186,14 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
        }
        cli->cl_writeback_work = handler;
 
+       handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
+       if (IS_ERR(handler)) {
+               rc = PTR_ERR(handler);
+               goto out_ptlrpcd_work;
+       }
+
+       cli->cl_lru_work = handler;
+
        rc = osc_quota_setup(obd);
        if (rc)
                goto out_ptlrpcd_work;
@@ -3198,11 +3222,18 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
        }
 
        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
-       ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
+       ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        return rc;
 
 out_ptlrpcd_work:
-       ptlrpcd_destroy_work(handler);
+       if (cli->cl_writeback_work) {
+               ptlrpcd_destroy_work(cli->cl_writeback_work);
+               cli->cl_writeback_work = NULL;
+       }
+       if (cli->cl_lru_work) {
+               ptlrpcd_destroy_work(cli->cl_lru_work);
+               cli->cl_lru_work = NULL;
+       }
 out_client_setup:
        client_obd_cleanup(obd);
 out_ptlrpcd:
@@ -3241,6 +3272,10 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
+               if (cli->cl_lru_work) {
+                       ptlrpcd_destroy_work(cli->cl_lru_work);
+                       cli->cl_lru_work = NULL;
+               }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
@@ -3262,7 +3297,7 @@ static int osc_cleanup(struct obd_device *obd)
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
-               atomic_dec(&cli->cl_cache->ccc_users);
+               cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }
 
@@ -3330,7 +3365,6 @@ static struct obd_ops osc_obd_ops = {
 };
 
 extern struct lu_kmem_descr osc_caches[];
-extern spinlock_t osc_ast_guard;
 extern struct lock_class_key osc_ast_guard_class;
 
 static int __init osc_init(void)
@@ -3357,9 +3391,6 @@ static int __init osc_init(void)
        if (rc)
                goto out_kmem;
 
-       spin_lock_init(&osc_ast_guard);
-       lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
-
        /* This is obviously too much memory, only prevent overflow here */
        if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
                rc = -EINVAL;