drbd: drbd_drain_block(): Return 0 upon success and an error code otherwise
drivers/block/drbd/drbd_receiver.c
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
47#include "drbd_req.h"
48
49#include "drbd_vli.h"
50
51struct packet_info {
52 enum drbd_packet cmd;
53 int size;
54 int vnr;
55};
56
57enum finish_epoch {
58 FE_STILL_LIVE,
59 FE_DESTROYED,
60 FE_RECYCLED,
61};
62
63enum mdev_or_conn {
64 MDEV,
65 CONN,
66};
67
68static int drbd_do_handshake(struct drbd_tconn *tconn);
69static int drbd_do_auth(struct drbd_tconn *tconn);
70static int drbd_disconnected(int vnr, void *p, void *data);
71
72static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
73static int e_end_block(struct drbd_work *, int);
74
75
76#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77
78/*
79 * some helper functions to deal with single linked page lists,
80 * page->private being our "next" pointer.
81 */
82
83/* If at least n pages are linked at head, get n pages off.
84 * Otherwise, don't modify head, and return NULL.
85 * Locking is the responsibility of the caller.
86 */
87static struct page *page_chain_del(struct page **head, int n)
88{
89 struct page *page;
90 struct page *tmp;
91
92 BUG_ON(!n);
93 BUG_ON(!head);
94
95 page = *head;
96
97 if (!page)
98 return NULL;
99
100 while (page) {
101 tmp = page_chain_next(page);
102 if (--n == 0)
103 break; /* found sufficient pages */
104 if (tmp == NULL)
105 /* insufficient pages, don't use any of them. */
106 return NULL;
107 page = tmp;
108 }
109
110 /* add end of list marker for the returned list */
111 set_page_private(page, 0);
112 /* actual return value, and adjustment of head */
113 page = *head;
114 *head = tmp;
115 return page;
116}
117
118/* may be used outside of locks to find the tail of a (usually short)
119 * "private" page chain, before adding it back to a global chain head
120 * with page_chain_add() under a spinlock. */
121static struct page *page_chain_tail(struct page *page, int *len)
122{
123 struct page *tmp;
124 int i = 1;
125 while ((tmp = page_chain_next(page)))
126 ++i, page = tmp;
127 if (len)
128 *len = i;
129 return page;
130}
131
132static int page_chain_free(struct page *page)
133{
134 struct page *tmp;
135 int i = 0;
136 page_chain_for_each_safe(page, tmp) {
137 put_page(page);
138 ++i;
139 }
140 return i;
141}
142
143static void page_chain_add(struct page **head,
144 struct page *chain_first, struct page *chain_last)
145{
146#if 1
147 struct page *tmp;
148 tmp = page_chain_tail(chain_first, NULL);
149 BUG_ON(tmp != chain_last);
150#endif
151
152 /* add chain to head */
153 set_page_private(chain_last, (unsigned long)*head);
154 *head = chain_first;
155}
156
157static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
158{
159 struct page *page = NULL;
160 struct page *tmp = NULL;
161 int i = 0;
162
163 /* Yes, testing drbd_pp_vacant outside the lock is racy.
164 * So what. It saves a spin_lock. */
165 if (drbd_pp_vacant >= number) {
166 spin_lock(&drbd_pp_lock);
167 page = page_chain_del(&drbd_pp_pool, number);
168 if (page)
169 drbd_pp_vacant -= number;
170 spin_unlock(&drbd_pp_lock);
171 if (page)
172 return page;
173 }
174
175 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
176 * "criss-cross" setup, that might cause write-out on some other DRBD,
177 * which in turn might block on the other node at this very place. */
178 for (i = 0; i < number; i++) {
179 tmp = alloc_page(GFP_TRY);
180 if (!tmp)
181 break;
182 set_page_private(tmp, (unsigned long)page);
183 page = tmp;
184 }
185
186 if (i == number)
187 return page;
188
189 /* Not enough pages immediately available this time.
190 * No need to jump around here, drbd_pp_alloc will retry this
191 * function "soon". */
192 if (page) {
193 tmp = page_chain_tail(page, NULL);
194 spin_lock(&drbd_pp_lock);
195 page_chain_add(&drbd_pp_pool, page, tmp);
196 drbd_pp_vacant += i;
197 spin_unlock(&drbd_pp_lock);
198 }
199 return NULL;
200}
201
202static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
203{
204 struct drbd_peer_request *peer_req;
205 struct list_head *le, *tle;
206
207 /* The EEs are always appended to the end of the list. Since
208 they are sent in order over the wire, they have to finish
209 in order. As soon as we see the first not finished we can
210 stop to examine the list... */
211
212 list_for_each_safe(le, tle, &mdev->net_ee) {
213 peer_req = list_entry(le, struct drbd_peer_request, w.list);
214 if (drbd_ee_has_active_page(peer_req))
215 break;
216 list_move(le, to_be_freed);
217 }
218}
219
220static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
221{
222 LIST_HEAD(reclaimed);
223 struct drbd_peer_request *peer_req, *t;
224
225 spin_lock_irq(&mdev->tconn->req_lock);
226 reclaim_net_ee(mdev, &reclaimed);
227 spin_unlock_irq(&mdev->tconn->req_lock);
228
229 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
230 drbd_free_net_ee(mdev, peer_req);
231}
232
233/**
234 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
235 * @mdev: DRBD device.
236 * @number: number of pages requested
237 * @retry: whether to retry, if not enough pages are available right now
238 *
239 * Tries to allocate number pages, first from our own page pool, then from
240 * the kernel, unless this allocation would exceed the max_buffers setting.
241 * Possibly retry until DRBD frees sufficient pages somewhere else.
242 *
243 * Returns a page chain linked via page->private.
244 */
245static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
246{
247 struct page *page = NULL;
248 DEFINE_WAIT(wait);
249
250 /* Yes, we may run up to @number over max_buffers. If we
251 * follow it strictly, the admin will get it wrong anyways. */
252 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
253 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
254
255 while (page == NULL) {
256 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
257
258 drbd_kick_lo_and_reclaim_net(mdev);
259
260 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
261 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
262 if (page)
263 break;
264 }
265
266 if (!retry)
267 break;
268
269 if (signal_pending(current)) {
270 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
271 break;
272 }
273
274 schedule();
275 }
276 finish_wait(&drbd_pp_wait, &wait);
277
278 if (page)
279 atomic_add(number, &mdev->pp_in_use);
280 return page;
281}
282
283/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
284 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
285 * Either links the page chain back to the global pool,
286 * or returns all pages to the system. */
287static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
288{
289 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
290 int i;
291
292 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
293 i = page_chain_free(page);
294 else {
295 struct page *tmp;
296 tmp = page_chain_tail(page, &i);
297 spin_lock(&drbd_pp_lock);
298 page_chain_add(&drbd_pp_pool, page, tmp);
299 drbd_pp_vacant += i;
300 spin_unlock(&drbd_pp_lock);
301 }
302 i = atomic_sub_return(i, a);
303 if (i < 0)
304 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
305 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
306 wake_up(&drbd_pp_wait);
307}
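/*
 * Editor's sketch, not part of the original file: drbd_pp_alloc() and
 * drbd_pp_free() always come in pairs on the same accounting counter;
 * is_net == 0 means the pages are charged to pp_in_use.
 */
static void __maybe_unused pp_alloc_free_sketch(struct drbd_conf *mdev)
{
	/* retry == true: block (interruptibly) until enough pages are free */
	struct page *page = drbd_pp_alloc(mdev, 2, true);

	if (!page)
		return;	/* only happens if a signal interrupted the wait */
	/* ... use the page chain, e.g. via kmap()/kunmap() ... */
	drbd_pp_free(mdev, page, 0);
}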
308
309/*
310You need to hold the req_lock:
311 _drbd_wait_ee_list_empty()
312
313You must not have the req_lock:
314 drbd_free_ee()
315 drbd_alloc_ee()
316 drbd_init_ee()
317 drbd_release_ee()
318 drbd_ee_fix_bhs()
319 drbd_process_done_ee()
320 drbd_clear_done_ee()
321 drbd_wait_ee_list_empty()
322*/
323
324struct drbd_peer_request *
325drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
326 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
327{
328 struct drbd_peer_request *peer_req;
329 struct page *page;
330 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
331
332 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
333 return NULL;
334
335 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
336 if (!peer_req) {
337 if (!(gfp_mask & __GFP_NOWARN))
338 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
339 return NULL;
340 }
341
342 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
343 if (!page)
344 goto fail;
345
346 drbd_clear_interval(&peer_req->i);
347 peer_req->i.size = data_size;
348 peer_req->i.sector = sector;
349 peer_req->i.local = false;
350 peer_req->i.waiting = false;
351
352 peer_req->epoch = NULL;
353 peer_req->w.mdev = mdev;
354 peer_req->pages = page;
355 atomic_set(&peer_req->pending_bios, 0);
356 peer_req->flags = 0;
357 /*
358 * The block_id is opaque to the receiver. It is not endianness
359 * converted, and sent back to the sender unchanged.
360 */
361 peer_req->block_id = id;
362
363 return peer_req;
364
365 fail:
366 mempool_free(peer_req, drbd_ee_mempool);
367 return NULL;
368}
369
370void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
371 int is_net)
372{
373 if (peer_req->flags & EE_HAS_DIGEST)
374 kfree(peer_req->digest);
375 drbd_pp_free(mdev, peer_req->pages, is_net);
376 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
377 D_ASSERT(drbd_interval_empty(&peer_req->i));
378 mempool_free(peer_req, drbd_ee_mempool);
379}
380
381int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
382{
383 LIST_HEAD(work_list);
384 struct drbd_peer_request *peer_req, *t;
385 int count = 0;
386 int is_net = list == &mdev->net_ee;
387
388 spin_lock_irq(&mdev->tconn->req_lock);
389 list_splice_init(list, &work_list);
390 spin_unlock_irq(&mdev->tconn->req_lock);
391
392 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
393 drbd_free_some_ee(mdev, peer_req, is_net);
394 count++;
395 }
396 return count;
397}
398
399
400/* See also comments in _req_mod(,BARRIER_ACKED)
401 * and receive_Barrier.
402 *
403 * Move entries from net_ee to done_ee, if ready.
404 * Grab done_ee, call all callbacks, free the entries.
405 * The callbacks typically send out ACKs.
406 */
407static int drbd_process_done_ee(struct drbd_conf *mdev)
408{
409 LIST_HEAD(work_list);
410 LIST_HEAD(reclaimed);
411 struct drbd_peer_request *peer_req, *t;
412 int err = 0;
413
414 spin_lock_irq(&mdev->tconn->req_lock);
415 reclaim_net_ee(mdev, &reclaimed);
416 list_splice_init(&mdev->done_ee, &work_list);
417 spin_unlock_irq(&mdev->tconn->req_lock);
418
419 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420 drbd_free_net_ee(mdev, peer_req);
421
422 /* possible callbacks here:
423 * e_end_block, and e_end_resync_block, e_send_discard_write.
424 * all ignore the last argument.
425 */
426 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427 int err2;
428
429 /* list_del not necessary, next/prev members not touched */
430 err2 = peer_req->w.cb(&peer_req->w, !!err);
431 if (!err)
432 err = err2;
433 drbd_free_ee(mdev, peer_req);
434 }
435 wake_up(&mdev->ee_wait);
436
437 return err;
438}
439
440void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
441{
442 DEFINE_WAIT(wait);
443
444 /* avoids spin_lock/unlock
445 * and calling prepare_to_wait in the fast path */
446 while (!list_empty(head)) {
447 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
448 spin_unlock_irq(&mdev->tconn->req_lock);
449 io_schedule();
450 finish_wait(&mdev->ee_wait, &wait);
451 spin_lock_irq(&mdev->tconn->req_lock);
452 }
453}
454
455void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
456{
457 spin_lock_irq(&mdev->tconn->req_lock);
458 _drbd_wait_ee_list_empty(mdev, head);
459 spin_unlock_irq(&mdev->tconn->req_lock);
460}
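/*
 * Editor's sketch, not part of the original file: the locking rule from the
 * comment block further up ("You need to hold the req_lock:
 * _drbd_wait_ee_list_empty()") in practice.
 */
static void __maybe_unused ee_wait_locking_sketch(struct drbd_conf *mdev)
{
	/* already under req_lock for other reasons: use the locked variant */
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* otherwise, let the wrapper take and drop req_lock itself */
	drbd_wait_ee_list_empty(mdev, &mdev->net_ee);
}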
461
462/* see also kernel_accept; which is only present since 2.6.18.
463 * also we want to log which part of it failed, exactly */
464static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
465{
466 struct sock *sk = sock->sk;
467 int err = 0;
468
469 *what = "listen";
470 err = sock->ops->listen(sock, 5);
471 if (err < 0)
472 goto out;
473
474 *what = "sock_create_lite";
475 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
476 newsock);
477 if (err < 0)
478 goto out;
479
480 *what = "accept";
481 err = sock->ops->accept(sock, *newsock, 0);
482 if (err < 0) {
483 sock_release(*newsock);
484 *newsock = NULL;
485 goto out;
486 }
487 (*newsock)->ops = sock->ops;
488
489out:
490 return err;
491}
492
493static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
494{
495 mm_segment_t oldfs;
496 struct kvec iov = {
497 .iov_base = buf,
498 .iov_len = size,
499 };
500 struct msghdr msg = {
501 .msg_iovlen = 1,
502 .msg_iov = (struct iovec *)&iov,
503 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
504 };
505 int rv;
506
507 oldfs = get_fs();
508 set_fs(KERNEL_DS);
509 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
510 set_fs(oldfs);
511
512 return rv;
513}
514
515static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
516{
517 mm_segment_t oldfs;
518 struct kvec iov = {
519 .iov_base = buf,
520 .iov_len = size,
521 };
522 struct msghdr msg = {
523 .msg_iovlen = 1,
524 .msg_iov = (struct iovec *)&iov,
525 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
526 };
527 int rv;
528
529 oldfs = get_fs();
530 set_fs(KERNEL_DS);
531
532 for (;;) {
533 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
534 if (rv == size)
535 break;
536
537 /* Note:
538 * ECONNRESET other side closed the connection
539 * ERESTARTSYS (on sock) we got a signal
540 */
541
542 if (rv < 0) {
543 if (rv == -ECONNRESET)
544 conn_info(tconn, "sock was reset by peer\n");
545 else if (rv != -ERESTARTSYS)
546 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
547 break;
548 } else if (rv == 0) {
549 conn_info(tconn, "sock was shut down by peer\n");
550 break;
551 } else {
552 /* signal came in, or peer/link went down,
553 * after we read a partial message
554 */
555 /* D_ASSERT(signal_pending(current)); */
556 break;
557 }
558 };
559
560 set_fs(oldfs);
561
562 if (rv != size)
563 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
564
565 return rv;
566}
567
568/* quoting tcp(7):
569 * On individual connections, the socket buffer size must be set prior to the
570 * listen(2) or connect(2) calls in order to have it take effect.
571 * This is our wrapper to do so.
572 */
573static void drbd_setbufsize(struct socket *sock, unsigned int snd,
574 unsigned int rcv)
575{
576 /* open coded SO_SNDBUF, SO_RCVBUF */
577 if (snd) {
578 sock->sk->sk_sndbuf = snd;
579 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
580 }
581 if (rcv) {
582 sock->sk->sk_rcvbuf = rcv;
583 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
584 }
585}
586
587static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
588{
589 const char *what;
590 struct socket *sock;
591 struct sockaddr_in6 src_in6;
592 int err;
593 int disconnect_on_error = 1;
594
595 if (!get_net_conf(tconn))
596 return NULL;
597
598 what = "sock_create_kern";
599 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
600 SOCK_STREAM, IPPROTO_TCP, &sock);
601 if (err < 0) {
602 sock = NULL;
603 goto out;
604 }
605
606 sock->sk->sk_rcvtimeo =
607 sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
608 drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
609 tconn->net_conf->rcvbuf_size);
610
611 /* explicitly bind to the configured IP as source IP
612 * for the outgoing connections.
613 * This is needed for multihomed hosts and to be
614 * able to use lo: interfaces for drbd.
615 * Make sure to use 0 as port number, so linux selects
616 * a free one dynamically.
617 */
618 memcpy(&src_in6, tconn->net_conf->my_addr,
619 min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
620 if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
621 src_in6.sin6_port = 0;
622 else
623 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
624
625 what = "bind before connect";
626 err = sock->ops->bind(sock,
627 (struct sockaddr *) &src_in6,
628 tconn->net_conf->my_addr_len);
629 if (err < 0)
630 goto out;
631
632 /* connect may fail, peer not yet available.
633 * stay C_WF_CONNECTION, don't go Disconnecting! */
634 disconnect_on_error = 0;
635 what = "connect";
636 err = sock->ops->connect(sock,
637 (struct sockaddr *)tconn->net_conf->peer_addr,
638 tconn->net_conf->peer_addr_len, 0);
639
640out:
641 if (err < 0) {
642 if (sock) {
643 sock_release(sock);
644 sock = NULL;
645 }
646 switch (-err) {
647 /* timeout, busy, signal pending */
648 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
649 case EINTR: case ERESTARTSYS:
650 /* peer not (yet) available, network problem */
651 case ECONNREFUSED: case ENETUNREACH:
652 case EHOSTDOWN: case EHOSTUNREACH:
653 disconnect_on_error = 0;
654 break;
655 default:
656 conn_err(tconn, "%s failed, err = %d\n", what, err);
657 }
658 if (disconnect_on_error)
659 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
660 }
661 put_net_conf(tconn);
662 return sock;
663}
664
665static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
666{
667 int timeo, err;
668 struct socket *s_estab = NULL, *s_listen;
669 const char *what;
670
671 if (!get_net_conf(tconn))
672 return NULL;
673
674 what = "sock_create_kern";
675 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
676 SOCK_STREAM, IPPROTO_TCP, &s_listen);
677 if (err) {
678 s_listen = NULL;
679 goto out;
680 }
681
682 timeo = tconn->net_conf->try_connect_int * HZ;
683 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
684
685 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
686 s_listen->sk->sk_rcvtimeo = timeo;
687 s_listen->sk->sk_sndtimeo = timeo;
688 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
689 tconn->net_conf->rcvbuf_size);
690
691 what = "bind before listen";
692 err = s_listen->ops->bind(s_listen,
693 (struct sockaddr *) tconn->net_conf->my_addr,
694 tconn->net_conf->my_addr_len);
695 if (err < 0)
696 goto out;
697
698 err = drbd_accept(&what, s_listen, &s_estab);
699
700out:
701 if (s_listen)
702 sock_release(s_listen);
703 if (err < 0) {
704 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
705 conn_err(tconn, "%s failed, err = %d\n", what, err);
706 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
707 }
708 }
709 put_net_conf(tconn);
710
711 return s_estab;
712}
713
714static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
715{
716 struct p_header *h = &tconn->data.sbuf.header;
717
718 return !_conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
719}
720
721static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
722{
723 struct p_header80 *h = &tconn->data.rbuf.header.h80;
724 int rr;
725
726 rr = drbd_recv_short(sock, h, sizeof(*h), 0);
727
728 if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
729 return be16_to_cpu(h->command);
730
731 return 0xffff;
732}
733
734/**
735 * drbd_socket_okay() - Free the socket if its connection is not okay
736 * @sock: pointer to the pointer to the socket.
737 */
738static int drbd_socket_okay(struct socket **sock)
739{
740 int rr;
741 char tb[4];
742
743 if (!*sock)
744 return false;
745
746 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
747
748 if (rr > 0 || rr == -EAGAIN) {
749 return true;
750 } else {
751 sock_release(*sock);
752 *sock = NULL;
753 return false;
754 }
755}
756/* Gets called if a connection is established, or if a new minor gets created
757 in a connection */
758int drbd_connected(int vnr, void *p, void *data)
759{
760 struct drbd_conf *mdev = (struct drbd_conf *)p;
761 int ok = 1;
762
763 atomic_set(&mdev->packet_seq, 0);
764 mdev->peer_seq = 0;
765
766 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
767 &mdev->tconn->cstate_mutex :
768 &mdev->own_state_mutex;
769
770 ok &= !drbd_send_sync_param(mdev);
771 ok &= !drbd_send_sizes(mdev, 0, 0);
772 ok &= !drbd_send_uuids(mdev);
773 ok &= !drbd_send_state(mdev);
774 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
775 clear_bit(RESIZE_PENDING, &mdev->flags);
776
777
778 return !ok;
779}
780
781/*
782 * return values:
783 * 1 yes, we have a valid connection
784 * 0 oops, did not work out, please try again
785 * -1 peer talks different language,
786 * no point in trying again, please go standalone.
787 * -2 We do not have a network config...
788 */
789static int drbd_connect(struct drbd_tconn *tconn)
790{
791 struct socket *s, *sock, *msock;
792 int try, h, ok;
793
794 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
795 return -2;
796
797 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
798 tconn->agreed_pro_version = 99;
799 /* agreed_pro_version must be smaller than 100 so we send the old
800 header (h80) in the first packet and in the handshake packet. */
801
802 sock = NULL;
803 msock = NULL;
804
805 do {
806 for (try = 0;;) {
807 /* 3 tries, this should take less than a second! */
808 s = drbd_try_connect(tconn);
809 if (s || ++try >= 3)
810 break;
811 /* give the other side time to call bind() & listen() */
812 schedule_timeout_interruptible(HZ / 10);
813 }
814
815 if (s) {
816 if (!sock) {
817 drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
818 sock = s;
819 s = NULL;
820 } else if (!msock) {
821 drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
b411b363
PR
822 msock = s;
823 s = NULL;
824 } else {
825 conn_err(tconn, "Logic error in drbd_connect()\n");
826 goto out_release_sockets;
827 }
828 }
829
830 if (sock && msock) {
831 schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
832 ok = drbd_socket_okay(&sock);
833 ok = drbd_socket_okay(&msock) && ok;
834 if (ok)
835 break;
836 }
837
838retry:
839 s = drbd_wait_for_connect(tconn);
840 if (s) {
841 try = drbd_recv_fp(tconn, s);
842 drbd_socket_okay(&sock);
843 drbd_socket_okay(&msock);
844 switch (try) {
845 case P_HAND_SHAKE_S:
846 if (sock) {
847 conn_warn(tconn, "initial packet S crossed\n");
848 sock_release(sock);
849 }
850 sock = s;
851 break;
852 case P_HAND_SHAKE_M:
853 if (msock) {
854 conn_warn(tconn, "initial packet M crossed\n");
855 sock_release(msock);
856 }
857 msock = s;
858 set_bit(DISCARD_CONCURRENT, &tconn->flags);
859 break;
860 default:
861 conn_warn(tconn, "Error receiving initial packet\n");
862 sock_release(s);
863 if (random32() & 1)
864 goto retry;
865 }
866 }
867
868 if (tconn->cstate <= C_DISCONNECTING)
869 goto out_release_sockets;
870 if (signal_pending(current)) {
871 flush_signals(current);
872 smp_rmb();
873 if (get_t_state(&tconn->receiver) == EXITING)
874 goto out_release_sockets;
875 }
876
877 if (sock && msock) {
878 ok = drbd_socket_okay(&sock);
879 ok = drbd_socket_okay(&msock) && ok;
880 if (ok)
881 break;
882 }
883 } while (1);
884
885 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
886 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
887
888 sock->sk->sk_allocation = GFP_NOIO;
889 msock->sk->sk_allocation = GFP_NOIO;
890
891 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
892 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
893
894 /* NOT YET ...
895 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
896 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
897 * first set it to the P_HAND_SHAKE timeout,
898 * which we set to 4x the configured ping_timeout. */
899 sock->sk->sk_sndtimeo =
900 sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
901
902 msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
903 msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
904
905 /* we don't want delays.
906 * we use TCP_CORK where appropriate, though */
907 drbd_tcp_nodelay(sock);
908 drbd_tcp_nodelay(msock);
909
910 tconn->data.socket = sock;
911 tconn->meta.socket = msock;
912 tconn->last_received = jiffies;
913
914 h = drbd_do_handshake(tconn);
915 if (h <= 0)
916 return h;
917
918 if (tconn->cram_hmac_tfm) {
919 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
920 switch (drbd_do_auth(tconn)) {
921 case -1:
922 conn_err(tconn, "Authentication of peer failed\n");
923 return -1;
924 case 0:
925 conn_err(tconn, "Authentication of peer failed, trying again.\n");
926 return 0;
927 }
928 }
929
930 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
931 return 0;
932
933 sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
934 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
935
936 drbd_thread_start(&tconn->asender);
937
938 if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
939 return -1;
940
941 return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
942
943out_release_sockets:
944 if (sock)
945 sock_release(sock);
946 if (msock)
947 sock_release(msock);
948 return -1;
949}
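/*
 * Editor's sketch, not part of the original file: a hypothetical caller acting
 * on the return convention documented above drbd_connect():
 *   1 connected, 0 retry, -1 incompatible peer (go standalone), -2 no net config.
 */
static void __maybe_unused connect_retval_sketch(struct drbd_tconn *tconn)
{
	int h;

	do {
		h = drbd_connect(tconn);
		if (h == 0)		/* transient failure, try again */
			continue;
		if (h < 0)		/* -1 or -2: no point in retrying */
			conn_err(tconn, "connect gave up (%d)\n", h);
		break;
	} while (1);
}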
950
951static int decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
952{
953 if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
954 pi->cmd = be16_to_cpu(h->h80.command);
955 pi->size = be16_to_cpu(h->h80.length);
956 pi->vnr = 0;
957 } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
958 pi->cmd = be16_to_cpu(h->h95.command);
959 pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
960 pi->vnr = 0;
961 } else {
962 conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
963 be32_to_cpu(h->h80.magic),
964 be16_to_cpu(h->h80.command),
965 be16_to_cpu(h->h80.length));
966 return -EINVAL;
967 }
968 return 0;
969}
970
971static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
972{
973 struct p_header *h = &tconn->data.rbuf.header;
974 int err;
975
976 err = drbd_recv(tconn, h, sizeof(*h));
977 if (unlikely(err != sizeof(*h))) {
978 if (!signal_pending(current))
979 conn_warn(tconn, "short read expecting header on sock: r=%d\n", err);
980 if (err >= 0)
981 err = -EIO;
982 return err;
983 }
984
985 err = decode_header(tconn, h, pi);
986 tconn->last_received = jiffies;
987
988 return err;
989}
990
991static void drbd_flush(struct drbd_conf *mdev)
992{
993 int rv;
994
995 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
996 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
997 NULL);
998 if (rv) {
999 dev_err(DEV, "local disk flush failed with status %d\n", rv);
1000 /* would rather check on EOPNOTSUPP, but that is not reliable.
1001 * don't try again for ANY return value != 0
1002 * if (rv == -EOPNOTSUPP) */
1003 drbd_bump_write_ordering(mdev, WO_drain_io);
1004 }
1005 put_ldev(mdev);
1006 }
b411b363
PR
1007}
1008
1009/**
1010 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1011 * @mdev: DRBD device.
1012 * @epoch: Epoch object.
1013 * @ev: Epoch event.
1014 */
1015static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1016 struct drbd_epoch *epoch,
1017 enum epoch_event ev)
1018{
1019 int epoch_size;
1020 struct drbd_epoch *next_epoch;
1021 enum finish_epoch rv = FE_STILL_LIVE;
1022
1023 spin_lock(&mdev->epoch_lock);
1024 do {
1025 next_epoch = NULL;
1026
1027 epoch_size = atomic_read(&epoch->epoch_size);
1028
1029 switch (ev & ~EV_CLEANUP) {
1030 case EV_PUT:
1031 atomic_dec(&epoch->active);
1032 break;
1033 case EV_GOT_BARRIER_NR:
1034 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1035 break;
1036 case EV_BECAME_LAST:
1037 /* nothing to do*/
1038 break;
1039 }
1040
1041 if (epoch_size != 0 &&
1042 atomic_read(&epoch->active) == 0 &&
1043 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1044 if (!(ev & EV_CLEANUP)) {
1045 spin_unlock(&mdev->epoch_lock);
1046 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1047 spin_lock(&mdev->epoch_lock);
1048 }
1049 dec_unacked(mdev);
1050
1051 if (mdev->current_epoch != epoch) {
1052 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1053 list_del(&epoch->list);
1054 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1055 mdev->epochs--;
1056 kfree(epoch);
1057
1058 if (rv == FE_STILL_LIVE)
1059 rv = FE_DESTROYED;
1060 } else {
1061 epoch->flags = 0;
1062 atomic_set(&epoch->epoch_size, 0);
1063 /* atomic_set(&epoch->active, 0); is already zero */
1064 if (rv == FE_STILL_LIVE)
1065 rv = FE_RECYCLED;
1066 wake_up(&mdev->ee_wait);
1067 }
1068 }
1069
1070 if (!next_epoch)
1071 break;
1072
1073 epoch = next_epoch;
1074 } while (1);
1075
1076 spin_unlock(&mdev->epoch_lock);
1077
1078 return rv;
1079}
1080
1081/**
1082 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1083 * @mdev: DRBD device.
1084 * @wo: Write ordering method to try.
1085 */
1086void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1087{
1088 enum write_ordering_e pwo;
1089 static char *write_ordering_str[] = {
1090 [WO_none] = "none",
1091 [WO_drain_io] = "drain",
1092 [WO_bdev_flush] = "flush",
1093 };
1094
1095 pwo = mdev->write_ordering;
1096 wo = min(pwo, wo);
1097 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1098 wo = WO_drain_io;
1099 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1100 wo = WO_none;
1101 mdev->write_ordering = wo;
1102 if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1103 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1104}
1105
1106/**
1107 * drbd_submit_peer_request()
1108 * @mdev: DRBD device.
1109 * @peer_req: peer request
1110 * @rw: flag field, see bio->bi_rw
1111 *
1112 * May spread the pages to multiple bios,
1113 * depending on bio_add_page restrictions.
1114 *
1115 * Returns 0 if all bios have been submitted,
1116 * -ENOMEM if we could not allocate enough bios,
1117 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1118 * single page to an empty bio (which should never happen and likely indicates
1119 * that the lower level IO stack is in some way broken). This has been observed
1120 * on certain Xen deployments.
1121 */
1122/* TODO allocate from our own bio_set. */
1123int drbd_submit_peer_request(struct drbd_conf *mdev,
1124 struct drbd_peer_request *peer_req,
1125 const unsigned rw, const int fault_type)
1126{
1127 struct bio *bios = NULL;
1128 struct bio *bio;
1129 struct page *page = peer_req->pages;
1130 sector_t sector = peer_req->i.sector;
1131 unsigned ds = peer_req->i.size;
1132 unsigned n_bios = 0;
1133 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1134 int err = -ENOMEM;
1135
1136 /* In most cases, we will only need one bio. But in case the lower
1137 * level restrictions happen to be different at this offset on this
1138 * side than those of the sending peer, we may need to submit the
1139 * request in more than one bio.
1140 *
1141 * Plain bio_alloc is good enough here, this is no DRBD internally
1142 * generated bio, but a bio allocated on behalf of the peer.
1143 */
1144next_bio:
1145 bio = bio_alloc(GFP_NOIO, nr_pages);
1146 if (!bio) {
1147 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1148 goto fail;
1149 }
1150 /* > peer_req->i.sector, unless this is the first bio */
1151 bio->bi_sector = sector;
1152 bio->bi_bdev = mdev->ldev->backing_bdev;
1153 bio->bi_rw = rw;
1154 bio->bi_private = peer_req;
1155 bio->bi_end_io = drbd_peer_request_endio;
1156
1157 bio->bi_next = bios;
1158 bios = bio;
1159 ++n_bios;
1160
1161 page_chain_for_each(page) {
1162 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1163 if (!bio_add_page(bio, page, len, 0)) {
1164 /* A single page must always be possible!
1165 * But in case it fails anyways,
1166 * we deal with it, and complain (below). */
1167 if (bio->bi_vcnt == 0) {
1168 dev_err(DEV,
1169 "bio_add_page failed for len=%u, "
1170 "bi_vcnt=0 (bi_sector=%llu)\n",
1171 len, (unsigned long long)bio->bi_sector);
1172 err = -ENOSPC;
1173 goto fail;
1174 }
1175 goto next_bio;
1176 }
1177 ds -= len;
1178 sector += len >> 9;
1179 --nr_pages;
1180 }
1181 D_ASSERT(page == NULL);
1182 D_ASSERT(ds == 0);
1183
1184 atomic_set(&peer_req->pending_bios, n_bios);
1185 do {
1186 bio = bios;
1187 bios = bios->bi_next;
1188 bio->bi_next = NULL;
1189
1190 drbd_generic_make_request(mdev, fault_type, bio);
1191 } while (bios);
1192 return 0;
1193
1194fail:
1195 while (bios) {
1196 bio = bios;
1197 bios = bios->bi_next;
1198 bio_put(bio);
1199 }
1200 return err;
1201}
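/*
 * Editor's sketch, not part of the original file: consuming the
 * 0 / -ENOMEM / -ENOSPC convention documented above; compare the error path
 * of recv_resync_read() further down.
 */
static bool __maybe_unused submit_peer_req_sketch(struct drbd_conf *mdev,
						  struct drbd_peer_request *peer_req)
{
	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return true;

	/* any error: undo the list bookkeeping and drop the request */
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);
	drbd_free_ee(mdev, peer_req);
	return false;
}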
1202
1203static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1204 struct drbd_peer_request *peer_req)
1205{
1206 struct drbd_interval *i = &peer_req->i;
1207
1208 drbd_remove_interval(&mdev->write_requests, i);
1209 drbd_clear_interval(i);
1210
1211 /* Wake up any processes waiting for this peer request to complete. */
1212 if (i->waiting)
1213 wake_up(&mdev->misc_wait);
1214}
1215
1216static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
1217 unsigned int data_size)
1218{
1219 int rv;
1220 struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
1221 struct drbd_epoch *epoch;
1222
1223 inc_unacked(mdev);
1224
1225 mdev->current_epoch->barrier_nr = p->barrier;
1226 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1227
1228 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1229 * the activity log, which means it would not be resynced in case the
1230 * R_PRIMARY crashes now.
1231 * Therefore we must send the barrier_ack after the barrier request was
1232 * completed. */
1233 switch (mdev->write_ordering) {
1234 case WO_none:
1235 if (rv == FE_RECYCLED)
1236 return true;
1237
1238 /* receiver context, in the writeout path of the other node.
1239 * avoid potential distributed deadlock */
1240 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1241 if (epoch)
1242 break;
1243 else
1244 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1245 /* Fall through */
1246
1247 case WO_bdev_flush:
1248 case WO_drain_io:
1249 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1250 drbd_flush(mdev);
1251
1252 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1253 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1254 if (epoch)
1255 break;
1256 }
1257
1258 epoch = mdev->current_epoch;
1259 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1260
1261 D_ASSERT(atomic_read(&epoch->active) == 0);
1262 D_ASSERT(epoch->flags == 0);
1263
1264 return true;
1265 default:
1266 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1267 return false;
1268 }
1269
1270 epoch->flags = 0;
1271 atomic_set(&epoch->epoch_size, 0);
1272 atomic_set(&epoch->active, 0);
1273
1274 spin_lock(&mdev->epoch_lock);
1275 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1276 list_add(&epoch->list, &mdev->current_epoch->list);
1277 mdev->current_epoch = epoch;
1278 mdev->epochs++;
1279 } else {
1280 /* The current_epoch got recycled while we allocated this one... */
1281 kfree(epoch);
1282 }
1283 spin_unlock(&mdev->epoch_lock);
1284
1285 return true;
1286}
1287
1288/* used from receive_RSDataReply (recv_resync_read)
1289 * and from receive_Data */
1290static struct drbd_peer_request *
1291read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1292 int data_size) __must_hold(local)
1293{
1294 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1295 struct drbd_peer_request *peer_req;
1296 struct page *page;
1297 int dgs, ds, rr;
1298 void *dig_in = mdev->tconn->int_dig_in;
1299 void *dig_vv = mdev->tconn->int_dig_vv;
1300 unsigned long *data;
1301
1302 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1303 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1304
1305 if (dgs) {
1306 rr = drbd_recv(mdev->tconn, dig_in, dgs);
1307 if (rr != dgs) {
1308 if (!signal_pending(current))
1309 dev_warn(DEV,
1310 "short read receiving data digest: read %d expected %d\n",
1311 rr, dgs);
1312 return NULL;
1313 }
1314 }
1315
1316 data_size -= dgs;
1317
1318 if (!expect(data_size != 0))
1319 return NULL;
1320 if (!expect(IS_ALIGNED(data_size, 512)))
1321 return NULL;
1322 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1323 return NULL;
1324
1325 /* even though we trust our peer,
1326 * we sometimes have to double check. */
1327 if (sector + (data_size>>9) > capacity) {
1328 dev_err(DEV, "request from peer beyond end of local disk: "
1329 "capacity: %llus < sector: %llus + size: %u\n",
1330 (unsigned long long)capacity,
1331 (unsigned long long)sector, data_size);
1332 return NULL;
1333 }
1334
1335 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1336 * "criss-cross" setup, that might cause write-out on some other DRBD,
1337 * which in turn might block on the other node at this very place. */
1338 peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1339 if (!peer_req)
1340 return NULL;
1341
1342 ds = data_size;
1343 page = peer_req->pages;
1344 page_chain_for_each(page) {
1345 unsigned len = min_t(int, ds, PAGE_SIZE);
1346 data = kmap(page);
1347 rr = drbd_recv(mdev->tconn, data, len);
1348 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1349 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1350 data[0] = data[0] ^ (unsigned long)-1;
1351 }
1352 kunmap(page);
1353 if (rr != len) {
1354 drbd_free_ee(mdev, peer_req);
1355 if (!signal_pending(current))
1356 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1357 rr, len);
1358 return NULL;
1359 }
1360 ds -= rr;
1361 }
1362
1363 if (dgs) {
1364 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
1365 if (memcmp(dig_in, dig_vv, dgs)) {
1366 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1367 (unsigned long long)sector, data_size);
1368 drbd_free_ee(mdev, peer_req);
1369 return NULL;
1370 }
1371 }
1372 mdev->recv_cnt += data_size>>9;
1373 return peer_req;
1374}
1375
1376/* drbd_drain_block() just takes a data block
1377 * out of the socket input buffer, and discards it.
1378 */
1379static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1380{
1381 struct page *page;
1382 int rr, err = 0;
1383 void *data;
1384
1385 if (!data_size)
1386 return 0;
1387
1388 page = drbd_pp_alloc(mdev, 1, 1);
1389
1390 data = kmap(page);
1391 while (data_size) {
1392 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1393
1394 rr = drbd_recv(mdev->tconn, data, len);
1395 if (rr != len) {
1396 if (!signal_pending(current))
1397 dev_warn(DEV,
1398 "short read receiving data: read %d expected %d\n",
1399 rr, len);
1400 err = (rr < 0) ? rr : -EIO;
1401 break;
1402 }
1403 data_size -= rr;
1404 }
1405 kunmap(page);
1406 drbd_pp_free(mdev, page, 0);
1407 return err;
1408}
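/*
 * Editor's sketch, not part of the original file: with the 0-on-success /
 * negative-errno convention (see the commit subject above), a caller that only
 * needs a boolean simply negates the result, as receive_RSDataReply() does
 * below.
 */
static int __maybe_unused drain_block_sketch(struct drbd_conf *mdev, int data_size)
{
	int ok = !drbd_drain_block(mdev, data_size);	/* 1 on success, 0 on error */

	return ok;
}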
1409
1410static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1411 sector_t sector, int data_size)
1412{
1413 struct bio_vec *bvec;
1414 struct bio *bio;
1415 int dgs, rr, i, expect;
1416 void *dig_in = mdev->tconn->int_dig_in;
1417 void *dig_vv = mdev->tconn->int_dig_vv;
1418
1419 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1420 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1421
1422 if (dgs) {
1423 rr = drbd_recv(mdev->tconn, dig_in, dgs);
1424 if (rr != dgs) {
1425 if (!signal_pending(current))
1426 dev_warn(DEV,
1427 "short read receiving data reply digest: read %d expected %d\n",
1428 rr, dgs);
b411b363
PR
1429 return 0;
1430 }
1431 }
1432
1433 data_size -= dgs;
1434
1435 /* optimistically update recv_cnt. if receiving fails below,
1436 * we disconnect anyways, and counters will be reset. */
1437 mdev->recv_cnt += data_size>>9;
1438
1439 bio = req->master_bio;
1440 D_ASSERT(sector == bio->bi_sector);
1441
1442 bio_for_each_segment(bvec, bio, i) {
1443 expect = min_t(int, data_size, bvec->bv_len);
1444 rr = drbd_recv(mdev->tconn,
1445 kmap(bvec->bv_page)+bvec->bv_offset,
1446 expect);
1447 kunmap(bvec->bv_page);
1448 if (rr != expect) {
1449 if (!signal_pending(current))
1450 dev_warn(DEV, "short read receiving data reply: "
1451 "read %d expected %d\n",
1452 rr, expect);
1453 return 0;
1454 }
1455 data_size -= rr;
1456 }
1457
1458 if (dgs) {
1459 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1460 if (memcmp(dig_in, dig_vv, dgs)) {
1461 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1462 return 0;
1463 }
1464 }
1465
1466 D_ASSERT(data_size == 0);
1467 return 1;
1468}
1469
1470/* e_end_resync_block() is called via
1471 * drbd_process_done_ee() by asender only */
1472static int e_end_resync_block(struct drbd_work *w, int unused)
1473{
1474 struct drbd_peer_request *peer_req =
1475 container_of(w, struct drbd_peer_request, w);
1476 struct drbd_conf *mdev = w->mdev;
1477 sector_t sector = peer_req->i.sector;
1478 int err;
1479
1480 D_ASSERT(drbd_interval_empty(&peer_req->i));
1481
1482 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1483 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1484 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1485 } else {
1486 /* Record failure to sync */
1487 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1488
1489 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1490 }
1491 dec_unacked(mdev);
1492
1493 return err;
1494}
1495
1496static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1497{
1498 struct drbd_peer_request *peer_req;
1499
1500 peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1501 if (!peer_req)
1502 goto fail;
1503
1504 dec_rs_pending(mdev);
1505
1506 inc_unacked(mdev);
1507 /* corresponding dec_unacked() in e_end_resync_block()
1508 * respective _drbd_clear_done_ee */
1509
1510 peer_req->w.cb = e_end_resync_block;
1511
1512 spin_lock_irq(&mdev->tconn->req_lock);
1513 list_add(&peer_req->w.list, &mdev->sync_ee);
1514 spin_unlock_irq(&mdev->tconn->req_lock);
1515
1516 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1517 if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1518 return true;
1519
1520 /* don't care for the reason here */
1521 dev_err(DEV, "submit failed, triggering re-connect\n");
1522 spin_lock_irq(&mdev->tconn->req_lock);
1523 list_del(&peer_req->w.list);
1524 spin_unlock_irq(&mdev->tconn->req_lock);
1525
1526 drbd_free_ee(mdev, peer_req);
1527fail:
1528 put_ldev(mdev);
1529 return false;
1530}
1531
1532static struct drbd_request *
1533find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1534 sector_t sector, bool missing_ok, const char *func)
1535{
1536 struct drbd_request *req;
1537
1538 /* Request object according to our peer */
1539 req = (struct drbd_request *)(unsigned long)id;
1540 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1541 return req;
1542 if (!missing_ok) {
1543 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1544 (unsigned long)id, (unsigned long long)sector);
1545 }
1546 return NULL;
1547}
1548
1549static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1550 unsigned int data_size)
1551{
1552 struct drbd_request *req;
1553 sector_t sector;
1554 int ok;
1555 struct p_data *p = &mdev->tconn->data.rbuf.data;
1556
1557 sector = be64_to_cpu(p->sector);
1558
1559 spin_lock_irq(&mdev->tconn->req_lock);
1560 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1561 spin_unlock_irq(&mdev->tconn->req_lock);
1562 if (unlikely(!req))
1563 return false;
1564
1565 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1566 * special casing it there for the various failure cases.
1567 * still no race with drbd_fail_pending_reads */
1568 ok = recv_dless_read(mdev, req, sector, data_size);
1569
1570 if (ok)
1571 req_mod(req, DATA_RECEIVED);
1572 /* else: nothing. handled from drbd_disconnect...
1573 * I don't think we may complete this just yet
1574 * in case we are "on-disconnect: freeze" */
1575
1576 return ok;
1577}
1578
d8763023
AG
1579static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1580 unsigned int data_size)
b411b363
PR
1581{
1582 sector_t sector;
1583 int ok;
1584 struct p_data *p = &mdev->tconn->data.rbuf.data;
1585
1586 sector = be64_to_cpu(p->sector);
1587 D_ASSERT(p->block_id == ID_SYNCER);
1588
1589 if (get_ldev(mdev)) {
1590 /* data is submitted to disk within recv_resync_read.
1591 * corresponding put_ldev done below on error,
1592 * or in drbd_peer_request_endio. */
1593 ok = recv_resync_read(mdev, sector, data_size);
1594 } else {
1595 if (__ratelimit(&drbd_ratelimit_state))
1596 dev_err(DEV, "Can not write resync data to local disk.\n");
1597
1598 ok = !drbd_drain_block(mdev, data_size);
1599
1600 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1601 }
1602
1603 atomic_add(data_size >> 9, &mdev->rs_sect_in);
1604
1605 return ok;
1606}
1607
1608static int w_restart_write(struct drbd_work *w, int cancel)
1609{
1610 struct drbd_request *req = container_of(w, struct drbd_request, w);
1611 struct drbd_conf *mdev = w->mdev;
1612 struct bio *bio;
1613 unsigned long start_time;
1614 unsigned long flags;
1615
1616 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1617 if (!expect(req->rq_state & RQ_POSTPONED)) {
1618 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1619 return -EIO;
1620 }
1621 bio = req->master_bio;
1622 start_time = req->start_time;
1623 /* Postponed requests will not have their master_bio completed! */
1624 __req_mod(req, DISCARD_WRITE, NULL);
1625 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1626
1627 while (__drbd_make_request(mdev, bio, start_time))
1628 /* retry */ ;
1629 return 0;
1630}
1631
1632static void restart_conflicting_writes(struct drbd_conf *mdev,
1633 sector_t sector, int size)
1634{
1635 struct drbd_interval *i;
1636 struct drbd_request *req;
1637
1638 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1639 if (!i->local)
1640 continue;
1641 req = container_of(i, struct drbd_request, i);
1642 if (req->rq_state & RQ_LOCAL_PENDING ||
1643 !(req->rq_state & RQ_POSTPONED))
1644 continue;
1645 if (expect(list_empty(&req->w.list))) {
1646 req->w.mdev = mdev;
1647 req->w.cb = w_restart_write;
1648 drbd_queue_work(&mdev->tconn->data.work, &req->w);
1649 }
1650 }
1651}
1652
1653/* e_end_block() is called via drbd_process_done_ee().
1654 * this means this function only runs in the asender thread
1655 */
1656static int e_end_block(struct drbd_work *w, int cancel)
1657{
1658 struct drbd_peer_request *peer_req =
1659 container_of(w, struct drbd_peer_request, w);
1660 struct drbd_conf *mdev = w->mdev;
1661 sector_t sector = peer_req->i.sector;
1662 int err = 0, pcmd;
1663
1664 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
1665 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1666 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1667 mdev->state.conn <= C_PAUSED_SYNC_T &&
1668 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1669 P_RS_WRITE_ACK : P_WRITE_ACK;
1670 err = drbd_send_ack(mdev, pcmd, peer_req);
1671 if (pcmd == P_RS_WRITE_ACK)
1672 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1673 } else {
1674 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1675 /* we expect it to be marked out of sync anyways...
1676 * maybe assert this? */
1677 }
1678 dec_unacked(mdev);
1679 }
1680 /* we delete from the conflict detection hash _after_ we sent out the
1681 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1682 if (mdev->tconn->net_conf->two_primaries) {
1683 spin_lock_irq(&mdev->tconn->req_lock);
1684 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1685 drbd_remove_epoch_entry_interval(mdev, peer_req);
1686 if (peer_req->flags & EE_RESTART_REQUESTS)
1687 restart_conflicting_writes(mdev, sector, peer_req->i.size);
1688 spin_unlock_irq(&mdev->tconn->req_lock);
1689 } else
1690 D_ASSERT(drbd_interval_empty(&peer_req->i));
1691
1692 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1693
1694 return err;
1695}
1696
1697static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1698{
1699 struct drbd_conf *mdev = w->mdev;
1700 struct drbd_peer_request *peer_req =
1701 container_of(w, struct drbd_peer_request, w);
1702 int err;
1703
1704 err = drbd_send_ack(mdev, ack, peer_req);
1705 dec_unacked(mdev);
1706
1707 return err;
1708}
1709
1710static int e_send_discard_write(struct drbd_work *w, int unused)
1711{
1712 return e_send_ack(w, P_DISCARD_WRITE);
1713}
1714
1715static int e_send_retry_write(struct drbd_work *w, int unused)
1716{
1717 struct drbd_tconn *tconn = w->mdev->tconn;
1718
1719 return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1720 P_RETRY_WRITE : P_DISCARD_WRITE);
1721}
1722
1723static bool seq_greater(u32 a, u32 b)
1724{
1725 /*
1726 * We assume 32-bit wrap-around here.
1727 * For 24-bit wrap-around, we would have to shift:
1728 * a <<= 8; b <<= 8;
1729 */
1730 return (s32)a - (s32)b > 0;
1731}
1732
1733static u32 seq_max(u32 a, u32 b)
1734{
1735 return seq_greater(a, b) ? a : b;
1736}
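/*
 * Editor's note, not part of the original file: a worked example of the
 * wrap-around comparison above.  The signed-difference trick makes a freshly
 * wrapped sequence number compare as newer than one just below the wrap:
 *   seq_greater(0x00000002, 0xfffffffe)  ->  (s32)4 > 0   -> true
 *   seq_greater(0xfffffffe, 0x00000002)  ->  (s32)-4 > 0  -> false
 * hence seq_max(0x00000002, 0xfffffffe) == 0x00000002.
 */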
1737
1738static bool need_peer_seq(struct drbd_conf *mdev)
1739{
1740 struct drbd_tconn *tconn = mdev->tconn;
1741
1742 /*
1743 * We only need to keep track of the last packet_seq number of our peer
1744 * if we are in dual-primary mode and we have the discard flag set; see
1745 * handle_write_conflicts().
1746 */
1747 return tconn->net_conf->two_primaries &&
1748 test_bit(DISCARD_CONCURRENT, &tconn->flags);
1749}
1750
43ae077d 1751static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
3e394da1 1752{
3c13b680 1753 unsigned int newest_peer_seq;
3e394da1 1754
7be8da07
AG
1755 if (need_peer_seq(mdev)) {
1756 spin_lock(&mdev->peer_seq_lock);
3c13b680
LE
1757 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1758 mdev->peer_seq = newest_peer_seq;
7be8da07 1759 spin_unlock(&mdev->peer_seq_lock);
3c13b680
LE
1760 /* wake up only if we actually changed mdev->peer_seq */
1761 if (peer_seq == newest_peer_seq)
7be8da07
AG
1762 wake_up(&mdev->seq_wait);
1763 }
3e394da1
AG
1764}
1765
b411b363
PR
1766/* Called from receive_Data.
1767 * Synchronize packets on sock with packets on msock.
1768 *
1769 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1770 * packet traveling on msock, they are still processed in the order they have
1771 * been sent.
1772 *
1773 * Note: we don't care for Ack packets overtaking P_DATA packets.
1774 *
1775 * If packet_seq is larger than mdev->peer_seq, there are still
1776 * outstanding packets on the msock; we wait for them to arrive.
1777 * If this is the logically next packet, we update mdev->peer_seq
1778 * ourselves. Correctly handles 32bit wrap around.
1779 *
1780 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1781 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1782 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1783 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1784 *
1785 * returns 0 if we may process the packet,
1786 * -ERESTARTSYS if we were interrupted (by disconnect signal), or -ETIMEDOUT if the missing ack packets did not arrive in time. */
7be8da07 1787static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
b411b363
PR
1788{
1789 DEFINE_WAIT(wait);
b411b363 1790 long timeout;
7be8da07
AG
1791 int ret;
1792
1793 if (!need_peer_seq(mdev))
1794 return 0;
1795
b411b363
PR
1796 spin_lock(&mdev->peer_seq_lock);
1797 for (;;) {
7be8da07
AG
1798 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1799 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1800 ret = 0;
b411b363 1801 break;
7be8da07 1802 }
b411b363
PR
1803 if (signal_pending(current)) {
1804 ret = -ERESTARTSYS;
1805 break;
1806 }
7be8da07 1807 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
b411b363 1808 spin_unlock(&mdev->peer_seq_lock);
71b1c1eb
AG
1809 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1810 timeout = schedule_timeout(timeout);
b411b363 1811 spin_lock(&mdev->peer_seq_lock);
7be8da07 1812 if (!timeout) {
b411b363 1813 ret = -ETIMEDOUT;
71b1c1eb 1814 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
b411b363
PR
1815 break;
1816 }
1817 }
b411b363 1818 spin_unlock(&mdev->peer_seq_lock);
7be8da07 1819 finish_wait(&mdev->seq_wait, &wait);
b411b363
PR
1820 return ret;
1821}
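/*
 * A worked example of the wait condition above (hypothetical numbers):
 * with mdev->peer_seq == 10, an incoming packet with peer_seq == 11 passes
 * the !seq_greater(peer_seq - 1, mdev->peer_seq) test (10 is not greater
 * than 10) and is processed immediately, updating peer_seq to 11.  An
 * incoming peer_seq == 13 fails the test (12 > 10), so the receiver sleeps
 * until the still-outstanding packets 11 and 12 have been processed on the
 * msock, or the ping timeout expires.
 */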
1822
688593c5
LE
1823/* see also bio_flags_to_wire():
1824 * we need to semantically map the DP_* data packet flags to bio REQ_* flags
1825 * and back, since we may replicate to peers running other kernel versions. */
1826static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1827{
688593c5
LE
1828 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1829 (dpf & DP_FUA ? REQ_FUA : 0) |
1830 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1831 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1832}
1833
7be8da07
AG
1834static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1835 unsigned int size)
1836{
1837 struct drbd_interval *i;
1838
1839 repeat:
1840 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1841 struct drbd_request *req;
1842 struct bio_and_error m;
1843
1844 if (!i->local)
1845 continue;
1846 req = container_of(i, struct drbd_request, i);
1847 if (!(req->rq_state & RQ_POSTPONED))
1848 continue;
1849 req->rq_state &= ~RQ_POSTPONED;
1850 __req_mod(req, NEG_ACKED, &m);
1851 spin_unlock_irq(&mdev->tconn->req_lock);
1852 if (m.bio)
1853 complete_master_bio(mdev, &m);
1854 spin_lock_irq(&mdev->tconn->req_lock);
1855 goto repeat;
1856 }
1857}
1858
1859static int handle_write_conflicts(struct drbd_conf *mdev,
1860 struct drbd_peer_request *peer_req)
1861{
1862 struct drbd_tconn *tconn = mdev->tconn;
1863 bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1864 sector_t sector = peer_req->i.sector;
1865 const unsigned int size = peer_req->i.size;
1866 struct drbd_interval *i;
1867 bool equal;
1868 int err;
1869
1870 /*
1871 * Inserting the peer request into the write_requests tree will prevent
1872 * new conflicting local requests from being added.
1873 */
1874 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1875
1876 repeat:
1877 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1878 if (i == &peer_req->i)
1879 continue;
1880
1881 if (!i->local) {
1882 /*
1883 * Our peer has sent a conflicting remote request; this
1884 * should not happen in a two-node setup. Wait for the
1885 * earlier peer request to complete.
1886 */
1887 err = drbd_wait_misc(mdev, i);
1888 if (err)
1889 goto out;
1890 goto repeat;
1891 }
1892
1893 equal = i->sector == sector && i->size == size;
1894 if (resolve_conflicts) {
1895 /*
1896 * If the peer request is fully contained within the
1897 * overlapping request, it can be discarded; otherwise,
1898 * it will be retried once all overlapping requests
1899 * have completed.
1900 */
1901 bool discard = i->sector <= sector && i->sector +
1902 (i->size >> 9) >= sector + (size >> 9);
1903
1904 if (!equal)
1905 dev_alert(DEV, "Concurrent writes detected: "
1906 "local=%llus +%u, remote=%llus +%u, "
1907 "assuming %s came first\n",
1908 (unsigned long long)i->sector, i->size,
1909 (unsigned long long)sector, size,
1910 discard ? "local" : "remote");
1911
1912 inc_unacked(mdev);
1913 peer_req->w.cb = discard ? e_send_discard_write :
1914 e_send_retry_write;
1915 list_add_tail(&peer_req->w.list, &mdev->done_ee);
1916 wake_asender(mdev->tconn);
1917
1918 err = -ENOENT;
1919 goto out;
1920 } else {
1921 struct drbd_request *req =
1922 container_of(i, struct drbd_request, i);
1923
1924 if (!equal)
1925 dev_alert(DEV, "Concurrent writes detected: "
1926 "local=%llus +%u, remote=%llus +%u\n",
1927 (unsigned long long)i->sector, i->size,
1928 (unsigned long long)sector, size);
1929
1930 if (req->rq_state & RQ_LOCAL_PENDING ||
1931 !(req->rq_state & RQ_POSTPONED)) {
1932 /*
1933 * Wait for the node with the discard flag to
1934 * decide if this request will be discarded or
1935 * retried. Requests that are discarded will
1936 * disappear from the write_requests tree.
1937 *
1938 * In addition, wait for the conflicting
1939 * request to finish locally before submitting
1940 * the conflicting peer request.
1941 */
1942 err = drbd_wait_misc(mdev, &req->i);
1943 if (err) {
1944 _conn_request_state(mdev->tconn,
1945 NS(conn, C_TIMEOUT),
1946 CS_HARD);
1947 fail_postponed_requests(mdev, sector, size);
1948 goto out;
1949 }
1950 goto repeat;
1951 }
1952 /*
1953 * Remember to restart the conflicting requests after
1954 * the new peer request has completed.
1955 */
1956 peer_req->flags |= EE_RESTART_REQUESTS;
1957 }
1958 }
1959 err = 0;
1960
1961 out:
1962 if (err)
1963 drbd_remove_epoch_entry_interval(mdev, peer_req);
1964 return err;
1965}
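/*
 * Containment example for the discard decision above (hypothetical sectors):
 * if a local request i covers sectors [100, 116) (i->size == 8192 bytes,
 * i.e. 16 sectors) and the peer request covers [104, 112) (size == 4096),
 * then i->sector <= sector (100 <= 104) and 100 + 16 >= 104 + 8, so the
 * peer request is fully contained and may be discarded; otherwise it is
 * answered with P_RETRY_WRITE (or P_DISCARD_WRITE for peers < proto 100).
 */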
1966
b411b363 1967/* mirrored write */
d8763023
AG
1968static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
1969 unsigned int data_size)
b411b363
PR
1970{
1971 sector_t sector;
db830c46 1972 struct drbd_peer_request *peer_req;
e42325a5 1973 struct p_data *p = &mdev->tconn->data.rbuf.data;
7be8da07 1974 u32 peer_seq = be32_to_cpu(p->seq_num);
b411b363
PR
1975 int rw = WRITE;
1976 u32 dp_flags;
7be8da07 1977 int err;
b411b363 1978
b411b363 1979
7be8da07
AG
1980 if (!get_ldev(mdev)) {
1981 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2b2bf214 1982 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363 1983 atomic_inc(&mdev->current_epoch->epoch_size);
fc5be839 1984 return !drbd_drain_block(mdev, data_size) && err == 0;
b411b363
PR
1985 }
1986
fcefa62e
AG
1987 /*
1988 * Corresponding put_ldev done either below (on various errors), or in
1989 * drbd_peer_request_endio, if we successfully submit the data at the
1990 * end of this function.
1991 */
b411b363
PR
1992
1993 sector = be64_to_cpu(p->sector);
db830c46
AG
1994 peer_req = read_in_block(mdev, p->block_id, sector, data_size);
1995 if (!peer_req) {
b411b363 1996 put_ldev(mdev);
81e84650 1997 return false;
b411b363
PR
1998 }
1999
db830c46 2000 peer_req->w.cb = e_end_block;
b411b363 2001
688593c5
LE
2002 dp_flags = be32_to_cpu(p->dp_flags);
2003 rw |= wire_flags_to_bio(mdev, dp_flags);
2004
2005 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 2006 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 2007
b411b363 2008 spin_lock(&mdev->epoch_lock);
db830c46
AG
2009 peer_req->epoch = mdev->current_epoch;
2010 atomic_inc(&peer_req->epoch->epoch_size);
2011 atomic_inc(&peer_req->epoch->active);
b411b363
PR
2012 spin_unlock(&mdev->epoch_lock);
2013
7be8da07
AG
2014 if (mdev->tconn->net_conf->two_primaries) {
2015 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2016 if (err)
b411b363 2017 goto out_interrupted;
87eeee41 2018 spin_lock_irq(&mdev->tconn->req_lock);
7be8da07
AG
2019 err = handle_write_conflicts(mdev, peer_req);
2020 if (err) {
2021 spin_unlock_irq(&mdev->tconn->req_lock);
2022 if (err == -ENOENT) {
b411b363 2023 put_ldev(mdev);
81e84650 2024 return true;
b411b363 2025 }
7be8da07 2026 goto out_interrupted;
b411b363 2027 }
7be8da07
AG
2028 } else
2029 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2030 list_add(&peer_req->w.list, &mdev->active_ee);
87eeee41 2031 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2032
89e58e75 2033 switch (mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
2034 case DRBD_PROT_C:
2035 inc_unacked(mdev);
2036 /* corresponding dec_unacked() in e_end_block()
2037 * respective _drbd_clear_done_ee */
2038 break;
2039 case DRBD_PROT_B:
2040 /* I really don't like it that the receiver thread
2041 * sends on the msock, but anyways */
db830c46 2042 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
b411b363
PR
2043 break;
2044 case DRBD_PROT_A:
2045 /* nothing to do */
2046 break;
2047 }
2048
6719fb03 2049 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 2050 /* In case we have the only disk of the cluster, mark the range out of sync and cover it with the activity log */
db830c46
AG
2051 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2052 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2053 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2054 drbd_al_begin_io(mdev, peer_req->i.sector);
b411b363
PR
2055 }
2056
fbe29dec 2057 if (drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR) == 0)
81e84650 2058 return true;
b411b363 2059
10f6d992
LE
2060 /* don't care for the reason here */
2061 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2062 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
2063 list_del(&peer_req->w.list);
2064 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 2065 spin_unlock_irq(&mdev->tconn->req_lock);
db830c46
AG
2066 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2067 drbd_al_complete_io(mdev, peer_req->i.sector);
22cc37a9 2068
b411b363 2069out_interrupted:
db830c46 2070 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
b411b363 2071 put_ldev(mdev);
db830c46 2072 drbd_free_ee(mdev, peer_req);
81e84650 2073 return false;
b411b363
PR
2074}
2075
0f0601f4
LE
2076/* We may throttle resync, if the lower device seems to be busy,
2077 * and current sync rate is above c_min_rate.
2078 *
2079 * To decide whether or not the lower device is busy, we use a scheme similar
2080 * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2081 * amount (more than 64 sectors) of activity that we cannot account for with
2082 * our own resync activity, the device obviously is "busy".
2083 *
2084 * The current sync rate used here uses only the most recent two step marks,
2085 * to have a short time average so we can react faster.
2086 */
e3555d85 2087int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
2088{
2089 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2090 unsigned long db, dt, dbdt;
e3555d85 2091 struct lc_element *tmp;
0f0601f4
LE
2092 int curr_events;
2093 int throttle = 0;
2094
2095 /* feature disabled? */
f399002e 2096 if (mdev->ldev->dc.c_min_rate == 0)
0f0601f4
LE
2097 return 0;
2098
e3555d85
PR
2099 spin_lock_irq(&mdev->al_lock);
2100 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2101 if (tmp) {
2102 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2103 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2104 spin_unlock_irq(&mdev->al_lock);
2105 return 0;
2106 }
2107 /* Do not slow down if app IO is already waiting for this extent */
2108 }
2109 spin_unlock_irq(&mdev->al_lock);
2110
0f0601f4
LE
2111 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2112 (int)part_stat_read(&disk->part0, sectors[1]) -
2113 atomic_read(&mdev->rs_sect_ev);
e3555d85 2114
0f0601f4
LE
2115 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2116 unsigned long rs_left;
2117 int i;
2118
2119 mdev->rs_last_events = curr_events;
2120
2121 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2122 * approx. */
2649f080
LE
2123 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2124
2125 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2126 rs_left = mdev->ov_left;
2127 else
2128 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
2129
2130 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2131 if (!dt)
2132 dt++;
2133 db = mdev->rs_mark_left[i] - rs_left;
2134 dbdt = Bit2KB(db/dt);
2135
f399002e 2136 if (dbdt > mdev->ldev->dc.c_min_rate)
0f0601f4
LE
2137 throttle = 1;
2138 }
2139 return throttle;
2140}
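/*
 * Worked example of the throttle decision (hypothetical values, assuming the
 * usual 4 KiB bitmap granularity behind Bit2KB()): suppose the unaccounted
 * backing-device activity exceeded 64 sectors, the two most recent sync
 * marks are dt = 2 seconds apart, and db = 1500 bitmap bits were cleared in
 * that window.  Then
 *
 *   dbdt = Bit2KB(db / dt) = Bit2KB(750) = 3000 KiB/s
 *
 * and with a configured c_min_rate of, say, 2500 KiB/s this exceeds the
 * threshold, so the resync is throttled.  With c_min_rate == 0 the feature
 * is disabled and the function returns 0 right away.
 */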
2141
2142
d8763023
AG
2143static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
2144 unsigned int digest_size)
b411b363
PR
2145{
2146 sector_t sector;
2147 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 2148 struct drbd_peer_request *peer_req;
b411b363 2149 struct digest_info *di = NULL;
b18b37be 2150 int size, verb;
b411b363 2151 unsigned int fault_type;
e42325a5 2152 struct p_block_req *p = &mdev->tconn->data.rbuf.block_req;
b411b363
PR
2153
2154 sector = be64_to_cpu(p->sector);
2155 size = be32_to_cpu(p->blksize);
2156
c670a398 2157 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
2158 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2159 (unsigned long long)sector, size);
81e84650 2160 return false;
b411b363
PR
2161 }
2162 if (sector + (size>>9) > capacity) {
2163 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2164 (unsigned long long)sector, size);
81e84650 2165 return false;
b411b363
PR
2166 }
2167
2168 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be
PR
2169 verb = 1;
2170 switch (cmd) {
2171 case P_DATA_REQUEST:
2172 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2173 break;
2174 case P_RS_DATA_REQUEST:
2175 case P_CSUM_RS_REQUEST:
2176 case P_OV_REQUEST:
2177 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2178 break;
2179 case P_OV_REPLY:
2180 verb = 0;
2181 dec_rs_pending(mdev);
2182 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2183 break;
2184 default:
2185 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2186 cmdname(cmd));
2187 }
2188 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2189 dev_err(DEV, "Can not satisfy peer's read request, "
2190 "no local data.\n");
b18b37be 2191
a821cc4a 2192 /* drain the possibly present payload */
fc5be839 2193 return !drbd_drain_block(mdev, digest_size);
b411b363
PR
2194 }
2195
2196 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2197 * "criss-cross" setup, that might cause write-out on some other DRBD,
2198 * which in turn might block on the other node at this very place. */
db830c46
AG
2199 peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2200 if (!peer_req) {
b411b363 2201 put_ldev(mdev);
81e84650 2202 return false;
b411b363
PR
2203 }
2204
02918be2 2205 switch (cmd) {
b411b363 2206 case P_DATA_REQUEST:
db830c46 2207 peer_req->w.cb = w_e_end_data_req;
b411b363 2208 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2209 /* application IO, don't drbd_rs_begin_io */
2210 goto submit;
2211
b411b363 2212 case P_RS_DATA_REQUEST:
db830c46 2213 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2214 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2215 /* used in the sector offset progress display */
2216 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2217 break;
2218
2219 case P_OV_REPLY:
2220 case P_CSUM_RS_REQUEST:
2221 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2222 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2223 if (!di)
2224 goto out_free_e;
2225
2226 di->digest_size = digest_size;
2227 di->digest = (((char *)di)+sizeof(struct digest_info));
2228
db830c46
AG
2229 peer_req->digest = di;
2230 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2231
de0ff338 2232 if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)
b411b363
PR
2233 goto out_free_e;
2234
02918be2 2235 if (cmd == P_CSUM_RS_REQUEST) {
31890f4a 2236 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
db830c46 2237 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2238 /* used in the sector offset progress display */
2239 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
02918be2 2240 } else if (cmd == P_OV_REPLY) {
2649f080
LE
2241 /* track progress, we may need to throttle */
2242 atomic_add(size >> 9, &mdev->rs_sect_in);
db830c46 2243 peer_req->w.cb = w_e_end_ov_reply;
b411b363 2244 dec_rs_pending(mdev);
0f0601f4
LE
2245 /* drbd_rs_begin_io done when we sent this request,
2246 * but accounting still needs to be done. */
2247 goto submit_for_resync;
b411b363
PR
2248 }
2249 break;
2250
2251 case P_OV_REQUEST:
b411b363 2252 if (mdev->ov_start_sector == ~(sector_t)0 &&
31890f4a 2253 mdev->tconn->agreed_pro_version >= 90) {
de228bba
LE
2254 unsigned long now = jiffies;
2255 int i;
b411b363
PR
2256 mdev->ov_start_sector = sector;
2257 mdev->ov_position = sector;
30b743a2
LE
2258 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2259 mdev->rs_total = mdev->ov_left;
de228bba
LE
2260 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2261 mdev->rs_mark_left[i] = mdev->ov_left;
2262 mdev->rs_mark_time[i] = now;
2263 }
b411b363
PR
2264 dev_info(DEV, "Online Verify start sector: %llu\n",
2265 (unsigned long long)sector);
2266 }
db830c46 2267 peer_req->w.cb = w_e_end_ov_req;
b411b363 2268 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2269 break;
2270
b411b363
PR
2271 default:
2272 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
02918be2 2273 cmdname(cmd));
b411b363 2274 fault_type = DRBD_FAULT_MAX;
80a40e43 2275 goto out_free_e;
b411b363
PR
2276 }
2277
0f0601f4
LE
2278 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2279 * wrt the receiver, but it is not as straightforward as it may seem.
2280 * Various places in the resync start and stop logic assume resync
2281 * requests are processed in order, requeuing this on the worker thread
2282 * introduces a bunch of new code for synchronization between threads.
2283 *
2284 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2285 * "forever", throttling after drbd_rs_begin_io will lock that extent
2286 * for application writes for the same time. For now, just throttle
2287 * here, where the rest of the code expects the receiver to sleep for
2288 * a while, anyways.
2289 */
2290
2291 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2292 * this defers syncer requests for some time, before letting at least
2293 * one request through. The resync controller on the receiving side
2294 * will adapt to the incoming rate accordingly.
2295 *
2296 * We cannot throttle here if remote is Primary/SyncTarget:
2297 * we would also throttle its application reads.
2298 * In that case, throttling is done on the SyncTarget only.
2299 */
e3555d85
PR
2300 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2301 schedule_timeout_uninterruptible(HZ/10);
2302 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2303 goto out_free_e;
b411b363 2304
0f0601f4
LE
2305submit_for_resync:
2306 atomic_add(size >> 9, &mdev->rs_sect_ev);
2307
80a40e43 2308submit:
b411b363 2309 inc_unacked(mdev);
87eeee41 2310 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2311 list_add_tail(&peer_req->w.list, &mdev->read_ee);
87eeee41 2312 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2313
fbe29dec 2314 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
81e84650 2315 return true;
b411b363 2316
10f6d992
LE
2317 /* don't care for the reason here */
2318 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2319 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2320 list_del(&peer_req->w.list);
87eeee41 2321 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9
LE
2322 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2323
b411b363 2324out_free_e:
b411b363 2325 put_ldev(mdev);
db830c46 2326 drbd_free_ee(mdev, peer_req);
81e84650 2327 return false;
b411b363
PR
2328}
2329
2330static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2331{
2332 int self, peer, rv = -100;
2333 unsigned long ch_self, ch_peer;
2334
2335 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2336 peer = mdev->p_uuid[UI_BITMAP] & 1;
2337
2338 ch_peer = mdev->p_uuid[UI_SIZE];
2339 ch_self = mdev->comm_bm_set;
2340
89e58e75 2341 switch (mdev->tconn->net_conf->after_sb_0p) {
b411b363
PR
2342 case ASB_CONSENSUS:
2343 case ASB_DISCARD_SECONDARY:
2344 case ASB_CALL_HELPER:
2345 dev_err(DEV, "Configuration error.\n");
2346 break;
2347 case ASB_DISCONNECT:
2348 break;
2349 case ASB_DISCARD_YOUNGER_PRI:
2350 if (self == 0 && peer == 1) {
2351 rv = -1;
2352 break;
2353 }
2354 if (self == 1 && peer == 0) {
2355 rv = 1;
2356 break;
2357 }
2358 /* Else fall through to one of the other strategies... */
2359 case ASB_DISCARD_OLDER_PRI:
2360 if (self == 0 && peer == 1) {
2361 rv = 1;
2362 break;
2363 }
2364 if (self == 1 && peer == 0) {
2365 rv = -1;
2366 break;
2367 }
2368 /* Else fall through to one of the other strategies... */
ad19bf6e 2369 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2370 "Using discard-least-changes instead\n");
2371 case ASB_DISCARD_ZERO_CHG:
2372 if (ch_peer == 0 && ch_self == 0) {
25703f83 2373 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2374 ? -1 : 1;
2375 break;
2376 } else {
2377 if (ch_peer == 0) { rv = 1; break; }
2378 if (ch_self == 0) { rv = -1; break; }
2379 }
89e58e75 2380 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2381 break;
2382 case ASB_DISCARD_LEAST_CHG:
2383 if (ch_self < ch_peer)
2384 rv = -1;
2385 else if (ch_self > ch_peer)
2386 rv = 1;
2387 else /* ( ch_self == ch_peer ) */
2388 /* Well, then use something else. */
25703f83 2389 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2390 ? -1 : 1;
2391 break;
2392 case ASB_DISCARD_LOCAL:
2393 rv = -1;
2394 break;
2395 case ASB_DISCARD_REMOTE:
2396 rv = 1;
2397 }
2398
2399 return rv;
2400}
2401
2402static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2403{
6184ea21 2404 int hg, rv = -100;
b411b363 2405
89e58e75 2406 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2407 case ASB_DISCARD_YOUNGER_PRI:
2408 case ASB_DISCARD_OLDER_PRI:
2409 case ASB_DISCARD_LEAST_CHG:
2410 case ASB_DISCARD_LOCAL:
2411 case ASB_DISCARD_REMOTE:
2412 dev_err(DEV, "Configuration error.\n");
2413 break;
2414 case ASB_DISCONNECT:
2415 break;
2416 case ASB_CONSENSUS:
2417 hg = drbd_asb_recover_0p(mdev);
2418 if (hg == -1 && mdev->state.role == R_SECONDARY)
2419 rv = hg;
2420 if (hg == 1 && mdev->state.role == R_PRIMARY)
2421 rv = hg;
2422 break;
2423 case ASB_VIOLENTLY:
2424 rv = drbd_asb_recover_0p(mdev);
2425 break;
2426 case ASB_DISCARD_SECONDARY:
2427 return mdev->state.role == R_PRIMARY ? 1 : -1;
2428 case ASB_CALL_HELPER:
2429 hg = drbd_asb_recover_0p(mdev);
2430 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2431 enum drbd_state_rv rv2;
2432
2433 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2434 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2435 * we might be here in C_WF_REPORT_PARAMS which is transient.
2436 * we do not need to wait for the after state change work either. */
bb437946
AG
2437 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2438 if (rv2 != SS_SUCCESS) {
b411b363
PR
2439 drbd_khelper(mdev, "pri-lost-after-sb");
2440 } else {
2441 dev_warn(DEV, "Successfully gave up primary role.\n");
2442 rv = hg;
2443 }
2444 } else
2445 rv = hg;
2446 }
2447
2448 return rv;
2449}
2450
2451static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2452{
6184ea21 2453 int hg, rv = -100;
b411b363 2454
89e58e75 2455 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2456 case ASB_DISCARD_YOUNGER_PRI:
2457 case ASB_DISCARD_OLDER_PRI:
2458 case ASB_DISCARD_LEAST_CHG:
2459 case ASB_DISCARD_LOCAL:
2460 case ASB_DISCARD_REMOTE:
2461 case ASB_CONSENSUS:
2462 case ASB_DISCARD_SECONDARY:
2463 dev_err(DEV, "Configuration error.\n");
2464 break;
2465 case ASB_VIOLENTLY:
2466 rv = drbd_asb_recover_0p(mdev);
2467 break;
2468 case ASB_DISCONNECT:
2469 break;
2470 case ASB_CALL_HELPER:
2471 hg = drbd_asb_recover_0p(mdev);
2472 if (hg == -1) {
bb437946
AG
2473 enum drbd_state_rv rv2;
2474
b411b363
PR
2475 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2476 * we might be here in C_WF_REPORT_PARAMS which is transient.
2477 * we do not need to wait for the after state change work either. */
bb437946
AG
2478 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2479 if (rv2 != SS_SUCCESS) {
b411b363
PR
2480 drbd_khelper(mdev, "pri-lost-after-sb");
2481 } else {
2482 dev_warn(DEV, "Successfully gave up primary role.\n");
2483 rv = hg;
2484 }
2485 } else
2486 rv = hg;
2487 }
2488
2489 return rv;
2490}
2491
2492static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2493 u64 bits, u64 flags)
2494{
2495 if (!uuid) {
2496 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2497 return;
2498 }
2499 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2500 text,
2501 (unsigned long long)uuid[UI_CURRENT],
2502 (unsigned long long)uuid[UI_BITMAP],
2503 (unsigned long long)uuid[UI_HISTORY_START],
2504 (unsigned long long)uuid[UI_HISTORY_END],
2505 (unsigned long long)bits,
2506 (unsigned long long)flags);
2507}
2508
2509/*
2510 100 after split brain try auto recover
2511 2 C_SYNC_SOURCE set BitMap
2512 1 C_SYNC_SOURCE use BitMap
2513 0 no Sync
2514 -1 C_SYNC_TARGET use BitMap
2515 -2 C_SYNC_TARGET set BitMap
2516 -100 after split brain, disconnect
2517-1000 unrelated data
4a23f264
PR
2518-1091 requires proto 91
2519-1096 requires proto 96
b411b363
PR
2520 */
2521static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2522{
2523 u64 self, peer;
2524 int i, j;
2525
2526 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2527 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2528
2529 *rule_nr = 10;
2530 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2531 return 0;
2532
2533 *rule_nr = 20;
2534 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2535 peer != UUID_JUST_CREATED)
2536 return -2;
2537
2538 *rule_nr = 30;
2539 if (self != UUID_JUST_CREATED &&
2540 (peer == UUID_JUST_CREATED || peer == (u64)0))
2541 return 2;
2542
2543 if (self == peer) {
2544 int rct, dc; /* roles at crash time */
2545
2546 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2547
31890f4a 2548 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2549 return -1091;
b411b363
PR
2550
2551 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2552 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2553 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2554 drbd_uuid_set_bm(mdev, 0UL);
2555
2556 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2557 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2558 *rule_nr = 34;
2559 } else {
2560 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2561 *rule_nr = 36;
2562 }
2563
2564 return 1;
2565 }
2566
2567 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2568
31890f4a 2569 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2570 return -1091;
b411b363
PR
2571
2572 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2573 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2574 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2575
2576 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2577 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2578 mdev->p_uuid[UI_BITMAP] = 0UL;
2579
2580 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2581 *rule_nr = 35;
2582 } else {
2583 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2584 *rule_nr = 37;
2585 }
2586
2587 return -1;
2588 }
2589
2590 /* Common power [off|failure] */
2591 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2592 (mdev->p_uuid[UI_FLAGS] & 2);
2593 /* lowest bit is set when we were primary,
2594 * next bit (weight 2) is set when peer was primary */
2595 *rule_nr = 40;
2596
2597 switch (rct) {
2598 case 0: /* !self_pri && !peer_pri */ return 0;
2599 case 1: /* self_pri && !peer_pri */ return 1;
2600 case 2: /* !self_pri && peer_pri */ return -1;
2601 case 3: /* self_pri && peer_pri */
25703f83 2602 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363
PR
2603 return dc ? -1 : 1;
2604 }
2605 }
2606
2607 *rule_nr = 50;
2608 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2609 if (self == peer)
2610 return -1;
2611
2612 *rule_nr = 51;
2613 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2614 if (self == peer) {
31890f4a 2615 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2616 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2617 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2618 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2619 /* The last P_SYNC_UUID did not get through. Undo the modifications
2620 of the peer's UUIDs that were made at the last start of a resync as sync source. */
2621
31890f4a 2622 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2623 return -1091;
b411b363
PR
2624
2625 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2626 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264
PR
2627
2628 dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2629 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2630
b411b363
PR
2631 return -1;
2632 }
2633 }
2634
2635 *rule_nr = 60;
2636 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2637 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2638 peer = mdev->p_uuid[i] & ~((u64)1);
2639 if (self == peer)
2640 return -2;
2641 }
2642
2643 *rule_nr = 70;
2644 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2645 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2646 if (self == peer)
2647 return 1;
2648
2649 *rule_nr = 71;
2650 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2651 if (self == peer) {
31890f4a 2652 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2653 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2654 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2655 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2656 /* The last P_SYNC_UUID did not get through. Undo the modifications
2657 of our UUIDs that were made at the last start of a resync as sync source. */
2658
31890f4a 2659 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2660 return -1091;
b411b363
PR
2661
2662 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2663 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2664
4a23f264 2665 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2666 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2667 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2668
2669 return 1;
2670 }
2671 }
2672
2673
2674 *rule_nr = 80;
d8c2a36b 2675 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2676 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2677 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2678 if (self == peer)
2679 return 2;
2680 }
2681
2682 *rule_nr = 90;
2683 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2684 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2685 if (self == peer && self != ((u64)0))
2686 return 100;
2687
2688 *rule_nr = 100;
2689 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2690 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2691 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2692 peer = mdev->p_uuid[j] & ~((u64)1);
2693 if (self == peer)
2694 return -100;
2695 }
2696 }
2697
2698 return -1000;
2699}
2700
2701/* drbd_sync_handshake() returns the new conn state on success, or
2702 C_MASK (-1) on failure.
2703 */
2704static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2705 enum drbd_disk_state peer_disk) __must_hold(local)
2706{
2707 int hg, rule_nr;
2708 enum drbd_conns rv = C_MASK;
2709 enum drbd_disk_state mydisk;
2710
2711 mydisk = mdev->state.disk;
2712 if (mydisk == D_NEGOTIATING)
2713 mydisk = mdev->new_state_tmp.disk;
2714
2715 dev_info(DEV, "drbd_sync_handshake:\n");
2716 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2717 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2718 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2719
2720 hg = drbd_uuid_compare(mdev, &rule_nr);
2721
2722 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2723
2724 if (hg == -1000) {
2725 dev_alert(DEV, "Unrelated data, aborting!\n");
2726 return C_MASK;
2727 }
4a23f264
PR
2728 if (hg < -1000) {
2729 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2730 return C_MASK;
2731 }
2732
2733 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2734 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2735 int f = (hg == -100) || abs(hg) == 2;
2736 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2737 if (f)
2738 hg = hg*2;
2739 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2740 hg > 0 ? "source" : "target");
2741 }
2742
3a11a487
AG
2743 if (abs(hg) == 100)
2744 drbd_khelper(mdev, "initial-split-brain");
2745
89e58e75 2746 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
b411b363
PR
2747 int pcount = (mdev->state.role == R_PRIMARY)
2748 + (peer_role == R_PRIMARY);
2749 int forced = (hg == -100);
2750
2751 switch (pcount) {
2752 case 0:
2753 hg = drbd_asb_recover_0p(mdev);
2754 break;
2755 case 1:
2756 hg = drbd_asb_recover_1p(mdev);
2757 break;
2758 case 2:
2759 hg = drbd_asb_recover_2p(mdev);
2760 break;
2761 }
2762 if (abs(hg) < 100) {
2763 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2764 "automatically solved. Sync from %s node\n",
2765 pcount, (hg < 0) ? "peer" : "this");
2766 if (forced) {
2767 dev_warn(DEV, "Doing a full sync, since"
2768 " UUIDs where ambiguous.\n");
2769 hg = hg*2;
2770 }
2771 }
2772 }
2773
2774 if (hg == -100) {
89e58e75 2775 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
b411b363 2776 hg = -1;
89e58e75 2777 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
b411b363
PR
2778 hg = 1;
2779
2780 if (abs(hg) < 100)
2781 dev_warn(DEV, "Split-Brain detected, manually solved. "
2782 "Sync from %s node\n",
2783 (hg < 0) ? "peer" : "this");
2784 }
2785
2786 if (hg == -100) {
580b9767
LE
2787 /* FIXME this log message is not correct if we end up here
2788 * after an attempted attach on a diskless node.
2789 * We just refuse to attach -- well, we drop the "connection"
2790 * to that disk, in a way... */
3a11a487 2791 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2792 drbd_khelper(mdev, "split-brain");
2793 return C_MASK;
2794 }
2795
2796 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2797 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2798 return C_MASK;
2799 }
2800
2801 if (hg < 0 && /* by intention we do not use mydisk here. */
2802 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
89e58e75 2803 switch (mdev->tconn->net_conf->rr_conflict) {
b411b363
PR
2804 case ASB_CALL_HELPER:
2805 drbd_khelper(mdev, "pri-lost");
2806 /* fall through */
2807 case ASB_DISCONNECT:
2808 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2809 return C_MASK;
2810 case ASB_VIOLENTLY:
2811 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2812 "assumption\n");
2813 }
2814 }
2815
8169e41b 2816 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
cf14c2e9
PR
2817 if (hg == 0)
2818 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2819 else
2820 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2821 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2822 abs(hg) >= 2 ? "full" : "bit-map based");
2823 return C_MASK;
2824 }
2825
b411b363
PR
2826 if (abs(hg) >= 2) {
2827 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2828 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2829 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2830 return C_MASK;
2831 }
2832
2833 if (hg > 0) { /* become sync source. */
2834 rv = C_WF_BITMAP_S;
2835 } else if (hg < 0) { /* become sync target */
2836 rv = C_WF_BITMAP_T;
2837 } else {
2838 rv = C_CONNECTED;
2839 if (drbd_bm_total_weight(mdev)) {
2840 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2841 drbd_bm_total_weight(mdev));
2842 }
2843 }
2844
2845 return rv;
2846}
2847
2848/* returns 1 if invalid */
2849static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2850{
2851 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2852 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2853 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2854 return 0;
2855
2856 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2857 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2858 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2859 return 1;
2860
2861 /* everything else is valid if they are equal on both sides. */
2862 if (peer == self)
2863 return 0;
2864
2865 /* everything else is invalid. */
2866 return 1;
2867}
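/*
 * Illustrative combinations (not part of the original source): the mirrored
 * pair peer == ASB_DISCARD_REMOTE with self == ASB_DISCARD_LOCAL is valid,
 * as is any policy configured identically on both sides, e.g. both
 * ASB_DISCONNECT.  Both sides using ASB_DISCARD_REMOTE, or one side using
 * ASB_DISCARD_LOCAL while the other uses anything but ASB_DISCARD_REMOTE,
 * is rejected as invalid.
 */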
2868
7204624c 2869static int receive_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd,
d8763023 2870 unsigned int data_size)
b411b363 2871{
7204624c 2872 struct p_protocol *p = &tconn->data.rbuf.protocol;
b411b363 2873 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2874 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2875 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2876
b411b363
PR
2877 p_proto = be32_to_cpu(p->protocol);
2878 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2879 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2880 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2881 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2882 cf = be32_to_cpu(p->conn_flags);
2883 p_want_lose = cf & CF_WANT_LOSE;
2884
7204624c 2885 clear_bit(CONN_DRY_RUN, &tconn->flags);
cf14c2e9
PR
2886
2887 if (cf & CF_DRY_RUN)
7204624c 2888 set_bit(CONN_DRY_RUN, &tconn->flags);
b411b363 2889
7204624c
PR
2890 if (p_proto != tconn->net_conf->wire_protocol) {
2891 conn_err(tconn, "incompatible communication protocols\n");
b411b363
PR
2892 goto disconnect;
2893 }
2894
7204624c
PR
2895 if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2896 conn_err(tconn, "incompatible after-sb-0pri settings\n");
b411b363
PR
2897 goto disconnect;
2898 }
2899
7204624c
PR
2900 if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2901 conn_err(tconn, "incompatible after-sb-1pri settings\n");
b411b363
PR
2902 goto disconnect;
2903 }
2904
7204624c
PR
2905 if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2906 conn_err(tconn, "incompatible after-sb-2pri settings\n");
b411b363
PR
2907 goto disconnect;
2908 }
2909
7204624c
PR
2910 if (p_want_lose && tconn->net_conf->want_lose) {
2911 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
b411b363
PR
2912 goto disconnect;
2913 }
2914
7204624c
PR
2915 if (p_two_primaries != tconn->net_conf->two_primaries) {
2916 conn_err(tconn, "incompatible setting of the two-primaries options\n");
b411b363
PR
2917 goto disconnect;
2918 }
2919
7204624c
PR
2920 if (tconn->agreed_pro_version >= 87) {
2921 unsigned char *my_alg = tconn->net_conf->integrity_alg;
b411b363 2922
7204624c 2923 if (drbd_recv(tconn, p_integrity_alg, data_size) != data_size)
81e84650 2924 return false;
b411b363
PR
2925
2926 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2927 if (strcmp(p_integrity_alg, my_alg)) {
7204624c 2928 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
b411b363
PR
2929 goto disconnect;
2930 }
7204624c 2931 conn_info(tconn, "data-integrity-alg: %s\n",
b411b363
PR
2932 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2933 }
2934
81e84650 2935 return true;
b411b363
PR
2936
2937disconnect:
7204624c 2938 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 2939 return false;
b411b363
PR
2940}
2941
2942/* helper function
2943 * input: alg name, feature name
2944 * return: NULL (alg name was "")
2945 * ERR_PTR(error) if something goes wrong
2946 * or the crypto hash ptr, if it worked out ok. */
2947struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2948 const char *alg, const char *name)
2949{
2950 struct crypto_hash *tfm;
2951
2952 if (!alg[0])
2953 return NULL;
2954
2955 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2956 if (IS_ERR(tfm)) {
2957 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2958 alg, name, PTR_ERR(tfm));
2959 return tfm;
2960 }
2961 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2962 crypto_free_hash(tfm);
2963 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2964 return ERR_PTR(-EINVAL);
2965 }
2966 return tfm;
2967}
2968
d8763023
AG
2969static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
2970 unsigned int packet_size)
b411b363 2971{
81e84650 2972 int ok = true;
e42325a5 2973 struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
b411b363
PR
2974 unsigned int header_size, data_size, exp_max_sz;
2975 struct crypto_hash *verify_tfm = NULL;
2976 struct crypto_hash *csums_tfm = NULL;
31890f4a 2977 const int apv = mdev->tconn->agreed_pro_version;
778f271d
PR
2978 int *rs_plan_s = NULL;
2979 int fifo_size = 0;
b411b363
PR
2980
2981 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2982 : apv == 88 ? sizeof(struct p_rs_param)
2983 + SHARED_SECRET_MAX
8e26f9cc
PR
2984 : apv <= 94 ? sizeof(struct p_rs_param_89)
2985 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 2986
02918be2 2987 if (packet_size > exp_max_sz) {
b411b363 2988 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
02918be2 2989 packet_size, exp_max_sz);
81e84650 2990 return false;
b411b363
PR
2991 }
2992
2993 if (apv <= 88) {
257d0af6 2994 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
02918be2 2995 data_size = packet_size - header_size;
8e26f9cc 2996 } else if (apv <= 94) {
257d0af6 2997 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
02918be2 2998 data_size = packet_size - header_size;
b411b363 2999 D_ASSERT(data_size == 0);
8e26f9cc 3000 } else {
257d0af6 3001 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
02918be2 3002 data_size = packet_size - header_size;
b411b363
PR
3003 D_ASSERT(data_size == 0);
3004 }
3005
3006 /* initialize verify_alg and csums_alg */
3007 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3008
de0ff338 3009 if (drbd_recv(mdev->tconn, &p->head.payload, header_size) != header_size)
81e84650 3010 return false;
b411b363 3011
f399002e
LE
3012 if (get_ldev(mdev)) {
3013 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3014 put_ldev(mdev);
3015 }
b411b363
PR
3016
3017 if (apv >= 88) {
3018 if (apv == 88) {
3019 if (data_size > SHARED_SECRET_MAX) {
3020 dev_err(DEV, "verify-alg too long, "
3021 "peer wants %u, accepting only %u byte\n",
3022 data_size, SHARED_SECRET_MAX);
81e84650 3023 return false;
b411b363
PR
3024 }
3025
de0ff338 3026 if (drbd_recv(mdev->tconn, p->verify_alg, data_size) != data_size)
81e84650 3027 return false;
b411b363
PR
3028
3029 /* we expect NUL terminated string */
3030 /* but just in case someone tries to be evil */
3031 D_ASSERT(p->verify_alg[data_size-1] == 0);
3032 p->verify_alg[data_size-1] = 0;
3033
3034 } else /* apv >= 89 */ {
3035 /* we still expect NUL terminated strings */
3036 /* but just in case someone tries to be evil */
3037 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3038 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3039 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3040 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3041 }
3042
f399002e 3043 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
b411b363
PR
3044 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3045 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
f399002e 3046 mdev->tconn->net_conf->verify_alg, p->verify_alg);
b411b363
PR
3047 goto disconnect;
3048 }
3049 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3050 p->verify_alg, "verify-alg");
3051 if (IS_ERR(verify_tfm)) {
3052 verify_tfm = NULL;
3053 goto disconnect;
3054 }
3055 }
3056
f399002e 3057 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
b411b363
PR
3058 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3059 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
f399002e 3060 mdev->tconn->net_conf->csums_alg, p->csums_alg);
b411b363
PR
3061 goto disconnect;
3062 }
3063 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3064 p->csums_alg, "csums-alg");
3065 if (IS_ERR(csums_tfm)) {
3066 csums_tfm = NULL;
3067 goto disconnect;
3068 }
3069 }
3070
f399002e
LE
3071 if (apv > 94 && get_ldev(mdev)) {
3072 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3073 mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3074 mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3075 mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3076 mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d 3077
f399002e 3078 fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
778f271d
PR
3079 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3080 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3081 if (!rs_plan_s) {
3082 dev_err(DEV, "kmalloc of fifo_buffer failed");
f399002e 3083 put_ldev(mdev);
778f271d
PR
3084 goto disconnect;
3085 }
3086 }
f399002e 3087 put_ldev(mdev);
8e26f9cc 3088 }
b411b363
PR
3089
3090 spin_lock(&mdev->peer_seq_lock);
3091 /* lock against drbd_nl_syncer_conf() */
3092 if (verify_tfm) {
f399002e
LE
3093 strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3094 mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3095 crypto_free_hash(mdev->tconn->verify_tfm);
3096 mdev->tconn->verify_tfm = verify_tfm;
b411b363
PR
3097 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3098 }
3099 if (csums_tfm) {
f399002e
LE
3100 strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3101 mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3102 crypto_free_hash(mdev->tconn->csums_tfm);
3103 mdev->tconn->csums_tfm = csums_tfm;
b411b363
PR
3104 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3105 }
778f271d
PR
3106 if (fifo_size != mdev->rs_plan_s.size) {
3107 kfree(mdev->rs_plan_s.values);
3108 mdev->rs_plan_s.values = rs_plan_s;
3109 mdev->rs_plan_s.size = fifo_size;
3110 mdev->rs_planed = 0;
3111 }
b411b363
PR
3112 spin_unlock(&mdev->peer_seq_lock);
3113 }
3114
3115 return ok;
3116disconnect:
3117 /* just for completeness: actually not needed,
3118 * as this is not reached if csums_tfm was ok. */
3119 crypto_free_hash(csums_tfm);
3120 /* but free the verify_tfm again, if csums_tfm did not work out */
3121 crypto_free_hash(verify_tfm);
38fa9988 3122 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3123 return false;
b411b363
PR
3124}
3125
b411b363
PR
3126/* warn if the arguments differ by more than 12.5% */
3127static void warn_if_differ_considerably(struct drbd_conf *mdev,
3128 const char *s, sector_t a, sector_t b)
3129{
3130 sector_t d;
3131 if (a == 0 || b == 0)
3132 return;
3133 d = (a > b) ? (a - b) : (b - a);
3134 if (d > (a>>3) || d > (b>>3))
3135 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3136 (unsigned long long)a, (unsigned long long)b);
3137}
3138
d8763023
AG
3139static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
3140 unsigned int data_size)
b411b363 3141{
e42325a5 3142 struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
b411b363 3143 enum determine_dev_size dd = unchanged;
b411b363
PR
3144 sector_t p_size, p_usize, my_usize;
3145 int ldsc = 0; /* local disk size changed */
e89b591c 3146 enum dds_flags ddsf;
b411b363 3147
b411b363
PR
3148 p_size = be64_to_cpu(p->d_size);
3149 p_usize = be64_to_cpu(p->u_size);
3150
b411b363
PR
3151 /* just store the peer's disk size for now.
3152 * we still need to figure out whether we accept that. */
3153 mdev->p_size = p_size;
3154
b411b363
PR
3155 if (get_ldev(mdev)) {
3156 warn_if_differ_considerably(mdev, "lower level device sizes",
3157 p_size, drbd_get_max_capacity(mdev->ldev));
3158 warn_if_differ_considerably(mdev, "user requested size",
3159 p_usize, mdev->ldev->dc.disk_size);
3160
3161 /* if this is the first connect, or an otherwise expected
3162 * param exchange, choose the minimum */
3163 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3164 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3165 p_usize);
3166
3167 my_usize = mdev->ldev->dc.disk_size;
3168
3169 if (mdev->ldev->dc.disk_size != p_usize) {
3170 mdev->ldev->dc.disk_size = p_usize;
3171 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3172 (unsigned long)mdev->ldev->dc.disk_size);
3173 }
3174
3175 /* Never shrink a device with usable data during connect.
3176 But allow online shrinking if we are connected. */
a393db6f 3177 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
3178 drbd_get_capacity(mdev->this_bdev) &&
3179 mdev->state.disk >= D_OUTDATED &&
3180 mdev->state.conn < C_CONNECTED) {
3181 dev_err(DEV, "The peer's disk size is too small!\n");
38fa9988 3182 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
3183 mdev->ldev->dc.disk_size = my_usize;
3184 put_ldev(mdev);
81e84650 3185 return false;
b411b363
PR
3186 }
3187 put_ldev(mdev);
3188 }
b411b363 3189
e89b591c 3190 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3191 if (get_ldev(mdev)) {
24c4830c 3192 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3193 put_ldev(mdev);
3194 if (dd == dev_size_error)
81e84650 3195 return false;
b411b363
PR
3196 drbd_md_sync(mdev);
3197 } else {
3198 /* I am diskless, need to accept the peer's size. */
3199 drbd_set_my_capacity(mdev, p_size);
3200 }
3201
99432fcc
PR
3202 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3203 drbd_reconsider_max_bio_size(mdev);
3204
b411b363
PR
3205 if (get_ldev(mdev)) {
3206 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3207 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3208 ldsc = 1;
3209 }
3210
b411b363
PR
3211 put_ldev(mdev);
3212 }
3213
3214 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3215 if (be64_to_cpu(p->c_size) !=
3216 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3217 /* we have different sizes, probably peer
3218 * needs to know my new size... */
e89b591c 3219 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3220 }
3221 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3222 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3223 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3224 mdev->state.disk >= D_INCONSISTENT) {
3225 if (ddsf & DDSF_NO_RESYNC)
3226 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3227 else
3228 resync_after_online_grow(mdev);
3229 } else
b411b363
PR
3230 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3231 }
3232 }
3233
81e84650 3234 return true;
b411b363
PR
3235}
3236
d8763023
AG
3237static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3238 unsigned int data_size)
b411b363 3239{
e42325a5 3240 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
b411b363 3241 u64 *p_uuid;
62b0da3a 3242 int i, updated_uuids = 0;
b411b363 3243
b411b363
PR
3244 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3245
3246 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3247 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3248
3249 kfree(mdev->p_uuid);
3250 mdev->p_uuid = p_uuid;
3251
3252 if (mdev->state.conn < C_CONNECTED &&
3253 mdev->state.disk < D_INCONSISTENT &&
3254 mdev->state.role == R_PRIMARY &&
3255 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3256 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3257 (unsigned long long)mdev->ed_uuid);
38fa9988 3258 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3259 return false;
b411b363
PR
3260 }
3261
3262 if (get_ldev(mdev)) {
3263 int skip_initial_sync =
3264 mdev->state.conn == C_CONNECTED &&
31890f4a 3265 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3266 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3267 (p_uuid[UI_FLAGS] & 8);
3268 if (skip_initial_sync) {
3269 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3270 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3271 "clear_n_write from receive_uuids",
3272 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3273 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3274 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3275 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3276 CS_VERBOSE, NULL);
3277 drbd_md_sync(mdev);
62b0da3a 3278 updated_uuids = 1;
b411b363
PR
3279 }
3280 put_ldev(mdev);
18a50fa2
PR
3281 } else if (mdev->state.disk < D_INCONSISTENT &&
3282 mdev->state.role == R_PRIMARY) {
3283 /* I am a diskless primary, the peer just created a new current UUID
3284 for me. */
62b0da3a 3285 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3286 }
3287
3288 /* Before we test for the disk state, we should wait until any ongoing
3289 cluster-wide state change has finished. That is important if
3290 we are primary and are detaching from our disk. We need to see the
3291 new disk state... */
8410da8f
PR
3292 mutex_lock(mdev->state_mutex);
3293 mutex_unlock(mdev->state_mutex);
b411b363 3294 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3295 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3296
3297 if (updated_uuids)
3298 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3299
81e84650 3300 return true;
b411b363
PR
3301}
3302
3303/**
3304 * convert_state() - Converts the peer's view of the cluster state to our point of view
3305 * @ps: The state as seen by the peer.
3306 */
3307static union drbd_state convert_state(union drbd_state ps)
3308{
3309 union drbd_state ms;
3310
3311 static enum drbd_conns c_tab[] = {
3312 [C_CONNECTED] = C_CONNECTED,
3313
3314 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3315 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3316 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3317 [C_VERIFY_S] = C_VERIFY_T,
3318 [C_MASK] = C_MASK,
3319 };
3320
3321 ms.i = ps.i;
3322
3323 ms.conn = c_tab[ps.conn];
3324 ms.peer = ps.role;
3325 ms.role = ps.peer;
3326 ms.pdsk = ps.disk;
3327 ms.disk = ps.pdsk;
3328 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3329
3330 return ms;
3331}
3332
d8763023
AG
3333static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3334 unsigned int data_size)
b411b363 3335{
e42325a5 3336 struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
b411b363 3337 union drbd_state mask, val;
bf885f8a 3338 enum drbd_state_rv rv;
b411b363 3339
b411b363
PR
3340 mask.i = be32_to_cpu(p->mask);
3341 val.i = be32_to_cpu(p->val);
3342
25703f83 3343 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
8410da8f 3344 mutex_is_locked(mdev->state_mutex)) {
b411b363 3345 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
81e84650 3346 return true;
b411b363
PR
3347 }
3348
3349 mask = convert_state(mask);
3350 val = convert_state(val);
3351
dfafcc8a
PR
3352 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3353 drbd_send_sr_reply(mdev, rv);
b411b363 3354
b411b363
PR
3355 drbd_md_sync(mdev);
3356
81e84650 3357 return true;
b411b363
PR
3358}
3359
dfafcc8a
PR
3360static int receive_req_conn_state(struct drbd_tconn *tconn, enum drbd_packet cmd,
3361 unsigned int data_size)
3362{
3363 struct p_req_state *p = &tconn->data.rbuf.req_state;
3364 union drbd_state mask, val;
3365 enum drbd_state_rv rv;
3366
3367 mask.i = be32_to_cpu(p->mask);
3368 val.i = be32_to_cpu(p->val);
3369
3370 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3371 mutex_is_locked(&tconn->cstate_mutex)) {
3372 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3373 return true;
3374 }
3375
3376 mask = convert_state(mask);
3377 val = convert_state(val);
3378
3379 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY);
3380 conn_send_sr_reply(tconn, rv);
3381
3382 return true;
3383}
3384
d8763023
AG
3385static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3386 unsigned int data_size)
b411b363 3387{
e42325a5 3388 struct p_state *p = &mdev->tconn->data.rbuf.state;
4ac4aada 3389 union drbd_state os, ns, peer_state;
b411b363 3390 enum drbd_disk_state real_peer_disk;
65d922c3 3391 enum chg_state_flags cs_flags;
b411b363
PR
3392 int rv;
3393
b411b363
PR
3394 peer_state.i = be32_to_cpu(p->state);
3395
3396 real_peer_disk = peer_state.disk;
3397 if (peer_state.disk == D_NEGOTIATING) {
3398 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3399 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3400 }
3401
87eeee41 3402 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 3403 retry:
4ac4aada 3404 os = ns = mdev->state;
87eeee41 3405 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 3406
e9ef7bb6
LE
3407 /* peer says his disk is uptodate, while we think it is inconsistent,
3408 * and this happens while we think we have a sync going on. */
3409 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3410 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3411 /* If we are (becoming) SyncSource, but peer is still in sync
3412 * preparation, ignore its uptodate-ness to avoid flapping, it
3413 * will change to inconsistent once the peer reaches active
3414 * syncing states.
3415 * It may have changed syncer-paused flags, however, so we
3416 * cannot ignore this completely. */
3417 if (peer_state.conn > C_CONNECTED &&
3418 peer_state.conn < C_SYNC_SOURCE)
3419 real_peer_disk = D_INCONSISTENT;
3420
3421 /* if peer_state changes to connected at the same time,
3422 * it explicitly notifies us that it finished resync.
3423 * Maybe we should finish it up, too? */
3424 else if (os.conn >= C_SYNC_SOURCE &&
3425 peer_state.conn == C_CONNECTED) {
3426 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3427 drbd_resync_finished(mdev);
81e84650 3428 return true;
e9ef7bb6
LE
3429 }
3430 }
3431
3432 /* peer says his disk is inconsistent, while we think it is uptodate,
3433 * and this happens while the peer still thinks we have a sync going on,
3434 * but we think we are already done with the sync.
3435 * We ignore this to avoid flapping pdsk.
3436 * This should not happen, if the peer is a recent version of drbd. */
3437 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3438 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3439 real_peer_disk = D_UP_TO_DATE;
3440
4ac4aada
LE
3441 if (ns.conn == C_WF_REPORT_PARAMS)
3442 ns.conn = C_CONNECTED;
b411b363 3443
67531718
PR
3444 if (peer_state.conn == C_AHEAD)
3445 ns.conn = C_BEHIND;
3446
b411b363
PR
3447 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3448 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3449 int cr; /* consider resync */
3450
3451 /* if we established a new connection */
4ac4aada 3452 cr = (os.conn < C_CONNECTED);
b411b363
PR
3453 /* if we had an established connection
3454 * and one of the nodes newly attaches a disk */
4ac4aada 3455 cr |= (os.conn == C_CONNECTED &&
b411b363 3456 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3457 os.disk == D_NEGOTIATING));
b411b363
PR
3458 /* if we have both been inconsistent, and the peer has been
3459 * forced to be UpToDate with --overwrite-data */
3460 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3461 /* if we had been plain connected, and the admin requested to
3462 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3463 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3464 (peer_state.conn >= C_STARTING_SYNC_S &&
3465 peer_state.conn <= C_WF_BITMAP_T));
3466
3467 if (cr)
4ac4aada 3468 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3469
3470 put_ldev(mdev);
4ac4aada
LE
3471 if (ns.conn == C_MASK) {
3472 ns.conn = C_CONNECTED;
b411b363 3473 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3474 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3475 } else if (peer_state.disk == D_NEGOTIATING) {
3476 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3477 peer_state.disk = D_DISKLESS;
580b9767 3478 real_peer_disk = D_DISKLESS;
b411b363 3479 } else {
8169e41b 3480 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
81e84650 3481 return false;
4ac4aada 3482 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
38fa9988 3483 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3484 return false;
b411b363
PR
3485 }
3486 }
3487 }
3488
87eeee41 3489 spin_lock_irq(&mdev->tconn->req_lock);
4ac4aada 3490 if (mdev->state.i != os.i)
b411b363
PR
3491 goto retry;
3492 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3493 ns.peer = peer_state.role;
3494 ns.pdsk = real_peer_disk;
3495 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3496 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3497 ns.disk = mdev->new_state_tmp.disk;
4ac4aada
LE
3498 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3499 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50 3500 test_bit(NEW_CUR_UUID, &mdev->flags)) {
8554df1c 3501 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3502 	   for temporary network outages! */
87eeee41 3503 spin_unlock_irq(&mdev->tconn->req_lock);
481c6f50 3504 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
2f5cdd0b 3505 tl_clear(mdev->tconn);
481c6f50
PR
3506 drbd_uuid_new_current(mdev);
3507 clear_bit(NEW_CUR_UUID, &mdev->flags);
38fa9988 3508 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
81e84650 3509 return false;
481c6f50 3510 }
65d922c3 3511 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
b411b363 3512 ns = mdev->state;
87eeee41 3513 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3514
3515 if (rv < SS_SUCCESS) {
38fa9988 3516 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3517 return false;
b411b363
PR
3518 }
3519
4ac4aada
LE
3520 if (os.conn > C_WF_REPORT_PARAMS) {
3521 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3522 peer_state.disk != D_NEGOTIATING ) {
3523 /* we want resync, peer has not yet decided to sync... */
3524 /* Nowadays only used when forcing a node into primary role and
3525 setting its disk to UpToDate with that */
3526 drbd_send_uuids(mdev);
3527 drbd_send_state(mdev);
3528 }
3529 }
3530
89e58e75 3531 mdev->tconn->net_conf->want_lose = 0;
b411b363
PR
3532
3533 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3534
81e84650 3535 return true;
b411b363
PR
3536}
3537
d8763023
AG
3538static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
3539 unsigned int data_size)
b411b363 3540{
e42325a5 3541 struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;
b411b363
PR
3542
3543 wait_event(mdev->misc_wait,
3544 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3545 mdev->state.conn == C_BEHIND ||
b411b363
PR
3546 mdev->state.conn < C_CONNECTED ||
3547 mdev->state.disk < D_NEGOTIATING);
3548
3549 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3550
b411b363
PR
3551 /* Here the _drbd_uuid_ functions are right, current should
3552 _not_ be rotated into the history */
3553 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3554 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3555 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3556
62b0da3a 3557 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3558 drbd_start_resync(mdev, C_SYNC_TARGET);
3559
3560 put_ldev(mdev);
3561 } else
3562 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3563
81e84650 3564 return true;
b411b363
PR
3565}
3566
2c46407d
AG
3567/**
3568 * receive_bitmap_plain
3569 *
3570 * Return 0 when done, 1 when another iteration is needed, and a negative error
3571 * code upon failure.
3572 */
3573static int
02918be2
PR
3574receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3575 unsigned long *buffer, struct bm_xfer_ctx *c)
b411b363
PR
3576{
3577 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3578 unsigned want = num_words * sizeof(long);
2c46407d 3579 int err;
b411b363 3580
02918be2
PR
3581 if (want != data_size) {
3582 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
2c46407d 3583 return -EIO;
b411b363
PR
3584 }
3585 if (want == 0)
2c46407d 3586 return 0;
de0ff338 3587 err = drbd_recv(mdev->tconn, buffer, want);
2c46407d
AG
3588 if (err != want) {
3589 if (err >= 0)
3590 err = -EIO;
3591 return err;
3592 }
b411b363
PR
3593
3594 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3595
3596 c->word_offset += num_words;
3597 c->bit_offset = c->word_offset * BITS_PER_LONG;
3598 if (c->bit_offset > c->bm_bits)
3599 c->bit_offset = c->bm_bits;
3600
2c46407d 3601 return 1;
b411b363
PR
3602}
3603
2c46407d
AG
3604/**
3605 * recv_bm_rle_bits
3606 *
3607 * Return 0 when done, 1 when another iteration is needed, and a negative error
3608 * code upon failure.
3609 */
3610static int
b411b363
PR
3611recv_bm_rle_bits(struct drbd_conf *mdev,
3612 struct p_compressed_bm *p,
c6d25cfe
PR
3613 struct bm_xfer_ctx *c,
3614 unsigned int len)
b411b363
PR
3615{
3616 struct bitstream bs;
3617 u64 look_ahead;
3618 u64 rl;
3619 u64 tmp;
3620 unsigned long s = c->bit_offset;
3621 unsigned long e;
b411b363
PR
3622 int toggle = DCBP_get_start(p);
3623 int have;
3624 int bits;
3625
3626 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3627
3628 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3629 if (bits < 0)
2c46407d 3630 return -EIO;
b411b363
PR
3631
3632 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3633 bits = vli_decode_bits(&rl, look_ahead);
3634 if (bits <= 0)
2c46407d 3635 return -EIO;
b411b363
PR
3636
3637 if (toggle) {
3638 e = s + rl -1;
3639 if (e >= c->bm_bits) {
3640 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3641 return -EIO;
b411b363
PR
3642 }
3643 _drbd_bm_set_bits(mdev, s, e);
3644 }
3645
3646 if (have < bits) {
3647 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3648 have, bits, look_ahead,
3649 (unsigned int)(bs.cur.b - p->code),
3650 (unsigned int)bs.buf_len);
2c46407d 3651 return -EIO;
b411b363
PR
3652 }
3653 look_ahead >>= bits;
3654 have -= bits;
3655
3656 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3657 if (bits < 0)
2c46407d 3658 return -EIO;
b411b363
PR
3659 look_ahead |= tmp << have;
3660 have += bits;
3661 }
3662
3663 c->bit_offset = s;
3664 bm_xfer_ctx_bit_to_word_offset(c);
3665
2c46407d 3666 return (s != c->bm_bits);
b411b363
PR
3667}
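The decoding loop above alternates between runs of untouched and set bits: toggle selects whether the current run length rl describes bits to set, and s advances by rl either way. A hedged sketch of that idea, assuming the run lengths have already been extracted from the VLI bitstream into plain integers:

#include <stdio.h>

/* Apply alternating clear/set runs to a small bitmap, starting with the
 * polarity given by `toggle`; only the "set" runs modify the bitmap. */
static void demo_rle_apply(unsigned char *bitmap, unsigned nbits,
                           const unsigned *runs, unsigned nruns, int toggle)
{
        unsigned s = 0, i, j;

        for (i = 0; i < nruns && s < nbits; i++, toggle = !toggle) {
                if (toggle)
                        for (j = s; j < s + runs[i] && j < nbits; j++)
                                bitmap[j / 8] |= 1u << (j % 8);
                s += runs[i];
        }
}

int main(void)
{
        unsigned char bm[2] = { 0, 0 };
        unsigned runs[] = { 3, 5, 4 };  /* 3 clear, 5 set, 4 clear */

        demo_rle_apply(bm, 16, runs, 3, 0);
        printf("%02x %02x\n", bm[0], bm[1]);    /* bits 3..7 set -> "f8 00" */
        return 0;
}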
3668
2c46407d
AG
3669/**
3670 * decode_bitmap_c
3671 *
3672 * Return 0 when done, 1 when another iteration is needed, and a negative error
3673 * code upon failure.
3674 */
3675static int
b411b363
PR
3676decode_bitmap_c(struct drbd_conf *mdev,
3677 struct p_compressed_bm *p,
c6d25cfe
PR
3678 struct bm_xfer_ctx *c,
3679 unsigned int len)
b411b363
PR
3680{
3681 if (DCBP_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3682 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3683
3684 /* other variants had been implemented for evaluation,
3685 * but have been dropped as this one turned out to be "best"
3686 * during all our tests. */
3687
3688 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
38fa9988 3689 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
2c46407d 3690 return -EIO;
b411b363
PR
3691}
3692
3693void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3694 const char *direction, struct bm_xfer_ctx *c)
3695{
3696 /* what would it take to transfer it "plaintext" */
c012949a 3697 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3698 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3699 + c->bm_words * sizeof(long);
3700 unsigned total = c->bytes[0] + c->bytes[1];
3701 unsigned r;
3702
3703 /* total can not be zero. but just in case: */
3704 if (total == 0)
3705 return;
3706
3707 /* don't report if not compressed */
3708 if (total >= plain)
3709 return;
3710
3711 /* total < plain. check for overflow, still */
3712 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3713 : (1000 * total / plain);
3714
3715 if (r > 1000)
3716 r = 1000;
3717
3718 r = 1000 - r;
3719 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3720 "total %u; compression: %u.%u%%\n",
3721 direction,
3722 c->bytes[1], c->packets[1],
3723 c->bytes[0], c->packets[0],
3724 total, r/10, r % 10);
3725}
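Worked example of the per-mille arithmetic in INFO_bm_xfer_stats(): with plain = 1,000,000 bytes and total = 42,000 bytes actually transferred, r = 1000 * total / plain = 42, and the reported figure is 1000 - 42 = 958, printed as "compression: 95.8%". The same computation as a standalone snippet with example numbers:

#include <limits.h>
#include <stdio.h>

int main(void)
{
        unsigned plain = 1000000, total = 42000, r;

        /* guard against overflow of 1000 * total, as the driver does */
        r = (total > UINT_MAX / 1000) ? (total / (plain / 1000))
                                      : (1000 * total / plain);
        if (r > 1000)
                r = 1000;
        r = 1000 - r;
        printf("compression: %u.%u%%\n", r / 10, r % 10);       /* 95.8% */
        return 0;
}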
3726
 3727/* Since we are processing the bitfield from lower addresses to higher,
 3728   it does not matter whether we process it in 32 bit chunks or 64 bit
 3729   chunks, as long as it is little endian. (Understand it as a byte stream,
 3730   beginning with the lowest byte...) If we used big endian
 3731   we would need to process it from the highest address to the lowest,
 3732   in order to be agnostic to the 32 vs 64 bit issue.
3733
3734 returns 0 on failure, 1 if we successfully received it. */
d8763023
AG
3735static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
3736 unsigned int data_size)
b411b363
PR
3737{
3738 struct bm_xfer_ctx c;
3739 void *buffer;
2c46407d 3740 int err;
81e84650 3741 int ok = false;
257d0af6 3742 struct p_header *h = &mdev->tconn->data.rbuf.header;
77351055 3743 struct packet_info pi;
b411b363 3744
20ceb2b2
LE
3745 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3746 /* you are supposed to send additional out-of-sync information
3747 * if you actually set bits during this phase */
b411b363
PR
3748
3749 /* maybe we should use some per thread scratch page,
3750 * and allocate that during initial device creation? */
3751 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3752 if (!buffer) {
3753 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3754 goto out;
3755 }
3756
3757 c = (struct bm_xfer_ctx) {
3758 .bm_bits = drbd_bm_bits(mdev),
3759 .bm_words = drbd_bm_words(mdev),
3760 };
3761
2c46407d 3762 for(;;) {
02918be2 3763 if (cmd == P_BITMAP) {
2c46407d 3764 err = receive_bitmap_plain(mdev, data_size, buffer, &c);
02918be2 3765 } else if (cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3766 /* MAYBE: sanity check that we speak proto >= 90,
3767 * and the feature is enabled! */
3768 struct p_compressed_bm *p;
3769
02918be2 3770 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
b411b363
PR
3771 dev_err(DEV, "ReportCBitmap packet too large\n");
3772 goto out;
3773 }
 3774 			/* use the page buffer */
3775 p = buffer;
3776 memcpy(p, h, sizeof(*h));
de0ff338 3777 if (drbd_recv(mdev->tconn, p->head.payload, data_size) != data_size)
b411b363 3778 goto out;
004352fa
LE
3779 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3780 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
78fcbdae 3781 goto out;
b411b363 3782 }
c6d25cfe 3783 err = decode_bitmap_c(mdev, p, &c, data_size);
b411b363 3784 } else {
02918be2 3785 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
b411b363
PR
3786 goto out;
3787 }
3788
02918be2 3789 c.packets[cmd == P_BITMAP]++;
257d0af6 3790 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;
b411b363 3791
2c46407d
AG
3792 if (err <= 0) {
3793 if (err < 0)
3794 goto out;
b411b363 3795 break;
2c46407d 3796 }
69bc7bc3 3797 if (drbd_recv_header(mdev->tconn, &pi))
b411b363 3798 goto out;
77351055
PR
3799 cmd = pi.cmd;
3800 data_size = pi.size;
2c46407d 3801 }
b411b363
PR
3802
3803 INFO_bm_xfer_stats(mdev, "receive", &c);
3804
3805 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
3806 enum drbd_state_rv rv;
3807
b411b363
PR
3808 ok = !drbd_send_bitmap(mdev);
3809 if (!ok)
3810 goto out;
3811 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
3812 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3813 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
3814 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3815 /* admin may have requested C_DISCONNECTING,
3816 * other threads may have noticed network errors */
3817 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3818 drbd_conn_str(mdev->state.conn));
3819 }
3820
81e84650 3821 ok = true;
b411b363 3822 out:
20ceb2b2 3823 drbd_bm_unlock(mdev);
b411b363
PR
3824 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3825 drbd_start_resync(mdev, C_SYNC_SOURCE);
3826 free_page((unsigned long) buffer);
3827 return ok;
3828}
3829
2de876ef 3830static int _tconn_receive_skip(struct drbd_tconn *tconn, unsigned int data_size)
b411b363
PR
3831{
3832 /* TODO zero copy sink :) */
3833 static char sink[128];
3834 int size, want, r;
3835
02918be2 3836 size = data_size;
b411b363
PR
3837 while (size > 0) {
3838 want = min_t(int, size, sizeof(sink));
2de876ef
PR
3839 r = drbd_recv(tconn, sink, want);
3840 if (r <= 0)
841ce241 3841 break;
b411b363
PR
3842 size -= r;
3843 }
3844 return size == 0;
3845}
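_tconn_receive_skip() drains an unwanted payload in fixed-size chunks instead of allocating a buffer for the whole packet. A userspace analogue of the same pattern using plain read(); drain_fd() is an illustrative name, not a driver function:

#include <unistd.h>

/* Discard `size` bytes from fd in small chunks; return 1 if all arrived. */
static int drain_fd(int fd, int size)
{
        char sink[128];
        int want, r;

        while (size > 0) {
                want = size < (int)sizeof(sink) ? size : (int)sizeof(sink);
                r = read(fd, sink, want);
                if (r <= 0)
                        break;
                size -= r;
        }
        return size == 0;
}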
3846
2de876ef
PR
3847static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
3848 unsigned int data_size)
3849{
3850 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3851 cmd, data_size);
3852
3853 return _tconn_receive_skip(mdev->tconn, data_size);
3854}
3855
3856static int tconn_receive_skip(struct drbd_tconn *tconn, enum drbd_packet cmd, unsigned int data_size)
3857{
 3858 	conn_warn(tconn, "skipping packet for non-existent volume type %d, l: %d!\n",
3859 cmd, data_size);
3860
3861 return _tconn_receive_skip(tconn, data_size);
3862}
3863
d8763023
AG
3864static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
3865 unsigned int data_size)
0ced55a3 3866{
e7f52dfb
LE
3867 /* Make sure we've acked all the TCP data associated
3868 * with the data requests being unplugged */
e42325a5 3869 drbd_tcp_quickack(mdev->tconn->data.socket);
0ced55a3 3870
81e84650 3871 return true;
0ced55a3
PR
3872}
3873
d8763023
AG
3874static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
3875 unsigned int data_size)
73a01a18 3876{
e42325a5 3877 struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;
73a01a18 3878
f735e363
LE
3879 switch (mdev->state.conn) {
3880 case C_WF_SYNC_UUID:
3881 case C_WF_BITMAP_T:
3882 case C_BEHIND:
3883 break;
3884 default:
3885 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3886 drbd_conn_str(mdev->state.conn));
3887 }
3888
73a01a18
PR
3889 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3890
81e84650 3891 return true;
73a01a18
PR
3892}
3893
02918be2
PR
3894struct data_cmd {
3895 int expect_payload;
3896 size_t pkt_size;
a4fbda8e 3897 enum mdev_or_conn fa_type; /* first argument's type */
d9ae84e7
PR
3898 union {
3899 int (*mdev_fn)(struct drbd_conf *, enum drbd_packet cmd,
3900 unsigned int to_receive);
3901 int (*conn_fn)(struct drbd_tconn *, enum drbd_packet cmd,
3902 unsigned int to_receive);
3903 };
02918be2
PR
3904};
3905
3906static struct data_cmd drbd_cmd_handler[] = {
d9ae84e7
PR
3907 [P_DATA] = { 1, sizeof(struct p_data), MDEV, { receive_Data } },
3908 [P_DATA_REPLY] = { 1, sizeof(struct p_data), MDEV, { receive_DataReply } },
3909 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), MDEV, { receive_RSDataReply } } ,
3910 [P_BARRIER] = { 0, sizeof(struct p_barrier), MDEV, { receive_Barrier } } ,
3911 [P_BITMAP] = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
3912 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
3913 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), MDEV, { receive_UnplugRemote } },
3914 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3915 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3916 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
3917 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
7204624c 3918 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), CONN, { .conn_fn = receive_protocol } },
d9ae84e7
PR
3919 [P_UUIDS] = { 0, sizeof(struct p_uuids), MDEV, { receive_uuids } },
3920 [P_SIZES] = { 0, sizeof(struct p_sizes), MDEV, { receive_sizes } },
3921 [P_STATE] = { 0, sizeof(struct p_state), MDEV, { receive_state } },
3922 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), MDEV, { receive_req_state } },
3923 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), MDEV, { receive_sync_uuid } },
3924 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3925 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3926 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3927 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), MDEV, { receive_skip } },
3928 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), MDEV, { receive_out_of_sync } },
dfafcc8a 3929 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), CONN, { .conn_fn = receive_req_conn_state } },
b411b363
PR
3930};
3931
02918be2 3932/* All handler functions that expect a sub-header get that sub-header in
e42325a5 3933 mdev->tconn->data.rbuf.header.head.payload.
02918be2 3934
e42325a5 3935 Usually in mdev->tconn->data.rbuf.header.head the callback can find the usual
02918be2
PR
 3936    p_header, but they may not rely on that, since there is also p_header95!
3937 */
b411b363 3938
eefc2f7d 3939static void drbdd(struct drbd_tconn *tconn)
b411b363 3940{
eefc2f7d 3941 struct p_header *header = &tconn->data.rbuf.header;
77351055 3942 struct packet_info pi;
02918be2
PR
3943 size_t shs; /* sub header size */
3944 int rv;
b411b363 3945
eefc2f7d
PR
3946 while (get_t_state(&tconn->receiver) == RUNNING) {
3947 drbd_thread_current_set_cpu(&tconn->receiver);
69bc7bc3 3948 if (drbd_recv_header(tconn, &pi))
02918be2 3949 goto err_out;
b411b363 3950
6e849ce8 3951 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) ||
d9ae84e7 3952 !drbd_cmd_handler[pi.cmd].mdev_fn)) {
eefc2f7d 3953 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
02918be2 3954 goto err_out;
0b33a916 3955 }
b411b363 3956
77351055
PR
3957 shs = drbd_cmd_handler[pi.cmd].pkt_size - sizeof(struct p_header);
3958 if (pi.size - shs > 0 && !drbd_cmd_handler[pi.cmd].expect_payload) {
eefc2f7d 3959 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
02918be2 3960 goto err_out;
b411b363 3961 }
b411b363 3962
c13f7e1a 3963 if (shs) {
eefc2f7d 3964 rv = drbd_recv(tconn, &header->payload, shs);
c13f7e1a 3965 if (unlikely(rv != shs)) {
0ddc5549 3966 if (!signal_pending(current))
eefc2f7d 3967 conn_warn(tconn, "short read while reading sub header: rv=%d\n", rv);
c13f7e1a
LE
3968 goto err_out;
3969 }
3970 }
3971
a4fbda8e 3972 if (drbd_cmd_handler[pi.cmd].fa_type == CONN) {
d9ae84e7
PR
3973 rv = drbd_cmd_handler[pi.cmd].conn_fn(tconn, pi.cmd, pi.size - shs);
3974 } else {
3975 struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
3976 rv = mdev ?
3977 drbd_cmd_handler[pi.cmd].mdev_fn(mdev, pi.cmd, pi.size - shs) :
3978 tconn_receive_skip(tconn, pi.cmd, pi.size - shs);
3979 }
b411b363 3980
02918be2 3981 if (unlikely(!rv)) {
eefc2f7d 3982 conn_err(tconn, "error receiving %s, l: %d!\n",
77351055 3983 cmdname(pi.cmd), pi.size);
02918be2 3984 goto err_out;
b411b363
PR
3985 }
3986 }
b411b363 3987
02918be2
PR
3988 if (0) {
3989 err_out:
bbeb641c 3990 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
02918be2 3991 }
b411b363
PR
3992}
3993
0e29d163 3994void conn_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
3995{
3996 struct drbd_wq_barrier barr;
3997
3998 barr.w.cb = w_prev_work_done;
0e29d163 3999 barr.w.tconn = tconn;
b411b363 4000 init_completion(&barr.done);
0e29d163 4001 drbd_queue_work(&tconn->data.work, &barr.w);
b411b363
PR
4002 wait_for_completion(&barr.done);
4003}
4004
360cc740 4005static void drbd_disconnect(struct drbd_tconn *tconn)
b411b363 4006{
bbeb641c 4007 enum drbd_conns oc;
b411b363 4008 int rv = SS_UNKNOWN_ERROR;
b411b363 4009
bbeb641c 4010 if (tconn->cstate == C_STANDALONE)
b411b363 4011 return;
b411b363
PR
4012
4013 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
4014 drbd_thread_stop(&tconn->asender);
4015 drbd_free_sock(tconn);
4016
4017 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4018
4019 conn_info(tconn, "Connection closed\n");
4020
4021 spin_lock_irq(&tconn->req_lock);
bbeb641c
PR
4022 oc = tconn->cstate;
4023 if (oc >= C_UNCONNECTED)
4024 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4025
360cc740
PR
4026 spin_unlock_irq(&tconn->req_lock);
4027
bbeb641c 4028 if (oc == C_DISCONNECTING) {
360cc740
PR
4029 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4030
4031 crypto_free_hash(tconn->cram_hmac_tfm);
4032 tconn->cram_hmac_tfm = NULL;
4033
4034 kfree(tconn->net_conf);
4035 tconn->net_conf = NULL;
bbeb641c 4036 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
360cc740
PR
4037 }
4038}
4039
4040static int drbd_disconnected(int vnr, void *p, void *data)
4041{
4042 struct drbd_conf *mdev = (struct drbd_conf *)p;
4043 enum drbd_fencing_p fp;
4044 unsigned int i;
b411b363 4045
85719573 4046 /* wait for current activity to cease. */
87eeee41 4047 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
4048 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4049 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4050 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 4051 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4052
4053 /* We do not have data structures that would allow us to
4054 * get the rs_pending_cnt down to 0 again.
4055 * * On C_SYNC_TARGET we do not have any data structures describing
4056 * the pending RSDataRequest's we have sent.
4057 * * On C_SYNC_SOURCE there is no data structure that tracks
4058 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4059 * And no, it is not the sum of the reference counts in the
4060 * resync_LRU. The resync_LRU tracks the whole operation including
4061 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4062 * on the fly. */
4063 drbd_rs_cancel_all(mdev);
4064 mdev->rs_total = 0;
4065 mdev->rs_failed = 0;
4066 atomic_set(&mdev->rs_pending_cnt, 0);
4067 wake_up(&mdev->misc_wait);
4068
7fde2be9
PR
4069 del_timer(&mdev->request_timer);
4070
b411b363 4071 del_timer_sync(&mdev->resync_timer);
b411b363
PR
4072 resync_timer_fn((unsigned long)mdev);
4073
b411b363
PR
4074 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4075 * w_make_resync_request etc. which may still be on the worker queue
4076 * to be "canceled" */
a21e9298 4077 drbd_flush_workqueue(mdev);
b411b363
PR
4078
4079 /* This also does reclaim_net_ee(). If we do this too early, we might
4080 * miss some resync ee and pages.*/
4081 drbd_process_done_ee(mdev);
4082
4083 kfree(mdev->p_uuid);
4084 mdev->p_uuid = NULL;
4085
fb22c402 4086 if (!is_susp(mdev->state))
2f5cdd0b 4087 tl_clear(mdev->tconn);
b411b363 4088
b411b363
PR
4089 drbd_md_sync(mdev);
4090
4091 fp = FP_DONT_CARE;
4092 if (get_ldev(mdev)) {
4093 fp = mdev->ldev->dc.fencing;
4094 put_ldev(mdev);
4095 }
4096
87f7be4c
PR
4097 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
4098 drbd_try_outdate_peer_async(mdev);
b411b363 4099
20ceb2b2
LE
4100 /* serialize with bitmap writeout triggered by the state change,
4101 * if any. */
4102 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4103
b411b363
PR
4104 /* tcp_close and release of sendpage pages can be deferred. I don't
4105 * want to use SO_LINGER, because apparently it can be deferred for
4106 * more than 20 seconds (longest time I checked).
4107 *
 4108 	 * Actually we don't care exactly when the network stack does its
4109 * put_page(), but release our reference on these pages right here.
4110 */
4111 i = drbd_release_ee(mdev, &mdev->net_ee);
4112 if (i)
4113 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
4114 i = atomic_read(&mdev->pp_in_use_by_net);
4115 if (i)
4116 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
4117 i = atomic_read(&mdev->pp_in_use);
4118 if (i)
45bb912b 4119 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
4120
4121 D_ASSERT(list_empty(&mdev->read_ee));
4122 D_ASSERT(list_empty(&mdev->active_ee));
4123 D_ASSERT(list_empty(&mdev->sync_ee));
4124 D_ASSERT(list_empty(&mdev->done_ee));
4125
4126 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4127 atomic_set(&mdev->current_epoch->epoch_size, 0);
4128 D_ASSERT(list_empty(&mdev->current_epoch->list));
360cc740
PR
4129
4130 return 0;
b411b363
PR
4131}
4132
4133/*
4134 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4135 * we can agree on is stored in agreed_pro_version.
4136 *
4137 * feature flags and the reserved array should be enough room for future
4138 * enhancements of the handshake protocol, and possible plugins...
4139 *
4140 * for now, they are expected to be zero, but ignored.
4141 */
8a22cccc 4142static int drbd_send_handshake(struct drbd_tconn *tconn)
b411b363 4143{
e6b3ea83 4144 /* ASSERT current == mdev->tconn->receiver ... */
8a22cccc 4145 struct p_handshake *p = &tconn->data.sbuf.handshake;
e8d17b01 4146 int err;
b411b363 4147
8a22cccc
PR
4148 if (mutex_lock_interruptible(&tconn->data.mutex)) {
4149 conn_err(tconn, "interrupted during initial handshake\n");
e8d17b01 4150 return -EINTR;
b411b363
PR
4151 }
4152
8a22cccc
PR
4153 if (tconn->data.socket == NULL) {
4154 mutex_unlock(&tconn->data.mutex);
e8d17b01 4155 return -EIO;
b411b363
PR
4156 }
4157
4158 memset(p, 0, sizeof(*p));
4159 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4160 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
e8d17b01 4161 err = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
ecf2363c 4162 &p->head, sizeof(*p), 0);
8a22cccc 4163 mutex_unlock(&tconn->data.mutex);
e8d17b01 4164 return err;
b411b363
PR
4165}
4166
4167/*
4168 * return values:
4169 * 1 yes, we have a valid connection
4170 * 0 oops, did not work out, please try again
4171 * -1 peer talks different language,
4172 * no point in trying again, please go standalone.
4173 */
65d11ed6 4174static int drbd_do_handshake(struct drbd_tconn *tconn)
b411b363 4175{
65d11ed6
PR
4176 /* ASSERT current == tconn->receiver ... */
4177 struct p_handshake *p = &tconn->data.rbuf.handshake;
02918be2 4178 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
77351055 4179 struct packet_info pi;
e8d17b01 4180 int err, rv;
b411b363 4181
e8d17b01
AG
4182 err = drbd_send_handshake(tconn);
4183 if (err)
b411b363
PR
4184 return 0;
4185
69bc7bc3
AG
4186 err = drbd_recv_header(tconn, &pi);
4187 if (err)
b411b363
PR
4188 return 0;
4189
77351055 4190 if (pi.cmd != P_HAND_SHAKE) {
65d11ed6 4191 conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
77351055 4192 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4193 return -1;
4194 }
4195
77351055 4196 if (pi.size != expect) {
65d11ed6 4197 conn_err(tconn, "expected HandShake length: %u, received: %u\n",
77351055 4198 expect, pi.size);
b411b363
PR
4199 return -1;
4200 }
4201
65d11ed6 4202 rv = drbd_recv(tconn, &p->head.payload, expect);
b411b363
PR
4203
4204 if (rv != expect) {
0ddc5549 4205 if (!signal_pending(current))
65d11ed6 4206 conn_warn(tconn, "short read receiving handshake packet: l=%u\n", rv);
b411b363
PR
4207 return 0;
4208 }
4209
b411b363
PR
4210 p->protocol_min = be32_to_cpu(p->protocol_min);
4211 p->protocol_max = be32_to_cpu(p->protocol_max);
4212 if (p->protocol_max == 0)
4213 p->protocol_max = p->protocol_min;
4214
4215 if (PRO_VERSION_MAX < p->protocol_min ||
4216 PRO_VERSION_MIN > p->protocol_max)
4217 goto incompat;
4218
65d11ed6 4219 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363 4220
65d11ed6
PR
4221 conn_info(tconn, "Handshake successful: "
4222 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
b411b363
PR
4223
4224 return 1;
4225
4226 incompat:
65d11ed6 4227 conn_err(tconn, "incompatible DRBD dialects: "
b411b363
PR
4228 "I support %d-%d, peer supports %d-%d\n",
4229 PRO_VERSION_MIN, PRO_VERSION_MAX,
4230 p->protocol_min, p->protocol_max);
4231 return -1;
4232}
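The compatibility rule in drbd_do_handshake() is an interval-overlap check: the nodes can talk iff [PRO_VERSION_MIN, PRO_VERSION_MAX] intersects the peer's advertised range, and the agreed version is the smaller of the two maxima. A sketch with made-up version numbers, not the real PRO_VERSION_* constants:

#include <stdio.h>

/* Return the agreed protocol version, or -1 if the ranges do not overlap. */
static int negotiate(int my_min, int my_max, int peer_min, int peer_max)
{
        if (my_max < peer_min || my_min > peer_max)
                return -1;                      /* incompatible dialects */
        return my_max < peer_max ? my_max : peer_max;
}

int main(void)
{
        printf("agreed: %d\n", negotiate(86, 96, 90, 100));     /* -> 96 */
        printf("agreed: %d\n", negotiate(86, 89, 90, 100));     /* -> -1 */
        return 0;
}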
4233
4234#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4235static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4236{
 4237 	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4238 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4239 return -1;
b411b363
PR
4240}
4241#else
4242#define CHALLENGE_LEN 64
b10d96cb
JT
4243
4244/* Return value:
4245 1 - auth succeeded,
4246 0 - failed, try again (network error),
4247 -1 - auth failed, don't try again.
4248*/
4249
13e6037d 4250static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4251{
4252 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4253 struct scatterlist sg;
4254 char *response = NULL;
4255 char *right_response = NULL;
4256 char *peers_ch = NULL;
13e6037d 4257 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
b411b363
PR
4258 unsigned int resp_size;
4259 struct hash_desc desc;
77351055 4260 struct packet_info pi;
69bc7bc3 4261 int err, rv;
b411b363 4262
13e6037d 4263 desc.tfm = tconn->cram_hmac_tfm;
b411b363
PR
4264 desc.flags = 0;
4265
13e6037d
PR
4266 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4267 (u8 *)tconn->net_conf->shared_secret, key_len);
b411b363 4268 if (rv) {
13e6037d 4269 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4270 rv = -1;
b411b363
PR
4271 goto fail;
4272 }
4273
4274 get_random_bytes(my_challenge, CHALLENGE_LEN);
4275
ce9879cb 4276 rv = !conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
b411b363
PR
4277 if (!rv)
4278 goto fail;
4279
69bc7bc3
AG
4280 err = drbd_recv_header(tconn, &pi);
4281 if (err) {
4282 rv = 0;
b411b363 4283 goto fail;
69bc7bc3 4284 }
b411b363 4285
77351055 4286 if (pi.cmd != P_AUTH_CHALLENGE) {
13e6037d 4287 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
77351055 4288 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4289 rv = 0;
4290 goto fail;
4291 }
4292
77351055 4293 if (pi.size > CHALLENGE_LEN * 2) {
13e6037d 4294 conn_err(tconn, "expected AuthChallenge payload too big.\n");
b10d96cb 4295 rv = -1;
b411b363
PR
4296 goto fail;
4297 }
4298
77351055 4299 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 4300 if (peers_ch == NULL) {
13e6037d 4301 conn_err(tconn, "kmalloc of peers_ch failed\n");
b10d96cb 4302 rv = -1;
b411b363
PR
4303 goto fail;
4304 }
4305
13e6037d 4306 rv = drbd_recv(tconn, peers_ch, pi.size);
b411b363 4307
77351055 4308 if (rv != pi.size) {
0ddc5549 4309 if (!signal_pending(current))
13e6037d 4310 conn_warn(tconn, "short read AuthChallenge: l=%u\n", rv);
b411b363
PR
4311 rv = 0;
4312 goto fail;
4313 }
4314
13e6037d 4315 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
b411b363
PR
4316 response = kmalloc(resp_size, GFP_NOIO);
4317 if (response == NULL) {
13e6037d 4318 conn_err(tconn, "kmalloc of response failed\n");
b10d96cb 4319 rv = -1;
b411b363
PR
4320 goto fail;
4321 }
4322
4323 sg_init_table(&sg, 1);
77351055 4324 sg_set_buf(&sg, peers_ch, pi.size);
b411b363
PR
4325
4326 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4327 if (rv) {
13e6037d 4328 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4329 rv = -1;
b411b363
PR
4330 goto fail;
4331 }
4332
ce9879cb 4333 rv = !conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
b411b363
PR
4334 if (!rv)
4335 goto fail;
4336
69bc7bc3
AG
4337 err = drbd_recv_header(tconn, &pi);
4338 if (err) {
4339 rv = 0;
b411b363 4340 goto fail;
69bc7bc3 4341 }
b411b363 4342
77351055 4343 if (pi.cmd != P_AUTH_RESPONSE) {
13e6037d 4344 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
77351055 4345 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4346 rv = 0;
4347 goto fail;
4348 }
4349
77351055 4350 if (pi.size != resp_size) {
13e6037d 4351 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
b411b363
PR
4352 rv = 0;
4353 goto fail;
4354 }
4355
13e6037d 4356 rv = drbd_recv(tconn, response , resp_size);
b411b363
PR
4357
4358 if (rv != resp_size) {
0ddc5549 4359 if (!signal_pending(current))
13e6037d 4360 conn_warn(tconn, "short read receiving AuthResponse: l=%u\n", rv);
b411b363
PR
4361 rv = 0;
4362 goto fail;
4363 }
4364
4365 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4366 if (right_response == NULL) {
13e6037d 4367 conn_err(tconn, "kmalloc of right_response failed\n");
b10d96cb 4368 rv = -1;
b411b363
PR
4369 goto fail;
4370 }
4371
4372 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4373
4374 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4375 if (rv) {
13e6037d 4376 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4377 rv = -1;
b411b363
PR
4378 goto fail;
4379 }
4380
4381 rv = !memcmp(response, right_response, resp_size);
4382
4383 if (rv)
13e6037d
PR
4384 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4385 resp_size, tconn->net_conf->cram_hmac_alg);
b10d96cb
JT
4386 else
4387 rv = -1;
b411b363
PR
4388
4389 fail:
4390 kfree(peers_ch);
4391 kfree(response);
4392 kfree(right_response);
4393
4394 return rv;
4395}
4396#endif
4397
4398int drbdd_init(struct drbd_thread *thi)
4399{
392c8801 4400 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4401 int h;
4402
4d641dd7 4403 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4404
4405 do {
4d641dd7 4406 h = drbd_connect(tconn);
b411b363 4407 if (h == 0) {
4d641dd7 4408 drbd_disconnect(tconn);
20ee6390 4409 schedule_timeout_interruptible(HZ);
b411b363
PR
4410 }
4411 if (h == -1) {
4d641dd7 4412 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4413 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4414 }
4415 } while (h == 0);
4416
4417 if (h > 0) {
4d641dd7
PR
4418 if (get_net_conf(tconn)) {
4419 drbdd(tconn);
4420 put_net_conf(tconn);
b411b363
PR
4421 }
4422 }
4423
4d641dd7 4424 drbd_disconnect(tconn);
b411b363 4425
4d641dd7 4426 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4427 return 0;
4428}
4429
4430/* ********* acknowledge sender ******** */
4431
e4f78ede
PR
4432static int got_conn_RqSReply(struct drbd_tconn *tconn, enum drbd_packet cmd)
4433{
4434 struct p_req_state_reply *p = &tconn->meta.rbuf.req_state_reply;
4435 int retcode = be32_to_cpu(p->retcode);
4436
4437 if (retcode >= SS_SUCCESS) {
4438 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4439 } else {
4440 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4441 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4442 drbd_set_st_err_str(retcode), retcode);
4443 }
4444 wake_up(&tconn->ping_wait);
4445
4446 return true;
4447}
4448
d8763023 4449static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4450{
257d0af6 4451 struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
b411b363
PR
4452 int retcode = be32_to_cpu(p->retcode);
4453
e4f78ede
PR
4454 if (retcode >= SS_SUCCESS) {
4455 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4456 } else {
4457 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4458 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4459 drbd_set_st_err_str(retcode), retcode);
b411b363 4460 }
e4f78ede
PR
4461 wake_up(&mdev->state_wait);
4462
81e84650 4463 return true;
b411b363
PR
4464}
4465
f19e4f8b 4466static int got_Ping(struct drbd_tconn *tconn, enum drbd_packet cmd)
b411b363 4467{
f19e4f8b 4468 return drbd_send_ping_ack(tconn);
b411b363
PR
4469
4470}
4471
f19e4f8b 4472static int got_PingAck(struct drbd_tconn *tconn, enum drbd_packet cmd)
b411b363
PR
4473{
4474 /* restore idle timeout */
2a67d8b9
PR
4475 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4476 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4477 wake_up(&tconn->ping_wait);
b411b363 4478
81e84650 4479 return true;
b411b363
PR
4480}
4481
d8763023 4482static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4483{
257d0af6 4484 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4485 sector_t sector = be64_to_cpu(p->sector);
4486 int blksize = be32_to_cpu(p->blksize);
4487
31890f4a 4488 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4489
4490 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4491
1d53f09e
LE
4492 if (get_ldev(mdev)) {
4493 drbd_rs_complete_io(mdev, sector);
4494 drbd_set_in_sync(mdev, sector, blksize);
4495 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4496 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4497 put_ldev(mdev);
4498 }
b411b363 4499 dec_rs_pending(mdev);
778f271d 4500 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4501
81e84650 4502 return true;
b411b363
PR
4503}
4504
bc9c5c41
AG
4505static int
4506validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4507 struct rb_root *root, const char *func,
4508 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4509{
4510 struct drbd_request *req;
4511 struct bio_and_error m;
4512
87eeee41 4513 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4514 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4515 if (unlikely(!req)) {
87eeee41 4516 spin_unlock_irq(&mdev->tconn->req_lock);
81e84650 4517 return false;
b411b363
PR
4518 }
4519 __req_mod(req, what, &m);
87eeee41 4520 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4521
4522 if (m.bio)
4523 complete_master_bio(mdev, &m);
81e84650 4524 return true;
b411b363
PR
4525}
4526
d8763023 4527static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4528{
257d0af6 4529 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4530 sector_t sector = be64_to_cpu(p->sector);
4531 int blksize = be32_to_cpu(p->blksize);
4532 enum drbd_req_event what;
4533
4534 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4535
579b57ed 4536 if (p->block_id == ID_SYNCER) {
b411b363
PR
4537 drbd_set_in_sync(mdev, sector, blksize);
4538 dec_rs_pending(mdev);
81e84650 4539 return true;
b411b363 4540 }
257d0af6 4541 switch (cmd) {
b411b363 4542 case P_RS_WRITE_ACK:
89e58e75 4543 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4544 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4545 break;
4546 case P_WRITE_ACK:
89e58e75 4547 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4548 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4549 break;
4550 case P_RECV_ACK:
89e58e75 4551 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
8554df1c 4552 what = RECV_ACKED_BY_PEER;
b411b363 4553 break;
7be8da07 4554 case P_DISCARD_WRITE:
89e58e75 4555 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
7be8da07
AG
4556 what = DISCARD_WRITE;
4557 break;
4558 case P_RETRY_WRITE:
4559 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4560 what = POSTPONE_WRITE;
b411b363
PR
4561 break;
4562 default:
4563 D_ASSERT(0);
81e84650 4564 return false;
b411b363
PR
4565 }
4566
4567 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41
AG
4568 &mdev->write_requests, __func__,
4569 what, false);
b411b363
PR
4570}
4571
d8763023 4572static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4573{
257d0af6 4574 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363 4575 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4576 int size = be32_to_cpu(p->blksize);
89e58e75
PR
4577 bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4578 mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
c3afd8f5 4579 bool found;
b411b363
PR
4580
4581 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4582
579b57ed 4583 if (p->block_id == ID_SYNCER) {
b411b363
PR
4584 dec_rs_pending(mdev);
4585 drbd_rs_failed_io(mdev, sector, size);
81e84650 4586 return true;
b411b363 4587 }
2deb8336 4588
c3afd8f5 4589 found = validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4590 &mdev->write_requests, __func__,
8554df1c 4591 NEG_ACKED, missing_ok);
c3afd8f5
AG
4592 if (!found) {
4593 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4594 The master bio might already be completed, therefore the
4595 request is no longer in the collision hash. */
4596 /* In Protocol B we might already have got a P_RECV_ACK
4597 but then get a P_NEG_ACK afterwards. */
4598 if (!missing_ok)
2deb8336 4599 return false;
c3afd8f5 4600 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4601 }
2deb8336 4602 return true;
b411b363
PR
4603}
4604
d8763023 4605static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4606{
257d0af6 4607 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4608 sector_t sector = be64_to_cpu(p->sector);
4609
4610 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
7be8da07 4611
b411b363
PR
4612 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4613 (unsigned long long)sector, be32_to_cpu(p->blksize));
4614
4615 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4616 &mdev->read_requests, __func__,
8554df1c 4617 NEG_ACKED, false);
b411b363
PR
4618}
4619
d8763023 4620static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4621{
4622 sector_t sector;
4623 int size;
257d0af6 4624 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4625
4626 sector = be64_to_cpu(p->sector);
4627 size = be32_to_cpu(p->blksize);
b411b363
PR
4628
4629 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4630
4631 dec_rs_pending(mdev);
4632
4633 if (get_ldev_if_state(mdev, D_FAILED)) {
4634 drbd_rs_complete_io(mdev, sector);
257d0af6 4635 switch (cmd) {
d612d309
PR
4636 case P_NEG_RS_DREPLY:
4637 drbd_rs_failed_io(mdev, sector, size);
4638 case P_RS_CANCEL:
4639 break;
4640 default:
4641 D_ASSERT(0);
4642 put_ldev(mdev);
4643 return false;
4644 }
b411b363
PR
4645 put_ldev(mdev);
4646 }
4647
81e84650 4648 return true;
b411b363
PR
4649}
4650
d8763023 4651static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4652{
257d0af6 4653 struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;
b411b363 4654
2f5cdd0b 4655 tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
b411b363 4656
c4752ef1
PR
4657 if (mdev->state.conn == C_AHEAD &&
4658 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
4659 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4660 mdev->start_resync_timer.expires = jiffies + HZ;
4661 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4662 }
4663
81e84650 4664 return true;
b411b363
PR
4665}
4666
d8763023 4667static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4668{
257d0af6 4669 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4670 struct drbd_work *w;
4671 sector_t sector;
4672 int size;
4673
4674 sector = be64_to_cpu(p->sector);
4675 size = be32_to_cpu(p->blksize);
4676
4677 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4678
4679 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4680 drbd_ov_oos_found(mdev, sector, size);
4681 else
4682 ov_oos_print(mdev);
4683
1d53f09e 4684 if (!get_ldev(mdev))
81e84650 4685 return true;
1d53f09e 4686
b411b363
PR
4687 drbd_rs_complete_io(mdev, sector);
4688 dec_rs_pending(mdev);
4689
ea5442af
LE
4690 --mdev->ov_left;
4691
4692 /* let's advance progress step marks only for every other megabyte */
4693 if ((mdev->ov_left & 0x200) == 0x200)
4694 drbd_advance_rs_marks(mdev, mdev->ov_left);
4695
4696 if (mdev->ov_left == 0) {
b411b363
PR
4697 w = kmalloc(sizeof(*w), GFP_NOIO);
4698 if (w) {
4699 w->cb = w_ov_finished;
a21e9298 4700 w->mdev = mdev;
e42325a5 4701 drbd_queue_work_front(&mdev->tconn->data.work, w);
b411b363
PR
4702 } else {
4703 dev_err(DEV, "kmalloc(w) failed.");
4704 ov_oos_print(mdev);
4705 drbd_resync_finished(mdev);
4706 }
4707 }
1d53f09e 4708 put_ldev(mdev);
81e84650 4709 return true;
b411b363
PR
4710}
4711
d8763023 4712static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
0ced55a3 4713{
81e84650 4714 return true;
0ced55a3
PR
4715}
4716
32862ec7
PR
4717static int tconn_process_done_ee(struct drbd_tconn *tconn)
4718{
082a3439
PR
4719 struct drbd_conf *mdev;
4720 int i, not_empty = 0;
32862ec7
PR
4721
4722 do {
4723 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4724 flush_signals(current);
082a3439 4725 idr_for_each_entry(&tconn->volumes, mdev, i) {
e2b3032b 4726 if (drbd_process_done_ee(mdev))
082a3439
PR
4727 return 1; /* error */
4728 }
32862ec7 4729 set_bit(SIGNAL_ASENDER, &tconn->flags);
082a3439
PR
4730
4731 spin_lock_irq(&tconn->req_lock);
4732 idr_for_each_entry(&tconn->volumes, mdev, i) {
4733 not_empty = !list_empty(&mdev->done_ee);
4734 if (not_empty)
4735 break;
4736 }
4737 spin_unlock_irq(&tconn->req_lock);
32862ec7
PR
4738 } while (not_empty);
4739
4740 return 0;
4741}
4742
7201b972
AG
4743struct asender_cmd {
4744 size_t pkt_size;
a4fbda8e
PR
4745 enum mdev_or_conn fa_type; /* first argument's type */
4746 union {
4747 int (*mdev_fn)(struct drbd_conf *mdev, enum drbd_packet cmd);
4748 int (*conn_fn)(struct drbd_tconn *tconn, enum drbd_packet cmd);
4749 };
7201b972
AG
4750};
4751
4752static struct asender_cmd asender_tbl[] = {
f19e4f8b
PR
4753 [P_PING] = { sizeof(struct p_header), CONN, { .conn_fn = got_Ping } },
4754 [P_PING_ACK] = { sizeof(struct p_header), CONN, { .conn_fn = got_PingAck } },
a4fbda8e
PR
4755 [P_RECV_ACK] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4756 [P_WRITE_ACK] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4757 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4758 [P_DISCARD_WRITE] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4759 [P_NEG_ACK] = { sizeof(struct p_block_ack), MDEV, { got_NegAck } },
4760 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), MDEV, { got_NegDReply } },
4761 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
4762 [P_OV_RESULT] = { sizeof(struct p_block_ack), MDEV, { got_OVResult } },
4763 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), MDEV, { got_BarrierAck } },
4764 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), MDEV, { got_RqSReply } },
4765 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), MDEV, { got_IsInSync } },
4766 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), MDEV, { got_skip } },
4767 [P_RS_CANCEL] = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
e4f78ede 4768 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), CONN, {.conn_fn = got_conn_RqSReply}},
a4fbda8e 4769 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
7201b972
AG
4770};
4771
b411b363
PR
4772int drbd_asender(struct drbd_thread *thi)
4773{
392c8801 4774 struct drbd_tconn *tconn = thi->tconn;
32862ec7 4775 struct p_header *h = &tconn->meta.rbuf.header;
b411b363 4776 struct asender_cmd *cmd = NULL;
77351055 4777 struct packet_info pi;
257d0af6 4778 int rv;
b411b363
PR
4779 void *buf = h;
4780 int received = 0;
257d0af6 4781 int expect = sizeof(struct p_header);
f36af18c 4782 int ping_timeout_active = 0;
b411b363 4783
b411b363
PR
4784 current->policy = SCHED_RR; /* Make this a realtime task! */
4785 current->rt_priority = 2; /* more important than all other tasks */
4786
e77a0a5c 4787 while (get_t_state(thi) == RUNNING) {
80822284 4788 drbd_thread_current_set_cpu(thi);
32862ec7 4789 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
2a67d8b9 4790 if (!drbd_send_ping(tconn)) {
32862ec7 4791 conn_err(tconn, "drbd_send_ping has failed\n");
841ce241
AG
4792 goto reconnect;
4793 }
32862ec7
PR
4794 tconn->meta.socket->sk->sk_rcvtimeo =
4795 tconn->net_conf->ping_timeo*HZ/10;
f36af18c 4796 ping_timeout_active = 1;
b411b363
PR
4797 }
4798
32862ec7
PR
4799 /* TODO: conditionally cork; it may hurt latency if we cork without
4800 much to send */
4801 if (!tconn->net_conf->no_cork)
4802 drbd_tcp_cork(tconn->meta.socket);
082a3439
PR
4803 if (tconn_process_done_ee(tconn)) {
4804 conn_err(tconn, "tconn_process_done_ee() failed\n");
32862ec7 4805 goto reconnect;
082a3439 4806 }
b411b363 4807 /* but unconditionally uncork unless disabled */
32862ec7
PR
4808 if (!tconn->net_conf->no_cork)
4809 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
4810
4811 /* short circuit, recv_msg would return EINTR anyways. */
4812 if (signal_pending(current))
4813 continue;
4814
32862ec7
PR
4815 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4816 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
4817
4818 flush_signals(current);
4819
4820 /* Note:
4821 * -EINTR (on meta) we got a signal
4822 * -EAGAIN (on meta) rcvtimeo expired
4823 * -ECONNRESET other side closed the connection
4824 * -ERESTARTSYS (on data) we got a signal
4825 * rv < 0 other than above: unexpected error!
4826 * rv == expected: full header or command
4827 * rv < expected: "woken" by signal during receive
4828 * rv == 0 : "connection shut down by peer"
4829 */
4830 if (likely(rv > 0)) {
4831 received += rv;
4832 buf += rv;
4833 } else if (rv == 0) {
32862ec7 4834 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
4835 goto reconnect;
4836 } else if (rv == -EAGAIN) {
cb6518cb
LE
4837 /* If the data socket received something meanwhile,
4838 * that is good enough: peer is still alive. */
32862ec7
PR
4839 if (time_after(tconn->last_received,
4840 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 4841 continue;
f36af18c 4842 if (ping_timeout_active) {
32862ec7 4843 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
4844 goto reconnect;
4845 }
32862ec7 4846 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
4847 continue;
4848 } else if (rv == -EINTR) {
4849 continue;
4850 } else {
32862ec7 4851 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
4852 goto reconnect;
4853 }
4854
4855 if (received == expect && cmd == NULL) {
8172f3e9 4856 if (decode_header(tconn, h, &pi))
b411b363 4857 goto reconnect;
7201b972
AG
4858 cmd = &asender_tbl[pi.cmd];
4859 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd) {
32862ec7 4860 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
77351055 4861 pi.cmd, pi.size);
b411b363
PR
4862 goto disconnect;
4863 }
4864 expect = cmd->pkt_size;
77351055 4865 if (pi.size != expect - sizeof(struct p_header)) {
32862ec7 4866 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 4867 pi.cmd, pi.size);
b411b363 4868 goto reconnect;
257d0af6 4869 }
b411b363
PR
4870 }
4871 if (received == expect) {
a4fbda8e
PR
4872 bool rv;
4873
4874 if (cmd->fa_type == CONN) {
4875 rv = cmd->conn_fn(tconn, pi.cmd);
4876 } else {
4877 struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
4878 rv = cmd->mdev_fn(mdev, pi.cmd);
4879 }
4880
4881 if (!rv)
b411b363
PR
4882 goto reconnect;
4883
a4fbda8e
PR
4884 tconn->last_received = jiffies;
4885
f36af18c
LE
4886 /* the idle_timeout (ping-int)
4887 * has been restored in got_PingAck() */
7201b972 4888 if (cmd == &asender_tbl[P_PING_ACK])
f36af18c
LE
4889 ping_timeout_active = 0;
4890
b411b363
PR
4891 buf = h;
4892 received = 0;
257d0af6 4893 expect = sizeof(struct p_header);
b411b363
PR
4894 cmd = NULL;
4895 }
4896 }
4897
4898 if (0) {
4899reconnect:
bbeb641c 4900 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
4901 }
4902 if (0) {
4903disconnect:
bbeb641c 4904 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 4905 }
32862ec7 4906 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 4907
32862ec7 4908 conn_info(tconn, "asender terminated\n");
b411b363
PR
4909
4910 return 0;
4911}
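The received/expect bookkeeping in drbd_asender() is the classic "read exactly N bytes" loop: accumulate partial reads, treat a return of 0 as a peer shutdown, and retry when a signal interrupts the call. A hedged userspace analogue over a plain socket, not the kernel receive path:

#include <errno.h>
#include <sys/socket.h>

/* Receive exactly `expect` bytes into buf; return the byte count,
 * 0 if the peer closed the connection, or -1 on any other error. */
static int recv_exact(int fd, void *buf, int expect)
{
        int received = 0, rv;

        while (received < expect) {
                rv = recv(fd, (char *)buf + received, expect - received, 0);
                if (rv > 0)
                        received += rv;
                else if (rv == 0)
                        return 0;               /* connection shut down by peer */
                else if (errno == EINTR)
                        continue;               /* "woken" by a signal, retry */
                else
                        return -1;              /* unexpected error */
        }
        return received;
}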