[GFS2] Fix bug in writepage()
[deliverable/linux.git] / fs / dlm / lock.c
CommitLineData
e7fd4179
DT
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
5**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.
9**
10*******************************************************************************
11******************************************************************************/
12
13/* Central locking logic has four stages:
14
15 dlm_lock()
16 dlm_unlock()
17
18 request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
22
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
27
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
32
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
35
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
40
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
43
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
49
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
53
54 L: send_xxxx() -> R: receive_xxxx()
55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/
58
59#include "dlm_internal.h"
60#include "memory.h"
61#include "lowcomms.h"
62#include "requestqueue.h"
63#include "util.h"
64#include "dir.h"
65#include "member.h"
66#include "lockspace.h"
67#include "ast.h"
68#include "lock.h"
69#include "rcom.h"
70#include "recover.h"
71#include "lvb_table.h"
72#include "config.h"
73
74static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
75static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
76static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
77static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
78static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
79static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
80static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
81static int send_remove(struct dlm_rsb *r);
82static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
83static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
84 struct dlm_message *ms);
85static int receive_extralen(struct dlm_message *ms);
86
87/*
88 * Lock compatibility matrix - thanks Steve
89 * UN = Unlocked state. Not really a state, used as a flag
90 * PD = Padding. Used to make the matrix a nice power of two in size
91 * Other states are the same as the VMS DLM.
92 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
93 */
94
/* Both indices are a lock mode plus one (see modes_compat() and
   dlm_modes_compat()).  A nonzero entry means the two modes are
   compatible; queue_conflict() treats a zero entry as a conflict. */
95static const int __dlm_compat_matrix[8][8] = {
96 /* UN NL CR CW PR PW EX PD */
97 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
98 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
99 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
100 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
101 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
102 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
103 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
104 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
105};
106
107/*
108 * This defines the direction of transfer of LVB data.
109 * Granted mode is the row; requested mode is the column.
110 * Usage: matrix[grmode+1][rqmode+1]
111 * 1 = LVB is returned to the caller
112 * 0 = LVB is written to the resource
113 * -1 = nothing happens to the LVB
114 */
115
/* Read by set_lvb_lock() and set_lvb_lock_pc() as
   dlm_lvb_operations[grmode + 1][rqmode + 1]; exported for use outside
   this file. */
116const int dlm_lvb_operations[8][8] = {
117 /* UN NL CR CW PR PW EX PD*/
118 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
119 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
120 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
121 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
122 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
123 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
124 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
125 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
126};
127EXPORT_SYMBOL_GPL(dlm_lvb_operations);
128
/* Compatibility-matrix entry for the granted mode of lkb 'gr' (row)
   against the requested mode of lkb 'rq' (column); nonzero means the
   two modes are compatible. */
129#define modes_compat(gr, rq) \
130 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
131
132int dlm_modes_compat(int mode1, int mode2)
133{
134 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
135}
136
137/*
138 * Compatibility matrix for conversions with QUECVT set.
139 * Granted mode is the row; requested mode is the column.
140 * Usage: matrix[grmode+1][rqmode+1]
141 */
142
/* Same 8x8 layout and mode ordering (UN, NL, CR, CW, PR, PW, EX, PD) as
   the lock compatibility matrix. */
