staging/lustre: remove IS_MDS|IS_OST|IS_MGS defines and users
[deliverable/linux.git] / drivers / staging / lustre / lustre / mgc / mgc_request.c
1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32 /*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/mgc/mgc_request.c
37 *
38 * Author: Nathan Rutman <nathan@clusterfs.com>
39 */
40
41 #define DEBUG_SUBSYSTEM S_MGC
42 #define D_MGC D_CONFIG /*|D_WARNING*/
43
44 #include <linux/module.h>
45 #include "../include/obd_class.h"
46 #include "../include/lustre_dlm.h"
47 #include "../include/lprocfs_status.h"
48 #include "../include/lustre_log.h"
49 #include "../include/lustre_disk.h"
50
51 #include "mgc_internal.h"
52
53 static int mgc_name2resid(char *name, int len, struct ldlm_res_id *res_id,
54 int type)
55 {
56 __u64 resname = 0;
57
58 if (len > sizeof(resname)) {
59 CERROR("name too long: %s\n", name);
60 return -EINVAL;
61 }
62 if (len <= 0) {
63 CERROR("missing name: %s\n", name);
64 return -EINVAL;
65 }
66 memcpy(&resname, name, len);
67
68 /* Always use the same endianness for the resid */
69 memset(res_id, 0, sizeof(*res_id));
70 res_id->name[0] = cpu_to_le64(resname);
71 /* XXX: unfortunately, sptlprc and config llog share one lock */
72 switch (type) {
73 case CONFIG_T_CONFIG:
74 case CONFIG_T_SPTLRPC:
75 resname = 0;
76 break;
77 case CONFIG_T_RECOVER:
78 case CONFIG_T_PARAMS:
79 resname = type;
80 break;
81 default:
82 LBUG();
83 }
84 res_id->name[1] = cpu_to_le64(resname);
85 CDEBUG(D_MGC, "log %s to resid %#llx/%#llx (%.8s)\n", name,
86 res_id->name[0], res_id->name[1], (char *)&res_id->name[0]);
87 return 0;
88 }
89
/*
 * Convert a filesystem name into an LDLM resource id, using the whole
 * string as the resource name.  Returns whatever mgc_name2resid() returns.
 */
int mgc_fsname2resid(char *fsname, struct ldlm_res_id *res_id, int type)
{
	int fsname_len = strlen(fsname);

	/* fsname is at most 8 chars long, maybe contain "-".
	 * e.g. "lustre", "SUN-000" */
	return mgc_name2resid(fsname, fsname_len, res_id, type);
}
EXPORT_SYMBOL(mgc_fsname2resid);
97
/*
 * Convert a log name into an LDLM resource id.
 *
 * logname consists of "fsname-nodetype", e.g. "lustre-MDT0001" or
 * "SUN-000-client"; only the part before the last '-' names the resource.
 * The llog "params" is the exception: it has no '-', so the whole name is
 * used.
 */
static int mgc_logname2resid(char *logname, struct ldlm_res_id *res_id, int type)
{
	char *last_dash = strrchr(logname, '-');

	if (!last_dash)
		return mgc_name2resid(logname, strlen(logname), res_id, type);

	return mgc_name2resid(logname, last_dash - logname, res_id, type);
}
113
/********************** config llog list **********************/
/* All known config logs, linked through config_llog_data::cld_list_chain. */
static LIST_HEAD(config_llog_list);
/* Held around every walk or modification of config_llog_list in this file;
 * the same lock is also taken when rq_state is updated. */
static DEFINE_SPINLOCK(config_list_lock);
117
118 /* Take a reference to a config log */
119 static int config_log_get(struct config_llog_data *cld)
120 {
121 atomic_inc(&cld->cld_refcount);
122 CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname,
123 atomic_read(&cld->cld_refcount));
124 return 0;
125 }
126
127 /* Drop a reference to a config log. When no longer referenced,
128 we can free the config log data */
129 static void config_log_put(struct config_llog_data *cld)
130 {
131 CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname,
132 atomic_read(&cld->cld_refcount));
133 LASSERT(atomic_read(&cld->cld_refcount) > 0);
134
135 /* spinlock to make sure no item with 0 refcount in the list */
136 if (atomic_dec_and_lock(&cld->cld_refcount, &config_list_lock)) {
137 list_del(&cld->cld_list_chain);
138 spin_unlock(&config_list_lock);
139
140 CDEBUG(D_MGC, "dropping config log %s\n", cld->cld_logname);
141
142 if (cld->cld_recover)
143 config_log_put(cld->cld_recover);
144 if (cld->cld_sptlrpc)
145 config_log_put(cld->cld_sptlrpc);
146 if (cld->cld_params)
147 config_log_put(cld->cld_params);
148 if (cld_is_sptlrpc(cld))
149 sptlrpc_conf_log_stop(cld->cld_logname);
150
151 class_export_put(cld->cld_mgcexp);
152 kfree(cld);
153 }
154 }
155
156 /* Find a config log by name */
157 static
158 struct config_llog_data *config_log_find(char *logname,
159 struct config_llog_instance *cfg)
160 {
161 struct config_llog_data *cld;
162 struct config_llog_data *found = NULL;
163 void *instance;
164
165 LASSERT(logname != NULL);
166
167 instance = cfg ? cfg->cfg_instance : NULL;
168 spin_lock(&config_list_lock);
169 list_for_each_entry(cld, &config_llog_list, cld_list_chain) {
170 /* check if instance equals */
171 if (instance != cld->cld_cfg.cfg_instance)
172 continue;
173
174 /* instance may be NULL, should check name */
175 if (strcmp(logname, cld->cld_logname) == 0) {
176 found = cld;
177 break;
178 }
179 }
180 if (found) {
181 atomic_inc(&found->cld_refcount);
182 LASSERT(found->cld_stopping == 0 || cld_is_sptlrpc(found) == 0);
183 }
184 spin_unlock(&config_list_lock);
185 return found;
186 }
187
188 static
189 struct config_llog_data *do_config_log_add(struct obd_device *obd,
190 char *logname,
191 int type,
192 struct config_llog_instance *cfg,
193 struct super_block *sb)
194 {
195 struct config_llog_data *cld;
196 int rc;
197
198 CDEBUG(D_MGC, "do adding config log %s:%p\n", logname,
199 cfg ? cfg->cfg_instance : NULL);
200
201 cld = kzalloc(sizeof(*cld) + strlen(logname) + 1, GFP_NOFS);
202 if (!cld)
203 return ERR_PTR(-ENOMEM);
204
205 strcpy(cld->cld_logname, logname);
206 if (cfg)
207 cld->cld_cfg = *cfg;
208 else
209 cld->cld_cfg.cfg_callback = class_config_llog_handler;
210 mutex_init(&cld->cld_lock);
211 cld->cld_cfg.cfg_last_idx = 0;
212 cld->cld_cfg.cfg_flags = 0;
213 cld->cld_cfg.cfg_sb = sb;
214 cld->cld_type = type;
215 atomic_set(&cld->cld_refcount, 1);
216
217 /* Keep the mgc around until we are done */
218 cld->cld_mgcexp = class_export_get(obd->obd_self_export);
219
220 if (cld_is_sptlrpc(cld)) {
221 sptlrpc_conf_log_start(logname);
222 cld->cld_cfg.cfg_obdname = obd->obd_name;
223 }
224
225 rc = mgc_logname2resid(logname, &cld->cld_resid, type);
226
227 spin_lock(&config_list_lock);
228 list_add(&cld->cld_list_chain, &config_llog_list);
229 spin_unlock(&config_list_lock);
230
231 if (rc) {
232 config_log_put(cld);
233 return ERR_PTR(rc);
234 }
235
236 if (cld_is_sptlrpc(cld)) {
237 rc = mgc_process_log(obd, cld);
238 if (rc && rc != -ENOENT)
239 CERROR("failed processing sptlrpc log: %d\n", rc);
240 }
241
242 return cld;
243 }
244
245 static struct config_llog_data *config_recover_log_add(struct obd_device *obd,
246 char *fsname,
247 struct config_llog_instance *cfg,
248 struct super_block *sb)
249 {
250 struct config_llog_instance lcfg = *cfg;
251 struct config_llog_data *cld;
252 char logname[32];
253
254 /* we have to use different llog for clients and mdts for cmd
255 * where only clients are notified if one of cmd server restarts */
256 LASSERT(strlen(fsname) < sizeof(logname) / 2);
257 strcpy(logname, fsname);
258 LASSERT(lcfg.cfg_instance);
259 strcat(logname, "-cliir");
260
261 cld = do_config_log_add(obd, logname, CONFIG_T_RECOVER, &lcfg, sb);
262 return cld;
263 }
264
265 static struct config_llog_data *config_params_log_add(struct obd_device *obd,
266 struct config_llog_instance *cfg, struct super_block *sb)
267 {
268 struct config_llog_instance lcfg = *cfg;
269 struct config_llog_data *cld;
270
271 lcfg.cfg_instance = sb;
272
273 cld = do_config_log_add(obd, PARAMS_FILENAME, CONFIG_T_PARAMS,
274 &lcfg, sb);
275
276 return cld;
277 }
278
279 /** Add this log to the list of active logs watched by an MGC.
280 * Active means we're watching for updates.
281 * We have one active log per "mount" - client instance or servername.
282 * Each instance may be at a different point in the log.
283 */
284 static int config_log_add(struct obd_device *obd, char *logname,
285 struct config_llog_instance *cfg,
286 struct super_block *sb)
287 {
288 struct lustre_sb_info *lsi = s2lsi(sb);
289 struct config_llog_data *cld;
290 struct config_llog_data *sptlrpc_cld;
291 struct config_llog_data *params_cld;
292 char seclogname[32];
293 char *ptr;
294 int rc;
295
296 CDEBUG(D_MGC, "adding config log %s:%p\n", logname, cfg->cfg_instance);
297
298 /*
299 * for each regular log, the depended sptlrpc log name is
300 * <fsname>-sptlrpc. multiple regular logs may share one sptlrpc log.
301 */
302 ptr = strrchr(logname, '-');
303 if (ptr == NULL || ptr - logname > 8) {
304 CERROR("logname %s is too long\n", logname);
305 return -EINVAL;
306 }
307
308 memcpy(seclogname, logname, ptr - logname);
309 strcpy(seclogname + (ptr - logname), "-sptlrpc");
310
311 sptlrpc_cld = config_log_find(seclogname, NULL);
312 if (sptlrpc_cld == NULL) {
313 sptlrpc_cld = do_config_log_add(obd, seclogname,
314 CONFIG_T_SPTLRPC, NULL, NULL);
315 if (IS_ERR(sptlrpc_cld)) {
316 CERROR("can't create sptlrpc log: %s\n", seclogname);
317 rc = PTR_ERR(sptlrpc_cld);
318 goto out_err;
319 }
320 }
321 params_cld = config_params_log_add(obd, cfg, sb);
322 if (IS_ERR(params_cld)) {
323 rc = PTR_ERR(params_cld);
324 CERROR("%s: can't create params log: rc = %d\n",
325 obd->obd_name, rc);
326 goto out_err1;
327 }
328
329 cld = do_config_log_add(obd, logname, CONFIG_T_CONFIG, cfg, sb);
330 if (IS_ERR(cld)) {
331 CERROR("can't create log: %s\n", logname);
332 rc = PTR_ERR(cld);
333 goto out_err2;
334 }
335
336 cld->cld_sptlrpc = sptlrpc_cld;
337 cld->cld_params = params_cld;
338
339 LASSERT(lsi->lsi_lmd);
340 if (!(lsi->lsi_lmd->lmd_flags & LMD_FLG_NOIR)) {
341 struct config_llog_data *recover_cld;
342 *strrchr(seclogname, '-') = 0;
343 recover_cld = config_recover_log_add(obd, seclogname, cfg, sb);
344 if (IS_ERR(recover_cld)) {
345 rc = PTR_ERR(recover_cld);
346 goto out_err3;
347 }
348 cld->cld_recover = recover_cld;
349 }
350
351 return 0;
352
353 out_err3:
354 config_log_put(cld);
355
356 out_err2:
357 config_log_put(params_cld);
358
359 out_err1:
360 config_log_put(sptlrpc_cld);
361
362 out_err:
363 return rc;
364 }
365
/* NOTE(review): not referenced in the part of this file reviewed here; it is
 * non-static, so presumably it is taken further down or from another file -
 * confirm before changing its linkage. */
