dlm: keep lkbs in idr
[deliverable/linux.git] / fs / dlm / lock.c
CommitLineData
e7fd4179
DT
1/******************************************************************************
2*******************************************************************************
3**
7fe2b319 4** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
e7fd4179
DT
5**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.
9**
10*******************************************************************************
11******************************************************************************/
12
13/* Central locking logic has four stages:
14
15 dlm_lock()
16 dlm_unlock()
17
18 request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
22
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
27
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
32
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
35
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
40
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
43
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
49
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
53
54 L: send_xxxx() -> R: receive_xxxx()
55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/
597d0cae 58#include <linux/types.h>
5a0e3ad6 59#include <linux/slab.h>
e7fd4179 60#include "dlm_internal.h"
597d0cae 61#include <linux/dlm_device.h>
e7fd4179
DT
62#include "memory.h"
63#include "lowcomms.h"
64#include "requestqueue.h"
65#include "util.h"
66#include "dir.h"
67#include "member.h"
68#include "lockspace.h"
69#include "ast.h"
70#include "lock.h"
71#include "rcom.h"
72#include "recover.h"
73#include "lvb_table.h"
597d0cae 74#include "user.h"
e7fd4179
DT
75#include "config.h"
76
77static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
78static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
79static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
80static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
81static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
82static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
83static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
84static int send_remove(struct dlm_rsb *r);
85static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
3ae1acf9 86static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
e7fd4179
DT
87static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
88 struct dlm_message *ms);
89static int receive_extralen(struct dlm_message *ms);
8499137d 90static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
3ae1acf9 91static void del_timeout(struct dlm_lkb *lkb);
e7fd4179
DT
92
/*
 * Lock compatibilty matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
 */

/* 1 = the two modes may be held concurrently, 0 = they conflict */
static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
112
/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
e7fd4179
DT
133
/* Nonzero when gr's granted mode is compatible with rq's requested mode. */
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

/* Table-driven compatibility check for two raw lock modes; the +1
   offset makes -1 (the "unlocked"/invalid mode) a valid index. */
int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
141
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
159
/* Log an lkb's identifying fields at KERN_ERR for debugging. */
void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       " status %d rqmode %d grmode %d wait_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type);
}
168
/* Log an rsb's identifying fields at KERN_ERR for debugging. */
static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}
175
/* Dump an rsb plus every lkb on each of its queues to the kernel log. */
void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}
197
e7fd4179
DT
/* Threads cannot use the lockspace while it's being recovered */

/* Take the read side of ls_in_recovery: blocks while recovery (the
   write side) is running, and holds recovery off while held. */
static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

/* Release the hold taken by dlm_lock_recovery(). */
void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

/* Nonblocking variant; returns nonzero when the lock was acquired. */
int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
214
/* True unless the caller asked for an immediate result (DLM_LKF_NOQUEUE). */
static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

/* DLM_LKF_NOQUEUEBAST: caller wants blocking asts sent even though the
   request itself is NOQUEUE. */
static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

/* DLM_SBF_DEMOTED is set in the lock's status block. */
static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

/* DLM_SBF_ALTMODE is set in the lock's status block. */
static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

/* Lock is currently on the grant queue. */
static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}
239
/* An rsb is remote when another node masters it (res_nodeid != 0).
   A negative nodeid (unknown master) is a bug here. */
static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

/* Local record of a lock whose master is another node (and which is
   not itself a master copy). */
static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

/* Master-side record of a lock owned by a process on another node;
   such a copy must always carry the owning nodeid. */
static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}
257
258static inline int middle_conversion(struct dlm_lkb *lkb)
259{
260 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
90135925
DT
262 return 1;
263 return 0;
e7fd4179
DT
264}
265
266static inline int down_conversion(struct dlm_lkb *lkb)
267{
268 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
269}
270
/* A force-unlock arrived while another operation on this lkb was
   still outstanding. */
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

/* A cancel arrived while another operation on this lkb was still
   outstanding. */
static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

/* Either kind of overlapping operation is pending. */
static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}
286
/* Queue a completion ast with result rv for lkb's owner.  Master copies
   have no local owner, so nothing is queued for them. */
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
	   timeout caused the cancel then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	/* likewise, a cancel issued to break a deadlock is reported to
	   the caller as -EDEADLK */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}
310
ef0c2bb0
DT
311static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
312{
313 queue_cast(r, lkb,
314 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
315}
316
e7fd4179
DT
317static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
318{
b6fa8796 319 if (is_master_copy(lkb)) {
e7fd4179 320 send_bast(r, lkb, rqmode);
b6fa8796 321 } else {
8304d6f2 322 dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0);
b6fa8796 323 }
e7fd4179
DT
324}
325
326/*
327 * Basic operations on rsb's and lkb's
328 */
329
330static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
331{
332 struct dlm_rsb *r;
333
52bda2b5 334 r = dlm_allocate_rsb(ls, len);
e7fd4179
DT
335 if (!r)
336 return NULL;
337
338 r->res_ls = ls;
339 r->res_length = len;
340 memcpy(r->res_name, name, len);
90135925 341 mutex_init(&r->res_mutex);
e7fd4179
DT
342
343 INIT_LIST_HEAD(&r->res_lookup);
344 INIT_LIST_HEAD(&r->res_grantqueue);
345 INIT_LIST_HEAD(&r->res_convertqueue);
346 INIT_LIST_HEAD(&r->res_waitqueue);
347 INIT_LIST_HEAD(&r->res_root_list);
348 INIT_LIST_HEAD(&r->res_recover_list);
349
350 return r;
351}
352
353static int search_rsb_list(struct list_head *head, char *name, int len,
354 unsigned int flags, struct dlm_rsb **r_ret)
355{
356 struct dlm_rsb *r;
357 int error = 0;
358
359 list_for_each_entry(r, head, res_hashchain) {
360 if (len == r->res_length && !memcmp(name, r->res_name, len))
361 goto found;
362 }
18c60c0a 363 *r_ret = NULL;
597d0cae 364 return -EBADR;
e7fd4179
DT
365
366 found:
367 if (r->res_nodeid && (flags & R_MASTER))
368 error = -ENOTBLK;
369 *r_ret = r;
370 return error;
371}
372
/* Search bucket b for name, first on the active list and then on the
   toss (unused) list; a toss hit is revived onto the active list.
   Caller must hold ls_rsbtbl[b].lock. */
static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	/* a revived rsb re-enters the active list with the refcount of 1
	   that toss_rsb() re-armed */
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	/* the cached master value may be stale after sitting on the toss
	   list; mark remote masters uncertain so they get re-verified */
	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}
407
408static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
409 unsigned int flags, struct dlm_rsb **r_ret)
410{
411 int error;
c7be761a 412 spin_lock(&ls->ls_rsbtbl[b].lock);
e7fd4179 413 error = _search_rsb(ls, name, len, b, flags, r_ret);
c7be761a 414 spin_unlock(&ls->ls_rsbtbl[b].lock);
e7fd4179
DT
415 return error;
416}
417
/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused. Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list. When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL, *tmp;
	uint32_t hash, bucket;
	int error = -EINVAL;

	if (namelen > DLM_RESNAME_MAXLEN)
		goto out;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	error = 0;
	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	/* -EBADR: not found; only create a new rsb when asked to */
	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	/* re-search under the bucket lock: another thread may have added
	   the same rsb while we were allocating above */
	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
		dlm_free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}
493
/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

/* Exported wrapper for hold_rsb(). */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
506
/* kref release: move an unreferenced rsb to its bucket's toss list and
   timestamp it for later disposal by shrink_bucket().  The lvb is freed
   now; the name/master info is kept as a cache for revival. */
static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	/* re-arm the refcount so the tossed rsb can later be revived */
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}
521
/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	/* the bucket lock serializes the final put against searches */
	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

/* Exported wrapper for put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
539
/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	/* dropping the last reference here would be a refcounting bug */
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
548
/* kref release used where the rsb is really being freed (not tossed);
   only sanity-checks that nothing still references it. */
static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
563
564/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
565 The rsb must exist as long as any lkb's for it do. */
566
567static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
568{
569 hold_rsb(r);
570 lkb->lkb_resource = r;
571}
572
573static void detach_lkb(struct dlm_lkb *lkb)
574{
575 if (lkb->lkb_resource) {
576 put_rsb(lkb->lkb_resource);
577 lkb->lkb_resource = NULL;
578 }
579}
580
/* Allocate a new lkb and assign it a lockspace-unique id (>= 1) from
   ls_lkbidr.  Returns 0 with *lkb_ret set, or a negative errno.
   NOTE(review): the lkb does not appear to be freed on the error
   returns below — confirm ownership against dlm_allocate_lkb(). */
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	int rv, id;

	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);
	INIT_LIST_HEAD(&lkb->lkb_astqueue);

 retry:
	/* old-style two-step idr allocation: preload memory outside the
	   lock, then take the id under the spinlock; -EAGAIN means the
	   preloaded memory was consumed by a racing allocator, so retry */
	rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
	if (!rv)
		return -ENOMEM;

	spin_lock(&ls->ls_lkbidr_spin);
	rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
	if (!rv)
		lkb->lkb_id = id;
	spin_unlock(&ls->ls_lkbidr_spin);

	if (rv == -EAGAIN)
		goto retry;

	if (rv < 0) {
		log_error(ls, "create_lkb idr error %d", rv);
		return rv;
	}

	*lkb_ret = lkb;
	return 0;
}
620
e7fd4179
DT
621static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
622{
623 struct dlm_lkb *lkb;
e7fd4179 624
3d6aa675
DT
625 spin_lock(&ls->ls_lkbidr_spin);
626 lkb = idr_find(&ls->ls_lkbidr, lkid);
e7fd4179
DT
627 if (lkb)
628 kref_get(&lkb->lkb_ref);
3d6aa675 629 spin_unlock(&ls->ls_lkbidr_spin);
e7fd4179
DT
630
631 *lkb_ret = lkb;
632 return lkb ? 0 : -ENOENT;
633}
634
/* kref release for an lkb: sanity-check only; the actual detach and
   free happen in __put_lkb() after the idr spinlock is dropped. */
static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
644
b3f58d8f
DT
645/* __put_lkb() is used when an lkb may not have an rsb attached to
646 it so we need to provide the lockspace explicitly */
647
648static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
e7fd4179 649{
3d6aa675 650 uint32_t lkid = lkb->lkb_id;
e7fd4179 651
3d6aa675 652 spin_lock(&ls->ls_lkbidr_spin);
e7fd4179 653 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
3d6aa675
DT
654 idr_remove(&ls->ls_lkbidr, lkid);
655 spin_unlock(&ls->ls_lkbidr_spin);
e7fd4179
DT
656
657 detach_lkb(lkb);
658
659 /* for local/process lkbs, lvbptr points to caller's lksb */
660 if (lkb->lkb_lvbptr && is_master_copy(lkb))
52bda2b5
DT
661 dlm_free_lvb(lkb->lkb_lvbptr);
662 dlm_free_lkb(lkb);
e7fd4179
DT
663 return 1;
664 } else {
3d6aa675 665 spin_unlock(&ls->ls_lkbidr_spin);
e7fd4179
DT
666 return 0;
667 }
668}
669
670int dlm_put_lkb(struct dlm_lkb *lkb)
671{
b3f58d8f
DT
672 struct dlm_ls *ls;
673
674 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
675 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
676
677 ls = lkb->lkb_resource->res_ls;
678 return __put_lkb(ls, lkb);
e7fd4179
DT
679}
680
/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	/* dropping the last reference here would be a refcounting bug */
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}
700
/* Insert 'new' into 'head' keeping the list ordered by decreasing mode.
   If no entry has a lower mode, the loop runs off the end and lkb's
   statequeue member aliases 'head' itself, so the insert below lands
   at the tail. */
static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	/* insert just before lkb (i.e. at the tail when no break occurred) */
	__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
712
/* add/remove lkb to rsb's grant/convert/wait queue */

/* Place lkb on the rsb queue matching 'status', taking a reference for
   queue membership.  DLM_LKF_HEADQUE callers go to the front of the
   wait/convert queues; the grant queue is kept ordered by grmode. */
static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	/* must not already be on a status queue */
	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_timestamp = ktime_get();

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}
748
/* Take lkb off whichever rsb status queue it is on and drop the
   reference that queue membership held. */
static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}
755
/* Move lkb to status queue 'sts'; the surrounding hold/unhold keeps
   the refcount above zero across the del_lkb/add_lkb pair. */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}
763
ef0c2bb0
DT
764static int msg_reply_type(int mstype)
765{
766 switch (mstype) {
767 case DLM_MSG_REQUEST:
768 return DLM_MSG_REQUEST_REPLY;
769 case DLM_MSG_CONVERT:
770 return DLM_MSG_CONVERT_REPLY;
771 case DLM_MSG_UNLOCK:
772 return DLM_MSG_UNLOCK_REPLY;
773 case DLM_MSG_CANCEL:
774 return DLM_MSG_CANCEL_REPLY;
775 case DLM_MSG_LOOKUP:
776 return DLM_MSG_LOOKUP_REPLY;
777 }
778 return -1;
779}
780
/* Record nodeid in the first empty slot of warned[] (num_nodes slots,
   empty == 0, filled in order).  Returns 1 if nodeid was already
   recorded (already warned about), 0 otherwise. */
static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
	int *slot = warned;
	int *end = warned + num_nodes;

	while (slot < end) {
		if (*slot == 0) {
			*slot = nodeid;
			return 0;
		}
		if (*slot == nodeid)
			return 1;
		slot++;
	}
	return 0;
}
795
796void dlm_scan_waiters(struct dlm_ls *ls)
797{
798 struct dlm_lkb *lkb;
799 ktime_t zero = ktime_set(0, 0);
800 s64 us;
801 s64 debug_maxus = 0;
802 u32 debug_scanned = 0;
803 u32 debug_expired = 0;
804 int num_nodes = 0;
805 int *warned = NULL;
806
807 if (!dlm_config.ci_waitwarn_us)
808 return;
809
810 mutex_lock(&ls->ls_waiters_mutex);
811
812 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
813 if (ktime_equal(lkb->lkb_wait_time, zero))
814 continue;
815
816 debug_scanned++;
817
818 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
819
820 if (us < dlm_config.ci_waitwarn_us)
821 continue;
822
823 lkb->lkb_wait_time = zero;
824
825 debug_expired++;
826 if (us > debug_maxus)
827 debug_maxus = us;
828
829 if (!num_nodes) {
830 num_nodes = ls->ls_num_nodes;
5d70828a 831 warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
c6ff669b
DT
832 }
833 if (!warned)
834 continue;
835 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
836 continue;
837
838 log_error(ls, "waitwarn %x %lld %d us check connection to "
839 "node %d", lkb->lkb_id, (long long)us,
840 dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
841 }
842 mutex_unlock(&ls->ls_waiters_mutex);
5d70828a 843 kfree(warned);
c6ff669b
DT
844
845 if (debug_expired)
846 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
847 debug_scanned, debug_expired,
848 dlm_config.ci_waitwarn_us, (long long)debug_maxus);
849}
850
/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

