drbd: Rename drbd_endio_{pri,sec} -> drbd_{,peer_}request_endio
[deliverable/linux.git] / drivers / block / drbd / drbd_receiver.c
CommitLineData
b411b363
PR
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
b411b363
PR
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
b411b363
PR
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
b411b363
PR
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
b411b363
PR
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
b411b363
PR
47#include "drbd_req.h"
48
49#include "drbd_vli.h"
50
77351055
PR
51struct packet_info {
52 enum drbd_packet cmd;
53 int size;
54 int vnr;
55};
56
b411b363
PR
57enum finish_epoch {
58 FE_STILL_LIVE,
59 FE_DESTROYED,
60 FE_RECYCLED,
61};
62
65d11ed6 63static int drbd_do_handshake(struct drbd_tconn *tconn);
13e6037d 64static int drbd_do_auth(struct drbd_tconn *tconn);
360cc740 65static int drbd_disconnected(int vnr, void *p, void *data);
b411b363
PR
66
67static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
00d56944 68static int e_end_block(struct drbd_work *, int);
b411b363 69
b411b363
PR
70
71#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
72
45bb912b
LE
73/*
74 * some helper functions to deal with single linked page lists,
75 * page->private being our "next" pointer.
76 */
77
78/* If at least n pages are linked at head, get n pages off.
79 * Otherwise, don't modify head, and return NULL.
80 * Locking is the responsibility of the caller.
81 */
82static struct page *page_chain_del(struct page **head, int n)
83{
84 struct page *page;
85 struct page *tmp;
86
87 BUG_ON(!n);
88 BUG_ON(!head);
89
90 page = *head;
23ce4227
PR
91
92 if (!page)
93 return NULL;
94
45bb912b
LE
95 while (page) {
96 tmp = page_chain_next(page);
97 if (--n == 0)
98 break; /* found sufficient pages */
99 if (tmp == NULL)
100 /* insufficient pages, don't use any of them. */
101 return NULL;
102 page = tmp;
103 }
104
105 /* add end of list marker for the returned list */
106 set_page_private(page, 0);
107 /* actual return value, and adjustment of head */
108 page = *head;
109 *head = tmp;
110 return page;
111}
112
113/* may be used outside of locks to find the tail of a (usually short)
114 * "private" page chain, before adding it back to a global chain head
115 * with page_chain_add() under a spinlock. */
116static struct page *page_chain_tail(struct page *page, int *len)
117{
118 struct page *tmp;
119 int i = 1;
120 while ((tmp = page_chain_next(page)))
121 ++i, page = tmp;
122 if (len)
123 *len = i;
124 return page;
125}
126
127static int page_chain_free(struct page *page)
128{
129 struct page *tmp;
130 int i = 0;
131 page_chain_for_each_safe(page, tmp) {
132 put_page(page);
133 ++i;
134 }
135 return i;
136}
137
138static void page_chain_add(struct page **head,
139 struct page *chain_first, struct page *chain_last)
140{
141#if 1
142 struct page *tmp;
143 tmp = page_chain_tail(chain_first, NULL);
144 BUG_ON(tmp != chain_last);
145#endif
146
147 /* add chain to head */
148 set_page_private(chain_last, (unsigned long)*head);
149 *head = chain_first;
150}
151
152static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
b411b363
PR
153{
154 struct page *page = NULL;
45bb912b
LE
155 struct page *tmp = NULL;
156 int i = 0;
b411b363
PR
157
158 /* Yes, testing drbd_pp_vacant outside the lock is racy.
159 * So what. It saves a spin_lock. */
45bb912b 160 if (drbd_pp_vacant >= number) {
b411b363 161 spin_lock(&drbd_pp_lock);
45bb912b
LE
162 page = page_chain_del(&drbd_pp_pool, number);
163 if (page)
164 drbd_pp_vacant -= number;
b411b363 165 spin_unlock(&drbd_pp_lock);
45bb912b
LE
166 if (page)
167 return page;
b411b363 168 }
45bb912b 169
b411b363
PR
170 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
171 * "criss-cross" setup, that might cause write-out on some other DRBD,
172 * which in turn might block on the other node at this very place. */
45bb912b
LE
173 for (i = 0; i < number; i++) {
174 tmp = alloc_page(GFP_TRY);
175 if (!tmp)
176 break;
177 set_page_private(tmp, (unsigned long)page);
178 page = tmp;
179 }
180
181 if (i == number)
182 return page;
183
184 /* Not enough pages immediately available this time.
185 * No need to jump around here, drbd_pp_alloc will retry this
186 * function "soon". */
187 if (page) {
188 tmp = page_chain_tail(page, NULL);
189 spin_lock(&drbd_pp_lock);
190 page_chain_add(&drbd_pp_pool, page, tmp);
191 drbd_pp_vacant += i;
192 spin_unlock(&drbd_pp_lock);
193 }
194 return NULL;
b411b363
PR
195}
196
b411b363
PR
197static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
198{
db830c46 199 struct drbd_peer_request *peer_req;
b411b363
PR
200 struct list_head *le, *tle;
201
202 /* The EEs are always appended to the end of the list. Since
203 they are sent in order over the wire, they have to finish
204 in order. As soon as we see the first not finished we can
205 stop to examine the list... */
206
207 list_for_each_safe(le, tle, &mdev->net_ee) {
db830c46
AG
208 peer_req = list_entry(le, struct drbd_peer_request, w.list);
209 if (drbd_ee_has_active_page(peer_req))
b411b363
PR
210 break;
211 list_move(le, to_be_freed);
212 }
213}
214
215static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
216{
217 LIST_HEAD(reclaimed);
db830c46 218 struct drbd_peer_request *peer_req, *t;
b411b363 219
87eeee41 220 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 221 reclaim_net_ee(mdev, &reclaimed);
87eeee41 222 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 223
db830c46
AG
224 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
225 drbd_free_net_ee(mdev, peer_req);
b411b363
PR
226}
227
228/**
45bb912b 229 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
b411b363 230 * @mdev: DRBD device.
45bb912b
LE
231 * @number: number of pages requested
232 * @retry: whether to retry, if not enough pages are available right now
233 *
234 * Tries to allocate number pages, first from our own page pool, then from
235 * the kernel, unless this allocation would exceed the max_buffers setting.
236 * Possibly retry until DRBD frees sufficient pages somewhere else.
b411b363 237 *
45bb912b 238 * Returns a page chain linked via page->private.
b411b363 239 */
45bb912b 240static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
b411b363
PR
241{
242 struct page *page = NULL;
243 DEFINE_WAIT(wait);
244
45bb912b
LE
245 /* Yes, we may run up to @number over max_buffers. If we
246 * follow it strictly, the admin will get it wrong anyways. */
89e58e75 247 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
45bb912b 248 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363 249
45bb912b 250 while (page == NULL) {
b411b363
PR
251 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
252
253 drbd_kick_lo_and_reclaim_net(mdev);
254
89e58e75 255 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
45bb912b 256 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363
PR
257 if (page)
258 break;
259 }
260
261 if (!retry)
262 break;
263
264 if (signal_pending(current)) {
265 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
266 break;
267 }
268
269 schedule();
270 }
271 finish_wait(&drbd_pp_wait, &wait);
272
45bb912b
LE
273 if (page)
274 atomic_add(number, &mdev->pp_in_use);
b411b363
PR
275 return page;
276}
277
278/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
87eeee41 279 * Is also used from inside an other spin_lock_irq(&mdev->tconn->req_lock);
45bb912b
LE
280 * Either links the page chain back to the global pool,
281 * or returns all pages to the system. */
435f0740 282static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
b411b363 283{
435f0740 284 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
b411b363 285 int i;
435f0740 286
1816a2b4 287 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
45bb912b
LE
288 i = page_chain_free(page);
289 else {
290 struct page *tmp;
291 tmp = page_chain_tail(page, &i);
292 spin_lock(&drbd_pp_lock);
293 page_chain_add(&drbd_pp_pool, page, tmp);
294 drbd_pp_vacant += i;
295 spin_unlock(&drbd_pp_lock);
b411b363 296 }
435f0740 297 i = atomic_sub_return(i, a);
45bb912b 298 if (i < 0)
435f0740
LE
299 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
300 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
b411b363
PR
301 wake_up(&drbd_pp_wait);
302}
303
304/*
305You need to hold the req_lock:
306 _drbd_wait_ee_list_empty()
307
308You must not have the req_lock:
309 drbd_free_ee()
310 drbd_alloc_ee()
311 drbd_init_ee()
312 drbd_release_ee()
313 drbd_ee_fix_bhs()
314 drbd_process_done_ee()
315 drbd_clear_done_ee()
316 drbd_wait_ee_list_empty()
317*/
318
f6ffca9f
AG
319struct drbd_peer_request *
320drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
321 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
b411b363 322{
db830c46 323 struct drbd_peer_request *peer_req;
b411b363 324 struct page *page;
45bb912b 325 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
b411b363 326
0cf9d27e 327 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
b411b363
PR
328 return NULL;
329
db830c46
AG
330 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
331 if (!peer_req) {
b411b363
PR
332 if (!(gfp_mask & __GFP_NOWARN))
333 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
334 return NULL;
335 }
336
45bb912b
LE
337 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
338 if (!page)
339 goto fail;
b411b363 340
db830c46
AG
341 drbd_clear_interval(&peer_req->i);
342 peer_req->i.size = data_size;
343 peer_req->i.sector = sector;
344 peer_req->i.local = false;
345 peer_req->i.waiting = false;
346
347 peer_req->epoch = NULL;
a21e9298 348 peer_req->w.mdev = mdev;
db830c46
AG
349 peer_req->pages = page;
350 atomic_set(&peer_req->pending_bios, 0);
351 peer_req->flags = 0;
9a8e7753
AG
352 /*
353 * The block_id is opaque to the receiver. It is not endianness
354 * converted, and sent back to the sender unchanged.
355 */
db830c46 356 peer_req->block_id = id;
b411b363 357
db830c46 358 return peer_req;
b411b363 359
45bb912b 360 fail:
db830c46 361 mempool_free(peer_req, drbd_ee_mempool);
b411b363
PR
362 return NULL;
363}
364
db830c46 365void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
f6ffca9f 366 int is_net)
b411b363 367{
db830c46
AG
368 if (peer_req->flags & EE_HAS_DIGEST)
369 kfree(peer_req->digest);
370 drbd_pp_free(mdev, peer_req->pages, is_net);
371 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
372 D_ASSERT(drbd_interval_empty(&peer_req->i));
373 mempool_free(peer_req, drbd_ee_mempool);
b411b363
PR
374}
375
376int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
377{
378 LIST_HEAD(work_list);
db830c46 379 struct drbd_peer_request *peer_req, *t;
b411b363 380 int count = 0;
435f0740 381 int is_net = list == &mdev->net_ee;
b411b363 382
87eeee41 383 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 384 list_splice_init(list, &work_list);
87eeee41 385 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 386
db830c46
AG
387 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
388 drbd_free_some_ee(mdev, peer_req, is_net);
b411b363
PR
389 count++;
390 }
391 return count;
392}
393
394
32862ec7 395/* See also comments in _req_mod(,BARRIER_ACKED)
b411b363
PR
396 * and receive_Barrier.
397 *
398 * Move entries from net_ee to done_ee, if ready.
399 * Grab done_ee, call all callbacks, free the entries.
400 * The callbacks typically send out ACKs.
401 */
402static int drbd_process_done_ee(struct drbd_conf *mdev)
403{
404 LIST_HEAD(work_list);
405 LIST_HEAD(reclaimed);
db830c46 406 struct drbd_peer_request *peer_req, *t;
b411b363
PR
407 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
408
87eeee41 409 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
410 reclaim_net_ee(mdev, &reclaimed);
411 list_splice_init(&mdev->done_ee, &work_list);
87eeee41 412 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 413
db830c46
AG
414 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
415 drbd_free_net_ee(mdev, peer_req);
b411b363
PR
416
417 /* possible callbacks here:
418 * e_end_block, and e_end_resync_block, e_send_discard_ack.
419 * all ignore the last argument.
420 */
db830c46 421 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
b411b363 422 /* list_del not necessary, next/prev members not touched */
00d56944 423 ok = peer_req->w.cb(&peer_req->w, !ok) && ok;
db830c46 424 drbd_free_ee(mdev, peer_req);
b411b363
PR
425 }
426 wake_up(&mdev->ee_wait);
427
428 return ok;
429}
430
431void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
432{
433 DEFINE_WAIT(wait);
434
435 /* avoids spin_lock/unlock
436 * and calling prepare_to_wait in the fast path */
437 while (!list_empty(head)) {
438 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
87eeee41 439 spin_unlock_irq(&mdev->tconn->req_lock);
7eaceacc 440 io_schedule();
b411b363 441 finish_wait(&mdev->ee_wait, &wait);
87eeee41 442 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
443 }
444}
445
446void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
447{
87eeee41 448 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 449 _drbd_wait_ee_list_empty(mdev, head);
87eeee41 450 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
451}
452
453/* see also kernel_accept; which is only present since 2.6.18.
454 * also we want to log which part of it failed, exactly */
7653620d 455static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
b411b363
PR
456{
457 struct sock *sk = sock->sk;
458 int err = 0;
459
460 *what = "listen";
461 err = sock->ops->listen(sock, 5);
462 if (err < 0)
463 goto out;
464
465 *what = "sock_create_lite";
466 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
467 newsock);
468 if (err < 0)
469 goto out;
470
471 *what = "accept";
472 err = sock->ops->accept(sock, *newsock, 0);
473 if (err < 0) {
474 sock_release(*newsock);
475 *newsock = NULL;
476 goto out;
477 }
478 (*newsock)->ops = sock->ops;
479
480out:
481 return err;
482}
483
dbd9eea0 484static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
b411b363
PR
485{
486 mm_segment_t oldfs;
487 struct kvec iov = {
488 .iov_base = buf,
489 .iov_len = size,
490 };
491 struct msghdr msg = {
492 .msg_iovlen = 1,
493 .msg_iov = (struct iovec *)&iov,
494 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
495 };
496 int rv;
497
498 oldfs = get_fs();
499 set_fs(KERNEL_DS);
500 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
501 set_fs(oldfs);
502
503 return rv;
504}
505
de0ff338 506static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
b411b363
PR
507{
508 mm_segment_t oldfs;
509 struct kvec iov = {
510 .iov_base = buf,
511 .iov_len = size,
512 };
513 struct msghdr msg = {
514 .msg_iovlen = 1,
515 .msg_iov = (struct iovec *)&iov,
516 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
517 };
518 int rv;
519
520 oldfs = get_fs();
521 set_fs(KERNEL_DS);
522
523 for (;;) {
de0ff338 524 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
b411b363
PR
525 if (rv == size)
526 break;
527
528 /* Note:
529 * ECONNRESET other side closed the connection
530 * ERESTARTSYS (on sock) we got a signal
531 */
532
533 if (rv < 0) {
534 if (rv == -ECONNRESET)
de0ff338 535 conn_info(tconn, "sock was reset by peer\n");
b411b363 536 else if (rv != -ERESTARTSYS)
de0ff338 537 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
538 break;
539 } else if (rv == 0) {
de0ff338 540 conn_info(tconn, "sock was shut down by peer\n");
b411b363
PR
541 break;
542 } else {
543 /* signal came in, or peer/link went down,
544 * after we read a partial message
545 */
546 /* D_ASSERT(signal_pending(current)); */
547 break;
548 }
549 };
550
551 set_fs(oldfs);
552
553 if (rv != size)
bbeb641c 554 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
b411b363
PR
555
556 return rv;
557}
558
5dbf1673
LE
559/* quoting tcp(7):
560 * On individual connections, the socket buffer size must be set prior to the
561 * listen(2) or connect(2) calls in order to have it take effect.
562 * This is our wrapper to do so.
563 */
564static void drbd_setbufsize(struct socket *sock, unsigned int snd,
565 unsigned int rcv)
566{
567 /* open coded SO_SNDBUF, SO_RCVBUF */
568 if (snd) {
569 sock->sk->sk_sndbuf = snd;
570 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
571 }
572 if (rcv) {
573 sock->sk->sk_rcvbuf = rcv;
574 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
575 }
576}
577
/* Actively open a TCP connection to the configured peer address.
 * Returns the connected socket, or NULL on failure.  "Expected" failures
 * (timeout, peer not up yet) do not trigger a disconnect state change. */
