drbd: Converted got_Ping() and got_PingAck() from mdev to tconn
[deliverable/linux.git] / drivers / block / drbd / drbd_receiver.c
CommitLineData
b411b363
PR
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
b411b363
PR
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
b411b363
PR
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
b411b363
PR
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
b411b363
PR
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
b411b363
PR
47#include "drbd_req.h"
48
49#include "drbd_vli.h"
50
77351055
PR
/* Decoded form of an on-the-wire packet header (filled by decode_header()). */
struct packet_info {
	enum drbd_packet cmd;	/* packet command code */
	int size;		/* payload length in bytes */
	int vnr;		/* volume number; 0 for pre-9.x headers */
};
56
b411b363
PR
/* Result of drbd_may_finish_epoch(): what happened to the epoch object. */
enum finish_epoch {
	FE_STILL_LIVE,	/* epoch not finished; object untouched */
	FE_DESTROYED,	/* epoch finished and freed */
	FE_RECYCLED,	/* epoch finished and reset for reuse */
};

/* Discriminator for helpers that accept either a device or a connection. */
enum mdev_or_conn {
	MDEV,
	CONN,
};
67
65d11ed6 68static int drbd_do_handshake(struct drbd_tconn *tconn);
13e6037d 69static int drbd_do_auth(struct drbd_tconn *tconn);
360cc740 70static int drbd_disconnected(int vnr, void *p, void *data);
b411b363
PR
71
72static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
00d56944 73static int e_end_block(struct drbd_work *, int);
b411b363 74
b411b363
PR
75
76#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77
45bb912b
LE
78/*
79 * some helper functions to deal with single linked page lists,
80 * page->private being our "next" pointer.
81 */
82
83/* If at least n pages are linked at head, get n pages off.
84 * Otherwise, don't modify head, and return NULL.
85 * Locking is the responsibility of the caller.
86 */
/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 *
 * The chain is singly linked through page->private (see page_chain_next).
 * On success the returned sub-chain is NUL(0)-terminated and *head is
 * advanced past the removed pages.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head:
	 * tmp is the first page NOT taken (may be NULL if the chain
	 * was consumed exactly). */
	page = *head;
	*head = tmp;
	return page;
}
117
118/* may be used outside of locks to find the tail of a (usually short)
119 * "private" page chain, before adding it back to a global chain head
120 * with page_chain_add() under a spinlock. */
/* Walk to the last page of a (usually short) "private" page chain.
 * May be used outside of locks, before handing the chain to
 * page_chain_add() under a spinlock; optionally reports the chain
 * length via @len. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *next;
	int count = 1;

	for (next = page_chain_next(page); next; next = page_chain_next(page)) {
		page = next;
		count++;
	}
	if (len)
		*len = count;
	return page;
}
131
132static int page_chain_free(struct page *page)
133{
134 struct page *tmp;
135 int i = 0;
136 page_chain_for_each_safe(page, tmp) {
137 put_page(page);
138 ++i;
139 }
140 return i;
141}
142
/* Prepend the chain [chain_first .. chain_last] to *head.
 * Caller must hold whatever lock protects *head (drbd_pp_lock for the
 * global pool). */
static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	/* sanity check: chain_last really is the tail of chain_first */
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}
156
/* Try to get @number pages: first from the global drbd_pp_pool, then by
 * allocating fresh pages from the kernel.  Returns a page chain linked
 * via page->private, or NULL if @number pages could not be gathered
 * (any partially allocated pages are parked in the global pool). */
static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_pp_alloc will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}
201
b411b363
PR
/* Move finished entries from mdev->net_ee onto @to_be_freed.
 * Caller must hold mdev->tconn->req_lock (all callers in this file do). */
static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first not finished we can
	   stop to examine the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_ee_has_active_page(peer_req))
			break;
		list_move(le, to_be_freed);
	}
}
219
/* Collect finished net_ee entries under the request lock, then free them
 * outside the lock (drbd_free_net_ee may return pages to the pool). */
static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);
}
232
233/**
45bb912b 234 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
b411b363 235 * @mdev: DRBD device.
45bb912b
LE
236 * @number: number of pages requested
237 * @retry: whether to retry, if not enough pages are available right now
238 *
239 * Tries to allocate number pages, first from our own page pool, then from
240 * the kernel, unless this allocation would exceed the max_buffers setting.
241 * Possibly retry until DRBD frees sufficient pages somewhere else.
b411b363 242 *
45bb912b 243 * Returns a page chain linked via page->private.
b411b363 244 */
45bb912b 245static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
b411b363
PR
246{
247 struct page *page = NULL;
248 DEFINE_WAIT(wait);
249
45bb912b
LE
250 /* Yes, we may run up to @number over max_buffers. If we
251 * follow it strictly, the admin will get it wrong anyways. */
89e58e75 252 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
45bb912b 253 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363 254
45bb912b 255 while (page == NULL) {
b411b363
PR
256 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
257
258 drbd_kick_lo_and_reclaim_net(mdev);
259
89e58e75 260 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
45bb912b 261 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363
PR
262 if (page)
263 break;
264 }
265
266 if (!retry)
267 break;
268
269 if (signal_pending(current)) {
270 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
271 break;
272 }
273
274 schedule();
275 }
276 finish_wait(&drbd_pp_wait, &wait);
277
45bb912b
LE
278 if (page)
279 atomic_add(number, &mdev->pp_in_use);
b411b363
PR
280 return page;
281}
282
283/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
87eeee41 284 * Is also used from inside an other spin_lock_irq(&mdev->tconn->req_lock);
45bb912b
LE
285 * Either links the page chain back to the global pool,
286 * or returns all pages to the system. */
435f0740 287static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
b411b363 288{
435f0740 289 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
b411b363 290 int i;
435f0740 291
81a5d60e 292 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
45bb912b
LE
293 i = page_chain_free(page);
294 else {
295 struct page *tmp;
296 tmp = page_chain_tail(page, &i);
297 spin_lock(&drbd_pp_lock);
298 page_chain_add(&drbd_pp_pool, page, tmp);
299 drbd_pp_vacant += i;
300 spin_unlock(&drbd_pp_lock);
b411b363 301 }
435f0740 302 i = atomic_sub_return(i, a);
45bb912b 303 if (i < 0)
435f0740
LE
304 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
305 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
b411b363
PR
306 wake_up(&drbd_pp_wait);
307}
308
309/*
310You need to hold the req_lock:
311 _drbd_wait_ee_list_empty()
312
313You must not have the req_lock:
314 drbd_free_ee()
315 drbd_alloc_ee()
316 drbd_init_ee()
317 drbd_release_ee()
318 drbd_ee_fix_bhs()
319 drbd_process_done_ee()
320 drbd_clear_done_ee()
321 drbd_wait_ee_list_empty()
322*/
323
f6ffca9f
AG
/* Allocate a peer request ("EE") plus enough pool pages for @data_size
 * bytes.  Returns NULL on fault injection, mempool exhaustion, or page
 * shortage.  The caller owns the returned object (freed via the
 * drbd_free_*ee helpers). */
struct drbd_peer_request *
drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
	      unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_peer_request *peer_req;
	struct page *page;
	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
		return NULL;
	}

	/* only block waiting for pages if the caller allowed sleeping */
	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
	if (!page)
		goto fail;

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.mdev = mdev;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	peer_req->flags = 0;
	/*
	 * The block_id is opaque to the receiver. It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}
369
db830c46 370void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
f6ffca9f 371 int is_net)
b411b363 372{
db830c46
AG
373 if (peer_req->flags & EE_HAS_DIGEST)
374 kfree(peer_req->digest);
375 drbd_pp_free(mdev, peer_req->pages, is_net);
376 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
377 D_ASSERT(drbd_interval_empty(&peer_req->i));
378 mempool_free(peer_req, drbd_ee_mempool);
b411b363
PR
379}
380
/* Detach all entries of @list under the request lock, then free them
 * outside the lock.  Returns the number of entries freed.
 * Must NOT be called with req_lock held (see rules comment above). */
int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	/* identity compare: net_ee entries use the by-net page accounting */
	int is_net = list == &mdev->net_ee;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		drbd_free_some_ee(mdev, peer_req, is_net);
		count++;
	}
	return count;
}
398
399
32862ec7 400/* See also comments in _req_mod(,BARRIER_ACKED)
b411b363
PR
401 * and receive_Barrier.
402 *
403 * Move entries from net_ee to done_ee, if ready.
404 * Grab done_ee, call all callbacks, free the entries.
405 * The callbacks typically send out ACKs.
406 */
/* See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 *
 * Move entries from net_ee to done_ee, if ready.
 * Grab done_ee, call all callbacks, free the entries.
 * The callbacks typically send out ACKs.
 * Returns 1 if every callback succeeded, 0 otherwise; once one callback
 * fails, the remaining ones are invoked with cancel != 0. */
static int drbd_process_done_ee(struct drbd_conf *mdev)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int ok = 1;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_discard_write.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		/* list_del not necessary, next/prev members not touched */
		ok = peer_req->w.cb(&peer_req->w, !ok) && ok;
		drbd_free_ee(mdev, peer_req);
	}
	/* drbd_free_ee may have released pool pages; unblock waiters */
	wake_up(&mdev->ee_wait);

	return ok;
}
435
/* Wait until @head becomes empty.  Caller must hold req_lock; the lock
 * is dropped around the schedule and re-taken before returning, so the
 * list must be re-checked by the caller if it cares about atomicity. */
void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->tconn->req_lock);
		io_schedule();
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->tconn->req_lock);
	}
}
450
/* Lock-taking wrapper around _drbd_wait_ee_list_empty(); call this when
 * you do NOT already hold req_lock. */
void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->tconn->req_lock);
}
457
458/* see also kernel_accept; which is only present since 2.6.18.
459 * also we want to log which part of it failed, exactly */
7653620d 460static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
b411b363
PR
461{
462 struct sock *sk = sock->sk;
463 int err = 0;
464
465 *what = "listen";
466 err = sock->ops->listen(sock, 5);
467 if (err < 0)
468 goto out;
469
470 *what = "sock_create_lite";
471 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
472 newsock);
473 if (err < 0)
474 goto out;
475
476 *what = "accept";
477 err = sock->ops->accept(sock, *newsock, 0);
478 if (err < 0) {
479 sock_release(*newsock);
480 *newsock = NULL;
481 goto out;
482 }
483 (*newsock)->ops = sock->ops;
484
485out:
486 return err;
487}
488
dbd9eea0 489static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
b411b363
PR
490{
491 mm_segment_t oldfs;
492 struct kvec iov = {
493 .iov_base = buf,
494 .iov_len = size,
495 };
496 struct msghdr msg = {
497 .msg_iovlen = 1,
498 .msg_iov = (struct iovec *)&iov,
499 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
500 };
501 int rv;
502
503 oldfs = get_fs();
504 set_fs(KERNEL_DS);
505 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
506 set_fs(oldfs);
507
508 return rv;
509}
510
de0ff338 511static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
b411b363
PR
512{
513 mm_segment_t oldfs;
514 struct kvec iov = {
515 .iov_base = buf,
516 .iov_len = size,
517 };
518 struct msghdr msg = {
519 .msg_iovlen = 1,
520 .msg_iov = (struct iovec *)&iov,
521 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
522 };
523 int rv;
524
525 oldfs = get_fs();
526 set_fs(KERNEL_DS);
527
528 for (;;) {
de0ff338 529 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
b411b363
PR
530 if (rv == size)
531 break;
532
533 /* Note:
534 * ECONNRESET other side closed the connection
535 * ERESTARTSYS (on sock) we got a signal
536 */
537
538 if (rv < 0) {
539 if (rv == -ECONNRESET)
de0ff338 540 conn_info(tconn, "sock was reset by peer\n");
b411b363 541 else if (rv != -ERESTARTSYS)
de0ff338 542 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
543 break;
544 } else if (rv == 0) {
de0ff338 545 conn_info(tconn, "sock was shut down by peer\n");
b411b363
PR
546 break;
547 } else {
548 /* signal came in, or peer/link went down,
549 * after we read a partial message
550 */
551 /* D_ASSERT(signal_pending(current)); */
552 break;
553 }
554 };
555
556 set_fs(oldfs);
557
558 if (rv != size)
bbeb641c 559 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
b411b363
PR
560
561 return rv;
562}
563
5dbf1673
LE
564/* quoting tcp(7):
565 * On individual connections, the socket buffer size must be set prior to the
566 * listen(2) or connect(2) calls in order to have it take effect.
567 * This is our wrapper to do so.
568 */
569static void drbd_setbufsize(struct socket *sock, unsigned int snd,
570 unsigned int rcv)
571{
572 /* open coded SO_SNDBUF, SO_RCVBUF */
573 if (snd) {
574 sock->sk->sk_sndbuf = snd;
575 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
576 }
577 if (rcv) {
578 sock->sk->sk_rcvbuf = rcv;
579 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
580 }
581}
582
eac3e990 583static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
b411b363
PR
584{
585 const char *what;
586 struct socket *sock;
587 struct sockaddr_in6 src_in6;
588 int err;
589 int disconnect_on_error = 1;
590
eac3e990 591 if (!get_net_conf(tconn))
b411b363
PR
592 return NULL;
593
594 what = "sock_create_kern";
eac3e990 595 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
b411b363
PR
596 SOCK_STREAM, IPPROTO_TCP, &sock);
597 if (err < 0) {
598 sock = NULL;
599 goto out;
600 }
601
602 sock->sk->sk_rcvtimeo =
eac3e990
PR
603 sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
604 drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
605 tconn->net_conf->rcvbuf_size);
b411b363
PR
606
607 /* explicitly bind to the configured IP as source IP
608 * for the outgoing connections.
609 * This is needed for multihomed hosts and to be
610 * able to use lo: interfaces for drbd.
611 * Make sure to use 0 as port number, so linux selects
612 * a free one dynamically.
613 */
eac3e990
PR
614 memcpy(&src_in6, tconn->net_conf->my_addr,
615 min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
616 if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
b411b363
PR
617 src_in6.sin6_port = 0;
618 else
619 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
620
621 what = "bind before connect";
622 err = sock->ops->bind(sock,
623 (struct sockaddr *) &src_in6,
eac3e990 624 tconn->net_conf->my_addr_len);
b411b363
PR
625 if (err < 0)
626 goto out;
627
628 /* connect may fail, peer not yet available.
629 * stay C_WF_CONNECTION, don't go Disconnecting! */
630 disconnect_on_error = 0;
631 what = "connect";
632 err = sock->ops->connect(sock,
eac3e990
PR
633 (struct sockaddr *)tconn->net_conf->peer_addr,
634 tconn->net_conf->peer_addr_len, 0);
b411b363
PR
635
636out:
637 if (err < 0) {
638 if (sock) {
639 sock_release(sock);
640 sock = NULL;
641 }
642 switch (-err) {
643 /* timeout, busy, signal pending */
644 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
645 case EINTR: case ERESTARTSYS:
646 /* peer not (yet) available, network problem */
647 case ECONNREFUSED: case ENETUNREACH:
648 case EHOSTDOWN: case EHOSTUNREACH:
649 disconnect_on_error = 0;
650 break;
651 default:
eac3e990 652 conn_err(tconn, "%s failed, err = %d\n", what, err);
b411b363
PR
653 }
654 if (disconnect_on_error)
bbeb641c 655 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 656 }
eac3e990 657 put_net_conf(tconn);
b411b363
PR
658 return sock;
659}
660
7653620d 661static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
b411b363
PR
662{
663 int timeo, err;
664 struct socket *s_estab = NULL, *s_listen;
665 const char *what;
666
7653620d 667 if (!get_net_conf(tconn))
b411b363
PR
668 return NULL;
669
670 what = "sock_create_kern";
7653620d 671 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
b411b363
PR
672 SOCK_STREAM, IPPROTO_TCP, &s_listen);
673 if (err) {
674 s_listen = NULL;
675 goto out;
676 }
677
7653620d 678 timeo = tconn->net_conf->try_connect_int * HZ;
b411b363
PR
679 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
680
681 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
682 s_listen->sk->sk_rcvtimeo = timeo;
683 s_listen->sk->sk_sndtimeo = timeo;
7653620d
PR
684 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
685 tconn->net_conf->rcvbuf_size);
b411b363
PR
686
687 what = "bind before listen";
688 err = s_listen->ops->bind(s_listen,
7653620d
PR
689 (struct sockaddr *) tconn->net_conf->my_addr,
690 tconn->net_conf->my_addr_len);
b411b363
PR
691 if (err < 0)
692 goto out;
693
7653620d 694 err = drbd_accept(&what, s_listen, &s_estab);
b411b363
PR
695
696out:
697 if (s_listen)
698 sock_release(s_listen);
699 if (err < 0) {
700 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
7653620d 701 conn_err(tconn, "%s failed, err = %d\n", what, err);
bbeb641c 702 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
703 }
704 }
7653620d 705 put_net_conf(tconn);
b411b363
PR
706
707 return s_estab;
708}
709
d38e787e 710static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
b411b363 711{
d38e787e 712 struct p_header *h = &tconn->data.sbuf.header;
b411b363 713
d38e787e 714 return _conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
b411b363
PR
715}
716
a25b63f1 717static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
b411b363 718{
a25b63f1 719 struct p_header80 *h = &tconn->data.rbuf.header.h80;
b411b363
PR
720 int rr;
721
dbd9eea0 722 rr = drbd_recv_short(sock, h, sizeof(*h), 0);
b411b363 723
ca9bc12b 724 if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
b411b363
PR
725 return be16_to_cpu(h->command);
726
727 return 0xffff;
728}
729
/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock: pointer to the pointer to the socket.
 *
 * Probes the socket with a non-blocking MSG_PEEK read.  Data available
 * or -EAGAIN means the connection is alive; anything else releases the
 * socket and NULLs *sock.  Returns true iff the socket is still usable.
 */
