drbd: Implemented connection wide state changes
[deliverable/linux.git] / drivers / block / drbd / drbd_receiver.c
CommitLineData
b411b363
PR
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
b411b363
PR
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
b411b363
PR
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
b411b363
PR
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
b411b363
PR
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
b411b363
PR
47#include "drbd_req.h"
48
49#include "drbd_vli.h"
50
77351055
PR
51struct packet_info {
52 enum drbd_packet cmd;
53 int size;
54 int vnr;
55};
56
b411b363
PR
57enum finish_epoch {
58 FE_STILL_LIVE,
59 FE_DESTROYED,
60 FE_RECYCLED,
61};
62
65d11ed6 63static int drbd_do_handshake(struct drbd_tconn *tconn);
13e6037d 64static int drbd_do_auth(struct drbd_tconn *tconn);
360cc740 65static int drbd_disconnected(int vnr, void *p, void *data);
b411b363
PR
66
67static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
00d56944 68static int e_end_block(struct drbd_work *, int);
b411b363 69
b411b363
PR
70
71#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
72
45bb912b
LE
73/*
74 * some helper functions to deal with single linked page lists,
75 * page->private being our "next" pointer.
76 */
77
78/* If at least n pages are linked at head, get n pages off.
79 * Otherwise, don't modify head, and return NULL.
80 * Locking is the responsibility of the caller.
81 */
82static struct page *page_chain_del(struct page **head, int n)
83{
84 struct page *page;
85 struct page *tmp;
86
87 BUG_ON(!n);
88 BUG_ON(!head);
89
90 page = *head;
23ce4227
PR
91
92 if (!page)
93 return NULL;
94
45bb912b
LE
95 while (page) {
96 tmp = page_chain_next(page);
97 if (--n == 0)
98 break; /* found sufficient pages */
99 if (tmp == NULL)
100 /* insufficient pages, don't use any of them. */
101 return NULL;
102 page = tmp;
103 }
104
105 /* add end of list marker for the returned list */
106 set_page_private(page, 0);
107 /* actual return value, and adjustment of head */
108 page = *head;
109 *head = tmp;
110 return page;
111}
112
113/* may be used outside of locks to find the tail of a (usually short)
114 * "private" page chain, before adding it back to a global chain head
115 * with page_chain_add() under a spinlock. */
116static struct page *page_chain_tail(struct page *page, int *len)
117{
118 struct page *tmp;
119 int i = 1;
120 while ((tmp = page_chain_next(page)))
121 ++i, page = tmp;
122 if (len)
123 *len = i;
124 return page;
125}
126
127static int page_chain_free(struct page *page)
128{
129 struct page *tmp;
130 int i = 0;
131 page_chain_for_each_safe(page, tmp) {
132 put_page(page);
133 ++i;
134 }
135 return i;
136}
137
138static void page_chain_add(struct page **head,
139 struct page *chain_first, struct page *chain_last)
140{
141#if 1
142 struct page *tmp;
143 tmp = page_chain_tail(chain_first, NULL);
144 BUG_ON(tmp != chain_last);
145#endif
146
147 /* add chain to head */
148 set_page_private(chain_last, (unsigned long)*head);
149 *head = chain_first;
150}
151
152static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
b411b363
PR
153{
154 struct page *page = NULL;
45bb912b
LE
155 struct page *tmp = NULL;
156 int i = 0;
b411b363
PR
157
158 /* Yes, testing drbd_pp_vacant outside the lock is racy.
159 * So what. It saves a spin_lock. */
45bb912b 160 if (drbd_pp_vacant >= number) {
b411b363 161 spin_lock(&drbd_pp_lock);
45bb912b
LE
162 page = page_chain_del(&drbd_pp_pool, number);
163 if (page)
164 drbd_pp_vacant -= number;
b411b363 165 spin_unlock(&drbd_pp_lock);
45bb912b
LE
166 if (page)
167 return page;
b411b363 168 }
45bb912b 169
b411b363
PR
170 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
171 * "criss-cross" setup, that might cause write-out on some other DRBD,
172 * which in turn might block on the other node at this very place. */
45bb912b
LE
173 for (i = 0; i < number; i++) {
174 tmp = alloc_page(GFP_TRY);
175 if (!tmp)
176 break;
177 set_page_private(tmp, (unsigned long)page);
178 page = tmp;
179 }
180
181 if (i == number)
182 return page;
183
184 /* Not enough pages immediately available this time.
185 * No need to jump around here, drbd_pp_alloc will retry this
186 * function "soon". */
187 if (page) {
188 tmp = page_chain_tail(page, NULL);
189 spin_lock(&drbd_pp_lock);
190 page_chain_add(&drbd_pp_pool, page, tmp);
191 drbd_pp_vacant += i;
192 spin_unlock(&drbd_pp_lock);
193 }
194 return NULL;
b411b363
PR
195}
196
b411b363
PR
197static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
198{
db830c46 199 struct drbd_peer_request *peer_req;
b411b363
PR
200 struct list_head *le, *tle;
201
202 /* The EEs are always appended to the end of the list. Since
203 they are sent in order over the wire, they have to finish
204 in order. As soon as we see the first not finished we can
205 stop to examine the list... */
206
207 list_for_each_safe(le, tle, &mdev->net_ee) {
db830c46
AG
208 peer_req = list_entry(le, struct drbd_peer_request, w.list);
209 if (drbd_ee_has_active_page(peer_req))
b411b363
PR
210 break;
211 list_move(le, to_be_freed);
212 }
213}
214
215static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
216{
217 LIST_HEAD(reclaimed);
db830c46 218 struct drbd_peer_request *peer_req, *t;
b411b363 219
87eeee41 220 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 221 reclaim_net_ee(mdev, &reclaimed);
87eeee41 222 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 223
db830c46
AG
224 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
225 drbd_free_net_ee(mdev, peer_req);
b411b363
PR
226}
227
228/**
45bb912b 229 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
b411b363 230 * @mdev: DRBD device.
45bb912b
LE
231 * @number: number of pages requested
232 * @retry: whether to retry, if not enough pages are available right now
233 *
234 * Tries to allocate number pages, first from our own page pool, then from
235 * the kernel, unless this allocation would exceed the max_buffers setting.
236 * Possibly retry until DRBD frees sufficient pages somewhere else.
b411b363 237 *
45bb912b 238 * Returns a page chain linked via page->private.
b411b363 239 */
45bb912b 240static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
b411b363
PR
241{
242 struct page *page = NULL;
243 DEFINE_WAIT(wait);
244
45bb912b
LE
245 /* Yes, we may run up to @number over max_buffers. If we
246 * follow it strictly, the admin will get it wrong anyways. */
89e58e75 247 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
45bb912b 248 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363 249
45bb912b 250 while (page == NULL) {
b411b363
PR
251 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
252
253 drbd_kick_lo_and_reclaim_net(mdev);
254
89e58e75 255 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
45bb912b 256 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363
PR
257 if (page)
258 break;
259 }
260
261 if (!retry)
262 break;
263
264 if (signal_pending(current)) {
265 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
266 break;
267 }
268
269 schedule();
270 }
271 finish_wait(&drbd_pp_wait, &wait);
272
45bb912b
LE
273 if (page)
274 atomic_add(number, &mdev->pp_in_use);
b411b363
PR
275 return page;
276}
277
278/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
87eeee41 279 * Is also used from inside an other spin_lock_irq(&mdev->tconn->req_lock);
45bb912b
LE
280 * Either links the page chain back to the global pool,
281 * or returns all pages to the system. */
435f0740 282static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
b411b363 283{
435f0740 284 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
b411b363 285 int i;
435f0740 286
1816a2b4 287 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
45bb912b
LE
288 i = page_chain_free(page);
289 else {
290 struct page *tmp;
291 tmp = page_chain_tail(page, &i);
292 spin_lock(&drbd_pp_lock);
293 page_chain_add(&drbd_pp_pool, page, tmp);
294 drbd_pp_vacant += i;
295 spin_unlock(&drbd_pp_lock);
b411b363 296 }
435f0740 297 i = atomic_sub_return(i, a);
45bb912b 298 if (i < 0)
435f0740
LE
299 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
300 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
b411b363
PR
301 wake_up(&drbd_pp_wait);
302}
303
304/*
305You need to hold the req_lock:
306 _drbd_wait_ee_list_empty()
307
308You must not have the req_lock:
309 drbd_free_ee()
310 drbd_alloc_ee()
311 drbd_init_ee()
312 drbd_release_ee()
313 drbd_ee_fix_bhs()
314 drbd_process_done_ee()
315 drbd_clear_done_ee()
316 drbd_wait_ee_list_empty()
317*/
318
f6ffca9f
AG
319struct drbd_peer_request *
320drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
321 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
b411b363 322{
db830c46 323 struct drbd_peer_request *peer_req;
b411b363 324 struct page *page;
45bb912b 325 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
b411b363 326
0cf9d27e 327 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
b411b363
PR
328 return NULL;
329
db830c46
AG
330 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
331 if (!peer_req) {
b411b363
PR
332 if (!(gfp_mask & __GFP_NOWARN))
333 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
334 return NULL;
335 }
336
45bb912b
LE
337 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
338 if (!page)
339 goto fail;
b411b363 340
db830c46
AG
341 drbd_clear_interval(&peer_req->i);
342 peer_req->i.size = data_size;
343 peer_req->i.sector = sector;
344 peer_req->i.local = false;
345 peer_req->i.waiting = false;
346
347 peer_req->epoch = NULL;
a21e9298 348 peer_req->w.mdev = mdev;
db830c46
AG
349 peer_req->pages = page;
350 atomic_set(&peer_req->pending_bios, 0);
351 peer_req->flags = 0;
9a8e7753
AG
352 /*
353 * The block_id is opaque to the receiver. It is not endianness
354 * converted, and sent back to the sender unchanged.
355 */
db830c46 356 peer_req->block_id = id;
b411b363 357
db830c46 358 return peer_req;
b411b363 359
45bb912b 360 fail:
db830c46 361 mempool_free(peer_req, drbd_ee_mempool);
b411b363
PR
362 return NULL;
363}
364
db830c46 365void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
f6ffca9f 366 int is_net)
b411b363 367{
db830c46
AG
368 if (peer_req->flags & EE_HAS_DIGEST)
369 kfree(peer_req->digest);
370 drbd_pp_free(mdev, peer_req->pages, is_net);
371 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
372 D_ASSERT(drbd_interval_empty(&peer_req->i));
373 mempool_free(peer_req, drbd_ee_mempool);
b411b363
PR
374}
375
376int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
377{
378 LIST_HEAD(work_list);
db830c46 379 struct drbd_peer_request *peer_req, *t;
b411b363 380 int count = 0;
435f0740 381 int is_net = list == &mdev->net_ee;
b411b363 382
87eeee41 383 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 384 list_splice_init(list, &work_list);
87eeee41 385 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 386
db830c46
AG
387 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
388 drbd_free_some_ee(mdev, peer_req, is_net);
b411b363
PR
389 count++;
390 }
391 return count;
392}
393
394
32862ec7 395/* See also comments in _req_mod(,BARRIER_ACKED)
b411b363
PR
396 * and receive_Barrier.
397 *
398 * Move entries from net_ee to done_ee, if ready.
399 * Grab done_ee, call all callbacks, free the entries.
400 * The callbacks typically send out ACKs.
401 */
402static int drbd_process_done_ee(struct drbd_conf *mdev)
403{
404 LIST_HEAD(work_list);
405 LIST_HEAD(reclaimed);
db830c46 406 struct drbd_peer_request *peer_req, *t;
b411b363
PR
407 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
408
87eeee41 409 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
410 reclaim_net_ee(mdev, &reclaimed);
411 list_splice_init(&mdev->done_ee, &work_list);
87eeee41 412 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 413
db830c46
AG
414 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
415 drbd_free_net_ee(mdev, peer_req);
b411b363
PR
416
417 /* possible callbacks here:
418 * e_end_block, and e_end_resync_block, e_send_discard_ack.
419 * all ignore the last argument.
420 */
db830c46 421 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
b411b363 422 /* list_del not necessary, next/prev members not touched */
00d56944 423 ok = peer_req->w.cb(&peer_req->w, !ok) && ok;
db830c46 424 drbd_free_ee(mdev, peer_req);
b411b363
PR
425 }
426 wake_up(&mdev->ee_wait);
427
428 return ok;
429}
430
431void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
432{
433 DEFINE_WAIT(wait);
434
435 /* avoids spin_lock/unlock
436 * and calling prepare_to_wait in the fast path */
437 while (!list_empty(head)) {
438 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
87eeee41 439 spin_unlock_irq(&mdev->tconn->req_lock);
7eaceacc 440 io_schedule();
b411b363 441 finish_wait(&mdev->ee_wait, &wait);
87eeee41 442 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
443 }
444}
445
446void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
447{
87eeee41 448 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 449 _drbd_wait_ee_list_empty(mdev, head);
87eeee41 450 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
451}
452
453/* see also kernel_accept; which is only present since 2.6.18.
454 * also we want to log which part of it failed, exactly */
7653620d 455static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
b411b363
PR
456{
457 struct sock *sk = sock->sk;
458 int err = 0;
459
460 *what = "listen";
461 err = sock->ops->listen(sock, 5);
462 if (err < 0)
463 goto out;
464
465 *what = "sock_create_lite";
466 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
467 newsock);
468 if (err < 0)
469 goto out;
470
471 *what = "accept";
472 err = sock->ops->accept(sock, *newsock, 0);
473 if (err < 0) {
474 sock_release(*newsock);
475 *newsock = NULL;
476 goto out;
477 }
478 (*newsock)->ops = sock->ops;
479
480out:
481 return err;
482}
483
dbd9eea0 484static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
b411b363
PR
485{
486 mm_segment_t oldfs;
487 struct kvec iov = {
488 .iov_base = buf,
489 .iov_len = size,
490 };
491 struct msghdr msg = {
492 .msg_iovlen = 1,
493 .msg_iov = (struct iovec *)&iov,
494 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
495 };
496 int rv;
497
498 oldfs = get_fs();
499 set_fs(KERNEL_DS);
500 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
501 set_fs(oldfs);
502
503 return rv;
504}
505
de0ff338 506static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
b411b363
PR
507{
508 mm_segment_t oldfs;
509 struct kvec iov = {
510 .iov_base = buf,
511 .iov_len = size,
512 };
513 struct msghdr msg = {
514 .msg_iovlen = 1,
515 .msg_iov = (struct iovec *)&iov,
516 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
517 };
518 int rv;
519
520 oldfs = get_fs();
521 set_fs(KERNEL_DS);
522
523 for (;;) {
de0ff338 524 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
b411b363
PR
525 if (rv == size)
526 break;
527
528 /* Note:
529 * ECONNRESET other side closed the connection
530 * ERESTARTSYS (on sock) we got a signal
531 */
532
533 if (rv < 0) {
534 if (rv == -ECONNRESET)
de0ff338 535 conn_info(tconn, "sock was reset by peer\n");
b411b363 536 else if (rv != -ERESTARTSYS)
de0ff338 537 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
538 break;
539 } else if (rv == 0) {
de0ff338 540 conn_info(tconn, "sock was shut down by peer\n");
b411b363
PR
541 break;
542 } else {
543 /* signal came in, or peer/link went down,
544 * after we read a partial message
545 */
546 /* D_ASSERT(signal_pending(current)); */
547 break;
548 }
549 };
550
551 set_fs(oldfs);
552
553 if (rv != size)
bbeb641c 554 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
b411b363
PR
555
556 return rv;
557}
558
5dbf1673
LE
559/* quoting tcp(7):
560 * On individual connections, the socket buffer size must be set prior to the
561 * listen(2) or connect(2) calls in order to have it take effect.
562 * This is our wrapper to do so.
563 */
564static void drbd_setbufsize(struct socket *sock, unsigned int snd,
565 unsigned int rcv)
566{
567 /* open coded SO_SNDBUF, SO_RCVBUF */
568 if (snd) {
569 sock->sk->sk_sndbuf = snd;
570 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
571 }
572 if (rcv) {
573 sock->sk->sk_rcvbuf = rcv;
574 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
575 }
576}
577
/* Actively connect to the peer.  Returns the connected socket, or NULL.
 * "Expected" transient errors (timeout, refused, unreachable, signal)
 * keep us in C_WF_CONNECTION; anything else forces C_DISCONNECTING. */
