Commit | Line | Data |
---|---|---|
e7fd4179 DT |
1 | /****************************************************************************** |
2 | ******************************************************************************* | |
3 | ** | |
4 | ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. | |
ac33d071 | 5 | ** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved. |
e7fd4179 DT |
6 | ** |
7 | ** This copyrighted material is made available to anyone wishing to use, | |
8 | ** modify, copy, or redistribute it subject to the terms and conditions | |
9 | ** of the GNU General Public License v.2. | |
10 | ** | |
11 | ******************************************************************************* | |
12 | ******************************************************************************/ | |
13 | ||
14 | /* | |
15 | * lowcomms.c | |
16 | * | |
17 | * This is the "low-level" comms layer. | |
18 | * | |
19 | * It is responsible for sending/receiving messages | |
20 | * from other nodes in the cluster. | |
21 | * | |
22 | * Cluster nodes are referred to by their nodeids. nodeids are | |
23 | * simply 32 bit numbers to the locking module - if they need to | |
24 | * be expanded for the cluster infrastructure then that is its | |
25 | * responsibility. It is this layer's | |
26 | * responsibility to resolve these into IP address or | |
27 | * whatever it needs for inter-node communication. | |
28 | * | |
29 | * The comms level is two kernel threads that deal mainly with | |
30 | * the receiving of messages from other nodes and passing them | |
31 | * up to the mid-level comms layer (which understands the | |
32 | * message format) for execution by the locking core, and | |
33 | * a send thread which does all the setting up of connections | |
34 | * to remote nodes and the sending of data. Threads are not allowed | |
35 | * to send their own data because it may cause them to wait in times | |
36 | * of high load. Also, this way, the sending thread can collect together | |
37 | * messages bound for one node and send them in one block. | |
38 | * | |
39 | * I don't see any problem with the recv thread executing the locking | |
40 | * code on behalf of remote processes as the locking code is | |
41 | * short, efficient and never (well, hardly ever) waits. | |
42 | * | |
43 | */ | |
44 | ||
45 | #include <asm/ioctls.h> | |
46 | #include <net/sock.h> | |
47 | #include <net/tcp.h> | |
48 | #include <net/sctp/user.h> | |
49 | #include <linux/pagemap.h> | |
50 | #include <linux/socket.h> | |
51 | #include <linux/idr.h> | |
52 | ||
53 | #include "dlm_internal.h" | |
54 | #include "lowcomms.h" | |
55 | #include "config.h" | |
56 | #include "midcomms.h" | |
57 | ||
47c96298 SW |
58 | static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; |
59 | static int dlm_local_count; | |
60 | static int dlm_local_nodeid; | |
e7fd4179 DT |
61 | |
62 | /* One of these per connected node */ | |
63 | ||
64 | #define NI_INIT_PENDING 1 | |
65 | #define NI_WRITE_PENDING 2 | |
66 | ||
67 | struct nodeinfo { | |
68 | spinlock_t lock; | |
69 | sctp_assoc_t assoc_id; | |
70 | unsigned long flags; | |
71 | struct list_head write_list; /* nodes with pending writes */ | |
72 | struct list_head writequeue; /* outgoing writequeue_entries */ | |
73 | spinlock_t writequeue_lock; | |
74 | int nodeid; | |
1d6e8131 PC |
75 | struct work_struct swork; /* Send workqueue */ |
76 | struct work_struct lwork; /* Locking workqueue */ | |
e7fd4179 DT |
77 | }; |
78 | ||
79 | static DEFINE_IDR(nodeinfo_idr); | |
ac33d071 PC |
80 | static DECLARE_RWSEM(nodeinfo_lock); |
81 | static int max_nodeid; | |
e7fd4179 DT |
82 | |
83 | struct cbuf { | |
ac33d071 PC |
84 | unsigned int base; |
85 | unsigned int len; | |
86 | unsigned int mask; | |
e7fd4179 DT |
87 | }; |
88 | ||
89 | /* Just the one of these, now. But this struct keeps | |
90 | the connection-specific variables together */ | |
91 | ||
92 | #define CF_READ_PENDING 1 | |
93 | ||
94 | struct connection { | |
ac33d071 | 95 | struct socket *sock; |
e7fd4179 | 96 | unsigned long flags; |
ac33d071 | 97 | struct page *rx_page; |
e7fd4179 DT |
98 | atomic_t waiting_requests; |
99 | struct cbuf cb; | |
100 | int eagain_flag; | |
1d6e8131 | 101 | struct work_struct work; /* Send workqueue */ |
e7fd4179 DT |
102 | }; |
103 | ||
104 | /* An entry waiting to be sent */ | |
105 | ||
106 | struct writequeue_entry { | |
107 | struct list_head list; | |
ac33d071 | 108 | struct page *page; |
e7fd4179 DT |
109 | int offset; |
110 | int len; | |
111 | int end; | |
112 | int users; | |
ac33d071 | 113 | struct nodeinfo *ni; |
e7fd4179 DT |
114 | }; |
115 | ||
ac33d071 PC |
116 | static void cbuf_add(struct cbuf *cb, int n) |
117 | { | |
118 | cb->len += n; | |
119 | } | |
e7fd4179 | 120 | |
ac33d071 PC |
121 | static int cbuf_data(struct cbuf *cb) |
122 | { | |
123 | return ((cb->base + cb->len) & cb->mask); | |
124 | } | |
e7fd4179 | 125 | |
ac33d071 PC |
126 | static void cbuf_init(struct cbuf *cb, int size) |
127 | { | |
128 | cb->base = cb->len = 0; | |
129 | cb->mask = size-1; | |
130 | } | |
e7fd4179 | 131 | |
ac33d071 PC |
132 | static void cbuf_eat(struct cbuf *cb, int n) |
133 | { | |
134 | cb->len -= n; | |
135 | cb->base += n; | |
136 | cb->base &= cb->mask; | |
137 | } | |
e7fd4179 DT |
138 | |
139 | /* List of nodes which have writes pending */ | |
ac33d071 PC |
140 | static LIST_HEAD(write_nodes); |
141 | static DEFINE_SPINLOCK(write_nodes_lock); | |
e7fd4179 | 142 | |
1d6e8131 | 143 | |
e7fd4179 DT |
144 | /* Maximum number of incoming messages to process before |
145 | * doing a schedule() | |
146 | */ | |
147 | #define MAX_RX_MSG_COUNT 25 | |
148 | ||
1d6e8131 PC |
149 | /* Work queues */ |
150 | static struct workqueue_struct *recv_workqueue; | |
151 | static struct workqueue_struct *send_workqueue; | |
152 | static struct workqueue_struct *lock_workqueue; | |
e7fd4179 DT |
153 | |
154 | /* The SCTP connection */ | |
155 | static struct connection sctp_con; | |
156 | ||
1d6e8131 PC |
157 | static void process_send_sockets(struct work_struct *work); |
158 | static void process_recv_sockets(struct work_struct *work); | |
159 | static void process_lock_request(struct work_struct *work); | |
e7fd4179 DT |
160 | |
161 | static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) | |
162 | { | |
163 | struct sockaddr_storage addr; | |
164 | int error; | |
165 | ||
47c96298 | 166 | if (!dlm_local_count) |
e7fd4179 DT |
167 | return -1; |
168 | ||
169 | error = dlm_nodeid_to_addr(nodeid, &addr); | |
170 | if (error) | |
171 | return error; | |
172 | ||
47c96298 | 173 | if (dlm_local_addr[0]->ss_family == AF_INET) { |
ac33d071 | 174 | struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; |
e7fd4179 DT |
175 | struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; |
176 | ret4->sin_addr.s_addr = in4->sin_addr.s_addr; | |
177 | } else { | |
ac33d071 | 178 | struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; |
e7fd4179 DT |
179 | struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; |
180 | memcpy(&ret6->sin6_addr, &in6->sin6_addr, | |
181 | sizeof(in6->sin6_addr)); | |
182 | } | |
183 | ||
184 | return 0; | |
185 | } | |
186 | ||
ac33d071 PC |
187 | /* If alloc is 0 here we will not attempt to allocate a new |
188 | nodeinfo struct */ | |
38d6fd26 | 189 | static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc) |
e7fd4179 DT |
190 | { |
191 | struct nodeinfo *ni; | |
192 | int r; | |
193 | int n; | |
194 | ||
195 | down_read(&nodeinfo_lock); | |
196 | ni = idr_find(&nodeinfo_idr, nodeid); | |
197 | up_read(&nodeinfo_lock); | |
198 | ||
ac33d071 PC |
199 | if (ni || !alloc) |
200 | return ni; | |
e7fd4179 | 201 | |
ac33d071 | 202 | down_write(&nodeinfo_lock); |
e7fd4179 | 203 | |
ac33d071 PC |
204 | ni = idr_find(&nodeinfo_idr, nodeid); |
205 | if (ni) | |
206 | goto out_up; | |
e7fd4179 | 207 | |
ac33d071 PC |
208 | r = idr_pre_get(&nodeinfo_idr, alloc); |
209 | if (!r) | |
210 | goto out_up; | |
e7fd4179 | 211 | |
ac33d071 PC |
212 | ni = kmalloc(sizeof(struct nodeinfo), alloc); |
213 | if (!ni) | |
214 | goto out_up; | |
215 | ||
216 | r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n); | |
217 | if (r) { | |
218 | kfree(ni); | |
219 | ni = NULL; | |
220 | goto out_up; | |
e7fd4179 | 221 | } |
ac33d071 PC |
222 | if (n != nodeid) { |
223 | idr_remove(&nodeinfo_idr, n); | |
224 | kfree(ni); | |
225 | ni = NULL; | |
226 | goto out_up; | |
227 | } | |
228 | memset(ni, 0, sizeof(struct nodeinfo)); | |
229 | spin_lock_init(&ni->lock); | |
230 | INIT_LIST_HEAD(&ni->writequeue); | |
231 | spin_lock_init(&ni->writequeue_lock); | |
1d6e8131 PC |
232 | INIT_WORK(&ni->lwork, process_lock_request); |
233 | INIT_WORK(&ni->swork, process_send_sockets); | |
ac33d071 PC |
234 | ni->nodeid = nodeid; |
235 | ||
236 | if (nodeid > max_nodeid) | |
237 | max_nodeid = nodeid; | |
238 | out_up: | |
239 | up_write(&nodeinfo_lock); | |
e7fd4179 DT |
240 | |
241 | return ni; | |
242 | } | |
243 | ||
244 | /* Don't call this too often... */ | |
245 | static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc) | |
246 | { | |
247 | int i; | |
248 | struct nodeinfo *ni; | |
249 | ||
250 | for (i=1; i<=max_nodeid; i++) { | |
251 | ni = nodeid2nodeinfo(i, 0); | |
252 | if (ni && ni->assoc_id == assoc) | |
253 | return ni; | |
254 | } | |
255 | return NULL; | |
256 | } | |
257 | ||
258 | /* Data or notification available on socket */ | |
259 | static void lowcomms_data_ready(struct sock *sk, int count_unused) | |
260 | { | |
e7fd4179 | 261 | if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags)) |
1d6e8131 | 262 | queue_work(recv_workqueue, &sctp_con.work); |
e7fd4179 DT |
263 | } |
264 | ||
265 | ||
/* Add the port number to an IP6 or 4 sockaddr and return the address length.
   Also pad out the struct with zeros to make comparisons meaningful.
   If port is 0, the port of our first local address is used instead. */

