drbd: Removed outdated comments and code that envisioned VNRs in header 95
drivers/block/drbd/drbd_receiver.c
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
47#include "drbd_req.h"
48
49#include "drbd_vli.h"
50
51struct packet_info {
52 enum drbd_packet cmd;
53 unsigned int size;
54 unsigned int vnr;
 55 void *data;
56};
57
58enum finish_epoch {
59 FE_STILL_LIVE,
60 FE_DESTROYED,
61 FE_RECYCLED,
62};
63
 64static int drbd_do_features(struct drbd_tconn *tconn);
 65static int drbd_do_auth(struct drbd_tconn *tconn);
 66static int drbd_disconnected(int vnr, void *p, void *data);
 67
 68static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
 69static int e_end_block(struct drbd_work *, int);
 70
71
72#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74/*
75 * some helper functions to deal with single linked page lists,
76 * page->private being our "next" pointer.
77 */
78
79/* If at least n pages are linked at head, get n pages off.
80 * Otherwise, don't modify head, and return NULL.
81 * Locking is the responsibility of the caller.
82 */
83static struct page *page_chain_del(struct page **head, int n)
84{
85 struct page *page;
86 struct page *tmp;
87
88 BUG_ON(!n);
89 BUG_ON(!head);
90
91 page = *head;
92
93 if (!page)
94 return NULL;
95
96 while (page) {
97 tmp = page_chain_next(page);
98 if (--n == 0)
99 break; /* found sufficient pages */
100 if (tmp == NULL)
101 /* insufficient pages, don't use any of them. */
102 return NULL;
103 page = tmp;
104 }
105
106 /* add end of list marker for the returned list */
107 set_page_private(page, 0);
108 /* actual return value, and adjustment of head */
109 page = *head;
110 *head = tmp;
111 return page;
112}
113
114/* may be used outside of locks to find the tail of a (usually short)
115 * "private" page chain, before adding it back to a global chain head
116 * with page_chain_add() under a spinlock. */
117static struct page *page_chain_tail(struct page *page, int *len)
118{
119 struct page *tmp;
120 int i = 1;
121 while ((tmp = page_chain_next(page)))
122 ++i, page = tmp;
123 if (len)
124 *len = i;
125 return page;
126}
127
128static int page_chain_free(struct page *page)
129{
130 struct page *tmp;
131 int i = 0;
132 page_chain_for_each_safe(page, tmp) {
133 put_page(page);
134 ++i;
135 }
136 return i;
137}
138
139static void page_chain_add(struct page **head,
140 struct page *chain_first, struct page *chain_last)
141{
142#if 1
143 struct page *tmp;
144 tmp = page_chain_tail(chain_first, NULL);
145 BUG_ON(tmp != chain_last);
146#endif
147
148 /* add chain to head */
149 set_page_private(chain_last, (unsigned long)*head);
150 *head = chain_first;
151}
152
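/*
 * Editor's illustration (not part of the original drbd_receiver.c): a minimal
 * sketch of how the page-chain helpers above compose.  A caller builds a
 * private chain through page->private, locates the tail once outside the
 * lock, and then splices the whole chain onto the global pool under
 * drbd_pp_lock -- the same pattern drbd_pp_first_pages_or_try_alloc() uses
 * below.  The function name is hypothetical.
 */
#if 0
static void example_return_chain_to_pool(struct page *chain)
{
	struct page *tail;
	int n;

	if (!chain)
		return;
	tail = page_chain_tail(chain, &n);	/* walk once, outside the spinlock */
	spin_lock(&drbd_pp_lock);
	page_chain_add(&drbd_pp_pool, chain, tail);
	drbd_pp_vacant += n;
	spin_unlock(&drbd_pp_lock);
}
#endif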
153static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
154{
155 struct page *page = NULL;
156 struct page *tmp = NULL;
157 int i = 0;
158
159 /* Yes, testing drbd_pp_vacant outside the lock is racy.
160 * So what. It saves a spin_lock. */
 161 if (drbd_pp_vacant >= number) {
 162 spin_lock(&drbd_pp_lock);
163 page = page_chain_del(&drbd_pp_pool, number);
164 if (page)
165 drbd_pp_vacant -= number;
 166 spin_unlock(&drbd_pp_lock);
167 if (page)
168 return page;
 169 }
 170
171 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
172 * "criss-cross" setup, that might cause write-out on some other DRBD,
173 * which in turn might block on the other node at this very place. */
174 for (i = 0; i < number; i++) {
175 tmp = alloc_page(GFP_TRY);
176 if (!tmp)
177 break;
178 set_page_private(tmp, (unsigned long)page);
179 page = tmp;
180 }
181
182 if (i == number)
183 return page;
184
185 /* Not enough pages immediately available this time.
186 * No need to jump around here, drbd_pp_alloc will retry this
187 * function "soon". */
188 if (page) {
189 tmp = page_chain_tail(page, NULL);
190 spin_lock(&drbd_pp_lock);
191 page_chain_add(&drbd_pp_pool, page, tmp);
192 drbd_pp_vacant += i;
193 spin_unlock(&drbd_pp_lock);
194 }
195 return NULL;
196}
197
198static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
199{
 200 struct drbd_peer_request *peer_req;
201 struct list_head *le, *tle;
202
203 /* The EEs are always appended to the end of the list. Since
204 they are sent in order over the wire, they have to finish
 205 in order. As soon as we see the first unfinished one, we can
 206 stop examining the list...
207
208 list_for_each_safe(le, tle, &mdev->net_ee) {
209 peer_req = list_entry(le, struct drbd_peer_request, w.list);
210 if (drbd_ee_has_active_page(peer_req))
211 break;
212 list_move(le, to_be_freed);
213 }
214}
215
216static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
217{
218 LIST_HEAD(reclaimed);
 219 struct drbd_peer_request *peer_req, *t;
 220
 221 spin_lock_irq(&mdev->tconn->req_lock);
 222 reclaim_net_ee(mdev, &reclaimed);
 223 spin_unlock_irq(&mdev->tconn->req_lock);
 224
225 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
226 drbd_free_net_ee(mdev, peer_req);
227}
228
229/**
 230 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 231 * @mdev: DRBD device.
232 * @number: number of pages requested
233 * @retry: whether to retry, if not enough pages are available right now
234 *
235 * Tries to allocate number pages, first from our own page pool, then from
236 * the kernel, unless this allocation would exceed the max_buffers setting.
237 * Possibly retry until DRBD frees sufficient pages somewhere else.
 238 *
 239 * Returns a page chain linked via page->private.
 240 */
 241static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
242{
243 struct page *page = NULL;
244 DEFINE_WAIT(wait);
245
246 /* Yes, we may run up to @number over max_buffers. If we
247 * follow it strictly, the admin will get it wrong anyways. */
 248 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
 249 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
 250
 251 while (page == NULL) {
252 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
253
254 drbd_kick_lo_and_reclaim_net(mdev);
255
 256 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
 257 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
258 if (page)
259 break;
260 }
261
262 if (!retry)
263 break;
264
265 if (signal_pending(current)) {
266 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
267 break;
268 }
269
270 schedule();
271 }
272 finish_wait(&drbd_pp_wait, &wait);
273
274 if (page)
275 atomic_add(number, &mdev->pp_in_use);
276 return page;
277}
278
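/*
 * Editor's illustration (not part of the original file): typical pairing of
 * drbd_pp_alloc() and drbd_pp_free() as the EE code below uses them.  With
 * retry=true the allocation sleeps on drbd_pp_wait until enough pages are
 * released elsewhere; it only returns NULL if a signal interrupted the wait.
 * Hypothetical function name; mdev is assumed valid and nr_pages > 0.
 */
#if 0
static int example_with_pool_pages(struct drbd_conf *mdev, unsigned nr_pages)
{
	struct page *page = drbd_pp_alloc(mdev, nr_pages, true);

	if (!page)
		return -EINTR;		/* interrupted while waiting for pages */
	/* ... fill the chain via page_chain_for_each(page) ... */
	drbd_pp_free(mdev, page, 0);	/* 0: accounted in pp_in_use, not pp_in_use_by_net */
	return 0;
}
#endif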
279/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 280 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
281 * Either links the page chain back to the global pool,
282 * or returns all pages to the system. */
 283static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
 284{
 285 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
 286 int i;
 287
 288 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
289 i = page_chain_free(page);
290 else {
291 struct page *tmp;
292 tmp = page_chain_tail(page, &i);
293 spin_lock(&drbd_pp_lock);
294 page_chain_add(&drbd_pp_pool, page, tmp);
295 drbd_pp_vacant += i;
296 spin_unlock(&drbd_pp_lock);
 297 }
 298 i = atomic_sub_return(i, a);
 299 if (i < 0)
300 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
301 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
302 wake_up(&drbd_pp_wait);
303}
304
305/*
306You need to hold the req_lock:
307 _drbd_wait_ee_list_empty()
308
309You must not have the req_lock:
310 drbd_free_ee()
311 drbd_alloc_ee()
312 drbd_init_ee()
313 drbd_release_ee()
314 drbd_ee_fix_bhs()
315 drbd_process_done_ee()
316 drbd_clear_done_ee()
317 drbd_wait_ee_list_empty()
318*/
319
320struct drbd_peer_request *
321drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
322 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
 323{
 324 struct drbd_peer_request *peer_req;
 325 struct page *page;
 326 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
 327
 328 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
329 return NULL;
330
331 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
332 if (!peer_req) {
333 if (!(gfp_mask & __GFP_NOWARN))
334 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
335 return NULL;
336 }
337
338 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
339 if (!page)
340 goto fail;
 341
342 drbd_clear_interval(&peer_req->i);
343 peer_req->i.size = data_size;
344 peer_req->i.sector = sector;
345 peer_req->i.local = false;
346 peer_req->i.waiting = false;
347
348 peer_req->epoch = NULL;
 349 peer_req->w.mdev = mdev;
350 peer_req->pages = page;
351 atomic_set(&peer_req->pending_bios, 0);
352 peer_req->flags = 0;
353 /*
354 * The block_id is opaque to the receiver. It is not endianness
355 * converted, and sent back to the sender unchanged.
356 */
 357 peer_req->block_id = id;
 358
 359 return peer_req;
 360
 361 fail:
 362 mempool_free(peer_req, drbd_ee_mempool);
363 return NULL;
364}
365
 366void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
 367 int is_net)
 368{
369 if (peer_req->flags & EE_HAS_DIGEST)
370 kfree(peer_req->digest);
371 drbd_pp_free(mdev, peer_req->pages, is_net);
372 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
373 D_ASSERT(drbd_interval_empty(&peer_req->i));
374 mempool_free(peer_req, drbd_ee_mempool);
375}
376
377int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
378{
379 LIST_HEAD(work_list);
 380 struct drbd_peer_request *peer_req, *t;
 381 int count = 0;
 382 int is_net = list == &mdev->net_ee;
 383
 384 spin_lock_irq(&mdev->tconn->req_lock);
 385 list_splice_init(list, &work_list);
 386 spin_unlock_irq(&mdev->tconn->req_lock);
 387
388 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
389 drbd_free_some_ee(mdev, peer_req, is_net);
390 count++;
391 }
392 return count;
393}
394
395
 396/* See also comments in _req_mod(,BARRIER_ACKED)
397 * and receive_Barrier.
398 *
399 * Move entries from net_ee to done_ee, if ready.
400 * Grab done_ee, call all callbacks, free the entries.
401 * The callbacks typically send out ACKs.
402 */
403static int drbd_process_done_ee(struct drbd_conf *mdev)
404{
405 LIST_HEAD(work_list);
406 LIST_HEAD(reclaimed);
 407 struct drbd_peer_request *peer_req, *t;
 408 int err = 0;
 409
 410 spin_lock_irq(&mdev->tconn->req_lock);
411 reclaim_net_ee(mdev, &reclaimed);
412 list_splice_init(&mdev->done_ee, &work_list);
 413 spin_unlock_irq(&mdev->tconn->req_lock);
 414
415 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
416 drbd_free_net_ee(mdev, peer_req);
417
418 /* possible callbacks here:
 419 * e_end_block, and e_end_resync_block, e_send_discard_write.
420 * all ignore the last argument.
421 */
 422 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
423 int err2;
424
 425 /* list_del not necessary, next/prev members not touched */
426 err2 = peer_req->w.cb(&peer_req->w, !!err);
427 if (!err)
428 err = err2;
 429 drbd_free_ee(mdev, peer_req);
430 }
431 wake_up(&mdev->ee_wait);
432
 433 return err;
434}
435
436void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
437{
438 DEFINE_WAIT(wait);
439
440 /* avoids spin_lock/unlock
441 * and calling prepare_to_wait in the fast path */
442 while (!list_empty(head)) {
443 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
 444 spin_unlock_irq(&mdev->tconn->req_lock);
 445 io_schedule();
 446 finish_wait(&mdev->ee_wait, &wait);
 447 spin_lock_irq(&mdev->tconn->req_lock);
448 }
449}
450
451void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
452{
 453 spin_lock_irq(&mdev->tconn->req_lock);
 454 _drbd_wait_ee_list_empty(mdev, head);
 455 spin_unlock_irq(&mdev->tconn->req_lock);
456}
457
458/* see also kernel_accept; which is only present since 2.6.18.
459 * also we want to log which part of it failed, exactly */
 460static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
461{
462 struct sock *sk = sock->sk;
463 int err = 0;
464
465 *what = "listen";
466 err = sock->ops->listen(sock, 5);
467 if (err < 0)
468 goto out;
469
470 *what = "sock_create_lite";
471 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
472 newsock);
473 if (err < 0)
474 goto out;
475
476 *what = "accept";
477 err = sock->ops->accept(sock, *newsock, 0);
478 if (err < 0) {
479 sock_release(*newsock);
480 *newsock = NULL;
481 goto out;
482 }
483 (*newsock)->ops = sock->ops;
484
485out:
486 return err;
487}
488
 489static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
490{
491 mm_segment_t oldfs;
492 struct kvec iov = {
493 .iov_base = buf,
494 .iov_len = size,
495 };
496 struct msghdr msg = {
497 .msg_iovlen = 1,
498 .msg_iov = (struct iovec *)&iov,
499 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
500 };
501 int rv;
502
503 oldfs = get_fs();
504 set_fs(KERNEL_DS);
505 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
506 set_fs(oldfs);
507
508 return rv;
509}
510
 511static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
512{
513 mm_segment_t oldfs;
514 struct kvec iov = {
515 .iov_base = buf,
516 .iov_len = size,
517 };
518 struct msghdr msg = {
519 .msg_iovlen = 1,
520 .msg_iov = (struct iovec *)&iov,
521 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
522 };
523 int rv;
524
525 oldfs = get_fs();
526 set_fs(KERNEL_DS);
527
528 for (;;) {
 529 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
530 if (rv == size)
531 break;
532
533 /* Note:
534 * ECONNRESET other side closed the connection
535 * ERESTARTSYS (on sock) we got a signal
536 */
537
538 if (rv < 0) {
539 if (rv == -ECONNRESET)
 540 conn_info(tconn, "sock was reset by peer\n");
 541 else if (rv != -ERESTARTSYS)
 542 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
543 break;
544 } else if (rv == 0) {
 545 conn_info(tconn, "sock was shut down by peer\n");
546 break;
547 } else {
548 /* signal came in, or peer/link went down,
549 * after we read a partial message
550 */
551 /* D_ASSERT(signal_pending(current)); */
552 break;
553 }
554 };
555
556 set_fs(oldfs);
557
558 if (rv != size)
 559 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
560
561 return rv;
562}
563
564static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
565{
566 int err;
567
568 err = drbd_recv(tconn, buf, size);
569 if (err != size) {
570 if (err >= 0)
571 err = -EIO;
572 } else
573 err = 0;
574 return err;
575}
576
577static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
578{
579 int err;
580
581 err = drbd_recv_all(tconn, buf, size);
582 if (err && !signal_pending(current))
583 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
584 return err;
585}
586
587/* quoting tcp(7):
588 * On individual connections, the socket buffer size must be set prior to the
589 * listen(2) or connect(2) calls in order to have it take effect.
590 * This is our wrapper to do so.
591 */
592static void drbd_setbufsize(struct socket *sock, unsigned int snd,
593 unsigned int rcv)
594{
595 /* open coded SO_SNDBUF, SO_RCVBUF */
596 if (snd) {
597 sock->sk->sk_sndbuf = snd;
598 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
599 }
600 if (rcv) {
601 sock->sk->sk_rcvbuf = rcv;
602 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
603 }
604}
605
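/*
 * Editor's illustration (not part of the original file): the call ordering
 * the tcp(7) quote above requires.  The buffer sizes must be applied before
 * connect() or listen(), which is exactly what drbd_try_connect() and
 * drbd_wait_for_connect() below do.  snd_size, rcv_size, peer_addr and
 * peer_addr_len are placeholders.
 */
#if 0
	struct socket *sock;

	sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
	drbd_setbufsize(sock, snd_size, rcv_size);		/* first ... */
	sock->ops->connect(sock, peer_addr, peer_addr_len, 0);	/* ... then connect */
#endif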
 606static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
607{
608 const char *what;
609 struct socket *sock;
610 struct sockaddr_in6 src_in6;
611 int err;
612 int disconnect_on_error = 1;
613
 614 if (!get_net_conf(tconn))
615 return NULL;
616
617 what = "sock_create_kern";
 618 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
619 SOCK_STREAM, IPPROTO_TCP, &sock);
620 if (err < 0) {
621 sock = NULL;
622 goto out;
623 }
624
625 sock->sk->sk_rcvtimeo =
626 sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
627 drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
628 tconn->net_conf->rcvbuf_size);
629
630 /* explicitly bind to the configured IP as source IP
631 * for the outgoing connections.
632 * This is needed for multihomed hosts and to be
633 * able to use lo: interfaces for drbd.
634 * Make sure to use 0 as port number, so linux selects
635 * a free one dynamically.
636 */
637 memcpy(&src_in6, tconn->net_conf->my_addr,
638 min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
639 if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
640 src_in6.sin6_port = 0;
641 else
642 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
643
644 what = "bind before connect";
645 err = sock->ops->bind(sock,
646 (struct sockaddr *) &src_in6,
 647 tconn->net_conf->my_addr_len);
648 if (err < 0)
649 goto out;
650
651 /* connect may fail, peer not yet available.
652 * stay C_WF_CONNECTION, don't go Disconnecting! */
653 disconnect_on_error = 0;
654 what = "connect";
655 err = sock->ops->connect(sock,
656 (struct sockaddr *)tconn->net_conf->peer_addr,
657 tconn->net_conf->peer_addr_len, 0);
658
659out:
660 if (err < 0) {
661 if (sock) {
662 sock_release(sock);
663 sock = NULL;
664 }
665 switch (-err) {
666 /* timeout, busy, signal pending */
667 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
668 case EINTR: case ERESTARTSYS:
669 /* peer not (yet) available, network problem */
670 case ECONNREFUSED: case ENETUNREACH:
671 case EHOSTDOWN: case EHOSTUNREACH:
672 disconnect_on_error = 0;
673 break;
674 default:
 675 conn_err(tconn, "%s failed, err = %d\n", what, err);
676 }
677 if (disconnect_on_error)
 678 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
 679 }
 680 put_net_conf(tconn);
681 return sock;
682}
683
 684static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
685{
686 int timeo, err;
687 struct socket *s_estab = NULL, *s_listen;
688 const char *what;
689
 690 if (!get_net_conf(tconn))
691 return NULL;
692
693 what = "sock_create_kern";
 694 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
695 SOCK_STREAM, IPPROTO_TCP, &s_listen);
696 if (err) {
697 s_listen = NULL;
698 goto out;
699 }
700
 701 timeo = tconn->net_conf->try_connect_int * HZ;
702 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
703
704 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
705 s_listen->sk->sk_rcvtimeo = timeo;
706 s_listen->sk->sk_sndtimeo = timeo;
707 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
708 tconn->net_conf->rcvbuf_size);
709
710 what = "bind before listen";
711 err = s_listen->ops->bind(s_listen,
712 (struct sockaddr *) tconn->net_conf->my_addr,
713 tconn->net_conf->my_addr_len);
714 if (err < 0)
715 goto out;
716
 717 err = drbd_accept(&what, s_listen, &s_estab);
718
719out:
720 if (s_listen)
721 sock_release(s_listen);
722 if (err < 0) {
723 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
 724 conn_err(tconn, "%s failed, err = %d\n", what, err);
 725 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
726 }
727 }
 728 put_net_conf(tconn);
729
730 return s_estab;
731}
732
 733static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
 734
735static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
736 enum drbd_packet cmd)
737{
738 if (!conn_prepare_command(tconn, sock))
739 return -EIO;
 740 return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
741}
742
 743static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
 744{
745 unsigned int header_size = drbd_header_size(tconn);
746 struct packet_info pi;
747 int err;
 748
749 err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
750 if (err != header_size) {
751 if (err >= 0)
752 err = -EIO;
753 return err;
754 }
755 err = decode_header(tconn, tconn->data.rbuf, &pi);
756 if (err)
757 return err;
758 return pi.cmd;
759}
760
761/**
762 * drbd_socket_okay() - Free the socket if its connection is not okay
763 * @sock: pointer to the pointer to the socket.
764 */
 765static int drbd_socket_okay(struct socket **sock)
766{
767 int rr;
768 char tb[4];
769
770 if (!*sock)
 771 return false;
 772
 773 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
774
775 if (rr > 0 || rr == -EAGAIN) {
 776 return true;
777 } else {
778 sock_release(*sock);
779 *sock = NULL;
 780 return false;
781 }
782}
783/* Gets called if a connection is established, or if a new minor gets created
784 in a connection */
785int drbd_connected(int vnr, void *p, void *data)
786{
787 struct drbd_conf *mdev = (struct drbd_conf *)p;
 788 int err;
789
790 atomic_set(&mdev->packet_seq, 0);
791 mdev->peer_seq = 0;
792
793 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
794 &mdev->tconn->cstate_mutex :
795 &mdev->own_state_mutex;
796
797 err = drbd_send_sync_param(mdev);
798 if (!err)
799 err = drbd_send_sizes(mdev, 0, 0);
800 if (!err)
801 err = drbd_send_uuids(mdev);
802 if (!err)
803 err = drbd_send_state(mdev);
804 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
805 clear_bit(RESIZE_PENDING, &mdev->flags);
 806 mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
 807 return err;
808}
809
810/*
811 * return values:
812 * 1 yes, we have a valid connection
813 * 0 oops, did not work out, please try again
814 * -1 peer talks different language,
815 * no point in trying again, please go standalone.
816 * -2 We do not have a network config...
817 */
 818static int drbd_connect(struct drbd_tconn *tconn)
 819{
 820 struct socket *sock, *msock;
821 int try, h, ok;
822
 823 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
824 return -2;
825
 826 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
827
828 /* Assume that the peer only understands protocol 80 until we know better. */
829 tconn->agreed_pro_version = 80;
 830
 831 do {
832 struct socket *s;
833
834 for (try = 0;;) {
835 /* 3 tries, this should take less than a second! */
 836 s = drbd_try_connect(tconn);
837 if (s || ++try >= 3)
838 break;
839 /* give the other side time to call bind() & listen() */
 840 schedule_timeout_interruptible(HZ / 10);
841 }
842
843 if (s) {
844 if (!tconn->data.socket) {
845 tconn->data.socket = s;
 846 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
847 } else if (!tconn->meta.socket) {
848 tconn->meta.socket = s;
 849 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
 850 } else {
 851 conn_err(tconn, "Logic error in drbd_connect()\n");
852 goto out_release_sockets;
853 }
854 }
855
 856 if (tconn->data.socket && tconn->meta.socket) {
 857 schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
858 ok = drbd_socket_okay(&tconn->data.socket);
859 ok = drbd_socket_okay(&tconn->meta.socket) && ok;
860 if (ok)
861 break;
862 }
863
864retry:
 865 s = drbd_wait_for_connect(tconn);
 866 if (s) {
 867 try = receive_first_packet(tconn, s);
868 drbd_socket_okay(&tconn->data.socket);
869 drbd_socket_okay(&tconn->meta.socket);
 870 switch (try) {
 871 case P_INITIAL_DATA:
 872 if (tconn->data.socket) {
 873 conn_warn(tconn, "initial packet S crossed\n");
 874 sock_release(tconn->data.socket);
 875 }
 876 tconn->data.socket = s;
 877 break;
 878 case P_INITIAL_META:
 879 if (tconn->meta.socket) {
 880 conn_warn(tconn, "initial packet M crossed\n");
 881 sock_release(tconn->meta.socket);
 882 }
 883 tconn->meta.socket = s;
 884 set_bit(DISCARD_CONCURRENT, &tconn->flags);
885 break;
886 default:
 887 conn_warn(tconn, "Error receiving initial packet\n");
888 sock_release(s);
889 if (random32() & 1)
890 goto retry;
891 }
892 }
893
 894 if (tconn->cstate <= C_DISCONNECTING)
895 goto out_release_sockets;
896 if (signal_pending(current)) {
897 flush_signals(current);
898 smp_rmb();
 899 if (get_t_state(&tconn->receiver) == EXITING)
900 goto out_release_sockets;
901 }
902
 903 if (tconn->data.socket && tconn->meta.socket) {
904 ok = drbd_socket_okay(&tconn->data.socket);
905 ok = drbd_socket_okay(&tconn->meta.socket) && ok;
906 if (ok)
907 break;
908 }
909 } while (1);
910
911 sock = tconn->data.socket;
912 msock = tconn->meta.socket;
913
914 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
915 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
916
917 sock->sk->sk_allocation = GFP_NOIO;
918 msock->sk->sk_allocation = GFP_NOIO;
919
920 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
921 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
922
 923 /* NOT YET ...
 924 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
 925 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
 926 * first set it to the P_CONNECTION_FEATURES timeout,
927 * which we set to 4x the configured ping_timeout. */
928 sock->sk->sk_sndtimeo =
 929 sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
 930
931 msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
932 msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
933
934 /* we don't want delays.
 935 * we use TCP_CORK where appropriate, though */
936 drbd_tcp_nodelay(sock);
937 drbd_tcp_nodelay(msock);
938
 939 tconn->last_received = jiffies;
 940
 941 h = drbd_do_features(tconn);
942 if (h <= 0)
943 return h;
944
 945 if (tconn->cram_hmac_tfm) {
 946 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
 947 switch (drbd_do_auth(tconn)) {
 948 case -1:
 949 conn_err(tconn, "Authentication of peer failed\n");
 950 return -1;
 951 case 0:
 952 conn_err(tconn, "Authentication of peer failed, trying again.\n");
 953 return 0;
954 }
955 }
956
 957 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
958 return 0;
959
 960 sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
961 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
962
 963 drbd_thread_start(&tconn->asender);
 964
 965 if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
 966 return -1;
 967
 968 return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
969
970out_release_sockets:
971 if (tconn->data.socket) {
972 sock_release(tconn->data.socket);
973 tconn->data.socket = NULL;
974 }
975 if (tconn->meta.socket) {
976 sock_release(tconn->meta.socket);
977 tconn->meta.socket = NULL;
978 }
979 return -1;
980}
981
 982static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
 983{
984 unsigned int header_size = drbd_header_size(tconn);
985
986 if (header_size == sizeof(struct p_header100) &&
987 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
988 struct p_header100 *h = header;
989 if (h->pad != 0) {
990 conn_err(tconn, "Header padding is not zero\n");
991 return -EINVAL;
992 }
993 pi->vnr = be16_to_cpu(h->volume);
994 pi->cmd = be16_to_cpu(h->command);
995 pi->size = be32_to_cpu(h->length);
996 } else if (header_size == sizeof(struct p_header95) &&
997 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
 998 struct p_header95 *h = header;
 999 pi->cmd = be16_to_cpu(h->command);
1000 pi->size = be32_to_cpu(h->length);
1001 pi->vnr = 0;
1002 } else if (header_size == sizeof(struct p_header80) &&
1003 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1004 struct p_header80 *h = header;
1005 pi->cmd = be16_to_cpu(h->command);
1006 pi->size = be16_to_cpu(h->length);
 1007 pi->vnr = 0;
 1008 } else {
1009 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1010 be32_to_cpu(*(__be32 *)header),
1011 tconn->agreed_pro_version);
 1012 return -EINVAL;
 1013 }
 1014 pi->data = header + header_size;
 1015 return 0;
1016}
1017
 1018static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
 1019{
 1020 void *buffer = tconn->data.rbuf;
 1021 int err;
 1022
 1023 err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
 1024 if (err)
 1025 return err;
 1026
 1027 err = decode_header(tconn, buffer, pi);
 1028 tconn->last_received = jiffies;
 1029
 1030 return err;
1031}
1032
 1033static void drbd_flush(struct drbd_conf *mdev)
1034{
1035 int rv;
1036
1037 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
 1038 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
 1039 NULL);
1040 if (rv) {
1041 dev_err(DEV, "local disk flush failed with status %d\n", rv);
1042 /* would rather check on EOPNOTSUPP, but that is not reliable.
1043 * don't try again for ANY return value != 0
1044 * if (rv == -EOPNOTSUPP) */
1045 drbd_bump_write_ordering(mdev, WO_drain_io);
1046 }
1047 put_ldev(mdev);
1048 }
1049}
1050
1051/**
1052 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1053 * @mdev: DRBD device.
1054 * @epoch: Epoch object.
1055 * @ev: Epoch event.
1056 */
1057static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1058 struct drbd_epoch *epoch,
1059 enum epoch_event ev)
1060{
 1061 int epoch_size;
 1062 struct drbd_epoch *next_epoch;
1063 enum finish_epoch rv = FE_STILL_LIVE;
1064
1065 spin_lock(&mdev->epoch_lock);
1066 do {
1067 next_epoch = NULL;
1068
1069 epoch_size = atomic_read(&epoch->epoch_size);
1070
1071 switch (ev & ~EV_CLEANUP) {
1072 case EV_PUT:
1073 atomic_dec(&epoch->active);
1074 break;
1075 case EV_GOT_BARRIER_NR:
1076 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1077 break;
1078 case EV_BECAME_LAST:
1079 /* nothing to do*/
1080 break;
1081 }
1082
1083 if (epoch_size != 0 &&
1084 atomic_read(&epoch->active) == 0 &&
 1085 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1086 if (!(ev & EV_CLEANUP)) {
1087 spin_unlock(&mdev->epoch_lock);
1088 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1089 spin_lock(&mdev->epoch_lock);
1090 }
1091 dec_unacked(mdev);
1092
1093 if (mdev->current_epoch != epoch) {
1094 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1095 list_del(&epoch->list);
1096 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1097 mdev->epochs--;
1098 kfree(epoch);
1099
1100 if (rv == FE_STILL_LIVE)
1101 rv = FE_DESTROYED;
1102 } else {
1103 epoch->flags = 0;
1104 atomic_set(&epoch->epoch_size, 0);
 1105 /* atomic_set(&epoch->active, 0); is already zero */
1106 if (rv == FE_STILL_LIVE)
1107 rv = FE_RECYCLED;
 1108 wake_up(&mdev->ee_wait);
1109 }
1110 }
1111
1112 if (!next_epoch)
1113 break;
1114
1115 epoch = next_epoch;
1116 } while (1);
1117
1118 spin_unlock(&mdev->epoch_lock);
1119
1120 return rv;
1121}
1122
1123/**
1124 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1125 * @mdev: DRBD device.
1126 * @wo: Write ordering method to try.
1127 */
1128void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1129{
1130 enum write_ordering_e pwo;
1131 static char *write_ordering_str[] = {
1132 [WO_none] = "none",
1133 [WO_drain_io] = "drain",
1134 [WO_bdev_flush] = "flush",
1135 };
1136
1137 pwo = mdev->write_ordering;
1138 wo = min(pwo, wo);
1139 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1140 wo = WO_drain_io;
1141 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1142 wo = WO_none;
1143 mdev->write_ordering = wo;
 1144 if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1145 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1146}
1147
 1148/**
 1149 * drbd_submit_peer_request()
 1150 * @mdev: DRBD device.
 1151 * @peer_req: peer request
 1152 * @rw: flag field, see bio->bi_rw
1153 *
1154 * May spread the pages to multiple bios,
1155 * depending on bio_add_page restrictions.
1156 *
1157 * Returns 0 if all bios have been submitted,
1158 * -ENOMEM if we could not allocate enough bios,
1159 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1160 * single page to an empty bio (which should never happen and likely indicates
1161 * that the lower level IO stack is in some way broken). This has been observed
1162 * on certain Xen deployments.
1163 */
1164/* TODO allocate from our own bio_set. */
1165int drbd_submit_peer_request(struct drbd_conf *mdev,
1166 struct drbd_peer_request *peer_req,
1167 const unsigned rw, const int fault_type)
1168{
1169 struct bio *bios = NULL;
1170 struct bio *bio;
1171 struct page *page = peer_req->pages;
1172 sector_t sector = peer_req->i.sector;
1173 unsigned ds = peer_req->i.size;
1174 unsigned n_bios = 0;
1175 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
 1176 int err = -ENOMEM;
1177
1178 /* In most cases, we will only need one bio. But in case the lower
1179 * level restrictions happen to be different at this offset on this
1180 * side than those of the sending peer, we may need to submit the
1181 * request in more than one bio.
1182 *
1183 * Plain bio_alloc is good enough here, this is no DRBD internally
1184 * generated bio, but a bio allocated on behalf of the peer.
1185 */
1186next_bio:
1187 bio = bio_alloc(GFP_NOIO, nr_pages);
1188 if (!bio) {
1189 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1190 goto fail;
1191 }
 1192 /* > peer_req->i.sector, unless this is the first bio */
1193 bio->bi_sector = sector;
1194 bio->bi_bdev = mdev->ldev->backing_bdev;
 1195 bio->bi_rw = rw;
 1196 bio->bi_private = peer_req;
 1197 bio->bi_end_io = drbd_peer_request_endio;
1198
1199 bio->bi_next = bios;
1200 bios = bio;
1201 ++n_bios;
1202
1203 page_chain_for_each(page) {
1204 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1205 if (!bio_add_page(bio, page, len, 0)) {
1206 /* A single page must always be possible!
1207 * But in case it fails anyways,
1208 * we deal with it, and complain (below). */
1209 if (bio->bi_vcnt == 0) {
1210 dev_err(DEV,
1211 "bio_add_page failed for len=%u, "
1212 "bi_vcnt=0 (bi_sector=%llu)\n",
1213 len, (unsigned long long)bio->bi_sector);
1214 err = -ENOSPC;
1215 goto fail;
1216 }
1217 goto next_bio;
1218 }
1219 ds -= len;
1220 sector += len >> 9;
1221 --nr_pages;
1222 }
1223 D_ASSERT(page == NULL);
1224 D_ASSERT(ds == 0);
1225
 1226 atomic_set(&peer_req->pending_bios, n_bios);
1227 do {
1228 bio = bios;
1229 bios = bios->bi_next;
1230 bio->bi_next = NULL;
1231
 1232 drbd_generic_make_request(mdev, fault_type, bio);
 1233 } while (bios);
1234 return 0;
1235
1236fail:
1237 while (bios) {
1238 bio = bios;
1239 bios = bios->bi_next;
1240 bio_put(bio);
1241 }
 1242 return err;
1243}
1244
 1245static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
 1246 struct drbd_peer_request *peer_req)
 1247{
 1248 struct drbd_interval *i = &peer_req->i;
1249
1250 drbd_remove_interval(&mdev->write_requests, i);
1251 drbd_clear_interval(i);
1252
 1253 /* Wake up any processes waiting for this peer request to complete. */
1254 if (i->waiting)
1255 wake_up(&mdev->misc_wait);
1256}
1257
 1258static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
 1259{
 1260 struct drbd_conf *mdev;
 1261 int rv;
 1262 struct p_barrier *p = pi->data;
1263 struct drbd_epoch *epoch;
1264
1265 mdev = vnr_to_mdev(tconn, pi->vnr);
1266 if (!mdev)
1267 return -EIO;
1268
1269 inc_unacked(mdev);
1270
1271 mdev->current_epoch->barrier_nr = p->barrier;
1272 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1273
1274 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1275 * the activity log, which means it would not be resynced in case the
1276 * R_PRIMARY crashes now.
1277 * Therefore we must send the barrier_ack after the barrier request was
1278 * completed. */
1279 switch (mdev->write_ordering) {
1280 case WO_none:
1281 if (rv == FE_RECYCLED)
 1282 return 0;
1283
1284 /* receiver context, in the writeout path of the other node.
1285 * avoid potential distributed deadlock */
1286 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1287 if (epoch)
1288 break;
1289 else
1290 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1291 /* Fall through */
1292
1293 case WO_bdev_flush:
1294 case WO_drain_io:
 1295 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1296 drbd_flush(mdev);
1297
1298 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1299 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1300 if (epoch)
1301 break;
1302 }
1303
1304 epoch = mdev->current_epoch;
1305 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1306
1307 D_ASSERT(atomic_read(&epoch->active) == 0);
1308 D_ASSERT(epoch->flags == 0);
 1309
 1310 return 0;
1311 default:
1312 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
 1313 return -EIO;
1314 }
1315
1316 epoch->flags = 0;
1317 atomic_set(&epoch->epoch_size, 0);
1318 atomic_set(&epoch->active, 0);
1319
1320 spin_lock(&mdev->epoch_lock);
1321 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1322 list_add(&epoch->list, &mdev->current_epoch->list);
1323 mdev->current_epoch = epoch;
1324 mdev->epochs++;
b411b363
PR
1325 } else {
1326 /* The current_epoch got recycled while we allocated this one... */
1327 kfree(epoch);
1328 }
1329 spin_unlock(&mdev->epoch_lock);
1330
 1331 return 0;
1332}
1333
1334/* used from receive_RSDataReply (recv_resync_read)
1335 * and from receive_Data */
1336static struct drbd_peer_request *
1337read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1338 int data_size) __must_hold(local)
 1339{
 1340 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 1341 struct drbd_peer_request *peer_req;
 1342 struct page *page;
 1343 int dgs, ds, err;
1344 void *dig_in = mdev->tconn->int_dig_in;
1345 void *dig_vv = mdev->tconn->int_dig_vv;
 1346 unsigned long *data;
 1347
1348 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1349 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1350
1351 if (dgs) {
1352 /*
1353 * FIXME: Receive the incoming digest into the receive buffer
1354 * here, together with its struct p_data?
1355 */
1356 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1357 if (err)
 1358 return NULL;
1359 }
1360
1361 data_size -= dgs;
1362
1363 if (!expect(data_size != 0))
1364 return NULL;
1365 if (!expect(IS_ALIGNED(data_size, 512)))
1366 return NULL;
1367 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1368 return NULL;
 1369
 1370 /* even though we trust our peer,
1371 * we sometimes have to double check. */
1372 if (sector + (data_size>>9) > capacity) {
1373 dev_err(DEV, "request from peer beyond end of local disk: "
1374 "capacity: %llus < sector: %llus + size: %u\n",
1375 (unsigned long long)capacity,
1376 (unsigned long long)sector, data_size);
1377 return NULL;
1378 }
1379
1380 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1381 * "criss-cross" setup, that might cause write-out on some other DRBD,
1382 * which in turn might block on the other node at this very place. */
1383 peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1384 if (!peer_req)
 1385 return NULL;
 1386
 1387 ds = data_size;
 1388 page = peer_req->pages;
1389 page_chain_for_each(page) {
1390 unsigned len = min_t(int, ds, PAGE_SIZE);
 1391 data = kmap(page);
 1392 err = drbd_recv_all_warn(mdev->tconn, data, len);
 1393 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1394 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1395 data[0] = data[0] ^ (unsigned long)-1;
1396 }
 1397 kunmap(page);
 1398 if (err) {
 1399 drbd_free_ee(mdev, peer_req);
1400 return NULL;
1401 }
 1402 ds -= len;
1403 }
1404
1405 if (dgs) {
 1406 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
 1407 if (memcmp(dig_in, dig_vv, dgs)) {
1408 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1409 (unsigned long long)sector, data_size);
 1410 drbd_free_ee(mdev, peer_req);
1411 return NULL;
1412 }
1413 }
1414 mdev->recv_cnt += data_size>>9;
 1415 return peer_req;
1416}
1417
1418/* drbd_drain_block() just takes a data block
1419 * out of the socket input buffer, and discards it.
1420 */
1421static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1422{
1423 struct page *page;
 1424 int err = 0;
1425 void *data;
1426
 1427 if (!data_size)
 1428 return 0;
 1429
 1430 page = drbd_pp_alloc(mdev, 1, 1);
1431
1432 data = kmap(page);
1433 while (data_size) {
1434 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1435
1436 err = drbd_recv_all_warn(mdev->tconn, data, len);
1437 if (err)
 1438 break;
 1439 data_size -= len;
1440 }
1441 kunmap(page);
 1442 drbd_pp_free(mdev, page, 0);
 1443 return err;
1444}
1445
1446static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1447 sector_t sector, int data_size)
1448{
1449 struct bio_vec *bvec;
1450 struct bio *bio;
 1451 int dgs, err, i, expect;
1452 void *dig_in = mdev->tconn->int_dig_in;
1453 void *dig_vv = mdev->tconn->int_dig_vv;
 1454
1455 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1456 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1457
1458 if (dgs) {
1459 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1460 if (err)
1461 return err;
1462 }
1463
1464 data_size -= dgs;
1465
1466 /* optimistically update recv_cnt. if receiving fails below,
1467 * we disconnect anyways, and counters will be reset. */
1468 mdev->recv_cnt += data_size>>9;
1469
1470 bio = req->master_bio;
1471 D_ASSERT(sector == bio->bi_sector);
1472
1473 bio_for_each_segment(bvec, bio, i) {
 1474 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
 1475 expect = min_t(int, data_size, bvec->bv_len);
 1476 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
 1477 kunmap(bvec->bv_page);
1478 if (err)
1479 return err;
1480 data_size -= expect;
1481 }
1482
1483 if (dgs) {
 1484 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1485 if (memcmp(dig_in, dig_vv, dgs)) {
1486 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
 1487 return -EINVAL;
1488 }
1489 }
1490
1491 D_ASSERT(data_size == 0);
 1492 return 0;
1493}
1494
1495/* e_end_resync_block() is called via
1496 * drbd_process_done_ee() by asender only */
 1497static int e_end_resync_block(struct drbd_work *w, int unused)
 1498{
1499 struct drbd_peer_request *peer_req =
1500 container_of(w, struct drbd_peer_request, w);
 1501 struct drbd_conf *mdev = w->mdev;
 1502 sector_t sector = peer_req->i.sector;
 1503 int err;
 1504
 1505 D_ASSERT(drbd_interval_empty(&peer_req->i));
 1506
1507 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1508 drbd_set_in_sync(mdev, sector, peer_req->i.size);
 1509 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1510 } else {
1511 /* Record failure to sync */
 1512 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
 1513
 1514 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1515 }
1516 dec_unacked(mdev);
1517
 1518 return err;
1519}
1520
1521static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1522{
 1523 struct drbd_peer_request *peer_req;
 1524
1525 peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1526 if (!peer_req)
 1527 goto fail;
1528
1529 dec_rs_pending(mdev);
1530
1531 inc_unacked(mdev);
1532 /* corresponding dec_unacked() in e_end_resync_block()
1533 * respective _drbd_clear_done_ee */
1534
 1535 peer_req->w.cb = e_end_resync_block;
 1536
 1537 spin_lock_irq(&mdev->tconn->req_lock);
 1538 list_add(&peer_req->w.list, &mdev->sync_ee);
 1539 spin_unlock_irq(&mdev->tconn->req_lock);
 1540
 1541 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
 1542 if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
 1543 return 0;
 1544
1545 /* don't care for the reason here */
1546 dev_err(DEV, "submit failed, triggering re-connect\n");
 1547 spin_lock_irq(&mdev->tconn->req_lock);
 1548 list_del(&peer_req->w.list);
 1549 spin_unlock_irq(&mdev->tconn->req_lock);
 1550
 1551 drbd_free_ee(mdev, peer_req);
1552fail:
1553 put_ldev(mdev);
 1554 return -EIO;
1555}
1556
 1557static struct drbd_request *
1558find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1559 sector_t sector, bool missing_ok, const char *func)
 1560{
1561 struct drbd_request *req;
1562
1563 /* Request object according to our peer */
1564 req = (struct drbd_request *)(unsigned long)id;
 1565 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
 1566 return req;
1567 if (!missing_ok) {
1568 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1569 (unsigned long)id, (unsigned long long)sector);
1570 }
1571 return NULL;
1572}
1573
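/*
 * Editor's illustration (not part of the original file): the block_id
 * round-trip that find_request() above relies on.  The sending side stores
 * its own request pointer as an opaque 64-bit cookie, the peer echoes it back
 * unchanged (see the comment in drbd_alloc_ee()), and find_request() casts it
 * back and validates it against the interval tree before trusting it.
 */
#if 0
	/* sending side: the pointer itself becomes the cookie */
	p->block_id = (unsigned long)req;

	/* receiving the reply (see receive_DataReply() below) */
	req = find_request(mdev, &mdev->read_requests, p->block_id,
			   sector, false, __func__);
#endif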
 1574static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
 1575{
 1576 struct drbd_conf *mdev;
1577 struct drbd_request *req;
1578 sector_t sector;
82bc0194 1579 int err;
 1580 struct p_data *p = pi->data;
1581
1582 mdev = vnr_to_mdev(tconn, pi->vnr);
1583 if (!mdev)
1584 return -EIO;
1585
1586 sector = be64_to_cpu(p->sector);
1587
 1588 spin_lock_irq(&mdev->tconn->req_lock);
 1589 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
 1590 spin_unlock_irq(&mdev->tconn->req_lock);
 1591 if (unlikely(!req))
 1592 return -EIO;
 1593
 1594 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1595 * special casing it there for the various failure cases.
1596 * still no race with drbd_fail_pending_reads */
 1597 err = recv_dless_read(mdev, req, sector, pi->size);
 1598 if (!err)
 1599 req_mod(req, DATA_RECEIVED);
1600 /* else: nothing. handled from drbd_disconnect...
1601 * I don't think we may complete this just yet
1602 * in case we are "on-disconnect: freeze" */
1603
 1604 return err;
1605}
1606
 1607static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
 1608{
 1609 struct drbd_conf *mdev;
 1610 sector_t sector;
 1611 int err;
 1612 struct p_data *p = pi->data;
1613
1614 mdev = vnr_to_mdev(tconn, pi->vnr);
1615 if (!mdev)
1616 return -EIO;
1617
1618 sector = be64_to_cpu(p->sector);
1619 D_ASSERT(p->block_id == ID_SYNCER);
1620
1621 if (get_ldev(mdev)) {
1622 /* data is submitted to disk within recv_resync_read.
1623 * corresponding put_ldev done below on error,
 1624 * or in drbd_peer_request_endio. */
 1625 err = recv_resync_read(mdev, sector, pi->size);
1626 } else {
1627 if (__ratelimit(&drbd_ratelimit_state))
1628 dev_err(DEV, "Can not write resync data to local disk.\n");
1629
 1630 err = drbd_drain_block(mdev, pi->size);
 1631
 1632 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1633 }
1634
 1635 atomic_add(pi->size >> 9, &mdev->rs_sect_in);
 1636
 1637 return err;
1638}
1639
 1640static int w_restart_write(struct drbd_work *w, int cancel)
1641{
1642 struct drbd_request *req = container_of(w, struct drbd_request, w);
1643 struct drbd_conf *mdev = w->mdev;
1644 struct bio *bio;
1645 unsigned long start_time;
1646 unsigned long flags;
1647
1648 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1649 if (!expect(req->rq_state & RQ_POSTPONED)) {
1650 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
 1651 return -EIO;
1652 }
1653 bio = req->master_bio;
1654 start_time = req->start_time;
1655 /* Postponed requests will not have their master_bio completed! */
1656 __req_mod(req, DISCARD_WRITE, NULL);
1657 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1658
1659 while (__drbd_make_request(mdev, bio, start_time))
1660 /* retry */ ;
 1661 return 0;
1662}
1663
1664static void restart_conflicting_writes(struct drbd_conf *mdev,
1665 sector_t sector, int size)
1666{
1667 struct drbd_interval *i;
1668 struct drbd_request *req;
1669
1670 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1671 if (!i->local)
1672 continue;
1673 req = container_of(i, struct drbd_request, i);
1674 if (req->rq_state & RQ_LOCAL_PENDING ||
1675 !(req->rq_state & RQ_POSTPONED))
1676 continue;
1677 if (expect(list_empty(&req->w.list))) {
1678 req->w.mdev = mdev;
1679 req->w.cb = w_restart_write;
1680 drbd_queue_work(&mdev->tconn->data.work, &req->w);
1681 }
1682 }
1683}
1684
1685/* e_end_block() is called via drbd_process_done_ee().
1686 * this means this function only runs in the asender thread
1687 */
 1688static int e_end_block(struct drbd_work *w, int cancel)
 1689{
1690 struct drbd_peer_request *peer_req =
1691 container_of(w, struct drbd_peer_request, w);
 1692 struct drbd_conf *mdev = w->mdev;
 1693 sector_t sector = peer_req->i.sector;
 1694 int err = 0, pcmd;
 1695
 1696 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
 1697 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1698 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1699 mdev->state.conn <= C_PAUSED_SYNC_T &&
 1700 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
 1701 P_RS_WRITE_ACK : P_WRITE_ACK;
 1702 err = drbd_send_ack(mdev, pcmd, peer_req);
 1703 if (pcmd == P_RS_WRITE_ACK)
 1704 drbd_set_in_sync(mdev, sector, peer_req->i.size);
 1705 } else {
 1706 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1707 /* we expect it to be marked out of sync anyways...
1708 * maybe assert this? */
1709 }
1710 dec_unacked(mdev);
1711 }
1712 /* we delete from the conflict detection hash _after_ we sent out the
1713 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
 1714 if (mdev->tconn->net_conf->two_primaries) {
 1715 spin_lock_irq(&mdev->tconn->req_lock);
1716 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1717 drbd_remove_epoch_entry_interval(mdev, peer_req);
1718 if (peer_req->flags & EE_RESTART_REQUESTS)
1719 restart_conflicting_writes(mdev, sector, peer_req->i.size);
 1720 spin_unlock_irq(&mdev->tconn->req_lock);
 1721 } else
 1722 D_ASSERT(drbd_interval_empty(&peer_req->i));
 1723
 1724 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
 1725
 1726 return err;
1727}
1728
7be8da07 1729static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
b411b363 1730{
7be8da07 1731 struct drbd_conf *mdev = w->mdev;
8050e6d0
AG
1732 struct drbd_peer_request *peer_req =
1733 container_of(w, struct drbd_peer_request, w);
99920dc5 1734 int err;
b411b363 1735
99920dc5 1736 err = drbd_send_ack(mdev, ack, peer_req);
b411b363
PR
1737 dec_unacked(mdev);
1738
99920dc5 1739 return err;
b411b363
PR
1740}
1741
99920dc5 1742static int e_send_discard_write(struct drbd_work *w, int unused)
7be8da07
AG
1743{
1744 return e_send_ack(w, P_DISCARD_WRITE);
1745}
1746
99920dc5 1747static int e_send_retry_write(struct drbd_work *w, int unused)
7be8da07
AG
1748{
1749 struct drbd_tconn *tconn = w->mdev->tconn;
1750
1751 return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1752 P_RETRY_WRITE : P_DISCARD_WRITE);
1753}
1754
3e394da1
AG
1755static bool seq_greater(u32 a, u32 b)
1756{
1757 /*
1758 * We assume 32-bit wrap-around here.
1759 * For 24-bit wrap-around, we would have to shift:
1760 * a <<= 8; b <<= 8;
1761 */
1762 return (s32)a - (s32)b > 0;
1763}
1764
1765static u32 seq_max(u32 a, u32 b)
1766{
1767 return seq_greater(a, b) ? a : b;
1768}
1769
7be8da07
AG
1770static bool need_peer_seq(struct drbd_conf *mdev)
1771{
1772 struct drbd_tconn *tconn = mdev->tconn;
1773
1774 /*
1775 * We only need to keep track of the last packet_seq number of our peer
1776 * if we are in dual-primary mode and we have the discard flag set; see
1777 * handle_write_conflicts().
1778 */
1779 return tconn->net_conf->two_primaries &&
1780 test_bit(DISCARD_CONCURRENT, &tconn->flags);
1781}
1782
43ae077d 1783static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
3e394da1 1784{
3c13b680 1785 unsigned int newest_peer_seq;
3e394da1 1786
7be8da07
AG
1787 if (need_peer_seq(mdev)) {
1788 spin_lock(&mdev->peer_seq_lock);
3c13b680
LE
1789 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1790 mdev->peer_seq = newest_peer_seq;
7be8da07 1791 spin_unlock(&mdev->peer_seq_lock);
3c13b680
LE
1792 /* wake up only if we actually changed mdev->peer_seq */
1793 if (peer_seq == newest_peer_seq)
7be8da07
AG
1794 wake_up(&mdev->seq_wait);
1795 }
3e394da1
AG
1796}
1797
b411b363
PR
1798/* Called from receive_Data.
1799 * Synchronize packets on sock with packets on msock.
1800 *
1801 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1802 * packet traveling on msock, they are still processed in the order they have
1803 * been sent.
1804 *
1805 * Note: we don't care for Ack packets overtaking P_DATA packets.
1806 *
1807 * In case packet_seq is larger than mdev->peer_seq number, there are
1808 * outstanding packets on the msock. We wait for them to arrive.
1809 * In case we are the logically next packet, we update mdev->peer_seq
1810 * ourselves. Correctly handles 32bit wrap around.
1811 *
1812 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1813 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1814 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 1815  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1816 *
1817 * returns 0 if we may process the packet,
1818 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
7be8da07 1819static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
b411b363
PR
1820{
1821 DEFINE_WAIT(wait);
b411b363 1822 long timeout;
7be8da07
AG
1823 int ret;
1824
1825 if (!need_peer_seq(mdev))
1826 return 0;
1827
b411b363
PR
1828 spin_lock(&mdev->peer_seq_lock);
1829 for (;;) {
7be8da07
AG
1830 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1831 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1832 ret = 0;
b411b363 1833 break;
7be8da07 1834 }
b411b363
PR
1835 if (signal_pending(current)) {
1836 ret = -ERESTARTSYS;
1837 break;
1838 }
7be8da07 1839 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
b411b363 1840 spin_unlock(&mdev->peer_seq_lock);
71b1c1eb
AG
1841 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1842 timeout = schedule_timeout(timeout);
b411b363 1843 spin_lock(&mdev->peer_seq_lock);
7be8da07 1844 if (!timeout) {
b411b363 1845 ret = -ETIMEDOUT;
71b1c1eb 1846 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
b411b363
PR
1847 break;
1848 }
1849 }
b411b363 1850 spin_unlock(&mdev->peer_seq_lock);
7be8da07 1851 finish_wait(&mdev->seq_wait, &wait);
b411b363
PR
1852 return ret;
1853}
1854
688593c5
LE
1855/* see also bio_flags_to_wire()
1856 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1857 * flags and back. We may replicate to other kernel versions. */
1858static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1859{
688593c5
LE
1860 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1861 (dpf & DP_FUA ? REQ_FUA : 0) |
1862 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1863 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1864}
1865
7be8da07
AG
1866static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1867 unsigned int size)
1868{
1869 struct drbd_interval *i;
1870
1871 repeat:
1872 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1873 struct drbd_request *req;
1874 struct bio_and_error m;
1875
1876 if (!i->local)
1877 continue;
1878 req = container_of(i, struct drbd_request, i);
1879 if (!(req->rq_state & RQ_POSTPONED))
1880 continue;
1881 req->rq_state &= ~RQ_POSTPONED;
1882 __req_mod(req, NEG_ACKED, &m);
1883 spin_unlock_irq(&mdev->tconn->req_lock);
1884 if (m.bio)
1885 complete_master_bio(mdev, &m);
1886 spin_lock_irq(&mdev->tconn->req_lock);
1887 goto repeat;
1888 }
1889}
1890
1891static int handle_write_conflicts(struct drbd_conf *mdev,
1892 struct drbd_peer_request *peer_req)
1893{
1894 struct drbd_tconn *tconn = mdev->tconn;
1895 bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1896 sector_t sector = peer_req->i.sector;
1897 const unsigned int size = peer_req->i.size;
1898 struct drbd_interval *i;
1899 bool equal;
1900 int err;
1901
1902 /*
1903 * Inserting the peer request into the write_requests tree will prevent
1904 * new conflicting local requests from being added.
1905 */
1906 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1907
1908 repeat:
1909 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1910 if (i == &peer_req->i)
1911 continue;
1912
1913 if (!i->local) {
1914 /*
1915 * Our peer has sent a conflicting remote request; this
1916 * should not happen in a two-node setup. Wait for the
1917 * earlier peer request to complete.
1918 */
1919 err = drbd_wait_misc(mdev, i);
1920 if (err)
1921 goto out;
1922 goto repeat;
1923 }
1924
1925 equal = i->sector == sector && i->size == size;
1926 if (resolve_conflicts) {
1927 /*
1928 * If the peer request is fully contained within the
1929 * overlapping request, it can be discarded; otherwise,
1930 * it will be retried once all overlapping requests
1931 * have completed.
1932 */
1933 bool discard = i->sector <= sector && i->sector +
1934 (i->size >> 9) >= sector + (size >> 9);
1935
1936 if (!equal)
1937 dev_alert(DEV, "Concurrent writes detected: "
1938 "local=%llus +%u, remote=%llus +%u, "
1939 "assuming %s came first\n",
1940 (unsigned long long)i->sector, i->size,
1941 (unsigned long long)sector, size,
1942 discard ? "local" : "remote");
1943
1944 inc_unacked(mdev);
1945 peer_req->w.cb = discard ? e_send_discard_write :
1946 e_send_retry_write;
1947 list_add_tail(&peer_req->w.list, &mdev->done_ee);
1948 wake_asender(mdev->tconn);
1949
1950 err = -ENOENT;
1951 goto out;
1952 } else {
1953 struct drbd_request *req =
1954 container_of(i, struct drbd_request, i);
1955
1956 if (!equal)
1957 dev_alert(DEV, "Concurrent writes detected: "
1958 "local=%llus +%u, remote=%llus +%u\n",
1959 (unsigned long long)i->sector, i->size,
1960 (unsigned long long)sector, size);
1961
1962 if (req->rq_state & RQ_LOCAL_PENDING ||
1963 !(req->rq_state & RQ_POSTPONED)) {
1964 /*
1965 * Wait for the node with the discard flag to
1966 * decide if this request will be discarded or
1967 * retried. Requests that are discarded will
1968 * disappear from the write_requests tree.
1969 *
1970 * In addition, wait for the conflicting
1971 * request to finish locally before submitting
1972 * the conflicting peer request.
1973 */
1974 err = drbd_wait_misc(mdev, &req->i);
1975 if (err) {
1976 _conn_request_state(mdev->tconn,
1977 NS(conn, C_TIMEOUT),
1978 CS_HARD);
1979 fail_postponed_requests(mdev, sector, size);
1980 goto out;
1981 }
1982 goto repeat;
1983 }
1984 /*
1985 * Remember to restart the conflicting requests after
1986 * the new peer request has completed.
1987 */
1988 peer_req->flags |= EE_RESTART_REQUESTS;
1989 }
1990 }
1991 err = 0;
1992
1993 out:
1994 if (err)
1995 drbd_remove_epoch_entry_interval(mdev, peer_req);
1996 return err;
1997}
1998
b411b363 1999/* mirrored write */
4a76b161 2000static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 2001{
4a76b161 2002 struct drbd_conf *mdev;
b411b363 2003 sector_t sector;
db830c46 2004 struct drbd_peer_request *peer_req;
e658983a 2005 struct p_data *p = pi->data;
7be8da07 2006 u32 peer_seq = be32_to_cpu(p->seq_num);
b411b363
PR
2007 int rw = WRITE;
2008 u32 dp_flags;
7be8da07 2009 int err;
b411b363 2010
4a76b161
AG
2011 mdev = vnr_to_mdev(tconn, pi->vnr);
2012 if (!mdev)
2013 return -EIO;
2014
7be8da07 2015 if (!get_ldev(mdev)) {
82bc0194
AG
2016 int err2;
2017
7be8da07 2018 err = wait_for_and_update_peer_seq(mdev, peer_seq);
e2857216 2019 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
b411b363 2020 atomic_inc(&mdev->current_epoch->epoch_size);
e2857216 2021 err2 = drbd_drain_block(mdev, pi->size);
82bc0194
AG
2022 if (!err)
2023 err = err2;
2024 return err;
b411b363
PR
2025 }
2026
fcefa62e
AG
2027 /*
2028 * Corresponding put_ldev done either below (on various errors), or in
2029 * drbd_peer_request_endio, if we successfully submit the data at the
2030 * end of this function.
2031 */
b411b363
PR
2032
2033 sector = be64_to_cpu(p->sector);
e2857216 2034 peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
db830c46 2035 if (!peer_req) {
b411b363 2036 put_ldev(mdev);
82bc0194 2037 return -EIO;
b411b363
PR
2038 }
2039
db830c46 2040 peer_req->w.cb = e_end_block;
b411b363 2041
688593c5
LE
2042 dp_flags = be32_to_cpu(p->dp_flags);
2043 rw |= wire_flags_to_bio(mdev, dp_flags);
2044
2045 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 2046 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 2047
b411b363 2048 spin_lock(&mdev->epoch_lock);
db830c46
AG
2049 peer_req->epoch = mdev->current_epoch;
2050 atomic_inc(&peer_req->epoch->epoch_size);
2051 atomic_inc(&peer_req->epoch->active);
b411b363
PR
2052 spin_unlock(&mdev->epoch_lock);
2053
7be8da07
AG
2054 if (mdev->tconn->net_conf->two_primaries) {
2055 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2056 if (err)
b411b363 2057 goto out_interrupted;
87eeee41 2058 spin_lock_irq(&mdev->tconn->req_lock);
7be8da07
AG
2059 err = handle_write_conflicts(mdev, peer_req);
2060 if (err) {
2061 spin_unlock_irq(&mdev->tconn->req_lock);
2062 if (err == -ENOENT) {
b411b363 2063 put_ldev(mdev);
82bc0194 2064 return 0;
b411b363 2065 }
7be8da07 2066 goto out_interrupted;
b411b363 2067 }
7be8da07
AG
2068 } else
2069 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2070 list_add(&peer_req->w.list, &mdev->active_ee);
87eeee41 2071 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2072
89e58e75 2073 switch (mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
2074 case DRBD_PROT_C:
2075 inc_unacked(mdev);
2076 /* corresponding dec_unacked() in e_end_block()
2077 * respective _drbd_clear_done_ee */
2078 break;
2079 case DRBD_PROT_B:
2080 /* I really don't like it that the receiver thread
2081 * sends on the msock, but anyways */
db830c46 2082 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
b411b363
PR
2083 break;
2084 case DRBD_PROT_A:
2085 /* nothing to do */
2086 break;
2087 }
2088
6719fb03 2089 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 2090 /* In case we have the only disk of the cluster, */
db830c46
AG
2091 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2092 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2093 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
181286ad 2094 drbd_al_begin_io(mdev, &peer_req->i);
b411b363
PR
2095 }
2096
82bc0194
AG
2097 err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2098 if (!err)
2099 return 0;
b411b363 2100
10f6d992
LE
2101 /* don't care for the reason here */
2102 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2103 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
2104 list_del(&peer_req->w.list);
2105 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 2106 spin_unlock_irq(&mdev->tconn->req_lock);
db830c46 2107 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
181286ad 2108 drbd_al_complete_io(mdev, &peer_req->i);
22cc37a9 2109
b411b363 2110out_interrupted:
db830c46 2111 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
b411b363 2112 put_ldev(mdev);
db830c46 2113 drbd_free_ee(mdev, peer_req);
82bc0194 2114 return err;
b411b363
PR
2115}
2116
0f0601f4
LE
2117/* We may throttle resync, if the lower device seems to be busy,
2118 * and current sync rate is above c_min_rate.
2119 *
2120 * To decide whether or not the lower device is busy, we use a scheme similar
 2121  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 2122  * activity (more than 64 sectors) that we cannot account for with our own
 2123  * resync activity, it obviously is "busy".
2124 *
2125 * The current sync rate used here uses only the most recent two step marks,
2126 * to have a short time average so we can react faster.
2127 */
e3555d85 2128int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
2129{
2130 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2131 unsigned long db, dt, dbdt;
e3555d85 2132 struct lc_element *tmp;
0f0601f4
LE
2133 int curr_events;
2134 int throttle = 0;
2135
2136 /* feature disabled? */
f399002e 2137 if (mdev->ldev->dc.c_min_rate == 0)
0f0601f4
LE
2138 return 0;
2139
e3555d85
PR
2140 spin_lock_irq(&mdev->al_lock);
2141 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2142 if (tmp) {
2143 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2144 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2145 spin_unlock_irq(&mdev->al_lock);
2146 return 0;
2147 }
2148 /* Do not slow down if app IO is already waiting for this extent */
2149 }
2150 spin_unlock_irq(&mdev->al_lock);
2151
0f0601f4
LE
2152 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2153 (int)part_stat_read(&disk->part0, sectors[1]) -
2154 atomic_read(&mdev->rs_sect_ev);
e3555d85 2155
0f0601f4
LE
2156 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2157 unsigned long rs_left;
2158 int i;
2159
2160 mdev->rs_last_events = curr_events;
2161
2162 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2163 * approx. */
2649f080
LE
2164 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2165
2166 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2167 rs_left = mdev->ov_left;
2168 else
2169 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
2170
2171 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2172 if (!dt)
2173 dt++;
2174 db = mdev->rs_mark_left[i] - rs_left;
2175 dbdt = Bit2KB(db/dt);
2176
f399002e 2177 if (dbdt > mdev->ldev->dc.c_min_rate)
0f0601f4
LE
2178 throttle = 1;
2179 }
2180 return throttle;
2181}
2182
2183
4a76b161 2184static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 2185{
4a76b161 2186 struct drbd_conf *mdev;
b411b363 2187 sector_t sector;
4a76b161 2188 sector_t capacity;
db830c46 2189 struct drbd_peer_request *peer_req;
b411b363 2190 struct digest_info *di = NULL;
b18b37be 2191 int size, verb;
b411b363 2192 unsigned int fault_type;
e658983a 2193 struct p_block_req *p = pi->data;
4a76b161
AG
2194
2195 mdev = vnr_to_mdev(tconn, pi->vnr);
2196 if (!mdev)
2197 return -EIO;
2198 capacity = drbd_get_capacity(mdev->this_bdev);
b411b363
PR
2199
2200 sector = be64_to_cpu(p->sector);
2201 size = be32_to_cpu(p->blksize);
2202
c670a398 2203 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
2204 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2205 (unsigned long long)sector, size);
82bc0194 2206 return -EINVAL;
b411b363
PR
2207 }
2208 if (sector + (size>>9) > capacity) {
2209 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2210 (unsigned long long)sector, size);
82bc0194 2211 return -EINVAL;
b411b363
PR
2212 }
2213
2214 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be 2215 verb = 1;
e2857216 2216 switch (pi->cmd) {
b18b37be
PR
2217 case P_DATA_REQUEST:
2218 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2219 break;
2220 case P_RS_DATA_REQUEST:
2221 case P_CSUM_RS_REQUEST:
2222 case P_OV_REQUEST:
2223 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2224 break;
2225 case P_OV_REPLY:
2226 verb = 0;
2227 dec_rs_pending(mdev);
2228 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2229 break;
2230 default:
49ba9b1b 2231 BUG();
b18b37be
PR
2232 }
2233 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2234 dev_err(DEV, "Can not satisfy peer's read request, "
2235 "no local data.\n");
b18b37be 2236
a821cc4a 2238 	/* drain the possible payload */
e2857216 2238 return drbd_drain_block(mdev, pi->size);
b411b363
PR
2239 }
2240
2241 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2242 * "criss-cross" setup, that might cause write-out on some other DRBD,
2243 * which in turn might block on the other node at this very place. */
db830c46
AG
2244 peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2245 if (!peer_req) {
b411b363 2246 put_ldev(mdev);
82bc0194 2247 return -ENOMEM;
b411b363
PR
2248 }
2249
e2857216 2250 switch (pi->cmd) {
b411b363 2251 case P_DATA_REQUEST:
db830c46 2252 peer_req->w.cb = w_e_end_data_req;
b411b363 2253 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2254 /* application IO, don't drbd_rs_begin_io */
2255 goto submit;
2256
b411b363 2257 case P_RS_DATA_REQUEST:
db830c46 2258 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2259 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2260 /* used in the sector offset progress display */
2261 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2262 break;
2263
2264 case P_OV_REPLY:
2265 case P_CSUM_RS_REQUEST:
2266 fault_type = DRBD_FAULT_RS_RD;
e2857216 2267 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
b411b363
PR
2268 if (!di)
2269 goto out_free_e;
2270
e2857216 2271 di->digest_size = pi->size;
b411b363
PR
2272 di->digest = (((char *)di)+sizeof(struct digest_info));
2273
db830c46
AG
2274 peer_req->digest = di;
2275 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2276
e2857216 2277 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
b411b363
PR
2278 goto out_free_e;
2279
e2857216 2280 if (pi->cmd == P_CSUM_RS_REQUEST) {
31890f4a 2281 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
db830c46 2282 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2283 /* used in the sector offset progress display */
2284 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
e2857216 2285 } else if (pi->cmd == P_OV_REPLY) {
2649f080
LE
2286 /* track progress, we may need to throttle */
2287 atomic_add(size >> 9, &mdev->rs_sect_in);
db830c46 2288 peer_req->w.cb = w_e_end_ov_reply;
b411b363 2289 dec_rs_pending(mdev);
0f0601f4
LE
2290 /* drbd_rs_begin_io done when we sent this request,
2291 * but accounting still needs to be done. */
2292 goto submit_for_resync;
b411b363
PR
2293 }
2294 break;
2295
2296 case P_OV_REQUEST:
b411b363 2297 if (mdev->ov_start_sector == ~(sector_t)0 &&
31890f4a 2298 mdev->tconn->agreed_pro_version >= 90) {
de228bba
LE
2299 unsigned long now = jiffies;
2300 int i;
b411b363
PR
2301 mdev->ov_start_sector = sector;
2302 mdev->ov_position = sector;
30b743a2
LE
2303 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2304 mdev->rs_total = mdev->ov_left;
de228bba
LE
2305 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2306 mdev->rs_mark_left[i] = mdev->ov_left;
2307 mdev->rs_mark_time[i] = now;
2308 }
b411b363
PR
2309 dev_info(DEV, "Online Verify start sector: %llu\n",
2310 (unsigned long long)sector);
2311 }
db830c46 2312 peer_req->w.cb = w_e_end_ov_req;
b411b363 2313 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2314 break;
2315
b411b363 2316 default:
49ba9b1b 2317 BUG();
b411b363
PR
2318 }
2319
0f0601f4
LE
2320 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2321 * wrt the receiver, but it is not as straightforward as it may seem.
2322 * Various places in the resync start and stop logic assume resync
2323 * requests are processed in order, requeuing this on the worker thread
2324 * introduces a bunch of new code for synchronization between threads.
2325 *
2326 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2327 * "forever", throttling after drbd_rs_begin_io will lock that extent
2328 * for application writes for the same time. For now, just throttle
2329 * here, where the rest of the code expects the receiver to sleep for
2330 * a while, anyways.
2331 */
2332
2333 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2334 * this defers syncer requests for some time, before letting at least
 2335 	 * one request through. The resync controller on the receiving side
2336 * will adapt to the incoming rate accordingly.
2337 *
2338 * We cannot throttle here if remote is Primary/SyncTarget:
2339 * we would also throttle its application reads.
2340 * In that case, throttling is done on the SyncTarget only.
2341 */
e3555d85
PR
2342 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2343 schedule_timeout_uninterruptible(HZ/10);
2344 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2345 goto out_free_e;
b411b363 2346
0f0601f4
LE
2347submit_for_resync:
2348 atomic_add(size >> 9, &mdev->rs_sect_ev);
2349
80a40e43 2350submit:
b411b363 2351 inc_unacked(mdev);
87eeee41 2352 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2353 list_add_tail(&peer_req->w.list, &mdev->read_ee);
87eeee41 2354 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2355
fbe29dec 2356 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
82bc0194 2357 return 0;
b411b363 2358
10f6d992
LE
2359 /* don't care for the reason here */
2360 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2361 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2362 list_del(&peer_req->w.list);
87eeee41 2363 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9
LE
2364 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2365
b411b363 2366out_free_e:
b411b363 2367 put_ldev(mdev);
db830c46 2368 drbd_free_ee(mdev, peer_req);
82bc0194 2369 return -EIO;
b411b363
PR
2370}
2371
2372static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2373{
2374 int self, peer, rv = -100;
2375 unsigned long ch_self, ch_peer;
2376
2377 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2378 peer = mdev->p_uuid[UI_BITMAP] & 1;
2379
2380 ch_peer = mdev->p_uuid[UI_SIZE];
2381 ch_self = mdev->comm_bm_set;
2382
89e58e75 2383 switch (mdev->tconn->net_conf->after_sb_0p) {
b411b363
PR
2384 case ASB_CONSENSUS:
2385 case ASB_DISCARD_SECONDARY:
2386 case ASB_CALL_HELPER:
2387 dev_err(DEV, "Configuration error.\n");
2388 break;
2389 case ASB_DISCONNECT:
2390 break;
2391 case ASB_DISCARD_YOUNGER_PRI:
2392 if (self == 0 && peer == 1) {
2393 rv = -1;
2394 break;
2395 }
2396 if (self == 1 && peer == 0) {
2397 rv = 1;
2398 break;
2399 }
2400 /* Else fall through to one of the other strategies... */
2401 case ASB_DISCARD_OLDER_PRI:
2402 if (self == 0 && peer == 1) {
2403 rv = 1;
2404 break;
2405 }
2406 if (self == 1 && peer == 0) {
2407 rv = -1;
2408 break;
2409 }
2410 /* Else fall through to one of the other strategies... */
ad19bf6e 2411 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2412 "Using discard-least-changes instead\n");
2413 case ASB_DISCARD_ZERO_CHG:
2414 if (ch_peer == 0 && ch_self == 0) {
25703f83 2415 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2416 ? -1 : 1;
2417 break;
2418 } else {
2419 if (ch_peer == 0) { rv = 1; break; }
2420 if (ch_self == 0) { rv = -1; break; }
2421 }
89e58e75 2422 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2423 break;
2424 case ASB_DISCARD_LEAST_CHG:
2425 if (ch_self < ch_peer)
2426 rv = -1;
2427 else if (ch_self > ch_peer)
2428 rv = 1;
2429 else /* ( ch_self == ch_peer ) */
2430 /* Well, then use something else. */
25703f83 2431 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2432 ? -1 : 1;
2433 break;
2434 case ASB_DISCARD_LOCAL:
2435 rv = -1;
2436 break;
2437 case ASB_DISCARD_REMOTE:
2438 rv = 1;
2439 }
2440
2441 return rv;
2442}
2443
2444static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2445{
6184ea21 2446 int hg, rv = -100;
b411b363 2447
89e58e75 2448 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2449 case ASB_DISCARD_YOUNGER_PRI:
2450 case ASB_DISCARD_OLDER_PRI:
2451 case ASB_DISCARD_LEAST_CHG:
2452 case ASB_DISCARD_LOCAL:
2453 case ASB_DISCARD_REMOTE:
2454 dev_err(DEV, "Configuration error.\n");
2455 break;
2456 case ASB_DISCONNECT:
2457 break;
2458 case ASB_CONSENSUS:
2459 hg = drbd_asb_recover_0p(mdev);
2460 if (hg == -1 && mdev->state.role == R_SECONDARY)
2461 rv = hg;
2462 if (hg == 1 && mdev->state.role == R_PRIMARY)
2463 rv = hg;
2464 break;
2465 case ASB_VIOLENTLY:
2466 rv = drbd_asb_recover_0p(mdev);
2467 break;
2468 case ASB_DISCARD_SECONDARY:
2469 return mdev->state.role == R_PRIMARY ? 1 : -1;
2470 case ASB_CALL_HELPER:
2471 hg = drbd_asb_recover_0p(mdev);
2472 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2473 enum drbd_state_rv rv2;
2474
2475 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2476 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2477 * we might be here in C_WF_REPORT_PARAMS which is transient.
2478 * we do not need to wait for the after state change work either. */
bb437946
AG
2479 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2480 if (rv2 != SS_SUCCESS) {
b411b363
PR
2481 drbd_khelper(mdev, "pri-lost-after-sb");
2482 } else {
2483 dev_warn(DEV, "Successfully gave up primary role.\n");
2484 rv = hg;
2485 }
2486 } else
2487 rv = hg;
2488 }
2489
2490 return rv;
2491}
2492
2493static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2494{
6184ea21 2495 int hg, rv = -100;
b411b363 2496
89e58e75 2497 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2498 case ASB_DISCARD_YOUNGER_PRI:
2499 case ASB_DISCARD_OLDER_PRI:
2500 case ASB_DISCARD_LEAST_CHG:
2501 case ASB_DISCARD_LOCAL:
2502 case ASB_DISCARD_REMOTE:
2503 case ASB_CONSENSUS:
2504 case ASB_DISCARD_SECONDARY:
2505 dev_err(DEV, "Configuration error.\n");
2506 break;
2507 case ASB_VIOLENTLY:
2508 rv = drbd_asb_recover_0p(mdev);
2509 break;
2510 case ASB_DISCONNECT:
2511 break;
2512 case ASB_CALL_HELPER:
2513 hg = drbd_asb_recover_0p(mdev);
2514 if (hg == -1) {
bb437946
AG
2515 enum drbd_state_rv rv2;
2516
b411b363
PR
2517 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2518 * we might be here in C_WF_REPORT_PARAMS which is transient.
2519 * we do not need to wait for the after state change work either. */
bb437946
AG
2520 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2521 if (rv2 != SS_SUCCESS) {
b411b363
PR
2522 drbd_khelper(mdev, "pri-lost-after-sb");
2523 } else {
2524 dev_warn(DEV, "Successfully gave up primary role.\n");
2525 rv = hg;
2526 }
2527 } else
2528 rv = hg;
2529 }
2530
2531 return rv;
2532}
2533
2534static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2535 u64 bits, u64 flags)
2536{
2537 if (!uuid) {
2538 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2539 return;
2540 }
2541 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2542 text,
2543 (unsigned long long)uuid[UI_CURRENT],
2544 (unsigned long long)uuid[UI_BITMAP],
2545 (unsigned long long)uuid[UI_HISTORY_START],
2546 (unsigned long long)uuid[UI_HISTORY_END],
2547 (unsigned long long)bits,
2548 (unsigned long long)flags);
2549}
2550
2551/*
2552 100 after split brain try auto recover
2553 2 C_SYNC_SOURCE set BitMap
2554 1 C_SYNC_SOURCE use BitMap
2555 0 no Sync
2556 -1 C_SYNC_TARGET use BitMap
2557 -2 C_SYNC_TARGET set BitMap
2558 -100 after split brain, disconnect
 2559 -1000 unrelated data
4a23f264
PR
 2560 -1091 requires proto 91
 2561 -1096 requires proto 96
b411b363
PR
2562 */
2563static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2564{
2565 u64 self, peer;
2566 int i, j;
2567
2568 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2569 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2570
2571 *rule_nr = 10;
2572 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2573 return 0;
2574
2575 *rule_nr = 20;
2576 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2577 peer != UUID_JUST_CREATED)
2578 return -2;
2579
2580 *rule_nr = 30;
2581 if (self != UUID_JUST_CREATED &&
2582 (peer == UUID_JUST_CREATED || peer == (u64)0))
2583 return 2;
2584
2585 if (self == peer) {
2586 int rct, dc; /* roles at crash time */
2587
2588 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2589
31890f4a 2590 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2591 return -1091;
b411b363
PR
2592
2593 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2594 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2595 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2596 drbd_uuid_set_bm(mdev, 0UL);
2597
2598 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2599 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2600 *rule_nr = 34;
2601 } else {
2602 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2603 *rule_nr = 36;
2604 }
2605
2606 return 1;
2607 }
2608
2609 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2610
31890f4a 2611 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2612 return -1091;
b411b363
PR
2613
2614 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2615 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2616 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2617
2618 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2619 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2620 mdev->p_uuid[UI_BITMAP] = 0UL;
2621
2622 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2623 *rule_nr = 35;
2624 } else {
2625 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2626 *rule_nr = 37;
2627 }
2628
2629 return -1;
2630 }
2631
2632 /* Common power [off|failure] */
2633 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2634 (mdev->p_uuid[UI_FLAGS] & 2);
2635 /* lowest bit is set when we were primary,
2636 * next bit (weight 2) is set when peer was primary */
2637 *rule_nr = 40;
2638
2639 switch (rct) {
2640 case 0: /* !self_pri && !peer_pri */ return 0;
2641 case 1: /* self_pri && !peer_pri */ return 1;
2642 case 2: /* !self_pri && peer_pri */ return -1;
2643 case 3: /* self_pri && peer_pri */
25703f83 2644 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363
PR
2645 return dc ? -1 : 1;
2646 }
2647 }
2648
2649 *rule_nr = 50;
2650 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2651 if (self == peer)
2652 return -1;
2653
2654 *rule_nr = 51;
2655 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2656 if (self == peer) {
31890f4a 2657 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2658 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2659 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2660 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2661 		/* The last P_SYNC_UUID did not get through. Undo the modifications
 2662 		   the peer made to its UUIDs when it last started a resync as sync source. */
2663
31890f4a 2664 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2665 return -1091;
b411b363
PR
2666
2667 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2668 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264
PR
2669
 2670 		dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2671 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2672
b411b363
PR
2673 return -1;
2674 }
2675 }
2676
2677 *rule_nr = 60;
2678 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2679 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2680 peer = mdev->p_uuid[i] & ~((u64)1);
2681 if (self == peer)
2682 return -2;
2683 }
2684
2685 *rule_nr = 70;
2686 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2687 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2688 if (self == peer)
2689 return 1;
2690
2691 *rule_nr = 71;
2692 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2693 if (self == peer) {
31890f4a 2694 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2695 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2696 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2697 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2698 		/* The last P_SYNC_UUID did not get through. Undo the modifications
 2699 		   we made to our own UUIDs when we last started a resync as sync source. */
2700
31890f4a 2701 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2702 return -1091;
b411b363
PR
2703
2704 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2705 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2706
4a23f264 2707 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2708 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2709 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2710
2711 return 1;
2712 }
2713 }
2714
2715
2716 *rule_nr = 80;
d8c2a36b 2717 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2718 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2719 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2720 if (self == peer)
2721 return 2;
2722 }
2723
2724 *rule_nr = 90;
2725 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2726 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2727 if (self == peer && self != ((u64)0))
2728 return 100;
2729
2730 *rule_nr = 100;
2731 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2732 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2733 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2734 peer = mdev->p_uuid[j] & ~((u64)1);
2735 if (self == peer)
2736 return -100;
2737 }
2738 }
2739
2740 return -1000;
2741}
2742
2743/* drbd_sync_handshake() returns the new conn state on success, or
2744 CONN_MASK (-1) on failure.
2745 */
2746static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2747 enum drbd_disk_state peer_disk) __must_hold(local)
2748{
2749 int hg, rule_nr;
2750 enum drbd_conns rv = C_MASK;
2751 enum drbd_disk_state mydisk;
2752
2753 mydisk = mdev->state.disk;
2754 if (mydisk == D_NEGOTIATING)
2755 mydisk = mdev->new_state_tmp.disk;
2756
2757 dev_info(DEV, "drbd_sync_handshake:\n");
2758 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2759 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2760 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2761
2762 hg = drbd_uuid_compare(mdev, &rule_nr);
2763
2764 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2765
2766 if (hg == -1000) {
2767 dev_alert(DEV, "Unrelated data, aborting!\n");
2768 return C_MASK;
2769 }
4a23f264
PR
2770 if (hg < -1000) {
2771 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2772 return C_MASK;
2773 }
2774
2775 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2776 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2777 int f = (hg == -100) || abs(hg) == 2;
2778 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2779 if (f)
2780 hg = hg*2;
2781 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2782 hg > 0 ? "source" : "target");
2783 }
2784
3a11a487
AG
2785 if (abs(hg) == 100)
2786 drbd_khelper(mdev, "initial-split-brain");
2787
89e58e75 2788 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
b411b363
PR
2789 int pcount = (mdev->state.role == R_PRIMARY)
2790 + (peer_role == R_PRIMARY);
2791 int forced = (hg == -100);
2792
2793 switch (pcount) {
2794 case 0:
2795 hg = drbd_asb_recover_0p(mdev);
2796 break;
2797 case 1:
2798 hg = drbd_asb_recover_1p(mdev);
2799 break;
2800 case 2:
2801 hg = drbd_asb_recover_2p(mdev);
2802 break;
2803 }
2804 if (abs(hg) < 100) {
2805 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2806 "automatically solved. Sync from %s node\n",
2807 pcount, (hg < 0) ? "peer" : "this");
2808 if (forced) {
2809 dev_warn(DEV, "Doing a full sync, since"
 2810 				 " UUIDs were ambiguous.\n");
2811 hg = hg*2;
2812 }
2813 }
2814 }
2815
2816 if (hg == -100) {
89e58e75 2817 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
b411b363 2818 hg = -1;
89e58e75 2819 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
b411b363
PR
2820 hg = 1;
2821
2822 if (abs(hg) < 100)
2823 dev_warn(DEV, "Split-Brain detected, manually solved. "
2824 "Sync from %s node\n",
2825 (hg < 0) ? "peer" : "this");
2826 }
2827
2828 if (hg == -100) {
580b9767
LE
2829 /* FIXME this log message is not correct if we end up here
2830 * after an attempted attach on a diskless node.
2831 * We just refuse to attach -- well, we drop the "connection"
2832 * to that disk, in a way... */
3a11a487 2833 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2834 drbd_khelper(mdev, "split-brain");
2835 return C_MASK;
2836 }
2837
2838 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2839 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2840 return C_MASK;
2841 }
2842
2843 if (hg < 0 && /* by intention we do not use mydisk here. */
2844 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
89e58e75 2845 switch (mdev->tconn->net_conf->rr_conflict) {
b411b363
PR
2846 case ASB_CALL_HELPER:
2847 drbd_khelper(mdev, "pri-lost");
2848 /* fall through */
2849 case ASB_DISCONNECT:
2850 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2851 return C_MASK;
2852 case ASB_VIOLENTLY:
2853 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
 2854 			     " assumption\n");
2855 }
2856 }
2857
8169e41b 2858 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
cf14c2e9
PR
2859 if (hg == 0)
2860 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2861 else
2862 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2863 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2864 abs(hg) >= 2 ? "full" : "bit-map based");
2865 return C_MASK;
2866 }
2867
b411b363
PR
2868 if (abs(hg) >= 2) {
2869 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2870 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2871 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2872 return C_MASK;
2873 }
2874
2875 if (hg > 0) { /* become sync source. */
2876 rv = C_WF_BITMAP_S;
2877 } else if (hg < 0) { /* become sync target */
2878 rv = C_WF_BITMAP_T;
2879 } else {
2880 rv = C_CONNECTED;
2881 if (drbd_bm_total_weight(mdev)) {
2882 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2883 drbd_bm_total_weight(mdev));
2884 }
2885 }
2886
2887 return rv;
2888}
2889
2890/* returns 1 if invalid */
2891static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2892{
2893 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2894 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2895 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2896 return 0;
2897
2898 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2899 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2900 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2901 return 1;
2902
2903 /* everything else is valid if they are equal on both sides. */
2904 if (peer == self)
2905 return 0;
2906
 2907 	/* everything else is invalid. */
2908 return 1;
2909}
2910
e2857216 2911static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 2912{
e658983a 2913 struct p_protocol *p = pi->data;
b411b363 2914 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2915 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2916 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2917
b411b363
PR
2918 p_proto = be32_to_cpu(p->protocol);
2919 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2920 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2921 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2922 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2923 cf = be32_to_cpu(p->conn_flags);
2924 p_want_lose = cf & CF_WANT_LOSE;
2925
7204624c 2926 clear_bit(CONN_DRY_RUN, &tconn->flags);
cf14c2e9
PR
2927
2928 if (cf & CF_DRY_RUN)
7204624c 2929 set_bit(CONN_DRY_RUN, &tconn->flags);
b411b363 2930
7204624c
PR
2931 if (p_proto != tconn->net_conf->wire_protocol) {
2932 conn_err(tconn, "incompatible communication protocols\n");
b411b363
PR
2933 goto disconnect;
2934 }
2935
7204624c
PR
2936 if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2937 conn_err(tconn, "incompatible after-sb-0pri settings\n");
b411b363
PR
2938 goto disconnect;
2939 }
2940
7204624c
PR
2941 if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2942 conn_err(tconn, "incompatible after-sb-1pri settings\n");
b411b363
PR
2943 goto disconnect;
2944 }
2945
7204624c
PR
2946 if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2947 conn_err(tconn, "incompatible after-sb-2pri settings\n");
b411b363
PR
2948 goto disconnect;
2949 }
2950
7204624c
PR
2951 if (p_want_lose && tconn->net_conf->want_lose) {
2952 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
b411b363
PR
2953 goto disconnect;
2954 }
2955
7204624c
PR
2956 if (p_two_primaries != tconn->net_conf->two_primaries) {
2957 conn_err(tconn, "incompatible setting of the two-primaries options\n");
b411b363
PR
2958 goto disconnect;
2959 }
2960
7204624c
PR
2961 if (tconn->agreed_pro_version >= 87) {
2962 unsigned char *my_alg = tconn->net_conf->integrity_alg;
82bc0194 2963 int err;
b411b363 2964
e2857216 2965 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
82bc0194
AG
2966 if (err)
2967 return err;
b411b363
PR
2968
2969 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2970 if (strcmp(p_integrity_alg, my_alg)) {
7204624c 2971 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
b411b363
PR
2972 goto disconnect;
2973 }
7204624c 2974 conn_info(tconn, "data-integrity-alg: %s\n",
b411b363
PR
2975 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2976 }
2977
82bc0194 2978 return 0;
b411b363
PR
2979
2980disconnect:
7204624c 2981 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 2982 return -EIO;
b411b363
PR
2983}
2984
2985/* helper function
2986 * input: alg name, feature name
2987 * return: NULL (alg name was "")
2988 * ERR_PTR(error) if something goes wrong
2989 * or the crypto hash ptr, if it worked out ok. */
2990struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2991 const char *alg, const char *name)
2992{
2993 struct crypto_hash *tfm;
2994
2995 if (!alg[0])
2996 return NULL;
2997
2998 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2999 if (IS_ERR(tfm)) {
3000 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3001 alg, name, PTR_ERR(tfm));
3002 return tfm;
3003 }
3004 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
3005 crypto_free_hash(tfm);
3006 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
3007 return ERR_PTR(-EINVAL);
3008 }
3009 return tfm;
3010}
3011
4a76b161
AG
3012static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3013{
3014 void *buffer = tconn->data.rbuf;
3015 int size = pi->size;
3016
3017 while (size) {
3018 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3019 s = drbd_recv(tconn, buffer, s);
3020 if (s <= 0) {
3021 if (s < 0)
3022 return s;
3023 break;
3024 }
3025 size -= s;
3026 }
3027 if (size)
3028 return -EIO;
3029 return 0;
3030}
3031
3032/*
3033 * config_unknown_volume - device configuration command for unknown volume
3034 *
3035 * When a device is added to an existing connection, the node on which the
3036 * device is added first will send configuration commands to its peer but the
3037 * peer will not know about the device yet. It will warn and ignore these
3038 * commands. Once the device is added on the second node, the second node will
3039 * send the same device configuration commands, but in the other direction.
3040 *
3041 * (We can also end up here if drbd is misconfigured.)
3042 */
3043static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3044{
3045 conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3046 pi->vnr, cmdname(pi->cmd));
3047 return ignore_remaining_packet(tconn, pi);
3048}
3049
3050static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3051{
4a76b161 3052 struct drbd_conf *mdev;
e658983a 3053 struct p_rs_param_95 *p;
b411b363
PR
3054 unsigned int header_size, data_size, exp_max_sz;
3055 struct crypto_hash *verify_tfm = NULL;
3056 struct crypto_hash *csums_tfm = NULL;
4a76b161 3057 const int apv = tconn->agreed_pro_version;
778f271d
PR
3058 int *rs_plan_s = NULL;
3059 int fifo_size = 0;
82bc0194 3060 int err;
b411b363 3061
4a76b161
AG
3062 mdev = vnr_to_mdev(tconn, pi->vnr);
3063 if (!mdev)
3064 return config_unknown_volume(tconn, pi);
3065
b411b363
PR
3066 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3067 : apv == 88 ? sizeof(struct p_rs_param)
3068 + SHARED_SECRET_MAX
8e26f9cc
PR
3069 : apv <= 94 ? sizeof(struct p_rs_param_89)
3070 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 3071
e2857216 3072 if (pi->size > exp_max_sz) {
b411b363 3073 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
e2857216 3074 pi->size, exp_max_sz);
82bc0194 3075 return -EIO;
b411b363
PR
3076 }
3077
3078 if (apv <= 88) {
e658983a 3079 header_size = sizeof(struct p_rs_param);
e2857216 3080 data_size = pi->size - header_size;
8e26f9cc 3081 } else if (apv <= 94) {
e658983a 3082 header_size = sizeof(struct p_rs_param_89);
e2857216 3083 data_size = pi->size - header_size;
b411b363 3084 D_ASSERT(data_size == 0);
8e26f9cc 3085 } else {
e658983a 3086 header_size = sizeof(struct p_rs_param_95);
e2857216 3087 data_size = pi->size - header_size;
b411b363
PR
3088 D_ASSERT(data_size == 0);
3089 }
3090
3091 /* initialize verify_alg and csums_alg */
e658983a 3092 p = pi->data;
b411b363
PR
3093 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3094
e658983a 3095 err = drbd_recv_all(mdev->tconn, p, header_size);
82bc0194
AG
3096 if (err)
3097 return err;
b411b363 3098
f399002e
LE
3099 if (get_ldev(mdev)) {
3100 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3101 put_ldev(mdev);
3102 }
b411b363
PR
3103
3104 if (apv >= 88) {
3105 if (apv == 88) {
3106 if (data_size > SHARED_SECRET_MAX) {
3107 dev_err(DEV, "verify-alg too long, "
3108 "peer wants %u, accepting only %u byte\n",
3109 data_size, SHARED_SECRET_MAX);
82bc0194 3110 return -EIO;
b411b363
PR
3111 }
3112
82bc0194
AG
3113 err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3114 if (err)
3115 return err;
b411b363
PR
3116
3117 /* we expect NUL terminated string */
3118 /* but just in case someone tries to be evil */
3119 D_ASSERT(p->verify_alg[data_size-1] == 0);
3120 p->verify_alg[data_size-1] = 0;
3121
3122 } else /* apv >= 89 */ {
3123 /* we still expect NUL terminated strings */
3124 /* but just in case someone tries to be evil */
3125 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3126 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3127 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3128 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3129 }
3130
f399002e 3131 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
b411b363
PR
3132 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3133 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
f399002e 3134 mdev->tconn->net_conf->verify_alg, p->verify_alg);
b411b363
PR
3135 goto disconnect;
3136 }
3137 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3138 p->verify_alg, "verify-alg");
3139 if (IS_ERR(verify_tfm)) {
3140 verify_tfm = NULL;
3141 goto disconnect;
3142 }
3143 }
3144
f399002e 3145 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
b411b363
PR
3146 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3147 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
f399002e 3148 mdev->tconn->net_conf->csums_alg, p->csums_alg);
b411b363
PR
3149 goto disconnect;
3150 }
3151 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3152 p->csums_alg, "csums-alg");
3153 if (IS_ERR(csums_tfm)) {
3154 csums_tfm = NULL;
3155 goto disconnect;
3156 }
3157 }
3158
f399002e
LE
3159 if (apv > 94 && get_ldev(mdev)) {
3160 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3161 mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3162 mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3163 mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3164 mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d 3165
f399002e 3166 fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
778f271d
PR
3167 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3168 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3169 if (!rs_plan_s) {
3170 dev_err(DEV, "kmalloc of fifo_buffer failed");
f399002e 3171 put_ldev(mdev);
778f271d
PR
3172 goto disconnect;
3173 }
3174 }
f399002e 3175 put_ldev(mdev);
8e26f9cc 3176 }
b411b363
PR
3177
3178 spin_lock(&mdev->peer_seq_lock);
3179 /* lock against drbd_nl_syncer_conf() */
3180 if (verify_tfm) {
f399002e
LE
3181 strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3182 mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3183 crypto_free_hash(mdev->tconn->verify_tfm);
3184 mdev->tconn->verify_tfm = verify_tfm;
b411b363
PR
3185 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3186 }
3187 if (csums_tfm) {
f399002e
LE
3188 strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3189 mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3190 crypto_free_hash(mdev->tconn->csums_tfm);
3191 mdev->tconn->csums_tfm = csums_tfm;
b411b363
PR
3192 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3193 }
778f271d
PR
3194 if (fifo_size != mdev->rs_plan_s.size) {
3195 kfree(mdev->rs_plan_s.values);
3196 mdev->rs_plan_s.values = rs_plan_s;
3197 mdev->rs_plan_s.size = fifo_size;
3198 mdev->rs_planed = 0;
3199 }
b411b363
PR
3200 spin_unlock(&mdev->peer_seq_lock);
3201 }
82bc0194 3202 return 0;
b411b363 3203
b411b363
PR
3204disconnect:
3205 /* just for completeness: actually not needed,
3206 * as this is not reached if csums_tfm was ok. */
3207 crypto_free_hash(csums_tfm);
3208 /* but free the verify_tfm again, if csums_tfm did not work out */
3209 crypto_free_hash(verify_tfm);
38fa9988 3210 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3211 return -EIO;
b411b363
PR
3212}
3213
b411b363
PR
3214/* warn if the arguments differ by more than 12.5% */
3215static void warn_if_differ_considerably(struct drbd_conf *mdev,
3216 const char *s, sector_t a, sector_t b)
3217{
3218 sector_t d;
3219 if (a == 0 || b == 0)
3220 return;
3221 d = (a > b) ? (a - b) : (b - a);
3222 if (d > (a>>3) || d > (b>>3))
3223 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3224 (unsigned long long)a, (unsigned long long)b);
3225}
3226
4a76b161 3227static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3228{
4a76b161 3229 struct drbd_conf *mdev;
e658983a 3230 struct p_sizes *p = pi->data;
b411b363 3231 enum determine_dev_size dd = unchanged;
b411b363
PR
3232 sector_t p_size, p_usize, my_usize;
3233 int ldsc = 0; /* local disk size changed */
e89b591c 3234 enum dds_flags ddsf;
b411b363 3235
4a76b161
AG
3236 mdev = vnr_to_mdev(tconn, pi->vnr);
3237 if (!mdev)
3238 return config_unknown_volume(tconn, pi);
3239
b411b363
PR
3240 p_size = be64_to_cpu(p->d_size);
3241 p_usize = be64_to_cpu(p->u_size);
3242
b411b363
PR
3243 /* just store the peer's disk size for now.
3244 * we still need to figure out whether we accept that. */
3245 mdev->p_size = p_size;
3246
b411b363
PR
3247 if (get_ldev(mdev)) {
3248 warn_if_differ_considerably(mdev, "lower level device sizes",
3249 p_size, drbd_get_max_capacity(mdev->ldev));
3250 warn_if_differ_considerably(mdev, "user requested size",
3251 p_usize, mdev->ldev->dc.disk_size);
3252
3253 /* if this is the first connect, or an otherwise expected
3254 * param exchange, choose the minimum */
3255 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3256 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3257 p_usize);
3258
3259 my_usize = mdev->ldev->dc.disk_size;
3260
3261 if (mdev->ldev->dc.disk_size != p_usize) {
3262 mdev->ldev->dc.disk_size = p_usize;
3263 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3264 (unsigned long)mdev->ldev->dc.disk_size);
3265 }
3266
3267 /* Never shrink a device with usable data during connect.
3268 But allow online shrinking if we are connected. */
a393db6f 3269 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
3270 drbd_get_capacity(mdev->this_bdev) &&
3271 mdev->state.disk >= D_OUTDATED &&
3272 mdev->state.conn < C_CONNECTED) {
3273 dev_err(DEV, "The peer's disk size is too small!\n");
38fa9988 3274 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
3275 mdev->ldev->dc.disk_size = my_usize;
3276 put_ldev(mdev);
82bc0194 3277 return -EIO;
b411b363
PR
3278 }
3279 put_ldev(mdev);
3280 }
b411b363 3281
e89b591c 3282 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3283 if (get_ldev(mdev)) {
24c4830c 3284 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3285 put_ldev(mdev);
3286 if (dd == dev_size_error)
82bc0194 3287 return -EIO;
b411b363
PR
3288 drbd_md_sync(mdev);
3289 } else {
3290 /* I am diskless, need to accept the peer's size. */
3291 drbd_set_my_capacity(mdev, p_size);
3292 }
3293
99432fcc
PR
3294 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3295 drbd_reconsider_max_bio_size(mdev);
3296
b411b363
PR
3297 if (get_ldev(mdev)) {
3298 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3299 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3300 ldsc = 1;
3301 }
3302
b411b363
PR
3303 put_ldev(mdev);
3304 }
3305
3306 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3307 if (be64_to_cpu(p->c_size) !=
3308 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3309 /* we have different sizes, probably peer
3310 * needs to know my new size... */
e89b591c 3311 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3312 }
3313 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3314 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3315 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3316 mdev->state.disk >= D_INCONSISTENT) {
3317 if (ddsf & DDSF_NO_RESYNC)
3318 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3319 else
3320 resync_after_online_grow(mdev);
3321 } else
b411b363
PR
3322 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3323 }
3324 }
3325
82bc0194 3326 return 0;
b411b363
PR
3327}
3328
4a76b161 3329static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3330{
4a76b161 3331 struct drbd_conf *mdev;
e658983a 3332 struct p_uuids *p = pi->data;
b411b363 3333 u64 *p_uuid;
62b0da3a 3334 int i, updated_uuids = 0;
b411b363 3335
4a76b161
AG
3336 mdev = vnr_to_mdev(tconn, pi->vnr);
3337 if (!mdev)
3338 return config_unknown_volume(tconn, pi);
3339
b411b363
PR
3340 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3341
3342 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3343 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3344
3345 kfree(mdev->p_uuid);
3346 mdev->p_uuid = p_uuid;
3347
3348 if (mdev->state.conn < C_CONNECTED &&
3349 mdev->state.disk < D_INCONSISTENT &&
3350 mdev->state.role == R_PRIMARY &&
3351 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3352 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3353 (unsigned long long)mdev->ed_uuid);
38fa9988 3354 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3355 return -EIO;
b411b363
PR
3356 }
3357
3358 if (get_ldev(mdev)) {
3359 int skip_initial_sync =
3360 mdev->state.conn == C_CONNECTED &&
31890f4a 3361 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3362 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3363 (p_uuid[UI_FLAGS] & 8);
3364 if (skip_initial_sync) {
3365 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3366 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3367 "clear_n_write from receive_uuids",
3368 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3369 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3370 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3371 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3372 CS_VERBOSE, NULL);
3373 drbd_md_sync(mdev);
62b0da3a 3374 updated_uuids = 1;
b411b363
PR
3375 }
3376 put_ldev(mdev);
18a50fa2
PR
3377 } else if (mdev->state.disk < D_INCONSISTENT &&
3378 mdev->state.role == R_PRIMARY) {
3379 /* I am a diskless primary, the peer just created a new current UUID
3380 for me. */
62b0da3a 3381 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3382 }
3383
 3385	/* Before we test for the disk state, we should wait until a possibly
 3386	   ongoing cluster wide state change has finished. That is important if
 3387	   we are primary and are detaching from our disk. We need to see the
 3388	   new disk state... */
8410da8f
PR
3388 mutex_lock(mdev->state_mutex);
3389 mutex_unlock(mdev->state_mutex);
b411b363 3390 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3391 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3392
3393 if (updated_uuids)
3394 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3395
82bc0194 3396 return 0;
b411b363
PR
3397}
3398
3399/**
3400 * convert_state() - Converts the peer's view of the cluster state to our point of view
3401 * @ps: The state as seen by the peer.
3402 */
3403static union drbd_state convert_state(union drbd_state ps)
3404{
3405 union drbd_state ms;
3406
3407 static enum drbd_conns c_tab[] = {
3408 [C_CONNECTED] = C_CONNECTED,
3409
3410 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3411 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3412 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3413 [C_VERIFY_S] = C_VERIFY_T,
3414 [C_MASK] = C_MASK,
3415 };
3416
3417 ms.i = ps.i;
3418
3419 ms.conn = c_tab[ps.conn];
3420 ms.peer = ps.role;
3421 ms.role = ps.peer;
3422 ms.pdsk = ps.disk;
3423 ms.disk = ps.pdsk;
3424 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3425
3426 return ms;
3427}
3428
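/* receive_req_state() - the peer requests a state change for one volume.
 * The mask/val pair is translated to our point of view with convert_state(),
 * applied via drbd_change_state(), and the result is returned with
 * drbd_send_sr_reply(). A concurrent local state change is rejected with
 * SS_CONCURRENT_ST_CHG when we are the discarding side. */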
4a76b161 3429static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3430{
4a76b161 3431 struct drbd_conf *mdev;
e658983a 3432 struct p_req_state *p = pi->data;
b411b363 3433 union drbd_state mask, val;
bf885f8a 3434 enum drbd_state_rv rv;
b411b363 3435
4a76b161
AG
3436 mdev = vnr_to_mdev(tconn, pi->vnr);
3437 if (!mdev)
3438 return -EIO;
3439
b411b363
PR
3440 mask.i = be32_to_cpu(p->mask);
3441 val.i = be32_to_cpu(p->val);
3442
25703f83 3443 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
8410da8f 3444 mutex_is_locked(mdev->state_mutex)) {
b411b363 3445 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
82bc0194 3446 return 0;
b411b363
PR
3447 }
3448
3449 mask = convert_state(mask);
3450 val = convert_state(val);
3451
dfafcc8a
PR
3452 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3453 drbd_send_sr_reply(mdev, rv);
b411b363 3454
b411b363
PR
3455 drbd_md_sync(mdev);
3456
82bc0194 3457 return 0;
b411b363
PR
3458}
3459
e2857216 3460static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
dfafcc8a 3461{
e658983a 3462 struct p_req_state *p = pi->data;
dfafcc8a
PR
3463 union drbd_state mask, val;
3464 enum drbd_state_rv rv;
3465
3466 mask.i = be32_to_cpu(p->mask);
3467 val.i = be32_to_cpu(p->val);
3468
3469 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3470 mutex_is_locked(&tconn->cstate_mutex)) {
3471 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
82bc0194 3472 return 0;
dfafcc8a
PR
3473 }
3474
3475 mask = convert_state(mask);
3476 val = convert_state(val);
3477
778bcf2e 3478 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
dfafcc8a
PR
3479 conn_send_sr_reply(tconn, rv);
3480
82bc0194 3481 return 0;
dfafcc8a
PR
3482}
3483
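/* receive_state() - process a P_STATE packet announcing the peer's state.
 * Works out the disk state the peer really has (D_NEGOTIATING is mapped via
 * the received UUID flags), decides whether a resync handshake is needed via
 * drbd_sync_handshake(), and folds the peer's role/disk/susp bits into our
 * own state under req_lock, retrying if the local state changed meanwhile. */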
4a76b161 3484static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3485{
4a76b161 3486 struct drbd_conf *mdev;
e658983a 3487 struct p_state *p = pi->data;
4ac4aada 3488 union drbd_state os, ns, peer_state;
b411b363 3489 enum drbd_disk_state real_peer_disk;
65d922c3 3490 enum chg_state_flags cs_flags;
b411b363
PR
3491 int rv;
3492
4a76b161
AG
3493 mdev = vnr_to_mdev(tconn, pi->vnr);
3494 if (!mdev)
3495 return config_unknown_volume(tconn, pi);
3496
b411b363
PR
3497 peer_state.i = be32_to_cpu(p->state);
3498
3499 real_peer_disk = peer_state.disk;
3500 if (peer_state.disk == D_NEGOTIATING) {
3501 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3502 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3503 }
3504
87eeee41 3505 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 3506 retry:
78bae59b 3507 os = ns = drbd_read_state(mdev);
87eeee41 3508 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 3509
e9ef7bb6
LE
3510 /* peer says his disk is uptodate, while we think it is inconsistent,
3511 * and this happens while we think we have a sync going on. */
3512 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3513 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3514 /* If we are (becoming) SyncSource, but peer is still in sync
3515 * preparation, ignore its uptodate-ness to avoid flapping, it
3516 * will change to inconsistent once the peer reaches active
3517 * syncing states.
3518 * It may have changed syncer-paused flags, however, so we
3519 * cannot ignore this completely. */
3520 if (peer_state.conn > C_CONNECTED &&
3521 peer_state.conn < C_SYNC_SOURCE)
3522 real_peer_disk = D_INCONSISTENT;
3523
3524 /* if peer_state changes to connected at the same time,
3525 * it explicitly notifies us that it finished resync.
3526 * Maybe we should finish it up, too? */
3527 else if (os.conn >= C_SYNC_SOURCE &&
3528 peer_state.conn == C_CONNECTED) {
3529 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3530 drbd_resync_finished(mdev);
82bc0194 3531 return 0;
e9ef7bb6
LE
3532 }
3533 }
3534
3535 /* peer says his disk is inconsistent, while we think it is uptodate,
3536 * and this happens while the peer still thinks we have a sync going on,
3537 * but we think we are already done with the sync.
3538 * We ignore this to avoid flapping pdsk.
3539 * This should not happen, if the peer is a recent version of drbd. */
3540 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3541 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3542 real_peer_disk = D_UP_TO_DATE;
3543
4ac4aada
LE
3544 if (ns.conn == C_WF_REPORT_PARAMS)
3545 ns.conn = C_CONNECTED;
b411b363 3546
67531718
PR
3547 if (peer_state.conn == C_AHEAD)
3548 ns.conn = C_BEHIND;
3549
b411b363
PR
3550 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3551 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3552 int cr; /* consider resync */
3553
3554 /* if we established a new connection */
4ac4aada 3555 cr = (os.conn < C_CONNECTED);
b411b363
PR
3556 /* if we had an established connection
3557 * and one of the nodes newly attaches a disk */
4ac4aada 3558 cr |= (os.conn == C_CONNECTED &&
b411b363 3559 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3560 os.disk == D_NEGOTIATING));
b411b363
PR
3561 /* if we have both been inconsistent, and the peer has been
3562 * forced to be UpToDate with --overwrite-data */
3563 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3564 /* if we had been plain connected, and the admin requested to
3565 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3566 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3567 (peer_state.conn >= C_STARTING_SYNC_S &&
3568 peer_state.conn <= C_WF_BITMAP_T));
3569
3570 if (cr)
4ac4aada 3571 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3572
3573 put_ldev(mdev);
4ac4aada
LE
3574 if (ns.conn == C_MASK) {
3575 ns.conn = C_CONNECTED;
b411b363 3576 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3577 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3578 } else if (peer_state.disk == D_NEGOTIATING) {
3579 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3580 peer_state.disk = D_DISKLESS;
580b9767 3581 real_peer_disk = D_DISKLESS;
b411b363 3582 } else {
8169e41b 3583 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
82bc0194 3584 return -EIO;
4ac4aada 3585 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
38fa9988 3586 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3587 return -EIO;
b411b363
PR
3588 }
3589 }
3590 }
3591
87eeee41 3592 spin_lock_irq(&mdev->tconn->req_lock);
78bae59b 3593 if (os.i != drbd_read_state(mdev).i)
b411b363
PR
3594 goto retry;
3595 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3596 ns.peer = peer_state.role;
3597 ns.pdsk = real_peer_disk;
3598 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3599 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3600 ns.disk = mdev->new_state_tmp.disk;
4ac4aada 3601 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
2aebfabb 3602 if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50 3603 test_bit(NEW_CUR_UUID, &mdev->flags)) {
8554df1c 3604 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3605	   for temporary network outages! */
87eeee41 3606 spin_unlock_irq(&mdev->tconn->req_lock);
481c6f50 3607 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
2f5cdd0b 3608 tl_clear(mdev->tconn);
481c6f50
PR
3609 drbd_uuid_new_current(mdev);
3610 clear_bit(NEW_CUR_UUID, &mdev->flags);
38fa9988 3611 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
82bc0194 3612 return -EIO;
481c6f50 3613 }
65d922c3 3614 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
78bae59b 3615 ns = drbd_read_state(mdev);
87eeee41 3616 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3617
3618 if (rv < SS_SUCCESS) {
38fa9988 3619 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3620 return -EIO;
b411b363
PR
3621 }
3622
4ac4aada
LE
3623 if (os.conn > C_WF_REPORT_PARAMS) {
3624 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3625 peer_state.disk != D_NEGOTIATING ) {
3626 /* we want resync, peer has not yet decided to sync... */
3627 /* Nowadays only used when forcing a node into primary role and
3628 setting its disk to UpToDate with that */
3629 drbd_send_uuids(mdev);
3630 drbd_send_state(mdev);
3631 }
3632 }
3633
89e58e75 3634 mdev->tconn->net_conf->want_lose = 0;
b411b363
PR
3635
3636 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3637
82bc0194 3638 return 0;
b411b363
PR
3639}
3640
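/* receive_sync_uuid() - the peer (SyncSource) sent the UUID to resync against.
 * Wait until we are in a state that may start a resync, set UI_CURRENT to the
 * received UUID without rotating it into the history, and start the resync as
 * SyncTarget. */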
4a76b161 3641static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3642{
4a76b161 3643 struct drbd_conf *mdev;
e658983a 3644 struct p_rs_uuid *p = pi->data;
4a76b161
AG
3645
3646 mdev = vnr_to_mdev(tconn, pi->vnr);
3647 if (!mdev)
3648 return -EIO;
b411b363
PR
3649
3650 wait_event(mdev->misc_wait,
3651 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3652 mdev->state.conn == C_BEHIND ||
b411b363
PR
3653 mdev->state.conn < C_CONNECTED ||
3654 mdev->state.disk < D_NEGOTIATING);
3655
3656 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3657
b411b363
PR
3658 /* Here the _drbd_uuid_ functions are right, current should
3659 _not_ be rotated into the history */
3660 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3661 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3662 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3663
62b0da3a 3664 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3665 drbd_start_resync(mdev, C_SYNC_TARGET);
3666
3667 put_ldev(mdev);
3668 } else
3669 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3670
82bc0194 3671 return 0;
b411b363
PR
3672}
3673
2c46407d
AG
3674/**
3675 * receive_bitmap_plain
3676 *
3677 * Return 0 when done, 1 when another iteration is needed, and a negative error
3678 * code upon failure.
3679 */
3680static int
50d0b1ad 3681receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
e658983a 3682 unsigned long *p, struct bm_xfer_ctx *c)
b411b363 3683{
50d0b1ad
AG
3684 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3685 drbd_header_size(mdev->tconn);
e658983a 3686 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
50d0b1ad 3687 c->bm_words - c->word_offset);
e658983a 3688 unsigned int want = num_words * sizeof(*p);
2c46407d 3689 int err;
b411b363 3690
50d0b1ad
AG
3691 if (want != size) {
3692 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
2c46407d 3693 return -EIO;
b411b363
PR
3694 }
3695 if (want == 0)
2c46407d 3696 return 0;
e658983a 3697 err = drbd_recv_all(mdev->tconn, p, want);
82bc0194 3698 if (err)
2c46407d 3699 return err;
b411b363 3700
e658983a 3701 drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
b411b363
PR
3702
3703 c->word_offset += num_words;
3704 c->bit_offset = c->word_offset * BITS_PER_LONG;
3705 if (c->bit_offset > c->bm_bits)
3706 c->bit_offset = c->bm_bits;
3707
2c46407d 3708 return 1;
b411b363
PR
3709}
3710
a02d1240
AG
3711static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3712{
3713 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3714}
3715
3716static int dcbp_get_start(struct p_compressed_bm *p)
3717{
3718 return (p->encoding & 0x80) != 0;
3719}
3720
3721static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3722{
3723 return (p->encoding >> 4) & 0x7;
3724}
3725
2c46407d
AG
3726/**
3727 * recv_bm_rle_bits
3728 *
3729 * Return 0 when done, 1 when another iteration is needed, and a negative error
3730 * code upon failure.
3731 */
3732static int
b411b363
PR
3733recv_bm_rle_bits(struct drbd_conf *mdev,
3734 struct p_compressed_bm *p,
c6d25cfe
PR
3735 struct bm_xfer_ctx *c,
3736 unsigned int len)
b411b363
PR
3737{
3738 struct bitstream bs;
3739 u64 look_ahead;
3740 u64 rl;
3741 u64 tmp;
3742 unsigned long s = c->bit_offset;
3743 unsigned long e;
a02d1240 3744 int toggle = dcbp_get_start(p);
b411b363
PR
3745 int have;
3746 int bits;
3747
a02d1240 3748 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
b411b363
PR
3749
3750 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3751 if (bits < 0)
2c46407d 3752 return -EIO;
b411b363
PR
3753
3754 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3755 bits = vli_decode_bits(&rl, look_ahead);
3756 if (bits <= 0)
2c46407d 3757 return -EIO;
b411b363
PR
3758
3759 if (toggle) {
3760 e = s + rl -1;
3761 if (e >= c->bm_bits) {
3762 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3763 return -EIO;
b411b363
PR
3764 }
3765 _drbd_bm_set_bits(mdev, s, e);
3766 }
3767
3768 if (have < bits) {
3769 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3770 have, bits, look_ahead,
3771 (unsigned int)(bs.cur.b - p->code),
3772 (unsigned int)bs.buf_len);
2c46407d 3773 return -EIO;
b411b363
PR
3774 }
3775 look_ahead >>= bits;
3776 have -= bits;
3777
3778 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3779 if (bits < 0)
2c46407d 3780 return -EIO;
b411b363
PR
3781 look_ahead |= tmp << have;
3782 have += bits;
3783 }
3784
3785 c->bit_offset = s;
3786 bm_xfer_ctx_bit_to_word_offset(c);
3787
2c46407d 3788 return (s != c->bm_bits);
b411b363
PR
3789}
3790
2c46407d
AG
3791/**
3792 * decode_bitmap_c
3793 *
3794 * Return 0 when done, 1 when another iteration is needed, and a negative error
3795 * code upon failure.
3796 */
3797static int
b411b363
PR
3798decode_bitmap_c(struct drbd_conf *mdev,
3799 struct p_compressed_bm *p,
c6d25cfe
PR
3800 struct bm_xfer_ctx *c,
3801 unsigned int len)
b411b363 3802{
a02d1240 3803 if (dcbp_get_code(p) == RLE_VLI_Bits)
e658983a 3804 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
b411b363
PR
3805
3806 /* other variants had been implemented for evaluation,
3807 * but have been dropped as this one turned out to be "best"
3808 * during all our tests. */
3809
3810 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
38fa9988 3811 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
2c46407d 3812 return -EIO;
b411b363
PR
3813}
3814
3815void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3816 const char *direction, struct bm_xfer_ctx *c)
3817{
3818 /* what would it take to transfer it "plaintext" */
50d0b1ad
AG
3819 unsigned int header_size = drbd_header_size(mdev->tconn);
3820 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3821 unsigned int plain =
3822 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3823 c->bm_words * sizeof(unsigned long);
3824 unsigned int total = c->bytes[0] + c->bytes[1];
3825 unsigned int r;
b411b363
PR
3826
 3827	/* total cannot be zero, but just in case: */
3828 if (total == 0)
3829 return;
3830
3831 /* don't report if not compressed */
3832 if (total >= plain)
3833 return;
3834
3835 /* total < plain. check for overflow, still */
3836 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3837 : (1000 * total / plain);
3838
3839 if (r > 1000)
3840 r = 1000;
3841
3842 r = 1000 - r;
3843 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3844 "total %u; compression: %u.%u%%\n",
3845 direction,
3846 c->bytes[1], c->packets[1],
3847 c->bytes[0], c->packets[0],
3848 total, r/10, r % 10);
3849}
3850
 3851/* Since we are processing the bitfield from lower addresses to higher,
 3852   it does not matter whether we process it in 32 bit chunks or 64 bit
 3853   chunks, as long as it is little endian. (Understand it as a byte stream,
 3854   beginning with the lowest byte...) If we used big endian
 3855   we would need to process it from the highest address to the lowest,
 3856   in order to be agnostic to the 32 vs 64 bit issue.
 3857
 3858   Returns 0 on success, a negative error code otherwise. */
4a76b161 3859static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3860{
4a76b161 3861 struct drbd_conf *mdev;
b411b363 3862 struct bm_xfer_ctx c;
2c46407d 3863 int err;
4a76b161
AG
3864
3865 mdev = vnr_to_mdev(tconn, pi->vnr);
3866 if (!mdev)
3867 return -EIO;
b411b363 3868
20ceb2b2
LE
3869 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3870 /* you are supposed to send additional out-of-sync information
3871 * if you actually set bits during this phase */
b411b363 3872
b411b363
PR
3873 c = (struct bm_xfer_ctx) {
3874 .bm_bits = drbd_bm_bits(mdev),
3875 .bm_words = drbd_bm_words(mdev),
3876 };
3877
2c46407d 3878 for(;;) {
e658983a
AG
3879 if (pi->cmd == P_BITMAP)
3880 err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
3881 else if (pi->cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3882 /* MAYBE: sanity check that we speak proto >= 90,
3883 * and the feature is enabled! */
e658983a 3884 struct p_compressed_bm *p = pi->data;
b411b363 3885
50d0b1ad 3886 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
b411b363 3887 dev_err(DEV, "ReportCBitmap packet too large\n");
82bc0194 3888 err = -EIO;
b411b363
PR
3889 goto out;
3890 }
e658983a 3891 if (pi->size <= sizeof(*p)) {
e2857216 3892 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
82bc0194 3893 err = -EIO;
78fcbdae 3894 goto out;
b411b363 3895 }
e658983a
AG
3896 err = drbd_recv_all(mdev->tconn, p, pi->size);
3897 if (err)
3898 goto out;
e2857216 3899 err = decode_bitmap_c(mdev, p, &c, pi->size);
b411b363 3900 } else {
e2857216 3901 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
82bc0194 3902 err = -EIO;
b411b363
PR
3903 goto out;
3904 }
3905
e2857216 3906 c.packets[pi->cmd == P_BITMAP]++;
50d0b1ad 3907 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
b411b363 3908
2c46407d
AG
3909 if (err <= 0) {
3910 if (err < 0)
3911 goto out;
b411b363 3912 break;
2c46407d 3913 }
e2857216 3914 err = drbd_recv_header(mdev->tconn, pi);
82bc0194 3915 if (err)
b411b363 3916 goto out;
2c46407d 3917 }
b411b363
PR
3918
3919 INFO_bm_xfer_stats(mdev, "receive", &c);
3920
3921 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
3922 enum drbd_state_rv rv;
3923
82bc0194
AG
3924 err = drbd_send_bitmap(mdev);
3925 if (err)
b411b363
PR
3926 goto out;
3927 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
3928 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3929 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
3930 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3931 /* admin may have requested C_DISCONNECTING,
3932 * other threads may have noticed network errors */
3933 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3934 drbd_conn_str(mdev->state.conn));
3935 }
82bc0194 3936 err = 0;
b411b363 3937
b411b363 3938 out:
20ceb2b2 3939 drbd_bm_unlock(mdev);
82bc0194 3940 if (!err && mdev->state.conn == C_WF_BITMAP_S)
b411b363 3941 drbd_start_resync(mdev, C_SYNC_SOURCE);
82bc0194 3942 return err;
b411b363
PR
3943}
3944
4a76b161 3945static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3946{
4a76b161 3947 conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
e2857216 3948 pi->cmd, pi->size);
2de876ef 3949
4a76b161 3950 return ignore_remaining_packet(tconn, pi);
2de876ef
PR
3951}
3952
4a76b161 3953static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
0ced55a3 3954{
e7f52dfb
LE
3955 /* Make sure we've acked all the TCP data associated
3956 * with the data requests being unplugged */
4a76b161 3957 drbd_tcp_quickack(tconn->data.socket);
0ced55a3 3958
82bc0194 3959 return 0;
0ced55a3
PR
3960}
3961
4a76b161 3962static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
73a01a18 3963{
4a76b161 3964 struct drbd_conf *mdev;
e658983a 3965 struct p_block_desc *p = pi->data;
4a76b161
AG
3966
3967 mdev = vnr_to_mdev(tconn, pi->vnr);
3968 if (!mdev)
3969 return -EIO;
73a01a18 3970
f735e363
LE
3971 switch (mdev->state.conn) {
3972 case C_WF_SYNC_UUID:
3973 case C_WF_BITMAP_T:
3974 case C_BEHIND:
3975 break;
3976 default:
3977 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3978 drbd_conn_str(mdev->state.conn));
3979 }
3980
73a01a18
PR
3981 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3982
82bc0194 3983 return 0;
73a01a18
PR
3984}
3985
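/* Dispatch table for the data socket: drbdd() looks up the received packet
 * type here, rejects payloads on commands that expect none (expect_payload),
 * reads pkt_size sub-header bytes into the socket buffer and calls fn(). */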
02918be2
PR
3986struct data_cmd {
3987 int expect_payload;
3988 size_t pkt_size;
4a76b161 3989 int (*fn)(struct drbd_tconn *, struct packet_info *);
02918be2
PR
3990};
3991
3992static struct data_cmd drbd_cmd_handler[] = {
4a76b161
AG
3993 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3994 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3995 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3996 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
e658983a
AG
3997 [P_BITMAP] = { 1, 0, receive_bitmap } ,
3998 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
3999 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4a76b161
AG
4000 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4001 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
e658983a
AG
4002 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4003 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4a76b161
AG
4004 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4005 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4006 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4007 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4008 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4009 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4010 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4011 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4012 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4013 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4014 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4015 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
b411b363
PR
4016};
4017
eefc2f7d 4018static void drbdd(struct drbd_tconn *tconn)
b411b363 4019{
77351055 4020 struct packet_info pi;
02918be2 4021 size_t shs; /* sub header size */
82bc0194 4022 int err;
b411b363 4023
eefc2f7d 4024 while (get_t_state(&tconn->receiver) == RUNNING) {
deebe195
AG
4025 struct data_cmd *cmd;
4026
eefc2f7d 4027 drbd_thread_current_set_cpu(&tconn->receiver);
69bc7bc3 4028 if (drbd_recv_header(tconn, &pi))
02918be2 4029 goto err_out;
b411b363 4030
deebe195 4031 cmd = &drbd_cmd_handler[pi.cmd];
4a76b161 4032 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
eefc2f7d 4033 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
02918be2 4034 goto err_out;
0b33a916 4035 }
b411b363 4036
e658983a
AG
4037 shs = cmd->pkt_size;
4038 if (pi.size > shs && !cmd->expect_payload) {
eefc2f7d 4039 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
02918be2 4040 goto err_out;
b411b363 4041 }
b411b363 4042
c13f7e1a 4043 if (shs) {
e658983a 4044 err = drbd_recv_all_warn(tconn, pi.data, shs);
a5c31904 4045 if (err)
c13f7e1a 4046 goto err_out;
e2857216 4047 pi.size -= shs;
c13f7e1a
LE
4048 }
4049
4a76b161
AG
4050 err = cmd->fn(tconn, &pi);
4051 if (err) {
9f5bdc33
AG
4052 conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4053 cmdname(pi.cmd), err, pi.size);
02918be2 4054 goto err_out;
b411b363
PR
4055 }
4056 }
82bc0194 4057 return;
b411b363 4058
82bc0194
AG
4059 err_out:
4060 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
b411b363
PR
4061}
4062
0e29d163 4063void conn_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
4064{
4065 struct drbd_wq_barrier barr;
4066
4067 barr.w.cb = w_prev_work_done;
0e29d163 4068 barr.w.tconn = tconn;
b411b363 4069 init_completion(&barr.done);
0e29d163 4070 drbd_queue_work(&tconn->data.work, &barr.w);
b411b363
PR
4071 wait_for_completion(&barr.done);
4072}
4073
360cc740 4074static void drbd_disconnect(struct drbd_tconn *tconn)
b411b363 4075{
bbeb641c 4076 enum drbd_conns oc;
b411b363 4077 int rv = SS_UNKNOWN_ERROR;
b411b363 4078
bbeb641c 4079 if (tconn->cstate == C_STANDALONE)
b411b363 4080 return;
b411b363
PR
4081
4082 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
4083 drbd_thread_stop(&tconn->asender);
4084 drbd_free_sock(tconn);
4085
4086 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
360cc740
PR
4087 conn_info(tconn, "Connection closed\n");
4088
cb703454
PR
4089 if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4090 conn_try_outdate_peer_async(tconn);
4091
360cc740 4092 spin_lock_irq(&tconn->req_lock);
bbeb641c
PR
4093 oc = tconn->cstate;
4094 if (oc >= C_UNCONNECTED)
4095 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4096
360cc740
PR
4097 spin_unlock_irq(&tconn->req_lock);
4098
bbeb641c 4099 if (oc == C_DISCONNECTING) {
360cc740
PR
4100 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4101
4102 crypto_free_hash(tconn->cram_hmac_tfm);
4103 tconn->cram_hmac_tfm = NULL;
4104
4105 kfree(tconn->net_conf);
4106 tconn->net_conf = NULL;
bbeb641c 4107 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
360cc740
PR
4108 }
4109}
4110
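/* Per-volume cleanup, run for every volume of the connection via
 * idr_for_each() from drbd_disconnect(): wait for the active/sync/read EE
 * lists to drain, cancel pending resync work, clear the transfer log unless
 * IO is suspended, and check that no peer requests or network pages remain. */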
4111static int drbd_disconnected(int vnr, void *p, void *data)
4112{
4113 struct drbd_conf *mdev = (struct drbd_conf *)p;
4114 enum drbd_fencing_p fp;
4115 unsigned int i;
b411b363 4116
85719573 4117 /* wait for current activity to cease. */
87eeee41 4118 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
4119 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4120 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4121 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 4122 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4123
4124 /* We do not have data structures that would allow us to
4125 * get the rs_pending_cnt down to 0 again.
4126 * * On C_SYNC_TARGET we do not have any data structures describing
4127 * the pending RSDataRequest's we have sent.
4128 * * On C_SYNC_SOURCE there is no data structure that tracks
4129 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4130 * And no, it is not the sum of the reference counts in the
4131 * resync_LRU. The resync_LRU tracks the whole operation including
4132 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4133 * on the fly. */
4134 drbd_rs_cancel_all(mdev);
4135 mdev->rs_total = 0;
4136 mdev->rs_failed = 0;
4137 atomic_set(&mdev->rs_pending_cnt, 0);
4138 wake_up(&mdev->misc_wait);
4139
7fde2be9
PR
4140 del_timer(&mdev->request_timer);
4141
b411b363 4142 del_timer_sync(&mdev->resync_timer);
b411b363
PR
4143 resync_timer_fn((unsigned long)mdev);
4144
b411b363
PR
4145 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4146 * w_make_resync_request etc. which may still be on the worker queue
4147 * to be "canceled" */
a21e9298 4148 drbd_flush_workqueue(mdev);
b411b363
PR
4149
4150 /* This also does reclaim_net_ee(). If we do this too early, we might
4151 * miss some resync ee and pages.*/
4152 drbd_process_done_ee(mdev);
4153
4154 kfree(mdev->p_uuid);
4155 mdev->p_uuid = NULL;
4156
2aebfabb 4157 if (!drbd_suspended(mdev))
2f5cdd0b 4158 tl_clear(mdev->tconn);
b411b363 4159
b411b363
PR
4160 drbd_md_sync(mdev);
4161
4162 fp = FP_DONT_CARE;
4163 if (get_ldev(mdev)) {
4164 fp = mdev->ldev->dc.fencing;
4165 put_ldev(mdev);
4166 }
4167
20ceb2b2
LE
4168 /* serialize with bitmap writeout triggered by the state change,
4169 * if any. */
4170 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4171
b411b363
PR
4172 /* tcp_close and release of sendpage pages can be deferred. I don't
4173 * want to use SO_LINGER, because apparently it can be deferred for
4174 * more than 20 seconds (longest time I checked).
4175 *
4176 * Actually we don't care for exactly when the network stack does its
4177 * put_page(), but release our reference on these pages right here.
4178 */
4179 i = drbd_release_ee(mdev, &mdev->net_ee);
4180 if (i)
4181 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
4182 i = atomic_read(&mdev->pp_in_use_by_net);
4183 if (i)
4184 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
4185 i = atomic_read(&mdev->pp_in_use);
4186 if (i)
45bb912b 4187 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
4188
4189 D_ASSERT(list_empty(&mdev->read_ee));
4190 D_ASSERT(list_empty(&mdev->active_ee));
4191 D_ASSERT(list_empty(&mdev->sync_ee));
4192 D_ASSERT(list_empty(&mdev->done_ee));
4193
4194 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4195 atomic_set(&mdev->current_epoch->epoch_size, 0);
4196 D_ASSERT(list_empty(&mdev->current_epoch->list));
360cc740
PR
4197
4198 return 0;
b411b363
PR
4199}
4200
4201/*
4202 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4203 * we can agree on is stored in agreed_pro_version.
4204 *
4205 * feature flags and the reserved array should be enough room for future
4206 * enhancements of the handshake protocol, and possible plugins...
4207 *
4208 * for now, they are expected to be zero, but ignored.
4209 */
6038178e 4210static int drbd_send_features(struct drbd_tconn *tconn)
b411b363 4211{
9f5bdc33
AG
4212 struct drbd_socket *sock;
4213 struct p_connection_features *p;
b411b363 4214
9f5bdc33
AG
4215 sock = &tconn->data;
4216 p = conn_prepare_command(tconn, sock);
4217 if (!p)
e8d17b01 4218 return -EIO;
b411b363
PR
4219 memset(p, 0, sizeof(*p));
4220 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4221 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
9f5bdc33 4222 return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
b411b363
PR
4223}
4224
4225/*
4226 * return values:
4227 * 1 yes, we have a valid connection
4228 * 0 oops, did not work out, please try again
4229 * -1 peer talks different language,
4230 * no point in trying again, please go standalone.
4231 */
6038178e 4232static int drbd_do_features(struct drbd_tconn *tconn)
b411b363 4233{
65d11ed6 4234 /* ASSERT current == tconn->receiver ... */
e658983a
AG
4235 struct p_connection_features *p;
4236 const int expect = sizeof(struct p_connection_features);
77351055 4237 struct packet_info pi;
a5c31904 4238 int err;
b411b363 4239
6038178e 4240 err = drbd_send_features(tconn);
e8d17b01 4241 if (err)
b411b363
PR
4242 return 0;
4243
69bc7bc3
AG
4244 err = drbd_recv_header(tconn, &pi);
4245 if (err)
b411b363
PR
4246 return 0;
4247
6038178e
AG
4248 if (pi.cmd != P_CONNECTION_FEATURES) {
4249 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
77351055 4250 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4251 return -1;
4252 }
4253
77351055 4254 if (pi.size != expect) {
6038178e 4255 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
77351055 4256 expect, pi.size);
b411b363
PR
4257 return -1;
4258 }
4259
e658983a
AG
4260 p = pi.data;
4261 err = drbd_recv_all_warn(tconn, p, expect);
a5c31904 4262 if (err)
b411b363 4263 return 0;
b411b363 4264
b411b363
PR
4265 p->protocol_min = be32_to_cpu(p->protocol_min);
4266 p->protocol_max = be32_to_cpu(p->protocol_max);
4267 if (p->protocol_max == 0)
4268 p->protocol_max = p->protocol_min;
4269
4270 if (PRO_VERSION_MAX < p->protocol_min ||
4271 PRO_VERSION_MIN > p->protocol_max)
4272 goto incompat;
4273
65d11ed6 4274 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363 4275
65d11ed6
PR
4276 conn_info(tconn, "Handshake successful: "
4277 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
b411b363
PR
4278
4279 return 1;
4280
4281 incompat:
65d11ed6 4282 conn_err(tconn, "incompatible DRBD dialects: "
b411b363
PR
4283 "I support %d-%d, peer supports %d-%d\n",
4284 PRO_VERSION_MIN, PRO_VERSION_MAX,
4285 p->protocol_min, p->protocol_max);
4286 return -1;
4287}
4288
4289#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4290static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4291{
4292 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4293 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4294 return -1;
b411b363
PR
4295}
4296#else
4297#define CHALLENGE_LEN 64
b10d96cb
JT
4298
4299/* Return value:
4300 1 - auth succeeded,
4301 0 - failed, try again (network error),
4302 -1 - auth failed, don't try again.
4303*/
4304
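/* Challenge-response authentication using the configured cram-hmac algorithm:
 * we send a random challenge (P_AUTH_CHALLENGE), HMAC the peer's challenge
 * with the shared secret and send that back (P_AUTH_RESPONSE), then compare
 * the peer's response against the HMAC we compute over our own challenge. */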
13e6037d 4305static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363 4306{
9f5bdc33 4307 struct drbd_socket *sock;
b411b363
PR
4308 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4309 struct scatterlist sg;
4310 char *response = NULL;
4311 char *right_response = NULL;
4312 char *peers_ch = NULL;
13e6037d 4313 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
b411b363
PR
4314 unsigned int resp_size;
4315 struct hash_desc desc;
77351055 4316 struct packet_info pi;
69bc7bc3 4317 int err, rv;
b411b363 4318
9f5bdc33
AG
4319 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
4320
13e6037d 4321 desc.tfm = tconn->cram_hmac_tfm;
b411b363
PR
4322 desc.flags = 0;
4323
13e6037d
PR
4324 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4325 (u8 *)tconn->net_conf->shared_secret, key_len);
b411b363 4326 if (rv) {
13e6037d 4327 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4328 rv = -1;
b411b363
PR
4329 goto fail;
4330 }
4331
4332 get_random_bytes(my_challenge, CHALLENGE_LEN);
4333
9f5bdc33
AG
4334 sock = &tconn->data;
4335 if (!conn_prepare_command(tconn, sock)) {
4336 rv = 0;
4337 goto fail;
4338 }
e658983a 4339 rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
9f5bdc33 4340 my_challenge, CHALLENGE_LEN);
b411b363
PR
4341 if (!rv)
4342 goto fail;
4343
69bc7bc3
AG
4344 err = drbd_recv_header(tconn, &pi);
4345 if (err) {
4346 rv = 0;
b411b363 4347 goto fail;
69bc7bc3 4348 }
b411b363 4349
77351055 4350 if (pi.cmd != P_AUTH_CHALLENGE) {
13e6037d 4351 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
77351055 4352 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4353 rv = 0;
4354 goto fail;
4355 }
4356
77351055 4357 if (pi.size > CHALLENGE_LEN * 2) {
13e6037d 4358 conn_err(tconn, "expected AuthChallenge payload too big.\n");
b10d96cb 4359 rv = -1;
b411b363
PR
4360 goto fail;
4361 }
4362
77351055 4363 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 4364 if (peers_ch == NULL) {
13e6037d 4365 conn_err(tconn, "kmalloc of peers_ch failed\n");
b10d96cb 4366 rv = -1;
b411b363
PR
4367 goto fail;
4368 }
4369
a5c31904
AG
4370 err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4371 if (err) {
b411b363
PR
4372 rv = 0;
4373 goto fail;
4374 }
4375
13e6037d 4376 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
b411b363
PR
4377 response = kmalloc(resp_size, GFP_NOIO);
4378 if (response == NULL) {
13e6037d 4379 conn_err(tconn, "kmalloc of response failed\n");
b10d96cb 4380 rv = -1;
b411b363
PR
4381 goto fail;
4382 }
4383
4384 sg_init_table(&sg, 1);
77351055 4385 sg_set_buf(&sg, peers_ch, pi.size);
b411b363
PR
4386
4387 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4388 if (rv) {
13e6037d 4389 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4390 rv = -1;
b411b363
PR
4391 goto fail;
4392 }
4393
9f5bdc33
AG
4394 if (!conn_prepare_command(tconn, sock)) {
4395 rv = 0;
4396 goto fail;
4397 }
e658983a 4398 rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
9f5bdc33 4399 response, resp_size);
b411b363
PR
4400 if (!rv)
4401 goto fail;
4402
69bc7bc3
AG
4403 err = drbd_recv_header(tconn, &pi);
4404 if (err) {
4405 rv = 0;
b411b363 4406 goto fail;
69bc7bc3 4407 }
b411b363 4408
77351055 4409 if (pi.cmd != P_AUTH_RESPONSE) {
13e6037d 4410 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
77351055 4411 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4412 rv = 0;
4413 goto fail;
4414 }
4415
77351055 4416 if (pi.size != resp_size) {
13e6037d 4417 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
b411b363
PR
4418 rv = 0;
4419 goto fail;
4420 }
4421
a5c31904
AG
4422 err = drbd_recv_all_warn(tconn, response , resp_size);
4423 if (err) {
b411b363
PR
4424 rv = 0;
4425 goto fail;
4426 }
4427
4428 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4429 if (right_response == NULL) {
13e6037d 4430 conn_err(tconn, "kmalloc of right_response failed\n");
b10d96cb 4431 rv = -1;
b411b363
PR
4432 goto fail;
4433 }
4434
4435 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4436
4437 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4438 if (rv) {
13e6037d 4439 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4440 rv = -1;
b411b363
PR
4441 goto fail;
4442 }
4443
4444 rv = !memcmp(response, right_response, resp_size);
4445
4446 if (rv)
13e6037d
PR
4447 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4448 resp_size, tconn->net_conf->cram_hmac_alg);
b10d96cb
JT
4449 else
4450 rv = -1;
b411b363
PR
4451
4452 fail:
4453 kfree(peers_ch);
4454 kfree(response);
4455 kfree(right_response);
4456
4457 return rv;
4458}
4459#endif
4460
4461int drbdd_init(struct drbd_thread *thi)
4462{
392c8801 4463 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4464 int h;
4465
4d641dd7 4466 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4467
4468 do {
4d641dd7 4469 h = drbd_connect(tconn);
b411b363 4470 if (h == 0) {
4d641dd7 4471 drbd_disconnect(tconn);
20ee6390 4472 schedule_timeout_interruptible(HZ);
b411b363
PR
4473 }
4474 if (h == -1) {
4d641dd7 4475 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4476 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4477 }
4478 } while (h == 0);
4479
4480 if (h > 0) {
4d641dd7
PR
4481 if (get_net_conf(tconn)) {
4482 drbdd(tconn);
4483 put_net_conf(tconn);
b411b363
PR
4484 }
4485 }
4486
4d641dd7 4487 drbd_disconnect(tconn);
b411b363 4488
4d641dd7 4489 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4490 return 0;
4491}
4492
4493/* ********* acknowledge sender ******** */
4494
e05e1e59 4495static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
e4f78ede 4496{
e658983a 4497 struct p_req_state_reply *p = pi->data;
e4f78ede
PR
4498 int retcode = be32_to_cpu(p->retcode);
4499
4500 if (retcode >= SS_SUCCESS) {
4501 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4502 } else {
4503 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4504 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4505 drbd_set_st_err_str(retcode), retcode);
4506 }
4507 wake_up(&tconn->ping_wait);
4508
4509 return true;
4510}
4511
1952e916 4512static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4513{
1952e916 4514 struct drbd_conf *mdev;
e658983a 4515 struct p_req_state_reply *p = pi->data;
b411b363
PR
4516 int retcode = be32_to_cpu(p->retcode);
4517
1952e916
AG
4518 mdev = vnr_to_mdev(tconn, pi->vnr);
4519 if (!mdev)
4520 return false;
4521
e4f78ede
PR
4522 if (retcode >= SS_SUCCESS) {
4523 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4524 } else {
4525 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4526 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4527 drbd_set_st_err_str(retcode), retcode);
b411b363 4528 }
e4f78ede
PR
4529 wake_up(&mdev->state_wait);
4530
81e84650 4531 return true;
b411b363
PR
4532}
4533
e05e1e59 4534static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4535{
a17647aa 4536 return !drbd_send_ping_ack(tconn);
b411b363
PR
4537
4538}
4539
e05e1e59 4540static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363
PR
4541{
4542 /* restore idle timeout */
2a67d8b9
PR
4543 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4544 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4545 wake_up(&tconn->ping_wait);
b411b363 4546
81e84650 4547 return true;
b411b363
PR
4548}
4549
1952e916 4550static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4551{
1952e916 4552 struct drbd_conf *mdev;
e658983a 4553 struct p_block_ack *p = pi->data;
b411b363
PR
4554 sector_t sector = be64_to_cpu(p->sector);
4555 int blksize = be32_to_cpu(p->blksize);
4556
1952e916
AG
4557 mdev = vnr_to_mdev(tconn, pi->vnr);
4558 if (!mdev)
4559 return false;
4560
31890f4a 4561 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4562
4563 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4564
1d53f09e
LE
4565 if (get_ldev(mdev)) {
4566 drbd_rs_complete_io(mdev, sector);
4567 drbd_set_in_sync(mdev, sector, blksize);
4568 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4569 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4570 put_ldev(mdev);
4571 }
b411b363 4572 dec_rs_pending(mdev);
778f271d 4573 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4574
81e84650 4575 return true;
b411b363
PR
4576}
4577
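/* Look up the request identified by (id, sector) in the given tree under
 * req_lock, apply the request event 'what' via __req_mod(), and complete the
 * master bio if that transition finished it. Returns false if no matching
 * request is found, true otherwise. */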
bc9c5c41
AG
4578static int
4579validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4580 struct rb_root *root, const char *func,
4581 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4582{
4583 struct drbd_request *req;
4584 struct bio_and_error m;
4585
87eeee41 4586 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4587 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4588 if (unlikely(!req)) {
87eeee41 4589 spin_unlock_irq(&mdev->tconn->req_lock);
81e84650 4590 return false;
b411b363
PR
4591 }
4592 __req_mod(req, what, &m);
87eeee41 4593 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4594
4595 if (m.bio)
4596 complete_master_bio(mdev, &m);
81e84650 4597 return true;
b411b363
PR
4598}
4599
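/* got_BlockAck() - handle the various positive write acknowledgments.
 * Resync acks (block_id == ID_SYNCER) just mark the area in sync; everything
 * else is mapped to a request event and applied to the matching entry in the
 * write_requests tree via validate_req_change_req_state(). */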
1952e916 4600static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4601{
1952e916 4602 struct drbd_conf *mdev;
e658983a 4603 struct p_block_ack *p = pi->data;
b411b363
PR
4604 sector_t sector = be64_to_cpu(p->sector);
4605 int blksize = be32_to_cpu(p->blksize);
4606 enum drbd_req_event what;
4607
1952e916
AG
4608 mdev = vnr_to_mdev(tconn, pi->vnr);
4609 if (!mdev)
4610 return false;
4611
b411b363
PR
4612 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4613
579b57ed 4614 if (p->block_id == ID_SYNCER) {
b411b363
PR
4615 drbd_set_in_sync(mdev, sector, blksize);
4616 dec_rs_pending(mdev);
81e84650 4617 return true;
b411b363 4618 }
e05e1e59 4619 switch (pi->cmd) {
b411b363 4620 case P_RS_WRITE_ACK:
89e58e75 4621 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4622 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4623 break;
4624 case P_WRITE_ACK:
89e58e75 4625 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4626 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4627 break;
4628 case P_RECV_ACK:
89e58e75 4629 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
8554df1c 4630 what = RECV_ACKED_BY_PEER;
b411b363 4631 break;
7be8da07 4632 case P_DISCARD_WRITE:
89e58e75 4633 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
7be8da07
AG
4634 what = DISCARD_WRITE;
4635 break;
4636 case P_RETRY_WRITE:
4637 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4638 what = POSTPONE_WRITE;
b411b363
PR
4639 break;
4640 default:
4641 D_ASSERT(0);
81e84650 4642 return false;
b411b363
PR
4643 }
4644
4645 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41
AG
4646 &mdev->write_requests, __func__,
4647 what, false);
b411b363
PR
4648}
4649
1952e916 4650static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4651{
1952e916 4652 struct drbd_conf *mdev;
e658983a 4653 struct p_block_ack *p = pi->data;
b411b363 4654 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4655 int size = be32_to_cpu(p->blksize);
1952e916
AG
4656 bool missing_ok = tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4657 tconn->net_conf->wire_protocol == DRBD_PROT_B;
c3afd8f5 4658 bool found;
b411b363 4659
1952e916
AG
4660 mdev = vnr_to_mdev(tconn, pi->vnr);
4661 if (!mdev)
4662 return false;
4663
b411b363
PR
4664 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4665
579b57ed 4666 if (p->block_id == ID_SYNCER) {
b411b363
PR
4667 dec_rs_pending(mdev);
4668 drbd_rs_failed_io(mdev, sector, size);
81e84650 4669 return true;
b411b363 4670 }
2deb8336 4671
c3afd8f5 4672 found = validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4673 &mdev->write_requests, __func__,
8554df1c 4674 NEG_ACKED, missing_ok);
c3afd8f5
AG
4675 if (!found) {
4676 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4677 The master bio might already be completed, therefore the
4678 request is no longer in the collision hash. */
4679 /* In Protocol B we might already have got a P_RECV_ACK
4680 but then get a P_NEG_ACK afterwards. */
4681 if (!missing_ok)
2deb8336 4682 return false;
c3afd8f5 4683 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4684 }
2deb8336 4685 return true;
b411b363
PR
4686}
4687
1952e916 4688static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4689{
1952e916 4690 struct drbd_conf *mdev;
e658983a 4691 struct p_block_ack *p = pi->data;
b411b363
PR
4692 sector_t sector = be64_to_cpu(p->sector);
4693
1952e916
AG
4694 mdev = vnr_to_mdev(tconn, pi->vnr);
4695 if (!mdev)
4696 return false;
4697
b411b363 4698 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
7be8da07 4699
b411b363
PR
4700 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4701 (unsigned long long)sector, be32_to_cpu(p->blksize));
4702
4703 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4704 &mdev->read_requests, __func__,
8554df1c 4705 NEG_ACKED, false);
b411b363
PR
4706}
4707
1952e916 4708static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4709{
1952e916 4710 struct drbd_conf *mdev;
b411b363
PR
4711 sector_t sector;
4712 int size;
e658983a 4713 struct p_block_ack *p = pi->data;
1952e916
AG
4714
4715 mdev = vnr_to_mdev(tconn, pi->vnr);
4716 if (!mdev)
4717 return false;
b411b363
PR
4718
4719 sector = be64_to_cpu(p->sector);
4720 size = be32_to_cpu(p->blksize);
b411b363
PR
4721
4722 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4723
4724 dec_rs_pending(mdev);
4725
4726 if (get_ldev_if_state(mdev, D_FAILED)) {
4727 drbd_rs_complete_io(mdev, sector);
e05e1e59 4728 switch (pi->cmd) {
d612d309
PR
4729 case P_NEG_RS_DREPLY:
4730 drbd_rs_failed_io(mdev, sector, size);
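			/* fall through */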
4731 case P_RS_CANCEL:
4732 break;
4733 default:
4734 D_ASSERT(0);
4735 put_ldev(mdev);
4736 return false;
4737 }
b411b363
PR
4738 put_ldev(mdev);
4739 }
4740
81e84650 4741 return true;
b411b363
PR
4742}
4743
1952e916 4744static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4745{
1952e916 4746 struct drbd_conf *mdev;
e658983a 4747 struct p_barrier_ack *p = pi->data;
1952e916
AG
4748
4749 mdev = vnr_to_mdev(tconn, pi->vnr);
4750 if (!mdev)
4751 return false;
b411b363 4752
2f5cdd0b 4753 tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
b411b363 4754
c4752ef1
PR
4755 if (mdev->state.conn == C_AHEAD &&
4756 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
4757 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4758 mdev->start_resync_timer.expires = jiffies + HZ;
4759 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4760 }
4761
81e84650 4762 return true;
b411b363
PR
4763}
4764
1952e916 4765static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4766{
1952e916 4767 struct drbd_conf *mdev;
e658983a 4768 struct p_block_ack *p = pi->data;
b411b363
PR
4769 struct drbd_work *w;
4770 sector_t sector;
4771 int size;
4772
1952e916
AG
4773 mdev = vnr_to_mdev(tconn, pi->vnr);
4774 if (!mdev)
4775 return false;
4776
b411b363
PR
4777 sector = be64_to_cpu(p->sector);
4778 size = be32_to_cpu(p->blksize);
4779
4780 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4781
4782 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
8f7bed77 4783 drbd_ov_out_of_sync_found(mdev, sector, size);
b411b363 4784 else
8f7bed77 4785 ov_out_of_sync_print(mdev);
b411b363 4786
1d53f09e 4787 if (!get_ldev(mdev))
81e84650 4788 return true;
1d53f09e 4789
b411b363
PR
4790 drbd_rs_complete_io(mdev, sector);
4791 dec_rs_pending(mdev);
4792
ea5442af
LE
4793 --mdev->ov_left;
4794
4795 /* let's advance progress step marks only for every other megabyte */
4796 if ((mdev->ov_left & 0x200) == 0x200)
4797 drbd_advance_rs_marks(mdev, mdev->ov_left);
4798
4799 if (mdev->ov_left == 0) {
b411b363
PR
4800 w = kmalloc(sizeof(*w), GFP_NOIO);
4801 if (w) {
4802 w->cb = w_ov_finished;
a21e9298 4803 w->mdev = mdev;
e42325a5 4804 drbd_queue_work_front(&mdev->tconn->data.work, w);
b411b363
PR
4805 } else {
4806 dev_err(DEV, "kmalloc(w) failed.");
8f7bed77 4807 ov_out_of_sync_print(mdev);
b411b363
PR
4808 drbd_resync_finished(mdev);
4809 }
4810 }
1d53f09e 4811 put_ldev(mdev);
81e84650 4812 return true;
b411b363
PR
4813}
4814
1952e916 4815static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
0ced55a3 4816{
81e84650 4817 return true;
0ced55a3
PR
4818}
4819
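/* Process the done_ee lists of all volumes on this connection, looping until
 * every list is empty (new completions may arrive while we process), with
 * SIGNAL_ASENDER cleared so we are not interrupted mid-pass.
 * Returns 1 on error, 0 otherwise. */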
32862ec7
PR
4820static int tconn_process_done_ee(struct drbd_tconn *tconn)
4821{
082a3439
PR
4822 struct drbd_conf *mdev;
4823 int i, not_empty = 0;
32862ec7
PR
4824
4825 do {
4826 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4827 flush_signals(current);
082a3439 4828 idr_for_each_entry(&tconn->volumes, mdev, i) {
e2b3032b 4829 if (drbd_process_done_ee(mdev))
082a3439
PR
4830 return 1; /* error */
4831 }
32862ec7 4832 set_bit(SIGNAL_ASENDER, &tconn->flags);
082a3439
PR
4833
4834 spin_lock_irq(&tconn->req_lock);
4835 idr_for_each_entry(&tconn->volumes, mdev, i) {
4836 not_empty = !list_empty(&mdev->done_ee);
4837 if (not_empty)
4838 break;
4839 }
4840 spin_unlock_irq(&tconn->req_lock);
32862ec7
PR
4841 } while (not_empty);
4842
4843 return 0;
4844}
4845
7201b972
AG
4846struct asender_cmd {
4847 size_t pkt_size;
1952e916 4848 int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
7201b972
AG
4849};
4850
4851static struct asender_cmd asender_tbl[] = {
e658983a
AG
4852 [P_PING] = { 0, got_Ping },
4853 [P_PING_ACK] = { 0, got_PingAck },
1952e916
AG
4854 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4855 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4856 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4857 [P_DISCARD_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
4858 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4859 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4860 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
4861 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4862 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4863 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4864 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
4865 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
4866 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
4867 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
4868 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
7201b972
AG
4869};
4870
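/* drbd_asender() - the meta-socket (acknowledgement sender) thread.
 * Sends pings when requested, flushes the done_ee lists, then receives
 * packets on the meta socket and dispatches them through asender_tbl[].
 * A missing ping ack or any receive error leads to C_NETWORK_FAILURE,
 * an unknown packet type to C_DISCONNECTING. */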
b411b363
PR
4871int drbd_asender(struct drbd_thread *thi)
4872{
392c8801 4873 struct drbd_tconn *tconn = thi->tconn;
b411b363 4874 struct asender_cmd *cmd = NULL;
77351055 4875 struct packet_info pi;
257d0af6 4876 int rv;
e658983a 4877 void *buf = tconn->meta.rbuf;
b411b363 4878 int received = 0;
52b061a4
AG
4879 unsigned int header_size = drbd_header_size(tconn);
4880 int expect = header_size;
f36af18c 4881 int ping_timeout_active = 0;
b411b363 4882
b411b363
PR
4883 current->policy = SCHED_RR; /* Make this a realtime task! */
4884 current->rt_priority = 2; /* more important than all other tasks */
4885
e77a0a5c 4886 while (get_t_state(thi) == RUNNING) {
80822284 4887 drbd_thread_current_set_cpu(thi);
32862ec7 4888 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
a17647aa 4889 if (drbd_send_ping(tconn)) {
32862ec7 4890 conn_err(tconn, "drbd_send_ping has failed\n");
841ce241
AG
4891 goto reconnect;
4892 }
32862ec7
PR
4893 tconn->meta.socket->sk->sk_rcvtimeo =
4894 tconn->net_conf->ping_timeo*HZ/10;
f36af18c 4895 ping_timeout_active = 1;
b411b363
PR
4896 }
4897
32862ec7
PR
4898 /* TODO: conditionally cork; it may hurt latency if we cork without
4899 much to send */
4900 if (!tconn->net_conf->no_cork)
4901 drbd_tcp_cork(tconn->meta.socket);
082a3439
PR
4902 if (tconn_process_done_ee(tconn)) {
4903 conn_err(tconn, "tconn_process_done_ee() failed\n");
32862ec7 4904 goto reconnect;
082a3439 4905 }
b411b363 4906 /* but unconditionally uncork unless disabled */
32862ec7
PR
4907 if (!tconn->net_conf->no_cork)
4908 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
4909
4910 /* short circuit, recv_msg would return EINTR anyways. */
4911 if (signal_pending(current))
4912 continue;
4913
32862ec7
PR
4914 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4915 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
4916
4917 flush_signals(current);
4918
4919 /* Note:
4920 * -EINTR (on meta) we got a signal
4921 * -EAGAIN (on meta) rcvtimeo expired
4922 * -ECONNRESET other side closed the connection
4923 * -ERESTARTSYS (on data) we got a signal
4924 * rv < 0 other than above: unexpected error!
4925 * rv == expected: full header or command
4926 * rv < expected: "woken" by signal during receive
4927 * rv == 0 : "connection shut down by peer"
4928 */
4929 if (likely(rv > 0)) {
4930 received += rv;
4931 buf += rv;
4932 } else if (rv == 0) {
32862ec7 4933 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
4934 goto reconnect;
4935 } else if (rv == -EAGAIN) {
cb6518cb
LE
4936 /* If the data socket received something meanwhile,
4937 * that is good enough: peer is still alive. */
32862ec7
PR
4938 if (time_after(tconn->last_received,
4939 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 4940 continue;
f36af18c 4941 if (ping_timeout_active) {
32862ec7 4942 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
4943 goto reconnect;
4944 }
32862ec7 4945 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
4946 continue;
4947 } else if (rv == -EINTR) {
4948 continue;
4949 } else {
32862ec7 4950 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
4951 goto reconnect;
4952 }
4953
4954 if (received == expect && cmd == NULL) {
e658983a 4955 if (decode_header(tconn, tconn->meta.rbuf, &pi))
b411b363 4956 goto reconnect;
7201b972 4957 cmd = &asender_tbl[pi.cmd];
1952e916 4958 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
32862ec7 4959 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
77351055 4960 pi.cmd, pi.size);
b411b363
PR
4961 goto disconnect;
4962 }
e658983a 4963 expect = header_size + cmd->pkt_size;
52b061a4 4964 if (pi.size != expect - header_size) {
32862ec7 4965 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 4966 pi.cmd, pi.size);
b411b363 4967 goto reconnect;
257d0af6 4968 }
b411b363
PR
4969 }
4970 if (received == expect) {
a4fbda8e
PR
4971 bool rv;
4972
1952e916
AG
4973 rv = cmd->fn(tconn, &pi);
4974 if (!rv) {
4975 conn_err(tconn, "%pf failed\n", cmd->fn);
b411b363 4976 goto reconnect;
1952e916 4977 }
b411b363 4978
a4fbda8e
PR
4979 tconn->last_received = jiffies;
4980
f36af18c
LE
4981 /* the idle_timeout (ping-int)
4982 * has been restored in got_PingAck() */
7201b972 4983 if (cmd == &asender_tbl[P_PING_ACK])
f36af18c
LE
4984 ping_timeout_active = 0;
4985
e658983a 4986 buf = tconn->meta.rbuf;
b411b363 4987 received = 0;
52b061a4 4988 expect = header_size;
b411b363
PR
4989 cmd = NULL;
4990 }
4991 }
4992
4993 if (0) {
4994reconnect:
bbeb641c 4995 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
4996 }
4997 if (0) {
4998disconnect:
bbeb641c 4999 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 5000 }
32862ec7 5001 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 5002
32862ec7 5003 conn_info(tconn, "asender terminated\n");
b411b363
PR
5004
5005 return 0;
5006}