static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	int err;
	int disconnect_on_error = 1;

	if (!get_net_conf(tconn))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
		SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo =  tconn->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	*  for the outgoing connections.
	*  This is needed for multihomed hosts and to be
	*  able to use lo: interfaces for drbd.
	* Make sure to use 0 as port number, so linux selects
	*  a free one dynamically.
	*/
	memcpy(&src_in6, tconn->net_conf->my_addr,
	       min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      tconn->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock,
				 (struct sockaddr *)tconn->net_conf->peer_addr,
				 tconn->net_conf->peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			conn_err(tconn, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	put_net_conf(tconn);
	return sock;
}
655
7653620d 656static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
b411b363
PR
657{
658 int timeo, err;
659 struct socket *s_estab = NULL, *s_listen;
660 const char *what;
661
7653620d 662 if (!get_net_conf(tconn))
b411b363
PR
663 return NULL;
664
665 what = "sock_create_kern";
7653620d 666 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
b411b363
PR
667 SOCK_STREAM, IPPROTO_TCP, &s_listen);
668 if (err) {
669 s_listen = NULL;
670 goto out;
671 }
672
7653620d 673 timeo = tconn->net_conf->try_connect_int * HZ;
b411b363
PR
674 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
675
676 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
677 s_listen->sk->sk_rcvtimeo = timeo;
678 s_listen->sk->sk_sndtimeo = timeo;
7653620d
PR
679 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
680 tconn->net_conf->rcvbuf_size);
b411b363
PR
681
682 what = "bind before listen";
683 err = s_listen->ops->bind(s_listen,
7653620d
PR
684 (struct sockaddr *) tconn->net_conf->my_addr,
685 tconn->net_conf->my_addr_len);
b411b363
PR
686 if (err < 0)
687 goto out;
688
7653620d 689 err = drbd_accept(&what, s_listen, &s_estab);
b411b363
PR
690
691out:
692 if (s_listen)
693 sock_release(s_listen);
694 if (err < 0) {
695 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
7653620d 696 conn_err(tconn, "%s failed, err = %d\n", what, err);
bbeb641c 697 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
698 }
699 }
7653620d 700 put_net_conf(tconn);
b411b363
PR
701
702 return s_estab;
703}
704
d38e787e 705static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
b411b363 706{
d38e787e 707 struct p_header *h = &tconn->data.sbuf.header;
b411b363 708
d38e787e 709 return _conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
b411b363
PR
710}
711
a25b63f1 712static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
b411b363 713{
a25b63f1 714 struct p_header80 *h = &tconn->data.rbuf.header.h80;
b411b363
PR
715 int rr;
716
dbd9eea0 717 rr = drbd_recv_short(sock, h, sizeof(*h), 0);
b411b363 718
ca9bc12b 719 if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
b411b363
PR
720 return be16_to_cpu(h->command);
721
722 return 0xffff;
723}
724
725/**
726 * drbd_socket_okay() - Free the socket if its connection is not okay
b411b363
PR
727 * @sock: pointer to the pointer to the socket.
728 */
dbd9eea0 729static int drbd_socket_okay(struct socket **sock)
b411b363
PR
730{
731 int rr;
732 char tb[4];
733
734 if (!*sock)
81e84650 735 return false;
b411b363 736
dbd9eea0 737 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
b411b363
PR
738
739 if (rr > 0 || rr == -EAGAIN) {
81e84650 740 return true;
b411b363
PR
741 } else {
742 sock_release(*sock);
743 *sock = NULL;
81e84650 744 return false;
b411b363
PR
745 }
746}
747
907599e0
PR
748static int drbd_connected(int vnr, void *p, void *data)
749{
750 struct drbd_conf *mdev = (struct drbd_conf *)p;
751 int ok = 1;
752
753 atomic_set(&mdev->packet_seq, 0);
754 mdev->peer_seq = 0;
755
8410da8f
PR
756 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
757 &mdev->tconn->cstate_mutex :
758 &mdev->own_state_mutex;
759
907599e0
PR
760 ok &= drbd_send_sync_param(mdev, &mdev->sync_conf);
761 ok &= drbd_send_sizes(mdev, 0, 0);
762 ok &= drbd_send_uuids(mdev);
763 ok &= drbd_send_state(mdev);
764 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
765 clear_bit(RESIZE_PENDING, &mdev->flags);
766
8410da8f 767
907599e0
PR
768 return !ok;
769}
770
b411b363
PR
771/*
772 * return values:
773 * 1 yes, we have a valid connection
774 * 0 oops, did not work out, please try again
775 * -1 peer talks different language,
776 * no point in trying again, please go standalone.
777 * -2 We do not have a network config...
778 */
907599e0 779static int drbd_connect(struct drbd_tconn *tconn)
b411b363
PR
780{
781 struct socket *s, *sock, *msock;
782 int try, h, ok;
783
bbeb641c 784 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
785 return -2;
786
907599e0
PR
787 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
788 tconn->agreed_pro_version = 99;
fd340c12
PR
789 /* agreed_pro_version must be smaller than 100 so we send the old
790 header (h80) in the first packet and in the handshake packet. */
b411b363
PR
791
792 sock = NULL;
793 msock = NULL;
794
795 do {
796 for (try = 0;;) {
797 /* 3 tries, this should take less than a second! */
907599e0 798 s = drbd_try_connect(tconn);
b411b363
PR
799 if (s || ++try >= 3)
800 break;
801 /* give the other side time to call bind() & listen() */
20ee6390 802 schedule_timeout_interruptible(HZ / 10);
b411b363
PR
803 }
804
805 if (s) {
806 if (!sock) {
907599e0 807 drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
b411b363
PR
808 sock = s;
809 s = NULL;
810 } else if (!msock) {
907599e0 811 drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
b411b363
PR
812 msock = s;
813 s = NULL;
814 } else {
907599e0 815 conn_err(tconn, "Logic error in drbd_connect()\n");
b411b363
PR
816 goto out_release_sockets;
817 }
818 }
819
820 if (sock && msock) {
907599e0 821 schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
dbd9eea0
PR
822 ok = drbd_socket_okay(&sock);
823 ok = drbd_socket_okay(&msock) && ok;
b411b363
PR
824 if (ok)
825 break;
826 }
827
828retry:
907599e0 829 s = drbd_wait_for_connect(tconn);
b411b363 830 if (s) {
907599e0 831 try = drbd_recv_fp(tconn, s);
dbd9eea0
PR
832 drbd_socket_okay(&sock);
833 drbd_socket_okay(&msock);
b411b363
PR
834 switch (try) {
835 case P_HAND_SHAKE_S:
836 if (sock) {
907599e0 837 conn_warn(tconn, "initial packet S crossed\n");
b411b363
PR
838 sock_release(sock);
839 }
840 sock = s;
841 break;
842 case P_HAND_SHAKE_M:
843 if (msock) {
907599e0 844 conn_warn(tconn, "initial packet M crossed\n");
b411b363
PR
845 sock_release(msock);
846 }
847 msock = s;
907599e0 848 set_bit(DISCARD_CONCURRENT, &tconn->flags);
b411b363
PR
849 break;
850 default:
907599e0 851 conn_warn(tconn, "Error receiving initial packet\n");
b411b363
PR
852 sock_release(s);
853 if (random32() & 1)
854 goto retry;
855 }
856 }
857
bbeb641c 858 if (tconn->cstate <= C_DISCONNECTING)
b411b363
PR
859 goto out_release_sockets;
860 if (signal_pending(current)) {
861 flush_signals(current);
862 smp_rmb();
907599e0 863 if (get_t_state(&tconn->receiver) == EXITING)
b411b363
PR
864 goto out_release_sockets;
865 }
866
867 if (sock && msock) {
dbd9eea0
PR
868 ok = drbd_socket_okay(&sock);
869 ok = drbd_socket_okay(&msock) && ok;
b411b363
PR
870 if (ok)
871 break;
872 }
873 } while (1);
874
875 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
876 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
877
878 sock->sk->sk_allocation = GFP_NOIO;
879 msock->sk->sk_allocation = GFP_NOIO;
880
881 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
882 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
883
b411b363 884 /* NOT YET ...
907599e0 885 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
b411b363
PR
886 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
887 * first set it to the P_HAND_SHAKE timeout,
888 * which we set to 4x the configured ping_timeout. */
889 sock->sk->sk_sndtimeo =
907599e0 890 sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
b411b363 891
907599e0
PR
892 msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
893 msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
b411b363
PR
894
895 /* we don't want delays.
25985edc 896 * we use TCP_CORK where appropriate, though */
b411b363
PR
897 drbd_tcp_nodelay(sock);
898 drbd_tcp_nodelay(msock);
899
907599e0
PR
900 tconn->data.socket = sock;
901 tconn->meta.socket = msock;
902 tconn->last_received = jiffies;
b411b363 903
907599e0 904 h = drbd_do_handshake(tconn);
b411b363
PR
905 if (h <= 0)
906 return h;
907
907599e0 908 if (tconn->cram_hmac_tfm) {
b411b363 909 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
907599e0 910 switch (drbd_do_auth(tconn)) {
b10d96cb 911 case -1:
907599e0 912 conn_err(tconn, "Authentication of peer failed\n");
b411b363 913 return -1;
b10d96cb 914 case 0:
907599e0 915 conn_err(tconn, "Authentication of peer failed, trying again.\n");
b10d96cb 916 return 0;
b411b363
PR
917 }
918 }
919
bbeb641c 920 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
921 return 0;
922
907599e0 923 sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
b411b363
PR
924 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
925
907599e0 926 drbd_thread_start(&tconn->asender);
b411b363 927
907599e0 928 if (drbd_send_protocol(tconn) == -1)
7e2455c1 929 return -1;
b411b363 930
907599e0 931 return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
b411b363
PR
932
933out_release_sockets:
934 if (sock)
935 sock_release(sock);
936 if (msock)
937 sock_release(msock);
938 return -1;
939}
940
ce243853 941static bool decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
b411b363 942{
fd340c12 943 if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
77351055
PR
944 pi->cmd = be16_to_cpu(h->h80.command);
945 pi->size = be16_to_cpu(h->h80.length);
eefc2f7d 946 pi->vnr = 0;
ca9bc12b 947 } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
77351055
PR
948 pi->cmd = be16_to_cpu(h->h95.command);
949 pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
950 pi->vnr = 0;
02918be2 951 } else {
ce243853 952 conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
004352fa
LE
953 be32_to_cpu(h->h80.magic),
954 be16_to_cpu(h->h80.command),
955 be16_to_cpu(h->h80.length));
81e84650 956 return false;
b411b363 957 }
257d0af6
PR
958 return true;
959}
960
9ba7aa00 961static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
257d0af6 962{
9ba7aa00 963 struct p_header *h = &tconn->data.rbuf.header;
257d0af6
PR
964 int r;
965
9ba7aa00 966 r = drbd_recv(tconn, h, sizeof(*h));
257d0af6
PR
967 if (unlikely(r != sizeof(*h))) {
968 if (!signal_pending(current))
9ba7aa00 969 conn_warn(tconn, "short read expecting header on sock: r=%d\n", r);
257d0af6
PR
970 return false;
971 }
972
9ba7aa00
PR
973 r = decode_header(tconn, h, pi);
974 tconn->last_received = jiffies;
b411b363 975
257d0af6 976 return r;
b411b363
PR
977}
978
2451fc3b 979static void drbd_flush(struct drbd_conf *mdev)
b411b363
PR
980{
981 int rv;
982
983 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a 984 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
dd3932ed 985 NULL);
b411b363
PR
986 if (rv) {
987 dev_err(DEV, "local disk flush failed with status %d\n", rv);
988 /* would rather check on EOPNOTSUPP, but that is not reliable.
989 * don't try again for ANY return value != 0
990 * if (rv == -EOPNOTSUPP) */
991 drbd_bump_write_ordering(mdev, WO_drain_io);
992 }
993 put_ldev(mdev);
994 }
b411b363
PR
995}
996
997/**
998 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
999 * @mdev: DRBD device.
1000 * @epoch: Epoch object.
1001 * @ev: Epoch event.
1002 */
/*
 * Applies @ev to @epoch under epoch_lock.  An epoch is "finished" once it has
 * a barrier number, a non-zero size and no more active requests; it is then
 * either destroyed (if a newer current_epoch exists) or recycled in place.
 * Walks forward through the epoch list because finishing one epoch may allow
 * the next one to finish as well.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		/* EV_CLEANUP is a modifier bit, not an event of its own. */
		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
			if (!(ev & EV_CLEANUP)) {
				/* drop the lock while sending on the network */
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			}
			dec_unacked(mdev);

			if (mdev->current_epoch != epoch) {
				/* an older epoch: unlink and free it, then
				 * continue with its successor */
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				mdev->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				/* the newest epoch: reset for reuse */
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
				wake_up(&mdev->ee_wait);
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&mdev->epoch_lock);

	return rv;
}
1068
1069/**
1070 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1071 * @mdev: DRBD device.
1072 * @wo: Write ordering method to try.
1073 */
/*
 * Write ordering can only ever be weakened (min of old and requested), and is
 * further capped by the per-disk no_disk_flush / no_disk_drain settings.
 * Logs the effective method whenever it changes (or flush is re-requested).
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = mdev->write_ordering;
	wo = min(pwo, wo);	/* never upgrade the ordering guarantee */
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
		wo = WO_drain_io;
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
		wo = WO_none;
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}
1093
45bb912b
LE
1094/**
1095 * drbd_submit_ee()
1096 * @mdev: DRBD device.
db830c46 1097 * @peer_req: peer request
45bb912b 1098 * @rw: flag field, see bio->bi_rw
10f6d992
LE
1099 *
1100 * May spread the pages to multiple bios,
1101 * depending on bio_add_page restrictions.
1102 *
1103 * Returns 0 if all bios have been submitted,
1104 * -ENOMEM if we could not allocate enough bios,
1105 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1106 * single page to an empty bio (which should never happen and likely indicates
1107 * that the lower level IO stack is in some way broken). This has been observed
1108 * on certain Xen deployments.
45bb912b
LE
1109 */
1110/* TODO allocate from our own bio_set. */
/* TODO allocate from our own bio_set. */
/*
 * Submit the page chain of @peer_req to the local backing device, spreading
 * it over as many bios as bio_add_page() restrictions require (see the
 * kernel-doc comment above for return values).
 */
int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
		   const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;	/* singly linked list of allocated bios */
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;	/* bytes still to be placed into bios */
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	/* In most cases, we will only need one bio. But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio. */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_endio_sec;

	/* prepend to the list; submission order below does not matter */
	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			/* current bio is full; start another one at this page */
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(page == NULL);
	D_ASSERT(ds == 0);

	/* completion handler decrements this; last bio completes the request */
	atomic_set(&peer_req->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(mdev, fault_type, bio);
	} while (bios);
	return 0;

fail:
	/* nothing was submitted yet; just release what we allocated */
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}
1185
/*
 * Remove @peer_req's interval from the write_requests tree (conflict
 * detection bookkeeping) and wake anybody waiting on it.
 * Caller must hold the request lock.
 */