static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	struct sockaddr_in *local4_addr;
	struct sockaddr_in6 *local6_addr;

	if (!dlm_local_count)
		return;

	/* No port given: inherit it from dlm_local_addr[0] */
	if (!port) {
		if (dlm_local_addr[0]->ss_family == AF_INET) {
			local4_addr = (struct sockaddr_in *)dlm_local_addr[0];
			port = be16_to_cpu(local4_addr->sin_port);
		} else {
			local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0];
			port = be16_to_cpu(local6_addr->sin6_port);
		}
	}

	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
		/* Zero everything in the storage beyond the sockaddr_in so
		   memcmp() of two sockaddr_storage structs is meaningful */
		memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
		       sizeof(struct sockaddr_in));
		*addr_len = sizeof(struct sockaddr_in);
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		/* Likewise zero the tail beyond the sockaddr_in6 */
		memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
		       sizeof(struct sockaddr_in6));
		*addr_len = sizeof(struct sockaddr_in6);
	}
}
304 | ||
305 | /* Close the connection and tidy up */ | |
306 | static void close_connection(void) | |
307 | { | |
308 | if (sctp_con.sock) { | |
309 | sock_release(sctp_con.sock); | |
310 | sctp_con.sock = NULL; | |
311 | } | |
312 | ||
313 | if (sctp_con.rx_page) { | |
314 | __free_page(sctp_con.rx_page); | |
315 | sctp_con.rx_page = NULL; | |
316 | } | |
317 | } | |
318 | ||
/* We only send shutdown messages to nodes that are not part of the cluster.
   Sends a zero-length message with MSG_EOF set in the SCTP ancillary data,
   which asks SCTP to shut down the given association. */