DEFINE_MUTEX(llog_process_lock);
367
368 /** Stop watching for updates on this log.
369 */
370 static int config_log_end(char *logname, struct config_llog_instance *cfg)
371 {
372 struct config_llog_data *cld;
373 struct config_llog_data *cld_sptlrpc = NULL;
374 struct config_llog_data *cld_params = NULL;
375 struct config_llog_data *cld_recover = NULL;
376 int rc = 0;
377
378 cld = config_log_find(logname, cfg);
379 if (cld == NULL)
380 return -ENOENT;
381
382 mutex_lock(&cld->cld_lock);
383 /*
384 * if cld_stopping is set, it means we didn't start the log thus
385 * not owning the start ref. this can happen after previous umount:
386 * the cld still hanging there waiting for lock cancel, and we
387 * remount again but failed in the middle and call log_end without
388 * calling start_log.
389 */
390 if (unlikely(cld->cld_stopping)) {
391 mutex_unlock(&cld->cld_lock);
392 /* drop the ref from the find */
393 config_log_put(cld);
394 return rc;
395 }
396
397 cld->cld_stopping = 1;
398
399 cld_recover = cld->cld_recover;
400 cld->cld_recover = NULL;
401 mutex_unlock(&cld->cld_lock);
402
403 if (cld_recover) {
404 mutex_lock(&cld_recover->cld_lock);
405 cld_recover->cld_stopping = 1;
406 mutex_unlock(&cld_recover->cld_lock);
407 config_log_put(cld_recover);
408 }
409
410 spin_lock(&config_list_lock);
411 cld_sptlrpc = cld->cld_sptlrpc;
412 cld->cld_sptlrpc = NULL;
413 cld_params = cld->cld_params;
414 cld->cld_params = NULL;
415 spin_unlock(&config_list_lock);
416
417 if (cld_sptlrpc)
418 config_log_put(cld_sptlrpc);
419
420 if (cld_params) {
421 mutex_lock(&cld_params->cld_lock);
422 cld_params->cld_stopping = 1;
423 mutex_unlock(&cld_params->cld_lock);
424 config_log_put(cld_params);
425 }
426
427 /* drop the ref from the find */
428 config_log_put(cld);
429 /* drop the start ref */
430 config_log_put(cld);
431
432 CDEBUG(D_MGC, "end config log %s (%d)\n", logname ? logname : "client",
433 rc);
434 return rc;
435 }
436
437 int lprocfs_mgc_rd_ir_state(struct seq_file *m, void *data)
438 {
439 struct obd_device *obd = data;
440 struct obd_import *imp;
441 struct obd_connect_data *ocd;
442 struct config_llog_data *cld;
443
444 LPROCFS_CLIMP_CHECK(obd);
445 imp = obd->u.cli.cl_import;
446 ocd = &imp->imp_connect_data;
447
448 seq_printf(m, "imperative_recovery: %s\n",
449 OCD_HAS_FLAG(ocd, IMP_RECOV) ? "ENABLED" : "DISABLED");
450 seq_printf(m, "client_state:\n");
451
452 spin_lock(&config_list_lock);
453 list_for_each_entry(cld, &config_llog_list, cld_list_chain) {
454 if (cld->cld_recover == NULL)
455 continue;
456 seq_printf(m, " - { client: %s, nidtbl_version: %u }\n",
457 cld->cld_logname,
458 cld->cld_recover->cld_cfg.cfg_last_idx);
459 }
460 spin_unlock(&config_list_lock);
461
462 LPROCFS_CLIMP_EXIT(obd);
463 return 0;
464 }
465
/* reenqueue any lost locks */
/* Bits of rq_state, the state word shared with the requeue thread. */
#define RQ_RUNNING	0x1	/* set by the requeue thread while it runs */
#define RQ_NOW		0x2	/* a requeue was requested; wakes the thread */
#define RQ_LATER	0x4	/* only ever cleared in the code visible here */
#define RQ_STOP		0x8	/* last MGC is going away: thread must exit */
#define RQ_PRECLEANUP	0x10	/* a non-last MGC is in precleanup */
/* Combination of the RQ_* bits above; updated under config_list_lock. */
static int rq_state;
/* The requeue thread sleeps here; woken after rq_state changes. */
static wait_queue_head_t rq_waitq;
/* Completed by the requeue thread right before it returns. */
static DECLARE_COMPLETION(rq_exit);
/* Completed by the requeue thread on its first loop iteration. */
static DECLARE_COMPLETION(rq_start);
476
477 static void do_requeue(struct config_llog_data *cld)
478 {
479 LASSERT(atomic_read(&cld->cld_refcount) > 0);
480
481 /* Do not run mgc_process_log on a disconnected export or an
482 export which is being disconnected. Take the client
483 semaphore to make the check non-racy. */
484 down_read(&cld->cld_mgcexp->exp_obd->u.cli.cl_sem);
485 if (cld->cld_mgcexp->exp_obd->u.cli.cl_conn_count != 0) {
486 CDEBUG(D_MGC, "updating log %s\n", cld->cld_logname);
487 mgc_process_log(cld->cld_mgcexp->exp_obd, cld);
488 } else {
489 CDEBUG(D_MGC, "disconnecting, won't update log %s\n",
490 cld->cld_logname);
491 }
492 up_read(&cld->cld_mgcexp->exp_obd->u.cli.cl_sem);
493 }
494
/* this timeout represents how many seconds MGC should wait before
 * requeue config and recover lock to the MGS. We need to randomize this
 * in order to not flood the MGS.
 *
 * The requeue thread waits MGC_TIMEOUT_MIN_SECONDS plus a random number of
 * centiseconds obtained by masking cfs_rand() with
 * MGC_TIMEOUT_RAND_CENTISEC, i.e. between 0 and 0x1ff (511) centiseconds.
 */