static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&mdev->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
	if (i->waiting)
		wake_up(&mdev->misc_wait);
}
1198
d8763023
AG
/*
 * receive_Barrier() - Handle a P_BARRIER packet: close the current epoch.
 *
 * Records the peer's barrier number on the current epoch, tries to finish it,
 * and prepares a fresh epoch for subsequent writes.  How we wait for the old
 * epoch depends on the configured write ordering method.
 * Returns true on success, false on a fatal inconsistency.
 */
static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
			   unsigned int data_size)
{
	int rv;
	struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
	struct drbd_epoch *epoch;

	/* matched by dec_unacked() once the barrier ack went out */
	inc_unacked(mdev);

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return true;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		/* drain all in-flight writes of the old epoch, then flush */
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		drbd_flush(mdev);

		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		/* allocation failed (or epoch already empty): wait until the
		 * current epoch drains completely and reuse it */
		epoch = mdev->current_epoch;
		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

		D_ASSERT(atomic_read(&epoch->active) == 0);
		D_ASSERT(epoch->flags == 0);

		return true;
	default:
		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
		return false;
	}

	/* we got here only with a freshly allocated epoch */
	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
		mdev->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&mdev->epoch_lock);

	return true;
}
1270
1271/* used from receive_RSDataReply (recv_resync_read)
1272 * and from receive_Data */
f6ffca9f
AG
/*
 * read_in_block() - Receive one data block from the peer into a peer request.
 * @mdev:      DRBD device.
 * @id:        block id the peer used for this block.
 * @sector:    start sector on the backing device.
 * @data_size: payload size in bytes, including an optional leading digest.
 *
 * Reads the (optional) integrity digest, validates size/alignment/capacity,
 * allocates a peer request with a page chain and fills it from the socket.
 * Returns the peer request, or NULL on any receive, validation or digest
 * failure (everything allocated here is freed again in that case).
 */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
{
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, rr;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;
	unsigned long *data;

	/* digest only present with protocol >= 87 and integrity checking on */
	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev->tconn, dig_in, dgs);
		if (rr != dgs) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data digest: read %d expected %d\n",
					rr, dgs);
			return NULL;
		}
	}

	data_size -= dgs;

	/* sanity-check what the peer claims to send */
	if (!expect(data_size != 0))
		return NULL;
	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust out peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		rr = drbd_recv(mdev->tconn, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (rr != len) {
			drbd_free_ee(mdev, peer_req);
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data: read %d expected %d\n",
				rr, len);
			return NULL;
		}
		ds -= rr;
	}

	if (dgs) {
		/* verify the received payload against the peer's digest */
		drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_bcast_ee(mdev, "digest failed",
					dgs, dig_in, dig_vv, peer_req);
			drbd_free_ee(mdev, peer_req);
			return NULL;
		}
	}
	mdev->recv_cnt += data_size>>9;
	return peer_req;
}
1360
1361/* drbd_drain_block() just takes a data block
1362 * out of the socket input buffer, and discards it.
1363 */
/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 * Returns 1 when all @data_size bytes were drained, 0 on a short read.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
{
	struct page *page;
	int rr, rv = 1;
	void *data;

	if (!data_size)
		return true;

	/* one scratch page is reused for every chunk we throw away */
	page = drbd_pp_alloc(mdev, 1, 1);

	data = kmap(page);
	while (data_size) {
		rr = drbd_recv(mdev->tconn, data, min_t(int, data_size, PAGE_SIZE));
		if (rr != min_t(int, data_size, PAGE_SIZE)) {
			rv = 0;
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data: read %d expected %d\n",
					rr, min_t(int, data_size, PAGE_SIZE));
			break;
		}
		data_size -= rr;
	}
	kunmap(page);
	drbd_pp_free(mdev, page, 0);
	return rv;
}
1392
/*
 * recv_dless_read() - Receive a "diskless read" reply directly into a request's bio.
 *
 * The payload (optionally preceded by an integrity digest) is copied straight
 * into the pages of @req->master_bio.  Returns 1 on success, 0 on short read
 * or digest mismatch.
 */
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec *bvec;
	struct bio *bio;
	int dgs, rr, i, expect;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;

	/* digest only present with protocol >= 87 and integrity checking on */
	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev->tconn, dig_in, dgs);
		if (rr != dgs) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data reply digest: read %d expected %d\n",
					rr, dgs);
			return 0;
		}
	}

	data_size -= dgs;

	/* optimistically update recv_cnt. if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		expect = min_t(int, data_size, bvec->bv_len);
		rr = drbd_recv(mdev->tconn,
			     kmap(bvec->bv_page)+bvec->bv_offset,
			     expect);
		kunmap(bvec->bv_page);
		if (rr != expect) {
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data reply: "
					"read %d expected %d\n",
					rr, expect);
			return 0;
		}
		data_size -= rr;
	}

	if (dgs) {
		drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
			return 0;
		}
	}

	D_ASSERT(data_size == 0);
	return 1;
}
1452
1453/* e_end_resync_block() is called via
1454 * drbd_process_done_ee() by asender only */
/* e_end_resync_block() is called via
 * drbd_process_done_ee() by asender only.
 * Completion callback for a resync write: acks on success (marking the
 * range in sync), negatively acks and records the failure otherwise.
 */
static int e_end_resync_block(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;
	int ok;

	/* resync requests never take part in conflict detection */
	D_ASSERT(drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, peer_req->i.size);
		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, peer_req->i.size);

		ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
	}
	/* matches the inc_unacked() in recv_resync_read() */
	dec_unacked(mdev);

	return ok;
}
1477
/*
 * recv_resync_read() - Receive resync data and submit it to the local disk.
 *
 * Reads @data_size bytes into a peer request, queues it on sync_ee and
 * submits it as a write.  Caller holds a local-disk reference which is
 * released either here on failure or later in the endio path.
 * Returns true when submitted, false otherwise.
 */
static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
{
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
	if (!peer_req)
		goto fail;

	dec_rs_pending(mdev);

	inc_unacked(mdev);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* account the write for resync throttling decisions */
	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_ee(mdev, peer_req);
fail:
	put_ldev(mdev);
	return false;
}
1513
/*
 * find_request() - Resolve a peer-supplied block id back to our request object.
 * @root:       interval tree to validate the request against.
 * @id:         block id as sent by the peer (our request pointer, round-tripped).
 * @sector:     expected start sector.
 * @missing_ok: suppress the error message when the request is legitimately gone.
 * @func:       caller name, for the error message.
 *
 * The pointer is only trusted after we verified it is really registered in
 * @root at @sector (and is a local request).  Returns NULL if not found.
 */
static struct drbd_request *
find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
{
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;
	if (!missing_ok) {
		dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);
	}
	return NULL;
}
1530
d8763023
AG
/*
 * receive_DataReply() - Handle P_DATA_REPLY: data for one of our read requests.
 *
 * Looks up the originating request via the echoed block id and copies the
 * payload into its bio.  Returns false when the request cannot be found or
 * the payload could not be received.
 */
static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int data_size)
{
	struct drbd_request *req;
	sector_t sector;
	int ok;
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (unlikely(!req))
		return false;

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	ok = recv_dless_read(mdev, req, sector, data_size);

	if (ok)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return ok;
}
1560
d8763023
AG
/*
 * receive_RSDataReply() - Handle P_RS_DATA_REPLY: resync data from the peer.
 *
 * With a usable local disk the data is written via recv_resync_read();
 * otherwise the payload is drained from the socket and negatively acked.
 * Either way the received sectors are accounted for resync pacing.
 */
static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int data_size)
{
	sector_t sector;
	int ok;
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_endio_sec. */
		ok = recv_resync_read(mdev, sector, data_size);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		ok = drbd_drain_block(mdev, data_size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
	}

	atomic_add(data_size >> 9, &mdev->rs_sect_in);

	return ok;
}
1589
1590/* e_end_block() is called via drbd_process_done_ee().
1591 * this means this function only runs in the asender thread
1592 */
/* e_end_block() is called via drbd_process_done_ee().
 * this means this function only runs in the asender thread.
 * Completion callback for a mirrored write: sends the protocol-C ack
 * (positive or negative), cleans up conflict-detection state, and drops
 * the epoch reference.
 */
static int e_end_block(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;
	int ok = 1, pcmd;

	if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			/* during resync, a successful write may also mark
			 * the range in sync (P_RS_WRITE_ACK) */
			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
				mdev->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			ok &= drbd_send_ack(mdev, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(mdev, sector, peer_req->i.size);
		} else {
			ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */
		}
		dec_unacked(mdev);
	}
	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
		D_ASSERT(!drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(mdev, peer_req);
		spin_unlock_irq(&mdev->tconn->req_lock);
	} else
		D_ASSERT(drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

	return ok;
}
1630
/*
 * Completion callback (asender) for a write that lost concurrent-write
 * arbitration: send P_DISCARD_ACK and drop the conflict-detection interval.
 * Only meaningful with protocol C / two primaries.
 */
static int e_send_discard_ack(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	struct drbd_conf *mdev = w->mdev;
	int ok = 1;

	D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
	ok = drbd_send_ack(mdev, P_DISCARD_ACK, peer_req);

	spin_lock_irq(&mdev->tconn->req_lock);
	D_ASSERT(!drbd_interval_empty(&peer_req->i));
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* matches the inc_unacked() in receive_Data() */
	dec_unacked(mdev);

	return ok;
}
1649
3e394da1
AG
1650static bool seq_greater(u32 a, u32 b)
1651{
1652 /*
1653 * We assume 32-bit wrap-around here.
1654 * For 24-bit wrap-around, we would have to shift:
1655 * a <<= 8; b <<= 8;
1656 */
1657 return (s32)a - (s32)b > 0;
1658}
1659
1660static u32 seq_max(u32 a, u32 b)
1661{
1662 return seq_greater(a, b) ? a : b;
1663}
1664
/*
 * update_peer_seq() - Advance mdev->peer_seq to @peer_seq if that is newer.
 *
 * Wakes up waiters in drbd_wait_peer_seq() whenever the incoming sequence
 * number differs from what we had recorded.
 */
static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
{
	unsigned int old_peer_seq;

	spin_lock(&mdev->peer_seq_lock);
	old_peer_seq = mdev->peer_seq;
	/* seq_max() keeps us monotonic across 32-bit wrap-around */
	mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
	spin_unlock(&mdev->peer_seq_lock);
	if (old_peer_seq != peer_seq)
		wake_up(&mdev->seq_wait);
}
1676
b411b363
PR
1677/* Called from receive_Data.
1678 * Synchronize packets on sock with packets on msock.
1679 *
1680 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1681 * packet traveling on msock, they are still processed in the order they have
1682 * been sent.
1683 *
1684 * Note: we don't care for Ack packets overtaking P_DATA packets.
1685 *
1686 * In case packet_seq is larger than mdev->peer_seq number, there are
1687 * outstanding packets on the msock. We wait for them to arrive.
1688 * In case we are the logically next packet, we update mdev->peer_seq
1689 * ourselves. Correctly handles 32bit wrap around.
1690 *
1691 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1692 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1693 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1694 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1695 *
1696 * returns 0 if we may process the packet,
1697 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
/*
 * Wait until @packet_seq is the logically next packet (see the big comment
 * above for the full msock/sock ordering rationale).
 * Returns 0 when the packet may be processed, -ERESTARTSYS on signal,
 * -ETIMEDOUT when no sequence update arrived within 30 seconds.
 */
static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
{
	DEFINE_WAIT(wait);
	unsigned int p_seq;
	long timeout;
	int ret = 0;
	spin_lock(&mdev->peer_seq_lock);
	for (;;) {
		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
		/* we are next once packet_seq <= peer_seq + 1 (wrap-safe) */
		if (!seq_greater(packet_seq, mdev->peer_seq + 1))
			break;
		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}
		p_seq = mdev->peer_seq;
		spin_unlock(&mdev->peer_seq_lock);
		timeout = schedule_timeout(30*HZ);
		spin_lock(&mdev->peer_seq_lock);
		/* timed out without any progress on peer_seq: give up */
		if (timeout == 0 && p_seq == mdev->peer_seq) {
			ret = -ETIMEDOUT;
			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
			break;
		}
	}
	finish_wait(&mdev->seq_wait, &wait);
	/* we are the expected next packet: advance the sequence ourselves */
	if (mdev->peer_seq+1 == packet_seq)
		mdev->peer_seq++;
	spin_unlock(&mdev->peer_seq_lock);
	return ret;
}
1729
688593c5
LE
1730/* see also bio_flags_to_wire()
1731 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1732 * flags and back. We may replicate to other kernel versions. */
1733static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1734{
688593c5
LE
1735 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1736 (dpf & DP_FUA ? REQ_FUA : 0) |
1737 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1738 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1739}
1740
b411b363 1741/* mirrored write */
d8763023
AG
/* mirrored write */
/*
 * receive_Data() - Handle P_DATA: a mirrored write from the peer.
 *
 * Receives the payload into a peer request, attaches it to the current
 * epoch, performs concurrent-write conflict detection when both nodes are
 * primary, acks according to the wire protocol, and submits the write to
 * the local disk.  Returns true on success / clean discard, false on error
 * (which triggers a reconnect).
 */