static void send_shutdown(sctp_assoc_t associd)
{
	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;
	int ret;

	/* No payload and no destination address: the association ID in
	   the control message is all SCTP needs to route this */
	outmessage.msg_name = NULL;
	outmessage.msg_namelen = 0;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	/* Build the SCTP_SNDRCV control message */
	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	outmessage.msg_controllen = cmsg->cmsg_len;
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));

	/* MSG_EOF on a zero-byte send == graceful association shutdown */
	sinfo->sinfo_flags |= MSG_EOF;
	sinfo->sinfo_assoc_id = associd;

	ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);

	if (ret != 0)
		log_print("send EOF to node failed: %d", ret);
}
350 | ||
351 | ||
352 | /* INIT failed but we don't know which node... | |
353 | restart INIT on all pending nodes */ | |
354 | static void init_failed(void) | |
355 | { | |
356 | int i; | |
357 | struct nodeinfo *ni; | |
358 | ||
359 | for (i=1; i<=max_nodeid; i++) { | |
360 | ni = nodeid2nodeinfo(i, 0); | |
361 | if (!ni) | |
362 | continue; | |
363 | ||
364 | if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) { | |
365 | ni->assoc_id = 0; | |
366 | if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { | |
367 | spin_lock_bh(&write_nodes_lock); | |
368 | list_add_tail(&ni->write_list, &write_nodes); | |
369 | spin_unlock_bh(&write_nodes_lock); | |
1d6e8131 | 370 | queue_work(send_workqueue, &ni->swork); |
e7fd4179 DT |
371 | } |
372 | } | |
373 | } | |
e7fd4179 DT |
374 | } |
375 | ||
/* Something happened to an association: handle SCTP_ASSOC_CHANGE
   notifications delivered in-band on the receive path.  Other
   notification types are silently ignored. */
