4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2010, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/ldlm/ldlm_lockd.c
38 * Author: Peter Braam <braam@clusterfs.com>
39 * Author: Phil Schwan <phil@clusterfs.com>
42 #define DEBUG_SUBSYSTEM S_LDLM
44 #include "../../include/linux/libcfs/libcfs.h"
46 #include <lustre_dlm.h>
47 #include <obd_class.h>
48 #include <linux/list.h>
49 #include "ldlm_internal.h"
51 static int ldlm_num_threads
;
52 module_param(ldlm_num_threads
, int, 0444);
53 MODULE_PARM_DESC(ldlm_num_threads
, "number of DLM service threads to start");
55 static char *ldlm_cpts
;
56 module_param(ldlm_cpts
, charp
, 0444);
57 MODULE_PARM_DESC(ldlm_cpts
, "CPU partitions ldlm threads should run on");
59 extern struct kmem_cache
*ldlm_resource_slab
;
60 extern struct kmem_cache
*ldlm_lock_slab
;
61 static struct mutex ldlm_ref_mutex
;
62 static int ldlm_refcount
;
64 struct ldlm_cb_async_args
{
65 struct ldlm_cb_set_arg
*ca_set_arg
;
66 struct ldlm_lock
*ca_lock
;
71 static struct ldlm_state
*ldlm_state
;
73 inline cfs_time_t
round_timeout(cfs_time_t timeout
)
75 return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout
, 0)) + 1);
78 /* timeout for initial callback (AST) reply (bz10399) */
79 static inline unsigned int ldlm_get_rq_timeout(void)
82 unsigned int timeout
= min(ldlm_timeout
, obd_timeout
/ 3);
84 return timeout
< 1 ? 1 : timeout
;
89 #define ELT_TERMINATE 2
95 * blp_prio_list is used for callbacks that should be handled
96 * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
99 struct list_head blp_prio_list
;
102 * blp_list is used for all other callbacks which are likely
103 * to take longer to process.
105 struct list_head blp_list
;
107 wait_queue_head_t blp_waitq
;
108 struct completion blp_comp
;
109 atomic_t blp_num_threads
;
110 atomic_t blp_busy_threads
;
115 struct ldlm_bl_work_item
{
116 struct list_head blwi_entry
;
117 struct ldlm_namespace
*blwi_ns
;
118 struct ldlm_lock_desc blwi_ld
;
119 struct ldlm_lock
*blwi_lock
;
120 struct list_head blwi_head
;
122 struct completion blwi_comp
;
123 ldlm_cancel_flags_t blwi_flags
;
124 int blwi_mem_pressure
;
/*
 * NOTE(review): body elided by extraction.  Waiting-lock tracking is a
 * server-side feature; the client-only build keeps a stub that reports
 * nothing was deleted — confirm against upstream ldlm_lockd.c.
 */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
	return 0;
}
/*
 * NOTE(review): body elided by extraction.  Client-only stub — the
 * waiting-lock timeout list exists only on servers — confirm against
 * upstream ldlm_lockd.c.
 */