/* Put lkb on ls_waiters pending a reply of type 'mstype' from node
   to_nodeid.  If a request/convert is already outstanding, an unlock
   or cancel is recorded as an overlap flag instead of a second list
   entry; any other second operation fails with -EBUSY. */
static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	/* a second overlapping unlock, or a second cancel, is invalid */
	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	lkb->lkb_wait_time = ktime_get();
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "addwait error %x %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
906
/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

/* Drop one waiters-list reference for lkb on receipt of a reply of
   type 'mstype' (ms may be NULL for locally-generated removals).
   Returns 0 when a reference was removed, -1 when the reply matched
   no outstanding operation.  Caller holds ls_waiters_mutex (or is
   handling a stub reply; see remove_from_waiters_ms). */
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and premptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		lkb->lkb_wait_type = 0;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_count--;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x reply %d flags %x no wait_type",
		  lkb->lkb_id, mstype, lkb->lkb_flags);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
995
ef0c2bb0 996static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
e7fd4179
DT
997{
998 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
999 int error;
1000
90135925 1001 mutex_lock(&ls->ls_waiters_mutex);
43279e53 1002 error = _remove_from_waiters(lkb, mstype, NULL);
90135925 1003 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
1004 return error;
1005}
1006
/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	/* a stub reply (m_flags == DLM_IFL_STUB_MS) is generated locally
	   with waiters_mutex already held, so don't take it again */
	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type, ms);
	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
1022
e7fd4179
DT
1023static void dir_remove(struct dlm_rsb *r)
1024{
1025 int to_nodeid;
1026
1027 if (dlm_no_directory(r->res_ls))
1028 return;
1029
1030 to_nodeid = dlm_dir_nodeid(r);
1031 if (to_nodeid != dlm_our_nodeid())
1032 send_remove(r);
1033 else
1034 dlm_dir_remove_entry(r->res_ls, to_nodeid,
1035 r->res_name, r->res_length);
1036}
1037
/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

/* Free rsb's from one hash bucket's toss list whose toss time is older
   than ci_toss_secs.  Returns the number of rsb's freed.  One rsb is
   handled per iteration so the bucket lock is only held briefly. */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		spin_lock(&ls->ls_rsbtbl[b].lock);
		/* oldest entries are at the tail, hence the reverse scan */
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			/* last reference dropped: unlink under the lock,
			   then free outside it */
			list_del(&r->res_hashchain);
			spin_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			dlm_free_rsb(r);
			count++;
		} else {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			/* NOTE(review): r is dereferenced here after the
			   bucket lock is dropped even though the kref_put
			   failed — presumably safe because the remaining
			   reference holder keeps r alive; confirm */
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}
1079
1080void dlm_scan_rsbs(struct dlm_ls *ls)
1081{
1082 int i;
1083
e7fd4179
DT
1084 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1085 shrink_bucket(ls, i);
85e86edf
DT
1086 if (dlm_locking_stopped(ls))
1087 break;
e7fd4179
DT
1088 cond_resched();
1089 }
1090}
1091
3ae1acf9
DT
1092static void add_timeout(struct dlm_lkb *lkb)
1093{
1094 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1095
eeda418d 1096 if (is_master_copy(lkb))
3ae1acf9 1097 return;
3ae1acf9
DT
1098
1099 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1100 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1101 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1102 goto add_it;
1103 }
84d8cd69
DT
1104 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1105 goto add_it;
3ae1acf9
DT
1106 return;
1107
1108 add_it:
1109 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1110 mutex_lock(&ls->ls_timeout_mutex);
1111 hold_lkb(lkb);
3ae1acf9
DT
1112 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1113 mutex_unlock(&ls->ls_timeout_mutex);
1114}
1115
/* Take the lkb off the lockspace timeout list, dropping the reference
   add_timeout() took.  Safe to call when the lkb is not on the list. */

static void del_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_timeout_mutex);
	if (!list_empty(&lkb->lkb_time_list)) {
		list_del_init(&lkb->lkb_time_list);
		unhold_lkb(lkb);
	}
	mutex_unlock(&ls->ls_timeout_mutex);
}
1127
/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout. We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

/* Scan ls_timeout for lkb's that have waited too long, cancelling locks
   requested with DLM_LKF_TIMEOUT and issuing one-time warning callbacks
   for locks marked DLM_IFL_WATCH_TIMEWARN.  Handles one lkb per pass so
   the timeout mutex can be dropped before lock_rsb() is taken. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;
	s64 wait_us;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

			wait_us = ktime_to_us(ktime_sub(ktime_get(),
					      lkb->lkb_timestamp));

			/* timeout values are centiseconds; * 10000
			   converts to microseconds to match wait_us */
			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    wait_us >= (lkb->lkb_timeout_cs * 10000))
				do_cancel = 1;

			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			/* pin the lkb so it survives dropping the mutex;
			   released via dlm_put_lkb() below */
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		dlm_put_lkb(lkb);
	}
}
1197
/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
   dlm_recoverd before checking/setting ls_recover_begin. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);

	ls->ls_recover_begin = 0;
	mutex_lock(&ls->ls_timeout_mutex);
	/* shift every timestamp forward by the recovery duration so that
	   time spent in recovery does not count toward lock timeouts */
	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
	mutex_unlock(&ls->ls_timeout_mutex);

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		/* restart the reply-wait measurement for waiters that had
		   one in progress (a zero wait_time means not measuring) */
		if (ktime_to_us(lkb->lkb_wait_time))
			lkb->lkb_wait_time = ktime_get();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}
1222
/* lkb is master or local copy */

/* Move lvb data between rsb and lkb on grant, according to the
   grmode/rqmode transition in dlm_lvb_operations. */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		/* copy the rsb's lvb to the caller; then fall through to
		   report VALNOTVALID if the rsb's lvb is stale */
		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		/* lvb storage on the rsb is allocated lazily */
		if (!r->res_lvbptr)
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
1275
1276static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1277{
1278 if (lkb->lkb_grmode < DLM_LOCK_PW)
1279 return;
1280
1281 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1282 rsb_set_flag(r, RSB_VALNOTVALID);
1283 return;
1284 }
1285
1286 if (!lkb->lkb_lvbptr)
1287 return;
1288
1289 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1290 return;
1291
1292 if (!r->res_lvbptr)
52bda2b5 1293 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
e7fd4179
DT
1294
1295 if (!r->res_lvbptr)
1296 return;
1297
1298 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1299 r->res_lvbseq++;
1300 rsb_clear_flag(r, RSB_VALNOTVALID);
1301}
1302
1303/* lkb is process copy (pc) */
1304
1305static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1306 struct dlm_message *ms)
1307{
1308 int b;
1309
1310 if (!lkb->lkb_lvbptr)
1311 return;
1312
1313 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1314 return;
1315
597d0cae 1316 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
e7fd4179
DT
1317 if (b == 1) {
1318 int len = receive_extralen(ms);
a9cc9159
AV
1319 if (len > DLM_RESNAME_MAXLEN)
1320 len = DLM_RESNAME_MAXLEN;
e7fd4179
DT
1321 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1322 lkb->lkb_lvbseq = ms->m_lvbseq;
1323 }
1324}
1325
/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock -- used for request and convert, adds lkb to granted or
   moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's. There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}
1344
/* unlock on a master/local copy: write the lvb back to the rsb, then
   take the lkb off its queue and drop its creation reference */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}
1350
/* process-copy variant of remove_lock; the master handles the lvb,
   so no set_lvb_unlock() here */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
1355
/* returns: 0 did nothing
   1 moved lock to granted
   -1 removed lock */

/* Undo an in-progress operation (used for cancel): a pending convert
   falls back to its granted mode; a pending request is removed
   entirely, which frees the lkb via the unhold below. */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		/* nothing pending to revert */
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}
1386
/* process-copy variant of revert_lock (cancel) */

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}
1391
1392static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1393{
1394 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1395 lkb->lkb_grmode = lkb->lkb_rqmode;
1396 if (lkb->lkb_status)
1397 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1398 else
1399 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1400 }
1401
1402 lkb->lkb_rqmode = DLM_LOCK_IV;
e7fd4179
DT
1403}
1404
/* grant on a master/local copy: update lvb state, move the lkb to the
   granted queue, and reset highbast so future blocking asts are sent */

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}
1411
/* grant a process copy using lvb data carried in the message */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
1418
/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (!is_master_copy(lkb))
		queue_cast(r, lkb, 0);
	else
		send_grant(r, lkb);
}
1431
/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes. We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb)
{
	/* a demotion only makes sense mid-conversion, when both modes
	   are valid */
	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}
1450
1451static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1452{
1453 if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1454 ms->m_type != DLM_MSG_GRANT) {
1455 log_print("munge_altmode %x invalid reply type %d",
1456 lkb->lkb_id, ms->m_type);
1457 return;
1458 }
1459
1460 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1461 lkb->lkb_rqmode = DLM_LOCK_PR;
1462 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1463 lkb->lkb_rqmode = DLM_LOCK_CW;
1464 else {
1465 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1466 dlm_print_lkb(lkb);
1467 }
1468}
1469
e7fd4179
DT
1470static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1471{
1472 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1473 lkb_statequeue);
1474 if (lkb->lkb_id == first->lkb_id)
90135925 1475 return 1;
e7fd4179 1476
90135925 1477 return 0;
e7fd4179
DT
1478}
1479
e7fd4179
DT
1480/* Check if the given lkb conflicts with another lkb on the queue. */
1481
1482static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1483{
1484 struct dlm_lkb *this;
1485
1486 list_for_each_entry(this, head, lkb_statequeue) {
1487 if (this == lkb)
1488 continue;
3bcd3687 1489 if (!modes_compat(this, lkb))
90135925 1490 return 1;
e7fd4179 1491 }
90135925 1492 return 0;
e7fd4179
DT
1493}
1494
/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
 * convert queue from being granted, then deadlk/demote lkb.
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
 * flag set and return DEMOTED in the lksb flags.
 *
 * Originally, this function detected conv-deadlk in a more limited scope:
 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
 * - if lkb1 was the first entry in the queue (not just earlier), and was
 *   blocked by the granted mode of lkb2, and there was nothing on the
 *   granted queue preventing lkb1 from being granted immediately, i.e.
 *   lkb2 was the only thing preventing lkb1 from being granted.
 *
 * That second condition meant we'd only say there was conv-deadlk if
 * resolving it (by demotion) would lead to the first lock on the convert
 * queue being granted right away.  It allowed conversion deadlocks to exist
 * between locks on the convert queue while they couldn't be granted anyway.
 *
 * Now, we detect and take action on conversion deadlocks immediately when
 * they're created, even if they may not be immediately consequential.  If
 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
 * mode that would prevent lkb1's conversion from being granted, we do a
 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
 * I think this means that the lkb_is_ahead condition below should always
 * be zero, i.e. there will never be conv-deadlk between two locks that are
 * both already on the convert queue.
 */

static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
	struct dlm_lkb *lkb1;
	int lkb_is_ahead = 0;

	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
		if (lkb1 == lkb2) {
			lkb_is_ahead = 1;
			continue;
		}

		if (!lkb_is_ahead) {
			/* lkb1 is earlier in the queue: deadlock if lkb2's
			   granted mode blocks lkb1's conversion */
			if (!modes_compat(lkb2, lkb1))
				return 1;
		} else {
			/* lkb1 follows lkb2: require mutual blocking
			   (see comment above; expected not to occur) */
			if (!modes_compat(lkb2, lkb1) &&
			    !modes_compat(lkb1, lkb2))
				return 1;
		}
	}
	return 0;
}
1558
/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	/* conv is nonzero when lkb already holds a mode, i.e. this is a
	   conversion rather than a new request */
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite theis request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinate postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first servce" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;
 out:
	return 0;
}
1687
/* Wrapper around _can_be_granted() that additionally resolves
   conversion deadlocks (CONVDEADLK demotion or EDEADLK via *err) and
   tries the alternate grant modes requested with ALTPR/ALTCW. */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
			  int *err)
{
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);

	if (err)
		*err = 0;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	/*
	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
	 * cancels one of the locks.
	 */

	if (is_convert && can_be_queued(lkb) &&
	    conversion_deadlock_detect(r, lkb)) {
		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
			lkb->lkb_grmode = DLM_LOCK_NL;
			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
			if (err)
				*err = -EDEADLK;
			else {
				/* callers that pass no err can't take the
				   deadlock result; just report it */
				log_print("can_be_granted deadlock %x now %d",
					  lkb->lkb_id, now);
				dlm_dump_rsb(r);
			}
		}
		goto out;
	}

	/*
	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
	 * to grant a request in a mode other than the normal rqmode.  It's a
	 * simple way to provide a big optimization to applications that can
	 * use them.
	 */

	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
		alt = DLM_LOCK_CW;

	if (alt) {
		/* retry the grant at the alternate mode; restore rqmode
		   if it still can't be granted */
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
1748
/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
   for locks pending on the convert list.  Once verified (watch for these
   log_prints), we should be able to just call _can_be_granted() and not
   bother with the demote/deadlk cases here (and there's no easy way to deal
   with a deadlk here, we'd have to generate something like grant_lock with
   the deadlk error.) */

/* Returns the highest requested mode of all blocked conversions; sets
   cw if there's a blocked conversion to DLM_LOCK_CW. */

static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;
	int deadlk;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		deadlk = 0;

		if (can_be_granted(r, lkb, 0, &deadlk)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
			continue;
		}

		/* can_be_granted() demoted the lkb to resolve a conversion
		   deadlock; rescan since earlier entries may now grant */
		if (!demoted && is_demoted(lkb)) {
			log_print("WARN: pending demoted %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			demote_restart = 1;
			continue;
		}

		if (deadlk) {
			log_print("WARN: pending deadlock %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			dlm_dump_rsb(r);
			continue;
		}

		hi = max_t(int, lkb->lkb_rqmode, hi);

		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
			*cw = 1;
	}

	if (grant_restart)
		goto restart;
	/* allow at most one extra pass for demotions to avoid looping */
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}
1810
36509258 1811static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
e7fd4179
DT
1812{
1813 struct dlm_lkb *lkb, *s;
1814
1815 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
c85d65e9 1816 if (can_be_granted(r, lkb, 0, NULL))
e7fd4179 1817 grant_lock_pending(r, lkb);
36509258 1818 else {
e7fd4179 1819 high = max_t(int, lkb->lkb_rqmode, high);
36509258
DT
1820 if (lkb->lkb_rqmode == DLM_LOCK_CW)
1821 *cw = 1;
1822 }
e7fd4179
DT
1823 }
1824
1825 return high;
1826}
1827
36509258
DT
1828/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1829 on either the convert or waiting queue.
1830 high is the largest rqmode of all locks blocked on the convert or
1831 waiting queue. */
1832
1833static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1834{
1835 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1836 if (gr->lkb_highbast < DLM_LOCK_EX)
1837 return 1;
1838 return 0;
1839 }
1840
1841 if (gr->lkb_highbast < high &&
1842 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1843 return 1;
1844 return 0;
1845}
1846
/* Grant everything grantable on the convert and wait queues (master
   only), then queue blocking asts to granted locks that stand in the
   way of whatever remains blocked. */

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;
	int cw = 0;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high, &cw);
	high = grant_pending_wait(r, high, &cw);

	/* nothing left blocked: no basts needed */
	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
			/* a granted PR holder blocking a CW request must be
			   asked for CW, not for the (equal-rank) high mode */
			if (cw && high == DLM_LOCK_PR &&
			    lkb->lkb_grmode == DLM_LOCK_PR)
				queue_bast(r, lkb, DLM_LOCK_CW);
			else
				queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}
