[GFS2] Macros removal in gfs2.h
[deliverable/linux.git] / fs / dlm / lock.c
CommitLineData
e7fd4179
DT
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
5**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.
9**
10*******************************************************************************
11******************************************************************************/
12
13/* Central locking logic has four stages:
14
15 dlm_lock()
16 dlm_unlock()
17
18 request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
22
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
27
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
32
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
35
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
40
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
43
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
49
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
53
54 L: send_xxxx() -> R: receive_xxxx()
55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/
58
59#include "dlm_internal.h"
60#include "memory.h"
61#include "lowcomms.h"
62#include "requestqueue.h"
63#include "util.h"
64#include "dir.h"
65#include "member.h"
66#include "lockspace.h"
67#include "ast.h"
68#include "lock.h"
69#include "rcom.h"
70#include "recover.h"
71#include "lvb_table.h"
72#include "config.h"
73
74static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
75static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
76static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
77static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
78static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
79static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
80static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
81static int send_remove(struct dlm_rsb *r);
82static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
83static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
84 struct dlm_message *ms);
85static int receive_extralen(struct dlm_message *ms);
86
87/*
88 * Lock compatibility matrix - thanks Steve
89 * UN = Unlocked state. Not really a state, used as a flag
90 * PD = Padding. Used to make the matrix a nice power of two in size
91 * Other states are the same as the VMS DLM.
92 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
93 */
94
/* Mode compatibility table.  Index both dimensions with (mode + 1); an
   entry of 1 means the two modes are compatible, 0 means they conflict.
   Row and column order: UN NL CR CW PR PW EX PD. */
static const int __dlm_compat_matrix[8][8] = {
	/*         UN NL CR CW PR PW EX PD */
	/* UN */ { 1, 1, 1, 1, 1, 1, 1, 0 },
	/* NL */ { 1, 1, 1, 1, 1, 1, 1, 0 },
	/* CR */ { 1, 1, 1, 1, 1, 1, 0, 0 },
	/* CW */ { 1, 1, 1, 1, 0, 0, 0, 0 },
	/* PR */ { 1, 1, 1, 0, 1, 0, 0, 0 },
	/* PW */ { 1, 1, 1, 0, 0, 0, 0, 0 },
	/* EX */ { 1, 1, 0, 0, 0, 0, 0, 0 },
	/* PD */ { 0, 0, 0, 0, 0, 0, 0, 0 }
};
106
107/*
108 * This defines the direction of transfer of LVB data.
109 * Granted mode is the row; requested mode is the column.
110 * Usage: matrix[grmode+1][rqmode+1]
111 * 1 = LVB is returned to the caller
112 * 0 = LVB is written to the resource
113 * -1 = nothing happens to the LVB
114 */
115
/* LVB transfer direction, indexed [grmode + 1][rqmode + 1]:
   1 returns the LVB to the caller, 0 writes it to the resource and
   -1 leaves it untouched.  Row and column order: UN NL CR CW PR PW EX PD. */
const int dlm_lvb_operations[8][8] = {
	/*          UN  NL  CR  CW  PR  PW  EX  PD */
	/* UN */ { -1,  1,  1,  1,  1,  1,  1, -1 },
	/* NL */ { -1,  1,  1,  1,  1,  1,  1,  0 },
	/* CR */ { -1, -1,  1,  1,  1,  1,  1,  0 },
	/* CW */ { -1, -1, -1,  1,  1,  1,  1,  0 },
	/* PR */ { -1, -1, -1, -1,  1,  1,  1,  0 },
	/* PW */ { -1,  0,  0,  0,  0,  0,  1,  0 },
	/* EX */ { -1,  0,  0,  0,  0,  0,  0,  0 },
	/* PD */ { -1,  0,  0,  0,  0,  0,  0,  0 }
};
127EXPORT_SYMBOL_GPL(dlm_lvb_operations);
128
/* Compatibility-matrix entry for the granted mode of lkb `gr` against the
   requested mode of lkb `rq` (both arguments are lkb pointers). */
#define modes_compat(gr, rq) \
	(__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1])
131
132int dlm_modes_compat(int mode1, int mode2)
133{
134 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
135}
136
137/*
138 * Compatibility matrix for conversions with QUECVT set.
139 * Granted mode is the row; requested mode is the column.
140 * Usage: matrix[grmode+1][rqmode+1]
141 */
142
/* Conversion table consulted when QUECVT is set, indexed
   [grmode + 1][rqmode + 1].  Row and column order: UN NL CR CW PR PW EX PD. */
