4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <obd_class.h>
45 #include <lprocfs_status.h>
47 extern struct list_head obd_types
;
48 spinlock_t obd_types_lock
;
50 struct kmem_cache
*obd_device_cachep
;
51 struct kmem_cache
*obdo_cachep
;
52 EXPORT_SYMBOL(obdo_cachep
);
53 struct kmem_cache
*import_cachep
;
55 struct list_head obd_zombie_imports
;
56 struct list_head obd_zombie_exports
;
57 spinlock_t obd_zombie_impexp_lock
;
58 static void obd_zombie_impexp_notify(void);
59 static void obd_zombie_export_add(struct obd_export
*exp
);
60 static void obd_zombie_import_add(struct obd_import
*imp
);
61 static void print_export_data(struct obd_export
*exp
,
62 const char *status
, int locks
);
64 int (*ptlrpc_put_connection_superhack
)(struct ptlrpc_connection
*c
);
65 EXPORT_SYMBOL(ptlrpc_put_connection_superhack
);
68 * support functions: we could use inter-module communication, but this
69 * is more portable to other OS's
71 static struct obd_device
*obd_device_alloc(void)
73 struct obd_device
*obd
;
75 OBD_SLAB_ALLOC_PTR_GFP(obd
, obd_device_cachep
, __GFP_IO
);
77 obd
->obd_magic
= OBD_DEVICE_MAGIC
;
82 static void obd_device_free(struct obd_device
*obd
)
85 LASSERTF(obd
->obd_magic
== OBD_DEVICE_MAGIC
, "obd %p obd_magic %08x != %08x\n",
86 obd
, obd
->obd_magic
, OBD_DEVICE_MAGIC
);
87 if (obd
->obd_namespace
!= NULL
) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd
, obd
->obd_namespace
, obd
->obd_force
);
92 lu_ref_fini(&obd
->obd_reference
);
93 OBD_SLAB_FREE_PTR(obd
, obd_device_cachep
);
96 struct obd_type
*class_search_type(const char *name
)
98 struct list_head
*tmp
;
99 struct obd_type
*type
;
101 spin_lock(&obd_types_lock
);
102 list_for_each(tmp
, &obd_types
) {
103 type
= list_entry(tmp
, struct obd_type
, typ_chain
);
104 if (strcmp(type
->typ_name
, name
) == 0) {
105 spin_unlock(&obd_types_lock
);
109 spin_unlock(&obd_types_lock
);
112 EXPORT_SYMBOL(class_search_type
);
114 struct obd_type
*class_get_type(const char *name
)
116 struct obd_type
*type
= class_search_type(name
);
119 const char *modname
= name
;
121 if (strcmp(modname
, "obdfilter") == 0)
124 if (strcmp(modname
, LUSTRE_LWP_NAME
) == 0)
125 modname
= LUSTRE_OSP_NAME
;
127 if (!strncmp(modname
, LUSTRE_MDS_NAME
, strlen(LUSTRE_MDS_NAME
)))
128 modname
= LUSTRE_MDT_NAME
;
130 if (!request_module("%s", modname
)) {
131 CDEBUG(D_INFO
, "Loaded module '%s'\n", modname
);
132 type
= class_search_type(name
);
134 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 spin_lock(&type
->obd_type_lock
);
141 try_module_get(type
->typ_dt_ops
->o_owner
);
142 spin_unlock(&type
->obd_type_lock
);
146 EXPORT_SYMBOL(class_get_type
);
148 void class_put_type(struct obd_type
*type
)
151 spin_lock(&type
->obd_type_lock
);
153 module_put(type
->typ_dt_ops
->o_owner
);
154 spin_unlock(&type
->obd_type_lock
);
156 EXPORT_SYMBOL(class_put_type
);
158 #define CLASS_MAX_NAME 1024
160 int class_register_type(struct obd_ops
*dt_ops
, struct md_ops
*md_ops
,
161 struct lprocfs_vars
*vars
, const char *name
,
162 struct lu_device_type
*ldt
)
164 struct obd_type
*type
;
168 LASSERT(strnlen(name
, CLASS_MAX_NAME
) < CLASS_MAX_NAME
);
170 if (class_search_type(name
)) {
171 CDEBUG(D_IOCTL
, "Type %s already registered\n", name
);
176 OBD_ALLOC(type
, sizeof(*type
));
180 OBD_ALLOC_PTR(type
->typ_dt_ops
);
181 OBD_ALLOC_PTR(type
->typ_md_ops
);
182 OBD_ALLOC(type
->typ_name
, strlen(name
) + 1);
184 if (type
->typ_dt_ops
== NULL
||
185 type
->typ_md_ops
== NULL
||
186 type
->typ_name
== NULL
)
189 *(type
->typ_dt_ops
) = *dt_ops
;
190 /* md_ops is optional */
192 *(type
->typ_md_ops
) = *md_ops
;
193 strcpy(type
->typ_name
, name
);
194 spin_lock_init(&type
->obd_type_lock
);
197 type
->typ_procroot
= lprocfs_register(type
->typ_name
, proc_lustre_root
,
199 if (IS_ERR(type
->typ_procroot
)) {
200 rc
= PTR_ERR(type
->typ_procroot
);
201 type
->typ_procroot
= NULL
;
207 rc
= lu_device_type_init(ldt
);
212 spin_lock(&obd_types_lock
);
213 list_add(&type
->typ_chain
, &obd_types
);
214 spin_unlock(&obd_types_lock
);
219 if (type
->typ_name
!= NULL
)
220 OBD_FREE(type
->typ_name
, strlen(name
) + 1);
221 if (type
->typ_md_ops
!= NULL
)
222 OBD_FREE_PTR(type
->typ_md_ops
);
223 if (type
->typ_dt_ops
!= NULL
)
224 OBD_FREE_PTR(type
->typ_dt_ops
);
225 OBD_FREE(type
, sizeof(*type
));
228 EXPORT_SYMBOL(class_register_type
);
230 int class_unregister_type(const char *name
)
232 struct obd_type
*type
= class_search_type(name
);
235 CERROR("unknown obd type\n");
239 if (type
->typ_refcnt
) {
240 CERROR("type %s has refcount (%d)\n", name
, type
->typ_refcnt
);
241 /* This is a bad situation, let's make the best of it */
242 /* Remove ops, but leave the name for debugging */
243 OBD_FREE_PTR(type
->typ_dt_ops
);
244 OBD_FREE_PTR(type
->typ_md_ops
);
248 if (type
->typ_procroot
) {
249 lprocfs_remove(&type
->typ_procroot
);
253 lu_device_type_fini(type
->typ_lu
);
255 spin_lock(&obd_types_lock
);
256 list_del(&type
->typ_chain
);
257 spin_unlock(&obd_types_lock
);
258 OBD_FREE(type
->typ_name
, strlen(name
) + 1);
259 if (type
->typ_dt_ops
!= NULL
)
260 OBD_FREE_PTR(type
->typ_dt_ops
);
261 if (type
->typ_md_ops
!= NULL
)
262 OBD_FREE_PTR(type
->typ_md_ops
);
263 OBD_FREE(type
, sizeof(*type
));
265 } /* class_unregister_type */
266 EXPORT_SYMBOL(class_unregister_type
);
269 * Create a new obd device.
271 * Find an empty slot in ::obd_devs[], create a new obd device in it.
273 * \param[in] type_name obd device type string.
274 * \param[in] name obd device name.
276 * \retval NULL if create fails, otherwise return the obd device
279 struct obd_device
*class_newdev(const char *type_name
, const char *name
)
281 struct obd_device
*result
= NULL
;
282 struct obd_device
*newdev
;
283 struct obd_type
*type
= NULL
;
285 int new_obd_minor
= 0;
287 if (strlen(name
) >= MAX_OBD_NAME
) {
288 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME
);
289 return ERR_PTR(-EINVAL
);
292 type
= class_get_type(type_name
);
294 CERROR("OBD: unknown type: %s\n", type_name
);
295 return ERR_PTR(-ENODEV
);
298 newdev
= obd_device_alloc();
300 GOTO(out_type
, result
= ERR_PTR(-ENOMEM
));
302 LASSERT(newdev
->obd_magic
== OBD_DEVICE_MAGIC
);
304 write_lock(&obd_dev_lock
);
305 for (i
= 0; i
< class_devno_max(); i
++) {
306 struct obd_device
*obd
= class_num2obd(i
);
308 if (obd
&& (strcmp(name
, obd
->obd_name
) == 0)) {
309 CERROR("Device %s already exists at %d, won't add\n",
312 LASSERTF(result
->obd_magic
== OBD_DEVICE_MAGIC
,
313 "%p obd_magic %08x != %08x\n", result
,
314 result
->obd_magic
, OBD_DEVICE_MAGIC
);
315 LASSERTF(result
->obd_minor
== new_obd_minor
,
316 "%p obd_minor %d != %d\n", result
,
317 result
->obd_minor
, new_obd_minor
);
319 obd_devs
[result
->obd_minor
] = NULL
;
320 result
->obd_name
[0]='\0';
322 result
= ERR_PTR(-EEXIST
);
325 if (!result
&& !obd
) {
327 result
->obd_minor
= i
;
329 result
->obd_type
= type
;
330 strncpy(result
->obd_name
, name
,
331 sizeof(result
->obd_name
) - 1);
332 obd_devs
[i
] = result
;
335 write_unlock(&obd_dev_lock
);
337 if (result
== NULL
&& i
>= class_devno_max()) {
338 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
340 GOTO(out
, result
= ERR_PTR(-EOVERFLOW
));
346 CDEBUG(D_IOCTL
, "Adding new device %s (%p)\n",
347 result
->obd_name
, result
);
351 obd_device_free(newdev
);
353 class_put_type(type
);
357 void class_release_dev(struct obd_device
*obd
)
359 struct obd_type
*obd_type
= obd
->obd_type
;
361 LASSERTF(obd
->obd_magic
== OBD_DEVICE_MAGIC
, "%p obd_magic %08x != %08x\n",
362 obd
, obd
->obd_magic
, OBD_DEVICE_MAGIC
);
363 LASSERTF(obd
== obd_devs
[obd
->obd_minor
], "obd %p != obd_devs[%d] %p\n",
364 obd
, obd
->obd_minor
, obd_devs
[obd
->obd_minor
]);
365 LASSERT(obd_type
!= NULL
);
367 CDEBUG(D_INFO
, "Release obd device %s at %d obd_type name =%s\n",
368 obd
->obd_name
, obd
->obd_minor
, obd
->obd_type
->typ_name
);
370 write_lock(&obd_dev_lock
);
371 obd_devs
[obd
->obd_minor
] = NULL
;
372 write_unlock(&obd_dev_lock
);
373 obd_device_free(obd
);
375 class_put_type(obd_type
);
378 int class_name2dev(const char *name
)
385 read_lock(&obd_dev_lock
);
386 for (i
= 0; i
< class_devno_max(); i
++) {
387 struct obd_device
*obd
= class_num2obd(i
);
389 if (obd
&& strcmp(name
, obd
->obd_name
) == 0) {
390 /* Make sure we finished attaching before we give
391 out any references */
392 LASSERT(obd
->obd_magic
== OBD_DEVICE_MAGIC
);
393 if (obd
->obd_attached
) {
394 read_unlock(&obd_dev_lock
);
400 read_unlock(&obd_dev_lock
);
404 EXPORT_SYMBOL(class_name2dev
);
406 struct obd_device
*class_name2obd(const char *name
)
408 int dev
= class_name2dev(name
);
410 if (dev
< 0 || dev
> class_devno_max())
412 return class_num2obd(dev
);
414 EXPORT_SYMBOL(class_name2obd
);
416 int class_uuid2dev(struct obd_uuid
*uuid
)
420 read_lock(&obd_dev_lock
);
421 for (i
= 0; i
< class_devno_max(); i
++) {
422 struct obd_device
*obd
= class_num2obd(i
);
424 if (obd
&& obd_uuid_equals(uuid
, &obd
->obd_uuid
)) {
425 LASSERT(obd
->obd_magic
== OBD_DEVICE_MAGIC
);
426 read_unlock(&obd_dev_lock
);
430 read_unlock(&obd_dev_lock
);
434 EXPORT_SYMBOL(class_uuid2dev
);
436 struct obd_device
*class_uuid2obd(struct obd_uuid
*uuid
)
438 int dev
= class_uuid2dev(uuid
);
441 return class_num2obd(dev
);
443 EXPORT_SYMBOL(class_uuid2obd
);
446 * Get obd device from ::obd_devs[]
448 * \param num [in] array index
450 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
451 * otherwise return the obd device there.
453 struct obd_device
*class_num2obd(int num
)
455 struct obd_device
*obd
= NULL
;
457 if (num
< class_devno_max()) {
462 LASSERTF(obd
->obd_magic
== OBD_DEVICE_MAGIC
,
463 "%p obd_magic %08x != %08x\n",
464 obd
, obd
->obd_magic
, OBD_DEVICE_MAGIC
);
465 LASSERTF(obd
->obd_minor
== num
,
466 "%p obd_minor %0d != %0d\n",
467 obd
, obd
->obd_minor
, num
);
472 EXPORT_SYMBOL(class_num2obd
);
475 * Get obd devices count. Device in any
477 * \retval obd device count
479 int get_devices_count(void)
481 int index
, max_index
= class_devno_max(), dev_count
= 0;
483 read_lock(&obd_dev_lock
);
484 for (index
= 0; index
<= max_index
; index
++) {
485 struct obd_device
*obd
= class_num2obd(index
);
489 read_unlock(&obd_dev_lock
);
493 EXPORT_SYMBOL(get_devices_count
);
495 void class_obd_list(void)
500 read_lock(&obd_dev_lock
);
501 for (i
= 0; i
< class_devno_max(); i
++) {
502 struct obd_device
*obd
= class_num2obd(i
);
506 if (obd
->obd_stopping
)
508 else if (obd
->obd_set_up
)
510 else if (obd
->obd_attached
)
514 LCONSOLE(D_CONFIG
, "%3d %s %s %s %s %d\n",
515 i
, status
, obd
->obd_type
->typ_name
,
516 obd
->obd_name
, obd
->obd_uuid
.uuid
,
517 atomic_read(&obd
->obd_refcount
));
519 read_unlock(&obd_dev_lock
);
523 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
524 specified, then only the client with that uuid is returned,
525 otherwise any client connected to the tgt is returned. */
526 struct obd_device
* class_find_client_obd(struct obd_uuid
*tgt_uuid
,
527 const char * typ_name
,
528 struct obd_uuid
*grp_uuid
)
532 read_lock(&obd_dev_lock
);
533 for (i
= 0; i
< class_devno_max(); i
++) {
534 struct obd_device
*obd
= class_num2obd(i
);
538 if ((strncmp(obd
->obd_type
->typ_name
, typ_name
,
539 strlen(typ_name
)) == 0)) {
540 if (obd_uuid_equals(tgt_uuid
,
541 &obd
->u
.cli
.cl_target_uuid
) &&
542 ((grp_uuid
)? obd_uuid_equals(grp_uuid
,
543 &obd
->obd_uuid
) : 1)) {
544 read_unlock(&obd_dev_lock
);
549 read_unlock(&obd_dev_lock
);
553 EXPORT_SYMBOL(class_find_client_obd
);
555 /* Iterate the obd_device list looking devices have grp_uuid. Start
556 searching at *next, and if a device is found, the next index to look
557 at is saved in *next. If next is NULL, then the first matching device
558 will always be returned. */
559 struct obd_device
* class_devices_in_group(struct obd_uuid
*grp_uuid
, int *next
)
565 else if (*next
>= 0 && *next
< class_devno_max())
570 read_lock(&obd_dev_lock
);
571 for (; i
< class_devno_max(); i
++) {
572 struct obd_device
*obd
= class_num2obd(i
);
576 if (obd_uuid_equals(grp_uuid
, &obd
->obd_uuid
)) {
579 read_unlock(&obd_dev_lock
);
583 read_unlock(&obd_dev_lock
);
587 EXPORT_SYMBOL(class_devices_in_group
);
590 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
591 * adjust sptlrpc settings accordingly.
593 int class_notify_sptlrpc_conf(const char *fsname
, int namelen
)
595 struct obd_device
*obd
;
599 LASSERT(namelen
> 0);
601 read_lock(&obd_dev_lock
);
602 for (i
= 0; i
< class_devno_max(); i
++) {
603 obd
= class_num2obd(i
);
605 if (obd
== NULL
|| obd
->obd_set_up
== 0 || obd
->obd_stopping
)
608 /* only notify mdc, osc, mdt, ost */
609 type
= obd
->obd_type
->typ_name
;
610 if (strcmp(type
, LUSTRE_MDC_NAME
) != 0 &&
611 strcmp(type
, LUSTRE_OSC_NAME
) != 0 &&
612 strcmp(type
, LUSTRE_MDT_NAME
) != 0 &&
613 strcmp(type
, LUSTRE_OST_NAME
) != 0)
616 if (strncmp(obd
->obd_name
, fsname
, namelen
))
619 class_incref(obd
, __FUNCTION__
, obd
);
620 read_unlock(&obd_dev_lock
);
621 rc2
= obd_set_info_async(NULL
, obd
->obd_self_export
,
622 sizeof(KEY_SPTLRPC_CONF
),
623 KEY_SPTLRPC_CONF
, 0, NULL
, NULL
);
625 class_decref(obd
, __FUNCTION__
, obd
);
626 read_lock(&obd_dev_lock
);
628 read_unlock(&obd_dev_lock
);
631 EXPORT_SYMBOL(class_notify_sptlrpc_conf
);
633 void obd_cleanup_caches(void)
635 if (obd_device_cachep
) {
636 kmem_cache_destroy(obd_device_cachep
);
637 obd_device_cachep
= NULL
;
640 kmem_cache_destroy(obdo_cachep
);
644 kmem_cache_destroy(import_cachep
);
645 import_cachep
= NULL
;
648 kmem_cache_destroy(capa_cachep
);
653 int obd_init_caches(void)
655 LASSERT(obd_device_cachep
== NULL
);
656 obd_device_cachep
= kmem_cache_create("ll_obd_dev_cache",
657 sizeof(struct obd_device
),
659 if (!obd_device_cachep
)
662 LASSERT(obdo_cachep
== NULL
);
663 obdo_cachep
= kmem_cache_create("ll_obdo_cache", sizeof(struct obdo
),
668 LASSERT(import_cachep
== NULL
);
669 import_cachep
= kmem_cache_create("ll_import_cache",
670 sizeof(struct obd_import
),
675 LASSERT(capa_cachep
== NULL
);
676 capa_cachep
= kmem_cache_create("capa_cache",
677 sizeof(struct obd_capa
), 0, 0, NULL
);
683 obd_cleanup_caches();
688 /* map connection to client */
689 struct obd_export
*class_conn2export(struct lustre_handle
*conn
)
691 struct obd_export
*export
;
694 CDEBUG(D_CACHE
, "looking for null handle\n");
698 if (conn
->cookie
== -1) { /* this means assign a new connection */
699 CDEBUG(D_CACHE
, "want a new connection\n");
703 CDEBUG(D_INFO
, "looking for export cookie "LPX64
"\n", conn
->cookie
);
704 export
= class_handle2object(conn
->cookie
);
707 EXPORT_SYMBOL(class_conn2export
);
709 struct obd_device
*class_exp2obd(struct obd_export
*exp
)
715 EXPORT_SYMBOL(class_exp2obd
);
717 struct obd_device
*class_conn2obd(struct lustre_handle
*conn
)
719 struct obd_export
*export
;
720 export
= class_conn2export(conn
);
722 struct obd_device
*obd
= export
->exp_obd
;
723 class_export_put(export
);
728 EXPORT_SYMBOL(class_conn2obd
);
730 struct obd_import
*class_exp2cliimp(struct obd_export
*exp
)
732 struct obd_device
*obd
= exp
->exp_obd
;
735 return obd
->u
.cli
.cl_import
;
737 EXPORT_SYMBOL(class_exp2cliimp
);
739 struct obd_import
*class_conn2cliimp(struct lustre_handle
*conn
)
741 struct obd_device
*obd
= class_conn2obd(conn
);
744 return obd
->u
.cli
.cl_import
;
746 EXPORT_SYMBOL(class_conn2cliimp
);
748 /* Export management functions */
749 static void class_export_destroy(struct obd_export
*exp
)
751 struct obd_device
*obd
= exp
->exp_obd
;
753 LASSERT_ATOMIC_ZERO(&exp
->exp_refcount
);
754 LASSERT(obd
!= NULL
);
756 CDEBUG(D_IOCTL
, "destroying export %p/%s for %s\n", exp
,
757 exp
->exp_client_uuid
.uuid
, obd
->obd_name
);
759 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
760 if (exp
->exp_connection
)
761 ptlrpc_put_connection_superhack(exp
->exp_connection
);
763 LASSERT(list_empty(&exp
->exp_outstanding_replies
));
764 LASSERT(list_empty(&exp
->exp_uncommitted_replies
));
765 LASSERT(list_empty(&exp
->exp_req_replay_queue
));
766 LASSERT(list_empty(&exp
->exp_hp_rpcs
));
767 obd_destroy_export(exp
);
768 class_decref(obd
, "export", exp
);
770 OBD_FREE_RCU(exp
, sizeof(*exp
), &exp
->exp_handle
);
/* hop_addref callback for export handles: takes an export reference. */
static void export_handle_addref(void *export)
{
	class_export_get(export);
}
778 static struct portals_handle_ops export_handle_ops
= {
779 .hop_addref
= export_handle_addref
,
783 struct obd_export
*class_export_get(struct obd_export
*exp
)
785 atomic_inc(&exp
->exp_refcount
);
786 CDEBUG(D_INFO
, "GETting export %p : new refcount %d\n", exp
,
787 atomic_read(&exp
->exp_refcount
));
790 EXPORT_SYMBOL(class_export_get
);
792 void class_export_put(struct obd_export
*exp
)
794 LASSERT(exp
!= NULL
);
795 LASSERT_ATOMIC_GT_LT(&exp
->exp_refcount
, 0, LI_POISON
);
796 CDEBUG(D_INFO
, "PUTting export %p : new refcount %d\n", exp
,
797 atomic_read(&exp
->exp_refcount
) - 1);
799 if (atomic_dec_and_test(&exp
->exp_refcount
)) {
800 LASSERT(!list_empty(&exp
->exp_obd_chain
));
801 CDEBUG(D_IOCTL
, "final put %p/%s\n",
802 exp
, exp
->exp_client_uuid
.uuid
);
804 /* release nid stat refererence */
805 lprocfs_exp_cleanup(exp
);
807 obd_zombie_export_add(exp
);
810 EXPORT_SYMBOL(class_export_put
);
812 /* Creates a new export, adds it to the hash table, and returns a
813 * pointer to it. The refcount is 2: one for the hash reference, and
814 * one for the pointer returned by this function. */
815 struct obd_export
*class_new_export(struct obd_device
*obd
,
816 struct obd_uuid
*cluuid
)
818 struct obd_export
*export
;
819 cfs_hash_t
*hash
= NULL
;
822 OBD_ALLOC_PTR(export
);
824 return ERR_PTR(-ENOMEM
);
826 export
->exp_conn_cnt
= 0;
827 export
->exp_lock_hash
= NULL
;
828 export
->exp_flock_hash
= NULL
;
829 atomic_set(&export
->exp_refcount
, 2);
830 atomic_set(&export
->exp_rpc_count
, 0);
831 atomic_set(&export
->exp_cb_count
, 0);
832 atomic_set(&export
->exp_locks_count
, 0);
833 #if LUSTRE_TRACKS_LOCK_EXP_REFS
834 INIT_LIST_HEAD(&export
->exp_locks_list
);
835 spin_lock_init(&export
->exp_locks_list_guard
);
837 atomic_set(&export
->exp_replay_count
, 0);
838 export
->exp_obd
= obd
;
839 INIT_LIST_HEAD(&export
->exp_outstanding_replies
);
840 spin_lock_init(&export
->exp_uncommitted_replies_lock
);
841 INIT_LIST_HEAD(&export
->exp_uncommitted_replies
);
842 INIT_LIST_HEAD(&export
->exp_req_replay_queue
);
843 INIT_LIST_HEAD(&export
->exp_handle
.h_link
);
844 INIT_LIST_HEAD(&export
->exp_hp_rpcs
);
845 class_handle_hash(&export
->exp_handle
, &export_handle_ops
);
846 export
->exp_last_request_time
= cfs_time_current_sec();
847 spin_lock_init(&export
->exp_lock
);
848 spin_lock_init(&export
->exp_rpc_lock
);
849 INIT_HLIST_NODE(&export
->exp_uuid_hash
);
850 INIT_HLIST_NODE(&export
->exp_nid_hash
);
851 spin_lock_init(&export
->exp_bl_list_lock
);
852 INIT_LIST_HEAD(&export
->exp_bl_list
);
854 export
->exp_sp_peer
= LUSTRE_SP_ANY
;
855 export
->exp_flvr
.sf_rpc
= SPTLRPC_FLVR_INVALID
;
856 export
->exp_client_uuid
= *cluuid
;
857 obd_init_export(export
);
859 spin_lock(&obd
->obd_dev_lock
);
860 /* shouldn't happen, but might race */
861 if (obd
->obd_stopping
)
862 GOTO(exit_unlock
, rc
= -ENODEV
);
864 hash
= cfs_hash_getref(obd
->obd_uuid_hash
);
866 GOTO(exit_unlock
, rc
= -ENODEV
);
867 spin_unlock(&obd
->obd_dev_lock
);
869 if (!obd_uuid_equals(cluuid
, &obd
->obd_uuid
)) {
870 rc
= cfs_hash_add_unique(hash
, cluuid
, &export
->exp_uuid_hash
);
872 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
873 obd
->obd_name
, cluuid
->uuid
, rc
);
874 GOTO(exit_err
, rc
= -EALREADY
);
878 spin_lock(&obd
->obd_dev_lock
);
879 if (obd
->obd_stopping
) {
880 cfs_hash_del(hash
, cluuid
, &export
->exp_uuid_hash
);
881 GOTO(exit_unlock
, rc
= -ENODEV
);
884 class_incref(obd
, "export", export
);
885 list_add(&export
->exp_obd_chain
, &export
->exp_obd
->obd_exports
);
886 list_add_tail(&export
->exp_obd_chain_timed
,
887 &export
->exp_obd
->obd_exports_timed
);
888 export
->exp_obd
->obd_num_exports
++;
889 spin_unlock(&obd
->obd_dev_lock
);
890 cfs_hash_putref(hash
);
894 spin_unlock(&obd
->obd_dev_lock
);
897 cfs_hash_putref(hash
);
898 class_handle_unhash(&export
->exp_handle
);
899 LASSERT(hlist_unhashed(&export
->exp_uuid_hash
));
900 obd_destroy_export(export
);
901 OBD_FREE_PTR(export
);
904 EXPORT_SYMBOL(class_new_export
);
906 void class_unlink_export(struct obd_export
*exp
)
908 class_handle_unhash(&exp
->exp_handle
);
910 spin_lock(&exp
->exp_obd
->obd_dev_lock
);
911 /* delete an uuid-export hashitem from hashtables */
912 if (!hlist_unhashed(&exp
->exp_uuid_hash
))
913 cfs_hash_del(exp
->exp_obd
->obd_uuid_hash
,
914 &exp
->exp_client_uuid
,
915 &exp
->exp_uuid_hash
);
917 list_move(&exp
->exp_obd_chain
, &exp
->exp_obd
->obd_unlinked_exports
);
918 list_del_init(&exp
->exp_obd_chain_timed
);
919 exp
->exp_obd
->obd_num_exports
--;
920 spin_unlock(&exp
->exp_obd
->obd_dev_lock
);
921 class_export_put(exp
);
923 EXPORT_SYMBOL(class_unlink_export
);
925 /* Import management functions */
926 void class_import_destroy(struct obd_import
*imp
)
928 CDEBUG(D_IOCTL
, "destroying import %p for %s\n", imp
,
929 imp
->imp_obd
->obd_name
);
931 LASSERT_ATOMIC_ZERO(&imp
->imp_refcount
);
933 ptlrpc_put_connection_superhack(imp
->imp_connection
);
935 while (!list_empty(&imp
->imp_conn_list
)) {
936 struct obd_import_conn
*imp_conn
;
938 imp_conn
= list_entry(imp
->imp_conn_list
.next
,
939 struct obd_import_conn
, oic_item
);
940 list_del_init(&imp_conn
->oic_item
);
941 ptlrpc_put_connection_superhack(imp_conn
->oic_conn
);
942 OBD_FREE(imp_conn
, sizeof(*imp_conn
));
945 LASSERT(imp
->imp_sec
== NULL
);
946 class_decref(imp
->imp_obd
, "import", imp
);
947 OBD_FREE_RCU(imp
, sizeof(*imp
), &imp
->imp_handle
);
/* hop_addref callback for import handles: takes an import reference. */
static void import_handle_addref(void *import)
{
	class_import_get(import);
}
955 static struct portals_handle_ops import_handle_ops
= {
956 .hop_addref
= import_handle_addref
,
960 struct obd_import
*class_import_get(struct obd_import
*import
)
962 atomic_inc(&import
->imp_refcount
);
963 CDEBUG(D_INFO
, "import %p refcount=%d obd=%s\n", import
,
964 atomic_read(&import
->imp_refcount
),
965 import
->imp_obd
->obd_name
);
968 EXPORT_SYMBOL(class_import_get
);
970 void class_import_put(struct obd_import
*imp
)
972 LASSERT(list_empty(&imp
->imp_zombie_chain
));
973 LASSERT_ATOMIC_GT_LT(&imp
->imp_refcount
, 0, LI_POISON
);
975 CDEBUG(D_INFO
, "import %p refcount=%d obd=%s\n", imp
,
976 atomic_read(&imp
->imp_refcount
) - 1,
977 imp
->imp_obd
->obd_name
);
979 if (atomic_dec_and_test(&imp
->imp_refcount
)) {
980 CDEBUG(D_INFO
, "final put import %p\n", imp
);
981 obd_zombie_import_add(imp
);
984 /* catch possible import put race */
985 LASSERT_ATOMIC_GE_LT(&imp
->imp_refcount
, 0, LI_POISON
);
987 EXPORT_SYMBOL(class_import_put
);
989 static void init_imp_at(struct imp_at
*at
) {
991 at_init(&at
->iat_net_latency
, 0, 0);
992 for (i
= 0; i
< IMP_AT_MAX_PORTALS
; i
++) {
993 /* max service estimates are tracked on the server side, so
994 don't use the AT history here, just use the last reported
995 val. (But keep hist for proc histogram, worst_ever) */
996 at_init(&at
->iat_service_estimate
[i
], INITIAL_CONNECT_TIMEOUT
,
1001 struct obd_import
*class_new_import(struct obd_device
*obd
)
1003 struct obd_import
*imp
;
1005 OBD_ALLOC(imp
, sizeof(*imp
));
1009 INIT_LIST_HEAD(&imp
->imp_pinger_chain
);
1010 INIT_LIST_HEAD(&imp
->imp_zombie_chain
);
1011 INIT_LIST_HEAD(&imp
->imp_replay_list
);
1012 INIT_LIST_HEAD(&imp
->imp_sending_list
);
1013 INIT_LIST_HEAD(&imp
->imp_delayed_list
);
1014 spin_lock_init(&imp
->imp_lock
);
1015 imp
->imp_last_success_conn
= 0;
1016 imp
->imp_state
= LUSTRE_IMP_NEW
;
1017 imp
->imp_obd
= class_incref(obd
, "import", imp
);
1018 mutex_init(&imp
->imp_sec_mutex
);
1019 init_waitqueue_head(&imp
->imp_recovery_waitq
);
1021 atomic_set(&imp
->imp_refcount
, 2);
1022 atomic_set(&imp
->imp_unregistering
, 0);
1023 atomic_set(&imp
->imp_inflight
, 0);
1024 atomic_set(&imp
->imp_replay_inflight
, 0);
1025 atomic_set(&imp
->imp_inval_count
, 0);
1026 INIT_LIST_HEAD(&imp
->imp_conn_list
);
1027 INIT_LIST_HEAD(&imp
->imp_handle
.h_link
);
1028 class_handle_hash(&imp
->imp_handle
, &import_handle_ops
);
1029 init_imp_at(&imp
->imp_at
);
1031 /* the default magic is V2, will be used in connect RPC, and
1032 * then adjusted according to the flags in request/reply. */
1033 imp
->imp_msg_magic
= LUSTRE_MSG_MAGIC_V2
;
1037 EXPORT_SYMBOL(class_new_import
);
1039 void class_destroy_import(struct obd_import
*import
)
1041 LASSERT(import
!= NULL
);
1042 LASSERT(import
!= LP_POISON
);
1044 class_handle_unhash(&import
->imp_handle
);
1046 spin_lock(&import
->imp_lock
);
1047 import
->imp_generation
++;
1048 spin_unlock(&import
->imp_lock
);
1049 class_import_put(import
);
1051 EXPORT_SYMBOL(class_destroy_import
);
#if LUSTRE_TRACKS_LOCK_EXP_REFS

/*
 * Record that \a lock holds one more reference on \a exp.  The first
 * reference links the lock into exp_locks_list.
 */
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr >= 0);

	if (lock->l_exp_refs_target != NULL &&
	    lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
			      exp, lock, lock->l_exp_refs_target);
	}
	if ((lock->l_exp_refs_nr++) == 0) {
		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
		lock->l_exp_refs_target = exp;
	}
	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);
	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);

/*
 * Record that \a lock dropped one reference on \a exp.  The last
 * reference unlinks the lock from exp_locks_list.
 */
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);
	LASSERT(lock->l_exp_refs_nr > 0);
	if (lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("lock %p, "
			      "mismatching export pointers: %p, %p\n",
			      lock, lock->l_exp_refs_target, exp);
	}
	if (--lock->l_exp_refs_nr == 0) {
		list_del_init(&lock->l_exp_refs_link);
		lock->l_exp_refs_target = NULL;
	}
	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);
	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif
1096 /* A connection defines an export context in which preallocation can
1097 be managed. This releases the export pointer reference, and returns
1098 the export handle, so the export refcount is 1 when this function
1100 int class_connect(struct lustre_handle
*conn
, struct obd_device
*obd
,
1101 struct obd_uuid
*cluuid
)
1103 struct obd_export
*export
;
1104 LASSERT(conn
!= NULL
);
1105 LASSERT(obd
!= NULL
);
1106 LASSERT(cluuid
!= NULL
);
1108 export
= class_new_export(obd
, cluuid
);
1110 return PTR_ERR(export
);
1112 conn
->cookie
= export
->exp_handle
.h_cookie
;
1113 class_export_put(export
);
1115 CDEBUG(D_IOCTL
, "connect: client %s, cookie "LPX64
"\n",
1116 cluuid
->uuid
, conn
->cookie
);
1119 EXPORT_SYMBOL(class_connect
);
1121 /* if export is involved in recovery then clean up related things */
1122 void class_export_recovery_cleanup(struct obd_export
*exp
)
1124 struct obd_device
*obd
= exp
->exp_obd
;
1126 spin_lock(&obd
->obd_recovery_task_lock
);
1127 if (exp
->exp_delayed
)
1128 obd
->obd_delayed_clients
--;
1129 if (obd
->obd_recovering
) {
1130 if (exp
->exp_in_recovery
) {
1131 spin_lock(&exp
->exp_lock
);
1132 exp
->exp_in_recovery
= 0;
1133 spin_unlock(&exp
->exp_lock
);
1134 LASSERT_ATOMIC_POS(&obd
->obd_connected_clients
);
1135 atomic_dec(&obd
->obd_connected_clients
);
1138 /* if called during recovery then should update
1139 * obd_stale_clients counter,
1140 * lightweight exports are not counted */
1141 if (exp
->exp_failed
&&
1142 (exp_connect_flags(exp
) & OBD_CONNECT_LIGHTWEIGHT
) == 0)
1143 exp
->exp_obd
->obd_stale_clients
++;
1145 spin_unlock(&obd
->obd_recovery_task_lock
);
1146 /** Cleanup req replay fields */
1147 if (exp
->exp_req_replay_needed
) {
1148 spin_lock(&exp
->exp_lock
);
1149 exp
->exp_req_replay_needed
= 0;
1150 spin_unlock(&exp
->exp_lock
);
1151 LASSERT(atomic_read(&obd
->obd_req_replay_clients
));
1152 atomic_dec(&obd
->obd_req_replay_clients
);
1154 /** Cleanup lock replay data */
1155 if (exp
->exp_lock_replay_needed
) {
1156 spin_lock(&exp
->exp_lock
);
1157 exp
->exp_lock_replay_needed
= 0;
1158 spin_unlock(&exp
->exp_lock
);
1159 LASSERT(atomic_read(&obd
->obd_lock_replay_clients
));
1160 atomic_dec(&obd
->obd_lock_replay_clients
);
1164 /* This function removes 1-3 references from the export:
1165 * 1 - for export pointer passed
1166 * and if disconnect really need
1167 * 2 - removing from hash
1168 * 3 - in client_unlink_export
1169 * The export pointer passed to this function can destroyed */
1170 int class_disconnect(struct obd_export
*export
)
1172 int already_disconnected
;
1174 if (export
== NULL
) {
1175 CWARN("attempting to free NULL export %p\n", export
);
1179 spin_lock(&export
->exp_lock
);
1180 already_disconnected
= export
->exp_disconnected
;
1181 export
->exp_disconnected
= 1;
1182 spin_unlock(&export
->exp_lock
);
1184 /* class_cleanup(), abort_recovery(), and class_fail_export()
1185 * all end up in here, and if any of them race we shouldn't
1186 * call extra class_export_puts(). */
1187 if (already_disconnected
) {
1188 LASSERT(hlist_unhashed(&export
->exp_nid_hash
));
1189 GOTO(no_disconn
, already_disconnected
);
1192 CDEBUG(D_IOCTL
, "disconnect: cookie "LPX64
"\n",
1193 export
->exp_handle
.h_cookie
);
1195 if (!hlist_unhashed(&export
->exp_nid_hash
))
1196 cfs_hash_del(export
->exp_obd
->obd_nid_hash
,
1197 &export
->exp_connection
->c_peer
.nid
,
1198 &export
->exp_nid_hash
);
1200 class_export_recovery_cleanup(export
);
1201 class_unlink_export(export
);
1203 class_export_put(export
);
1206 EXPORT_SYMBOL(class_disconnect
);
1208 /* Return non-zero for a fully connected export */
1209 int class_connected_export(struct obd_export
*exp
)
1213 spin_lock(&exp
->exp_lock
);
1214 connected
= (exp
->exp_conn_cnt
> 0);
1215 spin_unlock(&exp
->exp_lock
);
1220 EXPORT_SYMBOL(class_connected_export
);
1222 static void class_disconnect_export_list(struct list_head
*list
,
1223 enum obd_option flags
)
1226 struct obd_export
*exp
;
1228 /* It's possible that an export may disconnect itself, but
1229 * nothing else will be added to this list. */
1230 while (!list_empty(list
)) {
1231 exp
= list_entry(list
->next
, struct obd_export
,
1233 /* need for safe call CDEBUG after obd_disconnect */
1234 class_export_get(exp
);
1236 spin_lock(&exp
->exp_lock
);
1237 exp
->exp_flags
= flags
;
1238 spin_unlock(&exp
->exp_lock
);
1240 if (obd_uuid_equals(&exp
->exp_client_uuid
,
1241 &exp
->exp_obd
->obd_uuid
)) {
1243 "exp %p export uuid == obd uuid, don't discon\n",
1245 /* Need to delete this now so we don't end up pointing
1246 * to work_list later when this export is cleaned up. */
1247 list_del_init(&exp
->exp_obd_chain
);
1248 class_export_put(exp
);
1252 class_export_get(exp
);
1253 CDEBUG(D_HA
, "%s: disconnecting export at %s (%p), "
1254 "last request at "CFS_TIME_T
"\n",
1255 exp
->exp_obd
->obd_name
, obd_export_nid2str(exp
),
1256 exp
, exp
->exp_last_request_time
);
1257 /* release one export reference anyway */
1258 rc
= obd_disconnect(exp
);
1260 CDEBUG(D_HA
, "disconnected export at %s (%p): rc %d\n",
1261 obd_export_nid2str(exp
), exp
, rc
);
1262 class_export_put(exp
);
1266 void class_disconnect_exports(struct obd_device
*obd
)
1268 struct list_head work_list
;
1270 /* Move all of the exports from obd_exports to a work list, en masse. */
1271 INIT_LIST_HEAD(&work_list
);
1272 spin_lock(&obd
->obd_dev_lock
);
1273 list_splice_init(&obd
->obd_exports
, &work_list
);
1274 list_splice_init(&obd
->obd_delayed_exports
, &work_list
);
1275 spin_unlock(&obd
->obd_dev_lock
);
1277 if (!list_empty(&work_list
)) {
1278 CDEBUG(D_HA
, "OBD device %d (%p) has exports, "
1279 "disconnecting them\n", obd
->obd_minor
, obd
);
1280 class_disconnect_export_list(&work_list
,
1281 exp_flags_from_obd(obd
));
1283 CDEBUG(D_HA
, "OBD device %d (%p) has no exports\n",
1284 obd
->obd_minor
, obd
);
1286 EXPORT_SYMBOL(class_disconnect_exports
);
1288 /* Remove exports that have not completed recovery.
1290 void class_disconnect_stale_exports(struct obd_device
*obd
,
1291 int (*test_export
)(struct obd_export
*))
1293 struct list_head work_list
;
1294 struct obd_export
*exp
, *n
;
1297 INIT_LIST_HEAD(&work_list
);
1298 spin_lock(&obd
->obd_dev_lock
);
1299 list_for_each_entry_safe(exp
, n
, &obd
->obd_exports
,
1301 /* don't count self-export as client */
1302 if (obd_uuid_equals(&exp
->exp_client_uuid
,
1303 &exp
->exp_obd
->obd_uuid
))
1306 /* don't evict clients which have no slot in last_rcvd
1307 * (e.g. lightweight connection) */
1308 if (exp
->exp_target_data
.ted_lr_idx
== -1)
1311 spin_lock(&exp
->exp_lock
);
1312 if (exp
->exp_failed
|| test_export(exp
)) {
1313 spin_unlock(&exp
->exp_lock
);
1316 exp
->exp_failed
= 1;
1317 spin_unlock(&exp
->exp_lock
);
1319 list_move(&exp
->exp_obd_chain
, &work_list
);
1321 CDEBUG(D_HA
, "%s: disconnect stale client %s@%s\n",
1322 obd
->obd_name
, exp
->exp_client_uuid
.uuid
,
1323 exp
->exp_connection
== NULL
? "<unknown>" :
1324 libcfs_nid2str(exp
->exp_connection
->c_peer
.nid
));
1325 print_export_data(exp
, "EVICTING", 0);
1327 spin_unlock(&obd
->obd_dev_lock
);
1330 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1331 obd
->obd_name
, evicted
);
1333 class_disconnect_export_list(&work_list
, exp_flags_from_obd(obd
) |
1334 OBD_OPT_ABORT_RECOV
);
1336 EXPORT_SYMBOL(class_disconnect_stale_exports
);
1338 void class_fail_export(struct obd_export
*exp
)
1340 int rc
, already_failed
;
1342 spin_lock(&exp
->exp_lock
);
1343 already_failed
= exp
->exp_failed
;
1344 exp
->exp_failed
= 1;
1345 spin_unlock(&exp
->exp_lock
);
1347 if (already_failed
) {
1348 CDEBUG(D_HA
, "disconnecting dead export %p/%s; skipping\n",
1349 exp
, exp
->exp_client_uuid
.uuid
);
1353 CDEBUG(D_HA
, "disconnecting export %p/%s\n",
1354 exp
, exp
->exp_client_uuid
.uuid
);
1356 if (obd_dump_on_timeout
)
1357 libcfs_debug_dumplog();
1359 /* need for safe call CDEBUG after obd_disconnect */
1360 class_export_get(exp
);
1362 /* Most callers into obd_disconnect are removing their own reference
1363 * (request, for example) in addition to the one from the hash table.
1364 * We don't have such a reference here, so make one. */
1365 class_export_get(exp
);
1366 rc
= obd_disconnect(exp
);
1368 CERROR("disconnecting export %p failed: %d\n", exp
, rc
);
1370 CDEBUG(D_HA
, "disconnected export %p/%s\n",
1371 exp
, exp
->exp_client_uuid
.uuid
);
1372 class_export_put(exp
);
1374 EXPORT_SYMBOL(class_fail_export
);
1376 char *obd_export_nid2str(struct obd_export
*exp
)
1378 if (exp
->exp_connection
!= NULL
)
1379 return libcfs_nid2str(exp
->exp_connection
->c_peer
.nid
);
1383 EXPORT_SYMBOL(obd_export_nid2str
);
1385 int obd_export_evict_by_nid(struct obd_device
*obd
, const char *nid
)
1387 cfs_hash_t
*nid_hash
;
1388 struct obd_export
*doomed_exp
= NULL
;
1389 int exports_evicted
= 0;
1391 lnet_nid_t nid_key
= libcfs_str2nid((char *)nid
);
1393 spin_lock(&obd
->obd_dev_lock
);
1394 /* umount has run already, so evict thread should leave
1395 * its task to umount thread now */
1396 if (obd
->obd_stopping
) {
1397 spin_unlock(&obd
->obd_dev_lock
);
1398 return exports_evicted
;
1400 nid_hash
= obd
->obd_nid_hash
;
1401 cfs_hash_getref(nid_hash
);
1402 spin_unlock(&obd
->obd_dev_lock
);
1405 doomed_exp
= cfs_hash_lookup(nid_hash
, &nid_key
);
1406 if (doomed_exp
== NULL
)
1409 LASSERTF(doomed_exp
->exp_connection
->c_peer
.nid
== nid_key
,
1410 "nid %s found, wanted nid %s, requested nid %s\n",
1411 obd_export_nid2str(doomed_exp
),
1412 libcfs_nid2str(nid_key
), nid
);
1413 LASSERTF(doomed_exp
!= obd
->obd_self_export
,
1414 "self-export is hashed by NID?\n");
1416 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1417 "request\n", obd
->obd_name
,
1418 obd_uuid2str(&doomed_exp
->exp_client_uuid
),
1419 obd_export_nid2str(doomed_exp
));
1420 class_fail_export(doomed_exp
);
1421 class_export_put(doomed_exp
);
1424 cfs_hash_putref(nid_hash
);
1426 if (!exports_evicted
)
1427 CDEBUG(D_HA
,"%s: can't disconnect NID '%s': no exports found\n",
1428 obd
->obd_name
, nid
);
1429 return exports_evicted
;
1431 EXPORT_SYMBOL(obd_export_evict_by_nid
);
1433 int obd_export_evict_by_uuid(struct obd_device
*obd
, const char *uuid
)
1435 cfs_hash_t
*uuid_hash
;
1436 struct obd_export
*doomed_exp
= NULL
;
1437 struct obd_uuid doomed_uuid
;
1438 int exports_evicted
= 0;
1440 spin_lock(&obd
->obd_dev_lock
);
1441 if (obd
->obd_stopping
) {
1442 spin_unlock(&obd
->obd_dev_lock
);
1443 return exports_evicted
;
1445 uuid_hash
= obd
->obd_uuid_hash
;
1446 cfs_hash_getref(uuid_hash
);
1447 spin_unlock(&obd
->obd_dev_lock
);
1449 obd_str2uuid(&doomed_uuid
, uuid
);
1450 if (obd_uuid_equals(&doomed_uuid
, &obd
->obd_uuid
)) {
1451 CERROR("%s: can't evict myself\n", obd
->obd_name
);
1452 cfs_hash_putref(uuid_hash
);
1453 return exports_evicted
;
1456 doomed_exp
= cfs_hash_lookup(uuid_hash
, &doomed_uuid
);
1458 if (doomed_exp
== NULL
) {
1459 CERROR("%s: can't disconnect %s: no exports found\n",
1460 obd
->obd_name
, uuid
);
1462 CWARN("%s: evicting %s at administrative request\n",
1463 obd
->obd_name
, doomed_exp
->exp_client_uuid
.uuid
);
1464 class_fail_export(doomed_exp
);
1465 class_export_put(doomed_exp
);
1468 cfs_hash_putref(uuid_hash
);
1470 return exports_evicted
;
1472 EXPORT_SYMBOL(obd_export_evict_by_uuid
);
1474 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1475 void (*class_export_dump_hook
)(struct obd_export
*) = NULL
;
1476 EXPORT_SYMBOL(class_export_dump_hook
);
1479 static void print_export_data(struct obd_export
*exp
, const char *status
,
1482 struct ptlrpc_reply_state
*rs
;
1483 struct ptlrpc_reply_state
*first_reply
= NULL
;
1486 spin_lock(&exp
->exp_lock
);
1487 list_for_each_entry(rs
, &exp
->exp_outstanding_replies
,
1493 spin_unlock(&exp
->exp_lock
);
1495 CDEBUG(D_HA
, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64
"\n",
1496 exp
->exp_obd
->obd_name
, status
, exp
, exp
->exp_client_uuid
.uuid
,
1497 obd_export_nid2str(exp
), atomic_read(&exp
->exp_refcount
),
1498 atomic_read(&exp
->exp_rpc_count
),
1499 atomic_read(&exp
->exp_cb_count
),
1500 atomic_read(&exp
->exp_locks_count
),
1501 exp
->exp_disconnected
, exp
->exp_delayed
, exp
->exp_failed
,
1502 nreplies
, first_reply
, nreplies
> 3 ? "..." : "",
1503 exp
->exp_last_committed
);
1504 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1505 if (locks
&& class_export_dump_hook
!= NULL
)
1506 class_export_dump_hook(exp
);
1510 void dump_exports(struct obd_device
*obd
, int locks
)
1512 struct obd_export
*exp
;
1514 spin_lock(&obd
->obd_dev_lock
);
1515 list_for_each_entry(exp
, &obd
->obd_exports
, exp_obd_chain
)
1516 print_export_data(exp
, "ACTIVE", locks
);
1517 list_for_each_entry(exp
, &obd
->obd_unlinked_exports
, exp_obd_chain
)
1518 print_export_data(exp
, "UNLINKED", locks
);
1519 list_for_each_entry(exp
, &obd
->obd_delayed_exports
, exp_obd_chain
)
1520 print_export_data(exp
, "DELAYED", locks
);
1521 spin_unlock(&obd
->obd_dev_lock
);
1522 spin_lock(&obd_zombie_impexp_lock
);
1523 list_for_each_entry(exp
, &obd_zombie_exports
, exp_obd_chain
)
1524 print_export_data(exp
, "ZOMBIE", locks
);
1525 spin_unlock(&obd_zombie_impexp_lock
);
1527 EXPORT_SYMBOL(dump_exports
);
1529 void obd_exports_barrier(struct obd_device
*obd
)
1532 LASSERT(list_empty(&obd
->obd_exports
));
1533 spin_lock(&obd
->obd_dev_lock
);
1534 while (!list_empty(&obd
->obd_unlinked_exports
)) {
1535 spin_unlock(&obd
->obd_dev_lock
);
1536 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE
,
1537 cfs_time_seconds(waited
));
1538 if (waited
> 5 && IS_PO2(waited
)) {
1539 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1540 "more than %d seconds. "
1541 "The obd refcount = %d. Is it stuck?\n",
1542 obd
->obd_name
, waited
,
1543 atomic_read(&obd
->obd_refcount
));
1544 dump_exports(obd
, 1);
1547 spin_lock(&obd
->obd_dev_lock
);
1549 spin_unlock(&obd
->obd_dev_lock
);
1551 EXPORT_SYMBOL(obd_exports_barrier
);
1553 /* Total amount of zombies to be destroyed */
1554 static int zombies_count
= 0;
1557 * kill zombie imports and exports
1559 void obd_zombie_impexp_cull(void)
1561 struct obd_import
*import
;
1562 struct obd_export
*export
;
1565 spin_lock(&obd_zombie_impexp_lock
);
1568 if (!list_empty(&obd_zombie_imports
)) {
1569 import
= list_entry(obd_zombie_imports
.next
,
1572 list_del_init(&import
->imp_zombie_chain
);
1576 if (!list_empty(&obd_zombie_exports
)) {
1577 export
= list_entry(obd_zombie_exports
.next
,
1580 list_del_init(&export
->exp_obd_chain
);
1583 spin_unlock(&obd_zombie_impexp_lock
);
1585 if (import
!= NULL
) {
1586 class_import_destroy(import
);
1587 spin_lock(&obd_zombie_impexp_lock
);
1589 spin_unlock(&obd_zombie_impexp_lock
);
1592 if (export
!= NULL
) {
1593 class_export_destroy(export
);
1594 spin_lock(&obd_zombie_impexp_lock
);
1596 spin_unlock(&obd_zombie_impexp_lock
);
1600 } while (import
!= NULL
|| export
!= NULL
);
1603 static struct completion obd_zombie_start
;
1604 static struct completion obd_zombie_stop
;
1605 static unsigned long obd_zombie_flags
;
1606 static wait_queue_head_t obd_zombie_waitq
;
1607 static pid_t obd_zombie_pid
;
1610 OBD_ZOMBIE_STOP
= 0x0001,
1614 * check for work for kill zombie import/export thread.
1616 static int obd_zombie_impexp_check(void *arg
)
1620 spin_lock(&obd_zombie_impexp_lock
);
1621 rc
= (zombies_count
== 0) &&
1622 !test_bit(OBD_ZOMBIE_STOP
, &obd_zombie_flags
);
1623 spin_unlock(&obd_zombie_impexp_lock
);
1629 * Add export to the obd_zombe thread and notify it.
1631 static void obd_zombie_export_add(struct obd_export
*exp
) {
1632 spin_lock(&exp
->exp_obd
->obd_dev_lock
);
1633 LASSERT(!list_empty(&exp
->exp_obd_chain
));
1634 list_del_init(&exp
->exp_obd_chain
);
1635 spin_unlock(&exp
->exp_obd
->obd_dev_lock
);
1636 spin_lock(&obd_zombie_impexp_lock
);
1638 list_add(&exp
->exp_obd_chain
, &obd_zombie_exports
);
1639 spin_unlock(&obd_zombie_impexp_lock
);
1641 obd_zombie_impexp_notify();
1645 * Add import to the obd_zombe thread and notify it.
1647 static void obd_zombie_import_add(struct obd_import
*imp
) {
1648 LASSERT(imp
->imp_sec
== NULL
);
1649 LASSERT(imp
->imp_rq_pool
== NULL
);
1650 spin_lock(&obd_zombie_impexp_lock
);
1651 LASSERT(list_empty(&imp
->imp_zombie_chain
));
1653 list_add(&imp
->imp_zombie_chain
, &obd_zombie_imports
);
1654 spin_unlock(&obd_zombie_impexp_lock
);
1656 obd_zombie_impexp_notify();
1660 * notify import/export destroy thread about new zombie.
1662 static void obd_zombie_impexp_notify(void)
1665 * Make sure obd_zomebie_impexp_thread get this notification.
1666 * It is possible this signal only get by obd_zombie_barrier, and
1667 * barrier gulps this notification and sleeps away and hangs ensues
1669 wake_up_all(&obd_zombie_waitq
);
1673 * check whether obd_zombie is idle
1675 static int obd_zombie_is_idle(void)
1679 LASSERT(!test_bit(OBD_ZOMBIE_STOP
, &obd_zombie_flags
));
1680 spin_lock(&obd_zombie_impexp_lock
);
1681 rc
= (zombies_count
== 0);
1682 spin_unlock(&obd_zombie_impexp_lock
);
1687 * wait when obd_zombie import/export queues become empty
1689 void obd_zombie_barrier(void)
1691 struct l_wait_info lwi
= { 0 };
1693 if (obd_zombie_pid
== current_pid())
1694 /* don't wait for myself */
1696 l_wait_event(obd_zombie_waitq
, obd_zombie_is_idle(), &lwi
);
1698 EXPORT_SYMBOL(obd_zombie_barrier
);
1702 * destroy zombie export/import thread.
1704 static int obd_zombie_impexp_thread(void *unused
)
1706 unshare_fs_struct();
1707 complete(&obd_zombie_start
);
1709 obd_zombie_pid
= current_pid();
1711 while (!test_bit(OBD_ZOMBIE_STOP
, &obd_zombie_flags
)) {
1712 struct l_wait_info lwi
= { 0 };
1714 l_wait_event(obd_zombie_waitq
,
1715 !obd_zombie_impexp_check(NULL
), &lwi
);
1716 obd_zombie_impexp_cull();
1719 * Notify obd_zombie_barrier callers that queues
1722 wake_up(&obd_zombie_waitq
);
1725 complete(&obd_zombie_stop
);
1732 * start destroy zombie import/export thread
1734 int obd_zombie_impexp_init(void)
1738 INIT_LIST_HEAD(&obd_zombie_imports
);
1739 INIT_LIST_HEAD(&obd_zombie_exports
);
1740 spin_lock_init(&obd_zombie_impexp_lock
);
1741 init_completion(&obd_zombie_start
);
1742 init_completion(&obd_zombie_stop
);
1743 init_waitqueue_head(&obd_zombie_waitq
);
1746 task
= kthread_run(obd_zombie_impexp_thread
, NULL
, "obd_zombid");
1748 return PTR_ERR(task
);
1750 wait_for_completion(&obd_zombie_start
);
1754 * stop destroy zombie import/export thread
1756 void obd_zombie_impexp_stop(void)
1758 set_bit(OBD_ZOMBIE_STOP
, &obd_zombie_flags
);
1759 obd_zombie_impexp_notify();
1760 wait_for_completion(&obd_zombie_stop
);
1763 /***** Kernel-userspace comm helpers *******/
1765 /* Get length of entire message, including header */
1766 int kuc_len(int payload_len
)
1768 return sizeof(struct kuc_hdr
) + payload_len
;
1770 EXPORT_SYMBOL(kuc_len
);
1772 /* Get a pointer to kuc header, given a ptr to the payload
1773 * @param p Pointer to payload area
1774 * @returns Pointer to kuc header
1776 struct kuc_hdr
* kuc_ptr(void *p
)
1778 struct kuc_hdr
*lh
= ((struct kuc_hdr
*)p
) - 1;
1779 LASSERT(lh
->kuc_magic
== KUC_MAGIC
);
1782 EXPORT_SYMBOL(kuc_ptr
);
1784 /* Test if payload is part of kuc message
1785 * @param p Pointer to payload area
1788 int kuc_ispayload(void *p
)
1790 struct kuc_hdr
*kh
= ((struct kuc_hdr
*)p
) - 1;
1792 if (kh
->kuc_magic
== KUC_MAGIC
)
1797 EXPORT_SYMBOL(kuc_ispayload
);
1799 /* Alloc space for a message, and fill in header
1800 * @return Pointer to payload area
1802 void *kuc_alloc(int payload_len
, int transport
, int type
)
1805 int len
= kuc_len(payload_len
);
1809 return ERR_PTR(-ENOMEM
);
1811 lh
->kuc_magic
= KUC_MAGIC
;
1812 lh
->kuc_transport
= transport
;
1813 lh
->kuc_msgtype
= type
;
1814 lh
->kuc_msglen
= len
;
1816 return (void *)(lh
+ 1);
1818 EXPORT_SYMBOL(kuc_alloc
);
/* Free a message obtained from kuc_alloc().
 * @param p           Pointer to the payload area (not the header)
 * @param payload_len Payload length the message was allocated with
 */
inline void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int total = kuc_len(payload_len);

	OBD_FREE(hdr, total);
}
EXPORT_SYMBOL(kuc_free);