fs/ocfs2/dlm: Use GFP_ATOMIC under spin_lock
fs/ocfs2/dlm/dlmdomain.c
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmdomain.c
 *
 * defines domain join / leave apis
 *
 * Copyright (C) 2004 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 *
 */

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/init.h>
#include <linux/spinlock.h>
#include <linux/delay.h>
#include <linux/err.h>
#include <linux/debugfs.h>

#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"
#include "dlmdebug.h"

#include "dlmver.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
#include "cluster/masklog.h"

/*
 * ocfs2 node maps are arrays of long int, which makes it hard to send
 * them over the wire as-is because of endianness issues.  To work around
 * this, we convert the long ints to byte arrays.  The following three
 * routines are helpers to set/test/copy bits within those byte arrays.
 */
static inline void byte_set_bit(u8 nr, u8 map[])
{
	map[nr >> 3] |= (1UL << (nr & 7));
}

static inline int byte_test_bit(u8 nr, u8 map[])
{
	return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
}

static inline void byte_copymap(u8 dmap[], unsigned long smap[],
				unsigned int sz)
{
	unsigned int nn;

	if (!sz)
		return;

	memset(dmap, 0, ((sz + 7) >> 3));
	for (nn = 0 ; nn < sz; nn++)
		if (test_bit(nn, smap))
			byte_set_bit(nn, dmap);
}

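/*
 * Illustrative example: for node number 10, byte_set_bit(10, map) touches
 * map[10 >> 3] == map[1] and ORs in 1 << (10 & 7) == 0x04.  Because the
 * map is laid out one byte at a time, byte_copymap() produces the same
 * wire format on big- and little-endian hosts, which is the whole point
 * of these helpers.
 */
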
static void dlm_free_pagevec(void **vec, int pages)
{
	while (pages--)
		free_page((unsigned long)vec[pages]);
	kfree(vec);
}

static void **dlm_alloc_pagevec(int pages)
{
	void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
	int i;

	if (!vec)
		return NULL;

	for (i = 0; i < pages; i++)
		if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
			goto out_free;

	mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
	     pages, (unsigned long)DLM_HASH_PAGES,
	     (unsigned long)DLM_BUCKETS_PER_PAGE);
	return vec;
out_free:
	dlm_free_pagevec(vec, i);
	return NULL;
}

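/*
 * Note: the lockres and master hash tables are spread across DLM_HASH_PAGES
 * individually allocated pages, so no high-order allocation is ever needed;
 * dlm_lockres_hash()/dlm_master_hash() map a bucket number back to the
 * right page.
 */
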
/*
 *
 * spinlock lock ordering: if multiple locks are needed, obey this ordering:
 *   dlm_domain_lock
 *   struct dlm_ctxt->spinlock
 *   struct dlm_lock_resource->spinlock
 *   struct dlm_ctxt->master_lock
 *   struct dlm_ctxt->ast_lock
 *   dlm_master_list_entry->spinlock
 *   dlm_lock->spinlock
 *
 */

DEFINE_SPINLOCK(dlm_domain_lock);
LIST_HEAD(dlm_domains);
static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);

/*
 * The supported protocol version for DLM communication. Running domains
 * will have a negotiated version with the same major number and a minor
 * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should
 * be used to determine what a running domain is actually using.
 *
 * New in version 1.1:
 *	- Message DLM_QUERY_REGION added to support global heartbeat
 *	- Message DLM_QUERY_NODEINFO added to allow online node removes
 */
static const struct dlm_protocol_version dlm_protocol = {
	.pv_major = 1,
	.pv_minor = 1,
};

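/*
 * For example, a node speaking 1.1 can still join a domain that negotiated
 * 1.0: the majors match, so the joiner simply adopts the smaller minor
 * (see dlm_query_join_proto_check() and dlm_protocol_compare()).  A protocol
 * the running domain cannot speak is refused with JOIN_PROTOCOL_MISMATCH.
 */
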
#define DLM_DOMAIN_BACKOFF_MS 200

static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
				  void **ret_data);
static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
				     void **ret_data);
static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
				   void **ret_data);
static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
				    void *data, void **ret_data);
static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
				   void **ret_data);
static int dlm_protocol_compare(struct dlm_protocol_version *existing,
				struct dlm_protocol_version *request);

static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);

void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
{
	if (!hlist_unhashed(&lockres->hash_node)) {
		hlist_del_init(&lockres->hash_node);
		dlm_lockres_put(lockres);
	}
}

void __dlm_insert_lockres(struct dlm_ctxt *dlm,
			  struct dlm_lock_resource *res)
{
	struct hlist_head *bucket;
	struct qstr *q;

	assert_spin_locked(&dlm->spinlock);

	q = &res->lockname;
	bucket = dlm_lockres_hash(dlm, q->hash);

	/* get a reference for our hashtable */
	dlm_lockres_get(res);

	hlist_add_head(&res->hash_node, bucket);
}

struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
						     const char *name,
						     unsigned int len,
						     unsigned int hash)
{
	struct hlist_head *bucket;
	struct hlist_node *list;

	mlog_entry("%.*s\n", len, name);

	assert_spin_locked(&dlm->spinlock);

	bucket = dlm_lockres_hash(dlm, hash);

	hlist_for_each(list, bucket) {
		struct dlm_lock_resource *res = hlist_entry(list,
			struct dlm_lock_resource, hash_node);
		if (res->lockname.name[0] != name[0])
			continue;
		if (unlikely(res->lockname.len != len))
			continue;
		if (memcmp(res->lockname.name + 1, name + 1, len - 1))
			continue;
		dlm_lockres_get(res);
		return res;
	}
	return NULL;
}

/* intended to be called by functions which do not care about lock
 * resources which are being purged (most net _handler functions).
 * this will return NULL for any lock resource which is found but
 * currently in the process of dropping its mastery reference.
 * use __dlm_lookup_lockres_full when you need the lock resource
 * regardless (e.g. dlm_get_lock_resource) */
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
						const char *name,
						unsigned int len,
						unsigned int hash)
{
	struct dlm_lock_resource *res = NULL;

	mlog_entry("%.*s\n", len, name);

	assert_spin_locked(&dlm->spinlock);

	res = __dlm_lookup_lockres_full(dlm, name, len, hash);
	if (res) {
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_DROPPING_REF) {
			spin_unlock(&res->spinlock);
			dlm_lockres_put(res);
			return NULL;
		}
		spin_unlock(&res->spinlock);
	}

	return res;
}

struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
					      const char *name,
					      unsigned int len)
{
	struct dlm_lock_resource *res;
	unsigned int hash = dlm_lockid_hash(name, len);

	spin_lock(&dlm->spinlock);
	res = __dlm_lookup_lockres(dlm, name, len, hash);
	spin_unlock(&dlm->spinlock);
	return res;
}

static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
{
	struct dlm_ctxt *tmp = NULL;
	struct list_head *iter;

	assert_spin_locked(&dlm_domain_lock);

	/* tmp->name here is always NULL terminated,
	 * but domain may not be! */
	list_for_each(iter, &dlm_domains) {
		tmp = list_entry (iter, struct dlm_ctxt, list);
		if (strlen(tmp->name) == len &&
		    memcmp(tmp->name, domain, len)==0)
			break;
		tmp = NULL;
	}

	return tmp;
}

/* For null terminated domain strings ONLY */
static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
{
	assert_spin_locked(&dlm_domain_lock);

	return __dlm_lookup_domain_full(domain, strlen(domain));
}

/* returns true on one of two conditions:
 * 1) the domain does not exist
 * 2) the domain exists and its state is "joined" */
static int dlm_wait_on_domain_helper(const char *domain)
{
	int ret = 0;
	struct dlm_ctxt *tmp = NULL;

	spin_lock(&dlm_domain_lock);

	tmp = __dlm_lookup_domain(domain);
	if (!tmp)
		ret = 1;
	else if (tmp->dlm_state == DLM_CTXT_JOINED)
		ret = 1;

	spin_unlock(&dlm_domain_lock);
	return ret;
}

static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
{
	dlm_destroy_debugfs_subroot(dlm);

	if (dlm->lockres_hash)
		dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);

	if (dlm->master_hash)
		dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);

	if (dlm->name)
		kfree(dlm->name);

	kfree(dlm);
}

/* A little strange - this function will be called while holding
 * dlm_domain_lock and is expected to be holding it on the way out. We
 * will however drop and reacquire it multiple times */
static void dlm_ctxt_release(struct kref *kref)
{
	struct dlm_ctxt *dlm;

	dlm = container_of(kref, struct dlm_ctxt, dlm_refs);

	BUG_ON(dlm->num_joins);
	BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);

	/* we may still be in the list if we hit an error during join. */
	list_del_init(&dlm->list);

	spin_unlock(&dlm_domain_lock);

	mlog(0, "freeing memory from domain %s\n", dlm->name);

	wake_up(&dlm_domain_events);

	dlm_free_ctxt_mem(dlm);

	spin_lock(&dlm_domain_lock);
}

void dlm_put(struct dlm_ctxt *dlm)
{
	spin_lock(&dlm_domain_lock);
	kref_put(&dlm->dlm_refs, dlm_ctxt_release);
	spin_unlock(&dlm_domain_lock);
}

static void __dlm_get(struct dlm_ctxt *dlm)
{
	kref_get(&dlm->dlm_refs);
}

/* given a questionable reference to a dlm object, gets a reference if
 * it can find it in the list, otherwise returns NULL in which case
 * you shouldn't trust your pointer. */
struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
{
	struct list_head *iter;
	struct dlm_ctxt *target = NULL;

	spin_lock(&dlm_domain_lock);

	list_for_each(iter, &dlm_domains) {
		target = list_entry (iter, struct dlm_ctxt, list);

		if (target == dlm) {
			__dlm_get(target);
			break;
		}

		target = NULL;
	}

	spin_unlock(&dlm_domain_lock);

	return target;
}

int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
{
	int ret;

	spin_lock(&dlm_domain_lock);
	ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
		(dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
	spin_unlock(&dlm_domain_lock);

	return ret;
}

static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
{
	if (dlm->dlm_worker) {
		flush_workqueue(dlm->dlm_worker);
		destroy_workqueue(dlm->dlm_worker);
		dlm->dlm_worker = NULL;
	}
}

static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
{
	dlm_unregister_domain_handlers(dlm);
	dlm_debug_shutdown(dlm);
	dlm_complete_thread(dlm);
	dlm_complete_recovery_thread(dlm);
	dlm_destroy_dlm_worker(dlm);

	/* We've left the domain. Now we can take ourselves out of the
	 * list and allow the kref stuff to help us free the
	 * memory. */
	spin_lock(&dlm_domain_lock);
	list_del_init(&dlm->list);
	spin_unlock(&dlm_domain_lock);

	/* Wake up anyone waiting for us to remove this domain */
	wake_up(&dlm_domain_events);
}

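/*
 * dlm_migrate_all_locks() returns -EAGAIN while any lock resources remain
 * hashed; dlm_unregister_domain() keeps calling it, sleeping between passes
 * so dlm_thread can purge, until it finally returns 0.
 */
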
static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
{
	int i, num, n, ret = 0;
	struct dlm_lock_resource *res;
	struct hlist_node *iter;
	struct hlist_head *bucket;
	int dropped;

	mlog(0, "Migrating locks from domain %s\n", dlm->name);

	num = 0;
	spin_lock(&dlm->spinlock);
	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
redo_bucket:
		n = 0;
		bucket = dlm_lockres_hash(dlm, i);
		iter = bucket->first;
		while (iter) {
			n++;
			res = hlist_entry(iter, struct dlm_lock_resource,
					  hash_node);
			dlm_lockres_get(res);
			/* migrate, if necessary.  this will drop the dlm
			 * spinlock and retake it if it does migration. */
			dropped = dlm_empty_lockres(dlm, res);

			spin_lock(&res->spinlock);
			__dlm_lockres_calc_usage(dlm, res);
			iter = res->hash_node.next;
			spin_unlock(&res->spinlock);

			dlm_lockres_put(res);

			if (dropped)
				goto redo_bucket;
		}
		cond_resched_lock(&dlm->spinlock);
		num += n;
		mlog(0, "%s: touched %d lockreses in bucket %d "
		     "(tot=%d)\n", dlm->name, n, i, num);
	}
	spin_unlock(&dlm->spinlock);
	wake_up(&dlm->dlm_thread_wq);

	/* let the dlm thread take care of purging, keep scanning until
	 * nothing remains in the hash */
	if (num) {
		mlog(0, "%s: %d lock resources in hash last pass\n",
		     dlm->name, num);
		ret = -EAGAIN;
	}
	mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
	return ret;
}

static int dlm_no_joining_node(struct dlm_ctxt *dlm)
{
	int ret;

	spin_lock(&dlm->spinlock);
	ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
	spin_unlock(&dlm->spinlock);

	return ret;
}

static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
{
	/* Yikes, a double spinlock! I need domain_lock for the dlm
	 * state and the dlm spinlock for join state... Sorry! */
again:
	spin_lock(&dlm_domain_lock);
	spin_lock(&dlm->spinlock);

	if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
		mlog(0, "Node %d is joining, we wait on it.\n",
		     dlm->joining_node);
		spin_unlock(&dlm->spinlock);
		spin_unlock(&dlm_domain_lock);

		wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
		goto again;
	}

	dlm->dlm_state = DLM_CTXT_LEAVING;
	spin_unlock(&dlm->spinlock);
	spin_unlock(&dlm_domain_lock);
}

static void __dlm_print_nodes(struct dlm_ctxt *dlm)
{
	int node = -1;

	assert_spin_locked(&dlm->spinlock);

	printk(KERN_NOTICE "o2dlm: Nodes in domain %s: ", dlm->name);

	while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
				     node + 1)) < O2NM_MAX_NODES) {
		printk("%d ", node);
	}
	printk("\n");
}

static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
				   void **ret_data)
{
	struct dlm_ctxt *dlm = data;
	unsigned int node;
	struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;

	mlog_entry("%p %u %p", msg, len, data);

	if (!dlm_grab(dlm))
		return 0;

	node = exit_msg->node_idx;

	printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s\n", node, dlm->name);

	spin_lock(&dlm->spinlock);
	clear_bit(node, dlm->domain_map);
	__dlm_print_nodes(dlm);

	/* notify anything attached to the heartbeat events */
	dlm_hb_event_notify_attached(dlm, node, 0);

	spin_unlock(&dlm->spinlock);

	dlm_put(dlm);

	return 0;
}

static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm,
				    unsigned int node)
{
	int status;
	struct dlm_exit_domain leave_msg;

	mlog(0, "Asking node %u if we can leave the domain %s me = %u\n",
	     node, dlm->name, dlm->node_num);

	memset(&leave_msg, 0, sizeof(leave_msg));
	leave_msg.node_idx = dlm->node_num;

	status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
				    &leave_msg, sizeof(leave_msg), node,
				    NULL);
	if (status < 0)
		mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
		     "node %u\n", status, DLM_EXIT_DOMAIN_MSG, dlm->key, node);
	mlog(0, "status return %d from o2net_send_message\n", status);

	return status;
}


static void dlm_leave_domain(struct dlm_ctxt *dlm)
{
	int node, clear_node, status;

	/* At this point we've migrated away all our locks and won't
	 * accept mastership of new ones. The dlm is responsible for
	 * almost nothing now. We make sure not to confuse any joining
	 * nodes and then commence shutdown procedure. */

	spin_lock(&dlm->spinlock);
	/* Clear ourselves from the domain map */
	clear_bit(dlm->node_num, dlm->domain_map);
	while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
				     0)) < O2NM_MAX_NODES) {
		/* Drop the dlm spinlock. This is safe wrt the domain_map.
		 * -nodes cannot be added now as the
		 *   query_join_handlers knows to respond with OK_NO_MAP
		 * -we catch the right network errors if a node is
		 *   removed from the map while we're sending him the
		 *   exit message. */
		spin_unlock(&dlm->spinlock);

		clear_node = 1;

		status = dlm_send_one_domain_exit(dlm, node);
		if (status < 0 &&
		    status != -ENOPROTOOPT &&
		    status != -ENOTCONN) {
			mlog(ML_NOTICE, "Error %d sending domain exit message "
			     "to node %d\n", status, node);

			/* Not sure what to do here but lets sleep for
			 * a bit in case this was a transient
			 * error... */
			msleep(DLM_DOMAIN_BACKOFF_MS);
			clear_node = 0;
		}

		spin_lock(&dlm->spinlock);
		/* If we're not clearing the node bit then we intend
		 * to loop back around to try again. */
		if (clear_node)
			clear_bit(node, dlm->domain_map);
	}
	spin_unlock(&dlm->spinlock);
}

int dlm_joined(struct dlm_ctxt *dlm)
{
	int ret = 0;

	spin_lock(&dlm_domain_lock);

	if (dlm->dlm_state == DLM_CTXT_JOINED)
		ret = 1;

	spin_unlock(&dlm_domain_lock);

	return ret;
}

int dlm_shutting_down(struct dlm_ctxt *dlm)
{
	int ret = 0;

	spin_lock(&dlm_domain_lock);

	if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
		ret = 1;

	spin_unlock(&dlm_domain_lock);

	return ret;
}

void dlm_unregister_domain(struct dlm_ctxt *dlm)
{
	int leave = 0;
	struct dlm_lock_resource *res;

	spin_lock(&dlm_domain_lock);
	BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
	BUG_ON(!dlm->num_joins);

	dlm->num_joins--;
	if (!dlm->num_joins) {
		/* We mark it "in shutdown" now so new register
		 * requests wait until we've completely left the
		 * domain. Don't use DLM_CTXT_LEAVING yet as we still
		 * want new domain joins to communicate with us at
		 * least until we've completed migration of our
		 * resources. */
		dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
		leave = 1;
	}
	spin_unlock(&dlm_domain_lock);

	if (leave) {
		mlog(0, "shutting down domain %s\n", dlm->name);

		/* We changed dlm state, notify the thread */
		dlm_kick_thread(dlm, NULL);

		while (dlm_migrate_all_locks(dlm)) {
			/* Give dlm_thread time to purge the lockres' */
			msleep(500);
			mlog(0, "%s: more migration to do\n", dlm->name);
		}

		/* This list should be empty. If not, print remaining lockres */
		if (!list_empty(&dlm->tracking_list)) {
			mlog(ML_ERROR, "Following lockres' are still on the "
			     "tracking list:\n");
			list_for_each_entry(res, &dlm->tracking_list, tracking)
				dlm_print_one_lock_resource(res);
		}

		dlm_mark_domain_leaving(dlm);
		dlm_leave_domain(dlm);
		dlm_force_free_mles(dlm);
		dlm_complete_dlm_shutdown(dlm);
	}
	dlm_put(dlm);
}
EXPORT_SYMBOL_GPL(dlm_unregister_domain);

static int dlm_query_join_proto_check(char *proto_type, int node,
				      struct dlm_protocol_version *ours,
				      struct dlm_protocol_version *request)
{
	int rc;
	struct dlm_protocol_version proto = *request;

	if (!dlm_protocol_compare(ours, &proto)) {
		mlog(0,
		     "node %u wanted to join with %s locking protocol "
		     "%u.%u, we respond with %u.%u\n",
		     node, proto_type,
		     request->pv_major,
		     request->pv_minor,
		     proto.pv_major, proto.pv_minor);
		request->pv_minor = proto.pv_minor;
		rc = 0;
	} else {
		mlog(ML_NOTICE,
		     "Node %u wanted to join with %s locking "
		     "protocol %u.%u, but we have %u.%u, disallowing\n",
		     node, proto_type,
		     request->pv_major,
		     request->pv_minor,
		     ours->pv_major,
		     ours->pv_minor);
		rc = 1;
	}

	return rc;
}

/*
 * struct dlm_query_join_packet is made up of four one-byte fields. They
 * are effectively in big-endian order already. However, little-endian
 * machines swap them before putting the packet on the wire (because
 * query_join's response is a status, and that status is treated as a u32
 * on the wire). Thus, big-endian and little-endian machines will treat
 * this structure differently.
 *
 * The solution is to have little-endian machines swap the structure when
 * converting from the structure to the u32 representation. This will
 * result in the structure having the correct format on the wire no matter
 * the host endian format.
 */
static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet,
					  u32 *wire)
{
	union dlm_query_join_response response;

	response.packet = *packet;
	*wire = cpu_to_be32(response.intval);
}

static void dlm_query_join_wire_to_packet(u32 wire,
					  struct dlm_query_join_packet *packet)
{
	union dlm_query_join_response response;

	response.intval = cpu_to_be32(wire);
	*packet = response.packet;
}

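/*
 * Worked example (illustrative, assuming the usual layout with 'code' as the
 * first byte): on a little-endian host, packet.code lands in the least
 * significant byte of response.intval, and cpu_to_be32() swaps it into the
 * most significant byte -- the same value a big-endian host produces without
 * swapping, so both ends agree on the wire format.
 */
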
static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
				  void **ret_data)
{
	struct dlm_query_join_request *query;
	struct dlm_query_join_packet packet = {
		.code = JOIN_DISALLOW,
	};
	struct dlm_ctxt *dlm = NULL;
	u32 response;
	u8 nodenum;

	query = (struct dlm_query_join_request *) msg->buf;

	mlog(0, "node %u wants to join domain %s\n", query->node_idx,
	     query->domain);

	/*
	 * If heartbeat doesn't consider the node live, tell it
	 * to back off and try again.  This gives heartbeat a chance
	 * to catch up.
	 */
	if (!o2hb_check_node_heartbeating(query->node_idx)) {
		mlog(0, "node %u is not in our live map yet\n",
		     query->node_idx);

		packet.code = JOIN_DISALLOW;
		goto respond;
	}

	packet.code = JOIN_OK_NO_MAP;

	spin_lock(&dlm_domain_lock);
	dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
	if (!dlm)
		goto unlock_respond;

	/*
	 * There is a small window where the joining node may not see the
	 * node(s) that just left but still part of the cluster. DISALLOW
	 * join request if joining node has different node map.
	 */
	nodenum=0;
	while (nodenum < O2NM_MAX_NODES) {
		if (test_bit(nodenum, dlm->domain_map)) {
			if (!byte_test_bit(nodenum, query->node_map)) {
				mlog(0, "disallow join as node %u does not "
				     "have node %u in its nodemap\n",
				     query->node_idx, nodenum);
				packet.code = JOIN_DISALLOW;
				goto unlock_respond;
			}
		}
		nodenum++;
	}

	/* Once the dlm ctxt is marked as leaving then we don't want
	 * to be put in someone's domain map.
	 * Also, explicitly disallow joining at certain troublesome
	 * times (ie. during recovery). */
	if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
		int bit = query->node_idx;
		spin_lock(&dlm->spinlock);

		if (dlm->dlm_state == DLM_CTXT_NEW &&
		    dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
			/*If this is a brand new context and we
			 * haven't started our join process yet, then
			 * the other node won the race. */
			packet.code = JOIN_OK_NO_MAP;
		} else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
			/* Disallow parallel joins. */
			packet.code = JOIN_DISALLOW;
		} else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
			mlog(0, "node %u trying to join, but recovery "
			     "is ongoing.\n", bit);
			packet.code = JOIN_DISALLOW;
		} else if (test_bit(bit, dlm->recovery_map)) {
			mlog(0, "node %u trying to join, but it "
			     "still needs recovery.\n", bit);
			packet.code = JOIN_DISALLOW;
		} else if (test_bit(bit, dlm->domain_map)) {
			mlog(0, "node %u trying to join, but it "
			     "is still in the domain! needs recovery?\n",
			     bit);
			packet.code = JOIN_DISALLOW;
		} else {
			/* Alright we're fully a part of this domain
			 * so we keep some state as to who's joining
			 * and indicate to him that needs to be fixed
			 * up. */

			/* Make sure we speak compatible locking protocols. */
			if (dlm_query_join_proto_check("DLM", bit,
						       &dlm->dlm_locking_proto,
						       &query->dlm_proto)) {
				packet.code = JOIN_PROTOCOL_MISMATCH;
			} else if (dlm_query_join_proto_check("fs", bit,
							      &dlm->fs_locking_proto,
							      &query->fs_proto)) {
				packet.code = JOIN_PROTOCOL_MISMATCH;
			} else {
				packet.dlm_minor = query->dlm_proto.pv_minor;
				packet.fs_minor = query->fs_proto.pv_minor;
				packet.code = JOIN_OK;
				__dlm_set_joining_node(dlm, query->node_idx);
			}
		}

		spin_unlock(&dlm->spinlock);
	}
unlock_respond:
	spin_unlock(&dlm_domain_lock);

respond:
	mlog(0, "We respond with %u\n", packet.code);

	dlm_query_join_packet_to_wire(&packet, &response);
	return response;
}

static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
				     void **ret_data)
{
	struct dlm_assert_joined *assert;
	struct dlm_ctxt *dlm = NULL;

	assert = (struct dlm_assert_joined *) msg->buf;

	mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
	     assert->domain);

	spin_lock(&dlm_domain_lock);
	dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
	/* XXX should we consider no dlm ctxt an error? */
	if (dlm) {
		spin_lock(&dlm->spinlock);

		/* Alright, this node has officially joined our
		 * domain. Set him in the map and clean up our
		 * leftover join state. */
		BUG_ON(dlm->joining_node != assert->node_idx);
		set_bit(assert->node_idx, dlm->domain_map);
		__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);

		printk(KERN_NOTICE "o2dlm: Node %u joins domain %s\n",
		       assert->node_idx, dlm->name);
		__dlm_print_nodes(dlm);

		/* notify anything attached to the heartbeat events */
		dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);

		spin_unlock(&dlm->spinlock);
	}
	spin_unlock(&dlm_domain_lock);

	return 0;
}

static int dlm_match_regions(struct dlm_ctxt *dlm,
			     struct dlm_query_region *qr)
{
	char *local = NULL, *remote = qr->qr_regions;
	char *l, *r;
	int localnr, i, j, foundit;
	int status = 0;

	if (!o2hb_global_heartbeat_active()) {
		if (qr->qr_numregions) {
			mlog(ML_ERROR, "Domain %s: Joining node %d has global "
			     "heartbeat enabled but local node %d does not\n",
			     qr->qr_domain, qr->qr_node, dlm->node_num);
			status = -EINVAL;
		}
		goto bail;
	}

	if (o2hb_global_heartbeat_active() && !qr->qr_numregions) {
		mlog(ML_ERROR, "Domain %s: Local node %d has global "
		     "heartbeat enabled but joining node %d does not\n",
		     qr->qr_domain, dlm->node_num, qr->qr_node);
		status = -EINVAL;
		goto bail;
	}

	r = remote;
	for (i = 0; i < qr->qr_numregions; ++i) {
		mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
		r += O2HB_MAX_REGION_NAME_LEN;
	}

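	/*
	 * We are called from dlm_query_region_handler() with dlm_domain_lock
	 * and dlm->spinlock held, so this allocation must not sleep --
	 * hence GFP_ATOMIC rather than GFP_KERNEL.
	 */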
	local = kmalloc(sizeof(qr->qr_regions), GFP_ATOMIC);
	if (!local) {
		status = -ENOMEM;
		goto bail;
	}

	localnr = o2hb_get_all_regions(local, O2NM_MAX_REGIONS);

	/* compare local regions with remote */
	l = local;
	for (i = 0; i < localnr; ++i) {
		foundit = 0;
		r = remote;
		for (j = 0; j <= qr->qr_numregions; ++j) {
			if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) {
				foundit = 1;
				break;
			}
			r += O2HB_MAX_REGION_NAME_LEN;
		}
		if (!foundit) {
			status = -EINVAL;
			mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
			     "in local node %d but not in joining node %d\n",
			     qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l,
			     dlm->node_num, qr->qr_node);
			goto bail;
		}
		l += O2HB_MAX_REGION_NAME_LEN;
	}

	/* compare remote with local regions */
	r = remote;
	for (i = 0; i < qr->qr_numregions; ++i) {
		foundit = 0;
		l = local;
		for (j = 0; j < localnr; ++j) {
			if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) {
				foundit = 1;
				break;
			}
			l += O2HB_MAX_REGION_NAME_LEN;
		}
		if (!foundit) {
			status = -EINVAL;
			mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
			     "in joining node %d but not in local node %d\n",
			     qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r,
			     qr->qr_node, dlm->node_num);
			goto bail;
		}
		r += O2HB_MAX_REGION_NAME_LEN;
	}

bail:
	kfree(local);

	return status;
}

1022static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
1023{
1024 struct dlm_query_region *qr = NULL;
1025 int status, ret = 0, i;
1026 char *p;
1027
1028 if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1029 goto bail;
1030
1031 qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL);
1032 if (!qr) {
1033 ret = -ENOMEM;
1034 mlog_errno(ret);
1035 goto bail;
1036 }
1037
1038 qr->qr_node = dlm->node_num;
1039 qr->qr_namelen = strlen(dlm->name);
1040 memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
1041 /* if local hb, the numregions will be zero */
1042 if (o2hb_global_heartbeat_active())
1043 qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions,
1044 O2NM_MAX_REGIONS);
1045
1046 p = qr->qr_regions;
1047 for (i = 0; i < qr->qr_numregions; ++i, p += O2HB_MAX_REGION_NAME_LEN)
1048 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p);
1049
1050 i = -1;
1051 while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1052 i + 1)) < O2NM_MAX_NODES) {
1053 if (i == dlm->node_num)
1054 continue;
1055
1056 mlog(0, "Sending regions to node %d\n", i);
1057
1058 ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr,
1059 sizeof(struct dlm_query_region),
1060 i, &status);
1061 if (ret >= 0)
1062 ret = status;
1063 if (ret) {
1064 mlog(ML_ERROR, "Region mismatch %d, node %d\n",
1065 ret, i);
1066 break;
1067 }
1068 }
1069
1070bail:
1071 kfree(qr);
1072 return ret;
1073}
1074
1075static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
1076 void *data, void **ret_data)
1077{
1078 struct dlm_query_region *qr;
1079 struct dlm_ctxt *dlm = NULL;
1080 int status = 0;
1081 int locked = 0;
1082
1083 qr = (struct dlm_query_region *) msg->buf;
1084
1085 mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
1086 qr->qr_domain);
1087
1088 status = -EINVAL;
1089
1090 spin_lock(&dlm_domain_lock);
1091 dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
1092 if (!dlm) {
1093 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1094 "before join domain\n", qr->qr_node, qr->qr_domain);
1095 goto bail;
1096 }
1097
1098 spin_lock(&dlm->spinlock);
1099 locked = 1;
1100 if (dlm->joining_node != qr->qr_node) {
1101 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1102 "but joining node is %d\n", qr->qr_node, qr->qr_domain,
1103 dlm->joining_node);
1104 goto bail;
1105 }
1106
1107 /* Support for global heartbeat was added in 1.1 */
1108 if (dlm->dlm_locking_proto.pv_major == 1 &&
1109 dlm->dlm_locking_proto.pv_minor == 0) {
1110 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1111 "but active dlm protocol is %d.%d\n", qr->qr_node,
1112 qr->qr_domain, dlm->dlm_locking_proto.pv_major,
1113 dlm->dlm_locking_proto.pv_minor);
1114 goto bail;
1115 }
1116
1117 status = dlm_match_regions(dlm, qr);
1118
1119bail:
1120 if (locked)
1121 spin_unlock(&dlm->spinlock);
1122 spin_unlock(&dlm_domain_lock);
1123
1124 return status;
1125}
1126
18cfdf1b
SM
1127static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn)
1128{
1129 struct o2nm_node *local;
1130 struct dlm_node_info *remote;
1131 int i, j;
1132 int status = 0;
1133
1134 for (j = 0; j < qn->qn_numnodes; ++j)
1135 mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[j].ni_nodenum,
1136 &(qn->qn_nodes[j].ni_ipv4_address),
1137 ntohs(qn->qn_nodes[j].ni_ipv4_port));
1138
1139 for (i = 0; i < O2NM_MAX_NODES && !status; ++i) {
1140 local = o2nm_get_node_by_num(i);
1141 remote = NULL;
1142 for (j = 0; j < qn->qn_numnodes; ++j) {
1143 if (qn->qn_nodes[j].ni_nodenum == i) {
1144 remote = &(qn->qn_nodes[j]);
1145 break;
1146 }
1147 }
1148
1149 if (!local && !remote)
1150 continue;
1151
1152 if ((local && !remote) || (!local && remote))
1153 status = -EINVAL;
1154
1155 if (!status &&
1156 ((remote->ni_nodenum != local->nd_num) ||
1157 (remote->ni_ipv4_port != local->nd_ipv4_port) ||
1158 (remote->ni_ipv4_address != local->nd_ipv4_address)))
1159 status = -EINVAL;
1160
1161 if (status) {
1162 if (remote && !local)
1163 mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1164 "registered in joining node %d but not in "
1165 "local node %d\n", qn->qn_domain,
1166 remote->ni_nodenum,
1167 &(remote->ni_ipv4_address),
1168 ntohs(remote->ni_ipv4_port),
1169 qn->qn_nodenum, dlm->node_num);
1170 if (local && !remote)
1171 mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1172 "registered in local node %d but not in "
1173 "joining node %d\n", qn->qn_domain,
1174 local->nd_num, &(local->nd_ipv4_address),
1175 ntohs(local->nd_ipv4_port),
1176 dlm->node_num, qn->qn_nodenum);
1177 BUG_ON((!local && !remote));
1178 }
1179
1180 if (local)
1181 o2nm_node_put(local);
1182 }
1183
1184 return status;
1185}
1186
1187static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map)
1188{
1189 struct dlm_query_nodeinfo *qn = NULL;
1190 struct o2nm_node *node;
1191 int ret = 0, status, count, i;
1192
1193 if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1194 goto bail;
1195
1196 qn = kzalloc(sizeof(struct dlm_query_nodeinfo), GFP_KERNEL);
1197 if (!qn) {
1198 ret = -ENOMEM;
1199 mlog_errno(ret);
1200 goto bail;
1201 }
1202
1203 for (i = 0, count = 0; i < O2NM_MAX_NODES; ++i) {
1204 node = o2nm_get_node_by_num(i);
1205 if (!node)
1206 continue;
1207 qn->qn_nodes[count].ni_nodenum = node->nd_num;
1208 qn->qn_nodes[count].ni_ipv4_port = node->nd_ipv4_port;
1209 qn->qn_nodes[count].ni_ipv4_address = node->nd_ipv4_address;
1210 mlog(0, "Node %3d, %pI4:%u\n", node->nd_num,
1211 &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port));
1212 ++count;
1213 o2nm_node_put(node);
1214 }
1215
1216 qn->qn_nodenum = dlm->node_num;
1217 qn->qn_numnodes = count;
1218 qn->qn_namelen = strlen(dlm->name);
1219 memcpy(qn->qn_domain, dlm->name, qn->qn_namelen);
1220
1221 i = -1;
1222 while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1223 i + 1)) < O2NM_MAX_NODES) {
1224 if (i == dlm->node_num)
1225 continue;
1226
1227 mlog(0, "Sending nodeinfo to node %d\n", i);
1228
1229 ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
1230 qn, sizeof(struct dlm_query_nodeinfo),
1231 i, &status);
1232 if (ret >= 0)
1233 ret = status;
1234 if (ret) {
1235 mlog(ML_ERROR, "node mismatch %d, node %d\n", ret, i);
1236 break;
1237 }
1238 }
1239
1240bail:
1241 kfree(qn);
1242 return ret;
1243}
1244
1245static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len,
1246 void *data, void **ret_data)
1247{
1248 struct dlm_query_nodeinfo *qn;
1249 struct dlm_ctxt *dlm = NULL;
1250 int locked = 0, status = -EINVAL;
1251
1252 qn = (struct dlm_query_nodeinfo *) msg->buf;
1253
1254 mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum,
1255 qn->qn_domain);
1256
1257 spin_lock(&dlm_domain_lock);
1258 dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen);
1259 if (!dlm) {
1260 mlog(ML_ERROR, "Node %d queried nodes on domain %s before "
1261 "join domain\n", qn->qn_nodenum, qn->qn_domain);
1262 goto bail;
1263 }
1264
1265 spin_lock(&dlm->spinlock);
1266 locked = 1;
1267 if (dlm->joining_node != qn->qn_nodenum) {
1268 mlog(ML_ERROR, "Node %d queried nodes on domain %s but "
1269 "joining node is %d\n", qn->qn_nodenum, qn->qn_domain,
1270 dlm->joining_node);
1271 goto bail;
1272 }
1273
1274 /* Support for node query was added in 1.1 */
1275 if (dlm->dlm_locking_proto.pv_major == 1 &&
1276 dlm->dlm_locking_proto.pv_minor == 0) {
1277 mlog(ML_ERROR, "Node %d queried nodes on domain %s "
1278 "but active dlm protocol is %d.%d\n", qn->qn_nodenum,
1279 qn->qn_domain, dlm->dlm_locking_proto.pv_major,
1280 dlm->dlm_locking_proto.pv_minor);
1281 goto bail;
1282 }
1283
1284 status = dlm_match_nodes(dlm, qn);
1285
1286bail:
1287 if (locked)
1288 spin_unlock(&dlm->spinlock);
1289 spin_unlock(&dlm_domain_lock);
1290
1291 return status;
1292}
1293
d74c9803
KH
1294static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
1295 void **ret_data)
6714d8e8
KH
1296{
1297 struct dlm_cancel_join *cancel;
1298 struct dlm_ctxt *dlm = NULL;
1299
1300 cancel = (struct dlm_cancel_join *) msg->buf;
1301
1302 mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
1303 cancel->domain);
1304
1305 spin_lock(&dlm_domain_lock);
1306 dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
1307
1308 if (dlm) {
1309 spin_lock(&dlm->spinlock);
1310
1311 /* Yikes, this guy wants to cancel his join. No
1312 * problem, we simply cleanup our join state. */
1313 BUG_ON(dlm->joining_node != cancel->node_idx);
1314 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1315
1316 spin_unlock(&dlm->spinlock);
1317 }
1318 spin_unlock(&dlm_domain_lock);
1319
1320 return 0;
1321}
1322
1323static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
1324 unsigned int node)
1325{
1326 int status;
1327 struct dlm_cancel_join cancel_msg;
1328
1329 memset(&cancel_msg, 0, sizeof(cancel_msg));
1330 cancel_msg.node_idx = dlm->node_num;
1331 cancel_msg.name_len = strlen(dlm->name);
1332 memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
1333
1334 status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1335 &cancel_msg, sizeof(cancel_msg), node,
1336 NULL);
1337 if (status < 0) {
a5196ec5
WW
1338 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1339 "node %u\n", status, DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1340 node);
6714d8e8
KH
1341 goto bail;
1342 }
1343
1344bail:
1345 return status;
1346}
1347
1348/* map_size should be in bytes. */
1349static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
1350 unsigned long *node_map,
1351 unsigned int map_size)
1352{
1353 int status, tmpstat;
1354 unsigned int node;
1355
1356 if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
1357 sizeof(unsigned long))) {
1358 mlog(ML_ERROR,
1359 "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
3a4780a8 1360 map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES));
6714d8e8
KH
1361 return -EINVAL;
1362 }
1363
1364 status = 0;
1365 node = -1;
1366 while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1367 node + 1)) < O2NM_MAX_NODES) {
1368 if (node == dlm->node_num)
1369 continue;
1370
1371 tmpstat = dlm_send_one_join_cancel(dlm, node);
1372 if (tmpstat) {
1373 mlog(ML_ERROR, "Error return %d cancelling join on "
1374 "node %d\n", tmpstat, node);
1375 if (!status)
1376 status = tmpstat;
1377 }
1378 }
1379
1380 if (status)
1381 mlog_errno(status);
1382 return status;
1383}
1384
1385static int dlm_request_join(struct dlm_ctxt *dlm,
1386 int node,
d24fbcda 1387 enum dlm_query_join_response_code *response)
6714d8e8 1388{
d24fbcda 1389 int status;
6714d8e8 1390 struct dlm_query_join_request join_msg;
0f71b7b4
JB
1391 struct dlm_query_join_packet packet;
1392 u32 join_resp;
6714d8e8
KH
1393
1394 mlog(0, "querying node %d\n", node);
1395
1396 memset(&join_msg, 0, sizeof(join_msg));
1397 join_msg.node_idx = dlm->node_num;
1398 join_msg.name_len = strlen(dlm->name);
1399 memcpy(join_msg.domain, dlm->name, join_msg.name_len);
d24fbcda
JB
1400 join_msg.dlm_proto = dlm->dlm_locking_proto;
1401 join_msg.fs_proto = dlm->fs_locking_proto;
6714d8e8 1402
1faf2894
SE
1403 /* copy live node map to join message */
1404 byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
1405
6714d8e8 1406 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
a5196ec5 1407 sizeof(join_msg), node, &join_resp);
6714d8e8 1408 if (status < 0 && status != -ENOPROTOOPT) {
a5196ec5
WW
1409 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1410 "node %u\n", status, DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1411 node);
6714d8e8
KH
1412 goto bail;
1413 }
0f71b7b4 1414 dlm_query_join_wire_to_packet(join_resp, &packet);
6714d8e8
KH
1415
1416 /* -ENOPROTOOPT from the net code means the other side isn't
1417 listening for our message type -- that's fine, it means
1418 his dlm isn't up, so we can consider him a 'yes' but not
1419 joined into the domain. */
1420 if (status == -ENOPROTOOPT) {
1421 status = 0;
1422 *response = JOIN_OK_NO_MAP;
0f71b7b4
JB
1423 } else if (packet.code == JOIN_DISALLOW ||
1424 packet.code == JOIN_OK_NO_MAP) {
1425 *response = packet.code;
1426 } else if (packet.code == JOIN_PROTOCOL_MISMATCH) {
d24fbcda
JB
1427 mlog(ML_NOTICE,
1428 "This node requested DLM locking protocol %u.%u and "
1429 "filesystem locking protocol %u.%u. At least one of "
1430 "the protocol versions on node %d is not compatible, "
1431 "disconnecting\n",
1432 dlm->dlm_locking_proto.pv_major,
1433 dlm->dlm_locking_proto.pv_minor,
1434 dlm->fs_locking_proto.pv_major,
1435 dlm->fs_locking_proto.pv_minor,
1436 node);
1437 status = -EPROTO;
0f71b7b4
JB
1438 *response = packet.code;
1439 } else if (packet.code == JOIN_OK) {
1440 *response = packet.code;
d24fbcda 1441 /* Use the same locking protocol as the remote node */
0f71b7b4
JB
1442 dlm->dlm_locking_proto.pv_minor = packet.dlm_minor;
1443 dlm->fs_locking_proto.pv_minor = packet.fs_minor;
d24fbcda
JB
1444 mlog(0,
1445 "Node %d responds JOIN_OK with DLM locking protocol "
1446 "%u.%u and fs locking protocol %u.%u\n",
1447 node,
1448 dlm->dlm_locking_proto.pv_major,
1449 dlm->dlm_locking_proto.pv_minor,
1450 dlm->fs_locking_proto.pv_major,
1451 dlm->fs_locking_proto.pv_minor);
6714d8e8
KH
1452 } else {
1453 status = -EINVAL;
d24fbcda 1454 mlog(ML_ERROR, "invalid response %d from node %u\n",
0f71b7b4 1455 packet.code, node);
6714d8e8
KH
1456 }
1457
1458 mlog(0, "status %d, node %d response is %d\n", status, node,
0f71b7b4 1459 *response);
6714d8e8
KH
1460
1461bail:
1462 return status;
1463}
1464
1465static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
1466 unsigned int node)
1467{
1468 int status;
1469 struct dlm_assert_joined assert_msg;
1470
1471 mlog(0, "Sending join assert to node %u\n", node);
1472
1473 memset(&assert_msg, 0, sizeof(assert_msg));
1474 assert_msg.node_idx = dlm->node_num;
1475 assert_msg.name_len = strlen(dlm->name);
1476 memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
1477
1478 status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1479 &assert_msg, sizeof(assert_msg), node,
1480 NULL);
1481 if (status < 0)
a5196ec5
WW
1482 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1483 "node %u\n", status, DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1484 node);
6714d8e8
KH
1485
1486 return status;
1487}
1488
1489static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
1490 unsigned long *node_map)
1491{
1492 int status, node, live;
1493
1494 status = 0;
1495 node = -1;
1496 while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1497 node + 1)) < O2NM_MAX_NODES) {
1498 if (node == dlm->node_num)
1499 continue;
1500
1501 do {
1502 /* It is very important that this message be
1503 * received so we spin until either the node
1504 * has died or it gets the message. */
1505 status = dlm_send_one_join_assert(dlm, node);
1506
1507 spin_lock(&dlm->spinlock);
1508 live = test_bit(node, dlm->live_nodes_map);
1509 spin_unlock(&dlm->spinlock);
1510
1511 if (status) {
1512 mlog(ML_ERROR, "Error return %d asserting "
1513 "join on node %d\n", status, node);
1514
1515 /* give us some time between errors... */
1516 if (live)
1517 msleep(DLM_DOMAIN_BACKOFF_MS);
1518 }
1519 } while (status && live);
1520 }
1521}
1522
1523struct domain_join_ctxt {
1524 unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1525 unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1526};
1527
1528static int dlm_should_restart_join(struct dlm_ctxt *dlm,
1529 struct domain_join_ctxt *ctxt,
d24fbcda 1530 enum dlm_query_join_response_code response)
6714d8e8
KH
1531{
1532 int ret;
1533
1534 if (response == JOIN_DISALLOW) {
1535 mlog(0, "Latest response of disallow -- should restart\n");
1536 return 1;
1537 }
1538
1539 spin_lock(&dlm->spinlock);
1540 /* For now, we restart the process if the node maps have
1541 * changed at all */
1542 ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
1543 sizeof(dlm->live_nodes_map));
1544 spin_unlock(&dlm->spinlock);
1545
1546 if (ret)
1547 mlog(0, "Node maps changed -- should restart\n");
1548
1549 return ret;
1550}
1551
1552static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1553{
1554 int status = 0, tmpstat, node;
1555 struct domain_join_ctxt *ctxt;
d24fbcda 1556 enum dlm_query_join_response_code response = JOIN_DISALLOW;
6714d8e8
KH
1557
1558 mlog_entry("%p", dlm);
1559
cd861280 1560 ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
6714d8e8
KH
1561 if (!ctxt) {
1562 status = -ENOMEM;
1563 mlog_errno(status);
1564 goto bail;
1565 }
1566
1567 /* group sem locking should work for us here -- we're already
1568 * registered for heartbeat events so filling this should be
1569 * atomic wrt getting those handlers called. */
1570 o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
1571
1572 spin_lock(&dlm->spinlock);
1573 memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
1574
1575 __dlm_set_joining_node(dlm, dlm->node_num);
1576
1577 spin_unlock(&dlm->spinlock);
1578
1579 node = -1;
1580 while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
1581 node + 1)) < O2NM_MAX_NODES) {
1582 if (node == dlm->node_num)
1583 continue;
1584
1585 status = dlm_request_join(dlm, node, &response);
1586 if (status < 0) {
1587 mlog_errno(status);
1588 goto bail;
1589 }
1590
1591 /* Ok, either we got a response or the node doesn't have a
1592 * dlm up. */
1593 if (response == JOIN_OK)
1594 set_bit(node, ctxt->yes_resp_map);
1595
1596 if (dlm_should_restart_join(dlm, ctxt, response)) {
1597 status = -EAGAIN;
1598 goto bail;
1599 }
1600 }
1601
1602 mlog(0, "Yay, done querying nodes!\n");
1603
1604 /* Yay, everyone agree's we can join the domain. My domain is
1605 * comprised of all nodes who were put in the
1606 * yes_resp_map. Copy that into our domain map and send a join
1607 * assert message to clean up everyone elses state. */
1608 spin_lock(&dlm->spinlock);
1609 memcpy(dlm->domain_map, ctxt->yes_resp_map,
1610 sizeof(ctxt->yes_resp_map));
1611 set_bit(dlm->node_num, dlm->domain_map);
1612 spin_unlock(&dlm->spinlock);
1613
18cfdf1b 1614 /* Support for global heartbeat and node info was added in 1.1 */
ea203441 1615 if (dlm_protocol.pv_major > 1 || dlm_protocol.pv_minor > 0) {
18cfdf1b
SM
1616 status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map);
1617 if (status) {
1618 mlog_errno(status);
1619 goto bail;
1620 }
ea203441
SM
1621 status = dlm_send_regions(dlm, ctxt->yes_resp_map);
1622 if (status) {
1623 mlog_errno(status);
1624 goto bail;
1625 }
1626 }
1627
6714d8e8
KH
1628 dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1629
1630 /* Joined state *must* be set before the joining node
1631 * information, otherwise the query_join handler may read no
1632 * current joiner but a state of NEW and tell joining nodes
1633 * we're not in the domain. */
1634 spin_lock(&dlm_domain_lock);
1635 dlm->dlm_state = DLM_CTXT_JOINED;
1636 dlm->num_joins++;
1637 spin_unlock(&dlm_domain_lock);
1638
1639bail:
1640 spin_lock(&dlm->spinlock);
1641 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1642 if (!status)
1643 __dlm_print_nodes(dlm);
1644 spin_unlock(&dlm->spinlock);
1645
1646 if (ctxt) {
1647 /* Do we need to send a cancel message to any nodes? */
1648 if (status < 0) {
1649 tmpstat = dlm_send_join_cancels(dlm,
1650 ctxt->yes_resp_map,
1651 sizeof(ctxt->yes_resp_map));
1652 if (tmpstat < 0)
1653 mlog_errno(tmpstat);
1654 }
1655 kfree(ctxt);
1656 }
1657
1658 mlog(0, "returning %d\n", status);
1659 return status;
1660}
1661
1662static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
1663{
14829422
JB
1664 o2hb_unregister_callback(NULL, &dlm->dlm_hb_up);
1665 o2hb_unregister_callback(NULL, &dlm->dlm_hb_down);
6714d8e8
KH
1666 o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
1667}
1668
1669static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1670{
1671 int status;
1672
1673 mlog(0, "registering handlers.\n");
1674
1675 o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
1676 dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
14829422 1677 status = o2hb_register_callback(NULL, &dlm->dlm_hb_down);
6714d8e8
KH
1678 if (status)
1679 goto bail;
1680
1681 o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
1682 dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
14829422 1683 status = o2hb_register_callback(NULL, &dlm->dlm_hb_up);
6714d8e8
KH
1684 if (status)
1685 goto bail;
1686
1687 status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1688 sizeof(struct dlm_master_request),
1689 dlm_master_request_handler,
d74c9803 1690 dlm, NULL, &dlm->dlm_domain_handlers);
6714d8e8
KH
1691 if (status)
1692 goto bail;
1693
1694 status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1695 sizeof(struct dlm_assert_master),
1696 dlm_assert_master_handler,
3b8118cf
KH
1697 dlm, dlm_assert_master_post_handler,
1698 &dlm->dlm_domain_handlers);
6714d8e8
KH
1699 if (status)
1700 goto bail;
1701
1702 status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1703 sizeof(struct dlm_create_lock),
1704 dlm_create_lock_handler,
d74c9803 1705 dlm, NULL, &dlm->dlm_domain_handlers);
6714d8e8
KH
1706 if (status)
1707 goto bail;
1708
1709 status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1710 DLM_CONVERT_LOCK_MAX_LEN,
1711 dlm_convert_lock_handler,
d74c9803 1712 dlm, NULL, &dlm->dlm_domain_handlers);
6714d8e8
KH
1713 if (status)
1714 goto bail;
1715
1716 status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1717 DLM_UNLOCK_LOCK_MAX_LEN,
1718 dlm_unlock_lock_handler,
d74c9803 1719 dlm, NULL, &dlm->dlm_domain_handlers);
6714d8e8
KH
1720 if (status)
1721 goto bail;
1722
1723 status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1724 DLM_PROXY_AST_MAX_LEN,
1725 dlm_proxy_ast_handler,
d74c9803 1726 dlm, NULL, &dlm->dlm_domain_handlers);
6714d8e8
KH
1727 if (status)
1728 goto bail;
1729
1730 status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1731 sizeof(struct dlm_exit_domain),
1732 dlm_exit_domain_handler,
d74c9803 1733 dlm, NULL, &dlm->dlm_domain_handlers);
6714d8e8
KH
1734 if (status)
1735 goto bail;
1736
ba2bf218
KH
1737 status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1738 sizeof(struct dlm_deref_lockres),
1739 dlm_deref_lockres_handler,
d74c9803 1740 dlm, NULL, &dlm->dlm_domain_handlers);
ba2bf218
KH
1741 if (status)
1742 goto bail;
1743
6714d8e8
KH
1744 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1745 sizeof(struct dlm_migrate_request),
1746 dlm_migrate_request_handler,
d74c9803 1747 dlm, NULL, &dlm->dlm_domain_handlers);
6714d8e8
KH
1748 if (status)
1749 goto bail;
1750
1751 status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1752 DLM_MIG_LOCKRES_MAX_LEN,
1753 dlm_mig_lockres_handler,
d74c9803 1754 dlm, NULL, &dlm->dlm_domain_handlers);
6714d8e8
KH
1755 if (status)
1756 goto bail;
1757
1758 status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1759 sizeof(struct dlm_master_requery),
1760 dlm_master_requery_handler,
d74c9803 1761 dlm, NULL, &dlm->dlm_domain_handlers);
6714d8e8
KH
1762 if (status)
1763 goto bail;
1764
1765 status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1766 sizeof(struct dlm_lock_request),
1767 dlm_request_all_locks_handler,
d74c9803 1768 dlm, NULL, &dlm->dlm_domain_handlers);
6714d8e8
KH
1769 if (status)
1770 goto bail;
1771
1772 status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1773 sizeof(struct dlm_reco_data_done),
1774 dlm_reco_data_done_handler,
d74c9803 1775 dlm, NULL, &dlm->dlm_domain_handlers);
6714d8e8
KH
1776 if (status)
1777 goto bail;
1778
1779 status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1780 sizeof(struct dlm_begin_reco),
1781 dlm_begin_reco_handler,
d74c9803 1782 dlm, NULL, &dlm->dlm_domain_handlers);
6714d8e8
KH
1783 if (status)
1784 goto bail;
1785
1786 status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1787 sizeof(struct dlm_finalize_reco),
1788 dlm_finalize_reco_handler,
d74c9803 1789 dlm, NULL, &dlm->dlm_domain_handlers);
6714d8e8
KH
1790 if (status)
1791 goto bail;
1792
1793bail:
1794 if (status)
1795 dlm_unregister_domain_handlers(dlm);
1796
1797 return status;
1798}
1799
1800static int dlm_join_domain(struct dlm_ctxt *dlm)
1801{
1802 int status;
0dd82141
SM
1803 unsigned int backoff;
1804 unsigned int total_backoff = 0;
6714d8e8
KH
1805
1806 BUG_ON(!dlm);
1807
1808 mlog(0, "Join domain %s\n", dlm->name);
1809
1810 status = dlm_register_domain_handlers(dlm);
1811 if (status) {
1812 mlog_errno(status);
1813 goto bail;
1814 }
1815
007dce53
SM
1816 status = dlm_debug_init(dlm);
1817 if (status < 0) {
1818 mlog_errno(status);
1819 goto bail;
1820 }
1821
6714d8e8
KH
1822 status = dlm_launch_thread(dlm);
1823 if (status < 0) {
1824 mlog_errno(status);
1825 goto bail;
1826 }
1827
1828 status = dlm_launch_recovery_thread(dlm);
1829 if (status < 0) {
1830 mlog_errno(status);
1831 goto bail;
1832 }
1833
3156d267
KH
1834 dlm->dlm_worker = create_singlethread_workqueue("dlm_wq");
1835 if (!dlm->dlm_worker) {
1836 status = -ENOMEM;
1837 mlog_errno(status);
1838 goto bail;
1839 }
1840
6714d8e8 1841 do {
6714d8e8
KH
1842 status = dlm_try_to_join_domain(dlm);
1843
1844 /* If we're racing another node to the join, then we
1845 * need to back off temporarily and let them
1846 * complete. */
0dd82141 1847#define DLM_JOIN_TIMEOUT_MSECS 90000
6714d8e8
KH
1848 if (status == -EAGAIN) {
1849 if (signal_pending(current)) {
1850 status = -ERESTARTSYS;
1851 goto bail;
1852 }
1853
0dd82141
SM
1854 if (total_backoff >
1855 msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) {
1856 status = -ERESTARTSYS;
1857 mlog(ML_NOTICE, "Timed out joining dlm domain "
1858 "%s after %u msecs\n", dlm->name,
1859 jiffies_to_msecs(total_backoff));
1860 goto bail;
1861 }
1862
6714d8e8
KH
1863 /*
1864 * <chip> After you!
1865 * <dale> No, after you!
1866 * <chip> I insist!
1867 * <dale> But you first!
1868 * ...
1869 */
1870 backoff = (unsigned int)(jiffies & 0x3);
1871 backoff *= DLM_DOMAIN_BACKOFF_MS;
0dd82141 1872 total_backoff += backoff;
6714d8e8
KH
1873 mlog(0, "backoff %d\n", backoff);
1874 msleep(backoff);
1875 }
1876 } while (status == -EAGAIN);
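	/*
	 * Note on the backoff above: (jiffies & 0x3) * DLM_DOMAIN_BACKOFF_MS
	 * gives a pseudo-random delay of 0, 200, 400 or 600 ms between
	 * retries, bounded overall by DLM_JOIN_TIMEOUT_MSECS (90 seconds).
	 */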
1877
1878 if (status < 0) {
1879 mlog_errno(status);
1880 goto bail;
1881 }
1882
1883 status = 0;
1884bail:
1885 wake_up(&dlm_domain_events);
1886
1887 if (status) {
1888 dlm_unregister_domain_handlers(dlm);
007dce53 1889 dlm_debug_shutdown(dlm);
6714d8e8
KH
1890 dlm_complete_thread(dlm);
1891 dlm_complete_recovery_thread(dlm);
3156d267 1892 dlm_destroy_dlm_worker(dlm);
6714d8e8
KH
1893 }
1894
1895 return status;
1896}
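/*
 * Editor's note, a worked sketch of the join retry above (illustrative
 * only, not part of this file): each -EAGAIN pass backs off by zero to
 * three multiples of DLM_DOMAIN_BACKOFF_MS, taken from the two low bits
 * of jiffies, and the sleeps are accumulated in milliseconds until they
 * cross DLM_JOIN_TIMEOUT_MSECS (90 seconds), at which point the join
 * gives up:
 *
 *	if (total_backoff > DLM_JOIN_TIMEOUT_MSECS)
 *		bail out with -ERESTARTSYS;
 *	backoff = (unsigned int)(jiffies & 0x3) * DLM_DOMAIN_BACKOFF_MS;
 *	total_backoff += backoff;
 *	msleep(backoff);
 */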
1897
1898static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1899 u32 key)
1900{
1901 int i;
6325b4a2 1902 int ret;
6714d8e8
KH
1903 struct dlm_ctxt *dlm = NULL;
1904
cd861280 1905 dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
6714d8e8
KH
1906 if (!dlm) {
1907 mlog_errno(-ENOMEM);
1908 goto leave;
1909 }
1910
316ce2ba 1911 dlm->name = kstrdup(domain, GFP_KERNEL);
6714d8e8
KH
1912 if (dlm->name == NULL) {
1913 mlog_errno(-ENOMEM);
1914 kfree(dlm);
1915 dlm = NULL;
1916 goto leave;
1917 }
1918
03d864c0 1919 dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
81f2094a 1920 if (!dlm->lockres_hash) {
6714d8e8
KH
1921 mlog_errno(-ENOMEM);
1922 kfree(dlm->name);
1923 kfree(dlm);
1924 dlm = NULL;
1925 goto leave;
1926 }
6714d8e8 1927
03d864c0
DP
1928 for (i = 0; i < DLM_HASH_BUCKETS; i++)
1929 INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
6714d8e8 1930
e2b66ddc
SM
1931 dlm->master_hash = (struct hlist_head **)
1932 dlm_alloc_pagevec(DLM_HASH_PAGES);
1933 if (!dlm->master_hash) {
1934 mlog_errno(-ENOMEM);
1935 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
1936 kfree(dlm->name);
1937 kfree(dlm);
1938 dlm = NULL;
1939 goto leave;
1940 }
1941
1942 for (i = 0; i < DLM_HASH_BUCKETS; i++)
1943 INIT_HLIST_HEAD(dlm_master_hash(dlm, i));
1944
6714d8e8
KH
1945 dlm->key = key;
1946 dlm->node_num = o2nm_this_node();
1947
6325b4a2
SM
1948 ret = dlm_create_debugfs_subroot(dlm);
1949 if (ret < 0) {
e2b66ddc 1950 dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
6325b4a2
SM
1951 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
1952 kfree(dlm->name);
1953 kfree(dlm);
1954 dlm = NULL;
1955 goto leave;
1956 }
1957
6714d8e8
KH
1958 spin_lock_init(&dlm->spinlock);
1959 spin_lock_init(&dlm->master_lock);
1960 spin_lock_init(&dlm->ast_lock);
b0d4f817 1961 spin_lock_init(&dlm->track_lock);
6714d8e8
KH
1962 INIT_LIST_HEAD(&dlm->list);
1963 INIT_LIST_HEAD(&dlm->dirty_list);
1964 INIT_LIST_HEAD(&dlm->reco.resources);
1965 INIT_LIST_HEAD(&dlm->reco.received);
1966 INIT_LIST_HEAD(&dlm->reco.node_data);
1967 INIT_LIST_HEAD(&dlm->purge_list);
1968 INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
29576f8b 1969 INIT_LIST_HEAD(&dlm->tracking_list);
6714d8e8
KH
1970 dlm->reco.state = 0;
1971
1972 INIT_LIST_HEAD(&dlm->pending_asts);
1973 INIT_LIST_HEAD(&dlm->pending_basts);
1974
1975 mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
1976 dlm->recovery_map, &(dlm->recovery_map[0]));
1977
1978 memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
1979 memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
1980 memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
1981
1982 dlm->dlm_thread_task = NULL;
1983 dlm->dlm_reco_thread_task = NULL;
3156d267 1984 dlm->dlm_worker = NULL;
6714d8e8
KH
1985 init_waitqueue_head(&dlm->dlm_thread_wq);
1986 init_waitqueue_head(&dlm->dlm_reco_thread_wq);
1987 init_waitqueue_head(&dlm->reco.event);
1988 init_waitqueue_head(&dlm->ast_wq);
1989 init_waitqueue_head(&dlm->migration_wq);
6714d8e8
KH
1990 INIT_LIST_HEAD(&dlm->mle_hb_events);
1991
1992 dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
1993 init_waitqueue_head(&dlm->dlm_join_events);
1994
1995 dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
1996 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
6714d8e8 1997
6800791a
SM
1998 atomic_set(&dlm->res_tot_count, 0);
1999 atomic_set(&dlm->res_cur_count, 0);
2041d8fd
SM
2000 for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) {
2001 atomic_set(&dlm->mle_tot_count[i], 0);
2002 atomic_set(&dlm->mle_cur_count[i], 0);
2003 }
2004
6714d8e8
KH
2005 spin_lock_init(&dlm->work_lock);
2006 INIT_LIST_HEAD(&dlm->work_list);
c4028958 2007 INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work);
6714d8e8
KH
2008
2009 kref_init(&dlm->dlm_refs);
2010 dlm->dlm_state = DLM_CTXT_NEW;
2011
2012 INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
2013
2014 mlog(0, "context init: refcount %u\n",
2015 atomic_read(&dlm->dlm_refs.refcount));
2016
2017leave:
2018 return dlm;
2019}
2020
2021/*
d24fbcda
JB
2022 * Compare a requested locking protocol version against the current one.
2023 *
2024 * If the major numbers are different, they are incompatible.
2025 * If the current minor is greater than the request, they are incompatible.
2026 * If the current minor is less than or equal to the request, they are
2027 * compatible, and the requester should run at the current minor version.
2028 */
2029static int dlm_protocol_compare(struct dlm_protocol_version *existing,
2030 struct dlm_protocol_version *request)
2031{
2032 if (existing->pv_major != request->pv_major)
2033 return 1;
2034
2035 if (existing->pv_minor > request->pv_minor)
2036 return 1;
2037
2038 if (existing->pv_minor < request->pv_minor)
2039 request->pv_minor = existing->pv_minor;
2040
2041 return 0;
2042}
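/*
 * Editor's illustrative sketch (not part of this file): how the compare
 * above negotiates. With a domain already running 1.2, a requester asking
 * for 1.5 is accepted and clamped down to the running minor, while a
 * requester asking for 2.0 is refused because the majors differ:
 *
 *	struct dlm_protocol_version existing = { .pv_major = 1, .pv_minor = 2 };
 *	struct dlm_protocol_version newer    = { .pv_major = 1, .pv_minor = 5 };
 *	struct dlm_protocol_version wrong    = { .pv_major = 2, .pv_minor = 0 };
 *
 *	dlm_protocol_compare(&existing, &newer);	returns 0, newer.pv_minor is now 2
 *	dlm_protocol_compare(&existing, &wrong);	returns 1, incompatible
 */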
2043
2044/*
2045 * dlm_register_domain: one-time setup per "domain".
2046 *
2047 * The filesystem passes in the requested locking version via proto.
2048 * If registration was successful, proto will contain the negotiated
2049 * locking protocol.
6714d8e8
KH
2050 */
2051struct dlm_ctxt * dlm_register_domain(const char *domain,
d24fbcda
JB
2052 u32 key,
2053 struct dlm_protocol_version *fs_proto)
6714d8e8
KH
2054{
2055 int ret;
2056 struct dlm_ctxt *dlm = NULL;
2057 struct dlm_ctxt *new_ctxt = NULL;
2058
e372357b 2059 if (strlen(domain) >= O2NM_MAX_NAME_LEN) {
6714d8e8
KH
2060 ret = -ENAMETOOLONG;
2061 mlog(ML_ERROR, "domain name too long\n");
2062 goto leave;
2063 }
2064
2065 if (!o2hb_check_local_node_heartbeating()) {
2066 mlog(ML_ERROR, "the local node has not been configured, or is "
2067 "not heartbeating\n");
2068 ret = -EPROTO;
2069 goto leave;
2070 }
2071
2072 mlog(0, "register called for domain \"%s\"\n", domain);
2073
2074retry:
2075 dlm = NULL;
2076 if (signal_pending(current)) {
2077 ret = -ERESTARTSYS;
2078 mlog_errno(ret);
2079 goto leave;
2080 }
2081
2082 spin_lock(&dlm_domain_lock);
2083
2084 dlm = __dlm_lookup_domain(domain);
2085 if (dlm) {
2086 if (dlm->dlm_state != DLM_CTXT_JOINED) {
2087 spin_unlock(&dlm_domain_lock);
2088
2089 mlog(0, "This ctxt is not joined yet!\n");
2090 wait_event_interruptible(dlm_domain_events,
2091 dlm_wait_on_domain_helper(
2092 domain));
2093 goto retry;
2094 }
2095
d24fbcda 2096 if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
6469272c 2097 spin_unlock(&dlm_domain_lock);
d24fbcda
JB
2098 mlog(ML_ERROR,
2099 "Requested locking protocol version is not "
2100 "compatible with already registered domain "
2101 "\"%s\"\n", domain);
2102 ret = -EPROTO;
2103 goto leave;
2104 }
2105
6714d8e8
KH
2106 __dlm_get(dlm);
2107 dlm->num_joins++;
2108
2109 spin_unlock(&dlm_domain_lock);
2110
2111 ret = 0;
2112 goto leave;
2113 }
2114
2115 /* doesn't exist */
2116 if (!new_ctxt) {
2117 spin_unlock(&dlm_domain_lock);
2118
2119 new_ctxt = dlm_alloc_ctxt(domain, key);
2120 if (new_ctxt)
2121 goto retry;
2122
2123 ret = -ENOMEM;
2124 mlog_errno(ret);
2125 goto leave;
2126 }
2127
2128 /* a little variable switch-a-roo here... */
2129 dlm = new_ctxt;
2130 new_ctxt = NULL;
2131
2132 /* add the new domain */
2133 list_add_tail(&dlm->list, &dlm_domains);
2134 spin_unlock(&dlm_domain_lock);
2135
d24fbcda
JB
2136 /*
2137 * Pass the locking protocol version into the join. If the join
2138 * succeeds, it will have the negotiated protocol set.
2139 */
2140 dlm->dlm_locking_proto = dlm_protocol;
2141 dlm->fs_locking_proto = *fs_proto;
2142
6714d8e8
KH
2143 ret = dlm_join_domain(dlm);
2144 if (ret) {
2145 mlog_errno(ret);
2146 dlm_put(dlm);
2147 goto leave;
2148 }
2149
d24fbcda
JB
2150 /* Tell the caller what locking protocol we negotiated */
2151 *fs_proto = dlm->fs_locking_proto;
2152
6714d8e8
KH
2153 ret = 0;
2154leave:
2155 if (new_ctxt)
2156 dlm_free_ctxt_mem(new_ctxt);
2157
2158 if (ret < 0)
2159 dlm = ERR_PTR(ret);
2160
2161 return dlm;
2162}
2163EXPORT_SYMBOL_GPL(dlm_register_domain);
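/*
 * Editor's illustrative sketch (not part of this file or of any build): a
 * minimal filesystem-side caller of dlm_register_domain(). The names
 * myfs_proto and myfs_connect, and the requested version 1.0, are
 * hypothetical; the ERR_PTR() return convention and the write-back of the
 * negotiated protocol into *fs_proto are taken from the code above.
 *
 *	static struct dlm_protocol_version myfs_proto = {
 *		.pv_major = 1,
 *		.pv_minor = 0,
 *	};
 *
 *	static struct dlm_ctxt *myfs_connect(const char *uuid, u32 key)
 *	{
 *		struct dlm_ctxt *dlm;
 *
 *		dlm = dlm_register_domain(uuid, key, &myfs_proto);
 *		if (IS_ERR(dlm))
 *			return dlm;
 *
 *		pr_info("joined %s, negotiated fs proto %u.%u\n", uuid,
 *			myfs_proto.pv_major, myfs_proto.pv_minor);
 *		return dlm;
 *	}
 *
 * A matching teardown would call dlm_unregister_domain(dlm) once the
 * filesystem is done with the domain.
 */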
2164
2165static LIST_HEAD(dlm_join_handlers);
2166
2167static void dlm_unregister_net_handlers(void)
2168{
2169 o2net_unregister_handler_list(&dlm_join_handlers);
2170}
2171
2172static int dlm_register_net_handlers(void)
2173{
2174 int status = 0;
2175
2176 status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
2177 sizeof(struct dlm_query_join_request),
2178 dlm_query_join_handler,
d74c9803 2179 NULL, NULL, &dlm_join_handlers);
6714d8e8
KH
2180 if (status)
2181 goto bail;
2182
2183 status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
2184 sizeof(struct dlm_assert_joined),
2185 dlm_assert_joined_handler,
d74c9803 2186 NULL, NULL, &dlm_join_handlers);
6714d8e8
KH
2187 if (status)
2188 goto bail;
2189
2190 status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
2191 sizeof(struct dlm_cancel_join),
2192 dlm_cancel_join_handler,
d74c9803 2193 NULL, NULL, &dlm_join_handlers);
ea203441
SM
2194 if (status)
2195 goto bail;
2196
2197 status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY,
2198 sizeof(struct dlm_query_region),
2199 dlm_query_region_handler,
2200 NULL, NULL, &dlm_join_handlers);
6714d8e8 2201
18cfdf1b
SM
2202 if (status)
2203 goto bail;
2204
2205 status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
2206 sizeof(struct dlm_query_nodeinfo),
2207 dlm_query_nodeinfo_handler,
2208 NULL, NULL, &dlm_join_handlers);
6714d8e8
KH
2209bail:
2210 if (status < 0)
2211 dlm_unregister_net_handlers();
2212
2213 return status;
2214}
2215
2216/* Domain eviction callback handling.
2217 *
2218 * The file system requires notification of node death *before* the
2219 * dlm completes its recovery work, otherwise it may be able to
2220 * acquire locks on resources requiring recovery. Since the dlm can
2221 * evict a node from its domain *before* heartbeat fires, a similar
2222 * mechanism is required. */
2223
2224/* Eviction is not expected to happen often, so a per-domain lock is
2225 * not necessary. Eviction callbacks are allowed to sleep for short
2226 * periods of time. */
2227static DECLARE_RWSEM(dlm_callback_sem);
2228
2229void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
2230 int node_num)
2231{
2232 struct list_head *iter;
2233 struct dlm_eviction_cb *cb;
2234
2235 down_read(&dlm_callback_sem);
2236 list_for_each(iter, &dlm->dlm_eviction_callbacks) {
2237 cb = list_entry(iter, struct dlm_eviction_cb, ec_item);
2238
2239 cb->ec_func(node_num, cb->ec_data);
2240 }
2241 up_read(&dlm_callback_sem);
2242}
2243
2244void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
2245 dlm_eviction_func *f,
2246 void *data)
2247{
2248 INIT_LIST_HEAD(&cb->ec_item);
2249 cb->ec_func = f;
2250 cb->ec_data = data;
2251}
2252EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
2253
2254void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
2255 struct dlm_eviction_cb *cb)
2256{
2257 down_write(&dlm_callback_sem);
2258 list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
2259 up_write(&dlm_callback_sem);
2260}
2261EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
2262
2263void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
2264{
2265 down_write(&dlm_callback_sem);
2266 list_del_init(&cb->ec_item);
2267 up_write(&dlm_callback_sem);
2268}
2269EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
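/*
 * Editor's illustrative sketch (not part of this file): the expected
 * calling pattern for the eviction API above. The names myfs_node_down,
 * myfs_sb and myfs_mark_node_dead are hypothetical; the callback
 * signature is inferred from the ec_func(node_num, ec_data) call in
 * dlm_fire_domain_eviction_callbacks().
 *
 *	static struct dlm_eviction_cb myfs_eviction_cb;
 *
 *	static void myfs_node_down(int node_num, void *data)
 *	{
 *		struct myfs_sb *sb = data;
 *
 *		myfs_mark_node_dead(sb, node_num);
 *	}
 *
 *	At mount/join time:
 *		dlm_setup_eviction_cb(&myfs_eviction_cb, myfs_node_down, sb);
 *		dlm_register_eviction_cb(dlm, &myfs_eviction_cb);
 *
 *	At unmount/leave time, before dropping the domain reference:
 *		dlm_unregister_eviction_cb(&myfs_eviction_cb);
 */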
2270
2271static int __init dlm_init(void)
2272{
2273 int status;
2274
2275 dlm_print_version();
2276
2277 status = dlm_init_mle_cache();
12eb0035
SM
2278 if (status) {
2279 mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n");
724bdca9
SM
2280 goto error;
2281 }
2282
2283 status = dlm_init_master_caches();
2284 if (status) {
2285 mlog(ML_ERROR, "Could not create o2dlm_lockres and "
2286 "o2dlm_lockname slabcaches\n");
2287 goto error;
2288 }
2289
2290 status = dlm_init_lock_cache();
2291 if (status) {
2292 mlog(ML_ERROR, "Could not create o2dlm_lock slabcache\n");
2293 goto error;
12eb0035 2294 }
6714d8e8
KH
2295
2296 status = dlm_register_net_handlers();
2297 if (status) {
724bdca9
SM
2298 mlog(ML_ERROR, "Unable to register network handlers\n");
2299 goto error;
6714d8e8
KH
2300 }
2301
6325b4a2
SM
2302 status = dlm_create_debugfs_root();
2303 if (status)
2304 goto error;
2305
6714d8e8 2306 return 0;
724bdca9 2307error:
6325b4a2 2308 dlm_unregister_net_handlers();
724bdca9
SM
2309 dlm_destroy_lock_cache();
2310 dlm_destroy_master_caches();
2311 dlm_destroy_mle_cache();
2312 return -1;
6714d8e8
KH
2313}
2314
2315static void __exit dlm_exit (void)
2316{
6325b4a2 2317 dlm_destroy_debugfs_root();
6714d8e8 2318 dlm_unregister_net_handlers();
724bdca9
SM
2319 dlm_destroy_lock_cache();
2320 dlm_destroy_master_caches();
6714d8e8
KH
2321 dlm_destroy_mle_cache();
2322}
2323
2324MODULE_AUTHOR("Oracle");
2325MODULE_LICENSE("GPL");
2326
2327module_init(dlm_init);
2328module_exit(dlm_exit);