int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
	return 0;
}
141 * Callback handler for receiving incoming blocking ASTs.
143 * This can only happen on client side.
145 void ldlm_handle_bl_callback(struct ldlm_namespace
*ns
,
146 struct ldlm_lock_desc
*ld
, struct ldlm_lock
*lock
)
150 LDLM_DEBUG(lock
, "client blocking AST callback handler");
152 lock_res_and_lock(lock
);
153 lock
->l_flags
|= LDLM_FL_CBPENDING
;
155 if (lock
->l_flags
& LDLM_FL_CANCEL_ON_BLOCK
)
156 lock
->l_flags
|= LDLM_FL_CANCEL
;
158 do_ast
= (!lock
->l_readers
&& !lock
->l_writers
);
159 unlock_res_and_lock(lock
);
162 CDEBUG(D_DLMTRACE
, "Lock %p already unused, calling callback (%p)\n",
163 lock
, lock
->l_blocking_ast
);
164 if (lock
->l_blocking_ast
!= NULL
)
165 lock
->l_blocking_ast(lock
, ld
, lock
->l_ast_data
,
168 CDEBUG(D_DLMTRACE
, "Lock %p is referenced, will be cancelled later\n",
172 LDLM_DEBUG(lock
, "client blocking callback handler END");
173 LDLM_LOCK_RELEASE(lock
);
177 * Callback handler for receiving incoming completion ASTs.
179 * This only can happen on client side.
/*
 * NOTE(review): the extraction that produced this file dropped interior
 * lines of this handler (local declarations such as lvb_len/rc/ast_list,
 * braces, and the "out:" error label), so the text below is kept
 * byte-for-byte and only comments are added.  Visible flow: optional
 * fail-injection delay, LVB length validation/allocation, double-grant
 * detection, then updating mode/policy/resource from the server reply
 * before ldlm_grant_lock() and ldlm_run_ast_work().
 */
181 static void ldlm_handle_cp_callback(struct ptlrpc_request
*req
,
182 struct ldlm_namespace
*ns
,
183 struct ldlm_request
*dlm_req
,
184 struct ldlm_lock
*lock
)
190 LDLM_DEBUG(lock
, "client completion callback handler START");
192 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE
)) {
193 int to
= cfs_time_seconds(1);
195 set_current_state(TASK_INTERRUPTIBLE
);
196 schedule_timeout(to
);
197 if (lock
->l_granted_mode
== lock
->l_req_mode
||
198 lock
->l_flags
& LDLM_FL_DESTROYED
)
/* Validate the LVB length carried in the reply; for layout locks a
 * variable-length LVB buffer is allocated here. */
203 lvb_len
= req_capsule_get_size(&req
->rq_pill
, &RMF_DLM_LVB
, RCL_CLIENT
);
205 LDLM_ERROR(lock
, "Fail to get lvb_len, rc = %d", lvb_len
);
206 GOTO(out
, rc
= lvb_len
);
207 } else if (lvb_len
> 0) {
208 if (lock
->l_lvb_len
> 0) {
209 /* for extent lock, lvb contains ost_lvb{}. */
210 LASSERT(lock
->l_lvb_data
!= NULL
);
212 if (unlikely(lock
->l_lvb_len
< lvb_len
)) {
213 LDLM_ERROR(lock
, "Replied LVB is larger than "
214 "expectation, expected = %d, "
216 lock
->l_lvb_len
, lvb_len
);
217 GOTO(out
, rc
= -EINVAL
);
219 } else if (ldlm_has_layout(lock
)) { /* for layout lock, lvb has
223 OBD_ALLOC(lvb_data
, lvb_len
);
224 if (lvb_data
== NULL
) {
225 LDLM_ERROR(lock
, "No memory: %d.\n", lvb_len
);
226 GOTO(out
, rc
= -ENOMEM
);
229 lock_res_and_lock(lock
);
230 LASSERT(lock
->l_lvb_data
== NULL
);
231 lock
->l_lvb_type
= LVB_T_LAYOUT
;
232 lock
->l_lvb_data
= lvb_data
;
233 lock
->l_lvb_len
= lvb_len
;
234 unlock_res_and_lock(lock
);
/* Re-check under the resource lock: the lock may have been destroyed or
 * already granted while we were unlocked. */
238 lock_res_and_lock(lock
);
239 if ((lock
->l_flags
& LDLM_FL_DESTROYED
) ||
240 lock
->l_granted_mode
== lock
->l_req_mode
) {
241 /* bug 11300: the lock has already been granted */
242 unlock_res_and_lock(lock
);
243 LDLM_DEBUG(lock
, "Double grant race happened");
247 /* If we receive the completion AST before the actual enqueue returned,
248 * then we might need to switch lock modes, resources, or extents. */
249 if (dlm_req
->lock_desc
.l_granted_mode
!= lock
->l_req_mode
) {
250 lock
->l_req_mode
= dlm_req
->lock_desc
.l_granted_mode
;
251 LDLM_DEBUG(lock
, "completion AST, new lock mode");
254 if (lock
->l_resource
->lr_type
!= LDLM_PLAIN
) {
255 ldlm_convert_policy_to_local(req
->rq_export
,
256 dlm_req
->lock_desc
.l_resource
.lr_type
,
257 &dlm_req
->lock_desc
.l_policy_data
,
258 &lock
->l_policy_data
);
259 LDLM_DEBUG(lock
, "completion AST, new policy data");
262 ldlm_resource_unlink_lock(lock
);
263 if (memcmp(&dlm_req
->lock_desc
.l_resource
.lr_name
,
264 &lock
->l_resource
->lr_name
,
265 sizeof(lock
->l_resource
->lr_name
)) != 0) {
266 unlock_res_and_lock(lock
);
267 rc
= ldlm_lock_change_resource(ns
, lock
,
268 &dlm_req
->lock_desc
.l_resource
.lr_name
);
270 LDLM_ERROR(lock
, "Failed to allocate resource");
273 LDLM_DEBUG(lock
, "completion AST, new resource");
274 CERROR("change resource!\n");
275 lock_res_and_lock(lock
);
278 if (dlm_req
->lock_flags
& LDLM_FL_AST_SENT
) {
279 /* BL_AST locks are not needed in LRU.
280 * Let ldlm_cancel_lru() be fast. */
281 ldlm_lock_remove_from_lru(lock
);
282 lock
->l_flags
|= LDLM_FL_CBPENDING
| LDLM_FL_BL_AST
;
283 LDLM_DEBUG(lock
, "completion AST includes blocking AST");
286 if (lock
->l_lvb_len
> 0) {
287 rc
= ldlm_fill_lvb(lock
, &req
->rq_pill
, RCL_CLIENT
,
288 lock
->l_lvb_data
, lvb_len
);
290 unlock_res_and_lock(lock
);
/* Grant the lock locally and collect the completion ASTs to run. */
295 ldlm_grant_lock(lock
, &ast_list
);
296 unlock_res_and_lock(lock
);
298 LDLM_DEBUG(lock
, "callback handler finished, about to run_ast_work");
300 /* Let Enqueue to call osc_lock_upcall() and initialize
302 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE
, 2);
304 ldlm_run_ast_work(ns
, &ast_list
, LDLM_WORK_CP_AST
);
306 LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
/* Error path (elided "out:" label): mark the lock failed and wake any
 * waiters before dropping the reference. */
312 lock_res_and_lock(lock
);
313 lock
->l_flags
|= LDLM_FL_FAILED
;
314 unlock_res_and_lock(lock
);
315 wake_up(&lock
->l_waitq
);
317 LDLM_LOCK_RELEASE(lock
);
321 * Callback handler for receiving incoming glimpse ASTs.
323 * This only can happen on client side. After handling the glimpse AST
324 * we also consider dropping the lock here if it is unused locally for a
327 static void ldlm_handle_gl_callback(struct ptlrpc_request
*req
,
328 struct ldlm_namespace
*ns
,
329 struct ldlm_request
*dlm_req
,
330 struct ldlm_lock
*lock
)
334 LDLM_DEBUG(lock
, "client glimpse AST callback handler");
336 if (lock
->l_glimpse_ast
!= NULL
)
337 rc
= lock
->l_glimpse_ast(lock
, req
);
339 if (req
->rq_repmsg
!= NULL
) {
346 lock_res_and_lock(lock
);
347 if (lock
->l_granted_mode
== LCK_PW
&&
348 !lock
->l_readers
&& !lock
->l_writers
&&
349 cfs_time_after(cfs_time_current(),
350 cfs_time_add(lock
->l_last_used
,
351 cfs_time_seconds(10)))) {
352 unlock_res_and_lock(lock
);
353 if (ldlm_bl_to_thread_lock(ns
, NULL
, lock
))
354 ldlm_handle_bl_callback(ns
, NULL
, lock
);
358 unlock_res_and_lock(lock
);
359 LDLM_LOCK_RELEASE(lock
);
362 static int ldlm_callback_reply(struct ptlrpc_request
*req
, int rc
)
364 if (req
->rq_no_reply
)
368 if (!req
->rq_packed_final
) {
369 rc
= lustre_pack_reply(req
, 1, NULL
, NULL
);
373 return ptlrpc_reply(req
);
376 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item
*blwi
,
377 ldlm_cancel_flags_t cancel_flags
)
379 struct ldlm_bl_pool
*blp
= ldlm_state
->ldlm_bl_pool
;
381 spin_lock(&blp
->blp_lock
);
382 if (blwi
->blwi_lock
&&
383 blwi
->blwi_lock
->l_flags
& LDLM_FL_DISCARD_DATA
) {
384 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
385 list_add_tail(&blwi
->blwi_entry
, &blp
->blp_prio_list
);
387 /* other blocking callbacks are added to the regular list */
388 list_add_tail(&blwi
->blwi_entry
, &blp
->blp_list
);
390 spin_unlock(&blp
->blp_lock
);
392 wake_up(&blp
->blp_waitq
);
394 /* can not check blwi->blwi_flags as blwi could be already freed in
396 if (!(cancel_flags
& LCF_ASYNC
))
397 wait_for_completion(&blwi
->blwi_comp
);
402 static inline void init_blwi(struct ldlm_bl_work_item
*blwi
,
403 struct ldlm_namespace
*ns
,
404 struct ldlm_lock_desc
*ld
,
405 struct list_head
*cancels
, int count
,
406 struct ldlm_lock
*lock
,
407 ldlm_cancel_flags_t cancel_flags
)
409 init_completion(&blwi
->blwi_comp
);
410 INIT_LIST_HEAD(&blwi
->blwi_head
);
412 if (memory_pressure_get())
413 blwi
->blwi_mem_pressure
= 1;
416 blwi
->blwi_flags
= cancel_flags
;
420 list_add(&blwi
->blwi_head
, cancels
);
421 list_del_init(cancels
);
422 blwi
->blwi_count
= count
;
424 blwi
->blwi_lock
= lock
;
429 * Queues a list of locks \a cancels containing \a count locks
430 * for later processing by a blocking thread. If \a count is zero,
431 * then the lock referenced as \a lock is queued instead.
433 * The blocking thread would then call ->l_blocking_ast callback in the lock.
434 * If list addition fails an error is returned and caller is supposed to
435 * call ->l_blocking_ast itself.
437 static int ldlm_bl_to_thread(struct ldlm_namespace
*ns
,
438 struct ldlm_lock_desc
*ld
,
439 struct ldlm_lock
*lock
,
440 struct list_head
*cancels
, int count
,
441 ldlm_cancel_flags_t cancel_flags
)
443 if (cancels
&& count
== 0)
446 if (cancel_flags
& LCF_ASYNC
) {
447 struct ldlm_bl_work_item
*blwi
;
449 OBD_ALLOC(blwi
, sizeof(*blwi
));
452 init_blwi(blwi
, ns
, ld
, cancels
, count
, lock
, cancel_flags
);
454 return __ldlm_bl_to_thread(blwi
, cancel_flags
);
456 /* if it is synchronous call do minimum mem alloc, as it could
457 * be triggered from kernel shrinker
459 struct ldlm_bl_work_item blwi
;
461 memset(&blwi
, 0, sizeof(blwi
));
462 init_blwi(&blwi
, ns
, ld
, cancels
, count
, lock
, cancel_flags
);
463 return __ldlm_bl_to_thread(&blwi
, cancel_flags
);
468 int ldlm_bl_to_thread_lock(struct ldlm_namespace
*ns
, struct ldlm_lock_desc
*ld
,
469 struct ldlm_lock
*lock
)
471 return ldlm_bl_to_thread(ns
, ld
, lock
, NULL
, 0, LCF_ASYNC
);
474 int ldlm_bl_to_thread_list(struct ldlm_namespace
*ns
, struct ldlm_lock_desc
*ld
,
475 struct list_head
*cancels
, int count
,
476 ldlm_cancel_flags_t cancel_flags
)
478 return ldlm_bl_to_thread(ns
, ld
, NULL
, cancels
, count
, cancel_flags
);
481 /* Setinfo coming from Server (eg MDT) to Client (eg MDC)! */
482 static int ldlm_handle_setinfo(struct ptlrpc_request
*req
)
484 struct obd_device
*obd
= req
->rq_export
->exp_obd
;
490 DEBUG_REQ(D_HSM
, req
, "%s: handle setinfo\n", obd
->obd_name
);
492 req_capsule_set(&req
->rq_pill
, &RQF_OBD_SET_INFO
);
494 key
= req_capsule_client_get(&req
->rq_pill
, &RMF_SETINFO_KEY
);
496 DEBUG_REQ(D_IOCTL
, req
, "no set_info key");
499 keylen
= req_capsule_get_size(&req
->rq_pill
, &RMF_SETINFO_KEY
,
501 val
= req_capsule_client_get(&req
->rq_pill
, &RMF_SETINFO_VAL
);
503 DEBUG_REQ(D_IOCTL
, req
, "no set_info val");
506 vallen
= req_capsule_get_size(&req
->rq_pill
, &RMF_SETINFO_VAL
,
509 /* We are responsible for swabbing contents of val */
511 if (KEY_IS(KEY_HSM_COPYTOOL_SEND
))
512 /* Pass it on to mdc (the "export" in this case) */
513 rc
= obd_set_info_async(req
->rq_svc_thread
->t_env
,
515 sizeof(KEY_HSM_COPYTOOL_SEND
),
516 KEY_HSM_COPYTOOL_SEND
,
519 DEBUG_REQ(D_WARNING
, req
, "ignoring unknown key %s", key
);
524 static inline void ldlm_callback_errmsg(struct ptlrpc_request
*req
,
525 const char *msg
, int rc
,
526 struct lustre_handle
*handle
)
528 DEBUG_REQ((req
->rq_no_reply
|| rc
) ? D_WARNING
: D_DLMTRACE
, req
,
529 "%s: [nid %s] [rc %d] [lock "LPX64
"]",
530 msg
, libcfs_id2str(req
->rq_peer
), rc
,
531 handle
? handle
->cookie
: 0);
532 if (req
->rq_no_reply
)
533 CWARN("No reply was sent, maybe cause bug 21636.\n");
535 CWARN("Send reply failed, maybe cause bug 21636.\n");
538 static int ldlm_handle_qc_callback(struct ptlrpc_request
*req
)
540 struct obd_quotactl
*oqctl
;
541 struct client_obd
*cli
= &req
->rq_export
->exp_obd
->u
.cli
;
543 oqctl
= req_capsule_client_get(&req
->rq_pill
, &RMF_OBD_QUOTACTL
);
545 CERROR("Can't unpack obd_quotactl\n");
549 oqctl
->qc_stat
= ptlrpc_status_ntoh(oqctl
->qc_stat
);
551 cli
->cl_qchk_stat
= oqctl
->qc_stat
;
555 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
/*
 * NOTE(review): extraction elided interior lines of this dispatcher
 * (the rc declaration, early returns/breaks and closing braces); the
 * text below is kept byte-for-byte with comments added only.  Visible
 * flow: reject unconnected senders, per-opcode fail-injection checks,
 * unpack the ldlm_request, look up the lock, copy AST flags, then
 * dispatch to the BL/CP/GL handlers above.
 */
556 static int ldlm_callback_handler(struct ptlrpc_request
*req
)
558 struct ldlm_namespace
*ns
;
559 struct ldlm_request
*dlm_req
;
560 struct ldlm_lock
*lock
;
563 /* Requests arrive in sender's byte order. The ptlrpc service
564 * handler has already checked and, if necessary, byte-swapped the
565 * incoming request message body, but I am responsible for the
566 * message buffers. */
568 /* do nothing for sec context finalize */
569 if (lustre_msg_get_opc(req
->rq_reqmsg
) == SEC_CTX_FINI
)
572 req_capsule_init(&req
->rq_pill
, req
, RCL_SERVER
);
574 if (req
->rq_export
== NULL
) {
575 rc
= ldlm_callback_reply(req
, -ENOTCONN
);
576 ldlm_callback_errmsg(req
, "Operate on unconnected server",
581 LASSERT(req
->rq_export
!= NULL
);
582 LASSERT(req
->rq_export
->exp_obd
!= NULL
);
/* Per-opcode fail-injection checks and capsule setup. */
584 switch (lustre_msg_get_opc(req
->rq_reqmsg
)) {
585 case LDLM_BL_CALLBACK
:
586 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET
))
589 case LDLM_CP_CALLBACK
:
590 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET
))
593 case LDLM_GL_CALLBACK
:
594 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET
))
598 rc
= ldlm_handle_setinfo(req
);
599 ldlm_callback_reply(req
, rc
);
601 case OBD_QC_CALLBACK
:
602 req_capsule_set(&req
->rq_pill
, &RQF_QC_CALLBACK
);
603 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET
))
605 rc
= ldlm_handle_qc_callback(req
);
606 ldlm_callback_reply(req
, rc
);
609 CERROR("unknown opcode %u\n",
610 lustre_msg_get_opc(req
->rq_reqmsg
));
611 ldlm_callback_reply(req
, -EPROTO
);
615 ns
= req
->rq_export
->exp_obd
->obd_namespace
;
618 req_capsule_set(&req
->rq_pill
, &RQF_LDLM_CALLBACK
);
620 dlm_req
= req_capsule_client_get(&req
->rq_pill
, &RMF_DLM_REQ
);
621 if (dlm_req
== NULL
) {
622 rc
= ldlm_callback_reply(req
, -EPROTO
);
623 ldlm_callback_errmsg(req
, "Operate without parameter", rc
,
628 /* Force a known safe race, send a cancel to the server for a lock
629 * which the server has already started a blocking callback on. */
630 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE
) &&
631 lustre_msg_get_opc(req
->rq_reqmsg
) == LDLM_BL_CALLBACK
) {
632 rc
= ldlm_cli_cancel(&dlm_req
->lock_handle
[0], 0);
634 CERROR("ldlm_cli_cancel: %d\n", rc
);
/* Look up the lock the AST refers to; it may already be gone. */
637 lock
= ldlm_handle2lock_long(&dlm_req
->lock_handle
[0], 0);
639 CDEBUG(D_DLMTRACE
, "callback on lock "LPX64
" - lock "
640 "disappeared\n", dlm_req
->lock_handle
[0].cookie
);
641 rc
= ldlm_callback_reply(req
, -EINVAL
);
642 ldlm_callback_errmsg(req
, "Operate with invalid parameter", rc
,
643 &dlm_req
->lock_handle
[0]);
647 if ((lock
->l_flags
& LDLM_FL_FAIL_LOC
) &&
648 lustre_msg_get_opc(req
->rq_reqmsg
) == LDLM_BL_CALLBACK
)
649 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE
);
651 /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
652 lock_res_and_lock(lock
);
653 lock
->l_flags
|= ldlm_flags_from_wire(dlm_req
->lock_flags
&
655 if (lustre_msg_get_opc(req
->rq_reqmsg
) == LDLM_BL_CALLBACK
) {
656 /* If somebody cancels lock and cache is already dropped,
657 * or lock is failed before cp_ast received on client,
658 * we can tell the server we have no lock. Otherwise, we
659 * should send cancel after dropping the cache. */
660 if (((lock
->l_flags
& LDLM_FL_CANCELING
) &&
661 (lock
->l_flags
& LDLM_FL_BL_DONE
)) ||
662 (lock
->l_flags
& LDLM_FL_FAILED
)) {
663 LDLM_DEBUG(lock
, "callback on lock "
664 LPX64
" - lock disappeared\n",
665 dlm_req
->lock_handle
[0].cookie
);
666 unlock_res_and_lock(lock
);
667 LDLM_LOCK_RELEASE(lock
);
668 rc
= ldlm_callback_reply(req
, -EINVAL
);
669 ldlm_callback_errmsg(req
, "Operate on stale lock", rc
,
670 &dlm_req
->lock_handle
[0]);
673 /* BL_AST locks are not needed in LRU.
674 * Let ldlm_cancel_lru() be fast. */
675 ldlm_lock_remove_from_lru(lock
);
676 lock
->l_flags
|= LDLM_FL_BL_AST
;
678 unlock_res_and_lock(lock
);
680 /* We want the ost thread to get this reply so that it can respond
681 * to ost requests (write cache writeback) that might be triggered
684 * But we'd also like to be able to indicate in the reply that we're
685 * cancelling right now, because it's unused, or have an intent result
686 * in the reply, so we might have to push the responsibility for sending
687 * the reply down into the AST handlers, alas. */
/* Final dispatch to the specific AST handler. */
689 switch (lustre_msg_get_opc(req
->rq_reqmsg
)) {
690 case LDLM_BL_CALLBACK
:
691 CDEBUG(D_INODE
, "blocking ast\n");
692 req_capsule_extend(&req
->rq_pill
, &RQF_LDLM_BL_CALLBACK
);
693 if (!(lock
->l_flags
& LDLM_FL_CANCEL_ON_BLOCK
)) {
694 rc
= ldlm_callback_reply(req
, 0);
695 if (req
->rq_no_reply
|| rc
)
696 ldlm_callback_errmsg(req
, "Normal process", rc
,
697 &dlm_req
->lock_handle
[0]);
699 if (ldlm_bl_to_thread_lock(ns
, &dlm_req
->lock_desc
, lock
))
700 ldlm_handle_bl_callback(ns
, &dlm_req
->lock_desc
, lock
);
702 case LDLM_CP_CALLBACK
:
703 CDEBUG(D_INODE
, "completion ast\n");
704 req_capsule_extend(&req
->rq_pill
, &RQF_LDLM_CP_CALLBACK
);
705 ldlm_callback_reply(req
, 0);
706 ldlm_handle_cp_callback(req
, ns
, dlm_req
, lock
);
708 case LDLM_GL_CALLBACK
:
709 CDEBUG(D_INODE
, "glimpse ast\n");
710 req_capsule_extend(&req
->rq_pill
, &RQF_LDLM_GL_CALLBACK
);
711 ldlm_handle_gl_callback(req
, ns
, dlm_req
, lock
);
714 LBUG(); /* checked above */
721 static struct ldlm_bl_work_item
*ldlm_bl_get_work(struct ldlm_bl_pool
*blp
)
723 struct ldlm_bl_work_item
*blwi
= NULL
;
724 static unsigned int num_bl
= 0;
726 spin_lock(&blp
->blp_lock
);
727 /* process a request from the blp_list at least every blp_num_threads */
728 if (!list_empty(&blp
->blp_list
) &&
729 (list_empty(&blp
->blp_prio_list
) || num_bl
== 0))
730 blwi
= list_entry(blp
->blp_list
.next
,
731 struct ldlm_bl_work_item
, blwi_entry
);
733 if (!list_empty(&blp
->blp_prio_list
))
734 blwi
= list_entry(blp
->blp_prio_list
.next
,
735 struct ldlm_bl_work_item
,
739 if (++num_bl
>= atomic_read(&blp
->blp_num_threads
))
741 list_del(&blwi
->blwi_entry
);
743 spin_unlock(&blp
->blp_lock
);
748 /* This only contains temporary data until the thread starts */
749 struct ldlm_bl_thread_data
{
750 char bltd_name
[CFS_CURPROC_COMM_MAX
];
751 struct ldlm_bl_pool
*bltd_blp
;
752 struct completion bltd_comp
;
756 static int ldlm_bl_thread_main(void *arg
);
758 static int ldlm_bl_thread_start(struct ldlm_bl_pool
*blp
)
760 struct ldlm_bl_thread_data bltd
= { .bltd_blp
= blp
};
761 struct task_struct
*task
;
763 init_completion(&bltd
.bltd_comp
);
764 bltd
.bltd_num
= atomic_read(&blp
->blp_num_threads
);
765 snprintf(bltd
.bltd_name
, sizeof(bltd
.bltd_name
),
766 "ldlm_bl_%02d", bltd
.bltd_num
);
767 task
= kthread_run(ldlm_bl_thread_main
, &bltd
, "%s", bltd
.bltd_name
);
769 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
770 atomic_read(&blp
->blp_num_threads
), PTR_ERR(task
));
771 return PTR_ERR(task
);
773 wait_for_completion(&bltd
.bltd_comp
);
779 * Main blocking requests processing thread.
781 * Callers put locks into its queue by calling ldlm_bl_to_thread.
782 * This thread in the end ends up doing actual call to ->l_blocking_ast
/*
 * NOTE(review): the while(1) loop structure, the blwi==NULL branch and
 * local declarations (busy, count) were elided by extraction; the text
 * below is kept byte-for-byte with comments added only.
 */
785 static int ldlm_bl_thread_main(void *arg
)
787 struct ldlm_bl_pool
*blp
;
790 struct ldlm_bl_thread_data
*bltd
= arg
;
792 blp
= bltd
->bltd_blp
;
794 atomic_inc(&blp
->blp_num_threads
);
795 atomic_inc(&blp
->blp_busy_threads
);
797 complete(&bltd
->bltd_comp
);
798 /* cannot use bltd after this, it is only on caller's stack */
/* Main loop: pull work items, sleeping exclusively on blp_waitq when the
 * queues are empty, and spawning an extra thread when all are busy. */
802 struct l_wait_info lwi
= { 0 };
803 struct ldlm_bl_work_item
*blwi
= NULL
;
806 blwi
= ldlm_bl_get_work(blp
);
809 atomic_dec(&blp
->blp_busy_threads
);
810 l_wait_event_exclusive(blp
->blp_waitq
,
811 (blwi
= ldlm_bl_get_work(blp
)) != NULL
,
813 busy
= atomic_inc_return(&blp
->blp_busy_threads
);
815 busy
= atomic_read(&blp
->blp_busy_threads
);
/* A NULL namespace is the poison item that asks this thread to exit. */
818 if (blwi
->blwi_ns
== NULL
)
819 /* added by ldlm_cleanup() */
822 /* Not fatal if racy and have a few too many threads */
823 if (unlikely(busy
< blp
->blp_max_threads
&&
824 busy
>= atomic_read(&blp
->blp_num_threads
) &&
825 !blwi
->blwi_mem_pressure
))
826 /* discard the return value, we tried */
827 ldlm_bl_thread_start(blp
);
829 if (blwi
->blwi_mem_pressure
)
830 memory_pressure_set();
832 if (blwi
->blwi_count
) {
834 /* The special case when we cancel locks in LRU
835 * asynchronously, we pass the list of locks here.
836 * Thus locks are marked LDLM_FL_CANCELING, but NOT
837 * canceled locally yet. */
838 count
= ldlm_cli_cancel_list_local(&blwi
->blwi_head
,
841 ldlm_cli_cancel_list(&blwi
->blwi_head
, count
, NULL
,
844 ldlm_handle_bl_callback(blwi
->blwi_ns
, &blwi
->blwi_ld
,
847 if (blwi
->blwi_mem_pressure
)
848 memory_pressure_clr();
/* Async items were heap-allocated by ldlm_bl_to_thread() and are freed
 * here; sync callers are still waiting on blwi_comp. */
850 if (blwi
->blwi_flags
& LCF_ASYNC
)
851 OBD_FREE(blwi
, sizeof(*blwi
));
853 complete(&blwi
->blwi_comp
);
/* Thread exit: drop the counters and signal ldlm_cleanup(). */
856 atomic_dec(&blp
->blp_busy_threads
);
857 atomic_dec(&blp
->blp_num_threads
);
858 complete(&blp
->blp_comp
);
/* Forward declarations; both are defined later in this file. */
static int ldlm_setup(void);
static int ldlm_cleanup(void);
866 int ldlm_get_ref(void)
870 mutex_lock(&ldlm_ref_mutex
);
871 if (++ldlm_refcount
== 1) {
876 mutex_unlock(&ldlm_ref_mutex
);
880 EXPORT_SYMBOL(ldlm_get_ref
);
882 void ldlm_put_ref(void)
884 mutex_lock(&ldlm_ref_mutex
);
885 if (ldlm_refcount
== 1) {
886 int rc
= ldlm_cleanup();
888 CERROR("ldlm_cleanup failed: %d\n", rc
);
894 mutex_unlock(&ldlm_ref_mutex
);
896 EXPORT_SYMBOL(ldlm_put_ref
);
899 * Export handle<->lock hash operations.
902 ldlm_export_lock_hash(struct cfs_hash
*hs
, const void *key
, unsigned mask
)
904 return cfs_hash_u64_hash(((struct lustre_handle
*)key
)->cookie
, mask
);
908 ldlm_export_lock_key(struct hlist_node
*hnode
)
910 struct ldlm_lock
*lock
;
912 lock
= hlist_entry(hnode
, struct ldlm_lock
, l_exp_hash
);
913 return &lock
->l_remote_handle
;
917 ldlm_export_lock_keycpy(struct hlist_node
*hnode
, void *key
)
919 struct ldlm_lock
*lock
;
921 lock
= hlist_entry(hnode
, struct ldlm_lock
, l_exp_hash
);
922 lock
->l_remote_handle
= *(struct lustre_handle
*)key
;
/* Compare a candidate key with the handle stored on the hashed lock. */
static int
ldlm_export_lock_keycmp(const void *key, struct hlist_node *hnode)
{
	return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
}
932 ldlm_export_lock_object(struct hlist_node
*hnode
)
934 return hlist_entry(hnode
, struct ldlm_lock
, l_exp_hash
);
938 ldlm_export_lock_get(struct cfs_hash
*hs
, struct hlist_node
*hnode
)
940 struct ldlm_lock
*lock
;
942 lock
= hlist_entry(hnode
, struct ldlm_lock
, l_exp_hash
);
947 ldlm_export_lock_put(struct cfs_hash
*hs
, struct hlist_node
*hnode
)
949 struct ldlm_lock
*lock
;
951 lock
= hlist_entry(hnode
, struct ldlm_lock
, l_exp_hash
);
952 LDLM_LOCK_RELEASE(lock
);
955 static cfs_hash_ops_t ldlm_export_lock_ops
= {
956 .hs_hash
= ldlm_export_lock_hash
,
957 .hs_key
= ldlm_export_lock_key
,
958 .hs_keycmp
= ldlm_export_lock_keycmp
,
959 .hs_keycpy
= ldlm_export_lock_keycpy
,
960 .hs_object
= ldlm_export_lock_object
,
961 .hs_get
= ldlm_export_lock_get
,
962 .hs_put
= ldlm_export_lock_put
,
963 .hs_put_locked
= ldlm_export_lock_put
,
966 int ldlm_init_export(struct obd_export
*exp
)
970 cfs_hash_create(obd_uuid2str(&exp
->exp_client_uuid
),
971 HASH_EXP_LOCK_CUR_BITS
,
972 HASH_EXP_LOCK_MAX_BITS
,
973 HASH_EXP_LOCK_BKT_BITS
, 0,
974 CFS_HASH_MIN_THETA
, CFS_HASH_MAX_THETA
,
975 &ldlm_export_lock_ops
,
976 CFS_HASH_DEFAULT
| CFS_HASH_REHASH_KEY
|
977 CFS_HASH_NBLK_CHANGE
);
979 if (!exp
->exp_lock_hash
)
982 rc
= ldlm_init_flock_export(exp
);
988 ldlm_destroy_export(exp
);
991 EXPORT_SYMBOL(ldlm_init_export
);
993 void ldlm_destroy_export(struct obd_export
*exp
)
995 cfs_hash_putref(exp
->exp_lock_hash
);
996 exp
->exp_lock_hash
= NULL
;
998 ldlm_destroy_flock_export(exp
);
1000 EXPORT_SYMBOL(ldlm_destroy_export
);
1002 static int ldlm_setup(void)
/*
 * NOTE(review): extraction elided local declarations (rc, i), error
 * returns and the out/cleanup label of this function; the text below is
 * kept byte-for-byte with comments added only.  Visible flow: allocate
 * ldlm_state, set up /proc, register the "ldlm_cbd" ptlrpc service,
 * create the blocking-AST thread pool, start its threads, then
 * initialize the LDLM pools.
 */
1004 static struct ptlrpc_service_conf conf
;
1005 struct ldlm_bl_pool
*blp
= NULL
;
1009 if (ldlm_state
!= NULL
)
1012 OBD_ALLOC(ldlm_state
, sizeof(*ldlm_state
));
1013 if (ldlm_state
== NULL
)
1016 rc
= ldlm_proc_setup();
/* Build the callback service configuration and register it. */
1020 memset(&conf
, 0, sizeof(conf
));
1021 conf
= (typeof(conf
)) {
1022 .psc_name
= "ldlm_cbd",
1023 .psc_watchdog_factor
= 2,
1025 .bc_nbufs
= LDLM_CLIENT_NBUFS
,
1026 .bc_buf_size
= LDLM_BUFSIZE
,
1027 .bc_req_max_size
= LDLM_MAXREQSIZE
,
1028 .bc_rep_max_size
= LDLM_MAXREPSIZE
,
1029 .bc_req_portal
= LDLM_CB_REQUEST_PORTAL
,
1030 .bc_rep_portal
= LDLM_CB_REPLY_PORTAL
,
1033 .tc_thr_name
= "ldlm_cb",
1034 .tc_thr_factor
= LDLM_THR_FACTOR
,
1035 .tc_nthrs_init
= LDLM_NTHRS_INIT
,
1036 .tc_nthrs_base
= LDLM_NTHRS_BASE
,
1037 .tc_nthrs_max
= LDLM_NTHRS_MAX
,
1038 .tc_nthrs_user
= ldlm_num_threads
,
1039 .tc_cpu_affinity
= 1,
1040 .tc_ctx_tags
= LCT_MD_THREAD
| LCT_DT_THREAD
,
1043 .cc_pattern
= ldlm_cpts
,
1046 .so_req_handler
= ldlm_callback_handler
,
1049 ldlm_state
->ldlm_cb_service
= \
1050 ptlrpc_register_service(&conf
, ldlm_svc_proc_dir
);
1051 if (IS_ERR(ldlm_state
->ldlm_cb_service
)) {
1052 CERROR("failed to start service\n");
1053 rc
= PTR_ERR(ldlm_state
->ldlm_cb_service
);
1054 ldlm_state
->ldlm_cb_service
= NULL
;
/* Create and initialize the blocking-AST thread pool. */
1059 OBD_ALLOC(blp
, sizeof(*blp
));
1061 GOTO(out
, rc
= -ENOMEM
);
1062 ldlm_state
->ldlm_bl_pool
= blp
;
1064 spin_lock_init(&blp
->blp_lock
);
1065 INIT_LIST_HEAD(&blp
->blp_list
);
1066 INIT_LIST_HEAD(&blp
->blp_prio_list
);
1067 init_waitqueue_head(&blp
->blp_waitq
);
1068 atomic_set(&blp
->blp_num_threads
, 0);
1069 atomic_set(&blp
->blp_busy_threads
, 0);
1071 if (ldlm_num_threads
== 0) {
1072 blp
->blp_min_threads
= LDLM_NTHRS_INIT
;
1073 blp
->blp_max_threads
= LDLM_NTHRS_MAX
;
1075 blp
->blp_min_threads
= blp
->blp_max_threads
= \
1076 min_t(int, LDLM_NTHRS_MAX
, max_t(int, LDLM_NTHRS_INIT
,
/* Start the minimum number of blocking threads. */
1080 for (i
= 0; i
< blp
->blp_min_threads
; i
++) {
1081 rc
= ldlm_bl_thread_start(blp
);
1087 rc
= ldlm_pools_init();
1089 CERROR("Failed to initialize LDLM pools: %d\n", rc
);
1099 static int ldlm_cleanup(void)
/*
 * NOTE(review): extraction elided returns, closing braces and the pools/
 * service teardown ordering lines; text kept byte-for-byte, comments
 * only.  Refuses to clean up while any namespace still exists.
 */
1101 if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER
)) ||
1102 !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT
))) {
1103 CERROR("ldlm still has namespaces; clean these up first.\n");
1104 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER
, D_DLMTRACE
);
1105 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT
, D_DLMTRACE
);
1111 if (ldlm_state
->ldlm_bl_pool
!= NULL
) {
1112 struct ldlm_bl_pool
*blp
= ldlm_state
->ldlm_bl_pool
;
/* Stop each worker by queuing one poison item (blwi_ns == NULL) per
 * remaining thread and waiting for its exit completion. */
1114 while (atomic_read(&blp
->blp_num_threads
) > 0) {
1115 struct ldlm_bl_work_item blwi
= { .blwi_ns
= NULL
};
1117 init_completion(&blp
->blp_comp
);
1119 spin_lock(&blp
->blp_lock
);
1120 list_add_tail(&blwi
.blwi_entry
, &blp
->blp_list
);
1121 wake_up(&blp
->blp_waitq
);
1122 spin_unlock(&blp
->blp_lock
);
1124 wait_for_completion(&blp
->blp_comp
);
1127 OBD_FREE(blp
, sizeof(*blp
));
1130 if (ldlm_state
->ldlm_cb_service
!= NULL
)
1131 ptlrpc_unregister_service(ldlm_state
->ldlm_cb_service
);
1133 ldlm_proc_cleanup();
1136 OBD_FREE(ldlm_state
, sizeof(*ldlm_state
));
/*
 * NOTE(review): the enclosing function header was elided by extraction;
 * from the content (mutex and slab-cache initialization) this is
 * presumably the body of int ldlm_init(void) — confirm against upstream.
 * Creates the resource, lock and interval-node slab caches, unwinding
 * earlier caches on failure.  Text kept byte-for-byte, comments only.
 */
1144 mutex_init(&ldlm_ref_mutex
);
1145 mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER
));
1146 mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT
));
1147 ldlm_resource_slab
= kmem_cache_create("ldlm_resources",
1148 sizeof(struct ldlm_resource
), 0,
1149 SLAB_HWCACHE_ALIGN
, NULL
);
1150 if (ldlm_resource_slab
== NULL
)
/* Lock slab uses SLAB_DESTROY_BY_RCU: freed locks stay type-stable
 * until an RCU grace period elapses (see ldlm_exit()). */
1153 ldlm_lock_slab
= kmem_cache_create("ldlm_locks",
1154 sizeof(struct ldlm_lock
), 0,
1155 SLAB_HWCACHE_ALIGN
| SLAB_DESTROY_BY_RCU
, NULL
);
1156 if (ldlm_lock_slab
== NULL
) {
1157 kmem_cache_destroy(ldlm_resource_slab
);
1161 ldlm_interval_slab
= kmem_cache_create("interval_node",
1162 sizeof(struct ldlm_interval
),
1163 0, SLAB_HWCACHE_ALIGN
, NULL
);
1164 if (ldlm_interval_slab
== NULL
) {
1165 kmem_cache_destroy(ldlm_resource_slab
);
1166 kmem_cache_destroy(ldlm_lock_slab
);
1169 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1170 class_export_dump_hook
= ldlm_dump_export_locks
;
1175 void ldlm_exit(void)
1178 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount
);
1179 kmem_cache_destroy(ldlm_resource_slab
);
1180 /* ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
1181 * synchronize_rcu() to wait a grace period elapsed, so that
1182 * ldlm_lock_free() get a chance to be called. */
1184 kmem_cache_destroy(ldlm_lock_slab
);
1185 kmem_cache_destroy(ldlm_interval_slab
);