/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for OSC layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"
/* fid_build_reg_res_name() */
#include "../include/lustre_fid.h"

#include "osc_cl_internal.h"

/*****************************************************************************
 *
 * Type conversions.
 *
 */

static const struct cl_lock_operations osc_lock_ops;
static const struct cl_lock_operations osc_lock_lockless_ops;
static void osc_lock_to_lockless(const struct lu_env *env,
				 struct osc_lock *ols, int force);

int osc_lock_is_lockless(const struct osc_lock *olck)
{
	return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
}

/**
 * Returns a weak pointer to the ldlm lock identified by a handle. Returned
 * pointer cannot be dereferenced, as lock is not protected from concurrent
 * reclaim. This function is a helper for osc_lock_invariant().
 */
static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
{
	struct ldlm_lock *lock;

	lock = ldlm_handle2lock(handle);
	if (lock)
		LDLM_LOCK_PUT(lock);
	return lock;
}

/**
 * Invariant that has to be true all of the time.
 */
static int osc_lock_invariant(struct osc_lock *ols)
{
	struct ldlm_lock *lock = osc_handle_ptr(&ols->ols_handle);
	struct ldlm_lock *olock = ols->ols_dlmlock;
	int handle_used = lustre_handle_is_used(&ols->ols_handle);

	if (ergo(osc_lock_is_lockless(ols),
		 ols->ols_locklessable && !ols->ols_dlmlock))
		return 1;

	/*
	 * If all the following "ergo"s are true, return 1, otherwise 0
	 */
	if (!ergo(olock, handle_used))
		return 0;

	if (!ergo(olock,
		  olock->l_handle.h_cookie == ols->ols_handle.cookie))
		return 0;

	if (!ergo(handle_used,
		  ergo(lock && olock, lock == olock) &&
		  ergo(!lock, !olock)))
		return 0;

	/*
	 * Check that ->ols_handle and ->ols_dlmlock are consistent, but
	 * take into account that they are set at the different time.
	 */
	if (!ergo(ols->ols_state == OLS_CANCELLED,
		  !olock && !handle_used))
		return 0;

	/*
	 * DLM lock is destroyed only after we have seen cancellation
	 * ast.
	 */
	if (!ergo(olock && ols->ols_state < OLS_CANCELLED,
		  !ldlm_is_destroyed(olock)))
		return 0;

	if (!ergo(ols->ols_state == OLS_GRANTED,
		  olock && olock->l_req_mode == olock->l_granted_mode &&
		  ols->ols_hold))
		return 0;
	return 1;
}
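
/*
 * Reader's note on the invariant above (not driver logic): ergo(A, B) is
 * logical implication, i.e. "!A || B". For example, ergo(olock, handle_used)
 * only fails when a dlmlock is attached while ols_handle is unused; with no
 * dlmlock attached it holds trivially.
 */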

/*****************************************************************************
 *
 * Lock operations.
 *
 */

static void osc_lock_fini(const struct lu_env *env,
			  struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(ols));
	LASSERT(!ols->ols_dlmlock);

	kmem_cache_free(osc_lock_kmem, ols);
}

static void osc_lock_build_policy(const struct lu_env *env,
				  const struct cl_lock *lock,
				  ldlm_policy_data_t *policy)
{
	const struct cl_lock_descr *d = &lock->cll_descr;

	osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
	policy->l_extent.gid = d->cld_gid;
}

static __u64 osc_enq2ldlm_flags(__u32 enqflags)
{
	__u64 result = 0;

	LASSERT((enqflags & ~CEF_MASK) == 0);

	if (enqflags & CEF_NONBLOCK)
		result |= LDLM_FL_BLOCK_NOWAIT;
	if (enqflags & CEF_ASYNC)
		result |= LDLM_FL_HAS_INTENT;
	if (enqflags & CEF_DISCARD_DATA)
		result |= LDLM_FL_AST_DISCARD_DATA;
	if (enqflags & CEF_PEEK)
		result |= LDLM_FL_TEST_LOCK;
	return result;
}
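
/*
 * Illustration of the mapping above (a sketch, not driver logic): with
 * enqflags == (CEF_NONBLOCK | CEF_PEEK), osc_enq2ldlm_flags() returns
 * LDLM_FL_BLOCK_NOWAIT | LDLM_FL_TEST_LOCK, i.e. a "peek" enqueue that
 * refuses to wait behind blocked locks and only tests for a match.
 */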

/**
 * Updates object attributes from a lock value block (lvb) received together
 * with the DLM lock reply from the server. Copy of osc_update_enqueue()
 * logic.
 *
 * This can be optimized to not update attributes when lock is a result of a
 * local match.
 *
 * Called under lock and resource spin-locks.
 */
static void osc_lock_lvb_update(const struct lu_env *env,
				struct osc_object *osc,
				struct ldlm_lock *dlmlock,
				struct ost_lvb *lvb)
{
	struct cl_object *obj = osc2cl(osc);
	struct lov_oinfo *oinfo = osc->oo_oinfo;
	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
	unsigned int valid;

	valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE;
	if (!lvb)
		lvb = dlmlock->l_lvb_data;

	cl_lvb2attr(attr, lvb);

	cl_object_attr_lock(obj);
	if (dlmlock) {
		__u64 size;

		check_res_locked(dlmlock->l_resource);
		LASSERT(lvb == dlmlock->l_lvb_data);
		size = lvb->lvb_size;

		/* Extend KMS up to the end of this lock and no further
		 * A lock on [x,y] means a KMS of up to y + 1 bytes!
		 */
		if (size > dlmlock->l_policy_data.l_extent.end)
			size = dlmlock->l_policy_data.l_extent.end + 1;
		if (size >= oinfo->loi_kms) {
			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu, kms=%llu",
				   lvb->lvb_size, size);
			valid |= CAT_KMS;
			attr->cat_kms = size;
		} else {
			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu; leaving kms=%llu, end=%llu",
				   lvb->lvb_size, oinfo->loi_kms,
				   dlmlock->l_policy_data.l_extent.end);
		}
		ldlm_lock_allow_match_locked(dlmlock);
	}

	cl_object_attr_set(env, obj, attr, valid);
	cl_object_attr_unlock(obj);
}
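
/*
 * Worked example for the KMS clamp above (a reader's sketch): for a granted
 * extent [0, 4095] with lvb_size = 1 MiB, size is clamped to 4095 + 1 = 4096,
 * i.e. the known minimum size may only be asserted up to the end of the
 * extent this client actually holds. If loi_kms was 2048 it is raised to
 * 4096; if it was already 8192, kms is left alone and only rss is updated.
 */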

static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
			     struct lustre_handle *lockh, bool lvb_update)
{
	struct ldlm_lock *dlmlock;

	dlmlock = ldlm_handle2lock_long(lockh, 0);
	LASSERT(dlmlock);

	/* lock reference taken by ldlm_handle2lock_long() is
	 * owned by osc_lock and released in osc_lock_detach()
	 */
	lu_ref_add(&dlmlock->l_reference, "osc_lock", oscl);
	oscl->ols_has_ref = 1;

	LASSERT(!oscl->ols_dlmlock);
	oscl->ols_dlmlock = dlmlock;

	/* This may be a matched lock for glimpse request, do not hold
	 * lock reference in that case.
	 */
	if (!oscl->ols_glimpse) {
		/* hold a refc for non glimpse lock which will
		 * be released in osc_lock_cancel()
		 */
		lustre_handle_copy(&oscl->ols_handle, lockh);
		ldlm_lock_addref(lockh, oscl->ols_einfo.ei_mode);
		oscl->ols_hold = 1;
	}

	/* Lock must have been granted. */
	lock_res_and_lock(dlmlock);
	if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
		struct ldlm_extent *ext = &dlmlock->l_policy_data.l_extent;
		struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;

		/* extend the lock extent, otherwise it will have problem when
		 * we decide whether to grant a lockless lock.
		 */
		descr->cld_mode = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
		descr->cld_start = cl_index(descr->cld_obj, ext->start);
		descr->cld_end = cl_index(descr->cld_obj, ext->end);
		descr->cld_gid = ext->gid;

		/* no lvb update for matched lock */
		if (lvb_update) {
			LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
			osc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
					    dlmlock, NULL);
		}
		LINVRNT(osc_lock_invariant(oscl));
	}
	unlock_res_and_lock(dlmlock);

	LASSERT(oscl->ols_state != OLS_GRANTED);
	oscl->ols_state = OLS_GRANTED;
}

/**
 * Lock upcall function that is executed either when a reply to ENQUEUE rpc is
 * received from a server, or after osc_enqueue_base() matched a local DLM
 * lock.
 */
static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
			   int errcode)
{
	struct osc_lock *oscl = cookie;
	struct cl_lock_slice *slice = &oscl->ols_cl;
	struct lu_env *env;
	struct cl_env_nest nest;
	int rc;

	env = cl_env_nested_get(&nest);
	/* should never happen, similar to osc_ldlm_blocking_ast(). */
	LASSERT(!IS_ERR(env));

	rc = ldlm_error2errno(errcode);
	if (oscl->ols_state == OLS_ENQUEUED) {
		oscl->ols_state = OLS_UPCALL_RECEIVED;
	} else if (oscl->ols_state == OLS_CANCELLED) {
		rc = -EIO;
	} else {
		CERROR("Impossible state: %d\n", oscl->ols_state);
		LBUG();
	}

	if (rc == 0)
		osc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK);

	/* Error handling, some errors are tolerable. */
	if (oscl->ols_locklessable && rc == -EUSERS) {
		/* This is a tolerable error, turn this lock into
		 * lockless lock.
		 */
		osc_object_set_contended(cl2osc(slice->cls_obj));
		LASSERT(slice->cls_ops == &osc_lock_ops);

		/* Change this lock to ldlmlock-less lock. */
		osc_lock_to_lockless(env, oscl, 1);
		oscl->ols_state = OLS_GRANTED;
		rc = 0;
	} else if (oscl->ols_glimpse && rc == -ENAVAIL) {
		LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
		osc_lock_lvb_update(env, cl2osc(slice->cls_obj),
				    NULL, &oscl->ols_lvb);
		/* Hide the error. */
		rc = 0;
	}

	if (oscl->ols_owner)
		cl_sync_io_note(env, oscl->ols_owner, rc);
	cl_env_nested_put(&nest, env);

	return rc;
}
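
/*
 * State summary inferred from the code in this file: a typical osc_lock
 * walks OLS_NEW -> OLS_ENQUEUED -> OLS_UPCALL_RECEIVED -> OLS_GRANTED and
 * finally OLS_CANCELLED, while lockless locks jump straight to OLS_GRANTED,
 * either here (the -EUSERS path) or in osc_lock_enqueue().
 */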

static int osc_lock_upcall_agl(void *cookie, struct lustre_handle *lockh,
			       int errcode)
{
	struct osc_object *osc = cookie;
	struct ldlm_lock *dlmlock;
	struct lu_env *env;
	struct cl_env_nest nest;

	env = cl_env_nested_get(&nest);
	LASSERT(!IS_ERR(env));

	if (errcode == ELDLM_LOCK_MATCHED) {
		errcode = ELDLM_OK;
		goto out;
	}

	if (errcode != ELDLM_OK)
		goto out;

	dlmlock = ldlm_handle2lock(lockh);
	LASSERT(dlmlock);

	lock_res_and_lock(dlmlock);
	LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);

	/* there is no osc_lock associated with AGL lock */
	osc_lock_lvb_update(env, osc, dlmlock, NULL);

	unlock_res_and_lock(dlmlock);
	LDLM_LOCK_PUT(dlmlock);

out:
	cl_object_put(env, osc2cl(osc));
	cl_env_nested_put(&nest, env);
	return ldlm_error2errno(errcode);
}

static int osc_lock_flush(struct osc_object *obj, pgoff_t start, pgoff_t end,
			  enum cl_lock_mode mode, int discard)
{
	struct lu_env *env;
	struct cl_env_nest nest;
	int rc = 0;
	int rc2 = 0;

	env = cl_env_nested_get(&nest);
	if (IS_ERR(env))
		return PTR_ERR(env);

	if (mode == CLM_WRITE) {
		rc = osc_cache_writeback_range(env, obj, start, end, 1,
					       discard);
		CDEBUG(D_CACHE, "object %p: [%lu -> %lu] %d pages were %s.\n",
		       obj, start, end, rc,
		       discard ? "discarded" : "written back");
		if (rc > 0)
			rc = 0;
	}

	rc2 = osc_lock_discard_pages(env, obj, start, end, mode);
	if (rc == 0 && rc2 < 0)
		rc = rc2;

	cl_env_nested_put(&nest, env);
	return rc;
}
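
/*
 * Note restating the logic above: for CLM_WRITE locks, dirty pages in
 * [start, end] are first written back, or discarded when the server asked
 * for that; in all cases cached pages are then dropped by
 * osc_lock_discard_pages(). A positive page count from writeback is not an
 * error, hence the "rc > 0 -> rc = 0" reset.
 */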

/**
 * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock
 * and ldlm_lock caches.
 */
static int osc_dlm_blocking_ast0(const struct lu_env *env,
				 struct ldlm_lock *dlmlock,
				 void *data, int flag)
{
	struct cl_object *obj = NULL;
	int result = 0;
	int discard;
	enum cl_lock_mode mode = CLM_READ;

	LASSERT(flag == LDLM_CB_CANCELING);

	lock_res_and_lock(dlmlock);
	if (dlmlock->l_granted_mode != dlmlock->l_req_mode) {
		dlmlock->l_ast_data = NULL;
		unlock_res_and_lock(dlmlock);
		return 0;
	}

	discard = ldlm_is_discard_data(dlmlock);
	if (dlmlock->l_granted_mode & (LCK_PW | LCK_GROUP))
		mode = CLM_WRITE;

	if (dlmlock->l_ast_data) {
		obj = osc2cl(dlmlock->l_ast_data);
		dlmlock->l_ast_data = NULL;

		cl_object_get(obj);
	}

	unlock_res_and_lock(dlmlock);

	/* if l_ast_data is NULL, the dlmlock was enqueued by AGL or
	 * the object has been destroyed.
	 */
	if (obj) {
		struct ldlm_extent *extent = &dlmlock->l_policy_data.l_extent;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		__u64 old_kms;

		/* Destroy pages covered by the extent of the DLM lock */
		result = osc_lock_flush(cl2osc(obj),
					cl_index(obj, extent->start),
					cl_index(obj, extent->end),
					mode, discard);

		/* losing a lock, update kms */
		lock_res_and_lock(dlmlock);
		cl_object_attr_lock(obj);
		/* Must get the value under the lock to avoid race. */
		old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
		/* Update the kms. Need to loop all granted locks.
		 * Not a problem for the client
		 */
		attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);

		cl_object_attr_set(env, obj, attr, CAT_KMS);
		cl_object_attr_unlock(obj);
		unlock_res_and_lock(dlmlock);

		cl_object_put(env, obj);
	}
	return result;
}

/**
 * Blocking ast invoked by ldlm when dlm lock is either blocking progress of
 * some other lock, or is canceled. This function is installed as a
 * ldlm_lock::l_blocking_ast() for client extent locks.
 *
 * Control flow is tricky, because ldlm uses the same call-back
 * (ldlm_lock::l_blocking_ast()) for both blocking and cancellation ast's.
 *
 * \param dlmlock lock for which ast occurred.
 *
 * \param new description of a conflicting lock in case of blocking ast.
 *
 * \param data value of dlmlock->l_ast_data
 *
 * \param flag LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
 *	       cancellation and blocking ast's.
 *
 * Possible use cases:
 *
 *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING) to cancel
 *       a lock due to lock lru pressure, or an explicit user request to
 *       purge locks.
 *
 *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_BLOCKING) to notify
 *       us that dlmlock conflicts with another lock that some client is
 *       enqueuing. The lock is canceled.
 *
 *     - cl_lock_cancel() is called. osc_lock_cancel() calls
 *       ldlm_cli_cancel() that calls
 *
 *	     dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
 *
 *       recursively entering osc_ldlm_blocking_ast().
 *
 *     - client cancels a lock voluntarily (e.g., as a part of early
 *       cancellation):
 *
 *	     cl_lock_cancel()->
 *	       osc_lock_cancel()->
 *		 ldlm_cli_cancel()->
 *		   dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
 */
static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
				 struct ldlm_lock_desc *new, void *data,
				 int flag)
{
	int result = 0;

	switch (flag) {
	case LDLM_CB_BLOCKING: {
		struct lustre_handle lockh;

		ldlm_lock2handle(dlmlock, &lockh);
		result = ldlm_cli_cancel(&lockh, LCF_ASYNC);
		if (result == -ENODATA)
			result = 0;
		break;
	}
	case LDLM_CB_CANCELING: {
		struct lu_env *env;
		struct cl_env_nest nest;

		/*
		 * This can be called in the context of outer IO, e.g.,
		 *
		 *     osc_enqueue_base()->...
		 *       ->ldlm_prep_elc_req()->...
		 *         ->ldlm_cancel_callback()->...
		 *           ->osc_ldlm_blocking_ast()
		 *
		 * new environment has to be created to not corrupt outer
		 * context.
		 */
		env = cl_env_nested_get(&nest);
		if (IS_ERR(env)) {
			result = PTR_ERR(env);
			break;
		}

		result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
		cl_env_nested_put(&nest, env);
		break;
	}
	default:
		LBUG();
	}
	return result;
}

static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
{
	struct ptlrpc_request *req = data;
	struct cl_env_nest nest;
	struct lu_env *env;
	struct ost_lvb *lvb;
	struct req_capsule *cap;
	int result;

	LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		struct cl_object *obj = NULL;

		lock_res_and_lock(dlmlock);
		if (dlmlock->l_ast_data) {
			obj = osc2cl(dlmlock->l_ast_data);
			cl_object_get(obj);
		}
		unlock_res_and_lock(dlmlock);

		if (obj) {
			/* Do not grab the mutex of cl_lock for glimpse.
			 * See LU-1274 for details.
			 * BTW, it's okay for cl_lock to be cancelled during
			 * this period because server can handle this race.
			 * See ldlm_server_glimpse_ast() for details.
			 * cl_lock_mutex_get(env, lock);
			 */
			cap = &req->rq_pill;
			req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
			req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
					     sizeof(*lvb));
			result = req_capsule_server_pack(cap);
			if (result == 0) {
				lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
				result = cl_object_glimpse(env, obj, lvb);
			}
			if (!exp_connect_lvb_type(req->rq_export))
				req_capsule_shrink(&req->rq_pill,
						   &RMF_DLM_LVB,
						   sizeof(struct ost_lvb_v1),
						   RCL_SERVER);
			cl_object_put(env, obj);
		} else {
			/*
			 * These errors are normal races, so we don't want to
			 * fill the console with messages by calling
			 * ptlrpc_error()
			 */
			lustre_pack_reply(req, 1, NULL, NULL);
			result = -ELDLM_NO_LOCK_DATA;
		}
		cl_env_nested_put(&nest, env);
	} else {
		result = PTR_ERR(env);
	}
	req->rq_status = result;
	return result;
}

static int weigh_cb(const struct lu_env *env, struct cl_io *io,
		    struct osc_page *ops, void *cbdata)
{
	struct cl_page *page = ops->ops_cl.cpl_page;

	if (cl_page_is_vmlocked(env, page) ||
	    PageDirty(page->cp_vmpage) || PageWriteback(page->cp_vmpage)) {
		(*(unsigned long *)cbdata)++;
		return CLP_GANG_ABORT;
	}

	return CLP_GANG_OKAY;
}

static unsigned long osc_lock_weight(const struct lu_env *env,
				     struct osc_object *oscobj,
				     struct ldlm_extent *extent)
{
	struct cl_io *io = &osc_env_info(env)->oti_io;
	struct cl_object *obj = cl_object_top(&oscobj->oo_cl);
	unsigned long npages = 0;
	int result;

	io->ci_obj = obj;
	io->ci_ignore_layout = 1;
	result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
	if (result != 0)
		return result;

	do {
		result = osc_page_gang_lookup(env, io, oscobj,
					      cl_index(obj, extent->start),
					      cl_index(obj, extent->end),
					      weigh_cb, (void *)&npages);
		if (result == CLP_GANG_ABORT)
			break;
		if (result == CLP_GANG_RESCHED)
			cond_resched();
	} while (result != CLP_GANG_OKAY);
	cl_io_fini(env, io);

	return npages;
}

/**
 * Get the weight of dlm lock for early cancellation.
 */
unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
{
	struct cl_env_nest nest;
	struct lu_env *env;
	struct osc_object *obj;
	struct osc_lock *oscl;
	unsigned long weight;
	bool found = false;

	might_sleep();
	/*
	 * osc_ldlm_weigh_ast has a complex context since it might be called
	 * because of lock canceling, or from user's input. We have to make
	 * a new environment for it. Probably it is implementation safe to use
	 * the upper context because cl_lock_put don't modify environment
	 * variables. But just in case ..
	 */
	env = cl_env_nested_get(&nest);
	if (IS_ERR(env))
		/* Mostly because lack of memory, do not eliminate this lock */
		return 1;

	LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT);
	obj = dlmlock->l_ast_data;
	if (!obj) {
		weight = 1;
		goto out;
	}

	spin_lock(&obj->oo_ol_spin);
	list_for_each_entry(oscl, &obj->oo_ol_list, ols_nextlock_oscobj) {
		if (oscl->ols_dlmlock && oscl->ols_dlmlock != dlmlock)
			continue;
		found = true;
		break;
	}
	spin_unlock(&obj->oo_ol_spin);

	if (found) {
		/*
		 * If the lock is being used by an IO, definitely not cancel it.
		 */
		weight = 1;
		goto out;
	}

	weight = osc_lock_weight(env, obj, &dlmlock->l_policy_data.l_extent);

out:
	cl_env_nested_put(&nest, env);
	return weight;
}

static void osc_lock_build_einfo(const struct lu_env *env,
				 const struct cl_lock *lock,
				 struct osc_object *osc,
				 struct ldlm_enqueue_info *einfo)
{
	einfo->ei_type = LDLM_EXTENT;
	einfo->ei_mode = osc_cl_lock2ldlm(lock->cll_descr.cld_mode);
	einfo->ei_cb_bl = osc_ldlm_blocking_ast;
	einfo->ei_cb_cp = ldlm_completion_ast;
	einfo->ei_cb_gl = osc_ldlm_glimpse_ast;
	einfo->ei_cbdata = osc; /* value to be put into ->l_ast_data */
}

/**
 * Determine if the lock should be converted into a lockless lock.
 *
 * Steps to check:
 * - if the lock has an explicit requirement for a non-lockless lock;
 * - if the io lock request type ci_lockreq allows it;
 * - send the enqueue rpc to ost to make the further decision;
 * - special treatment for truncate lockless lock.
 *
 * Additional policy can be implemented here, e.g., never do lockless-io
 * for large extents.
 */
static void osc_lock_to_lockless(const struct lu_env *env,
				 struct osc_lock *ols, int force)
{
	struct cl_lock_slice *slice = &ols->ols_cl;

	LASSERT(ols->ols_state == OLS_NEW ||
		ols->ols_state == OLS_UPCALL_RECEIVED);

	if (force) {
		ols->ols_locklessable = 1;
		slice->cls_ops = &osc_lock_lockless_ops;
	} else {
		struct osc_io *oio = osc_env_io(env);
		struct cl_io *io = oio->oi_cl.cis_io;
		struct cl_object *obj = slice->cls_obj;
		struct osc_object *oob = cl2osc(obj);
		const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
		struct obd_connect_data *ocd;

		LASSERT(io->ci_lockreq == CILR_MANDATORY ||
			io->ci_lockreq == CILR_MAYBE ||
			io->ci_lockreq == CILR_NEVER);

		ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
		ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
					(io->ci_lockreq == CILR_MAYBE) &&
					(ocd->ocd_connect_flags &
					 OBD_CONNECT_SRVLOCK);
		if (io->ci_lockreq == CILR_NEVER ||
		    /* lockless IO */
		    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
		    /* lockless truncate */
		    (cl_io_is_trunc(io) &&
		     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
		     osd->od_lockless_truncate)) {
			ols->ols_locklessable = 1;
			slice->cls_ops = &osc_lock_lockless_ops;
		}
	}
	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
}
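
/*
 * Example of the policy above (a sketch, not additional logic): a CILR_MAYBE
 * write against a server advertising OBD_CONNECT_SRVLOCK is only marked
 * locklessable; it actually switches to osc_lock_lockless_ops once the
 * object is contended, or when \a force is set, as on the -EUSERS path of
 * osc_lock_upcall().
 */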

static bool osc_lock_compatible(const struct osc_lock *qing,
				const struct osc_lock *qed)
{
	struct cl_lock_descr *qed_descr = &qed->ols_cl.cls_lock->cll_descr;
	struct cl_lock_descr *qing_descr = &qing->ols_cl.cls_lock->cll_descr;

	if (qed->ols_glimpse)
		return true;

	if (qing_descr->cld_mode == CLM_READ && qed_descr->cld_mode == CLM_READ)
		return true;

	if (qed->ols_state < OLS_GRANTED)
		return true;

	if (qed_descr->cld_mode >= qing_descr->cld_mode &&
	    qed_descr->cld_start <= qing_descr->cld_start &&
	    qed_descr->cld_end >= qing_descr->cld_end)
		return true;

	return false;
}
*env
,
823 struct osc_object
*osc
,
824 struct osc_lock
*oscl
)
826 spin_lock(&osc
->oo_ol_spin
);
827 list_del_init(&oscl
->ols_nextlock_oscobj
);
828 spin_unlock(&osc
->oo_ol_spin
);
830 spin_lock(&oscl
->ols_lock
);
831 while (!list_empty(&oscl
->ols_waiting_list
)) {
832 struct osc_lock
*scan
;
834 scan
= list_entry(oscl
->ols_waiting_list
.next
, struct osc_lock
,
836 list_del_init(&scan
->ols_wait_entry
);
838 cl_sync_io_note(env
, scan
->ols_owner
, 0);
840 spin_unlock(&oscl
->ols_lock
);

static void osc_lock_enqueue_wait(const struct lu_env *env,
				  struct osc_object *obj,
				  struct osc_lock *oscl)
{
	struct osc_lock *tmp_oscl;
	struct cl_lock_descr *need = &oscl->ols_cl.cls_lock->cll_descr;
	struct cl_sync_io *waiter = &osc_env_info(env)->oti_anchor;

	spin_lock(&obj->oo_ol_spin);
	list_add_tail(&oscl->ols_nextlock_oscobj, &obj->oo_ol_list);

restart:
	list_for_each_entry(tmp_oscl, &obj->oo_ol_list,
			    ols_nextlock_oscobj) {
		struct cl_lock_descr *descr;

		if (tmp_oscl == oscl)
			break;

		descr = &tmp_oscl->ols_cl.cls_lock->cll_descr;
		if (descr->cld_start > need->cld_end ||
		    descr->cld_end < need->cld_start)
			continue;

		/* We're not supposed to give up group lock */
		if (descr->cld_mode == CLM_GROUP)
			break;

		if (!osc_lock_is_lockless(oscl) &&
		    osc_lock_compatible(oscl, tmp_oscl))
			continue;

		/* wait for conflicting lock to be canceled */
		cl_sync_io_init(waiter, 1, cl_sync_io_end);
		oscl->ols_owner = waiter;

		spin_lock(&tmp_oscl->ols_lock);
		/* add oscl into tmp's ols_waiting list */
		list_add_tail(&oscl->ols_wait_entry,
			      &tmp_oscl->ols_waiting_list);
		spin_unlock(&tmp_oscl->ols_lock);

		spin_unlock(&obj->oo_ol_spin);
		(void)cl_sync_io_wait(env, waiter, 0);

		spin_lock(&obj->oo_ol_spin);
		oscl->ols_owner = NULL;
		goto restart;
	}
	spin_unlock(&obj->oo_ol_spin);
}

/**
 * Implementation of cl_lock_operations::clo_enqueue() method for osc
 * layer. This initiates ldlm enqueue:
 *
 *     - cancels conflicting locks early (osc_lock_enqueue_wait());
 *
 *     - calls osc_enqueue_base() to do actual enqueue.
 *
 * osc_enqueue_base() is supplied with an upcall function that is executed
 * when lock is received either after a local cached ldlm lock is matched, or
 * when a reply from the server is received.
 *
 * This function does not wait for the network communication to complete.
 */
static int osc_lock_enqueue(const struct lu_env *env,
			    const struct cl_lock_slice *slice,
			    struct cl_io *unused, struct cl_sync_io *anchor)
{
	struct osc_thread_info *info = osc_env_info(env);
	struct osc_io *oio = osc_env_io(env);
	struct osc_object *osc = cl2osc(slice->cls_obj);
	struct osc_lock *oscl = cl2osc_lock(slice);
	struct cl_lock *lock = slice->cls_lock;
	struct ldlm_res_id *resname = &info->oti_resname;
	ldlm_policy_data_t *policy = &info->oti_policy;
	osc_enqueue_upcall_f upcall = osc_lock_upcall;
	void *cookie = oscl;
	bool async = false;
	int result;

	LASSERTF(ergo(oscl->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
		 "lock = %p, ols = %p\n", lock, oscl);

	if (oscl->ols_state == OLS_GRANTED)
		return 0;

	if (oscl->ols_flags & LDLM_FL_TEST_LOCK)
		goto enqueue_base;

	if (oscl->ols_glimpse) {
		LASSERT(equi(oscl->ols_agl, !anchor));
		async = true;
		goto enqueue_base;
	}

	osc_lock_enqueue_wait(env, osc, oscl);

	/* we can grant lockless lock right after all conflicting locks
	 * are canceled.
	 */
	if (osc_lock_is_lockless(oscl)) {
		oscl->ols_state = OLS_GRANTED;
		oio->oi_lockless = 1;
		return 0;
	}

enqueue_base:
	oscl->ols_state = OLS_ENQUEUED;
	if (anchor) {
		atomic_inc(&anchor->csi_sync_nr);
		oscl->ols_owner = anchor;
	}

	/**
	 * DLM lock's ast data must be osc_object;
	 * if glimpse or AGL lock, async of osc_enqueue_base() must be true,
	 * DLM's enqueue callback set to osc_lock_upcall() with cookie as
	 * osc_lock.
	 */
	ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname);
	osc_lock_build_einfo(env, lock, osc, &oscl->ols_einfo);
	osc_lock_build_policy(env, lock, policy);
	if (oscl->ols_agl) {
		oscl->ols_einfo.ei_cbdata = NULL;
		/* hold a reference for callback */
		cl_object_get(osc2cl(osc));
		upcall = osc_lock_upcall_agl;
		cookie = osc;
	}
	result = osc_enqueue_base(osc_export(osc), resname, &oscl->ols_flags,
				  policy, &oscl->ols_lvb,
				  osc->oo_oinfo->loi_kms_valid,
				  upcall, cookie,
				  &oscl->ols_einfo, PTLRPCD_SET, async,
				  oscl->ols_agl);
	if (result != 0) {
		oscl->ols_state = OLS_CANCELLED;
		osc_lock_wake_waiters(env, osc, oscl);

		/* hide error for AGL lock. */
		if (oscl->ols_agl) {
			cl_object_put(env, osc2cl(osc));
			result = 0;
		}
		if (anchor)
			cl_sync_io_note(env, anchor, result);
	} else {
		if (osc_lock_is_lockless(oscl)) {
			oio->oi_lockless = 1;
		} else if (!async) {
			LASSERT(oscl->ols_state == OLS_GRANTED);
			LASSERT(oscl->ols_hold);
			LASSERT(oscl->ols_dlmlock);
		}
	}
	return result;
}

/**
 * Breaks a link between osc_lock and dlm_lock.
 */
static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
{
	struct ldlm_lock *dlmlock;

	dlmlock = olck->ols_dlmlock;
	if (!dlmlock)
		return;

	if (olck->ols_hold) {
		olck->ols_hold = 0;
		osc_cancel_base(&olck->ols_handle, olck->ols_einfo.ei_mode);
		olck->ols_handle.cookie = 0ULL;
	}

	olck->ols_dlmlock = NULL;

	/* release a reference taken in osc_lock_upcall(). */
	LASSERT(olck->ols_has_ref);
	lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
	LDLM_LOCK_RELEASE(dlmlock);
	olck->ols_has_ref = 0;
}

/**
 * Implements cl_lock_operations::clo_cancel() method for osc layer. This is
 * called (as part of cl_lock_cancel()) when a lock is canceled either
 * voluntarily (LRU pressure, early cancellation, umount, etc.) or due to a
 * conflict with some other lock somewhere in the cluster. This function does
 * the following:
 *
 *     - invalidates all pages protected by this lock (after sending dirty
 *       ones to the server, as necessary);
 *
 *     - decref's underlying ldlm lock;
 *
 *     - cancels ldlm lock (ldlm_cli_cancel()).
 */
static void osc_lock_cancel(const struct lu_env *env,
			    const struct cl_lock_slice *slice)
{
	struct osc_object *obj = cl2osc(slice->cls_obj);
	struct osc_lock *oscl = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(oscl));

	osc_lock_detach(env, oscl);
	oscl->ols_state = OLS_CANCELLED;
	oscl->ols_flags &= ~LDLM_FL_LVB_READY;

	osc_lock_wake_waiters(env, obj, oscl);
}

static int osc_lock_print(const struct lu_env *env, void *cookie,
			  lu_printer_t p, const struct cl_lock_slice *slice)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	(*p)(env, cookie, "%p %#16llx %#llx %d %p ",
	     lock->ols_dlmlock, lock->ols_flags, lock->ols_handle.cookie,
	     lock->ols_state, lock->ols_owner);
	osc_lvb_print(env, cookie, p, &lock->ols_lvb);
	return 0;
}

static const struct cl_lock_operations osc_lock_ops = {
	.clo_fini = osc_lock_fini,
	.clo_enqueue = osc_lock_enqueue,
	.clo_cancel = osc_lock_cancel,
	.clo_print = osc_lock_print,
};

static void osc_lock_lockless_cancel(const struct lu_env *env,
				     const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	struct osc_object *osc = cl2osc(slice->cls_obj);
	struct cl_lock_descr *descr = &slice->cls_lock->cll_descr;
	int result;

	LASSERT(!ols->ols_dlmlock);
	result = osc_lock_flush(osc, descr->cld_start, descr->cld_end,
				descr->cld_mode, 0);
	if (result)
		CERROR("Pages for lockless lock %p were not purged(%d)\n",
		       ols, result);

	osc_lock_wake_waiters(env, osc, ols);
}

static const struct cl_lock_operations osc_lock_lockless_ops = {
	.clo_fini = osc_lock_fini,
	.clo_enqueue = osc_lock_enqueue,
	.clo_cancel = osc_lock_lockless_cancel,
	.clo_print = osc_lock_print
};

static void osc_lock_set_writer(const struct lu_env *env,
				const struct cl_io *io,
				struct cl_object *obj, struct osc_lock *oscl)
{
	struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;
	pgoff_t io_start;
	pgoff_t io_end;

	if (!cl_object_same(io->ci_obj, obj))
		return;

	if (likely(io->ci_type == CIT_WRITE)) {
		io_start = cl_index(obj, io->u.ci_rw.crw_pos);
		io_end = cl_index(obj, io->u.ci_rw.crw_pos +
				  io->u.ci_rw.crw_count - 1);
		if (cl_io_is_append(io)) {
			io_start = 0;
			io_end = CL_PAGE_EOF;
		}
	} else {
		LASSERT(cl_io_is_mkwrite(io));
		io_start = io_end = io->u.ci_fault.ft_index;
	}

	if (descr->cld_mode >= CLM_WRITE &&
	    descr->cld_start <= io_start && descr->cld_end >= io_end) {
		struct osc_io *oio = osc_env_io(env);

		/* There must be only one lock to match the write region */
		LASSERT(!oio->oi_write_osclock);
		oio->oi_write_osclock = oscl;
	}
}
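
/*
 * Worked example for the region check above (a sketch, assuming 4 KiB
 * pages): a CIT_WRITE at crw_pos = 4096 with crw_count = 8192 maps to
 * io_start = cl_index(obj, 4096) = 1 and io_end = cl_index(obj, 12287) = 2,
 * so a CLM_WRITE lock whose descriptor spans pages [0, 2] or wider becomes
 * oi_write_osclock for this IO.
 */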

int osc_lock_init(const struct lu_env *env,
		  struct cl_object *obj, struct cl_lock *lock,
		  const struct cl_io *io)
{
	struct osc_lock *oscl;
	__u32 enqflags = lock->cll_descr.cld_enq_flags;

	oscl = kmem_cache_zalloc(osc_lock_kmem, GFP_NOFS);
	if (!oscl)
		return -ENOMEM;

	oscl->ols_state = OLS_NEW;
	spin_lock_init(&oscl->ols_lock);
	INIT_LIST_HEAD(&oscl->ols_waiting_list);
	INIT_LIST_HEAD(&oscl->ols_wait_entry);
	INIT_LIST_HEAD(&oscl->ols_nextlock_oscobj);

	oscl->ols_flags = osc_enq2ldlm_flags(enqflags);
	oscl->ols_agl = !!(enqflags & CEF_AGL);
	if (oscl->ols_agl)
		oscl->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
	if (oscl->ols_flags & LDLM_FL_HAS_INTENT) {
		oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
		oscl->ols_glimpse = 1;
	}

	cl_lock_slice_add(lock, &oscl->ols_cl, obj, &osc_lock_ops);

	if (!(enqflags & CEF_MUST))
		/* try to convert this lock to a lockless lock */
		osc_lock_to_lockless(env, oscl, (enqflags & CEF_NEVER));
	if (oscl->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
		oscl->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;

	if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io))
		osc_lock_set_writer(env, io, obj, oscl);

	LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %llx\n",
			  lock, oscl, oscl->ols_flags);

	return 0;
}

/**
 * Finds an existing DLM lock covering the given page index; with \a pending
 * set, locks with a pending cancellation callback are matched as well.
 */
struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
				       struct osc_object *obj, pgoff_t index,
				       int pending, int canceling)
{
	struct osc_thread_info *info = osc_env_info(env);
	struct ldlm_res_id *resname = &info->oti_resname;
	ldlm_policy_data_t *policy = &info->oti_policy;
	struct lustre_handle lockh;
	struct ldlm_lock *lock = NULL;
	enum ldlm_mode mode;
	__u64 flags;

	ostid_build_res_name(&obj->oo_oinfo->loi_oi, resname);
	osc_index2policy(policy, osc2cl(obj), index, index);
	policy->l_extent.gid = LDLM_GID_ANY;

	flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
	if (pending)
		flags |= LDLM_FL_CBPENDING;
	/*
	 * It is fine to match any group lock since there could be only one
	 * with a uniq gid and it conflicts with all other lock modes too
	 */
again:
	mode = ldlm_lock_match(osc_export(obj)->exp_obd->obd_namespace,
			       flags, resname, LDLM_EXTENT, policy,
			       LCK_PR | LCK_PW | LCK_GROUP, &lockh, canceling);
	if (mode != 0) {
		lock = ldlm_handle2lock(&lockh);
		/* RACE: the lock is cancelled so let's try again */
		if (unlikely(!lock))
			goto again;
	}
	return lock;
}
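
/*
 * Usage note inferred from the flags above: \a pending adds
 * LDLM_FL_CBPENDING so locks with a cancellation callback already pending
 * still match, and LDLM_FL_TEST_LOCK keeps the match from pinning the lock
 * with an extra reference, which is why ldlm_handle2lock() may find it gone
 * and the match is simply retried.
 */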