#define MGC_TIMEOUT_MIN_SECONDS 5
#define MGC_TIMEOUT_RAND_CENTISEC 0x1ff /* ~500 */
501
502 static int mgc_requeue_thread(void *data)
503 {
504 bool first = true;
505
506 CDEBUG(D_MGC, "Starting requeue thread\n");
507
508 /* Keep trying failed locks periodically */
509 spin_lock(&config_list_lock);
510 rq_state |= RQ_RUNNING;
511 while (1) {
512 struct l_wait_info lwi;
513 struct config_llog_data *cld, *cld_prev;
514 int rand = cfs_rand() & MGC_TIMEOUT_RAND_CENTISEC;
515 int stopped = !!(rq_state & RQ_STOP);
516 int to;
517
518 /* Any new or requeued lostlocks will change the state */
519 rq_state &= ~(RQ_NOW | RQ_LATER);
520 spin_unlock(&config_list_lock);
521
522 if (first) {
523 first = false;
524 complete(&rq_start);
525 }
526
527 /* Always wait a few seconds to allow the server who
528 caused the lock revocation to finish its setup, plus some
529 random so everyone doesn't try to reconnect at once. */
530 to = MGC_TIMEOUT_MIN_SECONDS * HZ;
531 to += rand * HZ / 100; /* rand is centi-seconds */
532 lwi = LWI_TIMEOUT(to, NULL, NULL);
533 l_wait_event(rq_waitq, rq_state & (RQ_STOP | RQ_PRECLEANUP),
534 &lwi);
535
536 /*
537 * iterate & processing through the list. for each cld, process
538 * its depending sptlrpc cld firstly (if any) and then itself.
539 *
540 * it's guaranteed any item in the list must have
541 * reference > 0; and if cld_lostlock is set, at
542 * least one reference is taken by the previous enqueue.
543 */
544 cld_prev = NULL;
545
546 spin_lock(&config_list_lock);
547 rq_state &= ~RQ_PRECLEANUP;
548 list_for_each_entry(cld, &config_llog_list,
549 cld_list_chain) {
550 if (!cld->cld_lostlock)
551 continue;
552
553 spin_unlock(&config_list_lock);
554
555 LASSERT(atomic_read(&cld->cld_refcount) > 0);
556
557 /* Whether we enqueued again or not in mgc_process_log,
558 * we're done with the ref from the old enqueue */
559 if (cld_prev)
560 config_log_put(cld_prev);
561 cld_prev = cld;
562
563 cld->cld_lostlock = 0;
564 if (likely(!stopped))
565 do_requeue(cld);
566
567 spin_lock(&config_list_lock);
568 }
569 spin_unlock(&config_list_lock);
570 if (cld_prev)
571 config_log_put(cld_prev);
572
573 /* break after scanning the list so that we can drop
574 * refcount to losing lock clds */
575 if (unlikely(stopped)) {
576 spin_lock(&config_list_lock);
577 break;
578 }
579
580 /* Wait a bit to see if anyone else needs a requeue */
581 lwi = (struct l_wait_info) { 0 };
582 l_wait_event(rq_waitq, rq_state & (RQ_NOW | RQ_STOP),
583 &lwi);
584 spin_lock(&config_list_lock);
585 }
586 /* spinlock and while guarantee RQ_NOW and RQ_LATER are not set */
587 rq_state &= ~RQ_RUNNING;
588 spin_unlock(&config_list_lock);
589
590 complete(&rq_exit);
591
592 CDEBUG(D_MGC, "Ending requeue thread\n");
593 return 0;
594 }
595
596 /* Add a cld to the list to requeue. Start the requeue thread if needed.
597 We are responsible for dropping the config log reference from here on out. */
598 static void mgc_requeue_add(struct config_llog_data *cld)
599 {
600 CDEBUG(D_INFO, "log %s: requeue (r=%d sp=%d st=%x)\n",
601 cld->cld_logname, atomic_read(&cld->cld_refcount),
602 cld->cld_stopping, rq_state);
603 LASSERT(atomic_read(&cld->cld_refcount) > 0);
604
605 mutex_lock(&cld->cld_lock);
606 if (cld->cld_stopping || cld->cld_lostlock) {
607 mutex_unlock(&cld->cld_lock);
608 return;
609 }
610 /* this refcount will be released in mgc_requeue_thread. */
611 config_log_get(cld);
612 cld->cld_lostlock = 1;
613 mutex_unlock(&cld->cld_lock);
614
615 /* Hold lock for rq_state */
616 spin_lock(&config_list_lock);
617 if (rq_state & RQ_STOP) {
618 spin_unlock(&config_list_lock);
619 cld->cld_lostlock = 0;
620 config_log_put(cld);
621 } else {
622 rq_state |= RQ_NOW;
623 spin_unlock(&config_list_lock);
624 wake_up(&rq_waitq);
625 }
626 }
627
628 static int mgc_llog_init(const struct lu_env *env, struct obd_device *obd)
629 {
630 struct llog_ctxt *ctxt;
631 int rc;
632
633 /* setup only remote ctxt, the local disk context is switched per each
634 * filesystem during mgc_fs_setup() */
635 rc = llog_setup(env, obd, &obd->obd_olg, LLOG_CONFIG_REPL_CTXT, obd,
636 &llog_client_ops);
637 if (rc)
638 return rc;
639
640 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
641 LASSERT(ctxt);
642
643 llog_initiator_connect(ctxt);
644 llog_ctxt_put(ctxt);
645
646 return 0;
647 }
648
649 static int mgc_llog_fini(const struct lu_env *env, struct obd_device *obd)
650 {
651 struct llog_ctxt *ctxt;
652
653 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
654 if (ctxt)
655 llog_cleanup(env, ctxt);
656
657 return 0;
658 }
659
660 static atomic_t mgc_count = ATOMIC_INIT(0);
661 static int mgc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
662 {
663 int rc = 0;
664 int temp;
665
666 switch (stage) {
667 case OBD_CLEANUP_EARLY:
668 break;
669 case OBD_CLEANUP_EXPORTS:
670 if (atomic_dec_and_test(&mgc_count)) {
671 LASSERT(rq_state & RQ_RUNNING);
672 /* stop requeue thread */
673 temp = RQ_STOP;
674 } else {
675 /* wakeup requeue thread to clean our cld */
676 temp = RQ_NOW | RQ_PRECLEANUP;
677 }
678 spin_lock(&config_list_lock);
679 rq_state |= temp;
680 spin_unlock(&config_list_lock);
681 wake_up(&rq_waitq);
682 if (temp & RQ_STOP)
683 wait_for_completion(&rq_exit);
684 obd_cleanup_client_import(obd);
685 rc = mgc_llog_fini(NULL, obd);
686 if (rc != 0)
687 CERROR("failed to cleanup llogging subsystems\n");
688 break;
689 }
690 return rc;
691 }
692
693 static int mgc_cleanup(struct obd_device *obd)
694 {
695 /* COMPAT_146 - old config logs may have added profiles we don't
696 know about */
697 if (obd->obd_type->typ_refcnt <= 1)
698 /* Only for the last mgc */
699 class_del_profiles();
700
701 lprocfs_obd_cleanup(obd);
702 ptlrpcd_decref();
703
704 return client_obd_cleanup(obd);
705 }
706
707 static int mgc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
708 {
709 struct lprocfs_static_vars lvars = { NULL };
710 int rc;
711
712 ptlrpcd_addref();
713
714 rc = client_obd_setup(obd, lcfg);
715 if (rc)
716 goto err_decref;
717
718 rc = mgc_llog_init(NULL, obd);
719 if (rc) {
720 CERROR("failed to setup llogging subsystems\n");
721 goto err_cleanup;
722 }
723
724 lprocfs_mgc_init_vars(&lvars);
725 lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars);
726 sptlrpc_lprocfs_cliobd_attach(obd);
727
728 if (atomic_inc_return(&mgc_count) == 1) {
729 rq_state = 0;
730 init_waitqueue_head(&rq_waitq);
731
732 /* start requeue thread */
733 rc = PTR_ERR(kthread_run(mgc_requeue_thread, NULL,
734 "ll_cfg_requeue"));
735 if (IS_ERR_VALUE(rc)) {
736 CERROR("%s: Cannot start requeue thread (%d),no more log updates!\n",
737 obd->obd_name, rc);
738 goto err_cleanup;
739 }
740 /* rc is the task_struct pointer of mgc_requeue_thread. */
741 rc = 0;
742 wait_for_completion(&rq_start);
743 }
744
745 return rc;
746
747 err_cleanup:
748 client_obd_cleanup(obd);
749 err_decref:
750 ptlrpcd_decref();
751 return rc;
752 }
753
754 /* based on ll_mdc_blocking_ast */
755 static int mgc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
756 void *data, int flag)
757 {
758 struct lustre_handle lockh;
759 struct config_llog_data *cld = (struct config_llog_data *)data;
760 int rc = 0;
761
762 switch (flag) {
763 case LDLM_CB_BLOCKING:
764 /* mgs wants the lock, give it up... */
765 LDLM_DEBUG(lock, "MGC blocking CB");
766 ldlm_lock2handle(lock, &lockh);
767 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
768 break;
769 case LDLM_CB_CANCELING:
770 /* We've given up the lock, prepare ourselves to update. */
771 LDLM_DEBUG(lock, "MGC cancel CB");
772
773 CDEBUG(D_MGC, "Lock res "DLDLMRES" (%.8s)\n",
774 PLDLMRES(lock->l_resource),
775 (char *)&lock->l_resource->lr_name.name[0]);
776
777 if (!cld) {
778 CDEBUG(D_INFO, "missing data, won't requeue\n");
779 break;
780 }
781
782 /* held at mgc_process_log(). */
783 LASSERT(atomic_read(&cld->cld_refcount) > 0);
784 /* Are we done with this log? */
785 if (cld->cld_stopping) {
786 CDEBUG(D_MGC, "log %s: stopping, won't requeue\n",
787 cld->cld_logname);
788 config_log_put(cld);
789 break;
790 }
791 /* Make sure not to re-enqueue when the mgc is stopping
792 (we get called from client_disconnect_export) */
793 if (!lock->l_conn_export ||
794 !lock->l_conn_export->exp_obd->u.cli.cl_conn_count) {
795 CDEBUG(D_MGC, "log %.8s: disconnecting, won't requeue\n",
796 cld->cld_logname);
797 config_log_put(cld);
798 break;
799 }
800
801 /* Re-enqueue now */
802 mgc_requeue_add(cld);
803 config_log_put(cld);
804 break;
805 default:
806 LBUG();
807 }
808
809 return rc;
810 }
811
/* Not sure where this should go... */
/* This is the timeout value for MGS_CONNECT request plus a ping interval, such
 * that we can have a chance to try the secondary MGS if any.
 * Used as rq_delay_limit for non-sptlrpc lock enqueues. */
