4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2010, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/ldlm/ldlm_resource.c
38 * Author: Phil Schwan <phil@clusterfs.com>
39 * Author: Peter Braam <braam@clusterfs.com>
42 #define DEBUG_SUBSYSTEM S_LDLM
43 #include "../include/lustre_dlm.h"
44 #include "../include/lustre_fid.h"
45 #include "../include/obd_class.h"
46 #include "ldlm_internal.h"
48 struct kmem_cache
*ldlm_resource_slab
, *ldlm_lock_slab
;
50 int ldlm_srv_namespace_nr
;
51 int ldlm_cli_namespace_nr
;
53 struct mutex ldlm_srv_namespace_lock
;
54 LIST_HEAD(ldlm_srv_namespace_list
);
56 struct mutex ldlm_cli_namespace_lock
;
57 /* Client Namespaces that have active resources in them.
58 * Once all resources go away, ldlm_poold moves such namespaces to the
60 LIST_HEAD(ldlm_cli_active_namespace_list
);
61 /* Client namespaces that don't have any locks in them */
62 static LIST_HEAD(ldlm_cli_inactive_namespace_list
);
64 static struct dentry
*ldlm_debugfs_dir
;
65 static struct dentry
*ldlm_ns_debugfs_dir
;
66 struct dentry
*ldlm_svc_debugfs_dir
;
68 /* during debug dump certain amount of granted locks for one resource to avoid
70 static unsigned int ldlm_dump_granted_max
= 256;
73 lprocfs_wr_dump_ns(struct file
*file
, const char __user
*buffer
,
74 size_t count
, loff_t
*off
)
76 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER
, D_DLMTRACE
);
77 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT
, D_DLMTRACE
);
81 LPROC_SEQ_FOPS_WR_ONLY(ldlm
, dump_ns
);
83 LPROC_SEQ_FOPS_RW_TYPE(ldlm_rw
, uint
);
85 static struct lprocfs_vars ldlm_debugfs_list
[] = {
86 { "dump_namespaces", &ldlm_dump_ns_fops
, NULL
, 0222 },
87 { "dump_granted_max", &ldlm_rw_uint_fops
, &ldlm_dump_granted_max
},
91 int ldlm_debugfs_setup(void)
95 ldlm_debugfs_dir
= ldebugfs_register(OBD_LDLM_DEVICENAME
,
98 if (IS_ERR_OR_NULL(ldlm_debugfs_dir
)) {
99 CERROR("LProcFS failed in ldlm-init\n");
100 rc
= ldlm_debugfs_dir
? PTR_ERR(ldlm_debugfs_dir
) : -ENOMEM
;
104 ldlm_ns_debugfs_dir
= ldebugfs_register("namespaces",
107 if (IS_ERR_OR_NULL(ldlm_ns_debugfs_dir
)) {
108 CERROR("LProcFS failed in ldlm-init\n");
109 rc
= ldlm_ns_debugfs_dir
? PTR_ERR(ldlm_ns_debugfs_dir
)
114 ldlm_svc_debugfs_dir
= ldebugfs_register("services",
117 if (IS_ERR_OR_NULL(ldlm_svc_debugfs_dir
)) {
118 CERROR("LProcFS failed in ldlm-init\n");
119 rc
= ldlm_svc_debugfs_dir
? PTR_ERR(ldlm_svc_debugfs_dir
)
124 rc
= ldebugfs_add_vars(ldlm_debugfs_dir
, ldlm_debugfs_list
, NULL
);
129 ldebugfs_remove(&ldlm_ns_debugfs_dir
);
131 ldebugfs_remove(&ldlm_debugfs_dir
);
133 ldlm_svc_debugfs_dir
= NULL
;
134 ldlm_ns_debugfs_dir
= NULL
;
135 ldlm_debugfs_dir
= NULL
;
139 void ldlm_debugfs_cleanup(void)
141 if (!IS_ERR_OR_NULL(ldlm_svc_debugfs_dir
))
142 ldebugfs_remove(&ldlm_svc_debugfs_dir
);
144 if (!IS_ERR_OR_NULL(ldlm_ns_debugfs_dir
))
145 ldebugfs_remove(&ldlm_ns_debugfs_dir
);
147 if (!IS_ERR_OR_NULL(ldlm_debugfs_dir
))
148 ldebugfs_remove(&ldlm_debugfs_dir
);
150 ldlm_svc_debugfs_dir
= NULL
;
151 ldlm_ns_debugfs_dir
= NULL
;
152 ldlm_debugfs_dir
= NULL
;
155 static ssize_t
resource_count_show(struct kobject
*kobj
, struct attribute
*attr
,
158 struct ldlm_namespace
*ns
= container_of(kobj
, struct ldlm_namespace
,
161 struct cfs_hash_bd bd
;
164 /* result is not strictly consistent */
165 cfs_hash_for_each_bucket(ns
->ns_rs_hash
, &bd
, i
)
166 res
+= cfs_hash_bd_count_get(&bd
);
167 return sprintf(buf
, "%lld\n", res
);
169 LUSTRE_RO_ATTR(resource_count
);
171 static ssize_t
lock_count_show(struct kobject
*kobj
, struct attribute
*attr
,
174 struct ldlm_namespace
*ns
= container_of(kobj
, struct ldlm_namespace
,
178 locks
= lprocfs_stats_collector(ns
->ns_stats
, LDLM_NSS_LOCKS
,
179 LPROCFS_FIELDS_FLAGS_SUM
);
180 return sprintf(buf
, "%lld\n", locks
);
182 LUSTRE_RO_ATTR(lock_count
);
184 static ssize_t
lock_unused_count_show(struct kobject
*kobj
,
185 struct attribute
*attr
,
188 struct ldlm_namespace
*ns
= container_of(kobj
, struct ldlm_namespace
,
191 return sprintf(buf
, "%d\n", ns
->ns_nr_unused
);
193 LUSTRE_RO_ATTR(lock_unused_count
);
195 static ssize_t
lru_size_show(struct kobject
*kobj
, struct attribute
*attr
,
198 struct ldlm_namespace
*ns
= container_of(kobj
, struct ldlm_namespace
,
200 __u32
*nr
= &ns
->ns_max_unused
;
202 if (ns_connect_lru_resize(ns
))
203 nr
= &ns
->ns_nr_unused
;
204 return sprintf(buf
, "%u", *nr
);
207 static ssize_t
lru_size_store(struct kobject
*kobj
, struct attribute
*attr
,
208 const char *buffer
, size_t count
)
210 struct ldlm_namespace
*ns
= container_of(kobj
, struct ldlm_namespace
,
216 if (strncmp(buffer
, "clear", 5) == 0) {
218 "dropping all unused locks from namespace %s\n",
220 if (ns_connect_lru_resize(ns
)) {
221 int canceled
, unused
= ns
->ns_nr_unused
;
223 /* Try to cancel all @ns_nr_unused locks. */
224 canceled
= ldlm_cancel_lru(ns
, unused
, 0,
226 if (canceled
< unused
) {
228 "not all requested locks are canceled, requested: %d, canceled: %d\n",
234 tmp
= ns
->ns_max_unused
;
235 ns
->ns_max_unused
= 0;
236 ldlm_cancel_lru(ns
, 0, 0, LDLM_CANCEL_PASSED
);
237 ns
->ns_max_unused
= tmp
;
242 err
= kstrtoul(buffer
, 10, &tmp
);
244 CERROR("lru_size: invalid value written\n");
247 lru_resize
= (tmp
== 0);
249 if (ns_connect_lru_resize(ns
)) {
251 ns
->ns_max_unused
= (unsigned int)tmp
;
253 if (tmp
> ns
->ns_nr_unused
)
254 tmp
= ns
->ns_nr_unused
;
255 tmp
= ns
->ns_nr_unused
- tmp
;
258 "changing namespace %s unused locks from %u to %u\n",
259 ldlm_ns_name(ns
), ns
->ns_nr_unused
,
261 ldlm_cancel_lru(ns
, tmp
, LCF_ASYNC
, LDLM_CANCEL_PASSED
);
265 "disable lru_resize for namespace %s\n",
267 ns
->ns_connect_flags
&= ~OBD_CONNECT_LRU_RESIZE
;
271 "changing namespace %s max_unused from %u to %u\n",
272 ldlm_ns_name(ns
), ns
->ns_max_unused
,
274 ns
->ns_max_unused
= (unsigned int)tmp
;
275 ldlm_cancel_lru(ns
, 0, LCF_ASYNC
, LDLM_CANCEL_PASSED
);
277 /* Make sure that LRU resize was originally supported before
278 * turning it on here. */
280 (ns
->ns_orig_connect_flags
& OBD_CONNECT_LRU_RESIZE
)) {
282 "enable lru_resize for namespace %s\n",
284 ns
->ns_connect_flags
|= OBD_CONNECT_LRU_RESIZE
;
290 LUSTRE_RW_ATTR(lru_size
);
292 static ssize_t
lru_max_age_show(struct kobject
*kobj
, struct attribute
*attr
,
295 struct ldlm_namespace
*ns
= container_of(kobj
, struct ldlm_namespace
,
298 return sprintf(buf
, "%u", ns
->ns_max_age
);
301 static ssize_t
lru_max_age_store(struct kobject
*kobj
, struct attribute
*attr
,
302 const char *buffer
, size_t count
)
304 struct ldlm_namespace
*ns
= container_of(kobj
, struct ldlm_namespace
,
309 err
= kstrtoul(buffer
, 10, &tmp
);
313 ns
->ns_max_age
= tmp
;
317 LUSTRE_RW_ATTR(lru_max_age
);
319 static ssize_t
early_lock_cancel_show(struct kobject
*kobj
,
320 struct attribute
*attr
,
323 struct ldlm_namespace
*ns
= container_of(kobj
, struct ldlm_namespace
,
326 return sprintf(buf
, "%d\n", ns_connect_cancelset(ns
));
329 static ssize_t
early_lock_cancel_store(struct kobject
*kobj
,
330 struct attribute
*attr
,
334 struct ldlm_namespace
*ns
= container_of(kobj
, struct ldlm_namespace
,
336 unsigned long supp
= -1;
339 rc
= kstrtoul(buffer
, 10, &supp
);
344 ns
->ns_connect_flags
&= ~OBD_CONNECT_CANCELSET
;
345 else if (ns
->ns_orig_connect_flags
& OBD_CONNECT_CANCELSET
)
346 ns
->ns_connect_flags
|= OBD_CONNECT_CANCELSET
;
349 LUSTRE_RW_ATTR(early_lock_cancel
);
351 /* These are for namespaces in /sys/fs/lustre/ldlm/namespaces/ */
352 static struct attribute
*ldlm_ns_attrs
[] = {
353 &lustre_attr_resource_count
.attr
,
354 &lustre_attr_lock_count
.attr
,
355 &lustre_attr_lock_unused_count
.attr
,
356 &lustre_attr_lru_size
.attr
,
357 &lustre_attr_lru_max_age
.attr
,
358 &lustre_attr_early_lock_cancel
.attr
,
362 static void ldlm_ns_release(struct kobject
*kobj
)
364 struct ldlm_namespace
*ns
= container_of(kobj
, struct ldlm_namespace
,
366 complete(&ns
->ns_kobj_unregister
);
369 static struct kobj_type ldlm_ns_ktype
= {
370 .default_attrs
= ldlm_ns_attrs
,
371 .sysfs_ops
= &lustre_sysfs_ops
,
372 .release
= ldlm_ns_release
,
375 static void ldlm_namespace_debugfs_unregister(struct ldlm_namespace
*ns
)
377 if (IS_ERR_OR_NULL(ns
->ns_debugfs_entry
))
378 CERROR("dlm namespace %s has no procfs dir?\n",
381 ldebugfs_remove(&ns
->ns_debugfs_entry
);
383 if (ns
->ns_stats
!= NULL
)
384 lprocfs_free_stats(&ns
->ns_stats
);
387 static void ldlm_namespace_sysfs_unregister(struct ldlm_namespace
*ns
)
389 kobject_put(&ns
->ns_kobj
);
390 wait_for_completion(&ns
->ns_kobj_unregister
);
393 static int ldlm_namespace_sysfs_register(struct ldlm_namespace
*ns
)
397 ns
->ns_kobj
.kset
= ldlm_ns_kset
;
398 init_completion(&ns
->ns_kobj_unregister
);
399 err
= kobject_init_and_add(&ns
->ns_kobj
, &ldlm_ns_ktype
, NULL
,
400 "%s", ldlm_ns_name(ns
));
402 ns
->ns_stats
= lprocfs_alloc_stats(LDLM_NSS_LAST
, 0);
403 if (ns
->ns_stats
== NULL
) {
404 kobject_put(&ns
->ns_kobj
);
408 lprocfs_counter_init(ns
->ns_stats
, LDLM_NSS_LOCKS
,
409 LPROCFS_CNTR_AVGMINMAX
, "locks", "locks");
414 static int ldlm_namespace_debugfs_register(struct ldlm_namespace
*ns
)
416 struct dentry
*ns_entry
;
418 if (!IS_ERR_OR_NULL(ns
->ns_debugfs_entry
)) {
419 ns_entry
= ns
->ns_debugfs_entry
;
421 ns_entry
= debugfs_create_dir(ldlm_ns_name(ns
),
422 ldlm_ns_debugfs_dir
);
423 if (ns_entry
== NULL
)
425 ns
->ns_debugfs_entry
= ns_entry
;
431 #undef MAX_STRING_SIZE
433 static struct ldlm_resource
*ldlm_resource_getref(struct ldlm_resource
*res
)
436 LASSERT(res
!= LP_POISON
);
437 atomic_inc(&res
->lr_refcount
);
438 CDEBUG(D_INFO
, "getref res: %p count: %d\n", res
,
439 atomic_read(&res
->lr_refcount
));
443 static unsigned ldlm_res_hop_hash(struct cfs_hash
*hs
,
444 const void *key
, unsigned mask
)
446 const struct ldlm_res_id
*id
= key
;
450 for (i
= 0; i
< RES_NAME_SIZE
; i
++)
455 static unsigned ldlm_res_hop_fid_hash(struct cfs_hash
*hs
,
456 const void *key
, unsigned mask
)
458 const struct ldlm_res_id
*id
= key
;
463 fid
.f_seq
= id
->name
[LUSTRE_RES_ID_SEQ_OFF
];
464 fid
.f_oid
= (__u32
)id
->name
[LUSTRE_RES_ID_VER_OID_OFF
];
465 fid
.f_ver
= (__u32
)(id
->name
[LUSTRE_RES_ID_VER_OID_OFF
] >> 32);
467 hash
= fid_flatten32(&fid
);
468 hash
+= (hash
>> 4) + (hash
<< 12); /* mixing oid and seq */
469 if (id
->name
[LUSTRE_RES_ID_HSH_OFF
] != 0) {
470 val
= id
->name
[LUSTRE_RES_ID_HSH_OFF
];
471 hash
+= (val
>> 5) + (val
<< 11);
475 hash
= hash_long(hash
, hs
->hs_bkt_bits
);
476 /* give me another random factor */
477 hash
-= hash_long((unsigned long)hs
, val
% 11 + 3);
479 hash
<<= hs
->hs_cur_bits
- hs
->hs_bkt_bits
;
480 hash
|= ldlm_res_hop_hash(hs
, key
, CFS_HASH_NBKT(hs
) - 1);
485 static void *ldlm_res_hop_key(struct hlist_node
*hnode
)
487 struct ldlm_resource
*res
;
489 res
= hlist_entry(hnode
, struct ldlm_resource
, lr_hash
);
490 return &res
->lr_name
;
493 static int ldlm_res_hop_keycmp(const void *key
, struct hlist_node
*hnode
)
495 struct ldlm_resource
*res
;
497 res
= hlist_entry(hnode
, struct ldlm_resource
, lr_hash
);
498 return ldlm_res_eq((const struct ldlm_res_id
*)key
,
499 (const struct ldlm_res_id
*)&res
->lr_name
);
502 static void *ldlm_res_hop_object(struct hlist_node
*hnode
)
504 return hlist_entry(hnode
, struct ldlm_resource
, lr_hash
);
507 static void ldlm_res_hop_get_locked(struct cfs_hash
*hs
,
508 struct hlist_node
*hnode
)
510 struct ldlm_resource
*res
;
512 res
= hlist_entry(hnode
, struct ldlm_resource
, lr_hash
);
513 ldlm_resource_getref(res
);
516 static void ldlm_res_hop_put_locked(struct cfs_hash
*hs
,
517 struct hlist_node
*hnode
)
519 struct ldlm_resource
*res
;
521 res
= hlist_entry(hnode
, struct ldlm_resource
, lr_hash
);
522 /* cfs_hash_for_each_nolock is the only chance we call it */
523 ldlm_resource_putref_locked(res
);
526 static void ldlm_res_hop_put(struct cfs_hash
*hs
, struct hlist_node
*hnode
)
528 struct ldlm_resource
*res
;
530 res
= hlist_entry(hnode
, struct ldlm_resource
, lr_hash
);
531 ldlm_resource_putref(res
);
534 static struct cfs_hash_ops ldlm_ns_hash_ops
= {
535 .hs_hash
= ldlm_res_hop_hash
,
536 .hs_key
= ldlm_res_hop_key
,
537 .hs_keycmp
= ldlm_res_hop_keycmp
,
539 .hs_object
= ldlm_res_hop_object
,
540 .hs_get
= ldlm_res_hop_get_locked
,
541 .hs_put_locked
= ldlm_res_hop_put_locked
,
542 .hs_put
= ldlm_res_hop_put
545 static struct cfs_hash_ops ldlm_ns_fid_hash_ops
= {
546 .hs_hash
= ldlm_res_hop_fid_hash
,
547 .hs_key
= ldlm_res_hop_key
,
548 .hs_keycmp
= ldlm_res_hop_keycmp
,
550 .hs_object
= ldlm_res_hop_object
,
551 .hs_get
= ldlm_res_hop_get_locked
,
552 .hs_put_locked
= ldlm_res_hop_put_locked
,
553 .hs_put
= ldlm_res_hop_put
556 struct ldlm_ns_hash_def
{
557 ldlm_ns_type_t nsd_type
;
558 /** hash bucket bits */
559 unsigned nsd_bkt_bits
;
561 unsigned nsd_all_bits
;
562 /** hash operations */
563 struct cfs_hash_ops
*nsd_hops
;
566 static struct ldlm_ns_hash_def ldlm_ns_hash_defs
[] = {
568 .nsd_type
= LDLM_NS_TYPE_MDC
,
571 .nsd_hops
= &ldlm_ns_fid_hash_ops
,
574 .nsd_type
= LDLM_NS_TYPE_MDT
,
577 .nsd_hops
= &ldlm_ns_fid_hash_ops
,
580 .nsd_type
= LDLM_NS_TYPE_OSC
,
583 .nsd_hops
= &ldlm_ns_hash_ops
,
586 .nsd_type
= LDLM_NS_TYPE_OST
,
589 .nsd_hops
= &ldlm_ns_hash_ops
,
592 .nsd_type
= LDLM_NS_TYPE_MGC
,
595 .nsd_hops
= &ldlm_ns_hash_ops
,
598 .nsd_type
= LDLM_NS_TYPE_MGT
,
601 .nsd_hops
= &ldlm_ns_hash_ops
,
604 .nsd_type
= LDLM_NS_TYPE_UNKNOWN
,
608 /** Register \a ns in the list of namespaces */
609 static void ldlm_namespace_register(struct ldlm_namespace
*ns
,
612 mutex_lock(ldlm_namespace_lock(client
));
613 LASSERT(list_empty(&ns
->ns_list_chain
));
614 list_add(&ns
->ns_list_chain
, &ldlm_cli_inactive_namespace_list
);
615 ldlm_namespace_nr_inc(client
);
616 mutex_unlock(ldlm_namespace_lock(client
));
620 * Create and initialize new empty namespace.
622 struct ldlm_namespace
*ldlm_namespace_new(struct obd_device
*obd
, char *name
,
625 ldlm_ns_type_t ns_type
)
627 struct ldlm_namespace
*ns
= NULL
;
628 struct ldlm_ns_bucket
*nsb
;
629 struct ldlm_ns_hash_def
*nsd
;
630 struct cfs_hash_bd bd
;
634 LASSERT(obd
!= NULL
);
638 CERROR("ldlm_get_ref failed: %d\n", rc
);
642 for (idx
= 0;; idx
++) {
643 nsd
= &ldlm_ns_hash_defs
[idx
];
644 if (nsd
->nsd_type
== LDLM_NS_TYPE_UNKNOWN
) {
645 CERROR("Unknown type %d for ns %s\n", ns_type
, name
);
649 if (nsd
->nsd_type
== ns_type
)
653 ns
= kzalloc(sizeof(*ns
), GFP_NOFS
);
657 ns
->ns_rs_hash
= cfs_hash_create(name
,
658 nsd
->nsd_all_bits
, nsd
->nsd_all_bits
,
659 nsd
->nsd_bkt_bits
, sizeof(*nsb
),
665 CFS_HASH_SPIN_BKTLOCK
|
666 CFS_HASH_NO_ITEMREF
);
667 if (ns
->ns_rs_hash
== NULL
)
670 cfs_hash_for_each_bucket(ns
->ns_rs_hash
, &bd
, idx
) {
671 nsb
= cfs_hash_bd_extra_get(ns
->ns_rs_hash
, &bd
);
672 at_init(&nsb
->nsb_at_estimate
, ldlm_enqueue_min
, 0);
673 nsb
->nsb_namespace
= ns
;
677 ns
->ns_appetite
= apt
;
678 ns
->ns_client
= client
;
680 INIT_LIST_HEAD(&ns
->ns_list_chain
);
681 INIT_LIST_HEAD(&ns
->ns_unused_list
);
682 spin_lock_init(&ns
->ns_lock
);
683 atomic_set(&ns
->ns_bref
, 0);
684 init_waitqueue_head(&ns
->ns_waitq
);
686 ns
->ns_max_parallel_ast
= LDLM_DEFAULT_PARALLEL_AST_LIMIT
;
687 ns
->ns_nr_unused
= 0;
688 ns
->ns_max_unused
= LDLM_DEFAULT_LRU_SIZE
;
689 ns
->ns_max_age
= LDLM_DEFAULT_MAX_ALIVE
;
690 ns
->ns_orig_connect_flags
= 0;
691 ns
->ns_connect_flags
= 0;
694 rc
= ldlm_namespace_sysfs_register(ns
);
696 CERROR("Can't initialize ns sysfs, rc %d\n", rc
);
700 rc
= ldlm_namespace_debugfs_register(ns
);
702 CERROR("Can't initialize ns proc, rc %d\n", rc
);
706 idx
= ldlm_namespace_nr_read(client
);
707 rc
= ldlm_pool_init(&ns
->ns_pool
, ns
, idx
, client
);
709 CERROR("Can't initialize lock pool, rc %d\n", rc
);
713 ldlm_namespace_register(ns
, client
);
716 ldlm_namespace_debugfs_unregister(ns
);
718 ldlm_namespace_sysfs_unregister(ns
);
719 ldlm_namespace_cleanup(ns
, 0);
721 cfs_hash_putref(ns
->ns_rs_hash
);
728 EXPORT_SYMBOL(ldlm_namespace_new
);
730 extern struct ldlm_lock
*ldlm_lock_get(struct ldlm_lock
*lock
);
733 * Cancel and destroy all locks on a resource.
735 * If flags contains FL_LOCAL_ONLY, don't try to tell the server, just
736 * clean up. This is currently only used for recovery, and we make
737 * certain assumptions as a result--notably, that we shouldn't cancel
740 static void cleanup_resource(struct ldlm_resource
*res
, struct list_head
*q
,
743 struct list_head
*tmp
;
745 bool local_only
= !!(flags
& LDLM_FL_LOCAL_ONLY
);
748 struct ldlm_lock
*lock
= NULL
;
749 struct lustre_handle lockh
;
751 /* First, we look for non-cleaned-yet lock
752 * all cleaned locks are marked by CLEANED flag. */
754 list_for_each(tmp
, q
) {
755 lock
= list_entry(tmp
, struct ldlm_lock
,
757 if (lock
->l_flags
& LDLM_FL_CLEANED
) {
762 lock
->l_flags
|= LDLM_FL_CLEANED
;
771 /* Set CBPENDING so nothing in the cancellation path
772 * can match this lock. */
773 lock
->l_flags
|= LDLM_FL_CBPENDING
;
774 lock
->l_flags
|= LDLM_FL_FAILED
;
775 lock
->l_flags
|= flags
;
777 /* ... without sending a CANCEL message for local_only. */
779 lock
->l_flags
|= LDLM_FL_LOCAL_ONLY
;
781 if (local_only
&& (lock
->l_readers
|| lock
->l_writers
)) {
782 /* This is a little bit gross, but much better than the
783 * alternative: pretend that we got a blocking AST from
784 * the server, so that when the lock is decref'd, it
785 * will go away ... */
787 LDLM_DEBUG(lock
, "setting FL_LOCAL_ONLY");
788 if (lock
->l_completion_ast
)
789 lock
->l_completion_ast(lock
, 0, NULL
);
790 LDLM_LOCK_RELEASE(lock
);
795 ldlm_lock2handle(lock
, &lockh
);
796 rc
= ldlm_cli_cancel(&lockh
, LCF_ASYNC
);
798 CERROR("ldlm_cli_cancel: %d\n", rc
);
799 LDLM_LOCK_RELEASE(lock
);
803 static int ldlm_resource_clean(struct cfs_hash
*hs
, struct cfs_hash_bd
*bd
,
804 struct hlist_node
*hnode
, void *arg
)
806 struct ldlm_resource
*res
= cfs_hash_object(hs
, hnode
);
807 __u64 flags
= *(__u64
*)arg
;
809 cleanup_resource(res
, &res
->lr_granted
, flags
);
810 cleanup_resource(res
, &res
->lr_waiting
, flags
);
815 static int ldlm_resource_complain(struct cfs_hash
*hs
, struct cfs_hash_bd
*bd
,
816 struct hlist_node
*hnode
, void *arg
)
818 struct ldlm_resource
*res
= cfs_hash_object(hs
, hnode
);
821 CERROR("%s: namespace resource "DLDLMRES
822 " (%p) refcount nonzero (%d) after lock cleanup; forcing cleanup.\n",
823 ldlm_ns_name(ldlm_res_to_ns(res
)), PLDLMRES(res
), res
,
824 atomic_read(&res
->lr_refcount
) - 1);
826 ldlm_resource_dump(D_ERROR
, res
);
832 * Cancel and destroy all locks in the namespace.
834 * Typically used during evictions when server notified client that it was
835 * evicted and all of its state needs to be destroyed.
836 * Also used during shutdown.
838 int ldlm_namespace_cleanup(struct ldlm_namespace
*ns
, __u64 flags
)
841 CDEBUG(D_INFO
, "NULL ns, skipping cleanup\n");
845 cfs_hash_for_each_nolock(ns
->ns_rs_hash
, ldlm_resource_clean
, &flags
);
846 cfs_hash_for_each_nolock(ns
->ns_rs_hash
, ldlm_resource_complain
, NULL
);
849 EXPORT_SYMBOL(ldlm_namespace_cleanup
);
852 * Attempts to free namespace.
854 * Only used when namespace goes away, like during an unmount.
856 static int __ldlm_namespace_free(struct ldlm_namespace
*ns
, int force
)
858 /* At shutdown time, don't call the cancellation callback */
859 ldlm_namespace_cleanup(ns
, force
? LDLM_FL_LOCAL_ONLY
: 0);
861 if (atomic_read(&ns
->ns_bref
) > 0) {
862 struct l_wait_info lwi
= LWI_INTR(LWI_ON_SIGNAL_NOOP
, NULL
);
866 "dlm namespace %s free waiting on refcount %d\n",
867 ldlm_ns_name(ns
), atomic_read(&ns
->ns_bref
));
870 lwi
= LWI_TIMEOUT(obd_timeout
* HZ
/ 4, NULL
, NULL
);
872 rc
= l_wait_event(ns
->ns_waitq
,
873 atomic_read(&ns
->ns_bref
) == 0, &lwi
);
875 /* Forced cleanups should be able to reclaim all references,
876 * so it's safe to wait forever... we can't leak locks... */
877 if (force
&& rc
== -ETIMEDOUT
) {
878 LCONSOLE_ERROR("Forced cleanup waiting for %s namespace with %d resources in use, (rc=%d)\n",
880 atomic_read(&ns
->ns_bref
), rc
);
884 if (atomic_read(&ns
->ns_bref
)) {
885 LCONSOLE_ERROR("Cleanup waiting for %s namespace with %d resources in use, (rc=%d)\n",
887 atomic_read(&ns
->ns_bref
), rc
);
888 return ELDLM_NAMESPACE_EXISTS
;
890 CDEBUG(D_DLMTRACE
, "dlm namespace %s free done waiting\n",
898 * Performs various cleanups for passed \a ns to make it drop refc and be
899 * ready for freeing. Waits for refc == 0.
901 * The following is done:
902 * (0) Unregister \a ns from its list to make inaccessible for potential
903 * users like pools thread and others;
904 * (1) Clear all locks in \a ns.
906 void ldlm_namespace_free_prior(struct ldlm_namespace
*ns
,
907 struct obd_import
*imp
,
915 spin_lock(&ns
->ns_lock
);
917 spin_unlock(&ns
->ns_lock
);
920 * Can fail with -EINTR when force == 0 in which case try harder.
922 rc
= __ldlm_namespace_free(ns
, force
);
923 if (rc
!= ELDLM_OK
) {
925 ptlrpc_disconnect_import(imp
, 0);
926 ptlrpc_invalidate_import(imp
);
930 * With all requests dropped and the import inactive
931 * we are guaranteed all reference will be dropped.
933 rc
= __ldlm_namespace_free(ns
, 1);
938 /** Unregister \a ns from the list of namespaces. */
939 static void ldlm_namespace_unregister(struct ldlm_namespace
*ns
,
942 mutex_lock(ldlm_namespace_lock(client
));
943 LASSERT(!list_empty(&ns
->ns_list_chain
));
944 /* Some asserts and possibly other parts of the code are still
945 * using list_empty(&ns->ns_list_chain). This is why it is
946 * important to use list_del_init() here. */
947 list_del_init(&ns
->ns_list_chain
);
948 ldlm_namespace_nr_dec(client
);
949 mutex_unlock(ldlm_namespace_lock(client
));
953 * Performs freeing memory structures related to \a ns. This is only done
954 * when ldlm_namespce_free_prior() successfully removed all resources
955 * referencing \a ns and its refc == 0.
957 void ldlm_namespace_free_post(struct ldlm_namespace
*ns
)
962 /* Make sure that nobody can find this ns in its list. */
963 ldlm_namespace_unregister(ns
, ns
->ns_client
);
964 /* Fini pool _before_ parent proc dir is removed. This is important as
965 * ldlm_pool_fini() removes own proc dir which is child to @dir.
966 * Removing it after @dir may cause oops. */
967 ldlm_pool_fini(&ns
->ns_pool
);
969 ldlm_namespace_debugfs_unregister(ns
);
970 ldlm_namespace_sysfs_unregister(ns
);
971 cfs_hash_putref(ns
->ns_rs_hash
);
972 /* Namespace \a ns should be not on list at this time, otherwise
973 * this will cause issues related to using freed \a ns in poold
975 LASSERT(list_empty(&ns
->ns_list_chain
));
980 void ldlm_namespace_get(struct ldlm_namespace
*ns
)
982 atomic_inc(&ns
->ns_bref
);
984 EXPORT_SYMBOL(ldlm_namespace_get
);
986 /* This is only for callers that care about refcount */
987 static int ldlm_namespace_get_return(struct ldlm_namespace
*ns
)
989 return atomic_inc_return(&ns
->ns_bref
);
992 void ldlm_namespace_put(struct ldlm_namespace
*ns
)
994 if (atomic_dec_and_lock(&ns
->ns_bref
, &ns
->ns_lock
)) {
995 wake_up(&ns
->ns_waitq
);
996 spin_unlock(&ns
->ns_lock
);
999 EXPORT_SYMBOL(ldlm_namespace_put
);
1001 /** Should be called with ldlm_namespace_lock(client) taken. */
1002 void ldlm_namespace_move_to_active_locked(struct ldlm_namespace
*ns
,
1005 LASSERT(!list_empty(&ns
->ns_list_chain
));
1006 LASSERT(mutex_is_locked(ldlm_namespace_lock(client
)));
1007 list_move_tail(&ns
->ns_list_chain
, ldlm_namespace_list(client
));
1010 /** Should be called with ldlm_namespace_lock(client) taken. */
1011 void ldlm_namespace_move_to_inactive_locked(struct ldlm_namespace
*ns
,
1014 LASSERT(!list_empty(&ns
->ns_list_chain
));
1015 LASSERT(mutex_is_locked(ldlm_namespace_lock(client
)));
1016 list_move_tail(&ns
->ns_list_chain
, &ldlm_cli_inactive_namespace_list
);
1019 /** Should be called with ldlm_namespace_lock(client) taken. */
1020 struct ldlm_namespace
*ldlm_namespace_first_locked(ldlm_side_t client
)
1022 LASSERT(mutex_is_locked(ldlm_namespace_lock(client
)));
1023 LASSERT(!list_empty(ldlm_namespace_list(client
)));
1024 return container_of(ldlm_namespace_list(client
)->next
,
1025 struct ldlm_namespace
, ns_list_chain
);
1028 /** Create and initialize new resource. */
1029 static struct ldlm_resource
*ldlm_resource_new(void)
1031 struct ldlm_resource
*res
;
1034 res
= kmem_cache_alloc(ldlm_resource_slab
, GFP_NOFS
| __GFP_ZERO
);
1038 INIT_LIST_HEAD(&res
->lr_granted
);
1039 INIT_LIST_HEAD(&res
->lr_waiting
);
1041 /* Initialize interval trees for each lock mode. */
1042 for (idx
= 0; idx
< LCK_MODE_NUM
; idx
++) {
1043 res
->lr_itree
[idx
].lit_size
= 0;
1044 res
->lr_itree
[idx
].lit_mode
= 1 << idx
;
1045 res
->lr_itree
[idx
].lit_root
= NULL
;
1048 atomic_set(&res
->lr_refcount
, 1);
1049 spin_lock_init(&res
->lr_lock
);
1050 lu_ref_init(&res
->lr_reference
);
1052 /* The creator of the resource must unlock the mutex after LVB
1053 * initialization. */
1054 mutex_init(&res
->lr_lvb_mutex
);
1055 mutex_lock(&res
->lr_lvb_mutex
);
1061 * Return a reference to resource with given name, creating it if necessary.
1062 * Args: namespace with ns_lock unlocked
1063 * Locks: takes and releases NS hash-lock and res->lr_lock
1064 * Returns: referenced, unlocked ldlm_resource or NULL
1066 struct ldlm_resource
*
1067 ldlm_resource_get(struct ldlm_namespace
*ns
, struct ldlm_resource
*parent
,
1068 const struct ldlm_res_id
*name
, ldlm_type_t type
, int create
)
1070 struct hlist_node
*hnode
;
1071 struct ldlm_resource
*res
;
1072 struct cfs_hash_bd bd
;
1074 int ns_refcount
= 0;
1076 LASSERT(ns
!= NULL
);
1077 LASSERT(parent
== NULL
);
1078 LASSERT(ns
->ns_rs_hash
!= NULL
);
1079 LASSERT(name
->name
[0] != 0);
1081 cfs_hash_bd_get_and_lock(ns
->ns_rs_hash
, (void *)name
, &bd
, 0);
1082 hnode
= cfs_hash_bd_lookup_locked(ns
->ns_rs_hash
, &bd
, (void *)name
);
1083 if (hnode
!= NULL
) {
1084 cfs_hash_bd_unlock(ns
->ns_rs_hash
, &bd
, 0);
1085 res
= hlist_entry(hnode
, struct ldlm_resource
, lr_hash
);
1086 /* Synchronize with regard to resource creation. */
1087 if (ns
->ns_lvbo
&& ns
->ns_lvbo
->lvbo_init
) {
1088 mutex_lock(&res
->lr_lvb_mutex
);
1089 mutex_unlock(&res
->lr_lvb_mutex
);
1092 if (unlikely(res
->lr_lvb_len
< 0)) {
1093 ldlm_resource_putref(res
);
1099 version
= cfs_hash_bd_version_get(&bd
);
1100 cfs_hash_bd_unlock(ns
->ns_rs_hash
, &bd
, 0);
1105 LASSERTF(type
>= LDLM_MIN_TYPE
&& type
< LDLM_MAX_TYPE
,
1106 "type: %d\n", type
);
1107 res
= ldlm_resource_new();
1111 res
->lr_ns_bucket
= cfs_hash_bd_extra_get(ns
->ns_rs_hash
, &bd
);
1112 res
->lr_name
= *name
;
1113 res
->lr_type
= type
;
1114 res
->lr_most_restr
= LCK_NL
;
1116 cfs_hash_bd_lock(ns
->ns_rs_hash
, &bd
, 1);
1117 hnode
= (version
== cfs_hash_bd_version_get(&bd
)) ? NULL
:
1118 cfs_hash_bd_lookup_locked(ns
->ns_rs_hash
, &bd
, (void *)name
);
1120 if (hnode
!= NULL
) {
1121 /* Someone won the race and already added the resource. */
1122 cfs_hash_bd_unlock(ns
->ns_rs_hash
, &bd
, 1);
1123 /* Clean lu_ref for failed resource. */
1124 lu_ref_fini(&res
->lr_reference
);
1125 /* We have taken lr_lvb_mutex. Drop it. */
1126 mutex_unlock(&res
->lr_lvb_mutex
);
1127 kmem_cache_free(ldlm_resource_slab
, res
);
1129 res
= hlist_entry(hnode
, struct ldlm_resource
, lr_hash
);
1130 /* Synchronize with regard to resource creation. */
1131 if (ns
->ns_lvbo
&& ns
->ns_lvbo
->lvbo_init
) {
1132 mutex_lock(&res
->lr_lvb_mutex
);
1133 mutex_unlock(&res
->lr_lvb_mutex
);
1136 if (unlikely(res
->lr_lvb_len
< 0)) {
1137 ldlm_resource_putref(res
);
1142 /* We won! Let's add the resource. */
1143 cfs_hash_bd_add_locked(ns
->ns_rs_hash
, &bd
, &res
->lr_hash
);
1144 if (cfs_hash_bd_count_get(&bd
) == 1)
1145 ns_refcount
= ldlm_namespace_get_return(ns
);
1147 cfs_hash_bd_unlock(ns
->ns_rs_hash
, &bd
, 1);
1148 if (ns
->ns_lvbo
&& ns
->ns_lvbo
->lvbo_init
) {
1151 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE
, 2);
1152 rc
= ns
->ns_lvbo
->lvbo_init(res
);
1154 CERROR("%s: lvbo_init failed for resource %#llx:%#llx: rc = %d\n",
1155 ns
->ns_obd
->obd_name
, name
->name
[0],
1157 kfree(res
->lr_lvb_data
);
1158 res
->lr_lvb_data
= NULL
;
1159 res
->lr_lvb_len
= rc
;
1160 mutex_unlock(&res
->lr_lvb_mutex
);
1161 ldlm_resource_putref(res
);
1166 /* We create resource with locked lr_lvb_mutex. */
1167 mutex_unlock(&res
->lr_lvb_mutex
);
1169 /* Let's see if we happened to be the very first resource in this
1170 * namespace. If so, and this is a client namespace, we need to move
1171 * the namespace into the active namespaces list to be patrolled by
1172 * the ldlm_poold. */
1173 if (ns_refcount
== 1) {
1174 mutex_lock(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT
));
1175 ldlm_namespace_move_to_active_locked(ns
, LDLM_NAMESPACE_CLIENT
);
1176 mutex_unlock(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT
));
1181 EXPORT_SYMBOL(ldlm_resource_get
);
1183 static void __ldlm_resource_putref_final(struct cfs_hash_bd
*bd
,
1184 struct ldlm_resource
*res
)
1186 struct ldlm_ns_bucket
*nsb
= res
->lr_ns_bucket
;
1188 if (!list_empty(&res
->lr_granted
)) {
1189 ldlm_resource_dump(D_ERROR
, res
);
1193 if (!list_empty(&res
->lr_waiting
)) {
1194 ldlm_resource_dump(D_ERROR
, res
);
1198 cfs_hash_bd_del_locked(nsb
->nsb_namespace
->ns_rs_hash
,
1200 lu_ref_fini(&res
->lr_reference
);
1201 if (cfs_hash_bd_count_get(bd
) == 0)
1202 ldlm_namespace_put(nsb
->nsb_namespace
);
1205 /* Returns 1 if the resource was freed, 0 if it remains. */
1206 int ldlm_resource_putref(struct ldlm_resource
*res
)
1208 struct ldlm_namespace
*ns
= ldlm_res_to_ns(res
);
1209 struct cfs_hash_bd bd
;
1211 LASSERT_ATOMIC_GT_LT(&res
->lr_refcount
, 0, LI_POISON
);
1212 CDEBUG(D_INFO
, "putref res: %p count: %d\n",
1213 res
, atomic_read(&res
->lr_refcount
) - 1);
1215 cfs_hash_bd_get(ns
->ns_rs_hash
, &res
->lr_name
, &bd
);
1216 if (cfs_hash_bd_dec_and_lock(ns
->ns_rs_hash
, &bd
, &res
->lr_refcount
)) {
1217 __ldlm_resource_putref_final(&bd
, res
);
1218 cfs_hash_bd_unlock(ns
->ns_rs_hash
, &bd
, 1);
1219 if (ns
->ns_lvbo
&& ns
->ns_lvbo
->lvbo_free
)
1220 ns
->ns_lvbo
->lvbo_free(res
);
1221 kmem_cache_free(ldlm_resource_slab
, res
);
1226 EXPORT_SYMBOL(ldlm_resource_putref
);
1228 /* Returns 1 if the resource was freed, 0 if it remains. */
1229 int ldlm_resource_putref_locked(struct ldlm_resource
*res
)
1231 struct ldlm_namespace
*ns
= ldlm_res_to_ns(res
);
1233 LASSERT_ATOMIC_GT_LT(&res
->lr_refcount
, 0, LI_POISON
);
1234 CDEBUG(D_INFO
, "putref res: %p count: %d\n",
1235 res
, atomic_read(&res
->lr_refcount
) - 1);
1237 if (atomic_dec_and_test(&res
->lr_refcount
)) {
1238 struct cfs_hash_bd bd
;
1240 cfs_hash_bd_get(ldlm_res_to_ns(res
)->ns_rs_hash
,
1241 &res
->lr_name
, &bd
);
1242 __ldlm_resource_putref_final(&bd
, res
);
1243 cfs_hash_bd_unlock(ns
->ns_rs_hash
, &bd
, 1);
1244 /* NB: ns_rs_hash is created with CFS_HASH_NO_ITEMREF,
1245 * so we should never be here while calling cfs_hash_del,
1246 * cfs_hash_for_each_nolock is the only case we can get
1247 * here, which is safe to release cfs_hash_bd_lock.
1249 if (ns
->ns_lvbo
&& ns
->ns_lvbo
->lvbo_free
)
1250 ns
->ns_lvbo
->lvbo_free(res
);
1251 kmem_cache_free(ldlm_resource_slab
, res
);
1253 cfs_hash_bd_lock(ns
->ns_rs_hash
, &bd
, 1);
1260 * Add a lock into a given resource into specified lock list.
1262 void ldlm_resource_add_lock(struct ldlm_resource
*res
, struct list_head
*head
,
1263 struct ldlm_lock
*lock
)
1265 check_res_locked(res
);
1267 LDLM_DEBUG(lock
, "About to add this lock:\n");
1269 if (lock
->l_flags
& LDLM_FL_DESTROYED
) {
1270 CDEBUG(D_OTHER
, "Lock destroyed, not adding to resource\n");
1274 LASSERT(list_empty(&lock
->l_res_link
));
1276 list_add_tail(&lock
->l_res_link
, head
);
1279 void ldlm_resource_unlink_lock(struct ldlm_lock
*lock
)
1281 int type
= lock
->l_resource
->lr_type
;
1283 check_res_locked(lock
->l_resource
);
1284 if (type
== LDLM_IBITS
|| type
== LDLM_PLAIN
)
1285 ldlm_unlink_lock_skiplist(lock
);
1286 else if (type
== LDLM_EXTENT
)
1287 ldlm_extent_unlink_lock(lock
);
1288 list_del_init(&lock
->l_res_link
);
1290 EXPORT_SYMBOL(ldlm_resource_unlink_lock
);
1292 void ldlm_res2desc(struct ldlm_resource
*res
, struct ldlm_resource_desc
*desc
)
1294 desc
->lr_type
= res
->lr_type
;
1295 desc
->lr_name
= res
->lr_name
;
1299 * Print information about all locks in all namespaces on this node to debug
1302 void ldlm_dump_all_namespaces(ldlm_side_t client
, int level
)
1304 struct list_head
*tmp
;
1306 if (!((libcfs_debug
| D_ERROR
) & level
))
1309 mutex_lock(ldlm_namespace_lock(client
));
1311 list_for_each(tmp
, ldlm_namespace_list(client
)) {
1312 struct ldlm_namespace
*ns
;
1314 ns
= list_entry(tmp
, struct ldlm_namespace
, ns_list_chain
);
1315 ldlm_namespace_dump(level
, ns
);
1318 mutex_unlock(ldlm_namespace_lock(client
));
1320 EXPORT_SYMBOL(ldlm_dump_all_namespaces
);
1322 static int ldlm_res_hash_dump(struct cfs_hash
*hs
, struct cfs_hash_bd
*bd
,
1323 struct hlist_node
*hnode
, void *arg
)
1325 struct ldlm_resource
*res
= cfs_hash_object(hs
, hnode
);
1326 int level
= (int)(unsigned long)arg
;
1329 ldlm_resource_dump(level
, res
);
1336 * Print information about all locks in this namespace on this node to debug
1339 void ldlm_namespace_dump(int level
, struct ldlm_namespace
*ns
)
1341 if (!((libcfs_debug
| D_ERROR
) & level
))
1344 CDEBUG(level
, "--- Namespace: %s (rc: %d, side: client)\n",
1345 ldlm_ns_name(ns
), atomic_read(&ns
->ns_bref
));
1347 if (time_before(cfs_time_current(), ns
->ns_next_dump
))
1350 cfs_hash_for_each_nolock(ns
->ns_rs_hash
,
1352 (void *)(unsigned long)level
);
1353 spin_lock(&ns
->ns_lock
);
1354 ns
->ns_next_dump
= cfs_time_shift(10);
1355 spin_unlock(&ns
->ns_lock
);
1357 EXPORT_SYMBOL(ldlm_namespace_dump
);
1360 * Print information about all locks in this resource to debug log.
1362 void ldlm_resource_dump(int level
, struct ldlm_resource
*res
)
1364 struct ldlm_lock
*lock
;
1365 unsigned int granted
= 0;
1367 CLASSERT(RES_NAME_SIZE
== 4);
1369 if (!((libcfs_debug
| D_ERROR
) & level
))
1372 CDEBUG(level
, "--- Resource: "DLDLMRES
" (%p) refcount = %d\n",
1373 PLDLMRES(res
), res
, atomic_read(&res
->lr_refcount
));
1375 if (!list_empty(&res
->lr_granted
)) {
1376 CDEBUG(level
, "Granted locks (in reverse order):\n");
1377 list_for_each_entry_reverse(lock
, &res
->lr_granted
,
1379 LDLM_DEBUG_LIMIT(level
, lock
, "###");
1380 if (!(level
& D_CANTMASK
) &&
1381 ++granted
> ldlm_dump_granted_max
) {
1382 CDEBUG(level
, "only dump %d granted locks to avoid DDOS.\n",
1388 if (!list_empty(&res
->lr_waiting
)) {
1389 CDEBUG(level
, "Waiting locks:\n");
1390 list_for_each_entry(lock
, &res
->lr_waiting
, l_res_link
)
1391 LDLM_DEBUG_LIMIT(level
, lock
, "###");