0ca730948f7adbea81ecccba24f18db198e20835
[deliverable/linux.git] / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32 /*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/obdclass/genops.c
37 *
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
40 */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
45
46 spinlock_t obd_types_lock;
47
48 static struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
52
53 static struct list_head obd_zombie_imports;
54 static struct list_head obd_zombie_exports;
55 static spinlock_t obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60 const char *status, int locks);
61
62 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
64
65 /*
66 * support functions: we could use inter-module communication, but this
67 * is more portable to other OS's
68 */
69 static struct obd_device *obd_device_alloc(void)
70 {
71 struct obd_device *obd;
72
73 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74 if (obd != NULL)
75 obd->obd_magic = OBD_DEVICE_MAGIC;
76 return obd;
77 }
78
79 static void obd_device_free(struct obd_device *obd)
80 {
81 LASSERT(obd != NULL);
82 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84 if (obd->obd_namespace != NULL) {
85 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86 obd, obd->obd_namespace, obd->obd_force);
87 LBUG();
88 }
89 lu_ref_fini(&obd->obd_reference);
90 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
91 }
92
93 struct obd_type *class_search_type(const char *name)
94 {
95 struct list_head *tmp;
96 struct obd_type *type;
97
98 spin_lock(&obd_types_lock);
99 list_for_each(tmp, &obd_types) {
100 type = list_entry(tmp, struct obd_type, typ_chain);
101 if (strcmp(type->typ_name, name) == 0) {
102 spin_unlock(&obd_types_lock);
103 return type;
104 }
105 }
106 spin_unlock(&obd_types_lock);
107 return NULL;
108 }
109 EXPORT_SYMBOL(class_search_type);
110
111 struct obd_type *class_get_type(const char *name)
112 {
113 struct obd_type *type = class_search_type(name);
114
115 if (!type) {
116 const char *modname = name;
117
118 if (strcmp(modname, "obdfilter") == 0)
119 modname = "ofd";
120
121 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
122 modname = LUSTRE_OSP_NAME;
123
124 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
125 modname = LUSTRE_MDT_NAME;
126
127 if (!request_module("%s", modname)) {
128 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
129 type = class_search_type(name);
130 } else {
131 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
132 modname);
133 }
134 }
135 if (type) {
136 spin_lock(&type->obd_type_lock);
137 type->typ_refcnt++;
138 try_module_get(type->typ_dt_ops->o_owner);
139 spin_unlock(&type->obd_type_lock);
140 }
141 return type;
142 }
143 EXPORT_SYMBOL(class_get_type);
144
145 void class_put_type(struct obd_type *type)
146 {
147 LASSERT(type);
148 spin_lock(&type->obd_type_lock);
149 type->typ_refcnt--;
150 module_put(type->typ_dt_ops->o_owner);
151 spin_unlock(&type->obd_type_lock);
152 }
153 EXPORT_SYMBOL(class_put_type);
154
155 #define CLASS_MAX_NAME 1024
156
157 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
158 const char *name,
159 struct lu_device_type *ldt)
160 {
161 struct obd_type *type;
162 int rc = 0;
163
164 /* sanity check */
165 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
166
167 if (class_search_type(name)) {
168 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
169 return -EEXIST;
170 }
171
172 rc = -ENOMEM;
173 type = kzalloc(sizeof(*type), GFP_NOFS);
174 if (!type)
175 return rc;
176
177 type->typ_dt_ops = kzalloc(sizeof(*type->typ_dt_ops), GFP_NOFS);
178 type->typ_md_ops = kzalloc(sizeof(*type->typ_md_ops), GFP_NOFS);
179 type->typ_name = kzalloc(strlen(name) + 1, GFP_NOFS);
180
181 if (type->typ_dt_ops == NULL ||
182 type->typ_md_ops == NULL ||
183 type->typ_name == NULL)
184 goto failed;
185
186 *(type->typ_dt_ops) = *dt_ops;
187 /* md_ops is optional */
188 if (md_ops)
189 *(type->typ_md_ops) = *md_ops;
190 strcpy(type->typ_name, name);
191 spin_lock_init(&type->obd_type_lock);
192
193 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
194 debugfs_lustre_root,
195 NULL, type);
196 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
197 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
198 : -ENOMEM;
199 type->typ_debugfs_entry = NULL;
200 goto failed;
201 }
202
203 type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
204 if (!type->typ_kobj) {
205 rc = -ENOMEM;
206 goto failed;
207 }
208
209 if (ldt != NULL) {
210 type->typ_lu = ldt;
211 rc = lu_device_type_init(ldt);
212 if (rc != 0)
213 goto failed;
214 }
215
216 spin_lock(&obd_types_lock);
217 list_add(&type->typ_chain, &obd_types);
218 spin_unlock(&obd_types_lock);
219
220 return 0;
221
222 failed:
223 if (type->typ_kobj)
224 kobject_put(type->typ_kobj);
225 kfree(type->typ_name);
226 kfree(type->typ_md_ops);
227 kfree(type->typ_dt_ops);
228 kfree(type);
229 return rc;
230 }
231 EXPORT_SYMBOL(class_register_type);
232
233 int class_unregister_type(const char *name)
234 {
235 struct obd_type *type = class_search_type(name);
236
237 if (!type) {
238 CERROR("unknown obd type\n");
239 return -EINVAL;
240 }
241
242 if (type->typ_refcnt) {
243 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
244 /* This is a bad situation, let's make the best of it */
245 /* Remove ops, but leave the name for debugging */
246 kfree(type->typ_dt_ops);
247 kfree(type->typ_md_ops);
248 return -EBUSY;
249 }
250
251 if (type->typ_kobj)
252 kobject_put(type->typ_kobj);
253
254 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
255 ldebugfs_remove(&type->typ_debugfs_entry);
256
257 if (type->typ_lu)
258 lu_device_type_fini(type->typ_lu);
259
260 spin_lock(&obd_types_lock);
261 list_del(&type->typ_chain);
262 spin_unlock(&obd_types_lock);
263 kfree(type->typ_name);
264 kfree(type->typ_dt_ops);
265 kfree(type->typ_md_ops);
266 kfree(type);
267 return 0;
268 } /* class_unregister_type */
269 EXPORT_SYMBOL(class_unregister_type);
270
271 /**
272 * Create a new obd device.
273 *
274 * Find an empty slot in ::obd_devs[], create a new obd device in it.
275 *
276 * \param[in] type_name obd device type string.
277 * \param[in] name obd device name.
278 *
279 * \retval NULL if create fails, otherwise return the obd device
280 * pointer created.
281 */
282 struct obd_device *class_newdev(const char *type_name, const char *name)
283 {
284 struct obd_device *result = NULL;
285 struct obd_device *newdev;
286 struct obd_type *type = NULL;
287 int i;
288 int new_obd_minor = 0;
289
290 if (strlen(name) >= MAX_OBD_NAME) {
291 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
292 return ERR_PTR(-EINVAL);
293 }
294
295 type = class_get_type(type_name);
296 if (type == NULL) {
297 CERROR("OBD: unknown type: %s\n", type_name);
298 return ERR_PTR(-ENODEV);
299 }
300
301 newdev = obd_device_alloc();
302 if (newdev == NULL) {
303 result = ERR_PTR(-ENOMEM);
304 goto out_type;
305 }
306
307 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
308
309 write_lock(&obd_dev_lock);
310 for (i = 0; i < class_devno_max(); i++) {
311 struct obd_device *obd = class_num2obd(i);
312
313 if (obd && (strcmp(name, obd->obd_name) == 0)) {
314 CERROR("Device %s already exists at %d, won't add\n",
315 name, i);
316 if (result) {
317 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
318 "%p obd_magic %08x != %08x\n", result,
319 result->obd_magic, OBD_DEVICE_MAGIC);
320 LASSERTF(result->obd_minor == new_obd_minor,
321 "%p obd_minor %d != %d\n", result,
322 result->obd_minor, new_obd_minor);
323
324 obd_devs[result->obd_minor] = NULL;
325 result->obd_name[0] = '\0';
326 }
327 result = ERR_PTR(-EEXIST);
328 break;
329 }
330 if (!result && !obd) {
331 result = newdev;
332 result->obd_minor = i;
333 new_obd_minor = i;
334 result->obd_type = type;
335 strncpy(result->obd_name, name,
336 sizeof(result->obd_name) - 1);
337 obd_devs[i] = result;
338 }
339 }
340 write_unlock(&obd_dev_lock);
341
342 if (result == NULL && i >= class_devno_max()) {
343 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
344 class_devno_max());
345 result = ERR_PTR(-EOVERFLOW);
346 goto out;
347 }
348
349 if (IS_ERR(result))
350 goto out;
351
352 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
353 result->obd_name, result);
354
355 return result;
356 out:
357 obd_device_free(newdev);
358 out_type:
359 class_put_type(type);
360 return result;
361 }
362
363 void class_release_dev(struct obd_device *obd)
364 {
365 struct obd_type *obd_type = obd->obd_type;
366
367 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
368 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
369 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
370 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
371 LASSERT(obd_type != NULL);
372
373 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
374 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
375
376 write_lock(&obd_dev_lock);
377 obd_devs[obd->obd_minor] = NULL;
378 write_unlock(&obd_dev_lock);
379 obd_device_free(obd);
380
381 class_put_type(obd_type);
382 }
383
384 int class_name2dev(const char *name)
385 {
386 int i;
387
388 if (!name)
389 return -1;
390
391 read_lock(&obd_dev_lock);
392 for (i = 0; i < class_devno_max(); i++) {
393 struct obd_device *obd = class_num2obd(i);
394
395 if (obd && strcmp(name, obd->obd_name) == 0) {
396 /* Make sure we finished attaching before we give
397 out any references */
398 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
399 if (obd->obd_attached) {
400 read_unlock(&obd_dev_lock);
401 return i;
402 }
403 break;
404 }
405 }
406 read_unlock(&obd_dev_lock);
407
408 return -1;
409 }
410 EXPORT_SYMBOL(class_name2dev);
411
412 struct obd_device *class_name2obd(const char *name)
413 {
414 int dev = class_name2dev(name);
415
416 if (dev < 0 || dev > class_devno_max())
417 return NULL;
418 return class_num2obd(dev);
419 }
420 EXPORT_SYMBOL(class_name2obd);
421
422 int class_uuid2dev(struct obd_uuid *uuid)
423 {
424 int i;
425
426 read_lock(&obd_dev_lock);
427 for (i = 0; i < class_devno_max(); i++) {
428 struct obd_device *obd = class_num2obd(i);
429
430 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
431 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
432 read_unlock(&obd_dev_lock);
433 return i;
434 }
435 }
436 read_unlock(&obd_dev_lock);
437
438 return -1;
439 }
440 EXPORT_SYMBOL(class_uuid2dev);
441
442 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
443 {
444 int dev = class_uuid2dev(uuid);
445 if (dev < 0)
446 return NULL;
447 return class_num2obd(dev);
448 }
449 EXPORT_SYMBOL(class_uuid2obd);
450
451 /**
452 * Get obd device from ::obd_devs[]
453 *
454 * \param num [in] array index
455 *
456 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
457 * otherwise return the obd device there.
458 */
459 struct obd_device *class_num2obd(int num)
460 {
461 struct obd_device *obd = NULL;
462
463 if (num < class_devno_max()) {
464 obd = obd_devs[num];
465 if (obd == NULL)
466 return NULL;
467
468 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
469 "%p obd_magic %08x != %08x\n",
470 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
471 LASSERTF(obd->obd_minor == num,
472 "%p obd_minor %0d != %0d\n",
473 obd, obd->obd_minor, num);
474 }
475
476 return obd;
477 }
478 EXPORT_SYMBOL(class_num2obd);
479
480 /**
481 * Get obd devices count. Device in any
482 * state are counted
483 * \retval obd device count
484 */
485 int get_devices_count(void)
486 {
487 int index, max_index = class_devno_max(), dev_count = 0;
488
489 read_lock(&obd_dev_lock);
490 for (index = 0; index <= max_index; index++) {
491 struct obd_device *obd = class_num2obd(index);
492 if (obd != NULL)
493 dev_count++;
494 }
495 read_unlock(&obd_dev_lock);
496
497 return dev_count;
498 }
499 EXPORT_SYMBOL(get_devices_count);
500
501 void class_obd_list(void)
502 {
503 char *status;
504 int i;
505
506 read_lock(&obd_dev_lock);
507 for (i = 0; i < class_devno_max(); i++) {
508 struct obd_device *obd = class_num2obd(i);
509
510 if (obd == NULL)
511 continue;
512 if (obd->obd_stopping)
513 status = "ST";
514 else if (obd->obd_set_up)
515 status = "UP";
516 else if (obd->obd_attached)
517 status = "AT";
518 else
519 status = "--";
520 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
521 i, status, obd->obd_type->typ_name,
522 obd->obd_name, obd->obd_uuid.uuid,
523 atomic_read(&obd->obd_refcount));
524 }
525 read_unlock(&obd_dev_lock);
526 return;
527 }
528
529 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
530 specified, then only the client with that uuid is returned,
531 otherwise any client connected to the tgt is returned. */
532 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
533 const char *typ_name,
534 struct obd_uuid *grp_uuid)
535 {
536 int i;
537
538 read_lock(&obd_dev_lock);
539 for (i = 0; i < class_devno_max(); i++) {
540 struct obd_device *obd = class_num2obd(i);
541
542 if (obd == NULL)
543 continue;
544 if ((strncmp(obd->obd_type->typ_name, typ_name,
545 strlen(typ_name)) == 0)) {
546 if (obd_uuid_equals(tgt_uuid,
547 &obd->u.cli.cl_target_uuid) &&
548 ((grp_uuid)? obd_uuid_equals(grp_uuid,
549 &obd->obd_uuid) : 1)) {
550 read_unlock(&obd_dev_lock);
551 return obd;
552 }
553 }
554 }
555 read_unlock(&obd_dev_lock);
556
557 return NULL;
558 }
559 EXPORT_SYMBOL(class_find_client_obd);
560
561 /* Iterate the obd_device list looking devices have grp_uuid. Start
562 searching at *next, and if a device is found, the next index to look
563 at is saved in *next. If next is NULL, then the first matching device
564 will always be returned. */
565 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
566 {
567 int i;
568
569 if (next == NULL)
570 i = 0;
571 else if (*next >= 0 && *next < class_devno_max())
572 i = *next;
573 else
574 return NULL;
575
576 read_lock(&obd_dev_lock);
577 for (; i < class_devno_max(); i++) {
578 struct obd_device *obd = class_num2obd(i);
579
580 if (obd == NULL)
581 continue;
582 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
583 if (next != NULL)
584 *next = i+1;
585 read_unlock(&obd_dev_lock);
586 return obd;
587 }
588 }
589 read_unlock(&obd_dev_lock);
590
591 return NULL;
592 }
593 EXPORT_SYMBOL(class_devices_in_group);
594
595 /**
596 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
597 * adjust sptlrpc settings accordingly.
598 */
599 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
600 {
601 struct obd_device *obd;
602 const char *type;
603 int i, rc = 0, rc2;
604
605 LASSERT(namelen > 0);
606
607 read_lock(&obd_dev_lock);
608 for (i = 0; i < class_devno_max(); i++) {
609 obd = class_num2obd(i);
610
611 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
612 continue;
613
614 /* only notify mdc, osc, mdt, ost */
615 type = obd->obd_type->typ_name;
616 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
617 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
618 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
619 strcmp(type, LUSTRE_OST_NAME) != 0)
620 continue;
621
622 if (strncmp(obd->obd_name, fsname, namelen))
623 continue;
624
625 class_incref(obd, __func__, obd);
626 read_unlock(&obd_dev_lock);
627 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
628 sizeof(KEY_SPTLRPC_CONF),
629 KEY_SPTLRPC_CONF, 0, NULL, NULL);
630 rc = rc ? rc : rc2;
631 class_decref(obd, __func__, obd);
632 read_lock(&obd_dev_lock);
633 }
634 read_unlock(&obd_dev_lock);
635 return rc;
636 }
637 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
638
639 void obd_cleanup_caches(void)
640 {
641 if (obd_device_cachep) {
642 kmem_cache_destroy(obd_device_cachep);
643 obd_device_cachep = NULL;
644 }
645 if (obdo_cachep) {
646 kmem_cache_destroy(obdo_cachep);
647 obdo_cachep = NULL;
648 }
649 if (import_cachep) {
650 kmem_cache_destroy(import_cachep);
651 import_cachep = NULL;
652 }
653 if (capa_cachep) {
654 kmem_cache_destroy(capa_cachep);
655 capa_cachep = NULL;
656 }
657 }
658
659 int obd_init_caches(void)
660 {
661 LASSERT(obd_device_cachep == NULL);
662 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
663 sizeof(struct obd_device),
664 0, 0, NULL);
665 if (!obd_device_cachep)
666 goto out;
667
668 LASSERT(obdo_cachep == NULL);
669 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
670 0, 0, NULL);
671 if (!obdo_cachep)
672 goto out;
673
674 LASSERT(import_cachep == NULL);
675 import_cachep = kmem_cache_create("ll_import_cache",
676 sizeof(struct obd_import),
677 0, 0, NULL);
678 if (!import_cachep)
679 goto out;
680
681 LASSERT(capa_cachep == NULL);
682 capa_cachep = kmem_cache_create("capa_cache",
683 sizeof(struct obd_capa), 0, 0, NULL);
684 if (!capa_cachep)
685 goto out;
686
687 return 0;
688 out:
689 obd_cleanup_caches();
690 return -ENOMEM;
691
692 }
693
694 /* map connection to client */
695 struct obd_export *class_conn2export(struct lustre_handle *conn)
696 {
697 struct obd_export *export;
698
699 if (!conn) {
700 CDEBUG(D_CACHE, "looking for null handle\n");
701 return NULL;
702 }
703
704 if (conn->cookie == -1) { /* this means assign a new connection */
705 CDEBUG(D_CACHE, "want a new connection\n");
706 return NULL;
707 }
708
709 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
710 export = class_handle2object(conn->cookie);
711 return export;
712 }
713 EXPORT_SYMBOL(class_conn2export);
714
715 struct obd_device *class_exp2obd(struct obd_export *exp)
716 {
717 if (exp)
718 return exp->exp_obd;
719 return NULL;
720 }
721 EXPORT_SYMBOL(class_exp2obd);
722
723 struct obd_device *class_conn2obd(struct lustre_handle *conn)
724 {
725 struct obd_export *export;
726 export = class_conn2export(conn);
727 if (export) {
728 struct obd_device *obd = export->exp_obd;
729 class_export_put(export);
730 return obd;
731 }
732 return NULL;
733 }
734 EXPORT_SYMBOL(class_conn2obd);
735
736 struct obd_import *class_exp2cliimp(struct obd_export *exp)
737 {
738 struct obd_device *obd = exp->exp_obd;
739 if (obd == NULL)
740 return NULL;
741 return obd->u.cli.cl_import;
742 }
743 EXPORT_SYMBOL(class_exp2cliimp);
744
745 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
746 {
747 struct obd_device *obd = class_conn2obd(conn);
748 if (obd == NULL)
749 return NULL;
750 return obd->u.cli.cl_import;
751 }
752 EXPORT_SYMBOL(class_conn2cliimp);
753
754 /* Export management functions */
755 static void class_export_destroy(struct obd_export *exp)
756 {
757 struct obd_device *obd = exp->exp_obd;
758
759 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
760 LASSERT(obd != NULL);
761
762 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
763 exp->exp_client_uuid.uuid, obd->obd_name);
764
765 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
766 if (exp->exp_connection)
767 ptlrpc_put_connection_superhack(exp->exp_connection);
768
769 LASSERT(list_empty(&exp->exp_outstanding_replies));
770 LASSERT(list_empty(&exp->exp_uncommitted_replies));
771 LASSERT(list_empty(&exp->exp_req_replay_queue));
772 LASSERT(list_empty(&exp->exp_hp_rpcs));
773 obd_destroy_export(exp);
774 class_decref(obd, "export", exp);
775
776 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
777 }
778
779 static void export_handle_addref(void *export)
780 {
781 class_export_get(export);
782 }
783
784 static struct portals_handle_ops export_handle_ops = {
785 .hop_addref = export_handle_addref,
786 .hop_free = NULL,
787 };
788
789 struct obd_export *class_export_get(struct obd_export *exp)
790 {
791 atomic_inc(&exp->exp_refcount);
792 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
793 atomic_read(&exp->exp_refcount));
794 return exp;
795 }
796 EXPORT_SYMBOL(class_export_get);
797
798 void class_export_put(struct obd_export *exp)
799 {
800 LASSERT(exp != NULL);
801 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
802 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
803 atomic_read(&exp->exp_refcount) - 1);
804
805 if (atomic_dec_and_test(&exp->exp_refcount)) {
806 LASSERT(!list_empty(&exp->exp_obd_chain));
807 CDEBUG(D_IOCTL, "final put %p/%s\n",
808 exp, exp->exp_client_uuid.uuid);
809
810 /* release nid stat refererence */
811 lprocfs_exp_cleanup(exp);
812
813 obd_zombie_export_add(exp);
814 }
815 }
816 EXPORT_SYMBOL(class_export_put);
817
818 /* Creates a new export, adds it to the hash table, and returns a
819 * pointer to it. The refcount is 2: one for the hash reference, and
820 * one for the pointer returned by this function. */
821 struct obd_export *class_new_export(struct obd_device *obd,
822 struct obd_uuid *cluuid)
823 {
824 struct obd_export *export;
825 struct cfs_hash *hash = NULL;
826 int rc = 0;
827
828 export = kzalloc(sizeof(*export), GFP_NOFS);
829 if (!export)
830 return ERR_PTR(-ENOMEM);
831
832 export->exp_conn_cnt = 0;
833 export->exp_lock_hash = NULL;
834 export->exp_flock_hash = NULL;
835 atomic_set(&export->exp_refcount, 2);
836 atomic_set(&export->exp_rpc_count, 0);
837 atomic_set(&export->exp_cb_count, 0);
838 atomic_set(&export->exp_locks_count, 0);
839 #if LUSTRE_TRACKS_LOCK_EXP_REFS
840 INIT_LIST_HEAD(&export->exp_locks_list);
841 spin_lock_init(&export->exp_locks_list_guard);
842 #endif
843 atomic_set(&export->exp_replay_count, 0);
844 export->exp_obd = obd;
845 INIT_LIST_HEAD(&export->exp_outstanding_replies);
846 spin_lock_init(&export->exp_uncommitted_replies_lock);
847 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
848 INIT_LIST_HEAD(&export->exp_req_replay_queue);
849 INIT_LIST_HEAD(&export->exp_handle.h_link);
850 INIT_LIST_HEAD(&export->exp_hp_rpcs);
851 class_handle_hash(&export->exp_handle, &export_handle_ops);
852 export->exp_last_request_time = get_seconds();
853 spin_lock_init(&export->exp_lock);
854 spin_lock_init(&export->exp_rpc_lock);
855 INIT_HLIST_NODE(&export->exp_uuid_hash);
856 INIT_HLIST_NODE(&export->exp_nid_hash);
857 spin_lock_init(&export->exp_bl_list_lock);
858 INIT_LIST_HEAD(&export->exp_bl_list);
859
860 export->exp_sp_peer = LUSTRE_SP_ANY;
861 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
862 export->exp_client_uuid = *cluuid;
863 obd_init_export(export);
864
865 spin_lock(&obd->obd_dev_lock);
866 /* shouldn't happen, but might race */
867 if (obd->obd_stopping) {
868 rc = -ENODEV;
869 goto exit_unlock;
870 }
871
872 hash = cfs_hash_getref(obd->obd_uuid_hash);
873 if (hash == NULL) {
874 rc = -ENODEV;
875 goto exit_unlock;
876 }
877 spin_unlock(&obd->obd_dev_lock);
878
879 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
880 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
881 if (rc != 0) {
882 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
883 obd->obd_name, cluuid->uuid, rc);
884 rc = -EALREADY;
885 goto exit_err;
886 }
887 }
888
889 spin_lock(&obd->obd_dev_lock);
890 if (obd->obd_stopping) {
891 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
892 rc = -ENODEV;
893 goto exit_unlock;
894 }
895
896 class_incref(obd, "export", export);
897 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
898 list_add_tail(&export->exp_obd_chain_timed,
899 &export->exp_obd->obd_exports_timed);
900 export->exp_obd->obd_num_exports++;
901 spin_unlock(&obd->obd_dev_lock);
902 cfs_hash_putref(hash);
903 return export;
904
905 exit_unlock:
906 spin_unlock(&obd->obd_dev_lock);
907 exit_err:
908 if (hash)
909 cfs_hash_putref(hash);
910 class_handle_unhash(&export->exp_handle);
911 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
912 obd_destroy_export(export);
913 kfree(export);
914 return ERR_PTR(rc);
915 }
916 EXPORT_SYMBOL(class_new_export);
917
918 void class_unlink_export(struct obd_export *exp)
919 {
920 class_handle_unhash(&exp->exp_handle);
921
922 spin_lock(&exp->exp_obd->obd_dev_lock);
923 /* delete an uuid-export hashitem from hashtables */
924 if (!hlist_unhashed(&exp->exp_uuid_hash))
925 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
926 &exp->exp_client_uuid,
927 &exp->exp_uuid_hash);
928
929 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
930 list_del_init(&exp->exp_obd_chain_timed);
931 exp->exp_obd->obd_num_exports--;
932 spin_unlock(&exp->exp_obd->obd_dev_lock);
933 class_export_put(exp);
934 }
935 EXPORT_SYMBOL(class_unlink_export);
936
937 /* Import management functions */
938 static void class_import_destroy(struct obd_import *imp)
939 {
940 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
941 imp->imp_obd->obd_name);
942
943 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
944
945 ptlrpc_put_connection_superhack(imp->imp_connection);
946
947 while (!list_empty(&imp->imp_conn_list)) {
948 struct obd_import_conn *imp_conn;
949
950 imp_conn = list_entry(imp->imp_conn_list.next,
951 struct obd_import_conn, oic_item);
952 list_del_init(&imp_conn->oic_item);
953 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
954 kfree(imp_conn);
955 }
956
957 LASSERT(imp->imp_sec == NULL);
958 class_decref(imp->imp_obd, "import", imp);
959 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
960 }
961
962 static void import_handle_addref(void *import)
963 {
964 class_import_get(import);
965 }
966
967 static struct portals_handle_ops import_handle_ops = {
968 .hop_addref = import_handle_addref,
969 .hop_free = NULL,
970 };
971
972 struct obd_import *class_import_get(struct obd_import *import)
973 {
974 atomic_inc(&import->imp_refcount);
975 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
976 atomic_read(&import->imp_refcount),
977 import->imp_obd->obd_name);
978 return import;
979 }
980 EXPORT_SYMBOL(class_import_get);
981
982 void class_import_put(struct obd_import *imp)
983 {
984 LASSERT(list_empty(&imp->imp_zombie_chain));
985 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
986
987 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
988 atomic_read(&imp->imp_refcount) - 1,
989 imp->imp_obd->obd_name);
990
991 if (atomic_dec_and_test(&imp->imp_refcount)) {
992 CDEBUG(D_INFO, "final put import %p\n", imp);
993 obd_zombie_import_add(imp);
994 }
995
996 /* catch possible import put race */
997 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
998 }
999 EXPORT_SYMBOL(class_import_put);
1000
1001 static void init_imp_at(struct imp_at *at)
1002 {
1003 int i;
1004 at_init(&at->iat_net_latency, 0, 0);
1005 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1006 /* max service estimates are tracked on the server side, so
1007 don't use the AT history here, just use the last reported
1008 val. (But keep hist for proc histogram, worst_ever) */
1009 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1010 AT_FLG_NOHIST);
1011 }
1012 }
1013
1014 struct obd_import *class_new_import(struct obd_device *obd)
1015 {
1016 struct obd_import *imp;
1017
1018 imp = kzalloc(sizeof(*imp), GFP_NOFS);
1019 if (!imp)
1020 return NULL;
1021
1022 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1023 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1024 INIT_LIST_HEAD(&imp->imp_replay_list);
1025 INIT_LIST_HEAD(&imp->imp_sending_list);
1026 INIT_LIST_HEAD(&imp->imp_delayed_list);
1027 INIT_LIST_HEAD(&imp->imp_committed_list);
1028 imp->imp_replay_cursor = &imp->imp_committed_list;
1029 spin_lock_init(&imp->imp_lock);
1030 imp->imp_last_success_conn = 0;
1031 imp->imp_state = LUSTRE_IMP_NEW;
1032 imp->imp_obd = class_incref(obd, "import", imp);
1033 mutex_init(&imp->imp_sec_mutex);
1034 init_waitqueue_head(&imp->imp_recovery_waitq);
1035
1036 atomic_set(&imp->imp_refcount, 2);
1037 atomic_set(&imp->imp_unregistering, 0);
1038 atomic_set(&imp->imp_inflight, 0);
1039 atomic_set(&imp->imp_replay_inflight, 0);
1040 atomic_set(&imp->imp_inval_count, 0);
1041 INIT_LIST_HEAD(&imp->imp_conn_list);
1042 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1043 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1044 init_imp_at(&imp->imp_at);
1045
1046 /* the default magic is V2, will be used in connect RPC, and
1047 * then adjusted according to the flags in request/reply. */
1048 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1049
1050 return imp;
1051 }
1052 EXPORT_SYMBOL(class_new_import);
1053
1054 void class_destroy_import(struct obd_import *import)
1055 {
1056 LASSERT(import != NULL);
1057 LASSERT(import != LP_POISON);
1058
1059 class_handle_unhash(&import->imp_handle);
1060
1061 spin_lock(&import->imp_lock);
1062 import->imp_generation++;
1063 spin_unlock(&import->imp_lock);
1064 class_import_put(import);
1065 }
1066 EXPORT_SYMBOL(class_destroy_import);
1067
1068 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1069
1070 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1071 {
1072 spin_lock(&exp->exp_locks_list_guard);
1073
1074 LASSERT(lock->l_exp_refs_nr >= 0);
1075
1076 if (lock->l_exp_refs_target != NULL &&
1077 lock->l_exp_refs_target != exp) {
1078 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1079 exp, lock, lock->l_exp_refs_target);
1080 }
1081 if ((lock->l_exp_refs_nr ++) == 0) {
1082 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1083 lock->l_exp_refs_target = exp;
1084 }
1085 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1086 lock, exp, lock->l_exp_refs_nr);
1087 spin_unlock(&exp->exp_locks_list_guard);
1088 }
1089 EXPORT_SYMBOL(__class_export_add_lock_ref);
1090
1091 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1092 {
1093 spin_lock(&exp->exp_locks_list_guard);
1094 LASSERT(lock->l_exp_refs_nr > 0);
1095 if (lock->l_exp_refs_target != exp) {
1096 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1097 lock, lock->l_exp_refs_target, exp);
1098 }
1099 if (-- lock->l_exp_refs_nr == 0) {
1100 list_del_init(&lock->l_exp_refs_link);
1101 lock->l_exp_refs_target = NULL;
1102 }
1103 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1104 lock, exp, lock->l_exp_refs_nr);
1105 spin_unlock(&exp->exp_locks_list_guard);
1106 }
1107 EXPORT_SYMBOL(__class_export_del_lock_ref);
1108 #endif
1109
1110 /* A connection defines an export context in which preallocation can
1111 be managed. This releases the export pointer reference, and returns
1112 the export handle, so the export refcount is 1 when this function
1113 returns. */
1114 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1115 struct obd_uuid *cluuid)
1116 {
1117 struct obd_export *export;
1118 LASSERT(conn != NULL);
1119 LASSERT(obd != NULL);
1120 LASSERT(cluuid != NULL);
1121
1122 export = class_new_export(obd, cluuid);
1123 if (IS_ERR(export))
1124 return PTR_ERR(export);
1125
1126 conn->cookie = export->exp_handle.h_cookie;
1127 class_export_put(export);
1128
1129 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1130 cluuid->uuid, conn->cookie);
1131 return 0;
1132 }
1133 EXPORT_SYMBOL(class_connect);
1134
1135 /* if export is involved in recovery then clean up related things */
1136 static void class_export_recovery_cleanup(struct obd_export *exp)
1137 {
1138 struct obd_device *obd = exp->exp_obd;
1139
1140 spin_lock(&obd->obd_recovery_task_lock);
1141 if (exp->exp_delayed)
1142 obd->obd_delayed_clients--;
1143 if (obd->obd_recovering) {
1144 if (exp->exp_in_recovery) {
1145 spin_lock(&exp->exp_lock);
1146 exp->exp_in_recovery = 0;
1147 spin_unlock(&exp->exp_lock);
1148 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1149 atomic_dec(&obd->obd_connected_clients);
1150 }
1151
1152 /* if called during recovery then should update
1153 * obd_stale_clients counter,
1154 * lightweight exports are not counted */
1155 if (exp->exp_failed &&
1156 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1157 exp->exp_obd->obd_stale_clients++;
1158 }
1159 spin_unlock(&obd->obd_recovery_task_lock);
1160
1161 spin_lock(&exp->exp_lock);
1162 /** Cleanup req replay fields */
1163 if (exp->exp_req_replay_needed) {
1164 exp->exp_req_replay_needed = 0;
1165
1166 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1167 atomic_dec(&obd->obd_req_replay_clients);
1168 }
1169
1170 /** Cleanup lock replay data */
1171 if (exp->exp_lock_replay_needed) {
1172 exp->exp_lock_replay_needed = 0;
1173
1174 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1175 atomic_dec(&obd->obd_lock_replay_clients);
1176 }
1177 spin_unlock(&exp->exp_lock);
1178 }
1179
1180 /* This function removes 1-3 references from the export:
1181 * 1 - for export pointer passed
1182 * and if disconnect really need
1183 * 2 - removing from hash
1184 * 3 - in client_unlink_export
1185 * The export pointer passed to this function can destroyed */
1186 int class_disconnect(struct obd_export *export)
1187 {
1188 int already_disconnected;
1189
1190 if (export == NULL) {
1191 CWARN("attempting to free NULL export %p\n", export);
1192 return -EINVAL;
1193 }
1194
1195 spin_lock(&export->exp_lock);
1196 already_disconnected = export->exp_disconnected;
1197 export->exp_disconnected = 1;
1198 spin_unlock(&export->exp_lock);
1199
1200 /* class_cleanup(), abort_recovery(), and class_fail_export()
1201 * all end up in here, and if any of them race we shouldn't
1202 * call extra class_export_puts(). */
1203 if (already_disconnected) {
1204 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1205 goto no_disconn;
1206 }
1207
1208 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1209 export->exp_handle.h_cookie);
1210
1211 if (!hlist_unhashed(&export->exp_nid_hash))
1212 cfs_hash_del(export->exp_obd->obd_nid_hash,
1213 &export->exp_connection->c_peer.nid,
1214 &export->exp_nid_hash);
1215
1216 class_export_recovery_cleanup(export);
1217 class_unlink_export(export);
1218 no_disconn:
1219 class_export_put(export);
1220 return 0;
1221 }
1222 EXPORT_SYMBOL(class_disconnect);
1223
1224 /* Return non-zero for a fully connected export */
1225 int class_connected_export(struct obd_export *exp)
1226 {
1227 if (exp) {
1228 int connected;
1229 spin_lock(&exp->exp_lock);
1230 connected = exp->exp_conn_cnt > 0;
1231 spin_unlock(&exp->exp_lock);
1232 return connected;
1233 }
1234 return 0;
1235 }
1236 EXPORT_SYMBOL(class_connected_export);
1237
1238 static void class_disconnect_export_list(struct list_head *list,
1239 enum obd_option flags)
1240 {
1241 int rc;
1242 struct obd_export *exp;
1243
1244 /* It's possible that an export may disconnect itself, but
1245 * nothing else will be added to this list. */
1246 while (!list_empty(list)) {
1247 exp = list_entry(list->next, struct obd_export,
1248 exp_obd_chain);
1249 /* need for safe call CDEBUG after obd_disconnect */
1250 class_export_get(exp);
1251
1252 spin_lock(&exp->exp_lock);
1253 exp->exp_flags = flags;
1254 spin_unlock(&exp->exp_lock);
1255
1256 if (obd_uuid_equals(&exp->exp_client_uuid,
1257 &exp->exp_obd->obd_uuid)) {
1258 CDEBUG(D_HA,
1259 "exp %p export uuid == obd uuid, don't discon\n",
1260 exp);
1261 /* Need to delete this now so we don't end up pointing
1262 * to work_list later when this export is cleaned up. */
1263 list_del_init(&exp->exp_obd_chain);
1264 class_export_put(exp);
1265 continue;
1266 }
1267
1268 class_export_get(exp);
1269 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at " CFS_TIME_T "\n",
1270 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1271 exp, exp->exp_last_request_time);
1272 /* release one export reference anyway */
1273 rc = obd_disconnect(exp);
1274
1275 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1276 obd_export_nid2str(exp), exp, rc);
1277 class_export_put(exp);
1278 }
1279 }
1280
1281 void class_disconnect_exports(struct obd_device *obd)
1282 {
1283 struct list_head work_list;
1284
1285 /* Move all of the exports from obd_exports to a work list, en masse. */
1286 INIT_LIST_HEAD(&work_list);
1287 spin_lock(&obd->obd_dev_lock);
1288 list_splice_init(&obd->obd_exports, &work_list);
1289 list_splice_init(&obd->obd_delayed_exports, &work_list);
1290 spin_unlock(&obd->obd_dev_lock);
1291
1292 if (!list_empty(&work_list)) {
1293 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1294 obd->obd_minor, obd);
1295 class_disconnect_export_list(&work_list,
1296 exp_flags_from_obd(obd));
1297 } else
1298 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1299 obd->obd_minor, obd);
1300 }
1301 EXPORT_SYMBOL(class_disconnect_exports);
1302
1303 /* Remove exports that have not completed recovery.
1304 */
1305 void class_disconnect_stale_exports(struct obd_device *obd,
1306 int (*test_export)(struct obd_export *))
1307 {
1308 struct list_head work_list;
1309 struct obd_export *exp, *n;
1310 int evicted = 0;
1311
1312 INIT_LIST_HEAD(&work_list);
1313 spin_lock(&obd->obd_dev_lock);
1314 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1315 exp_obd_chain) {
1316 /* don't count self-export as client */
1317 if (obd_uuid_equals(&exp->exp_client_uuid,
1318 &exp->exp_obd->obd_uuid))
1319 continue;
1320
1321 /* don't evict clients which have no slot in last_rcvd
1322 * (e.g. lightweight connection) */
1323 if (exp->exp_target_data.ted_lr_idx == -1)
1324 continue;
1325
1326 spin_lock(&exp->exp_lock);
1327 if (exp->exp_failed || test_export(exp)) {
1328 spin_unlock(&exp->exp_lock);
1329 continue;
1330 }
1331 exp->exp_failed = 1;
1332 spin_unlock(&exp->exp_lock);
1333
1334 list_move(&exp->exp_obd_chain, &work_list);
1335 evicted++;
1336 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1337 obd->obd_name, exp->exp_client_uuid.uuid,
1338 exp->exp_connection == NULL ? "<unknown>" :
1339 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1340 print_export_data(exp, "EVICTING", 0);
1341 }
1342 spin_unlock(&obd->obd_dev_lock);
1343
1344 if (evicted)
1345 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1346 obd->obd_name, evicted);
1347
1348 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1349 OBD_OPT_ABORT_RECOV);
1350 }
1351 EXPORT_SYMBOL(class_disconnect_stale_exports);
1352
1353 void class_fail_export(struct obd_export *exp)
1354 {
1355 int rc, already_failed;
1356
1357 spin_lock(&exp->exp_lock);
1358 already_failed = exp->exp_failed;
1359 exp->exp_failed = 1;
1360 spin_unlock(&exp->exp_lock);
1361
1362 if (already_failed) {
1363 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1364 exp, exp->exp_client_uuid.uuid);
1365 return;
1366 }
1367
1368 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1369 exp, exp->exp_client_uuid.uuid);
1370
1371 if (obd_dump_on_timeout)
1372 libcfs_debug_dumplog();
1373
1374 /* need for safe call CDEBUG after obd_disconnect */
1375 class_export_get(exp);
1376
1377 /* Most callers into obd_disconnect are removing their own reference
1378 * (request, for example) in addition to the one from the hash table.
1379 * We don't have such a reference here, so make one. */
1380 class_export_get(exp);
1381 rc = obd_disconnect(exp);
1382 if (rc)
1383 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1384 else
1385 CDEBUG(D_HA, "disconnected export %p/%s\n",
1386 exp, exp->exp_client_uuid.uuid);
1387 class_export_put(exp);
1388 }
1389 EXPORT_SYMBOL(class_fail_export);
1390
1391 char *obd_export_nid2str(struct obd_export *exp)
1392 {
1393 if (exp->exp_connection != NULL)
1394 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1395
1396 return "(no nid)";
1397 }
1398 EXPORT_SYMBOL(obd_export_nid2str);
1399
1400 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1401 {
1402 struct cfs_hash *nid_hash;
1403 struct obd_export *doomed_exp = NULL;
1404 int exports_evicted = 0;
1405
1406 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1407
1408 spin_lock(&obd->obd_dev_lock);
1409 /* umount has run already, so evict thread should leave
1410 * its task to umount thread now */
1411 if (obd->obd_stopping) {
1412 spin_unlock(&obd->obd_dev_lock);
1413 return exports_evicted;
1414 }
1415 nid_hash = obd->obd_nid_hash;
1416 cfs_hash_getref(nid_hash);
1417 spin_unlock(&obd->obd_dev_lock);
1418
1419 do {
1420 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1421 if (doomed_exp == NULL)
1422 break;
1423
1424 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1425 "nid %s found, wanted nid %s, requested nid %s\n",
1426 obd_export_nid2str(doomed_exp),
1427 libcfs_nid2str(nid_key), nid);
1428 LASSERTF(doomed_exp != obd->obd_self_export,
1429 "self-export is hashed by NID?\n");
1430 exports_evicted++;
1431 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1432 obd->obd_name,
1433 obd_uuid2str(&doomed_exp->exp_client_uuid),
1434 obd_export_nid2str(doomed_exp));
1435 class_fail_export(doomed_exp);
1436 class_export_put(doomed_exp);
1437 } while (1);
1438
1439 cfs_hash_putref(nid_hash);
1440
1441 if (!exports_evicted)
1442 CDEBUG(D_HA,
1443 "%s: can't disconnect NID '%s': no exports found\n",
1444 obd->obd_name, nid);
1445 return exports_evicted;
1446 }
1447 EXPORT_SYMBOL(obd_export_evict_by_nid);
1448
1449 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1450 {
1451 struct cfs_hash *uuid_hash;
1452 struct obd_export *doomed_exp = NULL;
1453 struct obd_uuid doomed_uuid;
1454 int exports_evicted = 0;
1455
1456 spin_lock(&obd->obd_dev_lock);
1457 if (obd->obd_stopping) {
1458 spin_unlock(&obd->obd_dev_lock);
1459 return exports_evicted;
1460 }
1461 uuid_hash = obd->obd_uuid_hash;
1462 cfs_hash_getref(uuid_hash);
1463 spin_unlock(&obd->obd_dev_lock);
1464
1465 obd_str2uuid(&doomed_uuid, uuid);
1466 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1467 CERROR("%s: can't evict myself\n", obd->obd_name);
1468 cfs_hash_putref(uuid_hash);
1469 return exports_evicted;
1470 }
1471
1472 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1473
1474 if (doomed_exp == NULL) {
1475 CERROR("%s: can't disconnect %s: no exports found\n",
1476 obd->obd_name, uuid);
1477 } else {
1478 CWARN("%s: evicting %s at administrative request\n",
1479 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1480 class_fail_export(doomed_exp);
1481 class_export_put(doomed_exp);
1482 exports_evicted++;
1483 }
1484 cfs_hash_putref(uuid_hash);
1485
1486 return exports_evicted;
1487 }
1488 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1489
1490 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1491 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1492 EXPORT_SYMBOL(class_export_dump_hook);
1493 #endif
1494
1495 static void print_export_data(struct obd_export *exp, const char *status,
1496 int locks)
1497 {
1498 struct ptlrpc_reply_state *rs;
1499 struct ptlrpc_reply_state *first_reply = NULL;
1500 int nreplies = 0;
1501
1502 spin_lock(&exp->exp_lock);
1503 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1504 rs_exp_list) {
1505 if (nreplies == 0)
1506 first_reply = rs;
1507 nreplies++;
1508 }
1509 spin_unlock(&exp->exp_lock);
1510
1511 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1512 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1513 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1514 atomic_read(&exp->exp_rpc_count),
1515 atomic_read(&exp->exp_cb_count),
1516 atomic_read(&exp->exp_locks_count),
1517 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1518 nreplies, first_reply, nreplies > 3 ? "..." : "",
1519 exp->exp_last_committed);
1520 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1521 if (locks && class_export_dump_hook != NULL)
1522 class_export_dump_hook(exp);
1523 #endif
1524 }
1525
1526 void dump_exports(struct obd_device *obd, int locks)
1527 {
1528 struct obd_export *exp;
1529
1530 spin_lock(&obd->obd_dev_lock);
1531 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1532 print_export_data(exp, "ACTIVE", locks);
1533 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1534 print_export_data(exp, "UNLINKED", locks);
1535 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1536 print_export_data(exp, "DELAYED", locks);
1537 spin_unlock(&obd->obd_dev_lock);
1538 spin_lock(&obd_zombie_impexp_lock);
1539 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1540 print_export_data(exp, "ZOMBIE", locks);
1541 spin_unlock(&obd_zombie_impexp_lock);
1542 }
1543 EXPORT_SYMBOL(dump_exports);
1544
1545 void obd_exports_barrier(struct obd_device *obd)
1546 {
1547 int waited = 2;
1548 LASSERT(list_empty(&obd->obd_exports));
1549 spin_lock(&obd->obd_dev_lock);
1550 while (!list_empty(&obd->obd_unlinked_exports)) {
1551 spin_unlock(&obd->obd_dev_lock);
1552 set_current_state(TASK_UNINTERRUPTIBLE);
1553 schedule_timeout(cfs_time_seconds(waited));
1554 if (waited > 5 && IS_PO2(waited)) {
1555 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1556 obd->obd_name, waited,
1557 atomic_read(&obd->obd_refcount));
1558 dump_exports(obd, 1);
1559 }
1560 waited *= 2;
1561 spin_lock(&obd->obd_dev_lock);
1562 }
1563 spin_unlock(&obd->obd_dev_lock);
1564 }
1565 EXPORT_SYMBOL(obd_exports_barrier);
1566
1567 /* Total amount of zombies to be destroyed */
1568 static int zombies_count;
1569
1570 /**
1571 * kill zombie imports and exports
1572 */
1573 void obd_zombie_impexp_cull(void)
1574 {
1575 struct obd_import *import;
1576 struct obd_export *export;
1577
1578 do {
1579 spin_lock(&obd_zombie_impexp_lock);
1580
1581 import = NULL;
1582 if (!list_empty(&obd_zombie_imports)) {
1583 import = list_entry(obd_zombie_imports.next,
1584 struct obd_import,
1585 imp_zombie_chain);
1586 list_del_init(&import->imp_zombie_chain);
1587 }
1588
1589 export = NULL;
1590 if (!list_empty(&obd_zombie_exports)) {
1591 export = list_entry(obd_zombie_exports.next,
1592 struct obd_export,
1593 exp_obd_chain);
1594 list_del_init(&export->exp_obd_chain);
1595 }
1596
1597 spin_unlock(&obd_zombie_impexp_lock);
1598
1599 if (import != NULL) {
1600 class_import_destroy(import);
1601 spin_lock(&obd_zombie_impexp_lock);
1602 zombies_count--;
1603 spin_unlock(&obd_zombie_impexp_lock);
1604 }
1605
1606 if (export != NULL) {
1607 class_export_destroy(export);
1608 spin_lock(&obd_zombie_impexp_lock);
1609 zombies_count--;
1610 spin_unlock(&obd_zombie_impexp_lock);
1611 }
1612
1613 cond_resched();
1614 } while (import != NULL || export != NULL);
1615 }
1616
1617 static struct completion obd_zombie_start;
1618 static struct completion obd_zombie_stop;
1619 static unsigned long obd_zombie_flags;
1620 static wait_queue_head_t obd_zombie_waitq;
1621 static pid_t obd_zombie_pid;
1622
1623 enum {
1624 OBD_ZOMBIE_STOP = 0x0001,
1625 };
1626
1627 /**
1628 * check for work for kill zombie import/export thread.
1629 */
1630 static int obd_zombie_impexp_check(void *arg)
1631 {
1632 int rc;
1633
1634 spin_lock(&obd_zombie_impexp_lock);
1635 rc = (zombies_count == 0) &&
1636 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1637 spin_unlock(&obd_zombie_impexp_lock);
1638
1639 return rc;
1640 }
1641
1642 /**
1643 * Add export to the obd_zombie thread and notify it.
1644 */
1645 static void obd_zombie_export_add(struct obd_export *exp)
1646 {
1647 spin_lock(&exp->exp_obd->obd_dev_lock);
1648 LASSERT(!list_empty(&exp->exp_obd_chain));
1649 list_del_init(&exp->exp_obd_chain);
1650 spin_unlock(&exp->exp_obd->obd_dev_lock);
1651 spin_lock(&obd_zombie_impexp_lock);
1652 zombies_count++;
1653 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1654 spin_unlock(&obd_zombie_impexp_lock);
1655
1656 obd_zombie_impexp_notify();
1657 }
1658
1659 /**
1660 * Add import to the obd_zombie thread and notify it.
1661 */
1662 static void obd_zombie_import_add(struct obd_import *imp)
1663 {
1664 LASSERT(imp->imp_sec == NULL);
1665 LASSERT(imp->imp_rq_pool == NULL);
1666 spin_lock(&obd_zombie_impexp_lock);
1667 LASSERT(list_empty(&imp->imp_zombie_chain));
1668 zombies_count++;
1669 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1670 spin_unlock(&obd_zombie_impexp_lock);
1671
1672 obd_zombie_impexp_notify();
1673 }
1674
1675 /**
1676 * notify import/export destroy thread about new zombie.
1677 */
1678 static void obd_zombie_impexp_notify(void)
1679 {
1680 /*
1681 * Make sure obd_zombie_impexp_thread get this notification.
1682 * It is possible this signal only get by obd_zombie_barrier, and
1683 * barrier gulps this notification and sleeps away and hangs ensues
1684 */
1685 wake_up_all(&obd_zombie_waitq);
1686 }
1687
1688 /**
1689 * check whether obd_zombie is idle
1690 */
1691 static int obd_zombie_is_idle(void)
1692 {
1693 int rc;
1694
1695 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1696 spin_lock(&obd_zombie_impexp_lock);
1697 rc = (zombies_count == 0);
1698 spin_unlock(&obd_zombie_impexp_lock);
1699 return rc;
1700 }
1701
1702 /**
1703 * wait when obd_zombie import/export queues become empty
1704 */
1705 void obd_zombie_barrier(void)
1706 {
1707 struct l_wait_info lwi = { 0 };
1708
1709 if (obd_zombie_pid == current_pid())
1710 /* don't wait for myself */
1711 return;
1712 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1713 }
1714 EXPORT_SYMBOL(obd_zombie_barrier);
1715
1716
1717 /**
1718 * destroy zombie export/import thread.
1719 */
1720 static int obd_zombie_impexp_thread(void *unused)
1721 {
1722 unshare_fs_struct();
1723 complete(&obd_zombie_start);
1724
1725 obd_zombie_pid = current_pid();
1726
1727 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1728 struct l_wait_info lwi = { 0 };
1729
1730 l_wait_event(obd_zombie_waitq,
1731 !obd_zombie_impexp_check(NULL), &lwi);
1732 obd_zombie_impexp_cull();
1733
1734 /*
1735 * Notify obd_zombie_barrier callers that queues
1736 * may be empty.
1737 */
1738 wake_up(&obd_zombie_waitq);
1739 }
1740
1741 complete(&obd_zombie_stop);
1742
1743 return 0;
1744 }
1745
1746
1747 /**
1748 * start destroy zombie import/export thread
1749 */
1750 int obd_zombie_impexp_init(void)
1751 {
1752 struct task_struct *task;
1753
1754 INIT_LIST_HEAD(&obd_zombie_imports);
1755 INIT_LIST_HEAD(&obd_zombie_exports);
1756 spin_lock_init(&obd_zombie_impexp_lock);
1757 init_completion(&obd_zombie_start);
1758 init_completion(&obd_zombie_stop);
1759 init_waitqueue_head(&obd_zombie_waitq);
1760 obd_zombie_pid = 0;
1761
1762 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1763 if (IS_ERR(task))
1764 return PTR_ERR(task);
1765
1766 wait_for_completion(&obd_zombie_start);
1767 return 0;
1768 }
1769 /**
1770 * stop destroy zombie import/export thread
1771 */
1772 void obd_zombie_impexp_stop(void)
1773 {
1774 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1775 obd_zombie_impexp_notify();
1776 wait_for_completion(&obd_zombie_stop);
1777 }
1778
1779 /***** Kernel-userspace comm helpers *******/
1780
1781 /* Get length of entire message, including header */
1782 int kuc_len(int payload_len)
1783 {
1784 return sizeof(struct kuc_hdr) + payload_len;
1785 }
1786 EXPORT_SYMBOL(kuc_len);
1787
1788 /* Get a pointer to kuc header, given a ptr to the payload
1789 * @param p Pointer to payload area
1790 * @returns Pointer to kuc header
1791 */
1792 struct kuc_hdr *kuc_ptr(void *p)
1793 {
1794 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1795 LASSERT(lh->kuc_magic == KUC_MAGIC);
1796 return lh;
1797 }
1798 EXPORT_SYMBOL(kuc_ptr);
1799
1800 /* Test if payload is part of kuc message
1801 * @param p Pointer to payload area
1802 * @returns boolean
1803 */
1804 int kuc_ispayload(void *p)
1805 {
1806 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1807
1808 if (kh->kuc_magic == KUC_MAGIC)
1809 return 1;
1810 else
1811 return 0;
1812 }
1813 EXPORT_SYMBOL(kuc_ispayload);
1814
1815 /* Alloc space for a message, and fill in header
1816 * @return Pointer to payload area
1817 */
1818 void *kuc_alloc(int payload_len, int transport, int type)
1819 {
1820 struct kuc_hdr *lh;
1821 int len = kuc_len(payload_len);
1822
1823 lh = kzalloc(len, GFP_NOFS);
1824 if (!lh)
1825 return ERR_PTR(-ENOMEM);
1826
1827 lh->kuc_magic = KUC_MAGIC;
1828 lh->kuc_transport = transport;
1829 lh->kuc_msgtype = type;
1830 lh->kuc_msglen = len;
1831
1832 return (void *)(lh + 1);
1833 }
1834 EXPORT_SYMBOL(kuc_alloc);
1835
1836 /* Takes pointer to payload area */
1837 inline void kuc_free(void *p, int payload_len)
1838 {
1839 struct kuc_hdr *lh = kuc_ptr(p);
1840 kfree(lh);
1841 }
1842 EXPORT_SYMBOL(kuc_free);
This page took 0.091223 seconds and 5 git commands to generate.