static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	int err;
	int disconnect_on_error = 1;

	if (!get_net_conf(tconn))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	memcpy(&src_in6, tconn->net_conf->my_addr,
	       min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      tconn->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock,
				 (struct sockaddr *)tconn->net_conf->peer_addr,
				 tconn->net_conf->peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
		/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
		/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN: case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			conn_err(tconn, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	put_net_conf(tconn);
	return sock;
}
655
7653620d 656static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
b411b363
PR
657{
658 int timeo, err;
659 struct socket *s_estab = NULL, *s_listen;
660 const char *what;
661
7653620d 662 if (!get_net_conf(tconn))
b411b363
PR
663 return NULL;
664
665 what = "sock_create_kern";
7653620d 666 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
b411b363
PR
667 SOCK_STREAM, IPPROTO_TCP, &s_listen);
668 if (err) {
669 s_listen = NULL;
670 goto out;
671 }
672
7653620d 673 timeo = tconn->net_conf->try_connect_int * HZ;
b411b363
PR
674 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
675
676 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
677 s_listen->sk->sk_rcvtimeo = timeo;
678 s_listen->sk->sk_sndtimeo = timeo;
7653620d
PR
679 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
680 tconn->net_conf->rcvbuf_size);
b411b363
PR
681
682 what = "bind before listen";
683 err = s_listen->ops->bind(s_listen,
7653620d
PR
684 (struct sockaddr *) tconn->net_conf->my_addr,
685 tconn->net_conf->my_addr_len);
b411b363
PR
686 if (err < 0)
687 goto out;
688
7653620d 689 err = drbd_accept(&what, s_listen, &s_estab);
b411b363
PR
690
691out:
692 if (s_listen)
693 sock_release(s_listen);
694 if (err < 0) {
695 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
7653620d 696 conn_err(tconn, "%s failed, err = %d\n", what, err);
bbeb641c 697 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
698 }
699 }
7653620d 700 put_net_conf(tconn);
b411b363
PR
701
702 return s_estab;
703}
704
d38e787e 705static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
b411b363 706{
d38e787e 707 struct p_header *h = &tconn->data.sbuf.header;
b411b363 708
d38e787e 709 return _conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
b411b363
PR
710}
711
a25b63f1 712static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
b411b363 713{
a25b63f1 714 struct p_header80 *h = &tconn->data.rbuf.header.h80;
b411b363
PR
715 int rr;
716
dbd9eea0 717 rr = drbd_recv_short(sock, h, sizeof(*h), 0);
b411b363 718
ca9bc12b 719 if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
b411b363
PR
720 return be16_to_cpu(h->command);
721
722 return 0xffff;
723}
724
725/**
726 * drbd_socket_okay() - Free the socket if its connection is not okay
b411b363
PR
727 * @sock: pointer to the pointer to the socket.
728 */
dbd9eea0 729static int drbd_socket_okay(struct socket **sock)
b411b363
PR
730{
731 int rr;
732 char tb[4];
733
734 if (!*sock)
81e84650 735 return false;
b411b363 736
dbd9eea0 737 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
b411b363
PR
738
739 if (rr > 0 || rr == -EAGAIN) {
81e84650 740 return true;
b411b363
PR
741 } else {
742 sock_release(*sock);
743 *sock = NULL;
81e84650 744 return false;
b411b363
PR
745 }
746}
747
907599e0
PR
748static int drbd_connected(int vnr, void *p, void *data)
749{
750 struct drbd_conf *mdev = (struct drbd_conf *)p;
751 int ok = 1;
752
753 atomic_set(&mdev->packet_seq, 0);
754 mdev->peer_seq = 0;
755
8410da8f
PR
756 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
757 &mdev->tconn->cstate_mutex :
758 &mdev->own_state_mutex;
759
907599e0
PR
760 ok &= drbd_send_sync_param(mdev, &mdev->sync_conf);
761 ok &= drbd_send_sizes(mdev, 0, 0);
762 ok &= drbd_send_uuids(mdev);
763 ok &= drbd_send_state(mdev);
764 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
765 clear_bit(RESIZE_PENDING, &mdev->flags);
766
8410da8f 767
907599e0
PR
768 return !ok;
769}
770
b411b363
PR
771/*
772 * return values:
773 * 1 yes, we have a valid connection
774 * 0 oops, did not work out, please try again
775 * -1 peer talks different language,
776 * no point in trying again, please go standalone.
777 * -2 We do not have a network config...
778 */
907599e0 779static int drbd_connect(struct drbd_tconn *tconn)
b411b363
PR
780{
781 struct socket *s, *sock, *msock;
782 int try, h, ok;
783
bbeb641c 784 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
785 return -2;
786
907599e0
PR
787 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
788 tconn->agreed_pro_version = 99;
fd340c12
PR
789 /* agreed_pro_version must be smaller than 100 so we send the old
790 header (h80) in the first packet and in the handshake packet. */
b411b363
PR
791
792 sock = NULL;
793 msock = NULL;
794
795 do {
796 for (try = 0;;) {
797 /* 3 tries, this should take less than a second! */
907599e0 798 s = drbd_try_connect(tconn);
b411b363
PR
799 if (s || ++try >= 3)
800 break;
801 /* give the other side time to call bind() & listen() */
20ee6390 802 schedule_timeout_interruptible(HZ / 10);
b411b363
PR
803 }
804
805 if (s) {
806 if (!sock) {
907599e0 807 drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
b411b363
PR
808 sock = s;
809 s = NULL;
810 } else if (!msock) {
907599e0 811 drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
b411b363
PR
812 msock = s;
813 s = NULL;
814 } else {
907599e0 815 conn_err(tconn, "Logic error in drbd_connect()\n");
b411b363
PR
816 goto out_release_sockets;
817 }
818 }
819
820 if (sock && msock) {
907599e0 821 schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
dbd9eea0
PR
822 ok = drbd_socket_okay(&sock);
823 ok = drbd_socket_okay(&msock) && ok;
b411b363
PR
824 if (ok)
825 break;
826 }
827
828retry:
907599e0 829 s = drbd_wait_for_connect(tconn);
b411b363 830 if (s) {
907599e0 831 try = drbd_recv_fp(tconn, s);
dbd9eea0
PR
832 drbd_socket_okay(&sock);
833 drbd_socket_okay(&msock);
b411b363
PR
834 switch (try) {
835 case P_HAND_SHAKE_S:
836 if (sock) {
907599e0 837 conn_warn(tconn, "initial packet S crossed\n");
b411b363
PR
838 sock_release(sock);
839 }
840 sock = s;
841 break;
842 case P_HAND_SHAKE_M:
843 if (msock) {
907599e0 844 conn_warn(tconn, "initial packet M crossed\n");
b411b363
PR
845 sock_release(msock);
846 }
847 msock = s;
907599e0 848 set_bit(DISCARD_CONCURRENT, &tconn->flags);
b411b363
PR
849 break;
850 default:
907599e0 851 conn_warn(tconn, "Error receiving initial packet\n");
b411b363
PR
852 sock_release(s);
853 if (random32() & 1)
854 goto retry;
855 }
856 }
857
bbeb641c 858 if (tconn->cstate <= C_DISCONNECTING)
b411b363
PR
859 goto out_release_sockets;
860 if (signal_pending(current)) {
861 flush_signals(current);
862 smp_rmb();
907599e0 863 if (get_t_state(&tconn->receiver) == EXITING)
b411b363
PR
864 goto out_release_sockets;
865 }
866
867 if (sock && msock) {
dbd9eea0
PR
868 ok = drbd_socket_okay(&sock);
869 ok = drbd_socket_okay(&msock) && ok;
b411b363
PR
870 if (ok)
871 break;
872 }
873 } while (1);
874
875 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
876 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
877
878 sock->sk->sk_allocation = GFP_NOIO;
879 msock->sk->sk_allocation = GFP_NOIO;
880
881 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
882 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
883
b411b363 884 /* NOT YET ...
907599e0 885 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
b411b363
PR
886 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
887 * first set it to the P_HAND_SHAKE timeout,
888 * which we set to 4x the configured ping_timeout. */
889 sock->sk->sk_sndtimeo =
907599e0 890 sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
b411b363 891
907599e0
PR
892 msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
893 msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
b411b363
PR
894
895 /* we don't want delays.
25985edc 896 * we use TCP_CORK where appropriate, though */
b411b363
PR
897 drbd_tcp_nodelay(sock);
898 drbd_tcp_nodelay(msock);
899
907599e0
PR
900 tconn->data.socket = sock;
901 tconn->meta.socket = msock;
902 tconn->last_received = jiffies;
b411b363 903
907599e0 904 h = drbd_do_handshake(tconn);
b411b363
PR
905 if (h <= 0)
906 return h;
907
907599e0 908 if (tconn->cram_hmac_tfm) {
b411b363 909 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
907599e0 910 switch (drbd_do_auth(tconn)) {
b10d96cb 911 case -1:
907599e0 912 conn_err(tconn, "Authentication of peer failed\n");
b411b363 913 return -1;
b10d96cb 914 case 0:
907599e0 915 conn_err(tconn, "Authentication of peer failed, trying again.\n");
b10d96cb 916 return 0;
b411b363
PR
917 }
918 }
919
bbeb641c 920 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
921 return 0;
922
907599e0 923 sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
b411b363
PR
924 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
925
907599e0 926 drbd_thread_start(&tconn->asender);
b411b363 927
907599e0 928 if (drbd_send_protocol(tconn) == -1)
7e2455c1 929 return -1;
b411b363 930
907599e0 931 return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
b411b363
PR
932
933out_release_sockets:
934 if (sock)
935 sock_release(sock);
936 if (msock)
937 sock_release(msock);
938 return -1;
939}
940
ce243853 941static bool decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
b411b363 942{
fd340c12 943 if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
77351055
PR
944 pi->cmd = be16_to_cpu(h->h80.command);
945 pi->size = be16_to_cpu(h->h80.length);
eefc2f7d 946 pi->vnr = 0;
ca9bc12b 947 } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
77351055
PR
948 pi->cmd = be16_to_cpu(h->h95.command);
949 pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
950 pi->vnr = 0;
02918be2 951 } else {
ce243853 952 conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
004352fa
LE
953 be32_to_cpu(h->h80.magic),
954 be16_to_cpu(h->h80.command),
955 be16_to_cpu(h->h80.length));
81e84650 956 return false;
b411b363 957 }
257d0af6
PR
958 return true;
959}
960
9ba7aa00 961static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
257d0af6 962{
9ba7aa00 963 struct p_header *h = &tconn->data.rbuf.header;
257d0af6
PR
964 int r;
965
9ba7aa00 966 r = drbd_recv(tconn, h, sizeof(*h));
257d0af6
PR
967 if (unlikely(r != sizeof(*h))) {
968 if (!signal_pending(current))
9ba7aa00 969 conn_warn(tconn, "short read expecting header on sock: r=%d\n", r);
257d0af6
PR
970 return false;
971 }
972
9ba7aa00
PR
973 r = decode_header(tconn, h, pi);
974 tconn->last_received = jiffies;
b411b363 975
257d0af6 976 return r;
b411b363
PR
977}
978
2451fc3b 979static void drbd_flush(struct drbd_conf *mdev)
b411b363
PR
980{
981 int rv;
982
983 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a 984 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
dd3932ed 985 NULL);
b411b363
PR
986 if (rv) {
987 dev_err(DEV, "local disk flush failed with status %d\n", rv);
988 /* would rather check on EOPNOTSUPP, but that is not reliable.
989 * don't try again for ANY return value != 0
990 * if (rv == -EOPNOTSUPP) */
991 drbd_bump_write_ordering(mdev, WO_drain_io);
992 }
993 put_ldev(mdev);
994 }
b411b363
PR
995}
996
997/**
998 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
999 * @mdev: DRBD device.
1000 * @epoch: Epoch object.
1001 * @ev: Epoch event.
1002 */
1003static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1004 struct drbd_epoch *epoch,
1005 enum epoch_event ev)
1006{
2451fc3b 1007 int epoch_size;
b411b363 1008 struct drbd_epoch *next_epoch;
b411b363
PR
1009 enum finish_epoch rv = FE_STILL_LIVE;
1010
1011 spin_lock(&mdev->epoch_lock);
1012 do {
1013 next_epoch = NULL;
b411b363
PR
1014
1015 epoch_size = atomic_read(&epoch->epoch_size);
1016
1017 switch (ev & ~EV_CLEANUP) {
1018 case EV_PUT:
1019 atomic_dec(&epoch->active);
1020 break;
1021 case EV_GOT_BARRIER_NR:
1022 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
b411b363
PR
1023 break;
1024 case EV_BECAME_LAST:
1025 /* nothing to do*/
1026 break;
1027 }
1028
b411b363
PR
1029 if (epoch_size != 0 &&
1030 atomic_read(&epoch->active) == 0 &&
2451fc3b 1031 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
b411b363
PR
1032 if (!(ev & EV_CLEANUP)) {
1033 spin_unlock(&mdev->epoch_lock);
1034 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1035 spin_lock(&mdev->epoch_lock);
1036 }
1037 dec_unacked(mdev);
1038
1039 if (mdev->current_epoch != epoch) {
1040 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1041 list_del(&epoch->list);
1042 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1043 mdev->epochs--;
b411b363
PR
1044 kfree(epoch);
1045
1046 if (rv == FE_STILL_LIVE)
1047 rv = FE_DESTROYED;
1048 } else {
1049 epoch->flags = 0;
1050 atomic_set(&epoch->epoch_size, 0);
698f9315 1051 /* atomic_set(&epoch->active, 0); is already zero */
b411b363
PR
1052 if (rv == FE_STILL_LIVE)
1053 rv = FE_RECYCLED;
2451fc3b 1054 wake_up(&mdev->ee_wait);
b411b363
PR
1055 }
1056 }
1057
1058 if (!next_epoch)
1059 break;
1060
1061 epoch = next_epoch;
1062 } while (1);
1063
1064 spin_unlock(&mdev->epoch_lock);
1065
b411b363
PR
1066 return rv;
1067}
1068
1069/**
1070 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1071 * @mdev: DRBD device.
1072 * @wo: Write ordering method to try.
1073 */
1074void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1075{
1076 enum write_ordering_e pwo;
1077 static char *write_ordering_str[] = {
1078 [WO_none] = "none",
1079 [WO_drain_io] = "drain",
1080 [WO_bdev_flush] = "flush",
b411b363
PR
1081 };
1082
1083 pwo = mdev->write_ordering;
1084 wo = min(pwo, wo);
b411b363
PR
1085 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1086 wo = WO_drain_io;
1087 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1088 wo = WO_none;
1089 mdev->write_ordering = wo;
2451fc3b 1090 if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
b411b363
PR
1091 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1092}
1093
45bb912b 1094/**
fbe29dec 1095 * drbd_submit_peer_request()
45bb912b 1096 * @mdev: DRBD device.
db830c46 1097 * @peer_req: peer request
45bb912b 1098 * @rw: flag field, see bio->bi_rw
10f6d992
LE
1099 *
1100 * May spread the pages to multiple bios,
1101 * depending on bio_add_page restrictions.
1102 *
1103 * Returns 0 if all bios have been submitted,
1104 * -ENOMEM if we could not allocate enough bios,
1105 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1106 * single page to an empty bio (which should never happen and likely indicates
1107 * that the lower level IO stack is in some way broken). This has been observed
1108 * on certain Xen deployments.
45bb912b
LE
1109 */
1110/* TODO allocate from our own bio_set. */
fbe29dec
AG
1111int drbd_submit_peer_request(struct drbd_conf *mdev,
1112 struct drbd_peer_request *peer_req,
1113 const unsigned rw, const int fault_type)
45bb912b
LE
1114{
1115 struct bio *bios = NULL;
1116 struct bio *bio;
db830c46
AG
1117 struct page *page = peer_req->pages;
1118 sector_t sector = peer_req->i.sector;
1119 unsigned ds = peer_req->i.size;
45bb912b
LE
1120 unsigned n_bios = 0;
1121 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
10f6d992 1122 int err = -ENOMEM;
45bb912b
LE
1123
1124 /* In most cases, we will only need one bio. But in case the lower
1125 * level restrictions happen to be different at this offset on this
1126 * side than those of the sending peer, we may need to submit the
1127 * request in more than one bio. */
1128next_bio:
1129 bio = bio_alloc(GFP_NOIO, nr_pages);
1130 if (!bio) {
1131 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1132 goto fail;
1133 }
db830c46 1134 /* > peer_req->i.sector, unless this is the first bio */
45bb912b
LE
1135 bio->bi_sector = sector;
1136 bio->bi_bdev = mdev->ldev->backing_bdev;
45bb912b 1137 bio->bi_rw = rw;
db830c46 1138 bio->bi_private = peer_req;
fcefa62e 1139 bio->bi_end_io = drbd_peer_request_endio;
45bb912b
LE
1140
1141 bio->bi_next = bios;
1142 bios = bio;
1143 ++n_bios;
1144
1145 page_chain_for_each(page) {
1146 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1147 if (!bio_add_page(bio, page, len, 0)) {
10f6d992
LE
1148 /* A single page must always be possible!
1149 * But in case it fails anyways,
1150 * we deal with it, and complain (below). */
1151 if (bio->bi_vcnt == 0) {
1152 dev_err(DEV,
1153 "bio_add_page failed for len=%u, "
1154 "bi_vcnt=0 (bi_sector=%llu)\n",
1155 len, (unsigned long long)bio->bi_sector);
1156 err = -ENOSPC;
1157 goto fail;
1158 }
45bb912b
LE
1159 goto next_bio;
1160 }
1161 ds -= len;
1162 sector += len >> 9;
1163 --nr_pages;
1164 }
1165 D_ASSERT(page == NULL);
1166 D_ASSERT(ds == 0);
1167
db830c46 1168 atomic_set(&peer_req->pending_bios, n_bios);
45bb912b
LE
1169 do {
1170 bio = bios;
1171 bios = bios->bi_next;
1172 bio->bi_next = NULL;
1173
45bb912b 1174 drbd_generic_make_request(mdev, fault_type, bio);
45bb912b 1175 } while (bios);
45bb912b
LE
1176 return 0;
1177
1178fail:
1179 while (bios) {
1180 bio = bios;
1181 bios = bios->bi_next;
1182 bio_put(bio);
1183 }
10f6d992 1184 return err;
45bb912b
LE
1185}
1186
53840641 1187static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
db830c46 1188 struct drbd_peer_request *peer_req)
53840641 1189{
db830c46 1190 struct drbd_interval *i = &peer_req->i;
53840641
AG
1191
1192 drbd_remove_interval(&mdev->write_requests, i);
1193 drbd_clear_interval(i);
1194
6c852bec 1195 /* Wake up any processes waiting for this peer request to complete. */
53840641
AG
1196 if (i->waiting)
1197 wake_up(&mdev->misc_wait);
1198}
1199
d8763023
AG
1200static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
1201 unsigned int data_size)
b411b363 1202{
2451fc3b 1203 int rv;
e42325a5 1204 struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
b411b363
PR
1205 struct drbd_epoch *epoch;
1206
b411b363
PR
1207 inc_unacked(mdev);
1208
b411b363
PR
1209 mdev->current_epoch->barrier_nr = p->barrier;
1210 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1211
1212 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1213 * the activity log, which means it would not be resynced in case the
1214 * R_PRIMARY crashes now.
1215 * Therefore we must send the barrier_ack after the barrier request was
1216 * completed. */
1217 switch (mdev->write_ordering) {
b411b363
PR
1218 case WO_none:
1219 if (rv == FE_RECYCLED)
81e84650 1220 return true;
2451fc3b
PR
1221
1222 /* receiver context, in the writeout path of the other node.
1223 * avoid potential distributed deadlock */
1224 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1225 if (epoch)
1226 break;
1227 else
1228 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1229 /* Fall through */
b411b363
PR
1230
1231 case WO_bdev_flush:
1232 case WO_drain_io:
b411b363 1233 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
2451fc3b
PR
1234 drbd_flush(mdev);
1235
1236 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1237 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1238 if (epoch)
1239 break;
b411b363
PR
1240 }
1241
2451fc3b
PR
1242 epoch = mdev->current_epoch;
1243 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1244
1245 D_ASSERT(atomic_read(&epoch->active) == 0);
1246 D_ASSERT(epoch->flags == 0);
b411b363 1247
81e84650 1248 return true;
2451fc3b
PR
1249 default:
1250 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
81e84650 1251 return false;
b411b363
PR
1252 }
1253
1254 epoch->flags = 0;
1255 atomic_set(&epoch->epoch_size, 0);
1256 atomic_set(&epoch->active, 0);
1257
1258 spin_lock(&mdev->epoch_lock);
1259 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1260 list_add(&epoch->list, &mdev->current_epoch->list);
1261 mdev->current_epoch = epoch;
1262 mdev->epochs++;
b411b363
PR
1263 } else {
1264 /* The current_epoch got recycled while we allocated this one... */
1265 kfree(epoch);
1266 }
1267 spin_unlock(&mdev->epoch_lock);
1268
81e84650 1269 return true;
b411b363
PR
1270}
1271
1272/* used from receive_RSDataReply (recv_resync_read)
1273 * and from receive_Data */
f6ffca9f
AG
1274static struct drbd_peer_request *
1275read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1276 int data_size) __must_hold(local)
b411b363 1277{
6666032a 1278 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 1279 struct drbd_peer_request *peer_req;
b411b363 1280 struct page *page;
45bb912b 1281 int dgs, ds, rr;
a0638456
PR
1282 void *dig_in = mdev->tconn->int_dig_in;
1283 void *dig_vv = mdev->tconn->int_dig_vv;
6b4388ac 1284 unsigned long *data;
b411b363 1285
a0638456
PR
1286 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1287 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
b411b363
PR
1288
1289 if (dgs) {
de0ff338 1290 rr = drbd_recv(mdev->tconn, dig_in, dgs);
b411b363 1291 if (rr != dgs) {
0ddc5549
LE
1292 if (!signal_pending(current))
1293 dev_warn(DEV,
1294 "short read receiving data digest: read %d expected %d\n",
1295 rr, dgs);
b411b363
PR
1296 return NULL;
1297 }
1298 }
1299
1300 data_size -= dgs;
1301
841ce241
AG
1302 if (!expect(data_size != 0))
1303 return NULL;
1304 if (!expect(IS_ALIGNED(data_size, 512)))
1305 return NULL;
1306 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1307 return NULL;
b411b363 1308
6666032a
LE
1309 /* even though we trust out peer,
1310 * we sometimes have to double check. */
1311 if (sector + (data_size>>9) > capacity) {
fdda6544
LE
1312 dev_err(DEV, "request from peer beyond end of local disk: "
1313 "capacity: %llus < sector: %llus + size: %u\n",
6666032a
LE
1314 (unsigned long long)capacity,
1315 (unsigned long long)sector, data_size);
1316 return NULL;
1317 }
1318
b411b363
PR
1319 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1320 * "criss-cross" setup, that might cause write-out on some other DRBD,
1321 * which in turn might block on the other node at this very place. */
db830c46
AG
1322 peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1323 if (!peer_req)
b411b363 1324 return NULL;
45bb912b 1325
b411b363 1326 ds = data_size;
db830c46 1327 page = peer_req->pages;
45bb912b
LE
1328 page_chain_for_each(page) {
1329 unsigned len = min_t(int, ds, PAGE_SIZE);
6b4388ac 1330 data = kmap(page);
de0ff338 1331 rr = drbd_recv(mdev->tconn, data, len);
0cf9d27e 1332 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
6b4388ac
PR
1333 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1334 data[0] = data[0] ^ (unsigned long)-1;
1335 }
b411b363 1336 kunmap(page);
45bb912b 1337 if (rr != len) {
db830c46 1338 drbd_free_ee(mdev, peer_req);
0ddc5549
LE
1339 if (!signal_pending(current))
1340 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1341 rr, len);
b411b363
PR
1342 return NULL;
1343 }
1344 ds -= rr;
1345 }
1346
1347 if (dgs) {
db830c46 1348 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
b411b363 1349 if (memcmp(dig_in, dig_vv, dgs)) {
470be44a
LE
1350 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1351 (unsigned long long)sector, data_size);
b411b363 1352 drbd_bcast_ee(mdev, "digest failed",
db830c46
AG
1353 dgs, dig_in, dig_vv, peer_req);
1354 drbd_free_ee(mdev, peer_req);
b411b363
PR
1355 return NULL;
1356 }
1357 }
1358 mdev->recv_cnt += data_size>>9;
db830c46 1359 return peer_req;
b411b363
PR
1360}
1361
1362/* drbd_drain_block() just takes a data block
1363 * out of the socket input buffer, and discards it.
1364 */
1365static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1366{
1367 struct page *page;
1368 int rr, rv = 1;
1369 void *data;
1370
c3470cde 1371 if (!data_size)
81e84650 1372 return true;
c3470cde 1373
45bb912b 1374 page = drbd_pp_alloc(mdev, 1, 1);
b411b363
PR
1375
1376 data = kmap(page);
1377 while (data_size) {
de0ff338 1378 rr = drbd_recv(mdev->tconn, data, min_t(int, data_size, PAGE_SIZE));
b411b363
PR
1379 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1380 rv = 0;
0ddc5549
LE
1381 if (!signal_pending(current))
1382 dev_warn(DEV,
1383 "short read receiving data: read %d expected %d\n",
1384 rr, min_t(int, data_size, PAGE_SIZE));
b411b363
PR
1385 break;
1386 }
1387 data_size -= rr;
1388 }
1389 kunmap(page);
435f0740 1390 drbd_pp_free(mdev, page, 0);
b411b363
PR
1391 return rv;
1392}
1393
1394static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1395 sector_t sector, int data_size)
1396{
1397 struct bio_vec *bvec;
1398 struct bio *bio;
1399 int dgs, rr, i, expect;
a0638456
PR
1400 void *dig_in = mdev->tconn->int_dig_in;
1401 void *dig_vv = mdev->tconn->int_dig_vv;
b411b363 1402
a0638456
PR
1403 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1404 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
b411b363
PR
1405
1406 if (dgs) {
de0ff338 1407 rr = drbd_recv(mdev->tconn, dig_in, dgs);
b411b363 1408 if (rr != dgs) {
0ddc5549
LE
1409 if (!signal_pending(current))
1410 dev_warn(DEV,
1411 "short read receiving data reply digest: read %d expected %d\n",
1412 rr, dgs);
b411b363
PR
1413 return 0;
1414 }
1415 }
1416
1417 data_size -= dgs;
1418
1419 /* optimistically update recv_cnt. if receiving fails below,
1420 * we disconnect anyways, and counters will be reset. */
1421 mdev->recv_cnt += data_size>>9;
1422
1423 bio = req->master_bio;
1424 D_ASSERT(sector == bio->bi_sector);
1425
1426 bio_for_each_segment(bvec, bio, i) {
1427 expect = min_t(int, data_size, bvec->bv_len);
de0ff338 1428 rr = drbd_recv(mdev->tconn,
b411b363
PR
1429 kmap(bvec->bv_page)+bvec->bv_offset,
1430 expect);
1431 kunmap(bvec->bv_page);
1432 if (rr != expect) {
0ddc5549
LE
1433 if (!signal_pending(current))
1434 dev_warn(DEV, "short read receiving data reply: "
1435 "read %d expected %d\n",
1436 rr, expect);
b411b363
PR
1437 return 0;
1438 }
1439 data_size -= rr;
1440 }
1441
1442 if (dgs) {
a0638456 1443 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
b411b363
PR
1444 if (memcmp(dig_in, dig_vv, dgs)) {
1445 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1446 return 0;
1447 }
1448 }
1449
1450 D_ASSERT(data_size == 0);
1451 return 1;
1452}
1453
1454/* e_end_resync_block() is called via
1455 * drbd_process_done_ee() by asender only */
00d56944 1456static int e_end_resync_block(struct drbd_work *w, int unused)
b411b363 1457{
db830c46 1458 struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
00d56944 1459 struct drbd_conf *mdev = w->mdev;
db830c46 1460 sector_t sector = peer_req->i.sector;
b411b363
PR
1461 int ok;
1462
db830c46 1463 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1464
db830c46
AG
1465 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1466 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1467 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
b411b363
PR
1468 } else {
1469 /* Record failure to sync */
db830c46 1470 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
b411b363 1471
db830c46 1472 ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1473 }
1474 dec_unacked(mdev);
1475
1476 return ok;
1477}
1478
1479static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1480{
db830c46 1481 struct drbd_peer_request *peer_req;
b411b363 1482
db830c46
AG
1483 peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1484 if (!peer_req)
45bb912b 1485 goto fail;
b411b363
PR
1486
1487 dec_rs_pending(mdev);
1488
b411b363
PR
1489 inc_unacked(mdev);
1490 /* corresponding dec_unacked() in e_end_resync_block()
1491 * respective _drbd_clear_done_ee */
1492
db830c46 1493 peer_req->w.cb = e_end_resync_block;
45bb912b 1494
87eeee41 1495 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1496 list_add(&peer_req->w.list, &mdev->sync_ee);
87eeee41 1497 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1498
0f0601f4 1499 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
fbe29dec 1500 if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
81e84650 1501 return true;
b411b363 1502
10f6d992
LE
1503 /* don't care for the reason here */
1504 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1505 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1506 list_del(&peer_req->w.list);
87eeee41 1507 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9 1508
db830c46 1509 drbd_free_ee(mdev, peer_req);
45bb912b
LE
1510fail:
1511 put_ldev(mdev);
81e84650 1512 return false;
b411b363
PR
1513}
1514
668eebc6 1515static struct drbd_request *
bc9c5c41
AG
1516find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1517 sector_t sector, bool missing_ok, const char *func)
51624585 1518{
51624585
AG
1519 struct drbd_request *req;
1520
bc9c5c41
AG
1521 /* Request object according to our peer */
1522 req = (struct drbd_request *)(unsigned long)id;
5e472264 1523 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
668eebc6 1524 return req;
c3afd8f5
AG
1525 if (!missing_ok) {
1526 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1527 (unsigned long)id, (unsigned long long)sector);
1528 }
51624585
AG
1529 return NULL;
1530}
1531
d8763023
AG
1532static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1533 unsigned int data_size)
b411b363
PR
1534{
1535 struct drbd_request *req;
1536 sector_t sector;
b411b363 1537 int ok;
e42325a5 1538 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1539
1540 sector = be64_to_cpu(p->sector);
1541
87eeee41 1542 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 1543 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
87eeee41 1544 spin_unlock_irq(&mdev->tconn->req_lock);
c3afd8f5 1545 if (unlikely(!req))
81e84650 1546 return false;
b411b363 1547
24c4830c 1548 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
b411b363
PR
1549 * special casing it there for the various failure cases.
1550 * still no race with drbd_fail_pending_reads */
1551 ok = recv_dless_read(mdev, req, sector, data_size);
1552
1553 if (ok)
8554df1c 1554 req_mod(req, DATA_RECEIVED);
b411b363
PR
1555 /* else: nothing. handled from drbd_disconnect...
1556 * I don't think we may complete this just yet
1557 * in case we are "on-disconnect: freeze" */
1558
1559 return ok;
1560}
1561
d8763023
AG
1562static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1563 unsigned int data_size)
b411b363
PR
1564{
1565 sector_t sector;
b411b363 1566 int ok;
e42325a5 1567 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1568
1569 sector = be64_to_cpu(p->sector);
1570 D_ASSERT(p->block_id == ID_SYNCER);
1571
1572 if (get_ldev(mdev)) {
1573 /* data is submitted to disk within recv_resync_read.
1574 * corresponding put_ldev done below on error,
fcefa62e 1575 * or in drbd_peer_request_endio. */
b411b363
PR
1576 ok = recv_resync_read(mdev, sector, data_size);
1577 } else {
1578 if (__ratelimit(&drbd_ratelimit_state))
1579 dev_err(DEV, "Can not write resync data to local disk.\n");
1580
1581 ok = drbd_drain_block(mdev, data_size);
1582
2b2bf214 1583 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363
PR
1584 }
1585
778f271d
PR
1586 atomic_add(data_size >> 9, &mdev->rs_sect_in);
1587
b411b363
PR
1588 return ok;
1589}
1590
1591/* e_end_block() is called via drbd_process_done_ee().
1592 * this means this function only runs in the asender thread
1593 */
00d56944 1594static int e_end_block(struct drbd_work *w, int cancel)
b411b363 1595{
db830c46 1596 struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
00d56944 1597 struct drbd_conf *mdev = w->mdev;
db830c46 1598 sector_t sector = peer_req->i.sector;
b411b363
PR
1599 int ok = 1, pcmd;
1600
89e58e75 1601 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
db830c46 1602 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1603 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1604 mdev->state.conn <= C_PAUSED_SYNC_T &&
db830c46 1605 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 1606 P_RS_WRITE_ACK : P_WRITE_ACK;
db830c46 1607 ok &= drbd_send_ack(mdev, pcmd, peer_req);
b411b363 1608 if (pcmd == P_RS_WRITE_ACK)
db830c46 1609 drbd_set_in_sync(mdev, sector, peer_req->i.size);
b411b363 1610 } else {
db830c46 1611 ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1612 /* we expect it to be marked out of sync anyways...
1613 * maybe assert this? */
1614 }
1615 dec_unacked(mdev);
1616 }
1617 /* we delete from the conflict detection hash _after_ we sent out the
1618 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
89e58e75 1619 if (mdev->tconn->net_conf->two_primaries) {
87eeee41 1620 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
1621 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1622 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 1623 spin_unlock_irq(&mdev->tconn->req_lock);
bb3bfe96 1624 } else
db830c46 1625 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1626
db830c46 1627 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
b411b363
PR
1628
1629 return ok;
1630}
1631
00d56944 1632static int e_send_discard_ack(struct drbd_work *w, int unused)
b411b363 1633{
db830c46 1634 struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
00d56944 1635 struct drbd_conf *mdev = w->mdev;
b411b363
PR
1636 int ok = 1;
1637
89e58e75 1638 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
db830c46 1639 ok = drbd_send_ack(mdev, P_DISCARD_ACK, peer_req);
b411b363 1640
87eeee41 1641 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
1642 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1643 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 1644 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1645
1646 dec_unacked(mdev);
1647
1648 return ok;
1649}
1650
3e394da1
AG
1651static bool seq_greater(u32 a, u32 b)
1652{
1653 /*
1654 * We assume 32-bit wrap-around here.
1655 * For 24-bit wrap-around, we would have to shift:
1656 * a <<= 8; b <<= 8;
1657 */
1658 return (s32)a - (s32)b > 0;
1659}
1660
1661static u32 seq_max(u32 a, u32 b)
1662{
1663 return seq_greater(a, b) ? a : b;
1664}
1665
43ae077d 1666static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
3e394da1 1667{
43ae077d 1668 unsigned int old_peer_seq;
3e394da1
AG
1669
1670 spin_lock(&mdev->peer_seq_lock);
43ae077d
AG
1671 old_peer_seq = mdev->peer_seq;
1672 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
3e394da1 1673 spin_unlock(&mdev->peer_seq_lock);
43ae077d 1674 if (old_peer_seq != peer_seq)
3e394da1
AG
1675 wake_up(&mdev->seq_wait);
1676}
1677
b411b363
PR
1678/* Called from receive_Data.
1679 * Synchronize packets on sock with packets on msock.
1680 *
1681 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1682 * packet traveling on msock, they are still processed in the order they have
1683 * been sent.
1684 *
1685 * Note: we don't care for Ack packets overtaking P_DATA packets.
1686 *
1687 * In case packet_seq is larger than mdev->peer_seq number, there are
1688 * outstanding packets on the msock. We wait for them to arrive.
1689 * In case we are the logically next packet, we update mdev->peer_seq
1690 * ourselves. Correctly handles 32bit wrap around.
1691 *
1692 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1693 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1694 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1695 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1696 *
1697 * returns 0 if we may process the packet,
1698 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1699static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1700{
1701 DEFINE_WAIT(wait);
1702 unsigned int p_seq;
1703 long timeout;
1704 int ret = 0;
1705 spin_lock(&mdev->peer_seq_lock);
1706 for (;;) {
1707 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
3e394da1 1708 if (!seq_greater(packet_seq, mdev->peer_seq + 1))
b411b363
PR
1709 break;
1710 if (signal_pending(current)) {
1711 ret = -ERESTARTSYS;
1712 break;
1713 }
1714 p_seq = mdev->peer_seq;
1715 spin_unlock(&mdev->peer_seq_lock);
1716 timeout = schedule_timeout(30*HZ);
1717 spin_lock(&mdev->peer_seq_lock);
1718 if (timeout == 0 && p_seq == mdev->peer_seq) {
1719 ret = -ETIMEDOUT;
1720 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1721 break;
1722 }
1723 }
1724 finish_wait(&mdev->seq_wait, &wait);
1725 if (mdev->peer_seq+1 == packet_seq)
1726 mdev->peer_seq++;
1727 spin_unlock(&mdev->peer_seq_lock);
1728 return ret;
1729}
1730
688593c5
LE
1731/* see also bio_flags_to_wire()
1732 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1733 * flags and back. We may replicate to other kernel versions. */
1734static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1735{
688593c5
LE
1736 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1737 (dpf & DP_FUA ? REQ_FUA : 0) |
1738 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1739 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1740}
1741
b411b363 1742/* mirrored write */
d8763023
AG
1743static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
1744 unsigned int data_size)
b411b363
PR
1745{
1746 sector_t sector;
db830c46 1747 struct drbd_peer_request *peer_req;
e42325a5 1748 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1749 int rw = WRITE;
1750 u32 dp_flags;
1751
b411b363 1752 if (!get_ldev(mdev)) {
b411b363
PR
1753 spin_lock(&mdev->peer_seq_lock);
1754 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1755 mdev->peer_seq++;
1756 spin_unlock(&mdev->peer_seq_lock);
1757
2b2bf214 1758 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363
PR
1759 atomic_inc(&mdev->current_epoch->epoch_size);
1760 return drbd_drain_block(mdev, data_size);
1761 }
1762
fcefa62e
AG
1763 /*
1764 * Corresponding put_ldev done either below (on various errors), or in
1765 * drbd_peer_request_endio, if we successfully submit the data at the
1766 * end of this function.
1767 */
b411b363
PR
1768
1769 sector = be64_to_cpu(p->sector);
db830c46
AG
1770 peer_req = read_in_block(mdev, p->block_id, sector, data_size);
1771 if (!peer_req) {
b411b363 1772 put_ldev(mdev);
81e84650 1773 return false;
b411b363
PR
1774 }
1775
db830c46 1776 peer_req->w.cb = e_end_block;
b411b363 1777
688593c5
LE
1778 dp_flags = be32_to_cpu(p->dp_flags);
1779 rw |= wire_flags_to_bio(mdev, dp_flags);
1780
1781 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 1782 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 1783
b411b363 1784 spin_lock(&mdev->epoch_lock);
db830c46
AG
1785 peer_req->epoch = mdev->current_epoch;
1786 atomic_inc(&peer_req->epoch->epoch_size);
1787 atomic_inc(&peer_req->epoch->active);
b411b363
PR
1788 spin_unlock(&mdev->epoch_lock);
1789
b411b363 1790 /* I'm the receiver, I do hold a net_cnt reference. */
89e58e75 1791 if (!mdev->tconn->net_conf->two_primaries) {
87eeee41 1792 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
1793 } else {
1794 /* don't get the req_lock yet,
1795 * we may sleep in drbd_wait_peer_seq */
db830c46 1796 const int size = peer_req->i.size;
25703f83 1797 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363 1798 DEFINE_WAIT(wait);
b411b363
PR
1799 int first;
1800
89e58e75 1801 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
b411b363
PR
1802
1803 /* conflict detection and handling:
1804 * 1. wait on the sequence number,
1805 * in case this data packet overtook ACK packets.
5e472264 1806 * 2. check for conflicting write requests.
b411b363
PR
1807 *
1808 * Note: for two_primaries, we are protocol C,
1809 * so there cannot be any request that is DONE
1810 * but still on the transfer log.
1811 *
b411b363
PR
1812 * if no conflicting request is found:
1813 * submit.
1814 *
1815 * if any conflicting request is found
1816 * that has not yet been acked,
1817 * AND I have the "discard concurrent writes" flag:
1818 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1819 *
1820 * if any conflicting request is found:
1821 * block the receiver, waiting on misc_wait
1822 * until no more conflicting requests are there,
1823 * or we get interrupted (disconnect).
1824 *
1825 * we do not just write after local io completion of those
1826 * requests, but only after req is done completely, i.e.
1827 * we wait for the P_DISCARD_ACK to arrive!
1828 *
1829 * then proceed normally, i.e. submit.
1830 */
1831 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1832 goto out_interrupted;
1833
87eeee41 1834 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 1835
b411b363
PR
1836 first = 1;
1837 for (;;) {
de696716 1838 struct drbd_interval *i;
b411b363
PR
1839 int have_unacked = 0;
1840 int have_conflict = 0;
1841 prepare_to_wait(&mdev->misc_wait, &wait,
1842 TASK_INTERRUPTIBLE);
de696716
AG
1843
1844 i = drbd_find_overlap(&mdev->write_requests, sector, size);
1845 if (i) {
de696716
AG
1846 /* only ALERT on first iteration,
1847 * we may be woken up early... */
1848 if (first)
5e472264 1849 dev_alert(DEV, "%s[%u] Concurrent %s write detected!"
de696716
AG
1850 " new: %llus +%u; pending: %llus +%u\n",
1851 current->comm, current->pid,
5e472264 1852 i->local ? "local" : "remote",
de696716 1853 (unsigned long long)sector, size,
5e472264
AG
1854 (unsigned long long)i->sector, i->size);
1855
1856 if (i->local) {
1857 struct drbd_request *req2;
1858
1859 req2 = container_of(i, struct drbd_request, i);
1860 if (req2->rq_state & RQ_NET_PENDING)
1861 ++have_unacked;
1862 }
de696716 1863 ++have_conflict;
b411b363 1864 }
b411b363
PR
1865 if (!have_conflict)
1866 break;
1867
1868 /* Discard Ack only for the _first_ iteration */
1869 if (first && discard && have_unacked) {
1870 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1871 (unsigned long long)sector);
1872 inc_unacked(mdev);
db830c46
AG
1873 peer_req->w.cb = e_send_discard_ack;
1874 list_add_tail(&peer_req->w.list, &mdev->done_ee);
b411b363 1875
87eeee41 1876 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1877
1878 /* we could probably send that P_DISCARD_ACK ourselves,
1879 * but I don't like the receiver using the msock */
1880
1881 put_ldev(mdev);
0625ac19 1882 wake_asender(mdev->tconn);
b411b363 1883 finish_wait(&mdev->misc_wait, &wait);
81e84650 1884 return true;
b411b363
PR
1885 }
1886
1887 if (signal_pending(current)) {
87eeee41 1888 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1889 finish_wait(&mdev->misc_wait, &wait);
1890 goto out_interrupted;
1891 }
1892
a500c2ef 1893 /* Indicate to wake up mdev->misc_wait upon completion. */
53840641 1894 i->waiting = true;
a500c2ef 1895
87eeee41 1896 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1897 if (first) {
1898 first = 0;
1899 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1900 "sec=%llus\n", (unsigned long long)sector);
1901 } else if (discard) {
1902 /* we had none on the first iteration.
1903 * there must be none now. */
1904 D_ASSERT(have_unacked == 0);
1905 }
1906 schedule();
87eeee41 1907 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
1908 }
1909 finish_wait(&mdev->misc_wait, &wait);
5e472264 1910
db830c46 1911 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
b411b363
PR
1912 }
1913
db830c46 1914 list_add(&peer_req->w.list, &mdev->active_ee);
87eeee41 1915 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1916
89e58e75 1917 switch (mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
1918 case DRBD_PROT_C:
1919 inc_unacked(mdev);
1920 /* corresponding dec_unacked() in e_end_block()
1921 * respective _drbd_clear_done_ee */
1922 break;
1923 case DRBD_PROT_B:
1924 /* I really don't like it that the receiver thread
1925 * sends on the msock, but anyways */
db830c46 1926 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
b411b363
PR
1927 break;
1928 case DRBD_PROT_A:
1929 /* nothing to do */
1930 break;
1931 }
1932
6719fb03 1933 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 1934 /* In case we have the only disk of the cluster, */
db830c46
AG
1935 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
1936 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
1937 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
1938 drbd_al_begin_io(mdev, peer_req->i.sector);
b411b363
PR
1939 }
1940
fbe29dec 1941 if (drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR) == 0)
81e84650 1942 return true;
b411b363 1943
10f6d992
LE
1944 /* don't care for the reason here */
1945 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1946 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
1947 list_del(&peer_req->w.list);
1948 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 1949 spin_unlock_irq(&mdev->tconn->req_lock);
db830c46
AG
1950 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
1951 drbd_al_complete_io(mdev, peer_req->i.sector);
22cc37a9 1952
b411b363 1953out_interrupted:
db830c46 1954 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
b411b363 1955 put_ldev(mdev);
db830c46 1956 drbd_free_ee(mdev, peer_req);
81e84650 1957 return false;
b411b363
PR
1958}
1959
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * (more than 64 sectors) of activity we cannot account for with our own resync
 * activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 *
 * Returns non-zero if the caller should throttle (sleep) before issuing
 * the resync request for @sector, 0 otherwise.
 */
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
{
	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	struct lc_element *tmp;
	int curr_events;
	int throttle = 0;

	/* feature disabled? (c_min_rate == 0 means "never throttle") */
	if (mdev->sync_conf.c_min_rate == 0)
		return 0;

	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
	if (tmp) {
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
			return 0;
		}
		/* Do not slow down if app IO is already waiting for this extent */
	}
	spin_unlock_irq(&mdev->al_lock);

	/* Total sectors the backing device has seen (reads + writes) minus
	 * the sectors we submitted for resync ourselves (rs_sect_ev); the
	 * difference is activity we cannot account for. */
	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
			atomic_read(&mdev->rs_sect_ev);

	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;
		int i;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
		else
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		/* jiffies delta since that mark; force dt >= 1 to avoid
		 * division by zero below */
		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = mdev->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);	/* recent resync rate in KB/s */

		if (dbdt > mdev->sync_conf.c_min_rate)
			throttle = 1;
	}
	return throttle;
}
2025
2026
/* Handle a peer's read request: P_DATA_REQUEST (application read),
 * P_RS_DATA_REQUEST (resync), P_CSUM_RS_REQUEST / P_OV_REPLY (checksum based
 * resync / online verify reply carrying a digest payload), or P_OV_REQUEST
 * (online verify).  Allocates a peer request, queues it on read_ee and
 * submits local disk I/O; the completion callback (w.cb) sends the answer.
 * Returns true on success, false to trigger a re-connect.
 */
