/* fs/dlm/lock.c */

/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
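
/* Illustrative mapping (a sketch; dlm_lock() and dlm_unlock() take more
   arguments than shown here):

   dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, name, namelen, ...)
        -> request_lock()   (new request)
   dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_CONVERT, name, namelen, ...)
        -> convert_lock()   (convert the existing lock to PR)
   dlm_unlock(ls, lkid, 0, &lksb, ...)
        -> unlock_lock()
   dlm_unlock(ls, lkid, DLM_LKF_CANCEL, &lksb, ...)
        -> cancel_lock()    (cancel the in-progress operation)
*/
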
#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
                                    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
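
/* A worked example of the table above: converting a lock up from PR to EX,
   dlm_lvb_operations[DLM_LOCK_PR + 1][DLM_LOCK_EX + 1] is 1, so the
   resource's LVB is copied back to the caller.  Unlocking from PW down to
   NL gives 0, so the caller's LVB is written to the resource. */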

#define modes_compat(gr, rq) \
        __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
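
/* Illustrative checks against the compatibility matrix (hypothetical
   callers): two protected-read locks can coexist, while an exclusive
   lock tolerates only NL alongside it:

        dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR);  -- returns 1
        dlm_modes_compat(DLM_LOCK_EX, DLM_LOCK_PR);  -- returns 0
        dlm_modes_compat(DLM_LOCK_EX, DLM_LOCK_NL);  -- returns 1
*/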

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
               "     status %d rqmode %d grmode %d wait_type %d\n",
               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
               lkb->lkb_grmode, lkb->lkb_wait_type);
}

static void dlm_print_rsb(struct dlm_rsb *r)
{
        printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
               r->res_nodeid, r->res_flags, r->res_first_lkid,
               r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
        struct dlm_lkb *lkb;

        dlm_print_rsb(r);

        printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
               list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
        printk(KERN_ERR "rsb lookup list\n");
        list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb grant queue:\n");
        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb convert queue:\n");
        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb wait queue:\n");
        list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
        down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
        up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
        return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
        return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
        return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
        return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
        return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
        return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
        DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
        return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
        return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
        if (lkb->lkb_flags & DLM_IFL_MSTCPY)
                DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
        if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
            (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
                return 1;
        return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
        return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}
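
/* For example, PR and CW are mutually incompatible (see the matrix above),
   so a PR->CW or CW->PR conversion is a "middle" conversion: neither mode
   includes the other, and it can't be treated as a simple down-conversion.
   EX->PR, by contrast, lowers the mode and can be granted in place. */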

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
        return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
        return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
        return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
                                  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
        if (is_master_copy(lkb))
                return;

        del_timeout(lkb);

        DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

        /* if the operation was a cancel, then return -DLM_ECANCEL; if a
           timeout caused the cancel then return -ETIMEDOUT */
        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
                lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
                rv = -ETIMEDOUT;
        }

        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
                lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
                rv = -EDEADLK;
        }

        dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        queue_cast(r, lkb,
                   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
        if (is_master_copy(lkb)) {
                send_bast(r, lkb, rqmode);
        } else {
                dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
        }
}

/*
 * Basic operations on rsb's and lkb's
 */

static int pre_rsb_struct(struct dlm_ls *ls)
{
        struct dlm_rsb *r1, *r2;
        int count = 0;

        spin_lock(&ls->ls_new_rsb_spin);
        if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
                spin_unlock(&ls->ls_new_rsb_spin);
                return 0;
        }
        spin_unlock(&ls->ls_new_rsb_spin);

        r1 = dlm_allocate_rsb(ls);
        r2 = dlm_allocate_rsb(ls);

        spin_lock(&ls->ls_new_rsb_spin);
        if (r1) {
                list_add(&r1->res_hashchain, &ls->ls_new_rsb);
                ls->ls_new_rsb_count++;
        }
        if (r2) {
                list_add(&r2->res_hashchain, &ls->ls_new_rsb);
                ls->ls_new_rsb_count++;
        }
        count = ls->ls_new_rsb_count;
        spin_unlock(&ls->ls_new_rsb_spin);

        if (!count)
                return -ENOMEM;
        return 0;
}

/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
   unlock any spinlocks, go back and call pre_rsb_struct again.
   Otherwise, take an rsb off the list and return it. */

static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
                          struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r;
        int count;

        spin_lock(&ls->ls_new_rsb_spin);
        if (list_empty(&ls->ls_new_rsb)) {
                count = ls->ls_new_rsb_count;
                spin_unlock(&ls->ls_new_rsb_spin);
                log_debug(ls, "find_rsb retry %d %d %s",
                          count, dlm_config.ci_new_rsb_count, name);
                return -EAGAIN;
        }

        r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
        list_del(&r->res_hashchain);
        /* Convert the empty list_head to a NULL rb_node for tree usage: */
        memset(&r->res_hashnode, 0, sizeof(struct rb_node));
        ls->ls_new_rsb_count--;
        spin_unlock(&ls->ls_new_rsb_spin);

        r->res_ls = ls;
        r->res_length = len;
        memcpy(r->res_name, name, len);
        mutex_init(&r->res_mutex);

        INIT_LIST_HEAD(&r->res_lookup);
        INIT_LIST_HEAD(&r->res_grantqueue);
        INIT_LIST_HEAD(&r->res_convertqueue);
        INIT_LIST_HEAD(&r->res_waitqueue);
        INIT_LIST_HEAD(&r->res_root_list);
        INIT_LIST_HEAD(&r->res_recover_list);

        *r_ret = r;
        return 0;
}

static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
{
        char maxname[DLM_RESNAME_MAXLEN];

        memset(maxname, 0, DLM_RESNAME_MAXLEN);
        memcpy(maxname, name, nlen);
        return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
}
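
/* Note the comparison is over the full fixed-size name: a lookup name is
   zero-padded to DLM_RESNAME_MAXLEN first, so e.g. rsb_cmp(r, "foo", 3)
   compares all DLM_RESNAME_MAXLEN bytes of r->res_name against
   "foo\0\0...", not just the first three. */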

int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
                        unsigned int flags, struct dlm_rsb **r_ret)
{
        struct rb_node *node = tree->rb_node;
        struct dlm_rsb *r;
        int error = 0;
        int rc;

        while (node) {
                r = rb_entry(node, struct dlm_rsb, res_hashnode);
                rc = rsb_cmp(r, name, len);
                if (rc < 0)
                        node = node->rb_left;
                else if (rc > 0)
                        node = node->rb_right;
                else
                        goto found;
        }
        *r_ret = NULL;
        return -EBADR;

 found:
        if (r->res_nodeid && (flags & R_MASTER))
                error = -ENOTBLK;
        *r_ret = r;
        return error;
}
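
/* Typical use, a sketch based on _search_rsb() below (the caller holds
   the bucket spinlock):

        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len,
                                    flags, &r);

   -EBADR means no rsb with this name is in the tree; -ENOTBLK means the
   rsb was found (and *r_ret is set) but this node is not its master and
   R_MASTER was requested. */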

static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
{
        struct rb_node **newn = &tree->rb_node;
        struct rb_node *parent = NULL;
        int rc;

        while (*newn) {
                struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
                                               res_hashnode);

                parent = *newn;
                rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
                if (rc < 0)
                        newn = &parent->rb_left;
                else if (rc > 0)
                        newn = &parent->rb_right;
                else {
                        log_print("rsb_insert match");
                        dlm_dump_rsb(rsb);
                        dlm_dump_rsb(cur);
                        return -EEXIST;
                }
        }

        rb_link_node(&rsb->res_hashnode, parent, newn);
        rb_insert_color(&rsb->res_hashnode, tree);
        return 0;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                       unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r;
        int error;

        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
        if (!error) {
                kref_get(&r->res_ref);
                goto out;
        }
        if (error == -ENOTBLK)
                goto out;

        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
        if (error)
                goto out;

        rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
        error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
        if (error)
                return error;

        if (dlm_no_directory(ls))
                goto out;

        if (r->res_nodeid == -1) {
                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = 0;
        } else if (r->res_nodeid > 0) {
                rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = 0;
        } else {
                DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
                DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
        }
 out:
        *r_ret = r;
        return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
                    unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r = NULL;
        uint32_t hash, bucket;
        int error;

        if (namelen > DLM_RESNAME_MAXLEN) {
                error = -EINVAL;
                goto out;
        }

        if (dlm_no_directory(ls))
                flags |= R_CREATE;

        hash = jhash(name, namelen, 0);
        bucket = hash & (ls->ls_rsbtbl_size - 1);

 retry:
        if (flags & R_CREATE) {
                error = pre_rsb_struct(ls);
                if (error < 0)
                        goto out;
        }

        spin_lock(&ls->ls_rsbtbl[bucket].lock);

        error = _search_rsb(ls, name, namelen, bucket, flags, &r);
        if (!error)
                goto out_unlock;

        if (error == -EBADR && !(flags & R_CREATE))
                goto out_unlock;

        /* the rsb was found but wasn't a master copy */
        if (error == -ENOTBLK)
                goto out_unlock;

        error = get_rsb_struct(ls, name, namelen, &r);
        if (error == -EAGAIN) {
                spin_unlock(&ls->ls_rsbtbl[bucket].lock);
                goto retry;
        }
        if (error)
                goto out_unlock;

        r->res_hash = hash;
        r->res_bucket = bucket;
        r->res_nodeid = -1;
        kref_init(&r->res_ref);

        /* With no directory, the master can be set immediately */
        if (dlm_no_directory(ls)) {
                int nodeid = dlm_dir_nodeid(r);
                if (nodeid == dlm_our_nodeid())
                        nodeid = 0;
                r->res_nodeid = nodeid;
        }
        error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
 out_unlock:
        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 out:
        *r_ret = r;
        return error;
}
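
/* A caller's view of find_rsb(), sketched from the request path later in
   this file: look up the rsb, creating one if needed, then take the rsb
   mutex before using it and drop the reference when done.

        error = find_rsb(ls, name, namelen, R_CREATE, &r);
        if (error)
                return error;
        lock_rsb(r);
        attach_lkb(r, lkb);
        ...
        unlock_rsb(r);
        put_rsb(r);
*/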

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
        kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
        hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
        struct dlm_ls *ls = r->res_ls;

        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
        kref_init(&r->res_ref);
        rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
        rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
        r->res_toss_time = jiffies;
        if (r->res_lvbptr) {
                dlm_free_lvb(r->res_lvbptr);
                r->res_lvbptr = NULL;
        }
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;
        uint32_t bucket = r->res_bucket;

        spin_lock(&ls->ls_rsbtbl[bucket].lock);
        kref_put(&r->res_ref, toss_rsb);
        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
        put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
        int rv;
        rv = kref_put(&r->res_ref, toss_rsb);
        DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

        /* All work is done after the return from kref_put() so we
           can release the write_lock before the remove and free. */

        DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        hold_rsb(r);
        lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
        if (lkb->lkb_resource) {
                put_rsb(lkb->lkb_resource);
                lkb->lkb_resource = NULL;
        }
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
        struct dlm_lkb *lkb;
        int rv, id;

        lkb = dlm_allocate_lkb(ls);
        if (!lkb)
                return -ENOMEM;

        lkb->lkb_nodeid = -1;
        lkb->lkb_grmode = DLM_LOCK_IV;
        kref_init(&lkb->lkb_ref);
        INIT_LIST_HEAD(&lkb->lkb_ownqueue);
        INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
        INIT_LIST_HEAD(&lkb->lkb_time_list);
        INIT_LIST_HEAD(&lkb->lkb_cb_list);
        mutex_init(&lkb->lkb_cb_mutex);
        INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);

 retry:
        rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
        if (!rv)
                return -ENOMEM;

        spin_lock(&ls->ls_lkbidr_spin);
        rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
        if (!rv)
                lkb->lkb_id = id;
        spin_unlock(&ls->ls_lkbidr_spin);

        if (rv == -EAGAIN)
                goto retry;

        if (rv < 0) {
                log_error(ls, "create_lkb idr error %d", rv);
                return rv;
        }

        *lkb_ret = lkb;
        return 0;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
        struct dlm_lkb *lkb;

        spin_lock(&ls->ls_lkbidr_spin);
        lkb = idr_find(&ls->ls_lkbidr, lkid);
        if (lkb)
                kref_get(&lkb->lkb_ref);
        spin_unlock(&ls->ls_lkbidr_spin);

        *lkb_ret = lkb;
        return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
        struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

        /* All work is done after the return from kref_put() so we
           can release the write_lock before the detach_lkb */

        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
        uint32_t lkid = lkb->lkb_id;

        spin_lock(&ls->ls_lkbidr_spin);
        if (kref_put(&lkb->lkb_ref, kill_lkb)) {
                idr_remove(&ls->ls_lkbidr, lkid);
                spin_unlock(&ls->ls_lkbidr_spin);

                detach_lkb(lkb);

                /* for local/process lkbs, lvbptr points to caller's lksb */
                if (lkb->lkb_lvbptr && is_master_copy(lkb))
                        dlm_free_lvb(lkb->lkb_lvbptr);
                dlm_free_lkb(lkb);
                return 1;
        } else {
                spin_unlock(&ls->ls_lkbidr_spin);
                return 0;
        }
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls;

        DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
        DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

        ls = lkb->lkb_resource->res_ls;
        return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
        kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
        int rv;
        rv = kref_put(&lkb->lkb_ref, kill_lkb);
        DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
                            int mode)
{
        struct dlm_lkb *lkb = NULL;

        list_for_each_entry(lkb, head, lkb_statequeue)
                if (lkb->lkb_rqmode < mode)
                        break;

        __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
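
/* For example, inserting a PW lock into a grant queue ordered EX, PR, NL
   places it between EX and PR, keeping the queue sorted by decreasing
   mode (per the convention noted at the GRANTED case below):

        before:  EX -> PR -> NL
        after:   EX -> PW -> PR -> NL
*/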

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
        kref_get(&lkb->lkb_ref);

        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

        lkb->lkb_timestamp = ktime_get();

        lkb->lkb_status = status;

        switch (status) {
        case DLM_LKSTS_WAITING:
                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
                        list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
                else
                        list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
                break;
        case DLM_LKSTS_GRANTED:
                /* convention says granted locks kept in order of grmode */
                lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
                                lkb->lkb_grmode);
                break;
        case DLM_LKSTS_CONVERT:
                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
                        list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
                else
                        list_add_tail(&lkb->lkb_statequeue,
                                      &r->res_convertqueue);
                break;
        default:
                DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
        }
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        lkb->lkb_status = 0;
        list_del(&lkb->lkb_statequeue);
        unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
        hold_lkb(lkb);
        del_lkb(r, lkb);
        add_lkb(r, lkb, sts);
        unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
        switch (mstype) {
        case DLM_MSG_REQUEST:
                return DLM_MSG_REQUEST_REPLY;
        case DLM_MSG_CONVERT:
                return DLM_MSG_CONVERT_REPLY;
        case DLM_MSG_UNLOCK:
                return DLM_MSG_UNLOCK_REPLY;
        case DLM_MSG_CANCEL:
                return DLM_MSG_CANCEL_REPLY;
        case DLM_MSG_LOOKUP:
                return DLM_MSG_LOOKUP_REPLY;
        }
        return -1;
}

static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
        int i;

        for (i = 0; i < num_nodes; i++) {
                if (!warned[i]) {
                        warned[i] = nodeid;
                        return 0;
                }
                if (warned[i] == nodeid)
                        return 1;
        }
        return 0;
}

void dlm_scan_waiters(struct dlm_ls *ls)
{
        struct dlm_lkb *lkb;
        ktime_t zero = ktime_set(0, 0);
        s64 us;
        s64 debug_maxus = 0;
        u32 debug_scanned = 0;
        u32 debug_expired = 0;
        int num_nodes = 0;
        int *warned = NULL;

        if (!dlm_config.ci_waitwarn_us)
                return;

        mutex_lock(&ls->ls_waiters_mutex);

        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
                if (ktime_equal(lkb->lkb_wait_time, zero))
                        continue;

                debug_scanned++;

                us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));

                if (us < dlm_config.ci_waitwarn_us)
                        continue;

                lkb->lkb_wait_time = zero;

                debug_expired++;
                if (us > debug_maxus)
                        debug_maxus = us;

                if (!num_nodes) {
                        num_nodes = ls->ls_num_nodes;
                        warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
                }
                if (!warned)
                        continue;
                if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
                        continue;

                log_error(ls, "waitwarn %x %lld %d us check connection to "
                          "node %d", lkb->lkb_id, (long long)us,
                          dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
        }
        mutex_unlock(&ls->ls_waiters_mutex);
        kfree(warned);

        if (debug_expired)
                log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
                          debug_scanned, debug_expired,
                          dlm_config.ci_waitwarn_us, (long long)debug_maxus);
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error = 0;

        mutex_lock(&ls->ls_waiters_mutex);

        if (is_overlap_unlock(lkb) ||
            (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
                error = -EINVAL;
                goto out;
        }

        if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
                switch (mstype) {
                case DLM_MSG_UNLOCK:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
                        break;
                case DLM_MSG_CANCEL:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
                        break;
                default:
                        error = -EBUSY;
                        goto out;
                }
                lkb->lkb_wait_count++;
                hold_lkb(lkb);

                log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
                          lkb->lkb_id, lkb->lkb_wait_type, mstype,
                          lkb->lkb_wait_count, lkb->lkb_flags);
                goto out;
        }

        DLM_ASSERT(!lkb->lkb_wait_count,
                   dlm_print_lkb(lkb);
                   printk("wait_count %d\n", lkb->lkb_wait_count););

        lkb->lkb_wait_count++;
        lkb->lkb_wait_type = mstype;
        lkb->lkb_wait_time = ktime_get();
        lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
        hold_lkb(lkb);
        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
        if (error)
                log_error(ls, "addwait error %x %d flags %x %d %d %s",
                          lkb->lkb_id, error, lkb->lkb_flags, mstype,
                          lkb->lkb_wait_type, lkb->lkb_resource->res_name);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
                                struct dlm_message *ms)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int overlap_done = 0;

        if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
                log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
                overlap_done = 1;
                goto out_del;
        }

        if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
                log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                overlap_done = 1;
                goto out_del;
        }

        /* Cancel state was preemptively cleared by a successful convert,
           see next comment, nothing to do. */

        if ((mstype == DLM_MSG_CANCEL_REPLY) &&
            (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
                log_debug(ls, "remwait %x cancel_reply wait_type %d",
                          lkb->lkb_id, lkb->lkb_wait_type);
                return -1;
        }

        /* Remove for the convert reply, and preemptively remove for the
           cancel reply.  A convert has been granted while there's still
           an outstanding cancel on it (the cancel is moot and the result
           in the cancel reply should be 0).  We preempt the cancel reply
           because the app gets the convert result and then can follow up
           with another op, like convert.  This subsequent op would see the
           lingering state of the cancel and fail with -EBUSY. */

        if ((mstype == DLM_MSG_CONVERT_REPLY) &&
            (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
            is_overlap_cancel(lkb) && ms && !ms->m_result) {
                log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
                          lkb->lkb_id);
                lkb->lkb_wait_type = 0;
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                lkb->lkb_wait_count--;
                goto out_del;
        }

        /* N.B. type of reply may not always correspond to type of original
           msg due to lookup->request optimization, verify others? */

        if (lkb->lkb_wait_type) {
                lkb->lkb_wait_type = 0;
                goto out_del;
        }

        log_error(ls, "remwait error %x reply %d flags %x no wait_type",
                  lkb->lkb_id, mstype, lkb->lkb_flags);
        return -1;

 out_del:
        /* the force-unlock/cancel has completed and we haven't received a
           reply to the op that was in progress prior to the unlock/cancel;
           we give up on any reply to the earlier op.  FIXME: not sure
           when/how this would happen */

        if (overlap_done && lkb->lkb_wait_type) {
                log_error(ls, "remwait error %x reply %d wait_type %d overlap",
                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
                lkb->lkb_wait_count--;
                lkb->lkb_wait_type = 0;
        }

        DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

        lkb->lkb_flags &= ~DLM_IFL_RESEND;
        lkb->lkb_wait_count--;
        if (!lkb->lkb_wait_count)
                list_del_init(&lkb->lkb_wait_reply);
        unhold_lkb(lkb);
        return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error;

        mutex_lock(&ls->ls_waiters_mutex);
        error = _remove_from_waiters(lkb, mstype, NULL);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error;

        if (ms->m_flags != DLM_IFL_STUB_MS)
                mutex_lock(&ls->ls_waiters_mutex);
        error = _remove_from_waiters(lkb, ms->m_type, ms);
        if (ms->m_flags != DLM_IFL_STUB_MS)
                mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}

static void dir_remove(struct dlm_rsb *r)
{
        int to_nodeid;

        if (dlm_no_directory(r->res_ls))
                return;

        to_nodeid = dlm_dir_nodeid(r);
        if (to_nodeid != dlm_our_nodeid())
                send_remove(r);
        else
                dlm_dir_remove_entry(r->res_ls, to_nodeid,
                                     r->res_name, r->res_length);
}

/* FIXME: make this more efficient */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
        struct rb_node *n;
        struct dlm_rsb *r;
        int count = 0, found;

        for (;;) {
                found = 0;
                spin_lock(&ls->ls_rsbtbl[b].lock);
                for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
                        if (!time_after_eq(jiffies, r->res_toss_time +
                                           dlm_config.ci_toss_secs * HZ))
                                continue;
                        found = 1;
                        break;
                }

                if (!found) {
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
                        break;
                }

                if (kref_put(&r->res_ref, kill_rsb)) {
                        rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
                        spin_unlock(&ls->ls_rsbtbl[b].lock);

                        if (is_master(r))
                                dir_remove(r);
                        dlm_free_rsb(r);
                        count++;
                } else {
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
                        log_error(ls, "tossed rsb in use %s", r->res_name);
                }
        }

        return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
        int i;

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                shrink_bucket(ls, i);
                if (dlm_locking_stopped(ls))
                        break;
                cond_resched();
        }
}

static void add_timeout(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;

        if (is_master_copy(lkb))
                return;

        if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
            !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
                lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
                goto add_it;
        }
        if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
                goto add_it;
        return;

 add_it:
        DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
        mutex_lock(&ls->ls_timeout_mutex);
        hold_lkb(lkb);
        list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
        mutex_unlock(&ls->ls_timeout_mutex);
}

static void del_timeout(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;

        mutex_lock(&ls->ls_timeout_mutex);
        if (!list_empty(&lkb->lkb_time_list)) {
                list_del_init(&lkb->lkb_time_list);
                unhold_lkb(lkb);
        }
        mutex_unlock(&ls->ls_timeout_mutex);
}

/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout.  We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        struct dlm_lkb *lkb;
        int do_cancel, do_warn;
        s64 wait_us;

        for (;;) {
                if (dlm_locking_stopped(ls))
                        break;

                do_cancel = 0;
                do_warn = 0;
                mutex_lock(&ls->ls_timeout_mutex);
                list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

                        wait_us = ktime_to_us(ktime_sub(ktime_get(),
                                              lkb->lkb_timestamp));

                        if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
                            wait_us >= (lkb->lkb_timeout_cs * 10000))
                                do_cancel = 1;

                        if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
                            wait_us >= dlm_config.ci_timewarn_cs * 10000)
                                do_warn = 1;

                        if (!do_cancel && !do_warn)
                                continue;
                        hold_lkb(lkb);
                        break;
                }
                mutex_unlock(&ls->ls_timeout_mutex);

                if (!do_cancel && !do_warn)
                        break;

                r = lkb->lkb_resource;
                hold_rsb(r);
                lock_rsb(r);

                if (do_warn) {
                        /* clear flag so we only warn once */
                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
                        if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
                                del_timeout(lkb);
                        dlm_timeout_warn(lkb);
                }

                if (do_cancel) {
                        log_debug(ls, "timeout cancel %x node %d %s",
                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
                        lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
                        del_timeout(lkb);
                        _cancel_lock(r, lkb);
                }

                unlock_rsb(r);
                unhold_rsb(r);
                dlm_put_lkb(lkb);
        }
}

/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
   dlm_recoverd before checking/setting ls_recover_begin. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
        struct dlm_lkb *lkb;
        u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);

        ls->ls_recover_begin = 0;
        mutex_lock(&ls->ls_timeout_mutex);
        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
                lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
        mutex_unlock(&ls->ls_timeout_mutex);

        if (!dlm_config.ci_waitwarn_us)
                return;

        mutex_lock(&ls->ls_waiters_mutex);
        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
                if (ktime_to_us(lkb->lkb_wait_time))
                        lkb->lkb_wait_time = ktime_get();
        }
        mutex_unlock(&ls->ls_waiters_mutex);
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int b, len = r->res_ls->ls_lvblen;

        /* b=1 lvb returned to caller
           b=0 lvb written to rsb or invalidated
           b=-1 do nothing */

        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

        if (b == 1) {
                if (!lkb->lkb_lvbptr)
                        return;

                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        return;

                if (!r->res_lvbptr)
                        return;

                memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
                lkb->lkb_lvbseq = r->res_lvbseq;

        } else if (b == 0) {
                if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
                        rsb_set_flag(r, RSB_VALNOTVALID);
                        return;
                }

                if (!lkb->lkb_lvbptr)
                        return;

                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        return;

                if (!r->res_lvbptr)
                        r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

                if (!r->res_lvbptr)
                        return;

                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
                r->res_lvbseq++;
                lkb->lkb_lvbseq = r->res_lvbseq;
                rsb_clear_flag(r, RSB_VALNOTVALID);
        }

        if (rsb_flag(r, RSB_VALNOTVALID))
                lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        if (lkb->lkb_grmode < DLM_LOCK_PW)
                return;

        if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
                rsb_set_flag(r, RSB_VALNOTVALID);
                return;
        }

        if (!lkb->lkb_lvbptr)
                return;

        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                return;

        if (!r->res_lvbptr)
                r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

        if (!r->res_lvbptr)
                return;

        memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
        r->res_lvbseq++;
        rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                            struct dlm_message *ms)
{
        int b;

        if (!lkb->lkb_lvbptr)
                return;

        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                return;

        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
        if (b == 1) {
                int len = receive_extralen(ms);
                if (len > DLM_RESNAME_MAXLEN)
                        len = DLM_RESNAME_MAXLEN;
                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
                lkb->lkb_lvbseq = ms->m_lvbseq;
        }
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        del_lkb(r, lkb);
        lkb->lkb_grmode = DLM_LOCK_IV;
        /* this unhold undoes the original ref from create_lkb()
           so this leads to the lkb being freed */
        unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_unlock(r, lkb);
        _remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        _remove_lock(r, lkb);
}

/* returns: 0 did nothing
            1 moved lock to granted
           -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int rv = 0;

        lkb->lkb_rqmode = DLM_LOCK_IV;

        switch (lkb->lkb_status) {
        case DLM_LKSTS_GRANTED:
                break;
        case DLM_LKSTS_CONVERT:
                move_lkb(r, lkb, DLM_LKSTS_GRANTED);
                rv = 1;
                break;
        case DLM_LKSTS_WAITING:
                del_lkb(r, lkb);
                lkb->lkb_grmode = DLM_LOCK_IV;
                /* this unhold undoes the original ref from create_lkb()
                   so this leads to the lkb being freed */
                unhold_lkb(lkb);
                rv = -1;
                break;
        default:
                log_print("invalid status for revert %d", lkb->lkb_status);
        }
        return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        return revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        if (lkb->lkb_grmode != lkb->lkb_rqmode) {
                lkb->lkb_grmode = lkb->lkb_rqmode;
                if (lkb->lkb_status)
                        move_lkb(r, lkb, DLM_LKSTS_GRANTED);
                else
                        add_lkb(r, lkb, DLM_LKSTS_GRANTED);
        }

        lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_lock(r, lkb);
        _grant_lock(r, lkb);
        lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                          struct dlm_message *ms)
{
        set_lvb_lock_pc(r, lkb, ms);
        _grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        grant_lock(r, lkb);
        if (is_master_copy(lkb))
                send_grant(r, lkb);
        else
                queue_cast(r, lkb, 0);
}

/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb)
{
        if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
                log_print("munge_demoted %x invalid modes gr %d rq %d",
                          lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
                return;
        }

        lkb->lkb_grmode = DLM_LOCK_NL;
}

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
        if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
            ms->m_type != DLM_MSG_GRANT) {
                log_print("munge_altmode %x invalid reply type %d",
                          lkb->lkb_id, ms->m_type);
                return;
        }

        if (lkb->lkb_exflags & DLM_LKF_ALTPR)
                lkb->lkb_rqmode = DLM_LOCK_PR;
        else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
                lkb->lkb_rqmode = DLM_LOCK_CW;
        else {
                log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
                dlm_print_lkb(lkb);
        }
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
        struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
                                           lkb_statequeue);
        if (lkb->lkb_id == first->lkb_id)
                return 1;

        return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
        struct dlm_lkb *this;

        list_for_each_entry(this, head, lkb_statequeue) {
                if (this == lkb)
                        continue;
                if (!modes_compat(this, lkb))
                        return 1;
        }
        return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
 * convert queue from being granted, then deadlk/demote lkb.
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
 * flag set and return DEMOTED in the lksb flags.
 *
 * Originally, this function detected conv-deadlk in a more limited scope:
 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
 * - if lkb1 was the first entry in the queue (not just earlier), and was
 *   blocked by the granted mode of lkb2, and there was nothing on the
 *   granted queue preventing lkb1 from being granted immediately, i.e.
 *   lkb2 was the only thing preventing lkb1 from being granted.
 *
 * That second condition meant we'd only say there was conv-deadlk if
 * resolving it (by demotion) would lead to the first lock on the convert
 * queue being granted right away.  It allowed conversion deadlocks to exist
 * between locks on the convert queue while they couldn't be granted anyway.
 *
 * Now, we detect and take action on conversion deadlocks immediately when
 * they're created, even if they may not be immediately consequential.  If
 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
 * mode that would prevent lkb1's conversion from being granted, we do a
 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
 * I think this means that the lkb_is_ahead condition below should always
 * be zero, i.e. there will never be conv-deadlk between two locks that are
 * both already on the convert queue.
 */

static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
        struct dlm_lkb *lkb1;
        int lkb_is_ahead = 0;

        list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
                if (lkb1 == lkb2) {
                        lkb_is_ahead = 1;
                        continue;
                }

                if (!lkb_is_ahead) {
                        if (!modes_compat(lkb2, lkb1))
                                return 1;
                } else {
                        if (!modes_compat(lkb2, lkb1) &&
                            !modes_compat(lkb1, lkb2))
                                return 1;
                }
        }
        return 0;
}
1659
1660/*
1661 * Return 1 if the lock can be granted, 0 otherwise.
1662 * Also detect and resolve conversion deadlocks.
1663 *
1664 * lkb is the lock to be granted
1665 *
1666 * now is 1 if the function is being called in the context of the
1667 * immediate request, it is 0 if called later, after the lock has been
1668 * queued.
1669 *
1670 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1671 */
1672
1673static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1674{
1675 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1676
1677 /*
1678 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1679 * a new request for a NL mode lock being blocked.
1680 *
1681 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1682 * request, then it would be granted. In essence, the use of this flag
1683 * tells the Lock Manager to expedite theis request by not considering
1684 * what may be in the CONVERTING or WAITING queues... As of this
1685 * writing, the EXPEDITE flag can be used only with new requests for NL
1686 * mode locks. This flag is not valid for conversion requests.
1687 *
1688 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1689 * conversion or used with a non-NL requested mode. We also know an
1690 * EXPEDITE request is always granted immediately, so now must always
1691 * be 1. The full condition to grant an expedite request: (now &&
1692 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1693 * therefore be shortened to just checking the flag.
1694 */
1695
1696 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
90135925 1697 return 1;
e7fd4179
DT
1698
1699 /*
1700 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * Even if the convert is compat with all granted locks,
	 * QUECVT forces it behind other locks on the convert queue.
	 */

	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
		if (list_empty(&r->res_convertqueue))
			return 1;
		else
			goto out;
	}

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;
 out:
	return 0;
}
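
/* Worked example of the rules above (added illustration, not from the
   original source): with PR granted to one lkb and a new CW request
   arriving (now=1, conv=0), PR and CW conflict, so queue_conflict() on the
   grant queue sends us to "out" and do_request() below queues the request
   (or fails it with -EAGAIN if DLM_LKF_NOQUEUE was set).  A second PR
   request, though compatible with the granted PR, is also queued once the
   wait queue is non-empty, per the first 6-4 condition. */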

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
			  int *err)
{
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);

	if (err)
		*err = 0;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	/*
	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
	 * cancels one of the locks.
	 */

	if (is_convert && can_be_queued(lkb) &&
	    conversion_deadlock_detect(r, lkb)) {
		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
			lkb->lkb_grmode = DLM_LOCK_NL;
			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
			if (err)
				*err = -EDEADLK;
			else {
				log_print("can_be_granted deadlock %x now %d",
					  lkb->lkb_id, now);
				dlm_dump_rsb(r);
			}
		}
		goto out;
	}

	/*
	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
	 * to grant a request in a mode other than the normal rqmode.  It's a
	 * simple way to provide a big optimization to applications that can
	 * use them.
	 */

	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}

/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
   for locks pending on the convert list.  Once verified (watch for these
   log_prints), we should be able to just call _can_be_granted() and not
   bother with the demote/deadlk cases here (and there's no easy way to deal
   with a deadlk here, we'd have to generate something like grant_lock with
   the deadlk error.) */

/* Returns the highest requested mode of all blocked conversions; sets
   cw if there's a blocked conversion to DLM_LOCK_CW. */

static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;
	int deadlk;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		deadlk = 0;

		if (can_be_granted(r, lkb, 0, &deadlk)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
			continue;
		}

		if (!demoted && is_demoted(lkb)) {
			log_print("WARN: pending demoted %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			demote_restart = 1;
			continue;
		}

		if (deadlk) {
			log_print("WARN: pending deadlock %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			dlm_dump_rsb(r);
			continue;
		}

		hi = max_t(int, lkb->lkb_rqmode, hi);

		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
			*cw = 1;
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0, NULL))
			grant_lock_pending(r, lkb);
		else {
			high = max_t(int, lkb->lkb_rqmode, high);
			if (lkb->lkb_rqmode == DLM_LOCK_CW)
				*cw = 1;
		}
	}

	return high;
}

/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
   on either the convert or waiting queue.
   high is the largest rqmode of all locks blocked on the convert or
   waiting queue. */

static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
{
	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
		if (gr->lkb_highbast < DLM_LOCK_EX)
			return 1;
		return 0;
	}

	if (gr->lkb_highbast < high &&
	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
		return 1;
	return 0;
}
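
/* Added illustration of the special case above: granted PR, one blocked PR
   waiter and one blocked CW waiter gives high = DLM_LOCK_PR with cw set.
   The generic highbast/compat test would skip the PR holder (PR is
   compatible with PR), yet the CW waiter still conflicts with it, so the
   PR+cw branch forces the bast; grant_pending_locks() then issues it with
   mode DLM_LOCK_CW. */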

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;
	int cw = 0;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high, &cw);
	high = grant_pending_wait(r, high, &cw);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
			if (cw && high == DLM_LOCK_PR &&
			    lkb->lkb_grmode == DLM_LOCK_PR)
				queue_bast(r, lkb, DLM_LOCK_CW);
			else
				queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}

static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
{
	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
		if (gr->lkb_highbast < DLM_LOCK_EX)
			return 1;
		return 0;
	}

	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
		return 1;
	return 0;
}
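
/* Added note: PR (mode 3) and CW (mode 2) are the one incompatible pair not
   ordered the way the generic test assumes: a CW request against a granted
   PR fails the "highbast < rqmode" check once highbast reaches CW, even
   though the modes conflict.  Hence both PR/CW combinations are
   special-cased against DLM_LOCK_EX above. */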

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		/* skip self when sending basts to convertqueue */
		if (gr == lkb)
			continue;
		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}

/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (i = 0; i < 2; i++) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}
	if (error && error != -EEXIST)
		return error;

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
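
/* Walk-through of the return values (added illustration): the first lock on
   a resource whose directory node is remote sets res_first_lkid, fires
   send_lookup() and returns 1 -- that lkb waits on ls_waiters for the
   lookup reply.  A second lkb arriving before the reply lands on
   res_lookup and also gets 1.  Once res_nodeid is known (0 for local,
   >0 for remote), later calls simply copy it into lkb_nodeid and return 0. */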

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del_init(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}

/* confirm_master -- confirm (or deny) an rsb's master nodeid */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
	case -EBADR:
	case -ENOTBLK:
		/* the remote request failed and won't be retried (it was
		   a NOQUEUE, or has been canceled/unlocked); make a waiting
		   lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		}
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}

static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, unsigned long timeout_cs,
			 void (*ast) (void *astparam),
			 void *astparam,
			 void (*bast) (void *astparam, int mode),
			 struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	if (flags & DLM_LKF_CANCEL)
		goto out;

	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astfn = ast;
	args->astparam = astparam;
	args->bastfn = bast;
	args->timeout = timeout_cs;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}
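
/* Examples of the argument rules above (added illustration): EXPEDITE is
   only valid for a new DLM_LOCK_NL request, so EXPEDITE|CONVERT or EXPEDITE
   with mode DLM_LOCK_PR fails with -EINVAL; VALBLK requires the caller to
   point lksb->sb_lvbptr at a buffer first; and a CONVERT must carry the
   lkid of an existing lock in lksb->sb_lkid. */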

static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
		      DLM_LKF_FORCEUNLOCK))
		return -EINVAL;

	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
		return -EINVAL;

	args->flags = flags;
	args->astparam = astarg;
	return 0;
}

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astfn = args->astfn;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastfn = args->bastfn;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	lkb->lkb_timeout_cs = args->timeout;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
			  lkb->lkb_status, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}

/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
   for success */

/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
   because there may be a lookup in progress and it's valid to do
   cancel/unlockf on it */

static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EINVAL;

	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	   cancel, unlock or failed noqueue request; an app can't use these
	   locks; return same error as if the lkid had not been found at all */

	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (!list_empty(&lkb->lkb_rsb_lookup)) {
		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
		}
		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
		rv = -EBUSY;
		goto out;
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		}

		/* there's nothing to cancel */
		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
		    !lkb->lkb_wait_type) {
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress?  in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
		goto out_ok;
	}

	/* normal unlock not allowed if there's any op in progress */
	rv = -EBUSY;
	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
		goto out;

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
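
/* Example of the overlap handling above (added illustration): a
   dlm_unlock(DLM_LKF_CANCEL) that races a request still waiting for its
   reply finds lkb_wait_type == DLM_MSG_REQUEST, sets
   DLM_IFL_OVERLAP_CANCEL and returns -EBUSY, which dlm_unlock() maps to 0
   for CANCEL; the cancel itself is issued later, when the request reply
   arrives (see receive_request_reply()). */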

/*
 * Four stage 4 varieties:
 * do_request(), do_convert(), do_unlock(), do_cancel()
 * These are called on the master node for the given lock and
 * from the central locking logic.
 */

static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	if (can_be_granted(r, lkb, 1, NULL)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		add_lkb(r, lkb, DLM_LKSTS_WAITING);
		add_timeout(lkb);
		goto out;
	}

	error = -EAGAIN;
	queue_cast(r, lkb, -EAGAIN);
 out:
	return error;
}

static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			       int error)
{
	switch (error) {
	case -EAGAIN:
		if (force_blocking_asts(lkb))
			send_blocking_asts_all(r, lkb);
		break;
	case -EINPROGRESS:
		send_blocking_asts(r, lkb);
		break;
	}
}

static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;
	int deadlk = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1, &deadlk)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	/* can_be_granted() detected that this lock would block in a conversion
	   deadlock, so we leave it on the granted queue and return EDEADLK in
	   the ast for the convert. */

	if (deadlk) {
		/* it's left on the granted queue */
		revert_lock(r, lkb);
		queue_cast(r, lkb, -EDEADLK);
		error = -EDEADLK;
		goto out;
	}

	/* is_demoted() means the can_be_granted() above set the grmode
	   to NL, and left us on the granted queue.  This auto-demotion
	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
	   now grantable.  We have to try to grant other converting locks
	   before we try again to grant this one. */

	if (is_demoted(lkb)) {
		grant_pending_convert(r, DLM_LOCK_IV, NULL);
		if (_can_be_granted(r, lkb, 1)) {
			grant_lock(r, lkb);
			queue_cast(r, lkb, 0);
			goto out;
		}
		/* else fall through and move to convert queue */
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		add_timeout(lkb);
		goto out;
	}

	error = -EAGAIN;
	queue_cast(r, lkb, -EAGAIN);
 out:
	return error;
}

static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			       int error)
{
	switch (error) {
	case 0:
		grant_pending_locks(r);
		/* grant_pending_locks also sends basts */
		break;
	case -EAGAIN:
		if (force_blocking_asts(lkb))
			send_blocking_asts_all(r, lkb);
		break;
	case -EINPROGRESS:
		send_blocking_asts(r, lkb);
		break;
	}
}

static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	remove_lock(r, lkb);
	queue_cast(r, lkb, -DLM_EUNLOCK);
	return -DLM_EUNLOCK;
}

static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	grant_pending_locks(r);
}

/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */

static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = revert_lock(r, lkb);
	if (error) {
		queue_cast(r, lkb, -DLM_ECANCEL);
		return -DLM_ECANCEL;
	}
	return 0;
}

static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	if (error)
		grant_pending_locks(r);
}
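
/* Added rationale, inferred from the callers below and from the receive_
   functions: the "effects" half of each operation (granting other waiters,
   queueing blocking asts) is split out so that, for remote operations, the
   reply can be committed to the wire between do_xxxx() and
   do_xxxx_effects(); the requesting node then learns its result (and the
   master's lkid) before any grant or bast triggered by that same operation
   can reach it. */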

/*
 * Four stage 3 varieties:
 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
 */

/* add a new lkb to a possibly new rsb, called by requesting process */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r */

	error = set_master(r, lkb);
	if (error < 0)
		goto out;
	if (error) {
		error = 0;
		goto out;
	}

	if (is_remote(r)) {
		/* receive_request() calls do_request() on remote node */
		error = send_request(r, lkb);
	} else {
		error = do_request(r, lkb);
		/* for remote locks the request_reply is sent
		   between do_request and do_request_effects */
		do_request_effects(r, lkb, error);
	}
 out:
	return error;
}

/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r)) {
		/* receive_convert() calls do_convert() on remote node */
		error = send_convert(r, lkb);
	} else {
		error = do_convert(r, lkb);
		/* for remote locks the convert_reply is sent
		   between do_convert and do_convert_effects */
		do_convert_effects(r, lkb, error);
	}

	return error;
}

/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r)) {
		/* receive_unlock() calls do_unlock() on remote node */
		error = send_unlock(r, lkb);
	} else {
		error = do_unlock(r, lkb);
		/* for remote locks the unlock_reply is sent
		   between do_unlock and do_unlock_effects */
		do_unlock_effects(r, lkb, error);
	}

	return error;
}

/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r)) {
		/* receive_cancel() calls do_cancel() on remote node */
		error = send_cancel(r, lkb);
	} else {
		error = do_cancel(r, lkb);
		/* for remote locks the cancel_reply is sent
		   between do_cancel and do_cancel_effects */
		do_cancel_effects(r, lkb, error);
	}

	return error;
}

/*
 * Four stage 2 varieties:
 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
 */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);

 out:
	return error;
}

static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
			struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = _convert_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _unlock_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _cancel_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

/*
 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
 */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	dlm_lock_recovery(ls);

	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
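
/* Minimal caller sketch (illustrative only, not part of this file; assumes
   a lockspace "ls" obtained from dlm_new_lockspace() and
   <linux/completion.h>):

	static struct dlm_lksb my_lksb;
	static DECLARE_COMPLETION(my_done);

	static void my_ast(void *arg)
	{
		complete(arg);
	}

	error = dlm_lock(ls, DLM_LOCK_EX, &my_lksb, 0, "myres", 5,
			 0, my_ast, &my_done, NULL);
	if (!error) {
		wait_for_completion(&my_done);
		error = my_lksb.sb_status;
	}

   sb_status is 0 once EX is granted; with DLM_LKF_NOQUEUE it would be
   -EAGAIN if the lock could not be granted immediately. */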

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
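
/* Companion sketch (illustrative): releasing the lock taken in the example
   above.  The unlock is also asynchronous; the ast fires with
   sb_status == -DLM_EUNLOCK while dlm_unlock() itself returns 0:

	error = dlm_unlock(ls, my_lksb.sb_lkid, 0, &my_lksb, &my_done);
	if (!error)
		wait_for_completion(&my_done);

   Cancelling a request still on the wait queue is the same call with
   DLM_LKF_CANCEL; the cast then carries -DLM_ECANCEL. */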

/*
 * send/receive routines for remote operations and replies
 *
 * send_args
 * send_common
 * send_request			receive_request
 * send_convert			receive_convert
 * send_unlock			receive_unlock
 * send_cancel			receive_cancel
 * send_grant			receive_grant
 * send_bast			receive_bast
 * send_lookup			receive_lookup
 * send_remove			receive_remove
 *
 * send_common_reply
 * receive_request_reply	send_request_reply
 * receive_convert_reply	send_convert_reply
 * receive_unlock_reply		send_unlock_reply
 * receive_cancel_reply		send_cancel_reply
 * receive_lookup_reply		send_lookup_reply
 */

static int _create_message(struct dlm_ls *ls, int mb_len,
			   int to_nodeid, int mstype,
			   struct dlm_message **ms_ret,
			   struct dlm_mhandle **mh_ret)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	char *mb;

	/* get_buffer gives us a message handle (mh) that we need to
	   pass into lowcomms_commit and a message buffer (mb) that we
	   write our data into */

	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
	if (!mh)
		return -ENOBUFS;

	memset(mb, 0, mb_len);

	ms = (struct dlm_message *) mb;

	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	ms->m_header.h_lockspace = ls->ls_global_id;
	ms->m_header.h_nodeid = dlm_our_nodeid();
	ms->m_header.h_length = mb_len;
	ms->m_header.h_cmd = DLM_MSG;

	ms->m_type = mstype;

	*mh_ret = mh;
	*ms_ret = ms;
	return 0;
}

static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  int to_nodeid, int mstype,
			  struct dlm_message **ms_ret,
			  struct dlm_mhandle **mh_ret)
{
	int mb_len = sizeof(struct dlm_message);

	switch (mstype) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
	case DLM_MSG_REMOVE:
		mb_len += r->res_length;
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (lkb && lkb->lkb_lvbptr)
			mb_len += r->res_ls->ls_lvblen;
		break;
	}

	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
			       ms_ret, mh_ret);
}
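
/* Sizing example (added illustration): a DLM_MSG_REQUEST for a 9-byte
   resource name allocates sizeof(struct dlm_message) + 9 and the name
   travels in m_extra; a DLM_MSG_CONVERT whose lkb carries an lvb instead
   reserves ls_lvblen extra bytes for the lock value block.  The receive
   side recovers the variable part via receive_extralen(). */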

/* further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}

static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	ms->m_nodeid = lkb->lkb_nodeid;
	ms->m_pid = lkb->lkb_ownpid;
	ms->m_lkid = lkb->lkb_id;
	ms->m_remid = lkb->lkb_remid;
	ms->m_exflags = lkb->lkb_exflags;
	ms->m_sbflags = lkb->lkb_sbflags;
	ms->m_flags = lkb->lkb_flags;
	ms->m_lvbseq = lkb->lkb_lvbseq;
	ms->m_status = lkb->lkb_status;
	ms->m_grmode = lkb->lkb_grmode;
	ms->m_rqmode = lkb->lkb_rqmode;
	ms->m_hash = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	if (lkb->lkb_bastfn)
		ms->m_asts |= DLM_CB_BAST;
	if (lkb->lkb_astfn)
		ms->m_asts |= DLM_CB_CAST;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (!lkb->lkb_lvbptr)
			break;
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}

static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = r->res_nodeid;

	error = add_to_waiters(lkb, mstype, to_nodeid);
	if (error)
		return error;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, msg_reply_type(mstype));
	return error;
}

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_REQUEST);
}

static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = send_common(r, lkb, DLM_MSG_CONVERT);

	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

	return error;
}

/* FIXME: if this lkb is the only lock we hold on the rsb, then set
   MASTER_UNCERTAIN to force the next request on the rsb to confirm
   that the master is still correct. */

static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_UNLOCK);
}

static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_CANCEL);
}

static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = 0;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_bastmode = mode;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = dlm_dir_nodeid(r);

	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
	if (error)
		return error;

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	return error;
}

static int send_remove(struct dlm_rsb *r)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
	if (error)
		goto out;

	memcpy(ms->m_extra, r->res_name, r->res_length);
	ms->m_hash = r->res_hash;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
			     int mstype, int rv)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = rv;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}

static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}

static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
}

static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}

static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
			     int ret_nodeid, int rv)
{
	struct dlm_rsb *r = &ls->ls_stub_rsb;
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error, nodeid = ms_in->m_header.h_nodeid;

	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
	if (error)
		goto out;

	ms->m_lkid = ms_in->m_lkid;
	ms->m_result = rv;
	ms->m_nodeid = ret_nodeid;

	error = send_message(mh, ms);
 out:
	return error;
}

/* which args we save from a received message depends heavily on the type
   of message, unlike the send side where we can safely send everything about
   the lkb for any type of message */

static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	lkb->lkb_exflags = ms->m_exflags;
	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}

static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_flags == DLM_IFL_STUB_MS)
		return;

	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}

static int receive_extralen(struct dlm_message *ms)
{
	return (ms->m_header.h_length - sizeof(struct dlm_message));
}

static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_message *ms)
{
	int len;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		if (!lkb->lkb_lvbptr)
			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		len = receive_extralen(ms);
		if (len > DLM_RESNAME_MAXLEN)
			len = DLM_RESNAME_MAXLEN;
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
	}
	return 0;
}

static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}

static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}

static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_ownpid = ms->m_pid;
	lkb->lkb_remid = ms->m_lkid;
	lkb->lkb_grmode = DLM_LOCK_IV;
	lkb->lkb_rqmode = ms->m_rqmode;

	lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lkb was just created so there won't be an lvb yet */
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
	}

	return 0;
}

static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return -EBUSY;

	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;

	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_lvbseq = ms->m_lvbseq;

	return 0;
}

static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;
	return 0;
}

/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
   uses to send a reply and that the remote end uses to process the reply. */

static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_remid = ms->m_lkid;
}

/* This is called after the rsb is locked so that we can safely inspect
   fields in the lkb. */

static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	int from = ms->m_header.h_nodeid;
	int error = 0;

	switch (ms->m_type) {
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_CANCEL:
		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_UNLOCK_REPLY:
	case DLM_MSG_CANCEL_REPLY:
	case DLM_MSG_GRANT:
	case DLM_MSG_BAST:
		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case DLM_MSG_REQUEST_REPLY:
		if (!is_process_copy(lkb))
			error = -EINVAL;
		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	default:
		error = -EINVAL;
	}

	if (error)
		log_error(lkb->lkb_resource->res_ls,
			  "ignore invalid message %d from %d %x %x %x %d",
			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
			  lkb->lkb_flags, lkb->lkb_nodeid);
	return error;
}
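
/* Example of what this filters (added illustration): after recovery or an
   lkid reuse, a node can receive a DLM_MSG_GRANT or a stale reply for an
   lkb that is now a master copy, or from a node other than the one recorded
   in lkb_nodeid; such messages are logged and dropped with -EINVAL instead
   of mutating lkb state. */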

static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, namelen;

	error = create_lkb(ls, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	error = receive_request_args(ls, lkb, ms);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	namelen = receive_extralen(ms);

	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	lock_rsb(r);

	attach_lkb(r, lkb);
	error = do_request(r, lkb);
	send_request_reply(r, lkb, error);
	do_request_effects(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);

	if (error == -EINPROGRESS)
		error = 0;
	if (error)
		dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}

static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, reply = 1;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags(lkb, ms);

	error = receive_convert_args(ls, lkb, ms);
	if (error) {
		send_convert_reply(r, lkb, error);
		goto out;
	}

	reply = !down_conversion(lkb);

	error = do_convert(r, lkb);
	if (reply)
		send_convert_reply(r, lkb, error);
	do_convert_effects(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}

static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags(lkb, ms);

	error = receive_unlock_args(ls, lkb, ms);
	if (error) {
		send_unlock_reply(r, lkb, error);
		goto out;
	}

	error = do_unlock(r, lkb);
	send_unlock_reply(r, lkb, error);
	do_unlock_effects(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}

static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	error = do_cancel(r, lkb);
	send_cancel_reply(r, lkb, error);
	do_cancel_effects(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}

static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_grant from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags_reply(lkb, ms);
	if (is_altmode(lkb))
		munge_altmode(lkb, ms);
	grant_lock_pc(r, lkb, ms);
	queue_cast(r, lkb, 0);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}

static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_bast from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	queue_bast(r, lkb, ms->m_bastmode);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}

static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;

	from_nodeid = ms->m_header.h_nodeid;
	our_nodeid = dlm_our_nodeid();

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "lookup dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		error = -EINVAL;
		ret_nodeid = -1;
		goto out;
	}

	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);

	/* Optimization: we're master so treat lookup as a request */
	if (!error && ret_nodeid == our_nodeid) {
		receive_request(ls, ms);
		return;
	}
 out:
	send_lookup_reply(ls, ms, ret_nodeid, error);
}

static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, dir_nodeid, from_nodeid;

	from_nodeid = ms->m_header.h_nodeid;

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != dlm_our_nodeid()) {
		log_error(ls, "remove dir entry dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		return;
	}

	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
}

static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
{
	do_purge(ls, ms->m_nodeid, ms->m_pid);
}

static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype, result;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_request_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
	if (error)
		goto out;

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent request reply instead of lookup reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_nodeid = ms->m_header.h_nodeid;
		lkb->lkb_nodeid = r->res_nodeid;
	}

	/* this is the value returned from do_request() on the master */
	result = ms->m_result;

	switch (result) {
	case -EAGAIN:
		/* request would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb); /* undoes create_lkb() */
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms);
		lkb->lkb_remid = ms->m_lkid;
		if (is_altmode(lkb))
			munge_altmode(lkb, ms);
		if (result) {
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
			add_timeout(lkb);
		} else {
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, result);
		break;

	case -EBADR:
	case -ENOTBLK:
		/* find_rsb failed to find rsb or rsb wasn't master */
		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
		r->res_nodeid = -1;
		lkb->lkb_nodeid = -1;

		if (is_overlap(lkb)) {
			/* we'll ignore error in cancel/unlock reply */
			queue_cast_overlap(r, lkb);
			confirm_master(r, result);
			unhold_lkb(lkb); /* undoes create_lkb() */
		} else
			_request_lock(r, lkb);
		break;

	default:
		log_error(ls, "receive_request_reply %x error %d",
			  lkb->lkb_id, result);
	}

	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x result %d unlock",
			  lkb->lkb_id, result);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_unlock(r, lkb);
	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_cancel(r, lkb);
	} else {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3762
3763static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3764 struct dlm_message *ms)
3765{
e7fd4179 3766 /* this is the value returned from do_convert() on the master */
ef0c2bb0 3767 switch (ms->m_result) {
e7fd4179
DT
3768 case -EAGAIN:
3769 /* convert would block (be queued) on remote master */
3770 queue_cast(r, lkb, -EAGAIN);
3771 break;
3772
c85d65e9
DT
3773 case -EDEADLK:
3774 receive_flags_reply(lkb, ms);
3775 revert_lock_pc(r, lkb);
3776 queue_cast(r, lkb, -EDEADLK);
3777 break;
3778
e7fd4179
DT
3779 case -EINPROGRESS:
3780 /* convert was queued on remote master */
7d3c1feb
DT
3781 receive_flags_reply(lkb, ms);
3782 if (is_demoted(lkb))
2a7ce0ed 3783 munge_demoted(lkb);
e7fd4179
DT
3784 del_lkb(r, lkb);
3785 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3ae1acf9 3786 add_timeout(lkb);
e7fd4179
DT
3787 break;
3788
3789 case 0:
3790 /* convert was granted on remote master */
3791 receive_flags_reply(lkb, ms);
7d3c1feb 3792 if (is_demoted(lkb))
2a7ce0ed 3793 munge_demoted(lkb);
e7fd4179
DT
3794 grant_lock_pc(r, lkb, ms);
3795 queue_cast(r, lkb, 0);
3796 break;
3797
3798 default:
ef0c2bb0
DT
3799 log_error(r->res_ls, "receive_convert_reply %x error %d",
3800 lkb->lkb_id, ms->m_result);
e7fd4179
DT
3801 }
3802}
3803
3804static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3805{
3806 struct dlm_rsb *r = lkb->lkb_resource;
3807 int error;
3808
3809 hold_rsb(r);
3810 lock_rsb(r);
3811
3812 error = validate_message(lkb, ms);
3813 if (error)
3814 goto out;
3815
3816 /* stub reply can happen with waiters_mutex held */
3817 error = remove_from_waiters_ms(lkb, ms);
3818 if (error)
3819 goto out;
3820
3821 __receive_convert_reply(r, lkb, ms);
3822 out:
3823 unlock_rsb(r);
3824 put_rsb(r);
3825}
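
/* Illustrative sketch, an editor addition that is not part of lock.c:
   the reply handlers above all share the same bracketing discipline --
   pin the rsb, take its lock, do the work, release both. A hypothetical
   wrapper expressing that shape; dlm does not actually define one. */

static void with_locked_rsb(struct dlm_rsb *r,
			    void (*body)(struct dlm_rsb *r, void *arg),
			    void *arg)
{
	hold_rsb(r);		/* pin the rsb while we use it */
	lock_rsb(r);		/* serialize against other lock ops */
	body(r, arg);
	unlock_rsb(r);
	put_rsb(r);		/* drop the reference from hold_rsb() */
}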
3826
3827static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3828{
3829 struct dlm_lkb *lkb;
3830 int error;
3831
3832 error = find_lkb(ls, ms->m_remid, &lkb);
3833 if (error) {
3834 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3835 ms->m_header.h_nodeid, ms->m_remid);
3836 return;
3837 }
3838
3839 _receive_convert_reply(lkb, ms);
3840 dlm_put_lkb(lkb);
3841}
3842
3843static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3844{
3845 struct dlm_rsb *r = lkb->lkb_resource;
3846 int error;
3847
3848 hold_rsb(r);
3849 lock_rsb(r);
3850
3851 error = validate_message(lkb, ms);
3852 if (error)
3853 goto out;
3854
3855 /* stub reply can happen with waiters_mutex held */
3856 error = remove_from_waiters_ms(lkb, ms);
3857 if (error)
3858 goto out;
3859
3860 /* this is the value returned from do_unlock() on the master */
3861
3862 switch (ms->m_result) {
3863 case -DLM_EUNLOCK:
3864 receive_flags_reply(lkb, ms);
3865 remove_lock_pc(r, lkb);
3866 queue_cast(r, lkb, -DLM_EUNLOCK);
3867 break;
3868 case -ENOENT:
3869 break;
3870 default:
3871 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3872 lkb->lkb_id, ms->m_result);
3873 }
3874 out:
3875 unlock_rsb(r);
3876 put_rsb(r);
3877}
3878
3879static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3880{
3881 struct dlm_lkb *lkb;
3882 int error;
3883
3884 error = find_lkb(ls, ms->m_remid, &lkb);
3885 if (error) {
3886 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3887 ms->m_header.h_nodeid, ms->m_remid);
3888 return;
3889 }
3890
3891 _receive_unlock_reply(lkb, ms);
3892 dlm_put_lkb(lkb);
3893}
3894
3895static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3896{
3897 struct dlm_rsb *r = lkb->lkb_resource;
3898 int error;
3899
3900 hold_rsb(r);
3901 lock_rsb(r);
3902
3903 error = validate_message(lkb, ms);
3904 if (error)
3905 goto out;
3906
3907 /* stub reply can happen with waiters_mutex held */
3908 error = remove_from_waiters_ms(lkb, ms);
3909 if (error)
3910 goto out;
3911
3912 /* this is the value returned from do_cancel() on the master */
3913
3914 switch (ms->m_result) {
3915 case -DLM_ECANCEL:
3916 receive_flags_reply(lkb, ms);
3917 revert_lock_pc(r, lkb);
3918 queue_cast(r, lkb, -DLM_ECANCEL);
3919 break;
3920 case 0:
3921 break;
3922 default:
3923 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3924 lkb->lkb_id, ms->m_result);
3925 }
3926 out:
3927 unlock_rsb(r);
3928 put_rsb(r);
3929}
3930
3931static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3932{
3933 struct dlm_lkb *lkb;
3934 int error;
3935
3936 error = find_lkb(ls, ms->m_remid, &lkb);
3937 if (error) {
3938 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3939 ms->m_header.h_nodeid, ms->m_remid);
3940 return;
3941 }
3942
3943 _receive_cancel_reply(lkb, ms);
3944 dlm_put_lkb(lkb);
3945}
3946
3947static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3948{
3949 struct dlm_lkb *lkb;
3950 struct dlm_rsb *r;
3951 int error, ret_nodeid;
3952
3953 error = find_lkb(ls, ms->m_lkid, &lkb);
3954 if (error) {
3955 log_error(ls, "receive_lookup_reply no lkb");
3956 return;
3957 }
3958
3959 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3960 FIXME: will a non-zero error ever be returned? */
3961
3962 r = lkb->lkb_resource;
3963 hold_rsb(r);
3964 lock_rsb(r);
3965
3966 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3967 if (error)
3968 goto out;
3969
3970 ret_nodeid = ms->m_nodeid;
3971 if (ret_nodeid == dlm_our_nodeid()) {
3972 r->res_nodeid = 0;
3973 ret_nodeid = 0;
3974 r->res_first_lkid = 0;
3975 } else {
3976 /* set_master() will copy res_nodeid to lkb_nodeid */
3977 r->res_nodeid = ret_nodeid;
3978 }
3979
3980 if (is_overlap(lkb)) {
3981 log_debug(ls, "receive_lookup_reply %x unlock %x",
3982 lkb->lkb_id, lkb->lkb_flags);
3983 queue_cast_overlap(r, lkb);
3984 unhold_lkb(lkb); /* undoes create_lkb() */
3985 goto out_list;
3986 }
3987
3988 _request_lock(r, lkb);
3989
3990 out_list:
3991 if (!ret_nodeid)
3992 process_lookup_list(r);
3993 out:
3994 unlock_rsb(r);
3995 put_rsb(r);
3996 dlm_put_lkb(lkb);
3997}
3998
3999 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
4000 {
4001 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4002 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
4003 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4004 ms->m_remid, ms->m_result);
4005 return;
4006 }
4007
4008 switch (ms->m_type) {
4009
4010 /* messages sent to a master node */
4011
4012 case DLM_MSG_REQUEST:
4013 receive_request(ls, ms);
4014 break;
4015
4016 case DLM_MSG_CONVERT:
4017 receive_convert(ls, ms);
4018 break;
4019
4020 case DLM_MSG_UNLOCK:
4021 receive_unlock(ls, ms);
4022 break;
4023
4024 case DLM_MSG_CANCEL:
4025 receive_cancel(ls, ms);
4026 break;
4027
4028 /* messages sent from a master node (replies to above) */
4029
4030 case DLM_MSG_REQUEST_REPLY:
4031 receive_request_reply(ls, ms);
4032 break;
4033
4034 case DLM_MSG_CONVERT_REPLY:
4035 receive_convert_reply(ls, ms);
4036 break;
4037
4038 case DLM_MSG_UNLOCK_REPLY:
4039 receive_unlock_reply(ls, ms);
4040 break;
4041
4042 case DLM_MSG_CANCEL_REPLY:
4043 receive_cancel_reply(ls, ms);
4044 break;
4045
4046 /* messages sent from a master node (only two types of async msg) */
4047
4048 case DLM_MSG_GRANT:
4049 receive_grant(ls, ms);
4050 break;
4051
4052 case DLM_MSG_BAST:
4053 receive_bast(ls, ms);
4054 break;
4055
4056 /* messages sent to a dir node */
4057
4058 case DLM_MSG_LOOKUP:
4059 receive_lookup(ls, ms);
4060 break;
4061
4062 case DLM_MSG_REMOVE:
4063 receive_remove(ls, ms);
4064 break;
4065
4066 /* messages sent from a dir node (remove has no reply) */
4067
4068 case DLM_MSG_LOOKUP_REPLY:
4069 receive_lookup_reply(ls, ms);
4070 break;
4071
4072 /* other messages */
4073
4074 case DLM_MSG_PURGE:
4075 receive_purge(ls, ms);
4076 break;
4077
4078 default:
4079 log_error(ls, "unknown message type %d", ms->m_type);
4080 }
4081 }
4082
4083/* If the lockspace is in recovery mode (locking stopped), then normal
4084 messages are saved on the requestqueue for processing after recovery is
4085 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4086 messages off the requestqueue before we process new ones. This occurs right
4087 after recovery completes when we transition from saving all messages on
4088 requestqueue, to processing all the saved messages, to processing new
4089 messages as they arrive. */
4090
4091 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4092 int nodeid)
4093 {
4094 if (dlm_locking_stopped(ls)) {
4095 dlm_add_requestqueue(ls, nodeid, ms);
4096 } else {
4097 dlm_wait_requestqueue(ls);
4098 _receive_message(ls, ms);
4099 }
4100}
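
/* Illustrative sketch, an editor addition that is not part of lock.c:
   a simplified user-space model of the gating implemented above.
   While locking is stopped for recovery every message is saved, and
   once locking resumes the saved backlog is drained before any new
   message is handled. All names below are hypothetical. */

struct toy_queue { int saved; };

static void toy_receive(struct toy_queue *q, int locking_stopped, int msg)
{
	if (locking_stopped) {
		q->saved++;		/* dlm_add_requestqueue() analogue */
		return;
	}
	while (q->saved > 0)		/* dlm_wait_requestqueue() waits for */
		q->saved--;		/* the backlog to be drained first */
	(void)msg;			/* _receive_message() runs here */
}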
4101
4102/* This is called by dlm_recoverd to process messages that were saved on
4103 the requestqueue. */
4104
4105void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
4106{
4107 _receive_message(ls, ms);
4108}
4109
4110/* This is called by the midcomms layer when something is received for
4111 the lockspace. It could be either a MSG (normal message sent as part of
4112 standard locking activity) or an RCOM (recovery message sent as part of
4113 lockspace recovery). */
4114
4115 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4116 {
4117 struct dlm_header *hd = &p->header;
4118 struct dlm_ls *ls;
4119 int type = 0;
4120
4121 switch (hd->h_cmd) {
4122 case DLM_MSG:
4123 dlm_message_in(&p->message);
4124 type = p->message.m_type;
4125 break;
4126 case DLM_RCOM:
4127 dlm_rcom_in(&p->rcom);
4128 type = p->rcom.rc_type;
4129 break;
4130 default:
4131 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4132 return;
4133 }
4134
4135 if (hd->h_nodeid != nodeid) {
4136 log_print("invalid h_nodeid %d from %d lockspace %x",
4137 hd->h_nodeid, nodeid, hd->h_lockspace);
4138 return;
4139 }
4140
4141 ls = dlm_find_lockspace_global(hd->h_lockspace);
4142 if (!ls) {
4143 if (dlm_config.ci_log_debug)
4144 log_print("invalid lockspace %x from %d cmd %d type %d",
4145 hd->h_lockspace, nodeid, hd->h_cmd, type);
4146
4147 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4148 dlm_send_ls_not_ready(nodeid, &p->rcom);
4149 return;
4150 }
4151
4152 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4153 be inactive (in this ls) before transitioning to recovery mode */
4154
4155 down_read(&ls->ls_recv_active);
4156 if (hd->h_cmd == DLM_MSG)
4157 dlm_receive_message(ls, &p->message, nodeid);
4158 else
4159 dlm_receive_rcom(ls, &p->rcom, nodeid);
4160 up_read(&ls->ls_recv_active);
4161
4162 dlm_put_lockspace(ls);
4163 }
4164
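/* Illustrative sketch, an editor addition that is not part of lock.c:
   dlm_receive_buffer() relies on every packet starting with a common
   header, so one union can be decoded by inspecting h_cmd first. The
   same idiom in minimal standalone form, with toy types: */

struct toy_header { int h_cmd; };
struct toy_msg    { struct toy_header header; int m_type; };
struct toy_rcom   { struct toy_header header; int rc_type; };
union toy_packet {
	struct toy_header header;	/* valid for every variant */
	struct toy_msg    message;
	struct toy_rcom   rcom;
};

static int toy_packet_type(union toy_packet *p, int cmd_msg, int cmd_rcom)
{
	if (p->header.h_cmd == cmd_msg)
		return p->message.m_type;
	if (p->header.h_cmd == cmd_rcom)
		return p->rcom.rc_type;
	return -1;			/* unknown h_cmd, caller drops it */
}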
4165static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4166 struct dlm_message *ms_stub)
4167{
4168 if (middle_conversion(lkb)) {
4169 hold_lkb(lkb);
4170 memset(ms_stub, 0, sizeof(struct dlm_message));
4171 ms_stub->m_flags = DLM_IFL_STUB_MS;
4172 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4173 ms_stub->m_result = -EINPROGRESS;
4174 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4175 _receive_convert_reply(lkb, ms_stub);
4176
4177 /* Same special case as in receive_rcom_lock_args() */
4178 lkb->lkb_grmode = DLM_LOCK_IV;
4179 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4180 unhold_lkb(lkb);
4181
4182 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4183 lkb->lkb_flags |= DLM_IFL_RESEND;
4184 }
4185
4186 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4187 conversions are async; there's no reply from the remote master */
4188}
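
/* Illustrative sketch, an editor addition that is not part of lock.c:
   the stub-reply pattern used above and again in
   dlm_recover_waiters_pre() -- fake a reply from the dead node so the
   normal receive path completes the waiting operation. Expressed as a
   hypothetical helper over the real message fields: */

static void fake_reply(struct dlm_message *ms_stub, struct dlm_lkb *lkb,
		       int type, int result)
{
	memset(ms_stub, 0, sizeof(struct dlm_message));
	ms_stub->m_flags = DLM_IFL_STUB_MS;	/* mark as locally built */
	ms_stub->m_type = type;			/* e.g. DLM_MSG_CONVERT_REPLY */
	ms_stub->m_result = result;		/* e.g. -EINPROGRESS */
	ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
}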
4189
4190/* A waiting lkb needs recovery if the master node has failed, or
4191 the master node is changing (only when no directory is used) */
4192
4193 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4194 int dir_nodeid)
4195 {
4196 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4197 return 1;
4198
4199 if (!dlm_no_directory(ls))
4200 return 0;
4201
4202 if (dir_nodeid == dlm_our_nodeid())
4203 return 1;
4204
4205 if (dir_nodeid != lkb->lkb_wait_nodeid)
4206 return 1;
4207
4208 return 0;
4209}
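
/* Editor note, not part of lock.c: waiter_needs_recovery() reduces to
   this decision table:

     wait_nodeid was removed from the lockspace    -> recover (1)
     otherwise, a directory is in use              -> no recovery (0)
     no directory, dir_nodeid is our node          -> recover (1)
     no directory, dir_nodeid != wait_nodeid       -> recover (1)
     otherwise                                     -> no recovery (0)  */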
4210
4211/* Recovery for locks that are waiting for replies from nodes that are now
4212 gone. We can just complete unlocks and cancels by faking a reply from the
4213 dead node. Requests and up-conversions we flag to be resent after
4214 recovery. Down-conversions can just be completed with a fake reply like
4215 unlocks. Conversions between PR and CW need special attention. */
4216
4217void dlm_recover_waiters_pre(struct dlm_ls *ls)
4218{
4219 struct dlm_lkb *lkb, *safe;
4220 struct dlm_message *ms_stub;
4221 int wait_type, stub_unlock_result, stub_cancel_result;
4222 int dir_nodeid;
4223
4224 ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4225 if (!ms_stub) {
4226 log_error(ls, "dlm_recover_waiters_pre no mem");
4227 return;
4228 }
4229
4230 mutex_lock(&ls->ls_waiters_mutex);
4231
4232 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4233
4234 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
4235
4236 /* exclude debug messages about unlocks because there can be so
4237 many and they aren't very interesting */
4238
4239 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4240 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4241 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
4242 lkb->lkb_id,
4243 lkb->lkb_remid,
4244 lkb->lkb_wait_type,
4245 lkb->lkb_resource->res_nodeid,
4246 lkb->lkb_nodeid,
4247 lkb->lkb_wait_nodeid,
4248 dir_nodeid);
4249 }
4250
4251 /* all outstanding lookups, regardless of destination will be
4252 resent after recovery is done */
4253
4254 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4255 lkb->lkb_flags |= DLM_IFL_RESEND;
4256 continue;
4257 }
4258
4259 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4260 continue;
4261
4262 wait_type = lkb->lkb_wait_type;
4263 stub_unlock_result = -DLM_EUNLOCK;
4264 stub_cancel_result = -DLM_ECANCEL;
4265
4266 /* Main reply may have been received leaving a zero wait_type,
4267 but a reply for the overlapping op may not have been
4268 received. In that case we need to fake the appropriate
4269 reply for the overlap op. */
4270
4271 if (!wait_type) {
4272 if (is_overlap_cancel(lkb)) {
4273 wait_type = DLM_MSG_CANCEL;
4274 if (lkb->lkb_grmode == DLM_LOCK_IV)
4275 stub_cancel_result = 0;
4276 }
4277 if (is_overlap_unlock(lkb)) {
4278 wait_type = DLM_MSG_UNLOCK;
4279 if (lkb->lkb_grmode == DLM_LOCK_IV)
4280 stub_unlock_result = -ENOENT;
4281 }
4282
4283 log_debug(ls, "rwpre overlap %x %x %d %d %d",
4284 lkb->lkb_id, lkb->lkb_flags, wait_type,
4285 stub_cancel_result, stub_unlock_result);
4286 }
4287
4288 switch (wait_type) {
4289
4290 case DLM_MSG_REQUEST:
4291 lkb->lkb_flags |= DLM_IFL_RESEND;
4292 break;
4293
4294 case DLM_MSG_CONVERT:
4295 recover_convert_waiter(ls, lkb, ms_stub);
4296 break;
4297
4298 case DLM_MSG_UNLOCK:
4299 hold_lkb(lkb);
4300 memset(ms_stub, 0, sizeof(struct dlm_message));
4301 ms_stub->m_flags = DLM_IFL_STUB_MS;
4302 ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4303 ms_stub->m_result = stub_unlock_result;
4304 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4305 _receive_unlock_reply(lkb, ms_stub);
4306 dlm_put_lkb(lkb);
4307 break;
4308
4309 case DLM_MSG_CANCEL:
4310 hold_lkb(lkb);
4311 memset(ms_stub, 0, sizeof(struct dlm_message));
4312 ms_stub->m_flags = DLM_IFL_STUB_MS;
4313 ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4314 ms_stub->m_result = stub_cancel_result;
4315 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4316 _receive_cancel_reply(lkb, ms_stub);
4317 dlm_put_lkb(lkb);
4318 break;
4319
4320 default:
4321 log_error(ls, "invalid lkb wait_type %d %d",
4322 lkb->lkb_wait_type, wait_type);
4323 }
4324 schedule();
4325 }
4326 mutex_unlock(&ls->ls_waiters_mutex);
4327 kfree(ms_stub);
4328}
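
/* Editor note, not part of lock.c: the stub results faked above for a
   dead master, by outstanding overlap op:

     op        lkb_grmode == DLM_LOCK_IV       otherwise
     CANCEL    0 (nothing was ever granted)    -DLM_ECANCEL
     UNLOCK    -ENOENT (nothing to unlock)     -DLM_EUNLOCK

   REQUEST and up-CONVERT are instead flagged DLM_IFL_RESEND and
   reissued later by dlm_recover_waiters_post(). */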
4329
4330 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4331{
4332 struct dlm_lkb *lkb;
4333 int found = 0;
4334
4335 mutex_lock(&ls->ls_waiters_mutex);
4336 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4337 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4338 hold_lkb(lkb);
4339 found = 1;
4340 break;
4341 }
4342 }
4343 mutex_unlock(&ls->ls_waiters_mutex);
4344
4345 if (!found)
4346 lkb = NULL;
4347 return lkb;
4348}
4349
4350/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
4351 master or dir-node for r. Processing the lkb may result in it being placed
4352 back on waiters. */
4353
4354/* We do this after normal locking has been enabled and any saved messages
4355 (in requestqueue) have been processed. We should be confident that at
4356 this point we won't get or process a reply to any of these waiting
4357 operations. But, new ops may be coming in on the rsbs/locks here from
4358 userspace or remotely. */
4359
4360 /* there may have been an overlap unlock/cancel prior to recovery or after
4361 recovery. if before, the lkb may still have a positive wait_count; if after,
4362 the overlap flag would just have been set and nothing new sent. we can be
4363 confident here that any replies to either the initial op or overlap ops
4364 prior to recovery have been received. */
4365
4366int dlm_recover_waiters_post(struct dlm_ls *ls)
4367{
4368 struct dlm_lkb *lkb;
4369 struct dlm_rsb *r;
4370 int error = 0, mstype, err, oc, ou;
4371
4372 while (1) {
4373 if (dlm_locking_stopped(ls)) {
4374 log_debug(ls, "recover_waiters_post aborted");
4375 error = -EINTR;
4376 break;
4377 }
4378
4379 lkb = find_resend_waiter(ls);
4380 if (!lkb)
4381 break;
4382
4383 r = lkb->lkb_resource;
4384 hold_rsb(r);
4385 lock_rsb(r);
4386
4387 mstype = lkb->lkb_wait_type;
4388 oc = is_overlap_cancel(lkb);
4389 ou = is_overlap_unlock(lkb);
4390 err = 0;
4391
4392 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4393 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
4394 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
4395 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
4396 dlm_dir_nodeid(r), oc, ou);
4397
4398 /* At this point we assume that we won't get a reply to any
4399 previous op or overlap op on this lock. First, do a big
4400 remove_from_waiters() for all previous ops. */
4401
4402 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4403 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4404 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4405 lkb->lkb_wait_type = 0;
4406 lkb->lkb_wait_count = 0;
4407 mutex_lock(&ls->ls_waiters_mutex);
4408 list_del_init(&lkb->lkb_wait_reply);
4409 mutex_unlock(&ls->ls_waiters_mutex);
4410 unhold_lkb(lkb); /* for waiters list */
4411
4412 if (oc || ou) {
4413 /* do an unlock or cancel instead of resending */
4414 switch (mstype) {
4415 case DLM_MSG_LOOKUP:
4416 case DLM_MSG_REQUEST:
4417 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4418 -DLM_ECANCEL);
4419 unhold_lkb(lkb); /* undoes create_lkb() */
4420 break;
4421 case DLM_MSG_CONVERT:
4422 if (oc) {
4423 queue_cast(r, lkb, -DLM_ECANCEL);
4424 } else {
4425 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4426 _unlock_lock(r, lkb);
4427 }
4428 break;
4429 default:
4430 err = 1;
4431 }
4432 } else {
4433 switch (mstype) {
4434 case DLM_MSG_LOOKUP:
4435 case DLM_MSG_REQUEST:
4436 _request_lock(r, lkb);
4437 if (is_master(r))
4438 confirm_master(r, 0);
4439 break;
4440 case DLM_MSG_CONVERT:
4441 _convert_lock(r, lkb);
4442 break;
4443 default:
4444 err = 1;
4445 }
4446 }
4447
4448 if (err) {
4449 log_error(ls, "waiter %x msg %d r_nodeid %d "
4450 "dir_nodeid %d overlap %d %d",
4451 lkb->lkb_id, mstype, r->res_nodeid,
4452 dlm_dir_nodeid(r), oc, ou);
4453 }
4454 unlock_rsb(r);
4455 put_rsb(r);
4456 dlm_put_lkb(lkb);
4457 }
4458
4459 return error;
4460}
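
/* Editor note, not part of lock.c: the resolution matrix used above for
   a waiter marked RESEND, where an overlapping cancel (oc) or unlock
   (ou) was requested in the meantime:

     wait_type        oc/ou   action
     LOOKUP/REQUEST   either  cast -DLM_EUNLOCK (ou) or -DLM_ECANCEL (oc),
                              drop the create_lkb() reference
     CONVERT          oc      cast -DLM_ECANCEL (lock reverts to grmode)
     CONVERT          ou      force-unlock instead (_unlock_lock)
     LOOKUP/REQUEST   none    _request_lock(), reconfirm master if local
     CONVERT          none    _convert_lock()                          */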
4461
4462static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4463 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4464{
4465 struct dlm_ls *ls = r->res_ls;
4466 struct dlm_lkb *lkb, *safe;
4467
4468 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4469 if (test(ls, lkb)) {
4470 rsb_set_flag(r, RSB_LOCKS_PURGED);
4471 del_lkb(r, lkb);
4472 /* this put should free the lkb */
4473 if (!dlm_put_lkb(lkb))
4474 log_error(ls, "purged lkb not released");
4475 }
4476 }
4477}
4478
4479static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4480{
4481 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4482}
4483
4484static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4485{
4486 return is_master_copy(lkb);
4487}
4488
4489static void purge_dead_locks(struct dlm_rsb *r)
4490{
4491 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4492 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4493 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4494}
4495
4496void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4497{
4498 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4499 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4500 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4501}
4502
4503/* Get rid of locks held by nodes that are gone. */
4504
4505int dlm_purge_locks(struct dlm_ls *ls)
4506{
4507 struct dlm_rsb *r;
4508
4509 log_debug(ls, "dlm_purge_locks");
4510
4511 down_write(&ls->ls_root_sem);
4512 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4513 hold_rsb(r);
4514 lock_rsb(r);
4515 if (is_master(r))
4516 purge_dead_locks(r);
4517 unlock_rsb(r);
4518 unhold_rsb(r);
4519
4520 schedule();
4521 }
4522 up_write(&ls->ls_root_sem);
4523
4524 return 0;
4525}
4526
4527static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4528{
4529 struct rb_node *n;
4530 struct dlm_rsb *r, *r_ret = NULL;
4531
4532 spin_lock(&ls->ls_rsbtbl[bucket].lock);
4533 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
4534 r = rb_entry(n, struct dlm_rsb, res_hashnode);
4535 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4536 continue;
4537 hold_rsb(r);
4538 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4539 r_ret = r;
4540 break;
4541 }
4542 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4543 return r_ret;
4544}
4545
4546void dlm_grant_after_purge(struct dlm_ls *ls)
4547{
4548 struct dlm_rsb *r;
4549 int bucket = 0;
4550
4551 while (1) {
4552 r = find_purged_rsb(ls, bucket);
4553 if (!r) {
4554 if (bucket == ls->ls_rsbtbl_size - 1)
4555 break;
4556 bucket++;
4557 continue;
4558 }
4559 lock_rsb(r);
4560 if (is_master(r)) {
4561 grant_pending_locks(r);
4562 confirm_master(r, 0);
4563 }
4564 unlock_rsb(r);
4565 put_rsb(r);
4566 schedule();
4567 }
4568}
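
/* Illustrative sketch, an editor addition that is not part of lock.c:
   dlm_grant_after_purge() restarts its search from the table on every
   iteration so no table lock is held while an rsb is processed. The
   shape of that idiom in standalone form, with hypothetical names: */

static void scan_flagged(int nbuckets,
			 void *(*pick_flagged)(int bucket),
			 void (*process)(void *item))
{
	int bucket = 0;
	void *item;

	while (1) {
		item = pick_flagged(bucket);	/* clears flag, takes a ref */
		if (!item) {
			if (bucket == nbuckets - 1)
				break;		/* table fully drained */
			bucket++;
			continue;
		}
		process(item);			/* may block; no table lock held */
	}
}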
4569
4570static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4571 uint32_t remid)
4572{
4573 struct dlm_lkb *lkb;
4574
4575 list_for_each_entry(lkb, head, lkb_statequeue) {
4576 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4577 return lkb;
4578 }
4579 return NULL;
4580}
4581
4582static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4583 uint32_t remid)
4584{
4585 struct dlm_lkb *lkb;
4586
4587 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4588 if (lkb)
4589 return lkb;
4590 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4591 if (lkb)
4592 return lkb;
4593 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4594 if (lkb)
4595 return lkb;
4596 return NULL;
4597}
4598
4599 /* needs at least dlm_rcom + rcom_lock */
4600static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4601 struct dlm_rsb *r, struct dlm_rcom *rc)
4602{
4603 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4604
4605 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4606 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4607 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4608 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4609 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4610 lkb->lkb_flags |= DLM_IFL_MSTCPY;
4611 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4612 lkb->lkb_rqmode = rl->rl_rqmode;
4613 lkb->lkb_grmode = rl->rl_grmode;
4614 /* don't set lkb_status because add_lkb wants to itself */
4615
4616 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4617 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
4618
4619 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4620 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4621 sizeof(struct rcom_lock);
4622 if (lvblen > ls->ls_lvblen)
4623 return -EINVAL;
4624 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4625 if (!lkb->lkb_lvbptr)
4626 return -ENOMEM;
4627 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4628 }
4629
4630 /* Conversions between PR and CW (middle modes) need special handling.
4631 The real granted mode of these converting locks cannot be determined
4632 until all locks have been rebuilt on the rsb (recover_conversion) */
4633
4634 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4635 middle_conversion(lkb)) {
4636 rl->rl_status = DLM_LKSTS_CONVERT;
4637 lkb->lkb_grmode = DLM_LOCK_IV;
4638 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4639 }
4640
4641 return 0;
4642}
4643
4644/* This lkb may have been recovered in a previous aborted recovery so we need
4645 to check if the rsb already has an lkb with the given remote nodeid/lkid.
4646 If so we just send back a standard reply. If not, we create a new lkb with
4647 the given values and send back our lkid. We send back our lkid by sending
4648 back the rcom_lock struct we got but with the remid field filled in. */
4649
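/* Illustrative sketch, an editor addition that is not part of lock.c:
   the id exchange described above, modeled standalone. The requesting
   node sends its lock id in rl_lkid; the new master finds a copy left
   by an earlier aborted recovery or creates one, and writes its own id
   into rl_remid of the same buffer, so each side ends up holding the
   other's identifier. All names here are hypothetical. */

struct toy_rcom_lock { unsigned int rl_lkid, rl_remid; };

static void toy_recover_master_copy(struct toy_rcom_lock *rl,
				    unsigned int (*find)(unsigned int remid),
				    unsigned int (*create)(void))
{
	unsigned int lkid = find(rl->rl_lkid);	/* search_remid() analogue */

	if (!lkid)
		lkid = create();		/* create_lkb() analogue */
	rl->rl_remid = lkid;			/* sent back to the holder */
}
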
4650 /* needs at least dlm_rcom + rcom_lock */
4651int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4652{
4653 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4654 struct dlm_rsb *r;
4655 struct dlm_lkb *lkb;
4656 int error;
4657
4658 if (rl->rl_parent_lkid) {
4659 error = -EOPNOTSUPP;
4660 goto out;
4661 }
4662
4663 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4664 R_MASTER, &r);
4665 if (error)
4666 goto out;
4667
4668 lock_rsb(r);
4669
4670 lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4671 if (lkb) {
4672 error = -EEXIST;
4673 goto out_remid;
4674 }
4675
4676 error = create_lkb(ls, &lkb);
4677 if (error)
4678 goto out_unlock;
4679
4680 error = receive_rcom_lock_args(ls, lkb, r, rc);
4681 if (error) {
4682 __put_lkb(ls, lkb);
4683 goto out_unlock;
4684 }
4685
4686 attach_lkb(r, lkb);
4687 add_lkb(r, lkb, rl->rl_status);
4688 error = 0;
4689
4690 out_remid:
4691 /* this is the new value returned to the lock holder for
4692 saving in its process-copy lkb */
4693 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4694
4695 out_unlock:
4696 unlock_rsb(r);
4697 put_rsb(r);
4698 out:
4699 if (error)
4700 log_debug(ls, "recover_master_copy %d %x", error,
4701 le32_to_cpu(rl->rl_lkid));
4702 rl->rl_result = cpu_to_le32(error);
4703 return error;
4704}
4705
4706 /* needs at least dlm_rcom + rcom_lock */
4707int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4708{
4709 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4710 struct dlm_rsb *r;
4711 struct dlm_lkb *lkb;
4712 int error;
4713
4714 error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4715 if (error) {
4716 log_error(ls, "recover_process_copy no lkid %x",
4717 le32_to_cpu(rl->rl_lkid));
4718 return error;
4719 }
4720
4721 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4722
4723 error = le32_to_cpu(rl->rl_result);
4724
4725 r = lkb->lkb_resource;
4726 hold_rsb(r);
4727 lock_rsb(r);
4728
4729 switch (error) {
4730 case -EBADR:
4731 /* There's a chance the new master received our lock before
4732 dlm_recover_master_reply(), this wouldn't happen if we did
4733 a barrier between recover_masters and recover_locks. */
4734 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4735 (unsigned long)r, r->res_name);
4736 dlm_send_rcom_lock(r, lkb);
4737 goto out;
4738 case -EEXIST:
4739 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4740 /* fall through */
4741 case 0:
4742 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4743 break;
4744 default:
4745 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4746 error, lkb->lkb_id);
4747 }
4748
4749 /* an ack for dlm_recover_locks() which waits for replies from
4750 all the locks it sends to new masters */
4751 dlm_recovered_lock(r);
4752 out:
4753 unlock_rsb(r);
4754 put_rsb(r);
4755 dlm_put_lkb(lkb);
4756
4757 return 0;
4758}
4759
4760int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4761 int mode, uint32_t flags, void *name, unsigned int namelen,
4762 unsigned long timeout_cs)
4763{
4764 struct dlm_lkb *lkb;
4765 struct dlm_args args;
4766 int error;
4767
4768 dlm_lock_recovery(ls);
4769
4770 error = create_lkb(ls, &lkb);
4771 if (error) {
4772 kfree(ua);
4773 goto out;
4774 }
4775
4776 if (flags & DLM_LKF_VALBLK) {
4777 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4778 if (!ua->lksb.sb_lvbptr) {
4779 kfree(ua);
4780 __put_lkb(ls, lkb);
4781 error = -ENOMEM;
4782 goto out;
4783 }
4784 }
4785
4786 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4787 When DLM_IFL_USER is set, the dlm knows that this is a userspace
4788 lock and that lkb_astparam is the dlm_user_args structure. */
4789
4790 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4791 fake_astfn, ua, fake_bastfn, &args);
4792 lkb->lkb_flags |= DLM_IFL_USER;
4793
4794 if (error) {
4795 __put_lkb(ls, lkb);
4796 goto out;
4797 }
4798
4799 error = request_lock(ls, lkb, name, namelen, &args);
4800
4801 switch (error) {
4802 case 0:
4803 break;
4804 case -EINPROGRESS:
4805 error = 0;
4806 break;
4807 case -EAGAIN:
4808 error = 0;
4809 /* fall through */
4810 default:
4811 __put_lkb(ls, lkb);
4812 goto out;
4813 }
4814
4815 /* add this new lkb to the per-process list of locks */
4816 spin_lock(&ua->proc->locks_spin);
4817 hold_lkb(lkb);
4818 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4819 spin_unlock(&ua->proc->locks_spin);
4820 out:
4821 dlm_unlock_recovery(ls);
4822 return error;
4823}
4824
4825int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4826 int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4827 unsigned long timeout_cs)
4828{
4829 struct dlm_lkb *lkb;
4830 struct dlm_args args;
4831 struct dlm_user_args *ua;
4832 int error;
4833
4834 dlm_lock_recovery(ls);
4835
4836 error = find_lkb(ls, lkid, &lkb);
4837 if (error)
4838 goto out;
4839
4840 /* user can change the params on its lock when it converts it, or
4841 add an lvb that didn't exist before */
4842
4843 ua = lkb->lkb_ua;
4844
4845 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4846 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4847 if (!ua->lksb.sb_lvbptr) {
4848 error = -ENOMEM;
4849 goto out_put;
4850 }
4851 }
4852 if (lvb_in && ua->lksb.sb_lvbptr)
4853 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4854
4855 ua->xid = ua_tmp->xid;
4856 ua->castparam = ua_tmp->castparam;
4857 ua->castaddr = ua_tmp->castaddr;
4858 ua->bastparam = ua_tmp->bastparam;
4859 ua->bastaddr = ua_tmp->bastaddr;
4860 ua->user_lksb = ua_tmp->user_lksb;
4861
4862 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4863 fake_astfn, ua, fake_bastfn, &args);
4864 if (error)
4865 goto out_put;
4866
4867 error = convert_lock(ls, lkb, &args);
4868
4869 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4870 error = 0;
4871 out_put:
4872 dlm_put_lkb(lkb);
4873 out:
4874 dlm_unlock_recovery(ls);
4875 kfree(ua_tmp);
4876 return error;
4877}
4878
4879int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4880 uint32_t flags, uint32_t lkid, char *lvb_in)
4881{
4882 struct dlm_lkb *lkb;
4883 struct dlm_args args;
4884 struct dlm_user_args *ua;
4885 int error;
4886
4887 dlm_lock_recovery(ls);
4888
4889 error = find_lkb(ls, lkid, &lkb);
4890 if (error)
4891 goto out;
4892
4893 ua = lkb->lkb_ua;
4894
4895 if (lvb_in && ua->lksb.sb_lvbptr)
4896 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4897 if (ua_tmp->castparam)
4898 ua->castparam = ua_tmp->castparam;
4899 ua->user_lksb = ua_tmp->user_lksb;
4900
4901 error = set_unlock_args(flags, ua, &args);
4902 if (error)
4903 goto out_put;
4904
4905 error = unlock_lock(ls, lkb, &args);
4906
4907 if (error == -DLM_EUNLOCK)
4908 error = 0;
4909 /* from validate_unlock_args() */
4910 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4911 error = 0;
4912 if (error)
4913 goto out_put;
4914
4915 spin_lock(&ua->proc->locks_spin);
4916 /* dlm_user_add_cb() may have already taken lkb off the proc list */
4917 if (!list_empty(&lkb->lkb_ownqueue))
4918 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4919 spin_unlock(&ua->proc->locks_spin);
4920 out_put:
4921 dlm_put_lkb(lkb);
4922 out:
4923 dlm_unlock_recovery(ls);
4924 kfree(ua_tmp);
4925 return error;
4926}
4927
4928int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4929 uint32_t flags, uint32_t lkid)
4930{
4931 struct dlm_lkb *lkb;
4932 struct dlm_args args;
4933 struct dlm_user_args *ua;
4934 int error;
4935
4936 dlm_lock_recovery(ls);
4937
4938 error = find_lkb(ls, lkid, &lkb);
4939 if (error)
4940 goto out;
4941
4942 ua = lkb->lkb_ua;
4943 if (ua_tmp->castparam)
4944 ua->castparam = ua_tmp->castparam;
4945 ua->user_lksb = ua_tmp->user_lksb;
4946
4947 error = set_unlock_args(flags, ua, &args);
4948 if (error)
4949 goto out_put;
4950
4951 error = cancel_lock(ls, lkb, &args);
4952
4953 if (error == -DLM_ECANCEL)
4954 error = 0;
4955 /* from validate_unlock_args() */
4956 if (error == -EBUSY)
4957 error = 0;
4958 out_put:
4959 dlm_put_lkb(lkb);
4960 out:
4961 dlm_unlock_recovery(ls);
4962 kfree(ua_tmp);
4963 return error;
4964}
4965
4966int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4967{
4968 struct dlm_lkb *lkb;
4969 struct dlm_args args;
4970 struct dlm_user_args *ua;
4971 struct dlm_rsb *r;
4972 int error;
4973
4974 dlm_lock_recovery(ls);
4975
4976 error = find_lkb(ls, lkid, &lkb);
4977 if (error)
4978 goto out;
4979
4980 ua = lkb->lkb_ua;
4981
4982 error = set_unlock_args(flags, ua, &args);
4983 if (error)
4984 goto out_put;
4985
4986 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4987
4988 r = lkb->lkb_resource;
4989 hold_rsb(r);
4990 lock_rsb(r);
4991
4992 error = validate_unlock_args(lkb, &args);
4993 if (error)
4994 goto out_r;
4995 lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4996
4997 error = _cancel_lock(r, lkb);
4998 out_r:
4999 unlock_rsb(r);
5000 put_rsb(r);
5001
5002 if (error == -DLM_ECANCEL)
5003 error = 0;
5004 /* from validate_unlock_args() */
5005 if (error == -EBUSY)
5006 error = 0;
5007 out_put:
5008 dlm_put_lkb(lkb);
5009 out:
5010 dlm_unlock_recovery(ls);
5011 return error;
5012}
5013
5014/* lkb's that are removed from the waiters list by revert are just left on the
5015 orphans list with the granted orphan locks, to be freed by purge */
5016
5017static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5018{
5019 struct dlm_args args;
5020 int error;
5021
5022 hold_lkb(lkb);
5023 mutex_lock(&ls->ls_orphans_mutex);
5024 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5025 mutex_unlock(&ls->ls_orphans_mutex);
5026
5027 set_unlock_args(0, lkb->lkb_ua, &args);
5028
5029 error = cancel_lock(ls, lkb, &args);
5030 if (error == -DLM_ECANCEL)
5031 error = 0;
5032 return error;
5033}
5034
5035/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
5036 Regardless of what rsb queue the lock is on, it's removed and freed. */
5037
5038static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5039{
5040 struct dlm_args args;
5041 int error;
5042
5043 set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
5044
5045 error = unlock_lock(ls, lkb, &args);
5046 if (error == -DLM_EUNLOCK)
5047 error = 0;
5048 return error;
5049}
5050
5051 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
5052 (which does lock_rsb) due to deadlock with receiving a message that does
5053 lock_rsb followed by dlm_user_add_cb() */
5054
5055static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5056 struct dlm_user_proc *proc)
5057{
5058 struct dlm_lkb *lkb = NULL;
5059
5060 mutex_lock(&ls->ls_clear_proc_locks);
5061 if (list_empty(&proc->locks))
5062 goto out;
5063
5064 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5065 list_del_init(&lkb->lkb_ownqueue);
5066
5067 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5068 lkb->lkb_flags |= DLM_IFL_ORPHAN;
5069 else
5070 lkb->lkb_flags |= DLM_IFL_DEAD;
5071 out:
5072 mutex_unlock(&ls->ls_clear_proc_locks);
5073 return lkb;
5074}
5075
5076 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
5077 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5078 which we clear here. */
5079
5080/* proc CLOSING flag is set so no more device_reads should look at proc->asts
5081 list, and no more device_writes should add lkb's to proc->locks list; so we
5082 shouldn't need to take asts_spin or locks_spin here. this assumes that
5083 device reads/writes/closes are serialized -- FIXME: we may need to serialize
5084 them ourselves. */
5085
5086void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5087{
5088 struct dlm_lkb *lkb, *safe;
5089
5090 dlm_lock_recovery(ls);
5091
5092 while (1) {
5093 lkb = del_proc_lock(ls, proc);
5094 if (!lkb)
5095 break;
5096 del_timeout(lkb);
5097 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5098 orphan_proc_lock(ls, lkb);
5099 else
5100 unlock_proc_lock(ls, lkb);
5101
5102 /* this removes the reference for the proc->locks list
5103 added by dlm_user_request, it may result in the lkb
5104 being freed */
5105
5106 dlm_put_lkb(lkb);
5107 }
5108
5109 mutex_lock(&ls->ls_clear_proc_locks);
5110
5111 /* in-progress unlocks */
5112 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5113 list_del_init(&lkb->lkb_ownqueue);
5114 lkb->lkb_flags |= DLM_IFL_DEAD;
5115 dlm_put_lkb(lkb);
5116 }
5117
5118 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5119 memset(&lkb->lkb_callbacks, 0,
5120 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5121 list_del_init(&lkb->lkb_cb_list);
5122 dlm_put_lkb(lkb);
5123 }
5124
5125 mutex_unlock(&ls->ls_clear_proc_locks);
5126 dlm_unlock_recovery(ls);
5127 }
5128
5129static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5130{
5131 struct dlm_lkb *lkb, *safe;
5132
5133 while (1) {
5134 lkb = NULL;
5135 spin_lock(&proc->locks_spin);
5136 if (!list_empty(&proc->locks)) {
5137 lkb = list_entry(proc->locks.next, struct dlm_lkb,
5138 lkb_ownqueue);
5139 list_del_init(&lkb->lkb_ownqueue);
5140 }
5141 spin_unlock(&proc->locks_spin);
5142
5143 if (!lkb)
5144 break;
5145
5146 lkb->lkb_flags |= DLM_IFL_DEAD;
5147 unlock_proc_lock(ls, lkb);
5148 dlm_put_lkb(lkb); /* ref from proc->locks list */
5149 }
5150
5151 spin_lock(&proc->locks_spin);
5152 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5153 list_del_init(&lkb->lkb_ownqueue);
5154 lkb->lkb_flags |= DLM_IFL_DEAD;
5155 dlm_put_lkb(lkb);
5156 }
5157 spin_unlock(&proc->locks_spin);
5158
5159 spin_lock(&proc->asts_spin);
5160 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5161 memset(&lkb->lkb_callbacks, 0,
5162 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5163 list_del_init(&lkb->lkb_cb_list);
5164 dlm_put_lkb(lkb);
5165 }
5166 spin_unlock(&proc->asts_spin);
5167}
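
/* Illustrative sketch, an editor addition that is not part of lock.c:
   purge_proc_locks() pops one entry at a time under the spinlock and
   does the real work with the lock dropped, because unlock_proc_lock()
   itself takes sleeping locks. The idiom generalized into a
   hypothetical helper built from real list/spinlock primitives: */

static void drain_locked_list(struct list_head *head, spinlock_t *lock,
			      void (*process)(struct dlm_lkb *lkb))
{
	struct dlm_lkb *lkb;

	while (1) {
		lkb = NULL;
		spin_lock(lock);
		if (!list_empty(head)) {
			lkb = list_entry(head->next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);	/* detach under lock */
		}
		spin_unlock(lock);
		if (!lkb)
			break;
		process(lkb);		/* may sleep; spinlock not held */
	}
}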
5168
5169/* pid of 0 means purge all orphans */
5170
5171static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5172{
5173 struct dlm_lkb *lkb, *safe;
5174
5175 mutex_lock(&ls->ls_orphans_mutex);
5176 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5177 if (pid && lkb->lkb_ownpid != pid)
5178 continue;
5179 unlock_proc_lock(ls, lkb);
5180 list_del_init(&lkb->lkb_ownqueue);
5181 dlm_put_lkb(lkb);
5182 }
5183 mutex_unlock(&ls->ls_orphans_mutex);
5184}
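
/* Illustrative sketch, an editor addition that is not part of lock.c:
   the pid filter used in do_purge() above -- 0 acts as a wildcard that
   matches every orphan, any other value selects one owner's locks. The
   helper name is hypothetical. */

static int purge_matches(int filter_pid, int lkb_ownpid)
{
	return filter_pid == 0 || filter_pid == lkb_ownpid;
}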
5185
5186static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5187{
5188 struct dlm_message *ms;
5189 struct dlm_mhandle *mh;
5190 int error;
5191
5192 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5193 DLM_MSG_PURGE, &ms, &mh);
5194 if (error)
5195 return error;
5196 ms->m_nodeid = nodeid;
5197 ms->m_pid = pid;
5198
5199 return send_message(mh, ms);
5200}
5201
5202int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5203 int nodeid, int pid)
5204{
5205 int error = 0;
5206
5207 if (nodeid != dlm_our_nodeid()) {
5208 error = send_purge(ls, nodeid, pid);
5209 } else {
5210 dlm_lock_recovery(ls);
5211 if (pid == current->pid)
5212 purge_proc_locks(ls, proc);
5213 else
5214 do_purge(ls, nodeid, pid);
5215 dlm_unlock_recovery(ls);
5216 }
5217 return error;
5218}
5219