/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
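
/* Editorial sketch (not part of the original file): the stage 1 dispatch
   described above, written out.  The helper and the simplified (ls, lkb)
   signatures are hypothetical, taken from the summary comment rather than
   the real function prototypes. */
#if 0
static int stage1_dispatch_example(struct dlm_ls *ls, struct dlm_lkb *lkb,
				   uint32_t flags, int unlock)
{
	if (!unlock)
		return (flags & DLM_LKF_CONVERT) ? convert_lock(ls, lkb)
						 : request_lock(ls, lkb);

	return (flags & DLM_LKF_CANCEL) ? cancel_lock(ls, lkb)
					: unlock_lock(ls, lkb);
}
#endif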

#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
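
/* Editorial sketch (not in the original): two concrete lookups in the
   table above.  Converting up NL->EX returns the resource's LVB to the
   caller (1); converting down EX->NL writes the caller's LVB back to
   the resource (0). */
#if 0
static void lvb_direction_example(void)
{
	int up   = dlm_lvb_operations[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1]; /* 1 */
	int down = dlm_lvb_operations[DLM_LOCK_EX + 1][DLM_LOCK_NL + 1]; /* 0 */
}
#endif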

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
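
/* Editorial sketch (not in the original): the matrix answers mode
   compatibility questions, e.g. PR coexists with PR but not with CW. */
#if 0
static void compat_example(void)
{
	int a = dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR);	/* 1 */
	int b = dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_CW);	/* 0 */
}
#endif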

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

static inline void unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

static inline int lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}
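
/* Editorial sketch (not in the original): a typical caller pattern for
   find_rsb(), requesting creation if the rsb doesn't exist yet.  The
   name/length values are made up for illustration. */
#if 0
static int find_rsb_example(struct dlm_ls *ls, struct dlm_rsb **r_ret)
{
	/* searches the hash bucket's normal list, then the toss list;
	   a tossed rsb is resurrected, otherwise a new one is created */
	return find_rsb(ls, "example", 7, R_CREATE, r_ret);
}
#endif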

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}
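
/* Editorial sketch (not in the original): the attach/detach pairing
   that keeps an rsb alive for the lifetime of its lkb's.  Hypothetical
   helper, for illustration only. */
#if 0
static void lkb_lifetime_example(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	attach_lkb(r, lkb);	/* takes an rsb reference */
	/* ... lkb lives on one of r's queues ... */
	detach_lkb(lkb);	/* drops it; last put moves r to toss list */
}
#endif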

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}
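
/* Editorial sketch (not in the original): a lock id encodes its hash
   bucket in the top 16 bits and a per-bucket counter in the low 16,
   which is why find_lkb() can recover the bucket with a shift. */
#if 0
static void lkid_layout_example(void)
{
	uint32_t lkid = (0x002a << 16) | 0x0001;	/* bucket 42, counter 1 */
	uint16_t bucket = (lkid >> 16);			/* 42 again */
}
#endif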

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	if (!lkb)
		list_add_tail(new, head);
	else
		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
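
/* Editorial sketch (not in the original): lkb_add_ordered() keeps a
   queue sorted by descending mode, so an EX entry inserted second
   still ends up ahead of a PR entry.  Hypothetical call sequence. */
#if 0
static void add_ordered_example(struct dlm_rsb *r, struct dlm_lkb *pr_lkb,
				struct dlm_lkb *ex_lkb)
{
	lkb_add_ordered(&pr_lkb->lkb_statequeue, &r->res_grantqueue,
			pr_lkb->lkb_grmode);		/* PR first */
	lkb_add_ordered(&ex_lkb->lkb_statequeue, &r->res_grantqueue,
			ex_lkb->lkb_grmode);		/* EX placed ahead */
}
#endif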

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type);
	if (ms != &ls->ls_stub_ms)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	if (dlm_locking_stopped(ls))
		return;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		cond_resched();
	}
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

/* returns: 0 did nothing
	    1 moved lock to granted
	   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}
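
/* Editorial sketch (not in the original): which queue operation each
   stage-4 path uses.  Hypothetical helper; the calls are independent
   illustrations, not a real sequence. */
#if 0
static void queue_ops_example(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      struct dlm_message *ms)
{
	/* unlock path (do_unlock): drop lkb from the granted queue */
	remove_lock(r, lkb);
	/* cancel path (do_cancel): move a converting lkb back to granted */
	revert_lock(r, lkb);
	/* request/convert path on a process copy: apply the master's
	   decision locally */
	grant_lock_pc(r, lkb, ms);
}
#endif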
1203
7d3c1feb
DT
1204/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1205 change the granted/requested modes. We're munging things accordingly in
1206 the process copy.
1207 CONVDEADLK: our grmode may have been forced down to NL to resolve a
1208 conversion deadlock
1209 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1210 compatible with other granted locks */
1211
1212static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1213{
1214 if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1215 log_print("munge_demoted %x invalid reply type %d",
1216 lkb->lkb_id, ms->m_type);
1217 return;
1218 }
1219
1220 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1221 log_print("munge_demoted %x invalid modes gr %d rq %d",
1222 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1223 return;
1224 }
1225
1226 lkb->lkb_grmode = DLM_LOCK_NL;
1227}
1228
1229static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1230{
1231 if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1232 ms->m_type != DLM_MSG_GRANT) {
1233 log_print("munge_altmode %x invalid reply type %d",
1234 lkb->lkb_id, ms->m_type);
1235 return;
1236 }
1237
1238 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1239 lkb->lkb_rqmode = DLM_LOCK_PR;
1240 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1241 lkb->lkb_rqmode = DLM_LOCK_CW;
1242 else {
1243 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1244 dlm_print_lkb(lkb);
1245 }
1246}
1247
e7fd4179
DT
1248static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1249{
1250 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1251 lkb_statequeue);
1252 if (lkb->lkb_id == first->lkb_id)
90135925 1253 return 1;
e7fd4179 1254
90135925 1255 return 0;
e7fd4179
DT
1256}
1257
e7fd4179
DT
1258/* Check if the given lkb conflicts with another lkb on the queue. */
1259
1260static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1261{
1262 struct dlm_lkb *this;
1263
1264 list_for_each_entry(this, head, lkb_statequeue) {
1265 if (this == lkb)
1266 continue;
3bcd3687 1267 if (!modes_compat(this, lkb))
90135925 1268 return 1;
e7fd4179 1269 }
90135925 1270 return 0;
e7fd4179
DT
1271}
1272
1273/*
1274 * "A conversion deadlock arises with a pair of lock requests in the converting
1275 * queue for one resource. The granted mode of each lock blocks the requested
1276 * mode of the other lock."
1277 *
1278 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1279 * convert queue from being granted, then demote lkb (set grmode to NL).
1280 * This second form requires that we check for conv-deadlk even when
1281 * now == 0 in _can_be_granted().
1282 *
1283 * Example:
1284 * Granted Queue: empty
1285 * Convert Queue: NL->EX (first lock)
1286 * PR->EX (second lock)
1287 *
1288 * The first lock can't be granted because of the granted mode of the second
1289 * lock and the second lock can't be granted because it's not first in the
1290 * list. We demote the granted mode of the second lock (the lkb passed to this
1291 * function).
1292 *
1293 * After the resolution, the "grant pending" function needs to go back and try
1294 * to grant locks on the convert queue again since the first lock can now be
1295 * granted.
1296 */
1297
1298static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1299{
1300 struct dlm_lkb *this, *first = NULL, *self = NULL;
1301
1302 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1303 if (!first)
1304 first = this;
1305 if (this == lkb) {
1306 self = lkb;
1307 continue;
1308 }
1309
e7fd4179 1310 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
90135925 1311 return 1;
e7fd4179
DT
1312 }
1313
1314 /* if lkb is on the convert queue and is preventing the first
1315 from being granted, then there's deadlock and we demote lkb.
1316 multiple converting locks may need to do this before the first
1317 converting lock can be granted. */
1318
1319 if (self && self != first) {
1320 if (!modes_compat(lkb, first) &&
1321 !queue_conflict(&rsb->res_grantqueue, first))
90135925 1322 return 1;
e7fd4179
DT
1323 }
1324
90135925 1325 return 0;
e7fd4179
DT
1326}
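
/* Editorial sketch (not in the original): the prose example above in
   matrix terms.  With NL->EX ahead of PR->EX on the convert queue, the
   second lock's granted PR blocks the first's requested EX; demoting
   the second to NL unblocks it. */
#if 0
static void conv_deadlk_example(void)
{
	int blocked = dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX);	/* 0 */
	int demoted = dlm_modes_compat(DLM_LOCK_NL, DLM_LOCK_EX);	/* 1 */
}
#endif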

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
	}

	return 0;
}

/*
 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
 * simple way to provide a big optimization to applications that can use them.
 */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	uint32_t flags = lkb->lkb_exflags;
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
		goto out;

	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
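
/* Editorial sketch (not in the original): with ALTPR set, a blocked
   request can be retried and granted in the alternate PR mode; the
   caller detects this from the ALTMODE status flag. */
#if 0
static void altmode_example(struct dlm_lkb *lkb)
{
	if (lkb->lkb_sbflags & DLM_SBF_ALTMODE)
		/* rqmode was replaced (e.g. CW -> PR) before granting */
		printk("granted in alternate mode %d\n", lkb->lkb_rqmode);
}
#endif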

static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		if (can_be_granted(r, lkb, 0)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
		} else {
			hi = max_t(int, lkb->lkb_rqmode, hi);
			if (!demoted && is_demoted(lkb))
				demote_restart = 1;
		}
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0))
			grant_lock_pending(r, lkb);
		else
			high = max_t(int, lkb->lkb_rqmode, high);
	}

	return high;
}

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.  FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		if (gr->lkb_bastaddr &&
		    gr->lkb_highbast < lkb->lkb_rqmode &&
		    !modes_compat(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}

/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
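
/* Editorial sketch (not in the original): the caller pattern implied by
   set_master()'s return values; _request_lock() behaves roughly like
   this hypothetical helper. */
#if 0
static int set_master_caller_example(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (set_master(r, lkb))
		return 0;	/* lkb parked until the lookup reply */
	if (is_remote(r))
		return send_request(r, lkb);	/* master is remote */
	return do_request(r, lkb);		/* we are the master */
}
#endif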

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del_init(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}

/* confirm_master -- confirm (or deny) an rsb's master nodeid */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
		/* the remote master didn't queue our NOQUEUE request;
		   make a waiting lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		} else
			r->res_nodeid = -1;
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}

static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, uint32_t parent_lkid, void *ast,
			 void *astarg, void *bast, struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	if (flags & DLM_LKF_CANCEL)
		goto out;

	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	/* parent/child locks not yet supported */
	if (parent_lkid)
		goto out;

	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astaddr = ast;
	args->astparam = (long) astarg;
	args->bastaddr = bast;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}

static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
		      DLM_LKF_FORCEUNLOCK))
		return -EINVAL;

	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
		return -EINVAL;

	args->flags = flags;
	args->astparam = (long) astarg;
	return 0;
}

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astaddr = args->astaddr;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastaddr = args->bastaddr;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	rv = 0;
 out:
	return rv;
}
1852
ef0c2bb0
DT
1853/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1854 for success */
1855
1856/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1857 because there may be a lookup in progress and it's valid to do
1858 cancel/unlockf on it */
1859
e7fd4179
DT
1860static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1861{
ef0c2bb0 1862 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
e7fd4179
DT
1863 int rv = -EINVAL;
1864
ef0c2bb0
DT
1865 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1866 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1867 dlm_print_lkb(lkb);
e7fd4179 1868 goto out;
ef0c2bb0 1869 }
e7fd4179 1870
ef0c2bb0
DT
1871 /* an lkb may still exist even though the lock is EOL'ed due to a
1872 cancel, unlock or failed noqueue request; an app can't use these
1873 locks; return same error as if the lkid had not been found at all */
e7fd4179 1874
ef0c2bb0
DT
1875 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1876 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1877 rv = -ENOENT;
e7fd4179 1878 goto out;
ef0c2bb0 1879 }
e7fd4179 1880
ef0c2bb0
DT
1881 /* an lkb may be waiting for an rsb lookup to complete where the
1882 lookup was initiated by another lock */
1883
1884 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1885 if (!list_empty(&lkb->lkb_rsb_lookup)) {
1886 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1887 list_del_init(&lkb->lkb_rsb_lookup);
1888 queue_cast(lkb->lkb_resource, lkb,
1889 args->flags & DLM_LKF_CANCEL ?
1890 -DLM_ECANCEL : -DLM_EUNLOCK);
1891 unhold_lkb(lkb); /* undoes create_lkb() */
1892 rv = -EBUSY;
1893 goto out;
1894 }
1895 }
1896
1897 /* cancel not allowed with another cancel/unlock in progress */
1898
1899 if (args->flags & DLM_LKF_CANCEL) {
1900 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1901 goto out;
1902
1903 if (is_overlap(lkb))
1904 goto out;
1905
1906 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1907 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1908 rv = -EBUSY;
1909 goto out;
1910 }
1911
1912 switch (lkb->lkb_wait_type) {
1913 case DLM_MSG_LOOKUP:
1914 case DLM_MSG_REQUEST:
1915 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1916 rv = -EBUSY;
1917 goto out;
1918 case DLM_MSG_UNLOCK:
1919 case DLM_MSG_CANCEL:
1920 goto out;
1921 }
1922 /* add_to_waiters() will set OVERLAP_CANCEL */
1923 goto out_ok;
1924 }
1925
1926 /* do we need to allow a force-unlock if there's a normal unlock
1927 already in progress? in what conditions could the normal unlock
1928 fail such that we'd want to send a force-unlock to be sure? */
1929
1930 if (args->flags & DLM_LKF_FORCEUNLOCK) {
1931 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1932 goto out;
1933
1934 if (is_overlap_unlock(lkb))
1935 goto out;
1936
1937 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1938 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1939 rv = -EBUSY;
1940 goto out;
1941 }
1942
1943 switch (lkb->lkb_wait_type) {
1944 case DLM_MSG_LOOKUP:
1945 case DLM_MSG_REQUEST:
1946 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1947 rv = -EBUSY;
1948 goto out;
1949 case DLM_MSG_UNLOCK:
1950 goto out;
1951 }
1952 /* add_to_waiters() will set OVERLAP_UNLOCK */
1953 goto out_ok;
1954 }
1955
1956 /* normal unlock not allowed if there's any op in progress */
1957 rv = -EBUSY;
1958 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
1959 goto out;
1960
1961 out_ok:
1962 /* an overlapping op shouldn't blow away exflags from other op */
1963 lkb->lkb_exflags |= args->flags;
1964 lkb->lkb_sbflags = 0;
1965 lkb->lkb_astparam = args->astparam;
1966 rv = 0;
1967 out:
1968 if (rv)
1969 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1970 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1971 args->flags, lkb->lkb_wait_type,
1972 lkb->lkb_resource->res_name);
1973 return rv;
1974}
1975
1976/*
1977 * Four stage 4 varieties:
1978 * do_request(), do_convert(), do_unlock(), do_cancel()
1979 * These are called on the master node for the given lock and
1980 * from the central locking logic.
1981 */
1982
1983static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1984{
1985 int error = 0;
1986
1987 if (can_be_granted(r, lkb, 1)) {
1988 grant_lock(r, lkb);
1989 queue_cast(r, lkb, 0);
1990 goto out;
1991 }
1992
1993 if (can_be_queued(lkb)) {
1994 error = -EINPROGRESS;
1995 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1996 send_blocking_asts(r, lkb);
1997 goto out;
1998 }
1999
2000 error = -EAGAIN;
2001 if (force_blocking_asts(lkb))
2002 send_blocking_asts_all(r, lkb);
2003 queue_cast(r, lkb, -EAGAIN);
2004
2005 out:
2006 return error;
2007}
2008
2009static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2010{
2011 int error = 0;
2012
2013 /* changing an existing lock may allow others to be granted */
2014
2015 if (can_be_granted(r, lkb, 1)) {
2016 grant_lock(r, lkb);
2017 queue_cast(r, lkb, 0);
2018 grant_pending_locks(r);
2019 goto out;
2020 }
2021
2022 /* is_demoted() means the can_be_granted() above set the grmode
2023 to NL, and left us on the granted queue. This auto-demotion
2024 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2025 now grantable. We have to try to grant other converting locks
2026 before we try again to grant this one. */
2027
2028 if (is_demoted(lkb)) {
2029 grant_pending_convert(r, DLM_LOCK_IV);
2030 if (_can_be_granted(r, lkb, 1)) {
2031 grant_lock(r, lkb);
2032 queue_cast(r, lkb, 0);
2033 grant_pending_locks(r);
2034 goto out;
2035 }
2036 /* else fall through and move to convert queue */
2037 }
2038
2039 if (can_be_queued(lkb)) {
2040 error = -EINPROGRESS;
2041 del_lkb(r, lkb);
2042 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2043 send_blocking_asts(r, lkb);
2044 goto out;
2045 }
2046
2047 error = -EAGAIN;
2048 if (force_blocking_asts(lkb))
2049 send_blocking_asts_all(r, lkb);
2050 queue_cast(r, lkb, -EAGAIN);
2051
2052 out:
2053 return error;
2054}
2055
2056static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2057{
2058 remove_lock(r, lkb);
2059 queue_cast(r, lkb, -DLM_EUNLOCK);
2060 grant_pending_locks(r);
2061 return -DLM_EUNLOCK;
2062}
2063
2064 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2065
2066static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2067{
2068 int error;
2069
2070 error = revert_lock(r, lkb);
2071 if (error) {
2072 queue_cast(r, lkb, -DLM_ECANCEL);
2073 grant_pending_locks(r);
2074 return -DLM_ECANCEL;
2075 }
2076 return 0;
2077}
2078
2079/*
2080 * Four stage 3 varieties:
2081 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2082 */
2083
2084/* add a new lkb to a possibly new rsb, called by requesting process */
2085
2086static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2087{
2088 int error;
2089
2090 /* set_master: sets lkb nodeid from r */
2091
2092 error = set_master(r, lkb);
2093 if (error < 0)
2094 goto out;
2095 if (error) {
2096 error = 0;
2097 goto out;
2098 }
2099
2100 if (is_remote(r))
2101 /* receive_request() calls do_request() on remote node */
2102 error = send_request(r, lkb);
2103 else
2104 error = do_request(r, lkb);
2105 out:
2106 return error;
2107}
2108
2109 /* change some property of an existing lkb, e.g. mode */
2110
2111static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2112{
2113 int error;
2114
2115 if (is_remote(r))
2116 /* receive_convert() calls do_convert() on remote node */
2117 error = send_convert(r, lkb);
2118 else
2119 error = do_convert(r, lkb);
2120
2121 return error;
2122}
2123
2124/* remove an existing lkb from the granted queue */
2125
2126static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2127{
2128 int error;
2129
2130 if (is_remote(r))
2131 /* receive_unlock() calls do_unlock() on remote node */
2132 error = send_unlock(r, lkb);
2133 else
2134 error = do_unlock(r, lkb);
2135
2136 return error;
2137}
2138
2139/* remove an existing lkb from the convert or wait queue */
2140
2141static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2142{
2143 int error;
2144
2145 if (is_remote(r))
2146 /* receive_cancel() calls do_cancel() on remote node */
2147 error = send_cancel(r, lkb);
2148 else
2149 error = do_cancel(r, lkb);
2150
2151 return error;
2152}
2153
2154/*
2155 * Four stage 2 varieties:
2156 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2157 */
2158
2159static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2160 int len, struct dlm_args *args)
2161{
2162 struct dlm_rsb *r;
2163 int error;
2164
2165 error = validate_lock_args(ls, lkb, args);
2166 if (error)
2167 goto out;
2168
2169 error = find_rsb(ls, name, len, R_CREATE, &r);
2170 if (error)
2171 goto out;
2172
2173 lock_rsb(r);
2174
2175 attach_lkb(r, lkb);
2176 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2177
2178 error = _request_lock(r, lkb);
2179
2180 unlock_rsb(r);
2181 put_rsb(r);
2182
2183 out:
2184 return error;
2185}
2186
2187static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2188 struct dlm_args *args)
2189{
2190 struct dlm_rsb *r;
2191 int error;
2192
2193 r = lkb->lkb_resource;
2194
2195 hold_rsb(r);
2196 lock_rsb(r);
2197
2198 error = validate_lock_args(ls, lkb, args);
2199 if (error)
2200 goto out;
2201
2202 error = _convert_lock(r, lkb);
2203 out:
2204 unlock_rsb(r);
2205 put_rsb(r);
2206 return error;
2207}
2208
2209static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2210 struct dlm_args *args)
2211{
2212 struct dlm_rsb *r;
2213 int error;
2214
2215 r = lkb->lkb_resource;
2216
2217 hold_rsb(r);
2218 lock_rsb(r);
2219
2220 error = validate_unlock_args(lkb, args);
2221 if (error)
2222 goto out;
2223
2224 error = _unlock_lock(r, lkb);
2225 out:
2226 unlock_rsb(r);
2227 put_rsb(r);
2228 return error;
2229}
2230
2231static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2232 struct dlm_args *args)
2233{
2234 struct dlm_rsb *r;
2235 int error;
2236
2237 r = lkb->lkb_resource;
2238
2239 hold_rsb(r);
2240 lock_rsb(r);
2241
2242 error = validate_unlock_args(lkb, args);
2243 if (error)
2244 goto out;
2245
2246 error = _cancel_lock(r, lkb);
2247 out:
2248 unlock_rsb(r);
2249 put_rsb(r);
2250 return error;
2251}
2252
2253/*
2254 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2255 */
2256
2257int dlm_lock(dlm_lockspace_t *lockspace,
2258 int mode,
2259 struct dlm_lksb *lksb,
2260 uint32_t flags,
2261 void *name,
2262 unsigned int namelen,
2263 uint32_t parent_lkid,
2264 void (*ast) (void *astarg),
2265 void *astarg,
2266 void (*bast) (void *astarg, int mode))
2267{
2268 struct dlm_ls *ls;
2269 struct dlm_lkb *lkb;
2270 struct dlm_args args;
2271 int error, convert = flags & DLM_LKF_CONVERT;
2272
2273 ls = dlm_find_lockspace_local(lockspace);
2274 if (!ls)
2275 return -EINVAL;
2276
2277 lock_recovery(ls);
2278
2279 if (convert)
2280 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2281 else
2282 error = create_lkb(ls, &lkb);
2283
2284 if (error)
2285 goto out;
2286
2287 error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
2288 astarg, bast, &args);
2289 if (error)
2290 goto out_put;
2291
2292 if (convert)
2293 error = convert_lock(ls, lkb, &args);
2294 else
2295 error = request_lock(ls, lkb, name, namelen, &args);
2296
2297 if (error == -EINPROGRESS)
2298 error = 0;
2299 out_put:
2300 if (convert || error)
2301 __put_lkb(ls, lkb);
2302 if (error == -EAGAIN)
2303 error = 0;
2304 out:
2305 unlock_recovery(ls);
2306 dlm_put_lockspace(ls);
2307 return error;
2308}
2309
2310int dlm_unlock(dlm_lockspace_t *lockspace,
2311 uint32_t lkid,
2312 uint32_t flags,
2313 struct dlm_lksb *lksb,
2314 void *astarg)
2315{
2316 struct dlm_ls *ls;
2317 struct dlm_lkb *lkb;
2318 struct dlm_args args;
2319 int error;
2320
2321 ls = dlm_find_lockspace_local(lockspace);
2322 if (!ls)
2323 return -EINVAL;
2324
2325 lock_recovery(ls);
2326
2327 error = find_lkb(ls, lkid, &lkb);
2328 if (error)
2329 goto out;
2330
2331 error = set_unlock_args(flags, astarg, &args);
2332 if (error)
2333 goto out_put;
2334
2335 if (flags & DLM_LKF_CANCEL)
2336 error = cancel_lock(ls, lkb, &args);
2337 else
2338 error = unlock_lock(ls, lkb, &args);
2339
2340 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2341 error = 0;
2342 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2343 error = 0;
2344 out_put:
2345 dlm_put_lkb(lkb);
2346 out:
2347 unlock_recovery(ls);
2348 dlm_put_lockspace(ls);
2349 return error;
2350}
2351
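/* A hedged caller-side sketch (not part of the original file) showing how
   a kernel user might drive dlm_lock()/dlm_unlock() above. It assumes a
   lockspace handle obtained elsewhere (e.g. from dlm_new_lockspace()) and
   linux/completion.h; example_ast, example_lksb and example_done are
   invented names for the illustration. */

static struct dlm_lksb example_lksb;
static DECLARE_COMPLETION(example_done);

static void example_ast(void *astarg)
{
	/* lksb.sb_status now holds the final result (0, -DLM_EUNLOCK, ...) */
	complete(&example_done);
}

static int __maybe_unused example_lock_unlock(dlm_lockspace_t *lockspace)
{
	int error;

	/* request an exclusive lock on resource "example-res" */
	error = dlm_lock(lockspace, DLM_LOCK_EX, &example_lksb, 0,
			 "example-res", strlen("example-res"), 0,
			 example_ast, NULL, NULL);
	if (error)
		return error;
	wait_for_completion(&example_done);
	if (example_lksb.sb_status)
		return example_lksb.sb_status;

	/* release it; the completion ast fires again with -DLM_EUNLOCK */
	error = dlm_unlock(lockspace, example_lksb.sb_lkid, 0,
			   &example_lksb, NULL);
	if (error)
		return error;
	wait_for_completion(&example_done);
	return 0;
}
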
2352/*
2353 * send/receive routines for remote operations and replies
2354 *
2355 * send_args
2356 * send_common
2357 * send_request receive_request
2358 * send_convert receive_convert
2359 * send_unlock receive_unlock
2360 * send_cancel receive_cancel
2361 * send_grant receive_grant
2362 * send_bast receive_bast
2363 * send_lookup receive_lookup
2364 * send_remove receive_remove
2365 *
2366 * send_common_reply
2367 * receive_request_reply send_request_reply
2368 * receive_convert_reply send_convert_reply
2369 * receive_unlock_reply send_unlock_reply
2370 * receive_cancel_reply send_cancel_reply
2371 * receive_lookup_reply send_lookup_reply
2372 */
2373
2374static int _create_message(struct dlm_ls *ls, int mb_len,
2375 int to_nodeid, int mstype,
2376 struct dlm_message **ms_ret,
2377 struct dlm_mhandle **mh_ret)
2378{
2379 struct dlm_message *ms;
2380 struct dlm_mhandle *mh;
2381 char *mb;
2382
2383 /* get_buffer gives us a message handle (mh) that we need to
2384 pass into lowcomms_commit and a message buffer (mb) that we
2385 write our data into */
2386
2387 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2388 if (!mh)
2389 return -ENOBUFS;
2390
2391 memset(mb, 0, mb_len);
2392
2393 ms = (struct dlm_message *) mb;
2394
2395 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2396 ms->m_header.h_lockspace = ls->ls_global_id;
2397 ms->m_header.h_nodeid = dlm_our_nodeid();
2398 ms->m_header.h_length = mb_len;
2399 ms->m_header.h_cmd = DLM_MSG;
2400
2401 ms->m_type = mstype;
2402
2403 *mh_ret = mh;
2404 *ms_ret = ms;
2405 return 0;
2406}
2407
2408static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2409 int to_nodeid, int mstype,
2410 struct dlm_message **ms_ret,
2411 struct dlm_mhandle **mh_ret)
2412{
2413 int mb_len = sizeof(struct dlm_message);
2414
2415 switch (mstype) {
2416 case DLM_MSG_REQUEST:
2417 case DLM_MSG_LOOKUP:
2418 case DLM_MSG_REMOVE:
2419 mb_len += r->res_length;
2420 break;
2421 case DLM_MSG_CONVERT:
2422 case DLM_MSG_UNLOCK:
2423 case DLM_MSG_REQUEST_REPLY:
2424 case DLM_MSG_CONVERT_REPLY:
2425 case DLM_MSG_GRANT:
2426 if (lkb && lkb->lkb_lvbptr)
2427 mb_len += r->res_ls->ls_lvblen;
2428 break;
2429 }
2430
2431 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2432 ms_ret, mh_ret);
2433}
2434
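/* A worked sizing example for the switch above (illustrative numbers only,
   added here; actual sizes depend on the architecture and on the ls_lvblen
   configured for the lockspace): assuming a 64-byte struct dlm_message and
   a 32-byte LVB, a DLM_MSG_REQUEST for a 10-byte resource name needs
   64 + 10 = 74 bytes, while a DLM_MSG_CONVERT carrying an LVB needs
   64 + 32 = 96 bytes. */
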
2435/* further lowcomms enhancements or alternate implementations may make
2436 the return value from this function useful at some point */
2437
2438static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2439{
2440 dlm_message_out(ms);
2441 dlm_lowcomms_commit_buffer(mh);
2442 return 0;
2443}
2444
2445static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2446 struct dlm_message *ms)
2447{
2448 ms->m_nodeid = lkb->lkb_nodeid;
2449 ms->m_pid = lkb->lkb_ownpid;
2450 ms->m_lkid = lkb->lkb_id;
2451 ms->m_remid = lkb->lkb_remid;
2452 ms->m_exflags = lkb->lkb_exflags;
2453 ms->m_sbflags = lkb->lkb_sbflags;
2454 ms->m_flags = lkb->lkb_flags;
2455 ms->m_lvbseq = lkb->lkb_lvbseq;
2456 ms->m_status = lkb->lkb_status;
2457 ms->m_grmode = lkb->lkb_grmode;
2458 ms->m_rqmode = lkb->lkb_rqmode;
2459 ms->m_hash = r->res_hash;
2460
2461 /* m_result and m_bastmode are set from function args,
2462 not from lkb fields */
2463
2464 if (lkb->lkb_bastaddr)
2465 ms->m_asts |= AST_BAST;
2466 if (lkb->lkb_astaddr)
2467 ms->m_asts |= AST_COMP;
2468
2469 /* compare with switch in create_message; send_remove() doesn't
2470 use send_args() */
2471
2472 switch (ms->m_type) {
2473 case DLM_MSG_REQUEST:
2474 case DLM_MSG_LOOKUP:
2475 memcpy(ms->m_extra, r->res_name, r->res_length);
2476 break;
2477 case DLM_MSG_CONVERT:
2478 case DLM_MSG_UNLOCK:
2479 case DLM_MSG_REQUEST_REPLY:
2480 case DLM_MSG_CONVERT_REPLY:
2481 case DLM_MSG_GRANT:
2482 if (!lkb->lkb_lvbptr)
2483 break;
2484 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2485 break;
2486 }
2487}
2488
2489static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2490{
2491 struct dlm_message *ms;
2492 struct dlm_mhandle *mh;
2493 int to_nodeid, error;
2494
2495 error = add_to_waiters(lkb, mstype);
2496 if (error)
2497 return error;
2498
2499 to_nodeid = r->res_nodeid;
2500
2501 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2502 if (error)
2503 goto fail;
2504
2505 send_args(r, lkb, ms);
2506
2507 error = send_message(mh, ms);
2508 if (error)
2509 goto fail;
2510 return 0;
2511
2512 fail:
2513 remove_from_waiters(lkb, msg_reply_type(mstype));
2514 return error;
2515}
2516
2517static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2518{
2519 return send_common(r, lkb, DLM_MSG_REQUEST);
2520}
2521
2522static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2523{
2524 int error;
2525
2526 error = send_common(r, lkb, DLM_MSG_CONVERT);
2527
2528 /* down conversions go without a reply from the master */
2529 if (!error && down_conversion(lkb)) {
2530 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2531 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2532 r->res_ls->ls_stub_ms.m_result = 0;
2533 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2534 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2535 }
2536
2537 return error;
2538}
2539
2540/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2541 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2542 that the master is still correct. */
2543
2544static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2545{
2546 return send_common(r, lkb, DLM_MSG_UNLOCK);
2547}
2548
2549static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2550{
2551 return send_common(r, lkb, DLM_MSG_CANCEL);
2552}
2553
2554static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2555{
2556 struct dlm_message *ms;
2557 struct dlm_mhandle *mh;
2558 int to_nodeid, error;
2559
2560 to_nodeid = lkb->lkb_nodeid;
2561
2562 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2563 if (error)
2564 goto out;
2565
2566 send_args(r, lkb, ms);
2567
2568 ms->m_result = 0;
2569
2570 error = send_message(mh, ms);
2571 out:
2572 return error;
2573}
2574
2575static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2576{
2577 struct dlm_message *ms;
2578 struct dlm_mhandle *mh;
2579 int to_nodeid, error;
2580
2581 to_nodeid = lkb->lkb_nodeid;
2582
2583 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2584 if (error)
2585 goto out;
2586
2587 send_args(r, lkb, ms);
2588
2589 ms->m_bastmode = mode;
2590
2591 error = send_message(mh, ms);
2592 out:
2593 return error;
2594}
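
/* Note, added for clarity: send_grant() and send_bast() are one-way
   messages; as the dispatch switch in dlm_receive_message() notes below,
   grant and bast are the only async master-to-client messages and have no
   reply path, so neither function touches the waiters list. */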
2595
2596static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2597{
2598 struct dlm_message *ms;
2599 struct dlm_mhandle *mh;
2600 int to_nodeid, error;
2601
2602 error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2603 if (error)
2604 return error;
2605
2606 to_nodeid = dlm_dir_nodeid(r);
2607
2608 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2609 if (error)
2610 goto fail;
2611
2612 send_args(r, lkb, ms);
2613
2614 error = send_message(mh, ms);
2615 if (error)
2616 goto fail;
2617 return 0;
2618
2619 fail:
2620 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2621 return error;
2622}
2623
2624static int send_remove(struct dlm_rsb *r)
2625{
2626 struct dlm_message *ms;
2627 struct dlm_mhandle *mh;
2628 int to_nodeid, error;
2629
2630 to_nodeid = dlm_dir_nodeid(r);
2631
2632 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2633 if (error)
2634 goto out;
2635
2636 memcpy(ms->m_extra, r->res_name, r->res_length);
2637 ms->m_hash = r->res_hash;
2638
2639 error = send_message(mh, ms);
2640 out:
2641 return error;
2642}
2643
2644static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2645 int mstype, int rv)
2646{
2647 struct dlm_message *ms;
2648 struct dlm_mhandle *mh;
2649 int to_nodeid, error;
2650
2651 to_nodeid = lkb->lkb_nodeid;
2652
2653 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2654 if (error)
2655 goto out;
2656
2657 send_args(r, lkb, ms);
2658
2659 ms->m_result = rv;
2660
2661 error = send_message(mh, ms);
2662 out:
2663 return error;
2664}
2665
2666static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2667{
2668 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2669}
2670
2671static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2672{
2673 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2674}
2675
2676static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2677{
2678 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2679}
2680
2681static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2682{
2683 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2684}
2685
2686static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2687 int ret_nodeid, int rv)
2688{
2689 struct dlm_rsb *r = &ls->ls_stub_rsb;
2690 struct dlm_message *ms;
2691 struct dlm_mhandle *mh;
2692 int error, nodeid = ms_in->m_header.h_nodeid;
2693
2694 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2695 if (error)
2696 goto out;
2697
2698 ms->m_lkid = ms_in->m_lkid;
2699 ms->m_result = rv;
2700 ms->m_nodeid = ret_nodeid;
2701
2702 error = send_message(mh, ms);
2703 out:
2704 return error;
2705}
2706
2707/* which args we save from a received message depends heavily on the type
2708 of message, unlike the send side where we can safely send everything about
2709 the lkb for any type of message */
2710
2711static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2712{
2713 lkb->lkb_exflags = ms->m_exflags;
2714 lkb->lkb_sbflags = ms->m_sbflags;
2715 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2716 (ms->m_flags & 0x0000FFFF);
2717}
2718
2719static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2720{
2721 lkb->lkb_sbflags = ms->m_sbflags;
2722 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2723 (ms->m_flags & 0x0000FFFF);
2724}
2725
2726static int receive_extralen(struct dlm_message *ms)
2727{
2728 return (ms->m_header.h_length - sizeof(struct dlm_message));
2729}
2730
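/* A standalone sketch (not in the original file) of the flag split used by
   the two receive_flags helpers above: only the lower 16 bits of lkb_flags
   travel in the message, while the upper 16 bits (e.g. DLM_IFL_MSTCPY) are
   node-local and must survive a receive. The helper name is invented. */

static inline uint32_t example_merge_flags(uint32_t local, uint32_t wire)
{
	return (local & 0xFFFF0000) | (wire & 0x0000FFFF);
}
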
2731static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2732 struct dlm_message *ms)
2733{
2734 int len;
2735
2736 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2737 if (!lkb->lkb_lvbptr)
2738 lkb->lkb_lvbptr = allocate_lvb(ls);
2739 if (!lkb->lkb_lvbptr)
2740 return -ENOMEM;
2741 len = receive_extralen(ms);
2742 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2743 }
2744 return 0;
2745}
2746
2747static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2748 struct dlm_message *ms)
2749{
2750 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2751 lkb->lkb_ownpid = ms->m_pid;
2752 lkb->lkb_remid = ms->m_lkid;
2753 lkb->lkb_grmode = DLM_LOCK_IV;
2754 lkb->lkb_rqmode = ms->m_rqmode;
2755 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2756 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2757
2758 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2759
2760 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2761 /* lkb was just created so there won't be an lvb yet */
2762 lkb->lkb_lvbptr = allocate_lvb(ls);
2763 if (!lkb->lkb_lvbptr)
2764 return -ENOMEM;
2765 }
2766
2767 return 0;
2768}
2769
2770static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2771 struct dlm_message *ms)
2772{
2773 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2774 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2775 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2776 lkb->lkb_id, lkb->lkb_remid);
2777 return -EINVAL;
2778 }
2779
2780 if (!is_master_copy(lkb))
2781 return -EINVAL;
2782
2783 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2784 return -EBUSY;
2785
2786 if (receive_lvb(ls, lkb, ms))
2787 return -ENOMEM;
2788
2789 lkb->lkb_rqmode = ms->m_rqmode;
2790 lkb->lkb_lvbseq = ms->m_lvbseq;
2791
2792 return 0;
2793}
2794
2795static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2796 struct dlm_message *ms)
2797{
2798 if (!is_master_copy(lkb))
2799 return -EINVAL;
2800 if (receive_lvb(ls, lkb, ms))
2801 return -ENOMEM;
2802 return 0;
2803}
2804
2805/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2806 uses to send a reply and that the remote end uses to process the reply. */
2807
2808static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2809{
2810 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2811 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2812 lkb->lkb_remid = ms->m_lkid;
2813}
2814
2815static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2816{
2817 struct dlm_lkb *lkb;
2818 struct dlm_rsb *r;
2819 int error, namelen;
2820
2821 error = create_lkb(ls, &lkb);
2822 if (error)
2823 goto fail;
2824
2825 receive_flags(lkb, ms);
2826 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2827 error = receive_request_args(ls, lkb, ms);
2828 if (error) {
2829 __put_lkb(ls, lkb);
2830 goto fail;
2831 }
2832
2833 namelen = receive_extralen(ms);
2834
2835 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2836 if (error) {
2837 __put_lkb(ls, lkb);
2838 goto fail;
2839 }
2840
2841 lock_rsb(r);
2842
2843 attach_lkb(r, lkb);
2844 error = do_request(r, lkb);
2845 send_request_reply(r, lkb, error);
2846
2847 unlock_rsb(r);
2848 put_rsb(r);
2849
2850 if (error == -EINPROGRESS)
2851 error = 0;
2852 if (error)
2853 dlm_put_lkb(lkb);
2854 return;
2855
2856 fail:
2857 setup_stub_lkb(ls, ms);
2858 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2859}
2860
2861static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2862{
2863 struct dlm_lkb *lkb;
2864 struct dlm_rsb *r;
2865 int error, reply = 1;
2866
2867 error = find_lkb(ls, ms->m_remid, &lkb);
2868 if (error)
2869 goto fail;
2870
2871 r = lkb->lkb_resource;
2872
2873 hold_rsb(r);
2874 lock_rsb(r);
2875
2876 receive_flags(lkb, ms);
2877 error = receive_convert_args(ls, lkb, ms);
2878 if (error)
2879 goto out;
2880 reply = !down_conversion(lkb);
2881
2882 error = do_convert(r, lkb);
2883 out:
2884 if (reply)
2885 send_convert_reply(r, lkb, error);
2886
2887 unlock_rsb(r);
2888 put_rsb(r);
2889 dlm_put_lkb(lkb);
2890 return;
2891
2892 fail:
2893 setup_stub_lkb(ls, ms);
2894 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2895}
2896
2897static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2898{
2899 struct dlm_lkb *lkb;
2900 struct dlm_rsb *r;
2901 int error;
2902
2903 error = find_lkb(ls, ms->m_remid, &lkb);
2904 if (error)
2905 goto fail;
2906
2907 r = lkb->lkb_resource;
2908
2909 hold_rsb(r);
2910 lock_rsb(r);
2911
2912 receive_flags(lkb, ms);
2913 error = receive_unlock_args(ls, lkb, ms);
2914 if (error)
2915 goto out;
2916
2917 error = do_unlock(r, lkb);
2918 out:
2919 send_unlock_reply(r, lkb, error);
2920
2921 unlock_rsb(r);
2922 put_rsb(r);
2923 dlm_put_lkb(lkb);
2924 return;
2925
2926 fail:
2927 setup_stub_lkb(ls, ms);
2928 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2929}
2930
2931static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2932{
2933 struct dlm_lkb *lkb;
2934 struct dlm_rsb *r;
2935 int error;
2936
2937 error = find_lkb(ls, ms->m_remid, &lkb);
2938 if (error)
2939 goto fail;
2940
2941 receive_flags(lkb, ms);
2942
2943 r = lkb->lkb_resource;
2944
2945 hold_rsb(r);
2946 lock_rsb(r);
2947
2948 error = do_cancel(r, lkb);
2949 send_cancel_reply(r, lkb, error);
2950
2951 unlock_rsb(r);
2952 put_rsb(r);
2953 dlm_put_lkb(lkb);
2954 return;
2955
2956 fail:
2957 setup_stub_lkb(ls, ms);
2958 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2959}
2960
2961static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2962{
2963 struct dlm_lkb *lkb;
2964 struct dlm_rsb *r;
2965 int error;
2966
2967 error = find_lkb(ls, ms->m_remid, &lkb);
2968 if (error) {
2969 log_error(ls, "receive_grant no lkb");
2970 return;
2971 }
2972 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2973
2974 r = lkb->lkb_resource;
2975
2976 hold_rsb(r);
2977 lock_rsb(r);
2978
2979 receive_flags_reply(lkb, ms);
2980 if (is_altmode(lkb))
2981 munge_altmode(lkb, ms);
2982 grant_lock_pc(r, lkb, ms);
2983 queue_cast(r, lkb, 0);
2984
2985 unlock_rsb(r);
2986 put_rsb(r);
2987 dlm_put_lkb(lkb);
2988}
2989
2990static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2991{
2992 struct dlm_lkb *lkb;
2993 struct dlm_rsb *r;
2994 int error;
2995
2996 error = find_lkb(ls, ms->m_remid, &lkb);
2997 if (error) {
2998 log_error(ls, "receive_bast no lkb");
2999 return;
3000 }
3001 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3002
3003 r = lkb->lkb_resource;
3004
3005 hold_rsb(r);
3006 lock_rsb(r);
3007
3008 queue_bast(r, lkb, ms->m_bastmode);
3009
3010 unlock_rsb(r);
3011 put_rsb(r);
3012 dlm_put_lkb(lkb);
3013}
3014
3015static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3016{
3017 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3018
3019 from_nodeid = ms->m_header.h_nodeid;
3020 our_nodeid = dlm_our_nodeid();
3021
3022 len = receive_extralen(ms);
3023
3024 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3025 if (dir_nodeid != our_nodeid) {
3026 log_error(ls, "lookup dir_nodeid %d from %d",
3027 dir_nodeid, from_nodeid);
3028 error = -EINVAL;
3029 ret_nodeid = -1;
3030 goto out;
3031 }
3032
3033 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3034
3035 /* Optimization: we're master so treat lookup as a request */
3036 if (!error && ret_nodeid == our_nodeid) {
3037 receive_request(ls, ms);
3038 return;
3039 }
3040 out:
3041 send_lookup_reply(ls, ms, ret_nodeid, error);
3042}
3043
3044static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3045{
3046 int len, dir_nodeid, from_nodeid;
3047
3048 from_nodeid = ms->m_header.h_nodeid;
3049
3050 len = receive_extralen(ms);
3051
3052 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3053 if (dir_nodeid != dlm_our_nodeid()) {
3054 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3055 dir_nodeid, from_nodeid);
3056 return;
3057 }
3058
3059 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3060}
3061
3062static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3063{
3064 do_purge(ls, ms->m_nodeid, ms->m_pid);
3065}
3066
3067static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3068{
3069 struct dlm_lkb *lkb;
3070 struct dlm_rsb *r;
3071 int error, mstype, result;
3072
3073 error = find_lkb(ls, ms->m_remid, &lkb);
3074 if (error) {
3075 log_error(ls, "receive_request_reply no lkb");
3076 return;
3077 }
3078 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3079
3080 r = lkb->lkb_resource;
3081 hold_rsb(r);
3082 lock_rsb(r);
3083
3084 mstype = lkb->lkb_wait_type;
3085 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3086 if (error)
3087 goto out;
3088
3089 /* Optimization: the dir node was also the master, so it took our
3090 lookup as a request and sent request reply instead of lookup reply */
3091 if (mstype == DLM_MSG_LOOKUP) {
3092 r->res_nodeid = ms->m_header.h_nodeid;
3093 lkb->lkb_nodeid = r->res_nodeid;
3094 }
3095
3096 /* this is the value returned from do_request() on the master */
3097 result = ms->m_result;
3098
3099 switch (result) {
3100 case -EAGAIN:
3101 /* request would block (be queued) on remote master */
3102 queue_cast(r, lkb, -EAGAIN);
3103 confirm_master(r, -EAGAIN);
3104 unhold_lkb(lkb); /* undoes create_lkb() */
3105 break;
3106
3107 case -EINPROGRESS:
3108 case 0:
3109 /* request was queued or granted on remote master */
3110 receive_flags_reply(lkb, ms);
3111 lkb->lkb_remid = ms->m_lkid;
3112 if (is_altmode(lkb))
3113 munge_altmode(lkb, ms);
3114 if (result)
3115 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3116 else {
3117 grant_lock_pc(r, lkb, ms);
3118 queue_cast(r, lkb, 0);
3119 }
3120 confirm_master(r, result);
3121 break;
3122
3123 case -EBADR:
3124 case -ENOTBLK:
3125 /* find_rsb failed to find rsb or rsb wasn't master */
3126 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3127 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3128 r->res_nodeid = -1;
3129 lkb->lkb_nodeid = -1;
3130
3131 if (is_overlap(lkb)) {
3132 /* we'll ignore error in cancel/unlock reply */
3133 queue_cast_overlap(r, lkb);
3134 unhold_lkb(lkb); /* undoes create_lkb() */
3135 } else
3136 _request_lock(r, lkb);
3137 break;
3138
3139 default:
3140 log_error(ls, "receive_request_reply %x error %d",
3141 lkb->lkb_id, result);
3142 }
3143
3144 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3145 log_debug(ls, "receive_request_reply %x result %d unlock",
3146 lkb->lkb_id, result);
3147 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3148 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3149 send_unlock(r, lkb);
3150 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3151 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3152 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3153 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3154 send_cancel(r, lkb);
3155 } else {
3156 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3157 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3158 }
3159 out:
3160 unlock_rsb(r);
3161 put_rsb(r);
3162 dlm_put_lkb(lkb);
3163}
3164
3165static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3166 struct dlm_message *ms)
3167{
3168 /* this is the value returned from do_convert() on the master */
3169 switch (ms->m_result) {
3170 case -EAGAIN:
3171 /* convert would block (be queued) on remote master */
3172 queue_cast(r, lkb, -EAGAIN);
3173 break;
3174
3175 case -EINPROGRESS:
3176 /* convert was queued on remote master */
3177 receive_flags_reply(lkb, ms);
3178 if (is_demoted(lkb))
3179 munge_demoted(lkb, ms);
3180 del_lkb(r, lkb);
3181 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3182 break;
3183
3184 case 0:
3185 /* convert was granted on remote master */
3186 receive_flags_reply(lkb, ms);
3187 if (is_demoted(lkb))
3188 munge_demoted(lkb, ms);
3189 grant_lock_pc(r, lkb, ms);
3190 queue_cast(r, lkb, 0);
3191 break;
3192
3193 default:
3194 log_error(r->res_ls, "receive_convert_reply %x error %d",
3195 lkb->lkb_id, ms->m_result);
3196 }
3197}
3198
3199static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3200{
3201 struct dlm_rsb *r = lkb->lkb_resource;
3202 int error;
3203
3204 hold_rsb(r);
3205 lock_rsb(r);
3206
3207 /* stub reply can happen with waiters_mutex held */
3208 error = remove_from_waiters_ms(lkb, ms);
3209 if (error)
3210 goto out;
3211
3212 __receive_convert_reply(r, lkb, ms);
3213 out:
3214 unlock_rsb(r);
3215 put_rsb(r);
3216}
3217
3218static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3219{
3220 struct dlm_lkb *lkb;
3221 int error;
3222
3223 error = find_lkb(ls, ms->m_remid, &lkb);
3224 if (error) {
3225 log_error(ls, "receive_convert_reply no lkb");
3226 return;
3227 }
3228 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3229
3230 _receive_convert_reply(lkb, ms);
3231 dlm_put_lkb(lkb);
3232}
3233
3234static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3235{
3236 struct dlm_rsb *r = lkb->lkb_resource;
3237 int error;
3238
3239 hold_rsb(r);
3240 lock_rsb(r);
3241
3242 /* stub reply can happen with waiters_mutex held */
3243 error = remove_from_waiters_ms(lkb, ms);
3244 if (error)
3245 goto out;
3246
3247 /* this is the value returned from do_unlock() on the master */
3248
3249 switch (ms->m_result) {
3250 case -DLM_EUNLOCK:
3251 receive_flags_reply(lkb, ms);
3252 remove_lock_pc(r, lkb);
3253 queue_cast(r, lkb, -DLM_EUNLOCK);
3254 break;
3255 case -ENOENT:
3256 break;
3257 default:
3258 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3259 lkb->lkb_id, ms->m_result);
3260 }
3261 out:
3262 unlock_rsb(r);
3263 put_rsb(r);
3264}
3265
3266static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3267{
3268 struct dlm_lkb *lkb;
3269 int error;
3270
3271 error = find_lkb(ls, ms->m_remid, &lkb);
3272 if (error) {
3273 log_error(ls, "receive_unlock_reply no lkb");
3274 return;
3275 }
3276 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3277
3278 _receive_unlock_reply(lkb, ms);
3279 dlm_put_lkb(lkb);
3280}
3281
3282static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3283{
3284 struct dlm_rsb *r = lkb->lkb_resource;
3285 int error;
3286
3287 hold_rsb(r);
3288 lock_rsb(r);
3289
3290 /* stub reply can happen with waiters_mutex held */
3291 error = remove_from_waiters_ms(lkb, ms);
3292 if (error)
3293 goto out;
3294
3295 /* this is the value returned from do_cancel() on the master */
3296
3297 switch (ms->m_result) {
3298 case -DLM_ECANCEL:
3299 receive_flags_reply(lkb, ms);
3300 revert_lock_pc(r, lkb);
3301 if (ms->m_result)
3302 queue_cast(r, lkb, -DLM_ECANCEL);
3303 break;
3304 case 0:
3305 break;
3306 default:
3307 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3308 lkb->lkb_id, ms->m_result);
3309 }
3310 out:
3311 unlock_rsb(r);
3312 put_rsb(r);
3313}
3314
3315static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3316{
3317 struct dlm_lkb *lkb;
3318 int error;
3319
3320 error = find_lkb(ls, ms->m_remid, &lkb);
3321 if (error) {
3322 log_error(ls, "receive_cancel_reply no lkb");
3323 return;
3324 }
3325 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3326
3327 _receive_cancel_reply(lkb, ms);
3328 dlm_put_lkb(lkb);
3329}
3330
3331static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3332{
3333 struct dlm_lkb *lkb;
3334 struct dlm_rsb *r;
3335 int error, ret_nodeid;
3336
3337 error = find_lkb(ls, ms->m_lkid, &lkb);
3338 if (error) {
3339 log_error(ls, "receive_lookup_reply no lkb");
3340 return;
3341 }
3342
3343 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3344 FIXME: will a non-zero error ever be returned? */
3345
3346 r = lkb->lkb_resource;
3347 hold_rsb(r);
3348 lock_rsb(r);
3349
3350 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3351 if (error)
3352 goto out;
3353
3354 ret_nodeid = ms->m_nodeid;
3355 if (ret_nodeid == dlm_our_nodeid()) {
3356 r->res_nodeid = 0;
3357 ret_nodeid = 0;
3358 r->res_first_lkid = 0;
3359 } else {
3360 /* set_master() will copy res_nodeid to lkb_nodeid */
3361 r->res_nodeid = ret_nodeid;
3362 }
3363
3364 if (is_overlap(lkb)) {
3365 log_debug(ls, "receive_lookup_reply %x unlock %x",
3366 lkb->lkb_id, lkb->lkb_flags);
3367 queue_cast_overlap(r, lkb);
3368 unhold_lkb(lkb); /* undoes create_lkb() */
3369 goto out_list;
3370 }
3371
3372 _request_lock(r, lkb);
3373
3374 out_list:
3375 if (!ret_nodeid)
3376 process_lookup_list(r);
3377 out:
3378 unlock_rsb(r);
3379 put_rsb(r);
3380 dlm_put_lkb(lkb);
3381}
3382
3383int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3384{
3385 struct dlm_message *ms = (struct dlm_message *) hd;
3386 struct dlm_ls *ls;
3387 int error = 0;
3388
3389 if (!recovery)
3390 dlm_message_in(ms);
3391
3392 ls = dlm_find_lockspace_global(hd->h_lockspace);
3393 if (!ls) {
3394 log_print("drop message %d from %d for unknown lockspace %d",
3395 ms->m_type, nodeid, hd->h_lockspace);
3396 return -EINVAL;
3397 }
3398
3399 /* recovery may have just ended leaving a bunch of backed-up requests
3400 in the requestqueue; wait while dlm_recoverd clears them */
3401
3402 if (!recovery)
3403 dlm_wait_requestqueue(ls);
3404
3405 /* recovery may have just started while there were a bunch of
3406 in-flight requests -- save them in requestqueue to be processed
3407 after recovery. we can't let dlm_recvd block on the recovery
3408 lock. if dlm_recoverd is calling this function to clear the
3409 requestqueue, it needs to be interrupted (-EINTR) if another
3410 recovery operation is starting. */
3411
3412 while (1) {
3413 if (dlm_locking_stopped(ls)) {
3414 if (recovery) {
3415 error = -EINTR;
3416 goto out;
3417 }
3418 error = dlm_add_requestqueue(ls, nodeid, hd);
3419 if (error == -EAGAIN)
3420 continue;
3421 else {
3422 error = -EINTR;
3423 goto out;
3424 }
3425 }
3426
3427 if (lock_recovery_try(ls))
3428 break;
3429 schedule();
3430 }
3431
3432 switch (ms->m_type) {
3433
3434 /* messages sent to a master node */
3435
3436 case DLM_MSG_REQUEST:
3437 receive_request(ls, ms);
3438 break;
3439
3440 case DLM_MSG_CONVERT:
3441 receive_convert(ls, ms);
3442 break;
3443
3444 case DLM_MSG_UNLOCK:
3445 receive_unlock(ls, ms);
3446 break;
3447
3448 case DLM_MSG_CANCEL:
3449 receive_cancel(ls, ms);
3450 break;
3451
3452 /* messages sent from a master node (replies to above) */
3453
3454 case DLM_MSG_REQUEST_REPLY:
3455 receive_request_reply(ls, ms);
3456 break;
3457
3458 case DLM_MSG_CONVERT_REPLY:
3459 receive_convert_reply(ls, ms);
3460 break;
3461
3462 case DLM_MSG_UNLOCK_REPLY:
3463 receive_unlock_reply(ls, ms);
3464 break;
3465
3466 case DLM_MSG_CANCEL_REPLY:
3467 receive_cancel_reply(ls, ms);
3468 break;
3469
3470 /* messages sent from a master node (only two types of async msg) */
3471
3472 case DLM_MSG_GRANT:
3473 receive_grant(ls, ms);
3474 break;
3475
3476 case DLM_MSG_BAST:
3477 receive_bast(ls, ms);
3478 break;
3479
3480 /* messages sent to a dir node */
3481
3482 case DLM_MSG_LOOKUP:
3483 receive_lookup(ls, ms);
3484 break;
3485
3486 case DLM_MSG_REMOVE:
3487 receive_remove(ls, ms);
3488 break;
3489
3490 /* messages sent from a dir node (remove has no reply) */
3491
3492 case DLM_MSG_LOOKUP_REPLY:
3493 receive_lookup_reply(ls, ms);
3494 break;
3495
3496 /* other messages */
3497
3498 case DLM_MSG_PURGE:
3499 receive_purge(ls, ms);
3500 break;
3501
3502 default:
3503 log_error(ls, "unknown message type %d", ms->m_type);
3504 }
3505
3506 unlock_recovery(ls);
3507 out:
3508 dlm_put_lockspace(ls);
3509 dlm_astd_wake();
3510 return error;
3511}
3512
3513
3514/*
3515 * Recovery related
3516 */
3517
3518static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3519{
3520 if (middle_conversion(lkb)) {
3521 hold_lkb(lkb);
3522 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3523 ls->ls_stub_ms.m_result = -EINPROGRESS;
3524 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3525 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3526
3527 /* Same special case as in receive_rcom_lock_args() */
3528 lkb->lkb_grmode = DLM_LOCK_IV;
3529 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3530 unhold_lkb(lkb);
3531
3532 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3533 lkb->lkb_flags |= DLM_IFL_RESEND;
3534 }
3535
3536 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3537 conversions are async; there's no reply from the remote master */
3538}
3539
3540/* A waiting lkb needs recovery if the master node has failed, or
3541 the master node is changing (only when no directory is used) */
3542
3543static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3544{
3545 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3546 return 1;
3547
3548 if (!dlm_no_directory(ls))
3549 return 0;
3550
3551 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3552 return 1;
3553
3554 return 0;
3555}
3556
3557/* Recovery for locks that are waiting for replies from nodes that are now
3558 gone. We can just complete unlocks and cancels by faking a reply from the
3559 dead node. Requests and up-conversions we flag to be resent after
3560 recovery. Down-conversions can just be completed with a fake reply like
3561 unlocks. Conversions between PR and CW need special attention. */
3562
3563void dlm_recover_waiters_pre(struct dlm_ls *ls)
3564{
3565 struct dlm_lkb *lkb, *safe;
3566
3567 mutex_lock(&ls->ls_waiters_mutex);
3568
3569 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3570 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3571 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3572
3573 /* all outstanding lookups, regardless of destination will be
3574 resent after recovery is done */
3575
3576 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3577 lkb->lkb_flags |= DLM_IFL_RESEND;
3578 continue;
3579 }
3580
3581 if (!waiter_needs_recovery(ls, lkb))
3582 continue;
3583
3584 switch (lkb->lkb_wait_type) {
3585
3586 case DLM_MSG_REQUEST:
3587 lkb->lkb_flags |= DLM_IFL_RESEND;
3588 break;
3589
3590 case DLM_MSG_CONVERT:
3591 recover_convert_waiter(ls, lkb);
3592 break;
3593
3594 case DLM_MSG_UNLOCK:
3595 hold_lkb(lkb);
3596 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3597 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3598 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3599 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3600 dlm_put_lkb(lkb);
3601 break;
3602
3603 case DLM_MSG_CANCEL:
3604 hold_lkb(lkb);
3605 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3606 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3607 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3608 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3609 dlm_put_lkb(lkb);
3610 break;
3611
3612 default:
3613 log_error(ls, "invalid lkb wait_type %d",
3614 lkb->lkb_wait_type);
3615 }
3616 schedule();
3617 }
3618 mutex_unlock(&ls->ls_waiters_mutex);
3619}
3620
3621 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3622{
3623 struct dlm_lkb *lkb;
3624 int found = 0;
3625
3626 mutex_lock(&ls->ls_waiters_mutex);
3627 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3628 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3629 hold_lkb(lkb);
3630 found = 1;
3631 break;
3632 }
3633 }
3634 mutex_unlock(&ls->ls_waiters_mutex);
3635
3636 if (!found)
3637 lkb = NULL;
3638 return lkb;
3639}
3640
3641/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3642 master or dir-node for r. Processing the lkb may result in it being placed
3643 back on waiters. */
3644
3645/* We do this after normal locking has been enabled and any saved messages
3646 (in requestqueue) have been processed. We should be confident that at
3647 this point we won't get or process a reply to any of these waiting
3648 operations. But, new ops may be coming in on the rsbs/locks here from
3649 userspace or remotely. */
3650
3651/* there may have been an overlap unlock/cancel prior to recovery or after
3652 recovery. if before, the lkb may still have a positive wait_count; if after, the
3653 overlap flag would just have been set and nothing new sent. we can be
3654 confident here that any replies to either the initial op or overlap ops
3655 prior to recovery have been received. */
3656
3657int dlm_recover_waiters_post(struct dlm_ls *ls)
3658{
3659 struct dlm_lkb *lkb;
3660 struct dlm_rsb *r;
3661 int error = 0, mstype, err, oc, ou;
3662
3663 while (1) {
3664 if (dlm_locking_stopped(ls)) {
3665 log_debug(ls, "recover_waiters_post aborted");
3666 error = -EINTR;
3667 break;
3668 }
3669
3670 lkb = find_resend_waiter(ls);
3671 if (!lkb)
3672 break;
3673
3674 r = lkb->lkb_resource;
3675 hold_rsb(r);
3676 lock_rsb(r);
3677
3678 mstype = lkb->lkb_wait_type;
3679 oc = is_overlap_cancel(lkb);
3680 ou = is_overlap_unlock(lkb);
3681 err = 0;
3682
3683 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3684 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3685
3686 /* At this point we assume that we won't get a reply to any
3687 previous op or overlap op on this lock. First, do a big
3688 remove_from_waiters() for all previous ops. */
3689
3690 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3691 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3692 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3693 lkb->lkb_wait_type = 0;
3694 lkb->lkb_wait_count = 0;
3695 mutex_lock(&ls->ls_waiters_mutex);
3696 list_del_init(&lkb->lkb_wait_reply);
3697 mutex_unlock(&ls->ls_waiters_mutex);
3698 unhold_lkb(lkb); /* for waiters list */
3699
3700 if (oc || ou) {
3701 /* do an unlock or cancel instead of resending */
3702 switch (mstype) {
3703 case DLM_MSG_LOOKUP:
3704 case DLM_MSG_REQUEST:
3705 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3706 -DLM_ECANCEL);
3707 unhold_lkb(lkb); /* undoes create_lkb() */
3708 break;
3709 case DLM_MSG_CONVERT:
3710 if (oc) {
3711 queue_cast(r, lkb, -DLM_ECANCEL);
3712 } else {
3713 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3714 _unlock_lock(r, lkb);
3715 }
3716 break;
3717 default:
3718 err = 1;
3719 }
3720 } else {
3721 switch (mstype) {
3722 case DLM_MSG_LOOKUP:
3723 case DLM_MSG_REQUEST:
3724 _request_lock(r, lkb);
3725 if (is_master(r))
3726 confirm_master(r, 0);
3727 break;
3728 case DLM_MSG_CONVERT:
3729 _convert_lock(r, lkb);
3730 break;
3731 default:
3732 err = 1;
3733 }
3734 }
3735
3736 if (err)
3737 log_error(ls, "recover_waiters_post %x %d %x %d %d",
3738 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3739 unlock_rsb(r);
3740 put_rsb(r);
3741 dlm_put_lkb(lkb);
3742 }
3743
3744 return error;
3745}
3746
3747static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3748 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3749{
3750 struct dlm_ls *ls = r->res_ls;
3751 struct dlm_lkb *lkb, *safe;
3752
3753 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3754 if (test(ls, lkb)) {
3755 rsb_set_flag(r, RSB_LOCKS_PURGED);
3756 del_lkb(r, lkb);
3757 /* this put should free the lkb */
3758 if (!dlm_put_lkb(lkb))
3759 log_error(ls, "purged lkb not released");
3760 }
3761 }
3762}
3763
3764static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3765{
3766 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3767}
3768
3769static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3770{
3771 return is_master_copy(lkb);
3772}
3773
3774static void purge_dead_locks(struct dlm_rsb *r)
3775{
3776 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3777 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3778 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3779}
3780
3781void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3782{
3783 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3784 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3785 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3786}
3787
3788/* Get rid of locks held by nodes that are gone. */
3789
3790int dlm_purge_locks(struct dlm_ls *ls)
3791{
3792 struct dlm_rsb *r;
3793
3794 log_debug(ls, "dlm_purge_locks");
3795
3796 down_write(&ls->ls_root_sem);
3797 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3798 hold_rsb(r);
3799 lock_rsb(r);
3800 if (is_master(r))
3801 purge_dead_locks(r);
3802 unlock_rsb(r);
3803 unhold_rsb(r);
3804
3805 schedule();
3806 }
3807 up_write(&ls->ls_root_sem);
3808
3809 return 0;
3810}
3811
3812static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3813{
3814 struct dlm_rsb *r, *r_ret = NULL;
3815
3816 read_lock(&ls->ls_rsbtbl[bucket].lock);
3817 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3818 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3819 continue;
3820 hold_rsb(r);
3821 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3822 r_ret = r;
3823 break;
3824 }
3825 read_unlock(&ls->ls_rsbtbl[bucket].lock);
3826 return r_ret;
3827}
3828
3829void dlm_grant_after_purge(struct dlm_ls *ls)
3830{
3831 struct dlm_rsb *r;
3832 int bucket = 0;
3833
3834 while (1) {
3835 r = find_purged_rsb(ls, bucket);
3836 if (!r) {
3837 if (bucket == ls->ls_rsbtbl_size - 1)
3838 break;
3839 bucket++;
3840 continue;
3841 }
3842 lock_rsb(r);
3843 if (is_master(r)) {
3844 grant_pending_locks(r);
3845 confirm_master(r, 0);
e7fd4179 3846 }
97a35d1e
DT
3847 unlock_rsb(r);
3848 put_rsb(r);
3849 schedule();
3850 }
3851}
3852
3853static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3854 uint32_t remid)
3855{
3856 struct dlm_lkb *lkb;
3857
3858 list_for_each_entry(lkb, head, lkb_statequeue) {
3859 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3860 return lkb;
3861 }
3862 return NULL;
3863}
3864
3865static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3866 uint32_t remid)
3867{
3868 struct dlm_lkb *lkb;
3869
3870 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3871 if (lkb)
3872 return lkb;
3873 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3874 if (lkb)
3875 return lkb;
3876 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3877 if (lkb)
3878 return lkb;
3879 return NULL;
3880}
3881
3882static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3883 struct dlm_rsb *r, struct dlm_rcom *rc)
3884{
3885 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3886 int lvblen;
3887
3888 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3889 lkb->lkb_ownpid = rl->rl_ownpid;
3890 lkb->lkb_remid = rl->rl_lkid;
3891 lkb->lkb_exflags = rl->rl_exflags;
3892 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3893 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3894 lkb->lkb_lvbseq = rl->rl_lvbseq;
3895 lkb->lkb_rqmode = rl->rl_rqmode;
3896 lkb->lkb_grmode = rl->rl_grmode;
3897 /* don't set lkb_status because add_lkb wants to itself */
3898
3899 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3900 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3901
3902 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3903 lkb->lkb_lvbptr = allocate_lvb(ls);
3904 if (!lkb->lkb_lvbptr)
3905 return -ENOMEM;
3906 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3907 sizeof(struct rcom_lock);
3908 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3909 }
3910
3911 /* Conversions between PR and CW (middle modes) need special handling.
3912 The real granted mode of these converting locks cannot be determined
3913 until all locks have been rebuilt on the rsb (recover_conversion) */
3914
3915 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3916 rl->rl_status = DLM_LKSTS_CONVERT;
3917 lkb->lkb_grmode = DLM_LOCK_IV;
3918 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3919 }
3920
3921 return 0;
3922}
3923
3924/* This lkb may have been recovered in a previous aborted recovery so we need
3925 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3926 If so we just send back a standard reply. If not, we create a new lkb with
3927 the given values and send back our lkid. We send back our lkid by sending
3928 back the rcom_lock struct we got but with the remid field filled in. */
3929
3930int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3931{
3932 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3933 struct dlm_rsb *r;
3934 struct dlm_lkb *lkb;
3935 int error;
3936
3937 if (rl->rl_parent_lkid) {
3938 error = -EOPNOTSUPP;
3939 goto out;
3940 }
3941
3942 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3943 if (error)
3944 goto out;
3945
3946 lock_rsb(r);
3947
3948 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3949 if (lkb) {
3950 error = -EEXIST;
3951 goto out_remid;
3952 }
3953
3954 error = create_lkb(ls, &lkb);
3955 if (error)
3956 goto out_unlock;
3957
3958 error = receive_rcom_lock_args(ls, lkb, r, rc);
3959 if (error) {
3960 __put_lkb(ls, lkb);
3961 goto out_unlock;
3962 }
3963
3964 attach_lkb(r, lkb);
3965 add_lkb(r, lkb, rl->rl_status);
3966 error = 0;
3967
3968 out_remid:
3969 /* this is the new value returned to the lock holder for
3970 saving in its process-copy lkb */
3971 rl->rl_remid = lkb->lkb_id;
3972
3973 out_unlock:
3974 unlock_rsb(r);
3975 put_rsb(r);
3976 out:
3977 if (error)
3978 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3979 rl->rl_result = error;
3980 return error;
3981}
3982
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, rl->rl_lkid, &lkb);
	if (error) {
		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
		return error;
	}

	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = rl->rl_result;

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */
		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
			  (unsigned long)r, r->res_name);
		dlm_send_rcom_lock(r, lkb);
		goto out;
	case -EEXIST:
		log_debug(ls, "master copy exists %x", lkb->lkb_id);
		/* fall through */
	case 0:
		lkb->lkb_remid = rl->rl_remid;
		break;
	default:
		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
			  error, lkb->lkb_id);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}

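/* For reference: dlm_recovered_lock() lives in recover.c and is not shown
   here. A sketch of the ack bookkeeping it implies -- per-rsb reply counting
   plus a wakeup for the waiter in dlm_recover_locks(); the recover_list_*
   helper names are assumptions: */

static void dlm_recovered_lock_sketch(struct dlm_rsb *r)
{
	r->res_recover_locks_count--;
	if (!r->res_recover_locks_count) {
		rsb_clear_flag(r, RSB_NEW_MASTER);
		recover_list_del(r);		/* assumed recover.c helper */
	}

	if (recover_list_empty(r->res_ls))	/* assumed recover.c helper */
		wake_up(&r->res_ls->ls_wait_general);
}
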
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen,
		     uint32_t parent_lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			__put_lkb(ls, lkb);
			error = -ENOMEM;
			goto out;
		}
	}

	/* After ua is attached to lkb it will be freed by free_lkb().
	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */

	error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
	lkb->lkb_flags |= DLM_IFL_USER;
	ua->old_mode = DLM_LOCK_IV;

	if (error) {
		__put_lkb(ls, lkb);
		goto out;
	}

	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		error = 0;
		break;
	case -EAGAIN:
		error = 0;
		/* fall through */
	default:
		__put_lkb(ls, lkb);
		goto out;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock(&ua->proc->locks_spin);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	unlock_recovery(ls);
	return error;
}

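/* Illustrative only: roughly how the character-device layer (user.c) would
   drive dlm_user_request() from a write to the dlm device. The
   dlm_lock_params field names used here are assumptions for the sketch, not
   the authoritative user.c code: */

static int example_device_request(struct dlm_ls *ls, struct dlm_user_proc *proc,
				  struct dlm_lock_params *params)
{
	struct dlm_user_args *ua;

	ua = kzalloc(sizeof(struct dlm_user_args), GFP_KERNEL);
	if (!ua)
		return -ENOMEM;

	ua->proc = proc;
	ua->user_lksb = params->lksb;	/* userspace lksb to copy results to */
	ua->castparam = params->castparam;
	ua->castaddr = params->castaddr;
	ua->bastparam = params->bastparam;
	ua->bastaddr = params->bastaddr;

	/* dlm_user_request() owns ua from here on: it is attached to the
	   lkb on success and freed on every error path */
	return dlm_user_request(ls, ua, params->mode, params->flags,
				params->name, params->namelen, 0);
}
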
int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;
	ua->old_mode = lkb->lkb_grmode;

	error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
			      ua, DLM_FAKE_USER_AST, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	if (error == -EINPROGRESS || error == -EAGAIN)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_ast() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

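/* Note on the -EBUSY handling above: validate_unlock_args() (defined earlier
   in this file, not shown in this excerpt) can return -EBUSY when the lkb
   still has an operation in flight. With DLM_LKF_FORCEUNLOCK that is treated
   as success here, because the lkb is parked on ua->proc->unlocking and is
   cleaned up later (see dlm_clear_proc_locks() below). */
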
int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = (struct dlm_user_args *)lkb->lkb_astparam;
	ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = cancel_lock(ls, lkb, &args);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

/* lkb's that are removed from the waiters list by revert are just left on the
   orphans list with the granted orphan locks, to be freed by purge */

static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
	struct dlm_args args;
	int error;

	hold_lkb(lkb);
	mutex_lock(&ls->ls_orphans_mutex);
	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
	mutex_unlock(&ls->ls_orphans_mutex);

	set_unlock_args(0, ua, &args);

	error = cancel_lock(ls, lkb, &args);
	if (error == -DLM_ECANCEL)
		error = 0;
	return error;
}

/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
   Regardless of what rsb queue the lock is on, it's removed and freed. */

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
	struct dlm_args args;
	int error;

	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}

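/* For reference: set_unlock_args(), used by both helpers above, is defined
   earlier in this file (not shown in this excerpt). Roughly this shape --
   flag validation plus packing into struct dlm_args; a sketch, not the
   authoritative definition: */

static int set_unlock_args_sketch(uint32_t flags, void *astarg,
				  struct dlm_args *args)
{
	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
		      DLM_LKF_FORCEUNLOCK))
		return -EINVAL;

	args->flags = flags;
	args->astparam = (long) astarg;
	return 0;
}
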
/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
   (which does lock_rsb) due to deadlock with receiving a message that does
   lock_rsb followed by dlm_user_add_ast() */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	mutex_lock(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		lkb->lkb_flags |= DLM_IFL_ORPHAN;
	else
		lkb->lkb_flags |= DLM_IFL_DEAD;
 out:
	mutex_unlock(&ls->ls_clear_proc_locks);
	return lkb;
}

/* The ls_clear_proc_locks mutex protects against dlm_user_add_ast() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here. this assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to
   serialize them ourselves. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	unlock_recovery(ls);
}

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}

/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb);
	}
	mutex_unlock(&ls->ls_orphans_mutex);
}

static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh);
	if (error)
		return error;
	ms->m_nodeid = nodeid;
	ms->m_pid = pid;

	return send_message(mh, ms);
}

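/* For reference: the matching receive side of DLM_MSG_PURGE simply hands the
   nodeid/pid back to do_purge(). Roughly this shape -- a sketch based on the
   message fields set in send_purge() above: */

static void receive_purge_sketch(struct dlm_ls *ls, struct dlm_message *ms)
{
	do_purge(ls, ms->m_nodeid, ms->m_pid);
}
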
int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
		   int nodeid, int pid)
{
	int error = 0;

	if (nodeid != dlm_our_nodeid()) {
		error = send_purge(ls, nodeid, pid);
	} else {
		lock_recovery(ls);
		if (pid == current->pid)
			purge_proc_locks(ls, proc);
		else
			do_purge(ls, nodeid, pid);
		unlock_recovery(ls);
	}
	return error;
}