static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int digest_size)
{
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	int size, verb;
	unsigned int fault_type;
	struct p_block_req *p = &mdev->tconn->data.rbuf.block_req;

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	/* Sanity check the untrusted on-wire values: positive, 512-byte
	 * aligned, bounded, and within the device. */
	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return false;
	}
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return false;
	}

	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
		/* No usable local disk: negative-ack the request, then drain
		 * any payload so the data stream stays in sync. */
		verb = 1;
		switch (cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
			break;
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
			break;
		case P_OV_REPLY:
			verb = 0;
			dec_rs_pending(mdev);
			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
				cmdname(cmd));
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possibly payload */
		return drbd_drain_block(mdev, digest_size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
	if (!peer_req) {
		put_ldev(mdev);
		return false;
	}

	switch (cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */
		goto submit;

	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		/* Both carry a digest as payload; receive it into a
		 * digest_info attached to the peer request. */
		fault_type = DRBD_FAULT_RS_RD;
		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = digest_size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;	/* drbd_free_ee frees di */

		if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)
			goto out_free_e;

		if (cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		} else if (cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &mdev->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(mdev);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		if (mdev->ov_start_sector == ~(sector_t)0 &&
		    mdev->tconn->agreed_pro_version >= 90) {
			/* First verify request of a run: initialize online
			 * verify progress bookkeeping. */
			unsigned long now = jiffies;
			int i;
			mdev->ov_start_sector = sector;
			mdev->ov_position = sector;
			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
			mdev->rs_total = mdev->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				mdev->rs_mark_left[i] = mdev->ov_left;
				mdev->rs_mark_time[i] = now;
			}
			dev_info(DEV, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;
		break;

	default:
		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
			cmdname(cmd));
		fault_type = DRBD_FAULT_MAX;
		goto out_free_e;
	}

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time. For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * on request through. The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */
	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
		schedule_timeout_uninterruptible(HZ/10);
	if (drbd_rs_begin_io(mdev, sector))
		goto out_free_e;

submit_for_resync:
	atomic_add(size >> 9, &mdev->rs_sect_ev);

submit:
	inc_unacked(mdev);
	spin_lock_irq(&mdev->tconn->req_lock);
	list_add_tail(&peer_req->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

out_free_e:
	put_ldev(mdev);
	drbd_free_ee(mdev, peer_req);
	return false;
}
2213
/* After-split-brain recovery when zero nodes are currently primary.
 * Decides which side becomes sync source based on the configured
 * after-sb-0pri policy.
 * Returns 1 (this node becomes sync source), -1 (peer becomes sync source),
 * or -100 (no automatic decision).
 * Note: several cases below intentionally fall through to the next policy.
 */
static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
{
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;

	/* lowest UUID bit: "was primary at crash time" flag */
	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
	peer = mdev->p_uuid[UI_BITMAP] & 1;

	/* number of changed blocks on each side */
	ch_peer = mdev->p_uuid[UI_SIZE];
	ch_self = mdev->comm_bm_set;

	switch (mdev->tconn->net_conf->after_sb_0p) {
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
		/* these policies only make sense with >= 1 primary */
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
			rv = -1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv =  1;
			break;
		}
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
			rv = 1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = -1;
			break;
		}
		/* Else fall through to one of the other strategies... */
		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
		     "Using discard-least-changes instead\n");
		/* fall through */
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			/* tie: break it via the DISCARD_CONCURRENT flag */
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
			break;
		} else {
			if (ch_peer == 0) { rv =  1; break; }
			if (ch_self == 0) { rv = -1; break; }
		}
		if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
		/* fall through (only reached when we arrived here from a
		 * younger/older-primary fallthrough above) */
	case ASB_DISCARD_LEAST_CHG:
		if	(ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv =  1;
		else /* ( ch_self == ch_peer ) */
			/* Well, then use something else. */
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
		break;
	case ASB_DISCARD_LOCAL:
		rv = -1;
		break;
	case ASB_DISCARD_REMOTE:
		rv =  1;
	}

	return rv;
}
2285
2286static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2287{
6184ea21 2288 int hg, rv = -100;
b411b363 2289
89e58e75 2290 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2291 case ASB_DISCARD_YOUNGER_PRI:
2292 case ASB_DISCARD_OLDER_PRI:
2293 case ASB_DISCARD_LEAST_CHG:
2294 case ASB_DISCARD_LOCAL:
2295 case ASB_DISCARD_REMOTE:
2296 dev_err(DEV, "Configuration error.\n");
2297 break;
2298 case ASB_DISCONNECT:
2299 break;
2300 case ASB_CONSENSUS:
2301 hg = drbd_asb_recover_0p(mdev);
2302 if (hg == -1 && mdev->state.role == R_SECONDARY)
2303 rv = hg;
2304 if (hg == 1 && mdev->state.role == R_PRIMARY)
2305 rv = hg;
2306 break;
2307 case ASB_VIOLENTLY:
2308 rv = drbd_asb_recover_0p(mdev);
2309 break;
2310 case ASB_DISCARD_SECONDARY:
2311 return mdev->state.role == R_PRIMARY ? 1 : -1;
2312 case ASB_CALL_HELPER:
2313 hg = drbd_asb_recover_0p(mdev);
2314 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2315 enum drbd_state_rv rv2;
2316
2317 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2318 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2319 * we might be here in C_WF_REPORT_PARAMS which is transient.
2320 * we do not need to wait for the after state change work either. */
bb437946
AG
2321 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2322 if (rv2 != SS_SUCCESS) {
b411b363
PR
2323 drbd_khelper(mdev, "pri-lost-after-sb");
2324 } else {
2325 dev_warn(DEV, "Successfully gave up primary role.\n");
2326 rv = hg;
2327 }
2328 } else
2329 rv = hg;
2330 }
2331
2332 return rv;
2333}
2334
2335static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2336{
6184ea21 2337 int hg, rv = -100;
b411b363 2338
89e58e75 2339 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2340 case ASB_DISCARD_YOUNGER_PRI:
2341 case ASB_DISCARD_OLDER_PRI:
2342 case ASB_DISCARD_LEAST_CHG:
2343 case ASB_DISCARD_LOCAL:
2344 case ASB_DISCARD_REMOTE:
2345 case ASB_CONSENSUS:
2346 case ASB_DISCARD_SECONDARY:
2347 dev_err(DEV, "Configuration error.\n");
2348 break;
2349 case ASB_VIOLENTLY:
2350 rv = drbd_asb_recover_0p(mdev);
2351 break;
2352 case ASB_DISCONNECT:
2353 break;
2354 case ASB_CALL_HELPER:
2355 hg = drbd_asb_recover_0p(mdev);
2356 if (hg == -1) {
bb437946
AG
2357 enum drbd_state_rv rv2;
2358
b411b363
PR
2359 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2360 * we might be here in C_WF_REPORT_PARAMS which is transient.
2361 * we do not need to wait for the after state change work either. */
bb437946
AG
2362 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2363 if (rv2 != SS_SUCCESS) {
b411b363
PR
2364 drbd_khelper(mdev, "pri-lost-after-sb");
2365 } else {
2366 dev_warn(DEV, "Successfully gave up primary role.\n");
2367 rv = hg;
2368 }
2369 } else
2370 rv = hg;
2371 }
2372
2373 return rv;
2374}
2375
2376static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2377 u64 bits, u64 flags)
2378{
2379 if (!uuid) {
2380 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2381 return;
2382 }
2383 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2384 text,
2385 (unsigned long long)uuid[UI_CURRENT],
2386 (unsigned long long)uuid[UI_BITMAP],
2387 (unsigned long long)uuid[UI_HISTORY_START],
2388 (unsigned long long)uuid[UI_HISTORY_END],
2389 (unsigned long long)bits,
2390 (unsigned long long)flags);
2391}
2392
/*
  100	after split brain try auto recover
    2	C_SYNC_SOURCE set BitMap
    1	C_SYNC_SOURCE use BitMap
    0	no Sync
   -1	C_SYNC_TARGET use BitMap
   -2	C_SYNC_TARGET set BitMap
 -100	after split brain, disconnect
-1000	unrelated data
-1091   requires proto 91
-1096   requires proto 96
 */
