/*
   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"
#include "drbd_vli.h"
static int drbd_do_handshake(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(int vnr, void *p, void *data);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */
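/* Illustrative sketch (added, not from the original source): with
 * page->private as the "next" pointer, a chain of three pages looks like
 *
 *	head -> [A] -> [B] -> [C] -> NULL
 *
 * where each arrow is (struct page *)page_private(page).  A hypothetical
 * walk over such a chain, assuming page_chain_next() simply reads
 * page_private():
 *
 *	struct page *p;
 *	for (p = head; p; p = page_chain_next(p))
 *		;	// visit p
 */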
/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}
/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}
static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}
static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}
static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_pp_alloc will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}
static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first not finished we can
	   stop to examine the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_ee_has_active_page(peer_req))
			break;
		list_move(le, to_be_freed);
	}
}
static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);
}
/**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
{
	struct page *page = NULL;
	DEFINE_WAIT(wait);

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
		page = drbd_pp_first_pages_or_try_alloc(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
			break;
		}

		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &mdev->pp_in_use);
	return page;
}
/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * Is also used from inside an other spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
	int i;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}
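/* Usage note (added, hedged): drbd_pp_alloc() and drbd_pp_free() are meant
 * to be used in pairs.  A caller that needs N pages for an incoming data
 * packet would do something like
 *
 *	page = drbd_pp_alloc(mdev, nr_pages, true);	// may sleep
 *	// ... fill the pages from the socket ...
 *	drbd_pp_free(mdev, page, 0);			// when done
 *
 * pp_in_use is incremented in drbd_pp_alloc() and decremented in
 * drbd_pp_free(), so every allocation must eventually be returned through
 * drbd_pp_free(), directly or via the EE free paths below. */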
/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_process_done_ee()
 drbd_wait_ee_list_empty()
*/
struct drbd_peer_request *
drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
	      unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_peer_request *peer_req;
	struct page *page;
	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
		return NULL;
	}

	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
	if (!page)
		goto fail;

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.mdev = mdev;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	peer_req->flags = 0;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}
void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
		       int is_net)
{
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_pp_free(mdev, peer_req->pages, is_net);
	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);
}
int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &mdev->net_ee;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		drbd_free_some_ee(mdev, peer_req, is_net);
		count++;
	}
	return count;
}
/* See also comments in _req_mod(,BARRIER_ACKED)
 * and receive_Barrier.
 *
 * Move entries from net_ee to done_ee, if ready.
 * Grab done_ee, call all callbacks, free the entries.
 * The callbacks typically send out ACKs.
 */
static int drbd_process_done_ee(struct drbd_conf *mdev)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		/* list_del not necessary, next/prev members not touched */
		ok = peer_req->w.cb(&peer_req->w, !ok) && ok;
		drbd_free_ee(mdev, peer_req);
	}
	wake_up(&mdev->ee_wait);

	return ok;
}
void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->tconn->req_lock);
		io_schedule();
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->tconn->req_lock);
	}
}
void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->tconn->req_lock);
}
/* see also kernel_accept; which is only present since 2.6.18.
 * also we want to log which part of it failed, exactly */
static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
{
	struct sock *sk = sock->sk;
	int err = 0;

	*what = "listen";
	err = sock->ops->listen(sock, 5);
	if (err < 0)
		goto out;

	*what = "sock_create_lite";
	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
			       newsock);
	if (err < 0)
		goto out;

	*what = "accept";
	err = sock->ops->accept(sock, *newsock, 0);
	if (err < 0) {
		sock_release(*newsock);
		*newsock = NULL;
		goto out;
	}
	(*newsock)->ops  = sock->ops;

out:
	return err;
}
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);
	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
	set_fs(oldfs);

	return rv;
}
static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);

	for (;;) {
		rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
		if (rv == size)
			break;

		/* Note:
		 * ECONNRESET	other side closed the connection
		 * ERESTARTSYS	(on  sock) we got a signal
		 */
		if (rv < 0) {
			if (rv == -ECONNRESET)
				conn_info(tconn, "sock was reset by peer\n");
			else if (rv != -ERESTARTSYS)
				conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			break;
		} else if (rv == 0) {
			conn_info(tconn, "sock was shut down by peer\n");
			break;
		} else	{
			/* signal came in, or peer/link went down,
			 * after we read a partial message
			 */
			/* D_ASSERT(signal_pending(current)); */
			break;
		}
	}

	set_fs(oldfs);

	if (rv != size)
		conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);

	return rv;
}
/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}
static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	int err;
	int disconnect_on_error = 1;

	if (!get_net_conf(tconn))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo =  tconn->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	*  for the outgoing connections.
	*  This is needed for multihomed hosts and to be
	*  able to use lo: interfaces for drbd.
	* Make sure to use 0 as port number, so linux selects
	*  a free one dynamically.
	*/
	memcpy(&src_in6, tconn->net_conf->my_addr,
	       min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      tconn->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock,
				 (struct sockaddr *)tconn->net_conf->peer_addr,
				 tconn->net_conf->peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			conn_err(tconn, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	put_net_conf(tconn);
	return sock;
}
static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
{
	int timeo, err;
	struct socket *s_estab = NULL, *s_listen;
	const char *what;

	if (!get_net_conf(tconn))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	timeo = tconn->net_conf->try_connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
	s_listen->sk->sk_rcvtimeo = timeo;
	s_listen->sk->sk_sndtimeo = timeo;
	drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen,
			      (struct sockaddr *) tconn->net_conf->my_addr,
			      tconn->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	err = drbd_accept(&what, s_listen, &s_estab);

out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			conn_err(tconn, "%s failed, err = %d\n", what, err);
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}
	put_net_conf(tconn);

	return s_estab;
}
static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
{
	struct p_header *h = &tconn->data.sbuf.header;

	return _conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
}
static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
{
	struct p_header80 *h = &tconn->data.rbuf.header.h80;
	int rr;

	rr = drbd_recv_short(sock, h, sizeof(*h), 0);

	if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
		return be16_to_cpu(h->command);

	return 0xffff;
}
/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}
static int drbd_connected(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	int ok = 1;

	atomic_set(&mdev->packet_seq, 0);
	mdev->peer_seq = 0;

	mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
		&mdev->tconn->cstate_mutex :
		&mdev->own_state_mutex;

	ok &= drbd_send_sync_param(mdev, &mdev->sync_conf);
	ok &= drbd_send_sizes(mdev, 0, 0);
	ok &= drbd_send_uuids(mdev);
	ok &= drbd_send_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);

	return !ok;
}
/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_tconn *tconn)
{
	struct socket *s, *sock, *msock;
	int try, h, ok;

	if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	clear_bit(DISCARD_CONCURRENT, &tconn->flags);
	tconn->agreed_pro_version = 99;
	/* agreed_pro_version must be smaller than 100 so we send the old
	   header (h80) in the first packet and in the handshake packet. */

	sock  = NULL;
	msock = NULL;

	do {
		for (try = 0;;) {
			/* 3 tries, this should take less than a second! */
			s = drbd_try_connect(tconn);
			if (s || ++try >= 3)
				break;
			/* give the other side time to call bind() & listen() */
			schedule_timeout_interruptible(HZ / 10);
		}

		if (s) {
			if (!sock) {
				drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
				sock = s;
				s = NULL;
			} else if (!msock) {
				drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
				msock = s;
				s = NULL;
			} else {
				conn_err(tconn, "Logic error in drbd_connect()\n");
				goto out_release_sockets;
			}
		}

		if (sock && msock) {
			schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
			ok = drbd_socket_okay(&sock);
			ok = drbd_socket_okay(&msock) && ok;
			if (ok)
				break;
		}

retry:
		s = drbd_wait_for_connect(tconn);
		if (s) {
			try = drbd_recv_fp(tconn, s);
			drbd_socket_okay(&sock);
			drbd_socket_okay(&msock);
			switch (try) {
			case P_HAND_SHAKE_S:
				if (sock) {
					conn_warn(tconn, "initial packet S crossed\n");
					sock_release(sock);
				}
				sock = s;
				break;
			case P_HAND_SHAKE_M:
				if (msock) {
					conn_warn(tconn, "initial packet M crossed\n");
					sock_release(msock);
				}
				msock = s;
				set_bit(DISCARD_CONCURRENT, &tconn->flags);
				break;
			default:
				conn_warn(tconn, "Error receiving initial packet\n");
				sock_release(s);
				if (random32() & 1)
					goto retry;
			}
		}

		if (tconn->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&tconn->receiver) == EXITING)
				goto out_release_sockets;
		}

		if (sock && msock) {
			ok = drbd_socket_okay(&sock);
			ok = drbd_socket_okay(&msock) && ok;
			if (ok)
				break;
		}
	} while (1);

	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

	sock->sk->sk_allocation = GFP_NOIO;
	msock->sk->sk_allocation = GFP_NOIO;

	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_HAND_SHAKE timeout,
	 * which we set to 4x the configured ping_timeout. */
	sock->sk->sk_sndtimeo =
	sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;

	msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock);
	drbd_tcp_nodelay(msock);

	tconn->data.socket = sock;
	tconn->meta.socket = msock;
	tconn->last_received = jiffies;

	h = drbd_do_handshake(tconn);
	if (h <= 0)
		return h;

	if (tconn->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(tconn)) {
		case -1:
			conn_err(tconn, "Authentication of peer failed\n");
			return -1;
		case 0:
			conn_err(tconn, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
		return 0;

	sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	drbd_thread_start(&tconn->asender);

	if (drbd_send_protocol(tconn) == -1)
		return -1;

	return !idr_for_each(&tconn->volumes, drbd_connected, tconn);

out_release_sockets:
	if (sock)
		sock_release(sock);
	if (msock)
		sock_release(msock);
	return -1;
}
static bool decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
{
	if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
		pi->cmd = be16_to_cpu(h->h80.command);
		pi->size = be16_to_cpu(h->h80.length);
	} else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
		pi->cmd = be16_to_cpu(h->h95.command);
		pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
	} else {
		conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
		    be32_to_cpu(h->h80.magic),
		    be16_to_cpu(h->h80.command),
		    be16_to_cpu(h->h80.length));
		return false;
	}
	return true;
}
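/* For orientation (added, hedged): as far as this code shows, the two
 * header variants decoded above are laid out roughly as
 *
 *	h80: u32 magic (DRBD_MAGIC),     u16 command, u16 length
 *	h95: u16 magic (DRBD_MAGIC_BIG), u16 command, u32 length
 *
 * which is why an h95 length is masked with 0x00ffffff: only the low
 * 24 bits appear to carry the payload size. */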
static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct p_header *h = &tconn->data.rbuf.header;
	int r;

	r = drbd_recv(tconn, h, sizeof(*h));
	if (unlikely(r != sizeof(*h))) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read expecting header on sock: r=%d\n", r);
		return false;
	}

	r = decode_header(tconn, h, pi);
	tconn->last_received = jiffies;

	return r;
}
static void drbd_flush(struct drbd_conf *mdev)
{
	int rv;

	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
					NULL);
		if (rv) {
			dev_err(DEV, "local disk flush failed with status %d\n", rv);
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			drbd_bump_write_ordering(mdev, WO_drain_io);
		}
		put_ldev(mdev);
	}
}
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do */
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			}
			dec_unacked(mdev);

			if (mdev->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				mdev->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
				wake_up(&mdev->ee_wait);
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&mdev->epoch_lock);

	return rv;
}
/**
 * drbd_bump_write_ordering() - Fall back to an other write ordering method
 * @mdev:	DRBD device.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = mdev->write_ordering;
	wo = min(pwo, wo);
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
		wo = WO_drain_io;
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
		wo = WO_none;
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}
/**
 * drbd_submit_peer_request()
 * @mdev:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_conf *mdev,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	/* In most cases, we will only need one bio. But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio. */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(page == NULL);
	D_ASSERT(ds == 0);

	atomic_set(&peer_req->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(mdev, fault_type, bio);
	} while (bios);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}
static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&mdev->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete.  */
	if (i->waiting)
		wake_up(&mdev->misc_wait);
}
static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
			   unsigned int data_size)
{
	int rv;
	struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
	struct drbd_epoch *epoch;

	inc_unacked(mdev);

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return true;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		drbd_flush(mdev);

		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		epoch = mdev->current_epoch;
		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

		D_ASSERT(atomic_read(&epoch->active) == 0);
		D_ASSERT(epoch->flags == 0);

		return true;
	default:
		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
		return false;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
		mdev->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&mdev->epoch_lock);

	return true;
}
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
{
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, rr;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;
	unsigned long *data;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev->tconn, dig_in, dgs);
		if (rr != dgs) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data digest: read %d expected %d\n",
					rr, dgs);
			return NULL;
		}
	}

	data_size -= dgs;

	if (!expect(data_size != 0))
		return NULL;
	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust out peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		rr = drbd_recv(mdev->tconn, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (rr != len) {
			drbd_free_ee(mdev, peer_req);
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data: read %d expected %d\n",
					rr, len);
			return NULL;
		}
		ds -= rr;
	}

	if (dgs) {
		drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_bcast_ee(mdev, "digest failed",
					dgs, dig_in, dig_vv, peer_req);
			drbd_free_ee(mdev, peer_req);
			return NULL;
		}
	}
	mdev->recv_cnt += data_size>>9;
	return peer_req;
}
/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
{
	struct page *page;
	int rr, rv = 1;
	void *data;

	if (!data_size)
		return true;

	page = drbd_pp_alloc(mdev, 1, 1);

	data = kmap(page);
	while (data_size) {
		rr = drbd_recv(mdev->tconn, data, min_t(int, data_size, PAGE_SIZE));
		if (rr != min_t(int, data_size, PAGE_SIZE)) {
			rv = 0;
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data: read %d expected %d\n",
					rr, min_t(int, data_size, PAGE_SIZE));
			break;
		}
		data_size -= rr;
	}
	kunmap(page);
	drbd_pp_free(mdev, page, 0);
	return rv;
}
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec *bvec;
	struct bio *bio;
	int dgs, rr, i, expect;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev->tconn, dig_in, dgs);
		if (rr != dgs) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data reply digest: read %d expected %d\n",
					rr, dgs);
			return 0;
		}
	}

	data_size -= dgs;

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		expect = min_t(int, data_size, bvec->bv_len);
		rr = drbd_recv(mdev->tconn,
			     kmap(bvec->bv_page)+bvec->bv_offset,
			     expect);
		kunmap(bvec->bv_page);
		if (rr != expect) {
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data reply: "
					"read %d expected %d\n",
					rr, expect);
			return 0;
		}
		data_size -= rr;
	}

	if (dgs) {
		drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
			return 0;
		}
	}

	D_ASSERT(data_size == 0);
	return 1;
}
/* e_end_resync_block() is called via
 * drbd_process_done_ee() by asender only */
static int e_end_resync_block(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;
	int ok;

	D_ASSERT(drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, peer_req->i.size);
		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, peer_req->i.size);

		ok  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
	}
	dec_unacked(mdev);

	return ok;
}
static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
{
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
	if (!peer_req)
		goto fail;

	dec_rs_pending(mdev);

	inc_unacked(mdev);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_ee(mdev, peer_req);
fail:
	put_ldev(mdev);
	return false;
}
static struct drbd_request *
find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
{
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;
	if (!missing_ok) {
		dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);
	}
	return NULL;
}
static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int data_size)
{
	struct drbd_request *req;
	sector_t sector;
	int ok;
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (unlikely(!req))
		return false;

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	ok = recv_dless_read(mdev, req, sector, data_size);

	if (ok)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return ok;
}
static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int data_size)
{
	sector_t sector;
	int ok;
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		ok = recv_resync_read(mdev, sector, data_size);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		ok = drbd_drain_block(mdev, data_size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
	}

	atomic_add(data_size >> 9, &mdev->rs_sect_in);

	return ok;
}
/* e_end_block() is called via drbd_process_done_ee().
 * this means this function only runs in the asender thread
 */
static int e_end_block(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;
	int ok = 1, pcmd;

	if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
				mdev->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			ok &= drbd_send_ack(mdev, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(mdev, sector, peer_req->i.size);
		} else {
			ok  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this?  */
		}
		dec_unacked(mdev);
	}
	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
	if (mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
		D_ASSERT(!drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(mdev, peer_req);
		spin_unlock_irq(&mdev->tconn->req_lock);
	} else
		D_ASSERT(drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

	return ok;
}
static int e_send_discard_ack(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	struct drbd_conf *mdev = w->mdev;
	int ok = 1;

	D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
	ok = drbd_send_ack(mdev, P_DISCARD_ACK, peer_req);

	spin_lock_irq(&mdev->tconn->req_lock);
	D_ASSERT(!drbd_interval_empty(&peer_req->i));
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);

	dec_unacked(mdev);

	return ok;
}
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 */
	return (s32)a - (s32)b > 0;
}

static u32 seq_max(u32 a, u32 b)
{
	return seq_greater(a, b) ? a : b;
}
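/* Worked example (added): the signed difference makes the comparison
 * wrap-safe.  Take a = 2 (the sequence counter just wrapped) and
 * b = 0xfffffffe (two before the wrap):
 *
 *	(s32)a - (s32)b  ==  2 - (-2)  ==  4  >  0
 *
 * so seq_greater(2, 0xfffffffe) is true, even though a < b numerically.
 * This holds as long as the two sequence numbers are less than 2^31
 * apart. */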
static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
{
	unsigned int old_peer_seq;

	spin_lock(&mdev->peer_seq_lock);
	old_peer_seq = mdev->peer_seq;
	mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
	spin_unlock(&mdev->peer_seq_lock);
	if (old_peer_seq != peer_seq)
		wake_up(&mdev->seq_wait);
}
/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than mdev->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update mdev->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
{
	DEFINE_WAIT(wait);
	unsigned int p_seq;
	long timeout;
	int ret = 0;

	spin_lock(&mdev->peer_seq_lock);
	for (;;) {
		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
		if (!seq_greater(packet_seq, mdev->peer_seq + 1))
			break;
		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}
		p_seq = mdev->peer_seq;
		spin_unlock(&mdev->peer_seq_lock);
		timeout = schedule_timeout(30*HZ);
		spin_lock(&mdev->peer_seq_lock);
		if (timeout == 0 && p_seq == mdev->peer_seq) {
			ret = -ETIMEDOUT;
			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
			break;
		}
	}
	finish_wait(&mdev->seq_wait, &wait);
	if (mdev->peer_seq+1 == packet_seq)
		mdev->peer_seq++;
	spin_unlock(&mdev->peer_seq_lock);
	return ret;
}
/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
{
	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
}
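/* Example (added): a peer that set DP_FUA and DP_FLUSH on a P_DATA packet
 * yields wire_flags_to_bio(mdev, DP_FUA | DP_FLUSH) == REQ_FUA | REQ_FLUSH,
 * so the local submission preserves the write-ordering semantics the peer
 * requested for its own bio. */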
/* mirrored write */
static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
			unsigned int data_size)
{
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = &mdev->tconn->data.rbuf.data;
	int rw = WRITE;
	u32 dp_flags;

	if (!get_ldev(mdev)) {
		spin_lock(&mdev->peer_seq_lock);
		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
			mdev->peer_seq++;
		spin_unlock(&mdev->peer_seq_lock);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
		atomic_inc(&mdev->current_epoch->epoch_size);
		return drbd_drain_block(mdev, data_size);
	}

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(mdev, p->block_id, sector, data_size);
	if (!peer_req) {
		put_ldev(mdev);
		return false;
	}

	peer_req->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(mdev, dp_flags);

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	spin_lock(&mdev->epoch_lock);
	peer_req->epoch = mdev->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&mdev->epoch_lock);

	/* I'm the receiver, I do hold a net_cnt reference. */
	if (!mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
	} else {
		/* don't get the req_lock yet,
		 * we may sleep in drbd_wait_peer_seq */
		const int size = peer_req->i.size;
		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
		DEFINE_WAIT(wait);
		int first;

		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);

		/* conflict detection and handling:
		 * 1. wait on the sequence number,
		 *    in case this data packet overtook ACK packets.
		 * 2. check for conflicting write requests.
		 *
		 * Note: for two_primaries, we are protocol C,
		 * so there cannot be any request that is DONE
		 * but still on the transfer log.
		 *
		 * if no conflicting request is found:
		 *    submit.
		 *
		 * if any conflicting request is found
		 * that has not yet been acked,
		 * AND I have the "discard concurrent writes" flag:
		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
		 *
		 * if any conflicting request is found:
		 *	 block the receiver, waiting on misc_wait
		 *	 until no more conflicting requests are there,
		 *	 or we get interrupted (disconnect).
		 *
		 *	 we do not just write after local io completion of those
		 *	 requests, but only after req is done completely, i.e.
		 *	 we wait for the P_DISCARD_ACK to arrive!
		 *
		 *	 then proceed normally, i.e. submit.
		 */
		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
			goto out_interrupted;

		spin_lock_irq(&mdev->tconn->req_lock);

		first = 1;
		for (;;) {
			struct drbd_interval *i;
			int have_unacked = 0;
			int have_conflict = 0;
			prepare_to_wait(&mdev->misc_wait, &wait,
				TASK_INTERRUPTIBLE);

			i = drbd_find_overlap(&mdev->write_requests, sector, size);
			if (i) {
				/* only ALERT on first iteration,
				 * we may be woken up early... */
				if (first)
					dev_alert(DEV, "%s[%u] Concurrent %s write detected!"
					      "	new: %llus +%u; pending: %llus +%u\n",
					      current->comm, current->pid,
					      i->local ? "local" : "remote",
					      (unsigned long long)sector, size,
					      (unsigned long long)i->sector, i->size);

				if (i->local) {
					struct drbd_request *req2;

					req2 = container_of(i, struct drbd_request, i);
					if (req2->rq_state & RQ_NET_PENDING)
						++have_unacked;
				}
				++have_conflict;
			}
			if (!have_conflict)
				break;

			/* Discard Ack only for the _first_ iteration */
			if (first && discard && have_unacked) {
				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
				     (unsigned long long)sector);
				inc_unacked(mdev);
				peer_req->w.cb = e_send_discard_ack;
				list_add_tail(&peer_req->w.list, &mdev->done_ee);

				spin_unlock_irq(&mdev->tconn->req_lock);

				/* we could probably send that P_DISCARD_ACK ourselves,
				 * but I don't like the receiver using the msock */

				put_ldev(mdev);
				wake_asender(mdev->tconn);
				finish_wait(&mdev->misc_wait, &wait);
				return true;
			}

			if (signal_pending(current)) {
				spin_unlock_irq(&mdev->tconn->req_lock);
				finish_wait(&mdev->misc_wait, &wait);
				goto out_interrupted;
			}

			/* Indicate to wake up mdev->misc_wait upon completion. */
			i->waiting = true;

			spin_unlock_irq(&mdev->tconn->req_lock);
			if (first) {
				first = 0;
				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
				     "sec=%llus\n", (unsigned long long)sector);
			} else if (discard) {
				/* we had none on the first iteration.
				 * there must be none now. */
				D_ASSERT(have_unacked == 0);
			}
			schedule();
			spin_lock_irq(&mdev->tconn->req_lock);
		}
		finish_wait(&mdev->misc_wait, &wait);

		drbd_insert_interval(&mdev->write_requests, &peer_req->i);
	}

	list_add(&peer_req->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	switch (mdev->tconn->net_conf->wire_protocol) {
	case DRBD_PROT_C:
		inc_unacked(mdev);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
		break;
	case DRBD_PROT_B:
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(mdev, P_RECV_ACK, peer_req);
		break;
	case DRBD_PROT_A:
		/* nothing to do */
		break;
	}

	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(mdev, peer_req->i.sector);
	}

	if (drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(mdev, peer_req->i.sector);

out_interrupted:
	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
	put_ldev(mdev);
	drbd_free_ee(mdev, peer_req);
	return false;
}
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * (more than 64 sectors) of activity we cannot account for with our own resync
 * activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
{
	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	struct lc_element *tmp;
	int curr_events;
	int throttle = 0;

	/* feature disabled? */
	if (mdev->sync_conf.c_min_rate == 0)
		return 0;

	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
	if (tmp) {
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
			return 0;
		}
		/* Do not slow down if app IO is already waiting for this extent */
	}
	spin_unlock_irq(&mdev->al_lock);

	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
			atomic_read(&mdev->rs_sect_ev);

	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;
		int i;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
		else
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = mdev->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		if (dbdt > mdev->sync_conf.c_min_rate)
			throttle = 1;
	}
	return throttle;
}
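/* Worked example (added, assuming Bit2KB(x) converts 4 KiB bitmap bits to
 * KiB, i.e. x << 2): if the last sync mark is dt = 3 seconds old and
 * db = 1500 bitmap bits were cleared since, the recent sync rate is
 *
 *	dbdt = Bit2KB(1500 / 3) = 500 << 2 = 2000 KiB/s
 *
 * and we throttle only if that exceeds the configured c_min_rate. */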
static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int digest_size)
{
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	int size, verb;
	unsigned int fault_type;
	struct p_block_req *p =	&mdev->tconn->data.rbuf.block_req;

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return false;
	}
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return false;
	}

	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
		verb = 1;
		switch (cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
			break;
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
			break;
		case P_OV_REPLY:
			verb = 0;
			dec_rs_pending(mdev);
			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
				cmdname(cmd));
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possibly payload */
		return drbd_drain_block(mdev, digest_size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
	if (!peer_req) {
		put_ldev(mdev);
		return false;
	}

	switch (cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */
		goto submit;

	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = digest_size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)
			goto out_free_e;

		if (cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		} else if (cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &mdev->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(mdev);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		if (mdev->ov_start_sector == ~(sector_t)0 &&
		    mdev->tconn->agreed_pro_version >= 90) {
			unsigned long now = jiffies;
			int i;
			mdev->ov_start_sector = sector;
			mdev->ov_position = sector;
			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
			mdev->rs_total = mdev->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				mdev->rs_mark_left[i] = mdev->ov_left;
				mdev->rs_mark_time[i] = now;
			}
			dev_info(DEV, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;
		break;

	default:
		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
		    cmdname(cmd));
		fault_type = DRBD_FAULT_MAX;
		goto out_free_e;
	}

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time.  For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * on request through.  The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */
	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
		schedule_timeout_uninterruptible(HZ/10);
	if (drbd_rs_begin_io(mdev, sector))
		goto out_free_e;

submit_for_resync:
	atomic_add(size >> 9, &mdev->rs_sect_ev);

submit:
	inc_unacked(mdev);
	spin_lock_irq(&mdev->tconn->req_lock);
	list_add_tail(&peer_req->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

out_free_e:
	put_ldev(mdev);
	drbd_free_ee(mdev, peer_req);
	return false;
}
static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
{
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;

	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
	peer = mdev->p_uuid[UI_BITMAP] & 1;

	ch_peer = mdev->p_uuid[UI_SIZE];
	ch_self = mdev->comm_bm_set;

	switch (mdev->tconn->net_conf->after_sb_0p) {
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
			rv = -1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = 1;
			break;
		}
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
			rv = 1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = -1;
			break;
		}
		/* Else fall through to one of the other strategies... */
		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
		     "Using discard-least-changes instead\n");
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
			break;
		} else {
			if (ch_peer == 0) { rv = 1; break; }
			if (ch_self == 0) { rv = -1; break; }
		}
		if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
	case ASB_DISCARD_LEAST_CHG:
		if (ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv = 1;
		else /* ( ch_self == ch_peer ) */
			/* Well, then use something else. */
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
		break;
	case ASB_DISCARD_LOCAL:
		rv = -1;
		break;
	case ASB_DISCARD_REMOTE:
		rv = 1;
	}

	return rv;
}
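/*
 * Editor's note on the rv convention shared by the drbd_asb_recover_{0,1,2}p
 * helpers: rv = -1 means this node loses (it becomes SyncTarget and discards
 * its changes), rv = 1 means the peer loses, and rv = -100 means the
 * configured after-split-brain policy could not decide, leaving the split
 * brain unresolved.
 */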
static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
{
	int hg, rv = -100;

	switch (mdev->tconn->net_conf->after_sb_1p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CONSENSUS:
		hg = drbd_asb_recover_0p(mdev);
		if (hg == -1 && mdev->state.role == R_SECONDARY)
			rv = hg;
		if (hg == 1 && mdev->state.role == R_PRIMARY)
			rv = hg;
		break;
	case ASB_VIOLENTLY:
		rv = drbd_asb_recover_0p(mdev);
		break;
	case ASB_DISCARD_SECONDARY:
		return mdev->state.role == R_PRIMARY ? 1 : -1;
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(mdev);
		if (hg == -1 && mdev->state.role == R_PRIMARY) {
			enum drbd_state_rv rv2;

			drbd_set_role(mdev, R_SECONDARY, 0);
			/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			 * we might be here in C_WF_REPORT_PARAMS which is transient.
			 * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(mdev, "pri-lost-after-sb");
			} else {
				dev_warn(DEV, "Successfully gave up primary role.\n");
				rv = hg;
			}
		} else
			rv = hg;
	}

	return rv;
}
static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
{
	int hg, rv = -100;

	switch (mdev->tconn->net_conf->after_sb_2p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_VIOLENTLY:
		rv = drbd_asb_recover_0p(mdev);
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(mdev);
		if (hg == -1) {
			enum drbd_state_rv rv2;

			/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			 * we might be here in C_WF_REPORT_PARAMS which is transient.
			 * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(mdev, "pri-lost-after-sb");
			} else {
				dev_warn(DEV, "Successfully gave up primary role.\n");
				rv = hg;
			}
		} else
			rv = hg;
	}

	return rv;
}
static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
			   u64 bits, u64 flags)
{
	if (!uuid) {
		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
		return;
	}
	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
		 text,
		 (unsigned long long)uuid[UI_CURRENT],
		 (unsigned long long)uuid[UI_BITMAP],
		 (unsigned long long)uuid[UI_HISTORY_START],
		 (unsigned long long)uuid[UI_HISTORY_END],
		 (unsigned long long)bits,
		 (unsigned long long)flags);
}

/*
  100	after split brain try auto recover
    2	C_SYNC_SOURCE set BitMap
    1	C_SYNC_SOURCE use BitMap
    0	no Sync
   -1	C_SYNC_TARGET use BitMap
   -2	C_SYNC_TARGET set BitMap
 -100	after split brain, disconnect
-1000	unrelated data
-1091	requires proto 91
-1096	requires proto 96
 */
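/*
 * Editor's sketch (not driver code): how a caller typically maps the return
 * value of drbd_uuid_compare() onto an action, per the table above.
 * Hypothetical helper, kept under #if 0 for illustration only.
 */
#if 0
static const char *hg_to_action(int hg)
{
	if (hg == -1000)
		return "unrelated data, drop connection";
	if (hg < -1000)
		return "peer needs at least protocol -hg - 1000";
	switch (hg) {
	case  100: return "split brain, try auto recovery";
	case    2: return "become SyncSource, full sync (set bitmap)";
	case    1: return "become SyncSource, use bitmap";
	case    0: return "no sync necessary";
	case   -1: return "become SyncTarget, use bitmap";
	case   -2: return "become SyncTarget, full sync (set bitmap)";
	case -100: return "split brain, disconnect";
	}
	return "unexpected";
}
#endif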
static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
{
	u64 self, peer;
	int i, j;

	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);

	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	if (self == peer) {
		int rct, dc; /* roles at crash time */

		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
				drbd_uuid_set_bm(mdev, 0UL);

				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
				*rule_nr = 34;
			} else {
				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
				mdev->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
				*rule_nr = 35;
			} else {
				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
			(mdev->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0;
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
			dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
			return dc ? -1 : 1;
		}
	}

	*rule_nr = 50;
	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer)
		return -1;

	*rule_nr = 51;
	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (mdev->tconn->agreed_pro_version < 96 ?
		    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];

			dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
			drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);

			return -1;
		}
	}

	*rule_nr = 60;
	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		peer = mdev->p_uuid[i] & ~((u64)1);
		if (self == peer)
			return -2;
	}

	*rule_nr = 70;
	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
	if (self == peer)
		return 1;

	*rule_nr = 71;
	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (mdev->tconn->agreed_pro_version < 96 ?
		    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);

			dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);

			return 1;
		}
	}

	*rule_nr = 80;
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = mdev->ldev->md.uuid[i] & ~((u64)1);
		if (self == peer)
			return 2;
	}

	*rule_nr = 90;
	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer && self != ((u64)0))
		return 100;

	*rule_nr = 100;
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = mdev->ldev->md.uuid[i] & ~((u64)1);
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
			peer = mdev->p_uuid[j] & ~((u64)1);
			if (self == peer)
				return -100;
		}
	}

	return -1000;
}
/* drbd_sync_handshake() returns the new conn state on success, or
   CONN_MASK (-1) on failure.
 */
static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
					   enum drbd_disk_state peer_disk) __must_hold(local)
{
	int hg, rule_nr;
	enum drbd_conns rv = C_MASK;
	enum drbd_disk_state mydisk;

	mydisk = mdev->state.disk;
	if (mydisk == D_NEGOTIATING)
		mydisk = mdev->new_state_tmp.disk;

	dev_info(DEV, "drbd_sync_handshake:\n");
	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);

	hg = drbd_uuid_compare(mdev, &rule_nr);

	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);

	if (hg == -1000) {
		dev_alert(DEV, "Unrelated data, aborting!\n");
		return C_MASK;
	}
	if (hg < -1000) {
		dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
		return C_MASK;
	}

	if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
	    (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
		int f = (hg == -100) || abs(hg) == 2;
		hg = mydisk > D_INCONSISTENT ? 1 : -1;
		if (f)
			hg = hg*2;
		dev_info(DEV, "Becoming sync %s due to disk states.\n",
		     hg > 0 ? "source" : "target");
	}

	if (abs(hg) == 100)
		drbd_khelper(mdev, "initial-split-brain");

	if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
		int pcount = (mdev->state.role == R_PRIMARY)
			   + (peer_role == R_PRIMARY);
		int forced = (hg == -100);

		switch (pcount) {
		case 0:
			hg = drbd_asb_recover_0p(mdev);
			break;
		case 1:
			hg = drbd_asb_recover_1p(mdev);
			break;
		case 2:
			hg = drbd_asb_recover_2p(mdev);
			break;
		}
		if (abs(hg) < 100) {
			dev_warn(DEV, "Split-Brain detected, %d primaries, "
			     "automatically solved. Sync from %s node\n",
			     pcount, (hg < 0) ? "peer" : "this");
			if (forced) {
				dev_warn(DEV, "Doing a full sync, since"
				     " UUIDs were ambiguous.\n");
				hg = hg*2;
			}
		}
	}

	if (hg == -100) {
		if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
			hg = -1;
		if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
			hg = 1;

		if (abs(hg) < 100)
			dev_warn(DEV, "Split-Brain detected, manually solved. "
			     "Sync from %s node\n",
			     (hg < 0) ? "peer" : "this");
	}

	if (hg == -100) {
		/* FIXME this log message is not correct if we end up here
		 * after an attempted attach on a diskless node.
		 * We just refuse to attach -- well, we drop the "connection"
		 * to that disk, in a way... */
		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
		drbd_khelper(mdev, "split-brain");
		return C_MASK;
	}

	if (hg > 0 && mydisk <= D_INCONSISTENT) {
		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
		return C_MASK;
	}

	if (hg < 0 && /* by intention we do not use mydisk here. */
	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
		switch (mdev->tconn->net_conf->rr_conflict) {
		case ASB_CALL_HELPER:
			drbd_khelper(mdev, "pri-lost");
			/* fall through */
		case ASB_DISCONNECT:
			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
			return C_MASK;
		default:
			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
			     "assumption\n");
		}
	}

	if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
		if (hg == 0)
			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
		else
			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
				 abs(hg) >= 2 ? "full" : "bit-map based");
		return C_MASK;
	}

	if (abs(hg) >= 2) {
		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
					BM_LOCKED_SET_ALLOWED))
			return C_MASK;
	}

	if (hg > 0) { /* become sync source. */
		rv = C_WF_BITMAP_S;
	} else if (hg < 0) { /* become sync target */
		rv = C_WF_BITMAP_T;
	} else {
		rv = C_CONNECTED;
		if (drbd_bm_total_weight(mdev)) {
			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
			     drbd_bm_total_weight(mdev));
		}
	}

	return rv;
}
/* returns 1 if invalid */
static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
{
	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
		return 0;

	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
		return 1;

	/* everything else is valid if they are equal on both sides. */
	if (peer == self)
		return 0;

	/* everything else is invalid. */
	return 1;
}
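/*
 * Editor's note, truth table for cmp_after_sb() above (0 = compatible,
 * 1 = invalid).  The discard-local/discard-remote pair is the only
 * asymmetric setting allowed, and only when the two sides mirror each other:
 *
 *   peer = discard-remote, self = discard-local   -> 0 (valid)
 *   peer = discard-local,  self = discard-remote  -> 0 (valid)
 *   peer = discard-local,  self = discard-local   -> 1 (invalid)
 *   peer = X, self = X, any other X               -> 0 (valid)
 *   peer = X, self = Y, X != Y                    -> 1 (invalid)
 */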
static int receive_protocol(struct drbd_conf *mdev, enum drbd_packet cmd,
			    unsigned int data_size)
{
	struct p_protocol *p = &mdev->tconn->data.rbuf.protocol;
	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
	int p_want_lose, p_two_primaries, cf;
	char p_integrity_alg[SHARED_SECRET_MAX] = "";

	p_proto		= be32_to_cpu(p->protocol);
	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
	p_two_primaries = be32_to_cpu(p->two_primaries);
	cf		= be32_to_cpu(p->conn_flags);
	p_want_lose = cf & CF_WANT_LOSE;

	clear_bit(CONN_DRY_RUN, &mdev->flags);

	if (cf & CF_DRY_RUN)
		set_bit(CONN_DRY_RUN, &mdev->flags);

	if (p_proto != mdev->tconn->net_conf->wire_protocol) {
		dev_err(DEV, "incompatible communication protocols\n");
		goto disconnect;
	}

	if (cmp_after_sb(p_after_sb_0p, mdev->tconn->net_conf->after_sb_0p)) {
		dev_err(DEV, "incompatible after-sb-0pri settings\n");
		goto disconnect;
	}

	if (cmp_after_sb(p_after_sb_1p, mdev->tconn->net_conf->after_sb_1p)) {
		dev_err(DEV, "incompatible after-sb-1pri settings\n");
		goto disconnect;
	}

	if (cmp_after_sb(p_after_sb_2p, mdev->tconn->net_conf->after_sb_2p)) {
		dev_err(DEV, "incompatible after-sb-2pri settings\n");
		goto disconnect;
	}

	if (p_want_lose && mdev->tconn->net_conf->want_lose) {
		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
		goto disconnect;
	}

	if (p_two_primaries != mdev->tconn->net_conf->two_primaries) {
		dev_err(DEV, "incompatible setting of the two-primaries options\n");
		goto disconnect;
	}

	if (mdev->tconn->agreed_pro_version >= 87) {
		unsigned char *my_alg = mdev->tconn->net_conf->integrity_alg;

		if (drbd_recv(mdev->tconn, p_integrity_alg, data_size) != data_size)
			return false;

		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
		if (strcmp(p_integrity_alg, my_alg)) {
			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
			goto disconnect;
		}
		dev_info(DEV, "data-integrity-alg: %s\n",
		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
	}

	return true;

disconnect:
	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
	return false;
}
/* helper function
 * input: alg name, feature name
 * return: NULL (alg name was "")
 *         ERR_PTR(error) if something goes wrong
 *         or the crypto hash ptr, if it worked out ok. */
struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
		const char *alg, const char *name)
{
	struct crypto_hash *tfm;

	if (!alg[0])
		return NULL;

	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
	if (IS_ERR(tfm)) {
		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
			alg, name, PTR_ERR(tfm));
		return tfm;
	}
	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
		crypto_free_hash(tfm);
		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
		return ERR_PTR(-EINVAL);
	}
	return tfm;
}
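/*
 * Editor's note, typical caller pattern for the helper above: the three-way
 * return convention means callers must distinguish NULL (alg name was "",
 * feature unused) from IS_ERR() before using the tfm.  Hypothetical sketch,
 * not driver code:
 */
#if 0
	struct crypto_hash *tfm;

	tfm = drbd_crypto_alloc_digest_safe(mdev, alg_name, "verify-alg");
	if (IS_ERR(tfm))
		goto disconnect;	/* allocation failed, or alg is not a digest */
	/* tfm == NULL simply means the feature is not configured */
#endif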
static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int packet_size)
{
	int ok = true;
	struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
	unsigned int header_size, data_size, exp_max_sz;
	struct crypto_hash *verify_tfm = NULL;
	struct crypto_hash *csums_tfm = NULL;
	const int apv = mdev->tconn->agreed_pro_version;
	int *rs_plan_s = NULL;
	int fifo_size = 0;

	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
		    : apv == 88 ? sizeof(struct p_rs_param)
					+ SHARED_SECRET_MAX
		    : apv <= 94 ? sizeof(struct p_rs_param_89)
		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);

	if (packet_size > exp_max_sz) {
		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
		    packet_size, exp_max_sz);
		return false;
	}

	if (apv <= 88) {
		header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
		data_size   = packet_size - header_size;
	} else if (apv <= 94) {
		header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
		data_size   = packet_size - header_size;
		D_ASSERT(data_size == 0);
	} else {
		header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
		data_size   = packet_size - header_size;
		D_ASSERT(data_size == 0);
	}

	/* initialize verify_alg and csums_alg */
	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);

	if (drbd_recv(mdev->tconn, &p->head.payload, header_size) != header_size)
		return false;

	mdev->sync_conf.rate = be32_to_cpu(p->rate);

	if (apv >= 88) {
		if (apv == 88) {
			if (data_size > SHARED_SECRET_MAX) {
				dev_err(DEV, "verify-alg too long, "
				    "peer wants %u, accepting only %u byte\n",
						data_size, SHARED_SECRET_MAX);
				return false;
			}

			if (drbd_recv(mdev->tconn, p->verify_alg, data_size) != data_size)
				return false;

			/* we expect NUL terminated string */
			/* but just in case someone tries to be evil */
			D_ASSERT(p->verify_alg[data_size-1] == 0);
			p->verify_alg[data_size-1] = 0;

		} else /* apv >= 89 */ {
			/* we still expect NUL terminated strings */
			/* but just in case someone tries to be evil */
			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
		}

		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
				    mdev->sync_conf.verify_alg, p->verify_alg);
				goto disconnect;
			}
			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
					p->verify_alg, "verify-alg");
			if (IS_ERR(verify_tfm)) {
				verify_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
				    mdev->sync_conf.csums_alg, p->csums_alg);
				goto disconnect;
			}
			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
					p->csums_alg, "csums-alg");
			if (IS_ERR(csums_tfm)) {
				csums_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv > 94) {
			mdev->sync_conf.rate = be32_to_cpu(p->rate);
			mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
			mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
			mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
			mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);

			fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
			if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
				rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
				if (!rs_plan_s) {
					dev_err(DEV, "kmalloc of fifo_buffer failed");
					goto disconnect;
				}
			}
		}

		spin_lock(&mdev->peer_seq_lock);
		/* lock against drbd_nl_syncer_conf() */
		if (verify_tfm) {
			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
			crypto_free_hash(mdev->verify_tfm);
			mdev->verify_tfm = verify_tfm;
			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
		}
		if (csums_tfm) {
			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
			crypto_free_hash(mdev->csums_tfm);
			mdev->csums_tfm = csums_tfm;
			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
		}
		if (fifo_size != mdev->rs_plan_s.size) {
			kfree(mdev->rs_plan_s.values);
			mdev->rs_plan_s.values = rs_plan_s;
			mdev->rs_plan_s.size = fifo_size;
			mdev->rs_planed = 0;
		}
		spin_unlock(&mdev->peer_seq_lock);
	}

	return ok;

disconnect:
	/* just for completeness: actually not needed,
	 * as this is not reached if csums_tfm was ok. */
	crypto_free_hash(csums_tfm);
	/* but free the verify_tfm again, if csums_tfm did not work out */
	crypto_free_hash(verify_tfm);
	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
	return false;
}
/* warn if the arguments differ by more than 12.5% */
static void warn_if_differ_considerably(struct drbd_conf *mdev,
	const char *s, sector_t a, sector_t b)
{
	sector_t d;

	if (a == 0 || b == 0)
		return;
	d = (a > b) ? (a - b) : (b - a);
	if (d > (a>>3) || d > (b>>3))
		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
		     (unsigned long long)a, (unsigned long long)b);
}
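/*
 * Editor's note, worked example for the 12.5% rule above: a>>3 is a/8, i.e.
 * 12.5% of a.  For a = 1000 and b = 900 sectors, d = 100 while a>>3 = 125
 * and b>>3 = 112, so no warning is printed; for b = 800, d = 200 exceeds
 * both thresholds and the warning fires.
 */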
static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
			 unsigned int data_size)
{
	struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
	enum determine_dev_size dd = unchanged;
	sector_t p_size, p_usize, my_usize;
	int ldsc = 0; /* local disk size changed */
	enum dds_flags ddsf;

	p_size = be64_to_cpu(p->d_size);
	p_usize = be64_to_cpu(p->u_size);

	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
		dev_err(DEV, "some backing storage is needed\n");
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		return false;
	}

	/* just store the peer's disk size for now.
	 * we still need to figure out whether we accept that. */
	mdev->p_size = p_size;

	if (get_ldev(mdev)) {
		warn_if_differ_considerably(mdev, "lower level device sizes",
			   p_size, drbd_get_max_capacity(mdev->ldev));
		warn_if_differ_considerably(mdev, "user requested size",
					    p_usize, mdev->ldev->dc.disk_size);

		/* if this is the first connect, or an otherwise expected
		 * param exchange, choose the minimum */
		if (mdev->state.conn == C_WF_REPORT_PARAMS)
			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
					       p_usize);

		my_usize = mdev->ldev->dc.disk_size;

		if (mdev->ldev->dc.disk_size != p_usize) {
			mdev->ldev->dc.disk_size = p_usize;
			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
			     (unsigned long)mdev->ldev->dc.disk_size);
		}

		/* Never shrink a device with usable data during connect.
		   But allow online shrinking if we are connected. */
		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
		   drbd_get_capacity(mdev->this_bdev) &&
		   mdev->state.disk >= D_OUTDATED &&
		   mdev->state.conn < C_CONNECTED) {
			dev_err(DEV, "The peer's disk size is too small!\n");
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
			mdev->ldev->dc.disk_size = my_usize;
			put_ldev(mdev);
			return false;
		}
		put_ldev(mdev);
	}

	ddsf = be16_to_cpu(p->dds_flags);
	if (get_ldev(mdev)) {
		dd = drbd_determine_dev_size(mdev, ddsf);
		put_ldev(mdev);
		if (dd == dev_size_error)
			return false;
		drbd_md_sync(mdev);
	} else {
		/* I am diskless, need to accept the peer's size. */
		drbd_set_my_capacity(mdev, p_size);
	}

	mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	drbd_reconsider_max_bio_size(mdev);

	if (get_ldev(mdev)) {
		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
			ldsc = 1;
		}

		put_ldev(mdev);
	}

	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
		if (be64_to_cpu(p->c_size) !=
		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
			/* we have different sizes, probably peer
			 * needs to know my new size... */
			drbd_send_sizes(mdev, 0, ddsf);
		}
		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
			if (mdev->state.pdsk >= D_INCONSISTENT &&
			    mdev->state.disk >= D_INCONSISTENT) {
				if (ddsf & DDSF_NO_RESYNC)
					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
				else
					resync_after_online_grow(mdev);
			} else
				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
		}
	}

	return true;
}
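/*
 * Editor's note on the size negotiation in receive_sizes() above:
 * min_not_zero() treats 0 as "no limit configured".  For example, a local
 * disk_size of 0 combined with a peer u_size of 4194304 sectors yields
 * 4194304, while two non-zero values yield the smaller of the two.
 */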
static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
			 unsigned int data_size)
{
	struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
	u64 *p_uuid;
	int i, updated_uuids = 0;

	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);

	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
		p_uuid[i] = be64_to_cpu(p->uuid[i]);

	kfree(mdev->p_uuid);
	mdev->p_uuid = p_uuid;

	if (mdev->state.conn < C_CONNECTED &&
	    mdev->state.disk < D_INCONSISTENT &&
	    mdev->state.role == R_PRIMARY &&
	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
		    (unsigned long long)mdev->ed_uuid);
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		return false;
	}

	if (get_ldev(mdev)) {
		int skip_initial_sync =
			mdev->state.conn == C_CONNECTED &&
			mdev->tconn->agreed_pro_version >= 90 &&
			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
			(p_uuid[UI_FLAGS] & 8);
		if (skip_initial_sync) {
			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
					"clear_n_write from receive_uuids",
					BM_LOCKED_TEST_ALLOWED);
			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
			_drbd_uuid_set(mdev, UI_BITMAP, 0);
			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
					CS_VERBOSE, NULL);
			drbd_md_sync(mdev);
			updated_uuids = 1;
		}
		put_ldev(mdev);
	} else if (mdev->state.disk < D_INCONSISTENT &&
		   mdev->state.role == R_PRIMARY) {
		/* I am a diskless primary, the peer just created a new current UUID
		   for me. */
		updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
	}

	/* Before we test for the disk state, we should wait until an eventually
	   ongoing cluster wide state change is finished. That is important if
	   we are primary and are detaching from our disk. We need to see the
	   new disk state... */
	mutex_lock(mdev->state_mutex);
	mutex_unlock(mdev->state_mutex);
	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
		updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);

	if (updated_uuids)
		drbd_print_uuids(mdev, "receiver updated UUIDs to");

	return true;
}
/**
 * convert_state() - Converts the peer's view of the cluster state to our point of view
 * @ps:		The state as seen by the peer.
 */
static union drbd_state convert_state(union drbd_state ps)
{
	union drbd_state ms;

	static enum drbd_conns c_tab[] = {
		[C_CONNECTED] = C_CONNECTED,

		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
		[C_VERIFY_S]      = C_VERIFY_T,
		[C_MASK]          = C_MASK,
	};

	ms.i = ps.i;

	ms.conn = c_tab[ps.conn];
	ms.peer = ps.role;
	ms.role = ps.peer;
	ms.pdsk = ps.disk;
	ms.disk = ps.pdsk;
	ms.peer_isp = (ps.aftr_isp | ps.user_isp);

	return ms;
}
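/*
 * Editor's note, worked example for convert_state() above: if the peer
 * reports { role=Primary, peer=Secondary, disk=UpToDate, pdsk=Inconsistent,
 * conn=StartingSyncS }, then from our point of view that becomes
 * { role=Secondary, peer=Primary, disk=Inconsistent, pdsk=UpToDate,
 * conn=StartingSyncT }: role/peer and disk/pdsk are swapped, and the
 * connection state is mirrored through c_tab.
 */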
static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int data_size)
{
	struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
	union drbd_state mask, val;
	enum drbd_state_rv rv;

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

	if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
	    mutex_is_locked(mdev->state_mutex)) {
		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
		return true;
	}

	mask = convert_state(mask);
	val = convert_state(val);

	if (cmd == P_CONN_ST_CHG_REQ) {
		rv = conn_request_state(mdev->tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY);
		conn_send_sr_reply(mdev->tconn, rv);
	} else {
		rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
		drbd_send_sr_reply(mdev, rv);
	}

	drbd_md_sync(mdev);

	return true;
}
static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
			 unsigned int data_size)
{
	struct p_state *p = &mdev->tconn->data.rbuf.state;
	union drbd_state os, ns, peer_state;
	enum drbd_disk_state real_peer_disk;
	enum chg_state_flags cs_flags;
	int rv;

	peer_state.i = be32_to_cpu(p->state);

	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

	spin_lock_irq(&mdev->tconn->req_lock);
 retry:
	os = ns = mdev->state;
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* peer says his disk is uptodate, while we think it is inconsistent,
	 * and this happens while we think we have a sync going on. */
	if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
			if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
				drbd_resync_finished(mdev);
			return true;
		}
	}

	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;

	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr  = (os.conn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			os.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (os.conn == C_CONNECTED &&
				(peer_state.conn >= C_STARTING_SYNC_S &&
				 peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
			ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);

		put_ldev(mdev);
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
			if (mdev->state.disk == D_NEGOTIATING) {
				drbd_force_state(mdev, NS(disk, D_FAILED));
			} else if (peer_state.disk == D_NEGOTIATING) {
				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
				real_peer_disk = D_DISKLESS;
			} else {
				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
					return false;
				D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
				return false;
			}
		}
	}

	spin_lock_irq(&mdev->tconn->req_lock);
	if (mdev->state.i != os.i)
		goto retry;
	clear_bit(CONSIDER_RESYNC, &mdev->flags);
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = mdev->new_state_tmp.disk;
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
	if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &mdev->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporary network outages! */
		spin_unlock_irq(&mdev->tconn->req_lock);
		dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
		tl_clear(mdev->tconn);
		drbd_uuid_new_current(mdev);
		clear_bit(NEW_CUR_UUID, &mdev->flags);
		drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
		return false;
	}
	rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
	ns = mdev->state;
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (rv < SS_SUCCESS) {
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		return false;
	}

	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(mdev);
			drbd_send_state(mdev);
		}
	}

	mdev->tconn->net_conf->want_lose = 0;

	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */

	return true;
}
static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int data_size)
{
	struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;

	wait_event(mdev->misc_wait,
		   mdev->state.conn == C_WF_SYNC_UUID ||
		   mdev->state.conn == C_BEHIND ||
		   mdev->state.conn < C_CONNECTED ||
		   mdev->state.disk < D_NEGOTIATING);

	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */

	/* Here the _drbd_uuid_ functions are right, current should
	   _not_ be rotated into the history */
	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);

		drbd_print_uuids(mdev, "updated sync uuid");
		drbd_start_resync(mdev, C_SYNC_TARGET);

		put_ldev(mdev);
	} else
		dev_err(DEV, "Ignoring SyncUUID packet!\n");

	return true;
}
/**
 * receive_bitmap_plain
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
		     unsigned long *buffer, struct bm_xfer_ctx *c)
{
	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
	unsigned want = num_words * sizeof(long);
	int err;

	if (want != data_size) {
		dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
		return -EIO;
	}
	if (want == 0)
		return 0;
	err = drbd_recv(mdev->tconn, buffer, want);
	if (err != want) {
		if (err >= 0)
			err = -EIO;
		return err;
	}

	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);

	c->word_offset += num_words;
	c->bit_offset = c->word_offset * BITS_PER_LONG;
	if (c->bit_offset > c->bm_bits)
		c->bit_offset = c->bm_bits;

	return 1;
}
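/*
 * Editor's note, worked example for receive_bitmap_plain() above: with
 * 64 bit longs, 100 words remaining (bm_words - word_offset = 100) and
 * BM_PACKET_WORDS larger than that, num_words = 100 and want = 100 *
 * sizeof(long) = 800 bytes, which must match data_size exactly or the
 * packet is rejected with -EIO.
 */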
/**
 * recv_bm_rle_bits
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
recv_bm_rle_bits(struct drbd_conf *mdev,
		 struct p_compressed_bm *p,
		 struct bm_xfer_ctx *c,
		 unsigned int len)
{
	struct bitstream bs;
	u64 look_ahead;
	u64 rl;
	u64 tmp;
	unsigned long s = c->bit_offset;
	unsigned long e;
	int toggle = DCBP_get_start(p);
	int have;
	int bits;

	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));

	bits = bitstream_get_bits(&bs, &look_ahead, 64);
	if (bits < 0)
		return -EIO;

	for (have = bits; have > 0; s += rl, toggle = !toggle) {
		bits = vli_decode_bits(&rl, look_ahead);
		if (bits <= 0)
			return -EIO;

		if (toggle) {
			e = s + rl - 1;
			if (e >= c->bm_bits) {
				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
				return -EIO;
			}
			_drbd_bm_set_bits(mdev, s, e);
		}

		if (have < bits) {
			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
				have, bits, look_ahead,
				(unsigned int)(bs.cur.b - p->code),
				(unsigned int)bs.buf_len);
			return -EIO;
		}
		look_ahead >>= bits;
		have -= bits;

		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
		if (bits < 0)
			return -EIO;
		look_ahead |= tmp << have;
		have += bits;
	}

	c->bit_offset = s;
	bm_xfer_ctx_bit_to_word_offset(c);

	return (s != c->bm_bits);
}
/**
 * decode_bitmap_c
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
decode_bitmap_c(struct drbd_conf *mdev,
		struct p_compressed_bm *p,
		struct bm_xfer_ctx *c,
		unsigned int len)
{
	if (DCBP_get_code(p) == RLE_VLI_Bits)
		return recv_bm_rle_bits(mdev, p, c, len);

	/* other variants had been implemented for evaluation,
	 * but have been dropped as this one turned out to be "best"
	 * during all our tests. */

	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
	return -EIO;
}
void INFO_bm_xfer_stats(struct drbd_conf *mdev,
		const char *direction, struct bm_xfer_ctx *c)
{
	/* what would it take to transfer it "plaintext" */
	unsigned plain = sizeof(struct p_header) *
		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
		+ c->bm_words * sizeof(long);
	unsigned total = c->bytes[0] + c->bytes[1];
	unsigned r;

	/* total can not be zero. but just in case: */
	if (total == 0)
		return;

	/* don't report if not compressed */
	if (total >= plain)
		return;

	/* total < plain. check for overflow, still */
	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
		                    : (1000 * total / plain);

	if (r > 1000)
		r = 1000;

	r = 1000 - r;
	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
	     "total %u; compression: %u.%u%%\n",
			direction,
			c->bytes[1], c->packets[1],
			c->bytes[0], c->packets[0],
			total, r/10, r % 10);
}
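/*
 * Editor's note, worked example for the ratio computed above: with
 * plain = 1000000 bytes and total = 100000 bytes,
 *   r = 1000 * 100000 / 1000000 = 100   (tenths of a percent transferred)
 *   r = 1000 - 100 = 900                -> printed as "compression: 90.0%"
 */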
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter if we process it in 32 bit or 64 bit chunks, as
   long as it is little endian. (Understand it as byte stream,
   beginning with the lowest byte...) If we used big endian,
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bits issue.

   returns 0 on failure, 1 if we successfully received it. */
static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
			  unsigned int data_size)
{
	struct bm_xfer_ctx c;
	void *buffer;
	int err;
	int ok = false;
	struct p_header *h = &mdev->tconn->data.rbuf.header;
	struct packet_info pi;

	drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */

	/* maybe we should use some per thread scratch page,
	 * and allocate that during initial device creation? */
	buffer = (unsigned long *) __get_free_page(GFP_NOIO);
	if (!buffer) {
		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
		goto out;
	}

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(mdev),
		.bm_words = drbd_bm_words(mdev),
	};

	for (;;) {
		if (cmd == P_BITMAP) {
			err = receive_bitmap_plain(mdev, data_size, buffer, &c);
		} else if (cmd == P_COMPRESSED_BITMAP) {
			/* MAYBE: sanity check that we speak proto >= 90,
			 * and the feature is enabled! */
			struct p_compressed_bm *p;

			if (data_size > BM_PACKET_PAYLOAD_BYTES) {
				dev_err(DEV, "ReportCBitmap packet too large\n");
				goto out;
			}
			/* use the page buff */
			p = buffer;
			memcpy(p, h, sizeof(*h));
			if (drbd_recv(mdev->tconn, p->head.payload, data_size) != data_size)
				goto out;
			if (data_size <= (sizeof(*p) - sizeof(p->head))) {
				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
				goto out;
			}
			err = decode_bitmap_c(mdev, p, &c, data_size);
		} else {
			dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
			goto out;
		}

		c.packets[cmd == P_BITMAP]++;
		c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;

		if (err <= 0) {
			if (err < 0)
				goto out;
			break;
		}

		if (!drbd_recv_header(mdev->tconn, &pi))
			goto out;
		cmd = pi.cmd;
		data_size = pi.size;
	}

	INFO_bm_xfer_stats(mdev, "receive", &c);

	if (mdev->state.conn == C_WF_BITMAP_T) {
		enum drbd_state_rv rv;

		ok = !drbd_send_bitmap(mdev);
		if (!ok)
			goto out;
		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
		rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
		D_ASSERT(rv == SS_SUCCESS);
	} else if (mdev->state.conn != C_WF_BITMAP_S) {
		/* admin may have requested C_DISCONNECTING,
		 * other threads may have noticed network errors */
		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
		    drbd_conn_str(mdev->state.conn));
	}

	ok = true;
 out:
	drbd_bm_unlock(mdev);
	if (ok && mdev->state.conn == C_WF_BITMAP_S)
		drbd_start_resync(mdev, C_SYNC_SOURCE);
	free_page((unsigned long) buffer);
	return ok;
}
static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
			unsigned int data_size)
{
	/* TODO zero copy sink :) */
	static char sink[128];
	int size, want, r;

	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
		 cmd, data_size);

	size = data_size;
	while (size > 0) {
		want = min_t(int, size, sizeof(sink));
		r = drbd_recv(mdev->tconn, sink, want);
		if (r <= 0)
			break;
		size -= r;
	}
	return size == 0;
}

static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
				unsigned int data_size)
{
	/* Make sure we've acked all the TCP data associated
	 * with the data requests being unplugged */
	drbd_tcp_quickack(mdev->tconn->data.socket);

	return true;
}
static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int data_size)
{
	struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;

	switch (mdev->state.conn) {
	case C_WF_SYNC_UUID:
	case C_WF_BITMAP_T:
	case C_BEHIND:
		break;
	default:
		dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
				drbd_conn_str(mdev->state.conn));
	}

	drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));

	return true;
}
typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packet cmd,
				  unsigned int to_receive);

struct data_cmd {
	int expect_payload;
	size_t pkt_size;
	drbd_cmd_handler_f function;
};

static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier },
	[P_BITMAP]	    = { 1, sizeof(struct p_header), receive_bitmap },
	[P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap },
	[P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header), receive_UnplugRemote },
	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_SYNC_PARAM]	    = { 1, sizeof(struct p_header), receive_SyncParam },
	[P_SYNC_PARAM89]    = { 1, sizeof(struct p_header), receive_SyncParam },
	[P_PROTOCOL]	    = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
	[P_SYNC_UUID]	    = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
	[P_OV_REQUEST]	    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_OV_REPLY]	    = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_DELAY_PROBE]	    = { 0, sizeof(struct p_delay_probe93), receive_skip },
	[P_OUT_OF_SYNC]	    = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
	/* anything missing from this table is in
	 * the asender_tbl, see get_asender_cmd */
	[P_MAX_CMD]	    = { 0, 0, NULL },
};
/* All handler functions that expect a sub-header get that sub-header in
   mdev->tconn->data.rbuf.header.head.payload.

   Usually in mdev->tconn->data.rbuf.header.head the callback can find the usual
   p_header, but they may not rely on that. Since there is also p_header95 !
 */

static void drbdd(struct drbd_tconn *tconn)
{
	struct p_header *header = &tconn->data.rbuf.header;
	struct packet_info pi;
	size_t shs; /* sub header size */
	int rv;

	while (get_t_state(&tconn->receiver) == RUNNING) {
		drbd_thread_current_set_cpu(&tconn->receiver);
		if (!drbd_recv_header(tconn, &pi))
			goto err_out;

		if (unlikely(pi.cmd >= P_MAX_CMD || !drbd_cmd_handler[pi.cmd].function)) {
			conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
			goto err_out;
		}

		shs = drbd_cmd_handler[pi.cmd].pkt_size - sizeof(struct p_header);
		if (pi.size - shs > 0 && !drbd_cmd_handler[pi.cmd].expect_payload) {
			conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
			goto err_out;
		}

		if (shs) {
			rv = drbd_recv(tconn, &header->payload, shs);
			if (unlikely(rv != shs)) {
				if (!signal_pending(current))
					conn_warn(tconn, "short read while reading sub header: rv=%d\n", rv);
				goto err_out;
			}
		}

		rv = drbd_cmd_handler[pi.cmd].function(vnr_to_mdev(tconn, pi.vnr), pi.cmd, pi.size - shs);

		if (unlikely(!rv)) {
			conn_err(tconn, "error receiving %s, l: %d!\n",
			    cmdname(pi.cmd), pi.size);
			goto err_out;
		}
	}
	return;

    err_out:
	conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
}
void drbd_flush_workqueue(struct drbd_conf *mdev)
{
	struct drbd_wq_barrier barr;

	barr.w.cb = w_prev_work_done;
	init_completion(&barr.done);
	drbd_queue_work(&mdev->tconn->data.work, &barr.w);
	wait_for_completion(&barr.done);
}
*tconn
)
3852 int rv
= SS_UNKNOWN_ERROR
;
3854 if (tconn
->cstate
== C_STANDALONE
)
3857 /* asender does not clean up anything. it must not interfere, either */
3858 drbd_thread_stop(&tconn
->asender
);
3859 drbd_free_sock(tconn
);
3861 idr_for_each(&tconn
->volumes
, drbd_disconnected
, tconn
);
3863 conn_info(tconn
, "Connection closed\n");
3865 spin_lock_irq(&tconn
->req_lock
);
3867 if (oc
>= C_UNCONNECTED
)
3868 rv
= _conn_request_state(tconn
, NS(conn
, C_UNCONNECTED
), CS_VERBOSE
);
3870 spin_unlock_irq(&tconn
->req_lock
);
3872 if (oc
== C_DISCONNECTING
) {
3873 wait_event(tconn
->net_cnt_wait
, atomic_read(&tconn
->net_cnt
) == 0);
3875 crypto_free_hash(tconn
->cram_hmac_tfm
);
3876 tconn
->cram_hmac_tfm
= NULL
;
3878 kfree(tconn
->net_conf
);
3879 tconn
->net_conf
= NULL
;
3880 conn_request_state(tconn
, NS(conn
, C_STANDALONE
), CS_VERBOSE
);
static int drbd_disconnected(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	enum drbd_fencing_p fp;
	unsigned int i;

	/* wait for current activity to cease. */
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* We do not have data structures that would allow us to
	 * get the rs_pending_cnt down to 0 again.
	 *  * On C_SYNC_TARGET we do not have any data structures describing
	 *    the pending RSDataRequest's we have sent.
	 *  * On C_SYNC_SOURCE there is no data structure that tracks
	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
	 *  And no, it is not the sum of the reference counts in the
	 *  resync_LRU. The resync_LRU tracks the whole operation including
	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
	 *  on the fly. */
	drbd_rs_cancel_all(mdev);
	mdev->rs_total = 0;
	mdev->rs_failed = 0;
	atomic_set(&mdev->rs_pending_cnt, 0);
	wake_up(&mdev->misc_wait);

	del_timer(&mdev->request_timer);

	/* make sure syncer is stopped and w_resume_next_sg queued */
	del_timer_sync(&mdev->resync_timer);
	resync_timer_fn((unsigned long)mdev);

	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
	 * w_make_resync_request etc. which may still be on the worker queue
	 * to be "canceled" */
	drbd_flush_workqueue(mdev);

	/* This also does reclaim_net_ee(). If we do this too early, we might
	 * miss some resync ee and pages.*/
	drbd_process_done_ee(mdev);

	kfree(mdev->p_uuid);
	mdev->p_uuid = NULL;

	if (!is_susp(mdev->state))
		tl_clear(mdev->tconn);

	drbd_md_sync(mdev);

	fp = FP_DONT_CARE;
	if (get_ldev(mdev)) {
		fp = mdev->ldev->dc.fencing;
		put_ldev(mdev);
	}

	if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
		drbd_try_outdate_peer_async(mdev);

	/* serialize with bitmap writeout triggered by the state change,
	 * if any. */
	wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));

	/* tcp_close and release of sendpage pages can be deferred. I don't
	 * want to use SO_LINGER, because apparently it can be deferred for
	 * more than 20 seconds (longest time I checked).
	 *
	 * Actually we don't care for exactly when the network stack does its
	 * put_page(), but release our reference on these pages right here.
	 */
	i = drbd_release_ee(mdev, &mdev->net_ee);
	if (i)
		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
	i = atomic_read(&mdev->pp_in_use_by_net);
	if (i)
		dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&mdev->pp_in_use);
	if (i)
		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);

	D_ASSERT(list_empty(&mdev->read_ee));
	D_ASSERT(list_empty(&mdev->active_ee));
	D_ASSERT(list_empty(&mdev->sync_ee));
	D_ASSERT(list_empty(&mdev->done_ee));

	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
	atomic_set(&mdev->current_epoch->epoch_size, 0);
	D_ASSERT(list_empty(&mdev->current_epoch->list));

	return 0;
}
/*
 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
 * we can agree on is stored in agreed_pro_version.
 *
 * feature flags and the reserved array should be enough room for future
 * enhancements of the handshake protocol, and possible plugins...
 *
 * for now, they are expected to be zero, but ignored.
 */
static int drbd_send_handshake(struct drbd_tconn *tconn)
{
	/* ASSERT current == mdev->tconn->receiver ... */
	struct p_handshake *p = &tconn->data.sbuf.handshake;
	int ok;

	if (mutex_lock_interruptible(&tconn->data.mutex)) {
		conn_err(tconn, "interrupted during initial handshake\n");
		return 0; /* interrupted. not ok. */
	}

	if (tconn->data.socket == NULL) {
		mutex_unlock(&tconn->data.mutex);
		return 0;
	}

	memset(p, 0, sizeof(*p));
	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
	ok = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
			    &p->head, sizeof(*p), 0);
	mutex_unlock(&tconn->data.mutex);
	return ok;
}
/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 */
static int drbd_do_handshake(struct drbd_tconn *tconn)
{
	/* ASSERT current == tconn->receiver ... */
	struct p_handshake *p = &tconn->data.rbuf.handshake;
	const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
	struct packet_info pi;
	int rv;

	rv = drbd_send_handshake(tconn);
	if (!rv)
		return 0;

	rv = drbd_recv_header(tconn, &pi);
	if (!rv)
		return 0;

	if (pi.cmd != P_HAND_SHAKE) {
		conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
		     cmdname(pi.cmd), pi.cmd);
		return -1;
	}

	if (pi.size != expect) {
		conn_err(tconn, "expected HandShake length: %u, received: %u\n",
		     expect, pi.size);
		return -1;
	}

	rv = drbd_recv(tconn, &p->head.payload, expect);

	if (rv != expect) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read receiving handshake packet: l=%u\n", rv);
		return 0;
	}

	p->protocol_min = be32_to_cpu(p->protocol_min);
	p->protocol_max = be32_to_cpu(p->protocol_max);
	if (p->protocol_max == 0)
		p->protocol_max = p->protocol_min;

	if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;

	tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);

	conn_info(tconn, "Handshake successful: "
	     "Agreed network protocol version %d\n", tconn->agreed_pro_version);

	return 1;

 incompat:
	conn_err(tconn, "incompatible DRBD dialects: "
	    "I support %d-%d, peer supports %d-%d\n",
	    PRO_VERSION_MIN, PRO_VERSION_MAX,
	    p->protocol_min, p->protocol_max);
	return -1;
}
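/*
 * Editor's note, version negotiation example for drbd_do_handshake() above:
 * assuming, for illustration, PRO_VERSION_MIN..PRO_VERSION_MAX = 86..96
 * (not necessarily this tree's actual constants), a peer announcing min=90
 * max=100 overlaps our range and agreed_pro_version = min(96, 100) = 96.
 * A peer announcing min=97 falls outside our range and we take the
 * incompat path instead.
 */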
#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
static int drbd_do_auth(struct drbd_tconn *tconn)
{
	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN 64

/* Return value:
	1 - auth succeeded,
	0 - failed, try again (network error),
	-1 - auth failed, don't try again.
*/

static int drbd_do_auth(struct drbd_tconn *tconn)
{
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	struct scatterlist sg;
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len = strlen(tconn->net_conf->shared_secret);
	unsigned int resp_size;
	struct hash_desc desc;
	struct packet_info pi;
	int rv;

	desc.tfm = tconn->cram_hmac_tfm;
	desc.flags = 0;

	rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
				(u8 *)tconn->net_conf->shared_secret, key_len);
	if (rv) {
		conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	rv = conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	rv = drbd_recv_header(tconn, &pi);
	if (!rv)
		goto fail;

	if (pi.cmd != P_AUTH_CHALLENGE) {
		conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
		    cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size > CHALLENGE_LEN * 2) {
		conn_err(tconn, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		conn_err(tconn, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	rv = drbd_recv(tconn, peers_ch, pi.size);

	if (rv != pi.size) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read AuthChallenge: l=%u\n", rv);
		rv = 0;
		goto fail;
	}

	resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		conn_err(tconn, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	sg_init_table(&sg, 1);
	sg_set_buf(&sg, peers_ch, pi.size);

	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
	if (rv) {
		conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
	if (!rv)
		goto fail;

	rv = drbd_recv_header(tconn, &pi);
	if (!rv)
		goto fail;

	if (pi.cmd != P_AUTH_RESPONSE) {
		conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
			cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		conn_err(tconn, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	rv = drbd_recv(tconn, response, resp_size);

	if (rv != resp_size) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read receiving AuthResponse: l=%u\n", rv);
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		conn_err(tconn, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);

	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
	if (rv) {
		conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
		     resp_size, tconn->net_conf->cram_hmac_alg);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);

	return rv;
}
#endif
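/*
 * Editor's sketch of the symmetric CRAM-HMAC exchange implemented above
 * (hmac() below stands in for crypto_hash_digest(); not driver code):
 *
 *   A -> B : P_AUTH_CHALLENGE, 64 random bytes (my_challenge)
 *   B -> A : P_AUTH_CHALLENGE, B's own random challenge
 *   A -> B : P_AUTH_RESPONSE  = hmac(shared_secret, B's challenge)
 *   B -> A : P_AUTH_RESPONSE  = hmac(shared_secret, A's challenge)
 *
 * Each side then verifies the received response against
 * hmac(shared_secret, my_challenge) with memcmp(); both peers run the same
 * drbd_do_auth(), so the exchange is fully symmetric.
 */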
int drbdd_init(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	int h;

	conn_info(tconn, "receiver (re)started\n");

	do {
		h = drbd_connect(tconn);
		if (h == 0) {
			drbd_disconnect(tconn);
			schedule_timeout_interruptible(HZ);
		}
		if (h == -1) {
			conn_warn(tconn, "Discarding network configuration.\n");
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	} while (h == 0);

	if (get_net_conf(tconn)) {
		drbdd(tconn);
		put_net_conf(tconn);
	}

	drbd_disconnect(tconn);

	conn_info(tconn, "receiver terminated\n");
	return 0;
}
/* ********* acknowledge sender ******** */
static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
	struct drbd_tconn *tconn = mdev->tconn;

	int retcode = be32_to_cpu(p->retcode);

	if (cmd == P_STATE_CHG_REPLY) {
		if (retcode >= SS_SUCCESS) {
			set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
		} else {
			set_bit(CL_ST_CHG_FAIL, &mdev->flags);
			dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
				drbd_set_st_err_str(retcode), retcode);
		}
		wake_up(&mdev->state_wait);
	} else /* conn == P_CONN_ST_CHG_REPLY */ {
		if (retcode >= SS_SUCCESS) {
			set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
		} else {
			set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
			conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
				 drbd_set_st_err_str(retcode), retcode);
		}
		wake_up(&tconn->ping_wait);
	}

	return true;
}
static int got_Ping(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	return drbd_send_ping_ack(mdev->tconn);
}
static int got_PingAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct drbd_tconn *tconn = mdev->tconn;

	/* restore idle timeout */
	tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
		wake_up(&tconn->ping_wait);

	return true;
}
static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	D_ASSERT(mdev->tconn->agreed_pro_version >= 89);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, sector);
		drbd_set_in_sync(mdev, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(mdev);
	}
	dec_rs_pending(mdev);
	atomic_add(blksize >> 9, &mdev->rs_sect_in);

	return true;
}
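/*
 * Units in the two updates above, spelled out (editorial note,
 * assuming the usual BM_BLOCK_SHIFT of 12, i.e. 4 KiB bitmap
 * granularity): a blksize of 4096 bytes adds 4096 >> 12 = 1 to
 * rs_same_csum, and 4096 >> 9 = 8 512-byte sectors to rs_sect_in.
 */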
static int
validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&mdev->tconn->req_lock);
		return false;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (m.bio)
		complete_master_bio(mdev, &m);
	return true;
}
static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		drbd_set_in_sync(mdev, sector, blksize);
		dec_rs_pending(mdev);
		return true;
	}
	switch (cmd) {
	case P_RS_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
		what = RECV_ACKED_BY_PEER;
		break;
	case P_DISCARD_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = CONFLICT_DISCARDED_BY_PEER;
		break;
	default:
		D_ASSERT(0);
		return false;
	}

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->write_requests, __func__,
					     what, false);
}
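/*
 * Quick reference for the switch above (editorial note): protocol B
 * peers acknowledge mere reception (P_RECV_ACK); protocol C peers
 * acknowledge stable writes (P_WRITE_ACK, or P_RS_WRITE_ACK during
 * resync, which additionally marks the block in sync) and resolve
 * write conflicts with P_DISCARD_ACK; protocol A sends no write acks
 * at all, only P_NEG_ACKs on failure (see got_NegAck() below).
 */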
static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
			  mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
	bool found;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(mdev);
		drbd_rs_failed_io(mdev, sector, size);
		return true;
	}

	found = validate_req_change_req_state(mdev, p->block_id, sector,
					      &mdev->write_requests, __func__,
					      NEG_ACKED, missing_ok);
	if (!found) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		if (!missing_ok)
			return false;
		drbd_set_out_of_sync(mdev, sector, size);
	}
	return true;
}
static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
		(unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->read_requests, __func__,
					     NEG_ACKED, false);
}
static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	sector_t sector;
	int size;
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dec_rs_pending(mdev);

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, sector);
		switch (cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(mdev, sector, size);
		case P_RS_CANCEL:
			break;
		default:
			D_ASSERT(0);
		}
		put_ldev(mdev);
	}

	return true;
}
static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;

	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));

	if (mdev->state.conn == C_AHEAD &&
	    atomic_read(&mdev->ap_in_flight) == 0 &&
	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
		mdev->start_resync_timer.expires = jiffies + HZ;
		add_timer(&mdev->start_resync_timer);
	}

	return true;
}
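/*
 * The conditional above implements the delayed Ahead -> SyncSource
 * transition: once a barrier ack finds the device in C_AHEAD with no
 * application writes in flight, the resync timer is armed to fire one
 * second (HZ jiffies) later; the AHEAD_TO_SYNC_SOURCE bit keeps it
 * from being armed twice.
 */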
static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	struct drbd_work *w;
	sector_t sector;
	int size;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_oos_found(mdev, sector, size);
	else
		ov_oos_print(mdev);

	if (!get_ldev(mdev))
		return true;

	drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			drbd_queue_work_front(&mdev->tconn->data.work, w);
		} else {
			dev_err(DEV, "kmalloc(w) failed.");
			ov_oos_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	put_ldev(mdev);
	return true;
}
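/*
 * About the 0x200 mask above (editorial note, assuming the typical
 * 4 KiB covered per verify reply): ov_left drops by one per reply, so
 * bit 9 is set during every other run of 512 replies, i.e. roughly
 * every other 2 MiB of verified data; that is the "every other
 * megabyte" heuristic for advancing the progress step marks.
 */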
static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	return true;
}

struct asender_cmd {
	size_t pkt_size;
	int (*process)(struct drbd_conf *mdev, enum drbd_packet cmd);
};
static struct asender_cmd *get_asender_cmd(int cmd)
{
	static struct asender_cmd asender_tbl[] = {
		/* anything missing from this table is in
		 * the drbd_cmd_handler (drbd_default_handler) table,
		 * see the beginning of drbdd() */
	[P_PING]	    = { sizeof(struct p_header), got_Ping },
	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_RqSReply },
	[P_MAX_CMD]	    = { 0, NULL },
	};
	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
		return NULL;
	return &asender_tbl[cmd];
}
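/*
 * The designated initializers above leave gaps for unlisted command
 * numbers; since asender_tbl has static storage duration those gaps
 * are zero-filled, their .process pointer is NULL, and the check in
 * get_asender_cmd() therefore rejects any command the asender does
 * not handle.
 */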
static int _drbd_process_done_ee(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	return !drbd_process_done_ee(mdev);
}

static int _check_ee_empty(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	struct drbd_tconn *tconn = mdev->tconn;
	int not_empty;

	spin_lock_irq(&tconn->req_lock);
	not_empty = !list_empty(&mdev->done_ee);
	spin_unlock_irq(&tconn->req_lock);

	return not_empty;
}

static int tconn_process_done_ee(struct drbd_tconn *tconn)
{
	int not_empty, err;

	do {
		clear_bit(SIGNAL_ASENDER, &tconn->flags);
		flush_signals(current);
		err = idr_for_each(&tconn->volumes, _drbd_process_done_ee, NULL);
		if (err)
			return err;
		set_bit(SIGNAL_ASENDER, &tconn->flags);
		not_empty = idr_for_each(&tconn->volumes, _check_ee_empty, NULL);
	} while (not_empty);

	return 0;
}
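/*
 * Why the helpers return what they do: idr_for_each() stops iterating
 * as soon as a callback returns nonzero.  _drbd_process_done_ee()
 * therefore inverts drbd_process_done_ee()'s success value, so that a
 * failing volume aborts the scan with an error while successful ones
 * keep it going, and _check_ee_empty() aborts on the first volume
 * whose done_ee list is still populated, which makes the do/while
 * loop above retry.
 */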
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	struct p_header *h = &tconn->meta.rbuf.header;
	struct asender_cmd *cmd = NULL;
	struct packet_info pi;
	int rv;
	void *buf    = h;
	int received = 0;
	int expect   = sizeof(struct p_header);
	int ping_timeout_active = 0;

	current->policy = SCHED_RR;  /* Make this a realtime task! */
	current->rt_priority = 2;    /* more important than all other tasks */

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);
		if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
			if (!drbd_send_ping(tconn)) {
				conn_err(tconn, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			/* ping_timeo is configured in tenths of a second,
			 * hence *HZ/10 to convert to jiffies */
			tconn->meta.socket->sk->sk_rcvtimeo =
				tconn->net_conf->ping_timeo*HZ/10;
			ping_timeout_active = 1;
		}

		/* TODO: conditionally cork; it may hurt latency if we cork without
		   much to send */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_cork(tconn->meta.socket);
		if (tconn_process_done_ee(tconn))
			goto reconnect;
		/* but unconditionally uncork unless disabled */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_uncork(tconn->meta.socket);

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &tconn->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS	 (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			conn_err(tconn, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(tconn->last_received,
				jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				conn_err(tconn, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &tconn->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (!decode_header(tconn, h, &pi))
				goto reconnect;
			cmd = get_asender_cmd(pi.cmd);
			if (unlikely(cmd == NULL)) {
				conn_err(tconn, "unknown command %d on meta (l: %d)\n",
					 pi.cmd, pi.size);
				goto disconnect;
			}
			expect = cmd->pkt_size;
			if (pi.size != expect - sizeof(struct p_header)) {
				conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
					 pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			tconn->last_received = jiffies;
			if (!cmd->process(vnr_to_mdev(tconn, pi.vnr), pi.cmd))
				goto reconnect;

			/* the idle_timeout (ping-int)
			 * has been restored in got_PingAck() */
			if (cmd == get_asender_cmd(P_PING_ACK))
				ping_timeout_active = 0;

			buf	 = h;
			received = 0;
			expect	 = sizeof(struct p_header);
			cmd	 = NULL;
		}
	}

	if (0) {
reconnect:
		conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
	}
	if (0) {
disconnect:
		conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	clear_bit(SIGNAL_ASENDER, &tconn->flags);

	conn_info(tconn, "asender terminated\n");

	return 0;
}