staging: lustre: remove RETURN macro
[deliverable/linux.git] / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32 /*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/obdclass/genops.c
37 *
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
40 */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_ost.h>
44 #include <obd_class.h>
45 #include <lprocfs_status.h>
46
47 extern struct list_head obd_types;
48 spinlock_t obd_types_lock;
49
50 struct kmem_cache *obd_device_cachep;
51 struct kmem_cache *obdo_cachep;
52 EXPORT_SYMBOL(obdo_cachep);
53 struct kmem_cache *import_cachep;
54
55 struct list_head obd_zombie_imports;
56 struct list_head obd_zombie_exports;
57 spinlock_t obd_zombie_impexp_lock;
58 static void obd_zombie_impexp_notify(void);
59 static void obd_zombie_export_add(struct obd_export *exp);
60 static void obd_zombie_import_add(struct obd_import *imp);
61 static void print_export_data(struct obd_export *exp,
62 const char *status, int locks);
63
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
66
67 /*
68 * support functions: we could use inter-module communication, but this
69 * is more portable to other OS's
70 */
71 static struct obd_device *obd_device_alloc(void)
72 {
73 struct obd_device *obd;
74
75 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
76 if (obd != NULL) {
77 obd->obd_magic = OBD_DEVICE_MAGIC;
78 }
79 return obd;
80 }
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84 LASSERT(obd != NULL);
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
90 LBUG();
91 }
92 lu_ref_fini(&obd->obd_reference);
93 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 }
95
96 struct obd_type *class_search_type(const char *name)
97 {
98 struct list_head *tmp;
99 struct obd_type *type;
100
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
104 if (strcmp(type->typ_name, name) == 0) {
105 spin_unlock(&obd_types_lock);
106 return type;
107 }
108 }
109 spin_unlock(&obd_types_lock);
110 return NULL;
111 }
112 EXPORT_SYMBOL(class_search_type);
113
114 struct obd_type *class_get_type(const char *name)
115 {
116 struct obd_type *type = class_search_type(name);
117
118 if (!type) {
119 const char *modname = name;
120
121 if (strcmp(modname, "obdfilter") == 0)
122 modname = "ofd";
123
124 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
125 modname = LUSTRE_OSP_NAME;
126
127 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
128 modname = LUSTRE_MDT_NAME;
129
130 if (!request_module("%s", modname)) {
131 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
132 type = class_search_type(name);
133 } else {
134 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135 modname);
136 }
137 }
138 if (type) {
139 spin_lock(&type->obd_type_lock);
140 type->typ_refcnt++;
141 try_module_get(type->typ_dt_ops->o_owner);
142 spin_unlock(&type->obd_type_lock);
143 }
144 return type;
145 }
146 EXPORT_SYMBOL(class_get_type);
147
148 void class_put_type(struct obd_type *type)
149 {
150 LASSERT(type);
151 spin_lock(&type->obd_type_lock);
152 type->typ_refcnt--;
153 module_put(type->typ_dt_ops->o_owner);
154 spin_unlock(&type->obd_type_lock);
155 }
156 EXPORT_SYMBOL(class_put_type);
157
158 #define CLASS_MAX_NAME 1024
159
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161 struct lprocfs_vars *vars, const char *name,
162 struct lu_device_type *ldt)
163 {
164 struct obd_type *type;
165 int rc = 0;
166
167 /* sanity check */
168 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
169
170 if (class_search_type(name)) {
171 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
172 return -EEXIST;
173 }
174
175 rc = -ENOMEM;
176 OBD_ALLOC(type, sizeof(*type));
177 if (type == NULL)
178 return rc;
179
180 OBD_ALLOC_PTR(type->typ_dt_ops);
181 OBD_ALLOC_PTR(type->typ_md_ops);
182 OBD_ALLOC(type->typ_name, strlen(name) + 1);
183
184 if (type->typ_dt_ops == NULL ||
185 type->typ_md_ops == NULL ||
186 type->typ_name == NULL)
187 GOTO (failed, rc);
188
189 *(type->typ_dt_ops) = *dt_ops;
190 /* md_ops is optional */
191 if (md_ops)
192 *(type->typ_md_ops) = *md_ops;
193 strcpy(type->typ_name, name);
194 spin_lock_init(&type->obd_type_lock);
195
196 #ifdef LPROCFS
197 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
198 vars, type);
199 if (IS_ERR(type->typ_procroot)) {
200 rc = PTR_ERR(type->typ_procroot);
201 type->typ_procroot = NULL;
202 GOTO (failed, rc);
203 }
204 #endif
205 if (ldt != NULL) {
206 type->typ_lu = ldt;
207 rc = lu_device_type_init(ldt);
208 if (rc != 0)
209 GOTO (failed, rc);
210 }
211
212 spin_lock(&obd_types_lock);
213 list_add(&type->typ_chain, &obd_types);
214 spin_unlock(&obd_types_lock);
215
216 return 0;
217
218 failed:
219 if (type->typ_name != NULL)
220 OBD_FREE(type->typ_name, strlen(name) + 1);
221 if (type->typ_md_ops != NULL)
222 OBD_FREE_PTR(type->typ_md_ops);
223 if (type->typ_dt_ops != NULL)
224 OBD_FREE_PTR(type->typ_dt_ops);
225 OBD_FREE(type, sizeof(*type));
226 return rc;
227 }
228 EXPORT_SYMBOL(class_register_type);
229
230 int class_unregister_type(const char *name)
231 {
232 struct obd_type *type = class_search_type(name);
233
234 if (!type) {
235 CERROR("unknown obd type\n");
236 return -EINVAL;
237 }
238
239 if (type->typ_refcnt) {
240 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
241 /* This is a bad situation, let's make the best of it */
242 /* Remove ops, but leave the name for debugging */
243 OBD_FREE_PTR(type->typ_dt_ops);
244 OBD_FREE_PTR(type->typ_md_ops);
245 return -EBUSY;
246 }
247
248 if (type->typ_procroot) {
249 lprocfs_remove(&type->typ_procroot);
250 }
251
252 if (type->typ_lu)
253 lu_device_type_fini(type->typ_lu);
254
255 spin_lock(&obd_types_lock);
256 list_del(&type->typ_chain);
257 spin_unlock(&obd_types_lock);
258 OBD_FREE(type->typ_name, strlen(name) + 1);
259 if (type->typ_dt_ops != NULL)
260 OBD_FREE_PTR(type->typ_dt_ops);
261 if (type->typ_md_ops != NULL)
262 OBD_FREE_PTR(type->typ_md_ops);
263 OBD_FREE(type, sizeof(*type));
264 return 0;
265 } /* class_unregister_type */
266 EXPORT_SYMBOL(class_unregister_type);
267
268 /**
269 * Create a new obd device.
270 *
271 * Find an empty slot in ::obd_devs[], create a new obd device in it.
272 *
273 * \param[in] type_name obd device type string.
274 * \param[in] name obd device name.
275 *
276 * \retval NULL if create fails, otherwise return the obd device
277 * pointer created.
278 */
279 struct obd_device *class_newdev(const char *type_name, const char *name)
280 {
281 struct obd_device *result = NULL;
282 struct obd_device *newdev;
283 struct obd_type *type = NULL;
284 int i;
285 int new_obd_minor = 0;
286
287 if (strlen(name) >= MAX_OBD_NAME) {
288 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
289 return ERR_PTR(-EINVAL);
290 }
291
292 type = class_get_type(type_name);
293 if (type == NULL){
294 CERROR("OBD: unknown type: %s\n", type_name);
295 return ERR_PTR(-ENODEV);
296 }
297
298 newdev = obd_device_alloc();
299 if (newdev == NULL)
300 GOTO(out_type, result = ERR_PTR(-ENOMEM));
301
302 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303
304 write_lock(&obd_dev_lock);
305 for (i = 0; i < class_devno_max(); i++) {
306 struct obd_device *obd = class_num2obd(i);
307
308 if (obd && (strcmp(name, obd->obd_name) == 0)) {
309 CERROR("Device %s already exists at %d, won't add\n",
310 name, i);
311 if (result) {
312 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313 "%p obd_magic %08x != %08x\n", result,
314 result->obd_magic, OBD_DEVICE_MAGIC);
315 LASSERTF(result->obd_minor == new_obd_minor,
316 "%p obd_minor %d != %d\n", result,
317 result->obd_minor, new_obd_minor);
318
319 obd_devs[result->obd_minor] = NULL;
320 result->obd_name[0]='\0';
321 }
322 result = ERR_PTR(-EEXIST);
323 break;
324 }
325 if (!result && !obd) {
326 result = newdev;
327 result->obd_minor = i;
328 new_obd_minor = i;
329 result->obd_type = type;
330 strncpy(result->obd_name, name,
331 sizeof(result->obd_name) - 1);
332 obd_devs[i] = result;
333 }
334 }
335 write_unlock(&obd_dev_lock);
336
337 if (result == NULL && i >= class_devno_max()) {
338 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339 class_devno_max());
340 GOTO(out, result = ERR_PTR(-EOVERFLOW));
341 }
342
343 if (IS_ERR(result))
344 GOTO(out, result);
345
346 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347 result->obd_name, result);
348
349 return result;
350 out:
351 obd_device_free(newdev);
352 out_type:
353 class_put_type(type);
354 return result;
355 }
356
357 void class_release_dev(struct obd_device *obd)
358 {
359 struct obd_type *obd_type = obd->obd_type;
360
361 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
362 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
363 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
364 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
365 LASSERT(obd_type != NULL);
366
367 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
368 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
369
370 write_lock(&obd_dev_lock);
371 obd_devs[obd->obd_minor] = NULL;
372 write_unlock(&obd_dev_lock);
373 obd_device_free(obd);
374
375 class_put_type(obd_type);
376 }
377
378 int class_name2dev(const char *name)
379 {
380 int i;
381
382 if (!name)
383 return -1;
384
385 read_lock(&obd_dev_lock);
386 for (i = 0; i < class_devno_max(); i++) {
387 struct obd_device *obd = class_num2obd(i);
388
389 if (obd && strcmp(name, obd->obd_name) == 0) {
390 /* Make sure we finished attaching before we give
391 out any references */
392 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
393 if (obd->obd_attached) {
394 read_unlock(&obd_dev_lock);
395 return i;
396 }
397 break;
398 }
399 }
400 read_unlock(&obd_dev_lock);
401
402 return -1;
403 }
404 EXPORT_SYMBOL(class_name2dev);
405
406 struct obd_device *class_name2obd(const char *name)
407 {
408 int dev = class_name2dev(name);
409
410 if (dev < 0 || dev > class_devno_max())
411 return NULL;
412 return class_num2obd(dev);
413 }
414 EXPORT_SYMBOL(class_name2obd);
415
416 int class_uuid2dev(struct obd_uuid *uuid)
417 {
418 int i;
419
420 read_lock(&obd_dev_lock);
421 for (i = 0; i < class_devno_max(); i++) {
422 struct obd_device *obd = class_num2obd(i);
423
424 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
425 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
426 read_unlock(&obd_dev_lock);
427 return i;
428 }
429 }
430 read_unlock(&obd_dev_lock);
431
432 return -1;
433 }
434 EXPORT_SYMBOL(class_uuid2dev);
435
436 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
437 {
438 int dev = class_uuid2dev(uuid);
439 if (dev < 0)
440 return NULL;
441 return class_num2obd(dev);
442 }
443 EXPORT_SYMBOL(class_uuid2obd);
444
445 /**
446 * Get obd device from ::obd_devs[]
447 *
448 * \param num [in] array index
449 *
450 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
451 * otherwise return the obd device there.
452 */
453 struct obd_device *class_num2obd(int num)
454 {
455 struct obd_device *obd = NULL;
456
457 if (num < class_devno_max()) {
458 obd = obd_devs[num];
459 if (obd == NULL)
460 return NULL;
461
462 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
463 "%p obd_magic %08x != %08x\n",
464 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
465 LASSERTF(obd->obd_minor == num,
466 "%p obd_minor %0d != %0d\n",
467 obd, obd->obd_minor, num);
468 }
469
470 return obd;
471 }
472 EXPORT_SYMBOL(class_num2obd);
473
474 /**
475 * Get obd devices count. Device in any
476 * state are counted
477 * \retval obd device count
478 */
479 int get_devices_count(void)
480 {
481 int index, max_index = class_devno_max(), dev_count = 0;
482
483 read_lock(&obd_dev_lock);
484 for (index = 0; index <= max_index; index++) {
485 struct obd_device *obd = class_num2obd(index);
486 if (obd != NULL)
487 dev_count++;
488 }
489 read_unlock(&obd_dev_lock);
490
491 return dev_count;
492 }
493 EXPORT_SYMBOL(get_devices_count);
494
495 void class_obd_list(void)
496 {
497 char *status;
498 int i;
499
500 read_lock(&obd_dev_lock);
501 for (i = 0; i < class_devno_max(); i++) {
502 struct obd_device *obd = class_num2obd(i);
503
504 if (obd == NULL)
505 continue;
506 if (obd->obd_stopping)
507 status = "ST";
508 else if (obd->obd_set_up)
509 status = "UP";
510 else if (obd->obd_attached)
511 status = "AT";
512 else
513 status = "--";
514 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
515 i, status, obd->obd_type->typ_name,
516 obd->obd_name, obd->obd_uuid.uuid,
517 atomic_read(&obd->obd_refcount));
518 }
519 read_unlock(&obd_dev_lock);
520 return;
521 }
522
523 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
524 specified, then only the client with that uuid is returned,
525 otherwise any client connected to the tgt is returned. */
526 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
527 const char * typ_name,
528 struct obd_uuid *grp_uuid)
529 {
530 int i;
531
532 read_lock(&obd_dev_lock);
533 for (i = 0; i < class_devno_max(); i++) {
534 struct obd_device *obd = class_num2obd(i);
535
536 if (obd == NULL)
537 continue;
538 if ((strncmp(obd->obd_type->typ_name, typ_name,
539 strlen(typ_name)) == 0)) {
540 if (obd_uuid_equals(tgt_uuid,
541 &obd->u.cli.cl_target_uuid) &&
542 ((grp_uuid)? obd_uuid_equals(grp_uuid,
543 &obd->obd_uuid) : 1)) {
544 read_unlock(&obd_dev_lock);
545 return obd;
546 }
547 }
548 }
549 read_unlock(&obd_dev_lock);
550
551 return NULL;
552 }
553 EXPORT_SYMBOL(class_find_client_obd);
554
555 /* Iterate the obd_device list looking devices have grp_uuid. Start
556 searching at *next, and if a device is found, the next index to look
557 at is saved in *next. If next is NULL, then the first matching device
558 will always be returned. */
559 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
560 {
561 int i;
562
563 if (next == NULL)
564 i = 0;
565 else if (*next >= 0 && *next < class_devno_max())
566 i = *next;
567 else
568 return NULL;
569
570 read_lock(&obd_dev_lock);
571 for (; i < class_devno_max(); i++) {
572 struct obd_device *obd = class_num2obd(i);
573
574 if (obd == NULL)
575 continue;
576 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
577 if (next != NULL)
578 *next = i+1;
579 read_unlock(&obd_dev_lock);
580 return obd;
581 }
582 }
583 read_unlock(&obd_dev_lock);
584
585 return NULL;
586 }
587 EXPORT_SYMBOL(class_devices_in_group);
588
589 /**
590 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
591 * adjust sptlrpc settings accordingly.
592 */
593 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
594 {
595 struct obd_device *obd;
596 const char *type;
597 int i, rc = 0, rc2;
598
599 LASSERT(namelen > 0);
600
601 read_lock(&obd_dev_lock);
602 for (i = 0; i < class_devno_max(); i++) {
603 obd = class_num2obd(i);
604
605 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
606 continue;
607
608 /* only notify mdc, osc, mdt, ost */
609 type = obd->obd_type->typ_name;
610 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
611 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
612 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
613 strcmp(type, LUSTRE_OST_NAME) != 0)
614 continue;
615
616 if (strncmp(obd->obd_name, fsname, namelen))
617 continue;
618
619 class_incref(obd, __FUNCTION__, obd);
620 read_unlock(&obd_dev_lock);
621 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
622 sizeof(KEY_SPTLRPC_CONF),
623 KEY_SPTLRPC_CONF, 0, NULL, NULL);
624 rc = rc ? rc : rc2;
625 class_decref(obd, __FUNCTION__, obd);
626 read_lock(&obd_dev_lock);
627 }
628 read_unlock(&obd_dev_lock);
629 return rc;
630 }
631 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
632
633 void obd_cleanup_caches(void)
634 {
635 if (obd_device_cachep) {
636 kmem_cache_destroy(obd_device_cachep);
637 obd_device_cachep = NULL;
638 }
639 if (obdo_cachep) {
640 kmem_cache_destroy(obdo_cachep);
641 obdo_cachep = NULL;
642 }
643 if (import_cachep) {
644 kmem_cache_destroy(import_cachep);
645 import_cachep = NULL;
646 }
647 if (capa_cachep) {
648 kmem_cache_destroy(capa_cachep);
649 capa_cachep = NULL;
650 }
651 }
652
653 int obd_init_caches(void)
654 {
655 LASSERT(obd_device_cachep == NULL);
656 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
657 sizeof(struct obd_device),
658 0, 0, NULL);
659 if (!obd_device_cachep)
660 GOTO(out, -ENOMEM);
661
662 LASSERT(obdo_cachep == NULL);
663 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
664 0, 0, NULL);
665 if (!obdo_cachep)
666 GOTO(out, -ENOMEM);
667
668 LASSERT(import_cachep == NULL);
669 import_cachep = kmem_cache_create("ll_import_cache",
670 sizeof(struct obd_import),
671 0, 0, NULL);
672 if (!import_cachep)
673 GOTO(out, -ENOMEM);
674
675 LASSERT(capa_cachep == NULL);
676 capa_cachep = kmem_cache_create("capa_cache",
677 sizeof(struct obd_capa), 0, 0, NULL);
678 if (!capa_cachep)
679 GOTO(out, -ENOMEM);
680
681 return 0;
682 out:
683 obd_cleanup_caches();
684 return -ENOMEM;
685
686 }
687
688 /* map connection to client */
689 struct obd_export *class_conn2export(struct lustre_handle *conn)
690 {
691 struct obd_export *export;
692
693 if (!conn) {
694 CDEBUG(D_CACHE, "looking for null handle\n");
695 return NULL;
696 }
697
698 if (conn->cookie == -1) { /* this means assign a new connection */
699 CDEBUG(D_CACHE, "want a new connection\n");
700 return NULL;
701 }
702
703 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
704 export = class_handle2object(conn->cookie);
705 return export;
706 }
707 EXPORT_SYMBOL(class_conn2export);
708
709 struct obd_device *class_exp2obd(struct obd_export *exp)
710 {
711 if (exp)
712 return exp->exp_obd;
713 return NULL;
714 }
715 EXPORT_SYMBOL(class_exp2obd);
716
717 struct obd_device *class_conn2obd(struct lustre_handle *conn)
718 {
719 struct obd_export *export;
720 export = class_conn2export(conn);
721 if (export) {
722 struct obd_device *obd = export->exp_obd;
723 class_export_put(export);
724 return obd;
725 }
726 return NULL;
727 }
728 EXPORT_SYMBOL(class_conn2obd);
729
730 struct obd_import *class_exp2cliimp(struct obd_export *exp)
731 {
732 struct obd_device *obd = exp->exp_obd;
733 if (obd == NULL)
734 return NULL;
735 return obd->u.cli.cl_import;
736 }
737 EXPORT_SYMBOL(class_exp2cliimp);
738
739 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
740 {
741 struct obd_device *obd = class_conn2obd(conn);
742 if (obd == NULL)
743 return NULL;
744 return obd->u.cli.cl_import;
745 }
746 EXPORT_SYMBOL(class_conn2cliimp);
747
748 /* Export management functions */
749 static void class_export_destroy(struct obd_export *exp)
750 {
751 struct obd_device *obd = exp->exp_obd;
752
753 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
754 LASSERT(obd != NULL);
755
756 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
757 exp->exp_client_uuid.uuid, obd->obd_name);
758
759 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
760 if (exp->exp_connection)
761 ptlrpc_put_connection_superhack(exp->exp_connection);
762
763 LASSERT(list_empty(&exp->exp_outstanding_replies));
764 LASSERT(list_empty(&exp->exp_uncommitted_replies));
765 LASSERT(list_empty(&exp->exp_req_replay_queue));
766 LASSERT(list_empty(&exp->exp_hp_rpcs));
767 obd_destroy_export(exp);
768 class_decref(obd, "export", exp);
769
770 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
771 }
772
773 static void export_handle_addref(void *export)
774 {
775 class_export_get(export);
776 }
777
778 static struct portals_handle_ops export_handle_ops = {
779 .hop_addref = export_handle_addref,
780 .hop_free = NULL,
781 };
782
783 struct obd_export *class_export_get(struct obd_export *exp)
784 {
785 atomic_inc(&exp->exp_refcount);
786 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
787 atomic_read(&exp->exp_refcount));
788 return exp;
789 }
790 EXPORT_SYMBOL(class_export_get);
791
792 void class_export_put(struct obd_export *exp)
793 {
794 LASSERT(exp != NULL);
795 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
796 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
797 atomic_read(&exp->exp_refcount) - 1);
798
799 if (atomic_dec_and_test(&exp->exp_refcount)) {
800 LASSERT(!list_empty(&exp->exp_obd_chain));
801 CDEBUG(D_IOCTL, "final put %p/%s\n",
802 exp, exp->exp_client_uuid.uuid);
803
804 /* release nid stat refererence */
805 lprocfs_exp_cleanup(exp);
806
807 obd_zombie_export_add(exp);
808 }
809 }
810 EXPORT_SYMBOL(class_export_put);
811
812 /* Creates a new export, adds it to the hash table, and returns a
813 * pointer to it. The refcount is 2: one for the hash reference, and
814 * one for the pointer returned by this function. */
815 struct obd_export *class_new_export(struct obd_device *obd,
816 struct obd_uuid *cluuid)
817 {
818 struct obd_export *export;
819 cfs_hash_t *hash = NULL;
820 int rc = 0;
821
822 OBD_ALLOC_PTR(export);
823 if (!export)
824 return ERR_PTR(-ENOMEM);
825
826 export->exp_conn_cnt = 0;
827 export->exp_lock_hash = NULL;
828 export->exp_flock_hash = NULL;
829 atomic_set(&export->exp_refcount, 2);
830 atomic_set(&export->exp_rpc_count, 0);
831 atomic_set(&export->exp_cb_count, 0);
832 atomic_set(&export->exp_locks_count, 0);
833 #if LUSTRE_TRACKS_LOCK_EXP_REFS
834 INIT_LIST_HEAD(&export->exp_locks_list);
835 spin_lock_init(&export->exp_locks_list_guard);
836 #endif
837 atomic_set(&export->exp_replay_count, 0);
838 export->exp_obd = obd;
839 INIT_LIST_HEAD(&export->exp_outstanding_replies);
840 spin_lock_init(&export->exp_uncommitted_replies_lock);
841 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
842 INIT_LIST_HEAD(&export->exp_req_replay_queue);
843 INIT_LIST_HEAD(&export->exp_handle.h_link);
844 INIT_LIST_HEAD(&export->exp_hp_rpcs);
845 class_handle_hash(&export->exp_handle, &export_handle_ops);
846 export->exp_last_request_time = cfs_time_current_sec();
847 spin_lock_init(&export->exp_lock);
848 spin_lock_init(&export->exp_rpc_lock);
849 INIT_HLIST_NODE(&export->exp_uuid_hash);
850 INIT_HLIST_NODE(&export->exp_nid_hash);
851 spin_lock_init(&export->exp_bl_list_lock);
852 INIT_LIST_HEAD(&export->exp_bl_list);
853
854 export->exp_sp_peer = LUSTRE_SP_ANY;
855 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
856 export->exp_client_uuid = *cluuid;
857 obd_init_export(export);
858
859 spin_lock(&obd->obd_dev_lock);
860 /* shouldn't happen, but might race */
861 if (obd->obd_stopping)
862 GOTO(exit_unlock, rc = -ENODEV);
863
864 hash = cfs_hash_getref(obd->obd_uuid_hash);
865 if (hash == NULL)
866 GOTO(exit_unlock, rc = -ENODEV);
867 spin_unlock(&obd->obd_dev_lock);
868
869 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
870 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
871 if (rc != 0) {
872 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
873 obd->obd_name, cluuid->uuid, rc);
874 GOTO(exit_err, rc = -EALREADY);
875 }
876 }
877
878 spin_lock(&obd->obd_dev_lock);
879 if (obd->obd_stopping) {
880 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
881 GOTO(exit_unlock, rc = -ENODEV);
882 }
883
884 class_incref(obd, "export", export);
885 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
886 list_add_tail(&export->exp_obd_chain_timed,
887 &export->exp_obd->obd_exports_timed);
888 export->exp_obd->obd_num_exports++;
889 spin_unlock(&obd->obd_dev_lock);
890 cfs_hash_putref(hash);
891 return export;
892
893 exit_unlock:
894 spin_unlock(&obd->obd_dev_lock);
895 exit_err:
896 if (hash)
897 cfs_hash_putref(hash);
898 class_handle_unhash(&export->exp_handle);
899 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
900 obd_destroy_export(export);
901 OBD_FREE_PTR(export);
902 return ERR_PTR(rc);
903 }
904 EXPORT_SYMBOL(class_new_export);
905
906 void class_unlink_export(struct obd_export *exp)
907 {
908 class_handle_unhash(&exp->exp_handle);
909
910 spin_lock(&exp->exp_obd->obd_dev_lock);
911 /* delete an uuid-export hashitem from hashtables */
912 if (!hlist_unhashed(&exp->exp_uuid_hash))
913 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
914 &exp->exp_client_uuid,
915 &exp->exp_uuid_hash);
916
917 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
918 list_del_init(&exp->exp_obd_chain_timed);
919 exp->exp_obd->obd_num_exports--;
920 spin_unlock(&exp->exp_obd->obd_dev_lock);
921 class_export_put(exp);
922 }
923 EXPORT_SYMBOL(class_unlink_export);
924
925 /* Import management functions */
926 void class_import_destroy(struct obd_import *imp)
927 {
928 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
929 imp->imp_obd->obd_name);
930
931 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
932
933 ptlrpc_put_connection_superhack(imp->imp_connection);
934
935 while (!list_empty(&imp->imp_conn_list)) {
936 struct obd_import_conn *imp_conn;
937
938 imp_conn = list_entry(imp->imp_conn_list.next,
939 struct obd_import_conn, oic_item);
940 list_del_init(&imp_conn->oic_item);
941 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
942 OBD_FREE(imp_conn, sizeof(*imp_conn));
943 }
944
945 LASSERT(imp->imp_sec == NULL);
946 class_decref(imp->imp_obd, "import", imp);
947 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
948 }
949
950 static void import_handle_addref(void *import)
951 {
952 class_import_get(import);
953 }
954
955 static struct portals_handle_ops import_handle_ops = {
956 .hop_addref = import_handle_addref,
957 .hop_free = NULL,
958 };
959
960 struct obd_import *class_import_get(struct obd_import *import)
961 {
962 atomic_inc(&import->imp_refcount);
963 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
964 atomic_read(&import->imp_refcount),
965 import->imp_obd->obd_name);
966 return import;
967 }
968 EXPORT_SYMBOL(class_import_get);
969
970 void class_import_put(struct obd_import *imp)
971 {
972 LASSERT(list_empty(&imp->imp_zombie_chain));
973 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
974
975 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
976 atomic_read(&imp->imp_refcount) - 1,
977 imp->imp_obd->obd_name);
978
979 if (atomic_dec_and_test(&imp->imp_refcount)) {
980 CDEBUG(D_INFO, "final put import %p\n", imp);
981 obd_zombie_import_add(imp);
982 }
983
984 /* catch possible import put race */
985 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
986 }
987 EXPORT_SYMBOL(class_import_put);
988
989 static void init_imp_at(struct imp_at *at) {
990 int i;
991 at_init(&at->iat_net_latency, 0, 0);
992 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
993 /* max service estimates are tracked on the server side, so
994 don't use the AT history here, just use the last reported
995 val. (But keep hist for proc histogram, worst_ever) */
996 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
997 AT_FLG_NOHIST);
998 }
999 }
1000
1001 struct obd_import *class_new_import(struct obd_device *obd)
1002 {
1003 struct obd_import *imp;
1004
1005 OBD_ALLOC(imp, sizeof(*imp));
1006 if (imp == NULL)
1007 return NULL;
1008
1009 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1010 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1011 INIT_LIST_HEAD(&imp->imp_replay_list);
1012 INIT_LIST_HEAD(&imp->imp_sending_list);
1013 INIT_LIST_HEAD(&imp->imp_delayed_list);
1014 spin_lock_init(&imp->imp_lock);
1015 imp->imp_last_success_conn = 0;
1016 imp->imp_state = LUSTRE_IMP_NEW;
1017 imp->imp_obd = class_incref(obd, "import", imp);
1018 mutex_init(&imp->imp_sec_mutex);
1019 init_waitqueue_head(&imp->imp_recovery_waitq);
1020
1021 atomic_set(&imp->imp_refcount, 2);
1022 atomic_set(&imp->imp_unregistering, 0);
1023 atomic_set(&imp->imp_inflight, 0);
1024 atomic_set(&imp->imp_replay_inflight, 0);
1025 atomic_set(&imp->imp_inval_count, 0);
1026 INIT_LIST_HEAD(&imp->imp_conn_list);
1027 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1028 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1029 init_imp_at(&imp->imp_at);
1030
1031 /* the default magic is V2, will be used in connect RPC, and
1032 * then adjusted according to the flags in request/reply. */
1033 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1034
1035 return imp;
1036 }
1037 EXPORT_SYMBOL(class_new_import);
1038
1039 void class_destroy_import(struct obd_import *import)
1040 {
1041 LASSERT(import != NULL);
1042 LASSERT(import != LP_POISON);
1043
1044 class_handle_unhash(&import->imp_handle);
1045
1046 spin_lock(&import->imp_lock);
1047 import->imp_generation++;
1048 spin_unlock(&import->imp_lock);
1049 class_import_put(import);
1050 }
1051 EXPORT_SYMBOL(class_destroy_import);
1052
1053 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1054
1055 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1056 {
1057 spin_lock(&exp->exp_locks_list_guard);
1058
1059 LASSERT(lock->l_exp_refs_nr >= 0);
1060
1061 if (lock->l_exp_refs_target != NULL &&
1062 lock->l_exp_refs_target != exp) {
1063 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1064 exp, lock, lock->l_exp_refs_target);
1065 }
1066 if ((lock->l_exp_refs_nr ++) == 0) {
1067 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1068 lock->l_exp_refs_target = exp;
1069 }
1070 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1071 lock, exp, lock->l_exp_refs_nr);
1072 spin_unlock(&exp->exp_locks_list_guard);
1073 }
1074 EXPORT_SYMBOL(__class_export_add_lock_ref);
1075
1076 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1077 {
1078 spin_lock(&exp->exp_locks_list_guard);
1079 LASSERT(lock->l_exp_refs_nr > 0);
1080 if (lock->l_exp_refs_target != exp) {
1081 LCONSOLE_WARN("lock %p, "
1082 "mismatching export pointers: %p, %p\n",
1083 lock, lock->l_exp_refs_target, exp);
1084 }
1085 if (-- lock->l_exp_refs_nr == 0) {
1086 list_del_init(&lock->l_exp_refs_link);
1087 lock->l_exp_refs_target = NULL;
1088 }
1089 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1090 lock, exp, lock->l_exp_refs_nr);
1091 spin_unlock(&exp->exp_locks_list_guard);
1092 }
1093 EXPORT_SYMBOL(__class_export_del_lock_ref);
1094 #endif
1095
1096 /* A connection defines an export context in which preallocation can
1097 be managed. This releases the export pointer reference, and returns
1098 the export handle, so the export refcount is 1 when this function
1099 returns. */
1100 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1101 struct obd_uuid *cluuid)
1102 {
1103 struct obd_export *export;
1104 LASSERT(conn != NULL);
1105 LASSERT(obd != NULL);
1106 LASSERT(cluuid != NULL);
1107
1108 export = class_new_export(obd, cluuid);
1109 if (IS_ERR(export))
1110 return PTR_ERR(export);
1111
1112 conn->cookie = export->exp_handle.h_cookie;
1113 class_export_put(export);
1114
1115 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1116 cluuid->uuid, conn->cookie);
1117 return 0;
1118 }
1119 EXPORT_SYMBOL(class_connect);
1120
1121 /* if export is involved in recovery then clean up related things */
1122 void class_export_recovery_cleanup(struct obd_export *exp)
1123 {
1124 struct obd_device *obd = exp->exp_obd;
1125
1126 spin_lock(&obd->obd_recovery_task_lock);
1127 if (exp->exp_delayed)
1128 obd->obd_delayed_clients--;
1129 if (obd->obd_recovering) {
1130 if (exp->exp_in_recovery) {
1131 spin_lock(&exp->exp_lock);
1132 exp->exp_in_recovery = 0;
1133 spin_unlock(&exp->exp_lock);
1134 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1135 atomic_dec(&obd->obd_connected_clients);
1136 }
1137
1138 /* if called during recovery then should update
1139 * obd_stale_clients counter,
1140 * lightweight exports are not counted */
1141 if (exp->exp_failed &&
1142 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1143 exp->exp_obd->obd_stale_clients++;
1144 }
1145 spin_unlock(&obd->obd_recovery_task_lock);
1146 /** Cleanup req replay fields */
1147 if (exp->exp_req_replay_needed) {
1148 spin_lock(&exp->exp_lock);
1149 exp->exp_req_replay_needed = 0;
1150 spin_unlock(&exp->exp_lock);
1151 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1152 atomic_dec(&obd->obd_req_replay_clients);
1153 }
1154 /** Cleanup lock replay data */
1155 if (exp->exp_lock_replay_needed) {
1156 spin_lock(&exp->exp_lock);
1157 exp->exp_lock_replay_needed = 0;
1158 spin_unlock(&exp->exp_lock);
1159 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1160 atomic_dec(&obd->obd_lock_replay_clients);
1161 }
1162 }
1163
1164 /* This function removes 1-3 references from the export:
1165 * 1 - for export pointer passed
1166 * and if disconnect really need
1167 * 2 - removing from hash
1168 * 3 - in client_unlink_export
1169 * The export pointer passed to this function can destroyed */
1170 int class_disconnect(struct obd_export *export)
1171 {
1172 int already_disconnected;
1173
1174 if (export == NULL) {
1175 CWARN("attempting to free NULL export %p\n", export);
1176 return -EINVAL;
1177 }
1178
1179 spin_lock(&export->exp_lock);
1180 already_disconnected = export->exp_disconnected;
1181 export->exp_disconnected = 1;
1182 spin_unlock(&export->exp_lock);
1183
1184 /* class_cleanup(), abort_recovery(), and class_fail_export()
1185 * all end up in here, and if any of them race we shouldn't
1186 * call extra class_export_puts(). */
1187 if (already_disconnected) {
1188 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1189 GOTO(no_disconn, already_disconnected);
1190 }
1191
1192 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1193 export->exp_handle.h_cookie);
1194
1195 if (!hlist_unhashed(&export->exp_nid_hash))
1196 cfs_hash_del(export->exp_obd->obd_nid_hash,
1197 &export->exp_connection->c_peer.nid,
1198 &export->exp_nid_hash);
1199
1200 class_export_recovery_cleanup(export);
1201 class_unlink_export(export);
1202 no_disconn:
1203 class_export_put(export);
1204 return 0;
1205 }
1206 EXPORT_SYMBOL(class_disconnect);
1207
1208 /* Return non-zero for a fully connected export */
1209 int class_connected_export(struct obd_export *exp)
1210 {
1211 if (exp) {
1212 int connected;
1213 spin_lock(&exp->exp_lock);
1214 connected = (exp->exp_conn_cnt > 0);
1215 spin_unlock(&exp->exp_lock);
1216 return connected;
1217 }
1218 return 0;
1219 }
1220 EXPORT_SYMBOL(class_connected_export);
1221
1222 static void class_disconnect_export_list(struct list_head *list,
1223 enum obd_option flags)
1224 {
1225 int rc;
1226 struct obd_export *exp;
1227
1228 /* It's possible that an export may disconnect itself, but
1229 * nothing else will be added to this list. */
1230 while (!list_empty(list)) {
1231 exp = list_entry(list->next, struct obd_export,
1232 exp_obd_chain);
1233 /* need for safe call CDEBUG after obd_disconnect */
1234 class_export_get(exp);
1235
1236 spin_lock(&exp->exp_lock);
1237 exp->exp_flags = flags;
1238 spin_unlock(&exp->exp_lock);
1239
1240 if (obd_uuid_equals(&exp->exp_client_uuid,
1241 &exp->exp_obd->obd_uuid)) {
1242 CDEBUG(D_HA,
1243 "exp %p export uuid == obd uuid, don't discon\n",
1244 exp);
1245 /* Need to delete this now so we don't end up pointing
1246 * to work_list later when this export is cleaned up. */
1247 list_del_init(&exp->exp_obd_chain);
1248 class_export_put(exp);
1249 continue;
1250 }
1251
1252 class_export_get(exp);
1253 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1254 "last request at "CFS_TIME_T"\n",
1255 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1256 exp, exp->exp_last_request_time);
1257 /* release one export reference anyway */
1258 rc = obd_disconnect(exp);
1259
1260 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1261 obd_export_nid2str(exp), exp, rc);
1262 class_export_put(exp);
1263 }
1264 }
1265
1266 void class_disconnect_exports(struct obd_device *obd)
1267 {
1268 struct list_head work_list;
1269
1270 /* Move all of the exports from obd_exports to a work list, en masse. */
1271 INIT_LIST_HEAD(&work_list);
1272 spin_lock(&obd->obd_dev_lock);
1273 list_splice_init(&obd->obd_exports, &work_list);
1274 list_splice_init(&obd->obd_delayed_exports, &work_list);
1275 spin_unlock(&obd->obd_dev_lock);
1276
1277 if (!list_empty(&work_list)) {
1278 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1279 "disconnecting them\n", obd->obd_minor, obd);
1280 class_disconnect_export_list(&work_list,
1281 exp_flags_from_obd(obd));
1282 } else
1283 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1284 obd->obd_minor, obd);
1285 }
1286 EXPORT_SYMBOL(class_disconnect_exports);
1287
1288 /* Remove exports that have not completed recovery.
1289 */
1290 void class_disconnect_stale_exports(struct obd_device *obd,
1291 int (*test_export)(struct obd_export *))
1292 {
1293 struct list_head work_list;
1294 struct obd_export *exp, *n;
1295 int evicted = 0;
1296
1297 INIT_LIST_HEAD(&work_list);
1298 spin_lock(&obd->obd_dev_lock);
1299 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1300 exp_obd_chain) {
1301 /* don't count self-export as client */
1302 if (obd_uuid_equals(&exp->exp_client_uuid,
1303 &exp->exp_obd->obd_uuid))
1304 continue;
1305
1306 /* don't evict clients which have no slot in last_rcvd
1307 * (e.g. lightweight connection) */
1308 if (exp->exp_target_data.ted_lr_idx == -1)
1309 continue;
1310
1311 spin_lock(&exp->exp_lock);
1312 if (exp->exp_failed || test_export(exp)) {
1313 spin_unlock(&exp->exp_lock);
1314 continue;
1315 }
1316 exp->exp_failed = 1;
1317 spin_unlock(&exp->exp_lock);
1318
1319 list_move(&exp->exp_obd_chain, &work_list);
1320 evicted++;
1321 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1322 obd->obd_name, exp->exp_client_uuid.uuid,
1323 exp->exp_connection == NULL ? "<unknown>" :
1324 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1325 print_export_data(exp, "EVICTING", 0);
1326 }
1327 spin_unlock(&obd->obd_dev_lock);
1328
1329 if (evicted)
1330 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1331 obd->obd_name, evicted);
1332
1333 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1334 OBD_OPT_ABORT_RECOV);
1335 }
1336 EXPORT_SYMBOL(class_disconnect_stale_exports);
1337
1338 void class_fail_export(struct obd_export *exp)
1339 {
1340 int rc, already_failed;
1341
1342 spin_lock(&exp->exp_lock);
1343 already_failed = exp->exp_failed;
1344 exp->exp_failed = 1;
1345 spin_unlock(&exp->exp_lock);
1346
1347 if (already_failed) {
1348 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1349 exp, exp->exp_client_uuid.uuid);
1350 return;
1351 }
1352
1353 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1354 exp, exp->exp_client_uuid.uuid);
1355
1356 if (obd_dump_on_timeout)
1357 libcfs_debug_dumplog();
1358
1359 /* need for safe call CDEBUG after obd_disconnect */
1360 class_export_get(exp);
1361
1362 /* Most callers into obd_disconnect are removing their own reference
1363 * (request, for example) in addition to the one from the hash table.
1364 * We don't have such a reference here, so make one. */
1365 class_export_get(exp);
1366 rc = obd_disconnect(exp);
1367 if (rc)
1368 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1369 else
1370 CDEBUG(D_HA, "disconnected export %p/%s\n",
1371 exp, exp->exp_client_uuid.uuid);
1372 class_export_put(exp);
1373 }
1374 EXPORT_SYMBOL(class_fail_export);
1375
1376 char *obd_export_nid2str(struct obd_export *exp)
1377 {
1378 if (exp->exp_connection != NULL)
1379 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1380
1381 return "(no nid)";
1382 }
1383 EXPORT_SYMBOL(obd_export_nid2str);
1384
1385 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1386 {
1387 cfs_hash_t *nid_hash;
1388 struct obd_export *doomed_exp = NULL;
1389 int exports_evicted = 0;
1390
1391 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1392
1393 spin_lock(&obd->obd_dev_lock);
1394 /* umount has run already, so evict thread should leave
1395 * its task to umount thread now */
1396 if (obd->obd_stopping) {
1397 spin_unlock(&obd->obd_dev_lock);
1398 return exports_evicted;
1399 }
1400 nid_hash = obd->obd_nid_hash;
1401 cfs_hash_getref(nid_hash);
1402 spin_unlock(&obd->obd_dev_lock);
1403
1404 do {
1405 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1406 if (doomed_exp == NULL)
1407 break;
1408
1409 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1410 "nid %s found, wanted nid %s, requested nid %s\n",
1411 obd_export_nid2str(doomed_exp),
1412 libcfs_nid2str(nid_key), nid);
1413 LASSERTF(doomed_exp != obd->obd_self_export,
1414 "self-export is hashed by NID?\n");
1415 exports_evicted++;
1416 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1417 "request\n", obd->obd_name,
1418 obd_uuid2str(&doomed_exp->exp_client_uuid),
1419 obd_export_nid2str(doomed_exp));
1420 class_fail_export(doomed_exp);
1421 class_export_put(doomed_exp);
1422 } while (1);
1423
1424 cfs_hash_putref(nid_hash);
1425
1426 if (!exports_evicted)
1427 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1428 obd->obd_name, nid);
1429 return exports_evicted;
1430 }
1431 EXPORT_SYMBOL(obd_export_evict_by_nid);
1432
1433 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1434 {
1435 cfs_hash_t *uuid_hash;
1436 struct obd_export *doomed_exp = NULL;
1437 struct obd_uuid doomed_uuid;
1438 int exports_evicted = 0;
1439
1440 spin_lock(&obd->obd_dev_lock);
1441 if (obd->obd_stopping) {
1442 spin_unlock(&obd->obd_dev_lock);
1443 return exports_evicted;
1444 }
1445 uuid_hash = obd->obd_uuid_hash;
1446 cfs_hash_getref(uuid_hash);
1447 spin_unlock(&obd->obd_dev_lock);
1448
1449 obd_str2uuid(&doomed_uuid, uuid);
1450 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1451 CERROR("%s: can't evict myself\n", obd->obd_name);
1452 cfs_hash_putref(uuid_hash);
1453 return exports_evicted;
1454 }
1455
1456 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1457
1458 if (doomed_exp == NULL) {
1459 CERROR("%s: can't disconnect %s: no exports found\n",
1460 obd->obd_name, uuid);
1461 } else {
1462 CWARN("%s: evicting %s at administrative request\n",
1463 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1464 class_fail_export(doomed_exp);
1465 class_export_put(doomed_exp);
1466 exports_evicted++;
1467 }
1468 cfs_hash_putref(uuid_hash);
1469
1470 return exports_evicted;
1471 }
1472 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1473
1474 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional per-export dump callback; starts out NULL (static storage).
 * print_export_data() calls it when asked to dump locks and a hook is set.
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
1477 #endif
1478
1479 static void print_export_data(struct obd_export *exp, const char *status,
1480 int locks)
1481 {
1482 struct ptlrpc_reply_state *rs;
1483 struct ptlrpc_reply_state *first_reply = NULL;
1484 int nreplies = 0;
1485
1486 spin_lock(&exp->exp_lock);
1487 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1488 rs_exp_list) {
1489 if (nreplies == 0)
1490 first_reply = rs;
1491 nreplies++;
1492 }
1493 spin_unlock(&exp->exp_lock);
1494
1495 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1496 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1497 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1498 atomic_read(&exp->exp_rpc_count),
1499 atomic_read(&exp->exp_cb_count),
1500 atomic_read(&exp->exp_locks_count),
1501 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1502 nreplies, first_reply, nreplies > 3 ? "..." : "",
1503 exp->exp_last_committed);
1504 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1505 if (locks && class_export_dump_hook != NULL)
1506 class_export_dump_hook(exp);
1507 #endif
1508 }
1509
1510 void dump_exports(struct obd_device *obd, int locks)
1511 {
1512 struct obd_export *exp;
1513
1514 spin_lock(&obd->obd_dev_lock);
1515 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1516 print_export_data(exp, "ACTIVE", locks);
1517 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1518 print_export_data(exp, "UNLINKED", locks);
1519 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1520 print_export_data(exp, "DELAYED", locks);
1521 spin_unlock(&obd->obd_dev_lock);
1522 spin_lock(&obd_zombie_impexp_lock);
1523 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1524 print_export_data(exp, "ZOMBIE", locks);
1525 spin_unlock(&obd_zombie_impexp_lock);
1526 }
1527 EXPORT_SYMBOL(dump_exports);
1528
1529 void obd_exports_barrier(struct obd_device *obd)
1530 {
1531 int waited = 2;
1532 LASSERT(list_empty(&obd->obd_exports));
1533 spin_lock(&obd->obd_dev_lock);
1534 while (!list_empty(&obd->obd_unlinked_exports)) {
1535 spin_unlock(&obd->obd_dev_lock);
1536 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1537 cfs_time_seconds(waited));
1538 if (waited > 5 && IS_PO2(waited)) {
1539 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1540 "more than %d seconds. "
1541 "The obd refcount = %d. Is it stuck?\n",
1542 obd->obd_name, waited,
1543 atomic_read(&obd->obd_refcount));
1544 dump_exports(obd, 1);
1545 }
1546 waited *= 2;
1547 spin_lock(&obd->obd_dev_lock);
1548 }
1549 spin_unlock(&obd->obd_dev_lock);
1550 }
1551 EXPORT_SYMBOL(obd_exports_barrier);
1552
/*
 * Total number of zombie imports and exports still to be destroyed.
 * Updated under obd_zombie_impexp_lock; starts at zero (static storage).
 */