1878
36509258
DT
1879static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1880{
1881 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1882 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1883 if (gr->lkb_highbast < DLM_LOCK_EX)
1884 return 1;
1885 return 0;
1886 }
1887
1888 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1889 return 1;
1890 return 0;
1891}
1892
e7fd4179
DT
1893static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1894 struct dlm_lkb *lkb)
1895{
1896 struct dlm_lkb *gr;
1897
1898 list_for_each_entry(gr, head, lkb_statequeue) {
314dd2a0
SW
1899 /* skip self when sending basts to convertqueue */
1900 if (gr == lkb)
1901 continue;
e5dae548 1902 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
e7fd4179
DT
1903 queue_bast(r, gr, lkb->lkb_rqmode);
1904 gr->lkb_highbast = lkb->lkb_rqmode;
1905 }
1906 }
1907}
1908
/* basts for a new request go only to the granted queue */

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}
1913
/* basts go to both the granted and convert queues */

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}
1919
/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	/* another lkb's lookup is already in flight; wait for it */
	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	/* res_nodeid 0 means we are the master */
	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (i = 0; i < 2; i++) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}
	/* NOTE(review): if both lookup attempts fail with -EEXIST we fall
	   through and read ret_nodeid — presumably dlm_dir_lookup fills it
	   in on -EEXIST as well; confirm against its definition */
	if (error && error != -EEXIST)
		return error;

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
2004
/* retry _request_lock() for all lkb's that queued on res_lookup while
   a master lookup for this rsb was in progress */

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del_init(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}
2015
/* confirm_master -- confirm (or deny) an rsb's master nodeid */

/* Called with the result of the lookup-initiating request.  On success
   (or a queued request) the lookup is complete and the waiting lkb's
   are retried; on a non-retryable failure the next waiting lkb, if any,
   becomes the new first_lkid and drives a fresh request. */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
	case -EBADR:
	case -ENOTBLK:
		/* the remote request failed and won't be retried (it was
		   a NOQUEUE, or has been canceled/unlocked); make a waiting
		   lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		}
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}
2054
static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, unsigned long timeout_cs,
			 void (*ast) (void *astparam),
			 void *astparam,
			 void (*bast) (void *astparam, int mode),
			 struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	/* the name length only matters for a new request; a convert
	   reuses the existing resource */
	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	/* CANCEL belongs to the unlock path (set_unlock_args) */
	if (flags & DLM_LKF_CANCEL)
		goto out;

	/* QUECVT and CONVDEADLK are conversion-only modifiers */
	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	/* EXPEDITE is only valid on a new NL request with no other
	   queueing/conversion modifiers */
	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	/* VALBLK requires a caller-supplied lvb buffer */
	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	/* a convert must name an existing lock via sb_lkid */
	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astfn = ast;
	args->astparam = astparam;
	args->bastfn = bast;
	args->timeout = timeout_cs;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}
2120
2121static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2122{
2123 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2124 DLM_LKF_FORCEUNLOCK))
2125 return -EINVAL;
2126
ef0c2bb0
DT
2127 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2128 return -EINVAL;
2129
e7fd4179 2130 args->flags = flags;
e5dae548 2131 args->astparam = astarg;
e7fd4179
DT
2132 return 0;
2133}
2134
static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		/* a master copy lkb is driven by messages from the lock
		   owner, never converted directly by a local caller */
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		/* only a granted lock with no operation in flight (no
		   pending reply, no overlapping unlock/cancel) may convert */
		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	/* copy the args prepared by set_lock_args() into the lkb, now
	   that the rsb is locked */
	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astfn = args->astfn;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastfn = args->bastfn;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	lkb->lkb_timeout_cs = args->timeout;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
			  lkb->lkb_status, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
2178
ef0c2bb0
DT
2179/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2180 for success */
2181
2182/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2183 because there may be a lookup in progress and it's valid to do
2184 cancel/unlockf on it */
2185
e7fd4179
DT
static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EINVAL;

	/* a master copy lkb is unlocked via messages from the owning
	   node, never directly by a local caller */
	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	   cancel, unlock or failed noqueue request; an app can't use these
	   locks; return same error as if the lkid had not been found at all */

	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (!list_empty(&lkb->lkb_rsb_lookup)) {
		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
			/* the request never reached the master; complete
			   it locally and release the lookup reference */
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
		}
		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
		rv = -EBUSY;
		goto out;
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		/* request is being resent after recovery; mark the cancel
		   as overlapping so the resend path handles it */
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		}

		/* there's nothing to cancel */
		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
		    !lkb->lkb_wait_type) {
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress? in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
		goto out_ok;
	}

	/* normal unlock not allowed if there's any op in progress */
	rv = -EBUSY;
	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
		goto out;

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
2315
2316/*
2317 * Four stage 4 varieties:
2318 * do_request(), do_convert(), do_unlock(), do_cancel()
2319 * These are called on the master node for the given lock and
2320 * from the central locking logic.
2321 */
2322
2323static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2324{
2325 int error = 0;
2326
c85d65e9 2327 if (can_be_granted(r, lkb, 1, NULL)) {
e7fd4179
DT
2328 grant_lock(r, lkb);
2329 queue_cast(r, lkb, 0);
2330 goto out;
2331 }
2332
2333 if (can_be_queued(lkb)) {
2334 error = -EINPROGRESS;
2335 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3ae1acf9 2336 add_timeout(lkb);
e7fd4179
DT
2337 goto out;
2338 }
2339
2340 error = -EAGAIN;
e7fd4179 2341 queue_cast(r, lkb, -EAGAIN);
e7fd4179
DT
2342 out:
2343 return error;
2344}
2345
cf6620ac
DT
2346static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2347 int error)
2348{
2349 switch (error) {
2350 case -EAGAIN:
2351 if (force_blocking_asts(lkb))
2352 send_blocking_asts_all(r, lkb);
2353 break;
2354 case -EINPROGRESS:
2355 send_blocking_asts(r, lkb);
2356 break;
2357 }
2358}
2359
e7fd4179
DT
static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;
	int deadlk = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1, &deadlk)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	/* can_be_granted() detected that this lock would block in a conversion
	   deadlock, so we leave it on the granted queue and return EDEADLK in
	   the ast for the convert. */

	if (deadlk) {
		/* it's left on the granted queue */
		log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
			  lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
			  lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
		revert_lock(r, lkb);
		queue_cast(r, lkb, -EDEADLK);
		error = -EDEADLK;
		goto out;
	}

	/* is_demoted() means the can_be_granted() above set the grmode
	   to NL, and left us on the granted queue.  This auto-demotion
	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
	   now grantable.  We have to try to grant other converting locks
	   before we try again to grant this one. */

	if (is_demoted(lkb)) {
		grant_pending_convert(r, DLM_LOCK_IV, NULL);
		if (_can_be_granted(r, lkb, 1)) {
			grant_lock(r, lkb);
			queue_cast(r, lkb, 0);
			goto out;
		}
		/* else fall through and move to convert queue */
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		/* move from granted queue to convert queue */
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		add_timeout(lkb);
		goto out;
	}

	/* NOQUEUE convert that cannot be granted now */
	error = -EAGAIN;
	queue_cast(r, lkb, -EAGAIN);
 out:
	return error;
}
2417
cf6620ac
DT
2418static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2419 int error)
2420{
2421 switch (error) {
2422 case 0:
2423 grant_pending_locks(r);
2424 /* grant_pending_locks also sends basts */
2425 break;
2426 case -EAGAIN:
2427 if (force_blocking_asts(lkb))
2428 send_blocking_asts_all(r, lkb);
2429 break;
2430 case -EINPROGRESS:
2431 send_blocking_asts(r, lkb);
2432 break;
2433 }
2434}
2435
e7fd4179
DT
2436static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2437{
2438 remove_lock(r, lkb);
2439 queue_cast(r, lkb, -DLM_EUNLOCK);
e7fd4179
DT
2440 return -DLM_EUNLOCK;
2441}
2442
cf6620ac
DT
static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	/* removing a lock always allows blocked locks to be re-evaluated */
	grant_pending_locks(r);
}
2448
ef0c2bb0 2449/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
907b9bce 2450
e7fd4179
DT
2451static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2452{
ef0c2bb0
DT
2453 int error;
2454
2455 error = revert_lock(r, lkb);
2456 if (error) {
2457 queue_cast(r, lkb, -DLM_ECANCEL);
ef0c2bb0
DT
2458 return -DLM_ECANCEL;
2459 }
2460 return 0;
e7fd4179
DT
2461}
2462
cf6620ac
DT
static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	/* a nonzero result means a lock really was canceled */
	if (!error)
		return;
	grant_pending_locks(r);
}
2469
e7fd4179
DT
2470/*
2471 * Four stage 3 varieties:
2472 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2473 */
2474
2475/* add a new lkb to a possibly new rsb, called by requesting process */
2476
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* set_master: sets lkb nodeid from r; a positive return means a
	   master lookup is in progress and the request resumes later */
	rv = set_master(r, lkb);
	if (rv < 0)
		return rv;
	if (rv > 0)
		return 0;

	if (!is_remote(r)) {
		rv = do_request(r, lkb);
		/* for remote locks the request_reply is sent
		   between do_request and do_request_effects */
		do_request_effects(r, lkb, rv);
	} else {
		/* receive_request() calls do_request() on remote node */
		rv = send_request(r, lkb);
	}

	return rv;
}
2503
3bcd3687 2504/* change some property of an existing lkb, e.g. mode */
e7fd4179
DT
2505
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	if (!is_remote(r)) {
		rv = do_convert(r, lkb);
		/* for remote locks the convert_reply is sent
		   between do_convert and do_convert_effects */
		do_convert_effects(r, lkb, rv);
	} else {
		/* receive_convert() calls do_convert() on remote node */
		rv = send_convert(r, lkb);
	}

	return rv;
}
2522
2523/* remove an existing lkb from the granted queue */
2524
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	if (!is_remote(r)) {
		rv = do_unlock(r, lkb);
		/* for remote locks the unlock_reply is sent
		   between do_unlock and do_unlock_effects */
		do_unlock_effects(r, lkb, rv);
	} else {
		/* receive_unlock() calls do_unlock() on remote node */
		rv = send_unlock(r, lkb);
	}

	return rv;
}
2541
2542/* remove an existing lkb from the convert or wait queue */
2543
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	if (!is_remote(r)) {
		rv = do_cancel(r, lkb);
		/* for remote locks the cancel_reply is sent
		   between do_cancel and do_cancel_effects */
		do_cancel_effects(r, lkb, rv);
	} else {
		/* receive_cancel() calls do_cancel() on remote node */
		rv = send_cancel(r, lkb);
	}

	return rv;
}
2560
2561/*
2562 * Four stage 2 varieties:
2563 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2564 */
2565
2566static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2567 int len, struct dlm_args *args)
2568{
2569 struct dlm_rsb *r;
2570 int error;
2571
2572 error = validate_lock_args(ls, lkb, args);
2573 if (error)
2574 goto out;
2575
2576 error = find_rsb(ls, name, len, R_CREATE, &r);
2577 if (error)
2578 goto out;
2579
2580 lock_rsb(r);
2581
2582 attach_lkb(r, lkb);
2583 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2584
2585 error = _request_lock(r, lkb);
2586
2587 unlock_rsb(r);
2588 put_rsb(r);
2589
2590 out:
2591 return error;
2592}
2593
2594static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2595 struct dlm_args *args)
2596{
2597 struct dlm_rsb *r;
2598 int error;
2599
2600 r = lkb->lkb_resource;
2601
2602 hold_rsb(r);
2603 lock_rsb(r);
2604
2605 error = validate_lock_args(ls, lkb, args);
2606 if (error)
2607 goto out;
2608
2609 error = _convert_lock(r, lkb);
2610 out:
2611 unlock_rsb(r);
2612 put_rsb(r);
2613 return error;
2614}
2615
2616static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2617 struct dlm_args *args)
2618{
2619 struct dlm_rsb *r;
2620 int error;
2621
2622 r = lkb->lkb_resource;
2623
2624 hold_rsb(r);
2625 lock_rsb(r);
2626
2627 error = validate_unlock_args(lkb, args);
2628 if (error)
2629 goto out;
2630
2631 error = _unlock_lock(r, lkb);
2632 out:
2633 unlock_rsb(r);
2634 put_rsb(r);
2635 return error;
2636}
2637
2638static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2639 struct dlm_args *args)
2640{
2641 struct dlm_rsb *r;
2642 int error;
2643
2644 r = lkb->lkb_resource;
2645
2646 hold_rsb(r);
2647 lock_rsb(r);
2648
2649 error = validate_unlock_args(lkb, args);
2650 if (error)
2651 goto out;
2652
2653 error = _cancel_lock(r, lkb);
2654 out:
2655 unlock_rsb(r);
2656 put_rsb(r);
2657 return error;
2658}
2659
2660/*
2661 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2662 */
2663
int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	/* block recovery from running while this request is in flight */
	dlm_lock_recovery(ls);

	/* a convert reuses the existing lkb (looked up by sb_lkid);
	   a new request allocates one */
	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	/* -EINPROGRESS means the op is queued; completion arrives
	   asynchronously via the ast */
	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	/* drop the find_lkb() reference for a convert, or undo
	   create_lkb() if the new request failed outright */
	if (convert || error)
		__put_lkb(ls, lkb);
	/* -EAGAIN and -EDEADLK are delivered through the ast, not the
	   return value */
	if (error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2716
int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	/* block recovery from running while this request is in flight */
	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	/* -DLM_EUNLOCK/-DLM_ECANCEL indicate success; the result is
	   also delivered through the ast */
	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	/* a busy lkb is acceptable for cancel/force-unlock; the op has
	   been recorded as overlapping and will complete later */
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	/* drop the reference taken by find_lkb() */
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2758
2759/*
2760 * send/receive routines for remote operations and replies
2761 *
2762 * send_args
2763 * send_common
2764 * send_request receive_request
2765 * send_convert receive_convert
2766 * send_unlock receive_unlock
2767 * send_cancel receive_cancel
2768 * send_grant receive_grant
2769 * send_bast receive_bast
2770 * send_lookup receive_lookup
2771 * send_remove receive_remove
2772 *
2773 * send_common_reply
2774 * receive_request_reply send_request_reply
2775 * receive_convert_reply send_convert_reply
2776 * receive_unlock_reply send_unlock_reply
2777 * receive_cancel_reply send_cancel_reply
2778 * receive_lookup_reply send_lookup_reply
2779 */
2780
7e4dac33
DT
static int _create_message(struct dlm_ls *ls, int mb_len,
			   int to_nodeid, int mstype,
			   struct dlm_message **ms_ret,
			   struct dlm_mhandle **mh_ret)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	char *mb;

	/* get_buffer gives us a message handle (mh) that we need to
	   pass into lowcomms_commit and a message buffer (mb) that we
	   write our data into */

	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
	if (!mh)
		return -ENOBUFS;

	memset(mb, 0, mb_len);

	ms = (struct dlm_message *) mb;

	/* fill in the header common to all dlm messages */
	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	ms->m_header.h_lockspace = ls->ls_global_id;
	ms->m_header.h_nodeid = dlm_our_nodeid();
	ms->m_header.h_length = mb_len;
	ms->m_header.h_cmd = DLM_MSG;

	ms->m_type = mstype;

	*mh_ret = mh;
	*ms_ret = ms;
	return 0;
}
2814
7e4dac33
DT
static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  int to_nodeid, int mstype,
			  struct dlm_message **ms_ret,
			  struct dlm_mhandle **mh_ret)
{
	int mb_len = sizeof(struct dlm_message);

