2 * Copyright (C) 2015, SUSE
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2, or (at your option)
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
17 #include "md-cluster.h"
/*
 * Bookkeeping for one DLM lock resource used by md-cluster.
 *
 * Besides the members visible below, other code in this file also accesses
 * ->ls and ->lksb on this structure.
 *
 * NOTE(review): the embedded source numbering jumps (21 -> 24, 28 -> 35) and
 * no closing brace is visible, so some member lines and the end of the
 * definition are missing from this view.  The trailing `list` member may
 * belong to a different structure - confirm against the full file.
 */
21 struct dlm_lock_resource
{
24 char *name
; /* lock name. */
25 uint32_t flags
; /* flags to pass to dlm_lock() */
26 struct completion completion
; /* completion for synchronized locking */
27 void (*bast
)(void *arg
, int mode
); /* blocking AST function pointer*/
28 struct mddev
*mddev
; /* pointing back to mddev. */
35 struct list_head list
;
/*
 * Per-array cluster state; join() allocates one and stores it in
 * mddev->cluster_info.
 *
 *  - lockspace:       filled in by dlm_new_lockspace() in join().
 *  - completion:      completed by recover_done(); join() waits on it.
 *  - sb_lock:         the "cmd-super" lock resource created in join().
 *  - bitmap_lockres:  this node's "bitmap%04d" lock resource.
 *  - suspend_list:    list of struct suspend_info, guarded by suspend_lock.
 *  - recovery_thread: created on demand by recover_slot().
 *  - recovery_map:    one bit per slot awaiting recovery; set in
 *                     recover_slot(), cleared in recover_bitmaps().
 *
 * NOTE(review): cinfo->slot_number is used throughout the file but its
 * declaration (embedded line 46) and the closing brace are not visible in
 * this view.
 */
43 struct md_cluster_info
{
44 /* dlm lock space and resources for clustered raid. */
45 dlm_lockspace_t
*lockspace
;
47 struct completion completion
;
48 struct dlm_lock_resource
*sb_lock
;
49 struct mutex sb_mutex
;
50 struct dlm_lock_resource
*bitmap_lockres
;
51 struct list_head suspend_list
;
52 spinlock_t suspend_lock
;
53 struct md_thread
*recovery_thread
;
54 unsigned long recovery_map
;
57 static void sync_ast(void *arg
)
59 struct dlm_lock_resource
*res
;
61 res
= (struct dlm_lock_resource
*) arg
;
62 complete(&res
->completion
);
/*
 * dlm_lock_sync() - request @mode on @res and wait for the request to finish.
 * @res:  lock resource; its ls, lksb, flags, name and bast members are passed
 *        to dlm_lock().
 * @mode: DLM lock mode to request.
 *
 * sync_ast() is supplied as the completion callback with @res as its
 * argument; it completes res->completion, which this function waits on.
 * The value returned on the visible path is res->lksb.sb_status.
 *
 * NOTE(review): the embedded numbering skips 66-68 and 73-74, so the opening
 * of the body, the declaration of `ret` and whatever inspects `ret` after
 * dlm_lock() are not visible here - confirm against the full file.
 */
65 static int dlm_lock_sync(struct dlm_lock_resource
*res
, int mode
)
/* Re-arm the completion before every request; sync_ast() fires it. */
69 init_completion(&res
->completion
);
70 ret
= dlm_lock(res
->ls
, mode
, &res
->lksb
,
71 res
->flags
, res
->name
, strlen(res
->name
),
72 0, sync_ast
, res
, res
->bast
);
75 wait_for_completion(&res
->completion
);
76 return res
->lksb
.sb_status
;
79 static int dlm_unlock_sync(struct dlm_lock_resource
*res
)
81 return dlm_lock_sync(res
, DLM_LOCK_NL
);
/*
 * lockres_init() - allocate and set up a lock resource called @name.
 * @mddev:    array whose cluster_info supplies the DLM lockspace.
 * @name:     lock name; copied into a freshly allocated buffer.
 * @bastfn:   blocking AST callback (where it is stored is not visible here).
 * @with_lvb: presumably selects whether the LVB_SIZE lock value block is
 *            allocated and DLM_LKF_VALBLK set - the test itself is not
 *            visible, TODO confirm.
 *
 * Visible steps: zero-allocate the resource, record the lockspace, copy the
 * name, allocate the LVB, add DLM_LKF_EXPEDITE and take the lock in
 * DLM_LOCK_NL through dlm_lock_sync(), then swap DLM_LKF_EXPEDITE for
 * DLM_LKF_CONVERT in the stored flags.
 *
 * NOTE(review): many interior lines (allocation-failure checks, error
 * labels, the return statements) are missing from this view; only the
 * error messages and one kfree() of the LVB remain visible.
 */
84 static struct dlm_lock_resource
*lockres_init(struct mddev
*mddev
,
85 char *name
, void (*bastfn
)(void *arg
, int mode
), int with_lvb
)
87 struct dlm_lock_resource
*res
= NULL
;
89 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
91 res
= kzalloc(sizeof(struct dlm_lock_resource
), GFP_KERNEL
);
94 res
->ls
= cinfo
->lockspace
;
96 namelen
= strlen(name
);
/* One extra byte for the terminating NUL. */
97 res
->name
= kzalloc(namelen
+ 1, GFP_KERNEL
);
99 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name
);
102 strlcpy(res
->name
, name
, namelen
+ 1);
104 res
->lksb
.sb_lvbptr
= kzalloc(LVB_SIZE
, GFP_KERNEL
);
105 if (!res
->lksb
.sb_lvbptr
) {
106 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name
);
109 res
->flags
= DLM_LKF_VALBLK
;
115 res
->flags
|= DLM_LKF_EXPEDITE
;
117 ret
= dlm_lock_sync(res
, DLM_LOCK_NL
);
119 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name
);
/*
 * res->flags is what dlm_lock_sync() passes to dlm_lock(), so every later
 * request on this resource carries DLM_LKF_CONVERT instead of
 * DLM_LKF_EXPEDITE.
 */
