md-cluster: fix deadlock issue on message lock
[deliverable/linux.git] / drivers / md / md-cluster.c
CommitLineData
8e854e9c
GR
1/*
2 * Copyright (C) 2015, SUSE
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2, or (at your option)
7 * any later version.
8 *
9 */
10
11
12#include <linux/module.h>
47741b7c
GR
13#include <linux/dlm.h>
14#include <linux/sched.h>
1aee41f6 15#include <linux/raid/md_p.h>
47741b7c 16#include "md.h"
e94987db 17#include "bitmap.h"
edb39c9d 18#include "md-cluster.h"
47741b7c
GR
19
20#define LVB_SIZE 64
1aee41f6 21#define NEW_DEV_TIMEOUT 5000
47741b7c
GR
22
/* Bookkeeping for one DLM lock used by md-cluster. */
struct dlm_lock_resource {
	dlm_lockspace_t *ls;		/* lockspace the lock lives in */
	struct dlm_lksb lksb;		/* DLM lock status block (lkid, status, LVB) */
	char *name;			/* lock name. */
	uint32_t flags;			/* flags to pass to dlm_lock() */
	struct completion completion;	/* completion for synchronized locking */
	void (*bast)(void *arg, int mode); /* blocking AST function pointer */
	struct mddev *mddev;		/* pointing back to mddev. */
};
32
96ae923a
GR
/* One range of sectors a peer node is currently resyncing. */
struct suspend_info {
	int slot;		/* 0-based slot of the resyncing node */
	sector_t lo;		/* start of the suspended range */
	sector_t hi;		/* end of the suspended range */
	struct list_head list;	/* link in md_cluster_info->suspend_list */
};

/* Resync range as stored in a bitmap lock's LVB (little-endian on-wire). */
struct resync_info {
	__le64 lo;
	__le64 hi;
};
44
fa8259da
GR
/* md_cluster_info flags */
#define MD_CLUSTER_WAITING_FOR_NEWDISK 1	/* NEWDISK sent; waiting for userspace ack */
#define MD_CLUSTER_SUSPEND_READ_BALANCING 2	/* DLM recovery in progress; no read balancing */


/* Per-array cluster state, hung off mddev->cluster_info. */
struct md_cluster_info {
	/* dlm lock space and resources for clustered raid. */
	dlm_lockspace_t *lockspace;
	int slot_number;			/* our DLM slot (1-based) */
	struct completion completion;		/* completed by recover_done() */
	struct dlm_lock_resource *sb_lock;
	struct mutex sb_mutex;
	struct dlm_lock_resource *bitmap_lockres; /* PW lock on our own bitmap */
	struct list_head suspend_list;		/* ranges peers are resyncing */
	spinlock_t suspend_lock;		/* protects suspend_list */
	struct md_thread *recovery_thread;	/* runs recover_bitmaps() */
	unsigned long recovery_map;		/* bitmask of slots needing bitmap recovery */
	/* communication lock resources */
	struct dlm_lock_resource *ack_lockres;
	struct dlm_lock_resource *message_lockres;
	struct dlm_lock_resource *token_lockres;
	struct dlm_lock_resource *no_new_dev_lockres;
	struct md_thread *recv_thread;		/* runs recv_daemon() */
	struct completion newdisk_completion;	/* NEWDISK handshake with userspace */
	unsigned long state;			/* MD_CLUSTER_* flag bits */
};
71
/* Cluster message types carried in the message lock's LVB. */
enum msg_type {
	METADATA_UPDATED = 0,	/* superblock changed; peers must reload */
	RESYNCING,		/* sender resyncing [low, high]; 0/0 means done */
	NEWDISK,		/* new device added; confirm via uevent/mdadm */
	REMOVE,			/* kick the device in raid_slot from the array */
	RE_ADD,			/* clear Faulty on the device in raid_slot */
	BITMAP_NEEDS_SYNC,	/* sender aborted resync; recover its bitmap */
};
80
/*
 * On-wire cluster message, copied into the message lock's LVB.
 * NOTE(review): the fields are host-endian here while several senders
 * apply cpu_to_le32()/cpu_to_le64() — a mixed-endian cluster would need
 * an explicit __le32/__le64 layout; confirm against the protocol docs.
 */