static int drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}
2325eb66
PR
/* Gets called if a connection is established, or if a new minor gets created
   in a connection.  idr_for_each callback signature: @vnr is the volume
   number, @p the drbd_conf, @data the tconn (unused here).
   Sends the initial state packets for this volume; returns 0 on success,
   nonzero if any send failed (stops the idr iteration in drbd_connect). */
int drbd_connected(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	int ok = 1;

	atomic_set(&mdev->packet_seq, 0);
	mdev->peer_seq = 0;

	/* pre-9.x peers have a single connection-wide state mutex */
	mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
		&mdev->tconn->cstate_mutex :
		&mdev->own_state_mutex;

	ok &= drbd_send_sync_param(mdev, &mdev->sync_conf);
	ok &= drbd_send_sizes(mdev, 0, 0);
	ok &= drbd_send_uuids(mdev);
	ok &= drbd_send_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);


	return !ok;
}
776
b411b363
PR
/*
 * Establish both sockets (data + meta/"msock") to the peer, racing our
 * active connect attempts against a passive accept, then run the
 * handshake and optional authentication and kick off per-volume setup.
 *
 * return values:
 *	1 yes, we have a valid connection
 *	0 oops, did not work out, please try again
 *     -1 peer talks different language,
 *	  no point in trying again, please go standalone.
 *     -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_tconn *tconn)
{
	struct socket *s, *sock, *msock;
	int try, h, ok;

	if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	clear_bit(DISCARD_CONCURRENT, &tconn->flags);
	tconn->agreed_pro_version = 99;
	/* agreed_pro_version must be smaller than 100 so we send the old
	   header (h80) in the first packet and in the handshake packet. */

	sock  = NULL;
	msock = NULL;

	do {
		for (try = 0;;) {
			/* 3 tries, this should take less than a second! */
			s = drbd_try_connect(tconn);
			if (s || ++try >= 3)
				break;
			/* give the other side time to call bind() & listen() */
			schedule_timeout_interruptible(HZ / 10);
		}

		if (s) {
			/* first established socket becomes the data socket,
			 * the second the meta socket */
			if (!sock) {
				drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
				sock = s;
				s = NULL;
			} else if (!msock) {
				drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
				msock = s;
				s = NULL;
			} else {
				conn_err(tconn, "Logic error in drbd_connect()\n");
				goto out_release_sockets;
			}
		}

		if (sock && msock) {
			schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
			ok = drbd_socket_okay(&sock);
			ok = drbd_socket_okay(&msock) && ok;
			if (ok)
				break;
		}

retry:
		/* also accept an inbound connection from the peer; the
		 * peer's first packet tells us which role it plays */
		s = drbd_wait_for_connect(tconn);
		if (s) {
			try = drbd_recv_fp(tconn, s);
			drbd_socket_okay(&sock);
			drbd_socket_okay(&msock);
			switch (try) {
			case P_HAND_SHAKE_S:
				if (sock) {
					conn_warn(tconn, "initial packet S crossed\n");
					sock_release(sock);
				}
				sock = s;
				break;
			case P_HAND_SHAKE_M:
				if (msock) {
					conn_warn(tconn, "initial packet M crossed\n");
					sock_release(msock);
				}
				msock = s;
				set_bit(DISCARD_CONCURRENT, &tconn->flags);
				break;
			default:
				conn_warn(tconn, "Error receiving initial packet\n");
				sock_release(s);
				if (random32() & 1)
					goto retry;
			}
		}

		if (tconn->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&tconn->receiver) == EXITING)
				goto out_release_sockets;
		}

		if (sock && msock) {
			ok = drbd_socket_okay(&sock);
			ok = drbd_socket_okay(&msock) && ok;
			if (ok)
				break;
		}
	} while (1);

	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

	sock->sk->sk_allocation = GFP_NOIO;
	msock->sk->sk_allocation = GFP_NOIO;

	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_HAND_SHAKE timeout,
	 * which we set to 4x the configured ping_timeout. */
	sock->sk->sk_sndtimeo =
	sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;

	msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock);
	drbd_tcp_nodelay(msock);

	tconn->data.socket = sock;
	tconn->meta.socket = msock;
	tconn->last_received = jiffies;

	h = drbd_do_handshake(tconn);
	if (h <= 0)
		return h;

	if (tconn->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(tconn)) {
		case -1:
			conn_err(tconn, "Authentication of peer failed\n");
			return -1;
		case 0:
			conn_err(tconn, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
		return 0;

	sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	drbd_thread_start(&tconn->asender);

	if (drbd_send_protocol(tconn) == -1)
		return -1;

	/* per-volume initial packets; returns 1 iff all volumes succeeded */
	return !idr_for_each(&tconn->volumes, drbd_connected, tconn);

out_release_sockets:
	if (sock)
		sock_release(sock);
	if (msock)
		sock_release(msock);
	return -1;
}
946
ce243853 947static bool decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
b411b363 948{
fd340c12 949 if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
77351055
PR
950 pi->cmd = be16_to_cpu(h->h80.command);
951 pi->size = be16_to_cpu(h->h80.length);
eefc2f7d 952 pi->vnr = 0;
ca9bc12b 953 } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
77351055
PR
954 pi->cmd = be16_to_cpu(h->h95.command);
955 pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
956 pi->vnr = 0;
02918be2 957 } else {
ce243853 958 conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
004352fa
LE
959 be32_to_cpu(h->h80.magic),
960 be16_to_cpu(h->h80.command),
961 be16_to_cpu(h->h80.length));
81e84650 962 return false;
b411b363 963 }
257d0af6
PR
964 return true;
965}
966
9ba7aa00 967static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
257d0af6 968{
9ba7aa00 969 struct p_header *h = &tconn->data.rbuf.header;
257d0af6
PR
970 int r;
971
9ba7aa00 972 r = drbd_recv(tconn, h, sizeof(*h));
257d0af6
PR
973 if (unlikely(r != sizeof(*h))) {
974 if (!signal_pending(current))
9ba7aa00 975 conn_warn(tconn, "short read expecting header on sock: r=%d\n", r);
257d0af6
PR
976 return false;
977 }
978
9ba7aa00
PR
979 r = decode_header(tconn, h, pi);
980 tconn->last_received = jiffies;
b411b363 981
257d0af6 982 return r;
b411b363
PR
983}
984
2451fc3b 985static void drbd_flush(struct drbd_conf *mdev)
b411b363
PR
986{
987 int rv;
988
989 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a 990 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
dd3932ed 991 NULL);
b411b363
PR
992 if (rv) {
993 dev_err(DEV, "local disk flush failed with status %d\n", rv);
994 /* would rather check on EOPNOTSUPP, but that is not reliable.
995 * don't try again for ANY return value != 0
996 * if (rv == -EOPNOTSUPP) */
997 drbd_bump_write_ordering(mdev, WO_drain_io);
998 }
999 put_ldev(mdev);
1000 }
b411b363
PR
1001}
1002
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 *
 * An epoch is finished once it has a barrier number, a nonzero size, and
 * no active requests.  Finishing sends a barrier ack (unless EV_CLEANUP),
 * then either frees the epoch (FE_DESTROYED) or, if it is the current
 * one, resets it for reuse (FE_RECYCLED).  Freeing an epoch cascades the
 * EV_BECAME_LAST event onto the next epoch in the list.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
			if (!(ev & EV_CLEANUP)) {
				/* drop the lock around the network send */
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			}
			dec_unacked(mdev);

			if (mdev->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				mdev->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				/* current epoch: reset in place for reuse */
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
				wake_up(&mdev->ee_wait);
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&mdev->epoch_lock);

	return rv;
}
1074
/**
 * drbd_bump_write_ordering() - Fall back to an other write ordering method
 * @mdev:	DRBD device.
 * @wo:		Write ordering method to try.
 *
 * Only ever weakens the ordering (flush -> drain -> none); never upgrades.
 * The per-disk no_disk_flush / no_disk_drain settings may force a further
 * downgrade of the requested method.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = mdev->write_ordering;
	/* min(): never upgrade above the currently active method. */
	wo = min(pwo, wo);
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
		wo = WO_drain_io;
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
		wo = WO_none;
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}
1099
45bb912b 1100/**
fbe29dec 1101 * drbd_submit_peer_request()
45bb912b 1102 * @mdev: DRBD device.
db830c46 1103 * @peer_req: peer request
45bb912b 1104 * @rw: flag field, see bio->bi_rw
10f6d992
LE
1105 *
1106 * May spread the pages to multiple bios,
1107 * depending on bio_add_page restrictions.
1108 *
1109 * Returns 0 if all bios have been submitted,
1110 * -ENOMEM if we could not allocate enough bios,
1111 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1112 * single page to an empty bio (which should never happen and likely indicates
1113 * that the lower level IO stack is in some way broken). This has been observed
1114 * on certain Xen deployments.
45bb912b
LE
1115 */
1116/* TODO allocate from our own bio_set. */
fbe29dec
AG
1117int drbd_submit_peer_request(struct drbd_conf *mdev,
1118 struct drbd_peer_request *peer_req,
1119 const unsigned rw, const int fault_type)
45bb912b
LE
1120{
1121 struct bio *bios = NULL;
1122 struct bio *bio;
db830c46
AG
1123 struct page *page = peer_req->pages;
1124 sector_t sector = peer_req->i.sector;
1125 unsigned ds = peer_req->i.size;
45bb912b
LE
1126 unsigned n_bios = 0;
1127 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
10f6d992 1128 int err = -ENOMEM;
45bb912b
LE
1129
1130 /* In most cases, we will only need one bio. But in case the lower
1131 * level restrictions happen to be different at this offset on this
1132 * side than those of the sending peer, we may need to submit the
da4a75d2
LE
1133 * request in more than one bio.
1134 *
1135 * Plain bio_alloc is good enough here, this is no DRBD internally
1136 * generated bio, but a bio allocated on behalf of the peer.
1137 */
45bb912b
LE
1138next_bio:
1139 bio = bio_alloc(GFP_NOIO, nr_pages);
1140 if (!bio) {
1141 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1142 goto fail;
1143 }
db830c46 1144 /* > peer_req->i.sector, unless this is the first bio */
45bb912b
LE
1145 bio->bi_sector = sector;
1146 bio->bi_bdev = mdev->ldev->backing_bdev;
45bb912b 1147 bio->bi_rw = rw;
db830c46 1148 bio->bi_private = peer_req;
fcefa62e 1149 bio->bi_end_io = drbd_peer_request_endio;
45bb912b
LE
1150
1151 bio->bi_next = bios;
1152 bios = bio;
1153 ++n_bios;
1154
1155 page_chain_for_each(page) {
1156 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1157 if (!bio_add_page(bio, page, len, 0)) {
10f6d992
LE
1158 /* A single page must always be possible!
1159 * But in case it fails anyways,
1160 * we deal with it, and complain (below). */
1161 if (bio->bi_vcnt == 0) {
1162 dev_err(DEV,
1163 "bio_add_page failed for len=%u, "
1164 "bi_vcnt=0 (bi_sector=%llu)\n",
1165 len, (unsigned long long)bio->bi_sector);
1166 err = -ENOSPC;
1167 goto fail;
1168 }
45bb912b
LE
1169 goto next_bio;
1170 }
1171 ds -= len;
1172 sector += len >> 9;
1173 --nr_pages;
1174 }
1175 D_ASSERT(page == NULL);
1176 D_ASSERT(ds == 0);
1177
db830c46 1178 atomic_set(&peer_req->pending_bios, n_bios);
45bb912b
LE
1179 do {
1180 bio = bios;
1181 bios = bios->bi_next;
1182 bio->bi_next = NULL;
1183
45bb912b 1184 drbd_generic_make_request(mdev, fault_type, bio);
45bb912b 1185 } while (bios);
45bb912b
LE
1186 return 0;
1187
1188fail:
1189 while (bios) {
1190 bio = bios;
1191 bios = bios->bi_next;
1192 bio_put(bio);
1193 }
10f6d992 1194 return err;
45bb912b
LE
1195}
1196
53840641 1197static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
db830c46 1198 struct drbd_peer_request *peer_req)
53840641 1199{
db830c46 1200 struct drbd_interval *i = &peer_req->i;
53840641
AG
1201
1202 drbd_remove_interval(&mdev->write_requests, i);
1203 drbd_clear_interval(i);
1204
6c852bec 1205 /* Wake up any processes waiting for this peer request to complete. */
53840641
AG
1206 if (i->waiting)
1207 wake_up(&mdev->misc_wait);
1208}
1209
/* Handle an incoming P_BARRIER packet: close the current epoch and,
 * depending on the write ordering policy, either allocate a fresh epoch
 * object or wait/flush until the current one has drained.  The barrier
 * ack itself is sent from drbd_may_finish_epoch() once the epoch is done. */
static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
			   unsigned int data_size)
{
	int rv;
	struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
	struct drbd_epoch *epoch;

	/* Matching dec_unacked() happens when the barrier ack goes out. */
	inc_unacked(mdev);

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return true;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		/* Drain (and possibly flush) all writes of the closing epoch
		 * before reusing/waiting on the epoch object. */
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		drbd_flush(mdev);

		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		epoch = mdev->current_epoch;
		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

		D_ASSERT(atomic_read(&epoch->active) == 0);
		D_ASSERT(epoch->flags == 0);

		return true;
	default:
		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
		return false;
	}

	/* We got a fresh epoch object; install it as the current epoch
	 * unless the old one got recycled concurrently. */
	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
		mdev->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&mdev->epoch_lock);

	return true;
}
1281
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
/* Read one data payload (plus optional integrity digest) of @data_size
 * bytes off the data socket into a freshly allocated peer request.
 * Returns NULL on short read, invalid size/offset, allocation failure,
 * or digest mismatch. */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
{
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, rr;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;
	unsigned long *data;

	/* Digest is only present with protocol >= 87 and a configured
	 * integrity transform. */
	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev->tconn, dig_in, dgs);
		if (rr != dgs) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data digest: read %d expected %d\n",
					rr, dgs);
			return NULL;
		}
	}

	data_size -= dgs;

	/* Sanity-check the remaining payload size: non-zero, sector
	 * aligned, and within the maximum bio size. */
	if (!expect(data_size != 0))
		return NULL;
	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust out peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		rr = drbd_recv(mdev->tconn, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (rr != len) {
			drbd_free_ee(mdev, peer_req);
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data: read %d expected %d\n",
				rr, len);
			return NULL;
		}
		ds -= rr;
	}

	if (dgs) {
		/* Verify the received payload against the digest the peer sent. */
		drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_ee(mdev, peer_req);
			return NULL;
		}
	}
	mdev->recv_cnt += data_size>>9;
	return peer_req;
}
1369
1370/* drbd_drain_block() just takes a data block
1371 * out of the socket input buffer, and discards it.
1372 */
1373static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1374{
1375 struct page *page;
1376 int rr, rv = 1;
1377 void *data;
1378
c3470cde 1379 if (!data_size)
81e84650 1380 return true;
c3470cde 1381
45bb912b 1382 page = drbd_pp_alloc(mdev, 1, 1);
b411b363
PR
1383
1384 data = kmap(page);
1385 while (data_size) {
de0ff338 1386 rr = drbd_recv(mdev->tconn, data, min_t(int, data_size, PAGE_SIZE));
b411b363
PR
1387 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1388 rv = 0;
0ddc5549
LE
1389 if (!signal_pending(current))
1390 dev_warn(DEV,
1391 "short read receiving data: read %d expected %d\n",
1392 rr, min_t(int, data_size, PAGE_SIZE));
b411b363
PR
1393 break;
1394 }
1395 data_size -= rr;
1396 }
1397 kunmap(page);
435f0740 1398 drbd_pp_free(mdev, page, 0);
b411b363
PR
1399 return rv;
1400}
1401
/* Receive a data reply directly into the pages of the original request's
 * master bio ("diskless read").  Optionally verifies the integrity digest.
 * Returns 1 on success, 0 on short read or digest mismatch. */
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec *bvec;
	struct bio *bio;
	int dgs, rr, i, expect;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;

	/* Digest present only with protocol >= 87 and a configured transform. */
	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev->tconn, dig_in, dgs);
		if (rr != dgs) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data reply digest: read %d expected %d\n",
					rr, dgs);
			return 0;
		}
	}

	data_size -= dgs;

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	/* Fill each bio segment straight from the socket. */
	bio_for_each_segment(bvec, bio, i) {
		expect = min_t(int, data_size, bvec->bv_len);
		rr = drbd_recv(mdev->tconn,
			     kmap(bvec->bv_page)+bvec->bv_offset,
			     expect);
		kunmap(bvec->bv_page);
		if (rr != expect) {
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data reply: "
					"read %d expected %d\n",
					rr, expect);
			return 0;
		}
		data_size -= rr;
	}

	if (dgs) {
		drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
			return 0;
		}
	}

	D_ASSERT(data_size == 0);
	return 1;
}
1461
1462/* e_end_resync_block() is called via
1463 * drbd_process_done_ee() by asender only */
00d56944 1464static int e_end_resync_block(struct drbd_work *w, int unused)
b411b363 1465{
8050e6d0
AG
1466 struct drbd_peer_request *peer_req =
1467 container_of(w, struct drbd_peer_request, w);
00d56944 1468 struct drbd_conf *mdev = w->mdev;
db830c46 1469 sector_t sector = peer_req->i.sector;
b411b363
PR
1470 int ok;
1471
db830c46 1472 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1473
db830c46
AG
1474 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1475 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1476 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
b411b363
PR
1477 } else {
1478 /* Record failure to sync */
db830c46 1479 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
b411b363 1480
db830c46 1481 ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1482 }
1483 dec_unacked(mdev);
1484
1485 return ok;
1486}
1487
/* Read one resync data block off the socket and submit it to the local
 * disk.  On success the matching put_ldev() happens from the endio path;
 * on failure we roll back and drop the ldev reference here. */
