drbd: Pass a peer device to a number of functions
drivers/block/drbd/drbd_receiver.c
/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct packet_info {
	enum drbd_packet cmd;
	unsigned int size;
	unsigned int vnr;
	void *data;
};

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}
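
/*
 * Illustrative sketch (not part of the driver) of how the helpers above
 * compose: carve a chain of n pages off the global drbd_pp_pool, then
 * splice it back.  The example_* name is hypothetical; drbd_pp_pool,
 * drbd_pp_lock and drbd_pp_vacant are the real globals used throughout
 * this file.  Disabled, like the sanity check in page_chain_add() above.
 */
#if 0
static void example_borrow_and_return(int n)
{
	struct page *chain, *tail;
	int len;

	spin_lock(&drbd_pp_lock);	/* locking is the caller's job */
	chain = page_chain_del(&drbd_pp_pool, n);
	if (chain)
		drbd_pp_vacant -= n;
	spin_unlock(&drbd_pp_lock);
	if (!chain)			/* fewer than n pages were linked */
		return;

	/* ... walk the chain; page_chain_next(page) follows page->private ... */

	tail = page_chain_tail(chain, &len);	/* may run without the lock */
	spin_lock(&drbd_pp_lock);
	page_chain_add(&drbd_pp_pool, chain, tail);
	drbd_pp_vacant += len;
	spin_unlock(&drbd_pp_lock);
}
#endif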

static struct page *__drbd_alloc_pages(struct drbd_device *device,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}
199
b30ab791 200static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
a990be46 201 struct list_head *to_be_freed)
b411b363 202{
db830c46 203 struct drbd_peer_request *peer_req;
b411b363
PR
204 struct list_head *le, *tle;
205
206 /* The EEs are always appended to the end of the list. Since
207 they are sent in order over the wire, they have to finish
208 in order. As soon as we see the first not finished we can
209 stop to examine the list... */
210
b30ab791 211 list_for_each_safe(le, tle, &device->net_ee) {
db830c46 212 peer_req = list_entry(le, struct drbd_peer_request, w.list);
045417f7 213 if (drbd_peer_req_has_active_page(peer_req))
b411b363
PR
214 break;
215 list_move(le, to_be_freed);
216 }
217}

static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device:	DRBD device.
 * @number:		number of pages requested
 * @retry:		whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
			      bool retry)
{
	struct drbd_device *device = peer_device->device;
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	int mxb;

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
			break;
		}

		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}
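
/*
 * Usage sketch (illustrative only, hypothetical example_* name): every
 * chain obtained here must eventually go back through drbd_free_pages()
 * (defined just below), so that device->pp_in_use stays balanced and
 * waiters on drbd_pp_wait get woken.
 */
#if 0
static void example_alloc_then_free(struct drbd_peer_device *peer_device)
{
	/* retry = true: may block until the pool drains, or a signal arrives */
	struct page *chain = drbd_alloc_pages(peer_device, 4, true);

	if (!chain)
		return;	/* interrupted, or over max_buffers with retry = false */

	/* ... attach the chain to a peer request / bio ... */

	drbd_free_pages(peer_device->device, chain, 0 /* not net_ee */);
}
#endif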

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
	int i;

	if (page == NULL)
		return;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/
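
/*
 * Sketch of the rule above (illustrative, not compiled; both helpers are
 * defined further down in this file): the leading-underscore variant
 * expects req_lock to be held, everything else takes the lock itself and
 * thus must be called without it.
 */
#if 0
static void example_ee_locking(struct drbd_device *device)
{
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, &device->done_ee);	/* lock held */
	spin_unlock_irq(&device->resource->req_lock);

	drbd_finish_peer_reqs(device);		/* lock must NOT be held */
}
#endif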

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
		    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;
	struct page *page = NULL;
	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			drbd_err(device, "%s: allocation failed\n", __func__);
		return NULL;
	}

	if (data_size) {
		page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
		if (!page)
			goto fail;
	}

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.device = device;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	peer_req->flags = 0;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}

void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
			  int is_net)
{
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(device, peer_req->pages, is_net);
	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &device->net_ee;

	spin_lock_irq(&device->resource->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(device, peer_req, is_net);
		count++;
	}
	return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	list_splice_init(&device->done_ee, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_superseded.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		if (!err)
			err = err2;
		drbd_free_peer_req(device, peer_req);
	}
	wake_up(&device->ee_wait);

	return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_device *device,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&device->resource->req_lock);
		io_schedule();
		finish_wait(&device->ee_wait, &wait);
		spin_lock_irq(&device->resource->req_lock);
	}
}

static void drbd_wait_ee_list_empty(struct drbd_device *device,
				    struct list_head *head)
{
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, head);
	spin_unlock_irq(&device->resource->req_lock);
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);
	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
	set_fs(oldfs);

	return rv;
}

static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
	int rv;

	rv = drbd_recv_short(connection->data.socket, buf, size, 0);

	if (rv < 0) {
		if (rv == -ECONNRESET)
			drbd_info(connection, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
			long t;
			rcu_read_lock();
			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
			rcu_read_unlock();

			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

			if (t)
				goto out;
		}
		drbd_info(connection, "sock was shut down by peer\n");
	}

	if (rv != size)
		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
	return rv;
}

static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv(connection, buf, size);
	if (err != size) {
		if (err >= 0)
			err = -EIO;
	} else
		err = 0;
	return err;
}

static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv_all(connection, buf, size);
	if (err && !signal_pending(current))
		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
	return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}
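
/*
 * Illustrative call order (hypothetical values, not compiled): per the
 * tcp(7) quote above, the buffer sizes must be in place before connect()
 * or listen(), which is exactly how drbd_try_connect() and
 * prepare_listen_socket() below use this wrapper.
 */
#if 0
	drbd_setbufsize(sock, 128 << 10, 128 << 10);	/* first: 128 KiB each */
	err = sock->ops->connect(sock, (struct sockaddr *)&peer, peer_len, 0);
#endif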

static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}

struct accept_wait_data {
	struct drbd_connection *connection;
	struct socket *s_listen;
	struct completion door_bell;
	void (*original_sk_state_change)(struct sock *sk);

};

static void drbd_incoming_connection(struct sock *sk)
{
	struct accept_wait_data *ad = sk->sk_user_data;
	void (*state_change)(struct sock *sk);

	state_change = ad->original_sk_state_change;
	if (sk->sk_state == TCP_ESTABLISHED)
		complete(&ad->door_bell);
	state_change(sk);
}

static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int err, sndbuf_size, rcvbuf_size, my_addr_len;
	struct sockaddr_in6 my_addr;
	struct socket *s_listen;
	struct net_conf *nc;
	const char *what;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
	memcpy(&my_addr, &connection->my_addr, my_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
	if (err < 0)
		goto out;

	ad->s_listen = s_listen;
	write_lock_bh(&s_listen->sk->sk_callback_lock);
	ad->original_sk_state_change = s_listen->sk->sk_state_change;
	s_listen->sk->sk_state_change = drbd_incoming_connection;
	s_listen->sk->sk_user_data = ad;
	write_unlock_bh(&s_listen->sk->sk_callback_lock);

	what = "listen";
	err = s_listen->ops->listen(s_listen, 5);
	if (err < 0)
		goto out;

	return 0;
out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "%s failed, err = %d\n", what, err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_state_change = ad->original_sk_state_change;
	sk->sk_user_data = NULL;
	write_unlock_bh(&sk->sk_callback_lock);
}

static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int timeo, connect_int, err = 0;
	struct socket *s_estab = NULL;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	connect_int = nc->connect_int;
	rcu_read_unlock();

	timeo = connect_int * HZ;
	/* 28.5% random jitter */
	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
	if (err <= 0)
		return NULL;

	err = kernel_accept(ad->s_listen, &s_estab, 0);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "accept failed, err = %d\n", err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	if (s_estab)
		unregister_state_change(s_estab->sk, ad);

	return s_estab;
}

static int decode_header(struct drbd_connection *, void *, struct packet_info *);

static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
			     enum drbd_packet cmd)
{
	if (!conn_prepare_command(connection, sock))
		return -EIO;
	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(connection);
	struct packet_info pi;
	int err;

	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;
		return err;
	}
	err = decode_header(connection, connection->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}
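
/*
 * The MSG_DONTWAIT | MSG_PEEK probe above is the generic "is this TCP
 * socket still alive?" idiom: a positive return means data is pending,
 * -EAGAIN means alive but idle, 0 or any other error means the peer is
 * gone.  A minimal standalone form (illustrative, hypothetical name):
 */
#if 0
static bool example_sock_alive(struct socket *sock)
{
	char tb[1];
	int rr = drbd_recv_short(sock, tb, 1, MSG_DONTWAIT | MSG_PEEK);

	return rr > 0 || rr == -EAGAIN;
}
#endif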
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
{
	struct drbd_device *device = peer_device->device;
	int err;

	atomic_set(&device->packet_seq, 0);
	device->peer_seq = 0;

	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
		&peer_device->connection->cstate_mutex :
		&device->own_state_mutex;

	err = drbd_send_sync_param(peer_device);
	if (!err)
		err = drbd_send_sizes(peer_device, 0, 0);
	if (!err)
		err = drbd_send_uuids(peer_device);
	if (!err)
		err = drbd_send_current_state(peer_device);
	clear_bit(USE_DEGR_WFC_T, &device->flags);
	clear_bit(RESIZE_PENDING, &device->flags);
	atomic_set(&device->ap_in_flight, 0);
	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
	return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_connection *connection)
{
	struct drbd_socket sock, msock;
	struct drbd_peer_device *peer_device;
	struct net_conf *nc;
	int vnr, timeout, h, ok;
	bool discard_my_data;
	enum drbd_state_rv rv;
	struct accept_wait_data ad = {
		.connection = connection,
		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
	};

	clear_bit(DISCONNECT_SENT, &connection->flags);
	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	mutex_init(&sock.mutex);
	sock.sbuf = connection->data.sbuf;
	sock.rbuf = connection->data.rbuf;
	sock.socket = NULL;
	mutex_init(&msock.mutex);
	msock.sbuf = connection->meta.sbuf;
	msock.rbuf = connection->meta.rbuf;
	msock.socket = NULL;

	/* Assume that the peer only understands protocol 80 until we know better.  */
	connection->agreed_pro_version = 80;

	if (prepare_listen_socket(connection, &ad))
		return 0;

	do {
		struct socket *s;

		s = drbd_try_connect(connection);
		if (s) {
			if (!sock.socket) {
				sock.socket = s;
				send_first_packet(connection, &sock, P_INITIAL_DATA);
			} else if (!msock.socket) {
				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
				msock.socket = s;
				send_first_packet(connection, &msock, P_INITIAL_META);
			} else {
				drbd_err(connection, "Logic error in conn_connect()\n");
				goto out_release_sockets;
			}
		}

		if (sock.socket && msock.socket) {
			rcu_read_lock();
			nc = rcu_dereference(connection->net_conf);
			timeout = nc->ping_timeo * HZ / 10;
			rcu_read_unlock();
			schedule_timeout_interruptible(timeout);
			ok = drbd_socket_okay(&sock.socket);
			ok = drbd_socket_okay(&msock.socket) && ok;
			if (ok)
				break;
		}

retry:
		s = drbd_wait_for_connect(connection, &ad);
		if (s) {
			int fp = receive_first_packet(connection, s);
			drbd_socket_okay(&sock.socket);
			drbd_socket_okay(&msock.socket);
			switch (fp) {
			case P_INITIAL_DATA:
				if (sock.socket) {
					drbd_warn(connection, "initial packet S crossed\n");
					sock_release(sock.socket);
					sock.socket = s;
					goto randomize;
				}
				sock.socket = s;
				break;
			case P_INITIAL_META:
				set_bit(RESOLVE_CONFLICTS, &connection->flags);
				if (msock.socket) {
					drbd_warn(connection, "initial packet M crossed\n");
					sock_release(msock.socket);
					msock.socket = s;
					goto randomize;
				}
				msock.socket = s;
				break;
			default:
				drbd_warn(connection, "Error receiving initial packet\n");
				sock_release(s);
randomize:
				if (prandom_u32() & 1)
					goto retry;
			}
		}

		if (connection->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&connection->receiver) == EXITING)
				goto out_release_sockets;
		}

		ok = drbd_socket_okay(&sock.socket);
		ok = drbd_socket_okay(&msock.socket) && ok;
	} while (!ok);

	if (ad.s_listen)
		sock_release(ad.s_listen);

	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

	sock.socket->sk->sk_allocation = GFP_NOIO;
	msock.socket->sk->sk_allocation = GFP_NOIO;

	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);

	sock.socket->sk->sk_sndtimeo =
	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
	timeout = nc->timeout * HZ / 10;
	discard_my_data = nc->discard_my_data;
	rcu_read_unlock();

	msock.socket->sk->sk_sndtimeo = timeout;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock.socket);
	drbd_tcp_nodelay(msock.socket);

	connection->data.socket = sock.socket;
	connection->meta.socket = msock.socket;
	connection->last_received = jiffies;

	h = drbd_do_features(connection);
	if (h <= 0)
		return h;

	if (connection->cram_hmac_tfm) {
		/* drbd_request_state(device, NS(conn, WFAuth)); */
		switch (drbd_do_auth(connection)) {
		case -1:
			drbd_err(connection, "Authentication of peer failed\n");
			return -1;
		case 0:
			drbd_err(connection, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	connection->data.socket->sk->sk_sndtimeo = timeout;
	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
		return -1;

	set_bit(STATE_SENT, &connection->flags);

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		kref_get(&device->kref);
		rcu_read_unlock();

		/* Prevent a race between resync-handshake and
		 * being promoted to Primary.
		 *
		 * Grab and release the state mutex, so we know that any current
		 * drbd_set_role() is finished, and any incoming drbd_set_role
		 * will see the STATE_SENT flag, and wait for it to be cleared.
		 */
		mutex_lock(device->state_mutex);
		mutex_unlock(device->state_mutex);

		if (discard_my_data)
			set_bit(DISCARD_MY_DATA, &device->flags);
		else
			clear_bit(DISCARD_MY_DATA, &device->flags);

		drbd_connected(peer_device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
		clear_bit(STATE_SENT, &connection->flags);
		return 0;
	}

	drbd_thread_start(&connection->asender);

	mutex_lock(&connection->resource->conf_update);
	/* The discard_my_data flag is a single-shot modifier to the next
	 * connection attempt, the handshake of which is now well underway.
	 * No need for rcu style copying of the whole struct
	 * just to clear a single value. */
	connection->net_conf->discard_my_data = 0;
	mutex_unlock(&connection->resource->conf_update);

	return h;

out_release_sockets:
	if (ad.s_listen)
		sock_release(ad.s_listen);
	if (sock.socket)
		sock_release(sock.socket);
	if (msock.socket)
		sock_release(msock.socket);
	return -1;
}

static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
	unsigned int header_size = drbd_header_size(connection);

	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
		if (h->pad != 0) {
			drbd_err(connection, "Header padding is not zero\n");
			return -EINVAL;
		}
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
		pi->vnr = 0;
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be16_to_cpu(h->length);
		pi->vnr = 0;
	} else {
		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
			 be32_to_cpu(*(__be32 *)header),
			 connection->agreed_pro_version);
		return -EINVAL;
	}
	pi->data = header + header_size;
	return 0;
}
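
/*
 * For reference, the three on-wire headers decode_header() accepts,
 * oldest first (summary inferred from the decoding above; see
 * drbd_protocol.h for the authoritative layouts, all fields big endian):
 *
 *   p_header80:  u32 magic = DRBD_MAGIC,     u16 command, u16 length
 *   p_header95:  u16 magic = DRBD_MAGIC_BIG, u16 command, u32 length
 *   p_header100: u32 magic = DRBD_MAGIC_100, u16 volume,  u16 command,
 *                u32 length, u32 pad (must be zero)
 *
 * Which one is in effect follows from drbd_header_size(connection),
 * i.e. from the agreed protocol version.
 */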

static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
{
	void *buffer = connection->data.rbuf;
	int err;

	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
	if (err)
		return err;

	err = decode_header(connection, buffer, pi);
	connection->last_received = jiffies;

	return err;
}

static void drbd_flush(struct drbd_connection *connection)
{
	int rv;
	struct drbd_peer_device *peer_device;
	int vnr;

	if (connection->write_ordering >= WO_bdev_flush) {
		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;

			if (!get_ldev(device))
				continue;
			kref_get(&device->kref);
			rcu_read_unlock();

			rv = blkdev_issue_flush(device->ldev->backing_bdev,
						GFP_NOIO, NULL);
			if (rv) {
				drbd_info(device, "local disk flush failed with status %d\n", rv);
				/* would rather check on EOPNOTSUPP, but that is not reliable.
				 * don't try again for ANY return value != 0
				 * if (rv == -EOPNOTSUPP) */
				drbd_bump_write_ordering(connection, WO_drain_io);
			}
			put_ldev(device);
			kref_put(&device->kref, drbd_destroy_device);

			rcu_read_lock();
			if (rv)
				break;
		}
		rcu_read_unlock();
	}
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection:	DRBD connection.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&connection->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&connection->epoch_lock);
				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
				spin_lock(&connection->epoch_lock);
			}
#if 0
			/* FIXME: dec unacked on connection, once we have
			 * something to count pending connection packets in. */
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
				dec_unacked(epoch->connection);
#endif

			if (connection->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				connection->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&connection->epoch_lock);

	return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @connection:	DRBD connection.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
{
	struct disk_conf *dc;
	struct drbd_peer_device *peer_device;
	enum write_ordering_e pwo;
	int vnr;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = connection->write_ordering;
	wo = min(pwo, wo);
	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		if (!get_ldev_if_state(device, D_ATTACHING))
			continue;
		dc = rcu_dereference(device->ldev->disk_conf);

		if (wo == WO_bdev_flush && !dc->disk_flushes)
			wo = WO_drain_io;
		if (wo == WO_drain_io && !dc->disk_drain)
			wo = WO_none;
		put_ldev(device);
	}
	rcu_read_unlock();
	connection->write_ordering = wo;
	if (pwo != connection->write_ordering || wo == WO_bdev_flush)
		drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
}

/**
 * drbd_submit_peer_request() - Submit a peer request to the local I/O subsystem
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	/* In most cases, we will only need one bio. But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		drbd_err(device, "submit_ee: Allocation of a bio failed\n");
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_iter.bi_sector = sector;
	bio->bi_bdev = device->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				drbd_err(device,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (uint64_t)bio->bi_iter.bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(device, page == NULL);
	D_ASSERT(device, ds == 0);

	atomic_set(&peer_req->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(device, fault_type, bio);
	} while (bios);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}

static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&device->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
	if (i->waiting)
		wake_up(&device->misc_wait);
}

static void conn_wait_active_ee_empty(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_wait_ee_list_empty(device, &device->active_ee);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

static struct drbd_peer_device *
conn_peer_device(struct drbd_connection *connection, int volume_number)
{
	return idr_find(&connection->peer_devices, volume_number);
}

static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
{
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	/* FIXME these are unacked on connection,
	 * not a specific (peer)device.
	 */
	connection->current_epoch->barrier_nr = p->barrier;
	connection->current_epoch->connection = connection;
	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (connection->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
		/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		conn_wait_active_ee_empty(connection);
		drbd_flush(connection);

		if (atomic_read(&connection->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		return 0;
	default:
		drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
		return -EIO;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&connection->epoch_lock);
	if (atomic_read(&connection->current_epoch->epoch_size)) {
		list_add(&epoch->list, &connection->current_epoch->list);
		connection->current_epoch = epoch;
		connection->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&connection->epoch_lock);

	return 0;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, err;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;
	unsigned long *data;

	dgs = 0;
	if (peer_device->connection->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 * here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
		if (err)
			return NULL;
		data_size -= dgs;
	}

	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		drbd_err(device, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}
b411b363
PR
1544 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1545 * "criss-cross" setup, that might cause write-out on some other DRBD,
1546 * which in turn might block on the other node at this very place. */
69a22773 1547 peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, GFP_NOIO);
db830c46 1548 if (!peer_req)
b411b363 1549 return NULL;
45bb912b 1550
a73ff323 1551 if (!data_size)
81a3537a 1552 return peer_req;
a73ff323 1553
b411b363 1554 ds = data_size;
db830c46 1555 page = peer_req->pages;
45bb912b
LE
1556 page_chain_for_each(page) {
1557 unsigned len = min_t(int, ds, PAGE_SIZE);
6b4388ac 1558 data = kmap(page);
69a22773 1559 err = drbd_recv_all_warn(peer_device->connection, data, len);
b30ab791 1560 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
d0180171 1561 drbd_err(device, "Fault injection: Corrupting data on receive\n");
6b4388ac
PR
1562 data[0] = data[0] ^ (unsigned long)-1;
1563 }
b411b363 1564 kunmap(page);
a5c31904 1565 if (err) {
b30ab791 1566 drbd_free_peer_req(device, peer_req);
b411b363
PR
1567 return NULL;
1568 }
a5c31904 1569 ds -= len;
b411b363
PR
1570 }
1571
1572 if (dgs) {
69a22773 1573 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
b411b363 1574 if (memcmp(dig_in, dig_vv, dgs)) {
d0180171 1575 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
470be44a 1576 (unsigned long long)sector, data_size);
b30ab791 1577 drbd_free_peer_req(device, peer_req);
b411b363
PR
1578 return NULL;
1579 }
1580 }
b30ab791 1581 device->recv_cnt += data_size>>9;
db830c46 1582 return peer_req;
b411b363
PR
1583}
1584
1585/* drbd_drain_block() just takes a data block
1586 * out of the socket input buffer, and discards it.
1587 */
69a22773 1588static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
b411b363
PR
1589{
1590 struct page *page;
a5c31904 1591 int err = 0;
b411b363
PR
1592 void *data;
1593
c3470cde 1594 if (!data_size)
fc5be839 1595 return 0;
c3470cde 1596
69a22773 1597 page = drbd_alloc_pages(peer_device, 1, 1);
b411b363
PR
1598
1599 data = kmap(page);
1600 while (data_size) {
fc5be839
AG
1601 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1602
69a22773 1603 err = drbd_recv_all_warn(peer_device->connection, data, len);
a5c31904 1604 if (err)
b411b363 1605 break;
a5c31904 1606 data_size -= len;
b411b363
PR
1607 }
1608 kunmap(page);
69a22773 1609 drbd_free_pages(peer_device->device, page, 0);
fc5be839 1610 return err;
b411b363
PR
1611}
1612
69a22773 1613static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
b411b363
PR
1614 sector_t sector, int data_size)
1615{
7988613b
KO
1616 struct bio_vec bvec;
1617 struct bvec_iter iter;
b411b363 1618 struct bio *bio;
7988613b 1619 int dgs, err, expect;
69a22773
AG
1620 void *dig_in = peer_device->connection->int_dig_in;
1621 void *dig_vv = peer_device->connection->int_dig_vv;
b411b363 1622
88104ca4 1623 dgs = 0;
1624 if (peer_device->connection->peer_integrity_tfm) {
1625 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1626 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1627 if (err)
1628 return err;
88104ca4 1629 data_size -= dgs;
1630 }
1631
1632 /* optimistically update recv_cnt. if receiving fails below,
1633 * we disconnect anyways, and counters will be reset. */
69a22773 1634 peer_device->device->recv_cnt += data_size>>9;
1635
1636 bio = req->master_bio;
69a22773 1637 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
b411b363 1638
1639 bio_for_each_segment(bvec, bio, iter) {
1640 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1641 expect = min_t(int, data_size, bvec.bv_len);
69a22773 1642 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
7988613b 1643 kunmap(bvec.bv_page);
1644 if (err)
1645 return err;
1646 data_size -= expect;
1647 }
1648
1649 if (dgs) {
69a22773 1650 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
b411b363 1651 if (memcmp(dig_in, dig_vv, dgs)) {
69a22773 1652 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
28284cef 1653 return -EINVAL;
1654 }
1655 }
1656
69a22773 1657 D_ASSERT(peer_device->device, data_size == 0);
28284cef 1658 return 0;
1659}
1660
1661/*
1662 * e_end_resync_block() is called in asender context via
1663 * drbd_finish_peer_reqs().
1664 */
99920dc5 1665static int e_end_resync_block(struct drbd_work *w, int unused)
b411b363 1666{
1667 struct drbd_peer_request *peer_req =
1668 container_of(w, struct drbd_peer_request, w);
b30ab791 1669 struct drbd_device *device = w->device;
db830c46 1670 sector_t sector = peer_req->i.sector;
99920dc5 1671 int err;
b411b363 1672
0b0ba1ef 1673 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
b411b363 1674
db830c46 1675 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b30ab791 1676 drbd_set_in_sync(device, sector, peer_req->i.size);
69a22773 1677 err = drbd_send_ack(first_peer_device(device), P_RS_WRITE_ACK, peer_req);
1678 } else {
1679 /* Record failure to sync */
b30ab791 1680 drbd_rs_failed_io(device, sector, peer_req->i.size);
b411b363 1681
69a22773 1682 err = drbd_send_ack(first_peer_device(device), P_NEG_ACK, peer_req);
b411b363 1683 }
b30ab791 1684 dec_unacked(device);
b411b363 1685
99920dc5 1686 return err;
1687}
1688
1689static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1690 int data_size) __releases(local)
b411b363 1691{
69a22773 1692 struct drbd_device *device = peer_device->device;
db830c46 1693 struct drbd_peer_request *peer_req;
b411b363 1694
69a22773 1695 peer_req = read_in_block(peer_device, ID_SYNCER, sector, data_size);
db830c46 1696 if (!peer_req)
45bb912b 1697 goto fail;
b411b363 1698
b30ab791 1699 dec_rs_pending(device);
b411b363 1700
b30ab791 1701 inc_unacked(device);
1702 /* corresponding dec_unacked() in e_end_resync_block()
1703 * respective _drbd_clear_done_ee */
1704
db830c46 1705 peer_req->w.cb = e_end_resync_block;
45bb912b 1706
0500813f 1707 spin_lock_irq(&device->resource->req_lock);
b30ab791 1708 list_add(&peer_req->w.list, &device->sync_ee);
0500813f 1709 spin_unlock_irq(&device->resource->req_lock);
b411b363 1710
1711 atomic_add(data_size >> 9, &device->rs_sect_ev);
1712 if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
e1c1b0fc 1713 return 0;
b411b363 1714
10f6d992 1715 /* don't care for the reason here */
d0180171 1716 drbd_err(device, "submit failed, triggering re-connect\n");
0500813f 1717 spin_lock_irq(&device->resource->req_lock);
db830c46 1718 list_del(&peer_req->w.list);
0500813f 1719 spin_unlock_irq(&device->resource->req_lock);
22cc37a9 1720
b30ab791 1721 drbd_free_peer_req(device, peer_req);
45bb912b 1722fail:
b30ab791 1723 put_ldev(device);
e1c1b0fc 1724 return -EIO;
1725}
1726
668eebc6 1727static struct drbd_request *
b30ab791 1728find_request(struct drbd_device *device, struct rb_root *root, u64 id,
bc9c5c41 1729 sector_t sector, bool missing_ok, const char *func)
51624585 1730{
1731 struct drbd_request *req;
1732
1733 /* Request object according to our peer */
1734 req = (struct drbd_request *)(unsigned long)id;
5e472264 1735 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
668eebc6 1736 return req;
c3afd8f5 1737 if (!missing_ok) {
d0180171 1738 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1739 (unsigned long)id, (unsigned long long)sector);
1740 }
51624585 1741 return NULL;
1742}
1743
bde89a9e 1744static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
b411b363 1745{
9f4fe9ad 1746 struct drbd_peer_device *peer_device;
b30ab791 1747 struct drbd_device *device;
1748 struct drbd_request *req;
1749 sector_t sector;
82bc0194 1750 int err;
e658983a 1751 struct p_data *p = pi->data;
4a76b161 1752
1753 peer_device = conn_peer_device(connection, pi->vnr);
1754 if (!peer_device)
4a76b161 1755 return -EIO;
9f4fe9ad 1756 device = peer_device->device;
1757
1758 sector = be64_to_cpu(p->sector);
1759
0500813f 1760 spin_lock_irq(&device->resource->req_lock);
b30ab791 1761 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
0500813f 1762 spin_unlock_irq(&device->resource->req_lock);
c3afd8f5 1763 if (unlikely(!req))
82bc0194 1764 return -EIO;
b411b363 1765
24c4830c 1766 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1767 * special casing it there for the various failure cases.
1768 * still no race with drbd_fail_pending_reads */
69a22773 1769 err = recv_dless_read(peer_device, req, sector, pi->size);
82bc0194 1770 if (!err)
8554df1c 1771 req_mod(req, DATA_RECEIVED);
1772 /* else: nothing. handled from drbd_disconnect...
1773 * I don't think we may complete this just yet
1774 * in case we are "on-disconnect: freeze" */
1775
82bc0194 1776 return err;
1777}
1778
bde89a9e 1779static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
b411b363 1780{
9f4fe9ad 1781 struct drbd_peer_device *peer_device;
b30ab791 1782 struct drbd_device *device;
b411b363 1783 sector_t sector;
82bc0194 1784 int err;
e658983a 1785 struct p_data *p = pi->data;
4a76b161 1786
1787 peer_device = conn_peer_device(connection, pi->vnr);
1788 if (!peer_device)
4a76b161 1789 return -EIO;
9f4fe9ad 1790 device = peer_device->device;
1791
1792 sector = be64_to_cpu(p->sector);
0b0ba1ef 1793 D_ASSERT(device, p->block_id == ID_SYNCER);
b411b363 1794
b30ab791 1795 if (get_ldev(device)) {
1796 /* data is submitted to disk within recv_resync_read.
1797 * corresponding put_ldev done below on error,
fcefa62e 1798 * or in drbd_peer_request_endio. */
69a22773 1799 err = recv_resync_read(peer_device, sector, pi->size);
1800 } else {
1801 if (__ratelimit(&drbd_ratelimit_state))
d0180171 1802 drbd_err(device, "Can not write resync data to local disk.\n");
b411b363 1803
69a22773 1804 err = drbd_drain_block(peer_device, pi->size);
b411b363 1805
69a22773 1806 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1807 }
1808
b30ab791 1809 atomic_add(pi->size >> 9, &device->rs_sect_in);
778f271d 1810
82bc0194 1811 return err;
1812}
1813
b30ab791 1814static void restart_conflicting_writes(struct drbd_device *device,
7be8da07 1815 sector_t sector, int size)
b411b363 1816{
1817 struct drbd_interval *i;
1818 struct drbd_request *req;
1819
b30ab791 1820 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1821 if (!i->local)
1822 continue;
1823 req = container_of(i, struct drbd_request, i);
1824 if (req->rq_state & RQ_LOCAL_PENDING ||
1825 !(req->rq_state & RQ_POSTPONED))
1826 continue;
1827 /* as it is RQ_POSTPONED, this will cause it to
1828 * be queued on the retry workqueue. */
d4dabbe2 1829 __req_mod(req, CONFLICT_RESOLVED, NULL);
1830 }
1831}
b411b363 1832
1833/*
1834 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
b411b363 1835 */
99920dc5 1836static int e_end_block(struct drbd_work *w, int cancel)
b411b363 1837{
1838 struct drbd_peer_request *peer_req =
1839 container_of(w, struct drbd_peer_request, w);
b30ab791 1840 struct drbd_device *device = w->device;
db830c46 1841 sector_t sector = peer_req->i.sector;
99920dc5 1842 int err = 0, pcmd;
b411b363 1843
303d1448 1844 if (peer_req->flags & EE_SEND_WRITE_ACK) {
db830c46 1845 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1846 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1847 device->state.conn <= C_PAUSED_SYNC_T &&
db830c46 1848 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 1849 P_RS_WRITE_ACK : P_WRITE_ACK;
69a22773 1850 err = drbd_send_ack(first_peer_device(device), pcmd, peer_req);
b411b363 1851 if (pcmd == P_RS_WRITE_ACK)
b30ab791 1852 drbd_set_in_sync(device, sector, peer_req->i.size);
b411b363 1853 } else {
69a22773 1854 err = drbd_send_ack(first_peer_device(device), P_NEG_ACK, peer_req);
1855 /* we expect it to be marked out of sync anyways...
1856 * maybe assert this? */
1857 }
b30ab791 1858 dec_unacked(device);
1859 }
1860 /* we delete from the conflict detection hash _after_ we sent out the
1861 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
302bdeae 1862 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
0500813f 1863 spin_lock_irq(&device->resource->req_lock);
0b0ba1ef 1864 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
b30ab791 1865 drbd_remove_epoch_entry_interval(device, peer_req);
7be8da07 1866 if (peer_req->flags & EE_RESTART_REQUESTS)
b30ab791 1867 restart_conflicting_writes(device, sector, peer_req->i.size);
0500813f 1868 spin_unlock_irq(&device->resource->req_lock);
bb3bfe96 1869 } else
0b0ba1ef 1870 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
b411b363 1871
a6b32bc3 1872 drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
b411b363 1873
99920dc5 1874 return err;
1875}
1876
7be8da07 1877static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
b411b363 1878{
b30ab791 1879 struct drbd_device *device = w->device;
1880 struct drbd_peer_request *peer_req =
1881 container_of(w, struct drbd_peer_request, w);
99920dc5 1882 int err;
b411b363 1883
69a22773 1884 err = drbd_send_ack(first_peer_device(device), ack, peer_req);
b30ab791 1885 dec_unacked(device);
b411b363 1886
99920dc5 1887 return err;
1888}
1889
d4dabbe2 1890static int e_send_superseded(struct drbd_work *w, int unused)
7be8da07 1891{
d4dabbe2 1892 return e_send_ack(w, P_SUPERSEDED);
1893}
1894
99920dc5 1895static int e_send_retry_write(struct drbd_work *w, int unused)
7be8da07 1896{
a6b32bc3 1897 struct drbd_connection *connection = first_peer_device(w->device)->connection;
7be8da07 1898
bde89a9e 1899 return e_send_ack(w, connection->agreed_pro_version >= 100 ?
d4dabbe2 1900 P_RETRY_WRITE : P_SUPERSEDED);
7be8da07 1901}
b411b363 1902
1903static bool seq_greater(u32 a, u32 b)
1904{
1905 /*
1906 * We assume 32-bit wrap-around here.
1907 * For 24-bit wrap-around, we would have to shift:
1908 * a <<= 8; b <<= 8;
1909 */
1910 return (s32)a - (s32)b > 0;
1911}
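/* Worked example (editor's illustration, not in the original source):
 * just after a 32-bit wrap, a = 1 and b = 0xfffffffe give
 * (s32)a - (s32)b = 1 - (-2) = 3 > 0, so seq_greater(1, 0xfffffffe) is
 * true even though a < b as plain unsigned values -- the freshly
 * wrapped sequence number is correctly treated as the newer one. */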
b411b363 1912
1913static u32 seq_max(u32 a, u32 b)
1914{
1915 return seq_greater(a, b) ? a : b;
1916}
1917
69a22773 1918static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
3e394da1 1919{
69a22773 1920 struct drbd_device *device = peer_device->device;
3c13b680 1921 unsigned int newest_peer_seq;
3e394da1 1922
69a22773 1923 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
1924 spin_lock(&device->peer_seq_lock);
1925 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1926 device->peer_seq = newest_peer_seq;
1927 spin_unlock(&device->peer_seq_lock);
1928 /* wake up only if we actually changed device->peer_seq */
3c13b680 1929 if (peer_seq == newest_peer_seq)
b30ab791 1930 wake_up(&device->seq_wait);
7be8da07 1931 }
1932}
1933
d93f6302 1934static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
b6a370ba 1935{
1936 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1937}
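/* Worked example (editor's illustration, not in the original source):
 * l1 and l2 are byte counts, so l >> 9 converts them to 512-byte
 * sectors.  Two 4 KiB (8-sector) requests at sectors 0 and 8 do not
 * overlap, since 0 + 8 <= 8; the same requests at sectors 0 and 7 do. */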
b6a370ba 1938
d93f6302 1939/* maybe change sync_ee into interval trees as well? */
b30ab791 1940static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
1941{
1942 struct drbd_peer_request *rs_req;
1943 bool rv = 0;
1944
0500813f 1945 spin_lock_irq(&device->resource->req_lock);
b30ab791 1946 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1947 if (overlaps(peer_req->i.sector, peer_req->i.size,
1948 rs_req->i.sector, rs_req->i.size)) {
1949 rv = 1;
1950 break;
1951 }
1952 }
0500813f 1953 spin_unlock_irq(&device->resource->req_lock);
1954
1955 return rv;
1956}
1957
1958/* Called from receive_Data.
1959 * Synchronize packets on sock with packets on msock.
1960 *
1961 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1962 * packet traveling on msock, they are still processed in the order they have
1963 * been sent.
1964 *
1965 * Note: we don't care for Ack packets overtaking P_DATA packets.
1966 *
b30ab791 1967 * In case packet_seq is larger than device->peer_seq number, there are
b411b363 1968 * outstanding packets on the msock. We wait for them to arrive.
b30ab791 1969 * In case we are the logically next packet, we update device->peer_seq
1970 * ourselves. Correctly handles 32bit wrap around.
1971 *
1972 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1973 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1974 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1975 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1976 *
1977 * returns 0 if we may process the packet,
1978 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
69a22773 1979static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
b411b363 1980{
69a22773 1981 struct drbd_device *device = peer_device->device;
b411b363 1982 DEFINE_WAIT(wait);
b411b363 1983 long timeout;
b874d231 1984 int ret = 0, tp;
7be8da07 1985
69a22773 1986 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
1987 return 0;
1988
b30ab791 1989 spin_lock(&device->peer_seq_lock);
b411b363 1990 for (;;) {
1991 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
1992 device->peer_seq = seq_max(device->peer_seq, peer_seq);
b411b363 1993 break;
7be8da07 1994 }
b874d231 1995
1996 if (signal_pending(current)) {
1997 ret = -ERESTARTSYS;
1998 break;
1999 }
2000
2001 rcu_read_lock();
a6b32bc3 2002 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2003 rcu_read_unlock();
2004
2005 if (!tp)
2006 break;
2007
2008 /* Only need to wait if two_primaries is enabled */
2009 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2010 spin_unlock(&device->peer_seq_lock);
44ed167d 2011 rcu_read_lock();
69a22773 2012 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
44ed167d 2013 rcu_read_unlock();
71b1c1eb 2014 timeout = schedule_timeout(timeout);
b30ab791 2015 spin_lock(&device->peer_seq_lock);
7be8da07 2016 if (!timeout) {
b411b363 2017 ret = -ETIMEDOUT;
d0180171 2018 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2019 break;
2020 }
2021 }
2022 spin_unlock(&device->peer_seq_lock);
2023 finish_wait(&device->seq_wait, &wait);
2024 return ret;
2025}
2026
2027/* see also bio_flags_to_wire()
2028 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2029 * flags and back. We may replicate to other kernel versions. */
b30ab791 2030static unsigned long wire_flags_to_bio(struct drbd_device *device, u32 dpf)
76d2e7ec 2031{
2032 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2033 (dpf & DP_FUA ? REQ_FUA : 0) |
2034 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2035 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2036}
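/* Illustrative note (editor's sketch, not in the original source): the
 * peer's bio_flags_to_wire() packed its request flags into dp_flags and
 * this helper restores them, e.g. a write sent with DP_RW_SYNC | DP_FUA
 * is resubmitted locally as REQ_SYNC | REQ_FUA, preserving the ordering
 * and durability semantics the peer asked for. */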
2037
b30ab791 2038static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2039 unsigned int size)
2040{
2041 struct drbd_interval *i;
2042
2043 repeat:
b30ab791 2044 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2045 struct drbd_request *req;
2046 struct bio_and_error m;
2047
2048 if (!i->local)
2049 continue;
2050 req = container_of(i, struct drbd_request, i);
2051 if (!(req->rq_state & RQ_POSTPONED))
2052 continue;
2053 req->rq_state &= ~RQ_POSTPONED;
2054 __req_mod(req, NEG_ACKED, &m);
0500813f 2055 spin_unlock_irq(&device->resource->req_lock);
7be8da07 2056 if (m.bio)
b30ab791 2057 complete_master_bio(device, &m);
0500813f 2058 spin_lock_irq(&device->resource->req_lock);
2059 goto repeat;
2060 }
2061}
2062
b30ab791 2063static int handle_write_conflicts(struct drbd_device *device,
2064 struct drbd_peer_request *peer_req)
2065{
a6b32bc3 2066 struct drbd_connection *connection = first_peer_device(device)->connection;
bde89a9e 2067 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2068 sector_t sector = peer_req->i.sector;
2069 const unsigned int size = peer_req->i.size;
2070 struct drbd_interval *i;
2071 bool equal;
2072 int err;
2073
2074 /*
2075 * Inserting the peer request into the write_requests tree will prevent
2076 * new conflicting local requests from being added.
2077 */
b30ab791 2078 drbd_insert_interval(&device->write_requests, &peer_req->i);
2079
2080 repeat:
b30ab791 2081 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2082 if (i == &peer_req->i)
2083 continue;
2084
2085 if (!i->local) {
2086 /*
2087 * Our peer has sent a conflicting remote request; this
2088 * should not happen in a two-node setup. Wait for the
2089 * earlier peer request to complete.
2090 */
b30ab791 2091 err = drbd_wait_misc(device, i);
2092 if (err)
2093 goto out;
2094 goto repeat;
2095 }
2096
2097 equal = i->sector == sector && i->size == size;
2098 if (resolve_conflicts) {
2099 /*
2100 * If the peer request is fully contained within the
2101 * overlapping request, it can be considered overwritten
2102 * and thus superseded; otherwise, it will be retried
2103 * once all overlapping requests have completed.
7be8da07 2104 */
d4dabbe2 2105 bool superseded = i->sector <= sector && i->sector +
2106 (i->size >> 9) >= sector + (size >> 9);
2107
2108 if (!equal)
d0180171 2109 drbd_alert(device, "Concurrent writes detected: "
2110 "local=%llus +%u, remote=%llus +%u, "
2111 "assuming %s came first\n",
2112 (unsigned long long)i->sector, i->size,
2113 (unsigned long long)sector, size,
d4dabbe2 2114 superseded ? "local" : "remote");
7be8da07 2115
b30ab791 2116 inc_unacked(device);
d4dabbe2 2117 peer_req->w.cb = superseded ? e_send_superseded :
7be8da07 2118 e_send_retry_write;
b30ab791 2119 list_add_tail(&peer_req->w.list, &device->done_ee);
a6b32bc3 2120 wake_asender(first_peer_device(device)->connection);
2121
2122 err = -ENOENT;
2123 goto out;
2124 } else {
2125 struct drbd_request *req =
2126 container_of(i, struct drbd_request, i);
2127
2128 if (!equal)
d0180171 2129 drbd_alert(device, "Concurrent writes detected: "
2130 "local=%llus +%u, remote=%llus +%u\n",
2131 (unsigned long long)i->sector, i->size,
2132 (unsigned long long)sector, size);
2133
2134 if (req->rq_state & RQ_LOCAL_PENDING ||
2135 !(req->rq_state & RQ_POSTPONED)) {
2136 /*
2137 * Wait for the node with the discard flag to
2138 * decide if this request has been superseded
2139 * or needs to be retried.
2140 * Requests that have been superseded will
2141 * disappear from the write_requests tree.
2142 *
2143 * In addition, wait for the conflicting
2144 * request to finish locally before submitting
2145 * the conflicting peer request.
2146 */
b30ab791 2147 err = drbd_wait_misc(device, &req->i);
7be8da07 2148 if (err) {
a6b32bc3 2149 _conn_request_state(first_peer_device(device)->connection,
2150 NS(conn, C_TIMEOUT),
2151 CS_HARD);
b30ab791 2152 fail_postponed_requests(device, sector, size);
2153 goto out;
2154 }
2155 goto repeat;
2156 }
2157 /*
2158 * Remember to restart the conflicting requests after
2159 * the new peer request has completed.
2160 */
2161 peer_req->flags |= EE_RESTART_REQUESTS;
2162 }
2163 }
2164 err = 0;
2165
2166 out:
2167 if (err)
b30ab791 2168 drbd_remove_epoch_entry_interval(device, peer_req);
2169 return err;
2170}
2171
b411b363 2172/* mirrored write */
bde89a9e 2173static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
b411b363 2174{
9f4fe9ad 2175 struct drbd_peer_device *peer_device;
b30ab791 2176 struct drbd_device *device;
b411b363 2177 sector_t sector;
db830c46 2178 struct drbd_peer_request *peer_req;
e658983a 2179 struct p_data *p = pi->data;
7be8da07 2180 u32 peer_seq = be32_to_cpu(p->seq_num);
2181 int rw = WRITE;
2182 u32 dp_flags;
302bdeae 2183 int err, tp;
b411b363 2184
2185 peer_device = conn_peer_device(connection, pi->vnr);
2186 if (!peer_device)
4a76b161 2187 return -EIO;
9f4fe9ad 2188 device = peer_device->device;
b411b363 2189
b30ab791 2190 if (!get_ldev(device)) {
2191 int err2;
2192
2193 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2194 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
bde89a9e 2195 atomic_inc(&connection->current_epoch->epoch_size);
69a22773 2196 err2 = drbd_drain_block(peer_device, pi->size);
2197 if (!err)
2198 err = err2;
2199 return err;
2200 }
2201
2202 /*
2203 * Corresponding put_ldev done either below (on various errors), or in
2204 * drbd_peer_request_endio, if we successfully submit the data at the
2205 * end of this function.
2206 */
2207
2208 sector = be64_to_cpu(p->sector);
69a22773 2209 peer_req = read_in_block(peer_device, p->block_id, sector, pi->size);
db830c46 2210 if (!peer_req) {
b30ab791 2211 put_ldev(device);
82bc0194 2212 return -EIO;
2213 }
2214
db830c46 2215 peer_req->w.cb = e_end_block;
b411b363 2216
688593c5 2217 dp_flags = be32_to_cpu(p->dp_flags);
b30ab791 2218 rw |= wire_flags_to_bio(device, dp_flags);
81a3537a 2219 if (peer_req->pages == NULL) {
2220 D_ASSERT(device, peer_req->i.size == 0);
2221 D_ASSERT(device, dp_flags & DP_FLUSH);
a73ff323 2222 }
2223
2224 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 2225 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 2226
2227 spin_lock(&connection->epoch_lock);
2228 peer_req->epoch = connection->current_epoch;
2229 atomic_inc(&peer_req->epoch->epoch_size);
2230 atomic_inc(&peer_req->epoch->active);
bde89a9e 2231 spin_unlock(&connection->epoch_lock);
b411b363 2232
302bdeae 2233 rcu_read_lock();
9f4fe9ad 2234 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2235 rcu_read_unlock();
2236 if (tp) {
2237 peer_req->flags |= EE_IN_INTERVAL_TREE;
69a22773 2238 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
7be8da07 2239 if (err)
b411b363 2240 goto out_interrupted;
0500813f 2241 spin_lock_irq(&device->resource->req_lock);
b30ab791 2242 err = handle_write_conflicts(device, peer_req);
7be8da07 2243 if (err) {
0500813f 2244 spin_unlock_irq(&device->resource->req_lock);
7be8da07 2245 if (err == -ENOENT) {
b30ab791 2246 put_ldev(device);
82bc0194 2247 return 0;
b411b363 2248 }
7be8da07 2249 goto out_interrupted;
b411b363 2250 }
b874d231 2251 } else {
69a22773 2252 update_peer_seq(peer_device, peer_seq);
0500813f 2253 spin_lock_irq(&device->resource->req_lock);
b874d231 2254 }
b30ab791 2255 list_add(&peer_req->w.list, &device->active_ee);
0500813f 2256 spin_unlock_irq(&device->resource->req_lock);
b411b363 2257
2258 if (device->state.conn == C_SYNC_TARGET)
2259 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
b411b363 2260
9f4fe9ad 2261 if (peer_device->connection->agreed_pro_version < 100) {
44ed167d 2262 rcu_read_lock();
9f4fe9ad 2263 switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
2264 case DRBD_PROT_C:
2265 dp_flags |= DP_SEND_WRITE_ACK;
2266 break;
2267 case DRBD_PROT_B:
2268 dp_flags |= DP_SEND_RECEIVE_ACK;
2269 break;
b411b363 2270 }
44ed167d 2271 rcu_read_unlock();
2272 }
2273
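 /* Illustrative note (editor's sketch, not in the original source):
  * this block reproduces the classic ack semantics for peers older
  * than protocol level 100: protocol C requests a write ack (sent from
  * e_end_block() once the data has hit the local disk), protocol B
  * only a receive ack, and protocol A sets neither flag, so the write
  * is not acknowledged at all. */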
2274 if (dp_flags & DP_SEND_WRITE_ACK) {
2275 peer_req->flags |= EE_SEND_WRITE_ACK;
b30ab791 2276 inc_unacked(device);
2277 /* corresponding dec_unacked() in e_end_block()
2278 * respective _drbd_clear_done_ee */
2279 }
2280
2281 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2282 /* I really don't like it that the receiver thread
2283 * sends on the msock, but anyways */
69a22773 2284 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2285 }
2286
b30ab791 2287 if (device->state.pdsk < D_INCONSISTENT) {
b411b363 2288 /* In case we have the only disk of the cluster, */
b30ab791 2289 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2290 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2291 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
b30ab791 2292 drbd_al_begin_io(device, &peer_req->i, true);
2293 }
2294
b30ab791 2295 err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2296 if (!err)
2297 return 0;
b411b363 2298
10f6d992 2299 /* don't care for the reason here */
d0180171 2300 drbd_err(device, "submit failed, triggering re-connect\n");
0500813f 2301 spin_lock_irq(&device->resource->req_lock);
db830c46 2302 list_del(&peer_req->w.list);
b30ab791 2303 drbd_remove_epoch_entry_interval(device, peer_req);
0500813f 2304 spin_unlock_irq(&device->resource->req_lock);
db830c46 2305 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
b30ab791 2306 drbd_al_complete_io(device, &peer_req->i);
22cc37a9 2307
b411b363 2308out_interrupted:
bde89a9e 2309 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2310 put_ldev(device);
2311 drbd_free_peer_req(device, peer_req);
82bc0194 2312 return err;
2313}
2314
2315/* We may throttle resync, if the lower device seems to be busy,
2316 * and current sync rate is above c_min_rate.
2317 *
2318 * To decide whether or not the lower device is busy, we use a scheme similar
2319 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2320 * (more than 64 sectors) of activity we cannot account for with our own resync
2321 * activity, it obviously is "busy".
2322 *
2323 * The current sync rate used here uses only the most recent two step marks,
2324 * to have a short time average so we can react faster.
2325 */
b30ab791 2326int drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
0f0601f4 2327{
b30ab791 2328 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
0f0601f4 2329 unsigned long db, dt, dbdt;
e3555d85 2330 struct lc_element *tmp;
2331 int curr_events;
2332 int throttle = 0;
2333 unsigned int c_min_rate;
2334
2335 rcu_read_lock();
b30ab791 2336 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
daeda1cc 2337 rcu_read_unlock();
2338
2339 /* feature disabled? */
daeda1cc 2340 if (c_min_rate == 0)
2341 return 0;
2342
2343 spin_lock_irq(&device->al_lock);
2344 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2345 if (tmp) {
2346 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2347 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
b30ab791 2348 spin_unlock_irq(&device->al_lock);
2349 return 0;
2350 }
2351 /* Do not slow down if app IO is already waiting for this extent */
2352 }
b30ab791 2353 spin_unlock_irq(&device->al_lock);
e3555d85 2354
2355 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2356 (int)part_stat_read(&disk->part0, sectors[1]) -
b30ab791 2357 atomic_read(&device->rs_sect_ev);
e3555d85 2358
b30ab791 2359 if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2360 unsigned long rs_left;
2361 int i;
2362
b30ab791 2363 device->rs_last_events = curr_events;
2364
2365 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2366 * approx. */
b30ab791 2367 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2649f080 2368
2369 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2370 rs_left = device->ov_left;
2649f080 2371 else
b30ab791 2372 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
0f0601f4 2373
b30ab791 2374 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2375 if (!dt)
2376 dt++;
b30ab791 2377 db = device->rs_mark_left[i] - rs_left;
2378 dbdt = Bit2KB(db/dt);
2379
daeda1cc 2380 if (dbdt > c_min_rate)
2381 throttle = 1;
2382 }
2383 return throttle;
2384}
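/* Worked example (editor's illustration, not in the original source;
 * assumes each bitmap bit covers one 4 KiB block): if the last mark
 * interval shows db = 1500 bits cleared in dt = 2 seconds, then
 * dbdt = Bit2KB(1500 / 2) = 3000 KiB/s.  With c_min_rate set to, say,
 * 2000 KiB/s and more than 64 sectors of unaccounted lower-level
 * activity, this returns 1 and the caller briefly delays the resync
 * request. */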
2385
2386
bde89a9e 2387static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
b411b363 2388{
9f4fe9ad 2389 struct drbd_peer_device *peer_device;
b30ab791 2390 struct drbd_device *device;
b411b363 2391 sector_t sector;
4a76b161 2392 sector_t capacity;
db830c46 2393 struct drbd_peer_request *peer_req;
b411b363 2394 struct digest_info *di = NULL;
b18b37be 2395 int size, verb;
b411b363 2396 unsigned int fault_type;
e658983a 2397 struct p_block_req *p = pi->data;
4a76b161 2398
2399 peer_device = conn_peer_device(connection, pi->vnr);
2400 if (!peer_device)
4a76b161 2401 return -EIO;
9f4fe9ad 2402 device = peer_device->device;
b30ab791 2403 capacity = drbd_get_capacity(device->this_bdev);
2404
2405 sector = be64_to_cpu(p->sector);
2406 size = be32_to_cpu(p->blksize);
2407
c670a398 2408 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
d0180171 2409 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
b411b363 2410 (unsigned long long)sector, size);
82bc0194 2411 return -EINVAL;
2412 }
2413 if (sector + (size>>9) > capacity) {
d0180171 2414 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
b411b363 2415 (unsigned long long)sector, size);
82bc0194 2416 return -EINVAL;
2417 }
2418
b30ab791 2419 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
b18b37be 2420 verb = 1;
e2857216 2421 switch (pi->cmd) {
b18b37be 2422 case P_DATA_REQUEST:
69a22773 2423 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2424 break;
2425 case P_RS_DATA_REQUEST:
2426 case P_CSUM_RS_REQUEST:
2427 case P_OV_REQUEST:
69a22773 2428 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2429 break;
2430 case P_OV_REPLY:
2431 verb = 0;
b30ab791 2432 dec_rs_pending(device);
69a22773 2433 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2434 break;
2435 default:
49ba9b1b 2436 BUG();
2437 }
2438 if (verb && __ratelimit(&drbd_ratelimit_state))
d0180171 2439 drbd_err(device, "Can not satisfy peer's read request, "
b411b363 2440 "no local data.\n");
b18b37be 2441
a821cc4a 2442 /* drain possible payload */
69a22773 2443 return drbd_drain_block(peer_device, pi->size);
2444 }
2445
2446 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2447 * "criss-cross" setup, that might cause write-out on some other DRBD,
2448 * which in turn might block on the other node at this very place. */
69a22773 2449 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size, GFP_NOIO);
db830c46 2450 if (!peer_req) {
b30ab791 2451 put_ldev(device);
82bc0194 2452 return -ENOMEM;
2453 }
2454
e2857216 2455 switch (pi->cmd) {
b411b363 2456 case P_DATA_REQUEST:
db830c46 2457 peer_req->w.cb = w_e_end_data_req;
b411b363 2458 fault_type = DRBD_FAULT_DT_RD;
2459 /* application IO, don't drbd_rs_begin_io */
2460 goto submit;
2461
b411b363 2462 case P_RS_DATA_REQUEST:
db830c46 2463 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2464 fault_type = DRBD_FAULT_RS_RD;
5f9915bb 2465 /* used in the sector offset progress display */
b30ab791 2466 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2467 break;
2468
2469 case P_OV_REPLY:
2470 case P_CSUM_RS_REQUEST:
2471 fault_type = DRBD_FAULT_RS_RD;
e2857216 2472 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2473 if (!di)
2474 goto out_free_e;
2475
e2857216 2476 di->digest_size = pi->size;
2477 di->digest = (((char *)di)+sizeof(struct digest_info));
2478
2479 peer_req->digest = di;
2480 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2481
9f4fe9ad 2482 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2483 goto out_free_e;
2484
e2857216 2485 if (pi->cmd == P_CSUM_RS_REQUEST) {
9f4fe9ad 2486 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
db830c46 2487 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb 2488 /* used in the sector offset progress display */
b30ab791 2489 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
e2857216 2490 } else if (pi->cmd == P_OV_REPLY) {
2649f080 2491 /* track progress, we may need to throttle */
b30ab791 2492 atomic_add(size >> 9, &device->rs_sect_in);
db830c46 2493 peer_req->w.cb = w_e_end_ov_reply;
b30ab791 2494 dec_rs_pending(device);
2495 /* drbd_rs_begin_io done when we sent this request,
2496 * but accounting still needs to be done. */
2497 goto submit_for_resync;
2498 }
2499 break;
2500
2501 case P_OV_REQUEST:
b30ab791 2502 if (device->ov_start_sector == ~(sector_t)0 &&
9f4fe9ad 2503 peer_device->connection->agreed_pro_version >= 90) {
2504 unsigned long now = jiffies;
2505 int i;
2506 device->ov_start_sector = sector;
2507 device->ov_position = sector;
2508 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2509 device->rs_total = device->ov_left;
de228bba 2510 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2511 device->rs_mark_left[i] = device->ov_left;
2512 device->rs_mark_time[i] = now;
de228bba 2513 }
d0180171 2514 drbd_info(device, "Online Verify start sector: %llu\n",
2515 (unsigned long long)sector);
2516 }
db830c46 2517 peer_req->w.cb = w_e_end_ov_req;
b411b363 2518 fault_type = DRBD_FAULT_RS_RD;
2519 break;
2520
b411b363 2521 default:
49ba9b1b 2522 BUG();
2523 }
2524
2525 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2526 * wrt the receiver, but it is not as straightforward as it may seem.
2527 * Various places in the resync start and stop logic assume resync
2528 * requests are processed in order, requeuing this on the worker thread
2529 * introduces a bunch of new code for synchronization between threads.
2530 *
2531 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2532 * "forever", throttling after drbd_rs_begin_io will lock that extent
2533 * for application writes for the same time. For now, just throttle
2534 * here, where the rest of the code expects the receiver to sleep for
2535 * a while, anyways.
2536 */
2537
2538 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2539 * this defers syncer requests for some time, before letting at least
2540 * one request through. The resync controller on the receiving side
2541 * will adapt to the incoming rate accordingly.
2542 *
2543 * We cannot throttle here if remote is Primary/SyncTarget:
2544 * we would also throttle its application reads.
2545 * In that case, throttling is done on the SyncTarget only.
2546 */
b30ab791 2547 if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
e3555d85 2548 schedule_timeout_uninterruptible(HZ/10);
b30ab791 2549 if (drbd_rs_begin_io(device, sector))
80a40e43 2550 goto out_free_e;
b411b363 2551
0f0601f4 2552submit_for_resync:
b30ab791 2553 atomic_add(size >> 9, &device->rs_sect_ev);
0f0601f4 2554
80a40e43 2555submit:
b30ab791 2556 inc_unacked(device);
0500813f 2557 spin_lock_irq(&device->resource->req_lock);
b30ab791 2558 list_add_tail(&peer_req->w.list, &device->read_ee);
0500813f 2559 spin_unlock_irq(&device->resource->req_lock);
b411b363 2560
b30ab791 2561 if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
82bc0194 2562 return 0;
b411b363 2563
10f6d992 2564 /* don't care for the reason here */
d0180171 2565 drbd_err(device, "submit failed, triggering re-connect\n");
0500813f 2566 spin_lock_irq(&device->resource->req_lock);
db830c46 2567 list_del(&peer_req->w.list);
0500813f 2568 spin_unlock_irq(&device->resource->req_lock);
2569 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2570
b411b363 2571out_free_e:
2572 put_ldev(device);
2573 drbd_free_peer_req(device, peer_req);
82bc0194 2574 return -EIO;
2575}
2576
2577/**
2578 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
2579 */
2580static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
b411b363 2581{
69a22773 2582 struct drbd_device *device = peer_device->device;
2583 int self, peer, rv = -100;
2584 unsigned long ch_self, ch_peer;
44ed167d 2585 enum drbd_after_sb_p after_sb_0p;
b411b363 2586
2587 self = device->ldev->md.uuid[UI_BITMAP] & 1;
2588 peer = device->p_uuid[UI_BITMAP] & 1;
b411b363 2589
2590 ch_peer = device->p_uuid[UI_SIZE];
2591 ch_self = device->comm_bm_set;
b411b363 2592
44ed167d 2593 rcu_read_lock();
69a22773 2594 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2595 rcu_read_unlock();
2596 switch (after_sb_0p) {
2597 case ASB_CONSENSUS:
2598 case ASB_DISCARD_SECONDARY:
2599 case ASB_CALL_HELPER:
44ed167d 2600 case ASB_VIOLENTLY:
d0180171 2601 drbd_err(device, "Configuration error.\n");
2602 break;
2603 case ASB_DISCONNECT:
2604 break;
2605 case ASB_DISCARD_YOUNGER_PRI:
2606 if (self == 0 && peer == 1) {
2607 rv = -1;
2608 break;
2609 }
2610 if (self == 1 && peer == 0) {
2611 rv = 1;
2612 break;
2613 }
2614 /* Else fall through to one of the other strategies... */
2615 case ASB_DISCARD_OLDER_PRI:
2616 if (self == 0 && peer == 1) {
2617 rv = 1;
2618 break;
2619 }
2620 if (self == 1 && peer == 0) {
2621 rv = -1;
2622 break;
2623 }
2624 /* Else fall through to one of the other strategies... */
d0180171 2625 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2626 "Using discard-least-changes instead\n");
2627 case ASB_DISCARD_ZERO_CHG:
2628 if (ch_peer == 0 && ch_self == 0) {
69a22773 2629 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2630 ? -1 : 1;
2631 break;
2632 } else {
2633 if (ch_peer == 0) { rv = 1; break; }
2634 if (ch_self == 0) { rv = -1; break; }
2635 }
44ed167d 2636 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2637 break;
2638 case ASB_DISCARD_LEAST_CHG:
2639 if (ch_self < ch_peer)
2640 rv = -1;
2641 else if (ch_self > ch_peer)
2642 rv = 1;
2643 else /* ( ch_self == ch_peer ) */
2644 /* Well, then use something else. */
69a22773 2645 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2646 ? -1 : 1;
2647 break;
2648 case ASB_DISCARD_LOCAL:
2649 rv = -1;
2650 break;
2651 case ASB_DISCARD_REMOTE:
2652 rv = 1;
2653 }
2654
2655 return rv;
2656}
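/* Worked example (editor's illustration, not in the original source):
 * with after-sb-0pri set to discard-least-changes, a node that modified
 * 10 blocks during the split brain (ch_self = 10) loses against a peer
 * that modified 1000 (ch_peer = 1000): rv = -1, i.e. the local changes
 * are discarded and this node becomes sync target. */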
2657
2658/**
2659 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
2660 */
2661static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
b411b363 2662{
69a22773 2663 struct drbd_device *device = peer_device->device;
6184ea21 2664 int hg, rv = -100;
44ed167d 2665 enum drbd_after_sb_p after_sb_1p;
b411b363 2666
44ed167d 2667 rcu_read_lock();
69a22773 2668 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2669 rcu_read_unlock();
2670 switch (after_sb_1p) {
2671 case ASB_DISCARD_YOUNGER_PRI:
2672 case ASB_DISCARD_OLDER_PRI:
2673 case ASB_DISCARD_LEAST_CHG:
2674 case ASB_DISCARD_LOCAL:
2675 case ASB_DISCARD_REMOTE:
44ed167d 2676 case ASB_DISCARD_ZERO_CHG:
d0180171 2677 drbd_err(device, "Configuration error.\n");
2678 break;
2679 case ASB_DISCONNECT:
2680 break;
2681 case ASB_CONSENSUS:
69a22773 2682 hg = drbd_asb_recover_0p(peer_device);
b30ab791 2683 if (hg == -1 && device->state.role == R_SECONDARY)
b411b363 2684 rv = hg;
b30ab791 2685 if (hg == 1 && device->state.role == R_PRIMARY)
2686 rv = hg;
2687 break;
2688 case ASB_VIOLENTLY:
69a22773 2689 rv = drbd_asb_recover_0p(peer_device);
2690 break;
2691 case ASB_DISCARD_SECONDARY:
b30ab791 2692 return device->state.role == R_PRIMARY ? 1 : -1;
b411b363 2693 case ASB_CALL_HELPER:
69a22773 2694 hg = drbd_asb_recover_0p(peer_device);
b30ab791 2695 if (hg == -1 && device->state.role == R_PRIMARY) {
2696 enum drbd_state_rv rv2;
2697
2698 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2699 * we might be here in C_WF_REPORT_PARAMS which is transient.
2700 * we do not need to wait for the after state change work either. */
b30ab791 2701 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
bb437946 2702 if (rv2 != SS_SUCCESS) {
b30ab791 2703 drbd_khelper(device, "pri-lost-after-sb");
b411b363 2704 } else {
d0180171 2705 drbd_warn(device, "Successfully gave up primary role.\n");
2706 rv = hg;
2707 }
2708 } else
2709 rv = hg;
2710 }
2711
2712 return rv;
2713}
2714
2715/**
2716 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
2717 */
2718static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
b411b363 2719{
69a22773 2720 struct drbd_device *device = peer_device->device;
6184ea21 2721 int hg, rv = -100;
44ed167d 2722 enum drbd_after_sb_p after_sb_2p;
b411b363 2723
44ed167d 2724 rcu_read_lock();
69a22773 2725 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2726 rcu_read_unlock();
2727 switch (after_sb_2p) {
2728 case ASB_DISCARD_YOUNGER_PRI:
2729 case ASB_DISCARD_OLDER_PRI:
2730 case ASB_DISCARD_LEAST_CHG:
2731 case ASB_DISCARD_LOCAL:
2732 case ASB_DISCARD_REMOTE:
2733 case ASB_CONSENSUS:
2734 case ASB_DISCARD_SECONDARY:
44ed167d 2735 case ASB_DISCARD_ZERO_CHG:
d0180171 2736 drbd_err(device, "Configuration error.\n");
2737 break;
2738 case ASB_VIOLENTLY:
69a22773 2739 rv = drbd_asb_recover_0p(peer_device);
2740 break;
2741 case ASB_DISCONNECT:
2742 break;
2743 case ASB_CALL_HELPER:
69a22773 2744 hg = drbd_asb_recover_0p(peer_device);
b411b363 2745 if (hg == -1) {
2746 enum drbd_state_rv rv2;
2747
2748 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2749 * we might be here in C_WF_REPORT_PARAMS which is transient.
2750 * we do not need to wait for the after state change work either. */
b30ab791 2751 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
bb437946 2752 if (rv2 != SS_SUCCESS) {
b30ab791 2753 drbd_khelper(device, "pri-lost-after-sb");
b411b363 2754 } else {
d0180171 2755 drbd_warn(device, "Successfully gave up primary role.\n");
2756 rv = hg;
2757 }
2758 } else
2759 rv = hg;
2760 }
2761
2762 return rv;
2763}
2764
b30ab791 2765static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2766 u64 bits, u64 flags)
2767{
2768 if (!uuid) {
d0180171 2769 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2770 return;
2771 }
d0180171 2772 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2773 text,
2774 (unsigned long long)uuid[UI_CURRENT],
2775 (unsigned long long)uuid[UI_BITMAP],
2776 (unsigned long long)uuid[UI_HISTORY_START],
2777 (unsigned long long)uuid[UI_HISTORY_END],
2778 (unsigned long long)bits,
2779 (unsigned long long)flags);
2780}
2781
2782/*
2783 100 after split brain try auto recover
2784 2 C_SYNC_SOURCE set BitMap
2785 1 C_SYNC_SOURCE use BitMap
2786 0 no Sync
2787 -1 C_SYNC_TARGET use BitMap
2788 -2 C_SYNC_TARGET set BitMap
2789 -100 after split brain, disconnect
2790-1000 unrelated data
2791-1091 requires proto 91
2792-1096 requires proto 96
b411b363 2793 */
b30ab791 2794static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
2795{
2796 u64 self, peer;
2797 int i, j;
2798
2799 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2800 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2801
2802 *rule_nr = 10;
2803 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2804 return 0;
2805
2806 *rule_nr = 20;
2807 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2808 peer != UUID_JUST_CREATED)
2809 return -2;
2810
2811 *rule_nr = 30;
2812 if (self != UUID_JUST_CREATED &&
2813 (peer == UUID_JUST_CREATED || peer == (u64)0))
2814 return 2;
2815
2816 if (self == peer) {
2817 int rct, dc; /* roles at crash time */
2818
b30ab791 2819 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
b411b363 2820
a6b32bc3 2821 if (first_peer_device(device)->connection->agreed_pro_version < 91)
4a23f264 2822 return -1091;
b411b363 2823
2824 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2825 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
d0180171 2826 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2827 drbd_uuid_move_history(device);
2828 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2829 device->ldev->md.uuid[UI_BITMAP] = 0;
b411b363 2830
2831 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2832 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2833 *rule_nr = 34;
2834 } else {
d0180171 2835 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2836 *rule_nr = 36;
2837 }
2838
2839 return 1;
2840 }
2841
b30ab791 2842 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
b411b363 2843
a6b32bc3 2844 if (first_peer_device(device)->connection->agreed_pro_version < 91)
4a23f264 2845 return -1091;
b411b363 2846
2847 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2848 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
d0180171 2849 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
b411b363 2850
2851 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2852 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2853 device->p_uuid[UI_BITMAP] = 0UL;
b411b363 2854
b30ab791 2855 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2856 *rule_nr = 35;
2857 } else {
d0180171 2858 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
2859 *rule_nr = 37;
2860 }
2861
2862 return -1;
2863 }
2864
2865 /* Common power [off|failure] */
2866 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2867 (device->p_uuid[UI_FLAGS] & 2);
2868 /* lowest bit is set when we were primary,
2869 * next bit (weight 2) is set when peer was primary */
2870 *rule_nr = 40;
2871
2872 switch (rct) {
2873 case 0: /* !self_pri && !peer_pri */ return 0;
2874 case 1: /* self_pri && !peer_pri */ return 1;
2875 case 2: /* !self_pri && peer_pri */ return -1;
2876 case 3: /* self_pri && peer_pri */
a6b32bc3 2877 dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
2878 return dc ? -1 : 1;
2879 }
2880 }
2881
2882 *rule_nr = 50;
b30ab791 2883 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2884 if (self == peer)
2885 return -1;
2886
2887 *rule_nr = 51;
b30ab791 2888 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
b411b363 2889 if (self == peer) {
a6b32bc3 2890 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2891 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2892 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2893 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
2894 /* The last P_SYNC_UUID did not get through. Undo the last start of
2895 resync as sync source modifications of the peer's UUIDs. */
2896
a6b32bc3 2897 if (first_peer_device(device)->connection->agreed_pro_version < 91)
4a23f264 2898 return -1091;
b411b363 2899
2900 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2901 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
4a23f264 2902
d0180171 2903 drbd_info(device, "Lost last syncUUID packet, corrected:\n");
b30ab791 2904 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
4a23f264 2905
2906 return -1;
2907 }
2908 }
2909
2910 *rule_nr = 60;
b30ab791 2911 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
b411b363 2912 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
b30ab791 2913 peer = device->p_uuid[i] & ~((u64)1);
2914 if (self == peer)
2915 return -2;
2916 }
2917
2918 *rule_nr = 70;
2919 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2920 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2921 if (self == peer)
2922 return 1;
2923
2924 *rule_nr = 71;
b30ab791 2925 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
b411b363 2926 if (self == peer) {
a6b32bc3 2927 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2928 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2929 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2930 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2931 /* The last P_SYNC_UUID did not get through. Undo the last start of
2932 resync as sync source modifications of our UUIDs. */
2933
a6b32bc3 2934 if (first_peer_device(device)->connection->agreed_pro_version < 91)
4a23f264 2935 return -1091;
b411b363 2936
2937 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2938 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
b411b363 2939
d0180171 2940 drbd_info(device, "Last syncUUID did not get through, corrected:\n");
2941 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2942 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2943
2944 return 1;
2945 }
2946 }
2947
2948
2949 *rule_nr = 80;
b30ab791 2950 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363 2951 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
b30ab791 2952 self = device->ldev->md.uuid[i] & ~((u64)1);
2953 if (self == peer)
2954 return 2;
2955 }
2956
2957 *rule_nr = 90;
2958 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2959 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2960 if (self == peer && self != ((u64)0))
2961 return 100;
2962
2963 *rule_nr = 100;
2964 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
b30ab791 2965 self = device->ldev->md.uuid[i] & ~((u64)1);
b411b363 2966 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
b30ab791 2967 peer = device->p_uuid[j] & ~((u64)1);
2968 if (self == peer)
2969 return -100;
2970 }
2971 }
2972
2973 return -1000;
2974}
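/* A sketch of one common case (editor's illustration, not in the
 * original source): the peer kept running as primary while this node
 * was down, so on disconnect it rotated a new current UUID and kept
 * the generation both nodes last shared as its bitmap UUID.  Rule 50
 * then finds our UI_CURRENT equal to the peer's UI_BITMAP and returns
 * -1: become sync target and let the peer's bitmap drive the resync. */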
2975
2976/* drbd_sync_handshake() returns the new conn state on success, or
2977 CONN_MASK (-1) on failure.
2978 */
2979static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
2980 enum drbd_role peer_role,
2981 enum drbd_disk_state peer_disk) __must_hold(local)
2982{
69a22773 2983 struct drbd_device *device = peer_device->device;
2984 enum drbd_conns rv = C_MASK;
2985 enum drbd_disk_state mydisk;
44ed167d 2986 struct net_conf *nc;
6dff2902 2987 int hg, rule_nr, rr_conflict, tentative;
b411b363 2988
b30ab791 2989 mydisk = device->state.disk;
b411b363 2990 if (mydisk == D_NEGOTIATING)
b30ab791 2991 mydisk = device->new_state_tmp.disk;
b411b363 2992
d0180171 2993 drbd_info(device, "drbd_sync_handshake:\n");
9f2247bb 2994
2995 spin_lock_irq(&device->ldev->md.uuid_lock);
2996 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
2997 drbd_uuid_dump(device, "peer", device->p_uuid,
2998 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
b411b363 2999
3000 hg = drbd_uuid_compare(device, &rule_nr);
3001 spin_unlock_irq(&device->ldev->md.uuid_lock);
b411b363 3002
d0180171 3003 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3004
3005 if (hg == -1000) {
d0180171 3006 drbd_alert(device, "Unrelated data, aborting!\n");
3007 return C_MASK;
3008 }
4a23f264 3009 if (hg < -1000) {
d0180171 3010 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3011 return C_MASK;
3012 }
3013
3014 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3015 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3016 int f = (hg == -100) || abs(hg) == 2;
3017 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3018 if (f)
3019 hg = hg*2;
d0180171 3020 drbd_info(device, "Becoming sync %s due to disk states.\n",
3021 hg > 0 ? "source" : "target");
3022 }
3023
3a11a487 3024 if (abs(hg) == 100)
b30ab791 3025 drbd_khelper(device, "initial-split-brain");
3a11a487 3026
44ed167d 3027 rcu_read_lock();
69a22773 3028 nc = rcu_dereference(peer_device->connection->net_conf);
3029
3030 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
b30ab791 3031 int pcount = (device->state.role == R_PRIMARY)
3032 + (peer_role == R_PRIMARY);
3033 int forced = (hg == -100);
3034
3035 switch (pcount) {
3036 case 0:
69a22773 3037 hg = drbd_asb_recover_0p(peer_device);
3038 break;
3039 case 1:
69a22773 3040 hg = drbd_asb_recover_1p(peer_device);
3041 break;
3042 case 2:
69a22773 3043 hg = drbd_asb_recover_2p(peer_device);
3044 break;
3045 }
3046 if (abs(hg) < 100) {
d0180171 3047 drbd_warn(device, "Split-Brain detected, %d primaries, "
3048 "automatically solved. Sync from %s node\n",
3049 pcount, (hg < 0) ? "peer" : "this");
3050 if (forced) {
d0180171 3051 drbd_warn(device, "Doing a full sync, since"
3052 " UUIDs where ambiguous.\n");
3053 hg = hg*2;
3054 }
3055 }
3056 }
3057
3058 if (hg == -100) {
b30ab791 3059 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
b411b363 3060 hg = -1;
b30ab791 3061 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3062 hg = 1;
3063
3064 if (abs(hg) < 100)
d0180171 3065 drbd_warn(device, "Split-Brain detected, manually solved. "
3066 "Sync from %s node\n",
3067 (hg < 0) ? "peer" : "this");
3068 }
44ed167d 3069 rr_conflict = nc->rr_conflict;
6dff2902 3070 tentative = nc->tentative;
44ed167d 3071 rcu_read_unlock();
3072
3073 if (hg == -100) {
3074 /* FIXME this log message is not correct if we end up here
3075 * after an attempted attach on a diskless node.
3076 * We just refuse to attach -- well, we drop the "connection"
3077 * to that disk, in a way... */
d0180171 3078 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
b30ab791 3079 drbd_khelper(device, "split-brain");
3080 return C_MASK;
3081 }
3082
3083 if (hg > 0 && mydisk <= D_INCONSISTENT) {
d0180171 3084 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3085 return C_MASK;
3086 }
3087
3088 if (hg < 0 && /* by intention we do not use mydisk here. */
b30ab791 3089 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
44ed167d 3090 switch (rr_conflict) {
b411b363 3091 case ASB_CALL_HELPER:
b30ab791 3092 drbd_khelper(device, "pri-lost");
b411b363
PR
3093 /* fall through */
3094 case ASB_DISCONNECT:
d0180171 3095 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
b411b363
PR
3096 return C_MASK;
3097 case ASB_VIOLENTLY:
d0180171 3098 drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
b411b363
PR
3099 "assumption\n");
3100 }
3101 }
3102
69a22773 3103 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
cf14c2e9 3104 if (hg == 0)
d0180171 3105 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
cf14c2e9 3106 else
d0180171 3107 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
cf14c2e9
PR
3108 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3109 abs(hg) >= 2 ? "full" : "bit-map based");
3110 return C_MASK;
3111 }
3112
b411b363 3113 if (abs(hg) >= 2) {
d0180171 3114 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
b30ab791 3115 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
20ceb2b2 3116 BM_LOCKED_SET_ALLOWED))
b411b363
PR
3117 return C_MASK;
3118 }
3119
3120 if (hg > 0) { /* become sync source. */
3121 rv = C_WF_BITMAP_S;
3122 } else if (hg < 0) { /* become sync target */
3123 rv = C_WF_BITMAP_T;
3124 } else {
3125 rv = C_CONNECTED;
b30ab791 3126 if (drbd_bm_total_weight(device)) {
d0180171 3127 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
b30ab791 3128 drbd_bm_total_weight(device));
b411b363
PR
3129 }
3130 }
3131
3132 return rv;
3133}
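
/* Handshake verdict summary (derived from the code above, not part of the
 * original source): the sign of hg picks the sync direction (hg > 0: this
 * node becomes SyncSource, hg < 0: SyncTarget, hg == 0: no resync needed);
 * |hg| >= 2 forces a full sync instead of a bitmap-based one; |hg| == 100
 * marks an unresolved split brain; and hg <= -1000 flags unrelated data or
 * a too-old peer protocol. C_MASK is the "refuse to connect" verdict. */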

static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
{
	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
	if (peer == ASB_DISCARD_REMOTE)
		return ASB_DISCARD_LOCAL;

	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
	if (peer == ASB_DISCARD_LOCAL)
		return ASB_DISCARD_REMOTE;

	/* everything else is valid if they are equal on both sides. */
	return peer;
}
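
/* Example (editor's illustration): if the peer sent after-sb-0pri =
 * discard-remote, it intends to discard *our* data; from our point of view
 * that is discard-local, which is exactly the swap performed above.
 * Symmetric policies pass through unchanged and must match on both sides. */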

static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_protocol *p = pi->data;
	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
	int p_proto, p_discard_my_data, p_two_primaries, cf;
	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
	char integrity_alg[SHARED_SECRET_MAX] = "";
	struct crypto_hash *peer_integrity_tfm = NULL;
	void *int_dig_in = NULL, *int_dig_vv = NULL;

	p_proto		= be32_to_cpu(p->protocol);
	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
	p_two_primaries = be32_to_cpu(p->two_primaries);
	cf		= be32_to_cpu(p->conn_flags);
	p_discard_my_data = cf & CF_DISCARD_MY_DATA;

	if (connection->agreed_pro_version >= 87) {
		int err;

		if (pi->size > sizeof(integrity_alg))
			return -EIO;
		err = drbd_recv_all(connection, integrity_alg, pi->size);
		if (err)
			return err;
		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
	}

	if (pi->cmd != P_PROTOCOL_UPDATE) {
		clear_bit(CONN_DRY_RUN, &connection->flags);

		if (cf & CF_DRY_RUN)
			set_bit(CONN_DRY_RUN, &connection->flags);

		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);

		if (p_proto != nc->wire_protocol) {
			drbd_err(connection, "incompatible %s settings\n", "protocol");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
			goto disconnect_rcu_unlock;
		}

		if (p_discard_my_data && nc->discard_my_data) {
			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
			goto disconnect_rcu_unlock;
		}

		if (p_two_primaries != nc->two_primaries) {
			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
			goto disconnect_rcu_unlock;
		}

		if (strcmp(integrity_alg, nc->integrity_alg)) {
			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
			goto disconnect_rcu_unlock;
		}

		rcu_read_unlock();
	}

	if (integrity_alg[0]) {
		int hash_size;

		/*
		 * We can only change the peer data integrity algorithm
		 * here.  Changing our own data integrity algorithm
		 * requires that we send a P_PROTOCOL_UPDATE packet at
		 * the same time; otherwise, the peer has no way to
		 * tell between which packets the algorithm should
		 * change.
		 */

		peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
		if (!peer_integrity_tfm) {
			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
				 integrity_alg);
			goto disconnect;
		}

		hash_size = crypto_hash_digestsize(peer_integrity_tfm);
		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
		if (!(int_dig_in && int_dig_vv)) {
			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
			goto disconnect;
		}
	}

	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
	if (!new_net_conf) {
		drbd_err(connection, "Allocation of new net_conf failed\n");
		goto disconnect;
	}

	mutex_lock(&connection->data.mutex);
	mutex_lock(&connection->resource->conf_update);
	old_net_conf = connection->net_conf;
	*new_net_conf = *old_net_conf;

	new_net_conf->wire_protocol = p_proto;
	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
	new_net_conf->two_primaries = p_two_primaries;

	rcu_assign_pointer(connection->net_conf, new_net_conf);
	mutex_unlock(&connection->resource->conf_update);
	mutex_unlock(&connection->data.mutex);

	crypto_free_hash(connection->peer_integrity_tfm);
	kfree(connection->int_dig_in);
	kfree(connection->int_dig_vv);
	connection->peer_integrity_tfm = peer_integrity_tfm;
	connection->int_dig_in = int_dig_in;
	connection->int_dig_vv = int_dig_vv;

	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
		drbd_info(connection, "peer data-integrity-alg: %s\n",
			  integrity_alg[0] ? integrity_alg : "(none)");

	synchronize_rcu();
	kfree(old_net_conf);
	return 0;

disconnect_rcu_unlock:
	rcu_read_unlock();
disconnect:
	crypto_free_hash(peer_integrity_tfm);
	kfree(int_dig_in);
	kfree(int_dig_vv);
	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	return -EIO;
}
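
/* Note the publish-then-reclaim pattern used above for net_conf: the new
 * configuration becomes visible via rcu_assign_pointer() while conf_update
 * is held, and the old one is only kfree()d after synchronize_rcu()
 * guarantees that no reader can still be dereferencing it. */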

/* helper function
 * input: alg name, feature name
 * return: NULL (alg name was "")
 *         ERR_PTR(error) if something goes wrong
 *         or the crypto hash ptr, if it worked out ok. */
static
struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
		const char *alg, const char *name)
{
	struct crypto_hash *tfm;

	if (!alg[0])
		return NULL;

	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
	if (IS_ERR(tfm)) {
		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
			 alg, name, PTR_ERR(tfm));
		return tfm;
	}
	return tfm;
}

static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
{
	void *buffer = connection->data.rbuf;
	int size = pi->size;

	while (size) {
		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
		s = drbd_recv(connection, buffer, s);
		if (s <= 0) {
			if (s < 0)
				return s;
			break;
		}
		size -= s;
	}
	if (size)
		return -EIO;
	return 0;
}

/*
 * config_unknown_volume  -  device configuration command for unknown volume
 *
 * When a device is added to an existing connection, the node on which the
 * device is added first will send configuration commands to its peer but the
 * peer will not know about the device yet. It will warn and ignore these
 * commands. Once the device is added on the second node, the second node will
 * send the same device configuration commands, but in the other direction.
 *
 * (We can also end up here if drbd is misconfigured.)
 */
static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
{
	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
		  cmdname(pi->cmd), pi->vnr);
	return ignore_remaining_packet(connection, pi);
}

static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_rs_param_95 *p;
	unsigned int header_size, data_size, exp_max_sz;
	struct crypto_hash *verify_tfm = NULL;
	struct crypto_hash *csums_tfm = NULL;
	struct net_conf *old_net_conf, *new_net_conf = NULL;
	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
	const int apv = connection->agreed_pro_version;
	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
	int fifo_size = 0;
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
		    : apv == 88 ? sizeof(struct p_rs_param)
					+ SHARED_SECRET_MAX
		    : apv <= 94 ? sizeof(struct p_rs_param_89)
		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);

	if (pi->size > exp_max_sz) {
		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
			 pi->size, exp_max_sz);
		return -EIO;
	}

	if (apv <= 88) {
		header_size = sizeof(struct p_rs_param);
		data_size = pi->size - header_size;
	} else if (apv <= 94) {
		header_size = sizeof(struct p_rs_param_89);
		data_size = pi->size - header_size;
		D_ASSERT(device, data_size == 0);
	} else {
		header_size = sizeof(struct p_rs_param_95);
		data_size = pi->size - header_size;
		D_ASSERT(device, data_size == 0);
	}

	/* initialize verify_alg and csums_alg */
	p = pi->data;
	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);

	err = drbd_recv_all(peer_device->connection, p, header_size);
	if (err)
		return err;

	mutex_lock(&connection->resource->conf_update);
	old_net_conf = peer_device->connection->net_conf;
	if (get_ldev(device)) {
		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
		if (!new_disk_conf) {
			put_ldev(device);
			mutex_unlock(&connection->resource->conf_update);
			drbd_err(device, "Allocation of new disk_conf failed\n");
			return -ENOMEM;
		}

		old_disk_conf = device->ldev->disk_conf;
		*new_disk_conf = *old_disk_conf;

		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
	}

	if (apv >= 88) {
		if (apv == 88) {
			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
				drbd_err(device, "verify-alg of wrong size, "
					 "peer wants %u, accepting only up to %u bytes\n",
					 data_size, SHARED_SECRET_MAX);
				err = -EIO;
				goto reconnect;
			}

			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
			if (err)
				goto reconnect;
			/* we expect NUL terminated string */
			/* but just in case someone tries to be evil */
			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
			p->verify_alg[data_size-1] = 0;

		} else /* apv >= 89 */ {
			/* we still expect NUL terminated strings */
			/* but just in case someone tries to be evil */
			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
		}

		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
			if (device->state.conn == C_WF_REPORT_PARAMS) {
				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
					 old_net_conf->verify_alg, p->verify_alg);
				goto disconnect;
			}
			verify_tfm = drbd_crypto_alloc_digest_safe(device,
					p->verify_alg, "verify-alg");
			if (IS_ERR(verify_tfm)) {
				verify_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
			if (device->state.conn == C_WF_REPORT_PARAMS) {
				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
					 old_net_conf->csums_alg, p->csums_alg);
				goto disconnect;
			}
			csums_tfm = drbd_crypto_alloc_digest_safe(device,
					p->csums_alg, "csums-alg");
			if (IS_ERR(csums_tfm)) {
				csums_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv > 94 && new_disk_conf) {
			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);

			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
			if (fifo_size != device->rs_plan_s->size) {
				new_plan = fifo_alloc(fifo_size);
				if (!new_plan) {
					drbd_err(device, "kmalloc of fifo_buffer failed");
					put_ldev(device);
					goto disconnect;
				}
			}
		}

		if (verify_tfm || csums_tfm) {
			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
			if (!new_net_conf) {
				drbd_err(device, "Allocation of new net_conf failed\n");
				goto disconnect;
			}

			*new_net_conf = *old_net_conf;

			if (verify_tfm) {
				strcpy(new_net_conf->verify_alg, p->verify_alg);
				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
				crypto_free_hash(peer_device->connection->verify_tfm);
				peer_device->connection->verify_tfm = verify_tfm;
				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
			}
			if (csums_tfm) {
				strcpy(new_net_conf->csums_alg, p->csums_alg);
				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
				crypto_free_hash(peer_device->connection->csums_tfm);
				peer_device->connection->csums_tfm = csums_tfm;
				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
			}
			rcu_assign_pointer(connection->net_conf, new_net_conf);
		}
	}

	if (new_disk_conf) {
		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
		put_ldev(device);
	}

	if (new_plan) {
		old_plan = device->rs_plan_s;
		rcu_assign_pointer(device->rs_plan_s, new_plan);
	}

	mutex_unlock(&connection->resource->conf_update);
	synchronize_rcu();
	if (new_net_conf)
		kfree(old_net_conf);
	kfree(old_disk_conf);
	kfree(old_plan);

	return 0;

reconnect:
	if (new_disk_conf) {
		put_ldev(device);
		kfree(new_disk_conf);
	}
	mutex_unlock(&connection->resource->conf_update);
	return -EIO;

disconnect:
	kfree(new_plan);
	if (new_disk_conf) {
		put_ldev(device);
		kfree(new_disk_conf);
	}
	mutex_unlock(&connection->resource->conf_update);
	/* just for completeness: actually not needed,
	 * as this is not reached if csums_tfm was ok. */
	crypto_free_hash(csums_tfm);
	/* but free the verify_tfm again, if csums_tfm did not work out */
	crypto_free_hash(verify_tfm);
	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
	return -EIO;
}
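
/* SyncParam wire format by agreed protocol version, as implied by the
 * exp_max_sz computation above (editor's summary, not original text):
 * apv <= 87: struct p_rs_param (resync rate only);
 * apv == 88: p_rs_param followed by up to SHARED_SECRET_MAX bytes of
 *            verify-alg name as trailing payload;
 * apv 89-94: struct p_rs_param_89 (verify-alg and csums-alg inline);
 * apv >= 95: struct p_rs_param_95 (adds the dynamic resync controller
 *            knobs c_plan_ahead, c_delay_target, c_fill_target, c_max_rate). */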

/* warn if the arguments differ by more than 12.5% */
static void warn_if_differ_considerably(struct drbd_device *device,
	const char *s, sector_t a, sector_t b)
{
	sector_t d;
	if (a == 0 || b == 0)
		return;
	d = (a > b) ? (a - b) : (b - a);
	if (d > (a>>3) || d > (b>>3))
		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
			  (unsigned long long)a, (unsigned long long)b);
}
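
/* Example: a = 1000 and b = 800 sectors give d = 200, which exceeds
 * b >> 3 == 100 (x >> 3 is x / 8, i.e. the 12.5% threshold), so the
 * warning is printed; a = 1000 and b = 950 (d = 50) stays silent. */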

static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_sizes *p = pi->data;
	enum determine_dev_size dd = DS_UNCHANGED;
	sector_t p_size, p_usize, my_usize;
	int ldsc = 0; /* local disk size changed */
	enum dds_flags ddsf;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	p_size = be64_to_cpu(p->d_size);
	p_usize = be64_to_cpu(p->u_size);

	/* just store the peer's disk size for now.
	 * we still need to figure out whether we accept that. */
	device->p_size = p_size;

	if (get_ldev(device)) {
		rcu_read_lock();
		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
		rcu_read_unlock();

		warn_if_differ_considerably(device, "lower level device sizes",
			   p_size, drbd_get_max_capacity(device->ldev));
		warn_if_differ_considerably(device, "user requested size",
					    p_usize, my_usize);

		/* if this is the first connect, or an otherwise expected
		 * param exchange, choose the minimum */
		if (device->state.conn == C_WF_REPORT_PARAMS)
			p_usize = min_not_zero(my_usize, p_usize);

		/* Never shrink a device with usable data during connect.
		   But allow online shrinking if we are connected. */
		if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
		    drbd_get_capacity(device->this_bdev) &&
		    device->state.disk >= D_OUTDATED &&
		    device->state.conn < C_CONNECTED) {
			drbd_err(device, "The peer's disk size is too small!\n");
			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
			put_ldev(device);
			return -EIO;
		}

		if (my_usize != p_usize) {
			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;

			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
			if (!new_disk_conf) {
				drbd_err(device, "Allocation of new disk_conf failed\n");
				put_ldev(device);
				return -ENOMEM;
			}

			mutex_lock(&connection->resource->conf_update);
			old_disk_conf = device->ldev->disk_conf;
			*new_disk_conf = *old_disk_conf;
			new_disk_conf->disk_size = p_usize;

			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
			mutex_unlock(&connection->resource->conf_update);
			synchronize_rcu();
			kfree(old_disk_conf);

			drbd_info(device, "Peer sets u_size to %lu sectors\n",
				  (unsigned long)my_usize);
		}

		put_ldev(device);
	}

	ddsf = be16_to_cpu(p->dds_flags);
	if (get_ldev(device)) {
		dd = drbd_determine_dev_size(device, ddsf, NULL);
		put_ldev(device);
		if (dd == DS_ERROR)
			return -EIO;
		drbd_md_sync(device);
	} else {
		/* I am diskless, need to accept the peer's size. */
		drbd_set_my_capacity(device, p_size);
	}

	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	drbd_reconsider_max_bio_size(device);

	if (get_ldev(device)) {
		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
			ldsc = 1;
		}

		put_ldev(device);
	}

	if (device->state.conn > C_WF_REPORT_PARAMS) {
		if (be64_to_cpu(p->c_size) !=
		    drbd_get_capacity(device->this_bdev) || ldsc) {
			/* we have different sizes, probably peer
			 * needs to know my new size... */
			drbd_send_sizes(peer_device, 0, ddsf);
		}
		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
			if (device->state.pdsk >= D_INCONSISTENT &&
			    device->state.disk >= D_INCONSISTENT) {
				if (ddsf & DDSF_NO_RESYNC)
					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
				else
					resync_after_online_grow(device);
			} else
				set_bit(RESYNC_AFTER_NEG, &device->flags);
		}
	}

	return 0;
}
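
/* Size negotiation in short (editor's summary of the function above): on
 * the initial parameter exchange both sides settle on
 * min_not_zero(local u_size, peer u_size); a device holding usable data
 * refuses to shrink while connecting, whereas online shrinking of an
 * established connection is delegated to drbd_determine_dev_size(). */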

static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_uuids *p = pi->data;
	u64 *p_uuid;
	int i, updated_uuids = 0;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid) {
		drbd_err(device, "kmalloc of p_uuid failed\n");
		return false;
	}

	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
		p_uuid[i] = be64_to_cpu(p->uuid[i]);

	kfree(device->p_uuid);
	device->p_uuid = p_uuid;

	if (device->state.conn < C_CONNECTED &&
	    device->state.disk < D_INCONSISTENT &&
	    device->state.role == R_PRIMARY &&
	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
			 (unsigned long long)device->ed_uuid);
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
		return -EIO;
	}

	if (get_ldev(device)) {
		int skip_initial_sync =
			device->state.conn == C_CONNECTED &&
			peer_device->connection->agreed_pro_version >= 90 &&
			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
			(p_uuid[UI_FLAGS] & 8);
		if (skip_initial_sync) {
			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
					"clear_n_write from receive_uuids",
					BM_LOCKED_TEST_ALLOWED);
			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
			_drbd_uuid_set(device, UI_BITMAP, 0);
			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
					CS_VERBOSE, NULL);
			drbd_md_sync(device);
			updated_uuids = 1;
		}
		put_ldev(device);
	} else if (device->state.disk < D_INCONSISTENT &&
		   device->state.role == R_PRIMARY) {
		/* I am a diskless primary, the peer just created a new current UUID
		   for me. */
		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
	}

	/* Before we test for the disk state, we should wait until a possibly
	   ongoing cluster-wide state change has finished. That is important if
	   we are primary and are detaching from our disk. We need to see the
	   new disk state... */
	mutex_lock(device->state_mutex);
	mutex_unlock(device->state_mutex);
	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);

	if (updated_uuids)
		drbd_print_uuids(device, "receiver updated UUIDs to");

	return 0;
}

/**
 * convert_state() - Converts the peer's view of the cluster state to our point of view
 * @ps: The state as seen by the peer.
 */
static union drbd_state convert_state(union drbd_state ps)
{
	union drbd_state ms;

	static enum drbd_conns c_tab[] = {
		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
		[C_CONNECTED] = C_CONNECTED,

		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
		[C_VERIFY_S] = C_VERIFY_T,
		[C_MASK] = C_MASK,
	};

	ms.i = ps.i;

	ms.conn = c_tab[ps.conn];
	ms.peer = ps.role;
	ms.role = ps.peer;
	ms.pdsk = ps.disk;
	ms.disk = ps.pdsk;
	ms.peer_isp = (ps.aftr_isp | ps.user_isp);

	return ms;
}
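
/* Example: if the peer reports role=Primary, peer=Secondary, disk=UpToDate,
 * pdsk=Inconsistent, our mirrored view is role=Secondary, peer=Primary,
 * disk=Inconsistent, pdsk=UpToDate; asymmetric connection states such as
 * StartingSyncS/StartingSyncT are swapped via c_tab. */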

static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_req_state *p = pi->data;
	union drbd_state mask, val;
	enum drbd_state_rv rv;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
	    mutex_is_locked(device->state_mutex)) {
		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
		return 0;
	}

	mask = convert_state(mask);
	val = convert_state(val);

	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
	drbd_send_sr_reply(peer_device, rv);

	drbd_md_sync(device);

	return 0;
}

static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_req_state *p = pi->data;
	union drbd_state mask, val;
	enum drbd_state_rv rv;

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
	    mutex_is_locked(&connection->cstate_mutex)) {
		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
		return 0;
	}

	mask = convert_state(mask);
	val = convert_state(val);

	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
	conn_send_sr_reply(connection, rv);

	return 0;
}

static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_state *p = pi->data;
	union drbd_state os, ns, peer_state;
	enum drbd_disk_state real_peer_disk;
	enum chg_state_flags cs_flags;
	int rv;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	peer_state.i = be32_to_cpu(p->state);

	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

	spin_lock_irq(&device->resource->req_lock);
 retry:
	os = ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	/* If some other part of the code (asender thread, timeout)
	 * already decided to close the connection again,
	 * we must not "re-establish" it here. */
	if (os.conn <= C_TEAR_DOWN)
		return -ECONNRESET;

	/* If this is the "end of sync" confirmation, usually the peer disk
	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
	 * set) resync started in PausedSyncT, or if the timing of pause-/
	 * unpause-sync events has been "just right", the peer disk may
	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
	 */
	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
	    real_peer_disk == D_UP_TO_DATE &&
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
			if (drbd_bm_total_weight(device) <= device->rs_failed)
				drbd_resync_finished(device);
			return 0;
		}
	}

	/* explicit verify finished notification, stop sector reached. */
	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
		return 0;
	}

	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;

	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(device, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr  = (os.conn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			os.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.conn >= C_STARTING_SYNC_S &&
			peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);

		put_ldev(device);
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
			if (device->state.disk == D_NEGOTIATING) {
				drbd_force_state(device, NS(disk, D_FAILED));
			} else if (peer_state.disk == D_NEGOTIATING) {
				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
				real_peer_disk = D_DISKLESS;
			} else {
				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
					return -EIO;
				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
				return -EIO;
			}
		}
	}

	spin_lock_irq(&device->resource->req_lock);
	if (os.i != drbd_read_state(device).i)
		goto retry;
	clear_bit(CONSIDER_RESYNC, &device->flags);
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = device->new_state_tmp.disk;
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &device->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporary network outages! */
		spin_unlock_irq(&device->resource->req_lock);
		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
		tl_clear(peer_device->connection);
		drbd_uuid_new_current(device);
		clear_bit(NEW_CUR_UUID, &device->flags);
		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
		return -EIO;
	}
	rv = _drbd_set_state(device, ns, cs_flags, NULL);
	ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	if (rv < SS_SUCCESS) {
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
		return -EIO;
	}

	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(peer_device);
			drbd_send_current_state(peer_device);
		}
	}

	clear_bit(DISCARD_MY_DATA, &device->flags);

	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */

	return 0;
}

static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_rs_uuid *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	wait_event(device->misc_wait,
		   device->state.conn == C_WF_SYNC_UUID ||
		   device->state.conn == C_BEHIND ||
		   device->state.conn < C_CONNECTED ||
		   device->state.disk < D_NEGOTIATING);

	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */

	/* Here the _drbd_uuid_ functions are right, current should
	   _not_ be rotated into the history */
	if (get_ldev_if_state(device, D_NEGOTIATING)) {
		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
		_drbd_uuid_set(device, UI_BITMAP, 0UL);

		drbd_print_uuids(device, "updated sync uuid");
		drbd_start_resync(device, C_SYNC_TARGET);

		put_ldev(device);
	} else
		drbd_err(device, "Ignoring SyncUUID packet!\n");

	return 0;
}

/**
 * receive_bitmap_plain
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
		     unsigned long *p, struct bm_xfer_ctx *c)
{
	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
				 drbd_header_size(peer_device->connection);
	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
				       c->bm_words - c->word_offset);
	unsigned int want = num_words * sizeof(*p);
	int err;

	if (want != size) {
		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
		return -EIO;
	}
	if (want == 0)
		return 0;
	err = drbd_recv_all(peer_device->connection, p, want);
	if (err)
		return err;

	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);

	c->word_offset += num_words;
	c->bit_offset = c->word_offset * BITS_PER_LONG;
	if (c->bit_offset > c->bm_bits)
		c->bit_offset = c->bm_bits;

	return 1;
}
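
/* Plain transfer math: each packet carries at most
 * (DRBD_SOCKET_BUFFER_SIZE - header) / sizeof(long) bitmap words, so both
 * ends derive the same per-chunk "want" and any mismatch with the received
 * size is treated as a protocol error above. */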

static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
{
	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
}

static int dcbp_get_start(struct p_compressed_bm *p)
{
	return (p->encoding & 0x80) != 0;
}

static int dcbp_get_pad_bits(struct p_compressed_bm *p)
{
	return (p->encoding >> 4) & 0x7;
}
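
/* Layout of the "encoding" byte, as implied by the accessors above:
 * bits 0-3: bitmap code (e.g. RLE_VLI_Bits), bits 4-6: number of pad bits
 * in the last byte of the bitstream, bit 7: whether the first run-length
 * describes set bits (the initial "toggle" in recv_bm_rle_bits). */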

/**
 * recv_bm_rle_bits
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
recv_bm_rle_bits(struct drbd_peer_device *peer_device,
		 struct p_compressed_bm *p,
		 struct bm_xfer_ctx *c,
		 unsigned int len)
{
	struct bitstream bs;
	u64 look_ahead;
	u64 rl;
	u64 tmp;
	unsigned long s = c->bit_offset;
	unsigned long e;
	int toggle = dcbp_get_start(p);
	int have;
	int bits;

	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));

	bits = bitstream_get_bits(&bs, &look_ahead, 64);
	if (bits < 0)
		return -EIO;

	for (have = bits; have > 0; s += rl, toggle = !toggle) {
		bits = vli_decode_bits(&rl, look_ahead);
		if (bits <= 0)
			return -EIO;

		if (toggle) {
			e = s + rl - 1;
			if (e >= c->bm_bits) {
				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
				return -EIO;
			}
			_drbd_bm_set_bits(peer_device->device, s, e);
		}

		if (have < bits) {
			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
				 have, bits, look_ahead,
				 (unsigned int)(bs.cur.b - p->code),
				 (unsigned int)bs.buf_len);
			return -EIO;
		}
		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
		if (likely(bits < 64))
			look_ahead >>= bits;
		else
			look_ahead = 0;
		have -= bits;

		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
		if (bits < 0)
			return -EIO;
		look_ahead |= tmp << have;
		have += bits;
	}

	c->bit_offset = s;
	bm_xfer_ctx_bit_to_word_offset(c);

	return (s != c->bm_bits);
}
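
/* Stream structure, as derived from the loop above: the payload is a
 * sequence of VLI-encoded run lengths describing alternating runs of clear
 * and set bits; dcbp_get_start() says whether the first run is a "set" run.
 * Only set runs are applied to the bitmap; clear runs merely advance s. */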

/**
 * decode_bitmap_c
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
decode_bitmap_c(struct drbd_peer_device *peer_device,
		struct p_compressed_bm *p,
		struct bm_xfer_ctx *c,
		unsigned int len)
{
	if (dcbp_get_code(p) == RLE_VLI_Bits)
		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));

	/* other variants had been implemented for evaluation,
	 * but have been dropped as this one turned out to be "best"
	 * during all our tests. */

	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
	return -EIO;
}

void INFO_bm_xfer_stats(struct drbd_device *device,
		const char *direction, struct bm_xfer_ctx *c)
{
	/* what would it take to transfer it "plaintext" */
	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
	unsigned int plain =
		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
		c->bm_words * sizeof(unsigned long);
	unsigned int total = c->bytes[0] + c->bytes[1];
	unsigned int r;

	/* total can not be zero. but just in case: */
	if (total == 0)
		return;

	/* don't report if not compressed */
	if (total >= plain)
		return;

	/* total < plain. check for overflow, still */
	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
				    : (1000 * total / plain);

	if (r > 1000)
		r = 1000;

	r = 1000 - r;
	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
		  "total %u; compression: %u.%u%%\n",
		  direction,
		  c->bytes[1], c->packets[1],
		  c->bytes[0], c->packets[0],
		  total, r/10, r % 10);
}
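
/* Worked example of the per-mille ratio above: a compressed transfer of
 * total = 1024 bytes against a plain transfer of 32768 bytes gives
 * r = 1000 - 1000*1024/32768 = 969, which is printed via r/10 and r%10
 * as "compression: 96.9%". */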

/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter whether we process it in 32 bit chunks or 64 bit
   chunks as long as it is little endian. (Understand it as byte stream,
   beginning with the lowest byte...) If we would use big endian
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bits issue.

   returns 0 on success, a negative error code otherwise. */
static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct bm_xfer_ctx c;
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(device),
		.bm_words = drbd_bm_words(device),
	};

	for (;;) {
		if (pi->cmd == P_BITMAP)
			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
		else if (pi->cmd == P_COMPRESSED_BITMAP) {
			/* MAYBE: sanity check that we speak proto >= 90,
			 * and the feature is enabled! */
			struct p_compressed_bm *p = pi->data;

			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
				drbd_err(device, "ReportCBitmap packet too large\n");
				err = -EIO;
				goto out;
			}
			if (pi->size <= sizeof(*p)) {
				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
				err = -EIO;
				goto out;
			}
			err = drbd_recv_all(peer_device->connection, p, pi->size);
			if (err)
				goto out;
			err = decode_bitmap_c(peer_device, p, &c, pi->size);
		} else {
			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
			err = -EIO;
			goto out;
		}

		c.packets[pi->cmd == P_BITMAP]++;
		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;

		if (err <= 0) {
			if (err < 0)
				goto out;
			break;
		}
		err = drbd_recv_header(peer_device->connection, pi);
		if (err)
			goto out;
	}

	INFO_bm_xfer_stats(device, "receive", &c);

	if (device->state.conn == C_WF_BITMAP_T) {
		enum drbd_state_rv rv;

		err = drbd_send_bitmap(device);
		if (err)
			goto out;
		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
		D_ASSERT(device, rv == SS_SUCCESS);
	} else if (device->state.conn != C_WF_BITMAP_S) {
		/* admin may have requested C_DISCONNECTING,
		 * other threads may have noticed network errors */
		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
			  drbd_conn_str(device->state.conn));
	}
	err = 0;

 out:
	drbd_bm_unlock(device);
	if (!err && device->state.conn == C_WF_BITMAP_S)
		drbd_start_resync(device, C_SYNC_SOURCE);
	return err;
}

static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
		  pi->cmd, pi->size);

	return ignore_remaining_packet(connection, pi);
}

static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
{
	/* Make sure we've acked all the TCP data associated
	 * with the data requests being unplugged */
	drbd_tcp_quickack(connection->data.socket);

	return 0;
}

static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_desc *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	switch (device->state.conn) {
	case C_WF_SYNC_UUID:
	case C_WF_BITMAP_T:
	case C_BEHIND:
		break;
	default:
		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
			 drbd_conn_str(device->state.conn));
	}

	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));

	return 0;
}

struct data_cmd {
	int expect_payload;
	size_t pkt_size;
	int (*fn)(struct drbd_connection *, struct packet_info *);
};

static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier },
	[P_BITMAP]	    = { 1, 0, receive_bitmap },
	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
	[P_PROTOCOL]	    = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
	[P_SYNC_UUID]	    = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
	[P_OV_REQUEST]	    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_OV_REPLY]	    = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_DELAY_PROBE]	    = { 0, sizeof(struct p_delay_probe93), receive_skip },
	[P_OUT_OF_SYNC]	    = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
};
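
/* Dispatch contract (see drbdd() below): pkt_size bytes of sub-header are
 * read into pi.data before fn is called; any remaining payload is only
 * legal when expect_payload is set, and fn must consume it itself. */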

static void drbdd(struct drbd_connection *connection)
{
	struct packet_info pi;
	size_t shs; /* sub header size */
	int err;

	while (get_t_state(&connection->receiver) == RUNNING) {
		struct data_cmd *cmd;

		drbd_thread_current_set_cpu(&connection->receiver);
		if (drbd_recv_header(connection, &pi))
			goto err_out;

		cmd = &drbd_cmd_handler[pi.cmd];
		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
				 cmdname(pi.cmd), pi.cmd);
			goto err_out;
		}

		shs = cmd->pkt_size;
		if (pi.size > shs && !cmd->expect_payload) {
			drbd_err(connection, "No payload expected %s l:%d\n",
				 cmdname(pi.cmd), pi.size);
			goto err_out;
		}

		if (shs) {
			err = drbd_recv_all_warn(connection, pi.data, shs);
			if (err)
				goto err_out;
			pi.size -= shs;
		}

		err = cmd->fn(connection, &pi);
		if (err) {
			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
				 cmdname(pi.cmd), err, pi.size);
			goto err_out;
		}
	}
	return;

    err_out:
	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
}

void conn_flush_workqueue(struct drbd_connection *connection)
{
	struct drbd_wq_barrier barr;

	barr.w.cb = w_prev_work_done;
	barr.w.connection = connection;
	init_completion(&barr.done);
	drbd_queue_work(&connection->sender_work, &barr.w);
	wait_for_completion(&barr.done);
}

static void conn_disconnect(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	enum drbd_conns oc;
	int vnr;

	if (connection->cstate == C_STANDALONE)
		return;

	/* We are about to start the cleanup after connection loss.
	 * Make sure drbd_make_request knows about that.
	 * Usually we should be in some network failure state already,
	 * but just in case we are not, we fix it up here.
	 */
	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);

	/* asender does not clean up anything. it must not interfere, either */
	drbd_thread_stop(&connection->asender);
	drbd_free_sock(connection);

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_disconnected(peer_device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	if (!list_empty(&connection->current_epoch->list))
		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
	atomic_set(&connection->current_epoch->epoch_size, 0);
	connection->send.seen_any_write_yet = false;

	drbd_info(connection, "Connection closed\n");

	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
		conn_try_outdate_peer_async(connection);

	spin_lock_irq(&connection->resource->req_lock);
	oc = connection->cstate;
	if (oc >= C_UNCONNECTED)
		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);

	spin_unlock_irq(&connection->resource->req_lock);

	if (oc == C_DISCONNECTING)
		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
}
4548
69a22773 4549static int drbd_disconnected(struct drbd_peer_device *peer_device)
360cc740 4550{
69a22773 4551 struct drbd_device *device = peer_device->device;
360cc740 4552 unsigned int i;
b411b363 4553
85719573 4554 /* wait for current activity to cease. */
0500813f 4555 spin_lock_irq(&device->resource->req_lock);
b30ab791
AG
4556 _drbd_wait_ee_list_empty(device, &device->active_ee);
4557 _drbd_wait_ee_list_empty(device, &device->sync_ee);
4558 _drbd_wait_ee_list_empty(device, &device->read_ee);
0500813f 4559 spin_unlock_irq(&device->resource->req_lock);
b411b363
PR
4560
4561 /* We do not have data structures that would allow us to
4562 * get the rs_pending_cnt down to 0 again.
4563 * * On C_SYNC_TARGET we do not have any data structures describing
4564 * the pending RSDataRequest's we have sent.
4565 * * On C_SYNC_SOURCE there is no data structure that tracks
4566 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4567 * And no, it is not the sum of the reference counts in the
4568 * resync_LRU. The resync_LRU tracks the whole operation including
4569 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4570 * on the fly. */
b30ab791
AG
4571 drbd_rs_cancel_all(device);
4572 device->rs_total = 0;
4573 device->rs_failed = 0;
4574 atomic_set(&device->rs_pending_cnt, 0);
4575 wake_up(&device->misc_wait);
b411b363 4576
b30ab791
AG
4577 del_timer_sync(&device->resync_timer);
4578 resync_timer_fn((unsigned long)device);
b411b363 4579
b411b363
PR
4580 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4581 * w_make_resync_request etc. which may still be on the worker queue
4582 * to be "canceled" */
b30ab791 4583 drbd_flush_workqueue(device);
b411b363 4584
b30ab791 4585 drbd_finish_peer_reqs(device);
b411b363 4586
	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
	   might have issued a work again. The one before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
	drbd_flush_workqueue(device);

	/* need to do it again, drbd_finish_peer_reqs() may have populated it
	 * again via drbd_try_clear_on_disk_bm(). */
	drbd_rs_cancel_all(device);

	kfree(device->p_uuid);
	device->p_uuid = NULL;

	if (!drbd_suspended(device))
		tl_clear(peer_device->connection);

	drbd_md_sync(device);

	/* serialize with bitmap writeout triggered by the state change,
	 * if any. */
	wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));

	/* tcp_close and release of sendpage pages can be deferred.  I don't
	 * want to use SO_LINGER, because apparently it can be deferred for
	 * more than 20 seconds (longest time I checked).
	 *
	 * Actually we don't care for exactly when the network stack does its
	 * put_page(), but release our reference on these pages right here.
	 */
	i = drbd_free_peer_reqs(device, &device->net_ee);
	if (i)
		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
	i = atomic_read(&device->pp_in_use_by_net);
	if (i)
		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&device->pp_in_use);
	if (i)
		drbd_info(device, "pp_in_use = %d, expected 0\n", i);

	D_ASSERT(device, list_empty(&device->read_ee));
	D_ASSERT(device, list_empty(&device->active_ee));
	D_ASSERT(device, list_empty(&device->sync_ee));
	D_ASSERT(device, list_empty(&device->done_ee));

	return 0;
}

/*
 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
 * we can agree on is stored in agreed_pro_version.
 *
 * feature flags and the reserved array should be enough room for future
 * enhancements of the handshake protocol, and possible plugins...
 *
 * for now, they are expected to be zero, but ignored.
 */
static int drbd_send_features(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	struct p_connection_features *p;

	sock = &connection->data;
	p = conn_prepare_command(connection, sock);
	if (!p)
		return -EIO;
	memset(p, 0, sizeof(*p));
	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
}

/*
 * return values:
 *  1 yes, we have a valid connection
 *  0 oops, did not work out, please try again
 * -1 peer talks different language,
 *    no point in trying again, please go standalone.
 */
static int drbd_do_features(struct drbd_connection *connection)
{
	/* ASSERT current == connection->receiver ... */
	struct p_connection_features *p;
	const int expect = sizeof(struct p_connection_features);
	struct packet_info pi;
	int err;

	err = drbd_send_features(connection);
	if (err)
		return 0;

	err = drbd_recv_header(connection, &pi);
	if (err)
		return 0;

	if (pi.cmd != P_CONNECTION_FEATURES) {
		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		return -1;
	}

	if (pi.size != expect) {
		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
			 expect, pi.size);
		return -1;
	}

	p = pi.data;
	err = drbd_recv_all_warn(connection, p, expect);
	if (err)
		return 0;

	p->protocol_min = be32_to_cpu(p->protocol_min);
	p->protocol_max = be32_to_cpu(p->protocol_max);
	if (p->protocol_max == 0)
		p->protocol_max = p->protocol_min;

	if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;

	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);

	drbd_info(connection, "Handshake successful: "
		  "Agreed network protocol version %d\n", connection->agreed_pro_version);

	return 1;

 incompat:
	drbd_err(connection, "incompatible DRBD dialects: "
		 "I support %d-%d, peer supports %d-%d\n",
		 PRO_VERSION_MIN, PRO_VERSION_MAX,
		 p->protocol_min, p->protocol_max);
	return -1;
}
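
/*
 * Illustrative sketch: the negotiation in drbd_do_features() reduces to
 * an interval-overlap test plus a min().  As a standalone helper (the
 * example_* name is invented; the PRO_VERSION_MIN/MAX semantics above
 * are assumed), returning the agreed version, or -1 when the two
 * advertised ranges do not overlap:
 */
static int example_agree_version(int my_min, int my_max,
				 int peer_min, int peer_max)
{
	/* a peer that advertises max == 0 speaks exactly peer_min */
	if (peer_max == 0)
		peer_max = peer_min;
	/* [my_min, my_max] and [peer_min, peer_max] must overlap */
	if (my_max < peer_min || my_min > peer_max)
		return -1;
	/* both sides then agree on the highest common version */
	return min(my_max, peer_max);
}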

#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN	64

/* Return value:
	1 - auth succeeded,
	0 - failed, try again (network error),
	-1 - auth failed, don't try again.
*/

static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	struct scatterlist sg;
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	struct hash_desc desc;
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer. */

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();

	desc.tfm = connection->cram_hmac_tfm;
	desc.flags = 0;

	rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
	if (rv) {
		drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	sock = &connection->data;
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		drbd_err(connection, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}

	resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		drbd_err(connection, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	sg_init_table(&sg, 1);
	sg_set_buf(&sg, peers_ch, pi.size);

	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		drbd_err(connection, "AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, response, resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		drbd_err(connection, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);

	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
			  resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);

	return rv;
}
#endif
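
/*
 * Illustrative sketch: the exchange above is CRAM-style mutual
 * authentication - each side returns HMAC(shared secret, peer's
 * challenge), and each side recomputes that value locally to verify.
 * Assuming a hypothetical hmac() helper standing in for the keyed
 * crypto_hash_digest() calls, the verifying half reduces to:
 */
static bool example_verify_peer_response(struct crypto_hash *tfm,
					 const char *my_challenge, unsigned int challenge_len,
					 const char *peer_response, unsigned int resp_size)
{
	char expected[64];	/* assumes resp_size <= 64, as with CHALLENGE_LEN */

	if (resp_size > sizeof(expected))
		return false;
	/* hmac() is hypothetical: digest my_challenge with the keyed tfm */
	hmac(tfm, my_challenge, challenge_len, expected);
	return memcmp(expected, peer_response, resp_size) == 0;
}
/* Design note: memcmp(), as in drbd_do_auth() above, is not constant-time;
 * kernels that provide crypto_memneq() can use it instead to avoid leaking
 * how many leading bytes of the response matched. */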

int drbd_receiver(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	int h;

	drbd_info(connection, "receiver (re)started\n");

	do {
		h = conn_connect(connection);
		if (h == 0) {
			conn_disconnect(connection);
			schedule_timeout_interruptible(HZ);
		}
		if (h == -1) {
			drbd_warn(connection, "Discarding network configuration.\n");
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	} while (h == 0);

	if (h > 0)
		drbdd(connection);

	conn_disconnect(connection);

	drbd_info(connection, "receiver terminated\n");
	return 0;
}

/* ********* acknowledge sender ******** */

static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	if (retcode >= SS_SUCCESS) {
		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
	} else {
		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&connection->ping_wait);

	return 0;
}

static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
		D_ASSERT(device, connection->agreed_pro_version < 100);
		return got_conn_RqSReply(connection, pi);
	}

	if (retcode >= SS_SUCCESS) {
		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
	} else {
		set_bit(CL_ST_CHG_FAIL, &device->flags);
		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&device->state_wait);

	return 0;
}

static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	return drbd_send_ping_ack(connection);
}

static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
{
	/* restore idle timeout */
	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
		wake_up(&connection->ping_wait);

	return 0;
}

static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (get_ldev(device)) {
		drbd_rs_complete_io(device, sector);
		drbd_set_in_sync(device, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(device);
	}
	dec_rs_pending(device);
	atomic_add(blksize >> 9, &device->rs_sect_in);

	return 0;
}
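
/*
 * Illustrative note on the two shifts above (assuming DRBD's usual
 * constants): blksize is in bytes, the bitmap granularity BM_BLOCK_SIZE
 * is 4 KiB (BM_BLOCK_SHIFT == 12), and a disk sector is 512 bytes, so
 *
 *	blksize >> BM_BLOCK_SHIFT   - number of 4 KiB bitmap blocks
 *	blksize >> 9                - number of 512-byte sectors
 *
 * e.g. a 32 KiB ack accounts for 8 bitmap blocks and 64 sectors.
 */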

static int
validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&device->resource->req_lock);
	req = find_request(device, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&device->resource->req_lock);
		return -EIO;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&device->resource->req_lock);

	if (m.bio)
		complete_master_bio(device, &m);
	return 0;
}

static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		drbd_set_in_sync(device, sector, blksize);
		dec_rs_pending(device);
		return 0;
	}
	switch (pi->cmd) {
	case P_RS_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		what = RECV_ACKED_BY_PEER;
		break;
	case P_SUPERSEDED:
		what = CONFLICT_RESOLVED;
		break;
	case P_RETRY_WRITE:
		what = POSTPONE_WRITE;
		break;
	default:
		BUG();
	}

	return validate_req_change_req_state(device, p->block_id, sector,
					     &device->write_requests, __func__,
					     what, false);
}

static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(device);
		drbd_rs_failed_io(device, sector, size);
		return 0;
	}

	err = validate_req_change_req_state(device, p->block_id, sector,
					    &device->write_requests, __func__,
					    NEG_ACKED, true);
	if (err) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		drbd_set_out_of_sync(device, sector, size);
	}
	return 0;
}

static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
		 (unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(device, p->block_id, sector,
					     &device->read_requests, __func__,
					     NEG_ACKED, false);
}

static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int size;
	struct p_block_ack *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	dec_rs_pending(device);

	if (get_ldev_if_state(device, D_FAILED)) {
		drbd_rs_complete_io(device, sector);
		switch (pi->cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(device, sector, size);
			/* fall through */
		case P_RS_CANCEL:
			break;
		default:
			BUG();
		}
		put_ldev(device);
	}

	return 0;
}

static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_barrier_ack *p = pi->data;
	struct drbd_peer_device *peer_device;
	int vnr;

	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		if (device->state.conn == C_AHEAD &&
		    atomic_read(&device->ap_in_flight) == 0 &&
		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
			device->start_resync_timer.expires = jiffies + HZ;
			add_timer(&device->start_resync_timer);
		}
	}
	rcu_read_unlock();

	return 0;
}

static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	struct drbd_work *w;
	sector_t sector;
	int size;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_out_of_sync_found(device, sector, size);
	else
		ov_out_of_sync_print(device);

	if (!get_ldev(device))
		return 0;

	drbd_rs_complete_io(device, sector);
	dec_rs_pending(device);

	--device->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(device, device->ov_left);

	if (device->ov_left == 0) {
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			w->device = device;
			drbd_queue_work(&peer_device->connection->sender_work, w);
		} else {
			drbd_err(device, "kmalloc(w) failed.\n");
			ov_out_of_sync_print(device);
			drbd_resync_finished(device);
		}
	}
	put_ldev(device);
	return 0;
}

static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	return 0;
}

static int connection_finish_peer_reqs(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr, not_empty = 0;

	do {
		clear_bit(SIGNAL_ASENDER, &connection->flags);
		flush_signals(current);

		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;
			kref_get(&device->kref);
			rcu_read_unlock();
			if (drbd_finish_peer_reqs(device)) {
				kref_put(&device->kref, drbd_destroy_device);
				return 1;
			}
			kref_put(&device->kref, drbd_destroy_device);
			rcu_read_lock();
		}
		set_bit(SIGNAL_ASENDER, &connection->flags);

		spin_lock_irq(&connection->resource->req_lock);
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;
			not_empty = !list_empty(&device->done_ee);
			if (not_empty)
				break;
		}
		spin_unlock_irq(&connection->resource->req_lock);
		rcu_read_unlock();
	} while (not_empty);

	return 0;
}

struct asender_cmd {
	size_t pkt_size;
	int (*fn)(struct drbd_connection *connection, struct packet_info *);
};

static struct asender_cmd asender_tbl[] = {
	[P_PING]	      = { 0, got_Ping },
	[P_PING_ACK]	      = { 0, got_PingAck },
	[P_RECV_ACK]	      = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	      = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]      = { sizeof(struct p_block_ack), got_BlockAck },
	[P_SUPERSEDED]	      = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	      = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	      = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]     = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	      = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	      = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY]   = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]     = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]	      = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]	      = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
	[P_RETRY_WRITE]	      = { sizeof(struct p_block_ack), got_BlockAck },
};

int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct asender_cmd *cmd = NULL;
	struct packet_info pi;
	int rv;
	void *buf    = connection->meta.rbuf;
	int received = 0;
	unsigned int header_size = drbd_header_size(connection);
	int expect   = header_size;
	bool ping_timeout_active = false;
	struct net_conf *nc;
	int ping_timeo, tcp_cork, ping_int;
	struct sched_param param = { .sched_priority = 2 };

	rv = sched_setscheduler(current, SCHED_RR, &param);
	if (rv < 0)
		drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);
		ping_timeo = nc->ping_timeo;
		tcp_cork = nc->tcp_cork;
		ping_int = nc->ping_int;
		rcu_read_unlock();

		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
			if (drbd_send_ping(connection)) {
				drbd_err(connection, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
			ping_timeout_active = true;
		}

		/* TODO: conditionally cork; it may hurt latency if we cork without
		   much to send */
		if (tcp_cork)
			drbd_tcp_cork(connection->meta.socket);
		if (connection_finish_peer_reqs(connection)) {
			drbd_err(connection, "connection_finish_peer_reqs() failed\n");
			goto reconnect;
		}
		/* but unconditionally uncork unless disabled */
		if (tcp_cork)
			drbd_tcp_uncork(connection->meta.socket);

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &connection->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
				long t;
				rcu_read_lock();
				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
				rcu_read_unlock();

				t = wait_event_timeout(connection->ping_wait,
						       connection->cstate < C_WF_REPORT_PARAMS,
						       t);
				if (t)
					break;
			}
			drbd_err(connection, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(connection->last_received,
				jiffies - connection->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				drbd_err(connection, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &connection->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (decode_header(connection, connection->meta.rbuf, &pi))
				goto reconnect;
			cmd = &asender_tbl[pi.cmd];
			if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			expect = header_size + cmd->pkt_size;
			if (pi.size != expect - header_size) {
				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
					 pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			bool err;

			err = cmd->fn(connection, &pi);
			if (err) {
				drbd_err(connection, "%pf failed\n", cmd->fn);
				goto reconnect;
			}

			connection->last_received = jiffies;

			if (cmd == &asender_tbl[P_PING_ACK]) {
				/* restore idle timeout */
				connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
				ping_timeout_active = false;
			}

			buf	 = connection->meta.rbuf;
			received = 0;
			expect	 = header_size;
			cmd	 = NULL;
		}
	}

	if (0) {
reconnect:
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		conn_md_sync(connection);
	}
	if (0) {
disconnect:
		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	clear_bit(SIGNAL_ASENDER, &connection->flags);

	drbd_info(connection, "asender terminated\n");

	return 0;
}
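
/*
 * Illustrative sketch: the receive handling in drbd_asender() classifies
 * the return value of drbd_recv_short() as documented in the Note above.
 * Ignoring the DISCONNECT_SENT and last_received refinements, the
 * decision boils down to this (example_* names invented here):
 */
enum example_rx_action { EX_RX_PROGRESS, EX_RX_RETRY, EX_RX_PING, EX_RX_RECONNECT };

static enum example_rx_action example_classify_rx(int rv, bool ping_timeout_active)
{
	if (rv > 0)
		return EX_RX_PROGRESS;	/* accumulated more header/payload bytes */
	if (rv == 0)
		return EX_RX_RECONNECT;	/* peer closed the meta socket */
	if (rv == -EAGAIN)		/* rcvtimeo expired */
		return ping_timeout_active ? EX_RX_RECONNECT : EX_RX_PING;
	if (rv == -EINTR)
		return EX_RX_RETRY;	/* got a signal; loop and re-check flags */
	return EX_RX_RECONNECT;		/* any other error is fatal here */
}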