/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lockd.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */
#define DEBUG_SUBSYSTEM S_LDLM

#include <linux/libcfs/libcfs.h>
#include <lustre_dlm.h>
#include <obd_class.h>
#include <linux/list.h>
#include "ldlm_internal.h"
static int ldlm_num_threads;
CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
		"number of DLM service threads to start");

static char *ldlm_cpts;
CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
		"CPU partitions ldlm threads should run on");
extern struct kmem_cache *ldlm_resource_slab;
extern struct kmem_cache *ldlm_lock_slab;

static struct mutex ldlm_ref_mutex;
static int ldlm_refcount;
struct ldlm_cb_async_args {
	struct ldlm_cb_set_arg	*ca_set_arg;
	struct ldlm_lock	*ca_lock;
};

/* LDLM state */
static struct ldlm_state *ldlm_state;
inline cfs_time_t round_timeout(cfs_time_t timeout)
{
	return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
}

/* timeout for initial callback (AST) reply (bz10399) */
static inline unsigned int ldlm_get_rq_timeout(void)
{
	unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);

	return timeout < 1 ? 1 : timeout;
}

#define ELT_TERMINATE 2
struct ldlm_bl_pool {
	spinlock_t		blp_lock;

	/*
	 * blp_prio_list is used for callbacks that should be handled
	 * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
	 */
	struct list_head	blp_prio_list;

	/*
	 * blp_list is used for all other callbacks which are likely
	 * to take longer to process.
	 */
	struct list_head	blp_list;

	wait_queue_head_t	blp_waitq;
	struct completion	blp_comp;
	atomic_t		blp_num_threads;
	atomic_t		blp_busy_threads;
	int			blp_min_threads;
	int			blp_max_threads;
};
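
/*
 * The pool is sized in ldlm_setup(): blp_min_threads workers are started
 * up front, and ldlm_bl_thread_main() forks additional ones on demand up
 * to blp_max_threads. ldlm_cleanup() drains the pool by queueing work
 * items with a NULL blwi_ns, which each thread treats as a request to
 * exit.
 */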
struct ldlm_bl_work_item {
	struct list_head	blwi_entry;
	struct ldlm_namespace	*blwi_ns;
	struct ldlm_lock_desc	blwi_ld;
	struct ldlm_lock	*blwi_lock;
	struct list_head	blwi_head;
	int			blwi_count;
	struct completion	blwi_comp;
	ldlm_cancel_flags_t	blwi_flags;
	int			blwi_mem_pressure;
};
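
/*
 * A work item carries either a single lock (blwi_lock, for one blocking
 * callback) or a detached list of already-marked cancel candidates
 * (blwi_head/blwi_count); init_blwi() below fills in one or the other
 * depending on whether a count was supplied.
 */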
/* Client side only: there is no waiting-lock list, so these are no-ops. */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
	return 0;
}

int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
	return 0;
}
/**
 * Callback handler for receiving incoming blocking ASTs.
 *
 * This can only happen on the client side.
 */
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
			     struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
	int do_ast;

	LDLM_DEBUG(lock, "client blocking AST callback handler");

	lock_res_and_lock(lock);
	lock->l_flags |= LDLM_FL_CBPENDING;

	if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
		lock->l_flags |= LDLM_FL_CANCEL;

	do_ast = (!lock->l_readers && !lock->l_writers);
	unlock_res_and_lock(lock);

	if (do_ast) {
		CDEBUG(D_DLMTRACE, "Lock %p already unused, calling callback (%p)\n",
		       lock, lock->l_blocking_ast);
		if (lock->l_blocking_ast != NULL)
			lock->l_blocking_ast(lock, ld, lock->l_ast_data,
					     LDLM_CB_BLOCKING);
	} else {
		CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
		       lock);
	}

	LDLM_DEBUG(lock, "client blocking callback handler END");
	LDLM_LOCK_RELEASE(lock);
}
/**
 * Callback handler for receiving incoming completion ASTs.
 *
 * This can only happen on the client side.
 */
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
				    struct ldlm_namespace *ns,
				    struct ldlm_request *dlm_req,
				    struct ldlm_lock *lock)
{
	int lvb_len;
	LIST_HEAD(ast_list);
	int rc = 0;

	LDLM_DEBUG(lock, "client completion callback handler START");

	if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
		int to = cfs_time_seconds(1);

		while (to > 0) {
			schedule_timeout_and_set_state(
				TASK_INTERRUPTIBLE, to);
			if (lock->l_granted_mode == lock->l_req_mode ||
			    lock->l_flags & LDLM_FL_DESTROYED)
				break;
		}
	}

	lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
	if (lvb_len < 0) {
		LDLM_ERROR(lock, "Failed to get lvb_len, rc = %d", lvb_len);
		GOTO(out, rc = lvb_len);
	} else if (lvb_len > 0) {
		if (lock->l_lvb_len > 0) {
			/* for extent lock, lvb contains ost_lvb{}. */
			LASSERT(lock->l_lvb_data != NULL);

			if (unlikely(lock->l_lvb_len < lvb_len)) {
				LDLM_ERROR(lock, "Replied LVB is larger than "
					   "expectation, expected = %d, "
					   "replied = %d",
					   lock->l_lvb_len, lvb_len);
				GOTO(out, rc = -EINVAL);
			}
		} else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
						     * variable length */
			void *lvb_data;

			OBD_ALLOC(lvb_data, lvb_len);
			if (lvb_data == NULL) {
				LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
				GOTO(out, rc = -ENOMEM);
			}

			lock_res_and_lock(lock);
			LASSERT(lock->l_lvb_data == NULL);
			lock->l_lvb_data = lvb_data;
			lock->l_lvb_len = lvb_len;
			unlock_res_and_lock(lock);
		}
	}

	lock_res_and_lock(lock);
	if ((lock->l_flags & LDLM_FL_DESTROYED) ||
	    lock->l_granted_mode == lock->l_req_mode) {
		/* bug 11300: the lock has already been granted */
		unlock_res_and_lock(lock);
		LDLM_DEBUG(lock, "Double grant race happened");
		GOTO(out, rc = 0);
	}

	/* If we receive the completion AST before the actual enqueue returned,
	 * then we might need to switch lock modes, resources, or extents. */
	if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
		lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
		LDLM_DEBUG(lock, "completion AST, new lock mode");
	}

	if (lock->l_resource->lr_type != LDLM_PLAIN) {
		ldlm_convert_policy_to_local(req->rq_export,
					  dlm_req->lock_desc.l_resource.lr_type,
					  &dlm_req->lock_desc.l_policy_data,
					  &lock->l_policy_data);
		LDLM_DEBUG(lock, "completion AST, new policy data");
	}

	ldlm_resource_unlink_lock(lock);
	if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
		   &lock->l_resource->lr_name,
		   sizeof(lock->l_resource->lr_name)) != 0) {
		unlock_res_and_lock(lock);
		rc = ldlm_lock_change_resource(ns, lock,
				&dlm_req->lock_desc.l_resource.lr_name);
		if (rc < 0) {
			LDLM_ERROR(lock, "Failed to allocate resource");
			GOTO(out, rc);
		}
		LDLM_DEBUG(lock, "completion AST, new resource");
		CERROR("change resource!\n");
		lock_res_and_lock(lock);
	}

	if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
		/* BL_AST locks are not needed in LRU.
		 * Let ldlm_cancel_lru() be fast. */
		ldlm_lock_remove_from_lru(lock);
		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
		LDLM_DEBUG(lock, "completion AST includes blocking AST");
	}

	if (lock->l_lvb_len > 0) {
		rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
				   lock->l_lvb_data, lvb_len);
		if (rc < 0) {
			unlock_res_and_lock(lock);
			GOTO(out, rc);
		}
	}

	ldlm_grant_lock(lock, &ast_list);
	unlock_res_and_lock(lock);

	LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");

	/* Let the enqueue path call osc_lock_upcall() and initialize
	 * l_ast_data */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);

	ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);

	LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
			  lock);

out:
	if (rc < 0) {
		lock_res_and_lock(lock);
		lock->l_flags |= LDLM_FL_FAILED;
		unlock_res_and_lock(lock);
		wake_up(&lock->l_waitq);
	}
	LDLM_LOCK_RELEASE(lock);
}
/**
 * Callback handler for receiving incoming glimpse ASTs.
 *
 * This can only happen on the client side. After handling the glimpse AST
 * we also consider dropping the lock here if it is unused locally for a
 * long time.
 */
static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
				    struct ldlm_namespace *ns,
				    struct ldlm_request *dlm_req,
				    struct ldlm_lock *lock)
{
	int rc = -ENOSYS;

	LDLM_DEBUG(lock, "client glimpse AST callback handler");

	if (lock->l_glimpse_ast != NULL)
		rc = lock->l_glimpse_ast(lock, req);

	if (req->rq_repmsg != NULL) {
		ptlrpc_reply(req);
	} else {
		req->rq_status = rc;
		ptlrpc_error(req);
	}

	lock_res_and_lock(lock);
	if (lock->l_granted_mode == LCK_PW &&
	    !lock->l_readers && !lock->l_writers &&
	    cfs_time_after(cfs_time_current(),
			   cfs_time_add(lock->l_last_used,
					cfs_time_seconds(10)))) {
		unlock_res_and_lock(lock);
		if (ldlm_bl_to_thread_lock(ns, NULL, lock))
			ldlm_handle_bl_callback(ns, NULL, lock);

		return;
	}
	unlock_res_and_lock(lock);
	LDLM_LOCK_RELEASE(lock);
}
static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
{
	if (req->rq_no_reply)
		return 0;

	req->rq_status = rc;
	if (!req->rq_packed_final) {
		rc = lustre_pack_reply(req, 1, NULL, NULL);
		if (rc)
			return rc;
	}
	return ptlrpc_reply(req);
}
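
/*
 * Note that when rq_no_reply is set, ldlm_callback_reply() returns 0
 * without sending anything, so a zero return does not guarantee the peer
 * ever saw the status.
 */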
static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
			       ldlm_cancel_flags_t cancel_flags)
{
	struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;

	spin_lock(&blp->blp_lock);
	if (blwi->blwi_lock &&
	    blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
		/* add LDLM_FL_DISCARD_DATA requests to the priority list */
		list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
	} else {
		/* other blocking callbacks are added to the regular list */
		list_add_tail(&blwi->blwi_entry, &blp->blp_list);
	}
	spin_unlock(&blp->blp_lock);

	wake_up(&blp->blp_waitq);

	/* can not check blwi->blwi_flags as blwi could already be freed in
	 * LCF_ASYNC mode */
	if (!(cancel_flags & LCF_ASYNC))
		wait_for_completion(&blwi->blwi_comp);

	return 0;
}
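
/*
 * In LCF_ASYNC mode the work item is heap-allocated and later freed by
 * the blocking thread, so the caller must not touch it once it has been
 * queued; in synchronous mode the item lives on the caller's stack and
 * __ldlm_bl_to_thread() blocks on blwi_comp until the thread is done.
 */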
static inline void init_blwi(struct ldlm_bl_work_item *blwi,
			     struct ldlm_namespace *ns,
			     struct ldlm_lock_desc *ld,
			     struct list_head *cancels, int count,
			     struct ldlm_lock *lock,
			     ldlm_cancel_flags_t cancel_flags)
{
	init_completion(&blwi->blwi_comp);
	INIT_LIST_HEAD(&blwi->blwi_head);

	if (memory_pressure_get())
		blwi->blwi_mem_pressure = 1;

	blwi->blwi_ns = ns;
	blwi->blwi_flags = cancel_flags;
	if (ld != NULL)
		blwi->blwi_ld = *ld;
	if (count) {
		list_add(&blwi->blwi_head, cancels);
		list_del_init(cancels);
		blwi->blwi_count = count;
	} else {
		blwi->blwi_lock = lock;
	}
}
/**
 * Queues a list of locks \a cancels containing \a count locks
 * for later processing by a blocking thread. If \a count is zero,
 * then the lock referenced as \a lock is queued instead.
 *
 * The blocking thread would then call ->l_blocking_ast callback in the lock.
 * If list addition fails an error is returned and the caller is supposed to
 * call ->l_blocking_ast itself.
 */
static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
			     struct ldlm_lock_desc *ld,
			     struct ldlm_lock *lock,
			     struct list_head *cancels, int count,
			     ldlm_cancel_flags_t cancel_flags)
{
	if (cancels && count == 0)
		return 0;

	if (cancel_flags & LCF_ASYNC) {
		struct ldlm_bl_work_item *blwi;

		OBD_ALLOC(blwi, sizeof(*blwi));
		if (blwi == NULL)
			return -ENOMEM;
		init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);

		return __ldlm_bl_to_thread(blwi, cancel_flags);
	} else {
		/* If it is a synchronous call, do the minimum memory
		 * allocation, as it could be triggered from a kernel
		 * shrinker.
		 */
		struct ldlm_bl_work_item blwi;

		memset(&blwi, 0, sizeof(blwi));
		init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
		return __ldlm_bl_to_thread(&blwi, cancel_flags);
	}
}
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
			   struct ldlm_lock *lock)
{
	return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
}
int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
			   struct list_head *cancels, int count,
			   ldlm_cancel_flags_t cancel_flags)
{
	return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
}
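
/*
 * These two wrappers are the entry points used elsewhere in ldlm:
 * ldlm_bl_to_thread_lock() queues a single lock for an asynchronous
 * blocking AST, while ldlm_bl_to_thread_list() hands over a list of
 * cancel candidates, e.g. locks selected for cancellation from the LRU.
 */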
/* Setinfo coming from Server (e.g. MDT) to Client (e.g. MDC)! */
static int ldlm_handle_setinfo(struct ptlrpc_request *req)
{
	struct obd_device *obd = req->rq_export->exp_obd;
	char *key;
	void *val;
	int keylen, vallen;
	int rc = -ENOSYS;

	DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);

	req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);

	key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
	if (key == NULL) {
		DEBUG_REQ(D_IOCTL, req, "no set_info key");
		return -EFAULT;
	}
	keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
				      RCL_CLIENT);
	val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
	if (val == NULL) {
		DEBUG_REQ(D_IOCTL, req, "no set_info val");
		return -EFAULT;
	}
	vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
				      RCL_CLIENT);

	/* We are responsible for swabbing contents of val */

	if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
		/* Pass it on to mdc (the "export" in this case) */
		rc = obd_set_info_async(req->rq_svc_thread->t_env,
					req->rq_export,
					sizeof(KEY_HSM_COPYTOOL_SEND),
					KEY_HSM_COPYTOOL_SEND,
					vallen, val, NULL);
	else
		DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);

	return rc;
}
static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
					const char *msg, int rc,
					struct lustre_handle *handle)
{
	DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
		  "%s: [nid %s] [rc %d] [lock "LPX64"]",
		  msg, libcfs_id2str(req->rq_peer), rc,
		  handle ? handle->cookie : 0);
	if (req->rq_no_reply)
		CWARN("No reply was sent, possibly due to bug 21636.\n");
	else if (rc)
		CWARN("Sending the reply failed, possibly due to bug 21636.\n");
}
static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
{
	struct obd_quotactl *oqctl;
	struct client_obd *cli = &req->rq_export->exp_obd->u.cli;

	oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
	if (oqctl == NULL) {
		CERROR("Can't unpack obd_quotactl\n");
		return -EPROTO;
	}

	oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);
	cli->cl_qchk_stat = oqctl->qc_stat;
	return 0;
}
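
/*
 * ldlm_callback_handler() below is the request handler for the "ldlm_cbd"
 * service registered in ldlm_setup(). It dispatches non-lock opcodes
 * (setinfo, llog, quota) inline, and for blocking/completion/glimpse ASTs
 * it resolves the lock handle and forwards to the handlers above.
 */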
/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
	struct ldlm_namespace *ns;
	struct ldlm_request *dlm_req;
	struct ldlm_lock *lock;
	int rc;

	/* Requests arrive in sender's byte order. The ptlrpc service
	 * handler has already checked and, if necessary, byte-swapped the
	 * incoming request message body, but I am responsible for the
	 * message buffers. */

	/* do nothing for sec context finalize */
	if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
		return 0;

	req_capsule_init(&req->rq_pill, req, RCL_SERVER);

	if (req->rq_export == NULL) {
		rc = ldlm_callback_reply(req, -ENOTCONN);
		ldlm_callback_errmsg(req, "Operate on unconnected server",
				     rc, NULL);
		return 0;
	}

	LASSERT(req->rq_export != NULL);
	LASSERT(req->rq_export->exp_obd != NULL);

	switch (lustre_msg_get_opc(req->rq_reqmsg)) {
	case LDLM_BL_CALLBACK:
		if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
			return 0;
		break;
	case LDLM_CP_CALLBACK:
		if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
			return 0;
		break;
	case LDLM_GL_CALLBACK:
		if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
			return 0;
		break;
	case LDLM_SET_INFO:
		rc = ldlm_handle_setinfo(req);
		ldlm_callback_reply(req, rc);
		return 0;
	case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
		CERROR("shouldn't be handling OBD_LOG_CANCEL on DLM thread\n");
		req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
		if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
			return 0;
		rc = llog_origin_handle_cancel(req);
		if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
			return 0;
		ldlm_callback_reply(req, rc);
		return 0;
	case LLOG_ORIGIN_HANDLE_CREATE:
		req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
		if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
			return 0;
		rc = llog_origin_handle_open(req);
		ldlm_callback_reply(req, rc);
		return 0;
	case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
		req_capsule_set(&req->rq_pill,
				&RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
		if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
			return 0;
		rc = llog_origin_handle_next_block(req);
		ldlm_callback_reply(req, rc);
		return 0;
	case LLOG_ORIGIN_HANDLE_READ_HEADER:
		req_capsule_set(&req->rq_pill,
				&RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
		if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
			return 0;
		rc = llog_origin_handle_read_header(req);
		ldlm_callback_reply(req, rc);
		return 0;
	case LLOG_ORIGIN_HANDLE_CLOSE:
		if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
			return 0;
		rc = llog_origin_handle_close(req);
		ldlm_callback_reply(req, rc);
		return 0;
	case OBD_QC_CALLBACK:
		req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
		if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
			return 0;
		rc = ldlm_handle_qc_callback(req);
		ldlm_callback_reply(req, rc);
		return 0;
	default:
		CERROR("unknown opcode %u\n",
		       lustre_msg_get_opc(req->rq_reqmsg));
		ldlm_callback_reply(req, -EPROTO);
		return 0;
	}

	ns = req->rq_export->exp_obd->obd_namespace;
	LASSERT(ns != NULL);

	req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);

	dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
	if (dlm_req == NULL) {
		rc = ldlm_callback_reply(req, -EPROTO);
		ldlm_callback_errmsg(req, "Operate without parameter", rc,
				     NULL);
		return 0;
	}

	/* Force a known safe race, send a cancel to the server for a lock
	 * which the server has already started a blocking callback on. */
	if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
	    lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
		rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
		if (rc < 0)
			CERROR("ldlm_cli_cancel: %d\n", rc);
	}

	lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
	if (!lock) {
		CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
		       "disappeared\n", dlm_req->lock_handle[0].cookie);
		rc = ldlm_callback_reply(req, -EINVAL);
		ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
				     &dlm_req->lock_handle[0]);
		return 0;
	}

	if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
	    lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
		OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

	/* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
	lock_res_and_lock(lock);
	lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
					      LDLM_AST_FLAGS);
	if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
		/* If somebody cancels lock and cache is already dropped,
		 * or lock is failed before cp_ast received on client,
		 * we can tell the server we have no lock. Otherwise, we
		 * should send cancel after dropping the cache. */
		if (((lock->l_flags & LDLM_FL_CANCELING) &&
		     (lock->l_flags & LDLM_FL_BL_DONE)) ||
		    (lock->l_flags & LDLM_FL_FAILED)) {
			LDLM_DEBUG(lock, "callback on lock "
				   LPX64" - lock disappeared\n",
				   dlm_req->lock_handle[0].cookie);
			unlock_res_and_lock(lock);
			LDLM_LOCK_RELEASE(lock);
			rc = ldlm_callback_reply(req, -EINVAL);
			ldlm_callback_errmsg(req, "Operate on stale lock", rc,
					     &dlm_req->lock_handle[0]);
			return 0;
		}
		/* BL_AST locks are not needed in LRU.
		 * Let ldlm_cancel_lru() be fast. */
		ldlm_lock_remove_from_lru(lock);
		lock->l_flags |= LDLM_FL_BL_AST;
	}
	unlock_res_and_lock(lock);

	/* We want the ost thread to get this reply so that it can respond
	 * to ost requests (write cache writeback) that might be triggered
	 * in the callback.
	 *
	 * But we'd also like to be able to indicate in the reply that we're
	 * cancelling right now, because it's unused, or have an intent result
	 * in the reply, so we might have to push the responsibility for sending
	 * the reply down into the AST handlers, alas. */

	switch (lustre_msg_get_opc(req->rq_reqmsg)) {
	case LDLM_BL_CALLBACK:
		CDEBUG(D_INODE, "blocking ast\n");
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
		if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
			rc = ldlm_callback_reply(req, 0);
			if (req->rq_no_reply || rc)
				ldlm_callback_errmsg(req, "Normal process", rc,
						     &dlm_req->lock_handle[0]);
		}
		if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
			ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
		break;
	case LDLM_CP_CALLBACK:
		CDEBUG(D_INODE, "completion ast\n");
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
		ldlm_callback_reply(req, 0);
		ldlm_handle_cp_callback(req, ns, dlm_req, lock);
		break;
	case LDLM_GL_CALLBACK:
		CDEBUG(D_INODE, "glimpse ast\n");
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
		ldlm_handle_gl_callback(req, ns, dlm_req, lock);
		break;
	default:
		LBUG(); /* checked above */
	}

	return 0;
}
static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
{
	struct ldlm_bl_work_item *blwi = NULL;
	static unsigned int num_bl = 0;

	spin_lock(&blp->blp_lock);
	/* process a request from the blp_list at least every blp_num_threads */
	if (!list_empty(&blp->blp_list) &&
	    (list_empty(&blp->blp_prio_list) || num_bl == 0))
		blwi = list_entry(blp->blp_list.next,
				  struct ldlm_bl_work_item, blwi_entry);
	else
		if (!list_empty(&blp->blp_prio_list))
			blwi = list_entry(blp->blp_prio_list.next,
					  struct ldlm_bl_work_item,
					  blwi_entry);

	if (blwi) {
		if (++num_bl >= atomic_read(&blp->blp_num_threads))
			num_bl = 0;
		list_del(&blwi->blwi_entry);
	}
	spin_unlock(&blp->blp_lock);

	return blwi;
}
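
/*
 * The static num_bl counter rotates service between the two queues: even
 * while priority (LDLM_FL_DISCARD_DATA) work is pending, the regular
 * blp_list is polled once every blp_num_threads dequeues so that ordinary
 * callbacks are not starved.
 */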
/* This only contains temporary data until the thread starts */
struct ldlm_bl_thread_data {
	char			bltd_name[CFS_CURPROC_COMM_MAX];
	struct ldlm_bl_pool	*bltd_blp;
	struct completion	bltd_comp;
	int			bltd_num;
};

static int ldlm_bl_thread_main(void *arg);
static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
{
	struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
	struct task_struct *task;

	init_completion(&bltd.bltd_comp);
	bltd.bltd_num = atomic_read(&blp->blp_num_threads);
	snprintf(bltd.bltd_name, sizeof(bltd.bltd_name) - 1,
		 "ldlm_bl_%02d", bltd.bltd_num);
	task = kthread_run(ldlm_bl_thread_main, &bltd, bltd.bltd_name);
	if (IS_ERR(task)) {
		CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
		       atomic_read(&blp->blp_num_threads), PTR_ERR(task));
		return PTR_ERR(task);
	}
	wait_for_completion(&bltd.bltd_comp);

	return 0;
}
/**
 * Main blocking requests processing thread.
 *
 * Callers put locks into its queue by calling ldlm_bl_to_thread.
 * This thread in the end ends up doing the actual call to ->l_blocking_ast
 * for queued locks.
 */
static int ldlm_bl_thread_main(void *arg)
{
	struct ldlm_bl_pool *blp;

	{
		struct ldlm_bl_thread_data *bltd = arg;

		blp = bltd->bltd_blp;

		atomic_inc(&blp->blp_num_threads);
		atomic_inc(&blp->blp_busy_threads);

		complete(&bltd->bltd_comp);
		/* cannot use bltd after this, it is only on caller's stack */
	}

	while (1) {
		struct l_wait_info lwi = { 0 };
		struct ldlm_bl_work_item *blwi = NULL;
		int busy;

		blwi = ldlm_bl_get_work(blp);

		if (blwi == NULL) {
			atomic_dec(&blp->blp_busy_threads);
			l_wait_event_exclusive(blp->blp_waitq,
					(blwi = ldlm_bl_get_work(blp)) != NULL,
					&lwi);
			busy = atomic_inc_return(&blp->blp_busy_threads);
		} else {
			busy = atomic_read(&blp->blp_busy_threads);
		}

		if (blwi->blwi_ns == NULL)
			/* added by ldlm_cleanup() */
			break;

		/* Not fatal if racy and have a few too many threads */
		if (unlikely(busy < blp->blp_max_threads &&
			     busy >= atomic_read(&blp->blp_num_threads) &&
			     !blwi->blwi_mem_pressure))
			/* discard the return value, we tried */
			ldlm_bl_thread_start(blp);

		if (blwi->blwi_mem_pressure)
			memory_pressure_set();

		if (blwi->blwi_count) {
			int count;

			/* The special case when we cancel locks in LRU
			 * asynchronously, we pass the list of locks here.
			 * Thus locks are marked LDLM_FL_CANCELING, but NOT
			 * canceled locally yet. */
			count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
							   blwi->blwi_count,
							   LCF_BL_AST);
			ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
					     blwi->blwi_flags);
		} else {
			ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
						blwi->blwi_lock);
		}
		if (blwi->blwi_mem_pressure)
			memory_pressure_clr();

		if (blwi->blwi_flags & LCF_ASYNC)
			OBD_FREE(blwi, sizeof(*blwi));
		else
			complete(&blwi->blwi_comp);
	}

	atomic_dec(&blp->blp_busy_threads);
	atomic_dec(&blp->blp_num_threads);
	complete(&blp->blp_comp);
	return 0;
}
static int ldlm_setup(void);
static int ldlm_cleanup(void);
int ldlm_get_ref(void)
{
	int rc = 0;

	mutex_lock(&ldlm_ref_mutex);
	if (++ldlm_refcount == 1) {
		rc = ldlm_setup();
		if (rc)
			ldlm_refcount--;
	}
	mutex_unlock(&ldlm_ref_mutex);

	return rc;
}
EXPORT_SYMBOL(ldlm_get_ref);
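
/*
 * A typical user brackets its lifetime with the refcounted pair
 * ldlm_get_ref()/ldlm_put_ref(); roughly (an illustrative sketch, not
 * code from this file):
 *
 *	rc = ldlm_get_ref();	// first user triggers ldlm_setup()
 *	if (rc)
 *		return rc;
 *	...use the LDLM...
 *	ldlm_put_ref();		// last user triggers ldlm_cleanup()
 */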
void ldlm_put_ref(void)
{
	mutex_lock(&ldlm_ref_mutex);
	if (ldlm_refcount == 1) {
		int rc = ldlm_cleanup();

		if (rc)
			CERROR("ldlm_cleanup failed: %d\n", rc);
		else
			ldlm_refcount--;
	} else {
		ldlm_refcount--;
	}
	mutex_unlock(&ldlm_ref_mutex);
}
EXPORT_SYMBOL(ldlm_put_ref);
/*
 * Export handle<->lock hash operations.
 */
static unsigned
ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
	return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
}
static void *
ldlm_export_lock_key(struct hlist_node *hnode)
{
	struct ldlm_lock *lock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
	return &lock->l_remote_handle;
}
static void
ldlm_export_lock_keycpy(struct hlist_node *hnode, void *key)
{
	struct ldlm_lock *lock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
	lock->l_remote_handle = *(struct lustre_handle *)key;
}
static int
ldlm_export_lock_keycmp(const void *key, struct hlist_node *hnode)
{
	return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
}
static void *
ldlm_export_lock_object(struct hlist_node *hnode)
{
	return hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
}
static void
ldlm_export_lock_get(cfs_hash_t *hs, struct hlist_node *hnode)
{
	struct ldlm_lock *lock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
	LDLM_LOCK_GET(lock);
}
static void
ldlm_export_lock_put(cfs_hash_t *hs, struct hlist_node *hnode)
{
	struct ldlm_lock *lock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
	LDLM_LOCK_RELEASE(lock);
}
static cfs_hash_ops_t ldlm_export_lock_ops = {
	.hs_hash	= ldlm_export_lock_hash,
	.hs_key		= ldlm_export_lock_key,
	.hs_keycmp	= ldlm_export_lock_keycmp,
	.hs_keycpy	= ldlm_export_lock_keycpy,
	.hs_object	= ldlm_export_lock_object,
	.hs_get		= ldlm_export_lock_get,
	.hs_put		= ldlm_export_lock_put,
	.hs_put_locked	= ldlm_export_lock_put,
};
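
/*
 * These callbacks let the generic cfs_hash code index an export's locks
 * by the cookie of their remote (server-side) lustre_handle; hs_get and
 * hs_put pin and unpin the lock while it is reachable through the hash,
 * matching the LDLM_LOCK_GET()/LDLM_LOCK_RELEASE() pair above.
 */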
int ldlm_init_export(struct obd_export *exp)
{
	exp->exp_lock_hash =
		cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
				HASH_EXP_LOCK_CUR_BITS,
				HASH_EXP_LOCK_MAX_BITS,
				HASH_EXP_LOCK_BKT_BITS, 0,
				CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
				&ldlm_export_lock_ops,
				CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY |
				CFS_HASH_NBLK_CHANGE);

	if (!exp->exp_lock_hash)
		return -ENOMEM;

	return 0;
}
EXPORT_SYMBOL(ldlm_init_export);
void ldlm_destroy_export(struct obd_export *exp)
{
	cfs_hash_putref(exp->exp_lock_hash);
	exp->exp_lock_hash = NULL;

	ldlm_destroy_flock_export(exp);
}
EXPORT_SYMBOL(ldlm_destroy_export);
static int ldlm_setup(void)
{
	static struct ptlrpc_service_conf conf;
	struct ldlm_bl_pool *blp = NULL;
	int rc = 0;
	int i;

	if (ldlm_state != NULL)
		return -EALREADY;

	OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
	if (ldlm_state == NULL)
		return -ENOMEM;

	rc = ldlm_proc_setup();
	if (rc != 0)
		GOTO(out, rc);

	memset(&conf, 0, sizeof(conf));
	conf = (typeof(conf)) {
		.psc_name		= "ldlm_cbd",
		.psc_watchdog_factor	= 2,
		.psc_buf		= {
			.bc_nbufs		= LDLM_CLIENT_NBUFS,
			.bc_buf_size		= LDLM_BUFSIZE,
			.bc_req_max_size	= LDLM_MAXREQSIZE,
			.bc_rep_max_size	= LDLM_MAXREPSIZE,
			.bc_req_portal		= LDLM_CB_REQUEST_PORTAL,
			.bc_rep_portal		= LDLM_CB_REPLY_PORTAL,
		},
		.psc_thr		= {
			.tc_thr_name		= "ldlm_cb",
			.tc_thr_factor		= LDLM_THR_FACTOR,
			.tc_nthrs_init		= LDLM_NTHRS_INIT,
			.tc_nthrs_base		= LDLM_NTHRS_BASE,
			.tc_nthrs_max		= LDLM_NTHRS_MAX,
			.tc_nthrs_user		= ldlm_num_threads,
			.tc_cpu_affinity	= 1,
			.tc_ctx_tags		= LCT_MD_THREAD | LCT_DT_THREAD,
		},
		.psc_cpt		= {
			.cc_pattern		= ldlm_cpts,
		},
		.psc_ops		= {
			.so_req_handler		= ldlm_callback_handler,
		},
	};
	ldlm_state->ldlm_cb_service =
			ptlrpc_register_service(&conf, ldlm_svc_proc_dir);
	if (IS_ERR(ldlm_state->ldlm_cb_service)) {
		CERROR("failed to start service\n");
		rc = PTR_ERR(ldlm_state->ldlm_cb_service);
		ldlm_state->ldlm_cb_service = NULL;
		GOTO(out, rc);
	}

	OBD_ALLOC(blp, sizeof(*blp));
	if (blp == NULL)
		GOTO(out, rc = -ENOMEM);
	ldlm_state->ldlm_bl_pool = blp;

	spin_lock_init(&blp->blp_lock);
	INIT_LIST_HEAD(&blp->blp_list);
	INIT_LIST_HEAD(&blp->blp_prio_list);
	init_waitqueue_head(&blp->blp_waitq);
	atomic_set(&blp->blp_num_threads, 0);
	atomic_set(&blp->blp_busy_threads, 0);

	if (ldlm_num_threads == 0) {
		blp->blp_min_threads = LDLM_NTHRS_INIT;
		blp->blp_max_threads = LDLM_NTHRS_MAX;
	} else {
		blp->blp_min_threads = blp->blp_max_threads =
			min_t(int, LDLM_NTHRS_MAX,
			      max_t(int, LDLM_NTHRS_INIT, ldlm_num_threads));
	}

	for (i = 0; i < blp->blp_min_threads; i++) {
		rc = ldlm_bl_thread_start(blp);
		if (rc < 0)
			GOTO(out, rc);
	}

	rc = ldlm_pools_init();
	if (rc) {
		CERROR("Failed to initialize LDLM pools: %d\n", rc);
		GOTO(out, rc);
	}
	return 0;

out:
	ldlm_cleanup();
	return rc;
}
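
/*
 * Every failure path in ldlm_setup() funnels through its out: label into
 * ldlm_cleanup(), which therefore has to tolerate partially initialized
 * state; hence the NULL checks on ldlm_bl_pool and ldlm_cb_service below.
 */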
static int ldlm_cleanup(void)
{
	if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
	    !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
		CERROR("ldlm still has namespaces; clean these up first.\n");
		ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
		ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
		return -EBUSY;
	}

	ldlm_pools_fini();

	if (ldlm_state->ldlm_bl_pool != NULL) {
		struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;

		while (atomic_read(&blp->blp_num_threads) > 0) {
			struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };

			init_completion(&blp->blp_comp);

			spin_lock(&blp->blp_lock);
			list_add_tail(&blwi.blwi_entry, &blp->blp_list);
			wake_up(&blp->blp_waitq);
			spin_unlock(&blp->blp_lock);

			wait_for_completion(&blp->blp_comp);
		}

		OBD_FREE(blp, sizeof(*blp));
	}

	if (ldlm_state->ldlm_cb_service != NULL)
		ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);

	ldlm_proc_cleanup();

	OBD_FREE(ldlm_state, sizeof(*ldlm_state));
	ldlm_state = NULL;

	return 0;
}
int ldlm_init(void)
{
	mutex_init(&ldlm_ref_mutex);
	mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
	mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
	ldlm_resource_slab = kmem_cache_create("ldlm_resources",
					       sizeof(struct ldlm_resource), 0,
					       SLAB_HWCACHE_ALIGN, NULL);
	if (ldlm_resource_slab == NULL)
		return -ENOMEM;

	ldlm_lock_slab = kmem_cache_create("ldlm_locks",
					   sizeof(struct ldlm_lock), 0,
					   SLAB_HWCACHE_ALIGN |
					   SLAB_DESTROY_BY_RCU, NULL);
	if (ldlm_lock_slab == NULL) {
		kmem_cache_destroy(ldlm_resource_slab);
		return -ENOMEM;
	}

	ldlm_interval_slab = kmem_cache_create("interval_node",
					       sizeof(struct ldlm_interval),
					       0, SLAB_HWCACHE_ALIGN, NULL);
	if (ldlm_interval_slab == NULL) {
		kmem_cache_destroy(ldlm_resource_slab);
		kmem_cache_destroy(ldlm_lock_slab);
		return -ENOMEM;
	}

#if LUSTRE_TRACKS_LOCK_EXP_REFS
	class_export_dump_hook = ldlm_dump_export_locks;
#endif
	return 0;
}
void ldlm_exit(void)
{
	if (ldlm_refcount)
		CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
	kmem_cache_destroy(ldlm_resource_slab);
	/* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so we need to
	 * call synchronize_rcu() to wait until a grace period has elapsed
	 * and ldlm_lock_free() has had a chance to run. */
	synchronize_rcu();
	kmem_cache_destroy(ldlm_lock_slab);
	kmem_cache_destroy(ldlm_interval_slab);
}