Gather on-going resync information of other nodes
[deliverable/linux.git] / drivers / md / md-cluster.c
1 /*
2 * Copyright (C) 2015, SUSE
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2, or (at your option)
7 * any later version.
8 *
9 */
10
11
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include "md.h"
16 #include "md-cluster.h"
17
18 #define LVB_SIZE 64
19
20 struct dlm_lock_resource {
21 dlm_lockspace_t *ls;
22 struct dlm_lksb lksb;
23 char *name; /* lock name. */
24 uint32_t flags; /* flags to pass to dlm_lock() */
25 struct completion completion; /* completion for synchronized locking */
26 void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
27 struct mddev *mddev; /* pointing back to mddev. */
28 };
29
30 struct suspend_info {
31 int slot;
32 sector_t lo;
33 sector_t hi;
34 struct list_head list;
35 };
36
37 struct resync_info {
38 __le64 lo;
39 __le64 hi;
40 };
41
42 struct md_cluster_info {
43 /* dlm lock space and resources for clustered raid. */
44 dlm_lockspace_t *lockspace;
45 int slot_number;
46 struct completion completion;
47 struct dlm_lock_resource *sb_lock;
48 struct mutex sb_mutex;
49 struct dlm_lock_resource *bitmap_lockres;
50 struct list_head suspend_list;
51 spinlock_t suspend_lock;
52 };
53
54 static void sync_ast(void *arg)
55 {
56 struct dlm_lock_resource *res;
57
58 res = (struct dlm_lock_resource *) arg;
59 complete(&res->completion);
60 }
61
62 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
63 {
64 int ret = 0;
65
66 init_completion(&res->completion);
67 ret = dlm_lock(res->ls, mode, &res->lksb,
68 res->flags, res->name, strlen(res->name),
69 0, sync_ast, res, res->bast);
70 if (ret)
71 return ret;
72 wait_for_completion(&res->completion);
73 return res->lksb.sb_status;
74 }
75
76 static int dlm_unlock_sync(struct dlm_lock_resource *res)
77 {
78 return dlm_lock_sync(res, DLM_LOCK_NL);
79 }
80
81 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
82 char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
83 {
84 struct dlm_lock_resource *res = NULL;
85 int ret, namelen;
86 struct md_cluster_info *cinfo = mddev->cluster_info;
87
88 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
89 if (!res)
90 return NULL;
91 res->ls = cinfo->lockspace;
92 res->mddev = mddev;
93 namelen = strlen(name);
94 res->name = kzalloc(namelen + 1, GFP_KERNEL);
95 if (!res->name) {
96 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
97 goto out_err;
98 }
99 strlcpy(res->name, name, namelen + 1);
100 if (with_lvb) {
101 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
102 if (!res->lksb.sb_lvbptr) {
103 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
104 goto out_err;
105 }
106 res->flags = DLM_LKF_VALBLK;
107 }
108
109 if (bastfn)
110 res->bast = bastfn;
111
112 res->flags |= DLM_LKF_EXPEDITE;
113
114 ret = dlm_lock_sync(res, DLM_LOCK_NL);
115 if (ret) {
116 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
117 goto out_err;
118 }
119 res->flags &= ~DLM_LKF_EXPEDITE;
120 res->flags |= DLM_LKF_CONVERT;
121
122 return res;
123 out_err:
124 kfree(res->lksb.sb_lvbptr);
125 kfree(res->name);
126 kfree(res);
127 return NULL;
128 }
129
130 static void lockres_free(struct dlm_lock_resource *res)
131 {
132 if (!res)
133 return;
134
135 init_completion(&res->completion);
136 dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
137 wait_for_completion(&res->completion);
138
139 kfree(res->name);
140 kfree(res->lksb.sb_lvbptr);
141 kfree(res);
142 }
143
/*
 * Format the 16 bytes at @src as lowercase hex in 8-4-4-4-12 grouping
 * (a '-' is emitted before bytes 4, 6, 8 and 10).
 *
 * @dest must have room for 36 characters plus the terminating NUL.
 * Returns @dest.
 */
