/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for OSC layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
 */
#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"
/* fid_build_reg_res_name() */
#include "../include/lustre_fid.h"

#include "osc_cl_internal.h"
/*****************************************************************************
 *
 * Type conversions.
 *
 */
static const struct cl_lock_operations osc_lock_ops;
static const struct cl_lock_operations osc_lock_lockless_ops;
static void osc_lock_to_lockless(const struct lu_env *env,
				 struct osc_lock *ols, int force);
int osc_lock_is_lockless(const struct osc_lock *olck)
{
	return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
}
/**
 * Returns a weak pointer to the ldlm lock identified by a handle. Returned
 * pointer cannot be dereferenced, as lock is not protected from concurrent
 * reclaim. This function is a helper for osc_lock_invariant().
 */
static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
{
	struct ldlm_lock *lock;

	lock = ldlm_handle2lock(handle);
	if (lock)
		LDLM_LOCK_PUT(lock);
	return lock;
}
/**
 * Invariant that has to be true all of the time.
 */
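/*
 * Note on notation: ergo(a, b) is the libcfs logical-implication macro,
 * roughly (!(a) || (b)), and equi(a, b) is logical equivalence. The
 * invariant below is a conjunction of such implications relating the
 * osc_lock state to its attached DLM lock and handle.
 */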
static int osc_lock_invariant(struct osc_lock *ols)
{
	struct ldlm_lock *lock = osc_handle_ptr(&ols->ols_handle);
	struct ldlm_lock *olock = ols->ols_dlmlock;
	int handle_used = lustre_handle_is_used(&ols->ols_handle);

	if (ergo(osc_lock_is_lockless(ols),
		 ols->ols_locklessable && !ols->ols_dlmlock))
		return 1;

	/*
	 * If all the following "ergo"s are true, return 1, otherwise 0
	 */
	if (!ergo(olock, handle_used))
		return 0;

	if (!ergo(olock, olock->l_handle.h_cookie == ols->ols_handle.cookie))
		return 0;

	if (!ergo(handle_used,
		  ergo(lock && olock, lock == olock) &&
		  ergo(!lock, !olock)))
		return 0;

	/*
	 * Check that ->ols_handle and ->ols_dlmlock are consistent, but
	 * take into account that they are set at different times.
	 */
	if (!ergo(ols->ols_state == OLS_CANCELLED,
		  !olock && !handle_used))
		return 0;

	/*
	 * DLM lock is destroyed only after we have seen cancellation of
	 * the cl_lock.
	 */
	if (!ergo(olock && ols->ols_state < OLS_CANCELLED,
		  !ldlm_is_destroyed(olock)))
		return 0;

	if (!ergo(ols->ols_state == OLS_GRANTED,
		  olock && olock->l_req_mode == olock->l_granted_mode &&
		  ols->ols_hold))
		return 0;
	return 1;
}
/*****************************************************************************
 *
 * Lock operations.
 *
 */
static void osc_lock_fini(const struct lu_env *env,
			  struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(ols));
	LASSERT(!ols->ols_dlmlock);

	kmem_cache_free(osc_lock_kmem, ols);
}
static void osc_lock_build_policy(const struct lu_env *env,
				  const struct cl_lock *lock,
				  ldlm_policy_data_t *policy)
{
	const struct cl_lock_descr *d = &lock->cll_descr;

	osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
	policy->l_extent.gid = d->cld_gid;
}
static __u64 osc_enq2ldlm_flags(__u32 enqflags)
{
	__u64 result = 0;

	LASSERT((enqflags & ~CEF_MASK) == 0);

	if (enqflags & CEF_NONBLOCK)
		result |= LDLM_FL_BLOCK_NOWAIT;
	if (enqflags & CEF_ASYNC)
		result |= LDLM_FL_HAS_INTENT;
	if (enqflags & CEF_DISCARD_DATA)
		result |= LDLM_FL_AST_DISCARD_DATA;
	if (enqflags & CEF_PEEK)
		result |= LDLM_FL_TEST_LOCK;
	return result;
}
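/*
 * In short, the translation above maps client enqueue flags to LDLM
 * behavior: CEF_NONBLOCK asks for a non-blocking enqueue
 * (LDLM_FL_BLOCK_NOWAIT), CEF_ASYNC for an intent enqueue
 * (LDLM_FL_HAS_INTENT), CEF_DISCARD_DATA for dropping cached pages on
 * cancellation instead of writing them back (LDLM_FL_AST_DISCARD_DATA),
 * and CEF_PEEK for a match-only probe that never enqueues a new lock
 * (LDLM_FL_TEST_LOCK).
 */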
/**
 * Updates object attributes from a lock value block (lvb) received together
 * with the DLM lock reply from the server. Copy of osc_update_enqueue()
 * logic.
 *
 * This can be optimized to not update attributes when the lock is a result
 * of a local match.
 *
 * Called under lock and resource spin-locks.
 */
static void osc_lock_lvb_update(const struct lu_env *env,
				struct osc_object *osc,
				struct ldlm_lock *dlmlock,
				struct ost_lvb *lvb)
{
	struct cl_object *obj = osc2cl(osc);
	struct lov_oinfo *oinfo = osc->oo_oinfo;
	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
	unsigned int valid;

	valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE;
	if (!lvb)
		lvb = dlmlock->l_lvb_data;

	cl_lvb2attr(attr, lvb);

	cl_object_attr_lock(obj);
	if (dlmlock) {
		__u64 size;

		check_res_locked(dlmlock->l_resource);
		LASSERT(lvb == dlmlock->l_lvb_data);
		size = lvb->lvb_size;

		/* Extend KMS up to the end of this lock and no further
		 * A lock on [x,y] means a KMS of up to y + 1 bytes!
		 */
		if (size > dlmlock->l_policy_data.l_extent.end)
			size = dlmlock->l_policy_data.l_extent.end + 1;
		if (size >= oinfo->loi_kms) {
			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu, kms=%llu",
				   lvb->lvb_size, size);
			valid |= CAT_KMS;
			attr->cat_kms = size;
		} else {
			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu; leaving kms=%llu, end=%llu",
				   lvb->lvb_size, oinfo->loi_kms,
				   dlmlock->l_policy_data.l_extent.end);
		}
		ldlm_lock_allow_match_locked(dlmlock);
	}

	cl_object_attr_set(env, obj, attr, valid);
	cl_object_attr_unlock(obj);
}
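/*
 * A worked example of the KMS ("known minimum size") logic above, with
 * illustrative values: if loi_kms is 4096, the lock covers bytes
 * [0, 8191] and the reply lvb_size is 10000, then size is first clamped
 * to 8192 (lock end + 1) and, since 8192 >= 4096, kms is raised to
 * 8192. Bytes beyond the lock extent are not protected by this lock, so
 * kms never grows past it.
 */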
static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
			     struct lustre_handle *lockh, bool lvb_update)
{
	struct ldlm_lock *dlmlock;

	dlmlock = ldlm_handle2lock_long(lockh, 0);
	LASSERT(dlmlock);

	/* lock reference taken by ldlm_handle2lock_long() is
	 * owned by osc_lock and released in osc_lock_detach()
	 */
	lu_ref_add(&dlmlock->l_reference, "osc_lock", oscl);
	oscl->ols_has_ref = 1;

	LASSERT(!oscl->ols_dlmlock);
	oscl->ols_dlmlock = dlmlock;

	/* This may be a matched lock for glimpse request, do not hold
	 * lock reference in that case.
	 */
	if (!oscl->ols_glimpse) {
		/* hold a refc for non glimpse lock which will
		 * be released in osc_lock_cancel()
		 */
		lustre_handle_copy(&oscl->ols_handle, lockh);
		ldlm_lock_addref(lockh, oscl->ols_einfo.ei_mode);
		oscl->ols_hold = 1;
	}

	/* Lock must have been granted. */
	lock_res_and_lock(dlmlock);
	if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
		struct ldlm_extent *ext = &dlmlock->l_policy_data.l_extent;
		struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;

		/* extend the lock extent, otherwise it will have problem when
		 * we decide whether to grant a lockless lock.
		 */
		descr->cld_mode = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
		descr->cld_start = cl_index(descr->cld_obj, ext->start);
		descr->cld_end = cl_index(descr->cld_obj, ext->end);
		descr->cld_gid = ext->gid;

		/* no lvb update for matched lock */
		if (lvb_update) {
			LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
			osc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
					    dlmlock, NULL);
		}
		LINVRNT(osc_lock_invariant(oscl));
	}
	unlock_res_and_lock(dlmlock);

	LASSERT(oscl->ols_state != OLS_GRANTED);
	oscl->ols_state = OLS_GRANTED;
}
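/*
 * Reference ownership after osc_lock_granted(): the osc_lock always
 * owns one lu_ref/LDLM reference on the dlmlock (released in
 * osc_lock_detach()), and a non-glimpse lock additionally holds a mode
 * reference taken by ldlm_lock_addref() (released through
 * osc_cancel_base() once ols_hold is set).
 */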
/**
 * Lock upcall function that is executed either when a reply to an ENQUEUE
 * RPC is received from the server, or after osc_enqueue_base() matched a
 * local DLM lock.
 */
static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
			   int errcode)
{
	struct osc_lock *oscl = cookie;
	struct cl_lock_slice *slice = &oscl->ols_cl;
	struct lu_env *env;
	struct cl_env_nest nest;
	int rc;

	env = cl_env_nested_get(&nest);
	/* should never happen, similar to osc_ldlm_blocking_ast(). */
	LASSERT(!IS_ERR(env));

	rc = ldlm_error2errno(errcode);
	if (oscl->ols_state == OLS_ENQUEUED) {
		oscl->ols_state = OLS_UPCALL_RECEIVED;
	} else if (oscl->ols_state == OLS_CANCELLED) {
		rc = -EIO;
	} else {
		CERROR("Impossible state: %d\n", oscl->ols_state);
		LBUG();
	}

	if (rc == 0)
		osc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK);

	/* Error handling, some errors are tolerable. */
	if (oscl->ols_locklessable && rc == -EUSERS) {
		/* This is a tolerable error, turn this lock into
		 * lockless lock.
		 */
		osc_object_set_contended(cl2osc(slice->cls_obj));
		LASSERT(slice->cls_ops == &osc_lock_ops);

		/* Change this lock to ldlmlock-less lock. */
		osc_lock_to_lockless(env, oscl, 1);
		oscl->ols_state = OLS_GRANTED;
		rc = 0;
	} else if (oscl->ols_glimpse && rc == -ENAVAIL) {
		LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
		osc_lock_lvb_update(env, cl2osc(slice->cls_obj),
				    NULL, &oscl->ols_lvb);
		/* Hide the error. */
		rc = 0;
	}

	if (oscl->ols_owner)
		cl_sync_io_note(env, oscl->ols_owner, rc);
	cl_env_nested_put(&nest, env);

	return rc;
}
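/*
 * Summary of the state transitions handled by the upcall above:
 * OLS_ENQUEUED -> OLS_UPCALL_RECEIVED -> OLS_GRANTED on success. Two
 * failures are tolerated: -EUSERS on a locklessable lock downgrades it
 * to a lockless lock instead of failing the IO, and -ENAVAIL on a
 * glimpse request is hidden because the returned lvb data is still
 * usable.
 */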
static int osc_lock_upcall_agl(void *cookie, struct lustre_handle *lockh,
			       int errcode)
{
	struct osc_object *osc = cookie;
	struct ldlm_lock *dlmlock;
	struct lu_env *env;
	struct cl_env_nest nest;

	env = cl_env_nested_get(&nest);
	LASSERT(!IS_ERR(env));

	if (errcode == ELDLM_LOCK_MATCHED) {
		errcode = ELDLM_OK;
		goto out;
	}

	if (errcode != ELDLM_OK)
		goto out;

	dlmlock = ldlm_handle2lock(lockh);
	LASSERT(dlmlock);

	lock_res_and_lock(dlmlock);
	LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);

	/* there is no osc_lock associated with AGL lock */
	osc_lock_lvb_update(env, osc, dlmlock, NULL);

	unlock_res_and_lock(dlmlock);
	LDLM_LOCK_PUT(dlmlock);

out:
	cl_object_put(env, osc2cl(osc));
	cl_env_nested_put(&nest, env);
	return ldlm_error2errno(errcode);
}
static int osc_lock_flush(struct osc_object *obj, pgoff_t start, pgoff_t end,
			  enum cl_lock_mode mode, int discard)
{
	struct lu_env *env;
	struct cl_env_nest nest;
	int rc = 0;
	int rc2 = 0;

	env = cl_env_nested_get(&nest);
	if (IS_ERR(env))
		return PTR_ERR(env);

	if (mode == CLM_WRITE) {
		rc = osc_cache_writeback_range(env, obj, start, end, 1,
					       discard);
		CDEBUG(D_CACHE, "object %p: [%lu -> %lu] %d pages were %s.\n",
		       obj, start, end, rc,
		       discard ? "discarded" : "written back");
		if (rc > 0)
			rc = 0;
	}

	rc2 = osc_lock_discard_pages(env, obj, start, end, mode);
	if (rc == 0 && rc2 < 0)
		rc = rc2;

	cl_env_nested_put(&nest, env);
	return rc;
}
/**
 * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock
 * and ldlm_lock caches.
 */
static int osc_dlm_blocking_ast0(const struct lu_env *env,
				 struct ldlm_lock *dlmlock,
				 void *data, int flag)
{
	struct cl_object *obj = NULL;
	int result = 0;
	int discard;
	enum cl_lock_mode mode = CLM_READ;

	LASSERT(flag == LDLM_CB_CANCELING);

	lock_res_and_lock(dlmlock);
	if (dlmlock->l_granted_mode != dlmlock->l_req_mode) {
		dlmlock->l_ast_data = NULL;
		unlock_res_and_lock(dlmlock);
		return 0;
	}

	discard = ldlm_is_discard_data(dlmlock);
	if (dlmlock->l_granted_mode & (LCK_PW | LCK_GROUP))
		mode = CLM_WRITE;

	if (dlmlock->l_ast_data) {
		obj = osc2cl(dlmlock->l_ast_data);
		dlmlock->l_ast_data = NULL;

		cl_object_get(obj);
	}
	unlock_res_and_lock(dlmlock);

	/* if l_ast_data is NULL, the dlmlock was enqueued by AGL or
	 * the object has been destroyed.
	 */
	if (obj) {
		struct ldlm_extent *extent = &dlmlock->l_policy_data.l_extent;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		__u64 old_kms;

		/* Destroy pages covered by the extent of the DLM lock */
		result = osc_lock_flush(cl2osc(obj),
					cl_index(obj, extent->start),
					cl_index(obj, extent->end),
					mode, discard);

		/* losing a lock, update kms */
		lock_res_and_lock(dlmlock);
		cl_object_attr_lock(obj);
		/* Must get the value under the lock to avoid race. */
		old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
		/* Update the kms. Need to loop all granted locks.
		 * Not a problem for the client
		 */
		attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);

		cl_object_attr_set(env, obj, attr, CAT_KMS);
		cl_object_attr_unlock(obj);
		unlock_res_and_lock(dlmlock);

		cl_object_put(env, obj);
	}
	return result;
}
/**
 * Blocking ast invoked by ldlm when dlm lock is either blocking progress of
 * some other lock, or is canceled. This function is installed as a
 * ldlm_lock::l_blocking_ast() for client extent locks.
 *
 * Control flow is tricky, because ldlm uses the same call-back
 * (ldlm_lock::l_blocking_ast()) for both blocking and cancellation ast's.
 *
 * \param dlmlock lock for which ast occurred.
 *
 * \param new description of a conflicting lock in case of blocking ast.
 *
 * \param data value of dlmlock->l_ast_data
 *
 * \param flag LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
 *	       cancellation and blocking ast's.
 *
 * Possible use cases:
 *
 * - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING) to cancel
 *   lock due to lock lru pressure, or explicit user request to purge
 *   locks.
 *
 * - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_BLOCKING) to notify
 *   us that dlmlock conflicts with another lock that some client is
 *   enqueuing. Lock is canceled.
 *
 *   - cl_lock_cancel() is called. osc_lock_cancel() calls
 *     ldlm_cli_cancel() that calls
 *
 *     dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
 *
 *     recursively entering osc_ldlm_blocking_ast().
 *
 * - client cancels lock voluntarily (e.g., as a part of early cancellation):
 *
 *   cl_lock_cancel()->
 *     osc_lock_cancel()->
 *       ldlm_cli_cancel()->
 *         dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
 */
static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
				 struct ldlm_lock_desc *new, void *data,
				 int flag)
{
	int result = 0;

	switch (flag) {
	case LDLM_CB_BLOCKING: {
		struct lustre_handle lockh;

		ldlm_lock2handle(dlmlock, &lockh);
		result = ldlm_cli_cancel(&lockh, LCF_ASYNC);
		if (result == -ENODATA)
			result = 0;
		break;
	}
	case LDLM_CB_CANCELING: {
		struct lu_env *env;
		struct cl_env_nest nest;

		/*
		 * This can be called in the context of outer IO, e.g.,
		 *
		 *     osc_enqueue_base()->...
		 *       ->ldlm_prep_elc_req()->...
		 *         ->ldlm_cancel_callback()->...
		 *           ->osc_ldlm_blocking_ast()
		 *
		 * new environment has to be created to not corrupt outer
		 * context.
		 */
		env = cl_env_nested_get(&nest);
		if (IS_ERR(env)) {
			result = PTR_ERR(env);
			break;
		}

		result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
		cl_env_nested_put(&nest, env);
		break;
	}
	default:
		LBUG();
	}
	return result;
}
static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
{
	struct ptlrpc_request *req = data;
	struct cl_env_nest nest;
	struct lu_env *env;
	struct ost_lvb *lvb;
	struct req_capsule *cap;
	int result;

	LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		struct cl_object *obj = NULL;

		lock_res_and_lock(dlmlock);
		if (dlmlock->l_ast_data) {
			obj = osc2cl(dlmlock->l_ast_data);
			cl_object_get(obj);
		}
		unlock_res_and_lock(dlmlock);

		if (obj) {
			/* Do not grab the mutex of cl_lock for glimpse.
			 * See LU-1274 for details.
			 * BTW, it's okay for cl_lock to be cancelled during
			 * this period because server can handle this race.
			 * See ldlm_server_glimpse_ast() for details.
			 * cl_lock_mutex_get(env, lock);
			 */
			cap = &req->rq_pill;
			req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
			req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
					     sizeof(*lvb));
			result = req_capsule_server_pack(cap);
			if (result == 0) {
				lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
				result = cl_object_glimpse(env, obj, lvb);
			}
			if (!exp_connect_lvb_type(req->rq_export))
				req_capsule_shrink(&req->rq_pill,
						   &RMF_DLM_LVB,
						   sizeof(struct ost_lvb_v1),
						   RCL_SERVER);
			cl_object_put(env, obj);
		} else {
			/*
			 * These errors are normal races, so we don't want to
			 * fill the console with messages by calling
			 * ptlrpc_error()
			 */
			lustre_pack_reply(req, 1, NULL, NULL);
			result = -ELDLM_NO_LOCK_DATA;
		}
		cl_env_nested_put(&nest, env);
	} else {
		result = PTR_ERR(env);
	}
	req->rq_status = result;
	return result;
}
static int weigh_cb(const struct lu_env *env, struct cl_io *io,
		    struct osc_page *ops, void *cbdata)
{
	struct cl_page *page = ops->ops_cl.cpl_page;

	if (cl_page_is_vmlocked(env, page) ||
	    PageDirty(page->cp_vmpage) || PageWriteback(page->cp_vmpage)) {
		(*(unsigned long *)cbdata)++;
		return CLP_GANG_ABORT;
	}

	return CLP_GANG_OKAY;
}
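/*
 * weigh_cb() stops the gang lookup at the first vmlocked, dirty or
 * writeback page it finds. osc_lock_weight() below only needs to know
 * whether any such busy page exists under the lock, so aborting early
 * keeps the scan cheap.
 */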
static unsigned long osc_lock_weight(const struct lu_env *env,
				     struct osc_object *oscobj,
				     struct ldlm_extent *extent)
{
	struct cl_io *io = &osc_env_info(env)->oti_io;
	struct cl_object *obj = cl_object_top(&oscobj->oo_cl);
	unsigned long npages = 0;
	int result;

	io->ci_obj = obj;
	io->ci_ignore_layout = 1;
	result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
	if (result != 0)
		return result;

	do {
		result = osc_page_gang_lookup(env, io, oscobj,
					      cl_index(obj, extent->start),
					      cl_index(obj, extent->end),
					      weigh_cb, (void *)&npages);
		if (result == CLP_GANG_ABORT)
			break;
		if (result == CLP_GANG_RESCHED)
			cond_resched();
	} while (result != CLP_GANG_OKAY);
	cl_io_fini(env, io);

	return npages;
}
/**
 * Get the weight of dlm lock for early cancellation.
 */
unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
{
	struct cl_env_nest nest;
	struct lu_env *env;
	struct osc_object *obj;
	struct osc_lock *oscl;
	unsigned long weight;
	bool found = false;

	/*
	 * osc_ldlm_weigh_ast has a complex context since it might be called
	 * because of lock canceling, or from user's input. We have to make
	 * a new environment for it. Probably it is implementation safe to use
	 * the upper context because cl_lock_put doesn't modify environment
	 * variables. But just in case ...
	 */
	env = cl_env_nested_get(&nest);
	if (IS_ERR(env))
		/* Mostly because lack of memory, do not eliminate this lock */
		return 1;

	LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT);
	obj = dlmlock->l_ast_data;
	if (!obj) {
		weight = 1;
		goto out;
	}

	spin_lock(&obj->oo_ol_spin);
	list_for_each_entry(oscl, &obj->oo_ol_list, ols_nextlock_oscobj) {
		if (oscl->ols_dlmlock && oscl->ols_dlmlock != dlmlock)
			continue;
		found = true;
		break;
	}
	spin_unlock(&obj->oo_ol_spin);
	if (found) {
		/*
		 * If the lock is being used by an IO, definitely not cancel it.
		 */
		weight = 1;
		goto out;
	}

	weight = osc_lock_weight(env, obj, &dlmlock->l_policy_data.l_extent);

out:
	cl_env_nested_put(&nest, env);
	return weight;
}
static void osc_lock_build_einfo(const struct lu_env *env,
				 const struct cl_lock *lock,
				 struct osc_object *osc,
				 struct ldlm_enqueue_info *einfo)
{
	einfo->ei_type = LDLM_EXTENT;
	einfo->ei_mode = osc_cl_lock2ldlm(lock->cll_descr.cld_mode);
	einfo->ei_cb_bl = osc_ldlm_blocking_ast;
	einfo->ei_cb_cp = ldlm_completion_ast;
	einfo->ei_cb_gl = osc_ldlm_glimpse_ast;
	einfo->ei_cbdata = osc; /* value to be put into ->l_ast_data */
}
/**
 * Determine if the lock should be converted into a lockless lock.
 *
 * Steps to check:
 * - if the lock has an explicit requirement for a non-lockless lock;
 * - if the IO lock request type, ci_lockreq, permits it;
 * - send the enqueue rpc to ost to make the further decision;
 * - special treatment for truncate lockless lock.
 *
 * Additional policy can be implemented here, e.g., never do lockless-io
 * for large extents.
 */
static void osc_lock_to_lockless(const struct lu_env *env,
				 struct osc_lock *ols, int force)
{
	struct cl_lock_slice *slice = &ols->ols_cl;

	LASSERT(ols->ols_state == OLS_NEW ||
		ols->ols_state == OLS_UPCALL_RECEIVED);

	if (force) {
		ols->ols_locklessable = 1;
		slice->cls_ops = &osc_lock_lockless_ops;
	} else {
		struct osc_io *oio = osc_env_io(env);
		struct cl_io *io = oio->oi_cl.cis_io;
		struct cl_object *obj = slice->cls_obj;
		struct osc_object *oob = cl2osc(obj);
		const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
		struct obd_connect_data *ocd;

		LASSERT(io->ci_lockreq == CILR_MANDATORY ||
			io->ci_lockreq == CILR_MAYBE ||
			io->ci_lockreq == CILR_NEVER);

		ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
		ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
					(io->ci_lockreq == CILR_MAYBE) &&
					(ocd->ocd_connect_flags &
					 OBD_CONNECT_SRVLOCK);
		if (io->ci_lockreq == CILR_NEVER ||
		    /* lockless IO */
		    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
		    /* lockless truncate */
		    (cl_io_is_trunc(io) &&
		     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
		     osd->od_lockless_truncate)) {
			ols->ols_locklessable = 1;
			slice->cls_ops = &osc_lock_lockless_ops;
		}
	}
	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
}
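/*
 * Net effect of the policy above: a lock goes lockless either by force
 * (e.g. after the server reported contention), or when the IO declares
 * it never needs a DLM lock (CILR_NEVER), or for CILR_MAYBE IO on a
 * contended object when the server supports server-side locking
 * (OBD_CONNECT_SRVLOCK), or for truncate when OBD_CONNECT_TRUNCLOCK and
 * the device's lockless-truncate setting allow it.
 */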
static bool osc_lock_compatible(const struct osc_lock *qing,
				const struct osc_lock *qed)
{
	struct cl_lock_descr *qed_descr = &qed->ols_cl.cls_lock->cll_descr;
	struct cl_lock_descr *qing_descr = &qing->ols_cl.cls_lock->cll_descr;

	if (qed->ols_glimpse)
		return true;

	if (qing_descr->cld_mode == CLM_READ && qed_descr->cld_mode == CLM_READ)
		return true;

	if (qed->ols_state < OLS_GRANTED)
		return true;

	if (qed_descr->cld_mode >= qing_descr->cld_mode &&
	    qed_descr->cld_start <= qing_descr->cld_start &&
	    qed_descr->cld_end >= qing_descr->cld_end)
		return true;

	return false;
}
static void osc_lock_wake_waiters(const struct lu_env *env,
				  struct osc_object *osc,
				  struct osc_lock *oscl)
{
	spin_lock(&osc->oo_ol_spin);
	list_del_init(&oscl->ols_nextlock_oscobj);
	spin_unlock(&osc->oo_ol_spin);

	spin_lock(&oscl->ols_lock);
	while (!list_empty(&oscl->ols_waiting_list)) {
		struct osc_lock *scan;

		scan = list_entry(oscl->ols_waiting_list.next, struct osc_lock,
				  ols_wait_entry);
		list_del_init(&scan->ols_wait_entry);

		cl_sync_io_note(env, scan->ols_owner, 0);
	}
	spin_unlock(&oscl->ols_lock);
}
static void osc_lock_enqueue_wait(const struct lu_env *env,
				  struct osc_object *obj,
				  struct osc_lock *oscl)
{
	struct osc_lock *tmp_oscl;
	struct cl_lock_descr *need = &oscl->ols_cl.cls_lock->cll_descr;
	struct cl_sync_io *waiter = &osc_env_info(env)->oti_anchor;

	spin_lock(&obj->oo_ol_spin);
	list_add_tail(&oscl->ols_nextlock_oscobj, &obj->oo_ol_list);

restart:
	list_for_each_entry(tmp_oscl, &obj->oo_ol_list,
			    ols_nextlock_oscobj) {
		struct cl_lock_descr *descr;

		if (tmp_oscl == oscl)
			break;

		descr = &tmp_oscl->ols_cl.cls_lock->cll_descr;
		if (descr->cld_start > need->cld_end ||
		    descr->cld_end < need->cld_start)
			continue;

		/* We're not supposed to give up group lock */
		if (descr->cld_mode == CLM_GROUP)
			break;

		if (!osc_lock_is_lockless(oscl) &&
		    osc_lock_compatible(oscl, tmp_oscl))
			continue;

		/* wait for conflicting lock to be canceled */
		cl_sync_io_init(waiter, 1, cl_sync_io_end);
		oscl->ols_owner = waiter;

		spin_lock(&tmp_oscl->ols_lock);
		/* add oscl into tmp's ols_waiting list */
		list_add_tail(&oscl->ols_wait_entry,
			      &tmp_oscl->ols_waiting_list);
		spin_unlock(&tmp_oscl->ols_lock);

		spin_unlock(&obj->oo_ol_spin);
		(void)cl_sync_io_wait(env, waiter, 0);

		spin_lock(&obj->oo_ol_spin);
		oscl->ols_owner = NULL;
		goto restart;
	}
	spin_unlock(&obj->oo_ol_spin);
}
/**
 * Implementation of cl_lock_operations::clo_enqueue() method for osc
 * layer. This initiates ldlm enqueue:
 *
 * - cancels conflicting locks early (osc_lock_enqueue_wait());
 *
 * - calls osc_enqueue_base() to do actual enqueue.
 *
 * osc_enqueue_base() is supplied with an upcall function that is executed
 * when lock is received either after a local cached ldlm lock is matched, or
 * when a reply from the server is received.
 *
 * This function does not wait for the network communication to complete.
 */
static int osc_lock_enqueue(const struct lu_env *env,
			    const struct cl_lock_slice *slice,
			    struct cl_io *unused, struct cl_sync_io *anchor)
{
	struct osc_thread_info *info = osc_env_info(env);
	struct osc_io *oio = osc_env_io(env);
	struct osc_object *osc = cl2osc(slice->cls_obj);
	struct osc_lock *oscl = cl2osc_lock(slice);
	struct cl_lock *lock = slice->cls_lock;
	struct ldlm_res_id *resname = &info->oti_resname;
	ldlm_policy_data_t *policy = &info->oti_policy;
	osc_enqueue_upcall_f upcall = osc_lock_upcall;
	void *cookie = oscl;
	bool async = false;
	int result;

	LASSERTF(ergo(oscl->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
		 "lock = %p, ols = %p\n", lock, oscl);

	if (oscl->ols_state == OLS_GRANTED)
		return 0;

	if (oscl->ols_flags & LDLM_FL_TEST_LOCK)
		goto enqueue_base;

	if (oscl->ols_glimpse) {
		LASSERT(equi(oscl->ols_agl, !anchor));
		async = true;
		goto enqueue_base;
	}

	osc_lock_enqueue_wait(env, osc, oscl);

	/* we can grant lockless lock right after all conflicting locks
	 * are canceled.
	 */
	if (osc_lock_is_lockless(oscl)) {
		oscl->ols_state = OLS_GRANTED;
		oio->oi_lockless = 1;
		return 0;
	}

enqueue_base:
	oscl->ols_state = OLS_ENQUEUED;
	if (anchor) {
		atomic_inc(&anchor->csi_sync_nr);
		oscl->ols_owner = anchor;
	}

	/**
	 * DLM lock's ast data must be osc_object;
	 * if glimpse or AGL lock, async of osc_enqueue_base() must be true,
	 * DLM's enqueue callback set to osc_lock_upcall() with cookie as
	 * osc_lock.
	 */
	ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname);
	osc_lock_build_einfo(env, lock, osc, &oscl->ols_einfo);
	osc_lock_build_policy(env, lock, policy);
	if (oscl->ols_agl) {
		oscl->ols_einfo.ei_cbdata = NULL;
		/* hold a reference for callback */
		cl_object_get(osc2cl(osc));
		upcall = osc_lock_upcall_agl;
		cookie = osc;
	}
	result = osc_enqueue_base(osc_export(osc), resname, &oscl->ols_flags,
				  policy, &oscl->ols_lvb,
				  osc->oo_oinfo->loi_kms_valid,
				  upcall, cookie,
				  &oscl->ols_einfo, PTLRPCD_SET, async,
				  oscl->ols_agl);
	if (result != 0) {
		oscl->ols_state = OLS_CANCELLED;
		osc_lock_wake_waiters(env, osc, oscl);

		/* hide error for AGL lock. */
		if (oscl->ols_agl) {
			cl_object_put(env, osc2cl(osc));
			result = 0;
		}
		if (anchor)
			cl_sync_io_note(env, anchor, result);
	} else {
		if (osc_lock_is_lockless(oscl)) {
			oio->oi_lockless = 1;
		} else if (!async) {
			LASSERT(oscl->ols_state == OLS_GRANTED);
			LASSERT(oscl->ols_hold);
			LASSERT(oscl->ols_dlmlock);
		}
	}
	return result;
}
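/*
 * Usage note for the enqueue path above: glimpse and AGL requests are
 * always asynchronous and complete through the upcall, while a regular
 * enqueue with an anchor lets the caller block in cl_sync_io_wait()
 * until osc_lock_upcall() signals completion via cl_sync_io_note().
 */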
/**
 * Breaks a link between osc_lock and dlm_lock.
 */
static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
{
	struct ldlm_lock *dlmlock;

	dlmlock = olck->ols_dlmlock;
	if (!dlmlock)
		return;

	if (olck->ols_hold) {
		olck->ols_hold = 0;
		osc_cancel_base(&olck->ols_handle, olck->ols_einfo.ei_mode);
		olck->ols_handle.cookie = 0ULL;
	}

	olck->ols_dlmlock = NULL;

	/* release a reference taken in osc_lock_upcall(). */
	LASSERT(olck->ols_has_ref);
	lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
	LDLM_LOCK_RELEASE(dlmlock);
	olck->ols_has_ref = 0;
}
/**
 * Implements cl_lock_operations::clo_cancel() method for osc layer. This is
 * called (as part of cl_lock_cancel()) when a lock is canceled either
 * voluntarily (LRU pressure, early cancellation, umount, etc.) or due to a
 * conflict with some other lock somewhere in the cluster. This function does
 * the following:
 *
 * - invalidates all pages protected by this lock (after sending dirty
 *   ones to the server, as necessary);
 *
 * - decref's underlying ldlm lock;
 *
 * - cancels ldlm lock (ldlm_cli_cancel()).
 */
static void osc_lock_cancel(const struct lu_env *env,
			    const struct cl_lock_slice *slice)
{
	struct osc_object *obj = cl2osc(slice->cls_obj);
	struct osc_lock *oscl = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(oscl));

	osc_lock_detach(env, oscl);
	oscl->ols_state = OLS_CANCELLED;
	oscl->ols_flags &= ~LDLM_FL_LVB_READY;

	osc_lock_wake_waiters(env, obj, oscl);
}
static int osc_lock_print(const struct lu_env *env, void *cookie,
			  lu_printer_t p, const struct cl_lock_slice *slice)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	(*p)(env, cookie, "%p %#16llx %#llx %d %p ",
	     lock->ols_dlmlock, lock->ols_flags, lock->ols_handle.cookie,
	     lock->ols_state, lock->ols_owner);
	osc_lvb_print(env, cookie, p, &lock->ols_lvb);
	return 0;
}
static const struct cl_lock_operations osc_lock_ops = {
	.clo_fini = osc_lock_fini,
	.clo_enqueue = osc_lock_enqueue,
	.clo_cancel = osc_lock_cancel,
	.clo_print = osc_lock_print,
};
static void osc_lock_lockless_cancel(const struct lu_env *env,
				     const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	struct osc_object *osc = cl2osc(slice->cls_obj);
	struct cl_lock_descr *descr = &slice->cls_lock->cll_descr;
	int result;

	LASSERT(!ols->ols_dlmlock);
	result = osc_lock_flush(osc, descr->cld_start, descr->cld_end,
				descr->cld_mode, 0);
	if (result)
		CERROR("Pages for lockless lock %p were not purged(%d)\n",
		       ols, result);

	osc_lock_wake_waiters(env, osc, ols);
}
static const struct cl_lock_operations osc_lock_lockless_ops = {
	.clo_fini = osc_lock_fini,
	.clo_enqueue = osc_lock_enqueue,
	.clo_cancel = osc_lock_lockless_cancel,
	.clo_print = osc_lock_print
};
static void osc_lock_set_writer(const struct lu_env *env,
				const struct cl_io *io,
				struct cl_object *obj, struct osc_lock *oscl)
{
	struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;
	pgoff_t io_start;
	pgoff_t io_end;

	if (!cl_object_same(io->ci_obj, obj))
		return;

	if (likely(io->ci_type == CIT_WRITE)) {
		io_start = cl_index(obj, io->u.ci_rw.crw_pos);
		io_end = cl_index(obj, io->u.ci_rw.crw_pos +
				  io->u.ci_rw.crw_count - 1);
		if (cl_io_is_append(io)) {
			io_start = 0;
			io_end = CL_PAGE_EOF;
		}
	} else {
		LASSERT(cl_io_is_mkwrite(io));
		io_start = io->u.ci_fault.ft_index;
		io_end = io->u.ci_fault.ft_index;
	}

	if (descr->cld_mode >= CLM_WRITE &&
	    descr->cld_start <= io_start && descr->cld_end >= io_end) {
		struct osc_io *oio = osc_env_io(env);

		/* There must be only one lock to match the write region */
		LASSERT(!oio->oi_write_osclock);
		oio->oi_write_osclock = oscl;
	}
}
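/*
 * Example for osc_lock_set_writer(): an append write cannot know its
 * final extent in advance, so the IO region is widened to [0, EOF]
 * (CL_PAGE_EOF); only a write lock covering the whole file can then be
 * recorded as oi_write_osclock.
 */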
int osc_lock_init(const struct lu_env *env,
		  struct cl_object *obj, struct cl_lock *lock,
		  const struct cl_io *io)
{
	struct osc_lock *oscl;
	__u32 enqflags = lock->cll_descr.cld_enq_flags;

	oscl = kmem_cache_zalloc(osc_lock_kmem, GFP_NOFS);
	if (!oscl)
		return -ENOMEM;

	oscl->ols_state = OLS_NEW;
	spin_lock_init(&oscl->ols_lock);
	INIT_LIST_HEAD(&oscl->ols_waiting_list);
	INIT_LIST_HEAD(&oscl->ols_wait_entry);
	INIT_LIST_HEAD(&oscl->ols_nextlock_oscobj);

	oscl->ols_flags = osc_enq2ldlm_flags(enqflags);
	oscl->ols_agl = !!(enqflags & CEF_AGL);
	if (oscl->ols_agl)
		oscl->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
	if (oscl->ols_flags & LDLM_FL_HAS_INTENT) {
		oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
		oscl->ols_glimpse = 1;
	}

	cl_lock_slice_add(lock, &oscl->ols_cl, obj, &osc_lock_ops);

	if (!(enqflags & CEF_MUST))
		/* try to convert this lock to a lockless lock */
		osc_lock_to_lockless(env, oscl, (enqflags & CEF_NEVER));
	if (oscl->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
		oscl->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;

	if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io))
		osc_lock_set_writer(env, io, obj, oscl);

	LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %llx\n",
			  lock, oscl, oscl->ols_flags);

	return 0;
}
/**
 * Finds an existing DLM lock covering the given page index, optionally
 * matching locks with a pending cancellation and/or locks being canceled.
 */
struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
				       struct osc_object *obj, pgoff_t index,
				       int pending, int canceling)
{
	struct osc_thread_info *info = osc_env_info(env);
	struct ldlm_res_id *resname = &info->oti_resname;
	ldlm_policy_data_t *policy = &info->oti_policy;
	struct lustre_handle lockh;
	struct ldlm_lock *lock = NULL;
	enum ldlm_mode mode;
	__u64 flags;

	ostid_build_res_name(&obj->oo_oinfo->loi_oi, resname);
	osc_index2policy(policy, osc2cl(obj), index, index);
	policy->l_extent.gid = LDLM_GID_ANY;

	flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
	if (pending)
		flags |= LDLM_FL_CBPENDING;
	/*
	 * It is fine to match any group lock since there could be only one
	 * with a unique gid, and it conflicts with all other lock modes too.
	 */
again:
	mode = ldlm_lock_match(osc_export(obj)->exp_obd->obd_namespace,
			       flags, resname, LDLM_EXTENT, policy,
			       LCK_PR | LCK_PW | LCK_GROUP, &lockh, canceling);
	if (mode != 0) {
		lock = ldlm_handle2lock(&lockh);
		/* RACE: the lock is cancelled so let's try again */
		if (unlikely(!lock))
			goto again;
	}
	return lock;
}
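/*
 * Typical use (a sketch): page-level code calls osc_dlmlock_at_pgoff()
 * to check whether a granted PR/PW/GROUP lock still covers a page
 * index; passing pending=1 also matches locks that already have a
 * cancellation pending (LDLM_FL_CBPENDING).
 */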