static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
			unsigned int data_size)
{
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = &mdev->tconn->data.rbuf.data;
	int rw = WRITE;
	u32 dp_flags;

	if (!get_ldev(mdev)) {
		/* no local disk: keep the peer sequence moving, nack the
		 * write, still count it against the current epoch, and
		 * throw the payload away */
		spin_lock(&mdev->peer_seq_lock);
		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
			mdev->peer_seq++;
		spin_unlock(&mdev->peer_seq_lock);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
		atomic_inc(&mdev->current_epoch->epoch_size);
		return drbd_drain_block(mdev, data_size);
	}

	/* get_ldev(mdev) successful.
	 * Corresponding put_ldev done either below (on various errors),
	 * or in drbd_endio_sec, if we successfully submit the data at
	 * the end of this function. */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(mdev, p->block_id, sector, data_size);
	if (!peer_req) {
		put_ldev(mdev);
		return false;
	}

	peer_req->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(mdev, dp_flags);

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	/* register this write with the current epoch */
	spin_lock(&mdev->epoch_lock);
	peer_req->epoch = mdev->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&mdev->epoch_lock);

	/* I'm the receiver, I do hold a net_cnt reference. */
	if (!mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
	} else {
		/* don't get the req_lock yet,
		 * we may sleep in drbd_wait_peer_seq */
		const int size = peer_req->i.size;
		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
		DEFINE_WAIT(wait);
		int first;

		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);

		/* conflict detection and handling:
		 * 1. wait on the sequence number,
		 *    in case this data packet overtook ACK packets.
		 * 2. check for conflicting write requests.
		 *
		 * Note: for two_primaries, we are protocol C,
		 * so there cannot be any request that is DONE
		 * but still on the transfer log.
		 *
		 * if no conflicting request is found:
		 *    submit.
		 *
		 * if any conflicting request is found
		 * that has not yet been acked,
		 * AND I have the "discard concurrent writes" flag:
		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
		 *
		 * if any conflicting request is found:
		 *	 block the receiver, waiting on misc_wait
		 *	 until no more conflicting requests are there,
		 *	 or we get interrupted (disconnect).
		 *
		 *	 we do not just write after local io completion of those
		 *	 requests, but only after req is done completely, i.e.
		 *	 we wait for the P_DISCARD_ACK to arrive!
		 *
		 *	 then proceed normally, i.e. submit.
		 */
		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
			goto out_interrupted;

		spin_lock_irq(&mdev->tconn->req_lock);

		first = 1;
		for (;;) {
			struct drbd_interval *i;
			int have_unacked = 0;
			int have_conflict = 0;
			prepare_to_wait(&mdev->misc_wait, &wait,
				TASK_INTERRUPTIBLE);

			i = drbd_find_overlap(&mdev->write_requests, sector, size);
			if (i) {
				/* only ALERT on first iteration,
				 * we may be woken up early... */
				if (first)
					dev_alert(DEV, "%s[%u] Concurrent %s write detected!"
					      " new: %llus +%u; pending: %llus +%u\n",
					      current->comm, current->pid,
					      i->local ? "local" : "remote",
					      (unsigned long long)sector, size,
					      (unsigned long long)i->sector, i->size);

				if (i->local) {
					struct drbd_request *req2;

					req2 = container_of(i, struct drbd_request, i);
					if (req2->rq_state & RQ_NET_PENDING)
						++have_unacked;
				}
				++have_conflict;
			}
			if (!have_conflict)
				break;

			/* Discard Ack only for the _first_ iteration */
			if (first && discard && have_unacked) {
				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
				     (unsigned long long)sector);
				inc_unacked(mdev);
				peer_req->w.cb = e_send_discard_ack;
				list_add_tail(&peer_req->w.list, &mdev->done_ee);

				spin_unlock_irq(&mdev->tconn->req_lock);

				/* we could probably send that P_DISCARD_ACK ourselves,
				 * but I don't like the receiver using the msock */

				put_ldev(mdev);
				wake_asender(mdev->tconn);
				finish_wait(&mdev->misc_wait, &wait);
				return true;
			}

			if (signal_pending(current)) {
				spin_unlock_irq(&mdev->tconn->req_lock);
				finish_wait(&mdev->misc_wait, &wait);
				goto out_interrupted;
			}

			/* Indicate to wake up mdev->misc_wait upon completion. */
			i->waiting = true;

			spin_unlock_irq(&mdev->tconn->req_lock);
			if (first) {
				first = 0;
				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
				     "sec=%llus\n", (unsigned long long)sector);
			} else if (discard) {
				/* we had none on the first iteration.
				 * there must be none now. */
				D_ASSERT(have_unacked == 0);
			}
			schedule();
			spin_lock_irq(&mdev->tconn->req_lock);
		}
		finish_wait(&mdev->misc_wait, &wait);

		/* no conflicts left: register ourselves for conflict detection */
		drbd_insert_interval(&mdev->write_requests, &peer_req->i);
	}

	list_add(&peer_req->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	switch (mdev->tconn->net_conf->wire_protocol) {
	case DRBD_PROT_C:
		inc_unacked(mdev);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
		break;
	case DRBD_PROT_B:
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(mdev, P_RECV_ACK, peer_req);
		break;
	case DRBD_PROT_A:
		/* nothing to do */
		break;
	}

	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(mdev, peer_req->i.sector);
	}

	if (drbd_submit_ee(mdev, peer_req, rw, DRBD_FAULT_DT_WR) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(mdev, peer_req->i.sector);

out_interrupted:
	/* undo the epoch registration and free everything */
	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
	put_ldev(mdev);
	drbd_free_ee(mdev, peer_req);
	return false;
}
1957
0f0601f4
LE
1958/* We may throttle resync, if the lower device seems to be busy,
1959 * and current sync rate is above c_min_rate.
1960 *
1961 * To decide whether or not the lower device is busy, we use a scheme similar
1962 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
1963 * (more than 64 sectors) of activity we cannot account for with our own resync
1964 * activity, it obviously is "busy".
1965 *
1966 * The current sync rate used here uses only the most recent two step marks,
1967 * to have a short time average so we can react faster.
1968 */
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
{
	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	struct lc_element *tmp;
	int curr_events;
	int throttle = 0;

	/* feature disabled? (c_min_rate == 0 means "never throttle") */
	if (mdev->sync_conf.c_min_rate == 0)
		return 0;

	/* Look up the resync extent covering this sector; if application IO
	 * already waits for it (BME_PRIORITY), do not throttle at all. */
	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
	if (tmp) {
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
			return 0;
		}
		/* Do not slow down if app IO is already waiting for this extent */
	}
	spin_unlock_irq(&mdev->al_lock);

	/* total sectors read+written on the backing device, minus what our
	 * own resync submitted (rs_sect_ev): the remainder is "other" IO */
	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
			atomic_read(&mdev->rs_sect_ev);

	/* "significant" foreign activity: more than 64 sectors we cannot
	 * account for since the last check */
	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;
		int i;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		/* remaining work depends on whether this is a verify run
		 * or a regular resync */
		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
		else
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = mdev->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		/* only throttle while we are still above the configured
		 * minimum sync rate */
		if (dbdt > mdev->sync_conf.c_min_rate)
			throttle = 1;
	}
	return throttle;
}
2023
2024
d8763023
AG
/* Handle an incoming read-type request from the peer: P_DATA_REQUEST,
 * P_RS_DATA_REQUEST, P_CSUM_RS_REQUEST, P_OV_REQUEST or P_OV_REPLY.
 * Allocates a peer request, queues it on read_ee and submits local READ IO.
 * Returns true on success, false to trigger a re-connect. */
static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int digest_size)
{
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	int size, verb;
	unsigned int fault_type;
	struct p_block_req *p = &mdev->tconn->data.rbuf.block_req;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	/* sanity check: positive, 512-byte aligned, bounded size */
	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return false;
	}
	/* request must not reach past the end of the device */
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return false;
	}

	/* No usable local data: negatively acknowledge, then drain any
	 * payload that may still be on the wire. */
	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
		verb = 1;
		switch (cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
			break;
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
			break;
		case P_OV_REPLY:
			verb = 0;
			dec_rs_pending(mdev);
			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
				cmdname(cmd));
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possibly payload */
		return drbd_drain_block(mdev, digest_size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
	if (!peer_req) {
		put_ldev(mdev);
		return false;
	}

	/* Select the completion callback and fault-injection type per packet. */
	switch (cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */
		goto submit;

	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		/* digest payload follows the request header on the wire */
		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = digest_size;
		/* digest buffer lives directly behind the digest_info struct */
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)
			goto out_free_e;

		if (cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		} else if (cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &mdev->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(mdev);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		/* first P_OV_REQUEST of a verify run (proto >= 90):
		 * initialize the online-verify bookkeeping */
		if (mdev->ov_start_sector == ~(sector_t)0 &&
		    mdev->tconn->agreed_pro_version >= 90) {
			unsigned long now = jiffies;
			int i;
			mdev->ov_start_sector = sector;
			mdev->ov_position = sector;
			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
			mdev->rs_total = mdev->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				mdev->rs_mark_left[i] = mdev->ov_left;
				mdev->rs_mark_time[i] = now;
			}
			dev_info(DEV, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;
		break;

	default:
		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
		    cmdname(cmd));
		fault_type = DRBD_FAULT_MAX;
		goto out_free_e;
	}

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time. For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * on request through. The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */
	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
		schedule_timeout_uninterruptible(HZ/10);
	if (drbd_rs_begin_io(mdev, sector))
		goto out_free_e;

submit_for_resync:
	/* account submitted resync sectors for the throttling heuristic */
	atomic_add(size >> 9, &mdev->rs_sect_ev);

submit:
	inc_unacked(mdev);
	spin_lock_irq(&mdev->tconn->req_lock);
	list_add_tail(&peer_req->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (drbd_submit_ee(mdev, peer_req, READ, fault_type) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

out_free_e:
	put_ldev(mdev);
	drbd_free_ee(mdev, peer_req);
	return false;
}
2211
/* After-split-brain auto-recovery when neither node is primary.
 * Returns  1 to discard the peer's modifications (ASB_DISCARD_REMOTE case),
 *         -1 to discard our own (ASB_DISCARD_LOCAL case),
 *       -100 when no automatic decision could be made. */
static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
{
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;

	/* lowest bit of the bitmap UUID on each side — used by the
	 * younger/older-primary strategies below.
	 * NOTE(review): presumably flags which side touched its data last;
	 * confirm against the UUID management code. */
	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
	peer = mdev->p_uuid[UI_BITMAP] & 1;

	/* amount of changed data on each side, for the *-changes strategies */
	ch_peer = mdev->p_uuid[UI_SIZE];
	ch_self = mdev->comm_bm_set;

	switch (mdev->tconn->net_conf->after_sb_0p) {
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
		/* these policies require at least one primary; invalid here */
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
			rv = -1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = 1;
			break;
		}
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
			rv = 1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = -1;
			break;
		}
		/* Else fall through to one of the other strategies... */
		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
		     "Using discard-least-changes instead\n");
		/* fall through */
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			/* tie: break it with the DISCARD_CONCURRENT coin flip */
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
			break;
		} else {
			if (ch_peer == 0) { rv = 1; break; }
			if (ch_self == 0) { rv = -1; break; }
		}
		/* when we got here as the younger/older-pri fallback,
		 * continue into discard-least-changes */
		if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
		/* fall through */
	case ASB_DISCARD_LEAST_CHG:
		if (ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv = 1;
		else /* ( ch_self == ch_peer ) */
			/* Well, then use something else. */
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
		break;
	case ASB_DISCARD_LOCAL:
		rv = -1;
		break;
	case ASB_DISCARD_REMOTE:
		rv = 1;
	}

	return rv;
}
2283
2284static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2285{
6184ea21 2286 int hg, rv = -100;
b411b363 2287
89e58e75 2288 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2289 case ASB_DISCARD_YOUNGER_PRI:
2290 case ASB_DISCARD_OLDER_PRI:
2291 case ASB_DISCARD_LEAST_CHG:
2292 case ASB_DISCARD_LOCAL:
2293 case ASB_DISCARD_REMOTE:
2294 dev_err(DEV, "Configuration error.\n");
2295 break;
2296 case ASB_DISCONNECT:
2297 break;
2298 case ASB_CONSENSUS:
2299 hg = drbd_asb_recover_0p(mdev);
2300 if (hg == -1 && mdev->state.role == R_SECONDARY)
2301 rv = hg;
2302 if (hg == 1 && mdev->state.role == R_PRIMARY)
2303 rv = hg;
2304 break;
2305 case ASB_VIOLENTLY:
2306 rv = drbd_asb_recover_0p(mdev);
2307 break;
2308 case ASB_DISCARD_SECONDARY:
2309 return mdev->state.role == R_PRIMARY ? 1 : -1;
2310 case ASB_CALL_HELPER:
2311 hg = drbd_asb_recover_0p(mdev);
2312 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2313 enum drbd_state_rv rv2;
2314
2315 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2316 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2317 * we might be here in C_WF_REPORT_PARAMS which is transient.
2318 * we do not need to wait for the after state change work either. */
bb437946
AG
2319 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2320 if (rv2 != SS_SUCCESS) {
b411b363
PR
2321 drbd_khelper(mdev, "pri-lost-after-sb");
2322 } else {
2323 dev_warn(DEV, "Successfully gave up primary role.\n");
2324 rv = hg;
2325 }
2326 } else
2327 rv = hg;
2328 }
2329
2330 return rv;
2331}
2332
2333static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2334{
6184ea21 2335 int hg, rv = -100;
b411b363 2336
89e58e75 2337 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2338 case ASB_DISCARD_YOUNGER_PRI:
2339 case ASB_DISCARD_OLDER_PRI:
2340 case ASB_DISCARD_LEAST_CHG:
2341 case ASB_DISCARD_LOCAL:
2342 case ASB_DISCARD_REMOTE:
2343 case ASB_CONSENSUS:
2344 case ASB_DISCARD_SECONDARY:
2345 dev_err(DEV, "Configuration error.\n");
2346 break;
2347 case ASB_VIOLENTLY:
2348 rv = drbd_asb_recover_0p(mdev);
2349 break;
2350 case ASB_DISCONNECT:
2351 break;
2352 case ASB_CALL_HELPER:
2353 hg = drbd_asb_recover_0p(mdev);
2354 if (hg == -1) {
bb437946
AG
2355 enum drbd_state_rv rv2;
2356
b411b363
PR
2357 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2358 * we might be here in C_WF_REPORT_PARAMS which is transient.
2359 * we do not need to wait for the after state change work either. */
bb437946
AG
2360 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2361 if (rv2 != SS_SUCCESS) {
b411b363
PR
2362 drbd_khelper(mdev, "pri-lost-after-sb");
2363 } else {
2364 dev_warn(DEV, "Successfully gave up primary role.\n");
2365 rv = hg;
2366 }
2367 } else
2368 rv = hg;
2369 }
2370
2371 return rv;
2372}
2373
2374static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2375 u64 bits, u64 flags)
2376{
2377 if (!uuid) {
2378 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2379 return;
2380 }
2381 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2382 text,
2383 (unsigned long long)uuid[UI_CURRENT],
2384 (unsigned long long)uuid[UI_BITMAP],
2385 (unsigned long long)uuid[UI_HISTORY_START],
2386 (unsigned long long)uuid[UI_HISTORY_END],
2387 (unsigned long long)bits,
2388 (unsigned long long)flags);
2389}
2390
2391/*
2392 100 after split brain try auto recover
2393 2 C_SYNC_SOURCE set BitMap
2394 1 C_SYNC_SOURCE use BitMap
2395 0 no Sync
2396 -1 C_SYNC_TARGET use BitMap
2397 -2 C_SYNC_TARGET set BitMap
2398 -100 after split brain, disconnect
2399-1000 unrelated data
4a23f264
PR
2400-1091 requires proto 91
2401-1096 requires proto 96
b411b363
PR
2402 */
/* Compare our UUID set against the peer's; see the return-code table in
 * the comment directly above. Sets *rule_nr to the rule that decided.
 * May correct our own or the peer's in-memory UUIDs when it detects a
 * missed "resync finished" / lost P_SYNC_UUID event. */
static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
{
	u64 self, peer;
	int i, j;

	/* compare current-generation UUIDs; lowest bit is masked off */
	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);

	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	if (self == peer) {
		int rct, dc; /* roles at crash time */

		/* peer has no bitmap UUID but we do: we were sync source and
		 * possibly missed the "resync finished" event */
		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
				drbd_uuid_set_bm(mdev, 0UL);

				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
				*rule_nr = 34;
			} else {
				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

		/* mirror image of the case above: we were sync target */
		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				/* rotate the peer's in-memory UUIDs as if it had
				 * finished the resync properly */
				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
				mdev->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
				*rule_nr = 35;
			} else {
				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
			(mdev->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0;
		case 1: /* self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri && peer_pri */ return -1;
		case 3: /* self_pri && peer_pri */
			/* both were primary at crash time: coin flip */
			dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
			return dc ? -1 : 1;
		}
	}

	*rule_nr = 50;
	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer)
		return -1;

	*rule_nr = 51;
	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		/* pre-96 peers compare history entries; newer ones encode the
		 * relation via UUID_NEW_BM_OFFSET */
		if (mdev->tconn->agreed_pro_version < 96 ?
		    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get though. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];

			dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
			drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);

			return -1;
		}
	}

	*rule_nr = 60;
	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		peer = mdev->p_uuid[i] & ~((u64)1);
		if (self == peer)
			return -2;
	}

	*rule_nr = 70;
	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
	if (self == peer)
		return 1;

	*rule_nr = 71;
	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (mdev->tconn->agreed_pro_version < 96 ?
		    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get though. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);

			dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);

			return 1;
		}
	}


	*rule_nr = 80;
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = mdev->ldev->md.uuid[i] & ~((u64)1);
		if (self == peer)
			return 2;
	}

	*rule_nr = 90;
	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer && self != ((u64)0))
		return 100;

	*rule_nr = 100;
	/* any history/history match still proves common ancestry, but the
	 * split happened too long ago: disconnect */
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = mdev->ldev->md.uuid[i] & ~((u64)1);
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
			peer = mdev->p_uuid[j] & ~((u64)1);
			if (self == peer)
				return -100;
		}
	}

	return -1000;
}
2582
2583/* drbd_sync_handshake() returns the new conn state on success, or
2584 CONN_MASK (-1) on failure.
2585 */
static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
					   enum drbd_disk_state peer_disk) __must_hold(local)
{
	/* hg: verdict from drbd_uuid_compare() — >0 we become sync source,
	 * <0 sync target, |hg| >= 2 full sync, +-100 split brain (see the
	 * table above drbd_uuid_compare()) */
	int hg, rule_nr;
	enum drbd_conns rv = C_MASK;
	enum drbd_disk_state mydisk;