static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
{
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
	if (!peer_req)
		goto fail;

	dec_rs_pending(mdev);

	inc_unacked(mdev);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	/* Undo the list_add above before freeing the peer request. */
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_ee(mdev, peer_req);
fail:
	put_ldev(mdev);
	return false;
}
1523
668eebc6 1524static struct drbd_request *
bc9c5c41
AG
1525find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1526 sector_t sector, bool missing_ok, const char *func)
51624585 1527{
51624585
AG
1528 struct drbd_request *req;
1529
bc9c5c41
AG
1530 /* Request object according to our peer */
1531 req = (struct drbd_request *)(unsigned long)id;
5e472264 1532 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
668eebc6 1533 return req;
c3afd8f5
AG
1534 if (!missing_ok) {
1535 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1536 (unsigned long)id, (unsigned long long)sector);
1537 }
51624585
AG
1538 return NULL;
1539}
1540
d8763023
AG
1541static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1542 unsigned int data_size)
b411b363
PR
1543{
1544 struct drbd_request *req;
1545 sector_t sector;
b411b363 1546 int ok;
e42325a5 1547 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1548
1549 sector = be64_to_cpu(p->sector);
1550
87eeee41 1551 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 1552 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
87eeee41 1553 spin_unlock_irq(&mdev->tconn->req_lock);
c3afd8f5 1554 if (unlikely(!req))
81e84650 1555 return false;
b411b363 1556
24c4830c 1557 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
b411b363
PR
1558 * special casing it there for the various failure cases.
1559 * still no race with drbd_fail_pending_reads */
1560 ok = recv_dless_read(mdev, req, sector, data_size);
1561
1562 if (ok)
8554df1c 1563 req_mod(req, DATA_RECEIVED);
b411b363
PR
1564 /* else: nothing. handled from drbd_disconnect...
1565 * I don't think we may complete this just yet
1566 * in case we are "on-disconnect: freeze" */
1567
1568 return ok;
1569}
1570
d8763023
AG
1571static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1572 unsigned int data_size)
b411b363
PR
1573{
1574 sector_t sector;
b411b363 1575 int ok;
e42325a5 1576 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1577
1578 sector = be64_to_cpu(p->sector);
1579 D_ASSERT(p->block_id == ID_SYNCER);
1580
1581 if (get_ldev(mdev)) {
1582 /* data is submitted to disk within recv_resync_read.
1583 * corresponding put_ldev done below on error,
fcefa62e 1584 * or in drbd_peer_request_endio. */
b411b363
PR
1585 ok = recv_resync_read(mdev, sector, data_size);
1586 } else {
1587 if (__ratelimit(&drbd_ratelimit_state))
1588 dev_err(DEV, "Can not write resync data to local disk.\n");
1589
1590 ok = drbd_drain_block(mdev, data_size);
1591
2b2bf214 1592 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363
PR
1593 }
1594
778f271d
PR
1595 atomic_add(data_size >> 9, &mdev->rs_sect_in);
1596
b411b363
PR
1597 return ok;
1598}
1599
/* Worker callback: resubmit a postponed write request from scratch.
 * The old request is dropped via DISCARD_WRITE; its master bio is then
 * fed back into __drbd_make_request() until it is accepted. */
static int w_restart_write(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;
	struct bio *bio;
	unsigned long start_time;
	unsigned long flags;

	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	if (!expect(req->rq_state & RQ_POSTPONED)) {
		spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
		return 0;
	}
	/* Save what we need before __req_mod() may free the request. */
	bio = req->master_bio;
	start_time = req->start_time;
	/* Postponed requests will not have their master_bio completed!  */
	__req_mod(req, DISCARD_WRITE, NULL);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	while (__drbd_make_request(mdev, bio, start_time))
		/* retry */ ;
	return 1;
}
1623
/* Queue w_restart_write for every postponed local request overlapping
 * [@sector, @sector + @size) whose local part has completed.
 * NOTE(review): called from e_end_block() under tconn->req_lock. */
static void restart_conflicting_writes(struct drbd_conf *mdev,
				       sector_t sector, int size)
{
	struct drbd_interval *i;
	struct drbd_request *req;

	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		/* Only restart once the local disk I/O is done and the
		 * request was actually postponed. */
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
			continue;
		if (expect(list_empty(&req->w.list))) {
			req->w.mdev = mdev;
			req->w.cb = w_restart_write;
			drbd_queue_work(&mdev->tconn->data.work, &req->w);
		}
	}
}
1644
b411b363
PR
1645/* e_end_block() is called via drbd_process_done_ee().
1646 * this means this function only runs in the asender thread
1647 */
00d56944 1648static int e_end_block(struct drbd_work *w, int cancel)
b411b363 1649{
8050e6d0
AG
1650 struct drbd_peer_request *peer_req =
1651 container_of(w, struct drbd_peer_request, w);
00d56944 1652 struct drbd_conf *mdev = w->mdev;
db830c46 1653 sector_t sector = peer_req->i.sector;
b411b363
PR
1654 int ok = 1, pcmd;
1655
89e58e75 1656 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
db830c46 1657 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1658 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1659 mdev->state.conn <= C_PAUSED_SYNC_T &&
db830c46 1660 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 1661 P_RS_WRITE_ACK : P_WRITE_ACK;
db830c46 1662 ok &= drbd_send_ack(mdev, pcmd, peer_req);
b411b363 1663 if (pcmd == P_RS_WRITE_ACK)
db830c46 1664 drbd_set_in_sync(mdev, sector, peer_req->i.size);
b411b363 1665 } else {
db830c46 1666 ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1667 /* we expect it to be marked out of sync anyways...
1668 * maybe assert this? */
1669 }
1670 dec_unacked(mdev);
1671 }
1672 /* we delete from the conflict detection hash _after_ we sent out the
1673 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
89e58e75 1674 if (mdev->tconn->net_conf->two_primaries) {
87eeee41 1675 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
1676 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1677 drbd_remove_epoch_entry_interval(mdev, peer_req);
7be8da07
AG
1678 if (peer_req->flags & EE_RESTART_REQUESTS)
1679 restart_conflicting_writes(mdev, sector, peer_req->i.size);
87eeee41 1680 spin_unlock_irq(&mdev->tconn->req_lock);
bb3bfe96 1681 } else
db830c46 1682 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1683
db830c46 1684 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
b411b363
PR
1685
1686 return ok;
1687}
1688
7be8da07 1689static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
b411b363 1690{
7be8da07 1691 struct drbd_conf *mdev = w->mdev;
8050e6d0
AG
1692 struct drbd_peer_request *peer_req =
1693 container_of(w, struct drbd_peer_request, w);
206d3589 1694 int ok;
b411b363 1695
7be8da07 1696 ok = drbd_send_ack(mdev, ack, peer_req);
b411b363
PR
1697 dec_unacked(mdev);
1698
1699 return ok;
1700}
1701
7be8da07
AG
1702static int e_send_discard_write(struct drbd_work *w, int unused)
1703{
1704 return e_send_ack(w, P_DISCARD_WRITE);
1705}
1706
1707static int e_send_retry_write(struct drbd_work *w, int unused)
1708{
1709 struct drbd_tconn *tconn = w->mdev->tconn;
1710
1711 return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1712 P_RETRY_WRITE : P_DISCARD_WRITE);
1713}
1714
3e394da1
AG
1715static bool seq_greater(u32 a, u32 b)
1716{
1717 /*
1718 * We assume 32-bit wrap-around here.
1719 * For 24-bit wrap-around, we would have to shift:
1720 * a <<= 8; b <<= 8;
1721 */
1722 return (s32)a - (s32)b > 0;
1723}
1724
1725static u32 seq_max(u32 a, u32 b)
1726{
1727 return seq_greater(a, b) ? a : b;
1728}
1729
7be8da07
AG
1730static bool need_peer_seq(struct drbd_conf *mdev)
1731{
1732 struct drbd_tconn *tconn = mdev->tconn;
1733
1734 /*
1735 * We only need to keep track of the last packet_seq number of our peer
1736 * if we are in dual-primary mode and we have the discard flag set; see
1737 * handle_write_conflicts().
1738 */
1739 return tconn->net_conf->two_primaries &&
1740 test_bit(DISCARD_CONCURRENT, &tconn->flags);
1741}
1742
/* Fold a peer sequence number we received into mdev->peer_seq and wake
 * waiters in wait_for_and_update_peer_seq().  No-op unless peer sequence
 * tracking is required (see need_peer_seq()). */
static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
{
	unsigned int newest_peer_seq;

	if (need_peer_seq(mdev)) {
		spin_lock(&mdev->peer_seq_lock);
		newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
		mdev->peer_seq = newest_peer_seq;
		spin_unlock(&mdev->peer_seq_lock);
		/* wake up only if we actually changed mdev->peer_seq */
		if (peer_seq == newest_peer_seq)
			wake_up(&mdev->seq_wait);
	}
}
1757
b411b363
PR
1758/* Called from receive_Data.
1759 * Synchronize packets on sock with packets on msock.
1760 *
1761 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1762 * packet traveling on msock, they are still processed in the order they have
1763 * been sent.
1764 *
1765 * Note: we don't care for Ack packets overtaking P_DATA packets.
1766 *
1767 * In case packet_seq is larger than mdev->peer_seq number, there are
1768 * outstanding packets on the msock. We wait for them to arrive.
1769 * In case we are the logically next packet, we update mdev->peer_seq
1770 * ourselves. Correctly handles 32bit wrap around.
1771 *
1772 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1773 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1774 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1775 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1776 *
1777 * returns 0 if we may process the packet,
1778 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
7be8da07 1779static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
b411b363
PR
1780{
1781 DEFINE_WAIT(wait);
b411b363 1782 long timeout;
7be8da07
AG
1783 int ret;
1784
1785 if (!need_peer_seq(mdev))
1786 return 0;
1787
b411b363
PR
1788 spin_lock(&mdev->peer_seq_lock);
1789 for (;;) {
7be8da07
AG
1790 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1791 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1792 ret = 0;
b411b363 1793 break;
7be8da07 1794 }
b411b363
PR
1795 if (signal_pending(current)) {
1796 ret = -ERESTARTSYS;
1797 break;
1798 }
7be8da07 1799 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
b411b363 1800 spin_unlock(&mdev->peer_seq_lock);
71b1c1eb
AG
1801 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1802 timeout = schedule_timeout(timeout);
b411b363 1803 spin_lock(&mdev->peer_seq_lock);
7be8da07 1804 if (!timeout) {
b411b363 1805 ret = -ETIMEDOUT;
71b1c1eb 1806 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
b411b363
PR
1807 break;
1808 }
1809 }
b411b363 1810 spin_unlock(&mdev->peer_seq_lock);
7be8da07 1811 finish_wait(&mdev->seq_wait, &wait);
b411b363
PR
1812 return ret;
1813}
1814
688593c5
LE
1815/* see also bio_flags_to_wire()
1816 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1817 * flags and back. We may replicate to other kernel versions. */
1818static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1819{
688593c5
LE
1820 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1821 (dpf & DP_FUA ? REQ_FUA : 0) |
1822 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1823 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1824}
1825
/* Fail (NEG_ACK) every postponed local request overlapping
 * [@sector, @sector + @size).  Called with tconn->req_lock held; the
 * lock is dropped around complete_master_bio(), so the overlap scan is
 * restarted from the top each time. */
static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
				    unsigned int size)
{
	struct drbd_interval *i;

    repeat:
	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
			continue;
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		/* complete_master_bio() may sleep; drop the lock and rescan. */
		spin_unlock_irq(&mdev->tconn->req_lock);
		if (m.bio)
			complete_master_bio(mdev, &m);
		spin_lock_irq(&mdev->tconn->req_lock);
		goto repeat;
	}
}
1850
/* Resolve conflicts between an incoming peer write and overlapping local
 * requests.  Called with tconn->req_lock held (see receive_Data()).
 * Returns 0 when the peer request may be submitted, -ENOENT when it was
 * queued for discard/retry, or another error on wait failure. */
static int handle_write_conflicts(struct drbd_conf *mdev,
				  struct drbd_peer_request *peer_req)
{
	struct drbd_tconn *tconn = mdev->tconn;
	bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;
	bool equal;
	int err;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&mdev->write_requests, &peer_req->i);

    repeat:
	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		if (i == &peer_req->i)
			continue;

		if (!i->local) {
			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup.  Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(mdev, i);
			if (err)
				goto out;
			goto repeat;
		}

		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be discarded; otherwise,
			 * it will be retried once all overlapping requests
			 * have completed.
			 */
			bool discard = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  discard ? "local" : "remote");

			/* The answer is sent asynchronously from the asender;
			 * dec_unacked() happens in e_send_ack(). */
			inc_unacked(mdev);
			peer_req->w.cb = discard ? e_send_discard_write :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &mdev->done_ee);
			wake_asender(mdev->tconn);

			err = -ENOENT;
			goto out;
		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request will be discarded or
				 * retried.  Requests that are discarded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(mdev, &req->i);
				if (err) {
					_conn_request_state(mdev->tconn,
							    NS(conn, C_TIMEOUT),
							    CS_HARD);
					fail_postponed_requests(mdev, sector, size);
					goto out;
				}
				goto repeat;
			}
			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;
		}
	}
	err = 0;

    out:
	if (err)
		drbd_remove_epoch_entry_interval(mdev, peer_req);
	return err;
}
1958
/* mirrored write:
 * Receive a P_DATA packet from the peer, attach it to the current write
 * epoch, resolve two-primaries write conflicts, and submit it to the local
 * backing device.  Returns true on success (ownership of peer_req passes to
 * the endio path), false to trigger a re-connect. */
static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
			unsigned int data_size)
{
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = &mdev->tconn->data.rbuf.data;
	u32 peer_seq = be32_to_cpu(p->seq_num);
	int rw = WRITE;
	u32 dp_flags;
	int err;


	if (!get_ldev(mdev)) {
		/* No local disk: still keep the peer-seq machinery and the
		 * epoch size consistent, nack the write, and drain the
		 * payload from the socket. */
		err = wait_for_and_update_peer_seq(mdev, peer_seq);
		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
		atomic_inc(&mdev->current_epoch->epoch_size);
		return drbd_drain_block(mdev, data_size) && err == 0;
	}

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(mdev, p->block_id, sector, data_size);
	if (!peer_req) {
		put_ldev(mdev);
		return false;
	}

	peer_req->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(mdev, dp_flags);

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	/* Account this request against the currently open epoch. */
	spin_lock(&mdev->epoch_lock);
	peer_req->epoch = mdev->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&mdev->epoch_lock);

	if (mdev->tconn->net_conf->two_primaries) {
		/* Dual-primary: order against the peer's sequence numbers and
		 * resolve concurrent-write conflicts under req_lock. */
		err = wait_for_and_update_peer_seq(mdev, peer_seq);
		if (err)
			goto out_interrupted;
		spin_lock_irq(&mdev->tconn->req_lock);
		err = handle_write_conflicts(mdev, peer_req);
		if (err) {
			spin_unlock_irq(&mdev->tconn->req_lock);
			/* -ENOENT: request was queued for discard/retry by the
			 * conflict handler; nothing more for us to do here. */
			if (err == -ENOENT) {
				put_ldev(mdev);
				return true;
			}
			goto out_interrupted;
		}
	} else
		spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* Protocol-dependent acknowledgement strategy. */
	switch (mdev->tconn->net_conf->wire_protocol) {
	case DRBD_PROT_C:
		inc_unacked(mdev);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
		break;
	case DRBD_PROT_B:
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(mdev, P_RECV_ACK, peer_req);
		break;
	case DRBD_PROT_A:
		/* nothing to do */
		break;
	}

	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(mdev, peer_req->i.sector);
	}

	if (drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(mdev, peer_req->i.sector);

out_interrupted:
	/* Drop our epoch reference and free the request on every error path. */
	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
	put_ldev(mdev);
	drbd_free_ee(mdev, peer_req);
	return false;
}
2067
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * (more than 64 sectors) of activity we cannot account for with our own resync
 * activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 *
 * Returns non-zero if resync should be throttled for @sector, 0 otherwise.
 */
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
{
	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	struct lc_element *tmp;
	int curr_events;
	int throttle = 0;

	/* feature disabled? */
	if (mdev->sync_conf.c_min_rate == 0)
		return 0;

	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
	if (tmp) {
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
			return 0;
		}
		/* Do not slow down if app IO is already waiting for this extent */
	}
	spin_unlock_irq(&mdev->al_lock);

	/* Total sectors read+written on the backing disk, minus what our own
	 * resync submitted: the remainder is application activity. */
	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
			atomic_read(&mdev->rs_sect_ev);

	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;
		int i;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
		else
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = mdev->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		/* Throttle only while we are faster than the configured
		 * minimum resync rate. */
		if (dbdt > mdev->sync_conf.c_min_rate)
			throttle = 1;
	}
	return throttle;
}
2133
2134
/* Handle a peer read request: P_DATA_REQUEST (application read),
 * P_RS_DATA_REQUEST / P_CSUM_RS_REQUEST (resync), P_OV_REQUEST / P_OV_REPLY
 * (online verify).  Validates the request, optionally throttles resync,
 * and submits a local READ whose completion callback answers the peer.
 * Returns true on successful submit, false to trigger a re-connect. */
