[DLM] fix coverity-spotted stupidity
[deliverable/linux.git] / fs / dlm / lock.c
CommitLineData
e7fd4179
DT
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
5**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.
9**
10*******************************************************************************
11******************************************************************************/
12
13/* Central locking logic has four stages:
14
15 dlm_lock()
16 dlm_unlock()
17
18 request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
22
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
27
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
32
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
35
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
40
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
43
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
49
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
53
54 L: send_xxxx() -> R: receive_xxxx()
55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/
597d0cae 58#include <linux/types.h>
e7fd4179 59#include "dlm_internal.h"
597d0cae 60#include <linux/dlm_device.h>
e7fd4179
DT
61#include "memory.h"
62#include "lowcomms.h"
63#include "requestqueue.h"
64#include "util.h"
65#include "dir.h"
66#include "member.h"
67#include "lockspace.h"
68#include "ast.h"
69#include "lock.h"
70#include "rcom.h"
71#include "recover.h"
72#include "lvb_table.h"
597d0cae 73#include "user.h"
e7fd4179
DT
74#include "config.h"
75
/* Forward declarations: message senders and helpers that are used by the
   code below before their definitions appear. */

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
88
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
	/* UN NL CR CW PR PW EX PD */
	{ 1, 1, 1, 1, 1, 1, 1, 0 },	/* UN */
	{ 1, 1, 1, 1, 1, 1, 1, 0 },	/* NL */
	{ 1, 1, 1, 1, 1, 1, 0, 0 },	/* CR */
	{ 1, 1, 1, 1, 0, 0, 0, 0 },	/* CW */
	{ 1, 1, 1, 0, 1, 0, 0, 0 },	/* PR */
	{ 1, 1, 1, 0, 0, 0, 0, 0 },	/* PW */
	{ 1, 1, 0, 0, 0, 0, 0, 0 },	/* EX */
	{ 0, 0, 0, 0, 0, 0, 0, 0 }	/* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 *  1 = LVB is returned to the caller
 *  0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/*  UN  NL  CR  CW  PR  PW  EX  PD */
	{  -1,  1,  1,  1,  1,  1,  1, -1 },	/* UN */
	{  -1,  1,  1,  1,  1,  1,  1,  0 },	/* NL */
	{  -1, -1,  1,  1,  1,  1,  1,  0 },	/* CR */
	{  -1, -1, -1,  1,  1,  1,  1,  0 },	/* CW */
	{  -1, -1, -1, -1,  1,  1,  1,  0 },	/* PR */
	{  -1,  0,  0,  0,  0,  0,  1,  0 },	/* PW */
	{  -1,  0,  0,  0,  0,  0,  0,  0 },	/* EX */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }	/* PD */
};

/* Compatibility of the granted mode of lkb "gr" with the requested mode
   of lkb "rq"; non-zero means compatible. */

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

/* Look up two raw mode values in the compatibility matrix.  Each mode is
   offset by one to form the index, and no range check is performed. */