122 res
->flags
&= ~DLM_LKF_EXPEDITE
;
123 res
->flags
|= DLM_LKF_CONVERT
;
127 kfree(res
->lksb
.sb_lvbptr
);
/*
 * lockres_free() - unlock and release a lock resource.
 * @res: resource to tear down.
 *
 * Calls dlm_unlock() with @res as the callback argument and then blocks on
 * res->completion, so it relies on the unlock being reported through the
 * callback that completes it (sync_ast() in this file).  The LVB buffer is
 * freed afterwards.
 *
 * NOTE(review): the return value of dlm_unlock() is not checked on the
 * visible lines; if the call can fail without the callback ever running,
 * the wait below would not finish - confirm.
 * NOTE(review): embedded lines 134-137, 141-142 and 144-146 are not visible.
 * join() calls this on members that may still be NULL, so a NULL check
 * presumably lives in the hidden lines - TODO confirm.
 */
133 static void lockres_free(struct dlm_lock_resource
*res
)
138 init_completion(&res
->completion
);
139 dlm_unlock(res
->ls
, res
->lksb
.sb_lkid
, 0, &res
->lksb
, res
);
140 wait_for_completion(&res
->completion
);
143 kfree(res
->lksb
.sb_lvbptr
);
/*
 * pretty_uuid() - format a 16-byte UUID as lowercase hexadecimal text.
 * @dest: output buffer.  It receives 32 hex digits, four '-' separators and
 *        a terminating NUL, so it must hold at least 37 bytes.
 * @src:  the 16 raw UUID bytes; only read.
 *
 * A separator is written before bytes 4, 6, 8 and 10, which yields the
 * 8-4-4-4-12 grouping.
 *
 * Returns @dest.
 */
