4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/fld/fld_request.c
38 * FLD (Fids Location Database)
40 * Author: Yury Umanets <umka@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_FLD
45 #include "../../include/linux/libcfs/libcfs.h"
46 # include <linux/module.h>
47 # include <asm/div64.h>
50 #include <obd_class.h>
51 #include <lustre_ver.h>
52 #include <obd_support.h>
53 #include <lprocfs_status.h>
55 #include <dt_object.h>
56 #include <md_object.h>
57 #include <lustre_req_layout.h>
58 #include <lustre_fld.h>
59 #include <lustre_mdc.h>
60 #include "fld_internal.h"
/* TODO: these 3 functions are copies of the flow-control code from mdc_lib.c.
 * They should be made common. The same goes for the mdc RPC lock. */
64 static int fld_req_avail(struct client_obd
*cli
, struct mdc_cache_waiter
*mcw
)
68 client_obd_list_lock(&cli
->cl_loi_list_lock
);
69 rc
= list_empty(&mcw
->mcw_entry
);
70 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
74 static void fld_enter_request(struct client_obd
*cli
)
76 struct mdc_cache_waiter mcw
;
77 struct l_wait_info lwi
= { 0 };
79 client_obd_list_lock(&cli
->cl_loi_list_lock
);
80 if (cli
->cl_r_in_flight
>= cli
->cl_max_rpcs_in_flight
) {
81 list_add_tail(&mcw
.mcw_entry
, &cli
->cl_cache_waiters
);
82 init_waitqueue_head(&mcw
.mcw_waitq
);
83 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
84 l_wait_event(mcw
.mcw_waitq
, fld_req_avail(cli
, &mcw
), &lwi
);
86 cli
->cl_r_in_flight
++;
87 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
91 static void fld_exit_request(struct client_obd
*cli
)
93 struct list_head
*l
, *tmp
;
94 struct mdc_cache_waiter
*mcw
;
96 client_obd_list_lock(&cli
->cl_loi_list_lock
);
97 cli
->cl_r_in_flight
--;
98 list_for_each_safe(l
, tmp
, &cli
->cl_cache_waiters
) {
100 if (cli
->cl_r_in_flight
>= cli
->cl_max_rpcs_in_flight
) {
101 /* No free request slots anymore */
105 mcw
= list_entry(l
, struct mdc_cache_waiter
, mcw_entry
);
106 list_del_init(&mcw
->mcw_entry
);
107 cli
->cl_r_in_flight
++;
108 wake_up(&mcw
->mcw_waitq
);
110 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
113 static int fld_rrb_hash(struct lu_client_fld
*fld
,
116 LASSERT(fld
->lcf_count
> 0);
117 return do_div(seq
, fld
->lcf_count
);
120 static struct lu_fld_target
*
121 fld_rrb_scan(struct lu_client_fld
*fld
, seqno_t seq
)
123 struct lu_fld_target
*target
;
126 /* Because almost all of special sequence located in MDT0,
127 * it should go to index 0 directly, instead of calculating
128 * hash again, and also if other MDTs is not being connected,
129 * the fld lookup requests(for seq on MDT0) should not be
130 * blocked because of other MDTs */
131 if (fid_seq_is_norm(seq
))
132 hash
= fld_rrb_hash(fld
, seq
);
136 list_for_each_entry(target
, &fld
->lcf_targets
, ft_chain
) {
137 if (target
->ft_idx
== hash
)
141 CERROR("%s: Can't find target by hash %d (seq "LPX64
"). Targets (%d):\n",
142 fld
->lcf_name
, hash
, seq
, fld
->lcf_count
);
144 list_for_each_entry(target
, &fld
->lcf_targets
, ft_chain
) {
145 const char *srv_name
= target
->ft_srv
!= NULL
?
146 target
->ft_srv
->lsf_name
: "<null>";
147 const char *exp_name
= target
->ft_exp
!= NULL
?
148 (char *)target
->ft_exp
->exp_obd
->obd_uuid
.uuid
:
151 CERROR(" exp: 0x%p (%s), srv: 0x%p (%s), idx: "LPU64
"\n",
152 target
->ft_exp
, exp_name
, target
->ft_srv
,
153 srv_name
, target
->ft_idx
);
157 * If target is not found, there is logical error anyway, so here is
158 * LBUG() to catch this situation.
164 struct lu_fld_hash fld_hash
[] = {
167 .fh_hash_func
= fld_rrb_hash
,
168 .fh_scan_func
= fld_rrb_scan
175 static struct lu_fld_target
*
176 fld_client_get_target(struct lu_client_fld
*fld
, seqno_t seq
)
178 struct lu_fld_target
*target
;
180 LASSERT(fld
->lcf_hash
!= NULL
);
182 spin_lock(&fld
->lcf_lock
);
183 target
= fld
->lcf_hash
->fh_scan_func(fld
, seq
);
184 spin_unlock(&fld
->lcf_lock
);
186 if (target
!= NULL
) {
187 CDEBUG(D_INFO
, "%s: Found target (idx "LPU64
188 ") by seq "LPX64
"\n", fld
->lcf_name
,
189 target
->ft_idx
, seq
);
196 * Add export to FLD. This is usually done by CMM and LMV as they are main users
199 int fld_client_add_target(struct lu_client_fld
*fld
,
200 struct lu_fld_target
*tar
)
203 struct lu_fld_target
*target
, *tmp
;
205 LASSERT(tar
!= NULL
);
206 name
= fld_target_name(tar
);
207 LASSERT(name
!= NULL
);
208 LASSERT(tar
->ft_srv
!= NULL
|| tar
->ft_exp
!= NULL
);
210 if (fld
->lcf_flags
!= LUSTRE_FLD_INIT
) {
211 CERROR("%s: Attempt to add target %s (idx "LPU64
") on fly - skip it\n",
212 fld
->lcf_name
, name
, tar
->ft_idx
);
215 CDEBUG(D_INFO
, "%s: Adding target %s (idx "
216 LPU64
")\n", fld
->lcf_name
, name
, tar
->ft_idx
);
219 OBD_ALLOC_PTR(target
);
223 spin_lock(&fld
->lcf_lock
);
224 list_for_each_entry(tmp
, &fld
->lcf_targets
, ft_chain
) {
225 if (tmp
->ft_idx
== tar
->ft_idx
) {
226 spin_unlock(&fld
->lcf_lock
);
227 OBD_FREE_PTR(target
);
228 CERROR("Target %s exists in FLD and known as %s:#"LPU64
"\n",
229 name
, fld_target_name(tmp
), tmp
->ft_idx
);
234 target
->ft_exp
= tar
->ft_exp
;
235 if (target
->ft_exp
!= NULL
)
236 class_export_get(target
->ft_exp
);
237 target
->ft_srv
= tar
->ft_srv
;
238 target
->ft_idx
= tar
->ft_idx
;
240 list_add_tail(&target
->ft_chain
,
244 spin_unlock(&fld
->lcf_lock
);
248 EXPORT_SYMBOL(fld_client_add_target
);
250 /* Remove export from FLD */
251 int fld_client_del_target(struct lu_client_fld
*fld
, __u64 idx
)
253 struct lu_fld_target
*target
, *tmp
;
255 spin_lock(&fld
->lcf_lock
);
256 list_for_each_entry_safe(target
, tmp
,
257 &fld
->lcf_targets
, ft_chain
) {
258 if (target
->ft_idx
== idx
) {
260 list_del(&target
->ft_chain
);
261 spin_unlock(&fld
->lcf_lock
);
263 if (target
->ft_exp
!= NULL
)
264 class_export_put(target
->ft_exp
);
266 OBD_FREE_PTR(target
);
270 spin_unlock(&fld
->lcf_lock
);
273 EXPORT_SYMBOL(fld_client_del_target
);
/*
 * Proc directory entry of the FLD module: registered in fld_mod_init() and
 * removed in fld_mod_exit().  Static storage, so it starts out NULL.
 */
struct proc_dir_entry *fld_type_proc_dir;
278 static int fld_client_proc_init(struct lu_client_fld
*fld
)
282 fld
->lcf_proc_dir
= lprocfs_register(fld
->lcf_name
,
286 if (IS_ERR(fld
->lcf_proc_dir
)) {
287 CERROR("%s: LProcFS failed in fld-init\n",
289 rc
= PTR_ERR(fld
->lcf_proc_dir
);
293 rc
= lprocfs_add_vars(fld
->lcf_proc_dir
,
294 fld_client_proc_list
, fld
);
296 CERROR("%s: Can't init FLD proc, rc %d\n",
298 GOTO(out_cleanup
, rc
);
304 fld_client_proc_fini(fld
);
308 void fld_client_proc_fini(struct lu_client_fld
*fld
)
310 if (fld
->lcf_proc_dir
) {
311 if (!IS_ERR(fld
->lcf_proc_dir
))
312 lprocfs_remove(&fld
->lcf_proc_dir
);
313 fld
->lcf_proc_dir
= NULL
;
317 static int fld_client_proc_init(struct lu_client_fld
*fld
)
322 void fld_client_proc_fini(struct lu_client_fld
*fld
)
327 EXPORT_SYMBOL(fld_client_proc_fini
);
329 static inline int hash_is_sane(int hash
)
331 return (hash
>= 0 && hash
< ARRAY_SIZE(fld_hash
));
334 int fld_client_init(struct lu_client_fld
*fld
,
335 const char *prefix
, int hash
)
337 int cache_size
, cache_threshold
;
340 LASSERT(fld
!= NULL
);
342 snprintf(fld
->lcf_name
, sizeof(fld
->lcf_name
),
345 if (!hash_is_sane(hash
)) {
346 CERROR("%s: Wrong hash function %#x\n",
347 fld
->lcf_name
, hash
);
352 spin_lock_init(&fld
->lcf_lock
);
353 fld
->lcf_hash
= &fld_hash
[hash
];
354 fld
->lcf_flags
= LUSTRE_FLD_INIT
;
355 INIT_LIST_HEAD(&fld
->lcf_targets
);
357 cache_size
= FLD_CLIENT_CACHE_SIZE
/
358 sizeof(struct fld_cache_entry
);
360 cache_threshold
= cache_size
*
361 FLD_CLIENT_CACHE_THRESHOLD
/ 100;
363 fld
->lcf_cache
= fld_cache_init(fld
->lcf_name
,
364 cache_size
, cache_threshold
);
365 if (IS_ERR(fld
->lcf_cache
)) {
366 rc
= PTR_ERR(fld
->lcf_cache
);
367 fld
->lcf_cache
= NULL
;
371 rc
= fld_client_proc_init(fld
);
376 fld_client_fini(fld
);
378 CDEBUG(D_INFO
, "%s: Using \"%s\" hash\n",
379 fld
->lcf_name
, fld
->lcf_hash
->fh_name
);
382 EXPORT_SYMBOL(fld_client_init
);
384 void fld_client_fini(struct lu_client_fld
*fld
)
386 struct lu_fld_target
*target
, *tmp
;
388 spin_lock(&fld
->lcf_lock
);
389 list_for_each_entry_safe(target
, tmp
,
390 &fld
->lcf_targets
, ft_chain
) {
392 list_del(&target
->ft_chain
);
393 if (target
->ft_exp
!= NULL
)
394 class_export_put(target
->ft_exp
);
395 OBD_FREE_PTR(target
);
397 spin_unlock(&fld
->lcf_lock
);
399 if (fld
->lcf_cache
!= NULL
) {
400 if (!IS_ERR(fld
->lcf_cache
))
401 fld_cache_fini(fld
->lcf_cache
);
402 fld
->lcf_cache
= NULL
;
405 EXPORT_SYMBOL(fld_client_fini
);
407 int fld_client_rpc(struct obd_export
*exp
,
408 struct lu_seq_range
*range
, __u32 fld_op
)
410 struct ptlrpc_request
*req
;
411 struct lu_seq_range
*prange
;
414 struct obd_import
*imp
;
416 LASSERT(exp
!= NULL
);
418 imp
= class_exp2cliimp(exp
);
419 req
= ptlrpc_request_alloc_pack(imp
, &RQF_FLD_QUERY
, LUSTRE_MDS_VERSION
,
424 op
= req_capsule_client_get(&req
->rq_pill
, &RMF_FLD_OPC
);
427 prange
= req_capsule_client_get(&req
->rq_pill
, &RMF_FLD_MDFLD
);
430 ptlrpc_request_set_replen(req
);
431 req
->rq_request_portal
= FLD_REQUEST_PORTAL
;
432 ptlrpc_at_set_req_timeout(req
);
434 if (fld_op
== FLD_LOOKUP
&&
435 imp
->imp_connect_flags_orig
& OBD_CONNECT_MDS_MDS
)
436 req
->rq_allow_replay
= 1;
438 if (fld_op
!= FLD_LOOKUP
)
439 mdc_get_rpc_lock(exp
->exp_obd
->u
.cli
.cl_rpc_lock
, NULL
);
440 fld_enter_request(&exp
->exp_obd
->u
.cli
);
441 rc
= ptlrpc_queue_wait(req
);
442 fld_exit_request(&exp
->exp_obd
->u
.cli
);
443 if (fld_op
!= FLD_LOOKUP
)
444 mdc_put_rpc_lock(exp
->exp_obd
->u
.cli
.cl_rpc_lock
, NULL
);
448 prange
= req_capsule_server_get(&req
->rq_pill
, &RMF_FLD_MDFLD
);
450 GOTO(out_req
, rc
= -EFAULT
);
453 ptlrpc_req_finished(req
);
457 int fld_client_lookup(struct lu_client_fld
*fld
, seqno_t seq
, mdsno_t
*mds
,
458 __u32 flags
, const struct lu_env
*env
)
460 struct lu_seq_range res
= { 0 };
461 struct lu_fld_target
*target
;
464 fld
->lcf_flags
|= LUSTRE_FLD_RUN
;
466 rc
= fld_cache_lookup(fld
->lcf_cache
, seq
, &res
);
468 *mds
= res
.lsr_index
;
472 /* Can not find it in the cache */
473 target
= fld_client_get_target(fld
, seq
);
474 LASSERT(target
!= NULL
);
476 CDEBUG(D_INFO
, "%s: Lookup fld entry (seq: "LPX64
") on target %s (idx "LPU64
")\n",
477 fld
->lcf_name
, seq
, fld_target_name(target
), target
->ft_idx
);
480 fld_range_set_type(&res
, flags
);
481 rc
= fld_client_rpc(target
->ft_exp
, &res
, FLD_LOOKUP
);
484 *mds
= res
.lsr_index
;
486 fld_cache_insert(fld
->lcf_cache
, &res
);
490 EXPORT_SYMBOL(fld_client_lookup
);
492 void fld_client_flush(struct lu_client_fld
*fld
)
494 fld_cache_flush(fld
->lcf_cache
);
496 EXPORT_SYMBOL(fld_client_flush
);
498 static int __init
fld_mod_init(void)
500 fld_type_proc_dir
= lprocfs_register(LUSTRE_FLD_NAME
,
503 return PTR_ERR_OR_ZERO(fld_type_proc_dir
);
506 static void __exit
fld_mod_exit(void)
508 if (fld_type_proc_dir
!= NULL
&& !IS_ERR(fld_type_proc_dir
)) {
509 lprocfs_remove(&fld_type_proc_dir
);
510 fld_type_proc_dir
= NULL
;
514 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
515 MODULE_DESCRIPTION("Lustre FLD");
516 MODULE_LICENSE("GPL");
518 module_init(fld_mod_init
)
519 module_exit(fld_mod_exit
)