	mydisk = mdev->state.disk;
	if (mydisk == D_NEGOTIATING)
		mydisk = mdev->new_state_tmp.disk;

	dev_info(DEV, "drbd_sync_handshake:\n");
	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);

	hg = drbd_uuid_compare(mdev, &rule_nr);

	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);

	if (hg == -1000) {
		dev_alert(DEV, "Unrelated data, aborting!\n");
		return C_MASK;
	}
	/* -1091/-1096: decision requires a newer protocol on both sides */
	if (hg < -1000) {
		dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
		return C_MASK;
	}

	/* exactly one side inconsistent: the consistent side becomes source,
	 * regardless of the UUID verdict */
	if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
	    (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
		int f = (hg == -100) || abs(hg) == 2;
		hg = mydisk > D_INCONSISTENT ? 1 : -1;
		if (f)
			hg = hg*2; /* force a full sync */
		dev_info(DEV, "Becoming sync %s due to disk states.\n",
		     hg > 0 ? "source" : "target");
	}

	if (abs(hg) == 100)
		drbd_khelper(mdev, "initial-split-brain");

	/* try the configured automatic split-brain recovery policies */
	if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
		int pcount = (mdev->state.role == R_PRIMARY)
			   + (peer_role == R_PRIMARY);
		int forced = (hg == -100);

		switch (pcount) {
		case 0:
			hg = drbd_asb_recover_0p(mdev);
			break;
		case 1:
			hg = drbd_asb_recover_1p(mdev);
			break;
		case 2:
			hg = drbd_asb_recover_2p(mdev);
			break;
		}
		if (abs(hg) < 100) {
			dev_warn(DEV, "Split-Brain detected, %d primaries, "
			     "automatically solved. Sync from %s node\n",
			     pcount, (hg < 0) ? "peer" : "this");
			if (forced) {
				dev_warn(DEV, "Doing a full sync, since"
				     " UUIDs where ambiguous.\n");
				hg = hg*2;
			}
		}
	}

	/* still unresolved: honor a manually set 'want_lose' on one side */
	if (hg == -100) {
		if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
			hg = -1;
		if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
			hg = 1;

		if (abs(hg) < 100)
			dev_warn(DEV, "Split-Brain detected, manually solved. "
			     "Sync from %s node\n",
			     (hg < 0) ? "peer" : "this");
	}

	if (hg == -100) {
		/* FIXME this log message is not correct if we end up here
		 * after an attempted attach on a diskless node.
		 * We just refuse to attach -- well, we drop the "connection"
		 * to that disk, in a way... */
		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
		drbd_khelper(mdev, "split-brain");
		return C_MASK;
	}

	if (hg > 0 && mydisk <= D_INCONSISTENT) {
		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
		return C_MASK;
	}

	/* becoming sync target while primary conflicts with stable data;
	 * apply the configured rr-conflict policy */
	if (hg < 0 && /* by intention we do not use mydisk here. */
	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
		switch (mdev->tconn->net_conf->rr_conflict) {
		case ASB_CALL_HELPER:
			drbd_khelper(mdev, "pri-lost");
			/* fall through */
		case ASB_DISCONNECT:
			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
			return C_MASK;
		case ASB_VIOLENTLY:
			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
			     "assumption\n");
		}
	}

	/* dry-run connect: report what would happen, then bail out */
	if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
		if (hg == 0)
			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
		else
			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
				 abs(hg) >= 2 ? "full" : "bit-map based");
		return C_MASK;
	}

	if (abs(hg) >= 2) {
		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
					BM_LOCKED_SET_ALLOWED))
			return C_MASK;
	}

	if (hg > 0) { /* become sync source. */
		rv = C_WF_BITMAP_S;
	} else if (hg < 0) { /* become sync target */
		rv = C_WF_BITMAP_T;
	} else {
		rv = C_CONNECTED;
		if (drbd_bm_total_weight(mdev)) {
			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
			     drbd_bm_total_weight(mdev));
		}
	}

	return rv;
}
2729
2730/* returns 1 if invalid */
2731static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2732{
2733 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2734 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2735 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2736 return 0;
2737
2738 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2739 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2740 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2741 return 1;
2742
2743 /* everything else is valid if they are equal on both sides. */
2744 if (peer == self)
2745 return 0;
2746
2747 /* everything es is invalid. */
2748 return 1;
2749}
2750
d8763023
AG
/* Validate the peer's P_PROTOCOL packet against our own net configuration.
 * Any mismatch in wire protocol, split-brain policies, two-primaries or
 * data-integrity settings forces a disconnect. Returns true on success,
 * false on error or incompatibility. */
static int receive_protocol(struct drbd_conf *mdev, enum drbd_packet cmd,
			    unsigned int data_size)
{
	struct p_protocol *p = &mdev->tconn->data.rbuf.protocol;
	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
	int p_want_lose, p_two_primaries, cf;
	char p_integrity_alg[SHARED_SECRET_MAX] = "";

	/* all fields arrive in network byte order */
	p_proto = be32_to_cpu(p->protocol);
	p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
	p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
	p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
	p_two_primaries = be32_to_cpu(p->two_primaries);
	cf = be32_to_cpu(p->conn_flags);
	p_want_lose = cf & CF_WANT_LOSE;

	/* mirror the peer's dry-run flag into our own state */
	clear_bit(CONN_DRY_RUN, &mdev->flags);

	if (cf & CF_DRY_RUN)
		set_bit(CONN_DRY_RUN, &mdev->flags);

	if (p_proto != mdev->tconn->net_conf->wire_protocol) {
		dev_err(DEV, "incompatible communication protocols\n");
		goto disconnect;
	}

	if (cmp_after_sb(p_after_sb_0p, mdev->tconn->net_conf->after_sb_0p)) {
		dev_err(DEV, "incompatible after-sb-0pri settings\n");
		goto disconnect;
	}

	if (cmp_after_sb(p_after_sb_1p, mdev->tconn->net_conf->after_sb_1p)) {
		dev_err(DEV, "incompatible after-sb-1pri settings\n");
		goto disconnect;
	}

	if (cmp_after_sb(p_after_sb_2p, mdev->tconn->net_conf->after_sb_2p)) {
		dev_err(DEV, "incompatible after-sb-2pri settings\n");
		goto disconnect;
	}

	/* 'want_lose' (discard-my-data) on both sides would be ambiguous */
	if (p_want_lose && mdev->tconn->net_conf->want_lose) {
		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
		goto disconnect;
	}

	if (p_two_primaries != mdev->tconn->net_conf->two_primaries) {
		dev_err(DEV, "incompatible setting of the two-primaries options\n");
		goto disconnect;
	}

	/* since protocol 87 the integrity algorithm name trails the packet */
	if (mdev->tconn->agreed_pro_version >= 87) {
		unsigned char *my_alg = mdev->tconn->net_conf->integrity_alg;

		if (drbd_recv(mdev->tconn, p_integrity_alg, data_size) != data_size)
			return false;

		/* make sure the received name is NUL terminated */
		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
		if (strcmp(p_integrity_alg, my_alg)) {
			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
			goto disconnect;
		}
		dev_info(DEV, "data-integrity-alg: %s\n",
			my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
	}

	return true;

disconnect:
	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
	return false;
}
2823
2824/* helper function
2825 * input: alg name, feature name
2826 * return: NULL (alg name was "")
2827 * ERR_PTR(error) if something goes wrong
2828 * or the crypto hash ptr, if it worked out ok. */
2829struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2830 const char *alg, const char *name)
2831{
2832 struct crypto_hash *tfm;
2833
2834 if (!alg[0])
2835 return NULL;
2836
2837 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2838 if (IS_ERR(tfm)) {
2839 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2840 alg, name, PTR_ERR(tfm));
2841 return tfm;
2842 }
2843 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2844 crypto_free_hash(tfm);
2845 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2846 return ERR_PTR(-EINVAL);
2847 }
2848 return tfm;
2849}
2850
d8763023
AG
/* Handle P_SYNC_PARAM / P_SYNC_PARAM89 / the v95 variant: pick up the peer's
 * resync parameters (rate, congestion plan, verify/csums algorithms) and
 * install them under peer_seq_lock.
 * Returns true on success, false on a fatal receive error; protocol-level
 * mismatches force C_DISCONNECTING via the disconnect label. */
static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int packet_size)
{
	int ok = true;
	struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
	unsigned int header_size, data_size, exp_max_sz;
	struct crypto_hash *verify_tfm = NULL;
	struct crypto_hash *csums_tfm = NULL;
	const int apv = mdev->tconn->agreed_pro_version;
	int *rs_plan_s = NULL;
	int fifo_size = 0;

	/* the on-the-wire struct grew with each protocol version */
	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
		    : apv == 88 ? sizeof(struct p_rs_param)
					+ SHARED_SECRET_MAX
		    : apv <= 94 ? sizeof(struct p_rs_param_89)
		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);

	if (packet_size > exp_max_sz) {
		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
		    packet_size, exp_max_sz);
		return false;
	}

	/* split the packet into the fixed header part and (apv 88 only) a
	 * trailing variable-length algorithm-name payload */
	if (apv <= 88) {
		header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
		data_size = packet_size - header_size;
	} else if (apv <= 94) {
		header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
		data_size = packet_size - header_size;
		D_ASSERT(data_size == 0);
	} else {
		header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
		data_size = packet_size - header_size;
		D_ASSERT(data_size == 0);
	}

	/* initialize verify_alg and csums_alg */
	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);

	if (drbd_recv(mdev->tconn, &p->head.payload, header_size) != header_size)
		return false;

	mdev->sync_conf.rate = be32_to_cpu(p->rate);

	if (apv >= 88) {
		if (apv == 88) {
			/* apv 88 sends the verify-alg name as separate payload */
			if (data_size > SHARED_SECRET_MAX) {
				dev_err(DEV, "verify-alg too long, "
				    "peer wants %u, accepting only %u byte\n",
						data_size, SHARED_SECRET_MAX);
				return false;
			}

			if (drbd_recv(mdev->tconn, p->verify_alg, data_size) != data_size)
				return false;

			/* we expect NUL terminated string */
			/* but just in case someone tries to be evil */
			D_ASSERT(p->verify_alg[data_size-1] == 0);
			p->verify_alg[data_size-1] = 0;

		} else /* apv >= 89 */ {
			/* we still expect NUL terminated strings */
			/* but just in case someone tries to be evil */
			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
		}

		/* algorithm changes are only allowed before the initial
		 * parameter exchange completed (C_WF_REPORT_PARAMS) */
		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
				    mdev->sync_conf.verify_alg, p->verify_alg);
				goto disconnect;
			}
			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
					p->verify_alg, "verify-alg");
			if (IS_ERR(verify_tfm)) {
				verify_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
				    mdev->sync_conf.csums_alg, p->csums_alg);
				goto disconnect;
			}
			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
					p->csums_alg, "csums-alg");
			if (IS_ERR(csums_tfm)) {
				csums_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv > 94) {
			/* apv >= 95 adds the dynamic resync controller knobs */
			mdev->sync_conf.rate = be32_to_cpu(p->rate);
			mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
			mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
			mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
			mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);

			/* pre-allocate a new plan fifo outside the spinlock;
			 * it is swapped in below */
			fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
			if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
				rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
				if (!rs_plan_s) {
					dev_err(DEV, "kmalloc of fifo_buffer failed");
					goto disconnect;
				}
			}
		}

		spin_lock(&mdev->peer_seq_lock);
		/* lock against drbd_nl_syncer_conf() */
		if (verify_tfm) {
			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
			crypto_free_hash(mdev->verify_tfm);
			mdev->verify_tfm = verify_tfm;
			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
		}
		if (csums_tfm) {
			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
			crypto_free_hash(mdev->csums_tfm);
			mdev->csums_tfm = csums_tfm;
			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
		}
		if (fifo_size != mdev->rs_plan_s.size) {
			/* NOTE(review): if fifo_size shrank to 0, rs_plan_s is
			 * NULL here and the plan is intentionally dropped */
			kfree(mdev->rs_plan_s.values);
			mdev->rs_plan_s.values = rs_plan_s;
			mdev->rs_plan_s.size = fifo_size;
			mdev->rs_planed = 0;
		}
		spin_unlock(&mdev->peer_seq_lock);
	}

	return ok;
disconnect:
	/* just for completeness: actually not needed,
	 * as this is not reached if csums_tfm was ok. */
	crypto_free_hash(csums_tfm);
	/* but free the verify_tfm again, if csums_tfm did not work out */
	crypto_free_hash(verify_tfm);
	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
	return false;
}
3002
b411b363
PR
3003/* warn if the arguments differ by more than 12.5% */
3004static void warn_if_differ_considerably(struct drbd_conf *mdev,
3005 const char *s, sector_t a, sector_t b)
3006{
3007 sector_t d;
3008 if (a == 0 || b == 0)
3009 return;
3010 d = (a > b) ? (a - b) : (b - a);
3011 if (d > (a>>3) || d > (b>>3))
3012 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3013 (unsigned long long)a, (unsigned long long)b);
3014}
3015
d8763023
AG
/* Handle P_SIZES: the peer reports its backing-device size, requested user
 * size and current capacity. Reconcile them with our local disk, possibly
 * resize, and trigger a resync after an online grow.
 * Returns true on success, false on fatal disagreement (forces disconnect). */
