drbd: Pass a peer device to a number of functions
[deliverable/linux.git] / drivers / block / drbd / drbd_receiver.c
... / ...
CommitLineData
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
47#include "drbd_protocol.h"
48#include "drbd_req.h"
49
50#include "drbd_vli.h"
51
52struct packet_info {
53 enum drbd_packet cmd;
54 unsigned int size;
55 unsigned int vnr;
56 void *data;
57};
58
/* Result codes returned by drbd_may_finish_epoch(). */
enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};
64
/* Forward declarations. */
static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *,
					       struct drbd_epoch *,
					       enum epoch_event);
static int e_end_block(struct drbd_work *, int);

/*
 * Allocation flags used when grabbing pages directly from the kernel in
 * __drbd_alloc_pages(); see the comment there for why nothing stronger
 * is used.
 */
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
74
75/*
76 * some helper functions to deal with single linked page lists,
77 * page->private being our "next" pointer.
78 */
79
80/* If at least n pages are linked at head, get n pages off.
81 * Otherwise, don't modify head, and return NULL.
82 * Locking is the responsibility of the caller.
83 */
84static struct page *page_chain_del(struct page **head, int n)
85{
86 struct page *page;
87 struct page *tmp;
88
89 BUG_ON(!n);
90 BUG_ON(!head);
91
92 page = *head;
93
94 if (!page)
95 return NULL;
96
97 while (page) {
98 tmp = page_chain_next(page);
99 if (--n == 0)
100 break; /* found sufficient pages */
101 if (tmp == NULL)
102 /* insufficient pages, don't use any of them. */
103 return NULL;
104 page = tmp;
105 }
106
107 /* add end of list marker for the returned list */
108 set_page_private(page, 0);
109 /* actual return value, and adjustment of head */
110 page = *head;
111 *head = tmp;
112 return page;
113}
114
115/* may be used outside of locks to find the tail of a (usually short)
116 * "private" page chain, before adding it back to a global chain head
117 * with page_chain_add() under a spinlock. */
118static struct page *page_chain_tail(struct page *page, int *len)
119{
120 struct page *tmp;
121 int i = 1;
122 while ((tmp = page_chain_next(page)))
123 ++i, page = tmp;
124 if (len)
125 *len = i;
126 return page;
127}
128
129static int page_chain_free(struct page *page)
130{
131 struct page *tmp;
132 int i = 0;
133 page_chain_for_each_safe(page, tmp) {
134 put_page(page);
135 ++i;
136 }
137 return i;
138}
139
140static void page_chain_add(struct page **head,
141 struct page *chain_first, struct page *chain_last)
142{
143#if 1
144 struct page *tmp;
145 tmp = page_chain_tail(chain_first, NULL);
146 BUG_ON(tmp != chain_last);
147#endif
148
149 /* add chain to head */
150 set_page_private(chain_last, (unsigned long)*head);
151 *head = chain_first;
152}
153
154static struct page *__drbd_alloc_pages(struct drbd_device *device,
155 unsigned int number)
156{
157 struct page *page = NULL;
158 struct page *tmp = NULL;
159 unsigned int i = 0;
160
161 /* Yes, testing drbd_pp_vacant outside the lock is racy.
162 * So what. It saves a spin_lock. */
163 if (drbd_pp_vacant >= number) {
164 spin_lock(&drbd_pp_lock);
165 page = page_chain_del(&drbd_pp_pool, number);
166 if (page)
167 drbd_pp_vacant -= number;
168 spin_unlock(&drbd_pp_lock);
169 if (page)
170 return page;
171 }
172
173 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
174 * "criss-cross" setup, that might cause write-out on some other DRBD,
175 * which in turn might block on the other node at this very place. */
176 for (i = 0; i < number; i++) {
177 tmp = alloc_page(GFP_TRY);
178 if (!tmp)
179 break;
180 set_page_private(tmp, (unsigned long)page);
181 page = tmp;
182 }
183
184 if (i == number)
185 return page;
186
187 /* Not enough pages immediately available this time.
188 * No need to jump around here, drbd_alloc_pages will retry this
189 * function "soon". */
190 if (page) {
191 tmp = page_chain_tail(page, NULL);
192 spin_lock(&drbd_pp_lock);
193 page_chain_add(&drbd_pp_pool, page, tmp);
194 drbd_pp_vacant += i;
195 spin_unlock(&drbd_pp_lock);
196 }
197 return NULL;
198}
199
200static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
201 struct list_head *to_be_freed)
202{
203 struct drbd_peer_request *peer_req;
204 struct list_head *le, *tle;
205
206 /* The EEs are always appended to the end of the list. Since
207 they are sent in order over the wire, they have to finish
208 in order. As soon as we see the first not finished we can
209 stop to examine the list... */
210
211 list_for_each_safe(le, tle, &device->net_ee) {
212 peer_req = list_entry(le, struct drbd_peer_request, w.list);
213 if (drbd_peer_req_has_active_page(peer_req))
214 break;
215 list_move(le, to_be_freed);
216 }
217}
218
219static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
220{
221 LIST_HEAD(reclaimed);
222 struct drbd_peer_request *peer_req, *t;
223
224 spin_lock_irq(&device->resource->req_lock);
225 reclaim_finished_net_peer_reqs(device, &reclaimed);
226 spin_unlock_irq(&device->resource->req_lock);
227
228 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229 drbd_free_net_peer_req(device, peer_req);
230}
231
232/**
233 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
234 * @device: DRBD device.
235 * @number: number of pages requested
236 * @retry: whether to retry, if not enough pages are available right now
237 *
238 * Tries to allocate number pages, first from our own page pool, then from
239 * the kernel, unless this allocation would exceed the max_buffers setting.
240 * Possibly retry until DRBD frees sufficient pages somewhere else.
241 *
242 * Returns a page chain linked via page->private.
243 */
244struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
245 bool retry)
246{
247 struct drbd_device *device = peer_device->device;
248 struct page *page = NULL;
249 struct net_conf *nc;
250 DEFINE_WAIT(wait);
251 int mxb;
252
253 /* Yes, we may run up to @number over max_buffers. If we
254 * follow it strictly, the admin will get it wrong anyways. */
255 rcu_read_lock();
256 nc = rcu_dereference(peer_device->connection->net_conf);
257 mxb = nc ? nc->max_buffers : 1000000;
258 rcu_read_unlock();
259
260 if (atomic_read(&device->pp_in_use) < mxb)
261 page = __drbd_alloc_pages(device, number);
262
263 while (page == NULL) {
264 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
265
266 drbd_kick_lo_and_reclaim_net(device);
267
268 if (atomic_read(&device->pp_in_use) < mxb) {
269 page = __drbd_alloc_pages(device, number);
270 if (page)
271 break;
272 }
273
274 if (!retry)
275 break;
276
277 if (signal_pending(current)) {
278 drbd_warn(device, "drbd_alloc_pages interrupted!\n");
279 break;
280 }
281
282 schedule();
283 }
284 finish_wait(&drbd_pp_wait, &wait);
285
286 if (page)
287 atomic_add(number, &device->pp_in_use);
288 return page;
289}
290
291/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
292 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
293 * Either links the page chain back to the global pool,
294 * or returns all pages to the system. */
295static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
296{
297 atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
298 int i;
299
300 if (page == NULL)
301 return;
302
303 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
304 i = page_chain_free(page);
305 else {
306 struct page *tmp;
307 tmp = page_chain_tail(page, &i);
308 spin_lock(&drbd_pp_lock);
309 page_chain_add(&drbd_pp_pool, page, tmp);
310 drbd_pp_vacant += i;
311 spin_unlock(&drbd_pp_lock);
312 }
313 i = atomic_sub_return(i, a);
314 if (i < 0)
315 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
316 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
317 wake_up(&drbd_pp_wait);
318}
319
320/*
321You need to hold the req_lock:
322 _drbd_wait_ee_list_empty()
323
324You must not have the req_lock:
325 drbd_free_peer_req()
326 drbd_alloc_peer_req()
327 drbd_free_peer_reqs()
328 drbd_ee_fix_bhs()
329 drbd_finish_peer_reqs()
330 drbd_clear_done_ee()
331 drbd_wait_ee_list_empty()
332*/
333
334struct drbd_peer_request *
335drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
336 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
337{
338 struct drbd_device *device = peer_device->device;
339 struct drbd_peer_request *peer_req;
340 struct page *page = NULL;
341 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
342
343 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
344 return NULL;
345
346 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
347 if (!peer_req) {
348 if (!(gfp_mask & __GFP_NOWARN))
349 drbd_err(device, "%s: allocation failed\n", __func__);
350 return NULL;
351 }
352
353 if (data_size) {
354 page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
355 if (!page)
356 goto fail;
357 }
358
359 drbd_clear_interval(&peer_req->i);
360 peer_req->i.size = data_size;
361 peer_req->i.sector = sector;
362 peer_req->i.local = false;
363 peer_req->i.waiting = false;
364
365 peer_req->epoch = NULL;
366 peer_req->w.device = device;
367 peer_req->pages = page;
368 atomic_set(&peer_req->pending_bios, 0);
369 peer_req->flags = 0;
370 /*
371 * The block_id is opaque to the receiver. It is not endianness
372 * converted, and sent back to the sender unchanged.
373 */
374 peer_req->block_id = id;
375
376 return peer_req;
377
378 fail:
379 mempool_free(peer_req, drbd_ee_mempool);
380 return NULL;
381}
382
383void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
384 int is_net)
385{
386 if (peer_req->flags & EE_HAS_DIGEST)
387 kfree(peer_req->digest);
388 drbd_free_pages(device, peer_req->pages, is_net);
389 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
390 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
391 mempool_free(peer_req, drbd_ee_mempool);
392}
393
394int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
395{
396 LIST_HEAD(work_list);
397 struct drbd_peer_request *peer_req, *t;
398 int count = 0;
399 int is_net = list == &device->net_ee;
400
401 spin_lock_irq(&device->resource->req_lock);
402 list_splice_init(list, &work_list);
403 spin_unlock_irq(&device->resource->req_lock);
404
405 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
406 __drbd_free_peer_req(device, peer_req, is_net);
407 count++;
408 }
409 return count;
410}
411
412/*
413 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
414 */
415static int drbd_finish_peer_reqs(struct drbd_device *device)
416{
417 LIST_HEAD(work_list);
418 LIST_HEAD(reclaimed);
419 struct drbd_peer_request *peer_req, *t;
420 int err = 0;
421
422 spin_lock_irq(&device->resource->req_lock);
423 reclaim_finished_net_peer_reqs(device, &reclaimed);
424 list_splice_init(&device->done_ee, &work_list);
425 spin_unlock_irq(&device->resource->req_lock);
426
427 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
428 drbd_free_net_peer_req(device, peer_req);
429
430 /* possible callbacks here:
431 * e_end_block, and e_end_resync_block, e_send_superseded.
432 * all ignore the last argument.
433 */
434 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
435 int err2;
436
437 /* list_del not necessary, next/prev members not touched */
438 err2 = peer_req->w.cb(&peer_req->w, !!err);
439 if (!err)
440 err = err2;
441 drbd_free_peer_req(device, peer_req);
442 }
443 wake_up(&device->ee_wait);
444
445 return err;
446}
447
448static void _drbd_wait_ee_list_empty(struct drbd_device *device,
449 struct list_head *head)
450{
451 DEFINE_WAIT(wait);
452
453 /* avoids spin_lock/unlock
454 * and calling prepare_to_wait in the fast path */
455 while (!list_empty(head)) {
456 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
457 spin_unlock_irq(&device->resource->req_lock);
458 io_schedule();
459 finish_wait(&device->ee_wait, &wait);
460 spin_lock_irq(&device->resource->req_lock);
461 }
462}
463
464static void drbd_wait_ee_list_empty(struct drbd_device *device,
465 struct list_head *head)
466{
467 spin_lock_irq(&device->resource->req_lock);
468 _drbd_wait_ee_list_empty(device, head);
469 spin_unlock_irq(&device->resource->req_lock);
470}
471
472static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
473{
474 mm_segment_t oldfs;
475 struct kvec iov = {
476 .iov_base = buf,
477 .iov_len = size,
478 };
479 struct msghdr msg = {
480 .msg_iovlen = 1,
481 .msg_iov = (struct iovec *)&iov,
482 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
483 };
484 int rv;
485
486 oldfs = get_fs();
487 set_fs(KERNEL_DS);
488 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
489 set_fs(oldfs);
490
491 return rv;
492}
493
494static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
495{
496 int rv;
497
498 rv = drbd_recv_short(connection->data.socket, buf, size, 0);
499
500 if (rv < 0) {
501 if (rv == -ECONNRESET)
502 drbd_info(connection, "sock was reset by peer\n");
503 else if (rv != -ERESTARTSYS)
504 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
505 } else if (rv == 0) {
506 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
507 long t;
508 rcu_read_lock();
509 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
510 rcu_read_unlock();
511
512 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
513
514 if (t)
515 goto out;
516 }
517 drbd_info(connection, "sock was shut down by peer\n");
518 }
519
520 if (rv != size)
521 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
522
523out:
524 return rv;
525}
526
527static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
528{
529 int err;
530
531 err = drbd_recv(connection, buf, size);
532 if (err != size) {
533 if (err >= 0)
534 err = -EIO;
535 } else
536 err = 0;
537 return err;
538}
539
540static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
541{
542 int err;
543
544 err = drbd_recv_all(connection, buf, size);
545 if (err && !signal_pending(current))
546 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
547 return err;
548}
549
550/* quoting tcp(7):
551 * On individual connections, the socket buffer size must be set prior to the
552 * listen(2) or connect(2) calls in order to have it take effect.
553 * This is our wrapper to do so.
554 */
555static void drbd_setbufsize(struct socket *sock, unsigned int snd,
556 unsigned int rcv)
557{
558 /* open coded SO_SNDBUF, SO_RCVBUF */
559 if (snd) {
560 sock->sk->sk_sndbuf = snd;
561 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
562 }
563 if (rcv) {
564 sock->sk->sk_rcvbuf = rcv;
565 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
566 }
567}
568
/*
 * drbd_try_connect() - try to actively open a TCP connection to the peer
 *
 * Creates a socket, binds it to the configured local address (with the
 * port set to 0) and connects to the configured peer address.  Send and
 * receive timeouts are set to connect_int seconds.
 *
 * Returns the connected socket, or NULL on failure.  A failure before the
 * connect() step requests C_DISCONNECTING, unless the error is one of the
 * transient ones listed in the switch below; a failed connect() never does.
 */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	/* bound the copy by the buffer that is actually written to */
	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(peer_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}