int dlm_modes_compat(int mode1, int mode2)
{
	const int *row = __dlm_compat_matrix[mode1 + 1];

	return row[mode2 + 1];
}
137
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
	/* UN NL CR CW PR PW EX PD */
	{ 0, 0, 0, 0, 0, 0, 0, 0 },	/* UN */
	{ 0, 0, 1, 1, 1, 1, 1, 0 },	/* NL */
	{ 0, 0, 0, 1, 1, 1, 1, 0 },	/* CR */
	{ 0, 0, 0, 0, 1, 1, 1, 0 },	/* CW */
	{ 0, 0, 0, 1, 0, 1, 1, 0 },	/* PR */
	{ 0, 0, 0, 0, 0, 0, 1, 0 },	/* PW */
	{ 0, 0, 0, 0, 0, 0, 0, 0 },	/* EX */
	{ 0, 0, 0, 0, 0, 0, 0, 0 }	/* PD */
};
155
597d0cae 156void dlm_print_lkb(struct dlm_lkb *lkb)
e7fd4179
DT
157{
158 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
159 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
160 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
161 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
162 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
163}
164
165void dlm_print_rsb(struct dlm_rsb *r)
166{
167 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
168 r->res_nodeid, r->res_flags, r->res_first_lkid,
169 r->res_recover_locks_count, r->res_name);
170}
171
a345da3e
DT
172void dlm_dump_rsb(struct dlm_rsb *r)
173{
174 struct dlm_lkb *lkb;
175
176 dlm_print_rsb(r);
177
178 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
179 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
180 printk(KERN_ERR "rsb lookup list\n");
181 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
182 dlm_print_lkb(lkb);
183 printk(KERN_ERR "rsb grant queue:\n");
184 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
185 dlm_print_lkb(lkb);
186 printk(KERN_ERR "rsb convert queue:\n");
187 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
188 dlm_print_lkb(lkb);
189 printk(KERN_ERR "rsb wait queue:\n");
190 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
191 dlm_print_lkb(lkb);
192}
193
e7fd4179
DT
194/* Threads cannot use the lockspace while it's being recovered */
195
196static inline void lock_recovery(struct dlm_ls *ls)
197{
198 down_read(&ls->ls_in_recovery);
199}
200
201static inline void unlock_recovery(struct dlm_ls *ls)
202{
203 up_read(&ls->ls_in_recovery);
204}
205
206static inline int lock_recovery_try(struct dlm_ls *ls)
207{
208 return down_read_trylock(&ls->ls_in_recovery);
209}
210
211static inline int can_be_queued(struct dlm_lkb *lkb)
212{
213 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
214}
215
216static inline int force_blocking_asts(struct dlm_lkb *lkb)
217{
218 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
219}
220
221static inline int is_demoted(struct dlm_lkb *lkb)
222{
223 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
224}
225
226static inline int is_remote(struct dlm_rsb *r)
227{
228 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
229 return !!r->res_nodeid;
230}
231
232static inline int is_process_copy(struct dlm_lkb *lkb)
233{
234 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
235}
236
237static inline int is_master_copy(struct dlm_lkb *lkb)
238{
239 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
240 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
90135925 241 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
e7fd4179
DT
242}
243
244static inline int middle_conversion(struct dlm_lkb *lkb)
245{
246 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
247 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
90135925
DT
248 return 1;
249 return 0;
e7fd4179
DT
250}
251
252static inline int down_conversion(struct dlm_lkb *lkb)
253{
254 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
255}
256
257static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
258{
259 if (is_master_copy(lkb))
260 return;
261
262 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
263
264 lkb->lkb_lksb->sb_status = rv;
265 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
266
267 dlm_add_ast(lkb, AST_COMP);
268}
269
270static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
271{
272 if (is_master_copy(lkb))
273 send_bast(r, lkb, rqmode);
274 else {
275 lkb->lkb_bastmode = rqmode;
276 dlm_add_ast(lkb, AST_BAST);
277 }
278}
279
280/*
281 * Basic operations on rsb's and lkb's
282 */
283
284static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
285{
286 struct dlm_rsb *r;
287
288 r = allocate_rsb(ls, len);
289 if (!r)
290 return NULL;
291
292 r->res_ls = ls;
293 r->res_length = len;
294 memcpy(r->res_name, name, len);
90135925 295 mutex_init(&r->res_mutex);
e7fd4179
DT
296
297 INIT_LIST_HEAD(&r->res_lookup);
298 INIT_LIST_HEAD(&r->res_grantqueue);
299 INIT_LIST_HEAD(&r->res_convertqueue);
300 INIT_LIST_HEAD(&r->res_waitqueue);
301 INIT_LIST_HEAD(&r->res_root_list);
302 INIT_LIST_HEAD(&r->res_recover_list);
303
304 return r;
305}
306
307static int search_rsb_list(struct list_head *head, char *name, int len,
308 unsigned int flags, struct dlm_rsb **r_ret)
309{
310 struct dlm_rsb *r;
311 int error = 0;
312
313 list_for_each_entry(r, head, res_hashchain) {
314 if (len == r->res_length && !memcmp(name, r->res_name, len))
315 goto found;
316 }
597d0cae 317 return -EBADR;
e7fd4179
DT
318
319 found:
320 if (r->res_nodeid && (flags & R_MASTER))
321 error = -ENOTBLK;
322 *r_ret = r;
323 return error;
324}
325
326static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
327 unsigned int flags, struct dlm_rsb **r_ret)
328{
329 struct dlm_rsb *r;
330 int error;
331
332 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
333 if (!error) {
334 kref_get(&r->res_ref);
335 goto out;
336 }
337 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
338 if (error)
339 goto out;
340
341 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
342
343 if (dlm_no_directory(ls))
344 goto out;
345
346 if (r->res_nodeid == -1) {
347 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
348 r->res_first_lkid = 0;
349 } else if (r->res_nodeid > 0) {
350 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
351 r->res_first_lkid = 0;
352 } else {
353 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
354 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
355 }
356 out:
357 *r_ret = r;
358 return error;
359}
360
361static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
362 unsigned int flags, struct dlm_rsb **r_ret)
363{
364 int error;
365 write_lock(&ls->ls_rsbtbl[b].lock);
366 error = _search_rsb(ls, name, len, b, flags, r_ret);
367 write_unlock(&ls->ls_rsbtbl[b].lock);
368 return error;
369}
370
371/*
372 * Find rsb in rsbtbl and potentially create/add one
373 *
374 * Delaying the release of rsb's has a similar benefit to applications keeping
375 * NL locks on an rsb, but without the guarantee that the cached master value
376 * will still be valid when the rsb is reused. Apps aren't always smart enough
377 * to keep NL locks on an rsb that they may lock again shortly; this can lead
378 * to excessive master lookups and removals if we don't delay the release.
379 *
380 * Searching for an rsb means looking through both the normal list and toss
381 * list. When found on the toss list the rsb is moved to the normal list with
382 * ref count of 1; when found on normal list the ref count is incremented.
383 */
384
385static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
386 unsigned int flags, struct dlm_rsb **r_ret)
387{
388 struct dlm_rsb *r, *tmp;
389 uint32_t hash, bucket;
390 int error = 0;
391
392 if (dlm_no_directory(ls))
393 flags |= R_CREATE;
394
395 hash = jhash(name, namelen, 0);
396 bucket = hash & (ls->ls_rsbtbl_size - 1);
397
398 error = search_rsb(ls, name, namelen, bucket, flags, &r);
399 if (!error)
400 goto out;
401
597d0cae 402 if (error == -EBADR && !(flags & R_CREATE))
e7fd4179
DT
403 goto out;
404
405 /* the rsb was found but wasn't a master copy */
406 if (error == -ENOTBLK)
407 goto out;
408
409 error = -ENOMEM;
410 r = create_rsb(ls, name, namelen);
411 if (!r)
412 goto out;
413
414 r->res_hash = hash;
415 r->res_bucket = bucket;
416 r->res_nodeid = -1;
417 kref_init(&r->res_ref);
418
419 /* With no directory, the master can be set immediately */
420 if (dlm_no_directory(ls)) {
421 int nodeid = dlm_dir_nodeid(r);
422 if (nodeid == dlm_our_nodeid())
423 nodeid = 0;
424 r->res_nodeid = nodeid;
425 }
426
427 write_lock(&ls->ls_rsbtbl[bucket].lock);
428 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
429 if (!error) {
430 write_unlock(&ls->ls_rsbtbl[bucket].lock);
431 free_rsb(r);
432 r = tmp;
433 goto out;
434 }
435 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
436 write_unlock(&ls->ls_rsbtbl[bucket].lock);
437 error = 0;
438 out:
439 *r_ret = r;
440 return error;
441}
442
/* Non-static entry point for find_rsb(); arguments and result are passed
   straight through. */

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	int rv = find_rsb(ls, name, namelen, flags, r_ret);

	return rv;
}
448
449/* This is only called to add a reference when the code already holds
450 a valid reference to the rsb, so there's no need for locking. */
451
452static inline void hold_rsb(struct dlm_rsb *r)
453{
454 kref_get(&r->res_ref);
455}
456
/* Non-static wrapper around hold_rsb(). */

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
461
462static void toss_rsb(struct kref *kref)
463{
464 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
465 struct dlm_ls *ls = r->res_ls;
466
467 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
468 kref_init(&r->res_ref);
469 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
470 r->res_toss_time = jiffies;
471 if (r->res_lvbptr) {
472 free_lvb(r->res_lvbptr);
473 r->res_lvbptr = NULL;
474 }
475}
476
477/* When all references to the rsb are gone it's transfered to
478 the tossed list for later disposal. */
479
480static void put_rsb(struct dlm_rsb *r)
481{
482 struct dlm_ls *ls = r->res_ls;
483 uint32_t bucket = r->res_bucket;
484
485 write_lock(&ls->ls_rsbtbl[bucket].lock);
486 kref_put(&r->res_ref, toss_rsb);
487 write_unlock(&ls->ls_rsbtbl[bucket].lock);
488}
489
/* Non-static wrapper around put_rsb(). */

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
494
495/* See comment for unhold_lkb */
496
497static void unhold_rsb(struct dlm_rsb *r)
498{
499 int rv;
500 rv = kref_put(&r->res_ref, toss_rsb);
a345da3e 501 DLM_ASSERT(!rv, dlm_dump_rsb(r););
e7fd4179
DT
502}
503
504static void kill_rsb(struct kref *kref)
505{
506 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
507
508 /* All work is done after the return from kref_put() so we
509 can release the write_lock before the remove and free. */
510
a345da3e
DT
511 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
512 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
513 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
514 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
515 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
516 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
e7fd4179
DT
517}
518
519/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
520 The rsb must exist as long as any lkb's for it do. */
521
522static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
523{
524 hold_rsb(r);
525 lkb->lkb_resource = r;
526}
527
528static void detach_lkb(struct dlm_lkb *lkb)
529{
530 if (lkb->lkb_resource) {
531 put_rsb(lkb->lkb_resource);
532 lkb->lkb_resource = NULL;
533 }
534}
535
536static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
537{
538 struct dlm_lkb *lkb, *tmp;
539 uint32_t lkid = 0;
540 uint16_t bucket;
541
542 lkb = allocate_lkb(ls);
543 if (!lkb)
544 return -ENOMEM;
545
546 lkb->lkb_nodeid = -1;
547 lkb->lkb_grmode = DLM_LOCK_IV;
548 kref_init(&lkb->lkb_ref);
34e22bed 549 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
e7fd4179
DT
550
551 get_random_bytes(&bucket, sizeof(bucket));
552 bucket &= (ls->ls_lkbtbl_size - 1);
553
554 write_lock(&ls->ls_lkbtbl[bucket].lock);
555
556 /* counter can roll over so we must verify lkid is not in use */
557
558 while (lkid == 0) {
559 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
560
561 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
562 lkb_idtbl_list) {
563 if (tmp->lkb_id != lkid)
564 continue;
565 lkid = 0;
566 break;
567 }
568 }
569
570 lkb->lkb_id = lkid;
571 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
572 write_unlock(&ls->ls_lkbtbl[bucket].lock);
573
574 *lkb_ret = lkb;
575 return 0;
576}
577
578static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
579{
580 uint16_t bucket = lkid & 0xFFFF;
581 struct dlm_lkb *lkb;
582
583 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
584 if (lkb->lkb_id == lkid)
585 return lkb;
586 }
587 return NULL;
588}
589
590static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
591{
592 struct dlm_lkb *lkb;
593 uint16_t bucket = lkid & 0xFFFF;
594
595 if (bucket >= ls->ls_lkbtbl_size)
596 return -EBADSLT;
597
598 read_lock(&ls->ls_lkbtbl[bucket].lock);
599 lkb = __find_lkb(ls, lkid);
600 if (lkb)
601 kref_get(&lkb->lkb_ref);
602 read_unlock(&ls->ls_lkbtbl[bucket].lock);
603
604 *lkb_ret = lkb;
605 return lkb ? 0 : -ENOENT;
606}
607
608static void kill_lkb(struct kref *kref)
609{
610 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
611
612 /* All work is done after the return from kref_put() so we
613 can release the write_lock before the detach_lkb */
614
615 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
616}
617
b3f58d8f
DT
618/* __put_lkb() is used when an lkb may not have an rsb attached to
619 it so we need to provide the lockspace explicitly */
620
621static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
e7fd4179 622{
e7fd4179
DT
623 uint16_t bucket = lkb->lkb_id & 0xFFFF;
624
625 write_lock(&ls->ls_lkbtbl[bucket].lock);
626 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
627 list_del(&lkb->lkb_idtbl_list);
628 write_unlock(&ls->ls_lkbtbl[bucket].lock);
629
630 detach_lkb(lkb);
631
632 /* for local/process lkbs, lvbptr points to caller's lksb */
633 if (lkb->lkb_lvbptr && is_master_copy(lkb))
634 free_lvb(lkb->lkb_lvbptr);
e7fd4179
DT
635 free_lkb(lkb);
636 return 1;
637 } else {
638 write_unlock(&ls->ls_lkbtbl[bucket].lock);
639 return 0;
640 }
641}
642
643int dlm_put_lkb(struct dlm_lkb *lkb)
644{
b3f58d8f
DT
645 struct dlm_ls *ls;
646
647 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
648 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
649
650 ls = lkb->lkb_resource->res_ls;
651 return __put_lkb(ls, lkb);
e7fd4179
DT
652}
653
654/* This is only called to add a reference when the code already holds
655 a valid reference to the lkb, so there's no need for locking. */
656
657static inline void hold_lkb(struct dlm_lkb *lkb)
658{
659 kref_get(&lkb->lkb_ref);
660}
661
662/* This is called when we need to remove a reference and are certain
663 it's not the last ref. e.g. del_lkb is always called between a
664 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
665 put_lkb would work fine, but would involve unnecessary locking */
666
667static inline void unhold_lkb(struct dlm_lkb *lkb)
668{
669 int rv;
670 rv = kref_put(&lkb->lkb_ref, kill_lkb);
671 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
672}
673
674static void lkb_add_ordered(struct list_head *new, struct list_head *head,
675 int mode)
676{
677 struct dlm_lkb *lkb = NULL;
678
679 list_for_each_entry(lkb, head, lkb_statequeue)
680 if (lkb->lkb_rqmode < mode)
681 break;
682
683 if (!lkb)
684 list_add_tail(new, head);
685 else
686 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
687}
688
689/* add/remove lkb to rsb's grant/convert/wait queue */
690
691static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
692{
693 kref_get(&lkb->lkb_ref);
694
695 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
696
697 lkb->lkb_status = status;
698
699 switch (status) {
700 case DLM_LKSTS_WAITING:
701 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
702 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
703 else
704 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
705 break;
706 case DLM_LKSTS_GRANTED:
707 /* convention says granted locks kept in order of grmode */
708 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
709 lkb->lkb_grmode);
710 break;
711 case DLM_LKSTS_CONVERT:
712 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
713 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
714 else
715 list_add_tail(&lkb->lkb_statequeue,
716 &r->res_convertqueue);
717 break;
718 default:
719 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
720 }
721}
722
723static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
724{
725 lkb->lkb_status = 0;
726 list_del(&lkb->lkb_statequeue);
727 unhold_lkb(lkb);
728}
729
/* Move the lkb from its current rsb queue to the queue for status "sts".
   An extra reference is held across the del/add pair. */

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);

	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);

	unhold_lkb(lkb);
}
737
738/* add/remove lkb from global waiters list of lkb's waiting for
739 a reply from a remote node */
740
741static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
742{
743 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
744
90135925 745 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
746 if (lkb->lkb_wait_type) {
747 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
748 goto out;
749 }
750 lkb->lkb_wait_type = mstype;
751 kref_get(&lkb->lkb_ref);
752 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
753 out:
90135925 754 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
755}
756
b790c3b7
DT
757/* We clear the RESEND flag because we might be taking an lkb off the waiters
758 list as part of process_requestqueue (e.g. a lookup that has an optimized
759 request reply on the requestqueue) between dlm_recover_waiters_pre() which
760 set RESEND and dlm_recover_waiters_post() */
761
e7fd4179
DT
762static int _remove_from_waiters(struct dlm_lkb *lkb)
763{
764 int error = 0;
765
766 if (!lkb->lkb_wait_type) {
767 log_print("remove_from_waiters error");
768 error = -EINVAL;
769 goto out;
770 }
771 lkb->lkb_wait_type = 0;
b790c3b7 772 lkb->lkb_flags &= ~DLM_IFL_RESEND;
e7fd4179
DT
773 list_del(&lkb->lkb_wait_reply);
774 unhold_lkb(lkb);
775 out:
776 return error;
777}
778
779static int remove_from_waiters(struct dlm_lkb *lkb)
780{
781 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
782 int error;
783
90135925 784 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179 785 error = _remove_from_waiters(lkb);
90135925 786 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
787 return error;
788}
789
790static void dir_remove(struct dlm_rsb *r)
791{
792 int to_nodeid;
793
794 if (dlm_no_directory(r->res_ls))
795 return;
796
797 to_nodeid = dlm_dir_nodeid(r);
798 if (to_nodeid != dlm_our_nodeid())
799 send_remove(r);
800 else
801 dlm_dir_remove_entry(r->res_ls, to_nodeid,
802 r->res_name, r->res_length);
803}
804
805/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
806 found since they are in order of newest to oldest? */
807
808static int shrink_bucket(struct dlm_ls *ls, int b)
809{
810 struct dlm_rsb *r;
811 int count = 0, found;
812
813 for (;;) {
90135925 814 found = 0;
e7fd4179
DT
815 write_lock(&ls->ls_rsbtbl[b].lock);
816 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
817 res_hashchain) {
818 if (!time_after_eq(jiffies, r->res_toss_time +
68c817a1 819 dlm_config.ci_toss_secs * HZ))
e7fd4179 820 continue;
90135925 821 found = 1;
e7fd4179
DT
822 break;
823 }
824
825 if (!found) {
826 write_unlock(&ls->ls_rsbtbl[b].lock);
827 break;
828 }
829
830 if (kref_put(&r->res_ref, kill_rsb)) {
831 list_del(&r->res_hashchain);
832 write_unlock(&ls->ls_rsbtbl[b].lock);
833
834 if (is_master(r))
835 dir_remove(r);
836 free_rsb(r);
837 count++;
838 } else {
839 write_unlock(&ls->ls_rsbtbl[b].lock);
840 log_error(ls, "tossed rsb in use %s", r->res_name);
841 }
842 }
843
844 return count;
845}
846
847void dlm_scan_rsbs(struct dlm_ls *ls)
848{
849 int i;
850
851 if (dlm_locking_stopped(ls))
852 return;
853
854 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
855 shrink_bucket(ls, i);
856 cond_resched();
857 }
858}
859
860/* lkb is master or local copy */
861
862static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
863{
864 int b, len = r->res_ls->ls_lvblen;
865
866 /* b=1 lvb returned to caller
867 b=0 lvb written to rsb or invalidated
868 b=-1 do nothing */
869
870 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
871
872 if (b == 1) {
873 if (!lkb->lkb_lvbptr)
874 return;
875
876 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
877 return;
878
879 if (!r->res_lvbptr)
880 return;
881
882 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
883 lkb->lkb_lvbseq = r->res_lvbseq;
884
885 } else if (b == 0) {
886 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
887 rsb_set_flag(r, RSB_VALNOTVALID);
888 return;
889 }
890
891 if (!lkb->lkb_lvbptr)
892 return;
893
894 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
895 return;
896
897 if (!r->res_lvbptr)
898 r->res_lvbptr = allocate_lvb(r->res_ls);
899
900 if (!r->res_lvbptr)
901 return;
902
903 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
904 r->res_lvbseq++;
905 lkb->lkb_lvbseq = r->res_lvbseq;
906 rsb_clear_flag(r, RSB_VALNOTVALID);
907 }
908
909 if (rsb_flag(r, RSB_VALNOTVALID))
910 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
911}
912
913static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
914{
915 if (lkb->lkb_grmode < DLM_LOCK_PW)
916 return;
917
918 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
919 rsb_set_flag(r, RSB_VALNOTVALID);
920 return;
921 }
922
923 if (!lkb->lkb_lvbptr)
924 return;
925
926 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
927 return;
928
929 if (!r->res_lvbptr)
930 r->res_lvbptr = allocate_lvb(r->res_ls);
931
932 if (!r->res_lvbptr)
933 return;
934
935 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
936 r->res_lvbseq++;
937 rsb_clear_flag(r, RSB_VALNOTVALID);
938}
939
940/* lkb is process copy (pc) */
941
942static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
943 struct dlm_message *ms)
944{
945 int b;
946
947 if (!lkb->lkb_lvbptr)
948 return;
949
950 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
951 return;
952
597d0cae 953 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
e7fd4179
DT
954 if (b == 1) {
955 int len = receive_extralen(ms);
956 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
957 lkb->lkb_lvbseq = ms->m_lvbseq;
958 }
959}
960
961/* Manipulate lkb's on rsb's convert/granted/waiting queues
962 remove_lock -- used for unlock, removes lkb from granted
963 revert_lock -- used for cancel, moves lkb from convert to granted
964 grant_lock -- used for request and convert, adds lkb to granted or
965 moves lkb from convert or waiting to granted
966
967 Each of these is used for master or local copy lkb's. There is
968 also a _pc() variation used to make the corresponding change on
969 a process copy (pc) lkb. */
970
971static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
972{
973 del_lkb(r, lkb);
974 lkb->lkb_grmode = DLM_LOCK_IV;
975 /* this unhold undoes the original ref from create_lkb()
976 so this leads to the lkb being freed */
977 unhold_lkb(lkb);
978}
979
/* Apply the unlock-time lvb handling, then remove the lkb from the rsb. */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);

	_remove_lock(r, lkb);
}
985
/* Process-copy variant of remove_lock(): removes the lkb without any lvb
   handling. */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
990
991static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
992{
993 lkb->lkb_rqmode = DLM_LOCK_IV;
994
995 switch (lkb->lkb_status) {
597d0cae
DT
996 case DLM_LKSTS_GRANTED:
997 break;
e7fd4179
DT
998 case DLM_LKSTS_CONVERT:
999 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1000 break;
1001 case DLM_LKSTS_WAITING:
1002 del_lkb(r, lkb);
1003 lkb->lkb_grmode = DLM_LOCK_IV;
1004 /* this unhold undoes the original ref from create_lkb()
1005 so this leads to the lkb being freed */
1006 unhold_lkb(lkb);
1007 break;
1008 default:
1009 log_print("invalid status for revert %d", lkb->lkb_status);
1010 }
1011}
1012
/* Process-copy variant of revert_lock(); currently identical to it. */

