drbd: protect updates to integrity_tfm by tconn->data->mutex
drivers/block/drbd/drbd_receiver.c
/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct packet_info {
	enum drbd_packet cmd;
	unsigned int size;
	unsigned int vnr;
	void *data;
};

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

static int drbd_do_features(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}

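/* Grab @number pages from the preallocated drbd_pp_pool if it has enough,
 * otherwise allocate them one by one with alloc_page(GFP_TRY).  If not all
 * pages can be obtained, any partially built chain is given back to the
 * pool and NULL is returned; drbd_alloc_pages() will retry "soon". */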
static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first unfinished one, we
	   can stop examining the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(le, to_be_freed);
	}
}

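/* Collect net_ee entries whose pages are no longer in flight (under the
 * req_lock) and free them after dropping the lock. */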
static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_finished_net_peer_reqs(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(mdev, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retries until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
			      bool retry)
{
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	int mxb;

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	rcu_read_lock();
	nc = rcu_dereference(mdev->tconn->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&mdev->pp_in_use) < mxb)
		page = __drbd_alloc_pages(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(mdev, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
			break;
		}

		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &mdev->pp_in_use);
	return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
	int i;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

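/* Allocate a peer request and a page chain big enough for @data_size bytes.
 * Returns NULL on fault injection, on mempool exhaustion, or when the pages
 * cannot be allocated (drbd_alloc_pages() only retries if __GFP_WAIT is set
 * in @gfp_mask). */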
f6ffca9f 329struct drbd_peer_request *
0db55363
AG
330drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
b411b363 332{
db830c46 333 struct drbd_peer_request *peer_req;
b411b363 334 struct page *page;
45bb912b 335 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
b411b363 336
0cf9d27e 337 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
b411b363
PR
338 return NULL;
339
db830c46
AG
340 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341 if (!peer_req) {
b411b363 342 if (!(gfp_mask & __GFP_NOWARN))
0db55363 343 dev_err(DEV, "%s: allocation failed\n", __func__);
b411b363
PR
344 return NULL;
345 }
346
c37c8ecf 347 page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
45bb912b
LE
348 if (!page)
349 goto fail;
b411b363 350
db830c46
AG
351 drbd_clear_interval(&peer_req->i);
352 peer_req->i.size = data_size;
353 peer_req->i.sector = sector;
354 peer_req->i.local = false;
355 peer_req->i.waiting = false;
356
357 peer_req->epoch = NULL;
a21e9298 358 peer_req->w.mdev = mdev;
db830c46
AG
359 peer_req->pages = page;
360 atomic_set(&peer_req->pending_bios, 0);
361 peer_req->flags = 0;
9a8e7753
AG
362 /*
363 * The block_id is opaque to the receiver. It is not endianness
364 * converted, and sent back to the sender unchanged.
365 */
db830c46 366 peer_req->block_id = id;
b411b363 367
db830c46 368 return peer_req;
b411b363 369
45bb912b 370 fail:
db830c46 371 mempool_free(peer_req, drbd_ee_mempool);
b411b363
PR
372 return NULL;
373}
374
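/* Counterpart to drbd_alloc_peer_req(): free the digest (if one was
 * attached), give the page chain back via drbd_free_pages(), and return
 * the request itself to the mempool. */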
3967deb1 375void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
f6ffca9f 376 int is_net)
b411b363 377{
db830c46
AG
378 if (peer_req->flags & EE_HAS_DIGEST)
379 kfree(peer_req->digest);
5cc287e0 380 drbd_free_pages(mdev, peer_req->pages, is_net);
db830c46
AG
381 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382 D_ASSERT(drbd_interval_empty(&peer_req->i));
383 mempool_free(peer_req, drbd_ee_mempool);
b411b363
PR
384}
385
7721f567 386int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
b411b363
PR
387{
388 LIST_HEAD(work_list);
db830c46 389 struct drbd_peer_request *peer_req, *t;
b411b363 390 int count = 0;
435f0740 391 int is_net = list == &mdev->net_ee;
b411b363 392
87eeee41 393 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 394 list_splice_init(list, &work_list);
87eeee41 395 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 396
db830c46 397 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
3967deb1 398 __drbd_free_peer_req(mdev, peer_req, is_net);
b411b363
PR
399 count++;
400 }
401 return count;
402}
403
a990be46
AG
404/*
405 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
b411b363 406 */
a990be46 407static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
b411b363
PR
408{
409 LIST_HEAD(work_list);
410 LIST_HEAD(reclaimed);
db830c46 411 struct drbd_peer_request *peer_req, *t;
e2b3032b 412 int err = 0;
b411b363 413
87eeee41 414 spin_lock_irq(&mdev->tconn->req_lock);
a990be46 415 reclaim_finished_net_peer_reqs(mdev, &reclaimed);
b411b363 416 list_splice_init(&mdev->done_ee, &work_list);
87eeee41 417 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 418
db830c46 419 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
3967deb1 420 drbd_free_net_peer_req(mdev, peer_req);
b411b363
PR
421
422 /* possible callbacks here:
7be8da07 423 * e_end_block, and e_end_resync_block, e_send_discard_write.
b411b363
PR
424 * all ignore the last argument.
425 */
db830c46 426 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
e2b3032b
AG
427 int err2;
428
b411b363 429 /* list_del not necessary, next/prev members not touched */
e2b3032b
AG
430 err2 = peer_req->w.cb(&peer_req->w, !!err);
431 if (!err)
432 err = err2;
3967deb1 433 drbd_free_peer_req(mdev, peer_req);
b411b363
PR
434 }
435 wake_up(&mdev->ee_wait);
436
e2b3032b 437 return err;
b411b363
PR
438}
439
d4da1537
AG
440static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441 struct list_head *head)
b411b363
PR
442{
443 DEFINE_WAIT(wait);
444
445 /* avoids spin_lock/unlock
446 * and calling prepare_to_wait in the fast path */
447 while (!list_empty(head)) {
448 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
87eeee41 449 spin_unlock_irq(&mdev->tconn->req_lock);
7eaceacc 450 io_schedule();
b411b363 451 finish_wait(&mdev->ee_wait, &wait);
87eeee41 452 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
453 }
454}
455
d4da1537
AG
456static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457 struct list_head *head)
b411b363 458{
87eeee41 459 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 460 _drbd_wait_ee_list_empty(mdev, head);
87eeee41 461 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
462}
463
464/* See also kernel_accept(), which is only present since 2.6.18.
465 * We also want to log exactly which part of it failed. */
7653620d 466static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
b411b363
PR
467{
468 struct sock *sk = sock->sk;
469 int err = 0;
470
471 *what = "listen";
472 err = sock->ops->listen(sock, 5);
473 if (err < 0)
474 goto out;
475
476 *what = "sock_create_lite";
477 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478 newsock);
479 if (err < 0)
480 goto out;
481
482 *what = "accept";
483 err = sock->ops->accept(sock, *newsock, 0);
484 if (err < 0) {
485 sock_release(*newsock);
486 *newsock = NULL;
487 goto out;
488 }
489 (*newsock)->ops = sock->ops;
490
491out:
492 return err;
493}
494
dbd9eea0 495static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
b411b363
PR
496{
497 mm_segment_t oldfs;
498 struct kvec iov = {
499 .iov_base = buf,
500 .iov_len = size,
501 };
502 struct msghdr msg = {
503 .msg_iovlen = 1,
504 .msg_iov = (struct iovec *)&iov,
505 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506 };
507 int rv;
508
509 oldfs = get_fs();
510 set_fs(KERNEL_DS);
511 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512 set_fs(oldfs);
513
514 return rv;
515}
516
de0ff338 517static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
b411b363
PR
518{
519 mm_segment_t oldfs;
520 struct kvec iov = {
521 .iov_base = buf,
522 .iov_len = size,
523 };
524 struct msghdr msg = {
525 .msg_iovlen = 1,
526 .msg_iov = (struct iovec *)&iov,
527 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528 };
529 int rv;
530
531 oldfs = get_fs();
532 set_fs(KERNEL_DS);
533
534 for (;;) {
de0ff338 535 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
b411b363
PR
536 if (rv == size)
537 break;
538
539 /* Note:
540 * ECONNRESET other side closed the connection
541 * ERESTARTSYS (on sock) we got a signal
542 */
543
544 if (rv < 0) {
545 if (rv == -ECONNRESET)
de0ff338 546 conn_info(tconn, "sock was reset by peer\n");
b411b363 547 else if (rv != -ERESTARTSYS)
de0ff338 548 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
549 break;
550 } else if (rv == 0) {
de0ff338 551 conn_info(tconn, "sock was shut down by peer\n");
b411b363
PR
552 break;
553 } else {
554 /* signal came in, or peer/link went down,
555 * after we read a partial message
556 */
557 /* D_ASSERT(signal_pending(current)); */
558 break;
559 }
560 };
561
562 set_fs(oldfs);
563
564 if (rv != size)
bbeb641c 565 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
b411b363
PR
566
567 return rv;
568}
569
c6967746
AG
570static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571{
572 int err;
573
574 err = drbd_recv(tconn, buf, size);
575 if (err != size) {
576 if (err >= 0)
577 err = -EIO;
578 } else
579 err = 0;
580 return err;
581}
582
a5c31904
AG
583static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584{
585 int err;
586
587 err = drbd_recv_all(tconn, buf, size);
588 if (err && !signal_pending(current))
589 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590 return err;
591}
592
5dbf1673
LE
593/* quoting tcp(7):
594 * On individual connections, the socket buffer size must be set prior to the
595 * listen(2) or connect(2) calls in order to have it take effect.
596 * This is our wrapper to do so.
597 */
598static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599 unsigned int rcv)
600{
601 /* open coded SO_SNDBUF, SO_RCVBUF */
602 if (snd) {
603 sock->sk->sk_sndbuf = snd;
604 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605 }
606 if (rcv) {
607 sock->sk->sk_rcvbuf = rcv;
608 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609 }
610}
611
eac3e990 612static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
b411b363
PR
613{
614 const char *what;
615 struct socket *sock;
616 struct sockaddr_in6 src_in6;
44ed167d
PR
617 struct sockaddr_in6 peer_in6;
618 struct net_conf *nc;
619 int err, peer_addr_len, my_addr_len;
69ef82de 620 int sndbuf_size, rcvbuf_size, connect_int;
b411b363
PR
621 int disconnect_on_error = 1;
622
44ed167d
PR
623 rcu_read_lock();
624 nc = rcu_dereference(tconn->net_conf);
625 if (!nc) {
626 rcu_read_unlock();
b411b363 627 return NULL;
44ed167d
PR
628 }
629
630 sndbuf_size = nc->sndbuf_size;
631 rcvbuf_size = nc->rcvbuf_size;
69ef82de 632 connect_int = nc->connect_int;
44ed167d
PR
633
634 my_addr_len = min_t(int, nc->my_addr_len, sizeof(src_in6));
635 memcpy(&src_in6, nc->my_addr, my_addr_len);
636
637 if (((struct sockaddr *)nc->my_addr)->sa_family == AF_INET6)
638 src_in6.sin6_port = 0;
639 else
640 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642 peer_addr_len = min_t(int, nc->peer_addr_len, sizeof(src_in6));
643 memcpy(&peer_in6, nc->peer_addr, peer_addr_len);
644
645 rcu_read_unlock();
b411b363
PR
646
647 what = "sock_create_kern";
44ed167d
PR
648 err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
649 SOCK_STREAM, IPPROTO_TCP, &sock);
b411b363
PR
650 if (err < 0) {
651 sock = NULL;
652 goto out;
653 }
654
655 sock->sk->sk_rcvtimeo =
69ef82de 656 sock->sk->sk_sndtimeo = connect_int * HZ;
44ed167d 657 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
b411b363
PR
658
659 /* explicitly bind to the configured IP as source IP
660 * for the outgoing connections.
661 * This is needed for multihomed hosts and to be
662 * able to use lo: interfaces for drbd.
663 * Make sure to use 0 as port number, so linux selects
664 * a free one dynamically.
665 */
b411b363 666 what = "bind before connect";
44ed167d 667 err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
b411b363
PR
668 if (err < 0)
669 goto out;
670
671 /* connect may fail, peer not yet available.
672 * stay C_WF_CONNECTION, don't go Disconnecting! */
673 disconnect_on_error = 0;
674 what = "connect";
44ed167d 675 err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
b411b363
PR
676
677out:
678 if (err < 0) {
679 if (sock) {
680 sock_release(sock);
681 sock = NULL;
682 }
683 switch (-err) {
684 /* timeout, busy, signal pending */
685 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
686 case EINTR: case ERESTARTSYS:
687 /* peer not (yet) available, network problem */
688 case ECONNREFUSED: case ENETUNREACH:
689 case EHOSTDOWN: case EHOSTUNREACH:
690 disconnect_on_error = 0;
691 break;
692 default:
eac3e990 693 conn_err(tconn, "%s failed, err = %d\n", what, err);
b411b363
PR
694 }
695 if (disconnect_on_error)
bbeb641c 696 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 697 }
44ed167d 698
b411b363
PR
699 return sock;
700}
701
7653620d 702static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
b411b363 703{
44ed167d 704 int timeo, err, my_addr_len;
69ef82de 705 int sndbuf_size, rcvbuf_size, connect_int;
b411b363 706 struct socket *s_estab = NULL, *s_listen;
44ed167d
PR
707 struct sockaddr_in6 my_addr;
708 struct net_conf *nc;
b411b363
PR
709 const char *what;
710
44ed167d
PR
711 rcu_read_lock();
712 nc = rcu_dereference(tconn->net_conf);
713 if (!nc) {
714 rcu_read_unlock();
b411b363 715 return NULL;
44ed167d
PR
716 }
717
718 sndbuf_size = nc->sndbuf_size;
719 rcvbuf_size = nc->rcvbuf_size;
69ef82de 720 connect_int = nc->connect_int;
44ed167d
PR
721
722 my_addr_len = min_t(int, nc->my_addr_len, sizeof(struct sockaddr_in6));
723 memcpy(&my_addr, nc->my_addr, my_addr_len);
724 rcu_read_unlock();
b411b363
PR
725
726 what = "sock_create_kern";
44ed167d 727 err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
b411b363
PR
728 SOCK_STREAM, IPPROTO_TCP, &s_listen);
729 if (err) {
730 s_listen = NULL;
731 goto out;
732 }
733
69ef82de 734 timeo = connect_int * HZ;
b411b363
PR
735 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
736
737 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
738 s_listen->sk->sk_rcvtimeo = timeo;
739 s_listen->sk->sk_sndtimeo = timeo;
44ed167d 740 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
b411b363
PR
741
742 what = "bind before listen";
44ed167d 743 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
b411b363
PR
744 if (err < 0)
745 goto out;
746
7653620d 747 err = drbd_accept(&what, s_listen, &s_estab);
b411b363
PR
748
749out:
750 if (s_listen)
751 sock_release(s_listen);
752 if (err < 0) {
753 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
7653620d 754 conn_err(tconn, "%s failed, err = %d\n", what, err);
bbeb641c 755 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
756 }
757 }
b411b363
PR
758
759 return s_estab;
760}
761
e658983a 762static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
b411b363 763
9f5bdc33
AG
764static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
765 enum drbd_packet cmd)
766{
767 if (!conn_prepare_command(tconn, sock))
768 return -EIO;
e658983a 769 return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
b411b363
PR
770}
771
9f5bdc33 772static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
b411b363 773{
9f5bdc33
AG
774 unsigned int header_size = drbd_header_size(tconn);
775 struct packet_info pi;
776 int err;
b411b363 777
9f5bdc33
AG
778 err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
779 if (err != header_size) {
780 if (err >= 0)
781 err = -EIO;
782 return err;
783 }
784 err = decode_header(tconn, tconn->data.rbuf, &pi);
785 if (err)
786 return err;
787 return pi.cmd;
b411b363
PR
788}
789
790/**
791 * drbd_socket_okay() - Free the socket if its connection is not okay
b411b363
PR
792 * @sock: pointer to the pointer to the socket.
793 */
dbd9eea0 794static int drbd_socket_okay(struct socket **sock)
b411b363
PR
795{
796 int rr;
797 char tb[4];
798
799 if (!*sock)
81e84650 800 return false;
b411b363 801
dbd9eea0 802 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
b411b363
PR
803
804 if (rr > 0 || rr == -EAGAIN) {
81e84650 805 return true;
b411b363
PR
806 } else {
807 sock_release(*sock);
808 *sock = NULL;
81e84650 809 return false;
b411b363
PR
810 }
811}
2325eb66
PR
812/* Gets called if a connection is established, or if a new minor gets created
813 in a connection */
c141ebda 814int drbd_connected(struct drbd_conf *mdev)
907599e0 815{
0829f5ed 816 int err;
907599e0
PR
817
818 atomic_set(&mdev->packet_seq, 0);
819 mdev->peer_seq = 0;
820
8410da8f
PR
821 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
822 &mdev->tconn->cstate_mutex :
823 &mdev->own_state_mutex;
824
0829f5ed
AG
825 err = drbd_send_sync_param(mdev);
826 if (!err)
827 err = drbd_send_sizes(mdev, 0, 0);
828 if (!err)
829 err = drbd_send_uuids(mdev);
830 if (!err)
831 err = drbd_send_state(mdev);
907599e0
PR
832 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
833 clear_bit(RESIZE_PENDING, &mdev->flags);
8b924f1d 834 mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
0829f5ed 835 return err;
907599e0
PR
836}
837
b411b363
PR
838/*
839 * return values:
840 * 1 yes, we have a valid connection
841 * 0 oops, did not work out, please try again
842 * -1 peer talks different language,
843 * no point in trying again, please go standalone.
844 * -2 We do not have a network config...
845 */
81fa2e67 846static int conn_connect(struct drbd_tconn *tconn)
b411b363 847{
2bf89621 848 struct socket *sock, *msock;
c141ebda 849 struct drbd_conf *mdev;
44ed167d 850 struct net_conf *nc;
c141ebda 851 int vnr, timeout, try, h, ok;
b411b363 852
bbeb641c 853 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
854 return -2;
855
907599e0 856 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
0916e0e3
AG
857
858 /* Assume that the peer only understands protocol 80 until we know better. */
859 tconn->agreed_pro_version = 80;
b411b363 860
b411b363 861 do {
2bf89621
AG
862 struct socket *s;
863
b411b363
PR
864 for (try = 0;;) {
865 /* 3 tries, this should take less than a second! */
907599e0 866 s = drbd_try_connect(tconn);
b411b363
PR
867 if (s || ++try >= 3)
868 break;
869 /* give the other side time to call bind() & listen() */
20ee6390 870 schedule_timeout_interruptible(HZ / 10);
b411b363
PR
871 }
872
873 if (s) {
2bf89621
AG
874 if (!tconn->data.socket) {
875 tconn->data.socket = s;
9f5bdc33 876 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
2bf89621
AG
877 } else if (!tconn->meta.socket) {
878 tconn->meta.socket = s;
9f5bdc33 879 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
b411b363 880 } else {
81fa2e67 881 conn_err(tconn, "Logic error in conn_connect()\n");
b411b363
PR
882 goto out_release_sockets;
883 }
884 }
885
2bf89621 886 if (tconn->data.socket && tconn->meta.socket) {
907599e0 887 schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
2bf89621
AG
888 ok = drbd_socket_okay(&tconn->data.socket);
889 ok = drbd_socket_okay(&tconn->meta.socket) && ok;
b411b363
PR
890 if (ok)
891 break;
892 }
893
894retry:
907599e0 895 s = drbd_wait_for_connect(tconn);
b411b363 896 if (s) {
9f5bdc33 897 try = receive_first_packet(tconn, s);
2bf89621
AG
898 drbd_socket_okay(&tconn->data.socket);
899 drbd_socket_okay(&tconn->meta.socket);
b411b363 900 switch (try) {
e5d6f33a 901 case P_INITIAL_DATA:
2bf89621 902 if (tconn->data.socket) {
907599e0 903 conn_warn(tconn, "initial packet S crossed\n");
2bf89621 904 sock_release(tconn->data.socket);
b411b363 905 }
2bf89621 906 tconn->data.socket = s;
b411b363 907 break;
e5d6f33a 908 case P_INITIAL_META:
2bf89621 909 if (tconn->meta.socket) {
907599e0 910 conn_warn(tconn, "initial packet M crossed\n");
2bf89621 911 sock_release(tconn->meta.socket);
b411b363 912 }
2bf89621 913 tconn->meta.socket = s;
907599e0 914 set_bit(DISCARD_CONCURRENT, &tconn->flags);
b411b363
PR
915 break;
916 default:
907599e0 917 conn_warn(tconn, "Error receiving initial packet\n");
b411b363
PR
918 sock_release(s);
919 if (random32() & 1)
920 goto retry;
921 }
922 }
923
bbeb641c 924 if (tconn->cstate <= C_DISCONNECTING)
b411b363
PR
925 goto out_release_sockets;
926 if (signal_pending(current)) {
927 flush_signals(current);
928 smp_rmb();
907599e0 929 if (get_t_state(&tconn->receiver) == EXITING)
b411b363
PR
930 goto out_release_sockets;
931 }
932
2bf89621
AG
933 if (tconn->data.socket && &tconn->meta.socket) {
934 ok = drbd_socket_okay(&tconn->data.socket);
935 ok = drbd_socket_okay(&tconn->meta.socket) && ok;
b411b363
PR
936 if (ok)
937 break;
938 }
939 } while (1);
940
2bf89621
AG
941 sock = tconn->data.socket;
942 msock = tconn->meta.socket;
943
b411b363
PR
944 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
945 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
946
947 sock->sk->sk_allocation = GFP_NOIO;
948 msock->sk->sk_allocation = GFP_NOIO;
949
950 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
951 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
952
b411b363 953 /* NOT YET ...
907599e0 954 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
b411b363 955 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
6038178e 956 * first set it to the P_CONNECTION_FEATURES timeout,
b411b363 957 * which we set to 4x the configured ping_timeout. */
44ed167d
PR
958 rcu_read_lock();
959 nc = rcu_dereference(tconn->net_conf);
960
b411b363 961 sock->sk->sk_sndtimeo =
44ed167d
PR
962 sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
963
964 msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
965 timeout = nc->timeout * HZ / 10;
966 rcu_read_unlock();
b411b363 967
44ed167d 968 msock->sk->sk_sndtimeo = timeout;
b411b363
PR
969
970 /* we don't want delays.
25985edc 971 * we use TCP_CORK where appropriate, though */
b411b363
PR
972 drbd_tcp_nodelay(sock);
973 drbd_tcp_nodelay(msock);
974
907599e0 975 tconn->last_received = jiffies;
b411b363 976
6038178e 977 h = drbd_do_features(tconn);
b411b363
PR
978 if (h <= 0)
979 return h;
980
907599e0 981 if (tconn->cram_hmac_tfm) {
b411b363 982 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
907599e0 983 switch (drbd_do_auth(tconn)) {
b10d96cb 984 case -1:
907599e0 985 conn_err(tconn, "Authentication of peer failed\n");
b411b363 986 return -1;
b10d96cb 987 case 0:
907599e0 988 conn_err(tconn, "Authentication of peer failed, trying again.\n");
b10d96cb 989 return 0;
b411b363
PR
990 }
991 }
992
bbeb641c 993 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
994 return 0;
995
44ed167d 996 sock->sk->sk_sndtimeo = timeout;
b411b363
PR
997 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
998
907599e0 999 drbd_thread_start(&tconn->asender);
b411b363 1000
387eb308 1001 if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
7e2455c1 1002 return -1;
b411b363 1003
c141ebda
PR
1004 rcu_read_lock();
1005 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1006 kref_get(&mdev->kref);
1007 rcu_read_unlock();
1008 drbd_connected(mdev);
1009 kref_put(&mdev->kref, &drbd_minor_destroy);
1010 rcu_read_lock();
1011 }
1012 rcu_read_unlock();
1013
d3fcb490 1014 return h;
b411b363
PR
1015
1016out_release_sockets:
2bf89621
AG
1017 if (tconn->data.socket) {
1018 sock_release(tconn->data.socket);
1019 tconn->data.socket = NULL;
1020 }
1021 if (tconn->meta.socket) {
1022 sock_release(tconn->meta.socket);
1023 tconn->meta.socket = NULL;
1024 }
b411b363
PR
1025 return -1;
1026}
1027
e658983a 1028static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
b411b363 1029{
e658983a
AG
1030 unsigned int header_size = drbd_header_size(tconn);
1031
0c8e36d9
AG
1032 if (header_size == sizeof(struct p_header100) &&
1033 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1034 struct p_header100 *h = header;
1035 if (h->pad != 0) {
1036 conn_err(tconn, "Header padding is not zero\n");
1037 return -EINVAL;
1038 }
1039 pi->vnr = be16_to_cpu(h->volume);
1040 pi->cmd = be16_to_cpu(h->command);
1041 pi->size = be32_to_cpu(h->length);
1042 } else if (header_size == sizeof(struct p_header95) &&
1043 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
e658983a 1044 struct p_header95 *h = header;
e658983a 1045 pi->cmd = be16_to_cpu(h->command);
b55d84ba
AG
1046 pi->size = be32_to_cpu(h->length);
1047 pi->vnr = 0;
e658983a
AG
1048 } else if (header_size == sizeof(struct p_header80) &&
1049 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1050 struct p_header80 *h = header;
1051 pi->cmd = be16_to_cpu(h->command);
1052 pi->size = be16_to_cpu(h->length);
77351055 1053 pi->vnr = 0;
02918be2 1054 } else {
e658983a
AG
1055 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1056 be32_to_cpu(*(__be32 *)header),
1057 tconn->agreed_pro_version);
8172f3e9 1058 return -EINVAL;
b411b363 1059 }
e658983a 1060 pi->data = header + header_size;
8172f3e9 1061 return 0;
257d0af6
PR
1062}
1063
9ba7aa00 1064static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
257d0af6 1065{
e658983a 1066 void *buffer = tconn->data.rbuf;
69bc7bc3 1067 int err;
257d0af6 1068
e658983a 1069 err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
a5c31904 1070 if (err)
69bc7bc3 1071 return err;
257d0af6 1072
e658983a 1073 err = decode_header(tconn, buffer, pi);
9ba7aa00 1074 tconn->last_received = jiffies;
b411b363 1075
69bc7bc3 1076 return err;
b411b363
PR
1077}
1078
2451fc3b 1079static void drbd_flush(struct drbd_conf *mdev)
b411b363
PR
1080{
1081 int rv;
1082
1083 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a 1084 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
dd3932ed 1085 NULL);
b411b363
PR
1086 if (rv) {
1087 dev_err(DEV, "local disk flush failed with status %d\n", rv);
1088 /* would rather check on EOPNOTSUPP, but that is not reliable.
1089 * don't try again for ANY return value != 0
1090 * if (rv == -EOPNOTSUPP) */
1091 drbd_bump_write_ordering(mdev, WO_drain_io);
1092 }
1093 put_ldev(mdev);
1094 }
b411b363
PR
1095}
1096
1097/**
1098 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1099 * @mdev: DRBD device.
1100 * @epoch: Epoch object.
1101 * @ev: Epoch event.
1102 */
1103static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1104 struct drbd_epoch *epoch,
1105 enum epoch_event ev)
1106{
2451fc3b 1107 int epoch_size;
b411b363 1108 struct drbd_epoch *next_epoch;
b411b363
PR
1109 enum finish_epoch rv = FE_STILL_LIVE;
1110
1111 spin_lock(&mdev->epoch_lock);
1112 do {
1113 next_epoch = NULL;
b411b363
PR
1114
1115 epoch_size = atomic_read(&epoch->epoch_size);
1116
1117 switch (ev & ~EV_CLEANUP) {
1118 case EV_PUT:
1119 atomic_dec(&epoch->active);
1120 break;
1121 case EV_GOT_BARRIER_NR:
1122 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
b411b363
PR
1123 break;
1124 case EV_BECAME_LAST:
1125 /* nothing to do*/
1126 break;
1127 }
1128
b411b363
PR
1129 if (epoch_size != 0 &&
1130 atomic_read(&epoch->active) == 0 &&
2451fc3b 1131 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
b411b363
PR
1132 if (!(ev & EV_CLEANUP)) {
1133 spin_unlock(&mdev->epoch_lock);
1134 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1135 spin_lock(&mdev->epoch_lock);
1136 }
1137 dec_unacked(mdev);
1138
1139 if (mdev->current_epoch != epoch) {
1140 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1141 list_del(&epoch->list);
1142 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1143 mdev->epochs--;
b411b363
PR
1144 kfree(epoch);
1145
1146 if (rv == FE_STILL_LIVE)
1147 rv = FE_DESTROYED;
1148 } else {
1149 epoch->flags = 0;
1150 atomic_set(&epoch->epoch_size, 0);
698f9315 1151 /* atomic_set(&epoch->active, 0); is already zero */
b411b363
PR
1152 if (rv == FE_STILL_LIVE)
1153 rv = FE_RECYCLED;
2451fc3b 1154 wake_up(&mdev->ee_wait);
b411b363
PR
1155 }
1156 }
1157
1158 if (!next_epoch)
1159 break;
1160
1161 epoch = next_epoch;
1162 } while (1);
1163
1164 spin_unlock(&mdev->epoch_lock);
1165
b411b363
PR
1166 return rv;
1167}
1168
1169/**
1170 * drbd_bump_write_ordering() - Fall back to another write ordering method
1171 * @mdev: DRBD device.
1172 * @wo: Write ordering method to try.
1173 */
1174void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1175{
daeda1cc 1176 struct disk_conf *dc;
b411b363
PR
1177 enum write_ordering_e pwo;
1178 static char *write_ordering_str[] = {
1179 [WO_none] = "none",
1180 [WO_drain_io] = "drain",
1181 [WO_bdev_flush] = "flush",
b411b363
PR
1182 };
1183
1184 pwo = mdev->write_ordering;
1185 wo = min(pwo, wo);
daeda1cc
PR
1186 rcu_read_lock();
1187 dc = rcu_dereference(mdev->ldev->disk_conf);
1188
66b2f6b9 1189 if (wo == WO_bdev_flush && !dc->disk_flushes)
b411b363 1190 wo = WO_drain_io;
d0c980e2 1191 if (wo == WO_drain_io && !dc->disk_drain)
b411b363 1192 wo = WO_none;
daeda1cc 1193 rcu_read_unlock();
b411b363 1194 mdev->write_ordering = wo;
2451fc3b 1195 if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
b411b363
PR
1196 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1197}
1198
45bb912b 1199/**
fbe29dec 1200 * drbd_submit_peer_request()
45bb912b 1201 * @mdev: DRBD device.
db830c46 1202 * @peer_req: peer request
45bb912b 1203 * @rw: flag field, see bio->bi_rw
10f6d992
LE
1204 *
1205 * May spread the pages to multiple bios,
1206 * depending on bio_add_page restrictions.
1207 *
1208 * Returns 0 if all bios have been submitted,
1209 * -ENOMEM if we could not allocate enough bios,
1210 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1211 * single page to an empty bio (which should never happen and likely indicates
1212 * that the lower level IO stack is in some way broken). This has been observed
1213 * on certain Xen deployments.
45bb912b
LE
1214 */
1215/* TODO allocate from our own bio_set. */
fbe29dec
AG
1216int drbd_submit_peer_request(struct drbd_conf *mdev,
1217 struct drbd_peer_request *peer_req,
1218 const unsigned rw, const int fault_type)
45bb912b
LE
1219{
1220 struct bio *bios = NULL;
1221 struct bio *bio;
db830c46
AG
1222 struct page *page = peer_req->pages;
1223 sector_t sector = peer_req->i.sector;
1224 unsigned ds = peer_req->i.size;
45bb912b
LE
1225 unsigned n_bios = 0;
1226 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
10f6d992 1227 int err = -ENOMEM;
45bb912b
LE
1228
1229 /* In most cases, we will only need one bio. But in case the lower
1230 * level restrictions happen to be different at this offset on this
1231 * side than those of the sending peer, we may need to submit the
da4a75d2
LE
1232 * request in more than one bio.
1233 *
1234 * Plain bio_alloc is good enough here, this is no DRBD internally
1235 * generated bio, but a bio allocated on behalf of the peer.
1236 */
45bb912b
LE
1237next_bio:
1238 bio = bio_alloc(GFP_NOIO, nr_pages);
1239 if (!bio) {
1240 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1241 goto fail;
1242 }
db830c46 1243 /* > peer_req->i.sector, unless this is the first bio */
45bb912b
LE
1244 bio->bi_sector = sector;
1245 bio->bi_bdev = mdev->ldev->backing_bdev;
45bb912b 1246 bio->bi_rw = rw;
db830c46 1247 bio->bi_private = peer_req;
fcefa62e 1248 bio->bi_end_io = drbd_peer_request_endio;
45bb912b
LE
1249
1250 bio->bi_next = bios;
1251 bios = bio;
1252 ++n_bios;
1253
1254 page_chain_for_each(page) {
1255 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1256 if (!bio_add_page(bio, page, len, 0)) {
10f6d992
LE
1257 /* A single page must always be possible!
1258 * But in case it fails anyways,
1259 * we deal with it, and complain (below). */
1260 if (bio->bi_vcnt == 0) {
1261 dev_err(DEV,
1262 "bio_add_page failed for len=%u, "
1263 "bi_vcnt=0 (bi_sector=%llu)\n",
1264 len, (unsigned long long)bio->bi_sector);
1265 err = -ENOSPC;
1266 goto fail;
1267 }
45bb912b
LE
1268 goto next_bio;
1269 }
1270 ds -= len;
1271 sector += len >> 9;
1272 --nr_pages;
1273 }
1274 D_ASSERT(page == NULL);
1275 D_ASSERT(ds == 0);
1276
db830c46 1277 atomic_set(&peer_req->pending_bios, n_bios);
45bb912b
LE
1278 do {
1279 bio = bios;
1280 bios = bios->bi_next;
1281 bio->bi_next = NULL;
1282
45bb912b 1283 drbd_generic_make_request(mdev, fault_type, bio);
45bb912b 1284 } while (bios);
45bb912b
LE
1285 return 0;
1286
1287fail:
1288 while (bios) {
1289 bio = bios;
1290 bios = bios->bi_next;
1291 bio_put(bio);
1292 }
10f6d992 1293 return err;
45bb912b
LE
1294}
1295
53840641 1296static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
db830c46 1297 struct drbd_peer_request *peer_req)
53840641 1298{
db830c46 1299 struct drbd_interval *i = &peer_req->i;
53840641
AG
1300
1301 drbd_remove_interval(&mdev->write_requests, i);
1302 drbd_clear_interval(i);
1303
6c852bec 1304 /* Wake up any processes waiting for this peer request to complete. */
53840641
AG
1305 if (i->waiting)
1306 wake_up(&mdev->misc_wait);
1307}
1308
4a76b161 1309static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 1310{
4a76b161 1311 struct drbd_conf *mdev;
2451fc3b 1312 int rv;
e658983a 1313 struct p_barrier *p = pi->data;
b411b363
PR
1314 struct drbd_epoch *epoch;
1315
4a76b161
AG
1316 mdev = vnr_to_mdev(tconn, pi->vnr);
1317 if (!mdev)
1318 return -EIO;
1319
b411b363
PR
1320 inc_unacked(mdev);
1321
b411b363
PR
1322 mdev->current_epoch->barrier_nr = p->barrier;
1323 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1324
1325 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1326 * the activity log, which means it would not be resynced in case the
1327 * R_PRIMARY crashes now.
1328 * Therefore we must send the barrier_ack after the barrier request was
1329 * completed. */
1330 switch (mdev->write_ordering) {
b411b363
PR
1331 case WO_none:
1332 if (rv == FE_RECYCLED)
82bc0194 1333 return 0;
2451fc3b
PR
1334
1335 /* receiver context, in the writeout path of the other node.
1336 * avoid potential distributed deadlock */
1337 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1338 if (epoch)
1339 break;
1340 else
1341 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1342 /* Fall through */
b411b363
PR
1343
1344 case WO_bdev_flush:
1345 case WO_drain_io:
b411b363 1346 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
2451fc3b
PR
1347 drbd_flush(mdev);
1348
1349 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1350 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1351 if (epoch)
1352 break;
b411b363
PR
1353 }
1354
2451fc3b
PR
1355 epoch = mdev->current_epoch;
1356 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1357
1358 D_ASSERT(atomic_read(&epoch->active) == 0);
1359 D_ASSERT(epoch->flags == 0);
b411b363 1360
82bc0194 1361 return 0;
2451fc3b
PR
1362 default:
1363 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
82bc0194 1364 return -EIO;
b411b363
PR
1365 }
1366
1367 epoch->flags = 0;
1368 atomic_set(&epoch->epoch_size, 0);
1369 atomic_set(&epoch->active, 0);
1370
1371 spin_lock(&mdev->epoch_lock);
1372 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1373 list_add(&epoch->list, &mdev->current_epoch->list);
1374 mdev->current_epoch = epoch;
1375 mdev->epochs++;
b411b363
PR
1376 } else {
1377 /* The current_epoch got recycled while we allocated this one... */
1378 kfree(epoch);
1379 }
1380 spin_unlock(&mdev->epoch_lock);
1381
82bc0194 1382 return 0;
b411b363
PR
1383}
1384
1385/* used from receive_RSDataReply (recv_resync_read)
1386 * and from receive_Data */
f6ffca9f
AG
1387static struct drbd_peer_request *
1388read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1389 int data_size) __must_hold(local)
b411b363 1390{
6666032a 1391 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 1392 struct drbd_peer_request *peer_req;
b411b363 1393 struct page *page;
a5c31904 1394 int dgs, ds, err;
a0638456
PR
1395 void *dig_in = mdev->tconn->int_dig_in;
1396 void *dig_vv = mdev->tconn->int_dig_vv;
6b4388ac 1397 unsigned long *data;
b411b363 1398
88104ca4
AG
1399 dgs = 0;
1400 if (mdev->tconn->peer_integrity_tfm) {
1401 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
9f5bdc33
AG
1402 /*
1403 * FIXME: Receive the incoming digest into the receive buffer
1404 * here, together with its struct p_data?
1405 */
a5c31904
AG
1406 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1407 if (err)
b411b363 1408 return NULL;
88104ca4 1409 data_size -= dgs;
b411b363
PR
1410 }
1411
841ce241
AG
1412 if (!expect(data_size != 0))
1413 return NULL;
1414 if (!expect(IS_ALIGNED(data_size, 512)))
1415 return NULL;
1416 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1417 return NULL;
b411b363 1418
6666032a
LE
1419 /* even though we trust our peer,
1420 * we sometimes have to double check. */
1421 if (sector + (data_size>>9) > capacity) {
fdda6544
LE
1422 dev_err(DEV, "request from peer beyond end of local disk: "
1423 "capacity: %llus < sector: %llus + size: %u\n",
6666032a
LE
1424 (unsigned long long)capacity,
1425 (unsigned long long)sector, data_size);
1426 return NULL;
1427 }
1428
b411b363
PR
1429 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1430 * "criss-cross" setup, that might cause write-out on some other DRBD,
1431 * which in turn might block on the other node at this very place. */
0db55363 1432 peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
db830c46 1433 if (!peer_req)
b411b363 1434 return NULL;
45bb912b 1435
b411b363 1436 ds = data_size;
db830c46 1437 page = peer_req->pages;
45bb912b
LE
1438 page_chain_for_each(page) {
1439 unsigned len = min_t(int, ds, PAGE_SIZE);
6b4388ac 1440 data = kmap(page);
a5c31904 1441 err = drbd_recv_all_warn(mdev->tconn, data, len);
0cf9d27e 1442 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
6b4388ac
PR
1443 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1444 data[0] = data[0] ^ (unsigned long)-1;
1445 }
b411b363 1446 kunmap(page);
a5c31904 1447 if (err) {
3967deb1 1448 drbd_free_peer_req(mdev, peer_req);
b411b363
PR
1449 return NULL;
1450 }
a5c31904 1451 ds -= len;
b411b363
PR
1452 }
1453
1454 if (dgs) {
5b614abe 1455 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
b411b363 1456 if (memcmp(dig_in, dig_vv, dgs)) {
470be44a
LE
1457 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1458 (unsigned long long)sector, data_size);
3967deb1 1459 drbd_free_peer_req(mdev, peer_req);
b411b363
PR
1460 return NULL;
1461 }
1462 }
1463 mdev->recv_cnt += data_size>>9;
db830c46 1464 return peer_req;
b411b363
PR
1465}
1466
1467/* drbd_drain_block() just takes a data block
1468 * out of the socket input buffer, and discards it.
1469 */
1470static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1471{
1472 struct page *page;
a5c31904 1473 int err = 0;
b411b363
PR
1474 void *data;
1475
c3470cde 1476 if (!data_size)
fc5be839 1477 return 0;
c3470cde 1478
c37c8ecf 1479 page = drbd_alloc_pages(mdev, 1, 1);
b411b363
PR
1480
1481 data = kmap(page);
1482 while (data_size) {
fc5be839
AG
1483 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1484
a5c31904
AG
1485 err = drbd_recv_all_warn(mdev->tconn, data, len);
1486 if (err)
b411b363 1487 break;
a5c31904 1488 data_size -= len;
b411b363
PR
1489 }
1490 kunmap(page);
5cc287e0 1491 drbd_free_pages(mdev, page, 0);
fc5be839 1492 return err;
b411b363
PR
1493}
1494
1495static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1496 sector_t sector, int data_size)
1497{
1498 struct bio_vec *bvec;
1499 struct bio *bio;
a5c31904 1500 int dgs, err, i, expect;
a0638456
PR
1501 void *dig_in = mdev->tconn->int_dig_in;
1502 void *dig_vv = mdev->tconn->int_dig_vv;
b411b363 1503
88104ca4
AG
1504 dgs = 0;
1505 if (mdev->tconn->peer_integrity_tfm) {
1506 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
a5c31904
AG
1507 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1508 if (err)
1509 return err;
88104ca4 1510 data_size -= dgs;
b411b363
PR
1511 }
1512
b411b363
PR
1513 /* optimistically update recv_cnt. if receiving fails below,
1514 * we disconnect anyways, and counters will be reset. */
1515 mdev->recv_cnt += data_size>>9;
1516
1517 bio = req->master_bio;
1518 D_ASSERT(sector == bio->bi_sector);
1519
1520 bio_for_each_segment(bvec, bio, i) {
a5c31904 1521 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
b411b363 1522 expect = min_t(int, data_size, bvec->bv_len);
a5c31904 1523 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
b411b363 1524 kunmap(bvec->bv_page);
a5c31904
AG
1525 if (err)
1526 return err;
1527 data_size -= expect;
b411b363
PR
1528 }
1529
1530 if (dgs) {
5b614abe 1531 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
b411b363
PR
1532 if (memcmp(dig_in, dig_vv, dgs)) {
1533 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
28284cef 1534 return -EINVAL;
b411b363
PR
1535 }
1536 }
1537
1538 D_ASSERT(data_size == 0);
28284cef 1539 return 0;
b411b363
PR
1540}
1541
a990be46
AG
1542/*
1543 * e_end_resync_block() is called in asender context via
1544 * drbd_finish_peer_reqs().
1545 */
99920dc5 1546static int e_end_resync_block(struct drbd_work *w, int unused)
b411b363 1547{
8050e6d0
AG
1548 struct drbd_peer_request *peer_req =
1549 container_of(w, struct drbd_peer_request, w);
00d56944 1550 struct drbd_conf *mdev = w->mdev;
db830c46 1551 sector_t sector = peer_req->i.sector;
99920dc5 1552 int err;
b411b363 1553
db830c46 1554 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1555
db830c46
AG
1556 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1557 drbd_set_in_sync(mdev, sector, peer_req->i.size);
99920dc5 1558 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
b411b363
PR
1559 } else {
1560 /* Record failure to sync */
db830c46 1561 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
b411b363 1562
99920dc5 1563 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1564 }
1565 dec_unacked(mdev);
1566
99920dc5 1567 return err;
b411b363
PR
1568}
1569
1570static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1571{
db830c46 1572 struct drbd_peer_request *peer_req;
b411b363 1573
db830c46
AG
1574 peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1575 if (!peer_req)
45bb912b 1576 goto fail;
b411b363
PR
1577
1578 dec_rs_pending(mdev);
1579
b411b363
PR
1580 inc_unacked(mdev);
1581 /* corresponding dec_unacked() in e_end_resync_block()
1582 * respective _drbd_clear_done_ee */
1583
db830c46 1584 peer_req->w.cb = e_end_resync_block;
45bb912b 1585
87eeee41 1586 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1587 list_add(&peer_req->w.list, &mdev->sync_ee);
87eeee41 1588 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1589
0f0601f4 1590 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
fbe29dec 1591 if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
e1c1b0fc 1592 return 0;
b411b363 1593
10f6d992
LE
1594 /* don't care for the reason here */
1595 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1596 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1597 list_del(&peer_req->w.list);
87eeee41 1598 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9 1599
3967deb1 1600 drbd_free_peer_req(mdev, peer_req);
45bb912b
LE
1601fail:
1602 put_ldev(mdev);
e1c1b0fc 1603 return -EIO;
b411b363
PR
1604}
1605
668eebc6 1606static struct drbd_request *
bc9c5c41
AG
1607find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1608 sector_t sector, bool missing_ok, const char *func)
51624585 1609{
51624585
AG
1610 struct drbd_request *req;
1611
bc9c5c41
AG
1612 /* Request object according to our peer */
1613 req = (struct drbd_request *)(unsigned long)id;
5e472264 1614 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
668eebc6 1615 return req;
c3afd8f5
AG
1616 if (!missing_ok) {
1617 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1618 (unsigned long)id, (unsigned long long)sector);
1619 }
51624585
AG
1620 return NULL;
1621}
1622
4a76b161 1623static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 1624{
4a76b161 1625 struct drbd_conf *mdev;
b411b363
PR
1626 struct drbd_request *req;
1627 sector_t sector;
82bc0194 1628 int err;
e658983a 1629 struct p_data *p = pi->data;
4a76b161
AG
1630
1631 mdev = vnr_to_mdev(tconn, pi->vnr);
1632 if (!mdev)
1633 return -EIO;
b411b363
PR
1634
1635 sector = be64_to_cpu(p->sector);
1636
87eeee41 1637 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 1638 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
87eeee41 1639 spin_unlock_irq(&mdev->tconn->req_lock);
c3afd8f5 1640 if (unlikely(!req))
82bc0194 1641 return -EIO;
b411b363 1642
24c4830c 1643 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
b411b363
PR
1644 * special casing it there for the various failure cases.
1645 * still no race with drbd_fail_pending_reads */
e2857216 1646 err = recv_dless_read(mdev, req, sector, pi->size);
82bc0194 1647 if (!err)
8554df1c 1648 req_mod(req, DATA_RECEIVED);
b411b363
PR
1649 /* else: nothing. handled from drbd_disconnect...
1650 * I don't think we may complete this just yet
1651 * in case we are "on-disconnect: freeze" */
1652
82bc0194 1653 return err;
b411b363
PR
1654}
1655
4a76b161 1656static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 1657{
4a76b161 1658 struct drbd_conf *mdev;
b411b363 1659 sector_t sector;
82bc0194 1660 int err;
e658983a 1661 struct p_data *p = pi->data;
4a76b161
AG
1662
1663 mdev = vnr_to_mdev(tconn, pi->vnr);
1664 if (!mdev)
1665 return -EIO;
b411b363
PR
1666
1667 sector = be64_to_cpu(p->sector);
1668 D_ASSERT(p->block_id == ID_SYNCER);
1669
1670 if (get_ldev(mdev)) {
1671 /* data is submitted to disk within recv_resync_read.
1672 * corresponding put_ldev done below on error,
fcefa62e 1673 * or in drbd_peer_request_endio. */
e2857216 1674 err = recv_resync_read(mdev, sector, pi->size);
b411b363
PR
1675 } else {
1676 if (__ratelimit(&drbd_ratelimit_state))
1677 dev_err(DEV, "Can not write resync data to local disk.\n");
1678
e2857216 1679 err = drbd_drain_block(mdev, pi->size);
b411b363 1680
e2857216 1681 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
b411b363
PR
1682 }
1683
e2857216 1684 atomic_add(pi->size >> 9, &mdev->rs_sect_in);
778f271d 1685
82bc0194 1686 return err;
b411b363
PR
1687}
1688
99920dc5 1689static int w_restart_write(struct drbd_work *w, int cancel)
7be8da07
AG
1690{
1691 struct drbd_request *req = container_of(w, struct drbd_request, w);
1692 struct drbd_conf *mdev = w->mdev;
1693 struct bio *bio;
1694 unsigned long start_time;
1695 unsigned long flags;
1696
1697 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1698 if (!expect(req->rq_state & RQ_POSTPONED)) {
1699 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
99920dc5 1700 return -EIO;
7be8da07
AG
1701 }
1702 bio = req->master_bio;
1703 start_time = req->start_time;
1704 /* Postponed requests will not have their master_bio completed! */
1705 __req_mod(req, DISCARD_WRITE, NULL);
1706 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1707
1708 while (__drbd_make_request(mdev, bio, start_time))
1709 /* retry */ ;
99920dc5 1710 return 0;
7be8da07
AG
1711}
1712
1713static void restart_conflicting_writes(struct drbd_conf *mdev,
1714 sector_t sector, int size)
1715{
1716 struct drbd_interval *i;
1717 struct drbd_request *req;
1718
1719 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1720 if (!i->local)
1721 continue;
1722 req = container_of(i, struct drbd_request, i);
1723 if (req->rq_state & RQ_LOCAL_PENDING ||
1724 !(req->rq_state & RQ_POSTPONED))
1725 continue;
1726 if (expect(list_empty(&req->w.list))) {
1727 req->w.mdev = mdev;
1728 req->w.cb = w_restart_write;
1729 drbd_queue_work(&mdev->tconn->data.work, &req->w);
1730 }
1731 }
1732}
1733
a990be46
AG
1734/*
1735 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
b411b363 1736 */
99920dc5 1737static int e_end_block(struct drbd_work *w, int cancel)
b411b363 1738{
8050e6d0
AG
1739 struct drbd_peer_request *peer_req =
1740 container_of(w, struct drbd_peer_request, w);
00d56944 1741 struct drbd_conf *mdev = w->mdev;
db830c46 1742 sector_t sector = peer_req->i.sector;
99920dc5 1743 int err = 0, pcmd;
b411b363 1744
303d1448 1745 if (peer_req->flags & EE_SEND_WRITE_ACK) {
db830c46 1746 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1747 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1748 mdev->state.conn <= C_PAUSED_SYNC_T &&
db830c46 1749 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 1750 P_RS_WRITE_ACK : P_WRITE_ACK;
99920dc5 1751 err = drbd_send_ack(mdev, pcmd, peer_req);
b411b363 1752 if (pcmd == P_RS_WRITE_ACK)
db830c46 1753 drbd_set_in_sync(mdev, sector, peer_req->i.size);
b411b363 1754 } else {
99920dc5 1755 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1756 /* we expect it to be marked out of sync anyways...
1757 * maybe assert this? */
1758 }
1759 dec_unacked(mdev);
1760 }
1761 /* we delete from the conflict detection hash _after_ we sent out the
1762 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
302bdeae 1763 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
87eeee41 1764 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
1765 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1766 drbd_remove_epoch_entry_interval(mdev, peer_req);
7be8da07
AG
1767 if (peer_req->flags & EE_RESTART_REQUESTS)
1768 restart_conflicting_writes(mdev, sector, peer_req->i.size);
87eeee41 1769 spin_unlock_irq(&mdev->tconn->req_lock);
bb3bfe96 1770 } else
db830c46 1771 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1772
db830c46 1773 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
b411b363 1774
99920dc5 1775 return err;
b411b363
PR
1776}
1777
7be8da07 1778static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
b411b363 1779{
7be8da07 1780 struct drbd_conf *mdev = w->mdev;
8050e6d0
AG
1781 struct drbd_peer_request *peer_req =
1782 container_of(w, struct drbd_peer_request, w);
99920dc5 1783 int err;
b411b363 1784
99920dc5 1785 err = drbd_send_ack(mdev, ack, peer_req);
b411b363
PR
1786 dec_unacked(mdev);
1787
99920dc5 1788 return err;
b411b363
PR
1789}
1790
99920dc5 1791static int e_send_discard_write(struct drbd_work *w, int unused)
7be8da07
AG
1792{
1793 return e_send_ack(w, P_DISCARD_WRITE);
1794}
1795
99920dc5 1796static int e_send_retry_write(struct drbd_work *w, int unused)
7be8da07
AG
1797{
1798 struct drbd_tconn *tconn = w->mdev->tconn;
1799
1800 return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1801 P_RETRY_WRITE : P_DISCARD_WRITE);
1802}
1803
3e394da1
AG
1804static bool seq_greater(u32 a, u32 b)
1805{
1806 /*
1807 * We assume 32-bit wrap-around here.
1808 * For 24-bit wrap-around, we would have to shift:
1809 * a <<= 8; b <<= 8;
1810 */
1811 return (s32)a - (s32)b > 0;
1812}
1813
1814static u32 seq_max(u32 a, u32 b)
1815{
1816 return seq_greater(a, b) ? a : b;
1817}
1818
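
The signed-difference comparison above is easy to sanity-check outside the kernel. A minimal user-space sketch (an editorial illustration, not part of drbd_receiver.c) showing that it still orders sequence numbers correctly across a 32-bit wrap:

#include <assert.h>
#include <stdint.h>

/* same idea as seq_greater(): compare via signed subtraction */
static int demo_seq_greater(uint32_t a, uint32_t b)
{
	return (int32_t)a - (int32_t)b > 0;
}

int main(void)
{
	assert(demo_seq_greater(2, 1));            /* ordinary case */
	assert(!demo_seq_greater(1, 2));
	assert(demo_seq_greater(5, 0xfffffffbu));  /* 5 is "newer" than UINT32_MAX - 4 */
	assert(!demo_seq_greater(0xfffffffbu, 5));
	return 0;
}
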
7be8da07
AG
1819static bool need_peer_seq(struct drbd_conf *mdev)
1820{
1821 struct drbd_tconn *tconn = mdev->tconn;
302bdeae 1822 int tp;
7be8da07
AG
1823
1824 /*
1825 * We only need to keep track of the last packet_seq number of our peer
1826 * if we are in dual-primary mode and we have the discard flag set; see
1827 * handle_write_conflicts().
1828 */
302bdeae
PR
1829
1830 rcu_read_lock();
1831 tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1832 rcu_read_unlock();
1833
1834 return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
7be8da07
AG
1835}
1836
43ae077d 1837static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
3e394da1 1838{
3c13b680 1839 unsigned int newest_peer_seq;
3e394da1 1840
7be8da07
AG
1841 if (need_peer_seq(mdev)) {
1842 spin_lock(&mdev->peer_seq_lock);
3c13b680
LE
1843 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1844 mdev->peer_seq = newest_peer_seq;
7be8da07 1845 spin_unlock(&mdev->peer_seq_lock);
3c13b680
LE
1846 /* wake up only if we actually changed mdev->peer_seq */
1847 if (peer_seq == newest_peer_seq)
7be8da07
AG
1848 wake_up(&mdev->seq_wait);
1849 }
3e394da1
AG
1850}
1851
b411b363
PR
1852/* Called from receive_Data.
1853 * Synchronize packets on sock with packets on msock.
1854 *
1855 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1856 * packet traveling on msock, they are still processed in the order they have
1857 * been sent.
1858 *
1859 * Note: we don't care for Ack packets overtaking P_DATA packets.
1860 *
1861 * In case packet_seq is larger than mdev->peer_seq number, there are
1862 * outstanding packets on the msock. We wait for them to arrive.
1863 * In case we are the logically next packet, we update mdev->peer_seq
1864 * ourselves. Correctly handles 32bit wrap around.
1865 *
 1866 * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
 1867 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 1868 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 1869 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1870 *
 1871 * returns 0 if we may process the packet,
 1872 * -ERESTARTSYS if interrupted (by disconnect signal), -ETIMEDOUT on timeout. */
7be8da07 1873static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
b411b363
PR
1874{
1875 DEFINE_WAIT(wait);
b411b363 1876 long timeout;
7be8da07
AG
1877 int ret;
1878
1879 if (!need_peer_seq(mdev))
1880 return 0;
1881
b411b363
PR
1882 spin_lock(&mdev->peer_seq_lock);
1883 for (;;) {
7be8da07
AG
1884 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1885 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1886 ret = 0;
b411b363 1887 break;
7be8da07 1888 }
b411b363
PR
1889 if (signal_pending(current)) {
1890 ret = -ERESTARTSYS;
1891 break;
1892 }
7be8da07 1893 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
b411b363 1894 spin_unlock(&mdev->peer_seq_lock);
44ed167d
PR
1895 rcu_read_lock();
1896 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1897 rcu_read_unlock();
71b1c1eb 1898 timeout = schedule_timeout(timeout);
b411b363 1899 spin_lock(&mdev->peer_seq_lock);
7be8da07 1900 if (!timeout) {
b411b363 1901 ret = -ETIMEDOUT;
71b1c1eb 1902 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
b411b363
PR
1903 break;
1904 }
1905 }
b411b363 1906 spin_unlock(&mdev->peer_seq_lock);
7be8da07 1907 finish_wait(&mdev->seq_wait, &wait);
b411b363
PR
1908 return ret;
1909}
1910
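
The throughput estimate in the comment above can be reproduced with a tiny stand-alone calculation (an editorial sketch, assuming the sequence number advances once per 512-byte sector):

#include <stdio.h>
#include <stdint.h>

int main(void)
{
	const uint64_t bytes_per_sec   = 1ULL << 30;          /* roughly 10 GBit/s */
	const uint64_t sectors_per_sec = bytes_per_sec >> 9;  /* 512-byte sectors: 1<<21 */

	printf("24-bit wrap after %llu s\n",
	       (unsigned long long)((1ULL << 24) / sectors_per_sec));  /* 8 s */
	printf("32-bit wrap after %llu s\n",
	       (unsigned long long)((1ULL << 32) / sectors_per_sec));  /* 2048 s */
	return 0;
}
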
688593c5
LE
1911/* see also bio_flags_to_wire()
1912 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1913 * flags and back. We may replicate to other kernel versions. */
1914static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1915{
688593c5
LE
1916 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1917 (dpf & DP_FUA ? REQ_FUA : 0) |
1918 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1919 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1920}
1921
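
The "map the flags ... and back" mentioned in the comment above can be pictured with a stand-alone round-trip sketch. The flag values below are made-up stand-ins chosen for illustration only; they are not the real DP_*/REQ_* kernel constants:

#include <assert.h>
#include <stdint.h>

/* hypothetical stand-ins for the wire (DP_*) and bio (REQ_*) flag bits */
enum { DP_RW_SYNC = 1, DP_FUA = 2, DP_FLUSH = 4, DP_DISCARD = 8 };
enum { REQ_SYNC = 16, REQ_FUA = 32, REQ_FLUSH = 64, REQ_DISCARD = 128 };

static unsigned long wire_to_bio(uint32_t dpf)
{
	return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
	       (dpf & DP_FUA ? REQ_FUA : 0) |
	       (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
	       (dpf & DP_DISCARD ? REQ_DISCARD : 0);
}

static uint32_t bio_to_wire(unsigned long rw)
{
	return (rw & REQ_SYNC ? DP_RW_SYNC : 0) |
	       (rw & REQ_FUA ? DP_FUA : 0) |
	       (rw & REQ_FLUSH ? DP_FLUSH : 0) |
	       (rw & REQ_DISCARD ? DP_DISCARD : 0);
}

int main(void)
{
	uint32_t dpf = DP_FUA | DP_FLUSH;

	/* mapping to bio flags and back must preserve the semantic bits */
	assert(bio_to_wire(wire_to_bio(dpf)) == dpf);
	return 0;
}
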
7be8da07
AG
1922static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1923 unsigned int size)
1924{
1925 struct drbd_interval *i;
1926
1927 repeat:
1928 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1929 struct drbd_request *req;
1930 struct bio_and_error m;
1931
1932 if (!i->local)
1933 continue;
1934 req = container_of(i, struct drbd_request, i);
1935 if (!(req->rq_state & RQ_POSTPONED))
1936 continue;
1937 req->rq_state &= ~RQ_POSTPONED;
1938 __req_mod(req, NEG_ACKED, &m);
1939 spin_unlock_irq(&mdev->tconn->req_lock);
1940 if (m.bio)
1941 complete_master_bio(mdev, &m);
1942 spin_lock_irq(&mdev->tconn->req_lock);
1943 goto repeat;
1944 }
1945}
1946
1947static int handle_write_conflicts(struct drbd_conf *mdev,
1948 struct drbd_peer_request *peer_req)
1949{
1950 struct drbd_tconn *tconn = mdev->tconn;
1951 bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1952 sector_t sector = peer_req->i.sector;
1953 const unsigned int size = peer_req->i.size;
1954 struct drbd_interval *i;
1955 bool equal;
1956 int err;
1957
1958 /*
1959 * Inserting the peer request into the write_requests tree will prevent
1960 * new conflicting local requests from being added.
1961 */
1962 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1963
1964 repeat:
1965 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1966 if (i == &peer_req->i)
1967 continue;
1968
1969 if (!i->local) {
1970 /*
1971 * Our peer has sent a conflicting remote request; this
1972 * should not happen in a two-node setup. Wait for the
1973 * earlier peer request to complete.
1974 */
1975 err = drbd_wait_misc(mdev, i);
1976 if (err)
1977 goto out;
1978 goto repeat;
1979 }
1980
1981 equal = i->sector == sector && i->size == size;
1982 if (resolve_conflicts) {
1983 /*
1984 * If the peer request is fully contained within the
1985 * overlapping request, it can be discarded; otherwise,
1986 * it will be retried once all overlapping requests
1987 * have completed.
1988 */
1989 bool discard = i->sector <= sector && i->sector +
1990 (i->size >> 9) >= sector + (size >> 9);
1991
1992 if (!equal)
1993 dev_alert(DEV, "Concurrent writes detected: "
1994 "local=%llus +%u, remote=%llus +%u, "
1995 "assuming %s came first\n",
1996 (unsigned long long)i->sector, i->size,
1997 (unsigned long long)sector, size,
1998 discard ? "local" : "remote");
1999
2000 inc_unacked(mdev);
2001 peer_req->w.cb = discard ? e_send_discard_write :
2002 e_send_retry_write;
2003 list_add_tail(&peer_req->w.list, &mdev->done_ee);
2004 wake_asender(mdev->tconn);
2005
2006 err = -ENOENT;
2007 goto out;
2008 } else {
2009 struct drbd_request *req =
2010 container_of(i, struct drbd_request, i);
2011
2012 if (!equal)
2013 dev_alert(DEV, "Concurrent writes detected: "
2014 "local=%llus +%u, remote=%llus +%u\n",
2015 (unsigned long long)i->sector, i->size,
2016 (unsigned long long)sector, size);
2017
2018 if (req->rq_state & RQ_LOCAL_PENDING ||
2019 !(req->rq_state & RQ_POSTPONED)) {
2020 /*
2021 * Wait for the node with the discard flag to
2022 * decide if this request will be discarded or
2023 * retried. Requests that are discarded will
2024 * disappear from the write_requests tree.
2025 *
2026 * In addition, wait for the conflicting
2027 * request to finish locally before submitting
2028 * the conflicting peer request.
2029 */
2030 err = drbd_wait_misc(mdev, &req->i);
2031 if (err) {
2032 _conn_request_state(mdev->tconn,
2033 NS(conn, C_TIMEOUT),
2034 CS_HARD);
2035 fail_postponed_requests(mdev, sector, size);
2036 goto out;
2037 }
2038 goto repeat;
2039 }
2040 /*
2041 * Remember to restart the conflicting requests after
2042 * the new peer request has completed.
2043 */
2044 peer_req->flags |= EE_RESTART_REQUESTS;
2045 }
2046 }
2047 err = 0;
2048
2049 out:
2050 if (err)
2051 drbd_remove_epoch_entry_interval(mdev, peer_req);
2052 return err;
2053}
2054
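
The "discard" decision computed in handle_write_conflicts() above boils down to an interval-containment test in sectors. A small stand-alone sketch of that test (an editorial illustration; sizes are in bytes, one sector is 512 bytes):

#include <stdbool.h>
#include <stdio.h>

typedef unsigned long long sector_t;

/* same containment test as in handle_write_conflicts(): the peer request
 * [sector, sector + size) is discarded only if the overlapping local
 * request fully covers it */
static bool fully_contained(sector_t local_sector, unsigned local_size,
			    sector_t peer_sector, unsigned peer_size)
{
	return local_sector <= peer_sector &&
	       local_sector + (local_size >> 9) >= peer_sector + (peer_size >> 9);
}

int main(void)
{
	printf("%d\n", fully_contained(100, 8192, 104, 4096)); /* 1: [100,116) covers [104,112) */
	printf("%d\n", fully_contained(100, 4096, 104, 4096)); /* 0: [100,108) does not cover [104,112) */
	return 0;
}
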
b411b363 2055/* mirrored write */
4a76b161 2056static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 2057{
4a76b161 2058 struct drbd_conf *mdev;
b411b363 2059 sector_t sector;
db830c46 2060 struct drbd_peer_request *peer_req;
e658983a 2061 struct p_data *p = pi->data;
7be8da07 2062 u32 peer_seq = be32_to_cpu(p->seq_num);
b411b363
PR
2063 int rw = WRITE;
2064 u32 dp_flags;
302bdeae 2065 int err, tp;
b411b363 2066
4a76b161
AG
2067 mdev = vnr_to_mdev(tconn, pi->vnr);
2068 if (!mdev)
2069 return -EIO;
2070
7be8da07 2071 if (!get_ldev(mdev)) {
82bc0194
AG
2072 int err2;
2073
7be8da07 2074 err = wait_for_and_update_peer_seq(mdev, peer_seq);
e2857216 2075 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
b411b363 2076 atomic_inc(&mdev->current_epoch->epoch_size);
e2857216 2077 err2 = drbd_drain_block(mdev, pi->size);
82bc0194
AG
2078 if (!err)
2079 err = err2;
2080 return err;
b411b363
PR
2081 }
2082
fcefa62e
AG
2083 /*
2084 * Corresponding put_ldev done either below (on various errors), or in
2085 * drbd_peer_request_endio, if we successfully submit the data at the
2086 * end of this function.
2087 */
b411b363
PR
2088
2089 sector = be64_to_cpu(p->sector);
e2857216 2090 peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
db830c46 2091 if (!peer_req) {
b411b363 2092 put_ldev(mdev);
82bc0194 2093 return -EIO;
b411b363
PR
2094 }
2095
db830c46 2096 peer_req->w.cb = e_end_block;
b411b363 2097
688593c5
LE
2098 dp_flags = be32_to_cpu(p->dp_flags);
2099 rw |= wire_flags_to_bio(mdev, dp_flags);
2100
2101 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 2102 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 2103
b411b363 2104 spin_lock(&mdev->epoch_lock);
db830c46
AG
2105 peer_req->epoch = mdev->current_epoch;
2106 atomic_inc(&peer_req->epoch->epoch_size);
2107 atomic_inc(&peer_req->epoch->active);
b411b363
PR
2108 spin_unlock(&mdev->epoch_lock);
2109
302bdeae
PR
2110 rcu_read_lock();
2111 tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2112 rcu_read_unlock();
2113 if (tp) {
2114 peer_req->flags |= EE_IN_INTERVAL_TREE;
7be8da07
AG
2115 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2116 if (err)
b411b363 2117 goto out_interrupted;
87eeee41 2118 spin_lock_irq(&mdev->tconn->req_lock);
7be8da07
AG
2119 err = handle_write_conflicts(mdev, peer_req);
2120 if (err) {
2121 spin_unlock_irq(&mdev->tconn->req_lock);
2122 if (err == -ENOENT) {
b411b363 2123 put_ldev(mdev);
82bc0194 2124 return 0;
b411b363 2125 }
7be8da07 2126 goto out_interrupted;
b411b363 2127 }
7be8da07
AG
2128 } else
2129 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2130 list_add(&peer_req->w.list, &mdev->active_ee);
87eeee41 2131 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2132
303d1448 2133 if (mdev->tconn->agreed_pro_version < 100) {
44ed167d
PR
2134 rcu_read_lock();
2135 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
303d1448
PR
2136 case DRBD_PROT_C:
2137 dp_flags |= DP_SEND_WRITE_ACK;
2138 break;
2139 case DRBD_PROT_B:
2140 dp_flags |= DP_SEND_RECEIVE_ACK;
2141 break;
2142 }
44ed167d 2143 rcu_read_unlock();
303d1448
PR
2144 }
2145
2146 if (dp_flags & DP_SEND_WRITE_ACK) {
2147 peer_req->flags |= EE_SEND_WRITE_ACK;
b411b363
PR
2148 inc_unacked(mdev);
2149 /* corresponding dec_unacked() in e_end_block()
2150 * respective _drbd_clear_done_ee */
303d1448
PR
2151 }
2152
2153 if (dp_flags & DP_SEND_RECEIVE_ACK) {
b411b363
PR
2154 /* I really don't like it that the receiver thread
2155 * sends on the msock, but anyways */
db830c46 2156 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
b411b363
PR
2157 }
2158
6719fb03 2159 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 2160 /* In case we have the only disk of the cluster, */
db830c46
AG
2161 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2162 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2163 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
181286ad 2164 drbd_al_begin_io(mdev, &peer_req->i);
b411b363
PR
2165 }
2166
82bc0194
AG
2167 err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2168 if (!err)
2169 return 0;
b411b363 2170
10f6d992
LE
2171 /* don't care for the reason here */
2172 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2173 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
2174 list_del(&peer_req->w.list);
2175 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 2176 spin_unlock_irq(&mdev->tconn->req_lock);
db830c46 2177 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
181286ad 2178 drbd_al_complete_io(mdev, &peer_req->i);
22cc37a9 2179
b411b363 2180out_interrupted:
db830c46 2181 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
b411b363 2182 put_ldev(mdev);
3967deb1 2183 drbd_free_peer_req(mdev, peer_req);
82bc0194 2184 return err;
b411b363
PR
2185}
2186
0f0601f4
LE
2187/* We may throttle resync, if the lower device seems to be busy,
2188 * and current sync rate is above c_min_rate.
2189 *
2190 * To decide whether or not the lower device is busy, we use a scheme similar
 2191 * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
 2192 * amount (more than 64 sectors) of activity that we cannot account for with
 2193 * our own resync activity, the device obviously is "busy".
2194 *
2195 * The current sync rate used here uses only the most recent two step marks,
2196 * to have a short time average so we can react faster.
2197 */
e3555d85 2198int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
2199{
2200 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2201 unsigned long db, dt, dbdt;
e3555d85 2202 struct lc_element *tmp;
0f0601f4
LE
2203 int curr_events;
2204 int throttle = 0;
daeda1cc
PR
2205 unsigned int c_min_rate;
2206
2207 rcu_read_lock();
2208 c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2209 rcu_read_unlock();
0f0601f4
LE
2210
2211 /* feature disabled? */
daeda1cc 2212 if (c_min_rate == 0)
0f0601f4
LE
2213 return 0;
2214
e3555d85
PR
2215 spin_lock_irq(&mdev->al_lock);
2216 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2217 if (tmp) {
2218 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2219 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2220 spin_unlock_irq(&mdev->al_lock);
2221 return 0;
2222 }
2223 /* Do not slow down if app IO is already waiting for this extent */
2224 }
2225 spin_unlock_irq(&mdev->al_lock);
2226
0f0601f4
LE
2227 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2228 (int)part_stat_read(&disk->part0, sectors[1]) -
2229 atomic_read(&mdev->rs_sect_ev);
e3555d85 2230
0f0601f4
LE
2231 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2232 unsigned long rs_left;
2233 int i;
2234
2235 mdev->rs_last_events = curr_events;
2236
2237 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2238 * approx. */
2649f080
LE
2239 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2240
2241 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2242 rs_left = mdev->ov_left;
2243 else
2244 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
2245
2246 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2247 if (!dt)
2248 dt++;
2249 db = mdev->rs_mark_left[i] - rs_left;
2250 dbdt = Bit2KB(db/dt);
2251
daeda1cc 2252 if (dbdt > c_min_rate)
0f0601f4
LE
2253 throttle = 1;
2254 }
2255 return throttle;
2256}
2257
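
The throttle decision above compares a short-term resync rate against c_min_rate. A rough stand-alone illustration with made-up numbers (an editorial sketch, assuming the usual 4 KiB per bitmap bit, so Bit2KB() amounts to a shift by 2):

#include <stdio.h>

int main(void)
{
	unsigned long rs_mark_left = 100000;  /* bits left at the last step mark */
	unsigned long rs_left      = 94000;   /* bits left now                   */
	unsigned long dt           = 3;       /* seconds since that mark         */
	unsigned int  c_min_rate   = 4000;    /* KiB/s, from disk_conf           */

	unsigned long db   = rs_mark_left - rs_left;  /* 6000 bits resynced      */
	unsigned long dbdt = (db / dt) << 2;          /* Bit2KB(): 8000 KiB/s    */

	printf("current resync rate: %lu KiB/s -> %s\n", dbdt,
	       dbdt > c_min_rate ? "throttle" : "do not throttle");
	return 0;
}
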
2258
4a76b161 2259static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 2260{
4a76b161 2261 struct drbd_conf *mdev;
b411b363 2262 sector_t sector;
4a76b161 2263 sector_t capacity;
db830c46 2264 struct drbd_peer_request *peer_req;
b411b363 2265 struct digest_info *di = NULL;
b18b37be 2266 int size, verb;
b411b363 2267 unsigned int fault_type;
e658983a 2268 struct p_block_req *p = pi->data;
4a76b161
AG
2269
2270 mdev = vnr_to_mdev(tconn, pi->vnr);
2271 if (!mdev)
2272 return -EIO;
2273 capacity = drbd_get_capacity(mdev->this_bdev);
b411b363
PR
2274
2275 sector = be64_to_cpu(p->sector);
2276 size = be32_to_cpu(p->blksize);
2277
c670a398 2278 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
2279 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2280 (unsigned long long)sector, size);
82bc0194 2281 return -EINVAL;
b411b363
PR
2282 }
2283 if (sector + (size>>9) > capacity) {
2284 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2285 (unsigned long long)sector, size);
82bc0194 2286 return -EINVAL;
b411b363
PR
2287 }
2288
2289 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be 2290 verb = 1;
e2857216 2291 switch (pi->cmd) {
b18b37be
PR
2292 case P_DATA_REQUEST:
2293 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2294 break;
2295 case P_RS_DATA_REQUEST:
2296 case P_CSUM_RS_REQUEST:
2297 case P_OV_REQUEST:
2298 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2299 break;
2300 case P_OV_REPLY:
2301 verb = 0;
2302 dec_rs_pending(mdev);
2303 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2304 break;
2305 default:
49ba9b1b 2306 BUG();
b18b37be
PR
2307 }
2308 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2309 dev_err(DEV, "Can not satisfy peer's read request, "
2310 "no local data.\n");
b18b37be 2311
a821cc4a 2312 /* drain the payload, if any */
e2857216 2313 return drbd_drain_block(mdev, pi->size);
b411b363
PR
2314 }
2315
2316 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2317 * "criss-cross" setup, that might cause write-out on some other DRBD,
2318 * which in turn might block on the other node at this very place. */
0db55363 2319 peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
db830c46 2320 if (!peer_req) {
b411b363 2321 put_ldev(mdev);
82bc0194 2322 return -ENOMEM;
b411b363
PR
2323 }
2324
e2857216 2325 switch (pi->cmd) {
b411b363 2326 case P_DATA_REQUEST:
db830c46 2327 peer_req->w.cb = w_e_end_data_req;
b411b363 2328 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2329 /* application IO, don't drbd_rs_begin_io */
2330 goto submit;
2331
b411b363 2332 case P_RS_DATA_REQUEST:
db830c46 2333 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2334 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2335 /* used in the sector offset progress display */
2336 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2337 break;
2338
2339 case P_OV_REPLY:
2340 case P_CSUM_RS_REQUEST:
2341 fault_type = DRBD_FAULT_RS_RD;
e2857216 2342 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
b411b363
PR
2343 if (!di)
2344 goto out_free_e;
2345
e2857216 2346 di->digest_size = pi->size;
b411b363
PR
2347 di->digest = (((char *)di)+sizeof(struct digest_info));
2348
db830c46
AG
2349 peer_req->digest = di;
2350 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2351
e2857216 2352 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
b411b363
PR
2353 goto out_free_e;
2354
e2857216 2355 if (pi->cmd == P_CSUM_RS_REQUEST) {
31890f4a 2356 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
db830c46 2357 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2358 /* used in the sector offset progress display */
2359 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
e2857216 2360 } else if (pi->cmd == P_OV_REPLY) {
2649f080
LE
2361 /* track progress, we may need to throttle */
2362 atomic_add(size >> 9, &mdev->rs_sect_in);
db830c46 2363 peer_req->w.cb = w_e_end_ov_reply;
b411b363 2364 dec_rs_pending(mdev);
0f0601f4
LE
2365 /* drbd_rs_begin_io done when we sent this request,
2366 * but accounting still needs to be done. */
2367 goto submit_for_resync;
b411b363
PR
2368 }
2369 break;
2370
2371 case P_OV_REQUEST:
b411b363 2372 if (mdev->ov_start_sector == ~(sector_t)0 &&
31890f4a 2373 mdev->tconn->agreed_pro_version >= 90) {
de228bba
LE
2374 unsigned long now = jiffies;
2375 int i;
b411b363
PR
2376 mdev->ov_start_sector = sector;
2377 mdev->ov_position = sector;
30b743a2
LE
2378 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2379 mdev->rs_total = mdev->ov_left;
de228bba
LE
2380 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2381 mdev->rs_mark_left[i] = mdev->ov_left;
2382 mdev->rs_mark_time[i] = now;
2383 }
b411b363
PR
2384 dev_info(DEV, "Online Verify start sector: %llu\n",
2385 (unsigned long long)sector);
2386 }
db830c46 2387 peer_req->w.cb = w_e_end_ov_req;
b411b363 2388 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2389 break;
2390
b411b363 2391 default:
49ba9b1b 2392 BUG();
b411b363
PR
2393 }
2394
0f0601f4
LE
2395 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2396 * wrt the receiver, but it is not as straightforward as it may seem.
2397 * Various places in the resync start and stop logic assume resync
 2398 * requests are processed in order; requeuing this on the worker thread
 2399 * would introduce a bunch of new code for synchronization between threads.
2400 *
2401 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2402 * "forever", throttling after drbd_rs_begin_io will lock that extent
2403 * for application writes for the same time. For now, just throttle
2404 * here, where the rest of the code expects the receiver to sleep for
2405 * a while, anyways.
2406 */
2407
2408 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2409 * this defers syncer requests for some time, before letting at least
 2410 * one request through. The resync controller on the receiving side
2411 * will adapt to the incoming rate accordingly.
2412 *
2413 * We cannot throttle here if remote is Primary/SyncTarget:
2414 * we would also throttle its application reads.
2415 * In that case, throttling is done on the SyncTarget only.
2416 */
e3555d85
PR
2417 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2418 schedule_timeout_uninterruptible(HZ/10);
2419 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2420 goto out_free_e;
b411b363 2421
0f0601f4
LE
2422submit_for_resync:
2423 atomic_add(size >> 9, &mdev->rs_sect_ev);
2424
80a40e43 2425submit:
b411b363 2426 inc_unacked(mdev);
87eeee41 2427 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2428 list_add_tail(&peer_req->w.list, &mdev->read_ee);
87eeee41 2429 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2430
fbe29dec 2431 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
82bc0194 2432 return 0;
b411b363 2433
10f6d992
LE
2434 /* don't care for the reason here */
2435 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2436 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2437 list_del(&peer_req->w.list);
87eeee41 2438 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9
LE
2439 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2440
b411b363 2441out_free_e:
b411b363 2442 put_ldev(mdev);
3967deb1 2443 drbd_free_peer_req(mdev, peer_req);
82bc0194 2444 return -EIO;
b411b363
PR
2445}
2446
2447static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2448{
2449 int self, peer, rv = -100;
2450 unsigned long ch_self, ch_peer;
44ed167d 2451 enum drbd_after_sb_p after_sb_0p;
b411b363
PR
2452
2453 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2454 peer = mdev->p_uuid[UI_BITMAP] & 1;
2455
2456 ch_peer = mdev->p_uuid[UI_SIZE];
2457 ch_self = mdev->comm_bm_set;
2458
44ed167d
PR
2459 rcu_read_lock();
2460 after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2461 rcu_read_unlock();
2462 switch (after_sb_0p) {
b411b363
PR
2463 case ASB_CONSENSUS:
2464 case ASB_DISCARD_SECONDARY:
2465 case ASB_CALL_HELPER:
44ed167d 2466 case ASB_VIOLENTLY:
b411b363
PR
2467 dev_err(DEV, "Configuration error.\n");
2468 break;
2469 case ASB_DISCONNECT:
2470 break;
2471 case ASB_DISCARD_YOUNGER_PRI:
2472 if (self == 0 && peer == 1) {
2473 rv = -1;
2474 break;
2475 }
2476 if (self == 1 && peer == 0) {
2477 rv = 1;
2478 break;
2479 }
2480 /* Else fall through to one of the other strategies... */
2481 case ASB_DISCARD_OLDER_PRI:
2482 if (self == 0 && peer == 1) {
2483 rv = 1;
2484 break;
2485 }
2486 if (self == 1 && peer == 0) {
2487 rv = -1;
2488 break;
2489 }
2490 /* Else fall through to one of the other strategies... */
ad19bf6e 2491 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2492 "Using discard-least-changes instead\n");
2493 case ASB_DISCARD_ZERO_CHG:
2494 if (ch_peer == 0 && ch_self == 0) {
25703f83 2495 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2496 ? -1 : 1;
2497 break;
2498 } else {
2499 if (ch_peer == 0) { rv = 1; break; }
2500 if (ch_self == 0) { rv = -1; break; }
2501 }
44ed167d 2502 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2503 break;
2504 case ASB_DISCARD_LEAST_CHG:
2505 if (ch_self < ch_peer)
2506 rv = -1;
2507 else if (ch_self > ch_peer)
2508 rv = 1;
2509 else /* ( ch_self == ch_peer ) */
2510 /* Well, then use something else. */
25703f83 2511 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2512 ? -1 : 1;
2513 break;
2514 case ASB_DISCARD_LOCAL:
2515 rv = -1;
2516 break;
2517 case ASB_DISCARD_REMOTE:
2518 rv = 1;
2519 }
2520
2521 return rv;
2522}
2523
2524static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2525{
6184ea21 2526 int hg, rv = -100;
44ed167d 2527 enum drbd_after_sb_p after_sb_1p;
b411b363 2528
44ed167d
PR
2529 rcu_read_lock();
2530 after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2531 rcu_read_unlock();
2532 switch (after_sb_1p) {
b411b363
PR
2533 case ASB_DISCARD_YOUNGER_PRI:
2534 case ASB_DISCARD_OLDER_PRI:
2535 case ASB_DISCARD_LEAST_CHG:
2536 case ASB_DISCARD_LOCAL:
2537 case ASB_DISCARD_REMOTE:
44ed167d 2538 case ASB_DISCARD_ZERO_CHG:
b411b363
PR
2539 dev_err(DEV, "Configuration error.\n");
2540 break;
2541 case ASB_DISCONNECT:
2542 break;
2543 case ASB_CONSENSUS:
2544 hg = drbd_asb_recover_0p(mdev);
2545 if (hg == -1 && mdev->state.role == R_SECONDARY)
2546 rv = hg;
2547 if (hg == 1 && mdev->state.role == R_PRIMARY)
2548 rv = hg;
2549 break;
2550 case ASB_VIOLENTLY:
2551 rv = drbd_asb_recover_0p(mdev);
2552 break;
2553 case ASB_DISCARD_SECONDARY:
2554 return mdev->state.role == R_PRIMARY ? 1 : -1;
2555 case ASB_CALL_HELPER:
2556 hg = drbd_asb_recover_0p(mdev);
2557 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2558 enum drbd_state_rv rv2;
2559
2560 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2561 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2562 * we might be here in C_WF_REPORT_PARAMS which is transient.
2563 * we do not need to wait for the after state change work either. */
bb437946
AG
2564 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2565 if (rv2 != SS_SUCCESS) {
b411b363
PR
2566 drbd_khelper(mdev, "pri-lost-after-sb");
2567 } else {
2568 dev_warn(DEV, "Successfully gave up primary role.\n");
2569 rv = hg;
2570 }
2571 } else
2572 rv = hg;
2573 }
2574
2575 return rv;
2576}
2577
2578static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2579{
6184ea21 2580 int hg, rv = -100;
44ed167d 2581 enum drbd_after_sb_p after_sb_2p;
b411b363 2582
44ed167d
PR
2583 rcu_read_lock();
2584 after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2585 rcu_read_unlock();
2586 switch (after_sb_2p) {
b411b363
PR
2587 case ASB_DISCARD_YOUNGER_PRI:
2588 case ASB_DISCARD_OLDER_PRI:
2589 case ASB_DISCARD_LEAST_CHG:
2590 case ASB_DISCARD_LOCAL:
2591 case ASB_DISCARD_REMOTE:
2592 case ASB_CONSENSUS:
2593 case ASB_DISCARD_SECONDARY:
44ed167d 2594 case ASB_DISCARD_ZERO_CHG:
b411b363
PR
2595 dev_err(DEV, "Configuration error.\n");
2596 break;
2597 case ASB_VIOLENTLY:
2598 rv = drbd_asb_recover_0p(mdev);
2599 break;
2600 case ASB_DISCONNECT:
2601 break;
2602 case ASB_CALL_HELPER:
2603 hg = drbd_asb_recover_0p(mdev);
2604 if (hg == -1) {
bb437946
AG
2605 enum drbd_state_rv rv2;
2606
b411b363
PR
2607 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2608 * we might be here in C_WF_REPORT_PARAMS which is transient.
2609 * we do not need to wait for the after state change work either. */
bb437946
AG
2610 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2611 if (rv2 != SS_SUCCESS) {
b411b363
PR
2612 drbd_khelper(mdev, "pri-lost-after-sb");
2613 } else {
2614 dev_warn(DEV, "Successfully gave up primary role.\n");
2615 rv = hg;
2616 }
2617 } else
2618 rv = hg;
2619 }
2620
2621 return rv;
2622}
2623
2624static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2625 u64 bits, u64 flags)
2626{
2627 if (!uuid) {
2628 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2629 return;
2630 }
2631 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2632 text,
2633 (unsigned long long)uuid[UI_CURRENT],
2634 (unsigned long long)uuid[UI_BITMAP],
2635 (unsigned long long)uuid[UI_HISTORY_START],
2636 (unsigned long long)uuid[UI_HISTORY_END],
2637 (unsigned long long)bits,
2638 (unsigned long long)flags);
2639}
2640
2641/*
2642 100 after split brain try auto recover
2643 2 C_SYNC_SOURCE set BitMap
2644 1 C_SYNC_SOURCE use BitMap
2645 0 no Sync
2646 -1 C_SYNC_TARGET use BitMap
2647 -2 C_SYNC_TARGET set BitMap
2648 -100 after split brain, disconnect
2649-1000 unrelated data
4a23f264
PR
2650-1091 requires proto 91
2651-1096 requires proto 96
b411b363
PR
2652 */
2653static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2654{
2655 u64 self, peer;
2656 int i, j;
2657
2658 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2659 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2660
2661 *rule_nr = 10;
2662 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2663 return 0;
2664
2665 *rule_nr = 20;
2666 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2667 peer != UUID_JUST_CREATED)
2668 return -2;
2669
2670 *rule_nr = 30;
2671 if (self != UUID_JUST_CREATED &&
2672 (peer == UUID_JUST_CREATED || peer == (u64)0))
2673 return 2;
2674
2675 if (self == peer) {
2676 int rct, dc; /* roles at crash time */
2677
2678 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2679
31890f4a 2680 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2681 return -1091;
b411b363
PR
2682
2683 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2684 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2685 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2686 drbd_uuid_set_bm(mdev, 0UL);
2687
2688 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2689 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2690 *rule_nr = 34;
2691 } else {
2692 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2693 *rule_nr = 36;
2694 }
2695
2696 return 1;
2697 }
2698
2699 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2700
31890f4a 2701 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2702 return -1091;
b411b363
PR
2703
2704 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2705 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2706 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2707
2708 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2709 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2710 mdev->p_uuid[UI_BITMAP] = 0UL;
2711
2712 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2713 *rule_nr = 35;
2714 } else {
2715 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2716 *rule_nr = 37;
2717 }
2718
2719 return -1;
2720 }
2721
2722 /* Common power [off|failure] */
2723 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2724 (mdev->p_uuid[UI_FLAGS] & 2);
2725 /* lowest bit is set when we were primary,
2726 * next bit (weight 2) is set when peer was primary */
2727 *rule_nr = 40;
2728
2729 switch (rct) {
2730 case 0: /* !self_pri && !peer_pri */ return 0;
2731 case 1: /* self_pri && !peer_pri */ return 1;
2732 case 2: /* !self_pri && peer_pri */ return -1;
2733 case 3: /* self_pri && peer_pri */
25703f83 2734 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363
PR
2735 return dc ? -1 : 1;
2736 }
2737 }
2738
2739 *rule_nr = 50;
2740 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2741 if (self == peer)
2742 return -1;
2743
2744 *rule_nr = 51;
2745 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2746 if (self == peer) {
31890f4a 2747 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2748 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2749 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2750 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2751 /* The last P_SYNC_UUID did not get through. Undo the modifications the
 2752 last start of resync (as sync source) made to the peer's UUIDs. */
2753
31890f4a 2754 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2755 return -1091;
b411b363
PR
2756
2757 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2758 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264
PR
2759
 2760 dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2761 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2762
b411b363
PR
2763 return -1;
2764 }
2765 }
2766
2767 *rule_nr = 60;
2768 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2769 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2770 peer = mdev->p_uuid[i] & ~((u64)1);
2771 if (self == peer)
2772 return -2;
2773 }
2774
2775 *rule_nr = 70;
2776 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2777 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2778 if (self == peer)
2779 return 1;
2780
2781 *rule_nr = 71;
2782 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2783 if (self == peer) {
31890f4a 2784 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2785 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2786 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2787 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2788 /* The last P_SYNC_UUID did not get through. Undo the modifications the
 2789 last start of resync (as sync source) made to our UUIDs. */
2790
31890f4a 2791 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2792 return -1091;
b411b363
PR
2793
2794 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2795 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2796
4a23f264 2797 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2798 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2799 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2800
2801 return 1;
2802 }
2803 }
2804
2805
2806 *rule_nr = 80;
d8c2a36b 2807 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2808 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2809 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2810 if (self == peer)
2811 return 2;
2812 }
2813
2814 *rule_nr = 90;
2815 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2816 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2817 if (self == peer && self != ((u64)0))
2818 return 100;
2819
2820 *rule_nr = 100;
2821 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2822 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2823 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2824 peer = mdev->p_uuid[j] & ~((u64)1);
2825 if (self == peer)
2826 return -100;
2827 }
2828 }
2829
2830 return -1000;
2831}
2832
2833/* drbd_sync_handshake() returns the new conn state on success, or
 2834 C_MASK (-1) on failure.
2835 */
2836static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2837 enum drbd_disk_state peer_disk) __must_hold(local)
2838{
b411b363
PR
2839 enum drbd_conns rv = C_MASK;
2840 enum drbd_disk_state mydisk;
44ed167d
PR
2841 struct net_conf *nc;
2842 int hg, rule_nr, rr_conflict, dry_run;
b411b363
PR
2843
2844 mydisk = mdev->state.disk;
2845 if (mydisk == D_NEGOTIATING)
2846 mydisk = mdev->new_state_tmp.disk;
2847
2848 dev_info(DEV, "drbd_sync_handshake:\n");
2849 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2850 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2851 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2852
2853 hg = drbd_uuid_compare(mdev, &rule_nr);
2854
2855 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2856
2857 if (hg == -1000) {
2858 dev_alert(DEV, "Unrelated data, aborting!\n");
2859 return C_MASK;
2860 }
4a23f264
PR
2861 if (hg < -1000) {
2862 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2863 return C_MASK;
2864 }
2865
2866 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2867 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2868 int f = (hg == -100) || abs(hg) == 2;
2869 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2870 if (f)
2871 hg = hg*2;
2872 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2873 hg > 0 ? "source" : "target");
2874 }
2875
3a11a487
AG
2876 if (abs(hg) == 100)
2877 drbd_khelper(mdev, "initial-split-brain");
2878
44ed167d
PR
2879 rcu_read_lock();
2880 nc = rcu_dereference(mdev->tconn->net_conf);
2881
2882 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
b411b363
PR
2883 int pcount = (mdev->state.role == R_PRIMARY)
2884 + (peer_role == R_PRIMARY);
2885 int forced = (hg == -100);
2886
2887 switch (pcount) {
2888 case 0:
2889 hg = drbd_asb_recover_0p(mdev);
2890 break;
2891 case 1:
2892 hg = drbd_asb_recover_1p(mdev);
2893 break;
2894 case 2:
2895 hg = drbd_asb_recover_2p(mdev);
2896 break;
2897 }
2898 if (abs(hg) < 100) {
2899 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2900 "automatically solved. Sync from %s node\n",
2901 pcount, (hg < 0) ? "peer" : "this");
2902 if (forced) {
2903 dev_warn(DEV, "Doing a full sync, since"
2904 " UUIDs where ambiguous.\n");
2905 hg = hg*2;
2906 }
2907 }
2908 }
2909
2910 if (hg == -100) {
6139f60d 2911 if (nc->discard_my_data && !(mdev->p_uuid[UI_FLAGS]&1))
b411b363 2912 hg = -1;
6139f60d 2913 if (!nc->discard_my_data && (mdev->p_uuid[UI_FLAGS]&1))
b411b363
PR
2914 hg = 1;
2915
2916 if (abs(hg) < 100)
2917 dev_warn(DEV, "Split-Brain detected, manually solved. "
2918 "Sync from %s node\n",
2919 (hg < 0) ? "peer" : "this");
2920 }
44ed167d
PR
2921 rr_conflict = nc->rr_conflict;
2922 dry_run = nc->dry_run;
2923 rcu_read_unlock();
b411b363
PR
2924
2925 if (hg == -100) {
580b9767
LE
2926 /* FIXME this log message is not correct if we end up here
2927 * after an attempted attach on a diskless node.
2928 * We just refuse to attach -- well, we drop the "connection"
2929 * to that disk, in a way... */
3a11a487 2930 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2931 drbd_khelper(mdev, "split-brain");
2932 return C_MASK;
2933 }
2934
2935 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2936 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2937 return C_MASK;
2938 }
2939
2940 if (hg < 0 && /* by intention we do not use mydisk here. */
2941 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
44ed167d 2942 switch (rr_conflict) {
b411b363
PR
2943 case ASB_CALL_HELPER:
2944 drbd_khelper(mdev, "pri-lost");
2945 /* fall through */
2946 case ASB_DISCONNECT:
2947 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2948 return C_MASK;
2949 case ASB_VIOLENTLY:
2950 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2951 "assumption\n");
2952 }
2953 }
2954
44ed167d 2955 if (dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
cf14c2e9
PR
2956 if (hg == 0)
2957 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2958 else
2959 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2960 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2961 abs(hg) >= 2 ? "full" : "bit-map based");
2962 return C_MASK;
2963 }
2964
b411b363
PR
2965 if (abs(hg) >= 2) {
2966 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2967 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2968 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2969 return C_MASK;
2970 }
2971
2972 if (hg > 0) { /* become sync source. */
2973 rv = C_WF_BITMAP_S;
2974 } else if (hg < 0) { /* become sync target */
2975 rv = C_WF_BITMAP_T;
2976 } else {
2977 rv = C_CONNECTED;
2978 if (drbd_bm_total_weight(mdev)) {
2979 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2980 drbd_bm_total_weight(mdev));
2981 }
2982 }
2983
2984 return rv;
2985}
2986
2987/* returns 1 if invalid */
2988static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2989{
2990 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2991 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2992 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2993 return 0;
2994
2995 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2996 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2997 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2998 return 1;
2999
3000 /* everything else is valid if they are equal on both sides. */
3001 if (peer == self)
3002 return 0;
3003
 3004 /* everything else is invalid. */
3005 return 1;
3006}
3007
e2857216 3008static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3009{
e658983a 3010 struct p_protocol *p = pi->data;
b411b363 3011 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
6139f60d 3012 int p_discard_my_data, p_two_primaries, cf;
44ed167d 3013 struct net_conf *nc;
b411b363 3014
b411b363
PR
3015 p_proto = be32_to_cpu(p->protocol);
3016 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3017 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3018 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 3019 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9 3020 cf = be32_to_cpu(p->conn_flags);
6139f60d 3021 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
cf14c2e9 3022
86db0618 3023 if (tconn->agreed_pro_version >= 87) {
88104ca4
AG
3024 char integrity_alg[SHARED_SECRET_MAX];
3025 struct crypto_hash *tfm = NULL;
86db0618
AG
3026 int err;
3027
88104ca4 3028 if (pi->size > sizeof(integrity_alg))
86db0618 3029 return -EIO;
88104ca4 3030 err = drbd_recv_all(tconn, integrity_alg, pi->size);
86db0618
AG
3031 if (err)
3032 return err;
88104ca4
AG
3033 integrity_alg[SHARED_SECRET_MAX-1] = 0;
3034
3035 if (integrity_alg[0]) {
3036 tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3037 if (!tfm) {
3038 conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3039 integrity_alg);
3040 goto disconnect;
3041 }
3042 conn_info(tconn, "peer data-integrity-alg: %s\n", integrity_alg);
3043 }
86db0618 3044
88104ca4
AG
3045 if (tconn->peer_integrity_tfm)
3046 crypto_free_hash(tconn->peer_integrity_tfm);
3047 tconn->peer_integrity_tfm = tfm;
86db0618
AG
3048 }
3049
7204624c 3050 clear_bit(CONN_DRY_RUN, &tconn->flags);
cf14c2e9
PR
3051
3052 if (cf & CF_DRY_RUN)
7204624c 3053 set_bit(CONN_DRY_RUN, &tconn->flags);
b411b363 3054
44ed167d
PR
3055 rcu_read_lock();
3056 nc = rcu_dereference(tconn->net_conf);
3057
3058 if (p_proto != nc->wire_protocol && tconn->agreed_pro_version < 100) {
7204624c 3059 conn_err(tconn, "incompatible communication protocols\n");
44ed167d 3060 goto disconnect_rcu_unlock;
b411b363
PR
3061 }
3062
44ed167d 3063 if (cmp_after_sb(p_after_sb_0p, nc->after_sb_0p)) {
7204624c 3064 conn_err(tconn, "incompatible after-sb-0pri settings\n");
44ed167d 3065 goto disconnect_rcu_unlock;
b411b363
PR
3066 }
3067
44ed167d 3068 if (cmp_after_sb(p_after_sb_1p, nc->after_sb_1p)) {
7204624c 3069 conn_err(tconn, "incompatible after-sb-1pri settings\n");
44ed167d 3070 goto disconnect_rcu_unlock;
b411b363
PR
3071 }
3072
44ed167d 3073 if (cmp_after_sb(p_after_sb_2p, nc->after_sb_2p)) {
7204624c 3074 conn_err(tconn, "incompatible after-sb-2pri settings\n");
44ed167d 3075 goto disconnect_rcu_unlock;
b411b363
PR
3076 }
3077
6139f60d
AG
3078 if (p_discard_my_data && nc->discard_my_data) {
3079 conn_err(tconn, "both sides have the 'discard_my_data' flag set\n");
44ed167d 3080 goto disconnect_rcu_unlock;
b411b363
PR
3081 }
3082
44ed167d 3083 if (p_two_primaries != nc->two_primaries) {
7204624c 3084 conn_err(tconn, "incompatible setting of the two-primaries options\n");
44ed167d 3085 goto disconnect_rcu_unlock;
b411b363
PR
3086 }
3087
86db0618
AG
3088 rcu_read_unlock();
3089
82bc0194 3090 return 0;
b411b363 3091
44ed167d
PR
3092disconnect_rcu_unlock:
3093 rcu_read_unlock();
b411b363 3094disconnect:
7204624c 3095 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3096 return -EIO;
b411b363
PR
3097}
3098
3099/* helper function
3100 * input: alg name, feature name
3101 * return: NULL (alg name was "")
3102 * ERR_PTR(error) if something goes wrong
3103 * or the crypto hash ptr, if it worked out ok. */
3104struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3105 const char *alg, const char *name)
3106{
3107 struct crypto_hash *tfm;
3108
3109 if (!alg[0])
3110 return NULL;
3111
3112 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3113 if (IS_ERR(tfm)) {
3114 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3115 alg, name, PTR_ERR(tfm));
3116 return tfm;
3117 }
b411b363
PR
3118 return tfm;
3119}
3120
4a76b161
AG
3121static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3122{
3123 void *buffer = tconn->data.rbuf;
3124 int size = pi->size;
3125
3126 while (size) {
3127 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3128 s = drbd_recv(tconn, buffer, s);
3129 if (s <= 0) {
3130 if (s < 0)
3131 return s;
3132 break;
3133 }
3134 size -= s;
3135 }
3136 if (size)
3137 return -EIO;
3138 return 0;
3139}
3140
3141/*
3142 * config_unknown_volume - device configuration command for unknown volume
3143 *
3144 * When a device is added to an existing connection, the node on which the
3145 * device is added first will send configuration commands to its peer but the
3146 * peer will not know about the device yet. It will warn and ignore these
3147 * commands. Once the device is added on the second node, the second node will
3148 * send the same device configuration commands, but in the other direction.
3149 *
3150 * (We can also end up here if drbd is misconfigured.)
3151 */
3152static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3153{
3154 conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3155 pi->vnr, cmdname(pi->cmd));
3156 return ignore_remaining_packet(tconn, pi);
3157}
3158
3159static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3160{
4a76b161 3161 struct drbd_conf *mdev;
e658983a 3162 struct p_rs_param_95 *p;
b411b363
PR
3163 unsigned int header_size, data_size, exp_max_sz;
3164 struct crypto_hash *verify_tfm = NULL;
3165 struct crypto_hash *csums_tfm = NULL;
2ec91e0e 3166 struct net_conf *old_net_conf, *new_net_conf = NULL;
813472ce 3167 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
4a76b161 3168 const int apv = tconn->agreed_pro_version;
813472ce 3169 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
778f271d 3170 int fifo_size = 0;
82bc0194 3171 int err;
b411b363 3172
4a76b161
AG
3173 mdev = vnr_to_mdev(tconn, pi->vnr);
3174 if (!mdev)
3175 return config_unknown_volume(tconn, pi);
3176
b411b363
PR
3177 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3178 : apv == 88 ? sizeof(struct p_rs_param)
3179 + SHARED_SECRET_MAX
8e26f9cc
PR
3180 : apv <= 94 ? sizeof(struct p_rs_param_89)
3181 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 3182
e2857216 3183 if (pi->size > exp_max_sz) {
b411b363 3184 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
e2857216 3185 pi->size, exp_max_sz);
82bc0194 3186 return -EIO;
b411b363
PR
3187 }
3188
3189 if (apv <= 88) {
e658983a 3190 header_size = sizeof(struct p_rs_param);
e2857216 3191 data_size = pi->size - header_size;
8e26f9cc 3192 } else if (apv <= 94) {
e658983a 3193 header_size = sizeof(struct p_rs_param_89);
e2857216 3194 data_size = pi->size - header_size;
b411b363 3195 D_ASSERT(data_size == 0);
8e26f9cc 3196 } else {
e658983a 3197 header_size = sizeof(struct p_rs_param_95);
e2857216 3198 data_size = pi->size - header_size;
b411b363
PR
3199 D_ASSERT(data_size == 0);
3200 }
3201
3202 /* initialize verify_alg and csums_alg */
e658983a 3203 p = pi->data;
b411b363
PR
3204 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3205
e658983a 3206 err = drbd_recv_all(mdev->tconn, p, header_size);
82bc0194
AG
3207 if (err)
3208 return err;
b411b363 3209
daeda1cc
PR
3210 mutex_lock(&mdev->tconn->conf_update);
3211 old_net_conf = mdev->tconn->net_conf;
813472ce
PR
3212 if (get_ldev(mdev)) {
3213 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3214 if (!new_disk_conf) {
3215 put_ldev(mdev);
3216 mutex_unlock(&mdev->tconn->conf_update);
3217 dev_err(DEV, "Allocation of new disk_conf failed\n");
3218 return -ENOMEM;
3219 }
daeda1cc 3220
813472ce
PR
3221 old_disk_conf = mdev->ldev->disk_conf;
3222 *new_disk_conf = *old_disk_conf;
3223
6394b935 3224 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
813472ce 3225 }
daeda1cc 3226
b411b363
PR
3227 if (apv >= 88) {
3228 if (apv == 88) {
3229 if (data_size > SHARED_SECRET_MAX) {
3230 dev_err(DEV, "verify-alg too long, "
3231 "peer wants %u, accepting only %u byte\n",
3232 data_size, SHARED_SECRET_MAX);
813472ce
PR
3233 err = -EIO;
3234 goto reconnect;
b411b363
PR
3235 }
3236
82bc0194 3237 err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
813472ce
PR
3238 if (err)
3239 goto reconnect;
b411b363
PR
3240 /* we expect NUL terminated string */
3241 /* but just in case someone tries to be evil */
3242 D_ASSERT(p->verify_alg[data_size-1] == 0);
3243 p->verify_alg[data_size-1] = 0;
3244
3245 } else /* apv >= 89 */ {
3246 /* we still expect NUL terminated strings */
3247 /* but just in case someone tries to be evil */
3248 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3249 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3250 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3251 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3252 }
3253
2ec91e0e 3254 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
b411b363
PR
3255 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3256 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2ec91e0e 3257 old_net_conf->verify_alg, p->verify_alg);
b411b363
PR
3258 goto disconnect;
3259 }
3260 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3261 p->verify_alg, "verify-alg");
3262 if (IS_ERR(verify_tfm)) {
3263 verify_tfm = NULL;
3264 goto disconnect;
3265 }
3266 }
3267
2ec91e0e 3268 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
b411b363
PR
3269 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3270 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2ec91e0e 3271 old_net_conf->csums_alg, p->csums_alg);
b411b363
PR
3272 goto disconnect;
3273 }
3274 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3275 p->csums_alg, "csums-alg");
3276 if (IS_ERR(csums_tfm)) {
3277 csums_tfm = NULL;
3278 goto disconnect;
3279 }
3280 }
3281
813472ce 3282 if (apv > 94 && new_disk_conf) {
daeda1cc
PR
3283 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3284 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3285 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3286 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d 3287
daeda1cc 3288 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
9958c857 3289 if (fifo_size != mdev->rs_plan_s->size) {
813472ce
PR
3290 new_plan = fifo_alloc(fifo_size);
3291 if (!new_plan) {
778f271d 3292 dev_err(DEV, "kmalloc of fifo_buffer failed");
f399002e 3293 put_ldev(mdev);
778f271d
PR
3294 goto disconnect;
3295 }
3296 }
8e26f9cc 3297 }
b411b363 3298
91fd4dad 3299 if (verify_tfm || csums_tfm) {
2ec91e0e
PR
3300 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3301 if (!new_net_conf) {
91fd4dad
PR
3302 dev_err(DEV, "Allocation of new net_conf failed\n");
3303 goto disconnect;
3304 }
3305
2ec91e0e 3306 *new_net_conf = *old_net_conf;
91fd4dad
PR
3307
3308 if (verify_tfm) {
2ec91e0e
PR
3309 strcpy(new_net_conf->verify_alg, p->verify_alg);
3310 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
91fd4dad
PR
3311 crypto_free_hash(mdev->tconn->verify_tfm);
3312 mdev->tconn->verify_tfm = verify_tfm;
3313 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3314 }
3315 if (csums_tfm) {
2ec91e0e
PR
3316 strcpy(new_net_conf->csums_alg, p->csums_alg);
3317 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
91fd4dad
PR
3318 crypto_free_hash(mdev->tconn->csums_tfm);
3319 mdev->tconn->csums_tfm = csums_tfm;
3320 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3321 }
2ec91e0e 3322 rcu_assign_pointer(tconn->net_conf, new_net_conf);
b411b363 3323 }
daeda1cc 3324 }
91fd4dad 3325
813472ce
PR
3326 if (new_disk_conf) {
3327 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3328 put_ldev(mdev);
3329 }
3330
3331 if (new_plan) {
3332 old_plan = mdev->rs_plan_s;
3333 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
b411b363 3334 }
daeda1cc
PR
3335
3336 mutex_unlock(&mdev->tconn->conf_update);
3337 synchronize_rcu();
3338 if (new_net_conf)
3339 kfree(old_net_conf);
3340 kfree(old_disk_conf);
813472ce 3341 kfree(old_plan);
daeda1cc 3342
82bc0194 3343 return 0;
b411b363 3344
813472ce
PR
3345reconnect:
3346 if (new_disk_conf) {
3347 put_ldev(mdev);
3348 kfree(new_disk_conf);
3349 }
3350 mutex_unlock(&mdev->tconn->conf_update);
3351 return -EIO;
3352
b411b363 3353disconnect:
813472ce
PR
3354 kfree(new_plan);
3355 if (new_disk_conf) {
3356 put_ldev(mdev);
3357 kfree(new_disk_conf);
3358 }
a0095508 3359 mutex_unlock(&mdev->tconn->conf_update);
b411b363
PR
3360 /* just for completeness: actually not needed,
3361 * as this is not reached if csums_tfm was ok. */
3362 crypto_free_hash(csums_tfm);
3363 /* but free the verify_tfm again, if csums_tfm did not work out */
3364 crypto_free_hash(verify_tfm);
38fa9988 3365 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3366 return -EIO;
b411b363
PR
3367}
3368
b411b363
PR
3369/* warn if the arguments differ by more than 12.5% */
3370static void warn_if_differ_considerably(struct drbd_conf *mdev,
3371 const char *s, sector_t a, sector_t b)
3372{
3373 sector_t d;
3374 if (a == 0 || b == 0)
3375 return;
3376 d = (a > b) ? (a - b) : (b - a);
3377 if (d > (a>>3) || d > (b>>3))
3378 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3379 (unsigned long long)a, (unsigned long long)b);
3380}
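/* Worked example for the 12.5% threshold above (purely illustrative numbers):
 * a>>3 is a/8, i.e. 12.5% of a. With a = 1000000 and b = 900000 sectors,
 * d = 100000, which exceeds neither a>>3 = 125000 nor b>>3 = 112500, so
 * nothing is logged. With b = 850000, d = 150000 > a>>3 = 125000 and the
 * "Considerable difference" warning fires. */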
3381
4a76b161 3382static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3383{
4a76b161 3384 struct drbd_conf *mdev;
e658983a 3385 struct p_sizes *p = pi->data;
b411b363 3386 enum determine_dev_size dd = unchanged;
b411b363
PR
3387 sector_t p_size, p_usize, my_usize;
3388 int ldsc = 0; /* local disk size changed */
e89b591c 3389 enum dds_flags ddsf;
b411b363 3390
4a76b161
AG
3391 mdev = vnr_to_mdev(tconn, pi->vnr);
3392 if (!mdev)
3393 return config_unknown_volume(tconn, pi);
3394
b411b363
PR
3395 p_size = be64_to_cpu(p->d_size);
3396 p_usize = be64_to_cpu(p->u_size);
3397
b411b363
PR
3398 /* just store the peer's disk size for now.
3399 * we still need to figure out whether we accept that. */
3400 mdev->p_size = p_size;
3401
b411b363 3402 if (get_ldev(mdev)) {
daeda1cc
PR
3403 rcu_read_lock();
3404 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3405 rcu_read_unlock();
3406
b411b363
PR
3407 warn_if_differ_considerably(mdev, "lower level device sizes",
3408 p_size, drbd_get_max_capacity(mdev->ldev));
3409 warn_if_differ_considerably(mdev, "user requested size",
daeda1cc 3410 p_usize, my_usize);
b411b363
PR
3411
3412 /* if this is the first connect, or an otherwise expected
3413 * param exchange, choose the minimum */
3414 if (mdev->state.conn == C_WF_REPORT_PARAMS)
daeda1cc 3415 p_usize = min_not_zero(my_usize, p_usize);
b411b363
PR
3416
3417 /* Never shrink a device with usable data during connect.
3418 But allow online shrinking if we are connected. */
ef5e44a6 3419 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
daeda1cc
PR
3420 drbd_get_capacity(mdev->this_bdev) &&
3421 mdev->state.disk >= D_OUTDATED &&
3422 mdev->state.conn < C_CONNECTED) {
b411b363 3423 dev_err(DEV, "The peer's disk size is too small!\n");
38fa9988 3424 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 3425 put_ldev(mdev);
82bc0194 3426 return -EIO;
b411b363 3427 }
daeda1cc
PR
3428
3429 if (my_usize != p_usize) {
3430 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3431
3432 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3433 if (!new_disk_conf) {
3434 dev_err(DEV, "Allocation of new disk_conf failed\n");
3435 put_ldev(mdev);
3436 return -ENOMEM;
3437 }
3438
3439 mutex_lock(&mdev->tconn->conf_update);
3440 old_disk_conf = mdev->ldev->disk_conf;
3441 *new_disk_conf = *old_disk_conf;
3442 new_disk_conf->disk_size = p_usize;
3443
3444 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3445 mutex_unlock(&mdev->tconn->conf_update);
3446 synchronize_rcu();
3447 kfree(old_disk_conf);
3448
3449 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3450 (unsigned long)my_usize);
3451 }
3452
b411b363
PR
3453 put_ldev(mdev);
3454 }
b411b363 3455
e89b591c 3456 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3457 if (get_ldev(mdev)) {
24c4830c 3458 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3459 put_ldev(mdev);
3460 if (dd == dev_size_error)
82bc0194 3461 return -EIO;
b411b363
PR
3462 drbd_md_sync(mdev);
3463 } else {
3464 /* I am diskless, need to accept the peer's size. */
3465 drbd_set_my_capacity(mdev, p_size);
3466 }
3467
99432fcc
PR
3468 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3469 drbd_reconsider_max_bio_size(mdev);
3470
b411b363
PR
3471 if (get_ldev(mdev)) {
3472 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3473 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3474 ldsc = 1;
3475 }
3476
b411b363
PR
3477 put_ldev(mdev);
3478 }
3479
3480 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3481 if (be64_to_cpu(p->c_size) !=
3482 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3483 /* we have different sizes, probably peer
3484 * needs to know my new size... */
e89b591c 3485 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3486 }
3487 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3488 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3489 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3490 mdev->state.disk >= D_INCONSISTENT) {
3491 if (ddsf & DDSF_NO_RESYNC)
3492 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3493 else
3494 resync_after_online_grow(mdev);
3495 } else
b411b363
PR
3496 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3497 }
3498 }
3499
82bc0194 3500 return 0;
b411b363
PR
3501}
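/* Size negotiation sketch with invented values: if our disk_conf sets no
 * explicit size (my_usize == 0) and the peer sends u_size == 8388608
 * sectors, min_not_zero() yields 8388608; if both sides request a size,
 * the smaller one wins. The "never shrink a device with usable data
 * during connect" check above then disconnects instead of silently
 * truncating existing data. */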
3502
4a76b161 3503static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3504{
4a76b161 3505 struct drbd_conf *mdev;
e658983a 3506 struct p_uuids *p = pi->data;
b411b363 3507 u64 *p_uuid;
62b0da3a 3508 int i, updated_uuids = 0;
b411b363 3509
4a76b161
AG
3510 mdev = vnr_to_mdev(tconn, pi->vnr);
3511 if (!mdev)
3512 return config_unknown_volume(tconn, pi);
3513
b411b363
PR
3514 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3515
3516 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3517 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3518
3519 kfree(mdev->p_uuid);
3520 mdev->p_uuid = p_uuid;
3521
3522 if (mdev->state.conn < C_CONNECTED &&
3523 mdev->state.disk < D_INCONSISTENT &&
3524 mdev->state.role == R_PRIMARY &&
3525 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3526 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3527 (unsigned long long)mdev->ed_uuid);
38fa9988 3528 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3529 return -EIO;
b411b363
PR
3530 }
3531
3532 if (get_ldev(mdev)) {
3533 int skip_initial_sync =
3534 mdev->state.conn == C_CONNECTED &&
31890f4a 3535 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3536 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3537 (p_uuid[UI_FLAGS] & 8);
3538 if (skip_initial_sync) {
3539 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3540 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3541 "clear_n_write from receive_uuids",
3542 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3543 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3544 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3545 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3546 CS_VERBOSE, NULL);
3547 drbd_md_sync(mdev);
62b0da3a 3548 updated_uuids = 1;
b411b363
PR
3549 }
3550 put_ldev(mdev);
18a50fa2
PR
3551 } else if (mdev->state.disk < D_INCONSISTENT &&
3552 mdev->state.role == R_PRIMARY) {
3553 /* I am a diskless primary, the peer just created a new current UUID
3554 for me. */
62b0da3a 3555 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3556 }
3557
3558 /* Before we test for the disk state, we should wait until an eventually
3559 ongoing cluster wide state change is finished. That is important if
3560 we are primary and are detaching from our disk. We need to see the
3561 new disk state... */
8410da8f
PR
3562 mutex_lock(mdev->state_mutex);
3563 mutex_unlock(mdev->state_mutex);
b411b363 3564 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3565 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3566
3567 if (updated_uuids)
3568 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3569
82bc0194 3570 return 0;
b411b363
PR
3571}
3572
3573/**
3574 * convert_state() - Converts the peer's view of the cluster state to our point of view
3575 * @ps: The state as seen by the peer.
3576 */
3577static union drbd_state convert_state(union drbd_state ps)
3578{
3579 union drbd_state ms;
3580
3581 static enum drbd_conns c_tab[] = {
3582 [C_CONNECTED] = C_CONNECTED,
3583
3584 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3585 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3586 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3587 [C_VERIFY_S] = C_VERIFY_T,
3588 [C_MASK] = C_MASK,
3589 };
3590
3591 ms.i = ps.i;
3592
3593 ms.conn = c_tab[ps.conn];
3594 ms.peer = ps.role;
3595 ms.role = ps.peer;
3596 ms.pdsk = ps.disk;
3597 ms.disk = ps.pdsk;
3598 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3599
3600 return ms;
3601}
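/* Mirroring example for convert_state() (made-up state): a peer reporting
 * "role=Primary, peer=Secondary, disk=UpToDate, pdsk=Inconsistent" reads
 * from our side as "peer=Primary, role=Secondary, pdsk=UpToDate,
 * disk=Inconsistent"; likewise its C_STARTING_SYNC_S becomes our
 * C_STARTING_SYNC_T via c_tab[]. */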
3602
4a76b161 3603static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3604{
4a76b161 3605 struct drbd_conf *mdev;
e658983a 3606 struct p_req_state *p = pi->data;
b411b363 3607 union drbd_state mask, val;
bf885f8a 3608 enum drbd_state_rv rv;
b411b363 3609
4a76b161
AG
3610 mdev = vnr_to_mdev(tconn, pi->vnr);
3611 if (!mdev)
3612 return -EIO;
3613
b411b363
PR
3614 mask.i = be32_to_cpu(p->mask);
3615 val.i = be32_to_cpu(p->val);
3616
25703f83 3617 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
8410da8f 3618 mutex_is_locked(mdev->state_mutex)) {
b411b363 3619 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
82bc0194 3620 return 0;
b411b363
PR
3621 }
3622
3623 mask = convert_state(mask);
3624 val = convert_state(val);
3625
dfafcc8a
PR
3626 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3627 drbd_send_sr_reply(mdev, rv);
b411b363 3628
b411b363
PR
3629 drbd_md_sync(mdev);
3630
82bc0194 3631 return 0;
b411b363
PR
3632}
3633
e2857216 3634static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
dfafcc8a 3635{
e658983a 3636 struct p_req_state *p = pi->data;
dfafcc8a
PR
3637 union drbd_state mask, val;
3638 enum drbd_state_rv rv;
3639
3640 mask.i = be32_to_cpu(p->mask);
3641 val.i = be32_to_cpu(p->val);
3642
3643 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3644 mutex_is_locked(&tconn->cstate_mutex)) {
3645 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
82bc0194 3646 return 0;
dfafcc8a
PR
3647 }
3648
3649 mask = convert_state(mask);
3650 val = convert_state(val);
3651
778bcf2e 3652 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
dfafcc8a
PR
3653 conn_send_sr_reply(tconn, rv);
3654
82bc0194 3655 return 0;
dfafcc8a
PR
3656}
3657
4a76b161 3658static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3659{
4a76b161 3660 struct drbd_conf *mdev;
e658983a 3661 struct p_state *p = pi->data;
4ac4aada 3662 union drbd_state os, ns, peer_state;
b411b363 3663 enum drbd_disk_state real_peer_disk;
65d922c3 3664 enum chg_state_flags cs_flags;
b411b363
PR
3665 int rv;
3666
4a76b161
AG
3667 mdev = vnr_to_mdev(tconn, pi->vnr);
3668 if (!mdev)
3669 return config_unknown_volume(tconn, pi);
3670
b411b363
PR
3671 peer_state.i = be32_to_cpu(p->state);
3672
3673 real_peer_disk = peer_state.disk;
3674 if (peer_state.disk == D_NEGOTIATING) {
3675 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3676 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3677 }
3678
87eeee41 3679 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 3680 retry:
78bae59b 3681 os = ns = drbd_read_state(mdev);
87eeee41 3682 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 3683
e9ef7bb6
LE
3684 /* peer says his disk is uptodate, while we think it is inconsistent,
3685 * and this happens while we think we have a sync going on. */
3686 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3687 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3688 /* If we are (becoming) SyncSource, but peer is still in sync
3689 * preparation, ignore its uptodate-ness to avoid flapping, it
3690 * will change to inconsistent once the peer reaches active
3691 * syncing states.
3692 * It may have changed syncer-paused flags, however, so we
3693 * cannot ignore this completely. */
3694 if (peer_state.conn > C_CONNECTED &&
3695 peer_state.conn < C_SYNC_SOURCE)
3696 real_peer_disk = D_INCONSISTENT;
3697
3698 /* if peer_state changes to connected at the same time,
3699 * it explicitly notifies us that it finished resync.
3700 * Maybe we should finish it up, too? */
3701 else if (os.conn >= C_SYNC_SOURCE &&
3702 peer_state.conn == C_CONNECTED) {
3703 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3704 drbd_resync_finished(mdev);
82bc0194 3705 return 0;
e9ef7bb6
LE
3706 }
3707 }
3708
3709 /* peer says his disk is inconsistent, while we think it is uptodate,
3710 * and this happens while the peer still thinks we have a sync going on,
3711 * but we think we are already done with the sync.
3712 * We ignore this to avoid flapping pdsk.
3713 * This should not happen, if the peer is a recent version of drbd. */
3714 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3715 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3716 real_peer_disk = D_UP_TO_DATE;
3717
4ac4aada
LE
3718 if (ns.conn == C_WF_REPORT_PARAMS)
3719 ns.conn = C_CONNECTED;
b411b363 3720
67531718
PR
3721 if (peer_state.conn == C_AHEAD)
3722 ns.conn = C_BEHIND;
3723
b411b363
PR
3724 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3725 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3726 int cr; /* consider resync */
3727
3728 /* if we established a new connection */
4ac4aada 3729 cr = (os.conn < C_CONNECTED);
b411b363
PR
3730 /* if we had an established connection
3731 * and one of the nodes newly attaches a disk */
4ac4aada 3732 cr |= (os.conn == C_CONNECTED &&
b411b363 3733 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3734 os.disk == D_NEGOTIATING));
b411b363
PR
3735 /* if we have both been inconsistent, and the peer has been
3736 * forced to be UpToDate with --overwrite-data */
3737 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3738 /* if we had been plain connected, and the admin requested to
3739 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3740 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3741 (peer_state.conn >= C_STARTING_SYNC_S &&
3742 peer_state.conn <= C_WF_BITMAP_T));
3743
3744 if (cr)
4ac4aada 3745 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3746
3747 put_ldev(mdev);
4ac4aada
LE
3748 if (ns.conn == C_MASK) {
3749 ns.conn = C_CONNECTED;
b411b363 3750 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3751 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3752 } else if (peer_state.disk == D_NEGOTIATING) {
3753 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3754 peer_state.disk = D_DISKLESS;
580b9767 3755 real_peer_disk = D_DISKLESS;
b411b363 3756 } else {
8169e41b 3757 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
82bc0194 3758 return -EIO;
4ac4aada 3759 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
38fa9988 3760 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3761 return -EIO;
b411b363
PR
3762 }
3763 }
3764 }
3765
87eeee41 3766 spin_lock_irq(&mdev->tconn->req_lock);
78bae59b 3767 if (os.i != drbd_read_state(mdev).i)
b411b363
PR
3768 goto retry;
3769 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3770 ns.peer = peer_state.role;
3771 ns.pdsk = real_peer_disk;
3772 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3773 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3774 ns.disk = mdev->new_state_tmp.disk;
4ac4aada 3775 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
2aebfabb 3776 if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50 3777 test_bit(NEW_CUR_UUID, &mdev->flags)) {
8554df1c 3778 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3779		   for temporary network outages! */
87eeee41 3780 spin_unlock_irq(&mdev->tconn->req_lock);
481c6f50 3781 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
2f5cdd0b 3782 tl_clear(mdev->tconn);
481c6f50
PR
3783 drbd_uuid_new_current(mdev);
3784 clear_bit(NEW_CUR_UUID, &mdev->flags);
38fa9988 3785 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
82bc0194 3786 return -EIO;
481c6f50 3787 }
65d922c3 3788 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
78bae59b 3789 ns = drbd_read_state(mdev);
87eeee41 3790 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3791
3792 if (rv < SS_SUCCESS) {
38fa9988 3793 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3794 return -EIO;
b411b363
PR
3795 }
3796
4ac4aada
LE
3797 if (os.conn > C_WF_REPORT_PARAMS) {
3798 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3799 peer_state.disk != D_NEGOTIATING ) {
3800 /* we want resync, peer has not yet decided to sync... */
3801 /* Nowadays only used when forcing a node into primary role and
3802 setting its disk to UpToDate with that */
3803 drbd_send_uuids(mdev);
3804 drbd_send_state(mdev);
3805 }
3806 }
3807
a0095508 3808 mutex_lock(&mdev->tconn->conf_update);
6139f60d 3809 mdev->tconn->net_conf->discard_my_data = 0; /* without copy; single bit op is atomic */
a0095508 3810 mutex_unlock(&mdev->tconn->conf_update);
b411b363
PR
3811
3812 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3813
82bc0194 3814 return 0;
b411b363
PR
3815}
3816
4a76b161 3817static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3818{
4a76b161 3819 struct drbd_conf *mdev;
e658983a 3820 struct p_rs_uuid *p = pi->data;
4a76b161
AG
3821
3822 mdev = vnr_to_mdev(tconn, pi->vnr);
3823 if (!mdev)
3824 return -EIO;
b411b363
PR
3825
3826 wait_event(mdev->misc_wait,
3827 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3828 mdev->state.conn == C_BEHIND ||
b411b363
PR
3829 mdev->state.conn < C_CONNECTED ||
3830 mdev->state.disk < D_NEGOTIATING);
3831
3832 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3833
b411b363
PR
3834 /* Here the _drbd_uuid_ functions are right, current should
3835 _not_ be rotated into the history */
3836 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3837 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3838 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3839
62b0da3a 3840 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3841 drbd_start_resync(mdev, C_SYNC_TARGET);
3842
3843 put_ldev(mdev);
3844 } else
3845 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3846
82bc0194 3847 return 0;
b411b363
PR
3848}
3849
2c46407d
AG
3850/**
3851 * receive_bitmap_plain
3852 *
3853 * Return 0 when done, 1 when another iteration is needed, and a negative error
3854 * code upon failure.
3855 */
3856static int
50d0b1ad 3857receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
e658983a 3858 unsigned long *p, struct bm_xfer_ctx *c)
b411b363 3859{
50d0b1ad
AG
3860 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3861 drbd_header_size(mdev->tconn);
e658983a 3862 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
50d0b1ad 3863 c->bm_words - c->word_offset);
e658983a 3864 unsigned int want = num_words * sizeof(*p);
2c46407d 3865 int err;
b411b363 3866
50d0b1ad
AG
3867 if (want != size) {
3868 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
2c46407d 3869 return -EIO;
b411b363
PR
3870 }
3871 if (want == 0)
2c46407d 3872 return 0;
e658983a 3873 err = drbd_recv_all(mdev->tconn, p, want);
82bc0194 3874 if (err)
2c46407d 3875 return err;
b411b363 3876
e658983a 3877 drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
b411b363
PR
3878
3879 c->word_offset += num_words;
3880 c->bit_offset = c->word_offset * BITS_PER_LONG;
3881 if (c->bit_offset > c->bm_bits)
3882 c->bit_offset = c->bm_bits;
3883
2c46407d 3884 return 1;
b411b363
PR
3885}
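/* Rough sizing sketch; the concrete numbers depend on DRBD_SOCKET_BUFFER_SIZE
 * and the negotiated header size, so treat them as assumptions: with a
 * 4096 byte socket buffer and a 16 byte header, data_size is 4080 bytes,
 * i.e. 510 eight-byte words per plain P_BITMAP packet on a 64-bit build,
 * so a bitmap of 100000 words takes about 197 such packets unless RLE
 * compression kicks in. */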
3886
a02d1240
AG
3887static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3888{
3889 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3890}
3891
3892static int dcbp_get_start(struct p_compressed_bm *p)
3893{
3894 return (p->encoding & 0x80) != 0;
3895}
3896
3897static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3898{
3899 return (p->encoding >> 4) & 0x7;
3900}
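/* Layout of p->encoding as decoded by the three helpers above:
 *   bit 7     - "start" toggle for the first run (dcbp_get_start)
 *   bits 6..4 - number of pad bits at the end (dcbp_get_pad_bits)
 *   bits 3..0 - bitmap encoding code (dcbp_get_code)
 * Example (arbitrary value): encoding == 0xb2 decodes to start=1,
 * 3 pad bits, code 2. */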
3901
2c46407d
AG
3902/**
3903 * recv_bm_rle_bits
3904 *
3905 * Return 0 when done, 1 when another iteration is needed, and a negative error
3906 * code upon failure.
3907 */
3908static int
b411b363
PR
3909recv_bm_rle_bits(struct drbd_conf *mdev,
3910 struct p_compressed_bm *p,
c6d25cfe
PR
3911 struct bm_xfer_ctx *c,
3912 unsigned int len)
b411b363
PR
3913{
3914 struct bitstream bs;
3915 u64 look_ahead;
3916 u64 rl;
3917 u64 tmp;
3918 unsigned long s = c->bit_offset;
3919 unsigned long e;
a02d1240 3920 int toggle = dcbp_get_start(p);
b411b363
PR
3921 int have;
3922 int bits;
3923
a02d1240 3924 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
b411b363
PR
3925
3926 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3927 if (bits < 0)
2c46407d 3928 return -EIO;
b411b363
PR
3929
3930 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3931 bits = vli_decode_bits(&rl, look_ahead);
3932 if (bits <= 0)
2c46407d 3933 return -EIO;
b411b363
PR
3934
3935 if (toggle) {
3936 e = s + rl -1;
3937 if (e >= c->bm_bits) {
3938 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3939 return -EIO;
b411b363
PR
3940 }
3941 _drbd_bm_set_bits(mdev, s, e);
3942 }
3943
3944 if (have < bits) {
3945 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3946 have, bits, look_ahead,
3947 (unsigned int)(bs.cur.b - p->code),
3948 (unsigned int)bs.buf_len);
2c46407d 3949 return -EIO;
b411b363
PR
3950 }
3951 look_ahead >>= bits;
3952 have -= bits;
3953
3954 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3955 if (bits < 0)
2c46407d 3956 return -EIO;
b411b363
PR
3957 look_ahead |= tmp << have;
3958 have += bits;
3959 }
3960
3961 c->bit_offset = s;
3962 bm_xfer_ctx_bit_to_word_offset(c);
3963
2c46407d 3964 return (s != c->bm_bits);
b411b363
PR
3965}
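/* Decoding walk-through with invented numbers: assume dcbp_get_start(p)
 * returned 0 and the VLI stream decodes to run lengths 1000, 20, 5000.
 * Starting at c->bit_offset == 0, the loop skips bits 0..999 (toggle 0),
 * marks bits 1000..1019 out of sync via _drbd_bm_set_bits() (toggle 1),
 * skips bits 1020..6019, and leaves c->bit_offset at 6020; the return
 * value is 1 unless that equals c->bm_bits, i.e. the transfer is done. */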
3966
2c46407d
AG
3967/**
3968 * decode_bitmap_c
3969 *
3970 * Return 0 when done, 1 when another iteration is needed, and a negative error
3971 * code upon failure.
3972 */
3973static int
b411b363
PR
3974decode_bitmap_c(struct drbd_conf *mdev,
3975 struct p_compressed_bm *p,
c6d25cfe
PR
3976 struct bm_xfer_ctx *c,
3977 unsigned int len)
b411b363 3978{
a02d1240 3979 if (dcbp_get_code(p) == RLE_VLI_Bits)
e658983a 3980 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
b411b363
PR
3981
3982 /* other variants had been implemented for evaluation,
3983 * but have been dropped as this one turned out to be "best"
3984 * during all our tests. */
3985
3986 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
38fa9988 3987 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
2c46407d 3988 return -EIO;
b411b363
PR
3989}
3990
3991void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3992 const char *direction, struct bm_xfer_ctx *c)
3993{
3994 /* what would it take to transfer it "plaintext" */
50d0b1ad
AG
3995 unsigned int header_size = drbd_header_size(mdev->tconn);
3996 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3997 unsigned int plain =
3998 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3999 c->bm_words * sizeof(unsigned long);
4000 unsigned int total = c->bytes[0] + c->bytes[1];
4001 unsigned int r;
b411b363
PR
4002
4003 /* total can not be zero. but just in case: */
4004 if (total == 0)
4005 return;
4006
4007 /* don't report if not compressed */
4008 if (total >= plain)
4009 return;
4010
4011 /* total < plain. check for overflow, still */
4012 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4013 : (1000 * total / plain);
4014
4015 if (r > 1000)
4016 r = 1000;
4017
4018 r = 1000 - r;
4019 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4020 "total %u; compression: %u.%u%%\n",
4021 direction,
4022 c->bytes[1], c->packets[1],
4023 c->bytes[0], c->packets[0],
4024 total, r/10, r % 10);
4025}
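/* Ratio arithmetic example (invented byte counts): with total == 1234 and
 * plain == 52000, r = 1000 * 1234 / 52000 = 23, so 1000 - r = 977 and the
 * message reads "compression: 97.7%". The UINT_MAX/1000 test above only
 * falls back to the coarser total / (plain/1000) form when multiplying
 * total by 1000 would overflow an unsigned int. */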
4026
4027/* Since we are processing the bitfield from lower addresses to higher,
4028   it does not matter whether we process it in 32 bit or 64 bit
4029   chunks, as long as it is little endian. (Understand it as a byte stream,
4030   beginning with the lowest byte...) If we used big endian
4031   we would need to process it from the highest address to the lowest,
4032 in order to be agnostic to the 32 vs 64 bits issue.
4033
4034 returns 0 on failure, 1 if we successfully received it. */
4a76b161 4035static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4036{
4a76b161 4037 struct drbd_conf *mdev;
b411b363 4038 struct bm_xfer_ctx c;
2c46407d 4039 int err;
4a76b161
AG
4040
4041 mdev = vnr_to_mdev(tconn, pi->vnr);
4042 if (!mdev)
4043 return -EIO;
b411b363 4044
20ceb2b2
LE
4045 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4046 /* you are supposed to send additional out-of-sync information
4047 * if you actually set bits during this phase */
b411b363 4048
b411b363
PR
4049 c = (struct bm_xfer_ctx) {
4050 .bm_bits = drbd_bm_bits(mdev),
4051 .bm_words = drbd_bm_words(mdev),
4052 };
4053
2c46407d 4054 for(;;) {
e658983a
AG
4055 if (pi->cmd == P_BITMAP)
4056 err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4057 else if (pi->cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
4058 /* MAYBE: sanity check that we speak proto >= 90,
4059 * and the feature is enabled! */
e658983a 4060 struct p_compressed_bm *p = pi->data;
b411b363 4061
50d0b1ad 4062 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
b411b363 4063 dev_err(DEV, "ReportCBitmap packet too large\n");
82bc0194 4064 err = -EIO;
b411b363
PR
4065 goto out;
4066 }
e658983a 4067 if (pi->size <= sizeof(*p)) {
e2857216 4068 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
82bc0194 4069 err = -EIO;
78fcbdae 4070 goto out;
b411b363 4071 }
e658983a
AG
4072 err = drbd_recv_all(mdev->tconn, p, pi->size);
4073 if (err)
4074 goto out;
e2857216 4075 err = decode_bitmap_c(mdev, p, &c, pi->size);
b411b363 4076 } else {
e2857216 4077 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
82bc0194 4078 err = -EIO;
b411b363
PR
4079 goto out;
4080 }
4081
e2857216 4082 c.packets[pi->cmd == P_BITMAP]++;
50d0b1ad 4083 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
b411b363 4084
2c46407d
AG
4085 if (err <= 0) {
4086 if (err < 0)
4087 goto out;
b411b363 4088 break;
2c46407d 4089 }
e2857216 4090 err = drbd_recv_header(mdev->tconn, pi);
82bc0194 4091 if (err)
b411b363 4092 goto out;
2c46407d 4093 }
b411b363
PR
4094
4095 INFO_bm_xfer_stats(mdev, "receive", &c);
4096
4097 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
4098 enum drbd_state_rv rv;
4099
82bc0194
AG
4100 err = drbd_send_bitmap(mdev);
4101 if (err)
b411b363
PR
4102 goto out;
4103 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
4104 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4105 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
4106 } else if (mdev->state.conn != C_WF_BITMAP_S) {
4107 /* admin may have requested C_DISCONNECTING,
4108 * other threads may have noticed network errors */
4109 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4110 drbd_conn_str(mdev->state.conn));
4111 }
82bc0194 4112 err = 0;
b411b363 4113
b411b363 4114 out:
20ceb2b2 4115 drbd_bm_unlock(mdev);
82bc0194 4116 if (!err && mdev->state.conn == C_WF_BITMAP_S)
b411b363 4117 drbd_start_resync(mdev, C_SYNC_SOURCE);
82bc0194 4118 return err;
b411b363
PR
4119}
4120
4a76b161 4121static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4122{
4a76b161 4123 conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
e2857216 4124 pi->cmd, pi->size);
2de876ef 4125
4a76b161 4126 return ignore_remaining_packet(tconn, pi);
2de876ef
PR
4127}
4128
4a76b161 4129static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
0ced55a3 4130{
e7f52dfb
LE
4131 /* Make sure we've acked all the TCP data associated
4132 * with the data requests being unplugged */
4a76b161 4133 drbd_tcp_quickack(tconn->data.socket);
0ced55a3 4134
82bc0194 4135 return 0;
0ced55a3
PR
4136}
4137
4a76b161 4138static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
73a01a18 4139{
4a76b161 4140 struct drbd_conf *mdev;
e658983a 4141 struct p_block_desc *p = pi->data;
4a76b161
AG
4142
4143 mdev = vnr_to_mdev(tconn, pi->vnr);
4144 if (!mdev)
4145 return -EIO;
73a01a18 4146
f735e363
LE
4147 switch (mdev->state.conn) {
4148 case C_WF_SYNC_UUID:
4149 case C_WF_BITMAP_T:
4150 case C_BEHIND:
4151 break;
4152 default:
4153 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4154 drbd_conn_str(mdev->state.conn));
4155 }
4156
73a01a18
PR
4157 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4158
82bc0194 4159 return 0;
73a01a18
PR
4160}
4161
02918be2
PR
4162struct data_cmd {
4163 int expect_payload;
4164 size_t pkt_size;
4a76b161 4165 int (*fn)(struct drbd_tconn *, struct packet_info *);
02918be2
PR
4166};
4167
4168static struct data_cmd drbd_cmd_handler[] = {
4a76b161
AG
4169 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4170 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4171 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4172 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
e658983a
AG
4173 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4174 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4175 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4a76b161
AG
4176 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4177 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
e658983a
AG
4178 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4179 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4a76b161
AG
4180 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4181 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4182 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4183 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4184 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4185 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4186 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4187 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4188 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4189 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4190 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4191 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
b411b363
PR
4192};
4193
eefc2f7d 4194static void drbdd(struct drbd_tconn *tconn)
b411b363 4195{
77351055 4196 struct packet_info pi;
02918be2 4197 size_t shs; /* sub header size */
82bc0194 4198 int err;
b411b363 4199
eefc2f7d 4200 while (get_t_state(&tconn->receiver) == RUNNING) {
deebe195
AG
4201 struct data_cmd *cmd;
4202
eefc2f7d 4203 drbd_thread_current_set_cpu(&tconn->receiver);
69bc7bc3 4204 if (drbd_recv_header(tconn, &pi))
02918be2 4205 goto err_out;
b411b363 4206
deebe195 4207 cmd = &drbd_cmd_handler[pi.cmd];
4a76b161 4208 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
eefc2f7d 4209 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
02918be2 4210 goto err_out;
0b33a916 4211 }
b411b363 4212
e658983a
AG
4213 shs = cmd->pkt_size;
4214 if (pi.size > shs && !cmd->expect_payload) {
eefc2f7d 4215 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
02918be2 4216 goto err_out;
b411b363 4217 }
b411b363 4218
c13f7e1a 4219 if (shs) {
e658983a 4220 err = drbd_recv_all_warn(tconn, pi.data, shs);
a5c31904 4221 if (err)
c13f7e1a 4222 goto err_out;
e2857216 4223 pi.size -= shs;
c13f7e1a
LE
4224 }
4225
4a76b161
AG
4226 err = cmd->fn(tconn, &pi);
4227 if (err) {
9f5bdc33
AG
4228 conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4229 cmdname(pi.cmd), err, pi.size);
02918be2 4230 goto err_out;
b411b363
PR
4231 }
4232 }
82bc0194 4233 return;
b411b363 4234
82bc0194
AG
4235 err_out:
4236 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
b411b363
PR
4237}
4238
0e29d163 4239void conn_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
4240{
4241 struct drbd_wq_barrier barr;
4242
4243 barr.w.cb = w_prev_work_done;
0e29d163 4244 barr.w.tconn = tconn;
b411b363 4245 init_completion(&barr.done);
0e29d163 4246 drbd_queue_work(&tconn->data.work, &barr.w);
b411b363
PR
4247 wait_for_completion(&barr.done);
4248}
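/* The function above is the usual flush-by-barrier pattern: w_prev_work_done
 * (implemented in the worker code) just completes barr.done, and since the
 * worker drains its queue in order, wait_for_completion() returns only after
 * every work item queued on tconn->data.work before this call has run.
 * Callers rely on that ordering rather than on any additional lock. */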
4249
81fa2e67 4250static void conn_disconnect(struct drbd_tconn *tconn)
b411b363 4251{
c141ebda 4252 struct drbd_conf *mdev;
bbeb641c 4253 enum drbd_conns oc;
c141ebda 4254 int vnr, rv = SS_UNKNOWN_ERROR;
b411b363 4255
bbeb641c 4256 if (tconn->cstate == C_STANDALONE)
b411b363 4257 return;
b411b363
PR
4258
4259 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
4260 drbd_thread_stop(&tconn->asender);
4261 drbd_free_sock(tconn);
4262
c141ebda
PR
4263 rcu_read_lock();
4264 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4265 kref_get(&mdev->kref);
4266 rcu_read_unlock();
4267 drbd_disconnected(mdev);
4268 kref_put(&mdev->kref, &drbd_minor_destroy);
4269 rcu_read_lock();
4270 }
4271 rcu_read_unlock();
4272
360cc740
PR
4273 conn_info(tconn, "Connection closed\n");
4274
cb703454
PR
4275 if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4276 conn_try_outdate_peer_async(tconn);
4277
360cc740 4278 spin_lock_irq(&tconn->req_lock);
bbeb641c
PR
4279 oc = tconn->cstate;
4280 if (oc >= C_UNCONNECTED)
4281 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4282
360cc740
PR
4283 spin_unlock_irq(&tconn->req_lock);
4284
f3dfa40a 4285 if (oc == C_DISCONNECTING)
d9cc6e23 4286 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
360cc740
PR
4287}
4288
c141ebda 4289static int drbd_disconnected(struct drbd_conf *mdev)
360cc740 4290{
360cc740
PR
4291 enum drbd_fencing_p fp;
4292 unsigned int i;
b411b363 4293
85719573 4294 /* wait for current activity to cease. */
87eeee41 4295 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
4296 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4297 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4298 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 4299 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4300
4301 /* We do not have data structures that would allow us to
4302 * get the rs_pending_cnt down to 0 again.
4303 * * On C_SYNC_TARGET we do not have any data structures describing
4304	 *  the pending RSDataRequests we have sent.
4305 * * On C_SYNC_SOURCE there is no data structure that tracks
4306 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4307 * And no, it is not the sum of the reference counts in the
4308 * resync_LRU. The resync_LRU tracks the whole operation including
4309 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4310 * on the fly. */
4311 drbd_rs_cancel_all(mdev);
4312 mdev->rs_total = 0;
4313 mdev->rs_failed = 0;
4314 atomic_set(&mdev->rs_pending_cnt, 0);
4315 wake_up(&mdev->misc_wait);
4316
7fde2be9
PR
4317 del_timer(&mdev->request_timer);
4318
b411b363 4319 del_timer_sync(&mdev->resync_timer);
b411b363
PR
4320 resync_timer_fn((unsigned long)mdev);
4321
b411b363
PR
4322 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4323 * w_make_resync_request etc. which may still be on the worker queue
4324 * to be "canceled" */
a21e9298 4325 drbd_flush_workqueue(mdev);
b411b363 4326
a990be46 4327 drbd_finish_peer_reqs(mdev);
b411b363
PR
4328
4329 kfree(mdev->p_uuid);
4330 mdev->p_uuid = NULL;
4331
2aebfabb 4332 if (!drbd_suspended(mdev))
2f5cdd0b 4333 tl_clear(mdev->tconn);
b411b363 4334
b411b363
PR
4335 drbd_md_sync(mdev);
4336
4337 fp = FP_DONT_CARE;
4338 if (get_ldev(mdev)) {
daeda1cc
PR
4339 rcu_read_lock();
4340 fp = rcu_dereference(mdev->ldev->disk_conf)->fencing;
4341 rcu_read_unlock();
b411b363
PR
4342 put_ldev(mdev);
4343 }
4344
20ceb2b2
LE
4345 /* serialize with bitmap writeout triggered by the state change,
4346 * if any. */
4347 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4348
b411b363
PR
4349 /* tcp_close and release of sendpage pages can be deferred. I don't
4350 * want to use SO_LINGER, because apparently it can be deferred for
4351 * more than 20 seconds (longest time I checked).
4352 *
4353 * Actually we don't care for exactly when the network stack does its
4354 * put_page(), but release our reference on these pages right here.
4355 */
7721f567 4356 i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
b411b363
PR
4357 if (i)
4358 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
4359 i = atomic_read(&mdev->pp_in_use_by_net);
4360 if (i)
4361 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
4362 i = atomic_read(&mdev->pp_in_use);
4363 if (i)
45bb912b 4364 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
4365
4366 D_ASSERT(list_empty(&mdev->read_ee));
4367 D_ASSERT(list_empty(&mdev->active_ee));
4368 D_ASSERT(list_empty(&mdev->sync_ee));
4369 D_ASSERT(list_empty(&mdev->done_ee));
4370
4371 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4372 atomic_set(&mdev->current_epoch->epoch_size, 0);
4373 D_ASSERT(list_empty(&mdev->current_epoch->list));
360cc740
PR
4374
4375 return 0;
b411b363
PR
4376}
4377
4378/*
4379 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4380 * we can agree on is stored in agreed_pro_version.
4381 *
4382 * feature flags and the reserved array should be enough room for future
4383 * enhancements of the handshake protocol, and possible plugins...
4384 *
4385 * for now, they are expected to be zero, but ignored.
4386 */
6038178e 4387static int drbd_send_features(struct drbd_tconn *tconn)
b411b363 4388{
9f5bdc33
AG
4389 struct drbd_socket *sock;
4390 struct p_connection_features *p;
b411b363 4391
9f5bdc33
AG
4392 sock = &tconn->data;
4393 p = conn_prepare_command(tconn, sock);
4394 if (!p)
e8d17b01 4395 return -EIO;
b411b363
PR
4396 memset(p, 0, sizeof(*p));
4397 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4398 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
9f5bdc33 4399 return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
b411b363
PR
4400}
4401
4402/*
4403 * return values:
4404 * 1 yes, we have a valid connection
4405 * 0 oops, did not work out, please try again
4406 * -1 peer talks different language,
4407 * no point in trying again, please go standalone.
4408 */
6038178e 4409static int drbd_do_features(struct drbd_tconn *tconn)
b411b363 4410{
65d11ed6 4411 /* ASSERT current == tconn->receiver ... */
e658983a
AG
4412 struct p_connection_features *p;
4413 const int expect = sizeof(struct p_connection_features);
77351055 4414 struct packet_info pi;
a5c31904 4415 int err;
b411b363 4416
6038178e 4417 err = drbd_send_features(tconn);
e8d17b01 4418 if (err)
b411b363
PR
4419 return 0;
4420
69bc7bc3
AG
4421 err = drbd_recv_header(tconn, &pi);
4422 if (err)
b411b363
PR
4423 return 0;
4424
6038178e
AG
4425 if (pi.cmd != P_CONNECTION_FEATURES) {
4426 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
77351055 4427 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4428 return -1;
4429 }
4430
77351055 4431 if (pi.size != expect) {
6038178e 4432 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
77351055 4433 expect, pi.size);
b411b363
PR
4434 return -1;
4435 }
4436
e658983a
AG
4437 p = pi.data;
4438 err = drbd_recv_all_warn(tconn, p, expect);
a5c31904 4439 if (err)
b411b363 4440 return 0;
b411b363 4441
b411b363
PR
4442 p->protocol_min = be32_to_cpu(p->protocol_min);
4443 p->protocol_max = be32_to_cpu(p->protocol_max);
4444 if (p->protocol_max == 0)
4445 p->protocol_max = p->protocol_min;
4446
4447 if (PRO_VERSION_MAX < p->protocol_min ||
4448 PRO_VERSION_MIN > p->protocol_max)
4449 goto incompat;
4450
65d11ed6 4451 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363 4452
65d11ed6
PR
4453 conn_info(tconn, "Handshake successful: "
4454 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
b411b363
PR
4455
4456 return 1;
4457
4458 incompat:
65d11ed6 4459 conn_err(tconn, "incompatible DRBD dialects: "
b411b363
PR
4460 "I support %d-%d, peer supports %d-%d\n",
4461 PRO_VERSION_MIN, PRO_VERSION_MAX,
4462 p->protocol_min, p->protocol_max);
4463 return -1;
4464}
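/* Negotiation example; the real PRO_VERSION_MIN/MAX depend on the build, so
 * the numbers are only illustrative: if we support 86..100 and the peer
 * announces protocol_min=90, protocol_max=96, the ranges overlap and
 * agreed_pro_version becomes min(100, 96) = 96. A peer announcing 70..85
 * would take the "incompat" path instead. */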
4465
4466#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4467static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4468{
4469	conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4470	conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4471 return -1;
b411b363
PR
4472}
4473#else
4474#define CHALLENGE_LEN 64
b10d96cb
JT
4475
4476/* Return value:
4477 1 - auth succeeded,
4478 0 - failed, try again (network error),
4479 -1 - auth failed, don't try again.
4480*/
4481
13e6037d 4482static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363 4483{
9f5bdc33 4484 struct drbd_socket *sock;
b411b363
PR
4485 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4486 struct scatterlist sg;
4487 char *response = NULL;
4488 char *right_response = NULL;
4489 char *peers_ch = NULL;
44ed167d
PR
4490 unsigned int key_len;
4491 char secret[SHARED_SECRET_MAX]; /* 64 byte */
b411b363
PR
4492 unsigned int resp_size;
4493 struct hash_desc desc;
77351055 4494 struct packet_info pi;
44ed167d 4495 struct net_conf *nc;
69bc7bc3 4496 int err, rv;
b411b363 4497
9f5bdc33
AG
4498 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
4499
44ed167d
PR
4500 rcu_read_lock();
4501 nc = rcu_dereference(tconn->net_conf);
4502 key_len = strlen(nc->shared_secret);
4503 memcpy(secret, nc->shared_secret, key_len);
4504 rcu_read_unlock();
4505
13e6037d 4506 desc.tfm = tconn->cram_hmac_tfm;
b411b363
PR
4507 desc.flags = 0;
4508
44ed167d 4509 rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
b411b363 4510 if (rv) {
13e6037d 4511 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4512 rv = -1;
b411b363
PR
4513 goto fail;
4514 }
4515
4516 get_random_bytes(my_challenge, CHALLENGE_LEN);
4517
9f5bdc33
AG
4518 sock = &tconn->data;
4519 if (!conn_prepare_command(tconn, sock)) {
4520 rv = 0;
4521 goto fail;
4522 }
e658983a 4523 rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
9f5bdc33 4524 my_challenge, CHALLENGE_LEN);
b411b363
PR
4525 if (!rv)
4526 goto fail;
4527
69bc7bc3
AG
4528 err = drbd_recv_header(tconn, &pi);
4529 if (err) {
4530 rv = 0;
b411b363 4531 goto fail;
69bc7bc3 4532 }
b411b363 4533
77351055 4534 if (pi.cmd != P_AUTH_CHALLENGE) {
13e6037d 4535 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
77351055 4536 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4537 rv = 0;
4538 goto fail;
4539 }
4540
77351055 4541 if (pi.size > CHALLENGE_LEN * 2) {
13e6037d 4542 conn_err(tconn, "expected AuthChallenge payload too big.\n");
b10d96cb 4543 rv = -1;
b411b363
PR
4544 goto fail;
4545 }
4546
77351055 4547 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 4548 if (peers_ch == NULL) {
13e6037d 4549 conn_err(tconn, "kmalloc of peers_ch failed\n");
b10d96cb 4550 rv = -1;
b411b363
PR
4551 goto fail;
4552 }
4553
a5c31904
AG
4554 err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4555 if (err) {
b411b363
PR
4556 rv = 0;
4557 goto fail;
4558 }
4559
13e6037d 4560 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
b411b363
PR
4561 response = kmalloc(resp_size, GFP_NOIO);
4562 if (response == NULL) {
13e6037d 4563 conn_err(tconn, "kmalloc of response failed\n");
b10d96cb 4564 rv = -1;
b411b363
PR
4565 goto fail;
4566 }
4567
4568 sg_init_table(&sg, 1);
77351055 4569 sg_set_buf(&sg, peers_ch, pi.size);
b411b363
PR
4570
4571 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4572 if (rv) {
13e6037d 4573 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4574 rv = -1;
b411b363
PR
4575 goto fail;
4576 }
4577
9f5bdc33
AG
4578 if (!conn_prepare_command(tconn, sock)) {
4579 rv = 0;
4580 goto fail;
4581 }
e658983a 4582 rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
9f5bdc33 4583 response, resp_size);
b411b363
PR
4584 if (!rv)
4585 goto fail;
4586
69bc7bc3
AG
4587 err = drbd_recv_header(tconn, &pi);
4588 if (err) {
4589 rv = 0;
b411b363 4590 goto fail;
69bc7bc3 4591 }
b411b363 4592
77351055 4593 if (pi.cmd != P_AUTH_RESPONSE) {
13e6037d 4594 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
77351055 4595 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4596 rv = 0;
4597 goto fail;
4598 }
4599
77351055 4600 if (pi.size != resp_size) {
13e6037d 4601 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
b411b363
PR
4602 rv = 0;
4603 goto fail;
4604 }
4605
a5c31904
AG
4606 err = drbd_recv_all_warn(tconn, response , resp_size);
4607 if (err) {
b411b363
PR
4608 rv = 0;
4609 goto fail;
4610 }
4611
4612 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4613 if (right_response == NULL) {
13e6037d 4614 conn_err(tconn, "kmalloc of right_response failed\n");
b10d96cb 4615 rv = -1;
b411b363
PR
4616 goto fail;
4617 }
4618
4619 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4620
4621 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4622 if (rv) {
13e6037d 4623 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4624 rv = -1;
b411b363
PR
4625 goto fail;
4626 }
4627
4628 rv = !memcmp(response, right_response, resp_size);
4629
4630 if (rv)
44ed167d
PR
4631 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4632 resp_size);
b10d96cb
JT
4633 else
4634 rv = -1;
b411b363
PR
4635
4636 fail:
4637 kfree(peers_ch);
4638 kfree(response);
4639 kfree(right_response);
4640
4641 return rv;
4642}
4643#endif
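/* Sketch of the exchange implemented above (message names as in the code,
 * byte counts per CHALLENGE_LEN):
 *
 *   A -> B : P_AUTH_CHALLENGE, 64 random bytes chal_A
 *   B -> A : P_AUTH_RESPONSE,  HMAC(shared_secret, chal_A)
 *
 * and the same with the roles swapped, since both peers run drbd_do_auth().
 * Each side recomputes the HMAC over its own challenge ("right_response");
 * only a memcmp()-equal reply authenticates the peer, and a mismatch
 * returns -1 so the caller gives up instead of retrying. */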
4644
4645int drbdd_init(struct drbd_thread *thi)
4646{
392c8801 4647 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4648 int h;
4649
4d641dd7 4650 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4651
4652 do {
81fa2e67 4653 h = conn_connect(tconn);
b411b363 4654 if (h == 0) {
81fa2e67 4655 conn_disconnect(tconn);
20ee6390 4656 schedule_timeout_interruptible(HZ);
b411b363
PR
4657 }
4658 if (h == -1) {
4d641dd7 4659 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4660 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4661 }
4662 } while (h == 0);
4663
91fd4dad
PR
4664 if (h > 0)
4665 drbdd(tconn);
b411b363 4666
81fa2e67 4667 conn_disconnect(tconn);
b411b363 4668
4d641dd7 4669 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4670 return 0;
4671}
4672
4673/* ********* acknowledge sender ******** */
4674
e05e1e59 4675static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
e4f78ede 4676{
e658983a 4677 struct p_req_state_reply *p = pi->data;
e4f78ede
PR
4678 int retcode = be32_to_cpu(p->retcode);
4679
4680 if (retcode >= SS_SUCCESS) {
4681 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4682 } else {
4683 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4684 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4685 drbd_set_st_err_str(retcode), retcode);
4686 }
4687 wake_up(&tconn->ping_wait);
4688
2735a594 4689 return 0;
e4f78ede
PR
4690}
4691
1952e916 4692static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4693{
1952e916 4694 struct drbd_conf *mdev;
e658983a 4695 struct p_req_state_reply *p = pi->data;
b411b363
PR
4696 int retcode = be32_to_cpu(p->retcode);
4697
1952e916
AG
4698 mdev = vnr_to_mdev(tconn, pi->vnr);
4699 if (!mdev)
2735a594 4700 return -EIO;
1952e916 4701
e4f78ede
PR
4702 if (retcode >= SS_SUCCESS) {
4703 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4704 } else {
4705 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4706 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4707 drbd_set_st_err_str(retcode), retcode);
b411b363 4708 }
e4f78ede
PR
4709 wake_up(&mdev->state_wait);
4710
2735a594 4711 return 0;
b411b363
PR
4712}
4713
e05e1e59 4714static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4715{
2735a594 4716 return drbd_send_ping_ack(tconn);
b411b363
PR
4717
4718}
4719
e05e1e59 4720static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363
PR
4721{
4722 /* restore idle timeout */
2a67d8b9
PR
4723 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4724 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4725 wake_up(&tconn->ping_wait);
b411b363 4726
2735a594 4727 return 0;
b411b363
PR
4728}
4729
1952e916 4730static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4731{
1952e916 4732 struct drbd_conf *mdev;
e658983a 4733 struct p_block_ack *p = pi->data;
b411b363
PR
4734 sector_t sector = be64_to_cpu(p->sector);
4735 int blksize = be32_to_cpu(p->blksize);
4736
1952e916
AG
4737 mdev = vnr_to_mdev(tconn, pi->vnr);
4738 if (!mdev)
2735a594 4739 return -EIO;
1952e916 4740
31890f4a 4741 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4742
4743 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4744
1d53f09e
LE
4745 if (get_ldev(mdev)) {
4746 drbd_rs_complete_io(mdev, sector);
4747 drbd_set_in_sync(mdev, sector, blksize);
4748 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4749 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4750 put_ldev(mdev);
4751 }
b411b363 4752 dec_rs_pending(mdev);
778f271d 4753 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4754
2735a594 4755 return 0;
b411b363
PR
4756}
4757
bc9c5c41
AG
4758static int
4759validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4760 struct rb_root *root, const char *func,
4761 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4762{
4763 struct drbd_request *req;
4764 struct bio_and_error m;
4765
87eeee41 4766 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4767 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4768 if (unlikely(!req)) {
87eeee41 4769 spin_unlock_irq(&mdev->tconn->req_lock);
85997675 4770 return -EIO;
b411b363
PR
4771 }
4772 __req_mod(req, what, &m);
87eeee41 4773 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4774
4775 if (m.bio)
4776 complete_master_bio(mdev, &m);
85997675 4777 return 0;
b411b363
PR
4778}
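/* Typical use, as in the ack handlers below: look the request up by
 * (block_id, sector) in mdev->write_requests or mdev->read_requests under
 * req_lock, feed the matching drbd_req_event into __req_mod(), and complete
 * the master bio outside the lock if that transition finished it.
 * missing_ok is for cases where the peer may legitimately ack a request we
 * have already completed, e.g. the protocol A negative-ack path. */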
4779
1952e916 4780static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4781{
1952e916 4782 struct drbd_conf *mdev;
e658983a 4783 struct p_block_ack *p = pi->data;
b411b363
PR
4784 sector_t sector = be64_to_cpu(p->sector);
4785 int blksize = be32_to_cpu(p->blksize);
4786 enum drbd_req_event what;
4787
1952e916
AG
4788 mdev = vnr_to_mdev(tconn, pi->vnr);
4789 if (!mdev)
2735a594 4790 return -EIO;
1952e916 4791
b411b363
PR
4792 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4793
579b57ed 4794 if (p->block_id == ID_SYNCER) {
b411b363
PR
4795 drbd_set_in_sync(mdev, sector, blksize);
4796 dec_rs_pending(mdev);
2735a594 4797 return 0;
b411b363 4798 }
e05e1e59 4799 switch (pi->cmd) {
b411b363 4800 case P_RS_WRITE_ACK:
8554df1c 4801 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4802 break;
4803 case P_WRITE_ACK:
8554df1c 4804 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4805 break;
4806 case P_RECV_ACK:
8554df1c 4807 what = RECV_ACKED_BY_PEER;
b411b363 4808 break;
7be8da07 4809 case P_DISCARD_WRITE:
7be8da07
AG
4810 what = DISCARD_WRITE;
4811 break;
4812 case P_RETRY_WRITE:
7be8da07 4813 what = POSTPONE_WRITE;
b411b363
PR
4814 break;
4815 default:
2735a594 4816 BUG();
b411b363
PR
4817 }
4818
2735a594
AG
4819 return validate_req_change_req_state(mdev, p->block_id, sector,
4820 &mdev->write_requests, __func__,
4821 what, false);
b411b363
PR
4822}
4823
1952e916 4824static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4825{
1952e916 4826 struct drbd_conf *mdev;
e658983a 4827 struct p_block_ack *p = pi->data;
b411b363 4828 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4829 int size = be32_to_cpu(p->blksize);
85997675 4830 int err;
b411b363 4831
1952e916
AG
4832 mdev = vnr_to_mdev(tconn, pi->vnr);
4833 if (!mdev)
2735a594 4834 return -EIO;
1952e916 4835
b411b363
PR
4836 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4837
579b57ed 4838 if (p->block_id == ID_SYNCER) {
b411b363
PR
4839 dec_rs_pending(mdev);
4840 drbd_rs_failed_io(mdev, sector, size);
2735a594 4841 return 0;
b411b363 4842 }
2deb8336 4843
85997675
AG
4844 err = validate_req_change_req_state(mdev, p->block_id, sector,
4845 &mdev->write_requests, __func__,
303d1448 4846 NEG_ACKED, true);
85997675 4847 if (err) {
c3afd8f5
AG
4848 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4849 The master bio might already be completed, therefore the
4850 request is no longer in the collision hash. */
4851 /* In Protocol B we might already have got a P_RECV_ACK
4852 but then get a P_NEG_ACK afterwards. */
c3afd8f5 4853 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4854 }
2735a594 4855 return 0;
b411b363
PR
4856}
4857
1952e916 4858static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4859{
1952e916 4860 struct drbd_conf *mdev;
e658983a 4861 struct p_block_ack *p = pi->data;
b411b363
PR
4862 sector_t sector = be64_to_cpu(p->sector);
4863
1952e916
AG
4864 mdev = vnr_to_mdev(tconn, pi->vnr);
4865 if (!mdev)
2735a594 4866 return -EIO;
1952e916 4867
b411b363 4868 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
7be8da07 4869
b411b363
PR
4870 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4871 (unsigned long long)sector, be32_to_cpu(p->blksize));
4872
2735a594
AG
4873 return validate_req_change_req_state(mdev, p->block_id, sector,
4874 &mdev->read_requests, __func__,
4875 NEG_ACKED, false);
b411b363
PR
4876}
4877
1952e916 4878static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4879{
1952e916 4880 struct drbd_conf *mdev;
b411b363
PR
4881 sector_t sector;
4882 int size;
e658983a 4883 struct p_block_ack *p = pi->data;
1952e916
AG
4884
4885 mdev = vnr_to_mdev(tconn, pi->vnr);
4886 if (!mdev)
2735a594 4887 return -EIO;
b411b363
PR
4888
4889 sector = be64_to_cpu(p->sector);
4890 size = be32_to_cpu(p->blksize);
b411b363
PR
4891
4892 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4893
4894 dec_rs_pending(mdev);
4895
4896 if (get_ldev_if_state(mdev, D_FAILED)) {
4897 drbd_rs_complete_io(mdev, sector);
e05e1e59 4898 switch (pi->cmd) {
d612d309
PR
4899 case P_NEG_RS_DREPLY:
4900 drbd_rs_failed_io(mdev, sector, size);
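			/* fall through: P_RS_CANCEL shares the break below; only the
			 * failed-io accounting above is specific to P_NEG_RS_DREPLY */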
4901 case P_RS_CANCEL:
4902 break;
4903 default:
2735a594 4904 BUG();
d612d309 4905 }
b411b363
PR
4906 put_ldev(mdev);
4907 }
4908
2735a594 4909 return 0;
b411b363
PR
4910}
4911
1952e916 4912static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4913{
1952e916 4914 struct drbd_conf *mdev;
e658983a 4915 struct p_barrier_ack *p = pi->data;
1952e916
AG
4916
4917 mdev = vnr_to_mdev(tconn, pi->vnr);
4918 if (!mdev)
2735a594 4919 return -EIO;
b411b363 4920
2f5cdd0b 4921 tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
b411b363 4922
c4752ef1
PR
4923 if (mdev->state.conn == C_AHEAD &&
4924 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
4925 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4926 mdev->start_resync_timer.expires = jiffies + HZ;
4927 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4928 }
4929
2735a594 4930 return 0;
b411b363
PR
4931}
4932
1952e916 4933static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4934{
1952e916 4935 struct drbd_conf *mdev;
e658983a 4936 struct p_block_ack *p = pi->data;
b411b363
PR
4937 struct drbd_work *w;
4938 sector_t sector;
4939 int size;
4940
1952e916
AG
4941 mdev = vnr_to_mdev(tconn, pi->vnr);
4942 if (!mdev)
2735a594 4943 return -EIO;
1952e916 4944
b411b363
PR
4945 sector = be64_to_cpu(p->sector);
4946 size = be32_to_cpu(p->blksize);
4947
4948 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4949
4950 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
8f7bed77 4951 drbd_ov_out_of_sync_found(mdev, sector, size);
b411b363 4952 else
8f7bed77 4953 ov_out_of_sync_print(mdev);
b411b363 4954
1d53f09e 4955 if (!get_ldev(mdev))
2735a594 4956 return 0;
1d53f09e 4957
b411b363
PR
4958 drbd_rs_complete_io(mdev, sector);
4959 dec_rs_pending(mdev);
4960
ea5442af
LE
4961 --mdev->ov_left;
4962
4963 /* let's advance progress step marks only for every other megabyte */
4964 if ((mdev->ov_left & 0x200) == 0x200)
4965 drbd_advance_rs_marks(mdev, mdev->ov_left);
4966
4967 if (mdev->ov_left == 0) {
b411b363
PR
4968 w = kmalloc(sizeof(*w), GFP_NOIO);
4969 if (w) {
4970 w->cb = w_ov_finished;
a21e9298 4971 w->mdev = mdev;
e42325a5 4972 drbd_queue_work_front(&mdev->tconn->data.work, w);
b411b363
PR
4973 } else {
 4974 dev_err(DEV, "kmalloc(w) failed.\n");
8f7bed77 4975 ov_out_of_sync_print(mdev);
b411b363
PR
4976 drbd_resync_finished(mdev);
4977 }
4978 }
1d53f09e 4979 put_ldev(mdev);
2735a594 4980 return 0;
b411b363
PR
4981}
4982
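/* Packets we receive but have nothing to do for (currently P_DELAY_PROBE). */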
1952e916 4983static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
0ced55a3 4984{
2735a594 4985 return 0;
0ced55a3
PR
4986}
4987
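/* Run drbd_finish_peer_reqs() on every volume of this connection and repeat
 * until no volume has entries left on its done_ee list.  Returns nonzero if
 * finishing the requests of some volume failed. */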
a990be46 4988static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
32862ec7 4989{
082a3439 4990 struct drbd_conf *mdev;
c141ebda 4991 int vnr, not_empty = 0;
32862ec7
PR
4992
4993 do {
4994 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4995 flush_signals(current);
c141ebda
PR
4996
4997 rcu_read_lock();
4998 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4999 kref_get(&mdev->kref);
5000 rcu_read_unlock();
d3fcb490 5001 if (drbd_finish_peer_reqs(mdev)) {
c141ebda
PR
5002 kref_put(&mdev->kref, &drbd_minor_destroy);
5003 return 1;
d3fcb490 5004 }
c141ebda
PR
5005 kref_put(&mdev->kref, &drbd_minor_destroy);
5006 rcu_read_lock();
082a3439 5007 }
32862ec7 5008 set_bit(SIGNAL_ASENDER, &tconn->flags);
082a3439
PR
5009
5010 spin_lock_irq(&tconn->req_lock);
c141ebda 5011 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
082a3439
PR
5012 not_empty = !list_empty(&mdev->done_ee);
5013 if (not_empty)
5014 break;
5015 }
5016 spin_unlock_irq(&tconn->req_lock);
c141ebda 5017 rcu_read_unlock();
32862ec7
PR
5018 } while (not_empty);
5019
5020 return 0;
5021}
5022
7201b972
AG
5023struct asender_cmd {
5024 size_t pkt_size;
1952e916 5025 int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
7201b972
AG
5026};
5027
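/* Dispatch table for packets arriving on the meta-data socket: expected
 * payload size and handler per packet type.  Unknown types disconnect. */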
5028static struct asender_cmd asender_tbl[] = {
e658983a
AG
5029 [P_PING] = { 0, got_Ping },
5030 [P_PING_ACK] = { 0, got_PingAck },
1952e916
AG
5031 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5032 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5033 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5034 [P_DISCARD_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
5035 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5036 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5037 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5038 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5039 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5040 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5041 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5042 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5043 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5044 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5045 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
7201b972
AG
5046};
5047
b411b363
PR
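/* The asender thread: sends pings and acknowledgements on the meta-data
 * socket and dispatches the replies it receives via asender_tbl. */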
5048int drbd_asender(struct drbd_thread *thi)
5049{
392c8801 5050 struct drbd_tconn *tconn = thi->tconn;
b411b363 5051 struct asender_cmd *cmd = NULL;
77351055 5052 struct packet_info pi;
257d0af6 5053 int rv;
e658983a 5054 void *buf = tconn->meta.rbuf;
b411b363 5055 int received = 0;
52b061a4
AG
5056 unsigned int header_size = drbd_header_size(tconn);
5057 int expect = header_size;
44ed167d
PR
5058 bool ping_timeout_active = false;
5059 struct net_conf *nc;
bb77d34e 5060 int ping_timeo, tcp_cork, ping_int;
b411b363 5061
b411b363
PR
5062 current->policy = SCHED_RR; /* Make this a realtime task! */
5063 current->rt_priority = 2; /* more important than all other tasks */
5064
e77a0a5c 5065 while (get_t_state(thi) == RUNNING) {
80822284 5066 drbd_thread_current_set_cpu(thi);
44ed167d
PR
5067
5068 rcu_read_lock();
5069 nc = rcu_dereference(tconn->net_conf);
5070 ping_timeo = nc->ping_timeo;
bb77d34e 5071 tcp_cork = nc->tcp_cork;
44ed167d
PR
5072 ping_int = nc->ping_int;
5073 rcu_read_unlock();
5074
32862ec7 5075 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
a17647aa 5076 if (drbd_send_ping(tconn)) {
32862ec7 5077 conn_err(tconn, "drbd_send_ping has failed\n");
841ce241
AG
5078 goto reconnect;
5079 }
44ed167d
PR
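			/* Expect the PingAck within ping_timeo (configured in
			 * tenths of a second), not the normal idle timeout. */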
5080 tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5081 ping_timeout_active = true;
b411b363
PR
5082 }
5083
32862ec7
PR
5084 /* TODO: conditionally cork; it may hurt latency if we cork without
5085 much to send */
bb77d34e 5086 if (tcp_cork)
32862ec7 5087 drbd_tcp_cork(tconn->meta.socket);
a990be46
AG
5088 if (tconn_finish_peer_reqs(tconn)) {
5089 conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
32862ec7 5090 goto reconnect;
082a3439 5091 }
b411b363 5092 /* but unconditionally uncork unless disabled */
bb77d34e 5093 if (tcp_cork)
32862ec7 5094 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
5095
5096 /* short circuit, recv_msg would return EINTR anyways. */
5097 if (signal_pending(current))
5098 continue;
5099
32862ec7
PR
5100 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5101 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
5102
5103 flush_signals(current);
5104
5105 /* Note:
5106 * -EINTR (on meta) we got a signal
5107 * -EAGAIN (on meta) rcvtimeo expired
5108 * -ECONNRESET other side closed the connection
5109 * -ERESTARTSYS (on data) we got a signal
5110 * rv < 0 other than above: unexpected error!
5111 * rv == expected: full header or command
5112 * rv < expected: "woken" by signal during receive
5113 * rv == 0 : "connection shut down by peer"
5114 */
5115 if (likely(rv > 0)) {
5116 received += rv;
5117 buf += rv;
5118 } else if (rv == 0) {
32862ec7 5119 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
5120 goto reconnect;
5121 } else if (rv == -EAGAIN) {
cb6518cb
LE
5122 /* If the data socket received something meanwhile,
5123 * that is good enough: peer is still alive. */
32862ec7
PR
5124 if (time_after(tconn->last_received,
5125 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 5126 continue;
f36af18c 5127 if (ping_timeout_active) {
32862ec7 5128 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
5129 goto reconnect;
5130 }
32862ec7 5131 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
5132 continue;
5133 } else if (rv == -EINTR) {
5134 continue;
5135 } else {
32862ec7 5136 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
5137 goto reconnect;
5138 }
5139
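		/* A complete header has arrived: decode it, look up the handler
		 * and learn how much payload to expect before dispatching below. */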
5140 if (received == expect && cmd == NULL) {
e658983a 5141 if (decode_header(tconn, tconn->meta.rbuf, &pi))
b411b363 5142 goto reconnect;
7201b972 5143 cmd = &asender_tbl[pi.cmd];
1952e916 5144 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
32862ec7 5145 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
77351055 5146 pi.cmd, pi.size);
b411b363
PR
5147 goto disconnect;
5148 }
e658983a 5149 expect = header_size + cmd->pkt_size;
52b061a4 5150 if (pi.size != expect - header_size) {
32862ec7 5151 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 5152 pi.cmd, pi.size);
b411b363 5153 goto reconnect;
257d0af6 5154 }
b411b363
PR
5155 }
5156 if (received == expect) {
2735a594 5157 bool err;
a4fbda8e 5158
2735a594
AG
5159 err = cmd->fn(tconn, &pi);
5160 if (err) {
1952e916 5161 conn_err(tconn, "%pf failed\n", cmd->fn);
b411b363 5162 goto reconnect;
1952e916 5163 }
b411b363 5164
a4fbda8e
PR
5165 tconn->last_received = jiffies;
5166
44ed167d
PR
5167 if (cmd == &asender_tbl[P_PING_ACK]) {
5168 /* restore idle timeout */
5169 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5170 ping_timeout_active = false;
5171 }
f36af18c 5172
e658983a 5173 buf = tconn->meta.rbuf;
b411b363 5174 received = 0;
52b061a4 5175 expect = header_size;
b411b363
PR
5176 cmd = NULL;
5177 }
5178 }
5179
5180 if (0) {
5181reconnect:
bbeb641c 5182 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
5183 }
5184 if (0) {
5185disconnect:
bbeb641c 5186 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 5187 }
32862ec7 5188 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 5189
32862ec7 5190 conn_info(tconn, "asender terminated\n");
b411b363
PR
5191
5192 return 0;
5193}