struct cluster_msg {
	int type;	/* enum msg_type */
	int slot;	/* sender's 0-based slot */
	/* TODO: Unionize this for smaller footprint */
	sector_t low;	/* resync range start (RESYNCING) */
	sector_t high;	/* resync range end (RESYNCING) */
	char uuid[16];	/* device uuid (NEWDISK) */
	int raid_slot;	/* rdev->desc_nr (NEWDISK/REMOVE/RE_ADD) */
};
90
91static void sync_ast(void *arg)
92{
93 struct dlm_lock_resource *res;
94
95 res = (struct dlm_lock_resource *) arg;
96 complete(&res->completion);
97}
98
99static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
100{
101 int ret = 0;
102
103 init_completion(&res->completion);
104 ret = dlm_lock(res->ls, mode, &res->lksb,
105 res->flags, res->name, strlen(res->name),
106 0, sync_ast, res, res->bast);
107 if (ret)
108 return ret;
109 wait_for_completion(&res->completion);
110 return res->lksb.sb_status;
111}
112
113static int dlm_unlock_sync(struct dlm_lock_resource *res)
114{
115 return dlm_lock_sync(res, DLM_LOCK_NL);
116}
117
c4ce867f 118static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
47741b7c
GR
119 char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
120{
121 struct dlm_lock_resource *res = NULL;
122 int ret, namelen;
c4ce867f 123 struct md_cluster_info *cinfo = mddev->cluster_info;
47741b7c
GR
124
125 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
126 if (!res)
127 return NULL;
c4ce867f
GR
128 res->ls = cinfo->lockspace;
129 res->mddev = mddev;
47741b7c
GR
130 namelen = strlen(name);
131 res->name = kzalloc(namelen + 1, GFP_KERNEL);
132 if (!res->name) {
133 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
134 goto out_err;
135 }
136 strlcpy(res->name, name, namelen + 1);
137 if (with_lvb) {
138 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
139 if (!res->lksb.sb_lvbptr) {
140 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
141 goto out_err;
142 }
143 res->flags = DLM_LKF_VALBLK;
144 }
145
146 if (bastfn)
147 res->bast = bastfn;
148
149 res->flags |= DLM_LKF_EXPEDITE;
150
151 ret = dlm_lock_sync(res, DLM_LOCK_NL);
152 if (ret) {
153 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
154 goto out_err;
155 }
156 res->flags &= ~DLM_LKF_EXPEDITE;
157 res->flags |= DLM_LKF_CONVERT;
158
159 return res;
160out_err:
161 kfree(res->lksb.sb_lvbptr);
162 kfree(res->name);
163 kfree(res);
164 return NULL;
165}
166
167static void lockres_free(struct dlm_lock_resource *res)
168{
169 if (!res)
170 return;
171
172 init_completion(&res->completion);
173 dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
174 wait_for_completion(&res->completion);
175
176 kfree(res->name);
177 kfree(res->lksb.sb_lvbptr);
178 kfree(res);
179}
8e854e9c 180
96ae923a
GR
181static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres,
182 sector_t lo, sector_t hi)
183{
184 struct resync_info *ri;
185
186 ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
187 ri->lo = cpu_to_le64(lo);
188 ri->hi = cpu_to_le64(hi);
189}
190
191static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
192{
193 struct resync_info ri;
194 struct suspend_info *s = NULL;
195 sector_t hi = 0;
196
197 dlm_lock_sync(lockres, DLM_LOCK_CR);
198 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
199 hi = le64_to_cpu(ri.hi);
200 if (ri.hi > 0) {
201 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
202 if (!s)
203 goto out;
204 s->hi = hi;
205 s->lo = le64_to_cpu(ri.lo);
206 }
207 dlm_unlock_sync(lockres);
208out:
209 return s;
210}
211
6dc69c9c 212static void recover_bitmaps(struct md_thread *thread)
e94987db
GR
213{
214 struct mddev *mddev = thread->mddev;
215 struct md_cluster_info *cinfo = mddev->cluster_info;
216 struct dlm_lock_resource *bm_lockres;
217 char str[64];
218 int slot, ret;
219 struct suspend_info *s, *tmp;
220 sector_t lo, hi;
221
222 while (cinfo->recovery_map) {
223 slot = fls64((u64)cinfo->recovery_map) - 1;
224
225 /* Clear suspend_area associated with the bitmap */
226 spin_lock_irq(&cinfo->suspend_lock);
227 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
228 if (slot == s->slot) {
229 list_del(&s->list);
230 kfree(s);
231 }
232 spin_unlock_irq(&cinfo->suspend_lock);
233
234 snprintf(str, 64, "bitmap%04d", slot);
235 bm_lockres = lockres_init(mddev, str, NULL, 1);
236 if (!bm_lockres) {
237 pr_err("md-cluster: Cannot initialize bitmaps\n");
238 goto clear_bit;
239 }
240
241 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
242 if (ret) {
243 pr_err("md-cluster: Could not DLM lock %s: %d\n",
244 str, ret);
245 goto clear_bit;
246 }
97f6cd39 247 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
4b26a08a 248 if (ret) {
e94987db 249 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
4b26a08a
GR
250 goto dlm_unlock;
251 }
252 if (hi > 0) {
253 /* TODO:Wait for current resync to get over */
254 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
255 if (lo < mddev->recovery_cp)
256 mddev->recovery_cp = lo;
257 md_check_recovery(mddev);
258 }
259dlm_unlock:
e94987db
GR
260 dlm_unlock_sync(bm_lockres);
261clear_bit:
262 clear_bit(slot, &cinfo->recovery_map);
263 }
264}
265
cf921cc1
GR
266static void recover_prep(void *arg)
267{
90382ed9
GR
268 struct mddev *mddev = arg;
269 struct md_cluster_info *cinfo = mddev->cluster_info;
270 set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
cf921cc1
GR
271}
272
/*
 * Mark @slot (0-based) as needing bitmap recovery and ensure the
 * recovery thread exists and is awake to process it.
 */
