dlm: fix reconnecting but not sending data
[deliverable/linux.git] / fs / dlm / lowcomms.c
CommitLineData
fdda387f
PC
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5e9ccc37 5** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
fdda387f
PC
6**
7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions
9** of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14/*
15 * lowcomms.c
16 *
17 * This is the "low-level" comms layer.
18 *
19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster.
21 *
22 * Cluster nodes are referred to by their nodeids. nodeids are
23 * simply 32 bit numbers to the locking module - if they need to
2cf12c0b 24 * be expanded for the cluster infrastructure then that is its
fdda387f
PC
25 * responsibility. It is this layer's
26 * responsibility to resolve these into IP address or
27 * whatever it needs for inter-node communication.
28 *
29 * The comms level is two kernel threads that deal mainly with
30 * the receiving of messages from other nodes and passing them
31 * up to the mid-level comms layer (which understands the
32 * message format) for execution by the locking core, and
33 * a send thread which does all the setting up of connections
34 * to remote nodes and the sending of data. Threads are not allowed
35 * to send their own data because it may cause them to wait in times
36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block.
38 *
2cf12c0b 39 * lowcomms will choose to use either TCP or SCTP as its transport layer
6ed7257b 40 * depending on the configuration variable 'protocol'. This should be set
2cf12c0b 41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
6ed7257b
PC
42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
43 * for the DLM to function.
fdda387f
PC
44 *
45 */
46
fdda387f
PC
47#include <asm/ioctls.h>
48#include <net/sock.h>
49#include <net/tcp.h>
50#include <linux/pagemap.h>
6ed7257b 51#include <linux/file.h>
7a936ce7 52#include <linux/mutex.h>
6ed7257b 53#include <linux/sctp.h>
5a0e3ad6 54#include <linux/slab.h>
2f2d76cc 55#include <net/sctp/sctp.h>
44ad532b 56#include <net/ipv6.h>
fdda387f
PC
57
58#include "dlm_internal.h"
59#include "lowcomms.h"
60#include "midcomms.h"
61#include "config.h"
62
/* 4 MiB; presumably a socket receive-buffer size (its user is not in this part of the file) */
#define NEEDED_RMEM (4*1024*1024)
/* Bucket count of connection_hash; nodeid_hash() masks with (CONN_HASH_SIZE-1) */
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
68
/*
 * Bookkeeping for a circular buffer whose size is a power of two.
 * The helpers below keep 'base' wrapped with 'mask'.
 */