static const int __quecvt_compat_matrix[8][8] = {
	/*         UN NL CR CW PR PW EX PD */
	/* UN */ { 0, 0, 0, 0, 0, 0, 0, 0 },
	/* NL */ { 0, 0, 1, 1, 1, 1, 1, 0 },
	/* CR */ { 0, 0, 0, 1, 1, 1, 1, 0 },
	/* CW */ { 0, 0, 0, 0, 1, 1, 1, 0 },
	/* PR */ { 0, 0, 0, 1, 0, 1, 1, 0 },
	/* PW */ { 0, 0, 0, 0, 0, 0, 1, 0 },
	/* EX */ { 0, 0, 0, 0, 0, 0, 0, 0 },
	/* PD */ { 0, 0, 0, 0, 0, 0, 0, 0 }
};
154
155static void dlm_print_lkb(struct dlm_lkb *lkb)
156{
157 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
158 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
159 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
160 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
161 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
162}
163
164void dlm_print_rsb(struct dlm_rsb *r)
165{
166 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
167 r->res_nodeid, r->res_flags, r->res_first_lkid,
168 r->res_recover_locks_count, r->res_name);
169}
170
171/* Threads cannot use the lockspace while it's being recovered */
172
173static inline void lock_recovery(struct dlm_ls *ls)
174{
175 down_read(&ls->ls_in_recovery);
176}
177
178static inline void unlock_recovery(struct dlm_ls *ls)
179{
180 up_read(&ls->ls_in_recovery);
181}
182
183static inline int lock_recovery_try(struct dlm_ls *ls)
184{
185 return down_read_trylock(&ls->ls_in_recovery);
186}
187
188static inline int can_be_queued(struct dlm_lkb *lkb)
189{
190 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
191}
192
193static inline int force_blocking_asts(struct dlm_lkb *lkb)
194{
195 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
196}
197
198static inline int is_demoted(struct dlm_lkb *lkb)
199{
200 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
201}
202
203static inline int is_remote(struct dlm_rsb *r)
204{
205 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
206 return !!r->res_nodeid;
207}
208
209static inline int is_process_copy(struct dlm_lkb *lkb)
210{
211 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
212}
213
214static inline int is_master_copy(struct dlm_lkb *lkb)
215{
216 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
217 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
90135925 218 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
e7fd4179
DT
219}
220
221static inline int middle_conversion(struct dlm_lkb *lkb)
222{
223 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
224 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
90135925
DT
225 return 1;
226 return 0;
e7fd4179
DT
227}
228
229static inline int down_conversion(struct dlm_lkb *lkb)
230{
231 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
232}
233
234static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
235{
236 if (is_master_copy(lkb))
237 return;
238
239 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
240
241 lkb->lkb_lksb->sb_status = rv;
242 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
243
244 dlm_add_ast(lkb, AST_COMP);
245}
246
247static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
248{
249 if (is_master_copy(lkb))
250 send_bast(r, lkb, rqmode);
251 else {
252 lkb->lkb_bastmode = rqmode;
253 dlm_add_ast(lkb, AST_BAST);
254 }
255}
256
257/*
258 * Basic operations on rsb's and lkb's
259 */
260
261static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
262{
263 struct dlm_rsb *r;
264
265 r = allocate_rsb(ls, len);
266 if (!r)
267 return NULL;
268
269 r->res_ls = ls;
270 r->res_length = len;
271 memcpy(r->res_name, name, len);
90135925 272 mutex_init(&r->res_mutex);
e7fd4179
DT
273
274 INIT_LIST_HEAD(&r->res_lookup);
275 INIT_LIST_HEAD(&r->res_grantqueue);
276 INIT_LIST_HEAD(&r->res_convertqueue);
277 INIT_LIST_HEAD(&r->res_waitqueue);
278 INIT_LIST_HEAD(&r->res_root_list);
279 INIT_LIST_HEAD(&r->res_recover_list);
280
281 return r;
282}
283
284static int search_rsb_list(struct list_head *head, char *name, int len,
285 unsigned int flags, struct dlm_rsb **r_ret)
286{
287 struct dlm_rsb *r;
288 int error = 0;
289
290 list_for_each_entry(r, head, res_hashchain) {
291 if (len == r->res_length && !memcmp(name, r->res_name, len))
292 goto found;
293 }
294 return -ENOENT;
295
296 found:
297 if (r->res_nodeid && (flags & R_MASTER))
298 error = -ENOTBLK;
299 *r_ret = r;
300 return error;
301}
302
303static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
304 unsigned int flags, struct dlm_rsb **r_ret)
305{
306 struct dlm_rsb *r;
307 int error;
308
309 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
310 if (!error) {
311 kref_get(&r->res_ref);
312 goto out;
313 }
314 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
315 if (error)
316 goto out;
317
318 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
319
320 if (dlm_no_directory(ls))
321 goto out;
322
323 if (r->res_nodeid == -1) {
324 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
325 r->res_first_lkid = 0;
326 } else if (r->res_nodeid > 0) {
327 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
328 r->res_first_lkid = 0;
329 } else {
330 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
331 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
332 }
333 out:
334 *r_ret = r;
335 return error;
336}
337
338static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
339 unsigned int flags, struct dlm_rsb **r_ret)
340{
341 int error;
342 write_lock(&ls->ls_rsbtbl[b].lock);
343 error = _search_rsb(ls, name, len, b, flags, r_ret);
344 write_unlock(&ls->ls_rsbtbl[b].lock);
345 return error;
346}
347
348/*
349 * Find rsb in rsbtbl and potentially create/add one
350 *
351 * Delaying the release of rsb's has a similar benefit to applications keeping
352 * NL locks on an rsb, but without the guarantee that the cached master value
353 * will still be valid when the rsb is reused. Apps aren't always smart enough
354 * to keep NL locks on an rsb that they may lock again shortly; this can lead
355 * to excessive master lookups and removals if we don't delay the release.
356 *
357 * Searching for an rsb means looking through both the normal list and toss
358 * list. When found on the toss list the rsb is moved to the normal list with
359 * ref count of 1; when found on normal list the ref count is incremented.
360 */
361
362static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
363 unsigned int flags, struct dlm_rsb **r_ret)
364{
365 struct dlm_rsb *r, *tmp;
366 uint32_t hash, bucket;
367 int error = 0;
368
369 if (dlm_no_directory(ls))
370 flags |= R_CREATE;
371
372 hash = jhash(name, namelen, 0);
373 bucket = hash & (ls->ls_rsbtbl_size - 1);
374
375 error = search_rsb(ls, name, namelen, bucket, flags, &r);
376 if (!error)
377 goto out;
378
379 if (error == -ENOENT && !(flags & R_CREATE))
380 goto out;
381
382 /* the rsb was found but wasn't a master copy */
383 if (error == -ENOTBLK)
384 goto out;
385
386 error = -ENOMEM;
387 r = create_rsb(ls, name, namelen);
388 if (!r)
389 goto out;
390
391 r->res_hash = hash;
392 r->res_bucket = bucket;
393 r->res_nodeid = -1;
394 kref_init(&r->res_ref);
395
396 /* With no directory, the master can be set immediately */
397 if (dlm_no_directory(ls)) {
398 int nodeid = dlm_dir_nodeid(r);
399 if (nodeid == dlm_our_nodeid())
400 nodeid = 0;
401 r->res_nodeid = nodeid;
402 }
403
404 write_lock(&ls->ls_rsbtbl[bucket].lock);
405 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
406 if (!error) {
407 write_unlock(&ls->ls_rsbtbl[bucket].lock);
408 free_rsb(r);
409 r = tmp;
410 goto out;
411 }
412 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
413 write_unlock(&ls->ls_rsbtbl[bucket].lock);
414 error = 0;
415 out:
416 *r_ret = r;
417 return error;
418}
419
/* Non-static entry point for find_rsb(); arguments and result are passed
   through unchanged. */
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	int rv = find_rsb(ls, name, namelen, flags, r_ret);

	return rv;
}
425
426/* This is only called to add a reference when the code already holds
427 a valid reference to the rsb, so there's no need for locking. */
428
429static inline void hold_rsb(struct dlm_rsb *r)
430{
431 kref_get(&r->res_ref);
432}
433
/* Non-static entry point for hold_rsb(). */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
438
439static void toss_rsb(struct kref *kref)
440{
441 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
442 struct dlm_ls *ls = r->res_ls;
443
444 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
445 kref_init(&r->res_ref);
446 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
447 r->res_toss_time = jiffies;
448 if (r->res_lvbptr) {
449 free_lvb(r->res_lvbptr);
450 r->res_lvbptr = NULL;
451 }
452}
453
454/* When all references to the rsb are gone it's transferred to
455 the tossed list for later disposal. */
456
457static void put_rsb(struct dlm_rsb *r)
458{
459 struct dlm_ls *ls = r->res_ls;
460 uint32_t bucket = r->res_bucket;
461
462 write_lock(&ls->ls_rsbtbl[bucket].lock);
463 kref_put(&r->res_ref, toss_rsb);
464 write_unlock(&ls->ls_rsbtbl[bucket].lock);
465}
466
/* Non-static entry point for put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
471
472/* See comment for unhold_lkb */
473
474static void unhold_rsb(struct dlm_rsb *r)
475{
476 int rv;
477 rv = kref_put(&r->res_ref, toss_rsb);
478 DLM_ASSERT(!rv, dlm_print_rsb(r););
479}
480
481static void kill_rsb(struct kref *kref)
482{
483 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
484
485 /* All work is done after the return from kref_put() so we
486 can release the write_lock before the remove and free. */
487
488 DLM_ASSERT(list_empty(&r->res_lookup),);
489 DLM_ASSERT(list_empty(&r->res_grantqueue),);
490 DLM_ASSERT(list_empty(&r->res_convertqueue),);
491 DLM_ASSERT(list_empty(&r->res_waitqueue),);
492 DLM_ASSERT(list_empty(&r->res_root_list),);
493 DLM_ASSERT(list_empty(&r->res_recover_list),);
494}
495
496/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
497 The rsb must exist as long as any lkb's for it do. */
498
499static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
500{
501 hold_rsb(r);
502 lkb->lkb_resource = r;
503}
504
505static void detach_lkb(struct dlm_lkb *lkb)
506{
507 if (lkb->lkb_resource) {
508 put_rsb(lkb->lkb_resource);
509 lkb->lkb_resource = NULL;
510 }
511}
512
513static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
514{
515 struct dlm_lkb *lkb, *tmp;
516 uint32_t lkid = 0;
517 uint16_t bucket;
518
519 lkb = allocate_lkb(ls);
520 if (!lkb)
521 return -ENOMEM;
522
523 lkb->lkb_nodeid = -1;
524 lkb->lkb_grmode = DLM_LOCK_IV;
525 kref_init(&lkb->lkb_ref);
526
527 get_random_bytes(&bucket, sizeof(bucket));
528 bucket &= (ls->ls_lkbtbl_size - 1);
529
530 write_lock(&ls->ls_lkbtbl[bucket].lock);
531
532 /* counter can roll over so we must verify lkid is not in use */
533
534 while (lkid == 0) {
535 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
536
537 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
538 lkb_idtbl_list) {
539 if (tmp->lkb_id != lkid)
540 continue;
541 lkid = 0;
542 break;
543 }
544 }
545
546 lkb->lkb_id = lkid;
547 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
548 write_unlock(&ls->ls_lkbtbl[bucket].lock);
549
550 *lkb_ret = lkb;
551 return 0;
552}
553
554static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
555{
556 uint16_t bucket = lkid & 0xFFFF;
557 struct dlm_lkb *lkb;
558
559 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
560 if (lkb->lkb_id == lkid)
561 return lkb;
562 }
563 return NULL;
564}
565
566static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
567{
568 struct dlm_lkb *lkb;
569 uint16_t bucket = lkid & 0xFFFF;
570
571 if (bucket >= ls->ls_lkbtbl_size)
572 return -EBADSLT;
573
574 read_lock(&ls->ls_lkbtbl[bucket].lock);
575 lkb = __find_lkb(ls, lkid);
576 if (lkb)
577 kref_get(&lkb->lkb_ref);
578 read_unlock(&ls->ls_lkbtbl[bucket].lock);
579
580 *lkb_ret = lkb;
581 return lkb ? 0 : -ENOENT;
582}
583
584static void kill_lkb(struct kref *kref)
585{
586 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
587
588 /* All work is done after the return from kref_put() so we
589 can release the write_lock before the detach_lkb */
590
591 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
592}
593
594static int put_lkb(struct dlm_lkb *lkb)
595{
596 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
597 uint16_t bucket = lkb->lkb_id & 0xFFFF;
598
599 write_lock(&ls->ls_lkbtbl[bucket].lock);
600 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
601 list_del(&lkb->lkb_idtbl_list);
602 write_unlock(&ls->ls_lkbtbl[bucket].lock);
603
604 detach_lkb(lkb);
605
606 /* for local/process lkbs, lvbptr points to caller's lksb */
607 if (lkb->lkb_lvbptr && is_master_copy(lkb))
608 free_lvb(lkb->lkb_lvbptr);
e7fd4179
DT
609 free_lkb(lkb);
610 return 1;
611 } else {
612 write_unlock(&ls->ls_lkbtbl[bucket].lock);
613 return 0;
614 }
615}
616
/* Non-static entry point for put_lkb(); returns its result unchanged. */
int dlm_put_lkb(struct dlm_lkb *lkb)
{
	int freed = put_lkb(lkb);

	return freed;
}
621
622/* This is only called to add a reference when the code already holds
623 a valid reference to the lkb, so there's no need for locking. */
624
625static inline void hold_lkb(struct dlm_lkb *lkb)
626{
627 kref_get(&lkb->lkb_ref);
628}
629
630/* This is called when we need to remove a reference and are certain
631 it's not the last ref. e.g. del_lkb is always called between a
632 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
633 put_lkb would work fine, but would involve unnecessary locking */
634
635static inline void unhold_lkb(struct dlm_lkb *lkb)
636{
637 int rv;
638 rv = kref_put(&lkb->lkb_ref, kill_lkb);
639 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
640}
641
642static void lkb_add_ordered(struct list_head *new, struct list_head *head,
643 int mode)
644{
645 struct dlm_lkb *lkb = NULL;
646
647 list_for_each_entry(lkb, head, lkb_statequeue)
648 if (lkb->lkb_rqmode < mode)
649 break;
650
651 if (!lkb)
652 list_add_tail(new, head);
653 else
654 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
655}
656
657/* add/remove lkb to rsb's grant/convert/wait queue */
658
659static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
660{
661 kref_get(&lkb->lkb_ref);
662
663 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
664
665 lkb->lkb_status = status;
666
667 switch (status) {
668 case DLM_LKSTS_WAITING:
669 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
670 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
671 else
672 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
673 break;
674 case DLM_LKSTS_GRANTED:
675 /* convention says granted locks kept in order of grmode */
676 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
677 lkb->lkb_grmode);
678 break;
679 case DLM_LKSTS_CONVERT:
680 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
681 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
682 else
683 list_add_tail(&lkb->lkb_statequeue,
684 &r->res_convertqueue);
685 break;
686 default:
687 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
688 }
689}
690
691static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
692{
693 lkb->lkb_status = 0;
694 list_del(&lkb->lkb_statequeue);
695 unhold_lkb(lkb);
696}
697
/* Move lkb from its current rsb queue to the queue for status sts.  A
   temporary reference is held across the del/add pair. */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);

	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);

	unhold_lkb(lkb);
}
705
706/* add/remove lkb from global waiters list of lkb's waiting for
707 a reply from a remote node */
708
709static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
710{
711 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
712
90135925 713 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
714 if (lkb->lkb_wait_type) {
715 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
716 goto out;
717 }
718 lkb->lkb_wait_type = mstype;
719 kref_get(&lkb->lkb_ref);
720 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
721 out:
90135925 722 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
723}
724
725static int _remove_from_waiters(struct dlm_lkb *lkb)
726{
727 int error = 0;
728
729 if (!lkb->lkb_wait_type) {
730 log_print("remove_from_waiters error");
731 error = -EINVAL;
732 goto out;
733 }
734 lkb->lkb_wait_type = 0;
735 list_del(&lkb->lkb_wait_reply);
736 unhold_lkb(lkb);
737 out:
738 return error;
739}
740
741static int remove_from_waiters(struct dlm_lkb *lkb)
742{
743 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
744 int error;
745
90135925 746 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179 747 error = _remove_from_waiters(lkb);
90135925 748 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
749 return error;
750}
751
752static void dir_remove(struct dlm_rsb *r)
753{
754 int to_nodeid;
755
756 if (dlm_no_directory(r->res_ls))
757 return;
758
759 to_nodeid = dlm_dir_nodeid(r);
760 if (to_nodeid != dlm_our_nodeid())
761 send_remove(r);
762 else
763 dlm_dir_remove_entry(r->res_ls, to_nodeid,
764 r->res_name, r->res_length);
765}
766
767/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
768 found since they are in order of newest to oldest? */
769
770static int shrink_bucket(struct dlm_ls *ls, int b)
771{
772 struct dlm_rsb *r;
773 int count = 0, found;
774
775 for (;;) {
90135925 776 found = 0;
e7fd4179
DT
777 write_lock(&ls->ls_rsbtbl[b].lock);
778 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
779 res_hashchain) {
780 if (!time_after_eq(jiffies, r->res_toss_time +
781 dlm_config.toss_secs * HZ))
782 continue;
90135925 783 found = 1;
e7fd4179
DT
784 break;
785 }
786
787 if (!found) {
788 write_unlock(&ls->ls_rsbtbl[b].lock);
789 break;
790 }
791
792 if (kref_put(&r->res_ref, kill_rsb)) {
793 list_del(&r->res_hashchain);
794 write_unlock(&ls->ls_rsbtbl[b].lock);
795
796 if (is_master(r))
797 dir_remove(r);
798 free_rsb(r);
799 count++;
800 } else {
801 write_unlock(&ls->ls_rsbtbl[b].lock);
802 log_error(ls, "tossed rsb in use %s", r->res_name);
803 }
804 }
805
806 return count;
807}
808
809void dlm_scan_rsbs(struct dlm_ls *ls)
810{
811 int i;
812
813 if (dlm_locking_stopped(ls))
814 return;
815
816 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
817 shrink_bucket(ls, i);
818 cond_resched();
819 }
820}
821
822/* lkb is master or local copy */
823
824static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
825{
826 int b, len = r->res_ls->ls_lvblen;
827
828 /* b=1 lvb returned to caller
829 b=0 lvb written to rsb or invalidated
830 b=-1 do nothing */
831
832 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
833
834 if (b == 1) {
835 if (!lkb->lkb_lvbptr)
836 return;
837
838 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
839 return;
840
841 if (!r->res_lvbptr)
842 return;
843
844 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
845 lkb->lkb_lvbseq = r->res_lvbseq;
846
847 } else if (b == 0) {
848 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
849 rsb_set_flag(r, RSB_VALNOTVALID);
850 return;
851 }
852
853 if (!lkb->lkb_lvbptr)
854 return;
855
856 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
857 return;
858
859 if (!r->res_lvbptr)
860 r->res_lvbptr = allocate_lvb(r->res_ls);
861
862 if (!r->res_lvbptr)
863 return;
864
865 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
866 r->res_lvbseq++;
867 lkb->lkb_lvbseq = r->res_lvbseq;
868 rsb_clear_flag(r, RSB_VALNOTVALID);
869 }
870
871 if (rsb_flag(r, RSB_VALNOTVALID))
872 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
873}
874
875static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
876{
877 if (lkb->lkb_grmode < DLM_LOCK_PW)
878 return;
879
880 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
881 rsb_set_flag(r, RSB_VALNOTVALID);
882 return;
883 }
884
885 if (!lkb->lkb_lvbptr)
886 return;
887
888 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
889 return;
890
891 if (!r->res_lvbptr)
892 r->res_lvbptr = allocate_lvb(r->res_ls);
893
894 if (!r->res_lvbptr)
895 return;
896
897 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
898 r->res_lvbseq++;
899 rsb_clear_flag(r, RSB_VALNOTVALID);
900}
901
902/* lkb is process copy (pc) */
903
904static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
905 struct dlm_message *ms)
906{
907 int b;
908
909 if (!lkb->lkb_lvbptr)
910 return;
911
912 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
913 return;
914
915 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
916 if (b == 1) {
917 int len = receive_extralen(ms);
918 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
919 lkb->lkb_lvbseq = ms->m_lvbseq;
920 }
921}
922
923/* Manipulate lkb's on rsb's convert/granted/waiting queues
924 remove_lock -- used for unlock, removes lkb from granted
925 revert_lock -- used for cancel, moves lkb from convert to granted
926 grant_lock -- used for request and convert, adds lkb to granted or
927 moves lkb from convert or waiting to granted
928
929 Each of these is used for master or local copy lkb's. There is
930 also a _pc() variation used to make the corresponding change on
931 a process copy (pc) lkb. */
932
933static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
934{
935 del_lkb(r, lkb);
936 lkb->lkb_grmode = DLM_LOCK_IV;
937 /* this unhold undoes the original ref from create_lkb()
938 so this leads to the lkb being freed */
939 unhold_lkb(lkb);
940}
941
/* Unlock for a master or local copy: apply the unlock-time LVB handling,
   then remove the lkb. */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);

	_remove_lock(r, lkb);
}
947
/* Unlock for a process copy: remove the lkb without any LVB handling. */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
952
953static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
954{
955 lkb->lkb_rqmode = DLM_LOCK_IV;
956
957 switch (lkb->lkb_status) {
958 case DLM_LKSTS_CONVERT:
959 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
960 break;
961 case DLM_LKSTS_WAITING:
962 del_lkb(r, lkb);
963 lkb->lkb_grmode = DLM_LOCK_IV;
964 /* this unhold undoes the original ref from create_lkb()
965 so this leads to the lkb being freed */
966 unhold_lkb(lkb);
967 break;
968 default:
969 log_print("invalid status for revert %d", lkb->lkb_status);
970 }
971}
972
/* Process-copy variant of revert_lock(); currently identical to it. */
static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	revert_lock(r, lkb);
}
977
978static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
979{
980 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
981 lkb->lkb_grmode = lkb->lkb_rqmode;
982 if (lkb->lkb_status)
983 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
984 else
985 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
986 }
987
988 lkb->lkb_rqmode = DLM_LOCK_IV;
e7fd4179
DT
989}
990
991static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
992{
993 set_lvb_lock(r, lkb);
994 _grant_lock(r, lkb);
995 lkb->lkb_highbast = 0;
996}
997
/* Grant for a process copy: take the LVB from the reply message ms if the
   mode change calls for it, then grant the lock. */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);

	_grant_lock(r, lkb);
}
1004
1005/* called by grant_pending_locks() which means an async grant message must
1006 be sent to the requesting node in addition to granting the lock if the
1007 lkb belongs to a remote node. */
1008
/* Grant a lock that had been queued.  After granting, a master copy has a
   grant message sent to it; any other lkb gets a completion ast queued
   with status 0. */
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);

	if (!is_master_copy(lkb)) {
		queue_cast(r, lkb, 0);
		return;
	}
	send_grant(r, lkb);
}
1017
1018static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1019{
1020 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1021 lkb_statequeue);
1022 if (lkb->lkb_id == first->lkb_id)
90135925 1023 return 1;
e7fd4179 1024
90135925 1025 return 0;
e7fd4179
DT
1026}
1027
e7fd4179
DT
1028/* Check if the given lkb conflicts with another lkb on the queue. */
1029
1030static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1031{
1032 struct dlm_lkb *this;
1033
1034 list_for_each_entry(this, head, lkb_statequeue) {
1035 if (this == lkb)
1036 continue;
3bcd3687 1037 if (!modes_compat(this, lkb))
90135925 1038 return 1;
e7fd4179 1039 }
90135925 1040 return 0;
e7fd4179
DT
1041}
1042
1043/*
1044 * "A conversion deadlock arises with a pair of lock requests in the converting
1045 * queue for one resource. The granted mode of each lock blocks the requested
1046 * mode of the other lock."
1047 *
1048 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1049 * convert queue from being granted, then demote lkb (set grmode to NL).
1050 * This second form requires that we check for conv-deadlk even when
1051 * now == 0 in _can_be_granted().
1052 *
1053 * Example:
1054 * Granted Queue: empty
1055 * Convert Queue: NL->EX (first lock)
1056 * PR->EX (second lock)
1057 *
1058 * The first lock can't be granted because of the granted mode of the second
1059 * lock and the second lock can't be granted because it's not first in the
1060 * list. We demote the granted mode of the second lock (the lkb passed to this
1061 * function).
1062 *
1063 * After the resolution, the "grant pending" function needs to go back and try
1064 * to grant locks on the convert queue again since the first lock can now be
1065 * granted.
1066 */
1067
1068static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1069{
1070 struct dlm_lkb *this, *first = NULL, *self = NULL;
1071
1072 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1073 if (!first)
1074 first = this;
1075 if (this == lkb) {
1076 self = lkb;
1077 continue;
1078 }
1079
e7fd4179 1080 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
90135925 1081 return 1;
e7fd4179
DT
1082 }
1083
1084 /* if lkb is on the convert queue and is preventing the first
1085 from being granted, then there's deadlock and we demote lkb.
1086 multiple converting locks may need to do this before the first
1087 converting lock can be granted. */
1088
1089 if (self && self != first) {
1090 if (!modes_compat(lkb, first) &&
1091 !queue_conflict(&rsb->res_grantqueue, first))
90135925 1092 return 1;
e7fd4179
DT
1093 }
1094
90135925 1095 return 0;
e7fd4179
DT
1096}
1097
1098/*
1099 * Return 1 if the lock can be granted, 0 otherwise.
1100 * Also detect and resolve conversion deadlocks.
1101 *
1102 * lkb is the lock to be granted
1103 *
1104 * now is 1 if the function is being called in the context of the
1105 * immediate request, it is 0 if called later, after the lock has been
1106 * queued.
1107 *
1108 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1109 */
1110
1111static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1112{
1113 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1114
1115 /*
1116 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1117 * a new request for a NL mode lock being blocked.
1118 *
1119 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1120 * request, then it would be granted. In essence, the use of this flag
1121 * tells the Lock Manager to expedite this request by not considering
1122 * what may be in the CONVERTING or WAITING queues... As of this
1123 * writing, the EXPEDITE flag can be used only with new requests for NL
1124 * mode locks. This flag is not valid for conversion requests.
1125 *
1126 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1127 * conversion or used with a non-NL requested mode. We also know an
1128 * EXPEDITE request is always granted immediately, so now must always
1129 * be 1. The full condition to grant an expedite request: (now &&
1130 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1131 * therefore be shortened to just checking the flag.
1132 */
1133
1134 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
90135925 1135 return 1;
e7fd4179
DT
1136
1137 /*
1138 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1139 * added to the remaining conditions.
1140 */
1141
1142 if (queue_conflict(&r->res_grantqueue, lkb))
1143 goto out;
1144
1145 /*
1146 * 6-3: By default, a conversion request is immediately granted if the
1147 * requested mode is compatible with the modes of all other granted
1148 * locks
1149 */
1150
1151 if (queue_conflict(&r->res_convertqueue, lkb))
1152 goto out;
1153
1154 /*
1155 * 6-5: But the default algorithm for deciding whether to grant or
1156 * queue conversion requests does not by itself guarantee that such
1157 * requests are serviced on a "first come first serve" basis. This, in
1158 * turn, can lead to a phenomenon known as "indefinate postponement".
1159 *
1160 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1161 * the system service employed to request a lock conversion. This flag
1162 * forces certain conversion requests to be queued, even if they are
1163 * compatible with the granted modes of other locks on the same
1164 * resource. Thus, the use of this flag results in conversion requests
1165 * being ordered on a "first come first servce" basis.
1166 *
1167 * DCT: This condition is all about new conversions being able to occur
1168 * "in place" while the lock remains on the granted queue (assuming
1169 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1170 * doesn't _have_ to go onto the convert queue where it's processed in
1171 * order. The "now" variable is necessary to distinguish converts
1172 * being received and processed for the first time now, because once a
1173 * convert is moved to the conversion queue the condition below applies
1174 * requiring fifo granting.
1175 */
1176
1177 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
90135925 1178 return 1;
e7fd4179
DT
1179
1180 /*
3bcd3687
DT
1181 * The NOORDER flag is set to avoid the standard vms rules on grant
1182 * order.
e7fd4179
DT
1183 */
1184
1185 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
90135925 1186 return 1;
e7fd4179
DT
1187
1188 /*
1189 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1190 * granted until all other conversion requests ahead of it are granted
1191 * and/or canceled.
1192 */
1193
1194 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
90135925 1195 return 1;
e7fd4179
DT
1196
1197 /*
1198 * 6-4: By default, a new request is immediately granted only if all
1199 * three of the following conditions are satisfied when the request is
1200 * issued:
1201 * - The queue of ungranted conversion requests for the resource is
1202 * empty.
1203 * - The queue of ungranted new requests for the resource is empty.
1204 * - The mode of the new request is compatible with the most
1205 * restrictive mode of all granted locks on the resource.
1206 */
1207
1208 if (now && !conv && list_empty(&r->res_convertqueue) &&
1209 list_empty(&r->res_waitqueue))
90135925 1210 return 1;
e7fd4179
DT
1211
1212 /*
1213 * 6-4: Once a lock request is in the queue of ungranted new requests,
1214 * it cannot be granted until the queue of ungranted conversion
1215 * requests is empty, all ungranted new requests ahead of it are
1216 * granted and/or canceled, and it is compatible with the granted mode
1217 * of the most restrictive lock granted on the resource.
1218 */
1219
1220 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1221 first_in_list(lkb, &r->res_waitqueue))
90135925 1222 return 1;
e7fd4179
DT
1223
1224 out:
1225 /*
1226 * The following, enabled by CONVDEADLK, departs from VMS.
1227 */
1228
1229 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1230 conversion_deadlock_detect(r, lkb)) {
1231 lkb->lkb_grmode = DLM_LOCK_NL;
1232 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1233 }
1234
90135925 1235 return 0;
e7fd4179
DT
1236}
1237
1238/*
1239 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1240 * simple way to provide a big optimization to applications that can use them.
1241 */
1242
1243static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1244{
1245 uint32_t flags = lkb->lkb_exflags;
1246 int rv;
1247 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1248
1249 rv = _can_be_granted(r, lkb, now);
1250 if (rv)
1251 goto out;
1252
1253 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1254 goto out;
1255
1256 if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1257 alt = DLM_LOCK_PR;
1258 else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1259 alt = DLM_LOCK_CW;
1260
1261 if (alt) {
1262 lkb->lkb_rqmode = alt;
1263 rv = _can_be_granted(r, lkb, now);
1264 if (rv)
1265 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1266 else
1267 lkb->lkb_rqmode = rqmode;
1268 }
1269 out:
1270 return rv;
1271}
1272
1273static int grant_pending_convert(struct dlm_rsb *r, int high)
1274{
1275 struct dlm_lkb *lkb, *s;
1276 int hi, demoted, quit, grant_restart, demote_restart;
1277
1278 quit = 0;
1279 restart:
1280 grant_restart = 0;
1281 demote_restart = 0;
1282 hi = DLM_LOCK_IV;
1283
1284 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1285 demoted = is_demoted(lkb);
90135925 1286 if (can_be_granted(r, lkb, 0)) {
e7fd4179
DT
1287 grant_lock_pending(r, lkb);
1288 grant_restart = 1;
1289 } else {
1290 hi = max_t(int, lkb->lkb_rqmode, hi);
1291 if (!demoted && is_demoted(lkb))
1292 demote_restart = 1;
1293 }
1294 }
1295
1296 if (grant_restart)
1297 goto restart;
1298 if (demote_restart && !quit) {
1299 quit = 1;
1300 goto restart;
1301 }
1302
1303 return max_t(int, high, hi);
1304}
1305
1306static int grant_pending_wait(struct dlm_rsb *r, int high)
1307{
1308 struct dlm_lkb *lkb, *s;
1309
1310 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
90135925 1311 if (can_be_granted(r, lkb, 0))
e7fd4179
DT
1312 grant_lock_pending(r, lkb);
1313 else
1314 high = max_t(int, lkb->lkb_rqmode, high);
1315 }
1316
1317 return high;
1318}
1319
1320static void grant_pending_locks(struct dlm_rsb *r)
1321{
1322 struct dlm_lkb *lkb, *s;
1323 int high = DLM_LOCK_IV;
1324
1325 DLM_ASSERT(is_master(r), dlm_print_rsb(r););
1326
1327 high = grant_pending_convert(r, high);
1328 high = grant_pending_wait(r, high);
1329
1330 if (high == DLM_LOCK_IV)
1331 return;
1332
1333 /*
1334 * If there are locks left on the wait/convert queue then send blocking
1335 * ASTs to granted locks based on the largest requested mode (high)
3bcd3687 1336 * found above. FIXME: highbast < high comparison not valid for PR/CW.
e7fd4179
DT
1337 */
1338
1339 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1340 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1341 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1342 queue_bast(r, lkb, high);
1343 lkb->lkb_highbast = high;
1344 }
1345 }
1346}
1347
1348static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1349 struct dlm_lkb *lkb)
1350{
1351 struct dlm_lkb *gr;
1352
1353 list_for_each_entry(gr, head, lkb_statequeue) {
1354 if (gr->lkb_bastaddr &&
1355 gr->lkb_highbast < lkb->lkb_rqmode &&
3bcd3687 1356 !modes_compat(gr, lkb)) {
e7fd4179
DT
1357 queue_bast(r, gr, lkb->lkb_rqmode);
1358 gr->lkb_highbast = lkb->lkb_rqmode;
1359 }
1360 }
1361}
1362
1363static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1364{
1365 send_bast_queue(r, &r->res_grantqueue, lkb);
1366}
1367
1368static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1369{
1370 send_bast_queue(r, &r->res_grantqueue, lkb);
1371 send_bast_queue(r, &r->res_convertqueue, lkb);
1372}
1373
1374/* set_master(r, lkb) -- set the master nodeid of a resource
1375
1376 The purpose of this function is to set the nodeid field in the given
1377 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1378 known, it can just be copied to the lkb and the function will return
1379 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1380 before it can be copied to the lkb.
1381
1382 When the rsb nodeid is being looked up remotely, the initial lkb
1383 causing the lookup is kept on the ls_waiters list waiting for the
1384 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1385 on the rsb's res_lookup list until the master is verified.
1386
1387 Return values:
1388 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1389 1: the rsb master is not available and the lkb has been placed on
1390 a wait queue
1391*/
1392
1393static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1394{
1395 struct dlm_ls *ls = r->res_ls;
1396 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1397
1398 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1399 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1400 r->res_first_lkid = lkb->lkb_id;
1401 lkb->lkb_nodeid = r->res_nodeid;
1402 return 0;
1403 }
1404
1405 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1406 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1407 return 1;
1408 }
1409
1410 if (r->res_nodeid == 0) {
1411 lkb->lkb_nodeid = 0;
1412 return 0;
1413 }
1414
1415 if (r->res_nodeid > 0) {
1416 lkb->lkb_nodeid = r->res_nodeid;
1417 return 0;
1418 }
1419
1420 DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););
1421
1422 dir_nodeid = dlm_dir_nodeid(r);
1423
1424 if (dir_nodeid != our_nodeid) {
1425 r->res_first_lkid = lkb->lkb_id;
1426 send_lookup(r, lkb);
1427 return 1;
1428 }
1429
1430 for (;;) {
1431 /* It's possible for dlm_scand to remove an old rsb for
1432 this same resource from the toss list, us to create
1433 a new one, look up the master locally, and find it
1434 already exists just before dlm_scand does the
1435 dir_remove() on the previous rsb. */
1436
1437 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1438 r->res_length, &ret_nodeid);
1439 if (!error)
1440 break;
1441 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1442 schedule();
1443 }
1444
1445 if (ret_nodeid == our_nodeid) {
1446 r->res_first_lkid = 0;
1447 r->res_nodeid = 0;
1448 lkb->lkb_nodeid = 0;
1449 } else {
1450 r->res_first_lkid = lkb->lkb_id;
1451 r->res_nodeid = ret_nodeid;
1452 lkb->lkb_nodeid = ret_nodeid;
1453 }
1454 return 0;
1455}
1456
1457static void process_lookup_list(struct dlm_rsb *r)
1458{
1459 struct dlm_lkb *lkb, *safe;
1460
1461 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1462 list_del(&lkb->lkb_rsb_lookup);
1463 _request_lock(r, lkb);
1464 schedule();
1465 }
1466}
1467
1468/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1469
1470static void confirm_master(struct dlm_rsb *r, int error)
1471{
1472 struct dlm_lkb *lkb;
1473
1474 if (!r->res_first_lkid)
1475 return;
1476
1477 switch (error) {
1478 case 0:
1479 case -EINPROGRESS:
1480 r->res_first_lkid = 0;
1481 process_lookup_list(r);
1482 break;
1483
1484 case -EAGAIN:
1485 /* the remote master didn't queue our NOQUEUE request;
1486 make a waiting lkb the first_lkid */
1487
1488 r->res_first_lkid = 0;
1489
1490 if (!list_empty(&r->res_lookup)) {
1491 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1492 lkb_rsb_lookup);
1493 list_del(&lkb->lkb_rsb_lookup);
1494 r->res_first_lkid = lkb->lkb_id;
1495 _request_lock(r, lkb);
1496 } else
1497 r->res_nodeid = -1;
1498 break;
1499
1500 default:
1501 log_error(r->res_ls, "confirm_master unknown error %d", error);
1502 }
1503}
1504
1505static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1506 int namelen, uint32_t parent_lkid, void *ast,
3bcd3687 1507 void *astarg, void *bast, struct dlm_args *args)
e7fd4179
DT
1508{
1509 int rv = -EINVAL;
1510
1511 /* check for invalid arg usage */
1512
1513 if (mode < 0 || mode > DLM_LOCK_EX)
1514 goto out;
1515
1516 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1517 goto out;
1518
1519 if (flags & DLM_LKF_CANCEL)
1520 goto out;
1521
1522 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1523 goto out;
1524
1525 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1526 goto out;
1527
1528 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1529 goto out;
1530
1531 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1532 goto out;
1533
1534 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1535 goto out;
1536
1537 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1538 goto out;
1539
1540 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1541 goto out;
1542
1543 if (!ast || !lksb)
1544 goto out;
1545
1546 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1547 goto out;
1548
1549 /* parent/child locks not yet supported */
1550 if (parent_lkid)
1551 goto out;
1552
1553 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1554 goto out;
1555
1556 /* these args will be copied to the lkb in validate_lock_args,
1557 it cannot be done now because when converting locks, fields in
1558 an active lkb cannot be modified before locking the rsb */
1559
1560 args->flags = flags;
1561 args->astaddr = ast;
1562 args->astparam = (long) astarg;
1563 args->bastaddr = bast;
1564 args->mode = mode;
1565 args->lksb = lksb;
e7fd4179
DT
1566 rv = 0;
1567 out:
1568 return rv;
1569}
1570
1571static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1572{
1573 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1574 DLM_LKF_FORCEUNLOCK))
1575 return -EINVAL;
1576
1577 args->flags = flags;
1578 args->astparam = (long) astarg;
1579 return 0;
1580}
1581
1582static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1583 struct dlm_args *args)
1584{
1585 int rv = -EINVAL;
1586
1587 if (args->flags & DLM_LKF_CONVERT) {
1588 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1589 goto out;
1590
1591 if (args->flags & DLM_LKF_QUECVT &&
1592 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1593 goto out;
1594
1595 rv = -EBUSY;
1596 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1597 goto out;
1598
1599 if (lkb->lkb_wait_type)
1600 goto out;
1601 }
1602
1603 lkb->lkb_exflags = args->flags;
1604 lkb->lkb_sbflags = 0;
1605 lkb->lkb_astaddr = args->astaddr;
1606 lkb->lkb_astparam = args->astparam;
1607 lkb->lkb_bastaddr = args->bastaddr;
1608 lkb->lkb_rqmode = args->mode;
1609 lkb->lkb_lksb = args->lksb;
1610 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1611 lkb->lkb_ownpid = (int) current->pid;
e7fd4179
DT
1612 rv = 0;
1613 out:
1614 return rv;
1615}
1616
1617static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1618{
1619 int rv = -EINVAL;
1620
1621 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1622 goto out;
1623
1624 if (args->flags & DLM_LKF_FORCEUNLOCK)
1625 goto out_ok;
1626
1627 if (args->flags & DLM_LKF_CANCEL &&
1628 lkb->lkb_status == DLM_LKSTS_GRANTED)
1629 goto out;
1630
1631 if (!(args->flags & DLM_LKF_CANCEL) &&
1632 lkb->lkb_status != DLM_LKSTS_GRANTED)
1633 goto out;
1634
1635 rv = -EBUSY;
1636 if (lkb->lkb_wait_type)
1637 goto out;
1638
1639 out_ok:
1640 lkb->lkb_exflags = args->flags;
1641 lkb->lkb_sbflags = 0;
1642 lkb->lkb_astparam = args->astparam;
1643
1644 rv = 0;
1645 out:
1646 return rv;
1647}
1648
1649/*
1650 * Four stage 4 varieties:
1651 * do_request(), do_convert(), do_unlock(), do_cancel()
1652 * These are called on the master node for the given lock and
1653 * from the central locking logic.
1654 */
1655
1656static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1657{
1658 int error = 0;
1659
90135925 1660 if (can_be_granted(r, lkb, 1)) {
e7fd4179
DT
1661 grant_lock(r, lkb);
1662 queue_cast(r, lkb, 0);
1663 goto out;
1664 }
1665
1666 if (can_be_queued(lkb)) {
1667 error = -EINPROGRESS;
1668 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1669 send_blocking_asts(r, lkb);
1670 goto out;
1671 }
1672
1673 error = -EAGAIN;
1674 if (force_blocking_asts(lkb))
1675 send_blocking_asts_all(r, lkb);
1676 queue_cast(r, lkb, -EAGAIN);
1677
1678 out:
1679 return error;
1680}
1681
1682static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1683{
1684 int error = 0;
1685
1686 /* changing an existing lock may allow others to be granted */
1687
90135925 1688 if (can_be_granted(r, lkb, 1)) {
e7fd4179
DT
1689 grant_lock(r, lkb);
1690 queue_cast(r, lkb, 0);
1691 grant_pending_locks(r);
1692 goto out;
1693 }
1694
1695 if (can_be_queued(lkb)) {
1696 if (is_demoted(lkb))
1697 grant_pending_locks(r);
1698 error = -EINPROGRESS;
1699 del_lkb(r, lkb);
1700 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1701 send_blocking_asts(r, lkb);
1702 goto out;
1703 }
1704
1705 error = -EAGAIN;
1706 if (force_blocking_asts(lkb))
1707 send_blocking_asts_all(r, lkb);
1708 queue_cast(r, lkb, -EAGAIN);
1709
1710 out:
1711 return error;
1712}
1713
1714static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1715{
1716 remove_lock(r, lkb);
1717 queue_cast(r, lkb, -DLM_EUNLOCK);
1718 grant_pending_locks(r);
1719 return -DLM_EUNLOCK;
1720}
1721
1722static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1723{
1724 revert_lock(r, lkb);
1725 queue_cast(r, lkb, -DLM_ECANCEL);
1726 grant_pending_locks(r);
1727 return -DLM_ECANCEL;
1728}
1729
1730/*
1731 * Four stage 3 varieties:
1732 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1733 */
1734
/* add a new lkb to a possibly new rsb, called by requesting process

   Returns a negative set_master() result unchanged, 0 when set_master()
   reported that the lkb was parked waiting for the master, and otherwise
   the result of send_request() (remote master) or do_request() (local). */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* set_master: sets lkb nodeid from r */

	rv = set_master(r, lkb);
	if (rv < 0)
		return rv;
	if (rv > 0)
		return 0;

	/* receive_request() calls do_request() on remote node */
	return is_remote(r) ? send_request(r, lkb) : do_request(r, lkb);
}
1759
/* change some property of an existing lkb, e.g. mode

   Sends the conversion to the master when r is remote, otherwise
   performs it locally; returns that call's result. */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_convert() calls do_convert() on remote node */
	return is_remote(r) ? send_convert(r, lkb) : do_convert(r, lkb);
}
1774
/* remove an existing lkb from the granted queue

   Sends the unlock to the master when r is remote, otherwise performs
   it locally; returns that call's result. */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_unlock() calls do_unlock() on remote node */
	return is_remote(r) ? send_unlock(r, lkb) : do_unlock(r, lkb);
}
1789
/* remove an existing lkb from the convert or wait queue

   Sends the cancel to the master when r is remote, otherwise performs
   it locally; returns that call's result. */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_cancel() calls do_cancel() on remote node */
	return is_remote(r) ? send_cancel(r, lkb) : do_cancel(r, lkb);
}
1804
1805/*
1806 * Four stage 2 varieties:
1807 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1808 */
1809
1810static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1811 int len, struct dlm_args *args)
1812{
1813 struct dlm_rsb *r;
1814 int error;
1815
1816 error = validate_lock_args(ls, lkb, args);
1817 if (error)
1818 goto out;
1819
1820 error = find_rsb(ls, name, len, R_CREATE, &r);
1821 if (error)
1822 goto out;
1823
1824 lock_rsb(r);
1825
1826 attach_lkb(r, lkb);
1827 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1828
1829 error = _request_lock(r, lkb);
1830
1831 unlock_rsb(r);
1832 put_rsb(r);
1833
1834 out:
1835 return error;
1836}
1837
1838static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1839 struct dlm_args *args)
1840{
1841 struct dlm_rsb *r;
1842 int error;
1843
1844 r = lkb->lkb_resource;
1845
1846 hold_rsb(r);
1847 lock_rsb(r);
1848
1849 error = validate_lock_args(ls, lkb, args);
1850 if (error)
1851 goto out;
1852
1853 error = _convert_lock(r, lkb);
1854 out:
1855 unlock_rsb(r);
1856 put_rsb(r);
1857 return error;
1858}
1859
1860static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1861 struct dlm_args *args)
1862{
1863 struct dlm_rsb *r;
1864 int error;
1865
1866 r = lkb->lkb_resource;
1867
1868 hold_rsb(r);
1869 lock_rsb(r);
1870
1871 error = validate_unlock_args(lkb, args);
1872 if (error)
1873 goto out;
1874
1875 error = _unlock_lock(r, lkb);
1876 out:
1877 unlock_rsb(r);
1878 put_rsb(r);
1879 return error;
1880}
1881
1882static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1883 struct dlm_args *args)
1884{
1885 struct dlm_rsb *r;
1886 int error;
1887
1888 r = lkb->lkb_resource;
1889
1890 hold_rsb(r);
1891 lock_rsb(r);
1892
1893 error = validate_unlock_args(lkb, args);
1894 if (error)
1895 goto out;
1896
1897 error = _cancel_lock(r, lkb);
1898 out:
1899 unlock_rsb(r);
1900 put_rsb(r);
1901 return error;
1902}
1903
1904/*
1905 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
1906 */
1907
1908int dlm_lock(dlm_lockspace_t *lockspace,
1909 int mode,
1910 struct dlm_lksb *lksb,
1911 uint32_t flags,
1912 void *name,
1913 unsigned int namelen,
1914 uint32_t parent_lkid,
1915 void (*ast) (void *astarg),
1916 void *astarg,
3bcd3687 1917 void (*bast) (void *astarg, int mode))
e7fd4179
DT
1918{
1919 struct dlm_ls *ls;
1920 struct dlm_lkb *lkb;
1921 struct dlm_args args;
1922 int error, convert = flags & DLM_LKF_CONVERT;
1923
1924 ls = dlm_find_lockspace_local(lockspace);
1925 if (!ls)
1926 return -EINVAL;
1927
1928 lock_recovery(ls);
1929
1930 if (convert)
1931 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1932 else
1933 error = create_lkb(ls, &lkb);
1934
1935 if (error)
1936 goto out;
1937
1938 error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
3bcd3687 1939 astarg, bast, &args);
e7fd4179
DT
1940 if (error)
1941 goto out_put;
1942
1943 if (convert)
1944 error = convert_lock(ls, lkb, &args);
1945 else
1946 error = request_lock(ls, lkb, name, namelen, &args);
1947
1948 if (error == -EINPROGRESS)
1949 error = 0;
1950 out_put:
1951 if (convert || error)
1952 put_lkb(lkb);
1953 if (error == -EAGAIN)
1954 error = 0;
1955 out:
1956 unlock_recovery(ls);
1957 dlm_put_lockspace(ls);
1958 return error;
1959}
1960
1961int dlm_unlock(dlm_lockspace_t *lockspace,
1962 uint32_t lkid,
1963 uint32_t flags,
1964 struct dlm_lksb *lksb,
1965 void *astarg)
1966{
1967 struct dlm_ls *ls;
1968 struct dlm_lkb *lkb;
1969 struct dlm_args args;
1970 int error;
1971
1972 ls = dlm_find_lockspace_local(lockspace);
1973 if (!ls)
1974 return -EINVAL;
1975
1976 lock_recovery(ls);
1977
1978 error = find_lkb(ls, lkid, &lkb);
1979 if (error)
1980 goto out;
1981
1982 error = set_unlock_args(flags, astarg, &args);
1983 if (error)
1984 goto out_put;
1985
1986 if (flags & DLM_LKF_CANCEL)
1987 error = cancel_lock(ls, lkb, &args);
1988 else
1989 error = unlock_lock(ls, lkb, &args);
1990
1991 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
1992 error = 0;
1993 out_put:
1994 put_lkb(lkb);
1995 out:
1996 unlock_recovery(ls);
1997 dlm_put_lockspace(ls);
1998 return error;
1999}
2000
2001/*
2002 * send/receive routines for remote operations and replies
2003 *
2004 * send_args
2005 * send_common
2006 * send_request receive_request
2007 * send_convert receive_convert
2008 * send_unlock receive_unlock
2009 * send_cancel receive_cancel
2010 * send_grant receive_grant
2011 * send_bast receive_bast
2012 * send_lookup receive_lookup
2013 * send_remove receive_remove
2014 *
2015 * send_common_reply
2016 * receive_request_reply send_request_reply
2017 * receive_convert_reply send_convert_reply
2018 * receive_unlock_reply send_unlock_reply
2019 * receive_cancel_reply send_cancel_reply
2020 * receive_lookup_reply send_lookup_reply
2021 */
2022
2023static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2024 int to_nodeid, int mstype,
2025 struct dlm_message **ms_ret,
2026 struct dlm_mhandle **mh_ret)
2027{
2028 struct dlm_message *ms;
2029 struct dlm_mhandle *mh;
2030 char *mb;
2031 int mb_len = sizeof(struct dlm_message);
2032
2033 switch (mstype) {
2034 case DLM_MSG_REQUEST:
2035 case DLM_MSG_LOOKUP:
2036 case DLM_MSG_REMOVE:
2037 mb_len += r->res_length;
2038 break;
2039 case DLM_MSG_CONVERT:
2040 case DLM_MSG_UNLOCK:
2041 case DLM_MSG_REQUEST_REPLY:
2042 case DLM_MSG_CONVERT_REPLY:
2043 case DLM_MSG_GRANT:
2044 if (lkb && lkb->lkb_lvbptr)
2045 mb_len += r->res_ls->ls_lvblen;
2046 break;
2047 }
2048
2049 /* get_buffer gives us a message handle (mh) that we need to
2050 pass into lowcomms_commit and a message buffer (mb) that we
2051 write our data into */
2052
2053 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2054 if (!mh)
2055 return -ENOBUFS;
2056
2057 memset(mb, 0, mb_len);
2058
2059 ms = (struct dlm_message *) mb;
2060
2061 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2062 ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2063 ms->m_header.h_nodeid = dlm_our_nodeid();
2064 ms->m_header.h_length = mb_len;
2065 ms->m_header.h_cmd = DLM_MSG;
2066
2067 ms->m_type = mstype;
2068
2069 *mh_ret = mh;
2070 *ms_ret = ms;
2071 return 0;
2072}
2073
/* Convert the message with dlm_message_out() and commit its buffer to
   lowcomms.  Always returns 0; further lowcomms enhancements or alternate
   implementations may make the return value from this function useful at
   some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	const int rv = 0;

	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);

	return rv;
}
2083
2084static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2085 struct dlm_message *ms)
2086{
2087 ms->m_nodeid = lkb->lkb_nodeid;
2088 ms->m_pid = lkb->lkb_ownpid;
2089 ms->m_lkid = lkb->lkb_id;
2090 ms->m_remid = lkb->lkb_remid;
2091 ms->m_exflags = lkb->lkb_exflags;
2092 ms->m_sbflags = lkb->lkb_sbflags;
2093 ms->m_flags = lkb->lkb_flags;
2094 ms->m_lvbseq = lkb->lkb_lvbseq;
2095 ms->m_status = lkb->lkb_status;
2096 ms->m_grmode = lkb->lkb_grmode;
2097 ms->m_rqmode = lkb->lkb_rqmode;
2098 ms->m_hash = r->res_hash;
2099
2100 /* m_result and m_bastmode are set from function args,
2101 not from lkb fields */
2102
2103 if (lkb->lkb_bastaddr)
2104 ms->m_asts |= AST_BAST;
2105 if (lkb->lkb_astaddr)
2106 ms->m_asts |= AST_COMP;
2107
e7fd4179
DT
2108 if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2109 memcpy(ms->m_extra, r->res_name, r->res_length);
2110
2111 else if (lkb->lkb_lvbptr)
2112 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2113
2114}
2115
2116static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2117{
2118 struct dlm_message *ms;
2119 struct dlm_mhandle *mh;
2120 int to_nodeid, error;
2121
2122 add_to_waiters(lkb, mstype);
2123
2124 to_nodeid = r->res_nodeid;
2125
2126 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2127 if (error)
2128 goto fail;
2129
2130 send_args(r, lkb, ms);
2131
2132 error = send_message(mh, ms);
2133 if (error)
2134 goto fail;
2135 return 0;
2136
2137 fail:
2138 remove_from_waiters(lkb);
2139 return error;
2140}
2141
2142static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2143{
2144 return send_common(r, lkb, DLM_MSG_REQUEST);
2145}
2146
2147static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2148{
2149 int error;
2150
2151 error = send_common(r, lkb, DLM_MSG_CONVERT);
2152
2153 /* down conversions go without a reply from the master */
2154 if (!error && down_conversion(lkb)) {
2155 remove_from_waiters(lkb);
2156 r->res_ls->ls_stub_ms.m_result = 0;
2157 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2158 }
2159
2160 return error;
2161}
2162
2163/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2164 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2165 that the master is still correct. */
2166
2167static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2168{
2169 return send_common(r, lkb, DLM_MSG_UNLOCK);
2170}
2171
2172static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2173{
2174 return send_common(r, lkb, DLM_MSG_CANCEL);
2175}
2176
2177static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2178{
2179 struct dlm_message *ms;
2180 struct dlm_mhandle *mh;
2181 int to_nodeid, error;
2182
2183 to_nodeid = lkb->lkb_nodeid;
2184
2185 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2186 if (error)
2187 goto out;
2188
2189 send_args(r, lkb, ms);
2190
2191 ms->m_result = 0;
2192
2193 error = send_message(mh, ms);
2194 out:
2195 return error;
2196}
2197
2198static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2199{
2200 struct dlm_message *ms;
2201 struct dlm_mhandle *mh;
2202 int to_nodeid, error;
2203
2204 to_nodeid = lkb->lkb_nodeid;
2205
2206 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2207 if (error)
2208 goto out;
2209
2210 send_args(r, lkb, ms);
2211
2212 ms->m_bastmode = mode;
2213
2214 error = send_message(mh, ms);
2215 out:
2216 return error;
2217}
2218
2219static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2220{
2221 struct dlm_message *ms;
2222 struct dlm_mhandle *mh;
2223 int to_nodeid, error;
2224
2225 add_to_waiters(lkb, DLM_MSG_LOOKUP);
2226
2227 to_nodeid = dlm_dir_nodeid(r);
2228
2229 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2230 if (error)
2231 goto fail;
2232
2233 send_args(r, lkb, ms);
2234
2235 error = send_message(mh, ms);
2236 if (error)
2237 goto fail;
2238 return 0;
2239
2240 fail:
2241 remove_from_waiters(lkb);
2242 return error;
2243}
2244
2245static int send_remove(struct dlm_rsb *r)
2246{
2247 struct dlm_message *ms;
2248 struct dlm_mhandle *mh;
2249 int to_nodeid, error;
2250
2251 to_nodeid = dlm_dir_nodeid(r);
2252
2253 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2254 if (error)
2255 goto out;
2256
2257 memcpy(ms->m_extra, r->res_name, r->res_length);
2258 ms->m_hash = r->res_hash;
2259
2260 error = send_message(mh, ms);
2261 out:
2262 return error;
2263}
2264
2265static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2266 int mstype, int rv)
2267{
2268 struct dlm_message *ms;
2269 struct dlm_mhandle *mh;
2270 int to_nodeid, error;
2271
2272 to_nodeid = lkb->lkb_nodeid;
2273
2274 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2275 if (error)
2276 goto out;
2277
2278 send_args(r, lkb, ms);
2279
2280 ms->m_result = rv;
2281
2282 error = send_message(mh, ms);
2283 out:
2284 return error;
2285}
2286
2287static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2288{
2289 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2290}
2291
2292static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2293{
2294 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2295}
2296
2297static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2298{
2299 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2300}
2301
2302static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2303{
2304 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2305}
2306
2307static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2308 int ret_nodeid, int rv)
2309{
2310 struct dlm_rsb *r = &ls->ls_stub_rsb;
2311 struct dlm_message *ms;
2312 struct dlm_mhandle *mh;
2313 int error, nodeid = ms_in->m_header.h_nodeid;
2314
2315 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2316 if (error)
2317 goto out;
2318
2319 ms->m_lkid = ms_in->m_lkid;
2320 ms->m_result = rv;
2321 ms->m_nodeid = ret_nodeid;
2322
2323 error = send_message(mh, ms);
2324 out:
2325 return error;
2326}
2327
2328/* which args we save from a received message depends heavily on the type
2329 of message, unlike the send side where we can safely send everything about
2330 the lkb for any type of message */
2331
2332static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2333{
2334 lkb->lkb_exflags = ms->m_exflags;
2335 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2336 (ms->m_flags & 0x0000FFFF);
2337}
2338
2339static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2340{
2341 lkb->lkb_sbflags = ms->m_sbflags;
2342 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2343 (ms->m_flags & 0x0000FFFF);
2344}
2345
2346static int receive_extralen(struct dlm_message *ms)
2347{
2348 return (ms->m_header.h_length - sizeof(struct dlm_message));
2349}
2350
e7fd4179
DT
2351static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2352 struct dlm_message *ms)
2353{
2354 int len;
2355
2356 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2357 if (!lkb->lkb_lvbptr)
2358 lkb->lkb_lvbptr = allocate_lvb(ls);
2359 if (!lkb->lkb_lvbptr)
2360 return -ENOMEM;
2361 len = receive_extralen(ms);
2362 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2363 }
2364 return 0;
2365}
2366
2367static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2368 struct dlm_message *ms)
2369{
2370 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2371 lkb->lkb_ownpid = ms->m_pid;
2372 lkb->lkb_remid = ms->m_lkid;
2373 lkb->lkb_grmode = DLM_LOCK_IV;
2374 lkb->lkb_rqmode = ms->m_rqmode;
2375 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2376 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2377
2378 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2379
e7fd4179
DT
2380 if (receive_lvb(ls, lkb, ms))
2381 return -ENOMEM;
2382
2383 return 0;
2384}
2385
2386static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2387 struct dlm_message *ms)
2388{
2389 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2390 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2391 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2392 lkb->lkb_id, lkb->lkb_remid);
2393 return -EINVAL;
2394 }
2395
2396 if (!is_master_copy(lkb))
2397 return -EINVAL;
2398
2399 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2400 return -EBUSY;
2401
e7fd4179
DT
2402 if (receive_lvb(ls, lkb, ms))
2403 return -ENOMEM;
2404
2405 lkb->lkb_rqmode = ms->m_rqmode;
2406 lkb->lkb_lvbseq = ms->m_lvbseq;
2407
2408 return 0;
2409}
2410
/* Check that the lkb named by an unlock message is a master copy and pick
   up any lvb sent with it.  Returns -EINVAL for a non-master-copy lkb,
   -ENOMEM if receive_lvb() fails, 0 otherwise. */