static void __recover_slot(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	set_bit(slot, &cinfo->recovery_map);
	if (!cinfo->recovery_thread) {
		/* created lazily on first recovery request */
		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
				mddev, "recover");
		if (!cinfo->recovery_thread) {
			pr_warn("md-cluster: Could not create recovery thread\n");
			return;
		}
	}
	md_wakeup_thread(cinfo->recovery_thread);
}
288
05cd0e51
GJ
/* dlm_lockspace_ops: a peer node died — recover its bitmap. */
static void recover_slot(void *arg, struct dlm_slot *slot)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
			mddev->bitmap_info.cluster_name,
			slot->nodeid, slot->slot,
			cinfo->slot_number);
	/* deduct one since dlm slot starts from one while the num of
	 * cluster-md begins with 0 */
	__recover_slot(mddev, slot->slot - 1);
}
302
cf921cc1
GR
/*
 * dlm_lockspace_ops: recovery finished.  Record our slot number, wake
 * up join() (which waits on cinfo->completion) and re-enable read
 * balancing.
 */
static void recover_done(void *arg, struct dlm_slot *slots,
		int num_slots, int our_slot,
		uint32_t generation)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->slot_number = our_slot;
	complete(&cinfo->completion);
	clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}
314
/* Lockspace callbacks registered with dlm_new_lockspace() in join(). */
static const struct dlm_lockspace_ops md_ls_ops = {
	.recover_prep = recover_prep,
	.recover_slot = recover_slot,
	.recover_done = recover_done,
};
320
4664680c
GR
321/*
322 * The BAST function for the ack lock resource
323 * This function wakes up the receive thread in
324 * order to receive and process the message.
325 */
326static void ack_bast(void *arg, int mode)
327{
328 struct dlm_lock_resource *res = (struct dlm_lock_resource *)arg;
329 struct md_cluster_info *cinfo = res->mddev->cluster_info;
330
331 if (mode == DLM_LOCK_EX)
332 md_wakeup_thread(cinfo->recv_thread);
333}
334
e59721cc
GR
335static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
336{
337 struct suspend_info *s, *tmp;
338
339 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
340 if (slot == s->slot) {
341 pr_info("%s:%d Deleting suspend_info: %d\n",
342 __func__, __LINE__, slot);
343 list_del(&s->list);
344 kfree(s);
345 break;
346 }
347}
348
/* Locked wrapper around __remove_suspend_info(). */
static void remove_suspend_info(struct md_cluster_info *cinfo, int slot)
{
	spin_lock_irq(&cinfo->suspend_lock);
	__remove_suspend_info(cinfo, slot);
	spin_unlock_irq(&cinfo->suspend_lock);
}
355
356
/*
 * Handle a RESYNCING message: record the range [lo, hi] node @slot is
 * resyncing so local I/O can avoid it; hi == 0 means the peer finished
 * and its entry is removed instead.
 */
