4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2010, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/ldlm/ldlm_lock.c
38 * Author: Peter Braam <braam@clusterfs.com>
39 * Author: Phil Schwan <phil@clusterfs.com>
42 #define DEBUG_SUBSYSTEM S_LDLM
44 #include "../../include/linux/libcfs/libcfs.h"
45 #include "../include/lustre_intent.h"
46 #include "../include/obd_class.h"
47 #include "ldlm_internal.h"
50 char *ldlm_lockname
[] = {
58 [LCK_GROUP
] = "GROUP",
61 EXPORT_SYMBOL(ldlm_lockname
);
63 static char *ldlm_typename
[] = {
65 [LDLM_EXTENT
] = "EXT",
70 static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local
[] = {
71 [LDLM_PLAIN
- LDLM_MIN_TYPE
] = ldlm_plain_policy_wire_to_local
,
72 [LDLM_EXTENT
- LDLM_MIN_TYPE
] = ldlm_extent_policy_wire_to_local
,
73 [LDLM_FLOCK
- LDLM_MIN_TYPE
] = ldlm_flock_policy_wire18_to_local
,
74 [LDLM_IBITS
- LDLM_MIN_TYPE
] = ldlm_ibits_policy_wire_to_local
,
77 static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local
[] = {
78 [LDLM_PLAIN
- LDLM_MIN_TYPE
] = ldlm_plain_policy_wire_to_local
,
79 [LDLM_EXTENT
- LDLM_MIN_TYPE
] = ldlm_extent_policy_wire_to_local
,
80 [LDLM_FLOCK
- LDLM_MIN_TYPE
] = ldlm_flock_policy_wire21_to_local
,
81 [LDLM_IBITS
- LDLM_MIN_TYPE
] = ldlm_ibits_policy_wire_to_local
,
84 static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire
[] = {
85 [LDLM_PLAIN
- LDLM_MIN_TYPE
] = ldlm_plain_policy_local_to_wire
,
86 [LDLM_EXTENT
- LDLM_MIN_TYPE
] = ldlm_extent_policy_local_to_wire
,
87 [LDLM_FLOCK
- LDLM_MIN_TYPE
] = ldlm_flock_policy_local_to_wire
,
88 [LDLM_IBITS
- LDLM_MIN_TYPE
] = ldlm_ibits_policy_local_to_wire
,
92 * Converts lock policy from local format to on the wire lock_desc format
94 static void ldlm_convert_policy_to_wire(ldlm_type_t type
,
95 const ldlm_policy_data_t
*lpolicy
,
96 ldlm_wire_policy_data_t
*wpolicy
)
98 ldlm_policy_local_to_wire_t convert
;
100 convert
= ldlm_policy_local_to_wire
[type
- LDLM_MIN_TYPE
];
102 convert(lpolicy
, wpolicy
);
106 * Converts lock policy from on the wire lock_desc format to local format
108 void ldlm_convert_policy_to_local(struct obd_export
*exp
, ldlm_type_t type
,
109 const ldlm_wire_policy_data_t
*wpolicy
,
110 ldlm_policy_data_t
*lpolicy
)
112 ldlm_policy_wire_to_local_t convert
;
115 /** some badness for 2.0.0 clients, but 2.0.0 isn't supported */
116 new_client
= (exp_connect_flags(exp
) & OBD_CONNECT_FULL20
) != 0;
118 convert
= ldlm_policy_wire21_to_local
[type
- LDLM_MIN_TYPE
];
120 convert
= ldlm_policy_wire18_to_local
[type
- LDLM_MIN_TYPE
];
122 convert(wpolicy
, lpolicy
);
125 char *ldlm_it2str(int it
)
132 case (IT_OPEN
| IT_CREAT
):
147 CERROR("Unknown intent %d\n", it
);
151 EXPORT_SYMBOL(ldlm_it2str
);
154 * REFCOUNTED LOCK OBJECTS
158 * Get a reference on a lock.
160 * Lock refcounts, during creation:
161 * - one special one for allocation, dec'd only once in destroy
162 * - one for being a lock that's in-use
163 * - one for the addref associated with a new lock
165 struct ldlm_lock
*ldlm_lock_get(struct ldlm_lock
*lock
)
167 atomic_inc(&lock
->l_refc
);
170 EXPORT_SYMBOL(ldlm_lock_get
);
173 * Release lock reference.
175 * Also frees the lock if it was last reference.
177 void ldlm_lock_put(struct ldlm_lock
*lock
)
179 LASSERT(lock
->l_resource
!= LP_POISON
);
180 LASSERT(atomic_read(&lock
->l_refc
) > 0);
181 if (atomic_dec_and_test(&lock
->l_refc
)) {
182 struct ldlm_resource
*res
;
185 "final lock_put on destroyed lock, freeing it.");
187 res
= lock
->l_resource
;
188 LASSERT(lock
->l_flags
& LDLM_FL_DESTROYED
);
189 LASSERT(list_empty(&lock
->l_res_link
));
190 LASSERT(list_empty(&lock
->l_pending_chain
));
192 lprocfs_counter_decr(ldlm_res_to_ns(res
)->ns_stats
,
194 lu_ref_del(&res
->lr_reference
, "lock", lock
);
195 ldlm_resource_putref(res
);
196 lock
->l_resource
= NULL
;
197 if (lock
->l_export
) {
198 class_export_lock_put(lock
->l_export
, lock
);
199 lock
->l_export
= NULL
;
202 kfree(lock
->l_lvb_data
);
204 ldlm_interval_free(ldlm_interval_detach(lock
));
205 lu_ref_fini(&lock
->l_reference
);
206 OBD_FREE_RCU(lock
, sizeof(*lock
), &lock
->l_handle
);
209 EXPORT_SYMBOL(ldlm_lock_put
);
212 * Removes LDLM lock \a lock from LRU. Assumes LRU is already locked.
214 int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock
*lock
)
218 if (!list_empty(&lock
->l_lru
)) {
219 struct ldlm_namespace
*ns
= ldlm_lock_to_ns(lock
);
221 LASSERT(lock
->l_resource
->lr_type
!= LDLM_FLOCK
);
222 list_del_init(&lock
->l_lru
);
223 LASSERT(ns
->ns_nr_unused
> 0);
231 * Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
233 int ldlm_lock_remove_from_lru(struct ldlm_lock
*lock
)
235 struct ldlm_namespace
*ns
= ldlm_lock_to_ns(lock
);
238 spin_lock(&ns
->ns_lock
);
239 rc
= ldlm_lock_remove_from_lru_nolock(lock
);
240 spin_unlock(&ns
->ns_lock
);
245 * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
247 static void ldlm_lock_add_to_lru_nolock(struct ldlm_lock
*lock
)
249 struct ldlm_namespace
*ns
= ldlm_lock_to_ns(lock
);
251 lock
->l_last_used
= cfs_time_current();
252 LASSERT(list_empty(&lock
->l_lru
));
253 LASSERT(lock
->l_resource
->lr_type
!= LDLM_FLOCK
);
254 list_add_tail(&lock
->l_lru
, &ns
->ns_unused_list
);
255 if (lock
->l_flags
& LDLM_FL_SKIPPED
)
256 lock
->l_flags
&= ~LDLM_FL_SKIPPED
;
257 LASSERT(ns
->ns_nr_unused
>= 0);
262 * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
265 static void ldlm_lock_add_to_lru(struct ldlm_lock
*lock
)
267 struct ldlm_namespace
*ns
= ldlm_lock_to_ns(lock
);
269 spin_lock(&ns
->ns_lock
);
270 ldlm_lock_add_to_lru_nolock(lock
);
271 spin_unlock(&ns
->ns_lock
);
275 * Moves LDLM lock \a lock that is already in namespace LRU to the tail of
276 * the LRU. Performs necessary LRU locking
278 static void ldlm_lock_touch_in_lru(struct ldlm_lock
*lock
)
280 struct ldlm_namespace
*ns
= ldlm_lock_to_ns(lock
);
282 spin_lock(&ns
->ns_lock
);
283 if (!list_empty(&lock
->l_lru
)) {
284 ldlm_lock_remove_from_lru_nolock(lock
);
285 ldlm_lock_add_to_lru_nolock(lock
);
287 spin_unlock(&ns
->ns_lock
);
291 * Helper to destroy a locked lock.
293 * Used by ldlm_lock_destroy and ldlm_lock_destroy_nolock
294 * Must be called with l_lock and lr_lock held.
296 * Does not actually free the lock data, but rather marks the lock as
297 * destroyed by setting the LDLM_FL_DESTROYED bit in l_flags. Destroys a
298 * handle->lock association too, so that the lock can no longer be found
299 * and removes the lock from LRU list. Actual lock freeing occurs when
300 * last lock reference goes away.
302 * Original comment (of some historical value):
303 * This used to have a 'strict' flag, which recovery would use to mark an
304 * in-use lock as needing-to-die. Lest I am ever tempted to put it back, I
305 * shall explain why it's gone: with the new hash table scheme, once you call
306 * ldlm_lock_destroy, you can never drop your final references on this lock.
307 * Because it's not in the hash table anymore. -phil
309 static int ldlm_lock_destroy_internal(struct ldlm_lock
*lock
)
311 if (lock
->l_readers
|| lock
->l_writers
) {
312 LDLM_ERROR(lock
, "lock still has references");
316 if (!list_empty(&lock
->l_res_link
)) {
317 LDLM_ERROR(lock
, "lock still on resource");
321 if (lock
->l_flags
& LDLM_FL_DESTROYED
) {
322 LASSERT(list_empty(&lock
->l_lru
));
325 lock
->l_flags
|= LDLM_FL_DESTROYED
;
327 if (lock
->l_export
&& lock
->l_export
->exp_lock_hash
) {
328 /* NB: it's safe to call cfs_hash_del() even lock isn't
329 * in exp_lock_hash. */
330 /* In the function below, .hs_keycmp resolves to
331 * ldlm_export_lock_keycmp() */
332 /* coverity[overrun-buffer-val] */
333 cfs_hash_del(lock
->l_export
->exp_lock_hash
,
334 &lock
->l_remote_handle
, &lock
->l_exp_hash
);
337 ldlm_lock_remove_from_lru(lock
);
338 class_handle_unhash(&lock
->l_handle
);
341 /* Wake anyone waiting for this lock */
342 /* FIXME: I should probably add yet another flag, instead of using
343 * l_export to only call this on clients */
345 class_export_put(lock
->l_export
);
346 lock
->l_export
= NULL
;
347 if (lock
->l_export
&& lock
->l_completion_ast
)
348 lock
->l_completion_ast(lock
, 0);
354 * Destroys a LDLM lock \a lock. Performs necessary locking first.
356 static void ldlm_lock_destroy(struct ldlm_lock
*lock
)
360 lock_res_and_lock(lock
);
361 first
= ldlm_lock_destroy_internal(lock
);
362 unlock_res_and_lock(lock
);
364 /* drop reference from hashtable only for first destroy */
366 lu_ref_del(&lock
->l_reference
, "hash", lock
);
367 LDLM_LOCK_RELEASE(lock
);
372 * Destroys a LDLM lock \a lock that is already locked.
374 void ldlm_lock_destroy_nolock(struct ldlm_lock
*lock
)
378 first
= ldlm_lock_destroy_internal(lock
);
379 /* drop reference from hashtable only for first destroy */
381 lu_ref_del(&lock
->l_reference
, "hash", lock
);
382 LDLM_LOCK_RELEASE(lock
);
/*
 * hop_addref callback of lock_handle_ops: takes a reference on the lock
 * behind the opaque handle pointer.
 *
 * this is called by portals_handle2object with the handle lock taken
 */
static void lock_handle_addref(void *lock)
{
	struct ldlm_lock *ldlm = lock;

	LDLM_LOCK_GET(ldlm);
}
392 static void lock_handle_free(void *lock
, int size
)
394 LASSERT(size
== sizeof(struct ldlm_lock
));
395 kmem_cache_free(ldlm_lock_slab
, lock
);
398 static struct portals_handle_ops lock_handle_ops
= {
399 .hop_addref
= lock_handle_addref
,
400 .hop_free
= lock_handle_free
,
405 * Allocate and initialize new lock structure.
407 * usage: pass in a resource on which you have done ldlm_resource_get
408 * new lock will take over the refcount.
409 * returns: lock with refcount 2 - one for current caller and one for remote
411 static struct ldlm_lock
*ldlm_lock_new(struct ldlm_resource
*resource
)
413 struct ldlm_lock
*lock
;
415 if (resource
== NULL
)
418 lock
= kmem_cache_alloc(ldlm_lock_slab
, GFP_NOFS
| __GFP_ZERO
);
422 spin_lock_init(&lock
->l_lock
);
423 lock
->l_resource
= resource
;
424 lu_ref_add(&resource
->lr_reference
, "lock", lock
);
426 atomic_set(&lock
->l_refc
, 2);
427 INIT_LIST_HEAD(&lock
->l_res_link
);
428 INIT_LIST_HEAD(&lock
->l_lru
);
429 INIT_LIST_HEAD(&lock
->l_pending_chain
);
430 INIT_LIST_HEAD(&lock
->l_bl_ast
);
431 INIT_LIST_HEAD(&lock
->l_cp_ast
);
432 INIT_LIST_HEAD(&lock
->l_rk_ast
);
433 init_waitqueue_head(&lock
->l_waitq
);
434 lock
->l_blocking_lock
= NULL
;
435 INIT_LIST_HEAD(&lock
->l_sl_mode
);
436 INIT_LIST_HEAD(&lock
->l_sl_policy
);
437 INIT_HLIST_NODE(&lock
->l_exp_hash
);
438 INIT_HLIST_NODE(&lock
->l_exp_flock_hash
);
440 lprocfs_counter_incr(ldlm_res_to_ns(resource
)->ns_stats
,
442 INIT_LIST_HEAD(&lock
->l_handle
.h_link
);
443 class_handle_hash(&lock
->l_handle
, &lock_handle_ops
);
445 lu_ref_init(&lock
->l_reference
);
446 lu_ref_add(&lock
->l_reference
, "hash", lock
);
447 lock
->l_callback_timeout
= 0;
449 #if LUSTRE_TRACKS_LOCK_EXP_REFS
450 INIT_LIST_HEAD(&lock
->l_exp_refs_link
);
451 lock
->l_exp_refs_nr
= 0;
452 lock
->l_exp_refs_target
= NULL
;
454 INIT_LIST_HEAD(&lock
->l_exp_list
);
460 * Moves LDLM lock \a lock to another resource.
461 * This is used on client when server returns some other lock than requested
462 * (typically as a result of intent operation)
464 int ldlm_lock_change_resource(struct ldlm_namespace
*ns
, struct ldlm_lock
*lock
,
465 const struct ldlm_res_id
*new_resid
)
467 struct ldlm_resource
*oldres
= lock
->l_resource
;
468 struct ldlm_resource
*newres
;
471 lock_res_and_lock(lock
);
472 if (memcmp(new_resid
, &lock
->l_resource
->lr_name
,
473 sizeof(lock
->l_resource
->lr_name
)) == 0) {
475 unlock_res_and_lock(lock
);
479 LASSERT(new_resid
->name
[0] != 0);
481 /* This function assumes that the lock isn't on any lists */
482 LASSERT(list_empty(&lock
->l_res_link
));
484 type
= oldres
->lr_type
;
485 unlock_res_and_lock(lock
);
487 newres
= ldlm_resource_get(ns
, NULL
, new_resid
, type
, 1);
491 lu_ref_add(&newres
->lr_reference
, "lock", lock
);
493 * To flip the lock from the old to the new resource, lock, oldres and
494 * newres have to be locked. Resource spin-locks are nested within
495 * lock->l_lock, and are taken in the memory address order to avoid
498 spin_lock(&lock
->l_lock
);
499 oldres
= lock
->l_resource
;
500 if (oldres
< newres
) {
502 lock_res_nested(newres
, LRT_NEW
);
505 lock_res_nested(oldres
, LRT_NEW
);
507 LASSERT(memcmp(new_resid
, &oldres
->lr_name
,
508 sizeof(oldres
->lr_name
)) != 0);
509 lock
->l_resource
= newres
;
511 unlock_res_and_lock(lock
);
513 /* ...and the flowers are still standing! */
514 lu_ref_del(&oldres
->lr_reference
, "lock", lock
);
515 ldlm_resource_putref(oldres
);
519 EXPORT_SYMBOL(ldlm_lock_change_resource
);
521 /** \defgroup ldlm_handles LDLM HANDLES
522 * Ways to get hold of locks without any addresses.
527 * Fills in handle for LDLM lock \a lock into supplied \a lockh
528 * Does not take any references.
530 void ldlm_lock2handle(const struct ldlm_lock
*lock
, struct lustre_handle
*lockh
)
532 lockh
->cookie
= lock
->l_handle
.h_cookie
;
534 EXPORT_SYMBOL(ldlm_lock2handle
);
537 * Obtain a lock reference by handle.
539 * if \a flags: atomically get the lock and set the flags.
540 * Return NULL if flag already set
542 struct ldlm_lock
*__ldlm_handle2lock(const struct lustre_handle
*handle
,
545 struct ldlm_lock
*lock
;
549 lock
= class_handle2object(handle
->cookie
);
553 /* It's unlikely but possible that someone marked the lock as
554 * destroyed after we did handle2object on it */
555 if (flags
== 0 && ((lock
->l_flags
& LDLM_FL_DESTROYED
) == 0)) {
556 lu_ref_add(&lock
->l_reference
, "handle", current
);
560 lock_res_and_lock(lock
);
562 LASSERT(lock
->l_resource
!= NULL
);
564 lu_ref_add_atomic(&lock
->l_reference
, "handle", current
);
565 if (unlikely(lock
->l_flags
& LDLM_FL_DESTROYED
)) {
566 unlock_res_and_lock(lock
);
567 CDEBUG(D_INFO
, "lock already destroyed: lock %p\n", lock
);
572 if (flags
&& (lock
->l_flags
& flags
)) {
573 unlock_res_and_lock(lock
);
579 lock
->l_flags
|= flags
;
581 unlock_res_and_lock(lock
);
584 EXPORT_SYMBOL(__ldlm_handle2lock
);
585 /** @} ldlm_handles */
588 * Fill in "on the wire" representation for given LDLM lock into supplied
589 * lock descriptor \a desc structure.
591 void ldlm_lock2desc(struct ldlm_lock
*lock
, struct ldlm_lock_desc
*desc
)
593 ldlm_res2desc(lock
->l_resource
, &desc
->l_resource
);
594 desc
->l_req_mode
= lock
->l_req_mode
;
595 desc
->l_granted_mode
= lock
->l_granted_mode
;
596 ldlm_convert_policy_to_wire(lock
->l_resource
->lr_type
,
597 &lock
->l_policy_data
,
598 &desc
->l_policy_data
);
600 EXPORT_SYMBOL(ldlm_lock2desc
);
603 * Add a lock to list of conflicting locks to send AST to.
605 * Only add if we have not sent a blocking AST to the lock yet.
607 static void ldlm_add_bl_work_item(struct ldlm_lock
*lock
, struct ldlm_lock
*new,
608 struct list_head
*work_list
)
610 if ((lock
->l_flags
& LDLM_FL_AST_SENT
) == 0) {
611 LDLM_DEBUG(lock
, "lock incompatible; sending blocking AST.");
612 lock
->l_flags
|= LDLM_FL_AST_SENT
;
613 /* If the enqueuing client said so, tell the AST recipient to
614 * discard dirty data, rather than writing back. */
615 if (new->l_flags
& LDLM_FL_AST_DISCARD_DATA
)
616 lock
->l_flags
|= LDLM_FL_DISCARD_DATA
;
617 LASSERT(list_empty(&lock
->l_bl_ast
));
618 list_add(&lock
->l_bl_ast
, work_list
);
620 LASSERT(lock
->l_blocking_lock
== NULL
);
621 lock
->l_blocking_lock
= LDLM_LOCK_GET(new);
626 * Add a lock to list of just granted locks to send completion AST to.
628 static void ldlm_add_cp_work_item(struct ldlm_lock
*lock
,
629 struct list_head
*work_list
)
631 if ((lock
->l_flags
& LDLM_FL_CP_REQD
) == 0) {
632 lock
->l_flags
|= LDLM_FL_CP_REQD
;
633 LDLM_DEBUG(lock
, "lock granted; sending completion AST.");
634 LASSERT(list_empty(&lock
->l_cp_ast
));
635 list_add(&lock
->l_cp_ast
, work_list
);
641 * Aggregator function to add AST work items into a list. Determines
642 * what sort of an AST work needs to be done and calls the proper
644 * Must be called with lr_lock held.
646 static void ldlm_add_ast_work_item(struct ldlm_lock
*lock
,
647 struct ldlm_lock
*new,
648 struct list_head
*work_list
)
650 check_res_locked(lock
->l_resource
);
652 ldlm_add_bl_work_item(lock
, new, work_list
);
654 ldlm_add_cp_work_item(lock
, work_list
);
658 * Add specified reader/writer reference to LDLM lock with handle \a lockh.
659 * r/w reference type is determined by \a mode
660 * Calls ldlm_lock_addref_internal.
662 void ldlm_lock_addref(struct lustre_handle
*lockh
, __u32 mode
)
664 struct ldlm_lock
*lock
;
666 lock
= ldlm_handle2lock(lockh
);
667 LASSERT(lock
!= NULL
);
668 ldlm_lock_addref_internal(lock
, mode
);
671 EXPORT_SYMBOL(ldlm_lock_addref
);
675 * Add specified reader/writer reference to LDLM lock \a lock.
676 * r/w reference type is determined by \a mode
677 * Removes lock from LRU if it is there.
678 * Assumes the LDLM lock is already locked.
680 void ldlm_lock_addref_internal_nolock(struct ldlm_lock
*lock
, __u32 mode
)
682 ldlm_lock_remove_from_lru(lock
);
683 if (mode
& (LCK_NL
| LCK_CR
| LCK_PR
)) {
685 lu_ref_add_atomic(&lock
->l_reference
, "reader", lock
);
687 if (mode
& (LCK_EX
| LCK_CW
| LCK_PW
| LCK_GROUP
| LCK_COS
)) {
689 lu_ref_add_atomic(&lock
->l_reference
, "writer", lock
);
692 lu_ref_add_atomic(&lock
->l_reference
, "user", lock
);
693 LDLM_DEBUG(lock
, "ldlm_lock_addref(%s)", ldlm_lockname
[mode
]);
697 * Attempts to add reader/writer reference to a lock with handle \a lockh, and
698 * fails if lock is already LDLM_FL_CBPENDING or destroyed.
700 * \retval 0 success, lock was addref-ed
702 * \retval -EAGAIN lock is being canceled.
704 int ldlm_lock_addref_try(struct lustre_handle
*lockh
, __u32 mode
)
706 struct ldlm_lock
*lock
;
710 lock
= ldlm_handle2lock(lockh
);
712 lock_res_and_lock(lock
);
713 if (lock
->l_readers
!= 0 || lock
->l_writers
!= 0 ||
714 !(lock
->l_flags
& LDLM_FL_CBPENDING
)) {
715 ldlm_lock_addref_internal_nolock(lock
, mode
);
718 unlock_res_and_lock(lock
);
723 EXPORT_SYMBOL(ldlm_lock_addref_try
);
726 * Add specified reader/writer reference to LDLM lock \a lock.
727 * Locks LDLM lock and calls ldlm_lock_addref_internal_nolock to do the work.
728 * Only called for local locks.
730 void ldlm_lock_addref_internal(struct ldlm_lock
*lock
, __u32 mode
)
732 lock_res_and_lock(lock
);
733 ldlm_lock_addref_internal_nolock(lock
, mode
);
734 unlock_res_and_lock(lock
);
738 * Removes reader/writer reference for LDLM lock \a lock.
739 * Assumes LDLM lock is already locked.
740 * only called in ldlm_flock_destroy and for local locks.
741 * Does NOT add lock to LRU if no r/w references left to accommodate flock locks
742 * that cannot be placed in LRU.
744 void ldlm_lock_decref_internal_nolock(struct ldlm_lock
*lock
, __u32 mode
)
746 LDLM_DEBUG(lock
, "ldlm_lock_decref(%s)", ldlm_lockname
[mode
]);
747 if (mode
& (LCK_NL
| LCK_CR
| LCK_PR
)) {
748 LASSERT(lock
->l_readers
> 0);
749 lu_ref_del(&lock
->l_reference
, "reader", lock
);
752 if (mode
& (LCK_EX
| LCK_CW
| LCK_PW
| LCK_GROUP
| LCK_COS
)) {
753 LASSERT(lock
->l_writers
> 0);
754 lu_ref_del(&lock
->l_reference
, "writer", lock
);
758 lu_ref_del(&lock
->l_reference
, "user", lock
);
759 LDLM_LOCK_RELEASE(lock
); /* matches the LDLM_LOCK_GET() in addref */
763 * Removes reader/writer reference for LDLM lock \a lock.
764 * Locks LDLM lock first.
765 * If the lock is determined to be client lock on a client and r/w refcount
766 * drops to zero and the lock is not blocked, the lock is added to LRU lock
768 * For blocked LDLM locks if r/w count drops to zero, blocking_ast is called.
770 void ldlm_lock_decref_internal(struct ldlm_lock
*lock
, __u32 mode
)
772 struct ldlm_namespace
*ns
;
774 lock_res_and_lock(lock
);
776 ns
= ldlm_lock_to_ns(lock
);
778 ldlm_lock_decref_internal_nolock(lock
, mode
);
780 if (lock
->l_flags
& LDLM_FL_LOCAL
&&
781 !lock
->l_readers
&& !lock
->l_writers
) {
782 /* If this is a local lock on a server namespace and this was
783 * the last reference, cancel the lock. */
784 CDEBUG(D_INFO
, "forcing cancel of local lock\n");
785 lock
->l_flags
|= LDLM_FL_CBPENDING
;
788 if (!lock
->l_readers
&& !lock
->l_writers
&&
789 (lock
->l_flags
& LDLM_FL_CBPENDING
)) {
790 /* If we received a blocked AST and this was the last reference,
791 * run the callback. */
793 LDLM_DEBUG(lock
, "final decref done on cbpending lock");
795 LDLM_LOCK_GET(lock
); /* dropped by bl thread */
796 ldlm_lock_remove_from_lru(lock
);
797 unlock_res_and_lock(lock
);
799 if (lock
->l_flags
& LDLM_FL_FAIL_LOC
)
800 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE
);
802 if ((lock
->l_flags
& LDLM_FL_ATOMIC_CB
) ||
803 ldlm_bl_to_thread_lock(ns
, NULL
, lock
) != 0)
804 ldlm_handle_bl_callback(ns
, NULL
, lock
);
805 } else if (!lock
->l_readers
&& !lock
->l_writers
&&
806 !(lock
->l_flags
& LDLM_FL_NO_LRU
) &&
807 !(lock
->l_flags
& LDLM_FL_BL_AST
)) {
809 LDLM_DEBUG(lock
, "add lock into lru list");
811 /* If this is a client-side namespace and this was the last
812 * reference, put it on the LRU. */
813 ldlm_lock_add_to_lru(lock
);
814 unlock_res_and_lock(lock
);
816 if (lock
->l_flags
& LDLM_FL_FAIL_LOC
)
817 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE
);
819 /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
820 * are not supported by the server, otherwise, it is done on
822 if (!exp_connect_cancelset(lock
->l_conn_export
) &&
823 !ns_connect_lru_resize(ns
))
824 ldlm_cancel_lru(ns
, 0, LCF_ASYNC
, 0);
826 LDLM_DEBUG(lock
, "do not add lock into lru list");
827 unlock_res_and_lock(lock
);
832 * Decrease reader/writer refcount for LDLM lock with handle \a lockh
834 void ldlm_lock_decref(struct lustre_handle
*lockh
, __u32 mode
)
836 struct ldlm_lock
*lock
= __ldlm_handle2lock(lockh
, 0);
838 LASSERTF(lock
!= NULL
, "Non-existing lock: %#llx\n", lockh
->cookie
);
839 ldlm_lock_decref_internal(lock
, mode
);
842 EXPORT_SYMBOL(ldlm_lock_decref
);
845 * Decrease reader/writer refcount for LDLM lock with handle
846 * \a lockh and mark it for subsequent cancellation once r/w refcount
847 * drops to zero instead of putting into LRU.
849 * Typical usage is for GROUP locks which we cannot allow to be cached.
851 void ldlm_lock_decref_and_cancel(struct lustre_handle
*lockh
, __u32 mode
)
853 struct ldlm_lock
*lock
= __ldlm_handle2lock(lockh
, 0);
855 LASSERT(lock
!= NULL
);
857 LDLM_DEBUG(lock
, "ldlm_lock_decref(%s)", ldlm_lockname
[mode
]);
858 lock_res_and_lock(lock
);
859 lock
->l_flags
|= LDLM_FL_CBPENDING
;
860 unlock_res_and_lock(lock
);
861 ldlm_lock_decref_internal(lock
, mode
);
864 EXPORT_SYMBOL(ldlm_lock_decref_and_cancel
);
/*
 * Insertion position inside a granted list, as three list heads: the new
 * lock's matching list node is added right after each of them.
 */
struct sl_insert_point {
	/* position for the lock's l_res_link */
	struct list_head *res_link;
	/* position for the lock's l_sl_mode */
	struct list_head *mode_link;
	/* position for the lock's l_sl_policy */
	struct list_head *policy_link;
};
873 * Finds a position to insert the new lock into granted lock list.
875 * Used for locks eligible for skiplist optimization.
878 * queue [input]: the granted list where search acts on;
879 * req [input]: the lock whose position to be located;
880 * prev [output]: positions within 3 lists to insert @req to
884 * - ldlm_grant_lock_with_skiplist
886 static void search_granted_lock(struct list_head
*queue
,
887 struct ldlm_lock
*req
,
888 struct sl_insert_point
*prev
)
890 struct list_head
*tmp
;
891 struct ldlm_lock
*lock
, *mode_end
, *policy_end
;
893 list_for_each(tmp
, queue
) {
894 lock
= list_entry(tmp
, struct ldlm_lock
, l_res_link
);
896 mode_end
= list_entry(lock
->l_sl_mode
.prev
,
897 struct ldlm_lock
, l_sl_mode
);
899 if (lock
->l_req_mode
!= req
->l_req_mode
) {
900 /* jump to last lock of mode group */
901 tmp
= &mode_end
->l_res_link
;
905 /* suitable mode group is found */
906 if (lock
->l_resource
->lr_type
== LDLM_PLAIN
) {
907 /* insert point is last lock of the mode group */
908 prev
->res_link
= &mode_end
->l_res_link
;
909 prev
->mode_link
= &mode_end
->l_sl_mode
;
910 prev
->policy_link
= &req
->l_sl_policy
;
914 if (lock
->l_resource
->lr_type
== LDLM_IBITS
) {
917 list_entry(lock
->l_sl_policy
.prev
,
921 if (lock
->l_policy_data
.l_inodebits
.bits
==
922 req
->l_policy_data
.l_inodebits
.bits
) {
923 /* insert point is last lock of
924 * the policy group */
926 &policy_end
->l_res_link
;
928 &policy_end
->l_sl_mode
;
930 &policy_end
->l_sl_policy
;
934 if (policy_end
== mode_end
)
935 /* done with mode group */
938 /* go to next policy group within mode group */
939 tmp
= policy_end
->l_res_link
.next
;
940 lock
= list_entry(tmp
, struct ldlm_lock
,
942 } /* loop over policy groups within the mode group */
944 /* insert point is last lock of the mode group,
945 * new policy group is started */
946 prev
->res_link
= &mode_end
->l_res_link
;
947 prev
->mode_link
= &mode_end
->l_sl_mode
;
948 prev
->policy_link
= &req
->l_sl_policy
;
952 LDLM_ERROR(lock
, "is not LDLM_PLAIN or LDLM_IBITS lock");
956 /* insert point is last lock on the queue,
957 * new mode group and new policy group are started */
958 prev
->res_link
= queue
->prev
;
959 prev
->mode_link
= &req
->l_sl_mode
;
960 prev
->policy_link
= &req
->l_sl_policy
;
964 * Add a lock into resource granted list after a position described by
967 static void ldlm_granted_list_add_lock(struct ldlm_lock
*lock
,
968 struct sl_insert_point
*prev
)
970 struct ldlm_resource
*res
= lock
->l_resource
;
972 check_res_locked(res
);
974 ldlm_resource_dump(D_INFO
, res
);
975 LDLM_DEBUG(lock
, "About to add lock:");
977 if (lock
->l_flags
& LDLM_FL_DESTROYED
) {
978 CDEBUG(D_OTHER
, "Lock destroyed, not adding to resource\n");
982 LASSERT(list_empty(&lock
->l_res_link
));
983 LASSERT(list_empty(&lock
->l_sl_mode
));
984 LASSERT(list_empty(&lock
->l_sl_policy
));
987 * lock->link == prev->link means lock is first starting the group.
988 * Don't re-add to itself to suppress kernel warnings.
990 if (&lock
->l_res_link
!= prev
->res_link
)
991 list_add(&lock
->l_res_link
, prev
->res_link
);
992 if (&lock
->l_sl_mode
!= prev
->mode_link
)
993 list_add(&lock
->l_sl_mode
, prev
->mode_link
);
994 if (&lock
->l_sl_policy
!= prev
->policy_link
)
995 list_add(&lock
->l_sl_policy
, prev
->policy_link
);
999 * Add a lock to granted list on a resource maintaining skiplist
1002 static void ldlm_grant_lock_with_skiplist(struct ldlm_lock
*lock
)
1004 struct sl_insert_point prev
;
1006 LASSERT(lock
->l_req_mode
== lock
->l_granted_mode
);
1008 search_granted_lock(&lock
->l_resource
->lr_granted
, lock
, &prev
);
1009 ldlm_granted_list_add_lock(lock
, &prev
);
1013 * Perform lock granting bookkeeping.
1015 * Includes putting the lock into granted list and updating lock mode.
1017 * - ldlm_lock_enqueue
1018 * - ldlm_reprocess_queue
1019 * - ldlm_lock_convert
1021 * must be called with lr_lock held
1023 void ldlm_grant_lock(struct ldlm_lock
*lock
, struct list_head
*work_list
)
1025 struct ldlm_resource
*res
= lock
->l_resource
;
1027 check_res_locked(res
);
1029 lock
->l_granted_mode
= lock
->l_req_mode
;
1030 if (res
->lr_type
== LDLM_PLAIN
|| res
->lr_type
== LDLM_IBITS
)
1031 ldlm_grant_lock_with_skiplist(lock
);
1032 else if (res
->lr_type
== LDLM_EXTENT
)
1033 ldlm_extent_add_lock(res
, lock
);
1035 ldlm_resource_add_lock(res
, &res
->lr_granted
, lock
);
1037 if (lock
->l_granted_mode
< res
->lr_most_restr
)
1038 res
->lr_most_restr
= lock
->l_granted_mode
;
1040 if (work_list
&& lock
->l_completion_ast
!= NULL
)
1041 ldlm_add_ast_work_item(lock
, NULL
, work_list
);
1043 ldlm_pool_add(&ldlm_res_to_ns(res
)->ns_pool
, lock
);
1047 * Search for a lock with given properties in a queue.
1049 * \retval a referenced lock or NULL. See the flag descriptions below, in the
1050 * comment above ldlm_lock_match
1052 static struct ldlm_lock
*search_queue(struct list_head
*queue
,
1054 ldlm_policy_data_t
*policy
,
1055 struct ldlm_lock
*old_lock
,
1056 __u64 flags
, int unref
)
1058 struct ldlm_lock
*lock
;
1059 struct list_head
*tmp
;
1061 list_for_each(tmp
, queue
) {
1064 lock
= list_entry(tmp
, struct ldlm_lock
, l_res_link
);
1066 if (lock
== old_lock
)
1069 /* Check if this lock can be matched.
1070 * Used by LU-2919(exclusive open) for open lease lock */
1071 if (ldlm_is_excl(lock
))
1074 /* llite sometimes wants to match locks that will be
1075 * canceled when their users drop, but we allow it to match
1076 * if it passes in CBPENDING and the lock still has users.
1077 * this is generally only going to be used by children
1078 * whose parents already hold a lock so forward progress
1079 * can still happen. */
1080 if (lock
->l_flags
& LDLM_FL_CBPENDING
&&
1081 !(flags
& LDLM_FL_CBPENDING
))
1083 if (!unref
&& lock
->l_flags
& LDLM_FL_CBPENDING
&&
1084 lock
->l_readers
== 0 && lock
->l_writers
== 0)
1087 if (!(lock
->l_req_mode
& *mode
))
1089 match
= lock
->l_req_mode
;
1091 if (lock
->l_resource
->lr_type
== LDLM_EXTENT
&&
1092 (lock
->l_policy_data
.l_extent
.start
>
1093 policy
->l_extent
.start
||
1094 lock
->l_policy_data
.l_extent
.end
< policy
->l_extent
.end
))
1097 if (unlikely(match
== LCK_GROUP
) &&
1098 lock
->l_resource
->lr_type
== LDLM_EXTENT
&&
1099 lock
->l_policy_data
.l_extent
.gid
!= policy
->l_extent
.gid
)
1102 /* We match if we have existing lock with same or wider set
1104 if (lock
->l_resource
->lr_type
== LDLM_IBITS
&&
1105 ((lock
->l_policy_data
.l_inodebits
.bits
&
1106 policy
->l_inodebits
.bits
) !=
1107 policy
->l_inodebits
.bits
))
1110 if (!unref
&& (lock
->l_flags
& LDLM_FL_GONE_MASK
))
1113 if ((flags
& LDLM_FL_LOCAL_ONLY
) &&
1114 !(lock
->l_flags
& LDLM_FL_LOCAL
))
1117 if (flags
& LDLM_FL_TEST_LOCK
) {
1118 LDLM_LOCK_GET(lock
);
1119 ldlm_lock_touch_in_lru(lock
);
1121 ldlm_lock_addref_internal_nolock(lock
, match
);
1130 void ldlm_lock_fail_match_locked(struct ldlm_lock
*lock
)
1132 if ((lock
->l_flags
& LDLM_FL_FAIL_NOTIFIED
) == 0) {
1133 lock
->l_flags
|= LDLM_FL_FAIL_NOTIFIED
;
1134 wake_up_all(&lock
->l_waitq
);
1137 EXPORT_SYMBOL(ldlm_lock_fail_match_locked
);
1140 * Mark lock as "matchable" by OST.
1142 * Used to prevent certain races in LOV/OSC where the lock is granted, but LVB
1144 * Assumes LDLM lock is already locked.
1146 void ldlm_lock_allow_match_locked(struct ldlm_lock
*lock
)
1148 lock
->l_flags
|= LDLM_FL_LVB_READY
;
1149 wake_up_all(&lock
->l_waitq
);
1151 EXPORT_SYMBOL(ldlm_lock_allow_match_locked
);
/**
 * Mark lock as "matchable" by OST.
 * Locks the lock and then \see ldlm_lock_allow_match_locked
 */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
	lock_res_and_lock(lock);
	ldlm_lock_allow_match_locked(lock);
	unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);
1166 * Attempt to find a lock with specified properties.
1168 * Typically returns a reference to matched lock unless LDLM_FL_TEST_LOCK is
1171 * Can be called in two ways:
1173 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
1174 * for a duplicate of.
1176 * Otherwise, all of the fields must be filled in, to match against.
1178 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
1179 * server (ie, connh is NULL)
1180 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
1181 * list will be considered
1182 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
1183 * to be canceled can still be matched as long as they still have reader
1184 * or writer referneces
1185 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
1186 * just tell us if we would have matched.
1188 * \retval 1 if it finds an already-existing lock that is compatible; in this
1189 * case, lockh is filled in with a addref()ed lock
1191 * We also check security context, and if that fails we simply return 0 (to
1192 * keep caller code unchanged), the context failure will be discovered by
1193 * caller sometime later.
1195 ldlm_mode_t
ldlm_lock_match(struct ldlm_namespace
*ns
, __u64 flags
,
1196 const struct ldlm_res_id
*res_id
, ldlm_type_t type
,
1197 ldlm_policy_data_t
*policy
, ldlm_mode_t mode
,
1198 struct lustre_handle
*lockh
, int unref
)
1200 struct ldlm_resource
*res
;
1201 struct ldlm_lock
*lock
, *old_lock
= NULL
;
1205 old_lock
= ldlm_handle2lock(lockh
);
1208 ns
= ldlm_lock_to_ns(old_lock
);
1209 res_id
= &old_lock
->l_resource
->lr_name
;
1210 type
= old_lock
->l_resource
->lr_type
;
1211 mode
= old_lock
->l_req_mode
;
1214 res
= ldlm_resource_get(ns
, NULL
, res_id
, type
, 0);
1216 LASSERT(old_lock
== NULL
);
1220 LDLM_RESOURCE_ADDREF(res
);
1223 lock
= search_queue(&res
->lr_granted
, &mode
, policy
, old_lock
,
1229 if (flags
& LDLM_FL_BLOCK_GRANTED
) {
1233 lock
= search_queue(&res
->lr_waiting
, &mode
, policy
, old_lock
,
1242 LDLM_RESOURCE_DELREF(res
);
1243 ldlm_resource_putref(res
);
1246 ldlm_lock2handle(lock
, lockh
);
1247 if ((flags
& LDLM_FL_LVB_READY
) &&
1248 (!(lock
->l_flags
& LDLM_FL_LVB_READY
))) {
1249 __u64 wait_flags
= LDLM_FL_LVB_READY
|
1250 LDLM_FL_DESTROYED
| LDLM_FL_FAIL_NOTIFIED
;
1251 struct l_wait_info lwi
;
1253 if (lock
->l_completion_ast
) {
1254 int err
= lock
->l_completion_ast(lock
,
1255 LDLM_FL_WAIT_NOREPROC
,
1258 if (flags
& LDLM_FL_TEST_LOCK
)
1259 LDLM_LOCK_RELEASE(lock
);
1261 ldlm_lock_decref_internal(lock
,
1268 lwi
= LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout
),
1269 NULL
, LWI_ON_SIGNAL_NOOP
, NULL
);
1271 /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
1272 l_wait_event(lock
->l_waitq
,
1273 lock
->l_flags
& wait_flags
,
1275 if (!(lock
->l_flags
& LDLM_FL_LVB_READY
)) {
1276 if (flags
& LDLM_FL_TEST_LOCK
)
1277 LDLM_LOCK_RELEASE(lock
);
1279 ldlm_lock_decref_internal(lock
, mode
);
1286 LDLM_DEBUG(lock
, "matched (%llu %llu)",
1287 (type
== LDLM_PLAIN
|| type
== LDLM_IBITS
) ?
1288 res_id
->name
[2] : policy
->l_extent
.start
,
1289 (type
== LDLM_PLAIN
|| type
== LDLM_IBITS
) ?
1290 res_id
->name
[3] : policy
->l_extent
.end
);
1292 /* check user's security context */
1293 if (lock
->l_conn_export
&&
1294 sptlrpc_import_check_ctx(
1295 class_exp2cliimp(lock
->l_conn_export
))) {
1296 if (!(flags
& LDLM_FL_TEST_LOCK
))
1297 ldlm_lock_decref_internal(lock
, mode
);
1301 if (flags
& LDLM_FL_TEST_LOCK
)
1302 LDLM_LOCK_RELEASE(lock
);
1304 } else if (!(flags
& LDLM_FL_TEST_LOCK
)) {/*less verbose for test-only*/
1305 LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res %llu/%llu (%llu %llu)",
1306 ns
, type
, mode
, res_id
->name
[0],
1308 (type
== LDLM_PLAIN
|| type
== LDLM_IBITS
) ?
1309 res_id
->name
[2] : policy
->l_extent
.start
,
1310 (type
== LDLM_PLAIN
|| type
== LDLM_IBITS
) ?
1311 res_id
->name
[3] : policy
->l_extent
.end
);
1314 LDLM_LOCK_PUT(old_lock
);
1316 return rc
? mode
: 0;
1318 EXPORT_SYMBOL(ldlm_lock_match
);
1320 ldlm_mode_t
ldlm_revalidate_lock_handle(struct lustre_handle
*lockh
,
1323 struct ldlm_lock
*lock
;
1324 ldlm_mode_t mode
= 0;
1326 lock
= ldlm_handle2lock(lockh
);
1328 lock_res_and_lock(lock
);
1329 if (lock
->l_flags
& LDLM_FL_GONE_MASK
)
1332 if (lock
->l_flags
& LDLM_FL_CBPENDING
&&
1333 lock
->l_readers
== 0 && lock
->l_writers
== 0)
1337 *bits
= lock
->l_policy_data
.l_inodebits
.bits
;
1338 mode
= lock
->l_granted_mode
;
1339 ldlm_lock_addref_internal_nolock(lock
, mode
);
1344 unlock_res_and_lock(lock
);
1345 LDLM_LOCK_PUT(lock
);
1349 EXPORT_SYMBOL(ldlm_revalidate_lock_handle
);
1351 /** The caller must guarantee that the buffer is large enough. */
1352 int ldlm_fill_lvb(struct ldlm_lock
*lock
, struct req_capsule
*pill
,
1353 enum req_location loc
, void *data
, int size
)
1357 LASSERT(data
!= NULL
);
1360 switch (lock
->l_lvb_type
) {
1362 if (size
== sizeof(struct ost_lvb
)) {
1363 if (loc
== RCL_CLIENT
)
1364 lvb
= req_capsule_client_swab_get(pill
,
1366 lustre_swab_ost_lvb
);
1368 lvb
= req_capsule_server_swab_get(pill
,
1370 lustre_swab_ost_lvb
);
1371 if (unlikely(lvb
== NULL
)) {
1372 LDLM_ERROR(lock
, "no LVB");
1376 memcpy(data
, lvb
, size
);
1377 } else if (size
== sizeof(struct ost_lvb_v1
)) {
1378 struct ost_lvb
*olvb
= data
;
1380 if (loc
== RCL_CLIENT
)
1381 lvb
= req_capsule_client_swab_get(pill
,
1383 lustre_swab_ost_lvb_v1
);
1385 lvb
= req_capsule_server_sized_swab_get(pill
,
1387 lustre_swab_ost_lvb_v1
);
1388 if (unlikely(lvb
== NULL
)) {
1389 LDLM_ERROR(lock
, "no LVB");
1393 memcpy(data
, lvb
, size
);
1394 olvb
->lvb_mtime_ns
= 0;
1395 olvb
->lvb_atime_ns
= 0;
1396 olvb
->lvb_ctime_ns
= 0;
1398 LDLM_ERROR(lock
, "Replied unexpected ost LVB size %d",
1404 if (size
== sizeof(struct lquota_lvb
)) {
1405 if (loc
== RCL_CLIENT
)
1406 lvb
= req_capsule_client_swab_get(pill
,
1408 lustre_swab_lquota_lvb
);
1410 lvb
= req_capsule_server_swab_get(pill
,
1412 lustre_swab_lquota_lvb
);
1413 if (unlikely(lvb
== NULL
)) {
1414 LDLM_ERROR(lock
, "no LVB");
1418 memcpy(data
, lvb
, size
);
1421 "Replied unexpected lquota LVB size %d",
1430 if (loc
== RCL_CLIENT
)
1431 lvb
= req_capsule_client_get(pill
, &RMF_DLM_LVB
);
1433 lvb
= req_capsule_server_get(pill
, &RMF_DLM_LVB
);
1434 if (unlikely(lvb
== NULL
)) {
1435 LDLM_ERROR(lock
, "no LVB");
1439 memcpy(data
, lvb
, size
);
1442 LDLM_ERROR(lock
, "Unknown LVB type: %d\n", lock
->l_lvb_type
);
1451 * Create and fill in new LDLM lock with specified properties.
1452 * Returns a referenced lock
1454 struct ldlm_lock
*ldlm_lock_create(struct ldlm_namespace
*ns
,
1455 const struct ldlm_res_id
*res_id
,
1458 const struct ldlm_callback_suite
*cbs
,
1459 void *data
, __u32 lvb_len
,
1460 enum lvb_type lvb_type
)
1462 struct ldlm_lock
*lock
;
1463 struct ldlm_resource
*res
;
1465 res
= ldlm_resource_get(ns
, NULL
, res_id
, type
, 1);
1469 lock
= ldlm_lock_new(res
);
1474 lock
->l_req_mode
= mode
;
1475 lock
->l_ast_data
= data
;
1476 lock
->l_pid
= current_pid();
1478 lock
->l_blocking_ast
= cbs
->lcs_blocking
;
1479 lock
->l_completion_ast
= cbs
->lcs_completion
;
1480 lock
->l_glimpse_ast
= cbs
->lcs_glimpse
;
1483 lock
->l_tree_node
= NULL
;
1484 /* if this is the extent lock, allocate the interval tree node */
1485 if (type
== LDLM_EXTENT
) {
1486 if (ldlm_interval_alloc(lock
) == NULL
)
1491 lock
->l_lvb_len
= lvb_len
;
1492 lock
->l_lvb_data
= kzalloc(lvb_len
, GFP_NOFS
);
1493 if (!lock
->l_lvb_data
)
1497 lock
->l_lvb_type
= lvb_type
;
1498 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK
))
1504 ldlm_lock_destroy(lock
);
1505 LDLM_LOCK_RELEASE(lock
);
1510 * Enqueue (request) a lock.
1511 * On the client this is called from ldlm_cli_enqueue_fini
1512 * after we already got an initial reply from the server with some status.
1514 * Does not block. As a result of enqueue the lock would be put
1515 * into granted or waiting list.
1517 ldlm_error_t
ldlm_lock_enqueue(struct ldlm_namespace
*ns
,
1518 struct ldlm_lock
**lockp
,
1519 void *cookie
, __u64
*flags
)
1521 struct ldlm_lock
*lock
= *lockp
;
1522 struct ldlm_resource
*res
= lock
->l_resource
;
1524 lock
->l_last_activity
= ktime_get_real_seconds();
1526 lock_res_and_lock(lock
);
1527 if (lock
->l_req_mode
== lock
->l_granted_mode
) {
1528 /* The server returned a blocked lock, but it was granted
1529 * before we got a chance to actually enqueue it. We don't
1530 * need to do anything else. */
1531 *flags
&= ~(LDLM_FL_BLOCK_GRANTED
|
1532 LDLM_FL_BLOCK_CONV
| LDLM_FL_BLOCK_WAIT
);
1536 ldlm_resource_unlink_lock(lock
);
1538 /* Cannot happen unless on the server */
1539 if (res
->lr_type
== LDLM_EXTENT
&& !lock
->l_tree_node
)
1542 /* Some flags from the enqueue want to make it into the AST, via the
1543 * lock's l_flags. */
1544 lock
->l_flags
|= *flags
& LDLM_FL_AST_DISCARD_DATA
;
1547 * This distinction between local lock trees is very important; a client
1548 * namespace only has information about locks taken by that client, and
1549 * thus doesn't have enough information to decide for itself if it can
1550 * be granted (below). In this case, we do exactly what the server
1551 * tells us to do, as dictated by the 'flags'.
1553 if (*flags
& (LDLM_FL_BLOCK_WAIT
| LDLM_FL_BLOCK_GRANTED
))
1554 ldlm_resource_add_lock(res
, &res
->lr_waiting
, lock
);
1556 ldlm_grant_lock(lock
, NULL
);
1559 unlock_res_and_lock(lock
);
1564 * Process a call to blocking AST callback for a lock in ast_work list
1567 ldlm_work_bl_ast_lock(struct ptlrpc_request_set
*rqset
, void *opaq
)
1569 struct ldlm_cb_set_arg
*arg
= opaq
;
1570 struct ldlm_lock_desc d
;
1572 struct ldlm_lock
*lock
;
1574 if (list_empty(arg
->list
))
1577 lock
= list_entry(arg
->list
->next
, struct ldlm_lock
, l_bl_ast
);
1579 /* nobody should touch l_bl_ast */
1580 lock_res_and_lock(lock
);
1581 list_del_init(&lock
->l_bl_ast
);
1583 LASSERT(lock
->l_flags
& LDLM_FL_AST_SENT
);
1584 LASSERT(lock
->l_bl_ast_run
== 0);
1585 LASSERT(lock
->l_blocking_lock
);
1586 lock
->l_bl_ast_run
++;
1587 unlock_res_and_lock(lock
);
1589 ldlm_lock2desc(lock
->l_blocking_lock
, &d
);
1591 rc
= lock
->l_blocking_ast(lock
, &d
, (void *)arg
, LDLM_CB_BLOCKING
);
1592 LDLM_LOCK_RELEASE(lock
->l_blocking_lock
);
1593 lock
->l_blocking_lock
= NULL
;
1594 LDLM_LOCK_RELEASE(lock
);
1600 * Process a call to completion AST callback for a lock in ast_work list
1603 ldlm_work_cp_ast_lock(struct ptlrpc_request_set
*rqset
, void *opaq
)
1605 struct ldlm_cb_set_arg
*arg
= opaq
;
1607 struct ldlm_lock
*lock
;
1608 ldlm_completion_callback completion_callback
;
1610 if (list_empty(arg
->list
))
1613 lock
= list_entry(arg
->list
->next
, struct ldlm_lock
, l_cp_ast
);
1615 /* It's possible to receive a completion AST before we've set
1616 * the l_completion_ast pointer: either because the AST arrived
1617 * before the reply, or simply because there's a small race
1618 * window between receiving the reply and finishing the local
1619 * enqueue. (bug 842)
1621 * This can't happen with the blocking_ast, however, because we
1622 * will never call the local blocking_ast until we drop our
1623 * reader/writer reference, which we won't do until we get the
1624 * reply and finish enqueueing. */
1626 /* nobody should touch l_cp_ast */
1627 lock_res_and_lock(lock
);
1628 list_del_init(&lock
->l_cp_ast
);
1629 LASSERT(lock
->l_flags
& LDLM_FL_CP_REQD
);
1630 /* save l_completion_ast since it can be changed by
1631 * mds_intent_policy(), see bug 14225 */
1632 completion_callback
= lock
->l_completion_ast
;
1633 lock
->l_flags
&= ~LDLM_FL_CP_REQD
;
1634 unlock_res_and_lock(lock
);
1636 if (completion_callback
!= NULL
)
1637 rc
= completion_callback(lock
, 0, (void *)arg
);
1638 LDLM_LOCK_RELEASE(lock
);
1644 * Process a call to revocation AST callback for a lock in ast_work list
1647 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set
*rqset
, void *opaq
)
1649 struct ldlm_cb_set_arg
*arg
= opaq
;
1650 struct ldlm_lock_desc desc
;
1652 struct ldlm_lock
*lock
;
1654 if (list_empty(arg
->list
))
1657 lock
= list_entry(arg
->list
->next
, struct ldlm_lock
, l_rk_ast
);
1658 list_del_init(&lock
->l_rk_ast
);
1660 /* the desc just pretend to exclusive */
1661 ldlm_lock2desc(lock
, &desc
);
1662 desc
.l_req_mode
= LCK_EX
;
1663 desc
.l_granted_mode
= 0;
1665 rc
= lock
->l_blocking_ast(lock
, &desc
, (void *)arg
, LDLM_CB_BLOCKING
);
1666 LDLM_LOCK_RELEASE(lock
);
1672 * Process a call to glimpse AST callback for a lock in ast_work list
1674 static int ldlm_work_gl_ast_lock(struct ptlrpc_request_set
*rqset
, void *opaq
)
1676 struct ldlm_cb_set_arg
*arg
= opaq
;
1677 struct ldlm_glimpse_work
*gl_work
;
1678 struct ldlm_lock
*lock
;
1681 if (list_empty(arg
->list
))
1684 gl_work
= list_entry(arg
->list
->next
, struct ldlm_glimpse_work
,
1686 list_del_init(&gl_work
->gl_list
);
1688 lock
= gl_work
->gl_lock
;
1690 /* transfer the glimpse descriptor to ldlm_cb_set_arg */
1691 arg
->gl_desc
= gl_work
->gl_desc
;
1693 /* invoke the actual glimpse callback */
1694 if (lock
->l_glimpse_ast(lock
, (void *)arg
) == 0)
1697 LDLM_LOCK_RELEASE(lock
);
1699 if ((gl_work
->gl_flags
& LDLM_GL_WORK_NOFREE
) == 0)
1706 * Process list of locks in need of ASTs being sent.
1708 * Used on server to send multiple ASTs together instead of sending one by
1711 int ldlm_run_ast_work(struct ldlm_namespace
*ns
, struct list_head
*rpc_list
,
1712 enum ldlm_desc_ast_t ast_type
)
1714 struct ldlm_cb_set_arg
*arg
;
1715 set_producer_func work_ast_lock
;
1718 if (list_empty(rpc_list
))
1721 arg
= kzalloc(sizeof(*arg
), GFP_NOFS
);
1725 atomic_set(&arg
->restart
, 0);
1726 arg
->list
= rpc_list
;
1729 case LDLM_WORK_BL_AST
:
1730 arg
->type
= LDLM_BL_CALLBACK
;
1731 work_ast_lock
= ldlm_work_bl_ast_lock
;
1733 case LDLM_WORK_CP_AST
:
1734 arg
->type
= LDLM_CP_CALLBACK
;
1735 work_ast_lock
= ldlm_work_cp_ast_lock
;
1737 case LDLM_WORK_REVOKE_AST
:
1738 arg
->type
= LDLM_BL_CALLBACK
;
1739 work_ast_lock
= ldlm_work_revoke_ast_lock
;
1741 case LDLM_WORK_GL_AST
:
1742 arg
->type
= LDLM_GL_CALLBACK
;
1743 work_ast_lock
= ldlm_work_gl_ast_lock
;
1749 /* We create a ptlrpc request set with flow control extension.
1750 * This request set will use the work_ast_lock function to produce new
1751 * requests and will send a new request each time one completes in order
1752 * to keep the number of requests in flight to ns_max_parallel_ast */
1753 arg
->set
= ptlrpc_prep_fcset(ns
->ns_max_parallel_ast
? : UINT_MAX
,
1754 work_ast_lock
, arg
);
1755 if (arg
->set
== NULL
) {
1760 ptlrpc_set_wait(arg
->set
);
1761 ptlrpc_set_destroy(arg
->set
);
1763 rc
= atomic_read(&arg
->restart
) ? -ERESTART
: 0;
1771 * Helper function to call blocking AST for LDLM lock \a lock in a
1772 * "cancelling" mode.
1774 void ldlm_cancel_callback(struct ldlm_lock
*lock
)
1776 check_res_locked(lock
->l_resource
);
1777 if (!(lock
->l_flags
& LDLM_FL_CANCEL
)) {
1778 lock
->l_flags
|= LDLM_FL_CANCEL
;
1779 if (lock
->l_blocking_ast
) {
1780 unlock_res_and_lock(lock
);
1781 lock
->l_blocking_ast(lock
, NULL
, lock
->l_ast_data
,
1783 lock_res_and_lock(lock
);
1785 LDLM_DEBUG(lock
, "no blocking ast");
1788 lock
->l_flags
|= LDLM_FL_BL_DONE
;
1792 * Remove skiplist-enabled LDLM lock \a req from granted list
1794 void ldlm_unlink_lock_skiplist(struct ldlm_lock
*req
)
1796 if (req
->l_resource
->lr_type
!= LDLM_PLAIN
&&
1797 req
->l_resource
->lr_type
!= LDLM_IBITS
)
1800 list_del_init(&req
->l_sl_policy
);
1801 list_del_init(&req
->l_sl_mode
);
1805 * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
1807 void ldlm_lock_cancel(struct ldlm_lock
*lock
)
1809 struct ldlm_resource
*res
;
1810 struct ldlm_namespace
*ns
;
1812 lock_res_and_lock(lock
);
1814 res
= lock
->l_resource
;
1815 ns
= ldlm_res_to_ns(res
);
1817 /* Please do not, no matter how tempting, remove this LBUG without
1818 * talking to me first. -phik */
1819 if (lock
->l_readers
|| lock
->l_writers
) {
1820 LDLM_ERROR(lock
, "lock still has references");
1824 /* Releases cancel callback. */
1825 ldlm_cancel_callback(lock
);
1827 ldlm_resource_unlink_lock(lock
);
1828 ldlm_lock_destroy_nolock(lock
);
1830 if (lock
->l_granted_mode
== lock
->l_req_mode
)
1831 ldlm_pool_del(&ns
->ns_pool
, lock
);
1833 /* Make sure we will not be called again for same lock what is possible
1834 * if not to zero out lock->l_granted_mode */
1835 lock
->l_granted_mode
= LCK_MINMODE
;
1836 unlock_res_and_lock(lock
);
1838 EXPORT_SYMBOL(ldlm_lock_cancel
);
1841 * Set opaque data into the lock that only makes sense to upper layer.
1843 int ldlm_lock_set_data(struct lustre_handle
*lockh
, void *data
)
1845 struct ldlm_lock
*lock
= ldlm_handle2lock(lockh
);
1849 if (lock
->l_ast_data
== NULL
)
1850 lock
->l_ast_data
= data
;
1851 if (lock
->l_ast_data
== data
)
1853 LDLM_LOCK_PUT(lock
);
1857 EXPORT_SYMBOL(ldlm_lock_set_data
);
/*
 * NOTE(review): no user of this structure is visible in this part of the
 * file; presumably it is the argument passed to a per-export lock iteration
 * callback -- confirm against its users.
 */
struct export_cl_data {
	/* export the data refers to */
	struct obd_export	*ecl_exp;
	/* NOTE(review): looks like an iteration counter -- confirm */
	int			ecl_loop;
};
1865 * Print lock with lock handle \a lockh description into debug log.
1867 * Used when printing all locks on a resource for debug purposes.
1869 void ldlm_lock_dump_handle(int level
, struct lustre_handle
*lockh
)
1871 struct ldlm_lock
*lock
;
1873 if (!((libcfs_debug
| D_ERROR
) & level
))
1876 lock
= ldlm_handle2lock(lockh
);
1880 LDLM_DEBUG_LIMIT(level
, lock
, "###");
1882 LDLM_LOCK_PUT(lock
);
1884 EXPORT_SYMBOL(ldlm_lock_dump_handle
);
1887 * Print lock information with custom message into debug log.
1890 void _ldlm_lock_debug(struct ldlm_lock
*lock
,
1891 struct libcfs_debug_msg_data
*msgdata
,
1892 const char *fmt
, ...)
1895 struct obd_export
*exp
= lock
->l_export
;
1896 struct ldlm_resource
*resource
= lock
->l_resource
;
1897 char *nid
= "local";
1899 va_start(args
, fmt
);
1901 if (exp
&& exp
->exp_connection
) {
1902 nid
= libcfs_nid2str(exp
->exp_connection
->c_peer
.nid
);
1903 } else if (exp
&& exp
->exp_obd
!= NULL
) {
1904 struct obd_import
*imp
= exp
->exp_obd
->u
.cli
.cl_import
;
1906 nid
= libcfs_nid2str(imp
->imp_connection
->c_peer
.nid
);
1909 if (resource
== NULL
) {
1910 libcfs_debug_vmsg2(msgdata
, fmt
, args
,
1911 " ns: \?\? lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: \?\? rrc=\?\? type: \?\?\? flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
1913 lock
->l_handle
.h_cookie
, atomic_read(&lock
->l_refc
),
1914 lock
->l_readers
, lock
->l_writers
,
1915 ldlm_lockname
[lock
->l_granted_mode
],
1916 ldlm_lockname
[lock
->l_req_mode
],
1917 lock
->l_flags
, nid
, lock
->l_remote_handle
.cookie
,
1918 exp
? atomic_read(&exp
->exp_refcount
) : -99,
1919 lock
->l_pid
, lock
->l_callback_timeout
, lock
->l_lvb_type
);
1924 switch (resource
->lr_type
) {
1926 libcfs_debug_vmsg2(msgdata
, fmt
, args
,
1927 " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES
" rrc: %d type: %s [%llu->%llu] (req %llu->%llu) flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
1928 ldlm_lock_to_ns_name(lock
), lock
,
1929 lock
->l_handle
.h_cookie
, atomic_read(&lock
->l_refc
),
1930 lock
->l_readers
, lock
->l_writers
,
1931 ldlm_lockname
[lock
->l_granted_mode
],
1932 ldlm_lockname
[lock
->l_req_mode
],
1934 atomic_read(&resource
->lr_refcount
),
1935 ldlm_typename
[resource
->lr_type
],
1936 lock
->l_policy_data
.l_extent
.start
,
1937 lock
->l_policy_data
.l_extent
.end
,
1938 lock
->l_req_extent
.start
, lock
->l_req_extent
.end
,
1939 lock
->l_flags
, nid
, lock
->l_remote_handle
.cookie
,
1940 exp
? atomic_read(&exp
->exp_refcount
) : -99,
1941 lock
->l_pid
, lock
->l_callback_timeout
,
1946 libcfs_debug_vmsg2(msgdata
, fmt
, args
,
1947 " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES
" rrc: %d type: %s pid: %d [%llu->%llu] flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu\n",
1948 ldlm_lock_to_ns_name(lock
), lock
,
1949 lock
->l_handle
.h_cookie
, atomic_read(&lock
->l_refc
),
1950 lock
->l_readers
, lock
->l_writers
,
1951 ldlm_lockname
[lock
->l_granted_mode
],
1952 ldlm_lockname
[lock
->l_req_mode
],
1954 atomic_read(&resource
->lr_refcount
),
1955 ldlm_typename
[resource
->lr_type
],
1956 lock
->l_policy_data
.l_flock
.pid
,
1957 lock
->l_policy_data
.l_flock
.start
,
1958 lock
->l_policy_data
.l_flock
.end
,
1959 lock
->l_flags
, nid
, lock
->l_remote_handle
.cookie
,
1960 exp
? atomic_read(&exp
->exp_refcount
) : -99,
1961 lock
->l_pid
, lock
->l_callback_timeout
);
1965 libcfs_debug_vmsg2(msgdata
, fmt
, args
,
1966 " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES
" bits %#llx rrc: %d type: %s flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
1967 ldlm_lock_to_ns_name(lock
),
1968 lock
, lock
->l_handle
.h_cookie
,
1969 atomic_read(&lock
->l_refc
),
1970 lock
->l_readers
, lock
->l_writers
,
1971 ldlm_lockname
[lock
->l_granted_mode
],
1972 ldlm_lockname
[lock
->l_req_mode
],
1974 lock
->l_policy_data
.l_inodebits
.bits
,
1975 atomic_read(&resource
->lr_refcount
),
1976 ldlm_typename
[resource
->lr_type
],
1977 lock
->l_flags
, nid
, lock
->l_remote_handle
.cookie
,
1978 exp
? atomic_read(&exp
->exp_refcount
) : -99,
1979 lock
->l_pid
, lock
->l_callback_timeout
,
1984 libcfs_debug_vmsg2(msgdata
, fmt
, args
,
1985 " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES
" rrc: %d type: %s flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
1986 ldlm_lock_to_ns_name(lock
),
1987 lock
, lock
->l_handle
.h_cookie
,
1988 atomic_read(&lock
->l_refc
),
1989 lock
->l_readers
, lock
->l_writers
,
1990 ldlm_lockname
[lock
->l_granted_mode
],
1991 ldlm_lockname
[lock
->l_req_mode
],
1993 atomic_read(&resource
->lr_refcount
),
1994 ldlm_typename
[resource
->lr_type
],
1995 lock
->l_flags
, nid
, lock
->l_remote_handle
.cookie
,
1996 exp
? atomic_read(&exp
->exp_refcount
) : -99,
1997 lock
->l_pid
, lock
->l_callback_timeout
,
2003 EXPORT_SYMBOL(_ldlm_lock_debug
);