	/* reserve space after the fixed-size message for variable data;
	   this must agree with the switch in send_args() that fills it */
	switch (mstype) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
	case DLM_MSG_REMOVE:
		/* these carry the resource name */
		mb_len += r->res_length;
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		/* these may carry a lock value block */
		if (lkb && lkb->lkb_lvbptr)
			mb_len += r->res_ls->ls_lvblen;
		break;
	}

	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
			       ms_ret, mh_ret);
}
2841
e7fd4179
DT
2842/* further lowcomms enhancements or alternate implementations may make
2843 the return value from this function useful at some point */
2844
static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	/* convert to on-wire byte order, then hand the buffer to lowcomms */
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}
2851
static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	/* copy everything about the lkb into the outgoing message */
	ms->m_nodeid   = lkb->lkb_nodeid;
	ms->m_pid      = lkb->lkb_ownpid;
	ms->m_lkid     = lkb->lkb_id;
	ms->m_remid    = lkb->lkb_remid;
	ms->m_exflags  = lkb->lkb_exflags;
	ms->m_sbflags  = lkb->lkb_sbflags;
	ms->m_flags    = lkb->lkb_flags;
	ms->m_lvbseq   = lkb->lkb_lvbseq;
	ms->m_status   = lkb->lkb_status;
	ms->m_grmode   = lkb->lkb_grmode;
	ms->m_rqmode   = lkb->lkb_rqmode;
	ms->m_hash     = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	/* only flag the presence of ast/bast fns; pointers stay local */
	if (lkb->lkb_bastfn)
		ms->m_asts |= DLM_CB_BAST;
	if (lkb->lkb_astfn)
		ms->m_asts |= DLM_CB_CAST;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (!lkb->lkb_lvbptr)
			break;
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}
2895
static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = r->res_nodeid;

	/* put the lkb on the waiters list before sending so the reply
	   is guaranteed to find it there */
	error = add_to_waiters(lkb, mstype, to_nodeid);
	if (error)
		return error;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	/* no message went out, so no reply will arrive; undo the
	   waiters registration */
	remove_from_waiters(lkb, msg_reply_type(mstype));
	return error;
}
2923
/* ask the master node to grant/queue a new lock (do_request on master) */

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_REQUEST);
}
2928
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = send_common(r, lkb, DLM_MSG_CONVERT);

	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		/* fake a successful convert_reply locally; DLM_IFL_STUB_MS
		   tells the receive path this is not a real message */
		r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

	return error;
}
2946
2947/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2948 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2949 that the master is still correct. */
2950
/* ask the master node to remove this lock (do_unlock on master) */

static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_UNLOCK);
}
2955
/* ask the master node to cancel a queued/converting lock (do_cancel) */

static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_CANCEL);
}
2960
2961static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2962{
2963 struct dlm_message *ms;
2964 struct dlm_mhandle *mh;
2965 int to_nodeid, error;
2966
2967 to_nodeid = lkb->lkb_nodeid;
2968
2969 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2970 if (error)
2971 goto out;
2972
2973 send_args(r, lkb, ms);
2974
2975 ms->m_result = 0;
2976
2977 error = send_message(mh, ms);
2978 out:
2979 return error;
2980}
2981
2982static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2983{
2984 struct dlm_message *ms;
2985 struct dlm_mhandle *mh;
2986 int to_nodeid, error;
2987
2988 to_nodeid = lkb->lkb_nodeid;
2989
2990 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2991 if (error)
2992 goto out;
2993
2994 send_args(r, lkb, ms);
2995
2996 ms->m_bastmode = mode;
2997
2998 error = send_message(mh, ms);
2999 out:
3000 return error;
3001}
3002
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	/* lookups go to the directory node for this resource */
	to_nodeid = dlm_dir_nodeid(r);

	/* put the lkb on the waiters list before sending so the reply
	   is guaranteed to find it there */
	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
	if (error)
		return error;

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	/* no message went out; undo the waiters registration */
	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	return error;
}
3030
3031static int send_remove(struct dlm_rsb *r)
3032{
3033 struct dlm_message *ms;
3034 struct dlm_mhandle *mh;
3035 int to_nodeid, error;
3036
3037 to_nodeid = dlm_dir_nodeid(r);
3038
3039 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3040 if (error)
3041 goto out;
3042
3043 memcpy(ms->m_extra, r->res_name, r->res_length);
3044 ms->m_hash = r->res_hash;
3045
3046 error = send_message(mh, ms);
3047 out:
3048 return error;
3049}
3050
3051static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3052 int mstype, int rv)
3053{
3054 struct dlm_message *ms;
3055 struct dlm_mhandle *mh;
3056 int to_nodeid, error;
3057
3058 to_nodeid = lkb->lkb_nodeid;
3059
3060 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3061 if (error)
3062 goto out;
3063
3064 send_args(r, lkb, ms);
3065
3066 ms->m_result = rv;
3067
3068 error = send_message(mh, ms);
3069 out:
3070 return error;
3071}
3072
/* reply to a DLM_MSG_REQUEST */

static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}
3077
/* reply to a DLM_MSG_CONVERT */

static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}
3082
/* reply to a DLM_MSG_UNLOCK */

static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
}
3087
/* reply to a DLM_MSG_CANCEL */

static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}
3092
3093static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3094 int ret_nodeid, int rv)
3095{
3096 struct dlm_rsb *r = &ls->ls_stub_rsb;
3097 struct dlm_message *ms;
3098 struct dlm_mhandle *mh;
3099 int error, nodeid = ms_in->m_header.h_nodeid;
3100
3101 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3102 if (error)
3103 goto out;
3104
3105 ms->m_lkid = ms_in->m_lkid;
3106 ms->m_result = rv;
3107 ms->m_nodeid = ret_nodeid;
3108
3109 error = send_message(mh, ms);
3110 out:
3111 return error;
3112}
3113
3114/* which args we save from a received message depends heavily on the type
3115 of message, unlike the send side where we can safely send everything about
3116 the lkb for any type of message */
3117
static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	lkb->lkb_exflags = ms->m_exflags;
	lkb->lkb_sbflags = ms->m_sbflags;
	/* only the low 16 bits of lkb_flags travel in messages;
	   preserve the local-only upper bits */
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}
3125
static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	/* locally faked stub replies (see send_convert) carry no
	   meaningful flag info */
	if (ms->m_flags == DLM_IFL_STUB_MS)
		return;

	lkb->lkb_sbflags = ms->m_sbflags;
	/* only the low 16 bits of lkb_flags travel in messages;
	   preserve the local-only upper bits */
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}
3135
static int receive_extralen(struct dlm_message *ms)
{
	/* bytes of variable data (resource name or lvb) that follow
	   the fixed-size message */
	return (ms->m_header.h_length - sizeof(struct dlm_message));
}
3140
e7fd4179
DT
3141static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3142 struct dlm_message *ms)
3143{
3144 int len;
3145
3146 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3147 if (!lkb->lkb_lvbptr)
52bda2b5 3148 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
e7fd4179
DT
3149 if (!lkb->lkb_lvbptr)
3150 return -ENOMEM;
3151 len = receive_extralen(ms);
a9cc9159
AV
3152 if (len > DLM_RESNAME_MAXLEN)
3153 len = DLM_RESNAME_MAXLEN;
e7fd4179
DT
3154 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3155 }
3156 return 0;
3157}
3158
e5dae548
DT
/* placeholder bast fn stored in master-copy lkbs; asts only ever run
   on the lock owner's node, so this must never be invoked */
static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
3163
/* placeholder ast fn stored in master-copy lkbs; asts only ever run
   on the lock owner's node, so this must never be invoked */