static void process_suspend_info(struct md_cluster_info *cinfo,
		int slot, sector_t lo, sector_t hi)
{
	struct suspend_info *s;

	if (!hi) {
		remove_suspend_info(cinfo, slot);
		return;
	}
	s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
	if (!s)
		return;
	s->slot = slot;
	s->lo = lo;
	s->hi = hi;
	spin_lock_irq(&cinfo->suspend_lock);
	/* Remove existing entry (if exists) before adding */
	__remove_suspend_info(cinfo, slot);
	list_add(&s->list, &cinfo->suspend_list);
	spin_unlock_irq(&cinfo->suspend_lock);
}
378
1aee41f6
GR
/*
 * Handle a NEWDISK message: raise a change uevent carrying the device
 * uuid and raid slot so userspace (mdadm) can validate the device, then
 * wait — bounded by NEW_DEV_TIMEOUT — for it to call new_disk_ack().
 */
static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
{
	char disk_uuid[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;
	char event_name[] = "EVENT=ADD_DEVICE";
	char raid_slot[16];
	char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
	int len;

	len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
	sprintf(disk_uuid + len, "%pU", cmsg->uuid);
	snprintf(raid_slot, 16, "RAID_DISK=%d", cmsg->raid_slot);
	pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
	init_completion(&cinfo->newdisk_completion);
	/* lets new_disk_ack() distinguish a real ack from a spurious one */
	set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
	kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
	/* give up if userspace never answers (timeout is in jiffies) */
	wait_for_completion_timeout(&cinfo->newdisk_completion,
			NEW_DEV_TIMEOUT);
	clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
}
399
400
/*
 * Handle METADATA_UPDATED: reload our copy of the superblock, then
 * re-take CR on no-new-dev (dropped when a NEWDISK was acked).
 */
static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	md_reload_sb(mddev);
	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
}
408
88bcfef7
GR
/* Handle REMOVE: kick the device in msg->raid_slot out of the array. */
static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	/*
	 * NOTE(review): the _rcu lookup suggests an rcu_read_lock()
	 * section is expected around lookup + use — confirm with callers.
	 */
	struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev, msg->raid_slot);

	if (rdev)
		md_kick_rdev_from_array(rdev);
	else
		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", __func__, __LINE__, msg->raid_slot);
}
418
97f6cd39
GR
419static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
420{
421 struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev, msg->raid_slot);
422
423 if (rdev && test_bit(Faulty, &rdev->flags))
424 clear_bit(Faulty, &rdev->flags);
425 else
426 pr_warn("%s: %d Could not find disk(%d) which is faulty", __func__, __LINE__, msg->raid_slot);
427}
428
4664680c
GR
/* Dispatch one received cluster message to its handler. */
static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
{
	switch (msg->type) {
	case METADATA_UPDATED:
		pr_info("%s: %d Received message: METADATA_UPDATE from %d\n",
			__func__, __LINE__, msg->slot);
		process_metadata_update(mddev, msg);
		break;
	case RESYNCING:
		pr_info("%s: %d Received message: RESYNCING from %d\n",
			__func__, __LINE__, msg->slot);
		process_suspend_info(mddev->cluster_info, msg->slot,
				msg->low, msg->high);
		break;
	case NEWDISK:
		pr_info("%s: %d Received message: NEWDISK from %d\n",
			__func__, __LINE__, msg->slot);
		process_add_new_disk(mddev, msg);
		break;
	case REMOVE:
		pr_info("%s: %d Received REMOVE from %d\n",
			__func__, __LINE__, msg->slot);
		process_remove_disk(mddev, msg);
		break;
	case RE_ADD:
		pr_info("%s: %d Received RE_ADD from %d\n",
			__func__, __LINE__, msg->slot);
		process_readd_disk(mddev, msg);
		break;
	case BITMAP_NEEDS_SYNC:
		pr_info("%s: %d Received BITMAP_NEEDS_SYNC from %d\n",
			__func__, __LINE__, msg->slot);
		/* sender aborted its resync; recover its bitmap ourselves */
		__recover_slot(mddev, msg->slot);
		break;
	default:
		pr_warn("%s:%d Received unknown message from %d\n",
			__func__, __LINE__, msg->slot);
	}
}
468
/*
 * thread for receiving message
 *
 * Woken by ack_bast() when a sender up-converts the ack lock to EX.
 * The CR/PR conversion sequence below must mirror __sendmsg() exactly;
 * reordering it can deadlock the message exchange (cf. the commit
 * "md-cluster: fix deadlock issue on message lock").
 */