struct cbuf {
	unsigned int base;	/* offset of the first buffered byte */
	unsigned int len;	/* number of buffered bytes */
	unsigned int mask;	/* buffer size - 1, used to wrap offsets */
};
74
ac33d071
PC
75static void cbuf_add(struct cbuf *cb, int n)
76{
77 cb->len += n;
78}
fdda387f 79
ac33d071
PC
80static int cbuf_data(struct cbuf *cb)
81{
82 return ((cb->base + cb->len) & cb->mask);
83}
84
85static void cbuf_init(struct cbuf *cb, int size)
86{
87 cb->base = cb->len = 0;
88 cb->mask = size-1;
89}
90
91static void cbuf_eat(struct cbuf *cb, int n)
92{
93 cb->len -= n;
94 cb->base += n;
95 cb->base &= cb->mask;
96}
97
98static bool cbuf_empty(struct cbuf *cb)
99{
100 return cb->len == 0;
101}
fdda387f 102
fdda387f
PC
103struct connection {
104 struct socket *sock; /* NULL if not connected */
105 uint32_t nodeid; /* So we know who we are in the list */
f1f1c1cc 106 struct mutex sock_mutex;
6ed7257b 107 unsigned long flags;
fdda387f
PC
108#define CF_READ_PENDING 1
109#define CF_WRITE_PENDING 2
110#define CF_CONNECT_PENDING 3
6ed7257b
PC
111#define CF_INIT_PENDING 4
112#define CF_IS_OTHERCON 5
063c4c99 113#define CF_CLOSE 6
b36930dd 114#define CF_APP_LIMITED 7
ac33d071 115 struct list_head writequeue; /* List of outgoing writequeue_entries */
fdda387f
PC
116 spinlock_t writequeue_lock;
117 int (*rx_action) (struct connection *); /* What to do when active */
6ed7257b 118 void (*connect_action) (struct connection *); /* What to do to connect */
fdda387f
PC
119 struct page *rx_page;
120 struct cbuf cb;
121 int retries;
fdda387f 122#define MAX_CONNECT_RETRIES 3
5e9ccc37 123 struct hlist_node list;
fdda387f 124 struct connection *othercon;
1d6e8131
PC
125 struct work_struct rwork; /* Receive workqueue */
126 struct work_struct swork; /* Send workqueue */
fdda387f
PC
127};
128#define sock2con(x) ((struct connection *)(x)->sk_user_data)
129
130/* An entry waiting to be sent */
131struct writequeue_entry {
132 struct list_head list;
133 struct page *page;
134 int offset;
135 int len;
136 int end;
137 int users;
138 struct connection *con;
139};
140
36b71a8b
DT
141struct dlm_node_addr {
142 struct list_head list;
143 int nodeid;
144 int addr_count;
98e1b60e 145 int curr_addr_index;
36b71a8b
DT
146 struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
147};
148
149static LIST_HEAD(dlm_node_addrs);
150static DEFINE_SPINLOCK(dlm_node_addrs_spin);
151
6ed7257b
PC
152static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
153static int dlm_local_count;
513ef596 154static int dlm_allow_conn;
fdda387f 155
1d6e8131
PC
156/* Work queues */
157static struct workqueue_struct *recv_workqueue;
158static struct workqueue_struct *send_workqueue;
fdda387f 159
5e9ccc37 160static struct hlist_head connection_hash[CONN_HASH_SIZE];
7a936ce7 161static DEFINE_MUTEX(connections_lock);
c80e7c83 162static struct kmem_cache *con_cache;
fdda387f 163
1d6e8131
PC
164static void process_recv_sockets(struct work_struct *work);
165static void process_send_sockets(struct work_struct *work);
fdda387f 166
5e9ccc37
CC
167
168/* This is deliberately very simple because most clusters have simple
169 sequential nodeids, so we should be able to go straight to a connection
170 struct in the array */
171static inline int nodeid_hash(int nodeid)
172{
173 return nodeid & (CONN_HASH_SIZE-1);
174}
175
176static struct connection *__find_con(int nodeid)
177{
178 int r;
5e9ccc37
CC
179 struct connection *con;
180
181 r = nodeid_hash(nodeid);
182
b67bfe0d 183 hlist_for_each_entry(con, &connection_hash[r], list) {
5e9ccc37
CC
184 if (con->nodeid == nodeid)
185 return con;
186 }
187 return NULL;
188}
189
6ed7257b
PC
190/*
191 * If 'allocation' is zero then we don't attempt to create a new
192 * connection structure for this node.
193 */
194static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
fdda387f
PC
195{
196 struct connection *con = NULL;
6ed7257b 197 int r;
fdda387f 198
5e9ccc37 199 con = __find_con(nodeid);
6ed7257b
PC
200 if (con || !alloc)
201 return con;
fdda387f 202
6ed7257b
PC
203 con = kmem_cache_zalloc(con_cache, alloc);
204 if (!con)
205 return NULL;
fdda387f 206
5e9ccc37
CC
207 r = nodeid_hash(nodeid);
208 hlist_add_head(&con->list, &connection_hash[r]);
fdda387f 209
6ed7257b
PC
210 con->nodeid = nodeid;
211 mutex_init(&con->sock_mutex);
212 INIT_LIST_HEAD(&con->writequeue);
213 spin_lock_init(&con->writequeue_lock);
214 INIT_WORK(&con->swork, process_send_sockets);
215 INIT_WORK(&con->rwork, process_recv_sockets);
fdda387f 216
6ed7257b
PC
217 /* Setup action pointers for child sockets */
218 if (con->nodeid) {
5e9ccc37 219 struct connection *zerocon = __find_con(0);
fdda387f 220
6ed7257b
PC
221 con->connect_action = zerocon->connect_action;
222 if (!con->rx_action)
223 con->rx_action = zerocon->rx_action;
fdda387f
PC
224 }
225
6ed7257b
PC
226 return con;
227}
228
5e9ccc37
CC
229/* Loop round all connections */
230static void foreach_conn(void (*conn_func)(struct connection *c))
231{
232 int i;
b67bfe0d 233 struct hlist_node *n;
5e9ccc37
CC
234 struct connection *con;
235
236 for (i = 0; i < CONN_HASH_SIZE; i++) {
b67bfe0d 237 hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
5e9ccc37 238 conn_func(con);
5e9ccc37
CC
239 }
240}
241
6ed7257b
PC
242static struct connection *nodeid2con(int nodeid, gfp_t allocation)
243{
244 struct connection *con;
245
7a936ce7 246 mutex_lock(&connections_lock);
6ed7257b 247 con = __nodeid2con(nodeid, allocation);
7a936ce7 248 mutex_unlock(&connections_lock);
6ed7257b 249
fdda387f
PC
250 return con;
251}
252
36b71a8b
DT
253static struct dlm_node_addr *find_node_addr(int nodeid)
254{
255 struct dlm_node_addr *na;
256
257 list_for_each_entry(na, &dlm_node_addrs, list) {
258 if (na->nodeid == nodeid)
259 return na;
260 }
261 return NULL;
262}
263
264static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
6ed7257b 265{
36b71a8b
DT
266 switch (x->ss_family) {
267 case AF_INET: {
268 struct sockaddr_in *sinx = (struct sockaddr_in *)x;
269 struct sockaddr_in *siny = (struct sockaddr_in *)y;
270 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
271 return 0;
272 if (sinx->sin_port != siny->sin_port)
273 return 0;
274 break;
275 }
276 case AF_INET6: {
277 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
278 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
279 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
280 return 0;
281 if (sinx->sin6_port != siny->sin6_port)
282 return 0;
283 break;
284 }
285 default:
286 return 0;
287 }
288 return 1;
289}
290
291static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
98e1b60e 292 struct sockaddr *sa_out, bool try_new_addr)
36b71a8b
DT
293{
294 struct sockaddr_storage sas;
295 struct dlm_node_addr *na;
6ed7257b
PC
296
297 if (!dlm_local_count)
298 return -1;
299
36b71a8b
DT
300 spin_lock(&dlm_node_addrs_spin);
301 na = find_node_addr(nodeid);
98e1b60e 302 if (na && na->addr_count) {
ee44b4bc
MRL
303 memcpy(&sas, na->addr[na->curr_addr_index],
304 sizeof(struct sockaddr_storage));
305
98e1b60e
MC
306 if (try_new_addr) {
307 na->curr_addr_index++;
308 if (na->curr_addr_index == na->addr_count)
309 na->curr_addr_index = 0;
310 }
98e1b60e 311 }
36b71a8b
DT
312 spin_unlock(&dlm_node_addrs_spin);
313
314 if (!na)
315 return -EEXIST;
316
317 if (!na->addr_count)
318 return -ENOENT;
319
320 if (sas_out)
321 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
322
323 if (!sa_out)
324 return 0;
6ed7257b
PC
325
326 if (dlm_local_addr[0]->ss_family == AF_INET) {
36b71a8b
DT
327 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
328 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
6ed7257b
PC
329 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
330 } else {
36b71a8b
DT
331 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas;
332 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
4e3fd7a0 333 ret6->sin6_addr = in6->sin6_addr;
6ed7257b
PC
334 }
335
336 return 0;
337}
338
36b71a8b
DT
339static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
340{
341 struct dlm_node_addr *na;
342 int rv = -EEXIST;
98e1b60e 343 int addr_i;
36b71a8b
DT
344
345 spin_lock(&dlm_node_addrs_spin);
346 list_for_each_entry(na, &dlm_node_addrs, list) {
347 if (!na->addr_count)
348 continue;
349
98e1b60e
MC
350 for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
351 if (addr_compare(na->addr[addr_i], addr)) {
352 *nodeid = na->nodeid;
353 rv = 0;
354 goto unlock;
355 }
356 }
36b71a8b 357 }
98e1b60e 358unlock:
36b71a8b
DT
359 spin_unlock(&dlm_node_addrs_spin);
360 return rv;
361}
362
363int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
364{
365 struct sockaddr_storage *new_addr;
366 struct dlm_node_addr *new_node, *na;
367
368 new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
369 if (!new_node)
370 return -ENOMEM;
371
372 new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
373 if (!new_addr) {
374 kfree(new_node);
375 return -ENOMEM;
376 }
377
378 memcpy(new_addr, addr, len);
379
380 spin_lock(&dlm_node_addrs_spin);
381 na = find_node_addr(nodeid);
382 if (!na) {
383 new_node->nodeid = nodeid;
384 new_node->addr[0] = new_addr;
385 new_node->addr_count = 1;
386 list_add(&new_node->list, &dlm_node_addrs);
387 spin_unlock(&dlm_node_addrs_spin);
388 return 0;
389 }
390
391 if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
392 spin_unlock(&dlm_node_addrs_spin);
393 kfree(new_addr);
394 kfree(new_node);
395 return -ENOSPC;
396 }
397
398 na->addr[na->addr_count++] = new_addr;
399 spin_unlock(&dlm_node_addrs_spin);
400 kfree(new_node);
401 return 0;
402}
403
fdda387f 404/* Data available on socket or listen socket received a connect */
676d2369 405static void lowcomms_data_ready(struct sock *sk)
fdda387f
PC
406{
407 struct connection *con = sock2con(sk);
afb853fb 408 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
1d6e8131 409 queue_work(recv_workqueue, &con->rwork);
fdda387f
PC
410}
411
412static void lowcomms_write_space(struct sock *sk)
413{
414 struct connection *con = sock2con(sk);
415
b36930dd
DM
416 if (!con)
417 return;
418
419 clear_bit(SOCK_NOSPACE, &con->sock->flags);
420
421 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
422 con->sock->sk->sk_write_pending--;
423 clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
424 }
425
426 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
1d6e8131 427 queue_work(send_workqueue, &con->swork);
fdda387f
PC
428}
429
430static inline void lowcomms_connect_sock(struct connection *con)
431{
063c4c99
LMB
432 if (test_bit(CF_CLOSE, &con->flags))
433 return;
1d6e8131
PC
434 if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
435 queue_work(send_workqueue, &con->swork);
fdda387f
PC
436}
437
438static void lowcomms_state_change(struct sock *sk)
439{
ee44b4bc
MRL
440 /* SCTP layer is not calling sk_data_ready when the connection
441 * is done, so we catch the signal through here. Also, it
442 * doesn't switch socket state when entering shutdown, so we
443 * skip the write in that case.
444 */
445 if (sk->sk_shutdown) {
446 if (sk->sk_shutdown == RCV_SHUTDOWN)
447 lowcomms_data_ready(sk);
448 } else if (sk->sk_state == TCP_ESTABLISHED) {
fdda387f 449 lowcomms_write_space(sk);
ee44b4bc 450 }
fdda387f
PC
451}
452
391fbdc5
CC
453int dlm_lowcomms_connect_node(int nodeid)
454{
455 struct connection *con;
456
457 if (nodeid == dlm_our_nodeid())
458 return 0;
459
460 con = nodeid2con(nodeid, GFP_NOFS);
461 if (!con)
462 return -ENOMEM;
463 lowcomms_connect_sock(con);
464 return 0;
465}
466
fdda387f 467/* Make a socket active */
4dd40f0c 468static void add_sock(struct socket *sock, struct connection *con)
fdda387f
PC
469{
470 con->sock = sock;
471
472 /* Install a data_ready callback */
473 con->sock->sk->sk_data_ready = lowcomms_data_ready;
474 con->sock->sk->sk_write_space = lowcomms_write_space;
475 con->sock->sk->sk_state_change = lowcomms_state_change;
6ed7257b 476 con->sock->sk->sk_user_data = con;
d6d7b702 477 con->sock->sk->sk_allocation = GFP_NOFS;
fdda387f
PC
478}
479
6ed7257b 480/* Add the port number to an IPv6 or 4 sockaddr and return the address
fdda387f
PC
481 length */
482static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
483 int *addr_len)
484{
6ed7257b 485 saddr->ss_family = dlm_local_addr[0]->ss_family;
ac33d071 486 if (saddr->ss_family == AF_INET) {
fdda387f
PC
487 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
488 in4_addr->sin_port = cpu_to_be16(port);
489 *addr_len = sizeof(struct sockaddr_in);
6ed7257b 490 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
ac33d071 491 } else {
fdda387f
PC
492 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
493 in6_addr->sin6_port = cpu_to_be16(port);
494 *addr_len = sizeof(struct sockaddr_in6);
495 }
01c8cab2 496 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
fdda387f
PC
497}
498
499/* Close a remote connection and tidy up */
0d737a8c
MRL
500static void close_connection(struct connection *con, bool and_other,
501 bool tx, bool rx)
fdda387f 502{
0d737a8c
MRL
503 clear_bit(CF_CONNECT_PENDING, &con->flags);
504 clear_bit(CF_WRITE_PENDING, &con->flags);
505 if (tx && cancel_work_sync(&con->swork))
506 log_print("canceled swork for node %d", con->nodeid);
507 if (rx && cancel_work_sync(&con->rwork))
508 log_print("canceled rwork for node %d", con->nodeid);
fdda387f 509
0d737a8c 510 mutex_lock(&con->sock_mutex);
fdda387f
PC
511 if (con->sock) {
512 sock_release(con->sock);
513 con->sock = NULL;
514 }
515 if (con->othercon && and_other) {
ac33d071 516 /* Will only re-enter once. */
0d737a8c 517 close_connection(con->othercon, false, true, true);
fdda387f
PC
518 }
519 if (con->rx_page) {
520 __free_page(con->rx_page);
521 con->rx_page = NULL;
522 }
9e5f2825 523
61d96be0
PC
524 con->retries = 0;
525 mutex_unlock(&con->sock_mutex);
fdda387f
PC
526}
527
528/* Data received from remote end */
529static int receive_from_sock(struct connection *con)
530{
531 int ret = 0;
58addbff
AV
532 struct msghdr msg = {};
533 struct kvec iov[2];
fdda387f
PC
534 unsigned len;
535 int r;
536 int call_again_soon = 0;
58addbff 537 int nvec;
fdda387f 538
f1f1c1cc 539 mutex_lock(&con->sock_mutex);
fdda387f 540
a34fbc63
PC
541 if (con->sock == NULL) {
542 ret = -EAGAIN;
543 goto out_close;
544 }
acee4e52
MRL
545 if (con->nodeid == 0) {
546 ret = -EINVAL;
547 goto out_close;
548 }
a34fbc63 549
fdda387f
PC
550 if (con->rx_page == NULL) {
551 /*
552 * This doesn't need to be atomic, but I think it should
553 * improve performance if it is.
554 */
555 con->rx_page = alloc_page(GFP_ATOMIC);
556 if (con->rx_page == NULL)
557 goto out_resched;
ac33d071 558 cbuf_init(&con->cb, PAGE_CACHE_SIZE);
fdda387f
PC
559 }
560
fdda387f
PC
561 /*
562 * iov[0] is the bit of the circular buffer between the current end
563 * point (cb.base + cb.len) and the end of the buffer.
564 */
ac33d071
PC
565 iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
566 iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
89adc934 567 iov[1].iov_len = 0;
58addbff 568 nvec = 1;
fdda387f
PC
569
570 /*
571 * iov[1] is the bit of the circular buffer between the start of the
572 * buffer and the start of the currently used section (cb.base)
573 */
ac33d071
PC
574 if (cbuf_data(&con->cb) >= con->cb.base) {
575 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
fdda387f
PC
576 iov[1].iov_len = con->cb.base;
577 iov[1].iov_base = page_address(con->rx_page);
58addbff 578 nvec = 2;
fdda387f
PC
579 }
580 len = iov[0].iov_len + iov[1].iov_len;
581
58addbff 582 r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
fdda387f 583 MSG_DONTWAIT | MSG_NOSIGNAL);
fdda387f
PC
584 if (ret <= 0)
585 goto out_close;
ee44b4bc
MRL
586 else if (ret == len)
587 call_again_soon = 1;
bd44e2b0 588
ac33d071 589 cbuf_add(&con->cb, ret);
fdda387f
PC
590 ret = dlm_process_incoming_buffer(con->nodeid,
591 page_address(con->rx_page),
592 con->cb.base, con->cb.len,
593 PAGE_CACHE_SIZE);
594 if (ret == -EBADMSG) {
ee44b4bc
MRL
595 log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
596 page_address(con->rx_page), con->cb.base,
597 con->cb.len, r);
fdda387f
PC
598 }
599 if (ret < 0)
600 goto out_close;
ac33d071 601 cbuf_eat(&con->cb, ret);
fdda387f 602
ac33d071 603 if (cbuf_empty(&con->cb) && !call_again_soon) {
fdda387f
PC
604 __free_page(con->rx_page);
605 con->rx_page = NULL;
606 }
607
fdda387f
PC
608 if (call_again_soon)
609 goto out_resched;
f1f1c1cc 610 mutex_unlock(&con->sock_mutex);
ac33d071 611 return 0;
fdda387f 612
ac33d071 613out_resched:
1d6e8131
PC
614 if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
615 queue_work(recv_workqueue, &con->rwork);
f1f1c1cc 616 mutex_unlock(&con->sock_mutex);
bd44e2b0 617 return -EAGAIN;
fdda387f 618
ac33d071 619out_close:
f1f1c1cc 620 mutex_unlock(&con->sock_mutex);
9e5f2825 621 if (ret != -EAGAIN) {
0d737a8c 622 close_connection(con, false, true, false);
fdda387f
PC
623 /* Reconnect when there is something to send */
624 }
a34fbc63
PC
625 /* Don't return success if we really got EOF */
626 if (ret == 0)
627 ret = -EAGAIN;
fdda387f 628
fdda387f
PC
629 return ret;
630}
631
632/* Listening socket is busy, accept a connection */
6ed7257b 633static int tcp_accept_from_sock(struct connection *con)
fdda387f
PC
634{
635 int result;
636 struct sockaddr_storage peeraddr;
637 struct socket *newsock;
638 int len;
639 int nodeid;
640 struct connection *newcon;
bd44e2b0 641 struct connection *addcon;
fdda387f 642
513ef596
DT
643 mutex_lock(&connections_lock);
644 if (!dlm_allow_conn) {
645 mutex_unlock(&connections_lock);
646 return -1;
647 }
648 mutex_unlock(&connections_lock);
649
fdda387f 650 memset(&peeraddr, 0, sizeof(peeraddr));
eeb1bd5c
EB
651 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
652 SOCK_STREAM, IPPROTO_TCP, &newsock);
fdda387f
PC
653 if (result < 0)
654 return -ENOMEM;
655
f1f1c1cc 656 mutex_lock_nested(&con->sock_mutex, 0);
fdda387f
PC
657
658 result = -ENOTCONN;
659 if (con->sock == NULL)
660 goto accept_err;
661
662 newsock->type = con->sock->type;
663 newsock->ops = con->sock->ops;
664
665 result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
666 if (result < 0)
667 goto accept_err;
668
669 /* Get the connected socket's peer */
670 memset(&peeraddr, 0, sizeof(peeraddr));
671 if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
672 &len, 2)) {
673 result = -ECONNABORTED;
674 goto accept_err;
675 }
676
677 /* Get the new node's NODEID */
678 make_sockaddr(&peeraddr, 0, &len);
36b71a8b 679 if (addr_to_nodeid(&peeraddr, &nodeid)) {
bcaadf5c 680 unsigned char *b=(unsigned char *)&peeraddr;
617e82e1 681 log_print("connect from non cluster node");
bcaadf5c
MY
682 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
683 b, sizeof(struct sockaddr_storage));
fdda387f 684 sock_release(newsock);
f1f1c1cc 685 mutex_unlock(&con->sock_mutex);
fdda387f
PC
686 return -1;
687 }
688
689 log_print("got connection from %d", nodeid);
690
691 /* Check to see if we already have a connection to this node. This
692 * could happen if the two nodes initiate a connection at roughly
693 * the same time and the connections cross on the wire.
fdda387f
PC
694 * In this case we store the incoming one in "othercon"
695 */
748285cc 696 newcon = nodeid2con(nodeid, GFP_NOFS);
fdda387f
PC
697 if (!newcon) {
698 result = -ENOMEM;
699 goto accept_err;
700 }
f1f1c1cc 701 mutex_lock_nested(&newcon->sock_mutex, 1);
fdda387f 702 if (newcon->sock) {
ac33d071 703 struct connection *othercon = newcon->othercon;
fdda387f
PC
704
705 if (!othercon) {
748285cc 706 othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
fdda387f 707 if (!othercon) {
617e82e1 708 log_print("failed to allocate incoming socket");
f1f1c1cc 709 mutex_unlock(&newcon->sock_mutex);
fdda387f
PC
710 result = -ENOMEM;
711 goto accept_err;
712 }
fdda387f
PC
713 othercon->nodeid = nodeid;
714 othercon->rx_action = receive_from_sock;
f1f1c1cc 715 mutex_init(&othercon->sock_mutex);
1d6e8131
PC
716 INIT_WORK(&othercon->swork, process_send_sockets);
717 INIT_WORK(&othercon->rwork, process_recv_sockets);
fdda387f 718 set_bit(CF_IS_OTHERCON, &othercon->flags);
61d96be0
PC
719 }
720 if (!othercon->sock) {
fdda387f 721 newcon->othercon = othercon;
97d84836
PC
722 othercon->sock = newsock;
723 newsock->sk->sk_user_data = othercon;
724 add_sock(newsock, othercon);
725 addcon = othercon;
726 }
727 else {
728 printk("Extra connection from node %d attempted\n", nodeid);
729 result = -EAGAIN;
f4fadb23 730 mutex_unlock(&newcon->sock_mutex);
97d84836 731 goto accept_err;
fdda387f 732 }
fdda387f
PC
733 }
734 else {
735 newsock->sk->sk_user_data = newcon;
736 newcon->rx_action = receive_from_sock;
737 add_sock(newsock, newcon);
bd44e2b0 738 addcon = newcon;
fdda387f
PC
739 }
740
f1f1c1cc 741 mutex_unlock(&newcon->sock_mutex);
fdda387f
PC
742
743 /*
744 * Add it to the active queue in case we got data
25985edc 745 * between processing the accept adding the socket
fdda387f
PC
746 * to the read_sockets list
747 */
bd44e2b0
PC
748 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
749 queue_work(recv_workqueue, &addcon->rwork);
f1f1c1cc 750 mutex_unlock(&con->sock_mutex);
fdda387f
PC
751
752 return 0;
753
ac33d071 754accept_err:
f1f1c1cc 755 mutex_unlock(&con->sock_mutex);
fdda387f
PC
756 sock_release(newsock);
757
758 if (result != -EAGAIN)
617e82e1 759 log_print("error accepting connection from node: %d", result);
fdda387f
PC
760 return result;
761}
762
ee44b4bc
MRL
763int sctp_accept_from_sock(struct connection *con)
764{
765 /* Check that the new node is in the lockspace */
766 struct sctp_prim prim;
767 int nodeid;
768 int prim_len, ret;
769 int addr_len;
770 struct connection *newcon;
771 struct connection *addcon;
772 struct socket *newsock;
773
774 mutex_lock(&connections_lock);
775 if (!dlm_allow_conn) {
776 mutex_unlock(&connections_lock);
777 return -1;
778 }
779 mutex_unlock(&connections_lock);
780
781 mutex_lock_nested(&con->sock_mutex, 0);
782
783 ret = kernel_accept(con->sock, &newsock, O_NONBLOCK);
784 if (ret < 0)
785 goto accept_err;
786
787 memset(&prim, 0, sizeof(struct sctp_prim));
788 prim_len = sizeof(struct sctp_prim);
789
790 ret = kernel_getsockopt(newsock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
791 (char *)&prim, &prim_len);
792 if (ret < 0) {
793 log_print("getsockopt/sctp_primary_addr failed: %d", ret);
794 goto accept_err;
795 }
796
797 make_sockaddr(&prim.ssp_addr, 0, &addr_len);
798 if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
799 unsigned char *b = (unsigned char *)&prim.ssp_addr;
800
801 log_print("reject connect from unknown addr");
802 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
803 b, sizeof(struct sockaddr_storage));
804 goto accept_err;
805 }
806
807 newcon = nodeid2con(nodeid, GFP_NOFS);
808 if (!newcon) {
809 ret = -ENOMEM;
810 goto accept_err;
811 }
812
813 mutex_lock_nested(&newcon->sock_mutex, 1);
814
815 if (newcon->sock) {
816 struct connection *othercon = newcon->othercon;
817
818 if (!othercon) {
819 othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
820 if (!othercon) {
821 log_print("failed to allocate incoming socket");
822 mutex_unlock(&newcon->sock_mutex);
823 ret = -ENOMEM;
824 goto accept_err;
825 }
826 othercon->nodeid = nodeid;
827 othercon->rx_action = receive_from_sock;
828 mutex_init(&othercon->sock_mutex);
829 INIT_WORK(&othercon->swork, process_send_sockets);
830 INIT_WORK(&othercon->rwork, process_recv_sockets);
831 set_bit(CF_IS_OTHERCON, &othercon->flags);
832 }
833 if (!othercon->sock) {
834 newcon->othercon = othercon;
835 othercon->sock = newsock;
836 newsock->sk->sk_user_data = othercon;
837 add_sock(newsock, othercon);
838 addcon = othercon;
839 } else {
840 printk("Extra connection from node %d attempted\n", nodeid);
841 ret = -EAGAIN;
842 mutex_unlock(&newcon->sock_mutex);
843 goto accept_err;
844 }
845 } else {
846 newsock->sk->sk_user_data = newcon;
847 newcon->rx_action = receive_from_sock;
848 add_sock(newsock, newcon);
849 addcon = newcon;
850 }
851
852 log_print("connected to %d", nodeid);
853
854 mutex_unlock(&newcon->sock_mutex);
855
856 /*
857 * Add it to the active queue in case we got data
858 * between processing the accept adding the socket
859 * to the read_sockets list
860 */
861 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
862 queue_work(recv_workqueue, &addcon->rwork);
863 mutex_unlock(&con->sock_mutex);
864
865 return 0;
866
867accept_err:
868 mutex_unlock(&con->sock_mutex);
869 if (newsock)
870 sock_release(newsock);
871 if (ret != -EAGAIN)
872 log_print("error accepting connection from node: %d", ret);
873
874 return ret;
875}
876
6ed7257b
PC
877static void free_entry(struct writequeue_entry *e)
878{
879 __free_page(e->page);
880 kfree(e);
881}
882
5d689871
MC
883/*
884 * writequeue_entry_complete - try to delete and free write queue entry
885 * @e: write queue entry to try to delete
886 * @completed: bytes completed
887 *
888 * writequeue_lock must be held.
889 */
890static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
891{
892 e->offset += completed;
893 e->len -= completed;
894
895 if (e->len == 0 && e->users == 0) {
896 list_del(&e->list);
897 free_entry(e);
898 }
899}
900
ee44b4bc
MRL
901/*
902 * sctp_bind_addrs - bind a SCTP socket to all our addresses
903 */
904static int sctp_bind_addrs(struct connection *con, uint16_t port)
905{
906 struct sockaddr_storage localaddr;
907 int i, addr_len, result = 0;
908
909 for (i = 0; i < dlm_local_count; i++) {
910 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
911 make_sockaddr(&localaddr, port, &addr_len);
912
913 if (!i)
914 result = kernel_bind(con->sock,
915 (struct sockaddr *)&localaddr,
916 addr_len);
917 else
918 result = kernel_setsockopt(con->sock, SOL_SCTP,
919 SCTP_SOCKOPT_BINDX_ADD,
920 (char *)&localaddr, addr_len);
921
922 if (result < 0) {
923 log_print("Can't bind to %d addr number %d, %d.\n",
924 port, i + 1, result);
925 break;
926 }
927 }
928 return result;
929}
930
6ed7257b
PC
931/* Initiate an SCTP association.
932 This is a special case of send_to_sock() in that we don't yet have a
933 peeled-off socket for this association, so we use the listening socket
934 and add the primary IP address of the remote node.
935 */
ee44b4bc 936static void sctp_connect_to_sock(struct connection *con)
6ed7257b 937{
ee44b4bc
MRL
938 struct sockaddr_storage daddr;
939 int one = 1;
940 int result;
941 int addr_len;
942 struct socket *sock;
943
944 if (con->nodeid == 0) {
945 log_print("attempt to connect sock 0 foiled");
946 return;
947 }
6ed7257b 948
5d689871 949 mutex_lock(&con->sock_mutex);
6ed7257b 950
ee44b4bc
MRL
951 /* Some odd races can cause double-connects, ignore them */
952 if (con->retries++ > MAX_CONNECT_RETRIES)
953 goto out;
954
955 if (con->sock) {
956 log_print("node %d already connected.", con->nodeid);
957 goto out;
958 }
959
960 memset(&daddr, 0, sizeof(daddr));
961 result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
962 if (result < 0) {
6ed7257b 963 log_print("no address for nodeid %d", con->nodeid);
ee44b4bc 964 goto out;
6ed7257b 965 }
6ed7257b 966
ee44b4bc
MRL
967 /* Create a socket to communicate with */
968 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
969 SOCK_STREAM, IPPROTO_SCTP, &sock);
970 if (result < 0)
971 goto socket_err;
6ed7257b 972
ee44b4bc
MRL
973 sock->sk->sk_user_data = con;
974 con->rx_action = receive_from_sock;
975 con->connect_action = sctp_connect_to_sock;
976 add_sock(sock, con);
6ed7257b 977
ee44b4bc
MRL
978 /* Bind to all addresses. */
979 if (sctp_bind_addrs(con, 0))
980 goto bind_err;
6ed7257b 981
ee44b4bc 982 make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
6ed7257b 983
ee44b4bc 984 log_print("connecting to %d", con->nodeid);
6ed7257b 985
ee44b4bc
MRL
986 /* Turn off Nagle's algorithm */
987 kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
988 sizeof(one));
6ed7257b 989
ee44b4bc
MRL
990 result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
991 O_NONBLOCK);
992 if (result == -EINPROGRESS)
993 result = 0;
994 if (result == 0)
995 goto out;
98e1b60e 996
6ed7257b 997
ee44b4bc
MRL
998bind_err:
999 con->sock = NULL;
1000 sock_release(sock);
6ed7257b 1001
ee44b4bc
MRL
1002socket_err:
1003 /*
1004 * Some errors are fatal and this list might need adjusting. For other
1005 * errors we try again until the max number of retries is reached.
1006 */
1007 if (result != -EHOSTUNREACH &&
1008 result != -ENETUNREACH &&
1009 result != -ENETDOWN &&
1010 result != -EINVAL &&
1011 result != -EPROTONOSUPPORT) {
1012 log_print("connect %d try %d error %d", con->nodeid,
1013 con->retries, result);
1014 mutex_unlock(&con->sock_mutex);
1015 msleep(1000);
6ed7257b 1016 clear_bit(CF_CONNECT_PENDING, &con->flags);
ee44b4bc
MRL
1017 lowcomms_connect_sock(con);
1018 return;
6ed7257b 1019 }
5d689871 1020
ee44b4bc 1021out:
5d689871 1022 mutex_unlock(&con->sock_mutex);
00dcffae 1023 set_bit(CF_WRITE_PENDING, &con->flags);
6ed7257b
PC
1024}
1025
fdda387f 1026/* Connect a new socket to its peer */
6ed7257b 1027static void tcp_connect_to_sock(struct connection *con)
fdda387f 1028{
6bd8feda 1029 struct sockaddr_storage saddr, src_addr;
fdda387f 1030 int addr_len;
a89d63a1 1031 struct socket *sock = NULL;
cb2d45da 1032 int one = 1;
36b71a8b 1033 int result;
fdda387f
PC
1034
1035 if (con->nodeid == 0) {
1036 log_print("attempt to connect sock 0 foiled");
ac33d071 1037 return;
fdda387f
PC
1038 }
1039
f1f1c1cc 1040 mutex_lock(&con->sock_mutex);
fdda387f
PC
1041 if (con->retries++ > MAX_CONNECT_RETRIES)
1042 goto out;
1043
1044 /* Some odd races can cause double-connects, ignore them */
36b71a8b 1045 if (con->sock)
fdda387f 1046 goto out;
fdda387f
PC
1047
1048 /* Create a socket to communicate with */
eeb1bd5c
EB
1049 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1050 SOCK_STREAM, IPPROTO_TCP, &sock);
fdda387f
PC
1051 if (result < 0)
1052 goto out_err;
1053
1054 memset(&saddr, 0, sizeof(saddr));
98e1b60e 1055 result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
36b71a8b
DT
1056 if (result < 0) {
1057 log_print("no address for nodeid %d", con->nodeid);
ac33d071 1058 goto out_err;
36b71a8b 1059 }
fdda387f
PC
1060
1061 sock->sk->sk_user_data = con;
1062 con->rx_action = receive_from_sock;
6ed7257b
PC
1063 con->connect_action = tcp_connect_to_sock;
1064 add_sock(sock, con);
fdda387f 1065
6bd8feda
LH
1066 /* Bind to our cluster-known address connecting to avoid
1067 routing problems */
1068 memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
1069 make_sockaddr(&src_addr, 0, &addr_len);
1070 result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
1071 addr_len);
1072 if (result < 0) {
1073 log_print("could not bind for connect: %d", result);
1074 /* This *may* not indicate a critical error */
1075 }
1076
68c817a1 1077 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
fdda387f 1078
fdda387f 1079 log_print("connecting to %d", con->nodeid);
cb2d45da
DT
1080
1081 /* Turn off Nagle's algorithm */
1082 kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1083 sizeof(one));
1084
36b71a8b 1085 result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
ac33d071 1086 O_NONBLOCK);
fdda387f
PC
1087 if (result == -EINPROGRESS)
1088 result = 0;
ac33d071
PC
1089 if (result == 0)
1090 goto out;
fdda387f 1091
ac33d071 1092out_err:
fdda387f
PC
1093 if (con->sock) {
1094 sock_release(con->sock);
1095 con->sock = NULL;
a89d63a1
CD
1096 } else if (sock) {
1097 sock_release(sock);
fdda387f
PC
1098 }
1099 /*
1100 * Some errors are fatal and this list might need adjusting. For other
1101 * errors we try again until the max number of retries is reached.
1102 */
36b71a8b
DT
1103 if (result != -EHOSTUNREACH &&
1104 result != -ENETUNREACH &&
1105 result != -ENETDOWN &&
1106 result != -EINVAL &&
1107 result != -EPROTONOSUPPORT) {
1108 log_print("connect %d try %d error %d", con->nodeid,
1109 con->retries, result);
1110 mutex_unlock(&con->sock_mutex);
1111 msleep(1000);
356344c4 1112 clear_bit(CF_CONNECT_PENDING, &con->flags);
fdda387f 1113 lowcomms_connect_sock(con);
36b71a8b 1114 return;
fdda387f 1115 }
ac33d071 1116out:
f1f1c1cc 1117 mutex_unlock(&con->sock_mutex);
00dcffae 1118 set_bit(CF_WRITE_PENDING, &con->flags);
ac33d071 1119 return;
fdda387f
PC
1120}
1121
6ed7257b
PC
1122static struct socket *tcp_create_listen_sock(struct connection *con,
1123 struct sockaddr_storage *saddr)
fdda387f 1124{
ac33d071 1125 struct socket *sock = NULL;
fdda387f
PC
1126 int result = 0;
1127 int one = 1;
1128 int addr_len;
1129
6ed7257b 1130 if (dlm_local_addr[0]->ss_family == AF_INET)
fdda387f
PC
1131 addr_len = sizeof(struct sockaddr_in);
1132 else
1133 addr_len = sizeof(struct sockaddr_in6);
1134
1135 /* Create a socket to communicate with */
eeb1bd5c
EB
1136 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1137 SOCK_STREAM, IPPROTO_TCP, &sock);
fdda387f 1138 if (result < 0) {
617e82e1 1139 log_print("Can't create listening comms socket");
fdda387f
PC
1140 goto create_out;
1141 }
1142
cb2d45da
DT
1143 /* Turn off Nagle's algorithm */
1144 kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1145 sizeof(one));
1146
6ed7257b
PC
1147 result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1148 (char *)&one, sizeof(one));
1149
fdda387f 1150 if (result < 0) {
617e82e1 1151 log_print("Failed to set SO_REUSEADDR on socket: %d", result);
fdda387f 1152 }
6ed7257b
PC
1153 con->rx_action = tcp_accept_from_sock;
1154 con->connect_action = tcp_connect_to_sock;
fdda387f
PC
1155
1156 /* Bind to our port */
68c817a1 1157 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
fdda387f
PC
1158 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1159 if (result < 0) {
617e82e1 1160 log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
fdda387f
PC
1161 sock_release(sock);
1162 sock = NULL;
1163 con->sock = NULL;
1164 goto create_out;
1165 }
6ed7257b 1166 result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
ac33d071 1167 (char *)&one, sizeof(one));
fdda387f 1168 if (result < 0) {
617e82e1 1169 log_print("Set keepalive failed: %d", result);
fdda387f
PC
1170 }
1171
1172 result = sock->ops->listen(sock, 5);
1173 if (result < 0) {
617e82e1 1174 log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
fdda387f
PC
1175 sock_release(sock);
1176 sock = NULL;
1177 goto create_out;
1178 }
1179
ac33d071 1180create_out:
fdda387f
PC
1181 return sock;
1182}
1183
6ed7257b
PC
1184/* Get local addresses */
1185static void init_local(void)
1186{
1187 struct sockaddr_storage sas, *addr;
1188 int i;
1189
30d3a237 1190 dlm_local_count = 0;
1b189b88 1191 for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
6ed7257b
PC
1192 if (dlm_our_addr(&sas, i))
1193 break;
1194
573c24c4 1195 addr = kmalloc(sizeof(*addr), GFP_NOFS);
6ed7257b
PC
1196 if (!addr)
1197 break;
1198 memcpy(addr, &sas, sizeof(*addr));
1199 dlm_local_addr[dlm_local_count++] = addr;
1200 }
1201}
1202
6ed7257b
PC
1203/* Initialise SCTP socket and bind to all interfaces */
1204static int sctp_listen_for_all(void)
1205{
1206 struct socket *sock = NULL;
ee44b4bc 1207 int result = -EINVAL;
573c24c4 1208 struct connection *con = nodeid2con(0, GFP_NOFS);
6ed7257b 1209 int bufsize = NEEDED_RMEM;
86e92ad2 1210 int one = 1;
6ed7257b
PC
1211
1212 if (!con)
1213 return -ENOMEM;
1214
1215 log_print("Using SCTP for communications");
1216
eeb1bd5c 1217 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
ee44b4bc 1218 SOCK_STREAM, IPPROTO_SCTP, &sock);
6ed7257b
PC
1219 if (result < 0) {
1220 log_print("Can't create comms socket, check SCTP is loaded");
1221 goto out;
1222 }
1223
df61c952 1224 result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
6ed7257b
PC
1225 (char *)&bufsize, sizeof(bufsize));
1226 if (result)
617e82e1 1227 log_print("Error increasing buffer space on socket %d", result);
6ed7257b 1228
86e92ad2
MC
1229 result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
1230 sizeof(one));
1231 if (result < 0)
1232 log_print("Could not set SCTP NODELAY error %d\n", result);
1233
6ed7257b
PC
1234 /* Init con struct */
1235 sock->sk->sk_user_data = con;
1236 con->sock = sock;
1237 con->sock->sk->sk_data_ready = lowcomms_data_ready;
ee44b4bc
MRL
1238 con->rx_action = sctp_accept_from_sock;
1239 con->connect_action = sctp_connect_to_sock;
6ed7257b 1240
ee44b4bc
MRL
1241 /* Bind to all addresses. */
1242 if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
1243 goto create_delsock;
6ed7257b
PC
1244
1245 result = sock->ops->listen(sock, 5);
1246 if (result < 0) {
1247 log_print("Can't set socket listening");
1248 goto create_delsock;
1249 }
1250
1251 return 0;
1252
1253create_delsock:
1254 sock_release(sock);
1255 con->sock = NULL;
1256out:
1257 return result;
1258}
1259
1260static int tcp_listen_for_all(void)
fdda387f
PC
1261{
1262 struct socket *sock = NULL;
573c24c4 1263 struct connection *con = nodeid2con(0, GFP_NOFS);
fdda387f
PC
1264 int result = -EINVAL;
1265
6ed7257b
PC
1266 if (!con)
1267 return -ENOMEM;
1268
fdda387f 1269 /* We don't support multi-homed hosts */
6ed7257b 1270 if (dlm_local_addr[1] != NULL) {
617e82e1
DT
1271 log_print("TCP protocol can't handle multi-homed hosts, "
1272 "try SCTP");
6ed7257b
PC
1273 return -EINVAL;
1274 }
1275
1276 log_print("Using TCP for communications");
1277
6ed7257b 1278 sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
fdda387f
PC
1279 if (sock) {
1280 add_sock(sock, con);
1281 result = 0;
1282 }
1283 else {
1284 result = -EADDRINUSE;
1285 }
1286
1287 return result;
1288}
1289
1290
1291
1292static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1293 gfp_t allocation)
1294{
1295 struct writequeue_entry *entry;
1296
1297 entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1298 if (!entry)
1299 return NULL;
1300
1301 entry->page = alloc_page(allocation);
1302 if (!entry->page) {
1303 kfree(entry);
1304 return NULL;
1305 }
1306
1307 entry->offset = 0;
1308 entry->len = 0;
1309 entry->end = 0;
1310 entry->users = 0;
1311 entry->con = con;
1312
1313 return entry;
1314}
1315
617e82e1 1316void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
fdda387f
PC
1317{
1318 struct connection *con;
1319 struct writequeue_entry *e;
1320 int offset = 0;
fdda387f 1321
fdda387f
PC
1322 con = nodeid2con(nodeid, allocation);
1323 if (!con)
1324 return NULL;
1325
4edde74e 1326 spin_lock(&con->writequeue_lock);
fdda387f 1327 e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
ac33d071 1328 if ((&e->list == &con->writequeue) ||
fdda387f
PC
1329 (PAGE_CACHE_SIZE - e->end < len)) {
1330 e = NULL;
1331 } else {
1332 offset = e->end;
1333 e->end += len;
eeee2b5f 1334 e->users++;
fdda387f
PC
1335 }
1336 spin_unlock(&con->writequeue_lock);
1337
1338 if (e) {
ac33d071 1339 got_one:
fdda387f
PC
1340 *ppc = page_address(e->page) + offset;
1341 return e;
1342 }
1343
1344 e = new_writequeue_entry(con, allocation);
1345 if (e) {
1346 spin_lock(&con->writequeue_lock);
1347 offset = e->end;
1348 e->end += len;
eeee2b5f 1349 e->users++;
fdda387f
PC
1350 list_add_tail(&e->list, &con->writequeue);
1351 spin_unlock(&con->writequeue_lock);
1352 goto got_one;
1353 }
1354 return NULL;
1355}
1356
1357void dlm_lowcomms_commit_buffer(void *mh)
1358{
1359 struct writequeue_entry *e = (struct writequeue_entry *)mh;
1360 struct connection *con = e->con;
1361 int users;
1362
4edde74e 1363 spin_lock(&con->writequeue_lock);
fdda387f
PC
1364 users = --e->users;
1365 if (users)
1366 goto out;
1367 e->len = e->end - e->offset;
fdda387f
PC
1368 spin_unlock(&con->writequeue_lock);
1369
1d6e8131
PC
1370 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
1371 queue_work(send_workqueue, &con->swork);
fdda387f
PC
1372 }
1373 return;
1374
ac33d071 1375out:
fdda387f
PC
1376 spin_unlock(&con->writequeue_lock);
1377 return;
1378}
1379
fdda387f 1380/* Send a message */
ac33d071 1381static void send_to_sock(struct connection *con)
fdda387f
PC
1382{
1383 int ret = 0;
fdda387f
PC
1384 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1385 struct writequeue_entry *e;
1386 int len, offset;
f92c8dd7 1387 int count = 0;
fdda387f 1388
f1f1c1cc 1389 mutex_lock(&con->sock_mutex);
fdda387f
PC
1390 if (con->sock == NULL)
1391 goto out_connect;
1392
fdda387f
PC
1393 spin_lock(&con->writequeue_lock);
1394 for (;;) {
1395 e = list_entry(con->writequeue.next, struct writequeue_entry,
1396 list);
1397 if ((struct list_head *) e == &con->writequeue)
1398 break;
1399
1400 len = e->len;
1401 offset = e->offset;
1402 BUG_ON(len == 0 && e->users == 0);
1403 spin_unlock(&con->writequeue_lock);
1404
1405 ret = 0;
1406 if (len) {
1329e3f2
PB
1407 ret = kernel_sendpage(con->sock, e->page, offset, len,
1408 msg_flags);
d66f8277 1409 if (ret == -EAGAIN || ret == 0) {
b36930dd
DM
1410 if (ret == -EAGAIN &&
1411 test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
1412 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1413 /* Notify TCP that we're limited by the
1414 * application window size.
1415 */
1416 set_bit(SOCK_NOSPACE, &con->sock->flags);
1417 con->sock->sk->sk_write_pending++;
1418 }
d66f8277 1419 cond_resched();
fdda387f 1420 goto out;
9c5bef58 1421 } else if (ret < 0)
fdda387f 1422 goto send_error;
d66f8277 1423 }
f92c8dd7
BP
1424
1425 /* Don't starve people filling buffers */
1426 if (++count >= MAX_SEND_MSG_COUNT) {
ac33d071 1427 cond_resched();
f92c8dd7
BP
1428 count = 0;
1429 }
fdda387f
PC
1430
1431 spin_lock(&con->writequeue_lock);
5d689871 1432 writequeue_entry_complete(e, ret);
fdda387f
PC
1433 }
1434 spin_unlock(&con->writequeue_lock);
ac33d071 1435out:
f1f1c1cc 1436 mutex_unlock(&con->sock_mutex);
ac33d071 1437 return;
fdda387f 1438
ac33d071 1439send_error:
f1f1c1cc 1440 mutex_unlock(&con->sock_mutex);
0d737a8c 1441 close_connection(con, false, false, true);
fdda387f 1442 lowcomms_connect_sock(con);
ac33d071 1443 return;
fdda387f 1444
ac33d071 1445out_connect:
f1f1c1cc 1446 mutex_unlock(&con->sock_mutex);
ee44b4bc 1447 lowcomms_connect_sock(con);
fdda387f
PC
1448}
1449
1450static void clean_one_writequeue(struct connection *con)
1451{
5e9ccc37 1452 struct writequeue_entry *e, *safe;
fdda387f
PC
1453
1454 spin_lock(&con->writequeue_lock);
5e9ccc37 1455 list_for_each_entry_safe(e, safe, &con->writequeue, list) {
fdda387f
PC
1456 list_del(&e->list);
1457 free_entry(e);
1458 }
1459 spin_unlock(&con->writequeue_lock);
1460}
1461
1462/* Called from recovery when it knows that a node has
1463 left the cluster */
1464int dlm_lowcomms_close(int nodeid)
1465{
1466 struct connection *con;
36b71a8b 1467 struct dlm_node_addr *na;
fdda387f 1468
fdda387f
PC
1469 log_print("closing connection to node %d", nodeid);
1470 con = nodeid2con(nodeid, 0);
1471 if (con) {
063c4c99 1472 set_bit(CF_CLOSE, &con->flags);
0d737a8c 1473 close_connection(con, true, true, true);
fdda387f 1474 clean_one_writequeue(con);
fdda387f 1475 }
36b71a8b
DT
1476
1477 spin_lock(&dlm_node_addrs_spin);
1478 na = find_node_addr(nodeid);
1479 if (na) {
1480 list_del(&na->list);
1481 while (na->addr_count--)
1482 kfree(na->addr[na->addr_count]);
1483 kfree(na);
1484 }
1485 spin_unlock(&dlm_node_addrs_spin);
1486
fdda387f 1487 return 0;
fdda387f
PC
1488}
1489
6ed7257b 1490/* Receive workqueue function */
1d6e8131 1491static void process_recv_sockets(struct work_struct *work)
fdda387f 1492{
1d6e8131
PC
1493 struct connection *con = container_of(work, struct connection, rwork);
1494 int err;
fdda387f 1495
1d6e8131
PC
1496 clear_bit(CF_READ_PENDING, &con->flags);
1497 do {
1498 err = con->rx_action(con);
1499 } while (!err);
fdda387f
PC
1500}
1501
6ed7257b 1502/* Send workqueue function */
1d6e8131 1503static void process_send_sockets(struct work_struct *work)
fdda387f 1504{
1d6e8131 1505 struct connection *con = container_of(work, struct connection, swork);
fdda387f 1506
00dcffae 1507 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags))
6ed7257b 1508 con->connect_action(con);
063c4c99
LMB
1509 if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
1510 send_to_sock(con);
fdda387f
PC
1511}
1512
1513
1514/* Discard all entries on the write queues */
1515static void clean_writequeues(void)
1516{
5e9ccc37 1517 foreach_conn(clean_one_writequeue);
fdda387f
PC
1518}
1519
1d6e8131 1520static void work_stop(void)
fdda387f 1521{
1d6e8131
PC
1522 destroy_workqueue(recv_workqueue);
1523 destroy_workqueue(send_workqueue);
fdda387f
PC
1524}
1525
1d6e8131 1526static int work_start(void)
fdda387f 1527{
e43f055a
DT
1528 recv_workqueue = alloc_workqueue("dlm_recv",
1529 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
b9d41052
NK
1530 if (!recv_workqueue) {
1531 log_print("can't start dlm_recv");
1532 return -ENOMEM;
fdda387f 1533 }
fdda387f 1534
e43f055a
DT
1535 send_workqueue = alloc_workqueue("dlm_send",
1536 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
b9d41052
NK
1537 if (!send_workqueue) {
1538 log_print("can't start dlm_send");
1d6e8131 1539 destroy_workqueue(recv_workqueue);
b9d41052 1540 return -ENOMEM;
fdda387f 1541 }
fdda387f
PC
1542
1543 return 0;
1544}
1545
5e9ccc37 1546static void stop_conn(struct connection *con)
fdda387f 1547{
5e9ccc37 1548 con->flags |= 0x0F;
391fbdc5 1549 if (con->sock && con->sock->sk)
5e9ccc37
CC
1550 con->sock->sk->sk_user_data = NULL;
1551}
fdda387f 1552
5e9ccc37
CC
1553static void free_conn(struct connection *con)
1554{
0d737a8c 1555 close_connection(con, true, true, true);
5e9ccc37
CC
1556 if (con->othercon)
1557 kmem_cache_free(con_cache, con->othercon);
1558 hlist_del(&con->list);
1559 kmem_cache_free(con_cache, con);
1560}
1561
1562void dlm_lowcomms_stop(void)
1563{
ac33d071 1564 /* Set all the flags to prevent any
fdda387f
PC
1565 socket activity.
1566 */
7a936ce7 1567 mutex_lock(&connections_lock);
513ef596 1568 dlm_allow_conn = 0;
5e9ccc37 1569 foreach_conn(stop_conn);
7a936ce7 1570 mutex_unlock(&connections_lock);
ac33d071 1571
1d6e8131 1572 work_stop();
6ed7257b 1573
7a936ce7 1574 mutex_lock(&connections_lock);
fdda387f
PC
1575 clean_writequeues();
1576
5e9ccc37
CC
1577 foreach_conn(free_conn);
1578
7a936ce7 1579 mutex_unlock(&connections_lock);
fdda387f
PC
1580 kmem_cache_destroy(con_cache);
1581}
1582
fdda387f
PC
1583int dlm_lowcomms_start(void)
1584{
6ed7257b
PC
1585 int error = -EINVAL;
1586 struct connection *con;
5e9ccc37
CC
1587 int i;
1588
1589 for (i = 0; i < CONN_HASH_SIZE; i++)
1590 INIT_HLIST_HEAD(&connection_hash[i]);
fdda387f 1591
6ed7257b
PC
1592 init_local();
1593 if (!dlm_local_count) {
617e82e1 1594 error = -ENOTCONN;
fdda387f 1595 log_print("no local IP address has been set");
513ef596 1596 goto fail;
fdda387f
PC
1597 }
1598
6ed7257b 1599 error = -ENOMEM;
fdda387f 1600 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
ac33d071 1601 __alignof__(struct connection), 0,
20c2df83 1602 NULL);
fdda387f 1603 if (!con_cache)
513ef596
DT
1604 goto fail;
1605
1606 error = work_start();
1607 if (error)
1608 goto fail_destroy;
1609
1610 dlm_allow_conn = 1;
fdda387f 1611
fdda387f 1612 /* Start listening */
6ed7257b
PC
1613 if (dlm_config.ci_protocol == 0)
1614 error = tcp_listen_for_all();
1615 else
1616 error = sctp_listen_for_all();
fdda387f
PC
1617 if (error)
1618 goto fail_unlisten;
1619
fdda387f
PC
1620 return 0;
1621
ac33d071 1622fail_unlisten:
513ef596 1623 dlm_allow_conn = 0;
6ed7257b
PC
1624 con = nodeid2con(0,0);
1625 if (con) {
0d737a8c 1626 close_connection(con, false, true, true);
6ed7257b
PC
1627 kmem_cache_free(con_cache, con);
1628 }
513ef596 1629fail_destroy:
fdda387f 1630 kmem_cache_destroy(con_cache);
513ef596 1631fail:
fdda387f
PC
1632 return error;
1633}
36b71a8b
DT
1634
1635void dlm_lowcomms_exit(void)
1636{
1637 struct dlm_node_addr *na, *safe;
1638
1639 spin_lock(&dlm_node_addrs_spin);
1640 list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1641 list_del(&na->list);
1642 while (na->addr_count--)
1643 kfree(na->addr[na->addr_count]);
1644 kfree(na);
1645 }
1646 spin_unlock(&dlm_node_addrs_spin);
1647}
This page took 0.62537 seconds and 5 git commands to generate.