static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int digest_size)
{
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	int size, verb;
	unsigned int fault_type;
	struct p_block_req *p =	&mdev->tconn->data.rbuf.block_req;

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	/* Sanity: size must be positive, 512-byte aligned, bounded, and the
	 * request must not reach past the end of the device. */
	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return false;
	}
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return false;
	}

	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
		/* No up-to-date local data: send the matching negative ack
		 * for the request type, then drain any payload. */
		verb = 1;
		switch (cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
			break;
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
			break;
		case P_OV_REPLY:
			verb = 0;
			dec_rs_pending(mdev);
			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
				cmdname(cmd));
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possibly payload */
		return drbd_drain_block(mdev, digest_size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
	if (!peer_req) {
		put_ldev(mdev);
		return false;
	}

	/* Pick the completion callback and fault-injection type per request;
	 * some cases jump directly into the shared submit paths below. */
	switch (cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */
		goto submit;

	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		/* Both carry a digest payload that must be read off the
		 * socket and kept with the request. */
		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = digest_size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)
			goto out_free_e;

		if (cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		} else if (cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &mdev->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(mdev);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		/* First P_OV_REQUEST of a run (proto >= 90): initialize the
		 * online-verify bookkeeping and progress marks. */
		if (mdev->ov_start_sector == ~(sector_t)0 &&
		    mdev->tconn->agreed_pro_version >= 90) {
			unsigned long now = jiffies;
			int i;
			mdev->ov_start_sector = sector;
			mdev->ov_position = sector;
			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
			mdev->rs_total = mdev->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				mdev->rs_mark_left[i] = mdev->ov_left;
				mdev->rs_mark_time[i] = now;
			}
			dev_info(DEV, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;
		break;

	default:
		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
		    cmdname(cmd));
		fault_type = DRBD_FAULT_MAX;
		goto out_free_e;
	}

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time. For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * on request through. The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */
	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
		schedule_timeout_uninterruptible(HZ/10);
	if (drbd_rs_begin_io(mdev, sector))
		goto out_free_e;

submit_for_resync:
	atomic_add(size >> 9, &mdev->rs_sect_ev);

submit:
	inc_unacked(mdev);
	spin_lock_irq(&mdev->tconn->req_lock);
	list_add_tail(&peer_req->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

out_free_e:
	put_ldev(mdev);
	drbd_free_ee(mdev, peer_req);
	return false;
}
2321
/* After-split-brain auto-recovery when neither node was Primary.
 * Decision value: -1 = sync from peer (discard local), 1 = sync from this
 * node (discard remote), -100 = no automatic decision, disconnect.
 * The switch below uses intentional fall-through between strategies. */
static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
{
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;

	/* Lowest UUID bit encodes "was primary" for each side. */
	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
	peer = mdev->p_uuid[UI_BITMAP] & 1;

	/* Changed-block counts, used by the least/zero-changes strategies. */
	ch_peer = mdev->p_uuid[UI_SIZE];
	ch_self = mdev->comm_bm_set;

	switch (mdev->tconn->net_conf->after_sb_0p) {
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
		/* These policies are only valid for after-sb-1pri/2pri. */
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
			rv = -1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv =  1;
			break;
		}
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
			rv = 1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = -1;
			break;
		}
		/* Else fall through to one of the other strategies... */
		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
		     "Using discard-least-changes instead\n");
		/* fall through into ASB_DISCARD_ZERO_CHG */
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			/* Tie: DISCARD_CONCURRENT decides deterministically
			 * which side yields. */
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
			break;
		} else {
			if (ch_peer == 0) { rv =  1; break; }
			if (ch_self == 0) { rv = -1; break; }
		}
		if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
		/* fall through (we got here via younger/older-pri) */
	case ASB_DISCARD_LEAST_CHG:
		if	(ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv =  1;
		else /* ( ch_self == ch_peer ) */
		     /* Well, then use something else. */
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
		break;
	case ASB_DISCARD_LOCAL:
		rv = -1;
		break;
	case ASB_DISCARD_REMOTE:
		rv =  1;
	}

	return rv;
}
2393
2394static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2395{
6184ea21 2396 int hg, rv = -100;
b411b363 2397
89e58e75 2398 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2399 case ASB_DISCARD_YOUNGER_PRI:
2400 case ASB_DISCARD_OLDER_PRI:
2401 case ASB_DISCARD_LEAST_CHG:
2402 case ASB_DISCARD_LOCAL:
2403 case ASB_DISCARD_REMOTE:
2404 dev_err(DEV, "Configuration error.\n");
2405 break;
2406 case ASB_DISCONNECT:
2407 break;
2408 case ASB_CONSENSUS:
2409 hg = drbd_asb_recover_0p(mdev);
2410 if (hg == -1 && mdev->state.role == R_SECONDARY)
2411 rv = hg;
2412 if (hg == 1 && mdev->state.role == R_PRIMARY)
2413 rv = hg;
2414 break;
2415 case ASB_VIOLENTLY:
2416 rv = drbd_asb_recover_0p(mdev);
2417 break;
2418 case ASB_DISCARD_SECONDARY:
2419 return mdev->state.role == R_PRIMARY ? 1 : -1;
2420 case ASB_CALL_HELPER:
2421 hg = drbd_asb_recover_0p(mdev);
2422 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2423 enum drbd_state_rv rv2;
2424
2425 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2426 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2427 * we might be here in C_WF_REPORT_PARAMS which is transient.
2428 * we do not need to wait for the after state change work either. */
bb437946
AG
2429 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2430 if (rv2 != SS_SUCCESS) {
b411b363
PR
2431 drbd_khelper(mdev, "pri-lost-after-sb");
2432 } else {
2433 dev_warn(DEV, "Successfully gave up primary role.\n");
2434 rv = hg;
2435 }
2436 } else
2437 rv = hg;
2438 }
2439
2440 return rv;
2441}
2442
2443static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2444{
6184ea21 2445 int hg, rv = -100;
b411b363 2446
89e58e75 2447 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2448 case ASB_DISCARD_YOUNGER_PRI:
2449 case ASB_DISCARD_OLDER_PRI:
2450 case ASB_DISCARD_LEAST_CHG:
2451 case ASB_DISCARD_LOCAL:
2452 case ASB_DISCARD_REMOTE:
2453 case ASB_CONSENSUS:
2454 case ASB_DISCARD_SECONDARY:
2455 dev_err(DEV, "Configuration error.\n");
2456 break;
2457 case ASB_VIOLENTLY:
2458 rv = drbd_asb_recover_0p(mdev);
2459 break;
2460 case ASB_DISCONNECT:
2461 break;
2462 case ASB_CALL_HELPER:
2463 hg = drbd_asb_recover_0p(mdev);
2464 if (hg == -1) {
bb437946
AG
2465 enum drbd_state_rv rv2;
2466
b411b363
PR
2467 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2468 * we might be here in C_WF_REPORT_PARAMS which is transient.
2469 * we do not need to wait for the after state change work either. */
bb437946
AG
2470 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2471 if (rv2 != SS_SUCCESS) {
b411b363
PR
2472 drbd_khelper(mdev, "pri-lost-after-sb");
2473 } else {
2474 dev_warn(DEV, "Successfully gave up primary role.\n");
2475 rv = hg;
2476 }
2477 } else
2478 rv = hg;
2479 }
2480
2481 return rv;
2482}
2483
2484static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2485 u64 bits, u64 flags)
2486{
2487 if (!uuid) {
2488 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2489 return;
2490 }
2491 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2492 text,
2493 (unsigned long long)uuid[UI_CURRENT],
2494 (unsigned long long)uuid[UI_BITMAP],
2495 (unsigned long long)uuid[UI_HISTORY_START],
2496 (unsigned long long)uuid[UI_HISTORY_END],
2497 (unsigned long long)bits,
2498 (unsigned long long)flags);
2499}
2500
/*
  100	after split brain try auto recover
    2	C_SYNC_SOURCE set BitMap
    1	C_SYNC_SOURCE use BitMap
    0	no Sync
   -1	C_SYNC_TARGET use BitMap
   -2	C_SYNC_TARGET set BitMap
 -100	after split brain, disconnect
-1000	unrelated data
-1091   requires proto 91
-1096   requires proto 96
 */
/* Compare our UUID set against the peer's (p_uuid) and decide the sync
 * direction per the legend above.  *rule_nr records which rule matched,
 * for diagnostics.  May correct our own or the peer's in-memory UUIDs
 * when it detects a missed "resync finished" / lost P_SYNC_UUID event. */
static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
{
	u64 self, peer;
	int i, j;

	/* Compare current UUIDs with the "was primary" bit masked off. */
	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);

	/* Rule 10: both sides freshly created -> nothing to sync. */
	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	/* Rule 20: only we are fresh -> full sync from peer. */
	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	/* Rule 30: only the peer is fresh -> full sync to peer. */
	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	if (self == peer) {
		int rct, dc; /* roles at crash time */

		/* Peer cleared its bitmap UUID but we did not: we were
		 * SyncSource and missed the "resync finished" event. */
		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
				drbd_uuid_set_bm(mdev, 0UL);

				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
				*rule_nr = 34;
			} else {
				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

		/* Mirror case: we cleared our bitmap UUID but the peer did
		 * not -> we were SyncTarget; correct the peer's view. */
		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
				mdev->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
				*rule_nr = 35;
			} else {
				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
			(mdev->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0;
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
			/* both crashed as primary: tie-break deterministically */
			dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
			return dc ? -1 : 1;
		}
	}

	/* Rule 50: our current UUID matches the peer's bitmap UUID ->
	 * the peer started a resync towards us; we are the target. */
	*rule_nr = 50;
	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer)
		return -1;

	/* Rule 51: our current UUID is in the peer's history -> the last
	 * P_SYNC_UUID got lost; undo the peer's UUID rotation. */
	*rule_nr = 51;
	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (mdev->tconn->agreed_pro_version < 96 ?
		    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get though. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];

			dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
			drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);

			return -1;
		}
	}

	/* Rule 60: our current UUID somewhere in the peer's history ->
	 * we are far behind, full sync from peer. */
	*rule_nr = 60;
	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		peer = mdev->p_uuid[i] & ~((u64)1);
		if (self == peer)
			return -2;
	}

	/* Rule 70: our bitmap UUID matches the peer's current UUID ->
	 * we started a resync towards the peer; we are the source. */
	*rule_nr = 70;
	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
	if (self == peer)
		return 1;

	/* Rule 71: mirror of rule 51, for a lost P_SYNC_UUID on our side;
	 * undo our own UUID rotation. */
	*rule_nr = 71;
	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (mdev->tconn->agreed_pro_version < 96 ?
		    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get though. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);

			dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);

			return 1;
		}
	}


	/* Rule 80: the peer's current UUID is in our history -> the peer
	 * is far behind, full sync to peer. */
	*rule_nr = 80;
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = mdev->ldev->md.uuid[i] & ~((u64)1);
		if (self == peer)
			return 2;
	}

	/* Rule 90: matching non-zero bitmap UUIDs -> split brain,
	 * try auto recovery. */
	*rule_nr = 90;
	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer && self != ((u64)0))
		return 100;

	/* Rule 100: any common ancestor in the histories -> split brain,
	 * but no auto recovery possible, disconnect. */
	*rule_nr = 100;
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = mdev->ldev->md.uuid[i] & ~((u64)1);
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
			peer = mdev->p_uuid[j] & ~((u64)1);
			if (self == peer)
				return -100;
		}
	}

	/* No relation between the UUID sets at all. */
	return -1000;
}
2692
/* drbd_sync_handshake() returns the new conn state on success, or
   CONN_MASK (-1) on failure.
   Compares UUIDs via drbd_uuid_compare(), applies the configured
   split-brain recovery policies, and maps the result to a connection
   state (C_CONNECTED / C_WF_BITMAP_S / C_WF_BITMAP_T), or C_MASK to
   drop the connection. */
static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
					   enum drbd_disk_state peer_disk) __must_hold(local)
{
	int hg, rule_nr;
	enum drbd_conns rv = C_MASK;
	enum drbd_disk_state mydisk;

	mydisk = mdev->state.disk;
	if (mydisk == D_NEGOTIATING)
		mydisk = mdev->new_state_tmp.disk;

	dev_info(DEV, "drbd_sync_handshake:\n");
	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);

	hg = drbd_uuid_compare(mdev, &rule_nr);

	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);

	if (hg == -1000) {
		dev_alert(DEV, "Unrelated data, aborting!\n");
		return C_MASK;
	}
	/* hg < -1000 encodes "peer protocol too old": -1091, -1096, ... */
	if (hg < -1000) {
		dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
		return C_MASK;
	}

	/* Disk states can override the UUID verdict: the consistent side
	 * becomes source; doubled hg means "set bitmap" (full sync). */
	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
		int f = (hg == -100) || abs(hg) == 2;
		hg = mydisk > D_INCONSISTENT ? 1 : -1;
		if (f)
			hg = hg*2;
		dev_info(DEV, "Becoming sync %s due to disk states.\n",
		     hg > 0 ? "source" : "target");
	}

	if (abs(hg) == 100)
		drbd_khelper(mdev, "initial-split-brain");

	/* Split brain: try the configured after-sb-Npri auto-recovery,
	 * keyed by how many nodes are currently Primary. */
	if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
		int pcount = (mdev->state.role == R_PRIMARY)
			   + (peer_role == R_PRIMARY);
		int forced = (hg == -100);

		switch (pcount) {
		case 0:
			hg = drbd_asb_recover_0p(mdev);
			break;
		case 1:
			hg = drbd_asb_recover_1p(mdev);
			break;
		case 2:
			hg = drbd_asb_recover_2p(mdev);
			break;
		}
		if (abs(hg) < 100) {
			dev_warn(DEV, "Split-Brain detected, %d primaries, "
			     "automatically solved. Sync from %s node\n",
			     pcount, (hg < 0) ? "peer" : "this");
			if (forced) {
				dev_warn(DEV, "Doing a full sync, since"
				     " UUIDs where ambiguous.\n");
				hg = hg*2;
			}
		}
	}

	/* Manual split-brain resolution via the discard-my-data flag. */
	if (hg == -100) {
		if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
			hg = -1;
		if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
			hg = 1;

		if (abs(hg) < 100)
			dev_warn(DEV, "Split-Brain detected, manually solved. "
			     "Sync from %s node\n",
			     (hg < 0) ? "peer" : "this");
	}

	if (hg == -100) {
		/* FIXME this log message is not correct if we end up here
		 * after an attempted attach on a diskless node.
		 * We just refuse to attach -- well, we drop the "connection"
		 * to that disk, in a way... */
		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
		drbd_khelper(mdev, "split-brain");
		return C_MASK;
	}

	if (hg > 0 && mydisk <= D_INCONSISTENT) {
		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
		return C_MASK;
	}

	if (hg < 0 && /* by intention we do not use mydisk here. */
	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
		/* Becoming sync target while Primary violates stable data;
		 * apply the configured rr-conflict policy. */
		switch (mdev->tconn->net_conf->rr_conflict) {
		case ASB_CALL_HELPER:
			drbd_khelper(mdev, "pri-lost");
			/* fall through */
		case ASB_DISCONNECT:
			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
			return C_MASK;
		case ASB_VIOLENTLY:
			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
			     "assumption\n");
		}
	}

	/* dry-run: report the would-be decision, then abort the connect. */
	if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
		if (hg == 0)
			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
		else
			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
				 abs(hg) >= 2 ? "full" : "bit-map based");
		return C_MASK;
	}

	if (abs(hg) >= 2) {
		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
					BM_LOCKED_SET_ALLOWED))
			return C_MASK;
	}

	if (hg > 0) { /* become sync source. */
		rv = C_WF_BITMAP_S;
	} else if (hg < 0) { /* become sync target */
		rv = C_WF_BITMAP_T;
	} else {
		rv = C_CONNECTED;
		if (drbd_bm_total_weight(mdev)) {
			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
			     drbd_bm_total_weight(mdev));
		}
	}

	return rv;
}
2839
2840/* returns 1 if invalid */
2841static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2842{
2843 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2844 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2845 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2846 return 0;
2847
2848 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2849 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2850 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2851 return 1;
2852
2853 /* everything else is valid if they are equal on both sides. */
2854 if (peer == self)
2855 return 0;
2856
2857 /* everything es is invalid. */
2858 return 1;
2859}
2860
/*
 * receive_protocol() - handle an incoming P_PROTOCOL packet
 * @tconn:	the connection the packet arrived on
 * @cmd:	packet command (unused here; kept for the handler signature)
 * @data_size:	payload bytes remaining after the fixed p_protocol fields
 *
 * Compares the peer's wire protocol, after-split-brain policies,
 * want-lose flag, two-primaries setting and (proto >= 87) the
 * data-integrity algorithm against our own net_conf. Any mismatch
 * forces the connection into C_DISCONNECTING.
 *
 * Returns true on success, false on receive error or incompatibility.
 */
