X-Git-Url: http://drtracing.org/?a=blobdiff_plain;f=drivers%2Fstaging%2Flustre%2Flustre%2Fosc%2Fosc_request.c;h=47417f88fe3c209f80cdc58cdc90f3fedd237f36;hb=025fd3c20bfb4e84972f174c7246f86d693f6544;hp=30526ebcad04e52e8e6830fc2d1952c19e3ebd69;hpb=8595bb27cebe8b823167fd11b2b506f2e59e83f5;p=deliverable%2Flinux.git

diff --git a/drivers/staging/lustre/lustre/osc/osc_request.c b/drivers/staging/lustre/lustre/osc/osc_request.c
index 30526ebcad04..536b868ff776 100644
--- a/drivers/staging/lustre/lustre/osc/osc_request.c
+++ b/drivers/staging/lustre/lustre/osc/osc_request.c
@@ -15,11 +15,7 @@
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -92,12 +88,13 @@ struct osc_fsync_args {
 
 struct osc_enqueue_args {
 	struct obd_export	*oa_exp;
+	enum ldlm_type		oa_type;
+	enum ldlm_mode		oa_mode;
 	__u64			*oa_flags;
-	obd_enqueue_update_f	oa_upcall;
+	osc_enqueue_upcall_f	oa_upcall;
 	void			*oa_cookie;
 	struct ost_lvb		*oa_lvb;
-	struct lustre_handle	*oa_lockh;
-	struct ldlm_enqueue_info *oa_ei;
+	struct lustre_handle	oa_lockh;
 	unsigned int		oa_agl:1;
 };
 
@@ -473,7 +470,8 @@ static int osc_real_create(struct obd_export *exp, struct obdo *oa,
 		DEBUG_REQ(D_HA, req, "delorphan from OST integration");
 		/* Don't resend the delorphan req */
-		req->rq_no_resend = req->rq_no_delay = 1;
+		req->rq_no_resend = 1;
+		req->rq_no_delay = 1;
 	}
 
 	rc = ptlrpc_queue_wait(req);
@@ -801,21 +799,24 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
 
 	LASSERT(!(oa->o_valid & bits));
 	oa->o_valid |= bits;
-	client_obd_list_lock(&cli->cl_loi_list_lock);
+	spin_lock(&cli->cl_loi_list_lock);
 	oa->o_dirty = cli->cl_dirty;
 	if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
 		     cli->cl_dirty_max)) {
 		CERROR("dirty %lu - %lu > dirty_max %lu\n",
 		       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
 		oa->o_undirty = 0;
-	} else if (unlikely(atomic_read(&obd_dirty_pages) -
+	} else if (unlikely(atomic_read(&obd_unstable_pages) +
+			    atomic_read(&obd_dirty_pages) -
 			    atomic_read(&obd_dirty_transit_pages) >
 			    (long)(obd_max_dirty_pages + 1))) {
 		/* The atomic_read() allowing the atomic_inc() are
 		 * not covered by a lock thus they may safely race and trip
 		 * this CERROR() unless we add in a small fudge factor (+1).
 		 */
-		CERROR("dirty %d - %d > system dirty_max %d\n",
+		CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
+		       cli->cl_import->imp_obd->obd_name,
+		       atomic_read(&obd_unstable_pages),
 		       atomic_read(&obd_dirty_pages),
 		       atomic_read(&obd_dirty_transit_pages),
 		       obd_max_dirty_pages);
@@ -833,10 +834,9 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
 	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
 	oa->o_dropped = cli->cl_lost_grant;
 	cli->cl_lost_grant = 0;
-	client_obd_list_unlock(&cli->cl_loi_list_lock);
+	spin_unlock(&cli->cl_loi_list_lock);
 	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
 	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
-
 }
 
 void osc_update_next_shrink(struct client_obd *cli)
@@ -849,9 +849,9 @@ void osc_update_next_shrink(struct client_obd *cli)
 
 static void __osc_update_grant(struct client_obd *cli, u64 grant)
 {
-	client_obd_list_lock(&cli->cl_loi_list_lock);
+	spin_lock(&cli->cl_loi_list_lock);
 	cli->cl_avail_grant += grant;
-	client_obd_list_unlock(&cli->cl_loi_list_lock);
+	spin_unlock(&cli->cl_loi_list_lock);
 }
 
 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
@@ -889,10 +889,10 @@ out:
 
 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
 {
-	client_obd_list_lock(&cli->cl_loi_list_lock);
+	spin_lock(&cli->cl_loi_list_lock);
 	oa->o_grant = cli->cl_avail_grant / 4;
 	cli->cl_avail_grant -= oa->o_grant;
-	client_obd_list_unlock(&cli->cl_loi_list_lock);
+	spin_unlock(&cli->cl_loi_list_lock);
 	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
 		oa->o_valid |= OBD_MD_FLFLAGS;
 		oa->o_flags = 0;
@@ -911,10 +911,10 @@ static int osc_shrink_grant(struct client_obd *cli)
 	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
 			     (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
 
-	client_obd_list_lock(&cli->cl_loi_list_lock);
+	spin_lock(&cli->cl_loi_list_lock);
 	if (cli->cl_avail_grant <= target_bytes)
 		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
-	client_obd_list_unlock(&cli->cl_loi_list_lock);
+	spin_unlock(&cli->cl_loi_list_lock);
 
 	return osc_shrink_grant_to_target(cli, target_bytes);
 }
@@ -924,7 +924,7 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
 	int rc = 0;
 	struct ost_body *body;
 
-	client_obd_list_lock(&cli->cl_loi_list_lock);
+	spin_lock(&cli->cl_loi_list_lock);
 	/* Don't shrink if we are already above or below the desired limit
 	 * We don't want to shrink below a single RPC, as that will negatively
 	 * impact block allocation and long-term performance.
@@ -933,10 +933,10 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
 		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
 
 	if (target_bytes >= cli->cl_avail_grant) {
-		client_obd_list_unlock(&cli->cl_loi_list_lock);
+		spin_unlock(&cli->cl_loi_list_lock);
 		return 0;
 	}
-	client_obd_list_unlock(&cli->cl_loi_list_lock);
+	spin_unlock(&cli->cl_loi_list_lock);
 
 	body = kzalloc(sizeof(*body), GFP_NOFS);
 	if (!body)
@@ -944,10 +944,10 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
 
 	osc_announce_cached(cli, &body->oa, 0);
 
-	client_obd_list_lock(&cli->cl_loi_list_lock);
+	spin_lock(&cli->cl_loi_list_lock);
 	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
 	cli->cl_avail_grant = target_bytes;
-	client_obd_list_unlock(&cli->cl_loi_list_lock);
+	spin_unlock(&cli->cl_loi_list_lock);
 	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
 		body->oa.o_valid |= OBD_MD_FLFLAGS;
 		body->oa.o_flags = 0;
@@ -1035,7 +1035,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
 	 * race is tolerable here: if we're evicted, but imp_state already
 	 * left EVICTED state, then cl_dirty must be 0 already.
 	 */
-	client_obd_list_lock(&cli->cl_loi_list_lock);
+	spin_lock(&cli->cl_loi_list_lock);
 	if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
 		cli->cl_avail_grant = ocd->ocd_grant;
 	else
@@ -1053,7 +1053,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
 
 	/* determine the appropriate chunk size used by osc_extent. */
 	cli->cl_chunkbits = max_t(int, PAGE_SHIFT, ocd->ocd_blocksize);
-	client_obd_list_unlock(&cli->cl_loi_list_lock);
+	spin_unlock(&cli->cl_loi_list_lock);
 
 	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
 	       cli->cl_import->imp_obd->obd_name,
@@ -1082,7 +1082,7 @@ static void handle_short_read(int nob_read, u32 page_count,
 		if (pga[i]->count > nob_read) {
 			/* EOF inside this page */
 			ptr = kmap(pga[i]->pg) +
-			      (pga[i]->off & ~CFS_PAGE_MASK);
+			      (pga[i]->off & ~PAGE_MASK);
 			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
 			kunmap(pga[i]->pg);
 			page_count--;
@@ -1097,7 +1097,7 @@ static void handle_short_read(int nob_read, u32 page_count,
 
 	/* zero remaining pages */
 	while (page_count-- > 0) {
-		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
+		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
 		memset(ptr, 0, pga[i]->count);
 		kunmap(pga[i]->pg);
 		i++;
@@ -1144,7 +1144,8 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
 {
 	if (p1->flag != p2->flag) {
 		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
-				  OBD_BRW_SYNC | OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
+				  OBD_BRW_SYNC | OBD_BRW_ASYNC |
+				  OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
 
 		/* warn if we try to combine flags that we don't know to be
 		 * safe to combine
@@ -1188,32 +1189,29 @@ static u32 osc_checksum_bulk(int nob, u32 pg_count,
 		if (i == 0 && opc == OST_READ &&
 		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
 			unsigned char *ptr = kmap(pga[i]->pg);
-			int off = pga[i]->off & ~CFS_PAGE_MASK;
+			int off = pga[i]->off & ~PAGE_MASK;
 
 			memcpy(ptr + off, "bad1", min(4, nob));
 			kunmap(pga[i]->pg);
 		}
 		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
-					    pga[i]->off & ~CFS_PAGE_MASK,
+					    pga[i]->off & ~PAGE_MASK,
 					    count);
 		CDEBUG(D_PAGE, "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
 		       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
 		       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
 		       page_private(pga[i]->pg),
-		       (int)(pga[i]->off & ~CFS_PAGE_MASK));
+		       (int)(pga[i]->off & ~PAGE_MASK));
 		nob -= pga[i]->count;
 		pg_count--;
 		i++;
 	}
 
-	bufsize = 4;
+	bufsize = sizeof(cksum);
 	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
 
-	if (err)
-		cfs_crypto_hash_final(hdesc, NULL, NULL);
-
 	/* For sending we only compute the wrong checksum instead
 	 * of corrupting the data so it is still correct on a redo */
@@ -1312,7 +1310,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,
 	pg_prev = pga[0];
 	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
 		struct brw_page *pg = pga[i];
-		int poff = pg->off & ~CFS_PAGE_MASK;
+		int poff = pg->off & ~PAGE_MASK;
 
 		LASSERT(pg->count > 0);
 		/* make sure there is no gap in the middle of page array */
@@ -1658,6 +1656,7 @@ static int osc_brw_redo_request(struct ptlrpc_request *request,
 	aa->aa_resends++;
 	new_req->rq_interpret_reply = request->rq_interpret_reply;
 	new_req->rq_async_args = request->rq_async_args;
+	new_req->rq_commit_cb = request->rq_commit_cb;
 	/* cap resend delay to the current request timeout, this is similar to
 	 * what ptlrpc does (see after_reply())
 	 */
@@ -1737,7 +1736,6 @@ static int brw_interpret(const struct lu_env *env,
 	struct osc_brw_async_args *aa = data;
 	struct osc_extent *ext;
 	struct osc_extent *tmp;
-	struct cl_object *obj = NULL;
 	struct client_obd *cli = aa->aa_cli;
 
 	rc = osc_brw_fini_request(req, rc);
@@ -1766,24 +1764,17 @@ static int brw_interpret(const struct lu_env *env,
 			rc = -EIO;
 	}
 
-	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
-		if (!obj && rc == 0) {
-			obj = osc2cl(ext->oe_obj);
-			cl_object_get(obj);
-		}
-
-		list_del_init(&ext->oe_link);
-		osc_extent_finish(env, ext, 1, rc);
-	}
-	LASSERT(list_empty(&aa->aa_exts));
-	LASSERT(list_empty(&aa->aa_oaps));
-
-	if (obj) {
+	if (rc == 0) {
 		struct obdo *oa = aa->aa_oa;
 		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
 		unsigned long valid = 0;
+		struct cl_object *obj;
+		struct osc_async_page *last;
 
-		LASSERT(rc == 0);
+		last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
+		obj = osc2cl(last->oap_obj);
+
+		cl_object_attr_lock(obj);
 		if (oa->o_valid & OBD_MD_FLBLOCKS) {
 			attr->cat_blocks = oa->o_blocks;
 			valid |= CAT_BLOCKS;
 		}
@@ -1800,21 +1791,45 @@ static int brw_interpret(const struct lu_env *env,
 			attr->cat_ctime = oa->o_ctime;
 			valid |= CAT_CTIME;
 		}
-		if (valid != 0) {
-			cl_object_attr_lock(obj);
-			cl_object_attr_set(env, obj, attr, valid);
-			cl_object_attr_unlock(obj);
+
+		if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
+			struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
+			loff_t last_off = last->oap_count + last->oap_obj_off;
+
+			/* Change file size if this is an out of quota or
+			 * direct IO write and it extends the file size
+			 */
+			if (loi->loi_lvb.lvb_size < last_off) {
+				attr->cat_size = last_off;
+				valid |= CAT_SIZE;
+			}
+			/* Extend KMS if it's not a lockless write */
+			if (loi->loi_kms < last_off &&
+			    oap2osc_page(last)->ops_srvlock == 0) {
+				attr->cat_kms = last_off;
+				valid |= CAT_KMS;
+			}
 		}
-		cl_object_put(env, obj);
+
+		if (valid != 0)
+			cl_object_attr_set(env, obj, attr, valid);
+		cl_object_attr_unlock(obj);
 	}
 	kmem_cache_free(obdo_cachep, aa->aa_oa);
 
+	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
+		list_del_init(&ext->oe_link);
+		osc_extent_finish(env, ext, 1, rc);
+	}
+	LASSERT(list_empty(&aa->aa_exts));
+	LASSERT(list_empty(&aa->aa_oaps));
+
 	cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
 			  req->rq_bulk->bd_nob_transferred);
 	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
 	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
 
-	client_obd_list_lock(&cli->cl_loi_list_lock);
+	spin_lock(&cli->cl_loi_list_lock);
 	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
 	 * is called so we know whether to go to sync BRWs or wait for more
 	 * RPCs to complete
@@ -1824,12 +1839,31 @@ static int brw_interpret(const struct lu_env *env,
 	else
 		cli->cl_r_in_flight--;
 	osc_wake_cache_waiters(cli);
-	client_obd_list_unlock(&cli->cl_loi_list_lock);
+	spin_unlock(&cli->cl_loi_list_lock);
 
 	osc_io_unplug(env, cli, NULL);
 	return rc;
 }
 
+static void brw_commit(struct ptlrpc_request *req)
+{
+	spin_lock(&req->rq_lock);
+	/*
+	 * If osc_inc_unstable_pages (via osc_extent_finish) races with
+	 * this called via the rq_commit_cb, I need to ensure
+	 * osc_dec_unstable_pages is still called. Otherwise unstable
+	 * pages may be leaked.
+	 */
+	if (req->rq_unstable) {
+		spin_unlock(&req->rq_lock);
+		osc_dec_unstable_pages(req);
+		spin_lock(&req->rq_lock);
+	} else {
+		req->rq_committed = 1;
+	}
+	spin_unlock(&req->rq_lock);
+}
+
 /**
  * Build an RPC by the list of extent @ext_list. The caller must ensure
  * that the total pages in this list are NOT over max pages per RPC.
@@ -1920,7 +1954,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
 		pga[i] = &oap->oap_brw_page;
 		pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
 		CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
-		       pga[i]->pg, page_index(oap->oap_page), oap,
+		       pga[i]->pg, oap->oap_page->index, oap,
 		       pga[i]->flag);
 		i++;
 		cl_req_page_add(env, clerq, page);
@@ -1949,6 +1983,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
 		goto out;
 	}
 
+	req->rq_commit_cb = brw_commit;
 	req->rq_interpret_reply = brw_interpret;
 
 	if (mem_tight != 0)
@@ -1992,7 +2027,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
 	if (tmp)
 		tmp->oap_request = ptlrpc_request_addref(req);
 
-	client_obd_list_lock(&cli->cl_loi_list_lock);
+	spin_lock(&cli->cl_loi_list_lock);
 	starting_offset >>= PAGE_SHIFT;
 	if (cmd == OBD_BRW_READ) {
 		cli->cl_r_in_flight++;
@@ -2007,7 +2042,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
 		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
 				      starting_offset + 1);
 	}
-	client_obd_list_unlock(&cli->cl_loi_list_lock);
+	spin_unlock(&cli->cl_loi_list_lock);
 
 	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
 		  page_count, aa, cli->cl_r_in_flight,
@@ -2055,14 +2090,12 @@ static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
 	LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
 
 	lock_res_and_lock(lock);
-	spin_lock(&osc_ast_guard);
 
 	if (!lock->l_ast_data)
 		lock->l_ast_data = data;
 	if (lock->l_ast_data == data)
 		set = 1;
 
-	spin_unlock(&osc_ast_guard);
 	unlock_res_and_lock(lock);
 
 	return set;
@@ -2104,36 +2137,38 @@ static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
 	return rc;
 }
 
-static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
-			    obd_enqueue_update_f upcall, void *cookie,
-			    __u64 *flags, int agl, int rc)
+static int osc_enqueue_fini(struct ptlrpc_request *req,
+			    osc_enqueue_upcall_f upcall, void *cookie,
+			    struct lustre_handle *lockh, enum ldlm_mode mode,
+			    __u64 *flags, int agl, int errcode)
 {
-	int intent = *flags & LDLM_FL_HAS_INTENT;
-
-	if (intent) {
-		/* The request was created before ldlm_cli_enqueue call.
-		 */
-		if (rc == ELDLM_LOCK_ABORTED) {
-			struct ldlm_reply *rep;
+	bool intent = *flags & LDLM_FL_HAS_INTENT;
+	int rc;
 
-			rep = req_capsule_server_get(&req->rq_pill,
-						     &RMF_DLM_REP);
+	/* The request was created before ldlm_cli_enqueue call. */
+	if (intent && errcode == ELDLM_LOCK_ABORTED) {
+		struct ldlm_reply *rep;
 
-			rep->lock_policy_res1 =
-				ptlrpc_status_ntoh(rep->lock_policy_res1);
-			if (rep->lock_policy_res1)
-				rc = rep->lock_policy_res1;
-		}
-	}
+		rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
 
-	if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
-	    (rc == 0)) {
+		rep->lock_policy_res1 =
+			ptlrpc_status_ntoh(rep->lock_policy_res1);
+		if (rep->lock_policy_res1)
+			errcode = rep->lock_policy_res1;
+		if (!agl)
+			*flags |= LDLM_FL_LVB_READY;
+	} else if (errcode == ELDLM_OK) {
 		*flags |= LDLM_FL_LVB_READY;
-		CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
-		       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
 	}
 
 	/* Call the update callback. */
-	rc = (*upcall)(cookie, rc);
+	rc = (*upcall)(cookie, lockh, errcode);
+
+	/* release the reference taken in ldlm_cli_enqueue() */
+	if (errcode == ELDLM_LOCK_MATCHED)
+		errcode = ELDLM_OK;
+	if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
+		ldlm_lock_decref(lockh, mode);
+
 	return rc;
 }
 
@@ -2142,62 +2177,50 @@ static int osc_enqueue_interpret(const struct lu_env *env,
 			       struct osc_enqueue_args *aa, int rc)
 {
 	struct ldlm_lock *lock;
-	struct lustre_handle handle;
-	__u32 mode;
-	struct ost_lvb *lvb;
-	__u32 lvb_len;
-	__u64 *flags = aa->oa_flags;
-
-	/* Make a local copy of a lock handle and a mode, because aa->oa_*
-	 * might be freed anytime after lock upcall has been called.
-	 */
-	lustre_handle_copy(&handle, aa->oa_lockh);
-	mode = aa->oa_ei->ei_mode;
+	struct lustre_handle *lockh = &aa->oa_lockh;
+	enum ldlm_mode mode = aa->oa_mode;
+	struct ost_lvb *lvb = aa->oa_lvb;
+	__u32 lvb_len = sizeof(*lvb);
+	__u64 flags = 0;
+
 
 	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
 	 * be valid.
 	 */
-	lock = ldlm_handle2lock(&handle);
+	lock = ldlm_handle2lock(lockh);
+	LASSERTF(lock, "lockh %llx, req %p, aa %p - client evicted?\n",
+		 lockh->cookie, req, aa);
 
 	/* Take an additional reference so that a blocking AST that
 	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
 	 * to arrive after an upcall has been executed by
 	 * osc_enqueue_fini().
 	 */
-	ldlm_lock_addref(&handle, mode);
+	ldlm_lock_addref(lockh, mode);
+
+	/* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
+	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
 
 	/* Let CP AST to grant the lock first. */
 	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
 
-	if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
-		lvb = NULL;
-		lvb_len = 0;
-	} else {
-		lvb = aa->oa_lvb;
-		lvb_len = sizeof(*aa->oa_lvb);
+	if (aa->oa_agl) {
+		LASSERT(!aa->oa_lvb);
+		LASSERT(!aa->oa_flags);
+		aa->oa_flags = &flags;
 	}
 
 	/* Complete obtaining the lock procedure. */
-	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
-				   mode, flags, lvb, lvb_len, &handle, rc);
+	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
				   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
+				   lockh, rc);
 	/* Complete osc stuff. */
-	rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
-			      flags, aa->oa_agl, rc);
+	rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
+			      aa->oa_flags, aa->oa_agl, rc);
 
 	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
 
-	/* Release the lock for async request.
-	 */
-	if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
-		/*
-		 * Releases a reference taken by ldlm_cli_enqueue(), if it is
-		 * not already released by
-		 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
-		 */
-		ldlm_lock_decref(&handle, mode);
-
-	LASSERTF(lock, "lockh %p, req %p, aa %p - client evicted?\n",
-		 aa->oa_lockh, req, aa);
-	ldlm_lock_decref(&handle, mode);
+	ldlm_lock_decref(lockh, mode);
 	LDLM_LOCK_PUT(lock);
 	return rc;
 }
@@ -2209,29 +2232,29 @@ struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
  * other synchronous requests, however keeping some locks and trying to obtain
  * others may take a considerable amount of time in a case of ost failure; and
  * when other sync requests do not get released lock from a client, the client
- * is excluded from the cluster -- such scenarious make the life difficult, so
+ * is evicted from the cluster -- such scenaries make the life difficult, so
  * release locks just after they are obtained.
  */
 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
 		     __u64 *flags, ldlm_policy_data_t *policy,
 		     struct ost_lvb *lvb, int kms_valid,
-		     obd_enqueue_update_f upcall, void *cookie,
+		     osc_enqueue_upcall_f upcall, void *cookie,
 		     struct ldlm_enqueue_info *einfo,
-		     struct lustre_handle *lockh,
 		     struct ptlrpc_request_set *rqset, int async, int agl)
 {
 	struct obd_device *obd = exp->exp_obd;
+	struct lustre_handle lockh = { 0 };
 	struct ptlrpc_request *req = NULL;
 	int intent = *flags & LDLM_FL_HAS_INTENT;
-	__u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
+	__u64 match_flags = *flags;
 	enum ldlm_mode mode;
 	int rc;
 
 	/* Filesystem lock extents are extended to page boundaries so that
 	 * dealing with the page cache is a little smoother.
 	 */
-	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
-	policy->l_extent.end |= ~CFS_PAGE_MASK;
+	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
+	policy->l_extent.end |= ~PAGE_MASK;
 
 	/*
 	 * kms is not valid when either object is completely fresh (so that no
@@ -2258,65 +2281,51 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
 	mode = einfo->ei_mode;
 	if (einfo->ei_mode == LCK_PR)
 		mode |= LCK_PW;
-	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
-			       einfo->ei_type, policy, mode, lockh, 0);
+	if (agl == 0)
+		match_flags |= LDLM_FL_LVB_READY;
+	if (intent != 0)
+		match_flags |= LDLM_FL_BLOCK_GRANTED;
+	mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
+			       einfo->ei_type, policy, mode, &lockh, 0);
 	if (mode) {
-		struct ldlm_lock *matched = ldlm_handle2lock(lockh);
+		struct ldlm_lock *matched;
 
-		if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
-			/* For AGL, if enqueue RPC is sent but the lock is not
-			 * granted, then skip to process this strpe.
-			 * Return -ECANCELED to tell the caller.
+		if (*flags & LDLM_FL_TEST_LOCK)
+			return ELDLM_OK;
+
+		matched = ldlm_handle2lock(&lockh);
+		if (agl) {
+			/* AGL enqueues DLM locks speculatively. Therefore if
+			 * it already exists a DLM lock, it wll just inform the
+			 * caller to cancel the AGL process for this stripe.
 			 */
-			ldlm_lock_decref(lockh, mode);
+			ldlm_lock_decref(&lockh, mode);
 			LDLM_LOCK_PUT(matched);
 			return -ECANCELED;
-		}
-
-		if (osc_set_lock_data_with_check(matched, einfo)) {
+		} else if (osc_set_lock_data_with_check(matched, einfo)) {
 			*flags |= LDLM_FL_LVB_READY;
-			/* addref the lock only if not async requests and PW
-			 * lock is matched whereas we asked for PR.
-			 */
-			if (!rqset && einfo->ei_mode != mode)
-				ldlm_lock_addref(lockh, LCK_PR);
-			if (intent) {
-				/* I would like to be able to ASSERT here that
-				 * rss <= kms, but I can't, for reasons which
-				 * are explained in lov_enqueue()
-				 */
-			}
-
-			/* We already have a lock, and it's referenced.
-			 *
-			 * At this point, the cl_lock::cll_state is CLS_QUEUING,
-			 * AGL upcall may change it to CLS_HELD directly.
-			 */
-			(*upcall)(cookie, ELDLM_OK);
+			/* We already have a lock, and it's referenced. */
+			(*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
 
-			if (einfo->ei_mode != mode)
-				ldlm_lock_decref(lockh, LCK_PW);
-			else if (rqset)
-				/* For async requests, decref the lock. */
-				ldlm_lock_decref(lockh, einfo->ei_mode);
+			ldlm_lock_decref(&lockh, mode);
 			LDLM_LOCK_PUT(matched);
 			return ELDLM_OK;
+		} else {
+			ldlm_lock_decref(&lockh, mode);
+			LDLM_LOCK_PUT(matched);
 		}
-
-		ldlm_lock_decref(lockh, mode);
-		LDLM_LOCK_PUT(matched);
 	}
- no_match:
+no_match:
+	if (*flags & LDLM_FL_TEST_LOCK)
+		return -ENOLCK;
+
 	if (intent) {
-		LIST_HEAD(cancels);
 		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
 					   &RQF_LDLM_ENQUEUE_LVB);
 		if (!req)
 			return -ENOMEM;
 
-		rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
+		rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
 		if (rc) {
 			ptlrpc_request_free(req);
 			return rc;
 		}
@@ -2331,21 +2340,31 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
 	*flags &= ~LDLM_FL_BLOCK_GRANTED;
 
 	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
-			      sizeof(*lvb), LVB_T_OST, lockh, async);
-	if (rqset) {
+			      sizeof(*lvb), LVB_T_OST, &lockh, async);
+	if (async) {
 		if (!rc) {
 			struct osc_enqueue_args *aa;
 
-			CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
+			CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
 			aa = ptlrpc_req_async_args(req);
-			aa->oa_ei = einfo;
 			aa->oa_exp = exp;
-			aa->oa_flags = flags;
+			aa->oa_mode = einfo->ei_mode;
+			aa->oa_type = einfo->ei_type;
+			lustre_handle_copy(&aa->oa_lockh, &lockh);
 			aa->oa_upcall = upcall;
 			aa->oa_cookie = cookie;
-			aa->oa_lvb = lvb;
-			aa->oa_lockh = lockh;
 			aa->oa_agl = !!agl;
+			if (!agl) {
+				aa->oa_flags = flags;
+				aa->oa_lvb = lvb;
+			} else {
+				/* AGL is essentially to enqueue an DLM lock
+				 * in advance, so we don't care about the
+				 * result of AGL enqueue.
+				 */
+				aa->oa_lvb = NULL;
+				aa->oa_flags = NULL;
+			}
 
 			req->rq_interpret_reply =
 				(ptlrpc_interpterer_t)osc_enqueue_interpret;
@@ -2359,7 +2378,8 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
 		return rc;
 	}
 
-	rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
+	rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
+			      flags, agl, rc);
 
 	if (intent)
 		ptlrpc_req_finished(req);
@@ -2381,8 +2401,8 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
 
 	/* Filesystem lock extents are extended to page boundaries so that
 	 * dealing with the page cache is a little smoother */
-	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
-	policy->l_extent.end |= ~CFS_PAGE_MASK;
+	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
+	policy->l_extent.end |= ~PAGE_MASK;
 
 	/* Next, search for already existing extent locks that will cover us */
 	/* If we're trying to read, we also search for an existing PW lock.  The
@@ -2493,7 +2513,7 @@ static int osc_statfs_async(struct obd_export *exp,
 	}
 
 	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
-	CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
+	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
 	aa = ptlrpc_req_async_args(req);
 	aa->aa_oi = oinfo;
 
@@ -2756,7 +2776,8 @@ static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
 		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
 		memcpy(tmp, key, keylen);
 
-		req->rq_no_delay = req->rq_no_resend = 1;
+		req->rq_no_delay = 1;
+		req->rq_no_resend = 1;
 		ptlrpc_request_set_replen(req);
 		rc = ptlrpc_queue_wait(req);
 		if (rc)
@@ -2787,7 +2808,7 @@ out:
 			goto skip_locking;
 
 		policy.l_extent.start = fm_key->fiemap.fm_start &
-					CFS_PAGE_MASK;
+					PAGE_MASK;
 
 		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
 		    fm_key->fiemap.fm_start + PAGE_SIZE - 1)
@@ -2795,7 +2816,7 @@ out:
 		else
 			policy.l_extent.end = (fm_key->fiemap.fm_start +
 					       fm_key->fiemap.fm_length +
-					       PAGE_SIZE - 1) & CFS_PAGE_MASK;
+					       PAGE_SIZE - 1) & PAGE_MASK;
 
 		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
 		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
@@ -2896,7 +2917,7 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
 		LASSERT(!cli->cl_cache); /* only once */
 		cli->cl_cache = val;
-		atomic_inc(&cli->cl_cache->ccc_users);
+		cl_cache_incref(cli->cl_cache);
 		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
 
 		/* add this osc into entity list */
@@ -2913,7 +2934,7 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
 		int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
 		int target = *(int *)val;
 
-		nr = osc_lru_shrink(cli, min(nr, target));
+		nr = osc_lru_shrink(env, cli, min(nr, target), true);
 		*(int *)val -= nr;
 		return 0;
 	}