static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3168
e7fd4179
DT
static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	/* initialize a freshly created master-copy lkb from the
	   incoming request message */
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_ownpid = ms->m_pid;
	lkb->lkb_remid = ms->m_lkid;
	lkb->lkb_grmode = DLM_LOCK_IV;
	lkb->lkb_rqmode = ms->m_rqmode;

	/* record only whether the owner registered ast/bast fns; the
	   real fn pointers exist on the owning node */
	lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lkb was just created so there won't be an lvb yet */
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
	}

	return 0;
}
3190
3191static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3192 struct dlm_message *ms)
3193{
e7fd4179
DT
3194 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3195 return -EBUSY;
3196
e7fd4179
DT
3197 if (receive_lvb(ls, lkb, ms))
3198 return -ENOMEM;
3199
3200 lkb->lkb_rqmode = ms->m_rqmode;
3201 lkb->lkb_lvbseq = ms->m_lvbseq;
3202
3203 return 0;
3204}
3205
3206static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3207 struct dlm_message *ms)
3208{
e7fd4179
DT
3209 if (receive_lvb(ls, lkb, ms))
3210 return -ENOMEM;
3211 return 0;
3212}
3213
3214/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3215 uses to send a reply and that the remote end uses to process the reply. */
3216
3217static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3218{
3219 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3220 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3221 lkb->lkb_remid = ms->m_lkid;
3222}
3223
c54e04b0
DT
3224/* This is called after the rsb is locked so that we can safely inspect
3225 fields in the lkb. */
3226
3227static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3228{
3229 int from = ms->m_header.h_nodeid;
3230 int error = 0;
3231
3232 switch (ms->m_type) {
3233 case DLM_MSG_CONVERT:
3234 case DLM_MSG_UNLOCK:
3235 case DLM_MSG_CANCEL:
3236 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3237 error = -EINVAL;
3238 break;
3239
3240 case DLM_MSG_CONVERT_REPLY:
3241 case DLM_MSG_UNLOCK_REPLY:
3242 case DLM_MSG_CANCEL_REPLY:
3243 case DLM_MSG_GRANT:
3244 case DLM_MSG_BAST:
3245 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3246 error = -EINVAL;
3247 break;
3248
3249 case DLM_MSG_REQUEST_REPLY:
3250 if (!is_process_copy(lkb))
3251 error = -EINVAL;
3252 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3253 error = -EINVAL;
3254 break;
3255
3256 default:
3257 error = -EINVAL;
3258 }
3259
3260 if (error)
3261 log_error(lkb->lkb_resource->res_ls,
3262 "ignore invalid message %d from %d %x %x %x %d",
3263 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3264 lkb->lkb_flags, lkb->lkb_nodeid);
3265 return error;
3266}
3267
e7fd4179
DT
3268static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3269{
3270 struct dlm_lkb *lkb;
3271 struct dlm_rsb *r;
3272 int error, namelen;
3273
3274 error = create_lkb(ls, &lkb);
3275 if (error)
3276 goto fail;
3277
3278 receive_flags(lkb, ms);
3279 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3280 error = receive_request_args(ls, lkb, ms);
3281 if (error) {
b3f58d8f 3282 __put_lkb(ls, lkb);
e7fd4179
DT
3283 goto fail;
3284 }
3285
3286 namelen = receive_extralen(ms);
3287
3288 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3289 if (error) {
b3f58d8f 3290 __put_lkb(ls, lkb);
e7fd4179
DT
3291 goto fail;
3292 }
3293
3294 lock_rsb(r);
3295
3296 attach_lkb(r, lkb);
3297 error = do_request(r, lkb);
3298 send_request_reply(r, lkb, error);
cf6620ac 3299 do_request_effects(r, lkb, error);
e7fd4179
DT
3300
3301 unlock_rsb(r);
3302 put_rsb(r);
3303
3304 if (error == -EINPROGRESS)
3305 error = 0;
3306 if (error)
b3f58d8f 3307 dlm_put_lkb(lkb);
e7fd4179
DT
3308 return;
3309
3310 fail:
3311 setup_stub_lkb(ls, ms);
3312 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3313}
3314
3315static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3316{
3317 struct dlm_lkb *lkb;
3318 struct dlm_rsb *r;
90135925 3319 int error, reply = 1;
e7fd4179
DT
3320
3321 error = find_lkb(ls, ms->m_remid, &lkb);
3322 if (error)
3323 goto fail;
3324
3325 r = lkb->lkb_resource;
3326
3327 hold_rsb(r);
3328 lock_rsb(r);
3329
c54e04b0
DT
3330 error = validate_message(lkb, ms);
3331 if (error)
3332 goto out;
3333
e7fd4179 3334 receive_flags(lkb, ms);
cf6620ac 3335
e7fd4179 3336 error = receive_convert_args(ls, lkb, ms);
cf6620ac
DT
3337 if (error) {
3338 send_convert_reply(r, lkb, error);
3339 goto out;
3340 }
3341
e7fd4179
DT
3342 reply = !down_conversion(lkb);
3343
3344 error = do_convert(r, lkb);
e7fd4179
DT
3345 if (reply)
3346 send_convert_reply(r, lkb, error);
cf6620ac 3347 do_convert_effects(r, lkb, error);
c54e04b0 3348 out:
e7fd4179
DT
3349 unlock_rsb(r);
3350 put_rsb(r);
b3f58d8f 3351 dlm_put_lkb(lkb);
e7fd4179
DT
3352 return;
3353
3354 fail:
3355 setup_stub_lkb(ls, ms);
3356 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3357}
3358
3359static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3360{
3361 struct dlm_lkb *lkb;
3362 struct dlm_rsb *r;
3363 int error;
3364
3365 error = find_lkb(ls, ms->m_remid, &lkb);
3366 if (error)
3367 goto fail;
3368
3369 r = lkb->lkb_resource;
3370
3371 hold_rsb(r);
3372 lock_rsb(r);
3373
c54e04b0
DT
3374 error = validate_message(lkb, ms);
3375 if (error)
3376 goto out;
3377
e7fd4179 3378 receive_flags(lkb, ms);
cf6620ac 3379
e7fd4179 3380 error = receive_unlock_args(ls, lkb, ms);
cf6620ac
DT
3381 if (error) {
3382 send_unlock_reply(r, lkb, error);
3383 goto out;
3384 }
e7fd4179
DT
3385
3386 error = do_unlock(r, lkb);
e7fd4179 3387 send_unlock_reply(r, lkb, error);
cf6620ac 3388 do_unlock_effects(r, lkb, error);
c54e04b0 3389 out:
e7fd4179
DT
3390 unlock_rsb(r);
3391 put_rsb(r);
b3f58d8f 3392 dlm_put_lkb(lkb);
e7fd4179
DT
3393 return;
3394
3395 fail:
3396 setup_stub_lkb(ls, ms);
3397 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3398}
3399
3400static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3401{
3402 struct dlm_lkb *lkb;
3403 struct dlm_rsb *r;
3404 int error;
3405
3406 error = find_lkb(ls, ms->m_remid, &lkb);
3407 if (error)
3408 goto fail;
3409
3410 receive_flags(lkb, ms);
3411
3412 r = lkb->lkb_resource;
3413
3414 hold_rsb(r);
3415 lock_rsb(r);
3416
c54e04b0
DT
3417 error = validate_message(lkb, ms);
3418 if (error)
3419 goto out;
3420
e7fd4179
DT
3421 error = do_cancel(r, lkb);
3422 send_cancel_reply(r, lkb, error);
cf6620ac 3423 do_cancel_effects(r, lkb, error);
c54e04b0 3424 out:
e7fd4179
DT
3425 unlock_rsb(r);
3426 put_rsb(r);
b3f58d8f 3427 dlm_put_lkb(lkb);
e7fd4179
DT
3428 return;
3429
3430 fail:
3431 setup_stub_lkb(ls, ms);
3432 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3433}
3434
3435static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3436{
3437 struct dlm_lkb *lkb;
3438 struct dlm_rsb *r;
3439 int error;
3440
3441 error = find_lkb(ls, ms->m_remid, &lkb);
3442 if (error) {
c54e04b0
DT
3443 log_debug(ls, "receive_grant from %d no lkb %x",
3444 ms->m_header.h_nodeid, ms->m_remid);
e7fd4179
DT
3445 return;
3446 }
e7fd4179
DT
3447
3448 r = lkb->lkb_resource;
3449
3450 hold_rsb(r);
3451 lock_rsb(r);
3452
c54e04b0
DT
3453 error = validate_message(lkb, ms);
3454 if (error)
3455 goto out;
3456
e7fd4179 3457 receive_flags_reply(lkb, ms);
7d3c1feb
DT
3458 if (is_altmode(lkb))
3459 munge_altmode(lkb, ms);
e7fd4179
DT
3460 grant_lock_pc(r, lkb, ms);
3461 queue_cast(r, lkb, 0);
c54e04b0 3462 out:
e7fd4179
DT
3463 unlock_rsb(r);
3464 put_rsb(r);
b3f58d8f 3465 dlm_put_lkb(lkb);
e7fd4179
DT
3466}
3467
3468static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3469{
3470 struct dlm_lkb *lkb;
3471 struct dlm_rsb *r;
3472 int error;
3473
3474 error = find_lkb(ls, ms->m_remid, &lkb);
3475 if (error) {
c54e04b0
DT
3476 log_debug(ls, "receive_bast from %d no lkb %x",
3477 ms->m_header.h_nodeid, ms->m_remid);
e7fd4179
DT
3478 return;
3479 }
e7fd4179
DT
3480
3481 r = lkb->lkb_resource;
3482
3483 hold_rsb(r);
3484 lock_rsb(r);
3485
c54e04b0
DT
3486 error = validate_message(lkb, ms);
3487 if (error)
3488 goto out;
e7fd4179 3489
c54e04b0
DT
3490 queue_bast(r, lkb, ms->m_bastmode);
3491 out:
e7fd4179
DT
3492 unlock_rsb(r);
3493 put_rsb(r);
b3f58d8f 3494 dlm_put_lkb(lkb);
e7fd4179
DT
3495}
3496
3497static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3498{
3499 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3500
3501 from_nodeid = ms->m_header.h_nodeid;
3502 our_nodeid = dlm_our_nodeid();
3503
3504 len = receive_extralen(ms);
3505
3506 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3507 if (dir_nodeid != our_nodeid) {
3508 log_error(ls, "lookup dir_nodeid %d from %d",
3509 dir_nodeid, from_nodeid);
3510 error = -EINVAL;
3511 ret_nodeid = -1;
3512 goto out;
3513 }
3514
3515 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3516
3517 /* Optimization: we're master so treat lookup as a request */
3518 if (!error && ret_nodeid == our_nodeid) {
3519 receive_request(ls, ms);
3520 return;
3521 }
3522 out:
3523 send_lookup_reply(ls, ms, ret_nodeid, error);
3524}
3525
3526static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3527{
3528 int len, dir_nodeid, from_nodeid;
3529
3530 from_nodeid = ms->m_header.h_nodeid;
3531
3532 len = receive_extralen(ms);
3533
3534 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3535 if (dir_nodeid != dlm_our_nodeid()) {
3536 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3537 dir_nodeid, from_nodeid);
3538 return;
3539 }
3540
3541 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3542}
3543
8499137d
DT
3544static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3545{
3546 do_purge(ls, ms->m_nodeid, ms->m_pid);
3547}
3548
e7fd4179
DT
3549static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3550{
3551 struct dlm_lkb *lkb;
3552 struct dlm_rsb *r;
ef0c2bb0 3553 int error, mstype, result;
e7fd4179
DT
3554
3555 error = find_lkb(ls, ms->m_remid, &lkb);
3556 if (error) {
c54e04b0
DT
3557 log_debug(ls, "receive_request_reply from %d no lkb %x",
3558 ms->m_header.h_nodeid, ms->m_remid);
e7fd4179
DT
3559 return;
3560 }
e7fd4179 3561
e7fd4179
DT
3562 r = lkb->lkb_resource;
3563 hold_rsb(r);
3564 lock_rsb(r);
3565
c54e04b0
DT
3566 error = validate_message(lkb, ms);
3567 if (error)
3568 goto out;
3569
ef0c2bb0
DT
3570 mstype = lkb->lkb_wait_type;
3571 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3572 if (error)
3573 goto out;
3574
e7fd4179
DT
3575 /* Optimization: the dir node was also the master, so it took our
3576 lookup as a request and sent request reply instead of lookup reply */
3577 if (mstype == DLM_MSG_LOOKUP) {
3578 r->res_nodeid = ms->m_header.h_nodeid;
3579 lkb->lkb_nodeid = r->res_nodeid;
3580 }
3581
ef0c2bb0
DT
3582 /* this is the value returned from do_request() on the master */
3583 result = ms->m_result;
3584
3585 switch (result) {
e7fd4179 3586 case -EAGAIN:
ef0c2bb0 3587 /* request would block (be queued) on remote master */
e7fd4179
DT
3588 queue_cast(r, lkb, -EAGAIN);
3589 confirm_master(r, -EAGAIN);
ef0c2bb0 3590 unhold_lkb(lkb); /* undoes create_lkb() */
e7fd4179
DT
3591 break;
3592
3593 case -EINPROGRESS:
3594 case 0:
3595 /* request was queued or granted on remote master */
3596 receive_flags_reply(lkb, ms);
3597 lkb->lkb_remid = ms->m_lkid;
7d3c1feb
DT
3598 if (is_altmode(lkb))
3599 munge_altmode(lkb, ms);
3ae1acf9 3600 if (result) {
e7fd4179 3601 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3ae1acf9
DT
3602 add_timeout(lkb);
3603 } else {
e7fd4179
DT
3604 grant_lock_pc(r, lkb, ms);
3605 queue_cast(r, lkb, 0);
3606 }
ef0c2bb0 3607 confirm_master(r, result);
e7fd4179
DT
3608 break;
3609
597d0cae 3610 case -EBADR:
e7fd4179
DT
3611 case -ENOTBLK:
3612 /* find_rsb failed to find rsb or rsb wasn't master */
ef0c2bb0
DT
3613 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3614 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
e7fd4179
DT
3615 r->res_nodeid = -1;
3616 lkb->lkb_nodeid = -1;
ef0c2bb0
DT
3617
3618 if (is_overlap(lkb)) {
3619 /* we'll ignore error in cancel/unlock reply */
3620 queue_cast_overlap(r, lkb);
aec64e1b 3621 confirm_master(r, result);
ef0c2bb0
DT
3622 unhold_lkb(lkb); /* undoes create_lkb() */
3623 } else
3624 _request_lock(r, lkb);
e7fd4179
DT
3625 break;
3626
3627 default:
ef0c2bb0
DT
3628 log_error(ls, "receive_request_reply %x error %d",
3629 lkb->lkb_id, result);
e7fd4179
DT
3630 }
3631
ef0c2bb0
DT
3632 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3633 log_debug(ls, "receive_request_reply %x result %d unlock",
3634 lkb->lkb_id, result);
3635 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3636 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3637 send_unlock(r, lkb);
3638 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3639 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3640 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3641 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3642 send_cancel(r, lkb);
3643 } else {
3644 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3645 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3646 }
3647 out:
e7fd4179
DT
3648 unlock_rsb(r);
3649 put_rsb(r);
b3f58d8f 3650 dlm_put_lkb(lkb);
e7fd4179
DT
3651}
3652
3653static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3654 struct dlm_message *ms)
3655{
e7fd4179 3656 /* this is the value returned from do_convert() on the master */
ef0c2bb0 3657 switch (ms->m_result) {
e7fd4179
DT
3658 case -EAGAIN:
3659 /* convert would block (be queued) on remote master */
3660 queue_cast(r, lkb, -EAGAIN);
3661 break;
3662
c85d65e9
DT
3663 case -EDEADLK:
3664 receive_flags_reply(lkb, ms);
3665 revert_lock_pc(r, lkb);
3666 queue_cast(r, lkb, -EDEADLK);
3667 break;
3668
e7fd4179
DT
3669 case -EINPROGRESS:
3670 /* convert was queued on remote master */
7d3c1feb
DT
3671 receive_flags_reply(lkb, ms);
3672 if (is_demoted(lkb))
2a7ce0ed 3673 munge_demoted(lkb);
e7fd4179
DT
3674 del_lkb(r, lkb);
3675 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3ae1acf9 3676 add_timeout(lkb);
e7fd4179
DT
3677 break;
3678
3679 case 0:
3680 /* convert was granted on remote master */
3681 receive_flags_reply(lkb, ms);
7d3c1feb 3682 if (is_demoted(lkb))
2a7ce0ed 3683 munge_demoted(lkb);
e7fd4179
DT
3684 grant_lock_pc(r, lkb, ms);
3685 queue_cast(r, lkb, 0);
3686 break;
3687
3688 default:
ef0c2bb0
DT
3689 log_error(r->res_ls, "receive_convert_reply %x error %d",
3690 lkb->lkb_id, ms->m_result);
e7fd4179
DT
3691 }
3692}
3693
3694static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3695{
3696 struct dlm_rsb *r = lkb->lkb_resource;
ef0c2bb0 3697 int error;
e7fd4179
DT
3698
3699 hold_rsb(r);
3700 lock_rsb(r);
3701
c54e04b0
DT
3702 error = validate_message(lkb, ms);
3703 if (error)
3704 goto out;
3705
ef0c2bb0
DT
3706 /* stub reply can happen with waiters_mutex held */
3707 error = remove_from_waiters_ms(lkb, ms);
3708 if (error)
3709 goto out;
e7fd4179 3710
ef0c2bb0
DT
3711 __receive_convert_reply(r, lkb, ms);
3712 out:
e7fd4179
DT
3713 unlock_rsb(r);
3714 put_rsb(r);
3715}
3716
3717static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3718{
3719 struct dlm_lkb *lkb;
3720 int error;
3721
3722 error = find_lkb(ls, ms->m_remid, &lkb);
3723 if (error) {
c54e04b0
DT
3724 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3725 ms->m_header.h_nodeid, ms->m_remid);
e7fd4179
DT
3726 return;
3727 }
e7fd4179 3728
e7fd4179 3729 _receive_convert_reply(lkb, ms);
b3f58d8f 3730 dlm_put_lkb(lkb);
e7fd4179
DT
3731}
3732
3733static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3734{
3735 struct dlm_rsb *r = lkb->lkb_resource;
ef0c2bb0 3736 int error;
e7fd4179
DT
3737
3738 hold_rsb(r);
3739 lock_rsb(r);
3740
c54e04b0
DT
3741 error = validate_message(lkb, ms);
3742 if (error)
3743 goto out;
3744
ef0c2bb0
DT
3745 /* stub reply can happen with waiters_mutex held */
3746 error = remove_from_waiters_ms(lkb, ms);
3747 if (error)
3748 goto out;
3749
e7fd4179
DT
3750 /* this is the value returned from do_unlock() on the master */
3751
ef0c2bb0 3752 switch (ms->m_result) {
e7fd4179
DT
3753 case -DLM_EUNLOCK:
3754 receive_flags_reply(lkb, ms);
3755 remove_lock_pc(r, lkb);
3756 queue_cast(r, lkb, -DLM_EUNLOCK);
3757 break;
ef0c2bb0
DT
3758 case -ENOENT:
3759 break;
e7fd4179 3760 default:
ef0c2bb0
DT
3761 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3762 lkb->lkb_id, ms->m_result);
e7fd4179 3763 }
ef0c2bb0 3764 out:
e7fd4179
DT
3765 unlock_rsb(r);
3766 put_rsb(r);
3767}
3768
3769static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3770{
3771 struct dlm_lkb *lkb;
3772 int error;
3773
3774 error = find_lkb(ls, ms->m_remid, &lkb);
3775 if (error) {
c54e04b0
DT
3776 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3777 ms->m_header.h_nodeid, ms->m_remid);
e7fd4179
DT
3778 return;
3779 }
e7fd4179 3780
e7fd4179 3781 _receive_unlock_reply(lkb, ms);
b3f58d8f 3782 dlm_put_lkb(lkb);
e7fd4179
DT
3783}
3784
3785static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3786{
3787 struct dlm_rsb *r = lkb->lkb_resource;
ef0c2bb0 3788 int error;
e7fd4179
DT
3789
3790 hold_rsb(r);
3791 lock_rsb(r);
3792
c54e04b0
DT
3793 error = validate_message(lkb, ms);
3794 if (error)
3795 goto out;
3796
ef0c2bb0
DT
3797 /* stub reply can happen with waiters_mutex held */
3798 error = remove_from_waiters_ms(lkb, ms);
3799 if (error)
3800 goto out;
3801
e7fd4179
DT
3802 /* this is the value returned from do_cancel() on the master */
3803
ef0c2bb0 3804 switch (ms->m_result) {
e7fd4179
DT
3805 case -DLM_ECANCEL:
3806 receive_flags_reply(lkb, ms);
3807 revert_lock_pc(r, lkb);
84d8cd69 3808 queue_cast(r, lkb, -DLM_ECANCEL);
ef0c2bb0
DT
3809 break;
3810 case 0:
e7fd4179
DT
3811 break;
3812 default:
ef0c2bb0
DT
3813 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3814 lkb->lkb_id, ms->m_result);
e7fd4179 3815 }
ef0c2bb0 3816 out:
e7fd4179
DT
3817 unlock_rsb(r);
3818 put_rsb(r);
3819}
3820
3821static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3822{
3823 struct dlm_lkb *lkb;
3824 int error;
3825
3826 error = find_lkb(ls, ms->m_remid, &lkb);
3827 if (error) {
c54e04b0
DT
3828 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3829 ms->m_header.h_nodeid, ms->m_remid);
e7fd4179
DT
3830 return;
3831 }
e7fd4179 3832
e7fd4179 3833 _receive_cancel_reply(lkb, ms);
b3f58d8f 3834 dlm_put_lkb(lkb);
e7fd4179
DT
3835}
3836
3837static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3838{
3839 struct dlm_lkb *lkb;
3840 struct dlm_rsb *r;
3841 int error, ret_nodeid;
3842
3843 error = find_lkb(ls, ms->m_lkid, &lkb);
3844 if (error) {
3845 log_error(ls, "receive_lookup_reply no lkb");
3846 return;
3847 }
3848
ef0c2bb0 3849 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
e7fd4179 3850 FIXME: will a non-zero error ever be returned? */
e7fd4179
DT
3851
3852 r = lkb->lkb_resource;
3853 hold_rsb(r);
3854 lock_rsb(r);
3855
ef0c2bb0
DT
3856 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3857 if (error)
3858 goto out;
3859
e7fd4179
DT
3860 ret_nodeid = ms->m_nodeid;
3861 if (ret_nodeid == dlm_our_nodeid()) {
3862 r->res_nodeid = 0;
3863 ret_nodeid = 0;
3864 r->res_first_lkid = 0;
3865 } else {
3866 /* set_master() will copy res_nodeid to lkb_nodeid */
3867 r->res_nodeid = ret_nodeid;
3868 }
3869
ef0c2bb0
DT
3870 if (is_overlap(lkb)) {
3871 log_debug(ls, "receive_lookup_reply %x unlock %x",
3872 lkb->lkb_id, lkb->lkb_flags);
3873 queue_cast_overlap(r, lkb);
3874 unhold_lkb(lkb); /* undoes create_lkb() */
3875 goto out_list;
3876 }
3877
e7fd4179
DT
3878 _request_lock(r, lkb);
3879
ef0c2bb0 3880 out_list:
e7fd4179
DT
3881 if (!ret_nodeid)
3882 process_lookup_list(r);
ef0c2bb0 3883 out:
e7fd4179
DT
3884 unlock_rsb(r);
3885 put_rsb(r);
b3f58d8f 3886 dlm_put_lkb(lkb);
e7fd4179
DT
3887}
3888
c36258b5 3889static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179 3890{
46b43eed
DT
3891 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3892 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3893 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3894 ms->m_remid, ms->m_result);
3895 return;
3896 }
3897
e7fd4179
DT
3898 switch (ms->m_type) {
3899
3900 /* messages sent to a master node */
3901
3902 case DLM_MSG_REQUEST:
3903 receive_request(ls, ms);
3904 break;
3905
3906 case DLM_MSG_CONVERT:
3907 receive_convert(ls, ms);
3908 break;
3909
3910 case DLM_MSG_UNLOCK:
3911 receive_unlock(ls, ms);
3912 break;
3913
3914 case DLM_MSG_CANCEL:
3915 receive_cancel(ls, ms);
3916 break;
3917
3918 /* messages sent from a master node (replies to above) */
3919
3920 case DLM_MSG_REQUEST_REPLY:
3921 receive_request_reply(ls, ms);
3922 break;
3923
3924 case DLM_MSG_CONVERT_REPLY:
3925 receive_convert_reply(ls, ms);
3926 break;
3927
3928 case DLM_MSG_UNLOCK_REPLY:
3929 receive_unlock_reply(ls, ms);
3930 break;
3931
3932 case DLM_MSG_CANCEL_REPLY:
3933 receive_cancel_reply(ls, ms);
3934 break;
3935
3936 /* messages sent from a master node (only two types of async msg) */
3937
3938 case DLM_MSG_GRANT:
3939 receive_grant(ls, ms);
3940 break;
3941
3942 case DLM_MSG_BAST:
3943 receive_bast(ls, ms);
3944 break;
3945
3946 /* messages sent to a dir node */
3947
3948 case DLM_MSG_LOOKUP:
3949 receive_lookup(ls, ms);
3950 break;
3951
3952 case DLM_MSG_REMOVE:
3953 receive_remove(ls, ms);
3954 break;
3955
3956 /* messages sent from a dir node (remove has no reply) */
3957
3958 case DLM_MSG_LOOKUP_REPLY:
3959 receive_lookup_reply(ls, ms);
3960 break;
3961
8499137d
DT
3962 /* other messages */
3963
3964 case DLM_MSG_PURGE:
3965 receive_purge(ls, ms);
3966 break;
3967
e7fd4179
DT
3968 default:
3969 log_error(ls, "unknown message type %d", ms->m_type);
3970 }
3971
e7fd4179 3972 dlm_astd_wake();
e7fd4179
DT
3973}
3974
/* If the lockspace is in recovery mode (locking stopped), then normal
   messages are saved on the requestqueue for processing after recovery is
   done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
   messages off the requestqueue before we process new ones. This occurs right
   after recovery completes when we transition from saving all messages on
   requestqueue, to processing all the saved messages, to processing new
   messages as they arrive. */

