4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_ost.h"
44 #include "../include/obd_class.h"
45 #include "../include/lprocfs_status.h"
47 extern struct list_head obd_types
;
48 spinlock_t obd_types_lock
;
50 struct kmem_cache
*obd_device_cachep
;
51 struct kmem_cache
*obdo_cachep
;
52 EXPORT_SYMBOL(obdo_cachep
);
53 struct kmem_cache
*import_cachep
;
55 struct list_head obd_zombie_imports
;
56 struct list_head obd_zombie_exports
;
57 spinlock_t obd_zombie_impexp_lock
;
58 static void obd_zombie_impexp_notify(void);
59 static void obd_zombie_export_add(struct obd_export
*exp
);
60 static void obd_zombie_import_add(struct obd_import
*imp
);
61 static void print_export_data(struct obd_export
*exp
,
62 const char *status
, int locks
);
64 int (*ptlrpc_put_connection_superhack
)(struct ptlrpc_connection
*c
);
65 EXPORT_SYMBOL(ptlrpc_put_connection_superhack
);
68 * support functions: we could use inter-module communication, but this
69 * is more portable to other OS's
71 static struct obd_device
*obd_device_alloc(void)
73 struct obd_device
*obd
;
75 OBD_SLAB_ALLOC_PTR_GFP(obd
, obd_device_cachep
, GFP_NOFS
);
77 obd
->obd_magic
= OBD_DEVICE_MAGIC
;
82 static void obd_device_free(struct obd_device
*obd
)
85 LASSERTF(obd
->obd_magic
== OBD_DEVICE_MAGIC
, "obd %p obd_magic %08x != %08x\n",
86 obd
, obd
->obd_magic
, OBD_DEVICE_MAGIC
);
87 if (obd
->obd_namespace
!= NULL
) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd
, obd
->obd_namespace
, obd
->obd_force
);
92 lu_ref_fini(&obd
->obd_reference
);
93 OBD_SLAB_FREE_PTR(obd
, obd_device_cachep
);
96 struct obd_type
*class_search_type(const char *name
)
98 struct list_head
*tmp
;
99 struct obd_type
*type
;
101 spin_lock(&obd_types_lock
);
102 list_for_each(tmp
, &obd_types
) {
103 type
= list_entry(tmp
, struct obd_type
, typ_chain
);
104 if (strcmp(type
->typ_name
, name
) == 0) {
105 spin_unlock(&obd_types_lock
);
109 spin_unlock(&obd_types_lock
);
112 EXPORT_SYMBOL(class_search_type
);
114 struct obd_type
*class_get_type(const char *name
)
116 struct obd_type
*type
= class_search_type(name
);
119 const char *modname
= name
;
121 if (strcmp(modname
, "obdfilter") == 0)
124 if (strcmp(modname
, LUSTRE_LWP_NAME
) == 0)
125 modname
= LUSTRE_OSP_NAME
;
127 if (!strncmp(modname
, LUSTRE_MDS_NAME
, strlen(LUSTRE_MDS_NAME
)))
128 modname
= LUSTRE_MDT_NAME
;
130 if (!request_module("%s", modname
)) {
131 CDEBUG(D_INFO
, "Loaded module '%s'\n", modname
);
132 type
= class_search_type(name
);
134 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 spin_lock(&type
->obd_type_lock
);
141 try_module_get(type
->typ_dt_ops
->o_owner
);
142 spin_unlock(&type
->obd_type_lock
);
146 EXPORT_SYMBOL(class_get_type
);
148 void class_put_type(struct obd_type
*type
)
151 spin_lock(&type
->obd_type_lock
);
153 module_put(type
->typ_dt_ops
->o_owner
);
154 spin_unlock(&type
->obd_type_lock
);
156 EXPORT_SYMBOL(class_put_type
);
158 #define CLASS_MAX_NAME 1024
160 int class_register_type(struct obd_ops
*dt_ops
, struct md_ops
*md_ops
,
161 struct lprocfs_vars
*vars
, const char *name
,
162 struct lu_device_type
*ldt
)
164 struct obd_type
*type
;
168 LASSERT(strnlen(name
, CLASS_MAX_NAME
) < CLASS_MAX_NAME
);
170 if (class_search_type(name
)) {
171 CDEBUG(D_IOCTL
, "Type %s already registered\n", name
);
176 OBD_ALLOC(type
, sizeof(*type
));
180 OBD_ALLOC_PTR(type
->typ_dt_ops
);
181 OBD_ALLOC_PTR(type
->typ_md_ops
);
182 OBD_ALLOC(type
->typ_name
, strlen(name
) + 1);
184 if (type
->typ_dt_ops
== NULL
||
185 type
->typ_md_ops
== NULL
||
186 type
->typ_name
== NULL
)
189 *(type
->typ_dt_ops
) = *dt_ops
;
190 /* md_ops is optional */
192 *(type
->typ_md_ops
) = *md_ops
;
193 strcpy(type
->typ_name
, name
);
194 spin_lock_init(&type
->obd_type_lock
);
196 type
->typ_procroot
= lprocfs_register(type
->typ_name
, proc_lustre_root
,
198 if (IS_ERR(type
->typ_procroot
)) {
199 rc
= PTR_ERR(type
->typ_procroot
);
200 type
->typ_procroot
= NULL
;
206 rc
= lu_device_type_init(ldt
);
211 spin_lock(&obd_types_lock
);
212 list_add(&type
->typ_chain
, &obd_types
);
213 spin_unlock(&obd_types_lock
);
218 if (type
->typ_name
!= NULL
)
219 OBD_FREE(type
->typ_name
, strlen(name
) + 1);
220 if (type
->typ_md_ops
!= NULL
)
221 OBD_FREE_PTR(type
->typ_md_ops
);
222 if (type
->typ_dt_ops
!= NULL
)
223 OBD_FREE_PTR(type
->typ_dt_ops
);
224 OBD_FREE(type
, sizeof(*type
));
227 EXPORT_SYMBOL(class_register_type
);
229 int class_unregister_type(const char *name
)
231 struct obd_type
*type
= class_search_type(name
);
234 CERROR("unknown obd type\n");
238 if (type
->typ_refcnt
) {
239 CERROR("type %s has refcount (%d)\n", name
, type
->typ_refcnt
);
240 /* This is a bad situation, let's make the best of it */
241 /* Remove ops, but leave the name for debugging */
242 OBD_FREE_PTR(type
->typ_dt_ops
);
243 OBD_FREE_PTR(type
->typ_md_ops
);
247 if (type
->typ_procroot
) {
248 lprocfs_remove(&type
->typ_procroot
);
252 lu_device_type_fini(type
->typ_lu
);
254 spin_lock(&obd_types_lock
);
255 list_del(&type
->typ_chain
);
256 spin_unlock(&obd_types_lock
);
257 OBD_FREE(type
->typ_name
, strlen(name
) + 1);
258 if (type
->typ_dt_ops
!= NULL
)
259 OBD_FREE_PTR(type
->typ_dt_ops
);
260 if (type
->typ_md_ops
!= NULL
)
261 OBD_FREE_PTR(type
->typ_md_ops
);
262 OBD_FREE(type
, sizeof(*type
));
264 } /* class_unregister_type */
265 EXPORT_SYMBOL(class_unregister_type
);
268 * Create a new obd device.
270 * Find an empty slot in ::obd_devs[], create a new obd device in it.
272 * \param[in] type_name obd device type string.
273 * \param[in] name obd device name.
275 * \retval NULL if create fails, otherwise return the obd device
278 struct obd_device
*class_newdev(const char *type_name
, const char *name
)
280 struct obd_device
*result
= NULL
;
281 struct obd_device
*newdev
;
282 struct obd_type
*type
= NULL
;
284 int new_obd_minor
= 0;
286 if (strlen(name
) >= MAX_OBD_NAME
) {
287 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME
);
288 return ERR_PTR(-EINVAL
);
291 type
= class_get_type(type_name
);
293 CERROR("OBD: unknown type: %s\n", type_name
);
294 return ERR_PTR(-ENODEV
);
297 newdev
= obd_device_alloc();
299 GOTO(out_type
, result
= ERR_PTR(-ENOMEM
));
301 LASSERT(newdev
->obd_magic
== OBD_DEVICE_MAGIC
);
303 write_lock(&obd_dev_lock
);
304 for (i
= 0; i
< class_devno_max(); i
++) {
305 struct obd_device
*obd
= class_num2obd(i
);
307 if (obd
&& (strcmp(name
, obd
->obd_name
) == 0)) {
308 CERROR("Device %s already exists at %d, won't add\n",
311 LASSERTF(result
->obd_magic
== OBD_DEVICE_MAGIC
,
312 "%p obd_magic %08x != %08x\n", result
,
313 result
->obd_magic
, OBD_DEVICE_MAGIC
);
314 LASSERTF(result
->obd_minor
== new_obd_minor
,
315 "%p obd_minor %d != %d\n", result
,
316 result
->obd_minor
, new_obd_minor
);
318 obd_devs
[result
->obd_minor
] = NULL
;
319 result
->obd_name
[0]='\0';
321 result
= ERR_PTR(-EEXIST
);
324 if (!result
&& !obd
) {
326 result
->obd_minor
= i
;
328 result
->obd_type
= type
;
329 strncpy(result
->obd_name
, name
,
330 sizeof(result
->obd_name
) - 1);
331 obd_devs
[i
] = result
;
334 write_unlock(&obd_dev_lock
);
336 if (result
== NULL
&& i
>= class_devno_max()) {
337 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339 GOTO(out
, result
= ERR_PTR(-EOVERFLOW
));
345 CDEBUG(D_IOCTL
, "Adding new device %s (%p)\n",
346 result
->obd_name
, result
);
350 obd_device_free(newdev
);
352 class_put_type(type
);
356 void class_release_dev(struct obd_device
*obd
)
358 struct obd_type
*obd_type
= obd
->obd_type
;
360 LASSERTF(obd
->obd_magic
== OBD_DEVICE_MAGIC
, "%p obd_magic %08x != %08x\n",
361 obd
, obd
->obd_magic
, OBD_DEVICE_MAGIC
);
362 LASSERTF(obd
== obd_devs
[obd
->obd_minor
], "obd %p != obd_devs[%d] %p\n",
363 obd
, obd
->obd_minor
, obd_devs
[obd
->obd_minor
]);
364 LASSERT(obd_type
!= NULL
);
366 CDEBUG(D_INFO
, "Release obd device %s at %d obd_type name =%s\n",
367 obd
->obd_name
, obd
->obd_minor
, obd
->obd_type
->typ_name
);
369 write_lock(&obd_dev_lock
);
370 obd_devs
[obd
->obd_minor
] = NULL
;
371 write_unlock(&obd_dev_lock
);
372 obd_device_free(obd
);
374 class_put_type(obd_type
);
377 int class_name2dev(const char *name
)
384 read_lock(&obd_dev_lock
);
385 for (i
= 0; i
< class_devno_max(); i
++) {
386 struct obd_device
*obd
= class_num2obd(i
);
388 if (obd
&& strcmp(name
, obd
->obd_name
) == 0) {
389 /* Make sure we finished attaching before we give
390 out any references */
391 LASSERT(obd
->obd_magic
== OBD_DEVICE_MAGIC
);
392 if (obd
->obd_attached
) {
393 read_unlock(&obd_dev_lock
);
399 read_unlock(&obd_dev_lock
);
403 EXPORT_SYMBOL(class_name2dev
);
405 struct obd_device
*class_name2obd(const char *name
)
407 int dev
= class_name2dev(name
);
409 if (dev
< 0 || dev
> class_devno_max())
411 return class_num2obd(dev
);
413 EXPORT_SYMBOL(class_name2obd
);
415 int class_uuid2dev(struct obd_uuid
*uuid
)
419 read_lock(&obd_dev_lock
);
420 for (i
= 0; i
< class_devno_max(); i
++) {
421 struct obd_device
*obd
= class_num2obd(i
);
423 if (obd
&& obd_uuid_equals(uuid
, &obd
->obd_uuid
)) {
424 LASSERT(obd
->obd_magic
== OBD_DEVICE_MAGIC
);
425 read_unlock(&obd_dev_lock
);
429 read_unlock(&obd_dev_lock
);
433 EXPORT_SYMBOL(class_uuid2dev
);
435 struct obd_device
*class_uuid2obd(struct obd_uuid
*uuid
)
437 int dev
= class_uuid2dev(uuid
);
440 return class_num2obd(dev
);
442 EXPORT_SYMBOL(class_uuid2obd
);
445 * Get obd device from ::obd_devs[]
447 * \param num [in] array index
449 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
450 * otherwise return the obd device there.
452 struct obd_device
*class_num2obd(int num
)
454 struct obd_device
*obd
= NULL
;
456 if (num
< class_devno_max()) {
461 LASSERTF(obd
->obd_magic
== OBD_DEVICE_MAGIC
,
462 "%p obd_magic %08x != %08x\n",
463 obd
, obd
->obd_magic
, OBD_DEVICE_MAGIC
);
464 LASSERTF(obd
->obd_minor
== num
,
465 "%p obd_minor %0d != %0d\n",
466 obd
, obd
->obd_minor
, num
);
471 EXPORT_SYMBOL(class_num2obd
);
474 * Get obd devices count. Device in any
476 * \retval obd device count
478 int get_devices_count(void)
480 int index
, max_index
= class_devno_max(), dev_count
= 0;
482 read_lock(&obd_dev_lock
);
483 for (index
= 0; index
<= max_index
; index
++) {
484 struct obd_device
*obd
= class_num2obd(index
);
488 read_unlock(&obd_dev_lock
);
492 EXPORT_SYMBOL(get_devices_count
);
494 void class_obd_list(void)
499 read_lock(&obd_dev_lock
);
500 for (i
= 0; i
< class_devno_max(); i
++) {
501 struct obd_device
*obd
= class_num2obd(i
);
505 if (obd
->obd_stopping
)
507 else if (obd
->obd_set_up
)
509 else if (obd
->obd_attached
)
513 LCONSOLE(D_CONFIG
, "%3d %s %s %s %s %d\n",
514 i
, status
, obd
->obd_type
->typ_name
,
515 obd
->obd_name
, obd
->obd_uuid
.uuid
,
516 atomic_read(&obd
->obd_refcount
));
518 read_unlock(&obd_dev_lock
);
522 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
523 specified, then only the client with that uuid is returned,
524 otherwise any client connected to the tgt is returned. */
525 struct obd_device
* class_find_client_obd(struct obd_uuid
*tgt_uuid
,
526 const char * typ_name
,
527 struct obd_uuid
*grp_uuid
)
531 read_lock(&obd_dev_lock
);
532 for (i
= 0; i
< class_devno_max(); i
++) {
533 struct obd_device
*obd
= class_num2obd(i
);
537 if ((strncmp(obd
->obd_type
->typ_name
, typ_name
,
538 strlen(typ_name
)) == 0)) {
539 if (obd_uuid_equals(tgt_uuid
,
540 &obd
->u
.cli
.cl_target_uuid
) &&
541 ((grp_uuid
)? obd_uuid_equals(grp_uuid
,
542 &obd
->obd_uuid
) : 1)) {
543 read_unlock(&obd_dev_lock
);
548 read_unlock(&obd_dev_lock
);
552 EXPORT_SYMBOL(class_find_client_obd
);
554 /* Iterate the obd_device list looking devices have grp_uuid. Start
555 searching at *next, and if a device is found, the next index to look
556 at is saved in *next. If next is NULL, then the first matching device
557 will always be returned. */
558 struct obd_device
* class_devices_in_group(struct obd_uuid
*grp_uuid
, int *next
)
564 else if (*next
>= 0 && *next
< class_devno_max())
569 read_lock(&obd_dev_lock
);
570 for (; i
< class_devno_max(); i
++) {
571 struct obd_device
*obd
= class_num2obd(i
);
575 if (obd_uuid_equals(grp_uuid
, &obd
->obd_uuid
)) {
578 read_unlock(&obd_dev_lock
);
582 read_unlock(&obd_dev_lock
);
586 EXPORT_SYMBOL(class_devices_in_group
);
589 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
590 * adjust sptlrpc settings accordingly.
592 int class_notify_sptlrpc_conf(const char *fsname
, int namelen
)
594 struct obd_device
*obd
;
598 LASSERT(namelen
> 0);
600 read_lock(&obd_dev_lock
);
601 for (i
= 0; i
< class_devno_max(); i
++) {
602 obd
= class_num2obd(i
);
604 if (obd
== NULL
|| obd
->obd_set_up
== 0 || obd
->obd_stopping
)
607 /* only notify mdc, osc, mdt, ost */
608 type
= obd
->obd_type
->typ_name
;
609 if (strcmp(type
, LUSTRE_MDC_NAME
) != 0 &&
610 strcmp(type
, LUSTRE_OSC_NAME
) != 0 &&
611 strcmp(type
, LUSTRE_MDT_NAME
) != 0 &&
612 strcmp(type
, LUSTRE_OST_NAME
) != 0)
615 if (strncmp(obd
->obd_name
, fsname
, namelen
))
618 class_incref(obd
, __func__
, obd
);
619 read_unlock(&obd_dev_lock
);
620 rc2
= obd_set_info_async(NULL
, obd
->obd_self_export
,
621 sizeof(KEY_SPTLRPC_CONF
),
622 KEY_SPTLRPC_CONF
, 0, NULL
, NULL
);
624 class_decref(obd
, __func__
, obd
);
625 read_lock(&obd_dev_lock
);
627 read_unlock(&obd_dev_lock
);
630 EXPORT_SYMBOL(class_notify_sptlrpc_conf
);
632 void obd_cleanup_caches(void)
634 if (obd_device_cachep
) {
635 kmem_cache_destroy(obd_device_cachep
);
636 obd_device_cachep
= NULL
;
639 kmem_cache_destroy(obdo_cachep
);
643 kmem_cache_destroy(import_cachep
);
644 import_cachep
= NULL
;
647 kmem_cache_destroy(capa_cachep
);
652 int obd_init_caches(void)
654 LASSERT(obd_device_cachep
== NULL
);
655 obd_device_cachep
= kmem_cache_create("ll_obd_dev_cache",
656 sizeof(struct obd_device
),
658 if (!obd_device_cachep
)
661 LASSERT(obdo_cachep
== NULL
);
662 obdo_cachep
= kmem_cache_create("ll_obdo_cache", sizeof(struct obdo
),
667 LASSERT(import_cachep
== NULL
);
668 import_cachep
= kmem_cache_create("ll_import_cache",
669 sizeof(struct obd_import
),
674 LASSERT(capa_cachep
== NULL
);
675 capa_cachep
= kmem_cache_create("capa_cache",
676 sizeof(struct obd_capa
), 0, 0, NULL
);
682 obd_cleanup_caches();
687 /* map connection to client */
688 struct obd_export
*class_conn2export(struct lustre_handle
*conn
)
690 struct obd_export
*export
;
693 CDEBUG(D_CACHE
, "looking for null handle\n");
697 if (conn
->cookie
== -1) { /* this means assign a new connection */
698 CDEBUG(D_CACHE
, "want a new connection\n");
702 CDEBUG(D_INFO
, "looking for export cookie %#llx\n", conn
->cookie
);
703 export
= class_handle2object(conn
->cookie
);
706 EXPORT_SYMBOL(class_conn2export
);
708 struct obd_device
*class_exp2obd(struct obd_export
*exp
)
714 EXPORT_SYMBOL(class_exp2obd
);
716 struct obd_device
*class_conn2obd(struct lustre_handle
*conn
)
718 struct obd_export
*export
;
719 export
= class_conn2export(conn
);
721 struct obd_device
*obd
= export
->exp_obd
;
722 class_export_put(export
);
727 EXPORT_SYMBOL(class_conn2obd
);
729 struct obd_import
*class_exp2cliimp(struct obd_export
*exp
)
731 struct obd_device
*obd
= exp
->exp_obd
;
734 return obd
->u
.cli
.cl_import
;
736 EXPORT_SYMBOL(class_exp2cliimp
);
738 struct obd_import
*class_conn2cliimp(struct lustre_handle
*conn
)
740 struct obd_device
*obd
= class_conn2obd(conn
);
743 return obd
->u
.cli
.cl_import
;
745 EXPORT_SYMBOL(class_conn2cliimp
);
747 /* Export management functions */
748 static void class_export_destroy(struct obd_export
*exp
)
750 struct obd_device
*obd
= exp
->exp_obd
;
752 LASSERT_ATOMIC_ZERO(&exp
->exp_refcount
);
753 LASSERT(obd
!= NULL
);
755 CDEBUG(D_IOCTL
, "destroying export %p/%s for %s\n", exp
,
756 exp
->exp_client_uuid
.uuid
, obd
->obd_name
);
758 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
759 if (exp
->exp_connection
)
760 ptlrpc_put_connection_superhack(exp
->exp_connection
);
762 LASSERT(list_empty(&exp
->exp_outstanding_replies
));
763 LASSERT(list_empty(&exp
->exp_uncommitted_replies
));
764 LASSERT(list_empty(&exp
->exp_req_replay_queue
));
765 LASSERT(list_empty(&exp
->exp_hp_rpcs
));
766 obd_destroy_export(exp
);
767 class_decref(obd
, "export", exp
);
769 OBD_FREE_RCU(exp
, sizeof(*exp
), &exp
->exp_handle
);
772 static void export_handle_addref(void *export
)
774 class_export_get(export
);
777 static struct portals_handle_ops export_handle_ops
= {
778 .hop_addref
= export_handle_addref
,
782 struct obd_export
*class_export_get(struct obd_export
*exp
)
784 atomic_inc(&exp
->exp_refcount
);
785 CDEBUG(D_INFO
, "GETting export %p : new refcount %d\n", exp
,
786 atomic_read(&exp
->exp_refcount
));
789 EXPORT_SYMBOL(class_export_get
);
791 void class_export_put(struct obd_export
*exp
)
793 LASSERT(exp
!= NULL
);
794 LASSERT_ATOMIC_GT_LT(&exp
->exp_refcount
, 0, LI_POISON
);
795 CDEBUG(D_INFO
, "PUTting export %p : new refcount %d\n", exp
,
796 atomic_read(&exp
->exp_refcount
) - 1);
798 if (atomic_dec_and_test(&exp
->exp_refcount
)) {
799 LASSERT(!list_empty(&exp
->exp_obd_chain
));
800 CDEBUG(D_IOCTL
, "final put %p/%s\n",
801 exp
, exp
->exp_client_uuid
.uuid
);
803 /* release nid stat refererence */
804 lprocfs_exp_cleanup(exp
);
806 obd_zombie_export_add(exp
);
809 EXPORT_SYMBOL(class_export_put
);
811 /* Creates a new export, adds it to the hash table, and returns a
812 * pointer to it. The refcount is 2: one for the hash reference, and
813 * one for the pointer returned by this function. */
814 struct obd_export
*class_new_export(struct obd_device
*obd
,
815 struct obd_uuid
*cluuid
)
817 struct obd_export
*export
;
818 struct cfs_hash
*hash
= NULL
;
821 OBD_ALLOC_PTR(export
);
823 return ERR_PTR(-ENOMEM
);
825 export
->exp_conn_cnt
= 0;
826 export
->exp_lock_hash
= NULL
;
827 export
->exp_flock_hash
= NULL
;
828 atomic_set(&export
->exp_refcount
, 2);
829 atomic_set(&export
->exp_rpc_count
, 0);
830 atomic_set(&export
->exp_cb_count
, 0);
831 atomic_set(&export
->exp_locks_count
, 0);
832 #if LUSTRE_TRACKS_LOCK_EXP_REFS
833 INIT_LIST_HEAD(&export
->exp_locks_list
);
834 spin_lock_init(&export
->exp_locks_list_guard
);
836 atomic_set(&export
->exp_replay_count
, 0);
837 export
->exp_obd
= obd
;
838 INIT_LIST_HEAD(&export
->exp_outstanding_replies
);
839 spin_lock_init(&export
->exp_uncommitted_replies_lock
);
840 INIT_LIST_HEAD(&export
->exp_uncommitted_replies
);
841 INIT_LIST_HEAD(&export
->exp_req_replay_queue
);
842 INIT_LIST_HEAD(&export
->exp_handle
.h_link
);
843 INIT_LIST_HEAD(&export
->exp_hp_rpcs
);
844 class_handle_hash(&export
->exp_handle
, &export_handle_ops
);
845 export
->exp_last_request_time
= get_seconds();
846 spin_lock_init(&export
->exp_lock
);
847 spin_lock_init(&export
->exp_rpc_lock
);
848 INIT_HLIST_NODE(&export
->exp_uuid_hash
);
849 INIT_HLIST_NODE(&export
->exp_nid_hash
);
850 spin_lock_init(&export
->exp_bl_list_lock
);
851 INIT_LIST_HEAD(&export
->exp_bl_list
);
853 export
->exp_sp_peer
= LUSTRE_SP_ANY
;
854 export
->exp_flvr
.sf_rpc
= SPTLRPC_FLVR_INVALID
;
855 export
->exp_client_uuid
= *cluuid
;
856 obd_init_export(export
);
858 spin_lock(&obd
->obd_dev_lock
);
859 /* shouldn't happen, but might race */
860 if (obd
->obd_stopping
)
861 GOTO(exit_unlock
, rc
= -ENODEV
);
863 hash
= cfs_hash_getref(obd
->obd_uuid_hash
);
865 GOTO(exit_unlock
, rc
= -ENODEV
);
866 spin_unlock(&obd
->obd_dev_lock
);
868 if (!obd_uuid_equals(cluuid
, &obd
->obd_uuid
)) {
869 rc
= cfs_hash_add_unique(hash
, cluuid
, &export
->exp_uuid_hash
);
871 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
872 obd
->obd_name
, cluuid
->uuid
, rc
);
873 GOTO(exit_err
, rc
= -EALREADY
);
877 spin_lock(&obd
->obd_dev_lock
);
878 if (obd
->obd_stopping
) {
879 cfs_hash_del(hash
, cluuid
, &export
->exp_uuid_hash
);
880 GOTO(exit_unlock
, rc
= -ENODEV
);
883 class_incref(obd
, "export", export
);
884 list_add(&export
->exp_obd_chain
, &export
->exp_obd
->obd_exports
);
885 list_add_tail(&export
->exp_obd_chain_timed
,
886 &export
->exp_obd
->obd_exports_timed
);
887 export
->exp_obd
->obd_num_exports
++;
888 spin_unlock(&obd
->obd_dev_lock
);
889 cfs_hash_putref(hash
);
893 spin_unlock(&obd
->obd_dev_lock
);
896 cfs_hash_putref(hash
);
897 class_handle_unhash(&export
->exp_handle
);
898 LASSERT(hlist_unhashed(&export
->exp_uuid_hash
));
899 obd_destroy_export(export
);
900 OBD_FREE_PTR(export
);
903 EXPORT_SYMBOL(class_new_export
);
905 void class_unlink_export(struct obd_export
*exp
)
907 class_handle_unhash(&exp
->exp_handle
);
909 spin_lock(&exp
->exp_obd
->obd_dev_lock
);
910 /* delete an uuid-export hashitem from hashtables */
911 if (!hlist_unhashed(&exp
->exp_uuid_hash
))
912 cfs_hash_del(exp
->exp_obd
->obd_uuid_hash
,
913 &exp
->exp_client_uuid
,
914 &exp
->exp_uuid_hash
);
916 list_move(&exp
->exp_obd_chain
, &exp
->exp_obd
->obd_unlinked_exports
);
917 list_del_init(&exp
->exp_obd_chain_timed
);
918 exp
->exp_obd
->obd_num_exports
--;
919 spin_unlock(&exp
->exp_obd
->obd_dev_lock
);
920 class_export_put(exp
);
922 EXPORT_SYMBOL(class_unlink_export
);
924 /* Import management functions */
925 void class_import_destroy(struct obd_import
*imp
)
927 CDEBUG(D_IOCTL
, "destroying import %p for %s\n", imp
,
928 imp
->imp_obd
->obd_name
);
930 LASSERT_ATOMIC_ZERO(&imp
->imp_refcount
);
932 ptlrpc_put_connection_superhack(imp
->imp_connection
);
934 while (!list_empty(&imp
->imp_conn_list
)) {
935 struct obd_import_conn
*imp_conn
;
937 imp_conn
= list_entry(imp
->imp_conn_list
.next
,
938 struct obd_import_conn
, oic_item
);
939 list_del_init(&imp_conn
->oic_item
);
940 ptlrpc_put_connection_superhack(imp_conn
->oic_conn
);
941 OBD_FREE(imp_conn
, sizeof(*imp_conn
));
944 LASSERT(imp
->imp_sec
== NULL
);
945 class_decref(imp
->imp_obd
, "import", imp
);
946 OBD_FREE_RCU(imp
, sizeof(*imp
), &imp
->imp_handle
);
949 static void import_handle_addref(void *import
)
951 class_import_get(import
);
954 static struct portals_handle_ops import_handle_ops
= {
955 .hop_addref
= import_handle_addref
,
959 struct obd_import
*class_import_get(struct obd_import
*import
)
961 atomic_inc(&import
->imp_refcount
);
962 CDEBUG(D_INFO
, "import %p refcount=%d obd=%s\n", import
,
963 atomic_read(&import
->imp_refcount
),
964 import
->imp_obd
->obd_name
);
967 EXPORT_SYMBOL(class_import_get
);
969 void class_import_put(struct obd_import
*imp
)
971 LASSERT(list_empty(&imp
->imp_zombie_chain
));
972 LASSERT_ATOMIC_GT_LT(&imp
->imp_refcount
, 0, LI_POISON
);
974 CDEBUG(D_INFO
, "import %p refcount=%d obd=%s\n", imp
,
975 atomic_read(&imp
->imp_refcount
) - 1,
976 imp
->imp_obd
->obd_name
);
978 if (atomic_dec_and_test(&imp
->imp_refcount
)) {
979 CDEBUG(D_INFO
, "final put import %p\n", imp
);
980 obd_zombie_import_add(imp
);
983 /* catch possible import put race */
984 LASSERT_ATOMIC_GE_LT(&imp
->imp_refcount
, 0, LI_POISON
);
986 EXPORT_SYMBOL(class_import_put
);
988 static void init_imp_at(struct imp_at
*at
) {
990 at_init(&at
->iat_net_latency
, 0, 0);
991 for (i
= 0; i
< IMP_AT_MAX_PORTALS
; i
++) {
992 /* max service estimates are tracked on the server side, so
993 don't use the AT history here, just use the last reported
994 val. (But keep hist for proc histogram, worst_ever) */
995 at_init(&at
->iat_service_estimate
[i
], INITIAL_CONNECT_TIMEOUT
,
1000 struct obd_import
*class_new_import(struct obd_device
*obd
)
1002 struct obd_import
*imp
;
1004 OBD_ALLOC(imp
, sizeof(*imp
));
1008 INIT_LIST_HEAD(&imp
->imp_pinger_chain
);
1009 INIT_LIST_HEAD(&imp
->imp_zombie_chain
);
1010 INIT_LIST_HEAD(&imp
->imp_replay_list
);
1011 INIT_LIST_HEAD(&imp
->imp_sending_list
);
1012 INIT_LIST_HEAD(&imp
->imp_delayed_list
);
1013 INIT_LIST_HEAD(&imp
->imp_committed_list
);
1014 imp
->imp_replay_cursor
= &imp
->imp_committed_list
;
1015 spin_lock_init(&imp
->imp_lock
);
1016 imp
->imp_last_success_conn
= 0;
1017 imp
->imp_state
= LUSTRE_IMP_NEW
;
1018 imp
->imp_obd
= class_incref(obd
, "import", imp
);
1019 mutex_init(&imp
->imp_sec_mutex
);
1020 init_waitqueue_head(&imp
->imp_recovery_waitq
);
1022 atomic_set(&imp
->imp_refcount
, 2);
1023 atomic_set(&imp
->imp_unregistering
, 0);
1024 atomic_set(&imp
->imp_inflight
, 0);
1025 atomic_set(&imp
->imp_replay_inflight
, 0);
1026 atomic_set(&imp
->imp_inval_count
, 0);
1027 INIT_LIST_HEAD(&imp
->imp_conn_list
);
1028 INIT_LIST_HEAD(&imp
->imp_handle
.h_link
);
1029 class_handle_hash(&imp
->imp_handle
, &import_handle_ops
);
1030 init_imp_at(&imp
->imp_at
);
1032 /* the default magic is V2, will be used in connect RPC, and
1033 * then adjusted according to the flags in request/reply. */
1034 imp
->imp_msg_magic
= LUSTRE_MSG_MAGIC_V2
;
1038 EXPORT_SYMBOL(class_new_import
);
1040 void class_destroy_import(struct obd_import
*import
)
1042 LASSERT(import
!= NULL
);
1043 LASSERT(import
!= LP_POISON
);
1045 class_handle_unhash(&import
->imp_handle
);
1047 spin_lock(&import
->imp_lock
);
1048 import
->imp_generation
++;
1049 spin_unlock(&import
->imp_lock
);
1050 class_import_put(import
);
1052 EXPORT_SYMBOL(class_destroy_import
);
#if LUSTRE_TRACKS_LOCK_EXP_REFS

/*
 * Record that \a lock holds a reference on \a exp.  The first reference
 * links the lock onto exp->exp_locks_list and remembers \a exp as the
 * lock's target; a lock already tracking a different export only
 * produces a console warning.
 */
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr >= 0);

	if (lock->l_exp_refs_target != NULL &&
	    lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
			      exp, lock, lock->l_exp_refs_target);
	}

	if (lock->l_exp_refs_nr++ == 0) {
		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
		lock->l_exp_refs_target = exp;
	}

	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);
	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);

/*
 * Undo one __class_export_add_lock_ref().  Dropping the last reference
 * unlinks the lock from the export's list and clears its target; a
 * target that is not \a exp only produces a console warning.
 */
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr > 0);

	if (lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("lock %p, "
			      "mismatching export pointers: %p, %p\n",
			      lock, lock->l_exp_refs_target, exp);
	}

	if (--lock->l_exp_refs_nr == 0) {
		list_del_init(&lock->l_exp_refs_link);
		lock->l_exp_refs_target = NULL;
	}

	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);
	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif
1097 /* A connection defines an export context in which preallocation can
1098 be managed. This releases the export pointer reference, and returns
1099 the export handle, so the export refcount is 1 when this function
1101 int class_connect(struct lustre_handle
*conn
, struct obd_device
*obd
,
1102 struct obd_uuid
*cluuid
)
1104 struct obd_export
*export
;
1105 LASSERT(conn
!= NULL
);
1106 LASSERT(obd
!= NULL
);
1107 LASSERT(cluuid
!= NULL
);
1109 export
= class_new_export(obd
, cluuid
);
1111 return PTR_ERR(export
);
1113 conn
->cookie
= export
->exp_handle
.h_cookie
;
1114 class_export_put(export
);
1116 CDEBUG(D_IOCTL
, "connect: client %s, cookie %#llx\n",
1117 cluuid
->uuid
, conn
->cookie
);
1120 EXPORT_SYMBOL(class_connect
);
1122 /* if export is involved in recovery then clean up related things */
1123 void class_export_recovery_cleanup(struct obd_export
*exp
)
1125 struct obd_device
*obd
= exp
->exp_obd
;
1127 spin_lock(&obd
->obd_recovery_task_lock
);
1128 if (exp
->exp_delayed
)
1129 obd
->obd_delayed_clients
--;
1130 if (obd
->obd_recovering
) {
1131 if (exp
->exp_in_recovery
) {
1132 spin_lock(&exp
->exp_lock
);
1133 exp
->exp_in_recovery
= 0;
1134 spin_unlock(&exp
->exp_lock
);
1135 LASSERT_ATOMIC_POS(&obd
->obd_connected_clients
);
1136 atomic_dec(&obd
->obd_connected_clients
);
1139 /* if called during recovery then should update
1140 * obd_stale_clients counter,
1141 * lightweight exports are not counted */
1142 if (exp
->exp_failed
&&
1143 (exp_connect_flags(exp
) & OBD_CONNECT_LIGHTWEIGHT
) == 0)
1144 exp
->exp_obd
->obd_stale_clients
++;
1146 spin_unlock(&obd
->obd_recovery_task_lock
);
1147 /** Cleanup req replay fields */
1148 if (exp
->exp_req_replay_needed
) {
1149 spin_lock(&exp
->exp_lock
);
1150 exp
->exp_req_replay_needed
= 0;
1151 spin_unlock(&exp
->exp_lock
);
1152 LASSERT(atomic_read(&obd
->obd_req_replay_clients
));
1153 atomic_dec(&obd
->obd_req_replay_clients
);
1155 /** Cleanup lock replay data */
1156 if (exp
->exp_lock_replay_needed
) {
1157 spin_lock(&exp
->exp_lock
);
1158 exp
->exp_lock_replay_needed
= 0;
1159 spin_unlock(&exp
->exp_lock
);
1160 LASSERT(atomic_read(&obd
->obd_lock_replay_clients
));
1161 atomic_dec(&obd
->obd_lock_replay_clients
);
1165 /* This function removes 1-3 references from the export:
1166 * 1 - for export pointer passed
1167 * and if disconnect really need
1168 * 2 - removing from hash
1169 * 3 - in client_unlink_export
1170 * The export pointer passed to this function can destroyed */
1171 int class_disconnect(struct obd_export
*export
)
1173 int already_disconnected
;
1175 if (export
== NULL
) {
1176 CWARN("attempting to free NULL export %p\n", export
);
1180 spin_lock(&export
->exp_lock
);
1181 already_disconnected
= export
->exp_disconnected
;
1182 export
->exp_disconnected
= 1;
1183 spin_unlock(&export
->exp_lock
);
1185 /* class_cleanup(), abort_recovery(), and class_fail_export()
1186 * all end up in here, and if any of them race we shouldn't
1187 * call extra class_export_puts(). */
1188 if (already_disconnected
) {
1189 LASSERT(hlist_unhashed(&export
->exp_nid_hash
));
1190 GOTO(no_disconn
, already_disconnected
);
1193 CDEBUG(D_IOCTL
, "disconnect: cookie %#llx\n",
1194 export
->exp_handle
.h_cookie
);
1196 if (!hlist_unhashed(&export
->exp_nid_hash
))
1197 cfs_hash_del(export
->exp_obd
->obd_nid_hash
,
1198 &export
->exp_connection
->c_peer
.nid
,
1199 &export
->exp_nid_hash
);
1201 class_export_recovery_cleanup(export
);
1202 class_unlink_export(export
);
1204 class_export_put(export
);
1207 EXPORT_SYMBOL(class_disconnect
);
1209 /* Return non-zero for a fully connected export */
1210 int class_connected_export(struct obd_export
*exp
)
1214 spin_lock(&exp
->exp_lock
);
1215 connected
= (exp
->exp_conn_cnt
> 0);
1216 spin_unlock(&exp
->exp_lock
);
1221 EXPORT_SYMBOL(class_connected_export
);
1223 static void class_disconnect_export_list(struct list_head
*list
,
1224 enum obd_option flags
)
1227 struct obd_export
*exp
;
1229 /* It's possible that an export may disconnect itself, but
1230 * nothing else will be added to this list. */
1231 while (!list_empty(list
)) {
1232 exp
= list_entry(list
->next
, struct obd_export
,
1234 /* need for safe call CDEBUG after obd_disconnect */
1235 class_export_get(exp
);
1237 spin_lock(&exp
->exp_lock
);
1238 exp
->exp_flags
= flags
;
1239 spin_unlock(&exp
->exp_lock
);
1241 if (obd_uuid_equals(&exp
->exp_client_uuid
,
1242 &exp
->exp_obd
->obd_uuid
)) {
1244 "exp %p export uuid == obd uuid, don't discon\n",
1246 /* Need to delete this now so we don't end up pointing
1247 * to work_list later when this export is cleaned up. */
1248 list_del_init(&exp
->exp_obd_chain
);
1249 class_export_put(exp
);
1253 class_export_get(exp
);
1254 CDEBUG(D_HA
, "%s: disconnecting export at %s (%p), "
1255 "last request at "CFS_TIME_T
"\n",
1256 exp
->exp_obd
->obd_name
, obd_export_nid2str(exp
),
1257 exp
, exp
->exp_last_request_time
);
1258 /* release one export reference anyway */
1259 rc
= obd_disconnect(exp
);
1261 CDEBUG(D_HA
, "disconnected export at %s (%p): rc %d\n",
1262 obd_export_nid2str(exp
), exp
, rc
);
1263 class_export_put(exp
);
1267 void class_disconnect_exports(struct obd_device
*obd
)
1269 struct list_head work_list
;
1271 /* Move all of the exports from obd_exports to a work list, en masse. */
1272 INIT_LIST_HEAD(&work_list
);
1273 spin_lock(&obd
->obd_dev_lock
);
1274 list_splice_init(&obd
->obd_exports
, &work_list
);
1275 list_splice_init(&obd
->obd_delayed_exports
, &work_list
);
1276 spin_unlock(&obd
->obd_dev_lock
);
1278 if (!list_empty(&work_list
)) {
1279 CDEBUG(D_HA
, "OBD device %d (%p) has exports, "
1280 "disconnecting them\n", obd
->obd_minor
, obd
);
1281 class_disconnect_export_list(&work_list
,
1282 exp_flags_from_obd(obd
));
1284 CDEBUG(D_HA
, "OBD device %d (%p) has no exports\n",
1285 obd
->obd_minor
, obd
);
1287 EXPORT_SYMBOL(class_disconnect_exports
);
1289 /* Remove exports that have not completed recovery.
1291 void class_disconnect_stale_exports(struct obd_device
*obd
,
1292 int (*test_export
)(struct obd_export
*))
1294 struct list_head work_list
;
1295 struct obd_export
*exp
, *n
;
1298 INIT_LIST_HEAD(&work_list
);
1299 spin_lock(&obd
->obd_dev_lock
);
1300 list_for_each_entry_safe(exp
, n
, &obd
->obd_exports
,
1302 /* don't count self-export as client */
1303 if (obd_uuid_equals(&exp
->exp_client_uuid
,
1304 &exp
->exp_obd
->obd_uuid
))
1307 /* don't evict clients which have no slot in last_rcvd
1308 * (e.g. lightweight connection) */
1309 if (exp
->exp_target_data
.ted_lr_idx
== -1)
1312 spin_lock(&exp
->exp_lock
);
1313 if (exp
->exp_failed
|| test_export(exp
)) {
1314 spin_unlock(&exp
->exp_lock
);
1317 exp
->exp_failed
= 1;
1318 spin_unlock(&exp
->exp_lock
);
1320 list_move(&exp
->exp_obd_chain
, &work_list
);
1322 CDEBUG(D_HA
, "%s: disconnect stale client %s@%s\n",
1323 obd
->obd_name
, exp
->exp_client_uuid
.uuid
,
1324 exp
->exp_connection
== NULL
? "<unknown>" :
1325 libcfs_nid2str(exp
->exp_connection
->c_peer
.nid
));
1326 print_export_data(exp
, "EVICTING", 0);
1328 spin_unlock(&obd
->obd_dev_lock
);
1331 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1332 obd
->obd_name
, evicted
);
1334 class_disconnect_export_list(&work_list
, exp_flags_from_obd(obd
) |
1335 OBD_OPT_ABORT_RECOV
);
1337 EXPORT_SYMBOL(class_disconnect_stale_exports
);
1339 void class_fail_export(struct obd_export
*exp
)
1341 int rc
, already_failed
;
1343 spin_lock(&exp
->exp_lock
);
1344 already_failed
= exp
->exp_failed
;
1345 exp
->exp_failed
= 1;
1346 spin_unlock(&exp
->exp_lock
);
1348 if (already_failed
) {
1349 CDEBUG(D_HA
, "disconnecting dead export %p/%s; skipping\n",
1350 exp
, exp
->exp_client_uuid
.uuid
);
1354 CDEBUG(D_HA
, "disconnecting export %p/%s\n",
1355 exp
, exp
->exp_client_uuid
.uuid
);
1357 if (obd_dump_on_timeout
)
1358 libcfs_debug_dumplog();
1360 /* need for safe call CDEBUG after obd_disconnect */
1361 class_export_get(exp
);
1363 /* Most callers into obd_disconnect are removing their own reference
1364 * (request, for example) in addition to the one from the hash table.
1365 * We don't have such a reference here, so make one. */
1366 class_export_get(exp
);
1367 rc
= obd_disconnect(exp
);
1369 CERROR("disconnecting export %p failed: %d\n", exp
, rc
);
1371 CDEBUG(D_HA
, "disconnected export %p/%s\n",
1372 exp
, exp
->exp_client_uuid
.uuid
);
1373 class_export_put(exp
);
1375 EXPORT_SYMBOL(class_fail_export
);
1377 char *obd_export_nid2str(struct obd_export
*exp
)
1379 if (exp
->exp_connection
!= NULL
)
1380 return libcfs_nid2str(exp
->exp_connection
->c_peer
.nid
);
1384 EXPORT_SYMBOL(obd_export_nid2str
);
1386 int obd_export_evict_by_nid(struct obd_device
*obd
, const char *nid
)
1388 struct cfs_hash
*nid_hash
;
1389 struct obd_export
*doomed_exp
= NULL
;
1390 int exports_evicted
= 0;
1392 lnet_nid_t nid_key
= libcfs_str2nid((char *)nid
);
1394 spin_lock(&obd
->obd_dev_lock
);
1395 /* umount has run already, so evict thread should leave
1396 * its task to umount thread now */
1397 if (obd
->obd_stopping
) {
1398 spin_unlock(&obd
->obd_dev_lock
);
1399 return exports_evicted
;
1401 nid_hash
= obd
->obd_nid_hash
;
1402 cfs_hash_getref(nid_hash
);
1403 spin_unlock(&obd
->obd_dev_lock
);
1406 doomed_exp
= cfs_hash_lookup(nid_hash
, &nid_key
);
1407 if (doomed_exp
== NULL
)
1410 LASSERTF(doomed_exp
->exp_connection
->c_peer
.nid
== nid_key
,
1411 "nid %s found, wanted nid %s, requested nid %s\n",
1412 obd_export_nid2str(doomed_exp
),
1413 libcfs_nid2str(nid_key
), nid
);
1414 LASSERTF(doomed_exp
!= obd
->obd_self_export
,
1415 "self-export is hashed by NID?\n");
1417 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1418 "request\n", obd
->obd_name
,
1419 obd_uuid2str(&doomed_exp
->exp_client_uuid
),
1420 obd_export_nid2str(doomed_exp
));
1421 class_fail_export(doomed_exp
);
1422 class_export_put(doomed_exp
);
1425 cfs_hash_putref(nid_hash
);
1427 if (!exports_evicted
)
1428 CDEBUG(D_HA
,"%s: can't disconnect NID '%s': no exports found\n",
1429 obd
->obd_name
, nid
);
1430 return exports_evicted
;
1432 EXPORT_SYMBOL(obd_export_evict_by_nid
);
1434 int obd_export_evict_by_uuid(struct obd_device
*obd
, const char *uuid
)
1436 struct cfs_hash
*uuid_hash
;
1437 struct obd_export
*doomed_exp
= NULL
;
1438 struct obd_uuid doomed_uuid
;
1439 int exports_evicted
= 0;
1441 spin_lock(&obd
->obd_dev_lock
);
1442 if (obd
->obd_stopping
) {
1443 spin_unlock(&obd
->obd_dev_lock
);
1444 return exports_evicted
;
1446 uuid_hash
= obd
->obd_uuid_hash
;
1447 cfs_hash_getref(uuid_hash
);
1448 spin_unlock(&obd
->obd_dev_lock
);
1450 obd_str2uuid(&doomed_uuid
, uuid
);
1451 if (obd_uuid_equals(&doomed_uuid
, &obd
->obd_uuid
)) {
1452 CERROR("%s: can't evict myself\n", obd
->obd_name
);
1453 cfs_hash_putref(uuid_hash
);
1454 return exports_evicted
;
1457 doomed_exp
= cfs_hash_lookup(uuid_hash
, &doomed_uuid
);
1459 if (doomed_exp
== NULL
) {
1460 CERROR("%s: can't disconnect %s: no exports found\n",
1461 obd
->obd_name
, uuid
);
1463 CWARN("%s: evicting %s at administrative request\n",
1464 obd
->obd_name
, doomed_exp
->exp_client_uuid
.uuid
);
1465 class_fail_export(doomed_exp
);
1466 class_export_put(doomed_exp
);
1469 cfs_hash_putref(uuid_hash
);
1471 return exports_evicted
;
1473 EXPORT_SYMBOL(obd_export_evict_by_uuid
);
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; print_export_data() calls it, when it
 * is set and lock dumping was requested, after logging the export. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1480 static void print_export_data(struct obd_export
*exp
, const char *status
,
1483 struct ptlrpc_reply_state
*rs
;
1484 struct ptlrpc_reply_state
*first_reply
= NULL
;
1487 spin_lock(&exp
->exp_lock
);
1488 list_for_each_entry(rs
, &exp
->exp_outstanding_replies
,
1494 spin_unlock(&exp
->exp_lock
);
1496 CDEBUG(D_HA
, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1497 exp
->exp_obd
->obd_name
, status
, exp
, exp
->exp_client_uuid
.uuid
,
1498 obd_export_nid2str(exp
), atomic_read(&exp
->exp_refcount
),
1499 atomic_read(&exp
->exp_rpc_count
),
1500 atomic_read(&exp
->exp_cb_count
),
1501 atomic_read(&exp
->exp_locks_count
),
1502 exp
->exp_disconnected
, exp
->exp_delayed
, exp
->exp_failed
,
1503 nreplies
, first_reply
, nreplies
> 3 ? "..." : "",
1504 exp
->exp_last_committed
);
1505 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1506 if (locks
&& class_export_dump_hook
!= NULL
)
1507 class_export_dump_hook(exp
);
1511 void dump_exports(struct obd_device
*obd
, int locks
)
1513 struct obd_export
*exp
;
1515 spin_lock(&obd
->obd_dev_lock
);
1516 list_for_each_entry(exp
, &obd
->obd_exports
, exp_obd_chain
)
1517 print_export_data(exp
, "ACTIVE", locks
);
1518 list_for_each_entry(exp
, &obd
->obd_unlinked_exports
, exp_obd_chain
)
1519 print_export_data(exp
, "UNLINKED", locks
);
1520 list_for_each_entry(exp
, &obd
->obd_delayed_exports
, exp_obd_chain
)
1521 print_export_data(exp
, "DELAYED", locks
);
1522 spin_unlock(&obd
->obd_dev_lock
);
1523 spin_lock(&obd_zombie_impexp_lock
);
1524 list_for_each_entry(exp
, &obd_zombie_exports
, exp_obd_chain
)
1525 print_export_data(exp
, "ZOMBIE", locks
);
1526 spin_unlock(&obd_zombie_impexp_lock
);
1528 EXPORT_SYMBOL(dump_exports
);
1530 void obd_exports_barrier(struct obd_device
*obd
)
1533 LASSERT(list_empty(&obd
->obd_exports
));
1534 spin_lock(&obd
->obd_dev_lock
);
1535 while (!list_empty(&obd
->obd_unlinked_exports
)) {
1536 spin_unlock(&obd
->obd_dev_lock
);
1537 set_current_state(TASK_UNINTERRUPTIBLE
);
1538 schedule_timeout(cfs_time_seconds(waited
));
1539 if (waited
> 5 && IS_PO2(waited
)) {
1540 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1541 "more than %d seconds. "
1542 "The obd refcount = %d. Is it stuck?\n",
1543 obd
->obd_name
, waited
,
1544 atomic_read(&obd
->obd_refcount
));
1545 dump_exports(obd
, 1);
1548 spin_lock(&obd
->obd_dev_lock
);
1550 spin_unlock(&obd
->obd_dev_lock
);
1552 EXPORT_SYMBOL(obd_exports_barrier
);
/* Total number of zombie imports and exports still to be destroyed.
 * Read and written under obd_zombie_impexp_lock. Statics are
 * zero-initialized, so no explicit initializer is needed. */
static int zombies_count;
1558 * kill zombie imports and exports
1560 void obd_zombie_impexp_cull(void)
1562 struct obd_import
*import
;
1563 struct obd_export
*export
;
1566 spin_lock(&obd_zombie_impexp_lock
);
1569 if (!list_empty(&obd_zombie_imports
)) {
1570 import
= list_entry(obd_zombie_imports
.next
,
1573 list_del_init(&import
->imp_zombie_chain
);
1577 if (!list_empty(&obd_zombie_exports
)) {
1578 export
= list_entry(obd_zombie_exports
.next
,
1581 list_del_init(&export
->exp_obd_chain
);
1584 spin_unlock(&obd_zombie_impexp_lock
);
1586 if (import
!= NULL
) {
1587 class_import_destroy(import
);
1588 spin_lock(&obd_zombie_impexp_lock
);
1590 spin_unlock(&obd_zombie_impexp_lock
);
1593 if (export
!= NULL
) {
1594 class_export_destroy(export
);
1595 spin_lock(&obd_zombie_impexp_lock
);
1597 spin_unlock(&obd_zombie_impexp_lock
);
1601 } while (import
!= NULL
|| export
!= NULL
);
1604 static struct completion obd_zombie_start
;
1605 static struct completion obd_zombie_stop
;
1606 static unsigned long obd_zombie_flags
;
1607 static wait_queue_head_t obd_zombie_waitq
;
1608 static pid_t obd_zombie_pid
;
1611 OBD_ZOMBIE_STOP
= 0x0001,
1615 * check for work for kill zombie import/export thread.
1617 static int obd_zombie_impexp_check(void *arg
)
1621 spin_lock(&obd_zombie_impexp_lock
);
1622 rc
= (zombies_count
== 0) &&
1623 !test_bit(OBD_ZOMBIE_STOP
, &obd_zombie_flags
);
1624 spin_unlock(&obd_zombie_impexp_lock
);
1630 * Add export to the obd_zombie thread and notify it.
1632 static void obd_zombie_export_add(struct obd_export
*exp
) {
1633 spin_lock(&exp
->exp_obd
->obd_dev_lock
);
1634 LASSERT(!list_empty(&exp
->exp_obd_chain
));
1635 list_del_init(&exp
->exp_obd_chain
);
1636 spin_unlock(&exp
->exp_obd
->obd_dev_lock
);
1637 spin_lock(&obd_zombie_impexp_lock
);
1639 list_add(&exp
->exp_obd_chain
, &obd_zombie_exports
);
1640 spin_unlock(&obd_zombie_impexp_lock
);
1642 obd_zombie_impexp_notify();
1646 * Add import to the obd_zombie thread and notify it.
1648 static void obd_zombie_import_add(struct obd_import
*imp
) {
1649 LASSERT(imp
->imp_sec
== NULL
);
1650 LASSERT(imp
->imp_rq_pool
== NULL
);
1651 spin_lock(&obd_zombie_impexp_lock
);
1652 LASSERT(list_empty(&imp
->imp_zombie_chain
));
1654 list_add(&imp
->imp_zombie_chain
, &obd_zombie_imports
);
1655 spin_unlock(&obd_zombie_impexp_lock
);
1657 obd_zombie_impexp_notify();
1661 * notify import/export destroy thread about new zombie.
1663 static void obd_zombie_impexp_notify(void)
1666 * Make sure obd_zombie_impexp_thread get this notification.
1667 * It is possible this signal only get by obd_zombie_barrier, and
1668 * barrier gulps this notification and sleeps away and hangs ensues
1670 wake_up_all(&obd_zombie_waitq
);
1674 * check whether obd_zombie is idle
1676 static int obd_zombie_is_idle(void)
1680 LASSERT(!test_bit(OBD_ZOMBIE_STOP
, &obd_zombie_flags
));
1681 spin_lock(&obd_zombie_impexp_lock
);
1682 rc
= (zombies_count
== 0);
1683 spin_unlock(&obd_zombie_impexp_lock
);
1688 * wait when obd_zombie import/export queues become empty
1690 void obd_zombie_barrier(void)
1692 struct l_wait_info lwi
= { 0 };
1694 if (obd_zombie_pid
== current_pid())
1695 /* don't wait for myself */
1697 l_wait_event(obd_zombie_waitq
, obd_zombie_is_idle(), &lwi
);
1699 EXPORT_SYMBOL(obd_zombie_barrier
);
1703 * destroy zombie export/import thread.
1705 static int obd_zombie_impexp_thread(void *unused
)
1707 unshare_fs_struct();
1708 complete(&obd_zombie_start
);
1710 obd_zombie_pid
= current_pid();
1712 while (!test_bit(OBD_ZOMBIE_STOP
, &obd_zombie_flags
)) {
1713 struct l_wait_info lwi
= { 0 };
1715 l_wait_event(obd_zombie_waitq
,
1716 !obd_zombie_impexp_check(NULL
), &lwi
);
1717 obd_zombie_impexp_cull();
1720 * Notify obd_zombie_barrier callers that queues
1723 wake_up(&obd_zombie_waitq
);
1726 complete(&obd_zombie_stop
);
1733 * start destroy zombie import/export thread
1735 int obd_zombie_impexp_init(void)
1737 struct task_struct
*task
;
1739 INIT_LIST_HEAD(&obd_zombie_imports
);
1740 INIT_LIST_HEAD(&obd_zombie_exports
);
1741 spin_lock_init(&obd_zombie_impexp_lock
);
1742 init_completion(&obd_zombie_start
);
1743 init_completion(&obd_zombie_stop
);
1744 init_waitqueue_head(&obd_zombie_waitq
);
1747 task
= kthread_run(obd_zombie_impexp_thread
, NULL
, "obd_zombid");
1749 return PTR_ERR(task
);
1751 wait_for_completion(&obd_zombie_start
);
1755 * stop destroy zombie import/export thread
1757 void obd_zombie_impexp_stop(void)
1759 set_bit(OBD_ZOMBIE_STOP
, &obd_zombie_flags
);
1760 obd_zombie_impexp_notify();
1761 wait_for_completion(&obd_zombie_stop
);
1764 /***** Kernel-userspace comm helpers *******/
1766 /* Get length of entire message, including header */
1767 int kuc_len(int payload_len
)
1769 return sizeof(struct kuc_hdr
) + payload_len
;
1771 EXPORT_SYMBOL(kuc_len
);
1773 /* Get a pointer to kuc header, given a ptr to the payload
1774 * @param p Pointer to payload area
1775 * @returns Pointer to kuc header
1777 struct kuc_hdr
* kuc_ptr(void *p
)
1779 struct kuc_hdr
*lh
= ((struct kuc_hdr
*)p
) - 1;
1780 LASSERT(lh
->kuc_magic
== KUC_MAGIC
);
1783 EXPORT_SYMBOL(kuc_ptr
);
1785 /* Test if payload is part of kuc message
1786 * @param p Pointer to payload area
1789 int kuc_ispayload(void *p
)
1791 struct kuc_hdr
*kh
= ((struct kuc_hdr
*)p
) - 1;
1793 if (kh
->kuc_magic
== KUC_MAGIC
)
1798 EXPORT_SYMBOL(kuc_ispayload
);
1800 /* Alloc space for a message, and fill in header
1801 * @return Pointer to payload area
1803 void *kuc_alloc(int payload_len
, int transport
, int type
)
1806 int len
= kuc_len(payload_len
);
1810 return ERR_PTR(-ENOMEM
);
1812 lh
->kuc_magic
= KUC_MAGIC
;
1813 lh
->kuc_transport
= transport
;
1814 lh
->kuc_msgtype
= type
;
1815 lh
->kuc_msglen
= len
;
1817 return (void *)(lh
+ 1);
1819 EXPORT_SYMBOL(kuc_alloc
);
/* Free a message given a pointer to its payload area
 * @param p           Pointer to payload area
 * @param payload_len Payload length, used to compute the size passed to
 *                    OBD_FREE(); presumably must match the value given
 *                    at allocation time - verify against callers
 */
inline void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);