static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
			 unsigned int data_size)
{
	struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
	enum determine_dev_size dd = unchanged;
	sector_t p_size, p_usize, my_usize;
	int ldsc = 0; /* local disk size changed */
	enum dds_flags ddsf;

	p_size = be64_to_cpu(p->d_size);
	p_usize = be64_to_cpu(p->u_size);

	/* neither side has backing storage: nothing to replicate */
	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
		dev_err(DEV, "some backing storage is needed\n");
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		return false;
	}

	/* just store the peer's disk size for now.
	 * we still need to figure out whether we accept that. */
	mdev->p_size = p_size;

	if (get_ldev(mdev)) {
		warn_if_differ_considerably(mdev, "lower level device sizes",
			   p_size, drbd_get_max_capacity(mdev->ldev));
		warn_if_differ_considerably(mdev, "user requested size",
					    p_usize, mdev->ldev->dc.disk_size);

		/* if this is the first connect, or an otherwise expected
		 * param exchange, choose the minimum */
		if (mdev->state.conn == C_WF_REPORT_PARAMS)
			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
					     p_usize);

		/* remember the old value so we can roll back below */
		my_usize = mdev->ldev->dc.disk_size;

		if (mdev->ldev->dc.disk_size != p_usize) {
			mdev->ldev->dc.disk_size = p_usize;
			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
				 (unsigned long)mdev->ldev->dc.disk_size);
		}

		/* Never shrink a device with usable data during connect.
		   But allow online shrinking if we are connected. */
		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
		   drbd_get_capacity(mdev->this_bdev) &&
		   mdev->state.disk >= D_OUTDATED &&
		   mdev->state.conn < C_CONNECTED) {
			dev_err(DEV, "The peer's disk size is too small!\n");
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
			mdev->ldev->dc.disk_size = my_usize;
			put_ldev(mdev);
			return false;
		}
		put_ldev(mdev);
	}

	ddsf = be16_to_cpu(p->dds_flags);
	if (get_ldev(mdev)) {
		dd = drbd_determine_dev_size(mdev, ddsf);
		put_ldev(mdev);
		if (dd == dev_size_error)
			return false;
		drbd_md_sync(mdev);
	} else {
		/* I am diskless, need to accept the peer's size. */
		drbd_set_my_capacity(mdev, p_size);
	}

	/* the peer's max bio size limits our request size as well */
	mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	drbd_reconsider_max_bio_size(mdev);

	if (get_ldev(mdev)) {
		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
			ldsc = 1;
		}

		put_ldev(mdev);
	}

	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
		if (be64_to_cpu(p->c_size) !=
		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
			/* we have different sizes, probably peer
			 * needs to know my new size... */
			drbd_send_sizes(mdev, 0, ddsf);
		}
		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
			if (mdev->state.pdsk >= D_INCONSISTENT &&
			    mdev->state.disk >= D_INCONSISTENT) {
				if (ddsf & DDSF_NO_RESYNC)
					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
				else
					resync_after_online_grow(mdev);
			} else
				/* cannot resync now; remember to do it after
				 * the next sync handshake */
				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
		}
	}

	return true;
}
3119
d8763023
AG
3120static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3121 unsigned int data_size)
b411b363 3122{
e42325a5 3123 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
b411b363 3124 u64 *p_uuid;
62b0da3a 3125 int i, updated_uuids = 0;
b411b363 3126
b411b363
PR
3127 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3128
3129 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3130 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3131
3132 kfree(mdev->p_uuid);
3133 mdev->p_uuid = p_uuid;
3134
3135 if (mdev->state.conn < C_CONNECTED &&
3136 mdev->state.disk < D_INCONSISTENT &&
3137 mdev->state.role == R_PRIMARY &&
3138 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3139 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3140 (unsigned long long)mdev->ed_uuid);
3141 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3142 return false;
b411b363
PR
3143 }
3144
3145 if (get_ldev(mdev)) {
3146 int skip_initial_sync =
3147 mdev->state.conn == C_CONNECTED &&
31890f4a 3148 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3149 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3150 (p_uuid[UI_FLAGS] & 8);
3151 if (skip_initial_sync) {
3152 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3153 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3154 "clear_n_write from receive_uuids",
3155 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3156 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3157 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3158 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3159 CS_VERBOSE, NULL);
3160 drbd_md_sync(mdev);
62b0da3a 3161 updated_uuids = 1;
b411b363
PR
3162 }
3163 put_ldev(mdev);
18a50fa2
PR
3164 } else if (mdev->state.disk < D_INCONSISTENT &&
3165 mdev->state.role == R_PRIMARY) {
3166 /* I am a diskless primary, the peer just created a new current UUID
3167 for me. */
62b0da3a 3168 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3169 }
3170
3171 /* Before we test for the disk state, we should wait until an eventually
3172 ongoing cluster wide state change is finished. That is important if
3173 we are primary and are detaching from our disk. We need to see the
3174 new disk state... */
8410da8f
PR
3175 mutex_lock(mdev->state_mutex);
3176 mutex_unlock(mdev->state_mutex);
b411b363 3177 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3178 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3179
3180 if (updated_uuids)
3181 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3182
81e84650 3183 return true;
b411b363
PR
3184}
3185
3186/**
3187 * convert_state() - Converts the peer's view of the cluster state to our point of view
3188 * @ps: The state as seen by the peer.
3189 */
3190static union drbd_state convert_state(union drbd_state ps)
3191{
3192 union drbd_state ms;
3193
3194 static enum drbd_conns c_tab[] = {
3195 [C_CONNECTED] = C_CONNECTED,
3196
3197 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3198 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3199 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3200 [C_VERIFY_S] = C_VERIFY_T,
3201 [C_MASK] = C_MASK,
3202 };
3203
3204 ms.i = ps.i;
3205
3206 ms.conn = c_tab[ps.conn];
3207 ms.peer = ps.role;
3208 ms.role = ps.peer;
3209 ms.pdsk = ps.disk;
3210 ms.disk = ps.pdsk;
3211 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3212
3213 return ms;
3214}
3215
d8763023
AG
/* Handle a state-change request from the peer (P_STATE_CHG_REQ or the
 * connection-wide P_CONN_ST_CHG_REQ): translate the peer's mask/val into
 * our point of view, apply it, and send the result back.
 * Always returns true (errors are reported to the peer in the reply). */
static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int data_size)
{
	struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
	union drbd_state mask, val;
	enum drbd_state_rv rv;

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

	/* if both sides try a cluster-wide change at once, the side holding
	 * DISCARD_CONCURRENT rejects the peer's attempt */
	if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
	    mutex_is_locked(mdev->state_mutex)) {
		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
		return true;
	}

	/* the peer sent its own view; mirror it into ours */
	mask = convert_state(mask);
	val = convert_state(val);

	if (cmd == P_CONN_ST_CHG_REQ) {
		/* connection-wide change: apply on the tconn, reply likewise */
		rv = conn_request_state(mdev->tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY);
		conn_send_sr_reply(mdev->tconn, rv);
	} else {
		rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
		drbd_send_sr_reply(mdev, rv);
	}

	drbd_md_sync(mdev);

	return true;
}
3247
d8763023
AG
/* Handle P_STATE: merge the peer's reported state into our own, deciding
 * whether a resync handshake is needed. The os/ns snapshot is retried under
 * req_lock if our state changed concurrently.
 * Returns true on success, false on fatal mismatch (forces disconnect). */
static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
			 unsigned int data_size)
{
	struct p_state *p = &mdev->tconn->data.rbuf.state;
	union drbd_state os, ns, peer_state;
	enum drbd_disk_state real_peer_disk;
	enum chg_state_flags cs_flags;
	int rv;

	peer_state.i = be32_to_cpu(p->state);

	/* a peer still in D_NEGOTIATING exposes its real disk state via
	 * UUID flag bit 2: inconsistent vs. merely consistent */
	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

	spin_lock_irq(&mdev->tconn->req_lock);
 retry:
	/* snapshot our state; re-taken below if it changed meanwhile */
	os = ns = mdev->state;
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* peer says his disk is uptodate, while we think it is inconsistent,
	 * and this happens while we think we have a sync going on. */
	if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
			if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
				drbd_resync_finished(mdev);
			return true;
		}
	}

	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;

	/* peer throttled itself ahead of us: we fall behind */
	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr = (os.conn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			os.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.conn >= C_STARTING_SYNC_S &&
			peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
			ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);

		put_ldev(mdev);
		/* C_MASK from the handshake means "could not agree" */
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
			if (mdev->state.disk == D_NEGOTIATING) {
				drbd_force_state(mdev, NS(disk, D_FAILED));
			} else if (peer_state.disk == D_NEGOTIATING) {
				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
				real_peer_disk = D_DISKLESS;
			} else {
				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
					return false;
				D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
				return false;
			}
		}
	}

	spin_lock_irq(&mdev->tconn->req_lock);
	/* our state changed while we dropped the lock: redo the snapshot */
	if (mdev->state.i != os.i)
		goto retry;
	clear_bit(CONSIDER_RESYNC, &mdev->flags);
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = mdev->new_state_tmp.disk;
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
	if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &mdev->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporal network outages! */
		spin_unlock_irq(&mdev->tconn->req_lock);
		dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
		tl_clear(mdev);
		drbd_uuid_new_current(mdev);
		clear_bit(NEW_CUR_UUID, &mdev->flags);
		drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
		return false;
	}
	rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
	ns = mdev->state;
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (rv < SS_SUCCESS) {
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		return false;
	}

	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING ) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(mdev);
			drbd_send_state(mdev);
		}
	}

	/* the one-shot "discard my data" flag is consumed by a completed
	 * state exchange */
	mdev->tconn->net_conf->want_lose = 0;

	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */

	return true;
}
3400
d8763023
AG
/* Handle P_SYNC_UUID: the peer (sync source) tells us the UUID of the data
 * generation we are about to resync to. Wait until our state settled into a
 * sync-target-ish state first, then adopt the UUID and start the resync.
 * Always returns true; an unusable local disk only logs an error. */
static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int data_size)
{
	struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;

	/* wait until the state transition triggered by the bitmap exchange
	 * has happened (or the connection/disk went away) */
	wait_event(mdev->misc_wait,
		   mdev->state.conn == C_WF_SYNC_UUID ||
		   mdev->state.conn == C_BEHIND ||
		   mdev->state.conn < C_CONNECTED ||
		   mdev->state.disk < D_NEGOTIATING);

	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */

	/* Here the _drbd_uuid_ functions are right, current should
	   _not_ be rotated into the history */
	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);

		drbd_print_uuids(mdev, "updated sync uuid");
		drbd_start_resync(mdev, C_SYNC_TARGET);

		put_ldev(mdev);
	} else
		dev_err(DEV, "Ignoring SyncUUID packet!\n");

	return true;
}
3429
2c46407d
AG
/**
 * receive_bitmap_plain
 *
 * Receive one uncompressed bitmap chunk (P_BITMAP payload) and merge it into
 * the local bitmap at the current transfer offset tracked in @c.
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
		     unsigned long *buffer, struct bm_xfer_ctx *c)
{
	/* a full packet's worth of words, or whatever remains */
	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
	unsigned want = num_words * sizeof(long);
	int err;

	/* the sender must have sized the payload exactly */
	if (want != data_size) {
		dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
		return -EIO;
	}
	if (want == 0)
		return 0;
	err = drbd_recv(mdev->tconn, buffer, want);
	if (err != want) {
		/* short read: map to -EIO unless drbd_recv reported an error */
		if (err >= 0)
			err = -EIO;
		return err;
	}

	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);

	/* advance the transfer context; clamp the bit offset at the end */
	c->word_offset += num_words;
	c->bit_offset = c->word_offset * BITS_PER_LONG;
	if (c->bit_offset > c->bm_bits)
		c->bit_offset = c->bm_bits;

	return 1;
}
3466
2c46407d
AG
/**
 * recv_bm_rle_bits
 *
 * Decode one VLI-run-length-encoded bitmap chunk (@p, @len bytes of code)
 * and set the corresponding out-of-sync bits, starting at c->bit_offset.
 * Runs alternate between clear and set; DCBP_get_start() says which comes
 * first.
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
recv_bm_rle_bits(struct drbd_conf *mdev,
		struct p_compressed_bm *p,
		 struct bm_xfer_ctx *c,
		 unsigned int len)
{
	struct bitstream bs;
	u64 look_ahead;		/* sliding 64-bit decode window */
	u64 rl;			/* current run length, in bits */
	u64 tmp;
	unsigned long s = c->bit_offset;
	unsigned long e;
	int toggle = DCBP_get_start(p);
	int have;		/* valid bits currently in look_ahead */
	int bits;

	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));

	/* prime the decode window */
	bits = bitstream_get_bits(&bs, &look_ahead, 64);
	if (bits < 0)
		return -EIO;

	for (have = bits; have > 0; s += rl, toggle = !toggle) {
		bits = vli_decode_bits(&rl, look_ahead);
		if (bits <= 0)
			return -EIO;

		if (toggle) {
			/* a "set" run: mark [s, s+rl-1] out of sync */
			e = s + rl -1;
			if (e >= c->bm_bits) {
				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
				return -EIO;
			}
			_drbd_bm_set_bits(mdev, s, e);
		}

		/* the code word must fit in what the window still holds */
		if (have < bits) {
			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
				have, bits, look_ahead,
				(unsigned int)(bs.cur.b - p->code),
				(unsigned int)bs.buf_len);
			return -EIO;
		}
		/* consume the decoded code word and refill the window */
		look_ahead >>= bits;
		have -= bits;

		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
		if (bits < 0)
			return -EIO;
		look_ahead |= tmp << have;
		have += bits;
	}

	c->bit_offset = s;
	bm_xfer_ctx_bit_to_word_offset(c);

	/* reached the end of the bitmap? then this was the last chunk */
	return (s != c->bm_bits);
}
3531
2c46407d
AG
3532/**
3533 * decode_bitmap_c
3534 *
3535 * Return 0 when done, 1 when another iteration is needed, and a negative error
3536 * code upon failure.
3537 */
3538static int
b411b363
PR
3539decode_bitmap_c(struct drbd_conf *mdev,
3540 struct p_compressed_bm *p,
c6d25cfe
PR
3541 struct bm_xfer_ctx *c,
3542 unsigned int len)
b411b363
PR
3543{
3544 if (DCBP_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3545 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3546
3547 /* other variants had been implemented for evaluation,
3548 * but have been dropped as this one turned out to be "best"
3549 * during all our tests. */
3550
3551 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3552 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
2c46407d 3553 return -EIO;
b411b363
PR
3554}
3555
3556void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3557 const char *direction, struct bm_xfer_ctx *c)
3558{
3559 /* what would it take to transfer it "plaintext" */
c012949a 3560 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3561 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3562 + c->bm_words * sizeof(long);
3563 unsigned total = c->bytes[0] + c->bytes[1];
3564 unsigned r;
3565
3566 /* total can not be zero. but just in case: */
3567 if (total == 0)
3568 return;
3569
3570 /* don't report if not compressed */
3571 if (total >= plain)
3572 return;
3573
3574 /* total < plain. check for overflow, still */
3575 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3576 : (1000 * total / plain);
3577
3578 if (r > 1000)
3579 r = 1000;
3580
3581 r = 1000 - r;
3582 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3583 "total %u; compression: %u.%u%%\n",
3584 direction,
3585 c->bytes[1], c->packets[1],
3586 c->bytes[0], c->packets[0],
3587 total, r/10, r % 10);
3588}
3589
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter if the process it in 32 bit chunks or 64 bit
   chunks as long as it is little endian. (Understand it as byte stream,
   beginning with the lowest byte...) If we would use big endian
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bits issue.

   Receives the peer's full out-of-sync bitmap: the first packet arrived
   already (cmd/data_size), the loop below pulls follow-up P_BITMAP /
   P_COMPRESSED_BITMAP packets until the transfer is complete, then kicks
   off the resync from the appropriate side.

   returns 0 on failure, 1 if we successfully received it. */