#define MGC_ENQUEUE_LIMIT (INITIAL_CONNECT_TIMEOUT + (AT_OFF ? 0 : at_min) \
				+ PING_INTERVAL)
/* rq_delay_limit for the target registration request
 * (presumably in seconds - confirm). */
#define MGC_TARGET_REG_LIMIT 10
/* rq_delay_limit for the set-param request
 * (presumably in seconds - confirm). */
#define MGC_SEND_PARAM_LIMIT 10
819
820 /* Send parameter to MGS*/
821 static int mgc_set_mgs_param(struct obd_export *exp,
822 struct mgs_send_param *msp)
823 {
824 struct ptlrpc_request *req;
825 struct mgs_send_param *req_msp, *rep_msp;
826 int rc;
827
828 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
829 &RQF_MGS_SET_INFO, LUSTRE_MGS_VERSION,
830 MGS_SET_INFO);
831 if (!req)
832 return -ENOMEM;
833
834 req_msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
835 if (!req_msp) {
836 ptlrpc_req_finished(req);
837 return -ENOMEM;
838 }
839
840 memcpy(req_msp, msp, sizeof(*req_msp));
841 ptlrpc_request_set_replen(req);
842
843 /* Limit how long we will wait for the enqueue to complete */
844 req->rq_delay_limit = MGC_SEND_PARAM_LIMIT;
845 rc = ptlrpc_queue_wait(req);
846 if (!rc) {
847 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
848 memcpy(msp, rep_msp, sizeof(*rep_msp));
849 }
850
851 ptlrpc_req_finished(req);
852
853 return rc;
854 }
855
856 /* Take a config lock so we can get cancel notifications */
857 static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
858 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
859 __u64 *flags, void *bl_cb, void *cp_cb, void *gl_cb,
860 void *data, __u32 lvb_len, void *lvb_swabber,
861 struct lustre_handle *lockh)
862 {
863 struct config_llog_data *cld = (struct config_llog_data *)data;
864 struct ldlm_enqueue_info einfo = {
865 .ei_type = type,
866 .ei_mode = mode,
867 .ei_cb_bl = mgc_blocking_ast,
868 .ei_cb_cp = ldlm_completion_ast,
869 };
870 struct ptlrpc_request *req;
871 int short_limit = cld_is_sptlrpc(cld);
872 int rc;
873
874 CDEBUG(D_MGC, "Enqueue for %s (res %#llx)\n", cld->cld_logname,
875 cld->cld_resid.name[0]);
876
877 /* We need a callback for every lockholder, so don't try to
878 ldlm_lock_match (see rev 1.1.2.11.2.47) */
879 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
880 &RQF_LDLM_ENQUEUE, LUSTRE_DLM_VERSION,
881 LDLM_ENQUEUE);
882 if (req == NULL)
883 return -ENOMEM;
884
885 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, 0);
886 ptlrpc_request_set_replen(req);
887
888 /* Limit how long we will wait for the enqueue to complete */
889 req->rq_delay_limit = short_limit ? 5 : MGC_ENQUEUE_LIMIT;
890 rc = ldlm_cli_enqueue(exp, &req, &einfo, &cld->cld_resid, NULL, flags,
891 NULL, 0, LVB_T_NONE, lockh, 0);
892 /* A failed enqueue should still call the mgc_blocking_ast,
893 where it will be requeued if needed ("grant failed"). */
894 ptlrpc_req_finished(req);
895 return rc;
896 }
897
898 static void mgc_notify_active(struct obd_device *unused)
899 {
900 /* wakeup mgc_requeue_thread to requeue mgc lock */
901 spin_lock(&config_list_lock);
902 rq_state |= RQ_NOW;
903 spin_unlock(&config_list_lock);
904 wake_up(&rq_waitq);
905
906 /* TODO: Help the MGS rebuild nidtbl. -jay */
907 }
908
909 /* Send target_reg message to MGS */
910 static int mgc_target_register(struct obd_export *exp,
911 struct mgs_target_info *mti)
912 {
913 struct ptlrpc_request *req;
914 struct mgs_target_info *req_mti, *rep_mti;
915 int rc;
916
917 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
918 &RQF_MGS_TARGET_REG, LUSTRE_MGS_VERSION,
919 MGS_TARGET_REG);
920 if (req == NULL)
921 return -ENOMEM;
922
923 req_mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
924 if (!req_mti) {
925 ptlrpc_req_finished(req);
926 return -ENOMEM;
927 }
928
929 memcpy(req_mti, mti, sizeof(*req_mti));
930 ptlrpc_request_set_replen(req);
931 CDEBUG(D_MGC, "register %s\n", mti->mti_svname);
932 /* Limit how long we will wait for the enqueue to complete */
933 req->rq_delay_limit = MGC_TARGET_REG_LIMIT;
934
935 rc = ptlrpc_queue_wait(req);
936 if (!rc) {
937 rep_mti = req_capsule_server_get(&req->rq_pill,
938 &RMF_MGS_TARGET_INFO);
939 memcpy(mti, rep_mti, sizeof(*rep_mti));
940 CDEBUG(D_MGC, "register %s got index = %d\n",
941 mti->mti_svname, mti->mti_stripe_index);
942 }
943 ptlrpc_req_finished(req);
944
945 return rc;
946 }
947
948 static int mgc_set_info_async(const struct lu_env *env, struct obd_export *exp,
949 u32 keylen, void *key, u32 vallen,
950 void *val, struct ptlrpc_request_set *set)
951 {
952 int rc = -EINVAL;
953
954 /* Turn off initial_recov after we try all backup servers once */
955 if (KEY_IS(KEY_INIT_RECOV_BACKUP)) {
956 struct obd_import *imp = class_exp2cliimp(exp);
957 int value;
958 if (vallen != sizeof(int))
959 return -EINVAL;
960 value = *(int *)val;
961 CDEBUG(D_MGC, "InitRecov %s %d/d%d:i%d:r%d:or%d:%s\n",
962 imp->imp_obd->obd_name, value,
963 imp->imp_deactive, imp->imp_invalid,
964 imp->imp_replayable, imp->imp_obd->obd_replayable,
965 ptlrpc_import_state_name(imp->imp_state));
966 /* Resurrect if we previously died */
967 if ((imp->imp_state != LUSTRE_IMP_FULL &&
968 imp->imp_state != LUSTRE_IMP_NEW) || value > 1)
969 ptlrpc_reconnect_import(imp);
970 return 0;
971 }
972 if (KEY_IS(KEY_SET_INFO)) {
973 struct mgs_send_param *msp;
974
975 msp = (struct mgs_send_param *)val;
976 rc = mgc_set_mgs_param(exp, msp);
977 return rc;
978 }
979 if (KEY_IS(KEY_MGSSEC)) {
980 struct client_obd *cli = &exp->exp_obd->u.cli;
981 struct sptlrpc_flavor flvr;
982
983 /*
984 * empty string means using current flavor, if which haven't
985 * been set yet, set it as null.
986 *
987 * if flavor has been set previously, check the asking flavor
988 * must match the existing one.
989 */
990 if (vallen == 0) {
991 if (cli->cl_flvr_mgc.sf_rpc != SPTLRPC_FLVR_INVALID)
992 return 0;
993 val = "null";
994 vallen = 4;
995 }
996
997 rc = sptlrpc_parse_flavor(val, &flvr);
998 if (rc) {
999 CERROR("invalid sptlrpc flavor %s to MGS\n",
1000 (char *) val);
1001 return rc;
1002 }
1003
1004 /*
1005 * caller already hold a mutex
1006 */
1007 if (cli->cl_flvr_mgc.sf_rpc == SPTLRPC_FLVR_INVALID) {
1008 cli->cl_flvr_mgc = flvr;
1009 } else if (memcmp(&cli->cl_flvr_mgc, &flvr,
1010 sizeof(flvr)) != 0) {
1011 char str[20];
1012
1013 sptlrpc_flavor2name(&cli->cl_flvr_mgc,
1014 str, sizeof(str));
1015 LCONSOLE_ERROR("asking sptlrpc flavor %s to MGS but currently %s is in use\n",
1016 (char *) val, str);
1017 rc = -EPERM;
1018 }
1019 return rc;
1020 }
1021
1022 return rc;
1023 }
1024
1025 static int mgc_get_info(const struct lu_env *env, struct obd_export *exp,
1026 __u32 keylen, void *key, __u32 *vallen, void *val,
1027 struct lov_stripe_md *unused)
1028 {
1029 int rc = -EINVAL;
1030
1031 if (KEY_IS(KEY_CONN_DATA)) {
1032 struct obd_import *imp = class_exp2cliimp(exp);
1033 struct obd_connect_data *data = val;
1034
1035 if (*vallen == sizeof(*data)) {
1036 *data = imp->imp_connect_data;
1037 rc = 0;
1038 }
1039 }
1040
1041 return rc;
1042 }
1043
1044 static int mgc_import_event(struct obd_device *obd,
1045 struct obd_import *imp,
1046 enum obd_import_event event)
1047 {
1048 LASSERT(imp->imp_obd == obd);
1049 CDEBUG(D_MGC, "import event %#x\n", event);
1050
1051 switch (event) {
1052 case IMP_EVENT_DISCON:
1053 /* MGC imports should not wait for recovery */
1054 if (OCD_HAS_FLAG(&imp->imp_connect_data, IMP_RECOV))
1055 ptlrpc_pinger_ir_down();
1056 break;
1057 case IMP_EVENT_INACTIVE:
1058 break;
1059 case IMP_EVENT_INVALIDATE: {
1060 struct ldlm_namespace *ns = obd->obd_namespace;
1061 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
1062 break;
1063 }
1064 case IMP_EVENT_ACTIVE:
1065 CDEBUG(D_INFO, "%s: Reactivating import\n", obd->obd_name);
1066 /* Clearing obd_no_recov allows us to continue pinging */
1067 obd->obd_no_recov = 0;
1068 mgc_notify_active(obd);
1069 if (OCD_HAS_FLAG(&imp->imp_connect_data, IMP_RECOV))
1070 ptlrpc_pinger_ir_up();
1071 break;
1072 case IMP_EVENT_OCD:
1073 break;
1074 case IMP_EVENT_DEACTIVATE:
1075 case IMP_EVENT_ACTIVATE:
1076 break;
1077 default:
1078 CERROR("Unknown import event %#x\n", event);
1079 LBUG();
1080 }
1081 return 0;
1082 }
1083
/* Page counts for reading config data.
 * NOTE(review): the code using these lies beyond the part of the file
 * reviewed here - confirm the exact use of each value. */