static int zombies_count;
1555
1556 /**
1557 * kill zombie imports and exports
1558 */
1559 void obd_zombie_impexp_cull(void)
1560 {
1561 struct obd_import *import;
1562 struct obd_export *export;
1563
1564 do {
1565 spin_lock(&obd_zombie_impexp_lock);
1566
1567 import = NULL;
1568 if (!list_empty(&obd_zombie_imports)) {
1569 import = list_entry(obd_zombie_imports.next,
1570 struct obd_import,
1571 imp_zombie_chain);
1572 list_del_init(&import->imp_zombie_chain);
1573 }
1574
1575 export = NULL;
1576 if (!list_empty(&obd_zombie_exports)) {
1577 export = list_entry(obd_zombie_exports.next,
1578 struct obd_export,
1579 exp_obd_chain);
1580 list_del_init(&export->exp_obd_chain);
1581 }
1582
1583 spin_unlock(&obd_zombie_impexp_lock);
1584
1585 if (import != NULL) {
1586 class_import_destroy(import);
1587 spin_lock(&obd_zombie_impexp_lock);
1588 zombies_count--;
1589 spin_unlock(&obd_zombie_impexp_lock);
1590 }
1591
1592 if (export != NULL) {
1593 class_export_destroy(export);
1594 spin_lock(&obd_zombie_impexp_lock);
1595 zombies_count--;
1596 spin_unlock(&obd_zombie_impexp_lock);
1597 }
1598
1599 cond_resched();
1600 } while (import != NULL || export != NULL);
1601 }
1602
1603 static struct completion obd_zombie_start;
1604 static struct completion obd_zombie_stop;
1605 static unsigned long obd_zombie_flags;
1606 static wait_queue_head_t obd_zombie_waitq;
1607 static pid_t obd_zombie_pid;
1608
1609 enum {
1610 OBD_ZOMBIE_STOP = 0x0001,
1611 };
1612
1613 /**
1614 * check for work for kill zombie import/export thread.
1615 */
1616 static int obd_zombie_impexp_check(void *arg)
1617 {
1618 int rc;
1619
1620 spin_lock(&obd_zombie_impexp_lock);
1621 rc = (zombies_count == 0) &&
1622 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1623 spin_unlock(&obd_zombie_impexp_lock);
1624
1625 return rc;
1626 }
1627
1628 /**
1629 * Add export to the obd_zombe thread and notify it.
1630 */
1631 static void obd_zombie_export_add(struct obd_export *exp) {
1632 spin_lock(&exp->exp_obd->obd_dev_lock);
1633 LASSERT(!list_empty(&exp->exp_obd_chain));
1634 list_del_init(&exp->exp_obd_chain);
1635 spin_unlock(&exp->exp_obd->obd_dev_lock);
1636 spin_lock(&obd_zombie_impexp_lock);
1637 zombies_count++;
1638 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1639 spin_unlock(&obd_zombie_impexp_lock);
1640
1641 obd_zombie_impexp_notify();
1642 }
1643
1644 /**
1645 * Add import to the obd_zombe thread and notify it.
1646 */
1647 static void obd_zombie_import_add(struct obd_import *imp) {
1648 LASSERT(imp->imp_sec == NULL);
1649 LASSERT(imp->imp_rq_pool == NULL);
1650 spin_lock(&obd_zombie_impexp_lock);
1651 LASSERT(list_empty(&imp->imp_zombie_chain));
1652 zombies_count++;
1653 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1654 spin_unlock(&obd_zombie_impexp_lock);
1655
1656 obd_zombie_impexp_notify();
1657 }
1658
1659 /**
1660 * notify import/export destroy thread about new zombie.
1661 */
1662 static void obd_zombie_impexp_notify(void)
1663 {
1664 /*
1665 * Make sure obd_zomebie_impexp_thread get this notification.
1666 * It is possible this signal only get by obd_zombie_barrier, and
1667 * barrier gulps this notification and sleeps away and hangs ensues
1668 */
1669 wake_up_all(&obd_zombie_waitq);
1670 }
1671
1672 /**
1673 * check whether obd_zombie is idle
1674 */
1675 static int obd_zombie_is_idle(void)
1676 {
1677 int rc;
1678
1679 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1680 spin_lock(&obd_zombie_impexp_lock);
1681 rc = (zombies_count == 0);
1682 spin_unlock(&obd_zombie_impexp_lock);
1683 return rc;
1684 }
1685
1686 /**
1687 * wait when obd_zombie import/export queues become empty
1688 */
1689 void obd_zombie_barrier(void)
1690 {
1691 struct l_wait_info lwi = { 0 };
1692
1693 if (obd_zombie_pid == current_pid())
1694 /* don't wait for myself */
1695 return;
1696 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1697 }
1698 EXPORT_SYMBOL(obd_zombie_barrier);
1699
1700
1701 /**
1702 * destroy zombie export/import thread.
1703 */
1704 static int obd_zombie_impexp_thread(void *unused)
1705 {
1706 unshare_fs_struct();
1707 complete(&obd_zombie_start);
1708
1709 obd_zombie_pid = current_pid();
1710
1711 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1712 struct l_wait_info lwi = { 0 };
1713
1714 l_wait_event(obd_zombie_waitq,
1715 !obd_zombie_impexp_check(NULL), &lwi);
1716 obd_zombie_impexp_cull();
1717
1718 /*
1719 * Notify obd_zombie_barrier callers that queues
1720 * may be empty.
1721 */
1722 wake_up(&obd_zombie_waitq);
1723 }
1724
1725 complete(&obd_zombie_stop);
1726
1727 return 0;
1728 }
1729
1730
1731 /**
1732 * start destroy zombie import/export thread
1733 */
1734 int obd_zombie_impexp_init(void)
1735 {
1736 task_t *task;
1737
1738 INIT_LIST_HEAD(&obd_zombie_imports);
1739 INIT_LIST_HEAD(&obd_zombie_exports);
1740 spin_lock_init(&obd_zombie_impexp_lock);
1741 init_completion(&obd_zombie_start);
1742 init_completion(&obd_zombie_stop);
1743 init_waitqueue_head(&obd_zombie_waitq);
1744 obd_zombie_pid = 0;
1745
1746 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1747 if (IS_ERR(task))
1748 return PTR_ERR(task);
1749
1750 wait_for_completion(&obd_zombie_start);
1751 return 0;
1752 }
1753 /**
1754 * stop destroy zombie import/export thread
1755 */
1756 void obd_zombie_impexp_stop(void)
1757 {
1758 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1759 obd_zombie_impexp_notify();
1760 wait_for_completion(&obd_zombie_stop);
1761 }
1762
1763 /***** Kernel-userspace comm helpers *******/
1764
1765 /* Get length of entire message, including header */
1766 int kuc_len(int payload_len)
1767 {
1768 return sizeof(struct kuc_hdr) + payload_len;
1769 }
1770 EXPORT_SYMBOL(kuc_len);
1771
1772 /* Get a pointer to kuc header, given a ptr to the payload
1773 * @param p Pointer to payload area
1774 * @returns Pointer to kuc header
1775 */
1776 struct kuc_hdr * kuc_ptr(void *p)
1777 {
1778 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1779 LASSERT(lh->kuc_magic == KUC_MAGIC);
1780 return lh;
1781 }
1782 EXPORT_SYMBOL(kuc_ptr);
1783
1784 /* Test if payload is part of kuc message
1785 * @param p Pointer to payload area
1786 * @returns boolean
1787 */
1788 int kuc_ispayload(void *p)
1789 {
1790 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1791
1792 if (kh->kuc_magic == KUC_MAGIC)
1793 return 1;
1794 else
1795 return 0;
1796 }
1797 EXPORT_SYMBOL(kuc_ispayload);
1798
1799 /* Alloc space for a message, and fill in header
1800 * @return Pointer to payload area
1801 */
1802 void *kuc_alloc(int payload_len, int transport, int type)
1803 {
1804 struct kuc_hdr *lh;
1805 int len = kuc_len(payload_len);
1806
1807 OBD_ALLOC(lh, len);
1808 if (lh == NULL)
1809 return ERR_PTR(-ENOMEM);
1810
1811 lh->kuc_magic = KUC_MAGIC;
1812 lh->kuc_transport = transport;
1813 lh->kuc_msgtype = type;
1814 lh->kuc_msglen = len;
1815
1816 return (void *)(lh + 1);
1817 }
1818 EXPORT_SYMBOL(kuc_alloc);
1819
/* Free a message previously set up with a kuc header.
 * Takes a pointer to the payload area and the payload length; the header is
 * located (and its magic asserted) via kuc_ptr().
 */
inline void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh;

	lh = kuc_ptr(p);
	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
This page took 0.105292 seconds and 5 git commands to generate.