static int receive_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd,
			    unsigned int data_size)
{
	struct p_protocol *p = &tconn->data.rbuf.protocol;
	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
	int p_want_lose, p_two_primaries, cf;
	char p_integrity_alg[SHARED_SECRET_MAX] = "";

	/* all fields arrive in network byte order */
	p_proto = be32_to_cpu(p->protocol);
	p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
	p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
	p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
	p_two_primaries = be32_to_cpu(p->two_primaries);
	cf = be32_to_cpu(p->conn_flags);
	p_want_lose = cf & CF_WANT_LOSE;

	/* dry-run is re-evaluated on every protocol packet */
	clear_bit(CONN_DRY_RUN, &tconn->flags);

	if (cf & CF_DRY_RUN)
		set_bit(CONN_DRY_RUN, &tconn->flags);

	if (p_proto != tconn->net_conf->wire_protocol) {
		conn_err(tconn, "incompatible communication protocols\n");
		goto disconnect;
	}

	/* after-split-brain policies must be complementary, see cmp_after_sb() */
	if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
		conn_err(tconn, "incompatible after-sb-0pri settings\n");
		goto disconnect;
	}

	if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
		conn_err(tconn, "incompatible after-sb-1pri settings\n");
		goto disconnect;
	}

	if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
		conn_err(tconn, "incompatible after-sb-2pri settings\n");
		goto disconnect;
	}

	/* at most one side may be willing to be the split-brain victim */
	if (p_want_lose && tconn->net_conf->want_lose) {
		conn_err(tconn, "both sides have the 'want_lose' flag set\n");
		goto disconnect;
	}

	if (p_two_primaries != tconn->net_conf->two_primaries) {
		conn_err(tconn, "incompatible setting of the two-primaries options\n");
		goto disconnect;
	}

	if (tconn->agreed_pro_version >= 87) {
		unsigned char *my_alg = tconn->net_conf->integrity_alg;

		/* NOTE(review): data_size is read into a SHARED_SECRET_MAX buffer
		 * without an explicit bound check here; presumably the caller
		 * validates the packet size — TODO confirm against the dispatcher. */
		if (drbd_recv(tconn, p_integrity_alg, data_size) != data_size)
			return false;

		/* force NUL termination in case the peer sent garbage */
		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
		if (strcmp(p_integrity_alg, my_alg)) {
			conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
			goto disconnect;
		}
		conn_info(tconn, "data-integrity-alg: %s\n",
			  my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
	}

	return true;

disconnect:
	conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	return false;
}
2933
2934/* helper function
2935 * input: alg name, feature name
2936 * return: NULL (alg name was "")
2937 * ERR_PTR(error) if something goes wrong
2938 * or the crypto hash ptr, if it worked out ok. */
2939struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2940 const char *alg, const char *name)
2941{
2942 struct crypto_hash *tfm;
2943
2944 if (!alg[0])
2945 return NULL;
2946
2947 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2948 if (IS_ERR(tfm)) {
2949 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2950 alg, name, PTR_ERR(tfm));
2951 return tfm;
2952 }
2953 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2954 crypto_free_hash(tfm);
2955 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2956 return ERR_PTR(-EINVAL);
2957 }
2958 return tfm;
2959}
2960
/*
 * receive_SyncParam() - handle P_SYNC_PARAM / P_SYNC_PARAM_89 / P_SYNC_PARAM_95
 * @mdev:	device the packet is for
 * @cmd:	packet command (unused here; kept for the handler signature)
 * @packet_size: total packet size including the header
 *
 * Reads the peer's resync parameters (rate, verify/csums algorithms and,
 * for protocol >= 95, the dynamic resync controller settings), allocates
 * any newly requested digest transforms, and installs everything under
 * peer_seq_lock. Incompatible algorithm settings during the initial
 * parameter exchange force C_DISCONNECTING.
 *
 * Returns true on success, false on receive error or incompatibility.
 */
static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int packet_size)
{
	int ok = true;
	struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
	unsigned int header_size, data_size, exp_max_sz;
	struct crypto_hash *verify_tfm = NULL;
	struct crypto_hash *csums_tfm = NULL;
	const int apv = mdev->tconn->agreed_pro_version;
	int *rs_plan_s = NULL;
	int fifo_size = 0;

	/* the struct layout (and thus the maximum size) grew with the
	 * agreed protocol version */
	exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
		    : apv == 88 ? sizeof(struct p_rs_param)
					+ SHARED_SECRET_MAX
		    : apv <= 94 ? sizeof(struct p_rs_param_89)
		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);

	if (packet_size > exp_max_sz) {
		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
		    packet_size, exp_max_sz);
		return false;
	}

	/* split the packet into fixed header part and trailing data;
	 * only apv == 88 carries a variable-length verify-alg string */
	if (apv <= 88) {
		header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
		data_size = packet_size - header_size;
	} else if (apv <= 94) {
		header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
		data_size = packet_size - header_size;
		D_ASSERT(data_size == 0);
	} else {
		header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
		data_size = packet_size - header_size;
		D_ASSERT(data_size == 0);
	}

	/* initialize verify_alg and csums_alg */
	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);

	if (drbd_recv(mdev->tconn, &p->head.payload, header_size) != header_size)
		return false;

	mdev->sync_conf.rate = be32_to_cpu(p->rate);

	if (apv >= 88) {
		if (apv == 88) {
			if (data_size > SHARED_SECRET_MAX) {
				dev_err(DEV, "verify-alg too long, "
				    "peer wants %u, accepting only %u byte\n",
						data_size, SHARED_SECRET_MAX);
				return false;
			}

			if (drbd_recv(mdev->tconn, p->verify_alg, data_size) != data_size)
				return false;

			/* we expect NUL terminated string */
			/* but just in case someone tries to be evil */
			D_ASSERT(p->verify_alg[data_size-1] == 0);
			p->verify_alg[data_size-1] = 0;

		} else /* apv >= 89 */ {
			/* we still expect NUL terminated strings */
			/* but just in case someone tries to be evil */
			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
		}

		/* changing the verify algorithm is only allowed while the
		 * initial parameter exchange has not yet completed */
		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
				    mdev->sync_conf.verify_alg, p->verify_alg);
				goto disconnect;
			}
			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
					p->verify_alg, "verify-alg");
			if (IS_ERR(verify_tfm)) {
				verify_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
				    mdev->sync_conf.csums_alg, p->csums_alg);
				goto disconnect;
			}
			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
					p->csums_alg, "csums-alg");
			if (IS_ERR(csums_tfm)) {
				csums_tfm = NULL;
				goto disconnect;
			}
		}

		/* apv >= 95: dynamic resync speed controller parameters */
		if (apv > 94) {
			mdev->sync_conf.rate = be32_to_cpu(p->rate);
			mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
			mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
			mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
			mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);

			fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
			if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
				rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
				if (!rs_plan_s) {
					dev_err(DEV, "kmalloc of fifo_buffer failed");
					goto disconnect;
				}
			}
		}

		spin_lock(&mdev->peer_seq_lock);
		/* lock against drbd_nl_syncer_conf() */
		if (verify_tfm) {
			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
			crypto_free_hash(mdev->verify_tfm);
			mdev->verify_tfm = verify_tfm;
			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
		}
		if (csums_tfm) {
			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
			crypto_free_hash(mdev->csums_tfm);
			mdev->csums_tfm = csums_tfm;
			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
		}
		/* swap in the newly sized resync plan fifo (or shrink to 0) */
		if (fifo_size != mdev->rs_plan_s.size) {
			kfree(mdev->rs_plan_s.values);
			mdev->rs_plan_s.values = rs_plan_s;
			mdev->rs_plan_s.size = fifo_size;
			mdev->rs_planed = 0;
		}
		spin_unlock(&mdev->peer_seq_lock);
	}

	return ok;
disconnect:
	/* just for completeness: actually not needed,
	 * as this is not reached if csums_tfm was ok. */
	crypto_free_hash(csums_tfm);
	/* but free the verify_tfm again, if csums_tfm did not work out */
	crypto_free_hash(verify_tfm);
	conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	return false;
}
3112
b411b363
PR
3113/* warn if the arguments differ by more than 12.5% */
3114static void warn_if_differ_considerably(struct drbd_conf *mdev,
3115 const char *s, sector_t a, sector_t b)
3116{
3117 sector_t d;
3118 if (a == 0 || b == 0)
3119 return;
3120 d = (a > b) ? (a - b) : (b - a);
3121 if (d > (a>>3) || d > (b>>3))
3122 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3123 (unsigned long long)a, (unsigned long long)b);
3124}
3125
d8763023
AG
/*
 * receive_sizes() - handle an incoming P_SIZES packet
 * @mdev:	device the packet is for
 * @cmd:	packet command (unused here; kept for the handler signature)
 * @data_size:	unused; the p_sizes payload is fixed size
 *
 * Negotiates the device size with the peer: stores the peer's disk size,
 * sanity-checks it against our backing device, possibly resizes, and
 * triggers a resync after an online grow. Refuses (and disconnects) if
 * accepting the peer's size would shrink a device with usable data
 * during connect.
 *
 * Returns true on success, false on error (connection torn down).
 */
static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
			 unsigned int data_size)
{
	struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
	enum determine_dev_size dd = unchanged;
	sector_t p_size, p_usize, my_usize;
	int ldsc = 0; /* local disk size changed */
	enum dds_flags ddsf;

	p_size = be64_to_cpu(p->d_size);
	p_usize = be64_to_cpu(p->u_size);

	/* just store the peer's disk size for now.
	 * we still need to figure out whether we accept that. */
	mdev->p_size = p_size;

	if (get_ldev(mdev)) {
		warn_if_differ_considerably(mdev, "lower level device sizes",
			   p_size, drbd_get_max_capacity(mdev->ldev));
		warn_if_differ_considerably(mdev, "user requested size",
					    p_usize, mdev->ldev->dc.disk_size);

		/* if this is the first connect, or an otherwise expected
		 * param exchange, choose the minimum */
		if (mdev->state.conn == C_WF_REPORT_PARAMS)
			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
					     p_usize);

		/* remember our own setting in case we have to back out below */
		my_usize = mdev->ldev->dc.disk_size;

		if (mdev->ldev->dc.disk_size != p_usize) {
			mdev->ldev->dc.disk_size = p_usize;
			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
				 (unsigned long)mdev->ldev->dc.disk_size);
		}

		/* Never shrink a device with usable data during connect.
		   But allow online shrinking if we are connected. */
		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
		   drbd_get_capacity(mdev->this_bdev) &&
		   mdev->state.disk >= D_OUTDATED &&
		   mdev->state.conn < C_CONNECTED) {
			dev_err(DEV, "The peer's disk size is too small!\n");
			conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
			/* restore the user-requested size we overwrote above */
			mdev->ldev->dc.disk_size = my_usize;
			put_ldev(mdev);
			return false;
		}
		put_ldev(mdev);
	}

	ddsf = be16_to_cpu(p->dds_flags);
	if (get_ldev(mdev)) {
		dd = drbd_determine_dev_size(mdev, ddsf);
		put_ldev(mdev);
		if (dd == dev_size_error)
			return false;
		drbd_md_sync(mdev);
	} else {
		/* I am diskless, need to accept the peer's size. */
		drbd_set_my_capacity(mdev, p_size);
	}

	mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	drbd_reconsider_max_bio_size(mdev);

	if (get_ldev(mdev)) {
		/* detect a backing device that changed size behind our back */
		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
			ldsc = 1;
		}

		put_ldev(mdev);
	}

	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
		if (be64_to_cpu(p->c_size) !=
		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
			/* we have different sizes, probably peer
			 * needs to know my new size... */
			drbd_send_sizes(mdev, 0, ddsf);
		}
		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
			if (mdev->state.pdsk >= D_INCONSISTENT &&
			    mdev->state.disk >= D_INCONSISTENT) {
				if (ddsf & DDSF_NO_RESYNC)
					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
				else
					resync_after_online_grow(mdev);
			} else
				/* cannot resync now; remember to do it after
				 * the state negotiation settles */
				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
		}
	}

	return true;
}
3223
d8763023
AG
3224static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3225 unsigned int data_size)
b411b363 3226{
e42325a5 3227 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
b411b363 3228 u64 *p_uuid;
62b0da3a 3229 int i, updated_uuids = 0;
b411b363 3230
b411b363
PR
3231 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3232
3233 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3234 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3235
3236 kfree(mdev->p_uuid);
3237 mdev->p_uuid = p_uuid;
3238
3239 if (mdev->state.conn < C_CONNECTED &&
3240 mdev->state.disk < D_INCONSISTENT &&
3241 mdev->state.role == R_PRIMARY &&
3242 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3243 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3244 (unsigned long long)mdev->ed_uuid);
38fa9988 3245 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3246 return false;
b411b363
PR
3247 }
3248
3249 if (get_ldev(mdev)) {
3250 int skip_initial_sync =
3251 mdev->state.conn == C_CONNECTED &&
31890f4a 3252 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3253 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3254 (p_uuid[UI_FLAGS] & 8);
3255 if (skip_initial_sync) {
3256 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3257 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3258 "clear_n_write from receive_uuids",
3259 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3260 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3261 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3262 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3263 CS_VERBOSE, NULL);
3264 drbd_md_sync(mdev);
62b0da3a 3265 updated_uuids = 1;
b411b363
PR
3266 }
3267 put_ldev(mdev);
18a50fa2
PR
3268 } else if (mdev->state.disk < D_INCONSISTENT &&
3269 mdev->state.role == R_PRIMARY) {
3270 /* I am a diskless primary, the peer just created a new current UUID
3271 for me. */
62b0da3a 3272 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3273 }
3274
3275 /* Before we test for the disk state, we should wait until an eventually
3276 ongoing cluster wide state change is finished. That is important if
3277 we are primary and are detaching from our disk. We need to see the
3278 new disk state... */
8410da8f
PR
3279 mutex_lock(mdev->state_mutex);
3280 mutex_unlock(mdev->state_mutex);
b411b363 3281 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3282 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3283
3284 if (updated_uuids)
3285 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3286
81e84650 3287 return true;
b411b363
PR
3288}
3289
3290/**
3291 * convert_state() - Converts the peer's view of the cluster state to our point of view
3292 * @ps: The state as seen by the peer.
3293 */
3294static union drbd_state convert_state(union drbd_state ps)
3295{
3296 union drbd_state ms;
3297
3298 static enum drbd_conns c_tab[] = {
3299 [C_CONNECTED] = C_CONNECTED,
3300
3301 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3302 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3303 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3304 [C_VERIFY_S] = C_VERIFY_T,
3305 [C_MASK] = C_MASK,
3306 };
3307
3308 ms.i = ps.i;
3309
3310 ms.conn = c_tab[ps.conn];
3311 ms.peer = ps.role;
3312 ms.role = ps.peer;
3313 ms.pdsk = ps.disk;
3314 ms.disk = ps.pdsk;
3315 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3316
3317 return ms;
3318}
3319
d8763023
AG
3320static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3321 unsigned int data_size)
b411b363 3322{
e42325a5 3323 struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
b411b363 3324 union drbd_state mask, val;
bf885f8a 3325 enum drbd_state_rv rv;
b411b363 3326
b411b363
PR
3327 mask.i = be32_to_cpu(p->mask);
3328 val.i = be32_to_cpu(p->val);
3329
25703f83 3330 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
8410da8f 3331 mutex_is_locked(mdev->state_mutex)) {
b411b363 3332 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
81e84650 3333 return true;
b411b363
PR
3334 }
3335
3336 mask = convert_state(mask);
3337 val = convert_state(val);
3338
dfafcc8a
PR
3339 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3340 drbd_send_sr_reply(mdev, rv);
b411b363 3341
b411b363
PR
3342 drbd_md_sync(mdev);
3343
81e84650 3344 return true;
b411b363
PR
3345}
3346
dfafcc8a
PR
3347static int receive_req_conn_state(struct drbd_tconn *tconn, enum drbd_packet cmd,
3348 unsigned int data_size)
3349{
3350 struct p_req_state *p = &tconn->data.rbuf.req_state;
3351 union drbd_state mask, val;
3352 enum drbd_state_rv rv;
3353
3354 mask.i = be32_to_cpu(p->mask);
3355 val.i = be32_to_cpu(p->val);
3356
3357 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3358 mutex_is_locked(&tconn->cstate_mutex)) {
3359 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3360 return true;
3361 }
3362
3363 mask = convert_state(mask);
3364 val = convert_state(val);
3365
3366 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY);
3367 conn_send_sr_reply(tconn, rv);
3368
3369 return true;
3370}
3371
d8763023
AG
/*
 * receive_state() - handle an incoming P_STATE packet
 * @mdev:	device the packet is for
 * @cmd:	packet command (unused here; kept for the handler signature)
 * @data_size:	unused; the p_state payload is fixed size
 *
 * Merges the peer's reported state into our own: resolves the peer's
 * "real" disk state, decides whether a resync handshake is needed, and
 * commits the combined state under req_lock (retrying if our own state
 * changed concurrently).
 *
 * Returns true on success, false on error (connection torn down).
 */