/* Compare our UUID set against the peer's and decide the sync direction
 * (see table above).  *rule_nr is set to the rule that decided, for
 * diagnostics.  May correct our own or the peer's (in-memory) UUIDs when
 * it detects a missed "resync finished" event or a lost P_SYNC_UUID packet. */
static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
{
	u64 self, peer;
	int i, j;

	/* bit 0 of each UUID is the "was primary" flag; mask it for compares */
	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);

	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	if (self == peer) {
		int rct, dc; /* roles at crash time */

		/* peer cleared its bitmap uuid, we still have ours:
		 * we may have missed the "resync finished" event */
		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
				drbd_uuid_set_bm(mdev, 0UL);

				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
				*rule_nr = 34;
			} else {
				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

		/* mirror case: we cleared our bitmap uuid, the peer did not */
		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				/* shift the peer's (in-memory) uuid history as
				 * if it had finished the resync properly */
				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
				mdev->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
				*rule_nr = 35;
			} else {
				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
			(mdev->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0;
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
			dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
			return dc ? -1 : 1;
		}
	}

	*rule_nr = 50;
	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer)
		return -1;

	*rule_nr = 51;
	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (mdev->tconn->agreed_pro_version < 96 ?
		    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get though. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];

			dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
			drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);

			return -1;
		}
	}

	*rule_nr = 60;
	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		peer = mdev->p_uuid[i] & ~((u64)1);
		if (self == peer)
			return -2;
	}

	*rule_nr = 70;
	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
	if (self == peer)
		return 1;

	*rule_nr = 71;
	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (mdev->tconn->agreed_pro_version < 96 ?
		    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get though. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);

			dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);

			return 1;
		}
	}


	*rule_nr = 80;
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = mdev->ldev->md.uuid[i] & ~((u64)1);
		if (self == peer)
			return 2;
	}

	*rule_nr = 90;
	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer && self != ((u64)0))
		return 100;

	*rule_nr = 100;
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = mdev->ldev->md.uuid[i] & ~((u64)1);
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
			peer = mdev->p_uuid[j] & ~((u64)1);
			if (self == peer)
				return -100;
		}
	}

	return -1000;
}
2584
/* drbd_sync_handshake() returns the new conn state on success, or
   CONN_MASK (-1) on failure.
 */