static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	if (!is_master_copy(lkb))
		return -EINVAL;

	return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
}
2420
2421/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2422 uses to send a reply and that the remote end uses to process the reply. */
2423
2424static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2425{
2426 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2427 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2428 lkb->lkb_remid = ms->m_lkid;
2429}
2430
2431static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2432{
2433 struct dlm_lkb *lkb;
2434 struct dlm_rsb *r;
2435 int error, namelen;
2436
2437 error = create_lkb(ls, &lkb);
2438 if (error)
2439 goto fail;
2440
2441 receive_flags(lkb, ms);
2442 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2443 error = receive_request_args(ls, lkb, ms);
2444 if (error) {
2445 put_lkb(lkb);
2446 goto fail;
2447 }
2448
2449 namelen = receive_extralen(ms);
2450
2451 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2452 if (error) {
2453 put_lkb(lkb);
2454 goto fail;
2455 }
2456
2457 lock_rsb(r);
2458
2459 attach_lkb(r, lkb);
2460 error = do_request(r, lkb);
2461 send_request_reply(r, lkb, error);
2462
2463 unlock_rsb(r);
2464 put_rsb(r);
2465
2466 if (error == -EINPROGRESS)
2467 error = 0;
2468 if (error)
2469 put_lkb(lkb);
2470 return;
2471
2472 fail:
2473 setup_stub_lkb(ls, ms);
2474 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2475}
2476
2477static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2478{
2479 struct dlm_lkb *lkb;
2480 struct dlm_rsb *r;
90135925 2481 int error, reply = 1;
e7fd4179
DT
2482
2483 error = find_lkb(ls, ms->m_remid, &lkb);
2484 if (error)
2485 goto fail;
2486
2487 r = lkb->lkb_resource;
2488
2489 hold_rsb(r);
2490 lock_rsb(r);
2491
2492 receive_flags(lkb, ms);
2493 error = receive_convert_args(ls, lkb, ms);
2494 if (error)
2495 goto out;
2496 reply = !down_conversion(lkb);
2497
2498 error = do_convert(r, lkb);
2499 out:
2500 if (reply)
2501 send_convert_reply(r, lkb, error);
2502
2503 unlock_rsb(r);
2504 put_rsb(r);
2505 put_lkb(lkb);
2506 return;
2507
2508 fail:
2509 setup_stub_lkb(ls, ms);
2510 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2511}
2512
2513static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2514{
2515 struct dlm_lkb *lkb;
2516 struct dlm_rsb *r;
2517 int error;
2518
2519 error = find_lkb(ls, ms->m_remid, &lkb);
2520 if (error)
2521 goto fail;
2522
2523 r = lkb->lkb_resource;
2524
2525 hold_rsb(r);
2526 lock_rsb(r);
2527
2528 receive_flags(lkb, ms);
2529 error = receive_unlock_args(ls, lkb, ms);
2530 if (error)
2531 goto out;
2532
2533 error = do_unlock(r, lkb);
2534 out:
2535 send_unlock_reply(r, lkb, error);
2536
2537 unlock_rsb(r);
2538 put_rsb(r);
2539 put_lkb(lkb);
2540 return;
2541
2542 fail:
2543 setup_stub_lkb(ls, ms);
2544 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2545}
2546
2547static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2548{
2549 struct dlm_lkb *lkb;
2550 struct dlm_rsb *r;
2551 int error;
2552
2553 error = find_lkb(ls, ms->m_remid, &lkb);
2554 if (error)
2555 goto fail;
2556
2557 receive_flags(lkb, ms);
2558
2559 r = lkb->lkb_resource;
2560
2561 hold_rsb(r);
2562 lock_rsb(r);
2563
2564 error = do_cancel(r, lkb);
2565 send_cancel_reply(r, lkb, error);
2566
2567 unlock_rsb(r);
2568 put_rsb(r);
2569 put_lkb(lkb);
2570 return;
2571
2572 fail:
2573 setup_stub_lkb(ls, ms);
2574 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2575}
2576
2577static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2578{
2579 struct dlm_lkb *lkb;
2580 struct dlm_rsb *r;
2581 int error;
2582
2583 error = find_lkb(ls, ms->m_remid, &lkb);
2584 if (error) {
2585 log_error(ls, "receive_grant no lkb");
2586 return;
2587 }
2588 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2589
2590 r = lkb->lkb_resource;
2591
2592 hold_rsb(r);
2593 lock_rsb(r);
2594
2595 receive_flags_reply(lkb, ms);
2596 grant_lock_pc(r, lkb, ms);
2597 queue_cast(r, lkb, 0);
2598
2599 unlock_rsb(r);
2600 put_rsb(r);
2601 put_lkb(lkb);
2602}
2603
2604static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2605{
2606 struct dlm_lkb *lkb;
2607 struct dlm_rsb *r;
2608 int error;
2609
2610 error = find_lkb(ls, ms->m_remid, &lkb);
2611 if (error) {
2612 log_error(ls, "receive_bast no lkb");
2613 return;
2614 }
2615 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2616
2617 r = lkb->lkb_resource;
2618
2619 hold_rsb(r);
2620 lock_rsb(r);
2621
2622 queue_bast(r, lkb, ms->m_bastmode);
2623
2624 unlock_rsb(r);
2625 put_rsb(r);
2626 put_lkb(lkb);
2627}
2628
2629static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2630{
2631 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2632
2633 from_nodeid = ms->m_header.h_nodeid;
2634 our_nodeid = dlm_our_nodeid();
2635
2636 len = receive_extralen(ms);
2637
2638 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2639 if (dir_nodeid != our_nodeid) {
2640 log_error(ls, "lookup dir_nodeid %d from %d",
2641 dir_nodeid, from_nodeid);
2642 error = -EINVAL;
2643 ret_nodeid = -1;
2644 goto out;
2645 }
2646
2647 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2648
2649 /* Optimization: we're master so treat lookup as a request */
2650 if (!error && ret_nodeid == our_nodeid) {
2651 receive_request(ls, ms);
2652 return;
2653 }
2654 out:
2655 send_lookup_reply(ls, ms, ret_nodeid, error);
2656}
2657
2658static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2659{
2660 int len, dir_nodeid, from_nodeid;
2661
2662 from_nodeid = ms->m_header.h_nodeid;
2663
2664 len = receive_extralen(ms);
2665
2666 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2667 if (dir_nodeid != dlm_our_nodeid()) {
2668 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2669 dir_nodeid, from_nodeid);
2670 return;
2671 }
2672
2673 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2674}
2675
2676static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2677{
2678 struct dlm_lkb *lkb;
2679 struct dlm_rsb *r;
2680 int error, mstype;
2681
2682 error = find_lkb(ls, ms->m_remid, &lkb);
2683 if (error) {
2684 log_error(ls, "receive_request_reply no lkb");
2685 return;
2686 }
2687 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2688
2689 mstype = lkb->lkb_wait_type;
2690 error = remove_from_waiters(lkb);
2691 if (error) {
2692 log_error(ls, "receive_request_reply not on waiters");
2693 goto out;
2694 }
2695
2696 /* this is the value returned from do_request() on the master */
2697 error = ms->m_result;
2698
2699 r = lkb->lkb_resource;
2700 hold_rsb(r);
2701 lock_rsb(r);
2702
2703 /* Optimization: the dir node was also the master, so it took our
2704 lookup as a request and sent request reply instead of lookup reply */
2705 if (mstype == DLM_MSG_LOOKUP) {
2706 r->res_nodeid = ms->m_header.h_nodeid;
2707 lkb->lkb_nodeid = r->res_nodeid;
2708 }
2709
2710 switch (error) {
2711 case -EAGAIN:
2712 /* request would block (be queued) on remote master;
2713 the unhold undoes the original ref from create_lkb()
2714 so it leads to the lkb being freed */
2715 queue_cast(r, lkb, -EAGAIN);
2716 confirm_master(r, -EAGAIN);
2717 unhold_lkb(lkb);
2718 break;
2719
2720 case -EINPROGRESS:
2721 case 0:
2722 /* request was queued or granted on remote master */
2723 receive_flags_reply(lkb, ms);
2724 lkb->lkb_remid = ms->m_lkid;
2725 if (error)
2726 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2727 else {
2728 grant_lock_pc(r, lkb, ms);
2729 queue_cast(r, lkb, 0);
2730 }
2731 confirm_master(r, error);
2732 break;
2733
2734 case -ENOENT:
2735 case -ENOTBLK:
2736 /* find_rsb failed to find rsb or rsb wasn't master */
2737 r->res_nodeid = -1;
2738 lkb->lkb_nodeid = -1;
2739 _request_lock(r, lkb);
2740 break;
2741
2742 default:
2743 log_error(ls, "receive_request_reply error %d", error);
2744 }
2745
2746 unlock_rsb(r);
2747 put_rsb(r);
2748 out:
2749 put_lkb(lkb);
2750}
2751
2752static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2753 struct dlm_message *ms)
2754{
2755 int error = ms->m_result;
2756
2757 /* this is the value returned from do_convert() on the master */
2758
2759 switch (error) {
2760 case -EAGAIN:
2761 /* convert would block (be queued) on remote master */
2762 queue_cast(r, lkb, -EAGAIN);
2763 break;
2764
2765 case -EINPROGRESS:
2766 /* convert was queued on remote master */
2767 del_lkb(r, lkb);
2768 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2769 break;
2770
2771 case 0:
2772 /* convert was granted on remote master */
2773 receive_flags_reply(lkb, ms);
2774 grant_lock_pc(r, lkb, ms);
2775 queue_cast(r, lkb, 0);
2776 break;
2777
2778 default:
2779 log_error(r->res_ls, "receive_convert_reply error %d", error);
2780 }
2781}
2782
2783static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2784{
2785 struct dlm_rsb *r = lkb->lkb_resource;
2786
2787 hold_rsb(r);
2788 lock_rsb(r);
2789
2790 __receive_convert_reply(r, lkb, ms);
2791
2792 unlock_rsb(r);
2793 put_rsb(r);
2794}
2795
2796static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2797{
2798 struct dlm_lkb *lkb;
2799 int error;
2800
2801 error = find_lkb(ls, ms->m_remid, &lkb);
2802 if (error) {
2803 log_error(ls, "receive_convert_reply no lkb");
2804 return;
2805 }
2806 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2807
2808 error = remove_from_waiters(lkb);
2809 if (error) {
2810 log_error(ls, "receive_convert_reply not on waiters");
2811 goto out;
2812 }
2813
2814 _receive_convert_reply(lkb, ms);
2815 out:
2816 put_lkb(lkb);
2817}
2818
2819static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2820{
2821 struct dlm_rsb *r = lkb->lkb_resource;
2822 int error = ms->m_result;
2823
2824 hold_rsb(r);
2825 lock_rsb(r);
2826
2827 /* this is the value returned from do_unlock() on the master */
2828
2829 switch (error) {
2830 case -DLM_EUNLOCK:
2831 receive_flags_reply(lkb, ms);
2832 remove_lock_pc(r, lkb);
2833 queue_cast(r, lkb, -DLM_EUNLOCK);
2834 break;
2835 default:
2836 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2837 }
2838
2839 unlock_rsb(r);
2840 put_rsb(r);
2841}
2842
2843static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2844{
2845 struct dlm_lkb *lkb;
2846 int error;
2847
2848 error = find_lkb(ls, ms->m_remid, &lkb);
2849 if (error) {
2850 log_error(ls, "receive_unlock_reply no lkb");
2851 return;
2852 }
2853 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2854
2855 error = remove_from_waiters(lkb);
2856 if (error) {
2857 log_error(ls, "receive_unlock_reply not on waiters");
2858 goto out;
2859 }
2860
2861 _receive_unlock_reply(lkb, ms);
2862 out:
2863 put_lkb(lkb);
2864}
2865
2866static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2867{
2868 struct dlm_rsb *r = lkb->lkb_resource;
2869 int error = ms->m_result;
2870
2871 hold_rsb(r);
2872 lock_rsb(r);
2873
2874 /* this is the value returned from do_cancel() on the master */
2875
2876 switch (error) {
2877 case -DLM_ECANCEL:
2878 receive_flags_reply(lkb, ms);
2879 revert_lock_pc(r, lkb);
2880 queue_cast(r, lkb, -DLM_ECANCEL);
2881 break;
2882 default:
2883 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2884 }
2885
2886 unlock_rsb(r);
2887 put_rsb(r);
2888}
2889
2890static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2891{
2892 struct dlm_lkb *lkb;
2893 int error;
2894
2895 error = find_lkb(ls, ms->m_remid, &lkb);
2896 if (error) {
2897 log_error(ls, "receive_cancel_reply no lkb");
2898 return;
2899 }
2900 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2901
2902 error = remove_from_waiters(lkb);
2903 if (error) {
2904 log_error(ls, "receive_cancel_reply not on waiters");
2905 goto out;
2906 }
2907
2908 _receive_cancel_reply(lkb, ms);
2909 out:
2910 put_lkb(lkb);
2911}
2912
2913static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2914{
2915 struct dlm_lkb *lkb;
2916 struct dlm_rsb *r;
2917 int error, ret_nodeid;
2918
2919 error = find_lkb(ls, ms->m_lkid, &lkb);
2920 if (error) {
2921 log_error(ls, "receive_lookup_reply no lkb");
2922 return;
2923 }
2924
2925 error = remove_from_waiters(lkb);
2926 if (error) {
2927 log_error(ls, "receive_lookup_reply not on waiters");
2928 goto out;
2929 }
2930
2931 /* this is the value returned by dlm_dir_lookup on dir node
2932 FIXME: will a non-zero error ever be returned? */
2933 error = ms->m_result;
2934
2935 r = lkb->lkb_resource;
2936 hold_rsb(r);
2937 lock_rsb(r);
2938
2939 ret_nodeid = ms->m_nodeid;
2940 if (ret_nodeid == dlm_our_nodeid()) {
2941 r->res_nodeid = 0;
2942 ret_nodeid = 0;
2943 r->res_first_lkid = 0;
2944 } else {
2945 /* set_master() will copy res_nodeid to lkb_nodeid */
2946 r->res_nodeid = ret_nodeid;
2947 }
2948
2949 _request_lock(r, lkb);
2950
2951 if (!ret_nodeid)
2952 process_lookup_list(r);
2953
2954 unlock_rsb(r);
2955 put_rsb(r);
2956 out:
2957 put_lkb(lkb);
2958}
2959
2960int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
2961{
2962 struct dlm_message *ms = (struct dlm_message *) hd;
2963 struct dlm_ls *ls;
2964 int error;
2965
2966 if (!recovery)
2967 dlm_message_in(ms);
2968
2969 ls = dlm_find_lockspace_global(hd->h_lockspace);
2970 if (!ls) {
2971 log_print("drop message %d from %d for unknown lockspace %d",
2972 ms->m_type, nodeid, hd->h_lockspace);
2973 return -EINVAL;
2974 }
2975
2976 /* recovery may have just ended leaving a bunch of backed-up requests
2977 in the requestqueue; wait while dlm_recoverd clears them */
2978
2979 if (!recovery)
2980 dlm_wait_requestqueue(ls);
2981
2982 /* recovery may have just started while there were a bunch of
2983 in-flight requests -- save them in requestqueue to be processed
2984 after recovery. we can't let dlm_recvd block on the recovery
2985 lock. if dlm_recoverd is calling this function to clear the
2986 requestqueue, it needs to be interrupted (-EINTR) if another
2987 recovery operation is starting. */
2988
2989 while (1) {
2990 if (dlm_locking_stopped(ls)) {
2991 if (!recovery)
2992 dlm_add_requestqueue(ls, nodeid, hd);
2993 error = -EINTR;
2994 goto out;
2995 }
2996
2997 if (lock_recovery_try(ls))
2998 break;
2999 schedule();
3000 }
3001
3002 switch (ms->m_type) {
3003
3004 /* messages sent to a master node */
3005
3006 case DLM_MSG_REQUEST:
3007 receive_request(ls, ms);
3008 break;
3009
3010 case DLM_MSG_CONVERT:
3011 receive_convert(ls, ms);
3012 break;
3013
3014 case DLM_MSG_UNLOCK:
3015 receive_unlock(ls, ms);
3016 break;
3017
3018 case DLM_MSG_CANCEL:
3019 receive_cancel(ls, ms);
3020 break;
3021
3022 /* messages sent from a master node (replies to above) */
3023
3024 case DLM_MSG_REQUEST_REPLY:
3025 receive_request_reply(ls, ms);
3026 break;
3027
3028 case DLM_MSG_CONVERT_REPLY:
3029 receive_convert_reply(ls, ms);
3030 break;
3031
3032 case DLM_MSG_UNLOCK_REPLY:
3033 receive_unlock_reply(ls, ms);
3034 break;
3035
3036 case DLM_MSG_CANCEL_REPLY:
3037 receive_cancel_reply(ls, ms);
3038 break;
3039
3040 /* messages sent from a master node (only two types of async msg) */
3041
3042 case DLM_MSG_GRANT:
3043 receive_grant(ls, ms);
3044 break;
3045
3046 case DLM_MSG_BAST:
3047 receive_bast(ls, ms);
3048 break;
3049
3050 /* messages sent to a dir node */
3051
3052 case DLM_MSG_LOOKUP:
3053 receive_lookup(ls, ms);
3054 break;
3055
3056 case DLM_MSG_REMOVE:
3057 receive_remove(ls, ms);
3058 break;
3059
3060 /* messages sent from a dir node (remove has no reply) */
3061
3062 case DLM_MSG_LOOKUP_REPLY:
3063 receive_lookup_reply(ls, ms);
3064 break;
3065
3066 default:
3067 log_error(ls, "unknown message type %d", ms->m_type);
3068 }
3069
3070 unlock_recovery(ls);
3071 out:
3072 dlm_put_lockspace(ls);
3073 dlm_astd_wake();
3074 return 0;
3075}
3076
3077
3078/*
3079 * Recovery related
3080 */
3081
3082static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3083{
3084 if (middle_conversion(lkb)) {
3085 hold_lkb(lkb);
3086 ls->ls_stub_ms.m_result = -EINPROGRESS;
3087 _remove_from_waiters(lkb);
3088 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3089
3090 /* Same special case as in receive_rcom_lock_args() */
3091 lkb->lkb_grmode = DLM_LOCK_IV;
3092 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3093 unhold_lkb(lkb);
3094
3095 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3096 lkb->lkb_flags |= DLM_IFL_RESEND;
3097 }
3098
3099 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3100 conversions are async; there's no reply from the remote master */
3101}
3102
3103/* A waiting lkb needs recovery if the master node has failed, or
3104 the master node is changing (only when no directory is used) */
3105
3106static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3107{
3108 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3109 return 1;
3110
3111 if (!dlm_no_directory(ls))
3112 return 0;
3113
3114 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3115 return 1;
3116
3117 return 0;
3118}
3119
3120/* Recovery for locks that are waiting for replies from nodes that are now
3121 gone. We can just complete unlocks and cancels by faking a reply from the
3122 dead node. Requests and up-conversions we flag to be resent after
3123 recovery. Down-conversions can just be completed with a fake reply like
3124 unlocks. Conversions between PR and CW need special attention. */
3125
3126void dlm_recover_waiters_pre(struct dlm_ls *ls)
3127{
3128 struct dlm_lkb *lkb, *safe;
3129
90135925 3130 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
3131
3132 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3133 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3134 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3135
3136 /* all outstanding lookups, regardless of destination will be
3137 resent after recovery is done */
3138
3139 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3140 lkb->lkb_flags |= DLM_IFL_RESEND;
3141 continue;
3142 }
3143
3144 if (!waiter_needs_recovery(ls, lkb))
3145 continue;
3146
3147 switch (lkb->lkb_wait_type) {
3148
3149 case DLM_MSG_REQUEST:
3150 lkb->lkb_flags |= DLM_IFL_RESEND;
3151 break;
3152
3153 case DLM_MSG_CONVERT:
3154 recover_convert_waiter(ls, lkb);
3155 break;
3156
3157 case DLM_MSG_UNLOCK:
3158 hold_lkb(lkb);
3159 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3160 _remove_from_waiters(lkb);
3161 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3162 put_lkb(lkb);
3163 break;
3164
3165 case DLM_MSG_CANCEL:
3166 hold_lkb(lkb);
3167 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3168 _remove_from_waiters(lkb);
3169 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3170 put_lkb(lkb);
3171 break;
3172
3173 default:
3174 log_error(ls, "invalid lkb wait_type %d",
3175 lkb->lkb_wait_type);
3176 }
3177 }
90135925 3178 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
3179}
3180
3181static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3182{
3183 struct dlm_lkb *lkb;
3184 int rv = 0;
3185
90135925 3186 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
3187 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3188 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3189 rv = lkb->lkb_wait_type;
3190 _remove_from_waiters(lkb);
3191 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3192 break;
3193 }
3194 }
90135925 3195 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
3196
3197 if (!rv)
3198 lkb = NULL;
3199 *lkb_ret = lkb;
3200 return rv;
3201}
3202
3203/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3204 master or dir-node for r. Processing the lkb may result in it being placed
3205 back on waiters. */
3206
3207int dlm_recover_waiters_post(struct dlm_ls *ls)
3208{
3209 struct dlm_lkb *lkb;
3210 struct dlm_rsb *r;
3211 int error = 0, mstype;
3212
3213 while (1) {
3214 if (dlm_locking_stopped(ls)) {
3215 log_debug(ls, "recover_waiters_post aborted");
3216 error = -EINTR;
3217 break;
3218 }
3219
3220 mstype = remove_resend_waiter(ls, &lkb);
3221 if (!mstype)
3222 break;
3223
3224 r = lkb->lkb_resource;
3225
3226 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3227 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3228
3229 switch (mstype) {
3230
3231 case DLM_MSG_LOOKUP:
3232 hold_rsb(r);
3233 lock_rsb(r);
3234 _request_lock(r, lkb);
3235 if (is_master(r))
3236 confirm_master(r, 0);
3237 unlock_rsb(r);
3238 put_rsb(r);
3239 break;
3240
3241 case DLM_MSG_REQUEST:
3242 hold_rsb(r);
3243 lock_rsb(r);
3244 _request_lock(r, lkb);
3245 unlock_rsb(r);
3246 put_rsb(r);
3247 break;
3248
3249 case DLM_MSG_CONVERT:
3250 hold_rsb(r);
3251 lock_rsb(r);
3252 _convert_lock(r, lkb);
3253 unlock_rsb(r);
3254 put_rsb(r);
3255 break;
3256
3257 default:
3258 log_error(ls, "recover_waiters_post type %d", mstype);
3259 }
3260 }
3261
3262 return error;
3263}
3264
3265static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3266 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3267{
3268 struct dlm_ls *ls = r->res_ls;
3269 struct dlm_lkb *lkb, *safe;
3270
3271 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3272 if (test(ls, lkb)) {
3273 del_lkb(r, lkb);
3274 /* this put should free the lkb */
3275 if (!put_lkb(lkb))
3276 log_error(ls, "purged lkb not released");
3277 }
3278 }
3279}
3280
3281static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3282{
3283 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3284}
3285
/* purge_queue() test: selects every master-copy lkb; ls is unused. */

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	int mstcpy = is_master_copy(lkb);

	return mstcpy;
}
3290
3291static void purge_dead_locks(struct dlm_rsb *r)
3292{
3293 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3294 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3295 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3296}
3297
3298void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3299{
3300 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3301 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3302 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3303}
3304
3305/* Get rid of locks held by nodes that are gone. */
3306
3307int dlm_purge_locks(struct dlm_ls *ls)
3308{
3309 struct dlm_rsb *r;
3310
3311 log_debug(ls, "dlm_purge_locks");
3312
3313 down_write(&ls->ls_root_sem);
3314 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3315 hold_rsb(r);
3316 lock_rsb(r);
3317 if (is_master(r))
3318 purge_dead_locks(r);
3319 unlock_rsb(r);
3320 unhold_rsb(r);
3321
3322 schedule();
3323 }
3324 up_write(&ls->ls_root_sem);
3325
3326 return 0;
3327}
3328
3329int dlm_grant_after_purge(struct dlm_ls *ls)
3330{
3331 struct dlm_rsb *r;
3332 int i;
3333
3334 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
3335 read_lock(&ls->ls_rsbtbl[i].lock);
3336 list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
3337 hold_rsb(r);
3338 lock_rsb(r);
3339 if (is_master(r)) {
3340 grant_pending_locks(r);
3341 confirm_master(r, 0);
3342 }
3343 unlock_rsb(r);
3344 put_rsb(r);
3345 }
3346 read_unlock(&ls->ls_rsbtbl[i].lock);
3347 }
3348
3349 return 0;
3350}
3351
3352static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3353 uint32_t remid)
3354{
3355 struct dlm_lkb *lkb;
3356
3357 list_for_each_entry(lkb, head, lkb_statequeue) {
3358 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3359 return lkb;
3360 }
3361 return NULL;
3362}
3363
3364static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3365 uint32_t remid)
3366{
3367 struct dlm_lkb *lkb;
3368
3369 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3370 if (lkb)
3371 return lkb;
3372 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3373 if (lkb)
3374 return lkb;
3375 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3376 if (lkb)
3377 return lkb;
3378 return NULL;
3379}
3380
3381static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3382 struct dlm_rsb *r, struct dlm_rcom *rc)
3383{
3384 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3385 int lvblen;
3386
3387 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3388 lkb->lkb_ownpid = rl->rl_ownpid;
3389 lkb->lkb_remid = rl->rl_lkid;
3390 lkb->lkb_exflags = rl->rl_exflags;
3391 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3392 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3393 lkb->lkb_lvbseq = rl->rl_lvbseq;
3394 lkb->lkb_rqmode = rl->rl_rqmode;
3395 lkb->lkb_grmode = rl->rl_grmode;
3396 /* don't set lkb_status because add_lkb wants to itself */
3397
3398 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3399 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3400
e7fd4179
DT
3401 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3402 lkb->lkb_lvbptr = allocate_lvb(ls);
3403 if (!lkb->lkb_lvbptr)
3404 return -ENOMEM;
3405 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3406 sizeof(struct rcom_lock);
3407 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3408 }
3409
3410 /* Conversions between PR and CW (middle modes) need special handling.
3411 The real granted mode of these converting locks cannot be determined
3412 until all locks have been rebuilt on the rsb (recover_conversion) */
3413
3414 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3415 rl->rl_status = DLM_LKSTS_CONVERT;
3416 lkb->lkb_grmode = DLM_LOCK_IV;
3417 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3418 }
3419
3420 return 0;
3421}
3422
3423/* This lkb may have been recovered in a previous aborted recovery so we need
3424 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3425 If so we just send back a standard reply. If not, we create a new lkb with
3426 the given values and send back our lkid. We send back our lkid by sending
3427 back the rcom_lock struct we got but with the remid field filled in. */
3428
3429int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3430{
3431 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3432 struct dlm_rsb *r;
3433 struct dlm_lkb *lkb;
3434 int error;
3435
3436 if (rl->rl_parent_lkid) {
3437 error = -EOPNOTSUPP;
3438 goto out;
3439 }
3440
3441 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3442 if (error)
3443 goto out;
3444
3445 lock_rsb(r);
3446
3447 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3448 if (lkb) {
3449 error = -EEXIST;
3450 goto out_remid;
3451 }
3452
3453 error = create_lkb(ls, &lkb);
3454 if (error)
3455 goto out_unlock;
3456
3457 error = receive_rcom_lock_args(ls, lkb, r, rc);
3458 if (error) {
3459 put_lkb(lkb);
3460 goto out_unlock;
3461 }
3462
3463 attach_lkb(r, lkb);
3464 add_lkb(r, lkb, rl->rl_status);
3465 error = 0;
3466
3467 out_remid:
3468 /* this is the new value returned to the lock holder for
3469 saving in its process-copy lkb */
3470 rl->rl_remid = lkb->lkb_id;
3471
3472 out_unlock:
3473 unlock_rsb(r);
3474 put_rsb(r);
3475 out:
3476 if (error)
3477 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3478 rl->rl_result = error;
3479 return error;
3480}
3481
3482int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3483{
3484 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3485 struct dlm_rsb *r;
3486 struct dlm_lkb *lkb;
3487 int error;
3488
3489 error = find_lkb(ls, rl->rl_lkid, &lkb);
3490 if (error) {
3491 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3492 return error;
3493 }
3494
3495 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3496
3497 error = rl->rl_result;
3498
3499 r = lkb->lkb_resource;
3500 hold_rsb(r);
3501 lock_rsb(r);
3502
3503 switch (error) {
3504 case -EEXIST:
3505 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3506 /* fall through */
3507 case 0:
3508 lkb->lkb_remid = rl->rl_remid;
3509 break;
3510 default:
3511 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3512 error, lkb->lkb_id);
3513 }
3514
3515 /* an ack for dlm_recover_locks() which waits for replies from
3516 all the locks it sends to new masters */
3517 dlm_recovered_lock(r);
3518
3519 unlock_rsb(r);
3520 put_rsb(r);
3521 put_lkb(lkb);
3522
3523 return 0;
3524}
3525
This page took 0.191586 seconds and 5 git commands to generate.