enum {
	/* 1 << (20 - PAGE_CACHE_SHIFT): presumably the number of pages that
	 * make up 1 MiB - confirm */
	CONFIG_READ_NRPAGES_INIT = 1 << (20 - PAGE_CACHE_SHIFT),
	CONFIG_READ_NRPAGES = 4
};
1088
/*
 * Walk a buffer of struct mgs_nidtbl_entry records and apply each one to the
 * matching local osc/mdc device as an LCFG_PARAM "import=connection=..."
 * setting.
 *
 * @mgc:         MGC device; only used for log messages here.
 * @cld:         config log data; cld_logname is the base for the device
 *               names and cld_cfg.cfg_instance supplies the instance suffix.
 * @max_version: an entry whose mne_version exceeds this stops the walk.
 * @data:        start of the record buffer.
 * @datalen:     number of valid bytes at @data.
 * @mne_swab:    when true, each entry is byte-swapped in place before its
 *               length/version fields are used.
 *
 * Returns 0 when @datalen is not positive. Otherwise returns the status of
 * the last entry looked at: -EINVAL when validation stopped the walk, 0 when
 * the last entry was skipped because its device or import is missing, the
 * error from client_import_find_conn(), -ENOMEM when the lustre_cfg cannot
 * be allocated, or the result of class_process_config(). Also returns
 * -ENOMEM / -E2BIG if the scratch page cannot be set up.
 */