/* Combines the UUID comparison result with disk states and the configured
 * split-brain recovery policies to pick C_WF_BITMAP_S (sync source),
 * C_WF_BITMAP_T (sync target) or C_CONNECTED (no resync).
 * Local convention for hg throughout: >0 we become source, <0 we become
 * target, 0 no sync, +-100 split brain, abs(hg) >= 2 full sync. */
static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
					   enum drbd_disk_state peer_disk) __must_hold(local)
{
	int hg, rule_nr;
	enum drbd_conns rv = C_MASK;
	enum drbd_disk_state mydisk;

	mydisk = mdev->state.disk;
	if (mydisk == D_NEGOTIATING)
		mydisk = mdev->new_state_tmp.disk;

	dev_info(DEV, "drbd_sync_handshake:\n");
	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);

	hg = drbd_uuid_compare(mdev, &rule_nr);

	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);

	if (hg == -1000) {
		dev_alert(DEV, "Unrelated data, aborting!\n");
		return C_MASK;
	}
	/* -1091/-1096: required protocol version not agreed on */
	if (hg < -1000) {
		dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
		return C_MASK;
	}

	/* Exactly one side inconsistent: disk states override the uuid
	 * verdict; keep "full sync" if it was split brain or set-bitmap. */
	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
		int f = (hg == -100) || abs(hg) == 2;
		hg = mydisk > D_INCONSISTENT ? 1 : -1;
		if (f)
			hg = hg*2;
		dev_info(DEV, "Becoming sync %s due to disk states.\n",
		     hg > 0 ? "source" : "target");
	}

	if (abs(hg) == 100)
		drbd_khelper(mdev, "initial-split-brain");

	/* Automatic split-brain recovery per after-sb-*pri policy, keyed on
	 * how many nodes are currently primary. */
	if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
		int pcount = (mdev->state.role == R_PRIMARY)
			   + (peer_role == R_PRIMARY);
		int forced = (hg == -100);

		switch (pcount) {
		case 0:
			hg = drbd_asb_recover_0p(mdev);
			break;
		case 1:
			hg = drbd_asb_recover_1p(mdev);
			break;
		case 2:
			hg = drbd_asb_recover_2p(mdev);
			break;
		}
		if (abs(hg) < 100) {
			dev_warn(DEV, "Split-Brain detected, %d primaries, "
			     "automatically solved. Sync from %s node\n",
			     pcount, (hg < 0) ? "peer" : "this");
			if (forced) {
				dev_warn(DEV, "Doing a full sync, since"
				     " UUIDs where ambiguous.\n");
				hg = hg*2;
			}
		}
	}

	/* Manual resolution via the want_lose ("discard-my-data") flags */
	if (hg == -100) {
		if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
			hg = -1;
		if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
			hg = 1;

		if (abs(hg) < 100)
			dev_warn(DEV, "Split-Brain detected, manually solved. "
			     "Sync from %s node\n",
			     (hg < 0) ? "peer" : "this");
	}

	if (hg == -100) {
		/* FIXME this log message is not correct if we end up here
		 * after an attempted attach on a diskless node.
		 * We just refuse to attach -- well, we drop the "connection"
		 * to that disk, in a way... */
		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
		drbd_khelper(mdev, "split-brain");
		return C_MASK;
	}

	if (hg > 0 && mydisk <= D_INCONSISTENT) {
		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
		return C_MASK;
	}

	if (hg < 0 && /* by intention we do not use mydisk here. */
	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
		switch (mdev->tconn->net_conf->rr_conflict) {
		case ASB_CALL_HELPER:
			drbd_khelper(mdev, "pri-lost");
			/* fall through */
		case ASB_DISCONNECT:
			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
			return C_MASK;
		case ASB_VIOLENTLY:
			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
			     "assumption\n");
		}
	}

	/* dry-run connect: report what would happen, then bail out */
	if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
		if (hg == 0)
			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
		else
			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
				 abs(hg) >= 2 ? "full" : "bit-map based");
		return C_MASK;
	}

	if (abs(hg) >= 2) {
		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
					BM_LOCKED_SET_ALLOWED))
			return C_MASK;
	}

	if (hg > 0) { /* become sync source. */
		rv = C_WF_BITMAP_S;
	} else if (hg < 0) { /* become sync target */
		rv = C_WF_BITMAP_T;
	} else {
		rv = C_CONNECTED;
		if (drbd_bm_total_weight(mdev)) {
			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
			     drbd_bm_total_weight(mdev));
		}
	}

	return rv;
}
2731
2732/* returns 1 if invalid */
2733static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2734{
2735 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2736 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2737 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2738 return 0;
2739
2740 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2741 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2742 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2743 return 1;
2744
2745 /* everything else is valid if they are equal on both sides. */
2746 if (peer == self)
2747 return 0;
2748
2749 /* everything es is invalid. */
2750 return 1;
2751}
2752
/* Handle the peer's P_PROTOCOL packet: verify that wire protocol,
 * after-split-brain policies, two-primaries and data-integrity-alg settings
 * are compatible with our own configuration.
 * Returns true on success; on any incompatibility forces C_DISCONNECTING
 * and returns false. */
static int receive_protocol(struct drbd_conf *mdev, enum drbd_packet cmd,
			    unsigned int data_size)
{
	struct p_protocol *p = &mdev->tconn->data.rbuf.protocol;
	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
	int p_want_lose, p_two_primaries, cf;
	char p_integrity_alg[SHARED_SECRET_MAX] = "";

	/* decode the peer's settings from network byte order */
	p_proto		= be32_to_cpu(p->protocol);
	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
	p_two_primaries = be32_to_cpu(p->two_primaries);
	cf		= be32_to_cpu(p->conn_flags);
	p_want_lose = cf & CF_WANT_LOSE;

	clear_bit(CONN_DRY_RUN, &mdev->flags);

	if (cf & CF_DRY_RUN)
		set_bit(CONN_DRY_RUN, &mdev->flags);

	if (p_proto != mdev->tconn->net_conf->wire_protocol) {
		dev_err(DEV, "incompatible communication protocols\n");
		goto disconnect;
	}

	if (cmp_after_sb(p_after_sb_0p, mdev->tconn->net_conf->after_sb_0p)) {
		dev_err(DEV, "incompatible after-sb-0pri settings\n");
		goto disconnect;
	}

	if (cmp_after_sb(p_after_sb_1p, mdev->tconn->net_conf->after_sb_1p)) {
		dev_err(DEV, "incompatible after-sb-1pri settings\n");
		goto disconnect;
	}

	if (cmp_after_sb(p_after_sb_2p, mdev->tconn->net_conf->after_sb_2p)) {
		dev_err(DEV, "incompatible after-sb-2pri settings\n");
		goto disconnect;
	}

	/* only one side may volunteer to discard its data */
	if (p_want_lose && mdev->tconn->net_conf->want_lose) {
		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
		goto disconnect;
	}

	if (p_two_primaries != mdev->tconn->net_conf->two_primaries) {
		dev_err(DEV, "incompatible setting of the two-primaries options\n");
		goto disconnect;
	}

	/* protocol >= 87 sends the integrity algorithm name as payload */
	if (mdev->tconn->agreed_pro_version >= 87) {
		unsigned char *my_alg = mdev->tconn->net_conf->integrity_alg;

		if (drbd_recv(mdev->tconn, p_integrity_alg, data_size) != data_size)
			return false;

		/* force NUL termination of the untrusted string */
		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
		if (strcmp(p_integrity_alg, my_alg)) {
			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
			goto disconnect;
		}
		dev_info(DEV, "data-integrity-alg: %s\n",
			 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
	}

	return true;

disconnect:
	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
	return false;
}
2825
/* helper function
 * input: alg name, feature name
 * return: NULL (alg name was "")
 * ERR_PTR(error) if something goes wrong
 * or the crypto hash ptr, if it worked out ok.
 *
 * @alg:  algorithm name as configured (empty string means "disabled")
 * @name: human-readable feature name, only used in error messages
 * On success the caller owns the tfm and must crypto_free_hash() it. */
struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
		const char *alg, const char *name)
{
	struct crypto_hash *tfm;

	if (!alg[0])
		return NULL;

	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
	if (IS_ERR(tfm)) {
		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
			alg, name, PTR_ERR(tfm));
		return tfm;
	}
	/* reject algorithms that are not actually digests */
	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
		crypto_free_hash(tfm);
		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
		return ERR_PTR(-EINVAL);
	}
	return tfm;
}
2852
/*
 * receive_SyncParam() - process a SyncParam{,89,95} packet from the peer.
 *
 * Validates the packet size against the agreed protocol version (apv),
 * reads the resync rate and -- depending on apv -- the verify/csums
 * digest algorithm names and the dynamic resync controller settings.
 * Newly requested digest transforms are allocated before taking
 * peer_seq_lock, then swapped in under the lock (which serializes
 * against drbd_nl_syncer_conf()).
 *
 * Returns true on success, false on fatal receive/protocol errors
 * (the connection is forced to C_DISCONNECTING on the 'disconnect' path).
 */