static void process_sctp_notification(struct msghdr *msg, char *buf)
{
	union sctp_notification *sn = (union sctp_notification *)buf;

	if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
		switch (sn->sn_assoc_change.sac_state) {

		case SCTP_COMM_UP:
		case SCTP_RESTART:
		{
			/* Check that the new node is in the lockspace */
			struct sctp_prim prim;
			mm_segment_t fs;
			int nodeid;
			int prim_len, ret;
			int addr_len;
			struct nodeinfo *ni;

			/* This seems to happen when we received a connection
			 * too early... or something... anyway, it happens but
			 * we always seem to get a real message too, see
			 * receive_from_sock */

			if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
				log_print("COMM_UP for invalid assoc ID %d",
					  (int)sn->sn_assoc_change.sac_assoc_id);
				init_failed();
				return;
			}
			memset(&prim, 0, sizeof(struct sctp_prim));
			prim_len = sizeof(struct sctp_prim);
			prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;

			/* getsockopt() checks the user address space, so
			   widen it for this in-kernel call */
			fs = get_fs();
			set_fs(get_ds());
			ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
							     IPPROTO_SCTP,
							     SCTP_PRIMARY_ADDR,
							     (char*)&prim,
							     &prim_len);
			set_fs(fs);
			if (ret < 0) {
				/* NOTE(review): this inner 'ni' shadows the
				   outer declaration; harmless, but easy to
				   misread */
				struct nodeinfo *ni;

				log_print("getsockopt/sctp_primary_addr on "
					  "new assoc %d failed : %d",
					  (int)sn->sn_assoc_change.sac_assoc_id,
					  ret);

				/* Retry INIT later */
				ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
				if (ni)
					clear_bit(NI_INIT_PENDING, &ni->flags);
				return;
			}
			/* Map the association's primary address back to a
			   nodeid; peers not in the config get shut down */
			make_sockaddr(&prim.ssp_addr, 0, &addr_len);
			if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
				log_print("reject connect from unknown addr");
				send_shutdown(prim.ssp_assoc_id);
				return;
			}

			ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
			if (!ni)
				return;

			/* Save the assoc ID */
			ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;

			log_print("got new/restarted association %d nodeid %d",
				  (int)sn->sn_assoc_change.sac_assoc_id, nodeid);

			/* Send any pending writes */
			clear_bit(NI_INIT_PENDING, &ni->flags);
			if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
				spin_lock_bh(&write_nodes_lock);
				list_add_tail(&ni->write_list, &write_nodes);
				spin_unlock_bh(&write_nodes_lock);
				queue_work(send_workqueue, &ni->swork);
			}
		}
		break;

		case SCTP_COMM_LOST:
		case SCTP_SHUTDOWN_COMP:
		{
			struct nodeinfo *ni;

			/* Association is gone; zeroing assoc_id makes the
			   next send re-initiate it */
			ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
			if (ni) {
				spin_lock(&ni->lock);
				ni->assoc_id = 0;
				spin_unlock(&ni->lock);
			}
		}
		break;

		/* We don't know which INIT failed, so clear the PENDING flags
		 * on them all. if assoc_id is zero then it will then try
		 * again */

		case SCTP_CANT_STR_ASSOC:
		{
			log_print("Can't start SCTP association - retrying");
			init_failed();
		}
		break;

		default:
			log_print("unexpected SCTP assoc change id=%d state=%d",
				  (int)sn->sn_assoc_change.sac_assoc_id,
				  sn->sn_assoc_change.sac_state);
		}
	}
}
492 | ||
/* Data received from remote end.  Reads into a per-connection circular
   buffer (one page), dispatches SCTP notifications, and hands complete
   DLM messages to the midcomms layer.  Returns 0 normally, or a
   negative error on socket failure. */
static int receive_from_sock(void)
{
	int ret = 0;
	struct msghdr msg;
	struct kvec iov[2];
	unsigned len;
	int r;
	struct sctp_sndrcvinfo *sinfo;
	struct cmsghdr *cmsg;
	struct nodeinfo *ni;

	/* These two are marginally too big for stack allocation, but this
	 * function is (currently) only called by dlm_recvd so static should be
	 * OK.
	 */
	static struct sockaddr_storage msgname;
	static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

	if (sctp_con.sock == NULL)
		goto out;

	/* Lazily allocate the one-page receive buffer */
	if (sctp_con.rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		sctp_con.rx_page = alloc_page(GFP_ATOMIC);
		if (sctp_con.rx_page == NULL)
			goto out_resched;
		cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);
	}

	memset(&incmsg, 0, sizeof(incmsg));
	memset(&msgname, 0, sizeof(msgname));

	msg.msg_name = &msgname;
	msg.msg_namelen = sizeof(msgname);
	msg.msg_flags = 0;
	msg.msg_control = incmsg;
	msg.msg_controllen = sizeof(incmsg);
	msg.msg_iovlen = 1;

	/* I don't see why this circular buffer stuff is necessary for SCTP
	 * which is a packet-based protocol, but the whole thing breaks under
	 * load without it! The overhead is minimal (and is in the TCP lowcomms
	 * anyway, of course) so I'll leave it in until I can figure out what's
	 * really happening.
	 */

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
	iov[0].iov_base = page_address(sctp_con.rx_page) +
			  cbuf_data(&sctp_con.cb);
	iov[1].iov_len = 0;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
		iov[1].iov_len = sctp_con.cb.base;
		iov[1].iov_base = page_address(sctp_con.rx_page);
		msg.msg_iovlen = 2;
	}
	len = iov[0].iov_len + iov[1].iov_len;

	/* Non-blocking read; -EAGAIN simply means nothing to do */
	r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
				 MSG_NOSIGNAL | MSG_DONTWAIT);
	if (ret <= 0)
		goto out_close;

	msg.msg_control = incmsg;
	msg.msg_controllen = sizeof(incmsg);
	cmsg = CMSG_FIRSTHDR(&msg);
	sinfo = CMSG_DATA(cmsg);

	/* SCTP event notifications arrive in-band, flagged in msg_flags */
	if (msg.msg_flags & MSG_NOTIFICATION) {
		process_sctp_notification(&msg, page_address(sctp_con.rx_page));
		return 0;
	}

	/* Is this a new association ?  The peer's nodeid travels in the
	   (little-endian) ppid field, see initiate_association() */
	ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
	if (ni) {
		ni->assoc_id = sinfo->sinfo_assoc_id;
		if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {

			if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
				spin_lock_bh(&write_nodes_lock);
				list_add_tail(&ni->write_list, &write_nodes);
				spin_unlock_bh(&write_nodes_lock);
				queue_work(send_workqueue, &ni->swork);
			}
		}
	}

	/* INIT sends a message with length of 1 - ignore it */
	if (r == 1)
		return 0;

	cbuf_add(&sctp_con.cb, ret);
	// PJC: TODO: Add to node's workqueue....can we ??
	/* NOTE(review): cpu_to_le32 here vs le32_to_cpu above — both are the
	   same byte swap so the value is identical, but the annotation looks
	   inconsistent; confirm which direction is intended */
	ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
					  page_address(sctp_con.rx_page),
					  sctp_con.cb.base, sctp_con.cb.len,
					  PAGE_CACHE_SIZE);
	if (ret < 0)
		goto out_close;
	cbuf_eat(&sctp_con.cb, ret);

out:
	ret = 0;
	goto out_ret;