static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
				int nodeid)
{
	if (dlm_locking_stopped(ls)) {
		dlm_add_requestqueue(ls, nodeid, ms);
	} else {
		dlm_wait_requestqueue(ls);
		_receive_message(ls, ms);
	}
}
3993
/* This is called by dlm_recoverd to process messages that were saved on
   the requestqueue. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
	_receive_message(ls, ms);
}
4001
4002/* This is called by the midcomms layer when something is received for
4003 the lockspace. It could be either a MSG (normal message sent as part of
4004 standard locking activity) or an RCOM (recovery message sent as part of
4005 lockspace recovery). */
4006
eef7d739 4007void dlm_receive_buffer(union dlm_packet *p, int nodeid)
c36258b5 4008{
eef7d739 4009 struct dlm_header *hd = &p->header;
c36258b5
DT
4010 struct dlm_ls *ls;
4011 int type = 0;
4012
4013 switch (hd->h_cmd) {
4014 case DLM_MSG:
eef7d739
AV
4015 dlm_message_in(&p->message);
4016 type = p->message.m_type;
c36258b5
DT
4017 break;
4018 case DLM_RCOM:
eef7d739
AV
4019 dlm_rcom_in(&p->rcom);
4020 type = p->rcom.rc_type;
c36258b5
DT
4021 break;
4022 default:
4023 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4024 return;
4025 }
4026
4027 if (hd->h_nodeid != nodeid) {
4028 log_print("invalid h_nodeid %d from %d lockspace %x",
4029 hd->h_nodeid, nodeid, hd->h_lockspace);
4030 return;
4031 }
4032
4033 ls = dlm_find_lockspace_global(hd->h_lockspace);
4034 if (!ls) {
594199eb
DT
4035 if (dlm_config.ci_log_debug)
4036 log_print("invalid lockspace %x from %d cmd %d type %d",
4037 hd->h_lockspace, nodeid, hd->h_cmd, type);
c36258b5
DT
4038
4039 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
eef7d739 4040 dlm_send_ls_not_ready(nodeid, &p->rcom);
c36258b5
DT
4041 return;
4042 }
4043
4044 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4045 be inactive (in this ls) before transitioning to recovery mode */
4046
4047 down_read(&ls->ls_recv_active);
4048 if (hd->h_cmd == DLM_MSG)
eef7d739 4049 dlm_receive_message(ls, &p->message, nodeid);
c36258b5 4050 else
eef7d739 4051 dlm_receive_rcom(ls, &p->rcom, nodeid);
c36258b5
DT
4052 up_read(&ls->ls_recv_active);
4053
4054 dlm_put_lockspace(ls);
4055}
e7fd4179 4056
2a7ce0ed
DT
4057static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4058 struct dlm_message *ms_stub)
e7fd4179
DT
4059{
4060 if (middle_conversion(lkb)) {
4061 hold_lkb(lkb);
2a7ce0ed
DT
4062 memset(ms_stub, 0, sizeof(struct dlm_message));
4063 ms_stub->m_flags = DLM_IFL_STUB_MS;
4064 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4065 ms_stub->m_result = -EINPROGRESS;
4066 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4067 _receive_convert_reply(lkb, ms_stub);
e7fd4179
DT
4068
4069 /* Same special case as in receive_rcom_lock_args() */
4070 lkb->lkb_grmode = DLM_LOCK_IV;
4071 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4072 unhold_lkb(lkb);
4073
4074 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4075 lkb->lkb_flags |= DLM_IFL_RESEND;
4076 }
4077
4078 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4079 conversions are async; there's no reply from the remote master */
4080}
4081
4082/* A waiting lkb needs recovery if the master node has failed, or
4083 the master node is changing (only when no directory is used) */
4084
4085static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4086{
4087 if (dlm_is_removed(ls, lkb->lkb_nodeid))
4088 return 1;
4089
4090 if (!dlm_no_directory(ls))
4091 return 0;
4092
4093 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4094 return 1;
4095
4096 return 0;
4097}
4098
4099/* Recovery for locks that are waiting for replies from nodes that are now
4100 gone. We can just complete unlocks and cancels by faking a reply from the
4101 dead node. Requests and up-conversions we flag to be resent after
4102 recovery. Down-conversions can just be completed with a fake reply like
4103 unlocks. Conversions between PR and CW need special attention. */
4104
4105void dlm_recover_waiters_pre(struct dlm_ls *ls)
4106{
4107 struct dlm_lkb *lkb, *safe;
2a7ce0ed 4108 struct dlm_message *ms_stub;
601342ce 4109 int wait_type, stub_unlock_result, stub_cancel_result;
e7fd4179 4110
a22ca480 4111 ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
2a7ce0ed
DT
4112 if (!ms_stub) {
4113 log_error(ls, "dlm_recover_waiters_pre no mem");
4114 return;
4115 }
4116
90135925 4117 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
4118
4119 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
2a7ce0ed
DT
4120
4121 /* exclude debug messages about unlocks because there can be so
4122 many and they aren't very interesting */
4123
4124 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4125 log_debug(ls, "recover_waiter %x nodeid %d "
4126 "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4127 lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4128 }
e7fd4179
DT
4129
4130 /* all outstanding lookups, regardless of destination will be
4131 resent after recovery is done */
4132
4133 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4134 lkb->lkb_flags |= DLM_IFL_RESEND;
4135 continue;
4136 }
4137
4138 if (!waiter_needs_recovery(ls, lkb))
4139 continue;
4140
601342ce
DT
4141 wait_type = lkb->lkb_wait_type;
4142 stub_unlock_result = -DLM_EUNLOCK;
4143 stub_cancel_result = -DLM_ECANCEL;
4144
4145 /* Main reply may have been received leaving a zero wait_type,
4146 but a reply for the overlapping op may not have been
4147 received. In that case we need to fake the appropriate
4148 reply for the overlap op. */
4149
4150 if (!wait_type) {
4151 if (is_overlap_cancel(lkb)) {
4152 wait_type = DLM_MSG_CANCEL;
4153 if (lkb->lkb_grmode == DLM_LOCK_IV)
4154 stub_cancel_result = 0;
4155 }
4156 if (is_overlap_unlock(lkb)) {
4157 wait_type = DLM_MSG_UNLOCK;
4158 if (lkb->lkb_grmode == DLM_LOCK_IV)
4159 stub_unlock_result = -ENOENT;
4160 }
4161
4162 log_debug(ls, "rwpre overlap %x %x %d %d %d",
4163 lkb->lkb_id, lkb->lkb_flags, wait_type,
4164 stub_cancel_result, stub_unlock_result);
4165 }
4166
4167 switch (wait_type) {
e7fd4179
DT
4168
4169 case DLM_MSG_REQUEST:
4170 lkb->lkb_flags |= DLM_IFL_RESEND;
4171 break;
4172
4173 case DLM_MSG_CONVERT:
2a7ce0ed 4174 recover_convert_waiter(ls, lkb, ms_stub);
e7fd4179
DT
4175 break;
4176
4177 case DLM_MSG_UNLOCK:
4178 hold_lkb(lkb);
2a7ce0ed
DT
4179 memset(ms_stub, 0, sizeof(struct dlm_message));
4180 ms_stub->m_flags = DLM_IFL_STUB_MS;
4181 ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4182 ms_stub->m_result = stub_unlock_result;
4183 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4184 _receive_unlock_reply(lkb, ms_stub);
b3f58d8f 4185 dlm_put_lkb(lkb);
e7fd4179
DT
4186 break;
4187
4188 case DLM_MSG_CANCEL:
4189 hold_lkb(lkb);
2a7ce0ed
DT
4190 memset(ms_stub, 0, sizeof(struct dlm_message));
4191 ms_stub->m_flags = DLM_IFL_STUB_MS;
4192 ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4193 ms_stub->m_result = stub_cancel_result;
4194 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4195 _receive_cancel_reply(lkb, ms_stub);
b3f58d8f 4196 dlm_put_lkb(lkb);
e7fd4179
DT
4197 break;
4198
4199 default:
601342ce
DT
4200 log_error(ls, "invalid lkb wait_type %d %d",
4201 lkb->lkb_wait_type, wait_type);
e7fd4179 4202 }
81456807 4203 schedule();
e7fd4179 4204 }
90135925 4205 mutex_unlock(&ls->ls_waiters_mutex);
2a7ce0ed 4206 kfree(ms_stub);
e7fd4179
DT
4207}
4208
ef0c2bb0 4209static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
e7fd4179
DT
4210{
4211 struct dlm_lkb *lkb;
ef0c2bb0 4212 int found = 0;
e7fd4179 4213
90135925 4214 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
4215 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4216 if (lkb->lkb_flags & DLM_IFL_RESEND) {
ef0c2bb0
DT
4217 hold_lkb(lkb);
4218 found = 1;
e7fd4179
DT
4219 break;
4220 }
4221 }
90135925 4222 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179 4223
ef0c2bb0 4224 if (!found)
e7fd4179 4225 lkb = NULL;
ef0c2bb0 4226 return lkb;
e7fd4179
DT
4227}
4228
4229/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
4230 master or dir-node for r. Processing the lkb may result in it being placed
4231 back on waiters. */
4232
ef0c2bb0
DT
4233/* We do this after normal locking has been enabled and any saved messages
4234 (in requestqueue) have been processed. We should be confident that at
4235 this point we won't get or process a reply to any of these waiting
4236 operations. But, new ops may be coming in on the rsbs/locks here from
4237 userspace or remotely. */
4238
4239/* there may have been an overlap unlock/cancel prior to recovery or after
4240 recovery. if before, the lkb may still have a pos wait_count; if after, the
4241 overlap flag would just have been set and nothing new sent. we can be
4242 confident here than any replies to either the initial op or overlap ops
4243 prior to recovery have been received. */
4244
e7fd4179
DT
4245int dlm_recover_waiters_post(struct dlm_ls *ls)
4246{
4247 struct dlm_lkb *lkb;
4248 struct dlm_rsb *r;
ef0c2bb0 4249 int error = 0, mstype, err, oc, ou;
e7fd4179
DT
4250
4251 while (1) {
4252 if (dlm_locking_stopped(ls)) {
4253 log_debug(ls, "recover_waiters_post aborted");
4254 error = -EINTR;
4255 break;
4256 }
4257
ef0c2bb0
DT
4258 lkb = find_resend_waiter(ls);
4259 if (!lkb)
e7fd4179
DT
4260 break;
4261
4262 r = lkb->lkb_resource;
ef0c2bb0
DT
4263 hold_rsb(r);
4264 lock_rsb(r);
4265
4266 mstype = lkb->lkb_wait_type;
4267 oc = is_overlap_cancel(lkb);
4268 ou = is_overlap_unlock(lkb);
4269 err = 0;
e7fd4179 4270
2a7ce0ed
DT
4271 log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4272 lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
e7fd4179 4273
ef0c2bb0
DT
4274 /* At this point we assume that we won't get a reply to any
4275 previous op or overlap op on this lock. First, do a big
4276 remove_from_waiters() for all previous ops. */
4277
4278 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4279 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4280 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4281 lkb->lkb_wait_type = 0;
4282 lkb->lkb_wait_count = 0;
4283 mutex_lock(&ls->ls_waiters_mutex);
4284 list_del_init(&lkb->lkb_wait_reply);
4285 mutex_unlock(&ls->ls_waiters_mutex);
4286 unhold_lkb(lkb); /* for waiters list */
4287
4288 if (oc || ou) {
4289 /* do an unlock or cancel instead of resending */
4290 switch (mstype) {
4291 case DLM_MSG_LOOKUP:
4292 case DLM_MSG_REQUEST:
4293 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4294 -DLM_ECANCEL);
4295 unhold_lkb(lkb); /* undoes create_lkb() */
4296 break;
4297 case DLM_MSG_CONVERT:
4298 if (oc) {
4299 queue_cast(r, lkb, -DLM_ECANCEL);
4300 } else {
4301 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4302 _unlock_lock(r, lkb);
4303 }
4304 break;
4305 default:
4306 err = 1;
4307 }
4308 } else {
4309 switch (mstype) {
4310 case DLM_MSG_LOOKUP:
4311 case DLM_MSG_REQUEST:
4312 _request_lock(r, lkb);
4313 if (is_master(r))
4314 confirm_master(r, 0);
4315 break;
4316 case DLM_MSG_CONVERT:
4317 _convert_lock(r, lkb);
4318 break;
4319 default:
4320 err = 1;
4321 }
e7fd4179 4322 }
ef0c2bb0
DT
4323
4324 if (err)
4325 log_error(ls, "recover_waiters_post %x %d %x %d %d",
4326 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4327 unlock_rsb(r);
4328 put_rsb(r);
4329 dlm_put_lkb(lkb);
e7fd4179
DT
4330 }
4331
4332 return error;
4333}
4334
4335static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4336 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4337{
4338 struct dlm_ls *ls = r->res_ls;
4339 struct dlm_lkb *lkb, *safe;
4340
4341 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4342 if (test(ls, lkb)) {
97a35d1e 4343 rsb_set_flag(r, RSB_LOCKS_PURGED);
e7fd4179
DT
4344 del_lkb(r, lkb);
4345 /* this put should free the lkb */
b3f58d8f 4346 if (!dlm_put_lkb(lkb))
e7fd4179
DT
4347 log_error(ls, "purged lkb not released");
4348 }
4349 }
4350}
4351
4352static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4353{
4354 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4355}
4356
4357static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4358{
4359 return is_master_copy(lkb);
4360}
4361
4362static void purge_dead_locks(struct dlm_rsb *r)
4363{
4364 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4365 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4366 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4367}
4368
4369void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4370{
4371 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4372 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4373 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4374}
4375
4376/* Get rid of locks held by nodes that are gone. */
4377
4378int dlm_purge_locks(struct dlm_ls *ls)
4379{
4380 struct dlm_rsb *r;
4381
4382 log_debug(ls, "dlm_purge_locks");
4383
4384 down_write(&ls->ls_root_sem);
4385 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4386 hold_rsb(r);
4387 lock_rsb(r);
4388 if (is_master(r))
4389 purge_dead_locks(r);
4390 unlock_rsb(r);
4391 unhold_rsb(r);
4392
4393 schedule();
4394 }
4395 up_write(&ls->ls_root_sem);
4396
4397 return 0;
4398}
4399
97a35d1e
DT
4400static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4401{
4402 struct dlm_rsb *r, *r_ret = NULL;
4403
c7be761a 4404 spin_lock(&ls->ls_rsbtbl[bucket].lock);
97a35d1e
DT
4405 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4406 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4407 continue;
4408 hold_rsb(r);
4409 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4410 r_ret = r;
4411 break;
4412 }
c7be761a 4413 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
97a35d1e
DT
4414 return r_ret;
4415}
4416
4417void dlm_grant_after_purge(struct dlm_ls *ls)
e7fd4179
DT
4418{
4419 struct dlm_rsb *r;
2b4e926a 4420 int bucket = 0;
e7fd4179 4421
2b4e926a
DT
4422 while (1) {
4423 r = find_purged_rsb(ls, bucket);
4424 if (!r) {
4425 if (bucket == ls->ls_rsbtbl_size - 1)
4426 break;
4427 bucket++;
97a35d1e 4428 continue;
2b4e926a 4429 }
97a35d1e
DT
4430 lock_rsb(r);
4431 if (is_master(r)) {
4432 grant_pending_locks(r);
4433 confirm_master(r, 0);
e7fd4179 4434 }
97a35d1e
DT
4435 unlock_rsb(r);
4436 put_rsb(r);
2b4e926a 4437 schedule();
e7fd4179 4438 }
e7fd4179
DT
4439}
4440
4441static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4442 uint32_t remid)
4443{
4444 struct dlm_lkb *lkb;
4445
4446 list_for_each_entry(lkb, head, lkb_statequeue) {
4447 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4448 return lkb;
4449 }
4450 return NULL;
4451}
4452
4453static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4454 uint32_t remid)
4455{
4456 struct dlm_lkb *lkb;
4457
4458 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4459 if (lkb)
4460 return lkb;
4461 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4462 if (lkb)
4463 return lkb;
4464 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4465 if (lkb)
4466 return lkb;
4467 return NULL;
4468}
4469
ae773d0b 4470/* needs at least dlm_rcom + rcom_lock */
e7fd4179
DT
4471static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4472 struct dlm_rsb *r, struct dlm_rcom *rc)
4473{
4474 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
e7fd4179
DT
4475
4476 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
163a1859
AV
4477 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4478 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4479 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4480 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
e7fd4179 4481 lkb->lkb_flags |= DLM_IFL_MSTCPY;
163a1859 4482 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
e7fd4179
DT
4483 lkb->lkb_rqmode = rl->rl_rqmode;
4484 lkb->lkb_grmode = rl->rl_grmode;
4485 /* don't set lkb_status because add_lkb wants to itself */
4486
8304d6f2
DT
4487 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4488 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
e7fd4179 4489
e7fd4179 4490 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
a5dd0631
AV
4491 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4492 sizeof(struct rcom_lock);
4493 if (lvblen > ls->ls_lvblen)
4494 return -EINVAL;
52bda2b5 4495 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
e7fd4179
DT
4496 if (!lkb->lkb_lvbptr)
4497 return -ENOMEM;
e7fd4179
DT
4498 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4499 }
4500
4501 /* Conversions between PR and CW (middle modes) need special handling.
4502 The real granted mode of these converting locks cannot be determined
4503 until all locks have been rebuilt on the rsb (recover_conversion) */
4504
163a1859
AV
4505 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4506 middle_conversion(lkb)) {
e7fd4179
DT
4507 rl->rl_status = DLM_LKSTS_CONVERT;
4508 lkb->lkb_grmode = DLM_LOCK_IV;
4509 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4510 }
4511
4512 return 0;
4513}
4514
/* This lkb may have been recovered in a previous aborted recovery so we need
   to check if the rsb already has an lkb with the given remote nodeid/lkid.
   If so we just send back a standard reply.  If not, we create a new lkb with
   the given values and send back our lkid.  We send back our lkid by sending
   back the rcom_lock struct we got but with the remid field filled in. */

