1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
28 static int rcom_response(struct dlm_ls
*ls
)
30 return test_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
33 static int create_rcom(struct dlm_ls
*ls
, int to_nodeid
, int type
, int len
,
34 struct dlm_rcom
**rc_ret
, struct dlm_mhandle
**mh_ret
)
37 struct dlm_mhandle
*mh
;
39 int mb_len
= sizeof(struct dlm_rcom
) + len
;
41 mh
= dlm_lowcomms_get_buffer(to_nodeid
, mb_len
, GFP_KERNEL
, &mb
);
43 log_print("create_rcom to %d type %d len %d ENOBUFS",
44 to_nodeid
, type
, len
);
47 memset(mb
, 0, mb_len
);
49 rc
= (struct dlm_rcom
*) mb
;
51 rc
->rc_header
.h_version
= (DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
);
52 rc
->rc_header
.h_lockspace
= ls
->ls_global_id
;
53 rc
->rc_header
.h_nodeid
= dlm_our_nodeid();
54 rc
->rc_header
.h_length
= mb_len
;
55 rc
->rc_header
.h_cmd
= DLM_RCOM
;
64 static void send_rcom(struct dlm_ls
*ls
, struct dlm_mhandle
*mh
,
68 dlm_lowcomms_commit_buffer(mh
);
71 /* When replying to a status request, a node also sends back its
72 configuration values. The requesting node then checks that the remote
73 node is configured the same way as itself. */
75 static void make_config(struct dlm_ls
*ls
, struct rcom_config
*rf
)
77 rf
->rf_lvblen
= ls
->ls_lvblen
;
78 rf
->rf_lsflags
= ls
->ls_exflags
;
81 static int check_config(struct dlm_ls
*ls
, struct rcom_config
*rf
, int nodeid
)
83 if (rf
->rf_lvblen
!= ls
->ls_lvblen
||
84 rf
->rf_lsflags
!= ls
->ls_exflags
) {
85 log_error(ls
, "config mismatch: %d,%x nodeid %d: %d,%x",
86 ls
->ls_lvblen
, ls
->ls_exflags
,
87 nodeid
, rf
->rf_lvblen
, rf
->rf_lsflags
);
93 int dlm_rcom_status(struct dlm_ls
*ls
, int nodeid
)
96 struct dlm_mhandle
*mh
;
99 memset(ls
->ls_recover_buf
, 0, dlm_config
.buffer_size
);
100 ls
->ls_recover_nodeid
= nodeid
;
102 if (nodeid
== dlm_our_nodeid()) {
103 rc
= (struct dlm_rcom
*) ls
->ls_recover_buf
;
104 rc
->rc_result
= dlm_recover_status(ls
);
108 error
= create_rcom(ls
, nodeid
, DLM_RCOM_STATUS
, 0, &rc
, &mh
);
112 send_rcom(ls
, mh
, rc
);
114 error
= dlm_wait_function(ls
, &rcom_response
);
115 clear_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
119 rc
= (struct dlm_rcom
*) ls
->ls_recover_buf
;
121 if (rc
->rc_result
== -ESRCH
) {
122 /* we pretend the remote lockspace exists with 0 status */
123 log_debug(ls
, "remote node %d not ready", nodeid
);
126 error
= check_config(ls
, (struct rcom_config
*) rc
->rc_buf
,
128 /* the caller looks at rc_result for the remote recovery status */
133 static void receive_rcom_status(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
136 struct dlm_mhandle
*mh
;
137 int error
, nodeid
= rc_in
->rc_header
.h_nodeid
;
139 error
= create_rcom(ls
, nodeid
, DLM_RCOM_STATUS_REPLY
,
140 sizeof(struct rcom_config
), &rc
, &mh
);
143 rc
->rc_result
= dlm_recover_status(ls
);
144 make_config(ls
, (struct rcom_config
*) rc
->rc_buf
);
146 send_rcom(ls
, mh
, rc
);
149 static void receive_rcom_status_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
151 memcpy(ls
->ls_recover_buf
, rc_in
, rc_in
->rc_header
.h_length
);
152 set_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
153 wake_up(&ls
->ls_wait_general
);
156 int dlm_rcom_names(struct dlm_ls
*ls
, int nodeid
, char *last_name
, int last_len
)
159 struct dlm_mhandle
*mh
;
160 int error
= 0, len
= sizeof(struct dlm_rcom
);
162 memset(ls
->ls_recover_buf
, 0, dlm_config
.buffer_size
);
163 ls
->ls_recover_nodeid
= nodeid
;
165 if (nodeid
== dlm_our_nodeid()) {
166 dlm_copy_master_names(ls
, last_name
, last_len
,
167 ls
->ls_recover_buf
+ len
,
168 dlm_config
.buffer_size
- len
, nodeid
);
172 error
= create_rcom(ls
, nodeid
, DLM_RCOM_NAMES
, last_len
, &rc
, &mh
);
175 memcpy(rc
->rc_buf
, last_name
, last_len
);
177 send_rcom(ls
, mh
, rc
);
179 error
= dlm_wait_function(ls
, &rcom_response
);
180 clear_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
185 static void receive_rcom_names(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
188 struct dlm_mhandle
*mh
;
189 int error
, inlen
, outlen
;
190 int nodeid
= rc_in
->rc_header
.h_nodeid
;
191 uint32_t status
= dlm_recover_status(ls
);
194 * We can't run dlm_dir_rebuild_send (which uses ls_nodes) while
195 * dlm_recoverd is running ls_nodes_reconfig (which changes ls_nodes).
196 * It could only happen in rare cases where we get a late NAMES
197 * message from a previous instance of recovery.
200 if (!(status
& DLM_RS_NODES
)) {
201 log_debug(ls
, "ignoring RCOM_NAMES from %u", nodeid
);
205 nodeid
= rc_in
->rc_header
.h_nodeid
;
206 inlen
= rc_in
->rc_header
.h_length
- sizeof(struct dlm_rcom
);
207 outlen
= dlm_config
.buffer_size
- sizeof(struct dlm_rcom
);
209 error
= create_rcom(ls
, nodeid
, DLM_RCOM_NAMES_REPLY
, outlen
, &rc
, &mh
);
213 dlm_copy_master_names(ls
, rc_in
->rc_buf
, inlen
, rc
->rc_buf
, outlen
,
215 send_rcom(ls
, mh
, rc
);
218 static void receive_rcom_names_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
220 memcpy(ls
->ls_recover_buf
, rc_in
, rc_in
->rc_header
.h_length
);
221 set_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
222 wake_up(&ls
->ls_wait_general
);
225 int dlm_send_rcom_lookup(struct dlm_rsb
*r
, int dir_nodeid
)
228 struct dlm_mhandle
*mh
;
229 struct dlm_ls
*ls
= r
->res_ls
;
232 error
= create_rcom(ls
, dir_nodeid
, DLM_RCOM_LOOKUP
, r
->res_length
,
236 memcpy(rc
->rc_buf
, r
->res_name
, r
->res_length
);
237 rc
->rc_id
= (unsigned long) r
;
239 send_rcom(ls
, mh
, rc
);
244 static void receive_rcom_lookup(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
247 struct dlm_mhandle
*mh
;
248 int error
, ret_nodeid
, nodeid
= rc_in
->rc_header
.h_nodeid
;
249 int len
= rc_in
->rc_header
.h_length
- sizeof(struct dlm_rcom
);
251 error
= create_rcom(ls
, nodeid
, DLM_RCOM_LOOKUP_REPLY
, 0, &rc
, &mh
);
255 error
= dlm_dir_lookup(ls
, nodeid
, rc_in
->rc_buf
, len
, &ret_nodeid
);
258 rc
->rc_result
= ret_nodeid
;
259 rc
->rc_id
= rc_in
->rc_id
;
261 send_rcom(ls
, mh
, rc
);
264 static void receive_rcom_lookup_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
266 dlm_recover_master_reply(ls
, rc_in
);
269 static void pack_rcom_lock(struct dlm_rsb
*r
, struct dlm_lkb
*lkb
,
270 struct rcom_lock
*rl
)
272 memset(rl
, 0, sizeof(*rl
));
274 rl
->rl_ownpid
= lkb
->lkb_ownpid
;
275 rl
->rl_lkid
= lkb
->lkb_id
;
276 rl
->rl_exflags
= lkb
->lkb_exflags
;
277 rl
->rl_flags
= lkb
->lkb_flags
;
278 rl
->rl_lvbseq
= lkb
->lkb_lvbseq
;
279 rl
->rl_rqmode
= lkb
->lkb_rqmode
;
280 rl
->rl_grmode
= lkb
->lkb_grmode
;
281 rl
->rl_status
= lkb
->lkb_status
;
282 rl
->rl_wait_type
= lkb
->lkb_wait_type
;
284 if (lkb
->lkb_bastaddr
)
285 rl
->rl_asts
|= AST_BAST
;
286 if (lkb
->lkb_astaddr
)
287 rl
->rl_asts
|= AST_COMP
;
289 rl
->rl_namelen
= r
->res_length
;
290 memcpy(rl
->rl_name
, r
->res_name
, r
->res_length
);
292 /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
293 If so, receive_rcom_lock_args() won't take this copy. */
296 memcpy(rl
->rl_lvb
, lkb
->lkb_lvbptr
, r
->res_ls
->ls_lvblen
);
299 int dlm_send_rcom_lock(struct dlm_rsb
*r
, struct dlm_lkb
*lkb
)
301 struct dlm_ls
*ls
= r
->res_ls
;
303 struct dlm_mhandle
*mh
;
304 struct rcom_lock
*rl
;
305 int error
, len
= sizeof(struct rcom_lock
);
308 len
+= ls
->ls_lvblen
;
310 error
= create_rcom(ls
, r
->res_nodeid
, DLM_RCOM_LOCK
, len
, &rc
, &mh
);
314 rl
= (struct rcom_lock
*) rc
->rc_buf
;
315 pack_rcom_lock(r
, lkb
, rl
);
316 rc
->rc_id
= (unsigned long) r
;
318 send_rcom(ls
, mh
, rc
);
323 static void receive_rcom_lock(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
326 struct dlm_mhandle
*mh
;
327 int error
, nodeid
= rc_in
->rc_header
.h_nodeid
;
329 dlm_recover_master_copy(ls
, rc_in
);
331 error
= create_rcom(ls
, nodeid
, DLM_RCOM_LOCK_REPLY
,
332 sizeof(struct rcom_lock
), &rc
, &mh
);
336 /* We send back the same rcom_lock struct we received, but
337 dlm_recover_master_copy() has filled in rl_remid and rl_result */
339 memcpy(rc
->rc_buf
, rc_in
->rc_buf
, sizeof(struct rcom_lock
));
340 rc
->rc_id
= rc_in
->rc_id
;
342 send_rcom(ls
, mh
, rc
);
345 static void receive_rcom_lock_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
347 uint32_t status
= dlm_recover_status(ls
);
349 if (!(status
& DLM_RS_DIR
)) {
350 log_debug(ls
, "ignoring RCOM_LOCK_REPLY from %u",
351 rc_in
->rc_header
.h_nodeid
);
355 dlm_recover_process_copy(ls
, rc_in
);
358 static int send_ls_not_ready(int nodeid
, struct dlm_rcom
*rc_in
)
361 struct dlm_mhandle
*mh
;
363 int mb_len
= sizeof(struct dlm_rcom
);
365 mh
= dlm_lowcomms_get_buffer(nodeid
, mb_len
, GFP_KERNEL
, &mb
);
368 memset(mb
, 0, mb_len
);
370 rc
= (struct dlm_rcom
*) mb
;
372 rc
->rc_header
.h_version
= (DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
);
373 rc
->rc_header
.h_lockspace
= rc_in
->rc_header
.h_lockspace
;
374 rc
->rc_header
.h_nodeid
= dlm_our_nodeid();
375 rc
->rc_header
.h_length
= mb_len
;
376 rc
->rc_header
.h_cmd
= DLM_RCOM
;
378 rc
->rc_type
= DLM_RCOM_STATUS_REPLY
;
379 rc
->rc_result
= -ESRCH
;
382 dlm_lowcomms_commit_buffer(mh
);
387 /* Called by dlm_recvd; corresponds to dlm_receive_message() but special
388 recovery-only comms are sent through here. */
390 void dlm_receive_rcom(struct dlm_header
*hd
, int nodeid
)
392 struct dlm_rcom
*rc
= (struct dlm_rcom
*) hd
;
397 /* If the lockspace doesn't exist then still send a status message
398 back; it's possible that it just doesn't have its global_id yet. */
400 ls
= dlm_find_lockspace_global(hd
->h_lockspace
);
402 log_print("lockspace %x from %d not found",
403 hd
->h_lockspace
, nodeid
);
404 send_ls_not_ready(nodeid
, rc
);
408 if (dlm_recovery_stopped(ls
) && (rc
->rc_type
!= DLM_RCOM_STATUS
)) {
409 log_error(ls
, "ignoring recovery message %x from %d",
410 rc
->rc_type
, nodeid
);
414 if (nodeid
!= rc
->rc_header
.h_nodeid
) {
415 log_error(ls
, "bad rcom nodeid %d from %d",
416 rc
->rc_header
.h_nodeid
, nodeid
);
420 switch (rc
->rc_type
) {
421 case DLM_RCOM_STATUS
:
422 receive_rcom_status(ls
, rc
);
426 receive_rcom_names(ls
, rc
);
429 case DLM_RCOM_LOOKUP
:
430 receive_rcom_lookup(ls
, rc
);
434 receive_rcom_lock(ls
, rc
);
437 case DLM_RCOM_STATUS_REPLY
:
438 receive_rcom_status_reply(ls
, rc
);
441 case DLM_RCOM_NAMES_REPLY
:
442 receive_rcom_names_reply(ls
, rc
);
445 case DLM_RCOM_LOOKUP_REPLY
:
446 receive_rcom_lookup_reply(ls
, rc
);
449 case DLM_RCOM_LOCK_REPLY
:
450 receive_rcom_lock_reply(ls
, rc
);
454 DLM_ASSERT(0, printk("rc_type=%x\n", rc
->rc_type
););
457 dlm_put_lockspace(ls
);
This page took 0.041237 seconds and 5 git commands to generate.