static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
			 unsigned int data_size)
{
	struct p_state *p = &mdev->tconn->data.rbuf.state;
	union drbd_state os, ns, peer_state;
	enum drbd_disk_state real_peer_disk;
	enum chg_state_flags cs_flags;
	int rv;

	peer_state.i = be32_to_cpu(p->state);

	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
		/* peer is still attaching; its UUID flags tell us whether
		 * its data is consistent (bit 4 clear) or not */
		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

	spin_lock_irq(&mdev->tconn->req_lock);
 retry:
	/* snapshot our state; re-checked under the lock before committing */
	os = ns = mdev->state;
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* peer says his disk is uptodate, while we think it is inconsistent,
	 * and this happens while we think we have a sync going on. */
	if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
			if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
				drbd_resync_finished(mdev);
			return true;
		}
	}

	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;

	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr = (os.conn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			os.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.conn >= C_STARTING_SYNC_S &&
			peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
			ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);

		put_ldev(mdev);
		/* C_MASK from the handshake means "could not agree" */
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
			if (mdev->state.disk == D_NEGOTIATING) {
				drbd_force_state(mdev, NS(disk, D_FAILED));
			} else if (peer_state.disk == D_NEGOTIATING) {
				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
				real_peer_disk = D_DISKLESS;
			} else {
				/* a dry-run connect ends here by design */
				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
					return false;
				D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
				conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
				return false;
			}
		}
	}

	spin_lock_irq(&mdev->tconn->req_lock);
	/* our state changed while we were computing; start over */
	if (mdev->state.i != os.i)
		goto retry;
	clear_bit(CONSIDER_RESYNC, &mdev->flags);
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = mdev->new_state_tmp.disk;
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
	if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &mdev->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporal network outages! */
		spin_unlock_irq(&mdev->tconn->req_lock);
		dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
		tl_clear(mdev->tconn);
		drbd_uuid_new_current(mdev);
		clear_bit(NEW_CUR_UUID, &mdev->flags);
		conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
		return false;
	}
	rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
	ns = mdev->state;
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (rv < SS_SUCCESS) {
		conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
		return false;
	}

	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING ) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(mdev);
			drbd_send_state(mdev);
		}
	}

	/* the want_lose flag is consumed by a successful connect */
	mdev->tconn->net_conf->want_lose = 0;

	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */

	return true;
}
3524
d8763023
AG
3525static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
3526 unsigned int data_size)
b411b363 3527{
e42325a5 3528 struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;
b411b363
PR
3529
3530 wait_event(mdev->misc_wait,
3531 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3532 mdev->state.conn == C_BEHIND ||
b411b363
PR
3533 mdev->state.conn < C_CONNECTED ||
3534 mdev->state.disk < D_NEGOTIATING);
3535
3536 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3537
b411b363
PR
3538 /* Here the _drbd_uuid_ functions are right, current should
3539 _not_ be rotated into the history */
3540 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3541 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3542 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3543
62b0da3a 3544 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3545 drbd_start_resync(mdev, C_SYNC_TARGET);
3546
3547 put_ldev(mdev);
3548 } else
3549 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3550
81e84650 3551 return true;
b411b363
PR
3552}
3553
2c46407d
AG
3554/**
3555 * receive_bitmap_plain
3556 *
3557 * Return 0 when done, 1 when another iteration is needed, and a negative error
3558 * code upon failure.
3559 */
3560static int
02918be2
PR
3561receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3562 unsigned long *buffer, struct bm_xfer_ctx *c)
b411b363
PR
3563{
3564 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3565 unsigned want = num_words * sizeof(long);
2c46407d 3566 int err;
b411b363 3567
02918be2
PR
3568 if (want != data_size) {
3569 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
2c46407d 3570 return -EIO;
b411b363
PR
3571 }
3572 if (want == 0)
2c46407d 3573 return 0;
de0ff338 3574 err = drbd_recv(mdev->tconn, buffer, want);
2c46407d
AG
3575 if (err != want) {
3576 if (err >= 0)
3577 err = -EIO;
3578 return err;
3579 }
b411b363
PR
3580
3581 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3582
3583 c->word_offset += num_words;
3584 c->bit_offset = c->word_offset * BITS_PER_LONG;
3585 if (c->bit_offset > c->bm_bits)
3586 c->bit_offset = c->bm_bits;
3587
2c46407d 3588 return 1;
b411b363
PR
3589}
3590
2c46407d
AG
/**
 * recv_bm_rle_bits
 *
 * Decode one P_COMPRESSED_BITMAP payload: a VLI-encoded sequence of
 * run lengths, alternating between runs of clear and set bits. Only the
 * "set" runs are applied to the bitmap; @c tracks the current position
 * across successive packets.
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
recv_bm_rle_bits(struct drbd_conf *mdev,
		struct p_compressed_bm *p,
		 struct bm_xfer_ctx *c,
		 unsigned int len)
{
	struct bitstream bs;
	u64 look_ahead;		/* sliding 64-bit decode window */
	u64 rl;			/* current run length */
	u64 tmp;
	unsigned long s = c->bit_offset;
	unsigned long e;
	int toggle = DCBP_get_start(p);	/* whether the first run is "set" */
	int have;		/* valid bits currently in look_ahead */
	int bits;

	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));

	/* prime the look-ahead window */
	bits = bitstream_get_bits(&bs, &look_ahead, 64);
	if (bits < 0)
		return -EIO;

	for (have = bits; have > 0; s += rl, toggle = !toggle) {
		bits = vli_decode_bits(&rl, look_ahead);
		if (bits <= 0)
			return -EIO;

		if (toggle) {
			e = s + rl -1;
			if (e >= c->bm_bits) {
				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
				return -EIO;
			}
			_drbd_bm_set_bits(mdev, s, e);
		}

		/* the decoded code must not be longer than what we had buffered */
		if (have < bits) {
			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
				have, bits, look_ahead,
				(unsigned int)(bs.cur.b - p->code),
				(unsigned int)bs.buf_len);
			return -EIO;
		}
		/* consume the decoded code and refill the window */
		look_ahead >>= bits;
		have -= bits;

		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
		if (bits < 0)
			return -EIO;
		look_ahead |= tmp << have;
		have += bits;
	}

	c->bit_offset = s;
	bm_xfer_ctx_bit_to_word_offset(c);

	/* done once we have decoded exactly up to the last bitmap bit */
	return (s != c->bm_bits);
}
3655
2c46407d
AG
3656/**
3657 * decode_bitmap_c
3658 *
3659 * Return 0 when done, 1 when another iteration is needed, and a negative error
3660 * code upon failure.
3661 */
3662static int
b411b363
PR
3663decode_bitmap_c(struct drbd_conf *mdev,
3664 struct p_compressed_bm *p,
c6d25cfe
PR
3665 struct bm_xfer_ctx *c,
3666 unsigned int len)
b411b363
PR
3667{
3668 if (DCBP_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3669 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3670
3671 /* other variants had been implemented for evaluation,
3672 * but have been dropped as this one turned out to be "best"
3673 * during all our tests. */
3674
3675 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
38fa9988 3676 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
2c46407d 3677 return -EIO;
b411b363
PR
3678}
3679
3680void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3681 const char *direction, struct bm_xfer_ctx *c)
3682{
3683 /* what would it take to transfer it "plaintext" */
c012949a 3684 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3685 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3686 + c->bm_words * sizeof(long);
3687 unsigned total = c->bytes[0] + c->bytes[1];
3688 unsigned r;
3689
3690 /* total can not be zero. but just in case: */
3691 if (total == 0)
3692 return;
3693
3694 /* don't report if not compressed */
3695 if (total >= plain)
3696 return;
3697
3698 /* total < plain. check for overflow, still */
3699 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3700 : (1000 * total / plain);
3701
3702 if (r > 1000)
3703 r = 1000;
3704
3705 r = 1000 - r;
3706 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3707 "total %u; compression: %u.%u%%\n",
3708 direction,
3709 c->bytes[1], c->packets[1],
3710 c->bytes[0], c->packets[0],
3711 total, r/10, r % 10);
3712}
3713
3714/* Since we are processing the bitfield from lower addresses to higher,
3715 it does not matter if the process it in 32 bit chunks or 64 bit
3716 chunks as long as it is little endian. (Understand it as byte stream,
3717 beginning with the lowest byte...) If we would use big endian
3718 we would need to process it from the highest address to the lowest,
3719 in order to be agnostic to the 32 vs 64 bits issue.
3720
3721 returns 0 on failure, 1 if we successfully received it. */
d8763023
AG
3722static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
3723 unsigned int data_size)
b411b363
PR
3724{
3725 struct bm_xfer_ctx c;
3726 void *buffer;
2c46407d 3727 int err;
81e84650 3728 int ok = false;
257d0af6 3729 struct p_header *h = &mdev->tconn->data.rbuf.header;
77351055 3730 struct packet_info pi;
b411b363 3731
20ceb2b2
LE
3732 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3733 /* you are supposed to send additional out-of-sync information
3734 * if you actually set bits during this phase */
b411b363
PR
3735
3736 /* maybe we should use some per thread scratch page,
3737 * and allocate that during initial device creation? */
3738 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3739 if (!buffer) {
3740 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3741 goto out;
3742 }
3743
3744 c = (struct bm_xfer_ctx) {
3745 .bm_bits = drbd_bm_bits(mdev),
3746 .bm_words = drbd_bm_words(mdev),
3747 };
3748
2c46407d 3749 for(;;) {
02918be2 3750 if (cmd == P_BITMAP) {
2c46407d 3751 err = receive_bitmap_plain(mdev, data_size, buffer, &c);
02918be2 3752 } else if (cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3753 /* MAYBE: sanity check that we speak proto >= 90,
3754 * and the feature is enabled! */
3755 struct p_compressed_bm *p;
3756
02918be2 3757 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
b411b363
PR
3758 dev_err(DEV, "ReportCBitmap packet too large\n");
3759 goto out;
3760 }
3761 /* use the page buff */
3762 p = buffer;
3763 memcpy(p, h, sizeof(*h));
de0ff338 3764 if (drbd_recv(mdev->tconn, p->head.payload, data_size) != data_size)
b411b363 3765 goto out;
004352fa
LE
3766 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3767 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
78fcbdae 3768 goto out;
b411b363 3769 }
c6d25cfe 3770 err = decode_bitmap_c(mdev, p, &c, data_size);
b411b363 3771 } else {
02918be2 3772 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
b411b363
PR
3773 goto out;
3774 }
3775
02918be2 3776 c.packets[cmd == P_BITMAP]++;
257d0af6 3777 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;
b411b363 3778
2c46407d
AG
3779 if (err <= 0) {
3780 if (err < 0)
3781 goto out;
b411b363 3782 break;
2c46407d 3783 }
9ba7aa00 3784 if (!drbd_recv_header(mdev->tconn, &pi))
b411b363 3785 goto out;
77351055
PR
3786 cmd = pi.cmd;
3787 data_size = pi.size;
2c46407d 3788 }
b411b363
PR
3789
3790 INFO_bm_xfer_stats(mdev, "receive", &c);
3791
3792 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
3793 enum drbd_state_rv rv;
3794
b411b363
PR
3795 ok = !drbd_send_bitmap(mdev);
3796 if (!ok)
3797 goto out;
3798 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
3799 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3800 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
3801 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3802 /* admin may have requested C_DISCONNECTING,
3803 * other threads may have noticed network errors */
3804 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3805 drbd_conn_str(mdev->state.conn));
3806 }
3807
81e84650 3808 ok = true;
b411b363 3809 out:
20ceb2b2 3810 drbd_bm_unlock(mdev);
b411b363
PR
3811 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3812 drbd_start_resync(mdev, C_SYNC_SOURCE);
3813 free_page((unsigned long) buffer);
3814 return ok;
3815}
3816
2de876ef 3817static int _tconn_receive_skip(struct drbd_tconn *tconn, unsigned int data_size)
b411b363
PR
3818{
3819 /* TODO zero copy sink :) */
3820 static char sink[128];
3821 int size, want, r;
3822
02918be2 3823 size = data_size;
b411b363
PR
3824 while (size > 0) {
3825 want = min_t(int, size, sizeof(sink));
2de876ef
PR
3826 r = drbd_recv(tconn, sink, want);
3827 if (r <= 0)
841ce241 3828 break;
b411b363
PR
3829 size -= r;
3830 }
3831 return size == 0;
3832}
3833
2de876ef
PR
3834static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
3835 unsigned int data_size)
3836{
3837 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3838 cmd, data_size);
3839
3840 return _tconn_receive_skip(mdev->tconn, data_size);
3841}
3842
3843static int tconn_receive_skip(struct drbd_tconn *tconn, enum drbd_packet cmd, unsigned int data_size)
3844{
3845 conn_warn(tconn, "skipping packet for non existing volume type %d, l: %d!\n",
3846 cmd, data_size);
3847
3848 return _tconn_receive_skip(tconn, data_size);
3849}
3850
d8763023
AG
3851static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
3852 unsigned int data_size)
0ced55a3 3853{
e7f52dfb
LE
3854 /* Make sure we've acked all the TCP data associated
3855 * with the data requests being unplugged */
e42325a5 3856 drbd_tcp_quickack(mdev->tconn->data.socket);
0ced55a3 3857
81e84650 3858 return true;
0ced55a3
PR
3859}
3860
d8763023
AG
3861static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
3862 unsigned int data_size)
73a01a18 3863{
e42325a5 3864 struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;
73a01a18 3865
f735e363
LE
3866 switch (mdev->state.conn) {
3867 case C_WF_SYNC_UUID:
3868 case C_WF_BITMAP_T:
3869 case C_BEHIND:
3870 break;
3871 default:
3872 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3873 drbd_conn_str(mdev->state.conn));
3874 }
3875
73a01a18
PR
3876 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3877
81e84650 3878 return true;
73a01a18
PR
3879}
3880
02918be2
PR
3881struct data_cmd {
3882 int expect_payload;
3883 size_t pkt_size;
a4fbda8e 3884 enum mdev_or_conn fa_type; /* first argument's type */
d9ae84e7
PR
3885 union {
3886 int (*mdev_fn)(struct drbd_conf *, enum drbd_packet cmd,
3887 unsigned int to_receive);
3888 int (*conn_fn)(struct drbd_tconn *, enum drbd_packet cmd,
3889 unsigned int to_receive);
3890 };
02918be2
PR
3891};
3892
3893static struct data_cmd drbd_cmd_handler[] = {
d9ae84e7
PR
3894 [P_DATA] = { 1, sizeof(struct p_data), MDEV, { receive_Data } },
3895 [P_DATA_REPLY] = { 1, sizeof(struct p_data), MDEV, { receive_DataReply } },
3896 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), MDEV, { receive_RSDataReply } } ,
3897 [P_BARRIER] = { 0, sizeof(struct p_barrier), MDEV, { receive_Barrier } } ,
3898 [P_BITMAP] = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
3899 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
3900 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), MDEV, { receive_UnplugRemote } },
3901 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3902 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3903 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
3904 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
7204624c 3905 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), CONN, { .conn_fn = receive_protocol } },
d9ae84e7
PR
3906 [P_UUIDS] = { 0, sizeof(struct p_uuids), MDEV, { receive_uuids } },
3907 [P_SIZES] = { 0, sizeof(struct p_sizes), MDEV, { receive_sizes } },
3908 [P_STATE] = { 0, sizeof(struct p_state), MDEV, { receive_state } },
3909 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), MDEV, { receive_req_state } },
3910 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), MDEV, { receive_sync_uuid } },
3911 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3912 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3913 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3914 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), MDEV, { receive_skip } },
3915 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), MDEV, { receive_out_of_sync } },
dfafcc8a 3916 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), CONN, { .conn_fn = receive_req_conn_state } },
b411b363
PR
3917};
3918
02918be2 3919/* All handler functions that expect a sub-header get that sub-heder in
e42325a5 3920 mdev->tconn->data.rbuf.header.head.payload.
02918be2 3921
e42325a5 3922 Usually in mdev->tconn->data.rbuf.header.head the callback can find the usual
02918be2
PR
3923 p_header, but they may not rely on that. Since there is also p_header95 !
3924 */
b411b363 3925
eefc2f7d 3926static void drbdd(struct drbd_tconn *tconn)
b411b363 3927{
eefc2f7d 3928 struct p_header *header = &tconn->data.rbuf.header;
77351055 3929 struct packet_info pi;
02918be2
PR
3930 size_t shs; /* sub header size */
3931 int rv;
b411b363 3932
eefc2f7d
PR
3933 while (get_t_state(&tconn->receiver) == RUNNING) {
3934 drbd_thread_current_set_cpu(&tconn->receiver);
3935 if (!drbd_recv_header(tconn, &pi))
02918be2 3936 goto err_out;
b411b363 3937
6e849ce8 3938 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) ||
d9ae84e7 3939 !drbd_cmd_handler[pi.cmd].mdev_fn)) {
eefc2f7d 3940 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
02918be2 3941 goto err_out;
0b33a916 3942 }
b411b363 3943
77351055
PR
3944 shs = drbd_cmd_handler[pi.cmd].pkt_size - sizeof(struct p_header);
3945 if (pi.size - shs > 0 && !drbd_cmd_handler[pi.cmd].expect_payload) {
eefc2f7d 3946 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
02918be2 3947 goto err_out;
b411b363 3948 }
b411b363 3949
c13f7e1a 3950 if (shs) {
eefc2f7d 3951 rv = drbd_recv(tconn, &header->payload, shs);
c13f7e1a 3952 if (unlikely(rv != shs)) {
0ddc5549 3953 if (!signal_pending(current))
eefc2f7d 3954 conn_warn(tconn, "short read while reading sub header: rv=%d\n", rv);
c13f7e1a
LE
3955 goto err_out;
3956 }
3957 }
3958
a4fbda8e 3959 if (drbd_cmd_handler[pi.cmd].fa_type == CONN) {
d9ae84e7
PR
3960 rv = drbd_cmd_handler[pi.cmd].conn_fn(tconn, pi.cmd, pi.size - shs);
3961 } else {
3962 struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
3963 rv = mdev ?
3964 drbd_cmd_handler[pi.cmd].mdev_fn(mdev, pi.cmd, pi.size - shs) :
3965 tconn_receive_skip(tconn, pi.cmd, pi.size - shs);
3966 }
b411b363 3967
02918be2 3968 if (unlikely(!rv)) {
eefc2f7d 3969 conn_err(tconn, "error receiving %s, l: %d!\n",
77351055 3970 cmdname(pi.cmd), pi.size);
02918be2 3971 goto err_out;
b411b363
PR
3972 }
3973 }
b411b363 3974
02918be2
PR
3975 if (0) {
3976 err_out:
bbeb641c 3977 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
02918be2 3978 }
b411b363
PR
3979}
3980
0e29d163 3981void conn_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
3982{
3983 struct drbd_wq_barrier barr;
3984
3985 barr.w.cb = w_prev_work_done;
0e29d163 3986 barr.w.tconn = tconn;
b411b363 3987 init_completion(&barr.done);
0e29d163 3988 drbd_queue_work(&tconn->data.work, &barr.w);
b411b363
PR
3989 wait_for_completion(&barr.done);
3990}
3991
360cc740 3992static void drbd_disconnect(struct drbd_tconn *tconn)
b411b363 3993{
bbeb641c 3994 enum drbd_conns oc;
b411b363 3995 int rv = SS_UNKNOWN_ERROR;
b411b363 3996
bbeb641c 3997 if (tconn->cstate == C_STANDALONE)
b411b363 3998 return;
b411b363
PR
3999
4000 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
4001 drbd_thread_stop(&tconn->asender);
4002 drbd_free_sock(tconn);
4003
4004 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4005
4006 conn_info(tconn, "Connection closed\n");
4007
4008 spin_lock_irq(&tconn->req_lock);
bbeb641c
PR
4009 oc = tconn->cstate;
4010 if (oc >= C_UNCONNECTED)
4011 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4012
360cc740
PR
4013 spin_unlock_irq(&tconn->req_lock);
4014
bbeb641c 4015 if (oc == C_DISCONNECTING) {
360cc740
PR
4016 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4017
4018 crypto_free_hash(tconn->cram_hmac_tfm);
4019 tconn->cram_hmac_tfm = NULL;
4020
4021 kfree(tconn->net_conf);
4022 tconn->net_conf = NULL;
bbeb641c 4023 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
360cc740
PR
4024 }
4025}
4026
4027static int drbd_disconnected(int vnr, void *p, void *data)
4028{
4029 struct drbd_conf *mdev = (struct drbd_conf *)p;
4030 enum drbd_fencing_p fp;
4031 unsigned int i;
b411b363 4032
85719573 4033 /* wait for current activity to cease. */
87eeee41 4034 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
4035 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4036 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4037 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 4038 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4039
4040 /* We do not have data structures that would allow us to
4041 * get the rs_pending_cnt down to 0 again.
4042 * * On C_SYNC_TARGET we do not have any data structures describing
4043 * the pending RSDataRequest's we have sent.
4044 * * On C_SYNC_SOURCE there is no data structure that tracks
4045 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4046 * And no, it is not the sum of the reference counts in the
4047 * resync_LRU. The resync_LRU tracks the whole operation including
4048 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4049 * on the fly. */
4050 drbd_rs_cancel_all(mdev);
4051 mdev->rs_total = 0;
4052 mdev->rs_failed = 0;
4053 atomic_set(&mdev->rs_pending_cnt, 0);
4054 wake_up(&mdev->misc_wait);
4055
7fde2be9
PR
4056 del_timer(&mdev->request_timer);
4057
b411b363 4058 del_timer_sync(&mdev->resync_timer);
b411b363
PR
4059 resync_timer_fn((unsigned long)mdev);
4060
b411b363
PR
4061 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4062 * w_make_resync_request etc. which may still be on the worker queue
4063 * to be "canceled" */
a21e9298 4064 drbd_flush_workqueue(mdev);
b411b363
PR
4065
4066 /* This also does reclaim_net_ee(). If we do this too early, we might
4067 * miss some resync ee and pages.*/
4068 drbd_process_done_ee(mdev);
4069
4070 kfree(mdev->p_uuid);
4071 mdev->p_uuid = NULL;
4072
fb22c402 4073 if (!is_susp(mdev->state))
2f5cdd0b 4074 tl_clear(mdev->tconn);
b411b363 4075
b411b363
PR
4076 drbd_md_sync(mdev);
4077
4078 fp = FP_DONT_CARE;
4079 if (get_ldev(mdev)) {
4080 fp = mdev->ldev->dc.fencing;
4081 put_ldev(mdev);
4082 }
4083
87f7be4c
PR
4084 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
4085 drbd_try_outdate_peer_async(mdev);
b411b363 4086
20ceb2b2
LE
4087 /* serialize with bitmap writeout triggered by the state change,
4088 * if any. */
4089 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4090
b411b363
PR
4091 /* tcp_close and release of sendpage pages can be deferred. I don't
4092 * want to use SO_LINGER, because apparently it can be deferred for
4093 * more than 20 seconds (longest time I checked).
4094 *
4095 * Actually we don't care for exactly when the network stack does its
4096 * put_page(), but release our reference on these pages right here.
4097 */
4098 i = drbd_release_ee(mdev, &mdev->net_ee);
4099 if (i)
4100 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
4101 i = atomic_read(&mdev->pp_in_use_by_net);
4102 if (i)
4103 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
4104 i = atomic_read(&mdev->pp_in_use);
4105 if (i)
45bb912b 4106 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
4107
4108 D_ASSERT(list_empty(&mdev->read_ee));
4109 D_ASSERT(list_empty(&mdev->active_ee));
4110 D_ASSERT(list_empty(&mdev->sync_ee));
4111 D_ASSERT(list_empty(&mdev->done_ee));
4112
4113 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4114 atomic_set(&mdev->current_epoch->epoch_size, 0);
4115 D_ASSERT(list_empty(&mdev->current_epoch->list));
360cc740
PR
4116
4117 return 0;
b411b363
PR
4118}
4119
4120/*
4121 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4122 * we can agree on is stored in agreed_pro_version.
4123 *
4124 * feature flags and the reserved array should be enough room for future
4125 * enhancements of the handshake protocol, and possible plugins...
4126 *
4127 * for now, they are expected to be zero, but ignored.
4128 */
8a22cccc 4129static int drbd_send_handshake(struct drbd_tconn *tconn)
b411b363 4130{
e6b3ea83 4131 /* ASSERT current == mdev->tconn->receiver ... */
8a22cccc 4132 struct p_handshake *p = &tconn->data.sbuf.handshake;
b411b363
PR
4133 int ok;
4134
8a22cccc
PR
4135 if (mutex_lock_interruptible(&tconn->data.mutex)) {
4136 conn_err(tconn, "interrupted during initial handshake\n");
b411b363
PR
4137 return 0; /* interrupted. not ok. */
4138 }
4139
8a22cccc
PR
4140 if (tconn->data.socket == NULL) {
4141 mutex_unlock(&tconn->data.mutex);
b411b363
PR
4142 return 0;
4143 }
4144
4145 memset(p, 0, sizeof(*p));
4146 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4147 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
8a22cccc
PR
4148 ok = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
4149 &p->head, sizeof(*p), 0);
4150 mutex_unlock(&tconn->data.mutex);
b411b363
PR
4151 return ok;
4152}
4153
4154/*
4155 * return values:
4156 * 1 yes, we have a valid connection
4157 * 0 oops, did not work out, please try again
4158 * -1 peer talks different language,
4159 * no point in trying again, please go standalone.
4160 */
65d11ed6 4161static int drbd_do_handshake(struct drbd_tconn *tconn)
b411b363 4162{
65d11ed6
PR
4163 /* ASSERT current == tconn->receiver ... */
4164 struct p_handshake *p = &tconn->data.rbuf.handshake;
02918be2 4165 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
77351055 4166 struct packet_info pi;
b411b363
PR
4167 int rv;
4168
65d11ed6 4169 rv = drbd_send_handshake(tconn);
b411b363
PR
4170 if (!rv)
4171 return 0;
4172
65d11ed6 4173 rv = drbd_recv_header(tconn, &pi);
b411b363
PR
4174 if (!rv)
4175 return 0;
4176
77351055 4177 if (pi.cmd != P_HAND_SHAKE) {
65d11ed6 4178 conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
77351055 4179 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4180 return -1;
4181 }
4182
77351055 4183 if (pi.size != expect) {
65d11ed6 4184 conn_err(tconn, "expected HandShake length: %u, received: %u\n",
77351055 4185 expect, pi.size);
b411b363
PR
4186 return -1;
4187 }
4188
65d11ed6 4189 rv = drbd_recv(tconn, &p->head.payload, expect);
b411b363
PR
4190
4191 if (rv != expect) {
0ddc5549 4192 if (!signal_pending(current))
65d11ed6 4193 conn_warn(tconn, "short read receiving handshake packet: l=%u\n", rv);
b411b363
PR
4194 return 0;
4195 }
4196
b411b363
PR
4197 p->protocol_min = be32_to_cpu(p->protocol_min);
4198 p->protocol_max = be32_to_cpu(p->protocol_max);
4199 if (p->protocol_max == 0)
4200 p->protocol_max = p->protocol_min;
4201
4202 if (PRO_VERSION_MAX < p->protocol_min ||
4203 PRO_VERSION_MIN > p->protocol_max)
4204 goto incompat;
4205
65d11ed6 4206 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363 4207
65d11ed6
PR
4208 conn_info(tconn, "Handshake successful: "
4209 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
b411b363
PR
4210
4211 return 1;
4212
4213 incompat:
65d11ed6 4214 conn_err(tconn, "incompatible DRBD dialects: "
b411b363
PR
4215 "I support %d-%d, peer supports %d-%d\n",
4216 PRO_VERSION_MIN, PRO_VERSION_MAX,
4217 p->protocol_min, p->protocol_max);
4218 return -1;
4219}
4220
4221#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4222static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4223{
4224 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4225 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4226 return -1;
b411b363
PR
4227}
4228#else
4229#define CHALLENGE_LEN 64
b10d96cb
JT
4230
4231/* Return value:
4232 1 - auth succeeded,
4233 0 - failed, try again (network error),
4234 -1 - auth failed, don't try again.
4235*/
4236
13e6037d 4237static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4238{
4239 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4240 struct scatterlist sg;
4241 char *response = NULL;
4242 char *right_response = NULL;
4243 char *peers_ch = NULL;
13e6037d 4244 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
b411b363
PR
4245 unsigned int resp_size;
4246 struct hash_desc desc;
77351055 4247 struct packet_info pi;
b411b363
PR
4248 int rv;
4249
13e6037d 4250 desc.tfm = tconn->cram_hmac_tfm;
b411b363
PR
4251 desc.flags = 0;
4252
13e6037d
PR
4253 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4254 (u8 *)tconn->net_conf->shared_secret, key_len);
b411b363 4255 if (rv) {
13e6037d 4256 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4257 rv = -1;
b411b363
PR
4258 goto fail;
4259 }
4260
4261 get_random_bytes(my_challenge, CHALLENGE_LEN);
4262
13e6037d 4263 rv = conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
b411b363
PR
4264 if (!rv)
4265 goto fail;
4266
13e6037d 4267 rv = drbd_recv_header(tconn, &pi);
b411b363
PR
4268 if (!rv)
4269 goto fail;
4270
77351055 4271 if (pi.cmd != P_AUTH_CHALLENGE) {
13e6037d 4272 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
77351055 4273 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4274 rv = 0;
4275 goto fail;
4276 }
4277
77351055 4278 if (pi.size > CHALLENGE_LEN * 2) {
13e6037d 4279 conn_err(tconn, "expected AuthChallenge payload too big.\n");
b10d96cb 4280 rv = -1;
b411b363
PR
4281 goto fail;
4282 }
4283
77351055 4284 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 4285 if (peers_ch == NULL) {
13e6037d 4286 conn_err(tconn, "kmalloc of peers_ch failed\n");
b10d96cb 4287 rv = -1;
b411b363
PR
4288 goto fail;
4289 }
4290
13e6037d 4291 rv = drbd_recv(tconn, peers_ch, pi.size);
b411b363 4292
77351055 4293 if (rv != pi.size) {
0ddc5549 4294 if (!signal_pending(current))
13e6037d 4295 conn_warn(tconn, "short read AuthChallenge: l=%u\n", rv);
b411b363
PR
4296 rv = 0;
4297 goto fail;
4298 }
4299
13e6037d 4300 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
b411b363
PR
4301 response = kmalloc(resp_size, GFP_NOIO);
4302 if (response == NULL) {
13e6037d 4303 conn_err(tconn, "kmalloc of response failed\n");
b10d96cb 4304 rv = -1;
b411b363
PR
4305 goto fail;
4306 }
4307
4308 sg_init_table(&sg, 1);
77351055 4309 sg_set_buf(&sg, peers_ch, pi.size);
b411b363
PR
4310
4311 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4312 if (rv) {
13e6037d 4313 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4314 rv = -1;
b411b363
PR
4315 goto fail;
4316 }
4317
13e6037d 4318 rv = conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
b411b363
PR
4319 if (!rv)
4320 goto fail;
4321
13e6037d 4322 rv = drbd_recv_header(tconn, &pi);
b411b363
PR
4323 if (!rv)
4324 goto fail;
4325
77351055 4326 if (pi.cmd != P_AUTH_RESPONSE) {
13e6037d 4327 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
77351055 4328 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4329 rv = 0;
4330 goto fail;
4331 }
4332
77351055 4333 if (pi.size != resp_size) {
13e6037d 4334 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
b411b363
PR
4335 rv = 0;
4336 goto fail;
4337 }
4338
13e6037d 4339 rv = drbd_recv(tconn, response , resp_size);
b411b363
PR
4340
4341 if (rv != resp_size) {
0ddc5549 4342 if (!signal_pending(current))
13e6037d 4343 conn_warn(tconn, "short read receiving AuthResponse: l=%u\n", rv);
b411b363
PR
4344 rv = 0;
4345 goto fail;
4346 }
4347
4348 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4349 if (right_response == NULL) {
13e6037d 4350 conn_err(tconn, "kmalloc of right_response failed\n");
b10d96cb 4351 rv = -1;
b411b363
PR
4352 goto fail;
4353 }
4354
4355 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4356
4357 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4358 if (rv) {
13e6037d 4359 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4360 rv = -1;
b411b363
PR
4361 goto fail;
4362 }
4363
4364 rv = !memcmp(response, right_response, resp_size);
4365
4366 if (rv)
13e6037d
PR
4367 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4368 resp_size, tconn->net_conf->cram_hmac_alg);
b10d96cb
JT
4369 else
4370 rv = -1;
b411b363
PR
4371
4372 fail:
4373 kfree(peers_ch);
4374 kfree(response);
4375 kfree(right_response);
4376
4377 return rv;
4378}
4379#endif
4380
4381int drbdd_init(struct drbd_thread *thi)
4382{
392c8801 4383 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4384 int h;
4385
4d641dd7 4386 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4387
4388 do {
4d641dd7 4389 h = drbd_connect(tconn);
b411b363 4390 if (h == 0) {
4d641dd7 4391 drbd_disconnect(tconn);
20ee6390 4392 schedule_timeout_interruptible(HZ);
b411b363
PR
4393 }
4394 if (h == -1) {
4d641dd7 4395 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4396 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4397 }
4398 } while (h == 0);
4399
4400 if (h > 0) {
4d641dd7
PR
4401 if (get_net_conf(tconn)) {
4402 drbdd(tconn);
4403 put_net_conf(tconn);
b411b363
PR
4404 }
4405 }
4406
4d641dd7 4407 drbd_disconnect(tconn);
b411b363 4408
4d641dd7 4409 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4410 return 0;
4411}
4412
4413/* ********* acknowledge sender ******** */
4414
d8763023 4415static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4416{
257d0af6 4417 struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
fc3b10a4 4418 struct drbd_tconn *tconn = mdev->tconn;
b411b363
PR
4419
4420 int retcode = be32_to_cpu(p->retcode);
4421
fc3b10a4
PR
4422 if (cmd == P_STATE_CHG_REPLY) {
4423 if (retcode >= SS_SUCCESS) {
4424 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4425 } else {
4426 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4427 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4428 drbd_set_st_err_str(retcode), retcode);
4429 }
4430 wake_up(&mdev->state_wait);
4431 } else /* conn == P_CONN_ST_CHG_REPLY */ {
4432 if (retcode >= SS_SUCCESS) {
4433 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4434 } else {
4435 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4436 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4437 drbd_set_st_err_str(retcode), retcode);
4438 }
4439 wake_up(&tconn->ping_wait);
b411b363 4440 }
81e84650 4441 return true;
b411b363
PR
4442}
4443
f19e4f8b 4444static int got_Ping(struct drbd_tconn *tconn, enum drbd_packet cmd)
b411b363 4445{
f19e4f8b 4446 return drbd_send_ping_ack(tconn);
b411b363
PR
4447
4448}
4449
f19e4f8b 4450static int got_PingAck(struct drbd_tconn *tconn, enum drbd_packet cmd)
b411b363
PR
4451{
4452 /* restore idle timeout */
2a67d8b9
PR
4453 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4454 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4455 wake_up(&tconn->ping_wait);
b411b363 4456
81e84650 4457 return true;
b411b363
PR
4458}
4459
d8763023 4460static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4461{
257d0af6 4462 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4463 sector_t sector = be64_to_cpu(p->sector);
4464 int blksize = be32_to_cpu(p->blksize);
4465
31890f4a 4466 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4467
4468 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4469
1d53f09e
LE
4470 if (get_ldev(mdev)) {
4471 drbd_rs_complete_io(mdev, sector);
4472 drbd_set_in_sync(mdev, sector, blksize);
4473 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4474 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4475 put_ldev(mdev);
4476 }
b411b363 4477 dec_rs_pending(mdev);
778f271d 4478 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4479
81e84650 4480 return true;
b411b363
PR
4481}
4482
bc9c5c41
AG
4483static int
4484validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4485 struct rb_root *root, const char *func,
4486 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4487{
4488 struct drbd_request *req;
4489 struct bio_and_error m;
4490
87eeee41 4491 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4492 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4493 if (unlikely(!req)) {
87eeee41 4494 spin_unlock_irq(&mdev->tconn->req_lock);
81e84650 4495 return false;
b411b363
PR
4496 }
4497 __req_mod(req, what, &m);
87eeee41 4498 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4499
4500 if (m.bio)
4501 complete_master_bio(mdev, &m);
81e84650 4502 return true;
b411b363
PR
4503}
4504
d8763023 4505static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4506{
257d0af6 4507 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4508 sector_t sector = be64_to_cpu(p->sector);
4509 int blksize = be32_to_cpu(p->blksize);
4510 enum drbd_req_event what;
4511
4512 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4513
579b57ed 4514 if (p->block_id == ID_SYNCER) {
b411b363
PR
4515 drbd_set_in_sync(mdev, sector, blksize);
4516 dec_rs_pending(mdev);
81e84650 4517 return true;
b411b363 4518 }
257d0af6 4519 switch (cmd) {
b411b363 4520 case P_RS_WRITE_ACK:
89e58e75 4521 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4522 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4523 break;
4524 case P_WRITE_ACK:
89e58e75 4525 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4526 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4527 break;
4528 case P_RECV_ACK:
89e58e75 4529 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
8554df1c 4530 what = RECV_ACKED_BY_PEER;
b411b363 4531 break;
7be8da07 4532 case P_DISCARD_WRITE:
89e58e75 4533 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
7be8da07
AG
4534 what = DISCARD_WRITE;
4535 break;
4536 case P_RETRY_WRITE:
4537 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4538 what = POSTPONE_WRITE;
b411b363
PR
4539 break;
4540 default:
4541 D_ASSERT(0);
81e84650 4542 return false;
b411b363
PR
4543 }
4544
4545 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41
AG
4546 &mdev->write_requests, __func__,
4547 what, false);
b411b363
PR
4548}
4549
d8763023 4550static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4551{
257d0af6 4552 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363 4553 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4554 int size = be32_to_cpu(p->blksize);
89e58e75
PR
4555 bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4556 mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
c3afd8f5 4557 bool found;
b411b363
PR
4558
4559 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4560
579b57ed 4561 if (p->block_id == ID_SYNCER) {
b411b363
PR
4562 dec_rs_pending(mdev);
4563 drbd_rs_failed_io(mdev, sector, size);
81e84650 4564 return true;
b411b363 4565 }
2deb8336 4566
c3afd8f5 4567 found = validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4568 &mdev->write_requests, __func__,
8554df1c 4569 NEG_ACKED, missing_ok);
c3afd8f5
AG
4570 if (!found) {
4571 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4572 The master bio might already be completed, therefore the
4573 request is no longer in the collision hash. */
4574 /* In Protocol B we might already have got a P_RECV_ACK
4575 but then get a P_NEG_ACK afterwards. */
4576 if (!missing_ok)
2deb8336 4577 return false;
c3afd8f5 4578 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4579 }
2deb8336 4580 return true;
b411b363
PR
4581}
4582
d8763023 4583static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4584{
257d0af6 4585 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4586 sector_t sector = be64_to_cpu(p->sector);
4587
4588 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
7be8da07 4589
b411b363
PR
4590 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4591 (unsigned long long)sector, be32_to_cpu(p->blksize));
4592
4593 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4594 &mdev->read_requests, __func__,
8554df1c 4595 NEG_ACKED, false);
b411b363
PR
4596}
4597
/*
 * got_NegRSDReply() - peer could not serve a resync read (P_NEG_RS_DREPLY),
 * or cancelled one (P_RS_CANCEL).
 *
 * In both cases the in-flight resync request is completed; only a real
 * negative reply additionally accounts the area as failed resync I/O.
 * Returns false only on a protocol violation (unexpected cmd).
 */