656
657struct accept_wait_data {
658 struct drbd_connection *connection;
659 struct socket *s_listen;
660 struct completion door_bell;
661 void (*original_sk_state_change)(struct sock *sk);
662
663};
664
665static void drbd_incoming_connection(struct sock *sk)
666{
667 struct accept_wait_data *ad = sk->sk_user_data;
668 void (*state_change)(struct sock *sk);
669
670 state_change = ad->original_sk_state_change;
671 if (sk->sk_state == TCP_ESTABLISHED)
672 complete(&ad->door_bell);
673 state_change(sk);
674}
675
676static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
677{
678 int err, sndbuf_size, rcvbuf_size, my_addr_len;
679 struct sockaddr_in6 my_addr;
680 struct socket *s_listen;
681 struct net_conf *nc;
682 const char *what;
683
684 rcu_read_lock();
685 nc = rcu_dereference(connection->net_conf);
686 if (!nc) {
687 rcu_read_unlock();
688 return -EIO;
689 }
690 sndbuf_size = nc->sndbuf_size;
691 rcvbuf_size = nc->rcvbuf_size;
692 rcu_read_unlock();
693
694 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
695 memcpy(&my_addr, &connection->my_addr, my_addr_len);
696
697 what = "sock_create_kern";
698 err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
699 SOCK_STREAM, IPPROTO_TCP, &s_listen);
700 if (err) {
701 s_listen = NULL;
702 goto out;
703 }
704
705 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
706 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
707
708 what = "bind before listen";
709 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
710 if (err < 0)
711 goto out;
712
713 ad->s_listen = s_listen;
714 write_lock_bh(&s_listen->sk->sk_callback_lock);
715 ad->original_sk_state_change = s_listen->sk->sk_state_change;
716 s_listen->sk->sk_state_change = drbd_incoming_connection;
717 s_listen->sk->sk_user_data = ad;
718 write_unlock_bh(&s_listen->sk->sk_callback_lock);
719
720 what = "listen";
721 err = s_listen->ops->listen(s_listen, 5);
722 if (err < 0)
723 goto out;
724
725 return 0;
726out:
727 if (s_listen)
728 sock_release(s_listen);
729 if (err < 0) {
730 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
731 drbd_err(connection, "%s failed, err = %d\n", what, err);
732 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
733 }
734 }
735
736 return -EIO;
737}
738
739static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
740{
741 write_lock_bh(&sk->sk_callback_lock);
742 sk->sk_state_change = ad->original_sk_state_change;
743 sk->sk_user_data = NULL;
744 write_unlock_bh(&sk->sk_callback_lock);
745}
746
747static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
748{
749 int timeo, connect_int, err = 0;
750 struct socket *s_estab = NULL;
751 struct net_conf *nc;
752
753 rcu_read_lock();
754 nc = rcu_dereference(connection->net_conf);
755 if (!nc) {
756 rcu_read_unlock();
757 return NULL;
758 }
759 connect_int = nc->connect_int;
760 rcu_read_unlock();
761
762 timeo = connect_int * HZ;
763 /* 28.5% random jitter */
764 timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
765
766 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
767 if (err <= 0)
768 return NULL;
769
770 err = kernel_accept(ad->s_listen, &s_estab, 0);
771 if (err < 0) {
772 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
773 drbd_err(connection, "accept failed, err = %d\n", err);
774 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
775 }
776 }
777
778 if (s_estab)
779 unregister_state_change(s_estab->sk, ad);
780
781 return s_estab;
782}
783
784static int decode_header(struct drbd_connection *, void *, struct packet_info *);
785
786static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
787 enum drbd_packet cmd)
788{
789 if (!conn_prepare_command(connection, sock))
790 return -EIO;
791 return conn_send_command(connection, sock, cmd, 0, NULL, 0);
792}
793
794static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
795{
796 unsigned int header_size = drbd_header_size(connection);
797 struct packet_info pi;
798 int err;
799
800 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
801 if (err != header_size) {
802 if (err >= 0)
803 err = -EIO;
804 return err;
805 }
806 err = decode_header(connection, connection->data.rbuf, &pi);
807 if (err)
808 return err;
809 return pi.cmd;
810}
811
812/**
813 * drbd_socket_okay() - Free the socket if its connection is not okay
814 * @sock: pointer to the pointer to the socket.
815 */
816static int drbd_socket_okay(struct socket **sock)
817{
818 int rr;
819 char tb[4];
820
821 if (!*sock)
822 return false;
823
824 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
825
826 if (rr > 0 || rr == -EAGAIN) {
827 return true;
828 } else {
829 sock_release(*sock);
830 *sock = NULL;
831 return false;
832 }
833}
834/* Gets called if a connection is established, or if a new minor gets created
835 in a connection */
836int drbd_connected(struct drbd_peer_device *peer_device)
837{
838 struct drbd_device *device = peer_device->device;
839 int err;
840
841 atomic_set(&device->packet_seq, 0);
842 device->peer_seq = 0;
843
844 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
845 &peer_device->connection->cstate_mutex :
846 &device->own_state_mutex;
847
848 err = drbd_send_sync_param(peer_device);
849 if (!err)
850 err = drbd_send_sizes(peer_device, 0, 0);
851 if (!err)
852 err = drbd_send_uuids(peer_device);
853 if (!err)
854 err = drbd_send_current_state(peer_device);
855 clear_bit(USE_DEGR_WFC_T, &device->flags);
856 clear_bit(RESIZE_PENDING, &device->flags);
857 atomic_set(&device->ap_in_flight, 0);
858 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
859 return err;
860}
861
862/*
863 * return values:
864 * 1 yes, we have a valid connection
865 * 0 oops, did not work out, please try again
866 * -1 peer talks different language,
867 * no point in trying again, please go standalone.
868 * -2 We do not have a network config...
869 */
870static int conn_connect(struct drbd_connection *connection)
871{
872 struct drbd_socket sock, msock;
873 struct drbd_peer_device *peer_device;
874 struct net_conf *nc;
875 int vnr, timeout, h, ok;
876 bool discard_my_data;
877 enum drbd_state_rv rv;
878 struct accept_wait_data ad = {
879 .connection = connection,
880 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
881 };
882
883 clear_bit(DISCONNECT_SENT, &connection->flags);
884 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
885 return -2;
886
887 mutex_init(&sock.mutex);
888 sock.sbuf = connection->data.sbuf;
889 sock.rbuf = connection->data.rbuf;
890 sock.socket = NULL;
891 mutex_init(&msock.mutex);
892 msock.sbuf = connection->meta.sbuf;
893 msock.rbuf = connection->meta.rbuf;
894 msock.socket = NULL;
895
896 /* Assume that the peer only understands protocol 80 until we know better. */
897 connection->agreed_pro_version = 80;
898
899 if (prepare_listen_socket(connection, &ad))
900 return 0;
901
902 do {
903 struct socket *s;
904
905 s = drbd_try_connect(connection);
906 if (s) {
907 if (!sock.socket) {
908 sock.socket = s;
909 send_first_packet(connection, &sock, P_INITIAL_DATA);
910 } else if (!msock.socket) {
911 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
912 msock.socket = s;
913 send_first_packet(connection, &msock, P_INITIAL_META);
914 } else {
915 drbd_err(connection, "Logic error in conn_connect()\n");
916 goto out_release_sockets;
917 }
918 }
919
920 if (sock.socket && msock.socket) {
921 rcu_read_lock();
922 nc = rcu_dereference(connection->net_conf);
923 timeout = nc->ping_timeo * HZ / 10;
924 rcu_read_unlock();
925 schedule_timeout_interruptible(timeout);
926 ok = drbd_socket_okay(&sock.socket);
927 ok = drbd_socket_okay(&msock.socket) && ok;
928 if (ok)
929 break;
930 }
931
932retry:
933 s = drbd_wait_for_connect(connection, &ad);
934 if (s) {
935 int fp = receive_first_packet(connection, s);
936 drbd_socket_okay(&sock.socket);
937 drbd_socket_okay(&msock.socket);
938 switch (fp) {
939 case P_INITIAL_DATA:
940 if (sock.socket) {
941 drbd_warn(connection, "initial packet S crossed\n");
942 sock_release(sock.socket);
943 sock.socket = s;
944 goto randomize;
945 }
946 sock.socket = s;
947 break;
948 case P_INITIAL_META:
949 set_bit(RESOLVE_CONFLICTS, &connection->flags);
950 if (msock.socket) {
951 drbd_warn(connection, "initial packet M crossed\n");
952 sock_release(msock.socket);
953 msock.socket = s;
954 goto randomize;
955 }
956 msock.socket = s;
957 break;
958 default:
959 drbd_warn(connection, "Error receiving initial packet\n");
960 sock_release(s);
961randomize:
962 if (prandom_u32() & 1)
963 goto retry;
964 }
965 }
966
967 if (connection->cstate <= C_DISCONNECTING)
968 goto out_release_sockets;
969 if (signal_pending(current)) {
970 flush_signals(current);
971 smp_rmb();
972 if (get_t_state(&connection->receiver) == EXITING)
973 goto out_release_sockets;
974 }
975
976 ok = drbd_socket_okay(&sock.socket);
977 ok = drbd_socket_okay(&msock.socket) && ok;
978 } while (!ok);
979
980 if (ad.s_listen)
981 sock_release(ad.s_listen);
982
983 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
984 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
985
986 sock.socket->sk->sk_allocation = GFP_NOIO;
987 msock.socket->sk->sk_allocation = GFP_NOIO;
988
989 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
990 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
991
992 /* NOT YET ...
993 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
994 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
995 * first set it to the P_CONNECTION_FEATURES timeout,
996 * which we set to 4x the configured ping_timeout. */
997 rcu_read_lock();
998 nc = rcu_dereference(connection->net_conf);
999
1000 sock.socket->sk->sk_sndtimeo =
1001 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1002
1003 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1004 timeout = nc->timeout * HZ / 10;
1005 discard_my_data = nc->discard_my_data;
1006 rcu_read_unlock();
1007
1008 msock.socket->sk->sk_sndtimeo = timeout;
1009
1010 /* we don't want delays.
1011 * we use TCP_CORK where appropriate, though */
1012 drbd_tcp_nodelay(sock.socket);
1013 drbd_tcp_nodelay(msock.socket);
1014
1015 connection->data.socket = sock.socket;
1016 connection->meta.socket = msock.socket;
1017 connection->last_received = jiffies;
1018
1019 h = drbd_do_features(connection);
1020 if (h <= 0)
1021 return h;
1022
1023 if (connection->cram_hmac_tfm) {
1024 /* drbd_request_state(device, NS(conn, WFAuth)); */
1025 switch (drbd_do_auth(connection)) {
1026 case -1:
1027 drbd_err(connection, "Authentication of peer failed\n");
1028 return -1;
1029 case 0:
1030 drbd_err(connection, "Authentication of peer failed, trying again.\n");
1031 return 0;
1032 }
1033 }
1034
1035 connection->data.socket->sk->sk_sndtimeo = timeout;
1036 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1037
1038 if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1039 return -1;
1040
1041 set_bit(STATE_SENT, &connection->flags);
1042
1043 rcu_read_lock();
1044 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1045 struct drbd_device *device = peer_device->device;
1046 kref_get(&device->kref);
1047 rcu_read_unlock();
1048
1049 /* Prevent a race between resync-handshake and
1050 * being promoted to Primary.
1051 *
1052 * Grab and release the state mutex, so we know that any current
1053 * drbd_set_role() is finished, and any incoming drbd_set_role
1054 * will see the STATE_SENT flag, and wait for it to be cleared.
1055 */
1056 mutex_lock(device->state_mutex);
1057 mutex_unlock(device->state_mutex);
1058
1059 if (discard_my_data)
1060 set_bit(DISCARD_MY_DATA, &device->flags);
1061 else
1062 clear_bit(DISCARD_MY_DATA, &device->flags);
1063
1064 drbd_connected(peer_device);
1065 kref_put(&device->kref, drbd_destroy_device);
1066 rcu_read_lock();
1067 }
1068 rcu_read_unlock();
1069
1070 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1071 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1072 clear_bit(STATE_SENT, &connection->flags);
1073 return 0;
1074 }
1075
1076 drbd_thread_start(&connection->asender);
1077
1078 mutex_lock(&connection->resource->conf_update);
1079 /* The discard_my_data flag is a single-shot modifier to the next
1080 * connection attempt, the handshake of which is now well underway.
1081 * No need for rcu style copying of the whole struct
1082 * just to clear a single value. */
1083 connection->net_conf->discard_my_data = 0;
1084 mutex_unlock(&connection->resource->conf_update);
1085
1086 return h;
1087
1088out_release_sockets:
1089 if (ad.s_listen)
1090 sock_release(ad.s_listen);
1091 if (sock.socket)
1092 sock_release(sock.socket);
1093 if (msock.socket)
1094 sock_release(msock.socket);
1095 return -1;
1096}
1097
1098static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1099{
1100 unsigned int header_size = drbd_header_size(connection);
1101
1102 if (header_size == sizeof(struct p_header100) &&
1103 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1104 struct p_header100 *h = header;
1105 if (h->pad != 0) {
1106 drbd_err(connection, "Header padding is not zero\n");
1107 return -EINVAL;
1108 }
1109 pi->vnr = be16_to_cpu(h->volume);
1110 pi->cmd = be16_to_cpu(h->command);
1111 pi->size = be32_to_cpu(h->length);
1112 } else if (header_size == sizeof(struct p_header95) &&
1113 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1114 struct p_header95 *h = header;
1115 pi->cmd = be16_to_cpu(h->command);
1116 pi->size = be32_to_cpu(h->length);
1117 pi->vnr = 0;
1118 } else if (header_size == sizeof(struct p_header80) &&
1119 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1120 struct p_header80 *h = header;
1121 pi->cmd = be16_to_cpu(h->command);
1122 pi->size = be16_to_cpu(h->length);
1123 pi->vnr = 0;
1124 } else {
1125 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1126 be32_to_cpu(*(__be32 *)header),
1127 connection->agreed_pro_version);
1128 return -EINVAL;
1129 }
1130 pi->data = header + header_size;
1131 return 0;
1132}
1133
1134static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1135{
1136 void *buffer = connection->data.rbuf;
1137 int err;
1138
1139 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1140 if (err)
1141 return err;
1142
1143 err = decode_header(connection, buffer, pi);
1144 connection->last_received = jiffies;
1145
1146 return err;
1147}
1148
1149static void drbd_flush(struct drbd_connection *connection)
1150{
1151 int rv;
1152 struct drbd_peer_device *peer_device;
1153 int vnr;
1154
1155 if (connection->write_ordering >= WO_bdev_flush) {
1156 rcu_read_lock();
1157 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1158 struct drbd_device *device = peer_device->device;
1159
1160 if (!get_ldev(device))
1161 continue;
1162 kref_get(&device->kref);
1163 rcu_read_unlock();
1164
1165 rv = blkdev_issue_flush(device->ldev->backing_bdev,
1166 GFP_NOIO, NULL);
1167 if (rv) {
1168 drbd_info(device, "local disk flush failed with status %d\n", rv);
1169 /* would rather check on EOPNOTSUPP, but that is not reliable.
1170 * don't try again for ANY return value != 0
1171 * if (rv == -EOPNOTSUPP) */
1172 drbd_bump_write_ordering(connection, WO_drain_io);
1173 }
1174 put_ldev(device);
1175 kref_put(&device->kref, drbd_destroy_device);
1176
1177 rcu_read_lock();
1178 if (rv)
1179 break;
1180 }
1181 rcu_read_unlock();
1182 }
1183}
1184
1185/**
1186 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1187 * @device: DRBD device.
1188 * @epoch: Epoch object.
1189 * @ev: Epoch event.
1190 */
1191static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1192 struct drbd_epoch *epoch,
1193 enum epoch_event ev)
1194{
1195 int epoch_size;
1196 struct drbd_epoch *next_epoch;
1197 enum finish_epoch rv = FE_STILL_LIVE;
1198
1199 spin_lock(&connection->epoch_lock);
1200 do {
1201 next_epoch = NULL;
1202
1203 epoch_size = atomic_read(&epoch->epoch_size);
1204
1205 switch (ev & ~EV_CLEANUP) {
1206 case EV_PUT:
1207 atomic_dec(&epoch->active);
1208 break;
1209 case EV_GOT_BARRIER_NR:
1210 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1211 break;
1212 case EV_BECAME_LAST:
1213 /* nothing to do*/
1214 break;
1215 }
1216
1217 if (epoch_size != 0 &&
1218 atomic_read(&epoch->active) == 0 &&
1219 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1220 if (!(ev & EV_CLEANUP)) {
1221 spin_unlock(&connection->epoch_lock);
1222 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1223 spin_lock(&connection->epoch_lock);
1224 }
1225#if 0
1226 /* FIXME: dec unacked on connection, once we have
1227 * something to count pending connection packets in. */
1228 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1229 dec_unacked(epoch->connection);
1230#endif
1231
1232 if (connection->current_epoch != epoch) {
1233 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1234 list_del(&epoch->list);
1235 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1236 connection->epochs--;
1237 kfree(epoch);
1238
1239 if (rv == FE_STILL_LIVE)
1240 rv = FE_DESTROYED;
1241 } else {
1242 epoch->flags = 0;
1243 atomic_set(&epoch->epoch_size, 0);
1244 /* atomic_set(&epoch->active, 0); is already zero */
1245 if (rv == FE_STILL_LIVE)
1246 rv = FE_RECYCLED;
1247 }
1248 }
1249
1250 if (!next_epoch)
1251 break;
1252
1253 epoch = next_epoch;
1254 } while (1);
1255
1256 spin_unlock(&connection->epoch_lock);
1257
1258 return rv;
1259}
1260
1261/**
1262 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1263 * @connection: DRBD connection.
1264 * @wo: Write ordering method to try.
1265 */
1266void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
1267{
1268 struct disk_conf *dc;
1269 struct drbd_peer_device *peer_device;
1270 enum write_ordering_e pwo;
1271 int vnr;
1272 static char *write_ordering_str[] = {
1273 [WO_none] = "none",
1274 [WO_drain_io] = "drain",
1275 [WO_bdev_flush] = "flush",
1276 };
1277
1278 pwo = connection->write_ordering;
1279 wo = min(pwo, wo);
1280 rcu_read_lock();
1281 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1282 struct drbd_device *device = peer_device->device;
1283
1284 if (!get_ldev_if_state(device, D_ATTACHING))
1285 continue;
1286 dc = rcu_dereference(device->ldev->disk_conf);
1287
1288 if (wo == WO_bdev_flush && !dc->disk_flushes)
1289 wo = WO_drain_io;
1290 if (wo == WO_drain_io && !dc->disk_drain)
1291 wo = WO_none;
1292 put_ldev(device);
1293 }
1294 rcu_read_unlock();
1295 connection->write_ordering = wo;
1296 if (pwo != connection->write_ordering || wo == WO_bdev_flush)
1297 drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
1298}
1299
1300/**
1301 * drbd_submit_peer_request()
1302 * @device: DRBD device.
1303 * @peer_req: peer request
1304 * @rw: flag field, see bio->bi_rw
1305 *
1306 * May spread the pages to multiple bios,
1307 * depending on bio_add_page restrictions.
1308 *
1309 * Returns 0 if all bios have been submitted,
1310 * -ENOMEM if we could not allocate enough bios,
1311 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1312 * single page to an empty bio (which should never happen and likely indicates
1313 * that the lower level IO stack is in some way broken). This has been observed
1314 * on certain Xen deployments.
1315 */
1316/* TODO allocate from our own bio_set. */
1317int drbd_submit_peer_request(struct drbd_device *device,
1318 struct drbd_peer_request *peer_req,
1319 const unsigned rw, const int fault_type)
1320{
1321 struct bio *bios = NULL;
1322 struct bio *bio;
1323 struct page *page = peer_req->pages;
1324 sector_t sector = peer_req->i.sector;
1325 unsigned ds = peer_req->i.size;
1326 unsigned n_bios = 0;
1327 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1328 int err = -ENOMEM;
1329
1330 /* In most cases, we will only need one bio. But in case the lower
1331 * level restrictions happen to be different at this offset on this
1332 * side than those of the sending peer, we may need to submit the
1333 * request in more than one bio.
1334 *
1335 * Plain bio_alloc is good enough here, this is no DRBD internally
1336 * generated bio, but a bio allocated on behalf of the peer.
1337 */
1338next_bio:
1339 bio = bio_alloc(GFP_NOIO, nr_pages);
1340 if (!bio) {
1341 drbd_err(device, "submit_ee: Allocation of a bio failed\n");
1342 goto fail;
1343 }
1344 /* > peer_req->i.sector, unless this is the first bio */
1345 bio->bi_iter.bi_sector = sector;
1346 bio->bi_bdev = device->ldev->backing_bdev;
1347 bio->bi_rw = rw;
1348 bio->bi_private = peer_req;
1349 bio->bi_end_io = drbd_peer_request_endio;
1350
1351 bio->bi_next = bios;
1352 bios = bio;
1353 ++n_bios;
1354
1355 page_chain_for_each(page) {
1356 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1357 if (!bio_add_page(bio, page, len, 0)) {
1358 /* A single page must always be possible!
1359 * But in case it fails anyways,
1360 * we deal with it, and complain (below). */
1361 if (bio->bi_vcnt == 0) {
1362 drbd_err(device,
1363 "bio_add_page failed for len=%u, "
1364 "bi_vcnt=0 (bi_sector=%llu)\n",
1365 len, (uint64_t)bio->bi_iter.bi_sector);
1366 err = -ENOSPC;
1367 goto fail;
1368 }
1369 goto next_bio;
1370 }
1371 ds -= len;
1372 sector += len >> 9;
1373 --nr_pages;
1374 }
1375 D_ASSERT(device, page == NULL);
1376 D_ASSERT(device, ds == 0);
1377
1378 atomic_set(&peer_req->pending_bios, n_bios);
1379 do {
1380 bio = bios;
1381 bios = bios->bi_next;
1382 bio->bi_next = NULL;
1383
1384 drbd_generic_make_request(device, fault_type, bio);
1385 } while (bios);
1386 return 0;
1387
1388fail:
1389 while (bios) {
1390 bio = bios;
1391 bios = bios->bi_next;
1392 bio_put(bio);
1393 }
1394 return err;
1395}
1396
1397static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1398 struct drbd_peer_request *peer_req)
1399{
1400 struct drbd_interval *i = &peer_req->i;
1401
1402 drbd_remove_interval(&device->write_requests, i);
1403 drbd_clear_interval(i);
1404
1405 /* Wake up any processes waiting for this peer request to complete. */
1406 if (i->waiting)
1407 wake_up(&device->misc_wait);
1408}
1409
1410static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1411{
1412 struct drbd_peer_device *peer_device;
1413 int vnr;
1414
1415 rcu_read_lock();
1416 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1417 struct drbd_device *device = peer_device->device;
1418
1419 kref_get(&device->kref);
1420 rcu_read_unlock();
1421 drbd_wait_ee_list_empty(device, &device->active_ee);
1422 kref_put(&device->kref, drbd_destroy_device);
1423 rcu_read_lock();
1424 }
1425 rcu_read_unlock();
1426}
1427
1428static struct drbd_peer_device *
1429conn_peer_device(struct drbd_connection *connection, int volume_number)
1430{
1431 return idr_find(&connection->peer_devices, volume_number);
1432}
1433
1434static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1435{
1436 int rv;
1437 struct p_barrier *p = pi->data;
1438 struct drbd_epoch *epoch;
1439
1440 /* FIXME these are unacked on connection,
1441 * not a specific (peer)device.
1442 */
1443 connection->current_epoch->barrier_nr = p->barrier;
1444 connection->current_epoch->connection = connection;
1445 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1446
1447 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1448 * the activity log, which means it would not be resynced in case the
1449 * R_PRIMARY crashes now.
1450 * Therefore we must send the barrier_ack after the barrier request was
1451 * completed. */
1452 switch (connection->write_ordering) {
1453 case WO_none:
1454 if (rv == FE_RECYCLED)
1455 return 0;
1456
1457 /* receiver context, in the writeout path of the other node.
1458 * avoid potential distributed deadlock */
1459 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1460 if (epoch)
1461 break;
1462 else
1463 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1464 /* Fall through */
1465
1466 case WO_bdev_flush:
1467 case WO_drain_io:
1468 conn_wait_active_ee_empty(connection);
1469 drbd_flush(connection);
1470
1471 if (atomic_read(&connection->current_epoch->epoch_size)) {
1472 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1473 if (epoch)
1474 break;
1475 }
1476
1477 return 0;
1478 default:
1479 drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
1480 return -EIO;
1481 }
1482
1483 epoch->flags = 0;
1484 atomic_set(&epoch->epoch_size, 0);
1485 atomic_set(&epoch->active, 0);
1486
1487 spin_lock(&connection->epoch_lock);
1488 if (atomic_read(&connection->current_epoch->epoch_size)) {
1489 list_add(&epoch->list, &connection->current_epoch->list);
1490 connection->current_epoch = epoch;
1491 connection->epochs++;
1492 } else {
1493 /* The current_epoch got recycled while we allocated this one... */
1494 kfree(epoch);
1495 }
1496 spin_unlock(&connection->epoch_lock);
1497
1498 return 0;
1499}
1500
1501/* used from receive_RSDataReply (recv_resync_read)
1502 * and from receive_Data */
1503static struct drbd_peer_request *
1504read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1505 int data_size) __must_hold(local)
1506{
1507 struct drbd_device *device = peer_device->device;
1508 const sector_t capacity = drbd_get_capacity(device->this_bdev);
1509 struct drbd_peer_request *peer_req;
1510 struct page *page;
1511 int dgs, ds, err;
1512 void *dig_in = peer_device->connection->int_dig_in;
1513 void *dig_vv = peer_device->connection->int_dig_vv;
1514 unsigned long *data;
1515
1516 dgs = 0;
1517 if (peer_device->connection->peer_integrity_tfm) {
1518 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1519 /*
1520 * FIXME: Receive the incoming digest into the receive buffer
1521 * here, together with its struct p_data?
1522 */
1523 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1524 if (err)
1525 return NULL;
1526 data_size -= dgs;
1527 }
1528
1529 if (!expect(IS_ALIGNED(data_size, 512)))
1530 return NULL;
1531 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1532 return NULL;
1533
1534 /* even though we trust out peer,
1535 * we sometimes have to double check. */
1536 if (sector + (data_size>>9) > capacity) {
1537 drbd_err(device, "request from peer beyond end of local disk: "
1538 "capacity: %llus < sector: %llus + size: %u\n",
1539 (unsigned long long)capacity,
1540 (unsigned long long)sector, data_size);
1541 return NULL;
1542 }
1543
1544 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1545 * "criss-cross" setup, that might cause write-out on some other DRBD,
1546 * which in turn might block on the other node at this very place. */
1547 peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, GFP_NOIO);
1548 if (!peer_req)
1549 return NULL;
1550
1551 if (!data_size)
1552 return peer_req;
1553
1554 ds = data_size;
1555 page = peer_req->pages;
1556 page_chain_for_each(page) {
1557 unsigned len = min_t(int, ds, PAGE_SIZE);
1558 data = kmap(page);
1559 err = drbd_recv_all_warn(peer_device->connection, data, len);
1560 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1561 drbd_err(device, "Fault injection: Corrupting data on receive\n");
1562 data[0] = data[0] ^ (unsigned long)-1;
1563 }
1564 kunmap(page);
1565 if (err) {
1566 drbd_free_peer_req(device, peer_req);
1567 return NULL;
1568 }
1569 ds -= len;
1570 }
1571
1572 if (dgs) {
1573 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1574 if (memcmp(dig_in, dig_vv, dgs)) {
1575 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1576 (unsigned long long)sector, data_size);
1577 drbd_free_peer_req(device, peer_req);
1578 return NULL;
1579 }
1580 }
1581 device->recv_cnt += data_size>>9;
1582 return peer_req;
1583}
1584
1585/* drbd_drain_block() just takes a data block
1586 * out of the socket input buffer, and discards it.
1587 */
1588static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1589{
1590 struct page *page;
1591 int err = 0;
1592 void *data;
1593
1594 if (!data_size)
1595 return 0;
1596
1597 page = drbd_alloc_pages(peer_device, 1, 1);
1598
1599 data = kmap(page);
1600 while (data_size) {
1601 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1602
1603 err = drbd_recv_all_warn(peer_device->connection, data, len);
1604 if (err)
1605 break;
1606 data_size -= len;
1607 }
1608 kunmap(page);
1609 drbd_free_pages(peer_device->device, page, 0);
1610 return err;
1611}
1612
1613static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1614 sector_t sector, int data_size)
1615{
1616 struct bio_vec bvec;
1617 struct bvec_iter iter;
1618 struct bio *bio;
1619 int dgs, err, expect;
1620 void *dig_in = peer_device->connection->int_dig_in;
1621 void *dig_vv = peer_device->connection->int_dig_vv;
1622
1623 dgs = 0;
1624 if (peer_device->connection->peer_integrity_tfm) {
1625 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1626 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1627 if (err)
1628 return err;
1629 data_size -= dgs;
1630 }
1631
1632 /* optimistically update recv_cnt. if receiving fails below,
1633 * we disconnect anyways, and counters will be reset. */
1634 peer_device->device->recv_cnt += data_size>>9;
1635
1636 bio = req->master_bio;
1637 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1638
1639 bio_for_each_segment(bvec, bio, iter) {
1640 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1641 expect = min_t(int, data_size, bvec.bv_len);
1642 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1643 kunmap(bvec.bv_page);
1644 if (err)
1645 return err;
1646 data_size -= expect;
1647 }
1648
1649 if (dgs) {
1650 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1651 if (memcmp(dig_in, dig_vv, dgs)) {
1652 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1653 return -EINVAL;
1654 }
1655 }
1656
1657 D_ASSERT(peer_device->device, data_size == 0);
1658 return 0;
1659}
1660
1661/*
1662 * e_end_resync_block() is called in asender context via
1663 * drbd_finish_peer_reqs().
1664 */
1665static int e_end_resync_block(struct drbd_work *w, int unused)
1666{
1667 struct drbd_peer_request *peer_req =
1668 container_of(w, struct drbd_peer_request, w);
1669 struct drbd_device *device = w->device;
1670 sector_t sector = peer_req->i.sector;
1671 int err;
1672
1673 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1674
1675 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1676 drbd_set_in_sync(device, sector, peer_req->i.size);
1677 err = drbd_send_ack(first_peer_device(device), P_RS_WRITE_ACK, peer_req);
1678 } else {
1679 /* Record failure to sync */
1680 drbd_rs_failed_io(device, sector, peer_req->i.size);
1681
1682 err = drbd_send_ack(first_peer_device(device), P_NEG_ACK, peer_req);
1683 }
1684 dec_unacked(device);
1685
1686 return err;
1687}
1688
1689static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1690 int data_size) __releases(local)
1691{
1692 struct drbd_device *device = peer_device->device;
1693 struct drbd_peer_request *peer_req;
1694
1695 peer_req = read_in_block(peer_device, ID_SYNCER, sector, data_size);
1696 if (!peer_req)
1697 goto fail;
1698
1699 dec_rs_pending(device);
1700
1701 inc_unacked(device);
1702 /* corresponding dec_unacked() in e_end_resync_block()
1703 * respective _drbd_clear_done_ee */
1704
1705 peer_req->w.cb = e_end_resync_block;
1706
1707 spin_lock_irq(&device->resource->req_lock);
1708 list_add(&peer_req->w.list, &device->sync_ee);
1709 spin_unlock_irq(&device->resource->req_lock);
1710
1711 atomic_add(data_size >> 9, &device->rs_sect_ev);
1712 if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1713 return 0;
1714
1715 /* don't care for the reason here */
1716 drbd_err(device, "submit failed, triggering re-connect\n");
1717 spin_lock_irq(&device->resource->req_lock);
1718 list_del(&peer_req->w.list);
1719 spin_unlock_irq(&device->resource->req_lock);
1720
1721 drbd_free_peer_req(device, peer_req);
1722fail:
1723 put_ldev(device);
1724 return -EIO;
1725}
1726
1727static struct drbd_request *
1728find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1729 sector_t sector, bool missing_ok, const char *func)
1730{
1731 struct drbd_request *req;
1732
1733 /* Request object according to our peer */
1734 req = (struct drbd_request *)(unsigned long)id;
1735 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1736 return req;
1737 if (!missing_ok) {
1738 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1739 (unsigned long)id, (unsigned long long)sector);
1740 }
1741 return NULL;
1742}
1743
1744static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1745{
1746 struct drbd_peer_device *peer_device;
1747 struct drbd_device *device;
1748 struct drbd_request *req;
1749 sector_t sector;
1750 int err;
1751 struct p_data *p = pi->data;
1752
1753 peer_device = conn_peer_device(connection, pi->vnr);
1754 if (!peer_device)
1755 return -EIO;
1756 device = peer_device->device;
1757
1758 sector = be64_to_cpu(p->sector);
1759
1760 spin_lock_irq(&device->resource->req_lock);
1761 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1762 spin_unlock_irq(&device->resource->req_lock);
1763 if (unlikely(!req))
1764 return -EIO;
1765
1766 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1767 * special casing it there for the various failure cases.
1768 * still no race with drbd_fail_pending_reads */
1769 err = recv_dless_read(peer_device, req, sector, pi->size);
1770 if (!err)
1771 req_mod(req, DATA_RECEIVED);
1772 /* else: nothing. handled from drbd_disconnect...
1773 * I don't think we may complete this just yet
1774 * in case we are "on-disconnect: freeze" */
1775
1776 return err;
1777}
1778
1779static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1780{
1781 struct drbd_peer_device *peer_device;
1782 struct drbd_device *device;
1783 sector_t sector;
1784 int err;
1785 struct p_data *p = pi->data;
1786
1787 peer_device = conn_peer_device(connection, pi->vnr);
1788 if (!peer_device)
1789 return -EIO;
1790 device = peer_device->device;
1791
1792 sector = be64_to_cpu(p->sector);
1793 D_ASSERT(device, p->block_id == ID_SYNCER);
1794
1795 if (get_ldev(device)) {
1796 /* data is submitted to disk within recv_resync_read.
1797 * corresponding put_ldev done below on error,
1798 * or in drbd_peer_request_endio. */
1799 err = recv_resync_read(peer_device, sector, pi->size);
1800 } else {
1801 if (__ratelimit(&drbd_ratelimit_state))
1802 drbd_err(device, "Can not write resync data to local disk.\n");
1803
1804 err = drbd_drain_block(peer_device, pi->size);
1805
1806 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1807 }
1808
1809 atomic_add(pi->size >> 9, &device->rs_sect_in);
1810
1811 return err;
1812}
1813
1814static void restart_conflicting_writes(struct drbd_device *device,
1815 sector_t sector, int size)
1816{
1817 struct drbd_interval *i;
1818 struct drbd_request *req;
1819
1820 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1821 if (!i->local)
1822 continue;
1823 req = container_of(i, struct drbd_request, i);
1824 if (req->rq_state & RQ_LOCAL_PENDING ||
1825 !(req->rq_state & RQ_POSTPONED))
1826 continue;
1827 /* as it is RQ_POSTPONED, this will cause it to
1828 * be queued on the retry workqueue. */
1829 __req_mod(req, CONFLICT_RESOLVED, NULL);
1830 }
1831}
1832
1833/*
1834 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1835 */
1836static int e_end_block(struct drbd_work *w, int cancel)
1837{
1838 struct drbd_peer_request *peer_req =
1839 container_of(w, struct drbd_peer_request, w);
1840 struct drbd_device *device = w->device;
1841 sector_t sector = peer_req->i.sector;
1842 int err = 0, pcmd;
1843
1844 if (peer_req->flags & EE_SEND_WRITE_ACK) {
1845 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1846 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1847 device->state.conn <= C_PAUSED_SYNC_T &&
1848 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1849 P_RS_WRITE_ACK : P_WRITE_ACK;
1850 err = drbd_send_ack(first_peer_device(device), pcmd, peer_req);
1851 if (pcmd == P_RS_WRITE_ACK)
1852 drbd_set_in_sync(device, sector, peer_req->i.size);
1853 } else {
1854 err = drbd_send_ack(first_peer_device(device), P_NEG_ACK, peer_req);
1855 /* we expect it to be marked out of sync anyways...
1856 * maybe assert this? */
1857 }
1858 dec_unacked(device);
1859 }
1860 /* we delete from the conflict detection hash _after_ we sent out the
1861 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1862 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1863 spin_lock_irq(&device->resource->req_lock);
1864 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1865 drbd_remove_epoch_entry_interval(device, peer_req);
1866 if (peer_req->flags & EE_RESTART_REQUESTS)
1867 restart_conflicting_writes(device, sector, peer_req->i.size);
1868 spin_unlock_irq(&device->resource->req_lock);
1869 } else
1870 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1871
1872 drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1873
1874 return err;
1875}
1876
1877static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1878{
1879 struct drbd_device *device = w->device;
1880 struct drbd_peer_request *peer_req =
1881 container_of(w, struct drbd_peer_request, w);
1882 int err;
1883
1884 err = drbd_send_ack(first_peer_device(device), ack, peer_req);
1885 dec_unacked(device);
1886
1887 return err;
1888}
1889
1890static int e_send_superseded(struct drbd_work *w, int unused)
1891{
1892 return e_send_ack(w, P_SUPERSEDED);
1893}
1894
1895static int e_send_retry_write(struct drbd_work *w, int unused)
1896{
1897 struct drbd_connection *connection = first_peer_device(w->device)->connection;
1898
1899 return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1900 P_RETRY_WRITE : P_SUPERSEDED);
1901}
1902
/*
 * seq_greater() - wrap-around aware "a is newer than b" for sequence numbers.
 *
 * Returns true if @a is ahead of @b by 1 .. 2^31 - 1 steps modulo 2^32.
 *
 * The difference is computed in unsigned arithmetic, which wraps by
 * definition, and only then reinterpreted as signed.  Subtracting two
 * signed values instead could overflow, which is undefined behaviour
 * unless the compiler is told to wrap.
 */
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 */
	return (s32)(a - b) > 0;
}
1912
1913static u32 seq_max(u32 a, u32 b)
1914{
1915 return seq_greater(a, b) ? a : b;
1916}
1917
1918static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
1919{
1920 struct drbd_device *device = peer_device->device;
1921 unsigned int newest_peer_seq;
1922
1923 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
1924 spin_lock(&device->peer_seq_lock);
1925 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1926 device->peer_seq = newest_peer_seq;
1927 spin_unlock(&device->peer_seq_lock);
1928 /* wake up only if we actually changed device->peer_seq */
1929 if (peer_seq == newest_peer_seq)
1930 wake_up(&device->seq_wait);
1931 }
1932}
1933
/*
 * Return 1 if the ranges starting at sectors @s1 and @s2 intersect, else 0.
 * @l1 and @l2 are shifted right by 9 before being added to the start
 * sectors.  Ranges that merely touch do not overlap.
 */
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
	return (s1 + (l1>>9) > s2) && (s1 < s2 + (l2>>9));
}
1938
1939/* maybe change sync_ee into interval trees as well? */
1940static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
1941{
1942 struct drbd_peer_request *rs_req;
1943 bool rv = 0;
1944
1945 spin_lock_irq(&device->resource->req_lock);
1946 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1947 if (overlaps(peer_req->i.sector, peer_req->i.size,
1948 rs_req->i.sector, rs_req->i.size)) {
1949 rv = 1;
1950 break;
1951 }
1952 }
1953 spin_unlock_irq(&device->resource->req_lock);
1954
1955 return rv;
1956}
1957
1958/* Called from receive_Data.
1959 * Synchronize packets on sock with packets on msock.
1960 *
1961 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1962 * packet traveling on msock, they are still processed in the order they have
1963 * been sent.
1964 *
1965 * Note: we don't care for Ack packets overtaking P_DATA packets.
1966 *
1967 * In case packet_seq is larger than device->peer_seq number, there are
1968 * outstanding packets on the msock. We wait for them to arrive.
1969 * In case we are the logically next packet, we update device->peer_seq
1970 * ourselves. Correctly handles 32bit wrap around.
1971 *
1972 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1973 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1974 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1975 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1976 *
1977 * returns 0 if we may process the packet,
1978 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1979static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
1980{
1981 struct drbd_device *device = peer_device->device;
1982 DEFINE_WAIT(wait);
1983 long timeout;
1984 int ret = 0, tp;
1985
1986 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
1987 return 0;
1988
1989 spin_lock(&device->peer_seq_lock);
1990 for (;;) {
1991 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
1992 device->peer_seq = seq_max(device->peer_seq, peer_seq);
1993 break;
1994 }
1995
1996 if (signal_pending(current)) {
1997 ret = -ERESTARTSYS;
1998 break;
1999 }
2000
2001 rcu_read_lock();
2002 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2003 rcu_read_unlock();
2004
2005 if (!tp)
2006 break;
2007
2008 /* Only need to wait if two_primaries is enabled */
2009 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2010 spin_unlock(&device->peer_seq_lock);
2011 rcu_read_lock();
2012 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2013 rcu_read_unlock();
2014 timeout = schedule_timeout(timeout);
2015 spin_lock(&device->peer_seq_lock);
2016 if (!timeout) {
2017 ret = -ETIMEDOUT;
2018 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2019 break;
2020 }
2021 }
2022 spin_unlock(&device->peer_seq_lock);
2023 finish_wait(&device->seq_wait, &wait);
2024 return ret;
2025}
2026
2027/* see also bio_flags_to_wire()
2028 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2029 * flags and back. We may replicate to other kernel versions. */
2030static unsigned long wire_flags_to_bio(struct drbd_device *device, u32 dpf)
2031{
2032 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2033 (dpf & DP_FUA ? REQ_FUA : 0) |
2034 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2035 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2036}
2037
2038static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2039 unsigned int size)
2040{
2041 struct drbd_interval *i;
2042
2043 repeat:
2044 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2045 struct drbd_request *req;
2046 struct bio_and_error m;
2047
2048 if (!i->local)
2049 continue;
2050 req = container_of(i, struct drbd_request, i);
2051 if (!(req->rq_state & RQ_POSTPONED))
2052 continue;
2053 req->rq_state &= ~RQ_POSTPONED;
2054 __req_mod(req, NEG_ACKED, &m);
2055 spin_unlock_irq(&device->resource->req_lock);
2056 if (m.bio)
2057 complete_master_bio(device, &m);
2058 spin_lock_irq(&device->resource->req_lock);
2059 goto repeat;
2060 }
2061}
2062
2063static int handle_write_conflicts(struct drbd_device *device,
2064 struct drbd_peer_request *peer_req)
2065{
2066 struct drbd_connection *connection = first_peer_device(device)->connection;
2067 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2068 sector_t sector = peer_req->i.sector;
2069 const unsigned int size = peer_req->i.size;
2070 struct drbd_interval *i;
2071 bool equal;
2072 int err;
2073
2074 /*
2075 * Inserting the peer request into the write_requests tree will prevent
2076 * new conflicting local requests from being added.
2077 */
2078 drbd_insert_interval(&device->write_requests, &peer_req->i);
2079
2080 repeat:
2081 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2082 if (i == &peer_req->i)
2083 continue;
2084
2085 if (!i->local) {
2086 /*
2087 * Our peer has sent a conflicting remote request; this
2088 * should not happen in a two-node setup. Wait for the
2089 * earlier peer request to complete.
2090 */
2091 err = drbd_wait_misc(device, i);
2092 if (err)
2093 goto out;
2094 goto repeat;
2095 }
2096
2097 equal = i->sector == sector && i->size == size;
2098 if (resolve_conflicts) {
2099 /*
2100 * If the peer request is fully contained within the
2101 * overlapping request, it can be considered overwritten
2102 * and thus superseded; otherwise, it will be retried
2103 * once all overlapping requests have completed.
2104 */
2105 bool superseded = i->sector <= sector && i->sector +
2106 (i->size >> 9) >= sector + (size >> 9);
2107
2108 if (!equal)
2109 drbd_alert(device, "Concurrent writes detected: "
2110 "local=%llus +%u, remote=%llus +%u, "
2111 "assuming %s came first\n",
2112 (unsigned long long)i->sector, i->size,
2113 (unsigned long long)sector, size,
2114 superseded ? "local" : "remote");
2115
2116 inc_unacked(device);
2117 peer_req->w.cb = superseded ? e_send_superseded :
2118 e_send_retry_write;
2119 list_add_tail(&peer_req->w.list, &device->done_ee);
2120 wake_asender(first_peer_device(device)->connection);
2121
2122 err = -ENOENT;
2123 goto out;
2124 } else {
2125 struct drbd_request *req =
2126 container_of(i, struct drbd_request, i);
2127
2128 if (!equal)
2129 drbd_alert(device, "Concurrent writes detected: "
2130 "local=%llus +%u, remote=%llus +%u\n",
2131 (unsigned long long)i->sector, i->size,
2132 (unsigned long long)sector, size);
2133
2134 if (req->rq_state & RQ_LOCAL_PENDING ||
2135 !(req->rq_state & RQ_POSTPONED)) {
2136 /*
2137 * Wait for the node with the discard flag to
2138 * decide if this request has been superseded
2139 * or needs to be retried.
2140 * Requests that have been superseded will
2141 * disappear from the write_requests tree.
2142 *
2143 * In addition, wait for the conflicting
2144 * request to finish locally before submitting
2145 * the conflicting peer request.
2146 */
2147 err = drbd_wait_misc(device, &req->i);
2148 if (err) {
2149 _conn_request_state(first_peer_device(device)->connection,
2150 NS(conn, C_TIMEOUT),
2151 CS_HARD);
2152 fail_postponed_requests(device, sector, size);
2153 goto out;
2154 }
2155 goto repeat;
2156 }
2157 /*
2158 * Remember to restart the conflicting requests after
2159 * the new peer request has completed.
2160 */
2161 peer_req->flags |= EE_RESTART_REQUESTS;
2162 }
2163 }
2164 err = 0;
2165
2166 out:
2167 if (err)
2168 drbd_remove_epoch_entry_interval(device, peer_req);
2169 return err;
2170}
2171
2172/* mirrored write */
2173static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2174{
2175 struct drbd_peer_device *peer_device;
2176 struct drbd_device *device;
2177 sector_t sector;
2178 struct drbd_peer_request *peer_req;
2179 struct p_data *p = pi->data;
2180 u32 peer_seq = be32_to_cpu(p->seq_num);
2181 int rw = WRITE;
2182 u32 dp_flags;
2183 int err, tp;
2184
2185 peer_device = conn_peer_device(connection, pi->vnr);
2186 if (!peer_device)
2187 return -EIO;
2188 device = peer_device->device;
2189
2190 if (!get_ldev(device)) {
2191 int err2;
2192
2193 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2194 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2195 atomic_inc(&connection->current_epoch->epoch_size);
2196 err2 = drbd_drain_block(peer_device, pi->size);
2197 if (!err)
2198 err = err2;
2199 return err;
2200 }
2201
2202 /*
2203 * Corresponding put_ldev done either below (on various errors), or in
2204 * drbd_peer_request_endio, if we successfully submit the data at the
2205 * end of this function.
2206 */
2207
2208 sector = be64_to_cpu(p->sector);
2209 peer_req = read_in_block(peer_device, p->block_id, sector, pi->size);
2210 if (!peer_req) {
2211 put_ldev(device);
2212 return -EIO;
2213 }
2214
2215 peer_req->w.cb = e_end_block;
2216
2217 dp_flags = be32_to_cpu(p->dp_flags);
2218 rw |= wire_flags_to_bio(device, dp_flags);
2219 if (peer_req->pages == NULL) {
2220 D_ASSERT(device, peer_req->i.size == 0);
2221 D_ASSERT(device, dp_flags & DP_FLUSH);
2222 }
2223
2224 if (dp_flags & DP_MAY_SET_IN_SYNC)
2225 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2226
2227 spin_lock(&connection->epoch_lock);
2228 peer_req->epoch = connection->current_epoch;
2229 atomic_inc(&peer_req->epoch->epoch_size);
2230 atomic_inc(&peer_req->epoch->active);
2231 spin_unlock(&connection->epoch_lock);
2232
2233 rcu_read_lock();
2234 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2235 rcu_read_unlock();
2236 if (tp) {
2237 peer_req->flags |= EE_IN_INTERVAL_TREE;
2238 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2239 if (err)
2240 goto out_interrupted;
2241 spin_lock_irq(&device->resource->req_lock);
2242 err = handle_write_conflicts(device, peer_req);
2243 if (err) {
2244 spin_unlock_irq(&device->resource->req_lock);
2245 if (err == -ENOENT) {
2246 put_ldev(device);
2247 return 0;
2248 }
2249 goto out_interrupted;
2250 }
2251 } else {
2252 update_peer_seq(peer_device, peer_seq);
2253 spin_lock_irq(&device->resource->req_lock);
2254 }
2255 list_add(&peer_req->w.list, &device->active_ee);
2256 spin_unlock_irq(&device->resource->req_lock);
2257
2258 if (device->state.conn == C_SYNC_TARGET)
2259 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2260
2261 if (peer_device->connection->agreed_pro_version < 100) {
2262 rcu_read_lock();
2263 switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
2264 case DRBD_PROT_C:
2265 dp_flags |= DP_SEND_WRITE_ACK;
2266 break;
2267 case DRBD_PROT_B:
2268 dp_flags |= DP_SEND_RECEIVE_ACK;
2269 break;
2270 }
2271 rcu_read_unlock();
2272 }
2273
2274 if (dp_flags & DP_SEND_WRITE_ACK) {
2275 peer_req->flags |= EE_SEND_WRITE_ACK;
2276 inc_unacked(device);
2277 /* corresponding dec_unacked() in e_end_block()
2278 * respective _drbd_clear_done_ee */
2279 }
2280
2281 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2282 /* I really don't like it that the receiver thread
2283 * sends on the msock, but anyways */
2284 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2285 }
2286
2287 if (device->state.pdsk < D_INCONSISTENT) {
2288 /* In case we have the only disk of the cluster, */
2289 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2290 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2291 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2292 drbd_al_begin_io(device, &peer_req->i, true);
2293 }
2294
2295 err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2296 if (!err)
2297 return 0;
2298
2299 /* don't care for the reason here */
2300 drbd_err(device, "submit failed, triggering re-connect\n");
2301 spin_lock_irq(&device->resource->req_lock);
2302 list_del(&peer_req->w.list);
2303 drbd_remove_epoch_entry_interval(device, peer_req);
2304 spin_unlock_irq(&device->resource->req_lock);
2305 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2306 drbd_al_complete_io(device, &peer_req->i);
2307
2308out_interrupted:
2309 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2310 put_ldev(device);
2311 drbd_free_peer_req(device, peer_req);
2312 return err;
2313}
2314
2315/* We may throttle resync, if the lower device seems to be busy,
2316 * and current sync rate is above c_min_rate.
2317 *
2318 * To decide whether or not the lower device is busy, we use a scheme similar
2319 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2320 * (more than 64 sectors) of activity we cannot account for with our own resync
2321 * activity, it obviously is "busy".
2322 *
2323 * The current sync rate used here uses only the most recent two step marks,
2324 * to have a short time average so we can react faster.
2325 */
2326int drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
2327{
2328 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2329 unsigned long db, dt, dbdt;
2330 struct lc_element *tmp;
2331 int curr_events;
2332 int throttle = 0;
2333 unsigned int c_min_rate;
2334
2335 rcu_read_lock();
2336 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2337 rcu_read_unlock();
2338
2339 /* feature disabled? */
2340 if (c_min_rate == 0)
2341 return 0;
2342
2343 spin_lock_irq(&device->al_lock);
2344 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2345 if (tmp) {
2346 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2347 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2348 spin_unlock_irq(&device->al_lock);
2349 return 0;
2350 }
2351 /* Do not slow down if app IO is already waiting for this extent */
2352 }
2353 spin_unlock_irq(&device->al_lock);
2354
2355 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2356 (int)part_stat_read(&disk->part0, sectors[1]) -
2357 atomic_read(&device->rs_sect_ev);
2358
2359 if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2360 unsigned long rs_left;
2361 int i;
2362
2363 device->rs_last_events = curr_events;
2364
2365 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2366 * approx. */
2367 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2368
2369 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2370 rs_left = device->ov_left;
2371 else
2372 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2373
2374 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2375 if (!dt)
2376 dt++;
2377 db = device->rs_mark_left[i] - rs_left;
2378 dbdt = Bit2KB(db/dt);
2379
2380 if (dbdt > c_min_rate)
2381 throttle = 1;
2382 }
2383 return throttle;
2384}
2385
2386
2387static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2388{
2389 struct drbd_peer_device *peer_device;
2390 struct drbd_device *device;
2391 sector_t sector;
2392 sector_t capacity;
2393 struct drbd_peer_request *peer_req;
2394 struct digest_info *di = NULL;
2395 int size, verb;
2396 unsigned int fault_type;
2397 struct p_block_req *p = pi->data;
2398
2399 peer_device = conn_peer_device(connection, pi->vnr);
2400 if (!peer_device)
2401 return -EIO;
2402 device = peer_device->device;
2403 capacity = drbd_get_capacity(device->this_bdev);
2404
2405 sector = be64_to_cpu(p->sector);
2406 size = be32_to_cpu(p->blksize);
2407
2408 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2409 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2410 (unsigned long long)sector, size);
2411 return -EINVAL;
2412 }
2413 if (sector + (size>>9) > capacity) {
2414 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2415 (unsigned long long)sector, size);
2416 return -EINVAL;
2417 }
2418
2419 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2420 verb = 1;
2421 switch (pi->cmd) {
2422 case P_DATA_REQUEST:
2423 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2424 break;
2425 case P_RS_DATA_REQUEST:
2426 case P_CSUM_RS_REQUEST:
2427 case P_OV_REQUEST:
2428 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2429 break;
2430 case P_OV_REPLY:
2431 verb = 0;
2432 dec_rs_pending(device);
2433 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2434 break;
2435 default:
2436 BUG();
2437 }
2438 if (verb && __ratelimit(&drbd_ratelimit_state))
2439 drbd_err(device, "Can not satisfy peer's read request, "
2440 "no local data.\n");
2441
2442 /* drain possibly payload */
2443 return drbd_drain_block(peer_device, pi->size);
2444 }
2445
2446 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2447 * "criss-cross" setup, that might cause write-out on some other DRBD,
2448 * which in turn might block on the other node at this very place. */
2449 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size, GFP_NOIO);
2450 if (!peer_req) {
2451 put_ldev(device);
2452 return -ENOMEM;
2453 }
2454
2455 switch (pi->cmd) {
2456 case P_DATA_REQUEST:
2457 peer_req->w.cb = w_e_end_data_req;
2458 fault_type = DRBD_FAULT_DT_RD;
2459 /* application IO, don't drbd_rs_begin_io */
2460 goto submit;
2461
2462 case P_RS_DATA_REQUEST:
2463 peer_req->w.cb = w_e_end_rsdata_req;
2464 fault_type = DRBD_FAULT_RS_RD;
2465 /* used in the sector offset progress display */
2466 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2467 break;
2468
2469 case P_OV_REPLY:
2470 case P_CSUM_RS_REQUEST:
2471 fault_type = DRBD_FAULT_RS_RD;
2472 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2473 if (!di)
2474 goto out_free_e;
2475
2476 di->digest_size = pi->size;
2477 di->digest = (((char *)di)+sizeof(struct digest_info));
2478
2479 peer_req->digest = di;
2480 peer_req->flags |= EE_HAS_DIGEST;
2481
2482 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2483 goto out_free_e;
2484
2485 if (pi->cmd == P_CSUM_RS_REQUEST) {
2486 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2487 peer_req->w.cb = w_e_end_csum_rs_req;
2488 /* used in the sector offset progress display */
2489 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2490 } else if (pi->cmd == P_OV_REPLY) {
2491 /* track progress, we may need to throttle */
2492 atomic_add(size >> 9, &device->rs_sect_in);
2493 peer_req->w.cb = w_e_end_ov_reply;
2494 dec_rs_pending(device);
2495 /* drbd_rs_begin_io done when we sent this request,
2496 * but accounting still needs to be done. */
2497 goto submit_for_resync;
2498 }
2499 break;
2500
2501 case P_OV_REQUEST:
2502 if (device->ov_start_sector == ~(sector_t)0 &&
2503 peer_device->connection->agreed_pro_version >= 90) {
2504 unsigned long now = jiffies;
2505 int i;
2506 device->ov_start_sector = sector;
2507 device->ov_position = sector;
2508 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2509 device->rs_total = device->ov_left;
2510 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2511 device->rs_mark_left[i] = device->ov_left;
2512 device->rs_mark_time[i] = now;
2513 }
2514 drbd_info(device, "Online Verify start sector: %llu\n",
2515 (unsigned long long)sector);
2516 }
2517 peer_req->w.cb = w_e_end_ov_req;
2518 fault_type = DRBD_FAULT_RS_RD;
2519 break;
2520
2521 default:
2522 BUG();
2523 }
2524
2525 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2526 * wrt the receiver, but it is not as straightforward as it may seem.
2527 * Various places in the resync start and stop logic assume resync
2528 * requests are processed in order, requeuing this on the worker thread
2529 * introduces a bunch of new code for synchronization between threads.
2530 *
2531 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2532 * "forever", throttling after drbd_rs_begin_io will lock that extent
2533 * for application writes for the same time. For now, just throttle
2534 * here, where the rest of the code expects the receiver to sleep for
2535 * a while, anyways.
2536 */
2537
2538 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2539 * this defers syncer requests for some time, before letting at least
2540 * on request through. The resync controller on the receiving side
2541 * will adapt to the incoming rate accordingly.
2542 *
2543 * We cannot throttle here if remote is Primary/SyncTarget:
2544 * we would also throttle its application reads.
2545 * In that case, throttling is done on the SyncTarget only.
2546 */
2547 if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
2548 schedule_timeout_uninterruptible(HZ/10);
2549 if (drbd_rs_begin_io(device, sector))
2550 goto out_free_e;
2551
2552submit_for_resync:
2553 atomic_add(size >> 9, &device->rs_sect_ev);
2554
2555submit:
2556 inc_unacked(device);
2557 spin_lock_irq(&device->resource->req_lock);
2558 list_add_tail(&peer_req->w.list, &device->read_ee);
2559 spin_unlock_irq(&device->resource->req_lock);
2560
2561 if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2562 return 0;
2563
2564 /* don't care for the reason here */
2565 drbd_err(device, "submit failed, triggering re-connect\n");
2566 spin_lock_irq(&device->resource->req_lock);
2567 list_del(&peer_req->w.list);
2568 spin_unlock_irq(&device->resource->req_lock);
2569 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2570
2571out_free_e:
2572 put_ldev(device);
2573 drbd_free_peer_req(device, peer_req);
2574 return -EIO;
2575}
2576
2577/**
2578 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
2579 */
2580static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2581{
2582 struct drbd_device *device = peer_device->device;
2583 int self, peer, rv = -100;
2584 unsigned long ch_self, ch_peer;
2585 enum drbd_after_sb_p after_sb_0p;
2586
2587 self = device->ldev->md.uuid[UI_BITMAP] & 1;
2588 peer = device->p_uuid[UI_BITMAP] & 1;
2589
2590 ch_peer = device->p_uuid[UI_SIZE];
2591 ch_self = device->comm_bm_set;
2592
2593 rcu_read_lock();
2594 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2595 rcu_read_unlock();
2596 switch (after_sb_0p) {
2597 case ASB_CONSENSUS:
2598 case ASB_DISCARD_SECONDARY:
2599 case ASB_CALL_HELPER:
2600 case ASB_VIOLENTLY:
2601 drbd_err(device, "Configuration error.\n");
2602 break;
2603 case ASB_DISCONNECT:
2604 break;
2605 case ASB_DISCARD_YOUNGER_PRI:
2606 if (self == 0 && peer == 1) {
2607 rv = -1;
2608 break;
2609 }
2610 if (self == 1 && peer == 0) {
2611 rv = 1;
2612 break;
2613 }
2614 /* Else fall through to one of the other strategies... */
2615 case ASB_DISCARD_OLDER_PRI:
2616 if (self == 0 && peer == 1) {
2617 rv = 1;
2618 break;
2619 }
2620 if (self == 1 && peer == 0) {
2621 rv = -1;
2622 break;
2623 }
2624 /* Else fall through to one of the other strategies... */
2625 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2626 "Using discard-least-changes instead\n");
2627 case ASB_DISCARD_ZERO_CHG:
2628 if (ch_peer == 0 && ch_self == 0) {
2629 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2630 ? -1 : 1;
2631 break;
2632 } else {
2633 if (ch_peer == 0) { rv = 1; break; }
2634 if (ch_self == 0) { rv = -1; break; }
2635 }
2636 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2637 break;
2638 case ASB_DISCARD_LEAST_CHG:
2639 if (ch_self < ch_peer)
2640 rv = -1;
2641 else if (ch_self > ch_peer)
2642 rv = 1;
2643 else /* ( ch_self == ch_peer ) */
2644 /* Well, then use something else. */
2645 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2646 ? -1 : 1;
2647 break;
2648 case ASB_DISCARD_LOCAL:
2649 rv = -1;
2650 break;
2651 case ASB_DISCARD_REMOTE:
2652 rv = 1;
2653 }
2654
2655 return rv;
2656}
2657
2658/**
2659 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
2660 */
2661static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2662{
2663 struct drbd_device *device = peer_device->device;
2664 int hg, rv = -100;
2665 enum drbd_after_sb_p after_sb_1p;
2666
2667 rcu_read_lock();
2668 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2669 rcu_read_unlock();
2670 switch (after_sb_1p) {
2671 case ASB_DISCARD_YOUNGER_PRI:
2672 case ASB_DISCARD_OLDER_PRI:
2673 case ASB_DISCARD_LEAST_CHG:
2674 case ASB_DISCARD_LOCAL:
2675 case ASB_DISCARD_REMOTE:
2676 case ASB_DISCARD_ZERO_CHG:
2677 drbd_err(device, "Configuration error.\n");
2678 break;
2679 case ASB_DISCONNECT:
2680 break;
2681 case ASB_CONSENSUS:
2682 hg = drbd_asb_recover_0p(peer_device);
2683 if (hg == -1 && device->state.role == R_SECONDARY)
2684 rv = hg;
2685 if (hg == 1 && device->state.role == R_PRIMARY)
2686 rv = hg;
2687 break;
2688 case ASB_VIOLENTLY:
2689 rv = drbd_asb_recover_0p(peer_device);
2690 break;
2691 case ASB_DISCARD_SECONDARY:
2692 return device->state.role == R_PRIMARY ? 1 : -1;
2693 case ASB_CALL_HELPER:
2694 hg = drbd_asb_recover_0p(peer_device);
2695 if (hg == -1 && device->state.role == R_PRIMARY) {
2696 enum drbd_state_rv rv2;
2697
2698 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2699 * we might be here in C_WF_REPORT_PARAMS which is transient.
2700 * we do not need to wait for the after state change work either. */
2701 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2702 if (rv2 != SS_SUCCESS) {
2703 drbd_khelper(device, "pri-lost-after-sb");
2704 } else {
2705 drbd_warn(device, "Successfully gave up primary role.\n");
2706 rv = hg;
2707 }
2708 } else
2709 rv = hg;
2710 }
2711
2712 return rv;
2713}
2714
2715/**
2716 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
2717 */
2718static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2719{
2720 struct drbd_device *device = peer_device->device;
2721 int hg, rv = -100;
2722 enum drbd_after_sb_p after_sb_2p;
2723
2724 rcu_read_lock();
2725 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2726 rcu_read_unlock();
2727 switch (after_sb_2p) {
2728 case ASB_DISCARD_YOUNGER_PRI:
2729 case ASB_DISCARD_OLDER_PRI:
2730 case ASB_DISCARD_LEAST_CHG:
2731 case ASB_DISCARD_LOCAL:
2732 case ASB_DISCARD_REMOTE:
2733 case ASB_CONSENSUS:
2734 case ASB_DISCARD_SECONDARY:
2735 case ASB_DISCARD_ZERO_CHG:
2736 drbd_err(device, "Configuration error.\n");
2737 break;
2738 case ASB_VIOLENTLY:
2739 rv = drbd_asb_recover_0p(peer_device);
2740 break;
2741 case ASB_DISCONNECT:
2742 break;
2743 case ASB_CALL_HELPER:
2744 hg = drbd_asb_recover_0p(peer_device);
2745 if (hg == -1) {
2746 enum drbd_state_rv rv2;
2747
2748 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2749 * we might be here in C_WF_REPORT_PARAMS which is transient.
2750 * we do not need to wait for the after state change work either. */
2751 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2752 if (rv2 != SS_SUCCESS) {
2753 drbd_khelper(device, "pri-lost-after-sb");
2754 } else {
2755 drbd_warn(device, "Successfully gave up primary role.\n");
2756 rv = hg;
2757 }
2758 } else
2759 rv = hg;
2760 }
2761
2762 return rv;
2763}
2764
2765static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2766 u64 bits, u64 flags)
2767{
2768 if (!uuid) {
2769 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2770 return;
2771 }
2772 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2773 text,
2774 (unsigned long long)uuid[UI_CURRENT],
2775 (unsigned long long)uuid[UI_BITMAP],
2776 (unsigned long long)uuid[UI_HISTORY_START],
2777 (unsigned long long)uuid[UI_HISTORY_END],
2778 (unsigned long long)bits,
2779 (unsigned long long)flags);
2780}
2781
2782/*
2783 100 after split brain try auto recover
2784 2 C_SYNC_SOURCE set BitMap
2785 1 C_SYNC_SOURCE use BitMap
2786 0 no Sync
2787 -1 C_SYNC_TARGET use BitMap
2788 -2 C_SYNC_TARGET set BitMap
2789 -100 after split brain, disconnect
2790-1000 unrelated data
2791-1091 requires proto 91
2792-1096 requires proto 96
2793 */
2794static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
2795{
2796 u64 self, peer;
2797 int i, j;
2798
2799 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2800 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2801
2802 *rule_nr = 10;
2803 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2804 return 0;
2805
2806 *rule_nr = 20;
2807 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2808 peer != UUID_JUST_CREATED)
2809 return -2;
2810
2811 *rule_nr = 30;
2812 if (self != UUID_JUST_CREATED &&
2813 (peer == UUID_JUST_CREATED || peer == (u64)0))
2814 return 2;
2815
2816 if (self == peer) {
2817 int rct, dc; /* roles at crash time */
2818
2819 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2820
2821 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2822 return -1091;
2823
2824 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2825 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2826 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2827 drbd_uuid_move_history(device);
2828 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2829 device->ldev->md.uuid[UI_BITMAP] = 0;
2830
2831 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2832 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2833 *rule_nr = 34;
2834 } else {
2835 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2836 *rule_nr = 36;
2837 }
2838
2839 return 1;
2840 }
2841
2842 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2843
2844 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2845 return -1091;
2846
2847 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2848 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2849 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2850
2851 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2852 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2853 device->p_uuid[UI_BITMAP] = 0UL;
2854
2855 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2856 *rule_nr = 35;
2857 } else {
2858 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
2859 *rule_nr = 37;
2860 }
2861
2862 return -1;
2863 }
2864
2865 /* Common power [off|failure] */
2866 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2867 (device->p_uuid[UI_FLAGS] & 2);
2868 /* lowest bit is set when we were primary,
2869 * next bit (weight 2) is set when peer was primary */
2870 *rule_nr = 40;
2871
2872 switch (rct) {
2873 case 0: /* !self_pri && !peer_pri */ return 0;
2874 case 1: /* self_pri && !peer_pri */ return 1;
2875 case 2: /* !self_pri && peer_pri */ return -1;
2876 case 3: /* self_pri && peer_pri */
2877 dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
2878 return dc ? -1 : 1;
2879 }
2880 }
2881
2882 *rule_nr = 50;
2883 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2884 if (self == peer)
2885 return -1;
2886
2887 *rule_nr = 51;
2888 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
2889 if (self == peer) {
2890 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2891 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2892 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2893 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
2894 /* The last P_SYNC_UUID did not get though. Undo the last start of
2895 resync as sync source modifications of the peer's UUIDs. */
2896
2897 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2898 return -1091;
2899
2900 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2901 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
2902
2903 drbd_info(device, "Lost last syncUUID packet, corrected:\n");
2904 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2905
2906 return -1;
2907 }
2908 }
2909
2910 *rule_nr = 60;
2911 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2912 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2913 peer = device->p_uuid[i] & ~((u64)1);
2914 if (self == peer)
2915 return -2;
2916 }
2917
2918 *rule_nr = 70;
2919 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2920 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2921 if (self == peer)
2922 return 1;
2923
2924 *rule_nr = 71;
2925 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2926 if (self == peer) {
2927 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2928 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2929 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2930 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2931 /* The last P_SYNC_UUID did not get though. Undo the last start of
2932 resync as sync source modifications of our UUIDs. */
2933
2934 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2935 return -1091;
2936
2937 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2938 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
2939
2940 drbd_info(device, "Last syncUUID did not get through, corrected:\n");
2941 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2942 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2943
2944 return 1;
2945 }
2946 }
2947
2948
2949 *rule_nr = 80;
2950 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2951 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2952 self = device->ldev->md.uuid[i] & ~((u64)1);
2953 if (self == peer)
2954 return 2;
2955 }
2956
2957 *rule_nr = 90;
2958 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2959 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2960 if (self == peer && self != ((u64)0))
2961 return 100;
2962
2963 *rule_nr = 100;
2964 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2965 self = device->ldev->md.uuid[i] & ~((u64)1);
2966 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2967 peer = device->p_uuid[j] & ~((u64)1);
2968 if (self == peer)
2969 return -100;
2970 }
2971 }
2972
2973 return -1000;
2974}
2975
2976/* drbd_sync_handshake() returns the new conn state on success, or
2977 CONN_MASK (-1) on failure.
2978 */
2979static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
2980 enum drbd_role peer_role,
2981 enum drbd_disk_state peer_disk) __must_hold(local)
2982{
2983 struct drbd_device *device = peer_device->device;
2984 enum drbd_conns rv = C_MASK;
2985 enum drbd_disk_state mydisk;
2986 struct net_conf *nc;
2987 int hg, rule_nr, rr_conflict, tentative;
2988
2989 mydisk = device->state.disk;
2990 if (mydisk == D_NEGOTIATING)
2991 mydisk = device->new_state_tmp.disk;
2992
2993 drbd_info(device, "drbd_sync_handshake:\n");
2994
2995 spin_lock_irq(&device->ldev->md.uuid_lock);
2996 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
2997 drbd_uuid_dump(device, "peer", device->p_uuid,
2998 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2999
3000 hg = drbd_uuid_compare(device, &rule_nr);
3001 spin_unlock_irq(&device->ldev->md.uuid_lock);
3002
3003 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3004
3005 if (hg == -1000) {
3006 drbd_alert(device, "Unrelated data, aborting!\n");
3007 return C_MASK;
3008 }
3009 if (hg < -1000) {
3010 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3011 return C_MASK;
3012 }
3013
3014 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3015 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3016 int f = (hg == -100) || abs(hg) == 2;
3017 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3018 if (f)
3019 hg = hg*2;
3020 drbd_info(device, "Becoming sync %s due to disk states.\n",
3021 hg > 0 ? "source" : "target");
3022 }
3023
3024 if (abs(hg) == 100)
3025 drbd_khelper(device, "initial-split-brain");
3026
3027 rcu_read_lock();
3028 nc = rcu_dereference(peer_device->connection->net_conf);
3029
3030 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3031 int pcount = (device->state.role == R_PRIMARY)
3032 + (peer_role == R_PRIMARY);
3033 int forced = (hg == -100);
3034
3035 switch (pcount) {
3036 case 0:
3037 hg = drbd_asb_recover_0p(peer_device);
3038 break;
3039 case 1:
3040 hg = drbd_asb_recover_1p(peer_device);
3041 break;
3042 case 2:
3043 hg = drbd_asb_recover_2p(peer_device);
3044 break;
3045 }
3046 if (abs(hg) < 100) {
3047 drbd_warn(device, "Split-Brain detected, %d primaries, "
3048 "automatically solved. Sync from %s node\n",
3049 pcount, (hg < 0) ? "peer" : "this");
3050 if (forced) {
3051 drbd_warn(device, "Doing a full sync, since"
3052 " UUIDs where ambiguous.\n");
3053 hg = hg*2;
3054 }
3055 }
3056 }
3057
3058 if (hg == -100) {
3059 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3060 hg = -1;
3061 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3062 hg = 1;
3063
3064 if (abs(hg) < 100)
3065 drbd_warn(device, "Split-Brain detected, manually solved. "
3066 "Sync from %s node\n",
3067 (hg < 0) ? "peer" : "this");
3068 }
3069 rr_conflict = nc->rr_conflict;
3070 tentative = nc->tentative;
3071 rcu_read_unlock();
3072
3073 if (hg == -100) {
3074 /* FIXME this log message is not correct if we end up here
3075 * after an attempted attach on a diskless node.
3076 * We just refuse to attach -- well, we drop the "connection"
3077 * to that disk, in a way... */
3078 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3079 drbd_khelper(device, "split-brain");
3080 return C_MASK;
3081 }
3082
3083 if (hg > 0 && mydisk <= D_INCONSISTENT) {
3084 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3085 return C_MASK;
3086 }
3087
3088 if (hg < 0 && /* by intention we do not use mydisk here. */
3089 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3090 switch (rr_conflict) {
3091 case ASB_CALL_HELPER:
3092 drbd_khelper(device, "pri-lost");
3093 /* fall through */
3094 case ASB_DISCONNECT:
3095 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3096 return C_MASK;
3097 case ASB_VIOLENTLY:
3098 drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3099 "assumption\n");
3100 }
3101 }
3102
3103 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3104 if (hg == 0)
3105 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3106 else
3107 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3108 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3109 abs(hg) >= 2 ? "full" : "bit-map based");
3110 return C_MASK;
3111 }
3112
3113 if (abs(hg) >= 2) {
3114 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3115 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3116 BM_LOCKED_SET_ALLOWED))
3117 return C_MASK;
3118 }
3119
3120 if (hg > 0) { /* become sync source. */
3121 rv = C_WF_BITMAP_S;
3122 } else if (hg < 0) { /* become sync target */
3123 rv = C_WF_BITMAP_T;
3124 } else {
3125 rv = C_CONNECTED;
3126 if (drbd_bm_total_weight(device)) {
3127 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3128 drbd_bm_total_weight(device));
3129 }
3130 }
3131
3132 return rv;
3133}
3134
3135static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3136{
3137 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3138 if (peer == ASB_DISCARD_REMOTE)
3139 return ASB_DISCARD_LOCAL;
3140
3141 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3142 if (peer == ASB_DISCARD_LOCAL)
3143 return ASB_DISCARD_REMOTE;
3144
3145 /* everything else is valid if they are equal on both sides. */
3146 return peer;
3147}
3148
3149static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3150{
3151 struct p_protocol *p = pi->data;
3152 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3153 int p_proto, p_discard_my_data, p_two_primaries, cf;
3154 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3155 char integrity_alg[SHARED_SECRET_MAX] = "";
3156 struct crypto_hash *peer_integrity_tfm = NULL;
3157 void *int_dig_in = NULL, *int_dig_vv = NULL;
3158
3159 p_proto = be32_to_cpu(p->protocol);
3160 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3161 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3162 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3163 p_two_primaries = be32_to_cpu(p->two_primaries);
3164 cf = be32_to_cpu(p->conn_flags);
3165 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3166
3167 if (connection->agreed_pro_version >= 87) {
3168 int err;
3169
3170 if (pi->size > sizeof(integrity_alg))
3171 return -EIO;
3172 err = drbd_recv_all(connection, integrity_alg, pi->size);
3173 if (err)
3174 return err;
3175 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3176 }
3177
3178 if (pi->cmd != P_PROTOCOL_UPDATE) {
3179 clear_bit(CONN_DRY_RUN, &connection->flags);
3180
3181 if (cf & CF_DRY_RUN)
3182 set_bit(CONN_DRY_RUN, &connection->flags);
3183
3184 rcu_read_lock();
3185 nc = rcu_dereference(connection->net_conf);
3186
3187 if (p_proto != nc->wire_protocol) {
3188 drbd_err(connection, "incompatible %s settings\n", "protocol");
3189 goto disconnect_rcu_unlock;
3190 }
3191
3192 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3193 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3194 goto disconnect_rcu_unlock;
3195 }
3196
3197 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3198 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3199 goto disconnect_rcu_unlock;
3200 }
3201
3202 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3203 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3204 goto disconnect_rcu_unlock;
3205 }
3206
3207 if (p_discard_my_data && nc->discard_my_data) {
3208 drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3209 goto disconnect_rcu_unlock;
3210 }
3211
3212 if (p_two_primaries != nc->two_primaries) {
3213 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3214 goto disconnect_rcu_unlock;
3215 }
3216
3217 if (strcmp(integrity_alg, nc->integrity_alg)) {
3218 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3219 goto disconnect_rcu_unlock;
3220 }
3221
3222 rcu_read_unlock();
3223 }
3224
3225 if (integrity_alg[0]) {
3226 int hash_size;
3227
3228 /*
3229 * We can only change the peer data integrity algorithm
3230 * here. Changing our own data integrity algorithm
3231 * requires that we send a P_PROTOCOL_UPDATE packet at
3232 * the same time; otherwise, the peer has no way to
3233 * tell between which packets the algorithm should
3234 * change.
3235 */
3236
3237 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3238 if (!peer_integrity_tfm) {
3239 drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3240 integrity_alg);
3241 goto disconnect;
3242 }
3243
3244 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3245 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3246 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3247 if (!(int_dig_in && int_dig_vv)) {
3248 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3249 goto disconnect;
3250 }
3251 }
3252
3253 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3254 if (!new_net_conf) {
3255 drbd_err(connection, "Allocation of new net_conf failed\n");
3256 goto disconnect;
3257 }
3258
3259 mutex_lock(&connection->data.mutex);
3260 mutex_lock(&connection->resource->conf_update);
3261 old_net_conf = connection->net_conf;
3262 *new_net_conf = *old_net_conf;
3263
3264 new_net_conf->wire_protocol = p_proto;
3265 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3266 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3267 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3268 new_net_conf->two_primaries = p_two_primaries;
3269
3270 rcu_assign_pointer(connection->net_conf, new_net_conf);
3271 mutex_unlock(&connection->resource->conf_update);
3272 mutex_unlock(&connection->data.mutex);
3273
3274 crypto_free_hash(connection->peer_integrity_tfm);
3275 kfree(connection->int_dig_in);
3276 kfree(connection->int_dig_vv);
3277 connection->peer_integrity_tfm = peer_integrity_tfm;
3278 connection->int_dig_in = int_dig_in;
3279 connection->int_dig_vv = int_dig_vv;
3280
3281 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3282 drbd_info(connection, "peer data-integrity-alg: %s\n",
3283 integrity_alg[0] ? integrity_alg : "(none)");
3284
3285 synchronize_rcu();
3286 kfree(old_net_conf);
3287 return 0;
3288
3289disconnect_rcu_unlock:
3290 rcu_read_unlock();
3291disconnect:
3292 crypto_free_hash(peer_integrity_tfm);
3293 kfree(int_dig_in);
3294 kfree(int_dig_vv);
3295 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3296 return -EIO;
3297}
3298
3299/* helper function
3300 * input: alg name, feature name
3301 * return: NULL (alg name was "")
3302 * ERR_PTR(error) if something goes wrong
3303 * or the crypto hash ptr, if it worked out ok. */
3304static
3305struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3306 const char *alg, const char *name)
3307{
3308 struct crypto_hash *tfm;
3309
3310 if (!alg[0])
3311 return NULL;
3312
3313 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3314 if (IS_ERR(tfm)) {
3315 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3316 alg, name, PTR_ERR(tfm));
3317 return tfm;
3318 }
3319 return tfm;
3320}
3321
3322static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3323{
3324 void *buffer = connection->data.rbuf;
3325 int size = pi->size;
3326
3327 while (size) {
3328 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3329 s = drbd_recv(connection, buffer, s);
3330 if (s <= 0) {
3331 if (s < 0)
3332 return s;
3333 break;
3334 }
3335 size -= s;
3336 }
3337 if (size)
3338 return -EIO;
3339 return 0;
3340}
3341
3342/*
3343 * config_unknown_volume - device configuration command for unknown volume
3344 *
3345 * When a device is added to an existing connection, the node on which the
3346 * device is added first will send configuration commands to its peer but the
3347 * peer will not know about the device yet. It will warn and ignore these
3348 * commands. Once the device is added on the second node, the second node will
3349 * send the same device configuration commands, but in the other direction.
3350 *
3351 * (We can also end up here if drbd is misconfigured.)
3352 */
3353static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3354{
3355 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3356 cmdname(pi->cmd), pi->vnr);
3357 return ignore_remaining_packet(connection, pi);
3358}
3359
3360static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3361{
3362 struct drbd_peer_device *peer_device;
3363 struct drbd_device *device;
3364 struct p_rs_param_95 *p;
3365 unsigned int header_size, data_size, exp_max_sz;
3366 struct crypto_hash *verify_tfm = NULL;
3367 struct crypto_hash *csums_tfm = NULL;
3368 struct net_conf *old_net_conf, *new_net_conf = NULL;
3369 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3370 const int apv = connection->agreed_pro_version;
3371 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3372 int fifo_size = 0;
3373 int err;
3374
3375 peer_device = conn_peer_device(connection, pi->vnr);
3376 if (!peer_device)
3377 return config_unknown_volume(connection, pi);
3378 device = peer_device->device;
3379
3380 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3381 : apv == 88 ? sizeof(struct p_rs_param)
3382 + SHARED_SECRET_MAX
3383 : apv <= 94 ? sizeof(struct p_rs_param_89)
3384 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3385
3386 if (pi->size > exp_max_sz) {
3387 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3388 pi->size, exp_max_sz);
3389 return -EIO;
3390 }
3391
3392 if (apv <= 88) {
3393 header_size = sizeof(struct p_rs_param);
3394 data_size = pi->size - header_size;
3395 } else if (apv <= 94) {
3396 header_size = sizeof(struct p_rs_param_89);
3397 data_size = pi->size - header_size;
3398 D_ASSERT(device, data_size == 0);
3399 } else {
3400 header_size = sizeof(struct p_rs_param_95);
3401 data_size = pi->size - header_size;
3402 D_ASSERT(device, data_size == 0);
3403 }
3404
3405 /* initialize verify_alg and csums_alg */
3406 p = pi->data;
3407 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3408
3409 err = drbd_recv_all(peer_device->connection, p, header_size);
3410 if (err)
3411 return err;
3412
3413 mutex_lock(&connection->resource->conf_update);
3414 old_net_conf = peer_device->connection->net_conf;
3415 if (get_ldev(device)) {
3416 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3417 if (!new_disk_conf) {
3418 put_ldev(device);
3419 mutex_unlock(&connection->resource->conf_update);
3420 drbd_err(device, "Allocation of new disk_conf failed\n");
3421 return -ENOMEM;
3422 }
3423
3424 old_disk_conf = device->ldev->disk_conf;
3425 *new_disk_conf = *old_disk_conf;
3426
3427 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3428 }
3429
3430 if (apv >= 88) {
3431 if (apv == 88) {
3432 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3433 drbd_err(device, "verify-alg of wrong size, "
3434 "peer wants %u, accepting only up to %u byte\n",
3435 data_size, SHARED_SECRET_MAX);
3436 err = -EIO;
3437 goto reconnect;
3438 }
3439
3440 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3441 if (err)
3442 goto reconnect;
3443 /* we expect NUL terminated string */
3444 /* but just in case someone tries to be evil */
3445 D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3446 p->verify_alg[data_size-1] = 0;
3447
3448 } else /* apv >= 89 */ {
3449 /* we still expect NUL terminated strings */
3450 /* but just in case someone tries to be evil */
3451 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3452 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3453 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3454 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3455 }
3456
3457 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3458 if (device->state.conn == C_WF_REPORT_PARAMS) {
3459 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3460 old_net_conf->verify_alg, p->verify_alg);
3461 goto disconnect;
3462 }
3463 verify_tfm = drbd_crypto_alloc_digest_safe(device,
3464 p->verify_alg, "verify-alg");
3465 if (IS_ERR(verify_tfm)) {
3466 verify_tfm = NULL;
3467 goto disconnect;
3468 }
3469 }
3470
3471 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3472 if (device->state.conn == C_WF_REPORT_PARAMS) {
3473 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3474 old_net_conf->csums_alg, p->csums_alg);
3475 goto disconnect;
3476 }
3477 csums_tfm = drbd_crypto_alloc_digest_safe(device,
3478 p->csums_alg, "csums-alg");
3479 if (IS_ERR(csums_tfm)) {
3480 csums_tfm = NULL;
3481 goto disconnect;
3482 }
3483 }
3484
3485 if (apv > 94 && new_disk_conf) {
3486 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3487 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3488 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3489 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3490
3491 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3492 if (fifo_size != device->rs_plan_s->size) {
3493 new_plan = fifo_alloc(fifo_size);
3494 if (!new_plan) {
3495 drbd_err(device, "kmalloc of fifo_buffer failed");
3496 put_ldev(device);
3497 goto disconnect;
3498 }
3499 }
3500 }
3501
3502 if (verify_tfm || csums_tfm) {
3503 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3504 if (!new_net_conf) {
3505 drbd_err(device, "Allocation of new net_conf failed\n");
3506 goto disconnect;
3507 }
3508
3509 *new_net_conf = *old_net_conf;
3510
3511 if (verify_tfm) {
3512 strcpy(new_net_conf->verify_alg, p->verify_alg);
3513 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3514 crypto_free_hash(peer_device->connection->verify_tfm);
3515 peer_device->connection->verify_tfm = verify_tfm;
3516 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3517 }
3518 if (csums_tfm) {
3519 strcpy(new_net_conf->csums_alg, p->csums_alg);
3520 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3521 crypto_free_hash(peer_device->connection->csums_tfm);
3522 peer_device->connection->csums_tfm = csums_tfm;
3523 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3524 }
3525 rcu_assign_pointer(connection->net_conf, new_net_conf);
3526 }
3527 }
3528
3529 if (new_disk_conf) {
3530 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3531 put_ldev(device);
3532 }
3533
3534 if (new_plan) {
3535 old_plan = device->rs_plan_s;
3536 rcu_assign_pointer(device->rs_plan_s, new_plan);
3537 }
3538
3539 mutex_unlock(&connection->resource->conf_update);
3540 synchronize_rcu();
3541 if (new_net_conf)
3542 kfree(old_net_conf);
3543 kfree(old_disk_conf);
3544 kfree(old_plan);
3545
3546 return 0;
3547
3548reconnect:
3549 if (new_disk_conf) {
3550 put_ldev(device);
3551 kfree(new_disk_conf);
3552 }
3553 mutex_unlock(&connection->resource->conf_update);
3554 return -EIO;
3555
3556disconnect:
3557 kfree(new_plan);
3558 if (new_disk_conf) {
3559 put_ldev(device);
3560 kfree(new_disk_conf);
3561 }
3562 mutex_unlock(&connection->resource->conf_update);
3563 /* just for completeness: actually not needed,
3564 * as this is not reached if csums_tfm was ok. */
3565 crypto_free_hash(csums_tfm);
3566 /* but free the verify_tfm again, if csums_tfm did not work out */
3567 crypto_free_hash(verify_tfm);
3568 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3569 return -EIO;
3570}
3571
3572/* warn if the arguments differ by more than 12.5% */
3573static void warn_if_differ_considerably(struct drbd_device *device,
3574 const char *s, sector_t a, sector_t b)
3575{
3576 sector_t d;
3577 if (a == 0 || b == 0)
3578 return;
3579 d = (a > b) ? (a - b) : (b - a);
3580 if (d > (a>>3) || d > (b>>3))
3581 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3582 (unsigned long long)a, (unsigned long long)b);
3583}
3584
3585static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3586{
3587 struct drbd_peer_device *peer_device;
3588 struct drbd_device *device;
3589 struct p_sizes *p = pi->data;
3590 enum determine_dev_size dd = DS_UNCHANGED;
3591 sector_t p_size, p_usize, my_usize;
3592 int ldsc = 0; /* local disk size changed */
3593 enum dds_flags ddsf;
3594
3595 peer_device = conn_peer_device(connection, pi->vnr);
3596 if (!peer_device)
3597 return config_unknown_volume(connection, pi);
3598 device = peer_device->device;
3599
3600 p_size = be64_to_cpu(p->d_size);
3601 p_usize = be64_to_cpu(p->u_size);
3602
3603 /* just store the peer's disk size for now.
3604 * we still need to figure out whether we accept that. */
3605 device->p_size = p_size;
3606
3607 if (get_ldev(device)) {
3608 rcu_read_lock();
3609 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3610 rcu_read_unlock();
3611
3612 warn_if_differ_considerably(device, "lower level device sizes",
3613 p_size, drbd_get_max_capacity(device->ldev));
3614 warn_if_differ_considerably(device, "user requested size",
3615 p_usize, my_usize);
3616
3617 /* if this is the first connect, or an otherwise expected
3618 * param exchange, choose the minimum */
3619 if (device->state.conn == C_WF_REPORT_PARAMS)
3620 p_usize = min_not_zero(my_usize, p_usize);
3621
3622 /* Never shrink a device with usable data during connect.
3623 But allow online shrinking if we are connected. */
3624 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3625 drbd_get_capacity(device->this_bdev) &&
3626 device->state.disk >= D_OUTDATED &&
3627 device->state.conn < C_CONNECTED) {
3628 drbd_err(device, "The peer's disk size is too small!\n");
3629 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3630 put_ldev(device);
3631 return -EIO;
3632 }
3633
3634 if (my_usize != p_usize) {
3635 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3636
3637 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3638 if (!new_disk_conf) {
3639 drbd_err(device, "Allocation of new disk_conf failed\n");
3640 put_ldev(device);
3641 return -ENOMEM;
3642 }
3643
3644 mutex_lock(&connection->resource->conf_update);
3645 old_disk_conf = device->ldev->disk_conf;
3646 *new_disk_conf = *old_disk_conf;
3647 new_disk_conf->disk_size = p_usize;
3648
3649 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3650 mutex_unlock(&connection->resource->conf_update);
3651 synchronize_rcu();
3652 kfree(old_disk_conf);
3653
3654 drbd_info(device, "Peer sets u_size to %lu sectors\n",
3655 (unsigned long)my_usize);
3656 }
3657
3658 put_ldev(device);
3659 }
3660
3661 ddsf = be16_to_cpu(p->dds_flags);
3662 if (get_ldev(device)) {
3663 dd = drbd_determine_dev_size(device, ddsf, NULL);
3664 put_ldev(device);
3665 if (dd == DS_ERROR)
3666 return -EIO;
3667 drbd_md_sync(device);
3668 } else {
3669 /* I am diskless, need to accept the peer's size. */
3670 drbd_set_my_capacity(device, p_size);
3671 }
3672
3673 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3674 drbd_reconsider_max_bio_size(device);
3675
3676 if (get_ldev(device)) {
3677 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3678 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3679 ldsc = 1;
3680 }
3681
3682 put_ldev(device);
3683 }
3684
3685 if (device->state.conn > C_WF_REPORT_PARAMS) {
3686 if (be64_to_cpu(p->c_size) !=
3687 drbd_get_capacity(device->this_bdev) || ldsc) {
3688 /* we have different sizes, probably peer
3689 * needs to know my new size... */
3690 drbd_send_sizes(peer_device, 0, ddsf);
3691 }
3692 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3693 (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3694 if (device->state.pdsk >= D_INCONSISTENT &&
3695 device->state.disk >= D_INCONSISTENT) {
3696 if (ddsf & DDSF_NO_RESYNC)
3697 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3698 else
3699 resync_after_online_grow(device);
3700 } else
3701 set_bit(RESYNC_AFTER_NEG, &device->flags);
3702 }
3703 }
3704
3705 return 0;
3706}
3707
3708static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3709{
3710 struct drbd_peer_device *peer_device;
3711 struct drbd_device *device;
3712 struct p_uuids *p = pi->data;
3713 u64 *p_uuid;
3714 int i, updated_uuids = 0;
3715
3716 peer_device = conn_peer_device(connection, pi->vnr);
3717 if (!peer_device)
3718 return config_unknown_volume(connection, pi);
3719 device = peer_device->device;
3720
3721 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3722 if (!p_uuid) {
3723 drbd_err(device, "kmalloc of p_uuid failed\n");
3724 return false;
3725 }
3726
3727 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3728 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3729
3730 kfree(device->p_uuid);
3731 device->p_uuid = p_uuid;
3732
3733 if (device->state.conn < C_CONNECTED &&
3734 device->state.disk < D_INCONSISTENT &&
3735 device->state.role == R_PRIMARY &&
3736 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3737 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3738 (unsigned long long)device->ed_uuid);
3739 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3740 return -EIO;
3741 }
3742
3743 if (get_ldev(device)) {
3744 int skip_initial_sync =
3745 device->state.conn == C_CONNECTED &&
3746 peer_device->connection->agreed_pro_version >= 90 &&
3747 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3748 (p_uuid[UI_FLAGS] & 8);
3749 if (skip_initial_sync) {
3750 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3751 drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3752 "clear_n_write from receive_uuids",
3753 BM_LOCKED_TEST_ALLOWED);
3754 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3755 _drbd_uuid_set(device, UI_BITMAP, 0);
3756 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3757 CS_VERBOSE, NULL);
3758 drbd_md_sync(device);
3759 updated_uuids = 1;
3760 }
3761 put_ldev(device);
3762 } else if (device->state.disk < D_INCONSISTENT &&
3763 device->state.role == R_PRIMARY) {
3764 /* I am a diskless primary, the peer just created a new current UUID
3765 for me. */
3766 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3767 }
3768
3769 /* Before we test for the disk state, we should wait until an eventually
3770 ongoing cluster wide state change is finished. That is important if
3771 we are primary and are detaching from our disk. We need to see the
3772 new disk state... */
3773 mutex_lock(device->state_mutex);
3774 mutex_unlock(device->state_mutex);
3775 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3776 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3777
3778 if (updated_uuids)
3779 drbd_print_uuids(device, "receiver updated UUIDs to");
3780
3781 return 0;
3782}
3783
3784/**
3785 * convert_state() - Converts the peer's view of the cluster state to our point of view
3786 * @ps: The state as seen by the peer.
3787 */
3788static union drbd_state convert_state(union drbd_state ps)
3789{
3790 union drbd_state ms;
3791
3792 static enum drbd_conns c_tab[] = {
3793 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3794 [C_CONNECTED] = C_CONNECTED,
3795
3796 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3797 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3798 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3799 [C_VERIFY_S] = C_VERIFY_T,
3800 [C_MASK] = C_MASK,
3801 };
3802
3803 ms.i = ps.i;
3804
3805 ms.conn = c_tab[ps.conn];
3806 ms.peer = ps.role;
3807 ms.role = ps.peer;
3808 ms.pdsk = ps.disk;
3809 ms.disk = ps.pdsk;
3810 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3811
3812 return ms;
3813}
3814
3815static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3816{
3817 struct drbd_peer_device *peer_device;
3818 struct drbd_device *device;
3819 struct p_req_state *p = pi->data;
3820 union drbd_state mask, val;
3821 enum drbd_state_rv rv;
3822
3823 peer_device = conn_peer_device(connection, pi->vnr);
3824 if (!peer_device)
3825 return -EIO;
3826 device = peer_device->device;
3827
3828 mask.i = be32_to_cpu(p->mask);
3829 val.i = be32_to_cpu(p->val);
3830
3831 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3832 mutex_is_locked(device->state_mutex)) {
3833 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3834 return 0;
3835 }
3836
3837 mask = convert_state(mask);
3838 val = convert_state(val);
3839
3840 rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3841 drbd_send_sr_reply(peer_device, rv);
3842
3843 drbd_md_sync(device);
3844
3845 return 0;
3846}
3847
3848static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
3849{
3850 struct p_req_state *p = pi->data;
3851 union drbd_state mask, val;
3852 enum drbd_state_rv rv;
3853
3854 mask.i = be32_to_cpu(p->mask);
3855 val.i = be32_to_cpu(p->val);
3856
3857 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
3858 mutex_is_locked(&connection->cstate_mutex)) {
3859 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
3860 return 0;
3861 }
3862
3863 mask = convert_state(mask);
3864 val = convert_state(val);
3865
3866 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3867 conn_send_sr_reply(connection, rv);
3868
3869 return 0;
3870}
3871
3872static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
3873{
3874 struct drbd_peer_device *peer_device;
3875 struct drbd_device *device;
3876 struct p_state *p = pi->data;
3877 union drbd_state os, ns, peer_state;
3878 enum drbd_disk_state real_peer_disk;
3879 enum chg_state_flags cs_flags;
3880 int rv;
3881
3882 peer_device = conn_peer_device(connection, pi->vnr);
3883 if (!peer_device)
3884 return config_unknown_volume(connection, pi);
3885 device = peer_device->device;
3886
3887 peer_state.i = be32_to_cpu(p->state);
3888
3889 real_peer_disk = peer_state.disk;
3890 if (peer_state.disk == D_NEGOTIATING) {
3891 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3892 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3893 }
3894
3895 spin_lock_irq(&device->resource->req_lock);
3896 retry:
3897 os = ns = drbd_read_state(device);
3898 spin_unlock_irq(&device->resource->req_lock);
3899
3900 /* If some other part of the code (asender thread, timeout)
3901 * already decided to close the connection again,
3902 * we must not "re-establish" it here. */
3903 if (os.conn <= C_TEAR_DOWN)
3904 return -ECONNRESET;
3905
3906 /* If this is the "end of sync" confirmation, usually the peer disk
3907 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3908 * set) resync started in PausedSyncT, or if the timing of pause-/
3909 * unpause-sync events has been "just right", the peer disk may
3910 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3911 */
3912 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3913 real_peer_disk == D_UP_TO_DATE &&
3914 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3915 /* If we are (becoming) SyncSource, but peer is still in sync
3916 * preparation, ignore its uptodate-ness to avoid flapping, it
3917 * will change to inconsistent once the peer reaches active
3918 * syncing states.
3919 * It may have changed syncer-paused flags, however, so we
3920 * cannot ignore this completely. */
3921 if (peer_state.conn > C_CONNECTED &&
3922 peer_state.conn < C_SYNC_SOURCE)
3923 real_peer_disk = D_INCONSISTENT;
3924
3925 /* if peer_state changes to connected at the same time,
3926 * it explicitly notifies us that it finished resync.
3927 * Maybe we should finish it up, too? */
3928 else if (os.conn >= C_SYNC_SOURCE &&
3929 peer_state.conn == C_CONNECTED) {
3930 if (drbd_bm_total_weight(device) <= device->rs_failed)
3931 drbd_resync_finished(device);
3932 return 0;
3933 }
3934 }
3935
3936 /* explicit verify finished notification, stop sector reached. */
3937 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3938 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3939 ov_out_of_sync_print(device);
3940 drbd_resync_finished(device);
3941 return 0;
3942 }
3943
3944 /* peer says his disk is inconsistent, while we think it is uptodate,
3945 * and this happens while the peer still thinks we have a sync going on,
3946 * but we think we are already done with the sync.
3947 * We ignore this to avoid flapping pdsk.
3948 * This should not happen, if the peer is a recent version of drbd. */
3949 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3950 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3951 real_peer_disk = D_UP_TO_DATE;
3952
3953 if (ns.conn == C_WF_REPORT_PARAMS)
3954 ns.conn = C_CONNECTED;
3955
3956 if (peer_state.conn == C_AHEAD)
3957 ns.conn = C_BEHIND;
3958
3959 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3960 get_ldev_if_state(device, D_NEGOTIATING)) {
3961 int cr; /* consider resync */
3962
3963 /* if we established a new connection */
3964 cr = (os.conn < C_CONNECTED);
3965 /* if we had an established connection
3966 * and one of the nodes newly attaches a disk */
3967 cr |= (os.conn == C_CONNECTED &&
3968 (peer_state.disk == D_NEGOTIATING ||
3969 os.disk == D_NEGOTIATING));
3970 /* if we have both been inconsistent, and the peer has been
3971 * forced to be UpToDate with --overwrite-data */
3972 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
3973 /* if we had been plain connected, and the admin requested to
3974 * start a sync by "invalidate" or "invalidate-remote" */
3975 cr |= (os.conn == C_CONNECTED &&
3976 (peer_state.conn >= C_STARTING_SYNC_S &&
3977 peer_state.conn <= C_WF_BITMAP_T));
3978
3979 if (cr)
3980 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
3981
3982 put_ldev(device);
3983 if (ns.conn == C_MASK) {
3984 ns.conn = C_CONNECTED;
3985 if (device->state.disk == D_NEGOTIATING) {
3986 drbd_force_state(device, NS(disk, D_FAILED));
3987 } else if (peer_state.disk == D_NEGOTIATING) {
3988 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
3989 peer_state.disk = D_DISKLESS;
3990 real_peer_disk = D_DISKLESS;
3991 } else {
3992 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
3993 return -EIO;
3994 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
3995 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3996 return -EIO;
3997 }
3998 }
3999 }
4000
4001 spin_lock_irq(&device->resource->req_lock);
4002 if (os.i != drbd_read_state(device).i)
4003 goto retry;
4004 clear_bit(CONSIDER_RESYNC, &device->flags);
4005 ns.peer = peer_state.role;
4006 ns.pdsk = real_peer_disk;
4007 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4008 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4009 ns.disk = device->new_state_tmp.disk;
4010 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4011 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4012 test_bit(NEW_CUR_UUID, &device->flags)) {
4013 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4014 for temporal network outages! */
4015 spin_unlock_irq(&device->resource->req_lock);
4016 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4017 tl_clear(peer_device->connection);
4018 drbd_uuid_new_current(device);
4019 clear_bit(NEW_CUR_UUID, &device->flags);
4020 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4021 return -EIO;
4022 }
4023 rv = _drbd_set_state(device, ns, cs_flags, NULL);
4024 ns = drbd_read_state(device);
4025 spin_unlock_irq(&device->resource->req_lock);
4026
4027 if (rv < SS_SUCCESS) {
4028 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4029 return -EIO;
4030 }
4031
4032 if (os.conn > C_WF_REPORT_PARAMS) {
4033 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4034 peer_state.disk != D_NEGOTIATING ) {
4035 /* we want resync, peer has not yet decided to sync... */
4036 /* Nowadays only used when forcing a node into primary role and
4037 setting its disk to UpToDate with that */
4038 drbd_send_uuids(peer_device);
4039 drbd_send_current_state(peer_device);
4040 }
4041 }
4042
4043 clear_bit(DISCARD_MY_DATA, &device->flags);
4044
4045 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4046
4047 return 0;
4048}
4049
4050static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4051{
4052 struct drbd_peer_device *peer_device;
4053 struct drbd_device *device;
4054 struct p_rs_uuid *p = pi->data;
4055
4056 peer_device = conn_peer_device(connection, pi->vnr);
4057 if (!peer_device)
4058 return -EIO;
4059 device = peer_device->device;
4060
4061 wait_event(device->misc_wait,
4062 device->state.conn == C_WF_SYNC_UUID ||
4063 device->state.conn == C_BEHIND ||
4064 device->state.conn < C_CONNECTED ||
4065 device->state.disk < D_NEGOTIATING);
4066
4067 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */
4068
4069 /* Here the _drbd_uuid_ functions are right, current should
4070 _not_ be rotated into the history */
4071 if (get_ldev_if_state(device, D_NEGOTIATING)) {
4072 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4073 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4074
4075 drbd_print_uuids(device, "updated sync uuid");
4076 drbd_start_resync(device, C_SYNC_TARGET);
4077
4078 put_ldev(device);
4079 } else
4080 drbd_err(device, "Ignoring SyncUUID packet!\n");
4081
4082 return 0;
4083}
4084
4085/**
4086 * receive_bitmap_plain
4087 *
4088 * Return 0 when done, 1 when another iteration is needed, and a negative error
4089 * code upon failure.
4090 */
4091static int
4092receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4093 unsigned long *p, struct bm_xfer_ctx *c)
4094{
4095 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4096 drbd_header_size(peer_device->connection);
4097 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4098 c->bm_words - c->word_offset);
4099 unsigned int want = num_words * sizeof(*p);
4100 int err;
4101
4102 if (want != size) {
4103 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4104 return -EIO;
4105 }
4106 if (want == 0)
4107 return 0;
4108 err = drbd_recv_all(peer_device->connection, p, want);
4109 if (err)
4110 return err;
4111
4112 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4113
4114 c->word_offset += num_words;
4115 c->bit_offset = c->word_offset * BITS_PER_LONG;
4116 if (c->bit_offset > c->bm_bits)
4117 c->bit_offset = c->bm_bits;
4118
4119 return 1;
4120}
4121
4122static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4123{
4124 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4125}
4126
4127static int dcbp_get_start(struct p_compressed_bm *p)
4128{
4129 return (p->encoding & 0x80) != 0;
4130}
4131
4132static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4133{
4134 return (p->encoding >> 4) & 0x7;
4135}
4136
4137/**
4138 * recv_bm_rle_bits
4139 *
4140 * Return 0 when done, 1 when another iteration is needed, and a negative error
4141 * code upon failure.
4142 */
4143static int
4144recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4145 struct p_compressed_bm *p,
4146 struct bm_xfer_ctx *c,
4147 unsigned int len)
4148{
4149 struct bitstream bs;
4150 u64 look_ahead;
4151 u64 rl;
4152 u64 tmp;
4153 unsigned long s = c->bit_offset;
4154 unsigned long e;
4155 int toggle = dcbp_get_start(p);
4156 int have;
4157 int bits;
4158
4159 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4160
4161 bits = bitstream_get_bits(&bs, &look_ahead, 64);
4162 if (bits < 0)
4163 return -EIO;
4164
4165 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4166 bits = vli_decode_bits(&rl, look_ahead);
4167 if (bits <= 0)
4168 return -EIO;
4169
4170 if (toggle) {
4171 e = s + rl -1;
4172 if (e >= c->bm_bits) {
4173 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4174 return -EIO;
4175 }
4176 _drbd_bm_set_bits(peer_device->device, s, e);
4177 }
4178
4179 if (have < bits) {
4180 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4181 have, bits, look_ahead,
4182 (unsigned int)(bs.cur.b - p->code),
4183 (unsigned int)bs.buf_len);
4184 return -EIO;
4185 }
4186 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4187 if (likely(bits < 64))
4188 look_ahead >>= bits;
4189 else
4190 look_ahead = 0;
4191 have -= bits;
4192
4193 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4194 if (bits < 0)
4195 return -EIO;
4196 look_ahead |= tmp << have;
4197 have += bits;
4198 }
4199
4200 c->bit_offset = s;
4201 bm_xfer_ctx_bit_to_word_offset(c);
4202
4203 return (s != c->bm_bits);
4204}
4205
4206/**
4207 * decode_bitmap_c
4208 *
4209 * Return 0 when done, 1 when another iteration is needed, and a negative error
4210 * code upon failure.
4211 */
4212static int
4213decode_bitmap_c(struct drbd_peer_device *peer_device,
4214 struct p_compressed_bm *p,
4215 struct bm_xfer_ctx *c,
4216 unsigned int len)
4217{
4218 if (dcbp_get_code(p) == RLE_VLI_Bits)
4219 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4220
4221 /* other variants had been implemented for evaluation,
4222 * but have been dropped as this one turned out to be "best"
4223 * during all our tests. */
4224
4225 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4226 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4227 return -EIO;
4228}
4229
4230void INFO_bm_xfer_stats(struct drbd_device *device,
4231 const char *direction, struct bm_xfer_ctx *c)
4232{
4233 /* what would it take to transfer it "plaintext" */
4234 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4235 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4236 unsigned int plain =
4237 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4238 c->bm_words * sizeof(unsigned long);
4239 unsigned int total = c->bytes[0] + c->bytes[1];
4240 unsigned int r;
4241
4242 /* total can not be zero. but just in case: */
4243 if (total == 0)
4244 return;
4245
4246 /* don't report if not compressed */
4247 if (total >= plain)
4248 return;
4249
4250 /* total < plain. check for overflow, still */
4251 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4252 : (1000 * total / plain);
4253
4254 if (r > 1000)
4255 r = 1000;
4256
4257 r = 1000 - r;
4258 drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4259 "total %u; compression: %u.%u%%\n",
4260 direction,
4261 c->bytes[1], c->packets[1],
4262 c->bytes[0], c->packets[0],
4263 total, r/10, r % 10);
4264}
4265
4266/* Since we are processing the bitfield from lower addresses to higher,
4267 it does not matter if the process it in 32 bit chunks or 64 bit
4268 chunks as long as it is little endian. (Understand it as byte stream,
4269 beginning with the lowest byte...) If we would use big endian
4270 we would need to process it from the highest address to the lowest,
4271 in order to be agnostic to the 32 vs 64 bits issue.
4272
4273 returns 0 on failure, 1 if we successfully received it. */
4274static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4275{
4276 struct drbd_peer_device *peer_device;
4277 struct drbd_device *device;
4278 struct bm_xfer_ctx c;
4279 int err;
4280
4281 peer_device = conn_peer_device(connection, pi->vnr);
4282 if (!peer_device)
4283 return -EIO;
4284 device = peer_device->device;
4285
4286 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4287 /* you are supposed to send additional out-of-sync information
4288 * if you actually set bits during this phase */
4289
4290 c = (struct bm_xfer_ctx) {
4291 .bm_bits = drbd_bm_bits(device),
4292 .bm_words = drbd_bm_words(device),
4293 };
4294
4295 for(;;) {
4296 if (pi->cmd == P_BITMAP)
4297 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4298 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4299 /* MAYBE: sanity check that we speak proto >= 90,
4300 * and the feature is enabled! */
4301 struct p_compressed_bm *p = pi->data;
4302
4303 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4304 drbd_err(device, "ReportCBitmap packet too large\n");
4305 err = -EIO;
4306 goto out;
4307 }
4308 if (pi->size <= sizeof(*p)) {
4309 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4310 err = -EIO;
4311 goto out;
4312 }
4313 err = drbd_recv_all(peer_device->connection, p, pi->size);
4314 if (err)
4315 goto out;
4316 err = decode_bitmap_c(peer_device, p, &c, pi->size);
4317 } else {
4318 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4319 err = -EIO;
4320 goto out;
4321 }
4322
4323 c.packets[pi->cmd == P_BITMAP]++;
4324 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4325
4326 if (err <= 0) {
4327 if (err < 0)
4328 goto out;
4329 break;
4330 }
4331 err = drbd_recv_header(peer_device->connection, pi);
4332 if (err)
4333 goto out;
4334 }
4335
4336 INFO_bm_xfer_stats(device, "receive", &c);
4337
4338 if (device->state.conn == C_WF_BITMAP_T) {
4339 enum drbd_state_rv rv;
4340
4341 err = drbd_send_bitmap(device);
4342 if (err)
4343 goto out;
4344 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4345 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4346 D_ASSERT(device, rv == SS_SUCCESS);
4347 } else if (device->state.conn != C_WF_BITMAP_S) {
4348 /* admin may have requested C_DISCONNECTING,
4349 * other threads may have noticed network errors */
4350 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4351 drbd_conn_str(device->state.conn));
4352 }
4353 err = 0;
4354
4355 out:
4356 drbd_bm_unlock(device);
4357 if (!err && device->state.conn == C_WF_BITMAP_S)
4358 drbd_start_resync(device, C_SYNC_SOURCE);
4359 return err;
4360}
4361
4362static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4363{
4364 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4365 pi->cmd, pi->size);
4366
4367 return ignore_remaining_packet(connection, pi);
4368}
4369
4370static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4371{
4372 /* Make sure we've acked all the TCP data associated
4373 * with the data requests being unplugged */
4374 drbd_tcp_quickack(connection->data.socket);
4375
4376 return 0;
4377}
4378
4379static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4380{
4381 struct drbd_peer_device *peer_device;
4382 struct drbd_device *device;
4383 struct p_block_desc *p = pi->data;
4384
4385 peer_device = conn_peer_device(connection, pi->vnr);
4386 if (!peer_device)
4387 return -EIO;
4388 device = peer_device->device;
4389
4390 switch (device->state.conn) {
4391 case C_WF_SYNC_UUID:
4392 case C_WF_BITMAP_T:
4393 case C_BEHIND:
4394 break;
4395 default:
4396 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4397 drbd_conn_str(device->state.conn));
4398 }
4399
4400 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4401
4402 return 0;
4403}
4404
4405struct data_cmd {
4406 int expect_payload;
4407 size_t pkt_size;
4408 int (*fn)(struct drbd_connection *, struct packet_info *);
4409};
4410
4411static struct data_cmd drbd_cmd_handler[] = {
4412 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4413 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4414 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4415 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4416 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4417 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4418 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4419 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4420 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4421 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4422 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4423 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4424 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4425 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4426 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4427 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4428 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4429 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4430 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4431 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4432 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4433 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4434 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4435 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4436};
4437
4438static void drbdd(struct drbd_connection *connection)
4439{
4440 struct packet_info pi;
4441 size_t shs; /* sub header size */
4442 int err;
4443
4444 while (get_t_state(&connection->receiver) == RUNNING) {
4445 struct data_cmd *cmd;
4446
4447 drbd_thread_current_set_cpu(&connection->receiver);
4448 if (drbd_recv_header(connection, &pi))
4449 goto err_out;
4450
4451 cmd = &drbd_cmd_handler[pi.cmd];
4452 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4453 drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4454 cmdname(pi.cmd), pi.cmd);
4455 goto err_out;
4456 }
4457
4458 shs = cmd->pkt_size;
4459 if (pi.size > shs && !cmd->expect_payload) {
4460 drbd_err(connection, "No payload expected %s l:%d\n",
4461 cmdname(pi.cmd), pi.size);
4462 goto err_out;
4463 }
4464
4465 if (shs) {
4466 err = drbd_recv_all_warn(connection, pi.data, shs);
4467 if (err)
4468 goto err_out;
4469 pi.size -= shs;
4470 }
4471
4472 err = cmd->fn(connection, &pi);
4473 if (err) {
4474 drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4475 cmdname(pi.cmd), err, pi.size);
4476 goto err_out;
4477 }
4478 }
4479 return;
4480
4481 err_out:
4482 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4483}
4484
4485void conn_flush_workqueue(struct drbd_connection *connection)
4486{
4487 struct drbd_wq_barrier barr;
4488
4489 barr.w.cb = w_prev_work_done;
4490 barr.w.connection = connection;
4491 init_completion(&barr.done);
4492 drbd_queue_work(&connection->sender_work, &barr.w);
4493 wait_for_completion(&barr.done);
4494}
4495
4496static void conn_disconnect(struct drbd_connection *connection)
4497{
4498 struct drbd_peer_device *peer_device;
4499 enum drbd_conns oc;
4500 int vnr;
4501
4502 if (connection->cstate == C_STANDALONE)
4503 return;
4504
4505 /* We are about to start the cleanup after connection loss.
4506 * Make sure drbd_make_request knows about that.
4507 * Usually we should be in some network failure state already,
4508 * but just in case we are not, we fix it up here.
4509 */
4510 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4511
4512 /* asender does not clean up anything. it must not interfere, either */
4513 drbd_thread_stop(&connection->asender);
4514 drbd_free_sock(connection);
4515
4516 rcu_read_lock();
4517 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4518 struct drbd_device *device = peer_device->device;
4519 kref_get(&device->kref);
4520 rcu_read_unlock();
4521 drbd_disconnected(peer_device);
4522 kref_put(&device->kref, drbd_destroy_device);
4523 rcu_read_lock();
4524 }
4525 rcu_read_unlock();
4526
4527 if (!list_empty(&connection->current_epoch->list))
4528 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4529 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4530 atomic_set(&connection->current_epoch->epoch_size, 0);
4531 connection->send.seen_any_write_yet = false;
4532
4533 drbd_info(connection, "Connection closed\n");
4534
4535 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4536 conn_try_outdate_peer_async(connection);
4537
4538 spin_lock_irq(&connection->resource->req_lock);
4539 oc = connection->cstate;
4540 if (oc >= C_UNCONNECTED)
4541 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4542
4543 spin_unlock_irq(&connection->resource->req_lock);
4544
4545 if (oc == C_DISCONNECTING)
4546 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4547}
4548
4549static int drbd_disconnected(struct drbd_peer_device *peer_device)
4550{
4551 struct drbd_device *device = peer_device->device;
4552 unsigned int i;
4553
4554 /* wait for current activity to cease. */
4555 spin_lock_irq(&device->resource->req_lock);
4556 _drbd_wait_ee_list_empty(device, &device->active_ee);
4557 _drbd_wait_ee_list_empty(device, &device->sync_ee);
4558 _drbd_wait_ee_list_empty(device, &device->read_ee);
4559 spin_unlock_irq(&device->resource->req_lock);
4560
4561 /* We do not have data structures that would allow us to
4562 * get the rs_pending_cnt down to 0 again.
4563 * * On C_SYNC_TARGET we do not have any data structures describing
4564 * the pending RSDataRequest's we have sent.
4565 * * On C_SYNC_SOURCE there is no data structure that tracks
4566 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4567 * And no, it is not the sum of the reference counts in the
4568 * resync_LRU. The resync_LRU tracks the whole operation including
4569 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4570 * on the fly. */
4571 drbd_rs_cancel_all(device);
4572 device->rs_total = 0;
4573 device->rs_failed = 0;
4574 atomic_set(&device->rs_pending_cnt, 0);
4575 wake_up(&device->misc_wait);
4576
4577 del_timer_sync(&device->resync_timer);
4578 resync_timer_fn((unsigned long)device);
4579
4580 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4581 * w_make_resync_request etc. which may still be on the worker queue
4582 * to be "canceled" */
4583 drbd_flush_workqueue(device);
4584
4585 drbd_finish_peer_reqs(device);
4586
4587 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4588 might have issued a work again. The one before drbd_finish_peer_reqs() is
4589 necessary to reclain net_ee in drbd_finish_peer_reqs(). */
4590 drbd_flush_workqueue(device);
4591
4592 /* need to do it again, drbd_finish_peer_reqs() may have populated it
4593 * again via drbd_try_clear_on_disk_bm(). */
4594 drbd_rs_cancel_all(device);
4595
4596 kfree(device->p_uuid);
4597 device->p_uuid = NULL;
4598
4599 if (!drbd_suspended(device))
4600 tl_clear(peer_device->connection);
4601
4602 drbd_md_sync(device);
4603
4604 /* serialize with bitmap writeout triggered by the state change,
4605 * if any. */
4606 wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4607
4608 /* tcp_close and release of sendpage pages can be deferred. I don't
4609 * want to use SO_LINGER, because apparently it can be deferred for
4610 * more than 20 seconds (longest time I checked).
4611 *
4612 * Actually we don't care for exactly when the network stack does its
4613 * put_page(), but release our reference on these pages right here.
4614 */
4615 i = drbd_free_peer_reqs(device, &device->net_ee);
4616 if (i)
4617 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4618 i = atomic_read(&device->pp_in_use_by_net);
4619 if (i)
4620 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4621 i = atomic_read(&device->pp_in_use);
4622 if (i)
4623 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4624
4625 D_ASSERT(device, list_empty(&device->read_ee));
4626 D_ASSERT(device, list_empty(&device->active_ee));
4627 D_ASSERT(device, list_empty(&device->sync_ee));
4628 D_ASSERT(device, list_empty(&device->done_ee));
4629
4630 return 0;
4631}
4632
4633/*
4634 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4635 * we can agree on is stored in agreed_pro_version.
4636 *
4637 * feature flags and the reserved array should be enough room for future
4638 * enhancements of the handshake protocol, and possible plugins...
4639 *
4640 * for now, they are expected to be zero, but ignored.
4641 */
4642static int drbd_send_features(struct drbd_connection *connection)
4643{
4644 struct drbd_socket *sock;
4645 struct p_connection_features *p;
4646
4647 sock = &connection->data;
4648 p = conn_prepare_command(connection, sock);
4649 if (!p)
4650 return -EIO;
4651 memset(p, 0, sizeof(*p));
4652 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4653 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4654 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4655}
4656
4657/*
4658 * return values:
4659 * 1 yes, we have a valid connection
4660 * 0 oops, did not work out, please try again
4661 * -1 peer talks different language,
4662 * no point in trying again, please go standalone.
4663 */
4664static int drbd_do_features(struct drbd_connection *connection)
4665{
4666 /* ASSERT current == connection->receiver ... */
4667 struct p_connection_features *p;
4668 const int expect = sizeof(struct p_connection_features);
4669 struct packet_info pi;
4670 int err;
4671
4672 err = drbd_send_features(connection);
4673 if (err)
4674 return 0;
4675
4676 err = drbd_recv_header(connection, &pi);
4677 if (err)
4678 return 0;
4679
4680 if (pi.cmd != P_CONNECTION_FEATURES) {
4681 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4682 cmdname(pi.cmd), pi.cmd);
4683 return -1;
4684 }
4685
4686 if (pi.size != expect) {
4687 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4688 expect, pi.size);
4689 return -1;
4690 }
4691
4692 p = pi.data;
4693 err = drbd_recv_all_warn(connection, p, expect);
4694 if (err)
4695 return 0;
4696
4697 p->protocol_min = be32_to_cpu(p->protocol_min);
4698 p->protocol_max = be32_to_cpu(p->protocol_max);
4699 if (p->protocol_max == 0)
4700 p->protocol_max = p->protocol_min;
4701
4702 if (PRO_VERSION_MAX < p->protocol_min ||
4703 PRO_VERSION_MIN > p->protocol_max)
4704 goto incompat;
4705
4706 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4707
4708 drbd_info(connection, "Handshake successful: "
4709 "Agreed network protocol version %d\n", connection->agreed_pro_version);
4710
4711 return 1;
4712
4713 incompat:
4714 drbd_err(connection, "incompatible DRBD dialects: "
4715 "I support %d-%d, peer supports %d-%d\n",
4716 PRO_VERSION_MIN, PRO_VERSION_MAX,
4717 p->protocol_min, p->protocol_max);
4718 return -1;
4719}
4720
4721#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * Variant used when the kernel has no CONFIG_CRYPTO_HMAC: authentication
 * cannot be performed, so log why and return -1.
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
	const int auth_unavailable = -1;

	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");

	return auth_unavailable;
}
4728#else
4729#define CHALLENGE_LEN 64
4730
4731/* Return value:
4732 1 - auth succeeded,
4733 0 - failed, try again (network error),
4734 -1 - auth failed, don't try again.
4735*/
4736
4737static int drbd_do_auth(struct drbd_connection *connection)
4738{
4739 struct drbd_socket *sock;
4740 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4741 struct scatterlist sg;
4742 char *response = NULL;
4743 char *right_response = NULL;
4744 char *peers_ch = NULL;
4745 unsigned int key_len;
4746 char secret[SHARED_SECRET_MAX]; /* 64 byte */
4747 unsigned int resp_size;
4748 struct hash_desc desc;
4749 struct packet_info pi;
4750 struct net_conf *nc;
4751 int err, rv;
4752
4753 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
4754
4755 rcu_read_lock();
4756 nc = rcu_dereference(connection->net_conf);
4757 key_len = strlen(nc->shared_secret);
4758 memcpy(secret, nc->shared_secret, key_len);
4759 rcu_read_unlock();
4760
4761 desc.tfm = connection->cram_hmac_tfm;
4762 desc.flags = 0;
4763
4764 rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4765 if (rv) {
4766 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4767 rv = -1;
4768 goto fail;
4769 }
4770
4771 get_random_bytes(my_challenge, CHALLENGE_LEN);
4772
4773 sock = &connection->data;
4774 if (!conn_prepare_command(connection, sock)) {
4775 rv = 0;
4776 goto fail;
4777 }
4778 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4779 my_challenge, CHALLENGE_LEN);
4780 if (!rv)
4781 goto fail;
4782
4783 err = drbd_recv_header(connection, &pi);
4784 if (err) {
4785 rv = 0;
4786 goto fail;
4787 }
4788
4789 if (pi.cmd != P_AUTH_CHALLENGE) {
4790 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4791 cmdname(pi.cmd), pi.cmd);
4792 rv = 0;
4793 goto fail;
4794 }
4795
4796 if (pi.size > CHALLENGE_LEN * 2) {
4797 drbd_err(connection, "expected AuthChallenge payload too big.\n");
4798 rv = -1;
4799 goto fail;
4800 }
4801
4802 peers_ch = kmalloc(pi.size, GFP_NOIO);
4803 if (peers_ch == NULL) {
4804 drbd_err(connection, "kmalloc of peers_ch failed\n");
4805 rv = -1;
4806 goto fail;
4807 }
4808
4809 err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4810 if (err) {
4811 rv = 0;
4812 goto fail;
4813 }
4814
4815 resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4816 response = kmalloc(resp_size, GFP_NOIO);
4817 if (response == NULL) {
4818 drbd_err(connection, "kmalloc of response failed\n");
4819 rv = -1;
4820 goto fail;
4821 }
4822
4823 sg_init_table(&sg, 1);
4824 sg_set_buf(&sg, peers_ch, pi.size);
4825
4826 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4827 if (rv) {
4828 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4829 rv = -1;
4830 goto fail;
4831 }
4832
4833 if (!conn_prepare_command(connection, sock)) {
4834 rv = 0;
4835 goto fail;
4836 }
4837 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
4838 response, resp_size);
4839 if (!rv)
4840 goto fail;
4841
4842 err = drbd_recv_header(connection, &pi);
4843 if (err) {
4844 rv = 0;
4845 goto fail;
4846 }
4847
4848 if (pi.cmd != P_AUTH_RESPONSE) {
4849 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
4850 cmdname(pi.cmd), pi.cmd);
4851 rv = 0;
4852 goto fail;
4853 }
4854
4855 if (pi.size != resp_size) {
4856 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
4857 rv = 0;
4858 goto fail;
4859 }
4860
4861 err = drbd_recv_all_warn(connection, response , resp_size);
4862 if (err) {
4863 rv = 0;
4864 goto fail;
4865 }
4866
4867 right_response = kmalloc(resp_size, GFP_NOIO);
4868 if (right_response == NULL) {
4869 drbd_err(connection, "kmalloc of right_response failed\n");
4870 rv = -1;
4871 goto fail;
4872 }
4873
4874 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4875
4876 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4877 if (rv) {
4878 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4879 rv = -1;
4880 goto fail;
4881 }
4882
4883 rv = !memcmp(response, right_response, resp_size);
4884
4885 if (rv)
4886 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
4887 resp_size);
4888 else
4889 rv = -1;
4890
4891 fail:
4892 kfree(peers_ch);
4893 kfree(response);
4894 kfree(right_response);
4895
4896 return rv;
4897}
4898#endif
4899
4900int drbd_receiver(struct drbd_thread *thi)
4901{
4902 struct drbd_connection *connection = thi->connection;
4903 int h;
4904
4905 drbd_info(connection, "receiver (re)started\n");
4906
4907 do {
4908 h = conn_connect(connection);
4909 if (h == 0) {
4910 conn_disconnect(connection);
4911 schedule_timeout_interruptible(HZ);
4912 }
4913 if (h == -1) {
4914 drbd_warn(connection, "Discarding network configuration.\n");
4915 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
4916 }
4917 } while (h == 0);
4918
4919 if (h > 0)
4920 drbdd(connection);
4921
4922 conn_disconnect(connection);
4923
4924 drbd_info(connection, "receiver terminated\n");
4925 return 0;
4926}
4927
4928/* ********* acknowledge sender ******** */
4929
4930static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4931{
4932 struct p_req_state_reply *p = pi->data;
4933 int retcode = be32_to_cpu(p->retcode);
4934
4935 if (retcode >= SS_SUCCESS) {
4936 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
4937 } else {
4938 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
4939 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
4940 drbd_set_st_err_str(retcode), retcode);
4941 }
4942 wake_up(&connection->ping_wait);
4943
4944 return 0;
4945}
4946
4947static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4948{
4949 struct drbd_peer_device *peer_device;
4950 struct drbd_device *device;
4951 struct p_req_state_reply *p = pi->data;
4952 int retcode = be32_to_cpu(p->retcode);
4953
4954 peer_device = conn_peer_device(connection, pi->vnr);
4955 if (!peer_device)
4956 return -EIO;
4957 device = peer_device->device;
4958
4959 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
4960 D_ASSERT(device, connection->agreed_pro_version < 100);
4961 return got_conn_RqSReply(connection, pi);
4962 }
4963
4964 if (retcode >= SS_SUCCESS) {
4965 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
4966 } else {
4967 set_bit(CL_ST_CHG_FAIL, &device->flags);
4968 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
4969 drbd_set_st_err_str(retcode), retcode);
4970 }
4971 wake_up(&device->state_wait);
4972
4973 return 0;
4974}
4975
/* Answer a ping with a ping ack; returns the result of sending it. */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	int err = drbd_send_ping_ack(connection);

	return err;
}
4981
4982static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
4983{
4984 /* restore idle timeout */
4985 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
4986 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
4987 wake_up(&connection->ping_wait);
4988
4989 return 0;
4990}
4991
4992static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
4993{
4994 struct drbd_peer_device *peer_device;
4995 struct drbd_device *device;
4996 struct p_block_ack *p = pi->data;
4997 sector_t sector = be64_to_cpu(p->sector);
4998 int blksize = be32_to_cpu(p->blksize);
4999
5000 peer_device = conn_peer_device(connection, pi->vnr);
5001 if (!peer_device)
5002 return -EIO;
5003 device = peer_device->device;
5004
5005 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5006
5007 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5008
5009 if (get_ldev(device)) {
5010 drbd_rs_complete_io(device, sector);
5011 drbd_set_in_sync(device, sector, blksize);
5012 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5013 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5014 put_ldev(device);
5015 }
5016 dec_rs_pending(device);
5017 atomic_add(blksize >> 9, &device->rs_sect_in);
5018
5019 return 0;
5020}
5021
5022static int
5023validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5024 struct rb_root *root, const char *func,
5025 enum drbd_req_event what, bool missing_ok)
5026{
5027 struct drbd_request *req;
5028 struct bio_and_error m;
5029
5030 spin_lock_irq(&device->resource->req_lock);
5031 req = find_request(device, root, id, sector, missing_ok, func);
5032 if (unlikely(!req)) {
5033 spin_unlock_irq(&device->resource->req_lock);
5034 return -EIO;
5035 }
5036 __req_mod(req, what, &m);
5037 spin_unlock_irq(&device->resource->req_lock);
5038
5039 if (m.bio)
5040 complete_master_bio(device, &m);
5041 return 0;
5042}
5043
5044static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5045{
5046 struct drbd_peer_device *peer_device;
5047 struct drbd_device *device;
5048 struct p_block_ack *p = pi->data;
5049 sector_t sector = be64_to_cpu(p->sector);
5050 int blksize = be32_to_cpu(p->blksize);
5051 enum drbd_req_event what;
5052
5053 peer_device = conn_peer_device(connection, pi->vnr);
5054 if (!peer_device)
5055 return -EIO;
5056 device = peer_device->device;
5057
5058 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5059
5060 if (p->block_id == ID_SYNCER) {
5061 drbd_set_in_sync(device, sector, blksize);
5062 dec_rs_pending(device);
5063 return 0;
5064 }
5065 switch (pi->cmd) {
5066 case P_RS_WRITE_ACK:
5067 what = WRITE_ACKED_BY_PEER_AND_SIS;
5068 break;
5069 case P_WRITE_ACK:
5070 what = WRITE_ACKED_BY_PEER;
5071 break;
5072 case P_RECV_ACK:
5073 what = RECV_ACKED_BY_PEER;
5074 break;
5075 case P_SUPERSEDED:
5076 what = CONFLICT_RESOLVED;
5077 break;
5078 case P_RETRY_WRITE:
5079 what = POSTPONE_WRITE;
5080 break;
5081 default:
5082 BUG();
5083 }
5084
5085 return validate_req_change_req_state(device, p->block_id, sector,
5086 &device->write_requests, __func__,
5087 what, false);
5088}
5089
5090static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5091{
5092 struct drbd_peer_device *peer_device;
5093 struct drbd_device *device;
5094 struct p_block_ack *p = pi->data;
5095 sector_t sector = be64_to_cpu(p->sector);
5096 int size = be32_to_cpu(p->blksize);
5097 int err;
5098
5099 peer_device = conn_peer_device(connection, pi->vnr);
5100 if (!peer_device)
5101 return -EIO;
5102 device = peer_device->device;
5103
5104 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5105
5106 if (p->block_id == ID_SYNCER) {
5107 dec_rs_pending(device);
5108 drbd_rs_failed_io(device, sector, size);
5109 return 0;
5110 }
5111
5112 err = validate_req_change_req_state(device, p->block_id, sector,
5113 &device->write_requests, __func__,
5114 NEG_ACKED, true);
5115 if (err) {
5116 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5117 The master bio might already be completed, therefore the
5118 request is no longer in the collision hash. */
5119 /* In Protocol B we might already have got a P_RECV_ACK
5120 but then get a P_NEG_ACK afterwards. */
5121 drbd_set_out_of_sync(device, sector, size);
5122 }
5123 return 0;
5124}
5125
5126static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5127{
5128 struct drbd_peer_device *peer_device;
5129 struct drbd_device *device;
5130 struct p_block_ack *p = pi->data;
5131 sector_t sector = be64_to_cpu(p->sector);
5132
5133 peer_device = conn_peer_device(connection, pi->vnr);
5134 if (!peer_device)
5135 return -EIO;
5136 device = peer_device->device;
5137
5138 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5139
5140 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5141 (unsigned long long)sector, be32_to_cpu(p->blksize));
5142
5143 return validate_req_change_req_state(device, p->block_id, sector,
5144 &device->read_requests, __func__,
5145 NEG_ACKED, false);
5146}
5147
5148static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5149{
5150 struct drbd_peer_device *peer_device;
5151 struct drbd_device *device;
5152 sector_t sector;
5153 int size;
5154 struct p_block_ack *p = pi->data;
5155
5156 peer_device = conn_peer_device(connection, pi->vnr);
5157 if (!peer_device)
5158 return -EIO;
5159 device = peer_device->device;
5160
5161 sector = be64_to_cpu(p->sector);
5162 size = be32_to_cpu(p->blksize);
5163
5164 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5165
5166 dec_rs_pending(device);
5167
5168 if (get_ldev_if_state(device, D_FAILED)) {
5169 drbd_rs_complete_io(device, sector);
5170 switch (pi->cmd) {
5171 case P_NEG_RS_DREPLY:
5172 drbd_rs_failed_io(device, sector, size);
5173 case P_RS_CANCEL:
5174 break;
5175 default:
5176 BUG();
5177 }
5178 put_ldev(device);
5179 }
5180
5181 return 0;
5182}
5183
5184static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5185{
5186 struct p_barrier_ack *p = pi->data;
5187 struct drbd_peer_device *peer_device;
5188 int vnr;
5189
5190 tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5191
5192 rcu_read_lock();
5193 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5194 struct drbd_device *device = peer_device->device;
5195
5196 if (device->state.conn == C_AHEAD &&
5197 atomic_read(&device->ap_in_flight) == 0 &&
5198 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5199 device->start_resync_timer.expires = jiffies + HZ;
5200 add_timer(&device->start_resync_timer);
5201 }
5202 }
5203 rcu_read_unlock();
5204
5205 return 0;
5206}
5207
5208static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5209{
5210 struct drbd_peer_device *peer_device;
5211 struct drbd_device *device;
5212 struct p_block_ack *p = pi->data;
5213 struct drbd_work *w;
5214 sector_t sector;
5215 int size;
5216
5217 peer_device = conn_peer_device(connection, pi->vnr);
5218 if (!peer_device)
5219 return -EIO;
5220 device = peer_device->device;
5221
5222 sector = be64_to_cpu(p->sector);
5223 size = be32_to_cpu(p->blksize);
5224
5225 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5226
5227 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5228 drbd_ov_out_of_sync_found(device, sector, size);
5229 else
5230 ov_out_of_sync_print(device);
5231
5232 if (!get_ldev(device))
5233 return 0;
5234
5235 drbd_rs_complete_io(device, sector);
5236 dec_rs_pending(device);
5237
5238 --device->ov_left;
5239
5240 /* let's advance progress step marks only for every other megabyte */
5241 if ((device->ov_left & 0x200) == 0x200)
5242 drbd_advance_rs_marks(device, device->ov_left);
5243
5244 if (device->ov_left == 0) {
5245 w = kmalloc(sizeof(*w), GFP_NOIO);
5246 if (w) {
5247 w->cb = w_ov_finished;
5248 w->device = device;
5249 drbd_queue_work(&peer_device->connection->sender_work, w);
5250 } else {
5251 drbd_err(device, "kmalloc(w) failed.");
5252 ov_out_of_sync_print(device);
5253 drbd_resync_finished(device);
5254 }
5255 }
5256 put_ldev(device);
5257 return 0;
5258}
5259
/* Handler for meta packets that are received but deliberately ignored. */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	/* nothing to do; report success */
	return 0;
}
5264
5265static int connection_finish_peer_reqs(struct drbd_connection *connection)
5266{
5267 struct drbd_peer_device *peer_device;
5268 int vnr, not_empty = 0;
5269
5270 do {
5271 clear_bit(SIGNAL_ASENDER, &connection->flags);
5272 flush_signals(current);
5273
5274 rcu_read_lock();
5275 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5276 struct drbd_device *device = peer_device->device;
5277 kref_get(&device->kref);
5278 rcu_read_unlock();
5279 if (drbd_finish_peer_reqs(device)) {
5280 kref_put(&device->kref, drbd_destroy_device);
5281 return 1;
5282 }
5283 kref_put(&device->kref, drbd_destroy_device);
5284 rcu_read_lock();
5285 }
5286 set_bit(SIGNAL_ASENDER, &connection->flags);
5287
5288 spin_lock_irq(&connection->resource->req_lock);
5289 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5290 struct drbd_device *device = peer_device->device;
5291 not_empty = !list_empty(&device->done_ee);
5292 if (not_empty)
5293 break;
5294 }
5295 spin_unlock_irq(&connection->resource->req_lock);
5296 rcu_read_unlock();
5297 } while (not_empty);
5298
5299 return 0;
5300}
5301
/* One entry of the asender dispatch table, indexed by packet type. */
struct asender_cmd {
	/* payload size expected after the header, in bytes */
	size_t pkt_size;
	/* handler for the packet; a non-zero return makes drbd_asender() bail out */
	int (*fn)(struct drbd_connection *connection, struct packet_info *pi);
};
5306
5307static struct asender_cmd asender_tbl[] = {
5308 [P_PING] = { 0, got_Ping },
5309 [P_PING_ACK] = { 0, got_PingAck },
5310 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5311 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5312 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5313 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
5314 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5315 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5316 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5317 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5318 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5319 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5320 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5321 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5322 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5323 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5324 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
5325};
5326
5327int drbd_asender(struct drbd_thread *thi)
5328{
5329 struct drbd_connection *connection = thi->connection;
5330 struct asender_cmd *cmd = NULL;
5331 struct packet_info pi;
5332 int rv;
5333 void *buf = connection->meta.rbuf;
5334 int received = 0;
5335 unsigned int header_size = drbd_header_size(connection);
5336 int expect = header_size;
5337 bool ping_timeout_active = false;
5338 struct net_conf *nc;
5339 int ping_timeo, tcp_cork, ping_int;
5340 struct sched_param param = { .sched_priority = 2 };
5341
5342 rv = sched_setscheduler(current, SCHED_RR, &param);
5343 if (rv < 0)
5344 drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5345
5346 while (get_t_state(thi) == RUNNING) {
5347 drbd_thread_current_set_cpu(thi);
5348
5349 rcu_read_lock();
5350 nc = rcu_dereference(connection->net_conf);
5351 ping_timeo = nc->ping_timeo;
5352 tcp_cork = nc->tcp_cork;
5353 ping_int = nc->ping_int;
5354 rcu_read_unlock();
5355
5356 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5357 if (drbd_send_ping(connection)) {
5358 drbd_err(connection, "drbd_send_ping has failed\n");
5359 goto reconnect;
5360 }
5361 connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5362 ping_timeout_active = true;
5363 }
5364
5365 /* TODO: conditionally cork; it may hurt latency if we cork without
5366 much to send */
5367 if (tcp_cork)
5368 drbd_tcp_cork(connection->meta.socket);
5369 if (connection_finish_peer_reqs(connection)) {
5370 drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5371 goto reconnect;
5372 }
5373 /* but unconditionally uncork unless disabled */
5374 if (tcp_cork)
5375 drbd_tcp_uncork(connection->meta.socket);
5376
5377 /* short circuit, recv_msg would return EINTR anyways. */
5378 if (signal_pending(current))
5379 continue;
5380
5381 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5382 clear_bit(SIGNAL_ASENDER, &connection->flags);
5383
5384 flush_signals(current);
5385
5386 /* Note:
5387 * -EINTR (on meta) we got a signal
5388 * -EAGAIN (on meta) rcvtimeo expired
5389 * -ECONNRESET other side closed the connection
5390 * -ERESTARTSYS (on data) we got a signal
5391 * rv < 0 other than above: unexpected error!
5392 * rv == expected: full header or command
5393 * rv < expected: "woken" by signal during receive
5394 * rv == 0 : "connection shut down by peer"
5395 */
5396 if (likely(rv > 0)) {
5397 received += rv;
5398 buf += rv;
5399 } else if (rv == 0) {
5400 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5401 long t;
5402 rcu_read_lock();
5403 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5404 rcu_read_unlock();
5405
5406 t = wait_event_timeout(connection->ping_wait,
5407 connection->cstate < C_WF_REPORT_PARAMS,
5408 t);
5409 if (t)
5410 break;
5411 }
5412 drbd_err(connection, "meta connection shut down by peer.\n");
5413 goto reconnect;
5414 } else if (rv == -EAGAIN) {
5415 /* If the data socket received something meanwhile,
5416 * that is good enough: peer is still alive. */
5417 if (time_after(connection->last_received,
5418 jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5419 continue;
5420 if (ping_timeout_active) {
5421 drbd_err(connection, "PingAck did not arrive in time.\n");
5422 goto reconnect;
5423 }
5424 set_bit(SEND_PING, &connection->flags);
5425 continue;
5426 } else if (rv == -EINTR) {
5427 continue;
5428 } else {
5429 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5430 goto reconnect;
5431 }
5432
5433 if (received == expect && cmd == NULL) {
5434 if (decode_header(connection, connection->meta.rbuf, &pi))
5435 goto reconnect;
5436 cmd = &asender_tbl[pi.cmd];
5437 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5438 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5439 cmdname(pi.cmd), pi.cmd);
5440 goto disconnect;
5441 }
5442 expect = header_size + cmd->pkt_size;
5443 if (pi.size != expect - header_size) {
5444 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5445 pi.cmd, pi.size);
5446 goto reconnect;
5447 }
5448 }
5449 if (received == expect) {
5450 bool err;
5451
5452 err = cmd->fn(connection, &pi);
5453 if (err) {
5454 drbd_err(connection, "%pf failed\n", cmd->fn);
5455 goto reconnect;
5456 }
5457
5458 connection->last_received = jiffies;
5459
5460 if (cmd == &asender_tbl[P_PING_ACK]) {
5461 /* restore idle timeout */
5462 connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5463 ping_timeout_active = false;
5464 }
5465
5466 buf = connection->meta.rbuf;
5467 received = 0;
5468 expect = header_size;
5469 cmd = NULL;
5470 }
5471 }
5472
5473 if (0) {
5474reconnect:
5475 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5476 conn_md_sync(connection);
5477 }
5478 if (0) {
5479disconnect:
5480 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5481 }
5482 clear_bit(SIGNAL_ASENDER, &connection->flags);
5483
5484 drbd_info(connection, "asender terminated\n");
5485
5486 return 0;
5487}
This page took 0.043949 seconds and 5 git commands to generate.