@@ -2992,12 +3013,12 @@ static int osc_reconnect(const struct lu_env *env,
 	if (data && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
 		long lost_grant;
 
-		client_obd_list_lock(&cli->cl_loi_list_lock);
+		spin_lock(&cli->cl_loi_list_lock);
 		data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
 				  2 * cli_brw_size(obd);
 		lost_grant = cli->cl_lost_grant;
 		cli->cl_lost_grant = 0;
-		client_obd_list_unlock(&cli->cl_loi_list_lock);
+		spin_unlock(&cli->cl_loi_list_lock);
 
 		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
 		       data->ocd_connect_flags,
@@ -3047,10 +3068,10 @@ static int osc_import_event(struct obd_device *obd,
 	switch (event) {
 	case IMP_EVENT_DISCON: {
 		cli = &obd->u.cli;
-		client_obd_list_lock(&cli->cl_loi_list_lock);
+		spin_lock(&cli->cl_loi_list_lock);
 		cli->cl_avail_grant = 0;
 		cli->cl_lost_grant = 0;
-		client_obd_list_unlock(&cli->cl_loi_list_lock);
+		spin_unlock(&cli->cl_loi_list_lock);
 		break;
 	}
 	case IMP_EVENT_INACTIVE: {
@@ -3073,8 +3094,9 @@ static int osc_import_event(struct obd_device *obd,
 			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
 			cl_env_put(env, &refcheck);
-		} else
+		} else {
 			rc = PTR_ERR(env);
+		}
 		break;
 	}
 	case IMP_EVENT_ACTIVE: {
@@ -3116,20 +3138,14 @@ static int osc_import_event(struct obd_device *obd,
  * \retval zero the lock can't be canceled
  * \retval other ok to cancel
  */
-static int osc_cancel_for_recovery(struct ldlm_lock *lock)
+static int osc_cancel_weight(struct ldlm_lock *lock)
 {
-	check_res_locked(lock->l_resource);
-
 	/*
-	 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
-	 *
-	 * XXX as a future improvement, we can also cancel unused write lock
-	 * if it doesn't have dirty data and active mmaps.
+	 * Cancel all unused and granted extent lock.
 	 */
 	if (lock->l_resource->lr_type == LDLM_EXTENT &&
-	    (lock->l_granted_mode == LCK_PR ||
-	     lock->l_granted_mode == LCK_CR) &&
-	    (osc_dlm_lock_pageref(lock) == 0))
+	    lock->l_granted_mode == lock->l_req_mode &&
+	    osc_ldlm_weigh_ast(lock) == 0)
 		return 1;
 
 	return 0;
@@ -3170,6 +3186,14 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 	}
 	cli->cl_writeback_work = handler;
 
+	handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
+	if (IS_ERR(handler)) {
+		rc = PTR_ERR(handler);
+		goto out_ptlrpcd_work;
+	}
+
+	cli->cl_lru_work = handler;
+
 	rc = osc_quota_setup(obd);
 	if (rc)
 		goto out_ptlrpcd_work;
@@ -3198,11 +3222,18 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 	}
 
 	INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
-	ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
+	ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
 	return rc;
 
 out_ptlrpcd_work:
-	ptlrpcd_destroy_work(handler);
+	if (cli->cl_writeback_work) {
+		ptlrpcd_destroy_work(cli->cl_writeback_work);
+		cli->cl_writeback_work = NULL;
+	}
+	if (cli->cl_lru_work) {
+		ptlrpcd_destroy_work(cli->cl_lru_work);
+		cli->cl_lru_work = NULL;
+	}
 out_client_setup:
 	client_obd_cleanup(obd);
 out_ptlrpcd:
@@ -3241,6 +3272,10 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
 		ptlrpcd_destroy_work(cli->cl_writeback_work);
 		cli->cl_writeback_work = NULL;
 	}
+	if (cli->cl_lru_work) {
+		ptlrpcd_destroy_work(cli->cl_lru_work);
+		cli->cl_lru_work = NULL;
+	}
 	obd_cleanup_client_import(obd);
 	ptlrpc_lprocfs_unregister_obd(obd);
 	lprocfs_obd_cleanup(obd);
@@ -3262,7 +3297,7 @@ static int osc_cleanup(struct obd_device *obd)
 		list_del_init(&cli->cl_lru_osc);
 		spin_unlock(&cli->cl_cache->ccc_lru_lock);
 		cli->cl_lru_left = NULL;
-		atomic_dec(&cli->cl_cache->ccc_users);
+		cl_cache_decref(cli->cl_cache);
 		cli->cl_cache = NULL;
 	}
 
@@ -3330,7 +3365,6 @@ static struct obd_ops osc_obd_ops = {
 };
 
 extern struct lu_kmem_descr osc_caches[];
-extern spinlock_t osc_ast_guard;
 extern struct lock_class_key osc_ast_guard_class;
 
 static int __init osc_init(void)
@@ -3357,9 +3391,6 @@ static int __init osc_init(void)
 	if (rc)
 		goto out_kmem;
 
-	spin_lock_init(&osc_ast_guard);
-	lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
-
 	/* This is obviously too much memory, only prevent overflow here */
 	if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
 		rc = -EINVAL;