drbd: Use ping-timeout when waiting for missing ack packets
drivers/block/drbd/drbd_receiver.c
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
47#include "drbd_req.h"
48
49#include "drbd_vli.h"
50
51struct packet_info {
52 enum drbd_packet cmd;
53 int size;
54 int vnr;
55};
56
57enum finish_epoch {
58 FE_STILL_LIVE,
59 FE_DESTROYED,
60 FE_RECYCLED,
61};
62
65d11ed6 63static int drbd_do_handshake(struct drbd_tconn *tconn);
13e6037d 64static int drbd_do_auth(struct drbd_tconn *tconn);
360cc740 65static int drbd_disconnected(int vnr, void *p, void *data);
66
67static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
00d56944 68static int e_end_block(struct drbd_work *, int);
b411b363 69
70
71#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
72
73/*
74 * some helper functions to deal with single linked page lists,
75 * page->private being our "next" pointer.
76 */
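/* Illustrative example (added, not in the original source): a chain of
 * three pages P1 -> P2 -> P3 is stored as page_private(P1) == P2,
 * page_private(P2) == P3, and page_private(P3) == 0, the end-of-chain
 * marker that page_chain_del() sets on the last page it hands out. */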
77
78/* If at least n pages are linked at head, get n pages off.
79 * Otherwise, don't modify head, and return NULL.
80 * Locking is the responsibility of the caller.
81 */
82static struct page *page_chain_del(struct page **head, int n)
83{
84 struct page *page;
85 struct page *tmp;
86
87 BUG_ON(!n);
88 BUG_ON(!head);
89
90 page = *head;
91
92 if (!page)
93 return NULL;
94
95 while (page) {
96 tmp = page_chain_next(page);
97 if (--n == 0)
98 break; /* found sufficient pages */
99 if (tmp == NULL)
100 /* insufficient pages, don't use any of them. */
101 return NULL;
102 page = tmp;
103 }
104
105 /* add end of list marker for the returned list */
106 set_page_private(page, 0);
107 /* actual return value, and adjustment of head */
108 page = *head;
109 *head = tmp;
110 return page;
111}
112
113/* may be used outside of locks to find the tail of a (usually short)
114 * "private" page chain, before adding it back to a global chain head
115 * with page_chain_add() under a spinlock. */
116static struct page *page_chain_tail(struct page *page, int *len)
117{
118 struct page *tmp;
119 int i = 1;
120 while ((tmp = page_chain_next(page)))
121 ++i, page = tmp;
122 if (len)
123 *len = i;
124 return page;
125}
126
127static int page_chain_free(struct page *page)
128{
129 struct page *tmp;
130 int i = 0;
131 page_chain_for_each_safe(page, tmp) {
132 put_page(page);
133 ++i;
134 }
135 return i;
136}
137
138static void page_chain_add(struct page **head,
139 struct page *chain_first, struct page *chain_last)
140{
141#if 1
142 struct page *tmp;
143 tmp = page_chain_tail(chain_first, NULL);
144 BUG_ON(tmp != chain_last);
145#endif
146
147 /* add chain to head */
148 set_page_private(chain_last, (unsigned long)*head);
149 *head = chain_first;
150}
151
152static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
153{
154 struct page *page = NULL;
155 struct page *tmp = NULL;
156 int i = 0;
157
158 /* Yes, testing drbd_pp_vacant outside the lock is racy.
159 * So what. It saves a spin_lock. */
45bb912b 160 if (drbd_pp_vacant >= number) {
b411b363 161 spin_lock(&drbd_pp_lock);
162 page = page_chain_del(&drbd_pp_pool, number);
163 if (page)
164 drbd_pp_vacant -= number;
b411b363 165 spin_unlock(&drbd_pp_lock);
166 if (page)
167 return page;
b411b363 168 }
45bb912b 169
170 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
171 * "criss-cross" setup, that might cause write-out on some other DRBD,
172 * which in turn might block on the other node at this very place. */
173 for (i = 0; i < number; i++) {
174 tmp = alloc_page(GFP_TRY);
175 if (!tmp)
176 break;
177 set_page_private(tmp, (unsigned long)page);
178 page = tmp;
179 }
180
181 if (i == number)
182 return page;
183
184 /* Not enough pages immediately available this time.
185 * No need to jump around here, drbd_pp_alloc will retry this
186 * function "soon". */
187 if (page) {
188 tmp = page_chain_tail(page, NULL);
189 spin_lock(&drbd_pp_lock);
190 page_chain_add(&drbd_pp_pool, page, tmp);
191 drbd_pp_vacant += i;
192 spin_unlock(&drbd_pp_lock);
193 }
194 return NULL;
195}
196
197static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
198{
db830c46 199 struct drbd_peer_request *peer_req;
200 struct list_head *le, *tle;
201
202 /* The EEs are always appended to the end of the list. Since
203 they are sent in order over the wire, they have to finish
 204 in order. As soon as we see the first one that has not finished, we can
 205 stop examining the list... */
206
207 list_for_each_safe(le, tle, &mdev->net_ee) {
208 peer_req = list_entry(le, struct drbd_peer_request, w.list);
209 if (drbd_ee_has_active_page(peer_req))
210 break;
211 list_move(le, to_be_freed);
212 }
213}
214
215static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
216{
217 LIST_HEAD(reclaimed);
db830c46 218 struct drbd_peer_request *peer_req, *t;
b411b363 219
87eeee41 220 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 221 reclaim_net_ee(mdev, &reclaimed);
87eeee41 222 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 223
224 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
225 drbd_free_net_ee(mdev, peer_req);
226}
227
228/**
45bb912b 229 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
b411b363 230 * @mdev: DRBD device.
231 * @number: number of pages requested
232 * @retry: whether to retry, if not enough pages are available right now
233 *
234 * Tries to allocate number pages, first from our own page pool, then from
235 * the kernel, unless this allocation would exceed the max_buffers setting.
236 * Possibly retry until DRBD frees sufficient pages somewhere else.
b411b363 237 *
45bb912b 238 * Returns a page chain linked via page->private.
b411b363 239 */
45bb912b 240static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
241{
242 struct page *page = NULL;
243 DEFINE_WAIT(wait);
244
245 /* Yes, we may run up to @number over max_buffers. If we
246 * follow it strictly, the admin will get it wrong anyways. */
89e58e75 247 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
45bb912b 248 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363 249
45bb912b 250 while (page == NULL) {
251 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
252
253 drbd_kick_lo_and_reclaim_net(mdev);
254
89e58e75 255 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
45bb912b 256 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
257 if (page)
258 break;
259 }
260
261 if (!retry)
262 break;
263
264 if (signal_pending(current)) {
265 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
266 break;
267 }
268
269 schedule();
270 }
271 finish_wait(&drbd_pp_wait, &wait);
272
273 if (page)
274 atomic_add(number, &mdev->pp_in_use);
275 return page;
276}
277
278/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 279 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
280 * Either links the page chain back to the global pool,
281 * or returns all pages to the system. */
435f0740 282static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
b411b363 283{
435f0740 284 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
b411b363 285 int i;
435f0740 286
1816a2b4 287 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
288 i = page_chain_free(page);
289 else {
290 struct page *tmp;
291 tmp = page_chain_tail(page, &i);
292 spin_lock(&drbd_pp_lock);
293 page_chain_add(&drbd_pp_pool, page, tmp);
294 drbd_pp_vacant += i;
295 spin_unlock(&drbd_pp_lock);
b411b363 296 }
435f0740 297 i = atomic_sub_return(i, a);
45bb912b 298 if (i < 0)
299 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
300 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
301 wake_up(&drbd_pp_wait);
302}
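/* Note (added for clarity): drbd_pp_free() above returns the chain to the
 * global drbd_pp_pool only while drbd_pp_vacant is at or below
 * (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count; above that threshold the pages
 * are released back to the system via page_chain_free() instead. */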
303
304/*
305You need to hold the req_lock:
306 _drbd_wait_ee_list_empty()
307
308You must not have the req_lock:
309 drbd_free_ee()
310 drbd_alloc_ee()
311 drbd_init_ee()
312 drbd_release_ee()
313 drbd_ee_fix_bhs()
314 drbd_process_done_ee()
315 drbd_clear_done_ee()
316 drbd_wait_ee_list_empty()
317*/
318
319struct drbd_peer_request *
320drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
321 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
b411b363 322{
db830c46 323 struct drbd_peer_request *peer_req;
b411b363 324 struct page *page;
45bb912b 325 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
b411b363 326
0cf9d27e 327 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
328 return NULL;
329
330 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
331 if (!peer_req) {
332 if (!(gfp_mask & __GFP_NOWARN))
333 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
334 return NULL;
335 }
336
337 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
338 if (!page)
339 goto fail;
b411b363 340
341 drbd_clear_interval(&peer_req->i);
342 peer_req->i.size = data_size;
343 peer_req->i.sector = sector;
344 peer_req->i.local = false;
345 peer_req->i.waiting = false;
346
347 peer_req->epoch = NULL;
a21e9298 348 peer_req->w.mdev = mdev;
349 peer_req->pages = page;
350 atomic_set(&peer_req->pending_bios, 0);
351 peer_req->flags = 0;
352 /*
353 * The block_id is opaque to the receiver. It is not endianness
354 * converted, and sent back to the sender unchanged.
355 */
db830c46 356 peer_req->block_id = id;
b411b363 357
db830c46 358 return peer_req;
b411b363 359
45bb912b 360 fail:
db830c46 361 mempool_free(peer_req, drbd_ee_mempool);
362 return NULL;
363}
364
db830c46 365void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
f6ffca9f 366 int is_net)
b411b363 367{
368 if (peer_req->flags & EE_HAS_DIGEST)
369 kfree(peer_req->digest);
370 drbd_pp_free(mdev, peer_req->pages, is_net);
371 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
372 D_ASSERT(drbd_interval_empty(&peer_req->i));
373 mempool_free(peer_req, drbd_ee_mempool);
374}
375
376int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
377{
378 LIST_HEAD(work_list);
db830c46 379 struct drbd_peer_request *peer_req, *t;
b411b363 380 int count = 0;
435f0740 381 int is_net = list == &mdev->net_ee;
b411b363 382
87eeee41 383 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 384 list_splice_init(list, &work_list);
87eeee41 385 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 386
387 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
388 drbd_free_some_ee(mdev, peer_req, is_net);
389 count++;
390 }
391 return count;
392}
393
394
32862ec7 395/* See also comments in _req_mod(,BARRIER_ACKED)
396 * and receive_Barrier.
397 *
398 * Move entries from net_ee to done_ee, if ready.
399 * Grab done_ee, call all callbacks, free the entries.
400 * The callbacks typically send out ACKs.
401 */
402static int drbd_process_done_ee(struct drbd_conf *mdev)
403{
404 LIST_HEAD(work_list);
405 LIST_HEAD(reclaimed);
db830c46 406 struct drbd_peer_request *peer_req, *t;
407 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
408
87eeee41 409 spin_lock_irq(&mdev->tconn->req_lock);
410 reclaim_net_ee(mdev, &reclaimed);
411 list_splice_init(&mdev->done_ee, &work_list);
87eeee41 412 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 413
414 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
415 drbd_free_net_ee(mdev, peer_req);
416
417 /* possible callbacks here:
418 * e_end_block, and e_end_resync_block, e_send_discard_ack.
419 * all ignore the last argument.
420 */
db830c46 421 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
b411b363 422 /* list_del not necessary, next/prev members not touched */
00d56944 423 ok = peer_req->w.cb(&peer_req->w, !ok) && ok;
db830c46 424 drbd_free_ee(mdev, peer_req);
425 }
426 wake_up(&mdev->ee_wait);
427
428 return ok;
429}
430
431void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
432{
433 DEFINE_WAIT(wait);
434
435 /* avoids spin_lock/unlock
436 * and calling prepare_to_wait in the fast path */
437 while (!list_empty(head)) {
438 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
87eeee41 439 spin_unlock_irq(&mdev->tconn->req_lock);
7eaceacc 440 io_schedule();
b411b363 441 finish_wait(&mdev->ee_wait, &wait);
87eeee41 442 spin_lock_irq(&mdev->tconn->req_lock);
443 }
444}
445
446void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
447{
87eeee41 448 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 449 _drbd_wait_ee_list_empty(mdev, head);
87eeee41 450 spin_unlock_irq(&mdev->tconn->req_lock);
451}
452
453/* see also kernel_accept; which is only present since 2.6.18.
454 * also we want to log which part of it failed, exactly */
7653620d 455static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
456{
457 struct sock *sk = sock->sk;
458 int err = 0;
459
460 *what = "listen";
461 err = sock->ops->listen(sock, 5);
462 if (err < 0)
463 goto out;
464
465 *what = "sock_create_lite";
466 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
467 newsock);
468 if (err < 0)
469 goto out;
470
471 *what = "accept";
472 err = sock->ops->accept(sock, *newsock, 0);
473 if (err < 0) {
474 sock_release(*newsock);
475 *newsock = NULL;
476 goto out;
477 }
478 (*newsock)->ops = sock->ops;
479
480out:
481 return err;
482}
483
dbd9eea0 484static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
485{
486 mm_segment_t oldfs;
487 struct kvec iov = {
488 .iov_base = buf,
489 .iov_len = size,
490 };
491 struct msghdr msg = {
492 .msg_iovlen = 1,
493 .msg_iov = (struct iovec *)&iov,
494 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
495 };
496 int rv;
497
498 oldfs = get_fs();
499 set_fs(KERNEL_DS);
500 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
501 set_fs(oldfs);
502
503 return rv;
504}
505
de0ff338 506static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
507{
508 mm_segment_t oldfs;
509 struct kvec iov = {
510 .iov_base = buf,
511 .iov_len = size,
512 };
513 struct msghdr msg = {
514 .msg_iovlen = 1,
515 .msg_iov = (struct iovec *)&iov,
516 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
517 };
518 int rv;
519
520 oldfs = get_fs();
521 set_fs(KERNEL_DS);
522
523 for (;;) {
de0ff338 524 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
525 if (rv == size)
526 break;
527
528 /* Note:
529 * ECONNRESET other side closed the connection
530 * ERESTARTSYS (on sock) we got a signal
531 */
532
533 if (rv < 0) {
534 if (rv == -ECONNRESET)
de0ff338 535 conn_info(tconn, "sock was reset by peer\n");
b411b363 536 else if (rv != -ERESTARTSYS)
de0ff338 537 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
538 break;
539 } else if (rv == 0) {
de0ff338 540 conn_info(tconn, "sock was shut down by peer\n");
541 break;
542 } else {
543 /* signal came in, or peer/link went down,
544 * after we read a partial message
545 */
546 /* D_ASSERT(signal_pending(current)); */
547 break;
548 }
549 };
550
551 set_fs(oldfs);
552
553 if (rv != size)
bbeb641c 554 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
555
556 return rv;
557}
558
559/* quoting tcp(7):
560 * On individual connections, the socket buffer size must be set prior to the
561 * listen(2) or connect(2) calls in order to have it take effect.
562 * This is our wrapper to do so.
563 */
564static void drbd_setbufsize(struct socket *sock, unsigned int snd,
565 unsigned int rcv)
566{
567 /* open coded SO_SNDBUF, SO_RCVBUF */
568 if (snd) {
569 sock->sk->sk_sndbuf = snd;
570 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
571 }
572 if (rcv) {
573 sock->sk->sk_rcvbuf = rcv;
574 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
575 }
576}
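/* Added note: both drbd_try_connect() and drbd_wait_for_connect() below call
 * drbd_setbufsize() right after sock_create_kern(), i.e. before connect()
 * respectively before bind()/listen(), which is what the tcp(7) quote above
 * requires. */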
577
eac3e990 578static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
579{
580 const char *what;
581 struct socket *sock;
582 struct sockaddr_in6 src_in6;
583 int err;
584 int disconnect_on_error = 1;
585
eac3e990 586 if (!get_net_conf(tconn))
587 return NULL;
588
589 what = "sock_create_kern";
eac3e990 590 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
591 SOCK_STREAM, IPPROTO_TCP, &sock);
592 if (err < 0) {
593 sock = NULL;
594 goto out;
595 }
596
597 sock->sk->sk_rcvtimeo =
598 sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
599 drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
600 tconn->net_conf->rcvbuf_size);
601
602 /* explicitly bind to the configured IP as source IP
603 * for the outgoing connections.
604 * This is needed for multihomed hosts and to be
605 * able to use lo: interfaces for drbd.
606 * Make sure to use 0 as port number, so linux selects
607 * a free one dynamically.
608 */
609 memcpy(&src_in6, tconn->net_conf->my_addr,
610 min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
611 if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
612 src_in6.sin6_port = 0;
613 else
614 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
615
616 what = "bind before connect";
617 err = sock->ops->bind(sock,
618 (struct sockaddr *) &src_in6,
eac3e990 619 tconn->net_conf->my_addr_len);
620 if (err < 0)
621 goto out;
622
623 /* connect may fail, peer not yet available.
624 * stay C_WF_CONNECTION, don't go Disconnecting! */
625 disconnect_on_error = 0;
626 what = "connect";
627 err = sock->ops->connect(sock,
628 (struct sockaddr *)tconn->net_conf->peer_addr,
629 tconn->net_conf->peer_addr_len, 0);
630
631out:
632 if (err < 0) {
633 if (sock) {
634 sock_release(sock);
635 sock = NULL;
636 }
637 switch (-err) {
638 /* timeout, busy, signal pending */
639 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
640 case EINTR: case ERESTARTSYS:
641 /* peer not (yet) available, network problem */
642 case ECONNREFUSED: case ENETUNREACH:
643 case EHOSTDOWN: case EHOSTUNREACH:
644 disconnect_on_error = 0;
645 break;
646 default:
eac3e990 647 conn_err(tconn, "%s failed, err = %d\n", what, err);
648 }
649 if (disconnect_on_error)
bbeb641c 650 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 651 }
eac3e990 652 put_net_conf(tconn);
653 return sock;
654}
655
7653620d 656static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
657{
658 int timeo, err;
659 struct socket *s_estab = NULL, *s_listen;
660 const char *what;
661
7653620d 662 if (!get_net_conf(tconn))
663 return NULL;
664
665 what = "sock_create_kern";
7653620d 666 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
667 SOCK_STREAM, IPPROTO_TCP, &s_listen);
668 if (err) {
669 s_listen = NULL;
670 goto out;
671 }
672
7653620d 673 timeo = tconn->net_conf->try_connect_int * HZ;
674 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
675
676 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
677 s_listen->sk->sk_rcvtimeo = timeo;
678 s_listen->sk->sk_sndtimeo = timeo;
679 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
680 tconn->net_conf->rcvbuf_size);
681
682 what = "bind before listen";
683 err = s_listen->ops->bind(s_listen,
684 (struct sockaddr *) tconn->net_conf->my_addr,
685 tconn->net_conf->my_addr_len);
686 if (err < 0)
687 goto out;
688
7653620d 689 err = drbd_accept(&what, s_listen, &s_estab);
690
691out:
692 if (s_listen)
693 sock_release(s_listen);
694 if (err < 0) {
695 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
7653620d 696 conn_err(tconn, "%s failed, err = %d\n", what, err);
bbeb641c 697 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
698 }
699 }
7653620d 700 put_net_conf(tconn);
701
702 return s_estab;
703}
704
d38e787e 705static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
b411b363 706{
d38e787e 707 struct p_header *h = &tconn->data.sbuf.header;
b411b363 708
d38e787e 709 return _conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
710}
711
a25b63f1 712static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
b411b363 713{
a25b63f1 714 struct p_header80 *h = &tconn->data.rbuf.header.h80;
715 int rr;
716
dbd9eea0 717 rr = drbd_recv_short(sock, h, sizeof(*h), 0);
b411b363 718
ca9bc12b 719 if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
720 return be16_to_cpu(h->command);
721
722 return 0xffff;
723}
724
725/**
726 * drbd_socket_okay() - Free the socket if its connection is not okay
727 * @sock: pointer to the pointer to the socket.
728 */
dbd9eea0 729static int drbd_socket_okay(struct socket **sock)
730{
731 int rr;
732 char tb[4];
733
734 if (!*sock)
81e84650 735 return false;
b411b363 736
dbd9eea0 737 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
738
739 if (rr > 0 || rr == -EAGAIN) {
81e84650 740 return true;
741 } else {
742 sock_release(*sock);
743 *sock = NULL;
81e84650 744 return false;
745 }
746}
747
748static int drbd_connected(int vnr, void *p, void *data)
749{
750 struct drbd_conf *mdev = (struct drbd_conf *)p;
751 int ok = 1;
752
753 atomic_set(&mdev->packet_seq, 0);
754 mdev->peer_seq = 0;
755
756 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
757 &mdev->tconn->cstate_mutex :
758 &mdev->own_state_mutex;
759
760 ok &= drbd_send_sync_param(mdev, &mdev->sync_conf);
761 ok &= drbd_send_sizes(mdev, 0, 0);
762 ok &= drbd_send_uuids(mdev);
763 ok &= drbd_send_state(mdev);
764 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
765 clear_bit(RESIZE_PENDING, &mdev->flags);
766
8410da8f 767
768 return !ok;
769}
770
771/*
772 * return values:
773 * 1 yes, we have a valid connection
774 * 0 oops, did not work out, please try again
775 * -1 peer talks different language,
776 * no point in trying again, please go standalone.
777 * -2 We do not have a network config...
778 */
907599e0 779static int drbd_connect(struct drbd_tconn *tconn)
780{
781 struct socket *s, *sock, *msock;
782 int try, h, ok;
783
bbeb641c 784 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
785 return -2;
786
787 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
788 tconn->agreed_pro_version = 99;
789 /* agreed_pro_version must be smaller than 100 so we send the old
790 header (h80) in the first packet and in the handshake packet. */
791
792 sock = NULL;
793 msock = NULL;
794
795 do {
796 for (try = 0;;) {
797 /* 3 tries, this should take less than a second! */
907599e0 798 s = drbd_try_connect(tconn);
799 if (s || ++try >= 3)
800 break;
801 /* give the other side time to call bind() & listen() */
20ee6390 802 schedule_timeout_interruptible(HZ / 10);
803 }
804
805 if (s) {
806 if (!sock) {
907599e0 807 drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
808 sock = s;
809 s = NULL;
810 } else if (!msock) {
907599e0 811 drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
812 msock = s;
813 s = NULL;
814 } else {
907599e0 815 conn_err(tconn, "Logic error in drbd_connect()\n");
816 goto out_release_sockets;
817 }
818 }
819
820 if (sock && msock) {
907599e0 821 schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
822 ok = drbd_socket_okay(&sock);
823 ok = drbd_socket_okay(&msock) && ok;
824 if (ok)
825 break;
826 }
827
828retry:
907599e0 829 s = drbd_wait_for_connect(tconn);
b411b363 830 if (s) {
907599e0 831 try = drbd_recv_fp(tconn, s);
832 drbd_socket_okay(&sock);
833 drbd_socket_okay(&msock);
834 switch (try) {
835 case P_HAND_SHAKE_S:
836 if (sock) {
907599e0 837 conn_warn(tconn, "initial packet S crossed\n");
838 sock_release(sock);
839 }
840 sock = s;
841 break;
842 case P_HAND_SHAKE_M:
843 if (msock) {
907599e0 844 conn_warn(tconn, "initial packet M crossed\n");
845 sock_release(msock);
846 }
847 msock = s;
907599e0 848 set_bit(DISCARD_CONCURRENT, &tconn->flags);
849 break;
850 default:
907599e0 851 conn_warn(tconn, "Error receiving initial packet\n");
852 sock_release(s);
853 if (random32() & 1)
854 goto retry;
855 }
856 }
857
bbeb641c 858 if (tconn->cstate <= C_DISCONNECTING)
859 goto out_release_sockets;
860 if (signal_pending(current)) {
861 flush_signals(current);
862 smp_rmb();
907599e0 863 if (get_t_state(&tconn->receiver) == EXITING)
864 goto out_release_sockets;
865 }
866
867 if (sock && msock) {
868 ok = drbd_socket_okay(&sock);
869 ok = drbd_socket_okay(&msock) && ok;
870 if (ok)
871 break;
872 }
873 } while (1);
874
875 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
876 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
877
878 sock->sk->sk_allocation = GFP_NOIO;
879 msock->sk->sk_allocation = GFP_NOIO;
880
881 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
882 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
883
b411b363 884 /* NOT YET ...
907599e0 885 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
886 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
887 * first set it to the P_HAND_SHAKE timeout,
888 * which we set to 4x the configured ping_timeout. */
889 sock->sk->sk_sndtimeo =
907599e0 890 sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
b411b363 891
892 msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
893 msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
894
895 /* we don't want delays.
25985edc 896 * we use TCP_CORK where appropriate, though */
897 drbd_tcp_nodelay(sock);
898 drbd_tcp_nodelay(msock);
899
900 tconn->data.socket = sock;
901 tconn->meta.socket = msock;
902 tconn->last_received = jiffies;
b411b363 903
907599e0 904 h = drbd_do_handshake(tconn);
905 if (h <= 0)
906 return h;
907
907599e0 908 if (tconn->cram_hmac_tfm) {
b411b363 909 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
907599e0 910 switch (drbd_do_auth(tconn)) {
b10d96cb 911 case -1:
907599e0 912 conn_err(tconn, "Authentication of peer failed\n");
b411b363 913 return -1;
b10d96cb 914 case 0:
907599e0 915 conn_err(tconn, "Authentication of peer failed, trying again.\n");
b10d96cb 916 return 0;
917 }
918 }
919
bbeb641c 920 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
921 return 0;
922
907599e0 923 sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
924 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
925
907599e0 926 drbd_thread_start(&tconn->asender);
b411b363 927
907599e0 928 if (drbd_send_protocol(tconn) == -1)
7e2455c1 929 return -1;
b411b363 930
907599e0 931 return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
932
933out_release_sockets:
934 if (sock)
935 sock_release(sock);
936 if (msock)
937 sock_release(msock);
938 return -1;
939}
940
ce243853 941static bool decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
b411b363 942{
fd340c12 943 if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
944 pi->cmd = be16_to_cpu(h->h80.command);
945 pi->size = be16_to_cpu(h->h80.length);
eefc2f7d 946 pi->vnr = 0;
ca9bc12b 947 } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
948 pi->cmd = be16_to_cpu(h->h95.command);
949 pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
950 pi->vnr = 0;
02918be2 951 } else {
ce243853 952 conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
953 be32_to_cpu(h->h80.magic),
954 be16_to_cpu(h->h80.command),
955 be16_to_cpu(h->h80.length));
81e84650 956 return false;
b411b363 957 }
958 return true;
959}
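/* Added note: the two on-wire header layouts decoded above differ in their
 * magic value and in the length field: h80 carries a 16-bit length, while
 * h95 carries a 32-bit field of which only the low 24 bits (mask 0x00ffffff)
 * are the payload length. Both branches leave pi->vnr at 0 here. */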
960
9ba7aa00 961static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
257d0af6 962{
9ba7aa00 963 struct p_header *h = &tconn->data.rbuf.header;
964 int r;
965
9ba7aa00 966 r = drbd_recv(tconn, h, sizeof(*h));
967 if (unlikely(r != sizeof(*h))) {
968 if (!signal_pending(current))
9ba7aa00 969 conn_warn(tconn, "short read expecting header on sock: r=%d\n", r);
970 return false;
971 }
972
973 r = decode_header(tconn, h, pi);
974 tconn->last_received = jiffies;
b411b363 975
257d0af6 976 return r;
977}
978
2451fc3b 979static void drbd_flush(struct drbd_conf *mdev)
b411b363
PR
980{
981 int rv;
982
983 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a 984 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
dd3932ed 985 NULL);
b411b363
PR
986 if (rv) {
987 dev_err(DEV, "local disk flush failed with status %d\n", rv);
988 /* would rather check on EOPNOTSUPP, but that is not reliable.
989 * don't try again for ANY return value != 0
990 * if (rv == -EOPNOTSUPP) */
991 drbd_bump_write_ordering(mdev, WO_drain_io);
992 }
993 put_ldev(mdev);
994 }
b411b363
PR
995}
996
997/**
998 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
999 * @mdev: DRBD device.
1000 * @epoch: Epoch object.
1001 * @ev: Epoch event.
1002 */
1003static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1004 struct drbd_epoch *epoch,
1005 enum epoch_event ev)
1006{
2451fc3b 1007 int epoch_size;
b411b363 1008 struct drbd_epoch *next_epoch;
b411b363
PR
1009 enum finish_epoch rv = FE_STILL_LIVE;
1010
1011 spin_lock(&mdev->epoch_lock);
1012 do {
1013 next_epoch = NULL;
b411b363
PR
1014
1015 epoch_size = atomic_read(&epoch->epoch_size);
1016
1017 switch (ev & ~EV_CLEANUP) {
1018 case EV_PUT:
1019 atomic_dec(&epoch->active);
1020 break;
1021 case EV_GOT_BARRIER_NR:
1022 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
b411b363
PR
1023 break;
1024 case EV_BECAME_LAST:
1025 /* nothing to do*/
1026 break;
1027 }
1028
b411b363
PR
1029 if (epoch_size != 0 &&
1030 atomic_read(&epoch->active) == 0 &&
2451fc3b 1031 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
b411b363
PR
1032 if (!(ev & EV_CLEANUP)) {
1033 spin_unlock(&mdev->epoch_lock);
1034 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1035 spin_lock(&mdev->epoch_lock);
1036 }
1037 dec_unacked(mdev);
1038
1039 if (mdev->current_epoch != epoch) {
1040 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1041 list_del(&epoch->list);
1042 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1043 mdev->epochs--;
b411b363
PR
1044 kfree(epoch);
1045
1046 if (rv == FE_STILL_LIVE)
1047 rv = FE_DESTROYED;
1048 } else {
1049 epoch->flags = 0;
1050 atomic_set(&epoch->epoch_size, 0);
698f9315 1051 /* atomic_set(&epoch->active, 0); is already zero */
b411b363
PR
1052 if (rv == FE_STILL_LIVE)
1053 rv = FE_RECYCLED;
2451fc3b 1054 wake_up(&mdev->ee_wait);
b411b363
PR
1055 }
1056 }
1057
1058 if (!next_epoch)
1059 break;
1060
1061 epoch = next_epoch;
1062 } while (1);
1063
1064 spin_unlock(&mdev->epoch_lock);
1065
1066 return rv;
1067}
1068
1069/**
 1070 * drbd_bump_write_ordering() - Fall back to another write ordering method
1071 * @mdev: DRBD device.
1072 * @wo: Write ordering method to try.
1073 */
1074void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1075{
1076 enum write_ordering_e pwo;
1077 static char *write_ordering_str[] = {
1078 [WO_none] = "none",
1079 [WO_drain_io] = "drain",
1080 [WO_bdev_flush] = "flush",
b411b363
PR
1081 };
1082
1083 pwo = mdev->write_ordering;
1084 wo = min(pwo, wo);
b411b363
PR
1085 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1086 wo = WO_drain_io;
1087 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1088 wo = WO_none;
1089 mdev->write_ordering = wo;
2451fc3b 1090 if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
b411b363
PR
1091 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1092}
1093
45bb912b 1094/**
fbe29dec 1095 * drbd_submit_peer_request()
45bb912b 1096 * @mdev: DRBD device.
db830c46 1097 * @peer_req: peer request
45bb912b 1098 * @rw: flag field, see bio->bi_rw
10f6d992
LE
1099 *
1100 * May spread the pages to multiple bios,
1101 * depending on bio_add_page restrictions.
1102 *
1103 * Returns 0 if all bios have been submitted,
1104 * -ENOMEM if we could not allocate enough bios,
1105 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1106 * single page to an empty bio (which should never happen and likely indicates
1107 * that the lower level IO stack is in some way broken). This has been observed
1108 * on certain Xen deployments.
45bb912b
LE
1109 */
1110/* TODO allocate from our own bio_set. */
fbe29dec
AG
1111int drbd_submit_peer_request(struct drbd_conf *mdev,
1112 struct drbd_peer_request *peer_req,
1113 const unsigned rw, const int fault_type)
45bb912b
LE
1114{
1115 struct bio *bios = NULL;
1116 struct bio *bio;
db830c46
AG
1117 struct page *page = peer_req->pages;
1118 sector_t sector = peer_req->i.sector;
1119 unsigned ds = peer_req->i.size;
45bb912b
LE
1120 unsigned n_bios = 0;
1121 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
10f6d992 1122 int err = -ENOMEM;
45bb912b
LE
1123
1124 /* In most cases, we will only need one bio. But in case the lower
1125 * level restrictions happen to be different at this offset on this
1126 * side than those of the sending peer, we may need to submit the
1127 * request in more than one bio. */
1128next_bio:
1129 bio = bio_alloc(GFP_NOIO, nr_pages);
1130 if (!bio) {
1131 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1132 goto fail;
1133 }
db830c46 1134 /* > peer_req->i.sector, unless this is the first bio */
45bb912b
LE
1135 bio->bi_sector = sector;
1136 bio->bi_bdev = mdev->ldev->backing_bdev;
45bb912b 1137 bio->bi_rw = rw;
db830c46 1138 bio->bi_private = peer_req;
fcefa62e 1139 bio->bi_end_io = drbd_peer_request_endio;
45bb912b
LE
1140
1141 bio->bi_next = bios;
1142 bios = bio;
1143 ++n_bios;
1144
1145 page_chain_for_each(page) {
1146 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1147 if (!bio_add_page(bio, page, len, 0)) {
10f6d992
LE
1148 /* A single page must always be possible!
1149 * But in case it fails anyways,
1150 * we deal with it, and complain (below). */
1151 if (bio->bi_vcnt == 0) {
1152 dev_err(DEV,
1153 "bio_add_page failed for len=%u, "
1154 "bi_vcnt=0 (bi_sector=%llu)\n",
1155 len, (unsigned long long)bio->bi_sector);
1156 err = -ENOSPC;
1157 goto fail;
1158 }
45bb912b
LE
1159 goto next_bio;
1160 }
1161 ds -= len;
1162 sector += len >> 9;
1163 --nr_pages;
1164 }
1165 D_ASSERT(page == NULL);
1166 D_ASSERT(ds == 0);
1167
db830c46 1168 atomic_set(&peer_req->pending_bios, n_bios);
45bb912b
LE
1169 do {
1170 bio = bios;
1171 bios = bios->bi_next;
1172 bio->bi_next = NULL;
1173
45bb912b 1174 drbd_generic_make_request(mdev, fault_type, bio);
45bb912b 1175 } while (bios);
45bb912b
LE
1176 return 0;
1177
1178fail:
1179 while (bios) {
1180 bio = bios;
1181 bios = bios->bi_next;
1182 bio_put(bio);
1183 }
10f6d992 1184 return err;
45bb912b
LE
1185}
1186
53840641 1187static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
db830c46 1188 struct drbd_peer_request *peer_req)
53840641 1189{
db830c46 1190 struct drbd_interval *i = &peer_req->i;
53840641
AG
1191
1192 drbd_remove_interval(&mdev->write_requests, i);
1193 drbd_clear_interval(i);
1194
6c852bec 1195 /* Wake up any processes waiting for this peer request to complete. */
53840641
AG
1196 if (i->waiting)
1197 wake_up(&mdev->misc_wait);
1198}
1199
d8763023
AG
1200static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
1201 unsigned int data_size)
b411b363 1202{
2451fc3b 1203 int rv;
e42325a5 1204 struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
b411b363
PR
1205 struct drbd_epoch *epoch;
1206
b411b363
PR
1207 inc_unacked(mdev);
1208
b411b363
PR
1209 mdev->current_epoch->barrier_nr = p->barrier;
1210 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1211
1212 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1213 * the activity log, which means it would not be resynced in case the
1214 * R_PRIMARY crashes now.
1215 * Therefore we must send the barrier_ack after the barrier request was
1216 * completed. */
1217 switch (mdev->write_ordering) {
b411b363
PR
1218 case WO_none:
1219 if (rv == FE_RECYCLED)
81e84650 1220 return true;
2451fc3b
PR
1221
1222 /* receiver context, in the writeout path of the other node.
1223 * avoid potential distributed deadlock */
1224 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1225 if (epoch)
1226 break;
1227 else
1228 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1229 /* Fall through */
b411b363
PR
1230
1231 case WO_bdev_flush:
1232 case WO_drain_io:
b411b363 1233 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
2451fc3b
PR
1234 drbd_flush(mdev);
1235
1236 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1237 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1238 if (epoch)
1239 break;
b411b363
PR
1240 }
1241
2451fc3b
PR
1242 epoch = mdev->current_epoch;
1243 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1244
1245 D_ASSERT(atomic_read(&epoch->active) == 0);
1246 D_ASSERT(epoch->flags == 0);
b411b363 1247
81e84650 1248 return true;
2451fc3b
PR
1249 default:
1250 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
81e84650 1251 return false;
b411b363
PR
1252 }
1253
1254 epoch->flags = 0;
1255 atomic_set(&epoch->epoch_size, 0);
1256 atomic_set(&epoch->active, 0);
1257
1258 spin_lock(&mdev->epoch_lock);
1259 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1260 list_add(&epoch->list, &mdev->current_epoch->list);
1261 mdev->current_epoch = epoch;
1262 mdev->epochs++;
b411b363
PR
1263 } else {
1264 /* The current_epoch got recycled while we allocated this one... */
1265 kfree(epoch);
1266 }
1267 spin_unlock(&mdev->epoch_lock);
1268
81e84650 1269 return true;
b411b363
PR
1270}
1271
1272/* used from receive_RSDataReply (recv_resync_read)
1273 * and from receive_Data */
f6ffca9f
AG
1274static struct drbd_peer_request *
1275read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1276 int data_size) __must_hold(local)
b411b363 1277{
6666032a 1278 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 1279 struct drbd_peer_request *peer_req;
b411b363 1280 struct page *page;
45bb912b 1281 int dgs, ds, rr;
a0638456
PR
1282 void *dig_in = mdev->tconn->int_dig_in;
1283 void *dig_vv = mdev->tconn->int_dig_vv;
6b4388ac 1284 unsigned long *data;
b411b363 1285
a0638456
PR
1286 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1287 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
b411b363
PR
1288
1289 if (dgs) {
de0ff338 1290 rr = drbd_recv(mdev->tconn, dig_in, dgs);
b411b363 1291 if (rr != dgs) {
0ddc5549
LE
1292 if (!signal_pending(current))
1293 dev_warn(DEV,
1294 "short read receiving data digest: read %d expected %d\n",
1295 rr, dgs);
b411b363
PR
1296 return NULL;
1297 }
1298 }
1299
1300 data_size -= dgs;
1301
841ce241
AG
1302 if (!expect(data_size != 0))
1303 return NULL;
1304 if (!expect(IS_ALIGNED(data_size, 512)))
1305 return NULL;
1306 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1307 return NULL;
b411b363 1308
 1309 /* even though we trust our peer,
1310 * we sometimes have to double check. */
1311 if (sector + (data_size>>9) > capacity) {
fdda6544
LE
1312 dev_err(DEV, "request from peer beyond end of local disk: "
1313 "capacity: %llus < sector: %llus + size: %u\n",
6666032a
LE
1314 (unsigned long long)capacity,
1315 (unsigned long long)sector, data_size);
1316 return NULL;
1317 }
1318
b411b363
PR
1319 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1320 * "criss-cross" setup, that might cause write-out on some other DRBD,
1321 * which in turn might block on the other node at this very place. */
db830c46
AG
1322 peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1323 if (!peer_req)
b411b363 1324 return NULL;
45bb912b 1325
b411b363 1326 ds = data_size;
db830c46 1327 page = peer_req->pages;
45bb912b
LE
1328 page_chain_for_each(page) {
1329 unsigned len = min_t(int, ds, PAGE_SIZE);
6b4388ac 1330 data = kmap(page);
de0ff338 1331 rr = drbd_recv(mdev->tconn, data, len);
0cf9d27e 1332 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
6b4388ac
PR
1333 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1334 data[0] = data[0] ^ (unsigned long)-1;
1335 }
b411b363 1336 kunmap(page);
45bb912b 1337 if (rr != len) {
db830c46 1338 drbd_free_ee(mdev, peer_req);
0ddc5549
LE
1339 if (!signal_pending(current))
1340 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1341 rr, len);
b411b363
PR
1342 return NULL;
1343 }
1344 ds -= rr;
1345 }
1346
1347 if (dgs) {
db830c46 1348 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
b411b363 1349 if (memcmp(dig_in, dig_vv, dgs)) {
470be44a
LE
1350 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1351 (unsigned long long)sector, data_size);
b411b363 1352 drbd_bcast_ee(mdev, "digest failed",
db830c46
AG
1353 dgs, dig_in, dig_vv, peer_req);
1354 drbd_free_ee(mdev, peer_req);
b411b363
PR
1355 return NULL;
1356 }
1357 }
1358 mdev->recv_cnt += data_size>>9;
db830c46 1359 return peer_req;
b411b363
PR
1360}
1361
1362/* drbd_drain_block() just takes a data block
1363 * out of the socket input buffer, and discards it.
1364 */
1365static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1366{
1367 struct page *page;
1368 int rr, rv = 1;
1369 void *data;
1370
c3470cde 1371 if (!data_size)
81e84650 1372 return true;
c3470cde 1373
45bb912b 1374 page = drbd_pp_alloc(mdev, 1, 1);
b411b363
PR
1375
1376 data = kmap(page);
1377 while (data_size) {
de0ff338 1378 rr = drbd_recv(mdev->tconn, data, min_t(int, data_size, PAGE_SIZE));
b411b363
PR
1379 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1380 rv = 0;
0ddc5549
LE
1381 if (!signal_pending(current))
1382 dev_warn(DEV,
1383 "short read receiving data: read %d expected %d\n",
1384 rr, min_t(int, data_size, PAGE_SIZE));
b411b363
PR
1385 break;
1386 }
1387 data_size -= rr;
1388 }
1389 kunmap(page);
435f0740 1390 drbd_pp_free(mdev, page, 0);
b411b363
PR
1391 return rv;
1392}
1393
1394static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1395 sector_t sector, int data_size)
1396{
1397 struct bio_vec *bvec;
1398 struct bio *bio;
1399 int dgs, rr, i, expect;
a0638456
PR
1400 void *dig_in = mdev->tconn->int_dig_in;
1401 void *dig_vv = mdev->tconn->int_dig_vv;
b411b363 1402
a0638456
PR
1403 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1404 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
b411b363
PR
1405
1406 if (dgs) {
de0ff338 1407 rr = drbd_recv(mdev->tconn, dig_in, dgs);
b411b363 1408 if (rr != dgs) {
0ddc5549
LE
1409 if (!signal_pending(current))
1410 dev_warn(DEV,
1411 "short read receiving data reply digest: read %d expected %d\n",
1412 rr, dgs);
b411b363
PR
1413 return 0;
1414 }
1415 }
1416
1417 data_size -= dgs;
1418
1419 /* optimistically update recv_cnt. if receiving fails below,
1420 * we disconnect anyways, and counters will be reset. */
1421 mdev->recv_cnt += data_size>>9;
1422
1423 bio = req->master_bio;
1424 D_ASSERT(sector == bio->bi_sector);
1425
1426 bio_for_each_segment(bvec, bio, i) {
1427 expect = min_t(int, data_size, bvec->bv_len);
de0ff338 1428 rr = drbd_recv(mdev->tconn,
b411b363
PR
1429 kmap(bvec->bv_page)+bvec->bv_offset,
1430 expect);
1431 kunmap(bvec->bv_page);
1432 if (rr != expect) {
0ddc5549
LE
1433 if (!signal_pending(current))
1434 dev_warn(DEV, "short read receiving data reply: "
1435 "read %d expected %d\n",
1436 rr, expect);
b411b363
PR
1437 return 0;
1438 }
1439 data_size -= rr;
1440 }
1441
1442 if (dgs) {
a0638456 1443 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
b411b363
PR
1444 if (memcmp(dig_in, dig_vv, dgs)) {
1445 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1446 return 0;
1447 }
1448 }
1449
1450 D_ASSERT(data_size == 0);
1451 return 1;
1452}
1453
1454/* e_end_resync_block() is called via
1455 * drbd_process_done_ee() by asender only */
00d56944 1456static int e_end_resync_block(struct drbd_work *w, int unused)
b411b363 1457{
8050e6d0
AG
1458 struct drbd_peer_request *peer_req =
1459 container_of(w, struct drbd_peer_request, w);
00d56944 1460 struct drbd_conf *mdev = w->mdev;
db830c46 1461 sector_t sector = peer_req->i.sector;
b411b363
PR
1462 int ok;
1463
db830c46 1464 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1465
db830c46
AG
1466 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1467 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1468 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
b411b363
PR
1469 } else {
1470 /* Record failure to sync */
db830c46 1471 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
b411b363 1472
db830c46 1473 ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1474 }
1475 dec_unacked(mdev);
1476
1477 return ok;
1478}
1479
1480static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1481{
db830c46 1482 struct drbd_peer_request *peer_req;
b411b363 1483
db830c46
AG
1484 peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1485 if (!peer_req)
45bb912b 1486 goto fail;
b411b363
PR
1487
1488 dec_rs_pending(mdev);
1489
b411b363
PR
1490 inc_unacked(mdev);
1491 /* corresponding dec_unacked() in e_end_resync_block()
1492 * respective _drbd_clear_done_ee */
1493
db830c46 1494 peer_req->w.cb = e_end_resync_block;
45bb912b 1495
87eeee41 1496 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1497 list_add(&peer_req->w.list, &mdev->sync_ee);
87eeee41 1498 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1499
0f0601f4 1500 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
fbe29dec 1501 if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
81e84650 1502 return true;
b411b363 1503
10f6d992
LE
1504 /* don't care for the reason here */
1505 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1506 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1507 list_del(&peer_req->w.list);
87eeee41 1508 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9 1509
db830c46 1510 drbd_free_ee(mdev, peer_req);
45bb912b
LE
1511fail:
1512 put_ldev(mdev);
81e84650 1513 return false;
b411b363
PR
1514}
1515
668eebc6 1516static struct drbd_request *
bc9c5c41
AG
1517find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1518 sector_t sector, bool missing_ok, const char *func)
51624585 1519{
51624585
AG
1520 struct drbd_request *req;
1521
bc9c5c41
AG
1522 /* Request object according to our peer */
1523 req = (struct drbd_request *)(unsigned long)id;
5e472264 1524 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
668eebc6 1525 return req;
c3afd8f5
AG
1526 if (!missing_ok) {
1527 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1528 (unsigned long)id, (unsigned long long)sector);
1529 }
51624585
AG
1530 return NULL;
1531}
1532
d8763023
AG
1533static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1534 unsigned int data_size)
b411b363
PR
1535{
1536 struct drbd_request *req;
1537 sector_t sector;
b411b363 1538 int ok;
e42325a5 1539 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1540
1541 sector = be64_to_cpu(p->sector);
1542
87eeee41 1543 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 1544 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
87eeee41 1545 spin_unlock_irq(&mdev->tconn->req_lock);
c3afd8f5 1546 if (unlikely(!req))
81e84650 1547 return false;
b411b363 1548
24c4830c 1549 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
b411b363
PR
1550 * special casing it there for the various failure cases.
1551 * still no race with drbd_fail_pending_reads */
1552 ok = recv_dless_read(mdev, req, sector, data_size);
1553
1554 if (ok)
8554df1c 1555 req_mod(req, DATA_RECEIVED);
b411b363
PR
1556 /* else: nothing. handled from drbd_disconnect...
1557 * I don't think we may complete this just yet
1558 * in case we are "on-disconnect: freeze" */
1559
1560 return ok;
1561}
1562
d8763023
AG
1563static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1564 unsigned int data_size)
b411b363
PR
1565{
1566 sector_t sector;
b411b363 1567 int ok;
e42325a5 1568 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1569
1570 sector = be64_to_cpu(p->sector);
1571 D_ASSERT(p->block_id == ID_SYNCER);
1572
1573 if (get_ldev(mdev)) {
1574 /* data is submitted to disk within recv_resync_read.
1575 * corresponding put_ldev done below on error,
fcefa62e 1576 * or in drbd_peer_request_endio. */
b411b363
PR
1577 ok = recv_resync_read(mdev, sector, data_size);
1578 } else {
1579 if (__ratelimit(&drbd_ratelimit_state))
1580 dev_err(DEV, "Can not write resync data to local disk.\n");
1581
1582 ok = drbd_drain_block(mdev, data_size);
1583
2b2bf214 1584 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363
PR
1585 }
1586
778f271d
PR
1587 atomic_add(data_size >> 9, &mdev->rs_sect_in);
1588
b411b363
PR
1589 return ok;
1590}
1591
1592/* e_end_block() is called via drbd_process_done_ee().
1593 * this means this function only runs in the asender thread
1594 */
00d56944 1595static int e_end_block(struct drbd_work *w, int cancel)
b411b363 1596{
8050e6d0
AG
1597 struct drbd_peer_request *peer_req =
1598 container_of(w, struct drbd_peer_request, w);
00d56944 1599 struct drbd_conf *mdev = w->mdev;
db830c46 1600 sector_t sector = peer_req->i.sector;
b411b363
PR
1601 int ok = 1, pcmd;
1602
89e58e75 1603 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
db830c46 1604 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1605 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1606 mdev->state.conn <= C_PAUSED_SYNC_T &&
db830c46 1607 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 1608 P_RS_WRITE_ACK : P_WRITE_ACK;
db830c46 1609 ok &= drbd_send_ack(mdev, pcmd, peer_req);
b411b363 1610 if (pcmd == P_RS_WRITE_ACK)
db830c46 1611 drbd_set_in_sync(mdev, sector, peer_req->i.size);
b411b363 1612 } else {
db830c46 1613 ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1614 /* we expect it to be marked out of sync anyways...
1615 * maybe assert this? */
1616 }
1617 dec_unacked(mdev);
1618 }
1619 /* we delete from the conflict detection hash _after_ we sent out the
1620 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
89e58e75 1621 if (mdev->tconn->net_conf->two_primaries) {
87eeee41 1622 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
1623 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1624 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 1625 spin_unlock_irq(&mdev->tconn->req_lock);
bb3bfe96 1626 } else
db830c46 1627 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1628
db830c46 1629 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
b411b363
PR
1630
1631 return ok;
1632}
1633
00d56944 1634static int e_send_discard_ack(struct drbd_work *w, int unused)
b411b363 1635{
8050e6d0
AG
1636 struct drbd_peer_request *peer_req =
1637 container_of(w, struct drbd_peer_request, w);
00d56944 1638 struct drbd_conf *mdev = w->mdev;
206d3589 1639 int ok;
b411b363 1640
89e58e75 1641 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
db830c46 1642 ok = drbd_send_ack(mdev, P_DISCARD_ACK, peer_req);
b411b363
PR
1643 dec_unacked(mdev);
1644
1645 return ok;
1646}
1647
1648static bool seq_greater(u32 a, u32 b)
1649{
1650 /*
1651 * We assume 32-bit wrap-around here.
1652 * For 24-bit wrap-around, we would have to shift:
1653 * a <<= 8; b <<= 8;
1654 */
1655 return (s32)a - (s32)b > 0;
1656}
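/* Worked example (illustrative): with a = 0x00000001 and b = 0xfffffffe,
 * (s32)a - (s32)b == 1 - (-2) == 3 > 0, so a is treated as the newer
 * sequence number even though it is numerically smaller; the 32-bit
 * sequence space has simply wrapped around. */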
1657
1658static u32 seq_max(u32 a, u32 b)
1659{
1660 return seq_greater(a, b) ? a : b;
1661}
1662
43ae077d 1663static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
3e394da1 1664{
43ae077d 1665 unsigned int old_peer_seq;
1666
1667 spin_lock(&mdev->peer_seq_lock);
1668 old_peer_seq = mdev->peer_seq;
1669 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
3e394da1 1670 spin_unlock(&mdev->peer_seq_lock);
43ae077d 1671 if (old_peer_seq != peer_seq)
1672 wake_up(&mdev->seq_wait);
1673}
1674
1675/* Called from receive_Data.
1676 * Synchronize packets on sock with packets on msock.
1677 *
1678 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1679 * packet traveling on msock, they are still processed in the order they have
1680 * been sent.
1681 *
1682 * Note: we don't care for Ack packets overtaking P_DATA packets.
1683 *
1684 * In case packet_seq is larger than mdev->peer_seq number, there are
1685 * outstanding packets on the msock. We wait for them to arrive.
1686 * In case we are the logically next packet, we update mdev->peer_seq
1687 * ourselves. Correctly handles 32bit wrap around.
1688 *
1689 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1690 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1691 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1692 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1693 *
1694 * returns 0 if we may process the packet,
1695 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1696static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1697{
1698 DEFINE_WAIT(wait);
1699 unsigned int p_seq;
1700 long timeout;
1701 int ret = 0;
1702 spin_lock(&mdev->peer_seq_lock);
1703 for (;;) {
1704 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
3e394da1 1705 if (!seq_greater(packet_seq, mdev->peer_seq + 1))
1706 break;
1707 if (signal_pending(current)) {
1708 ret = -ERESTARTSYS;
1709 break;
1710 }
1711 p_seq = mdev->peer_seq;
1712 spin_unlock(&mdev->peer_seq_lock);
1713 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1714 timeout = schedule_timeout(timeout);
1715 spin_lock(&mdev->peer_seq_lock);
1716 if (timeout == 0 && p_seq == mdev->peer_seq) {
1717 ret = -ETIMEDOUT;
71b1c1eb 1718 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1719 break;
1720 }
1721 }
1722 finish_wait(&mdev->seq_wait, &wait);
1723 if (mdev->peer_seq+1 == packet_seq)
1724 mdev->peer_seq++;
1725 spin_unlock(&mdev->peer_seq_lock);
1726 return ret;
1727}
1728
1729/* see also bio_flags_to_wire()
1730 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1731 * flags and back. We may replicate to other kernel versions. */
1732static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1733{
688593c5
LE
1734 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1735 (dpf & DP_FUA ? REQ_FUA : 0) |
1736 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1737 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1738}
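/*
 * Illustrative round trip (editor's sketch, not part of the original file),
 * assuming the peer encoded its bio flags with the matching
 * bio_flags_to_wire() helper:
 *
 *	u32 dpf = DP_RW_SYNC | DP_FUA | DP_FLUSH;
 *	rw = WRITE | wire_flags_to_bio(mdev, dpf);
 *	now rw == WRITE | REQ_SYNC | REQ_FUA | REQ_FLUSH
 *
 * so the write is resubmitted locally with the same semantics the peer
 * requested, regardless of how either kernel version encodes bio flags.
 */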
1739
b411b363 1740/* mirrored write */
d8763023
AG
1741static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
1742 unsigned int data_size)
b411b363
PR
1743{
1744 sector_t sector;
db830c46 1745 struct drbd_peer_request *peer_req;
e42325a5 1746 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1747 int rw = WRITE;
1748 u32 dp_flags;
1749
b411b363 1750 if (!get_ldev(mdev)) {
b411b363
PR
1751 spin_lock(&mdev->peer_seq_lock);
1752 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1753 mdev->peer_seq++;
1754 spin_unlock(&mdev->peer_seq_lock);
1755
2b2bf214 1756 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363
PR
1757 atomic_inc(&mdev->current_epoch->epoch_size);
1758 return drbd_drain_block(mdev, data_size);
1759 }
1760
fcefa62e
AG
1761 /*
1762 * Corresponding put_ldev done either below (on various errors), or in
1763 * drbd_peer_request_endio, if we successfully submit the data at the
1764 * end of this function.
1765 */
b411b363
PR
1766
1767 sector = be64_to_cpu(p->sector);
db830c46
AG
1768 peer_req = read_in_block(mdev, p->block_id, sector, data_size);
1769 if (!peer_req) {
b411b363 1770 put_ldev(mdev);
81e84650 1771 return false;
b411b363
PR
1772 }
1773
db830c46 1774 peer_req->w.cb = e_end_block;
b411b363 1775
688593c5
LE
1776 dp_flags = be32_to_cpu(p->dp_flags);
1777 rw |= wire_flags_to_bio(mdev, dp_flags);
1778
1779 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 1780 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 1781
b411b363 1782 spin_lock(&mdev->epoch_lock);
db830c46
AG
1783 peer_req->epoch = mdev->current_epoch;
1784 atomic_inc(&peer_req->epoch->epoch_size);
1785 atomic_inc(&peer_req->epoch->active);
b411b363
PR
1786 spin_unlock(&mdev->epoch_lock);
1787
b411b363 1788 /* I'm the receiver, I do hold a net_cnt reference. */
89e58e75 1789 if (!mdev->tconn->net_conf->two_primaries) {
87eeee41 1790 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
1791 } else {
1792 /* don't get the req_lock yet,
1793 * we may sleep in drbd_wait_peer_seq */
db830c46 1794 const int size = peer_req->i.size;
25703f83 1795 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363 1796 DEFINE_WAIT(wait);
b411b363
PR
1797 int first;
1798
89e58e75 1799 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
b411b363
PR
1800
1801 /* conflict detection and handling:
1802 * 1. wait on the sequence number,
1803 * in case this data packet overtook ACK packets.
5e472264 1804 * 2. check for conflicting write requests.
b411b363
PR
1805 *
1806 * Note: for two_primaries, we are protocol C,
1807 * so there cannot be any request that is DONE
1808 * but still on the transfer log.
1809 *
b411b363
PR
1810 * if no conflicting request is found:
1811 * submit.
1812 *
1813 * if any conflicting request is found
1814 * that has not yet been acked,
1815 * AND I have the "discard concurrent writes" flag:
1816 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1817 *
1818 * if any conflicting request is found:
1819 * block the receiver, waiting on misc_wait
1820 * until no more conflicting requests are there,
1821 * or we get interrupted (disconnect).
1822 *
1823 * we do not just write after local io completion of those
1824 * requests, but only after req is done completely, i.e.
1825 * we wait for the P_DISCARD_ACK to arrive!
1826 *
1827 * then proceed normally, i.e. submit.
1828 */
1829 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1830 goto out_interrupted;
1831
87eeee41 1832 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 1833
206d3589
AG
1834 /*
1835 * Inserting the peer request into the write_requests tree will
1836 * prevent new conflicting local requests from being added.
1837 */
1838 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1839
b411b363
PR
1840 first = 1;
1841 for (;;) {
de696716 1842 struct drbd_interval *i;
b411b363
PR
1843 int have_unacked = 0;
1844 int have_conflict = 0;
1845 prepare_to_wait(&mdev->misc_wait, &wait,
1846 TASK_INTERRUPTIBLE);
de696716 1847
206d3589
AG
1848 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1849 struct drbd_request *req2;
1850
1851 if (i == &peer_req->i || !i->local)
1852 continue;
1853
de696716
AG
1854 /* only ALERT on first iteration,
1855 * we may be woken up early... */
1856 if (first)
206d3589 1857 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
de696716
AG
1858 " new: %llus +%u; pending: %llus +%u\n",
1859 current->comm, current->pid,
1860 (unsigned long long)sector, size,
5e472264
AG
1861 (unsigned long long)i->sector, i->size);
1862
206d3589
AG
1863 req2 = container_of(i, struct drbd_request, i);
1864 if (req2->rq_state & RQ_NET_PENDING)
1865 ++have_unacked;
de696716 1866 ++have_conflict;
206d3589 1867 break;
b411b363 1868 }
b411b363
PR
1869 if (!have_conflict)
1870 break;
1871
1872 /* Discard Ack only for the _first_ iteration */
1873 if (first && discard && have_unacked) {
1874 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1875 (unsigned long long)sector);
206d3589 1876 drbd_remove_epoch_entry_interval(mdev, peer_req);
b411b363 1877 inc_unacked(mdev);
db830c46
AG
1878 peer_req->w.cb = e_send_discard_ack;
1879 list_add_tail(&peer_req->w.list, &mdev->done_ee);
b411b363 1880
87eeee41 1881 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1882
1883 /* we could probably send that P_DISCARD_ACK ourselves,
1884 * but I don't like the receiver using the msock */
1885
1886 put_ldev(mdev);
0625ac19 1887 wake_asender(mdev->tconn);
b411b363 1888 finish_wait(&mdev->misc_wait, &wait);
81e84650 1889 return true;
b411b363
PR
1890 }
1891
1892 if (signal_pending(current)) {
206d3589 1893 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 1894 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1895 finish_wait(&mdev->misc_wait, &wait);
1896 goto out_interrupted;
1897 }
1898
a500c2ef 1899 /* Indicate to wake up mdev->misc_wait upon completion. */
53840641 1900 i->waiting = true;
a500c2ef 1901
87eeee41 1902 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1903 if (first) {
1904 first = 0;
1905 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1906 "sec=%llus\n", (unsigned long long)sector);
1907 } else if (discard) {
1908 /* we had none on the first iteration.
1909 * there must be none now. */
1910 D_ASSERT(have_unacked == 0);
1911 }
206d3589 1912 /* FIXME: Introduce a timeout here after which we disconnect. */
b411b363 1913 schedule();
87eeee41 1914 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
1915 }
1916 finish_wait(&mdev->misc_wait, &wait);
1917 }
1918
db830c46 1919 list_add(&peer_req->w.list, &mdev->active_ee);
87eeee41 1920 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1921
89e58e75 1922 switch (mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
1923 case DRBD_PROT_C:
1924 inc_unacked(mdev);
1925 /* corresponding dec_unacked() in e_end_block()
1926 * respective _drbd_clear_done_ee */
1927 break;
1928 case DRBD_PROT_B:
1929 /* I really don't like it that the receiver thread
1930 * sends on the msock, but anyways */
db830c46 1931 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
b411b363
PR
1932 break;
1933 case DRBD_PROT_A:
1934 /* nothing to do */
1935 break;
1936 }
1937
6719fb03 1938 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 1939 /* In case we have the only disk of the cluster, */
db830c46
AG
1940 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
1941 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
1942 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
1943 drbd_al_begin_io(mdev, peer_req->i.sector);
b411b363
PR
1944 }
1945
fbe29dec 1946 if (drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR) == 0)
81e84650 1947 return true;
b411b363 1948
10f6d992
LE
1949 /* don't care for the reason here */
1950 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1951 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
1952 list_del(&peer_req->w.list);
1953 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 1954 spin_unlock_irq(&mdev->tconn->req_lock);
db830c46
AG
1955 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
1956 drbd_al_complete_io(mdev, peer_req->i.sector);
22cc37a9 1957
b411b363 1958out_interrupted:
db830c46 1959 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
b411b363 1960 put_ldev(mdev);
db830c46 1961 drbd_free_ee(mdev, peer_req);
81e84650 1962 return false;
b411b363
PR
1963}
1964
0f0601f4
LE
1965/* We may throttle resync, if the lower device seems to be busy,
1966 * and current sync rate is above c_min_rate.
1967 *
1968 * To decide whether or not the lower device is busy, we use a scheme similar
 1969 * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
 1970 * amount (more than 64 sectors) of activity we cannot account for with our own resync
1971 * activity, it obviously is "busy".
1972 *
1973 * The current sync rate used here uses only the most recent two step marks,
1974 * to have a short time average so we can react faster.
1975 */
e3555d85 1976int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
1977{
1978 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1979 unsigned long db, dt, dbdt;
e3555d85 1980 struct lc_element *tmp;
0f0601f4
LE
1981 int curr_events;
1982 int throttle = 0;
1983
1984 /* feature disabled? */
1985 if (mdev->sync_conf.c_min_rate == 0)
1986 return 0;
1987
e3555d85
PR
1988 spin_lock_irq(&mdev->al_lock);
1989 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1990 if (tmp) {
1991 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1992 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1993 spin_unlock_irq(&mdev->al_lock);
1994 return 0;
1995 }
1996 /* Do not slow down if app IO is already waiting for this extent */
1997 }
1998 spin_unlock_irq(&mdev->al_lock);
1999
0f0601f4
LE
2000 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2001 (int)part_stat_read(&disk->part0, sectors[1]) -
2002 atomic_read(&mdev->rs_sect_ev);
e3555d85 2003
0f0601f4
LE
2004 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2005 unsigned long rs_left;
2006 int i;
2007
2008 mdev->rs_last_events = curr_events;
2009
2010 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2011 * approx. */
2649f080
LE
2012 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2013
2014 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2015 rs_left = mdev->ov_left;
2016 else
2017 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
2018
2019 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2020 if (!dt)
2021 dt++;
2022 db = mdev->rs_mark_left[i] - rs_left;
2023 dbdt = Bit2KB(db/dt);
2024
2025 if (dbdt > mdev->sync_conf.c_min_rate)
2026 throttle = 1;
2027 }
2028 return throttle;
2029}
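/*
 * Worked example (editor's sketch, not part of the original file): assume
 * the backing device shows activity well beyond what our own resync caused
 * (curr_events - rs_last_events > 64 sectors, so it counts as "busy"), the
 * sync mark used above is 3 seconds old, and 30000 bitmap bits (4 KiB each)
 * were cleared since then:
 *
 *	dt   = 3;
 *	db   = 30000;
 *	dbdt = Bit2KB(db / dt);		that is 40000 KiB/s
 *
 * With c_min_rate set to, say, 4000 KiB/s we get dbdt > c_min_rate, so
 * throttle is set and the caller pauses briefly before issuing the next
 * resync request.
 */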
2030
2031
d8763023
AG
2032static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
2033 unsigned int digest_size)
b411b363
PR
2034{
2035 sector_t sector;
2036 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 2037 struct drbd_peer_request *peer_req;
b411b363 2038 struct digest_info *di = NULL;
b18b37be 2039 int size, verb;
b411b363 2040 unsigned int fault_type;
e42325a5 2041 struct p_block_req *p = &mdev->tconn->data.rbuf.block_req;
b411b363
PR
2042
2043 sector = be64_to_cpu(p->sector);
2044 size = be32_to_cpu(p->blksize);
2045
c670a398 2046 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
2047 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2048 (unsigned long long)sector, size);
81e84650 2049 return false;
b411b363
PR
2050 }
2051 if (sector + (size>>9) > capacity) {
2052 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2053 (unsigned long long)sector, size);
81e84650 2054 return false;
b411b363
PR
2055 }
2056
2057 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be
PR
2058 verb = 1;
2059 switch (cmd) {
2060 case P_DATA_REQUEST:
2061 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2062 break;
2063 case P_RS_DATA_REQUEST:
2064 case P_CSUM_RS_REQUEST:
2065 case P_OV_REQUEST:
2066 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2067 break;
2068 case P_OV_REPLY:
2069 verb = 0;
2070 dec_rs_pending(mdev);
2071 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2072 break;
2073 default:
2074 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2075 cmdname(cmd));
2076 }
2077 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2078 dev_err(DEV, "Can not satisfy peer's read request, "
2079 "no local data.\n");
b18b37be 2080
a821cc4a
LE
 2081 /* drain the payload, if any */
2082 return drbd_drain_block(mdev, digest_size);
b411b363
PR
2083 }
2084
2085 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2086 * "criss-cross" setup, that might cause write-out on some other DRBD,
2087 * which in turn might block on the other node at this very place. */
db830c46
AG
2088 peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2089 if (!peer_req) {
b411b363 2090 put_ldev(mdev);
81e84650 2091 return false;
b411b363
PR
2092 }
2093
02918be2 2094 switch (cmd) {
b411b363 2095 case P_DATA_REQUEST:
db830c46 2096 peer_req->w.cb = w_e_end_data_req;
b411b363 2097 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2098 /* application IO, don't drbd_rs_begin_io */
2099 goto submit;
2100
b411b363 2101 case P_RS_DATA_REQUEST:
db830c46 2102 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2103 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2104 /* used in the sector offset progress display */
2105 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2106 break;
2107
2108 case P_OV_REPLY:
2109 case P_CSUM_RS_REQUEST:
2110 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2111 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2112 if (!di)
2113 goto out_free_e;
2114
2115 di->digest_size = digest_size;
2116 di->digest = (((char *)di)+sizeof(struct digest_info));
2117
db830c46
AG
2118 peer_req->digest = di;
2119 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2120
de0ff338 2121 if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)
b411b363
PR
2122 goto out_free_e;
2123
02918be2 2124 if (cmd == P_CSUM_RS_REQUEST) {
31890f4a 2125 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
db830c46 2126 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2127 /* used in the sector offset progress display */
2128 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
02918be2 2129 } else if (cmd == P_OV_REPLY) {
2649f080
LE
2130 /* track progress, we may need to throttle */
2131 atomic_add(size >> 9, &mdev->rs_sect_in);
db830c46 2132 peer_req->w.cb = w_e_end_ov_reply;
b411b363 2133 dec_rs_pending(mdev);
0f0601f4
LE
2134 /* drbd_rs_begin_io done when we sent this request,
2135 * but accounting still needs to be done. */
2136 goto submit_for_resync;
b411b363
PR
2137 }
2138 break;
2139
2140 case P_OV_REQUEST:
b411b363 2141 if (mdev->ov_start_sector == ~(sector_t)0 &&
31890f4a 2142 mdev->tconn->agreed_pro_version >= 90) {
de228bba
LE
2143 unsigned long now = jiffies;
2144 int i;
b411b363
PR
2145 mdev->ov_start_sector = sector;
2146 mdev->ov_position = sector;
30b743a2
LE
2147 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2148 mdev->rs_total = mdev->ov_left;
de228bba
LE
2149 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2150 mdev->rs_mark_left[i] = mdev->ov_left;
2151 mdev->rs_mark_time[i] = now;
2152 }
b411b363
PR
2153 dev_info(DEV, "Online Verify start sector: %llu\n",
2154 (unsigned long long)sector);
2155 }
db830c46 2156 peer_req->w.cb = w_e_end_ov_req;
b411b363 2157 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2158 break;
2159
b411b363
PR
2160 default:
2161 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
02918be2 2162 cmdname(cmd));
b411b363 2163 fault_type = DRBD_FAULT_MAX;
80a40e43 2164 goto out_free_e;
b411b363
PR
2165 }
2166
0f0601f4
LE
2167 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2168 * wrt the receiver, but it is not as straightforward as it may seem.
2169 * Various places in the resync start and stop logic assume resync
2170 * requests are processed in order, requeuing this on the worker thread
2171 * introduces a bunch of new code for synchronization between threads.
2172 *
2173 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2174 * "forever", throttling after drbd_rs_begin_io will lock that extent
2175 * for application writes for the same time. For now, just throttle
2176 * here, where the rest of the code expects the receiver to sleep for
2177 * a while, anyways.
2178 */
2179
2180 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2181 * this defers syncer requests for some time, before letting at least
 2182 * one request through. The resync controller on the receiving side
2183 * will adapt to the incoming rate accordingly.
2184 *
2185 * We cannot throttle here if remote is Primary/SyncTarget:
2186 * we would also throttle its application reads.
2187 * In that case, throttling is done on the SyncTarget only.
2188 */
e3555d85
PR
2189 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2190 schedule_timeout_uninterruptible(HZ/10);
2191 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2192 goto out_free_e;
b411b363 2193
0f0601f4
LE
2194submit_for_resync:
2195 atomic_add(size >> 9, &mdev->rs_sect_ev);
2196
80a40e43 2197submit:
b411b363 2198 inc_unacked(mdev);
87eeee41 2199 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2200 list_add_tail(&peer_req->w.list, &mdev->read_ee);
87eeee41 2201 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2202
fbe29dec 2203 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
81e84650 2204 return true;
b411b363 2205
10f6d992
LE
2206 /* don't care for the reason here */
2207 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2208 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2209 list_del(&peer_req->w.list);
87eeee41 2210 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9
LE
2211 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2212
b411b363 2213out_free_e:
b411b363 2214 put_ldev(mdev);
db830c46 2215 drbd_free_ee(mdev, peer_req);
81e84650 2216 return false;
b411b363
PR
2217}
2218
2219static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2220{
2221 int self, peer, rv = -100;
2222 unsigned long ch_self, ch_peer;
2223
2224 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2225 peer = mdev->p_uuid[UI_BITMAP] & 1;
2226
2227 ch_peer = mdev->p_uuid[UI_SIZE];
2228 ch_self = mdev->comm_bm_set;
2229
89e58e75 2230 switch (mdev->tconn->net_conf->after_sb_0p) {
b411b363
PR
2231 case ASB_CONSENSUS:
2232 case ASB_DISCARD_SECONDARY:
2233 case ASB_CALL_HELPER:
2234 dev_err(DEV, "Configuration error.\n");
2235 break;
2236 case ASB_DISCONNECT:
2237 break;
2238 case ASB_DISCARD_YOUNGER_PRI:
2239 if (self == 0 && peer == 1) {
2240 rv = -1;
2241 break;
2242 }
2243 if (self == 1 && peer == 0) {
2244 rv = 1;
2245 break;
2246 }
2247 /* Else fall through to one of the other strategies... */
2248 case ASB_DISCARD_OLDER_PRI:
2249 if (self == 0 && peer == 1) {
2250 rv = 1;
2251 break;
2252 }
2253 if (self == 1 && peer == 0) {
2254 rv = -1;
2255 break;
2256 }
2257 /* Else fall through to one of the other strategies... */
ad19bf6e 2258 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2259 "Using discard-least-changes instead\n");
2260 case ASB_DISCARD_ZERO_CHG:
2261 if (ch_peer == 0 && ch_self == 0) {
25703f83 2262 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2263 ? -1 : 1;
2264 break;
2265 } else {
2266 if (ch_peer == 0) { rv = 1; break; }
2267 if (ch_self == 0) { rv = -1; break; }
2268 }
89e58e75 2269 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2270 break;
2271 case ASB_DISCARD_LEAST_CHG:
2272 if (ch_self < ch_peer)
2273 rv = -1;
2274 else if (ch_self > ch_peer)
2275 rv = 1;
2276 else /* ( ch_self == ch_peer ) */
2277 /* Well, then use something else. */
25703f83 2278 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2279 ? -1 : 1;
2280 break;
2281 case ASB_DISCARD_LOCAL:
2282 rv = -1;
2283 break;
2284 case ASB_DISCARD_REMOTE:
2285 rv = 1;
2286 }
2287
2288 return rv;
2289}
2290
2291static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2292{
6184ea21 2293 int hg, rv = -100;
b411b363 2294
89e58e75 2295 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2296 case ASB_DISCARD_YOUNGER_PRI:
2297 case ASB_DISCARD_OLDER_PRI:
2298 case ASB_DISCARD_LEAST_CHG:
2299 case ASB_DISCARD_LOCAL:
2300 case ASB_DISCARD_REMOTE:
2301 dev_err(DEV, "Configuration error.\n");
2302 break;
2303 case ASB_DISCONNECT:
2304 break;
2305 case ASB_CONSENSUS:
2306 hg = drbd_asb_recover_0p(mdev);
2307 if (hg == -1 && mdev->state.role == R_SECONDARY)
2308 rv = hg;
2309 if (hg == 1 && mdev->state.role == R_PRIMARY)
2310 rv = hg;
2311 break;
2312 case ASB_VIOLENTLY:
2313 rv = drbd_asb_recover_0p(mdev);
2314 break;
2315 case ASB_DISCARD_SECONDARY:
2316 return mdev->state.role == R_PRIMARY ? 1 : -1;
2317 case ASB_CALL_HELPER:
2318 hg = drbd_asb_recover_0p(mdev);
2319 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2320 enum drbd_state_rv rv2;
2321
2322 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2323 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2324 * we might be here in C_WF_REPORT_PARAMS which is transient.
2325 * we do not need to wait for the after state change work either. */
bb437946
AG
2326 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2327 if (rv2 != SS_SUCCESS) {
b411b363
PR
2328 drbd_khelper(mdev, "pri-lost-after-sb");
2329 } else {
2330 dev_warn(DEV, "Successfully gave up primary role.\n");
2331 rv = hg;
2332 }
2333 } else
2334 rv = hg;
2335 }
2336
2337 return rv;
2338}
2339
2340static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2341{
6184ea21 2342 int hg, rv = -100;
b411b363 2343
89e58e75 2344 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2345 case ASB_DISCARD_YOUNGER_PRI:
2346 case ASB_DISCARD_OLDER_PRI:
2347 case ASB_DISCARD_LEAST_CHG:
2348 case ASB_DISCARD_LOCAL:
2349 case ASB_DISCARD_REMOTE:
2350 case ASB_CONSENSUS:
2351 case ASB_DISCARD_SECONDARY:
2352 dev_err(DEV, "Configuration error.\n");
2353 break;
2354 case ASB_VIOLENTLY:
2355 rv = drbd_asb_recover_0p(mdev);
2356 break;
2357 case ASB_DISCONNECT:
2358 break;
2359 case ASB_CALL_HELPER:
2360 hg = drbd_asb_recover_0p(mdev);
2361 if (hg == -1) {
bb437946
AG
2362 enum drbd_state_rv rv2;
2363
b411b363
PR
2364 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2365 * we might be here in C_WF_REPORT_PARAMS which is transient.
2366 * we do not need to wait for the after state change work either. */
bb437946
AG
2367 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2368 if (rv2 != SS_SUCCESS) {
b411b363
PR
2369 drbd_khelper(mdev, "pri-lost-after-sb");
2370 } else {
2371 dev_warn(DEV, "Successfully gave up primary role.\n");
2372 rv = hg;
2373 }
2374 } else
2375 rv = hg;
2376 }
2377
2378 return rv;
2379}
2380
2381static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2382 u64 bits, u64 flags)
2383{
2384 if (!uuid) {
2385 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2386 return;
2387 }
2388 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2389 text,
2390 (unsigned long long)uuid[UI_CURRENT],
2391 (unsigned long long)uuid[UI_BITMAP],
2392 (unsigned long long)uuid[UI_HISTORY_START],
2393 (unsigned long long)uuid[UI_HISTORY_END],
2394 (unsigned long long)bits,
2395 (unsigned long long)flags);
2396}
2397
2398/*
2399 100 after split brain try auto recover
2400 2 C_SYNC_SOURCE set BitMap
2401 1 C_SYNC_SOURCE use BitMap
2402 0 no Sync
2403 -1 C_SYNC_TARGET use BitMap
2404 -2 C_SYNC_TARGET set BitMap
2405 -100 after split brain, disconnect
2406-1000 unrelated data
4a23f264
PR
2407-1091 requires proto 91
2408-1096 requires proto 96
b411b363
PR
2409 */
2410static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2411{
2412 u64 self, peer;
2413 int i, j;
2414
2415 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2416 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2417
2418 *rule_nr = 10;
2419 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2420 return 0;
2421
2422 *rule_nr = 20;
2423 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2424 peer != UUID_JUST_CREATED)
2425 return -2;
2426
2427 *rule_nr = 30;
2428 if (self != UUID_JUST_CREATED &&
2429 (peer == UUID_JUST_CREATED || peer == (u64)0))
2430 return 2;
2431
2432 if (self == peer) {
2433 int rct, dc; /* roles at crash time */
2434
2435 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2436
31890f4a 2437 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2438 return -1091;
b411b363
PR
2439
2440 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2441 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2442 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2443 drbd_uuid_set_bm(mdev, 0UL);
2444
2445 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2446 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2447 *rule_nr = 34;
2448 } else {
2449 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2450 *rule_nr = 36;
2451 }
2452
2453 return 1;
2454 }
2455
2456 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2457
31890f4a 2458 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2459 return -1091;
b411b363
PR
2460
2461 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2462 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2463 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2464
2465 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2466 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2467 mdev->p_uuid[UI_BITMAP] = 0UL;
2468
2469 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2470 *rule_nr = 35;
2471 } else {
2472 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2473 *rule_nr = 37;
2474 }
2475
2476 return -1;
2477 }
2478
2479 /* Common power [off|failure] */
2480 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2481 (mdev->p_uuid[UI_FLAGS] & 2);
2482 /* lowest bit is set when we were primary,
2483 * next bit (weight 2) is set when peer was primary */
2484 *rule_nr = 40;
2485
2486 switch (rct) {
2487 case 0: /* !self_pri && !peer_pri */ return 0;
2488 case 1: /* self_pri && !peer_pri */ return 1;
2489 case 2: /* !self_pri && peer_pri */ return -1;
2490 case 3: /* self_pri && peer_pri */
25703f83 2491 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363
PR
2492 return dc ? -1 : 1;
2493 }
2494 }
2495
2496 *rule_nr = 50;
2497 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2498 if (self == peer)
2499 return -1;
2500
2501 *rule_nr = 51;
2502 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2503 if (self == peer) {
31890f4a 2504 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2505 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2506 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2507 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2508 /* The last P_SYNC_UUID did not get through. Undo the modifications the
 2509 peer made to its UUIDs when it last started a resync as sync source. */
2510
31890f4a 2511 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2512 return -1091;
b411b363
PR
2513
2514 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2515 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264
PR
2516
 2517 dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2518 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2519
b411b363
PR
2520 return -1;
2521 }
2522 }
2523
2524 *rule_nr = 60;
2525 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2526 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2527 peer = mdev->p_uuid[i] & ~((u64)1);
2528 if (self == peer)
2529 return -2;
2530 }
2531
2532 *rule_nr = 70;
2533 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2534 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2535 if (self == peer)
2536 return 1;
2537
2538 *rule_nr = 71;
2539 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2540 if (self == peer) {
31890f4a 2541 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2542 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2543 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2544 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2545 /* The last P_SYNC_UUID did not get through. Undo the modifications we
 2546 made to our UUIDs when we last started a resync as sync source. */
2547
31890f4a 2548 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2549 return -1091;
b411b363
PR
2550
2551 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2552 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2553
4a23f264 2554 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2555 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2556 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2557
2558 return 1;
2559 }
2560 }
2561
2562
2563 *rule_nr = 80;
d8c2a36b 2564 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2565 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2566 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2567 if (self == peer)
2568 return 2;
2569 }
2570
2571 *rule_nr = 90;
2572 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2573 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2574 if (self == peer && self != ((u64)0))
2575 return 100;
2576
2577 *rule_nr = 100;
2578 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2579 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2580 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2581 peer = mdev->p_uuid[j] & ~((u64)1);
2582 if (self == peer)
2583 return -100;
2584 }
2585 }
2586
2587 return -1000;
2588}
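/*
 * Illustrative reading of rule 40 above (editor's note, not part of the
 * original file): rct encodes the roles at crash time as a two-bit value,
 *
 *	rct = (we crashed as Primary   ? 1 : 0)
 *	    + (peer crashed as Primary ? 2 : 0);
 *
 * so rct == 1 makes us the sync source (return 1), rct == 2 makes the peer
 * the sync source (return -1), and for rct == 3 (both were Primary) the
 * DISCARD_CONCURRENT flag acts as tie breaker.
 */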
2589
2590/* drbd_sync_handshake() returns the new conn state on success, or
2591 CONN_MASK (-1) on failure.
2592 */
2593static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2594 enum drbd_disk_state peer_disk) __must_hold(local)
2595{
2596 int hg, rule_nr;
2597 enum drbd_conns rv = C_MASK;
2598 enum drbd_disk_state mydisk;
2599
2600 mydisk = mdev->state.disk;
2601 if (mydisk == D_NEGOTIATING)
2602 mydisk = mdev->new_state_tmp.disk;
2603
2604 dev_info(DEV, "drbd_sync_handshake:\n");
2605 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2606 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2607 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2608
2609 hg = drbd_uuid_compare(mdev, &rule_nr);
2610
2611 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2612
2613 if (hg == -1000) {
2614 dev_alert(DEV, "Unrelated data, aborting!\n");
2615 return C_MASK;
2616 }
4a23f264
PR
2617 if (hg < -1000) {
2618 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2619 return C_MASK;
2620 }
2621
2622 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2623 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2624 int f = (hg == -100) || abs(hg) == 2;
2625 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2626 if (f)
2627 hg = hg*2;
2628 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2629 hg > 0 ? "source" : "target");
2630 }
2631
3a11a487
AG
2632 if (abs(hg) == 100)
2633 drbd_khelper(mdev, "initial-split-brain");
2634
89e58e75 2635 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
b411b363
PR
2636 int pcount = (mdev->state.role == R_PRIMARY)
2637 + (peer_role == R_PRIMARY);
2638 int forced = (hg == -100);
2639
2640 switch (pcount) {
2641 case 0:
2642 hg = drbd_asb_recover_0p(mdev);
2643 break;
2644 case 1:
2645 hg = drbd_asb_recover_1p(mdev);
2646 break;
2647 case 2:
2648 hg = drbd_asb_recover_2p(mdev);
2649 break;
2650 }
2651 if (abs(hg) < 100) {
2652 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2653 "automatically solved. Sync from %s node\n",
2654 pcount, (hg < 0) ? "peer" : "this");
2655 if (forced) {
2656 dev_warn(DEV, "Doing a full sync, since"
2657 " UUIDs where ambiguous.\n");
2658 hg = hg*2;
2659 }
2660 }
2661 }
2662
2663 if (hg == -100) {
89e58e75 2664 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
b411b363 2665 hg = -1;
89e58e75 2666 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
b411b363
PR
2667 hg = 1;
2668
2669 if (abs(hg) < 100)
2670 dev_warn(DEV, "Split-Brain detected, manually solved. "
2671 "Sync from %s node\n",
2672 (hg < 0) ? "peer" : "this");
2673 }
2674
2675 if (hg == -100) {
580b9767
LE
2676 /* FIXME this log message is not correct if we end up here
2677 * after an attempted attach on a diskless node.
2678 * We just refuse to attach -- well, we drop the "connection"
2679 * to that disk, in a way... */
3a11a487 2680 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2681 drbd_khelper(mdev, "split-brain");
2682 return C_MASK;
2683 }
2684
2685 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2686 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2687 return C_MASK;
2688 }
2689
2690 if (hg < 0 && /* by intention we do not use mydisk here. */
2691 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
89e58e75 2692 switch (mdev->tconn->net_conf->rr_conflict) {
b411b363
PR
2693 case ASB_CALL_HELPER:
2694 drbd_khelper(mdev, "pri-lost");
2695 /* fall through */
2696 case ASB_DISCONNECT:
2697 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2698 return C_MASK;
2699 case ASB_VIOLENTLY:
2700 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2701 "assumption\n");
2702 }
2703 }
2704
89e58e75 2705 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
cf14c2e9
PR
2706 if (hg == 0)
2707 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2708 else
2709 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2710 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2711 abs(hg) >= 2 ? "full" : "bit-map based");
2712 return C_MASK;
2713 }
2714
b411b363
PR
2715 if (abs(hg) >= 2) {
2716 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2717 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2718 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2719 return C_MASK;
2720 }
2721
2722 if (hg > 0) { /* become sync source. */
2723 rv = C_WF_BITMAP_S;
2724 } else if (hg < 0) { /* become sync target */
2725 rv = C_WF_BITMAP_T;
2726 } else {
2727 rv = C_CONNECTED;
2728 if (drbd_bm_total_weight(mdev)) {
2729 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2730 drbd_bm_total_weight(mdev));
2731 }
2732 }
2733
2734 return rv;
2735}
2736
2737/* returns 1 if invalid */
2738static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2739{
2740 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2741 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2742 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2743 return 0;
2744
2745 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2746 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2747 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2748 return 1;
2749
2750 /* everything else is valid if they are equal on both sides. */
2751 if (peer == self)
2752 return 0;
2753
 2754 /* everything else is invalid. */
2755 return 1;
2756}
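/*
 * Illustrative examples (editor's note, not part of the original file):
 *
 *	cmp_after_sb(ASB_DISCARD_REMOTE, ASB_DISCARD_LOCAL) == 0   complementary
 *	cmp_after_sb(ASB_DISCARD_LOCAL,  ASB_DISCARD_LOCAL) == 1   both sides
 *								   would discard
 *								   the same node
 *	cmp_after_sb(ASB_DISCONNECT,     ASB_DISCONNECT)    == 0   identical
 *	cmp_after_sb(ASB_DISCONNECT,     ASB_CALL_HELPER)   == 1   mismatch
 */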
2757
d8763023
AG
2758static int receive_protocol(struct drbd_conf *mdev, enum drbd_packet cmd,
2759 unsigned int data_size)
b411b363 2760{
e42325a5 2761 struct p_protocol *p = &mdev->tconn->data.rbuf.protocol;
b411b363 2762 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2763 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2764 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2765
b411b363
PR
2766 p_proto = be32_to_cpu(p->protocol);
2767 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2768 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2769 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2770 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2771 cf = be32_to_cpu(p->conn_flags);
2772 p_want_lose = cf & CF_WANT_LOSE;
2773
2774 clear_bit(CONN_DRY_RUN, &mdev->flags);
2775
2776 if (cf & CF_DRY_RUN)
2777 set_bit(CONN_DRY_RUN, &mdev->flags);
b411b363 2778
89e58e75 2779 if (p_proto != mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
2780 dev_err(DEV, "incompatible communication protocols\n");
2781 goto disconnect;
2782 }
2783
89e58e75 2784 if (cmp_after_sb(p_after_sb_0p, mdev->tconn->net_conf->after_sb_0p)) {
b411b363
PR
2785 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2786 goto disconnect;
2787 }
2788
89e58e75 2789 if (cmp_after_sb(p_after_sb_1p, mdev->tconn->net_conf->after_sb_1p)) {
b411b363
PR
2790 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2791 goto disconnect;
2792 }
2793
89e58e75 2794 if (cmp_after_sb(p_after_sb_2p, mdev->tconn->net_conf->after_sb_2p)) {
b411b363
PR
2795 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2796 goto disconnect;
2797 }
2798
89e58e75 2799 if (p_want_lose && mdev->tconn->net_conf->want_lose) {
b411b363
PR
2800 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2801 goto disconnect;
2802 }
2803
89e58e75 2804 if (p_two_primaries != mdev->tconn->net_conf->two_primaries) {
b411b363
PR
2805 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2806 goto disconnect;
2807 }
2808
31890f4a 2809 if (mdev->tconn->agreed_pro_version >= 87) {
89e58e75 2810 unsigned char *my_alg = mdev->tconn->net_conf->integrity_alg;
b411b363 2811
de0ff338 2812 if (drbd_recv(mdev->tconn, p_integrity_alg, data_size) != data_size)
81e84650 2813 return false;
b411b363
PR
2814
2815 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2816 if (strcmp(p_integrity_alg, my_alg)) {
2817 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2818 goto disconnect;
2819 }
2820 dev_info(DEV, "data-integrity-alg: %s\n",
2821 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2822 }
2823
81e84650 2824 return true;
b411b363
PR
2825
2826disconnect:
2827 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 2828 return false;
b411b363
PR
2829}
2830
2831/* helper function
2832 * input: alg name, feature name
2833 * return: NULL (alg name was "")
2834 * ERR_PTR(error) if something goes wrong
2835 * or the crypto hash ptr, if it worked out ok. */
2836struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2837 const char *alg, const char *name)
2838{
2839 struct crypto_hash *tfm;
2840
2841 if (!alg[0])
2842 return NULL;
2843
2844 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2845 if (IS_ERR(tfm)) {
2846 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2847 alg, name, PTR_ERR(tfm));
2848 return tfm;
2849 }
2850 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2851 crypto_free_hash(tfm);
2852 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2853 return ERR_PTR(-EINVAL);
2854 }
2855 return tfm;
2856}
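/*
 * Caller pattern (editor's sketch; this is how receive_SyncParam() below
 * uses it): the three possible outcomes -- NULL for an empty algorithm
 * name, ERR_PTR() on failure, or a usable tfm -- are told apart like this:
 *
 *	verify_tfm = drbd_crypto_alloc_digest_safe(mdev, p->verify_alg,
 *						   "verify-alg");
 *	if (IS_ERR(verify_tfm)) {
 *		verify_tfm = NULL;	error has already been logged
 *		goto disconnect;
 *	}
 *	verify_tfm may still be NULL here, meaning the feature is not used
 */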
2857
d8763023
AG
2858static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
2859 unsigned int packet_size)
b411b363 2860{
81e84650 2861 int ok = true;
e42325a5 2862 struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
b411b363
PR
2863 unsigned int header_size, data_size, exp_max_sz;
2864 struct crypto_hash *verify_tfm = NULL;
2865 struct crypto_hash *csums_tfm = NULL;
31890f4a 2866 const int apv = mdev->tconn->agreed_pro_version;
778f271d
PR
2867 int *rs_plan_s = NULL;
2868 int fifo_size = 0;
b411b363
PR
2869
2870 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2871 : apv == 88 ? sizeof(struct p_rs_param)
2872 + SHARED_SECRET_MAX
8e26f9cc
PR
2873 : apv <= 94 ? sizeof(struct p_rs_param_89)
2874 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 2875
02918be2 2876 if (packet_size > exp_max_sz) {
b411b363 2877 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
02918be2 2878 packet_size, exp_max_sz);
81e84650 2879 return false;
b411b363
PR
2880 }
2881
2882 if (apv <= 88) {
257d0af6 2883 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
02918be2 2884 data_size = packet_size - header_size;
8e26f9cc 2885 } else if (apv <= 94) {
257d0af6 2886 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
02918be2 2887 data_size = packet_size - header_size;
b411b363 2888 D_ASSERT(data_size == 0);
8e26f9cc 2889 } else {
257d0af6 2890 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
02918be2 2891 data_size = packet_size - header_size;
b411b363
PR
2892 D_ASSERT(data_size == 0);
2893 }
2894
2895 /* initialize verify_alg and csums_alg */
2896 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2897
de0ff338 2898 if (drbd_recv(mdev->tconn, &p->head.payload, header_size) != header_size)
81e84650 2899 return false;
b411b363
PR
2900
2901 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2902
2903 if (apv >= 88) {
2904 if (apv == 88) {
2905 if (data_size > SHARED_SECRET_MAX) {
2906 dev_err(DEV, "verify-alg too long, "
2907 "peer wants %u, accepting only %u byte\n",
2908 data_size, SHARED_SECRET_MAX);
81e84650 2909 return false;
b411b363
PR
2910 }
2911
de0ff338 2912 if (drbd_recv(mdev->tconn, p->verify_alg, data_size) != data_size)
81e84650 2913 return false;
b411b363
PR
2914
2915 /* we expect NUL terminated string */
2916 /* but just in case someone tries to be evil */
2917 D_ASSERT(p->verify_alg[data_size-1] == 0);
2918 p->verify_alg[data_size-1] = 0;
2919
2920 } else /* apv >= 89 */ {
2921 /* we still expect NUL terminated strings */
2922 /* but just in case someone tries to be evil */
2923 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2924 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2925 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2926 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2927 }
2928
2929 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2930 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2931 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2932 mdev->sync_conf.verify_alg, p->verify_alg);
2933 goto disconnect;
2934 }
2935 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2936 p->verify_alg, "verify-alg");
2937 if (IS_ERR(verify_tfm)) {
2938 verify_tfm = NULL;
2939 goto disconnect;
2940 }
2941 }
2942
2943 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2944 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2945 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2946 mdev->sync_conf.csums_alg, p->csums_alg);
2947 goto disconnect;
2948 }
2949 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2950 p->csums_alg, "csums-alg");
2951 if (IS_ERR(csums_tfm)) {
2952 csums_tfm = NULL;
2953 goto disconnect;
2954 }
2955 }
2956
8e26f9cc
PR
2957 if (apv > 94) {
2958 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2959 mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2960 mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2961 mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2962 mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d
PR
2963
2964 fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2965 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2966 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2967 if (!rs_plan_s) {
2968 dev_err(DEV, "kmalloc of fifo_buffer failed");
2969 goto disconnect;
2970 }
2971 }
8e26f9cc 2972 }
b411b363
PR
2973
2974 spin_lock(&mdev->peer_seq_lock);
2975 /* lock against drbd_nl_syncer_conf() */
2976 if (verify_tfm) {
2977 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2978 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2979 crypto_free_hash(mdev->verify_tfm);
2980 mdev->verify_tfm = verify_tfm;
2981 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2982 }
2983 if (csums_tfm) {
2984 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2985 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2986 crypto_free_hash(mdev->csums_tfm);
2987 mdev->csums_tfm = csums_tfm;
2988 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2989 }
778f271d
PR
2990 if (fifo_size != mdev->rs_plan_s.size) {
2991 kfree(mdev->rs_plan_s.values);
2992 mdev->rs_plan_s.values = rs_plan_s;
2993 mdev->rs_plan_s.size = fifo_size;
2994 mdev->rs_planed = 0;
2995 }
b411b363
PR
2996 spin_unlock(&mdev->peer_seq_lock);
2997 }
2998
2999 return ok;
3000disconnect:
3001 /* just for completeness: actually not needed,
3002 * as this is not reached if csums_tfm was ok. */
3003 crypto_free_hash(csums_tfm);
3004 /* but free the verify_tfm again, if csums_tfm did not work out */
3005 crypto_free_hash(verify_tfm);
3006 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3007 return false;
b411b363
PR
3008}
3009
b411b363
PR
3010/* warn if the arguments differ by more than 12.5% */
3011static void warn_if_differ_considerably(struct drbd_conf *mdev,
3012 const char *s, sector_t a, sector_t b)
3013{
3014 sector_t d;
3015 if (a == 0 || b == 0)
3016 return;
3017 d = (a > b) ? (a - b) : (b - a);
3018 if (d > (a>>3) || d > (b>>3))
3019 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3020 (unsigned long long)a, (unsigned long long)b);
3021}
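/*
 * Worked example (editor's note, not part of the original file): with
 * a = 1000 and b = 850 sectors, d = 150 while a>>3 == 125 and b>>3 == 106,
 * so the warning is printed; with b = 900, d = 100 stays below both 12.5%
 * thresholds and the sizes are considered close enough.
 */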
3022
d8763023
AG
3023static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
3024 unsigned int data_size)
b411b363 3025{
e42325a5 3026 struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
b411b363 3027 enum determine_dev_size dd = unchanged;
b411b363
PR
3028 sector_t p_size, p_usize, my_usize;
3029 int ldsc = 0; /* local disk size changed */
e89b591c 3030 enum dds_flags ddsf;
b411b363 3031
b411b363
PR
3032 p_size = be64_to_cpu(p->d_size);
3033 p_usize = be64_to_cpu(p->u_size);
3034
3035 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
3036 dev_err(DEV, "some backing storage is needed\n");
3037 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3038 return false;
b411b363
PR
3039 }
3040
3041 /* just store the peer's disk size for now.
3042 * we still need to figure out whether we accept that. */
3043 mdev->p_size = p_size;
3044
b411b363
PR
3045 if (get_ldev(mdev)) {
3046 warn_if_differ_considerably(mdev, "lower level device sizes",
3047 p_size, drbd_get_max_capacity(mdev->ldev));
3048 warn_if_differ_considerably(mdev, "user requested size",
3049 p_usize, mdev->ldev->dc.disk_size);
3050
3051 /* if this is the first connect, or an otherwise expected
3052 * param exchange, choose the minimum */
3053 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3054 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3055 p_usize);
3056
3057 my_usize = mdev->ldev->dc.disk_size;
3058
3059 if (mdev->ldev->dc.disk_size != p_usize) {
3060 mdev->ldev->dc.disk_size = p_usize;
3061 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3062 (unsigned long)mdev->ldev->dc.disk_size);
3063 }
3064
3065 /* Never shrink a device with usable data during connect.
3066 But allow online shrinking if we are connected. */
a393db6f 3067 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
3068 drbd_get_capacity(mdev->this_bdev) &&
3069 mdev->state.disk >= D_OUTDATED &&
3070 mdev->state.conn < C_CONNECTED) {
3071 dev_err(DEV, "The peer's disk size is too small!\n");
3072 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3073 mdev->ldev->dc.disk_size = my_usize;
3074 put_ldev(mdev);
81e84650 3075 return false;
b411b363
PR
3076 }
3077 put_ldev(mdev);
3078 }
b411b363 3079
e89b591c 3080 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3081 if (get_ldev(mdev)) {
24c4830c 3082 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3083 put_ldev(mdev);
3084 if (dd == dev_size_error)
81e84650 3085 return false;
b411b363
PR
3086 drbd_md_sync(mdev);
3087 } else {
3088 /* I am diskless, need to accept the peer's size. */
3089 drbd_set_my_capacity(mdev, p_size);
3090 }
3091
99432fcc
PR
3092 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3093 drbd_reconsider_max_bio_size(mdev);
3094
b411b363
PR
3095 if (get_ldev(mdev)) {
3096 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3097 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3098 ldsc = 1;
3099 }
3100
b411b363
PR
3101 put_ldev(mdev);
3102 }
3103
3104 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3105 if (be64_to_cpu(p->c_size) !=
3106 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3107 /* we have different sizes, probably peer
3108 * needs to know my new size... */
e89b591c 3109 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3110 }
3111 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3112 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3113 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3114 mdev->state.disk >= D_INCONSISTENT) {
3115 if (ddsf & DDSF_NO_RESYNC)
3116 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3117 else
3118 resync_after_online_grow(mdev);
3119 } else
b411b363
PR
3120 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3121 }
3122 }
3123
81e84650 3124 return true;
b411b363
PR
3125}
3126
d8763023
AG
3127static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3128 unsigned int data_size)
b411b363 3129{
e42325a5 3130 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
b411b363 3131 u64 *p_uuid;
62b0da3a 3132 int i, updated_uuids = 0;
b411b363 3133
b411b363
PR
3134 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3135
3136 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3137 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3138
3139 kfree(mdev->p_uuid);
3140 mdev->p_uuid = p_uuid;
3141
3142 if (mdev->state.conn < C_CONNECTED &&
3143 mdev->state.disk < D_INCONSISTENT &&
3144 mdev->state.role == R_PRIMARY &&
3145 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3146 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3147 (unsigned long long)mdev->ed_uuid);
3148 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3149 return false;
b411b363
PR
3150 }
3151
3152 if (get_ldev(mdev)) {
3153 int skip_initial_sync =
3154 mdev->state.conn == C_CONNECTED &&
31890f4a 3155 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3156 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3157 (p_uuid[UI_FLAGS] & 8);
3158 if (skip_initial_sync) {
3159 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3160 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3161 "clear_n_write from receive_uuids",
3162 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3163 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3164 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3165 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3166 CS_VERBOSE, NULL);
3167 drbd_md_sync(mdev);
62b0da3a 3168 updated_uuids = 1;
b411b363
PR
3169 }
3170 put_ldev(mdev);
18a50fa2
PR
3171 } else if (mdev->state.disk < D_INCONSISTENT &&
3172 mdev->state.role == R_PRIMARY) {
3173 /* I am a diskless primary, the peer just created a new current UUID
3174 for me. */
62b0da3a 3175 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3176 }
3177
 3178 /* Before we test for the disk state, we should wait until a possibly
 3179 ongoing cluster-wide state change is finished. That is important if
3180 we are primary and are detaching from our disk. We need to see the
3181 new disk state... */
8410da8f
PR
3182 mutex_lock(mdev->state_mutex);
3183 mutex_unlock(mdev->state_mutex);
b411b363 3184 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3185 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3186
3187 if (updated_uuids)
3188 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3189
81e84650 3190 return true;
b411b363
PR
3191}
3192
3193/**
3194 * convert_state() - Converts the peer's view of the cluster state to our point of view
3195 * @ps: The state as seen by the peer.
3196 */
3197static union drbd_state convert_state(union drbd_state ps)
3198{
3199 union drbd_state ms;
3200
3201 static enum drbd_conns c_tab[] = {
3202 [C_CONNECTED] = C_CONNECTED,
3203
3204 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3205 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3206 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3207 [C_VERIFY_S] = C_VERIFY_T,
3208 [C_MASK] = C_MASK,
3209 };
3210
3211 ms.i = ps.i;
3212
3213 ms.conn = c_tab[ps.conn];
3214 ms.peer = ps.role;
3215 ms.role = ps.peer;
3216 ms.pdsk = ps.disk;
3217 ms.disk = ps.pdsk;
3218 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3219
3220 return ms;
3221}
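/*
 * Illustrative example (editor's note, not part of the original file):
 * if the peer reports
 *	ps = { .conn = C_STARTING_SYNC_S, .role = R_PRIMARY,
 *	       .peer = R_SECONDARY, .disk = D_UP_TO_DATE,
 *	       .pdsk = D_INCONSISTENT },
 * then from our point of view this becomes
 *	ms = { .conn = C_STARTING_SYNC_T, .role = R_SECONDARY,
 *	       .peer = R_PRIMARY, .disk = D_INCONSISTENT,
 *	       .pdsk = D_UP_TO_DATE },
 * i.e. the roles and disk states swap sides and the connection state is
 * mirrored through c_tab[].
 */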
3222
d8763023
AG
3223static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3224 unsigned int data_size)
b411b363 3225{
e42325a5 3226 struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
b411b363 3227 union drbd_state mask, val;
bf885f8a 3228 enum drbd_state_rv rv;
b411b363 3229
b411b363
PR
3230 mask.i = be32_to_cpu(p->mask);
3231 val.i = be32_to_cpu(p->val);
3232
25703f83 3233 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
8410da8f 3234 mutex_is_locked(mdev->state_mutex)) {
b411b363 3235 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
81e84650 3236 return true;
b411b363
PR
3237 }
3238
3239 mask = convert_state(mask);
3240 val = convert_state(val);
3241
047cd4a6
PR
3242 if (cmd == P_CONN_ST_CHG_REQ) {
3243 rv = conn_request_state(mdev->tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY);
3244 conn_send_sr_reply(mdev->tconn, rv);
3245 } else {
3246 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3247 drbd_send_sr_reply(mdev, rv);
3248 }
b411b363 3249
b411b363
PR
3250 drbd_md_sync(mdev);
3251
81e84650 3252 return true;
b411b363
PR
3253}
3254
d8763023
AG
3255static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3256 unsigned int data_size)
b411b363 3257{
e42325a5 3258 struct p_state *p = &mdev->tconn->data.rbuf.state;
4ac4aada 3259 union drbd_state os, ns, peer_state;
b411b363 3260 enum drbd_disk_state real_peer_disk;
65d922c3 3261 enum chg_state_flags cs_flags;
b411b363
PR
3262 int rv;
3263
b411b363
PR
3264 peer_state.i = be32_to_cpu(p->state);
3265
3266 real_peer_disk = peer_state.disk;
3267 if (peer_state.disk == D_NEGOTIATING) {
3268 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3269 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3270 }
3271
87eeee41 3272 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 3273 retry:
4ac4aada 3274 os = ns = mdev->state;
87eeee41 3275 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 3276
e9ef7bb6
LE
3277 /* peer says his disk is uptodate, while we think it is inconsistent,
3278 * and this happens while we think we have a sync going on. */
3279 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3280 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3281 /* If we are (becoming) SyncSource, but peer is still in sync
3282 * preparation, ignore its uptodate-ness to avoid flapping, it
3283 * will change to inconsistent once the peer reaches active
3284 * syncing states.
3285 * It may have changed syncer-paused flags, however, so we
3286 * cannot ignore this completely. */
3287 if (peer_state.conn > C_CONNECTED &&
3288 peer_state.conn < C_SYNC_SOURCE)
3289 real_peer_disk = D_INCONSISTENT;
3290
3291 /* if peer_state changes to connected at the same time,
3292 * it explicitly notifies us that it finished resync.
3293 * Maybe we should finish it up, too? */
3294 else if (os.conn >= C_SYNC_SOURCE &&
3295 peer_state.conn == C_CONNECTED) {
3296 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3297 drbd_resync_finished(mdev);
81e84650 3298 return true;
e9ef7bb6
LE
3299 }
3300 }
3301
3302 /* peer says his disk is inconsistent, while we think it is uptodate,
3303 * and this happens while the peer still thinks we have a sync going on,
3304 * but we think we are already done with the sync.
3305 * We ignore this to avoid flapping pdsk.
3306 * This should not happen, if the peer is a recent version of drbd. */
3307 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3308 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3309 real_peer_disk = D_UP_TO_DATE;
3310
4ac4aada
LE
3311 if (ns.conn == C_WF_REPORT_PARAMS)
3312 ns.conn = C_CONNECTED;
b411b363 3313
67531718
PR
3314 if (peer_state.conn == C_AHEAD)
3315 ns.conn = C_BEHIND;
3316
b411b363
PR
3317 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3318 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3319 int cr; /* consider resync */
3320
3321 /* if we established a new connection */
4ac4aada 3322 cr = (os.conn < C_CONNECTED);
b411b363
PR
3323 /* if we had an established connection
3324 * and one of the nodes newly attaches a disk */
4ac4aada 3325 cr |= (os.conn == C_CONNECTED &&
b411b363 3326 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3327 os.disk == D_NEGOTIATING));
b411b363
PR
3328 /* if we have both been inconsistent, and the peer has been
3329 * forced to be UpToDate with --overwrite-data */
3330 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3331 /* if we had been plain connected, and the admin requested to
3332 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3333 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3334 (peer_state.conn >= C_STARTING_SYNC_S &&
3335 peer_state.conn <= C_WF_BITMAP_T));
3336
3337 if (cr)
4ac4aada 3338 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3339
3340 put_ldev(mdev);
4ac4aada
LE
3341 if (ns.conn == C_MASK) {
3342 ns.conn = C_CONNECTED;
b411b363 3343 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3344 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3345 } else if (peer_state.disk == D_NEGOTIATING) {
3346 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3347 peer_state.disk = D_DISKLESS;
580b9767 3348 real_peer_disk = D_DISKLESS;
b411b363 3349 } else {
cf14c2e9 3350 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
81e84650 3351 return false;
4ac4aada 3352 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
b411b363 3353 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3354 return false;
b411b363
PR
3355 }
3356 }
3357 }
3358
87eeee41 3359 spin_lock_irq(&mdev->tconn->req_lock);
4ac4aada 3360 if (mdev->state.i != os.i)
b411b363
PR
3361 goto retry;
3362 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3363 ns.peer = peer_state.role;
3364 ns.pdsk = real_peer_disk;
3365 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3366 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3367 ns.disk = mdev->new_state_tmp.disk;
4ac4aada
LE
3368 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3369 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50 3370 test_bit(NEW_CUR_UUID, &mdev->flags)) {
8554df1c 3371 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3372 for temporary network outages! */
87eeee41 3373 spin_unlock_irq(&mdev->tconn->req_lock);
481c6f50
PR
3374 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3375 tl_clear(mdev);
3376 drbd_uuid_new_current(mdev);
3377 clear_bit(NEW_CUR_UUID, &mdev->flags);
3378 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
81e84650 3379 return false;
481c6f50 3380 }
65d922c3 3381 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
b411b363 3382 ns = mdev->state;
87eeee41 3383 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3384
3385 if (rv < SS_SUCCESS) {
3386 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3387 return false;
b411b363
PR
3388 }
3389
4ac4aada
LE
3390 if (os.conn > C_WF_REPORT_PARAMS) {
3391 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3392 peer_state.disk != D_NEGOTIATING ) {
3393 /* we want resync, peer has not yet decided to sync... */
3394 /* Nowadays only used when forcing a node into primary role and
3395 setting its disk to UpToDate with that */
3396 drbd_send_uuids(mdev);
3397 drbd_send_state(mdev);
3398 }
3399 }
3400
89e58e75 3401 mdev->tconn->net_conf->want_lose = 0;
b411b363
PR
3402
3403 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3404
81e84650 3405 return true;
b411b363
PR
3406}
3407
d8763023
AG
3408static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
3409 unsigned int data_size)
b411b363 3410{
e42325a5 3411 struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;
b411b363
PR
3412
3413 wait_event(mdev->misc_wait,
3414 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3415 mdev->state.conn == C_BEHIND ||
b411b363
PR
3416 mdev->state.conn < C_CONNECTED ||
3417 mdev->state.disk < D_NEGOTIATING);
3418
3419 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3420
b411b363
PR
3421 /* Here the _drbd_uuid_ functions are right, current should
3422 _not_ be rotated into the history */
3423 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3424 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3425 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3426
62b0da3a 3427 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3428 drbd_start_resync(mdev, C_SYNC_TARGET);
3429
3430 put_ldev(mdev);
3431 } else
3432 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3433
81e84650 3434 return true;
b411b363
PR
3435}
3436
2c46407d
AG
3437/**
3438 * receive_bitmap_plain
3439 *
3440 * Return 0 when done, 1 when another iteration is needed, and a negative error
3441 * code upon failure.
3442 */
3443static int
02918be2
PR
3444receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3445 unsigned long *buffer, struct bm_xfer_ctx *c)
b411b363
PR
3446{
3447 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3448 unsigned want = num_words * sizeof(long);
2c46407d 3449 int err;
b411b363 3450
02918be2
PR
3451 if (want != data_size) {
3452 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
2c46407d 3453 return -EIO;
b411b363
PR
3454 }
3455 if (want == 0)
2c46407d 3456 return 0;
de0ff338 3457 err = drbd_recv(mdev->tconn, buffer, want);
2c46407d
AG
3458 if (err != want) {
3459 if (err >= 0)
3460 err = -EIO;
3461 return err;
3462 }
b411b363
PR
3463
3464 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3465
3466 c->word_offset += num_words;
3467 c->bit_offset = c->word_offset * BITS_PER_LONG;
3468 if (c->bit_offset > c->bm_bits)
3469 c->bit_offset = c->bm_bits;
3470
2c46407d 3471 return 1;
b411b363
PR
3472}
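/*
 * Worked example (illustration only, assuming the usual 4 KiB per bitmap
 * bit and 64-bit longs): a 1 GiB backing device has 262144 bitmap bits,
 * i.e. 4096 words.  They arrive in chunks of at most BM_PACKET_WORDS per
 * P_BITMAP packet; after each chunk word_offset advances by num_words and
 * bit_offset tracks word_offset * BITS_PER_LONG, clamped to bm_bits for
 * the final, possibly partial, packet.
 */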
3473
2c46407d
AG
3474/**
3475 * recv_bm_rle_bits
3476 *
3477 * Return 0 when done, 1 when another iteration is needed, and a negative error
3478 * code upon failure.
3479 */
3480static int
b411b363
PR
3481recv_bm_rle_bits(struct drbd_conf *mdev,
3482 struct p_compressed_bm *p,
c6d25cfe
PR
3483 struct bm_xfer_ctx *c,
3484 unsigned int len)
b411b363
PR
3485{
3486 struct bitstream bs;
3487 u64 look_ahead;
3488 u64 rl;
3489 u64 tmp;
3490 unsigned long s = c->bit_offset;
3491 unsigned long e;
b411b363
PR
3492 int toggle = DCBP_get_start(p);
3493 int have;
3494 int bits;
3495
3496 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3497
3498 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3499 if (bits < 0)
2c46407d 3500 return -EIO;
b411b363
PR
3501
3502 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3503 bits = vli_decode_bits(&rl, look_ahead);
3504 if (bits <= 0)
2c46407d 3505 return -EIO;
b411b363
PR
3506
3507 if (toggle) {
3508 e = s + rl -1;
3509 if (e >= c->bm_bits) {
3510 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3511 return -EIO;
b411b363
PR
3512 }
3513 _drbd_bm_set_bits(mdev, s, e);
3514 }
3515
3516 if (have < bits) {
3517 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3518 have, bits, look_ahead,
3519 (unsigned int)(bs.cur.b - p->code),
3520 (unsigned int)bs.buf_len);
2c46407d 3521 return -EIO;
b411b363
PR
3522 }
3523 look_ahead >>= bits;
3524 have -= bits;
3525
3526 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3527 if (bits < 0)
2c46407d 3528 return -EIO;
b411b363
PR
3529 look_ahead |= tmp << have;
3530 have += bits;
3531 }
3532
3533 c->bit_offset = s;
3534 bm_xfer_ctx_bit_to_word_offset(c);
3535
2c46407d 3536 return (s != c->bm_bits);
b411b363
PR
3537}
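/*
 * Decoding example (illustration only): assume DCBP_get_start(p) == 0 and
 * the VLI stream decodes to the run lengths 5, 3, 2 with c->bit_offset
 * starting at 0.  Then bits 0..4 stay clear (toggle == 0), bits 5..7 are
 * set via _drbd_bm_set_bits(mdev, 5, 7), and bits 8..9 stay clear again.
 * Only the "set" runs touch the bitmap; cleared runs are skipped by merely
 * advancing s.
 */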
3538
2c46407d
AG
3539/**
3540 * decode_bitmap_c
3541 *
3542 * Return 0 when done, 1 when another iteration is needed, and a negative error
3543 * code upon failure.
3544 */
3545static int
b411b363
PR
3546decode_bitmap_c(struct drbd_conf *mdev,
3547 struct p_compressed_bm *p,
c6d25cfe
PR
3548 struct bm_xfer_ctx *c,
3549 unsigned int len)
b411b363
PR
3550{
3551 if (DCBP_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3552 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3553
3554 /* other variants had been implemented for evaluation,
3555 * but have been dropped as this one turned out to be "best"
3556 * during all our tests. */
3557
3558 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3559 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
2c46407d 3560 return -EIO;
b411b363
PR
3561}
3562
3563void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3564 const char *direction, struct bm_xfer_ctx *c)
3565{
3566 /* what would it take to transfer it "plaintext" */
c012949a 3567 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3568 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3569 + c->bm_words * sizeof(long);
3570 unsigned total = c->bytes[0] + c->bytes[1];
3571 unsigned r;
3572
3573 /* total cannot be zero, but just in case: */
3574 if (total == 0)
3575 return;
3576
3577 /* don't report if not compressed */
3578 if (total >= plain)
3579 return;
3580
3581 /* total < plain. check for overflow, still */
3582 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3583 : (1000 * total / plain);
3584
3585 if (r > 1000)
3586 r = 1000;
3587
3588 r = 1000 - r;
3589 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3590 "total %u; compression: %u.%u%%\n",
3591 direction,
3592 c->bytes[1], c->packets[1],
3593 c->bytes[0], c->packets[0],
3594 total, r/10, r % 10);
3595}
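/*
 * Example of the statistics above (illustration only): if the plain
 * transfer would have needed plain = 131072 bytes but the RLE transfer
 * actually used total = 4096 bytes, then
 *
 *     r = 1000 * 4096 / 131072 = 31   (per mille actually transferred)
 *     r = 1000 - 31           = 969   (per mille saved)
 *
 * and the log line reports "compression: 96.9%".
 */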
3596
3597/* Since we are processing the bitfield from lower addresses to higher,
3598 it does not matter whether we process it in 32 bit or 64 bit
3599 chunks, as long as it is little endian. (Understand it as a byte stream,
3600 beginning with the lowest byte...) If we used big endian,
3601 we would need to process it from the highest address to the lowest,
3602 in order to be agnostic to the 32 vs 64 bits issue.
3603
3604 returns 0 on failure, 1 if we successfully received it. */
d8763023
AG
3605static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
3606 unsigned int data_size)
b411b363
PR
3607{
3608 struct bm_xfer_ctx c;
3609 void *buffer;
2c46407d 3610 int err;
81e84650 3611 int ok = false;
257d0af6 3612 struct p_header *h = &mdev->tconn->data.rbuf.header;
77351055 3613 struct packet_info pi;
b411b363 3614
20ceb2b2
LE
3615 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3616 /* you are supposed to send additional out-of-sync information
3617 * if you actually set bits during this phase */
b411b363
PR
3618
3619 /* maybe we should use some per thread scratch page,
3620 * and allocate that during initial device creation? */
3621 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3622 if (!buffer) {
3623 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3624 goto out;
3625 }
3626
3627 c = (struct bm_xfer_ctx) {
3628 .bm_bits = drbd_bm_bits(mdev),
3629 .bm_words = drbd_bm_words(mdev),
3630 };
3631
2c46407d 3632 for(;;) {
02918be2 3633 if (cmd == P_BITMAP) {
2c46407d 3634 err = receive_bitmap_plain(mdev, data_size, buffer, &c);
02918be2 3635 } else if (cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3636 /* MAYBE: sanity check that we speak proto >= 90,
3637 * and the feature is enabled! */
3638 struct p_compressed_bm *p;
3639
02918be2 3640 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
b411b363
PR
3641 dev_err(DEV, "ReportCBitmap packet too large\n");
3642 goto out;
3643 }
3644 /* use the page buffer */
3645 p = buffer;
3646 memcpy(p, h, sizeof(*h));
de0ff338 3647 if (drbd_recv(mdev->tconn, p->head.payload, data_size) != data_size)
b411b363 3648 goto out;
004352fa
LE
3649 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3650 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
78fcbdae 3651 goto out;
b411b363 3652 }
c6d25cfe 3653 err = decode_bitmap_c(mdev, p, &c, data_size);
b411b363 3654 } else {
02918be2 3655 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
b411b363
PR
3656 goto out;
3657 }
3658
02918be2 3659 c.packets[cmd == P_BITMAP]++;
257d0af6 3660 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;
b411b363 3661
2c46407d
AG
3662 if (err <= 0) {
3663 if (err < 0)
3664 goto out;
b411b363 3665 break;
2c46407d 3666 }
9ba7aa00 3667 if (!drbd_recv_header(mdev->tconn, &pi))
b411b363 3668 goto out;
77351055
PR
3669 cmd = pi.cmd;
3670 data_size = pi.size;
2c46407d 3671 }
b411b363
PR
3672
3673 INFO_bm_xfer_stats(mdev, "receive", &c);
3674
3675 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
3676 enum drbd_state_rv rv;
3677
b411b363
PR
3678 ok = !drbd_send_bitmap(mdev);
3679 if (!ok)
3680 goto out;
3681 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
3682 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3683 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
3684 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3685 /* admin may have requested C_DISCONNECTING,
3686 * other threads may have noticed network errors */
3687 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3688 drbd_conn_str(mdev->state.conn));
3689 }
3690
81e84650 3691 ok = true;
b411b363 3692 out:
20ceb2b2 3693 drbd_bm_unlock(mdev);
b411b363
PR
3694 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3695 drbd_start_resync(mdev, C_SYNC_SOURCE);
3696 free_page((unsigned long) buffer);
3697 return ok;
3698}
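/*
 * Flow summary (added for clarity): as the designated SyncTarget
 * (C_WF_BITMAP_T) we merge the peer's bitmap into our own, send our bitmap
 * back, and then request C_WF_SYNC_UUID so the peer can hand us the sync
 * UUID before the resync proper starts.  As SyncSource (C_WF_BITMAP_S) the
 * resync is started directly via drbd_start_resync(mdev, C_SYNC_SOURCE)
 * once the transfer completed successfully.
 */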
3699
d8763023
AG
3700static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
3701 unsigned int data_size)
b411b363
PR
3702{
3703 /* TODO zero copy sink :) */
3704 static char sink[128];
3705 int size, want, r;
3706
02918be2
PR
3707 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3708 cmd, data_size);
b411b363 3709
02918be2 3710 size = data_size;
b411b363
PR
3711 while (size > 0) {
3712 want = min_t(int, size, sizeof(sink));
de0ff338 3713 r = drbd_recv(mdev->tconn, sink, want);
841ce241
AG
3714 if (!expect(r > 0))
3715 break;
b411b363
PR
3716 size -= r;
3717 }
3718 return size == 0;
3719}
3720
d8763023
AG
3721static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
3722 unsigned int data_size)
0ced55a3 3723{
e7f52dfb
LE
3724 /* Make sure we've acked all the TCP data associated
3725 * with the data requests being unplugged */
e42325a5 3726 drbd_tcp_quickack(mdev->tconn->data.socket);
0ced55a3 3727
81e84650 3728 return true;
0ced55a3
PR
3729}
3730
d8763023
AG
3731static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
3732 unsigned int data_size)
73a01a18 3733{
e42325a5 3734 struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;
73a01a18 3735
f735e363
LE
3736 switch (mdev->state.conn) {
3737 case C_WF_SYNC_UUID:
3738 case C_WF_BITMAP_T:
3739 case C_BEHIND:
3740 break;
3741 default:
3742 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3743 drbd_conn_str(mdev->state.conn));
3744 }
3745
73a01a18
PR
3746 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3747
81e84650 3748 return true;
73a01a18
PR
3749}
3750
d8763023
AG
3751typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packet cmd,
3752 unsigned int to_receive);
02918be2
PR
3753
3754struct data_cmd {
3755 int expect_payload;
3756 size_t pkt_size;
3757 drbd_cmd_handler_f function;
3758};
3759
3760static struct data_cmd drbd_cmd_handler[] = {
3761 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3762 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3763 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3764 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
257d0af6
PR
3765 [P_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3766 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3767 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), receive_UnplugRemote },
02918be2
PR
3768 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3769 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
257d0af6
PR
3770 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), receive_SyncParam },
3771 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), receive_SyncParam },
02918be2
PR
3772 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3773 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3774 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3775 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
3776 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3777 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3778 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3779 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3780 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3781 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
73a01a18 3782 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
047cd4a6 3783 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
b411b363
PR
3784 /* anything missing from this table is in
3785 * the asender_tbl, see get_asender_cmd */
02918be2 3786 [P_MAX_CMD] = { 0, 0, NULL },
b411b363
PR
3787};
3788
02918be2 3789/* All handler functions that expect a sub-header get that sub-header in
e42325a5 3790 mdev->tconn->data.rbuf.header.head.payload.
02918be2 3791
e42325a5 3792 Usually in mdev->tconn->data.rbuf.header.head the callback can find the usual
02918be2
PR
3793 p_header, but they must not rely on that, since there is also p_header95.
3794 */
b411b363 3795
eefc2f7d 3796static void drbdd(struct drbd_tconn *tconn)
b411b363 3797{
eefc2f7d 3798 struct p_header *header = &tconn->data.rbuf.header;
77351055 3799 struct packet_info pi;
02918be2
PR
3800 size_t shs; /* sub header size */
3801 int rv;
b411b363 3802
eefc2f7d
PR
3803 while (get_t_state(&tconn->receiver) == RUNNING) {
3804 drbd_thread_current_set_cpu(&tconn->receiver);
3805 if (!drbd_recv_header(tconn, &pi))
02918be2 3806 goto err_out;
b411b363 3807
77351055 3808 if (unlikely(pi.cmd >= P_MAX_CMD || !drbd_cmd_handler[pi.cmd].function)) {
eefc2f7d 3809 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
02918be2 3810 goto err_out;
0b33a916 3811 }
b411b363 3812
77351055
PR
3813 shs = drbd_cmd_handler[pi.cmd].pkt_size - sizeof(struct p_header);
3814 if (pi.size - shs > 0 && !drbd_cmd_handler[pi.cmd].expect_payload) {
eefc2f7d 3815 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
02918be2 3816 goto err_out;
b411b363 3817 }
b411b363 3818
c13f7e1a 3819 if (shs) {
eefc2f7d 3820 rv = drbd_recv(tconn, &header->payload, shs);
c13f7e1a 3821 if (unlikely(rv != shs)) {
0ddc5549 3822 if (!signal_pending(current))
eefc2f7d 3823 conn_warn(tconn, "short read while reading sub header: rv=%d\n", rv);
c13f7e1a
LE
3824 goto err_out;
3825 }
3826 }
3827
eefc2f7d 3828 rv = drbd_cmd_handler[pi.cmd].function(vnr_to_mdev(tconn, pi.vnr), pi.cmd, pi.size - shs);
b411b363 3829
02918be2 3830 if (unlikely(!rv)) {
eefc2f7d 3831 conn_err(tconn, "error receiving %s, l: %d!\n",
77351055 3832 cmdname(pi.cmd), pi.size);
02918be2 3833 goto err_out;
b411b363
PR
3834 }
3835 }
b411b363 3836
02918be2
PR
3837 if (0) {
3838 err_out:
bbeb641c 3839 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
02918be2 3840 }
b411b363
PR
3841}
3842
a21e9298 3843void drbd_flush_workqueue(struct drbd_conf *mdev)
b411b363
PR
3844{
3845 struct drbd_wq_barrier barr;
3846
3847 barr.w.cb = w_prev_work_done;
a21e9298 3848 barr.w.mdev = mdev;
b411b363 3849 init_completion(&barr.done);
a21e9298 3850 drbd_queue_work(&mdev->tconn->data.work, &barr.w);
b411b363
PR
3851 wait_for_completion(&barr.done);
3852}
3853
360cc740 3854static void drbd_disconnect(struct drbd_tconn *tconn)
b411b363 3855{
bbeb641c 3856 enum drbd_conns oc;
b411b363 3857 int rv = SS_UNKNOWN_ERROR;
b411b363 3858
bbeb641c 3859 if (tconn->cstate == C_STANDALONE)
b411b363 3860 return;
b411b363
PR
3861
3862 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
3863 drbd_thread_stop(&tconn->asender);
3864 drbd_free_sock(tconn);
3865
3866 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
3867
3868 conn_info(tconn, "Connection closed\n");
3869
3870 spin_lock_irq(&tconn->req_lock);
bbeb641c
PR
3871 oc = tconn->cstate;
3872 if (oc >= C_UNCONNECTED)
3873 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
3874
360cc740
PR
3875 spin_unlock_irq(&tconn->req_lock);
3876
bbeb641c 3877 if (oc == C_DISCONNECTING) {
360cc740
PR
3878 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
3879
3880 crypto_free_hash(tconn->cram_hmac_tfm);
3881 tconn->cram_hmac_tfm = NULL;
3882
3883 kfree(tconn->net_conf);
3884 tconn->net_conf = NULL;
bbeb641c 3885 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
360cc740
PR
3886 }
3887}
3888
3889static int drbd_disconnected(int vnr, void *p, void *data)
3890{
3891 struct drbd_conf *mdev = (struct drbd_conf *)p;
3892 enum drbd_fencing_p fp;
3893 unsigned int i;
b411b363 3894
85719573 3895 /* wait for current activity to cease. */
87eeee41 3896 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
3897 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3898 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3899 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 3900 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3901
3902 /* We do not have data structures that would allow us to
3903 * get the rs_pending_cnt down to 0 again.
3904 * * On C_SYNC_TARGET we do not have any data structures describing
3905 * the pending RSDataRequest's we have sent.
3906 * * On C_SYNC_SOURCE there is no data structure that tracks
3907 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3908 * And no, it is not the sum of the reference counts in the
3909 * resync_LRU. The resync_LRU tracks the whole operation including
3910 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3911 * on the fly. */
3912 drbd_rs_cancel_all(mdev);
3913 mdev->rs_total = 0;
3914 mdev->rs_failed = 0;
3915 atomic_set(&mdev->rs_pending_cnt, 0);
3916 wake_up(&mdev->misc_wait);
3917
7fde2be9
PR
3918 del_timer(&mdev->request_timer);
3919
b411b363
PR
3920 /* make sure syncer is stopped and w_resume_next_sg queued */
3921 del_timer_sync(&mdev->resync_timer);
b411b363
PR
3922 resync_timer_fn((unsigned long)mdev);
3923
b411b363
PR
3924 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3925 * w_make_resync_request etc. which may still be on the worker queue
3926 * to be "canceled" */
a21e9298 3927 drbd_flush_workqueue(mdev);
b411b363
PR
3928
3929 /* This also does reclaim_net_ee(). If we do this too early, we might
3930 * miss some resync ee and pages.*/
3931 drbd_process_done_ee(mdev);
3932
3933 kfree(mdev->p_uuid);
3934 mdev->p_uuid = NULL;
3935
fb22c402 3936 if (!is_susp(mdev->state))
b411b363
PR
3937 tl_clear(mdev);
3938
b411b363
PR
3939 drbd_md_sync(mdev);
3940
3941 fp = FP_DONT_CARE;
3942 if (get_ldev(mdev)) {
3943 fp = mdev->ldev->dc.fencing;
3944 put_ldev(mdev);
3945 }
3946
87f7be4c
PR
3947 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3948 drbd_try_outdate_peer_async(mdev);
b411b363 3949
20ceb2b2
LE
3950 /* serialize with bitmap writeout triggered by the state change,
3951 * if any. */
3952 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3953
b411b363
PR
3954 /* tcp_close and release of sendpage pages can be deferred. I don't
3955 * want to use SO_LINGER, because apparently it can be deferred for
3956 * more than 20 seconds (longest time I checked).
3957 *
3958 * Actually we don't care for exactly when the network stack does its
3959 * put_page(), but release our reference on these pages right here.
3960 */
3961 i = drbd_release_ee(mdev, &mdev->net_ee);
3962 if (i)
3963 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
3964 i = atomic_read(&mdev->pp_in_use_by_net);
3965 if (i)
3966 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
3967 i = atomic_read(&mdev->pp_in_use);
3968 if (i)
45bb912b 3969 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
3970
3971 D_ASSERT(list_empty(&mdev->read_ee));
3972 D_ASSERT(list_empty(&mdev->active_ee));
3973 D_ASSERT(list_empty(&mdev->sync_ee));
3974 D_ASSERT(list_empty(&mdev->done_ee));
3975
3976 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3977 atomic_set(&mdev->current_epoch->epoch_size, 0);
3978 D_ASSERT(list_empty(&mdev->current_epoch->list));
360cc740
PR
3979
3980 return 0;
b411b363
PR
3981}
3982
3983/*
3984 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3985 * we can agree on is stored in agreed_pro_version.
3986 *
3987 * feature flags and the reserved array should be enough room for future
3988 * enhancements of the handshake protocol, and possible plugins...
3989 *
3990 * for now, they are expected to be zero, but ignored.
3991 */
8a22cccc 3992static int drbd_send_handshake(struct drbd_tconn *tconn)
b411b363 3993{
e6b3ea83 3994 /* ASSERT current == mdev->tconn->receiver ... */
8a22cccc 3995 struct p_handshake *p = &tconn->data.sbuf.handshake;
b411b363
PR
3996 int ok;
3997
8a22cccc
PR
3998 if (mutex_lock_interruptible(&tconn->data.mutex)) {
3999 conn_err(tconn, "interrupted during initial handshake\n");
b411b363
PR
4000 return 0; /* interrupted. not ok. */
4001 }
4002
8a22cccc
PR
4003 if (tconn->data.socket == NULL) {
4004 mutex_unlock(&tconn->data.mutex);
b411b363
PR
4005 return 0;
4006 }
4007
4008 memset(p, 0, sizeof(*p));
4009 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4010 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
8a22cccc
PR
4011 ok = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
4012 &p->head, sizeof(*p), 0);
4013 mutex_unlock(&tconn->data.mutex);
b411b363
PR
4014 return ok;
4015}
4016
4017/*
4018 * return values:
4019 * 1 yes, we have a valid connection
4020 * 0 oops, did not work out, please try again
4021 * -1 peer talks different language,
4022 * no point in trying again, please go standalone.
4023 */
65d11ed6 4024static int drbd_do_handshake(struct drbd_tconn *tconn)
b411b363 4025{
65d11ed6
PR
4026 /* ASSERT current == tconn->receiver ... */
4027 struct p_handshake *p = &tconn->data.rbuf.handshake;
02918be2 4028 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
77351055 4029 struct packet_info pi;
b411b363
PR
4030 int rv;
4031
65d11ed6 4032 rv = drbd_send_handshake(tconn);
b411b363
PR
4033 if (!rv)
4034 return 0;
4035
65d11ed6 4036 rv = drbd_recv_header(tconn, &pi);
b411b363
PR
4037 if (!rv)
4038 return 0;
4039
77351055 4040 if (pi.cmd != P_HAND_SHAKE) {
65d11ed6 4041 conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
77351055 4042 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4043 return -1;
4044 }
4045
77351055 4046 if (pi.size != expect) {
65d11ed6 4047 conn_err(tconn, "expected HandShake length: %u, received: %u\n",
77351055 4048 expect, pi.size);
b411b363
PR
4049 return -1;
4050 }
4051
65d11ed6 4052 rv = drbd_recv(tconn, &p->head.payload, expect);
b411b363
PR
4053
4054 if (rv != expect) {
0ddc5549 4055 if (!signal_pending(current))
65d11ed6 4056 conn_warn(tconn, "short read receiving handshake packet: l=%u\n", rv);
b411b363
PR
4057 return 0;
4058 }
4059
b411b363
PR
4060 p->protocol_min = be32_to_cpu(p->protocol_min);
4061 p->protocol_max = be32_to_cpu(p->protocol_max);
4062 if (p->protocol_max == 0)
4063 p->protocol_max = p->protocol_min;
4064
4065 if (PRO_VERSION_MAX < p->protocol_min ||
4066 PRO_VERSION_MIN > p->protocol_max)
4067 goto incompat;
4068
65d11ed6 4069 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363 4070
65d11ed6
PR
4071 conn_info(tconn, "Handshake successful: "
4072 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
b411b363
PR
4073
4074 return 1;
4075
4076 incompat:
65d11ed6 4077 conn_err(tconn, "incompatible DRBD dialects: "
b411b363
PR
4078 "I support %d-%d, peer supports %d-%d\n",
4079 PRO_VERSION_MIN, PRO_VERSION_MAX,
4080 p->protocol_min, p->protocol_max);
4081 return -1;
4082}
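/*
 * Negotiation example (illustration only, hypothetical version numbers):
 * if we support protocol versions 86..96 and the peer announces
 * protocol_min = 88, protocol_max = 100, the ranges overlap and we settle
 * on agreed_pro_version = min(96, 100) = 96.  Only if the ranges are
 * disjoint, e.g. the peer reports 97..100 while we top out at 96, do we
 * return -1 and go standalone.
 */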
4083
4084#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4085static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4086{
4087 dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4088 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4089 return -1;
b411b363
PR
4090}
4091#else
4092#define CHALLENGE_LEN 64
b10d96cb
JT
4093
4094/* Return value:
4095 1 - auth succeeded,
4096 0 - failed, try again (network error),
4097 -1 - auth failed, don't try again.
4098*/
4099
13e6037d 4100static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4101{
4102 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4103 struct scatterlist sg;
4104 char *response = NULL;
4105 char *right_response = NULL;
4106 char *peers_ch = NULL;
13e6037d 4107 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
b411b363
PR
4108 unsigned int resp_size;
4109 struct hash_desc desc;
77351055 4110 struct packet_info pi;
b411b363
PR
4111 int rv;
4112
13e6037d 4113 desc.tfm = tconn->cram_hmac_tfm;
b411b363
PR
4114 desc.flags = 0;
4115
13e6037d
PR
4116 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4117 (u8 *)tconn->net_conf->shared_secret, key_len);
b411b363 4118 if (rv) {
13e6037d 4119 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4120 rv = -1;
b411b363
PR
4121 goto fail;
4122 }
4123
4124 get_random_bytes(my_challenge, CHALLENGE_LEN);
4125
13e6037d 4126 rv = conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
b411b363
PR
4127 if (!rv)
4128 goto fail;
4129
13e6037d 4130 rv = drbd_recv_header(tconn, &pi);
b411b363
PR
4131 if (!rv)
4132 goto fail;
4133
77351055 4134 if (pi.cmd != P_AUTH_CHALLENGE) {
13e6037d 4135 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
77351055 4136 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4137 rv = 0;
4138 goto fail;
4139 }
4140
77351055 4141 if (pi.size > CHALLENGE_LEN * 2) {
13e6037d 4142 conn_err(tconn, "expected AuthChallenge payload too big.\n");
b10d96cb 4143 rv = -1;
b411b363
PR
4144 goto fail;
4145 }
4146
77351055 4147 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 4148 if (peers_ch == NULL) {
13e6037d 4149 conn_err(tconn, "kmalloc of peers_ch failed\n");
b10d96cb 4150 rv = -1;
b411b363
PR
4151 goto fail;
4152 }
4153
13e6037d 4154 rv = drbd_recv(tconn, peers_ch, pi.size);
b411b363 4155
77351055 4156 if (rv != pi.size) {
0ddc5549 4157 if (!signal_pending(current))
13e6037d 4158 conn_warn(tconn, "short read AuthChallenge: l=%u\n", rv);
b411b363
PR
4159 rv = 0;
4160 goto fail;
4161 }
4162
13e6037d 4163 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
b411b363
PR
4164 response = kmalloc(resp_size, GFP_NOIO);
4165 if (response == NULL) {
13e6037d 4166 conn_err(tconn, "kmalloc of response failed\n");
b10d96cb 4167 rv = -1;
b411b363
PR
4168 goto fail;
4169 }
4170
4171 sg_init_table(&sg, 1);
77351055 4172 sg_set_buf(&sg, peers_ch, pi.size);
b411b363
PR
4173
4174 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4175 if (rv) {
13e6037d 4176 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4177 rv = -1;
b411b363
PR
4178 goto fail;
4179 }
4180
13e6037d 4181 rv = conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
b411b363
PR
4182 if (!rv)
4183 goto fail;
4184
13e6037d 4185 rv = drbd_recv_header(tconn, &pi);
b411b363
PR
4186 if (!rv)
4187 goto fail;
4188
77351055 4189 if (pi.cmd != P_AUTH_RESPONSE) {
13e6037d 4190 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
77351055 4191 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4192 rv = 0;
4193 goto fail;
4194 }
4195
77351055 4196 if (pi.size != resp_size) {
13e6037d 4197 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
b411b363
PR
4198 rv = 0;
4199 goto fail;
4200 }
4201
13e6037d 4202 rv = drbd_recv(tconn, response , resp_size);
b411b363
PR
4203
4204 if (rv != resp_size) {
0ddc5549 4205 if (!signal_pending(current))
13e6037d 4206 conn_warn(tconn, "short read receiving AuthResponse: l=%u\n", rv);
b411b363
PR
4207 rv = 0;
4208 goto fail;
4209 }
4210
4211 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4212 if (right_response == NULL) {
13e6037d 4213 conn_err(tconn, "kmalloc of right_response failed\n");
b10d96cb 4214 rv = -1;
b411b363
PR
4215 goto fail;
4216 }
4217
4218 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4219
4220 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4221 if (rv) {
13e6037d 4222 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4223 rv = -1;
b411b363
PR
4224 goto fail;
4225 }
4226
4227 rv = !memcmp(response, right_response, resp_size);
4228
4229 if (rv)
13e6037d
PR
4230 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4231 resp_size, tconn->net_conf->cram_hmac_alg);
b10d96cb
JT
4232 else
4233 rv = -1;
b411b363
PR
4234
4235 fail:
4236 kfree(peers_ch);
4237 kfree(response);
4238 kfree(right_response);
4239
4240 return rv;
4241}
4242#endif
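/*
 * Message flow of the authentication above (summary added for clarity):
 *
 *   local                                      peer
 *   P_AUTH_CHALLENGE (my_challenge)   --->
 *                                     <---  P_AUTH_CHALLENGE (peers_ch)
 *   P_AUTH_RESPONSE HMAC(secret, peers_ch)  --->
 *                                     <---  P_AUTH_RESPONSE HMAC(secret, my_challenge)
 *
 * Each side proves knowledge of the shared secret by returning the HMAC of
 * the challenge it received; the peer's reply is compared against
 * right_response, the HMAC we compute locally over my_challenge.
 */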
4243
4244int drbdd_init(struct drbd_thread *thi)
4245{
392c8801 4246 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4247 int h;
4248
4d641dd7 4249 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4250
4251 do {
4d641dd7 4252 h = drbd_connect(tconn);
b411b363 4253 if (h == 0) {
4d641dd7 4254 drbd_disconnect(tconn);
20ee6390 4255 schedule_timeout_interruptible(HZ);
b411b363
PR
4256 }
4257 if (h == -1) {
4d641dd7 4258 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4259 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4260 }
4261 } while (h == 0);
4262
4263 if (h > 0) {
4d641dd7
PR
4264 if (get_net_conf(tconn)) {
4265 drbdd(tconn);
4266 put_net_conf(tconn);
b411b363
PR
4267 }
4268 }
4269
4d641dd7 4270 drbd_disconnect(tconn);
b411b363 4271
4d641dd7 4272 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4273 return 0;
4274}
4275
4276/* ********* acknowledge sender ******** */
4277
d8763023 4278static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4279{
257d0af6 4280 struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
fc3b10a4 4281 struct drbd_tconn *tconn = mdev->tconn;
b411b363
PR
4282
4283 int retcode = be32_to_cpu(p->retcode);
4284
fc3b10a4
PR
4285 if (cmd == P_STATE_CHG_REPLY) {
4286 if (retcode >= SS_SUCCESS) {
4287 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4288 } else {
4289 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4290 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4291 drbd_set_st_err_str(retcode), retcode);
4292 }
4293 wake_up(&mdev->state_wait);
4294 } else /* cmd == P_CONN_ST_CHG_REPLY */ {
4295 if (retcode >= SS_SUCCESS) {
4296 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4297 } else {
4298 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4299 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4300 drbd_set_st_err_str(retcode), retcode);
4301 }
4302 wake_up(&tconn->ping_wait);
b411b363 4303 }
81e84650 4304 return true;
b411b363
PR
4305}
4306
d8763023 4307static int got_Ping(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4308{
2a67d8b9 4309 return drbd_send_ping_ack(mdev->tconn);
b411b363
PR
4310
4311}
4312
d8763023 4313static int got_PingAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4314{
2a67d8b9 4315 struct drbd_tconn *tconn = mdev->tconn;
b411b363 4316 /* restore idle timeout */
2a67d8b9
PR
4317 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4318 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4319 wake_up(&tconn->ping_wait);
b411b363 4320
81e84650 4321 return true;
b411b363
PR
4322}
4323
d8763023 4324static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4325{
257d0af6 4326 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4327 sector_t sector = be64_to_cpu(p->sector);
4328 int blksize = be32_to_cpu(p->blksize);
4329
31890f4a 4330 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4331
4332 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4333
1d53f09e
LE
4334 if (get_ldev(mdev)) {
4335 drbd_rs_complete_io(mdev, sector);
4336 drbd_set_in_sync(mdev, sector, blksize);
4337 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4338 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4339 put_ldev(mdev);
4340 }
b411b363 4341 dec_rs_pending(mdev);
778f271d 4342 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4343
81e84650 4344 return true;
b411b363
PR
4345}
4346
bc9c5c41
AG
4347static int
4348validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4349 struct rb_root *root, const char *func,
4350 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4351{
4352 struct drbd_request *req;
4353 struct bio_and_error m;
4354
87eeee41 4355 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4356 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4357 if (unlikely(!req)) {
87eeee41 4358 spin_unlock_irq(&mdev->tconn->req_lock);
81e84650 4359 return false;
b411b363
PR
4360 }
4361 __req_mod(req, what, &m);
87eeee41 4362 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4363
4364 if (m.bio)
4365 complete_master_bio(mdev, &m);
81e84650 4366 return true;
b411b363
PR
4367}
4368
d8763023 4369static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4370{
257d0af6 4371 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4372 sector_t sector = be64_to_cpu(p->sector);
4373 int blksize = be32_to_cpu(p->blksize);
4374 enum drbd_req_event what;
4375
4376 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4377
579b57ed 4378 if (p->block_id == ID_SYNCER) {
b411b363
PR
4379 drbd_set_in_sync(mdev, sector, blksize);
4380 dec_rs_pending(mdev);
81e84650 4381 return true;
b411b363 4382 }
257d0af6 4383 switch (cmd) {
b411b363 4384 case P_RS_WRITE_ACK:
89e58e75 4385 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4386 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4387 break;
4388 case P_WRITE_ACK:
89e58e75 4389 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4390 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4391 break;
4392 case P_RECV_ACK:
89e58e75 4393 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
8554df1c 4394 what = RECV_ACKED_BY_PEER;
b411b363
PR
4395 break;
4396 case P_DISCARD_ACK:
89e58e75 4397 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4398 what = CONFLICT_DISCARDED_BY_PEER;
b411b363
PR
4399 break;
4400 default:
4401 D_ASSERT(0);
81e84650 4402 return false;
b411b363
PR
4403 }
4404
4405 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41
AG
4406 &mdev->write_requests, __func__,
4407 what, false);
b411b363
PR
4408}
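/*
 * Quick reference (added for clarity): which ack ends up here depends on
 * the negotiated wire protocol, matching the D_ASSERTs above:
 *   DRBD_PROT_A - no per-write acks (only neg-acks and barrier acks),
 *   DRBD_PROT_B - P_RECV_ACK once the peer has received the data,
 *   DRBD_PROT_C - P_WRITE_ACK once the peer has written the data to disk;
 * P_RS_WRITE_ACK and P_DISCARD_ACK are protocol C variants for resync
 * writes and discarded concurrent writes.
 */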
4409
d8763023 4410static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4411{
257d0af6 4412 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363 4413 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4414 int size = be32_to_cpu(p->blksize);
89e58e75
PR
4415 bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4416 mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
c3afd8f5 4417 bool found;
b411b363
PR
4418
4419 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4420
579b57ed 4421 if (p->block_id == ID_SYNCER) {
b411b363
PR
4422 dec_rs_pending(mdev);
4423 drbd_rs_failed_io(mdev, sector, size);
81e84650 4424 return true;
b411b363 4425 }
2deb8336 4426
c3afd8f5 4427 found = validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4428 &mdev->write_requests, __func__,
8554df1c 4429 NEG_ACKED, missing_ok);
c3afd8f5
AG
4430 if (!found) {
4431 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4432 The master bio might already be completed, therefore the
4433 request is no longer in the collision hash. */
4434 /* In Protocol B we might already have got a P_RECV_ACK
4435 but then get a P_NEG_ACK afterwards. */
4436 if (!missing_ok)
2deb8336 4437 return false;
c3afd8f5 4438 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4439 }
2deb8336 4440 return true;
b411b363
PR
4441}
4442
d8763023 4443static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4444{
257d0af6 4445 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4446 sector_t sector = be64_to_cpu(p->sector);
4447
4448 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4449 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4450 (unsigned long long)sector, be32_to_cpu(p->blksize));
4451
4452 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4453 &mdev->read_requests, __func__,
8554df1c 4454 NEG_ACKED, false);
b411b363
PR
4455}
4456
d8763023 4457static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4458{
4459 sector_t sector;
4460 int size;
257d0af6 4461 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4462
4463 sector = be64_to_cpu(p->sector);
4464 size = be32_to_cpu(p->blksize);
b411b363
PR
4465
4466 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4467
4468 dec_rs_pending(mdev);
4469
4470 if (get_ldev_if_state(mdev, D_FAILED)) {
4471 drbd_rs_complete_io(mdev, sector);
257d0af6 4472 switch (cmd) {
d612d309
PR
4473 case P_NEG_RS_DREPLY:
4474 drbd_rs_failed_io(mdev, sector, size);
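		/* fall through: P_RS_CANCEL only completes the request,
		 * it is not counted as failed resync I/O */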
4475 case P_RS_CANCEL:
4476 break;
4477 default:
4478 D_ASSERT(0);
4479 put_ldev(mdev);
4480 return false;
4481 }
b411b363
PR
4482 put_ldev(mdev);
4483 }
4484
81e84650 4485 return true;
b411b363
PR
4486}
4487
d8763023 4488static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4489{
257d0af6 4490 struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;
b411b363
PR
4491
4492 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4493
c4752ef1
PR
4494 if (mdev->state.conn == C_AHEAD &&
4495 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
4496 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4497 mdev->start_resync_timer.expires = jiffies + HZ;
4498 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4499 }
4500
81e84650 4501 return true;
b411b363
PR
4502}
4503
d8763023 4504static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4505{
257d0af6 4506 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4507 struct drbd_work *w;
4508 sector_t sector;
4509 int size;
4510
4511 sector = be64_to_cpu(p->sector);
4512 size = be32_to_cpu(p->blksize);
4513
4514 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4515
4516 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4517 drbd_ov_oos_found(mdev, sector, size);
4518 else
4519 ov_oos_print(mdev);
4520
1d53f09e 4521 if (!get_ldev(mdev))
81e84650 4522 return true;
1d53f09e 4523
b411b363
PR
4524 drbd_rs_complete_io(mdev, sector);
4525 dec_rs_pending(mdev);
4526
ea5442af
LE
4527 --mdev->ov_left;
4528
4529 /* let's advance progress step marks only for every other megabyte */
4530 if ((mdev->ov_left & 0x200) == 0x200)
4531 drbd_advance_rs_marks(mdev, mdev->ov_left);
4532
4533 if (mdev->ov_left == 0) {
b411b363
PR
4534 w = kmalloc(sizeof(*w), GFP_NOIO);
4535 if (w) {
4536 w->cb = w_ov_finished;
a21e9298 4537 w->mdev = mdev;
e42325a5 4538 drbd_queue_work_front(&mdev->tconn->data.work, w);
b411b363
PR
4539 } else {
4540 dev_err(DEV, "kmalloc(w) failed.");
4541 ov_oos_print(mdev);
4542 drbd_resync_finished(mdev);
4543 }
4544 }
1d53f09e 4545 put_ldev(mdev);
81e84650 4546 return true;
b411b363
PR
4547}
4548
d8763023 4549static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
0ced55a3 4550{
81e84650 4551 return true;
0ced55a3
PR
4552}
4553
b411b363
PR
4554struct asender_cmd {
4555 size_t pkt_size;
d8763023 4556 int (*process)(struct drbd_conf *mdev, enum drbd_packet cmd);
b411b363
PR
4557};
4558
4559static struct asender_cmd *get_asender_cmd(int cmd)
4560{
4561 static struct asender_cmd asender_tbl[] = {
4562 /* anything missing from this table is in
4563 * the drbd_cmd_handler (drbd_default_handler) table,
4564 * see the beginning of drbdd() */
257d0af6
PR
4565 [P_PING] = { sizeof(struct p_header), got_Ping },
4566 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
b411b363
PR
4567 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4568 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4569 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4570 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4571 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4572 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4573 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4574 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4575 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4576 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4577 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
02918be2 4578 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
d612d309 4579 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply},
fc3b10a4 4580 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_RqSReply },
b411b363
PR
4581 [P_MAX_CMD] = { 0, NULL },
4582 };
4583 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4584 return NULL;
4585 return &asender_tbl[cmd];
4586}
4587
32862ec7
PR
4588static int _drbd_process_done_ee(int vnr, void *p, void *data)
4589{
4590 struct drbd_conf *mdev = (struct drbd_conf *)p;
4591 return !drbd_process_done_ee(mdev);
4592}
4593
4594static int _check_ee_empty(int vnr, void *p, void *data)
4595{
4596 struct drbd_conf *mdev = (struct drbd_conf *)p;
4597 struct drbd_tconn *tconn = mdev->tconn;
4598 int not_empty;
4599
4600 spin_lock_irq(&tconn->req_lock);
4601 not_empty = !list_empty(&mdev->done_ee);
4602 spin_unlock_irq(&tconn->req_lock);
4603
4604 return not_empty;
4605}
4606
4607static int tconn_process_done_ee(struct drbd_tconn *tconn)
4608{
4609 int not_empty, err;
4610
4611 do {
4612 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4613 flush_signals(current);
4614 err = idr_for_each(&tconn->volumes, _drbd_process_done_ee, NULL);
4615 if (err)
4616 return err;
4617 set_bit(SIGNAL_ASENDER, &tconn->flags);
4618 not_empty = idr_for_each(&tconn->volumes, _check_ee_empty, NULL);
4619 } while (not_empty);
4620
4621 return 0;
4622}
4623
b411b363
PR
4624int drbd_asender(struct drbd_thread *thi)
4625{
392c8801 4626 struct drbd_tconn *tconn = thi->tconn;
32862ec7 4627 struct p_header *h = &tconn->meta.rbuf.header;
b411b363 4628 struct asender_cmd *cmd = NULL;
77351055 4629 struct packet_info pi;
257d0af6 4630 int rv;
b411b363
PR
4631 void *buf = h;
4632 int received = 0;
257d0af6 4633 int expect = sizeof(struct p_header);
f36af18c 4634 int ping_timeout_active = 0;
b411b363 4635
b411b363
PR
4636 current->policy = SCHED_RR; /* Make this a realtime task! */
4637 current->rt_priority = 2; /* more important than all other tasks */
4638
e77a0a5c 4639 while (get_t_state(thi) == RUNNING) {
80822284 4640 drbd_thread_current_set_cpu(thi);
32862ec7 4641 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
2a67d8b9 4642 if (!drbd_send_ping(tconn)) {
32862ec7 4643 conn_err(tconn, "drbd_send_ping has failed\n");
841ce241
AG
4644 goto reconnect;
4645 }
32862ec7
PR
4646 tconn->meta.socket->sk->sk_rcvtimeo =
4647 tconn->net_conf->ping_timeo*HZ/10;
f36af18c 4648 ping_timeout_active = 1;
b411b363
PR
4649 }
4650
32862ec7
PR
4651 /* TODO: conditionally cork; it may hurt latency if we cork without
4652 much to send */
4653 if (!tconn->net_conf->no_cork)
4654 drbd_tcp_cork(tconn->meta.socket);
4655 if (tconn_process_done_ee(tconn))
4656 goto reconnect;
b411b363 4657 /* but unconditionally uncork unless disabled */
32862ec7
PR
4658 if (!tconn->net_conf->no_cork)
4659 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
4660
4661 /* short circuit, recv_msg would return EINTR anyways. */
4662 if (signal_pending(current))
4663 continue;
4664
32862ec7
PR
4665 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4666 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
4667
4668 flush_signals(current);
4669
4670 /* Note:
4671 * -EINTR (on meta) we got a signal
4672 * -EAGAIN (on meta) rcvtimeo expired
4673 * -ECONNRESET other side closed the connection
4674 * -ERESTARTSYS (on data) we got a signal
4675 * rv < 0 other than above: unexpected error!
4676 * rv == expected: full header or command
4677 * rv < expected: "woken" by signal during receive
4678 * rv == 0 : "connection shut down by peer"
4679 */
4680 if (likely(rv > 0)) {
4681 received += rv;
4682 buf += rv;
4683 } else if (rv == 0) {
32862ec7 4684 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
4685 goto reconnect;
4686 } else if (rv == -EAGAIN) {
cb6518cb
LE
4687 /* If the data socket received something meanwhile,
4688 * that is good enough: peer is still alive. */
32862ec7
PR
4689 if (time_after(tconn->last_received,
4690 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 4691 continue;
f36af18c 4692 if (ping_timeout_active) {
32862ec7 4693 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
4694 goto reconnect;
4695 }
32862ec7 4696 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
4697 continue;
4698 } else if (rv == -EINTR) {
4699 continue;
4700 } else {
32862ec7 4701 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
4702 goto reconnect;
4703 }
4704
4705 if (received == expect && cmd == NULL) {
32862ec7 4706 if (!decode_header(tconn, h, &pi))
b411b363 4707 goto reconnect;
77351055 4708 cmd = get_asender_cmd(pi.cmd);
b411b363 4709 if (unlikely(cmd == NULL)) {
32862ec7 4710 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
77351055 4711 pi.cmd, pi.size);
b411b363
PR
4712 goto disconnect;
4713 }
4714 expect = cmd->pkt_size;
77351055 4715 if (pi.size != expect - sizeof(struct p_header)) {
32862ec7 4716 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 4717 pi.cmd, pi.size);
b411b363 4718 goto reconnect;
257d0af6 4719 }
b411b363
PR
4720 }
4721 if (received == expect) {
32862ec7
PR
4722 tconn->last_received = jiffies;
4723 if (!cmd->process(vnr_to_mdev(tconn, pi.vnr), pi.cmd))
b411b363
PR
4724 goto reconnect;
4725
f36af18c
LE
4726 /* the idle_timeout (ping-int)
4727 * has been restored in got_PingAck() */
4728 if (cmd == get_asender_cmd(P_PING_ACK))
4729 ping_timeout_active = 0;
4730
b411b363
PR
4731 buf = h;
4732 received = 0;
257d0af6 4733 expect = sizeof(struct p_header);
b411b363
PR
4734 cmd = NULL;
4735 }
4736 }
4737
4738 if (0) {
4739reconnect:
bbeb641c 4740 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
4741 }
4742 if (0) {
4743disconnect:
bbeb641c 4744 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 4745 }
32862ec7 4746 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 4747
32862ec7 4748 conn_info(tconn, "asender terminated\n");
b411b363
PR
4749
4750 return 0;
4751}