static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int packet_size)
{
	int ok = true;
	struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
	unsigned int header_size, data_size, exp_max_sz;
	struct crypto_hash *verify_tfm = NULL;
	struct crypto_hash *csums_tfm = NULL;
	const int apv = mdev->tconn->agreed_pro_version;
	int *rs_plan_s = NULL;
	int fifo_size = 0;

	/* The on-the-wire layout grew over protocol versions; compute the
	 * largest packet we may legally accept for this peer's version. */
	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
		    : apv == 88 ? sizeof(struct p_rs_param)
					+ SHARED_SECRET_MAX
		    : apv <= 94 ? sizeof(struct p_rs_param_89)
		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);

	if (packet_size > exp_max_sz) {
		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
		    packet_size, exp_max_sz);
		return false;
	}

	/* Split the remaining payload into the fixed header part (read now)
	 * and a trailing variable part (only apv == 88: the verify-alg name). */
	if (apv <= 88) {
		header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
		data_size   = packet_size  - header_size;
	} else if (apv <= 94) {
		header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
		data_size   = packet_size  - header_size;
		D_ASSERT(data_size == 0);
	} else {
		header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
		data_size   = packet_size  - header_size;
		D_ASSERT(data_size == 0);
	}

	/* initialize verify_alg and csums_alg */
	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);

	if (drbd_recv(mdev->tconn, &p->head.payload, header_size) != header_size)
		return false;

	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);

	if (apv >= 88) {
		if (apv == 88) {
			if (data_size > SHARED_SECRET_MAX) {
				dev_err(DEV, "verify-alg too long, "
				    "peer wants %u, accepting only %u byte\n",
						data_size, SHARED_SECRET_MAX);
				return false;
			}

			if (drbd_recv(mdev->tconn, p->verify_alg, data_size) != data_size)
				return false;

			/* we expect NUL terminated string */
			/* but just in case someone tries to be evil */
			D_ASSERT(p->verify_alg[data_size-1] == 0);
			p->verify_alg[data_size-1] = 0;

		} else /* apv >= 89 */ {
			/* we still expect NUL terminated strings */
			/* but just in case someone tries to be evil */
			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
		}

		/* Only allocate a new transform if the peer asks for a
		 * different algorithm; changing it mid-connection is refused. */
		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
				    mdev->sync_conf.verify_alg, p->verify_alg);
				goto disconnect;
			}
			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
					p->verify_alg, "verify-alg");
			if (IS_ERR(verify_tfm)) {
				verify_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
				    mdev->sync_conf.csums_alg, p->csums_alg);
				goto disconnect;
			}
			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
					p->csums_alg, "csums-alg");
			if (IS_ERR(csums_tfm)) {
				csums_tfm = NULL;
				goto disconnect;
			}
		}

		/* apv >= 95: dynamic resync-rate controller parameters. */
		if (apv > 94) {
			mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
			mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
			mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
			mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
			mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);

			/* Pre-allocate the plan fifo outside the spinlock;
			 * it is swapped in below only if the size changed. */
			fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
			if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
				rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
				if (!rs_plan_s) {
					dev_err(DEV, "kmalloc of fifo_buffer failed");
					goto disconnect;
				}
			}
		}

		spin_lock(&mdev->peer_seq_lock);
		/* lock against drbd_nl_syncer_conf() */
		if (verify_tfm) {
			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
			crypto_free_hash(mdev->verify_tfm);
			mdev->verify_tfm = verify_tfm;
			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
		}
		if (csums_tfm) {
			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
			crypto_free_hash(mdev->csums_tfm);
			mdev->csums_tfm = csums_tfm;
			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
		}
		if (fifo_size != mdev->rs_plan_s.size) {
			kfree(mdev->rs_plan_s.values);
			mdev->rs_plan_s.values = rs_plan_s;
			mdev->rs_plan_s.size   = fifo_size;
			mdev->rs_planed    = 0;
		}
		spin_unlock(&mdev->peer_seq_lock);
	}

	return ok;
disconnect:
	/* just for completeness: actually not needed,
	 * as this is not reached if csums_tfm was ok. */
	crypto_free_hash(csums_tfm);
	/* but free the verify_tfm again, if csums_tfm did not work out */
	crypto_free_hash(verify_tfm);
	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
	return false;
}
3004
b411b363
PR
3005/* warn if the arguments differ by more than 12.5% */
3006static void warn_if_differ_considerably(struct drbd_conf *mdev,
3007 const char *s, sector_t a, sector_t b)
3008{
3009 sector_t d;
3010 if (a == 0 || b == 0)
3011 return;
3012 d = (a > b) ? (a - b) : (b - a);
3013 if (d > (a>>3) || d > (b>>3))
3014 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3015 (unsigned long long)a, (unsigned long long)b);
3016}
3017
/*
 * receive_sizes() - process the peer's ReportSizes packet.
 *
 * Records the peer's backing-device and user-requested sizes, refuses
 * a connect where neither side has backing storage, possibly adapts
 * our own device size (never shrinking a device with usable data
 * during connect), and kicks off a resync after an online grow.
 *
 * Returns true on success, false on fatal conditions (the connection
 * is then forced to C_DISCONNECTING).
 */
static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
			 unsigned int data_size)
{
	struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
	enum determine_dev_size dd = unchanged;
	sector_t p_size, p_usize, my_usize;
	int ldsc = 0; /* local disk size changed */
	enum dds_flags ddsf;

	p_size = be64_to_cpu(p->d_size);
	p_usize = be64_to_cpu(p->u_size);

	/* Neither side has a backing device: nothing to replicate. */
	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
		dev_err(DEV, "some backing storage is needed\n");
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		return false;
	}

	/* just store the peer's disk size for now.
	 * we still need to figure out whether we accept that. */
	mdev->p_size = p_size;

	if (get_ldev(mdev)) {
		warn_if_differ_considerably(mdev, "lower level device sizes",
			   p_size, drbd_get_max_capacity(mdev->ldev));
		warn_if_differ_considerably(mdev, "user requested size",
					    p_usize, mdev->ldev->dc.disk_size);

		/* if this is the first connect, or an otherwise expected
		 * param exchange, choose the minimum */
		if (mdev->state.conn == C_WF_REPORT_PARAMS)
			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
					     p_usize);

		/* remember the old value so we can roll back below */
		my_usize = mdev->ldev->dc.disk_size;

		if (mdev->ldev->dc.disk_size != p_usize) {
			mdev->ldev->dc.disk_size = p_usize;
			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
				 (unsigned long)mdev->ldev->dc.disk_size);
		}

		/* Never shrink a device with usable data during connect.
		   But allow online shrinking if we are connected. */
		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
		   drbd_get_capacity(mdev->this_bdev) &&
		   mdev->state.disk >= D_OUTDATED &&
		   mdev->state.conn < C_CONNECTED) {
			dev_err(DEV, "The peer's disk size is too small!\n");
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
			mdev->ldev->dc.disk_size = my_usize;
			put_ldev(mdev);
			return false;
		}
		put_ldev(mdev);
	}

	ddsf = be16_to_cpu(p->dds_flags);
	if (get_ldev(mdev)) {
		dd = drbd_determine_dev_size(mdev, ddsf);
		put_ldev(mdev);
		if (dd == dev_size_error)
			return false;
		drbd_md_sync(mdev);
	} else {
		/* I am diskless, need to accept the peer's size. */
		drbd_set_my_capacity(mdev, p_size);
	}

	mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	drbd_reconsider_max_bio_size(mdev);

	/* detect a lower-level device size change behind our back */
	if (get_ldev(mdev)) {
		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
			ldsc = 1;
		}

		put_ldev(mdev);
	}

	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
		if (be64_to_cpu(p->c_size) !=
		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
			/* we have different sizes, probably peer
			 * needs to know my new size... */
			drbd_send_sizes(mdev, 0, ddsf);
		}
		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
			if (mdev->state.pdsk >= D_INCONSISTENT &&
			    mdev->state.disk >= D_INCONSISTENT) {
				if (ddsf & DDSF_NO_RESYNC)
					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
				else
					resync_after_online_grow(mdev);
			} else
				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
		}
	}

	return true;
}
3121
d8763023
AG
3122static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3123 unsigned int data_size)
b411b363 3124{
e42325a5 3125 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
b411b363 3126 u64 *p_uuid;
62b0da3a 3127 int i, updated_uuids = 0;
b411b363 3128
b411b363
PR
3129 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3130
3131 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3132 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3133
3134 kfree(mdev->p_uuid);
3135 mdev->p_uuid = p_uuid;
3136
3137 if (mdev->state.conn < C_CONNECTED &&
3138 mdev->state.disk < D_INCONSISTENT &&
3139 mdev->state.role == R_PRIMARY &&
3140 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3141 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3142 (unsigned long long)mdev->ed_uuid);
3143 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3144 return false;
b411b363
PR
3145 }
3146
3147 if (get_ldev(mdev)) {
3148 int skip_initial_sync =
3149 mdev->state.conn == C_CONNECTED &&
31890f4a 3150 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3151 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3152 (p_uuid[UI_FLAGS] & 8);
3153 if (skip_initial_sync) {
3154 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3155 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3156 "clear_n_write from receive_uuids",
3157 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3158 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3159 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3160 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3161 CS_VERBOSE, NULL);
3162 drbd_md_sync(mdev);
62b0da3a 3163 updated_uuids = 1;
b411b363
PR
3164 }
3165 put_ldev(mdev);
18a50fa2
PR
3166 } else if (mdev->state.disk < D_INCONSISTENT &&
3167 mdev->state.role == R_PRIMARY) {
3168 /* I am a diskless primary, the peer just created a new current UUID
3169 for me. */
62b0da3a 3170 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3171 }
3172
3173 /* Before we test for the disk state, we should wait until an eventually
3174 ongoing cluster wide state change is finished. That is important if
3175 we are primary and are detaching from our disk. We need to see the
3176 new disk state... */
8410da8f
PR
3177 mutex_lock(mdev->state_mutex);
3178 mutex_unlock(mdev->state_mutex);
b411b363 3179 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3180 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3181
3182 if (updated_uuids)
3183 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3184
81e84650 3185 return true;
b411b363
PR
3186}
3187
3188/**
3189 * convert_state() - Converts the peer's view of the cluster state to our point of view
3190 * @ps: The state as seen by the peer.
3191 */
3192static union drbd_state convert_state(union drbd_state ps)
3193{
3194 union drbd_state ms;
3195
3196 static enum drbd_conns c_tab[] = {
3197 [C_CONNECTED] = C_CONNECTED,
3198
3199 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3200 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3201 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3202 [C_VERIFY_S] = C_VERIFY_T,
3203 [C_MASK] = C_MASK,
3204 };
3205
3206 ms.i = ps.i;
3207
3208 ms.conn = c_tab[ps.conn];
3209 ms.peer = ps.role;
3210 ms.role = ps.peer;
3211 ms.pdsk = ps.disk;
3212 ms.disk = ps.pdsk;
3213 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3214
3215 return ms;
3216}
3217
/*
 * receive_req_state() - handle a state change request from the peer.
 *
 * Converts the requested mask/val from the peer's point of view to ours
 * and applies it either connection-wide (P_CONN_ST_CHG_REQ) or to this
 * device, replying with the state_rv in both cases.  If we hold the
 * DISCARD_CONCURRENT token and a local state change is already in
 * flight (state_mutex held), the peer's request is rejected with
 * SS_CONCURRENT_ST_CHG instead.
 *
 * Always returns true (protocol-level errors are reported via the reply).
 */
static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int data_size)
{
	struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
	union drbd_state mask, val;
	enum drbd_state_rv rv;

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

	/* concurrent-change tie break: the DISCARD_CONCURRENT holder wins */
	if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
	    mutex_is_locked(mdev->state_mutex)) {
		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
		return true;
	}

	/* translate from the peer's point of view to ours */
	mask = convert_state(mask);
	val = convert_state(val);

	if (cmd == P_CONN_ST_CHG_REQ) {
		rv = conn_request_state(mdev->tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY);
		conn_send_sr_reply(mdev->tconn, rv);
	} else {
		rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
		drbd_send_sr_reply(mdev, rv);
	}

	drbd_md_sync(mdev);

	return true;
}
3249
/*
 * receive_state() - process the peer's ReportState packet.
 *
 * Reconciles the peer-reported state with our own: smooths over
 * transient disk-state flapping around resync start/finish, decides
 * whether a resync handshake is needed, and commits the merged state
 * under req_lock (retrying if our own state changed concurrently).
 *
 * Returns true on success, false on fatal conditions (connection is
 * then forced towards C_DISCONNECTING / C_PROTOCOL_ERROR).
 */
static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
			 unsigned int data_size)
{
	struct p_state *p = &mdev->tconn->data.rbuf.state;
	union drbd_state os, ns, peer_state;
	enum drbd_disk_state real_peer_disk;
	enum chg_state_flags cs_flags;
	int rv;

	peer_state.i = be32_to_cpu(p->state);

	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
		/* peer is still attaching; derive its effective disk state
		 * from the inconsistent flag in its UUIDs */
		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

	spin_lock_irq(&mdev->tconn->req_lock);
 retry:
	os = ns = mdev->state;	/* snapshot; re-checked before commit below */
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* peer says his disk is uptodate, while we think it is inconsistent,
	 * and this happens while we think we have a sync going on. */
	if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
			if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
				drbd_resync_finished(mdev);
			return true;
		}
	}

	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;

	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr  = (os.conn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			os.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.conn >= C_STARTING_SYNC_S &&
			peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
			ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);

		put_ldev(mdev);
		/* C_MASK from the handshake means "could not agree" */
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
			if (mdev->state.disk == D_NEGOTIATING) {
				drbd_force_state(mdev, NS(disk, D_FAILED));
			} else if (peer_state.disk == D_NEGOTIATING) {
				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
				real_peer_disk = D_DISKLESS;
			} else {
				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
					return false;
				D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
				return false;
			}
		}
	}

	spin_lock_irq(&mdev->tconn->req_lock);
	if (mdev->state.i != os.i)
		goto retry;	/* state changed while we were computing; redo */
	clear_bit(CONSIDER_RESYNC, &mdev->flags);
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = mdev->new_state_tmp.disk;
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
	if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &mdev->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporal network outages! */
		spin_unlock_irq(&mdev->tconn->req_lock);
		dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
		tl_clear(mdev);
		drbd_uuid_new_current(mdev);
		clear_bit(NEW_CUR_UUID, &mdev->flags);
		drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
		return false;
	}
	rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
	ns = mdev->state;
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (rv < SS_SUCCESS) {
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		return false;
	}

	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING ) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(mdev);
			drbd_send_state(mdev);
		}
	}

	mdev->tconn->net_conf->want_lose = 0;

	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */

	return true;
}
3402
d8763023
AG
3403static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
3404 unsigned int data_size)
b411b363 3405{
e42325a5 3406 struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;
b411b363
PR
3407
3408 wait_event(mdev->misc_wait,
3409 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3410 mdev->state.conn == C_BEHIND ||
b411b363
PR
3411 mdev->state.conn < C_CONNECTED ||
3412 mdev->state.disk < D_NEGOTIATING);
3413
3414 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3415
b411b363
PR
3416 /* Here the _drbd_uuid_ functions are right, current should
3417 _not_ be rotated into the history */
3418 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3419 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3420 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3421
62b0da3a 3422 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3423 drbd_start_resync(mdev, C_SYNC_TARGET);
3424
3425 put_ldev(mdev);
3426 } else
3427 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3428
81e84650 3429 return true;
b411b363
PR
3430}
3431
2c46407d
AG
3432/**
3433 * receive_bitmap_plain
3434 *
3435 * Return 0 when done, 1 when another iteration is needed, and a negative error
3436 * code upon failure.
3437 */
3438static int
02918be2
PR
3439receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3440 unsigned long *buffer, struct bm_xfer_ctx *c)
b411b363
PR
3441{
3442 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3443 unsigned want = num_words * sizeof(long);
2c46407d 3444 int err;
b411b363 3445
02918be2
PR
3446 if (want != data_size) {
3447 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
2c46407d 3448 return -EIO;
b411b363
PR
3449 }
3450 if (want == 0)
2c46407d 3451 return 0;
de0ff338 3452 err = drbd_recv(mdev->tconn, buffer, want);
2c46407d
AG
3453 if (err != want) {
3454 if (err >= 0)
3455 err = -EIO;
3456 return err;
3457 }
b411b363
PR
3458
3459 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3460
3461 c->word_offset += num_words;
3462 c->bit_offset = c->word_offset * BITS_PER_LONG;
3463 if (c->bit_offset > c->bm_bits)
3464 c->bit_offset = c->bm_bits;
3465
2c46407d 3466 return 1;
b411b363
PR
3467}
3468
/**
 * recv_bm_rle_bits
 *
 * Decode one VLI run-length-encoded bitmap chunk: alternating runs of
 * clear/set bits, starting with the polarity given by DCBP_get_start().
 * Set-runs are applied to the bitmap via _drbd_bm_set_bits().
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
recv_bm_rle_bits(struct drbd_conf *mdev,
		struct p_compressed_bm *p,
		 struct bm_xfer_ctx *c,
		 unsigned int len)
{
	struct bitstream bs;
	u64 look_ahead;		/* 64-bit decode window into the bit stream */
	u64 rl;			/* current run length, in bits */
	u64 tmp;
	unsigned long s = c->bit_offset;
	unsigned long e;
	int toggle = DCBP_get_start(p);	/* polarity of the first run */
	int have;		/* valid bits currently in look_ahead */
	int bits;

	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));

	/* prime the look-ahead window */
	bits = bitstream_get_bits(&bs, &look_ahead, 64);
	if (bits < 0)
		return -EIO;

	for (have = bits; have > 0; s += rl, toggle = !toggle) {
		bits = vli_decode_bits(&rl, look_ahead);
		if (bits <= 0)
			return -EIO;

		if (toggle) {
			e = s + rl -1;
			if (e >= c->bm_bits) {
				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
				return -EIO;
			}
			_drbd_bm_set_bits(mdev, s, e);
		}

		/* a code longer than the remaining window is corruption */
		if (have < bits) {
			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
				have, bits, look_ahead,
				(unsigned int)(bs.cur.b - p->code),
				(unsigned int)bs.buf_len);
			return -EIO;
		}
		/* consume the decoded code and refill the window */
		look_ahead >>= bits;
		have -= bits;

		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
		if (bits < 0)
			return -EIO;
		look_ahead |= tmp << have;
		have += bits;
	}

	c->bit_offset = s;
	bm_xfer_ctx_bit_to_word_offset(c);

	return (s != c->bm_bits);
}
3533
2c46407d
AG
3534/**
3535 * decode_bitmap_c
3536 *
3537 * Return 0 when done, 1 when another iteration is needed, and a negative error
3538 * code upon failure.
3539 */
3540static int
b411b363
PR
3541decode_bitmap_c(struct drbd_conf *mdev,
3542 struct p_compressed_bm *p,
c6d25cfe
PR
3543 struct bm_xfer_ctx *c,
3544 unsigned int len)
b411b363
PR
3545{
3546 if (DCBP_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3547 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3548
3549 /* other variants had been implemented for evaluation,
3550 * but have been dropped as this one turned out to be "best"
3551 * during all our tests. */
3552
3553 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3554 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
2c46407d 3555 return -EIO;
b411b363
PR
3556}
3557
3558void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3559 const char *direction, struct bm_xfer_ctx *c)
3560{
3561 /* what would it take to transfer it "plaintext" */
c012949a 3562 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3563 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3564 + c->bm_words * sizeof(long);
3565 unsigned total = c->bytes[0] + c->bytes[1];
3566 unsigned r;
3567
3568 /* total can not be zero. but just in case: */
3569 if (total == 0)
3570 return;
3571
3572 /* don't report if not compressed */
3573 if (total >= plain)
3574 return;
3575
3576 /* total < plain. check for overflow, still */
3577 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3578 : (1000 * total / plain);
3579
3580 if (r > 1000)
3581 r = 1000;
3582
3583 r = 1000 - r;
3584 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3585 "total %u; compression: %u.%u%%\n",
3586 direction,
3587 c->bytes[1], c->packets[1],
3588 c->bytes[0], c->packets[0],
3589 total, r/10, r % 10);
3590}
3591
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter if the process it in 32 bit chunks or 64 bit
   chunks as long as it is little endian. (Understand it as byte stream,
   beginning with the lowest byte...) If we would use big endian
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bits issue.

   returns 0 on failure, 1 if we successfully received it. */
