drbd: Map from (connection, volume number) to device in the asender handlers
drivers/block/drbd/drbd_receiver.c
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
47#include "drbd_req.h"
48
49#include "drbd_vli.h"
50
51struct packet_info {
52 enum drbd_packet cmd;
53 unsigned int size;
54 unsigned int vnr;
55};
56
57enum finish_epoch {
58 FE_STILL_LIVE,
59 FE_DESTROYED,
60 FE_RECYCLED,
61};
62
63static int drbd_do_handshake(struct drbd_tconn *tconn);
64static int drbd_do_auth(struct drbd_tconn *tconn);
65static int drbd_disconnected(int vnr, void *p, void *data);
66
67static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
68static int e_end_block(struct drbd_work *, int);
69
70
71#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
72
73/*
74 * some helper functions to deal with single linked page lists,
75 * page->private being our "next" pointer.
76 */
77
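/*
 * For reference, a minimal sketch of the page-chain accessors the helpers
 * below rely on.  They are defined in drbd_int.h, not here; the definitions
 * there may differ in detail (e.g. prefetch hints):
 *
 *	#define page_chain_next(page)	((struct page *)page_private(page))
 *	#define page_chain_for_each(page) \
 *		for (; page; page = page_chain_next(page))
 *	#define page_chain_for_each_safe(page, n) \
 *		for (; page && ((n = page_chain_next(page)), 1); page = n)
 *
 * The last page of a chain has page->private == 0 (see page_chain_del()).
 */
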
78/* If at least n pages are linked at head, get n pages off.
79 * Otherwise, don't modify head, and return NULL.
80 * Locking is the responsibility of the caller.
81 */
82static struct page *page_chain_del(struct page **head, int n)
83{
84 struct page *page;
85 struct page *tmp;
86
87 BUG_ON(!n);
88 BUG_ON(!head);
89
90 page = *head;
91
92 if (!page)
93 return NULL;
94
95 while (page) {
96 tmp = page_chain_next(page);
97 if (--n == 0)
98 break; /* found sufficient pages */
99 if (tmp == NULL)
100 /* insufficient pages, don't use any of them. */
101 return NULL;
102 page = tmp;
103 }
104
105 /* add end of list marker for the returned list */
106 set_page_private(page, 0);
107 /* actual return value, and adjustment of head */
108 page = *head;
109 *head = tmp;
110 return page;
111}
112
113/* may be used outside of locks to find the tail of a (usually short)
114 * "private" page chain, before adding it back to a global chain head
115 * with page_chain_add() under a spinlock. */
116static struct page *page_chain_tail(struct page *page, int *len)
117{
118 struct page *tmp;
119 int i = 1;
120 while ((tmp = page_chain_next(page)))
121 ++i, page = tmp;
122 if (len)
123 *len = i;
124 return page;
125}
126
127static int page_chain_free(struct page *page)
128{
129 struct page *tmp;
130 int i = 0;
131 page_chain_for_each_safe(page, tmp) {
132 put_page(page);
133 ++i;
134 }
135 return i;
136}
137
138static void page_chain_add(struct page **head,
139 struct page *chain_first, struct page *chain_last)
140{
141#if 1
142 struct page *tmp;
143 tmp = page_chain_tail(chain_first, NULL);
144 BUG_ON(tmp != chain_last);
145#endif
146
147 /* add chain to head */
148 set_page_private(chain_last, (unsigned long)*head);
149 *head = chain_first;
150}
151
152static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
153{
154 struct page *page = NULL;
155 struct page *tmp = NULL;
156 int i = 0;
157
158 /* Yes, testing drbd_pp_vacant outside the lock is racy.
159 * So what. It saves a spin_lock. */
160 if (drbd_pp_vacant >= number) {
161 spin_lock(&drbd_pp_lock);
162 page = page_chain_del(&drbd_pp_pool, number);
163 if (page)
164 drbd_pp_vacant -= number;
165 spin_unlock(&drbd_pp_lock);
166 if (page)
167 return page;
168 }
169
170 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
171 * "criss-cross" setup, that might cause write-out on some other DRBD,
172 * which in turn might block on the other node at this very place. */
173 for (i = 0; i < number; i++) {
174 tmp = alloc_page(GFP_TRY);
175 if (!tmp)
176 break;
177 set_page_private(tmp, (unsigned long)page);
178 page = tmp;
179 }
180
181 if (i == number)
182 return page;
183
184 /* Not enough pages immediately available this time.
185 * No need to jump around here, drbd_pp_alloc will retry this
186 * function "soon". */
187 if (page) {
188 tmp = page_chain_tail(page, NULL);
189 spin_lock(&drbd_pp_lock);
190 page_chain_add(&drbd_pp_pool, page, tmp);
191 drbd_pp_vacant += i;
192 spin_unlock(&drbd_pp_lock);
193 }
194 return NULL;
195}
196
197static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
198{
199 struct drbd_peer_request *peer_req;
200 struct list_head *le, *tle;
201
202 /* The EEs are always appended to the end of the list. Since
203 they are sent in order over the wire, they have to finish
204 in order. As soon as we see the first one that has not finished, we can
205 stop examining the list... */
206
207 list_for_each_safe(le, tle, &mdev->net_ee) {
208 peer_req = list_entry(le, struct drbd_peer_request, w.list);
209 if (drbd_ee_has_active_page(peer_req))
210 break;
211 list_move(le, to_be_freed);
212 }
213}
214
215static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
216{
217 LIST_HEAD(reclaimed);
218 struct drbd_peer_request *peer_req, *t;
219
220 spin_lock_irq(&mdev->tconn->req_lock);
221 reclaim_net_ee(mdev, &reclaimed);
222 spin_unlock_irq(&mdev->tconn->req_lock);
223
224 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
225 drbd_free_net_ee(mdev, peer_req);
226}
227
228/**
229 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
230 * @mdev: DRBD device.
231 * @number: number of pages requested
232 * @retry: whether to retry, if not enough pages are available right now
233 *
234 * Tries to allocate number pages, first from our own page pool, then from
235 * the kernel, unless this allocation would exceed the max_buffers setting.
236 * Possibly retry until DRBD frees sufficient pages somewhere else.
237 *
238 * Returns a page chain linked via page->private.
239 */
240static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
241{
242 struct page *page = NULL;
243 DEFINE_WAIT(wait);
244
245 /* Yes, we may run up to @number over max_buffers. If we
246 * follow it strictly, the admin will get it wrong anyways. */
247 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
248 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
249
250 while (page == NULL) {
251 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
252
253 drbd_kick_lo_and_reclaim_net(mdev);
254
255 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
256 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
257 if (page)
258 break;
259 }
260
261 if (!retry)
262 break;
263
264 if (signal_pending(current)) {
265 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
266 break;
267 }
268
269 schedule();
270 }
271 finish_wait(&drbd_pp_wait, &wait);
272
273 if (page)
274 atomic_add(number, &mdev->pp_in_use);
275 return page;
276}
277
278/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
279 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
280 * Either links the page chain back to the global pool,
281 * or returns all pages to the system. */
282static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
283{
284 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
285 int i;
286
287 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
288 i = page_chain_free(page);
289 else {
290 struct page *tmp;
291 tmp = page_chain_tail(page, &i);
292 spin_lock(&drbd_pp_lock);
293 page_chain_add(&drbd_pp_pool, page, tmp);
294 drbd_pp_vacant += i;
295 spin_unlock(&drbd_pp_lock);
296 }
297 i = atomic_sub_return(i, a);
298 if (i < 0)
299 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
300 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
301 wake_up(&drbd_pp_wait);
302}
303
304/*
305You need to hold the req_lock:
306 _drbd_wait_ee_list_empty()
307
308You must not have the req_lock:
309 drbd_free_ee()
310 drbd_alloc_ee()
311 drbd_init_ee()
312 drbd_release_ee()
313 drbd_ee_fix_bhs()
314 drbd_process_done_ee()
315 drbd_clear_done_ee()
316 drbd_wait_ee_list_empty()
317*/
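
/*
 * Illustration of the rule above (sketch): the lock-free wrappers further
 * down take the lock themselves and then call the _-prefixed variant, e.g.
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, head);
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 */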
318
319struct drbd_peer_request *
320drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
321 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
322{
323 struct drbd_peer_request *peer_req;
324 struct page *page;
325 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
326
327 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
328 return NULL;
329
330 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
331 if (!peer_req) {
332 if (!(gfp_mask & __GFP_NOWARN))
333 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
334 return NULL;
335 }
336
337 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
338 if (!page)
339 goto fail;
340
341 drbd_clear_interval(&peer_req->i);
342 peer_req->i.size = data_size;
343 peer_req->i.sector = sector;
344 peer_req->i.local = false;
345 peer_req->i.waiting = false;
346
347 peer_req->epoch = NULL;
348 peer_req->w.mdev = mdev;
349 peer_req->pages = page;
350 atomic_set(&peer_req->pending_bios, 0);
351 peer_req->flags = 0;
352 /*
353 * The block_id is opaque to the receiver. It is not endianness
354 * converted, and sent back to the sender unchanged.
355 */
356 peer_req->block_id = id;
357
358 return peer_req;
359
360 fail:
361 mempool_free(peer_req, drbd_ee_mempool);
362 return NULL;
363}
364
365void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
366 int is_net)
367{
368 if (peer_req->flags & EE_HAS_DIGEST)
369 kfree(peer_req->digest);
370 drbd_pp_free(mdev, peer_req->pages, is_net);
371 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
372 D_ASSERT(drbd_interval_empty(&peer_req->i));
373 mempool_free(peer_req, drbd_ee_mempool);
374}
375
376int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
377{
378 LIST_HEAD(work_list);
379 struct drbd_peer_request *peer_req, *t;
380 int count = 0;
381 int is_net = list == &mdev->net_ee;
382
383 spin_lock_irq(&mdev->tconn->req_lock);
384 list_splice_init(list, &work_list);
385 spin_unlock_irq(&mdev->tconn->req_lock);
386
387 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
388 drbd_free_some_ee(mdev, peer_req, is_net);
389 count++;
390 }
391 return count;
392}
393
394
395/* See also comments in _req_mod(,BARRIER_ACKED)
396 * and receive_Barrier.
397 *
398 * Move entries from net_ee to done_ee, if ready.
399 * Grab done_ee, call all callbacks, free the entries.
400 * The callbacks typically send out ACKs.
401 */
402static int drbd_process_done_ee(struct drbd_conf *mdev)
403{
404 LIST_HEAD(work_list);
405 LIST_HEAD(reclaimed);
406 struct drbd_peer_request *peer_req, *t;
407 int err = 0;
408
409 spin_lock_irq(&mdev->tconn->req_lock);
410 reclaim_net_ee(mdev, &reclaimed);
411 list_splice_init(&mdev->done_ee, &work_list);
412 spin_unlock_irq(&mdev->tconn->req_lock);
413
414 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
415 drbd_free_net_ee(mdev, peer_req);
416
417 /* possible callbacks here:
418 * e_end_block, and e_end_resync_block, e_send_discard_write.
419 * all ignore the last argument.
420 */
421 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
422 int err2;
423
424 /* list_del not necessary, next/prev members not touched */
425 err2 = peer_req->w.cb(&peer_req->w, !!err);
426 if (!err)
427 err = err2;
428 drbd_free_ee(mdev, peer_req);
429 }
430 wake_up(&mdev->ee_wait);
431
432 return err;
433}
434
435void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
436{
437 DEFINE_WAIT(wait);
438
439 /* avoids spin_lock/unlock
440 * and calling prepare_to_wait in the fast path */
441 while (!list_empty(head)) {
442 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
443 spin_unlock_irq(&mdev->tconn->req_lock);
444 io_schedule();
445 finish_wait(&mdev->ee_wait, &wait);
446 spin_lock_irq(&mdev->tconn->req_lock);
447 }
448}
449
450void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
451{
452 spin_lock_irq(&mdev->tconn->req_lock);
453 _drbd_wait_ee_list_empty(mdev, head);
454 spin_unlock_irq(&mdev->tconn->req_lock);
455}
456
457/* see also kernel_accept; which is only present since 2.6.18.
458 * also we want to log which part of it failed, exactly */
459static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
460{
461 struct sock *sk = sock->sk;
462 int err = 0;
463
464 *what = "listen";
465 err = sock->ops->listen(sock, 5);
466 if (err < 0)
467 goto out;
468
469 *what = "sock_create_lite";
470 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
471 newsock);
472 if (err < 0)
473 goto out;
474
475 *what = "accept";
476 err = sock->ops->accept(sock, *newsock, 0);
477 if (err < 0) {
478 sock_release(*newsock);
479 *newsock = NULL;
480 goto out;
481 }
482 (*newsock)->ops = sock->ops;
483
484out:
485 return err;
486}
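
/*
 * Note: on kernels that provide kernel_accept(), the listen + accept part of
 * drbd_accept() above is roughly equivalent to (sketch only):
 *
 *	err = sock->ops->listen(sock, 5);
 *	if (!err)
 *		err = kernel_accept(sock, newsock, 0);
 *
 * We keep the open-coded variant so that *what reports which step failed.
 */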
487
488static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
489{
490 mm_segment_t oldfs;
491 struct kvec iov = {
492 .iov_base = buf,
493 .iov_len = size,
494 };
495 struct msghdr msg = {
496 .msg_iovlen = 1,
497 .msg_iov = (struct iovec *)&iov,
498 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
499 };
500 int rv;
501
502 oldfs = get_fs();
503 set_fs(KERNEL_DS);
504 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
505 set_fs(oldfs);
506
507 return rv;
508}
509
510static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
511{
512 mm_segment_t oldfs;
513 struct kvec iov = {
514 .iov_base = buf,
515 .iov_len = size,
516 };
517 struct msghdr msg = {
518 .msg_iovlen = 1,
519 .msg_iov = (struct iovec *)&iov,
520 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
521 };
522 int rv;
523
524 oldfs = get_fs();
525 set_fs(KERNEL_DS);
526
527 for (;;) {
528 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
529 if (rv == size)
530 break;
531
532 /* Note:
533 * ECONNRESET other side closed the connection
534 * ERESTARTSYS (on sock) we got a signal
535 */
536
537 if (rv < 0) {
538 if (rv == -ECONNRESET)
539 conn_info(tconn, "sock was reset by peer\n");
540 else if (rv != -ERESTARTSYS)
541 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
542 break;
543 } else if (rv == 0) {
544 conn_info(tconn, "sock was shut down by peer\n");
545 break;
546 } else {
547 /* signal came in, or peer/link went down,
548 * after we read a partial message
549 */
550 /* D_ASSERT(signal_pending(current)); */
551 break;
552 }
553 };
554
555 set_fs(oldfs);
556
557 if (rv != size)
558 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
559
560 return rv;
561}
562
563static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
564{
565 int err;
566
567 err = drbd_recv(tconn, buf, size);
568 if (err != size) {
569 if (err >= 0)
570 err = -EIO;
571 } else
572 err = 0;
573 return err;
574}
575
576static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
577{
578 int err;
579
580 err = drbd_recv_all(tconn, buf, size);
581 if (err && !signal_pending(current))
582 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
583 return err;
584}
585
586/* quoting tcp(7):
587 * On individual connections, the socket buffer size must be set prior to the
588 * listen(2) or connect(2) calls in order to have it take effect.
589 * This is our wrapper to do so.
590 */
591static void drbd_setbufsize(struct socket *sock, unsigned int snd,
592 unsigned int rcv)
593{
594 /* open coded SO_SNDBUF, SO_RCVBUF */
595 if (snd) {
596 sock->sk->sk_sndbuf = snd;
597 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
598 }
599 if (rcv) {
600 sock->sk->sk_rcvbuf = rcv;
601 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
602 }
603}
604
605static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
606{
607 const char *what;
608 struct socket *sock;
609 struct sockaddr_in6 src_in6;
610 int err;
611 int disconnect_on_error = 1;
612
eac3e990 613 if (!get_net_conf(tconn))
b411b363
PR
614 return NULL;
615
616 what = "sock_create_kern";
eac3e990 617 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
b411b363
PR
618 SOCK_STREAM, IPPROTO_TCP, &sock);
619 if (err < 0) {
620 sock = NULL;
621 goto out;
622 }
623
624 sock->sk->sk_rcvtimeo =
eac3e990
PR
625 sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
626 drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
627 tconn->net_conf->rcvbuf_size);
b411b363
PR
628
629 /* explicitly bind to the configured IP as source IP
630 * for the outgoing connections.
631 * This is needed for multihomed hosts and to be
632 * able to use lo: interfaces for drbd.
633 * Make sure to use 0 as port number, so linux selects
634 * a free one dynamically.
635 */
eac3e990
PR
636 memcpy(&src_in6, tconn->net_conf->my_addr,
637 min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
638 if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
b411b363
PR
639 src_in6.sin6_port = 0;
640 else
641 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
642
643 what = "bind before connect";
644 err = sock->ops->bind(sock,
645 (struct sockaddr *) &src_in6,
eac3e990 646 tconn->net_conf->my_addr_len);
b411b363
PR
647 if (err < 0)
648 goto out;
649
650 /* connect may fail, peer not yet available.
651 * stay C_WF_CONNECTION, don't go Disconnecting! */
652 disconnect_on_error = 0;
653 what = "connect";
654 err = sock->ops->connect(sock,
eac3e990
PR
655 (struct sockaddr *)tconn->net_conf->peer_addr,
656 tconn->net_conf->peer_addr_len, 0);
b411b363
PR
657
658out:
659 if (err < 0) {
660 if (sock) {
661 sock_release(sock);
662 sock = NULL;
663 }
664 switch (-err) {
665 /* timeout, busy, signal pending */
666 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
667 case EINTR: case ERESTARTSYS:
668 /* peer not (yet) available, network problem */
669 case ECONNREFUSED: case ENETUNREACH:
670 case EHOSTDOWN: case EHOSTUNREACH:
671 disconnect_on_error = 0;
672 break;
673 default:
eac3e990 674 conn_err(tconn, "%s failed, err = %d\n", what, err);
b411b363
PR
675 }
676 if (disconnect_on_error)
bbeb641c 677 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 678 }
eac3e990 679 put_net_conf(tconn);
b411b363
PR
680 return sock;
681}
682
7653620d 683static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
b411b363
PR
684{
685 int timeo, err;
686 struct socket *s_estab = NULL, *s_listen;
687 const char *what;
688
7653620d 689 if (!get_net_conf(tconn))
b411b363
PR
690 return NULL;
691
692 what = "sock_create_kern";
7653620d 693 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
b411b363
PR
694 SOCK_STREAM, IPPROTO_TCP, &s_listen);
695 if (err) {
696 s_listen = NULL;
697 goto out;
698 }
699
7653620d 700 timeo = tconn->net_conf->try_connect_int * HZ;
b411b363
PR
701 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
702
703 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
704 s_listen->sk->sk_rcvtimeo = timeo;
705 s_listen->sk->sk_sndtimeo = timeo;
7653620d
PR
706 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
707 tconn->net_conf->rcvbuf_size);
b411b363
PR
708
709 what = "bind before listen";
710 err = s_listen->ops->bind(s_listen,
7653620d
PR
711 (struct sockaddr *) tconn->net_conf->my_addr,
712 tconn->net_conf->my_addr_len);
b411b363
PR
713 if (err < 0)
714 goto out;
715
7653620d 716 err = drbd_accept(&what, s_listen, &s_estab);
b411b363
PR
717
718out:
719 if (s_listen)
720 sock_release(s_listen);
721 if (err < 0) {
722 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
7653620d 723 conn_err(tconn, "%s failed, err = %d\n", what, err);
bbeb641c 724 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
725 }
726 }
7653620d 727 put_net_conf(tconn);
b411b363
PR
728
729 return s_estab;
730}
731
d38e787e 732static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
b411b363 733{
5a87d920 734 struct p_header *h = tconn->data.sbuf;
b411b363 735
ecf2363c 736 return !_conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
b411b363
PR
737}
738
a25b63f1 739static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
b411b363 740{
e6ef8a5c 741 struct p_header80 *h = tconn->data.rbuf;
b411b363
PR
742 int rr;
743
dbd9eea0 744 rr = drbd_recv_short(sock, h, sizeof(*h), 0);
b411b363 745
ca9bc12b 746 if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
b411b363
PR
747 return be16_to_cpu(h->command);
748
749 return 0xffff;
750}
751
752/**
753 * drbd_socket_okay() - Free the socket if its connection is not okay
b411b363
PR
754 * @sock: pointer to the pointer to the socket.
755 */
dbd9eea0 756static int drbd_socket_okay(struct socket **sock)
b411b363
PR
757{
758 int rr;
759 char tb[4];
760
761 if (!*sock)
81e84650 762 return false;
b411b363 763
dbd9eea0 764 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
b411b363
PR
765
766 if (rr > 0 || rr == -EAGAIN) {
81e84650 767 return true;
b411b363
PR
768 } else {
769 sock_release(*sock);
770 *sock = NULL;
81e84650 771 return false;
b411b363
PR
772 }
773}
2325eb66
PR
774/* Gets called if a connection is established, or if a new minor gets created
775 in a connection */
776int drbd_connected(int vnr, void *p, void *data)
907599e0
PR
777{
778 struct drbd_conf *mdev = (struct drbd_conf *)p;
0829f5ed 779 int err;
907599e0
PR
780
781 atomic_set(&mdev->packet_seq, 0);
782 mdev->peer_seq = 0;
783
8410da8f
PR
784 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
785 &mdev->tconn->cstate_mutex :
786 &mdev->own_state_mutex;
787
0829f5ed
AG
788 err = drbd_send_sync_param(mdev);
789 if (!err)
790 err = drbd_send_sizes(mdev, 0, 0);
791 if (!err)
792 err = drbd_send_uuids(mdev);
793 if (!err)
794 err = drbd_send_state(mdev);
907599e0
PR
795 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
796 clear_bit(RESIZE_PENDING, &mdev->flags);
0829f5ed 797 return err;
907599e0
PR
798}
799
b411b363
PR
800/*
801 * return values:
802 * 1 yes, we have a valid connection
803 * 0 oops, did not work out, please try again
804 * -1 peer talks different language,
805 * no point in trying again, please go standalone.
806 * -2 We do not have a network config...
807 */
907599e0 808static int drbd_connect(struct drbd_tconn *tconn)
b411b363
PR
809{
810 struct socket *s, *sock, *msock;
811 int try, h, ok;
812
bbeb641c 813 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
814 return -2;
815
907599e0 816 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
0916e0e3
AG
817
818 /* Assume that the peer only understands protocol 80 until we know better. */
819 tconn->agreed_pro_version = 80;
b411b363
PR
820
821 sock = NULL;
822 msock = NULL;
823
824 do {
825 for (try = 0;;) {
826 /* 3 tries, this should take less than a second! */
907599e0 827 s = drbd_try_connect(tconn);
b411b363
PR
828 if (s || ++try >= 3)
829 break;
830 /* give the other side time to call bind() & listen() */
20ee6390 831 schedule_timeout_interruptible(HZ / 10);
b411b363
PR
832 }
833
834 if (s) {
835 if (!sock) {
907599e0 836 drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
b411b363
PR
837 sock = s;
838 s = NULL;
839 } else if (!msock) {
907599e0 840 drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
b411b363
PR
841 msock = s;
842 s = NULL;
843 } else {
907599e0 844 conn_err(tconn, "Logic error in drbd_connect()\n");
b411b363
PR
845 goto out_release_sockets;
846 }
847 }
848
849 if (sock && msock) {
907599e0 850 schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
dbd9eea0
PR
851 ok = drbd_socket_okay(&sock);
852 ok = drbd_socket_okay(&msock) && ok;
b411b363
PR
853 if (ok)
854 break;
855 }
856
857retry:
907599e0 858 s = drbd_wait_for_connect(tconn);
b411b363 859 if (s) {
907599e0 860 try = drbd_recv_fp(tconn, s);
dbd9eea0
PR
861 drbd_socket_okay(&sock);
862 drbd_socket_okay(&msock);
b411b363
PR
863 switch (try) {
864 case P_HAND_SHAKE_S:
865 if (sock) {
907599e0 866 conn_warn(tconn, "initial packet S crossed\n");
b411b363
PR
867 sock_release(sock);
868 }
869 sock = s;
870 break;
871 case P_HAND_SHAKE_M:
872 if (msock) {
907599e0 873 conn_warn(tconn, "initial packet M crossed\n");
b411b363
PR
874 sock_release(msock);
875 }
876 msock = s;
907599e0 877 set_bit(DISCARD_CONCURRENT, &tconn->flags);
b411b363
PR
878 break;
879 default:
907599e0 880 conn_warn(tconn, "Error receiving initial packet\n");
b411b363
PR
881 sock_release(s);
882 if (random32() & 1)
883 goto retry;
884 }
885 }
886
bbeb641c 887 if (tconn->cstate <= C_DISCONNECTING)
b411b363
PR
888 goto out_release_sockets;
889 if (signal_pending(current)) {
890 flush_signals(current);
891 smp_rmb();
907599e0 892 if (get_t_state(&tconn->receiver) == EXITING)
b411b363
PR
893 goto out_release_sockets;
894 }
895
896 if (sock && msock) {
dbd9eea0
PR
897 ok = drbd_socket_okay(&sock);
898 ok = drbd_socket_okay(&msock) && ok;
b411b363
PR
899 if (ok)
900 break;
901 }
902 } while (1);
903
904 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
905 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
906
907 sock->sk->sk_allocation = GFP_NOIO;
908 msock->sk->sk_allocation = GFP_NOIO;
909
910 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
911 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
912
b411b363 913 /* NOT YET ...
907599e0 914 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
b411b363
PR
915 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
916 * first set it to the P_HAND_SHAKE timeout,
917 * which we set to 4x the configured ping_timeout. */
918 sock->sk->sk_sndtimeo =
907599e0 919 sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
b411b363 920
907599e0
PR
921 msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
922 msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
b411b363
PR
923
924 /* we don't want delays.
25985edc 925 * we use TCP_CORK where appropriate, though */
b411b363
PR
926 drbd_tcp_nodelay(sock);
927 drbd_tcp_nodelay(msock);
928
907599e0
PR
929 tconn->data.socket = sock;
930 tconn->meta.socket = msock;
931 tconn->last_received = jiffies;
b411b363 932
907599e0 933 h = drbd_do_handshake(tconn);
b411b363
PR
934 if (h <= 0)
935 return h;
936
907599e0 937 if (tconn->cram_hmac_tfm) {
b411b363 938 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
907599e0 939 switch (drbd_do_auth(tconn)) {
b10d96cb 940 case -1:
907599e0 941 conn_err(tconn, "Authentication of peer failed\n");
b411b363 942 return -1;
b10d96cb 943 case 0:
907599e0 944 conn_err(tconn, "Authentication of peer failed, trying again.\n");
b10d96cb 945 return 0;
b411b363
PR
946 }
947 }
948
bbeb641c 949 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
950 return 0;
951
907599e0 952 sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
b411b363
PR
953 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
954
907599e0 955 drbd_thread_start(&tconn->asender);
b411b363 956
387eb308 957 if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
7e2455c1 958 return -1;
b411b363 959
907599e0 960 return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
b411b363
PR
961
962out_release_sockets:
963 if (sock)
964 sock_release(sock);
965 if (msock)
966 sock_release(msock);
967 return -1;
968}
969
970static int decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
971{
972 if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
973 pi->cmd = be16_to_cpu(h->h80.command);
974 pi->size = be16_to_cpu(h->h80.length);
975 pi->vnr = 0;
976 } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
977 pi->cmd = be16_to_cpu(h->h95.command);
978 pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
979 pi->vnr = 0;
980 } else {
981 conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
982 be32_to_cpu(h->h80.magic),
983 be16_to_cpu(h->h80.command),
984 be16_to_cpu(h->h80.length));
985 return -EINVAL;
986 }
987 return 0;
988}
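
/*
 * For reference, the two on-the-wire header layouts distinguished in
 * decode_header() are declared in drbd_int.h roughly as follows (sketch;
 * see that header for the authoritative definitions):
 *
 *	struct p_header80 {
 *		u32 magic;	- DRBD_MAGIC
 *		u16 command;
 *		u16 length;	- bytes of data after this header
 *		u8  payload[0];
 *	} __packed;
 *
 *	struct p_header95 {
 *		u16 magic;	- DRBD_MAGIC_BIG
 *		u16 command;
 *		u32 length;	- only the low 24 bits are used, see above
 *		u8  payload[0];
 *	} __packed;
 */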
989
9ba7aa00 990static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
257d0af6 991{
e6ef8a5c 992 struct p_header *h = tconn->data.rbuf;
69bc7bc3 993 int err;
257d0af6 994
a5c31904
AG
995 err = drbd_recv_all_warn(tconn, h, sizeof(*h));
996 if (err)
69bc7bc3 997 return err;
257d0af6 998
69bc7bc3 999 err = decode_header(tconn, h, pi);
9ba7aa00 1000 tconn->last_received = jiffies;
b411b363 1001
69bc7bc3 1002 return err;
b411b363
PR
1003}
1004
2451fc3b 1005static void drbd_flush(struct drbd_conf *mdev)
b411b363
PR
1006{
1007 int rv;
1008
1009 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a 1010 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
dd3932ed 1011 NULL);
b411b363
PR
1012 if (rv) {
1013 dev_err(DEV, "local disk flush failed with status %d\n", rv);
1014 /* would rather check on EOPNOTSUPP, but that is not reliable.
1015 * don't try again for ANY return value != 0
1016 * if (rv == -EOPNOTSUPP) */
1017 drbd_bump_write_ordering(mdev, WO_drain_io);
1018 }
1019 put_ldev(mdev);
1020 }
b411b363
PR
1021}
1022
1023/**
1024 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1025 * @mdev: DRBD device.
1026 * @epoch: Epoch object.
1027 * @ev: Epoch event.
1028 */
1029static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1030 struct drbd_epoch *epoch,
1031 enum epoch_event ev)
1032{
2451fc3b 1033 int epoch_size;
b411b363 1034 struct drbd_epoch *next_epoch;
b411b363
PR
1035 enum finish_epoch rv = FE_STILL_LIVE;
1036
1037 spin_lock(&mdev->epoch_lock);
1038 do {
1039 next_epoch = NULL;
b411b363
PR
1040
1041 epoch_size = atomic_read(&epoch->epoch_size);
1042
1043 switch (ev & ~EV_CLEANUP) {
1044 case EV_PUT:
1045 atomic_dec(&epoch->active);
1046 break;
1047 case EV_GOT_BARRIER_NR:
1048 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
b411b363
PR
1049 break;
1050 case EV_BECAME_LAST:
1051 /* nothing to do*/
1052 break;
1053 }
1054
b411b363
PR
1055 if (epoch_size != 0 &&
1056 atomic_read(&epoch->active) == 0 &&
2451fc3b 1057 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
b411b363
PR
1058 if (!(ev & EV_CLEANUP)) {
1059 spin_unlock(&mdev->epoch_lock);
1060 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1061 spin_lock(&mdev->epoch_lock);
1062 }
1063 dec_unacked(mdev);
1064
1065 if (mdev->current_epoch != epoch) {
1066 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1067 list_del(&epoch->list);
1068 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1069 mdev->epochs--;
b411b363
PR
1070 kfree(epoch);
1071
1072 if (rv == FE_STILL_LIVE)
1073 rv = FE_DESTROYED;
1074 } else {
1075 epoch->flags = 0;
1076 atomic_set(&epoch->epoch_size, 0);
698f9315 1077 /* atomic_set(&epoch->active, 0); is already zero */
b411b363
PR
1078 if (rv == FE_STILL_LIVE)
1079 rv = FE_RECYCLED;
2451fc3b 1080 wake_up(&mdev->ee_wait);
b411b363
PR
1081 }
1082 }
1083
1084 if (!next_epoch)
1085 break;
1086
1087 epoch = next_epoch;
1088 } while (1);
1089
1090 spin_unlock(&mdev->epoch_lock);
1091
b411b363
PR
1092 return rv;
1093}
1094
1095/**
1096 * drbd_bump_write_ordering() - Fall back to another write ordering method
1097 * @mdev: DRBD device.
1098 * @wo: Write ordering method to try.
1099 */
1100void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1101{
1102 enum write_ordering_e pwo;
1103 static char *write_ordering_str[] = {
1104 [WO_none] = "none",
1105 [WO_drain_io] = "drain",
1106 [WO_bdev_flush] = "flush",
b411b363
PR
1107 };
1108
1109 pwo = mdev->write_ordering;
1110 wo = min(pwo, wo);
b411b363
PR
1111 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1112 wo = WO_drain_io;
1113 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1114 wo = WO_none;
1115 mdev->write_ordering = wo;
2451fc3b 1116 if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
b411b363
PR
1117 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1118}
1119
45bb912b 1120/**
fbe29dec 1121 * drbd_submit_peer_request()
45bb912b 1122 * @mdev: DRBD device.
db830c46 1123 * @peer_req: peer request
45bb912b 1124 * @rw: flag field, see bio->bi_rw
10f6d992
LE
1125 *
1126 * May spread the pages to multiple bios,
1127 * depending on bio_add_page restrictions.
1128 *
1129 * Returns 0 if all bios have been submitted,
1130 * -ENOMEM if we could not allocate enough bios,
1131 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1132 * single page to an empty bio (which should never happen and likely indicates
1133 * that the lower level IO stack is in some way broken). This has been observed
1134 * on certain Xen deployments.
45bb912b
LE
1135 */
1136/* TODO allocate from our own bio_set. */
fbe29dec
AG
1137int drbd_submit_peer_request(struct drbd_conf *mdev,
1138 struct drbd_peer_request *peer_req,
1139 const unsigned rw, const int fault_type)
45bb912b
LE
1140{
1141 struct bio *bios = NULL;
1142 struct bio *bio;
db830c46
AG
1143 struct page *page = peer_req->pages;
1144 sector_t sector = peer_req->i.sector;
1145 unsigned ds = peer_req->i.size;
45bb912b
LE
1146 unsigned n_bios = 0;
1147 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
10f6d992 1148 int err = -ENOMEM;
45bb912b
LE
1149
1150 /* In most cases, we will only need one bio. But in case the lower
1151 * level restrictions happen to be different at this offset on this
1152 * side than those of the sending peer, we may need to submit the
da4a75d2
LE
1153 * request in more than one bio.
1154 *
1155 * Plain bio_alloc is good enough here, this is no DRBD internally
1156 * generated bio, but a bio allocated on behalf of the peer.
1157 */
45bb912b
LE
1158next_bio:
1159 bio = bio_alloc(GFP_NOIO, nr_pages);
1160 if (!bio) {
1161 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1162 goto fail;
1163 }
db830c46 1164 /* > peer_req->i.sector, unless this is the first bio */
45bb912b
LE
1165 bio->bi_sector = sector;
1166 bio->bi_bdev = mdev->ldev->backing_bdev;
45bb912b 1167 bio->bi_rw = rw;
db830c46 1168 bio->bi_private = peer_req;
fcefa62e 1169 bio->bi_end_io = drbd_peer_request_endio;
45bb912b
LE
1170
1171 bio->bi_next = bios;
1172 bios = bio;
1173 ++n_bios;
1174
1175 page_chain_for_each(page) {
1176 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1177 if (!bio_add_page(bio, page, len, 0)) {
10f6d992
LE
1178 /* A single page must always be possible!
1179 * But in case it fails anyways,
1180 * we deal with it, and complain (below). */
1181 if (bio->bi_vcnt == 0) {
1182 dev_err(DEV,
1183 "bio_add_page failed for len=%u, "
1184 "bi_vcnt=0 (bi_sector=%llu)\n",
1185 len, (unsigned long long)bio->bi_sector);
1186 err = -ENOSPC;
1187 goto fail;
1188 }
45bb912b
LE
1189 goto next_bio;
1190 }
1191 ds -= len;
1192 sector += len >> 9;
1193 --nr_pages;
1194 }
1195 D_ASSERT(page == NULL);
1196 D_ASSERT(ds == 0);
1197
db830c46 1198 atomic_set(&peer_req->pending_bios, n_bios);
45bb912b
LE
1199 do {
1200 bio = bios;
1201 bios = bios->bi_next;
1202 bio->bi_next = NULL;
1203
45bb912b 1204 drbd_generic_make_request(mdev, fault_type, bio);
45bb912b 1205 } while (bios);
45bb912b
LE
1206 return 0;
1207
1208fail:
1209 while (bios) {
1210 bio = bios;
1211 bios = bios->bi_next;
1212 bio_put(bio);
1213 }
10f6d992 1214 return err;
45bb912b
LE
1215}
1216
53840641 1217static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
db830c46 1218 struct drbd_peer_request *peer_req)
53840641 1219{
db830c46 1220 struct drbd_interval *i = &peer_req->i;
53840641
AG
1221
1222 drbd_remove_interval(&mdev->write_requests, i);
1223 drbd_clear_interval(i);
1224
6c852bec 1225 /* Wake up any processes waiting for this peer request to complete. */
53840641
AG
1226 if (i->waiting)
1227 wake_up(&mdev->misc_wait);
1228}
1229
4a76b161 1230static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 1231{
4a76b161 1232 struct drbd_conf *mdev;
2451fc3b 1233 int rv;
4a76b161 1234 struct p_barrier *p = tconn->data.rbuf;
b411b363
PR
1235 struct drbd_epoch *epoch;
1236
4a76b161
AG
1237 mdev = vnr_to_mdev(tconn, pi->vnr);
1238 if (!mdev)
1239 return -EIO;
1240
b411b363
PR
1241 inc_unacked(mdev);
1242
b411b363
PR
1243 mdev->current_epoch->barrier_nr = p->barrier;
1244 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1245
1246 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1247 * the activity log, which means it would not be resynced in case the
1248 * R_PRIMARY crashes now.
1249 * Therefore we must send the barrier_ack after the barrier request was
1250 * completed. */
1251 switch (mdev->write_ordering) {
b411b363
PR
1252 case WO_none:
1253 if (rv == FE_RECYCLED)
82bc0194 1254 return 0;
2451fc3b
PR
1255
1256 /* receiver context, in the writeout path of the other node.
1257 * avoid potential distributed deadlock */
1258 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1259 if (epoch)
1260 break;
1261 else
1262 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1263 /* Fall through */
b411b363
PR
1264
1265 case WO_bdev_flush:
1266 case WO_drain_io:
b411b363 1267 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
2451fc3b
PR
1268 drbd_flush(mdev);
1269
1270 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1271 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1272 if (epoch)
1273 break;
b411b363
PR
1274 }
1275
2451fc3b
PR
1276 epoch = mdev->current_epoch;
1277 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1278
1279 D_ASSERT(atomic_read(&epoch->active) == 0);
1280 D_ASSERT(epoch->flags == 0);
b411b363 1281
82bc0194 1282 return 0;
2451fc3b
PR
1283 default:
1284 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
82bc0194 1285 return -EIO;
b411b363
PR
1286 }
1287
1288 epoch->flags = 0;
1289 atomic_set(&epoch->epoch_size, 0);
1290 atomic_set(&epoch->active, 0);
1291
1292 spin_lock(&mdev->epoch_lock);
1293 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1294 list_add(&epoch->list, &mdev->current_epoch->list);
1295 mdev->current_epoch = epoch;
1296 mdev->epochs++;
b411b363
PR
1297 } else {
1298 /* The current_epoch got recycled while we allocated this one... */
1299 kfree(epoch);
1300 }
1301 spin_unlock(&mdev->epoch_lock);
1302
82bc0194 1303 return 0;
b411b363
PR
1304}
1305
1306/* used from receive_RSDataReply (recv_resync_read)
1307 * and from receive_Data */
f6ffca9f
AG
1308static struct drbd_peer_request *
1309read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1310 int data_size) __must_hold(local)
b411b363 1311{
6666032a 1312 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 1313 struct drbd_peer_request *peer_req;
b411b363 1314 struct page *page;
a5c31904 1315 int dgs, ds, err;
a0638456
PR
1316 void *dig_in = mdev->tconn->int_dig_in;
1317 void *dig_vv = mdev->tconn->int_dig_vv;
6b4388ac 1318 unsigned long *data;
b411b363 1319
a0638456
PR
1320 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1321 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
b411b363
PR
1322
1323 if (dgs) {
a5c31904
AG
1324 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1325 if (err)
b411b363 1326 return NULL;
b411b363
PR
1327 }
1328
1329 data_size -= dgs;
1330
841ce241
AG
1331 if (!expect(data_size != 0))
1332 return NULL;
1333 if (!expect(IS_ALIGNED(data_size, 512)))
1334 return NULL;
1335 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1336 return NULL;
b411b363 1337
6666032a
LE
1338 /* even though we trust our peer,
1339 * we sometimes have to double check. */
1340 if (sector + (data_size>>9) > capacity) {
fdda6544
LE
1341 dev_err(DEV, "request from peer beyond end of local disk: "
1342 "capacity: %llus < sector: %llus + size: %u\n",
6666032a
LE
1343 (unsigned long long)capacity,
1344 (unsigned long long)sector, data_size);
1345 return NULL;
1346 }
1347
b411b363
PR
1348 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1349 * "criss-cross" setup, that might cause write-out on some other DRBD,
1350 * which in turn might block on the other node at this very place. */
db830c46
AG
1351 peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1352 if (!peer_req)
b411b363 1353 return NULL;
45bb912b 1354
b411b363 1355 ds = data_size;
db830c46 1356 page = peer_req->pages;
45bb912b
LE
1357 page_chain_for_each(page) {
1358 unsigned len = min_t(int, ds, PAGE_SIZE);
6b4388ac 1359 data = kmap(page);
a5c31904 1360 err = drbd_recv_all_warn(mdev->tconn, data, len);
0cf9d27e 1361 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
6b4388ac
PR
1362 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1363 data[0] = data[0] ^ (unsigned long)-1;
1364 }
b411b363 1365 kunmap(page);
a5c31904 1366 if (err) {
db830c46 1367 drbd_free_ee(mdev, peer_req);
b411b363
PR
1368 return NULL;
1369 }
a5c31904 1370 ds -= len;
b411b363
PR
1371 }
1372
1373 if (dgs) {
db830c46 1374 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
b411b363 1375 if (memcmp(dig_in, dig_vv, dgs)) {
470be44a
LE
1376 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1377 (unsigned long long)sector, data_size);
db830c46 1378 drbd_free_ee(mdev, peer_req);
b411b363
PR
1379 return NULL;
1380 }
1381 }
1382 mdev->recv_cnt += data_size>>9;
db830c46 1383 return peer_req;
b411b363
PR
1384}
1385
1386/* drbd_drain_block() just takes a data block
1387 * out of the socket input buffer, and discards it.
1388 */
1389static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1390{
1391 struct page *page;
a5c31904 1392 int err = 0;
b411b363
PR
1393 void *data;
1394
c3470cde 1395 if (!data_size)
fc5be839 1396 return 0;
c3470cde 1397
45bb912b 1398 page = drbd_pp_alloc(mdev, 1, 1);
b411b363
PR
1399
1400 data = kmap(page);
1401 while (data_size) {
fc5be839
AG
1402 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1403
a5c31904
AG
1404 err = drbd_recv_all_warn(mdev->tconn, data, len);
1405 if (err)
b411b363 1406 break;
a5c31904 1407 data_size -= len;
b411b363
PR
1408 }
1409 kunmap(page);
435f0740 1410 drbd_pp_free(mdev, page, 0);
fc5be839 1411 return err;
b411b363
PR
1412}
1413
1414static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1415 sector_t sector, int data_size)
1416{
1417 struct bio_vec *bvec;
1418 struct bio *bio;
a5c31904 1419 int dgs, err, i, expect;
a0638456
PR
1420 void *dig_in = mdev->tconn->int_dig_in;
1421 void *dig_vv = mdev->tconn->int_dig_vv;
b411b363 1422
a0638456
PR
1423 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1424 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
b411b363
PR
1425
1426 if (dgs) {
a5c31904
AG
1427 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1428 if (err)
1429 return err;
b411b363
PR
1430 }
1431
1432 data_size -= dgs;
1433
1434 /* optimistically update recv_cnt. if receiving fails below,
1435 * we disconnect anyways, and counters will be reset. */
1436 mdev->recv_cnt += data_size>>9;
1437
1438 bio = req->master_bio;
1439 D_ASSERT(sector == bio->bi_sector);
1440
1441 bio_for_each_segment(bvec, bio, i) {
a5c31904 1442 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
b411b363 1443 expect = min_t(int, data_size, bvec->bv_len);
a5c31904 1444 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
b411b363 1445 kunmap(bvec->bv_page);
a5c31904
AG
1446 if (err)
1447 return err;
1448 data_size -= expect;
b411b363
PR
1449 }
1450
1451 if (dgs) {
a0638456 1452 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
b411b363
PR
1453 if (memcmp(dig_in, dig_vv, dgs)) {
1454 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
28284cef 1455 return -EINVAL;
b411b363
PR
1456 }
1457 }
1458
1459 D_ASSERT(data_size == 0);
28284cef 1460 return 0;
b411b363
PR
1461}
1462
1463/* e_end_resync_block() is called via
1464 * drbd_process_done_ee() by asender only */
99920dc5 1465static int e_end_resync_block(struct drbd_work *w, int unused)
b411b363 1466{
8050e6d0
AG
1467 struct drbd_peer_request *peer_req =
1468 container_of(w, struct drbd_peer_request, w);
00d56944 1469 struct drbd_conf *mdev = w->mdev;
db830c46 1470 sector_t sector = peer_req->i.sector;
99920dc5 1471 int err;
b411b363 1472
db830c46 1473 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1474
db830c46
AG
1475 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1476 drbd_set_in_sync(mdev, sector, peer_req->i.size);
99920dc5 1477 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
b411b363
PR
1478 } else {
1479 /* Record failure to sync */
db830c46 1480 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
b411b363 1481
99920dc5 1482 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1483 }
1484 dec_unacked(mdev);
1485
99920dc5 1486 return err;
b411b363
PR
1487}
1488
1489static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1490{
db830c46 1491 struct drbd_peer_request *peer_req;
b411b363 1492
db830c46
AG
1493 peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1494 if (!peer_req)
45bb912b 1495 goto fail;
b411b363
PR
1496
1497 dec_rs_pending(mdev);
1498
b411b363
PR
1499 inc_unacked(mdev);
1500 /* corresponding dec_unacked() in e_end_resync_block()
1501 * respective _drbd_clear_done_ee */
1502
db830c46 1503 peer_req->w.cb = e_end_resync_block;
45bb912b 1504
87eeee41 1505 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1506 list_add(&peer_req->w.list, &mdev->sync_ee);
87eeee41 1507 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1508
0f0601f4 1509 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
fbe29dec 1510 if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
e1c1b0fc 1511 return 0;
b411b363 1512
10f6d992
LE
1513 /* don't care for the reason here */
1514 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1515 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1516 list_del(&peer_req->w.list);
87eeee41 1517 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9 1518
db830c46 1519 drbd_free_ee(mdev, peer_req);
45bb912b
LE
1520fail:
1521 put_ldev(mdev);
e1c1b0fc 1522 return -EIO;
b411b363
PR
1523}
1524
668eebc6 1525static struct drbd_request *
bc9c5c41
AG
1526find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1527 sector_t sector, bool missing_ok, const char *func)
51624585 1528{
51624585
AG
1529 struct drbd_request *req;
1530
bc9c5c41
AG
1531 /* Request object according to our peer */
1532 req = (struct drbd_request *)(unsigned long)id;
5e472264 1533 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
668eebc6 1534 return req;
c3afd8f5
AG
1535 if (!missing_ok) {
1536 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1537 (unsigned long)id, (unsigned long long)sector);
1538 }
51624585
AG
1539 return NULL;
1540}
1541
4a76b161 1542static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 1543{
4a76b161 1544 struct drbd_conf *mdev;
b411b363
PR
1545 struct drbd_request *req;
1546 sector_t sector;
82bc0194 1547 int err;
4a76b161
AG
1548 struct p_data *p = tconn->data.rbuf;
1549
1550 mdev = vnr_to_mdev(tconn, pi->vnr);
1551 if (!mdev)
1552 return -EIO;
b411b363
PR
1553
1554 sector = be64_to_cpu(p->sector);
1555
87eeee41 1556 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 1557 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
87eeee41 1558 spin_unlock_irq(&mdev->tconn->req_lock);
c3afd8f5 1559 if (unlikely(!req))
82bc0194 1560 return -EIO;
b411b363 1561
24c4830c 1562 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
b411b363
PR
1563 * special casing it there for the various failure cases.
1564 * still no race with drbd_fail_pending_reads */
e2857216 1565 err = recv_dless_read(mdev, req, sector, pi->size);
82bc0194 1566 if (!err)
8554df1c 1567 req_mod(req, DATA_RECEIVED);
b411b363
PR
1568 /* else: nothing. handled from drbd_disconnect...
1569 * I don't think we may complete this just yet
1570 * in case we are "on-disconnect: freeze" */
1571
82bc0194 1572 return err;
b411b363
PR
1573}
1574
4a76b161 1575static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 1576{
4a76b161 1577 struct drbd_conf *mdev;
b411b363 1578 sector_t sector;
82bc0194 1579 int err;
4a76b161
AG
1580 struct p_data *p = tconn->data.rbuf;
1581
1582 mdev = vnr_to_mdev(tconn, pi->vnr);
1583 if (!mdev)
1584 return -EIO;
b411b363
PR
1585
1586 sector = be64_to_cpu(p->sector);
1587 D_ASSERT(p->block_id == ID_SYNCER);
1588
1589 if (get_ldev(mdev)) {
1590 /* data is submitted to disk within recv_resync_read.
1591 * corresponding put_ldev done below on error,
fcefa62e 1592 * or in drbd_peer_request_endio. */
e2857216 1593 err = recv_resync_read(mdev, sector, pi->size);
b411b363
PR
1594 } else {
1595 if (__ratelimit(&drbd_ratelimit_state))
1596 dev_err(DEV, "Can not write resync data to local disk.\n");
1597
e2857216 1598 err = drbd_drain_block(mdev, pi->size);
b411b363 1599
e2857216 1600 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
b411b363
PR
1601 }
1602
e2857216 1603 atomic_add(pi->size >> 9, &mdev->rs_sect_in);
778f271d 1604
82bc0194 1605 return err;
b411b363
PR
1606}
1607
99920dc5 1608static int w_restart_write(struct drbd_work *w, int cancel)
7be8da07
AG
1609{
1610 struct drbd_request *req = container_of(w, struct drbd_request, w);
1611 struct drbd_conf *mdev = w->mdev;
1612 struct bio *bio;
1613 unsigned long start_time;
1614 unsigned long flags;
1615
1616 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1617 if (!expect(req->rq_state & RQ_POSTPONED)) {
1618 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
99920dc5 1619 return -EIO;
7be8da07
AG
1620 }
1621 bio = req->master_bio;
1622 start_time = req->start_time;
1623 /* Postponed requests will not have their master_bio completed! */
1624 __req_mod(req, DISCARD_WRITE, NULL);
1625 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1626
1627 while (__drbd_make_request(mdev, bio, start_time))
1628 /* retry */ ;
99920dc5 1629 return 0;
7be8da07
AG
1630}
1631
1632static void restart_conflicting_writes(struct drbd_conf *mdev,
1633 sector_t sector, int size)
1634{
1635 struct drbd_interval *i;
1636 struct drbd_request *req;
1637
1638 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1639 if (!i->local)
1640 continue;
1641 req = container_of(i, struct drbd_request, i);
1642 if (req->rq_state & RQ_LOCAL_PENDING ||
1643 !(req->rq_state & RQ_POSTPONED))
1644 continue;
1645 if (expect(list_empty(&req->w.list))) {
1646 req->w.mdev = mdev;
1647 req->w.cb = w_restart_write;
1648 drbd_queue_work(&mdev->tconn->data.work, &req->w);
1649 }
1650 }
1651}
1652
b411b363
PR
1653/* e_end_block() is called via drbd_process_done_ee().
1654 * this means this function only runs in the asender thread
1655 */
99920dc5 1656static int e_end_block(struct drbd_work *w, int cancel)
b411b363 1657{
8050e6d0
AG
1658 struct drbd_peer_request *peer_req =
1659 container_of(w, struct drbd_peer_request, w);
00d56944 1660 struct drbd_conf *mdev = w->mdev;
db830c46 1661 sector_t sector = peer_req->i.sector;
99920dc5 1662 int err = 0, pcmd;
b411b363 1663
89e58e75 1664 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
db830c46 1665 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1666 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1667 mdev->state.conn <= C_PAUSED_SYNC_T &&
db830c46 1668 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 1669 P_RS_WRITE_ACK : P_WRITE_ACK;
99920dc5 1670 err = drbd_send_ack(mdev, pcmd, peer_req);
b411b363 1671 if (pcmd == P_RS_WRITE_ACK)
db830c46 1672 drbd_set_in_sync(mdev, sector, peer_req->i.size);
b411b363 1673 } else {
99920dc5 1674 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1675 /* we expect it to be marked out of sync anyways...
1676 * maybe assert this? */
1677 }
1678 dec_unacked(mdev);
1679 }
1680 /* we delete from the conflict detection hash _after_ we sent out the
1681 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
89e58e75 1682 if (mdev->tconn->net_conf->two_primaries) {
87eeee41 1683 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
1684 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1685 drbd_remove_epoch_entry_interval(mdev, peer_req);
7be8da07
AG
1686 if (peer_req->flags & EE_RESTART_REQUESTS)
1687 restart_conflicting_writes(mdev, sector, peer_req->i.size);
87eeee41 1688 spin_unlock_irq(&mdev->tconn->req_lock);
bb3bfe96 1689 } else
db830c46 1690 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1691
db830c46 1692 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
b411b363 1693
99920dc5 1694 return err;
b411b363
PR
1695}
1696
7be8da07 1697static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
b411b363 1698{
7be8da07 1699 struct drbd_conf *mdev = w->mdev;
8050e6d0
AG
1700 struct drbd_peer_request *peer_req =
1701 container_of(w, struct drbd_peer_request, w);
99920dc5 1702 int err;
b411b363 1703
99920dc5 1704 err = drbd_send_ack(mdev, ack, peer_req);
b411b363
PR
1705 dec_unacked(mdev);
1706
99920dc5 1707 return err;
b411b363
PR
1708}
1709
99920dc5 1710static int e_send_discard_write(struct drbd_work *w, int unused)
7be8da07
AG
1711{
1712 return e_send_ack(w, P_DISCARD_WRITE);
1713}
1714
99920dc5 1715static int e_send_retry_write(struct drbd_work *w, int unused)
7be8da07
AG
1716{
1717 struct drbd_tconn *tconn = w->mdev->tconn;
1718
1719 return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1720 P_RETRY_WRITE : P_DISCARD_WRITE);
1721}
1722
1723static bool seq_greater(u32 a, u32 b)
1724{
1725 /*
1726 * We assume 32-bit wrap-around here.
1727 * For 24-bit wrap-around, we would have to shift:
1728 * a <<= 8; b <<= 8;
1729 */
1730 return (s32)a - (s32)b > 0;
1731}
1732
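/*
 * Worked example of the wrap-around handling above: seq_greater(1, 0xffffffff)
 * is true, because (s32)1 - (s32)0xffffffff == 1 - (-1) == 2 > 0, i.e.
 * sequence number 1 is considered to come after 0xffffffff.
 */
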
1733static u32 seq_max(u32 a, u32 b)
1734{
1735 return seq_greater(a, b) ? a : b;
1736}
1737
7be8da07
AG
1738static bool need_peer_seq(struct drbd_conf *mdev)
1739{
1740 struct drbd_tconn *tconn = mdev->tconn;
1741
1742 /*
1743 * We only need to keep track of the last packet_seq number of our peer
1744 * if we are in dual-primary mode and we have the discard flag set; see
1745 * handle_write_conflicts().
1746 */
1747 return tconn->net_conf->two_primaries &&
1748 test_bit(DISCARD_CONCURRENT, &tconn->flags);
1749}
1750
43ae077d 1751static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
3e394da1 1752{
3c13b680 1753 unsigned int newest_peer_seq;
3e394da1 1754
7be8da07
AG
1755 if (need_peer_seq(mdev)) {
1756 spin_lock(&mdev->peer_seq_lock);
3c13b680
LE
1757 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1758 mdev->peer_seq = newest_peer_seq;
7be8da07 1759 spin_unlock(&mdev->peer_seq_lock);
3c13b680
LE
1760 /* wake up only if we actually changed mdev->peer_seq */
1761 if (peer_seq == newest_peer_seq)
7be8da07
AG
1762 wake_up(&mdev->seq_wait);
1763 }
3e394da1
AG
1764}
1765
b411b363
PR
1766/* Called from receive_Data.
1767 * Synchronize packets on sock with packets on msock.
1768 *
1769 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1770 * packet traveling on msock, they are still processed in the order they have
1771 * been sent.
1772 *
1773 * Note: we don't care for Ack packets overtaking P_DATA packets.
1774 *
1775 * In case packet_seq is larger than mdev->peer_seq number, there are
1776 * outstanding packets on the msock. We wait for them to arrive.
1777 * In case we are the logically next packet, we update mdev->peer_seq
1778 * ourselves. Correctly handles 32bit wrap around.
1779 *
1780 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1781 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1782 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1783 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1784 *
1785 * returns 0 if we may process the packet,
1786 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
7be8da07 1787static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
b411b363
PR
1788{
1789 DEFINE_WAIT(wait);
b411b363 1790 long timeout;
7be8da07
AG
1791 int ret;
1792
1793 if (!need_peer_seq(mdev))
1794 return 0;
1795
b411b363
PR
1796 spin_lock(&mdev->peer_seq_lock);
1797 for (;;) {
7be8da07
AG
1798 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1799 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1800 ret = 0;
b411b363 1801 break;
7be8da07 1802 }
b411b363
PR
1803 if (signal_pending(current)) {
1804 ret = -ERESTARTSYS;
1805 break;
1806 }
7be8da07 1807 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
b411b363 1808 spin_unlock(&mdev->peer_seq_lock);
71b1c1eb
AG
1809 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1810 timeout = schedule_timeout(timeout);
b411b363 1811 spin_lock(&mdev->peer_seq_lock);
7be8da07 1812 if (!timeout) {
b411b363 1813 ret = -ETIMEDOUT;
71b1c1eb 1814 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
b411b363
PR
1815 break;
1816 }
1817 }
b411b363 1818 spin_unlock(&mdev->peer_seq_lock);
7be8da07 1819 finish_wait(&mdev->seq_wait, &wait);
b411b363
PR
1820 return ret;
1821}
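/*
 * Illustrative example (not part of the driver source), assuming two-primaries
 * sequence tracking is active: if mdev->peer_seq is currently 5, a P_DATA
 * packet with seq_num 6 is the logically next one and is processed right away
 * (mdev->peer_seq is bumped to 6), while a packet with seq_num 7 makes us
 * sleep until the outstanding packet carrying 6 has been processed on the
 * msock and bumped mdev->peer_seq, or until the ping timeout expires.
 */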
1822
688593c5
LE
1823/* see also bio_flags_to_wire()
1824 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1825 * flags and back. We may replicate to other kernel versions. */
1826static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1827{
688593c5
LE
1828 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1829 (dpf & DP_FUA ? REQ_FUA : 0) |
1830 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1831 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1832}
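/*
 * Illustrative example (not part of the driver source): a P_DATA packet that
 * carries dp_flags == (DP_RW_SYNC | DP_FUA) is re-submitted locally with
 * (REQ_SYNC | REQ_FUA) set in the bio's rw flags, so the peer's write
 * semantics (sync, forced unit access) are preserved on this node.
 */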
1833
7be8da07
AG
1834static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1835 unsigned int size)
1836{
1837 struct drbd_interval *i;
1838
1839 repeat:
1840 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1841 struct drbd_request *req;
1842 struct bio_and_error m;
1843
1844 if (!i->local)
1845 continue;
1846 req = container_of(i, struct drbd_request, i);
1847 if (!(req->rq_state & RQ_POSTPONED))
1848 continue;
1849 req->rq_state &= ~RQ_POSTPONED;
1850 __req_mod(req, NEG_ACKED, &m);
1851 spin_unlock_irq(&mdev->tconn->req_lock);
1852 if (m.bio)
1853 complete_master_bio(mdev, &m);
1854 spin_lock_irq(&mdev->tconn->req_lock);
1855 goto repeat;
1856 }
1857}
1858
1859static int handle_write_conflicts(struct drbd_conf *mdev,
1860 struct drbd_peer_request *peer_req)
1861{
1862 struct drbd_tconn *tconn = mdev->tconn;
1863 bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1864 sector_t sector = peer_req->i.sector;
1865 const unsigned int size = peer_req->i.size;
1866 struct drbd_interval *i;
1867 bool equal;
1868 int err;
1869
1870 /*
1871 * Inserting the peer request into the write_requests tree will prevent
1872 * new conflicting local requests from being added.
1873 */
1874 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1875
1876 repeat:
1877 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1878 if (i == &peer_req->i)
1879 continue;
1880
1881 if (!i->local) {
1882 /*
1883 * Our peer has sent a conflicting remote request; this
1884 * should not happen in a two-node setup. Wait for the
1885 * earlier peer request to complete.
1886 */
1887 err = drbd_wait_misc(mdev, i);
1888 if (err)
1889 goto out;
1890 goto repeat;
1891 }
1892
1893 equal = i->sector == sector && i->size == size;
1894 if (resolve_conflicts) {
1895 /*
1896 * If the peer request is fully contained within the
1897 * overlapping request, it can be discarded; otherwise,
1898 * it will be retried once all overlapping requests
1899 * have completed.
1900 */
1901 bool discard = i->sector <= sector && i->sector +
1902 (i->size >> 9) >= sector + (size >> 9);
1903
1904 if (!equal)
1905 dev_alert(DEV, "Concurrent writes detected: "
1906 "local=%llus +%u, remote=%llus +%u, "
1907 "assuming %s came first\n",
1908 (unsigned long long)i->sector, i->size,
1909 (unsigned long long)sector, size,
1910 discard ? "local" : "remote");
1911
1912 inc_unacked(mdev);
1913 peer_req->w.cb = discard ? e_send_discard_write :
1914 e_send_retry_write;
1915 list_add_tail(&peer_req->w.list, &mdev->done_ee);
1916 wake_asender(mdev->tconn);
1917
1918 err = -ENOENT;
1919 goto out;
1920 } else {
1921 struct drbd_request *req =
1922 container_of(i, struct drbd_request, i);
1923
1924 if (!equal)
1925 dev_alert(DEV, "Concurrent writes detected: "
1926 "local=%llus +%u, remote=%llus +%u\n",
1927 (unsigned long long)i->sector, i->size,
1928 (unsigned long long)sector, size);
1929
1930 if (req->rq_state & RQ_LOCAL_PENDING ||
1931 !(req->rq_state & RQ_POSTPONED)) {
1932 /*
1933 * Wait for the node with the discard flag to
1934 * decide if this request will be discarded or
1935 * retried. Requests that are discarded will
1936 * disappear from the write_requests tree.
1937 *
1938 * In addition, wait for the conflicting
1939 * request to finish locally before submitting
1940 * the conflicting peer request.
1941 */
1942 err = drbd_wait_misc(mdev, &req->i);
1943 if (err) {
1944 _conn_request_state(mdev->tconn,
1945 NS(conn, C_TIMEOUT),
1946 CS_HARD);
1947 fail_postponed_requests(mdev, sector, size);
1948 goto out;
1949 }
1950 goto repeat;
1951 }
1952 /*
1953 * Remember to restart the conflicting requests after
1954 * the new peer request has completed.
1955 */
1956 peer_req->flags |= EE_RESTART_REQUESTS;
1957 }
1958 }
1959 err = 0;
1960
1961 out:
1962 if (err)
1963 drbd_remove_epoch_entry_interval(mdev, peer_req);
1964 return err;
1965}
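/*
 * Illustrative example (not part of the driver source): when this node has
 * the resolve-conflicts role (DISCARD_CONCURRENT set), a local write covering
 * sectors 0..15 fully contains a conflicting peer write covering sectors
 * 4..11 (0 <= 4 and 0 + 16 >= 4 + 8), so the peer request is answered with
 * e_send_discard_write; a peer write covering sectors 12..19 only partially
 * overlaps (0 + 16 < 12 + 8) and gets e_send_retry_write instead.
 */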
1966
b411b363 1967/* mirrored write */
4a76b161 1968static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 1969{
4a76b161 1970 struct drbd_conf *mdev;
b411b363 1971 sector_t sector;
db830c46 1972 struct drbd_peer_request *peer_req;
4a76b161 1973 struct p_data *p = tconn->data.rbuf;
7be8da07 1974 u32 peer_seq = be32_to_cpu(p->seq_num);
b411b363
PR
1975 int rw = WRITE;
1976 u32 dp_flags;
7be8da07 1977 int err;
b411b363 1978
4a76b161
AG
1979 mdev = vnr_to_mdev(tconn, pi->vnr);
1980 if (!mdev)
1981 return -EIO;
1982
7be8da07 1983 if (!get_ldev(mdev)) {
82bc0194
AG
1984 int err2;
1985
7be8da07 1986 err = wait_for_and_update_peer_seq(mdev, peer_seq);
e2857216 1987 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
b411b363 1988 atomic_inc(&mdev->current_epoch->epoch_size);
e2857216 1989 err2 = drbd_drain_block(mdev, pi->size);
82bc0194
AG
1990 if (!err)
1991 err = err2;
1992 return err;
b411b363
PR
1993 }
1994
fcefa62e
AG
1995 /*
1996 * Corresponding put_ldev done either below (on various errors), or in
1997 * drbd_peer_request_endio, if we successfully submit the data at the
1998 * end of this function.
1999 */
b411b363
PR
2000
2001 sector = be64_to_cpu(p->sector);
e2857216 2002 peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
db830c46 2003 if (!peer_req) {
b411b363 2004 put_ldev(mdev);
82bc0194 2005 return -EIO;
b411b363
PR
2006 }
2007
db830c46 2008 peer_req->w.cb = e_end_block;
b411b363 2009
688593c5
LE
2010 dp_flags = be32_to_cpu(p->dp_flags);
2011 rw |= wire_flags_to_bio(mdev, dp_flags);
2012
2013 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 2014 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 2015
b411b363 2016 spin_lock(&mdev->epoch_lock);
db830c46
AG
2017 peer_req->epoch = mdev->current_epoch;
2018 atomic_inc(&peer_req->epoch->epoch_size);
2019 atomic_inc(&peer_req->epoch->active);
b411b363
PR
2020 spin_unlock(&mdev->epoch_lock);
2021
7be8da07
AG
2022 if (mdev->tconn->net_conf->two_primaries) {
2023 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2024 if (err)
b411b363 2025 goto out_interrupted;
87eeee41 2026 spin_lock_irq(&mdev->tconn->req_lock);
7be8da07
AG
2027 err = handle_write_conflicts(mdev, peer_req);
2028 if (err) {
2029 spin_unlock_irq(&mdev->tconn->req_lock);
2030 if (err == -ENOENT) {
b411b363 2031 put_ldev(mdev);
82bc0194 2032 return 0;
b411b363 2033 }
7be8da07 2034 goto out_interrupted;
b411b363 2035 }
7be8da07
AG
2036 } else
2037 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2038 list_add(&peer_req->w.list, &mdev->active_ee);
87eeee41 2039 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2040
89e58e75 2041 switch (mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
2042 case DRBD_PROT_C:
2043 inc_unacked(mdev);
2044 /* corresponding dec_unacked() in e_end_block()
2045 * respective _drbd_clear_done_ee */
2046 break;
2047 case DRBD_PROT_B:
2048 /* I really don't like it that the receiver thread
2049 * sends on the msock, but anyways */
db830c46 2050 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
b411b363
PR
2051 break;
2052 case DRBD_PROT_A:
2053 /* nothing to do */
2054 break;
2055 }
2056
6719fb03 2057 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 2058 /* In case we have the only disk of the cluster, */
db830c46
AG
2059 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2060 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2061 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2062 drbd_al_begin_io(mdev, peer_req->i.sector);
b411b363
PR
2063 }
2064
82bc0194
AG
2065 err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2066 if (!err)
2067 return 0;
b411b363 2068
10f6d992
LE
2069 /* don't care for the reason here */
2070 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2071 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
2072 list_del(&peer_req->w.list);
2073 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 2074 spin_unlock_irq(&mdev->tconn->req_lock);
db830c46
AG
2075 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2076 drbd_al_complete_io(mdev, peer_req->i.sector);
22cc37a9 2077
b411b363 2078out_interrupted:
db830c46 2079 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
b411b363 2080 put_ldev(mdev);
db830c46 2081 drbd_free_ee(mdev, peer_req);
82bc0194 2082 return err;
b411b363
PR
2083}
2084
0f0601f4
LE
2085/* We may throttle resync, if the lower device seems to be busy,
2086 * and current sync rate is above c_min_rate.
2087 *
2088 * To decide whether or not the lower device is busy, we use a scheme similar
2089 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2090 * (more than 64 sectors) of activity we cannot account for with our own resync
2091 * activity, it obviously is "busy".
2092 *
2093 * The current sync rate used here uses only the most recent two step marks,
2094 * to have a short time average so we can react faster.
2095 */
e3555d85 2096int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
2097{
2098 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2099 unsigned long db, dt, dbdt;
e3555d85 2100 struct lc_element *tmp;
0f0601f4
LE
2101 int curr_events;
2102 int throttle = 0;
2103
2104 /* feature disabled? */
f399002e 2105 if (mdev->ldev->dc.c_min_rate == 0)
0f0601f4
LE
2106 return 0;
2107
e3555d85
PR
2108 spin_lock_irq(&mdev->al_lock);
2109 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2110 if (tmp) {
2111 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2112 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2113 spin_unlock_irq(&mdev->al_lock);
2114 return 0;
2115 }
2116 /* Do not slow down if app IO is already waiting for this extent */
2117 }
2118 spin_unlock_irq(&mdev->al_lock);
2119
0f0601f4
LE
2120 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2121 (int)part_stat_read(&disk->part0, sectors[1]) -
2122 atomic_read(&mdev->rs_sect_ev);
e3555d85 2123
0f0601f4
LE
2124 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2125 unsigned long rs_left;
2126 int i;
2127
2128 mdev->rs_last_events = curr_events;
2129
2130 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2131 * approx. */
2649f080
LE
2132 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2133
2134 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2135 rs_left = mdev->ov_left;
2136 else
2137 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
2138
2139 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2140 if (!dt)
2141 dt++;
2142 db = mdev->rs_mark_left[i] - rs_left;
2143 dbdt = Bit2KB(db/dt);
2144
f399002e 2145 if (dbdt > mdev->ldev->dc.c_min_rate)
0f0601f4
LE
2146 throttle = 1;
2147 }
2148 return throttle;
2149}
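/*
 * Worked example with made-up numbers (not from the driver): suppose the sync
 * mark referenced above is 3 seconds old (dt == 3) and rs_mark_left has
 * dropped by 30000 bitmap bits since then (db == 30000). Then
 * dbdt == Bit2KB(30000 / 3) == Bit2KB(10000), which, assuming the usual
 * 4 KiB of resync data per bitmap bit, is about 40000 KiB/s; we only throttle
 * if that rate exceeds the configured c_min_rate.
 */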
2150
2151
4a76b161 2152static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 2153{
4a76b161 2154 struct drbd_conf *mdev;
b411b363 2155 sector_t sector;
4a76b161 2156 sector_t capacity;
db830c46 2157 struct drbd_peer_request *peer_req;
b411b363 2158 struct digest_info *di = NULL;
b18b37be 2159 int size, verb;
b411b363 2160 unsigned int fault_type;
4a76b161
AG
2161 struct p_block_req *p = tconn->data.rbuf;
2162
2163 mdev = vnr_to_mdev(tconn, pi->vnr);
2164 if (!mdev)
2165 return -EIO;
2166 capacity = drbd_get_capacity(mdev->this_bdev);
b411b363
PR
2167
2168 sector = be64_to_cpu(p->sector);
2169 size = be32_to_cpu(p->blksize);
2170
c670a398 2171 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
2172 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2173 (unsigned long long)sector, size);
82bc0194 2174 return -EINVAL;
b411b363
PR
2175 }
2176 if (sector + (size>>9) > capacity) {
2177 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2178 (unsigned long long)sector, size);
82bc0194 2179 return -EINVAL;
b411b363
PR
2180 }
2181
2182 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be 2183 verb = 1;
e2857216 2184 switch (pi->cmd) {
b18b37be
PR
2185 case P_DATA_REQUEST:
2186 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2187 break;
2188 case P_RS_DATA_REQUEST:
2189 case P_CSUM_RS_REQUEST:
2190 case P_OV_REQUEST:
2191 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2192 break;
2193 case P_OV_REPLY:
2194 verb = 0;
2195 dec_rs_pending(mdev);
2196 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2197 break;
2198 default:
49ba9b1b 2199 BUG();
b18b37be
PR
2200 }
2201 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2202 dev_err(DEV, "Can not satisfy peer's read request, "
2203 "no local data.\n");
b18b37be 2204
a821cc4a 2205 /* drain possible payload */
e2857216 2206 return drbd_drain_block(mdev, pi->size);
b411b363
PR
2207 }
2208
2209 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2210 * "criss-cross" setup, that might cause write-out on some other DRBD,
2211 * which in turn might block on the other node at this very place. */
db830c46
AG
2212 peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2213 if (!peer_req) {
b411b363 2214 put_ldev(mdev);
82bc0194 2215 return -ENOMEM;
b411b363
PR
2216 }
2217
e2857216 2218 switch (pi->cmd) {
b411b363 2219 case P_DATA_REQUEST:
db830c46 2220 peer_req->w.cb = w_e_end_data_req;
b411b363 2221 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2222 /* application IO, don't drbd_rs_begin_io */
2223 goto submit;
2224
b411b363 2225 case P_RS_DATA_REQUEST:
db830c46 2226 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2227 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2228 /* used in the sector offset progress display */
2229 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2230 break;
2231
2232 case P_OV_REPLY:
2233 case P_CSUM_RS_REQUEST:
2234 fault_type = DRBD_FAULT_RS_RD;
e2857216 2235 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
b411b363
PR
2236 if (!di)
2237 goto out_free_e;
2238
e2857216 2239 di->digest_size = pi->size;
b411b363
PR
2240 di->digest = (((char *)di)+sizeof(struct digest_info));
2241
db830c46
AG
2242 peer_req->digest = di;
2243 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2244
e2857216 2245 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
b411b363
PR
2246 goto out_free_e;
2247
e2857216 2248 if (pi->cmd == P_CSUM_RS_REQUEST) {
31890f4a 2249 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
db830c46 2250 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2251 /* used in the sector offset progress display */
2252 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
e2857216 2253 } else if (pi->cmd == P_OV_REPLY) {
2649f080
LE
2254 /* track progress, we may need to throttle */
2255 atomic_add(size >> 9, &mdev->rs_sect_in);
db830c46 2256 peer_req->w.cb = w_e_end_ov_reply;
b411b363 2257 dec_rs_pending(mdev);
0f0601f4
LE
2258 /* drbd_rs_begin_io done when we sent this request,
2259 * but accounting still needs to be done. */
2260 goto submit_for_resync;
b411b363
PR
2261 }
2262 break;
2263
2264 case P_OV_REQUEST:
b411b363 2265 if (mdev->ov_start_sector == ~(sector_t)0 &&
31890f4a 2266 mdev->tconn->agreed_pro_version >= 90) {
de228bba
LE
2267 unsigned long now = jiffies;
2268 int i;
b411b363
PR
2269 mdev->ov_start_sector = sector;
2270 mdev->ov_position = sector;
30b743a2
LE
2271 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2272 mdev->rs_total = mdev->ov_left;
de228bba
LE
2273 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2274 mdev->rs_mark_left[i] = mdev->ov_left;
2275 mdev->rs_mark_time[i] = now;
2276 }
b411b363
PR
2277 dev_info(DEV, "Online Verify start sector: %llu\n",
2278 (unsigned long long)sector);
2279 }
db830c46 2280 peer_req->w.cb = w_e_end_ov_req;
b411b363 2281 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2282 break;
2283
b411b363 2284 default:
49ba9b1b 2285 BUG();
b411b363
PR
2286 }
2287
0f0601f4
LE
2288 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2289 * wrt the receiver, but it is not as straightforward as it may seem.
2290 * Various places in the resync start and stop logic assume resync
2291 * requests are processed in order, requeuing this on the worker thread
2292 * introduces a bunch of new code for synchronization between threads.
2293 *
2294 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2295 * "forever", throttling after drbd_rs_begin_io will lock that extent
2296 * for application writes for the same time. For now, just throttle
2297 * here, where the rest of the code expects the receiver to sleep for
2298 * a while, anyways.
2299 */
2300
2301 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2302 * this defers syncer requests for some time, before letting at least
2303 * one request through. The resync controller on the receiving side
2304 * will adapt to the incoming rate accordingly.
2305 *
2306 * We cannot throttle here if remote is Primary/SyncTarget:
2307 * we would also throttle its application reads.
2308 * In that case, throttling is done on the SyncTarget only.
2309 */
e3555d85
PR
2310 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2311 schedule_timeout_uninterruptible(HZ/10);
2312 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2313 goto out_free_e;
b411b363 2314
0f0601f4
LE
2315submit_for_resync:
2316 atomic_add(size >> 9, &mdev->rs_sect_ev);
2317
80a40e43 2318submit:
b411b363 2319 inc_unacked(mdev);
87eeee41 2320 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2321 list_add_tail(&peer_req->w.list, &mdev->read_ee);
87eeee41 2322 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2323
fbe29dec 2324 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
82bc0194 2325 return 0;
b411b363 2326
10f6d992
LE
2327 /* don't care for the reason here */
2328 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2329 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2330 list_del(&peer_req->w.list);
87eeee41 2331 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9
LE
2332 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2333
b411b363 2334out_free_e:
b411b363 2335 put_ldev(mdev);
db830c46 2336 drbd_free_ee(mdev, peer_req);
82bc0194 2337 return -EIO;
b411b363
PR
2338}
2339
2340static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2341{
2342 int self, peer, rv = -100;
2343 unsigned long ch_self, ch_peer;
2344
2345 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2346 peer = mdev->p_uuid[UI_BITMAP] & 1;
2347
2348 ch_peer = mdev->p_uuid[UI_SIZE];
2349 ch_self = mdev->comm_bm_set;
2350
89e58e75 2351 switch (mdev->tconn->net_conf->after_sb_0p) {
b411b363
PR
2352 case ASB_CONSENSUS:
2353 case ASB_DISCARD_SECONDARY:
2354 case ASB_CALL_HELPER:
2355 dev_err(DEV, "Configuration error.\n");
2356 break;
2357 case ASB_DISCONNECT:
2358 break;
2359 case ASB_DISCARD_YOUNGER_PRI:
2360 if (self == 0 && peer == 1) {
2361 rv = -1;
2362 break;
2363 }
2364 if (self == 1 && peer == 0) {
2365 rv = 1;
2366 break;
2367 }
2368 /* Else fall through to one of the other strategies... */
2369 case ASB_DISCARD_OLDER_PRI:
2370 if (self == 0 && peer == 1) {
2371 rv = 1;
2372 break;
2373 }
2374 if (self == 1 && peer == 0) {
2375 rv = -1;
2376 break;
2377 }
2378 /* Else fall through to one of the other strategies... */
ad19bf6e 2379 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2380 "Using discard-least-changes instead\n");
2381 case ASB_DISCARD_ZERO_CHG:
2382 if (ch_peer == 0 && ch_self == 0) {
25703f83 2383 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2384 ? -1 : 1;
2385 break;
2386 } else {
2387 if (ch_peer == 0) { rv = 1; break; }
2388 if (ch_self == 0) { rv = -1; break; }
2389 }
89e58e75 2390 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2391 break;
2392 case ASB_DISCARD_LEAST_CHG:
2393 if (ch_self < ch_peer)
2394 rv = -1;
2395 else if (ch_self > ch_peer)
2396 rv = 1;
2397 else /* ( ch_self == ch_peer ) */
2398 /* Well, then use something else. */
25703f83 2399 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2400 ? -1 : 1;
2401 break;
2402 case ASB_DISCARD_LOCAL:
2403 rv = -1;
2404 break;
2405 case ASB_DISCARD_REMOTE:
2406 rv = 1;
2407 }
2408
2409 return rv;
2410}
2411
2412static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2413{
6184ea21 2414 int hg, rv = -100;
b411b363 2415
89e58e75 2416 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2417 case ASB_DISCARD_YOUNGER_PRI:
2418 case ASB_DISCARD_OLDER_PRI:
2419 case ASB_DISCARD_LEAST_CHG:
2420 case ASB_DISCARD_LOCAL:
2421 case ASB_DISCARD_REMOTE:
2422 dev_err(DEV, "Configuration error.\n");
2423 break;
2424 case ASB_DISCONNECT:
2425 break;
2426 case ASB_CONSENSUS:
2427 hg = drbd_asb_recover_0p(mdev);
2428 if (hg == -1 && mdev->state.role == R_SECONDARY)
2429 rv = hg;
2430 if (hg == 1 && mdev->state.role == R_PRIMARY)
2431 rv = hg;
2432 break;
2433 case ASB_VIOLENTLY:
2434 rv = drbd_asb_recover_0p(mdev);
2435 break;
2436 case ASB_DISCARD_SECONDARY:
2437 return mdev->state.role == R_PRIMARY ? 1 : -1;
2438 case ASB_CALL_HELPER:
2439 hg = drbd_asb_recover_0p(mdev);
2440 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2441 enum drbd_state_rv rv2;
2442
2443 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2444 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2445 * we might be here in C_WF_REPORT_PARAMS which is transient.
2446 * we do not need to wait for the after state change work either. */
bb437946
AG
2447 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2448 if (rv2 != SS_SUCCESS) {
b411b363
PR
2449 drbd_khelper(mdev, "pri-lost-after-sb");
2450 } else {
2451 dev_warn(DEV, "Successfully gave up primary role.\n");
2452 rv = hg;
2453 }
2454 } else
2455 rv = hg;
2456 }
2457
2458 return rv;
2459}
2460
2461static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2462{
6184ea21 2463 int hg, rv = -100;
b411b363 2464
89e58e75 2465 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2466 case ASB_DISCARD_YOUNGER_PRI:
2467 case ASB_DISCARD_OLDER_PRI:
2468 case ASB_DISCARD_LEAST_CHG:
2469 case ASB_DISCARD_LOCAL:
2470 case ASB_DISCARD_REMOTE:
2471 case ASB_CONSENSUS:
2472 case ASB_DISCARD_SECONDARY:
2473 dev_err(DEV, "Configuration error.\n");
2474 break;
2475 case ASB_VIOLENTLY:
2476 rv = drbd_asb_recover_0p(mdev);
2477 break;
2478 case ASB_DISCONNECT:
2479 break;
2480 case ASB_CALL_HELPER:
2481 hg = drbd_asb_recover_0p(mdev);
2482 if (hg == -1) {
bb437946
AG
2483 enum drbd_state_rv rv2;
2484
b411b363
PR
2485 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2486 * we might be here in C_WF_REPORT_PARAMS which is transient.
2487 * we do not need to wait for the after state change work either. */
bb437946
AG
2488 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2489 if (rv2 != SS_SUCCESS) {
b411b363
PR
2490 drbd_khelper(mdev, "pri-lost-after-sb");
2491 } else {
2492 dev_warn(DEV, "Successfully gave up primary role.\n");
2493 rv = hg;
2494 }
2495 } else
2496 rv = hg;
2497 }
2498
2499 return rv;
2500}
2501
2502static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2503 u64 bits, u64 flags)
2504{
2505 if (!uuid) {
2506 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2507 return;
2508 }
2509 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2510 text,
2511 (unsigned long long)uuid[UI_CURRENT],
2512 (unsigned long long)uuid[UI_BITMAP],
2513 (unsigned long long)uuid[UI_HISTORY_START],
2514 (unsigned long long)uuid[UI_HISTORY_END],
2515 (unsigned long long)bits,
2516 (unsigned long long)flags);
2517}
2518
2519/*
2520 100 after split brain try auto recover
2521 2 C_SYNC_SOURCE set BitMap
2522 1 C_SYNC_SOURCE use BitMap
2523 0 no Sync
2524 -1 C_SYNC_TARGET use BitMap
2525 -2 C_SYNC_TARGET set BitMap
2526 -100 after split brain, disconnect
2527 -1000 unrelated data
4a23f264
PR
2528 -1091 requires proto 91
2529 -1096 requires proto 96
b411b363
PR
2530 */
2531static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2532{
2533 u64 self, peer;
2534 int i, j;
2535
2536 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2537 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2538
2539 *rule_nr = 10;
2540 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2541 return 0;
2542
2543 *rule_nr = 20;
2544 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2545 peer != UUID_JUST_CREATED)
2546 return -2;
2547
2548 *rule_nr = 30;
2549 if (self != UUID_JUST_CREATED &&
2550 (peer == UUID_JUST_CREATED || peer == (u64)0))
2551 return 2;
2552
2553 if (self == peer) {
2554 int rct, dc; /* roles at crash time */
2555
2556 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2557
31890f4a 2558 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2559 return -1091;
b411b363
PR
2560
2561 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2562 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2563 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2564 drbd_uuid_set_bm(mdev, 0UL);
2565
2566 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2567 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2568 *rule_nr = 34;
2569 } else {
2570 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2571 *rule_nr = 36;
2572 }
2573
2574 return 1;
2575 }
2576
2577 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2578
31890f4a 2579 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2580 return -1091;
b411b363
PR
2581
2582 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2583 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2584 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2585
2586 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2587 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2588 mdev->p_uuid[UI_BITMAP] = 0UL;
2589
2590 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2591 *rule_nr = 35;
2592 } else {
2593 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2594 *rule_nr = 37;
2595 }
2596
2597 return -1;
2598 }
2599
2600 /* Common power [off|failure] */
2601 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2602 (mdev->p_uuid[UI_FLAGS] & 2);
2603 /* lowest bit is set when we were primary,
2604 * next bit (weight 2) is set when peer was primary */
2605 *rule_nr = 40;
2606
2607 switch (rct) {
2608 case 0: /* !self_pri && !peer_pri */ return 0;
2609 case 1: /* self_pri && !peer_pri */ return 1;
2610 case 2: /* !self_pri && peer_pri */ return -1;
2611 case 3: /* self_pri && peer_pri */
25703f83 2612 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363
PR
2613 return dc ? -1 : 1;
2614 }
2615 }
2616
2617 *rule_nr = 50;
2618 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2619 if (self == peer)
2620 return -1;
2621
2622 *rule_nr = 51;
2623 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2624 if (self == peer) {
31890f4a 2625 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2626 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2627 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2628 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2629 /* The last P_SYNC_UUID did not get through. Undo the modifications the
2630 peer made to its UUIDs when it last started a resync as sync source. */
2631
31890f4a 2632 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2633 return -1091;
b411b363
PR
2634
2635 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2636 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264
PR
2637
2638 dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2639 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2640
b411b363
PR
2641 return -1;
2642 }
2643 }
2644
2645 *rule_nr = 60;
2646 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2647 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2648 peer = mdev->p_uuid[i] & ~((u64)1);
2649 if (self == peer)
2650 return -2;
2651 }
2652
2653 *rule_nr = 70;
2654 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2655 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2656 if (self == peer)
2657 return 1;
2658
2659 *rule_nr = 71;
2660 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2661 if (self == peer) {
31890f4a 2662 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2663 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2664 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2665 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2666 /* The last P_SYNC_UUID did not get through. Undo the modifications we
2667 made to our own UUIDs when we last started a resync as sync source. */
2668
31890f4a 2669 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2670 return -1091;
b411b363
PR
2671
2672 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2673 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2674
4a23f264 2675 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2676 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2677 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2678
2679 return 1;
2680 }
2681 }
2682
2683
2684 *rule_nr = 80;
d8c2a36b 2685 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2686 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2687 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2688 if (self == peer)
2689 return 2;
2690 }
2691
2692 *rule_nr = 90;
2693 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2694 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2695 if (self == peer && self != ((u64)0))
2696 return 100;
2697
2698 *rule_nr = 100;
2699 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2700 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2701 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2702 peer = mdev->p_uuid[j] & ~((u64)1);
2703 if (self == peer)
2704 return -100;
2705 }
2706 }
2707
2708 return -1000;
2709}
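/*
 * Illustrative reading of the table above (not part of the driver source):
 * rule 20 returns -2 when our UUIDs are freshly created but the peer already
 * has data, i.e. we become sync target and set the whole bitmap (full sync);
 * rule 30 is the mirrored case and returns 2, making us the full-sync source.
 */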
2710
2711/* drbd_sync_handshake() returns the new conn state on success, or
2712 CONN_MASK (-1) on failure.
2713 */
2714static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2715 enum drbd_disk_state peer_disk) __must_hold(local)
2716{
2717 int hg, rule_nr;
2718 enum drbd_conns rv = C_MASK;
2719 enum drbd_disk_state mydisk;
2720
2721 mydisk = mdev->state.disk;
2722 if (mydisk == D_NEGOTIATING)
2723 mydisk = mdev->new_state_tmp.disk;
2724
2725 dev_info(DEV, "drbd_sync_handshake:\n");
2726 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2727 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2728 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2729
2730 hg = drbd_uuid_compare(mdev, &rule_nr);
2731
2732 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2733
2734 if (hg == -1000) {
2735 dev_alert(DEV, "Unrelated data, aborting!\n");
2736 return C_MASK;
2737 }
4a23f264
PR
2738 if (hg < -1000) {
2739 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2740 return C_MASK;
2741 }
2742
2743 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2744 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2745 int f = (hg == -100) || abs(hg) == 2;
2746 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2747 if (f)
2748 hg = hg*2;
2749 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2750 hg > 0 ? "source" : "target");
2751 }
2752
3a11a487
AG
2753 if (abs(hg) == 100)
2754 drbd_khelper(mdev, "initial-split-brain");
2755
89e58e75 2756 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
b411b363
PR
2757 int pcount = (mdev->state.role == R_PRIMARY)
2758 + (peer_role == R_PRIMARY);
2759 int forced = (hg == -100);
2760
2761 switch (pcount) {
2762 case 0:
2763 hg = drbd_asb_recover_0p(mdev);
2764 break;
2765 case 1:
2766 hg = drbd_asb_recover_1p(mdev);
2767 break;
2768 case 2:
2769 hg = drbd_asb_recover_2p(mdev);
2770 break;
2771 }
2772 if (abs(hg) < 100) {
2773 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2774 "automatically solved. Sync from %s node\n",
2775 pcount, (hg < 0) ? "peer" : "this");
2776 if (forced) {
2777 dev_warn(DEV, "Doing a full sync, since"
2778 " UUIDs where ambiguous.\n");
2779 hg = hg*2;
2780 }
2781 }
2782 }
2783
2784 if (hg == -100) {
89e58e75 2785 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
b411b363 2786 hg = -1;
89e58e75 2787 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
b411b363
PR
2788 hg = 1;
2789
2790 if (abs(hg) < 100)
2791 dev_warn(DEV, "Split-Brain detected, manually solved. "
2792 "Sync from %s node\n",
2793 (hg < 0) ? "peer" : "this");
2794 }
2795
2796 if (hg == -100) {
580b9767
LE
2797 /* FIXME this log message is not correct if we end up here
2798 * after an attempted attach on a diskless node.
2799 * We just refuse to attach -- well, we drop the "connection"
2800 * to that disk, in a way... */
3a11a487 2801 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2802 drbd_khelper(mdev, "split-brain");
2803 return C_MASK;
2804 }
2805
2806 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2807 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2808 return C_MASK;
2809 }
2810
2811 if (hg < 0 && /* by intention we do not use mydisk here. */
2812 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
89e58e75 2813 switch (mdev->tconn->net_conf->rr_conflict) {
b411b363
PR
2814 case ASB_CALL_HELPER:
2815 drbd_khelper(mdev, "pri-lost");
2816 /* fall through */
2817 case ASB_DISCONNECT:
2818 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2819 return C_MASK;
2820 case ASB_VIOLENTLY:
2821 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2822 "assumption\n");
2823 }
2824 }
2825
8169e41b 2826 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
cf14c2e9
PR
2827 if (hg == 0)
2828 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2829 else
2830 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2831 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2832 abs(hg) >= 2 ? "full" : "bit-map based");
2833 return C_MASK;
2834 }
2835
b411b363
PR
2836 if (abs(hg) >= 2) {
2837 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2838 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2839 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2840 return C_MASK;
2841 }
2842
2843 if (hg > 0) { /* become sync source. */
2844 rv = C_WF_BITMAP_S;
2845 } else if (hg < 0) { /* become sync target */
2846 rv = C_WF_BITMAP_T;
2847 } else {
2848 rv = C_CONNECTED;
2849 if (drbd_bm_total_weight(mdev)) {
2850 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2851 drbd_bm_total_weight(mdev));
2852 }
2853 }
2854
2855 return rv;
2856}
2857
2858/* returns 1 if invalid */
2859static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2860{
2861 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2862 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2863 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2864 return 0;
2865
2866 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2867 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2868 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2869 return 1;
2870
2871 /* everything else is valid if they are equal on both sides. */
2872 if (peer == self)
2873 return 0;
2874
2875 /* everything else is invalid. */
2876 return 1;
2877}
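/*
 * Illustrative examples (not part of the driver source):
 *   cmp_after_sb(ASB_DISCARD_REMOTE, ASB_DISCARD_LOCAL) returns 0 -- the two
 *     sides mirror each other, so the settings are compatible;
 *   cmp_after_sb(ASB_DISCARD_REMOTE, ASB_DISCARD_REMOTE) returns 1 -- each
 *     side would discard the other's data, which cannot be reconciled.
 */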
2878
e2857216 2879static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 2880{
e6ef8a5c 2881 struct p_protocol *p = tconn->data.rbuf;
b411b363 2882 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2883 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2884 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2885
b411b363
PR
2886 p_proto = be32_to_cpu(p->protocol);
2887 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2888 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2889 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2890 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2891 cf = be32_to_cpu(p->conn_flags);
2892 p_want_lose = cf & CF_WANT_LOSE;
2893
7204624c 2894 clear_bit(CONN_DRY_RUN, &tconn->flags);
cf14c2e9
PR
2895
2896 if (cf & CF_DRY_RUN)
7204624c 2897 set_bit(CONN_DRY_RUN, &tconn->flags);
b411b363 2898
7204624c
PR
2899 if (p_proto != tconn->net_conf->wire_protocol) {
2900 conn_err(tconn, "incompatible communication protocols\n");
b411b363
PR
2901 goto disconnect;
2902 }
2903
7204624c
PR
2904 if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2905 conn_err(tconn, "incompatible after-sb-0pri settings\n");
b411b363
PR
2906 goto disconnect;
2907 }
2908
7204624c
PR
2909 if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2910 conn_err(tconn, "incompatible after-sb-1pri settings\n");
b411b363
PR
2911 goto disconnect;
2912 }
2913
7204624c
PR
2914 if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2915 conn_err(tconn, "incompatible after-sb-2pri settings\n");
b411b363
PR
2916 goto disconnect;
2917 }
2918
7204624c
PR
2919 if (p_want_lose && tconn->net_conf->want_lose) {
2920 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
b411b363
PR
2921 goto disconnect;
2922 }
2923
7204624c
PR
2924 if (p_two_primaries != tconn->net_conf->two_primaries) {
2925 conn_err(tconn, "incompatible setting of the two-primaries options\n");
b411b363
PR
2926 goto disconnect;
2927 }
2928
7204624c
PR
2929 if (tconn->agreed_pro_version >= 87) {
2930 unsigned char *my_alg = tconn->net_conf->integrity_alg;
82bc0194 2931 int err;
b411b363 2932
e2857216 2933 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
82bc0194
AG
2934 if (err)
2935 return err;
b411b363
PR
2936
2937 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2938 if (strcmp(p_integrity_alg, my_alg)) {
7204624c 2939 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
b411b363
PR
2940 goto disconnect;
2941 }
7204624c 2942 conn_info(tconn, "data-integrity-alg: %s\n",
b411b363
PR
2943 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2944 }
2945
82bc0194 2946 return 0;
b411b363
PR
2947
2948disconnect:
7204624c 2949 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 2950 return -EIO;
b411b363
PR
2951}
2952
2953/* helper function
2954 * input: alg name, feature name
2955 * return: NULL (alg name was "")
2956 * ERR_PTR(error) if something goes wrong
2957 * or the crypto hash ptr, if it worked out ok. */
2958struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2959 const char *alg, const char *name)
2960{
2961 struct crypto_hash *tfm;
2962
2963 if (!alg[0])
2964 return NULL;
2965
2966 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2967 if (IS_ERR(tfm)) {
2968 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2969 alg, name, PTR_ERR(tfm));
2970 return tfm;
2971 }
2972 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2973 crypto_free_hash(tfm);
2974 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2975 return ERR_PTR(-EINVAL);
2976 }
2977 return tfm;
2978}
2979
4a76b161
AG
2980static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
2981{
2982 void *buffer = tconn->data.rbuf;
2983 int size = pi->size;
2984
2985 while (size) {
2986 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
2987 s = drbd_recv(tconn, buffer, s);
2988 if (s <= 0) {
2989 if (s < 0)
2990 return s;
2991 break;
2992 }
2993 size -= s;
2994 }
2995 if (size)
2996 return -EIO;
2997 return 0;
2998}
2999
3000/*
3001 * config_unknown_volume - device configuration command for unknown volume
3002 *
3003 * When a device is added to an existing connection, the node on which the
3004 * device is added first will send configuration commands to its peer but the
3005 * peer will not know about the device yet. It will warn and ignore these
3006 * commands. Once the device is added on the second node, the second node will
3007 * send the same device configuration commands, but in the other direction.
3008 *
3009 * (We can also end up here if drbd is misconfigured.)
3010 */
3011static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3012{
3013 conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3014 pi->vnr, cmdname(pi->cmd));
3015 return ignore_remaining_packet(tconn, pi);
3016}
3017
3018static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3019{
4a76b161
AG
3020 struct drbd_conf *mdev;
3021 struct p_rs_param_95 *p = tconn->data.rbuf;
b411b363
PR
3022 unsigned int header_size, data_size, exp_max_sz;
3023 struct crypto_hash *verify_tfm = NULL;
3024 struct crypto_hash *csums_tfm = NULL;
4a76b161 3025 const int apv = tconn->agreed_pro_version;
778f271d
PR
3026 int *rs_plan_s = NULL;
3027 int fifo_size = 0;
82bc0194 3028 int err;
b411b363 3029
4a76b161
AG
3030 mdev = vnr_to_mdev(tconn, pi->vnr);
3031 if (!mdev)
3032 return config_unknown_volume(tconn, pi);
3033
b411b363
PR
3034 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3035 : apv == 88 ? sizeof(struct p_rs_param)
3036 + SHARED_SECRET_MAX
8e26f9cc
PR
3037 : apv <= 94 ? sizeof(struct p_rs_param_89)
3038 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 3039
e2857216 3040 if (pi->size > exp_max_sz) {
b411b363 3041 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
e2857216 3042 pi->size, exp_max_sz);
82bc0194 3043 return -EIO;
b411b363
PR
3044 }
3045
3046 if (apv <= 88) {
257d0af6 3047 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
e2857216 3048 data_size = pi->size - header_size;
8e26f9cc 3049 } else if (apv <= 94) {
257d0af6 3050 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
e2857216 3051 data_size = pi->size - header_size;
b411b363 3052 D_ASSERT(data_size == 0);
8e26f9cc 3053 } else {
257d0af6 3054 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
e2857216 3055 data_size = pi->size - header_size;
b411b363
PR
3056 D_ASSERT(data_size == 0);
3057 }
3058
3059 /* initialize verify_alg and csums_alg */
3060 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3061
82bc0194
AG
3062 err = drbd_recv_all(mdev->tconn, &p->head.payload, header_size);
3063 if (err)
3064 return err;
b411b363 3065
f399002e
LE
3066 if (get_ldev(mdev)) {
3067 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3068 put_ldev(mdev);
3069 }
b411b363
PR
3070
3071 if (apv >= 88) {
3072 if (apv == 88) {
3073 if (data_size > SHARED_SECRET_MAX) {
3074 dev_err(DEV, "verify-alg too long, "
3075 "peer wants %u, accepting only %u byte\n",
3076 data_size, SHARED_SECRET_MAX);
82bc0194 3077 return -EIO;
b411b363
PR
3078 }
3079
82bc0194
AG
3080 err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3081 if (err)
3082 return err;
b411b363
PR
3083
3084 /* we expect NUL terminated string */
3085 /* but just in case someone tries to be evil */
3086 D_ASSERT(p->verify_alg[data_size-1] == 0);
3087 p->verify_alg[data_size-1] = 0;
3088
3089 } else /* apv >= 89 */ {
3090 /* we still expect NUL terminated strings */
3091 /* but just in case someone tries to be evil */
3092 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3093 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3094 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3095 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3096 }
3097
f399002e 3098 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
b411b363
PR
3099 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3100 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
f399002e 3101 mdev->tconn->net_conf->verify_alg, p->verify_alg);
b411b363
PR
3102 goto disconnect;
3103 }
3104 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3105 p->verify_alg, "verify-alg");
3106 if (IS_ERR(verify_tfm)) {
3107 verify_tfm = NULL;
3108 goto disconnect;
3109 }
3110 }
3111
f399002e 3112 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
b411b363
PR
3113 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3114 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
f399002e 3115 mdev->tconn->net_conf->csums_alg, p->csums_alg);
b411b363
PR
3116 goto disconnect;
3117 }
3118 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3119 p->csums_alg, "csums-alg");
3120 if (IS_ERR(csums_tfm)) {
3121 csums_tfm = NULL;
3122 goto disconnect;
3123 }
3124 }
3125
f399002e
LE
3126 if (apv > 94 && get_ldev(mdev)) {
3127 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3128 mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3129 mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3130 mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3131 mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d 3132
f399002e 3133 fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
778f271d
PR
3134 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3135 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3136 if (!rs_plan_s) {
3137 dev_err(DEV, "kmalloc of fifo_buffer failed");
f399002e 3138 put_ldev(mdev);
778f271d
PR
3139 goto disconnect;
3140 }
3141 }
f399002e 3142 put_ldev(mdev);
8e26f9cc 3143 }
b411b363
PR
3144
3145 spin_lock(&mdev->peer_seq_lock);
3146 /* lock against drbd_nl_syncer_conf() */
3147 if (verify_tfm) {
f399002e
LE
3148 strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3149 mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3150 crypto_free_hash(mdev->tconn->verify_tfm);
3151 mdev->tconn->verify_tfm = verify_tfm;
b411b363
PR
3152 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3153 }
3154 if (csums_tfm) {
f399002e
LE
3155 strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3156 mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3157 crypto_free_hash(mdev->tconn->csums_tfm);
3158 mdev->tconn->csums_tfm = csums_tfm;
b411b363
PR
3159 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3160 }
778f271d
PR
3161 if (fifo_size != mdev->rs_plan_s.size) {
3162 kfree(mdev->rs_plan_s.values);
3163 mdev->rs_plan_s.values = rs_plan_s;
3164 mdev->rs_plan_s.size = fifo_size;
3165 mdev->rs_planed = 0;
3166 }
b411b363
PR
3167 spin_unlock(&mdev->peer_seq_lock);
3168 }
82bc0194 3169 return 0;
b411b363 3170
b411b363
PR
3171disconnect:
3172 /* just for completeness: actually not needed,
3173 * as this is not reached if csums_tfm was ok. */
3174 crypto_free_hash(csums_tfm);
3175 /* but free the verify_tfm again, if csums_tfm did not work out */
3176 crypto_free_hash(verify_tfm);
38fa9988 3177 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3178 return -EIO;
b411b363
PR
3179}
3180
b411b363
PR
3181/* warn if the arguments differ by more than 12.5% */
3182static void warn_if_differ_considerably(struct drbd_conf *mdev,
3183 const char *s, sector_t a, sector_t b)
3184{
3185 sector_t d;
3186 if (a == 0 || b == 0)
3187 return;
3188 d = (a > b) ? (a - b) : (b - a);
3189 if (d > (a>>3) || d > (b>>3))
3190 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3191 (unsigned long long)a, (unsigned long long)b);
3192}
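/*
 * Illustrative example (not part of the driver source): for a == 1000 and
 * b == 1200 sectors, d == 200, which exceeds a >> 3 == 125, so the warning
 * fires; for a == 1000 and b == 1100, d == 100 stays below both
 * 1000 >> 3 == 125 and 1100 >> 3 == 137, so the sizes are considered close
 * enough.
 */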
3193
4a76b161 3194static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3195{
4a76b161
AG
3196 struct drbd_conf *mdev;
3197 struct p_sizes *p = tconn->data.rbuf;
b411b363 3198 enum determine_dev_size dd = unchanged;
b411b363
PR
3199 sector_t p_size, p_usize, my_usize;
3200 int ldsc = 0; /* local disk size changed */
e89b591c 3201 enum dds_flags ddsf;
b411b363 3202
4a76b161
AG
3203 mdev = vnr_to_mdev(tconn, pi->vnr);
3204 if (!mdev)
3205 return config_unknown_volume(tconn, pi);
3206
b411b363
PR
3207 p_size = be64_to_cpu(p->d_size);
3208 p_usize = be64_to_cpu(p->u_size);
3209
b411b363
PR
3210 /* just store the peer's disk size for now.
3211 * we still need to figure out whether we accept that. */
3212 mdev->p_size = p_size;
3213
b411b363
PR
3214 if (get_ldev(mdev)) {
3215 warn_if_differ_considerably(mdev, "lower level device sizes",
3216 p_size, drbd_get_max_capacity(mdev->ldev));
3217 warn_if_differ_considerably(mdev, "user requested size",
3218 p_usize, mdev->ldev->dc.disk_size);
3219
3220 /* if this is the first connect, or an otherwise expected
3221 * param exchange, choose the minimum */
3222 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3223 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3224 p_usize);
3225
3226 my_usize = mdev->ldev->dc.disk_size;
3227
3228 if (mdev->ldev->dc.disk_size != p_usize) {
3229 mdev->ldev->dc.disk_size = p_usize;
3230 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3231 (unsigned long)mdev->ldev->dc.disk_size);
3232 }
3233
3234 /* Never shrink a device with usable data during connect.
3235 But allow online shrinking if we are connected. */
a393db6f 3236 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
3237 drbd_get_capacity(mdev->this_bdev) &&
3238 mdev->state.disk >= D_OUTDATED &&
3239 mdev->state.conn < C_CONNECTED) {
3240 dev_err(DEV, "The peer's disk size is too small!\n");
38fa9988 3241 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
3242 mdev->ldev->dc.disk_size = my_usize;
3243 put_ldev(mdev);
82bc0194 3244 return -EIO;
b411b363
PR
3245 }
3246 put_ldev(mdev);
3247 }
b411b363 3248
e89b591c 3249 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3250 if (get_ldev(mdev)) {
24c4830c 3251 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3252 put_ldev(mdev);
3253 if (dd == dev_size_error)
82bc0194 3254 return -EIO;
b411b363
PR
3255 drbd_md_sync(mdev);
3256 } else {
3257 /* I am diskless, need to accept the peer's size. */
3258 drbd_set_my_capacity(mdev, p_size);
3259 }
3260
99432fcc
PR
3261 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3262 drbd_reconsider_max_bio_size(mdev);
3263
b411b363
PR
3264 if (get_ldev(mdev)) {
3265 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3266 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3267 ldsc = 1;
3268 }
3269
b411b363
PR
3270 put_ldev(mdev);
3271 }
3272
3273 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3274 if (be64_to_cpu(p->c_size) !=
3275 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3276 /* we have different sizes, probably peer
3277 * needs to know my new size... */
e89b591c 3278 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3279 }
3280 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3281 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3282 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3283 mdev->state.disk >= D_INCONSISTENT) {
3284 if (ddsf & DDSF_NO_RESYNC)
3285 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3286 else
3287 resync_after_online_grow(mdev);
3288 } else
b411b363
PR
3289 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3290 }
3291 }
3292
82bc0194 3293 return 0;
b411b363
PR
3294}
3295
4a76b161 3296static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3297{
4a76b161
AG
3298 struct drbd_conf *mdev;
3299 struct p_uuids *p = tconn->data.rbuf;
b411b363 3300 u64 *p_uuid;
62b0da3a 3301 int i, updated_uuids = 0;
b411b363 3302
4a76b161
AG
3303 mdev = vnr_to_mdev(tconn, pi->vnr);
3304 if (!mdev)
3305 return config_unknown_volume(tconn, pi);
3306
b411b363
PR
3307 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3308
3309 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3310 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3311
3312 kfree(mdev->p_uuid);
3313 mdev->p_uuid = p_uuid;
3314
3315 if (mdev->state.conn < C_CONNECTED &&
3316 mdev->state.disk < D_INCONSISTENT &&
3317 mdev->state.role == R_PRIMARY &&
3318 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3319 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3320 (unsigned long long)mdev->ed_uuid);
38fa9988 3321 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3322 return -EIO;
b411b363
PR
3323 }
3324
3325 if (get_ldev(mdev)) {
3326 int skip_initial_sync =
3327 mdev->state.conn == C_CONNECTED &&
31890f4a 3328 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3329 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3330 (p_uuid[UI_FLAGS] & 8);
3331 if (skip_initial_sync) {
3332 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3333 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3334 "clear_n_write from receive_uuids",
3335 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3336 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3337 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3338 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3339 CS_VERBOSE, NULL);
3340 drbd_md_sync(mdev);
62b0da3a 3341 updated_uuids = 1;
b411b363
PR
3342 }
3343 put_ldev(mdev);
18a50fa2
PR
3344 } else if (mdev->state.disk < D_INCONSISTENT &&
3345 mdev->state.role == R_PRIMARY) {
3346 /* I am a diskless primary, the peer just created a new current UUID
3347 for me. */
62b0da3a 3348 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3349 }
3350
 3351	/* Before we test the disk state, we should wait until a possibly
 3352	   ongoing cluster-wide state change has finished. That is important if
 3353	   we are primary and are detaching from our disk. We need to see the
 3354	   new disk state... */
8410da8f
PR
3355 mutex_lock(mdev->state_mutex);
3356 mutex_unlock(mdev->state_mutex);
b411b363 3357 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3358 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3359
3360 if (updated_uuids)
3361 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3362
82bc0194 3363 return 0;
b411b363
PR
3364}
3365
3366/**
3367 * convert_state() - Converts the peer's view of the cluster state to our point of view
3368 * @ps: The state as seen by the peer.
3369 */
3370static union drbd_state convert_state(union drbd_state ps)
3371{
3372 union drbd_state ms;
3373
3374 static enum drbd_conns c_tab[] = {
3375 [C_CONNECTED] = C_CONNECTED,
3376
3377 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3378 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3379 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3380 [C_VERIFY_S] = C_VERIFY_T,
3381 [C_MASK] = C_MASK,
3382 };
3383
3384 ms.i = ps.i;
3385
3386 ms.conn = c_tab[ps.conn];
3387 ms.peer = ps.role;
3388 ms.role = ps.peer;
3389 ms.pdsk = ps.disk;
3390 ms.disk = ps.pdsk;
3391 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3392
3393 return ms;
3394}
3395
4a76b161 3396static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3397{
4a76b161
AG
3398 struct drbd_conf *mdev;
3399 struct p_req_state *p = tconn->data.rbuf;
b411b363 3400 union drbd_state mask, val;
bf885f8a 3401 enum drbd_state_rv rv;
b411b363 3402
4a76b161
AG
3403 mdev = vnr_to_mdev(tconn, pi->vnr);
3404 if (!mdev)
3405 return -EIO;
3406
b411b363
PR
3407 mask.i = be32_to_cpu(p->mask);
3408 val.i = be32_to_cpu(p->val);
3409
25703f83 3410 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
8410da8f 3411 mutex_is_locked(mdev->state_mutex)) {
b411b363 3412 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
82bc0194 3413 return 0;
b411b363
PR
3414 }
3415
3416 mask = convert_state(mask);
3417 val = convert_state(val);
3418
dfafcc8a
PR
3419 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3420 drbd_send_sr_reply(mdev, rv);
b411b363 3421
b411b363
PR
3422 drbd_md_sync(mdev);
3423
82bc0194 3424 return 0;
b411b363
PR
3425}
3426
e2857216 3427static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
dfafcc8a 3428{
e6ef8a5c 3429 struct p_req_state *p = tconn->data.rbuf;
dfafcc8a
PR
3430 union drbd_state mask, val;
3431 enum drbd_state_rv rv;
3432
3433 mask.i = be32_to_cpu(p->mask);
3434 val.i = be32_to_cpu(p->val);
3435
3436 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3437 mutex_is_locked(&tconn->cstate_mutex)) {
3438 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
82bc0194 3439 return 0;
dfafcc8a
PR
3440 }
3441
3442 mask = convert_state(mask);
3443 val = convert_state(val);
3444
778bcf2e 3445 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
dfafcc8a
PR
3446 conn_send_sr_reply(tconn, rv);
3447
82bc0194 3448 return 0;
dfafcc8a
PR
3449}
3450
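/* P_STATE: reconcile the state the peer reports about itself with our own
 * view.  This may trigger a resync decision via drbd_sync_handshake() when a
 * connection was (re)established or a disk was newly attached, and it drops
 * the connection if the two views cannot be brought into agreement. */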
4a76b161 3451static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3452{
4a76b161
AG
3453 struct drbd_conf *mdev;
3454 struct p_state *p = tconn->data.rbuf;
4ac4aada 3455 union drbd_state os, ns, peer_state;
b411b363 3456 enum drbd_disk_state real_peer_disk;
65d922c3 3457 enum chg_state_flags cs_flags;
b411b363
PR
3458 int rv;
3459
4a76b161
AG
3460 mdev = vnr_to_mdev(tconn, pi->vnr);
3461 if (!mdev)
3462 return config_unknown_volume(tconn, pi);
3463
b411b363
PR
3464 peer_state.i = be32_to_cpu(p->state);
3465
3466 real_peer_disk = peer_state.disk;
3467 if (peer_state.disk == D_NEGOTIATING) {
3468 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3469 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3470 }
3471
87eeee41 3472 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 3473 retry:
4ac4aada 3474 os = ns = mdev->state;
87eeee41 3475 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 3476
e9ef7bb6
LE
3477 /* peer says his disk is uptodate, while we think it is inconsistent,
3478 * and this happens while we think we have a sync going on. */
3479 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3480 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3481 /* If we are (becoming) SyncSource, but peer is still in sync
3482 * preparation, ignore its uptodate-ness to avoid flapping, it
3483 * will change to inconsistent once the peer reaches active
3484 * syncing states.
3485 * It may have changed syncer-paused flags, however, so we
3486 * cannot ignore this completely. */
3487 if (peer_state.conn > C_CONNECTED &&
3488 peer_state.conn < C_SYNC_SOURCE)
3489 real_peer_disk = D_INCONSISTENT;
3490
3491 /* if peer_state changes to connected at the same time,
3492 * it explicitly notifies us that it finished resync.
3493 * Maybe we should finish it up, too? */
3494 else if (os.conn >= C_SYNC_SOURCE &&
3495 peer_state.conn == C_CONNECTED) {
3496 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3497 drbd_resync_finished(mdev);
82bc0194 3498 return 0;
e9ef7bb6
LE
3499 }
3500 }
3501
3502 /* peer says his disk is inconsistent, while we think it is uptodate,
3503 * and this happens while the peer still thinks we have a sync going on,
3504 * but we think we are already done with the sync.
3505 * We ignore this to avoid flapping pdsk.
3506 * This should not happen, if the peer is a recent version of drbd. */
3507 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3508 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3509 real_peer_disk = D_UP_TO_DATE;
3510
4ac4aada
LE
3511 if (ns.conn == C_WF_REPORT_PARAMS)
3512 ns.conn = C_CONNECTED;
b411b363 3513
67531718
PR
3514 if (peer_state.conn == C_AHEAD)
3515 ns.conn = C_BEHIND;
3516
b411b363
PR
3517 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3518 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3519 int cr; /* consider resync */
3520
3521 /* if we established a new connection */
4ac4aada 3522 cr = (os.conn < C_CONNECTED);
b411b363
PR
3523 /* if we had an established connection
3524 * and one of the nodes newly attaches a disk */
4ac4aada 3525 cr |= (os.conn == C_CONNECTED &&
b411b363 3526 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3527 os.disk == D_NEGOTIATING));
b411b363
PR
3528 /* if we have both been inconsistent, and the peer has been
3529 * forced to be UpToDate with --overwrite-data */
3530 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3531 /* if we had been plain connected, and the admin requested to
3532 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3533 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3534 (peer_state.conn >= C_STARTING_SYNC_S &&
3535 peer_state.conn <= C_WF_BITMAP_T));
3536
3537 if (cr)
4ac4aada 3538 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3539
3540 put_ldev(mdev);
4ac4aada
LE
3541 if (ns.conn == C_MASK) {
3542 ns.conn = C_CONNECTED;
b411b363 3543 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3544 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3545 } else if (peer_state.disk == D_NEGOTIATING) {
3546 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3547 peer_state.disk = D_DISKLESS;
580b9767 3548 real_peer_disk = D_DISKLESS;
b411b363 3549 } else {
8169e41b 3550 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
82bc0194 3551 return -EIO;
4ac4aada 3552 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
38fa9988 3553 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3554 return -EIO;
b411b363
PR
3555 }
3556 }
3557 }
3558
87eeee41 3559 spin_lock_irq(&mdev->tconn->req_lock);
4ac4aada 3560 if (mdev->state.i != os.i)
b411b363
PR
3561 goto retry;
3562 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3563 ns.peer = peer_state.role;
3564 ns.pdsk = real_peer_disk;
3565 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3566 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3567 ns.disk = mdev->new_state_tmp.disk;
4ac4aada
LE
3568 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3569 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50 3570 test_bit(NEW_CUR_UUID, &mdev->flags)) {
8554df1c 3571 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3572 		   for temporary network outages! */
87eeee41 3573 spin_unlock_irq(&mdev->tconn->req_lock);
481c6f50 3574 		dev_err(DEV, "Aborting Connect, cannot thaw IO with only a Consistent peer\n");
2f5cdd0b 3575 tl_clear(mdev->tconn);
481c6f50
PR
3576 drbd_uuid_new_current(mdev);
3577 clear_bit(NEW_CUR_UUID, &mdev->flags);
38fa9988 3578 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
82bc0194 3579 return -EIO;
481c6f50 3580 }
65d922c3 3581 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
b411b363 3582 ns = mdev->state;
87eeee41 3583 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3584
3585 if (rv < SS_SUCCESS) {
38fa9988 3586 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3587 return -EIO;
b411b363
PR
3588 }
3589
4ac4aada
LE
3590 if (os.conn > C_WF_REPORT_PARAMS) {
3591 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3592 peer_state.disk != D_NEGOTIATING ) {
3593 /* we want resync, peer has not yet decided to sync... */
3594 /* Nowadays only used when forcing a node into primary role and
3595 setting its disk to UpToDate with that */
3596 drbd_send_uuids(mdev);
3597 drbd_send_state(mdev);
3598 }
3599 }
3600
89e58e75 3601 mdev->tconn->net_conf->want_lose = 0;
b411b363
PR
3602
3603 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3604
82bc0194 3605 return 0;
b411b363
PR
3606}
3607
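/* P_SYNC_UUID: as sync target, adopt the sync UUID the peer generated for
 * this resync.  The current UUID is set directly (not rotated into the
 * history) and the resync towards the peer is started. */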
4a76b161 3608static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3609{
4a76b161
AG
3610 struct drbd_conf *mdev;
3611 struct p_rs_uuid *p = tconn->data.rbuf;
3612
3613 mdev = vnr_to_mdev(tconn, pi->vnr);
3614 if (!mdev)
3615 return -EIO;
b411b363
PR
3616
3617 wait_event(mdev->misc_wait,
3618 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3619 mdev->state.conn == C_BEHIND ||
b411b363
PR
3620 mdev->state.conn < C_CONNECTED ||
3621 mdev->state.disk < D_NEGOTIATING);
3622
3623 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3624
b411b363
PR
3625 /* Here the _drbd_uuid_ functions are right, current should
3626 _not_ be rotated into the history */
3627 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3628 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3629 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3630
62b0da3a 3631 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3632 drbd_start_resync(mdev, C_SYNC_TARGET);
3633
3634 put_ldev(mdev);
3635 } else
3636 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3637
82bc0194 3638 return 0;
b411b363
PR
3639}
3640
2c46407d
AG
3641/**
3642 * receive_bitmap_plain
3643 *
3644 * Return 0 when done, 1 when another iteration is needed, and a negative error
3645 * code upon failure.
3646 */
3647static int
02918be2 3648receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
fc56815c 3649 struct p_header *h, struct bm_xfer_ctx *c)
b411b363 3650{
fc56815c 3651 unsigned long *buffer = (unsigned long *)h->payload;
b411b363
PR
3652 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3653 unsigned want = num_words * sizeof(long);
2c46407d 3654 int err;
b411b363 3655
02918be2
PR
3656 if (want != data_size) {
3657 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
2c46407d 3658 return -EIO;
b411b363
PR
3659 }
3660 if (want == 0)
2c46407d 3661 return 0;
82bc0194
AG
3662 err = drbd_recv_all(mdev->tconn, buffer, want);
3663 if (err)
2c46407d 3664 return err;
b411b363
PR
3665
3666 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3667
3668 c->word_offset += num_words;
3669 c->bit_offset = c->word_offset * BITS_PER_LONG;
3670 if (c->bit_offset > c->bm_bits)
3671 c->bit_offset = c->bm_bits;
3672
2c46407d 3673 return 1;
b411b363
PR
3674}
3675
a02d1240
AG
3676static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3677{
3678 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3679}
3680
3681static int dcbp_get_start(struct p_compressed_bm *p)
3682{
3683 return (p->encoding & 0x80) != 0;
3684}
3685
3686static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3687{
3688 return (p->encoding >> 4) & 0x7;
3689}
3690
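/* The helpers above decode the "encoding" byte of a compressed bitmap packet:
 * bits 0-3 select the code (currently only RLE_VLI_Bits), bits 4-6 give the
 * number of padding bits at the end of the bitstream, and bit 7 says whether
 * the first run describes set or cleared bits.  recv_bm_rle_bits() below then
 * decodes alternating run lengths from the VLI bitstream. */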
2c46407d
AG
3691/**
3692 * recv_bm_rle_bits
3693 *
3694 * Return 0 when done, 1 when another iteration is needed, and a negative error
3695 * code upon failure.
3696 */
3697static int
b411b363
PR
3698recv_bm_rle_bits(struct drbd_conf *mdev,
3699 struct p_compressed_bm *p,
c6d25cfe
PR
3700 struct bm_xfer_ctx *c,
3701 unsigned int len)
b411b363
PR
3702{
3703 struct bitstream bs;
3704 u64 look_ahead;
3705 u64 rl;
3706 u64 tmp;
3707 unsigned long s = c->bit_offset;
3708 unsigned long e;
a02d1240 3709 int toggle = dcbp_get_start(p);
b411b363
PR
3710 int have;
3711 int bits;
3712
a02d1240 3713 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
b411b363
PR
3714
3715 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3716 if (bits < 0)
2c46407d 3717 return -EIO;
b411b363
PR
3718
3719 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3720 bits = vli_decode_bits(&rl, look_ahead);
3721 if (bits <= 0)
2c46407d 3722 return -EIO;
b411b363
PR
3723
3724 if (toggle) {
3725 e = s + rl -1;
3726 if (e >= c->bm_bits) {
3727 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3728 return -EIO;
b411b363
PR
3729 }
3730 _drbd_bm_set_bits(mdev, s, e);
3731 }
3732
3733 if (have < bits) {
3734 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3735 have, bits, look_ahead,
3736 (unsigned int)(bs.cur.b - p->code),
3737 (unsigned int)bs.buf_len);
2c46407d 3738 return -EIO;
b411b363
PR
3739 }
3740 look_ahead >>= bits;
3741 have -= bits;
3742
3743 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3744 if (bits < 0)
2c46407d 3745 return -EIO;
b411b363
PR
3746 look_ahead |= tmp << have;
3747 have += bits;
3748 }
3749
3750 c->bit_offset = s;
3751 bm_xfer_ctx_bit_to_word_offset(c);
3752
2c46407d 3753 return (s != c->bm_bits);
b411b363
PR
3754}
3755
2c46407d
AG
3756/**
3757 * decode_bitmap_c
3758 *
3759 * Return 0 when done, 1 when another iteration is needed, and a negative error
3760 * code upon failure.
3761 */
3762static int
b411b363
PR
3763decode_bitmap_c(struct drbd_conf *mdev,
3764 struct p_compressed_bm *p,
c6d25cfe
PR
3765 struct bm_xfer_ctx *c,
3766 unsigned int len)
b411b363 3767{
a02d1240 3768 if (dcbp_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3769 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3770
3771 /* other variants had been implemented for evaluation,
3772 * but have been dropped as this one turned out to be "best"
3773 * during all our tests. */
3774
3775 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
38fa9988 3776 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
2c46407d 3777 return -EIO;
b411b363
PR
3778}
3779
3780void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3781 const char *direction, struct bm_xfer_ctx *c)
3782{
3783 /* what would it take to transfer it "plaintext" */
c012949a 3784 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3785 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3786 + c->bm_words * sizeof(long);
3787 unsigned total = c->bytes[0] + c->bytes[1];
3788 unsigned r;
3789
 3790	/* total cannot be zero, but just in case: */
3791 if (total == 0)
3792 return;
3793
3794 /* don't report if not compressed */
3795 if (total >= plain)
3796 return;
3797
3798 /* total < plain. check for overflow, still */
3799 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3800 : (1000 * total / plain);
3801
3802 if (r > 1000)
3803 r = 1000;
3804
3805 r = 1000 - r;
3806 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3807 "total %u; compression: %u.%u%%\n",
3808 direction,
3809 c->bytes[1], c->packets[1],
3810 c->bytes[0], c->packets[0],
3811 total, r/10, r % 10);
3812}
3813
 3814/* Since we are processing the bitfield from lower addresses to higher,
 3815   it does not matter whether we process it in 32 bit or 64 bit chunks,
 3816   as long as it is little endian. (Understand it as a byte stream,
 3817   beginning with the lowest byte...) If we used big endian instead,
 3818   we would need to process it from the highest address to the lowest
 3819   in order to be agnostic to the 32 vs 64 bit issue.
 3820
 3821   Returns 0 on success, or a negative error code on failure. */
4a76b161 3822static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3823{
4a76b161 3824 struct drbd_conf *mdev;
b411b363 3825 struct bm_xfer_ctx c;
2c46407d 3826 int err;
4a76b161
AG
3827 struct p_header *h = tconn->data.rbuf;
3828
3829 mdev = vnr_to_mdev(tconn, pi->vnr);
3830 if (!mdev)
3831 return -EIO;
b411b363 3832
20ceb2b2
LE
3833 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3834 /* you are supposed to send additional out-of-sync information
3835 * if you actually set bits during this phase */
b411b363 3836
b411b363
PR
3837 c = (struct bm_xfer_ctx) {
3838 .bm_bits = drbd_bm_bits(mdev),
3839 .bm_words = drbd_bm_words(mdev),
3840 };
3841
2c46407d 3842 for(;;) {
e2857216
AG
3843 if (pi->cmd == P_BITMAP) {
3844 err = receive_bitmap_plain(mdev, pi->size, h, &c);
3845 } else if (pi->cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3846 /* MAYBE: sanity check that we speak proto >= 90,
3847 * and the feature is enabled! */
3848 struct p_compressed_bm *p;
3849
e2857216 3850 if (pi->size > BM_PACKET_PAYLOAD_BYTES) {
b411b363 3851 dev_err(DEV, "ReportCBitmap packet too large\n");
82bc0194 3852 err = -EIO;
b411b363
PR
3853 goto out;
3854 }
fc56815c
AG
3855
3856 p = mdev->tconn->data.rbuf;
e2857216 3857 err = drbd_recv_all(mdev->tconn, p->head.payload, pi->size);
82bc0194
AG
3858 if (err)
3859 goto out;
e2857216
AG
3860 if (pi->size <= (sizeof(*p) - sizeof(p->head))) {
3861 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
82bc0194 3862 err = -EIO;
78fcbdae 3863 goto out;
b411b363 3864 }
e2857216 3865 err = decode_bitmap_c(mdev, p, &c, pi->size);
b411b363 3866 } else {
e2857216 3867 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
82bc0194 3868 err = -EIO;
b411b363
PR
3869 goto out;
3870 }
3871
e2857216
AG
3872 c.packets[pi->cmd == P_BITMAP]++;
3873 c.bytes[pi->cmd == P_BITMAP] += sizeof(struct p_header) + pi->size;
b411b363 3874
2c46407d
AG
3875 if (err <= 0) {
3876 if (err < 0)
3877 goto out;
b411b363 3878 break;
2c46407d 3879 }
e2857216 3880 err = drbd_recv_header(mdev->tconn, pi);
82bc0194 3881 if (err)
b411b363 3882 goto out;
2c46407d 3883 }
b411b363
PR
3884
3885 INFO_bm_xfer_stats(mdev, "receive", &c);
3886
3887 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
3888 enum drbd_state_rv rv;
3889
82bc0194
AG
3890 err = drbd_send_bitmap(mdev);
3891 if (err)
b411b363
PR
3892 goto out;
3893 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
3894 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3895 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
3896 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3897 /* admin may have requested C_DISCONNECTING,
3898 * other threads may have noticed network errors */
3899 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3900 drbd_conn_str(mdev->state.conn));
3901 }
82bc0194 3902 err = 0;
b411b363 3903
b411b363 3904 out:
20ceb2b2 3905 drbd_bm_unlock(mdev);
82bc0194 3906 if (!err && mdev->state.conn == C_WF_BITMAP_S)
b411b363 3907 drbd_start_resync(mdev, C_SYNC_SOURCE);
82bc0194 3908 return err;
b411b363
PR
3909}
3910
4a76b161 3911static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3912{
4a76b161 3913 conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
e2857216 3914 pi->cmd, pi->size);
2de876ef 3915
4a76b161 3916 return ignore_remaining_packet(tconn, pi);
2de876ef
PR
3917}
3918
4a76b161 3919static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
0ced55a3 3920{
e7f52dfb
LE
3921 /* Make sure we've acked all the TCP data associated
3922 * with the data requests being unplugged */
4a76b161 3923 drbd_tcp_quickack(tconn->data.socket);
0ced55a3 3924
82bc0194 3925 return 0;
0ced55a3
PR
3926}
3927
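/* P_OUT_OF_SYNC: the peer (typically while we are Behind or still waiting
 * for the bitmap) reports a block range as out of sync; mark it in our
 * bitmap so it gets resynced later. */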
4a76b161 3928static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
73a01a18 3929{
4a76b161
AG
3930 struct drbd_conf *mdev;
3931 struct p_block_desc *p = tconn->data.rbuf;
3932
3933 mdev = vnr_to_mdev(tconn, pi->vnr);
3934 if (!mdev)
3935 return -EIO;
73a01a18 3936
f735e363
LE
3937 switch (mdev->state.conn) {
3938 case C_WF_SYNC_UUID:
3939 case C_WF_BITMAP_T:
3940 case C_BEHIND:
3941 break;
3942 default:
3943 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3944 drbd_conn_str(mdev->state.conn));
3945 }
3946
73a01a18
PR
3947 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3948
82bc0194 3949 return 0;
73a01a18
PR
3950}
3951
02918be2
PR
3952struct data_cmd {
3953 int expect_payload;
3954 size_t pkt_size;
4a76b161 3955 int (*fn)(struct drbd_tconn *, struct packet_info *);
02918be2
PR
3956};
3957
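/* Dispatch table for the data socket, indexed by packet type: whether a
 * variable-size payload may follow, the expected (sub-)header size, and the
 * handler to call.  Packet types not listed here make drbdd() drop the
 * connection. */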
3958static struct data_cmd drbd_cmd_handler[] = {
4a76b161
AG
3959 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3960 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3961 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3962 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3963 [P_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3964 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3965 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), receive_UnplugRemote },
3966 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3967 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3968 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), receive_SyncParam },
3969 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), receive_SyncParam },
3970 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3971 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3972 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3973 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
3974 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3975 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3976 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3977 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3978 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3979 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
3980 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3981 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
b411b363
PR
3982};
3983
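/* Main loop of the receiver thread: read a packet header, validate the
 * command against drbd_cmd_handler[], receive the fixed-size sub-header,
 * and dispatch to the handler.  Any failure ends in C_PROTOCOL_ERROR. */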
eefc2f7d 3984static void drbdd(struct drbd_tconn *tconn)
b411b363 3985{
e6ef8a5c 3986 struct p_header *header = tconn->data.rbuf;
77351055 3987 struct packet_info pi;
02918be2 3988 size_t shs; /* sub header size */
82bc0194 3989 int err;
b411b363 3990
eefc2f7d 3991 while (get_t_state(&tconn->receiver) == RUNNING) {
deebe195
AG
3992 struct data_cmd *cmd;
3993
eefc2f7d 3994 drbd_thread_current_set_cpu(&tconn->receiver);
69bc7bc3 3995 if (drbd_recv_header(tconn, &pi))
02918be2 3996 goto err_out;
b411b363 3997
deebe195 3998 cmd = &drbd_cmd_handler[pi.cmd];
4a76b161 3999 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
eefc2f7d 4000 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
02918be2 4001 goto err_out;
0b33a916 4002 }
b411b363 4003
deebe195
AG
4004 shs = cmd->pkt_size - sizeof(struct p_header);
4005 if (pi.size - shs > 0 && !cmd->expect_payload) {
eefc2f7d 4006 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
02918be2 4007 goto err_out;
b411b363 4008 }
b411b363 4009
c13f7e1a 4010 if (shs) {
a5c31904
AG
4011 err = drbd_recv_all_warn(tconn, &header->payload, shs);
4012 if (err)
c13f7e1a 4013 goto err_out;
e2857216 4014 pi.size -= shs;
c13f7e1a
LE
4015 }
4016
4a76b161
AG
4017 err = cmd->fn(tconn, &pi);
4018 if (err) {
eefc2f7d 4019 conn_err(tconn, "error receiving %s, l: %d!\n",
77351055 4020 cmdname(pi.cmd), pi.size);
02918be2 4021 goto err_out;
b411b363
PR
4022 }
4023 }
82bc0194 4024 return;
b411b363 4025
82bc0194
AG
4026 err_out:
4027 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
b411b363
PR
4028}
4029
0e29d163 4030void conn_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
4031{
4032 struct drbd_wq_barrier barr;
4033
4034 barr.w.cb = w_prev_work_done;
0e29d163 4035 barr.w.tconn = tconn;
b411b363 4036 init_completion(&barr.done);
0e29d163 4037 drbd_queue_work(&tconn->data.work, &barr.w);
b411b363
PR
4038 wait_for_completion(&barr.done);
4039}
4040
360cc740 4041static void drbd_disconnect(struct drbd_tconn *tconn)
b411b363 4042{
bbeb641c 4043 enum drbd_conns oc;
b411b363 4044 int rv = SS_UNKNOWN_ERROR;
b411b363 4045
bbeb641c 4046 if (tconn->cstate == C_STANDALONE)
b411b363 4047 return;
b411b363
PR
4048
4049 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
4050 drbd_thread_stop(&tconn->asender);
4051 drbd_free_sock(tconn);
4052
4053 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
360cc740
PR
4054 conn_info(tconn, "Connection closed\n");
4055
cb703454
PR
4056 if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4057 conn_try_outdate_peer_async(tconn);
4058
360cc740 4059 spin_lock_irq(&tconn->req_lock);
bbeb641c
PR
4060 oc = tconn->cstate;
4061 if (oc >= C_UNCONNECTED)
4062 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4063
360cc740
PR
4064 spin_unlock_irq(&tconn->req_lock);
4065
bbeb641c 4066 if (oc == C_DISCONNECTING) {
360cc740
PR
4067 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4068
4069 crypto_free_hash(tconn->cram_hmac_tfm);
4070 tconn->cram_hmac_tfm = NULL;
4071
4072 kfree(tconn->net_conf);
4073 tconn->net_conf = NULL;
bbeb641c 4074 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
360cc740
PR
4075 }
4076}
4077
4078static int drbd_disconnected(int vnr, void *p, void *data)
4079{
4080 struct drbd_conf *mdev = (struct drbd_conf *)p;
4081 enum drbd_fencing_p fp;
4082 unsigned int i;
b411b363 4083
85719573 4084 /* wait for current activity to cease. */
87eeee41 4085 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
4086 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4087 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4088 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 4089 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4090
4091 /* We do not have data structures that would allow us to
4092 * get the rs_pending_cnt down to 0 again.
4093 * * On C_SYNC_TARGET we do not have any data structures describing
4094 * the pending RSDataRequest's we have sent.
4095 * * On C_SYNC_SOURCE there is no data structure that tracks
4096 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4097 * And no, it is not the sum of the reference counts in the
4098 * resync_LRU. The resync_LRU tracks the whole operation including
4099 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4100 * on the fly. */
4101 drbd_rs_cancel_all(mdev);
4102 mdev->rs_total = 0;
4103 mdev->rs_failed = 0;
4104 atomic_set(&mdev->rs_pending_cnt, 0);
4105 wake_up(&mdev->misc_wait);
4106
7fde2be9
PR
4107 del_timer(&mdev->request_timer);
4108
b411b363 4109 del_timer_sync(&mdev->resync_timer);
b411b363
PR
4110 resync_timer_fn((unsigned long)mdev);
4111
b411b363
PR
4112 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4113 * w_make_resync_request etc. which may still be on the worker queue
4114 * to be "canceled" */
a21e9298 4115 drbd_flush_workqueue(mdev);
b411b363
PR
4116
4117 /* This also does reclaim_net_ee(). If we do this too early, we might
4118 * miss some resync ee and pages.*/
4119 drbd_process_done_ee(mdev);
4120
4121 kfree(mdev->p_uuid);
4122 mdev->p_uuid = NULL;
4123
fb22c402 4124 if (!is_susp(mdev->state))
2f5cdd0b 4125 tl_clear(mdev->tconn);
b411b363 4126
b411b363
PR
4127 drbd_md_sync(mdev);
4128
4129 fp = FP_DONT_CARE;
4130 if (get_ldev(mdev)) {
4131 fp = mdev->ldev->dc.fencing;
4132 put_ldev(mdev);
4133 }
4134
20ceb2b2
LE
4135 /* serialize with bitmap writeout triggered by the state change,
4136 * if any. */
4137 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4138
b411b363
PR
4139 /* tcp_close and release of sendpage pages can be deferred. I don't
4140 * want to use SO_LINGER, because apparently it can be deferred for
4141 * more than 20 seconds (longest time I checked).
4142 *
 4143	 * Actually we don't care exactly when the network stack does its
4144 * put_page(), but release our reference on these pages right here.
4145 */
4146 i = drbd_release_ee(mdev, &mdev->net_ee);
4147 if (i)
4148 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
4149 i = atomic_read(&mdev->pp_in_use_by_net);
4150 if (i)
4151 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
4152 i = atomic_read(&mdev->pp_in_use);
4153 if (i)
45bb912b 4154 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
4155
4156 D_ASSERT(list_empty(&mdev->read_ee));
4157 D_ASSERT(list_empty(&mdev->active_ee));
4158 D_ASSERT(list_empty(&mdev->sync_ee));
4159 D_ASSERT(list_empty(&mdev->done_ee));
4160
4161 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4162 atomic_set(&mdev->current_epoch->epoch_size, 0);
4163 D_ASSERT(list_empty(&mdev->current_epoch->list));
360cc740
PR
4164
4165 return 0;
b411b363
PR
4166}
4167
4168/*
4169 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4170 * we can agree on is stored in agreed_pro_version.
4171 *
4172 * feature flags and the reserved array should be enough room for future
4173 * enhancements of the handshake protocol, and possible plugins...
4174 *
4175 * for now, they are expected to be zero, but ignored.
4176 */
8a22cccc 4177static int drbd_send_handshake(struct drbd_tconn *tconn)
b411b363 4178{
e6b3ea83 4179 /* ASSERT current == mdev->tconn->receiver ... */
5a87d920 4180 struct p_handshake *p = tconn->data.sbuf;
e8d17b01 4181 int err;
b411b363 4182
8a22cccc
PR
4183 if (mutex_lock_interruptible(&tconn->data.mutex)) {
4184 conn_err(tconn, "interrupted during initial handshake\n");
e8d17b01 4185 return -EINTR;
b411b363
PR
4186 }
4187
8a22cccc
PR
4188 if (tconn->data.socket == NULL) {
4189 mutex_unlock(&tconn->data.mutex);
e8d17b01 4190 return -EIO;
b411b363
PR
4191 }
4192
4193 memset(p, 0, sizeof(*p));
4194 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4195 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
e8d17b01 4196 err = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
ecf2363c 4197 &p->head, sizeof(*p), 0);
8a22cccc 4198 mutex_unlock(&tconn->data.mutex);
e8d17b01 4199 return err;
b411b363
PR
4200}
4201
4202/*
4203 * return values:
4204 * 1 yes, we have a valid connection
4205 * 0 oops, did not work out, please try again
4206 * -1 peer talks different language,
4207 * no point in trying again, please go standalone.
4208 */
65d11ed6 4209static int drbd_do_handshake(struct drbd_tconn *tconn)
b411b363 4210{
65d11ed6 4211 /* ASSERT current == tconn->receiver ... */
e6ef8a5c 4212 struct p_handshake *p = tconn->data.rbuf;
02918be2 4213 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
77351055 4214 struct packet_info pi;
a5c31904 4215 int err;
b411b363 4216
e8d17b01
AG
4217 err = drbd_send_handshake(tconn);
4218 if (err)
b411b363
PR
4219 return 0;
4220
69bc7bc3
AG
4221 err = drbd_recv_header(tconn, &pi);
4222 if (err)
b411b363
PR
4223 return 0;
4224
77351055 4225 if (pi.cmd != P_HAND_SHAKE) {
65d11ed6 4226 conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
77351055 4227 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4228 return -1;
4229 }
4230
77351055 4231 if (pi.size != expect) {
65d11ed6 4232 conn_err(tconn, "expected HandShake length: %u, received: %u\n",
77351055 4233 expect, pi.size);
b411b363
PR
4234 return -1;
4235 }
4236
a5c31904
AG
4237 err = drbd_recv_all_warn(tconn, &p->head.payload, expect);
4238 if (err)
b411b363 4239 return 0;
b411b363 4240
b411b363
PR
4241 p->protocol_min = be32_to_cpu(p->protocol_min);
4242 p->protocol_max = be32_to_cpu(p->protocol_max);
4243 if (p->protocol_max == 0)
4244 p->protocol_max = p->protocol_min;
4245
4246 if (PRO_VERSION_MAX < p->protocol_min ||
4247 PRO_VERSION_MIN > p->protocol_max)
4248 goto incompat;
4249
65d11ed6 4250 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363 4251
65d11ed6
PR
4252 conn_info(tconn, "Handshake successful: "
4253 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
b411b363
PR
4254
4255 return 1;
4256
4257 incompat:
65d11ed6 4258 conn_err(tconn, "incompatible DRBD dialects: "
b411b363
PR
4259 "I support %d-%d, peer supports %d-%d\n",
4260 PRO_VERSION_MIN, PRO_VERSION_MAX,
4261 p->protocol_min, p->protocol_max);
4262 return -1;
4263}
4264
4265#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4266static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4267{
4268 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4269 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4270 return -1;
b411b363
PR
4271}
4272#else
4273#define CHALLENGE_LEN 64
b10d96cb
JT
4274
4275/* Return value:
4276 1 - auth succeeded,
4277 0 - failed, try again (network error),
4278 -1 - auth failed, don't try again.
4279*/
4280
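/* Challenge-response authentication with the configured cram-hmac-alg:
 * send a random challenge, receive the peer's challenge, reply with
 * HMAC(shared_secret, peers_challenge), and verify that the peer's response
 * equals HMAC(shared_secret, my_challenge) computed locally. */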
13e6037d 4281static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4282{
4283 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4284 struct scatterlist sg;
4285 char *response = NULL;
4286 char *right_response = NULL;
4287 char *peers_ch = NULL;
13e6037d 4288 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
b411b363
PR
4289 unsigned int resp_size;
4290 struct hash_desc desc;
77351055 4291 struct packet_info pi;
69bc7bc3 4292 int err, rv;
b411b363 4293
13e6037d 4294 desc.tfm = tconn->cram_hmac_tfm;
b411b363
PR
4295 desc.flags = 0;
4296
13e6037d
PR
4297 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4298 (u8 *)tconn->net_conf->shared_secret, key_len);
b411b363 4299 if (rv) {
13e6037d 4300 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4301 rv = -1;
b411b363
PR
4302 goto fail;
4303 }
4304
4305 get_random_bytes(my_challenge, CHALLENGE_LEN);
4306
ce9879cb 4307 rv = !conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
b411b363
PR
4308 if (!rv)
4309 goto fail;
4310
69bc7bc3
AG
4311 err = drbd_recv_header(tconn, &pi);
4312 if (err) {
4313 rv = 0;
b411b363 4314 goto fail;
69bc7bc3 4315 }
b411b363 4316
77351055 4317 if (pi.cmd != P_AUTH_CHALLENGE) {
13e6037d 4318 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
77351055 4319 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4320 rv = 0;
4321 goto fail;
4322 }
4323
77351055 4324 if (pi.size > CHALLENGE_LEN * 2) {
13e6037d 4325 conn_err(tconn, "expected AuthChallenge payload too big.\n");
b10d96cb 4326 rv = -1;
b411b363
PR
4327 goto fail;
4328 }
4329
77351055 4330 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 4331 if (peers_ch == NULL) {
13e6037d 4332 conn_err(tconn, "kmalloc of peers_ch failed\n");
b10d96cb 4333 rv = -1;
b411b363
PR
4334 goto fail;
4335 }
4336
a5c31904
AG
4337 err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4338 if (err) {
b411b363
PR
4339 rv = 0;
4340 goto fail;
4341 }
4342
13e6037d 4343 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
b411b363
PR
4344 response = kmalloc(resp_size, GFP_NOIO);
4345 if (response == NULL) {
13e6037d 4346 conn_err(tconn, "kmalloc of response failed\n");
b10d96cb 4347 rv = -1;
b411b363
PR
4348 goto fail;
4349 }
4350
4351 sg_init_table(&sg, 1);
77351055 4352 sg_set_buf(&sg, peers_ch, pi.size);
b411b363
PR
4353
4354 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4355 if (rv) {
13e6037d 4356 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4357 rv = -1;
b411b363
PR
4358 goto fail;
4359 }
4360
ce9879cb 4361 rv = !conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
b411b363
PR
4362 if (!rv)
4363 goto fail;
4364
69bc7bc3
AG
4365 err = drbd_recv_header(tconn, &pi);
4366 if (err) {
4367 rv = 0;
b411b363 4368 goto fail;
69bc7bc3 4369 }
b411b363 4370
77351055 4371 if (pi.cmd != P_AUTH_RESPONSE) {
13e6037d 4372 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
77351055 4373 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4374 rv = 0;
4375 goto fail;
4376 }
4377
77351055 4378 if (pi.size != resp_size) {
13e6037d 4379 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
b411b363
PR
4380 rv = 0;
4381 goto fail;
4382 }
4383
a5c31904
AG
4384 err = drbd_recv_all_warn(tconn, response , resp_size);
4385 if (err) {
b411b363
PR
4386 rv = 0;
4387 goto fail;
4388 }
4389
4390 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4391 if (right_response == NULL) {
13e6037d 4392 conn_err(tconn, "kmalloc of right_response failed\n");
b10d96cb 4393 rv = -1;
b411b363
PR
4394 goto fail;
4395 }
4396
4397 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4398
4399 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4400 if (rv) {
13e6037d 4401 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4402 rv = -1;
b411b363
PR
4403 goto fail;
4404 }
4405
4406 rv = !memcmp(response, right_response, resp_size);
4407
4408 if (rv)
13e6037d
PR
4409 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4410 resp_size, tconn->net_conf->cram_hmac_alg);
b10d96cb
JT
4411 else
4412 rv = -1;
b411b363
PR
4413
4414 fail:
4415 kfree(peers_ch);
4416 kfree(response);
4417 kfree(right_response);
4418
4419 return rv;
4420}
4421#endif
4422
4423int drbdd_init(struct drbd_thread *thi)
4424{
392c8801 4425 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4426 int h;
4427
4d641dd7 4428 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4429
4430 do {
4d641dd7 4431 h = drbd_connect(tconn);
b411b363 4432 if (h == 0) {
4d641dd7 4433 drbd_disconnect(tconn);
20ee6390 4434 schedule_timeout_interruptible(HZ);
b411b363
PR
4435 }
4436 if (h == -1) {
4d641dd7 4437 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4438 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4439 }
4440 } while (h == 0);
4441
4442 if (h > 0) {
4d641dd7
PR
4443 if (get_net_conf(tconn)) {
4444 drbdd(tconn);
4445 put_net_conf(tconn);
b411b363
PR
4446 }
4447 }
4448
4d641dd7 4449 drbd_disconnect(tconn);
b411b363 4450
4d641dd7 4451 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4452 return 0;
4453}
4454
4455/* ********* acknowledge sender ******** */
4456
e05e1e59 4457static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
e4f78ede 4458{
e6ef8a5c 4459 struct p_req_state_reply *p = tconn->meta.rbuf;
e4f78ede
PR
4460 int retcode = be32_to_cpu(p->retcode);
4461
4462 if (retcode >= SS_SUCCESS) {
4463 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4464 } else {
4465 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4466 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4467 drbd_set_st_err_str(retcode), retcode);
4468 }
4469 wake_up(&tconn->ping_wait);
4470
4471 return true;
4472}
4473
1952e916 4474static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4475{
1952e916
AG
4476 struct drbd_conf *mdev;
4477 struct p_req_state_reply *p = tconn->meta.rbuf;
b411b363
PR
4478 int retcode = be32_to_cpu(p->retcode);
4479
1952e916
AG
4480 mdev = vnr_to_mdev(tconn, pi->vnr);
4481 if (!mdev)
4482 return false;
4483
e4f78ede
PR
4484 if (retcode >= SS_SUCCESS) {
4485 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4486 } else {
4487 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4488 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4489 drbd_set_st_err_str(retcode), retcode);
b411b363 4490 }
e4f78ede
PR
4491 wake_up(&mdev->state_wait);
4492
81e84650 4493 return true;
b411b363
PR
4494}
4495
e05e1e59 4496static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4497{
f19e4f8b 4498 return drbd_send_ping_ack(tconn);
b411b363
PR
4499
4500}
4501
e05e1e59 4502static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363
PR
4503{
4504 /* restore idle timeout */
2a67d8b9
PR
4505 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4506 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4507 wake_up(&tconn->ping_wait);
b411b363 4508
81e84650 4509 return true;
b411b363
PR
4510}
4511
1952e916 4512static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4513{
1952e916
AG
4514 struct drbd_conf *mdev;
4515 struct p_block_ack *p = tconn->meta.rbuf;
b411b363
PR
4516 sector_t sector = be64_to_cpu(p->sector);
4517 int blksize = be32_to_cpu(p->blksize);
4518
1952e916
AG
4519 mdev = vnr_to_mdev(tconn, pi->vnr);
4520 if (!mdev)
4521 return false;
4522
31890f4a 4523 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4524
4525 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4526
1d53f09e
LE
4527 if (get_ldev(mdev)) {
4528 drbd_rs_complete_io(mdev, sector);
4529 drbd_set_in_sync(mdev, sector, blksize);
4530 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4531 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4532 put_ldev(mdev);
4533 }
b411b363 4534 dec_rs_pending(mdev);
778f271d 4535 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4536
81e84650 4537 return true;
b411b363
PR
4538}
4539
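/* Look up the request identified by (id, sector) in the given tree of
 * pending requests and feed the event 'what' into its state machine;
 * complete the master bio if that transition finished the request. */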
bc9c5c41
AG
4540static int
4541validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4542 struct rb_root *root, const char *func,
4543 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4544{
4545 struct drbd_request *req;
4546 struct bio_and_error m;
4547
87eeee41 4548 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4549 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4550 if (unlikely(!req)) {
87eeee41 4551 spin_unlock_irq(&mdev->tconn->req_lock);
81e84650 4552 return false;
b411b363
PR
4553 }
4554 __req_mod(req, what, &m);
87eeee41 4555 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4556
4557 if (m.bio)
4558 complete_master_bio(mdev, &m);
81e84650 4559 return true;
b411b363
PR
4560}
4561
1952e916 4562static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4563{
1952e916
AG
4564 struct drbd_conf *mdev;
4565 struct p_block_ack *p = tconn->meta.rbuf;
b411b363
PR
4566 sector_t sector = be64_to_cpu(p->sector);
4567 int blksize = be32_to_cpu(p->blksize);
4568 enum drbd_req_event what;
4569
1952e916
AG
4570 mdev = vnr_to_mdev(tconn, pi->vnr);
4571 if (!mdev)
4572 return false;
4573
b411b363
PR
4574 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4575
579b57ed 4576 if (p->block_id == ID_SYNCER) {
b411b363
PR
4577 drbd_set_in_sync(mdev, sector, blksize);
4578 dec_rs_pending(mdev);
81e84650 4579 return true;
b411b363 4580 }
e05e1e59 4581 switch (pi->cmd) {
b411b363 4582 case P_RS_WRITE_ACK:
89e58e75 4583 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4584 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4585 break;
4586 case P_WRITE_ACK:
89e58e75 4587 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4588 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4589 break;
4590 case P_RECV_ACK:
89e58e75 4591 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
8554df1c 4592 what = RECV_ACKED_BY_PEER;
b411b363 4593 break;
7be8da07 4594 case P_DISCARD_WRITE:
89e58e75 4595 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
7be8da07
AG
4596 what = DISCARD_WRITE;
4597 break;
4598 case P_RETRY_WRITE:
4599 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4600 what = POSTPONE_WRITE;
b411b363
PR
4601 break;
4602 default:
4603 D_ASSERT(0);
81e84650 4604 return false;
b411b363
PR
4605 }
4606
4607 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41
AG
4608 &mdev->write_requests, __func__,
4609 what, false);
b411b363
PR
4610}
4611
1952e916 4612static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4613{
1952e916
AG
4614 struct drbd_conf *mdev;
4615 struct p_block_ack *p = tconn->meta.rbuf;
b411b363 4616 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4617 int size = be32_to_cpu(p->blksize);
1952e916
AG
4618 bool missing_ok = tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4619 tconn->net_conf->wire_protocol == DRBD_PROT_B;
c3afd8f5 4620 bool found;
b411b363 4621
1952e916
AG
4622 mdev = vnr_to_mdev(tconn, pi->vnr);
4623 if (!mdev)
4624 return false;
4625
b411b363
PR
4626 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4627
579b57ed 4628 if (p->block_id == ID_SYNCER) {
b411b363
PR
4629 dec_rs_pending(mdev);
4630 drbd_rs_failed_io(mdev, sector, size);
81e84650 4631 return true;
b411b363 4632 }
2deb8336 4633
c3afd8f5 4634 found = validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4635 &mdev->write_requests, __func__,
8554df1c 4636 NEG_ACKED, missing_ok);
c3afd8f5
AG
4637 if (!found) {
4638 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4639 The master bio might already be completed, therefore the
4640 request is no longer in the collision hash. */
4641 /* In Protocol B we might already have got a P_RECV_ACK
4642 but then get a P_NEG_ACK afterwards. */
4643 if (!missing_ok)
2deb8336 4644 return false;
c3afd8f5 4645 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4646 }
2deb8336 4647 return true;
b411b363
PR
4648}
4649
1952e916 4650static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4651{
1952e916
AG
4652 struct drbd_conf *mdev;
4653 struct p_block_ack *p = tconn->meta.rbuf;
b411b363
PR
4654 sector_t sector = be64_to_cpu(p->sector);
4655
1952e916
AG
4656 mdev = vnr_to_mdev(tconn, pi->vnr);
4657 if (!mdev)
4658 return false;
4659
b411b363 4660 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
7be8da07 4661
b411b363
PR
4662 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4663 (unsigned long long)sector, be32_to_cpu(p->blksize));
4664
4665 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4666 &mdev->read_requests, __func__,
8554df1c 4667 NEG_ACKED, false);
b411b363
PR
4668}
4669
1952e916 4670static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4671{
1952e916 4672 struct drbd_conf *mdev;
b411b363
PR
4673 sector_t sector;
4674 int size;
1952e916
AG
4675 struct p_block_ack *p = tconn->meta.rbuf;
4676
4677 mdev = vnr_to_mdev(tconn, pi->vnr);
4678 if (!mdev)
4679 return false;
b411b363
PR
4680
4681 sector = be64_to_cpu(p->sector);
4682 size = be32_to_cpu(p->blksize);
b411b363
PR
4683
4684 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4685
4686 dec_rs_pending(mdev);
4687
4688 if (get_ldev_if_state(mdev, D_FAILED)) {
4689 drbd_rs_complete_io(mdev, sector);
e05e1e59 4690 switch (pi->cmd) {
d612d309
PR
4691 case P_NEG_RS_DREPLY:
4692 drbd_rs_failed_io(mdev, sector, size);
4693 case P_RS_CANCEL:
4694 break;
4695 default:
4696 D_ASSERT(0);
4697 put_ldev(mdev);
4698 return false;
4699 }
b411b363
PR
4700 put_ldev(mdev);
4701 }
4702
81e84650 4703 return true;
b411b363
PR
4704}
4705
1952e916 4706static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4707{
1952e916
AG
4708 struct drbd_conf *mdev;
4709 struct p_barrier_ack *p = tconn->meta.rbuf;
4710
4711 mdev = vnr_to_mdev(tconn, pi->vnr);
4712 if (!mdev)
4713 return false;
b411b363 4714
2f5cdd0b 4715 tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
b411b363 4716
c4752ef1
PR
4717 if (mdev->state.conn == C_AHEAD &&
4718 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
4719 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4720 mdev->start_resync_timer.expires = jiffies + HZ;
4721 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4722 }
4723
81e84650 4724 return true;
b411b363
PR
4725}
4726
1952e916 4727static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4728{
1952e916
AG
4729 struct drbd_conf *mdev;
4730 struct p_block_ack *p = tconn->meta.rbuf;
b411b363
PR
4731 struct drbd_work *w;
4732 sector_t sector;
4733 int size;
4734
1952e916
AG
4735 mdev = vnr_to_mdev(tconn, pi->vnr);
4736 if (!mdev)
4737 return false;
4738
b411b363
PR
4739 sector = be64_to_cpu(p->sector);
4740 size = be32_to_cpu(p->blksize);
4741
4742 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4743
4744 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
8f7bed77 4745 drbd_ov_out_of_sync_found(mdev, sector, size);
b411b363 4746 else
8f7bed77 4747 ov_out_of_sync_print(mdev);
b411b363 4748
1d53f09e 4749 if (!get_ldev(mdev))
81e84650 4750 return true;
1d53f09e 4751
b411b363
PR
4752 drbd_rs_complete_io(mdev, sector);
4753 dec_rs_pending(mdev);
4754
ea5442af
LE
4755 --mdev->ov_left;
4756
4757 /* let's advance progress step marks only for every other megabyte */
4758 if ((mdev->ov_left & 0x200) == 0x200)
4759 drbd_advance_rs_marks(mdev, mdev->ov_left);
4760
4761 if (mdev->ov_left == 0) {
b411b363
PR
4762 w = kmalloc(sizeof(*w), GFP_NOIO);
4763 if (w) {
4764 w->cb = w_ov_finished;
a21e9298 4765 w->mdev = mdev;
e42325a5 4766 drbd_queue_work_front(&mdev->tconn->data.work, w);
b411b363
PR
4767 } else {
4768 dev_err(DEV, "kmalloc(w) failed.");
8f7bed77 4769 ov_out_of_sync_print(mdev);
b411b363
PR
4770 drbd_resync_finished(mdev);
4771 }
4772 }
1d53f09e 4773 put_ldev(mdev);
81e84650 4774 return true;
b411b363
PR
4775}
4776
1952e916 4777static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
0ced55a3 4778{
81e84650 4779 return true;
0ced55a3
PR
4780}
4781
32862ec7
PR
4782static int tconn_process_done_ee(struct drbd_tconn *tconn)
4783{
082a3439
PR
4784 struct drbd_conf *mdev;
4785 int i, not_empty = 0;
32862ec7
PR
4786
4787 do {
4788 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4789 flush_signals(current);
082a3439 4790 idr_for_each_entry(&tconn->volumes, mdev, i) {
e2b3032b 4791 if (drbd_process_done_ee(mdev))
082a3439
PR
4792 return 1; /* error */
4793 }
32862ec7 4794 set_bit(SIGNAL_ASENDER, &tconn->flags);
082a3439
PR
4795
4796 spin_lock_irq(&tconn->req_lock);
4797 idr_for_each_entry(&tconn->volumes, mdev, i) {
4798 not_empty = !list_empty(&mdev->done_ee);
4799 if (not_empty)
4800 break;
4801 }
4802 spin_unlock_irq(&tconn->req_lock);
32862ec7
PR
4803 } while (not_empty);
4804
4805 return 0;
4806}
4807
7201b972
AG
4808struct asender_cmd {
4809 size_t pkt_size;
1952e916 4810 int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
7201b972
AG
4811};
4812
4813static struct asender_cmd asender_tbl[] = {
1952e916
AG
4814 [P_PING] = { sizeof(struct p_header), got_Ping },
4815 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
4816 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4817 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4818 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4819 [P_DISCARD_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
4820 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4821 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4822 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
4823 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4824 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4825 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4826 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
4827 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
4828 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
4829 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
4830 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
7201b972
AG
4831};
4832
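/* The asender thread services the meta socket: it sends pings on request,
 * processes completed epoch entries so their acknowledgements go out, and
 * receives and dispatches ack packets via asender_tbl[], forcing a reconnect
 * on timeouts or protocol violations. */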
b411b363
PR
4833int drbd_asender(struct drbd_thread *thi)
4834{
392c8801 4835 struct drbd_tconn *tconn = thi->tconn;
e6ef8a5c 4836 struct p_header *h = tconn->meta.rbuf;
b411b363 4837 struct asender_cmd *cmd = NULL;
77351055 4838 struct packet_info pi;
257d0af6 4839 int rv;
b411b363
PR
4840 void *buf = h;
4841 int received = 0;
257d0af6 4842 int expect = sizeof(struct p_header);
f36af18c 4843 int ping_timeout_active = 0;
b411b363 4844
b411b363
PR
4845 current->policy = SCHED_RR; /* Make this a realtime task! */
4846 current->rt_priority = 2; /* more important than all other tasks */
4847
e77a0a5c 4848 while (get_t_state(thi) == RUNNING) {
80822284 4849 drbd_thread_current_set_cpu(thi);
32862ec7 4850 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
2a67d8b9 4851 if (!drbd_send_ping(tconn)) {
32862ec7 4852 conn_err(tconn, "drbd_send_ping has failed\n");
841ce241
AG
4853 goto reconnect;
4854 }
32862ec7
PR
4855 tconn->meta.socket->sk->sk_rcvtimeo =
4856 tconn->net_conf->ping_timeo*HZ/10;
f36af18c 4857 ping_timeout_active = 1;
b411b363
PR
4858 }
4859
32862ec7
PR
4860 /* TODO: conditionally cork; it may hurt latency if we cork without
4861 much to send */
4862 if (!tconn->net_conf->no_cork)
4863 drbd_tcp_cork(tconn->meta.socket);
082a3439
PR
4864 if (tconn_process_done_ee(tconn)) {
4865 conn_err(tconn, "tconn_process_done_ee() failed\n");
32862ec7 4866 goto reconnect;
082a3439 4867 }
b411b363 4868 /* but unconditionally uncork unless disabled */
32862ec7
PR
4869 if (!tconn->net_conf->no_cork)
4870 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
4871
4872 /* short circuit, recv_msg would return EINTR anyways. */
4873 if (signal_pending(current))
4874 continue;
4875
32862ec7
PR
4876 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4877 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
4878
4879 flush_signals(current);
4880
4881 /* Note:
4882 * -EINTR (on meta) we got a signal
4883 * -EAGAIN (on meta) rcvtimeo expired
4884 * -ECONNRESET other side closed the connection
4885 * -ERESTARTSYS (on data) we got a signal
4886 * rv < 0 other than above: unexpected error!
4887 * rv == expected: full header or command
4888 * rv < expected: "woken" by signal during receive
4889 * rv == 0 : "connection shut down by peer"
4890 */
4891 if (likely(rv > 0)) {
4892 received += rv;
4893 buf += rv;
4894 } else if (rv == 0) {
32862ec7 4895 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
4896 goto reconnect;
4897 } else if (rv == -EAGAIN) {
cb6518cb
LE
4898 /* If the data socket received something meanwhile,
4899 * that is good enough: peer is still alive. */
32862ec7
PR
4900 if (time_after(tconn->last_received,
4901 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 4902 continue;
f36af18c 4903 if (ping_timeout_active) {
32862ec7 4904 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
4905 goto reconnect;
4906 }
32862ec7 4907 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
4908 continue;
4909 } else if (rv == -EINTR) {
4910 continue;
4911 } else {
32862ec7 4912 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
4913 goto reconnect;
4914 }
4915
4916 if (received == expect && cmd == NULL) {
8172f3e9 4917 if (decode_header(tconn, h, &pi))
b411b363 4918 goto reconnect;
7201b972 4919 cmd = &asender_tbl[pi.cmd];
1952e916 4920 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
32862ec7 4921 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
77351055 4922 pi.cmd, pi.size);
b411b363
PR
4923 goto disconnect;
4924 }
4925 expect = cmd->pkt_size;
77351055 4926 if (pi.size != expect - sizeof(struct p_header)) {
32862ec7 4927 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 4928 pi.cmd, pi.size);
b411b363 4929 goto reconnect;
257d0af6 4930 }
b411b363
PR
4931 }
4932 if (received == expect) {
a4fbda8e
PR
4933 bool rv;
4934
1952e916
AG
4935 rv = cmd->fn(tconn, &pi);
4936 if (!rv) {
4937 conn_err(tconn, "%pf failed\n", cmd->fn);
b411b363 4938 goto reconnect;
1952e916 4939 }
b411b363 4940
a4fbda8e
PR
4941 tconn->last_received = jiffies;
4942
f36af18c
LE
4943 /* the idle_timeout (ping-int)
4944 * has been restored in got_PingAck() */
7201b972 4945 if (cmd == &asender_tbl[P_PING_ACK])
f36af18c
LE
4946 ping_timeout_active = 0;
4947
b411b363
PR
4948 buf = h;
4949 received = 0;
257d0af6 4950 expect = sizeof(struct p_header);
b411b363
PR
4951 cmd = NULL;
4952 }
4953 }
4954
4955 if (0) {
4956reconnect:
bbeb641c 4957 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
4958 }
4959 if (0) {
4960disconnect:
bbeb641c 4961 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 4962 }
32862ec7 4963 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 4964
32862ec7 4965 conn_info(tconn, "asender terminated\n");
b411b363
PR
4966
4967 return 0;
4968}