static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	sector_t sector;
	int size;
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dec_rs_pending(mdev);

	/* Only touch resync bookkeeping if the local disk is still at
	 * least D_FAILED; otherwise there is nothing left to account. */
	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, sector);
		switch (cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(mdev, sector, size);
			/* fall through - a cancel needs no failed-io accounting */
		case P_RS_CANCEL:
			break;
		default:
			D_ASSERT(0);
			put_ldev(mdev);
			return false;
		}
		put_ldev(mdev);
	}

	return true;
}
4628
/*
 * got_BarrierAck() - peer confirmed an epoch barrier.
 *
 * Releases the corresponding transfer-log epoch.  If we are in
 * Ahead mode (C_AHEAD) and no application writes are in flight,
 * arm start_resync_timer to transition back to a SyncSource;
 * AHEAD_TO_SYNC_SOURCE guards against arming it more than once.
 * Always returns true.
 */
static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;

	tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));

	if (mdev->state.conn == C_AHEAD &&
	    atomic_read(&mdev->ap_in_flight) == 0 &&
	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
		/* delay the resync start by one second */
		mdev->start_resync_timer.expires = jiffies + HZ;
		add_timer(&mdev->start_resync_timer);
	}

	return true;
}
4644
/*
 * got_OVResult() - peer sent the result of an online-verify request.
 *
 * ID_OUT_OF_SYNC in block_id marks a mismatching block; otherwise the
 * current out-of-sync run (if any) is flushed to the log.  When the
 * last verify request (ov_left == 0) is answered, queue w_ov_finished
 * on the data work queue to finish the verify from worker context.
 * Always returns true unless the local disk is gone (then also true).
 */