/* needs at least dlm_rcom + rcom_lock */
e7fd4179
DT
4522int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4523{
4524 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4525 struct dlm_rsb *r;
4526 struct dlm_lkb *lkb;
4527 int error;
4528
4529 if (rl->rl_parent_lkid) {
4530 error = -EOPNOTSUPP;
4531 goto out;
4532 }
4533
163a1859
AV
4534 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4535 R_MASTER, &r);
e7fd4179
DT
4536 if (error)
4537 goto out;
4538
4539 lock_rsb(r);
4540
163a1859 4541 lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
e7fd4179
DT
4542 if (lkb) {
4543 error = -EEXIST;
4544 goto out_remid;
4545 }
4546
4547 error = create_lkb(ls, &lkb);
4548 if (error)
4549 goto out_unlock;
4550
4551 error = receive_rcom_lock_args(ls, lkb, r, rc);
4552 if (error) {
b3f58d8f 4553 __put_lkb(ls, lkb);
e7fd4179
DT
4554 goto out_unlock;
4555 }
4556
4557 attach_lkb(r, lkb);
4558 add_lkb(r, lkb, rl->rl_status);
4559 error = 0;
4560
4561 out_remid:
4562 /* this is the new value returned to the lock holder for
4563 saving in its process-copy lkb */
163a1859 4564 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
e7fd4179
DT
4565
4566 out_unlock:
4567 unlock_rsb(r);
4568 put_rsb(r);
4569 out:
4570 if (error)
163a1859
AV
4571 log_debug(ls, "recover_master_copy %d %x", error,
4572 le32_to_cpu(rl->rl_lkid));
4573 rl->rl_result = cpu_to_le32(error);
e7fd4179
DT
4574 return error;
4575}
4576
ae773d0b 4577/* needs at least dlm_rcom + rcom_lock */
e7fd4179
DT
4578int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4579{
4580 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4581 struct dlm_rsb *r;
4582 struct dlm_lkb *lkb;
4583 int error;
4584
163a1859 4585 error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
e7fd4179 4586 if (error) {
163a1859
AV
4587 log_error(ls, "recover_process_copy no lkid %x",
4588 le32_to_cpu(rl->rl_lkid));
e7fd4179
DT
4589 return error;
4590 }
4591
4592 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4593
163a1859 4594 error = le32_to_cpu(rl->rl_result);
e7fd4179
DT
4595
4596 r = lkb->lkb_resource;
4597 hold_rsb(r);
4598 lock_rsb(r);
4599
4600 switch (error) {
dc200a88
DT
4601 case -EBADR:
4602 /* There's a chance the new master received our lock before
4603 dlm_recover_master_reply(), this wouldn't happen if we did
4604 a barrier between recover_masters and recover_locks. */
4605 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4606 (unsigned long)r, r->res_name);
4607 dlm_send_rcom_lock(r, lkb);
4608 goto out;
e7fd4179
DT
4609 case -EEXIST:
4610 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4611 /* fall through */
4612 case 0:
163a1859 4613 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
e7fd4179
DT
4614 break;
4615 default:
4616 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4617 error, lkb->lkb_id);
4618 }
4619
4620 /* an ack for dlm_recover_locks() which waits for replies from
4621 all the locks it sends to new masters */
4622 dlm_recovered_lock(r);
dc200a88 4623 out:
e7fd4179
DT
4624 unlock_rsb(r);
4625 put_rsb(r);
b3f58d8f 4626 dlm_put_lkb(lkb);
e7fd4179
DT
4627
4628 return 0;
4629}
4630
597d0cae
DT
4631int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4632 int mode, uint32_t flags, void *name, unsigned int namelen,
d7db923e 4633 unsigned long timeout_cs)
597d0cae
DT
4634{
4635 struct dlm_lkb *lkb;
4636 struct dlm_args args;
4637 int error;
4638
85e86edf 4639 dlm_lock_recovery(ls);
597d0cae
DT
4640
4641 error = create_lkb(ls, &lkb);
4642 if (error) {
4643 kfree(ua);
4644 goto out;
4645 }
4646
4647 if (flags & DLM_LKF_VALBLK) {
573c24c4 4648 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
597d0cae
DT
4649 if (!ua->lksb.sb_lvbptr) {
4650 kfree(ua);
4651 __put_lkb(ls, lkb);
4652 error = -ENOMEM;
4653 goto out;
4654 }
4655 }
4656
52bda2b5 4657 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
597d0cae
DT
4658 When DLM_IFL_USER is set, the dlm knows that this is a userspace
4659 lock and that lkb_astparam is the dlm_user_args structure. */
4660
d7db923e 4661 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
e5dae548 4662 fake_astfn, ua, fake_bastfn, &args);
597d0cae 4663 lkb->lkb_flags |= DLM_IFL_USER;
597d0cae
DT
4664
4665 if (error) {
4666 __put_lkb(ls, lkb);
4667 goto out;
4668 }
4669
4670 error = request_lock(ls, lkb, name, namelen, &args);
4671
4672 switch (error) {
4673 case 0:
4674 break;
4675 case -EINPROGRESS:
4676 error = 0;
4677 break;
4678 case -EAGAIN:
4679 error = 0;
4680 /* fall through */
4681 default:
4682 __put_lkb(ls, lkb);
4683 goto out;
4684 }
4685
4686 /* add this new lkb to the per-process list of locks */
4687 spin_lock(&ua->proc->locks_spin);
ef0c2bb0 4688 hold_lkb(lkb);
597d0cae
DT
4689 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4690 spin_unlock(&ua->proc->locks_spin);
4691 out:
85e86edf 4692 dlm_unlock_recovery(ls);
597d0cae
DT
4693 return error;
4694}
4695
4696int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
d7db923e
DT
4697 int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4698 unsigned long timeout_cs)
597d0cae
DT
4699{
4700 struct dlm_lkb *lkb;
4701 struct dlm_args args;
4702 struct dlm_user_args *ua;
4703 int error;
4704
85e86edf 4705 dlm_lock_recovery(ls);
597d0cae
DT
4706
4707 error = find_lkb(ls, lkid, &lkb);
4708 if (error)
4709 goto out;
4710
4711 /* user can change the params on its lock when it converts it, or
4712 add an lvb that didn't exist before */
4713
d292c0cc 4714 ua = lkb->lkb_ua;
597d0cae
DT
4715
4716 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
573c24c4 4717 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
597d0cae
DT
4718 if (!ua->lksb.sb_lvbptr) {
4719 error = -ENOMEM;
4720 goto out_put;
4721 }
4722 }
4723 if (lvb_in && ua->lksb.sb_lvbptr)
4724 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4725
d7db923e 4726 ua->xid = ua_tmp->xid;
597d0cae
DT
4727 ua->castparam = ua_tmp->castparam;
4728 ua->castaddr = ua_tmp->castaddr;
4729 ua->bastparam = ua_tmp->bastparam;
4730 ua->bastaddr = ua_tmp->bastaddr;
10948eb4 4731 ua->user_lksb = ua_tmp->user_lksb;
597d0cae 4732
d7db923e 4733 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
e5dae548 4734 fake_astfn, ua, fake_bastfn, &args);
597d0cae
DT
4735 if (error)
4736 goto out_put;
4737
4738 error = convert_lock(ls, lkb, &args);
4739
c85d65e9 4740 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
597d0cae
DT
4741 error = 0;
4742 out_put:
4743 dlm_put_lkb(lkb);
4744 out:
85e86edf 4745 dlm_unlock_recovery(ls);
597d0cae
DT
4746 kfree(ua_tmp);
4747 return error;
4748}
4749
4750int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4751 uint32_t flags, uint32_t lkid, char *lvb_in)
4752{
4753 struct dlm_lkb *lkb;
4754 struct dlm_args args;
4755 struct dlm_user_args *ua;
4756 int error;
4757
85e86edf 4758 dlm_lock_recovery(ls);
597d0cae
DT
4759
4760 error = find_lkb(ls, lkid, &lkb);
4761 if (error)
4762 goto out;
4763
d292c0cc 4764 ua = lkb->lkb_ua;
597d0cae
DT
4765
4766 if (lvb_in && ua->lksb.sb_lvbptr)
4767 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
b434eda6
PC
4768 if (ua_tmp->castparam)
4769 ua->castparam = ua_tmp->castparam;
cc346d55 4770 ua->user_lksb = ua_tmp->user_lksb;
597d0cae
DT
4771
4772 error = set_unlock_args(flags, ua, &args);
4773 if (error)
4774 goto out_put;
4775
4776 error = unlock_lock(ls, lkb, &args);
4777
4778 if (error == -DLM_EUNLOCK)
4779 error = 0;
ef0c2bb0
DT
4780 /* from validate_unlock_args() */
4781 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4782 error = 0;
597d0cae
DT
4783 if (error)
4784 goto out_put;
4785
4786 spin_lock(&ua->proc->locks_spin);
a1bc86e6
DT
4787 /* dlm_user_add_ast() may have already taken lkb off the proc list */
4788 if (!list_empty(&lkb->lkb_ownqueue))
4789 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
597d0cae 4790 spin_unlock(&ua->proc->locks_spin);
597d0cae
DT
4791 out_put:
4792 dlm_put_lkb(lkb);
4793 out:
85e86edf 4794 dlm_unlock_recovery(ls);
ef0c2bb0 4795 kfree(ua_tmp);
597d0cae
DT
4796 return error;
4797}
4798
4799int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4800 uint32_t flags, uint32_t lkid)
4801{
4802 struct dlm_lkb *lkb;
4803 struct dlm_args args;
4804 struct dlm_user_args *ua;
4805 int error;
4806
85e86edf 4807 dlm_lock_recovery(ls);
597d0cae
DT
4808
4809 error = find_lkb(ls, lkid, &lkb);
4810 if (error)
4811 goto out;
4812
d292c0cc 4813 ua = lkb->lkb_ua;
b434eda6
PC
4814 if (ua_tmp->castparam)
4815 ua->castparam = ua_tmp->castparam;
c059f70e 4816 ua->user_lksb = ua_tmp->user_lksb;
597d0cae
DT
4817
4818 error = set_unlock_args(flags, ua, &args);
4819 if (error)
4820 goto out_put;
4821
4822 error = cancel_lock(ls, lkb, &args);
4823
4824 if (error == -DLM_ECANCEL)
4825 error = 0;
ef0c2bb0
DT
4826 /* from validate_unlock_args() */
4827 if (error == -EBUSY)
4828 error = 0;
597d0cae
DT
4829 out_put:
4830 dlm_put_lkb(lkb);
4831 out:
85e86edf 4832 dlm_unlock_recovery(ls);
ef0c2bb0 4833 kfree(ua_tmp);
597d0cae
DT
4834 return error;
4835}
4836
8b4021fa
DT
4837int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4838{
4839 struct dlm_lkb *lkb;
4840 struct dlm_args args;
4841 struct dlm_user_args *ua;
4842 struct dlm_rsb *r;
4843 int error;
4844
4845 dlm_lock_recovery(ls);
4846
4847 error = find_lkb(ls, lkid, &lkb);
4848 if (error)
4849 goto out;
4850
d292c0cc 4851 ua = lkb->lkb_ua;
8b4021fa
DT
4852
4853 error = set_unlock_args(flags, ua, &args);
4854 if (error)
4855 goto out_put;
4856
4857 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4858
4859 r = lkb->lkb_resource;
4860 hold_rsb(r);
4861 lock_rsb(r);
4862
4863 error = validate_unlock_args(lkb, &args);
4864 if (error)
4865 goto out_r;
4866 lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4867
4868 error = _cancel_lock(r, lkb);
4869 out_r:
4870 unlock_rsb(r);
4871 put_rsb(r);
4872
4873 if (error == -DLM_ECANCEL)
4874 error = 0;
4875 /* from validate_unlock_args() */
4876 if (error == -EBUSY)
4877 error = 0;
4878 out_put:
4879 dlm_put_lkb(lkb);
4880 out:
4881 dlm_unlock_recovery(ls);
4882 return error;
4883}
4884
ef0c2bb0
DT
4885/* lkb's that are removed from the waiters list by revert are just left on the
4886 orphans list with the granted orphan locks, to be freed by purge */
4887
597d0cae
DT
4888static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4889{
ef0c2bb0
DT
4890 struct dlm_args args;
4891 int error;
597d0cae 4892
ef0c2bb0
DT
4893 hold_lkb(lkb);
4894 mutex_lock(&ls->ls_orphans_mutex);
4895 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4896 mutex_unlock(&ls->ls_orphans_mutex);
597d0cae 4897
d292c0cc 4898 set_unlock_args(0, lkb->lkb_ua, &args);
ef0c2bb0
DT
4899
4900 error = cancel_lock(ls, lkb, &args);
4901 if (error == -DLM_ECANCEL)
4902 error = 0;
4903 return error;
597d0cae
DT
4904}
4905
4906/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4907 Regardless of what rsb queue the lock is on, it's removed and freed. */
4908
4909static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4910{
597d0cae
DT
4911 struct dlm_args args;
4912 int error;
4913
d292c0cc 4914 set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
597d0cae
DT
4915
4916 error = unlock_lock(ls, lkb, &args);
4917 if (error == -DLM_EUNLOCK)
4918 error = 0;
4919 return error;
4920}
4921
ef0c2bb0
DT
4922/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4923 (which does lock_rsb) due to deadlock with receiving a message that does
4924 lock_rsb followed by dlm_user_add_ast() */
4925
4926static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4927 struct dlm_user_proc *proc)
4928{
4929 struct dlm_lkb *lkb = NULL;
4930
4931 mutex_lock(&ls->ls_clear_proc_locks);
4932 if (list_empty(&proc->locks))
4933 goto out;
4934
4935 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4936 list_del_init(&lkb->lkb_ownqueue);
4937
4938 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4939 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4940 else
4941 lkb->lkb_flags |= DLM_IFL_DEAD;
4942 out:
4943 mutex_unlock(&ls->ls_clear_proc_locks);
4944 return lkb;
4945}
4946
/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here.  this assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to serialize
   them ourself. */

