4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
46 spinlock_t obd_types_lock
;
48 static struct kmem_cache
*obd_device_cachep
;
49 struct kmem_cache
*obdo_cachep
;
50 EXPORT_SYMBOL(obdo_cachep
);
51 static struct kmem_cache
*import_cachep
;
53 static struct list_head obd_zombie_imports
;
54 static struct list_head obd_zombie_exports
;
55 static spinlock_t obd_zombie_impexp_lock
;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export
*exp
);
58 static void obd_zombie_import_add(struct obd_import
*imp
);
59 static void print_export_data(struct obd_export
*exp
,
60 const char *status
, int locks
);
62 int (*ptlrpc_put_connection_superhack
)(struct ptlrpc_connection
*c
);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack
);
66 * support functions: we could use inter-module communication, but this
67 * is more portable to other OS's
69 static struct obd_device
*obd_device_alloc(void)
71 struct obd_device
*obd
;
73 OBD_SLAB_ALLOC_PTR_GFP(obd
, obd_device_cachep
, GFP_NOFS
);
75 obd
->obd_magic
= OBD_DEVICE_MAGIC
;
79 static void obd_device_free(struct obd_device
*obd
)
82 LASSERTF(obd
->obd_magic
== OBD_DEVICE_MAGIC
, "obd %p obd_magic %08x != %08x\n",
83 obd
, obd
->obd_magic
, OBD_DEVICE_MAGIC
);
84 if (obd
->obd_namespace
!= NULL
) {
85 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86 obd
, obd
->obd_namespace
, obd
->obd_force
);
89 lu_ref_fini(&obd
->obd_reference
);
90 OBD_SLAB_FREE_PTR(obd
, obd_device_cachep
);
93 struct obd_type
*class_search_type(const char *name
)
95 struct list_head
*tmp
;
96 struct obd_type
*type
;
98 spin_lock(&obd_types_lock
);
99 list_for_each(tmp
, &obd_types
) {
100 type
= list_entry(tmp
, struct obd_type
, typ_chain
);
101 if (strcmp(type
->typ_name
, name
) == 0) {
102 spin_unlock(&obd_types_lock
);
106 spin_unlock(&obd_types_lock
);
109 EXPORT_SYMBOL(class_search_type
);
111 struct obd_type
*class_get_type(const char *name
)
113 struct obd_type
*type
= class_search_type(name
);
116 const char *modname
= name
;
118 if (strcmp(modname
, "obdfilter") == 0)
121 if (strcmp(modname
, LUSTRE_LWP_NAME
) == 0)
122 modname
= LUSTRE_OSP_NAME
;
124 if (!strncmp(modname
, LUSTRE_MDS_NAME
, strlen(LUSTRE_MDS_NAME
)))
125 modname
= LUSTRE_MDT_NAME
;
127 if (!request_module("%s", modname
)) {
128 CDEBUG(D_INFO
, "Loaded module '%s'\n", modname
);
129 type
= class_search_type(name
);
131 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
136 spin_lock(&type
->obd_type_lock
);
138 try_module_get(type
->typ_dt_ops
->o_owner
);
139 spin_unlock(&type
->obd_type_lock
);
143 EXPORT_SYMBOL(class_get_type
);
145 void class_put_type(struct obd_type
*type
)
148 spin_lock(&type
->obd_type_lock
);
150 module_put(type
->typ_dt_ops
->o_owner
);
151 spin_unlock(&type
->obd_type_lock
);
153 EXPORT_SYMBOL(class_put_type
);
155 #define CLASS_MAX_NAME 1024
157 int class_register_type(struct obd_ops
*dt_ops
, struct md_ops
*md_ops
,
159 struct lu_device_type
*ldt
)
161 struct obd_type
*type
;
165 LASSERT(strnlen(name
, CLASS_MAX_NAME
) < CLASS_MAX_NAME
);
167 if (class_search_type(name
)) {
168 CDEBUG(D_IOCTL
, "Type %s already registered\n", name
);
173 type
= kzalloc(sizeof(*type
), GFP_NOFS
);
177 type
->typ_dt_ops
= kzalloc(sizeof(*type
->typ_dt_ops
), GFP_NOFS
);
178 type
->typ_md_ops
= kzalloc(sizeof(*type
->typ_md_ops
), GFP_NOFS
);
179 type
->typ_name
= kzalloc(strlen(name
) + 1, GFP_NOFS
);
181 if (type
->typ_dt_ops
== NULL
||
182 type
->typ_md_ops
== NULL
||
183 type
->typ_name
== NULL
)
186 *(type
->typ_dt_ops
) = *dt_ops
;
187 /* md_ops is optional */
189 *(type
->typ_md_ops
) = *md_ops
;
190 strcpy(type
->typ_name
, name
);
191 spin_lock_init(&type
->obd_type_lock
);
193 type
->typ_debugfs_entry
= ldebugfs_register(type
->typ_name
,
196 if (IS_ERR_OR_NULL(type
->typ_debugfs_entry
)) {
197 rc
= type
->typ_debugfs_entry
? PTR_ERR(type
->typ_debugfs_entry
)
199 type
->typ_debugfs_entry
= NULL
;
203 type
->typ_kobj
= kobject_create_and_add(type
->typ_name
, lustre_kobj
);
204 if (!type
->typ_kobj
) {
211 rc
= lu_device_type_init(ldt
);
216 spin_lock(&obd_types_lock
);
217 list_add(&type
->typ_chain
, &obd_types
);
218 spin_unlock(&obd_types_lock
);
224 kobject_put(type
->typ_kobj
);
225 kfree(type
->typ_name
);
226 kfree(type
->typ_md_ops
);
227 kfree(type
->typ_dt_ops
);
231 EXPORT_SYMBOL(class_register_type
);
233 int class_unregister_type(const char *name
)
235 struct obd_type
*type
= class_search_type(name
);
238 CERROR("unknown obd type\n");
242 if (type
->typ_refcnt
) {
243 CERROR("type %s has refcount (%d)\n", name
, type
->typ_refcnt
);
244 /* This is a bad situation, let's make the best of it */
245 /* Remove ops, but leave the name for debugging */
246 kfree(type
->typ_dt_ops
);
247 kfree(type
->typ_md_ops
);
252 kobject_put(type
->typ_kobj
);
254 if (!IS_ERR_OR_NULL(type
->typ_debugfs_entry
))
255 ldebugfs_remove(&type
->typ_debugfs_entry
);
258 lu_device_type_fini(type
->typ_lu
);
260 spin_lock(&obd_types_lock
);
261 list_del(&type
->typ_chain
);
262 spin_unlock(&obd_types_lock
);
263 kfree(type
->typ_name
);
264 kfree(type
->typ_dt_ops
);
265 kfree(type
->typ_md_ops
);
268 } /* class_unregister_type */
269 EXPORT_SYMBOL(class_unregister_type
);
272 * Create a new obd device.
274 * Find an empty slot in ::obd_devs[], create a new obd device in it.
276 * \param[in] type_name obd device type string.
277 * \param[in] name obd device name.
279 * \retval NULL if create fails, otherwise return the obd device
282 struct obd_device
*class_newdev(const char *type_name
, const char *name
)
284 struct obd_device
*result
= NULL
;
285 struct obd_device
*newdev
;
286 struct obd_type
*type
= NULL
;
288 int new_obd_minor
= 0;
290 if (strlen(name
) >= MAX_OBD_NAME
) {
291 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME
);
292 return ERR_PTR(-EINVAL
);
295 type
= class_get_type(type_name
);
297 CERROR("OBD: unknown type: %s\n", type_name
);
298 return ERR_PTR(-ENODEV
);
301 newdev
= obd_device_alloc();
302 if (newdev
== NULL
) {
303 result
= ERR_PTR(-ENOMEM
);
307 LASSERT(newdev
->obd_magic
== OBD_DEVICE_MAGIC
);
309 write_lock(&obd_dev_lock
);
310 for (i
= 0; i
< class_devno_max(); i
++) {
311 struct obd_device
*obd
= class_num2obd(i
);
313 if (obd
&& (strcmp(name
, obd
->obd_name
) == 0)) {
314 CERROR("Device %s already exists at %d, won't add\n",
317 LASSERTF(result
->obd_magic
== OBD_DEVICE_MAGIC
,
318 "%p obd_magic %08x != %08x\n", result
,
319 result
->obd_magic
, OBD_DEVICE_MAGIC
);
320 LASSERTF(result
->obd_minor
== new_obd_minor
,
321 "%p obd_minor %d != %d\n", result
,
322 result
->obd_minor
, new_obd_minor
);
324 obd_devs
[result
->obd_minor
] = NULL
;
325 result
->obd_name
[0] = '\0';
327 result
= ERR_PTR(-EEXIST
);
330 if (!result
&& !obd
) {
332 result
->obd_minor
= i
;
334 result
->obd_type
= type
;
335 strncpy(result
->obd_name
, name
,
336 sizeof(result
->obd_name
) - 1);
337 obd_devs
[i
] = result
;
340 write_unlock(&obd_dev_lock
);
342 if (result
== NULL
&& i
>= class_devno_max()) {
343 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
345 result
= ERR_PTR(-EOVERFLOW
);
352 CDEBUG(D_IOCTL
, "Adding new device %s (%p)\n",
353 result
->obd_name
, result
);
357 obd_device_free(newdev
);
359 class_put_type(type
);
363 void class_release_dev(struct obd_device
*obd
)
365 struct obd_type
*obd_type
= obd
->obd_type
;
367 LASSERTF(obd
->obd_magic
== OBD_DEVICE_MAGIC
, "%p obd_magic %08x != %08x\n",
368 obd
, obd
->obd_magic
, OBD_DEVICE_MAGIC
);
369 LASSERTF(obd
== obd_devs
[obd
->obd_minor
], "obd %p != obd_devs[%d] %p\n",
370 obd
, obd
->obd_minor
, obd_devs
[obd
->obd_minor
]);
371 LASSERT(obd_type
!= NULL
);
373 CDEBUG(D_INFO
, "Release obd device %s at %d obd_type name =%s\n",
374 obd
->obd_name
, obd
->obd_minor
, obd
->obd_type
->typ_name
);
376 write_lock(&obd_dev_lock
);
377 obd_devs
[obd
->obd_minor
] = NULL
;
378 write_unlock(&obd_dev_lock
);
379 obd_device_free(obd
);
381 class_put_type(obd_type
);
384 int class_name2dev(const char *name
)
391 read_lock(&obd_dev_lock
);
392 for (i
= 0; i
< class_devno_max(); i
++) {
393 struct obd_device
*obd
= class_num2obd(i
);
395 if (obd
&& strcmp(name
, obd
->obd_name
) == 0) {
396 /* Make sure we finished attaching before we give
397 out any references */
398 LASSERT(obd
->obd_magic
== OBD_DEVICE_MAGIC
);
399 if (obd
->obd_attached
) {
400 read_unlock(&obd_dev_lock
);
406 read_unlock(&obd_dev_lock
);
410 EXPORT_SYMBOL(class_name2dev
);
412 struct obd_device
*class_name2obd(const char *name
)
414 int dev
= class_name2dev(name
);
416 if (dev
< 0 || dev
> class_devno_max())
418 return class_num2obd(dev
);
420 EXPORT_SYMBOL(class_name2obd
);
422 int class_uuid2dev(struct obd_uuid
*uuid
)
426 read_lock(&obd_dev_lock
);
427 for (i
= 0; i
< class_devno_max(); i
++) {
428 struct obd_device
*obd
= class_num2obd(i
);
430 if (obd
&& obd_uuid_equals(uuid
, &obd
->obd_uuid
)) {
431 LASSERT(obd
->obd_magic
== OBD_DEVICE_MAGIC
);
432 read_unlock(&obd_dev_lock
);
436 read_unlock(&obd_dev_lock
);
440 EXPORT_SYMBOL(class_uuid2dev
);
442 struct obd_device
*class_uuid2obd(struct obd_uuid
*uuid
)
444 int dev
= class_uuid2dev(uuid
);
447 return class_num2obd(dev
);
449 EXPORT_SYMBOL(class_uuid2obd
);
452 * Get obd device from ::obd_devs[]
454 * \param num [in] array index
456 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
457 * otherwise return the obd device there.
459 struct obd_device
*class_num2obd(int num
)
461 struct obd_device
*obd
= NULL
;
463 if (num
< class_devno_max()) {
468 LASSERTF(obd
->obd_magic
== OBD_DEVICE_MAGIC
,
469 "%p obd_magic %08x != %08x\n",
470 obd
, obd
->obd_magic
, OBD_DEVICE_MAGIC
);
471 LASSERTF(obd
->obd_minor
== num
,
472 "%p obd_minor %0d != %0d\n",
473 obd
, obd
->obd_minor
, num
);
478 EXPORT_SYMBOL(class_num2obd
);
481 * Get obd devices count. Device in any
483 * \retval obd device count
485 int get_devices_count(void)
487 int index
, max_index
= class_devno_max(), dev_count
= 0;
489 read_lock(&obd_dev_lock
);
490 for (index
= 0; index
<= max_index
; index
++) {
491 struct obd_device
*obd
= class_num2obd(index
);
495 read_unlock(&obd_dev_lock
);
499 EXPORT_SYMBOL(get_devices_count
);
501 void class_obd_list(void)
506 read_lock(&obd_dev_lock
);
507 for (i
= 0; i
< class_devno_max(); i
++) {
508 struct obd_device
*obd
= class_num2obd(i
);
512 if (obd
->obd_stopping
)
514 else if (obd
->obd_set_up
)
516 else if (obd
->obd_attached
)
520 LCONSOLE(D_CONFIG
, "%3d %s %s %s %s %d\n",
521 i
, status
, obd
->obd_type
->typ_name
,
522 obd
->obd_name
, obd
->obd_uuid
.uuid
,
523 atomic_read(&obd
->obd_refcount
));
525 read_unlock(&obd_dev_lock
);
529 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
530 specified, then only the client with that uuid is returned,
531 otherwise any client connected to the tgt is returned. */
532 struct obd_device
*class_find_client_obd(struct obd_uuid
*tgt_uuid
,
533 const char *typ_name
,
534 struct obd_uuid
*grp_uuid
)
538 read_lock(&obd_dev_lock
);
539 for (i
= 0; i
< class_devno_max(); i
++) {
540 struct obd_device
*obd
= class_num2obd(i
);
544 if ((strncmp(obd
->obd_type
->typ_name
, typ_name
,
545 strlen(typ_name
)) == 0)) {
546 if (obd_uuid_equals(tgt_uuid
,
547 &obd
->u
.cli
.cl_target_uuid
) &&
548 ((grp_uuid
)? obd_uuid_equals(grp_uuid
,
549 &obd
->obd_uuid
) : 1)) {
550 read_unlock(&obd_dev_lock
);
555 read_unlock(&obd_dev_lock
);
559 EXPORT_SYMBOL(class_find_client_obd
);
561 /* Iterate the obd_device list looking devices have grp_uuid. Start
562 searching at *next, and if a device is found, the next index to look
563 at is saved in *next. If next is NULL, then the first matching device
564 will always be returned. */
565 struct obd_device
*class_devices_in_group(struct obd_uuid
*grp_uuid
, int *next
)
571 else if (*next
>= 0 && *next
< class_devno_max())
576 read_lock(&obd_dev_lock
);
577 for (; i
< class_devno_max(); i
++) {
578 struct obd_device
*obd
= class_num2obd(i
);
582 if (obd_uuid_equals(grp_uuid
, &obd
->obd_uuid
)) {
585 read_unlock(&obd_dev_lock
);
589 read_unlock(&obd_dev_lock
);
593 EXPORT_SYMBOL(class_devices_in_group
);
596 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
597 * adjust sptlrpc settings accordingly.
599 int class_notify_sptlrpc_conf(const char *fsname
, int namelen
)
601 struct obd_device
*obd
;
605 LASSERT(namelen
> 0);
607 read_lock(&obd_dev_lock
);
608 for (i
= 0; i
< class_devno_max(); i
++) {
609 obd
= class_num2obd(i
);
611 if (obd
== NULL
|| obd
->obd_set_up
== 0 || obd
->obd_stopping
)
614 /* only notify mdc, osc, mdt, ost */
615 type
= obd
->obd_type
->typ_name
;
616 if (strcmp(type
, LUSTRE_MDC_NAME
) != 0 &&
617 strcmp(type
, LUSTRE_OSC_NAME
) != 0 &&
618 strcmp(type
, LUSTRE_MDT_NAME
) != 0 &&
619 strcmp(type
, LUSTRE_OST_NAME
) != 0)
622 if (strncmp(obd
->obd_name
, fsname
, namelen
))
625 class_incref(obd
, __func__
, obd
);
626 read_unlock(&obd_dev_lock
);
627 rc2
= obd_set_info_async(NULL
, obd
->obd_self_export
,
628 sizeof(KEY_SPTLRPC_CONF
),
629 KEY_SPTLRPC_CONF
, 0, NULL
, NULL
);
631 class_decref(obd
, __func__
, obd
);
632 read_lock(&obd_dev_lock
);
634 read_unlock(&obd_dev_lock
);
637 EXPORT_SYMBOL(class_notify_sptlrpc_conf
);
639 void obd_cleanup_caches(void)
641 if (obd_device_cachep
) {
642 kmem_cache_destroy(obd_device_cachep
);
643 obd_device_cachep
= NULL
;
646 kmem_cache_destroy(obdo_cachep
);
650 kmem_cache_destroy(import_cachep
);
651 import_cachep
= NULL
;
654 kmem_cache_destroy(capa_cachep
);
659 int obd_init_caches(void)
661 LASSERT(obd_device_cachep
== NULL
);
662 obd_device_cachep
= kmem_cache_create("ll_obd_dev_cache",
663 sizeof(struct obd_device
),
665 if (!obd_device_cachep
)
668 LASSERT(obdo_cachep
== NULL
);
669 obdo_cachep
= kmem_cache_create("ll_obdo_cache", sizeof(struct obdo
),
674 LASSERT(import_cachep
== NULL
);
675 import_cachep
= kmem_cache_create("ll_import_cache",
676 sizeof(struct obd_import
),
681 LASSERT(capa_cachep
== NULL
);
682 capa_cachep
= kmem_cache_create("capa_cache",
683 sizeof(struct obd_capa
), 0, 0, NULL
);
689 obd_cleanup_caches();
694 /* map connection to client */
695 struct obd_export
*class_conn2export(struct lustre_handle
*conn
)
697 struct obd_export
*export
;
700 CDEBUG(D_CACHE
, "looking for null handle\n");
704 if (conn
->cookie
== -1) { /* this means assign a new connection */
705 CDEBUG(D_CACHE
, "want a new connection\n");
709 CDEBUG(D_INFO
, "looking for export cookie %#llx\n", conn
->cookie
);
710 export
= class_handle2object(conn
->cookie
);
713 EXPORT_SYMBOL(class_conn2export
);
715 struct obd_device
*class_exp2obd(struct obd_export
*exp
)
721 EXPORT_SYMBOL(class_exp2obd
);
723 struct obd_device
*class_conn2obd(struct lustre_handle
*conn
)
725 struct obd_export
*export
;
726 export
= class_conn2export(conn
);
728 struct obd_device
*obd
= export
->exp_obd
;
729 class_export_put(export
);
734 EXPORT_SYMBOL(class_conn2obd
);
736 struct obd_import
*class_exp2cliimp(struct obd_export
*exp
)
738 struct obd_device
*obd
= exp
->exp_obd
;
741 return obd
->u
.cli
.cl_import
;
743 EXPORT_SYMBOL(class_exp2cliimp
);
745 struct obd_import
*class_conn2cliimp(struct lustre_handle
*conn
)
747 struct obd_device
*obd
= class_conn2obd(conn
);
750 return obd
->u
.cli
.cl_import
;
752 EXPORT_SYMBOL(class_conn2cliimp
);
754 /* Export management functions */
755 static void class_export_destroy(struct obd_export
*exp
)
757 struct obd_device
*obd
= exp
->exp_obd
;
759 LASSERT_ATOMIC_ZERO(&exp
->exp_refcount
);
760 LASSERT(obd
!= NULL
);
762 CDEBUG(D_IOCTL
, "destroying export %p/%s for %s\n", exp
,
763 exp
->exp_client_uuid
.uuid
, obd
->obd_name
);
765 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
766 if (exp
->exp_connection
)
767 ptlrpc_put_connection_superhack(exp
->exp_connection
);
769 LASSERT(list_empty(&exp
->exp_outstanding_replies
));
770 LASSERT(list_empty(&exp
->exp_uncommitted_replies
));
771 LASSERT(list_empty(&exp
->exp_req_replay_queue
));
772 LASSERT(list_empty(&exp
->exp_hp_rpcs
));
773 obd_destroy_export(exp
);
774 class_decref(obd
, "export", exp
);
776 OBD_FREE_RCU(exp
, sizeof(*exp
), &exp
->exp_handle
);
779 static void export_handle_addref(void *export
)
781 class_export_get(export
);
784 static struct portals_handle_ops export_handle_ops
= {
785 .hop_addref
= export_handle_addref
,
789 struct obd_export
*class_export_get(struct obd_export
*exp
)
791 atomic_inc(&exp
->exp_refcount
);
792 CDEBUG(D_INFO
, "GETting export %p : new refcount %d\n", exp
,
793 atomic_read(&exp
->exp_refcount
));
796 EXPORT_SYMBOL(class_export_get
);
798 void class_export_put(struct obd_export
*exp
)
800 LASSERT(exp
!= NULL
);
801 LASSERT_ATOMIC_GT_LT(&exp
->exp_refcount
, 0, LI_POISON
);
802 CDEBUG(D_INFO
, "PUTting export %p : new refcount %d\n", exp
,
803 atomic_read(&exp
->exp_refcount
) - 1);
805 if (atomic_dec_and_test(&exp
->exp_refcount
)) {
806 LASSERT(!list_empty(&exp
->exp_obd_chain
));
807 CDEBUG(D_IOCTL
, "final put %p/%s\n",
808 exp
, exp
->exp_client_uuid
.uuid
);
810 /* release nid stat refererence */
811 lprocfs_exp_cleanup(exp
);
813 obd_zombie_export_add(exp
);
816 EXPORT_SYMBOL(class_export_put
);
818 /* Creates a new export, adds it to the hash table, and returns a
819 * pointer to it. The refcount is 2: one for the hash reference, and
820 * one for the pointer returned by this function. */
821 struct obd_export
*class_new_export(struct obd_device
*obd
,
822 struct obd_uuid
*cluuid
)
824 struct obd_export
*export
;
825 struct cfs_hash
*hash
= NULL
;
828 export
= kzalloc(sizeof(*export
), GFP_NOFS
);
830 return ERR_PTR(-ENOMEM
);
832 export
->exp_conn_cnt
= 0;
833 export
->exp_lock_hash
= NULL
;
834 export
->exp_flock_hash
= NULL
;
835 atomic_set(&export
->exp_refcount
, 2);
836 atomic_set(&export
->exp_rpc_count
, 0);
837 atomic_set(&export
->exp_cb_count
, 0);
838 atomic_set(&export
->exp_locks_count
, 0);
839 #if LUSTRE_TRACKS_LOCK_EXP_REFS
840 INIT_LIST_HEAD(&export
->exp_locks_list
);
841 spin_lock_init(&export
->exp_locks_list_guard
);
843 atomic_set(&export
->exp_replay_count
, 0);
844 export
->exp_obd
= obd
;
845 INIT_LIST_HEAD(&export
->exp_outstanding_replies
);
846 spin_lock_init(&export
->exp_uncommitted_replies_lock
);
847 INIT_LIST_HEAD(&export
->exp_uncommitted_replies
);
848 INIT_LIST_HEAD(&export
->exp_req_replay_queue
);
849 INIT_LIST_HEAD(&export
->exp_handle
.h_link
);
850 INIT_LIST_HEAD(&export
->exp_hp_rpcs
);
851 class_handle_hash(&export
->exp_handle
, &export_handle_ops
);
852 export
->exp_last_request_time
= get_seconds();
853 spin_lock_init(&export
->exp_lock
);
854 spin_lock_init(&export
->exp_rpc_lock
);
855 INIT_HLIST_NODE(&export
->exp_uuid_hash
);
856 INIT_HLIST_NODE(&export
->exp_nid_hash
);
857 spin_lock_init(&export
->exp_bl_list_lock
);
858 INIT_LIST_HEAD(&export
->exp_bl_list
);
860 export
->exp_sp_peer
= LUSTRE_SP_ANY
;
861 export
->exp_flvr
.sf_rpc
= SPTLRPC_FLVR_INVALID
;
862 export
->exp_client_uuid
= *cluuid
;
863 obd_init_export(export
);
865 spin_lock(&obd
->obd_dev_lock
);
866 /* shouldn't happen, but might race */
867 if (obd
->obd_stopping
) {
872 hash
= cfs_hash_getref(obd
->obd_uuid_hash
);
877 spin_unlock(&obd
->obd_dev_lock
);
879 if (!obd_uuid_equals(cluuid
, &obd
->obd_uuid
)) {
880 rc
= cfs_hash_add_unique(hash
, cluuid
, &export
->exp_uuid_hash
);
882 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
883 obd
->obd_name
, cluuid
->uuid
, rc
);
889 spin_lock(&obd
->obd_dev_lock
);
890 if (obd
->obd_stopping
) {
891 cfs_hash_del(hash
, cluuid
, &export
->exp_uuid_hash
);
896 class_incref(obd
, "export", export
);
897 list_add(&export
->exp_obd_chain
, &export
->exp_obd
->obd_exports
);
898 list_add_tail(&export
->exp_obd_chain_timed
,
899 &export
->exp_obd
->obd_exports_timed
);
900 export
->exp_obd
->obd_num_exports
++;
901 spin_unlock(&obd
->obd_dev_lock
);
902 cfs_hash_putref(hash
);
906 spin_unlock(&obd
->obd_dev_lock
);
909 cfs_hash_putref(hash
);
910 class_handle_unhash(&export
->exp_handle
);
911 LASSERT(hlist_unhashed(&export
->exp_uuid_hash
));
912 obd_destroy_export(export
);
916 EXPORT_SYMBOL(class_new_export
);
918 void class_unlink_export(struct obd_export
*exp
)
920 class_handle_unhash(&exp
->exp_handle
);
922 spin_lock(&exp
->exp_obd
->obd_dev_lock
);
923 /* delete an uuid-export hashitem from hashtables */
924 if (!hlist_unhashed(&exp
->exp_uuid_hash
))
925 cfs_hash_del(exp
->exp_obd
->obd_uuid_hash
,
926 &exp
->exp_client_uuid
,
927 &exp
->exp_uuid_hash
);
929 list_move(&exp
->exp_obd_chain
, &exp
->exp_obd
->obd_unlinked_exports
);
930 list_del_init(&exp
->exp_obd_chain_timed
);
931 exp
->exp_obd
->obd_num_exports
--;
932 spin_unlock(&exp
->exp_obd
->obd_dev_lock
);
933 class_export_put(exp
);
935 EXPORT_SYMBOL(class_unlink_export
);
937 /* Import management functions */
938 static void class_import_destroy(struct obd_import
*imp
)
940 CDEBUG(D_IOCTL
, "destroying import %p for %s\n", imp
,
941 imp
->imp_obd
->obd_name
);
943 LASSERT_ATOMIC_ZERO(&imp
->imp_refcount
);
945 ptlrpc_put_connection_superhack(imp
->imp_connection
);
947 while (!list_empty(&imp
->imp_conn_list
)) {
948 struct obd_import_conn
*imp_conn
;
950 imp_conn
= list_entry(imp
->imp_conn_list
.next
,
951 struct obd_import_conn
, oic_item
);
952 list_del_init(&imp_conn
->oic_item
);
953 ptlrpc_put_connection_superhack(imp_conn
->oic_conn
);
957 LASSERT(imp
->imp_sec
== NULL
);
958 class_decref(imp
->imp_obd
, "import", imp
);
959 OBD_FREE_RCU(imp
, sizeof(*imp
), &imp
->imp_handle
);
962 static void import_handle_addref(void *import
)
964 class_import_get(import
);
967 static struct portals_handle_ops import_handle_ops
= {
968 .hop_addref
= import_handle_addref
,
972 struct obd_import
*class_import_get(struct obd_import
*import
)
974 atomic_inc(&import
->imp_refcount
);
975 CDEBUG(D_INFO
, "import %p refcount=%d obd=%s\n", import
,
976 atomic_read(&import
->imp_refcount
),
977 import
->imp_obd
->obd_name
);
980 EXPORT_SYMBOL(class_import_get
);
982 void class_import_put(struct obd_import
*imp
)
984 LASSERT(list_empty(&imp
->imp_zombie_chain
));
985 LASSERT_ATOMIC_GT_LT(&imp
->imp_refcount
, 0, LI_POISON
);
987 CDEBUG(D_INFO
, "import %p refcount=%d obd=%s\n", imp
,
988 atomic_read(&imp
->imp_refcount
) - 1,
989 imp
->imp_obd
->obd_name
);
991 if (atomic_dec_and_test(&imp
->imp_refcount
)) {
992 CDEBUG(D_INFO
, "final put import %p\n", imp
);
993 obd_zombie_import_add(imp
);
996 /* catch possible import put race */
997 LASSERT_ATOMIC_GE_LT(&imp
->imp_refcount
, 0, LI_POISON
);
999 EXPORT_SYMBOL(class_import_put
);
1001 static void init_imp_at(struct imp_at
*at
)
1004 at_init(&at
->iat_net_latency
, 0, 0);
1005 for (i
= 0; i
< IMP_AT_MAX_PORTALS
; i
++) {
1006 /* max service estimates are tracked on the server side, so
1007 don't use the AT history here, just use the last reported
1008 val. (But keep hist for proc histogram, worst_ever) */
1009 at_init(&at
->iat_service_estimate
[i
], INITIAL_CONNECT_TIMEOUT
,
1014 struct obd_import
*class_new_import(struct obd_device
*obd
)
1016 struct obd_import
*imp
;
1018 imp
= kzalloc(sizeof(*imp
), GFP_NOFS
);
1022 INIT_LIST_HEAD(&imp
->imp_pinger_chain
);
1023 INIT_LIST_HEAD(&imp
->imp_zombie_chain
);
1024 INIT_LIST_HEAD(&imp
->imp_replay_list
);
1025 INIT_LIST_HEAD(&imp
->imp_sending_list
);
1026 INIT_LIST_HEAD(&imp
->imp_delayed_list
);
1027 INIT_LIST_HEAD(&imp
->imp_committed_list
);
1028 imp
->imp_replay_cursor
= &imp
->imp_committed_list
;
1029 spin_lock_init(&imp
->imp_lock
);
1030 imp
->imp_last_success_conn
= 0;
1031 imp
->imp_state
= LUSTRE_IMP_NEW
;
1032 imp
->imp_obd
= class_incref(obd
, "import", imp
);
1033 mutex_init(&imp
->imp_sec_mutex
);
1034 init_waitqueue_head(&imp
->imp_recovery_waitq
);
1036 atomic_set(&imp
->imp_refcount
, 2);
1037 atomic_set(&imp
->imp_unregistering
, 0);
1038 atomic_set(&imp
->imp_inflight
, 0);
1039 atomic_set(&imp
->imp_replay_inflight
, 0);
1040 atomic_set(&imp
->imp_inval_count
, 0);
1041 INIT_LIST_HEAD(&imp
->imp_conn_list
);
1042 INIT_LIST_HEAD(&imp
->imp_handle
.h_link
);
1043 class_handle_hash(&imp
->imp_handle
, &import_handle_ops
);
1044 init_imp_at(&imp
->imp_at
);
1046 /* the default magic is V2, will be used in connect RPC, and
1047 * then adjusted according to the flags in request/reply. */
1048 imp
->imp_msg_magic
= LUSTRE_MSG_MAGIC_V2
;
1052 EXPORT_SYMBOL(class_new_import
);
1054 void class_destroy_import(struct obd_import
*import
)
1056 LASSERT(import
!= NULL
);
1057 LASSERT(import
!= LP_POISON
);
1059 class_handle_unhash(&import
->imp_handle
);
1061 spin_lock(&import
->imp_lock
);
1062 import
->imp_generation
++;
1063 spin_unlock(&import
->imp_lock
);
1064 class_import_put(import
);
1066 EXPORT_SYMBOL(class_destroy_import
);
1068 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1070 void __class_export_add_lock_ref(struct obd_export
*exp
, struct ldlm_lock
*lock
)
1072 spin_lock(&exp
->exp_locks_list_guard
);
1074 LASSERT(lock
->l_exp_refs_nr
>= 0);
1076 if (lock
->l_exp_refs_target
!= NULL
&&
1077 lock
->l_exp_refs_target
!= exp
) {
1078 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1079 exp
, lock
, lock
->l_exp_refs_target
);
1081 if ((lock
->l_exp_refs_nr
++) == 0) {
1082 list_add(&lock
->l_exp_refs_link
, &exp
->exp_locks_list
);
1083 lock
->l_exp_refs_target
= exp
;
1085 CDEBUG(D_INFO
, "lock = %p, export = %p, refs = %u\n",
1086 lock
, exp
, lock
->l_exp_refs_nr
);
1087 spin_unlock(&exp
->exp_locks_list_guard
);
1089 EXPORT_SYMBOL(__class_export_add_lock_ref
);
1091 void __class_export_del_lock_ref(struct obd_export
*exp
, struct ldlm_lock
*lock
)
1093 spin_lock(&exp
->exp_locks_list_guard
);
1094 LASSERT(lock
->l_exp_refs_nr
> 0);
1095 if (lock
->l_exp_refs_target
!= exp
) {
1096 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1097 lock
, lock
->l_exp_refs_target
, exp
);
1099 if (-- lock
->l_exp_refs_nr
== 0) {
1100 list_del_init(&lock
->l_exp_refs_link
);
1101 lock
->l_exp_refs_target
= NULL
;
1103 CDEBUG(D_INFO
, "lock = %p, export = %p, refs = %u\n",
1104 lock
, exp
, lock
->l_exp_refs_nr
);
1105 spin_unlock(&exp
->exp_locks_list_guard
);
1107 EXPORT_SYMBOL(__class_export_del_lock_ref
);
1110 /* A connection defines an export context in which preallocation can
1111 be managed. This releases the export pointer reference, and returns
1112 the export handle, so the export refcount is 1 when this function
1114 int class_connect(struct lustre_handle
*conn
, struct obd_device
*obd
,
1115 struct obd_uuid
*cluuid
)
1117 struct obd_export
*export
;
1118 LASSERT(conn
!= NULL
);
1119 LASSERT(obd
!= NULL
);
1120 LASSERT(cluuid
!= NULL
);
1122 export
= class_new_export(obd
, cluuid
);
1124 return PTR_ERR(export
);
1126 conn
->cookie
= export
->exp_handle
.h_cookie
;
1127 class_export_put(export
);
1129 CDEBUG(D_IOCTL
, "connect: client %s, cookie %#llx\n",
1130 cluuid
->uuid
, conn
->cookie
);
1133 EXPORT_SYMBOL(class_connect
);
1135 /* if export is involved in recovery then clean up related things */
1136 static void class_export_recovery_cleanup(struct obd_export
*exp
)
1138 struct obd_device
*obd
= exp
->exp_obd
;
1140 spin_lock(&obd
->obd_recovery_task_lock
);
1141 if (exp
->exp_delayed
)
1142 obd
->obd_delayed_clients
--;
1143 if (obd
->obd_recovering
) {
1144 if (exp
->exp_in_recovery
) {
1145 spin_lock(&exp
->exp_lock
);
1146 exp
->exp_in_recovery
= 0;
1147 spin_unlock(&exp
->exp_lock
);
1148 LASSERT_ATOMIC_POS(&obd
->obd_connected_clients
);
1149 atomic_dec(&obd
->obd_connected_clients
);
1152 /* if called during recovery then should update
1153 * obd_stale_clients counter,
1154 * lightweight exports are not counted */
1155 if (exp
->exp_failed
&&
1156 (exp_connect_flags(exp
) & OBD_CONNECT_LIGHTWEIGHT
) == 0)
1157 exp
->exp_obd
->obd_stale_clients
++;
1159 spin_unlock(&obd
->obd_recovery_task_lock
);
1161 spin_lock(&exp
->exp_lock
);
1162 /** Cleanup req replay fields */
1163 if (exp
->exp_req_replay_needed
) {
1164 exp
->exp_req_replay_needed
= 0;
1166 LASSERT(atomic_read(&obd
->obd_req_replay_clients
));
1167 atomic_dec(&obd
->obd_req_replay_clients
);
1170 /** Cleanup lock replay data */
1171 if (exp
->exp_lock_replay_needed
) {
1172 exp
->exp_lock_replay_needed
= 0;
1174 LASSERT(atomic_read(&obd
->obd_lock_replay_clients
));
1175 atomic_dec(&obd
->obd_lock_replay_clients
);
1177 spin_unlock(&exp
->exp_lock
);
1180 /* This function removes 1-3 references from the export:
1181 * 1 - for export pointer passed
1182 * and if disconnect really need
1183 * 2 - removing from hash
1184 * 3 - in client_unlink_export
1185 * The export pointer passed to this function can destroyed */
1186 int class_disconnect(struct obd_export
*export
)
1188 int already_disconnected
;
1190 if (export
== NULL
) {
1191 CWARN("attempting to free NULL export %p\n", export
);
1195 spin_lock(&export
->exp_lock
);
1196 already_disconnected
= export
->exp_disconnected
;
1197 export
->exp_disconnected
= 1;
1198 spin_unlock(&export
->exp_lock
);
1200 /* class_cleanup(), abort_recovery(), and class_fail_export()
1201 * all end up in here, and if any of them race we shouldn't
1202 * call extra class_export_puts(). */
1203 if (already_disconnected
) {
1204 LASSERT(hlist_unhashed(&export
->exp_nid_hash
));
1208 CDEBUG(D_IOCTL
, "disconnect: cookie %#llx\n",
1209 export
->exp_handle
.h_cookie
);
1211 if (!hlist_unhashed(&export
->exp_nid_hash
))
1212 cfs_hash_del(export
->exp_obd
->obd_nid_hash
,
1213 &export
->exp_connection
->c_peer
.nid
,
1214 &export
->exp_nid_hash
);
1216 class_export_recovery_cleanup(export
);
1217 class_unlink_export(export
);
1219 class_export_put(export
);
1222 EXPORT_SYMBOL(class_disconnect
);
1224 /* Return non-zero for a fully connected export */
1225 int class_connected_export(struct obd_export
*exp
)
1229 spin_lock(&exp
->exp_lock
);
1230 connected
= exp
->exp_conn_cnt
> 0;
1231 spin_unlock(&exp
->exp_lock
);
1236 EXPORT_SYMBOL(class_connected_export
);
1238 static void class_disconnect_export_list(struct list_head
*list
,
1239 enum obd_option flags
)
1242 struct obd_export
*exp
;
1244 /* It's possible that an export may disconnect itself, but
1245 * nothing else will be added to this list. */
1246 while (!list_empty(list
)) {
1247 exp
= list_entry(list
->next
, struct obd_export
,
1249 /* need for safe call CDEBUG after obd_disconnect */
1250 class_export_get(exp
);
1252 spin_lock(&exp
->exp_lock
);
1253 exp
->exp_flags
= flags
;
1254 spin_unlock(&exp
->exp_lock
);
1256 if (obd_uuid_equals(&exp
->exp_client_uuid
,
1257 &exp
->exp_obd
->obd_uuid
)) {
1259 "exp %p export uuid == obd uuid, don't discon\n",
1261 /* Need to delete this now so we don't end up pointing
1262 * to work_list later when this export is cleaned up. */
1263 list_del_init(&exp
->exp_obd_chain
);
1264 class_export_put(exp
);
1268 class_export_get(exp
);
1269 CDEBUG(D_HA
, "%s: disconnecting export at %s (%p), last request at " CFS_TIME_T
"\n",
1270 exp
->exp_obd
->obd_name
, obd_export_nid2str(exp
),
1271 exp
, exp
->exp_last_request_time
);
1272 /* release one export reference anyway */
1273 rc
= obd_disconnect(exp
);
1275 CDEBUG(D_HA
, "disconnected export at %s (%p): rc %d\n",
1276 obd_export_nid2str(exp
), exp
, rc
);
1277 class_export_put(exp
);
1281 void class_disconnect_exports(struct obd_device
*obd
)
1283 struct list_head work_list
;
1285 /* Move all of the exports from obd_exports to a work list, en masse. */
1286 INIT_LIST_HEAD(&work_list
);
1287 spin_lock(&obd
->obd_dev_lock
);
1288 list_splice_init(&obd
->obd_exports
, &work_list
);
1289 list_splice_init(&obd
->obd_delayed_exports
, &work_list
);
1290 spin_unlock(&obd
->obd_dev_lock
);
1292 if (!list_empty(&work_list
)) {
1293 CDEBUG(D_HA
, "OBD device %d (%p) has exports, disconnecting them\n",
1294 obd
->obd_minor
, obd
);
1295 class_disconnect_export_list(&work_list
,
1296 exp_flags_from_obd(obd
));
1298 CDEBUG(D_HA
, "OBD device %d (%p) has no exports\n",
1299 obd
->obd_minor
, obd
);
1301 EXPORT_SYMBOL(class_disconnect_exports
);
1303 /* Remove exports that have not completed recovery.
1305 void class_disconnect_stale_exports(struct obd_device
*obd
,
1306 int (*test_export
)(struct obd_export
*))
1308 struct list_head work_list
;
1309 struct obd_export
*exp
, *n
;
1312 INIT_LIST_HEAD(&work_list
);
1313 spin_lock(&obd
->obd_dev_lock
);
1314 list_for_each_entry_safe(exp
, n
, &obd
->obd_exports
,
1316 /* don't count self-export as client */
1317 if (obd_uuid_equals(&exp
->exp_client_uuid
,
1318 &exp
->exp_obd
->obd_uuid
))
1321 /* don't evict clients which have no slot in last_rcvd
1322 * (e.g. lightweight connection) */
1323 if (exp
->exp_target_data
.ted_lr_idx
== -1)
1326 spin_lock(&exp
->exp_lock
);
1327 if (exp
->exp_failed
|| test_export(exp
)) {
1328 spin_unlock(&exp
->exp_lock
);
1331 exp
->exp_failed
= 1;
1332 spin_unlock(&exp
->exp_lock
);
1334 list_move(&exp
->exp_obd_chain
, &work_list
);
1336 CDEBUG(D_HA
, "%s: disconnect stale client %s@%s\n",
1337 obd
->obd_name
, exp
->exp_client_uuid
.uuid
,
1338 exp
->exp_connection
== NULL
? "<unknown>" :
1339 libcfs_nid2str(exp
->exp_connection
->c_peer
.nid
));
1340 print_export_data(exp
, "EVICTING", 0);
1342 spin_unlock(&obd
->obd_dev_lock
);
1345 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1346 obd
->obd_name
, evicted
);
1348 class_disconnect_export_list(&work_list
, exp_flags_from_obd(obd
) |
1349 OBD_OPT_ABORT_RECOV
);
1351 EXPORT_SYMBOL(class_disconnect_stale_exports
);
1353 void class_fail_export(struct obd_export
*exp
)
1355 int rc
, already_failed
;
1357 spin_lock(&exp
->exp_lock
);
1358 already_failed
= exp
->exp_failed
;
1359 exp
->exp_failed
= 1;
1360 spin_unlock(&exp
->exp_lock
);
1362 if (already_failed
) {
1363 CDEBUG(D_HA
, "disconnecting dead export %p/%s; skipping\n",
1364 exp
, exp
->exp_client_uuid
.uuid
);
1368 CDEBUG(D_HA
, "disconnecting export %p/%s\n",
1369 exp
, exp
->exp_client_uuid
.uuid
);
1371 if (obd_dump_on_timeout
)
1372 libcfs_debug_dumplog();
1374 /* need for safe call CDEBUG after obd_disconnect */
1375 class_export_get(exp
);
1377 /* Most callers into obd_disconnect are removing their own reference
1378 * (request, for example) in addition to the one from the hash table.
1379 * We don't have such a reference here, so make one. */
1380 class_export_get(exp
);
1381 rc
= obd_disconnect(exp
);
1383 CERROR("disconnecting export %p failed: %d\n", exp
, rc
);
1385 CDEBUG(D_HA
, "disconnected export %p/%s\n",
1386 exp
, exp
->exp_client_uuid
.uuid
);
1387 class_export_put(exp
);
1389 EXPORT_SYMBOL(class_fail_export
);
1391 char *obd_export_nid2str(struct obd_export
*exp
)
1393 if (exp
->exp_connection
!= NULL
)
1394 return libcfs_nid2str(exp
->exp_connection
->c_peer
.nid
);
1398 EXPORT_SYMBOL(obd_export_nid2str
);
1400 int obd_export_evict_by_nid(struct obd_device
*obd
, const char *nid
)
1402 struct cfs_hash
*nid_hash
;
1403 struct obd_export
*doomed_exp
= NULL
;
1404 int exports_evicted
= 0;
1406 lnet_nid_t nid_key
= libcfs_str2nid((char *)nid
);
1408 spin_lock(&obd
->obd_dev_lock
);
1409 /* umount has run already, so evict thread should leave
1410 * its task to umount thread now */
1411 if (obd
->obd_stopping
) {
1412 spin_unlock(&obd
->obd_dev_lock
);
1413 return exports_evicted
;
1415 nid_hash
= obd
->obd_nid_hash
;
1416 cfs_hash_getref(nid_hash
);
1417 spin_unlock(&obd
->obd_dev_lock
);
1420 doomed_exp
= cfs_hash_lookup(nid_hash
, &nid_key
);
1421 if (doomed_exp
== NULL
)
1424 LASSERTF(doomed_exp
->exp_connection
->c_peer
.nid
== nid_key
,
1425 "nid %s found, wanted nid %s, requested nid %s\n",
1426 obd_export_nid2str(doomed_exp
),
1427 libcfs_nid2str(nid_key
), nid
);
1428 LASSERTF(doomed_exp
!= obd
->obd_self_export
,
1429 "self-export is hashed by NID?\n");
1431 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1433 obd_uuid2str(&doomed_exp
->exp_client_uuid
),
1434 obd_export_nid2str(doomed_exp
));
1435 class_fail_export(doomed_exp
);
1436 class_export_put(doomed_exp
);
1439 cfs_hash_putref(nid_hash
);
1441 if (!exports_evicted
)
1443 "%s: can't disconnect NID '%s': no exports found\n",
1444 obd
->obd_name
, nid
);
1445 return exports_evicted
;
1447 EXPORT_SYMBOL(obd_export_evict_by_nid
);
1449 int obd_export_evict_by_uuid(struct obd_device
*obd
, const char *uuid
)
1451 struct cfs_hash
*uuid_hash
;
1452 struct obd_export
*doomed_exp
= NULL
;
1453 struct obd_uuid doomed_uuid
;
1454 int exports_evicted
= 0;
1456 spin_lock(&obd
->obd_dev_lock
);
1457 if (obd
->obd_stopping
) {
1458 spin_unlock(&obd
->obd_dev_lock
);
1459 return exports_evicted
;
1461 uuid_hash
= obd
->obd_uuid_hash
;
1462 cfs_hash_getref(uuid_hash
);
1463 spin_unlock(&obd
->obd_dev_lock
);
1465 obd_str2uuid(&doomed_uuid
, uuid
);
1466 if (obd_uuid_equals(&doomed_uuid
, &obd
->obd_uuid
)) {
1467 CERROR("%s: can't evict myself\n", obd
->obd_name
);
1468 cfs_hash_putref(uuid_hash
);
1469 return exports_evicted
;
1472 doomed_exp
= cfs_hash_lookup(uuid_hash
, &doomed_uuid
);
1474 if (doomed_exp
== NULL
) {
1475 CERROR("%s: can't disconnect %s: no exports found\n",
1476 obd
->obd_name
, uuid
);
1478 CWARN("%s: evicting %s at administrative request\n",
1479 obd
->obd_name
, doomed_exp
->exp_client_uuid
.uuid
);
1480 class_fail_export(doomed_exp
);
1481 class_export_put(doomed_exp
);
1484 cfs_hash_putref(uuid_hash
);
1486 return exports_evicted
;
1488 EXPORT_SYMBOL(obd_export_evict_by_uuid
);
1490 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1491 void (*class_export_dump_hook
)(struct obd_export
*) = NULL
;
1492 EXPORT_SYMBOL(class_export_dump_hook
);
1495 static void print_export_data(struct obd_export
*exp
, const char *status
,
1498 struct ptlrpc_reply_state
*rs
;
1499 struct ptlrpc_reply_state
*first_reply
= NULL
;
1502 spin_lock(&exp
->exp_lock
);
1503 list_for_each_entry(rs
, &exp
->exp_outstanding_replies
,
1509 spin_unlock(&exp
->exp_lock
);
1511 CDEBUG(D_HA
, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1512 exp
->exp_obd
->obd_name
, status
, exp
, exp
->exp_client_uuid
.uuid
,
1513 obd_export_nid2str(exp
), atomic_read(&exp
->exp_refcount
),
1514 atomic_read(&exp
->exp_rpc_count
),
1515 atomic_read(&exp
->exp_cb_count
),
1516 atomic_read(&exp
->exp_locks_count
),
1517 exp
->exp_disconnected
, exp
->exp_delayed
, exp
->exp_failed
,
1518 nreplies
, first_reply
, nreplies
> 3 ? "..." : "",
1519 exp
->exp_last_committed
);
1520 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1521 if (locks
&& class_export_dump_hook
!= NULL
)
1522 class_export_dump_hook(exp
);
1526 void dump_exports(struct obd_device
*obd
, int locks
)
1528 struct obd_export
*exp
;
1530 spin_lock(&obd
->obd_dev_lock
);
1531 list_for_each_entry(exp
, &obd
->obd_exports
, exp_obd_chain
)
1532 print_export_data(exp
, "ACTIVE", locks
);
1533 list_for_each_entry(exp
, &obd
->obd_unlinked_exports
, exp_obd_chain
)
1534 print_export_data(exp
, "UNLINKED", locks
);
1535 list_for_each_entry(exp
, &obd
->obd_delayed_exports
, exp_obd_chain
)
1536 print_export_data(exp
, "DELAYED", locks
);
1537 spin_unlock(&obd
->obd_dev_lock
);
1538 spin_lock(&obd_zombie_impexp_lock
);
1539 list_for_each_entry(exp
, &obd_zombie_exports
, exp_obd_chain
)
1540 print_export_data(exp
, "ZOMBIE", locks
);
1541 spin_unlock(&obd_zombie_impexp_lock
);
1543 EXPORT_SYMBOL(dump_exports
);
1545 void obd_exports_barrier(struct obd_device
*obd
)
1548 LASSERT(list_empty(&obd
->obd_exports
));
1549 spin_lock(&obd
->obd_dev_lock
);
1550 while (!list_empty(&obd
->obd_unlinked_exports
)) {
1551 spin_unlock(&obd
->obd_dev_lock
);
1552 set_current_state(TASK_UNINTERRUPTIBLE
);
1553 schedule_timeout(cfs_time_seconds(waited
));
1554 if (waited
> 5 && IS_PO2(waited
)) {
1555 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1556 obd
->obd_name
, waited
,
1557 atomic_read(&obd
->obd_refcount
));
1558 dump_exports(obd
, 1);
1561 spin_lock(&obd
->obd_dev_lock
);
1563 spin_unlock(&obd
->obd_dev_lock
);
1565 EXPORT_SYMBOL(obd_exports_barrier
);
1567 /* Total amount of zombies to be destroyed */
1568 static int zombies_count
;
1571 * kill zombie imports and exports
1573 void obd_zombie_impexp_cull(void)
1575 struct obd_import
*import
;
1576 struct obd_export
*export
;
1579 spin_lock(&obd_zombie_impexp_lock
);
1582 if (!list_empty(&obd_zombie_imports
)) {
1583 import
= list_entry(obd_zombie_imports
.next
,
1586 list_del_init(&import
->imp_zombie_chain
);
1590 if (!list_empty(&obd_zombie_exports
)) {
1591 export
= list_entry(obd_zombie_exports
.next
,
1594 list_del_init(&export
->exp_obd_chain
);
1597 spin_unlock(&obd_zombie_impexp_lock
);
1599 if (import
!= NULL
) {
1600 class_import_destroy(import
);
1601 spin_lock(&obd_zombie_impexp_lock
);
1603 spin_unlock(&obd_zombie_impexp_lock
);
1606 if (export
!= NULL
) {
1607 class_export_destroy(export
);
1608 spin_lock(&obd_zombie_impexp_lock
);
1610 spin_unlock(&obd_zombie_impexp_lock
);
1614 } while (import
!= NULL
|| export
!= NULL
);
1617 static struct completion obd_zombie_start
;
1618 static struct completion obd_zombie_stop
;
1619 static unsigned long obd_zombie_flags
;
1620 static wait_queue_head_t obd_zombie_waitq
;
1621 static pid_t obd_zombie_pid
;
1624 OBD_ZOMBIE_STOP
= 0x0001,
1628 * check for work for kill zombie import/export thread.
1630 static int obd_zombie_impexp_check(void *arg
)
1634 spin_lock(&obd_zombie_impexp_lock
);
1635 rc
= (zombies_count
== 0) &&
1636 !test_bit(OBD_ZOMBIE_STOP
, &obd_zombie_flags
);
1637 spin_unlock(&obd_zombie_impexp_lock
);
1643 * Add export to the obd_zombie thread and notify it.
1645 static void obd_zombie_export_add(struct obd_export
*exp
)
1647 spin_lock(&exp
->exp_obd
->obd_dev_lock
);
1648 LASSERT(!list_empty(&exp
->exp_obd_chain
));
1649 list_del_init(&exp
->exp_obd_chain
);
1650 spin_unlock(&exp
->exp_obd
->obd_dev_lock
);
1651 spin_lock(&obd_zombie_impexp_lock
);
1653 list_add(&exp
->exp_obd_chain
, &obd_zombie_exports
);
1654 spin_unlock(&obd_zombie_impexp_lock
);
1656 obd_zombie_impexp_notify();
1660 * Add import to the obd_zombie thread and notify it.
1662 static void obd_zombie_import_add(struct obd_import
*imp
)
1664 LASSERT(imp
->imp_sec
== NULL
);
1665 LASSERT(imp
->imp_rq_pool
== NULL
);
1666 spin_lock(&obd_zombie_impexp_lock
);
1667 LASSERT(list_empty(&imp
->imp_zombie_chain
));
1669 list_add(&imp
->imp_zombie_chain
, &obd_zombie_imports
);
1670 spin_unlock(&obd_zombie_impexp_lock
);
1672 obd_zombie_impexp_notify();
1676 * notify import/export destroy thread about new zombie.
1678 static void obd_zombie_impexp_notify(void)
1681 * Make sure obd_zombie_impexp_thread get this notification.
1682 * It is possible this signal only get by obd_zombie_barrier, and
1683 * barrier gulps this notification and sleeps away and hangs ensues
1685 wake_up_all(&obd_zombie_waitq
);
1689 * check whether obd_zombie is idle
1691 static int obd_zombie_is_idle(void)
1695 LASSERT(!test_bit(OBD_ZOMBIE_STOP
, &obd_zombie_flags
));
1696 spin_lock(&obd_zombie_impexp_lock
);
1697 rc
= (zombies_count
== 0);
1698 spin_unlock(&obd_zombie_impexp_lock
);
1703 * wait when obd_zombie import/export queues become empty
1705 void obd_zombie_barrier(void)
1707 struct l_wait_info lwi
= { 0 };
1709 if (obd_zombie_pid
== current_pid())
1710 /* don't wait for myself */
1712 l_wait_event(obd_zombie_waitq
, obd_zombie_is_idle(), &lwi
);
1714 EXPORT_SYMBOL(obd_zombie_barrier
);
1718 * destroy zombie export/import thread.
1720 static int obd_zombie_impexp_thread(void *unused
)
1722 unshare_fs_struct();
1723 complete(&obd_zombie_start
);
1725 obd_zombie_pid
= current_pid();
1727 while (!test_bit(OBD_ZOMBIE_STOP
, &obd_zombie_flags
)) {
1728 struct l_wait_info lwi
= { 0 };
1730 l_wait_event(obd_zombie_waitq
,
1731 !obd_zombie_impexp_check(NULL
), &lwi
);
1732 obd_zombie_impexp_cull();
1735 * Notify obd_zombie_barrier callers that queues
1738 wake_up(&obd_zombie_waitq
);
1741 complete(&obd_zombie_stop
);
1748 * start destroy zombie import/export thread
1750 int obd_zombie_impexp_init(void)
1752 struct task_struct
*task
;
1754 INIT_LIST_HEAD(&obd_zombie_imports
);
1755 INIT_LIST_HEAD(&obd_zombie_exports
);
1756 spin_lock_init(&obd_zombie_impexp_lock
);
1757 init_completion(&obd_zombie_start
);
1758 init_completion(&obd_zombie_stop
);
1759 init_waitqueue_head(&obd_zombie_waitq
);
1762 task
= kthread_run(obd_zombie_impexp_thread
, NULL
, "obd_zombid");
1764 return PTR_ERR(task
);
1766 wait_for_completion(&obd_zombie_start
);
1770 * stop destroy zombie import/export thread
1772 void obd_zombie_impexp_stop(void)
1774 set_bit(OBD_ZOMBIE_STOP
, &obd_zombie_flags
);
1775 obd_zombie_impexp_notify();
1776 wait_for_completion(&obd_zombie_stop
);
1779 /***** Kernel-userspace comm helpers *******/
1781 /* Get length of entire message, including header */
1782 int kuc_len(int payload_len
)
1784 return sizeof(struct kuc_hdr
) + payload_len
;
1786 EXPORT_SYMBOL(kuc_len
);
1788 /* Get a pointer to kuc header, given a ptr to the payload
1789 * @param p Pointer to payload area
1790 * @returns Pointer to kuc header
1792 struct kuc_hdr
*kuc_ptr(void *p
)
1794 struct kuc_hdr
*lh
= ((struct kuc_hdr
*)p
) - 1;
1795 LASSERT(lh
->kuc_magic
== KUC_MAGIC
);
1798 EXPORT_SYMBOL(kuc_ptr
);
1800 /* Test if payload is part of kuc message
1801 * @param p Pointer to payload area
1804 int kuc_ispayload(void *p
)
1806 struct kuc_hdr
*kh
= ((struct kuc_hdr
*)p
) - 1;
1808 if (kh
->kuc_magic
== KUC_MAGIC
)
1813 EXPORT_SYMBOL(kuc_ispayload
);
1815 /* Alloc space for a message, and fill in header
1816 * @return Pointer to payload area
1818 void *kuc_alloc(int payload_len
, int transport
, int type
)
1821 int len
= kuc_len(payload_len
);
1823 lh
= kzalloc(len
, GFP_NOFS
);
1825 return ERR_PTR(-ENOMEM
);
1827 lh
->kuc_magic
= KUC_MAGIC
;
1828 lh
->kuc_transport
= transport
;
1829 lh
->kuc_msgtype
= type
;
1830 lh
->kuc_msglen
= len
;
1832 return (void *)(lh
+ 1);
1834 EXPORT_SYMBOL(kuc_alloc
);
1836 /* Takes pointer to payload area */
1837 inline void kuc_free(void *p
, int payload_len
)
1839 struct kuc_hdr
*lh
= kuc_ptr(p
);
1842 EXPORT_SYMBOL(kuc_free
);