static char *pretty_uuid(char *dest, const char *src)
{
	int i, len = 0;

	for (i = 0; i < 16; i++) {
		if (i == 4 || i == 6 || i == 8 || i == 10)
			dest[len++] = '-';
		/* Cast so bytes >= 0x80 are not sign-extended by "%02x". */
		len += sprintf(dest + len, "%02x", (unsigned char)src[i]);
	}
	return dest;
}
159 static void add_resync_info(struct mddev
*mddev
, struct dlm_lock_resource
*lockres
,
160 sector_t lo
, sector_t hi
)
162 struct resync_info
*ri
;
164 ri
= (struct resync_info
*)lockres
->lksb
.sb_lvbptr
;
165 ri
->lo
= cpu_to_le64(lo
);
166 ri
->hi
= cpu_to_le64(hi
);
/*
 * read_resync_info() - read the resync range stored in a bitmap lock's LVB.
 * @mddev:   not referenced on the visible lines.
 * @lockres: lock resource whose value block is read.
 *
 * Takes @lockres in DLM_LOCK_CR, copies a struct resync_info out of the lock
 * value block, converts the little-endian lo/hi fields to CPU order and
 * drops the lock again with dlm_unlock_sync().  A struct suspend_info is
 * zero-allocated to carry the result.
 *
 * NOTE(review): the return value of dlm_lock_sync() is ignored, so the LVB
 * is read even if the CR request failed.
 * NOTE(review): embedded lines 173-174, 178, 180-182, 184 and 186-188 are not
 * visible (declaration of `hi`, whatever guards the allocation, the
 * remaining assignments and the return) - confirm against the full file.
 */
169 static struct suspend_info
*read_resync_info(struct mddev
*mddev
, struct dlm_lock_resource
*lockres
)
171 struct resync_info ri
;
172 struct suspend_info
*s
= NULL
;
175 dlm_lock_sync(lockres
, DLM_LOCK_CR
);
176 memcpy(&ri
, lockres
->lksb
.sb_lvbptr
, sizeof(struct resync_info
));
177 hi
= le64_to_cpu(ri
.hi
);
179 s
= kzalloc(sizeof(struct suspend_info
), GFP_KERNEL
);
183 s
->lo
= le64_to_cpu(ri
.lo
);
185 dlm_unlock_sync(lockres
);
/*
 * recover_bitmaps() - recovery thread body (registered by recover_slot()).
 * @thread: md thread; thread->mddev identifies the array.
 *
 * Loops while cinfo->recovery_map has bits set.  Each pass picks the highest
 * set bit as the slot, walks suspend_list under suspend_lock looking for
 * entries of that slot, creates the slot's "bitmap%04d" lock resource, takes
 * it in DLM_LOCK_PW, calls bitmap_copy_from_slot(), drops the lock with
 * dlm_unlock_sync() and clears the slot's bit.
 *
 * NOTE(review): what is done with matching suspend_list entries, the error
 * branches and the local declarations (str, slot, ret, lo, hi) sit on lines
 * missing from this view.
 * NOTE(review): no lockres_free(bm_lockres) is visible after lockres_init();
 * confirm whether the resource is released on a hidden line.
 */