out_resched:
	/* Page allocation failed: re-arm the data-ready work and yield */
	lowcomms_data_ready(sctp_con.sock->sk, 0);
	ret = 0;
	cond_resched();
	goto out_ret;

out_close:
	if (ret != -EAGAIN)
		log_print("error reading from sctp socket: %d", ret);
out_ret:
	return ret;
}
624 | ||
625 | /* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */ | |
626 | static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num) | |
627 | { | |
628 | mm_segment_t fs; | |
629 | int result = 0; | |
630 | ||
631 | fs = get_fs(); | |
632 | set_fs(get_ds()); | |
633 | if (num == 1) | |
634 | result = sctp_con.sock->ops->bind(sctp_con.sock, | |
ac33d071 PC |
635 | (struct sockaddr *) addr, |
636 | addr_len); | |
e7fd4179 DT |
637 | else |
638 | result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP, | |
ac33d071 PC |
639 | SCTP_SOCKOPT_BINDX_ADD, |
640 | (char *)addr, addr_len); | |
e7fd4179 DT |
641 | set_fs(fs); |
642 | ||
643 | if (result < 0) | |
644 | log_print("Can't bind to port %d addr number %d", | |
68c817a1 | 645 | dlm_config.ci_tcp_port, num); |
e7fd4179 DT |
646 | |
647 | return result; | |
648 | } | |
649 | ||
650 | static void init_local(void) | |
651 | { | |
652 | struct sockaddr_storage sas, *addr; | |
653 | int i; | |
654 | ||
47c96298 | 655 | dlm_local_nodeid = dlm_our_nodeid(); |
e7fd4179 DT |
656 | |
657 | for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) { | |
658 | if (dlm_our_addr(&sas, i)) | |
659 | break; | |
660 | ||
661 | addr = kmalloc(sizeof(*addr), GFP_KERNEL); | |
662 | if (!addr) | |
663 | break; | |
664 | memcpy(addr, &sas, sizeof(*addr)); | |
47c96298 | 665 | dlm_local_addr[dlm_local_count++] = addr; |
e7fd4179 DT |
666 | } |
667 | } | |
668 | ||
/* Initialise SCTP socket and bind to all interfaces.  Creates the one
   SEQPACKET socket the whole module uses, subscribes to the SCTP events
   we care about, binds every local address and starts listening.
   Returns 0 on success or a negative error. */
static int init_sock(void)
{
	mm_segment_t fs;
	struct socket *sock = NULL;
	struct sockaddr_storage localaddr;
	struct sctp_event_subscribe subscribe;
	int result = -EINVAL, num = 1, i, addr_len;

	if (!dlm_local_count) {
		init_local();
		if (!dlm_local_count) {
			log_print("no local IP address has been set");
			goto out;
		}
	}

	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
				  IPPROTO_SCTP, &sock);
	if (result < 0) {
		log_print("Can't create comms socket, check SCTP is loaded");
		goto out;
	}

	/* Listen for events */
	memset(&subscribe, 0, sizeof(subscribe));
	subscribe.sctp_data_io_event = 1;
	subscribe.sctp_association_event = 1;
	subscribe.sctp_send_failure_event = 1;
	subscribe.sctp_shutdown_event = 1;
	subscribe.sctp_partial_delivery_event = 1;

	/* setsockopt() expects a user pointer; widen the address space
	   around this in-kernel call */
	fs = get_fs();
	set_fs(get_ds());
	result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
				       (char *)&subscribe, sizeof(subscribe));
	set_fs(fs);

	if (result < 0) {
		log_print("Failed to set SCTP_EVENTS on socket: result=%d",
			  result);
		goto create_delsock;
	}

	/* Init con struct: install our data-ready callback */
	sock->sk->sk_user_data = &sctp_con;
	sctp_con.sock = sock;
	sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready;

	/* Bind to all interfaces. */
	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);

		result = add_bind_addr(&localaddr, addr_len, num);
		if (result)
			goto create_delsock;
		++num;
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't set socket listening");
		goto create_delsock;
	}

	return 0;

create_delsock:
	sock_release(sock);
	sctp_con.sock = NULL;