static void recv_daemon(struct md_thread *thread)
{
	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
	struct cluster_msg msg;

	/*get CR on Message*/
	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
		pr_err("md/raid1:failed to get CR on MESSAGE\n");
		return;
	}

	/* read lvb and wake up thread to process this message_lockres */
	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
	process_recvd_msg(thread->mddev, &msg);

	/*release CR on ack_lockres*/
	dlm_unlock_sync(ack_lockres);
	/*up-convert to PR on message_lockres*/
	dlm_lock_sync(message_lockres, DLM_LOCK_PR);
	/*get CR on ack_lockres again*/
	dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
	/*release CR on message_lockres*/
	dlm_unlock_sync(message_lockres);
}
498
601b515c
GR
/* lock_comm()
 * Takes the lock on the TOKEN lock resource so no other
 * node can communicate while the operation is underway.
 */
static int lock_comm(struct md_cluster_info *cinfo)
{
	int error;

	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (error)
		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
				__func__, __LINE__, error);
	return error;
}

/* Drop the TOKEN taken by lock_comm(). */
static void unlock_comm(struct md_cluster_info *cinfo)
{
	dlm_unlock_sync(cinfo->token_lockres);
}
518
/* __sendmsg()
 * This function performs the actual sending of the message. This function is
 * usually called after performing the encompassing operation
 * The function:
 * 1. Grabs the message lockresource in EX mode
 * 2. Copies the message to the message LVB
 * 3. Downconverts message lockresource to CW
 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
 *    and the other nodes read the message. The thread will wait here until all other
 *    nodes have released ack lock resource.
 * 5. Downconvert ack lockresource to CR
 *
 * Caller must hold the TOKEN (lock_comm()); the conversion order mirrors
 * recv_daemon() and must not be changed.
 */
static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int error;
	int slot = cinfo->slot_number - 1;

	cmsg->slot = cpu_to_le32(slot);
	/*get EX on Message*/
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
		goto failed_message;
	}

	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
			sizeof(struct cluster_msg));
	/*down-convert EX to CW on Message*/
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
				error);
		goto failed_ack;
	}

	/*up-convert CR to EX on Ack*/
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
				error);
		goto failed_ack;
	}

	/*down-convert EX to CR on Ack*/
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
				error);
		goto failed_ack;
	}

failed_ack:
	dlm_unlock_sync(cinfo->message_lockres);