static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	struct drbd_work *w;
	sector_t sector;
	int size;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_oos_found(mdev, sector, size);
	else
		ov_oos_print(mdev);

	/* no local disk: nothing left to account, but not an error */
	if (!get_ldev(mdev))
		return true;

	drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	/* NOTE(review): tests bit 9 of the sector count (0x200 sectors =
	 * 256 KiB), which toggles every 256 KiB - confirm against the
	 * "every other megabyte" intent stated above. */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			w->mdev = mdev;
			drbd_queue_work_front(&mdev->tconn->data.work, w);
		} else {
			/* allocation failed: finish the verify synchronously
			 * from asender context as a fallback */
			dev_err(DEV, "kmalloc(w) failed.");
			ov_oos_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	put_ldev(mdev);
	return true;
}
4689
/* got_skip() - handler for packets we deliberately ignore
 * (currently P_DELAY_PROBE); consume the packet and report success. */
static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	return true;
}
4694
32862ec7
PR
/*
 * tconn_process_done_ee() - drain the done_ee lists of all volumes.
 *
 * Repeats until, under req_lock, every volume's done_ee list is seen
 * empty: processing outside the lock may race with new completions, so
 * the emptiness re-check under req_lock decides whether to loop again.
 * SIGNAL_ASENDER is cleared while processing (and pending signals are
 * flushed) so drbd_process_done_ee() is not interrupted, then set again
 * before the re-check.
 * Returns 0 on success, 1 if drbd_process_done_ee() failed.
 */
static int tconn_process_done_ee(struct drbd_tconn *tconn)
{
	struct drbd_conf *mdev;
	int i, not_empty = 0;

	do {
		clear_bit(SIGNAL_ASENDER, &tconn->flags);
		flush_signals(current);
		idr_for_each_entry(&tconn->volumes, mdev, i) {
			if (!drbd_process_done_ee(mdev))
				return 1; /* error */
		}
		set_bit(SIGNAL_ASENDER, &tconn->flags);

		/* re-check under the lock: completions may have been
		 * added while we were processing without it */
		spin_lock_irq(&tconn->req_lock);
		idr_for_each_entry(&tconn->volumes, mdev, i) {
			not_empty = !list_empty(&mdev->done_ee);
			if (not_empty)
				break;
		}
		spin_unlock_irq(&tconn->req_lock);
	} while (not_empty);

	return 0;
}
4720
7201b972
AG
/* Dispatch entry for one meta-socket packet type: expected on-wire size
 * and the handler to call.  fa_type selects which union member is valid
 * (MDEV handlers take a per-volume mdev, CONN handlers the connection). */
struct asender_cmd {
	size_t pkt_size;
	enum mdev_or_conn fa_type; /* first argument's type */
	union {
		int (*mdev_fn)(struct drbd_conf *mdev, enum drbd_packet cmd);
		int (*conn_fn)(struct drbd_tconn *tconn, enum drbd_packet cmd);
	};
};
4729
/* Table of all packets handled by the asender thread, indexed by packet
 * type.  Gaps in the designated initializer are zero-filled (pkt_size 0),
 * i.e. such commands have no handler. */
static struct asender_cmd asender_tbl[] = {
	[P_PING]	    = { sizeof(struct p_header), CONN, { .conn_fn = got_Ping } },
	[P_PING_ACK]	    = { sizeof(struct p_header), CONN, { .conn_fn = got_PingAck } },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
	[P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), MDEV, { got_NegAck } },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), MDEV, { got_NegDReply } },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), MDEV, { got_OVResult } },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), MDEV, { got_BarrierAck } },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), MDEV, { got_RqSReply } },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), MDEV, { got_IsInSync } },
	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), MDEV, { got_skip } },
	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), MDEV, { got_RqSReply } },
	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
};
4749
b411b363
PR
4750int drbd_asender(struct drbd_thread *thi)
4751{
392c8801 4752 struct drbd_tconn *tconn = thi->tconn;
32862ec7 4753 struct p_header *h = &tconn->meta.rbuf.header;
b411b363 4754 struct asender_cmd *cmd = NULL;
77351055 4755 struct packet_info pi;
257d0af6 4756 int rv;
b411b363
PR
4757 void *buf = h;
4758 int received = 0;
257d0af6 4759 int expect = sizeof(struct p_header);
f36af18c 4760 int ping_timeout_active = 0;
b411b363 4761
b411b363
PR
4762 current->policy = SCHED_RR; /* Make this a realtime task! */
4763 current->rt_priority = 2; /* more important than all other tasks */
4764
e77a0a5c 4765 while (get_t_state(thi) == RUNNING) {
80822284 4766 drbd_thread_current_set_cpu(thi);
32862ec7 4767 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
2a67d8b9 4768 if (!drbd_send_ping(tconn)) {
32862ec7 4769 conn_err(tconn, "drbd_send_ping has failed\n");
841ce241
AG
4770 goto reconnect;
4771 }
32862ec7
PR
4772 tconn->meta.socket->sk->sk_rcvtimeo =
4773 tconn->net_conf->ping_timeo*HZ/10;
f36af18c 4774 ping_timeout_active = 1;
b411b363
PR
4775 }
4776
32862ec7
PR
4777 /* TODO: conditionally cork; it may hurt latency if we cork without
4778 much to send */
4779 if (!tconn->net_conf->no_cork)
4780 drbd_tcp_cork(tconn->meta.socket);
082a3439
PR
4781 if (tconn_process_done_ee(tconn)) {
4782 conn_err(tconn, "tconn_process_done_ee() failed\n");
32862ec7 4783 goto reconnect;
082a3439 4784 }
b411b363 4785 /* but unconditionally uncork unless disabled */
32862ec7
PR
4786 if (!tconn->net_conf->no_cork)
4787 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
4788
4789 /* short circuit, recv_msg would return EINTR anyways. */
4790 if (signal_pending(current))
4791 continue;
4792
32862ec7
PR
4793 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4794 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
4795
4796 flush_signals(current);
4797
4798 /* Note:
4799 * -EINTR (on meta) we got a signal
4800 * -EAGAIN (on meta) rcvtimeo expired
4801 * -ECONNRESET other side closed the connection
4802 * -ERESTARTSYS (on data) we got a signal
4803 * rv < 0 other than above: unexpected error!
4804 * rv == expected: full header or command
4805 * rv < expected: "woken" by signal during receive
4806 * rv == 0 : "connection shut down by peer"
4807 */
4808 if (likely(rv > 0)) {
4809 received += rv;
4810 buf += rv;
4811 } else if (rv == 0) {
32862ec7 4812 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
4813 goto reconnect;
4814 } else if (rv == -EAGAIN) {
cb6518cb
LE
4815 /* If the data socket received something meanwhile,
4816 * that is good enough: peer is still alive. */
32862ec7
PR
4817 if (time_after(tconn->last_received,
4818 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 4819 continue;
f36af18c 4820 if (ping_timeout_active) {
32862ec7 4821 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
4822 goto reconnect;
4823 }
32862ec7 4824 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
4825 continue;
4826 } else if (rv == -EINTR) {
4827 continue;
4828 } else {
32862ec7 4829 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
4830 goto reconnect;
4831 }
4832
4833 if (received == expect && cmd == NULL) {
32862ec7 4834 if (!decode_header(tconn, h, &pi))
b411b363 4835 goto reconnect;
7201b972
AG
4836 cmd = &asender_tbl[pi.cmd];
4837 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd) {
32862ec7 4838 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
77351055 4839 pi.cmd, pi.size);
b411b363
PR
4840 goto disconnect;
4841 }
4842 expect = cmd->pkt_size;
77351055 4843 if (pi.size != expect - sizeof(struct p_header)) {
32862ec7 4844 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 4845 pi.cmd, pi.size);
b411b363 4846 goto reconnect;
257d0af6 4847 }
b411b363
PR
4848 }
4849 if (received == expect) {
a4fbda8e
PR
4850 bool rv;
4851
4852 if (cmd->fa_type == CONN) {
4853 rv = cmd->conn_fn(tconn, pi.cmd);
4854 } else {
4855 struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
4856 rv = cmd->mdev_fn(mdev, pi.cmd);
4857 }
4858
4859 if (!rv)
b411b363
PR
4860 goto reconnect;
4861
a4fbda8e
PR
4862 tconn->last_received = jiffies;
4863
f36af18c
LE
4864 /* the idle_timeout (ping-int)
4865 * has been restored in got_PingAck() */
7201b972 4866 if (cmd == &asender_tbl[P_PING_ACK])
f36af18c
LE
4867 ping_timeout_active = 0;
4868
b411b363
PR
4869 buf = h;
4870 received = 0;
257d0af6 4871 expect = sizeof(struct p_header);
b411b363
PR
4872 cmd = NULL;
4873 }
4874 }
4875
4876 if (0) {
4877reconnect:
bbeb641c 4878 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
4879 }
4880 if (0) {
4881disconnect:
bbeb641c 4882 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 4883 }
32862ec7 4884 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 4885
32862ec7 4886 conn_info(tconn, "asender terminated\n");
b411b363
PR
4887
4888 return 0;
4889}
This page took 0.416211 seconds and 5 git commands to generate.