out:
	return result;
}
743 | ||
744 | ||
38d6fd26 | 745 | static struct writequeue_entry *new_writequeue_entry(gfp_t allocation) |
e7fd4179 DT |
746 | { |
747 | struct writequeue_entry *entry; | |
748 | ||
749 | entry = kmalloc(sizeof(struct writequeue_entry), allocation); | |
750 | if (!entry) | |
751 | return NULL; | |
752 | ||
753 | entry->page = alloc_page(allocation); | |
754 | if (!entry->page) { | |
755 | kfree(entry); | |
756 | return NULL; | |
757 | } | |
758 | ||
759 | entry->offset = 0; | |
760 | entry->len = 0; | |
761 | entry->end = 0; | |
762 | entry->users = 0; | |
763 | ||
764 | return entry; | |
765 | } | |
766 | ||
38d6fd26 | 767 | void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) |
e7fd4179 DT |
768 | { |
769 | struct writequeue_entry *e; | |
770 | int offset = 0; | |
771 | int users = 0; | |
772 | struct nodeinfo *ni; | |
773 | ||
e7fd4179 DT |
774 | ni = nodeid2nodeinfo(nodeid, allocation); |
775 | if (!ni) | |
776 | return NULL; | |
777 | ||
778 | spin_lock(&ni->writequeue_lock); | |
779 | e = list_entry(ni->writequeue.prev, struct writequeue_entry, list); | |
ac33d071 | 780 | if ((&e->list == &ni->writequeue) || |
e7fd4179 DT |
781 | (PAGE_CACHE_SIZE - e->end < len)) { |
782 | e = NULL; | |
783 | } else { | |
784 | offset = e->end; | |
785 | e->end += len; | |
786 | users = e->users++; | |
787 | } | |
788 | spin_unlock(&ni->writequeue_lock); | |
789 | ||
790 | if (e) { | |
ac33d071 | 791 | got_one: |
e7fd4179 DT |
792 | if (users == 0) |
793 | kmap(e->page); | |
794 | *ppc = page_address(e->page) + offset; | |
795 | return e; | |
796 | } | |
797 | ||
798 | e = new_writequeue_entry(allocation); | |
799 | if (e) { | |
800 | spin_lock(&ni->writequeue_lock); | |
801 | offset = e->end; | |
802 | e->end += len; | |
803 | e->ni = ni; | |
804 | users = e->users++; | |
805 | list_add_tail(&e->list, &ni->writequeue); | |
806 | spin_unlock(&ni->writequeue_lock); | |
807 | goto got_one; | |
808 | } | |
809 | return NULL; | |
810 | } | |
811 | ||
812 | void dlm_lowcomms_commit_buffer(void *arg) | |
813 | { | |
814 | struct writequeue_entry *e = (struct writequeue_entry *) arg; | |
815 | int users; | |
816 | struct nodeinfo *ni = e->ni; | |
817 | ||
e7fd4179 DT |
818 | spin_lock(&ni->writequeue_lock); |
819 | users = --e->users; | |
820 | if (users) | |
821 | goto out; | |
822 | e->len = e->end - e->offset; | |
823 | kunmap(e->page); | |
824 | spin_unlock(&ni->writequeue_lock); | |
825 | ||
826 | if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { | |
827 | spin_lock_bh(&write_nodes_lock); | |
828 | list_add_tail(&ni->write_list, &write_nodes); | |
829 | spin_unlock_bh(&write_nodes_lock); | |
1d6e8131 PC |
830 | |
831 | queue_work(send_workqueue, &ni->swork); | |
e7fd4179 DT |
832 | } |
833 | return; | |
834 | ||
ac33d071 | 835 | out: |
e7fd4179 DT |
836 | spin_unlock(&ni->writequeue_lock); |
837 | return; | |
838 | } | |
839 | ||
840 | static void free_entry(struct writequeue_entry *e) | |
841 | { | |
842 | __free_page(e->page); | |
843 | kfree(e); | |
844 | } | |
845 | ||
/* Initiate an SCTP association. In theory we could just use sendmsg() on
   the first IP address and it should work, but this allows us to set up the
   association before sending any valuable data that we can't afford to lose.
   It also keeps the send path clean as it can now always use the association ID */
