drbd: Converted drbd_recv() from mdev to tconn
[deliverable/linux.git] drivers/block/drbd/drbd_receiver.c
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
47#include "drbd_req.h"
48
49#include "drbd_vli.h"
50
51enum finish_epoch {
52 FE_STILL_LIVE,
53 FE_DESTROYED,
54 FE_RECYCLED,
55};
56
57static int drbd_do_handshake(struct drbd_conf *mdev);
58static int drbd_do_auth(struct drbd_conf *mdev);
59
60static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62
63
64#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
65
66/*
 67 * some helper functions to deal with singly linked page lists,
68 * page->private being our "next" pointer.
69 */
70
71/* If at least n pages are linked at head, get n pages off.
72 * Otherwise, don't modify head, and return NULL.
73 * Locking is the responsibility of the caller.
74 */
75static struct page *page_chain_del(struct page **head, int n)
76{
77 struct page *page;
78 struct page *tmp;
79
80 BUG_ON(!n);
81 BUG_ON(!head);
82
83 page = *head;
84
85 if (!page)
86 return NULL;
87
88 while (page) {
89 tmp = page_chain_next(page);
90 if (--n == 0)
91 break; /* found sufficient pages */
92 if (tmp == NULL)
93 /* insufficient pages, don't use any of them. */
94 return NULL;
95 page = tmp;
96 }
97
98 /* add end of list marker for the returned list */
99 set_page_private(page, 0);
100 /* actual return value, and adjustment of head */
101 page = *head;
102 *head = tmp;
103 return page;
104}
105
106/* may be used outside of locks to find the tail of a (usually short)
107 * "private" page chain, before adding it back to a global chain head
108 * with page_chain_add() under a spinlock. */
109static struct page *page_chain_tail(struct page *page, int *len)
110{
111 struct page *tmp;
112 int i = 1;
113 while ((tmp = page_chain_next(page)))
114 ++i, page = tmp;
115 if (len)
116 *len = i;
117 return page;
118}
119
120static int page_chain_free(struct page *page)
121{
122 struct page *tmp;
123 int i = 0;
124 page_chain_for_each_safe(page, tmp) {
125 put_page(page);
126 ++i;
127 }
128 return i;
129}
130
131static void page_chain_add(struct page **head,
132 struct page *chain_first, struct page *chain_last)
133{
134#if 1
135 struct page *tmp;
136 tmp = page_chain_tail(chain_first, NULL);
137 BUG_ON(tmp != chain_last);
138#endif
139
140 /* add chain to head */
141 set_page_private(chain_last, (unsigned long)*head);
142 *head = chain_first;
143}
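/*
 * For illustration, the helpers above combine roughly like this (a
 * sketch; the real callers are drbd_pp_first_pages_or_try_alloc() and
 * drbd_pp_free() below).  Grab n pages while holding drbd_pp_lock,
 * work on the chain unlocked, then splice it back under the lock:
 *
 *	spin_lock(&drbd_pp_lock);
 *	chain = page_chain_del(&drbd_pp_pool, n);
 *	spin_unlock(&drbd_pp_lock);
 *
 *	page_chain_for_each(page)
 *		do_something_with(page);
 *
 *	tail = page_chain_tail(chain, NULL);
 *	spin_lock(&drbd_pp_lock);
 *	page_chain_add(&drbd_pp_pool, chain, tail);
 *	spin_unlock(&drbd_pp_lock);
 */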
144
145static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146{
147 struct page *page = NULL;
148 struct page *tmp = NULL;
149 int i = 0;
150
151 /* Yes, testing drbd_pp_vacant outside the lock is racy.
152 * So what. It saves a spin_lock. */
45bb912b 153 if (drbd_pp_vacant >= number) {
b411b363 154 spin_lock(&drbd_pp_lock);
155 page = page_chain_del(&drbd_pp_pool, number);
156 if (page)
157 drbd_pp_vacant -= number;
b411b363 158 spin_unlock(&drbd_pp_lock);
159 if (page)
160 return page;
b411b363 161 }
45bb912b 162
163 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164 * "criss-cross" setup, that might cause write-out on some other DRBD,
165 * which in turn might block on the other node at this very place. */
166 for (i = 0; i < number; i++) {
167 tmp = alloc_page(GFP_TRY);
168 if (!tmp)
169 break;
170 set_page_private(tmp, (unsigned long)page);
171 page = tmp;
172 }
173
174 if (i == number)
175 return page;
176
177 /* Not enough pages immediately available this time.
178 * No need to jump around here, drbd_pp_alloc will retry this
179 * function "soon". */
180 if (page) {
181 tmp = page_chain_tail(page, NULL);
182 spin_lock(&drbd_pp_lock);
183 page_chain_add(&drbd_pp_pool, page, tmp);
184 drbd_pp_vacant += i;
185 spin_unlock(&drbd_pp_lock);
186 }
187 return NULL;
188}
189
190static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191{
db830c46 192 struct drbd_peer_request *peer_req;
193 struct list_head *le, *tle;
194
195 /* The EEs are always appended to the end of the list. Since
196 they are sent in order over the wire, they have to finish
 197 in order. As soon as we see the first one that is not finished we can
 198 stop examining the list... */
199
200 list_for_each_safe(le, tle, &mdev->net_ee) {
201 peer_req = list_entry(le, struct drbd_peer_request, w.list);
202 if (drbd_ee_has_active_page(peer_req))
203 break;
204 list_move(le, to_be_freed);
205 }
206}
207
208static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209{
210 LIST_HEAD(reclaimed);
db830c46 211 struct drbd_peer_request *peer_req, *t;
b411b363 212
87eeee41 213 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 214 reclaim_net_ee(mdev, &reclaimed);
87eeee41 215 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 216
217 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
218 drbd_free_net_ee(mdev, peer_req);
219}
220
221/**
45bb912b 222 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
b411b363 223 * @mdev: DRBD device.
224 * @number: number of pages requested
225 * @retry: whether to retry, if not enough pages are available right now
226 *
 227 * Tries to allocate @number pages, first from our own page pool, then from
228 * the kernel, unless this allocation would exceed the max_buffers setting.
229 * Possibly retry until DRBD frees sufficient pages somewhere else.
b411b363 230 *
45bb912b 231 * Returns a page chain linked via page->private.
b411b363 232 */
45bb912b 233static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234{
235 struct page *page = NULL;
236 DEFINE_WAIT(wait);
237
238 /* Yes, we may run up to @number over max_buffers. If we
239 * follow it strictly, the admin will get it wrong anyways. */
89e58e75 240 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
45bb912b 241 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363 242
45bb912b 243 while (page == NULL) {
244 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245
246 drbd_kick_lo_and_reclaim_net(mdev);
247
89e58e75 248 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
45bb912b 249 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250 if (page)
251 break;
252 }
253
254 if (!retry)
255 break;
256
257 if (signal_pending(current)) {
258 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259 break;
260 }
261
262 schedule();
263 }
264 finish_wait(&drbd_pp_wait, &wait);
265
266 if (page)
267 atomic_add(number, &mdev->pp_in_use);
268 return page;
269}
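/*
 * Typical pairing, as drbd_alloc_ee() and drbd_free_some_ee() below do it
 * (a sketch only, error handling trimmed): the page count is derived from
 * the request size, and the whole chain is handed back when done.
 *
 *	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
 *	struct page *page = drbd_pp_alloc(mdev, nr_pages, true);
 *	if (!page)
 *		return NULL;
 *	...
 *	drbd_pp_free(mdev, page, 0);
 */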
270
271/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 272 * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
273 * Either links the page chain back to the global pool,
274 * or returns all pages to the system. */
435f0740 275static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
b411b363 276{
435f0740 277 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
b411b363 278 int i;
435f0740 279
1816a2b4 280 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
281 i = page_chain_free(page);
282 else {
283 struct page *tmp;
284 tmp = page_chain_tail(page, &i);
285 spin_lock(&drbd_pp_lock);
286 page_chain_add(&drbd_pp_pool, page, tmp);
287 drbd_pp_vacant += i;
288 spin_unlock(&drbd_pp_lock);
b411b363 289 }
435f0740 290 i = atomic_sub_return(i, a);
45bb912b 291 if (i < 0)
292 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
293 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
294 wake_up(&drbd_pp_wait);
295}
296
297/*
298You need to hold the req_lock:
299 _drbd_wait_ee_list_empty()
300
301You must not have the req_lock:
302 drbd_free_ee()
303 drbd_alloc_ee()
304 drbd_init_ee()
305 drbd_release_ee()
306 drbd_ee_fix_bhs()
307 drbd_process_done_ee()
308 drbd_clear_done_ee()
309 drbd_wait_ee_list_empty()
310*/
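/*
 * In other words: callers that already hold the req_lock use the
 * underscore variant directly; everyone else goes through the wrapper
 * (see drbd_wait_ee_list_empty() further down), roughly:
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 */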
311
312struct drbd_peer_request *
313drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
314 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
b411b363 315{
db830c46 316 struct drbd_peer_request *peer_req;
b411b363 317 struct page *page;
45bb912b 318 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
b411b363 319
0cf9d27e 320 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
321 return NULL;
322
323 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
324 if (!peer_req) {
325 if (!(gfp_mask & __GFP_NOWARN))
326 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
327 return NULL;
328 }
329
330 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
331 if (!page)
332 goto fail;
b411b363 333
334 drbd_clear_interval(&peer_req->i);
335 peer_req->i.size = data_size;
336 peer_req->i.sector = sector;
337 peer_req->i.local = false;
338 peer_req->i.waiting = false;
339
340 peer_req->epoch = NULL;
341 peer_req->mdev = mdev;
342 peer_req->pages = page;
343 atomic_set(&peer_req->pending_bios, 0);
344 peer_req->flags = 0;
345 /*
346 * The block_id is opaque to the receiver. It is not endianness
347 * converted, and sent back to the sender unchanged.
348 */
db830c46 349 peer_req->block_id = id;
b411b363 350
db830c46 351 return peer_req;
b411b363 352
45bb912b 353 fail:
db830c46 354 mempool_free(peer_req, drbd_ee_mempool);
355 return NULL;
356}
357
db830c46 358void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
f6ffca9f 359 int is_net)
b411b363 360{
361 if (peer_req->flags & EE_HAS_DIGEST)
362 kfree(peer_req->digest);
363 drbd_pp_free(mdev, peer_req->pages, is_net);
364 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
365 D_ASSERT(drbd_interval_empty(&peer_req->i));
366 mempool_free(peer_req, drbd_ee_mempool);
367}
368
369int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
370{
371 LIST_HEAD(work_list);
db830c46 372 struct drbd_peer_request *peer_req, *t;
b411b363 373 int count = 0;
435f0740 374 int is_net = list == &mdev->net_ee;
b411b363 375
87eeee41 376 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 377 list_splice_init(list, &work_list);
87eeee41 378 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 379
380 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
381 drbd_free_some_ee(mdev, peer_req, is_net);
382 count++;
383 }
384 return count;
385}
386
387
388/*
389 * This function is called from _asender only_
8554df1c 390 * but see also comments in _req_mod(,BARRIER_ACKED)
391 * and receive_Barrier.
392 *
393 * Move entries from net_ee to done_ee, if ready.
394 * Grab done_ee, call all callbacks, free the entries.
395 * The callbacks typically send out ACKs.
396 */
397static int drbd_process_done_ee(struct drbd_conf *mdev)
398{
399 LIST_HEAD(work_list);
400 LIST_HEAD(reclaimed);
db830c46 401 struct drbd_peer_request *peer_req, *t;
402 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
403
87eeee41 404 spin_lock_irq(&mdev->tconn->req_lock);
405 reclaim_net_ee(mdev, &reclaimed);
406 list_splice_init(&mdev->done_ee, &work_list);
87eeee41 407 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 408
409 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
410 drbd_free_net_ee(mdev, peer_req);
411
412 /* possible callbacks here:
413 * e_end_block, and e_end_resync_block, e_send_discard_ack.
414 * all ignore the last argument.
415 */
db830c46 416 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
b411b363 417 /* list_del not necessary, next/prev members not touched */
418 ok = peer_req->w.cb(mdev, &peer_req->w, !ok) && ok;
419 drbd_free_ee(mdev, peer_req);
420 }
421 wake_up(&mdev->ee_wait);
422
423 return ok;
424}
425
426void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
427{
428 DEFINE_WAIT(wait);
429
430 /* avoids spin_lock/unlock
431 * and calling prepare_to_wait in the fast path */
432 while (!list_empty(head)) {
433 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
87eeee41 434 spin_unlock_irq(&mdev->tconn->req_lock);
7eaceacc 435 io_schedule();
b411b363 436 finish_wait(&mdev->ee_wait, &wait);
87eeee41 437 spin_lock_irq(&mdev->tconn->req_lock);
438 }
439}
440
441void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
442{
87eeee41 443 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 444 _drbd_wait_ee_list_empty(mdev, head);
87eeee41 445 spin_unlock_irq(&mdev->tconn->req_lock);
446}
447
 448/* see also kernel_accept(), which is only present since 2.6.18.
 449 * Also, we want to log exactly which part of it failed. */
7653620d 450static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
451{
452 struct sock *sk = sock->sk;
453 int err = 0;
454
455 *what = "listen";
456 err = sock->ops->listen(sock, 5);
457 if (err < 0)
458 goto out;
459
460 *what = "sock_create_lite";
461 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
462 newsock);
463 if (err < 0)
464 goto out;
465
466 *what = "accept";
467 err = sock->ops->accept(sock, *newsock, 0);
468 if (err < 0) {
469 sock_release(*newsock);
470 *newsock = NULL;
471 goto out;
472 }
473 (*newsock)->ops = sock->ops;
474
475out:
476 return err;
477}
478
dbd9eea0 479static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
480{
481 mm_segment_t oldfs;
482 struct kvec iov = {
483 .iov_base = buf,
484 .iov_len = size,
485 };
486 struct msghdr msg = {
487 .msg_iovlen = 1,
488 .msg_iov = (struct iovec *)&iov,
489 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
490 };
491 int rv;
492
493 oldfs = get_fs();
494 set_fs(KERNEL_DS);
495 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
496 set_fs(oldfs);
497
498 return rv;
499}
500
de0ff338 501static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
502{
503 mm_segment_t oldfs;
504 struct kvec iov = {
505 .iov_base = buf,
506 .iov_len = size,
507 };
508 struct msghdr msg = {
509 .msg_iovlen = 1,
510 .msg_iov = (struct iovec *)&iov,
511 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
512 };
513 int rv;
514
515 oldfs = get_fs();
516 set_fs(KERNEL_DS);
517
518 for (;;) {
de0ff338 519 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
520 if (rv == size)
521 break;
522
523 /* Note:
524 * ECONNRESET other side closed the connection
525 * ERESTARTSYS (on sock) we got a signal
526 */
527
528 if (rv < 0) {
529 if (rv == -ECONNRESET)
de0ff338 530 conn_info(tconn, "sock was reset by peer\n");
b411b363 531 else if (rv != -ERESTARTSYS)
de0ff338 532 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
533 break;
534 } else if (rv == 0) {
de0ff338 535 conn_info(tconn, "sock was shut down by peer\n");
536 break;
537 } else {
538 /* signal came in, or peer/link went down,
539 * after we read a partial message
540 */
541 /* D_ASSERT(signal_pending(current)); */
542 break;
543 }
544 };
545
546 set_fs(oldfs);
547
548 if (rv != size)
de0ff338 549 drbd_force_state(tconn->volume0, NS(conn, C_BROKEN_PIPE));
550
551 return rv;
552}
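/*
 * After the conversion to struct drbd_tconn, callers pass the connection
 * rather than the device; per-device code reaches it through mdev->tconn,
 * e.g. (as drbd_recv_header() does further below):
 *
 *	r = drbd_recv(mdev->tconn, h, sizeof(*h));
 *	if (r != sizeof(*h))
 *		return false;	(short read: peer closed, signal, or error)
 */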
553
554/* quoting tcp(7):
555 * On individual connections, the socket buffer size must be set prior to the
556 * listen(2) or connect(2) calls in order to have it take effect.
557 * This is our wrapper to do so.
558 */
559static void drbd_setbufsize(struct socket *sock, unsigned int snd,
560 unsigned int rcv)
561{
562 /* open coded SO_SNDBUF, SO_RCVBUF */
563 if (snd) {
564 sock->sk->sk_sndbuf = snd;
565 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
566 }
567 if (rcv) {
568 sock->sk->sk_rcvbuf = rcv;
569 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
570 }
571}
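/*
 * Per the tcp(7) note above this has to happen before connect() or
 * listen(); drbd_try_connect() and drbd_wait_for_connect() therefore
 * call it right after sock_create_kern(), roughly:
 *
 *	sock_create_kern(..., &sock);
 *	drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
 *			tconn->net_conf->rcvbuf_size);
 *	sock->ops->bind(...);
 *	sock->ops->connect(...);	or listen() + accept()
 */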
572
eac3e990 573static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
574{
575 const char *what;
576 struct socket *sock;
577 struct sockaddr_in6 src_in6;
578 int err;
579 int disconnect_on_error = 1;
580
eac3e990 581 if (!get_net_conf(tconn))
582 return NULL;
583
584 what = "sock_create_kern";
eac3e990 585 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
586 SOCK_STREAM, IPPROTO_TCP, &sock);
587 if (err < 0) {
588 sock = NULL;
589 goto out;
590 }
591
592 sock->sk->sk_rcvtimeo =
593 sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
594 drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
595 tconn->net_conf->rcvbuf_size);
596
597 /* explicitly bind to the configured IP as source IP
598 * for the outgoing connections.
599 * This is needed for multihomed hosts and to be
600 * able to use lo: interfaces for drbd.
601 * Make sure to use 0 as port number, so linux selects
602 * a free one dynamically.
603 */
604 memcpy(&src_in6, tconn->net_conf->my_addr,
605 min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
606 if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
607 src_in6.sin6_port = 0;
608 else
609 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
610
611 what = "bind before connect";
612 err = sock->ops->bind(sock,
613 (struct sockaddr *) &src_in6,
eac3e990 614 tconn->net_conf->my_addr_len);
615 if (err < 0)
616 goto out;
617
618 /* connect may fail, peer not yet available.
619 * stay C_WF_CONNECTION, don't go Disconnecting! */
620 disconnect_on_error = 0;
621 what = "connect";
622 err = sock->ops->connect(sock,
623 (struct sockaddr *)tconn->net_conf->peer_addr,
624 tconn->net_conf->peer_addr_len, 0);
625
626out:
627 if (err < 0) {
628 if (sock) {
629 sock_release(sock);
630 sock = NULL;
631 }
632 switch (-err) {
633 /* timeout, busy, signal pending */
634 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
635 case EINTR: case ERESTARTSYS:
636 /* peer not (yet) available, network problem */
637 case ECONNREFUSED: case ENETUNREACH:
638 case EHOSTDOWN: case EHOSTUNREACH:
639 disconnect_on_error = 0;
640 break;
641 default:
eac3e990 642 conn_err(tconn, "%s failed, err = %d\n", what, err);
643 }
644 if (disconnect_on_error)
eac3e990 645 drbd_force_state(tconn->volume0, NS(conn, C_DISCONNECTING));
b411b363 646 }
eac3e990 647 put_net_conf(tconn);
648 return sock;
649}
650
7653620d 651static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
652{
653 int timeo, err;
654 struct socket *s_estab = NULL, *s_listen;
655 const char *what;
656
7653620d 657 if (!get_net_conf(tconn))
658 return NULL;
659
660 what = "sock_create_kern";
7653620d 661 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
662 SOCK_STREAM, IPPROTO_TCP, &s_listen);
663 if (err) {
664 s_listen = NULL;
665 goto out;
666 }
667
7653620d 668 timeo = tconn->net_conf->try_connect_int * HZ;
669 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
670
671 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
672 s_listen->sk->sk_rcvtimeo = timeo;
673 s_listen->sk->sk_sndtimeo = timeo;
674 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
675 tconn->net_conf->rcvbuf_size);
676
677 what = "bind before listen";
678 err = s_listen->ops->bind(s_listen,
679 (struct sockaddr *) tconn->net_conf->my_addr,
680 tconn->net_conf->my_addr_len);
681 if (err < 0)
682 goto out;
683
7653620d 684 err = drbd_accept(&what, s_listen, &s_estab);
685
686out:
687 if (s_listen)
688 sock_release(s_listen);
689 if (err < 0) {
690 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
691 conn_err(tconn, "%s failed, err = %d\n", what, err);
692 drbd_force_state(tconn->volume0, NS(conn, C_DISCONNECTING));
693 }
694 }
7653620d 695 put_net_conf(tconn);
696
697 return s_estab;
698}
699
d38e787e 700static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
b411b363 701{
d38e787e 702 struct p_header *h = &tconn->data.sbuf.header;
b411b363 703
d38e787e 704 return _conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
705}
706
a25b63f1 707static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
b411b363 708{
a25b63f1 709 struct p_header80 *h = &tconn->data.rbuf.header.h80;
710 int rr;
711
dbd9eea0 712 rr = drbd_recv_short(sock, h, sizeof(*h), 0);
b411b363 713
ca9bc12b 714 if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
715 return be16_to_cpu(h->command);
716
717 return 0xffff;
718}
719
720/**
721 * drbd_socket_okay() - Free the socket if its connection is not okay
722 * @sock: pointer to the pointer to the socket.
723 */
dbd9eea0 724static int drbd_socket_okay(struct socket **sock)
725{
726 int rr;
727 char tb[4];
728
729 if (!*sock)
81e84650 730 return false;
b411b363 731
dbd9eea0 732 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
733
734 if (rr > 0 || rr == -EAGAIN) {
81e84650 735 return true;
736 } else {
737 sock_release(*sock);
738 *sock = NULL;
81e84650 739 return false;
740 }
741}
742
743/*
744 * return values:
745 * 1 yes, we have a valid connection
746 * 0 oops, did not work out, please try again
747 * -1 peer talks different language,
748 * no point in trying again, please go standalone.
749 * -2 We do not have a network config...
750 */
751static int drbd_connect(struct drbd_conf *mdev)
752{
753 struct socket *s, *sock, *msock;
754 int try, h, ok;
755
e42325a5 756 D_ASSERT(!mdev->tconn->data.socket);
b411b363 757
758 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
759 return -2;
760
25703f83 761 clear_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
762 mdev->tconn->agreed_pro_version = 99;
763 /* agreed_pro_version must be smaller than 100 so we send the old
764 header (h80) in the first packet and in the handshake packet. */
765
766 sock = NULL;
767 msock = NULL;
768
769 do {
770 for (try = 0;;) {
771 /* 3 tries, this should take less than a second! */
eac3e990 772 s = drbd_try_connect(mdev->tconn);
773 if (s || ++try >= 3)
774 break;
775 /* give the other side time to call bind() & listen() */
20ee6390 776 schedule_timeout_interruptible(HZ / 10);
777 }
778
779 if (s) {
780 if (!sock) {
d38e787e 781 drbd_send_fp(mdev->tconn, s, P_HAND_SHAKE_S);
782 sock = s;
783 s = NULL;
784 } else if (!msock) {
d38e787e 785 drbd_send_fp(mdev->tconn, s, P_HAND_SHAKE_M);
786 msock = s;
787 s = NULL;
788 } else {
789 dev_err(DEV, "Logic error in drbd_connect()\n");
790 goto out_release_sockets;
791 }
792 }
793
794 if (sock && msock) {
89e58e75 795 schedule_timeout_interruptible(mdev->tconn->net_conf->ping_timeo*HZ/10);
796 ok = drbd_socket_okay(&sock);
797 ok = drbd_socket_okay(&msock) && ok;
798 if (ok)
799 break;
800 }
801
802retry:
7653620d 803 s = drbd_wait_for_connect(mdev->tconn);
b411b363 804 if (s) {
a25b63f1 805 try = drbd_recv_fp(mdev->tconn, s);
806 drbd_socket_okay(&sock);
807 drbd_socket_okay(&msock);
808 switch (try) {
809 case P_HAND_SHAKE_S:
810 if (sock) {
811 dev_warn(DEV, "initial packet S crossed\n");
812 sock_release(sock);
813 }
814 sock = s;
815 break;
816 case P_HAND_SHAKE_M:
817 if (msock) {
818 dev_warn(DEV, "initial packet M crossed\n");
819 sock_release(msock);
820 }
821 msock = s;
25703f83 822 set_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
823 break;
824 default:
825 dev_warn(DEV, "Error receiving initial packet\n");
826 sock_release(s);
827 if (random32() & 1)
828 goto retry;
829 }
830 }
831
832 if (mdev->state.conn <= C_DISCONNECTING)
833 goto out_release_sockets;
834 if (signal_pending(current)) {
835 flush_signals(current);
836 smp_rmb();
e6b3ea83 837 if (get_t_state(&mdev->tconn->receiver) == EXITING)
838 goto out_release_sockets;
839 }
840
841 if (sock && msock) {
842 ok = drbd_socket_okay(&sock);
843 ok = drbd_socket_okay(&msock) && ok;
844 if (ok)
845 break;
846 }
847 } while (1);
848
849 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
850 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
851
852 sock->sk->sk_allocation = GFP_NOIO;
853 msock->sk->sk_allocation = GFP_NOIO;
854
855 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
856 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
857
b411b363 858 /* NOT YET ...
89e58e75 859 * sock->sk->sk_sndtimeo = mdev->tconn->net_conf->timeout*HZ/10;
860 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
861 * first set it to the P_HAND_SHAKE timeout,
862 * which we set to 4x the configured ping_timeout. */
863 sock->sk->sk_sndtimeo =
89e58e75 864 sock->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_timeo*4*HZ/10;
b411b363 865
866 msock->sk->sk_sndtimeo = mdev->tconn->net_conf->timeout*HZ/10;
867 msock->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_int*HZ;
868
869 /* we don't want delays.
25985edc 870 * we use TCP_CORK where appropriate, though */
871 drbd_tcp_nodelay(sock);
872 drbd_tcp_nodelay(msock);
873
874 mdev->tconn->data.socket = sock;
875 mdev->tconn->meta.socket = msock;
31890f4a 876 mdev->tconn->last_received = jiffies;
b411b363 877
e6b3ea83 878 D_ASSERT(mdev->tconn->asender.task == NULL);
879
880 h = drbd_do_handshake(mdev);
881 if (h <= 0)
882 return h;
883
a0638456 884 if (mdev->tconn->cram_hmac_tfm) {
b411b363 885 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
886 switch (drbd_do_auth(mdev)) {
887 case -1:
888 dev_err(DEV, "Authentication of peer failed\n");
889 return -1;
890 case 0:
891 dev_err(DEV, "Authentication of peer failed, trying again.\n");
892 return 0;
893 }
894 }
895
896 if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
897 return 0;
898
89e58e75 899 sock->sk->sk_sndtimeo = mdev->tconn->net_conf->timeout*HZ/10;
900 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
901
902 atomic_set(&mdev->packet_seq, 0);
903 mdev->peer_seq = 0;
904
e6b3ea83 905 drbd_thread_start(&mdev->tconn->asender);
b411b363 906
148efa16 907 if (drbd_send_protocol(mdev) == -1)
7e2455c1 908 return -1;
b411b363 909 drbd_send_sync_param(mdev, &mdev->sync_conf);
e89b591c 910 drbd_send_sizes(mdev, 0, 0);
911 drbd_send_uuids(mdev);
912 drbd_send_state(mdev);
913 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
914 clear_bit(RESIZE_PENDING, &mdev->flags);
7fde2be9 915 mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
916
917 return 1;
918
919out_release_sockets:
920 if (sock)
921 sock_release(sock);
922 if (msock)
923 sock_release(msock);
924 return -1;
925}
926
927static bool decode_header(struct drbd_conf *mdev, struct p_header *h,
928 enum drbd_packet *cmd, unsigned int *packet_size)
b411b363 929{
fd340c12 930 if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
931 *cmd = be16_to_cpu(h->h80.command);
932 *packet_size = be16_to_cpu(h->h80.length);
ca9bc12b 933 } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
02918be2 934 *cmd = be16_to_cpu(h->h95.command);
fd340c12 935 *packet_size = be32_to_cpu(h->h95.length) & 0x00ffffff;
02918be2 936 } else {
937 dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
938 be32_to_cpu(h->h80.magic),
939 be16_to_cpu(h->h80.command),
940 be16_to_cpu(h->h80.length));
81e84650 941 return false;
b411b363 942 }
943 return true;
944}
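/*
 * The two on-the-wire header layouts decoded above, for reference
 * (all fields big endian):
 *
 *	h80 (old style):   32-bit magic (DRBD_MAGIC), 16-bit command, 16-bit length
 *	h95 (big header):  16-bit magic (DRBD_MAGIC_BIG), 16-bit command,
 *	                   32-bit length of which only the low 24 bits are used
 */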
945
946static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packet *cmd,
947 unsigned int *packet_size)
948{
949 struct p_header *h = &mdev->tconn->data.rbuf.header;
950 int r;
951
de0ff338 952 r = drbd_recv(mdev->tconn, h, sizeof(*h));
953 if (unlikely(r != sizeof(*h))) {
954 if (!signal_pending(current))
955 dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
956 return false;
957 }
958
959 r = decode_header(mdev, h, cmd, packet_size);
31890f4a 960 mdev->tconn->last_received = jiffies;
b411b363 961
257d0af6 962 return r;
963}
964
2451fc3b 965static void drbd_flush(struct drbd_conf *mdev)
966{
967 int rv;
968
969 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a 970 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
dd3932ed 971 NULL);
972 if (rv) {
973 dev_err(DEV, "local disk flush failed with status %d\n", rv);
974 /* would rather check on EOPNOTSUPP, but that is not reliable.
975 * don't try again for ANY return value != 0
976 * if (rv == -EOPNOTSUPP) */
977 drbd_bump_write_ordering(mdev, WO_drain_io);
978 }
979 put_ldev(mdev);
980 }
981}
982
983/**
984 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
985 * @mdev: DRBD device.
986 * @epoch: Epoch object.
987 * @ev: Epoch event.
988 */
989static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
990 struct drbd_epoch *epoch,
991 enum epoch_event ev)
992{
2451fc3b 993 int epoch_size;
b411b363 994 struct drbd_epoch *next_epoch;
995 enum finish_epoch rv = FE_STILL_LIVE;
996
997 spin_lock(&mdev->epoch_lock);
998 do {
999 next_epoch = NULL;
1000
1001 epoch_size = atomic_read(&epoch->epoch_size);
1002
1003 switch (ev & ~EV_CLEANUP) {
1004 case EV_PUT:
1005 atomic_dec(&epoch->active);
1006 break;
1007 case EV_GOT_BARRIER_NR:
1008 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1009 break;
1010 case EV_BECAME_LAST:
1011 /* nothing to do*/
1012 break;
1013 }
1014
1015 if (epoch_size != 0 &&
1016 atomic_read(&epoch->active) == 0 &&
2451fc3b 1017 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1018 if (!(ev & EV_CLEANUP)) {
1019 spin_unlock(&mdev->epoch_lock);
1020 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1021 spin_lock(&mdev->epoch_lock);
1022 }
1023 dec_unacked(mdev);
1024
1025 if (mdev->current_epoch != epoch) {
1026 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1027 list_del(&epoch->list);
1028 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1029 mdev->epochs--;
1030 kfree(epoch);
1031
1032 if (rv == FE_STILL_LIVE)
1033 rv = FE_DESTROYED;
1034 } else {
1035 epoch->flags = 0;
1036 atomic_set(&epoch->epoch_size, 0);
698f9315 1037 /* atomic_set(&epoch->active, 0); is already zero */
1038 if (rv == FE_STILL_LIVE)
1039 rv = FE_RECYCLED;
2451fc3b 1040 wake_up(&mdev->ee_wait);
1041 }
1042 }
1043
1044 if (!next_epoch)
1045 break;
1046
1047 epoch = next_epoch;
1048 } while (1);
1049
1050 spin_unlock(&mdev->epoch_lock);
1051
1052 return rv;
1053}
1054
1055/**
1056 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1057 * @mdev: DRBD device.
1058 * @wo: Write ordering method to try.
1059 */
1060void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1061{
1062 enum write_ordering_e pwo;
1063 static char *write_ordering_str[] = {
1064 [WO_none] = "none",
1065 [WO_drain_io] = "drain",
1066 [WO_bdev_flush] = "flush",
1067 };
1068
1069 pwo = mdev->write_ordering;
1070 wo = min(pwo, wo);
1071 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1072 wo = WO_drain_io;
1073 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1074 wo = WO_none;
1075 mdev->write_ordering = wo;
2451fc3b 1076 if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1077 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1078}
1079
1080/**
1081 * drbd_submit_ee()
1082 * @mdev: DRBD device.
db830c46 1083 * @peer_req: peer request
45bb912b 1084 * @rw: flag field, see bio->bi_rw
1085 *
1086 * May spread the pages to multiple bios,
1087 * depending on bio_add_page restrictions.
1088 *
1089 * Returns 0 if all bios have been submitted,
1090 * -ENOMEM if we could not allocate enough bios,
1091 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1092 * single page to an empty bio (which should never happen and likely indicates
1093 * that the lower level IO stack is in some way broken). This has been observed
1094 * on certain Xen deployments.
1095 */
1096/* TODO allocate from our own bio_set. */
db830c46 1097int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
f6ffca9f 1098 const unsigned rw, const int fault_type)
1099{
1100 struct bio *bios = NULL;
1101 struct bio *bio;
1102 struct page *page = peer_req->pages;
1103 sector_t sector = peer_req->i.sector;
1104 unsigned ds = peer_req->i.size;
1105 unsigned n_bios = 0;
1106 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
10f6d992 1107 int err = -ENOMEM;
1108
1109 /* In most cases, we will only need one bio. But in case the lower
1110 * level restrictions happen to be different at this offset on this
1111 * side than those of the sending peer, we may need to submit the
1112 * request in more than one bio. */
1113next_bio:
1114 bio = bio_alloc(GFP_NOIO, nr_pages);
1115 if (!bio) {
1116 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1117 goto fail;
1118 }
db830c46 1119 /* > peer_req->i.sector, unless this is the first bio */
1120 bio->bi_sector = sector;
1121 bio->bi_bdev = mdev->ldev->backing_bdev;
45bb912b 1122 bio->bi_rw = rw;
db830c46 1123 bio->bi_private = peer_req;
1124 bio->bi_end_io = drbd_endio_sec;
1125
1126 bio->bi_next = bios;
1127 bios = bio;
1128 ++n_bios;
1129
1130 page_chain_for_each(page) {
1131 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1132 if (!bio_add_page(bio, page, len, 0)) {
1133 /* A single page must always be possible!
1134 * But in case it fails anyways,
1135 * we deal with it, and complain (below). */
1136 if (bio->bi_vcnt == 0) {
1137 dev_err(DEV,
1138 "bio_add_page failed for len=%u, "
1139 "bi_vcnt=0 (bi_sector=%llu)\n",
1140 len, (unsigned long long)bio->bi_sector);
1141 err = -ENOSPC;
1142 goto fail;
1143 }
1144 goto next_bio;
1145 }
1146 ds -= len;
1147 sector += len >> 9;
1148 --nr_pages;
1149 }
1150 D_ASSERT(page == NULL);
1151 D_ASSERT(ds == 0);
1152
db830c46 1153 atomic_set(&peer_req->pending_bios, n_bios);
1154 do {
1155 bio = bios;
1156 bios = bios->bi_next;
1157 bio->bi_next = NULL;
1158
45bb912b 1159 drbd_generic_make_request(mdev, fault_type, bio);
45bb912b 1160 } while (bios);
1161 return 0;
1162
1163fail:
1164 while (bios) {
1165 bio = bios;
1166 bios = bios->bi_next;
1167 bio_put(bio);
1168 }
10f6d992 1169 return err;
1170}
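/*
 * Callers treat a nonzero return as fatal for this peer request and
 * trigger a re-connect; recv_resync_read() below, for example, does
 * roughly:
 *
 *	if (drbd_submit_ee(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
 *		return true;
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	list_del(&peer_req->w.list);
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 *	drbd_free_ee(mdev, peer_req);
 */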
1171
53840641 1172static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
db830c46 1173 struct drbd_peer_request *peer_req)
53840641 1174{
db830c46 1175 struct drbd_interval *i = &peer_req->i;
1176
1177 drbd_remove_interval(&mdev->write_requests, i);
1178 drbd_clear_interval(i);
1179
6c852bec 1180 /* Wake up any processes waiting for this peer request to complete. */
1181 if (i->waiting)
1182 wake_up(&mdev->misc_wait);
1183}
1184
1185static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
1186 unsigned int data_size)
b411b363 1187{
2451fc3b 1188 int rv;
e42325a5 1189 struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
1190 struct drbd_epoch *epoch;
1191
1192 inc_unacked(mdev);
1193
1194 mdev->current_epoch->barrier_nr = p->barrier;
1195 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1196
1197 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1198 * the activity log, which means it would not be resynced in case the
1199 * R_PRIMARY crashes now.
1200 * Therefore we must send the barrier_ack after the barrier request was
1201 * completed. */
1202 switch (mdev->write_ordering) {
1203 case WO_none:
1204 if (rv == FE_RECYCLED)
81e84650 1205 return true;
1206
1207 /* receiver context, in the writeout path of the other node.
1208 * avoid potential distributed deadlock */
1209 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1210 if (epoch)
1211 break;
1212 else
1213 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1214 /* Fall through */
1215
1216 case WO_bdev_flush:
1217 case WO_drain_io:
b411b363 1218 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1219 drbd_flush(mdev);
1220
1221 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1222 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1223 if (epoch)
1224 break;
1225 }
1226
1227 epoch = mdev->current_epoch;
1228 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1229
1230 D_ASSERT(atomic_read(&epoch->active) == 0);
1231 D_ASSERT(epoch->flags == 0);
b411b363 1232
81e84650 1233 return true;
1234 default:
1235 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
81e84650 1236 return false;
1237 }
1238
1239 epoch->flags = 0;
1240 atomic_set(&epoch->epoch_size, 0);
1241 atomic_set(&epoch->active, 0);
1242
1243 spin_lock(&mdev->epoch_lock);
1244 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1245 list_add(&epoch->list, &mdev->current_epoch->list);
1246 mdev->current_epoch = epoch;
1247 mdev->epochs++;
1248 } else {
1249 /* The current_epoch got recycled while we allocated this one... */
1250 kfree(epoch);
1251 }
1252 spin_unlock(&mdev->epoch_lock);
1253
81e84650 1254 return true;
1255}
1256
1257/* used from receive_RSDataReply (recv_resync_read)
1258 * and from receive_Data */
1259static struct drbd_peer_request *
1260read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1261 int data_size) __must_hold(local)
b411b363 1262{
6666032a 1263 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 1264 struct drbd_peer_request *peer_req;
b411b363 1265 struct page *page;
45bb912b 1266 int dgs, ds, rr;
1267 void *dig_in = mdev->tconn->int_dig_in;
1268 void *dig_vv = mdev->tconn->int_dig_vv;
6b4388ac 1269 unsigned long *data;
b411b363 1270
1271 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1272 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1273
1274 if (dgs) {
de0ff338 1275 rr = drbd_recv(mdev->tconn, dig_in, dgs);
b411b363 1276 if (rr != dgs) {
1277 if (!signal_pending(current))
1278 dev_warn(DEV,
1279 "short read receiving data digest: read %d expected %d\n",
1280 rr, dgs);
1281 return NULL;
1282 }
1283 }
1284
1285 data_size -= dgs;
1286
1287 if (!expect(data_size != 0))
1288 return NULL;
1289 if (!expect(IS_ALIGNED(data_size, 512)))
1290 return NULL;
1291 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1292 return NULL;
b411b363 1293
 1294 /* even though we trust our peer,
1295 * we sometimes have to double check. */
1296 if (sector + (data_size>>9) > capacity) {
1297 dev_err(DEV, "request from peer beyond end of local disk: "
1298 "capacity: %llus < sector: %llus + size: %u\n",
1299 (unsigned long long)capacity,
1300 (unsigned long long)sector, data_size);
1301 return NULL;
1302 }
1303
1304 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1305 * "criss-cross" setup, that might cause write-out on some other DRBD,
1306 * which in turn might block on the other node at this very place. */
1307 peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1308 if (!peer_req)
b411b363 1309 return NULL;
45bb912b 1310
b411b363 1311 ds = data_size;
db830c46 1312 page = peer_req->pages;
1313 page_chain_for_each(page) {
1314 unsigned len = min_t(int, ds, PAGE_SIZE);
6b4388ac 1315 data = kmap(page);
de0ff338 1316 rr = drbd_recv(mdev->tconn, data, len);
0cf9d27e 1317 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1318 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1319 data[0] = data[0] ^ (unsigned long)-1;
1320 }
b411b363 1321 kunmap(page);
45bb912b 1322 if (rr != len) {
db830c46 1323 drbd_free_ee(mdev, peer_req);
1324 if (!signal_pending(current))
1325 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1326 rr, len);
1327 return NULL;
1328 }
1329 ds -= rr;
1330 }
1331
1332 if (dgs) {
db830c46 1333 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
b411b363 1334 if (memcmp(dig_in, dig_vv, dgs)) {
1335 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1336 (unsigned long long)sector, data_size);
b411b363 1337 drbd_bcast_ee(mdev, "digest failed",
1338 dgs, dig_in, dig_vv, peer_req);
1339 drbd_free_ee(mdev, peer_req);
1340 return NULL;
1341 }
1342 }
1343 mdev->recv_cnt += data_size>>9;
db830c46 1344 return peer_req;
1345}
1346
1347/* drbd_drain_block() just takes a data block
1348 * out of the socket input buffer, and discards it.
1349 */
1350static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1351{
1352 struct page *page;
1353 int rr, rv = 1;
1354 void *data;
1355
c3470cde 1356 if (!data_size)
81e84650 1357 return true;
c3470cde 1358
45bb912b 1359 page = drbd_pp_alloc(mdev, 1, 1);
1360
1361 data = kmap(page);
1362 while (data_size) {
de0ff338 1363 rr = drbd_recv(mdev->tconn, data, min_t(int, data_size, PAGE_SIZE));
1364 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1365 rv = 0;
1366 if (!signal_pending(current))
1367 dev_warn(DEV,
1368 "short read receiving data: read %d expected %d\n",
1369 rr, min_t(int, data_size, PAGE_SIZE));
1370 break;
1371 }
1372 data_size -= rr;
1373 }
1374 kunmap(page);
435f0740 1375 drbd_pp_free(mdev, page, 0);
1376 return rv;
1377}
1378
1379static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1380 sector_t sector, int data_size)
1381{
1382 struct bio_vec *bvec;
1383 struct bio *bio;
1384 int dgs, rr, i, expect;
1385 void *dig_in = mdev->tconn->int_dig_in;
1386 void *dig_vv = mdev->tconn->int_dig_vv;
b411b363 1387
1388 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1389 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1390
1391 if (dgs) {
de0ff338 1392 rr = drbd_recv(mdev->tconn, dig_in, dgs);
b411b363 1393 if (rr != dgs) {
1394 if (!signal_pending(current))
1395 dev_warn(DEV,
1396 "short read receiving data reply digest: read %d expected %d\n",
1397 rr, dgs);
1398 return 0;
1399 }
1400 }
1401
1402 data_size -= dgs;
1403
1404 /* optimistically update recv_cnt. if receiving fails below,
1405 * we disconnect anyways, and counters will be reset. */
1406 mdev->recv_cnt += data_size>>9;
1407
1408 bio = req->master_bio;
1409 D_ASSERT(sector == bio->bi_sector);
1410
1411 bio_for_each_segment(bvec, bio, i) {
1412 expect = min_t(int, data_size, bvec->bv_len);
de0ff338 1413 rr = drbd_recv(mdev->tconn,
1414 kmap(bvec->bv_page)+bvec->bv_offset,
1415 expect);
1416 kunmap(bvec->bv_page);
1417 if (rr != expect) {
1418 if (!signal_pending(current))
1419 dev_warn(DEV, "short read receiving data reply: "
1420 "read %d expected %d\n",
1421 rr, expect);
1422 return 0;
1423 }
1424 data_size -= rr;
1425 }
1426
1427 if (dgs) {
a0638456 1428 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1429 if (memcmp(dig_in, dig_vv, dgs)) {
1430 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1431 return 0;
1432 }
1433 }
1434
1435 D_ASSERT(data_size == 0);
1436 return 1;
1437}
1438
1439/* e_end_resync_block() is called via
1440 * drbd_process_done_ee() by asender only */
1441static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1442{
1443 struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
1444 sector_t sector = peer_req->i.sector;
1445 int ok;
1446
db830c46 1447 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1448
1449 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1450 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1451 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1452 } else {
1453 /* Record failure to sync */
db830c46 1454 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
b411b363 1455
db830c46 1456 ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1457 }
1458 dec_unacked(mdev);
1459
1460 return ok;
1461}
1462
1463static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1464{
db830c46 1465 struct drbd_peer_request *peer_req;
b411b363 1466
1467 peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1468 if (!peer_req)
45bb912b 1469 goto fail;
1470
1471 dec_rs_pending(mdev);
1472
1473 inc_unacked(mdev);
1474 /* corresponding dec_unacked() in e_end_resync_block()
1475 * respective _drbd_clear_done_ee */
1476
db830c46 1477 peer_req->w.cb = e_end_resync_block;
45bb912b 1478
87eeee41 1479 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1480 list_add(&peer_req->w.list, &mdev->sync_ee);
87eeee41 1481 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1482
0f0601f4 1483 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
db830c46 1484 if (drbd_submit_ee(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
81e84650 1485 return true;
b411b363 1486
1487 /* don't care for the reason here */
1488 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1489 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1490 list_del(&peer_req->w.list);
87eeee41 1491 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9 1492
db830c46 1493 drbd_free_ee(mdev, peer_req);
1494fail:
1495 put_ldev(mdev);
81e84650 1496 return false;
b411b363
PR
1497}
1498
668eebc6 1499static struct drbd_request *
1500find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1501 sector_t sector, bool missing_ok, const char *func)
51624585 1502{
1503 struct drbd_request *req;
1504
1505 /* Request object according to our peer */
1506 req = (struct drbd_request *)(unsigned long)id;
5e472264 1507 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
668eebc6 1508 return req;
1509 if (!missing_ok) {
1510 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1511 (unsigned long)id, (unsigned long long)sector);
1512 }
1513 return NULL;
1514}
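/*
 * The id really is the sender's own request pointer, echoed back by the
 * peer unchanged (see the block_id comment in drbd_alloc_ee()), so the
 * lookup is just the cast above plus a sanity check against the interval
 * tree.  Typical use, as in receive_DataReply():
 *
 *	req = find_request(mdev, &mdev->read_requests, p->block_id,
 *			   sector, false, __func__);
 *	if (unlikely(!req))
 *		return false;
 */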
1515
1516static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1517 unsigned int data_size)
1518{
1519 struct drbd_request *req;
1520 sector_t sector;
b411b363 1521 int ok;
e42325a5 1522 struct p_data *p = &mdev->tconn->data.rbuf.data;
1523
1524 sector = be64_to_cpu(p->sector);
1525
87eeee41 1526 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 1527 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
87eeee41 1528 spin_unlock_irq(&mdev->tconn->req_lock);
c3afd8f5 1529 if (unlikely(!req))
81e84650 1530 return false;
b411b363 1531
24c4830c 1532 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1533 * special casing it there for the various failure cases.
1534 * still no race with drbd_fail_pending_reads */
1535 ok = recv_dless_read(mdev, req, sector, data_size);
1536
1537 if (ok)
8554df1c 1538 req_mod(req, DATA_RECEIVED);
1539 /* else: nothing. handled from drbd_disconnect...
1540 * I don't think we may complete this just yet
1541 * in case we are "on-disconnect: freeze" */
1542
1543 return ok;
1544}
1545
1546static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1547 unsigned int data_size)
1548{
1549 sector_t sector;
b411b363 1550 int ok;
e42325a5 1551 struct p_data *p = &mdev->tconn->data.rbuf.data;
1552
1553 sector = be64_to_cpu(p->sector);
1554 D_ASSERT(p->block_id == ID_SYNCER);
1555
1556 if (get_ldev(mdev)) {
1557 /* data is submitted to disk within recv_resync_read.
1558 * corresponding put_ldev done below on error,
9c50842a 1559 * or in drbd_endio_sec. */
1560 ok = recv_resync_read(mdev, sector, data_size);
1561 } else {
1562 if (__ratelimit(&drbd_ratelimit_state))
1563 dev_err(DEV, "Can not write resync data to local disk.\n");
1564
1565 ok = drbd_drain_block(mdev, data_size);
1566
2b2bf214 1567 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1568 }
1569
1570 atomic_add(data_size >> 9, &mdev->rs_sect_in);
1571
1572 return ok;
1573}
1574
1575/* e_end_block() is called via drbd_process_done_ee().
1576 * this means this function only runs in the asender thread
1577 */
1578static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1579{
1580 struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
1581 sector_t sector = peer_req->i.sector;
1582 int ok = 1, pcmd;
1583
89e58e75 1584 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
db830c46 1585 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1586 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1587 mdev->state.conn <= C_PAUSED_SYNC_T &&
db830c46 1588 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 1589 P_RS_WRITE_ACK : P_WRITE_ACK;
db830c46 1590 ok &= drbd_send_ack(mdev, pcmd, peer_req);
b411b363 1591 if (pcmd == P_RS_WRITE_ACK)
db830c46 1592 drbd_set_in_sync(mdev, sector, peer_req->i.size);
b411b363 1593 } else {
db830c46 1594 ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1595 /* we expect it to be marked out of sync anyways...
1596 * maybe assert this? */
1597 }
1598 dec_unacked(mdev);
1599 }
1600 /* we delete from the conflict detection hash _after_ we sent out the
1601 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
89e58e75 1602 if (mdev->tconn->net_conf->two_primaries) {
87eeee41 1603 spin_lock_irq(&mdev->tconn->req_lock);
1604 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1605 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 1606 spin_unlock_irq(&mdev->tconn->req_lock);
bb3bfe96 1607 } else
db830c46 1608 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1609
db830c46 1610 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1611
1612 return ok;
1613}
1614
1615static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1616{
db830c46 1617 struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
1618 int ok = 1;
1619
89e58e75 1620 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
db830c46 1621 ok = drbd_send_ack(mdev, P_DISCARD_ACK, peer_req);
b411b363 1622
87eeee41 1623 spin_lock_irq(&mdev->tconn->req_lock);
1624 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1625 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 1626 spin_unlock_irq(&mdev->tconn->req_lock);
1627
1628 dec_unacked(mdev);
1629
1630 return ok;
1631}
1632
1633static bool seq_greater(u32 a, u32 b)
1634{
1635 /*
1636 * We assume 32-bit wrap-around here.
1637 * For 24-bit wrap-around, we would have to shift:
1638 * a <<= 8; b <<= 8;
1639 */
1640 return (s32)a - (s32)b > 0;
1641}
1642
1643static u32 seq_max(u32 a, u32 b)
1644{
1645 return seq_greater(a, b) ? a : b;
1646}
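/*
 * Worked example of the wrap-around handling above: for a == 2 and
 * b == 0xffffffff, (s32)a - (s32)b == 2 - (-1) == 3 > 0, so a sequence
 * number that has just wrapped still compares as "greater" than the one
 * right before the wrap.
 */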
1647
43ae077d 1648static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
3e394da1 1649{
43ae077d 1650 unsigned int old_peer_seq;
3e394da1
AG
1651
1652 spin_lock(&mdev->peer_seq_lock);
1653 old_peer_seq = mdev->peer_seq;
1654 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
3e394da1 1655 spin_unlock(&mdev->peer_seq_lock);
43ae077d 1656 if (old_peer_seq != peer_seq)
1657 wake_up(&mdev->seq_wait);
1658}
1659
1660/* Called from receive_Data.
1661 * Synchronize packets on sock with packets on msock.
1662 *
1663 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1664 * packet traveling on msock, they are still processed in the order they have
1665 * been sent.
1666 *
1667 * Note: we don't care for Ack packets overtaking P_DATA packets.
1668 *
1669 * In case packet_seq is larger than mdev->peer_seq number, there are
1670 * outstanding packets on the msock. We wait for them to arrive.
1671 * In case we are the logically next packet, we update mdev->peer_seq
1672 * ourselves. Correctly handles 32bit wrap around.
1673 *
1674 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1675 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1676 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1677 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1678 *
1679 * returns 0 if we may process the packet,
1680 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1681static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1682{
1683 DEFINE_WAIT(wait);
1684 unsigned int p_seq;
1685 long timeout;
1686 int ret = 0;
1687 spin_lock(&mdev->peer_seq_lock);
1688 for (;;) {
1689 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
3e394da1 1690 if (!seq_greater(packet_seq, mdev->peer_seq + 1))
1691 break;
1692 if (signal_pending(current)) {
1693 ret = -ERESTARTSYS;
1694 break;
1695 }
1696 p_seq = mdev->peer_seq;
1697 spin_unlock(&mdev->peer_seq_lock);
1698 timeout = schedule_timeout(30*HZ);
1699 spin_lock(&mdev->peer_seq_lock);
1700 if (timeout == 0 && p_seq == mdev->peer_seq) {
1701 ret = -ETIMEDOUT;
1702 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1703 break;
1704 }
1705 }
1706 finish_wait(&mdev->seq_wait, &wait);
1707 if (mdev->peer_seq+1 == packet_seq)
1708 mdev->peer_seq++;
1709 spin_unlock(&mdev->peer_seq_lock);
1710 return ret;
1711}
1712
1713/* see also bio_flags_to_wire()
1714 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1715 * flags and back. We may replicate to other kernel versions. */
1716static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1717{
1718 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1719 (dpf & DP_FUA ? REQ_FUA : 0) |
1720 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1721 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1722}
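/*
 * Example: a peer write submitted there with REQ_SYNC|REQ_FUA arrives
 * here with DP_RW_SYNC|DP_FUA set in p->dp_flags and is mapped back to
 * REQ_SYNC|REQ_FUA, so the local submission keeps the sender's ordering
 * and durability semantics.
 */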
1723
b411b363 1724/* mirrored write */
1725static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
1726 unsigned int data_size)
1727{
1728 sector_t sector;
db830c46 1729 struct drbd_peer_request *peer_req;
e42325a5 1730 struct p_data *p = &mdev->tconn->data.rbuf.data;
1731 int rw = WRITE;
1732 u32 dp_flags;
1733
b411b363 1734 if (!get_ldev(mdev)) {
b411b363
PR
1735 spin_lock(&mdev->peer_seq_lock);
1736 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1737 mdev->peer_seq++;
1738 spin_unlock(&mdev->peer_seq_lock);
1739
2b2bf214 1740 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363
PR
1741 atomic_inc(&mdev->current_epoch->epoch_size);
1742 return drbd_drain_block(mdev, data_size);
1743 }
1744
1745 /* get_ldev(mdev) successful.
1746 * Corresponding put_ldev done either below (on various errors),
9c50842a 1747 * or in drbd_endio_sec, if we successfully submit the data at
b411b363
PR
1748 * the end of this function. */
1749
1750 sector = be64_to_cpu(p->sector);
db830c46
AG
1751 peer_req = read_in_block(mdev, p->block_id, sector, data_size);
1752 if (!peer_req) {
b411b363 1753 put_ldev(mdev);
81e84650 1754 return false;
b411b363
PR
1755 }
1756
db830c46 1757 peer_req->w.cb = e_end_block;
b411b363 1758
688593c5
LE
1759 dp_flags = be32_to_cpu(p->dp_flags);
1760 rw |= wire_flags_to_bio(mdev, dp_flags);
1761
1762 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 1763 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 1764
b411b363 1765 spin_lock(&mdev->epoch_lock);
db830c46
AG
1766 peer_req->epoch = mdev->current_epoch;
1767 atomic_inc(&peer_req->epoch->epoch_size);
1768 atomic_inc(&peer_req->epoch->active);
b411b363
PR
1769 spin_unlock(&mdev->epoch_lock);
1770
b411b363 1771 /* I'm the receiver, I do hold a net_cnt reference. */
89e58e75 1772 if (!mdev->tconn->net_conf->two_primaries) {
87eeee41 1773 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
1774 } else {
1775 /* don't get the req_lock yet,
1776 * we may sleep in drbd_wait_peer_seq */
db830c46 1777 const int size = peer_req->i.size;
25703f83 1778 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363 1779 DEFINE_WAIT(wait);
b411b363
PR
1780 int first;
1781
89e58e75 1782 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
b411b363
PR
1783
1784 /* conflict detection and handling:
1785 * 1. wait on the sequence number,
1786 * in case this data packet overtook ACK packets.
5e472264 1787 * 2. check for conflicting write requests.
b411b363
PR
1788 *
1789 * Note: for two_primaries, we are protocol C,
1790 * so there cannot be any request that is DONE
1791 * but still on the transfer log.
1792 *
b411b363
PR
1793 * if no conflicting request is found:
1794 * submit.
1795 *
1796 * if any conflicting request is found
1797 * that has not yet been acked,
1798 * AND I have the "discard concurrent writes" flag:
1799 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1800 *
1801 * if any conflicting request is found:
1802 * block the receiver, waiting on misc_wait
1803 * until no more conflicting requests are there,
1804 * or we get interrupted (disconnect).
1805 *
1806 * we do not just write after local io completion of those
1807 * requests, but only after req is done completely, i.e.
1808 * we wait for the P_DISCARD_ACK to arrive!
1809 *
1810 * then proceed normally, i.e. submit.
1811 */
1812 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1813 goto out_interrupted;
1814
87eeee41 1815 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 1816
b411b363
PR
1817 first = 1;
1818 for (;;) {
de696716 1819 struct drbd_interval *i;
b411b363
PR
1820 int have_unacked = 0;
1821 int have_conflict = 0;
1822 prepare_to_wait(&mdev->misc_wait, &wait,
1823 TASK_INTERRUPTIBLE);
de696716
AG
1824
1825 i = drbd_find_overlap(&mdev->write_requests, sector, size);
1826 if (i) {
de696716
AG
1827 /* only ALERT on first iteration,
1828 * we may be woken up early... */
1829 if (first)
5e472264 1830 dev_alert(DEV, "%s[%u] Concurrent %s write detected!"
de696716
AG
1831 " new: %llus +%u; pending: %llus +%u\n",
1832 current->comm, current->pid,
5e472264 1833 i->local ? "local" : "remote",
de696716 1834 (unsigned long long)sector, size,
5e472264
AG
1835 (unsigned long long)i->sector, i->size);
1836
1837 if (i->local) {
1838 struct drbd_request *req2;
1839
1840 req2 = container_of(i, struct drbd_request, i);
1841 if (req2->rq_state & RQ_NET_PENDING)
1842 ++have_unacked;
1843 }
de696716 1844 ++have_conflict;
b411b363 1845 }
b411b363
PR
1846 if (!have_conflict)
1847 break;
1848
1849 /* Discard Ack only for the _first_ iteration */
1850 if (first && discard && have_unacked) {
1851 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1852 (unsigned long long)sector);
1853 inc_unacked(mdev);
db830c46
AG
1854 peer_req->w.cb = e_send_discard_ack;
1855 list_add_tail(&peer_req->w.list, &mdev->done_ee);
b411b363 1856
87eeee41 1857 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1858
1859 /* we could probably send that P_DISCARD_ACK ourselves,
1860 * but I don't like the receiver using the msock */
1861
1862 put_ldev(mdev);
0625ac19 1863 wake_asender(mdev->tconn);
b411b363 1864 finish_wait(&mdev->misc_wait, &wait);
81e84650 1865 return true;
b411b363
PR
1866 }
1867
1868 if (signal_pending(current)) {
87eeee41 1869 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1870 finish_wait(&mdev->misc_wait, &wait);
1871 goto out_interrupted;
1872 }
1873
a500c2ef 1874 /* Indicate to wake up mdev->misc_wait upon completion. */
53840641 1875 i->waiting = true;
a500c2ef 1876
87eeee41 1877 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1878 if (first) {
1879 first = 0;
1880 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1881 "sec=%llus\n", (unsigned long long)sector);
1882 } else if (discard) {
1883 /* we had none on the first iteration.
1884 * there must be none now. */
1885 D_ASSERT(have_unacked == 0);
1886 }
1887 schedule();
87eeee41 1888 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
1889 }
1890 finish_wait(&mdev->misc_wait, &wait);
5e472264 1891
db830c46 1892 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
b411b363
PR
1893 }
1894
db830c46 1895 list_add(&peer_req->w.list, &mdev->active_ee);
87eeee41 1896 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1897
89e58e75 1898 switch (mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
1899 case DRBD_PROT_C:
1900 inc_unacked(mdev);
1901 /* corresponding dec_unacked() in e_end_block()
1902 * respective _drbd_clear_done_ee */
1903 break;
1904 case DRBD_PROT_B:
1905 /* I really don't like it that the receiver thread
1906 * sends on the msock, but anyways */
db830c46 1907 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
b411b363
PR
1908 break;
1909 case DRBD_PROT_A:
1910 /* nothing to do */
1911 break;
1912 }
1913
6719fb03 1914 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 1915 /* In case we have the only disk of the cluster, */
db830c46
AG
1916 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
1917 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
1918 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
1919 drbd_al_begin_io(mdev, peer_req->i.sector);
b411b363
PR
1920 }
1921
db830c46 1922 if (drbd_submit_ee(mdev, peer_req, rw, DRBD_FAULT_DT_WR) == 0)
81e84650 1923 return true;
b411b363 1924
10f6d992
LE
1925 /* don't care for the reason here */
1926 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1927 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
1928 list_del(&peer_req->w.list);
1929 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 1930 spin_unlock_irq(&mdev->tconn->req_lock);
db830c46
AG
1931 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
1932 drbd_al_complete_io(mdev, peer_req->i.sector);
22cc37a9 1933
b411b363 1934out_interrupted:
db830c46 1935 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
b411b363 1936 put_ldev(mdev);
db830c46 1937 drbd_free_ee(mdev, peer_req);
81e84650 1938 return false;
b411b363
PR
1939}
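/*
 * Editorial sketch, not part of drbd: summary of the conflict handling
 * in receive_Data() above for a peer write that overlaps the interval
 * tree of local requests:
 *
 *	no overlap                                  -> submit right away
 *	overlap with an unacked local request and
 *	DISCARD_CONCURRENT set on this node         -> queue P_DISCARD_ACK via
 *	                                               done_ee, drop the data
 *	any other overlap                           -> sleep on misc_wait until
 *	                                               the conflicting request
 *	                                               completes, then submit
 */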
1940
0f0601f4
LE
1941/* We may throttle resync, if the lower device seems to be busy,
1942 * and current sync rate is above c_min_rate.
1943 *
1944 * To decide whether or not the lower device is busy, we use a scheme similar
 1945 * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
 1946 * amount (more than 64 sectors) of activity we cannot account for with our own resync
1947 * activity, it obviously is "busy".
1948 *
1949 * The current sync rate used here uses only the most recent two step marks,
1950 * to have a short time average so we can react faster.
1951 */
e3555d85 1952int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
1953{
1954 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1955 unsigned long db, dt, dbdt;
e3555d85 1956 struct lc_element *tmp;
0f0601f4
LE
1957 int curr_events;
1958 int throttle = 0;
1959
1960 /* feature disabled? */
1961 if (mdev->sync_conf.c_min_rate == 0)
1962 return 0;
1963
e3555d85
PR
1964 spin_lock_irq(&mdev->al_lock);
1965 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1966 if (tmp) {
1967 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1968 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1969 spin_unlock_irq(&mdev->al_lock);
1970 return 0;
1971 }
1972 /* Do not slow down if app IO is already waiting for this extent */
1973 }
1974 spin_unlock_irq(&mdev->al_lock);
1975
0f0601f4
LE
1976 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1977 (int)part_stat_read(&disk->part0, sectors[1]) -
1978 atomic_read(&mdev->rs_sect_ev);
e3555d85 1979
0f0601f4
LE
1980 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1981 unsigned long rs_left;
1982 int i;
1983
1984 mdev->rs_last_events = curr_events;
1985
1986 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1987 * approx. */
2649f080
LE
1988 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1989
1990 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1991 rs_left = mdev->ov_left;
1992 else
1993 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
1994
1995 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1996 if (!dt)
1997 dt++;
1998 db = mdev->rs_mark_left[i] - rs_left;
1999 dbdt = Bit2KB(db/dt);
2000
2001 if (dbdt > mdev->sync_conf.c_min_rate)
2002 throttle = 1;
2003 }
2004 return throttle;
2005}
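/*
 * Editorial sketch, not part of drbd: rough numbers for the throttle
 * decision above.  Suppose the sync mark used for averaging is 3 seconds
 * old and 6144 bitmap bits (4 KiB each) have been resynced since then:
 *
 *	dt   = 3
 *	db   = 6144
 *	dbdt = Bit2KB(6144 / 3) = 2048 * 4 = 8192 KiB/s
 *
 * With c_min_rate configured at, say, 4000 KiB/s (an assumed value),
 * dbdt > c_min_rate holds and resync requests are throttled.
 */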
2006
2007
d8763023
AG
2008static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
2009 unsigned int digest_size)
b411b363
PR
2010{
2011 sector_t sector;
2012 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 2013 struct drbd_peer_request *peer_req;
b411b363 2014 struct digest_info *di = NULL;
b18b37be 2015 int size, verb;
b411b363 2016 unsigned int fault_type;
e42325a5 2017 struct p_block_req *p = &mdev->tconn->data.rbuf.block_req;
b411b363
PR
2018
2019 sector = be64_to_cpu(p->sector);
2020 size = be32_to_cpu(p->blksize);
2021
1816a2b4 2022 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
2023 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2024 (unsigned long long)sector, size);
81e84650 2025 return false;
b411b363
PR
2026 }
2027 if (sector + (size>>9) > capacity) {
2028 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2029 (unsigned long long)sector, size);
81e84650 2030 return false;
b411b363
PR
2031 }
2032
2033 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be
PR
2034 verb = 1;
2035 switch (cmd) {
2036 case P_DATA_REQUEST:
2037 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2038 break;
2039 case P_RS_DATA_REQUEST:
2040 case P_CSUM_RS_REQUEST:
2041 case P_OV_REQUEST:
2042 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2043 break;
2044 case P_OV_REPLY:
2045 verb = 0;
2046 dec_rs_pending(mdev);
2047 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2048 break;
2049 default:
2050 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2051 cmdname(cmd));
2052 }
2053 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2054 dev_err(DEV, "Can not satisfy peer's read request, "
2055 "no local data.\n");
b18b37be 2056
a821cc4a
LE
 2057 /* drain the payload, if any */
2058 return drbd_drain_block(mdev, digest_size);
b411b363
PR
2059 }
2060
2061 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2062 * "criss-cross" setup, that might cause write-out on some other DRBD,
2063 * which in turn might block on the other node at this very place. */
db830c46
AG
2064 peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2065 if (!peer_req) {
b411b363 2066 put_ldev(mdev);
81e84650 2067 return false;
b411b363
PR
2068 }
2069
02918be2 2070 switch (cmd) {
b411b363 2071 case P_DATA_REQUEST:
db830c46 2072 peer_req->w.cb = w_e_end_data_req;
b411b363 2073 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2074 /* application IO, don't drbd_rs_begin_io */
2075 goto submit;
2076
b411b363 2077 case P_RS_DATA_REQUEST:
db830c46 2078 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2079 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2080 /* used in the sector offset progress display */
2081 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2082 break;
2083
2084 case P_OV_REPLY:
2085 case P_CSUM_RS_REQUEST:
2086 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2087 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2088 if (!di)
2089 goto out_free_e;
2090
2091 di->digest_size = digest_size;
2092 di->digest = (((char *)di)+sizeof(struct digest_info));
2093
db830c46
AG
2094 peer_req->digest = di;
2095 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2096
de0ff338 2097 if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)
b411b363
PR
2098 goto out_free_e;
2099
02918be2 2100 if (cmd == P_CSUM_RS_REQUEST) {
31890f4a 2101 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
db830c46 2102 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2103 /* used in the sector offset progress display */
2104 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
02918be2 2105 } else if (cmd == P_OV_REPLY) {
2649f080
LE
2106 /* track progress, we may need to throttle */
2107 atomic_add(size >> 9, &mdev->rs_sect_in);
db830c46 2108 peer_req->w.cb = w_e_end_ov_reply;
b411b363 2109 dec_rs_pending(mdev);
0f0601f4
LE
2110 /* drbd_rs_begin_io done when we sent this request,
2111 * but accounting still needs to be done. */
2112 goto submit_for_resync;
b411b363
PR
2113 }
2114 break;
2115
2116 case P_OV_REQUEST:
b411b363 2117 if (mdev->ov_start_sector == ~(sector_t)0 &&
31890f4a 2118 mdev->tconn->agreed_pro_version >= 90) {
de228bba
LE
2119 unsigned long now = jiffies;
2120 int i;
b411b363
PR
2121 mdev->ov_start_sector = sector;
2122 mdev->ov_position = sector;
30b743a2
LE
2123 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2124 mdev->rs_total = mdev->ov_left;
de228bba
LE
2125 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2126 mdev->rs_mark_left[i] = mdev->ov_left;
2127 mdev->rs_mark_time[i] = now;
2128 }
b411b363
PR
2129 dev_info(DEV, "Online Verify start sector: %llu\n",
2130 (unsigned long long)sector);
2131 }
db830c46 2132 peer_req->w.cb = w_e_end_ov_req;
b411b363 2133 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2134 break;
2135
b411b363
PR
2136 default:
2137 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
02918be2 2138 cmdname(cmd));
b411b363 2139 fault_type = DRBD_FAULT_MAX;
80a40e43 2140 goto out_free_e;
b411b363
PR
2141 }
2142
0f0601f4
LE
2143 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2144 * wrt the receiver, but it is not as straightforward as it may seem.
2145 * Various places in the resync start and stop logic assume resync
2146 * requests are processed in order, requeuing this on the worker thread
2147 * introduces a bunch of new code for synchronization between threads.
2148 *
2149 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2150 * "forever", throttling after drbd_rs_begin_io will lock that extent
2151 * for application writes for the same time. For now, just throttle
2152 * here, where the rest of the code expects the receiver to sleep for
2153 * a while, anyways.
2154 */
2155
2156 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2157 * this defers syncer requests for some time, before letting at least
 2158 * one request through. The resync controller on the receiving side
2159 * will adapt to the incoming rate accordingly.
2160 *
2161 * We cannot throttle here if remote is Primary/SyncTarget:
2162 * we would also throttle its application reads.
2163 * In that case, throttling is done on the SyncTarget only.
2164 */
e3555d85
PR
2165 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2166 schedule_timeout_uninterruptible(HZ/10);
2167 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2168 goto out_free_e;
b411b363 2169
0f0601f4
LE
2170submit_for_resync:
2171 atomic_add(size >> 9, &mdev->rs_sect_ev);
2172
80a40e43 2173submit:
b411b363 2174 inc_unacked(mdev);
87eeee41 2175 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2176 list_add_tail(&peer_req->w.list, &mdev->read_ee);
87eeee41 2177 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2178
db830c46 2179 if (drbd_submit_ee(mdev, peer_req, READ, fault_type) == 0)
81e84650 2180 return true;
b411b363 2181
10f6d992
LE
2182 /* don't care for the reason here */
2183 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2184 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2185 list_del(&peer_req->w.list);
87eeee41 2186 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9
LE
2187 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2188
b411b363 2189out_free_e:
b411b363 2190 put_ldev(mdev);
db830c46 2191 drbd_free_ee(mdev, peer_req);
81e84650 2192 return false;
b411b363
PR
2193}
2194
2195static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2196{
2197 int self, peer, rv = -100;
2198 unsigned long ch_self, ch_peer;
2199
2200 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2201 peer = mdev->p_uuid[UI_BITMAP] & 1;
2202
2203 ch_peer = mdev->p_uuid[UI_SIZE];
2204 ch_self = mdev->comm_bm_set;
2205
89e58e75 2206 switch (mdev->tconn->net_conf->after_sb_0p) {
b411b363
PR
2207 case ASB_CONSENSUS:
2208 case ASB_DISCARD_SECONDARY:
2209 case ASB_CALL_HELPER:
2210 dev_err(DEV, "Configuration error.\n");
2211 break;
2212 case ASB_DISCONNECT:
2213 break;
2214 case ASB_DISCARD_YOUNGER_PRI:
2215 if (self == 0 && peer == 1) {
2216 rv = -1;
2217 break;
2218 }
2219 if (self == 1 && peer == 0) {
2220 rv = 1;
2221 break;
2222 }
2223 /* Else fall through to one of the other strategies... */
2224 case ASB_DISCARD_OLDER_PRI:
2225 if (self == 0 && peer == 1) {
2226 rv = 1;
2227 break;
2228 }
2229 if (self == 1 && peer == 0) {
2230 rv = -1;
2231 break;
2232 }
2233 /* Else fall through to one of the other strategies... */
ad19bf6e 2234 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2235 "Using discard-least-changes instead\n");
2236 case ASB_DISCARD_ZERO_CHG:
2237 if (ch_peer == 0 && ch_self == 0) {
25703f83 2238 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2239 ? -1 : 1;
2240 break;
2241 } else {
2242 if (ch_peer == 0) { rv = 1; break; }
2243 if (ch_self == 0) { rv = -1; break; }
2244 }
89e58e75 2245 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2246 break;
2247 case ASB_DISCARD_LEAST_CHG:
2248 if (ch_self < ch_peer)
2249 rv = -1;
2250 else if (ch_self > ch_peer)
2251 rv = 1;
2252 else /* ( ch_self == ch_peer ) */
2253 /* Well, then use something else. */
25703f83 2254 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2255 ? -1 : 1;
2256 break;
2257 case ASB_DISCARD_LOCAL:
2258 rv = -1;
2259 break;
2260 case ASB_DISCARD_REMOTE:
2261 rv = 1;
2262 }
2263
2264 return rv;
2265}
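/*
 * Editorial sketch, not part of drbd: example for the discard-least-changes
 * strategy above.  With after-sb-0pri set accordingly and
 *
 *	ch_self = 12 out-of-sync blocks,  ch_peer = 300
 *
 * the function returns -1: we have the fewer changes, so they are discarded
 * and this node becomes SyncTarget.  A tie (ch_self == ch_peer) is broken by
 * the DISCARD_CONCURRENT flag, which only one side of the connection has set.
 */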
2266
2267static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2268{
6184ea21 2269 int hg, rv = -100;
b411b363 2270
89e58e75 2271 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2272 case ASB_DISCARD_YOUNGER_PRI:
2273 case ASB_DISCARD_OLDER_PRI:
2274 case ASB_DISCARD_LEAST_CHG:
2275 case ASB_DISCARD_LOCAL:
2276 case ASB_DISCARD_REMOTE:
2277 dev_err(DEV, "Configuration error.\n");
2278 break;
2279 case ASB_DISCONNECT:
2280 break;
2281 case ASB_CONSENSUS:
2282 hg = drbd_asb_recover_0p(mdev);
2283 if (hg == -1 && mdev->state.role == R_SECONDARY)
2284 rv = hg;
2285 if (hg == 1 && mdev->state.role == R_PRIMARY)
2286 rv = hg;
2287 break;
2288 case ASB_VIOLENTLY:
2289 rv = drbd_asb_recover_0p(mdev);
2290 break;
2291 case ASB_DISCARD_SECONDARY:
2292 return mdev->state.role == R_PRIMARY ? 1 : -1;
2293 case ASB_CALL_HELPER:
2294 hg = drbd_asb_recover_0p(mdev);
2295 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2296 enum drbd_state_rv rv2;
2297
2298 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2299 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2300 * we might be here in C_WF_REPORT_PARAMS which is transient.
2301 * we do not need to wait for the after state change work either. */
bb437946
AG
2302 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2303 if (rv2 != SS_SUCCESS) {
b411b363
PR
2304 drbd_khelper(mdev, "pri-lost-after-sb");
2305 } else {
2306 dev_warn(DEV, "Successfully gave up primary role.\n");
2307 rv = hg;
2308 }
2309 } else
2310 rv = hg;
2311 }
2312
2313 return rv;
2314}
2315
2316static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2317{
6184ea21 2318 int hg, rv = -100;
b411b363 2319
89e58e75 2320 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2321 case ASB_DISCARD_YOUNGER_PRI:
2322 case ASB_DISCARD_OLDER_PRI:
2323 case ASB_DISCARD_LEAST_CHG:
2324 case ASB_DISCARD_LOCAL:
2325 case ASB_DISCARD_REMOTE:
2326 case ASB_CONSENSUS:
2327 case ASB_DISCARD_SECONDARY:
2328 dev_err(DEV, "Configuration error.\n");
2329 break;
2330 case ASB_VIOLENTLY:
2331 rv = drbd_asb_recover_0p(mdev);
2332 break;
2333 case ASB_DISCONNECT:
2334 break;
2335 case ASB_CALL_HELPER:
2336 hg = drbd_asb_recover_0p(mdev);
2337 if (hg == -1) {
bb437946
AG
2338 enum drbd_state_rv rv2;
2339
b411b363
PR
2340 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2341 * we might be here in C_WF_REPORT_PARAMS which is transient.
2342 * we do not need to wait for the after state change work either. */
bb437946
AG
2343 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2344 if (rv2 != SS_SUCCESS) {
b411b363
PR
2345 drbd_khelper(mdev, "pri-lost-after-sb");
2346 } else {
2347 dev_warn(DEV, "Successfully gave up primary role.\n");
2348 rv = hg;
2349 }
2350 } else
2351 rv = hg;
2352 }
2353
2354 return rv;
2355}
2356
2357static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2358 u64 bits, u64 flags)
2359{
2360 if (!uuid) {
2361 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2362 return;
2363 }
2364 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2365 text,
2366 (unsigned long long)uuid[UI_CURRENT],
2367 (unsigned long long)uuid[UI_BITMAP],
2368 (unsigned long long)uuid[UI_HISTORY_START],
2369 (unsigned long long)uuid[UI_HISTORY_END],
2370 (unsigned long long)bits,
2371 (unsigned long long)flags);
2372}
2373
2374/*
2375 100 after split brain try auto recover
2376 2 C_SYNC_SOURCE set BitMap
2377 1 C_SYNC_SOURCE use BitMap
2378 0 no Sync
2379 -1 C_SYNC_TARGET use BitMap
2380 -2 C_SYNC_TARGET set BitMap
2381 -100 after split brain, disconnect
2382-1000 unrelated data
4a23f264
PR
2383-1091 requires proto 91
2384-1096 requires proto 96
b411b363
PR
2385 */
2386static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2387{
2388 u64 self, peer;
2389 int i, j;
2390
2391 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2392 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2393
2394 *rule_nr = 10;
2395 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2396 return 0;
2397
2398 *rule_nr = 20;
2399 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2400 peer != UUID_JUST_CREATED)
2401 return -2;
2402
2403 *rule_nr = 30;
2404 if (self != UUID_JUST_CREATED &&
2405 (peer == UUID_JUST_CREATED || peer == (u64)0))
2406 return 2;
2407
2408 if (self == peer) {
2409 int rct, dc; /* roles at crash time */
2410
2411 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2412
31890f4a 2413 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2414 return -1091;
b411b363
PR
2415
2416 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2417 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2418 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2419 drbd_uuid_set_bm(mdev, 0UL);
2420
2421 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2422 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2423 *rule_nr = 34;
2424 } else {
2425 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2426 *rule_nr = 36;
2427 }
2428
2429 return 1;
2430 }
2431
2432 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2433
31890f4a 2434 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2435 return -1091;
b411b363
PR
2436
2437 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2438 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2439 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2440
2441 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2442 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2443 mdev->p_uuid[UI_BITMAP] = 0UL;
2444
2445 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2446 *rule_nr = 35;
2447 } else {
2448 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2449 *rule_nr = 37;
2450 }
2451
2452 return -1;
2453 }
2454
2455 /* Common power [off|failure] */
2456 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2457 (mdev->p_uuid[UI_FLAGS] & 2);
2458 /* lowest bit is set when we were primary,
2459 * next bit (weight 2) is set when peer was primary */
2460 *rule_nr = 40;
2461
2462 switch (rct) {
2463 case 0: /* !self_pri && !peer_pri */ return 0;
2464 case 1: /* self_pri && !peer_pri */ return 1;
2465 case 2: /* !self_pri && peer_pri */ return -1;
2466 case 3: /* self_pri && peer_pri */
25703f83 2467 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363
PR
2468 return dc ? -1 : 1;
2469 }
2470 }
2471
2472 *rule_nr = 50;
2473 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2474 if (self == peer)
2475 return -1;
2476
2477 *rule_nr = 51;
2478 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2479 if (self == peer) {
31890f4a 2480 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2481 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2482 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2483 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2484 /* The last P_SYNC_UUID did not get through. Undo the last start of
2485 resync as sync source modifications of the peer's UUIDs. */
2486
31890f4a 2487 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2488 return -1091;
b411b363
PR
2489
2490 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2491 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264
PR
2492
 2493 dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2494 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2495
b411b363
PR
2496 return -1;
2497 }
2498 }
2499
2500 *rule_nr = 60;
2501 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2502 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2503 peer = mdev->p_uuid[i] & ~((u64)1);
2504 if (self == peer)
2505 return -2;
2506 }
2507
2508 *rule_nr = 70;
2509 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2510 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2511 if (self == peer)
2512 return 1;
2513
2514 *rule_nr = 71;
2515 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2516 if (self == peer) {
31890f4a 2517 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2518 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2519 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2520 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2521 /* The last P_SYNC_UUID did not get through. Undo the last start of
2522 resync as sync source modifications of our UUIDs. */
2523
31890f4a 2524 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2525 return -1091;
b411b363
PR
2526
2527 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2528 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2529
4a23f264 2530 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2531 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2532 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2533
2534 return 1;
2535 }
2536 }
2537
2538
2539 *rule_nr = 80;
d8c2a36b 2540 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2541 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2542 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2543 if (self == peer)
2544 return 2;
2545 }
2546
2547 *rule_nr = 90;
2548 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2549 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2550 if (self == peer && self != ((u64)0))
2551 return 100;
2552
2553 *rule_nr = 100;
2554 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2555 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2556 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2557 peer = mdev->p_uuid[j] & ~((u64)1);
2558 if (self == peer)
2559 return -100;
2560 }
2561 }
2562
2563 return -1000;
2564}
2565
2566/* drbd_sync_handshake() returns the new conn state on success, or
 2567 C_MASK on failure.
2568 */
2569static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2570 enum drbd_disk_state peer_disk) __must_hold(local)
2571{
2572 int hg, rule_nr;
2573 enum drbd_conns rv = C_MASK;
2574 enum drbd_disk_state mydisk;
2575
2576 mydisk = mdev->state.disk;
2577 if (mydisk == D_NEGOTIATING)
2578 mydisk = mdev->new_state_tmp.disk;
2579
2580 dev_info(DEV, "drbd_sync_handshake:\n");
2581 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2582 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2583 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2584
2585 hg = drbd_uuid_compare(mdev, &rule_nr);
2586
2587 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2588
2589 if (hg == -1000) {
2590 dev_alert(DEV, "Unrelated data, aborting!\n");
2591 return C_MASK;
2592 }
4a23f264
PR
2593 if (hg < -1000) {
2594 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2595 return C_MASK;
2596 }
2597
2598 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2599 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2600 int f = (hg == -100) || abs(hg) == 2;
2601 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2602 if (f)
2603 hg = hg*2;
2604 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2605 hg > 0 ? "source" : "target");
2606 }
2607
3a11a487
AG
2608 if (abs(hg) == 100)
2609 drbd_khelper(mdev, "initial-split-brain");
2610
89e58e75 2611 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
b411b363
PR
2612 int pcount = (mdev->state.role == R_PRIMARY)
2613 + (peer_role == R_PRIMARY);
2614 int forced = (hg == -100);
2615
2616 switch (pcount) {
2617 case 0:
2618 hg = drbd_asb_recover_0p(mdev);
2619 break;
2620 case 1:
2621 hg = drbd_asb_recover_1p(mdev);
2622 break;
2623 case 2:
2624 hg = drbd_asb_recover_2p(mdev);
2625 break;
2626 }
2627 if (abs(hg) < 100) {
2628 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2629 "automatically solved. Sync from %s node\n",
2630 pcount, (hg < 0) ? "peer" : "this");
2631 if (forced) {
2632 dev_warn(DEV, "Doing a full sync, since"
2633 " UUIDs where ambiguous.\n");
2634 hg = hg*2;
2635 }
2636 }
2637 }
2638
2639 if (hg == -100) {
89e58e75 2640 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
b411b363 2641 hg = -1;
89e58e75 2642 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
b411b363
PR
2643 hg = 1;
2644
2645 if (abs(hg) < 100)
2646 dev_warn(DEV, "Split-Brain detected, manually solved. "
2647 "Sync from %s node\n",
2648 (hg < 0) ? "peer" : "this");
2649 }
2650
2651 if (hg == -100) {
580b9767
LE
2652 /* FIXME this log message is not correct if we end up here
2653 * after an attempted attach on a diskless node.
2654 * We just refuse to attach -- well, we drop the "connection"
2655 * to that disk, in a way... */
3a11a487 2656 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2657 drbd_khelper(mdev, "split-brain");
2658 return C_MASK;
2659 }
2660
2661 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2662 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2663 return C_MASK;
2664 }
2665
2666 if (hg < 0 && /* by intention we do not use mydisk here. */
2667 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
89e58e75 2668 switch (mdev->tconn->net_conf->rr_conflict) {
b411b363
PR
2669 case ASB_CALL_HELPER:
2670 drbd_khelper(mdev, "pri-lost");
2671 /* fall through */
2672 case ASB_DISCONNECT:
2673 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2674 return C_MASK;
2675 case ASB_VIOLENTLY:
2676 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2677 "assumption\n");
2678 }
2679 }
2680
89e58e75 2681 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
cf14c2e9
PR
2682 if (hg == 0)
2683 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2684 else
2685 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2686 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2687 abs(hg) >= 2 ? "full" : "bit-map based");
2688 return C_MASK;
2689 }
2690
b411b363
PR
2691 if (abs(hg) >= 2) {
2692 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2693 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2694 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2695 return C_MASK;
2696 }
2697
2698 if (hg > 0) { /* become sync source. */
2699 rv = C_WF_BITMAP_S;
2700 } else if (hg < 0) { /* become sync target */
2701 rv = C_WF_BITMAP_T;
2702 } else {
2703 rv = C_CONNECTED;
2704 if (drbd_bm_total_weight(mdev)) {
2705 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2706 drbd_bm_total_weight(mdev));
2707 }
2708 }
2709
2710 return rv;
2711}
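/*
 * Editorial sketch, not part of drbd: how the handshake result hg maps to
 * the returned connection state above:
 *
 *	hg >  0   -> C_WF_BITMAP_S   (this node becomes SyncSource)
 *	hg <  0   -> C_WF_BITMAP_T   (this node becomes SyncTarget)
 *	hg == 0   -> C_CONNECTED     (no resync)
 *
 * and |hg| >= 2 additionally forces a full sync by setting the whole bitmap
 * before the bitmap exchange starts.
 */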
2712
2713/* returns 1 if invalid */
2714static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2715{
2716 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2717 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2718 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2719 return 0;
2720
2721 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2722 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2723 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2724 return 1;
2725
2726 /* everything else is valid if they are equal on both sides. */
2727 if (peer == self)
2728 return 0;
2729
 2730 /* everything else is invalid. */
2731 return 1;
2732}
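/*
 * Editorial sketch, not part of drbd: examples for cmp_after_sb():
 *
 *	peer = ASB_DISCARD_REMOTE, self = ASB_DISCARD_LOCAL   -> 0 (valid pair)
 *	peer = ASB_DISCARD_REMOTE, self = ASB_DISCARD_REMOTE  -> 1 (both sides
 *						would discard the same node)
 *	peer = ASB_DISCONNECT,     self = ASB_DISCONNECT      -> 0 (equal)
 *	peer = ASB_DISCONNECT,     self = ASB_CALL_HELPER     -> 1 (mismatch)
 */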
2733
d8763023
AG
2734static int receive_protocol(struct drbd_conf *mdev, enum drbd_packet cmd,
2735 unsigned int data_size)
b411b363 2736{
e42325a5 2737 struct p_protocol *p = &mdev->tconn->data.rbuf.protocol;
b411b363 2738 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2739 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2740 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2741
b411b363
PR
2742 p_proto = be32_to_cpu(p->protocol);
2743 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2744 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2745 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2746 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2747 cf = be32_to_cpu(p->conn_flags);
2748 p_want_lose = cf & CF_WANT_LOSE;
2749
2750 clear_bit(CONN_DRY_RUN, &mdev->flags);
2751
2752 if (cf & CF_DRY_RUN)
2753 set_bit(CONN_DRY_RUN, &mdev->flags);
b411b363 2754
89e58e75 2755 if (p_proto != mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
2756 dev_err(DEV, "incompatible communication protocols\n");
2757 goto disconnect;
2758 }
2759
89e58e75 2760 if (cmp_after_sb(p_after_sb_0p, mdev->tconn->net_conf->after_sb_0p)) {
b411b363
PR
2761 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2762 goto disconnect;
2763 }
2764
89e58e75 2765 if (cmp_after_sb(p_after_sb_1p, mdev->tconn->net_conf->after_sb_1p)) {
b411b363
PR
2766 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2767 goto disconnect;
2768 }
2769
89e58e75 2770 if (cmp_after_sb(p_after_sb_2p, mdev->tconn->net_conf->after_sb_2p)) {
b411b363
PR
2771 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2772 goto disconnect;
2773 }
2774
89e58e75 2775 if (p_want_lose && mdev->tconn->net_conf->want_lose) {
b411b363
PR
2776 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2777 goto disconnect;
2778 }
2779
89e58e75 2780 if (p_two_primaries != mdev->tconn->net_conf->two_primaries) {
b411b363
PR
2781 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2782 goto disconnect;
2783 }
2784
31890f4a 2785 if (mdev->tconn->agreed_pro_version >= 87) {
89e58e75 2786 unsigned char *my_alg = mdev->tconn->net_conf->integrity_alg;
b411b363 2787
de0ff338 2788 if (drbd_recv(mdev->tconn, p_integrity_alg, data_size) != data_size)
81e84650 2789 return false;
b411b363
PR
2790
2791 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2792 if (strcmp(p_integrity_alg, my_alg)) {
2793 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2794 goto disconnect;
2795 }
2796 dev_info(DEV, "data-integrity-alg: %s\n",
2797 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2798 }
2799
81e84650 2800 return true;
b411b363
PR
2801
2802disconnect:
2803 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 2804 return false;
b411b363
PR
2805}
2806
2807/* helper function
2808 * input: alg name, feature name
2809 * return: NULL (alg name was "")
2810 * ERR_PTR(error) if something goes wrong
2811 * or the crypto hash ptr, if it worked out ok. */
2812struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2813 const char *alg, const char *name)
2814{
2815 struct crypto_hash *tfm;
2816
2817 if (!alg[0])
2818 return NULL;
2819
2820 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2821 if (IS_ERR(tfm)) {
2822 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2823 alg, name, PTR_ERR(tfm));
2824 return tfm;
2825 }
2826 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2827 crypto_free_hash(tfm);
2828 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2829 return ERR_PTR(-EINVAL);
2830 }
2831 return tfm;
2832}
2833
d8763023
AG
2834static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
2835 unsigned int packet_size)
b411b363 2836{
81e84650 2837 int ok = true;
e42325a5 2838 struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
b411b363
PR
2839 unsigned int header_size, data_size, exp_max_sz;
2840 struct crypto_hash *verify_tfm = NULL;
2841 struct crypto_hash *csums_tfm = NULL;
31890f4a 2842 const int apv = mdev->tconn->agreed_pro_version;
778f271d
PR
2843 int *rs_plan_s = NULL;
2844 int fifo_size = 0;
b411b363
PR
2845
2846 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2847 : apv == 88 ? sizeof(struct p_rs_param)
2848 + SHARED_SECRET_MAX
8e26f9cc
PR
2849 : apv <= 94 ? sizeof(struct p_rs_param_89)
2850 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 2851
02918be2 2852 if (packet_size > exp_max_sz) {
b411b363 2853 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
02918be2 2854 packet_size, exp_max_sz);
81e84650 2855 return false;
b411b363
PR
2856 }
2857
2858 if (apv <= 88) {
257d0af6 2859 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
02918be2 2860 data_size = packet_size - header_size;
8e26f9cc 2861 } else if (apv <= 94) {
257d0af6 2862 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
02918be2 2863 data_size = packet_size - header_size;
b411b363 2864 D_ASSERT(data_size == 0);
8e26f9cc 2865 } else {
257d0af6 2866 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
02918be2 2867 data_size = packet_size - header_size;
b411b363
PR
2868 D_ASSERT(data_size == 0);
2869 }
2870
2871 /* initialize verify_alg and csums_alg */
2872 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2873
de0ff338 2874 if (drbd_recv(mdev->tconn, &p->head.payload, header_size) != header_size)
81e84650 2875 return false;
b411b363
PR
2876
2877 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2878
2879 if (apv >= 88) {
2880 if (apv == 88) {
2881 if (data_size > SHARED_SECRET_MAX) {
2882 dev_err(DEV, "verify-alg too long, "
2883 "peer wants %u, accepting only %u byte\n",
2884 data_size, SHARED_SECRET_MAX);
81e84650 2885 return false;
b411b363
PR
2886 }
2887
de0ff338 2888 if (drbd_recv(mdev->tconn, p->verify_alg, data_size) != data_size)
81e84650 2889 return false;
b411b363
PR
2890
2891 /* we expect NUL terminated string */
2892 /* but just in case someone tries to be evil */
2893 D_ASSERT(p->verify_alg[data_size-1] == 0);
2894 p->verify_alg[data_size-1] = 0;
2895
2896 } else /* apv >= 89 */ {
2897 /* we still expect NUL terminated strings */
2898 /* but just in case someone tries to be evil */
2899 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2900 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2901 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2902 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2903 }
2904
2905 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2906 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2907 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2908 mdev->sync_conf.verify_alg, p->verify_alg);
2909 goto disconnect;
2910 }
2911 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2912 p->verify_alg, "verify-alg");
2913 if (IS_ERR(verify_tfm)) {
2914 verify_tfm = NULL;
2915 goto disconnect;
2916 }
2917 }
2918
2919 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2920 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2921 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2922 mdev->sync_conf.csums_alg, p->csums_alg);
2923 goto disconnect;
2924 }
2925 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2926 p->csums_alg, "csums-alg");
2927 if (IS_ERR(csums_tfm)) {
2928 csums_tfm = NULL;
2929 goto disconnect;
2930 }
2931 }
2932
8e26f9cc
PR
2933 if (apv > 94) {
2934 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2935 mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2936 mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2937 mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2938 mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d
PR
2939
2940 fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2941 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2942 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2943 if (!rs_plan_s) {
 2944 dev_err(DEV, "kzalloc of fifo_buffer failed");
2945 goto disconnect;
2946 }
2947 }
8e26f9cc 2948 }
b411b363
PR
2949
2950 spin_lock(&mdev->peer_seq_lock);
2951 /* lock against drbd_nl_syncer_conf() */
2952 if (verify_tfm) {
2953 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2954 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2955 crypto_free_hash(mdev->verify_tfm);
2956 mdev->verify_tfm = verify_tfm;
2957 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2958 }
2959 if (csums_tfm) {
2960 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2961 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2962 crypto_free_hash(mdev->csums_tfm);
2963 mdev->csums_tfm = csums_tfm;
2964 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2965 }
778f271d
PR
2966 if (fifo_size != mdev->rs_plan_s.size) {
2967 kfree(mdev->rs_plan_s.values);
2968 mdev->rs_plan_s.values = rs_plan_s;
2969 mdev->rs_plan_s.size = fifo_size;
2970 mdev->rs_planed = 0;
2971 }
b411b363
PR
2972 spin_unlock(&mdev->peer_seq_lock);
2973 }
2974
2975 return ok;
2976disconnect:
2977 /* just for completeness: actually not needed,
2978 * as this is not reached if csums_tfm was ok. */
2979 crypto_free_hash(csums_tfm);
2980 /* but free the verify_tfm again, if csums_tfm did not work out */
2981 crypto_free_hash(verify_tfm);
2982 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 2983 return false;
b411b363
PR
2984}
2985
b411b363
PR
2986/* warn if the arguments differ by more than 12.5% */
2987static void warn_if_differ_considerably(struct drbd_conf *mdev,
2988 const char *s, sector_t a, sector_t b)
2989{
2990 sector_t d;
2991 if (a == 0 || b == 0)
2992 return;
2993 d = (a > b) ? (a - b) : (b - a);
2994 if (d > (a>>3) || d > (b>>3))
2995 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2996 (unsigned long long)a, (unsigned long long)b);
2997}
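/*
 * Editorial sketch, not part of drbd: the check above warns when the
 * difference exceeds one eighth (12.5%) of either value, e.g.
 *
 *	a = 1000, b = 900:  d = 100, not greater than a>>3 (125) or b>>3 (112)
 *	                    -> no warning
 *	a = 1000, b = 800:  d = 200 > 125  -> warning
 */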
2998
d8763023
AG
2999static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
3000 unsigned int data_size)
b411b363 3001{
e42325a5 3002 struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
b411b363 3003 enum determine_dev_size dd = unchanged;
b411b363
PR
3004 sector_t p_size, p_usize, my_usize;
3005 int ldsc = 0; /* local disk size changed */
e89b591c 3006 enum dds_flags ddsf;
b411b363 3007
b411b363
PR
3008 p_size = be64_to_cpu(p->d_size);
3009 p_usize = be64_to_cpu(p->u_size);
3010
3011 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
3012 dev_err(DEV, "some backing storage is needed\n");
3013 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3014 return false;
b411b363
PR
3015 }
3016
3017 /* just store the peer's disk size for now.
3018 * we still need to figure out whether we accept that. */
3019 mdev->p_size = p_size;
3020
b411b363
PR
3021 if (get_ldev(mdev)) {
3022 warn_if_differ_considerably(mdev, "lower level device sizes",
3023 p_size, drbd_get_max_capacity(mdev->ldev));
3024 warn_if_differ_considerably(mdev, "user requested size",
3025 p_usize, mdev->ldev->dc.disk_size);
3026
3027 /* if this is the first connect, or an otherwise expected
3028 * param exchange, choose the minimum */
3029 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3030 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3031 p_usize);
3032
3033 my_usize = mdev->ldev->dc.disk_size;
3034
3035 if (mdev->ldev->dc.disk_size != p_usize) {
3036 mdev->ldev->dc.disk_size = p_usize;
3037 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3038 (unsigned long)mdev->ldev->dc.disk_size);
3039 }
3040
3041 /* Never shrink a device with usable data during connect.
3042 But allow online shrinking if we are connected. */
a393db6f 3043 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
3044 drbd_get_capacity(mdev->this_bdev) &&
3045 mdev->state.disk >= D_OUTDATED &&
3046 mdev->state.conn < C_CONNECTED) {
3047 dev_err(DEV, "The peer's disk size is too small!\n");
3048 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3049 mdev->ldev->dc.disk_size = my_usize;
3050 put_ldev(mdev);
81e84650 3051 return false;
b411b363
PR
3052 }
3053 put_ldev(mdev);
3054 }
b411b363 3055
e89b591c 3056 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3057 if (get_ldev(mdev)) {
24c4830c 3058 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3059 put_ldev(mdev);
3060 if (dd == dev_size_error)
81e84650 3061 return false;
b411b363
PR
3062 drbd_md_sync(mdev);
3063 } else {
3064 /* I am diskless, need to accept the peer's size. */
3065 drbd_set_my_capacity(mdev, p_size);
3066 }
3067
99432fcc
PR
3068 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3069 drbd_reconsider_max_bio_size(mdev);
3070
b411b363
PR
3071 if (get_ldev(mdev)) {
3072 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3073 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3074 ldsc = 1;
3075 }
3076
b411b363
PR
3077 put_ldev(mdev);
3078 }
3079
3080 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3081 if (be64_to_cpu(p->c_size) !=
3082 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3083 /* we have different sizes, probably peer
3084 * needs to know my new size... */
e89b591c 3085 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3086 }
3087 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3088 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3089 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3090 mdev->state.disk >= D_INCONSISTENT) {
3091 if (ddsf & DDSF_NO_RESYNC)
3092 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3093 else
3094 resync_after_online_grow(mdev);
3095 } else
b411b363
PR
3096 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3097 }
3098 }
3099
81e84650 3100 return true;
b411b363
PR
3101}
3102
d8763023
AG
3103static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3104 unsigned int data_size)
b411b363 3105{
e42325a5 3106 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
b411b363 3107 u64 *p_uuid;
62b0da3a 3108 int i, updated_uuids = 0;
b411b363 3109
b411b363
PR
3110 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3111
3112 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3113 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3114
3115 kfree(mdev->p_uuid);
3116 mdev->p_uuid = p_uuid;
3117
3118 if (mdev->state.conn < C_CONNECTED &&
3119 mdev->state.disk < D_INCONSISTENT &&
3120 mdev->state.role == R_PRIMARY &&
3121 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3122 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3123 (unsigned long long)mdev->ed_uuid);
3124 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3125 return false;
b411b363
PR
3126 }
3127
3128 if (get_ldev(mdev)) {
3129 int skip_initial_sync =
3130 mdev->state.conn == C_CONNECTED &&
31890f4a 3131 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3132 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3133 (p_uuid[UI_FLAGS] & 8);
3134 if (skip_initial_sync) {
3135 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3136 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3137 "clear_n_write from receive_uuids",
3138 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3139 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3140 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3141 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3142 CS_VERBOSE, NULL);
3143 drbd_md_sync(mdev);
62b0da3a 3144 updated_uuids = 1;
b411b363
PR
3145 }
3146 put_ldev(mdev);
18a50fa2
PR
3147 } else if (mdev->state.disk < D_INCONSISTENT &&
3148 mdev->state.role == R_PRIMARY) {
3149 /* I am a diskless primary, the peer just created a new current UUID
3150 for me. */
62b0da3a 3151 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3152 }
3153
 3154 /* Before we test for the disk state, we should wait until a possibly
 3155 ongoing cluster-wide state change is finished. That is important if
3156 we are primary and are detaching from our disk. We need to see the
3157 new disk state... */
3158 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3159 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3160 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3161
3162 if (updated_uuids)
3163 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3164
81e84650 3165 return true;
b411b363
PR
3166}
3167
3168/**
3169 * convert_state() - Converts the peer's view of the cluster state to our point of view
3170 * @ps: The state as seen by the peer.
3171 */
3172static union drbd_state convert_state(union drbd_state ps)
3173{
3174 union drbd_state ms;
3175
3176 static enum drbd_conns c_tab[] = {
3177 [C_CONNECTED] = C_CONNECTED,
3178
3179 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3180 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3181 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3182 [C_VERIFY_S] = C_VERIFY_T,
3183 [C_MASK] = C_MASK,
3184 };
3185
3186 ms.i = ps.i;
3187
3188 ms.conn = c_tab[ps.conn];
3189 ms.peer = ps.role;
3190 ms.role = ps.peer;
3191 ms.pdsk = ps.disk;
3192 ms.disk = ps.pdsk;
3193 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3194
3195 return ms;
3196}
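/*
 * Editorial sketch, not part of drbd: convert_state() mirrors the peer's
 * view of the cluster.  If the peer reports
 *
 *	role = R_PRIMARY,    peer = R_SECONDARY,
 *	disk = D_UP_TO_DATE, pdsk = D_INCONSISTENT,
 *	conn = C_STARTING_SYNC_S
 *
 * then, seen from this node, that is
 *
 *	role = R_SECONDARY,    peer = R_PRIMARY,
 *	disk = D_INCONSISTENT, pdsk = D_UP_TO_DATE,
 *	conn = C_STARTING_SYNC_T
 */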
3197
d8763023
AG
3198static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3199 unsigned int data_size)
b411b363 3200{
e42325a5 3201 struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
b411b363 3202 union drbd_state mask, val;
bf885f8a 3203 enum drbd_state_rv rv;
b411b363 3204
b411b363
PR
3205 mask.i = be32_to_cpu(p->mask);
3206 val.i = be32_to_cpu(p->val);
3207
25703f83 3208 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
b411b363
PR
3209 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3210 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
81e84650 3211 return true;
b411b363
PR
3212 }
3213
3214 mask = convert_state(mask);
3215 val = convert_state(val);
3216
3217 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3218
3219 drbd_send_sr_reply(mdev, rv);
3220 drbd_md_sync(mdev);
3221
81e84650 3222 return true;
b411b363
PR
3223}
3224
d8763023
AG
3225static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3226 unsigned int data_size)
b411b363 3227{
e42325a5 3228 struct p_state *p = &mdev->tconn->data.rbuf.state;
4ac4aada 3229 union drbd_state os, ns, peer_state;
b411b363 3230 enum drbd_disk_state real_peer_disk;
65d922c3 3231 enum chg_state_flags cs_flags;
b411b363
PR
3232 int rv;
3233
b411b363
PR
3234 peer_state.i = be32_to_cpu(p->state);
3235
3236 real_peer_disk = peer_state.disk;
3237 if (peer_state.disk == D_NEGOTIATING) {
3238 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3239 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3240 }
3241
87eeee41 3242 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 3243 retry:
4ac4aada 3244 os = ns = mdev->state;
87eeee41 3245 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 3246
e9ef7bb6
LE
3247 /* peer says his disk is uptodate, while we think it is inconsistent,
3248 * and this happens while we think we have a sync going on. */
3249 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3250 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3251 /* If we are (becoming) SyncSource, but peer is still in sync
 3252 * preparation, ignore its uptodate-ness to avoid flapping; it
3253 * will change to inconsistent once the peer reaches active
3254 * syncing states.
3255 * It may have changed syncer-paused flags, however, so we
3256 * cannot ignore this completely. */
3257 if (peer_state.conn > C_CONNECTED &&
3258 peer_state.conn < C_SYNC_SOURCE)
3259 real_peer_disk = D_INCONSISTENT;
3260
3261 /* if peer_state changes to connected at the same time,
3262 * it explicitly notifies us that it finished resync.
3263 * Maybe we should finish it up, too? */
3264 else if (os.conn >= C_SYNC_SOURCE &&
3265 peer_state.conn == C_CONNECTED) {
3266 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3267 drbd_resync_finished(mdev);
81e84650 3268 return true;
e9ef7bb6
LE
3269 }
3270 }
3271
3272 /* peer says his disk is inconsistent, while we think it is uptodate,
3273 * and this happens while the peer still thinks we have a sync going on,
3274 * but we think we are already done with the sync.
3275 * We ignore this to avoid flapping pdsk.
3276 * This should not happen, if the peer is a recent version of drbd. */
3277 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3278 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3279 real_peer_disk = D_UP_TO_DATE;
3280
4ac4aada
LE
3281 if (ns.conn == C_WF_REPORT_PARAMS)
3282 ns.conn = C_CONNECTED;
b411b363 3283
67531718
PR
3284 if (peer_state.conn == C_AHEAD)
3285 ns.conn = C_BEHIND;
3286
b411b363
PR
3287 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3288 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3289 int cr; /* consider resync */
3290
3291 /* if we established a new connection */
4ac4aada 3292 cr = (os.conn < C_CONNECTED);
b411b363
PR
3293 /* if we had an established connection
3294 * and one of the nodes newly attaches a disk */
4ac4aada 3295 cr |= (os.conn == C_CONNECTED &&
b411b363 3296 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3297 os.disk == D_NEGOTIATING));
b411b363
PR
3298 /* if we have both been inconsistent, and the peer has been
3299 * forced to be UpToDate with --overwrite-data */
3300 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3301 /* if we had been plain connected, and the admin requested to
3302 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3303 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3304 (peer_state.conn >= C_STARTING_SYNC_S &&
3305 peer_state.conn <= C_WF_BITMAP_T));
3306
3307 if (cr)
4ac4aada 3308 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3309
3310 put_ldev(mdev);
4ac4aada
LE
3311 if (ns.conn == C_MASK) {
3312 ns.conn = C_CONNECTED;
b411b363 3313 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3314 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3315 } else if (peer_state.disk == D_NEGOTIATING) {
3316 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3317 peer_state.disk = D_DISKLESS;
580b9767 3318 real_peer_disk = D_DISKLESS;
b411b363 3319 } else {
cf14c2e9 3320 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
81e84650 3321 return false;
4ac4aada 3322 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
b411b363 3323 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3324 return false;
b411b363
PR
3325 }
3326 }
3327 }
3328
87eeee41 3329 spin_lock_irq(&mdev->tconn->req_lock);
4ac4aada 3330 if (mdev->state.i != os.i)
b411b363
PR
3331 goto retry;
3332 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3333 ns.peer = peer_state.role;
3334 ns.pdsk = real_peer_disk;
3335 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3336 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3337 ns.disk = mdev->new_state_tmp.disk;
4ac4aada
LE
3338 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3339 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50 3340 test_bit(NEW_CUR_UUID, &mdev->flags)) {
8554df1c 3341 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3342 for temporary network outages! */
87eeee41 3343 spin_unlock_irq(&mdev->tconn->req_lock);
481c6f50
PR
 3344 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3345 tl_clear(mdev);
3346 drbd_uuid_new_current(mdev);
3347 clear_bit(NEW_CUR_UUID, &mdev->flags);
3348 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
81e84650 3349 return false;
481c6f50 3350 }
65d922c3 3351 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
b411b363 3352 ns = mdev->state;
87eeee41 3353 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3354
3355 if (rv < SS_SUCCESS) {
3356 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3357 return false;
b411b363
PR
3358 }
3359
4ac4aada
LE
3360 if (os.conn > C_WF_REPORT_PARAMS) {
3361 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3362 peer_state.disk != D_NEGOTIATING ) {
3363 /* we want resync, peer has not yet decided to sync... */
3364 /* Nowadays only used when forcing a node into primary role and
3365 setting its disk to UpToDate with that */
3366 drbd_send_uuids(mdev);
3367 drbd_send_state(mdev);
3368 }
3369 }
3370
89e58e75 3371 mdev->tconn->net_conf->want_lose = 0;
b411b363
PR
3372
3373 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3374
81e84650 3375 return true;
b411b363
PR
3376}
3377
d8763023
AG
3378static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
3379 unsigned int data_size)
b411b363 3380{
e42325a5 3381 struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;
b411b363
PR
3382
3383 wait_event(mdev->misc_wait,
3384 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3385 mdev->state.conn == C_BEHIND ||
b411b363
PR
3386 mdev->state.conn < C_CONNECTED ||
3387 mdev->state.disk < D_NEGOTIATING);
3388
3389 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3390
b411b363
PR
3391 /* Here the _drbd_uuid_ functions are right, current should
3392 _not_ be rotated into the history */
3393 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3394 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3395 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3396
62b0da3a 3397 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3398 drbd_start_resync(mdev, C_SYNC_TARGET);
3399
3400 put_ldev(mdev);
3401 } else
3402 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3403
81e84650 3404 return true;
b411b363
PR
3405}
3406
2c46407d
AG
3407/**
3408 * receive_bitmap_plain
3409 *
3410 * Return 0 when done, 1 when another iteration is needed, and a negative error
3411 * code upon failure.
3412 */
3413static int
02918be2
PR
3414receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3415 unsigned long *buffer, struct bm_xfer_ctx *c)
b411b363
PR
3416{
3417 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3418 unsigned want = num_words * sizeof(long);
2c46407d 3419 int err;
b411b363 3420
02918be2
PR
3421 if (want != data_size) {
3422 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
2c46407d 3423 return -EIO;
b411b363
PR
3424 }
3425 if (want == 0)
2c46407d 3426 return 0;
de0ff338 3427 err = drbd_recv(mdev->tconn, buffer, want);
2c46407d
AG
3428 if (err != want) {
3429 if (err >= 0)
3430 err = -EIO;
3431 return err;
3432 }
b411b363
PR
3433
3434 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3435
3436 c->word_offset += num_words;
3437 c->bit_offset = c->word_offset * BITS_PER_LONG;
3438 if (c->bit_offset > c->bm_bits)
3439 c->bit_offset = c->bm_bits;
3440
2c46407d 3441 return 1;
b411b363
PR
3442}
3443
2c46407d
AG
3444/**
3445 * recv_bm_rle_bits
3446 *
3447 * Return 0 when done, 1 when another iteration is needed, and a negative error
3448 * code upon failure.
3449 */
3450static int
b411b363
PR
3451recv_bm_rle_bits(struct drbd_conf *mdev,
3452 struct p_compressed_bm *p,
c6d25cfe
PR
3453 struct bm_xfer_ctx *c,
3454 unsigned int len)
b411b363
PR
3455{
3456 struct bitstream bs;
3457 u64 look_ahead;
3458 u64 rl;
3459 u64 tmp;
3460 unsigned long s = c->bit_offset;
3461 unsigned long e;
b411b363
PR
3462 int toggle = DCBP_get_start(p);
3463 int have;
3464 int bits;
3465
3466 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3467
3468 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3469 if (bits < 0)
2c46407d 3470 return -EIO;
b411b363
PR
3471
3472 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3473 bits = vli_decode_bits(&rl, look_ahead);
3474 if (bits <= 0)
2c46407d 3475 return -EIO;
b411b363
PR
3476
3477 if (toggle) {
3478 e = s + rl -1;
3479 if (e >= c->bm_bits) {
3480 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3481 return -EIO;
b411b363
PR
3482 }
3483 _drbd_bm_set_bits(mdev, s, e);
3484 }
3485
3486 if (have < bits) {
3487 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3488 have, bits, look_ahead,
3489 (unsigned int)(bs.cur.b - p->code),
3490 (unsigned int)bs.buf_len);
2c46407d 3491 return -EIO;
b411b363
PR
3492 }
3493 look_ahead >>= bits;
3494 have -= bits;
3495
3496 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3497 if (bits < 0)
2c46407d 3498 return -EIO;
b411b363
PR
3499 look_ahead |= tmp << have;
3500 have += bits;
3501 }
3502
3503 c->bit_offset = s;
3504 bm_xfer_ctx_bit_to_word_offset(c);
3505
2c46407d 3506 return (s != c->bm_bits);
b411b363
PR
3507}
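/* Worked example (annotation, not in the original source): with
 * DCBP_get_start(p) == 0 and VLI-decoded run lengths 5, 3 and 7, the loop
 * above leaves bits 0..4 clear, sets bits 5..7 via _drbd_bm_set_bits(), and
 * leaves bits 8..14 clear -- the runs alternate between clear and set, so
 * only the starting polarity and the run lengths travel over the wire. */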
3508
2c46407d
AG
3509/**
3510 * decode_bitmap_c
3511 *
3512 * Return 0 when done, 1 when another iteration is needed, and a negative error
3513 * code upon failure.
3514 */
3515static int
b411b363
PR
3516decode_bitmap_c(struct drbd_conf *mdev,
3517 struct p_compressed_bm *p,
c6d25cfe
PR
3518 struct bm_xfer_ctx *c,
3519 unsigned int len)
b411b363
PR
3520{
3521 if (DCBP_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3522 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3523
3524 /* other variants had been implemented for evaluation,
3525 * but have been dropped as this one turned out to be "best"
3526 * during all our tests. */
3527
 3528 dev_err(DEV, "decode_bitmap_c: unknown encoding %u\n", p->encoding);
3529 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
2c46407d 3530 return -EIO;
b411b363
PR
3531}
3532
3533void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3534 const char *direction, struct bm_xfer_ctx *c)
3535{
3536 /* what would it take to transfer it "plaintext" */
c012949a 3537 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3538 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3539 + c->bm_words * sizeof(long);
3540 unsigned total = c->bytes[0] + c->bytes[1];
3541 unsigned r;
3542
 3543 /* total cannot be zero, but just in case: */
3544 if (total == 0)
3545 return;
3546
3547 /* don't report if not compressed */
3548 if (total >= plain)
3549 return;
3550
3551 /* total < plain. check for overflow, still */
3552 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3553 : (1000 * total / plain);
3554
3555 if (r > 1000)
3556 r = 1000;
3557
3558 r = 1000 - r;
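	/* Illustrative example (annotation, not in the original source):
	 * plain = 4000, total = 500  =>  r = 1000 - (1000 * 500 / 4000) = 875,
	 * which the dev_info() below prints as "compression: 87.5%". */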
3559 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3560 "total %u; compression: %u.%u%%\n",
3561 direction,
3562 c->bytes[1], c->packets[1],
3563 c->bytes[0], c->packets[0],
3564 total, r/10, r % 10);
3565}
3566
3567/* Since we are processing the bitfield from lower addresses to higher,
 3568 it does not matter whether we process it in 32 bit chunks or 64 bit
 3569 chunks, as long as it is little endian. (Understand it as a byte stream,
 3570 beginning with the lowest byte...) If we used big endian,
3571 we would need to process it from the highest address to the lowest,
3572 in order to be agnostic to the 32 vs 64 bits issue.
3573
3574 returns 0 on failure, 1 if we successfully received it. */
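/* Illustrative example (annotation, not in the original source): on little
 * endian the byte at the lowest address always carries bit 0, so the bytes
 * b0..b7 describe the same bit sequence whether merged as two 32 bit words
 * or as one 64 bit word; on big endian the word size would decide which
 * byte carries bit 0, hence the caveat above. */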
d8763023
AG
3575static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
3576 unsigned int data_size)
b411b363
PR
3577{
3578 struct bm_xfer_ctx c;
3579 void *buffer;
2c46407d 3580 int err;
81e84650 3581 int ok = false;
257d0af6 3582 struct p_header *h = &mdev->tconn->data.rbuf.header;
b411b363 3583
20ceb2b2
LE
3584 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3585 /* you are supposed to send additional out-of-sync information
3586 * if you actually set bits during this phase */
b411b363
PR
3587
3588 /* maybe we should use some per thread scratch page,
3589 * and allocate that during initial device creation? */
3590 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3591 if (!buffer) {
3592 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3593 goto out;
3594 }
3595
3596 c = (struct bm_xfer_ctx) {
3597 .bm_bits = drbd_bm_bits(mdev),
3598 .bm_words = drbd_bm_words(mdev),
3599 };
3600
2c46407d 3601 for(;;) {
02918be2 3602 if (cmd == P_BITMAP) {
2c46407d 3603 err = receive_bitmap_plain(mdev, data_size, buffer, &c);
02918be2 3604 } else if (cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3605 /* MAYBE: sanity check that we speak proto >= 90,
3606 * and the feature is enabled! */
3607 struct p_compressed_bm *p;
3608
02918be2 3609 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
b411b363
PR
3610 dev_err(DEV, "ReportCBitmap packet too large\n");
3611 goto out;
3612 }
 3613 /* use the page buffer */
3614 p = buffer;
3615 memcpy(p, h, sizeof(*h));
de0ff338 3616 if (drbd_recv(mdev->tconn, p->head.payload, data_size) != data_size)
b411b363 3617 goto out;
004352fa
LE
3618 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3619 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
78fcbdae 3620 goto out;
b411b363 3621 }
c6d25cfe 3622 err = decode_bitmap_c(mdev, p, &c, data_size);
b411b363 3623 } else {
02918be2 3624 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
b411b363
PR
3625 goto out;
3626 }
3627
02918be2 3628 c.packets[cmd == P_BITMAP]++;
257d0af6 3629 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;
b411b363 3630
2c46407d
AG
3631 if (err <= 0) {
3632 if (err < 0)
3633 goto out;
b411b363 3634 break;
2c46407d 3635 }
02918be2 3636 if (!drbd_recv_header(mdev, &cmd, &data_size))
b411b363 3637 goto out;
2c46407d 3638 }
b411b363
PR
3639
3640 INFO_bm_xfer_stats(mdev, "receive", &c);
3641
3642 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
3643 enum drbd_state_rv rv;
3644
b411b363
PR
3645 ok = !drbd_send_bitmap(mdev);
3646 if (!ok)
3647 goto out;
3648 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
3649 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3650 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
3651 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3652 /* admin may have requested C_DISCONNECTING,
3653 * other threads may have noticed network errors */
3654 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3655 drbd_conn_str(mdev->state.conn));
3656 }
3657
81e84650 3658 ok = true;
b411b363 3659 out:
20ceb2b2 3660 drbd_bm_unlock(mdev);
b411b363
PR
3661 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3662 drbd_start_resync(mdev, C_SYNC_SOURCE);
3663 free_page((unsigned long) buffer);
3664 return ok;
3665}
3666
d8763023
AG
3667static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
3668 unsigned int data_size)
b411b363
PR
3669{
3670 /* TODO zero copy sink :) */
3671 static char sink[128];
3672 int size, want, r;
3673
02918be2
PR
3674 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3675 cmd, data_size);
b411b363 3676
02918be2 3677 size = data_size;
b411b363
PR
3678 while (size > 0) {
3679 want = min_t(int, size, sizeof(sink));
de0ff338 3680 r = drbd_recv(mdev->tconn, sink, want);
841ce241
AG
3681 if (!expect(r > 0))
3682 break;
b411b363
PR
3683 size -= r;
3684 }
3685 return size == 0;
3686}
3687
d8763023
AG
3688static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
3689 unsigned int data_size)
0ced55a3 3690{
e7f52dfb
LE
3691 /* Make sure we've acked all the TCP data associated
3692 * with the data requests being unplugged */
e42325a5 3693 drbd_tcp_quickack(mdev->tconn->data.socket);
0ced55a3 3694
81e84650 3695 return true;
0ced55a3
PR
3696}
3697
d8763023
AG
3698static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
3699 unsigned int data_size)
73a01a18 3700{
e42325a5 3701 struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;
73a01a18 3702
f735e363
LE
3703 switch (mdev->state.conn) {
3704 case C_WF_SYNC_UUID:
3705 case C_WF_BITMAP_T:
3706 case C_BEHIND:
3707 break;
3708 default:
3709 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3710 drbd_conn_str(mdev->state.conn));
3711 }
3712
73a01a18
PR
3713 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3714
81e84650 3715 return true;
73a01a18
PR
3716}
3717
d8763023
AG
3718typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packet cmd,
3719 unsigned int to_receive);
02918be2
PR
3720
3721struct data_cmd {
3722 int expect_payload;
3723 size_t pkt_size;
3724 drbd_cmd_handler_f function;
3725};
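/* Annotation (not in the original source): pkt_size is the on-wire size of
 * the fixed part including struct p_header; drbdd() derives the sub-header
 * size from it.  expect_payload permits additional variable-length payload
 * beyond that fixed part, and function is the handler invoked with the size
 * of the remaining payload. */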
3726
3727static struct data_cmd drbd_cmd_handler[] = {
3728 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3729 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3730 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3731 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
257d0af6
PR
3732 [P_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3733 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3734 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), receive_UnplugRemote },
02918be2
PR
3735 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3736 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
257d0af6
PR
3737 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), receive_SyncParam },
3738 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), receive_SyncParam },
02918be2
PR
3739 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3740 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3741 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3742 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
3743 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3744 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3745 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3746 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3747 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3748 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
73a01a18 3749 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
b411b363
PR
3750 /* anything missing from this table is in
3751 * the asender_tbl, see get_asender_cmd */
02918be2 3752 [P_MAX_CMD] = { 0, 0, NULL },
b411b363
PR
3753};
3754
02918be2 3755/* All handler functions that expect a sub-header get that sub-header in
e42325a5 3756 mdev->tconn->data.rbuf.header.head.payload.
02918be2 3757
e42325a5 3758 Usually in mdev->tconn->data.rbuf.header.head the callback can find the usual
02918be2
PR
 3759 p_header, but it must not rely on that, since there is also p_header95.
3760 */
b411b363
PR
3761
3762static void drbdd(struct drbd_conf *mdev)
3763{
c012949a 3764 struct p_header *header = &mdev->tconn->data.rbuf.header;
02918be2 3765 unsigned int packet_size;
d8763023 3766 enum drbd_packet cmd;
02918be2
PR
3767 size_t shs; /* sub header size */
3768 int rv;
b411b363 3769
e6b3ea83 3770 while (get_t_state(&mdev->tconn->receiver) == RUNNING) {
bc31fe33 3771 drbd_thread_current_set_cpu(mdev, &mdev->tconn->receiver);
02918be2
PR
3772 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3773 goto err_out;
b411b363 3774
02918be2
PR
3775 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3776 dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3777 goto err_out;
0b33a916 3778 }
b411b363 3779
c012949a 3780 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(struct p_header);
02918be2
PR
3781 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3782 dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3783 goto err_out;
b411b363 3784 }
b411b363 3785
c13f7e1a 3786 if (shs) {
de0ff338 3787 rv = drbd_recv(mdev->tconn, &header->payload, shs);
c13f7e1a 3788 if (unlikely(rv != shs)) {
0ddc5549
LE
3789 if (!signal_pending(current))
3790 dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
c13f7e1a
LE
3791 goto err_out;
3792 }
3793 }
3794
02918be2 3795 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
b411b363 3796
02918be2 3797 if (unlikely(!rv)) {
b411b363 3798 dev_err(DEV, "error receiving %s, l: %d!\n",
02918be2
PR
3799 cmdname(cmd), packet_size);
3800 goto err_out;
b411b363
PR
3801 }
3802 }
b411b363 3803
02918be2
PR
3804 if (0) {
3805 err_out:
3806 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3807 }
856c50c7
LE
3808 /* If we leave here, we probably want to update at least the
3809 * "Connected" indicator on stable storage. Do so explicitly here. */
3810 drbd_md_sync(mdev);
b411b363
PR
3811}
3812
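/* Annotation (not in the original source): this drains the worker by queueing
 * a barrier work item; w_prev_work_done is expected to complete barr.done, so
 * once wait_for_completion() returns, every work item queued before the
 * barrier has been processed. */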
191d3cc8 3813void drbd_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
3814{
3815 struct drbd_wq_barrier barr;
3816
3817 barr.w.cb = w_prev_work_done;
3818 init_completion(&barr.done);
191d3cc8 3819 drbd_queue_work(&tconn->data.work, &barr.w);
b411b363
PR
3820 wait_for_completion(&barr.done);
3821}
3822
3823static void drbd_disconnect(struct drbd_conf *mdev)
3824{
3825 enum drbd_fencing_p fp;
3826 union drbd_state os, ns;
3827 int rv = SS_UNKNOWN_ERROR;
3828 unsigned int i;
3829
3830 if (mdev->state.conn == C_STANDALONE)
3831 return;
b411b363
PR
3832
3833 /* asender does not clean up anything. it must not interfere, either */
e6b3ea83 3834 drbd_thread_stop(&mdev->tconn->asender);
b411b363 3835 drbd_free_sock(mdev);
b411b363 3836
85719573 3837 /* wait for current activity to cease. */
87eeee41 3838 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
3839 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3840 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3841 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 3842 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3843
3844 /* We do not have data structures that would allow us to
3845 * get the rs_pending_cnt down to 0 again.
3846 * * On C_SYNC_TARGET we do not have any data structures describing
3847 * the pending RSDataRequest's we have sent.
3848 * * On C_SYNC_SOURCE there is no data structure that tracks
3849 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3850 * And no, it is not the sum of the reference counts in the
3851 * resync_LRU. The resync_LRU tracks the whole operation including
3852 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3853 * on the fly. */
3854 drbd_rs_cancel_all(mdev);
3855 mdev->rs_total = 0;
3856 mdev->rs_failed = 0;
3857 atomic_set(&mdev->rs_pending_cnt, 0);
3858 wake_up(&mdev->misc_wait);
3859
7fde2be9
PR
3860 del_timer(&mdev->request_timer);
3861
b411b363
PR
3862 /* make sure syncer is stopped and w_resume_next_sg queued */
3863 del_timer_sync(&mdev->resync_timer);
b411b363
PR
3864 resync_timer_fn((unsigned long)mdev);
3865
b411b363
PR
3866 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3867 * w_make_resync_request etc. which may still be on the worker queue
3868 * to be "canceled" */
191d3cc8 3869 drbd_flush_workqueue(mdev->tconn);
b411b363
PR
3870
3871 /* This also does reclaim_net_ee(). If we do this too early, we might
3872 * miss some resync ee and pages.*/
3873 drbd_process_done_ee(mdev);
3874
3875 kfree(mdev->p_uuid);
3876 mdev->p_uuid = NULL;
3877
fb22c402 3878 if (!is_susp(mdev->state))
b411b363
PR
3879 tl_clear(mdev);
3880
b411b363
PR
3881 dev_info(DEV, "Connection closed\n");
3882
3883 drbd_md_sync(mdev);
3884
3885 fp = FP_DONT_CARE;
3886 if (get_ldev(mdev)) {
3887 fp = mdev->ldev->dc.fencing;
3888 put_ldev(mdev);
3889 }
3890
87f7be4c
PR
3891 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3892 drbd_try_outdate_peer_async(mdev);
b411b363 3893
87eeee41 3894 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
3895 os = mdev->state;
3896 if (os.conn >= C_UNCONNECTED) {
3897 /* Do not restart in case we are C_DISCONNECTING */
3898 ns = os;
3899 ns.conn = C_UNCONNECTED;
3900 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3901 }
87eeee41 3902 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3903
3904 if (os.conn == C_DISCONNECTING) {
b2fb6dbe 3905 wait_event(mdev->tconn->net_cnt_wait, atomic_read(&mdev->tconn->net_cnt) == 0);
b411b363 3906
a0638456
PR
3907 crypto_free_hash(mdev->tconn->cram_hmac_tfm);
3908 mdev->tconn->cram_hmac_tfm = NULL;
b411b363 3909
89e58e75
PR
3910 kfree(mdev->tconn->net_conf);
3911 mdev->tconn->net_conf = NULL;
b411b363
PR
3912 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3913 }
3914
20ceb2b2
LE
3915 /* serialize with bitmap writeout triggered by the state change,
3916 * if any. */
3917 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3918
b411b363
PR
3919 /* tcp_close and release of sendpage pages can be deferred. I don't
3920 * want to use SO_LINGER, because apparently it can be deferred for
3921 * more than 20 seconds (longest time I checked).
3922 *
3923 * Actually we don't care for exactly when the network stack does its
3924 * put_page(), but release our reference on these pages right here.
3925 */
3926 i = drbd_release_ee(mdev, &mdev->net_ee);
3927 if (i)
3928 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
3929 i = atomic_read(&mdev->pp_in_use_by_net);
3930 if (i)
3931 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
3932 i = atomic_read(&mdev->pp_in_use);
3933 if (i)
45bb912b 3934 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
3935
3936 D_ASSERT(list_empty(&mdev->read_ee));
3937 D_ASSERT(list_empty(&mdev->active_ee));
3938 D_ASSERT(list_empty(&mdev->sync_ee));
3939 D_ASSERT(list_empty(&mdev->done_ee));
3940
3941 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3942 atomic_set(&mdev->current_epoch->epoch_size, 0);
3943 D_ASSERT(list_empty(&mdev->current_epoch->list));
3944}
3945
3946/*
3947 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3948 * we can agree on is stored in agreed_pro_version.
3949 *
3950 * feature flags and the reserved array should be enough room for future
3951 * enhancements of the handshake protocol, and possible plugins...
3952 *
3953 * for now, they are expected to be zero, but ignored.
3954 */
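/* Illustrative example (annotation, not in the original source): if we
 * support protocol versions 86..96 and the peer announces 88..100, the
 * ranges overlap and agreed_pro_version becomes min(96, 100) = 96; if the
 * ranges do not overlap, drbd_do_handshake() below fails with -1
 * (incompatible dialects). */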
8a22cccc 3955static int drbd_send_handshake(struct drbd_tconn *tconn)
b411b363 3956{
e6b3ea83 3957 /* ASSERT current == mdev->tconn->receiver ... */
8a22cccc 3958 struct p_handshake *p = &tconn->data.sbuf.handshake;
b411b363
PR
3959 int ok;
3960
8a22cccc
PR
3961 if (mutex_lock_interruptible(&tconn->data.mutex)) {
3962 conn_err(tconn, "interrupted during initial handshake\n");
b411b363
PR
3963 return 0; /* interrupted. not ok. */
3964 }
3965
8a22cccc
PR
3966 if (tconn->data.socket == NULL) {
3967 mutex_unlock(&tconn->data.mutex);
b411b363
PR
3968 return 0;
3969 }
3970
3971 memset(p, 0, sizeof(*p));
3972 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3973 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
8a22cccc
PR
3974 ok = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
3975 &p->head, sizeof(*p), 0);
3976 mutex_unlock(&tconn->data.mutex);
b411b363
PR
3977 return ok;
3978}
3979
3980/*
3981 * return values:
3982 * 1 yes, we have a valid connection
3983 * 0 oops, did not work out, please try again
3984 * -1 peer talks different language,
3985 * no point in trying again, please go standalone.
3986 */
3987static int drbd_do_handshake(struct drbd_conf *mdev)
3988{
e6b3ea83 3989 /* ASSERT current == mdev->tconn->receiver ... */
e42325a5 3990 struct p_handshake *p = &mdev->tconn->data.rbuf.handshake;
02918be2
PR
3991 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3992 unsigned int length;
d8763023 3993 enum drbd_packet cmd;
b411b363
PR
3994 int rv;
3995
8a22cccc 3996 rv = drbd_send_handshake(mdev->tconn);
b411b363
PR
3997 if (!rv)
3998 return 0;
3999
02918be2 4000 rv = drbd_recv_header(mdev, &cmd, &length);
b411b363
PR
4001 if (!rv)
4002 return 0;
4003
02918be2 4004 if (cmd != P_HAND_SHAKE) {
b411b363 4005 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
02918be2 4006 cmdname(cmd), cmd);
b411b363
PR
4007 return -1;
4008 }
4009
02918be2 4010 if (length != expect) {
b411b363 4011 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
02918be2 4012 expect, length);
b411b363
PR
4013 return -1;
4014 }
4015
de0ff338 4016 rv = drbd_recv(mdev->tconn, &p->head.payload, expect);
b411b363
PR
4017
4018 if (rv != expect) {
0ddc5549
LE
4019 if (!signal_pending(current))
4020 dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
b411b363
PR
4021 return 0;
4022 }
4023
b411b363
PR
4024 p->protocol_min = be32_to_cpu(p->protocol_min);
4025 p->protocol_max = be32_to_cpu(p->protocol_max);
4026 if (p->protocol_max == 0)
4027 p->protocol_max = p->protocol_min;
4028
4029 if (PRO_VERSION_MAX < p->protocol_min ||
4030 PRO_VERSION_MIN > p->protocol_max)
4031 goto incompat;
4032
31890f4a 4033 mdev->tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363
PR
4034
4035 dev_info(DEV, "Handshake successful: "
31890f4a 4036 "Agreed network protocol version %d\n", mdev->tconn->agreed_pro_version);
b411b363
PR
4037
4038 return 1;
4039
4040 incompat:
4041 dev_err(DEV, "incompatible DRBD dialects: "
4042 "I support %d-%d, peer supports %d-%d\n",
4043 PRO_VERSION_MIN, PRO_VERSION_MAX,
4044 p->protocol_min, p->protocol_max);
4045 return -1;
4046}
4047
4048#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4049static int drbd_do_auth(struct drbd_conf *mdev)
4050{
 4051 dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4052 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4053 return -1;
b411b363
PR
4054}
4055#else
4056#define CHALLENGE_LEN 64
b10d96cb
JT
4057
4058/* Return value:
4059 1 - auth succeeded,
4060 0 - failed, try again (network error),
4061 -1 - auth failed, don't try again.
4062*/
4063
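/* Sketch of the exchange below (annotation, not in the original source):
 * each side sends a random challenge (P_AUTH_CHALLENGE), answers the peer's
 * challenge with HMAC(shared_secret, peer_challenge) in P_AUTH_RESPONSE,
 * and compares the response it receives against the HMAC it computed over
 * its own challenge; a mismatch fails authentication permanently (-1). */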
b411b363
PR
4064static int drbd_do_auth(struct drbd_conf *mdev)
4065{
4066 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4067 struct scatterlist sg;
4068 char *response = NULL;
4069 char *right_response = NULL;
4070 char *peers_ch = NULL;
89e58e75 4071 unsigned int key_len = strlen(mdev->tconn->net_conf->shared_secret);
b411b363
PR
4072 unsigned int resp_size;
4073 struct hash_desc desc;
d8763023 4074 enum drbd_packet cmd;
02918be2 4075 unsigned int length;
b411b363
PR
4076 int rv;
4077
a0638456 4078 desc.tfm = mdev->tconn->cram_hmac_tfm;
b411b363
PR
4079 desc.flags = 0;
4080
a0638456 4081 rv = crypto_hash_setkey(mdev->tconn->cram_hmac_tfm,
89e58e75 4082 (u8 *)mdev->tconn->net_conf->shared_secret, key_len);
b411b363
PR
4083 if (rv) {
4084 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4085 rv = -1;
b411b363
PR
4086 goto fail;
4087 }
4088
4089 get_random_bytes(my_challenge, CHALLENGE_LEN);
4090
4091 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4092 if (!rv)
4093 goto fail;
4094
02918be2 4095 rv = drbd_recv_header(mdev, &cmd, &length);
b411b363
PR
4096 if (!rv)
4097 goto fail;
4098
02918be2 4099 if (cmd != P_AUTH_CHALLENGE) {
b411b363 4100 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
02918be2 4101 cmdname(cmd), cmd);
b411b363
PR
4102 rv = 0;
4103 goto fail;
4104 }
4105
02918be2 4106 if (length > CHALLENGE_LEN * 2) {
b411b363 4107 dev_err(DEV, "expected AuthChallenge payload too big.\n");
b10d96cb 4108 rv = -1;
b411b363
PR
4109 goto fail;
4110 }
4111
02918be2 4112 peers_ch = kmalloc(length, GFP_NOIO);
b411b363
PR
4113 if (peers_ch == NULL) {
4114 dev_err(DEV, "kmalloc of peers_ch failed\n");
b10d96cb 4115 rv = -1;
b411b363
PR
4116 goto fail;
4117 }
4118
de0ff338 4119 rv = drbd_recv(mdev->tconn, peers_ch, length);
b411b363 4120
02918be2 4121 if (rv != length) {
0ddc5549
LE
4122 if (!signal_pending(current))
4123 dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
b411b363
PR
4124 rv = 0;
4125 goto fail;
4126 }
4127
a0638456 4128 resp_size = crypto_hash_digestsize(mdev->tconn->cram_hmac_tfm);
b411b363
PR
4129 response = kmalloc(resp_size, GFP_NOIO);
4130 if (response == NULL) {
4131 dev_err(DEV, "kmalloc of response failed\n");
b10d96cb 4132 rv = -1;
b411b363
PR
4133 goto fail;
4134 }
4135
4136 sg_init_table(&sg, 1);
02918be2 4137 sg_set_buf(&sg, peers_ch, length);
b411b363
PR
4138
4139 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4140 if (rv) {
4141 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4142 rv = -1;
b411b363
PR
4143 goto fail;
4144 }
4145
4146 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4147 if (!rv)
4148 goto fail;
4149
02918be2 4150 rv = drbd_recv_header(mdev, &cmd, &length);
b411b363
PR
4151 if (!rv)
4152 goto fail;
4153
02918be2 4154 if (cmd != P_AUTH_RESPONSE) {
b411b363 4155 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
02918be2 4156 cmdname(cmd), cmd);
b411b363
PR
4157 rv = 0;
4158 goto fail;
4159 }
4160
02918be2 4161 if (length != resp_size) {
b411b363
PR
4162 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4163 rv = 0;
4164 goto fail;
4165 }
4166
de0ff338 4167 rv = drbd_recv(mdev->tconn, response , resp_size);
b411b363
PR
4168
4169 if (rv != resp_size) {
0ddc5549
LE
4170 if (!signal_pending(current))
4171 dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
b411b363
PR
4172 rv = 0;
4173 goto fail;
4174 }
4175
4176 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4177 if (right_response == NULL) {
b411b363 4178 dev_err(DEV, "kmalloc of right_response failed\n");
b10d96cb 4179 rv = -1;
b411b363
PR
4180 goto fail;
4181 }
4182
4183 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4184
4185 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4186 if (rv) {
4187 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4188 rv = -1;
b411b363
PR
4189 goto fail;
4190 }
4191
4192 rv = !memcmp(response, right_response, resp_size);
4193
4194 if (rv)
4195 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
89e58e75 4196 resp_size, mdev->tconn->net_conf->cram_hmac_alg);
b10d96cb
JT
4197 else
4198 rv = -1;
b411b363
PR
4199
4200 fail:
4201 kfree(peers_ch);
4202 kfree(response);
4203 kfree(right_response);
4204
4205 return rv;
4206}
4207#endif
4208
4209int drbdd_init(struct drbd_thread *thi)
4210{
4211 struct drbd_conf *mdev = thi->mdev;
4212 unsigned int minor = mdev_to_minor(mdev);
4213 int h;
4214
4215 sprintf(current->comm, "drbd%d_receiver", minor);
4216
4217 dev_info(DEV, "receiver (re)started\n");
4218
4219 do {
4220 h = drbd_connect(mdev);
4221 if (h == 0) {
4222 drbd_disconnect(mdev);
20ee6390 4223 schedule_timeout_interruptible(HZ);
b411b363
PR
4224 }
4225 if (h == -1) {
4226 dev_warn(DEV, "Discarding network configuration.\n");
4227 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4228 }
4229 } while (h == 0);
4230
4231 if (h > 0) {
b2fb6dbe 4232 if (get_net_conf(mdev->tconn)) {
b411b363 4233 drbdd(mdev);
b2fb6dbe 4234 put_net_conf(mdev->tconn);
b411b363
PR
4235 }
4236 }
4237
4238 drbd_disconnect(mdev);
4239
4240 dev_info(DEV, "receiver terminated\n");
4241 return 0;
4242}
4243
4244/* ********* acknowledge sender ******** */
4245
d8763023 4246static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4247{
257d0af6 4248 struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
b411b363
PR
4249
4250 int retcode = be32_to_cpu(p->retcode);
4251
4252 if (retcode >= SS_SUCCESS) {
4253 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4254 } else {
4255 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4256 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4257 drbd_set_st_err_str(retcode), retcode);
4258 }
4259 wake_up(&mdev->state_wait);
4260
81e84650 4261 return true;
b411b363
PR
4262}
4263
d8763023 4264static int got_Ping(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4265{
4266 return drbd_send_ping_ack(mdev);
4267
4268}
4269
d8763023 4270static int got_PingAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4271{
4272 /* restore idle timeout */
e42325a5 4273 mdev->tconn->meta.socket->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_int*HZ;
309d1608
PR
4274 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4275 wake_up(&mdev->misc_wait);
b411b363 4276
81e84650 4277 return true;
b411b363
PR
4278}
4279
d8763023 4280static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4281{
257d0af6 4282 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4283 sector_t sector = be64_to_cpu(p->sector);
4284 int blksize = be32_to_cpu(p->blksize);
4285
31890f4a 4286 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4287
4288 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4289
1d53f09e
LE
4290 if (get_ldev(mdev)) {
4291 drbd_rs_complete_io(mdev, sector);
4292 drbd_set_in_sync(mdev, sector, blksize);
4293 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4294 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4295 put_ldev(mdev);
4296 }
b411b363 4297 dec_rs_pending(mdev);
778f271d 4298 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4299
81e84650 4300 return true;
b411b363
PR
4301}
4302
bc9c5c41
AG
4303static int
4304validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4305 struct rb_root *root, const char *func,
4306 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4307{
4308 struct drbd_request *req;
4309 struct bio_and_error m;
4310
87eeee41 4311 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4312 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4313 if (unlikely(!req)) {
87eeee41 4314 spin_unlock_irq(&mdev->tconn->req_lock);
81e84650 4315 return false;
b411b363
PR
4316 }
4317 __req_mod(req, what, &m);
87eeee41 4318 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4319
4320 if (m.bio)
4321 complete_master_bio(mdev, &m);
81e84650 4322 return true;
b411b363
PR
4323}
4324
d8763023 4325static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4326{
257d0af6 4327 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4328 sector_t sector = be64_to_cpu(p->sector);
4329 int blksize = be32_to_cpu(p->blksize);
4330 enum drbd_req_event what;
4331
4332 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4333
579b57ed 4334 if (p->block_id == ID_SYNCER) {
b411b363
PR
4335 drbd_set_in_sync(mdev, sector, blksize);
4336 dec_rs_pending(mdev);
81e84650 4337 return true;
b411b363 4338 }
257d0af6 4339 switch (cmd) {
b411b363 4340 case P_RS_WRITE_ACK:
89e58e75 4341 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4342 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4343 break;
4344 case P_WRITE_ACK:
89e58e75 4345 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4346 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4347 break;
4348 case P_RECV_ACK:
89e58e75 4349 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
8554df1c 4350 what = RECV_ACKED_BY_PEER;
b411b363
PR
4351 break;
4352 case P_DISCARD_ACK:
89e58e75 4353 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4354 what = CONFLICT_DISCARDED_BY_PEER;
b411b363
PR
4355 break;
4356 default:
4357 D_ASSERT(0);
81e84650 4358 return false;
b411b363
PR
4359 }
4360
4361 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41
AG
4362 &mdev->write_requests, __func__,
4363 what, false);
b411b363
PR
4364}
4365
d8763023 4366static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4367{
257d0af6 4368 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363 4369 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4370 int size = be32_to_cpu(p->blksize);
89e58e75
PR
4371 bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4372 mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
c3afd8f5 4373 bool found;
b411b363
PR
4374
4375 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4376
579b57ed 4377 if (p->block_id == ID_SYNCER) {
b411b363
PR
4378 dec_rs_pending(mdev);
4379 drbd_rs_failed_io(mdev, sector, size);
81e84650 4380 return true;
b411b363 4381 }
2deb8336 4382
c3afd8f5 4383 found = validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4384 &mdev->write_requests, __func__,
8554df1c 4385 NEG_ACKED, missing_ok);
c3afd8f5
AG
4386 if (!found) {
4387 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4388 The master bio might already be completed, therefore the
4389 request is no longer in the collision hash. */
4390 /* In Protocol B we might already have got a P_RECV_ACK
4391 but then get a P_NEG_ACK afterwards. */
4392 if (!missing_ok)
2deb8336 4393 return false;
c3afd8f5 4394 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4395 }
2deb8336 4396 return true;
b411b363
PR
4397}
4398
d8763023 4399static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4400{
257d0af6 4401 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4402 sector_t sector = be64_to_cpu(p->sector);
4403
4404 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4405 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4406 (unsigned long long)sector, be32_to_cpu(p->blksize));
4407
4408 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4409 &mdev->read_requests, __func__,
8554df1c 4410 NEG_ACKED, false);
b411b363
PR
4411}
4412
d8763023 4413static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4414{
4415 sector_t sector;
4416 int size;
257d0af6 4417 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4418
4419 sector = be64_to_cpu(p->sector);
4420 size = be32_to_cpu(p->blksize);
b411b363
PR
4421
4422 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4423
4424 dec_rs_pending(mdev);
4425
4426 if (get_ldev_if_state(mdev, D_FAILED)) {
4427 drbd_rs_complete_io(mdev, sector);
257d0af6 4428 switch (cmd) {
d612d309
PR
4429 case P_NEG_RS_DREPLY:
4430 drbd_rs_failed_io(mdev, sector, size);
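 		/* fall through intentionally: P_RS_CANCEL takes the same path,
 		 * only without the failed-io accounting above */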
4431 case P_RS_CANCEL:
4432 break;
4433 default:
4434 D_ASSERT(0);
4435 put_ldev(mdev);
4436 return false;
4437 }
b411b363
PR
4438 put_ldev(mdev);
4439 }
4440
81e84650 4441 return true;
b411b363
PR
4442}
4443
d8763023 4444static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4445{
257d0af6 4446 struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;
b411b363
PR
4447
4448 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4449
c4752ef1
PR
4450 if (mdev->state.conn == C_AHEAD &&
4451 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
4452 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4453 mdev->start_resync_timer.expires = jiffies + HZ;
4454 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4455 }
4456
81e84650 4457 return true;
b411b363
PR
4458}
4459
d8763023 4460static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4461{
257d0af6 4462 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4463 struct drbd_work *w;
4464 sector_t sector;
4465 int size;
4466
4467 sector = be64_to_cpu(p->sector);
4468 size = be32_to_cpu(p->blksize);
4469
4470 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4471
4472 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4473 drbd_ov_oos_found(mdev, sector, size);
4474 else
4475 ov_oos_print(mdev);
4476
1d53f09e 4477 if (!get_ldev(mdev))
81e84650 4478 return true;
1d53f09e 4479
b411b363
PR
4480 drbd_rs_complete_io(mdev, sector);
4481 dec_rs_pending(mdev);
4482
ea5442af
LE
4483 --mdev->ov_left;
4484
4485 /* let's advance progress step marks only for every other megabyte */
4486 if ((mdev->ov_left & 0x200) == 0x200)
4487 drbd_advance_rs_marks(mdev, mdev->ov_left);
4488
4489 if (mdev->ov_left == 0) {
b411b363
PR
4490 w = kmalloc(sizeof(*w), GFP_NOIO);
4491 if (w) {
4492 w->cb = w_ov_finished;
e42325a5 4493 drbd_queue_work_front(&mdev->tconn->data.work, w);
b411b363
PR
4494 } else {
4495 dev_err(DEV, "kmalloc(w) failed.");
4496 ov_oos_print(mdev);
4497 drbd_resync_finished(mdev);
4498 }
4499 }
1d53f09e 4500 put_ldev(mdev);
81e84650 4501 return true;
b411b363
PR
4502}
4503
d8763023 4504static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
0ced55a3 4505{
81e84650 4506 return true;
0ced55a3
PR
4507}
4508
b411b363
PR
4509struct asender_cmd {
4510 size_t pkt_size;
d8763023 4511 int (*process)(struct drbd_conf *mdev, enum drbd_packet cmd);
b411b363
PR
4512};
4513
4514static struct asender_cmd *get_asender_cmd(int cmd)
4515{
4516 static struct asender_cmd asender_tbl[] = {
4517 /* anything missing from this table is in
4518 * the drbd_cmd_handler (drbd_default_handler) table,
4519 * see the beginning of drbdd() */
257d0af6
PR
4520 [P_PING] = { sizeof(struct p_header), got_Ping },
4521 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
b411b363
PR
4522 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4523 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4524 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4525 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4526 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4527 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4528 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4529 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4530 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4531 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4532 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
02918be2 4533 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
d612d309 4534 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply},
b411b363
PR
4535 [P_MAX_CMD] = { 0, NULL },
4536 };
4537 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4538 return NULL;
4539 return &asender_tbl[cmd];
4540}
4541
4542int drbd_asender(struct drbd_thread *thi)
4543{
4544 struct drbd_conf *mdev = thi->mdev;
257d0af6 4545 struct p_header *h = &mdev->tconn->meta.rbuf.header;
b411b363
PR
4546 struct asender_cmd *cmd = NULL;
4547
257d0af6 4548 int rv;
b411b363
PR
4549 void *buf = h;
4550 int received = 0;
257d0af6 4551 int expect = sizeof(struct p_header);
f36af18c 4552 int ping_timeout_active = 0;
257d0af6 4553 int empty, pkt_size;
d8763023 4554 enum drbd_packet cmd_nr;
b411b363
PR
4555
4556 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4557
4558 current->policy = SCHED_RR; /* Make this a realtime task! */
4559 current->rt_priority = 2; /* more important than all other tasks */
4560
e77a0a5c 4561 while (get_t_state(thi) == RUNNING) {
bc31fe33 4562 drbd_thread_current_set_cpu(mdev, thi);
e43ef195 4563 if (test_and_clear_bit(SEND_PING, &mdev->tconn->flags)) {
841ce241
AG
4564 if (!drbd_send_ping(mdev)) {
4565 dev_err(DEV, "drbd_send_ping has failed\n");
4566 goto reconnect;
4567 }
e42325a5 4568 mdev->tconn->meta.socket->sk->sk_rcvtimeo =
89e58e75 4569 mdev->tconn->net_conf->ping_timeo*HZ/10;
f36af18c 4570 ping_timeout_active = 1;
b411b363
PR
4571 }
4572
4573 /* conditionally cork;
4574 * it may hurt latency if we cork without much to send */
89e58e75 4575 if (!mdev->tconn->net_conf->no_cork &&
b411b363 4576 3 < atomic_read(&mdev->unacked_cnt))
e42325a5 4577 drbd_tcp_cork(mdev->tconn->meta.socket);
b411b363 4578 while (1) {
808e37b8 4579 clear_bit(SIGNAL_ASENDER, &mdev->tconn->flags);
b411b363 4580 flush_signals(current);
0f8488e1 4581 if (!drbd_process_done_ee(mdev))
b411b363 4582 goto reconnect;
b411b363 4583 /* to avoid race with newly queued ACKs */
808e37b8 4584 set_bit(SIGNAL_ASENDER, &mdev->tconn->flags);
87eeee41 4585 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 4586 empty = list_empty(&mdev->done_ee);
87eeee41 4587 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4588 /* new ack may have been queued right here,
4589 * but then there is also a signal pending,
4590 * and we start over... */
4591 if (empty)
4592 break;
4593 }
4594 /* but unconditionally uncork unless disabled */
89e58e75 4595 if (!mdev->tconn->net_conf->no_cork)
e42325a5 4596 drbd_tcp_uncork(mdev->tconn->meta.socket);
b411b363
PR
4597
4598 /* short circuit, recv_msg would return EINTR anyways. */
4599 if (signal_pending(current))
4600 continue;
4601
dbd9eea0 4602 rv = drbd_recv_short(mdev->tconn->meta.socket, buf, expect-received, 0);
808e37b8 4603 clear_bit(SIGNAL_ASENDER, &mdev->tconn->flags);
b411b363
PR
4604
4605 flush_signals(current);
4606
4607 /* Note:
4608 * -EINTR (on meta) we got a signal
4609 * -EAGAIN (on meta) rcvtimeo expired
4610 * -ECONNRESET other side closed the connection
4611 * -ERESTARTSYS (on data) we got a signal
4612 * rv < 0 other than above: unexpected error!
4613 * rv == expected: full header or command
4614 * rv < expected: "woken" by signal during receive
4615 * rv == 0 : "connection shut down by peer"
4616 */
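		/* Annotation (not in the original source): partial reads accumulate
		 * into buf until 'received' reaches 'expect'; once the fixed p_header
		 * has been assembled and decoded, 'expect' grows to the full command
		 * size and the loop keeps reading until the whole packet is in the
		 * receive buffer and can be handed to cmd->process(). */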
4617 if (likely(rv > 0)) {
4618 received += rv;
4619 buf += rv;
4620 } else if (rv == 0) {
4621 dev_err(DEV, "meta connection shut down by peer.\n");
4622 goto reconnect;
4623 } else if (rv == -EAGAIN) {
cb6518cb
LE
4624 /* If the data socket received something meanwhile,
4625 * that is good enough: peer is still alive. */
31890f4a 4626 if (time_after(mdev->tconn->last_received,
e42325a5 4627 jiffies - mdev->tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 4628 continue;
f36af18c 4629 if (ping_timeout_active) {
b411b363
PR
4630 dev_err(DEV, "PingAck did not arrive in time.\n");
4631 goto reconnect;
4632 }
e43ef195 4633 set_bit(SEND_PING, &mdev->tconn->flags);
b411b363
PR
4634 continue;
4635 } else if (rv == -EINTR) {
4636 continue;
4637 } else {
4638 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4639 goto reconnect;
4640 }
4641
4642 if (received == expect && cmd == NULL) {
257d0af6 4643 if (!decode_header(mdev, h, &cmd_nr, &pkt_size))
b411b363 4644 goto reconnect;
257d0af6 4645 cmd = get_asender_cmd(cmd_nr);
b411b363 4646 if (unlikely(cmd == NULL)) {
257d0af6
PR
4647 dev_err(DEV, "unknown command %d on meta (l: %d)\n",
4648 cmd_nr, pkt_size);
b411b363
PR
4649 goto disconnect;
4650 }
4651 expect = cmd->pkt_size;
257d0af6
PR
4652 if (pkt_size != expect - sizeof(struct p_header)) {
4653 dev_err(DEV, "Wrong packet size on meta (c: %d, l: %d)\n",
4654 cmd_nr, pkt_size);
b411b363 4655 goto reconnect;
257d0af6 4656 }
b411b363
PR
4657 }
4658 if (received == expect) {
31890f4a 4659 mdev->tconn->last_received = jiffies;
b411b363 4660 D_ASSERT(cmd != NULL);
257d0af6 4661 if (!cmd->process(mdev, cmd_nr))
b411b363
PR
4662 goto reconnect;
4663
f36af18c
LE
4664 /* the idle_timeout (ping-int)
4665 * has been restored in got_PingAck() */
4666 if (cmd == get_asender_cmd(P_PING_ACK))
4667 ping_timeout_active = 0;
4668
b411b363
PR
4669 buf = h;
4670 received = 0;
257d0af6 4671 expect = sizeof(struct p_header);
b411b363
PR
4672 cmd = NULL;
4673 }
4674 }
4675
4676 if (0) {
4677reconnect:
4678 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
856c50c7 4679 drbd_md_sync(mdev);
b411b363
PR
4680 }
4681 if (0) {
4682disconnect:
4683 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
856c50c7 4684 drbd_md_sync(mdev);
b411b363 4685 }
808e37b8 4686 clear_bit(SIGNAL_ASENDER, &mdev->tconn->flags);
b411b363
PR
4687
4688 D_ASSERT(mdev->state.conn < C_CONNECTED);
4689 dev_info(DEV, "asender terminated\n");
4690
4691 return 0;
4692}