static char *pretty_uuid(char *dest, char *src)
{
	char *out = dest;
	int idx;

	for (idx = 0; idx < 16; idx++) {
		switch (idx) {
		case 4:
		case 6:
		case 8:
		case 10:
			*out++ = '-';
			break;
		default:
			break;
		}
		/* Cast keeps bytes >= 0x80 from being sign-extended. */
		out += sprintf(out, "%02x", (unsigned char)src[idx]);
	}
	return dest;
}
155
156 static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres,
157 sector_t lo, sector_t hi)
158 {
159 struct resync_info *ri;
160
161 ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
162 ri->lo = cpu_to_le64(lo);
163 ri->hi = cpu_to_le64(hi);
164 }
165
166 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
167 {
168 struct resync_info ri;
169 struct suspend_info *s = NULL;
170 sector_t hi = 0;
171
172 dlm_lock_sync(lockres, DLM_LOCK_CR);
173 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
174 hi = le64_to_cpu(ri.hi);
175 if (ri.hi > 0) {
176 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
177 if (!s)
178 goto out;
179 s->hi = hi;
180 s->lo = le64_to_cpu(ri.lo);
181 }
182 dlm_unlock_sync(lockres);
183 out:
184 return s;
185 }
186
/* DLM lockspace recover_prep callback: intentionally does nothing. */
static void recover_prep(void *arg)
{
}
190
191 static void recover_slot(void *arg, struct dlm_slot *slot)
192 {
193 struct mddev *mddev = arg;
194 struct md_cluster_info *cinfo = mddev->cluster_info;
195
196 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
197 mddev->bitmap_info.cluster_name,
198 slot->nodeid, slot->slot,
199 cinfo->slot_number);
200 }
201
202 static void recover_done(void *arg, struct dlm_slot *slots,
203 int num_slots, int our_slot,
204 uint32_t generation)
205 {
206 struct mddev *mddev = arg;
207 struct md_cluster_info *cinfo = mddev->cluster_info;
208
209 cinfo->slot_number = our_slot;
210 complete(&cinfo->completion);
211 }
212
213 static const struct dlm_lockspace_ops md_ls_ops = {
214 .recover_prep = recover_prep,
215 .recover_slot = recover_slot,
216 .recover_done = recover_done,
217 };
218
219 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
220 {
221 struct md_cluster_info *cinfo = mddev->cluster_info;
222 int i, ret = 0;
223 struct dlm_lock_resource *bm_lockres;
224 struct suspend_info *s;
225 char str[64];
226
227
228 for (i = 0; i < total_slots; i++) {
229 memset(str, '\0', 64);
230 snprintf(str, 64, "bitmap%04d", i);
231 bm_lockres = lockres_init(mddev, str, NULL, 1);
232 if (!bm_lockres)
233 return -ENOMEM;
234 if (i == (cinfo->slot_number - 1))
235 continue;
236
237 bm_lockres->flags |= DLM_LKF_NOQUEUE;
238 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
239 if (ret == -EAGAIN) {
240 memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE);
241 s = read_resync_info(mddev, bm_lockres);
242 if (s) {
243 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
244 __func__, __LINE__,
245 (unsigned long long) s->lo,
246 (unsigned long long) s->hi, i);
247 spin_lock_irq(&cinfo->suspend_lock);
248 s->slot = i;
249 list_add(&s->list, &cinfo->suspend_list);
250 spin_unlock_irq(&cinfo->suspend_lock);
251 }
252 ret = 0;
253 lockres_free(bm_lockres);
254 continue;
255 }
256 if (ret)
257 goto out;
258 /* TODO: Read the disk bitmap sb and check if it needs recovery */
259 dlm_unlock_sync(bm_lockres);
260 lockres_free(bm_lockres);
261 }
262 out:
263 return ret;
264 }
265
266 static int join(struct mddev *mddev, int nodes)
267 {
268 struct md_cluster_info *cinfo;
269 int ret, ops_rv;
270 char str[64];
271
272 if (!try_module_get(THIS_MODULE))
273 return -ENOENT;
274
275 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
276 if (!cinfo)
277 return -ENOMEM;
278
279 init_completion(&cinfo->completion);
280
281 mutex_init(&cinfo->sb_mutex);
282 mddev->cluster_info = cinfo;
283
284 memset(str, 0, 64);
285 pretty_uuid(str, mddev->uuid);
286 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
287 DLM_LSFL_FS, LVB_SIZE,
288 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
289 if (ret)
290 goto err;
291 wait_for_completion(&cinfo->completion);
292 if (nodes <= cinfo->slot_number) {
293 pr_err("md-cluster: Slot allotted(%d) greater than available slots(%d)", cinfo->slot_number - 1,
294 nodes);
295 ret = -ERANGE;
296 goto err;
297 }
298 cinfo->sb_lock = lockres_init(mddev, "cmd-super",
299 NULL, 0);
300 if (!cinfo->sb_lock) {
301 ret = -ENOMEM;
302 goto err;
303 }
304
305 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
306 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
307 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
308 if (!cinfo->bitmap_lockres)
309 goto err;
310 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
311 pr_err("Failed to get bitmap lock\n");
312 ret = -EINVAL;
313 goto err;
314 }
315
316 INIT_LIST_HEAD(&cinfo->suspend_list);
317 spin_lock_init(&cinfo->suspend_lock);
318
319 ret = gather_all_resync_info(mddev, nodes);
320 if (ret)
321 goto err;
322
323 return 0;
324 err:
325 lockres_free(cinfo->bitmap_lockres);
326 lockres_free(cinfo->sb_lock);
327 if (cinfo->lockspace)
328 dlm_release_lockspace(cinfo->lockspace, 2);
329 mddev->cluster_info = NULL;
330 kfree(cinfo);
331 module_put(THIS_MODULE);
332 return ret;
333 }
334
335 static int leave(struct mddev *mddev)
336 {
337 struct md_cluster_info *cinfo = mddev->cluster_info;
338
339 if (!cinfo)
340 return 0;
341 lockres_free(cinfo->sb_lock);
342 lockres_free(cinfo->bitmap_lockres);
343 dlm_release_lockspace(cinfo->lockspace, 2);
344 return 0;
345 }
346
347 /* slot_number(): Returns the MD slot number to use
348 * DLM starts the slot numbers from 1, wheras cluster-md
349 * wants the number to be from zero, so we deduct one
350 */
351 static int slot_number(struct mddev *mddev)
352 {
353 struct md_cluster_info *cinfo = mddev->cluster_info;
354
355 return cinfo->slot_number - 1;
356 }
357
358 static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
359 {
360 struct md_cluster_info *cinfo = mddev->cluster_info;
361
362 add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi);
363 /* Re-acquire the lock to refresh LVB */
364 dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
365 }
366
367 static struct md_cluster_operations cluster_ops = {
368 .join = join,
369 .leave = leave,
370 .slot_number = slot_number,
371 .resync_info_update = resync_info_update,
372 };
373
374 static int __init cluster_init(void)
375 {
376 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
377 pr_info("Registering Cluster MD functions\n");
378 register_md_cluster_operations(&cluster_ops, THIS_MODULE);
379 return 0;
380 }
381
/* Module exit: withdraw the operations registered by cluster_init(). */
static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}
386
387 module_init(cluster_init);
388 module_exit(cluster_exit);
389 MODULE_LICENSE("GPL");
390 MODULE_DESCRIPTION("Clustering support for MD");
This page took 0.040504 seconds and 6 git commands to generate.