static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	revert_lock(r, lkb);
}
1017
1018static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1019{
1020 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1021 lkb->lkb_grmode = lkb->lkb_rqmode;
1022 if (lkb->lkb_status)
1023 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1024 else
1025 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1026 }
1027
1028 lkb->lkb_rqmode = DLM_LOCK_IV;
e7fd4179
DT
1029}
1030
1031static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1032{
1033 set_lvb_lock(r, lkb);
1034 _grant_lock(r, lkb);
1035 lkb->lkb_highbast = 0;
1036}
1037
/* Grant for a process copy: take the lvb from message "ms", then grant
   the lock. */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);

	_grant_lock(r, lkb);
}
1044
/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node.  Otherwise a completion with status 0 is
   queued. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);

	if (!is_master_copy(lkb)) {
		queue_cast(r, lkb, 0);
		return;
	}

	send_grant(r, lkb);
}
1057
1058static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1059{
1060 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1061 lkb_statequeue);
1062 if (lkb->lkb_id == first->lkb_id)
90135925 1063 return 1;
e7fd4179 1064
90135925 1065 return 0;
e7fd4179
DT
1066}
1067
e7fd4179
DT
1068/* Check if the given lkb conflicts with another lkb on the queue. */
1069
1070static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1071{
1072 struct dlm_lkb *this;
1073
1074 list_for_each_entry(this, head, lkb_statequeue) {
1075 if (this == lkb)
1076 continue;
3bcd3687 1077 if (!modes_compat(this, lkb))
90135925 1078 return 1;
e7fd4179 1079 }
90135925 1080 return 0;
e7fd4179
DT
1081}
1082
1083/*
1084 * "A conversion deadlock arises with a pair of lock requests in the converting
1085 * queue for one resource. The granted mode of each lock blocks the requested
1086 * mode of the other lock."
1087 *
1088 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1089 * convert queue from being granted, then demote lkb (set grmode to NL).
1090 * This second form requires that we check for conv-deadlk even when
1091 * now == 0 in _can_be_granted().
1092 *
1093 * Example:
1094 * Granted Queue: empty
1095 * Convert Queue: NL->EX (first lock)
1096 * PR->EX (second lock)
1097 *
1098 * The first lock can't be granted because of the granted mode of the second
1099 * lock and the second lock can't be granted because it's not first in the
1100 * list. We demote the granted mode of the second lock (the lkb passed to this
1101 * function).
1102 *
1103 * After the resolution, the "grant pending" function needs to go back and try
1104 * to grant locks on the convert queue again since the first lock can now be
1105 * granted.
1106 */
1107
1108static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1109{
1110 struct dlm_lkb *this, *first = NULL, *self = NULL;
1111
1112 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1113 if (!first)
1114 first = this;
1115 if (this == lkb) {
1116 self = lkb;
1117 continue;
1118 }
1119
e7fd4179 1120 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
90135925 1121 return 1;
e7fd4179
DT
1122 }
1123
1124 /* if lkb is on the convert queue and is preventing the first
1125 from being granted, then there's deadlock and we demote lkb.
1126 multiple converting locks may need to do this before the first
1127 converting lock can be granted. */
1128
1129 if (self && self != first) {
1130 if (!modes_compat(lkb, first) &&
1131 !queue_conflict(&rsb->res_grantqueue, first))
90135925 1132 return 1;
e7fd4179
DT
1133 }
1134
90135925 1135 return 0;
e7fd4179
DT
1136}
1137
1138/*
1139 * Return 1 if the lock can be granted, 0 otherwise.
1140 * Also detect and resolve conversion deadlocks.
1141 *
1142 * lkb is the lock to be granted
1143 *
1144 * now is 1 if the function is being called in the context of the
1145 * immediate request, it is 0 if called later, after the lock has been
1146 * queued.
1147 *
1148 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1149 */
1150
1151static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1152{
1153 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1154
1155 /*
1156 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1157 * a new request for a NL mode lock being blocked.
1158 *
1159 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1160 * request, then it would be granted. In essence, the use of this flag
1161 * tells the Lock Manager to expedite theis request by not considering
1162 * what may be in the CONVERTING or WAITING queues... As of this
1163 * writing, the EXPEDITE flag can be used only with new requests for NL
1164 * mode locks. This flag is not valid for conversion requests.
1165 *
1166 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1167 * conversion or used with a non-NL requested mode. We also know an
1168 * EXPEDITE request is always granted immediately, so now must always
1169 * be 1. The full condition to grant an expedite request: (now &&
1170 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1171 * therefore be shortened to just checking the flag.
1172 */
1173
1174 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
90135925 1175 return 1;
e7fd4179
DT
1176
1177 /*
1178 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1179 * added to the remaining conditions.
1180 */
1181
1182 if (queue_conflict(&r->res_grantqueue, lkb))
1183 goto out;
1184
1185 /*
1186 * 6-3: By default, a conversion request is immediately granted if the
1187 * requested mode is compatible with the modes of all other granted
1188 * locks
1189 */
1190
1191 if (queue_conflict(&r->res_convertqueue, lkb))
1192 goto out;
1193
1194 /*
1195 * 6-5: But the default algorithm for deciding whether to grant or
1196 * queue conversion requests does not by itself guarantee that such
1197 * requests are serviced on a "first come first serve" basis. This, in
1198 * turn, can lead to a phenomenon known as "indefinate postponement".
1199 *
1200 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1201 * the system service employed to request a lock conversion. This flag
1202 * forces certain conversion requests to be queued, even if they are
1203 * compatible with the granted modes of other locks on the same
1204 * resource. Thus, the use of this flag results in conversion requests
1205 * being ordered on a "first come first servce" basis.
1206 *
1207 * DCT: This condition is all about new conversions being able to occur
1208 * "in place" while the lock remains on the granted queue (assuming
1209 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1210 * doesn't _have_ to go onto the convert queue where it's processed in
1211 * order. The "now" variable is necessary to distinguish converts
1212 * being received and processed for the first time now, because once a
1213 * convert is moved to the conversion queue the condition below applies
1214 * requiring fifo granting.
1215 */
1216
1217 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
90135925 1218 return 1;
e7fd4179
DT
1219
1220 /*
3bcd3687
DT
1221 * The NOORDER flag is set to avoid the standard vms rules on grant
1222 * order.
e7fd4179
DT
1223 */
1224
1225 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
90135925 1226 return 1;
e7fd4179
DT
1227
1228 /*
1229 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1230 * granted until all other conversion requests ahead of it are granted
1231 * and/or canceled.
1232 */
1233
1234 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
90135925 1235 return 1;
e7fd4179
DT
1236
1237 /*
1238 * 6-4: By default, a new request is immediately granted only if all
1239 * three of the following conditions are satisfied when the request is
1240 * issued:
1241 * - The queue of ungranted conversion requests for the resource is
1242 * empty.
1243 * - The queue of ungranted new requests for the resource is empty.
1244 * - The mode of the new request is compatible with the most
1245 * restrictive mode of all granted locks on the resource.
1246 */
1247
1248 if (now && !conv && list_empty(&r->res_convertqueue) &&
1249 list_empty(&r->res_waitqueue))
90135925 1250 return 1;
e7fd4179
DT
1251
1252 /*
1253 * 6-4: Once a lock request is in the queue of ungranted new requests,
1254 * it cannot be granted until the queue of ungranted conversion
1255 * requests is empty, all ungranted new requests ahead of it are
1256 * granted and/or canceled, and it is compatible with the granted mode
1257 * of the most restrictive lock granted on the resource.
1258 */
1259
1260 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1261 first_in_list(lkb, &r->res_waitqueue))
90135925 1262 return 1;
e7fd4179
DT
1263
1264 out:
1265 /*
1266 * The following, enabled by CONVDEADLK, departs from VMS.
1267 */
1268
1269 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1270 conversion_deadlock_detect(r, lkb)) {
1271 lkb->lkb_grmode = DLM_LOCK_NL;
1272 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1273 }
1274
90135925 1275 return 0;
e7fd4179
DT
1276}
1277
1278/*
1279 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1280 * simple way to provide a big optimization to applications that can use them.
1281 */
1282
1283static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1284{
1285 uint32_t flags = lkb->lkb_exflags;
1286 int rv;
1287 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1288
1289 rv = _can_be_granted(r, lkb, now);
1290 if (rv)
1291 goto out;
1292
1293 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1294 goto out;
1295
1296 if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1297 alt = DLM_LOCK_PR;
1298 else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1299 alt = DLM_LOCK_CW;
1300
1301 if (alt) {
1302 lkb->lkb_rqmode = alt;
1303 rv = _can_be_granted(r, lkb, now);
1304 if (rv)
1305 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1306 else
1307 lkb->lkb_rqmode = rqmode;
1308 }
1309 out:
1310 return rv;
1311}
1312
1313static int grant_pending_convert(struct dlm_rsb *r, int high)
1314{
1315 struct dlm_lkb *lkb, *s;
1316 int hi, demoted, quit, grant_restart, demote_restart;
1317
1318 quit = 0;
1319 restart:
1320 grant_restart = 0;
1321 demote_restart = 0;
1322 hi = DLM_LOCK_IV;
1323
1324 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1325 demoted = is_demoted(lkb);
90135925 1326 if (can_be_granted(r, lkb, 0)) {
e7fd4179
DT
1327 grant_lock_pending(r, lkb);
1328 grant_restart = 1;
1329 } else {
1330 hi = max_t(int, lkb->lkb_rqmode, hi);
1331 if (!demoted && is_demoted(lkb))
1332 demote_restart = 1;
1333 }
1334 }
1335
1336 if (grant_restart)
1337 goto restart;
1338 if (demote_restart && !quit) {
1339 quit = 1;
1340 goto restart;
1341 }
1342
1343 return max_t(int, high, hi);
1344}
1345
1346static int grant_pending_wait(struct dlm_rsb *r, int high)
1347{
1348 struct dlm_lkb *lkb, *s;
1349
1350 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
90135925 1351 if (can_be_granted(r, lkb, 0))
e7fd4179
DT
1352 grant_lock_pending(r, lkb);
1353 else
1354 high = max_t(int, lkb->lkb_rqmode, high);
1355 }
1356
1357 return high;
1358}
1359
1360static void grant_pending_locks(struct dlm_rsb *r)
1361{
1362 struct dlm_lkb *lkb, *s;
1363 int high = DLM_LOCK_IV;
1364
a345da3e 1365 DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
e7fd4179
DT
1366
1367 high = grant_pending_convert(r, high);
1368 high = grant_pending_wait(r, high);
1369
1370 if (high == DLM_LOCK_IV)
1371 return;
1372
1373 /*
1374 * If there are locks left on the wait/convert queue then send blocking
1375 * ASTs to granted locks based on the largest requested mode (high)
3bcd3687 1376 * found above. FIXME: highbast < high comparison not valid for PR/CW.
e7fd4179
DT
1377 */
1378
1379 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1380 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1381 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1382 queue_bast(r, lkb, high);
1383 lkb->lkb_highbast = high;
1384 }
1385 }
1386}
1387
1388static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1389 struct dlm_lkb *lkb)
1390{
1391 struct dlm_lkb *gr;
1392
1393 list_for_each_entry(gr, head, lkb_statequeue) {
1394 if (gr->lkb_bastaddr &&
1395 gr->lkb_highbast < lkb->lkb_rqmode &&
3bcd3687 1396 !modes_compat(gr, lkb)) {
e7fd4179
DT
1397 queue_bast(r, gr, lkb->lkb_rqmode);
1398 gr->lkb_highbast = lkb->lkb_rqmode;
1399 }
1400 }
1401}
1402
1403static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1404{
1405 send_bast_queue(r, &r->res_grantqueue, lkb);
1406}
1407
1408static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1409{
1410 send_bast_queue(r, &r->res_grantqueue, lkb);
1411 send_bast_queue(r, &r->res_convertqueue, lkb);
1412}
1413
1414/* set_master(r, lkb) -- set the master nodeid of a resource
1415
1416 The purpose of this function is to set the nodeid field in the given
1417 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1418 known, it can just be copied to the lkb and the function will return
1419 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1420 before it can be copied to the lkb.
1421
1422 When the rsb nodeid is being looked up remotely, the initial lkb
1423 causing the lookup is kept on the ls_waiters list waiting for the
1424 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1425 on the rsb's res_lookup list until the master is verified.
1426
1427 Return values:
1428 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1429 1: the rsb master is not available and the lkb has been placed on
1430 a wait queue
1431*/
1432
1433static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1434{
1435 struct dlm_ls *ls = r->res_ls;
1436 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1437
1438 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1439 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1440 r->res_first_lkid = lkb->lkb_id;
1441 lkb->lkb_nodeid = r->res_nodeid;
1442 return 0;
1443 }
1444
1445 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1446 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1447 return 1;
1448 }
1449
1450 if (r->res_nodeid == 0) {
1451 lkb->lkb_nodeid = 0;
1452 return 0;
1453 }
1454
1455 if (r->res_nodeid > 0) {
1456 lkb->lkb_nodeid = r->res_nodeid;
1457 return 0;
1458 }
1459
a345da3e 1460 DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
e7fd4179
DT
1461
1462 dir_nodeid = dlm_dir_nodeid(r);
1463
1464 if (dir_nodeid != our_nodeid) {
1465 r->res_first_lkid = lkb->lkb_id;
1466 send_lookup(r, lkb);
1467 return 1;
1468 }
1469
1470 for (;;) {
1471 /* It's possible for dlm_scand to remove an old rsb for
1472 this same resource from the toss list, us to create
1473 a new one, look up the master locally, and find it
1474 already exists just before dlm_scand does the
1475 dir_remove() on the previous rsb. */
1476
1477 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1478 r->res_length, &ret_nodeid);
1479 if (!error)
1480 break;
1481 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1482 schedule();
1483 }
1484
1485 if (ret_nodeid == our_nodeid) {
1486 r->res_first_lkid = 0;
1487 r->res_nodeid = 0;
1488 lkb->lkb_nodeid = 0;
1489 } else {
1490 r->res_first_lkid = lkb->lkb_id;
1491 r->res_nodeid = ret_nodeid;
1492 lkb->lkb_nodeid = ret_nodeid;
1493 }
1494 return 0;
1495}
1496
1497static void process_lookup_list(struct dlm_rsb *r)
1498{
1499 struct dlm_lkb *lkb, *safe;
1500
1501 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1502 list_del(&lkb->lkb_rsb_lookup);
1503 _request_lock(r, lkb);
1504 schedule();
1505 }
1506}
1507
1508/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1509
1510static void confirm_master(struct dlm_rsb *r, int error)
1511{
1512 struct dlm_lkb *lkb;
1513
1514 if (!r->res_first_lkid)
1515 return;
1516
1517 switch (error) {
1518 case 0:
1519 case -EINPROGRESS:
1520 r->res_first_lkid = 0;
1521 process_lookup_list(r);
1522 break;
1523
1524 case -EAGAIN:
1525 /* the remote master didn't queue our NOQUEUE request;
1526 make a waiting lkb the first_lkid */
1527
1528 r->res_first_lkid = 0;
1529
1530 if (!list_empty(&r->res_lookup)) {
1531 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1532 lkb_rsb_lookup);
1533 list_del(&lkb->lkb_rsb_lookup);
1534 r->res_first_lkid = lkb->lkb_id;
1535 _request_lock(r, lkb);
1536 } else
1537 r->res_nodeid = -1;
1538 break;
1539
1540 default:
1541 log_error(r->res_ls, "confirm_master unknown error %d", error);
1542 }
1543}
1544
1545static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1546 int namelen, uint32_t parent_lkid, void *ast,
3bcd3687 1547 void *astarg, void *bast, struct dlm_args *args)
e7fd4179
DT
1548{
1549 int rv = -EINVAL;
1550
1551 /* check for invalid arg usage */
1552
1553 if (mode < 0 || mode > DLM_LOCK_EX)
1554 goto out;
1555
1556 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1557 goto out;
1558
1559 if (flags & DLM_LKF_CANCEL)
1560 goto out;
1561
1562 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1563 goto out;
1564
1565 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1566 goto out;
1567
1568 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1569 goto out;
1570
1571 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1572 goto out;
1573
1574 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1575 goto out;
1576
1577 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1578 goto out;
1579
1580 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1581 goto out;
1582
1583 if (!ast || !lksb)
1584 goto out;
1585
1586 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1587 goto out;
1588
1589 /* parent/child locks not yet supported */
1590 if (parent_lkid)
1591 goto out;
1592
1593 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1594 goto out;
1595
1596 /* these args will be copied to the lkb in validate_lock_args,
1597 it cannot be done now because when converting locks, fields in
1598 an active lkb cannot be modified before locking the rsb */
1599
1600 args->flags = flags;
1601 args->astaddr = ast;
1602 args->astparam = (long) astarg;
1603 args->bastaddr = bast;
1604 args->mode = mode;
1605 args->lksb = lksb;
e7fd4179
DT
1606 rv = 0;
1607 out:
1608 return rv;
1609}
1610
1611static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1612{
1613 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1614 DLM_LKF_FORCEUNLOCK))
1615 return -EINVAL;
1616
1617 args->flags = flags;
1618 args->astparam = (long) astarg;
1619 return 0;
1620}
1621
1622static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1623 struct dlm_args *args)
1624{
1625 int rv = -EINVAL;
1626
1627 if (args->flags & DLM_LKF_CONVERT) {
1628 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1629 goto out;
1630
1631 if (args->flags & DLM_LKF_QUECVT &&
1632 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1633 goto out;
1634
1635 rv = -EBUSY;
1636 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1637 goto out;
1638
1639 if (lkb->lkb_wait_type)
1640 goto out;
1641 }
1642
1643 lkb->lkb_exflags = args->flags;
1644 lkb->lkb_sbflags = 0;
1645 lkb->lkb_astaddr = args->astaddr;
1646 lkb->lkb_astparam = args->astparam;
1647 lkb->lkb_bastaddr = args->bastaddr;
1648 lkb->lkb_rqmode = args->mode;
1649 lkb->lkb_lksb = args->lksb;
1650 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1651 lkb->lkb_ownpid = (int) current->pid;
e7fd4179
DT
1652 rv = 0;
1653 out:
1654 return rv;
1655}
1656
1657static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1658{
1659 int rv = -EINVAL;
1660
1661 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1662 goto out;
1663
1664 if (args->flags & DLM_LKF_FORCEUNLOCK)
1665 goto out_ok;
1666
1667 if (args->flags & DLM_LKF_CANCEL &&
1668 lkb->lkb_status == DLM_LKSTS_GRANTED)
1669 goto out;
1670
1671 if (!(args->flags & DLM_LKF_CANCEL) &&
1672 lkb->lkb_status != DLM_LKSTS_GRANTED)
1673 goto out;
1674
1675 rv = -EBUSY;
1676 if (lkb->lkb_wait_type)
1677 goto out;
1678
1679 out_ok:
1680 lkb->lkb_exflags = args->flags;
1681 lkb->lkb_sbflags = 0;
1682 lkb->lkb_astparam = args->astparam;
1683
1684 rv = 0;
1685 out:
1686 return rv;
1687}
1688
1689/*
1690 * Four stage 4 varieties:
1691 * do_request(), do_convert(), do_unlock(), do_cancel()
1692 * These are called on the master node for the given lock and
1693 * from the central locking logic.
1694 */
1695
1696static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1697{
1698 int error = 0;
1699
90135925 1700 if (can_be_granted(r, lkb, 1)) {
e7fd4179
DT
1701 grant_lock(r, lkb);
1702 queue_cast(r, lkb, 0);
1703 goto out;
1704 }
1705
1706 if (can_be_queued(lkb)) {
1707 error = -EINPROGRESS;
1708 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1709 send_blocking_asts(r, lkb);
1710 goto out;
1711 }
1712
1713 error = -EAGAIN;
1714 if (force_blocking_asts(lkb))
1715 send_blocking_asts_all(r, lkb);
1716 queue_cast(r, lkb, -EAGAIN);
1717
1718 out:
1719 return error;
1720}
1721
1722static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1723{
1724 int error = 0;
1725
1726 /* changing an existing lock may allow others to be granted */
1727
90135925 1728 if (can_be_granted(r, lkb, 1)) {
e7fd4179
DT
1729 grant_lock(r, lkb);
1730 queue_cast(r, lkb, 0);
1731 grant_pending_locks(r);
1732 goto out;
1733 }
1734
1735 if (can_be_queued(lkb)) {
1736 if (is_demoted(lkb))
1737 grant_pending_locks(r);
1738 error = -EINPROGRESS;
1739 del_lkb(r, lkb);
1740 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1741 send_blocking_asts(r, lkb);
1742 goto out;
1743 }
1744
1745 error = -EAGAIN;
1746 if (force_blocking_asts(lkb))
1747 send_blocking_asts_all(r, lkb);
1748 queue_cast(r, lkb, -EAGAIN);
1749
1750 out:
1751 return error;
1752}
1753
1754static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1755{
1756 remove_lock(r, lkb);
1757 queue_cast(r, lkb, -DLM_EUNLOCK);
1758 grant_pending_locks(r);
1759 return -DLM_EUNLOCK;
1760}
1761
597d0cae
DT
1762/* FIXME: if revert_lock() finds that the lkb is granted, we should
1763 skip the queue_cast(ECANCEL). It indicates that the request/convert
1764 completed (and queued a normal ast) just before the cancel; we don't
1765 want to clobber the sb_result for the normal ast with ECANCEL. */
907b9bce 1766
e7fd4179
DT
1767static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1768{
1769 revert_lock(r, lkb);
1770 queue_cast(r, lkb, -DLM_ECANCEL);
1771 grant_pending_locks(r);
1772 return -DLM_ECANCEL;
1773}
1774
1775/*
1776 * Four stage 3 varieties:
1777 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1778 */
1779
1780/* add a new lkb to a possibly new rsb, called by requesting process */
1781
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* set_master: sets lkb nodeid from r */
	rv = set_master(r, lkb);

	/* negative: pass the error on.  positive: the lkb has been left
	   waiting for the master to be resolved, which is not an error
	   for the caller. */
	if (rv)
		return (rv < 0) ? rv : 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	return do_request(r, lkb);
}
1804
3bcd3687 1805/* change some property of an existing lkb, e.g. mode */
e7fd4179
DT
1806
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* local master: run the conversion here */
	if (!is_remote(r))
		return do_convert(r, lkb);

	/* receive_convert() calls do_convert() on remote node */
	return send_convert(r, lkb);
}
1819
1820/* remove an existing lkb from the granted queue */
1821
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* local master: run the unlock here */
	if (!is_remote(r))
		return do_unlock(r, lkb);

	/* receive_unlock() calls do_unlock() on remote node */
	return send_unlock(r, lkb);
}
1834
1835/* remove an existing lkb from the convert or wait queue */
1836
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* local master: run the cancel here */
	if (!is_remote(r))
		return do_cancel(r, lkb);

	/* receive_cancel() calls do_cancel() on remote node */
	return send_cancel(r, lkb);
}
1849
1850/*
1851 * Four stage 2 varieties:
1852 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1853 */
1854
1855static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1856 int len, struct dlm_args *args)
1857{
1858 struct dlm_rsb *r;
1859 int error;
1860
1861 error = validate_lock_args(ls, lkb, args);
1862 if (error)
1863 goto out;
1864
1865 error = find_rsb(ls, name, len, R_CREATE, &r);
1866 if (error)
1867 goto out;
1868
1869 lock_rsb(r);
1870
1871 attach_lkb(r, lkb);
1872 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1873
1874 error = _request_lock(r, lkb);
1875
1876 unlock_rsb(r);
1877 put_rsb(r);
1878
1879 out:
1880 return error;
1881}
1882
1883static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1884 struct dlm_args *args)
1885{
1886 struct dlm_rsb *r;
1887 int error;
1888
1889 r = lkb->lkb_resource;
1890
1891 hold_rsb(r);
1892 lock_rsb(r);
1893
1894 error = validate_lock_args(ls, lkb, args);
1895 if (error)
1896 goto out;
1897
1898 error = _convert_lock(r, lkb);
1899 out:
1900 unlock_rsb(r);
1901 put_rsb(r);
1902 return error;
1903}
1904
1905static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1906 struct dlm_args *args)
1907{
1908 struct dlm_rsb *r;
1909 int error;
1910
1911 r = lkb->lkb_resource;
1912
1913 hold_rsb(r);
1914 lock_rsb(r);
1915
1916 error = validate_unlock_args(lkb, args);
1917 if (error)
1918 goto out;
1919
1920 error = _unlock_lock(r, lkb);
1921 out:
1922 unlock_rsb(r);
1923 put_rsb(r);
1924 return error;
1925}
1926
1927static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1928 struct dlm_args *args)
1929{
1930 struct dlm_rsb *r;
1931 int error;
1932
1933 r = lkb->lkb_resource;
1934
1935 hold_rsb(r);
1936 lock_rsb(r);
1937
1938 error = validate_unlock_args(lkb, args);
1939 if (error)
1940 goto out;
1941
1942 error = _cancel_lock(r, lkb);
1943 out:
1944 unlock_rsb(r);
1945 put_rsb(r);
1946 return error;
1947}
1948
1949/*
1950 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
1951 */
1952
1953int dlm_lock(dlm_lockspace_t *lockspace,
1954 int mode,
1955 struct dlm_lksb *lksb,
1956 uint32_t flags,
1957 void *name,
1958 unsigned int namelen,
1959 uint32_t parent_lkid,
1960 void (*ast) (void *astarg),
1961 void *astarg,
3bcd3687 1962 void (*bast) (void *astarg, int mode))
e7fd4179
DT
1963{
1964 struct dlm_ls *ls;
1965 struct dlm_lkb *lkb;
1966 struct dlm_args args;
1967 int error, convert = flags & DLM_LKF_CONVERT;
1968
1969 ls = dlm_find_lockspace_local(lockspace);
1970 if (!ls)
1971 return -EINVAL;
1972
1973 lock_recovery(ls);
1974
1975 if (convert)
1976 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1977 else
1978 error = create_lkb(ls, &lkb);
1979
1980 if (error)
1981 goto out;
1982
1983 error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
3bcd3687 1984 astarg, bast, &args);
e7fd4179
DT
1985 if (error)
1986 goto out_put;
1987
1988 if (convert)
1989 error = convert_lock(ls, lkb, &args);
1990 else
1991 error = request_lock(ls, lkb, name, namelen, &args);
1992
1993 if (error == -EINPROGRESS)
1994 error = 0;
1995 out_put:
1996 if (convert || error)
b3f58d8f 1997 __put_lkb(ls, lkb);
e7fd4179
DT
1998 if (error == -EAGAIN)
1999 error = 0;
2000 out:
2001 unlock_recovery(ls);
2002 dlm_put_lockspace(ls);
2003 return error;
2004}
2005
2006int dlm_unlock(dlm_lockspace_t *lockspace,
2007 uint32_t lkid,
2008 uint32_t flags,
2009 struct dlm_lksb *lksb,
2010 void *astarg)
2011{
2012 struct dlm_ls *ls;
2013 struct dlm_lkb *lkb;
2014 struct dlm_args args;
2015 int error;
2016
2017 ls = dlm_find_lockspace_local(lockspace);
2018 if (!ls)
2019 return -EINVAL;
2020
2021 lock_recovery(ls);
2022
2023 error = find_lkb(ls, lkid, &lkb);
2024 if (error)
2025 goto out;
2026
2027 error = set_unlock_args(flags, astarg, &args);
2028 if (error)
2029 goto out_put;
2030
2031 if (flags & DLM_LKF_CANCEL)
2032 error = cancel_lock(ls, lkb, &args);
2033 else
2034 error = unlock_lock(ls, lkb, &args);
2035
2036 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2037 error = 0;
2038 out_put:
b3f58d8f 2039 dlm_put_lkb(lkb);
e7fd4179
DT
2040 out:
2041 unlock_recovery(ls);
2042 dlm_put_lockspace(ls);
2043 return error;
2044}
2045
2046/*
2047 * send/receive routines for remote operations and replies
2048 *
2049 * send_args
2050 * send_common
2051 * send_request receive_request
2052 * send_convert receive_convert
2053 * send_unlock receive_unlock
2054 * send_cancel receive_cancel
2055 * send_grant receive_grant
2056 * send_bast receive_bast
2057 * send_lookup receive_lookup
2058 * send_remove receive_remove
2059 *
2060 * send_common_reply
2061 * receive_request_reply send_request_reply
2062 * receive_convert_reply send_convert_reply
2063 * receive_unlock_reply send_unlock_reply
2064 * receive_cancel_reply send_cancel_reply
2065 * receive_lookup_reply send_lookup_reply
2066 */
2067
2068static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2069 int to_nodeid, int mstype,
2070 struct dlm_message **ms_ret,
2071 struct dlm_mhandle **mh_ret)
2072{
2073 struct dlm_message *ms;
2074 struct dlm_mhandle *mh;
2075 char *mb;
2076 int mb_len = sizeof(struct dlm_message);
2077
2078 switch (mstype) {
2079 case DLM_MSG_REQUEST:
2080 case DLM_MSG_LOOKUP:
2081 case DLM_MSG_REMOVE:
2082 mb_len += r->res_length;
2083 break;
2084 case DLM_MSG_CONVERT:
2085 case DLM_MSG_UNLOCK:
2086 case DLM_MSG_REQUEST_REPLY:
2087 case DLM_MSG_CONVERT_REPLY:
2088 case DLM_MSG_GRANT:
2089 if (lkb && lkb->lkb_lvbptr)
2090 mb_len += r->res_ls->ls_lvblen;
2091 break;
2092 }
2093
2094 /* get_buffer gives us a message handle (mh) that we need to
2095 pass into lowcomms_commit and a message buffer (mb) that we
2096 write our data into */
2097
2098 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2099 if (!mh)
2100 return -ENOBUFS;
2101
2102 memset(mb, 0, mb_len);
2103
2104 ms = (struct dlm_message *) mb;
2105
2106 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2107 ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2108 ms->m_header.h_nodeid = dlm_our_nodeid();
2109 ms->m_header.h_length = mb_len;
2110 ms->m_header.h_cmd = DLM_MSG;
2111
2112 ms->m_type = mstype;
2113
2114 *mh_ret = mh;
2115 *ms_ret = ms;
2116 return 0;
2117}
2118
2119/* further lowcomms enhancements or alternate implementations may make
2120 the return value from this function useful at some point */
2121
/* Run the message through dlm_message_out() and commit its buffer to
   lowcomms.  Currently always returns 0. */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	const int rv = 0;

	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);

	return rv;
}
2128
2129static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2130 struct dlm_message *ms)
2131{
2132 ms->m_nodeid = lkb->lkb_nodeid;
2133 ms->m_pid = lkb->lkb_ownpid;
2134 ms->m_lkid = lkb->lkb_id;
2135 ms->m_remid = lkb->lkb_remid;
2136 ms->m_exflags = lkb->lkb_exflags;
2137 ms->m_sbflags = lkb->lkb_sbflags;
2138 ms->m_flags = lkb->lkb_flags;
2139 ms->m_lvbseq = lkb->lkb_lvbseq;
2140 ms->m_status = lkb->lkb_status;
2141 ms->m_grmode = lkb->lkb_grmode;
2142 ms->m_rqmode = lkb->lkb_rqmode;
2143 ms->m_hash = r->res_hash;
2144
2145 /* m_result and m_bastmode are set from function args,
2146 not from lkb fields */
2147
2148 if (lkb->lkb_bastaddr)
2149 ms->m_asts |= AST_BAST;
2150 if (lkb->lkb_astaddr)
2151 ms->m_asts |= AST_COMP;
2152
da49f36f
DT
2153 /* compare with switch in create_message; send_remove() doesn't
2154 use send_args() */
e7fd4179 2155
da49f36f
DT
2156 switch (ms->m_type) {
2157 case DLM_MSG_REQUEST:
2158 case DLM_MSG_LOOKUP:
2159 memcpy(ms->m_extra, r->res_name, r->res_length);
2160 break;
2161 case DLM_MSG_CONVERT:
2162 case DLM_MSG_UNLOCK:
2163 case DLM_MSG_REQUEST_REPLY:
2164 case DLM_MSG_CONVERT_REPLY:
2165 case DLM_MSG_GRANT:
2166 if (!lkb->lkb_lvbptr)
2167 break;
e7fd4179 2168 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
da49f36f
DT
2169 break;
2170 }
e7fd4179
DT
2171}
2172
2173static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2174{
2175 struct dlm_message *ms;
2176 struct dlm_mhandle *mh;
2177 int to_nodeid, error;
2178
2179 add_to_waiters(lkb, mstype);
2180
2181 to_nodeid = r->res_nodeid;
2182
2183 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2184 if (error)
2185 goto fail;
2186
2187 send_args(r, lkb, ms);
2188
2189 error = send_message(mh, ms);
2190 if (error)
2191 goto fail;
2192 return 0;
2193
2194 fail:
2195 remove_from_waiters(lkb);
2196 return error;
2197}
2198
2199static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2200{
2201 return send_common(r, lkb, DLM_MSG_REQUEST);
2202}
2203
2204static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2205{
2206 int error;
2207
2208 error = send_common(r, lkb, DLM_MSG_CONVERT);
2209
2210 /* down conversions go without a reply from the master */
2211 if (!error && down_conversion(lkb)) {
2212 remove_from_waiters(lkb);
2213 r->res_ls->ls_stub_ms.m_result = 0;
32f105a1 2214 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
e7fd4179
DT
2215 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2216 }
2217
2218 return error;
2219}
2220
2221/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2222 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2223 that the master is still correct. */
2224
2225static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2226{
2227 return send_common(r, lkb, DLM_MSG_UNLOCK);
2228}
2229
2230static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2231{
2232 return send_common(r, lkb, DLM_MSG_CANCEL);
2233}
2234
2235static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2236{
2237 struct dlm_message *ms;
2238 struct dlm_mhandle *mh;
2239 int to_nodeid, error;
2240
2241 to_nodeid = lkb->lkb_nodeid;
2242
2243 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2244 if (error)
2245 goto out;
2246
2247 send_args(r, lkb, ms);
2248
2249 ms->m_result = 0;
2250
2251 error = send_message(mh, ms);
2252 out:
2253 return error;
2254}
2255
2256static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2257{
2258 struct dlm_message *ms;
2259 struct dlm_mhandle *mh;
2260 int to_nodeid, error;
2261
2262 to_nodeid = lkb->lkb_nodeid;
2263
2264 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2265 if (error)
2266 goto out;
2267
2268 send_args(r, lkb, ms);
2269
2270 ms->m_bastmode = mode;
2271
2272 error = send_message(mh, ms);
2273 out:
2274 return error;
2275}
2276
2277static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2278{
2279 struct dlm_message *ms;
2280 struct dlm_mhandle *mh;
2281 int to_nodeid, error;
2282
2283 add_to_waiters(lkb, DLM_MSG_LOOKUP);
2284
2285 to_nodeid = dlm_dir_nodeid(r);
2286
2287 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2288 if (error)
2289 goto fail;
2290
2291 send_args(r, lkb, ms);
2292
2293 error = send_message(mh, ms);
2294 if (error)
2295 goto fail;
2296 return 0;
2297
2298 fail:
2299 remove_from_waiters(lkb);
2300 return error;
2301}
2302
2303static int send_remove(struct dlm_rsb *r)
2304{
2305 struct dlm_message *ms;
2306 struct dlm_mhandle *mh;
2307 int to_nodeid, error;
2308
2309 to_nodeid = dlm_dir_nodeid(r);
2310
2311 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2312 if (error)
2313 goto out;
2314
2315 memcpy(ms->m_extra, r->res_name, r->res_length);
2316 ms->m_hash = r->res_hash;
2317
2318 error = send_message(mh, ms);
2319 out:
2320 return error;
2321}
2322
2323static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2324 int mstype, int rv)
2325{
2326 struct dlm_message *ms;
2327 struct dlm_mhandle *mh;
2328 int to_nodeid, error;
2329
2330 to_nodeid = lkb->lkb_nodeid;
2331
2332 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2333 if (error)
2334 goto out;
2335
2336 send_args(r, lkb, ms);
2337
2338 ms->m_result = rv;
2339
2340 error = send_message(mh, ms);
2341 out:
2342 return error;
2343}
2344
2345static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2346{
2347 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2348}
2349
2350static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2351{
2352 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2353}
2354
2355static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2356{
2357 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2358}
2359
2360static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2361{
2362 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2363}
2364
2365static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2366 int ret_nodeid, int rv)
2367{
2368 struct dlm_rsb *r = &ls->ls_stub_rsb;
2369 struct dlm_message *ms;
2370 struct dlm_mhandle *mh;
2371 int error, nodeid = ms_in->m_header.h_nodeid;
2372
2373 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2374 if (error)
2375 goto out;
2376
2377 ms->m_lkid = ms_in->m_lkid;
2378 ms->m_result = rv;
2379 ms->m_nodeid = ret_nodeid;
2380
2381 error = send_message(mh, ms);
2382 out:
2383 return error;
2384}
2385
2386/* which args we save from a received message depends heavily on the type
2387 of message, unlike the send side where we can safely send everything about
2388 the lkb for any type of message */
2389
2390static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2391{
2392 lkb->lkb_exflags = ms->m_exflags;
6f90a8b1 2393 lkb->lkb_sbflags = ms->m_sbflags;
e7fd4179
DT
2394 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2395 (ms->m_flags & 0x0000FFFF);
2396}
2397
2398static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2399{
2400 lkb->lkb_sbflags = ms->m_sbflags;
2401 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2402 (ms->m_flags & 0x0000FFFF);
2403}
2404
2405static int receive_extralen(struct dlm_message *ms)
2406{
2407 return (ms->m_header.h_length - sizeof(struct dlm_message));
2408}
2409
e7fd4179
DT
2410static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2411 struct dlm_message *ms)
2412{
2413 int len;
2414
2415 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2416 if (!lkb->lkb_lvbptr)
2417 lkb->lkb_lvbptr = allocate_lvb(ls);
2418 if (!lkb->lkb_lvbptr)
2419 return -ENOMEM;
2420 len = receive_extralen(ms);
2421 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2422 }
2423 return 0;
2424}
2425
2426static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2427 struct dlm_message *ms)
2428{
2429 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2430 lkb->lkb_ownpid = ms->m_pid;
2431 lkb->lkb_remid = ms->m_lkid;
2432 lkb->lkb_grmode = DLM_LOCK_IV;
2433 lkb->lkb_rqmode = ms->m_rqmode;
2434 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2435 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2436
2437 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2438
8d07fd50
DT
2439 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2440 /* lkb was just created so there won't be an lvb yet */
2441 lkb->lkb_lvbptr = allocate_lvb(ls);
2442 if (!lkb->lkb_lvbptr)
2443 return -ENOMEM;
2444 }
e7fd4179
DT
2445
2446 return 0;
2447}
2448
2449static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2450 struct dlm_message *ms)
2451{
2452 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2453 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2454 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2455 lkb->lkb_id, lkb->lkb_remid);
2456 return -EINVAL;
2457 }
2458
2459 if (!is_master_copy(lkb))
2460 return -EINVAL;
2461
2462 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2463 return -EBUSY;
2464
e7fd4179
DT
2465 if (receive_lvb(ls, lkb, ms))
2466 return -ENOMEM;
2467
2468 lkb->lkb_rqmode = ms->m_rqmode;
2469 lkb->lkb_lvbseq = ms->m_lvbseq;
2470
2471 return 0;
2472}
2473
/* Validate an unlock message against the lkb it names and take the lvb
   from it.  Returns 0 on success, -EINVAL if the lkb is not a master copy,
   -ENOMEM if receive_lvb() fails. */