static void initiate_association(int nodeid)
{
	struct sockaddr_storage rem_addr;
	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;
	int ret;
	int addrlen;
	char buf[1];
	struct kvec iov[1];
	struct nodeinfo *ni;

	log_print("Initiating association with node %d", nodeid);

	ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
	if (!ni)
		return;

	if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
		log_print("no address for nodeid %d", nodeid);
		return;
	}

	make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);

	/* Addressed by sockaddr this time: no association exists yet */
	outmessage.msg_name = &rem_addr;
	outmessage.msg_namelen = addrlen;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	iov[0].iov_base = buf;
	iov[0].iov_len = 1;

	/* Real INIT messages seem to cause trouble. Just send a 1 byte message
	   we can afford to lose */
	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
	/* Carry our nodeid in the ppid so the peer can identify us,
	   see receive_from_sock() on the other end */
	sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);

	outmessage.msg_controllen = cmsg->cmsg_len;
	ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
	if (ret < 0) {
		log_print("send INIT to node failed: %d", ret);
		/* Try again later */
		clear_bit(NI_INIT_PENDING, &ni->flags);
	}
}
903 | ||
904 | /* Send a message */ | |
ac33d071 | 905 | static void send_to_sock(struct nodeinfo *ni) |
e7fd4179 DT |
906 | { |
907 | int ret = 0; | |
908 | struct writequeue_entry *e; | |
909 | int len, offset; | |
910 | struct msghdr outmsg; | |
911 | static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; | |
912 | struct cmsghdr *cmsg; | |
913 | struct sctp_sndrcvinfo *sinfo; | |
914 | struct kvec iov; | |
915 | ||
ac33d071 | 916 | /* See if we need to init an association before we start |
e7fd4179 DT |
917 | sending precious messages */ |
918 | spin_lock(&ni->lock); | |
919 | if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) { | |
920 | spin_unlock(&ni->lock); | |
921 | initiate_association(ni->nodeid); | |
ac33d071 | 922 | return; |
e7fd4179 DT |
923 | } |
924 | spin_unlock(&ni->lock); | |
925 | ||
926 | outmsg.msg_name = NULL; /* We use assoc_id */ | |
927 | outmsg.msg_namelen = 0; | |
928 | outmsg.msg_control = outcmsg; | |
929 | outmsg.msg_controllen = sizeof(outcmsg); | |
930 | outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR; | |
931 | ||
932 | cmsg = CMSG_FIRSTHDR(&outmsg); | |
933 | cmsg->cmsg_level = IPPROTO_SCTP; | |
934 | cmsg->cmsg_type = SCTP_SNDRCV; | |
935 | cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); | |
ac33d071 | 936 | sinfo = CMSG_DATA(cmsg); |
e7fd4179 | 937 | memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); |
47c96298 | 938 | sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid); |
e7fd4179 DT |
939 | sinfo->sinfo_assoc_id = ni->assoc_id; |
940 | outmsg.msg_controllen = cmsg->cmsg_len; | |
941 | ||
942 | spin_lock(&ni->writequeue_lock); | |
943 | for (;;) { | |
944 | if (list_empty(&ni->writequeue)) | |
945 | break; | |
946 | e = list_entry(ni->writequeue.next, struct writequeue_entry, | |
947 | list); | |
e7fd4179 DT |
948 | len = e->len; |
949 | offset = e->offset; | |
950 | BUG_ON(len == 0 && e->users == 0); | |
951 | spin_unlock(&ni->writequeue_lock); | |
fcc8abc8 | 952 | kmap(e->page); |
e7fd4179 DT |
953 | |
954 | ret = 0; | |
955 | if (len) { | |
956 | iov.iov_base = page_address(e->page)+offset; | |
957 | iov.iov_len = len; | |
958 | ||
959 | ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1, | |
960 | len); | |
961 | if (ret == -EAGAIN) { | |
962 | sctp_con.eagain_flag = 1; | |
963 | goto out; | |
964 | } else if (ret < 0) | |
965 | goto send_error; | |
966 | } else { | |
967 | /* Don't starve people filling buffers */ | |
ac33d071 | 968 | cond_resched(); |
e7fd4179 DT |
969 | } |
970 | ||
971 | spin_lock(&ni->writequeue_lock); | |
972 | e->offset += ret; | |
973 | e->len -= ret; | |
974 | ||
975 | if (e->len == 0 && e->users == 0) { | |
976 | list_del(&e->list); | |
ac33d071 | 977 | kunmap(e->page); |
e7fd4179 DT |
978 | free_entry(e); |
979 | continue; | |
980 | } | |
981 | } | |
982 | spin_unlock(&ni->writequeue_lock); | |
ac33d071 PC |
983 | out: |
984 | return; | |
e7fd4179 | 985 | |
ac33d071 | 986 | send_error: |
e7fd4179 DT |
987 | log_print("Error sending to node %d %d", ni->nodeid, ret); |
988 | spin_lock(&ni->lock); | |
989 | if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) { | |
990 | ni->assoc_id = 0; | |
991 | spin_unlock(&ni->lock); | |
992 | initiate_association(ni->nodeid); | |
993 | } else | |
994 | spin_unlock(&ni->lock); | |
995 | ||
ac33d071 | 996 | return; |
e7fd4179 DT |
997 | } |
998 | ||
999 | /* Try to send any messages that are pending */ | |
1000 | static void process_output_queue(void) | |
1001 | { | |
1002 | struct list_head *list; | |
1003 | struct list_head *temp; | |
1004 | ||
1005 | spin_lock_bh(&write_nodes_lock); | |
1006 | list_for_each_safe(list, temp, &write_nodes) { | |
1007 | struct nodeinfo *ni = | |
ac33d071 | 1008 | list_entry(list, struct nodeinfo, write_list); |
e7fd4179 DT |
1009 | clear_bit(NI_WRITE_PENDING, &ni->flags); |
1010 | list_del(&ni->write_list); | |
1011 | ||
1012 | spin_unlock_bh(&write_nodes_lock); | |
1013 | ||
1014 | send_to_sock(ni); | |
1015 | spin_lock_bh(&write_nodes_lock); | |
1016 | } | |
1017 | spin_unlock_bh(&write_nodes_lock); | |
1018 | } | |
1019 | ||
1020 | /* Called after we've had -EAGAIN and been woken up */ | |
1021 | static void refill_write_queue(void) | |
1022 | { | |
1023 | int i; | |
1024 | ||
1025 | for (i=1; i<=max_nodeid; i++) { | |
1026 | struct nodeinfo *ni = nodeid2nodeinfo(i, 0); | |
1027 | ||
1028 | if (ni) { | |
1029 | if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { | |
1030 | spin_lock_bh(&write_nodes_lock); | |
1031 | list_add_tail(&ni->write_list, &write_nodes); | |
1032 | spin_unlock_bh(&write_nodes_lock); | |
1033 | } | |
1034 | } | |
1035 | } | |
1036 | } | |
1037 | ||
1038 | static void clean_one_writequeue(struct nodeinfo *ni) | |
1039 | { | |
1040 | struct list_head *list; | |
1041 | struct list_head *temp; | |
1042 | ||
1043 | spin_lock(&ni->writequeue_lock); | |
1044 | list_for_each_safe(list, temp, &ni->writequeue) { | |
1045 | struct writequeue_entry *e = | |
1046 | list_entry(list, struct writequeue_entry, list); | |
1047 | list_del(&e->list); | |
1048 | free_entry(e); | |
1049 | } | |
1050 | spin_unlock(&ni->writequeue_lock); | |
1051 | } | |
1052 | ||
1053 | static void clean_writequeues(void) | |
1054 | { | |
1055 | int i; | |
1056 | ||
1057 | for (i=1; i<=max_nodeid; i++) { | |
1058 | struct nodeinfo *ni = nodeid2nodeinfo(i, 0); | |
1059 | if (ni) | |
1060 | clean_one_writequeue(ni); | |
1061 | } | |
1062 | } | |
1063 | ||
1064 | ||
1065 | static void dealloc_nodeinfo(void) | |
1066 | { | |
1067 | int i; | |
1068 | ||
1069 | for (i=1; i<=max_nodeid; i++) { | |
1070 | struct nodeinfo *ni = nodeid2nodeinfo(i, 0); | |
1071 | if (ni) { | |
1072 | idr_remove(&nodeinfo_idr, i); | |
1073 | kfree(ni); | |
1074 | } | |
1075 | } | |
1076 | } | |
1077 | ||
1c032c03 DT |
/* Forget about a node: drop its association id and discard any queued
   output.  Returns 0 on success, -1 if the nodeid is unknown. */