static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
			  unsigned int data_size)
{
	struct bm_xfer_ctx c;
	void *buffer;
	int err;
	int ok = false;
	struct p_header *h = &mdev->tconn->data.rbuf.header;
	struct packet_info pi;

	drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */

	/* maybe we should use some per thread scratch page,
	 * and allocate that during initial device creation? */
	buffer = (unsigned long *) __get_free_page(GFP_NOIO);
	if (!buffer) {
		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
		goto out;
	}

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(mdev),
		.bm_words = drbd_bm_words(mdev),
	};

	for(;;) {
		if (cmd == P_BITMAP) {
			err = receive_bitmap_plain(mdev, data_size, buffer, &c);
		} else if (cmd == P_COMPRESSED_BITMAP) {
			/* MAYBE: sanity check that we speak proto >= 90,
			 * and the feature is enabled! */
			struct p_compressed_bm *p;

			if (data_size > BM_PACKET_PAYLOAD_BYTES) {
				dev_err(DEV, "ReportCBitmap packet too large\n");
				goto out;
			}
			/* use the page buff */
			p = buffer;
			/* reassemble header + payload so the decoder sees a
			 * complete p_compressed_bm */
			memcpy(p, h, sizeof(*h));
			if (drbd_recv(mdev->tconn, p->head.payload, data_size) != data_size)
				goto out;
			if (data_size <= (sizeof(*p) - sizeof(p->head))) {
				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
				goto out;
			}
			err = decode_bitmap_c(mdev, p, &c, data_size);
		} else {
			dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
			goto out;
		}

		/* per-encoding transfer statistics, reported below */
		c.packets[cmd == P_BITMAP]++;
		c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;

		/* err: 0 = transfer complete, 1 = more chunks expected,
		 * negative = decode/receive failure */
		if (err <= 0) {
			if (err < 0)
				goto out;
			break;
		}
		if (!drbd_recv_header(mdev->tconn, &pi))
			goto out;
		cmd = pi.cmd;
		data_size = pi.size;
	}

	INFO_bm_xfer_stats(mdev, "receive", &c);

	if (mdev->state.conn == C_WF_BITMAP_T) {
		enum drbd_state_rv rv;

		/* we are the sync target: answer with our bitmap, then move
		 * on to the sync-uuid exchange */
		ok = !drbd_send_bitmap(mdev);
		if (!ok)
			goto out;
		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
		rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
		D_ASSERT(rv == SS_SUCCESS);
	} else if (mdev->state.conn != C_WF_BITMAP_S) {
		/* admin may have requested C_DISCONNECTING,
		 * other threads may have noticed network errors */
		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
		    drbd_conn_str(mdev->state.conn));
	}

	ok = true;
 out:
	drbd_bm_unlock(mdev);
	if (ok && mdev->state.conn == C_WF_BITMAP_S)
		drbd_start_resync(mdev, C_SYNC_SOURCE);
	free_page((unsigned long) buffer);
	return ok;
}
3692
d8763023
AG
3693static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
3694 unsigned int data_size)
b411b363
PR
3695{
3696 /* TODO zero copy sink :) */
3697 static char sink[128];
3698 int size, want, r;
3699
02918be2
PR
3700 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3701 cmd, data_size);
b411b363 3702
02918be2 3703 size = data_size;
b411b363
PR
3704 while (size > 0) {
3705 want = min_t(int, size, sizeof(sink));
de0ff338 3706 r = drbd_recv(mdev->tconn, sink, want);
841ce241
AG
3707 if (!expect(r > 0))
3708 break;
b411b363
PR
3709 size -= r;
3710 }
3711 return size == 0;
3712}
3713
d8763023
AG
3714static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
3715 unsigned int data_size)
0ced55a3 3716{
e7f52dfb
LE
3717 /* Make sure we've acked all the TCP data associated
3718 * with the data requests being unplugged */
e42325a5 3719 drbd_tcp_quickack(mdev->tconn->data.socket);
0ced55a3 3720
81e84650 3721 return true;
0ced55a3
PR
3722}
3723
d8763023
AG
3724static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
3725 unsigned int data_size)
73a01a18 3726{
e42325a5 3727 struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;
73a01a18 3728
f735e363
LE
3729 switch (mdev->state.conn) {
3730 case C_WF_SYNC_UUID:
3731 case C_WF_BITMAP_T:
3732 case C_BEHIND:
3733 break;
3734 default:
3735 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3736 drbd_conn_str(mdev->state.conn));
3737 }
3738
73a01a18
PR
3739 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3740
81e84650 3741 return true;
73a01a18
PR
3742}
3743
/* Signature of a data-socket packet handler; to_receive is the payload
 * size remaining after the fixed sub-header has been read. */
typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packet cmd,
				  unsigned int to_receive);

/* Dispatch entry for one packet type arriving on the data socket. */
struct data_cmd {
	int expect_payload;		/* nonzero if a variable-size payload may follow */
	size_t pkt_size;		/* size of the fixed (sub-)header */
	drbd_cmd_handler_f function;	/* NULL marks types handled by the asender */
};

/* Packet-type → handler table used by drbdd(); indexed by enum drbd_packet. */
static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
	[P_BITMAP]	    = { 1, sizeof(struct p_header), receive_bitmap } ,
	[P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
	[P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header), receive_UnplugRemote },
	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_SYNC_PARAM]	    = { 1, sizeof(struct p_header), receive_SyncParam },
	[P_SYNC_PARAM89]    = { 1, sizeof(struct p_header), receive_SyncParam },
	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
	/* anything missing from this table is in
	 * the asender_tbl, see get_asender_cmd */
	[P_MAX_CMD]	    = { 0, 0, NULL },
};
3781
/* All handler functions that expect a sub-header get that sub-header in
   mdev->tconn->data.rbuf.header.head.payload.

   Usually in mdev->tconn->data.rbuf.header.head the callback can find the usual
   p_header, but they may not rely on that. Since there is also p_header95 !
 */
b411b363 3788
/* Main receive loop of the receiver thread: read packet headers from the
 * data socket and dispatch each packet to the handler registered in
 * drbd_cmd_handler[].  Any protocol violation or read failure forces the
 * connection into C_PROTOCOL_ERROR. */
static void drbdd(struct drbd_tconn *tconn)
{
	struct p_header *header = &tconn->data.rbuf.header;
	struct packet_info pi;
	size_t shs; /* sub header size */
	int rv;

	while (get_t_state(&tconn->receiver) == RUNNING) {
		drbd_thread_current_set_cpu(&tconn->receiver);
		if (!drbd_recv_header(tconn, &pi))
			goto err_out;

		/* unknown type or one that belongs to the asender table */
		if (unlikely(pi.cmd >= P_MAX_CMD || !drbd_cmd_handler[pi.cmd].function)) {
			conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
			goto err_out;
		}

		/* bytes of fixed sub-header beyond the common p_header */
		shs = drbd_cmd_handler[pi.cmd].pkt_size - sizeof(struct p_header);
		if (pi.size - shs > 0 && !drbd_cmd_handler[pi.cmd].expect_payload) {
			conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
			goto err_out;
		}

		if (shs) {
			/* pull in the rest of the fixed header before dispatching */
			rv = drbd_recv(tconn, &header->payload, shs);
			if (unlikely(rv != shs)) {
				if (!signal_pending(current))
					conn_warn(tconn, "short read while reading sub header: rv=%d\n", rv);
				goto err_out;
			}
		}

		/* handler receives the remaining payload size (pi.size - shs) */
		rv = drbd_cmd_handler[pi.cmd].function(vnr_to_mdev(tconn, pi.vnr), pi.cmd, pi.size - shs);

		if (unlikely(!rv)) {
			conn_err(tconn, "error receiving %s, l: %d!\n",
			    cmdname(pi.cmd), pi.size);
			goto err_out;
		}
	}

	if (0) {
	err_out:
		conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
	}
}
3835
a21e9298 3836void drbd_flush_workqueue(struct drbd_conf *mdev)
b411b363
PR
3837{
3838 struct drbd_wq_barrier barr;
3839
3840 barr.w.cb = w_prev_work_done;
a21e9298 3841 barr.w.mdev = mdev;
b411b363 3842 init_completion(&barr.done);
a21e9298 3843 drbd_queue_work(&mdev->tconn->data.work, &barr.w);
b411b363
PR
3844 wait_for_completion(&barr.done);
3845}
3846
/* Tear down the network half of a connection: stop the asender, close the
 * sockets, run per-volume cleanup for every attached volume, and move the
 * connection state to C_UNCONNECTED — or all the way to C_STANDALONE when
 * the admin requested the disconnect (C_DISCONNECTING). */
static void drbd_disconnect(struct drbd_tconn *tconn)
{
	enum drbd_conns oc;
	/* NOTE(review): rv is only assigned, never inspected afterwards */
	int rv = SS_UNKNOWN_ERROR;

	if (tconn->cstate == C_STANDALONE)
		return;

	/* asender does not clean up anything. it must not interfere, either */
	drbd_thread_stop(&tconn->asender);
	drbd_free_sock(tconn);

	/* per-volume teardown: drain ee lists, cancel resync, etc. */
	idr_for_each(&tconn->volumes, drbd_disconnected, tconn);

	conn_info(tconn, "Connection closed\n");

	/* read and change the connection state under the request lock */
	spin_lock_irq(&tconn->req_lock);
	oc = tconn->cstate;
	if (oc >= C_UNCONNECTED)
		rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);

	spin_unlock_irq(&tconn->req_lock);

	if (oc == C_DISCONNECTING) {
		/* admin-requested disconnect: wait until nobody holds a
		 * reference on net_conf, then free crypto and config state */
		wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);

		crypto_free_hash(tconn->cram_hmac_tfm);
		tconn->cram_hmac_tfm = NULL;

		kfree(tconn->net_conf);
		tconn->net_conf = NULL;
		conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
	}
}
3881
/* idr_for_each callback invoked by drbd_disconnect() for each volume of a
 * disconnecting connection.  Waits for in-flight requests to settle, stops
 * resync activity, flushes the work queue, and verifies that all epoch
 * entry (ee) lists are empty before the connection state changes.
 * Always returns 0 so the idr iteration visits every volume. */
static int drbd_disconnected(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	enum drbd_fencing_p fp;
	unsigned int i;

	/* wait for current activity to cease. */
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* We do not have data structures that would allow us to
	 * get the rs_pending_cnt down to 0 again.
	 * * On C_SYNC_TARGET we do not have any data structures describing
	 * the pending RSDataRequest's we have sent.
	 * * On C_SYNC_SOURCE there is no data structure that tracks
	 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
	 * And no, it is not the sum of the reference counts in the
	 * resync_LRU. The resync_LRU tracks the whole operation including
	 * the disk-IO, while the rs_pending_cnt only tracks the blocks
	 * on the fly. */
	drbd_rs_cancel_all(mdev);
	mdev->rs_total = 0;
	mdev->rs_failed = 0;
	atomic_set(&mdev->rs_pending_cnt, 0);
	wake_up(&mdev->misc_wait);

	del_timer(&mdev->request_timer);

	/* make sure syncer is stopped and w_resume_next_sg queued */
	del_timer_sync(&mdev->resync_timer);
	resync_timer_fn((unsigned long)mdev);

	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
	 * w_make_resync_request etc. which may still be on the worker queue
	 * to be "canceled" */
	drbd_flush_workqueue(mdev);

	/* This also does reclaim_net_ee(). If we do this too early, we might
	 * miss some resync ee and pages.*/
	drbd_process_done_ee(mdev);

	kfree(mdev->p_uuid);
	mdev->p_uuid = NULL;

	if (!is_susp(mdev->state))
		tl_clear(mdev);

	drbd_md_sync(mdev);

	/* read the configured fencing policy while holding a disk reference */
	fp = FP_DONT_CARE;
	if (get_ldev(mdev)) {
		fp = mdev->ldev->dc.fencing;
		put_ldev(mdev);
	}

	if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
		drbd_try_outdate_peer_async(mdev);

	/* serialize with bitmap writeout triggered by the state change,
	 * if any. */
	wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));

	/* tcp_close and release of sendpage pages can be deferred. I don't
	 * want to use SO_LINGER, because apparently it can be deferred for
	 * more than 20 seconds (longest time I checked).
	 *
	 * Actually we don't care for exactly when the network stack does its
	 * put_page(), but release our reference on these pages right here.
	 */
	i = drbd_release_ee(mdev, &mdev->net_ee);
	if (i)
		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
	i = atomic_read(&mdev->pp_in_use_by_net);
	if (i)
		dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&mdev->pp_in_use);
	if (i)
		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);

	D_ASSERT(list_empty(&mdev->read_ee));
	D_ASSERT(list_empty(&mdev->active_ee));
	D_ASSERT(list_empty(&mdev->sync_ee));
	D_ASSERT(list_empty(&mdev->done_ee));

	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
	atomic_set(&mdev->current_epoch->epoch_size, 0);
	D_ASSERT(list_empty(&mdev->current_epoch->list));

	return 0;
}
3975
3976/*
3977 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3978 * we can agree on is stored in agreed_pro_version.
3979 *
3980 * feature flags and the reserved array should be enough room for future
3981 * enhancements of the handshake protocol, and possible plugins...
3982 *
3983 * for now, they are expected to be zero, but ignored.
3984 */
8a22cccc 3985static int drbd_send_handshake(struct drbd_tconn *tconn)
b411b363 3986{
e6b3ea83 3987 /* ASSERT current == mdev->tconn->receiver ... */
8a22cccc 3988 struct p_handshake *p = &tconn->data.sbuf.handshake;
b411b363
PR
3989 int ok;
3990
8a22cccc
PR
3991 if (mutex_lock_interruptible(&tconn->data.mutex)) {
3992 conn_err(tconn, "interrupted during initial handshake\n");
b411b363
PR
3993 return 0; /* interrupted. not ok. */
3994 }
3995
8a22cccc
PR
3996 if (tconn->data.socket == NULL) {
3997 mutex_unlock(&tconn->data.mutex);
b411b363
PR
3998 return 0;
3999 }
4000
4001 memset(p, 0, sizeof(*p));
4002 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4003 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
8a22cccc
PR
4004 ok = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
4005 &p->head, sizeof(*p), 0);
4006 mutex_unlock(&tconn->data.mutex);
b411b363
PR
4007 return ok;
4008}
4009
4010/*
4011 * return values:
4012 * 1 yes, we have a valid connection
4013 * 0 oops, did not work out, please try again
4014 * -1 peer talks different language,
4015 * no point in trying again, please go standalone.
4016 */
/* Exchange P_HAND_SHAKE packets with the peer and agree on a protocol
 * version (stored in tconn->agreed_pro_version).
 * Returns 1 on success, 0 on a retryable error, -1 when the dialects are
 * incompatible (caller should go standalone). */