143static const int __quecvt_compat_matrix[8][8] = {
144 /* UN NL CR CW PR PW EX PD */
145 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
146 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
147 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
148 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
149 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
150 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
151 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
152 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
153};
154
155static void dlm_print_lkb(struct dlm_lkb *lkb)
156{
157 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
158 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
159 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
160 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
161 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
162}
163
164void dlm_print_rsb(struct dlm_rsb *r)
165{
166 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
167 r->res_nodeid, r->res_flags, r->res_first_lkid,
168 r->res_recover_locks_count, r->res_name);
169}
170
171/* Threads cannot use the lockspace while it's being recovered */
172
173static inline void lock_recovery(struct dlm_ls *ls)
174{
175 down_read(&ls->ls_in_recovery);
176}
177
178static inline void unlock_recovery(struct dlm_ls *ls)
179{
180 up_read(&ls->ls_in_recovery);
181}
182
183static inline int lock_recovery_try(struct dlm_ls *ls)
184{
185 return down_read_trylock(&ls->ls_in_recovery);
186}
187
188static inline int can_be_queued(struct dlm_lkb *lkb)
189{
190 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
191}
192
193static inline int force_blocking_asts(struct dlm_lkb *lkb)
194{
195 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
196}
197
198static inline int is_demoted(struct dlm_lkb *lkb)
199{
200 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
201}
202
203static inline int is_remote(struct dlm_rsb *r)
204{
205 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
206 return !!r->res_nodeid;
207}
208
209static inline int is_process_copy(struct dlm_lkb *lkb)
210{
211 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
212}
213
214static inline int is_master_copy(struct dlm_lkb *lkb)
215{
216 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
217 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
90135925 218 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
e7fd4179
DT
219}
220
221static inline int middle_conversion(struct dlm_lkb *lkb)
222{
223 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
224 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
90135925
DT
225 return 1;
226 return 0;
e7fd4179
DT
227}
228
229static inline int down_conversion(struct dlm_lkb *lkb)
230{
231 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
232}
233
234static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
235{
236 if (is_master_copy(lkb))
237 return;
238
239 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
240
241 lkb->lkb_lksb->sb_status = rv;
242 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
243
244 dlm_add_ast(lkb, AST_COMP);
245}
246
247static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
248{
249 if (is_master_copy(lkb))
250 send_bast(r, lkb, rqmode);
251 else {
252 lkb->lkb_bastmode = rqmode;
253 dlm_add_ast(lkb, AST_BAST);
254 }
255}
256
257/*
258 * Basic operations on rsb's and lkb's
259 */
260
261static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
262{
263 struct dlm_rsb *r;
264
265 r = allocate_rsb(ls, len);
266 if (!r)
267 return NULL;
268
269 r->res_ls = ls;
270 r->res_length = len;
271 memcpy(r->res_name, name, len);
90135925 272 mutex_init(&r->res_mutex);
e7fd4179
DT
273
274 INIT_LIST_HEAD(&r->res_lookup);
275 INIT_LIST_HEAD(&r->res_grantqueue);
276 INIT_LIST_HEAD(&r->res_convertqueue);
277 INIT_LIST_HEAD(&r->res_waitqueue);
278 INIT_LIST_HEAD(&r->res_root_list);
279 INIT_LIST_HEAD(&r->res_recover_list);
280
281 return r;
282}
283
284static int search_rsb_list(struct list_head *head, char *name, int len,
285 unsigned int flags, struct dlm_rsb **r_ret)
286{
287 struct dlm_rsb *r;
288 int error = 0;
289
290 list_for_each_entry(r, head, res_hashchain) {
291 if (len == r->res_length && !memcmp(name, r->res_name, len))
292 goto found;
293 }
294 return -ENOENT;
295
296 found:
297 if (r->res_nodeid && (flags & R_MASTER))
298 error = -ENOTBLK;
299 *r_ret = r;
300 return error;
301}
302
303static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
304 unsigned int flags, struct dlm_rsb **r_ret)
305{
306 struct dlm_rsb *r;
307 int error;
308
309 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
310 if (!error) {
311 kref_get(&r->res_ref);
312 goto out;
313 }
314 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
315 if (error)
316 goto out;
317
318 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
319
320 if (dlm_no_directory(ls))
321 goto out;
322
323 if (r->res_nodeid == -1) {
324 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
325 r->res_first_lkid = 0;
326 } else if (r->res_nodeid > 0) {
327 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
328 r->res_first_lkid = 0;
329 } else {
330 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
331 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
332 }
333 out:
334 *r_ret = r;
335 return error;
336}
337
338static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
339 unsigned int flags, struct dlm_rsb **r_ret)
340{
341 int error;
342 write_lock(&ls->ls_rsbtbl[b].lock);
343 error = _search_rsb(ls, name, len, b, flags, r_ret);
344 write_unlock(&ls->ls_rsbtbl[b].lock);
345 return error;
346}
347
348/*
349 * Find rsb in rsbtbl and potentially create/add one
350 *
351 * Delaying the release of rsb's has a similar benefit to applications keeping
352 * NL locks on an rsb, but without the guarantee that the cached master value
353 * will still be valid when the rsb is reused. Apps aren't always smart enough
354 * to keep NL locks on an rsb that they may lock again shortly; this can lead
355 * to excessive master lookups and removals if we don't delay the release.
356 *
357 * Searching for an rsb means looking through both the normal list and toss
358 * list. When found on the toss list the rsb is moved to the normal list with
359 * ref count of 1; when found on normal list the ref count is incremented.
360 */
361
362static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
363 unsigned int flags, struct dlm_rsb **r_ret)
364{
365 struct dlm_rsb *r, *tmp;
366 uint32_t hash, bucket;
367 int error = 0;
368
369 if (dlm_no_directory(ls))
370 flags |= R_CREATE;
371
372 hash = jhash(name, namelen, 0);
373 bucket = hash & (ls->ls_rsbtbl_size - 1);
374
375 error = search_rsb(ls, name, namelen, bucket, flags, &r);
376 if (!error)
377 goto out;
378
379 if (error == -ENOENT && !(flags & R_CREATE))
380 goto out;
381
382 /* the rsb was found but wasn't a master copy */
383 if (error == -ENOTBLK)
384 goto out;
385
386 error = -ENOMEM;
387 r = create_rsb(ls, name, namelen);
388 if (!r)
389 goto out;
390
391 r->res_hash = hash;
392 r->res_bucket = bucket;
393 r->res_nodeid = -1;
394 kref_init(&r->res_ref);
395
396 /* With no directory, the master can be set immediately */
397 if (dlm_no_directory(ls)) {
398 int nodeid = dlm_dir_nodeid(r);
399 if (nodeid == dlm_our_nodeid())
400 nodeid = 0;
401 r->res_nodeid = nodeid;
402 }
403
404 write_lock(&ls->ls_rsbtbl[bucket].lock);
405 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
406 if (!error) {
407 write_unlock(&ls->ls_rsbtbl[bucket].lock);
408 free_rsb(r);
409 r = tmp;
410 goto out;
411 }
412 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
413 write_unlock(&ls->ls_rsbtbl[bucket].lock);
414 error = 0;
415 out:
416 *r_ret = r;
417 return error;
418}
419
/* Externally visible entry point for find_rsb(); arguments and result
   are passed straight through. */
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	int error = find_rsb(ls, name, namelen, flags, r_ret);

	return error;
}
425
426/* This is only called to add a reference when the code already holds
427 a valid reference to the rsb, so there's no need for locking. */
428
429static inline void hold_rsb(struct dlm_rsb *r)
430{
431 kref_get(&r->res_ref);
432}
433
434void dlm_hold_rsb(struct dlm_rsb *r)
435{
436 hold_rsb(r);
437}
438
439static void toss_rsb(struct kref *kref)
440{
441 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
442 struct dlm_ls *ls = r->res_ls;
443
444 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
445 kref_init(&r->res_ref);
446 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
447 r->res_toss_time = jiffies;
448 if (r->res_lvbptr) {
449 free_lvb(r->res_lvbptr);
450 r->res_lvbptr = NULL;
451 }
452}
453
454/* When all references to the rsb are gone it's transferred to
455 the tossed list for later disposal. */
456
457static void put_rsb(struct dlm_rsb *r)
458{
459 struct dlm_ls *ls = r->res_ls;
460 uint32_t bucket = r->res_bucket;
461
462 write_lock(&ls->ls_rsbtbl[bucket].lock);
463 kref_put(&r->res_ref, toss_rsb);
464 write_unlock(&ls->ls_rsbtbl[bucket].lock);
465}
466
/* Externally visible form of put_rsb(): drop one reference on the
   rsb. */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
	return;
}
471
472/* See comment for unhold_lkb */
473
474static void unhold_rsb(struct dlm_rsb *r)
475{
476 int rv;
477 rv = kref_put(&r->res_ref, toss_rsb);
478 DLM_ASSERT(!rv, dlm_print_rsb(r););
479}
480
481static void kill_rsb(struct kref *kref)
482{
483 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
484
485 /* All work is done after the return from kref_put() so we
486 can release the write_lock before the remove and free. */
487
488 DLM_ASSERT(list_empty(&r->res_lookup),);
489 DLM_ASSERT(list_empty(&r->res_grantqueue),);
490 DLM_ASSERT(list_empty(&r->res_convertqueue),);
491 DLM_ASSERT(list_empty(&r->res_waitqueue),);
492 DLM_ASSERT(list_empty(&r->res_root_list),);
493 DLM_ASSERT(list_empty(&r->res_recover_list),);
494}
495
496/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
497 The rsb must exist as long as any lkb's for it do. */
498
499static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
500{
501 hold_rsb(r);
502 lkb->lkb_resource = r;
503}
504
505static void detach_lkb(struct dlm_lkb *lkb)
506{
507 if (lkb->lkb_resource) {
508 put_rsb(lkb->lkb_resource);
509 lkb->lkb_resource = NULL;
510 }
511}
512
513static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
514{
515 struct dlm_lkb *lkb, *tmp;
516 uint32_t lkid = 0;
517 uint16_t bucket;
518
519 lkb = allocate_lkb(ls);
520 if (!lkb)
521 return -ENOMEM;
522
523 lkb->lkb_nodeid = -1;
524 lkb->lkb_grmode = DLM_LOCK_IV;
525 kref_init(&lkb->lkb_ref);
526
527 get_random_bytes(&bucket, sizeof(bucket));
528 bucket &= (ls->ls_lkbtbl_size - 1);
529
530 write_lock(&ls->ls_lkbtbl[bucket].lock);
531
532 /* counter can roll over so we must verify lkid is not in use */
533
534 while (lkid == 0) {
535 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
536
537 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
538 lkb_idtbl_list) {
539 if (tmp->lkb_id != lkid)
540 continue;
541 lkid = 0;
542 break;
543 }
544 }
545
546 lkb->lkb_id = lkid;
547 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
548 write_unlock(&ls->ls_lkbtbl[bucket].lock);
549
550 *lkb_ret = lkb;
551 return 0;
552}
553
554static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
555{
556 uint16_t bucket = lkid & 0xFFFF;
557 struct dlm_lkb *lkb;
558
559 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
560 if (lkb->lkb_id == lkid)
561 return lkb;
562 }
563 return NULL;
564}
565
566static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
567{
568 struct dlm_lkb *lkb;
569 uint16_t bucket = lkid & 0xFFFF;
570
571 if (bucket >= ls->ls_lkbtbl_size)
572 return -EBADSLT;
573
574 read_lock(&ls->ls_lkbtbl[bucket].lock);
575 lkb = __find_lkb(ls, lkid);
576 if (lkb)
577 kref_get(&lkb->lkb_ref);
578 read_unlock(&ls->ls_lkbtbl[bucket].lock);
579
580 *lkb_ret = lkb;
581 return lkb ? 0 : -ENOENT;
582}
583
584static void kill_lkb(struct kref *kref)
585{
586 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
587
588 /* All work is done after the return from kref_put() so we
589 can release the write_lock before the detach_lkb */
590
591 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
592}
593
b3f58d8f
DT
594/* __put_lkb() is used when an lkb may not have an rsb attached to
595 it so we need to provide the lockspace explicitly */
596
597static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
e7fd4179 598{
e7fd4179
DT
599 uint16_t bucket = lkb->lkb_id & 0xFFFF;
600
601 write_lock(&ls->ls_lkbtbl[bucket].lock);
602 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
603 list_del(&lkb->lkb_idtbl_list);
604 write_unlock(&ls->ls_lkbtbl[bucket].lock);
605
606 detach_lkb(lkb);
607
608 /* for local/process lkbs, lvbptr points to caller's lksb */
609 if (lkb->lkb_lvbptr && is_master_copy(lkb))
610 free_lvb(lkb->lkb_lvbptr);
e7fd4179
DT
611 free_lkb(lkb);
612 return 1;
613 } else {
614 write_unlock(&ls->ls_lkbtbl[bucket].lock);
615 return 0;
616 }
617}
618
619int dlm_put_lkb(struct dlm_lkb *lkb)
620{
b3f58d8f
DT
621 struct dlm_ls *ls;
622
623 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
624 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
625
626 ls = lkb->lkb_resource->res_ls;
627 return __put_lkb(ls, lkb);
e7fd4179
DT
628}
629
630/* This is only called to add a reference when the code already holds
631 a valid reference to the lkb, so there's no need for locking. */
632
633static inline void hold_lkb(struct dlm_lkb *lkb)
634{
635 kref_get(&lkb->lkb_ref);
636}
637
638/* This is called when we need to remove a reference and are certain
639 it's not the last ref. e.g. del_lkb is always called between a
640 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
641 put_lkb would work fine, but would involve unnecessary locking */
642
643static inline void unhold_lkb(struct dlm_lkb *lkb)
644{
645 int rv;
646 rv = kref_put(&lkb->lkb_ref, kill_lkb);
647 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
648}
649
650static void lkb_add_ordered(struct list_head *new, struct list_head *head,
651 int mode)
652{
653 struct dlm_lkb *lkb = NULL;
654
655 list_for_each_entry(lkb, head, lkb_statequeue)
656 if (lkb->lkb_rqmode < mode)
657 break;
658
659 if (!lkb)
660 list_add_tail(new, head);
661 else
662 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
663}
664
665/* add/remove lkb to rsb's grant/convert/wait queue */
666
667static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
668{
669 kref_get(&lkb->lkb_ref);
670
671 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
672
673 lkb->lkb_status = status;
674
675 switch (status) {
676 case DLM_LKSTS_WAITING:
677 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
678 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
679 else
680 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
681 break;
682 case DLM_LKSTS_GRANTED:
683 /* convention says granted locks kept in order of grmode */
684 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
685 lkb->lkb_grmode);
686 break;
687 case DLM_LKSTS_CONVERT:
688 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
689 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
690 else
691 list_add_tail(&lkb->lkb_statequeue,
692 &r->res_convertqueue);
693 break;
694 default:
695 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
696 }
697}
698
699static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
700{
701 lkb->lkb_status = 0;
702 list_del(&lkb->lkb_statequeue);
703 unhold_lkb(lkb);
704}
705
706static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
707{
708 hold_lkb(lkb);
709 del_lkb(r, lkb);
710 add_lkb(r, lkb, sts);
711 unhold_lkb(lkb);
712}
713
714/* add/remove lkb from global waiters list of lkb's waiting for
715 a reply from a remote node */
716
717static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
718{
719 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
720
90135925 721 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
722 if (lkb->lkb_wait_type) {
723 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
724 goto out;
725 }
726 lkb->lkb_wait_type = mstype;
727 kref_get(&lkb->lkb_ref);
728 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
729 out:
90135925 730 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
731}
732
733static int _remove_from_waiters(struct dlm_lkb *lkb)
734{
735 int error = 0;
736
737 if (!lkb->lkb_wait_type) {
738 log_print("remove_from_waiters error");
739 error = -EINVAL;
740 goto out;
741 }
742 lkb->lkb_wait_type = 0;
743 list_del(&lkb->lkb_wait_reply);
744 unhold_lkb(lkb);
745 out:
746 return error;
747}
748
749static int remove_from_waiters(struct dlm_lkb *lkb)
750{
751 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
752 int error;
753
90135925 754 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179 755 error = _remove_from_waiters(lkb);
90135925 756 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
757 return error;
758}
759
760static void dir_remove(struct dlm_rsb *r)
761{
762 int to_nodeid;
763
764 if (dlm_no_directory(r->res_ls))
765 return;
766
767 to_nodeid = dlm_dir_nodeid(r);
768 if (to_nodeid != dlm_our_nodeid())
769 send_remove(r);
770 else
771 dlm_dir_remove_entry(r->res_ls, to_nodeid,
772 r->res_name, r->res_length);
773}
774
775/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
776 found since they are in order of newest to oldest? */
777
778static int shrink_bucket(struct dlm_ls *ls, int b)
779{
780 struct dlm_rsb *r;
781 int count = 0, found;
782
783 for (;;) {
90135925 784 found = 0;
e7fd4179
DT
785 write_lock(&ls->ls_rsbtbl[b].lock);
786 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
787 res_hashchain) {
788 if (!time_after_eq(jiffies, r->res_toss_time +
789 dlm_config.toss_secs * HZ))
790 continue;
90135925 791 found = 1;
e7fd4179
DT
792 break;
793 }
794
795 if (!found) {
796 write_unlock(&ls->ls_rsbtbl[b].lock);
797 break;
798 }
799
800 if (kref_put(&r->res_ref, kill_rsb)) {
801 list_del(&r->res_hashchain);
802 write_unlock(&ls->ls_rsbtbl[b].lock);
803
804 if (is_master(r))
805 dir_remove(r);
806 free_rsb(r);
807 count++;
808 } else {
809 write_unlock(&ls->ls_rsbtbl[b].lock);
810 log_error(ls, "tossed rsb in use %s", r->res_name);
811 }
812 }
813
814 return count;
815}
816
817void dlm_scan_rsbs(struct dlm_ls *ls)
818{
819 int i;
820
821 if (dlm_locking_stopped(ls))
822 return;
823
824 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
825 shrink_bucket(ls, i);
826 cond_resched();
827 }
828}
829
830/* lkb is master or local copy */
831
832static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
833{
834 int b, len = r->res_ls->ls_lvblen;
835
836 /* b=1 lvb returned to caller
837 b=0 lvb written to rsb or invalidated
838 b=-1 do nothing */
839
840 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
841
842 if (b == 1) {
843 if (!lkb->lkb_lvbptr)
844 return;
845
846 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
847 return;
848
849 if (!r->res_lvbptr)
850 return;
851
852 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
853 lkb->lkb_lvbseq = r->res_lvbseq;
854
855 } else if (b == 0) {
856 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
857 rsb_set_flag(r, RSB_VALNOTVALID);
858 return;
859 }
860
861 if (!lkb->lkb_lvbptr)
862 return;
863
864 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
865 return;
866
867 if (!r->res_lvbptr)
868 r->res_lvbptr = allocate_lvb(r->res_ls);
869
870 if (!r->res_lvbptr)
871 return;
872
873 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
874 r->res_lvbseq++;
875 lkb->lkb_lvbseq = r->res_lvbseq;
876 rsb_clear_flag(r, RSB_VALNOTVALID);
877 }
878
879 if (rsb_flag(r, RSB_VALNOTVALID))
880 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
881}
882
883static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
884{
885 if (lkb->lkb_grmode < DLM_LOCK_PW)
886 return;
887
888 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
889 rsb_set_flag(r, RSB_VALNOTVALID);
890 return;
891 }
892
893 if (!lkb->lkb_lvbptr)
894 return;
895
896 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
897 return;
898
899 if (!r->res_lvbptr)
900 r->res_lvbptr = allocate_lvb(r->res_ls);
901
902 if (!r->res_lvbptr)
903 return;
904
905 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
906 r->res_lvbseq++;
907 rsb_clear_flag(r, RSB_VALNOTVALID);
908}
909
910/* lkb is process copy (pc) */
911
912static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
913 struct dlm_message *ms)
914{
915 int b;
916
917 if (!lkb->lkb_lvbptr)
918 return;
919
920 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
921 return;
922
923 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
924 if (b == 1) {
925 int len = receive_extralen(ms);
926 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
927 lkb->lkb_lvbseq = ms->m_lvbseq;
928 }
929}
930
931/* Manipulate lkb's on rsb's convert/granted/waiting queues
932 remove_lock -- used for unlock, removes lkb from granted
933 revert_lock -- used for cancel, moves lkb from convert to granted
934 grant_lock -- used for request and convert, adds lkb to granted or
935 moves lkb from convert or waiting to granted
936
937 Each of these is used for master or local copy lkb's. There is
938 also a _pc() variation used to make the corresponding change on
939 a process copy (pc) lkb. */
940
941static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
942{
943 del_lkb(r, lkb);
944 lkb->lkb_grmode = DLM_LOCK_IV;
945 /* this unhold undoes the original ref from create_lkb()
946 so this leads to the lkb being freed */
947 unhold_lkb(lkb);
948}
949
/* Unlock on a master or local copy: write back the lvb first, then
   remove the lkb from the rsb. */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);

	_remove_lock(r, lkb);
}
955
/* Unlock on a process copy: remove the lkb from the rsb without any
   lvb write-back. */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
	return;
}
960
961static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
962{
963 lkb->lkb_rqmode = DLM_LOCK_IV;
964
965 switch (lkb->lkb_status) {
966 case DLM_LKSTS_CONVERT:
967 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
968 break;
969 case DLM_LKSTS_WAITING:
970 del_lkb(r, lkb);
971 lkb->lkb_grmode = DLM_LOCK_IV;
972 /* this unhold undoes the original ref from create_lkb()
973 so this leads to the lkb being freed */
974 unhold_lkb(lkb);
975 break;
976 default:
977 log_print("invalid status for revert %d", lkb->lkb_status);
978 }
979}
980
/* Process-copy form of revert_lock(); the work is identical. */
static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	revert_lock(r, lkb);
	return;
}
985
986static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
987{
988 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
989 lkb->lkb_grmode = lkb->lkb_rqmode;
990 if (lkb->lkb_status)
991 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
992 else
993 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
994 }
995
996 lkb->lkb_rqmode = DLM_LOCK_IV;
e7fd4179
DT
997}
998
999static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1000{
1001 set_lvb_lock(r, lkb);
1002 _grant_lock(r, lkb);
1003 lkb->lkb_highbast = 0;
1004}
1005
/* Grant on a process copy: take the lvb from the received message,
   then grant the lock. */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);

	_grant_lock(r, lkb);
}
1012
1013/* called by grant_pending_locks() which means an async grant message must
1014 be sent to the requesting node in addition to granting the lock if the
1015 lkb belongs to a remote node. */
1016
/* Grant a previously queued lock and notify its owner: a master copy
   gets a grant message sent to the requesting node, any other lkb gets
   a completion ast with status 0. */
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);

	if (!is_master_copy(lkb)) {
		queue_cast(r, lkb, 0);
		return;
	}

	send_grant(r, lkb);
}
1025
1026static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1027{
1028 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1029 lkb_statequeue);
1030 if (lkb->lkb_id == first->lkb_id)
90135925 1031 return 1;
e7fd4179 1032
90135925 1033 return 0;
e7fd4179
DT
1034}
1035
e7fd4179
DT
1036/* Check if the given lkb conflicts with another lkb on the queue. */
1037
1038static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1039{
1040 struct dlm_lkb *this;
1041
1042 list_for_each_entry(this, head, lkb_statequeue) {
1043 if (this == lkb)
1044 continue;
3bcd3687 1045 if (!modes_compat(this, lkb))
90135925 1046 return 1;
e7fd4179 1047 }
90135925 1048 return 0;
e7fd4179
DT
1049}
1050
1051/*
1052 * "A conversion deadlock arises with a pair of lock requests in the converting
1053 * queue for one resource. The granted mode of each lock blocks the requested
1054 * mode of the other lock."
1055 *
1056 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1057 * convert queue from being granted, then demote lkb (set grmode to NL).
1058 * This second form requires that we check for conv-deadlk even when
1059 * now == 0 in _can_be_granted().
1060 *
1061 * Example:
1062 * Granted Queue: empty
1063 * Convert Queue: NL->EX (first lock)
1064 * PR->EX (second lock)
1065 *
1066 * The first lock can't be granted because of the granted mode of the second
1067 * lock and the second lock can't be granted because it's not first in the
1068 * list. We demote the granted mode of the second lock (the lkb passed to this
1069 * function).
1070 *
1071 * After the resolution, the "grant pending" function needs to go back and try
1072 * to grant locks on the convert queue again since the first lock can now be
1073 * granted.
1074 */
1075
1076static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1077{
1078 struct dlm_lkb *this, *first = NULL, *self = NULL;
1079
1080 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1081 if (!first)
1082 first = this;
1083 if (this == lkb) {
1084 self = lkb;
1085 continue;
1086 }
1087
e7fd4179 1088 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
90135925 1089 return 1;
e7fd4179
DT
1090 }
1091
1092 /* if lkb is on the convert queue and is preventing the first
1093 from being granted, then there's deadlock and we demote lkb.
1094 multiple converting locks may need to do this before the first
1095 converting lock can be granted. */
1096
1097 if (self && self != first) {
1098 if (!modes_compat(lkb, first) &&
1099 !queue_conflict(&rsb->res_grantqueue, first))
90135925 1100 return 1;
e7fd4179
DT
1101 }
1102
90135925 1103 return 0;
e7fd4179
DT
1104}
1105
1106/*
1107 * Return 1 if the lock can be granted, 0 otherwise.
1108 * Also detect and resolve conversion deadlocks.
1109 *
1110 * lkb is the lock to be granted
1111 *
1112 * now is 1 if the function is being called in the context of the
1113 * immediate request, it is 0 if called later, after the lock has been
1114 * queued.
1115 *
1116 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1117 */
1118
1119static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1120{
1121 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1122
1123 /*
1124 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1125 * a new request for a NL mode lock being blocked.
1126 *
1127 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1128 * request, then it would be granted. In essence, the use of this flag
1129 * tells the Lock Manager to expedite this request by not considering
1130 * what may be in the CONVERTING or WAITING queues... As of this
1131 * writing, the EXPEDITE flag can be used only with new requests for NL
1132 * mode locks. This flag is not valid for conversion requests.
1133 *
1134 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1135 * conversion or used with a non-NL requested mode. We also know an
1136 * EXPEDITE request is always granted immediately, so now must always
1137 * be 1. The full condition to grant an expedite request: (now &&
1138 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1139 * therefore be shortened to just checking the flag.
1140 */
1141
1142 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
90135925 1143 return 1;
e7fd4179
DT
1144
1145 /*
1146 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1147 * added to the remaining conditions.
1148 */
1149
1150 if (queue_conflict(&r->res_grantqueue, lkb))
1151 goto out;
1152
1153 /*
1154 * 6-3: By default, a conversion request is immediately granted if the
1155 * requested mode is compatible with the modes of all other granted
1156 * locks
1157 */
1158
1159 if (queue_conflict(&r->res_convertqueue, lkb))
1160 goto out;
1161
1162 /*
1163 * 6-5: But the default algorithm for deciding whether to grant or
1164 * queue conversion requests does not by itself guarantee that such
1165 * requests are serviced on a "first come first serve" basis. This, in
1166 * turn, can lead to a phenomenon known as "indefinate postponement".
1167 *
1168 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1169 * the system service employed to request a lock conversion. This flag
1170 * forces certain conversion requests to be queued, even if they are
1171 * compatible with the granted modes of other locks on the same
1172 * resource. Thus, the use of this flag results in conversion requests
1173 * being ordered on a "first come first servce" basis.
1174 *
1175 * DCT: This condition is all about new conversions being able to occur
1176 * "in place" while the lock remains on the granted queue (assuming
1177 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1178 * doesn't _have_ to go onto the convert queue where it's processed in
1179 * order. The "now" variable is necessary to distinguish converts
1180 * being received and processed for the first time now, because once a
1181 * convert is moved to the conversion queue the condition below applies
1182 * requiring fifo granting.
1183 */
1184
1185 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
90135925 1186 return 1;
e7fd4179
DT
1187
1188 /*
3bcd3687
DT
1189 * The NOORDER flag is set to avoid the standard vms rules on grant
1190 * order.
e7fd4179
DT
1191 */
1192
1193 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
90135925 1194 return 1;
e7fd4179
DT
1195
1196 /*
1197 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1198 * granted until all other conversion requests ahead of it are granted
1199 * and/or canceled.
1200 */
1201
1202 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
90135925 1203 return 1;
e7fd4179
DT
1204
1205 /*
1206 * 6-4: By default, a new request is immediately granted only if all
1207 * three of the following conditions are satisfied when the request is
1208 * issued:
1209 * - The queue of ungranted conversion requests for the resource is
1210 * empty.
1211 * - The queue of ungranted new requests for the resource is empty.
1212 * - The mode of the new request is compatible with the most
1213 * restrictive mode of all granted locks on the resource.
1214 */
1215
1216 if (now && !conv && list_empty(&r->res_convertqueue) &&
1217 list_empty(&r->res_waitqueue))
90135925 1218 return 1;
e7fd4179
DT
1219
1220 /*
1221 * 6-4: Once a lock request is in the queue of ungranted new requests,
1222 * it cannot be granted until the queue of ungranted conversion
1223 * requests is empty, all ungranted new requests ahead of it are
1224 * granted and/or canceled, and it is compatible with the granted mode
1225 * of the most restrictive lock granted on the resource.
1226 */
1227
1228 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1229 first_in_list(lkb, &r->res_waitqueue))
90135925 1230 return 1;
e7fd4179
DT
1231
1232 out:
1233 /*
1234 * The following, enabled by CONVDEADLK, departs from VMS.
1235 */
1236
1237 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1238 conversion_deadlock_detect(r, lkb)) {
1239 lkb->lkb_grmode = DLM_LOCK_NL;
1240 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1241 }
1242
90135925 1243 return 0;
e7fd4179
DT
1244}
1245
1246/*
1247 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1248 * simple way to provide a big optimization to applications that can use them.
1249 */
1250
1251static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1252{
1253 uint32_t flags = lkb->lkb_exflags;
1254 int rv;
1255 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1256
1257 rv = _can_be_granted(r, lkb, now);
1258 if (rv)
1259 goto out;
1260
1261 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1262 goto out;
1263
1264 if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1265 alt = DLM_LOCK_PR;
1266 else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1267 alt = DLM_LOCK_CW;
1268
1269 if (alt) {
1270 lkb->lkb_rqmode = alt;
1271 rv = _can_be_granted(r, lkb, now);
1272 if (rv)
1273 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1274 else
1275 lkb->lkb_rqmode = rqmode;
1276 }
1277 out:
1278 return rv;
1279}
1280
1281static int grant_pending_convert(struct dlm_rsb *r, int high)
1282{
1283 struct dlm_lkb *lkb, *s;
1284 int hi, demoted, quit, grant_restart, demote_restart;
1285
1286 quit = 0;
1287 restart:
1288 grant_restart = 0;
1289 demote_restart = 0;
1290 hi = DLM_LOCK_IV;
1291
1292 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1293 demoted = is_demoted(lkb);
90135925 1294 if (can_be_granted(r, lkb, 0)) {
e7fd4179
DT
1295 grant_lock_pending(r, lkb);
1296 grant_restart = 1;
1297 } else {
1298 hi = max_t(int, lkb->lkb_rqmode, hi);
1299 if (!demoted && is_demoted(lkb))
1300 demote_restart = 1;
1301 }
1302 }
1303
1304 if (grant_restart)
1305 goto restart;
1306 if (demote_restart && !quit) {
1307 quit = 1;
1308 goto restart;
1309 }
1310
1311 return max_t(int, high, hi);
1312}
1313
1314static int grant_pending_wait(struct dlm_rsb *r, int high)
1315{
1316 struct dlm_lkb *lkb, *s;
1317
1318 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
90135925 1319 if (can_be_granted(r, lkb, 0))
e7fd4179
DT
1320 grant_lock_pending(r, lkb);
1321 else
1322 high = max_t(int, lkb->lkb_rqmode, high);
1323 }
1324
1325 return high;
1326}
1327
1328static void grant_pending_locks(struct dlm_rsb *r)
1329{
1330 struct dlm_lkb *lkb, *s;
1331 int high = DLM_LOCK_IV;
1332
1333 DLM_ASSERT(is_master(r), dlm_print_rsb(r););
1334
1335 high = grant_pending_convert(r, high);
1336 high = grant_pending_wait(r, high);
1337
1338 if (high == DLM_LOCK_IV)
1339 return;
1340
1341 /*
1342 * If there are locks left on the wait/convert queue then send blocking
1343 * ASTs to granted locks based on the largest requested mode (high)
3bcd3687 1344 * found above. FIXME: highbast < high comparison not valid for PR/CW.
e7fd4179
DT
1345 */
1346
1347 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1348 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1349 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1350 queue_bast(r, lkb, high);
1351 lkb->lkb_highbast = high;
1352 }
1353 }
1354}
1355
1356static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1357 struct dlm_lkb *lkb)
1358{
1359 struct dlm_lkb *gr;
1360
1361 list_for_each_entry(gr, head, lkb_statequeue) {
1362 if (gr->lkb_bastaddr &&
1363 gr->lkb_highbast < lkb->lkb_rqmode &&
3bcd3687 1364 !modes_compat(gr, lkb)) {
e7fd4179
DT
1365 queue_bast(r, gr, lkb->lkb_rqmode);
1366 gr->lkb_highbast = lkb->lkb_rqmode;
1367 }
1368 }
1369}
1370
1371static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1372{
1373 send_bast_queue(r, &r->res_grantqueue, lkb);
1374}
1375
1376static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1377{
1378 send_bast_queue(r, &r->res_grantqueue, lkb);
1379 send_bast_queue(r, &r->res_convertqueue, lkb);
1380}
1381
1382/* set_master(r, lkb) -- set the master nodeid of a resource
1383
1384 The purpose of this function is to set the nodeid field in the given
1385 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1386 known, it can just be copied to the lkb and the function will return
1387 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1388 before it can be copied to the lkb.
1389
1390 When the rsb nodeid is being looked up remotely, the initial lkb
1391 causing the lookup is kept on the ls_waiters list waiting for the
1392 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1393 on the rsb's res_lookup list until the master is verified.
1394
1395 Return values:
1396 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1397 1: the rsb master is not available and the lkb has been placed on
1398 a wait queue
1399*/
1400
1401static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1402{
1403 struct dlm_ls *ls = r->res_ls;
1404 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1405
1406 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1407 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1408 r->res_first_lkid = lkb->lkb_id;
1409 lkb->lkb_nodeid = r->res_nodeid;
1410 return 0;
1411 }
1412
1413 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1414 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1415 return 1;
1416 }
1417
1418 if (r->res_nodeid == 0) {
1419 lkb->lkb_nodeid = 0;
1420 return 0;
1421 }
1422
1423 if (r->res_nodeid > 0) {
1424 lkb->lkb_nodeid = r->res_nodeid;
1425 return 0;
1426 }
1427
1428 DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););
1429
1430 dir_nodeid = dlm_dir_nodeid(r);
1431
1432 if (dir_nodeid != our_nodeid) {
1433 r->res_first_lkid = lkb->lkb_id;
1434 send_lookup(r, lkb);
1435 return 1;
1436 }
1437
1438 for (;;) {
1439 /* It's possible for dlm_scand to remove an old rsb for
1440 this same resource from the toss list, us to create
1441 a new one, look up the master locally, and find it
1442 already exists just before dlm_scand does the
1443 dir_remove() on the previous rsb. */
1444
1445 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1446 r->res_length, &ret_nodeid);
1447 if (!error)
1448 break;
1449 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1450 schedule();
1451 }
1452
1453 if (ret_nodeid == our_nodeid) {
1454 r->res_first_lkid = 0;
1455 r->res_nodeid = 0;
1456 lkb->lkb_nodeid = 0;
1457 } else {
1458 r->res_first_lkid = lkb->lkb_id;
1459 r->res_nodeid = ret_nodeid;
1460 lkb->lkb_nodeid = ret_nodeid;
1461 }
1462 return 0;
1463}
1464
1465static void process_lookup_list(struct dlm_rsb *r)
1466{
1467 struct dlm_lkb *lkb, *safe;
1468
1469 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1470 list_del(&lkb->lkb_rsb_lookup);
1471 _request_lock(r, lkb);
1472 schedule();
1473 }
1474}
1475
1476/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1477
1478static void confirm_master(struct dlm_rsb *r, int error)
1479{
1480 struct dlm_lkb *lkb;
1481
1482 if (!r->res_first_lkid)
1483 return;
1484
1485 switch (error) {
1486 case 0:
1487 case -EINPROGRESS:
1488 r->res_first_lkid = 0;
1489 process_lookup_list(r);
1490 break;
1491
1492 case -EAGAIN:
1493 /* the remote master didn't queue our NOQUEUE request;
1494 make a waiting lkb the first_lkid */
1495
1496 r->res_first_lkid = 0;
1497
1498 if (!list_empty(&r->res_lookup)) {
1499 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1500 lkb_rsb_lookup);
1501 list_del(&lkb->lkb_rsb_lookup);
1502 r->res_first_lkid = lkb->lkb_id;
1503 _request_lock(r, lkb);
1504 } else
1505 r->res_nodeid = -1;
1506 break;
1507
1508 default:
1509 log_error(r->res_ls, "confirm_master unknown error %d", error);
1510 }
1511}
1512
1513static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1514 int namelen, uint32_t parent_lkid, void *ast,
3bcd3687 1515 void *astarg, void *bast, struct dlm_args *args)
e7fd4179
DT
1516{
1517 int rv = -EINVAL;
1518
1519 /* check for invalid arg usage */
1520
1521 if (mode < 0 || mode > DLM_LOCK_EX)
1522 goto out;
1523
1524 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1525 goto out;
1526
1527 if (flags & DLM_LKF_CANCEL)
1528 goto out;
1529
1530 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1531 goto out;
1532
1533 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1534 goto out;
1535
1536 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1537 goto out;
1538
1539 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1540 goto out;
1541
1542 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1543 goto out;
1544
1545 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1546 goto out;
1547
1548 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1549 goto out;
1550
1551 if (!ast || !lksb)
1552 goto out;
1553
1554 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1555 goto out;
1556
1557 /* parent/child locks not yet supported */
1558 if (parent_lkid)
1559 goto out;
1560
1561 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1562 goto out;
1563
1564 /* these args will be copied to the lkb in validate_lock_args,
1565 it cannot be done now because when converting locks, fields in
1566 an active lkb cannot be modified before locking the rsb */
1567
1568 args->flags = flags;
1569 args->astaddr = ast;
1570 args->astparam = (long) astarg;
1571 args->bastaddr = bast;
1572 args->mode = mode;
1573 args->lksb = lksb;
e7fd4179
DT
1574 rv = 0;
1575 out:
1576 return rv;
1577}
1578
1579static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1580{
1581 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1582 DLM_LKF_FORCEUNLOCK))
1583 return -EINVAL;
1584
1585 args->flags = flags;
1586 args->astparam = (long) astarg;
1587 return 0;
1588}
1589
1590static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1591 struct dlm_args *args)
1592{
1593 int rv = -EINVAL;
1594
1595 if (args->flags & DLM_LKF_CONVERT) {
1596 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1597 goto out;
1598
1599 if (args->flags & DLM_LKF_QUECVT &&
1600 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1601 goto out;
1602
1603 rv = -EBUSY;
1604 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1605 goto out;
1606
1607 if (lkb->lkb_wait_type)
1608 goto out;
1609 }
1610
1611 lkb->lkb_exflags = args->flags;
1612 lkb->lkb_sbflags = 0;
1613 lkb->lkb_astaddr = args->astaddr;
1614 lkb->lkb_astparam = args->astparam;
1615 lkb->lkb_bastaddr = args->bastaddr;
1616 lkb->lkb_rqmode = args->mode;
1617 lkb->lkb_lksb = args->lksb;
1618 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1619 lkb->lkb_ownpid = (int) current->pid;
e7fd4179
DT
1620 rv = 0;
1621 out:
1622 return rv;
1623}
1624
1625static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1626{
1627 int rv = -EINVAL;
1628
1629 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1630 goto out;
1631
1632 if (args->flags & DLM_LKF_FORCEUNLOCK)
1633 goto out_ok;
1634
1635 if (args->flags & DLM_LKF_CANCEL &&
1636 lkb->lkb_status == DLM_LKSTS_GRANTED)
1637 goto out;
1638
1639 if (!(args->flags & DLM_LKF_CANCEL) &&
1640 lkb->lkb_status != DLM_LKSTS_GRANTED)
1641 goto out;
1642
1643 rv = -EBUSY;
1644 if (lkb->lkb_wait_type)
1645 goto out;
1646
1647 out_ok:
1648 lkb->lkb_exflags = args->flags;
1649 lkb->lkb_sbflags = 0;
1650 lkb->lkb_astparam = args->astparam;
1651
1652 rv = 0;
1653 out:
1654 return rv;
1655}
1656
1657/*
1658 * Four stage 4 varieties:
1659 * do_request(), do_convert(), do_unlock(), do_cancel()
1660 * These are called on the master node for the given lock and
1661 * from the central locking logic.
1662 */
1663
1664static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1665{
1666 int error = 0;
1667
90135925 1668 if (can_be_granted(r, lkb, 1)) {
e7fd4179
DT
1669 grant_lock(r, lkb);
1670 queue_cast(r, lkb, 0);
1671 goto out;
1672 }
1673
1674 if (can_be_queued(lkb)) {
1675 error = -EINPROGRESS;
1676 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1677 send_blocking_asts(r, lkb);
1678 goto out;
1679 }
1680
1681 error = -EAGAIN;
1682 if (force_blocking_asts(lkb))
1683 send_blocking_asts_all(r, lkb);
1684 queue_cast(r, lkb, -EAGAIN);
1685
1686 out:
1687 return error;
1688}
1689
1690static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1691{
1692 int error = 0;
1693
1694 /* changing an existing lock may allow others to be granted */
1695
90135925 1696 if (can_be_granted(r, lkb, 1)) {
e7fd4179
DT
1697 grant_lock(r, lkb);
1698 queue_cast(r, lkb, 0);
1699 grant_pending_locks(r);
1700 goto out;
1701 }
1702
1703 if (can_be_queued(lkb)) {
1704 if (is_demoted(lkb))
1705 grant_pending_locks(r);
1706 error = -EINPROGRESS;
1707 del_lkb(r, lkb);
1708 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1709 send_blocking_asts(r, lkb);
1710 goto out;
1711 }
1712
1713 error = -EAGAIN;
1714 if (force_blocking_asts(lkb))
1715 send_blocking_asts_all(r, lkb);
1716 queue_cast(r, lkb, -EAGAIN);
1717
1718 out:
1719 return error;
1720}
1721
1722static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1723{
1724 remove_lock(r, lkb);
1725 queue_cast(r, lkb, -DLM_EUNLOCK);
1726 grant_pending_locks(r);
1727 return -DLM_EUNLOCK;
1728}
1729
1730static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1731{
1732 revert_lock(r, lkb);
1733 queue_cast(r, lkb, -DLM_ECANCEL);
1734 grant_pending_locks(r);
1735 return -DLM_ECANCEL;
1736}
1737
1738/*
1739 * Four stage 3 varieties:
1740 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1741 */
1742
1743/* add a new lkb to a possibly new rsb, called by requesting process */
1744
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* set_master: sets lkb nodeid from r */
	rv = set_master(r, lkb);
	if (rv < 0)
		return rv;

	/* positive: the lkb is waiting on the master, nothing to do yet */
	if (rv > 0)
		return 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	return do_request(r, lkb);
}
1767
3bcd3687 1768/* change some property of an existing lkb, e.g. mode */
e7fd4179
DT
1769
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	return do_convert(r, lkb);
}
1782
1783/* remove an existing lkb from the granted queue */
1784
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	return do_unlock(r, lkb);
}
1797
1798/* remove an existing lkb from the convert or wait queue */
1799
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	return do_cancel(r, lkb);
}
1812
1813/*
1814 * Four stage 2 varieties:
1815 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1816 */
1817
1818static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1819 int len, struct dlm_args *args)
1820{
1821 struct dlm_rsb *r;
1822 int error;
1823
1824 error = validate_lock_args(ls, lkb, args);
1825 if (error)
1826 goto out;
1827
1828 error = find_rsb(ls, name, len, R_CREATE, &r);
1829 if (error)
1830 goto out;
1831
1832 lock_rsb(r);
1833
1834 attach_lkb(r, lkb);
1835 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1836
1837 error = _request_lock(r, lkb);
1838
1839 unlock_rsb(r);
1840 put_rsb(r);
1841
1842 out:
1843 return error;
1844}
1845
1846static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1847 struct dlm_args *args)
1848{
1849 struct dlm_rsb *r;
1850 int error;
1851
1852 r = lkb->lkb_resource;
1853
1854 hold_rsb(r);
1855 lock_rsb(r);
1856
1857 error = validate_lock_args(ls, lkb, args);
1858 if (error)
1859 goto out;
1860
1861 error = _convert_lock(r, lkb);
1862 out:
1863 unlock_rsb(r);
1864 put_rsb(r);
1865 return error;
1866}
1867
1868static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1869 struct dlm_args *args)
1870{
1871 struct dlm_rsb *r;
1872 int error;
1873
1874 r = lkb->lkb_resource;
1875
1876 hold_rsb(r);
1877 lock_rsb(r);
1878
1879 error = validate_unlock_args(lkb, args);
1880 if (error)
1881 goto out;
1882
1883 error = _unlock_lock(r, lkb);
1884 out:
1885 unlock_rsb(r);
1886 put_rsb(r);
1887 return error;
1888}
1889
1890static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1891 struct dlm_args *args)
1892{
1893 struct dlm_rsb *r;
1894 int error;
1895
1896 r = lkb->lkb_resource;
1897
1898 hold_rsb(r);
1899 lock_rsb(r);
1900
1901 error = validate_unlock_args(lkb, args);
1902 if (error)
1903 goto out;
1904
1905 error = _cancel_lock(r, lkb);
1906 out:
1907 unlock_rsb(r);
1908 put_rsb(r);
1909 return error;
1910}
1911
1912/*
1913 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
1914 */
1915
1916int dlm_lock(dlm_lockspace_t *lockspace,
1917 int mode,
1918 struct dlm_lksb *lksb,
1919 uint32_t flags,
1920 void *name,
1921 unsigned int namelen,
1922 uint32_t parent_lkid,
1923 void (*ast) (void *astarg),
1924 void *astarg,
3bcd3687 1925 void (*bast) (void *astarg, int mode))
e7fd4179
DT
1926{
1927 struct dlm_ls *ls;
1928 struct dlm_lkb *lkb;
1929 struct dlm_args args;
1930 int error, convert = flags & DLM_LKF_CONVERT;
1931
1932 ls = dlm_find_lockspace_local(lockspace);
1933 if (!ls)
1934 return -EINVAL;
1935
1936 lock_recovery(ls);
1937
1938 if (convert)
1939 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1940 else
1941 error = create_lkb(ls, &lkb);
1942
1943 if (error)
1944 goto out;
1945
1946 error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
3bcd3687 1947 astarg, bast, &args);
e7fd4179
DT
1948 if (error)
1949 goto out_put;
1950
1951 if (convert)
1952 error = convert_lock(ls, lkb, &args);
1953 else
1954 error = request_lock(ls, lkb, name, namelen, &args);
1955
1956 if (error == -EINPROGRESS)
1957 error = 0;
1958 out_put:
1959 if (convert || error)
b3f58d8f 1960 __put_lkb(ls, lkb);
e7fd4179
DT
1961 if (error == -EAGAIN)
1962 error = 0;
1963 out:
1964 unlock_recovery(ls);
1965 dlm_put_lockspace(ls);
1966 return error;
1967}
1968
1969int dlm_unlock(dlm_lockspace_t *lockspace,
1970 uint32_t lkid,
1971 uint32_t flags,
1972 struct dlm_lksb *lksb,
1973 void *astarg)
1974{
1975 struct dlm_ls *ls;
1976 struct dlm_lkb *lkb;
1977 struct dlm_args args;
1978 int error;
1979
1980 ls = dlm_find_lockspace_local(lockspace);
1981 if (!ls)
1982 return -EINVAL;
1983
1984 lock_recovery(ls);
1985
1986 error = find_lkb(ls, lkid, &lkb);
1987 if (error)
1988 goto out;
1989
1990 error = set_unlock_args(flags, astarg, &args);
1991 if (error)
1992 goto out_put;
1993
1994 if (flags & DLM_LKF_CANCEL)
1995 error = cancel_lock(ls, lkb, &args);
1996 else
1997 error = unlock_lock(ls, lkb, &args);
1998
1999 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2000 error = 0;
2001 out_put:
b3f58d8f 2002 dlm_put_lkb(lkb);
e7fd4179
DT
2003 out:
2004 unlock_recovery(ls);
2005 dlm_put_lockspace(ls);
2006 return error;
2007}
2008
2009/*
2010 * send/receive routines for remote operations and replies
2011 *
2012 * send_args
2013 * send_common
2014 * send_request receive_request
2015 * send_convert receive_convert
2016 * send_unlock receive_unlock
2017 * send_cancel receive_cancel
2018 * send_grant receive_grant
2019 * send_bast receive_bast
2020 * send_lookup receive_lookup
2021 * send_remove receive_remove
2022 *
2023 * send_common_reply
2024 * receive_request_reply send_request_reply
2025 * receive_convert_reply send_convert_reply
2026 * receive_unlock_reply send_unlock_reply
2027 * receive_cancel_reply send_cancel_reply
2028 * receive_lookup_reply send_lookup_reply
2029 */
2030
2031static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2032 int to_nodeid, int mstype,
2033 struct dlm_message **ms_ret,
2034 struct dlm_mhandle **mh_ret)
2035{
2036 struct dlm_message *ms;
2037 struct dlm_mhandle *mh;
2038 char *mb;
2039 int mb_len = sizeof(struct dlm_message);
2040
2041 switch (mstype) {
2042 case DLM_MSG_REQUEST:
2043 case DLM_MSG_LOOKUP:
2044 case DLM_MSG_REMOVE:
2045 mb_len += r->res_length;
2046 break;
2047 case DLM_MSG_CONVERT:
2048 case DLM_MSG_UNLOCK:
2049 case DLM_MSG_REQUEST_REPLY:
2050 case DLM_MSG_CONVERT_REPLY:
2051 case DLM_MSG_GRANT:
2052 if (lkb && lkb->lkb_lvbptr)
2053 mb_len += r->res_ls->ls_lvblen;
2054 break;
2055 }
2056
2057 /* get_buffer gives us a message handle (mh) that we need to
2058 pass into lowcomms_commit and a message buffer (mb) that we
2059 write our data into */
2060
2061 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2062 if (!mh)
2063 return -ENOBUFS;
2064
2065 memset(mb, 0, mb_len);
2066
2067 ms = (struct dlm_message *) mb;
2068
2069 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2070 ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2071 ms->m_header.h_nodeid = dlm_our_nodeid();
2072 ms->m_header.h_length = mb_len;
2073 ms->m_header.h_cmd = DLM_MSG;
2074
2075 ms->m_type = mstype;
2076
2077 *mh_ret = mh;
2078 *ms_ret = ms;
2079 return 0;
2080}
2081
2082/* further lowcomms enhancements or alternate implementations may make
2083 the return value from this function useful at some point */
2084
static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	/* run dlm_message_out() on the message, then commit its buffer */
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);

	/* nothing here can fail at present */
	return 0;
}
2091
2092static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2093 struct dlm_message *ms)
2094{
2095 ms->m_nodeid = lkb->lkb_nodeid;
2096 ms->m_pid = lkb->lkb_ownpid;
2097 ms->m_lkid = lkb->lkb_id;
2098 ms->m_remid = lkb->lkb_remid;
2099 ms->m_exflags = lkb->lkb_exflags;
2100 ms->m_sbflags = lkb->lkb_sbflags;
2101 ms->m_flags = lkb->lkb_flags;
2102 ms->m_lvbseq = lkb->lkb_lvbseq;
2103 ms->m_status = lkb->lkb_status;
2104 ms->m_grmode = lkb->lkb_grmode;
2105 ms->m_rqmode = lkb->lkb_rqmode;
2106 ms->m_hash = r->res_hash;
2107
2108 /* m_result and m_bastmode are set from function args,
2109 not from lkb fields */
2110
2111 if (lkb->lkb_bastaddr)
2112 ms->m_asts |= AST_BAST;
2113 if (lkb->lkb_astaddr)
2114 ms->m_asts |= AST_COMP;
2115
e7fd4179
DT
2116 if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2117 memcpy(ms->m_extra, r->res_name, r->res_length);
2118
2119 else if (lkb->lkb_lvbptr)
2120 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2121
2122}
2123
2124static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2125{
2126 struct dlm_message *ms;
2127 struct dlm_mhandle *mh;
2128 int to_nodeid, error;
2129
2130 add_to_waiters(lkb, mstype);
2131
2132 to_nodeid = r->res_nodeid;
2133
2134 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2135 if (error)
2136 goto fail;
2137
2138 send_args(r, lkb, ms);
2139
2140 error = send_message(mh, ms);
2141 if (error)
2142 goto fail;
2143 return 0;
2144
2145 fail:
2146 remove_from_waiters(lkb);
2147 return error;
2148}
2149
2150static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2151{
2152 return send_common(r, lkb, DLM_MSG_REQUEST);
2153}
2154
2155static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2156{
2157 int error;
2158
2159 error = send_common(r, lkb, DLM_MSG_CONVERT);
2160
2161 /* down conversions go without a reply from the master */
2162 if (!error && down_conversion(lkb)) {
2163 remove_from_waiters(lkb);
2164 r->res_ls->ls_stub_ms.m_result = 0;
2165 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2166 }
2167
2168 return error;
2169}
2170
2171/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2172 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2173 that the master is still correct. */
2174
2175static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2176{
2177 return send_common(r, lkb, DLM_MSG_UNLOCK);
2178}
2179
2180static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2181{
2182 return send_common(r, lkb, DLM_MSG_CANCEL);
2183}
2184
2185static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2186{
2187 struct dlm_message *ms;
2188 struct dlm_mhandle *mh;
2189 int to_nodeid, error;
2190
2191 to_nodeid = lkb->lkb_nodeid;
2192
2193 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2194 if (error)
2195 goto out;
2196
2197 send_args(r, lkb, ms);
2198
2199 ms->m_result = 0;
2200
2201 error = send_message(mh, ms);
2202 out:
2203 return error;
2204}
2205
2206static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2207{
2208 struct dlm_message *ms;
2209 struct dlm_mhandle *mh;
2210 int to_nodeid, error;
2211
2212 to_nodeid = lkb->lkb_nodeid;
2213
2214 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2215 if (error)
2216 goto out;
2217
2218 send_args(r, lkb, ms);
2219
2220 ms->m_bastmode = mode;
2221
2222 error = send_message(mh, ms);
2223 out:
2224 return error;
2225}
2226
2227static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2228{
2229 struct dlm_message *ms;
2230 struct dlm_mhandle *mh;
2231 int to_nodeid, error;
2232
2233 add_to_waiters(lkb, DLM_MSG_LOOKUP);
2234
2235 to_nodeid = dlm_dir_nodeid(r);
2236
2237 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2238 if (error)
2239 goto fail;
2240
2241 send_args(r, lkb, ms);
2242
2243 error = send_message(mh, ms);
2244 if (error)
2245 goto fail;
2246 return 0;
2247
2248 fail:
2249 remove_from_waiters(lkb);
2250 return error;
2251}
2252
2253static int send_remove(struct dlm_rsb *r)
2254{
2255 struct dlm_message *ms;
2256 struct dlm_mhandle *mh;
2257 int to_nodeid, error;
2258
2259 to_nodeid = dlm_dir_nodeid(r);
2260
2261 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2262 if (error)
2263 goto out;
2264
2265 memcpy(ms->m_extra, r->res_name, r->res_length);
2266 ms->m_hash = r->res_hash;
2267
2268 error = send_message(mh, ms);
2269 out:
2270 return error;
2271}
2272
2273static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2274 int mstype, int rv)
2275{
2276 struct dlm_message *ms;
2277 struct dlm_mhandle *mh;
2278 int to_nodeid, error;
2279
2280 to_nodeid = lkb->lkb_nodeid;
2281
2282 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2283 if (error)
2284 goto out;
2285
2286 send_args(r, lkb, ms);
2287
2288 ms->m_result = rv;
2289
2290 error = send_message(mh, ms);
2291 out:
2292 return error;
2293}
2294
2295static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2296{
2297 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2298}
2299
2300static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2301{
2302 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2303}
2304
2305static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2306{
2307 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2308}
2309
2310static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2311{
2312 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2313}
2314
2315static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2316 int ret_nodeid, int rv)
2317{
2318 struct dlm_rsb *r = &ls->ls_stub_rsb;
2319 struct dlm_message *ms;
2320 struct dlm_mhandle *mh;
2321 int error, nodeid = ms_in->m_header.h_nodeid;
2322
2323 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2324 if (error)
2325 goto out;
2326
2327 ms->m_lkid = ms_in->m_lkid;
2328 ms->m_result = rv;
2329 ms->m_nodeid = ret_nodeid;
2330
2331 error = send_message(mh, ms);
2332 out:
2333 return error;
2334}
2335
2336/* which args we save from a received message depends heavily on the type
2337 of message, unlike the send side where we can safely send everything about
2338 the lkb for any type of message */
2339
2340static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2341{
2342 lkb->lkb_exflags = ms->m_exflags;
2343 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2344 (ms->m_flags & 0x0000FFFF);
2345}
2346
2347static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2348{
2349 lkb->lkb_sbflags = ms->m_sbflags;
2350 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2351 (ms->m_flags & 0x0000FFFF);
2352}
2353
2354static int receive_extralen(struct dlm_message *ms)
2355{
2356 return (ms->m_header.h_length - sizeof(struct dlm_message));
2357}
2358
e7fd4179
DT
2359static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2360 struct dlm_message *ms)
2361{
2362 int len;
2363
2364 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2365 if (!lkb->lkb_lvbptr)
2366 lkb->lkb_lvbptr = allocate_lvb(ls);
2367 if (!lkb->lkb_lvbptr)
2368 return -ENOMEM;
2369 len = receive_extralen(ms);
2370 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2371 }
2372 return 0;
2373}
2374
2375static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2376 struct dlm_message *ms)
2377{
2378 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2379 lkb->lkb_ownpid = ms->m_pid;
2380 lkb->lkb_remid = ms->m_lkid;
2381 lkb->lkb_grmode = DLM_LOCK_IV;
2382 lkb->lkb_rqmode = ms->m_rqmode;
2383 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2384 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2385
2386 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2387
e7fd4179
DT
2388 if (receive_lvb(ls, lkb, ms))
2389 return -ENOMEM;
2390
2391 return 0;
2392}
2393
2394static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2395 struct dlm_message *ms)
2396{
2397 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2398 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2399 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2400 lkb->lkb_id, lkb->lkb_remid);
2401 return -EINVAL;
2402 }
2403
2404 if (!is_master_copy(lkb))
2405 return -EINVAL;
2406
2407 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2408 return -EBUSY;
2409
e7fd4179
DT
2410 if (receive_lvb(ls, lkb, ms))
2411 return -ENOMEM;
2412
2413 lkb->lkb_rqmode = ms->m_rqmode;
2414 lkb->lkb_lvbseq = ms->m_lvbseq;
2415
2416 return 0;
2417}
2418
/* Validate an unlock message and take its lvb.  Returns -EINVAL if the lkb
   is not a master copy, -ENOMEM if receive_lvb() fails, otherwise 0. */

