[DLM] block scand during recovery [1/6]
fs/dlm/lock.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
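
/* A minimal, hypothetical sketch of how a kernel caller drives stage 1.
   The callback names and error handling below are illustrative
   assumptions, not part of this file; the entry points and flags are
   the dlm_lock()/dlm_unlock() API from linux/dlm.h of this era. */

#if 0	/* example only, not built */
static struct dlm_lksb example_lksb;

static void example_ast(void *astarg)
{
	/* sb_status tells us how the request completed */
	if (example_lksb.sb_status == 0)
		printk("example: granted lkid %x\n", example_lksb.sb_lkid);
}

static void example_usage(dlm_lockspace_t *ls)
{
	int error;

	/* new EX request -> request_lock() path */
	error = dlm_lock(ls, DLM_LOCK_EX, &example_lksb, 0,
			 "example-res", 11, 0, example_ast, NULL, NULL);

	/* same lkid with CONVERT -> convert_lock() path */
	error = dlm_lock(ls, DLM_LOCK_PR, &example_lksb, DLM_LKF_CONVERT,
			 "example-res", 11, 0, example_ast, NULL, NULL);

	/* unlock -> unlock_lock() path */
	error = dlm_unlock(ls, example_lksb.sb_lkid, 0, &example_lksb, NULL);
}
#endif
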
#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},	/* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},	/* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},	/* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},	/* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/* UN   NL  CR  CW  PR  PW  EX  PD*/
	{  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
	{  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
	{  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
	{  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
	{  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
	{  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
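
/* A worked example of reading the table above (illustrative only):
   converting PW -> NL indexes row PW, column NL, which holds 0, so the
   lock's LVB is written back to the resource on the down-conversion;
   a brand new EX request has grmode IV (row UN) and rqmode EX, which
   holds 1, so the resource's LVB is copied back to the caller's lksb
   when the lock is granted.  set_lvb_lock() below implements exactly
   this lookup. */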

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
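
/* For instance (hypothetical caller, shown only to illustrate the +1
   offset that maps DLM_LOCK_IV == -1 onto the UN row/column):
   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) is 1 (shared readers),
   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) is 0 (a reader blocks a
   writer). */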

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},	/* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},	/* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},	/* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},	/* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},	/* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
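
/* Sketch of the gating pattern (hypothetical caller; the real entry
   points elsewhere in this file follow the same shape).  Normal
   requests take the recovery rwsem for read, so they all proceed in
   parallel; the recovery path takes it for write, which blocks new
   work until recovery finishes. */

#if 0	/* example only, not built */
static int example_entry(struct dlm_ls *ls)
{
	int error;

	if (!dlm_lock_recovery_try(ls))
		return -EAGAIN;		/* recovery in progress */

	error = 0;	/* ... do the locking operation ... */

	dlm_unlock_recovery(ls);
	return error;
}
#endif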

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}
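
/* Hypothetical illustration of the double-search in find_rsb() above:
   the first search_rsb() takes and drops the bucket lock, so another
   thread can insert the same name before we add our new rsb.  Hence
   the second _search_rsb() under the write_lock; if it finds a match,
   our freshly created rsb is freed and the existing one wins.  A
   caller simply does:

	struct dlm_rsb *r;
	int error = find_rsb(ls, name, namelen, R_CREATE, &r);
	if (!error) {
		... use r ...
		put_rsb(r);	(drops the reference taken by the search)
	}
*/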

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}
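
/* The lock id doubles as its own routing information: the high 16 bits
   are the lkbtbl bucket chosen at create time, the low 16 bits come
   from that bucket's rolling counter.  A worked example (values are
   illustrative): bucket 0x0012 and counter 0x0001 yield
   lkid = (0x0012 << 16) | 0x0001 = 0x00120001, and find_lkb() recovers
   the bucket with 0x00120001 >> 16 == 0x0012 before walking only that
   bucket's list. */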

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	if (!lkb)
		list_add_tail(new, head);
	else
		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
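
/* An illustrative timeline for the overlap flags (hypothetical lkid
   0x00120001, not taken from this file):

   1. dlm_lock() request sent; add_to_waiters(lkb, DLM_MSG_REQUEST)
      sets wait_type = DLM_MSG_REQUEST, wait_count = 1.
   2. Before the request reply arrives the app calls dlm_unlock();
      add_to_waiters(lkb, DLM_MSG_UNLOCK) sees wait_type already set,
      so it sets DLM_IFL_OVERLAP_UNLOCK and bumps wait_count to 2.
   3. Each reply that comes back drops wait_count by one in
      _remove_from_waiters() below; the lkb leaves the waiters list
      only when both operations have been accounted for. */
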
/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't received a
	   reply to the op that was in progress prior to the unlock/cancel;
	   we give up on any reply to the earlier op.  FIXME: not sure
	   when/how this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type);
	if (ms != &ls->ls_stub_ms)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		if (dlm_locking_stopped(ls))
			break;
		cond_resched();
	}
}
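
/* Sketch of the caller's side (the dlm_scand daemon lives in
   lockspace.c, not here; this loop illustrates the shape, it is not a
   copy, and ci_scan_secs is assumed from the config code of this era).
   The dlm_locking_stopped() check above is what this patch relies on
   to stop the scan while recovery runs. */

#if 0	/* example only, not built */
static int example_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		ls = NULL;	/* ... find a lockspace to scan ... */
		if (ls)
			dlm_scan_rsbs(ls);
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}
#endif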

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

/* returns: 0 did nothing
	    1 moved lock to granted
	   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
		log_print("munge_demoted %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
	    ms->m_type != DLM_MSG_GRANT) {
		log_print("munge_altmode %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
		lkb->lkb_rqmode = DLM_LOCK_PR;
	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
		lkb->lkb_rqmode = DLM_LOCK_CW;
	else {
		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
		dlm_print_lkb(lkb);
	}
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing the first lkb in the
 * convert queue from being granted, then demote lkb (set grmode to NL).
 * This second form requires that we check for conv-deadlk even when
 * now == 0 in _can_be_granted().
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We demote the granted mode of the second lock (the lkb passed to
 * this function).
 *
 * After the resolution, the "grant pending" function needs to go back and try
 * to grant locks on the convert queue again since the first lock can now be
 * granted.
 */

static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this, *first = NULL, *self = NULL;

	list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
		if (!first)
			first = this;
		if (this == lkb) {
			self = lkb;
			continue;
		}

		if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
			return 1;
	}

	/* if lkb is on the convert queue and is preventing the first
	   from being granted, then there's deadlock and we demote lkb.
	   multiple converting locks may need to do this before the first
	   converting lock can be granted. */

	if (self && self != first) {
		if (!modes_compat(lkb, first) &&
		    !queue_conflict(&rsb->res_grantqueue, first))
			return 1;
	}

	return 0;
}

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut.  Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
	}

	return 0;
}

/*
 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
 * simple way to provide a big optimization to applications that can use them.
 */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	uint32_t flags = lkb->lkb_exflags;
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
		goto out;

	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
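
/* How an application would exploit ALTPR (hypothetical caller using the
   public dlm_lock() API; the flag and status bit are the real ones):
   request CW with DLM_LKF_ALTPR set, and if CW conflicts but PR would
   be compatible, the lock is granted in PR instead and the completion
   ast sees DLM_SBF_ALTMODE in lksb.sb_flags, telling the app which
   mode it actually holds. */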

static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		if (can_be_granted(r, lkb, 0)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
		} else {
			hi = max_t(int, lkb->lkb_rqmode, hi);
			if (!demoted && is_demoted(lkb))
				demote_restart = 1;
		}
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0))
			grant_lock_pending(r, lkb);
		else
			high = max_t(int, lkb->lkb_rqmode, high);
	}

	return high;
}

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.  FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		if (gr->lkb_bastaddr &&
		    gr->lkb_highbast < lkb->lkb_rqmode &&
		    !modes_compat(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}

/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
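
/* Sketch of how a caller is expected to consume set_master()'s return
   value (hypothetical; the real dispatch lives in _request_lock(),
   which is outside this excerpt):

	error = set_master(r, lkb);
	if (error == 0)
		error = is_remote(r) ? send_request(r, lkb)
				     : do_request(r, lkb);
	else
		error = 0;	(lkb is parked awaiting the lookup reply)
*/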
1685
1686static void process_lookup_list(struct dlm_rsb *r)
1687{
1688 struct dlm_lkb *lkb, *safe;
1689
1690 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
ef0c2bb0 1691 list_del_init(&lkb->lkb_rsb_lookup);
e7fd4179
DT
1692 _request_lock(r, lkb);
1693 schedule();
1694 }
1695}
1696
1697/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1698
1699static void confirm_master(struct dlm_rsb *r, int error)
1700{
1701 struct dlm_lkb *lkb;
1702
1703 if (!r->res_first_lkid)
1704 return;
1705
1706 switch (error) {
1707 case 0:
1708 case -EINPROGRESS:
1709 r->res_first_lkid = 0;
1710 process_lookup_list(r);
1711 break;
1712
1713 case -EAGAIN:
1714 /* the remote master didn't queue our NOQUEUE request;
1715 make a waiting lkb the first_lkid */
1716
1717 r->res_first_lkid = 0;
1718
1719 if (!list_empty(&r->res_lookup)) {
1720 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1721 lkb_rsb_lookup);
ef0c2bb0 1722 list_del_init(&lkb->lkb_rsb_lookup);
e7fd4179
DT
1723 r->res_first_lkid = lkb->lkb_id;
1724 _request_lock(r, lkb);
1725 } else
1726 r->res_nodeid = -1;
1727 break;
1728
1729 default:
1730 log_error(r->res_ls, "confirm_master unknown error %d", error);
1731 }
1732}
1733
1734static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1735 int namelen, uint32_t parent_lkid, void *ast,
3bcd3687 1736 void *astarg, void *bast, struct dlm_args *args)
e7fd4179
DT
1737{
1738 int rv = -EINVAL;
1739
1740 /* check for invalid arg usage */
1741
1742 if (mode < 0 || mode > DLM_LOCK_EX)
1743 goto out;
1744
1745 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1746 goto out;
1747
1748 if (flags & DLM_LKF_CANCEL)
1749 goto out;
1750
1751 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1752 goto out;
1753
1754 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1755 goto out;
1756
1757 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1758 goto out;
1759
1760 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1761 goto out;
1762
1763 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1764 goto out;
1765
1766 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1767 goto out;
1768
1769 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1770 goto out;
1771
1772 if (!ast || !lksb)
1773 goto out;
1774
1775 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1776 goto out;
1777
1778 /* parent/child locks not yet supported */
1779 if (parent_lkid)
1780 goto out;
1781
1782 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1783 goto out;
1784
1785 /* these args will be copied to the lkb in validate_lock_args,
1786 it cannot be done now because when converting locks, fields in
1787 an active lkb cannot be modified before locking the rsb */
1788
1789 args->flags = flags;
1790 args->astaddr = ast;
1791 args->astparam = (long) astarg;
1792 args->bastaddr = bast;
1793 args->mode = mode;
1794 args->lksb = lksb;
e7fd4179
DT
1795 rv = 0;
1796 out:
1797 return rv;
1798}
1799
1800static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1801{
1802 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1803 DLM_LKF_FORCEUNLOCK))
1804 return -EINVAL;
1805
ef0c2bb0
DT
1806 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1807 return -EINVAL;
1808
e7fd4179
DT
1809 args->flags = flags;
1810 args->astparam = (long) astarg;
1811 return 0;
1812}
1813
1814static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1815 struct dlm_args *args)
1816{
1817 int rv = -EINVAL;
1818
1819 if (args->flags & DLM_LKF_CONVERT) {
1820 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1821 goto out;
1822
1823 if (args->flags & DLM_LKF_QUECVT &&
1824 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1825 goto out;
1826
1827 rv = -EBUSY;
1828 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1829 goto out;
1830
1831 if (lkb->lkb_wait_type)
1832 goto out;
ef0c2bb0
DT
1833
1834 if (is_overlap(lkb))
1835 goto out;
e7fd4179
DT
1836 }
1837
1838 lkb->lkb_exflags = args->flags;
1839 lkb->lkb_sbflags = 0;
1840 lkb->lkb_astaddr = args->astaddr;
1841 lkb->lkb_astparam = args->astparam;
1842 lkb->lkb_bastaddr = args->bastaddr;
1843 lkb->lkb_rqmode = args->mode;
1844 lkb->lkb_lksb = args->lksb;
1845 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1846 lkb->lkb_ownpid = (int) current->pid;
e7fd4179
DT
1847 rv = 0;
1848 out:
1849 return rv;
1850}
1851
ef0c2bb0
DT
1852/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1853 for success */
1854
1855/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1856 because there may be a lookup in progress and it's valid to do
1857 cancel/unlockf on it */
1858
e7fd4179
DT
1859static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1860{
ef0c2bb0 1861 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
e7fd4179
DT
1862 int rv = -EINVAL;
1863
ef0c2bb0
DT
1864 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1865 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1866 dlm_print_lkb(lkb);
e7fd4179 1867 goto out;
ef0c2bb0 1868 }
e7fd4179 1869
ef0c2bb0
DT
1870 /* an lkb may still exist even though the lock is EOL'ed due to a
1871 cancel, unlock or failed noqueue request; an app can't use these
1872 locks; return same error as if the lkid had not been found at all */
e7fd4179 1873
ef0c2bb0
DT
1874 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1875 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1876 rv = -ENOENT;
e7fd4179 1877 goto out;
ef0c2bb0 1878 }
e7fd4179 1879
ef0c2bb0
DT
1880 /* an lkb may be waiting for an rsb lookup to complete where the
1881 lookup was initiated by another lock */
1882
1883 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1884 if (!list_empty(&lkb->lkb_rsb_lookup)) {
1885 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1886 list_del_init(&lkb->lkb_rsb_lookup);
1887 queue_cast(lkb->lkb_resource, lkb,
1888 args->flags & DLM_LKF_CANCEL ?
1889 -DLM_ECANCEL : -DLM_EUNLOCK);
1890 unhold_lkb(lkb); /* undoes create_lkb() */
1891 rv = -EBUSY;
1892 goto out;
1893 }
1894 }
1895
1896 /* cancel not allowed with another cancel/unlock in progress */
1897
1898 if (args->flags & DLM_LKF_CANCEL) {
1899 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1900 goto out;
1901
1902 if (is_overlap(lkb))
1903 goto out;
1904
1905 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1906 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1907 rv = -EBUSY;
1908 goto out;
1909 }
1910
1911 switch (lkb->lkb_wait_type) {
1912 case DLM_MSG_LOOKUP:
1913 case DLM_MSG_REQUEST:
1914 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1915 rv = -EBUSY;
1916 goto out;
1917 case DLM_MSG_UNLOCK:
1918 case DLM_MSG_CANCEL:
1919 goto out;
1920 }
1921 /* add_to_waiters() will set OVERLAP_CANCEL */
1922 goto out_ok;
1923 }
1924
1925 /* do we need to allow a force-unlock if there's a normal unlock
1926 already in progress? in what conditions could the normal unlock
1927 fail such that we'd want to send a force-unlock to be sure? */
1928
1929 if (args->flags & DLM_LKF_FORCEUNLOCK) {
1930 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1931 goto out;
1932
1933 if (is_overlap_unlock(lkb))
1934 goto out;
1935
1936 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1937 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1938 rv = -EBUSY;
1939 goto out;
1940 }
1941
1942 switch (lkb->lkb_wait_type) {
1943 case DLM_MSG_LOOKUP:
1944 case DLM_MSG_REQUEST:
1945 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1946 rv = -EBUSY;
1947 goto out;
1948 case DLM_MSG_UNLOCK:
1949 goto out;
1950 }
1951 /* add_to_waiters() will set OVERLAP_UNLOCK */
1952 goto out_ok;
1953 }
1954
1955 /* normal unlock not allowed if there's any op in progress */
1956 rv = -EBUSY;
1957 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
1958 goto out;
1959
1960 out_ok:
1961 /* an overlapping op shouldn't blow away exflags from other op */
1962 lkb->lkb_exflags |= args->flags;
1963 lkb->lkb_sbflags = 0;
1964 lkb->lkb_astparam = args->astparam;
1965 rv = 0;
1966 out:
1967 if (rv)
1968 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1969 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1970 args->flags, lkb->lkb_wait_type,
1971 lkb->lkb_resource->res_name);
1972 return rv;
1973}
1974
1975/*
1976 * Four stage 4 varieties:
1977 * do_request(), do_convert(), do_unlock(), do_cancel()
1978 * These are called on the master node for the given lock and
1979 * from the central locking logic.
1980 */
1981
1982static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1983{
1984 int error = 0;
1985
1986 if (can_be_granted(r, lkb, 1)) {
1987 grant_lock(r, lkb);
1988 queue_cast(r, lkb, 0);
1989 goto out;
1990 }
1991
1992 if (can_be_queued(lkb)) {
1993 error = -EINPROGRESS;
1994 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1995 send_blocking_asts(r, lkb);
1996 goto out;
1997 }
1998
1999 error = -EAGAIN;
2000 if (force_blocking_asts(lkb))
2001 send_blocking_asts_all(r, lkb);
2002 queue_cast(r, lkb, -EAGAIN);
2003
2004 out:
2005 return error;
2006}
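
To make the stage 4 return values concrete, here is a summary (a sketch, not
from the source) of what the original requester eventually observes for each
do_request() outcome; the -EAGAIN row assumes the request used
DLM_LKF_NOQUEUE so that can_be_queued() fails:

/* Illustrative only:
 *   do_request() result      dlm_lock() returns   completion ast status
 *   0            (granted)   0                    0
 *   -EINPROGRESS (queued)    0                    0, later, once granted
 *   -EAGAIN      (rejected)  0                    -EAGAIN
 */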
2007
2008static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2009{
2010 int error = 0;
2011
2012 /* changing an existing lock may allow others to be granted */
2013
2014 if (can_be_granted(r, lkb, 1)) {
2015 grant_lock(r, lkb);
2016 queue_cast(r, lkb, 0);
2017 grant_pending_locks(r);
2018 goto out;
2019 }
2020
2021 /* is_demoted() means the can_be_granted() above set the grmode
2022 to NL, and left us on the granted queue. This auto-demotion
2023 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2024 now grantable. We have to try to grant other converting locks
2025 before we try again to grant this one. */
2026
2027 if (is_demoted(lkb)) {
2028 grant_pending_convert(r, DLM_LOCK_IV);
2029 if (_can_be_granted(r, lkb, 1)) {
2030 grant_lock(r, lkb);
2031 queue_cast(r, lkb, 0);
2032 grant_pending_locks(r);
2033 goto out;
2034 }
2035 /* else fall through and move to convert queue */
2036 }
2037
2038 if (can_be_queued(lkb)) {
2039 error = -EINPROGRESS;
2040 del_lkb(r, lkb);
2041 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2042 send_blocking_asts(r, lkb);
2043 goto out;
2044 }
2045
2046 error = -EAGAIN;
2047 if (force_blocking_asts(lkb))
2048 send_blocking_asts_all(r, lkb);
2049 queue_cast(r, lkb, -EAGAIN);
2050
2051 out:
2052 return error;
2053}
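
/* Illustration (not from the source): the classic case for the is_demoted()
 * branch in do_convert() above is two holders both converting PR -> EX.
 * Neither convert can be granted while the other still holds PR -- a
 * conversion deadlock. With DLM_LKF_CONVDEADLK, can_be_granted() demotes
 * one lock's grmode to NL (so is_demoted() is true); grant_pending_convert()
 * can then grant the other conversion, and this lock retries once before
 * falling through to the convert queue. */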
2054
2055static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2056{
2057 remove_lock(r, lkb);
2058 queue_cast(r, lkb, -DLM_EUNLOCK);
2059 grant_pending_locks(r);
2060 return -DLM_EUNLOCK;
2061}
2062
2063/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2064
2065static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2066{
2067 int error;
2068
2069 error = revert_lock(r, lkb);
2070 if (error) {
2071 queue_cast(r, lkb, -DLM_ECANCEL);
2072 grant_pending_locks(r);
2073 return -DLM_ECANCEL;
2074 }
2075 return 0;
2076}
2077
2078/*
2079 * Four stage 3 varieties:
2080 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2081 */
2082
2083/* add a new lkb to a possibly new rsb, called by requesting process */
2084
2085static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2086{
2087 int error;
2088
2089 /* set_master: sets lkb nodeid from r */
2090
2091 error = set_master(r, lkb);
2092 if (error < 0)
2093 goto out;
2094 if (error) {
2095 error = 0;
2096 goto out;
2097 }
2098
2099 if (is_remote(r))
2100 /* receive_request() calls do_request() on remote node */
2101 error = send_request(r, lkb);
2102 else
2103 error = do_request(r, lkb);
2104 out:
2105 return error;
2106}
2107
2108/* change some property of an existing lkb, e.g. mode */
2109
2110static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2111{
2112 int error;
2113
2114 if (is_remote(r))
2115 /* receive_convert() calls do_convert() on remote node */
2116 error = send_convert(r, lkb);
2117 else
2118 error = do_convert(r, lkb);
2119
2120 return error;
2121}
2122
2123/* remove an existing lkb from the granted queue */
2124
2125static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2126{
2127 int error;
2128
2129 if (is_remote(r))
2130 /* receive_unlock() calls do_unlock() on remote node */
2131 error = send_unlock(r, lkb);
2132 else
2133 error = do_unlock(r, lkb);
2134
2135 return error;
2136}
2137
2138/* remove an existing lkb from the convert or wait queue */
2139
2140static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2141{
2142 int error;
2143
2144 if (is_remote(r))
2145 /* receive_cancel() calls do_cancel() on remote node */
2146 error = send_cancel(r, lkb);
2147 else
2148 error = do_cancel(r, lkb);
2149
2150 return error;
2151}
2152
2153/*
2154 * Four stage 2 varieties:
2155 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2156 */
2157
2158static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2159 int len, struct dlm_args *args)
2160{
2161 struct dlm_rsb *r;
2162 int error;
2163
2164 error = validate_lock_args(ls, lkb, args);
2165 if (error)
2166 goto out;
2167
2168 error = find_rsb(ls, name, len, R_CREATE, &r);
2169 if (error)
2170 goto out;
2171
2172 lock_rsb(r);
2173
2174 attach_lkb(r, lkb);
2175 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2176
2177 error = _request_lock(r, lkb);
2178
2179 unlock_rsb(r);
2180 put_rsb(r);
2181
2182 out:
2183 return error;
2184}
2185
2186static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2187 struct dlm_args *args)
2188{
2189 struct dlm_rsb *r;
2190 int error;
2191
2192 r = lkb->lkb_resource;
2193
2194 hold_rsb(r);
2195 lock_rsb(r);
2196
2197 error = validate_lock_args(ls, lkb, args);
2198 if (error)
2199 goto out;
2200
2201 error = _convert_lock(r, lkb);
2202 out:
2203 unlock_rsb(r);
2204 put_rsb(r);
2205 return error;
2206}
2207
2208static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2209 struct dlm_args *args)
2210{
2211 struct dlm_rsb *r;
2212 int error;
2213
2214 r = lkb->lkb_resource;
2215
2216 hold_rsb(r);
2217 lock_rsb(r);
2218
2219 error = validate_unlock_args(lkb, args);
2220 if (error)
2221 goto out;
2222
2223 error = _unlock_lock(r, lkb);
2224 out:
2225 unlock_rsb(r);
2226 put_rsb(r);
2227 return error;
2228}
2229
2230static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2231 struct dlm_args *args)
2232{
2233 struct dlm_rsb *r;
2234 int error;
2235
2236 r = lkb->lkb_resource;
2237
2238 hold_rsb(r);
2239 lock_rsb(r);
2240
2241 error = validate_unlock_args(lkb, args);
2242 if (error)
2243 goto out;
2244
2245 error = _cancel_lock(r, lkb);
2246 out:
2247 unlock_rsb(r);
2248 put_rsb(r);
2249 return error;
2250}
2251
2252/*
2253 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2254 */
2255
2256int dlm_lock(dlm_lockspace_t *lockspace,
2257 int mode,
2258 struct dlm_lksb *lksb,
2259 uint32_t flags,
2260 void *name,
2261 unsigned int namelen,
2262 uint32_t parent_lkid,
2263 void (*ast) (void *astarg),
2264 void *astarg,
2265 void (*bast) (void *astarg, int mode))
2266{
2267 struct dlm_ls *ls;
2268 struct dlm_lkb *lkb;
2269 struct dlm_args args;
2270 int error, convert = flags & DLM_LKF_CONVERT;
2271
2272 ls = dlm_find_lockspace_local(lockspace);
2273 if (!ls)
2274 return -EINVAL;
2275
2276 dlm_lock_recovery(ls);
2277
2278 if (convert)
2279 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2280 else
2281 error = create_lkb(ls, &lkb);
2282
2283 if (error)
2284 goto out;
2285
2286 error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
2287 astarg, bast, &args);
2288 if (error)
2289 goto out_put;
2290
2291 if (convert)
2292 error = convert_lock(ls, lkb, &args);
2293 else
2294 error = request_lock(ls, lkb, name, namelen, &args);
2295
2296 if (error == -EINPROGRESS)
2297 error = 0;
2298 out_put:
2299 if (convert || error)
2300 __put_lkb(ls, lkb);
2301 if (error == -EAGAIN)
2302 error = 0;
2303 out:
2304 dlm_unlock_recovery(ls);
2305 dlm_put_lockspace(ls);
2306 return error;
2307}
2308
2309int dlm_unlock(dlm_lockspace_t *lockspace,
2310 uint32_t lkid,
2311 uint32_t flags,
2312 struct dlm_lksb *lksb,
2313 void *astarg)
2314{
2315 struct dlm_ls *ls;
2316 struct dlm_lkb *lkb;
2317 struct dlm_args args;
2318 int error;
2319
2320 ls = dlm_find_lockspace_local(lockspace);
2321 if (!ls)
2322 return -EINVAL;
2323
2324 dlm_lock_recovery(ls);
2325
2326 error = find_lkb(ls, lkid, &lkb);
2327 if (error)
2328 goto out;
2329
2330 error = set_unlock_args(flags, astarg, &args);
2331 if (error)
2332 goto out_put;
2333
2334 if (flags & DLM_LKF_CANCEL)
2335 error = cancel_lock(ls, lkb, &args);
2336 else
2337 error = unlock_lock(ls, lkb, &args);
2338
2339 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2340 error = 0;
2341 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2342 error = 0;
2343 out_put:
2344 dlm_put_lkb(lkb);
2345 out:
2346 dlm_unlock_recovery(ls);
2347 dlm_put_lockspace(ls);
2348 return error;
2349}
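
As a usage illustration (a minimal sketch under stated assumptions, not part
of this file): an in-kernel caller could drive the two stage 1 entry points
as below. The lockspace handle is assumed to come from dlm_new_lockspace()
elsewhere, the example_* names are hypothetical, and the completion ast just
signals a completion so the caller can inspect lksb.sb_status afterwards.

#include <linux/completion.h>
#include <linux/string.h>

struct example_lock {
	struct dlm_lksb lksb;
	struct completion done;
};

/* completion ast: runs asynchronously when an operation finishes */
static void example_ast(void *astarg)
{
	struct example_lock *el = astarg;

	complete(&el->done);
}

static int example_lock_unlock(dlm_lockspace_t *ls, char *name)
{
	struct example_lock el;
	int error;

	memset(&el, 0, sizeof(el));
	init_completion(&el.done);

	/* request an EX lock; 0 only means the request was accepted,
	   the grant/-EAGAIN result arrives via the ast in sb_status */
	error = dlm_lock(ls, DLM_LOCK_EX, &el.lksb, 0, name, strlen(name),
			 0, example_ast, &el, NULL);
	if (error)
		return error;
	wait_for_completion(&el.done);
	if (el.lksb.sb_status)
		return el.lksb.sb_status;

	/* release it; -DLM_EUNLOCK is mapped to 0 by dlm_unlock() */
	init_completion(&el.done);
	error = dlm_unlock(ls, el.lksb.sb_lkid, 0, &el.lksb, &el);
	if (error)
		return error;
	wait_for_completion(&el.done);
	return 0;
}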
2350
2351/*
2352 * send/receive routines for remote operations and replies
2353 *
2354 * send_args
2355 * send_common
2356 * send_request receive_request
2357 * send_convert receive_convert
2358 * send_unlock receive_unlock
2359 * send_cancel receive_cancel
2360 * send_grant receive_grant
2361 * send_bast receive_bast
2362 * send_lookup receive_lookup
2363 * send_remove receive_remove
2364 *
2365 * send_common_reply
2366 * receive_request_reply send_request_reply
2367 * receive_convert_reply send_convert_reply
2368 * receive_unlock_reply send_unlock_reply
2369 * receive_cancel_reply send_cancel_reply
2370 * receive_lookup_reply send_lookup_reply
2371 */
2372
2373static int _create_message(struct dlm_ls *ls, int mb_len,
2374 int to_nodeid, int mstype,
2375 struct dlm_message **ms_ret,
2376 struct dlm_mhandle **mh_ret)
2377{
2378 struct dlm_message *ms;
2379 struct dlm_mhandle *mh;
2380 char *mb;
2381
2382 /* get_buffer gives us a message handle (mh) that we need to
2383 pass into lowcomms_commit and a message buffer (mb) that we
2384 write our data into */
2385
2386 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2387 if (!mh)
2388 return -ENOBUFS;
2389
2390 memset(mb, 0, mb_len);
2391
2392 ms = (struct dlm_message *) mb;
2393
2394 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2395 ms->m_header.h_lockspace = ls->ls_global_id;
2396 ms->m_header.h_nodeid = dlm_our_nodeid();
2397 ms->m_header.h_length = mb_len;
2398 ms->m_header.h_cmd = DLM_MSG;
2399
2400 ms->m_type = mstype;
2401
2402 *mh_ret = mh;
2403 *ms_ret = ms;
2404 return 0;
2405}
2406
2407static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2408 int to_nodeid, int mstype,
2409 struct dlm_message **ms_ret,
2410 struct dlm_mhandle **mh_ret)
2411{
2412 int mb_len = sizeof(struct dlm_message);
2413
2414 switch (mstype) {
2415 case DLM_MSG_REQUEST:
2416 case DLM_MSG_LOOKUP:
2417 case DLM_MSG_REMOVE:
2418 mb_len += r->res_length;
2419 break;
2420 case DLM_MSG_CONVERT:
2421 case DLM_MSG_UNLOCK:
2422 case DLM_MSG_REQUEST_REPLY:
2423 case DLM_MSG_CONVERT_REPLY:
2424 case DLM_MSG_GRANT:
2425 if (lkb && lkb->lkb_lvbptr)
2426 mb_len += r->res_ls->ls_lvblen;
2427 break;
2428 }
2429
2430 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2431 ms_ret, mh_ret);
2432}
2433
2434/* further lowcomms enhancements or alternate implementations may make
2435 the return value from this function useful at some point */
2436
2437static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2438{
2439 dlm_message_out(ms);
2440 dlm_lowcomms_commit_buffer(mh);
2441 return 0;
2442}
2443
2444static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2445 struct dlm_message *ms)
2446{
2447 ms->m_nodeid = lkb->lkb_nodeid;
2448 ms->m_pid = lkb->lkb_ownpid;
2449 ms->m_lkid = lkb->lkb_id;
2450 ms->m_remid = lkb->lkb_remid;
2451 ms->m_exflags = lkb->lkb_exflags;
2452 ms->m_sbflags = lkb->lkb_sbflags;
2453 ms->m_flags = lkb->lkb_flags;
2454 ms->m_lvbseq = lkb->lkb_lvbseq;
2455 ms->m_status = lkb->lkb_status;
2456 ms->m_grmode = lkb->lkb_grmode;
2457 ms->m_rqmode = lkb->lkb_rqmode;
2458 ms->m_hash = r->res_hash;
2459
2460 /* m_result and m_bastmode are set from function args,
2461 not from lkb fields */
2462
2463 if (lkb->lkb_bastaddr)
2464 ms->m_asts |= AST_BAST;
2465 if (lkb->lkb_astaddr)
2466 ms->m_asts |= AST_COMP;
2467
2468 /* compare with switch in create_message; send_remove() doesn't
2469 use send_args() */
2470
2471 switch (ms->m_type) {
2472 case DLM_MSG_REQUEST:
2473 case DLM_MSG_LOOKUP:
2474 memcpy(ms->m_extra, r->res_name, r->res_length);
2475 break;
2476 case DLM_MSG_CONVERT:
2477 case DLM_MSG_UNLOCK:
2478 case DLM_MSG_REQUEST_REPLY:
2479 case DLM_MSG_CONVERT_REPLY:
2480 case DLM_MSG_GRANT:
2481 if (!lkb->lkb_lvbptr)
2482 break;
2483 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2484 break;
2485 }
2486}
2487
2488static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2489{
2490 struct dlm_message *ms;
2491 struct dlm_mhandle *mh;
2492 int to_nodeid, error;
2493
2494 error = add_to_waiters(lkb, mstype);
2495 if (error)
2496 return error;
2497
2498 to_nodeid = r->res_nodeid;
2499
2500 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2501 if (error)
2502 goto fail;
2503
2504 send_args(r, lkb, ms);
2505
2506 error = send_message(mh, ms);
2507 if (error)
2508 goto fail;
2509 return 0;
2510
2511 fail:
2512 remove_from_waiters(lkb, msg_reply_type(mstype));
2513 return error;
2514}
2515
2516static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2517{
2518 return send_common(r, lkb, DLM_MSG_REQUEST);
2519}
2520
2521static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2522{
2523 int error;
2524
2525 error = send_common(r, lkb, DLM_MSG_CONVERT);
2526
2527 /* down conversions go without a reply from the master */
2528 if (!error && down_conversion(lkb)) {
2529 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2530 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
e7fd4179 2531 r->res_ls->ls_stub_ms.m_result = 0;
2532 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2533 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2534 }
2535
2536 return error;
2537}
2538
2539/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2540 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2541 that the master is still correct. */
2542
2543static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2544{
2545 return send_common(r, lkb, DLM_MSG_UNLOCK);
2546}
2547
2548static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2549{
2550 return send_common(r, lkb, DLM_MSG_CANCEL);
2551}
2552
2553static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2554{
2555 struct dlm_message *ms;
2556 struct dlm_mhandle *mh;
2557 int to_nodeid, error;
2558
2559 to_nodeid = lkb->lkb_nodeid;
2560
2561 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2562 if (error)
2563 goto out;
2564
2565 send_args(r, lkb, ms);
2566
2567 ms->m_result = 0;
2568
2569 error = send_message(mh, ms);
2570 out:
2571 return error;
2572}
2573
2574static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2575{
2576 struct dlm_message *ms;
2577 struct dlm_mhandle *mh;
2578 int to_nodeid, error;
2579
2580 to_nodeid = lkb->lkb_nodeid;
2581
2582 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2583 if (error)
2584 goto out;
2585
2586 send_args(r, lkb, ms);
2587
2588 ms->m_bastmode = mode;
2589
2590 error = send_message(mh, ms);
2591 out:
2592 return error;
2593}
2594
2595static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2596{
2597 struct dlm_message *ms;
2598 struct dlm_mhandle *mh;
2599 int to_nodeid, error;
2600
2601 error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2602 if (error)
2603 return error;
2604
2605 to_nodeid = dlm_dir_nodeid(r);
2606
2607 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2608 if (error)
2609 goto fail;
2610
2611 send_args(r, lkb, ms);
2612
2613 error = send_message(mh, ms);
2614 if (error)
2615 goto fail;
2616 return 0;
2617
2618 fail:
2619 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2620 return error;
2621}
2622
2623static int send_remove(struct dlm_rsb *r)
2624{
2625 struct dlm_message *ms;
2626 struct dlm_mhandle *mh;
2627 int to_nodeid, error;
2628
2629 to_nodeid = dlm_dir_nodeid(r);
2630
2631 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2632 if (error)
2633 goto out;
2634
2635 memcpy(ms->m_extra, r->res_name, r->res_length);
2636 ms->m_hash = r->res_hash;
2637
2638 error = send_message(mh, ms);
2639 out:
2640 return error;
2641}
2642
2643static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2644 int mstype, int rv)
2645{
2646 struct dlm_message *ms;
2647 struct dlm_mhandle *mh;
2648 int to_nodeid, error;
2649
2650 to_nodeid = lkb->lkb_nodeid;
2651
2652 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2653 if (error)
2654 goto out;
2655
2656 send_args(r, lkb, ms);
2657
2658 ms->m_result = rv;
2659
2660 error = send_message(mh, ms);
2661 out:
2662 return error;
2663}
2664
2665static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2666{
2667 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2668}
2669
2670static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2671{
2672 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2673}
2674
2675static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2676{
2677 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2678}
2679
2680static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2681{
2682 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2683}
2684
2685static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2686 int ret_nodeid, int rv)
2687{
2688 struct dlm_rsb *r = &ls->ls_stub_rsb;
2689 struct dlm_message *ms;
2690 struct dlm_mhandle *mh;
2691 int error, nodeid = ms_in->m_header.h_nodeid;
2692
2693 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2694 if (error)
2695 goto out;
2696
2697 ms->m_lkid = ms_in->m_lkid;
2698 ms->m_result = rv;
2699 ms->m_nodeid = ret_nodeid;
2700
2701 error = send_message(mh, ms);
2702 out:
2703 return error;
2704}
2705
2706/* which args we save from a received message depends heavily on the type
2707 of message, unlike the send side where we can safely send everything about
2708 the lkb for any type of message */
2709
2710static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2711{
2712 lkb->lkb_exflags = ms->m_exflags;
2713 lkb->lkb_sbflags = ms->m_sbflags;
2714 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2715 (ms->m_flags & 0x0000FFFF);
2716}
2717
2718static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2719{
2720 lkb->lkb_sbflags = ms->m_sbflags;
2721 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2722 (ms->m_flags & 0x0000FFFF);
2723}
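
Only the low 16 flag bits are wire flags; the high 16 are node-local state
(e.g. DLM_IFL_MSTCPY and the overlap flags) and must survive a receive. A
worked example with made-up values (a sketch; the bits shown are not real
flag definitions):

uint32_t local = 0x00010002;	/* some node-local bit + some wire bit */
uint32_t wire  = 0x00000004;	/* low half taken from ms->m_flags */

local = (local & 0xFFFF0000) | (wire & 0x0000FFFF);
/* local == 0x00010004: high half preserved, low half replaced */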
2724
2725static int receive_extralen(struct dlm_message *ms)
2726{
2727 return (ms->m_header.h_length - sizeof(struct dlm_message));
2728}
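
The variable-length payload (resource name or LVB) rides in m_extra[], and
its size is carried only implicitly by h_length: create_message() sizes the
buffer and receive_extralen() recovers the payload length on the other end.
A numeric sketch, assuming a hypothetical 16-byte resource name:

/* sender, as in create_message() for DLM_MSG_REQUEST: */
int mb_len = sizeof(struct dlm_message) + 16;	/* header + name */
/* ...h_length is set to mb_len in _create_message()... */

/* receiver, as in receive_extralen(): */
int len = ms->m_header.h_length - sizeof(struct dlm_message);
/* len == 16: exactly the bytes send_args() memcpy'd into m_extra */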
2729
2730static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2731 struct dlm_message *ms)
2732{
2733 int len;
2734
2735 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2736 if (!lkb->lkb_lvbptr)
2737 lkb->lkb_lvbptr = allocate_lvb(ls);
2738 if (!lkb->lkb_lvbptr)
2739 return -ENOMEM;
2740 len = receive_extralen(ms);
2741 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2742 }
2743 return 0;
2744}
2745
2746static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2747 struct dlm_message *ms)
2748{
2749 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2750 lkb->lkb_ownpid = ms->m_pid;
2751 lkb->lkb_remid = ms->m_lkid;
2752 lkb->lkb_grmode = DLM_LOCK_IV;
2753 lkb->lkb_rqmode = ms->m_rqmode;
2754 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2755 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2756
2757 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2758
2759 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2760 /* lkb was just created so there won't be an lvb yet */
2761 lkb->lkb_lvbptr = allocate_lvb(ls);
2762 if (!lkb->lkb_lvbptr)
2763 return -ENOMEM;
2764 }
2765
2766 return 0;
2767}
2768
2769static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2770 struct dlm_message *ms)
2771{
2772 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2773 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2774 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2775 lkb->lkb_id, lkb->lkb_remid);
2776 return -EINVAL;
2777 }
2778
2779 if (!is_master_copy(lkb))
2780 return -EINVAL;
2781
2782 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2783 return -EBUSY;
2784
2785 if (receive_lvb(ls, lkb, ms))
2786 return -ENOMEM;
2787
2788 lkb->lkb_rqmode = ms->m_rqmode;
2789 lkb->lkb_lvbseq = ms->m_lvbseq;
2790
2791 return 0;
2792}
2793
2794static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2795 struct dlm_message *ms)
2796{
2797 if (!is_master_copy(lkb))
2798 return -EINVAL;
2799 if (receive_lvb(ls, lkb, ms))
2800 return -ENOMEM;
2801 return 0;
2802}
2803
2804/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2805 uses to send a reply and that the remote end uses to process the reply. */
2806
2807static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2808{
2809 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2810 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2811 lkb->lkb_remid = ms->m_lkid;
2812}
2813
2814static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2815{
2816 struct dlm_lkb *lkb;
2817 struct dlm_rsb *r;
2818 int error, namelen;
2819
2820 error = create_lkb(ls, &lkb);
2821 if (error)
2822 goto fail;
2823
2824 receive_flags(lkb, ms);
2825 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2826 error = receive_request_args(ls, lkb, ms);
2827 if (error) {
2828 __put_lkb(ls, lkb);
2829 goto fail;
2830 }
2831
2832 namelen = receive_extralen(ms);
2833
2834 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2835 if (error) {
2836 __put_lkb(ls, lkb);
2837 goto fail;
2838 }
2839
2840 lock_rsb(r);
2841
2842 attach_lkb(r, lkb);
2843 error = do_request(r, lkb);
2844 send_request_reply(r, lkb, error);
2845
2846 unlock_rsb(r);
2847 put_rsb(r);
2848
2849 if (error == -EINPROGRESS)
2850 error = 0;
2851 if (error)
2852 dlm_put_lkb(lkb);
2853 return;
2854
2855 fail:
2856 setup_stub_lkb(ls, ms);
2857 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2858}
2859
2860static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2861{
2862 struct dlm_lkb *lkb;
2863 struct dlm_rsb *r;
2864 int error, reply = 1;
2865
2866 error = find_lkb(ls, ms->m_remid, &lkb);
2867 if (error)
2868 goto fail;
2869
2870 r = lkb->lkb_resource;
2871
2872 hold_rsb(r);
2873 lock_rsb(r);
2874
2875 receive_flags(lkb, ms);
2876 error = receive_convert_args(ls, lkb, ms);
2877 if (error)
2878 goto out;
2879 reply = !down_conversion(lkb);
2880
2881 error = do_convert(r, lkb);
2882 out:
2883 if (reply)
2884 send_convert_reply(r, lkb, error);
2885
2886 unlock_rsb(r);
2887 put_rsb(r);
2888 dlm_put_lkb(lkb);
2889 return;
2890
2891 fail:
2892 setup_stub_lkb(ls, ms);
2893 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2894}
2895
2896static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2897{
2898 struct dlm_lkb *lkb;
2899 struct dlm_rsb *r;
2900 int error;
2901
2902 error = find_lkb(ls, ms->m_remid, &lkb);
2903 if (error)
2904 goto fail;
2905
2906 r = lkb->lkb_resource;
2907
2908 hold_rsb(r);
2909 lock_rsb(r);
2910
2911 receive_flags(lkb, ms);
2912 error = receive_unlock_args(ls, lkb, ms);
2913 if (error)
2914 goto out;
2915
2916 error = do_unlock(r, lkb);
2917 out:
2918 send_unlock_reply(r, lkb, error);
2919
2920 unlock_rsb(r);
2921 put_rsb(r);
2922 dlm_put_lkb(lkb);
2923 return;
2924
2925 fail:
2926 setup_stub_lkb(ls, ms);
2927 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2928}
2929
2930static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2931{
2932 struct dlm_lkb *lkb;
2933 struct dlm_rsb *r;
2934 int error;
2935
2936 error = find_lkb(ls, ms->m_remid, &lkb);
2937 if (error)
2938 goto fail;
2939
2940 receive_flags(lkb, ms);
2941
2942 r = lkb->lkb_resource;
2943
2944 hold_rsb(r);
2945 lock_rsb(r);
2946
2947 error = do_cancel(r, lkb);
2948 send_cancel_reply(r, lkb, error);
2949
2950 unlock_rsb(r);
2951 put_rsb(r);
2952 dlm_put_lkb(lkb);
2953 return;
2954
2955 fail:
2956 setup_stub_lkb(ls, ms);
2957 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2958}
2959
2960static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2961{
2962 struct dlm_lkb *lkb;
2963 struct dlm_rsb *r;
2964 int error;
2965
2966 error = find_lkb(ls, ms->m_remid, &lkb);
2967 if (error) {
2968 log_error(ls, "receive_grant no lkb");
2969 return;
2970 }
2971 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2972
2973 r = lkb->lkb_resource;
2974
2975 hold_rsb(r);
2976 lock_rsb(r);
2977
2978 receive_flags_reply(lkb, ms);
2979 if (is_altmode(lkb))
2980 munge_altmode(lkb, ms);
2981 grant_lock_pc(r, lkb, ms);
2982 queue_cast(r, lkb, 0);
2983
2984 unlock_rsb(r);
2985 put_rsb(r);
2986 dlm_put_lkb(lkb);
2987}
2988
2989static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2990{
2991 struct dlm_lkb *lkb;
2992 struct dlm_rsb *r;
2993 int error;
2994
2995 error = find_lkb(ls, ms->m_remid, &lkb);
2996 if (error) {
2997 log_error(ls, "receive_bast no lkb");
2998 return;
2999 }
3000 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3001
3002 r = lkb->lkb_resource;
3003
3004 hold_rsb(r);
3005 lock_rsb(r);
3006
3007 queue_bast(r, lkb, ms->m_bastmode);
3008
3009 unlock_rsb(r);
3010 put_rsb(r);
3011 dlm_put_lkb(lkb);
3012}
3013
3014static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3015{
3016 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3017
3018 from_nodeid = ms->m_header.h_nodeid;
3019 our_nodeid = dlm_our_nodeid();
3020
3021 len = receive_extralen(ms);
3022
3023 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3024 if (dir_nodeid != our_nodeid) {
3025 log_error(ls, "lookup dir_nodeid %d from %d",
3026 dir_nodeid, from_nodeid);
3027 error = -EINVAL;
3028 ret_nodeid = -1;
3029 goto out;
3030 }
3031
3032 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3033
3034 /* Optimization: we're master so treat lookup as a request */
3035 if (!error && ret_nodeid == our_nodeid) {
3036 receive_request(ls, ms);
3037 return;
3038 }
3039 out:
3040 send_lookup_reply(ls, ms, ret_nodeid, error);
3041}
3042
3043static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3044{
3045 int len, dir_nodeid, from_nodeid;
3046
3047 from_nodeid = ms->m_header.h_nodeid;
3048
3049 len = receive_extralen(ms);
3050
3051 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3052 if (dir_nodeid != dlm_our_nodeid()) {
3053 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3054 dir_nodeid, from_nodeid);
3055 return;
3056 }
3057
3058 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3059}
3060
3061static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3062{
3063 do_purge(ls, ms->m_nodeid, ms->m_pid);
3064}
3065
3066static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3067{
3068 struct dlm_lkb *lkb;
3069 struct dlm_rsb *r;
3070 int error, mstype, result;
3071
3072 error = find_lkb(ls, ms->m_remid, &lkb);
3073 if (error) {
3074 log_error(ls, "receive_request_reply no lkb");
3075 return;
3076 }
3077 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3078
3079 r = lkb->lkb_resource;
3080 hold_rsb(r);
3081 lock_rsb(r);
3082
3083 mstype = lkb->lkb_wait_type;
3084 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3085 if (error)
3086 goto out;
3087
3088 /* Optimization: the dir node was also the master, so it took our
3089 lookup as a request and sent request reply instead of lookup reply */
3090 if (mstype == DLM_MSG_LOOKUP) {
3091 r->res_nodeid = ms->m_header.h_nodeid;
3092 lkb->lkb_nodeid = r->res_nodeid;
3093 }
3094
3095 /* this is the value returned from do_request() on the master */
3096 result = ms->m_result;
3097
3098 switch (result) {
3099 case -EAGAIN:
3100 /* request would block (be queued) on remote master */
3101 queue_cast(r, lkb, -EAGAIN);
3102 confirm_master(r, -EAGAIN);
3103 unhold_lkb(lkb); /* undoes create_lkb() */
3104 break;
3105
3106 case -EINPROGRESS:
3107 case 0:
3108 /* request was queued or granted on remote master */
3109 receive_flags_reply(lkb, ms);
3110 lkb->lkb_remid = ms->m_lkid;
3111 if (is_altmode(lkb))
3112 munge_altmode(lkb, ms);
3113 if (result)
3114 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3115 else {
3116 grant_lock_pc(r, lkb, ms);
3117 queue_cast(r, lkb, 0);
3118 }
3119 confirm_master(r, result);
3120 break;
3121
3122 case -EBADR:
3123 case -ENOTBLK:
3124 /* find_rsb failed to find rsb or rsb wasn't master */
3125 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3126 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3127 r->res_nodeid = -1;
3128 lkb->lkb_nodeid = -1;
3129
3130 if (is_overlap(lkb)) {
3131 /* we'll ignore error in cancel/unlock reply */
3132 queue_cast_overlap(r, lkb);
3133 unhold_lkb(lkb); /* undoes create_lkb() */
3134 } else
3135 _request_lock(r, lkb);
3136 break;
3137
3138 default:
3139 log_error(ls, "receive_request_reply %x error %d",
3140 lkb->lkb_id, result);
3141 }
3142
3143 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3144 log_debug(ls, "receive_request_reply %x result %d unlock",
3145 lkb->lkb_id, result);
3146 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3147 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3148 send_unlock(r, lkb);
3149 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3150 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3151 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3152 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3153 send_cancel(r, lkb);
3154 } else {
3155 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3156 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3157 }
3158 out:
3159 unlock_rsb(r);
3160 put_rsb(r);
3161 dlm_put_lkb(lkb);
3162}
3163
3164static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3165 struct dlm_message *ms)
3166{
3167 /* this is the value returned from do_convert() on the master */
3168 switch (ms->m_result) {
3169 case -EAGAIN:
3170 /* convert would block (be queued) on remote master */
3171 queue_cast(r, lkb, -EAGAIN);
3172 break;
3173
3174 case -EINPROGRESS:
3175 /* convert was queued on remote master */
3176 receive_flags_reply(lkb, ms);
3177 if (is_demoted(lkb))
3178 munge_demoted(lkb, ms);
3179 del_lkb(r, lkb);
3180 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3181 break;
3182
3183 case 0:
3184 /* convert was granted on remote master */
3185 receive_flags_reply(lkb, ms);
3186 if (is_demoted(lkb))
3187 munge_demoted(lkb, ms);
3188 grant_lock_pc(r, lkb, ms);
3189 queue_cast(r, lkb, 0);
3190 break;
3191
3192 default:
3193 log_error(r->res_ls, "receive_convert_reply %x error %d",
3194 lkb->lkb_id, ms->m_result);
3195 }
3196}
3197
3198static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3199{
3200 struct dlm_rsb *r = lkb->lkb_resource;
3201 int error;
3202
3203 hold_rsb(r);
3204 lock_rsb(r);
3205
3206 /* stub reply can happen with waiters_mutex held */
3207 error = remove_from_waiters_ms(lkb, ms);
3208 if (error)
3209 goto out;
3210
3211 __receive_convert_reply(r, lkb, ms);
3212 out:
3213 unlock_rsb(r);
3214 put_rsb(r);
3215}
3216
3217static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3218{
3219 struct dlm_lkb *lkb;
3220 int error;
3221
3222 error = find_lkb(ls, ms->m_remid, &lkb);
3223 if (error) {
3224 log_error(ls, "receive_convert_reply no lkb");
3225 return;
3226 }
3227 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3228
3229 _receive_convert_reply(lkb, ms);
3230 dlm_put_lkb(lkb);
3231}
3232
3233static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3234{
3235 struct dlm_rsb *r = lkb->lkb_resource;
3236 int error;
3237
3238 hold_rsb(r);
3239 lock_rsb(r);
3240
3241 /* stub reply can happen with waiters_mutex held */
3242 error = remove_from_waiters_ms(lkb, ms);
3243 if (error)
3244 goto out;
3245
3246 /* this is the value returned from do_unlock() on the master */
3247
3248 switch (ms->m_result) {
3249 case -DLM_EUNLOCK:
3250 receive_flags_reply(lkb, ms);
3251 remove_lock_pc(r, lkb);
3252 queue_cast(r, lkb, -DLM_EUNLOCK);
3253 break;
3254 case -ENOENT:
3255 break;
3256 default:
3257 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3258 lkb->lkb_id, ms->m_result);
3259 }
3260 out:
3261 unlock_rsb(r);
3262 put_rsb(r);
3263}
3264
3265static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3266{
3267 struct dlm_lkb *lkb;
3268 int error;
3269
3270 error = find_lkb(ls, ms->m_remid, &lkb);
3271 if (error) {
3272 log_error(ls, "receive_unlock_reply no lkb");
3273 return;
3274 }
3275 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3276
3277 _receive_unlock_reply(lkb, ms);
3278 dlm_put_lkb(lkb);
3279}
3280
3281static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3282{
3283 struct dlm_rsb *r = lkb->lkb_resource;
3284 int error;
3285
3286 hold_rsb(r);
3287 lock_rsb(r);
3288
3289 /* stub reply can happen with waiters_mutex held */
3290 error = remove_from_waiters_ms(lkb, ms);
3291 if (error)
3292 goto out;
3293
3294 /* this is the value returned from do_cancel() on the master */
3295
3296 switch (ms->m_result) {
3297 case -DLM_ECANCEL:
3298 receive_flags_reply(lkb, ms);
3299 revert_lock_pc(r, lkb);
3300 if (ms->m_result)
3301 queue_cast(r, lkb, -DLM_ECANCEL);
3302 break;
3303 case 0:
3304 break;
3305 default:
3306 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3307 lkb->lkb_id, ms->m_result);
3308 }
3309 out:
3310 unlock_rsb(r);
3311 put_rsb(r);
3312}
3313
3314static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3315{
3316 struct dlm_lkb *lkb;
3317 int error;
3318
3319 error = find_lkb(ls, ms->m_remid, &lkb);
3320 if (error) {
3321 log_error(ls, "receive_cancel_reply no lkb");
3322 return;
3323 }
3324 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3325
3326 _receive_cancel_reply(lkb, ms);
3327 dlm_put_lkb(lkb);
3328}
3329
3330static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3331{
3332 struct dlm_lkb *lkb;
3333 struct dlm_rsb *r;
3334 int error, ret_nodeid;
3335
3336 error = find_lkb(ls, ms->m_lkid, &lkb);
3337 if (error) {
3338 log_error(ls, "receive_lookup_reply no lkb");
3339 return;
3340 }
3341
3342 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3343 FIXME: will a non-zero error ever be returned? */
3344
3345 r = lkb->lkb_resource;
3346 hold_rsb(r);
3347 lock_rsb(r);
3348
3349 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3350 if (error)
3351 goto out;
3352
3353 ret_nodeid = ms->m_nodeid;
3354 if (ret_nodeid == dlm_our_nodeid()) {
3355 r->res_nodeid = 0;
3356 ret_nodeid = 0;
3357 r->res_first_lkid = 0;
3358 } else {
3359 /* set_master() will copy res_nodeid to lkb_nodeid */
3360 r->res_nodeid = ret_nodeid;
3361 }
3362
3363 if (is_overlap(lkb)) {
3364 log_debug(ls, "receive_lookup_reply %x unlock %x",
3365 lkb->lkb_id, lkb->lkb_flags);
3366 queue_cast_overlap(r, lkb);
3367 unhold_lkb(lkb); /* undoes create_lkb() */
3368 goto out_list;
3369 }
3370
3371 _request_lock(r, lkb);
3372
3373 out_list:
3374 if (!ret_nodeid)
3375 process_lookup_list(r);
3376 out:
3377 unlock_rsb(r);
3378 put_rsb(r);
3379 dlm_put_lkb(lkb);
3380}
3381
3382int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3383{
3384 struct dlm_message *ms = (struct dlm_message *) hd;
3385 struct dlm_ls *ls;
3386 int error = 0;
3387
3388 if (!recovery)
3389 dlm_message_in(ms);
3390
3391 ls = dlm_find_lockspace_global(hd->h_lockspace);
3392 if (!ls) {
3393 log_print("drop message %d from %d for unknown lockspace %d",
3394 ms->m_type, nodeid, hd->h_lockspace);
3395 return -EINVAL;
3396 }
3397
3398 /* recovery may have just ended leaving a bunch of backed-up requests
3399 in the requestqueue; wait while dlm_recoverd clears them */
3400
3401 if (!recovery)
3402 dlm_wait_requestqueue(ls);
3403
3404 /* recovery may have just started while there were a bunch of
3405 in-flight requests -- save them in requestqueue to be processed
3406 after recovery. we can't let dlm_recvd block on the recovery
3407 lock. if dlm_recoverd is calling this function to clear the
3408 requestqueue, it needs to be interrupted (-EINTR) if another
3409 recovery operation is starting. */
3410
3411 while (1) {
3412 if (dlm_locking_stopped(ls)) {
3413 if (recovery) {
3414 error = -EINTR;
3415 goto out;
3416 }
3417 error = dlm_add_requestqueue(ls, nodeid, hd);
3418 if (error == -EAGAIN)
3419 continue;
3420 else {
3421 error = -EINTR;
3422 goto out;
3423 }
3424 }
3425
3426 if (dlm_lock_recovery_try(ls))
3427 break;
3428 schedule();
3429 }
3430
3431 switch (ms->m_type) {
3432
3433 /* messages sent to a master node */
3434
3435 case DLM_MSG_REQUEST:
3436 receive_request(ls, ms);
3437 break;
3438
3439 case DLM_MSG_CONVERT:
3440 receive_convert(ls, ms);
3441 break;
3442
3443 case DLM_MSG_UNLOCK:
3444 receive_unlock(ls, ms);
3445 break;
3446
3447 case DLM_MSG_CANCEL:
3448 receive_cancel(ls, ms);
3449 break;
3450
3451 /* messages sent from a master node (replies to above) */
3452
3453 case DLM_MSG_REQUEST_REPLY:
3454 receive_request_reply(ls, ms);
3455 break;
3456
3457 case DLM_MSG_CONVERT_REPLY:
3458 receive_convert_reply(ls, ms);
3459 break;
3460
3461 case DLM_MSG_UNLOCK_REPLY:
3462 receive_unlock_reply(ls, ms);
3463 break;
3464
3465 case DLM_MSG_CANCEL_REPLY:
3466 receive_cancel_reply(ls, ms);
3467 break;
3468
3469 /* messages sent from a master node (only two types of async msg) */
3470
3471 case DLM_MSG_GRANT:
3472 receive_grant(ls, ms);
3473 break;
3474
3475 case DLM_MSG_BAST:
3476 receive_bast(ls, ms);
3477 break;
3478
3479 /* messages sent to a dir node */
3480
3481 case DLM_MSG_LOOKUP:
3482 receive_lookup(ls, ms);
3483 break;
3484
3485 case DLM_MSG_REMOVE:
3486 receive_remove(ls, ms);
3487 break;
3488
3489 /* messages sent from a dir node (remove has no reply) */
3490
3491 case DLM_MSG_LOOKUP_REPLY:
3492 receive_lookup_reply(ls, ms);
3493 break;
3494
3495 /* other messages */
3496
3497 case DLM_MSG_PURGE:
3498 receive_purge(ls, ms);
3499 break;
3500
3501 default:
3502 log_error(ls, "unknown message type %d", ms->m_type);
3503 }
3504
3505 dlm_unlock_recovery(ls);
3506 out:
3507 dlm_put_lockspace(ls);
3508 dlm_astd_wake();
3509 return error;
3510}
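
For orientation, a hedged sketch of the delivery side (the real lowcomms
callback plumbing lives elsewhere; example_deliver is hypothetical): normal
traffic enters with recovery=0 and may be saved on the requestqueue, while
dlm_recoverd replays saved messages with recovery=1 so the replay can be
interrupted by another recovery.

static void example_deliver(struct dlm_header *hd, int nodeid)
{
	/* recovery=0: may block in dlm_wait_requestqueue() or queue the
	   message for later if locking is currently stopped */
	int error = dlm_receive_message(hd, nodeid, 0);

	if (error)
		log_print("msg type %d from %d not processed: %d",
			  ((struct dlm_message *) hd)->m_type, nodeid, error);
}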
3511
3512
3513/*
3514 * Recovery related
3515 */
3516
3517static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3518{
3519 if (middle_conversion(lkb)) {
3520 hold_lkb(lkb);
3521 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3522 ls->ls_stub_ms.m_result = -EINPROGRESS;
3523 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3524 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3525
3526 /* Same special case as in receive_rcom_lock_args() */
3527 lkb->lkb_grmode = DLM_LOCK_IV;
3528 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3529 unhold_lkb(lkb);
3530
3531 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3532 lkb->lkb_flags |= DLM_IFL_RESEND;
3533 }
3534
3535 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3536 conversions are async; there's no reply from the remote master */
3537}
3538
3539/* A waiting lkb needs recovery if the master node has failed, or
3540 the master node is changing (only when no directory is used) */
3541
3542static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3543{
3544 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3545 return 1;
3546
3547 if (!dlm_no_directory(ls))
3548 return 0;
3549
3550 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3551 return 1;
3552
3553 return 0;
3554}
3555
3556/* Recovery for locks that are waiting for replies from nodes that are now
3557 gone. We can just complete unlocks and cancels by faking a reply from the
3558 dead node. Requests and up-conversions we flag to be resent after
3559 recovery. Down-conversions can just be completed with a fake reply like
3560 unlocks. Conversions between PR and CW need special attention. */
3561
3562void dlm_recover_waiters_pre(struct dlm_ls *ls)
3563{
3564 struct dlm_lkb *lkb, *safe;
3565
3566 mutex_lock(&ls->ls_waiters_mutex);
3567
3568 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3569 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3570 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3571
3572 /* all outstanding lookups, regardless of destination will be
3573 resent after recovery is done */
3574
3575 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3576 lkb->lkb_flags |= DLM_IFL_RESEND;
3577 continue;
3578 }
3579
3580 if (!waiter_needs_recovery(ls, lkb))
3581 continue;
3582
3583 switch (lkb->lkb_wait_type) {
3584
3585 case DLM_MSG_REQUEST:
3586 lkb->lkb_flags |= DLM_IFL_RESEND;
3587 break;
3588
3589 case DLM_MSG_CONVERT:
3590 recover_convert_waiter(ls, lkb);
3591 break;
3592
3593 case DLM_MSG_UNLOCK:
3594 hold_lkb(lkb);
3595 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3596 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3597 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3598 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3599 dlm_put_lkb(lkb);
3600 break;
3601
3602 case DLM_MSG_CANCEL:
3603 hold_lkb(lkb);
3604 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3605 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3606 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3607 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3608 dlm_put_lkb(lkb);
3609 break;
3610
3611 default:
3612 log_error(ls, "invalid lkb wait_type %d",
3613 lkb->lkb_wait_type);
3614 }
3615 schedule();
3616 }
3617 mutex_unlock(&ls->ls_waiters_mutex);
3618}
3619
3620static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3621{
3622 struct dlm_lkb *lkb;
3623 int found = 0;
3624
3625 mutex_lock(&ls->ls_waiters_mutex);
3626 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3627 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3628 hold_lkb(lkb);
3629 found = 1;
3630 break;
3631 }
3632 }
3633 mutex_unlock(&ls->ls_waiters_mutex);
3634
3635 if (!found)
3636 lkb = NULL;
3637 return lkb;
3638}
3639
3640/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3641 master or dir-node for r. Processing the lkb may result in it being placed
3642 back on waiters. */
3643
3644/* We do this after normal locking has been enabled and any saved messages
3645 (in requestqueue) have been processed. We should be confident that at
3646 this point we won't get or process a reply to any of these waiting
3647 operations. But, new ops may be coming in on the rsbs/locks here from
3648 userspace or remotely. */
3649
3650/* there may have been an overlap unlock/cancel prior to recovery or after
3651 recovery. if before, the lkb may still have a positive wait_count; if after,
3652 the overlap flag would just have been set and nothing new sent. we can be
3653 confident here that any replies to either the initial op or overlap ops
3654 prior to recovery have been received. */
3655
3656int dlm_recover_waiters_post(struct dlm_ls *ls)
3657{
3658 struct dlm_lkb *lkb;
3659 struct dlm_rsb *r;
3660 int error = 0, mstype, err, oc, ou;
3661
3662 while (1) {
3663 if (dlm_locking_stopped(ls)) {
3664 log_debug(ls, "recover_waiters_post aborted");
3665 error = -EINTR;
3666 break;
3667 }
3668
3669 lkb = find_resend_waiter(ls);
3670 if (!lkb)
3671 break;
3672
3673 r = lkb->lkb_resource;
3674 hold_rsb(r);
3675 lock_rsb(r);
3676
3677 mstype = lkb->lkb_wait_type;
3678 oc = is_overlap_cancel(lkb);
3679 ou = is_overlap_unlock(lkb);
3680 err = 0;
3681
3682 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3683 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3684
3685 /* At this point we assume that we won't get a reply to any
3686 previous op or overlap op on this lock. First, do a big
3687 remove_from_waiters() for all previous ops. */
3688
3689 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3690 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3691 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3692 lkb->lkb_wait_type = 0;
3693 lkb->lkb_wait_count = 0;
3694 mutex_lock(&ls->ls_waiters_mutex);
3695 list_del_init(&lkb->lkb_wait_reply);
3696 mutex_unlock(&ls->ls_waiters_mutex);
3697 unhold_lkb(lkb); /* for waiters list */
3698
3699 if (oc || ou) {
3700 /* do an unlock or cancel instead of resending */
3701 switch (mstype) {
3702 case DLM_MSG_LOOKUP:
3703 case DLM_MSG_REQUEST:
3704 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3705 -DLM_ECANCEL);
3706 unhold_lkb(lkb); /* undoes create_lkb() */
3707 break;
3708 case DLM_MSG_CONVERT:
3709 if (oc) {
3710 queue_cast(r, lkb, -DLM_ECANCEL);
3711 } else {
3712 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3713 _unlock_lock(r, lkb);
3714 }
3715 break;
3716 default:
3717 err = 1;
3718 }
3719 } else {
3720 switch (mstype) {
3721 case DLM_MSG_LOOKUP:
3722 case DLM_MSG_REQUEST:
3723 _request_lock(r, lkb);
3724 if (is_master(r))
3725 confirm_master(r, 0);
3726 break;
3727 case DLM_MSG_CONVERT:
3728 _convert_lock(r, lkb);
3729 break;
3730 default:
3731 err = 1;
3732 }
3733 }
3734
3735 if (err)
3736 log_error(ls, "recover_waiters_post %x %d %x %d %d",
3737 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3738 unlock_rsb(r);
3739 put_rsb(r);
3740 dlm_put_lkb(lkb);
3741 }
3742
3743 return error;
3744}
3745
3746static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3747 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3748{
3749 struct dlm_ls *ls = r->res_ls;
3750 struct dlm_lkb *lkb, *safe;
3751
3752 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3753 if (test(ls, lkb)) {
97a35d1e 3754 rsb_set_flag(r, RSB_LOCKS_PURGED);
e7fd4179
DT
3755 del_lkb(r, lkb);
3756 /* this put should free the lkb */
3757 if (!dlm_put_lkb(lkb))
3758 log_error(ls, "purged lkb not released");
3759 }
3760 }
3761}
3762
3763static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3764{
3765 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3766}
3767
3768static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3769{
3770 return is_master_copy(lkb);
3771}
3772
3773static void purge_dead_locks(struct dlm_rsb *r)
3774{
3775 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3776 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3777 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3778}
3779
3780void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3781{
3782 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3783 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3784 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3785}
3786
3787/* Get rid of locks held by nodes that are gone. */
3788
3789int dlm_purge_locks(struct dlm_ls *ls)
3790{
3791 struct dlm_rsb *r;
3792
3793 log_debug(ls, "dlm_purge_locks");
3794
3795 down_write(&ls->ls_root_sem);
3796 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3797 hold_rsb(r);
3798 lock_rsb(r);
3799 if (is_master(r))
3800 purge_dead_locks(r);
3801 unlock_rsb(r);
3802 unhold_rsb(r);
3803
3804 schedule();
3805 }
3806 up_write(&ls->ls_root_sem);
3807
3808 return 0;
3809}
3810
3811static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3812{
3813 struct dlm_rsb *r, *r_ret = NULL;
3814
3815 read_lock(&ls->ls_rsbtbl[bucket].lock);
3816 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3817 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3818 continue;
3819 hold_rsb(r);
3820 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3821 r_ret = r;
3822 break;
3823 }
3824 read_unlock(&ls->ls_rsbtbl[bucket].lock);
3825 return r_ret;
3826}
3827
3828void dlm_grant_after_purge(struct dlm_ls *ls)
3829{
3830 struct dlm_rsb *r;
3831 int bucket = 0;
3832
3833 while (1) {
3834 r = find_purged_rsb(ls, bucket);
3835 if (!r) {
3836 if (bucket == ls->ls_rsbtbl_size - 1)
3837 break;
3838 bucket++;
3839 continue;
3840 }
3841 lock_rsb(r);
3842 if (is_master(r)) {
3843 grant_pending_locks(r);
3844 confirm_master(r, 0);
3845 }
3846 unlock_rsb(r);
3847 put_rsb(r);
3848 schedule();
3849 }
3850}
3851
3852static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3853 uint32_t remid)
3854{
3855 struct dlm_lkb *lkb;
3856
3857 list_for_each_entry(lkb, head, lkb_statequeue) {
3858 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3859 return lkb;
3860 }
3861 return NULL;
3862}
3863
3864static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3865 uint32_t remid)
3866{
3867 struct dlm_lkb *lkb;
3868
3869 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3870 if (lkb)
3871 return lkb;
3872 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3873 if (lkb)
3874 return lkb;
3875 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3876 if (lkb)
3877 return lkb;
3878 return NULL;
3879}
3880
3881static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3882 struct dlm_rsb *r, struct dlm_rcom *rc)
3883{
3884 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3885 int lvblen;
3886
3887 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3888 lkb->lkb_ownpid = rl->rl_ownpid;
3889 lkb->lkb_remid = rl->rl_lkid;
3890 lkb->lkb_exflags = rl->rl_exflags;
3891 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3892 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3893 lkb->lkb_lvbseq = rl->rl_lvbseq;
3894 lkb->lkb_rqmode = rl->rl_rqmode;
3895 lkb->lkb_grmode = rl->rl_grmode;
3896 /* don't set lkb_status because add_lkb wants to set it itself */
3897
3898 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3899 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3900
3901 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3902 lkb->lkb_lvbptr = allocate_lvb(ls);
3903 if (!lkb->lkb_lvbptr)
3904 return -ENOMEM;
3905 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3906 sizeof(struct rcom_lock);
3907 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3908 }
3909
3910 /* Conversions between PR and CW (middle modes) need special handling.
3911 The real granted mode of these converting locks cannot be determined
3912 until all locks have been rebuilt on the rsb (recover_conversion) */
3913
3914 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3915 rl->rl_status = DLM_LKSTS_CONVERT;
3916 lkb->lkb_grmode = DLM_LOCK_IV;
3917 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3918 }
3919
3920 return 0;
3921}
3922
3923/* This lkb may have been recovered in a previous aborted recovery so we need
3924 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3925 If so we just send back a standard reply. If not, we create a new lkb with
3926 the given values and send back our lkid. We send back our lkid by sending
3927 back the rcom_lock struct we got but with the remid field filled in. */
3928
3929int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3930{
3931 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3932 struct dlm_rsb *r;
3933 struct dlm_lkb *lkb;
3934 int error;
3935
3936 if (rl->rl_parent_lkid) {
3937 error = -EOPNOTSUPP;
3938 goto out;
3939 }
3940
3941 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3942 if (error)
3943 goto out;
3944
3945 lock_rsb(r);
3946
3947 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3948 if (lkb) {
3949 error = -EEXIST;
3950 goto out_remid;
3951 }
3952
3953 error = create_lkb(ls, &lkb);
3954 if (error)
3955 goto out_unlock;
3956
3957 error = receive_rcom_lock_args(ls, lkb, r, rc);
3958 if (error) {
3959 __put_lkb(ls, lkb);
3960 goto out_unlock;
3961 }
3962
3963 attach_lkb(r, lkb);
3964 add_lkb(r, lkb, rl->rl_status);
3965 error = 0;
3966
3967 out_remid:
3968 /* this is the new value returned to the lock holder for
3969 saving in its process-copy lkb */
3970 rl->rl_remid = lkb->lkb_id;
3971
3972 out_unlock:
3973 unlock_rsb(r);
3974 put_rsb(r);
3975 out:
3976 if (error)
3977 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3978 rl->rl_result = error;
3979 return error;
3980}
3981
3982int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3983{
3984 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3985 struct dlm_rsb *r;
3986 struct dlm_lkb *lkb;
3987 int error;
3988
3989 error = find_lkb(ls, rl->rl_lkid, &lkb);
3990 if (error) {
3991 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3992 return error;
3993 }
3994
3995 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3996
3997 error = rl->rl_result;
3998
3999 r = lkb->lkb_resource;
4000 hold_rsb(r);
4001 lock_rsb(r);
4002
4003 switch (error) {
4004 case -EBADR:
4005 /* There's a chance the new master received our lock before
4006 dlm_recover_master_reply(); this wouldn't happen if we did
4007 a barrier between recover_masters and recover_locks. */
4008 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4009 (unsigned long)r, r->res_name);
4010 dlm_send_rcom_lock(r, lkb);
4011 goto out;
4012 case -EEXIST:
4013 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4014 /* fall through */
4015 case 0:
4016 lkb->lkb_remid = rl->rl_remid;
4017 break;
4018 default:
4019 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4020 error, lkb->lkb_id);
4021 }
4022
4023 /* an ack for dlm_recover_locks() which waits for replies from
4024 all the locks it sends to new masters */
4025 dlm_recovered_lock(r);
4026 out:
4027 unlock_rsb(r);
4028 put_rsb(r);
4029 dlm_put_lkb(lkb);
4030
4031 return 0;
4032}
4033
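/* Editor's sketch (hypothetical; helper names are assumptions based on
   the recovery code in recover.c, not verified here): the
   dlm_recovered_lock() ack above pairs with a wait on the sending
   side, which conceptually blocks until every master copy it sent has
   been acknowledged. */

static int recover_locks_wait_sketch(struct dlm_ls *ls)
{
	/* assumed helpers: recover_list_count() counts rsbs still
	   awaiting an ack, recover_list_empty() tests for none left,
	   and dlm_wait_function() sleeps until the test is true or
	   recovery is aborted */
	log_debug(ls, "waiting for %d lock replies", recover_list_count(ls));
	return dlm_wait_function(ls, &recover_list_empty);
}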
4034int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4035 int mode, uint32_t flags, void *name, unsigned int namelen,
4036 uint32_t parent_lkid)
4037{
4038 struct dlm_lkb *lkb;
4039 struct dlm_args args;
4040 int error;
4041
4042 dlm_lock_recovery(ls);
4043
4044 error = create_lkb(ls, &lkb);
4045 if (error) {
4046 kfree(ua);
4047 goto out;
4048 }
4049
4050 if (flags & DLM_LKF_VALBLK) {
4051 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4052 if (!ua->lksb.sb_lvbptr) {
4053 kfree(ua);
4054 __put_lkb(ls, lkb);
4055 error = -ENOMEM;
4056 goto out;
4057 }
4058 }
4059
4060 /* After ua is attached to lkb it will be freed by free_lkb().
4061 When DLM_IFL_USER is set, the dlm knows that this is a userspace
4062 lock and that lkb_astparam is the dlm_user_args structure. */
4063
4064 error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
4065 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4066 lkb->lkb_flags |= DLM_IFL_USER;
4067 ua->old_mode = DLM_LOCK_IV;
4068
4069 if (error) {
4070 __put_lkb(ls, lkb);
4071 goto out;
4072 }
4073
4074 error = request_lock(ls, lkb, name, namelen, &args);
4075
4076 switch (error) {
4077 case 0:
4078 break;
4079 case -EINPROGRESS:
4080 error = 0;
4081 break;
4082 case -EAGAIN:
4083 error = 0;
4084 /* fall through */
4085 default:
4086 __put_lkb(ls, lkb);
4087 goto out;
4088 }
4089
4090 /* add this new lkb to the per-process list of locks */
4091 spin_lock(&ua->proc->locks_spin);
4092 hold_lkb(lkb);
4093 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4094 spin_unlock(&ua->proc->locks_spin);
4095 out:
4096 dlm_unlock_recovery(ls);
4097 return error;
4098}
4099
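/* Editor's illustration (assumption: this entry point is reached from
   userspace through libdlm and the misc-device write path in user.c;
   the call below is a sketch, not verified against any particular
   libdlm version).  A userspace request that lands in
   dlm_user_request() might look like: */

#include <string.h>
#include <libdlm.h>	/* assumed userspace header */

static int take_user_lock(dlm_lshandle_t ls, struct dlm_lksb *lksb,
			  void (*ast)(void *arg), void *arg)
{
	const char *name = "my-resource";	/* hypothetical resource name */

	/* EX request with a value block; kernel side: dlm_user_request() */
	return dlm_ls_lock(ls, LKM_EXMODE, lksb, LKF_VALBLK,
			   name, strlen(name), 0 /* parent */,
			   ast, arg, NULL /* bast */, NULL /* range */);
}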
4100int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4101 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
4102{
4103 struct dlm_lkb *lkb;
4104 struct dlm_args args;
4105 struct dlm_user_args *ua;
4106 int error;
4107
4108 dlm_lock_recovery(ls);
4109
4110 error = find_lkb(ls, lkid, &lkb);
4111 if (error)
4112 goto out;
4113
4114 /* the user can change the params on its lock when converting it, or
4115 add an lvb that didn't exist before */
4116
4117 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4118
4119 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4120 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4121 if (!ua->lksb.sb_lvbptr) {
4122 error = -ENOMEM;
4123 goto out_put;
4124 }
4125 }
4126 if (lvb_in && ua->lksb.sb_lvbptr)
4127 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4128
4129 ua->castparam = ua_tmp->castparam;
4130 ua->castaddr = ua_tmp->castaddr;
4131 ua->bastparam = ua_tmp->bastparam;
4132 ua->bastaddr = ua_tmp->bastaddr;
4133 ua->user_lksb = ua_tmp->user_lksb;
4134 ua->old_mode = lkb->lkb_grmode;
4135
4136 error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
4137 ua, DLM_FAKE_USER_AST, &args);
4138 if (error)
4139 goto out_put;
4140
4141 error = convert_lock(ls, lkb, &args);
4142
4143 if (error == -EINPROGRESS || error == -EAGAIN)
4144 error = 0;
4145 out_put:
4146 dlm_put_lkb(lkb);
4147 out:
4148 dlm_unlock_recovery(ls);
4149 kfree(ua_tmp);
4150 return error;
4151}
4152
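/* Editor's illustration (assumption, mirroring the request sketch
   above): a userspace convert passes LKF_CONVERT and identifies the
   lock by the lkid already stored in the lksb, arriving here in
   dlm_user_convert(). */

static int convert_user_lock(dlm_lshandle_t ls, struct dlm_lksb *lksb,
			     void (*ast)(void *arg), void *arg)
{
	/* downconvert to PR, updating the value block on the way */
	return dlm_ls_lock(ls, LKM_PRMODE, lksb, LKF_CONVERT | LKF_VALBLK,
			   NULL, 0, 0, ast, arg, NULL, NULL);
}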
4153int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4154 uint32_t flags, uint32_t lkid, char *lvb_in)
4155{
4156 struct dlm_lkb *lkb;
4157 struct dlm_args args;
4158 struct dlm_user_args *ua;
4159 int error;
4160
4161 dlm_lock_recovery(ls);
4162
4163 error = find_lkb(ls, lkid, &lkb);
4164 if (error)
4165 goto out;
4166
4167 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4168
4169 if (lvb_in && ua->lksb.sb_lvbptr)
4170 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4171 ua->castparam = ua_tmp->castparam;
4172 ua->user_lksb = ua_tmp->user_lksb;
4173
4174 error = set_unlock_args(flags, ua, &args);
4175 if (error)
4176 goto out_put;
4177
4178 error = unlock_lock(ls, lkb, &args);
4179
4180 if (error == -DLM_EUNLOCK)
4181 error = 0;
4182 /* from validate_unlock_args() */
4183 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4184 error = 0;
4185 if (error)
4186 goto out_put;
4187
4188 spin_lock(&ua->proc->locks_spin);
4189 /* dlm_user_add_ast() may have already taken lkb off the proc list */
4190 if (!list_empty(&lkb->lkb_ownqueue))
4191 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4192 spin_unlock(&ua->proc->locks_spin);
4193 out_put:
4194 dlm_put_lkb(lkb);
4195 out:
4196 dlm_unlock_recovery(ls);
4197 kfree(ua_tmp);
4198 return error;
4199}
4200
4201int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4202 uint32_t flags, uint32_t lkid)
4203{
4204 struct dlm_lkb *lkb;
4205 struct dlm_args args;
4206 struct dlm_user_args *ua;
4207 int error;
4208
4209 dlm_lock_recovery(ls);
4210
4211 error = find_lkb(ls, lkid, &lkb);
4212 if (error)
4213 goto out;
4214
4215 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4216 ua->castparam = ua_tmp->castparam;
4217 ua->user_lksb = ua_tmp->user_lksb;
4218
4219 error = set_unlock_args(flags, ua, &args);
4220 if (error)
4221 goto out_put;
4222
4223 error = cancel_lock(ls, lkb, &args);
4224
4225 if (error == -DLM_ECANCEL)
4226 error = 0;
4227 /* from validate_unlock_args() */
4228 if (error == -EBUSY)
4229 error = 0;
4230 out_put:
4231 dlm_put_lkb(lkb);
4232 out:
4233 dlm_unlock_recovery(ls);
4234 kfree(ua_tmp);
4235 return error;
4236}
4237
4238/* lkb's that are removed from the waiters list by revert are just left on the
4239 orphans list with the granted orphan locks, to be freed by purge */
4240
4241static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4242{
4243 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4244 struct dlm_args args;
4245 int error;
4246
4247 hold_lkb(lkb);
4248 mutex_lock(&ls->ls_orphans_mutex);
4249 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4250 mutex_unlock(&ls->ls_orphans_mutex);
4251
4252 set_unlock_args(0, ua, &args);
4253
4254 error = cancel_lock(ls, lkb, &args);
4255 if (error == -DLM_ECANCEL)
4256 error = 0;
4257 return error;
4258}
4259
4260/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4261 Regardless of what rsb queue the lock is on, it's removed and freed. */
4262
4263static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4264{
4265 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4266 struct dlm_args args;
4267 int error;
4268
4269 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4270
4271 error = unlock_lock(ls, lkb, &args);
4272 if (error == -DLM_EUNLOCK)
4273 error = 0;
4274 return error;
4275}
4276
4277 /* We have to release the clear_proc_locks mutex before calling
4278 unlock_proc_lock() (which does lock_rsb) to avoid a deadlock with
4279 receiving a message that does lock_rsb followed by dlm_user_add_ast() */
4280
4281static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4282 struct dlm_user_proc *proc)
4283{
4284 struct dlm_lkb *lkb = NULL;
4285
4286 mutex_lock(&ls->ls_clear_proc_locks);
4287 if (list_empty(&proc->locks))
4288 goto out;
4289
4290 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4291 list_del_init(&lkb->lkb_ownqueue);
4292
4293 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4294 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4295 else
4296 lkb->lkb_flags |= DLM_IFL_DEAD;
4297 out:
4298 mutex_unlock(&ls->ls_clear_proc_locks);
4299 return lkb;
4300}
4301
4302/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4303 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4304 which we clear here. */
4305
4306 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4307 list, and no more device_writes should add lkb's to proc->locks list; so we
4308 shouldn't need to take asts_spin or locks_spin here. This assumes that
4309 device reads/writes/closes are serialized -- FIXME: we may need to serialize
4310 them ourselves. */
4311
4312void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4313{
4314 struct dlm_lkb *lkb, *safe;
4315
4316 dlm_lock_recovery(ls);
4317
4318 while (1) {
4319 lkb = del_proc_lock(ls, proc);
4320 if (!lkb)
4321 break;
4322 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4323 orphan_proc_lock(ls, lkb);
4324 else
4325 unlock_proc_lock(ls, lkb);
4326
4327 /* this removes the reference for the proc->locks list
4328 added by dlm_user_request, it may result in the lkb
4329 being freed */
4330
4331 dlm_put_lkb(lkb);
4332 }
4333
4334 mutex_lock(&ls->ls_clear_proc_locks);
4335
4336 /* in-progress unlocks */
4337 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4338 list_del_init(&lkb->lkb_ownqueue);
4339 lkb->lkb_flags |= DLM_IFL_DEAD;
4340 dlm_put_lkb(lkb);
4341 }
4342
4343 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4344 list_del(&lkb->lkb_astqueue);
4345 dlm_put_lkb(lkb);
4346 }
4347
4348 mutex_unlock(&ls->ls_clear_proc_locks);
4349 dlm_unlock_recovery(ls);
4350}
4351
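/* Editor's sketch (assumption about the caller, which lives in user.c
   and is not shown in this file): the CLOSING-flag protocol described
   in the comments above implies a device close path roughly like
   this.  DLM_PROC_FLAGS_CLOSING is assumed from the user-device code. */

static int device_close_sketch(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	/* stop further reads/writes from touching the proc lists,
	   then tear the lists down */
	set_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags);
	dlm_clear_proc_locks(ls, proc);
	return 0;
}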
4352static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4353{
4354 struct dlm_lkb *lkb, *safe;
4355
4356 while (1) {
4357 lkb = NULL;
4358 spin_lock(&proc->locks_spin);
4359 if (!list_empty(&proc->locks)) {
4360 lkb = list_entry(proc->locks.next, struct dlm_lkb,
4361 lkb_ownqueue);
4362 list_del_init(&lkb->lkb_ownqueue);
4363 }
4364 spin_unlock(&proc->locks_spin);
4365
4366 if (!lkb)
4367 break;
4368
4369 lkb->lkb_flags |= DLM_IFL_DEAD;
4370 unlock_proc_lock(ls, lkb);
4371 dlm_put_lkb(lkb); /* ref from proc->locks list */
4372 }
4373
4374 spin_lock(&proc->locks_spin);
4375 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4376 list_del_init(&lkb->lkb_ownqueue);
4377 lkb->lkb_flags |= DLM_IFL_DEAD;
4378 dlm_put_lkb(lkb);
4379 }
4380 spin_unlock(&proc->locks_spin);
4381
4382 spin_lock(&proc->asts_spin);
4383 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4384 list_del(&lkb->lkb_astqueue);
4385 dlm_put_lkb(lkb);
4386 }
4387 spin_unlock(&proc->asts_spin);
4388}
4389
4390/* pid of 0 means purge all orphans */
4391
4392static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4393{
4394 struct dlm_lkb *lkb, *safe;
4395
4396 mutex_lock(&ls->ls_orphans_mutex);
4397 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4398 if (pid && lkb->lkb_ownpid != pid)
4399 continue;
4400 unlock_proc_lock(ls, lkb);
4401 list_del_init(&lkb->lkb_ownqueue);
4402 dlm_put_lkb(lkb);
4403 }
4404 mutex_unlock(&ls->ls_orphans_mutex);
4405}
4406
4407static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4408{
4409 struct dlm_message *ms;
4410 struct dlm_mhandle *mh;
4411 int error;
4412
4413 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4414 DLM_MSG_PURGE, &ms, &mh);
4415 if (error)
4416 return error;
4417 ms->m_nodeid = nodeid;
4418 ms->m_pid = pid;
4419
4420 return send_message(mh, ms);
4421}
4422
4423int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4424 int nodeid, int pid)
4425{
4426 int error = 0;
4427
4428 if (nodeid != dlm_our_nodeid()) {
4429 error = send_purge(ls, nodeid, pid);
4430 } else {
4431 dlm_lock_recovery(ls);
4432 if (pid == current->pid)
4433 purge_proc_locks(ls, proc);
4434 else
4435 do_purge(ls, nodeid, pid);
4436 dlm_unlock_recovery(ls);
4437 }
4438 return error;
4439}
4440
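/* Editor's sketch (assumed counterpart to send_purge() above): the
   node receiving DLM_MSG_PURGE unpacks the ids and runs do_purge()
   locally; the real handler sits in this file's message-dispatch
   code. */

static void receive_purge_sketch(struct dlm_ls *ls, struct dlm_message *ms)
{
	/* nodeid/pid travel in the same m_nodeid/m_pid fields that
	   send_purge() filled in */
	do_purge(ls, ms->m_nodeid, ms->m_pid);
}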