static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
			  unsigned int data_size)
{
	struct bm_xfer_ctx c;
	void *buffer;
	int err;
	int ok = false;
	struct p_header *h = &mdev->tconn->data.rbuf.header;
	struct packet_info pi;

	drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */

	/* maybe we should use some per thread scratch page,
	 * and allocate that during initial device creation? */
	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
	if (!buffer) {
		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
		goto out;
	}

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(mdev),
		.bm_words = drbd_bm_words(mdev),
	};

	/* the bitmap arrives as a sequence of P_BITMAP / P_COMPRESSED_BITMAP
	 * packets; keep receiving until the decoder reports completion */
	for(;;) {
		if (cmd == P_BITMAP) {
			err = receive_bitmap_plain(mdev, data_size, buffer, &c);
		} else if (cmd == P_COMPRESSED_BITMAP) {
			/* MAYBE: sanity check that we speak proto >= 90,
			 * and the feature is enabled! */
			struct p_compressed_bm *p;

			if (data_size > BM_PACKET_PAYLOAD_BYTES) {
				dev_err(DEV, "ReportCBitmap packet too large\n");
				goto out;
			}
			/* use the page buff */
			p = buffer;
			memcpy(p, h, sizeof(*h));
			if (drbd_recv(mdev->tconn, p->head.payload, data_size) != data_size)
				goto out;
			if (data_size <= (sizeof(*p) - sizeof(p->head))) {
				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
				goto out;
			}
			err = decode_bitmap_c(mdev, p, &c, data_size);
		} else {
			dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
			goto out;
		}

		/* account plain vs compressed traffic for the stats below */
		c.packets[cmd == P_BITMAP]++;
		c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;

		if (err <= 0) {
			if (err < 0)
				goto out;
			break;	/* err == 0: transfer complete */
		}
		/* fetch the header of the next bitmap packet ourselves */
		if (!drbd_recv_header(mdev->tconn, &pi))
			goto out;
		cmd = pi.cmd;
		data_size = pi.size;
	}

	INFO_bm_xfer_stats(mdev, "receive", &c);

	if (mdev->state.conn == C_WF_BITMAP_T) {
		enum drbd_state_rv rv;

		/* as sync target: send our bitmap back, then proceed */
		ok = !drbd_send_bitmap(mdev);
		if (!ok)
			goto out;
		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
		rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
		D_ASSERT(rv == SS_SUCCESS);
	} else if (mdev->state.conn != C_WF_BITMAP_S) {
		/* admin may have requested C_DISCONNECTING,
		 * other threads may have noticed network errors */
		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
		    drbd_conn_str(mdev->state.conn));
	}

	ok = true;
 out:
	drbd_bm_unlock(mdev);
	if (ok && mdev->state.conn == C_WF_BITMAP_S)
		drbd_start_resync(mdev, C_SYNC_SOURCE);
	free_page((unsigned long) buffer);
	return ok;
}
3694
d8763023
AG
3695static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
3696 unsigned int data_size)
b411b363
PR
3697{
3698 /* TODO zero copy sink :) */
3699 static char sink[128];
3700 int size, want, r;
3701
02918be2
PR
3702 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3703 cmd, data_size);
b411b363 3704
02918be2 3705 size = data_size;
b411b363
PR
3706 while (size > 0) {
3707 want = min_t(int, size, sizeof(sink));
de0ff338 3708 r = drbd_recv(mdev->tconn, sink, want);
841ce241
AG
3709 if (!expect(r > 0))
3710 break;
b411b363
PR
3711 size -= r;
3712 }
3713 return size == 0;
3714}
3715
/* P_UNPLUG_REMOTE handler: the peer has flushed its queue towards us. */
static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
				unsigned int data_size)
{
	/* Make sure we've acked all the TCP data associated
	 * with the data requests being unplugged */
	drbd_tcp_quickack(mdev->tconn->data.socket);

	return true;
}
3725
/* P_OUT_OF_SYNC handler: peer tells us a block range is out of sync;
 * record it in our bitmap. */
static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int data_size)
{
	struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;

	/* Sanity check only: this packet is expected in these connection
	 * states; an unexpected state is logged but not treated as fatal. */
	switch (mdev->state.conn) {
	case C_WF_SYNC_UUID:
	case C_WF_BITMAP_T:
	case C_BEHIND:
		break;
	default:
		dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
				drbd_conn_str(mdev->state.conn));
	}

	drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));

	return true;
}
3745
/* Handler signature for packets arriving on the data socket.
 * 'to_receive' is the payload size remaining after the sub-header. */
typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packet cmd,
				  unsigned int to_receive);

/* One dispatch entry per data-socket packet type. */
struct data_cmd {
	int expect_payload;		/* packet may carry data beyond pkt_size */
	size_t pkt_size;		/* fixed on-the-wire size incl. header */
	drbd_cmd_handler_f function;	/* NULL => unknown/invalid command */
};

/* Dispatch table for drbdd(); indexed by enum drbd_packet. */
static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
	[P_BITMAP]	    = { 1, sizeof(struct p_header), receive_bitmap } ,
	[P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
	[P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header), receive_UnplugRemote },
	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_SYNC_PARAM]	    = { 1, sizeof(struct p_header), receive_SyncParam },
	[P_SYNC_PARAM89]    = { 1, sizeof(struct p_header), receive_SyncParam },
	[P_PROTOCOL]	    = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
	[P_SYNC_UUID]	    = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
	[P_OV_REQUEST]	    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_OV_REPLY]	    = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_DELAY_PROBE]	    = { 0, sizeof(struct p_delay_probe93), receive_skip },
	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
	/* anything missing from this table is in
	 * the asender_tbl, see get_asender_cmd */
	[P_MAX_CMD]	    = { 0, 0, NULL },
};
3783
/* All handler functions that expect a sub-header get that sub-header in
   mdev->tconn->data.rbuf.header.head.payload.

   Usually the callback can find the usual p_header in
   mdev->tconn->data.rbuf.header.head, but it may not rely on that,
   since there is also p_header95! */
b411b363 3790
/* Main receive loop of the receiver thread: read a header, validate the
 * command, pull in the sub-header, then dispatch via drbd_cmd_handler[].
 * Any protocol violation or handler failure drops the connection into
 * C_PROTOCOL_ERROR. */
static void drbdd(struct drbd_tconn *tconn)
{
	struct p_header *header = &tconn->data.rbuf.header;
	struct packet_info pi;
	size_t shs; /* sub header size */
	int rv;

	while (get_t_state(&tconn->receiver) == RUNNING) {
		drbd_thread_current_set_cpu(&tconn->receiver);
		if (!drbd_recv_header(tconn, &pi))
			goto err_out;

		/* reject commands outside the table or without a handler */
		if (unlikely(pi.cmd >= P_MAX_CMD || !drbd_cmd_handler[pi.cmd].function)) {
			conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
			goto err_out;
		}

		/* sub-header size = declared packet size minus common header */
		shs = drbd_cmd_handler[pi.cmd].pkt_size - sizeof(struct p_header);
		if (pi.size - shs > 0 && !drbd_cmd_handler[pi.cmd].expect_payload) {
			conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
			goto err_out;
		}

		if (shs) {
			rv = drbd_recv(tconn, &header->payload, shs);
			if (unlikely(rv != shs)) {
				if (!signal_pending(current))
					conn_warn(tconn, "short read while reading sub header: rv=%d\n", rv);
				goto err_out;
			}
		}

		/* NOTE(review): vnr_to_mdev() may conceivably return NULL for an
		 * unknown volume number; handlers dereference mdev — confirm the
		 * lookup cannot fail here. */
		rv = drbd_cmd_handler[pi.cmd].function(vnr_to_mdev(tconn, pi.vnr), pi.cmd, pi.size - shs);

		if (unlikely(!rv)) {
			conn_err(tconn, "error receiving %s, l: %d!\n",
			    cmdname(pi.cmd), pi.size);
			goto err_out;
		}
	}

	if (0) {
	err_out:
		conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
	}
}
3837
/* Wait until all work queued on the connection's data work queue before
 * this call has been processed, by queueing a barrier work item and
 * blocking until the worker completes it. */
void drbd_flush_workqueue(struct drbd_conf *mdev)
{
	struct drbd_wq_barrier barr;

	barr.w.cb = w_prev_work_done;
	barr.w.mdev = mdev;
	init_completion(&barr.done);
	drbd_queue_work(&mdev->tconn->data.work, &barr.w);
	wait_for_completion(&barr.done);
}
3848
/* Tear down the network side of a connection: stop the asender, close the
 * sockets, run per-volume cleanup, and move the connection state towards
 * C_UNCONNECTED (or all the way to C_STANDALONE when disconnecting was
 * requested by the admin). */
static void drbd_disconnect(struct drbd_tconn *tconn)
{
	enum drbd_conns oc;
	/* NOTE(review): rv is assigned but never read after the state
	 * transition below — looks like a leftover; confirm. */
	int rv = SS_UNKNOWN_ERROR;

	if (tconn->cstate == C_STANDALONE)
		return;

	/* asender does not clean up anything. it must not interfere, either */
	drbd_thread_stop(&tconn->asender);
	drbd_free_sock(tconn);

	/* per-volume teardown (wait for ee lists, cancel resync, ...) */
	idr_for_each(&tconn->volumes, drbd_disconnected, tconn);

	conn_info(tconn, "Connection closed\n");

	spin_lock_irq(&tconn->req_lock);
	oc = tconn->cstate;
	if (oc >= C_UNCONNECTED)
		rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);

	spin_unlock_irq(&tconn->req_lock);

	if (oc == C_DISCONNECTING) {
		/* wait for the last users of net_conf to drop their reference */
		wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);

		crypto_free_hash(tconn->cram_hmac_tfm);
		tconn->cram_hmac_tfm = NULL;

		kfree(tconn->net_conf);
		tconn->net_conf = NULL;
		conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
	}
}
3883
/* idr_for_each callback run for every volume of a connection during
 * drbd_disconnect(): quiesce in-flight I/O and resync state, flush the
 * worker queue, release network pages, and verify all ee lists drained.
 * Always returns 0 so iteration continues over all volumes. */
static int drbd_disconnected(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	enum drbd_fencing_p fp;
	unsigned int i;

	/* wait for current activity to cease. */
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* We do not have data structures that would allow us to
	 * get the rs_pending_cnt down to 0 again.
	 * * On C_SYNC_TARGET we do not have any data structures describing
	 * the pending RSDataRequest's we have sent.
	 * * On C_SYNC_SOURCE there is no data structure that tracks
	 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
	 * And no, it is not the sum of the reference counts in the
	 * resync_LRU. The resync_LRU tracks the whole operation including
	 * the disk-IO, while the rs_pending_cnt only tracks the blocks
	 * on the fly. */
	drbd_rs_cancel_all(mdev);
	mdev->rs_total = 0;
	mdev->rs_failed = 0;
	atomic_set(&mdev->rs_pending_cnt, 0);
	wake_up(&mdev->misc_wait);

	del_timer(&mdev->request_timer);

	/* make sure syncer is stopped and w_resume_next_sg queued */
	del_timer_sync(&mdev->resync_timer);
	resync_timer_fn((unsigned long)mdev);

	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
	 * w_make_resync_request etc. which may still be on the worker queue
	 * to be "canceled" */
	drbd_flush_workqueue(mdev);

	/* This also does reclaim_net_ee(). If we do this too early, we might
	 * miss some resync ee and pages.*/
	drbd_process_done_ee(mdev);

	kfree(mdev->p_uuid);
	mdev->p_uuid = NULL;

	if (!is_susp(mdev->state))
		tl_clear(mdev);

	drbd_md_sync(mdev);

	/* read the configured fencing policy while holding a local-disk ref */
	fp = FP_DONT_CARE;
	if (get_ldev(mdev)) {
		fp = mdev->ldev->dc.fencing;
		put_ldev(mdev);
	}

	if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
		drbd_try_outdate_peer_async(mdev);

	/* serialize with bitmap writeout triggered by the state change,
	 * if any. */
	wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));

	/* tcp_close and release of sendpage pages can be deferred. I don't
	 * want to use SO_LINGER, because apparently it can be deferred for
	 * more than 20 seconds (longest time I checked).
	 *
	 * Actually we don't care for exactly when the network stack does its
	 * put_page(), but release our reference on these pages right here.
	 */
	i = drbd_release_ee(mdev, &mdev->net_ee);
	if (i)
		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
	i = atomic_read(&mdev->pp_in_use_by_net);
	if (i)
		dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&mdev->pp_in_use);
	if (i)
		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);

	D_ASSERT(list_empty(&mdev->read_ee));
	D_ASSERT(list_empty(&mdev->active_ee));
	D_ASSERT(list_empty(&mdev->sync_ee));
	D_ASSERT(list_empty(&mdev->done_ee));

	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
	atomic_set(&mdev->current_epoch->epoch_size, 0);
	D_ASSERT(list_empty(&mdev->current_epoch->list));

	return 0;
}
3977
3978/*
3979 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3980 * we can agree on is stored in agreed_pro_version.
3981 *
3982 * feature flags and the reserved array should be enough room for future
3983 * enhancements of the handshake protocol, and possible plugins...
3984 *
3985 * for now, they are expected to be zero, but ignored.
3986 */
/* Send our P_HAND_SHAKE packet announcing the protocol version range we
 * support.  Returns non-zero on success, 0 on interruption or missing
 * socket. */
