4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2010, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/ldlm/ldlm_lock.c
38 * Author: Peter Braam <braam@clusterfs.com>
39 * Author: Phil Schwan <phil@clusterfs.com>
42 #define DEBUG_SUBSYSTEM S_LDLM
44 #include "../../include/linux/libcfs/libcfs.h"
45 #include "../include/lustre_intent.h"
46 #include "../include/obd_class.h"
47 #include "ldlm_internal.h"
50 char *ldlm_lockname
[] = {
58 [LCK_GROUP
] = "GROUP",
61 EXPORT_SYMBOL(ldlm_lockname
);
63 char *ldlm_typename
[] = {
65 [LDLM_EXTENT
] = "EXT",
69 EXPORT_SYMBOL(ldlm_typename
);
71 static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local
[] = {
72 [LDLM_PLAIN
- LDLM_MIN_TYPE
] = ldlm_plain_policy_wire_to_local
,
73 [LDLM_EXTENT
- LDLM_MIN_TYPE
] = ldlm_extent_policy_wire_to_local
,
74 [LDLM_FLOCK
- LDLM_MIN_TYPE
] = ldlm_flock_policy_wire18_to_local
,
75 [LDLM_IBITS
- LDLM_MIN_TYPE
] = ldlm_ibits_policy_wire_to_local
,
78 static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local
[] = {
79 [LDLM_PLAIN
- LDLM_MIN_TYPE
] = ldlm_plain_policy_wire_to_local
,
80 [LDLM_EXTENT
- LDLM_MIN_TYPE
] = ldlm_extent_policy_wire_to_local
,
81 [LDLM_FLOCK
- LDLM_MIN_TYPE
] = ldlm_flock_policy_wire21_to_local
,
82 [LDLM_IBITS
- LDLM_MIN_TYPE
] = ldlm_ibits_policy_wire_to_local
,
85 static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire
[] = {
86 [LDLM_PLAIN
- LDLM_MIN_TYPE
] = ldlm_plain_policy_local_to_wire
,
87 [LDLM_EXTENT
- LDLM_MIN_TYPE
] = ldlm_extent_policy_local_to_wire
,
88 [LDLM_FLOCK
- LDLM_MIN_TYPE
] = ldlm_flock_policy_local_to_wire
,
89 [LDLM_IBITS
- LDLM_MIN_TYPE
] = ldlm_ibits_policy_local_to_wire
,
93 * Converts lock policy from local format to on the wire lock_desc format
95 void ldlm_convert_policy_to_wire(ldlm_type_t type
,
96 const ldlm_policy_data_t
*lpolicy
,
97 ldlm_wire_policy_data_t
*wpolicy
)
99 ldlm_policy_local_to_wire_t convert
;
101 convert
= ldlm_policy_local_to_wire
[type
- LDLM_MIN_TYPE
];
103 convert(lpolicy
, wpolicy
);
107 * Converts lock policy from on the wire lock_desc format to local format
109 void ldlm_convert_policy_to_local(struct obd_export
*exp
, ldlm_type_t type
,
110 const ldlm_wire_policy_data_t
*wpolicy
,
111 ldlm_policy_data_t
*lpolicy
)
113 ldlm_policy_wire_to_local_t convert
;
116 /** some badness for 2.0.0 clients, but 2.0.0 isn't supported */
117 new_client
= (exp_connect_flags(exp
) & OBD_CONNECT_FULL20
) != 0;
119 convert
= ldlm_policy_wire21_to_local
[type
- LDLM_MIN_TYPE
];
121 convert
= ldlm_policy_wire18_to_local
[type
- LDLM_MIN_TYPE
];
123 convert(wpolicy
, lpolicy
);
126 char *ldlm_it2str(int it
)
133 case (IT_OPEN
| IT_CREAT
):
148 CERROR("Unknown intent %d\n", it
);
152 EXPORT_SYMBOL(ldlm_it2str
);
155 void ldlm_register_intent(struct ldlm_namespace
*ns
, ldlm_res_policy arg
)
159 EXPORT_SYMBOL(ldlm_register_intent
);
162 * REFCOUNTED LOCK OBJECTS
167 * Get a reference on a lock.
169 * Lock refcounts, during creation:
170 * - one special one for allocation, dec'd only once in destroy
171 * - one for being a lock that's in-use
172 * - one for the addref associated with a new lock
174 struct ldlm_lock
*ldlm_lock_get(struct ldlm_lock
*lock
)
176 atomic_inc(&lock
->l_refc
);
179 EXPORT_SYMBOL(ldlm_lock_get
);
182 * Release lock reference.
184 * Also frees the lock if it was last reference.
186 void ldlm_lock_put(struct ldlm_lock
*lock
)
188 LASSERT(lock
->l_resource
!= LP_POISON
);
189 LASSERT(atomic_read(&lock
->l_refc
) > 0);
190 if (atomic_dec_and_test(&lock
->l_refc
)) {
191 struct ldlm_resource
*res
;
194 "final lock_put on destroyed lock, freeing it.");
196 res
= lock
->l_resource
;
197 LASSERT(lock
->l_flags
& LDLM_FL_DESTROYED
);
198 LASSERT(list_empty(&lock
->l_res_link
));
199 LASSERT(list_empty(&lock
->l_pending_chain
));
201 lprocfs_counter_decr(ldlm_res_to_ns(res
)->ns_stats
,
203 lu_ref_del(&res
->lr_reference
, "lock", lock
);
204 ldlm_resource_putref(res
);
205 lock
->l_resource
= NULL
;
206 if (lock
->l_export
) {
207 class_export_lock_put(lock
->l_export
, lock
);
208 lock
->l_export
= NULL
;
211 kfree(lock
->l_lvb_data
);
213 ldlm_interval_free(ldlm_interval_detach(lock
));
214 lu_ref_fini(&lock
->l_reference
);
215 OBD_FREE_RCU(lock
, sizeof(*lock
), &lock
->l_handle
);
218 EXPORT_SYMBOL(ldlm_lock_put
);
221 * Removes LDLM lock \a lock from LRU. Assumes LRU is already locked.
223 int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock
*lock
)
227 if (!list_empty(&lock
->l_lru
)) {
228 struct ldlm_namespace
*ns
= ldlm_lock_to_ns(lock
);
230 LASSERT(lock
->l_resource
->lr_type
!= LDLM_FLOCK
);
231 list_del_init(&lock
->l_lru
);
232 LASSERT(ns
->ns_nr_unused
> 0);
240 * Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
242 int ldlm_lock_remove_from_lru(struct ldlm_lock
*lock
)
244 struct ldlm_namespace
*ns
= ldlm_lock_to_ns(lock
);
247 if (lock
->l_flags
& LDLM_FL_NS_SRV
) {
248 LASSERT(list_empty(&lock
->l_lru
));
252 spin_lock(&ns
->ns_lock
);
253 rc
= ldlm_lock_remove_from_lru_nolock(lock
);
254 spin_unlock(&ns
->ns_lock
);
259 * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
261 void ldlm_lock_add_to_lru_nolock(struct ldlm_lock
*lock
)
263 struct ldlm_namespace
*ns
= ldlm_lock_to_ns(lock
);
265 lock
->l_last_used
= cfs_time_current();
266 LASSERT(list_empty(&lock
->l_lru
));
267 LASSERT(lock
->l_resource
->lr_type
!= LDLM_FLOCK
);
268 list_add_tail(&lock
->l_lru
, &ns
->ns_unused_list
);
269 if (lock
->l_flags
& LDLM_FL_SKIPPED
)
270 lock
->l_flags
&= ~LDLM_FL_SKIPPED
;
271 LASSERT(ns
->ns_nr_unused
>= 0);
276 * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
279 void ldlm_lock_add_to_lru(struct ldlm_lock
*lock
)
281 struct ldlm_namespace
*ns
= ldlm_lock_to_ns(lock
);
283 spin_lock(&ns
->ns_lock
);
284 ldlm_lock_add_to_lru_nolock(lock
);
285 spin_unlock(&ns
->ns_lock
);
289 * Moves LDLM lock \a lock that is already in namespace LRU to the tail of
290 * the LRU. Performs necessary LRU locking
292 void ldlm_lock_touch_in_lru(struct ldlm_lock
*lock
)
294 struct ldlm_namespace
*ns
= ldlm_lock_to_ns(lock
);
296 if (lock
->l_flags
& LDLM_FL_NS_SRV
) {
297 LASSERT(list_empty(&lock
->l_lru
));
301 spin_lock(&ns
->ns_lock
);
302 if (!list_empty(&lock
->l_lru
)) {
303 ldlm_lock_remove_from_lru_nolock(lock
);
304 ldlm_lock_add_to_lru_nolock(lock
);
306 spin_unlock(&ns
->ns_lock
);
310 * Helper to destroy a locked lock.
312 * Used by ldlm_lock_destroy and ldlm_lock_destroy_nolock
313 * Must be called with l_lock and lr_lock held.
315 * Does not actually free the lock data, but rather marks the lock as
316 * destroyed by setting l_destroyed field in the lock to 1. Destroys a
317 * handle->lock association too, so that the lock can no longer be found
318 * and removes the lock from LRU list. Actual lock freeing occurs when
319 * last lock reference goes away.
321 * Original comment (of some historical value):
322 * This used to have a 'strict' flag, which recovery would use to mark an
323 * in-use lock as needing-to-die. Lest I am ever tempted to put it back, I
324 * shall explain why it's gone: with the new hash table scheme, once you call
325 * ldlm_lock_destroy, you can never drop your final references on this lock.
326 * Because it's not in the hash table anymore. -phil
328 int ldlm_lock_destroy_internal(struct ldlm_lock
*lock
)
330 if (lock
->l_readers
|| lock
->l_writers
) {
331 LDLM_ERROR(lock
, "lock still has references");
335 if (!list_empty(&lock
->l_res_link
)) {
336 LDLM_ERROR(lock
, "lock still on resource");
340 if (lock
->l_flags
& LDLM_FL_DESTROYED
) {
341 LASSERT(list_empty(&lock
->l_lru
));
344 lock
->l_flags
|= LDLM_FL_DESTROYED
;
346 if (lock
->l_export
&& lock
->l_export
->exp_lock_hash
) {
347 /* NB: it's safe to call cfs_hash_del() even lock isn't
348 * in exp_lock_hash. */
349 /* In the function below, .hs_keycmp resolves to
350 * ldlm_export_lock_keycmp() */
351 /* coverity[overrun-buffer-val] */
352 cfs_hash_del(lock
->l_export
->exp_lock_hash
,
353 &lock
->l_remote_handle
, &lock
->l_exp_hash
);
356 ldlm_lock_remove_from_lru(lock
);
357 class_handle_unhash(&lock
->l_handle
);
360 /* Wake anyone waiting for this lock */
361 /* FIXME: I should probably add yet another flag, instead of using
362 * l_export to only call this on clients */
364 class_export_put(lock
->l_export
);
365 lock
->l_export
= NULL
;
366 if (lock
->l_export
&& lock
->l_completion_ast
)
367 lock
->l_completion_ast(lock
, 0);
373 * Destroys a LDLM lock \a lock. Performs necessary locking first.
375 void ldlm_lock_destroy(struct ldlm_lock
*lock
)
379 lock_res_and_lock(lock
);
380 first
= ldlm_lock_destroy_internal(lock
);
381 unlock_res_and_lock(lock
);
383 /* drop reference from hashtable only for first destroy */
385 lu_ref_del(&lock
->l_reference
, "hash", lock
);
386 LDLM_LOCK_RELEASE(lock
);
391 * Destroys a LDLM lock \a lock that is already locked.
393 void ldlm_lock_destroy_nolock(struct ldlm_lock
*lock
)
397 first
= ldlm_lock_destroy_internal(lock
);
398 /* drop reference from hashtable only for first destroy */
400 lu_ref_del(&lock
->l_reference
, "hash", lock
);
401 LDLM_LOCK_RELEASE(lock
);
405 /* this is called by portals_handle2object with the handle lock taken */
/* Handle-ops callback: take a reference on the lock behind a handle. */
static void lock_handle_addref(void *lock)
{
	LDLM_LOCK_GET((struct ldlm_lock *)lock);
}
411 static void lock_handle_free(void *lock
, int size
)
413 LASSERT(size
== sizeof(struct ldlm_lock
));
414 OBD_SLAB_FREE(lock
, ldlm_lock_slab
, size
);
417 struct portals_handle_ops lock_handle_ops
= {
418 .hop_addref
= lock_handle_addref
,
419 .hop_free
= lock_handle_free
,
424 * Allocate and initialize new lock structure.
426 * usage: pass in a resource on which you have done ldlm_resource_get
427 * new lock will take over the refcount.
428 * returns: lock with refcount 2 - one for current caller and one for remote
430 static struct ldlm_lock
*ldlm_lock_new(struct ldlm_resource
*resource
)
432 struct ldlm_lock
*lock
;
434 if (resource
== NULL
)
437 OBD_SLAB_ALLOC_PTR_GFP(lock
, ldlm_lock_slab
, GFP_NOFS
);
441 spin_lock_init(&lock
->l_lock
);
442 lock
->l_resource
= resource
;
443 lu_ref_add(&resource
->lr_reference
, "lock", lock
);
445 atomic_set(&lock
->l_refc
, 2);
446 INIT_LIST_HEAD(&lock
->l_res_link
);
447 INIT_LIST_HEAD(&lock
->l_lru
);
448 INIT_LIST_HEAD(&lock
->l_pending_chain
);
449 INIT_LIST_HEAD(&lock
->l_bl_ast
);
450 INIT_LIST_HEAD(&lock
->l_cp_ast
);
451 INIT_LIST_HEAD(&lock
->l_rk_ast
);
452 init_waitqueue_head(&lock
->l_waitq
);
453 lock
->l_blocking_lock
= NULL
;
454 INIT_LIST_HEAD(&lock
->l_sl_mode
);
455 INIT_LIST_HEAD(&lock
->l_sl_policy
);
456 INIT_HLIST_NODE(&lock
->l_exp_hash
);
457 INIT_HLIST_NODE(&lock
->l_exp_flock_hash
);
459 lprocfs_counter_incr(ldlm_res_to_ns(resource
)->ns_stats
,
461 INIT_LIST_HEAD(&lock
->l_handle
.h_link
);
462 class_handle_hash(&lock
->l_handle
, &lock_handle_ops
);
464 lu_ref_init(&lock
->l_reference
);
465 lu_ref_add(&lock
->l_reference
, "hash", lock
);
466 lock
->l_callback_timeout
= 0;
468 #if LUSTRE_TRACKS_LOCK_EXP_REFS
469 INIT_LIST_HEAD(&lock
->l_exp_refs_link
);
470 lock
->l_exp_refs_nr
= 0;
471 lock
->l_exp_refs_target
= NULL
;
473 INIT_LIST_HEAD(&lock
->l_exp_list
);
479 * Moves LDLM lock \a lock to another resource.
480 * This is used on client when server returns some other lock than requested
481 * (typically as a result of intent operation)
483 int ldlm_lock_change_resource(struct ldlm_namespace
*ns
, struct ldlm_lock
*lock
,
484 const struct ldlm_res_id
*new_resid
)
486 struct ldlm_resource
*oldres
= lock
->l_resource
;
487 struct ldlm_resource
*newres
;
490 LASSERT(ns_is_client(ns
));
492 lock_res_and_lock(lock
);
493 if (memcmp(new_resid
, &lock
->l_resource
->lr_name
,
494 sizeof(lock
->l_resource
->lr_name
)) == 0) {
496 unlock_res_and_lock(lock
);
500 LASSERT(new_resid
->name
[0] != 0);
502 /* This function assumes that the lock isn't on any lists */
503 LASSERT(list_empty(&lock
->l_res_link
));
505 type
= oldres
->lr_type
;
506 unlock_res_and_lock(lock
);
508 newres
= ldlm_resource_get(ns
, NULL
, new_resid
, type
, 1);
512 lu_ref_add(&newres
->lr_reference
, "lock", lock
);
514 * To flip the lock from the old to the new resource, lock, oldres and
515 * newres have to be locked. Resource spin-locks are nested within
516 * lock->l_lock, and are taken in the memory address order to avoid
519 spin_lock(&lock
->l_lock
);
520 oldres
= lock
->l_resource
;
521 if (oldres
< newres
) {
523 lock_res_nested(newres
, LRT_NEW
);
526 lock_res_nested(oldres
, LRT_NEW
);
528 LASSERT(memcmp(new_resid
, &oldres
->lr_name
,
529 sizeof(oldres
->lr_name
)) != 0);
530 lock
->l_resource
= newres
;
532 unlock_res_and_lock(lock
);
534 /* ...and the flowers are still standing! */
535 lu_ref_del(&oldres
->lr_reference
, "lock", lock
);
536 ldlm_resource_putref(oldres
);
540 EXPORT_SYMBOL(ldlm_lock_change_resource
);
542 /** \defgroup ldlm_handles LDLM HANDLES
543 * Ways to get hold of locks without any addresses.
548 * Fills in handle for LDLM lock \a lock into supplied \a lockh
549 * Does not take any references.
551 void ldlm_lock2handle(const struct ldlm_lock
*lock
, struct lustre_handle
*lockh
)
553 lockh
->cookie
= lock
->l_handle
.h_cookie
;
555 EXPORT_SYMBOL(ldlm_lock2handle
);
558 * Obtain a lock reference by handle.
560 * if \a flags: atomically get the lock and set the flags.
561 * Return NULL if flag already set
563 struct ldlm_lock
*__ldlm_handle2lock(const struct lustre_handle
*handle
,
566 struct ldlm_lock
*lock
;
570 lock
= class_handle2object(handle
->cookie
);
574 /* It's unlikely but possible that someone marked the lock as
575 * destroyed after we did handle2object on it */
576 if (flags
== 0 && ((lock
->l_flags
& LDLM_FL_DESTROYED
) == 0)) {
577 lu_ref_add(&lock
->l_reference
, "handle", current
);
581 lock_res_and_lock(lock
);
583 LASSERT(lock
->l_resource
!= NULL
);
585 lu_ref_add_atomic(&lock
->l_reference
, "handle", current
);
586 if (unlikely(lock
->l_flags
& LDLM_FL_DESTROYED
)) {
587 unlock_res_and_lock(lock
);
588 CDEBUG(D_INFO
, "lock already destroyed: lock %p\n", lock
);
593 if (flags
&& (lock
->l_flags
& flags
)) {
594 unlock_res_and_lock(lock
);
600 lock
->l_flags
|= flags
;
602 unlock_res_and_lock(lock
);
605 EXPORT_SYMBOL(__ldlm_handle2lock
);
606 /** @} ldlm_handles */
609 * Fill in "on the wire" representation for given LDLM lock into supplied
610 * lock descriptor \a desc structure.
612 void ldlm_lock2desc(struct ldlm_lock
*lock
, struct ldlm_lock_desc
*desc
)
614 ldlm_res2desc(lock
->l_resource
, &desc
->l_resource
);
615 desc
->l_req_mode
= lock
->l_req_mode
;
616 desc
->l_granted_mode
= lock
->l_granted_mode
;
617 ldlm_convert_policy_to_wire(lock
->l_resource
->lr_type
,
618 &lock
->l_policy_data
,
619 &desc
->l_policy_data
);
621 EXPORT_SYMBOL(ldlm_lock2desc
);
624 * Add a lock to list of conflicting locks to send AST to.
626 * Only add if we have not sent a blocking AST to the lock yet.
628 void ldlm_add_bl_work_item(struct ldlm_lock
*lock
, struct ldlm_lock
*new,
629 struct list_head
*work_list
)
631 if ((lock
->l_flags
& LDLM_FL_AST_SENT
) == 0) {
632 LDLM_DEBUG(lock
, "lock incompatible; sending blocking AST.");
633 lock
->l_flags
|= LDLM_FL_AST_SENT
;
634 /* If the enqueuing client said so, tell the AST recipient to
635 * discard dirty data, rather than writing back. */
636 if (new->l_flags
& LDLM_FL_AST_DISCARD_DATA
)
637 lock
->l_flags
|= LDLM_FL_DISCARD_DATA
;
638 LASSERT(list_empty(&lock
->l_bl_ast
));
639 list_add(&lock
->l_bl_ast
, work_list
);
641 LASSERT(lock
->l_blocking_lock
== NULL
);
642 lock
->l_blocking_lock
= LDLM_LOCK_GET(new);
647 * Add a lock to list of just granted locks to send completion AST to.
649 void ldlm_add_cp_work_item(struct ldlm_lock
*lock
, struct list_head
*work_list
)
651 if ((lock
->l_flags
& LDLM_FL_CP_REQD
) == 0) {
652 lock
->l_flags
|= LDLM_FL_CP_REQD
;
653 LDLM_DEBUG(lock
, "lock granted; sending completion AST.");
654 LASSERT(list_empty(&lock
->l_cp_ast
));
655 list_add(&lock
->l_cp_ast
, work_list
);
661 * Aggregator function to add AST work items into a list. Determines
662 * what sort of an AST work needs to be done and calls the proper
664 * Must be called with lr_lock held.
666 void ldlm_add_ast_work_item(struct ldlm_lock
*lock
, struct ldlm_lock
*new,
667 struct list_head
*work_list
)
669 check_res_locked(lock
->l_resource
);
671 ldlm_add_bl_work_item(lock
, new, work_list
);
673 ldlm_add_cp_work_item(lock
, work_list
);
677 * Add specified reader/writer reference to LDLM lock with handle \a lockh.
678 * r/w reference type is determined by \a mode
679 * Calls ldlm_lock_addref_internal.
681 void ldlm_lock_addref(struct lustre_handle
*lockh
, __u32 mode
)
683 struct ldlm_lock
*lock
;
685 lock
= ldlm_handle2lock(lockh
);
686 LASSERT(lock
!= NULL
);
687 ldlm_lock_addref_internal(lock
, mode
);
690 EXPORT_SYMBOL(ldlm_lock_addref
);
694 * Add specified reader/writer reference to LDLM lock \a lock.
695 * r/w reference type is determined by \a mode
696 * Removes lock from LRU if it is there.
697 * Assumes the LDLM lock is already locked.
699 void ldlm_lock_addref_internal_nolock(struct ldlm_lock
*lock
, __u32 mode
)
701 ldlm_lock_remove_from_lru(lock
);
702 if (mode
& (LCK_NL
| LCK_CR
| LCK_PR
)) {
704 lu_ref_add_atomic(&lock
->l_reference
, "reader", lock
);
706 if (mode
& (LCK_EX
| LCK_CW
| LCK_PW
| LCK_GROUP
| LCK_COS
)) {
708 lu_ref_add_atomic(&lock
->l_reference
, "writer", lock
);
711 lu_ref_add_atomic(&lock
->l_reference
, "user", lock
);
712 LDLM_DEBUG(lock
, "ldlm_lock_addref(%s)", ldlm_lockname
[mode
]);
716 * Attempts to add reader/writer reference to a lock with handle \a lockh, and
717 * fails if lock is already LDLM_FL_CBPENDING or destroyed.
719 * \retval 0 success, lock was addref-ed
721 * \retval -EAGAIN lock is being canceled.
723 int ldlm_lock_addref_try(struct lustre_handle
*lockh
, __u32 mode
)
725 struct ldlm_lock
*lock
;
729 lock
= ldlm_handle2lock(lockh
);
731 lock_res_and_lock(lock
);
732 if (lock
->l_readers
!= 0 || lock
->l_writers
!= 0 ||
733 !(lock
->l_flags
& LDLM_FL_CBPENDING
)) {
734 ldlm_lock_addref_internal_nolock(lock
, mode
);
737 unlock_res_and_lock(lock
);
742 EXPORT_SYMBOL(ldlm_lock_addref_try
);
745 * Add specified reader/writer reference to LDLM lock \a lock.
746 * Locks LDLM lock and calls ldlm_lock_addref_internal_nolock to do the work.
747 * Only called for local locks.
749 void ldlm_lock_addref_internal(struct ldlm_lock
*lock
, __u32 mode
)
751 lock_res_and_lock(lock
);
752 ldlm_lock_addref_internal_nolock(lock
, mode
);
753 unlock_res_and_lock(lock
);
757 * Removes reader/writer reference for LDLM lock \a lock.
758 * Assumes LDLM lock is already locked.
759 * only called in ldlm_flock_destroy and for local locks.
760 * Does NOT add lock to LRU if no r/w references left to accommodate flock locks
761 * that cannot be placed in LRU.
763 void ldlm_lock_decref_internal_nolock(struct ldlm_lock
*lock
, __u32 mode
)
765 LDLM_DEBUG(lock
, "ldlm_lock_decref(%s)", ldlm_lockname
[mode
]);
766 if (mode
& (LCK_NL
| LCK_CR
| LCK_PR
)) {
767 LASSERT(lock
->l_readers
> 0);
768 lu_ref_del(&lock
->l_reference
, "reader", lock
);
771 if (mode
& (LCK_EX
| LCK_CW
| LCK_PW
| LCK_GROUP
| LCK_COS
)) {
772 LASSERT(lock
->l_writers
> 0);
773 lu_ref_del(&lock
->l_reference
, "writer", lock
);
777 lu_ref_del(&lock
->l_reference
, "user", lock
);
778 LDLM_LOCK_RELEASE(lock
); /* matches the LDLM_LOCK_GET() in addref */
782 * Removes reader/writer reference for LDLM lock \a lock.
783 * Locks LDLM lock first.
784 * If the lock is determined to be client lock on a client and r/w refcount
785 * drops to zero and the lock is not blocked, the lock is added to LRU lock
787 * For blocked LDLM locks if r/w count drops to zero, blocking_ast is called.
789 void ldlm_lock_decref_internal(struct ldlm_lock
*lock
, __u32 mode
)
791 struct ldlm_namespace
*ns
;
793 lock_res_and_lock(lock
);
795 ns
= ldlm_lock_to_ns(lock
);
797 ldlm_lock_decref_internal_nolock(lock
, mode
);
799 if (lock
->l_flags
& LDLM_FL_LOCAL
&&
800 !lock
->l_readers
&& !lock
->l_writers
) {
801 /* If this is a local lock on a server namespace and this was
802 * the last reference, cancel the lock. */
803 CDEBUG(D_INFO
, "forcing cancel of local lock\n");
804 lock
->l_flags
|= LDLM_FL_CBPENDING
;
807 if (!lock
->l_readers
&& !lock
->l_writers
&&
808 (lock
->l_flags
& LDLM_FL_CBPENDING
)) {
809 /* If we received a blocked AST and this was the last reference,
810 * run the callback. */
811 if ((lock
->l_flags
& LDLM_FL_NS_SRV
) && lock
->l_export
)
812 CERROR("FL_CBPENDING set on non-local lock--just a warning\n");
814 LDLM_DEBUG(lock
, "final decref done on cbpending lock");
816 LDLM_LOCK_GET(lock
); /* dropped by bl thread */
817 ldlm_lock_remove_from_lru(lock
);
818 unlock_res_and_lock(lock
);
820 if (lock
->l_flags
& LDLM_FL_FAIL_LOC
)
821 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE
);
823 if ((lock
->l_flags
& LDLM_FL_ATOMIC_CB
) ||
824 ldlm_bl_to_thread_lock(ns
, NULL
, lock
) != 0)
825 ldlm_handle_bl_callback(ns
, NULL
, lock
);
826 } else if (ns_is_client(ns
) &&
827 !lock
->l_readers
&& !lock
->l_writers
&&
828 !(lock
->l_flags
& LDLM_FL_NO_LRU
) &&
829 !(lock
->l_flags
& LDLM_FL_BL_AST
)) {
831 LDLM_DEBUG(lock
, "add lock into lru list");
833 /* If this is a client-side namespace and this was the last
834 * reference, put it on the LRU. */
835 ldlm_lock_add_to_lru(lock
);
836 unlock_res_and_lock(lock
);
838 if (lock
->l_flags
& LDLM_FL_FAIL_LOC
)
839 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE
);
841 /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
842 * are not supported by the server, otherwise, it is done on
844 if (!exp_connect_cancelset(lock
->l_conn_export
) &&
845 !ns_connect_lru_resize(ns
))
846 ldlm_cancel_lru(ns
, 0, LCF_ASYNC
, 0);
848 LDLM_DEBUG(lock
, "do not add lock into lru list");
849 unlock_res_and_lock(lock
);
854 * Decrease reader/writer refcount for LDLM lock with handle \a lockh
856 void ldlm_lock_decref(struct lustre_handle
*lockh
, __u32 mode
)
858 struct ldlm_lock
*lock
= __ldlm_handle2lock(lockh
, 0);
860 LASSERTF(lock
!= NULL
, "Non-existing lock: %#llx\n", lockh
->cookie
);
861 ldlm_lock_decref_internal(lock
, mode
);
864 EXPORT_SYMBOL(ldlm_lock_decref
);
867 * Decrease reader/writer refcount for LDLM lock with handle
868 * \a lockh and mark it for subsequent cancellation once r/w refcount
869 * drops to zero instead of putting into LRU.
871 * Typical usage is for GROUP locks which we cannot allow to be cached.
873 void ldlm_lock_decref_and_cancel(struct lustre_handle
*lockh
, __u32 mode
)
875 struct ldlm_lock
*lock
= __ldlm_handle2lock(lockh
, 0);
877 LASSERT(lock
!= NULL
);
879 LDLM_DEBUG(lock
, "ldlm_lock_decref(%s)", ldlm_lockname
[mode
]);
880 lock_res_and_lock(lock
);
881 lock
->l_flags
|= LDLM_FL_CBPENDING
;
882 unlock_res_and_lock(lock
);
883 ldlm_lock_decref_internal(lock
, mode
);
886 EXPORT_SYMBOL(ldlm_lock_decref_and_cancel
);
/* Insert position within the three granted-lock skip lists. */
struct sl_insert_point {
	struct list_head *res_link;
	struct list_head *mode_link;
	struct list_head *policy_link;
};
895 * Finds a position to insert the new lock into granted lock list.
897 * Used for locks eligible for skiplist optimization.
900 * queue [input]: the granted list where search acts on;
901 * req [input]: the lock whose position to be located;
902 * prev [output]: positions within 3 lists to insert @req to
906 * - ldlm_grant_lock_with_skiplist
908 static void search_granted_lock(struct list_head
*queue
,
909 struct ldlm_lock
*req
,
910 struct sl_insert_point
*prev
)
912 struct list_head
*tmp
;
913 struct ldlm_lock
*lock
, *mode_end
, *policy_end
;
915 list_for_each(tmp
, queue
) {
916 lock
= list_entry(tmp
, struct ldlm_lock
, l_res_link
);
918 mode_end
= list_entry(lock
->l_sl_mode
.prev
,
919 struct ldlm_lock
, l_sl_mode
);
921 if (lock
->l_req_mode
!= req
->l_req_mode
) {
922 /* jump to last lock of mode group */
923 tmp
= &mode_end
->l_res_link
;
927 /* suitable mode group is found */
928 if (lock
->l_resource
->lr_type
== LDLM_PLAIN
) {
929 /* insert point is last lock of the mode group */
930 prev
->res_link
= &mode_end
->l_res_link
;
931 prev
->mode_link
= &mode_end
->l_sl_mode
;
932 prev
->policy_link
= &req
->l_sl_policy
;
934 } else if (lock
->l_resource
->lr_type
== LDLM_IBITS
) {
937 list_entry(lock
->l_sl_policy
.prev
,
941 if (lock
->l_policy_data
.l_inodebits
.bits
==
942 req
->l_policy_data
.l_inodebits
.bits
) {
943 /* insert point is last lock of
944 * the policy group */
946 &policy_end
->l_res_link
;
948 &policy_end
->l_sl_mode
;
950 &policy_end
->l_sl_policy
;
954 if (policy_end
== mode_end
)
955 /* done with mode group */
958 /* go to next policy group within mode group */
959 tmp
= policy_end
->l_res_link
.next
;
960 lock
= list_entry(tmp
, struct ldlm_lock
,
962 } /* loop over policy groups within the mode group */
964 /* insert point is last lock of the mode group,
965 * new policy group is started */
966 prev
->res_link
= &mode_end
->l_res_link
;
967 prev
->mode_link
= &mode_end
->l_sl_mode
;
968 prev
->policy_link
= &req
->l_sl_policy
;
972 "is not LDLM_PLAIN or LDLM_IBITS lock");
977 /* insert point is last lock on the queue,
978 * new mode group and new policy group are started */
979 prev
->res_link
= queue
->prev
;
980 prev
->mode_link
= &req
->l_sl_mode
;
981 prev
->policy_link
= &req
->l_sl_policy
;
985 * Add a lock into resource granted list after a position described by
988 static void ldlm_granted_list_add_lock(struct ldlm_lock
*lock
,
989 struct sl_insert_point
*prev
)
991 struct ldlm_resource
*res
= lock
->l_resource
;
993 check_res_locked(res
);
995 ldlm_resource_dump(D_INFO
, res
);
996 LDLM_DEBUG(lock
, "About to add lock:");
998 if (lock
->l_flags
& LDLM_FL_DESTROYED
) {
999 CDEBUG(D_OTHER
, "Lock destroyed, not adding to resource\n");
1003 LASSERT(list_empty(&lock
->l_res_link
));
1004 LASSERT(list_empty(&lock
->l_sl_mode
));
1005 LASSERT(list_empty(&lock
->l_sl_policy
));
1008 * lock->link == prev->link means lock is first starting the group.
1009 * Don't re-add to itself to suppress kernel warnings.
1011 if (&lock
->l_res_link
!= prev
->res_link
)
1012 list_add(&lock
->l_res_link
, prev
->res_link
);
1013 if (&lock
->l_sl_mode
!= prev
->mode_link
)
1014 list_add(&lock
->l_sl_mode
, prev
->mode_link
);
1015 if (&lock
->l_sl_policy
!= prev
->policy_link
)
1016 list_add(&lock
->l_sl_policy
, prev
->policy_link
);
1020 * Add a lock to granted list on a resource maintaining skiplist
1023 static void ldlm_grant_lock_with_skiplist(struct ldlm_lock
*lock
)
1025 struct sl_insert_point prev
;
1027 LASSERT(lock
->l_req_mode
== lock
->l_granted_mode
);
1029 search_granted_lock(&lock
->l_resource
->lr_granted
, lock
, &prev
);
1030 ldlm_granted_list_add_lock(lock
, &prev
);
1034 * Perform lock granting bookkeeping.
1036 * Includes putting the lock into granted list and updating lock mode.
1038 * - ldlm_lock_enqueue
1039 * - ldlm_reprocess_queue
1040 * - ldlm_lock_convert
1042 * must be called with lr_lock held
1044 void ldlm_grant_lock(struct ldlm_lock
*lock
, struct list_head
*work_list
)
1046 struct ldlm_resource
*res
= lock
->l_resource
;
1048 check_res_locked(res
);
1050 lock
->l_granted_mode
= lock
->l_req_mode
;
1051 if (res
->lr_type
== LDLM_PLAIN
|| res
->lr_type
== LDLM_IBITS
)
1052 ldlm_grant_lock_with_skiplist(lock
);
1053 else if (res
->lr_type
== LDLM_EXTENT
)
1054 ldlm_extent_add_lock(res
, lock
);
1056 ldlm_resource_add_lock(res
, &res
->lr_granted
, lock
);
1058 if (lock
->l_granted_mode
< res
->lr_most_restr
)
1059 res
->lr_most_restr
= lock
->l_granted_mode
;
1061 if (work_list
&& lock
->l_completion_ast
!= NULL
)
1062 ldlm_add_ast_work_item(lock
, NULL
, work_list
);
1064 ldlm_pool_add(&ldlm_res_to_ns(res
)->ns_pool
, lock
);
1068 * Search for a lock with given properties in a queue.
1070 * \retval a referenced lock or NULL. See the flag descriptions below, in the
1071 * comment above ldlm_lock_match
1073 static struct ldlm_lock
*search_queue(struct list_head
*queue
,
1075 ldlm_policy_data_t
*policy
,
1076 struct ldlm_lock
*old_lock
,
1077 __u64 flags
, int unref
)
1079 struct ldlm_lock
*lock
;
1080 struct list_head
*tmp
;
1082 list_for_each(tmp
, queue
) {
1085 lock
= list_entry(tmp
, struct ldlm_lock
, l_res_link
);
1087 if (lock
== old_lock
)
1090 /* Check if this lock can be matched.
1091 * Used by LU-2919(exclusive open) for open lease lock */
1092 if (ldlm_is_excl(lock
))
1095 /* llite sometimes wants to match locks that will be
1096 * canceled when their users drop, but we allow it to match
1097 * if it passes in CBPENDING and the lock still has users.
1098 * this is generally only going to be used by children
1099 * whose parents already hold a lock so forward progress
1100 * can still happen. */
1101 if (lock
->l_flags
& LDLM_FL_CBPENDING
&&
1102 !(flags
& LDLM_FL_CBPENDING
))
1104 if (!unref
&& lock
->l_flags
& LDLM_FL_CBPENDING
&&
1105 lock
->l_readers
== 0 && lock
->l_writers
== 0)
1108 if (!(lock
->l_req_mode
& *mode
))
1110 match
= lock
->l_req_mode
;
1112 if (lock
->l_resource
->lr_type
== LDLM_EXTENT
&&
1113 (lock
->l_policy_data
.l_extent
.start
>
1114 policy
->l_extent
.start
||
1115 lock
->l_policy_data
.l_extent
.end
< policy
->l_extent
.end
))
1118 if (unlikely(match
== LCK_GROUP
) &&
1119 lock
->l_resource
->lr_type
== LDLM_EXTENT
&&
1120 lock
->l_policy_data
.l_extent
.gid
!= policy
->l_extent
.gid
)
1123 /* We match if we have existing lock with same or wider set
1125 if (lock
->l_resource
->lr_type
== LDLM_IBITS
&&
1126 ((lock
->l_policy_data
.l_inodebits
.bits
&
1127 policy
->l_inodebits
.bits
) !=
1128 policy
->l_inodebits
.bits
))
1131 if (!unref
&& (lock
->l_flags
& LDLM_FL_GONE_MASK
))
1134 if ((flags
& LDLM_FL_LOCAL_ONLY
) &&
1135 !(lock
->l_flags
& LDLM_FL_LOCAL
))
1138 if (flags
& LDLM_FL_TEST_LOCK
) {
1139 LDLM_LOCK_GET(lock
);
1140 ldlm_lock_touch_in_lru(lock
);
1142 ldlm_lock_addref_internal_nolock(lock
, match
);
1151 void ldlm_lock_fail_match_locked(struct ldlm_lock
*lock
)
1153 if ((lock
->l_flags
& LDLM_FL_FAIL_NOTIFIED
) == 0) {
1154 lock
->l_flags
|= LDLM_FL_FAIL_NOTIFIED
;
1155 wake_up_all(&lock
->l_waitq
);
1158 EXPORT_SYMBOL(ldlm_lock_fail_match_locked
);
/**
 * Locked wrapper around ldlm_lock_fail_match_locked().
 */
void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
	lock_res_and_lock(lock);
	ldlm_lock_fail_match_locked(lock);
	unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);
1169 * Mark lock as "matchable" by OST.
1171 * Used to prevent certain races in LOV/OSC where the lock is granted, but LVB
1173 * Assumes LDLM lock is already locked.
1175 void ldlm_lock_allow_match_locked(struct ldlm_lock
*lock
)
1177 lock
->l_flags
|= LDLM_FL_LVB_READY
;
1178 wake_up_all(&lock
->l_waitq
);
1180 EXPORT_SYMBOL(ldlm_lock_allow_match_locked
);
1183 * Mark lock as "matchable" by OST.
1184 * Locks the lock and then \see ldlm_lock_allow_match_locked
/**
 * Locked wrapper around ldlm_lock_allow_match_locked().
 */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
	lock_res_and_lock(lock);
	ldlm_lock_allow_match_locked(lock);
	unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);
1195 * Attempt to find a lock with specified properties.
1197 * Typically returns a reference to matched lock unless LDLM_FL_TEST_LOCK is
1200 * Can be called in two ways:
1202 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
1203 * for a duplicate of.
1205 * Otherwise, all of the fields must be filled in, to match against.
1207 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
1208 * server (ie, connh is NULL)
1209 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
1210 * list will be considered
1211 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
1212 * to be canceled can still be matched as long as they still have reader
1213 * or writer referneces
1214 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
1215 * just tell us if we would have matched.
1217 * \retval 1 if it finds an already-existing lock that is compatible; in this
1218 * case, lockh is filled in with a addref()ed lock
1220 * We also check security context, and if that fails we simply return 0 (to
1221 * keep caller code unchanged), the context failure will be discovered by
1222 * caller sometime later.
1224 ldlm_mode_t
ldlm_lock_match(struct ldlm_namespace
*ns
, __u64 flags
,
1225 const struct ldlm_res_id
*res_id
, ldlm_type_t type
,
1226 ldlm_policy_data_t
*policy
, ldlm_mode_t mode
,
1227 struct lustre_handle
*lockh
, int unref
)
1229 struct ldlm_resource
*res
;
1230 struct ldlm_lock
*lock
, *old_lock
= NULL
;
1234 old_lock
= ldlm_handle2lock(lockh
);
1237 ns
= ldlm_lock_to_ns(old_lock
);
1238 res_id
= &old_lock
->l_resource
->lr_name
;
1239 type
= old_lock
->l_resource
->lr_type
;
1240 mode
= old_lock
->l_req_mode
;
1243 res
= ldlm_resource_get(ns
, NULL
, res_id
, type
, 0);
1245 LASSERT(old_lock
== NULL
);
1249 LDLM_RESOURCE_ADDREF(res
);
1252 lock
= search_queue(&res
->lr_granted
, &mode
, policy
, old_lock
,
1258 if (flags
& LDLM_FL_BLOCK_GRANTED
) {
1262 lock
= search_queue(&res
->lr_converting
, &mode
, policy
, old_lock
,
1268 lock
= search_queue(&res
->lr_waiting
, &mode
, policy
, old_lock
,
1277 LDLM_RESOURCE_DELREF(res
);
1278 ldlm_resource_putref(res
);
1281 ldlm_lock2handle(lock
, lockh
);
1282 if ((flags
& LDLM_FL_LVB_READY
) &&
1283 (!(lock
->l_flags
& LDLM_FL_LVB_READY
))) {
1284 __u64 wait_flags
= LDLM_FL_LVB_READY
|
1285 LDLM_FL_DESTROYED
| LDLM_FL_FAIL_NOTIFIED
;
1286 struct l_wait_info lwi
;
1288 if (lock
->l_completion_ast
) {
1289 int err
= lock
->l_completion_ast(lock
,
1290 LDLM_FL_WAIT_NOREPROC
,
1293 if (flags
& LDLM_FL_TEST_LOCK
)
1294 LDLM_LOCK_RELEASE(lock
);
1296 ldlm_lock_decref_internal(lock
,
1303 lwi
= LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout
),
1304 NULL
, LWI_ON_SIGNAL_NOOP
, NULL
);
1306 /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
1307 l_wait_event(lock
->l_waitq
,
1308 lock
->l_flags
& wait_flags
,
1310 if (!(lock
->l_flags
& LDLM_FL_LVB_READY
)) {
1311 if (flags
& LDLM_FL_TEST_LOCK
)
1312 LDLM_LOCK_RELEASE(lock
);
1314 ldlm_lock_decref_internal(lock
, mode
);
1321 LDLM_DEBUG(lock
, "matched (%llu %llu)",
1322 (type
== LDLM_PLAIN
|| type
== LDLM_IBITS
) ?
1323 res_id
->name
[2] : policy
->l_extent
.start
,
1324 (type
== LDLM_PLAIN
|| type
== LDLM_IBITS
) ?
1325 res_id
->name
[3] : policy
->l_extent
.end
);
1327 /* check user's security context */
1328 if (lock
->l_conn_export
&&
1329 sptlrpc_import_check_ctx(
1330 class_exp2cliimp(lock
->l_conn_export
))) {
1331 if (!(flags
& LDLM_FL_TEST_LOCK
))
1332 ldlm_lock_decref_internal(lock
, mode
);
1336 if (flags
& LDLM_FL_TEST_LOCK
)
1337 LDLM_LOCK_RELEASE(lock
);
1339 } else if (!(flags
& LDLM_FL_TEST_LOCK
)) {/*less verbose for test-only*/
1340 LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res %llu/%llu (%llu %llu)",
1341 ns
, type
, mode
, res_id
->name
[0],
1343 (type
== LDLM_PLAIN
|| type
== LDLM_IBITS
) ?
1344 res_id
->name
[2] : policy
->l_extent
.start
,
1345 (type
== LDLM_PLAIN
|| type
== LDLM_IBITS
) ?
1346 res_id
->name
[3] : policy
->l_extent
.end
);
1349 LDLM_LOCK_PUT(old_lock
);
1351 return rc
? mode
: 0;
1353 EXPORT_SYMBOL(ldlm_lock_match
);
1355 ldlm_mode_t
ldlm_revalidate_lock_handle(struct lustre_handle
*lockh
,
1358 struct ldlm_lock
*lock
;
1359 ldlm_mode_t mode
= 0;
1361 lock
= ldlm_handle2lock(lockh
);
1363 lock_res_and_lock(lock
);
1364 if (lock
->l_flags
& LDLM_FL_GONE_MASK
)
1367 if (lock
->l_flags
& LDLM_FL_CBPENDING
&&
1368 lock
->l_readers
== 0 && lock
->l_writers
== 0)
1372 *bits
= lock
->l_policy_data
.l_inodebits
.bits
;
1373 mode
= lock
->l_granted_mode
;
1374 ldlm_lock_addref_internal_nolock(lock
, mode
);
1379 unlock_res_and_lock(lock
);
1380 LDLM_LOCK_PUT(lock
);
1384 EXPORT_SYMBOL(ldlm_revalidate_lock_handle
);
1386 /** The caller must guarantee that the buffer is large enough. */
1387 int ldlm_fill_lvb(struct ldlm_lock
*lock
, struct req_capsule
*pill
,
1388 enum req_location loc
, void *data
, int size
)
1392 LASSERT(data
!= NULL
);
1395 switch (lock
->l_lvb_type
) {
1397 if (size
== sizeof(struct ost_lvb
)) {
1398 if (loc
== RCL_CLIENT
)
1399 lvb
= req_capsule_client_swab_get(pill
,
1401 lustre_swab_ost_lvb
);
1403 lvb
= req_capsule_server_swab_get(pill
,
1405 lustre_swab_ost_lvb
);
1406 if (unlikely(lvb
== NULL
)) {
1407 LDLM_ERROR(lock
, "no LVB");
1411 memcpy(data
, lvb
, size
);
1412 } else if (size
== sizeof(struct ost_lvb_v1
)) {
1413 struct ost_lvb
*olvb
= data
;
1415 if (loc
== RCL_CLIENT
)
1416 lvb
= req_capsule_client_swab_get(pill
,
1418 lustre_swab_ost_lvb_v1
);
1420 lvb
= req_capsule_server_sized_swab_get(pill
,
1422 lustre_swab_ost_lvb_v1
);
1423 if (unlikely(lvb
== NULL
)) {
1424 LDLM_ERROR(lock
, "no LVB");
1428 memcpy(data
, lvb
, size
);
1429 olvb
->lvb_mtime_ns
= 0;
1430 olvb
->lvb_atime_ns
= 0;
1431 olvb
->lvb_ctime_ns
= 0;
1433 LDLM_ERROR(lock
, "Replied unexpected ost LVB size %d",
1439 if (size
== sizeof(struct lquota_lvb
)) {
1440 if (loc
== RCL_CLIENT
)
1441 lvb
= req_capsule_client_swab_get(pill
,
1443 lustre_swab_lquota_lvb
);
1445 lvb
= req_capsule_server_swab_get(pill
,
1447 lustre_swab_lquota_lvb
);
1448 if (unlikely(lvb
== NULL
)) {
1449 LDLM_ERROR(lock
, "no LVB");
1453 memcpy(data
, lvb
, size
);
1456 "Replied unexpected lquota LVB size %d",
1465 if (loc
== RCL_CLIENT
)
1466 lvb
= req_capsule_client_get(pill
, &RMF_DLM_LVB
);
1468 lvb
= req_capsule_server_get(pill
, &RMF_DLM_LVB
);
1469 if (unlikely(lvb
== NULL
)) {
1470 LDLM_ERROR(lock
, "no LVB");
1474 memcpy(data
, lvb
, size
);
1477 LDLM_ERROR(lock
, "Unknown LVB type: %d\n", lock
->l_lvb_type
);
1486 * Create and fill in new LDLM lock with specified properties.
1487 * Returns a referenced lock
1489 struct ldlm_lock
*ldlm_lock_create(struct ldlm_namespace
*ns
,
1490 const struct ldlm_res_id
*res_id
,
1493 const struct ldlm_callback_suite
*cbs
,
1494 void *data
, __u32 lvb_len
,
1495 enum lvb_type lvb_type
)
1497 struct ldlm_lock
*lock
;
1498 struct ldlm_resource
*res
;
1500 res
= ldlm_resource_get(ns
, NULL
, res_id
, type
, 1);
1504 lock
= ldlm_lock_new(res
);
1509 lock
->l_req_mode
= mode
;
1510 lock
->l_ast_data
= data
;
1511 lock
->l_pid
= current_pid();
1512 if (ns_is_server(ns
))
1513 lock
->l_flags
|= LDLM_FL_NS_SRV
;
1515 lock
->l_blocking_ast
= cbs
->lcs_blocking
;
1516 lock
->l_completion_ast
= cbs
->lcs_completion
;
1517 lock
->l_glimpse_ast
= cbs
->lcs_glimpse
;
1520 lock
->l_tree_node
= NULL
;
1521 /* if this is the extent lock, allocate the interval tree node */
1522 if (type
== LDLM_EXTENT
) {
1523 if (ldlm_interval_alloc(lock
) == NULL
)
1528 lock
->l_lvb_len
= lvb_len
;
1529 lock
->l_lvb_data
= kzalloc(lvb_len
, GFP_NOFS
);
1530 if (lock
->l_lvb_data
== NULL
)
1534 lock
->l_lvb_type
= lvb_type
;
1535 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK
))
1541 ldlm_lock_destroy(lock
);
1542 LDLM_LOCK_RELEASE(lock
);
1547 * Enqueue (request) a lock.
1549 * Does not block. As a result of enqueue the lock would be put
1550 * into granted or waiting list.
1552 * If namespace has intent policy sent and the lock has LDLM_FL_HAS_INTENT flag
1553 * set, skip all the enqueueing and delegate lock processing to intent policy
1556 ldlm_error_t
ldlm_lock_enqueue(struct ldlm_namespace
*ns
,
1557 struct ldlm_lock
**lockp
,
1558 void *cookie
, __u64
*flags
)
1560 struct ldlm_lock
*lock
= *lockp
;
1561 struct ldlm_resource
*res
= lock
->l_resource
;
1562 int local
= ns_is_client(ldlm_res_to_ns(res
));
1563 ldlm_error_t rc
= ELDLM_OK
;
1564 struct ldlm_interval
*node
= NULL
;
1566 lock
->l_last_activity
= get_seconds();
1567 /* policies are not executed on the client or during replay */
1568 if ((*flags
& (LDLM_FL_HAS_INTENT
|LDLM_FL_REPLAY
)) == LDLM_FL_HAS_INTENT
1569 && !local
&& ns
->ns_policy
) {
1570 rc
= ns
->ns_policy(ns
, lockp
, cookie
, lock
->l_req_mode
, *flags
,
1572 if (rc
== ELDLM_LOCK_REPLACED
) {
1573 /* The lock that was returned has already been granted,
1574 * and placed into lockp. If it's not the same as the
1575 * one we passed in, then destroy the old one and our
1576 * work here is done. */
1577 if (lock
!= *lockp
) {
1578 ldlm_lock_destroy(lock
);
1579 LDLM_LOCK_RELEASE(lock
);
1581 *flags
|= LDLM_FL_LOCK_CHANGED
;
1583 } else if (rc
!= ELDLM_OK
||
1584 (rc
== ELDLM_OK
&& (*flags
& LDLM_FL_INTENT_ONLY
))) {
1585 ldlm_lock_destroy(lock
);
1590 /* For a replaying lock, it might be already in granted list. So
1591 * unlinking the lock will cause the interval node to be freed, we
1592 * have to allocate the interval node early otherwise we can't regrant
1593 * this lock in the future. - jay */
1594 if (!local
&& (*flags
& LDLM_FL_REPLAY
) && res
->lr_type
== LDLM_EXTENT
)
1595 OBD_SLAB_ALLOC_PTR_GFP(node
, ldlm_interval_slab
, GFP_NOFS
);
1597 lock_res_and_lock(lock
);
1598 if (local
&& lock
->l_req_mode
== lock
->l_granted_mode
) {
1599 /* The server returned a blocked lock, but it was granted
1600 * before we got a chance to actually enqueue it. We don't
1601 * need to do anything else. */
1602 *flags
&= ~(LDLM_FL_BLOCK_GRANTED
|
1603 LDLM_FL_BLOCK_CONV
| LDLM_FL_BLOCK_WAIT
);
1607 ldlm_resource_unlink_lock(lock
);
1608 if (res
->lr_type
== LDLM_EXTENT
&& lock
->l_tree_node
== NULL
) {
1610 ldlm_lock_destroy_nolock(lock
);
1615 INIT_LIST_HEAD(&node
->li_group
);
1616 ldlm_interval_attach(node
, lock
);
1620 /* Some flags from the enqueue want to make it into the AST, via the
1621 * lock's l_flags. */
1622 lock
->l_flags
|= *flags
& LDLM_FL_AST_DISCARD_DATA
;
1624 /* This distinction between local lock trees is very important; a client
1625 * namespace only has information about locks taken by that client, and
1626 * thus doesn't have enough information to decide for itself if it can
1627 * be granted (below). In this case, we do exactly what the server
1628 * tells us to do, as dictated by the 'flags'.
1630 * We do exactly the same thing during recovery, when the server is
1631 * more or less trusting the clients not to lie.
1633 * FIXME (bug 268): Detect obvious lies by checking compatibility in
1634 * granted/converting queues. */
1636 if (*flags
& LDLM_FL_BLOCK_CONV
)
1637 ldlm_resource_add_lock(res
, &res
->lr_converting
, lock
);
1638 else if (*flags
& (LDLM_FL_BLOCK_WAIT
| LDLM_FL_BLOCK_GRANTED
))
1639 ldlm_resource_add_lock(res
, &res
->lr_waiting
, lock
);
1641 ldlm_grant_lock(lock
, NULL
);
1644 CERROR("This is client-side-only module, cannot handle LDLM_NAMESPACE_SERVER resource type lock.\n");
1649 unlock_res_and_lock(lock
);
1651 OBD_SLAB_FREE(node
, ldlm_interval_slab
, sizeof(*node
));
1657 * Process a call to blocking AST callback for a lock in ast_work list
1660 ldlm_work_bl_ast_lock(struct ptlrpc_request_set
*rqset
, void *opaq
)
1662 struct ldlm_cb_set_arg
*arg
= opaq
;
1663 struct ldlm_lock_desc d
;
1665 struct ldlm_lock
*lock
;
1667 if (list_empty(arg
->list
))
1670 lock
= list_entry(arg
->list
->next
, struct ldlm_lock
, l_bl_ast
);
1672 /* nobody should touch l_bl_ast */
1673 lock_res_and_lock(lock
);
1674 list_del_init(&lock
->l_bl_ast
);
1676 LASSERT(lock
->l_flags
& LDLM_FL_AST_SENT
);
1677 LASSERT(lock
->l_bl_ast_run
== 0);
1678 LASSERT(lock
->l_blocking_lock
);
1679 lock
->l_bl_ast_run
++;
1680 unlock_res_and_lock(lock
);
1682 ldlm_lock2desc(lock
->l_blocking_lock
, &d
);
1684 rc
= lock
->l_blocking_ast(lock
, &d
, (void *)arg
, LDLM_CB_BLOCKING
);
1685 LDLM_LOCK_RELEASE(lock
->l_blocking_lock
);
1686 lock
->l_blocking_lock
= NULL
;
1687 LDLM_LOCK_RELEASE(lock
);
1693 * Process a call to completion AST callback for a lock in ast_work list
1696 ldlm_work_cp_ast_lock(struct ptlrpc_request_set
*rqset
, void *opaq
)
1698 struct ldlm_cb_set_arg
*arg
= opaq
;
1700 struct ldlm_lock
*lock
;
1701 ldlm_completion_callback completion_callback
;
1703 if (list_empty(arg
->list
))
1706 lock
= list_entry(arg
->list
->next
, struct ldlm_lock
, l_cp_ast
);
1708 /* It's possible to receive a completion AST before we've set
1709 * the l_completion_ast pointer: either because the AST arrived
1710 * before the reply, or simply because there's a small race
1711 * window between receiving the reply and finishing the local
1712 * enqueue. (bug 842)
1714 * This can't happen with the blocking_ast, however, because we
1715 * will never call the local blocking_ast until we drop our
1716 * reader/writer reference, which we won't do until we get the
1717 * reply and finish enqueueing. */
1719 /* nobody should touch l_cp_ast */
1720 lock_res_and_lock(lock
);
1721 list_del_init(&lock
->l_cp_ast
);
1722 LASSERT(lock
->l_flags
& LDLM_FL_CP_REQD
);
1723 /* save l_completion_ast since it can be changed by
1724 * mds_intent_policy(), see bug 14225 */
1725 completion_callback
= lock
->l_completion_ast
;
1726 lock
->l_flags
&= ~LDLM_FL_CP_REQD
;
1727 unlock_res_and_lock(lock
);
1729 if (completion_callback
!= NULL
)
1730 rc
= completion_callback(lock
, 0, (void *)arg
);
1731 LDLM_LOCK_RELEASE(lock
);
1737 * Process a call to revocation AST callback for a lock in ast_work list
1740 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set
*rqset
, void *opaq
)
1742 struct ldlm_cb_set_arg
*arg
= opaq
;
1743 struct ldlm_lock_desc desc
;
1745 struct ldlm_lock
*lock
;
1747 if (list_empty(arg
->list
))
1750 lock
= list_entry(arg
->list
->next
, struct ldlm_lock
, l_rk_ast
);
1751 list_del_init(&lock
->l_rk_ast
);
1753 /* the desc just pretend to exclusive */
1754 ldlm_lock2desc(lock
, &desc
);
1755 desc
.l_req_mode
= LCK_EX
;
1756 desc
.l_granted_mode
= 0;
1758 rc
= lock
->l_blocking_ast(lock
, &desc
, (void *)arg
, LDLM_CB_BLOCKING
);
1759 LDLM_LOCK_RELEASE(lock
);
1765 * Process a call to glimpse AST callback for a lock in ast_work list
1767 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set
*rqset
, void *opaq
)
1769 struct ldlm_cb_set_arg
*arg
= opaq
;
1770 struct ldlm_glimpse_work
*gl_work
;
1771 struct ldlm_lock
*lock
;
1774 if (list_empty(arg
->list
))
1777 gl_work
= list_entry(arg
->list
->next
, struct ldlm_glimpse_work
,
1779 list_del_init(&gl_work
->gl_list
);
1781 lock
= gl_work
->gl_lock
;
1783 /* transfer the glimpse descriptor to ldlm_cb_set_arg */
1784 arg
->gl_desc
= gl_work
->gl_desc
;
1786 /* invoke the actual glimpse callback */
1787 if (lock
->l_glimpse_ast(lock
, (void *)arg
) == 0)
1790 LDLM_LOCK_RELEASE(lock
);
1792 if ((gl_work
->gl_flags
& LDLM_GL_WORK_NOFREE
) == 0)
1799 * Process list of locks in need of ASTs being sent.
1801 * Used on server to send multiple ASTs together instead of sending one by
1804 int ldlm_run_ast_work(struct ldlm_namespace
*ns
, struct list_head
*rpc_list
,
1805 enum ldlm_desc_ast_t ast_type
)
1807 struct ldlm_cb_set_arg
*arg
;
1808 set_producer_func work_ast_lock
;
1811 if (list_empty(rpc_list
))
1814 arg
= kzalloc(sizeof(*arg
), GFP_NOFS
);
1818 atomic_set(&arg
->restart
, 0);
1819 arg
->list
= rpc_list
;
1822 case LDLM_WORK_BL_AST
:
1823 arg
->type
= LDLM_BL_CALLBACK
;
1824 work_ast_lock
= ldlm_work_bl_ast_lock
;
1826 case LDLM_WORK_CP_AST
:
1827 arg
->type
= LDLM_CP_CALLBACK
;
1828 work_ast_lock
= ldlm_work_cp_ast_lock
;
1830 case LDLM_WORK_REVOKE_AST
:
1831 arg
->type
= LDLM_BL_CALLBACK
;
1832 work_ast_lock
= ldlm_work_revoke_ast_lock
;
1834 case LDLM_WORK_GL_AST
:
1835 arg
->type
= LDLM_GL_CALLBACK
;
1836 work_ast_lock
= ldlm_work_gl_ast_lock
;
1842 /* We create a ptlrpc request set with flow control extension.
1843 * This request set will use the work_ast_lock function to produce new
1844 * requests and will send a new request each time one completes in order
1845 * to keep the number of requests in flight to ns_max_parallel_ast */
1846 arg
->set
= ptlrpc_prep_fcset(ns
->ns_max_parallel_ast
? : UINT_MAX
,
1847 work_ast_lock
, arg
);
1848 if (arg
->set
== NULL
) {
1853 ptlrpc_set_wait(arg
->set
);
1854 ptlrpc_set_destroy(arg
->set
);
1856 rc
= atomic_read(&arg
->restart
) ? -ERESTART
: 0;
1863 static int reprocess_one_queue(struct ldlm_resource
*res
, void *closure
)
1865 ldlm_reprocess_all(res
);
1866 return LDLM_ITER_CONTINUE
;
1869 static int ldlm_reprocess_res(struct cfs_hash
*hs
, struct cfs_hash_bd
*bd
,
1870 struct hlist_node
*hnode
, void *arg
)
1872 struct ldlm_resource
*res
= cfs_hash_object(hs
, hnode
);
1875 rc
= reprocess_one_queue(res
, arg
);
1877 return rc
== LDLM_ITER_STOP
;
1881 * Iterate through all resources on a namespace attempting to grant waiting
1884 void ldlm_reprocess_all_ns(struct ldlm_namespace
*ns
)
1887 cfs_hash_for_each_nolock(ns
->ns_rs_hash
,
1888 ldlm_reprocess_res
, NULL
);
1891 EXPORT_SYMBOL(ldlm_reprocess_all_ns
);
1894 * Try to grant all waiting locks on a resource.
1896 * Calls ldlm_reprocess_queue on converting and waiting queues.
1898 * Typically called after some resource locks are cancelled to see
1899 * if anything could be granted as a result of the cancellation.
1901 void ldlm_reprocess_all(struct ldlm_resource
*res
)
1903 LIST_HEAD(rpc_list
);
1905 if (!ns_is_client(ldlm_res_to_ns(res
))) {
1906 CERROR("This is client-side-only module, cannot handle LDLM_NAMESPACE_SERVER resource type lock.\n");
1912 * Helper function to call blocking AST for LDLM lock \a lock in a
1913 * "cancelling" mode.
1915 void ldlm_cancel_callback(struct ldlm_lock
*lock
)
1917 check_res_locked(lock
->l_resource
);
1918 if (!(lock
->l_flags
& LDLM_FL_CANCEL
)) {
1919 lock
->l_flags
|= LDLM_FL_CANCEL
;
1920 if (lock
->l_blocking_ast
) {
1921 unlock_res_and_lock(lock
);
1922 lock
->l_blocking_ast(lock
, NULL
, lock
->l_ast_data
,
1924 lock_res_and_lock(lock
);
1926 LDLM_DEBUG(lock
, "no blocking ast");
1929 lock
->l_flags
|= LDLM_FL_BL_DONE
;
1933 * Remove skiplist-enabled LDLM lock \a req from granted list
1935 void ldlm_unlink_lock_skiplist(struct ldlm_lock
*req
)
1937 if (req
->l_resource
->lr_type
!= LDLM_PLAIN
&&
1938 req
->l_resource
->lr_type
!= LDLM_IBITS
)
1941 list_del_init(&req
->l_sl_policy
);
1942 list_del_init(&req
->l_sl_mode
);
1946 * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
1948 void ldlm_lock_cancel(struct ldlm_lock
*lock
)
1950 struct ldlm_resource
*res
;
1951 struct ldlm_namespace
*ns
;
1953 lock_res_and_lock(lock
);
1955 res
= lock
->l_resource
;
1956 ns
= ldlm_res_to_ns(res
);
1958 /* Please do not, no matter how tempting, remove this LBUG without
1959 * talking to me first. -phik */
1960 if (lock
->l_readers
|| lock
->l_writers
) {
1961 LDLM_ERROR(lock
, "lock still has references");
1965 if (lock
->l_flags
& LDLM_FL_WAITED
)
1966 ldlm_del_waiting_lock(lock
);
1968 /* Releases cancel callback. */
1969 ldlm_cancel_callback(lock
);
1971 /* Yes, second time, just in case it was added again while we were
1972 * running with no res lock in ldlm_cancel_callback */
1973 if (lock
->l_flags
& LDLM_FL_WAITED
)
1974 ldlm_del_waiting_lock(lock
);
1976 ldlm_resource_unlink_lock(lock
);
1977 ldlm_lock_destroy_nolock(lock
);
1979 if (lock
->l_granted_mode
== lock
->l_req_mode
)
1980 ldlm_pool_del(&ns
->ns_pool
, lock
);
1982 /* Make sure we will not be called again for same lock what is possible
1983 * if not to zero out lock->l_granted_mode */
1984 lock
->l_granted_mode
= LCK_MINMODE
;
1985 unlock_res_and_lock(lock
);
1987 EXPORT_SYMBOL(ldlm_lock_cancel
);
1990 * Set opaque data into the lock that only makes sense to upper layer.
1992 int ldlm_lock_set_data(struct lustre_handle
*lockh
, void *data
)
1994 struct ldlm_lock
*lock
= ldlm_handle2lock(lockh
);
1998 if (lock
->l_ast_data
== NULL
)
1999 lock
->l_ast_data
= data
;
2000 if (lock
->l_ast_data
== data
)
2002 LDLM_LOCK_PUT(lock
);
2006 EXPORT_SYMBOL(ldlm_lock_set_data
);
/* Per-export cancellation state threaded through the hash iterator. */
struct export_cl_data {
	struct obd_export	*ecl_exp;	/* export being cleaned up */
	int			ecl_loop;	/* locks cancelled so far */
};
2014 * Iterator function for ldlm_cancel_locks_for_export.
2015 * Cancels passed locks.
2017 int ldlm_cancel_locks_for_export_cb(struct cfs_hash
*hs
, struct cfs_hash_bd
*bd
,
2018 struct hlist_node
*hnode
, void *data
)
2021 struct export_cl_data
*ecl
= (struct export_cl_data
*)data
;
2022 struct obd_export
*exp
= ecl
->ecl_exp
;
2023 struct ldlm_lock
*lock
= cfs_hash_object(hs
, hnode
);
2024 struct ldlm_resource
*res
;
2026 res
= ldlm_resource_getref(lock
->l_resource
);
2027 LDLM_LOCK_GET(lock
);
2029 LDLM_DEBUG(lock
, "export %p", exp
);
2030 ldlm_res_lvbo_update(res
, NULL
, 1);
2031 ldlm_lock_cancel(lock
);
2032 ldlm_reprocess_all(res
);
2033 ldlm_resource_putref(res
);
2034 LDLM_LOCK_RELEASE(lock
);
2037 if ((ecl
->ecl_loop
& -ecl
->ecl_loop
) == ecl
->ecl_loop
) {
2039 "Cancel lock %p for export %p (loop %d), still have %d locks left on hash table.\n",
2040 lock
, exp
, ecl
->ecl_loop
,
2041 atomic_read(&hs
->hs_count
));
2048 * Cancel all locks for given export.
2050 * Typically called on client disconnection/eviction
2052 void ldlm_cancel_locks_for_export(struct obd_export
*exp
)
2054 struct export_cl_data ecl
= {
2059 cfs_hash_for_each_empty(exp
->exp_lock_hash
,
2060 ldlm_cancel_locks_for_export_cb
, &ecl
);
2064 * Downgrade an exclusive lock.
2066 * A fast variant of ldlm_lock_convert for conversion of exclusive
2067 * locks. The conversion is always successful.
2068 * Used by Commit on Sharing (COS) code.
2070 * \param lock A lock to convert
2071 * \param new_mode new lock mode
2073 void ldlm_lock_downgrade(struct ldlm_lock
*lock
, int new_mode
)
2075 LASSERT(lock
->l_granted_mode
& (LCK_PW
| LCK_EX
));
2076 LASSERT(new_mode
== LCK_COS
);
2078 lock_res_and_lock(lock
);
2079 ldlm_resource_unlink_lock(lock
);
2081 * Remove the lock from pool as it will be added again in
2082 * ldlm_grant_lock() called below.
2084 ldlm_pool_del(&ldlm_lock_to_ns(lock
)->ns_pool
, lock
);
2086 lock
->l_req_mode
= new_mode
;
2087 ldlm_grant_lock(lock
, NULL
);
2088 unlock_res_and_lock(lock
);
2089 ldlm_reprocess_all(lock
->l_resource
);
2091 EXPORT_SYMBOL(ldlm_lock_downgrade
);
2094 * Attempt to convert already granted lock to a different mode.
2096 * While lock conversion is not currently used, future client-side
2097 * optimizations could take advantage of it to avoid discarding cached
2100 struct ldlm_resource
*ldlm_lock_convert(struct ldlm_lock
*lock
, int new_mode
,
2103 LIST_HEAD(rpc_list
);
2104 struct ldlm_resource
*res
;
2105 struct ldlm_namespace
*ns
;
2107 struct ldlm_interval
*node
;
2109 /* Just return if mode is unchanged. */
2110 if (new_mode
== lock
->l_granted_mode
) {
2111 *flags
|= LDLM_FL_BLOCK_GRANTED
;
2112 return lock
->l_resource
;
2115 /* I can't check the type of lock here because the bitlock of lock
2116 * is not held here, so do the allocation blindly. -jay */
2117 OBD_SLAB_ALLOC_PTR_GFP(node
, ldlm_interval_slab
, GFP_NOFS
);
2119 /* Actually, this causes EDEADLOCK to be returned */
2122 LASSERTF((new_mode
== LCK_PW
&& lock
->l_granted_mode
== LCK_PR
),
2123 "new_mode %u, granted %u\n", new_mode
, lock
->l_granted_mode
);
2125 lock_res_and_lock(lock
);
2127 res
= lock
->l_resource
;
2128 ns
= ldlm_res_to_ns(res
);
2130 lock
->l_req_mode
= new_mode
;
2131 if (res
->lr_type
== LDLM_PLAIN
|| res
->lr_type
== LDLM_IBITS
) {
2132 ldlm_resource_unlink_lock(lock
);
2134 ldlm_resource_unlink_lock(lock
);
2135 if (res
->lr_type
== LDLM_EXTENT
) {
2136 /* FIXME: ugly code, I have to attach the lock to a
2137 * interval node again since perhaps it will be granted
2139 INIT_LIST_HEAD(&node
->li_group
);
2140 ldlm_interval_attach(node
, lock
);
2146 * Remove old lock from the pool before adding the lock with new
2147 * mode below in ->policy()
2149 ldlm_pool_del(&ns
->ns_pool
, lock
);
2151 /* If this is a local resource, put it on the appropriate list. */
2152 if (ns_is_client(ldlm_res_to_ns(res
))) {
2153 if (*flags
& (LDLM_FL_BLOCK_CONV
| LDLM_FL_BLOCK_GRANTED
)) {
2154 ldlm_resource_add_lock(res
, &res
->lr_converting
, lock
);
2156 /* This should never happen, because of the way the
2157 * server handles conversions. */
2158 LDLM_ERROR(lock
, "Erroneous flags %x on local lock\n",
2162 ldlm_grant_lock(lock
, &rpc_list
);
2164 /* FIXME: completion handling not with lr_lock held ! */
2165 if (lock
->l_completion_ast
)
2166 lock
->l_completion_ast(lock
, 0, NULL
);
2169 CERROR("This is client-side-only module, cannot handle LDLM_NAMESPACE_SERVER resource type lock.\n");
2172 unlock_res_and_lock(lock
);
2175 ldlm_run_ast_work(ns
, &rpc_list
, LDLM_WORK_CP_AST
);
2177 OBD_SLAB_FREE(node
, ldlm_interval_slab
, sizeof(*node
));
2180 EXPORT_SYMBOL(ldlm_lock_convert
);
2183 * Print lock with lock handle \a lockh description into debug log.
2185 * Used when printing all locks on a resource for debug purposes.
2187 void ldlm_lock_dump_handle(int level
, struct lustre_handle
*lockh
)
2189 struct ldlm_lock
*lock
;
2191 if (!((libcfs_debug
| D_ERROR
) & level
))
2194 lock
= ldlm_handle2lock(lockh
);
2198 LDLM_DEBUG_LIMIT(level
, lock
, "###");
2200 LDLM_LOCK_PUT(lock
);
2202 EXPORT_SYMBOL(ldlm_lock_dump_handle
);
2205 * Print lock information with custom message into debug log.
2208 void _ldlm_lock_debug(struct ldlm_lock
*lock
,
2209 struct libcfs_debug_msg_data
*msgdata
,
2210 const char *fmt
, ...)
2213 struct obd_export
*exp
= lock
->l_export
;
2214 struct ldlm_resource
*resource
= lock
->l_resource
;
2215 char *nid
= "local";
2217 va_start(args
, fmt
);
2219 if (exp
&& exp
->exp_connection
) {
2220 nid
= libcfs_nid2str(exp
->exp_connection
->c_peer
.nid
);
2221 } else if (exp
&& exp
->exp_obd
!= NULL
) {
2222 struct obd_import
*imp
= exp
->exp_obd
->u
.cli
.cl_import
;
2224 nid
= libcfs_nid2str(imp
->imp_connection
->c_peer
.nid
);
2227 if (resource
== NULL
) {
2228 libcfs_debug_vmsg2(msgdata
, fmt
, args
,
2229 " ns: \?\? lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: \?\? rrc=\?\? type: \?\?\? flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2231 lock
->l_handle
.h_cookie
, atomic_read(&lock
->l_refc
),
2232 lock
->l_readers
, lock
->l_writers
,
2233 ldlm_lockname
[lock
->l_granted_mode
],
2234 ldlm_lockname
[lock
->l_req_mode
],
2235 lock
->l_flags
, nid
, lock
->l_remote_handle
.cookie
,
2236 exp
? atomic_read(&exp
->exp_refcount
) : -99,
2237 lock
->l_pid
, lock
->l_callback_timeout
, lock
->l_lvb_type
);
2242 switch (resource
->lr_type
) {
2244 libcfs_debug_vmsg2(msgdata
, fmt
, args
,
2245 " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES
" rrc: %d type: %s [%llu->%llu] (req %llu->%llu) flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2246 ldlm_lock_to_ns_name(lock
), lock
,
2247 lock
->l_handle
.h_cookie
, atomic_read(&lock
->l_refc
),
2248 lock
->l_readers
, lock
->l_writers
,
2249 ldlm_lockname
[lock
->l_granted_mode
],
2250 ldlm_lockname
[lock
->l_req_mode
],
2252 atomic_read(&resource
->lr_refcount
),
2253 ldlm_typename
[resource
->lr_type
],
2254 lock
->l_policy_data
.l_extent
.start
,
2255 lock
->l_policy_data
.l_extent
.end
,
2256 lock
->l_req_extent
.start
, lock
->l_req_extent
.end
,
2257 lock
->l_flags
, nid
, lock
->l_remote_handle
.cookie
,
2258 exp
? atomic_read(&exp
->exp_refcount
) : -99,
2259 lock
->l_pid
, lock
->l_callback_timeout
,
2264 libcfs_debug_vmsg2(msgdata
, fmt
, args
,
2265 " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES
" rrc: %d type: %s pid: %d [%llu->%llu] flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu\n",
2266 ldlm_lock_to_ns_name(lock
), lock
,
2267 lock
->l_handle
.h_cookie
, atomic_read(&lock
->l_refc
),
2268 lock
->l_readers
, lock
->l_writers
,
2269 ldlm_lockname
[lock
->l_granted_mode
],
2270 ldlm_lockname
[lock
->l_req_mode
],
2272 atomic_read(&resource
->lr_refcount
),
2273 ldlm_typename
[resource
->lr_type
],
2274 lock
->l_policy_data
.l_flock
.pid
,
2275 lock
->l_policy_data
.l_flock
.start
,
2276 lock
->l_policy_data
.l_flock
.end
,
2277 lock
->l_flags
, nid
, lock
->l_remote_handle
.cookie
,
2278 exp
? atomic_read(&exp
->exp_refcount
) : -99,
2279 lock
->l_pid
, lock
->l_callback_timeout
);
2283 libcfs_debug_vmsg2(msgdata
, fmt
, args
,
2284 " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES
" bits %#llx rrc: %d type: %s flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2285 ldlm_lock_to_ns_name(lock
),
2286 lock
, lock
->l_handle
.h_cookie
,
2287 atomic_read(&lock
->l_refc
),
2288 lock
->l_readers
, lock
->l_writers
,
2289 ldlm_lockname
[lock
->l_granted_mode
],
2290 ldlm_lockname
[lock
->l_req_mode
],
2292 lock
->l_policy_data
.l_inodebits
.bits
,
2293 atomic_read(&resource
->lr_refcount
),
2294 ldlm_typename
[resource
->lr_type
],
2295 lock
->l_flags
, nid
, lock
->l_remote_handle
.cookie
,
2296 exp
? atomic_read(&exp
->exp_refcount
) : -99,
2297 lock
->l_pid
, lock
->l_callback_timeout
,
2302 libcfs_debug_vmsg2(msgdata
, fmt
, args
,
2303 " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES
" rrc: %d type: %s flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2304 ldlm_lock_to_ns_name(lock
),
2305 lock
, lock
->l_handle
.h_cookie
,
2306 atomic_read(&lock
->l_refc
),
2307 lock
->l_readers
, lock
->l_writers
,
2308 ldlm_lockname
[lock
->l_granted_mode
],
2309 ldlm_lockname
[lock
->l_req_mode
],
2311 atomic_read(&resource
->lr_refcount
),
2312 ldlm_typename
[resource
->lr_type
],
2313 lock
->l_flags
, nid
, lock
->l_remote_handle
.cookie
,
2314 exp
? atomic_read(&exp
->exp_refcount
) : -99,
2315 lock
->l_pid
, lock
->l_callback_timeout
,
2321 EXPORT_SYMBOL(_ldlm_lock_debug
);