[DLM] [RFC: -mm patch] fs/dlm/lock.c: unexport dlm_lvb_operations
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
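
/* To make the four stages concrete, an illustrative trace (this comment is
   not part of the original file; the function names are real names from
   this file, only the scenario is hypothetical).  A new EX request from a
   node that is not the resource master flows roughly as:

   dlm_lock(ls, DLM_LOCK_EX, ...)        stage 1: validate args, create lkb
     request_lock(ls, lkb, name, ...)    stage 2: find_rsb() + lock_rsb()
       _request_lock(r, lkb)             stage 3: set_master(); rsb is
         send_request(r, lkb)                     remote, so send to master
           (master: receive_request() -> do_request())          stage 4
         receive_request_reply()         grant/queue result; completion
                                         ast queued via queue_cast()
*/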

#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);

#define FAKE_USER_AST (void*)0xff00ff00

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},	/* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},	/* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},	/* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},	/* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/* UN   NL  CR  CW  PR  PW  EX  PD */
	{  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
	{  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
	{  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
	{  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
	{  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
	{  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};

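/* Two worked readings of the matrix above (an illustrative note, not part
   of the original file): a lock converting down from PW (row) to NL
   (column) hits entry 0, so the caller's LVB is written to the resource;
   a lock converting up from PR (row) to EX (column) hits entry 1, so the
   resource's LVB is returned to the caller.  set_lvb_lock() below performs
   exactly this lookup. */
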
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}

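/* For example (a hedged illustration, not in the original file):
   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) is 1, since two
   protected-read locks can coexist, while
   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) is 0, so an exclusive
   request conflicts with a granted protected-read lock. */
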
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},	/* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},	/* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},	/* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},	/* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},	/* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

static inline void unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

static inline int lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode == DLM_LOCK_PR && lkb->lkb_rqmode == DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode == DLM_LOCK_PR && lkb->lkb_grmode == DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_print_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup),);
	DLM_ASSERT(list_empty(&r->res_grantqueue),);
	DLM_ASSERT(list_empty(&r->res_convertqueue),);
	DLM_ASSERT(list_empty(&r->res_waitqueue),);
	DLM_ASSERT(list_empty(&r->res_root_list),);
	DLM_ASSERT(list_empty(&r->res_recover_list),);
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
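		/* Note (added, not in the original file): the lkid composed
		   above packs the bucket index into the low 16 bits and the
		   per-bucket counter into the high 16 bits, e.g. bucket
		   0x0003 with counter 0x0001 gives lkid 0x00010003, which is
		   why find_lkb() and __find_lkb() below recover the bucket
		   with "lkid & 0xFFFF". */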

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	uint16_t bucket = lkid & 0xFFFF;
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = lkid & 0xFFFF;

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = lkb->lkb_id & 0xFFFF;

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	if (!lkb)
		list_add_tail(new, head);
	else
		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_waiters_mutex);
	if (lkb->lkb_wait_type) {
		log_print("add_to_waiters error %d", lkb->lkb_wait_type);
		goto out;
	}
	lkb->lkb_wait_type = mstype;
	kref_get(&lkb->lkb_ref);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	mutex_unlock(&ls->ls_waiters_mutex);
}

static int _remove_from_waiters(struct dlm_lkb *lkb)
{
	int error = 0;

	if (!lkb->lkb_wait_type) {
		log_print("remove_from_waiters error");
		error = -EINVAL;
		goto out;
	}
	lkb->lkb_wait_type = 0;
	list_del(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
 out:
	return error;
}

static int remove_from_waiters(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	if (dlm_locking_stopped(ls))
		return;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		cond_resched();
	}
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
}

static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing the first lkb in the
 * convert queue from being granted, then demote lkb (set grmode to NL).
 * This second form requires that we check for conv-deadlk even when
 * now == 0 in _can_be_granted().
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We demote the granted mode of the second lock (the lkb passed to
 * this function).
 *
 * After the resolution, the "grant pending" function needs to go back and try
 * to grant locks on the convert queue again since the first lock can now be
 * granted.
 */

static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this, *first = NULL, *self = NULL;

	list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
		if (!first)
			first = this;
		if (this == lkb) {
			self = lkb;
			continue;
		}

		if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
			return 1;
	}

	/* if lkb is on the convert queue and is preventing the first
	   from being granted, then there's deadlock and we demote lkb.
	   multiple converting locks may need to do this before the first
	   converting lock can be granted. */

	if (self && self != first) {
		if (!modes_compat(lkb, first) &&
		    !queue_conflict(&rsb->res_grantqueue, first))
			return 1;
	}

	return 0;
}

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
	}

	return 0;
}

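/* A worked example of the rules above (an illustrative note, not part of
   the original file): suppose the grant queue holds a single PR lock.  A
   new EX request hits queue_conflict() against the grant queue and this
   function returns 0; do_request() then either queues it (-EINPROGRESS)
   or fails it with -EAGAIN under NOQUEUE.  A new CR request with empty
   convert and wait queues passes every conflict check and satisfies the
   "now && !conv" condition, so it returns 1 and is granted in place. */
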
/*
 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
 * simple way to provide a big optimization to applications that can use them.
 */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	uint32_t flags = lkb->lkb_exflags;
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
		goto out;

	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}

static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		if (can_be_granted(r, lkb, 0)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
		} else {
			hi = max_t(int, lkb->lkb_rqmode, hi);
			if (!demoted && is_demoted(lkb))
				demote_restart = 1;
		}
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0))
			grant_lock_pending(r, lkb);
		else
			high = max_t(int, lkb->lkb_rqmode, high);
	}

	return high;
}

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_print_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.  FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		if (gr->lkb_bastaddr &&
		    gr->lkb_highbast < lkb->lkb_rqmode &&
		    !modes_compat(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}

/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}

/* confirm_master -- confirm (or deny) an rsb's master nodeid */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
		/* the remote master didn't queue our NOQUEUE request;
		   make a waiting lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		} else
			r->res_nodeid = -1;
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}

static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, uint32_t parent_lkid, void *ast,
			 void *astarg, void *bast, struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	if (flags & DLM_LKF_CANCEL)
		goto out;

	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	/* parent/child locks not yet supported */
	if (parent_lkid)
		goto out;

	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astaddr = ast;
	args->astparam = (long) astarg;
	args->bastaddr = bast;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}

static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
		      DLM_LKF_FORCEUNLOCK))
		return -EINVAL;

	args->flags = flags;
	args->astparam = (long) astarg;
	return 0;
}

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astaddr = args->astaddr;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastaddr = args->bastaddr;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	rv = 0;
 out:
	return rv;
}

static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	int rv = -EINVAL;

	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		goto out;

	if (args->flags & DLM_LKF_FORCEUNLOCK)
		goto out_ok;

	if (args->flags & DLM_LKF_CANCEL &&
	    lkb->lkb_status == DLM_LKSTS_GRANTED)
		goto out;

	if (!(args->flags & DLM_LKF_CANCEL) &&
	    lkb->lkb_status != DLM_LKSTS_GRANTED)
		goto out;

	rv = -EBUSY;
	if (lkb->lkb_wait_type)
		goto out;

 out_ok:
	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;

	rv = 0;
 out:
	return rv;
}

/*
 * Four stage 4 varieties:
 * do_request(), do_convert(), do_unlock(), do_cancel()
 * These are called on the master node for the given lock and
 * from the central locking logic.
 */

static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	if (can_be_granted(r, lkb, 1)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		add_lkb(r, lkb, DLM_LKSTS_WAITING);
		send_blocking_asts(r, lkb);
		goto out;
	}

	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}

static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		grant_pending_locks(r);
		goto out;
	}

	if (can_be_queued(lkb)) {
		if (is_demoted(lkb))
			grant_pending_locks(r);
		error = -EINPROGRESS;
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		send_blocking_asts(r, lkb);
		goto out;
	}

	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}

static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	remove_lock(r, lkb);
	queue_cast(r, lkb, -DLM_EUNLOCK);
	grant_pending_locks(r);
	return -DLM_EUNLOCK;
}

/* FIXME: if revert_lock() finds that the lkb is granted, we should
   skip the queue_cast(ECANCEL).  It indicates that the request/convert
   completed (and queued a normal ast) just before the cancel; we don't
   want to clobber the sb_result for the normal ast with ECANCEL. */

static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	revert_lock(r, lkb);
	queue_cast(r, lkb, -DLM_ECANCEL);
	grant_pending_locks(r);
	return -DLM_ECANCEL;
}

/*
 * Four stage 3 varieties:
 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
 */

/* add a new lkb to a possibly new rsb, called by requesting process */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r */

	error = set_master(r, lkb);
	if (error < 0)
		goto out;
	if (error) {
		error = 0;
		goto out;
	}

	if (is_remote(r))
		/* receive_request() calls do_request() on remote node */
		error = send_request(r, lkb);
	else
		error = do_request(r, lkb);
 out:
	return error;
}

/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_convert() calls do_convert() on remote node */
		error = send_convert(r, lkb);
	else
		error = do_convert(r, lkb);

	return error;
}

/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_unlock() calls do_unlock() on remote node */
		error = send_unlock(r, lkb);
	else
		error = do_unlock(r, lkb);

	return error;
}

/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_cancel() calls do_cancel() on remote node */
		error = send_cancel(r, lkb);
	else
		error = do_cancel(r, lkb);

	return error;
}

/*
 * Four stage 2 varieties:
 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
 */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);

 out:
	return error;
}

static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
			struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = _convert_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _unlock_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _cancel_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

/*
 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
 */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	lock_recovery(ls);

	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN)
		error = 0;
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}

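/* A hedged usage sketch of the two entry points above, compiled out with
   #if 0 since it is not part of the original file.  It assumes
   <linux/completion.h>; the lockspace handle and the example_* names are
   hypothetical.  A kernel caller typically waits for the completion ast
   and then inspects lksb.sb_status. */
#if 0
static void example_ast(void *astarg)
{
	/* completion ast: wake the waiter passed in as astarg */
	complete(astarg);
}

static int example_take_and_drop(dlm_lockspace_t *ls)
{
	struct dlm_lksb lksb;
	struct completion done;
	int error;

	memset(&lksb, 0, sizeof(lksb));
	init_completion(&done);

	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "example-res", 11,
			 0, example_ast, &done, NULL);
	if (error)
		return error;
	wait_for_completion(&done);	/* example_ast fires on grant */
	if (lksb.sb_status)
		return lksb.sb_status;

	init_completion(&done);
	error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, &done);
	if (error)
		return error;
	wait_for_completion(&done);	/* ast fires again for the unlock */
	return 0;
}
#endif
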
/*
 * send/receive routines for remote operations and replies
 *
 * send_args
 * send_common
 * send_request			receive_request
 * send_convert			receive_convert
 * send_unlock			receive_unlock
 * send_cancel			receive_cancel
 * send_grant			receive_grant
 * send_bast			receive_bast
 * send_lookup			receive_lookup
 * send_remove			receive_remove
 *
 * send_common_reply
 * receive_request_reply	send_request_reply
 * receive_convert_reply	send_convert_reply
 * receive_unlock_reply		send_unlock_reply
 * receive_cancel_reply		send_cancel_reply
 * receive_lookup_reply		send_lookup_reply
 */

static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  int to_nodeid, int mstype,
			  struct dlm_message **ms_ret,
			  struct dlm_mhandle **mh_ret)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	char *mb;
	int mb_len = sizeof(struct dlm_message);

	switch (mstype) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
	case DLM_MSG_REMOVE:
		mb_len += r->res_length;
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (lkb && lkb->lkb_lvbptr)
			mb_len += r->res_ls->ls_lvblen;
		break;
	}

	/* get_buffer gives us a message handle (mh) that we need to
	   pass into lowcomms_commit and a message buffer (mb) that we
	   write our data into */

	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
	if (!mh)
		return -ENOBUFS;

	memset(mb, 0, mb_len);

	ms = (struct dlm_message *) mb;

	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	ms->m_header.h_lockspace = r->res_ls->ls_global_id;
	ms->m_header.h_nodeid = dlm_our_nodeid();
	ms->m_header.h_length = mb_len;
	ms->m_header.h_cmd = DLM_MSG;

	ms->m_type = mstype;

	*mh_ret = mh;
	*ms_ret = ms;
	return 0;
}

2092/* further lowcomms enhancements or alternate implementations may make
2093 the return value from this function useful at some point */
2094
2095static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2096{
2097 dlm_message_out(ms);
2098 dlm_lowcomms_commit_buffer(mh);
2099 return 0;
2100}
2101
2102static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2103 struct dlm_message *ms)
2104{
2105 ms->m_nodeid = lkb->lkb_nodeid;
2106 ms->m_pid = lkb->lkb_ownpid;
2107 ms->m_lkid = lkb->lkb_id;
2108 ms->m_remid = lkb->lkb_remid;
2109 ms->m_exflags = lkb->lkb_exflags;
2110 ms->m_sbflags = lkb->lkb_sbflags;
2111 ms->m_flags = lkb->lkb_flags;
2112 ms->m_lvbseq = lkb->lkb_lvbseq;
2113 ms->m_status = lkb->lkb_status;
2114 ms->m_grmode = lkb->lkb_grmode;
2115 ms->m_rqmode = lkb->lkb_rqmode;
2116 ms->m_hash = r->res_hash;
2117
2118 /* m_result and m_bastmode are set from function args,
2119 not from lkb fields */
2120
2121 if (lkb->lkb_bastaddr)
2122 ms->m_asts |= AST_BAST;
2123 if (lkb->lkb_astaddr)
2124 ms->m_asts |= AST_COMP;
2125
2126 if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2127 memcpy(ms->m_extra, r->res_name, r->res_length);
2128
2129 else if (lkb->lkb_lvbptr)
2130 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2131
2132}
2133
2134static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2135{
2136 struct dlm_message *ms;
2137 struct dlm_mhandle *mh;
2138 int to_nodeid, error;
2139
2140 add_to_waiters(lkb, mstype);
2141
2142 to_nodeid = r->res_nodeid;
2143
2144 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2145 if (error)
2146 goto fail;
2147
2148 send_args(r, lkb, ms);
2149
2150 error = send_message(mh, ms);
2151 if (error)
2152 goto fail;
2153 return 0;
2154
2155 fail:
2156 remove_from_waiters(lkb);
2157 return error;
2158}
2159
2160static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2161{
2162 return send_common(r, lkb, DLM_MSG_REQUEST);
2163}
2164
2165static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2166{
2167 int error;
2168
2169 error = send_common(r, lkb, DLM_MSG_CONVERT);
2170
2171 /* down conversions go without a reply from the master */
2172 if (!error && down_conversion(lkb)) {
2173 remove_from_waiters(lkb);
2174 r->res_ls->ls_stub_ms.m_result = 0;
2175 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2176 }
2177
2178 return error;
2179}
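
/* Illustrative sketch -- an assumption about the helper used above, not its
   original definition: down_conversion() presumably tests that the requested
   mode is strictly weaker than the granted mode (excluding the ambiguous
   PR<->CW "middle" conversions), since only such a conversion can never
   block and can therefore be completed locally without a master reply. */

#if 0	/* example only */
static inline int example_down_conversion(struct dlm_lkb *lkb)
{
	return lkb->lkb_rqmode < lkb->lkb_grmode;
}
#endif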
2180
2181/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2182 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2183 that the master is still correct. */
2184
2185static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2186{
2187 return send_common(r, lkb, DLM_MSG_UNLOCK);
2188}
2189
2190static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2191{
2192 return send_common(r, lkb, DLM_MSG_CANCEL);
2193}
2194
2195static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2196{
2197 struct dlm_message *ms;
2198 struct dlm_mhandle *mh;
2199 int to_nodeid, error;
2200
2201 to_nodeid = lkb->lkb_nodeid;
2202
2203 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2204 if (error)
2205 goto out;
2206
2207 send_args(r, lkb, ms);
2208
2209 ms->m_result = 0;
2210
2211 error = send_message(mh, ms);
2212 out:
2213 return error;
2214}
2215
2216static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2217{
2218 struct dlm_message *ms;
2219 struct dlm_mhandle *mh;
2220 int to_nodeid, error;
2221
2222 to_nodeid = lkb->lkb_nodeid;
2223
2224 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2225 if (error)
2226 goto out;
2227
2228 send_args(r, lkb, ms);
2229
2230 ms->m_bastmode = mode;
2231
2232 error = send_message(mh, ms);
2233 out:
2234 return error;
2235}
2236
2237static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2238{
2239 struct dlm_message *ms;
2240 struct dlm_mhandle *mh;
2241 int to_nodeid, error;
2242
2243 add_to_waiters(lkb, DLM_MSG_LOOKUP);
2244
2245 to_nodeid = dlm_dir_nodeid(r);
2246
2247 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2248 if (error)
2249 goto fail;
2250
2251 send_args(r, lkb, ms);
2252
2253 error = send_message(mh, ms);
2254 if (error)
2255 goto fail;
2256 return 0;
2257
2258 fail:
2259 remove_from_waiters(lkb);
2260 return error;
2261}
2262
2263static int send_remove(struct dlm_rsb *r)
2264{
2265 struct dlm_message *ms;
2266 struct dlm_mhandle *mh;
2267 int to_nodeid, error;
2268
2269 to_nodeid = dlm_dir_nodeid(r);
2270
2271 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2272 if (error)
2273 goto out;
2274
2275 memcpy(ms->m_extra, r->res_name, r->res_length);
2276 ms->m_hash = r->res_hash;
2277
2278 error = send_message(mh, ms);
2279 out:
2280 return error;
2281}
2282
2283static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2284 int mstype, int rv)
2285{
2286 struct dlm_message *ms;
2287 struct dlm_mhandle *mh;
2288 int to_nodeid, error;
2289
2290 to_nodeid = lkb->lkb_nodeid;
2291
2292 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2293 if (error)
2294 goto out;
2295
2296 send_args(r, lkb, ms);
2297
2298 ms->m_result = rv;
2299
2300 error = send_message(mh, ms);
2301 out:
2302 return error;
2303}
2304
2305static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2306{
2307 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2308}
2309
2310static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2311{
2312 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2313}
2314
2315static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2316{
2317 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2318}
2319
2320static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2321{
2322 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2323}
2324
2325static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2326 int ret_nodeid, int rv)
2327{
2328 struct dlm_rsb *r = &ls->ls_stub_rsb;
2329 struct dlm_message *ms;
2330 struct dlm_mhandle *mh;
2331 int error, nodeid = ms_in->m_header.h_nodeid;
2332
2333 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2334 if (error)
2335 goto out;
2336
2337 ms->m_lkid = ms_in->m_lkid;
2338 ms->m_result = rv;
2339 ms->m_nodeid = ret_nodeid;
2340
2341 error = send_message(mh, ms);
2342 out:
2343 return error;
2344}
2345
2346/* which args we save from a received message depends heavily on the type
2347 of message, unlike the send side where we can safely send everything about
2348 the lkb for any type of message */
2349
2350static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2351{
2352 lkb->lkb_exflags = ms->m_exflags;
2353 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2354 (ms->m_flags & 0x0000FFFF);
2355}
2356
2357static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2358{
2359 lkb->lkb_sbflags = ms->m_sbflags;
2360 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2361 (ms->m_flags & 0x0000FFFF);
2362}
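
/* Illustrative note: only the low 16 bits of lkb_flags are meaningful on
   the wire; the high 16 bits hold node-local state (the DLM_IFL_* flags
   such as DLM_IFL_MSTCPY set below), which is why both helpers above merge
   m_flags into the receiver's own bits under the 0xFFFF0000 mask. */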
2363
2364static int receive_extralen(struct dlm_message *ms)
2365{
2366 return (ms->m_header.h_length - sizeof(struct dlm_message));
2367}
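
/* Worked example (illustrative): for a DLM_MSG_REQUEST on a 16-byte
   resource name, create_message() above sized the buffer as
   sizeof(struct dlm_message) + 16 and stored that in h_length, so
   receive_extralen() recovers 16 -- the length of m_extra, which carries
   the name for request/lookup/remove messages and the lvb otherwise. */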
2368
2369static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2370 struct dlm_message *ms)
2371{
2372 int len;
2373
2374 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2375 if (!lkb->lkb_lvbptr)
2376 lkb->lkb_lvbptr = allocate_lvb(ls);
2377 if (!lkb->lkb_lvbptr)
2378 return -ENOMEM;
2379 len = receive_extralen(ms);
2380 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2381 }
2382 return 0;
2383}
2384
2385static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2386 struct dlm_message *ms)
2387{
2388 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2389 lkb->lkb_ownpid = ms->m_pid;
2390 lkb->lkb_remid = ms->m_lkid;
2391 lkb->lkb_grmode = DLM_LOCK_IV;
2392 lkb->lkb_rqmode = ms->m_rqmode;
2393 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2394 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2395
2396 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2397
2398 if (receive_lvb(ls, lkb, ms))
2399 return -ENOMEM;
2400
2401 return 0;
2402}
2403
2404static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2405 struct dlm_message *ms)
2406{
2407 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2408 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2409 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2410 lkb->lkb_id, lkb->lkb_remid);
2411 return -EINVAL;
2412 }
2413
2414 if (!is_master_copy(lkb))
2415 return -EINVAL;
2416
2417 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2418 return -EBUSY;
2419
2420 if (receive_lvb(ls, lkb, ms))
2421 return -ENOMEM;
2422
2423 lkb->lkb_rqmode = ms->m_rqmode;
2424 lkb->lkb_lvbseq = ms->m_lvbseq;
2425
2426 return 0;
2427}
2428
2429static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2430 struct dlm_message *ms)
2431{
2432 if (!is_master_copy(lkb))
2433 return -EINVAL;
2434 if (receive_lvb(ls, lkb, ms))
2435 return -ENOMEM;
2436 return 0;
2437}
2438
2439/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2440 uses to send a reply and that the remote end uses to process the reply. */
2441
2442static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2443{
2444 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2445 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2446 lkb->lkb_remid = ms->m_lkid;
2447}
2448
2449static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2450{
2451 struct dlm_lkb *lkb;
2452 struct dlm_rsb *r;
2453 int error, namelen;
2454
2455 error = create_lkb(ls, &lkb);
2456 if (error)
2457 goto fail;
2458
2459 receive_flags(lkb, ms);
2460 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2461 error = receive_request_args(ls, lkb, ms);
2462 if (error) {
2463		__put_lkb(ls, lkb);
2464 goto fail;
2465 }
2466
2467 namelen = receive_extralen(ms);
2468
2469 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2470 if (error) {
2471		__put_lkb(ls, lkb);
2472 goto fail;
2473 }
2474
2475 lock_rsb(r);
2476
2477 attach_lkb(r, lkb);
2478 error = do_request(r, lkb);
2479 send_request_reply(r, lkb, error);
2480
2481 unlock_rsb(r);
2482 put_rsb(r);
2483
2484 if (error == -EINPROGRESS)
2485 error = 0;
2486 if (error)
2487		dlm_put_lkb(lkb);
2488 return;
2489
2490 fail:
2491 setup_stub_lkb(ls, ms);
2492 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2493}
2494
2495static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2496{
2497 struct dlm_lkb *lkb;
2498 struct dlm_rsb *r;
2499	int error, reply = 1;
2500
2501 error = find_lkb(ls, ms->m_remid, &lkb);
2502 if (error)
2503 goto fail;
2504
2505 r = lkb->lkb_resource;
2506
2507 hold_rsb(r);
2508 lock_rsb(r);
2509
2510 receive_flags(lkb, ms);
2511 error = receive_convert_args(ls, lkb, ms);
2512 if (error)
2513 goto out;
2514 reply = !down_conversion(lkb);
2515
2516 error = do_convert(r, lkb);
2517 out:
2518 if (reply)
2519 send_convert_reply(r, lkb, error);
2520
2521 unlock_rsb(r);
2522 put_rsb(r);
2523	dlm_put_lkb(lkb);
2524 return;
2525
2526 fail:
2527 setup_stub_lkb(ls, ms);
2528 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2529}
2530
2531static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2532{
2533 struct dlm_lkb *lkb;
2534 struct dlm_rsb *r;
2535 int error;
2536
2537 error = find_lkb(ls, ms->m_remid, &lkb);
2538 if (error)
2539 goto fail;
2540
2541 r = lkb->lkb_resource;
2542
2543 hold_rsb(r);
2544 lock_rsb(r);
2545
2546 receive_flags(lkb, ms);
2547 error = receive_unlock_args(ls, lkb, ms);
2548 if (error)
2549 goto out;
2550
2551 error = do_unlock(r, lkb);
2552 out:
2553 send_unlock_reply(r, lkb, error);
2554
2555 unlock_rsb(r);
2556 put_rsb(r);
2557	dlm_put_lkb(lkb);
2558 return;
2559
2560 fail:
2561 setup_stub_lkb(ls, ms);
2562 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2563}
2564
2565static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2566{
2567 struct dlm_lkb *lkb;
2568 struct dlm_rsb *r;
2569 int error;
2570
2571 error = find_lkb(ls, ms->m_remid, &lkb);
2572 if (error)
2573 goto fail;
2574
2575 receive_flags(lkb, ms);
2576
2577 r = lkb->lkb_resource;
2578
2579 hold_rsb(r);
2580 lock_rsb(r);
2581
2582 error = do_cancel(r, lkb);
2583 send_cancel_reply(r, lkb, error);
2584
2585 unlock_rsb(r);
2586 put_rsb(r);
2587	dlm_put_lkb(lkb);
2588 return;
2589
2590 fail:
2591 setup_stub_lkb(ls, ms);
2592 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2593}
2594
2595static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2596{
2597 struct dlm_lkb *lkb;
2598 struct dlm_rsb *r;
2599 int error;
2600
2601 error = find_lkb(ls, ms->m_remid, &lkb);
2602 if (error) {
2603 log_error(ls, "receive_grant no lkb");
2604 return;
2605 }
2606 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2607
2608 r = lkb->lkb_resource;
2609
2610 hold_rsb(r);
2611 lock_rsb(r);
2612
2613 receive_flags_reply(lkb, ms);
2614 grant_lock_pc(r, lkb, ms);
2615 queue_cast(r, lkb, 0);
2616
2617 unlock_rsb(r);
2618 put_rsb(r);
2619	dlm_put_lkb(lkb);
2620}
2621
2622static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2623{
2624 struct dlm_lkb *lkb;
2625 struct dlm_rsb *r;
2626 int error;
2627
2628 error = find_lkb(ls, ms->m_remid, &lkb);
2629 if (error) {
2630 log_error(ls, "receive_bast no lkb");
2631 return;
2632 }
2633 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2634
2635 r = lkb->lkb_resource;
2636
2637 hold_rsb(r);
2638 lock_rsb(r);
2639
2640 queue_bast(r, lkb, ms->m_bastmode);
2641
2642 unlock_rsb(r);
2643 put_rsb(r);
2644	dlm_put_lkb(lkb);
2645}
2646
2647static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2648{
2649 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2650
2651 from_nodeid = ms->m_header.h_nodeid;
2652 our_nodeid = dlm_our_nodeid();
2653
2654 len = receive_extralen(ms);
2655
2656 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2657 if (dir_nodeid != our_nodeid) {
2658 log_error(ls, "lookup dir_nodeid %d from %d",
2659 dir_nodeid, from_nodeid);
2660 error = -EINVAL;
2661 ret_nodeid = -1;
2662 goto out;
2663 }
2664
2665 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2666
2667 /* Optimization: we're master so treat lookup as a request */
2668 if (!error && ret_nodeid == our_nodeid) {
2669 receive_request(ls, ms);
2670 return;
2671 }
2672 out:
2673 send_lookup_reply(ls, ms, ret_nodeid, error);
2674}
2675
2676static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2677{
2678 int len, dir_nodeid, from_nodeid;
2679
2680 from_nodeid = ms->m_header.h_nodeid;
2681
2682 len = receive_extralen(ms);
2683
2684 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2685 if (dir_nodeid != dlm_our_nodeid()) {
2686 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2687 dir_nodeid, from_nodeid);
2688 return;
2689 }
2690
2691 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2692}
2693
2694static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2695{
2696 struct dlm_lkb *lkb;
2697 struct dlm_rsb *r;
2698 int error, mstype;
2699
2700 error = find_lkb(ls, ms->m_remid, &lkb);
2701 if (error) {
2702 log_error(ls, "receive_request_reply no lkb");
2703 return;
2704 }
2705 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2706
2707 mstype = lkb->lkb_wait_type;
2708 error = remove_from_waiters(lkb);
2709 if (error) {
2710 log_error(ls, "receive_request_reply not on waiters");
2711 goto out;
2712 }
2713
2714 /* this is the value returned from do_request() on the master */
2715 error = ms->m_result;
2716
2717 r = lkb->lkb_resource;
2718 hold_rsb(r);
2719 lock_rsb(r);
2720
2721 /* Optimization: the dir node was also the master, so it took our
2722 lookup as a request and sent request reply instead of lookup reply */
2723 if (mstype == DLM_MSG_LOOKUP) {
2724 r->res_nodeid = ms->m_header.h_nodeid;
2725 lkb->lkb_nodeid = r->res_nodeid;
2726 }
2727
2728 switch (error) {
2729 case -EAGAIN:
2730 /* request would block (be queued) on remote master;
2731 the unhold undoes the original ref from create_lkb()
2732 so it leads to the lkb being freed */
2733 queue_cast(r, lkb, -EAGAIN);
2734 confirm_master(r, -EAGAIN);
2735 unhold_lkb(lkb);
2736 break;
2737
2738 case -EINPROGRESS:
2739 case 0:
2740 /* request was queued or granted on remote master */
2741 receive_flags_reply(lkb, ms);
2742 lkb->lkb_remid = ms->m_lkid;
2743 if (error)
2744 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2745 else {
2746 grant_lock_pc(r, lkb, ms);
2747 queue_cast(r, lkb, 0);
2748 }
2749 confirm_master(r, error);
2750 break;
2751
2752	case -EBADR:
2753 case -ENOTBLK:
2754 /* find_rsb failed to find rsb or rsb wasn't master */
2755 r->res_nodeid = -1;
2756 lkb->lkb_nodeid = -1;
2757 _request_lock(r, lkb);
2758 break;
2759
2760 default:
2761 log_error(ls, "receive_request_reply error %d", error);
2762 }
2763
2764 unlock_rsb(r);
2765 put_rsb(r);
2766 out:
2767	dlm_put_lkb(lkb);
2768}
2769
2770static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2771 struct dlm_message *ms)
2772{
2773 int error = ms->m_result;
2774
2775 /* this is the value returned from do_convert() on the master */
2776
2777 switch (error) {
2778 case -EAGAIN:
2779 /* convert would block (be queued) on remote master */
2780 queue_cast(r, lkb, -EAGAIN);
2781 break;
2782
2783 case -EINPROGRESS:
2784 /* convert was queued on remote master */
2785 del_lkb(r, lkb);
2786 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2787 break;
2788
2789 case 0:
2790 /* convert was granted on remote master */
2791 receive_flags_reply(lkb, ms);
2792 grant_lock_pc(r, lkb, ms);
2793 queue_cast(r, lkb, 0);
2794 break;
2795
2796 default:
2797 log_error(r->res_ls, "receive_convert_reply error %d", error);
2798 }
2799}
2800
2801static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2802{
2803 struct dlm_rsb *r = lkb->lkb_resource;
2804
2805 hold_rsb(r);
2806 lock_rsb(r);
2807
2808 __receive_convert_reply(r, lkb, ms);
2809
2810 unlock_rsb(r);
2811 put_rsb(r);
2812}
2813
2814static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2815{
2816 struct dlm_lkb *lkb;
2817 int error;
2818
2819 error = find_lkb(ls, ms->m_remid, &lkb);
2820 if (error) {
2821 log_error(ls, "receive_convert_reply no lkb");
2822 return;
2823 }
2824 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2825
2826 error = remove_from_waiters(lkb);
2827 if (error) {
2828 log_error(ls, "receive_convert_reply not on waiters");
2829 goto out;
2830 }
2831
2832 _receive_convert_reply(lkb, ms);
2833 out:
2834	dlm_put_lkb(lkb);
2835}
2836
2837static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2838{
2839 struct dlm_rsb *r = lkb->lkb_resource;
2840 int error = ms->m_result;
2841
2842 hold_rsb(r);
2843 lock_rsb(r);
2844
2845 /* this is the value returned from do_unlock() on the master */
2846
2847 switch (error) {
2848 case -DLM_EUNLOCK:
2849 receive_flags_reply(lkb, ms);
2850 remove_lock_pc(r, lkb);
2851 queue_cast(r, lkb, -DLM_EUNLOCK);
2852 break;
2853 default:
2854 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2855 }
2856
2857 unlock_rsb(r);
2858 put_rsb(r);
2859}
2860
2861static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2862{
2863 struct dlm_lkb *lkb;
2864 int error;
2865
2866 error = find_lkb(ls, ms->m_remid, &lkb);
2867 if (error) {
2868 log_error(ls, "receive_unlock_reply no lkb");
2869 return;
2870 }
2871 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2872
2873 error = remove_from_waiters(lkb);
2874 if (error) {
2875 log_error(ls, "receive_unlock_reply not on waiters");
2876 goto out;
2877 }
2878
2879 _receive_unlock_reply(lkb, ms);
2880 out:
2881	dlm_put_lkb(lkb);
2882}
2883
2884static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2885{
2886 struct dlm_rsb *r = lkb->lkb_resource;
2887 int error = ms->m_result;
2888
2889 hold_rsb(r);
2890 lock_rsb(r);
2891
2892 /* this is the value returned from do_cancel() on the master */
2893
2894 switch (error) {
2895 case -DLM_ECANCEL:
2896 receive_flags_reply(lkb, ms);
2897 revert_lock_pc(r, lkb);
2898 queue_cast(r, lkb, -DLM_ECANCEL);
2899 break;
2900 default:
2901 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2902 }
2903
2904 unlock_rsb(r);
2905 put_rsb(r);
2906}
2907
2908static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2909{
2910 struct dlm_lkb *lkb;
2911 int error;
2912
2913 error = find_lkb(ls, ms->m_remid, &lkb);
2914 if (error) {
2915 log_error(ls, "receive_cancel_reply no lkb");
2916 return;
2917 }
2918 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2919
2920 error = remove_from_waiters(lkb);
2921 if (error) {
2922 log_error(ls, "receive_cancel_reply not on waiters");
2923 goto out;
2924 }
2925
2926 _receive_cancel_reply(lkb, ms);
2927 out:
2928	dlm_put_lkb(lkb);
2929}
2930
2931static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2932{
2933 struct dlm_lkb *lkb;
2934 struct dlm_rsb *r;
2935 int error, ret_nodeid;
2936
2937 error = find_lkb(ls, ms->m_lkid, &lkb);
2938 if (error) {
2939 log_error(ls, "receive_lookup_reply no lkb");
2940 return;
2941 }
2942
2943 error = remove_from_waiters(lkb);
2944 if (error) {
2945 log_error(ls, "receive_lookup_reply not on waiters");
2946 goto out;
2947 }
2948
2949 /* this is the value returned by dlm_dir_lookup on dir node
2950 FIXME: will a non-zero error ever be returned? */
2951 error = ms->m_result;
2952
2953 r = lkb->lkb_resource;
2954 hold_rsb(r);
2955 lock_rsb(r);
2956
2957 ret_nodeid = ms->m_nodeid;
2958 if (ret_nodeid == dlm_our_nodeid()) {
2959 r->res_nodeid = 0;
2960 ret_nodeid = 0;
2961 r->res_first_lkid = 0;
2962 } else {
2963 /* set_master() will copy res_nodeid to lkb_nodeid */
2964 r->res_nodeid = ret_nodeid;
2965 }
2966
2967 _request_lock(r, lkb);
2968
2969 if (!ret_nodeid)
2970 process_lookup_list(r);
2971
2972 unlock_rsb(r);
2973 put_rsb(r);
2974 out:
2975	dlm_put_lkb(lkb);
2976}
2977
2978int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
2979{
2980 struct dlm_message *ms = (struct dlm_message *) hd;
2981 struct dlm_ls *ls;
2982 int error;
2983
2984 if (!recovery)
2985 dlm_message_in(ms);
2986
2987 ls = dlm_find_lockspace_global(hd->h_lockspace);
2988 if (!ls) {
2989 log_print("drop message %d from %d for unknown lockspace %d",
2990 ms->m_type, nodeid, hd->h_lockspace);
2991 return -EINVAL;
2992 }
2993
2994 /* recovery may have just ended leaving a bunch of backed-up requests
2995 in the requestqueue; wait while dlm_recoverd clears them */
2996
2997 if (!recovery)
2998 dlm_wait_requestqueue(ls);
2999
3000 /* recovery may have just started while there were a bunch of
3001 in-flight requests -- save them in requestqueue to be processed
3002 after recovery. we can't let dlm_recvd block on the recovery
3003 lock. if dlm_recoverd is calling this function to clear the
3004 requestqueue, it needs to be interrupted (-EINTR) if another
3005 recovery operation is starting. */
3006
3007 while (1) {
3008 if (dlm_locking_stopped(ls)) {
3009 if (!recovery)
3010 dlm_add_requestqueue(ls, nodeid, hd);
3011 error = -EINTR;
3012 goto out;
3013 }
3014
3015 if (lock_recovery_try(ls))
3016 break;
3017 schedule();
3018 }
3019
3020 switch (ms->m_type) {
3021
3022 /* messages sent to a master node */
3023
3024 case DLM_MSG_REQUEST:
3025 receive_request(ls, ms);
3026 break;
3027
3028 case DLM_MSG_CONVERT:
3029 receive_convert(ls, ms);
3030 break;
3031
3032 case DLM_MSG_UNLOCK:
3033 receive_unlock(ls, ms);
3034 break;
3035
3036 case DLM_MSG_CANCEL:
3037 receive_cancel(ls, ms);
3038 break;
3039
3040 /* messages sent from a master node (replies to above) */
3041
3042 case DLM_MSG_REQUEST_REPLY:
3043 receive_request_reply(ls, ms);
3044 break;
3045
3046 case DLM_MSG_CONVERT_REPLY:
3047 receive_convert_reply(ls, ms);
3048 break;
3049
3050 case DLM_MSG_UNLOCK_REPLY:
3051 receive_unlock_reply(ls, ms);
3052 break;
3053
3054 case DLM_MSG_CANCEL_REPLY:
3055 receive_cancel_reply(ls, ms);
3056 break;
3057
3058 /* messages sent from a master node (only two types of async msg) */
3059
3060 case DLM_MSG_GRANT:
3061 receive_grant(ls, ms);
3062 break;
3063
3064 case DLM_MSG_BAST:
3065 receive_bast(ls, ms);
3066 break;
3067
3068 /* messages sent to a dir node */
3069
3070 case DLM_MSG_LOOKUP:
3071 receive_lookup(ls, ms);
3072 break;
3073
3074 case DLM_MSG_REMOVE:
3075 receive_remove(ls, ms);
3076 break;
3077
3078 /* messages sent from a dir node (remove has no reply) */
3079
3080 case DLM_MSG_LOOKUP_REPLY:
3081 receive_lookup_reply(ls, ms);
3082 break;
3083
3084 default:
3085 log_error(ls, "unknown message type %d", ms->m_type);
3086 }
3087
3088 unlock_recovery(ls);
3089 out:
3090 dlm_put_lockspace(ls);
3091 dlm_astd_wake();
3092 return 0;
3093}
3094
3095
3096/*
3097 * Recovery related
3098 */
3099
3100static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3101{
3102 if (middle_conversion(lkb)) {
3103 hold_lkb(lkb);
3104 ls->ls_stub_ms.m_result = -EINPROGRESS;
3105 _remove_from_waiters(lkb);
3106 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3107
3108 /* Same special case as in receive_rcom_lock_args() */
3109 lkb->lkb_grmode = DLM_LOCK_IV;
3110 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3111 unhold_lkb(lkb);
3112
3113 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3114 lkb->lkb_flags |= DLM_IFL_RESEND;
3115 }
3116
3117 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3118 conversions are async; there's no reply from the remote master */
3119}
3120
3121/* A waiting lkb needs recovery if the master node has failed, or
3122 the master node is changing (only when no directory is used) */
3123
3124static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3125{
3126 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3127 return 1;
3128
3129 if (!dlm_no_directory(ls))
3130 return 0;
3131
3132 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3133 return 1;
3134
3135 return 0;
3136}
3137
3138/* Recovery for locks that are waiting for replies from nodes that are now
3139 gone. We can just complete unlocks and cancels by faking a reply from the
3140 dead node. Requests and up-conversions we flag to be resent after
3141 recovery. Down-conversions can just be completed with a fake reply like
3142 unlocks. Conversions between PR and CW need special attention. */
3143
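/* Illustrative note on the PR/CW special case: PR and CW are the "middle"
   modes -- each is compatible with itself but not with the other, so a
   conversion between them is neither clearly "up" nor "down".  A PR<->CW
   conversion in flight can therefore be neither faked complete nor simply
   resent; recover_convert_waiter() above (like receive_rcom_lock_args())
   drops the granted mode to IV and flags the rsb RSB_RECOVER_CONVERT so
   the real granted mode is recomputed once all locks are rebuilt. */
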
3144void dlm_recover_waiters_pre(struct dlm_ls *ls)
3145{
3146 struct dlm_lkb *lkb, *safe;
3147
3148	mutex_lock(&ls->ls_waiters_mutex);
3149
3150 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3151 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3152 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3153
3154		/* all outstanding lookups, regardless of destination, will be
3155		   resent after recovery is done */
3156
3157 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3158 lkb->lkb_flags |= DLM_IFL_RESEND;
3159 continue;
3160 }
3161
3162 if (!waiter_needs_recovery(ls, lkb))
3163 continue;
3164
3165 switch (lkb->lkb_wait_type) {
3166
3167 case DLM_MSG_REQUEST:
3168 lkb->lkb_flags |= DLM_IFL_RESEND;
3169 break;
3170
3171 case DLM_MSG_CONVERT:
3172 recover_convert_waiter(ls, lkb);
3173 break;
3174
3175 case DLM_MSG_UNLOCK:
3176 hold_lkb(lkb);
3177 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3178 _remove_from_waiters(lkb);
3179 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3180			dlm_put_lkb(lkb);
3181 break;
3182
3183 case DLM_MSG_CANCEL:
3184 hold_lkb(lkb);
3185 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3186 _remove_from_waiters(lkb);
3187 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3188			dlm_put_lkb(lkb);
3189 break;
3190
3191 default:
3192 log_error(ls, "invalid lkb wait_type %d",
3193 lkb->lkb_wait_type);
3194 }
3195 }
3196	mutex_unlock(&ls->ls_waiters_mutex);
3197}
3198
3199static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3200{
3201 struct dlm_lkb *lkb;
3202 int rv = 0;
3203
3204	mutex_lock(&ls->ls_waiters_mutex);
3205 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3206 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3207 rv = lkb->lkb_wait_type;
3208 _remove_from_waiters(lkb);
3209 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3210 break;
3211 }
3212 }
3213	mutex_unlock(&ls->ls_waiters_mutex);
3214
3215 if (!rv)
3216 lkb = NULL;
3217 *lkb_ret = lkb;
3218 return rv;
3219}
3220
3221/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3222 master or dir-node for r. Processing the lkb may result in it being placed
3223 back on waiters. */
3224
3225int dlm_recover_waiters_post(struct dlm_ls *ls)
3226{
3227 struct dlm_lkb *lkb;
3228 struct dlm_rsb *r;
3229 int error = 0, mstype;
3230
3231 while (1) {
3232 if (dlm_locking_stopped(ls)) {
3233 log_debug(ls, "recover_waiters_post aborted");
3234 error = -EINTR;
3235 break;
3236 }
3237
3238 mstype = remove_resend_waiter(ls, &lkb);
3239 if (!mstype)
3240 break;
3241
3242 r = lkb->lkb_resource;
3243
3244 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3245 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3246
3247 switch (mstype) {
3248
3249 case DLM_MSG_LOOKUP:
3250 hold_rsb(r);
3251 lock_rsb(r);
3252 _request_lock(r, lkb);
3253 if (is_master(r))
3254 confirm_master(r, 0);
3255 unlock_rsb(r);
3256 put_rsb(r);
3257 break;
3258
3259 case DLM_MSG_REQUEST:
3260 hold_rsb(r);
3261 lock_rsb(r);
3262 _request_lock(r, lkb);
3263 unlock_rsb(r);
3264 put_rsb(r);
3265 break;
3266
3267 case DLM_MSG_CONVERT:
3268 hold_rsb(r);
3269 lock_rsb(r);
3270 _convert_lock(r, lkb);
3271 unlock_rsb(r);
3272 put_rsb(r);
3273 break;
3274
3275 default:
3276 log_error(ls, "recover_waiters_post type %d", mstype);
3277 }
3278 }
3279
3280 return error;
3281}
3282
3283static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3284 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3285{
3286 struct dlm_ls *ls = r->res_ls;
3287 struct dlm_lkb *lkb, *safe;
3288
3289 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3290 if (test(ls, lkb)) {
3291			rsb_set_flag(r, RSB_LOCKS_PURGED);
3292 del_lkb(r, lkb);
3293 /* this put should free the lkb */
3294			if (!dlm_put_lkb(lkb))
3295 log_error(ls, "purged lkb not released");
3296 }
3297 }
3298}
3299
3300static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3301{
3302 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3303}
3304
3305static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3306{
3307 return is_master_copy(lkb);
3308}
3309
3310static void purge_dead_locks(struct dlm_rsb *r)
3311{
3312 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3313 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3314 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3315}
3316
3317void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3318{
3319 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3320 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3321 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3322}
3323
3324/* Get rid of locks held by nodes that are gone. */
3325
3326int dlm_purge_locks(struct dlm_ls *ls)
3327{
3328 struct dlm_rsb *r;
3329
3330 log_debug(ls, "dlm_purge_locks");
3331
3332 down_write(&ls->ls_root_sem);
3333 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3334 hold_rsb(r);
3335 lock_rsb(r);
3336 if (is_master(r))
3337 purge_dead_locks(r);
3338 unlock_rsb(r);
3339 unhold_rsb(r);
3340
3341 schedule();
3342 }
3343 up_write(&ls->ls_root_sem);
3344
3345 return 0;
3346}
3347
3348static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3349{
3350 struct dlm_rsb *r, *r_ret = NULL;
3351
3352 read_lock(&ls->ls_rsbtbl[bucket].lock);
3353 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3354 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3355 continue;
3356 hold_rsb(r);
3357 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3358 r_ret = r;
3359 break;
3360 }
3361 read_unlock(&ls->ls_rsbtbl[bucket].lock);
3362 return r_ret;
3363}
3364
3365void dlm_grant_after_purge(struct dlm_ls *ls)
3366{
3367 struct dlm_rsb *r;
3368 int i;
3369
3370 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
3371 r = find_purged_rsb(ls, i);
3372 if (!r)
3373 continue;
3374 lock_rsb(r);
3375 if (is_master(r)) {
3376 grant_pending_locks(r);
3377 confirm_master(r, 0);
3378		}
3379 unlock_rsb(r);
3380 put_rsb(r);
3381	}
3382}
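
/* Illustrative note on the design: purging and re-granting are split into
   two passes.  dlm_purge_locks() marks each rsb it strips with
   RSB_LOCKS_PURGED while holding that rsb's lock, and the sweep above then
   picks the marked rsbs back out of the hash table one at a time --
   presumably so that no rsb lock is held across the whole-table walk and
   newly unblocked locks can be granted with only per-rsb locking. */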
3383
3384static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3385 uint32_t remid)
3386{
3387 struct dlm_lkb *lkb;
3388
3389 list_for_each_entry(lkb, head, lkb_statequeue) {
3390 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3391 return lkb;
3392 }
3393 return NULL;
3394}
3395
3396static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3397 uint32_t remid)
3398{
3399 struct dlm_lkb *lkb;
3400
3401 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3402 if (lkb)
3403 return lkb;
3404 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3405 if (lkb)
3406 return lkb;
3407 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3408 if (lkb)
3409 return lkb;
3410 return NULL;
3411}
3412
3413static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3414 struct dlm_rsb *r, struct dlm_rcom *rc)
3415{
3416 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3417 int lvblen;
3418
3419 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3420 lkb->lkb_ownpid = rl->rl_ownpid;
3421 lkb->lkb_remid = rl->rl_lkid;
3422 lkb->lkb_exflags = rl->rl_exflags;
3423 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3424 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3425 lkb->lkb_lvbseq = rl->rl_lvbseq;
3426 lkb->lkb_rqmode = rl->rl_rqmode;
3427 lkb->lkb_grmode = rl->rl_grmode;
3428	/* don't set lkb_status because add_lkb wants to set it itself */
3429
3430 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3431 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3432
3433 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3434 lkb->lkb_lvbptr = allocate_lvb(ls);
3435 if (!lkb->lkb_lvbptr)
3436 return -ENOMEM;
3437 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3438 sizeof(struct rcom_lock);
3439 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3440 }
3441
3442 /* Conversions between PR and CW (middle modes) need special handling.
3443 The real granted mode of these converting locks cannot be determined
3444 until all locks have been rebuilt on the rsb (recover_conversion) */
3445
3446 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3447 rl->rl_status = DLM_LKSTS_CONVERT;
3448 lkb->lkb_grmode = DLM_LOCK_IV;
3449 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3450 }
3451
3452 return 0;
3453}
3454
3455/* This lkb may have been recovered in a previous aborted recovery so we need
3456 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3457 If so we just send back a standard reply. If not, we create a new lkb with
3458 the given values and send back our lkid. We send back our lkid by sending
3459 back the rcom_lock struct we got but with the remid field filled in. */
3460
3461int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3462{
3463 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3464 struct dlm_rsb *r;
3465 struct dlm_lkb *lkb;
3466 int error;
3467
3468 if (rl->rl_parent_lkid) {
3469 error = -EOPNOTSUPP;
3470 goto out;
3471 }
3472
3473 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3474 if (error)
3475 goto out;
3476
3477 lock_rsb(r);
3478
3479 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3480 if (lkb) {
3481 error = -EEXIST;
3482 goto out_remid;
3483 }
3484
3485 error = create_lkb(ls, &lkb);
3486 if (error)
3487 goto out_unlock;
3488
3489 error = receive_rcom_lock_args(ls, lkb, r, rc);
3490 if (error) {
3491		__put_lkb(ls, lkb);
3492 goto out_unlock;
3493 }
3494
3495 attach_lkb(r, lkb);
3496 add_lkb(r, lkb, rl->rl_status);
3497 error = 0;
3498
3499 out_remid:
3500 /* this is the new value returned to the lock holder for
3501 saving in its process-copy lkb */
3502 rl->rl_remid = lkb->lkb_id;
3503
3504 out_unlock:
3505 unlock_rsb(r);
3506 put_rsb(r);
3507 out:
3508 if (error)
3509 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3510 rl->rl_result = error;
3511 return error;
3512}
3513
3514int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3515{
3516 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3517 struct dlm_rsb *r;
3518 struct dlm_lkb *lkb;
3519 int error;
3520
3521 error = find_lkb(ls, rl->rl_lkid, &lkb);
3522 if (error) {
3523 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3524 return error;
3525 }
3526
3527 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3528
3529 error = rl->rl_result;
3530
3531 r = lkb->lkb_resource;
3532 hold_rsb(r);
3533 lock_rsb(r);
3534
3535 switch (error) {
3536 case -EEXIST:
3537 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3538 /* fall through */
3539 case 0:
3540 lkb->lkb_remid = rl->rl_remid;
3541 break;
3542 default:
3543 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3544 error, lkb->lkb_id);
3545 }
3546
3547 /* an ack for dlm_recover_locks() which waits for replies from
3548 all the locks it sends to new masters */
3549 dlm_recovered_lock(r);
3550
3551 unlock_rsb(r);
3552 put_rsb(r);
3553	dlm_put_lkb(lkb);
3554
3555 return 0;
3556}
3557
3558int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3559 int mode, uint32_t flags, void *name, unsigned int namelen,
3560 uint32_t parent_lkid)
3561{
3562 struct dlm_lkb *lkb;
3563 struct dlm_args args;
3564 int error;
3565
3566 lock_recovery(ls);
3567
3568 error = create_lkb(ls, &lkb);
3569 if (error) {
3570 kfree(ua);
3571 goto out;
3572 }
3573
3574 if (flags & DLM_LKF_VALBLK) {
3575 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3576 if (!ua->lksb.sb_lvbptr) {
3577 kfree(ua);
3578 __put_lkb(ls, lkb);
3579 error = -ENOMEM;
3580 goto out;
3581 }
3582 }
3583
3584 /* After ua is attached to lkb it will be freed by free_lkb().
3585 When DLM_IFL_USER is set, the dlm knows that this is a userspace
3586 lock and that lkb_astparam is the dlm_user_args structure. */
3587
3588 error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3589 FAKE_USER_AST, ua, FAKE_USER_AST, &args);
3590 lkb->lkb_flags |= DLM_IFL_USER;
3591 ua->old_mode = DLM_LOCK_IV;
3592
3593 if (error) {
3594 __put_lkb(ls, lkb);
3595 goto out;
3596 }
3597
3598 error = request_lock(ls, lkb, name, namelen, &args);
3599
3600 switch (error) {
3601 case 0:
3602 break;
3603 case -EINPROGRESS:
3604 error = 0;
3605 break;
3606 case -EAGAIN:
3607 error = 0;
3608 /* fall through */
3609 default:
3610 __put_lkb(ls, lkb);
3611 goto out;
3612 }
3613
3614 /* add this new lkb to the per-process list of locks */
3615 spin_lock(&ua->proc->locks_spin);
3616 kref_get(&lkb->lkb_ref);
3617 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3618 spin_unlock(&ua->proc->locks_spin);
3619 out:
3620 unlock_recovery(ls);
3621 return error;
3622}
3623
3624int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3625 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3626{
3627 struct dlm_lkb *lkb;
3628 struct dlm_args args;
3629 struct dlm_user_args *ua;
3630 int error;
3631
3632 lock_recovery(ls);
3633
3634 error = find_lkb(ls, lkid, &lkb);
3635 if (error)
3636 goto out;
3637
3638 /* user can change the params on its lock when it converts it, or
3639 add an lvb that didn't exist before */
3640
3641 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3642
3643 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
3644 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3645 if (!ua->lksb.sb_lvbptr) {
3646 error = -ENOMEM;
3647 goto out_put;
3648 }
3649 }
3650 if (lvb_in && ua->lksb.sb_lvbptr)
3651 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3652
3653 ua->castparam = ua_tmp->castparam;
3654 ua->castaddr = ua_tmp->castaddr;
3655 ua->bastparam = ua_tmp->bastparam;
3656 ua->bastaddr = ua_tmp->bastaddr;
3657 ua->old_mode = lkb->lkb_grmode;
3658
3659 error = set_lock_args(mode, &ua->lksb, flags, 0, 0, FAKE_USER_AST, ua,
3660 FAKE_USER_AST, &args);
3661 if (error)
3662 goto out_put;
3663
3664 error = convert_lock(ls, lkb, &args);
3665
3666 if (error == -EINPROGRESS || error == -EAGAIN)
3667 error = 0;
3668 out_put:
3669 dlm_put_lkb(lkb);
3670 out:
3671 unlock_recovery(ls);
3672 kfree(ua_tmp);
3673 return error;
3674}
3675
3676int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3677 uint32_t flags, uint32_t lkid, char *lvb_in)
3678{
3679 struct dlm_lkb *lkb;
3680 struct dlm_args args;
3681 struct dlm_user_args *ua;
3682 int error;
3683
3684 lock_recovery(ls);
3685
3686 error = find_lkb(ls, lkid, &lkb);
3687 if (error)
3688 goto out;
3689
3690 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3691
3692 if (lvb_in && ua->lksb.sb_lvbptr)
3693 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3694 ua->castparam = ua_tmp->castparam;
3695
3696 error = set_unlock_args(flags, ua, &args);
3697 if (error)
3698 goto out_put;
3699
3700 error = unlock_lock(ls, lkb, &args);
3701
3702 if (error == -DLM_EUNLOCK)
3703 error = 0;
3704 if (error)
3705 goto out_put;
3706
3707 spin_lock(&ua->proc->locks_spin);
3708 list_del(&lkb->lkb_ownqueue);
3709 spin_unlock(&ua->proc->locks_spin);
3710
3711 /* this removes the reference for the proc->locks list added by
3712 dlm_user_request */
3713 unhold_lkb(lkb);
3714 out_put:
3715 dlm_put_lkb(lkb);
3716 out:
3717 unlock_recovery(ls);
3718 return error;
3719}
3720
3721int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3722 uint32_t flags, uint32_t lkid)
3723{
3724 struct dlm_lkb *lkb;
3725 struct dlm_args args;
3726 struct dlm_user_args *ua;
3727 int error;
3728
3729 lock_recovery(ls);
3730
3731 error = find_lkb(ls, lkid, &lkb);
3732 if (error)
3733 goto out;
3734
3735 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3736 ua->castparam = ua_tmp->castparam;
3737
3738 error = set_unlock_args(flags, ua, &args);
3739 if (error)
3740 goto out_put;
3741
3742 error = cancel_lock(ls, lkb, &args);
3743
3744 if (error == -DLM_ECANCEL)
3745 error = 0;
3746 if (error)
3747 goto out_put;
3748
3749 /* this lkb was removed from the WAITING queue */
3750 if (lkb->lkb_grmode == DLM_LOCK_IV) {
3751 spin_lock(&ua->proc->locks_spin);
3752 list_del(&lkb->lkb_ownqueue);
3753 spin_unlock(&ua->proc->locks_spin);
3754 unhold_lkb(lkb);
3755 }
3756 out_put:
3757 dlm_put_lkb(lkb);
3758 out:
3759 unlock_recovery(ls);
3760 return error;
3761}
3762
3763static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3764{
3765 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3766
3767 if (ua->lksb.sb_lvbptr)
3768 kfree(ua->lksb.sb_lvbptr);
3769 kfree(ua);
3770 lkb->lkb_astparam = (long)NULL;
3771
3772	/* TODO: propagate to master if needed */
3773 return 0;
3774}
3775
3776/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3777 Regardless of what rsb queue the lock is on, it's removed and freed. */
3778
3779static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3780{
3781 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3782 struct dlm_args args;
3783 int error;
3784
3785 /* FIXME: we need to handle the case where the lkb is in limbo
3786 while the rsb is being looked up, currently we assert in
3787 _unlock_lock/is_remote because rsb nodeid is -1. */
3788
3789 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3790
3791 error = unlock_lock(ls, lkb, &args);
3792 if (error == -DLM_EUNLOCK)
3793 error = 0;
3794 return error;
3795}
3796
3797/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3798 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3799 which we clear here. */
3800
3801/* proc CLOSING flag is set so no more device_reads should look at proc->asts
3802 list, and no more device_writes should add lkb's to proc->locks list; so we
3803 shouldn't need to take asts_spin or locks_spin here. this assumes that
3804 device reads/writes/closes are serialized -- FIXME: we may need to serialize
3805   them ourselves. */
3806
3807void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3808{
3809 struct dlm_lkb *lkb, *safe;
3810
3811 lock_recovery(ls);
3812 mutex_lock(&ls->ls_clear_proc_locks);
3813
3814 list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
3815 if (lkb->lkb_ast_type) {
3816 list_del(&lkb->lkb_astqueue);
3817 unhold_lkb(lkb);
3818 }
3819
3820 list_del(&lkb->lkb_ownqueue);
3821
3822 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3823 lkb->lkb_flags |= DLM_IFL_ORPHAN;
3824 orphan_proc_lock(ls, lkb);
3825 } else {
3826 lkb->lkb_flags |= DLM_IFL_DEAD;
3827 unlock_proc_lock(ls, lkb);
3828 }
3829
3830 /* this removes the reference for the proc->locks list
3831 added by dlm_user_request, it may result in the lkb
3832 being freed */
3833
3834 dlm_put_lkb(lkb);
3835 }
3836 mutex_unlock(&ls->ls_clear_proc_locks);
3837 unlock_recovery(ls);
3838}