static int drbd_send_handshake(struct drbd_tconn *tconn)
{
	/* ASSERT current == mdev->tconn->receiver ... */
	struct p_handshake *p = &tconn->data.sbuf.handshake;
	int ok;

	if (mutex_lock_interruptible(&tconn->data.mutex)) {
		conn_err(tconn, "interrupted during initial handshake\n");
		return 0; /* interrupted. not ok. */
	}

	/* the socket may already be gone if the connection dropped */
	if (tconn->data.socket == NULL) {
		mutex_unlock(&tconn->data.mutex);
		return 0;
	}

	memset(p, 0, sizeof(*p));
	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
	ok = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
			    &p->head, sizeof(*p), 0);
	mutex_unlock(&tconn->data.mutex);
	return ok;
}
4011
/*
 * Exchange handshake packets with the peer and negotiate the protocol
 * version (stored in tconn->agreed_pro_version).
 *
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 */
static int drbd_do_handshake(struct drbd_tconn *tconn)
{
	/* ASSERT current == tconn->receiver ... */
	struct p_handshake *p = &tconn->data.rbuf.handshake;
	/* expected payload size: handshake packet minus the 8.0 header */
	const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
	struct packet_info pi;
	int rv;

	rv = drbd_send_handshake(tconn);
	if (!rv)
		return 0;

	rv = drbd_recv_header(tconn, &pi);
	if (!rv)
		return 0;

	if (pi.cmd != P_HAND_SHAKE) {
		conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
		     cmdname(pi.cmd), pi.cmd);
		return -1;
	}

	if (pi.size != expect) {
		conn_err(tconn, "expected HandShake length: %u, received: %u\n",
		     expect, pi.size);
		return -1;
	}

	rv = drbd_recv(tconn, &p->head.payload, expect);

	if (rv != expect) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read receiving handshake packet: l=%u\n", rv);
		return 0;
	}

	p->protocol_min = be32_to_cpu(p->protocol_min);
	p->protocol_max = be32_to_cpu(p->protocol_max);
	/* very old peers sent max == 0; treat that as "min only" */
	if (p->protocol_max == 0)
		p->protocol_max = p->protocol_min;

	if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;

	/* agree on the highest version both sides support */
	tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);

	conn_info(tconn, "Handshake successful: "
	     "Agreed network protocol version %d\n", tconn->agreed_pro_version);

	return 1;

 incompat:
	conn_err(tconn, "incompatible DRBD dialects: "
	    "I support %d-%d, peer supports %d-%d\n",
	    PRO_VERSION_MIN, PRO_VERSION_MAX,
	    p->protocol_min, p->protocol_max);
	return -1;
}
4078
4079#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4080static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4081{
4082 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4083 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4084 return -1;
b411b363
PR
4085}
4086#else
#define CHALLENGE_LEN 64

/* CRAM-HMAC mutual authentication over the data socket.
 *
 * Return value:
 *	1 - auth succeeded,
 *	0 - failed, try again (network error),
 *	-1 - auth failed, don't try again.
 */

static int drbd_do_auth(struct drbd_tconn *tconn)
{
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	struct scatterlist sg;
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len = strlen(tconn->net_conf->shared_secret);
	unsigned int resp_size;
	struct hash_desc desc;
	struct packet_info pi;
	int rv;

	desc.tfm = tconn->cram_hmac_tfm;
	desc.flags = 0;

	/* key the HMAC transform with the configured shared secret */
	rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
				(u8 *)tconn->net_conf->shared_secret, key_len);
	if (rv) {
		conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	/* send our challenge, expect the peer's challenge in return */
	rv = conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	rv = drbd_recv_header(tconn, &pi);
	if (!rv)
		goto fail;

	if (pi.cmd != P_AUTH_CHALLENGE) {
		conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
		    cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	/* bound the allocation driven by the untrusted size field */
	if (pi.size > CHALLENGE_LEN * 2) {
		conn_err(tconn, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		conn_err(tconn, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	rv = drbd_recv(tconn, peers_ch, pi.size);

	if (rv != pi.size) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read AuthChallenge: l=%u\n", rv);
		rv = 0;
		goto fail;
	}

	/* compute our response = HMAC(secret, peer's challenge) */
	resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		conn_err(tconn, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	sg_init_table(&sg, 1);
	sg_set_buf(&sg, peers_ch, pi.size);

	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
	if (rv) {
		conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
	if (!rv)
		goto fail;

	rv = drbd_recv_header(tconn, &pi);
	if (!rv)
		goto fail;

	if (pi.cmd != P_AUTH_RESPONSE) {
		conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
			cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		conn_err(tconn, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	rv = drbd_recv(tconn, response , resp_size);

	if (rv != resp_size) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read receiving AuthResponse: l=%u\n", rv);
		rv = 0;
		goto fail;
	}

	/* compute the expected answer = HMAC(secret, our challenge) */
	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		conn_err(tconn, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);

	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
	if (rv) {
		conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	/* NOTE(review): memcmp() is not constant-time; a constant-time
	 * comparison (e.g. crypto_memneq) would avoid leaking timing
	 * information about the expected digest — confirm whether that
	 * matters for this threat model. */
	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
		     resp_size, tconn->net_conf->cram_hmac_alg);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);

	return rv;
}
#endif
4238
/* Entry point of the receiver thread: keep (re)trying to establish the
 * connection, run the main receive loop while connected, and tear down
 * on exit.  Always returns 0. */
int drbdd_init(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	int h;

	conn_info(tconn, "receiver (re)started\n");

	do {
		/* h: 1 = connected, 0 = retry, -1 = give up */
		h = drbd_connect(tconn);
		if (h == 0) {
			drbd_disconnect(tconn);
			schedule_timeout_interruptible(HZ);
		}
		if (h == -1) {
			conn_warn(tconn, "Discarding network configuration.\n");
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	} while (h == 0);

	if (h > 0) {
		if (get_net_conf(tconn)) {
			drbdd(tconn);
			put_net_conf(tconn);
		}
	}

	drbd_disconnect(tconn);

	conn_info(tconn, "receiver terminated\n");
	return 0;
}
4270
4271/* ********* acknowledge sender ******** */
4272
/* Handle the peer's reply to a state-change request, either for a single
 * device (P_STATE_CHG_REPLY) or for the whole connection
 * (P_CONN_ST_CHG_REPLY), and wake the waiter. */
static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
	struct drbd_tconn *tconn = mdev->tconn;

	int retcode = be32_to_cpu(p->retcode);

	if (cmd == P_STATE_CHG_REPLY) {
		if (retcode >= SS_SUCCESS) {
			set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
		} else {
			set_bit(CL_ST_CHG_FAIL, &mdev->flags);
			dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
				drbd_set_st_err_str(retcode), retcode);
		}
		wake_up(&mdev->state_wait);
	} else /* conn == P_CONN_ST_CHG_REPLY */ {
		if (retcode >= SS_SUCCESS) {
			set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
		} else {
			set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
			conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
				 drbd_set_st_err_str(retcode), retcode);
		}
		wake_up(&tconn->ping_wait);
	}
	return true;
}
4301
/* P_PING handler: answer immediately with a ping ack. */
static int got_Ping(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	return drbd_send_ping_ack(mdev->tconn);

}
4307
/* P_PING_ACK handler: the peer is alive; restore the normal receive
 * timeout and wake anyone waiting for the ack. */
static int got_PingAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct drbd_tconn *tconn = mdev->tconn;
	/* restore idle timeout */
	tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
		wake_up(&tconn->ping_wait);

	return true;
}
4318
/* P_RS_IS_IN_SYNC handler (csum-based resync, protocol >= 89): the peer
 * confirmed a block is already in sync, so mark it and account it. */
static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	D_ASSERT(mdev->tconn->agreed_pro_version >= 89);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, sector);
		drbd_set_in_sync(mdev, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(mdev);
	}
	dec_rs_pending(mdev);
	/* account received sectors (blksize is in bytes, >> 9 => sectors) */
	atomic_add(blksize >> 9, &mdev->rs_sect_in);

	return true;
}
4341
/* Look up the request identified by (id, sector) in the given tree under
 * the request lock, apply the state transition 'what' to it, and complete
 * the master bio if that transition finished it.
 * Returns false if the request was not found (and !missing_ok hides no
 * error), true otherwise. */
static int
validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&mdev->tconn->req_lock);
		return false;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* complete outside the lock */
	if (m.bio)
		complete_master_bio(mdev, &m);
	return true;
}
4363
/* Handle the various positive write acknowledgements from the peer and
 * translate them into the matching request-state event. */
static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	/* acks for resync writes carry ID_SYNCER instead of a request id */
	if (p->block_id == ID_SYNCER) {
		drbd_set_in_sync(mdev, sector, blksize);
		dec_rs_pending(mdev);
		return true;
	}
	switch (cmd) {
	case P_RS_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
		what = RECV_ACKED_BY_PEER;
		break;
	case P_DISCARD_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = CONFLICT_DISCARDED_BY_PEER;
		break;
	default:
		D_ASSERT(0);
		return false;
	}

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->write_requests, __func__,
					     what, false);
}
4404
/* P_NEG_ACK handler: the peer failed to apply a write; fail resync I/O
 * or negatively acknowledge the application request. */
static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	/* protocols A and B may legally no longer have the request around */
	bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
			  mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
	bool found;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(mdev);
		drbd_rs_failed_io(mdev, sector, size);
		return true;
	}

	found = validate_req_change_req_state(mdev, p->block_id, sector,
					      &mdev->write_requests, __func__,
					      NEG_ACKED, missing_ok);
	if (!found) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		if (!missing_ok)
			return false;
		drbd_set_out_of_sync(mdev, sector, size);
	}
	return true;
}
4437
/* P_NEG_DREPLY handler: the peer could not serve our read request; fail
 * the original application read. */
static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
	    (unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->read_requests, __func__,
					     NEG_ACKED, false);
}
4451
/* Handle a negative reply to a resync data request: P_NEG_RS_DREPLY marks
 * the range as failed, P_RS_CANCEL just completes the resync I/O. */
static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	sector_t sector;
	int size;
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dec_rs_pending(mdev);

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, sector);
		switch (cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(mdev, sector, size);
			/* fall through */
		case P_RS_CANCEL:
			break;
		default:
			D_ASSERT(0);
			put_ldev(mdev);
			return false;
		}
		put_ldev(mdev);
	}

	return true;
}
4482
/* P_BARRIER_ACK handler: release the acknowledged epoch from the transfer
 * log; if we are in Ahead mode with no application I/O in flight, arm the
 * timer that switches us back to SyncSource. */
static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;

	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));

	if (mdev->state.conn == C_AHEAD &&
	    atomic_read(&mdev->ap_in_flight) == 0 &&
	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
		mdev->start_resync_timer.expires = jiffies + HZ;
		add_timer(&mdev->start_resync_timer);
	}

	return true;
}
4498
/* P_OV_RESULT handler: record the outcome of one online-verify block and,
 * when the last block has been answered, queue the finish work item. */
static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	struct drbd_work *w;
	sector_t sector;
	int size;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_oos_found(mdev, sector, size);
	else
		ov_oos_print(mdev);

	if (!get_ldev(mdev))
		return true;

	drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			w->mdev = mdev;
			drbd_queue_work_front(&mdev->tconn->data.work, w);
		} else {
			/* allocation failed: finish synchronously instead */
			dev_err(DEV, "kmalloc(w) failed.");
			ov_oos_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	put_ldev(mdev);
	return true;
}
4543
/* Handler for packets that are deliberately ignored (e.g. P_DELAY_PROBE). */
static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	return true;
}
4548
/* Dispatch entry for one asender (meta socket) packet type. */
struct asender_cmd {
	size_t pkt_size;	/* fixed on-the-wire size of the packet */
	int (*process)(struct drbd_conf *mdev, enum drbd_packet cmd);
};
4553
4554static struct asender_cmd *get_asender_cmd(int cmd)
4555{
4556 static struct asender_cmd asender_tbl[] = {
4557 /* anything missing from this table is in
4558 * the drbd_cmd_handler (drbd_default_handler) table,
4559 * see the beginning of drbdd() */
257d0af6
PR
4560 [P_PING] = { sizeof(struct p_header), got_Ping },
4561 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
b411b363
PR
4562 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4563 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4564 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4565 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4566 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4567 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4568 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4569 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4570 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4571 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4572 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
02918be2 4573 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
d612d309 4574 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply},
fc3b10a4 4575 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_RqSReply },
b411b363
PR
4576 [P_MAX_CMD] = { 0, NULL },
4577 };
4578 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4579 return NULL;
4580 return &asender_tbl[cmd];
4581}
4582
/* idr_for_each callback: process this volume's completed ee entries.
 * Returns non-zero (stopping iteration) when processing failed. */
static int _drbd_process_done_ee(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	return !drbd_process_done_ee(mdev);
}
4588
4589static int _check_ee_empty(int vnr, void *p, void *data)
4590{
4591 struct drbd_conf *mdev = (struct drbd_conf *)p;
4592 struct drbd_tconn *tconn = mdev->tconn;
4593 int not_empty;
4594
4595 spin_lock_irq(&tconn->req_lock);
4596 not_empty = !list_empty(&mdev->done_ee);
4597 spin_unlock_irq(&tconn->req_lock);
4598
4599 return not_empty;
4600}
4601
4602static int tconn_process_done_ee(struct drbd_tconn *tconn)
4603{
4604 int not_empty, err;
4605
4606 do {
4607 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4608 flush_signals(current);
4609 err = idr_for_each(&tconn->volumes, _drbd_process_done_ee, NULL);
4610 if (err)
4611 return err;
4612 set_bit(SIGNAL_ASENDER, &tconn->flags);
4613 not_empty = idr_for_each(&tconn->volumes, _check_ee_empty, NULL);
4614 } while (not_empty);
4615
4616 return 0;
4617}
4618
/**
 * drbd_asender() - main loop of the meta-data socket ("asender") thread
 * @thi: the drbd_thread structure this thread runs under
 *
 * Services the connection's meta socket: sends pings when SEND_PING is
 * requested, drains completed epoch entries (done_ee) for all volumes, and
 * receives/dispatches acknowledgement packets through the get_asender_cmd()
 * table.  Any protocol or socket error jumps to the reconnect/disconnect
 * labels, which request a connection state change (C_NETWORK_FAILURE or
 * C_DISCONNECTING) instead of returning an error.  Returns 0 when the
 * thread state leaves RUNNING.
 */
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	struct p_header *h = &tconn->meta.rbuf.header;
	struct asender_cmd *cmd = NULL;
	struct packet_info pi;
	int rv;
	void *buf = h;			/* current receive position inside h */
	int received = 0;		/* bytes accumulated so far */
	int expect = sizeof(struct p_header);	/* header first, then payload */
	int ping_timeout_active = 0;	/* set while waiting for a PingAck */

	current->policy = SCHED_RR;  /* Make this a realtime task! */
	current->rt_priority = 2;    /* more important than all other tasks */

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		/* A ping was requested (e.g. by the timeout path below):
		 * send it and shorten the receive timeout to ping-timeout
		 * so a missing PingAck is detected promptly. */
		if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
			if (!drbd_send_ping(tconn)) {
				conn_err(tconn, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			/* ping_timeo is in deciseconds, hence HZ/10 */
			tconn->meta.socket->sk->sk_rcvtimeo =
				tconn->net_conf->ping_timeo*HZ/10;
			ping_timeout_active = 1;
		}

		/* TODO: conditionally cork; it may hurt latency if we cork without
		   much to send */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_cork(tconn->meta.socket);
		if (tconn_process_done_ee(tconn))
			goto reconnect;
		/* but unconditionally uncork unless disabled */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_uncork(tconn->meta.socket);

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &tconn->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			/* possibly partial read; accumulate until 'expect' */
			received += rv;
			buf += rv;
		} else if (rv == 0) {
			conn_err(tconn, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(tconn->last_received,
				jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				conn_err(tconn, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			/* idle timeout expired without traffic: probe the
			 * peer; the flag is consumed at the top of the loop */
			set_bit(SEND_PING, &tconn->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		/* Full header received and not yet decoded: look up the
		 * handler and switch 'expect' to the full packet size so the
		 * payload (if any) is gathered on subsequent iterations. */
		if (received == expect && cmd == NULL) {
			if (!decode_header(tconn, h, &pi))
				goto reconnect;
			cmd = get_asender_cmd(pi.cmd);
			if (unlikely(cmd == NULL)) {
				conn_err(tconn, "unknown command %d on meta (l: %d)\n",
					pi.cmd, pi.size);
				goto disconnect;
			}
			expect = cmd->pkt_size;
			if (pi.size != expect - sizeof(struct p_header)) {
				conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
					pi.cmd, pi.size);
				goto reconnect;
			}
		}
		/* Complete packet in the buffer: dispatch it, then reset the
		 * receive state for the next header. */
		if (received == expect) {
			tconn->last_received = jiffies;
			if (!cmd->process(vnr_to_mdev(tconn, pi.vnr), pi.cmd))
				goto reconnect;

			/* the idle_timeout (ping-int)
			 * has been restored in got_PingAck() */
			if (cmd == get_asender_cmd(P_PING_ACK))
				ping_timeout_active = 0;

			buf = h;
			received = 0;
			expect = sizeof(struct p_header);
			cmd = NULL;
		}
	}

	/* if (0) blocks keep the error labels out of the normal flow while
	 * still letting the gotos above reach them */
	if (0) {
reconnect:
		conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
	}
	if (0) {
disconnect:
		conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	clear_bit(SIGNAL_ASENDER, &tconn->flags);

	conn_info(tconn, "asender terminated\n");

	return 0;
}
This page took 0.383548 seconds and 5 git commands to generate.