failed_message:
	return error;
}
575
/* Serialised send: take the TOKEN, send the message, drop the TOKEN. */
static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int ret;

	/* Fix: don't attempt the send if the TOKEN was not acquired */
	ret = lock_comm(cinfo);
	if (ret)
		return ret;
	ret = __sendmsg(cinfo, cmsg);
	unlock_comm(cinfo);
	return ret;
}
585
96ae923a
GR
586static int gather_all_resync_info(struct mddev *mddev, int total_slots)
587{
588 struct md_cluster_info *cinfo = mddev->cluster_info;
589 int i, ret = 0;
590 struct dlm_lock_resource *bm_lockres;
591 struct suspend_info *s;
592 char str[64];
593
594
595 for (i = 0; i < total_slots; i++) {
596 memset(str, '\0', 64);
597 snprintf(str, 64, "bitmap%04d", i);
598 bm_lockres = lockres_init(mddev, str, NULL, 1);
599 if (!bm_lockres)
600 return -ENOMEM;
601 if (i == (cinfo->slot_number - 1))
602 continue;
603
604 bm_lockres->flags |= DLM_LKF_NOQUEUE;
605 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
606 if (ret == -EAGAIN) {
607 memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE);
608 s = read_resync_info(mddev, bm_lockres);
609 if (s) {
610 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
611 __func__, __LINE__,
612 (unsigned long long) s->lo,
613 (unsigned long long) s->hi, i);
614 spin_lock_irq(&cinfo->suspend_lock);
615 s->slot = i;
616 list_add(&s->list, &cinfo->suspend_list);
617 spin_unlock_irq(&cinfo->suspend_lock);
618 }
619 ret = 0;
620 lockres_free(bm_lockres);
621 continue;
622 }
623 if (ret)
624 goto out;
625 /* TODO: Read the disk bitmap sb and check if it needs recovery */
626 dlm_unlock_sync(bm_lockres);
627 lockres_free(bm_lockres);
628 }
629out:
630 return ret;
631}
632
edb39c9d
GR
633static int join(struct mddev *mddev, int nodes)
634{
c4ce867f 635 struct md_cluster_info *cinfo;
cf921cc1 636 int ret, ops_rv;
c4ce867f
GR
637 char str[64];
638
639 if (!try_module_get(THIS_MODULE))
640 return -ENOENT;
641
642 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
643 if (!cinfo)
644 return -ENOMEM;
645
cf921cc1
GR
646 init_completion(&cinfo->completion);
647
648 mutex_init(&cinfo->sb_mutex);
649 mddev->cluster_info = cinfo;
650
c4ce867f 651 memset(str, 0, 64);
b89f704a 652 sprintf(str, "%pU", mddev->uuid);
cf921cc1
GR
653 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
654 DLM_LSFL_FS, LVB_SIZE,
655 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
c4ce867f
GR
656 if (ret)
657 goto err;
cf921cc1 658 wait_for_completion(&cinfo->completion);
8c58f02e
GJ
659 if (nodes < cinfo->slot_number) {
660 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
661 cinfo->slot_number, nodes);
b97e9257
GR
662 ret = -ERANGE;
663 goto err;
664 }
c4ce867f
GR
665 cinfo->sb_lock = lockres_init(mddev, "cmd-super",
666 NULL, 0);
667 if (!cinfo->sb_lock) {
668 ret = -ENOMEM;
669 goto err;
670 }
4664680c
GR
671 /* Initiate the communication resources */
672 ret = -ENOMEM;
673 cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
674 if (!cinfo->recv_thread) {
675 pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
676 goto err;
677 }
678 cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
679 if (!cinfo->message_lockres)
680 goto err;
681 cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
682 if (!cinfo->token_lockres)
683 goto err;
684 cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
685 if (!cinfo->ack_lockres)
686 goto err;
1aee41f6
GR
687 cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
688 if (!cinfo->no_new_dev_lockres)
689 goto err;
690
4664680c
GR
691 /* get sync CR lock on ACK. */
692 if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
693 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
694 ret);
1aee41f6
GR
695 /* get sync CR lock on no-new-dev. */
696 if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
697 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
698
54519c5f
GR
699
700 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
701 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
702 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
703 if (!cinfo->bitmap_lockres)
704 goto err;
705 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
706 pr_err("Failed to get bitmap lock\n");
707 ret = -EINVAL;
708 goto err;
709 }
710
96ae923a
GR
711 INIT_LIST_HEAD(&cinfo->suspend_list);
712 spin_lock_init(&cinfo->suspend_lock);
713
714 ret = gather_all_resync_info(mddev, nodes);
715 if (ret)
716 goto err;
717
edb39c9d 718 return 0;
c4ce867f 719err:
4664680c
GR
720 lockres_free(cinfo->message_lockres);
721 lockres_free(cinfo->token_lockres);
722 lockres_free(cinfo->ack_lockres);
1aee41f6 723 lockres_free(cinfo->no_new_dev_lockres);
96ae923a
GR
724 lockres_free(cinfo->bitmap_lockres);
725 lockres_free(cinfo->sb_lock);
c4ce867f
GR
726 if (cinfo->lockspace)
727 dlm_release_lockspace(cinfo->lockspace, 2);
cf921cc1 728 mddev->cluster_info = NULL;
c4ce867f
GR
729 kfree(cinfo);
730 module_put(THIS_MODULE);
731 return ret;
edb39c9d
GR
732}
733
/*
 * md_cluster_operations hook: undo join() — stop the helper threads,
 * drop every lock resource and release the lockspace.
 * NOTE(review): cinfo itself is not freed and module_put() is not
 * called here — confirm who owns that cleanup.
 */
