/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lockd.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */
#define DEBUG_SUBSYSTEM S_LDLM

#include "../../include/linux/libcfs/libcfs.h"
#include "../include/lustre_dlm.h"
#include "../include/obd_class.h"
#include <linux/list.h>
#include "ldlm_internal.h"
static int ldlm_num_threads;
module_param(ldlm_num_threads, int, 0444);
MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");

static char *ldlm_cpts;
module_param(ldlm_cpts, charp, 0444);
MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
static struct mutex ldlm_ref_mutex;
static int ldlm_refcount;

static struct kobject *ldlm_kobj;
struct kset *ldlm_ns_kset;
static struct kset *ldlm_svc_kset;
struct ldlm_cb_async_args {
	struct ldlm_cb_set_arg	*ca_set_arg;
	struct ldlm_lock	*ca_lock;
};

/* LDLM state */

static struct ldlm_state *ldlm_state;

#define ELT_STOPPED   0
#define ELT_READY     1
#define ELT_TERMINATE 2
/**
 * blocking lock pool
 */
struct ldlm_bl_pool {
	spinlock_t		blp_lock;

	/*
	 * blp_prio_list is used for callbacks that should be handled
	 * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
	 */
	struct list_head	blp_prio_list;

	/*
	 * blp_list is used for all other callbacks which are likely
	 * to take longer to process.
	 */
	struct list_head	blp_list;

	wait_queue_head_t	blp_waitq;
	struct completion	blp_comp;
	atomic_t		blp_num_threads;
	atomic_t		blp_busy_threads;
	int			blp_min_threads;
	int			blp_max_threads;
};
struct ldlm_bl_work_item {
	struct list_head	blwi_entry;
	struct ldlm_namespace	*blwi_ns;
	struct ldlm_lock_desc	blwi_ld;
	struct ldlm_lock	*blwi_lock;
	struct list_head	blwi_head;
	int			blwi_count;
	struct completion	blwi_comp;
	ldlm_cancel_flags_t	blwi_flags;
	int			blwi_mem_pressure;
};
/**
 * Callback handler for receiving incoming blocking ASTs.
 *
 * This can only happen on the client side.
 */
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
			     struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
	int do_ast;

	LDLM_DEBUG(lock, "client blocking AST callback handler");

	lock_res_and_lock(lock);
	lock->l_flags |= LDLM_FL_CBPENDING;

	if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
		lock->l_flags |= LDLM_FL_CANCEL;

	do_ast = !lock->l_readers && !lock->l_writers;
	unlock_res_and_lock(lock);

	if (do_ast) {
		CDEBUG(D_DLMTRACE,
		       "Lock %p already unused, calling callback (%p)\n",
		       lock, lock->l_blocking_ast);
		if (lock->l_blocking_ast != NULL)
			lock->l_blocking_ast(lock, ld, lock->l_ast_data,
					     LDLM_CB_BLOCKING);
	} else {
		CDEBUG(D_DLMTRACE,
		       "Lock %p is referenced, will be cancelled later\n",
		       lock);
	}

	LDLM_DEBUG(lock, "client blocking callback handler END");
	LDLM_LOCK_RELEASE(lock);
}
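
/*
 * Sketch of a minimal client-side blocking AST of the kind invoked above
 * (hypothetical; real handlers such as those in the mdc/osc layers also
 * flush cached state before cancelling):
 *
 *	static int example_blocking_ast(struct ldlm_lock *lock,
 *					struct ldlm_lock_desc *desc,
 *					void *data, int flag)
 *	{
 *		struct lustre_handle lockh;
 *
 *		if (flag == LDLM_CB_BLOCKING) {
 *			ldlm_lock2handle(lock, &lockh);
 *			return ldlm_cli_cancel(&lockh, LCF_ASYNC);
 *		}
 *		return 0;
 *	}
 */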
/**
 * Callback handler for receiving incoming completion ASTs.
 *
 * This can only happen on the client side.
 */
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
				    struct ldlm_namespace *ns,
				    struct ldlm_request *dlm_req,
				    struct ldlm_lock *lock)
{
	int lvb_len;
	LIST_HEAD(ast_list);
	int rc = 0;

	LDLM_DEBUG(lock, "client completion callback handler START");

	if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
		int to = cfs_time_seconds(1);

		while (to > 0) {
			set_current_state(TASK_INTERRUPTIBLE);
			schedule_timeout(to);
			if (lock->l_granted_mode == lock->l_req_mode ||
			    lock->l_flags & LDLM_FL_DESTROYED)
				break;
		}
	}

	lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
	if (lvb_len < 0) {
		LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len);
		rc = lvb_len;
		goto out;
	} else if (lvb_len > 0) {
		if (lock->l_lvb_len > 0) {
			/* for extent lock, lvb contains ost_lvb{}. */
			LASSERT(lock->l_lvb_data != NULL);

			if (unlikely(lock->l_lvb_len < lvb_len)) {
				LDLM_ERROR(lock, "Replied LVB is larger than expectation, expected = %d, replied = %d",
					   lock->l_lvb_len, lvb_len);
				rc = -EINVAL;
				goto out;
			}
		} else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
						     * variable length */
			void *lvb_data;

			lvb_data = kzalloc(lvb_len, GFP_NOFS);
			if (lvb_data == NULL) {
				LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
				rc = -ENOMEM;
				goto out;
			}

			lock_res_and_lock(lock);
			LASSERT(lock->l_lvb_data == NULL);
			lock->l_lvb_type = LVB_T_LAYOUT;
			lock->l_lvb_data = lvb_data;
			lock->l_lvb_len = lvb_len;
			unlock_res_and_lock(lock);
		}
	}

	lock_res_and_lock(lock);
	if ((lock->l_flags & LDLM_FL_DESTROYED) ||
	    lock->l_granted_mode == lock->l_req_mode) {
		/* bug 11300: the lock has already been granted */
		unlock_res_and_lock(lock);
		LDLM_DEBUG(lock, "Double grant race happened");
		goto out;
	}

	/* If we receive the completion AST before the actual enqueue returned,
	 * then we might need to switch lock modes, resources, or extents. */
	if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
		lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
		LDLM_DEBUG(lock, "completion AST, new lock mode");
	}

	if (lock->l_resource->lr_type != LDLM_PLAIN) {
		ldlm_convert_policy_to_local(req->rq_export,
					  dlm_req->lock_desc.l_resource.lr_type,
					  &dlm_req->lock_desc.l_policy_data,
					  &lock->l_policy_data);
		LDLM_DEBUG(lock, "completion AST, new policy data");
	}

	ldlm_resource_unlink_lock(lock);
	if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
		   &lock->l_resource->lr_name,
		   sizeof(lock->l_resource->lr_name)) != 0) {
		unlock_res_and_lock(lock);
		rc = ldlm_lock_change_resource(ns, lock,
				&dlm_req->lock_desc.l_resource.lr_name);
		if (rc < 0) {
			LDLM_ERROR(lock, "Failed to allocate resource");
			goto out;
		}
		LDLM_DEBUG(lock, "completion AST, new resource");
		CERROR("change resource!\n");
		lock_res_and_lock(lock);
	}

	if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
		/* BL_AST locks are not needed in LRU.
		 * Let ldlm_cancel_lru() be fast. */
		ldlm_lock_remove_from_lru(lock);
		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
		LDLM_DEBUG(lock, "completion AST includes blocking AST");
	}

	if (lock->l_lvb_len > 0) {
		rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
				   lock->l_lvb_data, lvb_len);
		if (rc < 0) {
			unlock_res_and_lock(lock);
			goto out;
		}
	}

	ldlm_grant_lock(lock, &ast_list);
	unlock_res_and_lock(lock);

	LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");

	/* Let the enqueue path call osc_lock_upcall() and initialize
	 * l_ast_data */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);

	ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);

	LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
			  lock);

out:
	if (rc < 0) {
		lock_res_and_lock(lock);
		lock->l_flags |= LDLM_FL_FAILED;
		unlock_res_and_lock(lock);
		wake_up(&lock->l_waitq);
	}
	LDLM_LOCK_RELEASE(lock);
}
/**
 * Callback handler for receiving incoming glimpse ASTs.
 *
 * This can only happen on the client side. After handling the glimpse AST
 * we also consider dropping the lock here if it is unused locally for a
 * long time.
 */
static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
				    struct ldlm_namespace *ns,
				    struct ldlm_request *dlm_req,
				    struct ldlm_lock *lock)
{
	int rc = -ENOSYS;

	LDLM_DEBUG(lock, "client glimpse AST callback handler");

	if (lock->l_glimpse_ast != NULL)
		rc = lock->l_glimpse_ast(lock, req);

	if (req->rq_repmsg != NULL) {
		ptlrpc_reply(req);
	} else {
		req->rq_status = rc;
		ptlrpc_error(req);
	}

	lock_res_and_lock(lock);
	if (lock->l_granted_mode == LCK_PW &&
	    !lock->l_readers && !lock->l_writers &&
	    cfs_time_after(cfs_time_current(),
			   cfs_time_add(lock->l_last_used,
					cfs_time_seconds(10)))) {
		unlock_res_and_lock(lock);
		if (ldlm_bl_to_thread_lock(ns, NULL, lock))
			ldlm_handle_bl_callback(ns, NULL, lock);

		return;
	}
	unlock_res_and_lock(lock);
	LDLM_LOCK_RELEASE(lock);
}
static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
{
	if (req->rq_no_reply)
		return 0;

	req->rq_status = rc;
	if (!req->rq_packed_final) {
		rc = lustre_pack_reply(req, 1, NULL, NULL);
		if (rc)
			return rc;
	}
	return ptlrpc_reply(req);
}
static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
			       ldlm_cancel_flags_t cancel_flags)
{
	struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;

	spin_lock(&blp->blp_lock);
	if (blwi->blwi_lock &&
	    blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
		/* add LDLM_FL_DISCARD_DATA requests to the priority list */
		list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
	} else {
		/* other blocking callbacks are added to the regular list */
		list_add_tail(&blwi->blwi_entry, &blp->blp_list);
	}
	spin_unlock(&blp->blp_lock);

	wake_up(&blp->blp_waitq);

	/* cannot check blwi->blwi_flags as blwi could already be freed in
	 * LCF_ASYNC mode */
	if (!(cancel_flags & LCF_ASYNC))
		wait_for_completion(&blwi->blwi_comp);

	return 0;
}
static inline void init_blwi(struct ldlm_bl_work_item *blwi,
			     struct ldlm_namespace *ns,
			     struct ldlm_lock_desc *ld,
			     struct list_head *cancels, int count,
			     struct ldlm_lock *lock,
			     ldlm_cancel_flags_t cancel_flags)
{
	init_completion(&blwi->blwi_comp);
	INIT_LIST_HEAD(&blwi->blwi_head);

	if (memory_pressure_get())
		blwi->blwi_mem_pressure = 1;

	blwi->blwi_ns = ns;
	blwi->blwi_flags = cancel_flags;
	if (ld != NULL)
		blwi->blwi_ld = *ld;
	if (count) {
		list_add(&blwi->blwi_head, cancels);
		list_del_init(cancels);
		blwi->blwi_count = count;
	} else {
		blwi->blwi_lock = lock;
	}
}
/**
 * Queues a list of locks \a cancels containing \a count locks
 * for later processing by a blocking thread. If \a count is zero,
 * then the lock referenced as \a lock is queued instead.
 *
 * The blocking thread would then call the ->l_blocking_ast callback
 * on the lock. If list addition fails an error is returned and the
 * caller is supposed to call ->l_blocking_ast itself.
 */
static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
			     struct ldlm_lock_desc *ld,
			     struct ldlm_lock *lock,
			     struct list_head *cancels, int count,
			     ldlm_cancel_flags_t cancel_flags)
{
	if (cancels && count == 0)
		return 0;

	if (cancel_flags & LCF_ASYNC) {
		struct ldlm_bl_work_item *blwi;

		blwi = kzalloc(sizeof(*blwi), GFP_NOFS);
		if (blwi == NULL)
			return -ENOMEM;
		init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);

		return __ldlm_bl_to_thread(blwi, cancel_flags);
	} else {
		/* for a synchronous call do the minimum memory allocation,
		 * as it could be triggered from the kernel shrinker
		 */
		struct ldlm_bl_work_item blwi;

		memset(&blwi, 0, sizeof(blwi));
		init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
		return __ldlm_bl_to_thread(&blwi, cancel_flags);
	}
}
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
			   struct ldlm_lock *lock)
{
	return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
}
int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
			   struct list_head *cancels, int count,
			   ldlm_cancel_flags_t cancel_flags)
{
	return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
}
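
/*
 * Usage sketch (mirroring the caller in ldlm_callback_handler() below):
 * queue a single lock for the blocking threads, handling it inline if
 * queueing fails:
 *
 *	if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
 *		ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
 */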
/* Setinfo coming from Server (eg MDT) to Client (eg MDC)! */
static int ldlm_handle_setinfo(struct ptlrpc_request *req)
{
	struct obd_device *obd = req->rq_export->exp_obd;
	char *key;
	void *val;
	int keylen, vallen;
	int rc = -ENOSYS;

	DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);

	req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);

	key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
	if (key == NULL) {
		DEBUG_REQ(D_IOCTL, req, "no set_info key");
		return -EFAULT;
	}
	keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
				      RCL_CLIENT);
	val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
	if (val == NULL) {
		DEBUG_REQ(D_IOCTL, req, "no set_info val");
		return -EFAULT;
	}
	vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
				      RCL_CLIENT);

	/* We are responsible for swabbing contents of val */

	if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
		/* Pass it on to mdc (the "export" in this case) */
		rc = obd_set_info_async(req->rq_svc_thread->t_env,
					req->rq_export,
					sizeof(KEY_HSM_COPYTOOL_SEND),
					KEY_HSM_COPYTOOL_SEND,
					vallen, val, NULL);
	else
		DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);

	return rc;
}
static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
					const char *msg, int rc,
					struct lustre_handle *handle)
{
	DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
		  "%s: [nid %s] [rc %d] [lock %#llx]",
		  msg, libcfs_id2str(req->rq_peer), rc,
		  handle ? handle->cookie : 0);
	if (req->rq_no_reply)
		CWARN("No reply was sent, maybe cause bug 21636.\n");
	else if (rc)
		CWARN("Send reply failed, maybe cause bug 21636.\n");
}
static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
{
	struct obd_quotactl *oqctl;
	struct client_obd *cli = &req->rq_export->exp_obd->u.cli;

	oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
	if (oqctl == NULL) {
		CERROR("Can't unpack obd_quotactl\n");
		return -EPROTO;
	}

	oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);

	cli->cl_qchk_stat = oqctl->qc_stat;
	return 0;
}
/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
	struct ldlm_namespace *ns;
	struct ldlm_request *dlm_req;
	struct ldlm_lock *lock;
	int rc;

	/* Requests arrive in sender's byte order. The ptlrpc service
	 * handler has already checked and, if necessary, byte-swapped the
	 * incoming request message body, but I am responsible for the
	 * message buffers. */

	/* do nothing for sec context finalize */
	if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
		return 0;

	req_capsule_init(&req->rq_pill, req, RCL_SERVER);

	if (req->rq_export == NULL) {
		rc = ldlm_callback_reply(req, -ENOTCONN);
		ldlm_callback_errmsg(req, "Operate on unconnected server",
				     rc, NULL);
		return 0;
	}

	LASSERT(req->rq_export != NULL);
	LASSERT(req->rq_export->exp_obd != NULL);

	switch (lustre_msg_get_opc(req->rq_reqmsg)) {
	case LDLM_BL_CALLBACK:
		if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
			return 0;
		break;
	case LDLM_CP_CALLBACK:
		if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
			return 0;
		break;
	case LDLM_GL_CALLBACK:
		if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
			return 0;
		break;
	case LDLM_SET_INFO:
		rc = ldlm_handle_setinfo(req);
		ldlm_callback_reply(req, rc);
		return 0;
	case OBD_QC_CALLBACK:
		req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
		if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
			return 0;
		rc = ldlm_handle_qc_callback(req);
		ldlm_callback_reply(req, rc);
		return 0;
	default:
		CERROR("unknown opcode %u\n",
		       lustre_msg_get_opc(req->rq_reqmsg));
		ldlm_callback_reply(req, -EPROTO);
		return 0;
	}

	ns = req->rq_export->exp_obd->obd_namespace;
	LASSERT(ns != NULL);

	req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);

	dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
	if (dlm_req == NULL) {
		rc = ldlm_callback_reply(req, -EPROTO);
		ldlm_callback_errmsg(req, "Operate without parameter", rc,
				     NULL);
		return 0;
	}

	/* Force a known safe race, send a cancel to the server for a lock
	 * which the server has already started a blocking callback on. */
	if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
	    lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
		rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
		if (rc < 0)
			CERROR("ldlm_cli_cancel: %d\n", rc);
	}

	lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
	if (!lock) {
		CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock disappeared\n",
		       dlm_req->lock_handle[0].cookie);
		rc = ldlm_callback_reply(req, -EINVAL);
		ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
				     &dlm_req->lock_handle[0]);
		return 0;
	}

	if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
	    lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
		OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

	/* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
	lock_res_and_lock(lock);
	lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
					      LDLM_FL_AST_MASK);
	if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
		/* If somebody cancels lock and cache is already dropped,
		 * or lock is failed before cp_ast received on client,
		 * we can tell the server we have no lock. Otherwise, we
		 * should send cancel after dropping the cache. */
		if (((lock->l_flags & LDLM_FL_CANCELING) &&
		     (lock->l_flags & LDLM_FL_BL_DONE)) ||
		    (lock->l_flags & LDLM_FL_FAILED)) {
			LDLM_DEBUG(lock, "callback on lock %#llx - lock disappeared\n",
				   dlm_req->lock_handle[0].cookie);
			unlock_res_and_lock(lock);
			LDLM_LOCK_RELEASE(lock);
			rc = ldlm_callback_reply(req, -EINVAL);
			ldlm_callback_errmsg(req, "Operate on stale lock", rc,
					     &dlm_req->lock_handle[0]);
			return 0;
		}
		/* BL_AST locks are not needed in LRU.
		 * Let ldlm_cancel_lru() be fast. */
		ldlm_lock_remove_from_lru(lock);
		lock->l_flags |= LDLM_FL_BL_AST;
	}
	unlock_res_and_lock(lock);

	/* We want the ost thread to get this reply so that it can respond
	 * to ost requests (write cache writeback) that might be triggered
	 * in the callback.
	 *
	 * But we'd also like to be able to indicate in the reply that we're
	 * cancelling right now, because it's unused, or have an intent result
	 * in the reply, so we might have to push the responsibility for sending
	 * the reply down into the AST handlers, alas. */

	switch (lustre_msg_get_opc(req->rq_reqmsg)) {
	case LDLM_BL_CALLBACK:
		CDEBUG(D_INODE, "blocking ast\n");
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
		if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
			rc = ldlm_callback_reply(req, 0);
			if (req->rq_no_reply || rc)
				ldlm_callback_errmsg(req, "Normal process", rc,
						     &dlm_req->lock_handle[0]);
		}
		if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
			ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
		break;
	case LDLM_CP_CALLBACK:
		CDEBUG(D_INODE, "completion ast\n");
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
		ldlm_callback_reply(req, 0);
		ldlm_handle_cp_callback(req, ns, dlm_req, lock);
		break;
	case LDLM_GL_CALLBACK:
		CDEBUG(D_INODE, "glimpse ast\n");
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
		ldlm_handle_gl_callback(req, ns, dlm_req, lock);
		break;
	default:
		LBUG(); /* checked above */
	}

	return 0;
}
static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
{
	struct ldlm_bl_work_item *blwi = NULL;
	static unsigned int num_bl;

	spin_lock(&blp->blp_lock);
	/* process a request from the blp_list at least every blp_num_threads */
	if (!list_empty(&blp->blp_list) &&
	    (list_empty(&blp->blp_prio_list) || num_bl == 0))
		blwi = list_entry(blp->blp_list.next,
				  struct ldlm_bl_work_item, blwi_entry);
	else
		if (!list_empty(&blp->blp_prio_list))
			blwi = list_entry(blp->blp_prio_list.next,
					  struct ldlm_bl_work_item,
					  blwi_entry);

	if (blwi) {
		if (++num_bl >= atomic_read(&blp->blp_num_threads))
			num_bl = 0;
		list_del(&blwi->blwi_entry);
	}
	spin_unlock(&blp->blp_lock);

	return blwi;
}
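
/*
 * Worked example of the rotation above (assuming two pool threads):
 * num_bl counts dequeues modulo blp_num_threads, so while the priority
 * list is non-empty an item from the regular blp_list is only taken
 * when num_bl wraps back to zero. The regular list is therefore served
 * at least once every blp_num_threads dequeues and cannot be starved
 * by a steady stream of LDLM_FL_DISCARD_DATA callbacks.
 */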
/* This only contains temporary data until the thread starts */
struct ldlm_bl_thread_data {
	char			bltd_name[CFS_CURPROC_COMM_MAX];
	struct ldlm_bl_pool	*bltd_blp;
	struct completion	bltd_comp;
	int			bltd_num;
};
static int ldlm_bl_thread_main(void *arg);

static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
{
	struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
	struct task_struct *task;

	init_completion(&bltd.bltd_comp);
	bltd.bltd_num = atomic_read(&blp->blp_num_threads);
	snprintf(bltd.bltd_name, sizeof(bltd.bltd_name),
		 "ldlm_bl_%02d", bltd.bltd_num);
	task = kthread_run(ldlm_bl_thread_main, &bltd, "%s", bltd.bltd_name);
	if (IS_ERR(task)) {
		CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
		       atomic_read(&blp->blp_num_threads), PTR_ERR(task));
		return PTR_ERR(task);
	}
	wait_for_completion(&bltd.bltd_comp);

	return 0;
}
/**
 * Main blocking requests processing thread.
 *
 * Callers put locks into its queue by calling ldlm_bl_to_thread.
 * Ultimately this thread makes the actual call to ->l_blocking_ast
 * for each queued lock.
 */
static int ldlm_bl_thread_main(void *arg)
{
	struct ldlm_bl_pool *blp;

	{
		struct ldlm_bl_thread_data *bltd = arg;

		blp = bltd->bltd_blp;

		atomic_inc(&blp->blp_num_threads);
		atomic_inc(&blp->blp_busy_threads);

		complete(&bltd->bltd_comp);
		/* cannot use bltd after this, it is only on caller's stack */
	}

	while (1) {
		struct l_wait_info lwi = { 0 };
		struct ldlm_bl_work_item *blwi = NULL;
		int busy;

		blwi = ldlm_bl_get_work(blp);

		if (blwi == NULL) {
			atomic_dec(&blp->blp_busy_threads);
			l_wait_event_exclusive(blp->blp_waitq,
					 (blwi = ldlm_bl_get_work(blp)) != NULL,
					 &lwi);
			busy = atomic_inc_return(&blp->blp_busy_threads);
		} else {
			busy = atomic_read(&blp->blp_busy_threads);
		}

		if (blwi->blwi_ns == NULL)
			/* added by ldlm_cleanup() */
			break;

		/* Not fatal if racy and have a few too many threads */
		if (unlikely(busy < blp->blp_max_threads &&
			     busy >= atomic_read(&blp->blp_num_threads) &&
			     !blwi->blwi_mem_pressure))
			/* discard the return value, we tried */
			ldlm_bl_thread_start(blp);

		if (blwi->blwi_mem_pressure)
			memory_pressure_set();

		if (blwi->blwi_count) {
			int count;

			/* The special case when we cancel locks in LRU
			 * asynchronously, we pass the list of locks here.
			 * Thus locks are marked LDLM_FL_CANCELING, but NOT
			 * canceled locally yet. */
			count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
							   blwi->blwi_count,
							   LCF_BL_AST);
			ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
					     blwi->blwi_flags);
		} else {
			ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
						blwi->blwi_lock);
		}
		if (blwi->blwi_mem_pressure)
			memory_pressure_clr();

		if (blwi->blwi_flags & LCF_ASYNC)
			kfree(blwi);
		else
			complete(&blwi->blwi_comp);
	}

	atomic_dec(&blp->blp_busy_threads);
	atomic_dec(&blp->blp_num_threads);
	complete(&blp->blp_comp);
	return 0;
}
static int ldlm_setup(void);
static int ldlm_cleanup(void);

int ldlm_get_ref(void)
{
	int rc = 0;

	mutex_lock(&ldlm_ref_mutex);
	if (++ldlm_refcount == 1) {
		rc = ldlm_setup();
		if (rc)
			ldlm_refcount--;
	}
	mutex_unlock(&ldlm_ref_mutex);

	return rc;
}
EXPORT_SYMBOL(ldlm_get_ref);

void ldlm_put_ref(void)
{
	mutex_lock(&ldlm_ref_mutex);
	if (ldlm_refcount == 1) {
		int rc = ldlm_cleanup();

		if (rc)
			CERROR("ldlm_cleanup failed: %d\n", rc);
		else
			ldlm_refcount--;
	} else {
		ldlm_refcount--;
	}
	mutex_unlock(&ldlm_ref_mutex);
}
EXPORT_SYMBOL(ldlm_put_ref);
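
/*
 * Usage sketch (hypothetical caller): the first reference brings the LDLM
 * service up via ldlm_setup(); the last ldlm_put_ref() tears it down via
 * ldlm_cleanup():
 *
 *	rc = ldlm_get_ref();
 *	if (rc)
 *		return rc;
 *	...use the LDLM service...
 *	ldlm_put_ref();
 */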
extern unsigned int ldlm_cancel_unused_locks_before_replay;

static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
						      struct attribute *attr,
						      char *buf)
{
	return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
}

static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
						       struct attribute *attr,
						       const char *buffer,
						       size_t count)
{
	int rc;
	unsigned long val;

	rc = kstrtoul(buffer, 10, &val);
	if (rc)
		return rc;

	ldlm_cancel_unused_locks_before_replay = val;

	return count;
}
LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);

/* These are for root of /sys/fs/lustre/ldlm */
static struct attribute *ldlm_attrs[] = {
	&lustre_attr_cancel_unused_locks_before_replay.attr,
	NULL,
};

static struct attribute_group ldlm_attr_group = {
	.attrs = ldlm_attrs,
};
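
/*
 * Once ldlm_setup() below registers this group on the "ldlm" kobject,
 * the attribute appears as an ordinary sysfs file, e.g.:
 *
 *	cat /sys/fs/lustre/ldlm/cancel_unused_locks_before_replay
 *	echo 0 > /sys/fs/lustre/ldlm/cancel_unused_locks_before_replay
 */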
static int ldlm_setup(void)
{
	static struct ptlrpc_service_conf conf;
	struct ldlm_bl_pool *blp = NULL;
	int rc = 0;
	int i;

	if (ldlm_state != NULL)
		return -EALREADY;

	ldlm_state = kzalloc(sizeof(*ldlm_state), GFP_NOFS);
	if (ldlm_state == NULL)
		return -ENOMEM;

	ldlm_kobj = kobject_create_and_add("ldlm", lustre_kobj);
	if (!ldlm_kobj) {
		rc = -ENOMEM;
		goto out;
	}

	rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
	if (rc)
		goto out;

	ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
	if (!ldlm_ns_kset) {
		rc = -ENOMEM;
		goto out;
	}

	ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
	if (!ldlm_svc_kset) {
		rc = -ENOMEM;
		goto out;
	}

	rc = ldlm_debugfs_setup();
	if (rc != 0)
		goto out;

	memset(&conf, 0, sizeof(conf));
	conf = (typeof(conf)) {
		.psc_name		= "ldlm_cbd",
		.psc_watchdog_factor	= 2,
		.psc_buf		= {
			.bc_nbufs		= LDLM_CLIENT_NBUFS,
			.bc_buf_size		= LDLM_BUFSIZE,
			.bc_req_max_size	= LDLM_MAXREQSIZE,
			.bc_rep_max_size	= LDLM_MAXREPSIZE,
			.bc_req_portal		= LDLM_CB_REQUEST_PORTAL,
			.bc_rep_portal		= LDLM_CB_REPLY_PORTAL,
		},
		.psc_thr		= {
			.tc_thr_name		= "ldlm_cb",
			.tc_thr_factor		= LDLM_THR_FACTOR,
			.tc_nthrs_init		= LDLM_NTHRS_INIT,
			.tc_nthrs_base		= LDLM_NTHRS_BASE,
			.tc_nthrs_max		= LDLM_NTHRS_MAX,
			.tc_nthrs_user		= ldlm_num_threads,
			.tc_cpu_affinity	= 1,
			.tc_ctx_tags		= LCT_MD_THREAD | LCT_DT_THREAD,
		},
		.psc_cpt		= {
			.cc_pattern		= ldlm_cpts,
		},
		.psc_ops		= {
			.so_req_handler		= ldlm_callback_handler,
		},
	};
	ldlm_state->ldlm_cb_service =
			ptlrpc_register_service(&conf, ldlm_svc_kset,
						ldlm_svc_debugfs_dir);
	if (IS_ERR(ldlm_state->ldlm_cb_service)) {
		CERROR("failed to start service\n");
		rc = PTR_ERR(ldlm_state->ldlm_cb_service);
		ldlm_state->ldlm_cb_service = NULL;
		goto out;
	}

	blp = kzalloc(sizeof(*blp), GFP_NOFS);
	if (blp == NULL) {
		rc = -ENOMEM;
		goto out;
	}
	ldlm_state->ldlm_bl_pool = blp;

	spin_lock_init(&blp->blp_lock);
	INIT_LIST_HEAD(&blp->blp_list);
	INIT_LIST_HEAD(&blp->blp_prio_list);
	init_waitqueue_head(&blp->blp_waitq);
	atomic_set(&blp->blp_num_threads, 0);
	atomic_set(&blp->blp_busy_threads, 0);

	if (ldlm_num_threads == 0) {
		blp->blp_min_threads = LDLM_NTHRS_INIT;
		blp->blp_max_threads = LDLM_NTHRS_MAX;
	} else {
		blp->blp_min_threads = blp->blp_max_threads =
			min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
							 ldlm_num_threads));
	}

	for (i = 0; i < blp->blp_min_threads; i++) {
		rc = ldlm_bl_thread_start(blp);
		if (rc < 0)
			goto out;
	}

	rc = ldlm_pools_init();
	if (rc) {
		CERROR("Failed to initialize LDLM pools: %d\n", rc);
		goto out;
	}
	return 0;

 out:
	ldlm_cleanup();
	return rc;
}
static int ldlm_cleanup(void)
{
	if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
	    !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
		CERROR("ldlm still has namespaces; clean these up first.\n");
		ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
		ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
		return -EBUSY;
	}

	ldlm_pools_fini();

	if (ldlm_state->ldlm_bl_pool != NULL) {
		struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;

		while (atomic_read(&blp->blp_num_threads) > 0) {
			struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };

			init_completion(&blp->blp_comp);

			spin_lock(&blp->blp_lock);
			list_add_tail(&blwi.blwi_entry, &blp->blp_list);
			wake_up(&blp->blp_waitq);
			spin_unlock(&blp->blp_lock);

			wait_for_completion(&blp->blp_comp);
		}

		kfree(blp);
	}

	if (ldlm_state->ldlm_cb_service != NULL)
		ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);

	if (ldlm_ns_kset)
		kset_unregister(ldlm_ns_kset);
	if (ldlm_svc_kset)
		kset_unregister(ldlm_svc_kset);
	if (ldlm_kobj)
		kobject_put(ldlm_kobj);

	ldlm_debugfs_cleanup();

	kfree(ldlm_state);
	ldlm_state = NULL;

	return 0;
}
int ldlm_init(void)
{
	mutex_init(&ldlm_ref_mutex);
	mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
	mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
	ldlm_resource_slab = kmem_cache_create("ldlm_resources",
					       sizeof(struct ldlm_resource), 0,
					       SLAB_HWCACHE_ALIGN, NULL);
	if (ldlm_resource_slab == NULL)
		return -ENOMEM;

	ldlm_lock_slab = kmem_cache_create("ldlm_locks",
			      sizeof(struct ldlm_lock), 0,
			      SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
	if (ldlm_lock_slab == NULL) {
		kmem_cache_destroy(ldlm_resource_slab);
		return -ENOMEM;
	}

	ldlm_interval_slab = kmem_cache_create("interval_node",
					       sizeof(struct ldlm_interval),
					       0, SLAB_HWCACHE_ALIGN, NULL);
	if (ldlm_interval_slab == NULL) {
		kmem_cache_destroy(ldlm_resource_slab);
		kmem_cache_destroy(ldlm_lock_slab);
		return -ENOMEM;
	}
#if LUSTRE_TRACKS_LOCK_EXP_REFS
	class_export_dump_hook = ldlm_dump_export_locks;
#endif
	return 0;
}
void ldlm_exit(void)
{
	if (ldlm_refcount)
		CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
	kmem_cache_destroy(ldlm_resource_slab);
	/* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so we need to
	 * call synchronize_rcu() to wait for a grace period to elapse, so
	 * that ldlm_lock_free() gets a chance to be called. */
	synchronize_rcu();
	kmem_cache_destroy(ldlm_lock_slab);
	kmem_cache_destroy(ldlm_interval_slab);
}