static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	if (!is_master_copy(lkb))
		return -EINVAL;

	return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
}
2428
2429/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2430 uses to send a reply and that the remote end uses to process the reply. */
2431
2432static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2433{
2434 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2435 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2436 lkb->lkb_remid = ms->m_lkid;
2437}
2438
2439static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2440{
2441 struct dlm_lkb *lkb;
2442 struct dlm_rsb *r;
2443 int error, namelen;
2444
2445 error = create_lkb(ls, &lkb);
2446 if (error)
2447 goto fail;
2448
2449 receive_flags(lkb, ms);
2450 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2451 error = receive_request_args(ls, lkb, ms);
2452 if (error) {
b3f58d8f 2453 __put_lkb(ls, lkb);
e7fd4179
DT
2454 goto fail;
2455 }
2456
2457 namelen = receive_extralen(ms);
2458
2459 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2460 if (error) {
b3f58d8f 2461 __put_lkb(ls, lkb);
e7fd4179
DT
2462 goto fail;
2463 }
2464
2465 lock_rsb(r);
2466
2467 attach_lkb(r, lkb);
2468 error = do_request(r, lkb);
2469 send_request_reply(r, lkb, error);
2470
2471 unlock_rsb(r);
2472 put_rsb(r);
2473
2474 if (error == -EINPROGRESS)
2475 error = 0;
2476 if (error)
b3f58d8f 2477 dlm_put_lkb(lkb);
e7fd4179
DT
2478 return;
2479
2480 fail:
2481 setup_stub_lkb(ls, ms);
2482 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2483}
2484
2485static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2486{
2487 struct dlm_lkb *lkb;
2488 struct dlm_rsb *r;
90135925 2489 int error, reply = 1;
e7fd4179
DT
2490
2491 error = find_lkb(ls, ms->m_remid, &lkb);
2492 if (error)
2493 goto fail;
2494
2495 r = lkb->lkb_resource;
2496
2497 hold_rsb(r);
2498 lock_rsb(r);
2499
2500 receive_flags(lkb, ms);
2501 error = receive_convert_args(ls, lkb, ms);
2502 if (error)
2503 goto out;
2504 reply = !down_conversion(lkb);
2505
2506 error = do_convert(r, lkb);
2507 out:
2508 if (reply)
2509 send_convert_reply(r, lkb, error);
2510
2511 unlock_rsb(r);
2512 put_rsb(r);
b3f58d8f 2513 dlm_put_lkb(lkb);
e7fd4179
DT
2514 return;
2515
2516 fail:
2517 setup_stub_lkb(ls, ms);
2518 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2519}
2520
2521static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2522{
2523 struct dlm_lkb *lkb;
2524 struct dlm_rsb *r;
2525 int error;
2526
2527 error = find_lkb(ls, ms->m_remid, &lkb);
2528 if (error)
2529 goto fail;
2530
2531 r = lkb->lkb_resource;
2532
2533 hold_rsb(r);
2534 lock_rsb(r);
2535
2536 receive_flags(lkb, ms);
2537 error = receive_unlock_args(ls, lkb, ms);
2538 if (error)
2539 goto out;
2540
2541 error = do_unlock(r, lkb);
2542 out:
2543 send_unlock_reply(r, lkb, error);
2544
2545 unlock_rsb(r);
2546 put_rsb(r);
b3f58d8f 2547 dlm_put_lkb(lkb);
e7fd4179
DT
2548 return;
2549
2550 fail:
2551 setup_stub_lkb(ls, ms);
2552 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2553}
2554
2555static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2556{
2557 struct dlm_lkb *lkb;
2558 struct dlm_rsb *r;
2559 int error;
2560
2561 error = find_lkb(ls, ms->m_remid, &lkb);
2562 if (error)
2563 goto fail;
2564
2565 receive_flags(lkb, ms);
2566
2567 r = lkb->lkb_resource;
2568
2569 hold_rsb(r);
2570 lock_rsb(r);
2571
2572 error = do_cancel(r, lkb);
2573 send_cancel_reply(r, lkb, error);
2574
2575 unlock_rsb(r);
2576 put_rsb(r);
b3f58d8f 2577 dlm_put_lkb(lkb);
e7fd4179
DT
2578 return;
2579
2580 fail:
2581 setup_stub_lkb(ls, ms);
2582 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2583}
2584
2585static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2586{
2587 struct dlm_lkb *lkb;
2588 struct dlm_rsb *r;
2589 int error;
2590
2591 error = find_lkb(ls, ms->m_remid, &lkb);
2592 if (error) {
2593 log_error(ls, "receive_grant no lkb");
2594 return;
2595 }
2596 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2597
2598 r = lkb->lkb_resource;
2599
2600 hold_rsb(r);
2601 lock_rsb(r);
2602
2603 receive_flags_reply(lkb, ms);
2604 grant_lock_pc(r, lkb, ms);
2605 queue_cast(r, lkb, 0);
2606
2607 unlock_rsb(r);
2608 put_rsb(r);
b3f58d8f 2609 dlm_put_lkb(lkb);
e7fd4179
DT
2610}
2611
2612static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2613{
2614 struct dlm_lkb *lkb;
2615 struct dlm_rsb *r;
2616 int error;
2617
2618 error = find_lkb(ls, ms->m_remid, &lkb);
2619 if (error) {
2620 log_error(ls, "receive_bast no lkb");
2621 return;
2622 }
2623 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2624
2625 r = lkb->lkb_resource;
2626
2627 hold_rsb(r);
2628 lock_rsb(r);
2629
2630 queue_bast(r, lkb, ms->m_bastmode);
2631
2632 unlock_rsb(r);
2633 put_rsb(r);
b3f58d8f 2634 dlm_put_lkb(lkb);
e7fd4179
DT
2635}
2636
2637static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2638{
2639 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2640
2641 from_nodeid = ms->m_header.h_nodeid;
2642 our_nodeid = dlm_our_nodeid();
2643
2644 len = receive_extralen(ms);
2645
2646 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2647 if (dir_nodeid != our_nodeid) {
2648 log_error(ls, "lookup dir_nodeid %d from %d",
2649 dir_nodeid, from_nodeid);
2650 error = -EINVAL;
2651 ret_nodeid = -1;
2652 goto out;
2653 }
2654
2655 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2656
2657 /* Optimization: we're master so treat lookup as a request */
2658 if (!error && ret_nodeid == our_nodeid) {
2659 receive_request(ls, ms);
2660 return;
2661 }
2662 out:
2663 send_lookup_reply(ls, ms, ret_nodeid, error);
2664}
2665
2666static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2667{
2668 int len, dir_nodeid, from_nodeid;
2669
2670 from_nodeid = ms->m_header.h_nodeid;
2671
2672 len = receive_extralen(ms);
2673
2674 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2675 if (dir_nodeid != dlm_our_nodeid()) {
2676 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2677 dir_nodeid, from_nodeid);
2678 return;
2679 }
2680
2681 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2682}
2683
2684static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2685{
2686 struct dlm_lkb *lkb;
2687 struct dlm_rsb *r;
2688 int error, mstype;
2689
2690 error = find_lkb(ls, ms->m_remid, &lkb);
2691 if (error) {
2692 log_error(ls, "receive_request_reply no lkb");
2693 return;
2694 }
2695 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2696
2697 mstype = lkb->lkb_wait_type;
2698 error = remove_from_waiters(lkb);
2699 if (error) {
2700 log_error(ls, "receive_request_reply not on waiters");
2701 goto out;
2702 }
2703
2704 /* this is the value returned from do_request() on the master */
2705 error = ms->m_result;
2706
2707 r = lkb->lkb_resource;
2708 hold_rsb(r);
2709 lock_rsb(r);
2710
2711 /* Optimization: the dir node was also the master, so it took our
2712 lookup as a request and sent request reply instead of lookup reply */
2713 if (mstype == DLM_MSG_LOOKUP) {
2714 r->res_nodeid = ms->m_header.h_nodeid;
2715 lkb->lkb_nodeid = r->res_nodeid;
2716 }
2717
2718 switch (error) {
2719 case -EAGAIN:
2720 /* request would block (be queued) on remote master;
2721 the unhold undoes the original ref from create_lkb()
2722 so it leads to the lkb being freed */
2723 queue_cast(r, lkb, -EAGAIN);
2724 confirm_master(r, -EAGAIN);
2725 unhold_lkb(lkb);
2726 break;
2727
2728 case -EINPROGRESS:
2729 case 0:
2730 /* request was queued or granted on remote master */
2731 receive_flags_reply(lkb, ms);
2732 lkb->lkb_remid = ms->m_lkid;
2733 if (error)
2734 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2735 else {
2736 grant_lock_pc(r, lkb, ms);
2737 queue_cast(r, lkb, 0);
2738 }
2739 confirm_master(r, error);
2740 break;
2741
2742 case -ENOENT:
2743 case -ENOTBLK:
2744 /* find_rsb failed to find rsb or rsb wasn't master */
2745 r->res_nodeid = -1;
2746 lkb->lkb_nodeid = -1;
2747 _request_lock(r, lkb);
2748 break;
2749
2750 default:
2751 log_error(ls, "receive_request_reply error %d", error);
2752 }
2753
2754 unlock_rsb(r);
2755 put_rsb(r);
2756 out:
b3f58d8f 2757 dlm_put_lkb(lkb);
e7fd4179
DT
2758}
2759
2760static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2761 struct dlm_message *ms)
2762{
2763 int error = ms->m_result;
2764
2765 /* this is the value returned from do_convert() on the master */
2766
2767 switch (error) {
2768 case -EAGAIN:
2769 /* convert would block (be queued) on remote master */
2770 queue_cast(r, lkb, -EAGAIN);
2771 break;
2772
2773 case -EINPROGRESS:
2774 /* convert was queued on remote master */
2775 del_lkb(r, lkb);
2776 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2777 break;
2778
2779 case 0:
2780 /* convert was granted on remote master */
2781 receive_flags_reply(lkb, ms);
2782 grant_lock_pc(r, lkb, ms);
2783 queue_cast(r, lkb, 0);
2784 break;
2785
2786 default:
2787 log_error(r->res_ls, "receive_convert_reply error %d", error);
2788 }
2789}
2790
2791static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2792{
2793 struct dlm_rsb *r = lkb->lkb_resource;
2794
2795 hold_rsb(r);
2796 lock_rsb(r);
2797
2798 __receive_convert_reply(r, lkb, ms);
2799
2800 unlock_rsb(r);
2801 put_rsb(r);
2802}
2803
2804static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2805{
2806 struct dlm_lkb *lkb;
2807 int error;
2808
2809 error = find_lkb(ls, ms->m_remid, &lkb);
2810 if (error) {
2811 log_error(ls, "receive_convert_reply no lkb");
2812 return;
2813 }
2814 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2815
2816 error = remove_from_waiters(lkb);
2817 if (error) {
2818 log_error(ls, "receive_convert_reply not on waiters");
2819 goto out;
2820 }
2821
2822 _receive_convert_reply(lkb, ms);
2823 out:
b3f58d8f 2824 dlm_put_lkb(lkb);
e7fd4179
DT
2825}
2826
2827static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2828{
2829 struct dlm_rsb *r = lkb->lkb_resource;
2830 int error = ms->m_result;
2831
2832 hold_rsb(r);
2833 lock_rsb(r);
2834
2835 /* this is the value returned from do_unlock() on the master */
2836
2837 switch (error) {
2838 case -DLM_EUNLOCK:
2839 receive_flags_reply(lkb, ms);
2840 remove_lock_pc(r, lkb);
2841 queue_cast(r, lkb, -DLM_EUNLOCK);
2842 break;
2843 default:
2844 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2845 }
2846
2847 unlock_rsb(r);
2848 put_rsb(r);
2849}
2850
2851static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2852{
2853 struct dlm_lkb *lkb;
2854 int error;
2855
2856 error = find_lkb(ls, ms->m_remid, &lkb);
2857 if (error) {
2858 log_error(ls, "receive_unlock_reply no lkb");
2859 return;
2860 }
2861 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2862
2863 error = remove_from_waiters(lkb);
2864 if (error) {
2865 log_error(ls, "receive_unlock_reply not on waiters");
2866 goto out;
2867 }
2868
2869 _receive_unlock_reply(lkb, ms);
2870 out:
b3f58d8f 2871 dlm_put_lkb(lkb);
e7fd4179
DT
2872}
2873
2874static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2875{
2876 struct dlm_rsb *r = lkb->lkb_resource;
2877 int error = ms->m_result;
2878
2879 hold_rsb(r);
2880 lock_rsb(r);
2881
2882 /* this is the value returned from do_cancel() on the master */
2883
2884 switch (error) {
2885 case -DLM_ECANCEL:
2886 receive_flags_reply(lkb, ms);
2887 revert_lock_pc(r, lkb);
2888 queue_cast(r, lkb, -DLM_ECANCEL);
2889 break;
2890 default:
2891 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2892 }
2893
2894 unlock_rsb(r);
2895 put_rsb(r);
2896}
2897
2898static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2899{
2900 struct dlm_lkb *lkb;
2901 int error;
2902
2903 error = find_lkb(ls, ms->m_remid, &lkb);
2904 if (error) {
2905 log_error(ls, "receive_cancel_reply no lkb");
2906 return;
2907 }
2908 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2909
2910 error = remove_from_waiters(lkb);
2911 if (error) {
2912 log_error(ls, "receive_cancel_reply not on waiters");
2913 goto out;
2914 }
2915
2916 _receive_cancel_reply(lkb, ms);
2917 out:
b3f58d8f 2918 dlm_put_lkb(lkb);
e7fd4179
DT
2919}
2920
2921static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2922{
2923 struct dlm_lkb *lkb;
2924 struct dlm_rsb *r;
2925 int error, ret_nodeid;
2926
2927 error = find_lkb(ls, ms->m_lkid, &lkb);
2928 if (error) {
2929 log_error(ls, "receive_lookup_reply no lkb");
2930 return;
2931 }
2932
2933 error = remove_from_waiters(lkb);
2934 if (error) {
2935 log_error(ls, "receive_lookup_reply not on waiters");
2936 goto out;
2937 }
2938
2939 /* this is the value returned by dlm_dir_lookup on dir node
2940 FIXME: will a non-zero error ever be returned? */
2941 error = ms->m_result;
2942
2943 r = lkb->lkb_resource;
2944 hold_rsb(r);
2945 lock_rsb(r);
2946
2947 ret_nodeid = ms->m_nodeid;
2948 if (ret_nodeid == dlm_our_nodeid()) {
2949 r->res_nodeid = 0;
2950 ret_nodeid = 0;
2951 r->res_first_lkid = 0;
2952 } else {
2953 /* set_master() will copy res_nodeid to lkb_nodeid */
2954 r->res_nodeid = ret_nodeid;
2955 }
2956
2957 _request_lock(r, lkb);
2958
2959 if (!ret_nodeid)
2960 process_lookup_list(r);
2961
2962 unlock_rsb(r);
2963 put_rsb(r);
2964 out:
b3f58d8f 2965 dlm_put_lkb(lkb);
e7fd4179
DT
2966}
2967
2968int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
2969{
2970 struct dlm_message *ms = (struct dlm_message *) hd;
2971 struct dlm_ls *ls;
2972 int error;
2973
2974 if (!recovery)
2975 dlm_message_in(ms);
2976
2977 ls = dlm_find_lockspace_global(hd->h_lockspace);
2978 if (!ls) {
2979 log_print("drop message %d from %d for unknown lockspace %d",
2980 ms->m_type, nodeid, hd->h_lockspace);
2981 return -EINVAL;
2982 }
2983
2984 /* recovery may have just ended leaving a bunch of backed-up requests
2985 in the requestqueue; wait while dlm_recoverd clears them */
2986
2987 if (!recovery)
2988 dlm_wait_requestqueue(ls);
2989
2990 /* recovery may have just started while there were a bunch of
2991 in-flight requests -- save them in requestqueue to be processed
2992 after recovery. we can't let dlm_recvd block on the recovery
2993 lock. if dlm_recoverd is calling this function to clear the
2994 requestqueue, it needs to be interrupted (-EINTR) if another
2995 recovery operation is starting. */
2996
2997 while (1) {
2998 if (dlm_locking_stopped(ls)) {
2999 if (!recovery)
3000 dlm_add_requestqueue(ls, nodeid, hd);
3001 error = -EINTR;
3002 goto out;
3003 }
3004
3005 if (lock_recovery_try(ls))
3006 break;
3007 schedule();
3008 }
3009
3010 switch (ms->m_type) {
3011
3012 /* messages sent to a master node */
3013
3014 case DLM_MSG_REQUEST:
3015 receive_request(ls, ms);
3016 break;
3017
3018 case DLM_MSG_CONVERT:
3019 receive_convert(ls, ms);
3020 break;
3021
3022 case DLM_MSG_UNLOCK:
3023 receive_unlock(ls, ms);
3024 break;
3025
3026 case DLM_MSG_CANCEL:
3027 receive_cancel(ls, ms);
3028 break;
3029
3030 /* messages sent from a master node (replies to above) */
3031
3032 case DLM_MSG_REQUEST_REPLY:
3033 receive_request_reply(ls, ms);
3034 break;
3035
3036 case DLM_MSG_CONVERT_REPLY:
3037 receive_convert_reply(ls, ms);
3038 break;
3039
3040 case DLM_MSG_UNLOCK_REPLY:
3041 receive_unlock_reply(ls, ms);
3042 break;
3043
3044 case DLM_MSG_CANCEL_REPLY:
3045 receive_cancel_reply(ls, ms);
3046 break;
3047
3048 /* messages sent from a master node (only two types of async msg) */
3049
3050 case DLM_MSG_GRANT:
3051 receive_grant(ls, ms);
3052 break;
3053
3054 case DLM_MSG_BAST:
3055 receive_bast(ls, ms);
3056 break;
3057
3058 /* messages sent to a dir node */
3059
3060 case DLM_MSG_LOOKUP:
3061 receive_lookup(ls, ms);
3062 break;
3063
3064 case DLM_MSG_REMOVE:
3065 receive_remove(ls, ms);
3066 break;
3067
3068 /* messages sent from a dir node (remove has no reply) */
3069
3070 case DLM_MSG_LOOKUP_REPLY:
3071 receive_lookup_reply(ls, ms);
3072 break;
3073
3074 default:
3075 log_error(ls, "unknown message type %d", ms->m_type);
3076 }
3077
3078 unlock_recovery(ls);
3079 out:
3080 dlm_put_lockspace(ls);
3081 dlm_astd_wake();
3082 return 0;
3083}
3084
3085
3086/*
3087 * Recovery related
3088 */
3089
3090static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3091{
3092 if (middle_conversion(lkb)) {
3093 hold_lkb(lkb);
3094 ls->ls_stub_ms.m_result = -EINPROGRESS;
3095 _remove_from_waiters(lkb);
3096 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3097
3098 /* Same special case as in receive_rcom_lock_args() */
3099 lkb->lkb_grmode = DLM_LOCK_IV;
3100 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3101 unhold_lkb(lkb);
3102
3103 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3104 lkb->lkb_flags |= DLM_IFL_RESEND;
3105 }
3106
3107 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3108 conversions are async; there's no reply from the remote master */
3109}
3110
3111/* A waiting lkb needs recovery if the master node has failed, or
3112 the master node is changing (only when no directory is used) */
3113
3114static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3115{
3116 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3117 return 1;
3118
3119 if (!dlm_no_directory(ls))
3120 return 0;
3121
3122 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3123 return 1;
3124
3125 return 0;
3126}
3127
3128/* Recovery for locks that are waiting for replies from nodes that are now
3129 gone. We can just complete unlocks and cancels by faking a reply from the
3130 dead node. Requests and up-conversions we flag to be resent after
3131 recovery. Down-conversions can just be completed with a fake reply like
3132 unlocks. Conversions between PR and CW need special attention. */
3133
3134void dlm_recover_waiters_pre(struct dlm_ls *ls)
3135{
3136 struct dlm_lkb *lkb, *safe;
3137
90135925 3138 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
3139
3140 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3141 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3142 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3143
3144 /* all outstanding lookups, regardless of destination will be
3145 resent after recovery is done */
3146
3147 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3148 lkb->lkb_flags |= DLM_IFL_RESEND;
3149 continue;
3150 }
3151
3152 if (!waiter_needs_recovery(ls, lkb))
3153 continue;
3154
3155 switch (lkb->lkb_wait_type) {
3156
3157 case DLM_MSG_REQUEST:
3158 lkb->lkb_flags |= DLM_IFL_RESEND;
3159 break;
3160
3161 case DLM_MSG_CONVERT:
3162 recover_convert_waiter(ls, lkb);
3163 break;
3164
3165 case DLM_MSG_UNLOCK:
3166 hold_lkb(lkb);
3167 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3168 _remove_from_waiters(lkb);
3169 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
b3f58d8f 3170 dlm_put_lkb(lkb);
e7fd4179
DT
3171 break;
3172
3173 case DLM_MSG_CANCEL:
3174 hold_lkb(lkb);
3175 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3176 _remove_from_waiters(lkb);
3177 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
b3f58d8f 3178 dlm_put_lkb(lkb);
e7fd4179
DT
3179 break;
3180
3181 default:
3182 log_error(ls, "invalid lkb wait_type %d",
3183 lkb->lkb_wait_type);
3184 }
3185 }
90135925 3186 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
3187}
3188
3189static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3190{
3191 struct dlm_lkb *lkb;
3192 int rv = 0;
3193
90135925 3194 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
3195 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3196 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3197 rv = lkb->lkb_wait_type;
3198 _remove_from_waiters(lkb);
3199 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3200 break;
3201 }
3202 }
90135925 3203 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
3204
3205 if (!rv)
3206 lkb = NULL;
3207 *lkb_ret = lkb;
3208 return rv;
3209}
3210
3211/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3212 master or dir-node for r. Processing the lkb may result in it being placed
3213 back on waiters. */
3214
3215int dlm_recover_waiters_post(struct dlm_ls *ls)
3216{
3217 struct dlm_lkb *lkb;
3218 struct dlm_rsb *r;
3219 int error = 0, mstype;
3220
3221 while (1) {
3222 if (dlm_locking_stopped(ls)) {
3223 log_debug(ls, "recover_waiters_post aborted");
3224 error = -EINTR;
3225 break;
3226 }
3227
3228 mstype = remove_resend_waiter(ls, &lkb);
3229 if (!mstype)
3230 break;
3231
3232 r = lkb->lkb_resource;
3233
3234 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3235 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3236
3237 switch (mstype) {
3238
3239 case DLM_MSG_LOOKUP:
3240 hold_rsb(r);
3241 lock_rsb(r);
3242 _request_lock(r, lkb);
3243 if (is_master(r))
3244 confirm_master(r, 0);
3245 unlock_rsb(r);
3246 put_rsb(r);
3247 break;
3248
3249 case DLM_MSG_REQUEST:
3250 hold_rsb(r);
3251 lock_rsb(r);
3252 _request_lock(r, lkb);
3253 unlock_rsb(r);
3254 put_rsb(r);
3255 break;
3256
3257 case DLM_MSG_CONVERT:
3258 hold_rsb(r);
3259 lock_rsb(r);
3260 _convert_lock(r, lkb);
3261 unlock_rsb(r);
3262 put_rsb(r);
3263 break;
3264
3265 default:
3266 log_error(ls, "recover_waiters_post type %d", mstype);
3267 }
3268 }
3269
3270 return error;
3271}
3272
3273static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3274 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3275{
3276 struct dlm_ls *ls = r->res_ls;
3277 struct dlm_lkb *lkb, *safe;
3278
3279 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3280 if (test(ls, lkb)) {
3281 del_lkb(r, lkb);
3282 /* this put should free the lkb */
b3f58d8f 3283 if (!dlm_put_lkb(lkb))
e7fd4179
DT
3284 log_error(ls, "purged lkb not released");
3285 }
3286 }
3287}
3288
3289static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3290{
3291 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3292}
3293
/* purge_queue() test: accepts every master-copy lkb; ls is unused. */

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	int mstcpy = is_master_copy(lkb);

	return mstcpy;
}
3298
3299static void purge_dead_locks(struct dlm_rsb *r)
3300{
3301 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3302 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3303 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3304}
3305
3306void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3307{
3308 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3309 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3310 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3311}
3312
3313/* Get rid of locks held by nodes that are gone. */
3314
3315int dlm_purge_locks(struct dlm_ls *ls)
3316{
3317 struct dlm_rsb *r;
3318
3319 log_debug(ls, "dlm_purge_locks");
3320
3321 down_write(&ls->ls_root_sem);
3322 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3323 hold_rsb(r);
3324 lock_rsb(r);
3325 if (is_master(r))
3326 purge_dead_locks(r);
3327 unlock_rsb(r);
3328 unhold_rsb(r);
3329
3330 schedule();
3331 }
3332 up_write(&ls->ls_root_sem);
3333
3334 return 0;
3335}
3336
3337int dlm_grant_after_purge(struct dlm_ls *ls)
3338{
3339 struct dlm_rsb *r;
3340 int i;
3341
3342 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
3343 read_lock(&ls->ls_rsbtbl[i].lock);
3344 list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
3345 hold_rsb(r);
3346 lock_rsb(r);
3347 if (is_master(r)) {
3348 grant_pending_locks(r);
3349 confirm_master(r, 0);
3350 }
3351 unlock_rsb(r);
3352 put_rsb(r);
3353 }
3354 read_unlock(&ls->ls_rsbtbl[i].lock);
3355 }
3356
3357 return 0;
3358}
3359
3360static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3361 uint32_t remid)
3362{
3363 struct dlm_lkb *lkb;
3364
3365 list_for_each_entry(lkb, head, lkb_statequeue) {
3366 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3367 return lkb;
3368 }
3369 return NULL;
3370}
3371
3372static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3373 uint32_t remid)
3374{
3375 struct dlm_lkb *lkb;
3376
3377 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3378 if (lkb)
3379 return lkb;
3380 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3381 if (lkb)
3382 return lkb;
3383 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3384 if (lkb)
3385 return lkb;
3386 return NULL;
3387}
3388
3389static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3390 struct dlm_rsb *r, struct dlm_rcom *rc)
3391{
3392 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3393 int lvblen;
3394
3395 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3396 lkb->lkb_ownpid = rl->rl_ownpid;
3397 lkb->lkb_remid = rl->rl_lkid;
3398 lkb->lkb_exflags = rl->rl_exflags;
3399 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3400 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3401 lkb->lkb_lvbseq = rl->rl_lvbseq;
3402 lkb->lkb_rqmode = rl->rl_rqmode;
3403 lkb->lkb_grmode = rl->rl_grmode;
3404 /* don't set lkb_status because add_lkb wants to itself */
3405
3406 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3407 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3408
e7fd4179
DT
3409 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3410 lkb->lkb_lvbptr = allocate_lvb(ls);
3411 if (!lkb->lkb_lvbptr)
3412 return -ENOMEM;
3413 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3414 sizeof(struct rcom_lock);
3415 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3416 }
3417
3418 /* Conversions between PR and CW (middle modes) need special handling.
3419 The real granted mode of these converting locks cannot be determined
3420 until all locks have been rebuilt on the rsb (recover_conversion) */
3421
3422 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3423 rl->rl_status = DLM_LKSTS_CONVERT;
3424 lkb->lkb_grmode = DLM_LOCK_IV;
3425 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3426 }
3427
3428 return 0;
3429}
3430
3431/* This lkb may have been recovered in a previous aborted recovery so we need
3432 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3433 If so we just send back a standard reply. If not, we create a new lkb with
3434 the given values and send back our lkid. We send back our lkid by sending
3435 back the rcom_lock struct we got but with the remid field filled in. */
3436
3437int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3438{
3439 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3440 struct dlm_rsb *r;
3441 struct dlm_lkb *lkb;
3442 int error;
3443
3444 if (rl->rl_parent_lkid) {
3445 error = -EOPNOTSUPP;
3446 goto out;
3447 }
3448
3449 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3450 if (error)
3451 goto out;
3452
3453 lock_rsb(r);
3454
3455 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3456 if (lkb) {
3457 error = -EEXIST;
3458 goto out_remid;
3459 }
3460
3461 error = create_lkb(ls, &lkb);
3462 if (error)
3463 goto out_unlock;
3464
3465 error = receive_rcom_lock_args(ls, lkb, r, rc);
3466 if (error) {
b3f58d8f 3467 __put_lkb(ls, lkb);
e7fd4179
DT
3468 goto out_unlock;
3469 }
3470
3471 attach_lkb(r, lkb);
3472 add_lkb(r, lkb, rl->rl_status);
3473 error = 0;
3474
3475 out_remid:
3476 /* this is the new value returned to the lock holder for
3477 saving in its process-copy lkb */
3478 rl->rl_remid = lkb->lkb_id;
3479
3480 out_unlock:
3481 unlock_rsb(r);
3482 put_rsb(r);
3483 out:
3484 if (error)
3485 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3486 rl->rl_result = error;
3487 return error;
3488}
3489
3490int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3491{
3492 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3493 struct dlm_rsb *r;
3494 struct dlm_lkb *lkb;
3495 int error;
3496
3497 error = find_lkb(ls, rl->rl_lkid, &lkb);
3498 if (error) {
3499 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3500 return error;
3501 }
3502
3503 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3504
3505 error = rl->rl_result;
3506
3507 r = lkb->lkb_resource;
3508 hold_rsb(r);
3509 lock_rsb(r);
3510
3511 switch (error) {
3512 case -EEXIST:
3513 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3514 /* fall through */
3515 case 0:
3516 lkb->lkb_remid = rl->rl_remid;
3517 break;
3518 default:
3519 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3520 error, lkb->lkb_id);
3521 }
3522
3523 /* an ack for dlm_recover_locks() which waits for replies from
3524 all the locks it sends to new masters */
3525 dlm_recovered_lock(r);
3526
3527 unlock_rsb(r);
3528 put_rsb(r);
b3f58d8f 3529 dlm_put_lkb(lkb);
e7fd4179
DT
3530
3531 return 0;
3532}
3533
This page took 0.159493 seconds and 5 git commands to generate.