drbd: drbd_recv_header(): Return 0 upon success and an error code otherwise
drivers/block/drbd/drbd_receiver.c
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
47#include "drbd_req.h"
48
49#include "drbd_vli.h"
50
51struct packet_info {
52 enum drbd_packet cmd;
53 int size;
54 int vnr;
55};
56
57enum finish_epoch {
58 FE_STILL_LIVE,
59 FE_DESTROYED,
60 FE_RECYCLED,
61};
62
63enum mdev_or_conn {
64 MDEV,
65 CONN,
66};
67
65d11ed6 68static int drbd_do_handshake(struct drbd_tconn *tconn);
13e6037d 69static int drbd_do_auth(struct drbd_tconn *tconn);
360cc740 70static int drbd_disconnected(int vnr, void *p, void *data);
71
72static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
99920dc5 73static int e_end_block(struct drbd_work *, int);
b411b363 74
75
76#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77
78/*
79 * some helper functions to deal with single linked page lists,
80 * page->private being our "next" pointer.
81 */
82
83/* If at least n pages are linked at head, get n pages off.
84 * Otherwise, don't modify head, and return NULL.
85 * Locking is the responsibility of the caller.
86 */
87static struct page *page_chain_del(struct page **head, int n)
88{
89 struct page *page;
90 struct page *tmp;
91
92 BUG_ON(!n);
93 BUG_ON(!head);
94
95 page = *head;
96
97 if (!page)
98 return NULL;
99
100 while (page) {
101 tmp = page_chain_next(page);
102 if (--n == 0)
103 break; /* found sufficient pages */
104 if (tmp == NULL)
105 /* insufficient pages, don't use any of them. */
106 return NULL;
107 page = tmp;
108 }
109
110 /* add end of list marker for the returned list */
111 set_page_private(page, 0);
112 /* actual return value, and adjustment of head */
113 page = *head;
114 *head = tmp;
115 return page;
116}
117
118/* may be used outside of locks to find the tail of a (usually short)
119 * "private" page chain, before adding it back to a global chain head
120 * with page_chain_add() under a spinlock. */
121static struct page *page_chain_tail(struct page *page, int *len)
122{
123 struct page *tmp;
124 int i = 1;
125 while ((tmp = page_chain_next(page)))
126 ++i, page = tmp;
127 if (len)
128 *len = i;
129 return page;
130}
131
132static int page_chain_free(struct page *page)
133{
134 struct page *tmp;
135 int i = 0;
136 page_chain_for_each_safe(page, tmp) {
137 put_page(page);
138 ++i;
139 }
140 return i;
141}
142
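/* Prepend the chain chain_first..chain_last in front of *head: the last
 * page's ->private is pointed at the old head. Callers serialize this via
 * drbd_pp_lock (see the page_chain_tail() comment above). */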
143static void page_chain_add(struct page **head,
144 struct page *chain_first, struct page *chain_last)
145{
146#if 1
147 struct page *tmp;
148 tmp = page_chain_tail(chain_first, NULL);
149 BUG_ON(tmp != chain_last);
150#endif
151
152 /* add chain to head */
153 set_page_private(chain_last, (unsigned long)*head);
154 *head = chain_first;
155}
156
157static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
158{
159 struct page *page = NULL;
160 struct page *tmp = NULL;
161 int i = 0;
162
163 /* Yes, testing drbd_pp_vacant outside the lock is racy.
164 * So what. It saves a spin_lock. */
45bb912b 165 if (drbd_pp_vacant >= number) {
b411b363 166 spin_lock(&drbd_pp_lock);
167 page = page_chain_del(&drbd_pp_pool, number);
168 if (page)
169 drbd_pp_vacant -= number;
b411b363 170 spin_unlock(&drbd_pp_lock);
171 if (page)
172 return page;
b411b363 173 }
45bb912b 174
175 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
176 * "criss-cross" setup, that might cause write-out on some other DRBD,
177 * which in turn might block on the other node at this very place. */
178 for (i = 0; i < number; i++) {
179 tmp = alloc_page(GFP_TRY);
180 if (!tmp)
181 break;
182 set_page_private(tmp, (unsigned long)page);
183 page = tmp;
184 }
185
186 if (i == number)
187 return page;
188
189 /* Not enough pages immediately available this time.
190 * No need to jump around here, drbd_pp_alloc will retry this
191 * function "soon". */
192 if (page) {
193 tmp = page_chain_tail(page, NULL);
194 spin_lock(&drbd_pp_lock);
195 page_chain_add(&drbd_pp_pool, page, tmp);
196 drbd_pp_vacant += i;
197 spin_unlock(&drbd_pp_lock);
198 }
199 return NULL;
200}
201
202static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
203{
db830c46 204 struct drbd_peer_request *peer_req;
205 struct list_head *le, *tle;
206
207 /* The EEs are always appended to the end of the list. Since
208 they are sent in order over the wire, they have to finish
209 in order. As soon as we see the first not finished we can
210 stop to examine the list... */
211
212 list_for_each_safe(le, tle, &mdev->net_ee) {
213 peer_req = list_entry(le, struct drbd_peer_request, w.list);
214 if (drbd_ee_has_active_page(peer_req))
215 break;
216 list_move(le, to_be_freed);
217 }
218}
219
220static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
221{
222 LIST_HEAD(reclaimed);
db830c46 223 struct drbd_peer_request *peer_req, *t;
b411b363 224
87eeee41 225 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 226 reclaim_net_ee(mdev, &reclaimed);
87eeee41 227 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 228
229 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
230 drbd_free_net_ee(mdev, peer_req);
231}
232
233/**
45bb912b 234 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
b411b363 235 * @mdev: DRBD device.
236 * @number: number of pages requested
237 * @retry: whether to retry, if not enough pages are available right now
238 *
239 * Tries to allocate number pages, first from our own page pool, then from
240 * the kernel, unless this allocation would exceed the max_buffers setting.
241 * Possibly retry until DRBD frees sufficient pages somewhere else.
b411b363 242 *
45bb912b 243 * Returns a page chain linked via page->private.
b411b363 244 */
45bb912b 245static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
246{
247 struct page *page = NULL;
248 DEFINE_WAIT(wait);
249
250 /* Yes, we may run up to @number over max_buffers. If we
251 * follow it strictly, the admin will get it wrong anyways. */
89e58e75 252 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
45bb912b 253 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363 254
45bb912b 255 while (page == NULL) {
256 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
257
258 drbd_kick_lo_and_reclaim_net(mdev);
259
89e58e75 260 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
45bb912b 261 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
262 if (page)
263 break;
264 }
265
266 if (!retry)
267 break;
268
269 if (signal_pending(current)) {
270 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
271 break;
272 }
273
274 schedule();
275 }
276 finish_wait(&drbd_pp_wait, &wait);
277
278 if (page)
279 atomic_add(number, &mdev->pp_in_use);
280 return page;
281}
282
283/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
87eeee41 284 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
285 * Either links the page chain back to the global pool,
286 * or returns all pages to the system. */
435f0740 287static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
b411b363 288{
435f0740 289 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
b411b363 290 int i;
435f0740 291
81a5d60e 292 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
293 i = page_chain_free(page);
294 else {
295 struct page *tmp;
296 tmp = page_chain_tail(page, &i);
297 spin_lock(&drbd_pp_lock);
298 page_chain_add(&drbd_pp_pool, page, tmp);
299 drbd_pp_vacant += i;
300 spin_unlock(&drbd_pp_lock);
b411b363 301 }
435f0740 302 i = atomic_sub_return(i, a);
45bb912b 303 if (i < 0)
304 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
305 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
306 wake_up(&drbd_pp_wait);
307}
308
309/*
310You need to hold the req_lock:
311 _drbd_wait_ee_list_empty()
312
313You must not have the req_lock:
314 drbd_free_ee()
315 drbd_alloc_ee()
316 drbd_init_ee()
317 drbd_release_ee()
318 drbd_ee_fix_bhs()
319 drbd_process_done_ee()
320 drbd_clear_done_ee()
321 drbd_wait_ee_list_empty()
322*/
323
324struct drbd_peer_request *
325drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
326 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
b411b363 327{
db830c46 328 struct drbd_peer_request *peer_req;
b411b363 329 struct page *page;
45bb912b 330 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
b411b363 331
0cf9d27e 332 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
333 return NULL;
334
335 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
336 if (!peer_req) {
337 if (!(gfp_mask & __GFP_NOWARN))
338 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
339 return NULL;
340 }
341
342 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
343 if (!page)
344 goto fail;
b411b363 345
346 drbd_clear_interval(&peer_req->i);
347 peer_req->i.size = data_size;
348 peer_req->i.sector = sector;
349 peer_req->i.local = false;
350 peer_req->i.waiting = false;
351
352 peer_req->epoch = NULL;
a21e9298 353 peer_req->w.mdev = mdev;
354 peer_req->pages = page;
355 atomic_set(&peer_req->pending_bios, 0);
356 peer_req->flags = 0;
357 /*
358 * The block_id is opaque to the receiver. It is not endianness
359 * converted, and sent back to the sender unchanged.
360 */
db830c46 361 peer_req->block_id = id;
b411b363 362
db830c46 363 return peer_req;
b411b363 364
45bb912b 365 fail:
db830c46 366 mempool_free(peer_req, drbd_ee_mempool);
367 return NULL;
368}
369
db830c46 370void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
f6ffca9f 371 int is_net)
b411b363 372{
373 if (peer_req->flags & EE_HAS_DIGEST)
374 kfree(peer_req->digest);
375 drbd_pp_free(mdev, peer_req->pages, is_net);
376 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
377 D_ASSERT(drbd_interval_empty(&peer_req->i));
378 mempool_free(peer_req, drbd_ee_mempool);
379}
380
381int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
382{
383 LIST_HEAD(work_list);
db830c46 384 struct drbd_peer_request *peer_req, *t;
b411b363 385 int count = 0;
435f0740 386 int is_net = list == &mdev->net_ee;
b411b363 387
87eeee41 388 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 389 list_splice_init(list, &work_list);
87eeee41 390 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 391
392 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
393 drbd_free_some_ee(mdev, peer_req, is_net);
394 count++;
395 }
396 return count;
397}
398
399
32862ec7 400/* See also comments in _req_mod(,BARRIER_ACKED)
401 * and receive_Barrier.
402 *
403 * Move entries from net_ee to done_ee, if ready.
404 * Grab done_ee, call all callbacks, free the entries.
405 * The callbacks typically send out ACKs.
406 */
407static int drbd_process_done_ee(struct drbd_conf *mdev)
408{
409 LIST_HEAD(work_list);
410 LIST_HEAD(reclaimed);
db830c46 411 struct drbd_peer_request *peer_req, *t;
e2b3032b 412 int err = 0;
b411b363 413
87eeee41 414 spin_lock_irq(&mdev->tconn->req_lock);
415 reclaim_net_ee(mdev, &reclaimed);
416 list_splice_init(&mdev->done_ee, &work_list);
87eeee41 417 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 418
419 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420 drbd_free_net_ee(mdev, peer_req);
421
422 /* possible callbacks here:
7be8da07 423 * e_end_block, and e_end_resync_block, e_send_discard_write.
424 * all ignore the last argument.
425 */
db830c46 426 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427 int err2;
428
b411b363 429 /* list_del not necessary, next/prev members not touched */
430 err2 = peer_req->w.cb(&peer_req->w, !!err);
431 if (!err)
432 err = err2;
db830c46 433 drbd_free_ee(mdev, peer_req);
434 }
435 wake_up(&mdev->ee_wait);
436
e2b3032b 437 return err;
438}
439
440void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
441{
442 DEFINE_WAIT(wait);
443
444 /* avoids spin_lock/unlock
445 * and calling prepare_to_wait in the fast path */
446 while (!list_empty(head)) {
447 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
87eeee41 448 spin_unlock_irq(&mdev->tconn->req_lock);
7eaceacc 449 io_schedule();
b411b363 450 finish_wait(&mdev->ee_wait, &wait);
87eeee41 451 spin_lock_irq(&mdev->tconn->req_lock);
452 }
453}
454
455void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
456{
87eeee41 457 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 458 _drbd_wait_ee_list_empty(mdev, head);
87eeee41 459 spin_unlock_irq(&mdev->tconn->req_lock);
460}
461
462/* see also kernel_accept; which is only present since 2.6.18.
463 * also we want to log which part of it failed, exactly */
7653620d 464static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
465{
466 struct sock *sk = sock->sk;
467 int err = 0;
468
469 *what = "listen";
470 err = sock->ops->listen(sock, 5);
471 if (err < 0)
472 goto out;
473
474 *what = "sock_create_lite";
475 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
476 newsock);
477 if (err < 0)
478 goto out;
479
480 *what = "accept";
481 err = sock->ops->accept(sock, *newsock, 0);
482 if (err < 0) {
483 sock_release(*newsock);
484 *newsock = NULL;
485 goto out;
486 }
487 (*newsock)->ops = sock->ops;
488
489out:
490 return err;
491}
492
dbd9eea0 493static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
494{
495 mm_segment_t oldfs;
496 struct kvec iov = {
497 .iov_base = buf,
498 .iov_len = size,
499 };
500 struct msghdr msg = {
501 .msg_iovlen = 1,
502 .msg_iov = (struct iovec *)&iov,
503 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
504 };
505 int rv;
506
507 oldfs = get_fs();
508 set_fs(KERNEL_DS);
509 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
510 set_fs(oldfs);
511
512 return rv;
513}
514
de0ff338 515static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
516{
517 mm_segment_t oldfs;
518 struct kvec iov = {
519 .iov_base = buf,
520 .iov_len = size,
521 };
522 struct msghdr msg = {
523 .msg_iovlen = 1,
524 .msg_iov = (struct iovec *)&iov,
525 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
526 };
527 int rv;
528
529 oldfs = get_fs();
530 set_fs(KERNEL_DS);
531
532 for (;;) {
de0ff338 533 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
534 if (rv == size)
535 break;
536
537 /* Note:
538 * ECONNRESET other side closed the connection
539 * ERESTARTSYS (on sock) we got a signal
540 */
541
542 if (rv < 0) {
543 if (rv == -ECONNRESET)
de0ff338 544 conn_info(tconn, "sock was reset by peer\n");
b411b363 545 else if (rv != -ERESTARTSYS)
de0ff338 546 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
547 break;
548 } else if (rv == 0) {
de0ff338 549 conn_info(tconn, "sock was shut down by peer\n");
550 break;
551 } else {
552 /* signal came in, or peer/link went down,
553 * after we read a partial message
554 */
555 /* D_ASSERT(signal_pending(current)); */
556 break;
557 }
558 };
559
560 set_fs(oldfs);
561
562 if (rv != size)
bbeb641c 563 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
564
565 return rv;
566}
567
568/* quoting tcp(7):
569 * On individual connections, the socket buffer size must be set prior to the
570 * listen(2) or connect(2) calls in order to have it take effect.
571 * This is our wrapper to do so.
572 */
573static void drbd_setbufsize(struct socket *sock, unsigned int snd,
574 unsigned int rcv)
575{
576 /* open coded SO_SNDBUF, SO_RCVBUF */
577 if (snd) {
578 sock->sk->sk_sndbuf = snd;
579 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
580 }
581 if (rcv) {
582 sock->sk->sk_rcvbuf = rcv;
583 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
584 }
585}
586
eac3e990 587static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
588{
589 const char *what;
590 struct socket *sock;
591 struct sockaddr_in6 src_in6;
592 int err;
593 int disconnect_on_error = 1;
594
eac3e990 595 if (!get_net_conf(tconn))
596 return NULL;
597
598 what = "sock_create_kern";
eac3e990 599 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
600 SOCK_STREAM, IPPROTO_TCP, &sock);
601 if (err < 0) {
602 sock = NULL;
603 goto out;
604 }
605
606 sock->sk->sk_rcvtimeo =
607 sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
608 drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
609 tconn->net_conf->rcvbuf_size);
610
611 /* explicitly bind to the configured IP as source IP
612 * for the outgoing connections.
613 * This is needed for multihomed hosts and to be
614 * able to use lo: interfaces for drbd.
615 * Make sure to use 0 as port number, so linux selects
616 * a free one dynamically.
617 */
618 memcpy(&src_in6, tconn->net_conf->my_addr,
619 min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
620 if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
621 src_in6.sin6_port = 0;
622 else
623 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
624
625 what = "bind before connect";
626 err = sock->ops->bind(sock,
627 (struct sockaddr *) &src_in6,
eac3e990 628 tconn->net_conf->my_addr_len);
629 if (err < 0)
630 goto out;
631
632 /* connect may fail, peer not yet available.
633 * stay C_WF_CONNECTION, don't go Disconnecting! */
634 disconnect_on_error = 0;
635 what = "connect";
636 err = sock->ops->connect(sock,
637 (struct sockaddr *)tconn->net_conf->peer_addr,
638 tconn->net_conf->peer_addr_len, 0);
639
640out:
641 if (err < 0) {
642 if (sock) {
643 sock_release(sock);
644 sock = NULL;
645 }
646 switch (-err) {
647 /* timeout, busy, signal pending */
648 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
649 case EINTR: case ERESTARTSYS:
650 /* peer not (yet) available, network problem */
651 case ECONNREFUSED: case ENETUNREACH:
652 case EHOSTDOWN: case EHOSTUNREACH:
653 disconnect_on_error = 0;
654 break;
655 default:
eac3e990 656 conn_err(tconn, "%s failed, err = %d\n", what, err);
657 }
658 if (disconnect_on_error)
bbeb641c 659 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 660 }
eac3e990 661 put_net_conf(tconn);
662 return sock;
663}
664
7653620d 665static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
666{
667 int timeo, err;
668 struct socket *s_estab = NULL, *s_listen;
669 const char *what;
670
7653620d 671 if (!get_net_conf(tconn))
672 return NULL;
673
674 what = "sock_create_kern";
7653620d 675 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
676 SOCK_STREAM, IPPROTO_TCP, &s_listen);
677 if (err) {
678 s_listen = NULL;
679 goto out;
680 }
681
7653620d 682 timeo = tconn->net_conf->try_connect_int * HZ;
683 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
684
685 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
686 s_listen->sk->sk_rcvtimeo = timeo;
687 s_listen->sk->sk_sndtimeo = timeo;
688 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
689 tconn->net_conf->rcvbuf_size);
690
691 what = "bind before listen";
692 err = s_listen->ops->bind(s_listen,
693 (struct sockaddr *) tconn->net_conf->my_addr,
694 tconn->net_conf->my_addr_len);
695 if (err < 0)
696 goto out;
697
7653620d 698 err = drbd_accept(&what, s_listen, &s_estab);
699
700out:
701 if (s_listen)
702 sock_release(s_listen);
703 if (err < 0) {
704 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
7653620d 705 conn_err(tconn, "%s failed, err = %d\n", what, err);
bbeb641c 706 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
707 }
708 }
7653620d 709 put_net_conf(tconn);
710
711 return s_estab;
712}
713
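/* The very first packet on a newly established socket is a bare header
 * carrying P_HAND_SHAKE_S or P_HAND_SHAKE_M; drbd_connect() below uses it to
 * tell the data socket apart from the meta-data socket. */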
d38e787e 714static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
b411b363 715{
d38e787e 716 struct p_header *h = &tconn->data.sbuf.header;
b411b363 717
ecf2363c 718 return !_conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
719}
720
a25b63f1 721static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
b411b363 722{
a25b63f1 723 struct p_header80 *h = &tconn->data.rbuf.header.h80;
724 int rr;
725
dbd9eea0 726 rr = drbd_recv_short(sock, h, sizeof(*h), 0);
b411b363 727
ca9bc12b 728 if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
729 return be16_to_cpu(h->command);
730
731 return 0xffff;
732}
733
734/**
735 * drbd_socket_okay() - Free the socket if its connection is not okay
736 * @sock: pointer to the pointer to the socket.
737 */
dbd9eea0 738static int drbd_socket_okay(struct socket **sock)
739{
740 int rr;
741 char tb[4];
742
743 if (!*sock)
81e84650 744 return false;
b411b363 745
dbd9eea0 746 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
747
748 if (rr > 0 || rr == -EAGAIN) {
81e84650 749 return true;
750 } else {
751 sock_release(*sock);
752 *sock = NULL;
81e84650 753 return false;
754 }
755}
756/* Gets called if a connection is established, or if a new minor gets created
757 in a connection */
758int drbd_connected(int vnr, void *p, void *data)
759{
760 struct drbd_conf *mdev = (struct drbd_conf *)p;
761 int ok = 1;
762
763 atomic_set(&mdev->packet_seq, 0);
764 mdev->peer_seq = 0;
765
766 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
767 &mdev->tconn->cstate_mutex :
768 &mdev->own_state_mutex;
769
103ea275 770 ok &= !drbd_send_sync_param(mdev);
f02d4d0a 771 ok &= !drbd_send_sizes(mdev, 0, 0);
2ae5f95b 772 ok &= !drbd_send_uuids(mdev);
927036f9 773 ok &= !drbd_send_state(mdev);
774 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
775 clear_bit(RESIZE_PENDING, &mdev->flags);
776
8410da8f 777
778 return !ok;
779}
780
781/*
782 * return values:
783 * 1 yes, we have a valid connection
784 * 0 oops, did not work out, please try again
785 * -1 peer talks different language,
786 * no point in trying again, please go standalone.
787 * -2 We do not have a network config...
788 */
907599e0 789static int drbd_connect(struct drbd_tconn *tconn)
790{
791 struct socket *s, *sock, *msock;
792 int try, h, ok;
793
bbeb641c 794 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
795 return -2;
796
797 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
798 tconn->agreed_pro_version = 99;
799 /* agreed_pro_version must be smaller than 100 so we send the old
800 header (h80) in the first packet and in the handshake packet. */
801
802 sock = NULL;
803 msock = NULL;
804
805 do {
806 for (try = 0;;) {
807 /* 3 tries, this should take less than a second! */
907599e0 808 s = drbd_try_connect(tconn);
809 if (s || ++try >= 3)
810 break;
811 /* give the other side time to call bind() & listen() */
20ee6390 812 schedule_timeout_interruptible(HZ / 10);
813 }
814
815 if (s) {
816 if (!sock) {
907599e0 817 drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
818 sock = s;
819 s = NULL;
820 } else if (!msock) {
907599e0 821 drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
822 msock = s;
823 s = NULL;
824 } else {
907599e0 825 conn_err(tconn, "Logic error in drbd_connect()\n");
826 goto out_release_sockets;
827 }
828 }
829
830 if (sock && msock) {
907599e0 831 schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
832 ok = drbd_socket_okay(&sock);
833 ok = drbd_socket_okay(&msock) && ok;
834 if (ok)
835 break;
836 }
837
838retry:
907599e0 839 s = drbd_wait_for_connect(tconn);
b411b363 840 if (s) {
907599e0 841 try = drbd_recv_fp(tconn, s);
842 drbd_socket_okay(&sock);
843 drbd_socket_okay(&msock);
844 switch (try) {
845 case P_HAND_SHAKE_S:
846 if (sock) {
907599e0 847 conn_warn(tconn, "initial packet S crossed\n");
848 sock_release(sock);
849 }
850 sock = s;
851 break;
852 case P_HAND_SHAKE_M:
853 if (msock) {
907599e0 854 conn_warn(tconn, "initial packet M crossed\n");
b411b363
PR
855 sock_release(msock);
856 }
857 msock = s;
907599e0 858 set_bit(DISCARD_CONCURRENT, &tconn->flags);
859 break;
860 default:
907599e0 861 conn_warn(tconn, "Error receiving initial packet\n");
862 sock_release(s);
863 if (random32() & 1)
864 goto retry;
865 }
866 }
867
bbeb641c 868 if (tconn->cstate <= C_DISCONNECTING)
869 goto out_release_sockets;
870 if (signal_pending(current)) {
871 flush_signals(current);
872 smp_rmb();
907599e0 873 if (get_t_state(&tconn->receiver) == EXITING)
874 goto out_release_sockets;
875 }
876
877 if (sock && msock) {
878 ok = drbd_socket_okay(&sock);
879 ok = drbd_socket_okay(&msock) && ok;
880 if (ok)
881 break;
882 }
883 } while (1);
884
885 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
886 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
887
888 sock->sk->sk_allocation = GFP_NOIO;
889 msock->sk->sk_allocation = GFP_NOIO;
890
891 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
892 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
893
b411b363 894 /* NOT YET ...
907599e0 895 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
896 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
897 * first set it to the P_HAND_SHAKE timeout,
898 * which we set to 4x the configured ping_timeout. */
899 sock->sk->sk_sndtimeo =
907599e0 900 sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
b411b363 901
902 msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
903 msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
904
905 /* we don't want delays.
25985edc 906 * we use TCP_CORK where appropriate, though */
907 drbd_tcp_nodelay(sock);
908 drbd_tcp_nodelay(msock);
909
910 tconn->data.socket = sock;
911 tconn->meta.socket = msock;
912 tconn->last_received = jiffies;
b411b363 913
907599e0 914 h = drbd_do_handshake(tconn);
915 if (h <= 0)
916 return h;
917
907599e0 918 if (tconn->cram_hmac_tfm) {
b411b363 919 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
907599e0 920 switch (drbd_do_auth(tconn)) {
b10d96cb 921 case -1:
907599e0 922 conn_err(tconn, "Authentication of peer failed\n");
b411b363 923 return -1;
b10d96cb 924 case 0:
907599e0 925 conn_err(tconn, "Authentication of peer failed, trying again.\n");
b10d96cb 926 return 0;
927 }
928 }
929
bbeb641c 930 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
931 return 0;
932
907599e0 933 sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
934 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
935
907599e0 936 drbd_thread_start(&tconn->asender);
b411b363 937
387eb308 938 if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
7e2455c1 939 return -1;
b411b363 940
907599e0 941 return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
942
943out_release_sockets:
944 if (sock)
945 sock_release(sock);
946 if (msock)
947 sock_release(msock);
948 return -1;
949}
950
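/* Decode an on-the-wire header into struct packet_info. The old fixed-size
 * header (h80) carries a 16-bit length, the "big" header (h95) a 24-bit
 * length; the two are distinguished by their magic. Returns 0 on success,
 * -EINVAL on an unknown magic. */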
8172f3e9 951static int decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
b411b363 952{
fd340c12 953 if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
954 pi->cmd = be16_to_cpu(h->h80.command);
955 pi->size = be16_to_cpu(h->h80.length);
eefc2f7d 956 pi->vnr = 0;
ca9bc12b 957 } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
958 pi->cmd = be16_to_cpu(h->h95.command);
959 pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
960 pi->vnr = 0;
02918be2 961 } else {
ce243853 962 conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
963 be32_to_cpu(h->h80.magic),
964 be16_to_cpu(h->h80.command),
965 be16_to_cpu(h->h80.length));
8172f3e9 966 return -EINVAL;
b411b363 967 }
8172f3e9 968 return 0;
969}
970
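/* Read and decode one packet header from the data socket. As the commit
 * subject above states, this returns 0 upon success and an error code
 * (negative) otherwise. */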
9ba7aa00 971static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
257d0af6 972{
9ba7aa00 973 struct p_header *h = &tconn->data.rbuf.header;
69bc7bc3 974 int err;
257d0af6 975
976 err = drbd_recv(tconn, h, sizeof(*h));
977 if (unlikely(err != sizeof(*h))) {
257d0af6 978 if (!signal_pending(current))
979 conn_warn(tconn, "short read expecting header on sock: r=%d\n", err);
980 if (err >= 0)
981 err = -EIO;
982 return err;
983 }
984
69bc7bc3 985 err = decode_header(tconn, h, pi);
9ba7aa00 986 tconn->last_received = jiffies;
b411b363 987
69bc7bc3 988 return err;
989}
990
2451fc3b 991static void drbd_flush(struct drbd_conf *mdev)
992{
993 int rv;
994
995 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a 996 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
dd3932ed 997 NULL);
998 if (rv) {
999 dev_err(DEV, "local disk flush failed with status %d\n", rv);
1000 /* would rather check on EOPNOTSUPP, but that is not reliable.
1001 * don't try again for ANY return value != 0
1002 * if (rv == -EOPNOTSUPP) */
1003 drbd_bump_write_ordering(mdev, WO_drain_io);
1004 }
1005 put_ldev(mdev);
1006 }
1007}
1008
1009/**
1010 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1011 * @mdev: DRBD device.
1012 * @epoch: Epoch object.
1013 * @ev: Epoch event.
1014 */
1015static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1016 struct drbd_epoch *epoch,
1017 enum epoch_event ev)
1018{
2451fc3b 1019 int epoch_size;
b411b363 1020 struct drbd_epoch *next_epoch;
1021 enum finish_epoch rv = FE_STILL_LIVE;
1022
1023 spin_lock(&mdev->epoch_lock);
1024 do {
1025 next_epoch = NULL;
1026
1027 epoch_size = atomic_read(&epoch->epoch_size);
1028
1029 switch (ev & ~EV_CLEANUP) {
1030 case EV_PUT:
1031 atomic_dec(&epoch->active);
1032 break;
1033 case EV_GOT_BARRIER_NR:
1034 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1035 break;
1036 case EV_BECAME_LAST:
1037 /* nothing to do*/
1038 break;
1039 }
1040
1041 if (epoch_size != 0 &&
1042 atomic_read(&epoch->active) == 0 &&
2451fc3b 1043 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1044 if (!(ev & EV_CLEANUP)) {
1045 spin_unlock(&mdev->epoch_lock);
1046 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1047 spin_lock(&mdev->epoch_lock);
1048 }
1049 dec_unacked(mdev);
1050
1051 if (mdev->current_epoch != epoch) {
1052 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1053 list_del(&epoch->list);
1054 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1055 mdev->epochs--;
1056 kfree(epoch);
1057
1058 if (rv == FE_STILL_LIVE)
1059 rv = FE_DESTROYED;
1060 } else {
1061 epoch->flags = 0;
1062 atomic_set(&epoch->epoch_size, 0);
698f9315 1063 /* atomic_set(&epoch->active, 0); is already zero */
1064 if (rv == FE_STILL_LIVE)
1065 rv = FE_RECYCLED;
2451fc3b 1066 wake_up(&mdev->ee_wait);
1067 }
1068 }
1069
1070 if (!next_epoch)
1071 break;
1072
1073 epoch = next_epoch;
1074 } while (1);
1075
1076 spin_unlock(&mdev->epoch_lock);
1077
1078 return rv;
1079}
1080
1081/**
1082 * drbd_bump_write_ordering() - Fall back to another write ordering method
1083 * @mdev: DRBD device.
1084 * @wo: Write ordering method to try.
1085 */
1086void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1087{
1088 enum write_ordering_e pwo;
1089 static char *write_ordering_str[] = {
1090 [WO_none] = "none",
1091 [WO_drain_io] = "drain",
1092 [WO_bdev_flush] = "flush",
1093 };
1094
1095 pwo = mdev->write_ordering;
1096 wo = min(pwo, wo);
1097 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1098 wo = WO_drain_io;
1099 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1100 wo = WO_none;
1101 mdev->write_ordering = wo;
2451fc3b 1102 if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1103 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1104}
1105
45bb912b 1106/**
fbe29dec 1107 * drbd_submit_peer_request()
45bb912b 1108 * @mdev: DRBD device.
db830c46 1109 * @peer_req: peer request
45bb912b 1110 * @rw: flag field, see bio->bi_rw
1111 *
1112 * May spread the pages to multiple bios,
1113 * depending on bio_add_page restrictions.
1114 *
1115 * Returns 0 if all bios have been submitted,
1116 * -ENOMEM if we could not allocate enough bios,
1117 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1118 * single page to an empty bio (which should never happen and likely indicates
1119 * that the lower level IO stack is in some way broken). This has been observed
1120 * on certain Xen deployments.
1121 */
1122/* TODO allocate from our own bio_set. */
1123int drbd_submit_peer_request(struct drbd_conf *mdev,
1124 struct drbd_peer_request *peer_req,
1125 const unsigned rw, const int fault_type)
1126{
1127 struct bio *bios = NULL;
1128 struct bio *bio;
1129 struct page *page = peer_req->pages;
1130 sector_t sector = peer_req->i.sector;
1131 unsigned ds = peer_req->i.size;
1132 unsigned n_bios = 0;
1133 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
10f6d992 1134 int err = -ENOMEM;
1135
1136 /* In most cases, we will only need one bio. But in case the lower
1137 * level restrictions happen to be different at this offset on this
1138 * side than those of the sending peer, we may need to submit the
1139 * request in more than one bio.
1140 *
1141 * Plain bio_alloc is good enough here, this is no DRBD internally
1142 * generated bio, but a bio allocated on behalf of the peer.
1143 */
1144next_bio:
1145 bio = bio_alloc(GFP_NOIO, nr_pages);
1146 if (!bio) {
1147 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1148 goto fail;
1149 }
db830c46 1150 /* > peer_req->i.sector, unless this is the first bio */
1151 bio->bi_sector = sector;
1152 bio->bi_bdev = mdev->ldev->backing_bdev;
45bb912b 1153 bio->bi_rw = rw;
db830c46 1154 bio->bi_private = peer_req;
fcefa62e 1155 bio->bi_end_io = drbd_peer_request_endio;
1156
1157 bio->bi_next = bios;
1158 bios = bio;
1159 ++n_bios;
1160
1161 page_chain_for_each(page) {
1162 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1163 if (!bio_add_page(bio, page, len, 0)) {
1164 /* A single page must always be possible!
1165 * But in case it fails anyways,
1166 * we deal with it, and complain (below). */
1167 if (bio->bi_vcnt == 0) {
1168 dev_err(DEV,
1169 "bio_add_page failed for len=%u, "
1170 "bi_vcnt=0 (bi_sector=%llu)\n",
1171 len, (unsigned long long)bio->bi_sector);
1172 err = -ENOSPC;
1173 goto fail;
1174 }
1175 goto next_bio;
1176 }
1177 ds -= len;
1178 sector += len >> 9;
1179 --nr_pages;
1180 }
1181 D_ASSERT(page == NULL);
1182 D_ASSERT(ds == 0);
1183
db830c46 1184 atomic_set(&peer_req->pending_bios, n_bios);
1185 do {
1186 bio = bios;
1187 bios = bios->bi_next;
1188 bio->bi_next = NULL;
1189
45bb912b 1190 drbd_generic_make_request(mdev, fault_type, bio);
45bb912b 1191 } while (bios);
1192 return 0;
1193
1194fail:
1195 while (bios) {
1196 bio = bios;
1197 bios = bios->bi_next;
1198 bio_put(bio);
1199 }
10f6d992 1200 return err;
1201}
1202
53840641 1203static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
db830c46 1204 struct drbd_peer_request *peer_req)
53840641 1205{
db830c46 1206 struct drbd_interval *i = &peer_req->i;
1207
1208 drbd_remove_interval(&mdev->write_requests, i);
1209 drbd_clear_interval(i);
1210
6c852bec 1211 /* Wake up any processes waiting for this peer request to complete. */
1212 if (i->waiting)
1213 wake_up(&mdev->misc_wait);
1214}
1215
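/* Handle P_BARRIER: record the barrier number for the current write epoch
 * and, depending on the configured write ordering, drain or flush local I/O
 * before installing a fresh epoch. */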
1216static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
1217 unsigned int data_size)
b411b363 1218{
2451fc3b 1219 int rv;
e42325a5 1220 struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
1221 struct drbd_epoch *epoch;
1222
1223 inc_unacked(mdev);
1224
1225 mdev->current_epoch->barrier_nr = p->barrier;
1226 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1227
1228 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1229 * the activity log, which means it would not be resynced in case the
1230 * R_PRIMARY crashes now.
1231 * Therefore we must send the barrier_ack after the barrier request was
1232 * completed. */
1233 switch (mdev->write_ordering) {
1234 case WO_none:
1235 if (rv == FE_RECYCLED)
81e84650 1236 return true;
1237
1238 /* receiver context, in the writeout path of the other node.
1239 * avoid potential distributed deadlock */
1240 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1241 if (epoch)
1242 break;
1243 else
1244 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1245 /* Fall through */
1246
1247 case WO_bdev_flush:
1248 case WO_drain_io:
b411b363 1249 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1250 drbd_flush(mdev);
1251
1252 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1253 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1254 if (epoch)
1255 break;
1256 }
1257
1258 epoch = mdev->current_epoch;
1259 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1260
1261 D_ASSERT(atomic_read(&epoch->active) == 0);
1262 D_ASSERT(epoch->flags == 0);
b411b363 1263
81e84650 1264 return true;
1265 default:
1266 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
81e84650 1267 return false;
1268 }
1269
1270 epoch->flags = 0;
1271 atomic_set(&epoch->epoch_size, 0);
1272 atomic_set(&epoch->active, 0);
1273
1274 spin_lock(&mdev->epoch_lock);
1275 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1276 list_add(&epoch->list, &mdev->current_epoch->list);
1277 mdev->current_epoch = epoch;
1278 mdev->epochs++;
1279 } else {
1280 /* The current_epoch got recycled while we allocated this one... */
1281 kfree(epoch);
1282 }
1283 spin_unlock(&mdev->epoch_lock);
1284
81e84650 1285 return true;
1286}
1287
1288/* used from receive_RSDataReply (recv_resync_read)
1289 * and from receive_Data */
1290static struct drbd_peer_request *
1291read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1292 int data_size) __must_hold(local)
b411b363 1293{
6666032a 1294 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 1295 struct drbd_peer_request *peer_req;
b411b363 1296 struct page *page;
45bb912b 1297 int dgs, ds, rr;
1298 void *dig_in = mdev->tconn->int_dig_in;
1299 void *dig_vv = mdev->tconn->int_dig_vv;
6b4388ac 1300 unsigned long *data;
b411b363 1301
1302 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1303 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1304
1305 if (dgs) {
de0ff338 1306 rr = drbd_recv(mdev->tconn, dig_in, dgs);
b411b363 1307 if (rr != dgs) {
1308 if (!signal_pending(current))
1309 dev_warn(DEV,
1310 "short read receiving data digest: read %d expected %d\n",
1311 rr, dgs);
1312 return NULL;
1313 }
1314 }
1315
1316 data_size -= dgs;
1317
1318 if (!expect(data_size != 0))
1319 return NULL;
1320 if (!expect(IS_ALIGNED(data_size, 512)))
1321 return NULL;
1322 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1323 return NULL;
b411b363 1324
1325 /* even though we trust our peer,
1326 * we sometimes have to double check. */
1327 if (sector + (data_size>>9) > capacity) {
1328 dev_err(DEV, "request from peer beyond end of local disk: "
1329 "capacity: %llus < sector: %llus + size: %u\n",
1330 (unsigned long long)capacity,
1331 (unsigned long long)sector, data_size);
1332 return NULL;
1333 }
1334
1335 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1336 * "criss-cross" setup, that might cause write-out on some other DRBD,
1337 * which in turn might block on the other node at this very place. */
1338 peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1339 if (!peer_req)
b411b363 1340 return NULL;
45bb912b 1341
b411b363 1342 ds = data_size;
db830c46 1343 page = peer_req->pages;
1344 page_chain_for_each(page) {
1345 unsigned len = min_t(int, ds, PAGE_SIZE);
6b4388ac 1346 data = kmap(page);
de0ff338 1347 rr = drbd_recv(mdev->tconn, data, len);
0cf9d27e 1348 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1349 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1350 data[0] = data[0] ^ (unsigned long)-1;
1351 }
b411b363 1352 kunmap(page);
45bb912b 1353 if (rr != len) {
db830c46 1354 drbd_free_ee(mdev, peer_req);
1355 if (!signal_pending(current))
1356 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1357 rr, len);
1358 return NULL;
1359 }
1360 ds -= rr;
1361 }
1362
1363 if (dgs) {
db830c46 1364 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
b411b363 1365 if (memcmp(dig_in, dig_vv, dgs)) {
1366 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1367 (unsigned long long)sector, data_size);
db830c46 1368 drbd_free_ee(mdev, peer_req);
1369 return NULL;
1370 }
1371 }
1372 mdev->recv_cnt += data_size>>9;
db830c46 1373 return peer_req;
1374}
1375
1376/* drbd_drain_block() just takes a data block
1377 * out of the socket input buffer, and discards it.
1378 */
1379static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1380{
1381 struct page *page;
1382 int rr, rv = 1;
1383 void *data;
1384
c3470cde 1385 if (!data_size)
81e84650 1386 return true;
c3470cde 1387
45bb912b 1388 page = drbd_pp_alloc(mdev, 1, 1);
1389
1390 data = kmap(page);
1391 while (data_size) {
de0ff338 1392 rr = drbd_recv(mdev->tconn, data, min_t(int, data_size, PAGE_SIZE));
1393 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1394 rv = 0;
1395 if (!signal_pending(current))
1396 dev_warn(DEV,
1397 "short read receiving data: read %d expected %d\n",
1398 rr, min_t(int, data_size, PAGE_SIZE));
1399 break;
1400 }
1401 data_size -= rr;
1402 }
1403 kunmap(page);
435f0740 1404 drbd_pp_free(mdev, page, 0);
1405 return rv;
1406}
1407
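/* Receive a data reply for a disk-less read directly into the bios of the
 * original request, verifying the optional data digest on the way. */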
1408static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1409 sector_t sector, int data_size)
1410{
1411 struct bio_vec *bvec;
1412 struct bio *bio;
1413 int dgs, rr, i, expect;
1414 void *dig_in = mdev->tconn->int_dig_in;
1415 void *dig_vv = mdev->tconn->int_dig_vv;
b411b363 1416
1417 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1418 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1419
1420 if (dgs) {
de0ff338 1421 rr = drbd_recv(mdev->tconn, dig_in, dgs);
b411b363 1422 if (rr != dgs) {
1423 if (!signal_pending(current))
1424 dev_warn(DEV,
1425 "short read receiving data reply digest: read %d expected %d\n",
1426 rr, dgs);
1427 return 0;
1428 }
1429 }
1430
1431 data_size -= dgs;
1432
1433 /* optimistically update recv_cnt. if receiving fails below,
1434 * we disconnect anyways, and counters will be reset. */
1435 mdev->recv_cnt += data_size>>9;
1436
1437 bio = req->master_bio;
1438 D_ASSERT(sector == bio->bi_sector);
1439
1440 bio_for_each_segment(bvec, bio, i) {
1441 expect = min_t(int, data_size, bvec->bv_len);
de0ff338 1442 rr = drbd_recv(mdev->tconn,
1443 kmap(bvec->bv_page)+bvec->bv_offset,
1444 expect);
1445 kunmap(bvec->bv_page);
1446 if (rr != expect) {
1447 if (!signal_pending(current))
1448 dev_warn(DEV, "short read receiving data reply: "
1449 "read %d expected %d\n",
1450 rr, expect);
1451 return 0;
1452 }
1453 data_size -= rr;
1454 }
1455
1456 if (dgs) {
a0638456 1457 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1458 if (memcmp(dig_in, dig_vv, dgs)) {
1459 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1460 return 0;
1461 }
1462 }
1463
1464 D_ASSERT(data_size == 0);
1465 return 1;
1466}
1467
1468/* e_end_resync_block() is called via
1469 * drbd_process_done_ee() by asender only */
99920dc5 1470static int e_end_resync_block(struct drbd_work *w, int unused)
b411b363 1471{
1472 struct drbd_peer_request *peer_req =
1473 container_of(w, struct drbd_peer_request, w);
00d56944 1474 struct drbd_conf *mdev = w->mdev;
db830c46 1475 sector_t sector = peer_req->i.sector;
99920dc5 1476 int err;
b411b363 1477
db830c46 1478 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1479
1480 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1481 drbd_set_in_sync(mdev, sector, peer_req->i.size);
99920dc5 1482 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1483 } else {
1484 /* Record failure to sync */
db830c46 1485 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
b411b363 1486
99920dc5 1487 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1488 }
1489 dec_unacked(mdev);
1490
99920dc5 1491 return err;
1492}
1493
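/* Read one resync data block off the wire into a peer request and submit it
 * as a local write; the ACK is sent later from e_end_resync_block() above. */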
1494static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1495{
db830c46 1496 struct drbd_peer_request *peer_req;
b411b363 1497
1498 peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1499 if (!peer_req)
45bb912b 1500 goto fail;
1501
1502 dec_rs_pending(mdev);
1503
1504 inc_unacked(mdev);
1505 /* corresponding dec_unacked() in e_end_resync_block()
1506 * respective _drbd_clear_done_ee */
1507
db830c46 1508 peer_req->w.cb = e_end_resync_block;
45bb912b 1509
87eeee41 1510 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1511 list_add(&peer_req->w.list, &mdev->sync_ee);
87eeee41 1512 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1513
0f0601f4 1514 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
fbe29dec 1515 if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
81e84650 1516 return true;
b411b363 1517
1518 /* don't care for the reason here */
1519 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1520 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1521 list_del(&peer_req->w.list);
87eeee41 1522 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9 1523
db830c46 1524 drbd_free_ee(mdev, peer_req);
1525fail:
1526 put_ldev(mdev);
81e84650 1527 return false;
1528}
1529
668eebc6 1530static struct drbd_request *
1531find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1532 sector_t sector, bool missing_ok, const char *func)
51624585 1533{
1534 struct drbd_request *req;
1535
1536 /* Request object according to our peer */
1537 req = (struct drbd_request *)(unsigned long)id;
5e472264 1538 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
668eebc6 1539 return req;
1540 if (!missing_ok) {
1541 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1542 (unsigned long)id, (unsigned long long)sector);
1543 }
1544 return NULL;
1545}
1546
1547static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1548 unsigned int data_size)
1549{
1550 struct drbd_request *req;
1551 sector_t sector;
b411b363 1552 int ok;
e42325a5 1553 struct p_data *p = &mdev->tconn->data.rbuf.data;
1554
1555 sector = be64_to_cpu(p->sector);
1556
87eeee41 1557 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 1558 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
87eeee41 1559 spin_unlock_irq(&mdev->tconn->req_lock);
c3afd8f5 1560 if (unlikely(!req))
81e84650 1561 return false;
b411b363 1562
24c4830c 1563 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1564 * special casing it there for the various failure cases.
1565 * still no race with drbd_fail_pending_reads */
1566 ok = recv_dless_read(mdev, req, sector, data_size);
1567
1568 if (ok)
8554df1c 1569 req_mod(req, DATA_RECEIVED);
1570 /* else: nothing. handled from drbd_disconnect...
1571 * I don't think we may complete this just yet
1572 * in case we are "on-disconnect: freeze" */
1573
1574 return ok;
1575}
1576
1577static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1578 unsigned int data_size)
1579{
1580 sector_t sector;
b411b363 1581 int ok;
e42325a5 1582 struct p_data *p = &mdev->tconn->data.rbuf.data;
1583
1584 sector = be64_to_cpu(p->sector);
1585 D_ASSERT(p->block_id == ID_SYNCER);
1586
1587 if (get_ldev(mdev)) {
1588 /* data is submitted to disk within recv_resync_read.
1589 * corresponding put_ldev done below on error,
fcefa62e 1590 * or in drbd_peer_request_endio. */
1591 ok = recv_resync_read(mdev, sector, data_size);
1592 } else {
1593 if (__ratelimit(&drbd_ratelimit_state))
1594 dev_err(DEV, "Can not write resync data to local disk.\n");
1595
1596 ok = drbd_drain_block(mdev, data_size);
1597
2b2bf214 1598 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1599 }
1600
1601 atomic_add(data_size >> 9, &mdev->rs_sect_in);
1602
1603 return ok;
1604}
1605
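/* Worker callback: re-issue the master bio of a write request that was
 * postponed because of a conflict; queued by restart_conflicting_writes(). */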
99920dc5 1606static int w_restart_write(struct drbd_work *w, int cancel)
1607{
1608 struct drbd_request *req = container_of(w, struct drbd_request, w);
1609 struct drbd_conf *mdev = w->mdev;
1610 struct bio *bio;
1611 unsigned long start_time;
1612 unsigned long flags;
1613
1614 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1615 if (!expect(req->rq_state & RQ_POSTPONED)) {
1616 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
99920dc5 1617 return -EIO;
1618 }
1619 bio = req->master_bio;
1620 start_time = req->start_time;
1621 /* Postponed requests will not have their master_bio completed! */
1622 __req_mod(req, DISCARD_WRITE, NULL);
1623 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1624
1625 while (__drbd_make_request(mdev, bio, start_time))
1626 /* retry */ ;
99920dc5 1627 return 0;
1628}
1629
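/* Scan the write_requests interval tree for requests overlapping the given
 * range and queue w_restart_write for each postponed local request. */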
1630static void restart_conflicting_writes(struct drbd_conf *mdev,
1631 sector_t sector, int size)
1632{
1633 struct drbd_interval *i;
1634 struct drbd_request *req;
1635
1636 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1637 if (!i->local)
1638 continue;
1639 req = container_of(i, struct drbd_request, i);
1640 if (req->rq_state & RQ_LOCAL_PENDING ||
1641 !(req->rq_state & RQ_POSTPONED))
1642 continue;
1643 if (expect(list_empty(&req->w.list))) {
1644 req->w.mdev = mdev;
1645 req->w.cb = w_restart_write;
1646 drbd_queue_work(&mdev->tconn->data.work, &req->w);
1647 }
1648 }
1649}
1650
1651/* e_end_block() is called via drbd_process_done_ee().
1652 * this means this function only runs in the asender thread
1653 */
99920dc5 1654static int e_end_block(struct drbd_work *w, int cancel)
b411b363 1655{
1656 struct drbd_peer_request *peer_req =
1657 container_of(w, struct drbd_peer_request, w);
00d56944 1658 struct drbd_conf *mdev = w->mdev;
db830c46 1659 sector_t sector = peer_req->i.sector;
99920dc5 1660 int err = 0, pcmd;
b411b363 1661
89e58e75 1662 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
db830c46 1663 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1664 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1665 mdev->state.conn <= C_PAUSED_SYNC_T &&
db830c46 1666 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 1667 P_RS_WRITE_ACK : P_WRITE_ACK;
99920dc5 1668 err = drbd_send_ack(mdev, pcmd, peer_req);
b411b363 1669 if (pcmd == P_RS_WRITE_ACK)
db830c46 1670 drbd_set_in_sync(mdev, sector, peer_req->i.size);
b411b363 1671 } else {
99920dc5 1672 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1673 /* we expect it to be marked out of sync anyways...
1674 * maybe assert this? */
1675 }
1676 dec_unacked(mdev);
1677 }
1678 /* we delete from the conflict detection hash _after_ we sent out the
1679 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
89e58e75 1680 if (mdev->tconn->net_conf->two_primaries) {
87eeee41 1681 spin_lock_irq(&mdev->tconn->req_lock);
1682 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1683 drbd_remove_epoch_entry_interval(mdev, peer_req);
1684 if (peer_req->flags & EE_RESTART_REQUESTS)
1685 restart_conflicting_writes(mdev, sector, peer_req->i.size);
87eeee41 1686 spin_unlock_irq(&mdev->tconn->req_lock);
bb3bfe96 1687 } else
db830c46 1688 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1689
db830c46 1690 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
b411b363 1691
99920dc5 1692 return err;
1693}
1694
7be8da07 1695static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
b411b363 1696{
7be8da07 1697 struct drbd_conf *mdev = w->mdev;
1698 struct drbd_peer_request *peer_req =
1699 container_of(w, struct drbd_peer_request, w);
99920dc5 1700 int err;
b411b363 1701
99920dc5 1702 err = drbd_send_ack(mdev, ack, peer_req);
1703 dec_unacked(mdev);
1704
99920dc5 1705 return err;
1706}
1707
99920dc5 1708static int e_send_discard_write(struct drbd_work *w, int unused)
1709{
1710 return e_send_ack(w, P_DISCARD_WRITE);
1711}
1712
99920dc5 1713static int e_send_retry_write(struct drbd_work *w, int unused)
1714{
1715 struct drbd_tconn *tconn = w->mdev->tconn;
1716
1717 return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1718 P_RETRY_WRITE : P_DISCARD_WRITE);
1719}
1720
1721static bool seq_greater(u32 a, u32 b)
1722{
1723 /*
1724 * We assume 32-bit wrap-around here.
1725 * For 24-bit wrap-around, we would have to shift:
1726 * a <<= 8; b <<= 8;
1727 */
1728 return (s32)a - (s32)b > 0;
1729}
1730
1731static u32 seq_max(u32 a, u32 b)
1732{
1733 return seq_greater(a, b) ? a : b;
1734}
1735
1736static bool need_peer_seq(struct drbd_conf *mdev)
1737{
1738 struct drbd_tconn *tconn = mdev->tconn;
1739
1740 /*
1741 * We only need to keep track of the last packet_seq number of our peer
1742 * if we are in dual-primary mode and we have the discard flag set; see
1743 * handle_write_conflicts().
1744 */
1745 return tconn->net_conf->two_primaries &&
1746 test_bit(DISCARD_CONCURRENT, &tconn->flags);
1747}
1748
43ae077d 1749static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
3e394da1 1750{
3c13b680 1751 unsigned int newest_peer_seq;
3e394da1 1752
7be8da07
AG
1753 if (need_peer_seq(mdev)) {
1754 spin_lock(&mdev->peer_seq_lock);
3c13b680
LE
1755 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1756 mdev->peer_seq = newest_peer_seq;
7be8da07 1757 spin_unlock(&mdev->peer_seq_lock);
3c13b680
LE
1758 /* wake up only if we actually changed mdev->peer_seq */
1759 if (peer_seq == newest_peer_seq)
7be8da07
AG
1760 wake_up(&mdev->seq_wait);
1761 }
3e394da1
AG
1762}
1763
b411b363
PR
1764/* Called from receive_Data.
1765 * Synchronize packets on sock with packets on msock.
1766 *
1767 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1768 * packet traveling on msock, they are still processed in the order they have
1769 * been sent.
1770 *
1771 * Note: we don't care for Ack packets overtaking P_DATA packets.
1772 *
1773 * In case packet_seq is larger than mdev->peer_seq number, there are
1774 * outstanding packets on the msock. We wait for them to arrive.
1775 * In case we are the logically next packet, we update mdev->peer_seq
1776 * ourselves. Correctly handles 32bit wrap around.
1777 *
1778 * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1779 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1780 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1781 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1782 *
1783 * returns 0 if we may process the packet,
1784 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
7be8da07 1785static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
b411b363
PR
1786{
1787 DEFINE_WAIT(wait);
b411b363 1788 long timeout;
7be8da07
AG
1789 int ret;
1790
1791 if (!need_peer_seq(mdev))
1792 return 0;
1793
b411b363
PR
1794 spin_lock(&mdev->peer_seq_lock);
1795 for (;;) {
7be8da07
AG
1796 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1797 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1798 ret = 0;
b411b363 1799 break;
7be8da07 1800 }
b411b363
PR
1801 if (signal_pending(current)) {
1802 ret = -ERESTARTSYS;
1803 break;
1804 }
7be8da07 1805 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
b411b363 1806 spin_unlock(&mdev->peer_seq_lock);
71b1c1eb
AG
1807 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1808 timeout = schedule_timeout(timeout);
b411b363 1809 spin_lock(&mdev->peer_seq_lock);
7be8da07 1810 if (!timeout) {
b411b363 1811 ret = -ETIMEDOUT;
71b1c1eb 1812 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
b411b363
PR
1813 break;
1814 }
1815 }
b411b363 1816 spin_unlock(&mdev->peer_seq_lock);
7be8da07 1817 finish_wait(&mdev->seq_wait, &wait);
b411b363
PR
1818 return ret;
1819}
1820
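/*
 * Usage sketch (mirrors the call site in receive_Data() further down):
 * before applying a mirrored write in dual-primary mode, the receiver
 * blocks until all packets with smaller sequence numbers have been
 * processed, so writes are applied in the order they were sent.
 *
 *	err = wait_for_and_update_peer_seq(mdev, be32_to_cpu(p->seq_num));
 *	if (err)
 *		goto out_interrupted;
 */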
688593c5
LE
1821/* see also bio_flags_to_wire()
1822 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1823 * flags and back. We may replicate to other kernel versions. */
1824static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1825{
688593c5
LE
1826 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1827 (dpf & DP_FUA ? REQ_FUA : 0) |
1828 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1829 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1830}
1831
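/*
 * Example (illustration only): a P_DATA packet carrying
 * DP_RW_SYNC | DP_FUA in p->dp_flags maps to REQ_SYNC | REQ_FUA on the
 * local bio, i.e.
 *
 *	rw = WRITE | wire_flags_to_bio(mdev, DP_RW_SYNC | DP_FUA);
 *
 * so the peer's ordering/durability hints are replayed on this node.
 */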
7be8da07
AG
1832static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1833 unsigned int size)
1834{
1835 struct drbd_interval *i;
1836
1837 repeat:
1838 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1839 struct drbd_request *req;
1840 struct bio_and_error m;
1841
1842 if (!i->local)
1843 continue;
1844 req = container_of(i, struct drbd_request, i);
1845 if (!(req->rq_state & RQ_POSTPONED))
1846 continue;
1847 req->rq_state &= ~RQ_POSTPONED;
1848 __req_mod(req, NEG_ACKED, &m);
1849 spin_unlock_irq(&mdev->tconn->req_lock);
1850 if (m.bio)
1851 complete_master_bio(mdev, &m);
1852 spin_lock_irq(&mdev->tconn->req_lock);
1853 goto repeat;
1854 }
1855}
1856
1857static int handle_write_conflicts(struct drbd_conf *mdev,
1858 struct drbd_peer_request *peer_req)
1859{
1860 struct drbd_tconn *tconn = mdev->tconn;
1861 bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1862 sector_t sector = peer_req->i.sector;
1863 const unsigned int size = peer_req->i.size;
1864 struct drbd_interval *i;
1865 bool equal;
1866 int err;
1867
1868 /*
1869 * Inserting the peer request into the write_requests tree will prevent
1870 * new conflicting local requests from being added.
1871 */
1872 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1873
1874 repeat:
1875 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1876 if (i == &peer_req->i)
1877 continue;
1878
1879 if (!i->local) {
1880 /*
1881 * Our peer has sent a conflicting remote request; this
1882 * should not happen in a two-node setup. Wait for the
1883 * earlier peer request to complete.
1884 */
1885 err = drbd_wait_misc(mdev, i);
1886 if (err)
1887 goto out;
1888 goto repeat;
1889 }
1890
1891 equal = i->sector == sector && i->size == size;
1892 if (resolve_conflicts) {
1893 /*
1894 * If the peer request is fully contained within the
1895 * overlapping request, it can be discarded; otherwise,
1896 * it will be retried once all overlapping requests
1897 * have completed.
1898 */
1899 bool discard = i->sector <= sector && i->sector +
1900 (i->size >> 9) >= sector + (size >> 9);
1901
1902 if (!equal)
1903 dev_alert(DEV, "Concurrent writes detected: "
1904 "local=%llus +%u, remote=%llus +%u, "
1905 "assuming %s came first\n",
1906 (unsigned long long)i->sector, i->size,
1907 (unsigned long long)sector, size,
1908 discard ? "local" : "remote");
1909
1910 inc_unacked(mdev);
1911 peer_req->w.cb = discard ? e_send_discard_write :
1912 e_send_retry_write;
1913 list_add_tail(&peer_req->w.list, &mdev->done_ee);
1914 wake_asender(mdev->tconn);
1915
1916 err = -ENOENT;
1917 goto out;
1918 } else {
1919 struct drbd_request *req =
1920 container_of(i, struct drbd_request, i);
1921
1922 if (!equal)
1923 dev_alert(DEV, "Concurrent writes detected: "
1924 "local=%llus +%u, remote=%llus +%u\n",
1925 (unsigned long long)i->sector, i->size,
1926 (unsigned long long)sector, size);
1927
1928 if (req->rq_state & RQ_LOCAL_PENDING ||
1929 !(req->rq_state & RQ_POSTPONED)) {
1930 /*
1931 * Wait for the node with the discard flag to
1932 * decide if this request will be discarded or
1933 * retried. Requests that are discarded will
1934 * disappear from the write_requests tree.
1935 *
1936 * In addition, wait for the conflicting
1937 * request to finish locally before submitting
1938 * the conflicting peer request.
1939 */
1940 err = drbd_wait_misc(mdev, &req->i);
1941 if (err) {
1942 _conn_request_state(mdev->tconn,
1943 NS(conn, C_TIMEOUT),
1944 CS_HARD);
1945 fail_postponed_requests(mdev, sector, size);
1946 goto out;
1947 }
1948 goto repeat;
1949 }
1950 /*
1951 * Remember to restart the conflicting requests after
1952 * the new peer request has completed.
1953 */
1954 peer_req->flags |= EE_RESTART_REQUESTS;
1955 }
1956 }
1957 err = 0;
1958
1959 out:
1960 if (err)
1961 drbd_remove_epoch_entry_interval(mdev, peer_req);
1962 return err;
1963}
1964
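/*
 * Worked example for the containment test above (illustration only):
 * a local request covering sectors [8, 24) (i->sector = 8, i->size = 8192)
 * fully contains a conflicting peer request for sectors [12, 16)
 * (sector = 12, size = 2048), so with the discard flag set the peer
 * request is dropped via e_send_discard_write; a peer request for
 * [20, 28) would only overlap partially and is retried instead.
 */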
b411b363 1965/* mirrored write */
d8763023
AG
1966static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
1967 unsigned int data_size)
b411b363
PR
1968{
1969 sector_t sector;
db830c46 1970 struct drbd_peer_request *peer_req;
e42325a5 1971 struct p_data *p = &mdev->tconn->data.rbuf.data;
7be8da07 1972 u32 peer_seq = be32_to_cpu(p->seq_num);
b411b363
PR
1973 int rw = WRITE;
1974 u32 dp_flags;
7be8da07 1975 int err;
b411b363 1976
b411b363 1977
7be8da07
AG
1978 if (!get_ldev(mdev)) {
1979 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2b2bf214 1980 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363 1981 atomic_inc(&mdev->current_epoch->epoch_size);
7be8da07 1982 return drbd_drain_block(mdev, data_size) && err == 0;
b411b363
PR
1983 }
1984
fcefa62e
AG
1985 /*
1986 * Corresponding put_ldev done either below (on various errors), or in
1987 * drbd_peer_request_endio, if we successfully submit the data at the
1988 * end of this function.
1989 */
b411b363
PR
1990
1991 sector = be64_to_cpu(p->sector);
db830c46
AG
1992 peer_req = read_in_block(mdev, p->block_id, sector, data_size);
1993 if (!peer_req) {
b411b363 1994 put_ldev(mdev);
81e84650 1995 return false;
b411b363
PR
1996 }
1997
db830c46 1998 peer_req->w.cb = e_end_block;
b411b363 1999
688593c5
LE
2000 dp_flags = be32_to_cpu(p->dp_flags);
2001 rw |= wire_flags_to_bio(mdev, dp_flags);
2002
2003 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 2004 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 2005
b411b363 2006 spin_lock(&mdev->epoch_lock);
db830c46
AG
2007 peer_req->epoch = mdev->current_epoch;
2008 atomic_inc(&peer_req->epoch->epoch_size);
2009 atomic_inc(&peer_req->epoch->active);
b411b363
PR
2010 spin_unlock(&mdev->epoch_lock);
2011
7be8da07
AG
2012 if (mdev->tconn->net_conf->two_primaries) {
2013 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2014 if (err)
b411b363 2015 goto out_interrupted;
87eeee41 2016 spin_lock_irq(&mdev->tconn->req_lock);
7be8da07
AG
2017 err = handle_write_conflicts(mdev, peer_req);
2018 if (err) {
2019 spin_unlock_irq(&mdev->tconn->req_lock);
2020 if (err == -ENOENT) {
b411b363 2021 put_ldev(mdev);
81e84650 2022 return true;
b411b363 2023 }
7be8da07 2024 goto out_interrupted;
b411b363 2025 }
7be8da07
AG
2026 } else
2027 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2028 list_add(&peer_req->w.list, &mdev->active_ee);
87eeee41 2029 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2030
89e58e75 2031 switch (mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
2032 case DRBD_PROT_C:
2033 inc_unacked(mdev);
2034 /* corresponding dec_unacked() in e_end_block()
2035 * respective _drbd_clear_done_ee */
2036 break;
2037 case DRBD_PROT_B:
2038 /* I really don't like it that the receiver thread
2039 * sends on the msock, but anyways */
db830c46 2040 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
b411b363
PR
2041 break;
2042 case DRBD_PROT_A:
2043 /* nothing to do */
2044 break;
2045 }
2046
6719fb03 2047 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 2048 /* In case we have the only disk of the cluster, */
db830c46
AG
2049 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2050 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2051 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2052 drbd_al_begin_io(mdev, peer_req->i.sector);
b411b363
PR
2053 }
2054
fbe29dec 2055 if (drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR) == 0)
81e84650 2056 return true;
b411b363 2057
10f6d992
LE
2058 /* don't care for the reason here */
2059 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2060 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
2061 list_del(&peer_req->w.list);
2062 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 2063 spin_unlock_irq(&mdev->tconn->req_lock);
db830c46
AG
2064 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2065 drbd_al_complete_io(mdev, peer_req->i.sector);
22cc37a9 2066
b411b363 2067out_interrupted:
db830c46 2068 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
b411b363 2069 put_ldev(mdev);
db830c46 2070 drbd_free_ee(mdev, peer_req);
81e84650 2071 return false;
b411b363
PR
2072}
2073
0f0601f4
LE
2074/* We may throttle resync, if the lower device seems to be busy,
2075 * and current sync rate is above c_min_rate.
2076 *
2077 * To decide whether or not the lower device is busy, we use a scheme similar
2078 * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
2079 * amount (more than 64 sectors) of activity we cannot account for with our own resync
2080 * activity, it obviously is "busy".
2081 *
2082 * The current sync rate used here uses only the most recent two step marks,
2083 * to have a short time average so we can react faster.
2084 */
e3555d85 2085int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
2086{
2087 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2088 unsigned long db, dt, dbdt;
e3555d85 2089 struct lc_element *tmp;
0f0601f4
LE
2090 int curr_events;
2091 int throttle = 0;
2092
2093 /* feature disabled? */
f399002e 2094 if (mdev->ldev->dc.c_min_rate == 0)
0f0601f4
LE
2095 return 0;
2096
e3555d85
PR
2097 spin_lock_irq(&mdev->al_lock);
2098 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2099 if (tmp) {
2100 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2101 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2102 spin_unlock_irq(&mdev->al_lock);
2103 return 0;
2104 }
2105 /* Do not slow down if app IO is already waiting for this extent */
2106 }
2107 spin_unlock_irq(&mdev->al_lock);
2108
0f0601f4
LE
2109 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2110 (int)part_stat_read(&disk->part0, sectors[1]) -
2111 atomic_read(&mdev->rs_sect_ev);
e3555d85 2112
0f0601f4
LE
2113 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2114 unsigned long rs_left;
2115 int i;
2116
2117 mdev->rs_last_events = curr_events;
2118
2119 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2120 * approx. */
2649f080
LE
2121 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2122
2123 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2124 rs_left = mdev->ov_left;
2125 else
2126 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
2127
2128 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2129 if (!dt)
2130 dt++;
2131 db = mdev->rs_mark_left[i] - rs_left;
2132 dbdt = Bit2KB(db/dt);
2133
f399002e 2134 if (dbdt > mdev->ldev->dc.c_min_rate)
0f0601f4
LE
2135 throttle = 1;
2136 }
2137 return throttle;
2138}
2139
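/*
 * Worked example (illustration only, assuming the default 4 KiB bitmap
 * granularity where Bit2KB(x) == x << 2): if the previous sync mark is
 * dt = 4 seconds old and db = rs_mark_left[i] - rs_left = 25600 bits
 * were cleared since then, the short-term rate is
 * dbdt = Bit2KB(25600 / 4) = 25600 KiB/s.  With c_min_rate = 4000 KiB/s
 * that exceeds the threshold, so resync requests are throttled while the
 * backing device shows activity we cannot account for.
 */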
2140
d8763023
AG
2141static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
2142 unsigned int digest_size)
b411b363
PR
2143{
2144 sector_t sector;
2145 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 2146 struct drbd_peer_request *peer_req;
b411b363 2147 struct digest_info *di = NULL;
b18b37be 2148 int size, verb;
b411b363 2149 unsigned int fault_type;
e42325a5 2150 struct p_block_req *p = &mdev->tconn->data.rbuf.block_req;
b411b363
PR
2151
2152 sector = be64_to_cpu(p->sector);
2153 size = be32_to_cpu(p->blksize);
2154
c670a398 2155 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
2156 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2157 (unsigned long long)sector, size);
81e84650 2158 return false;
b411b363
PR
2159 }
2160 if (sector + (size>>9) > capacity) {
2161 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2162 (unsigned long long)sector, size);
81e84650 2163 return false;
b411b363
PR
2164 }
2165
2166 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be
PR
2167 verb = 1;
2168 switch (cmd) {
2169 case P_DATA_REQUEST:
2170 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2171 break;
2172 case P_RS_DATA_REQUEST:
2173 case P_CSUM_RS_REQUEST:
2174 case P_OV_REQUEST:
2175 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2176 break;
2177 case P_OV_REPLY:
2178 verb = 0;
2179 dec_rs_pending(mdev);
2180 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2181 break;
2182 default:
2183 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2184 cmdname(cmd));
2185 }
2186 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2187 dev_err(DEV, "Can not satisfy peer's read request, "
2188 "no local data.\n");
b18b37be 2189
a821cc4a
LE
2190 /* drain possible payload */
2191 return drbd_drain_block(mdev, digest_size);
b411b363
PR
2192 }
2193
2194 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2195 * "criss-cross" setup, that might cause write-out on some other DRBD,
2196 * which in turn might block on the other node at this very place. */
db830c46
AG
2197 peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2198 if (!peer_req) {
b411b363 2199 put_ldev(mdev);
81e84650 2200 return false;
b411b363
PR
2201 }
2202
02918be2 2203 switch (cmd) {
b411b363 2204 case P_DATA_REQUEST:
db830c46 2205 peer_req->w.cb = w_e_end_data_req;
b411b363 2206 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2207 /* application IO, don't drbd_rs_begin_io */
2208 goto submit;
2209
b411b363 2210 case P_RS_DATA_REQUEST:
db830c46 2211 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2212 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2213 /* used in the sector offset progress display */
2214 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2215 break;
2216
2217 case P_OV_REPLY:
2218 case P_CSUM_RS_REQUEST:
2219 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2220 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2221 if (!di)
2222 goto out_free_e;
2223
2224 di->digest_size = digest_size;
2225 di->digest = (((char *)di)+sizeof(struct digest_info));
2226
db830c46
AG
2227 peer_req->digest = di;
2228 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2229
de0ff338 2230 if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)
b411b363
PR
2231 goto out_free_e;
2232
02918be2 2233 if (cmd == P_CSUM_RS_REQUEST) {
31890f4a 2234 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
db830c46 2235 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2236 /* used in the sector offset progress display */
2237 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
02918be2 2238 } else if (cmd == P_OV_REPLY) {
2649f080
LE
2239 /* track progress, we may need to throttle */
2240 atomic_add(size >> 9, &mdev->rs_sect_in);
db830c46 2241 peer_req->w.cb = w_e_end_ov_reply;
b411b363 2242 dec_rs_pending(mdev);
0f0601f4
LE
2243 /* drbd_rs_begin_io done when we sent this request,
2244 * but accounting still needs to be done. */
2245 goto submit_for_resync;
b411b363
PR
2246 }
2247 break;
2248
2249 case P_OV_REQUEST:
b411b363 2250 if (mdev->ov_start_sector == ~(sector_t)0 &&
31890f4a 2251 mdev->tconn->agreed_pro_version >= 90) {
de228bba
LE
2252 unsigned long now = jiffies;
2253 int i;
b411b363
PR
2254 mdev->ov_start_sector = sector;
2255 mdev->ov_position = sector;
30b743a2
LE
2256 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2257 mdev->rs_total = mdev->ov_left;
de228bba
LE
2258 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2259 mdev->rs_mark_left[i] = mdev->ov_left;
2260 mdev->rs_mark_time[i] = now;
2261 }
b411b363
PR
2262 dev_info(DEV, "Online Verify start sector: %llu\n",
2263 (unsigned long long)sector);
2264 }
db830c46 2265 peer_req->w.cb = w_e_end_ov_req;
b411b363 2266 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2267 break;
2268
b411b363
PR
2269 default:
2270 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
02918be2 2271 cmdname(cmd));
b411b363 2272 fault_type = DRBD_FAULT_MAX;
80a40e43 2273 goto out_free_e;
b411b363
PR
2274 }
2275
0f0601f4
LE
2276 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2277 * wrt the receiver, but it is not as straightforward as it may seem.
2278 * Various places in the resync start and stop logic assume resync
2279 * requests are processed in order, requeuing this on the worker thread
2280 * introduces a bunch of new code for synchronization between threads.
2281 *
2282 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2283 * "forever", throttling after drbd_rs_begin_io will lock that extent
2284 * for application writes for the same time. For now, just throttle
2285 * here, where the rest of the code expects the receiver to sleep for
2286 * a while, anyways.
2287 */
2288
2289 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2290 * this defers syncer requests for some time, before letting at least
2291 * one request through. The resync controller on the receiving side
2292 * will adapt to the incoming rate accordingly.
2293 *
2294 * We cannot throttle here if remote is Primary/SyncTarget:
2295 * we would also throttle its application reads.
2296 * In that case, throttling is done on the SyncTarget only.
2297 */
e3555d85
PR
2298 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2299 schedule_timeout_uninterruptible(HZ/10);
2300 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2301 goto out_free_e;
b411b363 2302
0f0601f4
LE
2303submit_for_resync:
2304 atomic_add(size >> 9, &mdev->rs_sect_ev);
2305
80a40e43 2306submit:
b411b363 2307 inc_unacked(mdev);
87eeee41 2308 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2309 list_add_tail(&peer_req->w.list, &mdev->read_ee);
87eeee41 2310 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2311
fbe29dec 2312 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
81e84650 2313 return true;
b411b363 2314
10f6d992
LE
2315 /* don't care for the reason here */
2316 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2317 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2318 list_del(&peer_req->w.list);
87eeee41 2319 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9
LE
2320 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2321
b411b363 2322out_free_e:
b411b363 2323 put_ldev(mdev);
db830c46 2324 drbd_free_ee(mdev, peer_req);
81e84650 2325 return false;
b411b363
PR
2326}
2327
2328static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2329{
2330 int self, peer, rv = -100;
2331 unsigned long ch_self, ch_peer;
2332
2333 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2334 peer = mdev->p_uuid[UI_BITMAP] & 1;
2335
2336 ch_peer = mdev->p_uuid[UI_SIZE];
2337 ch_self = mdev->comm_bm_set;
2338
89e58e75 2339 switch (mdev->tconn->net_conf->after_sb_0p) {
b411b363
PR
2340 case ASB_CONSENSUS:
2341 case ASB_DISCARD_SECONDARY:
2342 case ASB_CALL_HELPER:
2343 dev_err(DEV, "Configuration error.\n");
2344 break;
2345 case ASB_DISCONNECT:
2346 break;
2347 case ASB_DISCARD_YOUNGER_PRI:
2348 if (self == 0 && peer == 1) {
2349 rv = -1;
2350 break;
2351 }
2352 if (self == 1 && peer == 0) {
2353 rv = 1;
2354 break;
2355 }
2356 /* Else fall through to one of the other strategies... */
2357 case ASB_DISCARD_OLDER_PRI:
2358 if (self == 0 && peer == 1) {
2359 rv = 1;
2360 break;
2361 }
2362 if (self == 1 && peer == 0) {
2363 rv = -1;
2364 break;
2365 }
2366 /* Else fall through to one of the other strategies... */
ad19bf6e 2367 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2368 "Using discard-least-changes instead\n");
2369 case ASB_DISCARD_ZERO_CHG:
2370 if (ch_peer == 0 && ch_self == 0) {
25703f83 2371 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2372 ? -1 : 1;
2373 break;
2374 } else {
2375 if (ch_peer == 0) { rv = 1; break; }
2376 if (ch_self == 0) { rv = -1; break; }
2377 }
89e58e75 2378 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2379 break;
2380 case ASB_DISCARD_LEAST_CHG:
2381 if (ch_self < ch_peer)
2382 rv = -1;
2383 else if (ch_self > ch_peer)
2384 rv = 1;
2385 else /* ( ch_self == ch_peer ) */
2386 /* Well, then use something else. */
25703f83 2387 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2388 ? -1 : 1;
2389 break;
2390 case ASB_DISCARD_LOCAL:
2391 rv = -1;
2392 break;
2393 case ASB_DISCARD_REMOTE:
2394 rv = 1;
2395 }
2396
2397 return rv;
2398}
2399
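/*
 * Example (illustration only) for the discard-least-changes policy: if
 * this node has ch_self = 100 out-of-sync blocks since the split brain
 * and the peer has ch_peer = 2000, drbd_asb_recover_0p() returns -1,
 * i.e. the local changes are discarded and this node becomes sync
 * target; with equal counts the DISCARD_CONCURRENT bit breaks the tie.
 */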
2400static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2401{
6184ea21 2402 int hg, rv = -100;
b411b363 2403
89e58e75 2404 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2405 case ASB_DISCARD_YOUNGER_PRI:
2406 case ASB_DISCARD_OLDER_PRI:
2407 case ASB_DISCARD_LEAST_CHG:
2408 case ASB_DISCARD_LOCAL:
2409 case ASB_DISCARD_REMOTE:
2410 dev_err(DEV, "Configuration error.\n");
2411 break;
2412 case ASB_DISCONNECT:
2413 break;
2414 case ASB_CONSENSUS:
2415 hg = drbd_asb_recover_0p(mdev);
2416 if (hg == -1 && mdev->state.role == R_SECONDARY)
2417 rv = hg;
2418 if (hg == 1 && mdev->state.role == R_PRIMARY)
2419 rv = hg;
2420 break;
2421 case ASB_VIOLENTLY:
2422 rv = drbd_asb_recover_0p(mdev);
2423 break;
2424 case ASB_DISCARD_SECONDARY:
2425 return mdev->state.role == R_PRIMARY ? 1 : -1;
2426 case ASB_CALL_HELPER:
2427 hg = drbd_asb_recover_0p(mdev);
2428 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2429 enum drbd_state_rv rv2;
2430
2431 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2432 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2433 * we might be here in C_WF_REPORT_PARAMS which is transient.
2434 * we do not need to wait for the after state change work either. */
bb437946
AG
2435 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2436 if (rv2 != SS_SUCCESS) {
b411b363
PR
2437 drbd_khelper(mdev, "pri-lost-after-sb");
2438 } else {
2439 dev_warn(DEV, "Successfully gave up primary role.\n");
2440 rv = hg;
2441 }
2442 } else
2443 rv = hg;
2444 }
2445
2446 return rv;
2447}
2448
2449static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2450{
6184ea21 2451 int hg, rv = -100;
b411b363 2452
89e58e75 2453 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2454 case ASB_DISCARD_YOUNGER_PRI:
2455 case ASB_DISCARD_OLDER_PRI:
2456 case ASB_DISCARD_LEAST_CHG:
2457 case ASB_DISCARD_LOCAL:
2458 case ASB_DISCARD_REMOTE:
2459 case ASB_CONSENSUS:
2460 case ASB_DISCARD_SECONDARY:
2461 dev_err(DEV, "Configuration error.\n");
2462 break;
2463 case ASB_VIOLENTLY:
2464 rv = drbd_asb_recover_0p(mdev);
2465 break;
2466 case ASB_DISCONNECT:
2467 break;
2468 case ASB_CALL_HELPER:
2469 hg = drbd_asb_recover_0p(mdev);
2470 if (hg == -1) {
bb437946
AG
2471 enum drbd_state_rv rv2;
2472
b411b363
PR
2473 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2474 * we might be here in C_WF_REPORT_PARAMS which is transient.
2475 * we do not need to wait for the after state change work either. */
bb437946
AG
2476 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2477 if (rv2 != SS_SUCCESS) {
b411b363
PR
2478 drbd_khelper(mdev, "pri-lost-after-sb");
2479 } else {
2480 dev_warn(DEV, "Successfully gave up primary role.\n");
2481 rv = hg;
2482 }
2483 } else
2484 rv = hg;
2485 }
2486
2487 return rv;
2488}
2489
2490static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2491 u64 bits, u64 flags)
2492{
2493 if (!uuid) {
2494 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2495 return;
2496 }
2497 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2498 text,
2499 (unsigned long long)uuid[UI_CURRENT],
2500 (unsigned long long)uuid[UI_BITMAP],
2501 (unsigned long long)uuid[UI_HISTORY_START],
2502 (unsigned long long)uuid[UI_HISTORY_END],
2503 (unsigned long long)bits,
2504 (unsigned long long)flags);
2505}
2506
2507/*
2508 100 after split brain try auto recover
2509 2 C_SYNC_SOURCE set BitMap
2510 1 C_SYNC_SOURCE use BitMap
2511 0 no Sync
2512 -1 C_SYNC_TARGET use BitMap
2513 -2 C_SYNC_TARGET set BitMap
2514 -100 after split brain, disconnect
2515-1000 unrelated data
4a23f264
PR
2516-1091 requires proto 91
2517-1096 requires proto 96
b411b363
PR
2518 */
2519static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2520{
2521 u64 self, peer;
2522 int i, j;
2523
2524 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2525 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2526
2527 *rule_nr = 10;
2528 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2529 return 0;
2530
2531 *rule_nr = 20;
2532 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2533 peer != UUID_JUST_CREATED)
2534 return -2;
2535
2536 *rule_nr = 30;
2537 if (self != UUID_JUST_CREATED &&
2538 (peer == UUID_JUST_CREATED || peer == (u64)0))
2539 return 2;
2540
2541 if (self == peer) {
2542 int rct, dc; /* roles at crash time */
2543
2544 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2545
31890f4a 2546 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2547 return -1091;
b411b363
PR
2548
2549 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2550 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2551 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2552 drbd_uuid_set_bm(mdev, 0UL);
2553
2554 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2555 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2556 *rule_nr = 34;
2557 } else {
2558 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2559 *rule_nr = 36;
2560 }
2561
2562 return 1;
2563 }
2564
2565 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2566
31890f4a 2567 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2568 return -1091;
b411b363
PR
2569
2570 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2571 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2572 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2573
2574 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2575 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2576 mdev->p_uuid[UI_BITMAP] = 0UL;
2577
2578 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2579 *rule_nr = 35;
2580 } else {
2581 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2582 *rule_nr = 37;
2583 }
2584
2585 return -1;
2586 }
2587
2588 /* Common power [off|failure] */
2589 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2590 (mdev->p_uuid[UI_FLAGS] & 2);
2591 /* lowest bit is set when we were primary,
2592 * next bit (weight 2) is set when peer was primary */
2593 *rule_nr = 40;
2594
2595 switch (rct) {
2596 case 0: /* !self_pri && !peer_pri */ return 0;
2597 case 1: /* self_pri && !peer_pri */ return 1;
2598 case 2: /* !self_pri && peer_pri */ return -1;
2599 case 3: /* self_pri && peer_pri */
25703f83 2600 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363
PR
2601 return dc ? -1 : 1;
2602 }
2603 }
2604
2605 *rule_nr = 50;
2606 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2607 if (self == peer)
2608 return -1;
2609
2610 *rule_nr = 51;
2611 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2612 if (self == peer) {
31890f4a 2613 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2614 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2615 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2616 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2617 /* The last P_SYNC_UUID did not get through. Undo the last start of
2618 resync as sync source modifications of the peer's UUIDs. */
2619
31890f4a 2620 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2621 return -1091;
b411b363
PR
2622
2623 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2624 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264
PR
2625
2626 dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2627 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2628
b411b363
PR
2629 return -1;
2630 }
2631 }
2632
2633 *rule_nr = 60;
2634 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2635 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2636 peer = mdev->p_uuid[i] & ~((u64)1);
2637 if (self == peer)
2638 return -2;
2639 }
2640
2641 *rule_nr = 70;
2642 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2643 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2644 if (self == peer)
2645 return 1;
2646
2647 *rule_nr = 71;
2648 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2649 if (self == peer) {
31890f4a 2650 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2651 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2652 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2653 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2654 /* The last P_SYNC_UUID did not get through. Undo the last start of
2655 resync as sync source modifications of our UUIDs. */
2656
31890f4a 2657 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2658 return -1091;
b411b363
PR
2659
2660 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2661 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2662
4a23f264 2663 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2664 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2665 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2666
2667 return 1;
2668 }
2669 }
2670
2671
2672 *rule_nr = 80;
d8c2a36b 2673 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2674 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2675 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2676 if (self == peer)
2677 return 2;
2678 }
2679
2680 *rule_nr = 90;
2681 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2682 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2683 if (self == peer && self != ((u64)0))
2684 return 100;
2685
2686 *rule_nr = 100;
2687 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2688 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2689 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2690 peer = mdev->p_uuid[j] & ~((u64)1);
2691 if (self == peer)
2692 return -100;
2693 }
2694 }
2695
2696 return -1000;
2697}
2698
2699/* drbd_sync_handshake() returns the new conn state on success, or
2700 C_MASK (-1) on failure.
2701 */
2702static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2703 enum drbd_disk_state peer_disk) __must_hold(local)
2704{
2705 int hg, rule_nr;
2706 enum drbd_conns rv = C_MASK;
2707 enum drbd_disk_state mydisk;
2708
2709 mydisk = mdev->state.disk;
2710 if (mydisk == D_NEGOTIATING)
2711 mydisk = mdev->new_state_tmp.disk;
2712
2713 dev_info(DEV, "drbd_sync_handshake:\n");
2714 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2715 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2716 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2717
2718 hg = drbd_uuid_compare(mdev, &rule_nr);
2719
2720 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2721
2722 if (hg == -1000) {
2723 dev_alert(DEV, "Unrelated data, aborting!\n");
2724 return C_MASK;
2725 }
4a23f264
PR
2726 if (hg < -1000) {
2727 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2728 return C_MASK;
2729 }
2730
2731 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2732 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2733 int f = (hg == -100) || abs(hg) == 2;
2734 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2735 if (f)
2736 hg = hg*2;
2737 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2738 hg > 0 ? "source" : "target");
2739 }
2740
3a11a487
AG
2741 if (abs(hg) == 100)
2742 drbd_khelper(mdev, "initial-split-brain");
2743
89e58e75 2744 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
b411b363
PR
2745 int pcount = (mdev->state.role == R_PRIMARY)
2746 + (peer_role == R_PRIMARY);
2747 int forced = (hg == -100);
2748
2749 switch (pcount) {
2750 case 0:
2751 hg = drbd_asb_recover_0p(mdev);
2752 break;
2753 case 1:
2754 hg = drbd_asb_recover_1p(mdev);
2755 break;
2756 case 2:
2757 hg = drbd_asb_recover_2p(mdev);
2758 break;
2759 }
2760 if (abs(hg) < 100) {
2761 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2762 "automatically solved. Sync from %s node\n",
2763 pcount, (hg < 0) ? "peer" : "this");
2764 if (forced) {
2765 dev_warn(DEV, "Doing a full sync, since"
2766 " UUIDs where ambiguous.\n");
2767 hg = hg*2;
2768 }
2769 }
2770 }
2771
2772 if (hg == -100) {
89e58e75 2773 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
b411b363 2774 hg = -1;
89e58e75 2775 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
b411b363
PR
2776 hg = 1;
2777
2778 if (abs(hg) < 100)
2779 dev_warn(DEV, "Split-Brain detected, manually solved. "
2780 "Sync from %s node\n",
2781 (hg < 0) ? "peer" : "this");
2782 }
2783
2784 if (hg == -100) {
580b9767
LE
2785 /* FIXME this log message is not correct if we end up here
2786 * after an attempted attach on a diskless node.
2787 * We just refuse to attach -- well, we drop the "connection"
2788 * to that disk, in a way... */
3a11a487 2789 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2790 drbd_khelper(mdev, "split-brain");
2791 return C_MASK;
2792 }
2793
2794 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2795 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2796 return C_MASK;
2797 }
2798
2799 if (hg < 0 && /* by intention we do not use mydisk here. */
2800 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
89e58e75 2801 switch (mdev->tconn->net_conf->rr_conflict) {
b411b363
PR
2802 case ASB_CALL_HELPER:
2803 drbd_khelper(mdev, "pri-lost");
2804 /* fall through */
2805 case ASB_DISCONNECT:
2806 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2807 return C_MASK;
2808 case ASB_VIOLENTLY:
2809 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2810 "assumption\n");
2811 }
2812 }
2813
8169e41b 2814 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
cf14c2e9
PR
2815 if (hg == 0)
2816 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2817 else
2818 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2819 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2820 abs(hg) >= 2 ? "full" : "bit-map based");
2821 return C_MASK;
2822 }
2823
b411b363
PR
2824 if (abs(hg) >= 2) {
2825 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2826 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2827 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2828 return C_MASK;
2829 }
2830
2831 if (hg > 0) { /* become sync source. */
2832 rv = C_WF_BITMAP_S;
2833 } else if (hg < 0) { /* become sync target */
2834 rv = C_WF_BITMAP_T;
2835 } else {
2836 rv = C_CONNECTED;
2837 if (drbd_bm_total_weight(mdev)) {
2838 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2839 drbd_bm_total_weight(mdev));
2840 }
2841 }
2842
2843 return rv;
2844}
2845
2846/* returns 1 if invalid */
2847static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2848{
2849 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2850 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2851 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2852 return 0;
2853
2854 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2855 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2856 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2857 return 1;
2858
2859 /* everything else is valid if they are equal on both sides. */
2860 if (peer == self)
2861 return 0;
2862
2863 /* everything else is invalid. */
2864 return 1;
2865}
2866
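/*
 * Examples (illustration only): peer = ASB_DISCARD_REMOTE with
 * self = ASB_DISCARD_LOCAL is the one asymmetric pair that is allowed,
 * since both sides agree on whose data to throw away.  Both sides
 * configured with ASB_DISCARD_REMOTE would each discard the other's
 * data, so cmp_after_sb() flags that as invalid (returns 1).
 */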
7204624c 2867static int receive_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd,
d8763023 2868 unsigned int data_size)
b411b363 2869{
7204624c 2870 struct p_protocol *p = &tconn->data.rbuf.protocol;
b411b363 2871 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2872 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2873 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2874
b411b363
PR
2875 p_proto = be32_to_cpu(p->protocol);
2876 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2877 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2878 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2879 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2880 cf = be32_to_cpu(p->conn_flags);
2881 p_want_lose = cf & CF_WANT_LOSE;
2882
7204624c 2883 clear_bit(CONN_DRY_RUN, &tconn->flags);
cf14c2e9
PR
2884
2885 if (cf & CF_DRY_RUN)
7204624c 2886 set_bit(CONN_DRY_RUN, &tconn->flags);
b411b363 2887
7204624c
PR
2888 if (p_proto != tconn->net_conf->wire_protocol) {
2889 conn_err(tconn, "incompatible communication protocols\n");
b411b363
PR
2890 goto disconnect;
2891 }
2892
7204624c
PR
2893 if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2894 conn_err(tconn, "incompatible after-sb-0pri settings\n");
b411b363
PR
2895 goto disconnect;
2896 }
2897
7204624c
PR
2898 if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2899 conn_err(tconn, "incompatible after-sb-1pri settings\n");
b411b363
PR
2900 goto disconnect;
2901 }
2902
7204624c
PR
2903 if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2904 conn_err(tconn, "incompatible after-sb-2pri settings\n");
b411b363
PR
2905 goto disconnect;
2906 }
2907
7204624c
PR
2908 if (p_want_lose && tconn->net_conf->want_lose) {
2909 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
b411b363
PR
2910 goto disconnect;
2911 }
2912
7204624c
PR
2913 if (p_two_primaries != tconn->net_conf->two_primaries) {
2914 conn_err(tconn, "incompatible setting of the two-primaries options\n");
b411b363
PR
2915 goto disconnect;
2916 }
2917
7204624c
PR
2918 if (tconn->agreed_pro_version >= 87) {
2919 unsigned char *my_alg = tconn->net_conf->integrity_alg;
b411b363 2920
7204624c 2921 if (drbd_recv(tconn, p_integrity_alg, data_size) != data_size)
81e84650 2922 return false;
b411b363
PR
2923
2924 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2925 if (strcmp(p_integrity_alg, my_alg)) {
7204624c 2926 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
b411b363
PR
2927 goto disconnect;
2928 }
7204624c 2929 conn_info(tconn, "data-integrity-alg: %s\n",
b411b363
PR
2930 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2931 }
2932
81e84650 2933 return true;
b411b363
PR
2934
2935disconnect:
7204624c 2936 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 2937 return false;
b411b363
PR
2938}
2939
2940/* helper function
2941 * input: alg name, feature name
2942 * return: NULL (alg name was "")
2943 * ERR_PTR(error) if something goes wrong
2944 * or the crypto hash ptr, if it worked out ok. */
2945struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2946 const char *alg, const char *name)
2947{
2948 struct crypto_hash *tfm;
2949
2950 if (!alg[0])
2951 return NULL;
2952
2953 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2954 if (IS_ERR(tfm)) {
2955 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2956 alg, name, PTR_ERR(tfm));
2957 return tfm;
2958 }
2959 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2960 crypto_free_hash(tfm);
2961 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2962 return ERR_PTR(-EINVAL);
2963 }
2964 return tfm;
2965}
2966
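/*
 * Usage sketch (mirrors receive_SyncParam() below): the three-way
 * return contract keeps the caller simple --
 *
 *	verify_tfm = drbd_crypto_alloc_digest_safe(mdev, p->verify_alg,
 *						   "verify-alg");
 *	if (IS_ERR(verify_tfm)) {
 *		verify_tfm = NULL;
 *		goto disconnect;
 *	}
 *
 * (a NULL return means the peer sent an empty algorithm name, so there
 * is nothing to set up.)
 */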
d8763023
AG
2967static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
2968 unsigned int packet_size)
b411b363 2969{
81e84650 2970 int ok = true;
e42325a5 2971 struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
b411b363
PR
2972 unsigned int header_size, data_size, exp_max_sz;
2973 struct crypto_hash *verify_tfm = NULL;
2974 struct crypto_hash *csums_tfm = NULL;
31890f4a 2975 const int apv = mdev->tconn->agreed_pro_version;
778f271d
PR
2976 int *rs_plan_s = NULL;
2977 int fifo_size = 0;
b411b363
PR
2978
2979 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2980 : apv == 88 ? sizeof(struct p_rs_param)
2981 + SHARED_SECRET_MAX
8e26f9cc
PR
2982 : apv <= 94 ? sizeof(struct p_rs_param_89)
2983 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 2984
02918be2 2985 if (packet_size > exp_max_sz) {
b411b363 2986 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
02918be2 2987 packet_size, exp_max_sz);
81e84650 2988 return false;
b411b363
PR
2989 }
2990
2991 if (apv <= 88) {
257d0af6 2992 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
02918be2 2993 data_size = packet_size - header_size;
8e26f9cc 2994 } else if (apv <= 94) {
257d0af6 2995 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
02918be2 2996 data_size = packet_size - header_size;
b411b363 2997 D_ASSERT(data_size == 0);
8e26f9cc 2998 } else {
257d0af6 2999 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
02918be2 3000 data_size = packet_size - header_size;
b411b363
PR
3001 D_ASSERT(data_size == 0);
3002 }
3003
3004 /* initialize verify_alg and csums_alg */
3005 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3006
de0ff338 3007 if (drbd_recv(mdev->tconn, &p->head.payload, header_size) != header_size)
81e84650 3008 return false;
b411b363 3009
f399002e
LE
3010 if (get_ldev(mdev)) {
3011 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3012 put_ldev(mdev);
3013 }
b411b363
PR
3014
3015 if (apv >= 88) {
3016 if (apv == 88) {
3017 if (data_size > SHARED_SECRET_MAX) {
3018 dev_err(DEV, "verify-alg too long, "
3019 "peer wants %u, accepting only %u byte\n",
3020 data_size, SHARED_SECRET_MAX);
81e84650 3021 return false;
b411b363
PR
3022 }
3023
de0ff338 3024 if (drbd_recv(mdev->tconn, p->verify_alg, data_size) != data_size)
81e84650 3025 return false;
b411b363
PR
3026
3027 /* we expect NUL terminated string */
3028 /* but just in case someone tries to be evil */
3029 D_ASSERT(p->verify_alg[data_size-1] == 0);
3030 p->verify_alg[data_size-1] = 0;
3031
3032 } else /* apv >= 89 */ {
3033 /* we still expect NUL terminated strings */
3034 /* but just in case someone tries to be evil */
3035 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3036 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3037 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3038 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3039 }
3040
f399002e 3041 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
b411b363
PR
3042 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3043 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
f399002e 3044 mdev->tconn->net_conf->verify_alg, p->verify_alg);
b411b363
PR
3045 goto disconnect;
3046 }
3047 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3048 p->verify_alg, "verify-alg");
3049 if (IS_ERR(verify_tfm)) {
3050 verify_tfm = NULL;
3051 goto disconnect;
3052 }
3053 }
3054
f399002e 3055 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
b411b363
PR
3056 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3057 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
f399002e 3058 mdev->tconn->net_conf->csums_alg, p->csums_alg);
b411b363
PR
3059 goto disconnect;
3060 }
3061 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3062 p->csums_alg, "csums-alg");
3063 if (IS_ERR(csums_tfm)) {
3064 csums_tfm = NULL;
3065 goto disconnect;
3066 }
3067 }
3068
f399002e
LE
3069 if (apv > 94 && get_ldev(mdev)) {
3070 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3071 mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3072 mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3073 mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3074 mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d 3075
f399002e 3076 fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
778f271d
PR
3077 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3078 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3079 if (!rs_plan_s) {
3080 dev_err(DEV, "kzalloc of fifo_buffer failed\n");
f399002e 3081 put_ldev(mdev);
778f271d
PR
3082 goto disconnect;
3083 }
3084 }
f399002e 3085 put_ldev(mdev);
8e26f9cc 3086 }
b411b363
PR
3087
3088 spin_lock(&mdev->peer_seq_lock);
3089 /* lock against drbd_nl_syncer_conf() */
3090 if (verify_tfm) {
f399002e
LE
3091 strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3092 mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3093 crypto_free_hash(mdev->tconn->verify_tfm);
3094 mdev->tconn->verify_tfm = verify_tfm;
b411b363
PR
3095 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3096 }
3097 if (csums_tfm) {
f399002e
LE
3098 strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3099 mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3100 crypto_free_hash(mdev->tconn->csums_tfm);
3101 mdev->tconn->csums_tfm = csums_tfm;
b411b363
PR
3102 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3103 }
778f271d
PR
3104 if (fifo_size != mdev->rs_plan_s.size) {
3105 kfree(mdev->rs_plan_s.values);
3106 mdev->rs_plan_s.values = rs_plan_s;
3107 mdev->rs_plan_s.size = fifo_size;
3108 mdev->rs_planed = 0;
3109 }
b411b363
PR
3110 spin_unlock(&mdev->peer_seq_lock);
3111 }
3112
3113 return ok;
3114disconnect:
3115 /* just for completeness: actually not needed,
3116 * as this is not reached if csums_tfm was ok. */
3117 crypto_free_hash(csums_tfm);
3118 /* but free the verify_tfm again, if csums_tfm did not work out */
3119 crypto_free_hash(verify_tfm);
38fa9988 3120 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3121 return false;
b411b363
PR
3122}
3123
b411b363
PR
3124/* warn if the arguments differ by more than 12.5% */
3125static void warn_if_differ_considerably(struct drbd_conf *mdev,
3126 const char *s, sector_t a, sector_t b)
3127{
3128 sector_t d;
3129 if (a == 0 || b == 0)
3130 return;
3131 d = (a > b) ? (a - b) : (b - a);
3132 if (d > (a>>3) || d > (b>>3))
3133 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3134 (unsigned long long)a, (unsigned long long)b);
3135}
3136
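/*
 * Example (illustration only): a = 1000 and b = 800 sectors differ by
 * d = 200, which is more than 12.5% of either side (a>>3 == 125), so
 * the warning fires; a = 1000 vs. b = 900 (d = 100) stays quiet.
 */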
d8763023
AG
3137static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
3138 unsigned int data_size)
b411b363 3139{
e42325a5 3140 struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
b411b363 3141 enum determine_dev_size dd = unchanged;
b411b363
PR
3142 sector_t p_size, p_usize, my_usize;
3143 int ldsc = 0; /* local disk size changed */
e89b591c 3144 enum dds_flags ddsf;
b411b363 3145
b411b363
PR
3146 p_size = be64_to_cpu(p->d_size);
3147 p_usize = be64_to_cpu(p->u_size);
3148
b411b363
PR
3149 /* just store the peer's disk size for now.
3150 * we still need to figure out whether we accept that. */
3151 mdev->p_size = p_size;
3152
b411b363
PR
3153 if (get_ldev(mdev)) {
3154 warn_if_differ_considerably(mdev, "lower level device sizes",
3155 p_size, drbd_get_max_capacity(mdev->ldev));
3156 warn_if_differ_considerably(mdev, "user requested size",
3157 p_usize, mdev->ldev->dc.disk_size);
3158
3159 /* if this is the first connect, or an otherwise expected
3160 * param exchange, choose the minimum */
3161 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3162 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3163 p_usize);
3164
3165 my_usize = mdev->ldev->dc.disk_size;
3166
3167 if (mdev->ldev->dc.disk_size != p_usize) {
3168 mdev->ldev->dc.disk_size = p_usize;
3169 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3170 (unsigned long)mdev->ldev->dc.disk_size);
3171 }
3172
3173 /* Never shrink a device with usable data during connect.
3174 But allow online shrinking if we are connected. */
a393db6f 3175 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
3176 drbd_get_capacity(mdev->this_bdev) &&
3177 mdev->state.disk >= D_OUTDATED &&
3178 mdev->state.conn < C_CONNECTED) {
3179 dev_err(DEV, "The peer's disk size is too small!\n");
38fa9988 3180 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
3181 mdev->ldev->dc.disk_size = my_usize;
3182 put_ldev(mdev);
81e84650 3183 return false;
b411b363
PR
3184 }
3185 put_ldev(mdev);
3186 }
b411b363 3187
e89b591c 3188 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3189 if (get_ldev(mdev)) {
24c4830c 3190 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3191 put_ldev(mdev);
3192 if (dd == dev_size_error)
81e84650 3193 return false;
b411b363
PR
3194 drbd_md_sync(mdev);
3195 } else {
3196 /* I am diskless, need to accept the peer's size. */
3197 drbd_set_my_capacity(mdev, p_size);
3198 }
3199
99432fcc
PR
3200 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3201 drbd_reconsider_max_bio_size(mdev);
3202
b411b363
PR
3203 if (get_ldev(mdev)) {
3204 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3205 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3206 ldsc = 1;
3207 }
3208
b411b363
PR
3209 put_ldev(mdev);
3210 }
3211
3212 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3213 if (be64_to_cpu(p->c_size) !=
3214 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3215 /* we have different sizes, probably peer
3216 * needs to know my new size... */
e89b591c 3217 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3218 }
3219 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3220 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3221 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3222 mdev->state.disk >= D_INCONSISTENT) {
3223 if (ddsf & DDSF_NO_RESYNC)
3224 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3225 else
3226 resync_after_online_grow(mdev);
3227 } else
b411b363
PR
3228 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3229 }
3230 }
3231
81e84650 3232 return true;
b411b363
PR
3233}
3234
d8763023
AG
3235static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3236 unsigned int data_size)
b411b363 3237{
e42325a5 3238 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
b411b363 3239 u64 *p_uuid;
62b0da3a 3240 int i, updated_uuids = 0;
b411b363 3241
b411b363
PR
3242 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3243
3244 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3245 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3246
3247 kfree(mdev->p_uuid);
3248 mdev->p_uuid = p_uuid;
3249
3250 if (mdev->state.conn < C_CONNECTED &&
3251 mdev->state.disk < D_INCONSISTENT &&
3252 mdev->state.role == R_PRIMARY &&
3253 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3254 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3255 (unsigned long long)mdev->ed_uuid);
38fa9988 3256 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3257 return false;
b411b363
PR
3258 }
3259
3260 if (get_ldev(mdev)) {
3261 int skip_initial_sync =
3262 mdev->state.conn == C_CONNECTED &&
31890f4a 3263 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3264 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3265 (p_uuid[UI_FLAGS] & 8);
3266 if (skip_initial_sync) {
3267 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3268 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3269 "clear_n_write from receive_uuids",
3270 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3271 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3272 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3273 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3274 CS_VERBOSE, NULL);
3275 drbd_md_sync(mdev);
62b0da3a 3276 updated_uuids = 1;
b411b363
PR
3277 }
3278 put_ldev(mdev);
18a50fa2
PR
3279 } else if (mdev->state.disk < D_INCONSISTENT &&
3280 mdev->state.role == R_PRIMARY) {
3281 /* I am a diskless primary, the peer just created a new current UUID
3282 for me. */
62b0da3a 3283 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3284 }
3285
3286 /* Before we test for the disk state, we should wait until any possibly
3287 ongoing cluster-wide state change is finished. That is important if
3288 we are primary and are detaching from our disk. We need to see the
3289 new disk state... */
8410da8f
PR
3290 mutex_lock(mdev->state_mutex);
3291 mutex_unlock(mdev->state_mutex);
b411b363 3292 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3293 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3294
3295 if (updated_uuids)
3296 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3297
81e84650 3298 return true;
b411b363
PR
3299}
3300
3301/**
3302 * convert_state() - Converts the peer's view of the cluster state to our point of view
3303 * @ps: The state as seen by the peer.
3304 */
3305static union drbd_state convert_state(union drbd_state ps)
3306{
3307 union drbd_state ms;
3308
3309 static enum drbd_conns c_tab[] = {
3310 [C_CONNECTED] = C_CONNECTED,
3311
3312 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3313 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3314 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3315 [C_VERIFY_S] = C_VERIFY_T,
3316 [C_MASK] = C_MASK,
3317 };
3318
3319 ms.i = ps.i;
3320
3321 ms.conn = c_tab[ps.conn];
3322 ms.peer = ps.role;
3323 ms.role = ps.peer;
3324 ms.pdsk = ps.disk;
3325 ms.disk = ps.pdsk;
3326 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3327
3328 return ms;
3329}
3330
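/*
 * Example (illustration only): if the peer reports
 * Primary/Secondary, UpToDate/Inconsistent (its role/peer and
 * disk/pdsk), convert_state() swaps the pairs so that from our point of
 * view it reads Secondary/Primary, Inconsistent/UpToDate, and a
 * C_STARTING_SYNC_S on the peer becomes C_STARTING_SYNC_T here.
 */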
d8763023
AG
3331static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3332 unsigned int data_size)
b411b363 3333{
e42325a5 3334 struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
b411b363 3335 union drbd_state mask, val;
bf885f8a 3336 enum drbd_state_rv rv;
b411b363 3337
b411b363
PR
3338 mask.i = be32_to_cpu(p->mask);
3339 val.i = be32_to_cpu(p->val);
3340
25703f83 3341 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
8410da8f 3342 mutex_is_locked(mdev->state_mutex)) {
b411b363 3343 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
81e84650 3344 return true;
b411b363
PR
3345 }
3346
3347 mask = convert_state(mask);
3348 val = convert_state(val);
3349
dfafcc8a
PR
3350 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3351 drbd_send_sr_reply(mdev, rv);
b411b363 3352
b411b363
PR
3353 drbd_md_sync(mdev);
3354
81e84650 3355 return true;
b411b363
PR
3356}
3357
dfafcc8a
PR
3358static int receive_req_conn_state(struct drbd_tconn *tconn, enum drbd_packet cmd,
3359 unsigned int data_size)
3360{
3361 struct p_req_state *p = &tconn->data.rbuf.req_state;
3362 union drbd_state mask, val;
3363 enum drbd_state_rv rv;
3364
3365 mask.i = be32_to_cpu(p->mask);
3366 val.i = be32_to_cpu(p->val);
3367
3368 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3369 mutex_is_locked(&tconn->cstate_mutex)) {
3370 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3371 return true;
3372 }
3373
3374 mask = convert_state(mask);
3375 val = convert_state(val);
3376
3377 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY);
3378 conn_send_sr_reply(tconn, rv);
3379
3380 return true;
3381}
3382
d8763023
AG
3383static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3384 unsigned int data_size)
b411b363 3385{
e42325a5 3386 struct p_state *p = &mdev->tconn->data.rbuf.state;
4ac4aada 3387 union drbd_state os, ns, peer_state;
b411b363 3388 enum drbd_disk_state real_peer_disk;
65d922c3 3389 enum chg_state_flags cs_flags;
b411b363
PR
3390 int rv;
3391
b411b363
PR
3392 peer_state.i = be32_to_cpu(p->state);
3393
3394 real_peer_disk = peer_state.disk;
3395 if (peer_state.disk == D_NEGOTIATING) {
3396 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3397 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3398 }
3399
87eeee41 3400 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 3401 retry:
4ac4aada 3402 os = ns = mdev->state;
87eeee41 3403 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 3404
e9ef7bb6
LE
3405 /* peer says his disk is uptodate, while we think it is inconsistent,
3406 * and this happens while we think we have a sync going on. */
3407 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3408 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3409 /* If we are (becoming) SyncSource, but peer is still in sync
3410 * preparation, ignore its uptodate-ness to avoid flapping, it
3411 * will change to inconsistent once the peer reaches active
3412 * syncing states.
3413 * It may have changed syncer-paused flags, however, so we
3414 * cannot ignore this completely. */
3415 if (peer_state.conn > C_CONNECTED &&
3416 peer_state.conn < C_SYNC_SOURCE)
3417 real_peer_disk = D_INCONSISTENT;
3418
3419 /* if peer_state changes to connected at the same time,
3420 * it explicitly notifies us that it finished resync.
3421 * Maybe we should finish it up, too? */
3422 else if (os.conn >= C_SYNC_SOURCE &&
3423 peer_state.conn == C_CONNECTED) {
3424 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3425 drbd_resync_finished(mdev);
81e84650 3426 return true;
e9ef7bb6
LE
3427 }
3428 }
3429
3430 /* peer says his disk is inconsistent, while we think it is uptodate,
3431 * and this happens while the peer still thinks we have a sync going on,
3432 * but we think we are already done with the sync.
3433 * We ignore this to avoid flapping pdsk.
3434 * This should not happen, if the peer is a recent version of drbd. */
3435 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3436 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3437 real_peer_disk = D_UP_TO_DATE;
3438
4ac4aada
LE
3439 if (ns.conn == C_WF_REPORT_PARAMS)
3440 ns.conn = C_CONNECTED;
b411b363 3441
67531718
PR
3442 if (peer_state.conn == C_AHEAD)
3443 ns.conn = C_BEHIND;
3444
b411b363
PR
3445 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3446 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3447 int cr; /* consider resync */
3448
3449 /* if we established a new connection */
4ac4aada 3450 cr = (os.conn < C_CONNECTED);
b411b363
PR
3451 /* if we had an established connection
3452 * and one of the nodes newly attaches a disk */
4ac4aada 3453 cr |= (os.conn == C_CONNECTED &&
b411b363 3454 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3455 os.disk == D_NEGOTIATING));
b411b363
PR
3456 /* if we have both been inconsistent, and the peer has been
3457 * forced to be UpToDate with --overwrite-data */
3458 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3459 /* if we had been plain connected, and the admin requested to
3460 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3461 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3462 (peer_state.conn >= C_STARTING_SYNC_S &&
3463 peer_state.conn <= C_WF_BITMAP_T));
3464
3465 if (cr)
4ac4aada 3466 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3467
3468 put_ldev(mdev);
4ac4aada
LE
3469 if (ns.conn == C_MASK) {
3470 ns.conn = C_CONNECTED;
b411b363 3471 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3472 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3473 } else if (peer_state.disk == D_NEGOTIATING) {
3474 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3475 peer_state.disk = D_DISKLESS;
580b9767 3476 real_peer_disk = D_DISKLESS;
b411b363 3477 } else {
8169e41b 3478 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
81e84650 3479 return false;
4ac4aada 3480 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
38fa9988 3481 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3482 return false;
b411b363
PR
3483 }
3484 }
3485 }
3486
87eeee41 3487 spin_lock_irq(&mdev->tconn->req_lock);
4ac4aada 3488 if (mdev->state.i != os.i)
b411b363
PR
3489 goto retry;
3490 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3491 ns.peer = peer_state.role;
3492 ns.pdsk = real_peer_disk;
3493 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3494 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3495 ns.disk = mdev->new_state_tmp.disk;
4ac4aada
LE
3496 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3497 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50 3498 test_bit(NEW_CUR_UUID, &mdev->flags)) {
8554df1c 3499		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3500		   for temporary network outages! */
87eeee41 3501 spin_unlock_irq(&mdev->tconn->req_lock);
481c6f50 3502 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
2f5cdd0b 3503 tl_clear(mdev->tconn);
481c6f50
PR
3504 drbd_uuid_new_current(mdev);
3505 clear_bit(NEW_CUR_UUID, &mdev->flags);
38fa9988 3506 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
81e84650 3507 return false;
481c6f50 3508 }
65d922c3 3509 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
b411b363 3510 ns = mdev->state;
87eeee41 3511 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3512
3513 if (rv < SS_SUCCESS) {
38fa9988 3514 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3515 return false;
b411b363
PR
3516 }
3517
4ac4aada
LE
3518 if (os.conn > C_WF_REPORT_PARAMS) {
3519 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3520 peer_state.disk != D_NEGOTIATING ) {
3521 /* we want resync, peer has not yet decided to sync... */
3522 /* Nowadays only used when forcing a node into primary role and
3523 setting its disk to UpToDate with that */
3524 drbd_send_uuids(mdev);
3525 drbd_send_state(mdev);
3526 }
3527 }
3528
89e58e75 3529 mdev->tconn->net_conf->want_lose = 0;
b411b363
PR
3530
3531 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3532
81e84650 3533 return true;
b411b363
PR
3534}
3535
d8763023
AG
3536static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
3537 unsigned int data_size)
b411b363 3538{
e42325a5 3539 struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;
b411b363
PR
3540
3541 wait_event(mdev->misc_wait,
3542 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3543 mdev->state.conn == C_BEHIND ||
b411b363
PR
3544 mdev->state.conn < C_CONNECTED ||
3545 mdev->state.disk < D_NEGOTIATING);
3546
3547 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3548
b411b363
PR
3549 /* Here the _drbd_uuid_ functions are right, current should
3550 _not_ be rotated into the history */
3551 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3552 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3553 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3554
62b0da3a 3555 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3556 drbd_start_resync(mdev, C_SYNC_TARGET);
3557
3558 put_ldev(mdev);
3559 } else
3560 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3561
81e84650 3562 return true;
b411b363
PR
3563}
3564
2c46407d
AG
3565/**
3566 * receive_bitmap_plain
3567 *
3568 * Return 0 when done, 1 when another iteration is needed, and a negative error
3569 * code upon failure.
3570 */
3571static int
02918be2
PR
3572receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3573 unsigned long *buffer, struct bm_xfer_ctx *c)
b411b363
PR
3574{
3575 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3576 unsigned want = num_words * sizeof(long);
2c46407d 3577 int err;
b411b363 3578
02918be2
PR
3579 if (want != data_size) {
3580 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
2c46407d 3581 return -EIO;
b411b363
PR
3582 }
3583 if (want == 0)
2c46407d 3584 return 0;
de0ff338 3585 err = drbd_recv(mdev->tconn, buffer, want);
2c46407d
AG
3586 if (err != want) {
3587 if (err >= 0)
3588 err = -EIO;
3589 return err;
3590 }
b411b363
PR
3591
3592 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3593
3594 c->word_offset += num_words;
3595 c->bit_offset = c->word_offset * BITS_PER_LONG;
3596 if (c->bit_offset > c->bm_bits)
3597 c->bit_offset = c->bm_bits;
3598
2c46407d 3599 return 1;
b411b363
PR
3600}
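/*
 * Illustrative sketch, not part of the original file: how receive_bitmap_plain()
 * above chunks the on-disk bitmap across successive P_BITMAP packets.  Each
 * packet carries at most BM_PACKET_WORDS longs; the last one may be shorter.
 * The total bitmap size used here is invented for the example.
 */
static void __maybe_unused example_bitmap_plain_chunking(void)
{
	size_t bm_words = 10000;	/* total bitmap size in longs (example) */
	size_t word_offset = 0;		/* corresponds to c->word_offset above  */

	while (word_offset < bm_words) {
		unsigned num_words = min_t(size_t, BM_PACKET_WORDS,
					   bm_words - word_offset);
		unsigned want = num_words * sizeof(long);

		/* receive_bitmap_plain() would now drbd_recv() exactly 'want'
		 * bytes and merge them into the bitmap at 'word_offset'. */
		(void)want;
		word_offset += num_words;
	}
}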
3601
2c46407d
AG
3602/**
3603 * recv_bm_rle_bits
3604 *
3605 * Return 0 when done, 1 when another iteration is needed, and a negative error
3606 * code upon failure.
3607 */
3608static int
b411b363
PR
3609recv_bm_rle_bits(struct drbd_conf *mdev,
3610 struct p_compressed_bm *p,
c6d25cfe
PR
3611 struct bm_xfer_ctx *c,
3612 unsigned int len)
b411b363
PR
3613{
3614 struct bitstream bs;
3615 u64 look_ahead;
3616 u64 rl;
3617 u64 tmp;
3618 unsigned long s = c->bit_offset;
3619 unsigned long e;
b411b363
PR
3620 int toggle = DCBP_get_start(p);
3621 int have;
3622 int bits;
3623
3624 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3625
3626 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3627 if (bits < 0)
2c46407d 3628 return -EIO;
b411b363
PR
3629
3630 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3631 bits = vli_decode_bits(&rl, look_ahead);
3632 if (bits <= 0)
2c46407d 3633 return -EIO;
b411b363
PR
3634
3635 if (toggle) {
3636 e = s + rl -1;
3637 if (e >= c->bm_bits) {
3638 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3639 return -EIO;
b411b363
PR
3640 }
3641 _drbd_bm_set_bits(mdev, s, e);
3642 }
3643
3644 if (have < bits) {
3645 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3646 have, bits, look_ahead,
3647 (unsigned int)(bs.cur.b - p->code),
3648 (unsigned int)bs.buf_len);
2c46407d 3649 return -EIO;
b411b363
PR
3650 }
3651 look_ahead >>= bits;
3652 have -= bits;
3653
3654 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3655 if (bits < 0)
2c46407d 3656 return -EIO;
b411b363
PR
3657 look_ahead |= tmp << have;
3658 have += bits;
3659 }
3660
3661 c->bit_offset = s;
3662 bm_xfer_ctx_bit_to_word_offset(c);
3663
2c46407d 3664 return (s != c->bm_bits);
b411b363
PR
3665}
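/*
 * Illustrative sketch, not part of the original file: the core idea behind
 * recv_bm_rle_bits() above, with the VLI bitstream replaced by a plain array
 * of run lengths.  Runs alternate between "clear" and "set" bits; only the
 * set runs are applied, just as _drbd_bm_set_bits() is only called for
 * toggle == 1 above.  The run lengths are invented for the example.
 */
static unsigned long __maybe_unused example_rle_decode(void)
{
	static const unsigned long runs[] = { 100, 3, 57, 2 };	/* example input */
	unsigned long s = 0;	/* current bit offset, like c->bit_offset      */
	int toggle = 0;		/* 0: run of in-sync bits, 1: out-of-sync run  */
	size_t i;

	for (i = 0; i < ARRAY_SIZE(runs); i++, toggle = !toggle) {
		if (toggle) {
			unsigned long e = s + runs[i] - 1;
			/* the real code calls _drbd_bm_set_bits(mdev, s, e) here */
			(void)e;
		}
		s += runs[i];
	}
	return s;	/* 162 bits consumed in this example */
}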
3666
2c46407d
AG
3667/**
3668 * decode_bitmap_c
3669 *
3670 * Return 0 when done, 1 when another iteration is needed, and a negative error
3671 * code upon failure.
3672 */
3673static int
b411b363
PR
3674decode_bitmap_c(struct drbd_conf *mdev,
3675 struct p_compressed_bm *p,
c6d25cfe
PR
3676 struct bm_xfer_ctx *c,
3677 unsigned int len)
b411b363
PR
3678{
3679 if (DCBP_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3680 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3681
3682 /* other variants had been implemented for evaluation,
3683 * but have been dropped as this one turned out to be "best"
3684 * during all our tests. */
3685
3686 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
38fa9988 3687 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
2c46407d 3688 return -EIO;
b411b363
PR
3689}
3690
3691void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3692 const char *direction, struct bm_xfer_ctx *c)
3693{
3694 /* what would it take to transfer it "plaintext" */
c012949a 3695 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3696 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3697 + c->bm_words * sizeof(long);
3698 unsigned total = c->bytes[0] + c->bytes[1];
3699 unsigned r;
3700
 3702	/* total cannot be zero, but just in case: */
3702 if (total == 0)
3703 return;
3704
3705 /* don't report if not compressed */
3706 if (total >= plain)
3707 return;
3708
3709 /* total < plain. check for overflow, still */
3710 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3711 : (1000 * total / plain);
3712
3713 if (r > 1000)
3714 r = 1000;
3715
3716 r = 1000 - r;
3717 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3718 "total %u; compression: %u.%u%%\n",
3719 direction,
3720 c->bytes[1], c->packets[1],
3721 c->bytes[0], c->packets[0],
3722 total, r/10, r % 10);
3723}
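/*
 * Illustrative worked example, not part of the original file, for the
 * permille arithmetic in INFO_bm_xfer_stats() above.  With an invented
 * plaintext size of 131072 bytes and 4096 bytes actually transferred:
 *
 *	r = 1000 * total / plain = 1000 * 4096 / 131072 = 31
 *	r = 1000 - r             = 969
 *
 * which is printed as "compression: 96.9%".  The (total > UINT_MAX/1000)
 * branch above only avoids the multiplication overflowing for very large
 * transfers; it trades a little precision for safety.
 */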
3724
 3725/* Since we are processing the bitfield from lower addresses to higher,
 3726   it does not matter whether we process it in 32 bit chunks or 64 bit
 3727   chunks, as long as it is little endian. (Understand it as a byte stream,
 3728   beginning with the lowest byte...) If we used big endian
 3729   we would need to process it from the highest address to the lowest,
 3730   in order to be agnostic to the 32 vs 64 bit issue.
3731
3732 returns 0 on failure, 1 if we successfully received it. */
d8763023
AG
3733static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
3734 unsigned int data_size)
b411b363
PR
3735{
3736 struct bm_xfer_ctx c;
3737 void *buffer;
2c46407d 3738 int err;
81e84650 3739 int ok = false;
257d0af6 3740 struct p_header *h = &mdev->tconn->data.rbuf.header;
77351055 3741 struct packet_info pi;
b411b363 3742
20ceb2b2
LE
3743 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3744 /* you are supposed to send additional out-of-sync information
3745 * if you actually set bits during this phase */
b411b363
PR
3746
3747 /* maybe we should use some per thread scratch page,
3748 * and allocate that during initial device creation? */
3749 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3750 if (!buffer) {
3751 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3752 goto out;
3753 }
3754
3755 c = (struct bm_xfer_ctx) {
3756 .bm_bits = drbd_bm_bits(mdev),
3757 .bm_words = drbd_bm_words(mdev),
3758 };
3759
2c46407d 3760 for(;;) {
02918be2 3761 if (cmd == P_BITMAP) {
2c46407d 3762 err = receive_bitmap_plain(mdev, data_size, buffer, &c);
02918be2 3763 } else if (cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3764 /* MAYBE: sanity check that we speak proto >= 90,
3765 * and the feature is enabled! */
3766 struct p_compressed_bm *p;
3767
02918be2 3768 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
b411b363
PR
3769 dev_err(DEV, "ReportCBitmap packet too large\n");
3770 goto out;
3771 }
3772 /* use the page buff */
3773 p = buffer;
3774 memcpy(p, h, sizeof(*h));
de0ff338 3775 if (drbd_recv(mdev->tconn, p->head.payload, data_size) != data_size)
b411b363 3776 goto out;
004352fa
LE
3777 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3778 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
78fcbdae 3779 goto out;
b411b363 3780 }
c6d25cfe 3781 err = decode_bitmap_c(mdev, p, &c, data_size);
b411b363 3782 } else {
02918be2 3783 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
b411b363
PR
3784 goto out;
3785 }
3786
02918be2 3787 c.packets[cmd == P_BITMAP]++;
257d0af6 3788 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;
b411b363 3789
2c46407d
AG
3790 if (err <= 0) {
3791 if (err < 0)
3792 goto out;
b411b363 3793 break;
2c46407d 3794 }
69bc7bc3 3795 if (drbd_recv_header(mdev->tconn, &pi))
b411b363 3796 goto out;
77351055
PR
3797 cmd = pi.cmd;
3798 data_size = pi.size;
2c46407d 3799 }
b411b363
PR
3800
3801 INFO_bm_xfer_stats(mdev, "receive", &c);
3802
3803 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
3804 enum drbd_state_rv rv;
3805
b411b363
PR
3806 ok = !drbd_send_bitmap(mdev);
3807 if (!ok)
3808 goto out;
3809 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
3810 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3811 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
3812 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3813 /* admin may have requested C_DISCONNECTING,
3814 * other threads may have noticed network errors */
3815 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3816 drbd_conn_str(mdev->state.conn));
3817 }
3818
81e84650 3819 ok = true;
b411b363 3820 out:
20ceb2b2 3821 drbd_bm_unlock(mdev);
b411b363
PR
3822 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3823 drbd_start_resync(mdev, C_SYNC_SOURCE);
3824 free_page((unsigned long) buffer);
3825 return ok;
3826}
3827
2de876ef 3828static int _tconn_receive_skip(struct drbd_tconn *tconn, unsigned int data_size)
b411b363
PR
3829{
3830 /* TODO zero copy sink :) */
3831 static char sink[128];
3832 int size, want, r;
3833
02918be2 3834 size = data_size;
b411b363
PR
3835 while (size > 0) {
3836 want = min_t(int, size, sizeof(sink));
2de876ef
PR
3837 r = drbd_recv(tconn, sink, want);
3838 if (r <= 0)
841ce241 3839 break;
b411b363
PR
3840 size -= r;
3841 }
3842 return size == 0;
3843}
3844
2de876ef
PR
3845static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
3846 unsigned int data_size)
3847{
3848 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3849 cmd, data_size);
3850
3851 return _tconn_receive_skip(mdev->tconn, data_size);
3852}
3853
3854static int tconn_receive_skip(struct drbd_tconn *tconn, enum drbd_packet cmd, unsigned int data_size)
3855{
3856 conn_warn(tconn, "skipping packet for non existing volume type %d, l: %d!\n",
3857 cmd, data_size);
3858
3859 return _tconn_receive_skip(tconn, data_size);
3860}
3861
d8763023
AG
3862static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
3863 unsigned int data_size)
0ced55a3 3864{
e7f52dfb
LE
3865 /* Make sure we've acked all the TCP data associated
3866 * with the data requests being unplugged */
e42325a5 3867 drbd_tcp_quickack(mdev->tconn->data.socket);
0ced55a3 3868
81e84650 3869 return true;
0ced55a3
PR
3870}
3871
d8763023
AG
3872static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
3873 unsigned int data_size)
73a01a18 3874{
e42325a5 3875 struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;
73a01a18 3876
f735e363
LE
3877 switch (mdev->state.conn) {
3878 case C_WF_SYNC_UUID:
3879 case C_WF_BITMAP_T:
3880 case C_BEHIND:
3881 break;
3882 default:
3883 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3884 drbd_conn_str(mdev->state.conn));
3885 }
3886
73a01a18
PR
3887 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3888
81e84650 3889 return true;
73a01a18
PR
3890}
3891
02918be2
PR
3892struct data_cmd {
3893 int expect_payload;
3894 size_t pkt_size;
a4fbda8e 3895 enum mdev_or_conn fa_type; /* first argument's type */
d9ae84e7
PR
3896 union {
3897 int (*mdev_fn)(struct drbd_conf *, enum drbd_packet cmd,
3898 unsigned int to_receive);
3899 int (*conn_fn)(struct drbd_tconn *, enum drbd_packet cmd,
3900 unsigned int to_receive);
3901 };
02918be2
PR
3902};
3903
3904static struct data_cmd drbd_cmd_handler[] = {
d9ae84e7
PR
3905 [P_DATA] = { 1, sizeof(struct p_data), MDEV, { receive_Data } },
3906 [P_DATA_REPLY] = { 1, sizeof(struct p_data), MDEV, { receive_DataReply } },
3907 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), MDEV, { receive_RSDataReply } } ,
3908 [P_BARRIER] = { 0, sizeof(struct p_barrier), MDEV, { receive_Barrier } } ,
3909 [P_BITMAP] = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
3910 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
3911 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), MDEV, { receive_UnplugRemote } },
3912 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3913 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3914 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
3915 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
7204624c 3916 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), CONN, { .conn_fn = receive_protocol } },
d9ae84e7
PR
3917 [P_UUIDS] = { 0, sizeof(struct p_uuids), MDEV, { receive_uuids } },
3918 [P_SIZES] = { 0, sizeof(struct p_sizes), MDEV, { receive_sizes } },
3919 [P_STATE] = { 0, sizeof(struct p_state), MDEV, { receive_state } },
3920 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), MDEV, { receive_req_state } },
3921 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), MDEV, { receive_sync_uuid } },
3922 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3923 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3924 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3925 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), MDEV, { receive_skip } },
3926 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), MDEV, { receive_out_of_sync } },
dfafcc8a 3927 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), CONN, { .conn_fn = receive_req_conn_state } },
b411b363
PR
3928};
3929
02918be2 3930/* All handler functions that expect a sub-header get that sub-heder in
e42325a5 3931 mdev->tconn->data.rbuf.header.head.payload.
02918be2 3932
e42325a5 3933 Usually in mdev->tconn->data.rbuf.header.head the callback can find the usual
02918be2
PR
 3934   p_header, but it must not rely on that, since there is also p_header95!
3935 */
b411b363 3936
eefc2f7d 3937static void drbdd(struct drbd_tconn *tconn)
b411b363 3938{
eefc2f7d 3939 struct p_header *header = &tconn->data.rbuf.header;
77351055 3940 struct packet_info pi;
02918be2
PR
3941 size_t shs; /* sub header size */
3942 int rv;
b411b363 3943
eefc2f7d
PR
3944 while (get_t_state(&tconn->receiver) == RUNNING) {
3945 drbd_thread_current_set_cpu(&tconn->receiver);
69bc7bc3 3946 if (drbd_recv_header(tconn, &pi))
02918be2 3947 goto err_out;
b411b363 3948
6e849ce8 3949 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) ||
d9ae84e7 3950 !drbd_cmd_handler[pi.cmd].mdev_fn)) {
eefc2f7d 3951 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
02918be2 3952 goto err_out;
0b33a916 3953 }
b411b363 3954
77351055
PR
3955 shs = drbd_cmd_handler[pi.cmd].pkt_size - sizeof(struct p_header);
3956 if (pi.size - shs > 0 && !drbd_cmd_handler[pi.cmd].expect_payload) {
eefc2f7d 3957 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
02918be2 3958 goto err_out;
b411b363 3959 }
b411b363 3960
c13f7e1a 3961 if (shs) {
eefc2f7d 3962 rv = drbd_recv(tconn, &header->payload, shs);
c13f7e1a 3963 if (unlikely(rv != shs)) {
0ddc5549 3964 if (!signal_pending(current))
eefc2f7d 3965 conn_warn(tconn, "short read while reading sub header: rv=%d\n", rv);
c13f7e1a
LE
3966 goto err_out;
3967 }
3968 }
3969
a4fbda8e 3970 if (drbd_cmd_handler[pi.cmd].fa_type == CONN) {
d9ae84e7
PR
3971 rv = drbd_cmd_handler[pi.cmd].conn_fn(tconn, pi.cmd, pi.size - shs);
3972 } else {
3973 struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
3974 rv = mdev ?
3975 drbd_cmd_handler[pi.cmd].mdev_fn(mdev, pi.cmd, pi.size - shs) :
3976 tconn_receive_skip(tconn, pi.cmd, pi.size - shs);
3977 }
b411b363 3978
02918be2 3979 if (unlikely(!rv)) {
eefc2f7d 3980 conn_err(tconn, "error receiving %s, l: %d!\n",
77351055 3981 cmdname(pi.cmd), pi.size);
02918be2 3982 goto err_out;
b411b363
PR
3983 }
3984 }
b411b363 3985
02918be2
PR
3986 if (0) {
3987 err_out:
bbeb641c 3988 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
02918be2 3989 }
b411b363
PR
3990}
3991
0e29d163 3992void conn_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
3993{
3994 struct drbd_wq_barrier barr;
3995
3996 barr.w.cb = w_prev_work_done;
0e29d163 3997 barr.w.tconn = tconn;
b411b363 3998 init_completion(&barr.done);
0e29d163 3999 drbd_queue_work(&tconn->data.work, &barr.w);
b411b363
PR
4000 wait_for_completion(&barr.done);
4001}
4002
360cc740 4003static void drbd_disconnect(struct drbd_tconn *tconn)
b411b363 4004{
bbeb641c 4005 enum drbd_conns oc;
b411b363 4006 int rv = SS_UNKNOWN_ERROR;
b411b363 4007
bbeb641c 4008 if (tconn->cstate == C_STANDALONE)
b411b363 4009 return;
b411b363
PR
4010
4011 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
4012 drbd_thread_stop(&tconn->asender);
4013 drbd_free_sock(tconn);
4014
4015 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4016
4017 conn_info(tconn, "Connection closed\n");
4018
4019 spin_lock_irq(&tconn->req_lock);
bbeb641c
PR
4020 oc = tconn->cstate;
4021 if (oc >= C_UNCONNECTED)
4022 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4023
360cc740
PR
4024 spin_unlock_irq(&tconn->req_lock);
4025
bbeb641c 4026 if (oc == C_DISCONNECTING) {
360cc740
PR
4027 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4028
4029 crypto_free_hash(tconn->cram_hmac_tfm);
4030 tconn->cram_hmac_tfm = NULL;
4031
4032 kfree(tconn->net_conf);
4033 tconn->net_conf = NULL;
bbeb641c 4034 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
360cc740
PR
4035 }
4036}
4037
4038static int drbd_disconnected(int vnr, void *p, void *data)
4039{
4040 struct drbd_conf *mdev = (struct drbd_conf *)p;
4041 enum drbd_fencing_p fp;
4042 unsigned int i;
b411b363 4043
85719573 4044 /* wait for current activity to cease. */
87eeee41 4045 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
4046 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4047 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4048 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 4049 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4050
4051 /* We do not have data structures that would allow us to
4052 * get the rs_pending_cnt down to 0 again.
4053 * * On C_SYNC_TARGET we do not have any data structures describing
4054 * the pending RSDataRequest's we have sent.
4055 * * On C_SYNC_SOURCE there is no data structure that tracks
4056 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4057 * And no, it is not the sum of the reference counts in the
4058 * resync_LRU. The resync_LRU tracks the whole operation including
4059 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4060 * on the fly. */
4061 drbd_rs_cancel_all(mdev);
4062 mdev->rs_total = 0;
4063 mdev->rs_failed = 0;
4064 atomic_set(&mdev->rs_pending_cnt, 0);
4065 wake_up(&mdev->misc_wait);
4066
7fde2be9
PR
4067 del_timer(&mdev->request_timer);
4068
b411b363 4069 del_timer_sync(&mdev->resync_timer);
b411b363
PR
4070 resync_timer_fn((unsigned long)mdev);
4071
b411b363
PR
4072 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4073 * w_make_resync_request etc. which may still be on the worker queue
4074 * to be "canceled" */
a21e9298 4075 drbd_flush_workqueue(mdev);
b411b363
PR
4076
4077 /* This also does reclaim_net_ee(). If we do this too early, we might
4078 * miss some resync ee and pages.*/
4079 drbd_process_done_ee(mdev);
4080
4081 kfree(mdev->p_uuid);
4082 mdev->p_uuid = NULL;
4083
fb22c402 4084 if (!is_susp(mdev->state))
2f5cdd0b 4085 tl_clear(mdev->tconn);
b411b363 4086
b411b363
PR
4087 drbd_md_sync(mdev);
4088
4089 fp = FP_DONT_CARE;
4090 if (get_ldev(mdev)) {
4091 fp = mdev->ldev->dc.fencing;
4092 put_ldev(mdev);
4093 }
4094
87f7be4c
PR
4095 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
4096 drbd_try_outdate_peer_async(mdev);
b411b363 4097
20ceb2b2
LE
4098 /* serialize with bitmap writeout triggered by the state change,
4099 * if any. */
4100 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4101
b411b363
PR
4102 /* tcp_close and release of sendpage pages can be deferred. I don't
4103 * want to use SO_LINGER, because apparently it can be deferred for
4104 * more than 20 seconds (longest time I checked).
4105 *
4106 * Actually we don't care for exactly when the network stack does its
4107 * put_page(), but release our reference on these pages right here.
4108 */
4109 i = drbd_release_ee(mdev, &mdev->net_ee);
4110 if (i)
4111 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
4112 i = atomic_read(&mdev->pp_in_use_by_net);
4113 if (i)
4114 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
4115 i = atomic_read(&mdev->pp_in_use);
4116 if (i)
45bb912b 4117 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
4118
4119 D_ASSERT(list_empty(&mdev->read_ee));
4120 D_ASSERT(list_empty(&mdev->active_ee));
4121 D_ASSERT(list_empty(&mdev->sync_ee));
4122 D_ASSERT(list_empty(&mdev->done_ee));
4123
4124 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4125 atomic_set(&mdev->current_epoch->epoch_size, 0);
4126 D_ASSERT(list_empty(&mdev->current_epoch->list));
360cc740
PR
4127
4128 return 0;
b411b363
PR
4129}
4130
4131/*
4132 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4133 * we can agree on is stored in agreed_pro_version.
4134 *
4135 * feature flags and the reserved array should be enough room for future
4136 * enhancements of the handshake protocol, and possible plugins...
4137 *
4138 * for now, they are expected to be zero, but ignored.
4139 */
8a22cccc 4140static int drbd_send_handshake(struct drbd_tconn *tconn)
b411b363 4141{
e6b3ea83 4142 /* ASSERT current == mdev->tconn->receiver ... */
8a22cccc 4143 struct p_handshake *p = &tconn->data.sbuf.handshake;
e8d17b01 4144 int err;
b411b363 4145
8a22cccc
PR
4146 if (mutex_lock_interruptible(&tconn->data.mutex)) {
4147 conn_err(tconn, "interrupted during initial handshake\n");
e8d17b01 4148 return -EINTR;
b411b363
PR
4149 }
4150
8a22cccc
PR
4151 if (tconn->data.socket == NULL) {
4152 mutex_unlock(&tconn->data.mutex);
e8d17b01 4153 return -EIO;
b411b363
PR
4154 }
4155
4156 memset(p, 0, sizeof(*p));
4157 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4158 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
e8d17b01 4159 err = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
ecf2363c 4160 &p->head, sizeof(*p), 0);
8a22cccc 4161 mutex_unlock(&tconn->data.mutex);
e8d17b01 4162 return err;
b411b363
PR
4163}
4164
4165/*
4166 * return values:
4167 * 1 yes, we have a valid connection
4168 * 0 oops, did not work out, please try again
4169 * -1 peer talks different language,
4170 * no point in trying again, please go standalone.
4171 */
65d11ed6 4172static int drbd_do_handshake(struct drbd_tconn *tconn)
b411b363 4173{
65d11ed6
PR
4174 /* ASSERT current == tconn->receiver ... */
4175 struct p_handshake *p = &tconn->data.rbuf.handshake;
02918be2 4176 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
77351055 4177 struct packet_info pi;
e8d17b01 4178 int err, rv;
b411b363 4179
e8d17b01
AG
4180 err = drbd_send_handshake(tconn);
4181 if (err)
b411b363
PR
4182 return 0;
4183
69bc7bc3
AG
4184 err = drbd_recv_header(tconn, &pi);
4185 if (err)
b411b363
PR
4186 return 0;
4187
77351055 4188 if (pi.cmd != P_HAND_SHAKE) {
65d11ed6 4189 conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
77351055 4190 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4191 return -1;
4192 }
4193
77351055 4194 if (pi.size != expect) {
65d11ed6 4195 conn_err(tconn, "expected HandShake length: %u, received: %u\n",
77351055 4196 expect, pi.size);
b411b363
PR
4197 return -1;
4198 }
4199
65d11ed6 4200 rv = drbd_recv(tconn, &p->head.payload, expect);
b411b363
PR
4201
4202 if (rv != expect) {
0ddc5549 4203 if (!signal_pending(current))
65d11ed6 4204 conn_warn(tconn, "short read receiving handshake packet: l=%u\n", rv);
b411b363
PR
4205 return 0;
4206 }
4207
b411b363
PR
4208 p->protocol_min = be32_to_cpu(p->protocol_min);
4209 p->protocol_max = be32_to_cpu(p->protocol_max);
4210 if (p->protocol_max == 0)
4211 p->protocol_max = p->protocol_min;
4212
4213 if (PRO_VERSION_MAX < p->protocol_min ||
4214 PRO_VERSION_MIN > p->protocol_max)
4215 goto incompat;
4216
65d11ed6 4217 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363 4218
65d11ed6
PR
4219 conn_info(tconn, "Handshake successful: "
4220 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
b411b363
PR
4221
4222 return 1;
4223
4224 incompat:
65d11ed6 4225 conn_err(tconn, "incompatible DRBD dialects: "
b411b363
PR
4226 "I support %d-%d, peer supports %d-%d\n",
4227 PRO_VERSION_MIN, PRO_VERSION_MAX,
4228 p->protocol_min, p->protocol_max);
4229 return -1;
4230}
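/*
 * Illustrative sketch, not part of the original file: the version
 * negotiation performed by drbd_do_handshake() above, reduced to its
 * arithmetic.  Both sides advertise [min, max]; a connection is possible
 * iff the intervals overlap, and the agreed version is the smaller of the
 * two maxima.  peer_min/peer_max stand for the already byte-swapped values
 * from struct p_handshake.
 */
static int __maybe_unused example_negotiate_version(int peer_min, int peer_max)
{
	if (peer_max == 0)		/* very old peers send only a minimum */
		peer_max = peer_min;

	if (PRO_VERSION_MAX < peer_min || PRO_VERSION_MIN > peer_max)
		return -1;		/* incompatible dialects */

	return min_t(int, PRO_VERSION_MAX, peer_max);	/* agreed_pro_version */
}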
4231
4232#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4233static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4234{
4235 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4236 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4237 return -1;
b411b363
PR
4238}
4239#else
4240#define CHALLENGE_LEN 64
b10d96cb
JT
4241
4242/* Return value:
4243 1 - auth succeeded,
4244 0 - failed, try again (network error),
4245 -1 - auth failed, don't try again.
4246*/
4247
13e6037d 4248static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4249{
4250 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4251 struct scatterlist sg;
4252 char *response = NULL;
4253 char *right_response = NULL;
4254 char *peers_ch = NULL;
13e6037d 4255 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
b411b363
PR
4256 unsigned int resp_size;
4257 struct hash_desc desc;
77351055 4258 struct packet_info pi;
69bc7bc3 4259 int err, rv;
b411b363 4260
13e6037d 4261 desc.tfm = tconn->cram_hmac_tfm;
b411b363
PR
4262 desc.flags = 0;
4263
13e6037d
PR
4264 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4265 (u8 *)tconn->net_conf->shared_secret, key_len);
b411b363 4266 if (rv) {
13e6037d 4267 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4268 rv = -1;
b411b363
PR
4269 goto fail;
4270 }
4271
4272 get_random_bytes(my_challenge, CHALLENGE_LEN);
4273
ce9879cb 4274 rv = !conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
b411b363
PR
4275 if (!rv)
4276 goto fail;
4277
69bc7bc3
AG
4278 err = drbd_recv_header(tconn, &pi);
4279 if (err) {
4280 rv = 0;
b411b363 4281 goto fail;
69bc7bc3 4282 }
b411b363 4283
77351055 4284 if (pi.cmd != P_AUTH_CHALLENGE) {
13e6037d 4285 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
77351055 4286 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4287 rv = 0;
4288 goto fail;
4289 }
4290
77351055 4291 if (pi.size > CHALLENGE_LEN * 2) {
13e6037d 4292 conn_err(tconn, "expected AuthChallenge payload too big.\n");
b10d96cb 4293 rv = -1;
b411b363
PR
4294 goto fail;
4295 }
4296
77351055 4297 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 4298 if (peers_ch == NULL) {
13e6037d 4299 conn_err(tconn, "kmalloc of peers_ch failed\n");
b10d96cb 4300 rv = -1;
b411b363
PR
4301 goto fail;
4302 }
4303
13e6037d 4304 rv = drbd_recv(tconn, peers_ch, pi.size);
b411b363 4305
77351055 4306 if (rv != pi.size) {
0ddc5549 4307 if (!signal_pending(current))
13e6037d 4308 conn_warn(tconn, "short read AuthChallenge: l=%u\n", rv);
b411b363
PR
4309 rv = 0;
4310 goto fail;
4311 }
4312
13e6037d 4313 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
b411b363
PR
4314 response = kmalloc(resp_size, GFP_NOIO);
4315 if (response == NULL) {
13e6037d 4316 conn_err(tconn, "kmalloc of response failed\n");
b10d96cb 4317 rv = -1;
b411b363
PR
4318 goto fail;
4319 }
4320
4321 sg_init_table(&sg, 1);
77351055 4322 sg_set_buf(&sg, peers_ch, pi.size);
b411b363
PR
4323
4324 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4325 if (rv) {
13e6037d 4326 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4327 rv = -1;
b411b363
PR
4328 goto fail;
4329 }
4330
ce9879cb 4331 rv = !conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
b411b363
PR
4332 if (!rv)
4333 goto fail;
4334
69bc7bc3
AG
4335 err = drbd_recv_header(tconn, &pi);
4336 if (err) {
4337 rv = 0;
b411b363 4338 goto fail;
69bc7bc3 4339 }
b411b363 4340
77351055 4341 if (pi.cmd != P_AUTH_RESPONSE) {
13e6037d 4342 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
77351055 4343 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4344 rv = 0;
4345 goto fail;
4346 }
4347
77351055 4348 if (pi.size != resp_size) {
13e6037d 4349 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
b411b363
PR
4350 rv = 0;
4351 goto fail;
4352 }
4353
13e6037d 4354 rv = drbd_recv(tconn, response , resp_size);
b411b363
PR
4355
4356 if (rv != resp_size) {
0ddc5549 4357 if (!signal_pending(current))
13e6037d 4358 conn_warn(tconn, "short read receiving AuthResponse: l=%u\n", rv);
b411b363
PR
4359 rv = 0;
4360 goto fail;
4361 }
4362
4363 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4364 if (right_response == NULL) {
13e6037d 4365 conn_err(tconn, "kmalloc of right_response failed\n");
b10d96cb 4366 rv = -1;
b411b363
PR
4367 goto fail;
4368 }
4369
4370 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4371
4372 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4373 if (rv) {
13e6037d 4374 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4375 rv = -1;
b411b363
PR
4376 goto fail;
4377 }
4378
4379 rv = !memcmp(response, right_response, resp_size);
4380
4381 if (rv)
13e6037d
PR
4382 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4383 resp_size, tconn->net_conf->cram_hmac_alg);
b10d96cb
JT
4384 else
4385 rv = -1;
b411b363
PR
4386
4387 fail:
4388 kfree(peers_ch);
4389 kfree(response);
4390 kfree(right_response);
4391
4392 return rv;
4393}
4394#endif
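/*
 * Illustrative sketch, not part of the original file: the challenge-response
 * exchange implemented by drbd_do_auth() above, with the kernel crypto API
 * and the packet plumbing stripped away.  hmac() and xchg_with_peer() are
 * hypothetical helpers standing in for crypto_hash_digest() and for the
 * P_AUTH_CHALLENGE / P_AUTH_RESPONSE packets; the digest size is an
 * assumption as well.
 */
void xchg_with_peer(const u8 *send, u8 *recv, size_t len);	/* hypothetical */
void hmac(const u8 *key, size_t key_len,
	  const u8 *msg, size_t msg_len, u8 *digest);		/* hypothetical */

static bool __maybe_unused example_auth(const u8 *secret, size_t secret_len)
{
	enum { EX_CHALLENGE_LEN = 64, EX_DIGEST_LEN = 32 };
	u8 my_challenge[EX_CHALLENGE_LEN], peers_ch[EX_CHALLENGE_LEN];
	u8 response[EX_DIGEST_LEN], peer_response[EX_DIGEST_LEN];
	u8 right_response[EX_DIGEST_LEN];

	get_random_bytes(my_challenge, EX_CHALLENGE_LEN);

	/* 1. exchange challenges (P_AUTH_CHALLENGE in both directions) */
	xchg_with_peer(my_challenge, peers_ch, EX_CHALLENGE_LEN);

	/* 2. prove we know the secret: HMAC over the *peer's* challenge */
	hmac(secret, secret_len, peers_ch, EX_CHALLENGE_LEN, response);

	/* 3. exchange responses (P_AUTH_RESPONSE in both directions) */
	xchg_with_peer(response, peer_response, EX_DIGEST_LEN);

	/* 4. the peer is authenticated iff its response matches the HMAC we
	 *    can compute ourselves over *our* challenge */
	hmac(secret, secret_len, my_challenge, EX_CHALLENGE_LEN, right_response);
	return memcmp(peer_response, right_response, EX_DIGEST_LEN) == 0;
}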
4395
4396int drbdd_init(struct drbd_thread *thi)
4397{
392c8801 4398 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4399 int h;
4400
4d641dd7 4401 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4402
4403 do {
4d641dd7 4404 h = drbd_connect(tconn);
b411b363 4405 if (h == 0) {
4d641dd7 4406 drbd_disconnect(tconn);
20ee6390 4407 schedule_timeout_interruptible(HZ);
b411b363
PR
4408 }
4409 if (h == -1) {
4d641dd7 4410 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4411 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4412 }
4413 } while (h == 0);
4414
4415 if (h > 0) {
4d641dd7
PR
4416 if (get_net_conf(tconn)) {
4417 drbdd(tconn);
4418 put_net_conf(tconn);
b411b363
PR
4419 }
4420 }
4421
4d641dd7 4422 drbd_disconnect(tconn);
b411b363 4423
4d641dd7 4424 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4425 return 0;
4426}
4427
4428/* ********* acknowledge sender ******** */
4429
e4f78ede
PR
4430static int got_conn_RqSReply(struct drbd_tconn *tconn, enum drbd_packet cmd)
4431{
4432 struct p_req_state_reply *p = &tconn->meta.rbuf.req_state_reply;
4433 int retcode = be32_to_cpu(p->retcode);
4434
4435 if (retcode >= SS_SUCCESS) {
4436 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4437 } else {
4438 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4439 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4440 drbd_set_st_err_str(retcode), retcode);
4441 }
4442 wake_up(&tconn->ping_wait);
4443
4444 return true;
4445}
4446
d8763023 4447static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4448{
257d0af6 4449 struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
b411b363
PR
4450 int retcode = be32_to_cpu(p->retcode);
4451
e4f78ede
PR
4452 if (retcode >= SS_SUCCESS) {
4453 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4454 } else {
4455 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4456 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4457 drbd_set_st_err_str(retcode), retcode);
b411b363 4458 }
e4f78ede
PR
4459 wake_up(&mdev->state_wait);
4460
81e84650 4461 return true;
b411b363
PR
4462}
4463
f19e4f8b 4464static int got_Ping(struct drbd_tconn *tconn, enum drbd_packet cmd)
b411b363 4465{
f19e4f8b 4466 return drbd_send_ping_ack(tconn);
b411b363
PR
4467
4468}
4469
f19e4f8b 4470static int got_PingAck(struct drbd_tconn *tconn, enum drbd_packet cmd)
b411b363
PR
4471{
4472 /* restore idle timeout */
2a67d8b9
PR
4473 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4474 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4475 wake_up(&tconn->ping_wait);
b411b363 4476
81e84650 4477 return true;
b411b363
PR
4478}
4479
d8763023 4480static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4481{
257d0af6 4482 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4483 sector_t sector = be64_to_cpu(p->sector);
4484 int blksize = be32_to_cpu(p->blksize);
4485
31890f4a 4486 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4487
4488 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4489
1d53f09e
LE
4490 if (get_ldev(mdev)) {
4491 drbd_rs_complete_io(mdev, sector);
4492 drbd_set_in_sync(mdev, sector, blksize);
4493 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4494 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4495 put_ldev(mdev);
4496 }
b411b363 4497 dec_rs_pending(mdev);
778f271d 4498 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4499
81e84650 4500 return true;
b411b363
PR
4501}
4502
bc9c5c41
AG
4503static int
4504validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4505 struct rb_root *root, const char *func,
4506 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4507{
4508 struct drbd_request *req;
4509 struct bio_and_error m;
4510
87eeee41 4511 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4512 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4513 if (unlikely(!req)) {
87eeee41 4514 spin_unlock_irq(&mdev->tconn->req_lock);
81e84650 4515 return false;
b411b363
PR
4516 }
4517 __req_mod(req, what, &m);
87eeee41 4518 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4519
4520 if (m.bio)
4521 complete_master_bio(mdev, &m);
81e84650 4522 return true;
b411b363
PR
4523}
4524
d8763023 4525static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4526{
257d0af6 4527 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4528 sector_t sector = be64_to_cpu(p->sector);
4529 int blksize = be32_to_cpu(p->blksize);
4530 enum drbd_req_event what;
4531
4532 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4533
579b57ed 4534 if (p->block_id == ID_SYNCER) {
b411b363
PR
4535 drbd_set_in_sync(mdev, sector, blksize);
4536 dec_rs_pending(mdev);
81e84650 4537 return true;
b411b363 4538 }
257d0af6 4539 switch (cmd) {
b411b363 4540 case P_RS_WRITE_ACK:
89e58e75 4541 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4542 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4543 break;
4544 case P_WRITE_ACK:
89e58e75 4545 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4546 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4547 break;
4548 case P_RECV_ACK:
89e58e75 4549 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
8554df1c 4550 what = RECV_ACKED_BY_PEER;
b411b363 4551 break;
7be8da07 4552 case P_DISCARD_WRITE:
89e58e75 4553 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
7be8da07
AG
4554 what = DISCARD_WRITE;
4555 break;
4556 case P_RETRY_WRITE:
4557 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4558 what = POSTPONE_WRITE;
b411b363
PR
4559 break;
4560 default:
4561 D_ASSERT(0);
81e84650 4562 return false;
b411b363
PR
4563 }
4564
4565 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41
AG
4566 &mdev->write_requests, __func__,
4567 what, false);
b411b363
PR
4568}
4569
d8763023 4570static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4571{
257d0af6 4572 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363 4573 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4574 int size = be32_to_cpu(p->blksize);
89e58e75
PR
4575 bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4576 mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
c3afd8f5 4577 bool found;
b411b363
PR
4578
4579 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4580
579b57ed 4581 if (p->block_id == ID_SYNCER) {
b411b363
PR
4582 dec_rs_pending(mdev);
4583 drbd_rs_failed_io(mdev, sector, size);
81e84650 4584 return true;
b411b363 4585 }
2deb8336 4586
c3afd8f5 4587 found = validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4588 &mdev->write_requests, __func__,
8554df1c 4589 NEG_ACKED, missing_ok);
c3afd8f5
AG
4590 if (!found) {
4591 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4592 The master bio might already be completed, therefore the
4593 request is no longer in the collision hash. */
4594 /* In Protocol B we might already have got a P_RECV_ACK
4595 but then get a P_NEG_ACK afterwards. */
4596 if (!missing_ok)
2deb8336 4597 return false;
c3afd8f5 4598 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4599 }
2deb8336 4600 return true;
b411b363
PR
4601}
4602
d8763023 4603static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4604{
257d0af6 4605 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4606 sector_t sector = be64_to_cpu(p->sector);
4607
4608 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
7be8da07 4609
b411b363
PR
4610 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4611 (unsigned long long)sector, be32_to_cpu(p->blksize));
4612
4613 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4614 &mdev->read_requests, __func__,
8554df1c 4615 NEG_ACKED, false);
b411b363
PR
4616}
4617
d8763023 4618static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4619{
4620 sector_t sector;
4621 int size;
257d0af6 4622 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4623
4624 sector = be64_to_cpu(p->sector);
4625 size = be32_to_cpu(p->blksize);
b411b363
PR
4626
4627 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4628
4629 dec_rs_pending(mdev);
4630
4631 if (get_ldev_if_state(mdev, D_FAILED)) {
4632 drbd_rs_complete_io(mdev, sector);
257d0af6 4633 switch (cmd) {
d612d309
PR
4634 case P_NEG_RS_DREPLY:
4635 drbd_rs_failed_io(mdev, sector, size);
4636 case P_RS_CANCEL:
4637 break;
4638 default:
4639 D_ASSERT(0);
4640 put_ldev(mdev);
4641 return false;
4642 }
b411b363
PR
4643 put_ldev(mdev);
4644 }
4645
81e84650 4646 return true;
b411b363
PR
4647}
4648
d8763023 4649static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4650{
257d0af6 4651 struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;
b411b363 4652
2f5cdd0b 4653 tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
b411b363 4654
c4752ef1
PR
4655 if (mdev->state.conn == C_AHEAD &&
4656 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
4657 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4658 mdev->start_resync_timer.expires = jiffies + HZ;
4659 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4660 }
4661
81e84650 4662 return true;
b411b363
PR
4663}
4664
d8763023 4665static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4666{
257d0af6 4667 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4668 struct drbd_work *w;
4669 sector_t sector;
4670 int size;
4671
4672 sector = be64_to_cpu(p->sector);
4673 size = be32_to_cpu(p->blksize);
4674
4675 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4676
4677 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4678 drbd_ov_oos_found(mdev, sector, size);
4679 else
4680 ov_oos_print(mdev);
4681
1d53f09e 4682 if (!get_ldev(mdev))
81e84650 4683 return true;
1d53f09e 4684
b411b363
PR
4685 drbd_rs_complete_io(mdev, sector);
4686 dec_rs_pending(mdev);
4687
ea5442af
LE
4688 --mdev->ov_left;
4689
4690 /* let's advance progress step marks only for every other megabyte */
4691 if ((mdev->ov_left & 0x200) == 0x200)
4692 drbd_advance_rs_marks(mdev, mdev->ov_left);
4693
4694 if (mdev->ov_left == 0) {
b411b363
PR
4695 w = kmalloc(sizeof(*w), GFP_NOIO);
4696 if (w) {
4697 w->cb = w_ov_finished;
a21e9298 4698 w->mdev = mdev;
e42325a5 4699 drbd_queue_work_front(&mdev->tconn->data.work, w);
b411b363
PR
4700 } else {
4701 dev_err(DEV, "kmalloc(w) failed.");
4702 ov_oos_print(mdev);
4703 drbd_resync_finished(mdev);
4704 }
4705 }
1d53f09e 4706 put_ldev(mdev);
81e84650 4707 return true;
b411b363
PR
4708}
4709
d8763023 4710static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
0ced55a3 4711{
81e84650 4712 return true;
0ced55a3
PR
4713}
4714
32862ec7
PR
4715static int tconn_process_done_ee(struct drbd_tconn *tconn)
4716{
082a3439
PR
4717 struct drbd_conf *mdev;
4718 int i, not_empty = 0;
32862ec7
PR
4719
4720 do {
4721 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4722 flush_signals(current);
082a3439 4723 idr_for_each_entry(&tconn->volumes, mdev, i) {
e2b3032b 4724 if (drbd_process_done_ee(mdev))
082a3439
PR
4725 return 1; /* error */
4726 }
32862ec7 4727 set_bit(SIGNAL_ASENDER, &tconn->flags);
082a3439
PR
4728
4729 spin_lock_irq(&tconn->req_lock);
4730 idr_for_each_entry(&tconn->volumes, mdev, i) {
4731 not_empty = !list_empty(&mdev->done_ee);
4732 if (not_empty)
4733 break;
4734 }
4735 spin_unlock_irq(&tconn->req_lock);
32862ec7
PR
4736 } while (not_empty);
4737
4738 return 0;
4739}
4740
7201b972
AG
4741struct asender_cmd {
4742 size_t pkt_size;
a4fbda8e
PR
4743 enum mdev_or_conn fa_type; /* first argument's type */
4744 union {
4745 int (*mdev_fn)(struct drbd_conf *mdev, enum drbd_packet cmd);
4746 int (*conn_fn)(struct drbd_tconn *tconn, enum drbd_packet cmd);
4747 };
7201b972
AG
4748};
4749
4750static struct asender_cmd asender_tbl[] = {
f19e4f8b
PR
4751 [P_PING] = { sizeof(struct p_header), CONN, { .conn_fn = got_Ping } },
4752 [P_PING_ACK] = { sizeof(struct p_header), CONN, { .conn_fn = got_PingAck } },
a4fbda8e
PR
4753 [P_RECV_ACK] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4754 [P_WRITE_ACK] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4755 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4756 [P_DISCARD_WRITE] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4757 [P_NEG_ACK] = { sizeof(struct p_block_ack), MDEV, { got_NegAck } },
4758 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), MDEV, { got_NegDReply } },
4759 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
4760 [P_OV_RESULT] = { sizeof(struct p_block_ack), MDEV, { got_OVResult } },
4761 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), MDEV, { got_BarrierAck } },
4762 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), MDEV, { got_RqSReply } },
4763 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), MDEV, { got_IsInSync } },
4764 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), MDEV, { got_skip } },
4765 [P_RS_CANCEL] = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
e4f78ede 4766 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), CONN, {.conn_fn = got_conn_RqSReply}},
a4fbda8e 4767 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
7201b972
AG
4768};
4769
b411b363
PR
4770int drbd_asender(struct drbd_thread *thi)
4771{
392c8801 4772 struct drbd_tconn *tconn = thi->tconn;
32862ec7 4773 struct p_header *h = &tconn->meta.rbuf.header;
b411b363 4774 struct asender_cmd *cmd = NULL;
77351055 4775 struct packet_info pi;
257d0af6 4776 int rv;
b411b363
PR
4777 void *buf = h;
4778 int received = 0;
257d0af6 4779 int expect = sizeof(struct p_header);
f36af18c 4780 int ping_timeout_active = 0;
b411b363 4781
b411b363
PR
4782 current->policy = SCHED_RR; /* Make this a realtime task! */
4783 current->rt_priority = 2; /* more important than all other tasks */
4784
e77a0a5c 4785 while (get_t_state(thi) == RUNNING) {
80822284 4786 drbd_thread_current_set_cpu(thi);
32862ec7 4787 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
2a67d8b9 4788 if (!drbd_send_ping(tconn)) {
32862ec7 4789 conn_err(tconn, "drbd_send_ping has failed\n");
841ce241
AG
4790 goto reconnect;
4791 }
32862ec7
PR
4792 tconn->meta.socket->sk->sk_rcvtimeo =
4793 tconn->net_conf->ping_timeo*HZ/10;
f36af18c 4794 ping_timeout_active = 1;
b411b363
PR
4795 }
4796
32862ec7
PR
4797 /* TODO: conditionally cork; it may hurt latency if we cork without
4798 much to send */
4799 if (!tconn->net_conf->no_cork)
4800 drbd_tcp_cork(tconn->meta.socket);
082a3439
PR
4801 if (tconn_process_done_ee(tconn)) {
4802 conn_err(tconn, "tconn_process_done_ee() failed\n");
32862ec7 4803 goto reconnect;
082a3439 4804 }
b411b363 4805 /* but unconditionally uncork unless disabled */
32862ec7
PR
4806 if (!tconn->net_conf->no_cork)
4807 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
4808
4809 /* short circuit, recv_msg would return EINTR anyways. */
4810 if (signal_pending(current))
4811 continue;
4812
32862ec7
PR
4813 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4814 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
4815
4816 flush_signals(current);
4817
4818 /* Note:
4819 * -EINTR (on meta) we got a signal
4820 * -EAGAIN (on meta) rcvtimeo expired
4821 * -ECONNRESET other side closed the connection
4822 * -ERESTARTSYS (on data) we got a signal
4823 * rv < 0 other than above: unexpected error!
4824 * rv == expected: full header or command
4825 * rv < expected: "woken" by signal during receive
4826 * rv == 0 : "connection shut down by peer"
4827 */
4828 if (likely(rv > 0)) {
4829 received += rv;
4830 buf += rv;
4831 } else if (rv == 0) {
32862ec7 4832 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
4833 goto reconnect;
4834 } else if (rv == -EAGAIN) {
cb6518cb
LE
4835 /* If the data socket received something meanwhile,
4836 * that is good enough: peer is still alive. */
32862ec7
PR
4837 if (time_after(tconn->last_received,
4838 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 4839 continue;
f36af18c 4840 if (ping_timeout_active) {
32862ec7 4841 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
4842 goto reconnect;
4843 }
32862ec7 4844 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
4845 continue;
4846 } else if (rv == -EINTR) {
4847 continue;
4848 } else {
32862ec7 4849 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
4850 goto reconnect;
4851 }
4852
4853 if (received == expect && cmd == NULL) {
8172f3e9 4854 if (decode_header(tconn, h, &pi))
b411b363 4855 goto reconnect;
7201b972
AG
4856 cmd = &asender_tbl[pi.cmd];
4857 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd) {
32862ec7 4858 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
77351055 4859 pi.cmd, pi.size);
b411b363
PR
4860 goto disconnect;
4861 }
4862 expect = cmd->pkt_size;
77351055 4863 if (pi.size != expect - sizeof(struct p_header)) {
32862ec7 4864 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 4865 pi.cmd, pi.size);
b411b363 4866 goto reconnect;
257d0af6 4867 }
b411b363
PR
4868 }
4869 if (received == expect) {
a4fbda8e
PR
4870 bool rv;
4871
4872 if (cmd->fa_type == CONN) {
4873 rv = cmd->conn_fn(tconn, pi.cmd);
4874 } else {
4875 struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
4876 rv = cmd->mdev_fn(mdev, pi.cmd);
4877 }
4878
4879 if (!rv)
b411b363
PR
4880 goto reconnect;
4881
a4fbda8e
PR
4882 tconn->last_received = jiffies;
4883
f36af18c
LE
4884 /* the idle_timeout (ping-int)
4885 * has been restored in got_PingAck() */
7201b972 4886 if (cmd == &asender_tbl[P_PING_ACK])
f36af18c
LE
4887 ping_timeout_active = 0;
4888
b411b363
PR
4889 buf = h;
4890 received = 0;
257d0af6 4891 expect = sizeof(struct p_header);
b411b363
PR
4892 cmd = NULL;
4893 }
4894 }
4895
4896 if (0) {
4897reconnect:
bbeb641c 4898 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
4899 }
4900 if (0) {
4901disconnect:
bbeb641c 4902 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 4903 }
32862ec7 4904 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 4905
32862ec7 4906 conn_info(tconn, "asender terminated\n");
b411b363
PR
4907
4908 return 0;
4909}