190 void recover_bitmaps(struct md_thread
*thread
)
192 struct mddev
*mddev
= thread
->mddev
;
193 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
194 struct dlm_lock_resource
*bm_lockres
;
197 struct suspend_info
*s
, *tmp
;
200 while (cinfo
->recovery_map
) {
/* fls64() is 1-based, so subtract one to get the bit index. */
201 slot
= fls64((u64
)cinfo
->recovery_map
) - 1;
203 /* Clear suspend_area associated with the bitmap */
204 spin_lock_irq(&cinfo
->suspend_lock
);
205 list_for_each_entry_safe(s
, tmp
, &cinfo
->suspend_list
, list
)
206 if (slot
== s
->slot
) {
210 spin_unlock_irq(&cinfo
->suspend_lock
);
212 snprintf(str
, 64, "bitmap%04d", slot
);
213 bm_lockres
= lockres_init(mddev
, str
, NULL
, 1);
215 pr_err("md-cluster: Cannot initialize bitmaps\n");
219 ret
= dlm_lock_sync(bm_lockres
, DLM_LOCK_PW
);
221 pr_err("md-cluster: Could not DLM lock %s: %d\n",
225 ret
= bitmap_copy_from_slot(mddev
, slot
, &lo
, &hi
);
227 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot
);
228 dlm_unlock_sync(bm_lockres
);
230 clear_bit(slot
, &cinfo
->recovery_map
);
/*
 * recover_prep() - installed as md_ls_ops.recover_prep.
 * @arg: callback argument; its use is not visible here.
 *
 * NOTE(review): the body (embedded lines 235-237) is missing from this view,
 * so nothing can be said about what it does.
 */
234 static void recover_prep(void *arg
)
/*
 * recover_slot() - installed as md_ls_ops.recover_slot.
 * @arg:  the struct mddev that join() passed to dlm_new_lockspace().
 * @slot: the node/slot being reported (the log message calls it "down").
 *
 * Logs the event, sets bit (slot->slot - 1) in cinfo->recovery_map, creates
 * the recovery thread running recover_bitmaps() if none exists yet, and
 * wakes that thread.
 *
 * NOTE(review): the remaining pr_info() and md_register_thread() arguments
 * and the handling after the pr_warn() are on lines missing from this view.
 */
238 static void recover_slot(void *arg
, struct dlm_slot
*slot
)
240 struct mddev
*mddev
= arg
;
241 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
243 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
244 mddev
->bitmap_info
.cluster_name
,
245 slot
->nodeid
, slot
->slot
,
/* recovery_map is indexed from zero; the DLM slot value has one subtracted. */
247 set_bit(slot
->slot
- 1, &cinfo
->recovery_map
);
248 if (!cinfo
->recovery_thread
) {
249 cinfo
->recovery_thread
= md_register_thread(recover_bitmaps
,
251 if (!cinfo
->recovery_thread
) {
252 pr_warn("md-cluster: Could not create recovery thread\n");
256 md_wakeup_thread(cinfo
->recovery_thread
);
/*
 * recover_done() - installed as md_ls_ops.recover_done.
 * @arg:       the struct mddev that join() passed to dlm_new_lockspace().
 * @slots:     not referenced on the visible lines.
 * @num_slots: not referenced on the visible lines.
 * @our_slot:  slot number assigned to this node; saved in
 *             cinfo->slot_number.
 *
 * Completes cinfo->completion, which join() waits on after creating the
 * lockspace.
 *
 * NOTE(review): the parameter list continues on an embedded line (261) that
 * is missing from this view.
 */
259 static void recover_done(void *arg
, struct dlm_slot
*slots
,
260 int num_slots
, int our_slot
,
263 struct mddev
*mddev
= arg
;
264 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
266 cinfo
->slot_number
= our_slot
;
267 complete(&cinfo
->completion
);
270 static const struct dlm_lockspace_ops md_ls_ops
= {
271 .recover_prep
= recover_prep
,
272 .recover_slot
= recover_slot
,
273 .recover_done
= recover_done
,
/*
 * gather_all_resync_info() - probe every slot's bitmap lock at join time.
 * @mddev:       array being joined.
 * @total_slots: number of slots to probe (join() passes its `nodes` value).
 *
 * For each slot index a "bitmap%04d" lock resource is created and requested
 * in DLM_LOCK_PW with DLM_LKF_NOQUEUE.  A result of -EAGAIN leads to
 * read_resync_info() being called for that slot; the returned range is
 * logged and added to cinfo->suspend_list under suspend_lock.  The lock
 * resource is released with lockres_free() in either case.
 *
 * NOTE(review): what happens for the node's own slot
 * (i == cinfo->slot_number - 1), the NULL checks on bm_lockres and s, the
 * handling of other dlm_lock_sync() errors and the return value all sit on
 * lines missing from this view.
 */
276 static int gather_all_resync_info(struct mddev
*mddev
, int total_slots
)
278 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
280 struct dlm_lock_resource
*bm_lockres
;
281 struct suspend_info
*s
;
285 for (i
= 0; i
< total_slots
; i
++) {
286 memset(str
, '\0', 64);
287 snprintf(str
, 64, "bitmap%04d", i
);
288 bm_lockres
= lockres_init(mddev
, str
, NULL
, 1);
/* slot_number is the 1-based DLM slot; slot indices here start at 0. */
291 if (i
== (cinfo
->slot_number
- 1))
294 bm_lockres
->flags
|= DLM_LKF_NOQUEUE
;
295 ret
= dlm_lock_sync(bm_lockres
, DLM_LOCK_PW
);
296 if (ret
== -EAGAIN
) {
/* Zero the local LVB buffer before read_resync_info() fills it. */
297 memset(bm_lockres
->lksb
.sb_lvbptr
, '\0', LVB_SIZE
);
298 s
= read_resync_info(mddev
, bm_lockres
);
300 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
302 (unsigned long long) s
->lo
,
303 (unsigned long long) s
->hi
, i
);
304 spin_lock_irq(&cinfo
->suspend_lock
);
306 list_add(&s
->list
, &cinfo
->suspend_list
);
307 spin_unlock_irq(&cinfo
->suspend_lock
);
310 lockres_free(bm_lockres
);
315 /* TODO: Read the disk bitmap sb and check if it needs recovery */
316 dlm_unlock_sync(bm_lockres
);
317 lockres_free(bm_lockres
);
/*
 * join() - attach @mddev to its cluster.
 * @mddev: array to join.
 * @nodes: number of slots; compared with the assigned slot number and passed
 *         to gather_all_resync_info().
 *
 * Visible sequence: take a module reference, allocate the md_cluster_info
 * and publish it in mddev->cluster_info, create a DLM lockspace named after
 * the formatted array UUID, wait for recover_done() to report our slot,
 * create the "cmd-super" and own-slot "bitmap%04d" lock resources, take the
 * bitmap lock in DLM_LOCK_PW, initialise the suspend list and collect resync
 * information from the other slots.  The cleanup lines at the end free both
 * lock resources, release the lockspace, clear mddev->cluster_info and drop
 * the module reference.
 *
 * NOTE(review): error checks, goto labels, local declarations and the
 * return statements are on lines missing from this view.
 */
323 static int join(struct mddev
*mddev
, int nodes
)
325 struct md_cluster_info
*cinfo
;
329 if (!try_module_get(THIS_MODULE
))
332 cinfo
= kzalloc(sizeof(struct md_cluster_info
), GFP_KERNEL
);
336 init_completion(&cinfo
->completion
);
338 mutex_init(&cinfo
->sb_mutex
);
339 mddev
->cluster_info
= cinfo
;
342 pretty_uuid(str
, mddev
->uuid
);
343 ret
= dlm_new_lockspace(str
, mddev
->bitmap_info
.cluster_name
,
344 DLM_LSFL_FS
, LVB_SIZE
,
345 &md_ls_ops
, mddev
, &ops_rv
, &cinfo
->lockspace
);
/* recover_done() stores our slot number and completes this. */
348 wait_for_completion(&cinfo
->completion
);
/*
 * NOTE(review): slot_number is 1-based (slot_number() and the bitmap lock
 * name below both use slot_number - 1), so slot_number == nodes maps to the
 * last zero-based slot, nodes - 1.  This test rejects that case; confirm
 * whether `<` was intended.  The message also has no trailing newline.
 */
349 if (nodes
<= cinfo
->slot_number
) {
350 pr_err("md-cluster: Slot allotted(%d) greater than available slots(%d)", cinfo
->slot_number
- 1,
355 cinfo
->sb_lock
= lockres_init(mddev
, "cmd-super",
357 if (!cinfo
->sb_lock
) {
362 pr_info("md-cluster: Joined cluster %s slot %d\n", str
, cinfo
->slot_number
);
363 snprintf(str
, 64, "bitmap%04d", cinfo
->slot_number
- 1);
364 cinfo
->bitmap_lockres
= lockres_init(mddev
, str
, NULL
, 1);
365 if (!cinfo
->bitmap_lockres
)
367 if (dlm_lock_sync(cinfo
->bitmap_lockres
, DLM_LOCK_PW
)) {
368 pr_err("Failed to get bitmap lock\n");
373 INIT_LIST_HEAD(&cinfo
->suspend_list
);
374 spin_lock_init(&cinfo
->suspend_lock
);
376 ret
= gather_all_resync_info(mddev
, nodes
);
382 lockres_free(cinfo
->bitmap_lockres
);
383 lockres_free(cinfo
->sb_lock
);
384 if (cinfo
->lockspace
)
385 dlm_release_lockspace(cinfo
->lockspace
, 2);
386 mddev
->cluster_info
= NULL
;
388 module_put(THIS_MODULE
);
/*
 * leave() - detach @mddev from its cluster.
 * @mddev: array whose cluster_info is torn down.
 *
 * Stops the recovery thread, frees the superblock and bitmap lock resources
 * and releases the DLM lockspace.
 *
 * NOTE(review): embedded lines 395-397 and 402-403 are not visible.  No
 * kfree() of cinfo, reset of mddev->cluster_info or module_put() appears on
 * the visible lines - confirm whether they exist in the full file.
 */
392 static int leave(struct mddev
*mddev
)
394 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
398 md_unregister_thread(&cinfo
->recovery_thread
);
399 lockres_free(cinfo
->sb_lock
);
400 lockres_free(cinfo
->bitmap_lockres
);
401 dlm_release_lockspace(cinfo
->lockspace
, 2);
405 /* slot_number(): Returns the MD slot number to use
406 * DLM starts the slot numbers from 1, wheras cluster-md
407 * wants the number to be from zero, so we deduct one
409 static int slot_number(struct mddev
*mddev
)
411 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
413 return cinfo
->slot_number
- 1;
416 static void resync_info_update(struct mddev
*mddev
, sector_t lo
, sector_t hi
)
418 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
420 add_resync_info(mddev
, cinfo
->bitmap_lockres
, lo
, hi
);
421 /* Re-acquire the lock to refresh LVB */
422 dlm_lock_sync(cinfo
->bitmap_lockres
, DLM_LOCK_PW
);
/*
 * Operations table registered with the MD core by cluster_init().
 *
 * NOTE(review): embedded lines 426-427 are missing from this view; they
 * presumably wire up join() and leave(), which are defined in this file but
 * not referenced on any visible line - TODO confirm.  The closing brace is
 * not visible either.
 */
425 static struct md_cluster_operations cluster_ops
= {
428 .slot_number
= slot_number
,
429 .resync_info_update
= resync_info_update
,
/*
 * cluster_init() - module entry point (see module_init() at the end of the
 * file).
 *
 * Logs two messages and registers cluster_ops with the MD core.
 *
 * NOTE(review): the return statement is on a line missing from this view,
 * and no visible line uses the result of register_md_cluster_operations().
 */
432 static int __init
cluster_init(void)
434 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
435 pr_info("Registering Cluster MD functions\n");
436 register_md_cluster_operations(&cluster_ops
, THIS_MODULE
);
/* Module exit hook: withdraw the operations registered by cluster_init(). */
static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}
445 module_init(cluster_init
);
446 module_exit(cluster_exit
);
447 MODULE_LICENSE("GPL");
448 MODULE_DESCRIPTION("Clustering support for MD");