int dlm_lowcomms_close(int nodeid)
{
	struct nodeinfo *ni;

	ni = nodeid2nodeinfo(nodeid, 0);
	if (!ni)
		return -1;

	spin_lock(&ni->lock);
	if (ni->assoc_id) {
		ni->assoc_id = 0;
		/* Don't send shutdown here, sctp will just queue it
		   till the node comes back up! */
	}
	spin_unlock(&ni->lock);

	/* Throw away anything still queued for this node */
	clean_one_writequeue(ni);
	/* Allow a fresh association attempt next time we send to it */
	clear_bit(NI_INIT_PENDING, &ni->flags);
	return 0;
}
1098 | ||
1d6e8131 PC |
// PJC: The work queue function for receiving.
// Drains the socket until receive_from_sock() reports an error, yielding
// periodically so one busy socket can't monopolise the CPU.
static void process_recv_sockets(struct work_struct *work)
{
	if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
		int ret;
		int count = 0;

		do {
			ret = receive_from_sock();

			/* Don't starve out everyone else */
			if (++count >= MAX_RX_MSG_COUNT) {
				cond_resched();
				count = 0;
			}
			/* NOTE(review): kthread_should_stop() in a workqueue
			   context looks like a leftover from the kthread
			   version of this code — confirm it is meaningful
			   for workqueue workers. */
		} while (!kthread_should_stop() && ret >=0);
	}
	cond_resched();
}
1118 | ||
1d6e8131 PC |
1119 | // PJC: the work queue function for sending |
1120 | static void process_send_sockets(struct work_struct *work) | |
e7fd4179 | 1121 | { |
1d6e8131 PC |
1122 | if (sctp_con.eagain_flag) { |
1123 | sctp_con.eagain_flag = 0; | |
1124 | refill_write_queue(); | |
e7fd4179 | 1125 | } |
1d6e8131 | 1126 | process_output_queue(); |
e7fd4179 DT |
1127 | } |
1128 | ||
1d6e8131 PC |
// PJC: Process lock requests from a particular node.
// TODO: can we optimise this out on UP ??
// NOTE(review): intentionally an empty stub — the per-node lock work is
// presumably dispatched elsewhere; confirm before removing.
static void process_lock_request(struct work_struct *work)
{
}
1134 | ||
/* Tear down the three work queues started by daemons_start().
   destroy_workqueue() flushes pending work before freeing. */
static void daemons_stop(void)
{
	destroy_workqueue(recv_workqueue);
	destroy_workqueue(send_workqueue);
	destroy_workqueue(lock_workqueue);
}
1141 | ||
1142 | static int daemons_start(void) | |
1143 | { | |
e7fd4179 | 1144 | int error; |
1d6e8131 PC |
1145 | recv_workqueue = create_workqueue("dlm_recv"); |
1146 | error = IS_ERR(recv_workqueue); | |
1147 | if (error) { | |
1148 | log_print("can't start dlm_recv %d", error); | |
1149 | return error; | |
1150 | } | |
e7fd4179 | 1151 | |
1d6e8131 PC |
1152 | send_workqueue = create_singlethread_workqueue("dlm_send"); |
1153 | error = IS_ERR(send_workqueue); | |
ac33d071 | 1154 | if (error) { |
1d6e8131 PC |
1155 | log_print("can't start dlm_send %d", error); |
1156 | destroy_workqueue(recv_workqueue); | |
e7fd4179 DT |
1157 | return error; |
1158 | } | |
e7fd4179 | 1159 | |
1d6e8131 PC |
1160 | lock_workqueue = create_workqueue("dlm_rlock"); |
1161 | error = IS_ERR(lock_workqueue); | |
ac33d071 | 1162 | if (error) { |
1d6e8131 PC |
1163 | log_print("can't start dlm_rlock %d", error); |
1164 | destroy_workqueue(send_workqueue); | |
1165 | destroy_workqueue(recv_workqueue); | |
e7fd4179 DT |
1166 | return error; |
1167 | } | |
e7fd4179 DT |
1168 | |
1169 | return 0; | |
1170 | } | |
1171 | ||
1172 | /* | |
1173 | * This is quite likely to sleep... | |
1174 | */ | |
1175 | int dlm_lowcomms_start(void) | |
1176 | { | |
1177 | int error; | |
1178 | ||
1d6e8131 PC |
1179 | INIT_WORK(&sctp_con.work, process_recv_sockets); |
1180 | ||
e7fd4179 DT |
1181 | error = init_sock(); |
1182 | if (error) | |
1183 | goto fail_sock; | |
1184 | error = daemons_start(); | |
1185 | if (error) | |
1186 | goto fail_sock; | |
e7fd4179 DT |
1187 | return 0; |
1188 | ||
ac33d071 | 1189 | fail_sock: |
e7fd4179 DT |
1190 | close_connection(); |
1191 | return error; | |
1192 | } | |
1193 | ||
e7fd4179 DT |
1194 | void dlm_lowcomms_stop(void) |
1195 | { | |
ac33d071 PC |
1196 | int i; |
1197 | ||
e7fd4179 DT |
1198 | sctp_con.flags = 0x7; |
1199 | daemons_stop(); | |
1200 | clean_writequeues(); | |
1201 | close_connection(); | |
1202 | dealloc_nodeinfo(); | |
1203 | max_nodeid = 0; | |
e7fd4179 | 1204 | |
ac33d071 PC |
1205 | dlm_local_count = 0; |
1206 | dlm_local_nodeid = 0; | |
e7fd4179 | 1207 | |
47c96298 SW |
1208 | for (i = 0; i < dlm_local_count; i++) |
1209 | kfree(dlm_local_addr[i]); | |
e7fd4179 | 1210 | } |