dlm: improve error and debug messages

fs/dlm/lock.c

/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
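
/* Worked example (an informal sketch of the stages above): a new request
   made on node L for a resource mastered on remote node R flows as:

   dlm_lock()             L: validate args, build the lkb
     request_lock()       L: find and lock the rsb
       _request_lock()    L: rsb is remote, so
         send_request()   L  ->  R: receive_request()
                                 R: do_request(), grant or queue
         receive_request_reply()  <-  R: send_request_reply()

   Had the resource been mastered locally, stage 3 would have called
   do_request() directly instead of send_request(). */
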
#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/* UN   NL  CR  CW  PR  PW  EX  PD*/
	{  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
	{  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
	{  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
	{  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
	{  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
	{  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
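
/* Example readings of dlm_lvb_operations (granted row, requested column):
   converting NL->CR gives 1, so the resource's LVB is copied back to the
   caller; converting PW->NL gives 0, so the caller's LVB is written to
   the resource; converting CR->NL gives -1, so the LVB is untouched.
   In short, writers push LVB data on the way down and readers pull it
   on the way up. */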

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
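
/* Usage sketch (hypothetical caller, values follow from the matrix):

   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR);   returns 1, readers share
   dlm_modes_compat(DLM_LOCK_EX, DLM_LOCK_PR);   returns 0, writer excludes

   modes_compat() is the same test phrased for two lkb's, taking the
   granted mode from the first and the requested mode from the second. */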

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
}

static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list:\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}
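
/* Examples for the two helpers above: EX->PR is a down-conversion
   (rqmode < grmode) and can always be granted in place.  PR->CW and
   CW->PR are the "middle" cases: CW is numerically lower than PR, but
   PR and CW are incompatible with each other in __dlm_compat_matrix,
   so such a conversion may block and can't be treated as a simple
   down-conversion. */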

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL; if a
	   timeout caused the cancel then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}
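
/* The effect of the two rewrites above: a lock that completes with
   -DLM_ECANCEL was cancelled voluntarily, while a cancel generated
   internally is reported as -ETIMEDOUT (lock timeout) or -EDEADLK
   (deadlock resolution), so callers can tell the three apart in
   lksb.sb_status. */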

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb)) {
		send_bast(r, lkb, rqmode);
	} else {
		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static int pre_rsb_struct(struct dlm_ls *ls)
{
	struct dlm_rsb *r1, *r2;
	int count = 0;

	spin_lock(&ls->ls_new_rsb_spin);
	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
		spin_unlock(&ls->ls_new_rsb_spin);
		return 0;
	}
	spin_unlock(&ls->ls_new_rsb_spin);

	r1 = dlm_allocate_rsb(ls);
	r2 = dlm_allocate_rsb(ls);

	spin_lock(&ls->ls_new_rsb_spin);
	if (r1) {
		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	if (r2) {
		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	count = ls->ls_new_rsb_count;
	spin_unlock(&ls->ls_new_rsb_spin);

	if (!count)
		return -ENOMEM;
	return 0;
}
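
/* pre_rsb_struct() is one half of a common kernel pattern (sketched
   here): allocate while no spinlock is held, since dlm_allocate_rsb()
   may sleep, and park the results on ls_new_rsb under ls_new_rsb_spin.
   get_rsb_struct() below can then take an rsb off that list while the
   caller holds a hash-bucket spinlock and must not sleep.  Two are
   allocated per call so the reserve refills faster than the
   one-per-lookup drain. */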

/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
   unlock any spinlocks, go back and call pre_rsb_struct again.
   Otherwise, take an rsb off the list and return it. */

static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
			  struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int count;

	spin_lock(&ls->ls_new_rsb_spin);
	if (list_empty(&ls->ls_new_rsb)) {
		count = ls->ls_new_rsb_count;
		spin_unlock(&ls->ls_new_rsb_spin);
		log_debug(ls, "find_rsb retry %d %d %s",
			  count, dlm_config.ci_new_rsb_count, name);
		return -EAGAIN;
	}

	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
	list_del(&r->res_hashchain);
	/* Convert the empty list_head to a NULL rb_node for tree usage: */
	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
	ls->ls_new_rsb_count--;
	spin_unlock(&ls->ls_new_rsb_spin);

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	*r_ret = r;
	return 0;
}

static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
{
	char maxname[DLM_RESNAME_MAXLEN];

	memset(maxname, 0, DLM_RESNAME_MAXLEN);
	memcpy(maxname, name, nlen);
	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
}
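
/* rsb_cmp() zero-pads the lookup name to DLM_RESNAME_MAXLEN before the
   memcmp(), so names of different lengths still fall into one total
   order (res_name is stored zero-padded the same way).  For example
   "foo" compares as "foo\0\0..." and therefore sorts before "foobar".
   This total order is what lets dlm_search_rsb_tree() and rsb_insert()
   below agree on a node's rb-tree position. */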

int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
			unsigned int flags, struct dlm_rsb **r_ret)
{
	struct rb_node *node = tree->rb_node;
	struct dlm_rsb *r;
	int error = 0;
	int rc;

	while (node) {
		r = rb_entry(node, struct dlm_rsb, res_hashnode);
		rc = rsb_cmp(r, name, len);
		if (rc < 0)
			node = node->rb_left;
		else if (rc > 0)
			node = node->rb_right;
		else
			goto found;
	}
	*r_ret = NULL;
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
{
	struct rb_node **newn = &tree->rb_node;
	struct rb_node *parent = NULL;
	int rc;

	while (*newn) {
		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
					       res_hashnode);

		parent = *newn;
		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
		if (rc < 0)
			newn = &parent->rb_left;
		else if (rc > 0)
			newn = &parent->rb_right;
		else {
			log_print("rsb_insert match");
			dlm_dump_rsb(rsb);
			dlm_dump_rsb(cur);
			return -EEXIST;
		}
	}

	rb_link_node(&rsb->res_hashnode, parent, newn);
	rb_insert_color(&rsb->res_hashnode, tree);
	return 0;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	if (error == -ENOTBLK)
		goto out;

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
	if (error)
		return error;

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash, bucket;
	int error;

	if (namelen > DLM_RESNAME_MAXLEN) {
		error = -EINVAL;
		goto out;
	}

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

 retry:
	if (flags & R_CREATE) {
		error = pre_rsb_struct(ls);
		if (error < 0)
			goto out;
	}

	spin_lock(&ls->ls_rsbtbl[bucket].lock);

	error = _search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out_unlock;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out_unlock;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out_unlock;

	error = get_rsb_struct(ls, name, namelen, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}
	error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 out:
	*r_ret = r;
	return error;
}
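
/* Putting the pieces together: a find_rsb() call with R_CREATE either
   finds an existing rsb (possibly resurrecting it from the toss tree in
   _search_rsb), or consumes a pre-allocated rsb via get_rsb_struct()
   and inserts it into the keep tree.  The retry loop exists because the
   reserve can run dry while the bucket spinlock is held: the lock is
   dropped, pre_rsb_struct() refills the reserve, and the whole search
   is redone from scratch. */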

static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			if (r->res_hash == hash)
				dlm_dump_rsb(r);
		}
		spin_unlock(&ls->ls_rsbtbl[i].lock);
	}
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	int rv, id;

	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);
	INIT_LIST_HEAD(&lkb->lkb_cb_list);
	mutex_init(&lkb->lkb_cb_mutex);
	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);

 retry:
	rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
	if (!rv)
		return -ENOMEM;

	spin_lock(&ls->ls_lkbidr_spin);
	rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
	if (!rv)
		lkb->lkb_id = id;
	spin_unlock(&ls->ls_lkbidr_spin);

	if (rv == -EAGAIN)
		goto retry;

	if (rv < 0) {
		log_error(ls, "create_lkb idr error %d", rv);
		return rv;
	}

	*lkb_ret = lkb;
	return 0;
}
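
/* The retry loop above is the classic two-step idr API of this kernel
   generation: idr_pre_get() preloads nodes (GFP_NOFS, unlocked), then
   idr_get_new_above() assigns an id of 1 or higher under
   ls_lkbidr_spin, returning -EAGAIN if the preload raced away.
   Starting above 0 means a zero lkid is never valid, so it can safely
   act as a "no lock" value. */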

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;

	spin_lock(&ls->ls_lkbidr_spin);
	lkb = idr_find(&ls->ls_lkbidr, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	spin_unlock(&ls->ls_lkbidr_spin);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint32_t lkid = lkb->lkb_id;

	spin_lock(&ls->ls_lkbidr_spin);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		idr_remove(&ls->ls_lkbidr, lkid);
		spin_unlock(&ls->ls_lkbidr_spin);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
		return 1;
	} else {
		spin_unlock(&ls->ls_lkbidr_spin);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
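
/* lkb_add_ordered() keeps a queue sorted by descending mode: the new
   entry is linked just ahead of the first entry whose rqmode is lower.
   If no entry qualifies, the loop runs off the end and
   &lkb->lkb_statequeue is the list head itself, so the __list_add()
   degenerates into a plain list_add_tail(). */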

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_timestamp = ktime_get();

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
	int i;

	for (i = 0; i < num_nodes; i++) {
		if (!warned[i]) {
			warned[i] = nodeid;
			return 0;
		}
		if (warned[i] == nodeid)
			return 1;
	}
	return 0;
}

void dlm_scan_waiters(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	ktime_t zero = ktime_set(0, 0);
	s64 us;
	s64 debug_maxus = 0;
	u32 debug_scanned = 0;
	u32 debug_expired = 0;
	int num_nodes = 0;
	int *warned = NULL;

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (ktime_equal(lkb->lkb_wait_time, zero))
			continue;

		debug_scanned++;

		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));

		if (us < dlm_config.ci_waitwarn_us)
			continue;

		lkb->lkb_wait_time = zero;

		debug_expired++;
		if (us > debug_maxus)
			debug_maxus = us;

		if (!num_nodes) {
			num_nodes = ls->ls_num_nodes;
			warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
		}
		if (!warned)
			continue;
		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
			continue;

		log_error(ls, "waitwarn %x %lld %d us check connection to "
			  "node %d", lkb->lkb_id, (long long)us,
			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
	}
	mutex_unlock(&ls->ls_waiters_mutex);
	kfree(warned);

	if (debug_expired)
		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
			  debug_scanned, debug_expired,
			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
}
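
/* Reading the messages above: "waitwarn <lkid> <us> <limit> us check
   connection to node <n>" marks one lkb that has waited longer than
   ci_waitwarn_us for a remote reply; nodeid_warned() limits this to one
   error per node per scan.  The closing "scan_waiters" debug line then
   summarizes how many waiters were scanned, how many exceeded the
   limit, and the longest wait observed. */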

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	lkb->lkb_wait_time = ktime_get();
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "addwait error %x %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		lkb->lkb_wait_type = 0;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_count--;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
		  lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
		  mstype, lkb->lkb_flags);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype, NULL);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type, ms);
	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: make this more efficient */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		spin_lock(&ls->ls_rsbtbl[b].lock);
		for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
			spin_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			dlm_free_rsb(r);
			count++;
		} else {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		if (dlm_locking_stopped(ls))
			break;
		cond_resched();
	}
}

static void add_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	if (is_master_copy(lkb))
		return;

	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
		goto add_it;
	}
	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
		goto add_it;
	return;

 add_it:
	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
	mutex_lock(&ls->ls_timeout_mutex);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
	mutex_unlock(&ls->ls_timeout_mutex);
}

static void del_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_timeout_mutex);
	if (!list_empty(&lkb->lkb_time_list)) {
		list_del_init(&lkb->lkb_time_list);
		unhold_lkb(lkb);
	}
	mutex_unlock(&ls->ls_timeout_mutex);
}

/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout.  We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;
	s64 wait_us;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

			wait_us = ktime_to_us(ktime_sub(ktime_get(),
							lkb->lkb_timestamp));

			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    wait_us >= (lkb->lkb_timeout_cs * 10000))
				do_cancel = 1;

			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		dlm_put_lkb(lkb);
	}
}

/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
   dlm_recoverd before checking/setting ls_recover_begin. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);

	ls->ls_recover_begin = 0;
	mutex_lock(&ls->ls_timeout_mutex);
	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
	mutex_unlock(&ls->ls_timeout_mutex);

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (ktime_to_us(lkb->lkb_wait_time))
			lkb->lkb_wait_time = ktime_get();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}
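
/* Arithmetic sketch of the adjustment above: if recovery ran for, say,
   3 seconds, every pending timeout timestamp is moved forward by about
   3,000,000 us, so the recovery outage doesn't count against a lock's
   timeout budget; each in-flight waiter clock is simply restarted at
   the current time. */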

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		if (len > DLM_RESNAME_MAXLEN)
			len = DLM_RESNAME_MAXLEN;
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}
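
/* Example of the b-values in action for a process copy: a convert
   NL->CR has dlm_lvb_operations[1][2] == 1, so the master's reply
   carries the resource LVB in ms->m_extra and it is copied into the
   caller's buffer here; a convert PW->NL (b == 0) sends LVB data the
   other way, so the process copy has nothing to pull from the reply. */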

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

/* returns: 0 did nothing
	    1 moved lock to granted
	   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb)
{
	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
	    ms->m_type != DLM_MSG_GRANT) {
		log_print("munge_altmode %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
		lkb->lkb_rqmode = DLM_LOCK_PR;
	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
		lkb->lkb_rqmode = DLM_LOCK_CW;
	else {
		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
		dlm_print_lkb(lkb);
	}
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}
1613
1614/*
1615 * "A conversion deadlock arises with a pair of lock requests in the converting
1616 * queue for one resource. The granted mode of each lock blocks the requested
1617 * mode of the other lock."
1618 *
c85d65e9
DT
1619 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1620 * convert queue from being granted, then deadlk/demote lkb.
e7fd4179
DT
1621 *
1622 * Example:
1623 * Granted Queue: empty
1624 * Convert Queue: NL->EX (first lock)
1625 * PR->EX (second lock)
1626 *
1627 * The first lock can't be granted because of the granted mode of the second
1628 * lock and the second lock can't be granted because it's not first in the
c85d65e9
DT
1629 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1630 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1631 * flag set and return DEMOTED in the lksb flags.
e7fd4179 1632 *
c85d65e9
DT
1633 * Originally, this function detected conv-deadlk in a more limited scope:
1634 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1635 * - if lkb1 was the first entry in the queue (not just earlier), and was
1636 * blocked by the granted mode of lkb2, and there was nothing on the
1637 * granted queue preventing lkb1 from being granted immediately, i.e.
1638 * lkb2 was the only thing preventing lkb1 from being granted.
1639 *
1640 * That second condition meant we'd only say there was conv-deadlk if
1641 * resolving it (by demotion) would lead to the first lock on the convert
1642 * queue being granted right away. It allowed conversion deadlocks to exist
1643 * between locks on the convert queue while they couldn't be granted anyway.
1644 *
1645 * Now, we detect and take action on conversion deadlocks immediately when
1646 * they're created, even if they may not be immediately consequential. If
1647 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1648 * mode that would prevent lkb1's conversion from being granted, we do a
1649 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1650 * I think this means that the lkb_is_ahead condition below should always
1651 * be zero, i.e. there will never be conv-deadlk between two locks that are
1652 * both already on the convert queue.
e7fd4179
DT
1653 */
1654
c85d65e9 1655static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
e7fd4179 1656{
c85d65e9
DT
1657 struct dlm_lkb *lkb1;
1658 int lkb_is_ahead = 0;
e7fd4179 1659
c85d65e9
DT
1660 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1661 if (lkb1 == lkb2) {
1662 lkb_is_ahead = 1;
e7fd4179
DT
1663 continue;
1664 }
1665
c85d65e9
DT
1666 if (!lkb_is_ahead) {
1667 if (!modes_compat(lkb2, lkb1))
1668 return 1;
1669 } else {
1670 if (!modes_compat(lkb2, lkb1) &&
1671 !modes_compat(lkb1, lkb2))
1672 return 1;
1673 }
e7fd4179 1674 }
90135925 1675 return 0;
e7fd4179
DT
1676}
1677
1678/*
1679 * Return 1 if the lock can be granted, 0 otherwise.
1680 * Also detect and resolve conversion deadlocks.
1681 *
1682 * lkb is the lock to be granted
1683 *
1684 * now is 1 if the function is being called in the context of the
1685 * immediate request, it is 0 if called later, after the lock has been
1686 * queued.
1687 *
1688 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1689 */
1690
1691static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1692{
1693 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1694
1695 /*
1696 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1697 * a new request for a NL mode lock being blocked.
1698 *
1699 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1700 * request, then it would be granted. In essence, the use of this flag
1701 * tells the Lock Manager to expedite theis request by not considering
1702 * what may be in the CONVERTING or WAITING queues... As of this
1703 * writing, the EXPEDITE flag can be used only with new requests for NL
1704 * mode locks. This flag is not valid for conversion requests.
1705 *
1706 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1707 * conversion or used with a non-NL requested mode. We also know an
1708 * EXPEDITE request is always granted immediately, so now must always
1709 * be 1. The full condition to grant an expedite request: (now &&
1710 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1711 * therefore be shortened to just checking the flag.
1712 */
1713
1714 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
90135925 1715 return 1;
e7fd4179
DT
1716
1717 /*
1718 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1719 * added to the remaining conditions.
1720 */
1721
1722 if (queue_conflict(&r->res_grantqueue, lkb))
1723 goto out;
1724
1725 /*
1726 * 6-3: By default, a conversion request is immediately granted if the
1727 * requested mode is compatible with the modes of all other granted
1728 * locks
1729 */
1730
1731 if (queue_conflict(&r->res_convertqueue, lkb))
1732 goto out;
1733
1734 /*
1735 * 6-5: But the default algorithm for deciding whether to grant or
1736 * queue conversion requests does not by itself guarantee that such
1737 * requests are serviced on a "first come first serve" basis. This, in
1738 * turn, can lead to a phenomenon known as "indefinate postponement".
1739 *
1740 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1741 * the system service employed to request a lock conversion. This flag
1742 * forces certain conversion requests to be queued, even if they are
1743 * compatible with the granted modes of other locks on the same
1744 * resource. Thus, the use of this flag results in conversion requests
1745 * being ordered on a "first come first servce" basis.
1746 *
1747 * DCT: This condition is all about new conversions being able to occur
1748 * "in place" while the lock remains on the granted queue (assuming
1749 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1750 * doesn't _have_ to go onto the convert queue where it's processed in
1751 * order. The "now" variable is necessary to distinguish converts
1752 * being received and processed for the first time now, because once a
1753 * convert is moved to the conversion queue the condition below applies
1754 * requiring fifo granting.
1755 */
1756
1757 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
90135925 1758 return 1;
e7fd4179 1759
53ad1c98
DT
1760 /*
1761 * Even if the convert is compat with all granted locks,
1762 * QUECVT forces it behind other locks on the convert queue.
1763 */
1764
1765 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
1766 if (list_empty(&r->res_convertqueue))
1767 return 1;
1768 else
1769 goto out;
1770 }
1771
e7fd4179 1772 /*
3bcd3687
DT
1773 * The NOORDER flag is set to avoid the standard vms rules on grant
1774 * order.
e7fd4179
DT
1775 */
1776
1777 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
90135925 1778 return 1;
e7fd4179
DT
1779
1780 /*
1781 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1782 * granted until all other conversion requests ahead of it are granted
1783 * and/or canceled.
1784 */
1785
1786 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
90135925 1787 return 1;
e7fd4179
DT
1788
1789 /*
1790 * 6-4: By default, a new request is immediately granted only if all
1791 * three of the following conditions are satisfied when the request is
1792 * issued:
1793 * - The queue of ungranted conversion requests for the resource is
1794 * empty.
1795 * - The queue of ungranted new requests for the resource is empty.
1796 * - The mode of the new request is compatible with the most
1797 * restrictive mode of all granted locks on the resource.
1798 */
1799
1800 if (now && !conv && list_empty(&r->res_convertqueue) &&
1801 list_empty(&r->res_waitqueue))
1802 		return 1;
1803
1804 /*
1805 * 6-4: Once a lock request is in the queue of ungranted new requests,
1806 * it cannot be granted until the queue of ungranted conversion
1807 * requests is empty, all ungranted new requests ahead of it are
1808 * granted and/or canceled, and it is compatible with the granted mode
1809 * of the most restrictive lock granted on the resource.
1810 */
1811
1812 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1813 first_in_list(lkb, &r->res_waitqueue))
1814 		return 1;
1815  out:
1816 	return 0;
1817}
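/* Illustrative sketch (an assumption, not part of lock.c): how grant
 * tests like the ones above consult the compatibility matrix.  Modes
 * are offset by +1 when indexing because DLM_LOCK_IV is -1:
 *
 *	static inline int example_grmodes_compat(int grmode, int rqmode)
 *	{
 *		return __dlm_compat_matrix[grmode + 1][rqmode + 1];
 *	}
 *
 * e.g. granted DLM_LOCK_PR vs requested DLM_LOCK_CW yields 0
 * (incompatible), so per rule 6-4 a new CW request against a PR-held
 * resource is queued on res_waitqueue instead of being granted. */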
1818
1819static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1820 int *err)
1821 {
1822 int rv;
1823 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1824 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1825
1826 if (err)
1827 *err = 0;
1828
1829 rv = _can_be_granted(r, lkb, now);
1830 if (rv)
1831 goto out;
1832
1833 /*
1834 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1835 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1836 * cancels one of the locks.
1837 */
1838
1839 if (is_convert && can_be_queued(lkb) &&
1840 conversion_deadlock_detect(r, lkb)) {
1841 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1842 lkb->lkb_grmode = DLM_LOCK_NL;
1843 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1844 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1845 if (err)
1846 *err = -EDEADLK;
1847 else {
1848 log_print("can_be_granted deadlock %x now %d",
1849 lkb->lkb_id, now);
1850 dlm_dump_rsb(r);
1851 }
1852 }
1853 		goto out;
1854 	}
1855
1856 /*
1857 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1858 * to grant a request in a mode other than the normal rqmode. It's a
1859 * simple way to provide a big optimization to applications that can
1860 * use them.
1861 */
1862
1863 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1864 		alt = DLM_LOCK_PR;
1865 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1866 alt = DLM_LOCK_CW;
1867
1868 if (alt) {
1869 lkb->lkb_rqmode = alt;
1870 rv = _can_be_granted(r, lkb, now);
1871 if (rv)
1872 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1873 else
1874 lkb->lkb_rqmode = rqmode;
1875 }
1876 out:
1877 return rv;
1878}
1879
1880/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1881 for locks pending on the convert list. Once verified (watch for these
1882 log_prints), we should be able to just call _can_be_granted() and not
1883 bother with the demote/deadlk cases here (and there's no easy way to deal
1884 with a deadlk here, we'd have to generate something like grant_lock with
1885 the deadlk error.) */
1886
1887/* Returns the highest requested mode of all blocked conversions; sets
1888 cw if there's a blocked conversion to DLM_LOCK_CW. */
1889
1890 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1891{
1892 struct dlm_lkb *lkb, *s;
1893 int hi, demoted, quit, grant_restart, demote_restart;
1894 	int deadlk;
1895
1896 quit = 0;
1897 restart:
1898 grant_restart = 0;
1899 demote_restart = 0;
1900 hi = DLM_LOCK_IV;
1901
1902 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1903 demoted = is_demoted(lkb);
1904 deadlk = 0;
1905
1906 if (can_be_granted(r, lkb, 0, &deadlk)) {
1907 grant_lock_pending(r, lkb);
1908 grant_restart = 1;
1909 			continue;
1910 		}
1911
1912 if (!demoted && is_demoted(lkb)) {
1913 log_print("WARN: pending demoted %x node %d %s",
1914 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1915 demote_restart = 1;
1916 continue;
1917 }
1918
1919 if (deadlk) {
1920 log_print("WARN: pending deadlock %x node %d %s",
1921 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1922 dlm_dump_rsb(r);
1923 continue;
1924 }
1925
1926 hi = max_t(int, lkb->lkb_rqmode, hi);
1927
1928 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1929 *cw = 1;
1930 }
1931
1932 if (grant_restart)
1933 goto restart;
1934 if (demote_restart && !quit) {
1935 quit = 1;
1936 goto restart;
1937 }
1938
1939 return max_t(int, high, hi);
1940}
1941
1942 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1943{
1944 struct dlm_lkb *lkb, *s;
1945
1946 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1947 		if (can_be_granted(r, lkb, 0, NULL))
1948 			grant_lock_pending(r, lkb);
1949 		else {
1950 			high = max_t(int, lkb->lkb_rqmode, high);
1951 if (lkb->lkb_rqmode == DLM_LOCK_CW)
1952 *cw = 1;
1953 }
1954 }
1955
1956 return high;
1957}
1958
1959/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1960 on either the convert or waiting queue.
1961 high is the largest rqmode of all locks blocked on the convert or
1962 waiting queue. */
1963
1964static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1965{
1966 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1967 if (gr->lkb_highbast < DLM_LOCK_EX)
1968 return 1;
1969 return 0;
1970 }
1971
1972 if (gr->lkb_highbast < high &&
1973 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1974 return 1;
1975 return 0;
1976}
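/* Worked example for the PR/CW special case above (illustrative
 * reasoning, assuming the usual DLM mode semantics): PR and CW are
 * incompatible with each other, but both are compatible with CR and NL.
 * If the highest blocked rqmode is PR and a granted lock also holds PR,
 * the generic highbast/compat test sees PR-vs-PR as compatible and
 * would send no bast; a blocked CW conversion, reported through *cw,
 * still conflicts with that granted PR, so the cw flag forces a bast
 * (sent as DLM_LOCK_CW by grant_pending_locks() below). */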
1977
1978static void grant_pending_locks(struct dlm_rsb *r)
1979{
1980 struct dlm_lkb *lkb, *s;
1981 int high = DLM_LOCK_IV;
1982 	int cw = 0;
1983
1984 	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1985
1986 high = grant_pending_convert(r, high, &cw);
1987 high = grant_pending_wait(r, high, &cw);
1988
1989 if (high == DLM_LOCK_IV)
1990 return;
1991
1992 /*
1993 * If there are locks left on the wait/convert queue then send blocking
1994 * ASTs to granted locks based on the largest requested mode (high)
1995 	 * found above.
1996 */
1997
1998 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1999 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2000 if (cw && high == DLM_LOCK_PR &&
2001 lkb->lkb_grmode == DLM_LOCK_PR)
2002 queue_bast(r, lkb, DLM_LOCK_CW);
2003 else
2004 queue_bast(r, lkb, high);
2005 lkb->lkb_highbast = high;
2006 }
2007 }
2008}
2009
2010static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2011{
2012 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2013 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2014 if (gr->lkb_highbast < DLM_LOCK_EX)
2015 return 1;
2016 return 0;
2017 }
2018
2019 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2020 return 1;
2021 return 0;
2022}
2023
2024static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2025 struct dlm_lkb *lkb)
2026{
2027 struct dlm_lkb *gr;
2028
2029 list_for_each_entry(gr, head, lkb_statequeue) {
2030 /* skip self when sending basts to convertqueue */
2031 if (gr == lkb)
2032 continue;
2033 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2034 queue_bast(r, gr, lkb->lkb_rqmode);
2035 gr->lkb_highbast = lkb->lkb_rqmode;
2036 }
2037 }
2038}
2039
2040static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2041{
2042 send_bast_queue(r, &r->res_grantqueue, lkb);
2043}
2044
2045static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2046{
2047 send_bast_queue(r, &r->res_grantqueue, lkb);
2048 send_bast_queue(r, &r->res_convertqueue, lkb);
2049}
2050
2051/* set_master(r, lkb) -- set the master nodeid of a resource
2052
2053 The purpose of this function is to set the nodeid field in the given
2054 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2055 known, it can just be copied to the lkb and the function will return
2056 0. If the rsb's nodeid is _not_ known, it needs to be looked up
2057 before it can be copied to the lkb.
2058
2059 When the rsb nodeid is being looked up remotely, the initial lkb
2060 causing the lookup is kept on the ls_waiters list waiting for the
2061 lookup reply. Other lkb's waiting for the same rsb lookup are kept
2062 on the rsb's res_lookup list until the master is verified.
2063
2064 Return values:
2065 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2066 1: the rsb master is not available and the lkb has been placed on
2067 a wait queue
2068*/
2069
2070static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2071{
2072 struct dlm_ls *ls = r->res_ls;
2073 	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
2074
2075 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2076 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2077 r->res_first_lkid = lkb->lkb_id;
2078 lkb->lkb_nodeid = r->res_nodeid;
2079 return 0;
2080 }
2081
2082 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2083 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2084 return 1;
2085 }
2086
2087 if (r->res_nodeid == 0) {
2088 lkb->lkb_nodeid = 0;
2089 return 0;
2090 }
2091
2092 if (r->res_nodeid > 0) {
2093 lkb->lkb_nodeid = r->res_nodeid;
2094 return 0;
2095 }
2096
2097 	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
2098
2099 dir_nodeid = dlm_dir_nodeid(r);
2100
2101 if (dir_nodeid != our_nodeid) {
2102 r->res_first_lkid = lkb->lkb_id;
2103 send_lookup(r, lkb);
2104 return 1;
2105 }
2106
2107 	for (i = 0; i < 2; i++) {
2108 /* It's possible for dlm_scand to remove an old rsb for
2109 this same resource from the toss list, us to create
2110 a new one, look up the master locally, and find it
2111 already exists just before dlm_scand does the
2112 dir_remove() on the previous rsb. */
2113
2114 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2115 r->res_length, &ret_nodeid);
2116 if (!error)
2117 break;
2118 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2119 schedule();
2120 }
2121 if (error && error != -EEXIST)
2122 return error;
2123
2124 if (ret_nodeid == our_nodeid) {
2125 r->res_first_lkid = 0;
2126 r->res_nodeid = 0;
2127 lkb->lkb_nodeid = 0;
2128 } else {
2129 r->res_first_lkid = lkb->lkb_id;
2130 r->res_nodeid = ret_nodeid;
2131 lkb->lkb_nodeid = ret_nodeid;
2132 }
2133 return 0;
2134}
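/* Caller pattern implied by the return values above (a sketch, not new
 * code in this file):
 *
 *	error = set_master(r, lkb);
 *	if (error < 0)
 *		return error;	// directory lookup failed
 *	if (error)
 *		return 0;	// lkb parked, waiting for the lookup reply
 *	// error == 0: lkb_nodeid is valid; proceed locally or remotely
 *
 * _request_lock() later in this file follows exactly this shape. */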
2135
2136static void process_lookup_list(struct dlm_rsb *r)
2137{
2138 struct dlm_lkb *lkb, *safe;
2139
2140 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2141 		list_del_init(&lkb->lkb_rsb_lookup);
2142 _request_lock(r, lkb);
2143 schedule();
2144 }
2145}
2146
2147/* confirm_master -- confirm (or deny) an rsb's master nodeid */
2148
2149static void confirm_master(struct dlm_rsb *r, int error)
2150{
2151 struct dlm_lkb *lkb;
2152
2153 if (!r->res_first_lkid)
2154 return;
2155
2156 switch (error) {
2157 case 0:
2158 case -EINPROGRESS:
2159 r->res_first_lkid = 0;
2160 process_lookup_list(r);
2161 break;
2162
2163 case -EAGAIN:
2164 case -EBADR:
2165 case -ENOTBLK:
2166 /* the remote request failed and won't be retried (it was
2167 a NOQUEUE, or has been canceled/unlocked); make a waiting
2168 lkb the first_lkid */
2169
2170 r->res_first_lkid = 0;
2171
2172 if (!list_empty(&r->res_lookup)) {
2173 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2174 lkb_rsb_lookup);
2175 			list_del_init(&lkb->lkb_rsb_lookup);
2176 r->res_first_lkid = lkb->lkb_id;
2177 _request_lock(r, lkb);
2178 		}
2179 break;
2180
2181 default:
2182 log_error(r->res_ls, "confirm_master unknown error %d", error);
2183 }
2184}
2185
2186static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2187 int namelen, unsigned long timeout_cs,
2188 void (*ast) (void *astparam),
2189 void *astparam,
2190 void (*bast) (void *astparam, int mode),
2191 struct dlm_args *args)
2192{
2193 int rv = -EINVAL;
2194
2195 /* check for invalid arg usage */
2196
2197 if (mode < 0 || mode > DLM_LOCK_EX)
2198 goto out;
2199
2200 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2201 goto out;
2202
2203 if (flags & DLM_LKF_CANCEL)
2204 goto out;
2205
2206 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2207 goto out;
2208
2209 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2210 goto out;
2211
2212 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2213 goto out;
2214
2215 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2216 goto out;
2217
2218 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2219 goto out;
2220
2221 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2222 goto out;
2223
2224 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2225 goto out;
2226
2227 if (!ast || !lksb)
2228 goto out;
2229
2230 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2231 goto out;
2232
2233 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2234 goto out;
2235
2236 /* these args will be copied to the lkb in validate_lock_args,
2237 it cannot be done now because when converting locks, fields in
2238 an active lkb cannot be modified before locking the rsb */
2239
2240 args->flags = flags;
2241 args->astfn = ast;
2242 args->astparam = astparam;
2243 args->bastfn = bast;
2244 	args->timeout = timeout_cs;
2245 args->mode = mode;
2246 args->lksb = lksb;
2247 rv = 0;
2248 out:
2249 return rv;
2250}
2251
2252static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2253{
2254 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2255 DLM_LKF_FORCEUNLOCK))
2256 return -EINVAL;
2257
2258 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2259 return -EINVAL;
2260
2261 	args->flags = flags;
2262 	args->astparam = astarg;
2263 return 0;
2264}
2265
2266static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2267 struct dlm_args *args)
2268{
2269 int rv = -EINVAL;
2270
2271 if (args->flags & DLM_LKF_CONVERT) {
2272 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2273 goto out;
2274
2275 if (args->flags & DLM_LKF_QUECVT &&
2276 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2277 goto out;
2278
2279 rv = -EBUSY;
2280 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2281 goto out;
2282
2283 if (lkb->lkb_wait_type)
2284 goto out;
2285
2286 if (is_overlap(lkb))
2287 goto out;
2288 }
2289
2290 lkb->lkb_exflags = args->flags;
2291 lkb->lkb_sbflags = 0;
2292 	lkb->lkb_astfn = args->astfn;
2293 	lkb->lkb_astparam = args->astparam;
2294 	lkb->lkb_bastfn = args->bastfn;
2295 lkb->lkb_rqmode = args->mode;
2296 lkb->lkb_lksb = args->lksb;
2297 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2298 lkb->lkb_ownpid = (int) current->pid;
2299 	lkb->lkb_timeout_cs = args->timeout;
2300 rv = 0;
2301 out:
2302 if (rv)
2303 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2304 rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2305 lkb->lkb_status, lkb->lkb_wait_type,
2306 lkb->lkb_resource->res_name);
2307 return rv;
2308}
2309
2310/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2311 for success */
2312
2313/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2314 because there may be a lookup in progress and it's valid to do
2315 cancel/unlockf on it */
2316
2317static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2318{
2319 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2320 int rv = -EINVAL;
2321
2322 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2323 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2324 dlm_print_lkb(lkb);
2325 		goto out;
2326 	}
2327
2328 /* an lkb may still exist even though the lock is EOL'ed due to a
2329 cancel, unlock or failed noqueue request; an app can't use these
2330 locks; return same error as if the lkid had not been found at all */
2331
2332 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2333 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2334 rv = -ENOENT;
2335 		goto out;
2336 	}
2337
2338 /* an lkb may be waiting for an rsb lookup to complete where the
2339 lookup was initiated by another lock */
2340
2341 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2342 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2343 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2344 list_del_init(&lkb->lkb_rsb_lookup);
2345 queue_cast(lkb->lkb_resource, lkb,
2346 args->flags & DLM_LKF_CANCEL ?
2347 -DLM_ECANCEL : -DLM_EUNLOCK);
2348 unhold_lkb(lkb); /* undoes create_lkb() */
2349 		}
2350 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2351 rv = -EBUSY;
2352 goto out;
2353 }
2354
2355 /* cancel not allowed with another cancel/unlock in progress */
2356
2357 if (args->flags & DLM_LKF_CANCEL) {
2358 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2359 goto out;
2360
2361 if (is_overlap(lkb))
2362 goto out;
2363
2364 /* don't let scand try to do a cancel */
2365 del_timeout(lkb);
2366
2367 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2368 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2369 rv = -EBUSY;
2370 goto out;
2371 }
2372
2373 /* there's nothing to cancel */
2374 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2375 !lkb->lkb_wait_type) {
2376 rv = -EBUSY;
2377 goto out;
2378 }
2379
2380 switch (lkb->lkb_wait_type) {
2381 case DLM_MSG_LOOKUP:
2382 case DLM_MSG_REQUEST:
2383 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2384 rv = -EBUSY;
2385 goto out;
2386 case DLM_MSG_UNLOCK:
2387 case DLM_MSG_CANCEL:
2388 goto out;
2389 }
2390 /* add_to_waiters() will set OVERLAP_CANCEL */
2391 goto out_ok;
2392 }
2393
2394 /* do we need to allow a force-unlock if there's a normal unlock
2395 already in progress? in what conditions could the normal unlock
2396 fail such that we'd want to send a force-unlock to be sure? */
2397
2398 if (args->flags & DLM_LKF_FORCEUNLOCK) {
2399 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2400 goto out;
2401
2402 if (is_overlap_unlock(lkb))
2403 goto out;
2404
2405 /* don't let scand try to do a cancel */
2406 del_timeout(lkb);
2407
2408 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2409 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2410 rv = -EBUSY;
2411 goto out;
2412 }
2413
2414 switch (lkb->lkb_wait_type) {
2415 case DLM_MSG_LOOKUP:
2416 case DLM_MSG_REQUEST:
2417 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2418 rv = -EBUSY;
2419 goto out;
2420 case DLM_MSG_UNLOCK:
2421 goto out;
2422 }
2423 /* add_to_waiters() will set OVERLAP_UNLOCK */
2424 goto out_ok;
2425 }
2426
2427 /* normal unlock not allowed if there's any op in progress */
2428 	rv = -EBUSY;
2429 	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2430 goto out;
2431
2432 out_ok:
2433 /* an overlapping op shouldn't blow away exflags from other op */
2434 lkb->lkb_exflags |= args->flags;
2435 lkb->lkb_sbflags = 0;
2436 lkb->lkb_astparam = args->astparam;
2437 rv = 0;
2438 out:
2439 if (rv)
2440 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2441 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2442 args->flags, lkb->lkb_wait_type,
2443 lkb->lkb_resource->res_name);
2444 return rv;
2445}
2446
2447/*
2448 * Four stage 4 varieties:
2449 * do_request(), do_convert(), do_unlock(), do_cancel()
2450 * These are called on the master node for the given lock and
2451 * from the central locking logic.
2452 */
2453
2454static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2455{
2456 int error = 0;
2457
2458 	if (can_be_granted(r, lkb, 1, NULL)) {
2459 grant_lock(r, lkb);
2460 queue_cast(r, lkb, 0);
2461 goto out;
2462 }
2463
2464 if (can_be_queued(lkb)) {
2465 error = -EINPROGRESS;
2466 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2467 		add_timeout(lkb);
2468 goto out;
2469 }
2470
2471 error = -EAGAIN;
2472 	queue_cast(r, lkb, -EAGAIN);
2473 out:
2474 return error;
2475}
2476
2477static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2478 int error)
2479{
2480 switch (error) {
2481 case -EAGAIN:
2482 if (force_blocking_asts(lkb))
2483 send_blocking_asts_all(r, lkb);
2484 break;
2485 case -EINPROGRESS:
2486 send_blocking_asts(r, lkb);
2487 break;
2488 }
2489}
2490
2491static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2492{
2493 int error = 0;
2494 	int deadlk = 0;
2495
2496 /* changing an existing lock may allow others to be granted */
2497
2498 	if (can_be_granted(r, lkb, 1, &deadlk)) {
2499 grant_lock(r, lkb);
2500 queue_cast(r, lkb, 0);
2501 goto out;
2502 }
2503
2504 /* can_be_granted() detected that this lock would block in a conversion
2505 deadlock, so we leave it on the granted queue and return EDEADLK in
2506 the ast for the convert. */
2507
2508 if (deadlk) {
2509 /* it's left on the granted queue */
2510 revert_lock(r, lkb);
2511 queue_cast(r, lkb, -EDEADLK);
2512 error = -EDEADLK;
2513 goto out;
2514 }
2515
2516 /* is_demoted() means the can_be_granted() above set the grmode
2517 to NL, and left us on the granted queue. This auto-demotion
2518 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2519 now grantable. We have to try to grant other converting locks
2520 before we try again to grant this one. */
2521
2522 if (is_demoted(lkb)) {
2523 		grant_pending_convert(r, DLM_LOCK_IV, NULL);
2524 if (_can_be_granted(r, lkb, 1)) {
2525 grant_lock(r, lkb);
2526 queue_cast(r, lkb, 0);
2527 goto out;
2528 }
2529 /* else fall through and move to convert queue */
2530 }
2531
2532 if (can_be_queued(lkb)) {
2533 error = -EINPROGRESS;
2534 del_lkb(r, lkb);
2535 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2536 		add_timeout(lkb);
2537 goto out;
2538 }
2539
2540 error = -EAGAIN;
2541 	queue_cast(r, lkb, -EAGAIN);
2542 out:
2543 return error;
2544}
2545
2546static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2547 int error)
2548{
2549 switch (error) {
2550 case 0:
2551 grant_pending_locks(r);
2552 /* grant_pending_locks also sends basts */
2553 break;
2554 case -EAGAIN:
2555 if (force_blocking_asts(lkb))
2556 send_blocking_asts_all(r, lkb);
2557 break;
2558 case -EINPROGRESS:
2559 send_blocking_asts(r, lkb);
2560 break;
2561 }
2562}
2563
2564static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2565{
2566 remove_lock(r, lkb);
2567 queue_cast(r, lkb, -DLM_EUNLOCK);
2568 return -DLM_EUNLOCK;
2569}
2570
2571static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2572 int error)
2573{
2574 grant_pending_locks(r);
2575}
2576
2577 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2578
2579static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2580{
2581 int error;
2582
2583 error = revert_lock(r, lkb);
2584 if (error) {
2585 queue_cast(r, lkb, -DLM_ECANCEL);
2586 return -DLM_ECANCEL;
2587 }
2588 return 0;
2589}
2590
2591static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2592 int error)
2593{
2594 if (error)
2595 grant_pending_locks(r);
2596}
2597
2598/*
2599 * Four stage 3 varieties:
2600 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2601 */
2602
2603/* add a new lkb to a possibly new rsb, called by requesting process */
2604
2605static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2606{
2607 int error;
2608
2609 /* set_master: sets lkb nodeid from r */
2610
2611 error = set_master(r, lkb);
2612 if (error < 0)
2613 goto out;
2614 if (error) {
2615 error = 0;
2616 goto out;
2617 }
2618
2619 	if (is_remote(r)) {
2620 /* receive_request() calls do_request() on remote node */
2621 error = send_request(r, lkb);
2622 	} else {
2623 		error = do_request(r, lkb);
2624 /* for remote locks the request_reply is sent
2625 between do_request and do_request_effects */
2626 do_request_effects(r, lkb, error);
2627 }
2628 out:
2629 return error;
2630}
2631
2632 /* change some property of an existing lkb, e.g. mode */
2633
2634static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2635{
2636 int error;
2637
2638 	if (is_remote(r)) {
2639 /* receive_convert() calls do_convert() on remote node */
2640 error = send_convert(r, lkb);
2641 	} else {
2642 		error = do_convert(r, lkb);
2643 /* for remote locks the convert_reply is sent
2644 between do_convert and do_convert_effects */
2645 do_convert_effects(r, lkb, error);
2646 }
2647
2648 return error;
2649}
2650
2651/* remove an existing lkb from the granted queue */
2652
2653static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2654{
2655 int error;
2656
2657 	if (is_remote(r)) {
2658 /* receive_unlock() calls do_unlock() on remote node */
2659 error = send_unlock(r, lkb);
2660 	} else {
2661 		error = do_unlock(r, lkb);
2662 /* for remote locks the unlock_reply is sent
2663 between do_unlock and do_unlock_effects */
2664 do_unlock_effects(r, lkb, error);
2665 }
2666
2667 return error;
2668}
2669
2670/* remove an existing lkb from the convert or wait queue */
2671
2672static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2673{
2674 int error;
2675
2676 	if (is_remote(r)) {
2677 /* receive_cancel() calls do_cancel() on remote node */
2678 error = send_cancel(r, lkb);
2679 	} else {
2680 		error = do_cancel(r, lkb);
2681 /* for remote locks the cancel_reply is sent
2682 between do_cancel and do_cancel_effects */
2683 do_cancel_effects(r, lkb, error);
2684 }
2685
2686 return error;
2687}
2688
2689/*
2690 * Four stage 2 varieties:
2691 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2692 */
2693
2694static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2695 int len, struct dlm_args *args)
2696{
2697 struct dlm_rsb *r;
2698 int error;
2699
2700 error = validate_lock_args(ls, lkb, args);
2701 if (error)
2702 goto out;
2703
2704 error = find_rsb(ls, name, len, R_CREATE, &r);
2705 if (error)
2706 goto out;
2707
2708 lock_rsb(r);
2709
2710 attach_lkb(r, lkb);
2711 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2712
2713 error = _request_lock(r, lkb);
2714
2715 unlock_rsb(r);
2716 put_rsb(r);
2717
2718 out:
2719 return error;
2720}
2721
2722static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2723 struct dlm_args *args)
2724{
2725 struct dlm_rsb *r;
2726 int error;
2727
2728 r = lkb->lkb_resource;
2729
2730 hold_rsb(r);
2731 lock_rsb(r);
2732
2733 error = validate_lock_args(ls, lkb, args);
2734 if (error)
2735 goto out;
2736
2737 error = _convert_lock(r, lkb);
2738 out:
2739 unlock_rsb(r);
2740 put_rsb(r);
2741 return error;
2742}
2743
2744static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2745 struct dlm_args *args)
2746{
2747 struct dlm_rsb *r;
2748 int error;
2749
2750 r = lkb->lkb_resource;
2751
2752 hold_rsb(r);
2753 lock_rsb(r);
2754
2755 error = validate_unlock_args(lkb, args);
2756 if (error)
2757 goto out;
2758
2759 error = _unlock_lock(r, lkb);
2760 out:
2761 unlock_rsb(r);
2762 put_rsb(r);
2763 return error;
2764}
2765
2766static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2767 struct dlm_args *args)
2768{
2769 struct dlm_rsb *r;
2770 int error;
2771
2772 r = lkb->lkb_resource;
2773
2774 hold_rsb(r);
2775 lock_rsb(r);
2776
2777 error = validate_unlock_args(lkb, args);
2778 if (error)
2779 goto out;
2780
2781 error = _cancel_lock(r, lkb);
2782 out:
2783 unlock_rsb(r);
2784 put_rsb(r);
2785 return error;
2786}
2787
2788/*
2789 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2790 */
2791
2792int dlm_lock(dlm_lockspace_t *lockspace,
2793 int mode,
2794 struct dlm_lksb *lksb,
2795 uint32_t flags,
2796 void *name,
2797 unsigned int namelen,
2798 uint32_t parent_lkid,
2799 void (*ast) (void *astarg),
2800 void *astarg,
2801 	     void (*bast) (void *astarg, int mode))
2802{
2803 struct dlm_ls *ls;
2804 struct dlm_lkb *lkb;
2805 struct dlm_args args;
2806 int error, convert = flags & DLM_LKF_CONVERT;
2807
2808 ls = dlm_find_lockspace_local(lockspace);
2809 if (!ls)
2810 return -EINVAL;
2811
2812 	dlm_lock_recovery(ls);
2813
2814 if (convert)
2815 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2816 else
2817 error = create_lkb(ls, &lkb);
2818
2819 if (error)
2820 goto out;
2821
2822 	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2823 			      astarg, bast, &args);
2824 if (error)
2825 goto out_put;
2826
2827 if (convert)
2828 error = convert_lock(ls, lkb, &args);
2829 else
2830 error = request_lock(ls, lkb, name, namelen, &args);
2831
2832 if (error == -EINPROGRESS)
2833 error = 0;
2834 out_put:
2835 if (convert || error)
2836 		__put_lkb(ls, lkb);
2837 	if (error == -EAGAIN || error == -EDEADLK)
2838 error = 0;
2839 out:
2840 	dlm_unlock_recovery(ls);
2841 dlm_put_lockspace(ls);
2842 return error;
2843}
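/* Usage sketch (illustrative, not part of this file): a kernel caller
 * might take a lock and wait for the result like this.  Assumptions:
 * `ls` was created elsewhere (e.g. by dlm_new_lockspace()) and the ast
 * signals completion; the names below are hypothetical.
 *
 *	static struct dlm_lksb my_lksb;
 *
 *	static void my_ast(void *astarg)
 *	{
 *		complete(astarg);	// result now in my_lksb.sb_status
 *	}
 *
 *	static int example_take_lock(dlm_lockspace_t *ls)
 *	{
 *		DECLARE_COMPLETION_ONSTACK(done);
 *		int error;
 *
 *		error = dlm_lock(ls, DLM_LOCK_EX, &my_lksb, 0,
 *				 "my_resource", strlen("my_resource"), 0,
 *				 my_ast, &done, NULL);
 *		if (error)
 *			return error;
 *		wait_for_completion(&done);
 *		return my_lksb.sb_status;	// 0, -EAGAIN, ...
 *	}
 */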
2844
2845int dlm_unlock(dlm_lockspace_t *lockspace,
2846 uint32_t lkid,
2847 uint32_t flags,
2848 struct dlm_lksb *lksb,
2849 void *astarg)
2850{
2851 struct dlm_ls *ls;
2852 struct dlm_lkb *lkb;
2853 struct dlm_args args;
2854 int error;
2855
2856 ls = dlm_find_lockspace_local(lockspace);
2857 if (!ls)
2858 return -EINVAL;
2859
2860 	dlm_lock_recovery(ls);
2861
2862 error = find_lkb(ls, lkid, &lkb);
2863 if (error)
2864 goto out;
2865
2866 error = set_unlock_args(flags, astarg, &args);
2867 if (error)
2868 goto out_put;
2869
2870 if (flags & DLM_LKF_CANCEL)
2871 error = cancel_lock(ls, lkb, &args);
2872 else
2873 error = unlock_lock(ls, lkb, &args);
2874
2875 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2876 error = 0;
2877 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2878 error = 0;
2879  out_put:
2880 	dlm_put_lkb(lkb);
2881  out:
2882 	dlm_unlock_recovery(ls);
2883 dlm_put_lockspace(ls);
2884 return error;
2885}
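/* Companion sketch (same assumptions as the dlm_lock() example above):
 * releasing the lock.  sb_lkid was filled in by dlm_lock(); the
 * original ast runs again with sb_status set to -DLM_EUNLOCK once the
 * unlock completes.
 *
 *	static int example_drop_lock(dlm_lockspace_t *ls)
 *	{
 *		DECLARE_COMPLETION_ONSTACK(done);
 *		int error;
 *
 *		error = dlm_unlock(ls, my_lksb.sb_lkid, 0, &my_lksb, &done);
 *		if (error)
 *			return error;
 *		wait_for_completion(&done);
 *		return 0;
 *	}
 */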
2886
2887/*
2888 * send/receive routines for remote operations and replies
2889 *
2890 * send_args
2891 * send_common
2892 * send_request receive_request
2893 * send_convert receive_convert
2894 * send_unlock receive_unlock
2895 * send_cancel receive_cancel
2896 * send_grant receive_grant
2897 * send_bast receive_bast
2898 * send_lookup receive_lookup
2899 * send_remove receive_remove
2900 *
2901 * send_common_reply
2902 * receive_request_reply send_request_reply
2903 * receive_convert_reply send_convert_reply
2904 * receive_unlock_reply send_unlock_reply
2905 * receive_cancel_reply send_cancel_reply
2906 * receive_lookup_reply send_lookup_reply
2907 */
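/* Illustrative trace of one remote request, derived from the table
 * above: node L runs _request_lock() -> send_request(); node R runs
 * receive_request() -> do_request() -> send_request_reply(); node L
 * finishes in receive_request_reply(), which grants, queues, or fails
 * the lkb according to m_result. */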
2908
2909static int _create_message(struct dlm_ls *ls, int mb_len,
2910 int to_nodeid, int mstype,
2911 struct dlm_message **ms_ret,
2912 struct dlm_mhandle **mh_ret)
2913{
2914 struct dlm_message *ms;
2915 struct dlm_mhandle *mh;
2916 char *mb;
2917
2918 /* get_buffer gives us a message handle (mh) that we need to
2919 pass into lowcomms_commit and a message buffer (mb) that we
2920 write our data into */
2921
2922 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2923 if (!mh)
2924 return -ENOBUFS;
2925
2926 memset(mb, 0, mb_len);
2927
2928 ms = (struct dlm_message *) mb;
2929
2930 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2931 	ms->m_header.h_lockspace = ls->ls_global_id;
2932 ms->m_header.h_nodeid = dlm_our_nodeid();
2933 ms->m_header.h_length = mb_len;
2934 ms->m_header.h_cmd = DLM_MSG;
2935
2936 ms->m_type = mstype;
2937
2938 *mh_ret = mh;
2939 *ms_ret = ms;
2940 return 0;
2941}
2942
2943static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2944 int to_nodeid, int mstype,
2945 struct dlm_message **ms_ret,
2946 struct dlm_mhandle **mh_ret)
2947{
2948 int mb_len = sizeof(struct dlm_message);
2949
2950 switch (mstype) {
2951 case DLM_MSG_REQUEST:
2952 case DLM_MSG_LOOKUP:
2953 case DLM_MSG_REMOVE:
2954 mb_len += r->res_length;
2955 break;
2956 case DLM_MSG_CONVERT:
2957 case DLM_MSG_UNLOCK:
2958 case DLM_MSG_REQUEST_REPLY:
2959 case DLM_MSG_CONVERT_REPLY:
2960 case DLM_MSG_GRANT:
2961 if (lkb && lkb->lkb_lvbptr)
2962 mb_len += r->res_ls->ls_lvblen;
2963 break;
2964 }
2965
2966 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2967 ms_ret, mh_ret);
2968}
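/* Sizing example for the switch above (illustrative): a DLM_MSG_REQUEST
 * for a resource named "my_resource" (res_length 11) is allocated with
 * mb_len = sizeof(struct dlm_message) + 11, and receive_extralen() on
 * the receiving node recovers those 11 name bytes from h_length. */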
2969
2970/* further lowcomms enhancements or alternate implementations may make
2971 the return value from this function useful at some point */
2972
2973static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2974{
2975 dlm_message_out(ms);
2976 dlm_lowcomms_commit_buffer(mh);
2977 return 0;
2978}
2979
2980static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2981 struct dlm_message *ms)
2982{
2983 ms->m_nodeid = lkb->lkb_nodeid;
2984 ms->m_pid = lkb->lkb_ownpid;
2985 ms->m_lkid = lkb->lkb_id;
2986 ms->m_remid = lkb->lkb_remid;
2987 ms->m_exflags = lkb->lkb_exflags;
2988 ms->m_sbflags = lkb->lkb_sbflags;
2989 ms->m_flags = lkb->lkb_flags;
2990 ms->m_lvbseq = lkb->lkb_lvbseq;
2991 ms->m_status = lkb->lkb_status;
2992 ms->m_grmode = lkb->lkb_grmode;
2993 ms->m_rqmode = lkb->lkb_rqmode;
2994 ms->m_hash = r->res_hash;
2995
2996 /* m_result and m_bastmode are set from function args,
2997 not from lkb fields */
2998
2999 	if (lkb->lkb_bastfn)
3000 		ms->m_asts |= DLM_CB_BAST;
3001 	if (lkb->lkb_astfn)
3002 		ms->m_asts |= DLM_CB_CAST;
3003
3004 /* compare with switch in create_message; send_remove() doesn't
3005 use send_args() */
3006
3007 switch (ms->m_type) {
3008 case DLM_MSG_REQUEST:
3009 case DLM_MSG_LOOKUP:
3010 memcpy(ms->m_extra, r->res_name, r->res_length);
3011 break;
3012 case DLM_MSG_CONVERT:
3013 case DLM_MSG_UNLOCK:
3014 case DLM_MSG_REQUEST_REPLY:
3015 case DLM_MSG_CONVERT_REPLY:
3016 case DLM_MSG_GRANT:
3017 if (!lkb->lkb_lvbptr)
3018 break;
3019 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3020 break;
3021 }
3022}
3023
3024static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3025{
3026 struct dlm_message *ms;
3027 struct dlm_mhandle *mh;
3028 int to_nodeid, error;
3029
3030 to_nodeid = r->res_nodeid;
3031
3032 error = add_to_waiters(lkb, mstype, to_nodeid);
3033 if (error)
3034 return error;
3035
3036 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3037 if (error)
3038 goto fail;
3039
3040 send_args(r, lkb, ms);
3041
3042 error = send_message(mh, ms);
3043 if (error)
3044 goto fail;
3045 return 0;
3046
3047 fail:
3048 	remove_from_waiters(lkb, msg_reply_type(mstype));
3049 return error;
3050}
3051
3052static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3053{
3054 return send_common(r, lkb, DLM_MSG_REQUEST);
3055}
3056
3057static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3058{
3059 int error;
3060
3061 error = send_common(r, lkb, DLM_MSG_CONVERT);
3062
3063 /* down conversions go without a reply from the master */
3064 if (!error && down_conversion(lkb)) {
3065 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3066 		r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3067 		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3068 r->res_ls->ls_stub_ms.m_result = 0;
3069 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3070 }
3071
3072 return error;
3073}
3074
3075/* FIXME: if this lkb is the only lock we hold on the rsb, then set
3076 MASTER_UNCERTAIN to force the next request on the rsb to confirm
3077 that the master is still correct. */
3078
3079static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3080{
3081 return send_common(r, lkb, DLM_MSG_UNLOCK);
3082}
3083
3084static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3085{
3086 return send_common(r, lkb, DLM_MSG_CANCEL);
3087}
3088
3089static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3090{
3091 struct dlm_message *ms;
3092 struct dlm_mhandle *mh;
3093 int to_nodeid, error;
3094
3095 to_nodeid = lkb->lkb_nodeid;
3096
3097 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3098 if (error)
3099 goto out;
3100
3101 send_args(r, lkb, ms);
3102
3103 ms->m_result = 0;
3104
3105 error = send_message(mh, ms);
3106 out:
3107 return error;
3108}
3109
3110static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3111{
3112 struct dlm_message *ms;
3113 struct dlm_mhandle *mh;
3114 int to_nodeid, error;
3115
3116 to_nodeid = lkb->lkb_nodeid;
3117
3118 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3119 if (error)
3120 goto out;
3121
3122 send_args(r, lkb, ms);
3123
3124 ms->m_bastmode = mode;
3125
3126 error = send_message(mh, ms);
3127 out:
3128 return error;
3129}
3130
3131static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3132{
3133 struct dlm_message *ms;
3134 struct dlm_mhandle *mh;
3135 int to_nodeid, error;
3136
3137 to_nodeid = dlm_dir_nodeid(r);
3138
3139 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3140 if (error)
3141 return error;
3142
3143 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3144 if (error)
3145 goto fail;
3146
3147 send_args(r, lkb, ms);
3148
3149 error = send_message(mh, ms);
3150 if (error)
3151 goto fail;
3152 return 0;
3153
3154 fail:
3155 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3156 return error;
3157}
3158
3159static int send_remove(struct dlm_rsb *r)
3160{
3161 struct dlm_message *ms;
3162 struct dlm_mhandle *mh;
3163 int to_nodeid, error;
3164
3165 to_nodeid = dlm_dir_nodeid(r);
3166
3167 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3168 if (error)
3169 goto out;
3170
3171 memcpy(ms->m_extra, r->res_name, r->res_length);
3172 ms->m_hash = r->res_hash;
3173
3174 error = send_message(mh, ms);
3175 out:
3176 return error;
3177}
3178
3179static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3180 int mstype, int rv)
3181{
3182 struct dlm_message *ms;
3183 struct dlm_mhandle *mh;
3184 int to_nodeid, error;
3185
3186 to_nodeid = lkb->lkb_nodeid;
3187
3188 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3189 if (error)
3190 goto out;
3191
3192 send_args(r, lkb, ms);
3193
3194 ms->m_result = rv;
3195
3196 error = send_message(mh, ms);
3197 out:
3198 return error;
3199}
3200
3201static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3202{
3203 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3204}
3205
3206static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3207{
3208 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3209}
3210
3211static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3212{
3213 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3214}
3215
3216static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3217{
3218 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3219}
3220
3221static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3222 int ret_nodeid, int rv)
3223{
3224 struct dlm_rsb *r = &ls->ls_stub_rsb;
3225 struct dlm_message *ms;
3226 struct dlm_mhandle *mh;
3227 int error, nodeid = ms_in->m_header.h_nodeid;
3228
3229 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3230 if (error)
3231 goto out;
3232
3233 ms->m_lkid = ms_in->m_lkid;
3234 ms->m_result = rv;
3235 ms->m_nodeid = ret_nodeid;
3236
3237 error = send_message(mh, ms);
3238 out:
3239 return error;
3240}
3241
3242/* which args we save from a received message depends heavily on the type
3243 of message, unlike the send side where we can safely send everything about
3244 the lkb for any type of message */
3245
3246static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3247{
3248 lkb->lkb_exflags = ms->m_exflags;
3249 	lkb->lkb_sbflags = ms->m_sbflags;
3250 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3251 (ms->m_flags & 0x0000FFFF);
3252}
3253
3254static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3255{
3256 if (ms->m_flags == DLM_IFL_STUB_MS)
3257 return;
3258
3259 lkb->lkb_sbflags = ms->m_sbflags;
3260 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3261 (ms->m_flags & 0x0000FFFF);
3262}
3263
3264static int receive_extralen(struct dlm_message *ms)
3265{
3266 return (ms->m_header.h_length - sizeof(struct dlm_message));
3267}
3268
3269static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3270 struct dlm_message *ms)
3271{
3272 int len;
3273
3274 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3275 if (!lkb->lkb_lvbptr)
3276 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3277 if (!lkb->lkb_lvbptr)
3278 return -ENOMEM;
3279 len = receive_extralen(ms);
3280 if (len > DLM_RESNAME_MAXLEN)
3281 len = DLM_RESNAME_MAXLEN;
3282 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3283 }
3284 return 0;
3285}
3286
3287static void fake_bastfn(void *astparam, int mode)
3288{
3289 log_print("fake_bastfn should not be called");
3290}
3291
3292static void fake_astfn(void *astparam)
3293{
3294 log_print("fake_astfn should not be called");
3295}
3296
3297static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3298 struct dlm_message *ms)
3299{
3300 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3301 lkb->lkb_ownpid = ms->m_pid;
3302 lkb->lkb_remid = ms->m_lkid;
3303 lkb->lkb_grmode = DLM_LOCK_IV;
3304 lkb->lkb_rqmode = ms->m_rqmode;
3305
3306 lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3307 lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3308
3309 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3310 /* lkb was just created so there won't be an lvb yet */
3311 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3312 if (!lkb->lkb_lvbptr)
3313 return -ENOMEM;
3314 }
3315
3316 return 0;
3317}
3318
3319static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3320 struct dlm_message *ms)
3321{
3322 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3323 return -EBUSY;
3324
3325 if (receive_lvb(ls, lkb, ms))
3326 return -ENOMEM;
3327
3328 lkb->lkb_rqmode = ms->m_rqmode;
3329 lkb->lkb_lvbseq = ms->m_lvbseq;
3330
3331 return 0;
3332}
3333
3334static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3335 struct dlm_message *ms)
3336{
3337 if (receive_lvb(ls, lkb, ms))
3338 return -ENOMEM;
3339 return 0;
3340}
3341
3342/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3343 uses to send a reply and that the remote end uses to process the reply. */
3344
3345static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3346{
3347 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3348 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3349 lkb->lkb_remid = ms->m_lkid;
3350}
3351
3352/* This is called after the rsb is locked so that we can safely inspect
3353 fields in the lkb. */
3354
3355static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3356{
3357 int from = ms->m_header.h_nodeid;
3358 int error = 0;
3359
3360 switch (ms->m_type) {
3361 case DLM_MSG_CONVERT:
3362 case DLM_MSG_UNLOCK:
3363 case DLM_MSG_CANCEL:
3364 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3365 error = -EINVAL;
3366 break;
3367
3368 case DLM_MSG_CONVERT_REPLY:
3369 case DLM_MSG_UNLOCK_REPLY:
3370 case DLM_MSG_CANCEL_REPLY:
3371 case DLM_MSG_GRANT:
3372 case DLM_MSG_BAST:
3373 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3374 error = -EINVAL;
3375 break;
3376
3377 case DLM_MSG_REQUEST_REPLY:
3378 if (!is_process_copy(lkb))
3379 error = -EINVAL;
3380 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3381 error = -EINVAL;
3382 break;
3383
3384 default:
3385 error = -EINVAL;
3386 }
3387
3388 if (error)
3389 log_error(lkb->lkb_resource->res_ls,
3390 "ignore invalid message %d from %d %x %x %x %d",
3391 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3392 lkb->lkb_flags, lkb->lkb_nodeid);
3393 return error;
3394}
3395
3396 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3397{
3398 struct dlm_lkb *lkb;
3399 struct dlm_rsb *r;
3400 int error, namelen;
3401
3402 error = create_lkb(ls, &lkb);
3403 if (error)
3404 goto fail;
3405
3406 receive_flags(lkb, ms);
3407 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3408 error = receive_request_args(ls, lkb, ms);
3409 if (error) {
3410 		__put_lkb(ls, lkb);
3411 goto fail;
3412 }
3413
3414 namelen = receive_extralen(ms);
3415
3416 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3417 if (error) {
3418 		__put_lkb(ls, lkb);
3419 goto fail;
3420 }
3421
3422 lock_rsb(r);
3423
3424 attach_lkb(r, lkb);
3425 error = do_request(r, lkb);
3426 send_request_reply(r, lkb, error);
3427 	do_request_effects(r, lkb, error);
3428
3429 unlock_rsb(r);
3430 put_rsb(r);
3431
3432 if (error == -EINPROGRESS)
3433 error = 0;
3434 if (error)
3435 		dlm_put_lkb(lkb);
3436 	return 0;
3437
3438 fail:
3439 setup_stub_lkb(ls, ms);
3440 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3441 	return error;
3442}
3443
3444 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3445{
3446 struct dlm_lkb *lkb;
3447 struct dlm_rsb *r;
3448 	int error, reply = 1;
3449
3450 error = find_lkb(ls, ms->m_remid, &lkb);
3451 if (error)
3452 goto fail;
3453
3454 if (lkb->lkb_remid != ms->m_lkid) {
3455 log_error(ls, "receive_convert %x remid %x remote %d %x",
3456 lkb->lkb_id, lkb->lkb_remid,
3457 ms->m_header.h_nodeid, ms->m_lkid);
3458 error = -ENOENT;
3459 goto fail;
3460 }
3461
3462 r = lkb->lkb_resource;
3463
3464 hold_rsb(r);
3465 lock_rsb(r);
3466
3467 error = validate_message(lkb, ms);
3468 if (error)
3469 goto out;
3470
3471 	receive_flags(lkb, ms);
3472
3473 	error = receive_convert_args(ls, lkb, ms);
3474 if (error) {
3475 send_convert_reply(r, lkb, error);
3476 goto out;
3477 }
3478
3479 reply = !down_conversion(lkb);
3480
3481 error = do_convert(r, lkb);
3482 if (reply)
3483 send_convert_reply(r, lkb, error);
3484 	do_convert_effects(r, lkb, error);
3485  out:
3486 unlock_rsb(r);
3487 put_rsb(r);
3488 	dlm_put_lkb(lkb);
3489 	return 0;
3490
3491 fail:
3492 setup_stub_lkb(ls, ms);
3493 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3494 	return error;
3495}
3496
3497 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3498{
3499 struct dlm_lkb *lkb;
3500 struct dlm_rsb *r;
3501 int error;
3502
3503 error = find_lkb(ls, ms->m_remid, &lkb);
3504 if (error)
3505 goto fail;
3506
3507 if (lkb->lkb_remid != ms->m_lkid) {
3508 log_error(ls, "receive_unlock %x remid %x remote %d %x",
3509 lkb->lkb_id, lkb->lkb_remid,
3510 ms->m_header.h_nodeid, ms->m_lkid);
3511 error = -ENOENT;
3512 goto fail;
3513 }
3514
3515 r = lkb->lkb_resource;
3516
3517 hold_rsb(r);
3518 lock_rsb(r);
3519
3520 error = validate_message(lkb, ms);
3521 if (error)
3522 goto out;
3523
3524 	receive_flags(lkb, ms);
3525
3526 	error = receive_unlock_args(ls, lkb, ms);
3527 if (error) {
3528 send_unlock_reply(r, lkb, error);
3529 goto out;
3530 }
3531
3532 error = do_unlock(r, lkb);
3533 	send_unlock_reply(r, lkb, error);
3534 	do_unlock_effects(r, lkb, error);
3535  out:
3536 unlock_rsb(r);
3537 put_rsb(r);
3538 	dlm_put_lkb(lkb);
3539 	return 0;
3540
3541 fail:
3542 setup_stub_lkb(ls, ms);
3543 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3544 	return error;
3545}
3546
3547 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3548{
3549 struct dlm_lkb *lkb;
3550 struct dlm_rsb *r;
3551 int error;
3552
3553 error = find_lkb(ls, ms->m_remid, &lkb);
3554 if (error)
3555 goto fail;
3556
3557 receive_flags(lkb, ms);
3558
3559 r = lkb->lkb_resource;
3560
3561 hold_rsb(r);
3562 lock_rsb(r);
3563
3564 error = validate_message(lkb, ms);
3565 if (error)
3566 goto out;
3567
3568 error = do_cancel(r, lkb);
3569 send_cancel_reply(r, lkb, error);
3570 	do_cancel_effects(r, lkb, error);
3571  out:
3572 unlock_rsb(r);
3573 put_rsb(r);
3574 	dlm_put_lkb(lkb);
3575 	return 0;
3576
3577 fail:
3578 setup_stub_lkb(ls, ms);
3579 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3580 	return error;
3581}
3582
3583 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3584{
3585 struct dlm_lkb *lkb;
3586 struct dlm_rsb *r;
3587 int error;
3588
3589 error = find_lkb(ls, ms->m_remid, &lkb);
3590 if (error)
3591 return error;
3592
3593 r = lkb->lkb_resource;
3594
3595 hold_rsb(r);
3596 lock_rsb(r);
3597
3598 error = validate_message(lkb, ms);
3599 if (error)
3600 goto out;
3601
3602 	receive_flags_reply(lkb, ms);
3603 if (is_altmode(lkb))
3604 munge_altmode(lkb, ms);
3605 grant_lock_pc(r, lkb, ms);
3606 queue_cast(r, lkb, 0);
3607  out:
3608 unlock_rsb(r);
3609 put_rsb(r);
3610 	dlm_put_lkb(lkb);
3611 	return 0;
3612}
3613
3614 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3615{
3616 struct dlm_lkb *lkb;
3617 struct dlm_rsb *r;
3618 int error;
3619
3620 error = find_lkb(ls, ms->m_remid, &lkb);
3621 if (error)
3622 return error;
3623
3624 r = lkb->lkb_resource;
3625
3626 hold_rsb(r);
3627 lock_rsb(r);
3628
3629 error = validate_message(lkb, ms);
3630 if (error)
3631 goto out;
3632
3633 queue_bast(r, lkb, ms->m_bastmode);
3634 out:
3635 unlock_rsb(r);
3636 put_rsb(r);
3637 	dlm_put_lkb(lkb);
3638 	return 0;
3639}
3640
3641static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3642{
3643 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3644
3645 from_nodeid = ms->m_header.h_nodeid;
3646 our_nodeid = dlm_our_nodeid();
3647
3648 len = receive_extralen(ms);
3649
3650 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3651 if (dir_nodeid != our_nodeid) {
3652 log_error(ls, "lookup dir_nodeid %d from %d",
3653 dir_nodeid, from_nodeid);
3654 error = -EINVAL;
3655 ret_nodeid = -1;
3656 goto out;
3657 }
3658
3659 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3660
3661 /* Optimization: we're master so treat lookup as a request */
3662 if (!error && ret_nodeid == our_nodeid) {
3663 receive_request(ls, ms);
3664 return;
3665 }
3666 out:
3667 send_lookup_reply(ls, ms, ret_nodeid, error);
3668}
3669
3670static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3671{
3672 int len, dir_nodeid, from_nodeid;
3673
3674 from_nodeid = ms->m_header.h_nodeid;
3675
3676 len = receive_extralen(ms);
3677
3678 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3679 if (dir_nodeid != dlm_our_nodeid()) {
3680 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3681 dir_nodeid, from_nodeid);
3682 return;
3683 }
3684
3685 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3686}
3687
3688static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3689{
3690 do_purge(ls, ms->m_nodeid, ms->m_pid);
3691}
3692
3693 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3694{
3695 struct dlm_lkb *lkb;
3696 struct dlm_rsb *r;
3697 	int error, mstype, result;
3698
3699 error = find_lkb(ls, ms->m_remid, &lkb);
3700 if (error)
3701 return error;
3702
3703 r = lkb->lkb_resource;
3704 hold_rsb(r);
3705 lock_rsb(r);
3706
3707 error = validate_message(lkb, ms);
3708 if (error)
3709 goto out;
3710
3711 mstype = lkb->lkb_wait_type;
3712 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3713 if (error)
3714 goto out;
3715
3716 /* Optimization: the dir node was also the master, so it took our
3717 lookup as a request and sent request reply instead of lookup reply */
3718 if (mstype == DLM_MSG_LOOKUP) {
3719 r->res_nodeid = ms->m_header.h_nodeid;
3720 lkb->lkb_nodeid = r->res_nodeid;
3721 }
3722
3723 /* this is the value returned from do_request() on the master */
3724 result = ms->m_result;
3725
3726 switch (result) {
3727 	case -EAGAIN:
3728 		/* request would block (be queued) on remote master */
3729 queue_cast(r, lkb, -EAGAIN);
3730 confirm_master(r, -EAGAIN);
3731 		unhold_lkb(lkb); /* undoes create_lkb() */
3732 break;
3733
3734 case -EINPROGRESS:
3735 case 0:
3736 /* request was queued or granted on remote master */
3737 receive_flags_reply(lkb, ms);
3738 lkb->lkb_remid = ms->m_lkid;
3739 if (is_altmode(lkb))
3740 munge_altmode(lkb, ms);
3741 		if (result) {
3742 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3743 add_timeout(lkb);
3744 } else {
3745 grant_lock_pc(r, lkb, ms);
3746 queue_cast(r, lkb, 0);
3747 }
3748 		confirm_master(r, result);
3749 break;
3750
3751 	case -EBADR:
3752 case -ENOTBLK:
3753 /* find_rsb failed to find rsb or rsb wasn't master */
3754 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3755 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3756 r->res_nodeid = -1;
3757 lkb->lkb_nodeid = -1;
3758
3759 if (is_overlap(lkb)) {
3760 /* we'll ignore error in cancel/unlock reply */
3761 queue_cast_overlap(r, lkb);
3762 			confirm_master(r, result);
3763 unhold_lkb(lkb); /* undoes create_lkb() */
3764 } else
3765 _request_lock(r, lkb);
3766 break;
3767
3768 default:
3769 log_error(ls, "receive_request_reply %x error %d",
3770 lkb->lkb_id, result);
3771 }
3772
3773 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3774 log_debug(ls, "receive_request_reply %x result %d unlock",
3775 lkb->lkb_id, result);
3776 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3777 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3778 send_unlock(r, lkb);
3779 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3780 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3781 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3782 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3783 send_cancel(r, lkb);
3784 } else {
3785 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3786 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3787 }
3788 out:
3789 unlock_rsb(r);
3790 put_rsb(r);
3791 	dlm_put_lkb(lkb);
3792 	return 0;
3793}
3794
3795static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3796 struct dlm_message *ms)
3797{
3798 	/* this is the value returned from do_convert() on the master */
3799 	switch (ms->m_result) {
3800 case -EAGAIN:
3801 /* convert would block (be queued) on remote master */
3802 queue_cast(r, lkb, -EAGAIN);
3803 break;
3804
3805 case -EDEADLK:
3806 receive_flags_reply(lkb, ms);
3807 revert_lock_pc(r, lkb);
3808 queue_cast(r, lkb, -EDEADLK);
3809 break;
3810
3811 case -EINPROGRESS:
3812 /* convert was queued on remote master */
7d3c1feb
DT
3813 receive_flags_reply(lkb, ms);
3814 if (is_demoted(lkb))
2a7ce0ed 3815 munge_demoted(lkb);
e7fd4179
DT
3816 del_lkb(r, lkb);
3817 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3ae1acf9 3818 add_timeout(lkb);
e7fd4179
DT
3819 break;
3820
3821 case 0:
3822 /* convert was granted on remote master */
3823 receive_flags_reply(lkb, ms);
7d3c1feb 3824 if (is_demoted(lkb))
2a7ce0ed 3825 munge_demoted(lkb);
e7fd4179
DT
3826 grant_lock_pc(r, lkb, ms);
3827 queue_cast(r, lkb, 0);
3828 break;
3829
3830 default:
6d40c4a7
DT
3831 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
3832 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
3833 ms->m_result);
3834 dlm_print_rsb(r);
3835 dlm_print_lkb(lkb);
e7fd4179
DT
3836 }
3837}
3838
3839static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3840{
3841 struct dlm_rsb *r = lkb->lkb_resource;
ef0c2bb0 3842 int error;
e7fd4179
DT
3843
3844 hold_rsb(r);
3845 lock_rsb(r);
3846
c54e04b0
DT
3847 error = validate_message(lkb, ms);
3848 if (error)
3849 goto out;
3850
ef0c2bb0
DT
3851 /* stub reply can happen with waiters_mutex held */
3852 error = remove_from_waiters_ms(lkb, ms);
3853 if (error)
3854 goto out;
e7fd4179 3855
ef0c2bb0
DT
3856 __receive_convert_reply(r, lkb, ms);
3857 out:
e7fd4179
DT
3858 unlock_rsb(r);
3859 put_rsb(r);
3860}
3861
6d40c4a7 3862static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179
DT
3863{
3864 struct dlm_lkb *lkb;
3865 int error;
3866
3867 error = find_lkb(ls, ms->m_remid, &lkb);
6d40c4a7
DT
3868 if (error)
3869 return error;
e7fd4179 3870
e7fd4179 3871 _receive_convert_reply(lkb, ms);
b3f58d8f 3872 dlm_put_lkb(lkb);
6d40c4a7 3873 return 0;
e7fd4179
DT
3874}
3875
3876static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3877{
3878 struct dlm_rsb *r = lkb->lkb_resource;
ef0c2bb0 3879 int error;
e7fd4179
DT
3880
3881 hold_rsb(r);
3882 lock_rsb(r);
3883
c54e04b0
DT
3884 error = validate_message(lkb, ms);
3885 if (error)
3886 goto out;
3887
ef0c2bb0
DT
3888 /* stub reply can happen with waiters_mutex held */
3889 error = remove_from_waiters_ms(lkb, ms);
3890 if (error)
3891 goto out;
3892
e7fd4179
DT
3893 /* this is the value returned from do_unlock() on the master */
3894
ef0c2bb0 3895 switch (ms->m_result) {
e7fd4179
DT
3896 case -DLM_EUNLOCK:
3897 receive_flags_reply(lkb, ms);
3898 remove_lock_pc(r, lkb);
3899 queue_cast(r, lkb, -DLM_EUNLOCK);
3900 break;
ef0c2bb0
DT
3901 case -ENOENT:
3902 break;
e7fd4179 3903 default:
ef0c2bb0
DT
3904 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3905 lkb->lkb_id, ms->m_result);
e7fd4179 3906 }
ef0c2bb0 3907 out:
e7fd4179
DT
3908 unlock_rsb(r);
3909 put_rsb(r);
3910}
3911
6d40c4a7 3912static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179
DT
3913{
3914 struct dlm_lkb *lkb;
3915 int error;
3916
3917 error = find_lkb(ls, ms->m_remid, &lkb);
6d40c4a7
DT
3918 if (error)
3919 return error;
e7fd4179 3920
e7fd4179 3921 _receive_unlock_reply(lkb, ms);
b3f58d8f 3922 dlm_put_lkb(lkb);
6d40c4a7 3923 return 0;
e7fd4179
DT
3924}
3925
3926static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3927{
3928 struct dlm_rsb *r = lkb->lkb_resource;
ef0c2bb0 3929 int error;
e7fd4179
DT
3930
3931 hold_rsb(r);
3932 lock_rsb(r);
3933
c54e04b0
DT
3934 error = validate_message(lkb, ms);
3935 if (error)
3936 goto out;
3937
ef0c2bb0
DT
3938 /* stub reply can happen with waiters_mutex held */
3939 error = remove_from_waiters_ms(lkb, ms);
3940 if (error)
3941 goto out;
3942
e7fd4179
DT
3943 /* this is the value returned from do_cancel() on the master */
3944
ef0c2bb0 3945 switch (ms->m_result) {
e7fd4179
DT
3946 case -DLM_ECANCEL:
3947 receive_flags_reply(lkb, ms);
3948 revert_lock_pc(r, lkb);
84d8cd69 3949 queue_cast(r, lkb, -DLM_ECANCEL);
ef0c2bb0
DT
3950 break;
3951 case 0:
e7fd4179
DT
3952 break;
3953 default:
ef0c2bb0
DT
3954 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3955 lkb->lkb_id, ms->m_result);
e7fd4179 3956 }
ef0c2bb0 3957 out:
e7fd4179
DT
3958 unlock_rsb(r);
3959 put_rsb(r);
3960}
3961
6d40c4a7 3962static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179
DT
3963{
3964 struct dlm_lkb *lkb;
3965 int error;
3966
3967 error = find_lkb(ls, ms->m_remid, &lkb);
6d40c4a7
DT
3968 if (error)
3969 return error;
e7fd4179 3970
e7fd4179 3971 _receive_cancel_reply(lkb, ms);
b3f58d8f 3972 dlm_put_lkb(lkb);
6d40c4a7 3973 return 0;
e7fd4179
DT
3974}
3975
3976static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3977{
3978 struct dlm_lkb *lkb;
3979 struct dlm_rsb *r;
3980 int error, ret_nodeid;
3981
3982 error = find_lkb(ls, ms->m_lkid, &lkb);
3983 if (error) {
6d40c4a7 3984 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
e7fd4179
DT
3985 return;
3986 }
3987
ef0c2bb0 3988 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
e7fd4179 3989 FIXME: will a non-zero error ever be returned? */
e7fd4179
DT
3990
3991 r = lkb->lkb_resource;
3992 hold_rsb(r);
3993 lock_rsb(r);
3994
ef0c2bb0
DT
3995 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3996 if (error)
3997 goto out;
3998
e7fd4179
DT
3999 ret_nodeid = ms->m_nodeid;
4000 if (ret_nodeid == dlm_our_nodeid()) {
4001 r->res_nodeid = 0;
4002 ret_nodeid = 0;
4003 r->res_first_lkid = 0;
4004 } else {
4005 /* set_master() will copy res_nodeid to lkb_nodeid */
4006 r->res_nodeid = ret_nodeid;
4007 }
4008
ef0c2bb0
DT
4009 if (is_overlap(lkb)) {
4010 log_debug(ls, "receive_lookup_reply %x unlock %x",
4011 lkb->lkb_id, lkb->lkb_flags);
4012 queue_cast_overlap(r, lkb);
4013 unhold_lkb(lkb); /* undoes create_lkb() */
4014 goto out_list;
4015 }
4016
e7fd4179
DT
4017 _request_lock(r, lkb);
4018
ef0c2bb0 4019 out_list:
e7fd4179
DT
4020 if (!ret_nodeid)
4021 process_lookup_list(r);
ef0c2bb0 4022 out:
e7fd4179
DT
4023 unlock_rsb(r);
4024 put_rsb(r);
b3f58d8f 4025 dlm_put_lkb(lkb);
e7fd4179
DT
4026}
4027
6d40c4a7
DT
4028static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4029 uint32_t saved_seq)
e7fd4179 4030{
6d40c4a7
DT
4031 int error = 0, noent = 0;
4032
46b43eed
DT
4033 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4034 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
4035 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4036 ms->m_remid, ms->m_result);
4037 return;
4038 }
4039
e7fd4179
DT
4040 switch (ms->m_type) {
4041
4042 /* messages sent to a master node */
4043
4044 case DLM_MSG_REQUEST:
6d40c4a7 4045 error = receive_request(ls, ms);
e7fd4179
DT
4046 break;
4047
4048 case DLM_MSG_CONVERT:
6d40c4a7 4049 error = receive_convert(ls, ms);
e7fd4179
DT
4050 break;
4051
4052 case DLM_MSG_UNLOCK:
6d40c4a7 4053 error = receive_unlock(ls, ms);
e7fd4179
DT
4054 break;
4055
4056 case DLM_MSG_CANCEL:
6d40c4a7
DT
4057 noent = 1;
4058 error = receive_cancel(ls, ms);
e7fd4179
DT
4059 break;
4060
4061 /* messages sent from a master node (replies to above) */
4062
4063 case DLM_MSG_REQUEST_REPLY:
6d40c4a7 4064 error = receive_request_reply(ls, ms);
e7fd4179
DT
4065 break;
4066
4067 case DLM_MSG_CONVERT_REPLY:
6d40c4a7 4068 error = receive_convert_reply(ls, ms);
e7fd4179
DT
4069 break;
4070
4071 case DLM_MSG_UNLOCK_REPLY:
6d40c4a7 4072 error = receive_unlock_reply(ls, ms);
e7fd4179
DT
4073 break;
4074
4075 case DLM_MSG_CANCEL_REPLY:
6d40c4a7 4076 error = receive_cancel_reply(ls, ms);
e7fd4179
DT
4077 break;
4078
4079 /* messages sent from a master node (only two types of async msg) */
4080
4081 case DLM_MSG_GRANT:
6d40c4a7
DT
4082 noent = 1;
4083 error = receive_grant(ls, ms);
e7fd4179
DT
4084 break;
4085
4086 case DLM_MSG_BAST:
6d40c4a7
DT
4087 noent = 1;
4088 error = receive_bast(ls, ms);
e7fd4179
DT
4089 break;
4090
4091 /* messages sent to a dir node */
4092
4093 case DLM_MSG_LOOKUP:
4094 receive_lookup(ls, ms);
4095 break;
4096
4097 case DLM_MSG_REMOVE:
4098 receive_remove(ls, ms);
4099 break;
4100
4101 /* messages sent from a dir node (remove has no reply) */
4102
4103 case DLM_MSG_LOOKUP_REPLY:
4104 receive_lookup_reply(ls, ms);
4105 break;
4106
8499137d
DT
4107 /* other messages */
4108
4109 case DLM_MSG_PURGE:
4110 receive_purge(ls, ms);
4111 break;
4112
e7fd4179
DT
4113 default:
4114 log_error(ls, "unknown message type %d", ms->m_type);
4115 }
6d40c4a7
DT
4116
4117 /*
4118 * When checking for ENOENT, we're checking the result of
4119 * find_lkb(m_remid):
4120 *
4121 * The lock id referenced in the message wasn't found. This may
4122 * happen in normal usage for the async messages and cancel, so
4123 * only use log_debug for them.
4124 *
4125 * Other errors are expected and normal.
4126 */
4127
4128 if (error == -ENOENT && noent) {
4129 log_debug(ls, "receive %d no %x remote %d %x seq %u",
4130 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4131 ms->m_lkid, saved_seq);
4132 } else if (error == -ENOENT) {
4133 log_error(ls, "receive %d no %x remote %d %x seq %u",
4134 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4135 ms->m_lkid, saved_seq);
4136
4137 if (ms->m_type == DLM_MSG_CONVERT)
4138 dlm_dump_rsb_hash(ls, ms->m_hash);
4139 }
e7fd4179
DT
4140}

/* If the lockspace is in recovery mode (locking stopped), then normal
   messages are saved on the requestqueue for processing after recovery is
   done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
   messages off the requestqueue before we process new ones.  This occurs
   right after recovery completes when we transition from saving all messages
   on requestqueue, to processing all the saved messages, to processing new
   messages as they arrive (see the timeline after this function). */

static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
                                int nodeid)
{
        if (dlm_locking_stopped(ls)) {
                dlm_add_requestqueue(ls, nodeid, ms);
        } else {
                dlm_wait_requestqueue(ls);
                _receive_message(ls, ms, 0);
        }
}
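
/* Illustrative timeline, not dlm source, of the transition described above:

   t0  recovery starts      dlm_locking_stopped() becomes true
   t1  messages arrive      each one is parked by dlm_add_requestqueue()
   t2  recovery completes   dlm_recoverd replays the parked messages through
                            dlm_receive_message_saved()
   t3  new messages arrive  dlm_wait_requestqueue() holds them until the
                            replay in t2 has drained, then they are processed
                            directly by _receive_message() */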

/* This is called by dlm_recoverd to process messages that were saved on
   the requestqueue. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
                               uint32_t saved_seq)
{
        _receive_message(ls, ms, saved_seq);
}

/* This is called by the midcomms layer when something is received for
   the lockspace.  It could be either a MSG (normal message sent as part of
   standard locking activity) or an RCOM (recovery message sent as part of
   lockspace recovery). */

void dlm_receive_buffer(union dlm_packet *p, int nodeid)
{
        struct dlm_header *hd = &p->header;
        struct dlm_ls *ls;
        int type = 0;

        switch (hd->h_cmd) {
        case DLM_MSG:
                dlm_message_in(&p->message);
                type = p->message.m_type;
                break;
        case DLM_RCOM:
                dlm_rcom_in(&p->rcom);
                type = p->rcom.rc_type;
                break;
        default:
                log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
                return;
        }

        if (hd->h_nodeid != nodeid) {
                log_print("invalid h_nodeid %d from %d lockspace %x",
                          hd->h_nodeid, nodeid, hd->h_lockspace);
                return;
        }

        ls = dlm_find_lockspace_global(hd->h_lockspace);
        if (!ls) {
                if (dlm_config.ci_log_debug)
                        log_print("invalid lockspace %x from %d cmd %d type %d",
                                  hd->h_lockspace, nodeid, hd->h_cmd, type);

                if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
                        dlm_send_ls_not_ready(nodeid, &p->rcom);
                return;
        }

        /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
           be inactive (in this ls) before transitioning to recovery mode */

        down_read(&ls->ls_recv_active);
        if (hd->h_cmd == DLM_MSG)
                dlm_receive_message(ls, &p->message, nodeid);
        else
                dlm_receive_rcom(ls, &p->rcom, nodeid);
        up_read(&ls->ls_recv_active);

        dlm_put_lockspace(ls);
}

static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
                                   struct dlm_message *ms_stub)
{
        if (middle_conversion(lkb)) {
                hold_lkb(lkb);
                memset(ms_stub, 0, sizeof(struct dlm_message));
                ms_stub->m_flags = DLM_IFL_STUB_MS;
                ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
                ms_stub->m_result = -EINPROGRESS;
                ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
                _receive_convert_reply(lkb, ms_stub);

                /* Same special case as in receive_rcom_lock_args() */
                lkb->lkb_grmode = DLM_LOCK_IV;
                rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
                unhold_lkb(lkb);

        } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
                lkb->lkb_flags |= DLM_IFL_RESEND;
        }

        /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
           conversions are async; there's no reply from the remote master */
}
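
/* Worked example, illustrative only: an lkb granted PR and converting to CW
   is a "middle conversion" (middle_conversion() above).  The stub
   -EINPROGRESS reply requeues it as converting, lkb_grmode is reset to IV,
   and RSB_RECOVER_CONVERT tells recover_conversion() to settle the real
   granted mode once all locks on the rsb have been rebuilt on the new
   master. */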

/* A waiting lkb needs recovery if the master node has failed, or
   the master node is changing (only when no directory is used) */

static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
                                 int dir_nodeid)
{
        if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
                return 1;

        if (!dlm_no_directory(ls))
                return 0;

        if (dir_nodeid == dlm_our_nodeid())
                return 1;

        if (dir_nodeid != lkb->lkb_wait_nodeid)
                return 1;

        return 0;
}

/* Recovery for locks that are waiting for replies from nodes that are now
   gone.  We can just complete unlocks and cancels by faking a reply from the
   dead node.  Requests and up-conversions we flag to be resent after
   recovery.  Down-conversions can just be completed with a fake reply like
   unlocks.  Conversions between PR and CW need special attention. */

void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
        struct dlm_lkb *lkb, *safe;
        struct dlm_message *ms_stub;
        int wait_type, stub_unlock_result, stub_cancel_result;
        int dir_nodeid;

        ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
        if (!ms_stub) {
                log_error(ls, "dlm_recover_waiters_pre no mem");
                return;
        }

        mutex_lock(&ls->ls_waiters_mutex);

        list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {

                dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);

                /* exclude debug messages about unlocks because there can be so
                   many and they aren't very interesting */

                if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
                        log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
                                  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
                                  lkb->lkb_id,
                                  lkb->lkb_remid,
                                  lkb->lkb_wait_type,
                                  lkb->lkb_resource->res_nodeid,
                                  lkb->lkb_nodeid,
                                  lkb->lkb_wait_nodeid,
                                  dir_nodeid);
                }

                /* all outstanding lookups, regardless of destination will be
                   resent after recovery is done */

                if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
                        lkb->lkb_flags |= DLM_IFL_RESEND;
                        continue;
                }

                if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
                        continue;

                wait_type = lkb->lkb_wait_type;
                stub_unlock_result = -DLM_EUNLOCK;
                stub_cancel_result = -DLM_ECANCEL;

                /* Main reply may have been received leaving a zero wait_type,
                   but a reply for the overlapping op may not have been
                   received.  In that case we need to fake the appropriate
                   reply for the overlap op. */

                if (!wait_type) {
                        if (is_overlap_cancel(lkb)) {
                                wait_type = DLM_MSG_CANCEL;
                                if (lkb->lkb_grmode == DLM_LOCK_IV)
                                        stub_cancel_result = 0;
                        }
                        if (is_overlap_unlock(lkb)) {
                                wait_type = DLM_MSG_UNLOCK;
                                if (lkb->lkb_grmode == DLM_LOCK_IV)
                                        stub_unlock_result = -ENOENT;
                        }

                        log_debug(ls, "rwpre overlap %x %x %d %d %d",
                                  lkb->lkb_id, lkb->lkb_flags, wait_type,
                                  stub_cancel_result, stub_unlock_result);
                }

                switch (wait_type) {

                case DLM_MSG_REQUEST:
                        lkb->lkb_flags |= DLM_IFL_RESEND;
                        break;

                case DLM_MSG_CONVERT:
                        recover_convert_waiter(ls, lkb, ms_stub);
                        break;

                case DLM_MSG_UNLOCK:
                        hold_lkb(lkb);
                        memset(ms_stub, 0, sizeof(struct dlm_message));
                        ms_stub->m_flags = DLM_IFL_STUB_MS;
                        ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
                        ms_stub->m_result = stub_unlock_result;
                        ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
                        _receive_unlock_reply(lkb, ms_stub);
                        dlm_put_lkb(lkb);
                        break;

                case DLM_MSG_CANCEL:
                        hold_lkb(lkb);
                        memset(ms_stub, 0, sizeof(struct dlm_message));
                        ms_stub->m_flags = DLM_IFL_STUB_MS;
                        ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
                        ms_stub->m_result = stub_cancel_result;
                        ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
                        _receive_cancel_reply(lkb, ms_stub);
                        dlm_put_lkb(lkb);
                        break;

                default:
                        log_error(ls, "invalid lkb wait_type %d %d",
                                  lkb->lkb_wait_type, wait_type);
                }
                schedule();
        }
        mutex_unlock(&ls->ls_waiters_mutex);
        kfree(ms_stub);
}

static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
{
        struct dlm_lkb *lkb;
        int found = 0;

        mutex_lock(&ls->ls_waiters_mutex);
        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
                if (lkb->lkb_flags & DLM_IFL_RESEND) {
                        hold_lkb(lkb);
                        found = 1;
                        break;
                }
        }
        mutex_unlock(&ls->ls_waiters_mutex);

        if (!found)
                lkb = NULL;
        return lkb;
}

/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
   master or dir-node for r.  Processing the lkb may result in it being placed
   back on waiters. */

/* We do this after normal locking has been enabled and any saved messages
   (in requestqueue) have been processed.  We should be confident that at
   this point we won't get or process a reply to any of these waiting
   operations.  But, new ops may be coming in on the rsbs/locks here from
   userspace or remotely. */

/* there may have been an overlap unlock/cancel prior to recovery or after
   recovery.  if before, the lkb may still have a positive wait_count; if
   after, the overlap flag would just have been set and nothing new sent.
   we can be confident here that any replies to either the initial op or
   overlap ops prior to recovery have been received. */
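
/* Summary, illustrative only, of how dlm_recover_waiters_post() below
   disposes of each resend waiter, keyed by wait_type (mstype) and the
   overlap unlock/cancel flags:

   overlap set, LOOKUP/REQUEST -> complete with -DLM_EUNLOCK / -DLM_ECANCEL
   overlap set, CONVERT        -> cancel cast, or forced _unlock_lock()
   no overlap,  LOOKUP/REQUEST -> resend via _request_lock()
   no overlap,  CONVERT        -> resend via _convert_lock() */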

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
        int error = 0, mstype, err, oc, ou;

        while (1) {
                if (dlm_locking_stopped(ls)) {
                        log_debug(ls, "recover_waiters_post aborted");
                        error = -EINTR;
                        break;
                }

                lkb = find_resend_waiter(ls);
                if (!lkb)
                        break;

                r = lkb->lkb_resource;
                hold_rsb(r);
                lock_rsb(r);

                mstype = lkb->lkb_wait_type;
                oc = is_overlap_cancel(lkb);
                ou = is_overlap_unlock(lkb);
                err = 0;

                log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
                          "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
                          "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
                          r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
                          dlm_dir_nodeid(r), oc, ou);

                /* At this point we assume that we won't get a reply to any
                   previous op or overlap op on this lock.  First, do a big
                   remove_from_waiters() for all previous ops. */

                lkb->lkb_flags &= ~DLM_IFL_RESEND;
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                lkb->lkb_wait_type = 0;
                lkb->lkb_wait_count = 0;
                mutex_lock(&ls->ls_waiters_mutex);
                list_del_init(&lkb->lkb_wait_reply);
                mutex_unlock(&ls->ls_waiters_mutex);
                unhold_lkb(lkb); /* for waiters list */

                if (oc || ou) {
                        /* do an unlock or cancel instead of resending */
                        switch (mstype) {
                        case DLM_MSG_LOOKUP:
                        case DLM_MSG_REQUEST:
                                queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
                                                        -DLM_ECANCEL);
                                unhold_lkb(lkb); /* undoes create_lkb() */
                                break;
                        case DLM_MSG_CONVERT:
                                if (oc) {
                                        queue_cast(r, lkb, -DLM_ECANCEL);
                                } else {
                                        lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
                                        _unlock_lock(r, lkb);
                                }
                                break;
                        default:
                                err = 1;
                        }
                } else {
                        switch (mstype) {
                        case DLM_MSG_LOOKUP:
                        case DLM_MSG_REQUEST:
                                _request_lock(r, lkb);
                                if (is_master(r))
                                        confirm_master(r, 0);
                                break;
                        case DLM_MSG_CONVERT:
                                _convert_lock(r, lkb);
                                break;
                        default:
                                err = 1;
                        }
                }

                if (err) {
                        log_error(ls, "waiter %x msg %d r_nodeid %d "
                                  "dir_nodeid %d overlap %d %d",
                                  lkb->lkb_id, mstype, r->res_nodeid,
                                  dlm_dir_nodeid(r), oc, ou);
                }
                unlock_rsb(r);
                put_rsb(r);
                dlm_put_lkb(lkb);
        }

        return error;
}

static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
                        int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
{
        struct dlm_ls *ls = r->res_ls;
        struct dlm_lkb *lkb, *safe;

        list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
                if (test(ls, lkb)) {
                        rsb_set_flag(r, RSB_LOCKS_PURGED);
                        del_lkb(r, lkb);
                        /* this put should free the lkb */
                        if (!dlm_put_lkb(lkb))
                                log_error(ls, "purged lkb not released");
                }
        }
}

static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
        return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
}

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
        return is_master_copy(lkb);
}

static void purge_dead_locks(struct dlm_rsb *r)
{
        purge_queue(r, &r->res_grantqueue, &purge_dead_test);
        purge_queue(r, &r->res_convertqueue, &purge_dead_test);
        purge_queue(r, &r->res_waitqueue, &purge_dead_test);
}

void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
{
        purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
        purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
        purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
}

/* Get rid of locks held by nodes that are gone. */

int dlm_purge_locks(struct dlm_ls *ls)
{
        struct dlm_rsb *r;

        log_debug(ls, "dlm_purge_locks");

        down_write(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                hold_rsb(r);
                lock_rsb(r);
                if (is_master(r))
                        purge_dead_locks(r);
                unlock_rsb(r);
                unhold_rsb(r);

                schedule();
        }
        up_write(&ls->ls_root_sem);

        return 0;
}

static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
{
        struct rb_node *n;
        struct dlm_rsb *r, *r_ret = NULL;

        spin_lock(&ls->ls_rsbtbl[bucket].lock);
        for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
                r = rb_entry(n, struct dlm_rsb, res_hashnode);
                if (!rsb_flag(r, RSB_LOCKS_PURGED))
                        continue;
                hold_rsb(r);
                rsb_clear_flag(r, RSB_LOCKS_PURGED);
                r_ret = r;
                break;
        }
        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
        return r_ret;
}

void dlm_grant_after_purge(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        int bucket = 0;

        while (1) {
                r = find_purged_rsb(ls, bucket);
                if (!r) {
                        if (bucket == ls->ls_rsbtbl_size - 1)
                                break;
                        bucket++;
                        continue;
                }
                lock_rsb(r);
                if (is_master(r)) {
                        grant_pending_locks(r);
                        confirm_master(r, 0);
                }
                unlock_rsb(r);
                put_rsb(r);
                schedule();
        }
}

static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
                                         uint32_t remid)
{
        struct dlm_lkb *lkb;

        list_for_each_entry(lkb, head, lkb_statequeue) {
                if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
                        return lkb;
        }
        return NULL;
}

static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
                                    uint32_t remid)
{
        struct dlm_lkb *lkb;

        lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
        if (lkb)
                return lkb;
        lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
        if (lkb)
                return lkb;
        lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
        if (lkb)
                return lkb;
        return NULL;
}

/* needs at least dlm_rcom + rcom_lock */
static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                                  struct dlm_rsb *r, struct dlm_rcom *rc)
{
        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;

        lkb->lkb_nodeid = rc->rc_header.h_nodeid;
        lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
        lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
        lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
        lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
        lkb->lkb_flags |= DLM_IFL_MSTCPY;
        lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
        lkb->lkb_rqmode = rl->rl_rqmode;
        lkb->lkb_grmode = rl->rl_grmode;
        /* don't set lkb_status because add_lkb wants to itself */

        lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
        lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;

        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
                int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
                             sizeof(struct rcom_lock);
                if (lvblen > ls->ls_lvblen)
                        return -EINVAL;
                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
                if (!lkb->lkb_lvbptr)
                        return -ENOMEM;
                memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
        }

        /* Conversions between PR and CW (middle modes) need special handling.
           The real granted mode of these converting locks cannot be determined
           until all locks have been rebuilt on the rsb (recover_conversion) */

        if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
            middle_conversion(lkb)) {
                rl->rl_status = DLM_LKSTS_CONVERT;
                lkb->lkb_grmode = DLM_LOCK_IV;
                rsb_set_flag(r, RSB_RECOVER_CONVERT);
        }

        return 0;
}
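
/* Example, illustrative only: the lvblen check above recovers the LVB size
   from the packet length.  For an rcom carrying a 32-byte LVB appended after
   the fixed structures, h_length = sizeof(struct dlm_rcom) +
   sizeof(struct rcom_lock) + 32, so lvblen works out to 32; anything larger
   than ls->ls_lvblen indicates a malformed or incompatible message and is
   rejected with -EINVAL. */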

/* This lkb may have been recovered in a previous aborted recovery so we need
   to check if the rsb already has an lkb with the given remote nodeid/lkid.
   If so we just send back a standard reply.  If not, we create a new lkb with
   the given values and send back our lkid.  We send back our lkid by sending
   back the rcom_lock struct we got but with the remid field filled in. */

/* needs at least dlm_rcom + rcom_lock */
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
        struct dlm_rsb *r;
        struct dlm_lkb *lkb;
        uint32_t remid = 0;
        int error;

        if (rl->rl_parent_lkid) {
                error = -EOPNOTSUPP;
                goto out;
        }

        remid = le32_to_cpu(rl->rl_lkid);

        error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
                         R_MASTER, &r);
        if (error)
                goto out;

        lock_rsb(r);

        lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
        if (lkb) {
                error = -EEXIST;
                goto out_remid;
        }

        error = create_lkb(ls, &lkb);
        if (error)
                goto out_unlock;

        error = receive_rcom_lock_args(ls, lkb, r, rc);
        if (error) {
                __put_lkb(ls, lkb);
                goto out_unlock;
        }

        attach_lkb(r, lkb);
        add_lkb(r, lkb, rl->rl_status);
        error = 0;

 out_remid:
        /* this is the new value returned to the lock holder for
           saving in its process-copy lkb */
        rl->rl_remid = cpu_to_le32(lkb->lkb_id);

 out_unlock:
        unlock_rsb(r);
        put_rsb(r);
 out:
        if (error && error != -EEXIST)
                log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
                          rc->rc_header.h_nodeid, remid, error);
        rl->rl_result = cpu_to_le32(error);
        return error;
}
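
/* Illustrative exchange, not dlm source: the lkid/remid handoff performed by
   dlm_recover_master_copy() above and dlm_recover_process_copy() below:

   lock holder -> new master:  rcom_lock { rl_lkid  = holder's lkb_id }
   new master  -> lock holder: rcom_lock { rl_remid = master's lkb_id }

   the holder then stores rl_remid in lkb_remid, re-linking its process-copy
   lkb to the master-copy lkb created on the new master. */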

/* needs at least dlm_rcom + rcom_lock */
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
        struct dlm_rsb *r;
        struct dlm_lkb *lkb;
        uint32_t lkid, remid;
        int error, result;

        lkid = le32_to_cpu(rl->rl_lkid);
        remid = le32_to_cpu(rl->rl_remid);
        result = le32_to_cpu(rl->rl_result);

        error = find_lkb(ls, lkid, &lkb);
        if (error) {
                log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
                          lkid, rc->rc_header.h_nodeid, remid, result);
                return error;
        }

        if (!is_process_copy(lkb)) {
                log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
                          lkid, rc->rc_header.h_nodeid, remid, result);
                dlm_print_lkb(lkb);
                /* drop the reference taken by find_lkb() above */
                dlm_put_lkb(lkb);
                return -EINVAL;
        }

        r = lkb->lkb_resource;
        hold_rsb(r);
        lock_rsb(r);

        switch (result) {
        case -EBADR:
                /* There's a chance the new master received our lock before
                   dlm_recover_master_reply(), this wouldn't happen if we did
                   a barrier between recover_masters and recover_locks. */

                log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
                          lkid, rc->rc_header.h_nodeid, remid, result);

                dlm_send_rcom_lock(r, lkb);
                goto out;
        case -EEXIST:
        case 0:
                lkb->lkb_remid = remid;
                break;
        default:
                log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
                          lkid, rc->rc_header.h_nodeid, remid, result);
        }

        /* an ack for dlm_recover_locks() which waits for replies from
           all the locks it sends to new masters */
        dlm_recovered_lock(r);
 out:
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);

        return 0;
}

int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
                     int mode, uint32_t flags, void *name, unsigned int namelen,
                     unsigned long timeout_cs)
{
        struct dlm_lkb *lkb;
        struct dlm_args args;
        int error;

        dlm_lock_recovery(ls);

        error = create_lkb(ls, &lkb);
        if (error) {
                kfree(ua);
                goto out;
        }

        if (flags & DLM_LKF_VALBLK) {
                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
                if (!ua->lksb.sb_lvbptr) {
                        kfree(ua);
                        __put_lkb(ls, lkb);
                        error = -ENOMEM;
                        goto out;
                }
        }

        /* After ua is attached to lkb it will be freed by dlm_free_lkb().
           When DLM_IFL_USER is set, the dlm knows that this is a userspace
           lock and that lkb_astparam is the dlm_user_args structure. */

        error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
                              fake_astfn, ua, fake_bastfn, &args);
        lkb->lkb_flags |= DLM_IFL_USER;

        if (error) {
                __put_lkb(ls, lkb);
                goto out;
        }

        error = request_lock(ls, lkb, name, namelen, &args);

        switch (error) {
        case 0:
                break;
        case -EINPROGRESS:
                error = 0;
                break;
        case -EAGAIN:
                error = 0;
                /* fall through */
        default:
                __put_lkb(ls, lkb);
                goto out;
        }

        /* add this new lkb to the per-process list of locks */
        spin_lock(&ua->proc->locks_spin);
        hold_lkb(lkb);
        list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
        spin_unlock(&ua->proc->locks_spin);
 out:
        dlm_unlock_recovery(ls);
        return error;
}
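
/* Minimal sketch, illustrative only and not part of dlm: how a caller such
   as the character-device write path might use the helper above.  The
   resource name, mode and flags are made-up values; a real caller fills them
   in from the dlm_write_request it copied from userspace, and 'ua' must be a
   kmalloc'd dlm_user_args since dlm_user_request() takes ownership and frees
   it on error. */
static int example_user_lock(struct dlm_ls *ls, struct dlm_user_args *ua)
{
        char name[] = "example_res";    /* hypothetical resource name */

        /* request an exclusive lock, no special flags, no timeout */
        return dlm_user_request(ls, ua, DLM_LOCK_EX, 0, name,
                                sizeof(name) - 1, 0);
}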

int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
                     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
                     unsigned long timeout_cs)
{
        struct dlm_lkb *lkb;
        struct dlm_args args;
        struct dlm_user_args *ua;
        int error;

        dlm_lock_recovery(ls);

        error = find_lkb(ls, lkid, &lkb);
        if (error)
                goto out;

        /* user can change the params on its lock when it converts it, or
           add an lvb that didn't exist before */

        ua = lkb->lkb_ua;

        if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
                if (!ua->lksb.sb_lvbptr) {
                        error = -ENOMEM;
                        goto out_put;
                }
        }
        if (lvb_in && ua->lksb.sb_lvbptr)
                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

        ua->xid = ua_tmp->xid;
        ua->castparam = ua_tmp->castparam;
        ua->castaddr = ua_tmp->castaddr;
        ua->bastparam = ua_tmp->bastparam;
        ua->bastaddr = ua_tmp->bastaddr;
        ua->user_lksb = ua_tmp->user_lksb;

        error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
                              fake_astfn, ua, fake_bastfn, &args);
        if (error)
                goto out_put;

        error = convert_lock(ls, lkb, &args);

        if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
                error = 0;
 out_put:
        dlm_put_lkb(lkb);
 out:
        dlm_unlock_recovery(ls);
        kfree(ua_tmp);
        return error;
}

int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
                    uint32_t flags, uint32_t lkid, char *lvb_in)
{
        struct dlm_lkb *lkb;
        struct dlm_args args;
        struct dlm_user_args *ua;
        int error;

        dlm_lock_recovery(ls);

        error = find_lkb(ls, lkid, &lkb);
        if (error)
                goto out;

        ua = lkb->lkb_ua;

        if (lvb_in && ua->lksb.sb_lvbptr)
                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
        if (ua_tmp->castparam)
                ua->castparam = ua_tmp->castparam;
        ua->user_lksb = ua_tmp->user_lksb;

        error = set_unlock_args(flags, ua, &args);
        if (error)
                goto out_put;

        error = unlock_lock(ls, lkb, &args);

        if (error == -DLM_EUNLOCK)
                error = 0;
        /* from validate_unlock_args() */
        if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
                error = 0;
        if (error)
                goto out_put;

        spin_lock(&ua->proc->locks_spin);
        /* dlm_user_add_cb() may have already taken lkb off the proc list */
        if (!list_empty(&lkb->lkb_ownqueue))
                list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
        spin_unlock(&ua->proc->locks_spin);
 out_put:
        dlm_put_lkb(lkb);
 out:
        dlm_unlock_recovery(ls);
        kfree(ua_tmp);
        return error;
}

int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
                    uint32_t flags, uint32_t lkid)
{
        struct dlm_lkb *lkb;
        struct dlm_args args;
        struct dlm_user_args *ua;
        int error;

        dlm_lock_recovery(ls);

        error = find_lkb(ls, lkid, &lkb);
        if (error)
                goto out;

        ua = lkb->lkb_ua;
        if (ua_tmp->castparam)
                ua->castparam = ua_tmp->castparam;
        ua->user_lksb = ua_tmp->user_lksb;

        error = set_unlock_args(flags, ua, &args);
        if (error)
                goto out_put;

        error = cancel_lock(ls, lkb, &args);

        if (error == -DLM_ECANCEL)
                error = 0;
        /* from validate_unlock_args() */
        if (error == -EBUSY)
                error = 0;
 out_put:
        dlm_put_lkb(lkb);
 out:
        dlm_unlock_recovery(ls);
        kfree(ua_tmp);
        return error;
}

int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
{
        struct dlm_lkb *lkb;
        struct dlm_args args;
        struct dlm_user_args *ua;
        struct dlm_rsb *r;
        int error;

        dlm_lock_recovery(ls);

        error = find_lkb(ls, lkid, &lkb);
        if (error)
                goto out;

        ua = lkb->lkb_ua;

        error = set_unlock_args(flags, ua, &args);
        if (error)
                goto out_put;

        /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */

        r = lkb->lkb_resource;
        hold_rsb(r);
        lock_rsb(r);

        error = validate_unlock_args(lkb, &args);
        if (error)
                goto out_r;
        lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;

        error = _cancel_lock(r, lkb);
 out_r:
        unlock_rsb(r);
        put_rsb(r);

        if (error == -DLM_ECANCEL)
                error = 0;
        /* from validate_unlock_args() */
        if (error == -EBUSY)
                error = 0;
 out_put:
        dlm_put_lkb(lkb);
 out:
        dlm_unlock_recovery(ls);
        return error;
}

/* lkb's that are removed from the waiters list by revert are just left on the
   orphans list with the granted orphan locks, to be freed by purge */

static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
        struct dlm_args args;
        int error;

        hold_lkb(lkb);
        mutex_lock(&ls->ls_orphans_mutex);
        list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
        mutex_unlock(&ls->ls_orphans_mutex);

        set_unlock_args(0, lkb->lkb_ua, &args);

        error = cancel_lock(ls, lkb, &args);
        if (error == -DLM_ECANCEL)
                error = 0;
        return error;
}

/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
   Regardless of what rsb queue the lock is on, it's removed and freed. */

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
        struct dlm_args args;
        int error;

        set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);

        error = unlock_lock(ls, lkb, &args);
        if (error == -DLM_EUNLOCK)
                error = 0;
        return error;
}

/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
   (which does lock_rsb) due to deadlock with receiving a message that does
   lock_rsb followed by dlm_user_add_cb() */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
                                     struct dlm_user_proc *proc)
{
        struct dlm_lkb *lkb = NULL;

        mutex_lock(&ls->ls_clear_proc_locks);
        if (list_empty(&proc->locks))
                goto out;

        lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
        list_del_init(&lkb->lkb_ownqueue);

        if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
                lkb->lkb_flags |= DLM_IFL_ORPHAN;
        else
                lkb->lkb_flags |= DLM_IFL_DEAD;
 out:
        mutex_unlock(&ls->ls_clear_proc_locks);
        return lkb;
}

/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here.  this assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to
   serialize them ourselves. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
        struct dlm_lkb *lkb, *safe;

        dlm_lock_recovery(ls);

        while (1) {
                lkb = del_proc_lock(ls, proc);
                if (!lkb)
                        break;
                del_timeout(lkb);
                if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
                        orphan_proc_lock(ls, lkb);
                else
                        unlock_proc_lock(ls, lkb);

                /* this removes the reference for the proc->locks list
                   added by dlm_user_request, it may result in the lkb
                   being freed */

                dlm_put_lkb(lkb);
        }

        mutex_lock(&ls->ls_clear_proc_locks);

        /* in-progress unlocks */
        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
                list_del_init(&lkb->lkb_ownqueue);
                lkb->lkb_flags |= DLM_IFL_DEAD;
                dlm_put_lkb(lkb);
        }

        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
                memset(&lkb->lkb_callbacks, 0,
                       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
                list_del_init(&lkb->lkb_cb_list);
                dlm_put_lkb(lkb);
        }

        mutex_unlock(&ls->ls_clear_proc_locks);
        dlm_unlock_recovery(ls);
}

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
        struct dlm_lkb *lkb, *safe;

        while (1) {
                lkb = NULL;
                spin_lock(&proc->locks_spin);
                if (!list_empty(&proc->locks)) {
                        lkb = list_entry(proc->locks.next, struct dlm_lkb,
                                         lkb_ownqueue);
                        list_del_init(&lkb->lkb_ownqueue);
                }
                spin_unlock(&proc->locks_spin);

                if (!lkb)
                        break;

                lkb->lkb_flags |= DLM_IFL_DEAD;
                unlock_proc_lock(ls, lkb);
                dlm_put_lkb(lkb); /* ref from proc->locks list */
        }

        spin_lock(&proc->locks_spin);
        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
                list_del_init(&lkb->lkb_ownqueue);
                lkb->lkb_flags |= DLM_IFL_DEAD;
                dlm_put_lkb(lkb);
        }
        spin_unlock(&proc->locks_spin);

        spin_lock(&proc->asts_spin);
        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
                memset(&lkb->lkb_callbacks, 0,
                       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
                list_del_init(&lkb->lkb_cb_list);
                dlm_put_lkb(lkb);
        }
        spin_unlock(&proc->asts_spin);
}

/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
        struct dlm_lkb *lkb, *safe;

        mutex_lock(&ls->ls_orphans_mutex);
        list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
                if (pid && lkb->lkb_ownpid != pid)
                        continue;
                unlock_proc_lock(ls, lkb);
                list_del_init(&lkb->lkb_ownqueue);
                dlm_put_lkb(lkb);
        }
        mutex_unlock(&ls->ls_orphans_mutex);
}

static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
        struct dlm_message *ms;
        struct dlm_mhandle *mh;
        int error;

        error = _create_message(ls, sizeof(struct dlm_message), nodeid,
                                DLM_MSG_PURGE, &ms, &mh);
        if (error)
                return error;
        ms->m_nodeid = nodeid;
        ms->m_pid = pid;

        return send_message(mh, ms);
}

int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
                   int nodeid, int pid)
{
        int error = 0;

        if (nodeid != dlm_our_nodeid()) {
                error = send_purge(ls, nodeid, pid);
        } else {
                dlm_lock_recovery(ls);
                if (pid == current->pid)
                        purge_proc_locks(ls, proc);
                else
                        do_purge(ls, nodeid, pid);
                dlm_unlock_recovery(ls);
        }
        return error;
}
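
/* Minimal sketch, illustrative only and not part of dlm: purging the orphan
   locks that a process with the made-up pid 1234 left behind on this node.
   When the nodeid is not ours, dlm_user_purge() above forwards the same
   request to the owning node as a DLM_MSG_PURGE message instead. */
static int example_purge_orphans(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
        return dlm_user_purge(ls, proc, dlm_our_nodeid(), 1234);
}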