static int drbd_do_handshake(struct drbd_tconn *tconn)
{
	/* ASSERT current == tconn->receiver ... */
	struct p_handshake *p = &tconn->data.rbuf.handshake;
	/* payload size after the 8.0-era common header */
	const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
	struct packet_info pi;
	int rv;

	rv = drbd_send_handshake(tconn);
	if (!rv)
		return 0;

	rv = drbd_recv_header(tconn, &pi);
	if (!rv)
		return 0;

	if (pi.cmd != P_HAND_SHAKE) {
		conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
		     cmdname(pi.cmd), pi.cmd);
		return -1;
	}

	if (pi.size != expect) {
		conn_err(tconn, "expected HandShake length: %u, received: %u\n",
		     expect, pi.size);
		return -1;
	}

	rv = drbd_recv(tconn, &p->head.payload, expect);

	if (rv != expect) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read receiving handshake packet: l=%u\n", rv);
		return 0;
	}

	/* convert the peer's advertised range to host byte order in place */
	p->protocol_min = be32_to_cpu(p->protocol_min);
	p->protocol_max = be32_to_cpu(p->protocol_max);
	if (p->protocol_max == 0)
		p->protocol_max = p->protocol_min;

	/* the ranges must overlap for a common version to exist */
	if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;

	tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);

	conn_info(tconn, "Handshake successful: "
	     "Agreed network protocol version %d\n", tconn->agreed_pro_version);

	return 1;

 incompat:
	conn_err(tconn, "incompatible DRBD dialects: "
	    "I support %d-%d, peer supports %d-%d\n",
	    PRO_VERSION_MIN, PRO_VERSION_MAX,
	    p->protocol_min, p->protocol_max);
	return -1;
}
4076
4077#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/* Stub used when the kernel lacks CONFIG_CRYPTO_HMAC: cram-hmac-alg can
 * never work, so always refuse authentication (-1 = don't retry).
 * Fix: the original used dev_err(DEV, ...), but DEV refers to an mdev
 * which is not in scope here — this function only receives the tconn
 * (the real implementation in the #else branch uses conn_err too). */
static int drbd_do_auth(struct drbd_tconn *tconn)
{
	conn_err(tconn, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
4084#else
4085#define CHALLENGE_LEN 64
b10d96cb
JT
4086
4087/* Return value:
4088 1 - auth succeeded,
4089 0 - failed, try again (network error),
4090 -1 - auth failed, don't try again.
4091*/
4092
/* Challenge-response authentication over the data socket using the
 * configured cram-hmac transform and shared secret.
 * Protocol: send our random challenge, receive the peer's challenge,
 * return HMAC(peer challenge), receive HMAC(our challenge), and compare
 * it against the locally computed value.
 * Return value: 1 auth succeeded, 0 failed but retryable (network error),
 * -1 auth failed, don't try again. */
static int drbd_do_auth(struct drbd_tconn *tconn)
{
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	struct scatterlist sg;
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len = strlen(tconn->net_conf->shared_secret);
	unsigned int resp_size;
	struct hash_desc desc;
	struct packet_info pi;
	int rv;

	desc.tfm = tconn->cram_hmac_tfm;
	desc.flags = 0;

	/* key the HMAC transform with the shared secret */
	rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
				(u8 *)tconn->net_conf->shared_secret, key_len);
	if (rv) {
		conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	rv = conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	rv = drbd_recv_header(tconn, &pi);
	if (!rv)
		goto fail;

	if (pi.cmd != P_AUTH_CHALLENGE) {
		conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	/* sanity bound on the peer's challenge size before allocating */
	if (pi.size > CHALLENGE_LEN * 2) {
		conn_err(tconn, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		conn_err(tconn, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	rv = drbd_recv(tconn, peers_ch, pi.size);

	if (rv != pi.size) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read AuthChallenge: l=%u\n", rv);
		rv = 0;
		goto fail;
	}

	resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		conn_err(tconn, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	/* response = HMAC(secret, peer's challenge) */
	sg_init_table(&sg, 1);
	sg_set_buf(&sg, peers_ch, pi.size);

	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
	if (rv) {
		conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
	if (!rv)
		goto fail;

	rv = drbd_recv_header(tconn, &pi);
	if (!rv)
		goto fail;

	if (pi.cmd != P_AUTH_RESPONSE) {
		conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		conn_err(tconn, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	rv = drbd_recv(tconn, response , resp_size);

	if (rv != resp_size) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read receiving AuthResponse: l=%u\n", rv);
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		conn_err(tconn, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	/* expected value = HMAC(secret, our challenge); sg table reused */
	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);

	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
	if (rv) {
		conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
		     resp_size, tconn->net_conf->cram_hmac_alg);
	else
		rv = -1;

 fail:
	/* kfree(NULL) is a no-op, so partially-initialized paths are fine */
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);

	return rv;
}
4235#endif
4236
4237int drbdd_init(struct drbd_thread *thi)
4238{
392c8801 4239 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4240 int h;
4241
4d641dd7 4242 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4243
4244 do {
4d641dd7 4245 h = drbd_connect(tconn);
b411b363 4246 if (h == 0) {
4d641dd7 4247 drbd_disconnect(tconn);
20ee6390 4248 schedule_timeout_interruptible(HZ);
b411b363
PR
4249 }
4250 if (h == -1) {
4d641dd7 4251 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4252 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4253 }
4254 } while (h == 0);
4255
4256 if (h > 0) {
4d641dd7
PR
4257 if (get_net_conf(tconn)) {
4258 drbdd(tconn);
4259 put_net_conf(tconn);
b411b363
PR
4260 }
4261 }
4262
4d641dd7 4263 drbd_disconnect(tconn);
b411b363 4264
4d641dd7 4265 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4266 return 0;
4267}
4268
4269/* ********* acknowledge sender ******** */
4270
/* Peer replied to a state change request.  Record success/failure either in
 * the per-device flags (P_STATE_CHG_REPLY) or in the per-connection flags
 * (P_CONN_ST_CHG_REPLY) and wake whoever is waiting on the result. */
static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
	struct drbd_tconn *tconn = mdev->tconn;

	int retcode = be32_to_cpu(p->retcode);

	if (cmd == P_STATE_CHG_REPLY) {
		/* device-level state change result */
		if (retcode >= SS_SUCCESS) {
			set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
		} else {
			set_bit(CL_ST_CHG_FAIL, &mdev->flags);
			dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
				drbd_set_st_err_str(retcode), retcode);
		}
		wake_up(&mdev->state_wait);
	} else /* conn == P_CONN_ST_CHG_REPLY */ {
		/* connection-wide state change result */
		if (retcode >= SS_SUCCESS) {
			set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
		} else {
			set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
			conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
				 drbd_set_st_err_str(retcode), retcode);
		}
		wake_up(&tconn->ping_wait);
	}
	return true;
}
4299
d8763023 4300static int got_Ping(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4301{
2a67d8b9 4302 return drbd_send_ping_ack(mdev->tconn);
b411b363
PR
4303
4304}
4305
/* P_PING_ACK received: drop back to the idle receive timeout and wake
 * anyone waiting for the ping round trip to complete. */
static int got_PingAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct drbd_tconn *tconn = mdev->tconn;
	/* restore idle timeout */
	tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
		wake_up(&tconn->ping_wait);

	return true;
}
4316
/* P_RS_IS_IN_SYNC (checksum-based resync, protocol >= 89): the peer found
 * the block identical, so mark it in sync without transferring data. */
static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	D_ASSERT(mdev->tconn->agreed_pro_version >= 89);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, sector);
		drbd_set_in_sync(mdev, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(mdev);
	}
	dec_rs_pending(mdev);
	/* account the skipped block as resync progress (in 512-byte sectors) */
	atomic_add(blksize >> 9, &mdev->rs_sect_in);

	return true;
}
4339
/* Look up the request identified by (id, sector) in the given tree and
 * apply the state transition 'what' to it, all under the request lock.
 * Returns false when the request is not found (only an error when
 * missing_ok is false).  Completes the master bio outside the lock. */
static int
validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&mdev->tconn->req_lock);
		return false;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* complete the upper layer bio outside the spinlock */
	if (m.bio)
		complete_master_bio(mdev, &m);
	return true;
}
4361
/* Positive write acknowledgment from the peer.  ID_SYNCER marks resync
 * traffic (just update the bitmap); otherwise translate the packet type
 * into the matching request event and apply it to the pending request. */
static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		/* ack for a resync write, not an application request */
		drbd_set_in_sync(mdev, sector, blksize);
		dec_rs_pending(mdev);
		return true;
	}
	switch (cmd) {
	case P_RS_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
		what = RECV_ACKED_BY_PEER;
		break;
	case P_DISCARD_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = CONFLICT_DISCARDED_BY_PEER;
		break;
	default:
		D_ASSERT(0);
		return false;
	}

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->write_requests, __func__,
					     what, false);
}
4402
/* Negative acknowledgment: the peer could not execute a write.  For resync
 * traffic (ID_SYNCER), record the failed range; otherwise fail the pending
 * request.  In protocols A/B the request may legitimately be gone already. */
static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
			  mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
	bool found;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(mdev);
		drbd_rs_failed_io(mdev, sector, size);
		return true;
	}

	found = validate_req_change_req_state(mdev, p->block_id, sector,
					      &mdev->write_requests, __func__,
					      NEG_ACKED, missing_ok);
	if (!found) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		if (!missing_ok)
			return false;
		drbd_set_out_of_sync(mdev, sector, size);
	}
	return true;
}
4435
d8763023 4436static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4437{
257d0af6 4438 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4439 sector_t sector = be64_to_cpu(p->sector);
4440
4441 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4442 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4443 (unsigned long long)sector, be32_to_cpu(p->blksize));
4444
4445 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4446 &mdev->read_requests, __func__,
8554df1c 4447 NEG_ACKED, false);
b411b363
PR
4448}
4449
/* Negative reply (or cancel) for a resync data request.  P_NEG_RS_DREPLY
 * additionally records the failed range; P_RS_CANCEL just completes the
 * in-flight bookkeeping. */
static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	sector_t sector;
	int size;
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dec_rs_pending(mdev);

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, sector);
		switch (cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(mdev, sector, size);
			/* fall through */
		case P_RS_CANCEL:
			break;
		default:
			D_ASSERT(0);
			put_ldev(mdev);
			return false;
		}
		put_ldev(mdev);
	}

	return true;
}
4480
/* Barrier acknowledged by the peer: release the corresponding epoch from
 * the transfer log.  When running Ahead and no application I/O is in
 * flight, arm the timer that switches back to SyncSource. */
static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;

	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));

	if (mdev->state.conn == C_AHEAD &&
	    atomic_read(&mdev->ap_in_flight) == 0 &&
	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
		/* schedule the Ahead -> SyncSource transition in one second */
		mdev->start_resync_timer.expires = jiffies + HZ;
		add_timer(&mdev->start_resync_timer);
	}

	return true;
}
4496
/* Result of one online-verify block: record a mismatch (ID_OUT_OF_SYNC) or
 * flush the accumulated out-of-sync report, update verify progress, and
 * queue the finish work item once the last block has been processed. */
static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	struct drbd_work *w;
	sector_t sector;
	int size;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_oos_found(mdev, sector, size);
	else
		ov_oos_print(mdev);

	if (!get_ldev(mdev))
		return true;

	drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		/* verify done: hand the finish-up to the worker thread */
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			w->mdev = mdev;
			drbd_queue_work_front(&mdev->tconn->data.work, w);
		} else {
			/* allocation failed: finish synchronously as fallback */
			dev_err(DEV, "kmalloc(w) failed.");
			ov_oos_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	put_ldev(mdev);
	return true;
}
4541
/* Meta-socket packet we deliberately ignore (e.g. P_DELAY_PROBE). */
static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	return true;
}
4546
/* Dispatch entry for one packet type arriving on the meta socket. */
struct asender_cmd {
	size_t pkt_size;	/* fixed size of this packet on the wire */
	int (*process)(struct drbd_conf *mdev, enum drbd_packet cmd);
};

/* Map a meta-socket packet type to its handler; NULL for unknown types
 * or types handled on the data socket. */
static struct asender_cmd *get_asender_cmd(int cmd)
{
	static struct asender_cmd asender_tbl[] = {
		/* anything missing from this table is in
		 * the drbd_cmd_handler (drbd_default_handler) table,
		 * see the beginning of drbdd() */
	[P_PING]	    = { sizeof(struct p_header), got_Ping },
	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_RqSReply },
	[P_MAX_CMD]	    = { 0, NULL },
	};
	/* P_MAX_CMD itself has a NULL handler, so it also yields NULL here */
	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
		return NULL;
	return &asender_tbl[cmd];
}
4580
/* idr_for_each callback: flush one volume's done_ee list.
 * Returns nonzero on failure, which stops the iteration. */
static int _drbd_process_done_ee(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = p;

	return !drbd_process_done_ee(mdev);
}
4586
4587static int _check_ee_empty(int vnr, void *p, void *data)
4588{
4589 struct drbd_conf *mdev = (struct drbd_conf *)p;
4590 struct drbd_tconn *tconn = mdev->tconn;
4591 int not_empty;
4592
4593 spin_lock_irq(&tconn->req_lock);
4594 not_empty = !list_empty(&mdev->done_ee);
4595 spin_unlock_irq(&tconn->req_lock);
4596
4597 return not_empty;
4598}
4599
/* Process the done_ee lists of all volumes on this connection, repeating
 * until every list is empty.  SIGNAL_ASENDER is cleared (and pending
 * signals flushed) while processing so the work is not interrupted, and
 * re-set before re-checking for new entries.  Returns 0 on success or the
 * nonzero error from a per-volume processing failure. */
static int tconn_process_done_ee(struct drbd_tconn *tconn)
{
	int not_empty, err;

	do {
		clear_bit(SIGNAL_ASENDER, &tconn->flags);
		flush_signals(current);
		err = idr_for_each(&tconn->volumes, _drbd_process_done_ee, NULL);
		if (err)
			return err;
		set_bit(SIGNAL_ASENDER, &tconn->flags);
		not_empty = idr_for_each(&tconn->volumes, _check_ee_empty, NULL);
	} while (not_empty);

	return 0;
}
4616
/*
 * drbd_asender() - receiver thread for the meta-data ("ack sender") socket.
 * @thi: drbd thread descriptor; thi->tconn identifies the connection.
 *
 * Main loop of the asender thread.  It alternates between sending pings /
 * draining the done_ee lists of all volumes, and receiving ack-class
 * packets on the meta socket, dispatching each one through the
 * asender_cmd table (see get_asender_cmd()).  On any protocol or
 * transport error it requests a connection-wide state change
 * (C_NETWORK_FAILURE or C_DISCONNECTING) and returns.
 *
 * Returns 0 when the thread leaves its RUNNING state.
 */
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	/* Receive directly into the connection's meta receive buffer. */
	struct p_header *h = &tconn->meta.rbuf.header;
	struct asender_cmd *cmd = NULL;	/* non-NULL once a header was decoded */
	struct packet_info pi;
	int rv;
	void *buf    = h;	/* current write position within the rx buffer */
	int received = 0;	/* bytes accumulated so far for this packet */
	int expect   = sizeof(struct p_header);	/* bytes needed to proceed */
	int ping_timeout_active = 0;	/* a ping is outstanding; rcvtimeo is ping-timeo */

	/* NOTE(review): direct assignment of scheduler fields instead of
	 * sched_setscheduler(); kept as-is from the original code. */
	current->policy = SCHED_RR;  /* Make this a realtime task! */
	current->rt_priority = 2;    /* more important than all other tasks */

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		/* Someone (timeout handling below, or another context) asked
		 * for a ping: send it and shorten the receive timeout to the
		 * ping timeout so a missing PingAck is noticed quickly. */
		if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
			if (!drbd_send_ping(tconn)) {
				conn_err(tconn, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			/* ping_timeo is in deciseconds, hence *HZ/10 */
			tconn->meta.socket->sk->sk_rcvtimeo =
				tconn->net_conf->ping_timeo*HZ/10;
			ping_timeout_active = 1;
		}

		/* TODO: conditionally cork; it may hurt latency if we cork without
		   much to send */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_cork(tconn->meta.socket);
		/* Send out all pending acks for completed epoch entries. */
		if (tconn_process_done_ee(tconn))
			goto reconnect;
		/* but unconditionally uncork unless disabled */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_uncork(tconn->meta.socket);

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		/* Blocking receive of (the rest of) the current packet;
		 * bounded by sk_rcvtimeo set above / by got_PingAck(). */
		rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &tconn->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			/* Partial receives are fine; keep accumulating. */
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			conn_err(tconn, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(tconn->last_received,
				jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
				continue;
			/* Timeout while waiting for the PingAck itself:
			 * the peer is considered dead. */
			if (ping_timeout_active) {
				conn_err(tconn, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			/* Idle timeout expired: probe the peer on the next
			 * loop iteration. */
			set_bit(SEND_PING, &tconn->flags);
			continue;
		} else if (rv == -EINTR) {
			/* Woken by a signal (e.g. to flush done_ee); retry. */
			continue;
		} else {
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		/* Complete header received, but payload not yet: decode the
		 * header and learn how many payload bytes to expect. */
		if (received == expect && cmd == NULL) {
			if (!decode_header(tconn, h, &pi))
				goto reconnect;
			cmd = get_asender_cmd(pi.cmd);
			if (unlikely(cmd == NULL)) {
				conn_err(tconn, "unknown command %d on meta (l: %d)\n",
					pi.cmd, pi.size);
				goto disconnect;
			}
			expect = cmd->pkt_size;
			/* All asender packets have a fixed size; a mismatch
			 * means the stream is corrupt. */
			if (pi.size != expect - sizeof(struct p_header)) {
				conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
					pi.cmd, pi.size);
				goto reconnect;
			}
		}
		/* Full packet received: dispatch it, then reset the receive
		 * state machine for the next header. */
		if (received == expect) {
			tconn->last_received = jiffies;
			if (!cmd->process(vnr_to_mdev(tconn, pi.vnr), pi.cmd))
				goto reconnect;

			/* the idle_timeout (ping-int)
			 * has been restored in got_PingAck() */
			if (cmd == get_asender_cmd(P_PING_ACK))
				ping_timeout_active = 0;

			buf	 = h;
			received = 0;
			expect	 = sizeof(struct p_header);
			cmd	 = NULL;
		}
	}

	/* The if (0) blocks below are only reachable via goto; normal loop
	 * exit (thread told to stop) skips both state changes. */
	if (0) {
reconnect:
		conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
	}
	if (0) {
disconnect:
		conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	clear_bit(SIGNAL_ASENDER, &tconn->flags);

	conn_info(tconn, "asender terminated\n");

	return 0;
}
This page took 0.39388 seconds and 5 git commands to generate.