static int leave(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!cinfo)
		return 0;
	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->sb_lock);
	lockres_free(cinfo->bitmap_lockres);
	dlm_release_lockspace(cinfo->lockspace, 2);
	return 0;
}
751
cf921cc1
GR
/* slot_number(): Returns the MD slot number to use
 * DLM starts the slot numbers from 1, wheras cluster-md
 * wants the number to be from zero, so we deduct one
 */
static int slot_number(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return cinfo->slot_number - 1;
}

/*
 * Publish our current resync window in our bitmap lock's LVB so nodes
 * joining later can pick it up (see gather_all_resync_info()).
 */
static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi);
	/* Re-acquire the lock to refresh LVB */
	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
}
771
293467aa
GR
/* Take the TOKEN before a metadata update; paired with *_finish/_cancel. */
static int metadata_update_start(struct mddev *mddev)
{
	return lock_comm(mddev->cluster_info);
}

/* Broadcast METADATA_UPDATED to all peers, then release the TOKEN. */
static int metadata_update_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	int ret;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	ret = __sendmsg(cinfo, &cmsg);
	unlock_comm(cinfo);
	return ret;
}

/* Abort a started metadata update: drop the TOKEN without sending. */
static int metadata_update_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return dlm_unlock_sync(cinfo->token_lockres);
}
796
965400eb
GR
797static int resync_send(struct mddev *mddev, enum msg_type type,
798 sector_t lo, sector_t hi)
799{
800 struct md_cluster_info *cinfo = mddev->cluster_info;
801 struct cluster_msg cmsg;
802 int slot = cinfo->slot_number - 1;
803
804 pr_info("%s:%d lo: %llu hi: %llu\n", __func__, __LINE__,
805 (unsigned long long)lo,
806 (unsigned long long)hi);
807 resync_info_update(mddev, lo, hi);
808 cmsg.type = cpu_to_le32(type);
809 cmsg.slot = cpu_to_le32(slot);
810 cmsg.low = cpu_to_le64(lo);
811 cmsg.high = cpu_to_le64(hi);
812 return sendmsg(cinfo, &cmsg);
813}
814
/* md_cluster_operations hook: announce the start of a resync window. */
static int resync_start(struct mddev *mddev, sector_t lo, sector_t hi)
{
	pr_info("%s:%d\n", __func__, __LINE__);
	return resync_send(mddev, RESYNCING, lo, hi);
}
820
821static void resync_finish(struct mddev *mddev)
822{
dc737d7c
GJ
823 struct md_cluster_info *cinfo = mddev->cluster_info;
824 struct cluster_msg cmsg;
825 int slot = cinfo->slot_number - 1;
826
965400eb
GR
827 pr_info("%s:%d\n", __func__, __LINE__);
828 resync_send(mddev, RESYNCING, 0, 0);
dc737d7c
GJ
829 if (test_bit(MD_RECOVERY_INTR, &mddev->recovery)) {
830 cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
831 cmsg.slot = cpu_to_le32(slot);
832 sendmsg(cinfo, &cmsg);
833 }
965400eb
GR
834}
835
90382ed9
GR
836static int area_resyncing(struct mddev *mddev, int direction,
837 sector_t lo, sector_t hi)
589a1c49
GR
838{
839 struct md_cluster_info *cinfo = mddev->cluster_info;
840 int ret = 0;
841 struct suspend_info *s;
842
90382ed9
GR
843 if ((direction == READ) &&
844 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
845 return 1;
846
589a1c49
GR
847 spin_lock_irq(&cinfo->suspend_lock);
848 if (list_empty(&cinfo->suspend_list))
849 goto out;
850 list_for_each_entry(s, &cinfo->suspend_list, list)
851 if (hi > s->lo && lo < s->hi) {
852 ret = 1;
853 break;
854 }
855out:
856 spin_unlock_irq(&cinfo->suspend_lock);
857 return ret;
858}
859
1aee41f6
GR
860static int add_new_disk_start(struct mddev *mddev, struct md_rdev *rdev)
861{
862 struct md_cluster_info *cinfo = mddev->cluster_info;
863 struct cluster_msg cmsg;
864 int ret = 0;
865 struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
866 char *uuid = sb->device_uuid;
867
868 memset(&cmsg, 0, sizeof(cmsg));
869 cmsg.type = cpu_to_le32(NEWDISK);
870 memcpy(cmsg.uuid, uuid, 16);
871 cmsg.raid_slot = rdev->desc_nr;
872 lock_comm(cinfo);
873 ret = __sendmsg(cinfo, &cmsg);
874 if (ret)
875 return ret;
876 cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
877 ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
878 cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
879 /* Some node does not "see" the device */
880 if (ret == -EAGAIN)
881 ret = -ENOENT;
882 else
883 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
884 return ret;
885}
886
887static int add_new_disk_finish(struct mddev *mddev)
888{
889 struct cluster_msg cmsg;
890 struct md_cluster_info *cinfo = mddev->cluster_info;
891 int ret;
892 /* Write sb and inform others */
893 md_update_sb(mddev, 1);
894 cmsg.type = METADATA_UPDATED;
895 ret = __sendmsg(cinfo, &cmsg);
896 unlock_comm(cinfo);
897 return ret;
898}
899
/*
 * Userspace (mdadm) confirms or rejects a NEWDISK notification.  On ack
 * we release our CR on no-new-dev so the sender can obtain EX; either
 * way, wake up process_add_new_disk() which is waiting on the
 * completion.  Returns -EINVAL if no NEWDISK is pending.
 */
static int new_disk_ack(struct mddev *mddev, bool ack)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
		pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
		return -EINVAL;
	}

	if (ack)
		dlm_unlock_sync(cinfo->no_new_dev_lockres);
	complete(&cinfo->newdisk_completion);
	return 0;
}
914
88bcfef7
GR
915static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
916{
917 struct cluster_msg cmsg;
918 struct md_cluster_info *cinfo = mddev->cluster_info;
919 cmsg.type = REMOVE;
920 cmsg.raid_slot = rdev->desc_nr;
921 return __sendmsg(cinfo, &cmsg);
922}
923
97f6cd39
GR
924static int gather_bitmaps(struct md_rdev *rdev)
925{
926 int sn, err;
927 sector_t lo, hi;
928 struct cluster_msg cmsg;
929 struct mddev *mddev = rdev->mddev;
930 struct md_cluster_info *cinfo = mddev->cluster_info;
931
932 cmsg.type = RE_ADD;
933 cmsg.raid_slot = rdev->desc_nr;
934 err = sendmsg(cinfo, &cmsg);
935 if (err)
936 goto out;
937
938 for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
939 if (sn == (cinfo->slot_number - 1))
940 continue;
941 err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
942 if (err) {
943 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn);
944 goto out;
945 }
946 if ((hi > 0) && (lo < mddev->recovery_cp))
947 mddev->recovery_cp = lo;
948 }
949out:
950 return err;
951}
952
edb39c9d
GR
/* Operations table registered with the MD core at module init. */
static struct md_cluster_operations cluster_ops = {
	.join = join,
	.leave = leave,
	.slot_number = slot_number,
	.resync_info_update = resync_info_update,
	.resync_start = resync_start,
	.resync_finish = resync_finish,
	.metadata_update_start = metadata_update_start,
	.metadata_update_finish = metadata_update_finish,
	.metadata_update_cancel = metadata_update_cancel,
	.area_resyncing = area_resyncing,
	.add_new_disk_start = add_new_disk_start,
	.add_new_disk_finish = add_new_disk_finish,
	.new_disk_ack = new_disk_ack,
	.remove_disk = remove_disk,
	.gather_bitmaps = gather_bitmaps,
};
970
8e854e9c
GR
/* Module init: register the cluster operations with the MD core. */
static int __init cluster_init(void)
{
	pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
	pr_info("Registering Cluster MD functions\n");
	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
	return 0;
}

/* Module exit: unregister the cluster operations. */
static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}

module_init(cluster_init);
module_exit(cluster_exit);
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Clustering support for MD");
This page took 0.098498 seconds and 5 git commands to generate.