fs/dlm/lock.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
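
/* Illustrative sketch (not from the original file): roughly how stage 1
   splits a call into the four main operations based on the caller's
   flags.  The names and two-argument signatures follow the summary
   above; the real entry points also validate arguments and take the
   lockspace recovery lock, so this is a simplification, kept inside
   #if 0 so it is never compiled. */
#if 0
static int stage1_dispatch_sketch(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  uint32_t flags, int is_unlock)
{
	if (!is_unlock)
		return (flags & DLM_LKF_CONVERT) ? convert_lock(ls, lkb)
						 : request_lock(ls, lkb);

	return (flags & DLM_LKF_CANCEL) ? cancel_lock(ls, lkb)
					: unlock_lock(ls, lkb);
}
#endif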
#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/* UN   NL  CR  CW  PR  PW  EX  PD*/
	{  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
	{  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
	{  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
	{  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
	{  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
	{  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
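
/* Worked example (illustration only): dlm_lvb_operations[gr+1][rq+1] with
   gr=NL, rq=EX gives 1, so on an NL->EX grant the resource's LVB is copied
   back to the caller's buffer; with gr=PW, rq=NL it gives 0, so on a
   PW->NL down-conversion the caller's LVB is written into the resource. */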

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
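
/* Usage sketch (not from the original file): callers pass two lock modes
   and get back nonzero if they are compatible, e.g. PR is compatible
   with PR but not with EX.  Kept inside #if 0 so it is never compiled. */
#if 0
static void modes_compat_example(void)
{
	WARN_ON(!dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR)); /* shared: 1 */
	WARN_ON(dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX));  /* conflict: 0 */
}
#endif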

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

static inline void unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

static inline int lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
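
/* Usage sketch (an assumption based on the comment above, not code from
   this excerpt): lock operations take ls_in_recovery for read around
   their work, so recovery, which takes it for write, excludes them.
   Kept inside #if 0 so it is never compiled. */
#if 0
	lock_recovery(ls);	/* or lock_recovery_try() where blocking is not wanted */
	error = request_lock(ls, lkb, name, namelen, &args);
	unlock_recovery(ls);
#endif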

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}
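
/* Example (illustration only): PR<->CW is the "middle" case because the
   two modes are incompatible yet neither is strictly weaker, so a PR->CW
   (or CW->PR) conversion is neither an up- nor a down-conversion; by
   contrast a conversion with grmode EX and rqmode PR satisfies
   down_conversion(). */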

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}
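
/* Usage sketch (an assumption, not code from this excerpt): a caller that
   should create the rsb on a miss passes R_CREATE.  Note how find_rsb()
   above repeats _search_rsb() under the bucket lock to close the race
   where another thread inserts the same name between the first miss and
   taking the lock.  Kept inside #if 0 so it is never compiled. */
#if 0
	struct dlm_rsb *r;
	int error = find_rsb(ls, name, namelen, R_CREATE, &r);
	if (!error) {
		/* r holds a reference; drop it with put_rsb(r) when done */
	}
#endif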

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
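
/* Example (illustration only): the lock id encodes its table bucket in
   the upper 16 bits and a per-bucket counter in the lower 16, so lkid
   0x0004002a lives in bucket 4; __find_lkb() below recovers the bucket
   with (lkid >> 16). */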

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	if (!lkb)
		list_add_tail(new, head);
	else
		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
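
/* Usage sketch (an assumption, not code from this excerpt): the send
   paths pair these helpers around a remote operation, so the lkb sits on
   ls_waiters until the matching reply arrives.  Kept inside #if 0 so it
   is never compiled. */
#if 0
	error = add_to_waiters(lkb, DLM_MSG_REQUEST);
	if (error)
		return error;

	error = send_request(r, lkb);
	if (error)
		remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
#endif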

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type);
	if (ms != &ls->ls_stub_ms)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	if (dlm_locking_stopped(ls))
		return;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		cond_resched();
	}
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

/* returns: 0 did nothing
	    1 moved lock to granted
	   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing the first lkb in the
 * convert queue from being granted, then demote lkb (set grmode to NL).
 * This second form requires that we check for conv-deadlk even when
 * now == 0 in _can_be_granted().
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We demote the granted mode of the second lock (the lkb passed to this
 * function).
 *
 * After the resolution, the "grant pending" function needs to go back and try
 * to grant locks on the convert queue again since the first lock can now be
 * granted.
 */

static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this, *first = NULL, *self = NULL;

	list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
		if (!first)
			first = this;
		if (this == lkb) {
			self = lkb;
			continue;
		}

		if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
			return 1;
	}

	/* if lkb is on the convert queue and is preventing the first
	   from being granted, then there's deadlock and we demote lkb.
	   multiple converting locks may need to do this before the first
	   converting lock can be granted. */

	if (self && self != first) {
		if (!modes_compat(lkb, first) &&
		    !queue_conflict(&rsb->res_grantqueue, first))
			return 1;
	}

	return 0;
}

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
	}

	return 0;
}

/*
 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
 * simple way to provide a big optimization to applications that can use them.
 */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	uint32_t flags = lkb->lkb_exflags;
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
		goto out;

	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
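
/* Usage sketch (an assumption, not code from this excerpt): an
   application that prefers PW but can make progress with PR passes
   DLM_LKF_ALTPR, then checks DLM_SBF_ALTMODE in the lksb to learn which
   mode it was actually granted.  Kept inside #if 0 so it is never
   compiled. */
#if 0
	error = dlm_lock(ls, DLM_LOCK_PW, &lksb, DLM_LKF_ALTPR,
			 name, namelen, 0, ast_fn, astarg, bast_fn);
	/* later, in the completion ast: */
	if (lksb.sb_flags & DLM_SBF_ALTMODE)
		alt_mode_granted();	/* got PR, not PW */
#endif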

static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		if (can_be_granted(r, lkb, 0)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
		} else {
			hi = max_t(int, lkb->lkb_rqmode, hi);
			if (!demoted && is_demoted(lkb))
				demote_restart = 1;
		}
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0))
			grant_lock_pending(r, lkb);
		else
			high = max_t(int, lkb->lkb_rqmode, high);
	}

	return high;
}

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.  FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		if (gr->lkb_bastaddr &&
		    gr->lkb_highbast < lkb->lkb_rqmode &&
		    !modes_compat(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}

/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del_init(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}

/* confirm_master -- confirm (or deny) an rsb's master nodeid */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
		/* the remote master didn't queue our NOQUEUE request;
		   make a waiting lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		} else
			r->res_nodeid = -1;
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}

static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, uint32_t parent_lkid, void *ast,
			 void *astarg, void *bast, struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	if (flags & DLM_LKF_CANCEL)
		goto out;

	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	/* parent/child locks not yet supported */
	if (parent_lkid)
		goto out;

	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astaddr = ast;
	args->astparam = (long) astarg;
	args->bastaddr = bast;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}
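
/* Examples (illustration only) of combinations set_lock_args() rejects:
   DLM_LKF_EXPEDITE with any mode other than DLM_LOCK_NL, DLM_LKF_QUECVT
   or DLM_LKF_CONVDEADLK without DLM_LKF_CONVERT, DLM_LKF_VALBLK with a
   NULL sb_lvbptr, and any nonzero parent_lkid (parent/child locks are
   not yet supported).  All of these fail with -EINVAL before an lkb is
   touched. */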
1746
1747static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1748{
1749 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1750 DLM_LKF_FORCEUNLOCK))
1751 return -EINVAL;
1752
ef0c2bb0
DT
1753 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1754 return -EINVAL;
1755
e7fd4179
DT
1756 args->flags = flags;
1757 args->astparam = (long) astarg;
1758 return 0;
1759}
1760
1761static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1762 struct dlm_args *args)
1763{
1764 int rv = -EINVAL;
1765
1766 if (args->flags & DLM_LKF_CONVERT) {
1767 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1768 goto out;
1769
1770 if (args->flags & DLM_LKF_QUECVT &&
1771 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1772 goto out;
1773
1774 rv = -EBUSY;
1775 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1776 goto out;
1777
1778 if (lkb->lkb_wait_type)
1779 goto out;
ef0c2bb0
DT
1780
1781 if (is_overlap(lkb))
1782 goto out;
e7fd4179
DT
1783 }
1784
1785 lkb->lkb_exflags = args->flags;
1786 lkb->lkb_sbflags = 0;
1787 lkb->lkb_astaddr = args->astaddr;
1788 lkb->lkb_astparam = args->astparam;
1789 lkb->lkb_bastaddr = args->bastaddr;
1790 lkb->lkb_rqmode = args->mode;
1791 lkb->lkb_lksb = args->lksb;
1792 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1793 lkb->lkb_ownpid = (int) current->pid;
e7fd4179
DT
1794 rv = 0;
1795 out:
1796 return rv;
1797}
1798
ef0c2bb0
DT
1799/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1800 for success */
1801
1802/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1803 because there may be a lookup in progress and it's valid to do
1804 cancel/unlockf on it */
1805
e7fd4179
DT
1806static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1807{
ef0c2bb0 1808 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
e7fd4179
DT
1809 int rv = -EINVAL;
1810
ef0c2bb0
DT
1811 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1812 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1813 dlm_print_lkb(lkb);
e7fd4179 1814 goto out;
ef0c2bb0 1815 }
e7fd4179 1816
ef0c2bb0
DT
1817 /* an lkb may still exist even though the lock is EOL'ed due to a
1818 cancel, unlock or failed noqueue request; an app can't use these
1819 locks; return same error as if the lkid had not been found at all */
e7fd4179 1820
ef0c2bb0
DT
1821 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1822 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1823 rv = -ENOENT;
e7fd4179 1824 goto out;
ef0c2bb0 1825 }
e7fd4179 1826
ef0c2bb0
DT
1827 /* an lkb may be waiting for an rsb lookup to complete where the
1828 lookup was initiated by another lock */
1829
1830 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1831 if (!list_empty(&lkb->lkb_rsb_lookup)) {
1832 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1833 list_del_init(&lkb->lkb_rsb_lookup);
1834 queue_cast(lkb->lkb_resource, lkb,
1835 args->flags & DLM_LKF_CANCEL ?
1836 -DLM_ECANCEL : -DLM_EUNLOCK);
1837 unhold_lkb(lkb); /* undoes create_lkb() */
1838 rv = -EBUSY;
1839 goto out;
1840 }
1841 }
1842
1843 /* cancel not allowed with another cancel/unlock in progress */
1844
1845 if (args->flags & DLM_LKF_CANCEL) {
1846 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1847 goto out;
1848
1849 if (is_overlap(lkb))
1850 goto out;
1851
1852 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1853 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1854 rv = -EBUSY;
1855 goto out;
1856 }
1857
1858 switch (lkb->lkb_wait_type) {
1859 case DLM_MSG_LOOKUP:
1860 case DLM_MSG_REQUEST:
1861 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1862 rv = -EBUSY;
1863 goto out;
1864 case DLM_MSG_UNLOCK:
1865 case DLM_MSG_CANCEL:
1866 goto out;
1867 }
1868 /* add_to_waiters() will set OVERLAP_CANCEL */
1869 goto out_ok;
1870 }
1871
1872 /* do we need to allow a force-unlock if there's a normal unlock
1873 already in progress? in what conditions could the normal unlock
1874 fail such that we'd want to send a force-unlock to be sure? */
1875
1876 if (args->flags & DLM_LKF_FORCEUNLOCK) {
1877 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1878 goto out;
1879
1880 if (is_overlap_unlock(lkb))
1881 goto out;
e7fd4179 1882
ef0c2bb0
DT
1883 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1884 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1885 rv = -EBUSY;
1886 goto out;
1887 }
1888
1889 switch (lkb->lkb_wait_type) {
1890 case DLM_MSG_LOOKUP:
1891 case DLM_MSG_REQUEST:
1892 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1893 rv = -EBUSY;
1894 goto out;
1895 case DLM_MSG_UNLOCK:
1896 goto out;
1897 }
1898 /* add_to_waiters() will set OVERLAP_UNLOCK */
1899 goto out_ok;
1900 }
1901
1902 /* normal unlock not allowed if there's any op in progress */
1903 rv = -EBUSY;
1904 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
1905 goto out;
1906
1907 out_ok:
1908 /* an overlapping op shouldn't blow away exflags from other op */
1909 lkb->lkb_exflags |= args->flags;
1910 lkb->lkb_sbflags = 0;
1911 lkb->lkb_astparam = args->astparam;
1912 rv = 0;
1913 out:
1914 if (rv)
1915 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1916 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1917 args->flags, lkb->lkb_wait_type,
1918 lkb->lkb_resource->res_name);
1919 return rv;
1920}
1921
1922/*
1923 * Four stage 4 varieties:
1924 * do_request(), do_convert(), do_unlock(), do_cancel()
1925 * These are called on the master node for the given lock and
1926 * from the central locking logic.
1927 */
1928
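/* Return convention shared below: 0 from do_request()/do_convert()
   means the lock was granted and a completion ast queued;
   -EINPROGRESS means the lkb was queued to wait; -EAGAIN means a
   noqueue request could not be granted.  do_unlock()/do_cancel()
   return the -DLM_Exxxx status they queued as the completion ast
   (do_cancel() returns 0 when there was nothing to cancel). */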
1929static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1930{
1931 int error = 0;
1932
1933 if (can_be_granted(r, lkb, 1)) {
1934 grant_lock(r, lkb);
1935 queue_cast(r, lkb, 0);
1936 goto out;
1937 }
1938
1939 if (can_be_queued(lkb)) {
1940 error = -EINPROGRESS;
1941 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1942 send_blocking_asts(r, lkb);
1943 goto out;
1944 }
1945
1946 error = -EAGAIN;
1947 if (force_blocking_asts(lkb))
1948 send_blocking_asts_all(r, lkb);
1949 queue_cast(r, lkb, -EAGAIN);
1950
1951 out:
1952 return error;
1953}
1954
1955static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1956{
1957 int error = 0;
1958
1959 /* changing an existing lock may allow others to be granted */
1960
1961 if (can_be_granted(r, lkb, 1)) {
1962 grant_lock(r, lkb);
1963 queue_cast(r, lkb, 0);
1964 grant_pending_locks(r);
1965 goto out;
1966 }
1967
1968 if (can_be_queued(lkb)) {
1969 if (is_demoted(lkb))
1970 grant_pending_locks(r);
1971 error = -EINPROGRESS;
1972 del_lkb(r, lkb);
1973 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1974 send_blocking_asts(r, lkb);
1975 goto out;
1976 }
1977
1978 error = -EAGAIN;
1979 if (force_blocking_asts(lkb))
1980 send_blocking_asts_all(r, lkb);
1981 queue_cast(r, lkb, -EAGAIN);
1982
1983 out:
1984 return error;
1985}
1986
1987static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1988{
1989 remove_lock(r, lkb);
1990 queue_cast(r, lkb, -DLM_EUNLOCK);
1991 grant_pending_locks(r);
1992 return -DLM_EUNLOCK;
1993}
1994
1995/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
1996
1997static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1998{
ef0c2bb0
DT
1999 int error;
2000
2001 error = revert_lock(r, lkb);
2002 if (error) {
2003 queue_cast(r, lkb, -DLM_ECANCEL);
2004 grant_pending_locks(r);
2005 return -DLM_ECANCEL;
2006 }
2007 return 0;
2008}
2009
2010/*
2011 * Four stage 3 varieties:
2012 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2013 */
2014
2015/* add a new lkb to a possibly new rsb, called by requesting process */
2016
2017static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2018{
2019 int error;
2020
2021 /* set_master: sets lkb nodeid from r */
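 /* (a sketch of the convention observed below, based on how the
    callers use it: set_master() returns < 0 on error, 0 when the
    master is known, and > 0 when a directory lookup had to be
    started, in which case the request is resumed later from
    receive_lookup_reply()) */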
2022
2023 error = set_master(r, lkb);
2024 if (error < 0)
2025 goto out;
2026 if (error) {
2027 error = 0;
2028 goto out;
2029 }
2030
2031 if (is_remote(r))
2032 /* receive_request() calls do_request() on remote node */
2033 error = send_request(r, lkb);
2034 else
2035 error = do_request(r, lkb);
2036 out:
2037 return error;
2038}
2039
2040/* change some property of an existing lkb, e.g. mode */
2041
2042static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2043{
2044 int error;
2045
2046 if (is_remote(r))
2047 /* receive_convert() calls do_convert() on remote node */
2048 error = send_convert(r, lkb);
2049 else
2050 error = do_convert(r, lkb);
2051
2052 return error;
2053}
2054
2055/* remove an existing lkb from the granted queue */
2056
2057static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2058{
2059 int error;
2060
2061 if (is_remote(r))
2062 /* receive_unlock() calls do_unlock() on remote node */
2063 error = send_unlock(r, lkb);
2064 else
2065 error = do_unlock(r, lkb);
2066
2067 return error;
2068}
2069
2070/* remove an existing lkb from the convert or wait queue */
2071
2072static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2073{
2074 int error;
2075
2076 if (is_remote(r))
2077 /* receive_cancel() calls do_cancel() on remote node */
2078 error = send_cancel(r, lkb);
2079 else
2080 error = do_cancel(r, lkb);
2081
2082 return error;
2083}
2084
2085/*
2086 * Four stage 2 varieties:
2087 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2088 */
2089
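/* Note the reference discipline in the four functions below:
   request_lock() gets its rsb reference from find_rsb(), while the
   convert/unlock/cancel variants take a temporary hold_rsb() on the
   lkb's existing rsb; all four drop the reference with put_rsb()
   after the stage 3 call returns. */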
2090static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2091 int len, struct dlm_args *args)
2092{
2093 struct dlm_rsb *r;
2094 int error;
2095
2096 error = validate_lock_args(ls, lkb, args);
2097 if (error)
2098 goto out;
2099
2100 error = find_rsb(ls, name, len, R_CREATE, &r);
2101 if (error)
2102 goto out;
2103
2104 lock_rsb(r);
2105
2106 attach_lkb(r, lkb);
2107 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2108
2109 error = _request_lock(r, lkb);
2110
2111 unlock_rsb(r);
2112 put_rsb(r);
2113
2114 out:
2115 return error;
2116}
2117
2118static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2119 struct dlm_args *args)
2120{
2121 struct dlm_rsb *r;
2122 int error;
2123
2124 r = lkb->lkb_resource;
2125
2126 hold_rsb(r);
2127 lock_rsb(r);
2128
2129 error = validate_lock_args(ls, lkb, args);
2130 if (error)
2131 goto out;
2132
2133 error = _convert_lock(r, lkb);
2134 out:
2135 unlock_rsb(r);
2136 put_rsb(r);
2137 return error;
2138}
2139
2140static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2141 struct dlm_args *args)
2142{
2143 struct dlm_rsb *r;
2144 int error;
2145
2146 r = lkb->lkb_resource;
2147
2148 hold_rsb(r);
2149 lock_rsb(r);
2150
2151 error = validate_unlock_args(lkb, args);
2152 if (error)
2153 goto out;
2154
2155 error = _unlock_lock(r, lkb);
2156 out:
2157 unlock_rsb(r);
2158 put_rsb(r);
2159 return error;
2160}
2161
2162static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2163 struct dlm_args *args)
2164{
2165 struct dlm_rsb *r;
2166 int error;
2167
2168 r = lkb->lkb_resource;
2169
2170 hold_rsb(r);
2171 lock_rsb(r);
2172
2173 error = validate_unlock_args(lkb, args);
2174 if (error)
2175 goto out;
2176
2177 error = _cancel_lock(r, lkb);
2178 out:
2179 unlock_rsb(r);
2180 put_rsb(r);
2181 return error;
2182}
2183
2184/*
2185 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2186 */
2187
2188int dlm_lock(dlm_lockspace_t *lockspace,
2189 int mode,
2190 struct dlm_lksb *lksb,
2191 uint32_t flags,
2192 void *name,
2193 unsigned int namelen,
2194 uint32_t parent_lkid,
2195 void (*ast) (void *astarg),
2196 void *astarg,
2197 void (*bast) (void *astarg, int mode))
2198{
2199 struct dlm_ls *ls;
2200 struct dlm_lkb *lkb;
2201 struct dlm_args args;
2202 int error, convert = flags & DLM_LKF_CONVERT;
2203
2204 ls = dlm_find_lockspace_local(lockspace);
2205 if (!ls)
2206 return -EINVAL;
2207
2208 lock_recovery(ls);
2209
2210 if (convert)
2211 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2212 else
2213 error = create_lkb(ls, &lkb);
2214
2215 if (error)
2216 goto out;
2217
2218 error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
2219 astarg, bast, &args);
2220 if (error)
2221 goto out_put;
2222
2223 if (convert)
2224 error = convert_lock(ls, lkb, &args);
2225 else
2226 error = request_lock(ls, lkb, name, namelen, &args);
2227
2228 if (error == -EINPROGRESS)
2229 error = 0;
2230 out_put:
2231 if (convert || error)
2232 __put_lkb(ls, lkb);
2233 if (error == -EAGAIN)
2234 error = 0;
2235 out:
2236 unlock_recovery(ls);
2237 dlm_put_lockspace(ls);
2238 return error;
2239}
2240
2241int dlm_unlock(dlm_lockspace_t *lockspace,
2242 uint32_t lkid,
2243 uint32_t flags,
2244 struct dlm_lksb *lksb,
2245 void *astarg)
2246{
2247 struct dlm_ls *ls;
2248 struct dlm_lkb *lkb;
2249 struct dlm_args args;
2250 int error;
2251
2252 ls = dlm_find_lockspace_local(lockspace);
2253 if (!ls)
2254 return -EINVAL;
2255
2256 lock_recovery(ls);
2257
2258 error = find_lkb(ls, lkid, &lkb);
2259 if (error)
2260 goto out;
2261
2262 error = set_unlock_args(flags, astarg, &args);
2263 if (error)
2264 goto out_put;
2265
2266 if (flags & DLM_LKF_CANCEL)
2267 error = cancel_lock(ls, lkb, &args);
2268 else
2269 error = unlock_lock(ls, lkb, &args);
2270
2271 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2272 error = 0;
2273 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2274 error = 0;
2275 out_put:
2276 dlm_put_lkb(lkb);
2277 out:
2278 unlock_recovery(ls);
2279 dlm_put_lockspace(ls);
2280 return error;
2281}
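/* Illustrative caller sketch (not part of this file; kept inside
   "#if 0").  It assumes a lockspace handle obtained elsewhere, e.g.
   from dlm_new_lockspace(), and only uses the two entry points
   defined above.  The point it demonstrates: a 0 return from
   dlm_lock() means the request was accepted, and the final status
   arrives asynchronously in lksb.sb_status via the completion ast. */
#if 0
static struct dlm_lksb example_lksb;

static void example_ast(void *astarg)
{
	/* example_lksb.sb_status is 0 once the EX lock is granted;
	   after the dlm_unlock() below it will be -DLM_EUNLOCK */
}

static int example(dlm_lockspace_t *ls)
{
	int error;

	error = dlm_lock(ls, DLM_LOCK_EX, &example_lksb, 0,
			 "example-res", 11, 0, example_ast, NULL, NULL);
	if (error)
		return error;

	/* ... wait for example_ast to fire, use the resource ... */

	return dlm_unlock(ls, example_lksb.sb_lkid, 0, &example_lksb,
			  NULL);
}
#endif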
2282
2283/*
2284 * send/receive routines for remote operations and replies
2285 *
2286 * send_args
2287 * send_common
2288 * send_request receive_request
2289 * send_convert receive_convert
2290 * send_unlock receive_unlock
2291 * send_cancel receive_cancel
2292 * send_grant receive_grant
2293 * send_bast receive_bast
2294 * send_lookup receive_lookup
2295 * send_remove receive_remove
2296 *
2297 * send_common_reply
2298 * receive_request_reply send_request_reply
2299 * receive_convert_reply send_convert_reply
2300 * receive_unlock_reply send_unlock_reply
2301 * receive_cancel_reply send_cancel_reply
2302 * receive_lookup_reply send_lookup_reply
2303 */
2304
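/* A lock has a different id on each side of the wire: send_args()
   below puts the sender's own id in m_lkid and its saved copy of the
   peer's id in m_remid, which is why the receive functions look the
   lkb up with find_lkb(ls, ms->m_remid). */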
2305static int _create_message(struct dlm_ls *ls, int mb_len,
2306 int to_nodeid, int mstype,
2307 struct dlm_message **ms_ret,
2308 struct dlm_mhandle **mh_ret)
2309{
2310 struct dlm_message *ms;
2311 struct dlm_mhandle *mh;
2312 char *mb;
2313
2314 /* get_buffer gives us a message handle (mh) that we need to
2315 pass into lowcomms_commit and a message buffer (mb) that we
2316 write our data into */
2317
2318 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2319 if (!mh)
2320 return -ENOBUFS;
2321
2322 memset(mb, 0, mb_len);
2323
2324 ms = (struct dlm_message *) mb;
2325
2326 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2327 ms->m_header.h_lockspace = ls->ls_global_id;
2328 ms->m_header.h_nodeid = dlm_our_nodeid();
2329 ms->m_header.h_length = mb_len;
2330 ms->m_header.h_cmd = DLM_MSG;
2331
2332 ms->m_type = mstype;
2333
2334 *mh_ret = mh;
2335 *ms_ret = ms;
2336 return 0;
2337}
2338
2339static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2340 int to_nodeid, int mstype,
2341 struct dlm_message **ms_ret,
2342 struct dlm_mhandle **mh_ret)
2343{
2344 int mb_len = sizeof(struct dlm_message);
2345
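 /* the fixed-size message is extended (m_extra) to carry the
    resource name for request/lookup/remove, or the lvb for the
    lvb-bearing types below */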
2346 switch (mstype) {
2347 case DLM_MSG_REQUEST:
2348 case DLM_MSG_LOOKUP:
2349 case DLM_MSG_REMOVE:
2350 mb_len += r->res_length;
2351 break;
2352 case DLM_MSG_CONVERT:
2353 case DLM_MSG_UNLOCK:
2354 case DLM_MSG_REQUEST_REPLY:
2355 case DLM_MSG_CONVERT_REPLY:
2356 case DLM_MSG_GRANT:
2357 if (lkb && lkb->lkb_lvbptr)
2358 mb_len += r->res_ls->ls_lvblen;
2359 break;
2360 }
2361
2362 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2363 ms_ret, mh_ret);
2364}
2365
2366/* further lowcomms enhancements or alternate implementations may make
2367 the return value from this function useful at some point */
2368
2369static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2370{
2371 dlm_message_out(ms);
2372 dlm_lowcomms_commit_buffer(mh);
2373 return 0;
2374}
2375
2376static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2377 struct dlm_message *ms)
2378{
2379 ms->m_nodeid = lkb->lkb_nodeid;
2380 ms->m_pid = lkb->lkb_ownpid;
2381 ms->m_lkid = lkb->lkb_id;
2382 ms->m_remid = lkb->lkb_remid;
2383 ms->m_exflags = lkb->lkb_exflags;
2384 ms->m_sbflags = lkb->lkb_sbflags;
2385 ms->m_flags = lkb->lkb_flags;
2386 ms->m_lvbseq = lkb->lkb_lvbseq;
2387 ms->m_status = lkb->lkb_status;
2388 ms->m_grmode = lkb->lkb_grmode;
2389 ms->m_rqmode = lkb->lkb_rqmode;
2390 ms->m_hash = r->res_hash;
2391
2392 /* m_result and m_bastmode are set from function args,
2393 not from lkb fields */
2394
2395 if (lkb->lkb_bastaddr)
2396 ms->m_asts |= AST_BAST;
2397 if (lkb->lkb_astaddr)
2398 ms->m_asts |= AST_COMP;
2399
2400 /* compare with switch in create_message; send_remove() doesn't
2401 use send_args() */
2402
2403 switch (ms->m_type) {
2404 case DLM_MSG_REQUEST:
2405 case DLM_MSG_LOOKUP:
2406 memcpy(ms->m_extra, r->res_name, r->res_length);
2407 break;
2408 case DLM_MSG_CONVERT:
2409 case DLM_MSG_UNLOCK:
2410 case DLM_MSG_REQUEST_REPLY:
2411 case DLM_MSG_CONVERT_REPLY:
2412 case DLM_MSG_GRANT:
2413 if (!lkb->lkb_lvbptr)
2414 break;
2415 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2416 break;
2417 }
2418}
2419
2420static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2421{
2422 struct dlm_message *ms;
2423 struct dlm_mhandle *mh;
2424 int to_nodeid, error;
2425
2426 error = add_to_waiters(lkb, mstype);
2427 if (error)
2428 return error;
2429
2430 to_nodeid = r->res_nodeid;
2431
2432 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2433 if (error)
2434 goto fail;
2435
2436 send_args(r, lkb, ms);
2437
2438 error = send_message(mh, ms);
2439 if (error)
2440 goto fail;
2441 return 0;
2442
2443 fail:
2444 remove_from_waiters(lkb, msg_reply_type(mstype));
2445 return error;
2446}
2447
2448static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2449{
2450 return send_common(r, lkb, DLM_MSG_REQUEST);
2451}
2452
2453static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2454{
2455 int error;
2456
2457 error = send_common(r, lkb, DLM_MSG_CONVERT);
2458
2459 /* down conversions go without a reply from the master */
2460 if (!error && down_conversion(lkb)) {
2461 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2462 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2463 r->res_ls->ls_stub_ms.m_result = 0;
2464 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2465 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2466 }
2467
2468 return error;
2469}
2470
2471/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2472 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2473 that the master is still correct. */
2474
2475static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2476{
2477 return send_common(r, lkb, DLM_MSG_UNLOCK);
2478}
2479
2480static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2481{
2482 return send_common(r, lkb, DLM_MSG_CANCEL);
2483}
2484
2485static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2486{
2487 struct dlm_message *ms;
2488 struct dlm_mhandle *mh;
2489 int to_nodeid, error;
2490
2491 to_nodeid = lkb->lkb_nodeid;
2492
2493 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2494 if (error)
2495 goto out;
2496
2497 send_args(r, lkb, ms);
2498
2499 ms->m_result = 0;
2500
2501 error = send_message(mh, ms);
2502 out:
2503 return error;
2504}
2505
2506static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2507{
2508 struct dlm_message *ms;
2509 struct dlm_mhandle *mh;
2510 int to_nodeid, error;
2511
2512 to_nodeid = lkb->lkb_nodeid;
2513
2514 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2515 if (error)
2516 goto out;
2517
2518 send_args(r, lkb, ms);
2519
2520 ms->m_bastmode = mode;
2521
2522 error = send_message(mh, ms);
2523 out:
2524 return error;
2525}
2526
2527static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2528{
2529 struct dlm_message *ms;
2530 struct dlm_mhandle *mh;
2531 int to_nodeid, error;
2532
2533 error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2534 if (error)
2535 return error;
2536
2537 to_nodeid = dlm_dir_nodeid(r);
2538
2539 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2540 if (error)
2541 goto fail;
2542
2543 send_args(r, lkb, ms);
2544
2545 error = send_message(mh, ms);
2546 if (error)
2547 goto fail;
2548 return 0;
2549
2550 fail:
2551 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2552 return error;
2553}
2554
2555static int send_remove(struct dlm_rsb *r)
2556{
2557 struct dlm_message *ms;
2558 struct dlm_mhandle *mh;
2559 int to_nodeid, error;
2560
2561 to_nodeid = dlm_dir_nodeid(r);
2562
2563 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2564 if (error)
2565 goto out;
2566
2567 memcpy(ms->m_extra, r->res_name, r->res_length);
2568 ms->m_hash = r->res_hash;
2569
2570 error = send_message(mh, ms);
2571 out:
2572 return error;
2573}
2574
2575static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2576 int mstype, int rv)
2577{
2578 struct dlm_message *ms;
2579 struct dlm_mhandle *mh;
2580 int to_nodeid, error;
2581
2582 to_nodeid = lkb->lkb_nodeid;
2583
2584 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2585 if (error)
2586 goto out;
2587
2588 send_args(r, lkb, ms);
2589
2590 ms->m_result = rv;
2591
2592 error = send_message(mh, ms);
2593 out:
2594 return error;
2595}
2596
2597static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2598{
2599 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2600}
2601
2602static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2603{
2604 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2605}
2606
2607static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2608{
2609 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2610}
2611
2612static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2613{
2614 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2615}
2616
2617static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2618 int ret_nodeid, int rv)
2619{
2620 struct dlm_rsb *r = &ls->ls_stub_rsb;
2621 struct dlm_message *ms;
2622 struct dlm_mhandle *mh;
2623 int error, nodeid = ms_in->m_header.h_nodeid;
2624
2625 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2626 if (error)
2627 goto out;
2628
2629 ms->m_lkid = ms_in->m_lkid;
2630 ms->m_result = rv;
2631 ms->m_nodeid = ret_nodeid;
2632
2633 error = send_message(mh, ms);
2634 out:
2635 return error;
2636}
2637
2638/* which args we save from a received message depends heavily on the type
2639 of message, unlike the send side where we can safely send everything about
2640 the lkb for any type of message */
2641
2642static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2643{
2644 lkb->lkb_exflags = ms->m_exflags;
2645 lkb->lkb_sbflags = ms->m_sbflags;
2646 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2647 (ms->m_flags & 0x0000FFFF);
2648}
2649
2650static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2651{
2652 lkb->lkb_sbflags = ms->m_sbflags;
2653 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2654 (ms->m_flags & 0x0000FFFF);
2655}
2656
2657static int receive_extralen(struct dlm_message *ms)
2658{
2659 return (ms->m_header.h_length - sizeof(struct dlm_message));
2660}
2661
2662static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2663 struct dlm_message *ms)
2664{
2665 int len;
2666
2667 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2668 if (!lkb->lkb_lvbptr)
2669 lkb->lkb_lvbptr = allocate_lvb(ls);
2670 if (!lkb->lkb_lvbptr)
2671 return -ENOMEM;
2672 len = receive_extralen(ms);
2673 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2674 }
2675 return 0;
2676}
2677
2678static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2679 struct dlm_message *ms)
2680{
2681 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2682 lkb->lkb_ownpid = ms->m_pid;
2683 lkb->lkb_remid = ms->m_lkid;
2684 lkb->lkb_grmode = DLM_LOCK_IV;
2685 lkb->lkb_rqmode = ms->m_rqmode;
2686 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2687 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2688
2689 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2690
2691 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2692 /* lkb was just created so there won't be an lvb yet */
2693 lkb->lkb_lvbptr = allocate_lvb(ls);
2694 if (!lkb->lkb_lvbptr)
2695 return -ENOMEM;
2696 }
2697
2698 return 0;
2699}
2700
2701static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2702 struct dlm_message *ms)
2703{
2704 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2705 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2706 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2707 lkb->lkb_id, lkb->lkb_remid);
2708 return -EINVAL;
2709 }
2710
2711 if (!is_master_copy(lkb))
2712 return -EINVAL;
2713
2714 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2715 return -EBUSY;
2716
2717 if (receive_lvb(ls, lkb, ms))
2718 return -ENOMEM;
2719
2720 lkb->lkb_rqmode = ms->m_rqmode;
2721 lkb->lkb_lvbseq = ms->m_lvbseq;
2722
2723 return 0;
2724}
2725
2726static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2727 struct dlm_message *ms)
2728{
2729 if (!is_master_copy(lkb))
2730 return -EINVAL;
2731 if (receive_lvb(ls, lkb, ms))
2732 return -ENOMEM;
2733 return 0;
2734}
2735
2736/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2737 uses to send a reply and that the remote end uses to process the reply. */
2738
2739static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2740{
2741 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2742 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2743 lkb->lkb_remid = ms->m_lkid;
2744}
2745
2746static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2747{
2748 struct dlm_lkb *lkb;
2749 struct dlm_rsb *r;
2750 int error, namelen;
2751
2752 error = create_lkb(ls, &lkb);
2753 if (error)
2754 goto fail;
2755
2756 receive_flags(lkb, ms);
2757 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2758 error = receive_request_args(ls, lkb, ms);
2759 if (error) {
2760 __put_lkb(ls, lkb);
2761 goto fail;
2762 }
2763
2764 namelen = receive_extralen(ms);
2765
2766 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2767 if (error) {
2768 __put_lkb(ls, lkb);
2769 goto fail;
2770 }
2771
2772 lock_rsb(r);
2773
2774 attach_lkb(r, lkb);
2775 error = do_request(r, lkb);
2776 send_request_reply(r, lkb, error);
2777
2778 unlock_rsb(r);
2779 put_rsb(r);
2780
2781 if (error == -EINPROGRESS)
2782 error = 0;
2783 if (error)
2784 dlm_put_lkb(lkb);
2785 return;
2786
2787 fail:
2788 setup_stub_lkb(ls, ms);
2789 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2790}
2791
2792static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2793{
2794 struct dlm_lkb *lkb;
2795 struct dlm_rsb *r;
2796 int error, reply = 1;
2797
2798 error = find_lkb(ls, ms->m_remid, &lkb);
2799 if (error)
2800 goto fail;
2801
2802 r = lkb->lkb_resource;
2803
2804 hold_rsb(r);
2805 lock_rsb(r);
2806
2807 receive_flags(lkb, ms);
2808 error = receive_convert_args(ls, lkb, ms);
2809 if (error)
2810 goto out;
2811 reply = !down_conversion(lkb);
2812
2813 error = do_convert(r, lkb);
2814 out:
2815 if (reply)
2816 send_convert_reply(r, lkb, error);
2817
2818 unlock_rsb(r);
2819 put_rsb(r);
2820 dlm_put_lkb(lkb);
2821 return;
2822
2823 fail:
2824 setup_stub_lkb(ls, ms);
2825 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2826}
2827
2828static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2829{
2830 struct dlm_lkb *lkb;
2831 struct dlm_rsb *r;
2832 int error;
2833
2834 error = find_lkb(ls, ms->m_remid, &lkb);
2835 if (error)
2836 goto fail;
2837
2838 r = lkb->lkb_resource;
2839
2840 hold_rsb(r);
2841 lock_rsb(r);
2842
2843 receive_flags(lkb, ms);
2844 error = receive_unlock_args(ls, lkb, ms);
2845 if (error)
2846 goto out;
2847
2848 error = do_unlock(r, lkb);
2849 out:
2850 send_unlock_reply(r, lkb, error);
2851
2852 unlock_rsb(r);
2853 put_rsb(r);
2854 dlm_put_lkb(lkb);
2855 return;
2856
2857 fail:
2858 setup_stub_lkb(ls, ms);
2859 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2860}
2861
2862static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2863{
2864 struct dlm_lkb *lkb;
2865 struct dlm_rsb *r;
2866 int error;
2867
2868 error = find_lkb(ls, ms->m_remid, &lkb);
2869 if (error)
2870 goto fail;
2871
2872 receive_flags(lkb, ms);
2873
2874 r = lkb->lkb_resource;
2875
2876 hold_rsb(r);
2877 lock_rsb(r);
2878
2879 error = do_cancel(r, lkb);
2880 send_cancel_reply(r, lkb, error);
2881
2882 unlock_rsb(r);
2883 put_rsb(r);
2884 dlm_put_lkb(lkb);
2885 return;
2886
2887 fail:
2888 setup_stub_lkb(ls, ms);
2889 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2890}
2891
2892static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2893{
2894 struct dlm_lkb *lkb;
2895 struct dlm_rsb *r;
2896 int error;
2897
2898 error = find_lkb(ls, ms->m_remid, &lkb);
2899 if (error) {
2900 log_error(ls, "receive_grant no lkb");
2901 return;
2902 }
2903 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2904
2905 r = lkb->lkb_resource;
2906
2907 hold_rsb(r);
2908 lock_rsb(r);
2909
2910 receive_flags_reply(lkb, ms);
2911 grant_lock_pc(r, lkb, ms);
2912 queue_cast(r, lkb, 0);
2913
2914 unlock_rsb(r);
2915 put_rsb(r);
2916 dlm_put_lkb(lkb);
2917}
2918
2919static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2920{
2921 struct dlm_lkb *lkb;
2922 struct dlm_rsb *r;
2923 int error;
2924
2925 error = find_lkb(ls, ms->m_remid, &lkb);
2926 if (error) {
2927 log_error(ls, "receive_bast no lkb");
2928 return;
2929 }
2930 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2931
2932 r = lkb->lkb_resource;
2933
2934 hold_rsb(r);
2935 lock_rsb(r);
2936
2937 queue_bast(r, lkb, ms->m_bastmode);
2938
2939 unlock_rsb(r);
2940 put_rsb(r);
2941 dlm_put_lkb(lkb);
2942}
2943
2944static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2945{
2946 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2947
2948 from_nodeid = ms->m_header.h_nodeid;
2949 our_nodeid = dlm_our_nodeid();
2950
2951 len = receive_extralen(ms);
2952
2953 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2954 if (dir_nodeid != our_nodeid) {
2955 log_error(ls, "lookup dir_nodeid %d from %d",
2956 dir_nodeid, from_nodeid);
2957 error = -EINVAL;
2958 ret_nodeid = -1;
2959 goto out;
2960 }
2961
2962 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2963
2964 /* Optimization: we're master so treat lookup as a request */
2965 if (!error && ret_nodeid == our_nodeid) {
2966 receive_request(ls, ms);
2967 return;
2968 }
2969 out:
2970 send_lookup_reply(ls, ms, ret_nodeid, error);
2971}
2972
2973static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2974{
2975 int len, dir_nodeid, from_nodeid;
2976
2977 from_nodeid = ms->m_header.h_nodeid;
2978
2979 len = receive_extralen(ms);
2980
2981 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2982 if (dir_nodeid != dlm_our_nodeid()) {
2983 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2984 dir_nodeid, from_nodeid);
2985 return;
2986 }
2987
2988 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2989}
2990
2991static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
2992{
2993 do_purge(ls, ms->m_nodeid, ms->m_pid);
2994}
2995
2996static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2997{
2998 struct dlm_lkb *lkb;
2999 struct dlm_rsb *r;
3000 int error, mstype, result;
3001
3002 error = find_lkb(ls, ms->m_remid, &lkb);
3003 if (error) {
3004 log_error(ls, "receive_request_reply no lkb");
3005 return;
3006 }
3007 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3008
3009 r = lkb->lkb_resource;
3010 hold_rsb(r);
3011 lock_rsb(r);
3012
3013 mstype = lkb->lkb_wait_type;
3014 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3015 if (error)
3016 goto out;
3017
3018 /* Optimization: the dir node was also the master, so it took our
3019 lookup as a request and sent request reply instead of lookup reply */
3020 if (mstype == DLM_MSG_LOOKUP) {
3021 r->res_nodeid = ms->m_header.h_nodeid;
3022 lkb->lkb_nodeid = r->res_nodeid;
3023 }
3024
3025 /* this is the value returned from do_request() on the master */
3026 result = ms->m_result;
3027
3028 switch (result) {
3029 case -EAGAIN:
3030 /* request would block (be queued) on remote master */
3031 queue_cast(r, lkb, -EAGAIN);
3032 confirm_master(r, -EAGAIN);
3033 unhold_lkb(lkb); /* undoes create_lkb() */
3034 break;
3035
3036 case -EINPROGRESS:
3037 case 0:
3038 /* request was queued or granted on remote master */
3039 receive_flags_reply(lkb, ms);
3040 lkb->lkb_remid = ms->m_lkid;
3041 if (result)
3042 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3043 else {
3044 grant_lock_pc(r, lkb, ms);
3045 queue_cast(r, lkb, 0);
3046 }
3047 confirm_master(r, result);
3048 break;
3049
3050 case -EBADR:
3051 case -ENOTBLK:
3052 /* find_rsb failed to find rsb or rsb wasn't master */
3053 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3054 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3055 r->res_nodeid = -1;
3056 lkb->lkb_nodeid = -1;
3057
3058 if (is_overlap(lkb)) {
3059 /* we'll ignore error in cancel/unlock reply */
3060 queue_cast_overlap(r, lkb);
3061 unhold_lkb(lkb); /* undoes create_lkb() */
3062 } else
3063 _request_lock(r, lkb);
3064 break;
3065
3066 default:
3067 log_error(ls, "receive_request_reply %x error %d",
3068 lkb->lkb_id, result);
3069 }
3070
3071 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3072 log_debug(ls, "receive_request_reply %x result %d unlock",
3073 lkb->lkb_id, result);
3074 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3075 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3076 send_unlock(r, lkb);
3077 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3078 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3079 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3080 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3081 send_cancel(r, lkb);
3082 } else {
3083 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3084 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3085 }
3086 out:
3087 unlock_rsb(r);
3088 put_rsb(r);
3089 dlm_put_lkb(lkb);
3090}
3091
3092static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3093 struct dlm_message *ms)
3094{
3095 /* this is the value returned from do_convert() on the master */
3096 switch (ms->m_result) {
3097 case -EAGAIN:
3098 /* convert would block (be queued) on remote master */
3099 queue_cast(r, lkb, -EAGAIN);
3100 break;
3101
3102 case -EINPROGRESS:
3103 /* convert was queued on remote master */
3104 del_lkb(r, lkb);
3105 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3106 break;
3107
3108 case 0:
3109 /* convert was granted on remote master */
3110 receive_flags_reply(lkb, ms);
3111 grant_lock_pc(r, lkb, ms);
3112 queue_cast(r, lkb, 0);
3113 break;
3114
3115 default:
3116 log_error(r->res_ls, "receive_convert_reply %x error %d",
3117 lkb->lkb_id, ms->m_result);
3118 }
3119}
3120
3121static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3122{
3123 struct dlm_rsb *r = lkb->lkb_resource;
3124 int error;
3125
3126 hold_rsb(r);
3127 lock_rsb(r);
3128
3129 /* stub reply can happen with waiters_mutex held */
3130 error = remove_from_waiters_ms(lkb, ms);
3131 if (error)
3132 goto out;
3133
3134 __receive_convert_reply(r, lkb, ms);
3135 out:
3136 unlock_rsb(r);
3137 put_rsb(r);
3138}
3139
3140static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3141{
3142 struct dlm_lkb *lkb;
3143 int error;
3144
3145 error = find_lkb(ls, ms->m_remid, &lkb);
3146 if (error) {
3147 log_error(ls, "receive_convert_reply no lkb");
3148 return;
3149 }
3150 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3151
3152 _receive_convert_reply(lkb, ms);
3153 dlm_put_lkb(lkb);
3154}
3155
3156static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3157{
3158 struct dlm_rsb *r = lkb->lkb_resource;
3159 int error;
3160
3161 hold_rsb(r);
3162 lock_rsb(r);
3163
3164 /* stub reply can happen with waiters_mutex held */
3165 error = remove_from_waiters_ms(lkb, ms);
3166 if (error)
3167 goto out;
3168
3169 /* this is the value returned from do_unlock() on the master */
3170
3171 switch (ms->m_result) {
3172 case -DLM_EUNLOCK:
3173 receive_flags_reply(lkb, ms);
3174 remove_lock_pc(r, lkb);
3175 queue_cast(r, lkb, -DLM_EUNLOCK);
3176 break;
3177 case -ENOENT:
3178 break;
3179 default:
3180 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3181 lkb->lkb_id, ms->m_result);
3182 }
3183 out:
3184 unlock_rsb(r);
3185 put_rsb(r);
3186}
3187
3188static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3189{
3190 struct dlm_lkb *lkb;
3191 int error;
3192
3193 error = find_lkb(ls, ms->m_remid, &lkb);
3194 if (error) {
3195 log_error(ls, "receive_unlock_reply no lkb");
3196 return;
3197 }
3198 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3199
3200 _receive_unlock_reply(lkb, ms);
3201 dlm_put_lkb(lkb);
3202}
3203
3204static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3205{
3206 struct dlm_rsb *r = lkb->lkb_resource;
3207 int error;
3208
3209 hold_rsb(r);
3210 lock_rsb(r);
3211
3212 /* stub reply can happen with waiters_mutex held */
3213 error = remove_from_waiters_ms(lkb, ms);
3214 if (error)
3215 goto out;
3216
3217 /* this is the value returned from do_cancel() on the master */
3218
3219 switch (ms->m_result) {
3220 case -DLM_ECANCEL:
3221 receive_flags_reply(lkb, ms);
3222 revert_lock_pc(r, lkb);
3223 if (ms->m_result)
3224 queue_cast(r, lkb, -DLM_ECANCEL);
3225 break;
3226 case 0:
3227 break;
3228 default:
3229 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3230 lkb->lkb_id, ms->m_result);
3231 }
3232 out:
3233 unlock_rsb(r);
3234 put_rsb(r);
3235}
3236
3237static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3238{
3239 struct dlm_lkb *lkb;
3240 int error;
3241
3242 error = find_lkb(ls, ms->m_remid, &lkb);
3243 if (error) {
3244 log_error(ls, "receive_cancel_reply no lkb");
3245 return;
3246 }
3247 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3248
3249 _receive_cancel_reply(lkb, ms);
3250 dlm_put_lkb(lkb);
3251}
3252
3253static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3254{
3255 struct dlm_lkb *lkb;
3256 struct dlm_rsb *r;
3257 int error, ret_nodeid;
3258
3259 error = find_lkb(ls, ms->m_lkid, &lkb);
3260 if (error) {
3261 log_error(ls, "receive_lookup_reply no lkb");
3262 return;
3263 }
3264
3265 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3266 FIXME: will a non-zero error ever be returned? */
3267
3268 r = lkb->lkb_resource;
3269 hold_rsb(r);
3270 lock_rsb(r);
3271
3272 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3273 if (error)
3274 goto out;
3275
3276 ret_nodeid = ms->m_nodeid;
3277 if (ret_nodeid == dlm_our_nodeid()) {
3278 r->res_nodeid = 0;
3279 ret_nodeid = 0;
3280 r->res_first_lkid = 0;
3281 } else {
3282 /* set_master() will copy res_nodeid to lkb_nodeid */
3283 r->res_nodeid = ret_nodeid;
3284 }
3285
3286 if (is_overlap(lkb)) {
3287 log_debug(ls, "receive_lookup_reply %x unlock %x",
3288 lkb->lkb_id, lkb->lkb_flags);
3289 queue_cast_overlap(r, lkb);
3290 unhold_lkb(lkb); /* undoes create_lkb() */
3291 goto out_list;
3292 }
3293
3294 _request_lock(r, lkb);
3295
3296 out_list:
3297 if (!ret_nodeid)
3298 process_lookup_list(r);
3299 out:
3300 unlock_rsb(r);
3301 put_rsb(r);
3302 dlm_put_lkb(lkb);
3303}
3304
3305int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3306{
3307 struct dlm_message *ms = (struct dlm_message *) hd;
3308 struct dlm_ls *ls;
3309 int error = 0;
3310
3311 if (!recovery)
3312 dlm_message_in(ms);
3313
3314 ls = dlm_find_lockspace_global(hd->h_lockspace);
3315 if (!ls) {
3316 log_print("drop message %d from %d for unknown lockspace %d",
3317 ms->m_type, nodeid, hd->h_lockspace);
3318 return -EINVAL;
3319 }
3320
3321 /* recovery may have just ended leaving a bunch of backed-up requests
3322 in the requestqueue; wait while dlm_recoverd clears them */
3323
3324 if (!recovery)
3325 dlm_wait_requestqueue(ls);
3326
3327 /* recovery may have just started while there were a bunch of
3328 in-flight requests -- save them in requestqueue to be processed
3329 after recovery. we can't let dlm_recvd block on the recovery
3330 lock. if dlm_recoverd is calling this function to clear the
3331 requestqueue, it needs to be interrupted (-EINTR) if another
3332 recovery operation is starting. */
3333
3334 while (1) {
3335 if (dlm_locking_stopped(ls)) {
3336 if (recovery) {
3337 error = -EINTR;
3338 goto out;
3339 }
3340 error = dlm_add_requestqueue(ls, nodeid, hd);
3341 if (error == -EAGAIN)
3342 continue;
3343 else {
3344 error = -EINTR;
3345 goto out;
3346 }
3347 }
3348
3349 if (lock_recovery_try(ls))
3350 break;
3351 schedule();
3352 }
3353
3354 switch (ms->m_type) {
3355
3356 /* messages sent to a master node */
3357
3358 case DLM_MSG_REQUEST:
3359 receive_request(ls, ms);
3360 break;
3361
3362 case DLM_MSG_CONVERT:
3363 receive_convert(ls, ms);
3364 break;
3365
3366 case DLM_MSG_UNLOCK:
3367 receive_unlock(ls, ms);
3368 break;
3369
3370 case DLM_MSG_CANCEL:
3371 receive_cancel(ls, ms);
3372 break;
3373
3374 /* messages sent from a master node (replies to above) */
3375
3376 case DLM_MSG_REQUEST_REPLY:
3377 receive_request_reply(ls, ms);
3378 break;
3379
3380 case DLM_MSG_CONVERT_REPLY:
3381 receive_convert_reply(ls, ms);
3382 break;
3383
3384 case DLM_MSG_UNLOCK_REPLY:
3385 receive_unlock_reply(ls, ms);
3386 break;
3387
3388 case DLM_MSG_CANCEL_REPLY:
3389 receive_cancel_reply(ls, ms);
3390 break;
3391
3392 /* messages sent from a master node (only two types of async msg) */
3393
3394 case DLM_MSG_GRANT:
3395 receive_grant(ls, ms);
3396 break;
3397
3398 case DLM_MSG_BAST:
3399 receive_bast(ls, ms);
3400 break;
3401
3402 /* messages sent to a dir node */
3403
3404 case DLM_MSG_LOOKUP:
3405 receive_lookup(ls, ms);
3406 break;
3407
3408 case DLM_MSG_REMOVE:
3409 receive_remove(ls, ms);
3410 break;
3411
3412 /* messages sent from a dir node (remove has no reply) */
3413
3414 case DLM_MSG_LOOKUP_REPLY:
3415 receive_lookup_reply(ls, ms);
3416 break;
3417
3418 /* other messages */
3419
3420 case DLM_MSG_PURGE:
3421 receive_purge(ls, ms);
3422 break;
3423
3424 default:
3425 log_error(ls, "unknown message type %d", ms->m_type);
3426 }
3427
3428 unlock_recovery(ls);
3429 out:
3430 dlm_put_lockspace(ls);
3431 dlm_astd_wake();
3432 return error;
3433}
3434
3435
3436/*
3437 * Recovery related
3438 */
3439
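/* Waiter recovery runs in two passes: dlm_recover_waiters_pre() below
   fakes replies or marks lkbs RESEND while locking is stopped, and
   dlm_recover_waiters_post() reissues the marked operations once
   locking restarts. */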
3440static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3441{
3442 if (middle_conversion(lkb)) {
3443 hold_lkb(lkb);
3444 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3445 ls->ls_stub_ms.m_result = -EINPROGRESS;
3446 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3447 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3448
3449 /* Same special case as in receive_rcom_lock_args() */
3450 lkb->lkb_grmode = DLM_LOCK_IV;
3451 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3452 unhold_lkb(lkb);
3453
3454 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3455 lkb->lkb_flags |= DLM_IFL_RESEND;
3456 }
3457
3458 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3459 conversions are async; there's no reply from the remote master */
3460}
3461
3462/* A waiting lkb needs recovery if the master node has failed, or
3463 the master node is changing (only when no directory is used) */
3464
3465static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3466{
3467 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3468 return 1;
3469
3470 if (!dlm_no_directory(ls))
3471 return 0;
3472
3473 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3474 return 1;
3475
3476 return 0;
3477}
3478
3479/* Recovery for locks that are waiting for replies from nodes that are now
3480 gone. We can just complete unlocks and cancels by faking a reply from the
3481 dead node. Requests and up-conversions we flag to be resent after
3482 recovery. Down-conversions can just be completed with a fake reply like
3483 unlocks. Conversions between PR and CW need special attention. */
3484
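/* The faked reply works by filling in ls_stub_ms (m_type, m_result,
   m_flags) and feeding it to the normal _receive_xxxx_reply() path,
   so the waiter cleanup is exactly what a real reply from the dead
   node would have triggered. */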
3485void dlm_recover_waiters_pre(struct dlm_ls *ls)
3486{
3487 struct dlm_lkb *lkb, *safe;
3488
3489 mutex_lock(&ls->ls_waiters_mutex);
3490
3491 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3492 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3493 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3494
3495 /* all outstanding lookups, regardless of destination will be
3496 resent after recovery is done */
3497
3498 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3499 lkb->lkb_flags |= DLM_IFL_RESEND;
3500 continue;
3501 }
3502
3503 if (!waiter_needs_recovery(ls, lkb))
3504 continue;
3505
3506 switch (lkb->lkb_wait_type) {
3507
3508 case DLM_MSG_REQUEST:
3509 lkb->lkb_flags |= DLM_IFL_RESEND;
3510 break;
3511
3512 case DLM_MSG_CONVERT:
3513 recover_convert_waiter(ls, lkb);
3514 break;
3515
3516 case DLM_MSG_UNLOCK:
3517 hold_lkb(lkb);
3518 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3519 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3520 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3521 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3522 dlm_put_lkb(lkb);
3523 break;
3524
3525 case DLM_MSG_CANCEL:
3526 hold_lkb(lkb);
3527 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3528 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3529 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3530 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3531 dlm_put_lkb(lkb);
3532 break;
3533
3534 default:
3535 log_error(ls, "invalid lkb wait_type %d",
3536 lkb->lkb_wait_type);
3537 }
3538 schedule();
3539 }
3540 mutex_unlock(&ls->ls_waiters_mutex);
3541}
3542
3543static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3544{
3545 struct dlm_lkb *lkb;
3546 int found = 0;
3547
3548 mutex_lock(&ls->ls_waiters_mutex);
3549 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3550 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3551 hold_lkb(lkb);
3552 found = 1;
3553 break;
3554 }
3555 }
3556 mutex_unlock(&ls->ls_waiters_mutex);
3557
3558 if (!found)
3559 lkb = NULL;
3560 return lkb;
3561}
3562
3563/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3564 master or dir-node for r. Processing the lkb may result in it being placed
3565 back on waiters. */
3566
3567/* We do this after normal locking has been enabled and any saved messages
3568 (in requestqueue) have been processed. We should be confident that at
3569 this point we won't get or process a reply to any of these waiting
3570 operations. But, new ops may be coming in on the rsbs/locks here from
3571 userspace or remotely. */
3572
3573/* there may have been an overlap unlock/cancel prior to recovery or after
3574 recovery. if before, the lkb may still have a positive wait_count; if after,
3575 the overlap flag would just have been set and nothing new sent. we can be
3576 confident here that any replies to either the initial op or overlap ops
3577 prior to recovery have been received. */
3578
3579int dlm_recover_waiters_post(struct dlm_ls *ls)
3580{
3581 struct dlm_lkb *lkb;
3582 struct dlm_rsb *r;
ef0c2bb0 3583 int error = 0, mstype, err, oc, ou;
e7fd4179
DT
3584
3585 while (1) {
3586 if (dlm_locking_stopped(ls)) {
3587 log_debug(ls, "recover_waiters_post aborted");
3588 error = -EINTR;
3589 break;
3590 }
3591
3592 lkb = find_resend_waiter(ls);
3593 if (!lkb)
3594 break;
3595
3596 r = lkb->lkb_resource;
3597 hold_rsb(r);
3598 lock_rsb(r);
3599
3600 mstype = lkb->lkb_wait_type;
3601 oc = is_overlap_cancel(lkb);
3602 ou = is_overlap_unlock(lkb);
3603 err = 0;
3604
3605 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3606 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3607
3608 /* At this point we assume that we won't get a reply to any
3609 previous op or overlap op on this lock. First, do a big
3610 remove_from_waiters() for all previous ops. */
3611
3612 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3613 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3614 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3615 lkb->lkb_wait_type = 0;
3616 lkb->lkb_wait_count = 0;
3617 mutex_lock(&ls->ls_waiters_mutex);
3618 list_del_init(&lkb->lkb_wait_reply);
3619 mutex_unlock(&ls->ls_waiters_mutex);
3620 unhold_lkb(lkb); /* for waiters list */
3621
3622 if (oc || ou) {
3623 /* do an unlock or cancel instead of resending */
3624 switch (mstype) {
3625 case DLM_MSG_LOOKUP:
3626 case DLM_MSG_REQUEST:
3627 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3628 -DLM_ECANCEL);
3629 unhold_lkb(lkb); /* undoes create_lkb() */
3630 break;
3631 case DLM_MSG_CONVERT:
3632 if (oc) {
3633 queue_cast(r, lkb, -DLM_ECANCEL);
3634 } else {
3635 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3636 _unlock_lock(r, lkb);
3637 }
3638 break;
3639 default:
3640 err = 1;
3641 }
3642 } else {
3643 switch (mstype) {
3644 case DLM_MSG_LOOKUP:
3645 case DLM_MSG_REQUEST:
3646 _request_lock(r, lkb);
3647 if (is_master(r))
3648 confirm_master(r, 0);
3649 break;
3650 case DLM_MSG_CONVERT:
3651 _convert_lock(r, lkb);
3652 break;
3653 default:
3654 err = 1;
3655 }
3656 }
3657
3658 if (err)
3659 log_error(ls, "recover_waiters_post %x %d %x %d %d",
3660 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3661 unlock_rsb(r);
3662 put_rsb(r);
3663 dlm_put_lkb(lkb);
3664 }
3665
3666 return error;
3667}
3668
3669static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3670 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3671{
3672 struct dlm_ls *ls = r->res_ls;
3673 struct dlm_lkb *lkb, *safe;
3674
3675 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3676 if (test(ls, lkb)) {
3677 rsb_set_flag(r, RSB_LOCKS_PURGED);
3678 del_lkb(r, lkb);
3679 /* this put should free the lkb */
3680 if (!dlm_put_lkb(lkb))
3681 log_error(ls, "purged lkb not released");
3682 }
3683 }
3684}
3685
3686static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3687{
3688 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3689}
3690
3691static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3692{
3693 return is_master_copy(lkb);
3694}
3695
3696static void purge_dead_locks(struct dlm_rsb *r)
3697{
3698 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3699 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3700 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3701}
3702
3703void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3704{
3705 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3706 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3707 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3708}
3709
3710/* Get rid of locks held by nodes that are gone. */
3711
3712int dlm_purge_locks(struct dlm_ls *ls)
3713{
3714 struct dlm_rsb *r;
3715
3716 log_debug(ls, "dlm_purge_locks");
3717
3718 down_write(&ls->ls_root_sem);
3719 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3720 hold_rsb(r);
3721 lock_rsb(r);
3722 if (is_master(r))
3723 purge_dead_locks(r);
3724 unlock_rsb(r);
3725 unhold_rsb(r);
3726
3727 schedule();
3728 }
3729 up_write(&ls->ls_root_sem);
3730
3731 return 0;
3732}
3733
3734static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3735{
3736 struct dlm_rsb *r, *r_ret = NULL;
3737
3738 read_lock(&ls->ls_rsbtbl[bucket].lock);
3739 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3740 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3741 continue;
3742 hold_rsb(r);
3743 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3744 r_ret = r;
3745 break;
3746 }
3747 read_unlock(&ls->ls_rsbtbl[bucket].lock);
3748 return r_ret;
3749}
3750
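/* purge_queue() flagged each affected rsb with RSB_LOCKS_PURGED;
   find_purged_rsb() above picks such an rsb out of one hash bucket
   and clears the flag, letting dlm_grant_after_purge() below walk
   the table bucket by bucket and grant whatever became grantable. */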
3751void dlm_grant_after_purge(struct dlm_ls *ls)
3752{
3753 struct dlm_rsb *r;
3754 int bucket = 0;
3755
3756 while (1) {
3757 r = find_purged_rsb(ls, bucket);
3758 if (!r) {
3759 if (bucket == ls->ls_rsbtbl_size - 1)
3760 break;
3761 bucket++;
3762 continue;
3763 }
3764 lock_rsb(r);
3765 if (is_master(r)) {
3766 grant_pending_locks(r);
3767 confirm_master(r, 0);
3768 }
3769 unlock_rsb(r);
3770 put_rsb(r);
3771 schedule();
3772 }
3773}
3774
3775static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3776 uint32_t remid)
3777{
3778 struct dlm_lkb *lkb;
3779
3780 list_for_each_entry(lkb, head, lkb_statequeue) {
3781 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3782 return lkb;
3783 }
3784 return NULL;
3785}
3786
3787static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3788 uint32_t remid)
3789{
3790 struct dlm_lkb *lkb;
3791
3792 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3793 if (lkb)
3794 return lkb;
3795 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3796 if (lkb)
3797 return lkb;
3798 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3799 if (lkb)
3800 return lkb;
3801 return NULL;
3802}
3803
3804static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3805 struct dlm_rsb *r, struct dlm_rcom *rc)
3806{
3807 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3808 int lvblen;
3809
3810 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3811 lkb->lkb_ownpid = rl->rl_ownpid;
3812 lkb->lkb_remid = rl->rl_lkid;
3813 lkb->lkb_exflags = rl->rl_exflags;
3814 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3815 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3816 lkb->lkb_lvbseq = rl->rl_lvbseq;
3817 lkb->lkb_rqmode = rl->rl_rqmode;
3818 lkb->lkb_grmode = rl->rl_grmode;
3819 /* don't set lkb_status because add_lkb wants to itself */
3820
3821 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3822 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3823
3824 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3825 lkb->lkb_lvbptr = allocate_lvb(ls);
3826 if (!lkb->lkb_lvbptr)
3827 return -ENOMEM;
3828 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3829 sizeof(struct rcom_lock);
3830 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3831 }
3832
3833 /* Conversions between PR and CW (middle modes) need special handling.
3834 The real granted mode of these converting locks cannot be determined
3835 until all locks have been rebuilt on the rsb (recover_conversion) */
3836
3837 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3838 rl->rl_status = DLM_LKSTS_CONVERT;
3839 lkb->lkb_grmode = DLM_LOCK_IV;
3840 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3841 }
3842
3843 return 0;
3844}
3845
3846/* This lkb may have been recovered in a previous aborted recovery so we need
3847 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3848 If so we just send back a standard reply. If not, we create a new lkb with
3849 the given values and send back our lkid. We send back our lkid by sending
3850 back the rcom_lock struct we got but with the remid field filled in. */
3851
3852int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3853{
3854 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3855 struct dlm_rsb *r;
3856 struct dlm_lkb *lkb;
3857 int error;
3858
3859 if (rl->rl_parent_lkid) {
3860 error = -EOPNOTSUPP;
3861 goto out;
3862 }
3863
3864 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3865 if (error)
3866 goto out;
3867
3868 lock_rsb(r);
3869
3870 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3871 if (lkb) {
3872 error = -EEXIST;
3873 goto out_remid;
3874 }
3875
3876 error = create_lkb(ls, &lkb);
3877 if (error)
3878 goto out_unlock;
3879
3880 error = receive_rcom_lock_args(ls, lkb, r, rc);
3881 if (error) {
3882 __put_lkb(ls, lkb);
3883 goto out_unlock;
3884 }
3885
3886 attach_lkb(r, lkb);
3887 add_lkb(r, lkb, rl->rl_status);
3888 error = 0;
3889
3890 out_remid:
3891 /* this is the new value returned to the lock holder for
3892 saving in its process-copy lkb */
3893 rl->rl_remid = lkb->lkb_id;
3894
3895 out_unlock:
3896 unlock_rsb(r);
3897 put_rsb(r);
3898 out:
3899 if (error)
3900 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3901 rl->rl_result = error;
3902 return error;
3903}
3904
3905int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3906{
3907 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3908 struct dlm_rsb *r;
3909 struct dlm_lkb *lkb;
3910 int error;
3911
3912 error = find_lkb(ls, rl->rl_lkid, &lkb);
3913 if (error) {
3914 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3915 return error;
3916 }
3917
3918 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3919
3920 error = rl->rl_result;
3921
3922 r = lkb->lkb_resource;
3923 hold_rsb(r);
3924 lock_rsb(r);
3925
3926 switch (error) {
3927 case -EBADR:
3928 /* There's a chance the new master received our lock before
3929 dlm_recover_master_reply(), this wouldn't happen if we did
3930 a barrier between recover_masters and recover_locks. */
3931 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
3932 (unsigned long)r, r->res_name);
3933 dlm_send_rcom_lock(r, lkb);
3934 goto out;
3935 case -EEXIST:
3936 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3937 /* fall through */
3938 case 0:
3939 lkb->lkb_remid = rl->rl_remid;
3940 break;
3941 default:
3942 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3943 error, lkb->lkb_id);
3944 }
3945
3946 /* an ack for dlm_recover_locks() which waits for replies from
3947 all the locks it sends to new masters */
3948 dlm_recovered_lock(r);
3949 out:
3950 unlock_rsb(r);
3951 put_rsb(r);
3952 dlm_put_lkb(lkb);
3953
3954 return 0;
3955}
3956
3957int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3958 int mode, uint32_t flags, void *name, unsigned int namelen,
3959 uint32_t parent_lkid)
3960{
3961 struct dlm_lkb *lkb;
3962 struct dlm_args args;
3963 int error;
3964
3965 lock_recovery(ls);
3966
3967 error = create_lkb(ls, &lkb);
3968 if (error) {
3969 kfree(ua);
3970 goto out;
3971 }
3972
3973 if (flags & DLM_LKF_VALBLK) {
3974 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3975 if (!ua->lksb.sb_lvbptr) {
3976 kfree(ua);
3977 __put_lkb(ls, lkb);
3978 error = -ENOMEM;
3979 goto out;
3980 }
3981 }
3982
3983 /* After ua is attached to lkb it will be freed by free_lkb().
3984 When DLM_IFL_USER is set, the dlm knows that this is a userspace
3985 lock and that lkb_astparam is the dlm_user_args structure. */
3986
3987 error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3988 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
3989 lkb->lkb_flags |= DLM_IFL_USER;
3990 ua->old_mode = DLM_LOCK_IV;
3991
3992 if (error) {
3993 __put_lkb(ls, lkb);
3994 goto out;
3995 }
3996
3997 error = request_lock(ls, lkb, name, namelen, &args);
3998
3999 switch (error) {
4000 case 0:
4001 break;
4002 case -EINPROGRESS:
4003 error = 0;
4004 break;
4005 case -EAGAIN:
4006 error = 0;
4007 /* fall through */
4008 default:
4009 __put_lkb(ls, lkb);
4010 goto out;
4011 }
4012
4013 /* add this new lkb to the per-process list of locks */
4014 spin_lock(&ua->proc->locks_spin);
ef0c2bb0 4015 hold_lkb(lkb);
597d0cae
DT
4016 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4017 spin_unlock(&ua->proc->locks_spin);
4018 out:
4019 unlock_recovery(ls);
4020 return error;
4021}
4022
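/* Convert an existing userspace lock, looked up by lkid rather than created.
   The caller may change the ast params and lvb on each convert, so the
   dlm_user_args attached to the lkb are refreshed from ua_tmp before the
   convert is submitted. */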
4023int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4024 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
4025{
4026 struct dlm_lkb *lkb;
4027 struct dlm_args args;
4028 struct dlm_user_args *ua;
4029 int error;
4030
4031 lock_recovery(ls);
4032
4033 error = find_lkb(ls, lkid, &lkb);
4034 if (error)
4035 goto out;
4036
4037 /* the user can change the params on its lock when converting it,
4038 or add an lvb that didn't exist before */
4039
4040 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4041
4042 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
62a0f623 4043 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
597d0cae
DT
4044 if (!ua->lksb.sb_lvbptr) {
4045 error = -ENOMEM;
4046 goto out_put;
4047 }
4048 }
4049 if (lvb_in && ua->lksb.sb_lvbptr)
4050 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4051
4052 ua->castparam = ua_tmp->castparam;
4053 ua->castaddr = ua_tmp->castaddr;
4054 ua->bastparam = ua_tmp->bastparam;
4055 ua->bastaddr = ua_tmp->bastaddr;
10948eb4 4056 ua->user_lksb = ua_tmp->user_lksb;
597d0cae
DT
4057 ua->old_mode = lkb->lkb_grmode;
4058
32f105a1
DT
4059 error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
4060 ua, DLM_FAKE_USER_AST, &args);
597d0cae
DT
4061 if (error)
4062 goto out_put;
4063
4064 error = convert_lock(ls, lkb, &args);
4065
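	/* as with a new request, -EINPROGRESS and -EAGAIN are completed
	   asynchronously through the ast, so neither is an error here */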
4066 if (error == -EINPROGRESS || error == -EAGAIN)
4067 error = 0;
4068 out_put:
4069 dlm_put_lkb(lkb);
4070 out:
4071 unlock_recovery(ls);
4072 kfree(ua_tmp);
4073 return error;
4074}
4075
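/* Unlock a userspace lock.  While the unlock is in flight the lkb is moved
   from the per-process locks list to the unlocking list, so that a racing
   dlm_clear_proc_locks() at process exit can tell the lkb is already being
   torn down. */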
4076int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4077 uint32_t flags, uint32_t lkid, char *lvb_in)
4078{
4079 struct dlm_lkb *lkb;
4080 struct dlm_args args;
4081 struct dlm_user_args *ua;
4082 int error;
4083
4084 lock_recovery(ls);
4085
4086 error = find_lkb(ls, lkid, &lkb);
4087 if (error)
4088 goto out;
4089
4090 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4091
4092 if (lvb_in && ua->lksb.sb_lvbptr)
4093 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4094 ua->castparam = ua_tmp->castparam;
cc346d55 4095 ua->user_lksb = ua_tmp->user_lksb;
597d0cae
DT
4096
4097 error = set_unlock_args(flags, ua, &args);
4098 if (error)
4099 goto out_put;
4100
4101 error = unlock_lock(ls, lkb, &args);
4102
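	/* -DLM_EUNLOCK is the dlm's internal status for a completed unlock;
	   userspace sees it as success */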
4103 if (error == -DLM_EUNLOCK)
4104 error = 0;
ef0c2bb0
DT
4105 /* from validate_unlock_args() */
4106 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4107 error = 0;
597d0cae
DT
4108 if (error)
4109 goto out_put;
4110
4111 spin_lock(&ua->proc->locks_spin);
a1bc86e6
DT
4112 /* dlm_user_add_ast() may have already taken lkb off the proc list */
4113 if (!list_empty(&lkb->lkb_ownqueue))
4114 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
597d0cae 4115 spin_unlock(&ua->proc->locks_spin);
597d0cae
DT
4116 out_put:
4117 dlm_put_lkb(lkb);
4118 out:
4119 unlock_recovery(ls);
ef0c2bb0 4120 kfree(ua_tmp);
597d0cae
DT
4121 return error;
4122}
4123
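/* Cancel an in-progress userspace request or convert.  As with unlock, the
   internal -DLM_ECANCEL status is reported to the caller as success, and
   -EBUSY from validate_unlock_args() is also mapped to success, apparently
   because an overlapping cancel is recorded and resolved once the in-flight
   operation completes. */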
4124int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4125 uint32_t flags, uint32_t lkid)
4126{
4127 struct dlm_lkb *lkb;
4128 struct dlm_args args;
4129 struct dlm_user_args *ua;
4130 int error;
4131
4132 lock_recovery(ls);
4133
4134 error = find_lkb(ls, lkid, &lkb);
4135 if (error)
4136 goto out;
4137
4138 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4139 ua->castparam = ua_tmp->castparam;
c059f70e 4140 ua->user_lksb = ua_tmp->user_lksb;
597d0cae
DT
4141
4142 error = set_unlock_args(flags, ua, &args);
4143 if (error)
4144 goto out_put;
4145
4146 error = cancel_lock(ls, lkb, &args);
4147
4148 if (error == -DLM_ECANCEL)
4149 error = 0;
ef0c2bb0
DT
4150 /* from validate_unlock_args() */
4151 if (error == -EBUSY)
4152 error = 0;
597d0cae
DT
4153 out_put:
4154 dlm_put_lkb(lkb);
4155 out:
4156 unlock_recovery(ls);
ef0c2bb0 4157 kfree(ua_tmp);
597d0cae
DT
4158 return error;
4159}
4160
ef0c2bb0
DT
4161/* lkb's that are removed from the waiters list by revert are just left on the
4162 orphans list with the granted orphan locks, to be freed by purge */
4163
597d0cae
DT
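/* A persistent lkb being orphaned is parked on the lockspace orphans list,
   and a cancel is issued for any request or convert still in flight, so that
   (presumably) only the granted state is left behind for purge to free. */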
4164static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4165{
4166 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
ef0c2bb0
DT
4167 struct dlm_args args;
4168 int error;
597d0cae 4169
ef0c2bb0
DT
4170 hold_lkb(lkb);
4171 mutex_lock(&ls->ls_orphans_mutex);
4172 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4173 mutex_unlock(&ls->ls_orphans_mutex);
597d0cae 4174
ef0c2bb0
DT
4175 set_unlock_args(0, ua, &args);
4176
4177 error = cancel_lock(ls, lkb, &args);
4178 if (error == -DLM_ECANCEL)
4179 error = 0;
4180 return error;
597d0cae
DT
4181}
4182
4183/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4184 Regardless of what rsb queue the lock is on, it's removed and freed. */
4185
4186static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4187{
4188 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4189 struct dlm_args args;
4190 int error;
4191
597d0cae
DT
4192 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4193
4194 error = unlock_lock(ls, lkb, &args);
4195 if (error == -DLM_EUNLOCK)
4196 error = 0;
4197 return error;
4198}
4199
ef0c2bb0
DT
4200 /* We have to release the clear_proc_locks mutex before calling
4201 unlock_proc_lock() (which takes lock_rsb), to avoid deadlocking against the
4202 receive path, which takes lock_rsb and then calls dlm_user_add_ast() */
4203
4204static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4205 struct dlm_user_proc *proc)
4206{
4207 struct dlm_lkb *lkb = NULL;
4208
4209 mutex_lock(&ls->ls_clear_proc_locks);
4210 if (list_empty(&proc->locks))
4211 goto out;
4212
4213 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4214 list_del_init(&lkb->lkb_ownqueue);
4215
4216 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4217 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4218 else
4219 lkb->lkb_flags |= DLM_IFL_DEAD;
4220 out:
4221 mutex_unlock(&ls->ls_clear_proc_locks);
4222 return lkb;
4223}
4224
597d0cae
DT
4225 /* The ls_clear_proc_locks mutex protects against dlm_user_add_ast() which
4226 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4227 which we clear here. */
4228
4229 /* The proc CLOSING flag is set, so no more device_reads should look at the
4230 proc->asts list and no more device_writes should add lkb's to the proc->locks
4231 list; we therefore shouldn't need to take asts_spin or locks_spin here.
4232 This assumes that device reads/writes/closes are serialized -- FIXME: we may
4233 need to serialize them ourselves. */
4234
4235void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4236{
4237 struct dlm_lkb *lkb, *safe;
4238
4239 lock_recovery(ls);
597d0cae 4240
ef0c2bb0
DT
4241 while (1) {
4242 lkb = del_proc_lock(ls, proc);
4243 if (!lkb)
4244 break;
4245 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
597d0cae 4246 orphan_proc_lock(ls, lkb);
ef0c2bb0 4247 else
597d0cae 4248 unlock_proc_lock(ls, lkb);
597d0cae
DT
4249
4250 /* this removes the reference for the proc->locks list
4251 added by dlm_user_request, it may result in the lkb
4252 being freed */
4253
4254 dlm_put_lkb(lkb);
4255 }
a1bc86e6 4256
ef0c2bb0
DT
4257 mutex_lock(&ls->ls_clear_proc_locks);
4258
a1bc86e6
DT
4259 /* in-progress unlocks */
4260 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4261 list_del_init(&lkb->lkb_ownqueue);
4262 lkb->lkb_flags |= DLM_IFL_DEAD;
4263 dlm_put_lkb(lkb);
4264 }
4265
4266 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4267 list_del(&lkb->lkb_astqueue);
4268 dlm_put_lkb(lkb);
4269 }
4270
597d0cae
DT
4271 mutex_unlock(&ls->ls_clear_proc_locks);
4272 unlock_recovery(ls);
4273}
a1bc86e6 4274
8499137d
DT
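/* purge_proc_locks() is the self-purge variant of dlm_clear_proc_locks():
   the calling process is purging its own locks, so the CLOSING assumptions
   above don't hold and locks_spin/asts_spin must be taken while walking the
   lists.  Every lock is force-unlocked here, persistent or not. */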
4275static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4276{
4277 struct dlm_lkb *lkb, *safe;
4278
4279 while (1) {
4280 lkb = NULL;
4281 spin_lock(&proc->locks_spin);
4282 if (!list_empty(&proc->locks)) {
4283 lkb = list_entry(proc->locks.next, struct dlm_lkb,
4284 lkb_ownqueue);
4285 list_del_init(&lkb->lkb_ownqueue);
4286 }
4287 spin_unlock(&proc->locks_spin);
4288
4289 if (!lkb)
4290 break;
4291
4292 lkb->lkb_flags |= DLM_IFL_DEAD;
4293 unlock_proc_lock(ls, lkb);
4294 dlm_put_lkb(lkb); /* ref from proc->locks list */
4295 }
4296
4297 spin_lock(&proc->locks_spin);
4298 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4299 list_del_init(&lkb->lkb_ownqueue);
4300 lkb->lkb_flags |= DLM_IFL_DEAD;
4301 dlm_put_lkb(lkb);
4302 }
4303 spin_unlock(&proc->locks_spin);
4304
4305 spin_lock(&proc->asts_spin);
4306 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4307 list_del(&lkb->lkb_astqueue);
4308 dlm_put_lkb(lkb);
4309 }
4310 spin_unlock(&proc->asts_spin);
4311}
4312
4313 /* a pid of 0 means purge orphans belonging to all pids */
4314
4315static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4316{
4317 struct dlm_lkb *lkb, *safe;
4318
4319 mutex_lock(&ls->ls_orphans_mutex);
4320 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4321 if (pid && lkb->lkb_ownpid != pid)
4322 continue;
4323 unlock_proc_lock(ls, lkb);
4324 list_del_init(&lkb->lkb_ownqueue);
4325 dlm_put_lkb(lkb);
4326 }
4327 mutex_unlock(&ls->ls_orphans_mutex);
4328}
4329
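/* Ask another node to purge orphans for the given nodeid/pid by sending it a
   DLM_MSG_PURGE message, which the remote node presumably handles by calling
   do_purge() from its message receive path. */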
4330static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4331{
4332 struct dlm_message *ms;
4333 struct dlm_mhandle *mh;
4334 int error;
4335
4336 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4337 DLM_MSG_PURGE, &ms, &mh);
4338 if (error)
4339 return error;
4340 ms->m_nodeid = nodeid;
4341 ms->m_pid = pid;
4342
4343 return send_message(mh, ms);
4344}
4345
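/* Entry point for a userspace purge: forwarded with send_purge() when it
   targets another node; handled locally with purge_proc_locks() when a
   process purges its own locks, or with do_purge() for orphans left behind
   by other pids. */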
4346int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4347 int nodeid, int pid)
4348{
4349 int error = 0;
4350
4351 if (nodeid != dlm_our_nodeid()) {
4352 error = send_purge(ls, nodeid, pid);
4353 } else {
4354 lock_recovery(ls);
4355 if (pid == current->pid)
4356 purge_proc_locks(ls, proc);
4357 else
4358 do_purge(ls, nodeid, pid);
4359 unlock_recovery(ls);
4360 }
4361 return error;
4362}
4363