static int mgc_apply_recover_logs(struct obd_device *mgc,
				  struct config_llog_data *cld,
				  __u64 max_version,
				  void *data, int datalen, bool mne_swab)
{
	struct config_llog_instance *cfg = &cld->cld_cfg;
	struct mgs_nidtbl_entry *entry;
	struct lustre_cfg *lcfg;
	struct lustre_cfg_bufs bufs;
	u64 prev_version = 0;
	char *inst;
	char *buf;
	int bufsz;
	int pos;
	int rc = 0;
	int off = 0;

	LASSERT(cfg->cfg_instance != NULL);
	LASSERT(cfg->cfg_sb == cfg->cfg_instance);

	/*
	 * One zeroed page serves two purposes: its head holds the printed
	 * instance pointer, the remainder is scratch space for the strings
	 * built per entry.
	 */
	inst = kzalloc(PAGE_CACHE_SIZE, GFP_NOFS);
	if (!inst)
		return -ENOMEM;

	pos = snprintf(inst, PAGE_CACHE_SIZE, "%p", cfg->cfg_instance);
	if (pos >= PAGE_CACHE_SIZE) {
		kfree(inst);
		return -E2BIG;
	}

	/* step over the NUL that terminates the instance string */
	++pos;
	buf = inst + pos;
	bufsz = PAGE_CACHE_SIZE - pos;

	while (datalen > 0) {
		int entry_len = sizeof(*entry);
		int is_ost;
		struct obd_device *obd;
		char *obdname;
		char *cname;
		char *params;
		char *uuid;

		/* every early 'break' below reports -EINVAL unless rc is reset */
		rc = -EINVAL;
		if (datalen < sizeof(*entry))
			break;

		entry = (typeof(entry))(data + off);

		/*
		 * sanity check
		 * NOTE(review): these fields are read before the optional
		 * swab further down - confirm that is intended for
		 * mne_nid_count.
		 */
		if (entry->mne_nid_type != 0) /* only support type 0 for ipv4 */
			break;
		if (entry->mne_nid_count == 0) /* at least one nid entry */
			break;
		if (entry->mne_nid_size != sizeof(lnet_nid_t))
			break;

		entry_len += entry->mne_nid_count * entry->mne_nid_size;
		if (datalen < entry_len) /* must have entry_len at least */
			break;

		/* Keep this swab for normal mixed endian handling. LU-1644 */
		if (mne_swab)
			lustre_swab_mgs_nidtbl_entry(entry);
		if (entry->mne_length > PAGE_CACHE_SIZE) {
			CERROR("MNE too large (%u)\n", entry->mne_length);
			break;
		}

		if (entry->mne_length < entry_len)
			break;

		/*
		 * Advance past this record now, so the 'continue' statements
		 * below move on to the next one.
		 */
		off += entry->mne_length;
		datalen -= entry->mne_length;
		if (datalen < 0)
			break;

		if (entry->mne_version > max_version) {
			CERROR("entry index(%lld) is over max_index(%lld)\n",
			       entry->mne_version, max_version);
			break;
		}

		/* versions must be strictly increasing within the buffer */
		if (prev_version >= entry->mne_version) {
			CERROR("index unsorted, prev %lld, now %lld\n",
			       prev_version, entry->mne_version);
			break;
		}
		prev_version = entry->mne_version;

		/*
		 * Write a string with format "nid::instance" to
		 * lustre/<osc|mdc>/<target>-<osc|mdc>-<instance>/import.
		 */

		is_ost = entry->mne_type == LDD_F_SV_TYPE_OST;
		memset(buf, 0, bufsz);
		obdname = buf;
		pos = 0;

		/*
		 * lustre-OST0001-osc-<instance #>
		 * NOTE(review): strcpy()/sprintf() below are not bounded by
		 * bufsz; the only check is the LASSERT(pos < bufsz) made
		 * after everything has been written.
		 */
		strcpy(obdname, cld->cld_logname);
		cname = strrchr(obdname, '-');
		if (cname == NULL) {
			CERROR("mgc %s: invalid logname %s\n",
			       mgc->obd_name, obdname);
			break;
		}

		/* cut the logname at its last '-' and append the target name */
		pos = cname - obdname;
		obdname[pos] = 0;
		pos += sprintf(obdname + pos, "-%s%04x",
			       is_ost ? "OST" : "MDT", entry->mne_index);

		cname = is_ost ? "osc" : "mdc";
		pos += sprintf(obdname + pos, "-%s-%s", cname, inst);
		lustre_cfg_bufs_reset(&bufs, obdname);

		/* find the obd by obdname */
		obd = class_name2obd(obdname);
		if (obd == NULL) {
			CDEBUG(D_INFO, "mgc %s: cannot find obdname %s\n",
			       mgc->obd_name, obdname);
			rc = 0;
			/* this is a safe race, when the ost is starting up...*/
			continue;
		}

		/*
		 * osc.import = "connection=<Conn UUID>::<target instance>"
		 * The parameter string starts right after obdname's NUL.
		 */
		++pos;
		params = buf + pos;
		pos += sprintf(params, "%s.import=%s", cname, "connection=");
		/* the uuid is filled in directly behind "connection=" */
		uuid = buf + pos;

		down_read(&obd->u.cli.cl_sem);
		if (obd->u.cli.cl_import == NULL) {
			/* client does not connect to the OST yet */
			up_read(&obd->u.cli.cl_sem);
			rc = 0;
			continue;
		}

		/* TODO: iterate all nids to find one */
		/* find uuid by nid */
		rc = client_import_find_conn(obd->u.cli.cl_import,
					     entry->u.nids[0],
					     (struct obd_uuid *)uuid);
		up_read(&obd->u.cli.cl_sem);
		if (rc < 0) {
			CERROR("mgc: cannot find uuid by nid %s\n",
			       libcfs_nid2str(entry->u.nids[0]));
			break;
		}

		CDEBUG(D_INFO, "Find uuid %s by nid %s\n",
		       uuid, libcfs_nid2str(entry->u.nids[0]));

		pos += strlen(uuid);
		pos += sprintf(buf + pos, "::%u", entry->mne_instance);
		LASSERT(pos < bufsz);

		lustre_cfg_bufs_set_string(&bufs, 1, params);

		rc = -ENOMEM;
		lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
		if (lcfg == NULL) {
			CERROR("mgc: cannot allocate memory\n");
			break;
		}

		CDEBUG(D_INFO, "ir apply logs %lld/%lld for %s -> %s\n",
		       prev_version, max_version, obdname, params);

		rc = class_process_config(lcfg);
		lustre_cfg_free(lcfg);
		if (rc)
			CDEBUG(D_INFO, "process config for %s error %d\n",
			       obdname, rc);

		/* continue, even one with error */
	}

	kfree(inst);
	return rc;
}
1274
1275 /**
1276 * This function is called if this client was notified for target restarting
1277 * by the MGS. A CONFIG_READ RPC is going to send to fetch recovery logs.
1278 */
1279 static int mgc_process_recover_log(struct obd_device *obd,
1280 struct config_llog_data *cld)
1281 {
1282 struct ptlrpc_request *req = NULL;
1283 struct config_llog_instance *cfg = &cld->cld_cfg;
1284 struct mgs_config_body *body;
1285 struct mgs_config_res *res;
1286 struct ptlrpc_bulk_desc *desc;
1287 struct page **pages;
1288 int nrpages;
1289 bool eof = true;
1290 bool mne_swab = false;
1291 int i;
1292 int ealen;
1293 int rc;
1294
1295 /* allocate buffer for bulk transfer.
1296 * if this is the first time for this mgs to read logs,
1297 * CONFIG_READ_NRPAGES_INIT will be used since it will read all logs
1298 * once; otherwise, it only reads increment of logs, this should be
1299 * small and CONFIG_READ_NRPAGES will be used.
1300 */
1301 nrpages = CONFIG_READ_NRPAGES;
1302 if (cfg->cfg_last_idx == 0) /* the first time */
1303 nrpages = CONFIG_READ_NRPAGES_INIT;
1304
1305 pages = kcalloc(nrpages, sizeof(*pages), GFP_NOFS);
1306 if (pages == NULL) {
1307 rc = -ENOMEM;
1308 goto out;
1309 }
1310
1311 for (i = 0; i < nrpages; i++) {
1312 pages[i] = alloc_page(GFP_IOFS);
1313 if (pages[i] == NULL) {
1314 rc = -ENOMEM;
1315 goto out;
1316 }
1317 }
1318
1319 again:
1320 LASSERT(cld_is_recover(cld));
1321 LASSERT(mutex_is_locked(&cld->cld_lock));
1322 req = ptlrpc_request_alloc(class_exp2cliimp(cld->cld_mgcexp),
1323 &RQF_MGS_CONFIG_READ);
1324 if (req == NULL) {
1325 rc = -ENOMEM;
1326 goto out;
1327 }
1328
1329 rc = ptlrpc_request_pack(req, LUSTRE_MGS_VERSION, MGS_CONFIG_READ);
1330 if (rc)
1331 goto out;
1332
1333 /* pack request */
1334 body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
1335 LASSERT(body != NULL);
1336 LASSERT(sizeof(body->mcb_name) > strlen(cld->cld_logname));
1337 if (strlcpy(body->mcb_name, cld->cld_logname, sizeof(body->mcb_name))
1338 >= sizeof(body->mcb_name)) {
1339 rc = -E2BIG;
1340 goto out;
1341 }
1342 body->mcb_offset = cfg->cfg_last_idx + 1;
1343 body->mcb_type = cld->cld_type;
1344 body->mcb_bits = PAGE_CACHE_SHIFT;
1345 body->mcb_units = nrpages;
1346
1347 /* allocate bulk transfer descriptor */
1348 desc = ptlrpc_prep_bulk_imp(req, nrpages, 1, BULK_PUT_SINK,
1349 MGS_BULK_PORTAL);
1350 if (desc == NULL) {
1351 rc = -ENOMEM;
1352 goto out;
1353 }
1354
1355 for (i = 0; i < nrpages; i++)
1356 ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
1357
1358 ptlrpc_request_set_replen(req);
1359 rc = ptlrpc_queue_wait(req);
1360 if (rc)
1361 goto out;
1362
1363 res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
1364 if (res->mcr_size < res->mcr_offset) {
1365 rc = -EINVAL;
1366 goto out;
1367 }
1368
1369 /* always update the index even though it might have errors with
1370 * handling the recover logs */
1371 cfg->cfg_last_idx = res->mcr_offset;
1372 eof = res->mcr_offset == res->mcr_size;
1373
1374 CDEBUG(D_INFO, "Latest version %lld, more %d.\n",
1375 res->mcr_offset, eof == false);
1376
1377 ealen = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, 0);
1378 if (ealen < 0) {
1379 rc = ealen;
1380 goto out;
1381 }
1382
1383 if (ealen > nrpages << PAGE_CACHE_SHIFT) {
1384 rc = -EINVAL;
1385 goto out;
1386 }
1387
1388 if (ealen == 0) { /* no logs transferred */
1389 if (!eof)
1390 rc = -EINVAL;
1391 goto out;
1392 }
1393
1394 mne_swab = !!ptlrpc_rep_need_swab(req);
1395 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 2, 50, 0)
1396 /* This import flag means the server did an extra swab of IR MNE
1397 * records (fixed in LU-1252), reverse it here if needed. LU-1644 */
1398 if (unlikely(req->rq_import->imp_need_mne_swab))
1399 mne_swab = !mne_swab;
1400 #else
1401 #warning "LU-1644: Remove old OBD_CONNECT_MNE_SWAB fixup and imp_need_mne_swab"
1402 #endif
1403
1404 for (i = 0; i < nrpages && ealen > 0; i++) {
1405 int rc2;
1406 void *ptr;
1407
1408 ptr = kmap(pages[i]);
1409 rc2 = mgc_apply_recover_logs(obd, cld, res->mcr_offset, ptr,
1410 min_t(int, ealen, PAGE_CACHE_SIZE),
1411 mne_swab);
1412 kunmap(pages[i]);
1413 if (rc2 < 0) {
1414 CWARN("Process recover log %s error %d\n",
1415 cld->cld_logname, rc2);
1416 break;
1417 }
1418
1419 ealen -= PAGE_CACHE_SIZE;
1420 }
1421
1422 out:
1423 if (req)
1424 ptlrpc_req_finished(req);
1425
1426 if (rc == 0 && !eof)
1427 goto again;
1428
1429 if (pages) {
1430 for (i = 0; i < nrpages; i++) {
1431 if (pages[i] == NULL)
1432 break;
1433 __free_page(pages[i]);
1434 }
1435 kfree(pages);
1436 }
1437 return rc;
1438 }
1439
1440 /* local_only means it cannot get remote llogs */
1441 static int mgc_process_cfg_log(struct obd_device *mgc,
1442 struct config_llog_data *cld, int local_only)
1443 {
1444 struct llog_ctxt *ctxt;
1445 struct lustre_sb_info *lsi = NULL;
1446 int rc = 0;
1447 bool sptlrpc_started = false;
1448 struct lu_env *env;
1449
1450 LASSERT(cld);
1451 LASSERT(mutex_is_locked(&cld->cld_lock));
1452
1453 /*
1454 * local copy of sptlrpc log is controlled elsewhere, don't try to
1455 * read it up here.
1456 */
1457 if (cld_is_sptlrpc(cld) && local_only)
1458 return 0;
1459
1460 if (cld->cld_cfg.cfg_sb)
1461 lsi = s2lsi(cld->cld_cfg.cfg_sb);
1462
1463 env = kzalloc(sizeof(*env), GFP_NOFS);
1464 if (!env)
1465 return -ENOMEM;
1466
1467 rc = lu_env_init(env, LCT_MG_THREAD);
1468 if (rc)
1469 goto out_free;
1470
1471 ctxt = llog_get_context(mgc, LLOG_CONFIG_REPL_CTXT);
1472 LASSERT(ctxt);
1473
1474 if (local_only) /* no local log at client side */ {
1475 rc = -EIO;
1476 goto out_pop;
1477 }
1478
1479 if (cld_is_sptlrpc(cld)) {
1480 sptlrpc_conf_log_update_begin(cld->cld_logname);
1481 sptlrpc_started = true;
1482 }
1483
1484 /* logname and instance info should be the same, so use our
1485 * copy of the instance for the update. The cfg_last_idx will
1486 * be updated here. */
1487 rc = class_config_parse_llog(env, ctxt, cld->cld_logname,
1488 &cld->cld_cfg);
1489
1490 out_pop:
1491 __llog_ctxt_put(env, ctxt);
1492
1493 /*
1494 * update settings on existing OBDs. doing it inside
1495 * of llog_process_lock so no device is attaching/detaching
1496 * in parallel.
1497 * the logname must be <fsname>-sptlrpc
1498 */
1499 if (sptlrpc_started) {
1500 LASSERT(cld_is_sptlrpc(cld));
1501 sptlrpc_conf_log_update_end(cld->cld_logname);
1502 class_notify_sptlrpc_conf(cld->cld_logname,
1503 strlen(cld->cld_logname) -
1504 strlen("-sptlrpc"));
1505 }
1506
1507 lu_env_fini(env);
1508 out_free:
1509 kfree(env);
1510 return rc;
1511 }
1512
1513 /** Get a config log from the MGS and process it.
1514 * This func is called for both clients and servers.
1515 * Copy the log locally before parsing it if appropriate (non-MGS server)
1516 */
1517 int mgc_process_log(struct obd_device *mgc, struct config_llog_data *cld)
1518 {
1519 struct lustre_handle lockh = { 0 };
1520 __u64 flags = LDLM_FL_NO_LRU;
1521 int rc = 0, rcl;
1522
1523 LASSERT(cld);
1524
1525 /* I don't want multiple processes running process_log at once --
1526 sounds like badness. It actually might be fine, as long as
1527 we're not trying to update from the same log
1528 simultaneously (in which case we should use a per-log sem.) */
1529 mutex_lock(&cld->cld_lock);
1530 if (cld->cld_stopping) {
1531 mutex_unlock(&cld->cld_lock);
1532 return 0;
1533 }
1534
1535 OBD_FAIL_TIMEOUT(OBD_FAIL_MGC_PAUSE_PROCESS_LOG, 20);
1536
1537 CDEBUG(D_MGC, "Process log %s:%p from %d\n", cld->cld_logname,
1538 cld->cld_cfg.cfg_instance, cld->cld_cfg.cfg_last_idx + 1);
1539
1540 /* Get the cfg lock on the llog */
1541 rcl = mgc_enqueue(mgc->u.cli.cl_mgc_mgsexp, NULL, LDLM_PLAIN, NULL,
1542 LCK_CR, &flags, NULL, NULL, NULL,
1543 cld, 0, NULL, &lockh);
1544 if (rcl == 0) {
1545 /* Get the cld, it will be released in mgc_blocking_ast. */
1546 config_log_get(cld);
1547 rc = ldlm_lock_set_data(&lockh, (void *)cld);
1548 LASSERT(rc == 0);
1549 } else {
1550 CDEBUG(D_MGC, "Can't get cfg lock: %d\n", rcl);
1551
1552 /* mark cld_lostlock so that it will requeue
1553 * after MGC becomes available. */
1554 cld->cld_lostlock = 1;
1555 /* Get extra reference, it will be put in requeue thread */
1556 config_log_get(cld);
1557 }
1558
1559
1560 if (cld_is_recover(cld)) {
1561 rc = 0; /* this is not a fatal error for recover log */
1562 if (rcl == 0)
1563 rc = mgc_process_recover_log(mgc, cld);
1564 } else {
1565 rc = mgc_process_cfg_log(mgc, cld, rcl != 0);
1566 }
1567
1568 CDEBUG(D_MGC, "%s: configuration from log '%s' %sed (%d).\n",
1569 mgc->obd_name, cld->cld_logname, rc ? "fail" : "succeed", rc);
1570
1571 mutex_unlock(&cld->cld_lock);
1572
1573 /* Now drop the lock so MGS can revoke it */
1574 if (!rcl)
1575 ldlm_lock_decref(&lockh, LCK_CR);
1576
1577 return rc;
1578 }
1579
1580
1581 /** Called from lustre_process_log.
1582 * LCFG_LOG_START gets the config log from the MGS, processes it to start
1583 * any services, and adds it to the list logs to watch (follow).
1584 */
1585 static int mgc_process_config(struct obd_device *obd, u32 len, void *buf)
1586 {
1587 struct lustre_cfg *lcfg = buf;
1588 struct config_llog_instance *cfg = NULL;
1589 char *logname;
1590 int rc = 0;
1591
1592 switch (lcfg->lcfg_command) {
1593 case LCFG_LOV_ADD_OBD: {
1594 /* Overloading this cfg command: register a new target */
1595 struct mgs_target_info *mti;
1596
1597 if (LUSTRE_CFG_BUFLEN(lcfg, 1) !=
1598 sizeof(struct mgs_target_info)) {
1599 rc = -EINVAL;
1600 goto out;
1601 }
1602
1603 mti = (struct mgs_target_info *)lustre_cfg_buf(lcfg, 1);
1604 CDEBUG(D_MGC, "add_target %s %#x\n",
1605 mti->mti_svname, mti->mti_flags);
1606 rc = mgc_target_register(obd->u.cli.cl_mgc_mgsexp, mti);
1607 break;
1608 }
1609 case LCFG_LOV_DEL_OBD:
1610 /* Unregister has no meaning at the moment. */
1611 CERROR("lov_del_obd unimplemented\n");
1612 rc = -ENOSYS;
1613 break;
1614 case LCFG_SPTLRPC_CONF: {
1615 rc = sptlrpc_process_config(lcfg);
1616 break;
1617 }
1618 case LCFG_LOG_START: {
1619 struct config_llog_data *cld;
1620 struct super_block *sb;
1621
1622 logname = lustre_cfg_string(lcfg, 1);
1623 cfg = (struct config_llog_instance *)lustre_cfg_buf(lcfg, 2);
1624 sb = *(struct super_block **)lustre_cfg_buf(lcfg, 3);
1625
1626 CDEBUG(D_MGC, "parse_log %s from %d\n", logname,
1627 cfg->cfg_last_idx);
1628
1629 /* We're only called through here on the initial mount */
1630 rc = config_log_add(obd, logname, cfg, sb);
1631 if (rc)
1632 break;
1633 cld = config_log_find(logname, cfg);
1634 if (cld == NULL) {
1635 rc = -ENOENT;
1636 break;
1637 }
1638
1639 /* COMPAT_146 */
1640 /* FIXME only set this for old logs! Right now this forces
1641 us to always skip the "inside markers" check */
1642 cld->cld_cfg.cfg_flags |= CFG_F_COMPAT146;
1643
1644 rc = mgc_process_log(obd, cld);
1645 if (rc == 0 && cld->cld_recover != NULL) {
1646 if (OCD_HAS_FLAG(&obd->u.cli.cl_import->
1647 imp_connect_data, IMP_RECOV)) {
1648 rc = mgc_process_log(obd, cld->cld_recover);
1649 } else {
1650 struct config_llog_data *cir = cld->cld_recover;
1651 cld->cld_recover = NULL;
1652 config_log_put(cir);
1653 }
1654 if (rc)
1655 CERROR("Cannot process recover llog %d\n", rc);
1656 }
1657
1658 if (rc == 0 && cld->cld_params != NULL) {
1659 rc = mgc_process_log(obd, cld->cld_params);
1660 if (rc == -ENOENT) {
1661 CDEBUG(D_MGC,
1662 "There is no params config file yet\n");
1663 rc = 0;
1664 }
1665 /* params log is optional */
1666 if (rc)
1667 CERROR(
1668 "%s: can't process params llog: rc = %d\n",
1669 obd->obd_name, rc);
1670 }
1671 config_log_put(cld);
1672
1673 break;
1674 }
1675 case LCFG_LOG_END: {
1676 logname = lustre_cfg_string(lcfg, 1);
1677
1678 if (lcfg->lcfg_bufcount >= 2)
1679 cfg = (struct config_llog_instance *)lustre_cfg_buf(
1680 lcfg, 2);
1681 rc = config_log_end(logname, cfg);
1682 break;
1683 }
1684 default: {
1685 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1686 rc = -EINVAL;
1687 goto out;
1688
1689 }
1690 }
1691 out:
1692 return rc;
1693 }
1694
1695 struct obd_ops mgc_obd_ops = {
1696 .o_owner = THIS_MODULE,
1697 .o_setup = mgc_setup,
1698 .o_precleanup = mgc_precleanup,
1699 .o_cleanup = mgc_cleanup,
1700 .o_add_conn = client_import_add_conn,
1701 .o_del_conn = client_import_del_conn,
1702 .o_connect = client_connect_import,
1703 .o_disconnect = client_disconnect_export,
1704 /* .o_enqueue = mgc_enqueue, */
1705 /* .o_iocontrol = mgc_iocontrol, */
1706 .o_set_info_async = mgc_set_info_async,
1707 .o_get_info = mgc_get_info,
1708 .o_import_event = mgc_import_event,
1709 .o_process_config = mgc_process_config,
1710 };
1711
1712 static int __init mgc_init(void)
1713 {
1714 return class_register_type(&mgc_obd_ops, NULL,
1715 LUSTRE_MGC_NAME, NULL);
1716 }
1717
1718 static void /*__exit*/ mgc_exit(void)
1719 {
1720 class_unregister_type(LUSTRE_MGC_NAME);
1721 }
1722
/* Module metadata and the load/unload hooks for the MGC module. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Management Client");
MODULE_LICENSE("GPL");

/* mgc_init() runs when the module is loaded, mgc_exit() when it is removed */
module_init(mgc_init);
module_exit(mgc_exit);
This page took 0.148026 seconds and 5 git commands to generate.