static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	if (!is_master_copy(lkb))
		return -EINVAL;

	return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
}
2483
2484/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2485 uses to send a reply and that the remote end uses to process the reply. */
2486
2487static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2488{
2489 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2490 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2491 lkb->lkb_remid = ms->m_lkid;
2492}
2493
2494static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2495{
2496 struct dlm_lkb *lkb;
2497 struct dlm_rsb *r;
2498 int error, namelen;
2499
2500 error = create_lkb(ls, &lkb);
2501 if (error)
2502 goto fail;
2503
2504 receive_flags(lkb, ms);
2505 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2506 error = receive_request_args(ls, lkb, ms);
2507 if (error) {
b3f58d8f 2508 __put_lkb(ls, lkb);
e7fd4179
DT
2509 goto fail;
2510 }
2511
2512 namelen = receive_extralen(ms);
2513
2514 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2515 if (error) {
b3f58d8f 2516 __put_lkb(ls, lkb);
e7fd4179
DT
2517 goto fail;
2518 }
2519
2520 lock_rsb(r);
2521
2522 attach_lkb(r, lkb);
2523 error = do_request(r, lkb);
2524 send_request_reply(r, lkb, error);
2525
2526 unlock_rsb(r);
2527 put_rsb(r);
2528
2529 if (error == -EINPROGRESS)
2530 error = 0;
2531 if (error)
b3f58d8f 2532 dlm_put_lkb(lkb);
e7fd4179
DT
2533 return;
2534
2535 fail:
2536 setup_stub_lkb(ls, ms);
2537 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2538}
2539
2540static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2541{
2542 struct dlm_lkb *lkb;
2543 struct dlm_rsb *r;
90135925 2544 int error, reply = 1;
e7fd4179
DT
2545
2546 error = find_lkb(ls, ms->m_remid, &lkb);
2547 if (error)
2548 goto fail;
2549
2550 r = lkb->lkb_resource;
2551
2552 hold_rsb(r);
2553 lock_rsb(r);
2554
2555 receive_flags(lkb, ms);
2556 error = receive_convert_args(ls, lkb, ms);
2557 if (error)
2558 goto out;
2559 reply = !down_conversion(lkb);
2560
2561 error = do_convert(r, lkb);
2562 out:
2563 if (reply)
2564 send_convert_reply(r, lkb, error);
2565
2566 unlock_rsb(r);
2567 put_rsb(r);
b3f58d8f 2568 dlm_put_lkb(lkb);
e7fd4179
DT
2569 return;
2570
2571 fail:
2572 setup_stub_lkb(ls, ms);
2573 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2574}
2575
2576static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2577{
2578 struct dlm_lkb *lkb;
2579 struct dlm_rsb *r;
2580 int error;
2581
2582 error = find_lkb(ls, ms->m_remid, &lkb);
2583 if (error)
2584 goto fail;
2585
2586 r = lkb->lkb_resource;
2587
2588 hold_rsb(r);
2589 lock_rsb(r);
2590
2591 receive_flags(lkb, ms);
2592 error = receive_unlock_args(ls, lkb, ms);
2593 if (error)
2594 goto out;
2595
2596 error = do_unlock(r, lkb);
2597 out:
2598 send_unlock_reply(r, lkb, error);
2599
2600 unlock_rsb(r);
2601 put_rsb(r);
b3f58d8f 2602 dlm_put_lkb(lkb);
e7fd4179
DT
2603 return;
2604
2605 fail:
2606 setup_stub_lkb(ls, ms);
2607 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2608}
2609
2610static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2611{
2612 struct dlm_lkb *lkb;
2613 struct dlm_rsb *r;
2614 int error;
2615
2616 error = find_lkb(ls, ms->m_remid, &lkb);
2617 if (error)
2618 goto fail;
2619
2620 receive_flags(lkb, ms);
2621
2622 r = lkb->lkb_resource;
2623
2624 hold_rsb(r);
2625 lock_rsb(r);
2626
2627 error = do_cancel(r, lkb);
2628 send_cancel_reply(r, lkb, error);
2629
2630 unlock_rsb(r);
2631 put_rsb(r);
b3f58d8f 2632 dlm_put_lkb(lkb);
e7fd4179
DT
2633 return;
2634
2635 fail:
2636 setup_stub_lkb(ls, ms);
2637 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2638}
2639
2640static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2641{
2642 struct dlm_lkb *lkb;
2643 struct dlm_rsb *r;
2644 int error;
2645
2646 error = find_lkb(ls, ms->m_remid, &lkb);
2647 if (error) {
2648 log_error(ls, "receive_grant no lkb");
2649 return;
2650 }
2651 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2652
2653 r = lkb->lkb_resource;
2654
2655 hold_rsb(r);
2656 lock_rsb(r);
2657
2658 receive_flags_reply(lkb, ms);
2659 grant_lock_pc(r, lkb, ms);
2660 queue_cast(r, lkb, 0);
2661
2662 unlock_rsb(r);
2663 put_rsb(r);
b3f58d8f 2664 dlm_put_lkb(lkb);
e7fd4179
DT
2665}
2666
2667static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2668{
2669 struct dlm_lkb *lkb;
2670 struct dlm_rsb *r;
2671 int error;
2672
2673 error = find_lkb(ls, ms->m_remid, &lkb);
2674 if (error) {
2675 log_error(ls, "receive_bast no lkb");
2676 return;
2677 }
2678 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2679
2680 r = lkb->lkb_resource;
2681
2682 hold_rsb(r);
2683 lock_rsb(r);
2684
2685 queue_bast(r, lkb, ms->m_bastmode);
2686
2687 unlock_rsb(r);
2688 put_rsb(r);
b3f58d8f 2689 dlm_put_lkb(lkb);
e7fd4179
DT
2690}
2691
2692static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2693{
2694 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2695
2696 from_nodeid = ms->m_header.h_nodeid;
2697 our_nodeid = dlm_our_nodeid();
2698
2699 len = receive_extralen(ms);
2700
2701 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2702 if (dir_nodeid != our_nodeid) {
2703 log_error(ls, "lookup dir_nodeid %d from %d",
2704 dir_nodeid, from_nodeid);
2705 error = -EINVAL;
2706 ret_nodeid = -1;
2707 goto out;
2708 }
2709
2710 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2711
2712 /* Optimization: we're master so treat lookup as a request */
2713 if (!error && ret_nodeid == our_nodeid) {
2714 receive_request(ls, ms);
2715 return;
2716 }
2717 out:
2718 send_lookup_reply(ls, ms, ret_nodeid, error);
2719}
2720
2721static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2722{
2723 int len, dir_nodeid, from_nodeid;
2724
2725 from_nodeid = ms->m_header.h_nodeid;
2726
2727 len = receive_extralen(ms);
2728
2729 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2730 if (dir_nodeid != dlm_our_nodeid()) {
2731 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2732 dir_nodeid, from_nodeid);
2733 return;
2734 }
2735
2736 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2737}
2738
2739static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2740{
2741 struct dlm_lkb *lkb;
2742 struct dlm_rsb *r;
2743 int error, mstype;
2744
2745 error = find_lkb(ls, ms->m_remid, &lkb);
2746 if (error) {
2747 log_error(ls, "receive_request_reply no lkb");
2748 return;
2749 }
2750 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2751
2752 mstype = lkb->lkb_wait_type;
2753 error = remove_from_waiters(lkb);
2754 if (error) {
2755 log_error(ls, "receive_request_reply not on waiters");
2756 goto out;
2757 }
2758
2759 /* this is the value returned from do_request() on the master */
2760 error = ms->m_result;
2761
2762 r = lkb->lkb_resource;
2763 hold_rsb(r);
2764 lock_rsb(r);
2765
2766 /* Optimization: the dir node was also the master, so it took our
2767 lookup as a request and sent request reply instead of lookup reply */
2768 if (mstype == DLM_MSG_LOOKUP) {
2769 r->res_nodeid = ms->m_header.h_nodeid;
2770 lkb->lkb_nodeid = r->res_nodeid;
2771 }
2772
2773 switch (error) {
2774 case -EAGAIN:
2775 /* request would block (be queued) on remote master;
2776 the unhold undoes the original ref from create_lkb()
2777 so it leads to the lkb being freed */
2778 queue_cast(r, lkb, -EAGAIN);
2779 confirm_master(r, -EAGAIN);
2780 unhold_lkb(lkb);
2781 break;
2782
2783 case -EINPROGRESS:
2784 case 0:
2785 /* request was queued or granted on remote master */
2786 receive_flags_reply(lkb, ms);
2787 lkb->lkb_remid = ms->m_lkid;
2788 if (error)
2789 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2790 else {
2791 grant_lock_pc(r, lkb, ms);
2792 queue_cast(r, lkb, 0);
2793 }
2794 confirm_master(r, error);
2795 break;
2796
597d0cae 2797 case -EBADR:
e7fd4179
DT
2798 case -ENOTBLK:
2799 /* find_rsb failed to find rsb or rsb wasn't master */
2800 r->res_nodeid = -1;
2801 lkb->lkb_nodeid = -1;
2802 _request_lock(r, lkb);
2803 break;
2804
2805 default:
2806 log_error(ls, "receive_request_reply error %d", error);
2807 }
2808
2809 unlock_rsb(r);
2810 put_rsb(r);
2811 out:
b3f58d8f 2812 dlm_put_lkb(lkb);
e7fd4179
DT
2813}
2814
2815static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2816 struct dlm_message *ms)
2817{
2818 int error = ms->m_result;
2819
2820 /* this is the value returned from do_convert() on the master */
2821
2822 switch (error) {
2823 case -EAGAIN:
2824 /* convert would block (be queued) on remote master */
2825 queue_cast(r, lkb, -EAGAIN);
2826 break;
2827
2828 case -EINPROGRESS:
2829 /* convert was queued on remote master */
2830 del_lkb(r, lkb);
2831 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2832 break;
2833
2834 case 0:
2835 /* convert was granted on remote master */
2836 receive_flags_reply(lkb, ms);
2837 grant_lock_pc(r, lkb, ms);
2838 queue_cast(r, lkb, 0);
2839 break;
2840
2841 default:
2842 log_error(r->res_ls, "receive_convert_reply error %d", error);
2843 }
2844}
2845
2846static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2847{
2848 struct dlm_rsb *r = lkb->lkb_resource;
2849
2850 hold_rsb(r);
2851 lock_rsb(r);
2852
2853 __receive_convert_reply(r, lkb, ms);
2854
2855 unlock_rsb(r);
2856 put_rsb(r);
2857}
2858
2859static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2860{
2861 struct dlm_lkb *lkb;
2862 int error;
2863
2864 error = find_lkb(ls, ms->m_remid, &lkb);
2865 if (error) {
2866 log_error(ls, "receive_convert_reply no lkb");
2867 return;
2868 }
2869 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2870
2871 error = remove_from_waiters(lkb);
2872 if (error) {
2873 log_error(ls, "receive_convert_reply not on waiters");
2874 goto out;
2875 }
2876
2877 _receive_convert_reply(lkb, ms);
2878 out:
b3f58d8f 2879 dlm_put_lkb(lkb);
e7fd4179
DT
2880}
2881
2882static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2883{
2884 struct dlm_rsb *r = lkb->lkb_resource;
2885 int error = ms->m_result;
2886
2887 hold_rsb(r);
2888 lock_rsb(r);
2889
2890 /* this is the value returned from do_unlock() on the master */
2891
2892 switch (error) {
2893 case -DLM_EUNLOCK:
2894 receive_flags_reply(lkb, ms);
2895 remove_lock_pc(r, lkb);
2896 queue_cast(r, lkb, -DLM_EUNLOCK);
2897 break;
2898 default:
2899 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2900 }
2901
2902 unlock_rsb(r);
2903 put_rsb(r);
2904}
2905
2906static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2907{
2908 struct dlm_lkb *lkb;
2909 int error;
2910
2911 error = find_lkb(ls, ms->m_remid, &lkb);
2912 if (error) {
2913 log_error(ls, "receive_unlock_reply no lkb");
2914 return;
2915 }
2916 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2917
2918 error = remove_from_waiters(lkb);
2919 if (error) {
2920 log_error(ls, "receive_unlock_reply not on waiters");
2921 goto out;
2922 }
2923
2924 _receive_unlock_reply(lkb, ms);
2925 out:
b3f58d8f 2926 dlm_put_lkb(lkb);
e7fd4179
DT
2927}
2928
2929static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2930{
2931 struct dlm_rsb *r = lkb->lkb_resource;
2932 int error = ms->m_result;
2933
2934 hold_rsb(r);
2935 lock_rsb(r);
2936
2937 /* this is the value returned from do_cancel() on the master */
2938
2939 switch (error) {
2940 case -DLM_ECANCEL:
2941 receive_flags_reply(lkb, ms);
2942 revert_lock_pc(r, lkb);
2943 queue_cast(r, lkb, -DLM_ECANCEL);
2944 break;
2945 default:
2946 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2947 }
2948
2949 unlock_rsb(r);
2950 put_rsb(r);
2951}
2952
2953static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2954{
2955 struct dlm_lkb *lkb;
2956 int error;
2957
2958 error = find_lkb(ls, ms->m_remid, &lkb);
2959 if (error) {
2960 log_error(ls, "receive_cancel_reply no lkb");
2961 return;
2962 }
2963 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2964
2965 error = remove_from_waiters(lkb);
2966 if (error) {
2967 log_error(ls, "receive_cancel_reply not on waiters");
2968 goto out;
2969 }
2970
2971 _receive_cancel_reply(lkb, ms);
2972 out:
b3f58d8f 2973 dlm_put_lkb(lkb);
e7fd4179
DT
2974}
2975
2976static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2977{
2978 struct dlm_lkb *lkb;
2979 struct dlm_rsb *r;
2980 int error, ret_nodeid;
2981
2982 error = find_lkb(ls, ms->m_lkid, &lkb);
2983 if (error) {
2984 log_error(ls, "receive_lookup_reply no lkb");
2985 return;
2986 }
2987
2988 error = remove_from_waiters(lkb);
2989 if (error) {
2990 log_error(ls, "receive_lookup_reply not on waiters");
2991 goto out;
2992 }
2993
2994 /* this is the value returned by dlm_dir_lookup on dir node
2995 FIXME: will a non-zero error ever be returned? */
2996 error = ms->m_result;
2997
2998 r = lkb->lkb_resource;
2999 hold_rsb(r);
3000 lock_rsb(r);
3001
3002 ret_nodeid = ms->m_nodeid;
3003 if (ret_nodeid == dlm_our_nodeid()) {
3004 r->res_nodeid = 0;
3005 ret_nodeid = 0;
3006 r->res_first_lkid = 0;
3007 } else {
3008 /* set_master() will copy res_nodeid to lkb_nodeid */
3009 r->res_nodeid = ret_nodeid;
3010 }
3011
3012 _request_lock(r, lkb);
3013
3014 if (!ret_nodeid)
3015 process_lookup_list(r);
3016
3017 unlock_rsb(r);
3018 put_rsb(r);
3019 out:
b3f58d8f 3020 dlm_put_lkb(lkb);
e7fd4179
DT
3021}
3022
3023int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3024{
3025 struct dlm_message *ms = (struct dlm_message *) hd;
3026 struct dlm_ls *ls;
8fd3a98f 3027 int error = 0;
e7fd4179
DT
3028
3029 if (!recovery)
3030 dlm_message_in(ms);
3031
3032 ls = dlm_find_lockspace_global(hd->h_lockspace);
3033 if (!ls) {
3034 log_print("drop message %d from %d for unknown lockspace %d",
3035 ms->m_type, nodeid, hd->h_lockspace);
3036 return -EINVAL;
3037 }
3038
3039 /* recovery may have just ended leaving a bunch of backed-up requests
3040 in the requestqueue; wait while dlm_recoverd clears them */
3041
3042 if (!recovery)
3043 dlm_wait_requestqueue(ls);
3044
3045 /* recovery may have just started while there were a bunch of
3046 in-flight requests -- save them in requestqueue to be processed
3047 after recovery. we can't let dlm_recvd block on the recovery
3048 lock. if dlm_recoverd is calling this function to clear the
3049 requestqueue, it needs to be interrupted (-EINTR) if another
3050 recovery operation is starting. */
3051
3052 while (1) {
3053 if (dlm_locking_stopped(ls)) {
d4400156
DT
3054 if (recovery) {
3055 error = -EINTR;
3056 goto out;
3057 }
3058 error = dlm_add_requestqueue(ls, nodeid, hd);
3059 if (error == -EAGAIN)
3060 continue;
3061 else {
3062 error = -EINTR;
3063 goto out;
3064 }
e7fd4179
DT
3065 }
3066
3067 if (lock_recovery_try(ls))
3068 break;
3069 schedule();
3070 }
3071
3072 switch (ms->m_type) {
3073
3074 /* messages sent to a master node */
3075
3076 case DLM_MSG_REQUEST:
3077 receive_request(ls, ms);
3078 break;
3079
3080 case DLM_MSG_CONVERT:
3081 receive_convert(ls, ms);
3082 break;
3083
3084 case DLM_MSG_UNLOCK:
3085 receive_unlock(ls, ms);
3086 break;
3087
3088 case DLM_MSG_CANCEL:
3089 receive_cancel(ls, ms);
3090 break;
3091
3092 /* messages sent from a master node (replies to above) */
3093
3094 case DLM_MSG_REQUEST_REPLY:
3095 receive_request_reply(ls, ms);
3096 break;
3097
3098 case DLM_MSG_CONVERT_REPLY:
3099 receive_convert_reply(ls, ms);
3100 break;
3101
3102 case DLM_MSG_UNLOCK_REPLY:
3103 receive_unlock_reply(ls, ms);
3104 break;
3105
3106 case DLM_MSG_CANCEL_REPLY:
3107 receive_cancel_reply(ls, ms);
3108 break;
3109
3110 /* messages sent from a master node (only two types of async msg) */
3111
3112 case DLM_MSG_GRANT:
3113 receive_grant(ls, ms);
3114 break;
3115
3116 case DLM_MSG_BAST:
3117 receive_bast(ls, ms);
3118 break;
3119
3120 /* messages sent to a dir node */
3121
3122 case DLM_MSG_LOOKUP:
3123 receive_lookup(ls, ms);
3124 break;
3125
3126 case DLM_MSG_REMOVE:
3127 receive_remove(ls, ms);
3128 break;
3129
3130 /* messages sent from a dir node (remove has no reply) */
3131
3132 case DLM_MSG_LOOKUP_REPLY:
3133 receive_lookup_reply(ls, ms);
3134 break;
3135
3136 default:
3137 log_error(ls, "unknown message type %d", ms->m_type);
3138 }
3139
3140 unlock_recovery(ls);
3141 out:
3142 dlm_put_lockspace(ls);
3143 dlm_astd_wake();
8fd3a98f 3144 return error;
e7fd4179
DT
3145}
3146
3147
3148/*
3149 * Recovery related
3150 */
3151
3152static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3153{
3154 if (middle_conversion(lkb)) {
3155 hold_lkb(lkb);
3156 ls->ls_stub_ms.m_result = -EINPROGRESS;
075529b5 3157 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
e7fd4179
DT
3158 _remove_from_waiters(lkb);
3159 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3160
3161 /* Same special case as in receive_rcom_lock_args() */
3162 lkb->lkb_grmode = DLM_LOCK_IV;
3163 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3164 unhold_lkb(lkb);
3165
3166 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3167 lkb->lkb_flags |= DLM_IFL_RESEND;
3168 }
3169
3170 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3171 conversions are async; there's no reply from the remote master */
3172}
3173
3174/* A waiting lkb needs recovery if the master node has failed, or
3175 the master node is changing (only when no directory is used) */
3176
3177static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3178{
3179 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3180 return 1;
3181
3182 if (!dlm_no_directory(ls))
3183 return 0;
3184
3185 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3186 return 1;
3187
3188 return 0;
3189}
3190
3191/* Recovery for locks that are waiting for replies from nodes that are now
3192 gone. We can just complete unlocks and cancels by faking a reply from the
3193 dead node. Requests and up-conversions we flag to be resent after
3194 recovery. Down-conversions can just be completed with a fake reply like
3195 unlocks. Conversions between PR and CW need special attention. */
3196
3197void dlm_recover_waiters_pre(struct dlm_ls *ls)
3198{
3199 struct dlm_lkb *lkb, *safe;
3200
90135925 3201 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
3202
3203 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3204 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3205 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3206
3207 /* all outstanding lookups, regardless of destination will be
3208 resent after recovery is done */
3209
3210 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3211 lkb->lkb_flags |= DLM_IFL_RESEND;
3212 continue;
3213 }
3214
3215 if (!waiter_needs_recovery(ls, lkb))
3216 continue;
3217
3218 switch (lkb->lkb_wait_type) {
3219
3220 case DLM_MSG_REQUEST:
3221 lkb->lkb_flags |= DLM_IFL_RESEND;
3222 break;
3223
3224 case DLM_MSG_CONVERT:
3225 recover_convert_waiter(ls, lkb);
3226 break;
3227
3228 case DLM_MSG_UNLOCK:
3229 hold_lkb(lkb);
3230 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
075529b5 3231 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
e7fd4179
DT
3232 _remove_from_waiters(lkb);
3233 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
b3f58d8f 3234 dlm_put_lkb(lkb);
e7fd4179
DT
3235 break;
3236
3237 case DLM_MSG_CANCEL:
3238 hold_lkb(lkb);
3239 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
075529b5 3240 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
e7fd4179
DT
3241 _remove_from_waiters(lkb);
3242 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
b3f58d8f 3243 dlm_put_lkb(lkb);
e7fd4179
DT
3244 break;
3245
3246 default:
3247 log_error(ls, "invalid lkb wait_type %d",
3248 lkb->lkb_wait_type);
3249 }
81456807 3250 schedule();
e7fd4179 3251 }
90135925 3252 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
3253}
3254
3255static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3256{
3257 struct dlm_lkb *lkb;
3258 int rv = 0;
3259
90135925 3260 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
3261 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3262 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3263 rv = lkb->lkb_wait_type;
3264 _remove_from_waiters(lkb);
3265 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3266 break;
3267 }
3268 }
90135925 3269 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
3270
3271 if (!rv)
3272 lkb = NULL;
3273 *lkb_ret = lkb;
3274 return rv;
3275}
3276
3277/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3278 master or dir-node for r. Processing the lkb may result in it being placed
3279 back on waiters. */
3280
3281int dlm_recover_waiters_post(struct dlm_ls *ls)
3282{
3283 struct dlm_lkb *lkb;
3284 struct dlm_rsb *r;
3285 int error = 0, mstype;
3286
3287 while (1) {
3288 if (dlm_locking_stopped(ls)) {
3289 log_debug(ls, "recover_waiters_post aborted");
3290 error = -EINTR;
3291 break;
3292 }
3293
3294 mstype = remove_resend_waiter(ls, &lkb);
3295 if (!mstype)
3296 break;
3297
3298 r = lkb->lkb_resource;
3299
3300 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3301 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3302
3303 switch (mstype) {
3304
3305 case DLM_MSG_LOOKUP:
3306 hold_rsb(r);
3307 lock_rsb(r);
3308 _request_lock(r, lkb);
3309 if (is_master(r))
3310 confirm_master(r, 0);
3311 unlock_rsb(r);
3312 put_rsb(r);
3313 break;
3314
3315 case DLM_MSG_REQUEST:
3316 hold_rsb(r);
3317 lock_rsb(r);
3318 _request_lock(r, lkb);
fa9f0e49
DT
3319 if (is_master(r))
3320 confirm_master(r, 0);
e7fd4179
DT
3321 unlock_rsb(r);
3322 put_rsb(r);
3323 break;
3324
3325 case DLM_MSG_CONVERT:
3326 hold_rsb(r);
3327 lock_rsb(r);
3328 _convert_lock(r, lkb);
3329 unlock_rsb(r);
3330 put_rsb(r);
3331 break;
3332
3333 default:
3334 log_error(ls, "recover_waiters_post type %d", mstype);
3335 }
3336 }
3337
3338 return error;
3339}
3340
3341static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3342 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3343{
3344 struct dlm_ls *ls = r->res_ls;
3345 struct dlm_lkb *lkb, *safe;
3346
3347 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3348 if (test(ls, lkb)) {
97a35d1e 3349 rsb_set_flag(r, RSB_LOCKS_PURGED);
e7fd4179
DT
3350 del_lkb(r, lkb);
3351 /* this put should free the lkb */
b3f58d8f 3352 if (!dlm_put_lkb(lkb))
e7fd4179
DT
3353 log_error(ls, "purged lkb not released");
3354 }
3355 }
3356}
3357
3358static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3359{
3360 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3361}
3362
/* purge_queue() predicate: selects every master-copy lkb.  The ls
   argument is unused. */

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	int is_mstcpy = is_master_copy(lkb);

	return is_mstcpy;
}
3367
3368static void purge_dead_locks(struct dlm_rsb *r)
3369{
3370 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3371 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3372 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3373}
3374
3375void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3376{
3377 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3378 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3379 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3380}
3381
3382/* Get rid of locks held by nodes that are gone. */
3383
3384int dlm_purge_locks(struct dlm_ls *ls)
3385{
3386 struct dlm_rsb *r;
3387
3388 log_debug(ls, "dlm_purge_locks");
3389
3390 down_write(&ls->ls_root_sem);
3391 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3392 hold_rsb(r);
3393 lock_rsb(r);
3394 if (is_master(r))
3395 purge_dead_locks(r);
3396 unlock_rsb(r);
3397 unhold_rsb(r);
3398
3399 schedule();
3400 }
3401 up_write(&ls->ls_root_sem);
3402
3403 return 0;
3404}
3405
97a35d1e
DT
3406static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3407{
3408 struct dlm_rsb *r, *r_ret = NULL;
3409
3410 read_lock(&ls->ls_rsbtbl[bucket].lock);
3411 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3412 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3413 continue;
3414 hold_rsb(r);
3415 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3416 r_ret = r;
3417 break;
3418 }
3419 read_unlock(&ls->ls_rsbtbl[bucket].lock);
3420 return r_ret;
3421}
3422
3423void dlm_grant_after_purge(struct dlm_ls *ls)
e7fd4179
DT
3424{
3425 struct dlm_rsb *r;
2b4e926a 3426 int bucket = 0;
e7fd4179 3427
2b4e926a
DT
3428 while (1) {
3429 r = find_purged_rsb(ls, bucket);
3430 if (!r) {
3431 if (bucket == ls->ls_rsbtbl_size - 1)
3432 break;
3433 bucket++;
97a35d1e 3434 continue;
2b4e926a 3435 }
97a35d1e
DT
3436 lock_rsb(r);
3437 if (is_master(r)) {
3438 grant_pending_locks(r);
3439 confirm_master(r, 0);
e7fd4179 3440 }
97a35d1e
DT
3441 unlock_rsb(r);
3442 put_rsb(r);
2b4e926a 3443 schedule();
e7fd4179 3444 }
e7fd4179
DT
3445}
3446
3447static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3448 uint32_t remid)
3449{
3450 struct dlm_lkb *lkb;
3451
3452 list_for_each_entry(lkb, head, lkb_statequeue) {
3453 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3454 return lkb;
3455 }
3456 return NULL;
3457}
3458
3459static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3460 uint32_t remid)
3461{
3462 struct dlm_lkb *lkb;
3463
3464 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3465 if (lkb)
3466 return lkb;
3467 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3468 if (lkb)
3469 return lkb;
3470 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3471 if (lkb)
3472 return lkb;
3473 return NULL;
3474}
3475
3476static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3477 struct dlm_rsb *r, struct dlm_rcom *rc)
3478{
3479 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3480 int lvblen;
3481
3482 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3483 lkb->lkb_ownpid = rl->rl_ownpid;
3484 lkb->lkb_remid = rl->rl_lkid;
3485 lkb->lkb_exflags = rl->rl_exflags;
3486 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3487 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3488 lkb->lkb_lvbseq = rl->rl_lvbseq;
3489 lkb->lkb_rqmode = rl->rl_rqmode;
3490 lkb->lkb_grmode = rl->rl_grmode;
3491 /* don't set lkb_status because add_lkb wants to itself */
3492
3493 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3494 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3495
e7fd4179
DT
3496 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3497 lkb->lkb_lvbptr = allocate_lvb(ls);
3498 if (!lkb->lkb_lvbptr)
3499 return -ENOMEM;
3500 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3501 sizeof(struct rcom_lock);
3502 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3503 }
3504
3505 /* Conversions between PR and CW (middle modes) need special handling.
3506 The real granted mode of these converting locks cannot be determined
3507 until all locks have been rebuilt on the rsb (recover_conversion) */
3508
3509 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3510 rl->rl_status = DLM_LKSTS_CONVERT;
3511 lkb->lkb_grmode = DLM_LOCK_IV;
3512 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3513 }
3514
3515 return 0;
3516}
3517
3518/* This lkb may have been recovered in a previous aborted recovery so we need
3519 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3520 If so we just send back a standard reply. If not, we create a new lkb with
3521 the given values and send back our lkid. We send back our lkid by sending
3522 back the rcom_lock struct we got but with the remid field filled in. */
3523
3524int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3525{
3526 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3527 struct dlm_rsb *r;
3528 struct dlm_lkb *lkb;
3529 int error;
3530
3531 if (rl->rl_parent_lkid) {
3532 error = -EOPNOTSUPP;
3533 goto out;
3534 }
3535
3536 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3537 if (error)
3538 goto out;
3539
3540 lock_rsb(r);
3541
3542 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3543 if (lkb) {
3544 error = -EEXIST;
3545 goto out_remid;
3546 }
3547
3548 error = create_lkb(ls, &lkb);
3549 if (error)
3550 goto out_unlock;
3551
3552 error = receive_rcom_lock_args(ls, lkb, r, rc);
3553 if (error) {
b3f58d8f 3554 __put_lkb(ls, lkb);
e7fd4179
DT
3555 goto out_unlock;
3556 }
3557
3558 attach_lkb(r, lkb);
3559 add_lkb(r, lkb, rl->rl_status);
3560 error = 0;
3561
3562 out_remid:
3563 /* this is the new value returned to the lock holder for
3564 saving in its process-copy lkb */
3565 rl->rl_remid = lkb->lkb_id;
3566
3567 out_unlock:
3568 unlock_rsb(r);
3569 put_rsb(r);
3570 out:
3571 if (error)
3572 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3573 rl->rl_result = error;
3574 return error;
3575}
3576
3577int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3578{
3579 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3580 struct dlm_rsb *r;
3581 struct dlm_lkb *lkb;
3582 int error;
3583
3584 error = find_lkb(ls, rl->rl_lkid, &lkb);
3585 if (error) {
3586 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3587 return error;
3588 }
3589
3590 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3591
3592 error = rl->rl_result;
3593
3594 r = lkb->lkb_resource;
3595 hold_rsb(r);
3596 lock_rsb(r);
3597
3598 switch (error) {
dc200a88
DT
3599 case -EBADR:
3600 /* There's a chance the new master received our lock before
3601 dlm_recover_master_reply(), this wouldn't happen if we did
3602 a barrier between recover_masters and recover_locks. */
3603 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
3604 (unsigned long)r, r->res_name);
3605 dlm_send_rcom_lock(r, lkb);
3606 goto out;
e7fd4179
DT
3607 case -EEXIST:
3608 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3609 /* fall through */
3610 case 0:
3611 lkb->lkb_remid = rl->rl_remid;
3612 break;
3613 default:
3614 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3615 error, lkb->lkb_id);
3616 }
3617
3618 /* an ack for dlm_recover_locks() which waits for replies from
3619 all the locks it sends to new masters */
3620 dlm_recovered_lock(r);
dc200a88 3621 out:
e7fd4179
DT
3622 unlock_rsb(r);
3623 put_rsb(r);
b3f58d8f 3624 dlm_put_lkb(lkb);
e7fd4179
DT
3625
3626 return 0;
3627}
3628
597d0cae
DT
3629int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3630 int mode, uint32_t flags, void *name, unsigned int namelen,
3631 uint32_t parent_lkid)
3632{
3633 struct dlm_lkb *lkb;
3634 struct dlm_args args;
3635 int error;
3636
3637 lock_recovery(ls);
3638
3639 error = create_lkb(ls, &lkb);
3640 if (error) {
3641 kfree(ua);
3642 goto out;
3643 }
3644
3645 if (flags & DLM_LKF_VALBLK) {
62a0f623 3646 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
597d0cae
DT
3647 if (!ua->lksb.sb_lvbptr) {
3648 kfree(ua);
3649 __put_lkb(ls, lkb);
3650 error = -ENOMEM;
3651 goto out;
3652 }
3653 }
3654
3655 /* After ua is attached to lkb it will be freed by free_lkb().
3656 When DLM_IFL_USER is set, the dlm knows that this is a userspace
3657 lock and that lkb_astparam is the dlm_user_args structure. */
3658
3659 error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
32f105a1 3660 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
597d0cae
DT
3661 lkb->lkb_flags |= DLM_IFL_USER;
3662 ua->old_mode = DLM_LOCK_IV;
3663
3664 if (error) {
3665 __put_lkb(ls, lkb);
3666 goto out;
3667 }
3668
3669 error = request_lock(ls, lkb, name, namelen, &args);
3670
3671 switch (error) {
3672 case 0:
3673 break;
3674 case -EINPROGRESS:
3675 error = 0;
3676 break;
3677 case -EAGAIN:
3678 error = 0;
3679 /* fall through */
3680 default:
3681 __put_lkb(ls, lkb);
3682 goto out;
3683 }
3684
3685 /* add this new lkb to the per-process list of locks */
3686 spin_lock(&ua->proc->locks_spin);
3687 kref_get(&lkb->lkb_ref);
3688 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3689 spin_unlock(&ua->proc->locks_spin);
3690 out:
3691 unlock_recovery(ls);
3692 return error;
3693}
3694
3695int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3696 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3697{
3698 struct dlm_lkb *lkb;
3699 struct dlm_args args;
3700 struct dlm_user_args *ua;
3701 int error;
3702
3703 lock_recovery(ls);
3704
3705 error = find_lkb(ls, lkid, &lkb);
3706 if (error)
3707 goto out;
3708
3709 /* user can change the params on its lock when it converts it, or
3710 add an lvb that didn't exist before */
3711
3712 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3713
3714 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
62a0f623 3715 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
597d0cae
DT
3716 if (!ua->lksb.sb_lvbptr) {
3717 error = -ENOMEM;
3718 goto out_put;
3719 }
3720 }
3721 if (lvb_in && ua->lksb.sb_lvbptr)
3722 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3723
3724 ua->castparam = ua_tmp->castparam;
3725 ua->castaddr = ua_tmp->castaddr;
3726 ua->bastparam = ua_tmp->bastparam;
3727 ua->bastaddr = ua_tmp->bastaddr;
10948eb4 3728 ua->user_lksb = ua_tmp->user_lksb;
597d0cae
DT
3729 ua->old_mode = lkb->lkb_grmode;
3730
32f105a1
DT
3731 error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
3732 ua, DLM_FAKE_USER_AST, &args);
597d0cae
DT
3733 if (error)
3734 goto out_put;
3735
3736 error = convert_lock(ls, lkb, &args);
3737
3738 if (error == -EINPROGRESS || error == -EAGAIN)
3739 error = 0;
3740 out_put:
3741 dlm_put_lkb(lkb);
3742 out:
3743 unlock_recovery(ls);
3744 kfree(ua_tmp);
3745 return error;
3746}
3747
3748int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3749 uint32_t flags, uint32_t lkid, char *lvb_in)
3750{
3751 struct dlm_lkb *lkb;
3752 struct dlm_args args;
3753 struct dlm_user_args *ua;
3754 int error;
3755
3756 lock_recovery(ls);
3757
3758 error = find_lkb(ls, lkid, &lkb);
3759 if (error)
3760 goto out;
3761
3762 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3763
3764 if (lvb_in && ua->lksb.sb_lvbptr)
3765 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3766 ua->castparam = ua_tmp->castparam;
cc346d55 3767 ua->user_lksb = ua_tmp->user_lksb;
597d0cae
DT
3768
3769 error = set_unlock_args(flags, ua, &args);
3770 if (error)
3771 goto out_put;
3772
3773 error = unlock_lock(ls, lkb, &args);
3774
3775 if (error == -DLM_EUNLOCK)
3776 error = 0;
3777 if (error)
3778 goto out_put;
3779
3780 spin_lock(&ua->proc->locks_spin);
a1bc86e6
DT
3781 /* dlm_user_add_ast() may have already taken lkb off the proc list */
3782 if (!list_empty(&lkb->lkb_ownqueue))
3783 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
597d0cae 3784 spin_unlock(&ua->proc->locks_spin);
597d0cae
DT
3785 out_put:
3786 dlm_put_lkb(lkb);
3787 out:
3788 unlock_recovery(ls);
3789 return error;
3790}
3791
3792int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3793 uint32_t flags, uint32_t lkid)
3794{
3795 struct dlm_lkb *lkb;
3796 struct dlm_args args;
3797 struct dlm_user_args *ua;
3798 int error;
3799
3800 lock_recovery(ls);
3801
3802 error = find_lkb(ls, lkid, &lkb);
3803 if (error)
3804 goto out;
3805
3806 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3807 ua->castparam = ua_tmp->castparam;
c059f70e 3808 ua->user_lksb = ua_tmp->user_lksb;
597d0cae
DT
3809
3810 error = set_unlock_args(flags, ua, &args);
3811 if (error)
3812 goto out_put;
3813
3814 error = cancel_lock(ls, lkb, &args);
3815
3816 if (error == -DLM_ECANCEL)
3817 error = 0;
3818 if (error)
3819 goto out_put;
3820
3821 /* this lkb was removed from the WAITING queue */
3822 if (lkb->lkb_grmode == DLM_LOCK_IV) {
3823 spin_lock(&ua->proc->locks_spin);
a1bc86e6 3824 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
597d0cae 3825 spin_unlock(&ua->proc->locks_spin);
597d0cae
DT
3826 }
3827 out_put:
3828 dlm_put_lkb(lkb);
3829 out:
3830 unlock_recovery(ls);
3831 return error;
3832}
3833
3834static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3835{
3836 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3837
3838 if (ua->lksb.sb_lvbptr)
3839 kfree(ua->lksb.sb_lvbptr);
3840 kfree(ua);
3841 lkb->lkb_astparam = (long)NULL;
3842
3843 /* TODO: propogate to master if needed */
3844 return 0;
3845}
3846
3847/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3848 Regardless of what rsb queue the lock is on, it's removed and freed. */
3849
3850static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3851{
3852 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3853 struct dlm_args args;
3854 int error;
3855
3856 /* FIXME: we need to handle the case where the lkb is in limbo
3857 while the rsb is being looked up, currently we assert in
3858 _unlock_lock/is_remote because rsb nodeid is -1. */
3859
3860 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3861
3862 error = unlock_lock(ls, lkb, &args);
3863 if (error == -DLM_EUNLOCK)
3864 error = 0;
3865 return error;
3866}
3867
3868/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3869 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3870 which we clear here. */
3871
3872/* proc CLOSING flag is set so no more device_reads should look at proc->asts
3873 list, and no more device_writes should add lkb's to proc->locks list; so we
3874 shouldn't need to take asts_spin or locks_spin here. this assumes that
3875 device reads/writes/closes are serialized -- FIXME: we may need to serialize
3876 them ourself. */
3877
3878void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3879{
3880 struct dlm_lkb *lkb, *safe;
3881
3882 lock_recovery(ls);
3883 mutex_lock(&ls->ls_clear_proc_locks);
3884
3885 list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
34e22bed 3886 list_del_init(&lkb->lkb_ownqueue);
597d0cae
DT
3887
3888 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3889 lkb->lkb_flags |= DLM_IFL_ORPHAN;
3890 orphan_proc_lock(ls, lkb);
3891 } else {
3892 lkb->lkb_flags |= DLM_IFL_DEAD;
3893 unlock_proc_lock(ls, lkb);
3894 }
3895
3896 /* this removes the reference for the proc->locks list
3897 added by dlm_user_request, it may result in the lkb
3898 being freed */
3899
3900 dlm_put_lkb(lkb);
3901 }
a1bc86e6
DT
3902
3903 /* in-progress unlocks */
3904 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
3905 list_del_init(&lkb->lkb_ownqueue);
3906 lkb->lkb_flags |= DLM_IFL_DEAD;
3907 dlm_put_lkb(lkb);
3908 }
3909
3910 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
3911 list_del(&lkb->lkb_astqueue);
3912 dlm_put_lkb(lkb);
3913 }
3914
597d0cae
DT
3915 mutex_unlock(&ls->ls_clear_proc_locks);
3916 unlock_recovery(ls);
3917}
a1bc86e6 3918
This page took 0.519114 seconds and 5 git commands to generate.