4957void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4958{
4959 struct dlm_lkb *lkb, *safe;
4960
85e86edf 4961 dlm_lock_recovery(ls);
597d0cae 4962
ef0c2bb0
DT
4963 while (1) {
4964 lkb = del_proc_lock(ls, proc);
4965 if (!lkb)
4966 break;
84d8cd69 4967 del_timeout(lkb);
ef0c2bb0 4968 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
597d0cae 4969 orphan_proc_lock(ls, lkb);
ef0c2bb0 4970 else
597d0cae 4971 unlock_proc_lock(ls, lkb);
597d0cae
DT
4972
4973 /* this removes the reference for the proc->locks list
4974 added by dlm_user_request, it may result in the lkb
4975 being freed */
4976
4977 dlm_put_lkb(lkb);
4978 }
a1bc86e6 4979
ef0c2bb0
DT
4980 mutex_lock(&ls->ls_clear_proc_locks);
4981
a1bc86e6
DT
4982 /* in-progress unlocks */
4983 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4984 list_del_init(&lkb->lkb_ownqueue);
4985 lkb->lkb_flags |= DLM_IFL_DEAD;
4986 dlm_put_lkb(lkb);
4987 }
4988
4989 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
8304d6f2
DT
4990 memset(&lkb->lkb_callbacks, 0,
4991 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
4992 list_del_init(&lkb->lkb_astqueue);
a1bc86e6
DT
4993 dlm_put_lkb(lkb);
4994 }
4995
597d0cae 4996 mutex_unlock(&ls->ls_clear_proc_locks);
85e86edf 4997 dlm_unlock_recovery(ls);
597d0cae 4998}
a1bc86e6 4999
8499137d
DT
5000static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5001{
5002 struct dlm_lkb *lkb, *safe;
5003
5004 while (1) {
5005 lkb = NULL;
5006 spin_lock(&proc->locks_spin);
5007 if (!list_empty(&proc->locks)) {
5008 lkb = list_entry(proc->locks.next, struct dlm_lkb,
5009 lkb_ownqueue);
5010 list_del_init(&lkb->lkb_ownqueue);
5011 }
5012 spin_unlock(&proc->locks_spin);
5013
5014 if (!lkb)
5015 break;
5016
5017 lkb->lkb_flags |= DLM_IFL_DEAD;
5018 unlock_proc_lock(ls, lkb);
5019 dlm_put_lkb(lkb); /* ref from proc->locks list */
5020 }
5021
5022 spin_lock(&proc->locks_spin);
5023 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5024 list_del_init(&lkb->lkb_ownqueue);
5025 lkb->lkb_flags |= DLM_IFL_DEAD;
5026 dlm_put_lkb(lkb);
5027 }
5028 spin_unlock(&proc->locks_spin);
5029
5030 spin_lock(&proc->asts_spin);
5031 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
8304d6f2
DT
5032 memset(&lkb->lkb_callbacks, 0,
5033 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5034 list_del_init(&lkb->lkb_astqueue);
8499137d
DT
5035 dlm_put_lkb(lkb);
5036 }
5037 spin_unlock(&proc->asts_spin);
5038}
5039
5040/* pid of 0 means purge all orphans */
5041
5042static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5043{
5044 struct dlm_lkb *lkb, *safe;
5045
5046 mutex_lock(&ls->ls_orphans_mutex);
5047 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5048 if (pid && lkb->lkb_ownpid != pid)
5049 continue;
5050 unlock_proc_lock(ls, lkb);
5051 list_del_init(&lkb->lkb_ownqueue);
5052 dlm_put_lkb(lkb);
5053 }
5054 mutex_unlock(&ls->ls_orphans_mutex);
5055}
5056
5057static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5058{
5059 struct dlm_message *ms;
5060 struct dlm_mhandle *mh;
5061 int error;
5062
5063 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5064 DLM_MSG_PURGE, &ms, &mh);
5065 if (error)
5066 return error;
5067 ms->m_nodeid = nodeid;
5068 ms->m_pid = pid;
5069
5070 return send_message(mh, ms);
5071}
5072
5073int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5074 int nodeid, int pid)
5075{
5076 int error = 0;
5077
5078 if (nodeid != dlm_our_nodeid()) {
5079 error = send_purge(ls, nodeid, pid);
5080 } else {
85e86edf 5081 dlm_lock_recovery(ls);
8499137d
DT
5082 if (pid == current->pid)
5083 purge_proc_locks(ls, proc);
5084 else
5085 do_purge(ls, nodeid, pid);
85e86edf 5086 dlm_unlock_recovery(ls);
8499137d
DT
5087 }
5088 return error;
5089}
5090
This page took 0.64584 seconds and 5 git commands to generate.