4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
28 #include <asm/uaccess.h>
31 #include <linux/drbd.h>
33 #include <linux/file.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
64 static int drbd_do_features(struct drbd_tconn
*tconn
);
65 static int drbd_do_auth(struct drbd_tconn
*tconn
);
66 static int drbd_disconnected(int vnr
, void *p
, void *data
);
68 static enum finish_epoch
drbd_may_finish_epoch(struct drbd_conf
*, struct drbd_epoch
*, enum epoch_event
);
69 static int e_end_block(struct drbd_work
*, int);
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
75 * some helper functions to deal with single linked page lists,
76 * page->private being our "next" pointer.
79 /* If at least n pages are linked at head, get n pages off.
80 * Otherwise, don't modify head, and return NULL.
81 * Locking is the responsibility of the caller.
83 static struct page
*page_chain_del(struct page
**head
, int n
)
97 tmp
= page_chain_next(page
);
99 break; /* found sufficient pages */
101 /* insufficient pages, don't use any of them. */
106 /* add end of list marker for the returned list */
107 set_page_private(page
, 0);
108 /* actual return value, and adjustment of head */
114 /* may be used outside of locks to find the tail of a (usually short)
115 * "private" page chain, before adding it back to a global chain head
116 * with page_chain_add() under a spinlock. */
117 static struct page
*page_chain_tail(struct page
*page
, int *len
)
121 while ((tmp
= page_chain_next(page
)))
128 static int page_chain_free(struct page
*page
)
132 page_chain_for_each_safe(page
, tmp
) {
139 static void page_chain_add(struct page
**head
,
140 struct page
*chain_first
, struct page
*chain_last
)
144 tmp
= page_chain_tail(chain_first
, NULL
);
145 BUG_ON(tmp
!= chain_last
);
148 /* add chain to head */
149 set_page_private(chain_last
, (unsigned long)*head
);
153 static struct page
*drbd_pp_first_pages_or_try_alloc(struct drbd_conf
*mdev
, int number
)
155 struct page
*page
= NULL
;
156 struct page
*tmp
= NULL
;
159 /* Yes, testing drbd_pp_vacant outside the lock is racy.
160 * So what. It saves a spin_lock. */
161 if (drbd_pp_vacant
>= number
) {
162 spin_lock(&drbd_pp_lock
);
163 page
= page_chain_del(&drbd_pp_pool
, number
);
165 drbd_pp_vacant
-= number
;
166 spin_unlock(&drbd_pp_lock
);
171 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
172 * "criss-cross" setup, that might cause write-out on some other DRBD,
173 * which in turn might block on the other node at this very place. */
174 for (i
= 0; i
< number
; i
++) {
175 tmp
= alloc_page(GFP_TRY
);
178 set_page_private(tmp
, (unsigned long)page
);
185 /* Not enough pages immediately available this time.
186 * No need to jump around here, drbd_pp_alloc will retry this
187 * function "soon". */
189 tmp
= page_chain_tail(page
, NULL
);
190 spin_lock(&drbd_pp_lock
);
191 page_chain_add(&drbd_pp_pool
, page
, tmp
);
193 spin_unlock(&drbd_pp_lock
);
/* reclaim_net_ee() - collect finished net_ee peer requests for freeing.
 * Walks mdev->net_ee (caller must hold tconn->req_lock - see the callers
 * drbd_kick_lo_and_reclaim_net()/drbd_process_done_ee() below) and moves
 * entries whose pages are no longer referenced onto @to_be_freed.
 * NOTE(review): extraction artifact - original lines 199, 202, 207, 211,
 * 213-215 are missing from this view; line 211 is presumably a `break;`
 * taken when the first still-active entry is found (the comment below
 * says scanning stops at the first unfinished entry) - verify against
 * the full source. */
198 static void reclaim_net_ee(struct drbd_conf
*mdev
, struct list_head
*to_be_freed
)
200 struct drbd_peer_request
*peer_req
;
201 struct list_head
*le
, *tle
;
203 /* The EEs are always appended to the end of the list. Since
204 they are sent in order over the wire, they have to finish
205 in order. As soon as we see the first not finished we can
206 stop to examine the list... */
208 list_for_each_safe(le
, tle
, &mdev
->net_ee
) {
209 peer_req
= list_entry(le
, struct drbd_peer_request
, w
.list
);
/* entry still has pages in flight: stop scanning (see comment above) */
210 if (drbd_ee_has_active_page(peer_req
))
/* finished entry: hand it to the caller for freeing */
212 list_move(le
, to_be_freed
);
/* drbd_kick_lo_and_reclaim_net() - free all finished net_ee peer requests.
 * Takes tconn->req_lock to snapshot the reclaimable entries via
 * reclaim_net_ee(), drops the lock, then frees each collected entry with
 * drbd_free_net_peer_req() outside the lock.
 * NOTE(review): extraction artifact - original lines 217, 220, 224 and
 * 227-228 (braces/blank lines) are missing from this view. */
216 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf
*mdev
)
218 LIST_HEAD(reclaimed
);
219 struct drbd_peer_request
*peer_req
, *t
;
/* collect under req_lock; reclaim_net_ee() requires it held */
221 spin_lock_irq(&mdev
->tconn
->req_lock
);
222 reclaim_net_ee(mdev
, &reclaimed
);
223 spin_unlock_irq(&mdev
->tconn
->req_lock
);
/* free outside the lock; safe because entries are now on a private list */
225 list_for_each_entry_safe(peer_req
, t
, &reclaimed
, w
.list
)
226 drbd_free_net_peer_req(mdev
, peer_req
);
230 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
231 * @mdev: DRBD device.
232 * @number: number of pages requested
233 * @retry: whether to retry, if not enough pages are available right now
235 * Tries to allocate number pages, first from our own page pool, then from
236 * the kernel, unless this allocation would exceed the max_buffers setting.
237 * Possibly retry until DRBD frees sufficient pages somewhere else.
239 * Returns a page chain linked via page->private.
241 static struct page
*drbd_pp_alloc(struct drbd_conf
*mdev
, unsigned number
, bool retry
)
243 struct page
*page
= NULL
;
246 /* Yes, we may run up to @number over max_buffers. If we
247 * follow it strictly, the admin will get it wrong anyways. */
248 if (atomic_read(&mdev
->pp_in_use
) < mdev
->tconn
->net_conf
->max_buffers
)
249 page
= drbd_pp_first_pages_or_try_alloc(mdev
, number
);
251 while (page
== NULL
) {
252 prepare_to_wait(&drbd_pp_wait
, &wait
, TASK_INTERRUPTIBLE
);
254 drbd_kick_lo_and_reclaim_net(mdev
);
256 if (atomic_read(&mdev
->pp_in_use
) < mdev
->tconn
->net_conf
->max_buffers
) {
257 page
= drbd_pp_first_pages_or_try_alloc(mdev
, number
);
265 if (signal_pending(current
)) {
266 dev_warn(DEV
, "drbd_pp_alloc interrupted!\n");
272 finish_wait(&drbd_pp_wait
, &wait
);
275 atomic_add(number
, &mdev
->pp_in_use
);
279 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
280 * Is also used from inside an other spin_lock_irq(&mdev->tconn->req_lock);
281 * Either links the page chain back to the global pool,
282 * or returns all pages to the system. */
283 static void drbd_pp_free(struct drbd_conf
*mdev
, struct page
*page
, int is_net
)
285 atomic_t
*a
= is_net
? &mdev
->pp_in_use_by_net
: &mdev
->pp_in_use
;
288 if (drbd_pp_vacant
> (DRBD_MAX_BIO_SIZE
/PAGE_SIZE
) * minor_count
)
289 i
= page_chain_free(page
);
292 tmp
= page_chain_tail(page
, &i
);
293 spin_lock(&drbd_pp_lock
);
294 page_chain_add(&drbd_pp_pool
, page
, tmp
);
296 spin_unlock(&drbd_pp_lock
);
298 i
= atomic_sub_return(i
, a
);
300 dev_warn(DEV
, "ASSERTION FAILED: %s: %d < 0\n",
301 is_net
? "pp_in_use_by_net" : "pp_in_use", i
);
302 wake_up(&drbd_pp_wait
);
306 You need to hold the req_lock:
307 _drbd_wait_ee_list_empty()
309 You must not have the req_lock:
311 drbd_alloc_peer_req()
314 drbd_process_done_ee()
316 drbd_wait_ee_list_empty()
319 struct drbd_peer_request
*
320 drbd_alloc_peer_req(struct drbd_conf
*mdev
, u64 id
, sector_t sector
,
321 unsigned int data_size
, gfp_t gfp_mask
) __must_hold(local
)
323 struct drbd_peer_request
*peer_req
;
325 unsigned nr_pages
= (data_size
+ PAGE_SIZE
-1) >> PAGE_SHIFT
;
327 if (drbd_insert_fault(mdev
, DRBD_FAULT_AL_EE
))
330 peer_req
= mempool_alloc(drbd_ee_mempool
, gfp_mask
& ~__GFP_HIGHMEM
);
332 if (!(gfp_mask
& __GFP_NOWARN
))
333 dev_err(DEV
, "%s: allocation failed\n", __func__
);
337 page
= drbd_pp_alloc(mdev
, nr_pages
, (gfp_mask
& __GFP_WAIT
));
341 drbd_clear_interval(&peer_req
->i
);
342 peer_req
->i
.size
= data_size
;
343 peer_req
->i
.sector
= sector
;
344 peer_req
->i
.local
= false;
345 peer_req
->i
.waiting
= false;
347 peer_req
->epoch
= NULL
;
348 peer_req
->w
.mdev
= mdev
;
349 peer_req
->pages
= page
;
350 atomic_set(&peer_req
->pending_bios
, 0);
353 * The block_id is opaque to the receiver. It is not endianness
354 * converted, and sent back to the sender unchanged.
356 peer_req
->block_id
= id
;
361 mempool_free(peer_req
, drbd_ee_mempool
);
365 void __drbd_free_peer_req(struct drbd_conf
*mdev
, struct drbd_peer_request
*peer_req
,
368 if (peer_req
->flags
& EE_HAS_DIGEST
)
369 kfree(peer_req
->digest
);
370 drbd_pp_free(mdev
, peer_req
->pages
, is_net
);
371 D_ASSERT(atomic_read(&peer_req
->pending_bios
) == 0);
372 D_ASSERT(drbd_interval_empty(&peer_req
->i
));
373 mempool_free(peer_req
, drbd_ee_mempool
);
376 int drbd_release_ee(struct drbd_conf
*mdev
, struct list_head
*list
)
378 LIST_HEAD(work_list
);
379 struct drbd_peer_request
*peer_req
, *t
;
381 int is_net
= list
== &mdev
->net_ee
;
383 spin_lock_irq(&mdev
->tconn
->req_lock
);
384 list_splice_init(list
, &work_list
);
385 spin_unlock_irq(&mdev
->tconn
->req_lock
);
387 list_for_each_entry_safe(peer_req
, t
, &work_list
, w
.list
) {
388 __drbd_free_peer_req(mdev
, peer_req
, is_net
);
395 /* See also comments in _req_mod(,BARRIER_ACKED)
396 * and receive_Barrier.
398 * Move entries from net_ee to done_ee, if ready.
399 * Grab done_ee, call all callbacks, free the entries.
400 * The callbacks typically send out ACKs.
402 static int drbd_process_done_ee(struct drbd_conf
*mdev
)
404 LIST_HEAD(work_list
);
405 LIST_HEAD(reclaimed
);
406 struct drbd_peer_request
*peer_req
, *t
;
409 spin_lock_irq(&mdev
->tconn
->req_lock
);
410 reclaim_net_ee(mdev
, &reclaimed
);
411 list_splice_init(&mdev
->done_ee
, &work_list
);
412 spin_unlock_irq(&mdev
->tconn
->req_lock
);
414 list_for_each_entry_safe(peer_req
, t
, &reclaimed
, w
.list
)
415 drbd_free_net_peer_req(mdev
, peer_req
);
417 /* possible callbacks here:
418 * e_end_block, and e_end_resync_block, e_send_discard_write.
419 * all ignore the last argument.
421 list_for_each_entry_safe(peer_req
, t
, &work_list
, w
.list
) {
424 /* list_del not necessary, next/prev members not touched */
425 err2
= peer_req
->w
.cb(&peer_req
->w
, !!err
);
428 drbd_free_peer_req(mdev
, peer_req
);
430 wake_up(&mdev
->ee_wait
);
/* _drbd_wait_ee_list_empty() - wait until @head is empty.
 * Caller must hold tconn->req_lock (see the "You need to hold the
 * req_lock" comment block earlier in this file).  The lock is dropped
 * while sleeping and re-acquired before re-checking the list, so the
 * lock is held again on return.
 * NOTE(review): extraction artifact - original lines 436-438 (opening
 * brace and presumably a DEFINE_WAIT(wait) for the `wait` used below)
 * and line 444 (presumably schedule()) are missing from this view -
 * verify against the full source. */
435 void _drbd_wait_ee_list_empty(struct drbd_conf
*mdev
, struct list_head
*head
)
439 /* avoids spin_lock/unlock
440 * and calling prepare_to_wait in the fast path */
441 while (!list_empty(head
)) {
442 prepare_to_wait(&mdev
->ee_wait
, &wait
, TASK_UNINTERRUPTIBLE
);
/* drop the lock before sleeping so completions can make progress */
443 spin_unlock_irq(&mdev
->tconn
->req_lock
);
445 finish_wait(&mdev
->ee_wait
, &wait
);
/* re-acquire before the loop re-tests list_empty(head) */
446 spin_lock_irq(&mdev
->tconn
->req_lock
);
/* drbd_wait_ee_list_empty() - locking wrapper around
 * _drbd_wait_ee_list_empty().  Acquires tconn->req_lock (which the
 * inner function requires held), waits until @head is empty, then
 * releases the lock.  Must be called WITHOUT req_lock held (see the
 * "You must not have the req_lock" comment block earlier in this file).
 * NOTE(review): extraction artifact - original lines 451 and 455
 * (function braces) are missing from this view. */
450 void drbd_wait_ee_list_empty(struct drbd_conf
*mdev
, struct list_head
*head
)
452 spin_lock_irq(&mdev
->tconn
->req_lock
);
453 _drbd_wait_ee_list_empty(mdev
, head
);
454 spin_unlock_irq(&mdev
->tconn
->req_lock
);
457 /* see also kernel_accept; which is only present since 2.6.18.
458 * also we want to log which part of it failed, exactly */
459 static int drbd_accept(const char **what
, struct socket
*sock
, struct socket
**newsock
)
461 struct sock
*sk
= sock
->sk
;
465 err
= sock
->ops
->listen(sock
, 5);
469 *what
= "sock_create_lite";
470 err
= sock_create_lite(sk
->sk_family
, sk
->sk_type
, sk
->sk_protocol
,
476 err
= sock
->ops
->accept(sock
, *newsock
, 0);
478 sock_release(*newsock
);
482 (*newsock
)->ops
= sock
->ops
;
488 static int drbd_recv_short(struct socket
*sock
, void *buf
, size_t size
, int flags
)
495 struct msghdr msg
= {
497 .msg_iov
= (struct iovec
*)&iov
,
498 .msg_flags
= (flags
? flags
: MSG_WAITALL
| MSG_NOSIGNAL
)
504 rv
= sock_recvmsg(sock
, &msg
, size
, msg
.msg_flags
);
510 static int drbd_recv(struct drbd_tconn
*tconn
, void *buf
, size_t size
)
517 struct msghdr msg
= {
519 .msg_iov
= (struct iovec
*)&iov
,
520 .msg_flags
= MSG_WAITALL
| MSG_NOSIGNAL
528 rv
= sock_recvmsg(tconn
->data
.socket
, &msg
, size
, msg
.msg_flags
);
533 * ECONNRESET other side closed the connection
534 * ERESTARTSYS (on sock) we got a signal
538 if (rv
== -ECONNRESET
)
539 conn_info(tconn
, "sock was reset by peer\n");
540 else if (rv
!= -ERESTARTSYS
)
541 conn_err(tconn
, "sock_recvmsg returned %d\n", rv
);
543 } else if (rv
== 0) {
544 conn_info(tconn
, "sock was shut down by peer\n");
547 /* signal came in, or peer/link went down,
548 * after we read a partial message
550 /* D_ASSERT(signal_pending(current)); */
558 conn_request_state(tconn
, NS(conn
, C_BROKEN_PIPE
), CS_HARD
);
563 static int drbd_recv_all(struct drbd_tconn
*tconn
, void *buf
, size_t size
)
567 err
= drbd_recv(tconn
, buf
, size
);
/* drbd_recv_all_warn() - drbd_recv_all() plus a warning on short read.
 * Receives exactly @size bytes into @buf via drbd_recv_all(); on failure
 * logs a warning unless the failure was caused by a pending signal
 * (signal-driven teardown is expected and not worth a log line).
 * NOTE(review): extraction artifact - original lines 577-579 (opening
 * brace / declaration of err) and 583-585 (presumably `return err;` and
 * closing brace) are missing from this view - verify against the full
 * source. */
576 static int drbd_recv_all_warn(struct drbd_tconn
*tconn
, void *buf
, size_t size
)
580 err
= drbd_recv_all(tconn
, buf
, size
);
/* only warn when the short read was NOT due to a signal */
581 if (err
&& !signal_pending(current
))
582 conn_warn(tconn
, "short read (expected size %d)\n", (int)size
);
587 * On individual connections, the socket buffer size must be set prior to the
588 * listen(2) or connect(2) calls in order to have it take effect.
589 * This is our wrapper to do so.
591 static void drbd_setbufsize(struct socket
*sock
, unsigned int snd
,
594 /* open coded SO_SNDBUF, SO_RCVBUF */
596 sock
->sk
->sk_sndbuf
= snd
;
597 sock
->sk
->sk_userlocks
|= SOCK_SNDBUF_LOCK
;
600 sock
->sk
->sk_rcvbuf
= rcv
;
601 sock
->sk
->sk_userlocks
|= SOCK_RCVBUF_LOCK
;
605 static struct socket
*drbd_try_connect(struct drbd_tconn
*tconn
)
609 struct sockaddr_in6 src_in6
;
611 int disconnect_on_error
= 1;
613 if (!get_net_conf(tconn
))
616 what
= "sock_create_kern";
617 err
= sock_create_kern(((struct sockaddr
*)tconn
->net_conf
->my_addr
)->sa_family
,
618 SOCK_STREAM
, IPPROTO_TCP
, &sock
);
624 sock
->sk
->sk_rcvtimeo
=
625 sock
->sk
->sk_sndtimeo
= tconn
->net_conf
->try_connect_int
*HZ
;
626 drbd_setbufsize(sock
, tconn
->net_conf
->sndbuf_size
,
627 tconn
->net_conf
->rcvbuf_size
);
629 /* explicitly bind to the configured IP as source IP
630 * for the outgoing connections.
631 * This is needed for multihomed hosts and to be
632 * able to use lo: interfaces for drbd.
633 * Make sure to use 0 as port number, so linux selects
634 * a free one dynamically.
636 memcpy(&src_in6
, tconn
->net_conf
->my_addr
,
637 min_t(int, tconn
->net_conf
->my_addr_len
, sizeof(src_in6
)));
638 if (((struct sockaddr
*)tconn
->net_conf
->my_addr
)->sa_family
== AF_INET6
)
639 src_in6
.sin6_port
= 0;
641 ((struct sockaddr_in
*)&src_in6
)->sin_port
= 0; /* AF_INET & AF_SCI */
643 what
= "bind before connect";
644 err
= sock
->ops
->bind(sock
,
645 (struct sockaddr
*) &src_in6
,
646 tconn
->net_conf
->my_addr_len
);
650 /* connect may fail, peer not yet available.
651 * stay C_WF_CONNECTION, don't go Disconnecting! */
652 disconnect_on_error
= 0;
654 err
= sock
->ops
->connect(sock
,
655 (struct sockaddr
*)tconn
->net_conf
->peer_addr
,
656 tconn
->net_conf
->peer_addr_len
, 0);
665 /* timeout, busy, signal pending */
666 case ETIMEDOUT
: case EAGAIN
: case EINPROGRESS
:
667 case EINTR
: case ERESTARTSYS
:
668 /* peer not (yet) available, network problem */
669 case ECONNREFUSED
: case ENETUNREACH
:
670 case EHOSTDOWN
: case EHOSTUNREACH
:
671 disconnect_on_error
= 0;
674 conn_err(tconn
, "%s failed, err = %d\n", what
, err
);
676 if (disconnect_on_error
)
677 conn_request_state(tconn
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
683 static struct socket
*drbd_wait_for_connect(struct drbd_tconn
*tconn
)
686 struct socket
*s_estab
= NULL
, *s_listen
;
689 if (!get_net_conf(tconn
))
692 what
= "sock_create_kern";
693 err
= sock_create_kern(((struct sockaddr
*)tconn
->net_conf
->my_addr
)->sa_family
,
694 SOCK_STREAM
, IPPROTO_TCP
, &s_listen
);
700 timeo
= tconn
->net_conf
->try_connect_int
* HZ
;
701 timeo
+= (random32() & 1) ? timeo
/ 7 : -timeo
/ 7; /* 28.5% random jitter */
703 s_listen
->sk
->sk_reuse
= 1; /* SO_REUSEADDR */
704 s_listen
->sk
->sk_rcvtimeo
= timeo
;
705 s_listen
->sk
->sk_sndtimeo
= timeo
;
706 drbd_setbufsize(s_listen
, tconn
->net_conf
->sndbuf_size
,
707 tconn
->net_conf
->rcvbuf_size
);
709 what
= "bind before listen";
710 err
= s_listen
->ops
->bind(s_listen
,
711 (struct sockaddr
*) tconn
->net_conf
->my_addr
,
712 tconn
->net_conf
->my_addr_len
);
716 err
= drbd_accept(&what
, s_listen
, &s_estab
);
720 sock_release(s_listen
);
722 if (err
!= -EAGAIN
&& err
!= -EINTR
&& err
!= -ERESTARTSYS
) {
723 conn_err(tconn
, "%s failed, err = %d\n", what
, err
);
724 conn_request_state(tconn
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
732 static int decode_header(struct drbd_tconn
*, void *, struct packet_info
*);
734 static int send_first_packet(struct drbd_tconn
*tconn
, struct drbd_socket
*sock
,
735 enum drbd_packet cmd
)
737 if (!conn_prepare_command(tconn
, sock
))
739 return conn_send_command(tconn
, sock
, cmd
, 0, NULL
, 0);
742 static int receive_first_packet(struct drbd_tconn
*tconn
, struct socket
*sock
)
744 unsigned int header_size
= drbd_header_size(tconn
);
745 struct packet_info pi
;
748 err
= drbd_recv_short(sock
, tconn
->data
.rbuf
, header_size
, 0);
749 if (err
!= header_size
) {
754 err
= decode_header(tconn
, tconn
->data
.rbuf
, &pi
);
761 * drbd_socket_okay() - Free the socket if its connection is not okay
762 * @sock: pointer to the pointer to the socket.
764 static int drbd_socket_okay(struct socket
**sock
)
772 rr
= drbd_recv_short(*sock
, tb
, 4, MSG_DONTWAIT
| MSG_PEEK
);
774 if (rr
> 0 || rr
== -EAGAIN
) {
782 /* Gets called if a connection is established, or if a new minor gets created
784 int drbd_connected(int vnr
, void *p
, void *data
)
786 struct drbd_conf
*mdev
= (struct drbd_conf
*)p
;
789 atomic_set(&mdev
->packet_seq
, 0);
792 mdev
->state_mutex
= mdev
->tconn
->agreed_pro_version
< 100 ?
793 &mdev
->tconn
->cstate_mutex
:
794 &mdev
->own_state_mutex
;
796 err
= drbd_send_sync_param(mdev
);
798 err
= drbd_send_sizes(mdev
, 0, 0);
800 err
= drbd_send_uuids(mdev
);
802 err
= drbd_send_state(mdev
);
803 clear_bit(USE_DEGR_WFC_T
, &mdev
->flags
);
804 clear_bit(RESIZE_PENDING
, &mdev
->flags
);
805 mod_timer(&mdev
->request_timer
, jiffies
+ HZ
); /* just start it here. */
811 * 1 yes, we have a valid connection
812 * 0 oops, did not work out, please try again
813 * -1 peer talks different language,
814 * no point in trying again, please go standalone.
815 * -2 We do not have a network config...
817 static int drbd_connect(struct drbd_tconn
*tconn
)
819 struct socket
*sock
, *msock
;
822 if (conn_request_state(tconn
, NS(conn
, C_WF_CONNECTION
), CS_VERBOSE
) < SS_SUCCESS
)
825 clear_bit(DISCARD_CONCURRENT
, &tconn
->flags
);
827 /* Assume that the peer only understands protocol 80 until we know better. */
828 tconn
->agreed_pro_version
= 80;
834 /* 3 tries, this should take less than a second! */
835 s
= drbd_try_connect(tconn
);
838 /* give the other side time to call bind() & listen() */
839 schedule_timeout_interruptible(HZ
/ 10);
843 if (!tconn
->data
.socket
) {
844 tconn
->data
.socket
= s
;
845 send_first_packet(tconn
, &tconn
->data
, P_INITIAL_DATA
);
846 } else if (!tconn
->meta
.socket
) {
847 tconn
->meta
.socket
= s
;
848 send_first_packet(tconn
, &tconn
->meta
, P_INITIAL_META
);
850 conn_err(tconn
, "Logic error in drbd_connect()\n");
851 goto out_release_sockets
;
855 if (tconn
->data
.socket
&& tconn
->meta
.socket
) {
856 schedule_timeout_interruptible(tconn
->net_conf
->ping_timeo
*HZ
/10);
857 ok
= drbd_socket_okay(&tconn
->data
.socket
);
858 ok
= drbd_socket_okay(&tconn
->meta
.socket
) && ok
;
864 s
= drbd_wait_for_connect(tconn
);
866 try = receive_first_packet(tconn
, s
);
867 drbd_socket_okay(&tconn
->data
.socket
);
868 drbd_socket_okay(&tconn
->meta
.socket
);
871 if (tconn
->data
.socket
) {
872 conn_warn(tconn
, "initial packet S crossed\n");
873 sock_release(tconn
->data
.socket
);
875 tconn
->data
.socket
= s
;
878 if (tconn
->meta
.socket
) {
879 conn_warn(tconn
, "initial packet M crossed\n");
880 sock_release(tconn
->meta
.socket
);
882 tconn
->meta
.socket
= s
;
883 set_bit(DISCARD_CONCURRENT
, &tconn
->flags
);
886 conn_warn(tconn
, "Error receiving initial packet\n");
893 if (tconn
->cstate
<= C_DISCONNECTING
)
894 goto out_release_sockets
;
895 if (signal_pending(current
)) {
896 flush_signals(current
);
898 if (get_t_state(&tconn
->receiver
) == EXITING
)
899 goto out_release_sockets
;
902 if (tconn
->data
.socket
&& &tconn
->meta
.socket
) {
903 ok
= drbd_socket_okay(&tconn
->data
.socket
);
904 ok
= drbd_socket_okay(&tconn
->meta
.socket
) && ok
;
910 sock
= tconn
->data
.socket
;
911 msock
= tconn
->meta
.socket
;
913 msock
->sk
->sk_reuse
= 1; /* SO_REUSEADDR */
914 sock
->sk
->sk_reuse
= 1; /* SO_REUSEADDR */
916 sock
->sk
->sk_allocation
= GFP_NOIO
;
917 msock
->sk
->sk_allocation
= GFP_NOIO
;
919 sock
->sk
->sk_priority
= TC_PRIO_INTERACTIVE_BULK
;
920 msock
->sk
->sk_priority
= TC_PRIO_INTERACTIVE
;
923 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
924 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
925 * first set it to the P_CONNECTION_FEATURES timeout,
926 * which we set to 4x the configured ping_timeout. */
927 sock
->sk
->sk_sndtimeo
=
928 sock
->sk
->sk_rcvtimeo
= tconn
->net_conf
->ping_timeo
*4*HZ
/10;
930 msock
->sk
->sk_sndtimeo
= tconn
->net_conf
->timeout
*HZ
/10;
931 msock
->sk
->sk_rcvtimeo
= tconn
->net_conf
->ping_int
*HZ
;
933 /* we don't want delays.
934 * we use TCP_CORK where appropriate, though */
935 drbd_tcp_nodelay(sock
);
936 drbd_tcp_nodelay(msock
);
938 tconn
->last_received
= jiffies
;
940 h
= drbd_do_features(tconn
);
944 if (tconn
->cram_hmac_tfm
) {
945 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
946 switch (drbd_do_auth(tconn
)) {
948 conn_err(tconn
, "Authentication of peer failed\n");
951 conn_err(tconn
, "Authentication of peer failed, trying again.\n");
956 if (conn_request_state(tconn
, NS(conn
, C_WF_REPORT_PARAMS
), CS_VERBOSE
) < SS_SUCCESS
)
959 sock
->sk
->sk_sndtimeo
= tconn
->net_conf
->timeout
*HZ
/10;
960 sock
->sk
->sk_rcvtimeo
= MAX_SCHEDULE_TIMEOUT
;
962 drbd_thread_start(&tconn
->asender
);
964 if (drbd_send_protocol(tconn
) == -EOPNOTSUPP
)
967 return !idr_for_each(&tconn
->volumes
, drbd_connected
, tconn
);
970 if (tconn
->data
.socket
) {
971 sock_release(tconn
->data
.socket
);
972 tconn
->data
.socket
= NULL
;
974 if (tconn
->meta
.socket
) {
975 sock_release(tconn
->meta
.socket
);
976 tconn
->meta
.socket
= NULL
;
981 static int decode_header(struct drbd_tconn
*tconn
, void *header
, struct packet_info
*pi
)
983 unsigned int header_size
= drbd_header_size(tconn
);
985 if (header_size
== sizeof(struct p_header100
) &&
986 *(__be32
*)header
== cpu_to_be32(DRBD_MAGIC_100
)) {
987 struct p_header100
*h
= header
;
989 conn_err(tconn
, "Header padding is not zero\n");
992 pi
->vnr
= be16_to_cpu(h
->volume
);
993 pi
->cmd
= be16_to_cpu(h
->command
);
994 pi
->size
= be32_to_cpu(h
->length
);
995 } else if (header_size
== sizeof(struct p_header95
) &&
996 *(__be16
*)header
== cpu_to_be16(DRBD_MAGIC_BIG
)) {
997 struct p_header95
*h
= header
;
998 pi
->cmd
= be16_to_cpu(h
->command
);
999 pi
->size
= be32_to_cpu(h
->length
);
1001 } else if (header_size
== sizeof(struct p_header80
) &&
1002 *(__be32
*)header
== cpu_to_be32(DRBD_MAGIC
)) {
1003 struct p_header80
*h
= header
;
1004 pi
->cmd
= be16_to_cpu(h
->command
);
1005 pi
->size
= be16_to_cpu(h
->length
);
1008 conn_err(tconn
, "Wrong magic value 0x%08x in protocol version %d\n",
1009 be32_to_cpu(*(__be32
*)header
),
1010 tconn
->agreed_pro_version
);
1013 pi
->data
= header
+ header_size
;
1017 static int drbd_recv_header(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
1019 void *buffer
= tconn
->data
.rbuf
;
1022 err
= drbd_recv_all_warn(tconn
, buffer
, drbd_header_size(tconn
));
1026 err
= decode_header(tconn
, buffer
, pi
);
1027 tconn
->last_received
= jiffies
;
1032 static void drbd_flush(struct drbd_conf
*mdev
)
1036 if (mdev
->write_ordering
>= WO_bdev_flush
&& get_ldev(mdev
)) {
1037 rv
= blkdev_issue_flush(mdev
->ldev
->backing_bdev
, GFP_KERNEL
,
1040 dev_err(DEV
, "local disk flush failed with status %d\n", rv
);
1041 /* would rather check on EOPNOTSUPP, but that is not reliable.
1042 * don't try again for ANY return value != 0
1043 * if (rv == -EOPNOTSUPP) */
1044 drbd_bump_write_ordering(mdev
, WO_drain_io
);
1051 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1052 * @mdev: DRBD device.
1053 * @epoch: Epoch object.
1056 static enum finish_epoch
drbd_may_finish_epoch(struct drbd_conf
*mdev
,
1057 struct drbd_epoch
*epoch
,
1058 enum epoch_event ev
)
1061 struct drbd_epoch
*next_epoch
;
1062 enum finish_epoch rv
= FE_STILL_LIVE
;
1064 spin_lock(&mdev
->epoch_lock
);
1068 epoch_size
= atomic_read(&epoch
->epoch_size
);
1070 switch (ev
& ~EV_CLEANUP
) {
1072 atomic_dec(&epoch
->active
);
1074 case EV_GOT_BARRIER_NR
:
1075 set_bit(DE_HAVE_BARRIER_NUMBER
, &epoch
->flags
);
1077 case EV_BECAME_LAST
:
1082 if (epoch_size
!= 0 &&
1083 atomic_read(&epoch
->active
) == 0 &&
1084 test_bit(DE_HAVE_BARRIER_NUMBER
, &epoch
->flags
)) {
1085 if (!(ev
& EV_CLEANUP
)) {
1086 spin_unlock(&mdev
->epoch_lock
);
1087 drbd_send_b_ack(mdev
, epoch
->barrier_nr
, epoch_size
);
1088 spin_lock(&mdev
->epoch_lock
);
1092 if (mdev
->current_epoch
!= epoch
) {
1093 next_epoch
= list_entry(epoch
->list
.next
, struct drbd_epoch
, list
);
1094 list_del(&epoch
->list
);
1095 ev
= EV_BECAME_LAST
| (ev
& EV_CLEANUP
);
1099 if (rv
== FE_STILL_LIVE
)
1103 atomic_set(&epoch
->epoch_size
, 0);
1104 /* atomic_set(&epoch->active, 0); is already zero */
1105 if (rv
== FE_STILL_LIVE
)
1107 wake_up(&mdev
->ee_wait
);
1117 spin_unlock(&mdev
->epoch_lock
);
1123 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1124 * @mdev: DRBD device.
1125 * @wo: Write ordering method to try.
1127 void drbd_bump_write_ordering(struct drbd_conf
*mdev
, enum write_ordering_e wo
) __must_hold(local
)
1129 enum write_ordering_e pwo
;
1130 static char *write_ordering_str
[] = {
1132 [WO_drain_io
] = "drain",
1133 [WO_bdev_flush
] = "flush",
1136 pwo
= mdev
->write_ordering
;
1138 if (wo
== WO_bdev_flush
&& mdev
->ldev
->dc
.no_disk_flush
)
1140 if (wo
== WO_drain_io
&& mdev
->ldev
->dc
.no_disk_drain
)
1142 mdev
->write_ordering
= wo
;
1143 if (pwo
!= mdev
->write_ordering
|| wo
== WO_bdev_flush
)
1144 dev_info(DEV
, "Method to ensure write ordering: %s\n", write_ordering_str
[mdev
->write_ordering
]);
1148 * drbd_submit_peer_request()
1149 * @mdev: DRBD device.
1150 * @peer_req: peer request
1151 * @rw: flag field, see bio->bi_rw
1153 * May spread the pages to multiple bios,
1154 * depending on bio_add_page restrictions.
1156 * Returns 0 if all bios have been submitted,
1157 * -ENOMEM if we could not allocate enough bios,
1158 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1159 * single page to an empty bio (which should never happen and likely indicates
1160 * that the lower level IO stack is in some way broken). This has been observed
1161 * on certain Xen deployments.
1163 /* TODO allocate from our own bio_set. */
1164 int drbd_submit_peer_request(struct drbd_conf
*mdev
,
1165 struct drbd_peer_request
*peer_req
,
1166 const unsigned rw
, const int fault_type
)
1168 struct bio
*bios
= NULL
;
1170 struct page
*page
= peer_req
->pages
;
1171 sector_t sector
= peer_req
->i
.sector
;
1172 unsigned ds
= peer_req
->i
.size
;
1173 unsigned n_bios
= 0;
1174 unsigned nr_pages
= (ds
+ PAGE_SIZE
-1) >> PAGE_SHIFT
;
1177 /* In most cases, we will only need one bio. But in case the lower
1178 * level restrictions happen to be different at this offset on this
1179 * side than those of the sending peer, we may need to submit the
1180 * request in more than one bio.
1182 * Plain bio_alloc is good enough here, this is no DRBD internally
1183 * generated bio, but a bio allocated on behalf of the peer.
1186 bio
= bio_alloc(GFP_NOIO
, nr_pages
);
1188 dev_err(DEV
, "submit_ee: Allocation of a bio failed\n");
1191 /* > peer_req->i.sector, unless this is the first bio */
1192 bio
->bi_sector
= sector
;
1193 bio
->bi_bdev
= mdev
->ldev
->backing_bdev
;
1195 bio
->bi_private
= peer_req
;
1196 bio
->bi_end_io
= drbd_peer_request_endio
;
1198 bio
->bi_next
= bios
;
1202 page_chain_for_each(page
) {
1203 unsigned len
= min_t(unsigned, ds
, PAGE_SIZE
);
1204 if (!bio_add_page(bio
, page
, len
, 0)) {
1205 /* A single page must always be possible!
1206 * But in case it fails anyways,
1207 * we deal with it, and complain (below). */
1208 if (bio
->bi_vcnt
== 0) {
1210 "bio_add_page failed for len=%u, "
1211 "bi_vcnt=0 (bi_sector=%llu)\n",
1212 len
, (unsigned long long)bio
->bi_sector
);
1222 D_ASSERT(page
== NULL
);
1225 atomic_set(&peer_req
->pending_bios
, n_bios
);
1228 bios
= bios
->bi_next
;
1229 bio
->bi_next
= NULL
;
1231 drbd_generic_make_request(mdev
, fault_type
, bio
);
1238 bios
= bios
->bi_next
;
1244 static void drbd_remove_epoch_entry_interval(struct drbd_conf
*mdev
,
1245 struct drbd_peer_request
*peer_req
)
1247 struct drbd_interval
*i
= &peer_req
->i
;
1249 drbd_remove_interval(&mdev
->write_requests
, i
);
1250 drbd_clear_interval(i
);
1252 /* Wake up any processes waiting for this peer request to complete. */
1254 wake_up(&mdev
->misc_wait
);
1257 static int receive_Barrier(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
1259 struct drbd_conf
*mdev
;
1261 struct p_barrier
*p
= pi
->data
;
1262 struct drbd_epoch
*epoch
;
1264 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
1270 mdev
->current_epoch
->barrier_nr
= p
->barrier
;
1271 rv
= drbd_may_finish_epoch(mdev
, mdev
->current_epoch
, EV_GOT_BARRIER_NR
);
1273 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1274 * the activity log, which means it would not be resynced in case the
1275 * R_PRIMARY crashes now.
1276 * Therefore we must send the barrier_ack after the barrier request was
1278 switch (mdev
->write_ordering
) {
1280 if (rv
== FE_RECYCLED
)
1283 /* receiver context, in the writeout path of the other node.
1284 * avoid potential distributed deadlock */
1285 epoch
= kmalloc(sizeof(struct drbd_epoch
), GFP_NOIO
);
1289 dev_warn(DEV
, "Allocation of an epoch failed, slowing down\n");
1294 drbd_wait_ee_list_empty(mdev
, &mdev
->active_ee
);
1297 if (atomic_read(&mdev
->current_epoch
->epoch_size
)) {
1298 epoch
= kmalloc(sizeof(struct drbd_epoch
), GFP_NOIO
);
1303 epoch
= mdev
->current_epoch
;
1304 wait_event(mdev
->ee_wait
, atomic_read(&epoch
->epoch_size
) == 0);
1306 D_ASSERT(atomic_read(&epoch
->active
) == 0);
1307 D_ASSERT(epoch
->flags
== 0);
1311 dev_err(DEV
, "Strangeness in mdev->write_ordering %d\n", mdev
->write_ordering
);
1316 atomic_set(&epoch
->epoch_size
, 0);
1317 atomic_set(&epoch
->active
, 0);
1319 spin_lock(&mdev
->epoch_lock
);
1320 if (atomic_read(&mdev
->current_epoch
->epoch_size
)) {
1321 list_add(&epoch
->list
, &mdev
->current_epoch
->list
);
1322 mdev
->current_epoch
= epoch
;
1325 /* The current_epoch got recycled while we allocated this one... */
1328 spin_unlock(&mdev
->epoch_lock
);
/*
 * NOTE(review): this chunk is line-mangled and elides several original
 * source lines; code tokens below are kept byte-identical, comments only.
 *
 * read_in_block(): receive one data payload (write or resync) from the
 * peer into a newly allocated drbd_peer_request.  When an integrity
 * transform is agreed (proto >= 87 and integrity_r_tfm set), a digest of
 * size dgs is received first and the payload is verified with
 * drbd_csum_ee(); on mismatch the peer request is freed again.  The
 * requested size is validated (non-zero, 512-aligned, <= DRBD_MAX_BIO_SIZE,
 * within device capacity) before any allocation.
 */
1333 /* used from receive_RSDataReply (recv_resync_read)
1334 * and from receive_Data */
1335 static struct drbd_peer_request
*
1336 read_in_block(struct drbd_conf
*mdev
, u64 id
, sector_t sector
,
1337 int data_size
) __must_hold(local
)
1339 const sector_t capacity
= drbd_get_capacity(mdev
->this_bdev
);
1340 struct drbd_peer_request
*peer_req
;
1343 void *dig_in
= mdev
->tconn
->int_dig_in
;
1344 void *dig_vv
= mdev
->tconn
->int_dig_vv
;
1345 unsigned long *data
;
1347 dgs
= (mdev
->tconn
->agreed_pro_version
>= 87 && mdev
->tconn
->integrity_r_tfm
) ?
1348 crypto_hash_digestsize(mdev
->tconn
->integrity_r_tfm
) : 0;
1352 * FIXME: Receive the incoming digest into the receive buffer
1353 * here, together with its struct p_data?
1355 err
= drbd_recv_all_warn(mdev
->tconn
, dig_in
, dgs
);
1362 if (!expect(data_size
!= 0))
1364 if (!expect(IS_ALIGNED(data_size
, 512)))
1366 if (!expect(data_size
<= DRBD_MAX_BIO_SIZE
))
1369 /* even though we trust out peer,
1370 * we sometimes have to double check. */
1371 if (sector
+ (data_size
>>9) > capacity
) {
1372 dev_err(DEV
, "request from peer beyond end of local disk: "
1373 "capacity: %llus < sector: %llus + size: %u\n",
1374 (unsigned long long)capacity
,
1375 (unsigned long long)sector
, data_size
);
1379 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1380 * "criss-cross" setup, that might cause write-out on some other DRBD,
1381 * which in turn might block on the other node at this very place. */
1382 peer_req
= drbd_alloc_peer_req(mdev
, id
, sector
, data_size
, GFP_NOIO
);
1387 page
= peer_req
->pages
;
1388 page_chain_for_each(page
) {
1389 unsigned len
= min_t(int, ds
, PAGE_SIZE
);
1391 err
= drbd_recv_all_warn(mdev
->tconn
, data
, len
);
1392 if (drbd_insert_fault(mdev
, DRBD_FAULT_RECEIVE
)) {
1393 dev_err(DEV
, "Fault injection: Corrupting data on receive\n");
1394 data
[0] = data
[0] ^ (unsigned long)-1;
1398 drbd_free_peer_req(mdev
, peer_req
);
1405 drbd_csum_ee(mdev
, mdev
->tconn
->integrity_r_tfm
, peer_req
, dig_vv
);
1406 if (memcmp(dig_in
, dig_vv
, dgs
)) {
1407 dev_err(DEV
, "Digest integrity check FAILED: %llus +%u\n",
1408 (unsigned long long)sector
, data_size
);
1409 drbd_free_peer_req(mdev
, peer_req
);
1413 mdev
->recv_cnt
+= data_size
>>9;
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * drbd_drain_block(): consume and discard data_size bytes from the socket
 * input buffer, page-at-a-time, using one page from the DRBD page pool
 * (returned to the pool again at the end).
 */
1417 /* drbd_drain_block() just takes a data block
1418 * out of the socket input buffer, and discards it.
1420 static int drbd_drain_block(struct drbd_conf
*mdev
, int data_size
)
1429 page
= drbd_pp_alloc(mdev
, 1, 1);
1433 unsigned int len
= min_t(int, data_size
, PAGE_SIZE
);
1435 err
= drbd_recv_all_warn(mdev
->tconn
, data
, len
);
1441 drbd_pp_free(mdev
, page
, 0);
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * recv_dless_read(): receive a disk-less read reply directly into the
 * bio of the original local request (req->master_bio), segment by
 * segment via kmap/kunmap.  Optionally receives and verifies an
 * integrity digest (proto >= 87 with integrity_r_tfm).  recv_cnt is
 * updated optimistically before receiving; on failure we disconnect and
 * counters are reset anyway (see comment below).
 */
1445 static int recv_dless_read(struct drbd_conf
*mdev
, struct drbd_request
*req
,
1446 sector_t sector
, int data_size
)
1448 struct bio_vec
*bvec
;
1450 int dgs
, err
, i
, expect
;
1451 void *dig_in
= mdev
->tconn
->int_dig_in
;
1452 void *dig_vv
= mdev
->tconn
->int_dig_vv
;
1454 dgs
= (mdev
->tconn
->agreed_pro_version
>= 87 && mdev
->tconn
->integrity_r_tfm
) ?
1455 crypto_hash_digestsize(mdev
->tconn
->integrity_r_tfm
) : 0;
1458 err
= drbd_recv_all_warn(mdev
->tconn
, dig_in
, dgs
);
1465 /* optimistically update recv_cnt. if receiving fails below,
1466 * we disconnect anyways, and counters will be reset. */
1467 mdev
->recv_cnt
+= data_size
>>9;
1469 bio
= req
->master_bio
;
1470 D_ASSERT(sector
== bio
->bi_sector
);
1472 bio_for_each_segment(bvec
, bio
, i
) {
1473 void *mapped
= kmap(bvec
->bv_page
) + bvec
->bv_offset
;
1474 expect
= min_t(int, data_size
, bvec
->bv_len
);
1475 err
= drbd_recv_all_warn(mdev
->tconn
, mapped
, expect
);
1476 kunmap(bvec
->bv_page
);
1479 data_size
-= expect
;
1483 drbd_csum_bio(mdev
, mdev
->tconn
->integrity_r_tfm
, bio
, dig_vv
);
1484 if (memcmp(dig_in
, dig_vv
, dgs
)) {
1485 dev_err(DEV
, "Digest integrity check FAILED. Broken NICs?\n");
1490 D_ASSERT(data_size
== 0);
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * Completion callback for a resync write: on success, mark the interval
 * in sync and ack with P_RS_WRITE_ACK; on EE_WAS_ERROR, record the
 * failed resync I/O and send P_NEG_ACK instead.
 */
1494 /* e_end_resync_block() is called via
1495 * drbd_process_done_ee() by asender only */
1496 static int e_end_resync_block(struct drbd_work
*w
, int unused
)
1498 struct drbd_peer_request
*peer_req
=
1499 container_of(w
, struct drbd_peer_request
, w
);
1500 struct drbd_conf
*mdev
= w
->mdev
;
1501 sector_t sector
= peer_req
->i
.sector
;
1504 D_ASSERT(drbd_interval_empty(&peer_req
->i
));
1506 if (likely((peer_req
->flags
& EE_WAS_ERROR
) == 0)) {
1507 drbd_set_in_sync(mdev
, sector
, peer_req
->i
.size
);
1508 err
= drbd_send_ack(mdev
, P_RS_WRITE_ACK
, peer_req
);
1510 /* Record failure to sync */
1511 drbd_rs_failed_io(mdev
, sector
, peer_req
->i
.size
);
1513 err
= drbd_send_ack(mdev
, P_NEG_ACK
, peer_req
);
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * recv_resync_read(): read one resync payload via read_in_block()
 * (block id ID_SYNCER), queue it on sync_ee with e_end_resync_block as
 * completion callback, account the sectors in rs_sect_ev, and submit
 * the write.  On submit failure the request is unhooked and freed and
 * the error path triggers a re-connect.
 */
1520 static int recv_resync_read(struct drbd_conf
*mdev
, sector_t sector
, int data_size
) __releases(local
)
1522 struct drbd_peer_request
*peer_req
;
1524 peer_req
= read_in_block(mdev
, ID_SYNCER
, sector
, data_size
);
1528 dec_rs_pending(mdev
);
1531 /* corresponding dec_unacked() in e_end_resync_block()
1532 * respective _drbd_clear_done_ee */
1534 peer_req
->w
.cb
= e_end_resync_block
;
1536 spin_lock_irq(&mdev
->tconn
->req_lock
);
1537 list_add(&peer_req
->w
.list
, &mdev
->sync_ee
);
1538 spin_unlock_irq(&mdev
->tconn
->req_lock
);
1540 atomic_add(data_size
>> 9, &mdev
->rs_sect_ev
);
1541 if (drbd_submit_peer_request(mdev
, peer_req
, WRITE
, DRBD_FAULT_RS_WR
) == 0)
1544 /* don't care for the reason here */
1545 dev_err(DEV
, "submit failed, triggering re-connect\n");
1546 spin_lock_irq(&mdev
->tconn
->req_lock
);
1547 list_del(&peer_req
->w
.list
);
1548 spin_unlock_irq(&mdev
->tconn
->req_lock
);
1550 drbd_free_peer_req(mdev
, peer_req
);
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * find_request(): the peer echoes back our request pointer as block id;
 * cast it back and validate it by checking that the interval tree 'root'
 * really contains this (sector, interval) and that it is a local request.
 * Unless missing_ok, a lookup failure is logged with the caller's name.
 */
1556 static struct drbd_request
*
1557 find_request(struct drbd_conf
*mdev
, struct rb_root
*root
, u64 id
,
1558 sector_t sector
, bool missing_ok
, const char *func
)
1560 struct drbd_request
*req
;
1562 /* Request object according to our peer */
1563 req
= (struct drbd_request
*)(unsigned long)id
;
1564 if (drbd_contains_interval(root
, sector
, &req
->i
) && req
->i
.local
)
1567 dev_err(DEV
, "%s: failed to find request %lu, sector %llus\n", func
,
1568 (unsigned long)id
, (unsigned long long)sector
);
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * receive_DataReply(): handle the peer's answer to one of our pending
 * reads.  Look the original request up in read_requests under req_lock,
 * receive the payload into its master bio via recv_dless_read(), and on
 * success advance the request state machine with DATA_RECEIVED.
 */
1573 static int receive_DataReply(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
1575 struct drbd_conf
*mdev
;
1576 struct drbd_request
*req
;
1579 struct p_data
*p
= pi
->data
;
1581 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
1585 sector
= be64_to_cpu(p
->sector
);
1587 spin_lock_irq(&mdev
->tconn
->req_lock
);
1588 req
= find_request(mdev
, &mdev
->read_requests
, p
->block_id
, sector
, false, __func__
);
1589 spin_unlock_irq(&mdev
->tconn
->req_lock
);
1593 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1594 * special casing it there for the various failure cases.
1595 * still no race with drbd_fail_pending_reads */
1596 err
= recv_dless_read(mdev
, req
, sector
, pi
->size
);
1598 req_mod(req
, DATA_RECEIVED
);
1599 /* else: nothing. handled from drbd_disconnect...
1600 * I don't think we may complete this just yet
1601 * in case we are "on-disconnect: freeze" */
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * receive_RSDataReply(): handle incoming resync data (block id must be
 * ID_SYNCER).  With a local disk, submit via recv_resync_read();
 * without one, drain the payload and reply with P_NEG_ACK.  Received
 * sectors are accounted in rs_sect_in either way.
 */
1606 static int receive_RSDataReply(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
1608 struct drbd_conf
*mdev
;
1611 struct p_data
*p
= pi
->data
;
1613 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
1617 sector
= be64_to_cpu(p
->sector
);
1618 D_ASSERT(p
->block_id
== ID_SYNCER
);
1620 if (get_ldev(mdev
)) {
1621 /* data is submitted to disk within recv_resync_read.
1622 * corresponding put_ldev done below on error,
1623 * or in drbd_peer_request_endio. */
1624 err
= recv_resync_read(mdev
, sector
, pi
->size
);
1626 if (__ratelimit(&drbd_ratelimit_state
))
1627 dev_err(DEV
, "Can not write resync data to local disk.\n");
1629 err
= drbd_drain_block(mdev
, pi
->size
);
1631 drbd_send_ack_dp(mdev
, P_NEG_ACK
, p
, pi
->size
);
1634 atomic_add(pi
->size
>> 9, &mdev
->rs_sect_in
);
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * w_restart_write(): worker callback that resubmits a postponed write.
 * Under req_lock (irqsave) it verifies RQ_POSTPONED, saves master_bio
 * and start_time, retires the old request via DISCARD_WRITE, then
 * re-issues the bio with __drbd_make_request() outside the lock.
 */
1639 static int w_restart_write(struct drbd_work
*w
, int cancel
)
1641 struct drbd_request
*req
= container_of(w
, struct drbd_request
, w
);
1642 struct drbd_conf
*mdev
= w
->mdev
;
1644 unsigned long start_time
;
1645 unsigned long flags
;
1647 spin_lock_irqsave(&mdev
->tconn
->req_lock
, flags
);
1648 if (!expect(req
->rq_state
& RQ_POSTPONED
)) {
1649 spin_unlock_irqrestore(&mdev
->tconn
->req_lock
, flags
);
1652 bio
= req
->master_bio
;
1653 start_time
= req
->start_time
;
1654 /* Postponed requests will not have their master_bio completed! */
1655 __req_mod(req
, DISCARD_WRITE
, NULL
);
1656 spin_unlock_irqrestore(&mdev
->tconn
->req_lock
, flags
);
1658 while (__drbd_make_request(mdev
, bio
, start_time
))
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * restart_conflicting_writes(): for every write request overlapping
 * [sector, sector+size) that is postponed and no longer locally
 * pending, queue w_restart_write on the connection's work queue so the
 * write gets resubmitted.
 */
1663 static void restart_conflicting_writes(struct drbd_conf
*mdev
,
1664 sector_t sector
, int size
)
1666 struct drbd_interval
*i
;
1667 struct drbd_request
*req
;
1669 drbd_for_each_overlap(i
, &mdev
->write_requests
, sector
, size
) {
1672 req
= container_of(i
, struct drbd_request
, i
);
1673 if (req
->rq_state
& RQ_LOCAL_PENDING
||
1674 !(req
->rq_state
& RQ_POSTPONED
))
1676 if (expect(list_empty(&req
->w
.list
))) {
1678 req
->w
.cb
= w_restart_write
;
1679 drbd_queue_work(&mdev
->tconn
->data
.work
, &req
->w
);
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * e_end_block(): completion callback for a mirrored write.  In protocol
 * C it acks with P_RS_WRITE_ACK (and marks in sync) when the write may
 * set in-sync during resync, P_WRITE_ACK otherwise, or P_NEG_ACK on
 * EE_WAS_ERROR.  In dual-primary mode the conflict-detection interval
 * is removed only after the ack (to keep sequence numbers right), and
 * postponed conflicting writes are restarted when EE_RESTART_REQUESTS
 * is set.  Finally the epoch gets an EV_PUT (plus EV_CLEANUP if
 * cancelled).
 */
1684 /* e_end_block() is called via drbd_process_done_ee().
1685 * this means this function only runs in the asender thread
1687 static int e_end_block(struct drbd_work
*w
, int cancel
)
1689 struct drbd_peer_request
*peer_req
=
1690 container_of(w
, struct drbd_peer_request
, w
);
1691 struct drbd_conf
*mdev
= w
->mdev
;
1692 sector_t sector
= peer_req
->i
.sector
;
1695 if (mdev
->tconn
->net_conf
->wire_protocol
== DRBD_PROT_C
) {
1696 if (likely((peer_req
->flags
& EE_WAS_ERROR
) == 0)) {
1697 pcmd
= (mdev
->state
.conn
>= C_SYNC_SOURCE
&&
1698 mdev
->state
.conn
<= C_PAUSED_SYNC_T
&&
1699 peer_req
->flags
& EE_MAY_SET_IN_SYNC
) ?
1700 P_RS_WRITE_ACK
: P_WRITE_ACK
;
1701 err
= drbd_send_ack(mdev
, pcmd
, peer_req
);
1702 if (pcmd
== P_RS_WRITE_ACK
)
1703 drbd_set_in_sync(mdev
, sector
, peer_req
->i
.size
);
1705 err
= drbd_send_ack(mdev
, P_NEG_ACK
, peer_req
);
1706 /* we expect it to be marked out of sync anyways...
1707 * maybe assert this? */
1711 /* we delete from the conflict detection hash _after_ we sent out the
1712 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1713 if (mdev
->tconn
->net_conf
->two_primaries
) {
1714 spin_lock_irq(&mdev
->tconn
->req_lock
);
1715 D_ASSERT(!drbd_interval_empty(&peer_req
->i
));
1716 drbd_remove_epoch_entry_interval(mdev
, peer_req
);
1717 if (peer_req
->flags
& EE_RESTART_REQUESTS
)
1718 restart_conflicting_writes(mdev
, sector
, peer_req
->i
.size
);
1719 spin_unlock_irq(&mdev
->tconn
->req_lock
);
1721 D_ASSERT(drbd_interval_empty(&peer_req
->i
));
1723 drbd_may_finish_epoch(mdev
, peer_req
->epoch
, EV_PUT
+ (cancel
? EV_CLEANUP
: 0));
/*
 * NOTE(review): line-mangled extraction; trailing lines of this helper
 * are elided here.  Visible behavior: send the given ack packet for the
 * peer request embedded in this work item.
 */
1728 static int e_send_ack(struct drbd_work
*w
, enum drbd_packet ack
)
1730 struct drbd_conf
*mdev
= w
->mdev
;
1731 struct drbd_peer_request
*peer_req
=
1732 container_of(w
, struct drbd_peer_request
, w
);
1735 err
= drbd_send_ack(mdev
, ack
, peer_req
);
1741 static int e_send_discard_write(struct drbd_work
*w
, int unused
)
1743 return e_send_ack(w
, P_DISCARD_WRITE
);
1746 static int e_send_retry_write(struct drbd_work
*w
, int unused
)
1748 struct drbd_tconn
*tconn
= w
->mdev
->tconn
;
1750 return e_send_ack(w
, tconn
->agreed_pro_version
>= 100 ?
1751 P_RETRY_WRITE
: P_DISCARD_WRITE
);
1754 static bool seq_greater(u32 a
, u32 b
)
1757 * We assume 32-bit wrap-around here.
1758 * For 24-bit wrap-around, we would have to shift:
1761 return (s32
)a
- (s32
)b
> 0;
1764 static u32
seq_max(u32 a
, u32 b
)
1766 return seq_greater(a
, b
) ? a
: b
;
1769 static bool need_peer_seq(struct drbd_conf
*mdev
)
1771 struct drbd_tconn
*tconn
= mdev
->tconn
;
1774 * We only need to keep track of the last packet_seq number of our peer
1775 * if we are in dual-primary mode and we have the discard flag set; see
1776 * handle_write_conflicts().
1778 return tconn
->net_conf
->two_primaries
&&
1779 test_bit(DISCARD_CONCURRENT
, &tconn
->flags
);
1782 static void update_peer_seq(struct drbd_conf
*mdev
, unsigned int peer_seq
)
1784 unsigned int newest_peer_seq
;
1786 if (need_peer_seq(mdev
)) {
1787 spin_lock(&mdev
->peer_seq_lock
);
1788 newest_peer_seq
= seq_max(mdev
->peer_seq
, peer_seq
);
1789 mdev
->peer_seq
= newest_peer_seq
;
1790 spin_unlock(&mdev
->peer_seq_lock
);
1791 /* wake up only if we actually changed mdev->peer_seq */
1792 if (peer_seq
== newest_peer_seq
)
1793 wake_up(&mdev
->seq_wait
);
/*
 * NOTE(review): line-mangled extraction; parts of the wait loop are
 * elided here.  Code tokens kept byte-identical, comments only added.
 * Visible behavior: if the packet is logically next (seq_greater on
 * peer_seq - 1 fails), update peer_seq via seq_max; otherwise sleep on
 * seq_wait (interruptible, ping_timeo based timeout) until the missing
 * acks arrive, and disconnect on timeout.
 */
1797 /* Called from receive_Data.
1798 * Synchronize packets on sock with packets on msock.
1800 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1801 * packet traveling on msock, they are still processed in the order they have
1804 * Note: we don't care for Ack packets overtaking P_DATA packets.
1806 * In case packet_seq is larger than mdev->peer_seq number, there are
1807 * outstanding packets on the msock. We wait for them to arrive.
1808 * In case we are the logically next packet, we update mdev->peer_seq
1809 * ourselves. Correctly handles 32bit wrap around.
1811 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1812 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1813 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1814 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1816 * returns 0 if we may process the packet,
1817 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1818 static int wait_for_and_update_peer_seq(struct drbd_conf
*mdev
, const u32 peer_seq
)
1824 if (!need_peer_seq(mdev
))
1827 spin_lock(&mdev
->peer_seq_lock
);
1829 if (!seq_greater(peer_seq
- 1, mdev
->peer_seq
)) {
1830 mdev
->peer_seq
= seq_max(mdev
->peer_seq
, peer_seq
);
1834 if (signal_pending(current
)) {
1838 prepare_to_wait(&mdev
->seq_wait
, &wait
, TASK_INTERRUPTIBLE
);
1839 spin_unlock(&mdev
->peer_seq_lock
);
1840 timeout
= mdev
->tconn
->net_conf
->ping_timeo
*HZ
/10;
1841 timeout
= schedule_timeout(timeout
);
1842 spin_lock(&mdev
->peer_seq_lock
);
1845 dev_err(DEV
, "Timed out waiting for missing ack packets; disconnecting\n");
1849 spin_unlock(&mdev
->peer_seq_lock
);
1850 finish_wait(&mdev
->seq_wait
, &wait
);
1854 /* see also bio_flags_to_wire()
1855 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1856 * flags and back. We may replicate to other kernel versions. */
1857 static unsigned long wire_flags_to_bio(struct drbd_conf
*mdev
, u32 dpf
)
1859 return (dpf
& DP_RW_SYNC
? REQ_SYNC
: 0) |
1860 (dpf
& DP_FUA
? REQ_FUA
: 0) |
1861 (dpf
& DP_FLUSH
? REQ_FLUSH
: 0) |
1862 (dpf
& DP_DISCARD
? REQ_DISCARD
: 0);
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * fail_postponed_requests(): called with req_lock held (it drops and
 * re-takes it around complete_master_bio).  Every postponed request
 * overlapping [sector, sector+size) loses RQ_POSTPONED, is negatively
 * acknowledged (NEG_ACKED) and its master bio completed.
 */
1865 static void fail_postponed_requests(struct drbd_conf
*mdev
, sector_t sector
,
1868 struct drbd_interval
*i
;
1871 drbd_for_each_overlap(i
, &mdev
->write_requests
, sector
, size
) {
1872 struct drbd_request
*req
;
1873 struct bio_and_error m
;
1877 req
= container_of(i
, struct drbd_request
, i
);
1878 if (!(req
->rq_state
& RQ_POSTPONED
))
1880 req
->rq_state
&= ~RQ_POSTPONED
;
1881 __req_mod(req
, NEG_ACKED
, &m
);
1882 spin_unlock_irq(&mdev
->tconn
->req_lock
);
1884 complete_master_bio(mdev
, &m
);
1885 spin_lock_irq(&mdev
->tconn
->req_lock
);
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * handle_write_conflicts(): called under req_lock.  Insert the incoming
 * peer write into the write_requests interval tree, then resolve every
 * overlap: with DISCARD_CONCURRENT set (this node decides) a fully
 * contained peer request is discarded (e_send_discard_write), otherwise
 * it is retried once all overlaps are done; without the flag we wait
 * for the other node's decision / local completion via drbd_wait_misc()
 * and mark EE_RESTART_REQUESTS so conflicting postponed writes are
 * restarted from e_end_block().  On wait failure the connection is
 * moved to C_TIMEOUT and postponed requests are failed; the interval is
 * removed again on the error path.
 */
1890 static int handle_write_conflicts(struct drbd_conf
*mdev
,
1891 struct drbd_peer_request
*peer_req
)
1893 struct drbd_tconn
*tconn
= mdev
->tconn
;
1894 bool resolve_conflicts
= test_bit(DISCARD_CONCURRENT
, &tconn
->flags
);
1895 sector_t sector
= peer_req
->i
.sector
;
1896 const unsigned int size
= peer_req
->i
.size
;
1897 struct drbd_interval
*i
;
1902 * Inserting the peer request into the write_requests tree will prevent
1903 * new conflicting local requests from being added.
1905 drbd_insert_interval(&mdev
->write_requests
, &peer_req
->i
);
1908 drbd_for_each_overlap(i
, &mdev
->write_requests
, sector
, size
) {
1909 if (i
== &peer_req
->i
)
1914 * Our peer has sent a conflicting remote request; this
1915 * should not happen in a two-node setup. Wait for the
1916 * earlier peer request to complete.
1918 err
= drbd_wait_misc(mdev
, i
);
1924 equal
= i
->sector
== sector
&& i
->size
== size
;
1925 if (resolve_conflicts
) {
1927 * If the peer request is fully contained within the
1928 * overlapping request, it can be discarded; otherwise,
1929 * it will be retried once all overlapping requests
1932 bool discard
= i
->sector
<= sector
&& i
->sector
+
1933 (i
->size
>> 9) >= sector
+ (size
>> 9);
1936 dev_alert(DEV
, "Concurrent writes detected: "
1937 "local=%llus +%u, remote=%llus +%u, "
1938 "assuming %s came first\n",
1939 (unsigned long long)i
->sector
, i
->size
,
1940 (unsigned long long)sector
, size
,
1941 discard
? "local" : "remote");
1944 peer_req
->w
.cb
= discard
? e_send_discard_write
:
1946 list_add_tail(&peer_req
->w
.list
, &mdev
->done_ee
);
1947 wake_asender(mdev
->tconn
);
1952 struct drbd_request
*req
=
1953 container_of(i
, struct drbd_request
, i
);
1956 dev_alert(DEV
, "Concurrent writes detected: "
1957 "local=%llus +%u, remote=%llus +%u\n",
1958 (unsigned long long)i
->sector
, i
->size
,
1959 (unsigned long long)sector
, size
);
1961 if (req
->rq_state
& RQ_LOCAL_PENDING
||
1962 !(req
->rq_state
& RQ_POSTPONED
)) {
1964 * Wait for the node with the discard flag to
1965 * decide if this request will be discarded or
1966 * retried. Requests that are discarded will
1967 * disappear from the write_requests tree.
1969 * In addition, wait for the conflicting
1970 * request to finish locally before submitting
1971 * the conflicting peer request.
1973 err
= drbd_wait_misc(mdev
, &req
->i
);
1975 _conn_request_state(mdev
->tconn
,
1976 NS(conn
, C_TIMEOUT
),
1978 fail_postponed_requests(mdev
, sector
, size
);
1984 * Remember to restart the conflicting requests after
1985 * the new peer request has completed.
1987 peer_req
->flags
|= EE_RESTART_REQUESTS
;
1994 drbd_remove_epoch_entry_interval(mdev
, peer_req
);
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * receive_Data(): handle a mirrored write from the peer.  Without a
 * local disk: sync the sequence number, P_NEG_ACK and drain the
 * payload.  Otherwise: read the payload via read_in_block(), translate
 * wire flags to bio flags, attach the request to the current epoch
 * (epoch_size/active counters under epoch_lock), resolve dual-primary
 * write conflicts via handle_write_conflicts(), queue on active_ee,
 * ack according to the wire protocol (P_RECV_ACK shown for protocol B
 * path), mark out-of-sync/activity-log when the peer disk is worse
 * than D_INCONSISTENT, and submit.  The error path unhooks the
 * request, releases the epoch reference (EV_PUT + EV_CLEANUP) and
 * frees it.
 */
1998 /* mirrored write */
1999 static int receive_Data(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
2001 struct drbd_conf
*mdev
;
2003 struct drbd_peer_request
*peer_req
;
2004 struct p_data
*p
= pi
->data
;
2005 u32 peer_seq
= be32_to_cpu(p
->seq_num
);
2010 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
2014 if (!get_ldev(mdev
)) {
2017 err
= wait_for_and_update_peer_seq(mdev
, peer_seq
);
2018 drbd_send_ack_dp(mdev
, P_NEG_ACK
, p
, pi
->size
);
2019 atomic_inc(&mdev
->current_epoch
->epoch_size
);
2020 err2
= drbd_drain_block(mdev
, pi
->size
);
2027 * Corresponding put_ldev done either below (on various errors), or in
2028 * drbd_peer_request_endio, if we successfully submit the data at the
2029 * end of this function.
2032 sector
= be64_to_cpu(p
->sector
);
2033 peer_req
= read_in_block(mdev
, p
->block_id
, sector
, pi
->size
);
2039 peer_req
->w
.cb
= e_end_block
;
2041 dp_flags
= be32_to_cpu(p
->dp_flags
);
2042 rw
|= wire_flags_to_bio(mdev
, dp_flags
);
2044 if (dp_flags
& DP_MAY_SET_IN_SYNC
)
2045 peer_req
->flags
|= EE_MAY_SET_IN_SYNC
;
2047 spin_lock(&mdev
->epoch_lock
);
2048 peer_req
->epoch
= mdev
->current_epoch
;
2049 atomic_inc(&peer_req
->epoch
->epoch_size
);
2050 atomic_inc(&peer_req
->epoch
->active
);
2051 spin_unlock(&mdev
->epoch_lock
);
2053 if (mdev
->tconn
->net_conf
->two_primaries
) {
2054 err
= wait_for_and_update_peer_seq(mdev
, peer_seq
);
2056 goto out_interrupted
;
2057 spin_lock_irq(&mdev
->tconn
->req_lock
);
2058 err
= handle_write_conflicts(mdev
, peer_req
);
2060 spin_unlock_irq(&mdev
->tconn
->req_lock
);
2061 if (err
== -ENOENT
) {
2065 goto out_interrupted
;
2068 spin_lock_irq(&mdev
->tconn
->req_lock
);
2069 list_add(&peer_req
->w
.list
, &mdev
->active_ee
);
2070 spin_unlock_irq(&mdev
->tconn
->req_lock
);
2072 switch (mdev
->tconn
->net_conf
->wire_protocol
) {
2075 /* corresponding dec_unacked() in e_end_block()
2076 * respective _drbd_clear_done_ee */
2079 /* I really don't like it that the receiver thread
2080 * sends on the msock, but anyways */
2081 drbd_send_ack(mdev
, P_RECV_ACK
, peer_req
);
2088 if (mdev
->state
.pdsk
< D_INCONSISTENT
) {
2089 /* In case we have the only disk of the cluster, */
2090 drbd_set_out_of_sync(mdev
, peer_req
->i
.sector
, peer_req
->i
.size
);
2091 peer_req
->flags
|= EE_CALL_AL_COMPLETE_IO
;
2092 peer_req
->flags
&= ~EE_MAY_SET_IN_SYNC
;
2093 drbd_al_begin_io(mdev
, &peer_req
->i
);
2096 err
= drbd_submit_peer_request(mdev
, peer_req
, rw
, DRBD_FAULT_DT_WR
);
2100 /* don't care for the reason here */
2101 dev_err(DEV
, "submit failed, triggering re-connect\n");
2102 spin_lock_irq(&mdev
->tconn
->req_lock
);
2103 list_del(&peer_req
->w
.list
);
2104 drbd_remove_epoch_entry_interval(mdev
, peer_req
);
2105 spin_unlock_irq(&mdev
->tconn
->req_lock
);
2106 if (peer_req
->flags
& EE_CALL_AL_COMPLETE_IO
)
2107 drbd_al_complete_io(mdev
, &peer_req
->i
);
2110 drbd_may_finish_epoch(mdev
, peer_req
->epoch
, EV_PUT
+ EV_CLEANUP
);
2112 drbd_free_peer_req(mdev
, peer_req
);
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * drbd_rs_should_slow_down(): decide whether to throttle resync.
 * Disabled when c_min_rate == 0.  Never slow down when application IO
 * already waits for this resync extent (BME_PRIORITY).  Otherwise
 * compare unaccounted disk activity (part_stat sectors minus our own
 * rs_sect_ev) against a 64-sector threshold, and the short-term resync
 * rate (two most recent sync marks) against c_min_rate.
 */
2116 /* We may throttle resync, if the lower device seems to be busy,
2117 * and current sync rate is above c_min_rate.
2119 * To decide whether or not the lower device is busy, we use a scheme similar
2120 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2121 * (more than 64 sectors) of activity we cannot account for with our own resync
2122 * activity, it obviously is "busy".
2124 * The current sync rate used here uses only the most recent two step marks,
2125 * to have a short time average so we can react faster.
2127 int drbd_rs_should_slow_down(struct drbd_conf
*mdev
, sector_t sector
)
2129 struct gendisk
*disk
= mdev
->ldev
->backing_bdev
->bd_contains
->bd_disk
;
2130 unsigned long db
, dt
, dbdt
;
2131 struct lc_element
*tmp
;
2135 /* feature disabled? */
2136 if (mdev
->ldev
->dc
.c_min_rate
== 0)
2139 spin_lock_irq(&mdev
->al_lock
);
2140 tmp
= lc_find(mdev
->resync
, BM_SECT_TO_EXT(sector
));
2142 struct bm_extent
*bm_ext
= lc_entry(tmp
, struct bm_extent
, lce
);
2143 if (test_bit(BME_PRIORITY
, &bm_ext
->flags
)) {
2144 spin_unlock_irq(&mdev
->al_lock
);
2147 /* Do not slow down if app IO is already waiting for this extent */
2149 spin_unlock_irq(&mdev
->al_lock
);
2151 curr_events
= (int)part_stat_read(&disk
->part0
, sectors
[0]) +
2152 (int)part_stat_read(&disk
->part0
, sectors
[1]) -
2153 atomic_read(&mdev
->rs_sect_ev
);
2155 if (!mdev
->rs_last_events
|| curr_events
- mdev
->rs_last_events
> 64) {
2156 unsigned long rs_left
;
2159 mdev
->rs_last_events
= curr_events
;
2161 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2163 i
= (mdev
->rs_last_mark
+ DRBD_SYNC_MARKS
-1) % DRBD_SYNC_MARKS
;
2165 if (mdev
->state
.conn
== C_VERIFY_S
|| mdev
->state
.conn
== C_VERIFY_T
)
2166 rs_left
= mdev
->ov_left
;
2168 rs_left
= drbd_bm_total_weight(mdev
) - mdev
->rs_failed
;
2170 dt
= ((long)jiffies
- (long)mdev
->rs_mark_time
[i
]) / HZ
;
2173 db
= mdev
->rs_mark_left
[i
] - rs_left
;
2174 dbdt
= Bit2KB(db
/dt
);
2176 if (dbdt
> mdev
->ldev
->dc
.c_min_rate
)
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * receive_DataRequest(): serve a peer read: plain application read
 * (P_DATA_REQUEST), resync read (P_RS_DATA_REQUEST), checksum-based
 * resync (P_CSUM_RS_REQUEST, needs proto >= 89), or online-verify
 * request/reply.  Validates size and capacity; without D_UP_TO_DATE
 * local data it sends the matching negative reply and drains the
 * payload.  For csum/verify requests a digest_info is allocated and the
 * digest received inline.  Resync-type requests are throttled via
 * drbd_rs_should_slow_down() (unless the peer is Primary, see comment)
 * and gated by drbd_rs_begin_io() before being queued on read_ee and
 * submitted for READ.
 */
2183 static int receive_DataRequest(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
2185 struct drbd_conf
*mdev
;
2188 struct drbd_peer_request
*peer_req
;
2189 struct digest_info
*di
= NULL
;
2191 unsigned int fault_type
;
2192 struct p_block_req
*p
= pi
->data
;
2194 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
2197 capacity
= drbd_get_capacity(mdev
->this_bdev
);
2199 sector
= be64_to_cpu(p
->sector
);
2200 size
= be32_to_cpu(p
->blksize
);
2202 if (size
<= 0 || !IS_ALIGNED(size
, 512) || size
> DRBD_MAX_BIO_SIZE
) {
2203 dev_err(DEV
, "%s:%d: sector: %llus, size: %u\n", __FILE__
, __LINE__
,
2204 (unsigned long long)sector
, size
);
2207 if (sector
+ (size
>>9) > capacity
) {
2208 dev_err(DEV
, "%s:%d: sector: %llus, size: %u\n", __FILE__
, __LINE__
,
2209 (unsigned long long)sector
, size
);
2213 if (!get_ldev_if_state(mdev
, D_UP_TO_DATE
)) {
2216 case P_DATA_REQUEST
:
2217 drbd_send_ack_rp(mdev
, P_NEG_DREPLY
, p
);
2219 case P_RS_DATA_REQUEST
:
2220 case P_CSUM_RS_REQUEST
:
2222 drbd_send_ack_rp(mdev
, P_NEG_RS_DREPLY
, p
);
2226 dec_rs_pending(mdev
);
2227 drbd_send_ack_ex(mdev
, P_OV_RESULT
, sector
, size
, ID_IN_SYNC
);
2232 if (verb
&& __ratelimit(&drbd_ratelimit_state
))
2233 dev_err(DEV
, "Can not satisfy peer's read request, "
2234 "no local data.\n");
2236 /* drain possibly payload */
2237 return drbd_drain_block(mdev
, pi
->size
);
2240 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2241 * "criss-cross" setup, that might cause write-out on some other DRBD,
2242 * which in turn might block on the other node at this very place. */
2243 peer_req
= drbd_alloc_peer_req(mdev
, p
->block_id
, sector
, size
, GFP_NOIO
);
2250 case P_DATA_REQUEST
:
2251 peer_req
->w
.cb
= w_e_end_data_req
;
2252 fault_type
= DRBD_FAULT_DT_RD
;
2253 /* application IO, don't drbd_rs_begin_io */
2256 case P_RS_DATA_REQUEST
:
2257 peer_req
->w
.cb
= w_e_end_rsdata_req
;
2258 fault_type
= DRBD_FAULT_RS_RD
;
2259 /* used in the sector offset progress display */
2260 mdev
->bm_resync_fo
= BM_SECT_TO_BIT(sector
);
2264 case P_CSUM_RS_REQUEST
:
2265 fault_type
= DRBD_FAULT_RS_RD
;
2266 di
= kmalloc(sizeof(*di
) + pi
->size
, GFP_NOIO
);
2270 di
->digest_size
= pi
->size
;
2271 di
->digest
= (((char *)di
)+sizeof(struct digest_info
));
2273 peer_req
->digest
= di
;
2274 peer_req
->flags
|= EE_HAS_DIGEST
;
2276 if (drbd_recv_all(mdev
->tconn
, di
->digest
, pi
->size
))
2279 if (pi
->cmd
== P_CSUM_RS_REQUEST
) {
2280 D_ASSERT(mdev
->tconn
->agreed_pro_version
>= 89);
2281 peer_req
->w
.cb
= w_e_end_csum_rs_req
;
2282 /* used in the sector offset progress display */
2283 mdev
->bm_resync_fo
= BM_SECT_TO_BIT(sector
);
2284 } else if (pi
->cmd
== P_OV_REPLY
) {
2285 /* track progress, we may need to throttle */
2286 atomic_add(size
>> 9, &mdev
->rs_sect_in
);
2287 peer_req
->w
.cb
= w_e_end_ov_reply
;
2288 dec_rs_pending(mdev
);
2289 /* drbd_rs_begin_io done when we sent this request,
2290 * but accounting still needs to be done. */
2291 goto submit_for_resync
;
2296 if (mdev
->ov_start_sector
== ~(sector_t
)0 &&
2297 mdev
->tconn
->agreed_pro_version
>= 90) {
2298 unsigned long now
= jiffies
;
2300 mdev
->ov_start_sector
= sector
;
2301 mdev
->ov_position
= sector
;
2302 mdev
->ov_left
= drbd_bm_bits(mdev
) - BM_SECT_TO_BIT(sector
);
2303 mdev
->rs_total
= mdev
->ov_left
;
2304 for (i
= 0; i
< DRBD_SYNC_MARKS
; i
++) {
2305 mdev
->rs_mark_left
[i
] = mdev
->ov_left
;
2306 mdev
->rs_mark_time
[i
] = now
;
2308 dev_info(DEV
, "Online Verify start sector: %llu\n",
2309 (unsigned long long)sector
);
2311 peer_req
->w
.cb
= w_e_end_ov_req
;
2312 fault_type
= DRBD_FAULT_RS_RD
;
2319 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2320 * wrt the receiver, but it is not as straightforward as it may seem.
2321 * Various places in the resync start and stop logic assume resync
2322 * requests are processed in order, requeuing this on the worker thread
2323 * introduces a bunch of new code for synchronization between threads.
2325 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2326 * "forever", throttling after drbd_rs_begin_io will lock that extent
2327 * for application writes for the same time. For now, just throttle
2328 * here, where the rest of the code expects the receiver to sleep for
2332 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2333 * this defers syncer requests for some time, before letting at least
2334 * on request through. The resync controller on the receiving side
2335 * will adapt to the incoming rate accordingly.
2337 * We cannot throttle here if remote is Primary/SyncTarget:
2338 * we would also throttle its application reads.
2339 * In that case, throttling is done on the SyncTarget only.
2341 if (mdev
->state
.peer
!= R_PRIMARY
&& drbd_rs_should_slow_down(mdev
, sector
))
2342 schedule_timeout_uninterruptible(HZ
/10);
2343 if (drbd_rs_begin_io(mdev
, sector
))
2347 atomic_add(size
>> 9, &mdev
->rs_sect_ev
);
2351 spin_lock_irq(&mdev
->tconn
->req_lock
);
2352 list_add_tail(&peer_req
->w
.list
, &mdev
->read_ee
);
2353 spin_unlock_irq(&mdev
->tconn
->req_lock
);
2355 if (drbd_submit_peer_request(mdev
, peer_req
, READ
, fault_type
) == 0)
2358 /* don't care for the reason here */
2359 dev_err(DEV
, "submit failed, triggering re-connect\n");
2360 spin_lock_irq(&mdev
->tconn
->req_lock
);
2361 list_del(&peer_req
->w
.list
);
2362 spin_unlock_irq(&mdev
->tconn
->req_lock
);
2363 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2367 drbd_free_peer_req(mdev
, peer_req
);
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * drbd_asb_recover_0p(): after-split-brain recovery when 0 primaries
 * remain.  Returns a sync decision (positive: we become source,
 * negative: we become target, -100: no automatic decision) based on the
 * configured after_sb_0p policy: younger/older primary (UI_BITMAP low
 * bit encodes role at crash time), zero/least changes (ch_self vs
 * ch_peer from comm_bm_set / p_uuid[UI_SIZE]), or fixed local/remote
 * discard.  Ties are broken by the DISCARD_CONCURRENT flag.
 */
2371 static int drbd_asb_recover_0p(struct drbd_conf
*mdev
) __must_hold(local
)
2373 int self
, peer
, rv
= -100;
2374 unsigned long ch_self
, ch_peer
;
2376 self
= mdev
->ldev
->md
.uuid
[UI_BITMAP
] & 1;
2377 peer
= mdev
->p_uuid
[UI_BITMAP
] & 1;
2379 ch_peer
= mdev
->p_uuid
[UI_SIZE
];
2380 ch_self
= mdev
->comm_bm_set
;
2382 switch (mdev
->tconn
->net_conf
->after_sb_0p
) {
2384 case ASB_DISCARD_SECONDARY
:
2385 case ASB_CALL_HELPER
:
2386 dev_err(DEV
, "Configuration error.\n");
2388 case ASB_DISCONNECT
:
2390 case ASB_DISCARD_YOUNGER_PRI
:
2391 if (self
== 0 && peer
== 1) {
2395 if (self
== 1 && peer
== 0) {
2399 /* Else fall through to one of the other strategies... */
2400 case ASB_DISCARD_OLDER_PRI
:
2401 if (self
== 0 && peer
== 1) {
2405 if (self
== 1 && peer
== 0) {
2409 /* Else fall through to one of the other strategies... */
2410 dev_warn(DEV
, "Discard younger/older primary did not find a decision\n"
2411 "Using discard-least-changes instead\n");
2412 case ASB_DISCARD_ZERO_CHG
:
2413 if (ch_peer
== 0 && ch_self
== 0) {
2414 rv
= test_bit(DISCARD_CONCURRENT
, &mdev
->tconn
->flags
)
2418 if (ch_peer
== 0) { rv
= 1; break; }
2419 if (ch_self
== 0) { rv
= -1; break; }
2421 if (mdev
->tconn
->net_conf
->after_sb_0p
== ASB_DISCARD_ZERO_CHG
)
2423 case ASB_DISCARD_LEAST_CHG
:
2424 if (ch_self
< ch_peer
)
2426 else if (ch_self
> ch_peer
)
2428 else /* ( ch_self == ch_peer ) */
2429 /* Well, then use something else. */
2430 rv
= test_bit(DISCARD_CONCURRENT
, &mdev
->tconn
->flags
)
2433 case ASB_DISCARD_LOCAL
:
2436 case ASB_DISCARD_REMOTE
:
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * drbd_asb_recover_1p(): after-split-brain recovery with 1 primary.
 * Policies meaningful only for 0 primaries are configuration errors.
 * "Consensus"-style paths reuse drbd_asb_recover_0p() and only accept
 * its verdict when compatible with our current role; discard-secondary
 * decides purely by role; call-helper demotes us (pri-lost-after-sb
 * helper on failure) when the 0p verdict says the primary loses.
 */
2443 static int drbd_asb_recover_1p(struct drbd_conf
*mdev
) __must_hold(local
)
2447 switch (mdev
->tconn
->net_conf
->after_sb_1p
) {
2448 case ASB_DISCARD_YOUNGER_PRI
:
2449 case ASB_DISCARD_OLDER_PRI
:
2450 case ASB_DISCARD_LEAST_CHG
:
2451 case ASB_DISCARD_LOCAL
:
2452 case ASB_DISCARD_REMOTE
:
2453 dev_err(DEV
, "Configuration error.\n");
2455 case ASB_DISCONNECT
:
2458 hg
= drbd_asb_recover_0p(mdev
);
2459 if (hg
== -1 && mdev
->state
.role
== R_SECONDARY
)
2461 if (hg
== 1 && mdev
->state
.role
== R_PRIMARY
)
2465 rv
= drbd_asb_recover_0p(mdev
);
2467 case ASB_DISCARD_SECONDARY
:
2468 return mdev
->state
.role
== R_PRIMARY
? 1 : -1;
2469 case ASB_CALL_HELPER
:
2470 hg
= drbd_asb_recover_0p(mdev
);
2471 if (hg
== -1 && mdev
->state
.role
== R_PRIMARY
) {
2472 enum drbd_state_rv rv2
;
2474 drbd_set_role(mdev
, R_SECONDARY
, 0);
2475 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2476 * we might be here in C_WF_REPORT_PARAMS which is transient.
2477 * we do not need to wait for the after state change work either. */
2478 rv2
= drbd_change_state(mdev
, CS_VERBOSE
, NS(role
, R_SECONDARY
));
2479 if (rv2
!= SS_SUCCESS
) {
2480 drbd_khelper(mdev
, "pri-lost-after-sb");
2482 dev_warn(DEV
, "Successfully gave up primary role.\n");
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * drbd_asb_recover_2p(): after-split-brain recovery with 2 primaries.
 * Most 0p/1p policies are configuration errors here; the remaining
 * paths reuse drbd_asb_recover_0p(), and call-helper demotes this node
 * (pri-lost-after-sb helper on failure) when the verdict goes against
 * the local primary.
 */
2492 static int drbd_asb_recover_2p(struct drbd_conf
*mdev
) __must_hold(local
)
2496 switch (mdev
->tconn
->net_conf
->after_sb_2p
) {
2497 case ASB_DISCARD_YOUNGER_PRI
:
2498 case ASB_DISCARD_OLDER_PRI
:
2499 case ASB_DISCARD_LEAST_CHG
:
2500 case ASB_DISCARD_LOCAL
:
2501 case ASB_DISCARD_REMOTE
:
2503 case ASB_DISCARD_SECONDARY
:
2504 dev_err(DEV
, "Configuration error.\n");
2507 rv
= drbd_asb_recover_0p(mdev
);
2509 case ASB_DISCONNECT
:
2511 case ASB_CALL_HELPER
:
2512 hg
= drbd_asb_recover_0p(mdev
);
2514 enum drbd_state_rv rv2
;
2516 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2517 * we might be here in C_WF_REPORT_PARAMS which is transient.
2518 * we do not need to wait for the after state change work either. */
2519 rv2
= drbd_change_state(mdev
, CS_VERBOSE
, NS(role
, R_SECONDARY
));
2520 if (rv2
!= SS_SUCCESS
) {
2521 drbd_khelper(mdev
, "pri-lost-after-sb");
2523 dev_warn(DEV
, "Successfully gave up primary role.\n");
/*
 * NOTE(review): line-mangled extraction; code kept byte-identical.
 * drbd_uuid_dump(): log the four UUIDs (current/bitmap/history start
 * and end) of a uuid set plus bitmap bits and flags, prefixed with
 * 'text'; logs a notice instead if the uuid set vanished.
 */
2533 static void drbd_uuid_dump(struct drbd_conf
*mdev
, char *text
, u64
*uuid
,
2534 u64 bits
, u64 flags
)
2537 dev_info(DEV
, "%s uuid info vanished while I was looking!\n", text
);
2540 dev_info(DEV
, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2542 (unsigned long long)uuid
[UI_CURRENT
],
2543 (unsigned long long)uuid
[UI_BITMAP
],
2544 (unsigned long long)uuid
[UI_HISTORY_START
],
2545 (unsigned long long)uuid
[UI_HISTORY_END
],
2546 (unsigned long long)bits
,
2547 (unsigned long long)flags
);
2551 100 after split brain try auto recover
2552 2 C_SYNC_SOURCE set BitMap
2553 1 C_SYNC_SOURCE use BitMap
2555 -1 C_SYNC_TARGET use BitMap
2556 -2 C_SYNC_TARGET set BitMap
2557 -100 after split brain, disconnect
2558 -1000 unrelated data
2559 -1091 requires proto 91
2560 -1096 requires proto 96
2562 static int drbd_uuid_compare(struct drbd_conf
*mdev
, int *rule_nr
) __must_hold(local
)
2567 self
= mdev
->ldev
->md
.uuid
[UI_CURRENT
] & ~((u64
)1);
2568 peer
= mdev
->p_uuid
[UI_CURRENT
] & ~((u64
)1);
2571 if (self
== UUID_JUST_CREATED
&& peer
== UUID_JUST_CREATED
)
2575 if ((self
== UUID_JUST_CREATED
|| self
== (u64
)0) &&
2576 peer
!= UUID_JUST_CREATED
)
2580 if (self
!= UUID_JUST_CREATED
&&
2581 (peer
== UUID_JUST_CREATED
|| peer
== (u64
)0))
2585 int rct
, dc
; /* roles at crash time */
2587 if (mdev
->p_uuid
[UI_BITMAP
] == (u64
)0 && mdev
->ldev
->md
.uuid
[UI_BITMAP
] != (u64
)0) {
2589 if (mdev
->tconn
->agreed_pro_version
< 91)
2592 if ((mdev
->ldev
->md
.uuid
[UI_BITMAP
] & ~((u64
)1)) == (mdev
->p_uuid
[UI_HISTORY_START
] & ~((u64
)1)) &&
2593 (mdev
->ldev
->md
.uuid
[UI_HISTORY_START
] & ~((u64
)1)) == (mdev
->p_uuid
[UI_HISTORY_START
+ 1] & ~((u64
)1))) {
2594 dev_info(DEV
, "was SyncSource, missed the resync finished event, corrected myself:\n");
2595 drbd_uuid_set_bm(mdev
, 0UL);
2597 drbd_uuid_dump(mdev
, "self", mdev
->ldev
->md
.uuid
,
2598 mdev
->state
.disk
>= D_NEGOTIATING
? drbd_bm_total_weight(mdev
) : 0, 0);
2601 dev_info(DEV
, "was SyncSource (peer failed to write sync_uuid)\n");
2608 if (mdev
->ldev
->md
.uuid
[UI_BITMAP
] == (u64
)0 && mdev
->p_uuid
[UI_BITMAP
] != (u64
)0) {
2610 if (mdev
->tconn
->agreed_pro_version
< 91)
2613 if ((mdev
->ldev
->md
.uuid
[UI_HISTORY_START
] & ~((u64
)1)) == (mdev
->p_uuid
[UI_BITMAP
] & ~((u64
)1)) &&
2614 (mdev
->ldev
->md
.uuid
[UI_HISTORY_START
+ 1] & ~((u64
)1)) == (mdev
->p_uuid
[UI_HISTORY_START
] & ~((u64
)1))) {
2615 dev_info(DEV
, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2617 mdev
->p_uuid
[UI_HISTORY_START
+ 1] = mdev
->p_uuid
[UI_HISTORY_START
];
2618 mdev
->p_uuid
[UI_HISTORY_START
] = mdev
->p_uuid
[UI_BITMAP
];
2619 mdev
->p_uuid
[UI_BITMAP
] = 0UL;
2621 drbd_uuid_dump(mdev
, "peer", mdev
->p_uuid
, mdev
->p_uuid
[UI_SIZE
], mdev
->p_uuid
[UI_FLAGS
]);
2624 dev_info(DEV
, "was SyncTarget (failed to write sync_uuid)\n");
2631 /* Common power [off|failure] */
2632 rct
= (test_bit(CRASHED_PRIMARY
, &mdev
->flags
) ? 1 : 0) +
2633 (mdev
->p_uuid
[UI_FLAGS
] & 2);
2634 /* lowest bit is set when we were primary,
2635 * next bit (weight 2) is set when peer was primary */
2639 case 0: /* !self_pri && !peer_pri */ return 0;
2640 case 1: /* self_pri && !peer_pri */ return 1;
2641 case 2: /* !self_pri && peer_pri */ return -1;
2642 case 3: /* self_pri && peer_pri */
2643 dc
= test_bit(DISCARD_CONCURRENT
, &mdev
->tconn
->flags
);
2649 peer
= mdev
->p_uuid
[UI_BITMAP
] & ~((u64
)1);
2654 peer
= mdev
->p_uuid
[UI_HISTORY_START
] & ~((u64
)1);
2656 if (mdev
->tconn
->agreed_pro_version
< 96 ?
2657 (mdev
->ldev
->md
.uuid
[UI_HISTORY_START
] & ~((u64
)1)) ==
2658 (mdev
->p_uuid
[UI_HISTORY_START
+ 1] & ~((u64
)1)) :
2659 peer
+ UUID_NEW_BM_OFFSET
== (mdev
->p_uuid
[UI_BITMAP
] & ~((u64
)1))) {
2660 /* The last P_SYNC_UUID did not get though. Undo the last start of
2661 resync as sync source modifications of the peer's UUIDs. */
2663 if (mdev
->tconn
->agreed_pro_version
< 91)
2666 mdev
->p_uuid
[UI_BITMAP
] = mdev
->p_uuid
[UI_HISTORY_START
];
2667 mdev
->p_uuid
[UI_HISTORY_START
] = mdev
->p_uuid
[UI_HISTORY_START
+ 1];
2669 dev_info(DEV
, "Did not got last syncUUID packet, corrected:\n");
2670 drbd_uuid_dump(mdev
, "peer", mdev
->p_uuid
, mdev
->p_uuid
[UI_SIZE
], mdev
->p_uuid
[UI_FLAGS
]);
2677 self
= mdev
->ldev
->md
.uuid
[UI_CURRENT
] & ~((u64
)1);
2678 for (i
= UI_HISTORY_START
; i
<= UI_HISTORY_END
; i
++) {
2679 peer
= mdev
->p_uuid
[i
] & ~((u64
)1);
2685 self
= mdev
->ldev
->md
.uuid
[UI_BITMAP
] & ~((u64
)1);
2686 peer
= mdev
->p_uuid
[UI_CURRENT
] & ~((u64
)1);
2691 self
= mdev
->ldev
->md
.uuid
[UI_HISTORY_START
] & ~((u64
)1);
2693 if (mdev
->tconn
->agreed_pro_version
< 96 ?
2694 (mdev
->ldev
->md
.uuid
[UI_HISTORY_START
+ 1] & ~((u64
)1)) ==
2695 (mdev
->p_uuid
[UI_HISTORY_START
] & ~((u64
)1)) :
2696 self
+ UUID_NEW_BM_OFFSET
== (mdev
->ldev
->md
.uuid
[UI_BITMAP
] & ~((u64
)1))) {
2697 /* The last P_SYNC_UUID did not get though. Undo the last start of
2698 resync as sync source modifications of our UUIDs. */
2700 if (mdev
->tconn
->agreed_pro_version
< 91)
2703 _drbd_uuid_set(mdev
, UI_BITMAP
, mdev
->ldev
->md
.uuid
[UI_HISTORY_START
]);
2704 _drbd_uuid_set(mdev
, UI_HISTORY_START
, mdev
->ldev
->md
.uuid
[UI_HISTORY_START
+ 1]);
2706 dev_info(DEV
, "Last syncUUID did not get through, corrected:\n");
2707 drbd_uuid_dump(mdev
, "self", mdev
->ldev
->md
.uuid
,
2708 mdev
->state
.disk
>= D_NEGOTIATING
? drbd_bm_total_weight(mdev
) : 0, 0);
2716 peer
= mdev
->p_uuid
[UI_CURRENT
] & ~((u64
)1);
2717 for (i
= UI_HISTORY_START
; i
<= UI_HISTORY_END
; i
++) {
2718 self
= mdev
->ldev
->md
.uuid
[i
] & ~((u64
)1);
2724 self
= mdev
->ldev
->md
.uuid
[UI_BITMAP
] & ~((u64
)1);
2725 peer
= mdev
->p_uuid
[UI_BITMAP
] & ~((u64
)1);
2726 if (self
== peer
&& self
!= ((u64
)0))
2730 for (i
= UI_HISTORY_START
; i
<= UI_HISTORY_END
; i
++) {
2731 self
= mdev
->ldev
->md
.uuid
[i
] & ~((u64
)1);
2732 for (j
= UI_HISTORY_START
; j
<= UI_HISTORY_END
; j
++) {
2733 peer
= mdev
->p_uuid
[j
] & ~((u64
)1);
2742 /* drbd_sync_handshake() returns the new conn state on success, or
2743 CONN_MASK (-1) on failure.
2745 static enum drbd_conns
drbd_sync_handshake(struct drbd_conf
*mdev
, enum drbd_role peer_role
,
2746 enum drbd_disk_state peer_disk
) __must_hold(local
)
2749 enum drbd_conns rv
= C_MASK
;
2750 enum drbd_disk_state mydisk
;
2752 mydisk
= mdev
->state
.disk
;
2753 if (mydisk
== D_NEGOTIATING
)
2754 mydisk
= mdev
->new_state_tmp
.disk
;
2756 dev_info(DEV
, "drbd_sync_handshake:\n");
2757 drbd_uuid_dump(mdev
, "self", mdev
->ldev
->md
.uuid
, mdev
->comm_bm_set
, 0);
2758 drbd_uuid_dump(mdev
, "peer", mdev
->p_uuid
,
2759 mdev
->p_uuid
[UI_SIZE
], mdev
->p_uuid
[UI_FLAGS
]);
2761 hg
= drbd_uuid_compare(mdev
, &rule_nr
);
2763 dev_info(DEV
, "uuid_compare()=%d by rule %d\n", hg
, rule_nr
);
2766 dev_alert(DEV
, "Unrelated data, aborting!\n");
2770 dev_alert(DEV
, "To resolve this both sides have to support at least protocol %d\n", -hg
- 1000);
2774 if ((mydisk
== D_INCONSISTENT
&& peer_disk
> D_INCONSISTENT
) ||
2775 (peer_disk
== D_INCONSISTENT
&& mydisk
> D_INCONSISTENT
)) {
2776 int f
= (hg
== -100) || abs(hg
) == 2;
2777 hg
= mydisk
> D_INCONSISTENT
? 1 : -1;
2780 dev_info(DEV
, "Becoming sync %s due to disk states.\n",
2781 hg
> 0 ? "source" : "target");
2785 drbd_khelper(mdev
, "initial-split-brain");
2787 if (hg
== 100 || (hg
== -100 && mdev
->tconn
->net_conf
->always_asbp
)) {
2788 int pcount
= (mdev
->state
.role
== R_PRIMARY
)
2789 + (peer_role
== R_PRIMARY
);
2790 int forced
= (hg
== -100);
2794 hg
= drbd_asb_recover_0p(mdev
);
2797 hg
= drbd_asb_recover_1p(mdev
);
2800 hg
= drbd_asb_recover_2p(mdev
);
2803 if (abs(hg
) < 100) {
2804 dev_warn(DEV
, "Split-Brain detected, %d primaries, "
2805 "automatically solved. Sync from %s node\n",
2806 pcount
, (hg
< 0) ? "peer" : "this");
2808 dev_warn(DEV
, "Doing a full sync, since"
2809 " UUIDs where ambiguous.\n");
2816 if (mdev
->tconn
->net_conf
->want_lose
&& !(mdev
->p_uuid
[UI_FLAGS
]&1))
2818 if (!mdev
->tconn
->net_conf
->want_lose
&& (mdev
->p_uuid
[UI_FLAGS
]&1))
2822 dev_warn(DEV
, "Split-Brain detected, manually solved. "
2823 "Sync from %s node\n",
2824 (hg
< 0) ? "peer" : "this");
2828 /* FIXME this log message is not correct if we end up here
2829 * after an attempted attach on a diskless node.
2830 * We just refuse to attach -- well, we drop the "connection"
2831 * to that disk, in a way... */
2832 dev_alert(DEV
, "Split-Brain detected but unresolved, dropping connection!\n");
2833 drbd_khelper(mdev
, "split-brain");
2837 if (hg
> 0 && mydisk
<= D_INCONSISTENT
) {
2838 dev_err(DEV
, "I shall become SyncSource, but I am inconsistent!\n");
2842 if (hg
< 0 && /* by intention we do not use mydisk here. */
2843 mdev
->state
.role
== R_PRIMARY
&& mdev
->state
.disk
>= D_CONSISTENT
) {
2844 switch (mdev
->tconn
->net_conf
->rr_conflict
) {
2845 case ASB_CALL_HELPER
:
2846 drbd_khelper(mdev
, "pri-lost");
2848 case ASB_DISCONNECT
:
2849 dev_err(DEV
, "I shall become SyncTarget, but I am primary!\n");
2852 dev_warn(DEV
, "Becoming SyncTarget, violating the stable-data"
2857 if (mdev
->tconn
->net_conf
->dry_run
|| test_bit(CONN_DRY_RUN
, &mdev
->tconn
->flags
)) {
2859 dev_info(DEV
, "dry-run connect: No resync, would become Connected immediately.\n");
2861 dev_info(DEV
, "dry-run connect: Would become %s, doing a %s resync.",
2862 drbd_conn_str(hg
> 0 ? C_SYNC_SOURCE
: C_SYNC_TARGET
),
2863 abs(hg
) >= 2 ? "full" : "bit-map based");
2868 dev_info(DEV
, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2869 if (drbd_bitmap_io(mdev
, &drbd_bmio_set_n_write
, "set_n_write from sync_handshake",
2870 BM_LOCKED_SET_ALLOWED
))
2874 if (hg
> 0) { /* become sync source. */
2876 } else if (hg
< 0) { /* become sync target */
2880 if (drbd_bm_total_weight(mdev
)) {
2881 dev_info(DEV
, "No resync, but %lu bits in bitmap!\n",
2882 drbd_bm_total_weight(mdev
));
2889 /* returns 1 if invalid */
2890 static int cmp_after_sb(enum drbd_after_sb_p peer
, enum drbd_after_sb_p self
)
2892 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2893 if ((peer
== ASB_DISCARD_REMOTE
&& self
== ASB_DISCARD_LOCAL
) ||
2894 (self
== ASB_DISCARD_REMOTE
&& peer
== ASB_DISCARD_LOCAL
))
2897 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2898 if (peer
== ASB_DISCARD_REMOTE
|| peer
== ASB_DISCARD_LOCAL
||
2899 self
== ASB_DISCARD_REMOTE
|| self
== ASB_DISCARD_LOCAL
)
2902 /* everything else is valid if they are equal on both sides. */
2906 /* everything es is invalid. */
2910 static int receive_protocol(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
2912 struct p_protocol
*p
= pi
->data
;
2913 int p_proto
, p_after_sb_0p
, p_after_sb_1p
, p_after_sb_2p
;
2914 int p_want_lose
, p_two_primaries
, cf
;
2915 char p_integrity_alg
[SHARED_SECRET_MAX
] = "";
2917 p_proto
= be32_to_cpu(p
->protocol
);
2918 p_after_sb_0p
= be32_to_cpu(p
->after_sb_0p
);
2919 p_after_sb_1p
= be32_to_cpu(p
->after_sb_1p
);
2920 p_after_sb_2p
= be32_to_cpu(p
->after_sb_2p
);
2921 p_two_primaries
= be32_to_cpu(p
->two_primaries
);
2922 cf
= be32_to_cpu(p
->conn_flags
);
2923 p_want_lose
= cf
& CF_WANT_LOSE
;
2925 clear_bit(CONN_DRY_RUN
, &tconn
->flags
);
2927 if (cf
& CF_DRY_RUN
)
2928 set_bit(CONN_DRY_RUN
, &tconn
->flags
);
2930 if (p_proto
!= tconn
->net_conf
->wire_protocol
) {
2931 conn_err(tconn
, "incompatible communication protocols\n");
2935 if (cmp_after_sb(p_after_sb_0p
, tconn
->net_conf
->after_sb_0p
)) {
2936 conn_err(tconn
, "incompatible after-sb-0pri settings\n");
2940 if (cmp_after_sb(p_after_sb_1p
, tconn
->net_conf
->after_sb_1p
)) {
2941 conn_err(tconn
, "incompatible after-sb-1pri settings\n");
2945 if (cmp_after_sb(p_after_sb_2p
, tconn
->net_conf
->after_sb_2p
)) {
2946 conn_err(tconn
, "incompatible after-sb-2pri settings\n");
2950 if (p_want_lose
&& tconn
->net_conf
->want_lose
) {
2951 conn_err(tconn
, "both sides have the 'want_lose' flag set\n");
2955 if (p_two_primaries
!= tconn
->net_conf
->two_primaries
) {
2956 conn_err(tconn
, "incompatible setting of the two-primaries options\n");
2960 if (tconn
->agreed_pro_version
>= 87) {
2961 unsigned char *my_alg
= tconn
->net_conf
->integrity_alg
;
2964 err
= drbd_recv_all(tconn
, p_integrity_alg
, pi
->size
);
2968 p_integrity_alg
[SHARED_SECRET_MAX
-1] = 0;
2969 if (strcmp(p_integrity_alg
, my_alg
)) {
2970 conn_err(tconn
, "incompatible setting of the data-integrity-alg\n");
2973 conn_info(tconn
, "data-integrity-alg: %s\n",
2974 my_alg
[0] ? my_alg
: (unsigned char *)"<not-used>");
2980 conn_request_state(tconn
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
2985 * input: alg name, feature name
2986 * return: NULL (alg name was "")
2987 * ERR_PTR(error) if something goes wrong
2988 * or the crypto hash ptr, if it worked out ok. */
2989 struct crypto_hash
*drbd_crypto_alloc_digest_safe(const struct drbd_conf
*mdev
,
2990 const char *alg
, const char *name
)
2992 struct crypto_hash
*tfm
;
2997 tfm
= crypto_alloc_hash(alg
, 0, CRYPTO_ALG_ASYNC
);
2999 dev_err(DEV
, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3000 alg
, name
, PTR_ERR(tfm
));
3003 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm
))) {
3004 crypto_free_hash(tfm
);
3005 dev_err(DEV
, "\"%s\" is not a digest (%s)\n", alg
, name
);
3006 return ERR_PTR(-EINVAL
);
3011 static int ignore_remaining_packet(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
3013 void *buffer
= tconn
->data
.rbuf
;
3014 int size
= pi
->size
;
3017 int s
= min_t(int, size
, DRBD_SOCKET_BUFFER_SIZE
);
3018 s
= drbd_recv(tconn
, buffer
, s
);
3032 * config_unknown_volume - device configuration command for unknown volume
3034 * When a device is added to an existing connection, the node on which the
3035 * device is added first will send configuration commands to its peer but the
3036 * peer will not know about the device yet. It will warn and ignore these
3037 * commands. Once the device is added on the second node, the second node will
3038 * send the same device configuration commands, but in the other direction.
3040 * (We can also end up here if drbd is misconfigured.)
3042 static int config_unknown_volume(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
3044 conn_warn(tconn
, "Volume %u unknown; ignoring %s packet\n",
3045 pi
->vnr
, cmdname(pi
->cmd
));
3046 return ignore_remaining_packet(tconn
, pi
);
3049 static int receive_SyncParam(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
3051 struct drbd_conf
*mdev
;
3052 struct p_rs_param_95
*p
;
3053 unsigned int header_size
, data_size
, exp_max_sz
;
3054 struct crypto_hash
*verify_tfm
= NULL
;
3055 struct crypto_hash
*csums_tfm
= NULL
;
3056 const int apv
= tconn
->agreed_pro_version
;
3057 int *rs_plan_s
= NULL
;
3061 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
3063 return config_unknown_volume(tconn
, pi
);
3065 exp_max_sz
= apv
<= 87 ? sizeof(struct p_rs_param
)
3066 : apv
== 88 ? sizeof(struct p_rs_param
)
3068 : apv
<= 94 ? sizeof(struct p_rs_param_89
)
3069 : /* apv >= 95 */ sizeof(struct p_rs_param_95
);
3071 if (pi
->size
> exp_max_sz
) {
3072 dev_err(DEV
, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3073 pi
->size
, exp_max_sz
);
3078 header_size
= sizeof(struct p_rs_param
);
3079 data_size
= pi
->size
- header_size
;
3080 } else if (apv
<= 94) {
3081 header_size
= sizeof(struct p_rs_param_89
);
3082 data_size
= pi
->size
- header_size
;
3083 D_ASSERT(data_size
== 0);
3085 header_size
= sizeof(struct p_rs_param_95
);
3086 data_size
= pi
->size
- header_size
;
3087 D_ASSERT(data_size
== 0);
3090 /* initialize verify_alg and csums_alg */
3092 memset(p
->verify_alg
, 0, 2 * SHARED_SECRET_MAX
);
3094 err
= drbd_recv_all(mdev
->tconn
, p
, header_size
);
3098 if (get_ldev(mdev
)) {
3099 mdev
->ldev
->dc
.resync_rate
= be32_to_cpu(p
->rate
);
3105 if (data_size
> SHARED_SECRET_MAX
) {
3106 dev_err(DEV
, "verify-alg too long, "
3107 "peer wants %u, accepting only %u byte\n",
3108 data_size
, SHARED_SECRET_MAX
);
3112 err
= drbd_recv_all(mdev
->tconn
, p
->verify_alg
, data_size
);
3116 /* we expect NUL terminated string */
3117 /* but just in case someone tries to be evil */
3118 D_ASSERT(p
->verify_alg
[data_size
-1] == 0);
3119 p
->verify_alg
[data_size
-1] = 0;
3121 } else /* apv >= 89 */ {
3122 /* we still expect NUL terminated strings */
3123 /* but just in case someone tries to be evil */
3124 D_ASSERT(p
->verify_alg
[SHARED_SECRET_MAX
-1] == 0);
3125 D_ASSERT(p
->csums_alg
[SHARED_SECRET_MAX
-1] == 0);
3126 p
->verify_alg
[SHARED_SECRET_MAX
-1] = 0;
3127 p
->csums_alg
[SHARED_SECRET_MAX
-1] = 0;
3130 if (strcmp(mdev
->tconn
->net_conf
->verify_alg
, p
->verify_alg
)) {
3131 if (mdev
->state
.conn
== C_WF_REPORT_PARAMS
) {
3132 dev_err(DEV
, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3133 mdev
->tconn
->net_conf
->verify_alg
, p
->verify_alg
);
3136 verify_tfm
= drbd_crypto_alloc_digest_safe(mdev
,
3137 p
->verify_alg
, "verify-alg");
3138 if (IS_ERR(verify_tfm
)) {
3144 if (apv
>= 89 && strcmp(mdev
->tconn
->net_conf
->csums_alg
, p
->csums_alg
)) {
3145 if (mdev
->state
.conn
== C_WF_REPORT_PARAMS
) {
3146 dev_err(DEV
, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3147 mdev
->tconn
->net_conf
->csums_alg
, p
->csums_alg
);
3150 csums_tfm
= drbd_crypto_alloc_digest_safe(mdev
,
3151 p
->csums_alg
, "csums-alg");
3152 if (IS_ERR(csums_tfm
)) {
3158 if (apv
> 94 && get_ldev(mdev
)) {
3159 mdev
->ldev
->dc
.resync_rate
= be32_to_cpu(p
->rate
);
3160 mdev
->ldev
->dc
.c_plan_ahead
= be32_to_cpu(p
->c_plan_ahead
);
3161 mdev
->ldev
->dc
.c_delay_target
= be32_to_cpu(p
->c_delay_target
);
3162 mdev
->ldev
->dc
.c_fill_target
= be32_to_cpu(p
->c_fill_target
);
3163 mdev
->ldev
->dc
.c_max_rate
= be32_to_cpu(p
->c_max_rate
);
3165 fifo_size
= (mdev
->ldev
->dc
.c_plan_ahead
* 10 * SLEEP_TIME
) / HZ
;
3166 if (fifo_size
!= mdev
->rs_plan_s
.size
&& fifo_size
> 0) {
3167 rs_plan_s
= kzalloc(sizeof(int) * fifo_size
, GFP_KERNEL
);
3169 dev_err(DEV
, "kmalloc of fifo_buffer failed");
3177 spin_lock(&mdev
->peer_seq_lock
);
3178 /* lock against drbd_nl_syncer_conf() */
3180 strcpy(mdev
->tconn
->net_conf
->verify_alg
, p
->verify_alg
);
3181 mdev
->tconn
->net_conf
->verify_alg_len
= strlen(p
->verify_alg
) + 1;
3182 crypto_free_hash(mdev
->tconn
->verify_tfm
);
3183 mdev
->tconn
->verify_tfm
= verify_tfm
;
3184 dev_info(DEV
, "using verify-alg: \"%s\"\n", p
->verify_alg
);
3187 strcpy(mdev
->tconn
->net_conf
->csums_alg
, p
->csums_alg
);
3188 mdev
->tconn
->net_conf
->csums_alg_len
= strlen(p
->csums_alg
) + 1;
3189 crypto_free_hash(mdev
->tconn
->csums_tfm
);
3190 mdev
->tconn
->csums_tfm
= csums_tfm
;
3191 dev_info(DEV
, "using csums-alg: \"%s\"\n", p
->csums_alg
);
3193 if (fifo_size
!= mdev
->rs_plan_s
.size
) {
3194 kfree(mdev
->rs_plan_s
.values
);
3195 mdev
->rs_plan_s
.values
= rs_plan_s
;
3196 mdev
->rs_plan_s
.size
= fifo_size
;
3197 mdev
->rs_planed
= 0;
3199 spin_unlock(&mdev
->peer_seq_lock
);
3204 /* just for completeness: actually not needed,
3205 * as this is not reached if csums_tfm was ok. */
3206 crypto_free_hash(csums_tfm
);
3207 /* but free the verify_tfm again, if csums_tfm did not work out */
3208 crypto_free_hash(verify_tfm
);
3209 conn_request_state(mdev
->tconn
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
3213 /* warn if the arguments differ by more than 12.5% */
3214 static void warn_if_differ_considerably(struct drbd_conf
*mdev
,
3215 const char *s
, sector_t a
, sector_t b
)
3218 if (a
== 0 || b
== 0)
3220 d
= (a
> b
) ? (a
- b
) : (b
- a
);
3221 if (d
> (a
>>3) || d
> (b
>>3))
3222 dev_warn(DEV
, "Considerable difference in %s: %llus vs. %llus\n", s
,
3223 (unsigned long long)a
, (unsigned long long)b
);
3226 static int receive_sizes(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
3228 struct drbd_conf
*mdev
;
3229 struct p_sizes
*p
= pi
->data
;
3230 enum determine_dev_size dd
= unchanged
;
3231 sector_t p_size
, p_usize
, my_usize
;
3232 int ldsc
= 0; /* local disk size changed */
3233 enum dds_flags ddsf
;
3235 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
3237 return config_unknown_volume(tconn
, pi
);
3239 p_size
= be64_to_cpu(p
->d_size
);
3240 p_usize
= be64_to_cpu(p
->u_size
);
3242 /* just store the peer's disk size for now.
3243 * we still need to figure out whether we accept that. */
3244 mdev
->p_size
= p_size
;
3246 if (get_ldev(mdev
)) {
3247 warn_if_differ_considerably(mdev
, "lower level device sizes",
3248 p_size
, drbd_get_max_capacity(mdev
->ldev
));
3249 warn_if_differ_considerably(mdev
, "user requested size",
3250 p_usize
, mdev
->ldev
->dc
.disk_size
);
3252 /* if this is the first connect, or an otherwise expected
3253 * param exchange, choose the minimum */
3254 if (mdev
->state
.conn
== C_WF_REPORT_PARAMS
)
3255 p_usize
= min_not_zero((sector_t
)mdev
->ldev
->dc
.disk_size
,
3258 my_usize
= mdev
->ldev
->dc
.disk_size
;
3260 if (mdev
->ldev
->dc
.disk_size
!= p_usize
) {
3261 mdev
->ldev
->dc
.disk_size
= p_usize
;
3262 dev_info(DEV
, "Peer sets u_size to %lu sectors\n",
3263 (unsigned long)mdev
->ldev
->dc
.disk_size
);
3266 /* Never shrink a device with usable data during connect.
3267 But allow online shrinking if we are connected. */
3268 if (drbd_new_dev_size(mdev
, mdev
->ldev
, 0) <
3269 drbd_get_capacity(mdev
->this_bdev
) &&
3270 mdev
->state
.disk
>= D_OUTDATED
&&
3271 mdev
->state
.conn
< C_CONNECTED
) {
3272 dev_err(DEV
, "The peer's disk size is too small!\n");
3273 conn_request_state(mdev
->tconn
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
3274 mdev
->ldev
->dc
.disk_size
= my_usize
;
3281 ddsf
= be16_to_cpu(p
->dds_flags
);
3282 if (get_ldev(mdev
)) {
3283 dd
= drbd_determine_dev_size(mdev
, ddsf
);
3285 if (dd
== dev_size_error
)
3289 /* I am diskless, need to accept the peer's size. */
3290 drbd_set_my_capacity(mdev
, p_size
);
3293 mdev
->peer_max_bio_size
= be32_to_cpu(p
->max_bio_size
);
3294 drbd_reconsider_max_bio_size(mdev
);
3296 if (get_ldev(mdev
)) {
3297 if (mdev
->ldev
->known_size
!= drbd_get_capacity(mdev
->ldev
->backing_bdev
)) {
3298 mdev
->ldev
->known_size
= drbd_get_capacity(mdev
->ldev
->backing_bdev
);
3305 if (mdev
->state
.conn
> C_WF_REPORT_PARAMS
) {
3306 if (be64_to_cpu(p
->c_size
) !=
3307 drbd_get_capacity(mdev
->this_bdev
) || ldsc
) {
3308 /* we have different sizes, probably peer
3309 * needs to know my new size... */
3310 drbd_send_sizes(mdev
, 0, ddsf
);
3312 if (test_and_clear_bit(RESIZE_PENDING
, &mdev
->flags
) ||
3313 (dd
== grew
&& mdev
->state
.conn
== C_CONNECTED
)) {
3314 if (mdev
->state
.pdsk
>= D_INCONSISTENT
&&
3315 mdev
->state
.disk
>= D_INCONSISTENT
) {
3316 if (ddsf
& DDSF_NO_RESYNC
)
3317 dev_info(DEV
, "Resync of new storage suppressed with --assume-clean\n");
3319 resync_after_online_grow(mdev
);
3321 set_bit(RESYNC_AFTER_NEG
, &mdev
->flags
);
3328 static int receive_uuids(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
3330 struct drbd_conf
*mdev
;
3331 struct p_uuids
*p
= pi
->data
;
3333 int i
, updated_uuids
= 0;
3335 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
3337 return config_unknown_volume(tconn
, pi
);
3339 p_uuid
= kmalloc(sizeof(u64
)*UI_EXTENDED_SIZE
, GFP_NOIO
);
3341 for (i
= UI_CURRENT
; i
< UI_EXTENDED_SIZE
; i
++)
3342 p_uuid
[i
] = be64_to_cpu(p
->uuid
[i
]);
3344 kfree(mdev
->p_uuid
);
3345 mdev
->p_uuid
= p_uuid
;
3347 if (mdev
->state
.conn
< C_CONNECTED
&&
3348 mdev
->state
.disk
< D_INCONSISTENT
&&
3349 mdev
->state
.role
== R_PRIMARY
&&
3350 (mdev
->ed_uuid
& ~((u64
)1)) != (p_uuid
[UI_CURRENT
] & ~((u64
)1))) {
3351 dev_err(DEV
, "Can only connect to data with current UUID=%016llX\n",
3352 (unsigned long long)mdev
->ed_uuid
);
3353 conn_request_state(mdev
->tconn
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
3357 if (get_ldev(mdev
)) {
3358 int skip_initial_sync
=
3359 mdev
->state
.conn
== C_CONNECTED
&&
3360 mdev
->tconn
->agreed_pro_version
>= 90 &&
3361 mdev
->ldev
->md
.uuid
[UI_CURRENT
] == UUID_JUST_CREATED
&&
3362 (p_uuid
[UI_FLAGS
] & 8);
3363 if (skip_initial_sync
) {
3364 dev_info(DEV
, "Accepted new current UUID, preparing to skip initial sync\n");
3365 drbd_bitmap_io(mdev
, &drbd_bmio_clear_n_write
,
3366 "clear_n_write from receive_uuids",
3367 BM_LOCKED_TEST_ALLOWED
);
3368 _drbd_uuid_set(mdev
, UI_CURRENT
, p_uuid
[UI_CURRENT
]);
3369 _drbd_uuid_set(mdev
, UI_BITMAP
, 0);
3370 _drbd_set_state(_NS2(mdev
, disk
, D_UP_TO_DATE
, pdsk
, D_UP_TO_DATE
),
3376 } else if (mdev
->state
.disk
< D_INCONSISTENT
&&
3377 mdev
->state
.role
== R_PRIMARY
) {
3378 /* I am a diskless primary, the peer just created a new current UUID
3380 updated_uuids
= drbd_set_ed_uuid(mdev
, p_uuid
[UI_CURRENT
]);
3383 /* Before we test for the disk state, we should wait until an eventually
3384 ongoing cluster wide state change is finished. That is important if
3385 we are primary and are detaching from our disk. We need to see the
3386 new disk state... */
3387 mutex_lock(mdev
->state_mutex
);
3388 mutex_unlock(mdev
->state_mutex
);
3389 if (mdev
->state
.conn
>= C_CONNECTED
&& mdev
->state
.disk
< D_INCONSISTENT
)
3390 updated_uuids
|= drbd_set_ed_uuid(mdev
, p_uuid
[UI_CURRENT
]);
3393 drbd_print_uuids(mdev
, "receiver updated UUIDs to");
3399 * convert_state() - Converts the peer's view of the cluster state to our point of view
3400 * @ps: The state as seen by the peer.
3402 static union drbd_state
convert_state(union drbd_state ps
)
3404 union drbd_state ms
;
3406 static enum drbd_conns c_tab
[] = {
3407 [C_CONNECTED
] = C_CONNECTED
,
3409 [C_STARTING_SYNC_S
] = C_STARTING_SYNC_T
,
3410 [C_STARTING_SYNC_T
] = C_STARTING_SYNC_S
,
3411 [C_DISCONNECTING
] = C_TEAR_DOWN
, /* C_NETWORK_FAILURE, */
3412 [C_VERIFY_S
] = C_VERIFY_T
,
3418 ms
.conn
= c_tab
[ps
.conn
];
3423 ms
.peer_isp
= (ps
.aftr_isp
| ps
.user_isp
);
3428 static int receive_req_state(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
3430 struct drbd_conf
*mdev
;
3431 struct p_req_state
*p
= pi
->data
;
3432 union drbd_state mask
, val
;
3433 enum drbd_state_rv rv
;
3435 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
3439 mask
.i
= be32_to_cpu(p
->mask
);
3440 val
.i
= be32_to_cpu(p
->val
);
3442 if (test_bit(DISCARD_CONCURRENT
, &mdev
->tconn
->flags
) &&
3443 mutex_is_locked(mdev
->state_mutex
)) {
3444 drbd_send_sr_reply(mdev
, SS_CONCURRENT_ST_CHG
);
3448 mask
= convert_state(mask
);
3449 val
= convert_state(val
);
3451 rv
= drbd_change_state(mdev
, CS_VERBOSE
, mask
, val
);
3452 drbd_send_sr_reply(mdev
, rv
);
3459 static int receive_req_conn_state(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
3461 struct p_req_state
*p
= pi
->data
;
3462 union drbd_state mask
, val
;
3463 enum drbd_state_rv rv
;
3465 mask
.i
= be32_to_cpu(p
->mask
);
3466 val
.i
= be32_to_cpu(p
->val
);
3468 if (test_bit(DISCARD_CONCURRENT
, &tconn
->flags
) &&
3469 mutex_is_locked(&tconn
->cstate_mutex
)) {
3470 conn_send_sr_reply(tconn
, SS_CONCURRENT_ST_CHG
);
3474 mask
= convert_state(mask
);
3475 val
= convert_state(val
);
3477 rv
= conn_request_state(tconn
, mask
, val
, CS_VERBOSE
| CS_LOCAL_ONLY
| CS_IGN_OUTD_FAIL
);
3478 conn_send_sr_reply(tconn
, rv
);
3483 static int receive_state(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
3485 struct drbd_conf
*mdev
;
3486 struct p_state
*p
= pi
->data
;
3487 union drbd_state os
, ns
, peer_state
;
3488 enum drbd_disk_state real_peer_disk
;
3489 enum chg_state_flags cs_flags
;
3492 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
3494 return config_unknown_volume(tconn
, pi
);
3496 peer_state
.i
= be32_to_cpu(p
->state
);
3498 real_peer_disk
= peer_state
.disk
;
3499 if (peer_state
.disk
== D_NEGOTIATING
) {
3500 real_peer_disk
= mdev
->p_uuid
[UI_FLAGS
] & 4 ? D_INCONSISTENT
: D_CONSISTENT
;
3501 dev_info(DEV
, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk
));
3504 spin_lock_irq(&mdev
->tconn
->req_lock
);
3506 os
= ns
= drbd_read_state(mdev
);
3507 spin_unlock_irq(&mdev
->tconn
->req_lock
);
3509 /* peer says his disk is uptodate, while we think it is inconsistent,
3510 * and this happens while we think we have a sync going on. */
3511 if (os
.pdsk
== D_INCONSISTENT
&& real_peer_disk
== D_UP_TO_DATE
&&
3512 os
.conn
> C_CONNECTED
&& os
.disk
== D_UP_TO_DATE
) {
3513 /* If we are (becoming) SyncSource, but peer is still in sync
3514 * preparation, ignore its uptodate-ness to avoid flapping, it
3515 * will change to inconsistent once the peer reaches active
3517 * It may have changed syncer-paused flags, however, so we
3518 * cannot ignore this completely. */
3519 if (peer_state
.conn
> C_CONNECTED
&&
3520 peer_state
.conn
< C_SYNC_SOURCE
)
3521 real_peer_disk
= D_INCONSISTENT
;
3523 /* if peer_state changes to connected at the same time,
3524 * it explicitly notifies us that it finished resync.
3525 * Maybe we should finish it up, too? */
3526 else if (os
.conn
>= C_SYNC_SOURCE
&&
3527 peer_state
.conn
== C_CONNECTED
) {
3528 if (drbd_bm_total_weight(mdev
) <= mdev
->rs_failed
)
3529 drbd_resync_finished(mdev
);
3534 /* peer says his disk is inconsistent, while we think it is uptodate,
3535 * and this happens while the peer still thinks we have a sync going on,
3536 * but we think we are already done with the sync.
3537 * We ignore this to avoid flapping pdsk.
3538 * This should not happen, if the peer is a recent version of drbd. */
3539 if (os
.pdsk
== D_UP_TO_DATE
&& real_peer_disk
== D_INCONSISTENT
&&
3540 os
.conn
== C_CONNECTED
&& peer_state
.conn
> C_SYNC_SOURCE
)
3541 real_peer_disk
= D_UP_TO_DATE
;
3543 if (ns
.conn
== C_WF_REPORT_PARAMS
)
3544 ns
.conn
= C_CONNECTED
;
3546 if (peer_state
.conn
== C_AHEAD
)
3549 if (mdev
->p_uuid
&& peer_state
.disk
>= D_NEGOTIATING
&&
3550 get_ldev_if_state(mdev
, D_NEGOTIATING
)) {
3551 int cr
; /* consider resync */
3553 /* if we established a new connection */
3554 cr
= (os
.conn
< C_CONNECTED
);
3555 /* if we had an established connection
3556 * and one of the nodes newly attaches a disk */
3557 cr
|= (os
.conn
== C_CONNECTED
&&
3558 (peer_state
.disk
== D_NEGOTIATING
||
3559 os
.disk
== D_NEGOTIATING
));
3560 /* if we have both been inconsistent, and the peer has been
3561 * forced to be UpToDate with --overwrite-data */
3562 cr
|= test_bit(CONSIDER_RESYNC
, &mdev
->flags
);
3563 /* if we had been plain connected, and the admin requested to
3564 * start a sync by "invalidate" or "invalidate-remote" */
3565 cr
|= (os
.conn
== C_CONNECTED
&&
3566 (peer_state
.conn
>= C_STARTING_SYNC_S
&&
3567 peer_state
.conn
<= C_WF_BITMAP_T
));
3570 ns
.conn
= drbd_sync_handshake(mdev
, peer_state
.role
, real_peer_disk
);
3573 if (ns
.conn
== C_MASK
) {
3574 ns
.conn
= C_CONNECTED
;
3575 if (mdev
->state
.disk
== D_NEGOTIATING
) {
3576 drbd_force_state(mdev
, NS(disk
, D_FAILED
));
3577 } else if (peer_state
.disk
== D_NEGOTIATING
) {
3578 dev_err(DEV
, "Disk attach process on the peer node was aborted.\n");
3579 peer_state
.disk
= D_DISKLESS
;
3580 real_peer_disk
= D_DISKLESS
;
3582 if (test_and_clear_bit(CONN_DRY_RUN
, &mdev
->tconn
->flags
))
3584 D_ASSERT(os
.conn
== C_WF_REPORT_PARAMS
);
3585 conn_request_state(mdev
->tconn
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
3591 spin_lock_irq(&mdev
->tconn
->req_lock
);
3592 if (os
.i
!= drbd_read_state(mdev
).i
)
3594 clear_bit(CONSIDER_RESYNC
, &mdev
->flags
);
3595 ns
.peer
= peer_state
.role
;
3596 ns
.pdsk
= real_peer_disk
;
3597 ns
.peer_isp
= (peer_state
.aftr_isp
| peer_state
.user_isp
);
3598 if ((ns
.conn
== C_CONNECTED
|| ns
.conn
== C_WF_BITMAP_S
) && ns
.disk
== D_NEGOTIATING
)
3599 ns
.disk
= mdev
->new_state_tmp
.disk
;
3600 cs_flags
= CS_VERBOSE
+ (os
.conn
< C_CONNECTED
&& ns
.conn
>= C_CONNECTED
? 0 : CS_HARD
);
3601 if (ns
.pdsk
== D_CONSISTENT
&& drbd_suspended(mdev
) && ns
.conn
== C_CONNECTED
&& os
.conn
< C_CONNECTED
&&
3602 test_bit(NEW_CUR_UUID
, &mdev
->flags
)) {
3603 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3604 for temporal network outages! */
3605 spin_unlock_irq(&mdev
->tconn
->req_lock
);
3606 dev_err(DEV
, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3607 tl_clear(mdev
->tconn
);
3608 drbd_uuid_new_current(mdev
);
3609 clear_bit(NEW_CUR_UUID
, &mdev
->flags
);
3610 conn_request_state(mdev
->tconn
, NS2(conn
, C_PROTOCOL_ERROR
, susp
, 0), CS_HARD
);
3613 rv
= _drbd_set_state(mdev
, ns
, cs_flags
, NULL
);
3614 ns
= drbd_read_state(mdev
);
3615 spin_unlock_irq(&mdev
->tconn
->req_lock
);
3617 if (rv
< SS_SUCCESS
) {
3618 conn_request_state(mdev
->tconn
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
3622 if (os
.conn
> C_WF_REPORT_PARAMS
) {
3623 if (ns
.conn
> C_CONNECTED
&& peer_state
.conn
<= C_CONNECTED
&&
3624 peer_state
.disk
!= D_NEGOTIATING
) {
3625 /* we want resync, peer has not yet decided to sync... */
3626 /* Nowadays only used when forcing a node into primary role and
3627 setting its disk to UpToDate with that */
3628 drbd_send_uuids(mdev
);
3629 drbd_send_state(mdev
);
3633 mdev
->tconn
->net_conf
->want_lose
= 0;
3635 drbd_md_sync(mdev
); /* update connected indicator, la_size, ... */
3640 static int receive_sync_uuid(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
3642 struct drbd_conf
*mdev
;
3643 struct p_rs_uuid
*p
= pi
->data
;
3645 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
3649 wait_event(mdev
->misc_wait
,
3650 mdev
->state
.conn
== C_WF_SYNC_UUID
||
3651 mdev
->state
.conn
== C_BEHIND
||
3652 mdev
->state
.conn
< C_CONNECTED
||
3653 mdev
->state
.disk
< D_NEGOTIATING
);
3655 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3657 /* Here the _drbd_uuid_ functions are right, current should
3658 _not_ be rotated into the history */
3659 if (get_ldev_if_state(mdev
, D_NEGOTIATING
)) {
3660 _drbd_uuid_set(mdev
, UI_CURRENT
, be64_to_cpu(p
->uuid
));
3661 _drbd_uuid_set(mdev
, UI_BITMAP
, 0UL);
3663 drbd_print_uuids(mdev
, "updated sync uuid");
3664 drbd_start_resync(mdev
, C_SYNC_TARGET
);
3668 dev_err(DEV
, "Ignoring SyncUUID packet!\n");
3674 * receive_bitmap_plain
3676 * Return 0 when done, 1 when another iteration is needed, and a negative error
3677 * code upon failure.
3680 receive_bitmap_plain(struct drbd_conf
*mdev
, unsigned int size
,
3681 unsigned long *p
, struct bm_xfer_ctx
*c
)
3683 unsigned int data_size
= DRBD_SOCKET_BUFFER_SIZE
-
3684 drbd_header_size(mdev
->tconn
);
3685 unsigned int num_words
= min_t(size_t, data_size
/ sizeof(*p
),
3686 c
->bm_words
- c
->word_offset
);
3687 unsigned int want
= num_words
* sizeof(*p
);
3691 dev_err(DEV
, "%s:want (%u) != size (%u)\n", __func__
, want
, size
);
3696 err
= drbd_recv_all(mdev
->tconn
, p
, want
);
3700 drbd_bm_merge_lel(mdev
, c
->word_offset
, num_words
, p
);
3702 c
->word_offset
+= num_words
;
3703 c
->bit_offset
= c
->word_offset
* BITS_PER_LONG
;
3704 if (c
->bit_offset
> c
->bm_bits
)
3705 c
->bit_offset
= c
->bm_bits
;
3710 static enum drbd_bitmap_code
dcbp_get_code(struct p_compressed_bm
*p
)
3712 return (enum drbd_bitmap_code
)(p
->encoding
& 0x0f);
3715 static int dcbp_get_start(struct p_compressed_bm
*p
)
3717 return (p
->encoding
& 0x80) != 0;
3720 static int dcbp_get_pad_bits(struct p_compressed_bm
*p
)
3722 return (p
->encoding
>> 4) & 0x7;
3728 * Return 0 when done, 1 when another iteration is needed, and a negative error
3729 * code upon failure.
3732 recv_bm_rle_bits(struct drbd_conf
*mdev
,
3733 struct p_compressed_bm
*p
,
3734 struct bm_xfer_ctx
*c
,
3737 struct bitstream bs
;
3741 unsigned long s
= c
->bit_offset
;
3743 int toggle
= dcbp_get_start(p
);
3747 bitstream_init(&bs
, p
->code
, len
, dcbp_get_pad_bits(p
));
3749 bits
= bitstream_get_bits(&bs
, &look_ahead
, 64);
3753 for (have
= bits
; have
> 0; s
+= rl
, toggle
= !toggle
) {
3754 bits
= vli_decode_bits(&rl
, look_ahead
);
3760 if (e
>= c
->bm_bits
) {
3761 dev_err(DEV
, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e
);
3764 _drbd_bm_set_bits(mdev
, s
, e
);
3768 dev_err(DEV
, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3769 have
, bits
, look_ahead
,
3770 (unsigned int)(bs
.cur
.b
- p
->code
),
3771 (unsigned int)bs
.buf_len
);
3774 look_ahead
>>= bits
;
3777 bits
= bitstream_get_bits(&bs
, &tmp
, 64 - have
);
3780 look_ahead
|= tmp
<< have
;
3785 bm_xfer_ctx_bit_to_word_offset(c
);
3787 return (s
!= c
->bm_bits
);
3793 * Return 0 when done, 1 when another iteration is needed, and a negative error
3794 * code upon failure.
3797 decode_bitmap_c(struct drbd_conf
*mdev
,
3798 struct p_compressed_bm
*p
,
3799 struct bm_xfer_ctx
*c
,
3802 if (dcbp_get_code(p
) == RLE_VLI_Bits
)
3803 return recv_bm_rle_bits(mdev
, p
, c
, len
- sizeof(*p
));
3805 /* other variants had been implemented for evaluation,
3806 * but have been dropped as this one turned out to be "best"
3807 * during all our tests. */
3809 dev_err(DEV
, "receive_bitmap_c: unknown encoding %u\n", p
->encoding
);
3810 conn_request_state(mdev
->tconn
, NS(conn
, C_PROTOCOL_ERROR
), CS_HARD
);
3814 void INFO_bm_xfer_stats(struct drbd_conf
*mdev
,
3815 const char *direction
, struct bm_xfer_ctx
*c
)
3817 /* what would it take to transfer it "plaintext" */
3818 unsigned int header_size
= drbd_header_size(mdev
->tconn
);
3819 unsigned int data_size
= DRBD_SOCKET_BUFFER_SIZE
- header_size
;
3820 unsigned int plain
=
3821 header_size
* (DIV_ROUND_UP(c
->bm_words
, data_size
) + 1) +
3822 c
->bm_words
* sizeof(unsigned long);
3823 unsigned int total
= c
->bytes
[0] + c
->bytes
[1];
3826 /* total can not be zero. but just in case: */
3830 /* don't report if not compressed */
3834 /* total < plain. check for overflow, still */
3835 r
= (total
> UINT_MAX
/1000) ? (total
/ (plain
/1000))
3836 : (1000 * total
/ plain
);
3842 dev_info(DEV
, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3843 "total %u; compression: %u.%u%%\n",
3845 c
->bytes
[1], c
->packets
[1],
3846 c
->bytes
[0], c
->packets
[0],
3847 total
, r
/10, r
% 10);
3850 /* Since we are processing the bitfield from lower addresses to higher,
3851 it does not matter if the process it in 32 bit chunks or 64 bit
3852 chunks as long as it is little endian. (Understand it as byte stream,
3853 beginning with the lowest byte...) If we would use big endian
3854 we would need to process it from the highest address to the lowest,
3855 in order to be agnostic to the 32 vs 64 bits issue.
3857 returns 0 on failure, 1 if we successfully received it. */
3858 static int receive_bitmap(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
3860 struct drbd_conf
*mdev
;
3861 struct bm_xfer_ctx c
;
3864 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
3868 drbd_bm_lock(mdev
, "receive bitmap", BM_LOCKED_SET_ALLOWED
);
3869 /* you are supposed to send additional out-of-sync information
3870 * if you actually set bits during this phase */
3872 c
= (struct bm_xfer_ctx
) {
3873 .bm_bits
= drbd_bm_bits(mdev
),
3874 .bm_words
= drbd_bm_words(mdev
),
3878 if (pi
->cmd
== P_BITMAP
)
3879 err
= receive_bitmap_plain(mdev
, pi
->size
, pi
->data
, &c
);
3880 else if (pi
->cmd
== P_COMPRESSED_BITMAP
) {
3881 /* MAYBE: sanity check that we speak proto >= 90,
3882 * and the feature is enabled! */
3883 struct p_compressed_bm
*p
= pi
->data
;
3885 if (pi
->size
> DRBD_SOCKET_BUFFER_SIZE
- drbd_header_size(tconn
)) {
3886 dev_err(DEV
, "ReportCBitmap packet too large\n");
3890 if (pi
->size
<= sizeof(*p
)) {
3891 dev_err(DEV
, "ReportCBitmap packet too small (l:%u)\n", pi
->size
);
3895 err
= drbd_recv_all(mdev
->tconn
, p
, pi
->size
);
3898 err
= decode_bitmap_c(mdev
, p
, &c
, pi
->size
);
3900 dev_warn(DEV
, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi
->cmd
);
3905 c
.packets
[pi
->cmd
== P_BITMAP
]++;
3906 c
.bytes
[pi
->cmd
== P_BITMAP
] += drbd_header_size(tconn
) + pi
->size
;
3913 err
= drbd_recv_header(mdev
->tconn
, pi
);
3918 INFO_bm_xfer_stats(mdev
, "receive", &c
);
3920 if (mdev
->state
.conn
== C_WF_BITMAP_T
) {
3921 enum drbd_state_rv rv
;
3923 err
= drbd_send_bitmap(mdev
);
3926 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3927 rv
= _drbd_request_state(mdev
, NS(conn
, C_WF_SYNC_UUID
), CS_VERBOSE
);
3928 D_ASSERT(rv
== SS_SUCCESS
);
3929 } else if (mdev
->state
.conn
!= C_WF_BITMAP_S
) {
3930 /* admin may have requested C_DISCONNECTING,
3931 * other threads may have noticed network errors */
3932 dev_info(DEV
, "unexpected cstate (%s) in receive_bitmap\n",
3933 drbd_conn_str(mdev
->state
.conn
));
3938 drbd_bm_unlock(mdev
);
3939 if (!err
&& mdev
->state
.conn
== C_WF_BITMAP_S
)
3940 drbd_start_resync(mdev
, C_SYNC_SOURCE
);
3944 static int receive_skip(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
3946 conn_warn(tconn
, "skipping unknown optional packet type %d, l: %d!\n",
3949 return ignore_remaining_packet(tconn
, pi
);
3952 static int receive_UnplugRemote(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
3954 /* Make sure we've acked all the TCP data associated
3955 * with the data requests being unplugged */
3956 drbd_tcp_quickack(tconn
->data
.socket
);
3961 static int receive_out_of_sync(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
3963 struct drbd_conf
*mdev
;
3964 struct p_block_desc
*p
= pi
->data
;
3966 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
3970 switch (mdev
->state
.conn
) {
3971 case C_WF_SYNC_UUID
:
3976 dev_err(DEV
, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3977 drbd_conn_str(mdev
->state
.conn
));
3980 drbd_set_out_of_sync(mdev
, be64_to_cpu(p
->sector
), be32_to_cpu(p
->blksize
));
3988 int (*fn
)(struct drbd_tconn
*, struct packet_info
*);
3991 static struct data_cmd drbd_cmd_handler
[] = {
3992 [P_DATA
] = { 1, sizeof(struct p_data
), receive_Data
},
3993 [P_DATA_REPLY
] = { 1, sizeof(struct p_data
), receive_DataReply
},
3994 [P_RS_DATA_REPLY
] = { 1, sizeof(struct p_data
), receive_RSDataReply
} ,
3995 [P_BARRIER
] = { 0, sizeof(struct p_barrier
), receive_Barrier
} ,
3996 [P_BITMAP
] = { 1, 0, receive_bitmap
} ,
3997 [P_COMPRESSED_BITMAP
] = { 1, 0, receive_bitmap
} ,
3998 [P_UNPLUG_REMOTE
] = { 0, 0, receive_UnplugRemote
},
3999 [P_DATA_REQUEST
] = { 0, sizeof(struct p_block_req
), receive_DataRequest
},
4000 [P_RS_DATA_REQUEST
] = { 0, sizeof(struct p_block_req
), receive_DataRequest
},
4001 [P_SYNC_PARAM
] = { 1, 0, receive_SyncParam
},
4002 [P_SYNC_PARAM89
] = { 1, 0, receive_SyncParam
},
4003 [P_PROTOCOL
] = { 1, sizeof(struct p_protocol
), receive_protocol
},
4004 [P_UUIDS
] = { 0, sizeof(struct p_uuids
), receive_uuids
},
4005 [P_SIZES
] = { 0, sizeof(struct p_sizes
), receive_sizes
},
4006 [P_STATE
] = { 0, sizeof(struct p_state
), receive_state
},
4007 [P_STATE_CHG_REQ
] = { 0, sizeof(struct p_req_state
), receive_req_state
},
4008 [P_SYNC_UUID
] = { 0, sizeof(struct p_rs_uuid
), receive_sync_uuid
},
4009 [P_OV_REQUEST
] = { 0, sizeof(struct p_block_req
), receive_DataRequest
},
4010 [P_OV_REPLY
] = { 1, sizeof(struct p_block_req
), receive_DataRequest
},
4011 [P_CSUM_RS_REQUEST
] = { 1, sizeof(struct p_block_req
), receive_DataRequest
},
4012 [P_DELAY_PROBE
] = { 0, sizeof(struct p_delay_probe93
), receive_skip
},
4013 [P_OUT_OF_SYNC
] = { 0, sizeof(struct p_block_desc
), receive_out_of_sync
},
4014 [P_CONN_ST_CHG_REQ
] = { 0, sizeof(struct p_req_state
), receive_req_conn_state
},
4017 static void drbdd(struct drbd_tconn
*tconn
)
4019 struct packet_info pi
;
4020 size_t shs
; /* sub header size */
4023 while (get_t_state(&tconn
->receiver
) == RUNNING
) {
4024 struct data_cmd
*cmd
;
4026 drbd_thread_current_set_cpu(&tconn
->receiver
);
4027 if (drbd_recv_header(tconn
, &pi
))
4030 cmd
= &drbd_cmd_handler
[pi
.cmd
];
4031 if (unlikely(pi
.cmd
>= ARRAY_SIZE(drbd_cmd_handler
) || !cmd
->fn
)) {
4032 conn_err(tconn
, "unknown packet type %d, l: %d!\n", pi
.cmd
, pi
.size
);
4036 shs
= cmd
->pkt_size
;
4037 if (pi
.size
> shs
&& !cmd
->expect_payload
) {
4038 conn_err(tconn
, "No payload expected %s l:%d\n", cmdname(pi
.cmd
), pi
.size
);
4043 err
= drbd_recv_all_warn(tconn
, pi
.data
, shs
);
4049 err
= cmd
->fn(tconn
, &pi
);
4051 conn_err(tconn
, "error receiving %s, e: %d l: %d!\n",
4052 cmdname(pi
.cmd
), err
, pi
.size
);
4059 conn_request_state(tconn
, NS(conn
, C_PROTOCOL_ERROR
), CS_HARD
);
4062 void conn_flush_workqueue(struct drbd_tconn
*tconn
)
4064 struct drbd_wq_barrier barr
;
4066 barr
.w
.cb
= w_prev_work_done
;
4067 barr
.w
.tconn
= tconn
;
4068 init_completion(&barr
.done
);
4069 drbd_queue_work(&tconn
->data
.work
, &barr
.w
);
4070 wait_for_completion(&barr
.done
);
4073 static void drbd_disconnect(struct drbd_tconn
*tconn
)
4076 int rv
= SS_UNKNOWN_ERROR
;
4078 if (tconn
->cstate
== C_STANDALONE
)
4081 /* asender does not clean up anything. it must not interfere, either */
4082 drbd_thread_stop(&tconn
->asender
);
4083 drbd_free_sock(tconn
);
4085 idr_for_each(&tconn
->volumes
, drbd_disconnected
, tconn
);
4086 conn_info(tconn
, "Connection closed\n");
4088 if (conn_highest_role(tconn
) == R_PRIMARY
&& conn_highest_pdsk(tconn
) >= D_UNKNOWN
)
4089 conn_try_outdate_peer_async(tconn
);
4091 spin_lock_irq(&tconn
->req_lock
);
4093 if (oc
>= C_UNCONNECTED
)
4094 rv
= _conn_request_state(tconn
, NS(conn
, C_UNCONNECTED
), CS_VERBOSE
);
4096 spin_unlock_irq(&tconn
->req_lock
);
4098 if (oc
== C_DISCONNECTING
) {
4099 wait_event(tconn
->net_cnt_wait
, atomic_read(&tconn
->net_cnt
) == 0);
4101 crypto_free_hash(tconn
->cram_hmac_tfm
);
4102 tconn
->cram_hmac_tfm
= NULL
;
4104 kfree(tconn
->net_conf
);
4105 tconn
->net_conf
= NULL
;
4106 conn_request_state(tconn
, NS(conn
, C_STANDALONE
), CS_VERBOSE
);
4110 static int drbd_disconnected(int vnr
, void *p
, void *data
)
4112 struct drbd_conf
*mdev
= (struct drbd_conf
*)p
;
4113 enum drbd_fencing_p fp
;
4116 /* wait for current activity to cease. */
4117 spin_lock_irq(&mdev
->tconn
->req_lock
);
4118 _drbd_wait_ee_list_empty(mdev
, &mdev
->active_ee
);
4119 _drbd_wait_ee_list_empty(mdev
, &mdev
->sync_ee
);
4120 _drbd_wait_ee_list_empty(mdev
, &mdev
->read_ee
);
4121 spin_unlock_irq(&mdev
->tconn
->req_lock
);
4123 /* We do not have data structures that would allow us to
4124 * get the rs_pending_cnt down to 0 again.
4125 * * On C_SYNC_TARGET we do not have any data structures describing
4126 * the pending RSDataRequest's we have sent.
4127 * * On C_SYNC_SOURCE there is no data structure that tracks
4128 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4129 * And no, it is not the sum of the reference counts in the
4130 * resync_LRU. The resync_LRU tracks the whole operation including
4131 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4133 drbd_rs_cancel_all(mdev
);
4135 mdev
->rs_failed
= 0;
4136 atomic_set(&mdev
->rs_pending_cnt
, 0);
4137 wake_up(&mdev
->misc_wait
);
4139 del_timer(&mdev
->request_timer
);
4141 del_timer_sync(&mdev
->resync_timer
);
4142 resync_timer_fn((unsigned long)mdev
);
4144 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4145 * w_make_resync_request etc. which may still be on the worker queue
4146 * to be "canceled" */
4147 drbd_flush_workqueue(mdev
);
4149 /* This also does reclaim_net_ee(). If we do this too early, we might
4150 * miss some resync ee and pages.*/
4151 drbd_process_done_ee(mdev
);
4153 kfree(mdev
->p_uuid
);
4154 mdev
->p_uuid
= NULL
;
4156 if (!drbd_suspended(mdev
))
4157 tl_clear(mdev
->tconn
);
4162 if (get_ldev(mdev
)) {
4163 fp
= mdev
->ldev
->dc
.fencing
;
4167 /* serialize with bitmap writeout triggered by the state change,
4169 wait_event(mdev
->misc_wait
, !test_bit(BITMAP_IO
, &mdev
->flags
));
4171 /* tcp_close and release of sendpage pages can be deferred. I don't
4172 * want to use SO_LINGER, because apparently it can be deferred for
4173 * more than 20 seconds (longest time I checked).
4175 * Actually we don't care for exactly when the network stack does its
4176 * put_page(), but release our reference on these pages right here.
4178 i
= drbd_release_ee(mdev
, &mdev
->net_ee
);
4180 dev_info(DEV
, "net_ee not empty, killed %u entries\n", i
);
4181 i
= atomic_read(&mdev
->pp_in_use_by_net
);
4183 dev_info(DEV
, "pp_in_use_by_net = %d, expected 0\n", i
);
4184 i
= atomic_read(&mdev
->pp_in_use
);
4186 dev_info(DEV
, "pp_in_use = %d, expected 0\n", i
);
4188 D_ASSERT(list_empty(&mdev
->read_ee
));
4189 D_ASSERT(list_empty(&mdev
->active_ee
));
4190 D_ASSERT(list_empty(&mdev
->sync_ee
));
4191 D_ASSERT(list_empty(&mdev
->done_ee
));
4193 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4194 atomic_set(&mdev
->current_epoch
->epoch_size
, 0);
4195 D_ASSERT(list_empty(&mdev
->current_epoch
->list
));
4201 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4202 * we can agree on is stored in agreed_pro_version.
4204 * feature flags and the reserved array should be enough room for future
4205 * enhancements of the handshake protocol, and possible plugins...
4207 * for now, they are expected to be zero, but ignored.
4209 static int drbd_send_features(struct drbd_tconn
*tconn
)
4211 struct drbd_socket
*sock
;
4212 struct p_connection_features
*p
;
4214 sock
= &tconn
->data
;
4215 p
= conn_prepare_command(tconn
, sock
);
4218 memset(p
, 0, sizeof(*p
));
4219 p
->protocol_min
= cpu_to_be32(PRO_VERSION_MIN
);
4220 p
->protocol_max
= cpu_to_be32(PRO_VERSION_MAX
);
4221 return conn_send_command(tconn
, sock
, P_CONNECTION_FEATURES
, sizeof(*p
), NULL
, 0);
4226 * 1 yes, we have a valid connection
4227 * 0 oops, did not work out, please try again
4228 * -1 peer talks different language,
4229 * no point in trying again, please go standalone.
4231 static int drbd_do_features(struct drbd_tconn
*tconn
)
4233 /* ASSERT current == tconn->receiver ... */
4234 struct p_connection_features
*p
;
4235 const int expect
= sizeof(struct p_connection_features
);
4236 struct packet_info pi
;
4239 err
= drbd_send_features(tconn
);
4243 err
= drbd_recv_header(tconn
, &pi
);
4247 if (pi
.cmd
!= P_CONNECTION_FEATURES
) {
4248 conn_err(tconn
, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4249 cmdname(pi
.cmd
), pi
.cmd
);
4253 if (pi
.size
!= expect
) {
4254 conn_err(tconn
, "expected ConnectionFeatures length: %u, received: %u\n",
4260 err
= drbd_recv_all_warn(tconn
, p
, expect
);
4264 p
->protocol_min
= be32_to_cpu(p
->protocol_min
);
4265 p
->protocol_max
= be32_to_cpu(p
->protocol_max
);
4266 if (p
->protocol_max
== 0)
4267 p
->protocol_max
= p
->protocol_min
;
4269 if (PRO_VERSION_MAX
< p
->protocol_min
||
4270 PRO_VERSION_MIN
> p
->protocol_max
)
4273 tconn
->agreed_pro_version
= min_t(int, PRO_VERSION_MAX
, p
->protocol_max
);
4275 conn_info(tconn
, "Handshake successful: "
4276 "Agreed network protocol version %d\n", tconn
->agreed_pro_version
);
4281 conn_err(tconn
, "incompatible DRBD dialects: "
4282 "I support %d-%d, peer supports %d-%d\n",
4283 PRO_VERSION_MIN
, PRO_VERSION_MAX
,
4284 p
->protocol_min
, p
->protocol_max
);
4288 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4289 static int drbd_do_auth(struct drbd_tconn
*tconn
)
4291 dev_err(DEV
, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4292 dev_err(DEV
, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4296 #define CHALLENGE_LEN 64
4300 0 - failed, try again (network error),
4301 -1 - auth failed, don't try again.
4304 static int drbd_do_auth(struct drbd_tconn
*tconn
)
4306 struct drbd_socket
*sock
;
4307 char my_challenge
[CHALLENGE_LEN
]; /* 64 Bytes... */
4308 struct scatterlist sg
;
4309 char *response
= NULL
;
4310 char *right_response
= NULL
;
4311 char *peers_ch
= NULL
;
4312 unsigned int key_len
= strlen(tconn
->net_conf
->shared_secret
);
4313 unsigned int resp_size
;
4314 struct hash_desc desc
;
4315 struct packet_info pi
;
4318 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
4320 desc
.tfm
= tconn
->cram_hmac_tfm
;
4323 rv
= crypto_hash_setkey(tconn
->cram_hmac_tfm
,
4324 (u8
*)tconn
->net_conf
->shared_secret
, key_len
);
4326 conn_err(tconn
, "crypto_hash_setkey() failed with %d\n", rv
);
4331 get_random_bytes(my_challenge
, CHALLENGE_LEN
);
4333 sock
= &tconn
->data
;
4334 if (!conn_prepare_command(tconn
, sock
)) {
4338 rv
= !conn_send_command(tconn
, sock
, P_AUTH_CHALLENGE
, 0,
4339 my_challenge
, CHALLENGE_LEN
);
4343 err
= drbd_recv_header(tconn
, &pi
);
4349 if (pi
.cmd
!= P_AUTH_CHALLENGE
) {
4350 conn_err(tconn
, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4351 cmdname(pi
.cmd
), pi
.cmd
);
4356 if (pi
.size
> CHALLENGE_LEN
* 2) {
4357 conn_err(tconn
, "expected AuthChallenge payload too big.\n");
4362 peers_ch
= kmalloc(pi
.size
, GFP_NOIO
);
4363 if (peers_ch
== NULL
) {
4364 conn_err(tconn
, "kmalloc of peers_ch failed\n");
4369 err
= drbd_recv_all_warn(tconn
, peers_ch
, pi
.size
);
4375 resp_size
= crypto_hash_digestsize(tconn
->cram_hmac_tfm
);
4376 response
= kmalloc(resp_size
, GFP_NOIO
);
4377 if (response
== NULL
) {
4378 conn_err(tconn
, "kmalloc of response failed\n");
4383 sg_init_table(&sg
, 1);
4384 sg_set_buf(&sg
, peers_ch
, pi
.size
);
4386 rv
= crypto_hash_digest(&desc
, &sg
, sg
.length
, response
);
4388 conn_err(tconn
, "crypto_hash_digest() failed with %d\n", rv
);
4393 if (!conn_prepare_command(tconn
, sock
)) {
4397 rv
= !conn_send_command(tconn
, sock
, P_AUTH_RESPONSE
, 0,
4398 response
, resp_size
);
4402 err
= drbd_recv_header(tconn
, &pi
);
4408 if (pi
.cmd
!= P_AUTH_RESPONSE
) {
4409 conn_err(tconn
, "expected AuthResponse packet, received: %s (0x%04x)\n",
4410 cmdname(pi
.cmd
), pi
.cmd
);
4415 if (pi
.size
!= resp_size
) {
4416 conn_err(tconn
, "expected AuthResponse payload of wrong size\n");
4421 err
= drbd_recv_all_warn(tconn
, response
, resp_size
);
4427 right_response
= kmalloc(resp_size
, GFP_NOIO
);
4428 if (right_response
== NULL
) {
4429 conn_err(tconn
, "kmalloc of right_response failed\n");
4434 sg_set_buf(&sg
, my_challenge
, CHALLENGE_LEN
);
4436 rv
= crypto_hash_digest(&desc
, &sg
, sg
.length
, right_response
);
4438 conn_err(tconn
, "crypto_hash_digest() failed with %d\n", rv
);
4443 rv
= !memcmp(response
, right_response
, resp_size
);
4446 conn_info(tconn
, "Peer authenticated using %d bytes of '%s' HMAC\n",
4447 resp_size
, tconn
->net_conf
->cram_hmac_alg
);
4454 kfree(right_response
);
4460 int drbdd_init(struct drbd_thread
*thi
)
4462 struct drbd_tconn
*tconn
= thi
->tconn
;
4465 conn_info(tconn
, "receiver (re)started\n");
4468 h
= drbd_connect(tconn
);
4470 drbd_disconnect(tconn
);
4471 schedule_timeout_interruptible(HZ
);
4474 conn_warn(tconn
, "Discarding network configuration.\n");
4475 conn_request_state(tconn
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
4480 if (get_net_conf(tconn
)) {
4482 put_net_conf(tconn
);
4486 drbd_disconnect(tconn
);
4488 conn_info(tconn
, "receiver terminated\n");
4492 /* ********* acknowledge sender ******** */
4494 static int got_conn_RqSReply(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
4496 struct p_req_state_reply
*p
= pi
->data
;
4497 int retcode
= be32_to_cpu(p
->retcode
);
4499 if (retcode
>= SS_SUCCESS
) {
4500 set_bit(CONN_WD_ST_CHG_OKAY
, &tconn
->flags
);
4502 set_bit(CONN_WD_ST_CHG_FAIL
, &tconn
->flags
);
4503 conn_err(tconn
, "Requested state change failed by peer: %s (%d)\n",
4504 drbd_set_st_err_str(retcode
), retcode
);
4506 wake_up(&tconn
->ping_wait
);
4511 static int got_RqSReply(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
4513 struct drbd_conf
*mdev
;
4514 struct p_req_state_reply
*p
= pi
->data
;
4515 int retcode
= be32_to_cpu(p
->retcode
);
4517 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
4521 if (retcode
>= SS_SUCCESS
) {
4522 set_bit(CL_ST_CHG_SUCCESS
, &mdev
->flags
);
4524 set_bit(CL_ST_CHG_FAIL
, &mdev
->flags
);
4525 dev_err(DEV
, "Requested state change failed by peer: %s (%d)\n",
4526 drbd_set_st_err_str(retcode
), retcode
);
4528 wake_up(&mdev
->state_wait
);
/* P_PING: simply answer with a ping-ack on the meta socket. */
static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
{
	return drbd_send_ping_ack(tconn);
}
4539 static int got_PingAck(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
4541 /* restore idle timeout */
4542 tconn
->meta
.socket
->sk
->sk_rcvtimeo
= tconn
->net_conf
->ping_int
*HZ
;
4543 if (!test_and_set_bit(GOT_PING_ACK
, &tconn
->flags
))
4544 wake_up(&tconn
->ping_wait
);
4549 static int got_IsInSync(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
4551 struct drbd_conf
*mdev
;
4552 struct p_block_ack
*p
= pi
->data
;
4553 sector_t sector
= be64_to_cpu(p
->sector
);
4554 int blksize
= be32_to_cpu(p
->blksize
);
4556 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
4560 D_ASSERT(mdev
->tconn
->agreed_pro_version
>= 89);
4562 update_peer_seq(mdev
, be32_to_cpu(p
->seq_num
));
4564 if (get_ldev(mdev
)) {
4565 drbd_rs_complete_io(mdev
, sector
);
4566 drbd_set_in_sync(mdev
, sector
, blksize
);
4567 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4568 mdev
->rs_same_csum
+= (blksize
>> BM_BLOCK_SHIFT
);
4571 dec_rs_pending(mdev
);
4572 atomic_add(blksize
>> 9, &mdev
->rs_sect_in
);
4578 validate_req_change_req_state(struct drbd_conf
*mdev
, u64 id
, sector_t sector
,
4579 struct rb_root
*root
, const char *func
,
4580 enum drbd_req_event what
, bool missing_ok
)
4582 struct drbd_request
*req
;
4583 struct bio_and_error m
;
4585 spin_lock_irq(&mdev
->tconn
->req_lock
);
4586 req
= find_request(mdev
, root
, id
, sector
, missing_ok
, func
);
4587 if (unlikely(!req
)) {
4588 spin_unlock_irq(&mdev
->tconn
->req_lock
);
4591 __req_mod(req
, what
, &m
);
4592 spin_unlock_irq(&mdev
->tconn
->req_lock
);
4595 complete_master_bio(mdev
, &m
);
4599 static int got_BlockAck(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
4601 struct drbd_conf
*mdev
;
4602 struct p_block_ack
*p
= pi
->data
;
4603 sector_t sector
= be64_to_cpu(p
->sector
);
4604 int blksize
= be32_to_cpu(p
->blksize
);
4605 enum drbd_req_event what
;
4607 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
4611 update_peer_seq(mdev
, be32_to_cpu(p
->seq_num
));
4613 if (p
->block_id
== ID_SYNCER
) {
4614 drbd_set_in_sync(mdev
, sector
, blksize
);
4615 dec_rs_pending(mdev
);
4619 case P_RS_WRITE_ACK
:
4620 D_ASSERT(mdev
->tconn
->net_conf
->wire_protocol
== DRBD_PROT_C
);
4621 what
= WRITE_ACKED_BY_PEER_AND_SIS
;
4624 D_ASSERT(mdev
->tconn
->net_conf
->wire_protocol
== DRBD_PROT_C
);
4625 what
= WRITE_ACKED_BY_PEER
;
4628 D_ASSERT(mdev
->tconn
->net_conf
->wire_protocol
== DRBD_PROT_B
);
4629 what
= RECV_ACKED_BY_PEER
;
4631 case P_DISCARD_WRITE
:
4632 D_ASSERT(mdev
->tconn
->net_conf
->wire_protocol
== DRBD_PROT_C
);
4633 what
= DISCARD_WRITE
;
4636 D_ASSERT(mdev
->tconn
->net_conf
->wire_protocol
== DRBD_PROT_C
);
4637 what
= POSTPONE_WRITE
;
4643 return validate_req_change_req_state(mdev
, p
->block_id
, sector
,
4644 &mdev
->write_requests
, __func__
,
4648 static int got_NegAck(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
4650 struct drbd_conf
*mdev
;
4651 struct p_block_ack
*p
= pi
->data
;
4652 sector_t sector
= be64_to_cpu(p
->sector
);
4653 int size
= be32_to_cpu(p
->blksize
);
4654 bool missing_ok
= tconn
->net_conf
->wire_protocol
== DRBD_PROT_A
||
4655 tconn
->net_conf
->wire_protocol
== DRBD_PROT_B
;
4658 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
4662 update_peer_seq(mdev
, be32_to_cpu(p
->seq_num
));
4664 if (p
->block_id
== ID_SYNCER
) {
4665 dec_rs_pending(mdev
);
4666 drbd_rs_failed_io(mdev
, sector
, size
);
4670 err
= validate_req_change_req_state(mdev
, p
->block_id
, sector
,
4671 &mdev
->write_requests
, __func__
,
4672 NEG_ACKED
, missing_ok
);
4674 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4675 The master bio might already be completed, therefore the
4676 request is no longer in the collision hash. */
4677 /* In Protocol B we might already have got a P_RECV_ACK
4678 but then get a P_NEG_ACK afterwards. */
4681 drbd_set_out_of_sync(mdev
, sector
, size
);
4686 static int got_NegDReply(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
4688 struct drbd_conf
*mdev
;
4689 struct p_block_ack
*p
= pi
->data
;
4690 sector_t sector
= be64_to_cpu(p
->sector
);
4692 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
4696 update_peer_seq(mdev
, be32_to_cpu(p
->seq_num
));
4698 dev_err(DEV
, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4699 (unsigned long long)sector
, be32_to_cpu(p
->blksize
));
4701 return validate_req_change_req_state(mdev
, p
->block_id
, sector
,
4702 &mdev
->read_requests
, __func__
,
4706 static int got_NegRSDReply(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
4708 struct drbd_conf
*mdev
;
4711 struct p_block_ack
*p
= pi
->data
;
4713 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
4717 sector
= be64_to_cpu(p
->sector
);
4718 size
= be32_to_cpu(p
->blksize
);
4720 update_peer_seq(mdev
, be32_to_cpu(p
->seq_num
));
4722 dec_rs_pending(mdev
);
4724 if (get_ldev_if_state(mdev
, D_FAILED
)) {
4725 drbd_rs_complete_io(mdev
, sector
);
4727 case P_NEG_RS_DREPLY
:
4728 drbd_rs_failed_io(mdev
, sector
, size
);
4740 static int got_BarrierAck(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
4742 struct drbd_conf
*mdev
;
4743 struct p_barrier_ack
*p
= pi
->data
;
4745 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
4749 tl_release(mdev
->tconn
, p
->barrier
, be32_to_cpu(p
->set_size
));
4751 if (mdev
->state
.conn
== C_AHEAD
&&
4752 atomic_read(&mdev
->ap_in_flight
) == 0 &&
4753 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE
, &mdev
->current_epoch
->flags
)) {
4754 mdev
->start_resync_timer
.expires
= jiffies
+ HZ
;
4755 add_timer(&mdev
->start_resync_timer
);
4761 static int got_OVResult(struct drbd_tconn
*tconn
, struct packet_info
*pi
)
4763 struct drbd_conf
*mdev
;
4764 struct p_block_ack
*p
= pi
->data
;
4765 struct drbd_work
*w
;
4769 mdev
= vnr_to_mdev(tconn
, pi
->vnr
);
4773 sector
= be64_to_cpu(p
->sector
);
4774 size
= be32_to_cpu(p
->blksize
);
4776 update_peer_seq(mdev
, be32_to_cpu(p
->seq_num
));
4778 if (be64_to_cpu(p
->block_id
) == ID_OUT_OF_SYNC
)
4779 drbd_ov_out_of_sync_found(mdev
, sector
, size
);
4781 ov_out_of_sync_print(mdev
);
4783 if (!get_ldev(mdev
))
4786 drbd_rs_complete_io(mdev
, sector
);
4787 dec_rs_pending(mdev
);
4791 /* let's advance progress step marks only for every other megabyte */
4792 if ((mdev
->ov_left
& 0x200) == 0x200)
4793 drbd_advance_rs_marks(mdev
, mdev
->ov_left
);
4795 if (mdev
->ov_left
== 0) {
4796 w
= kmalloc(sizeof(*w
), GFP_NOIO
);
4798 w
->cb
= w_ov_finished
;
4800 drbd_queue_work_front(&mdev
->tconn
->data
.work
, w
);
4802 dev_err(DEV
, "kmalloc(w) failed.");
4803 ov_out_of_sync_print(mdev
);
4804 drbd_resync_finished(mdev
);
/* Ignored meta-socket packet: nothing to do, the header was consumed. */
static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
{
	return 0;
}
4816 static int tconn_process_done_ee(struct drbd_tconn
*tconn
)
4818 struct drbd_conf
*mdev
;
4819 int i
, not_empty
= 0;
4822 clear_bit(SIGNAL_ASENDER
, &tconn
->flags
);
4823 flush_signals(current
);
4824 idr_for_each_entry(&tconn
->volumes
, mdev
, i
) {
4825 if (drbd_process_done_ee(mdev
))
4826 return 1; /* error */
4828 set_bit(SIGNAL_ASENDER
, &tconn
->flags
);
4830 spin_lock_irq(&tconn
->req_lock
);
4831 idr_for_each_entry(&tconn
->volumes
, mdev
, i
) {
4832 not_empty
= !list_empty(&mdev
->done_ee
);
4836 spin_unlock_irq(&tconn
->req_lock
);
4837 } while (not_empty
);
4842 struct asender_cmd
{
4844 int (*fn
)(struct drbd_tconn
*tconn
, struct packet_info
*);
4847 static struct asender_cmd asender_tbl
[] = {
4848 [P_PING
] = { 0, got_Ping
},
4849 [P_PING_ACK
] = { 0, got_PingAck
},
4850 [P_RECV_ACK
] = { sizeof(struct p_block_ack
), got_BlockAck
},
4851 [P_WRITE_ACK
] = { sizeof(struct p_block_ack
), got_BlockAck
},
4852 [P_RS_WRITE_ACK
] = { sizeof(struct p_block_ack
), got_BlockAck
},
4853 [P_DISCARD_WRITE
] = { sizeof(struct p_block_ack
), got_BlockAck
},
4854 [P_NEG_ACK
] = { sizeof(struct p_block_ack
), got_NegAck
},
4855 [P_NEG_DREPLY
] = { sizeof(struct p_block_ack
), got_NegDReply
},
4856 [P_NEG_RS_DREPLY
] = { sizeof(struct p_block_ack
), got_NegRSDReply
},
4857 [P_OV_RESULT
] = { sizeof(struct p_block_ack
), got_OVResult
},
4858 [P_BARRIER_ACK
] = { sizeof(struct p_barrier_ack
), got_BarrierAck
},
4859 [P_STATE_CHG_REPLY
] = { sizeof(struct p_req_state_reply
), got_RqSReply
},
4860 [P_RS_IS_IN_SYNC
] = { sizeof(struct p_block_ack
), got_IsInSync
},
4861 [P_DELAY_PROBE
] = { sizeof(struct p_delay_probe93
), got_skip
},
4862 [P_RS_CANCEL
] = { sizeof(struct p_block_ack
), got_NegRSDReply
},
4863 [P_CONN_ST_CHG_REPLY
]={ sizeof(struct p_req_state_reply
), got_conn_RqSReply
},
4864 [P_RETRY_WRITE
] = { sizeof(struct p_block_ack
), got_BlockAck
},
4867 int drbd_asender(struct drbd_thread
*thi
)
4869 struct drbd_tconn
*tconn
= thi
->tconn
;
4870 struct asender_cmd
*cmd
= NULL
;
4871 struct packet_info pi
;
4873 void *buf
= tconn
->meta
.rbuf
;
4875 unsigned int header_size
= drbd_header_size(tconn
);
4876 int expect
= header_size
;
4877 int ping_timeout_active
= 0;
4879 current
->policy
= SCHED_RR
; /* Make this a realtime task! */
4880 current
->rt_priority
= 2; /* more important than all other tasks */
4882 while (get_t_state(thi
) == RUNNING
) {
4883 drbd_thread_current_set_cpu(thi
);
4884 if (test_and_clear_bit(SEND_PING
, &tconn
->flags
)) {
4885 if (drbd_send_ping(tconn
)) {
4886 conn_err(tconn
, "drbd_send_ping has failed\n");
4889 tconn
->meta
.socket
->sk
->sk_rcvtimeo
=
4890 tconn
->net_conf
->ping_timeo
*HZ
/10;
4891 ping_timeout_active
= 1;
4894 /* TODO: conditionally cork; it may hurt latency if we cork without
4896 if (!tconn
->net_conf
->no_cork
)
4897 drbd_tcp_cork(tconn
->meta
.socket
);
4898 if (tconn_process_done_ee(tconn
)) {
4899 conn_err(tconn
, "tconn_process_done_ee() failed\n");
4902 /* but unconditionally uncork unless disabled */
4903 if (!tconn
->net_conf
->no_cork
)
4904 drbd_tcp_uncork(tconn
->meta
.socket
);
4906 /* short circuit, recv_msg would return EINTR anyways. */
4907 if (signal_pending(current
))
4910 rv
= drbd_recv_short(tconn
->meta
.socket
, buf
, expect
-received
, 0);
4911 clear_bit(SIGNAL_ASENDER
, &tconn
->flags
);
4913 flush_signals(current
);
4916 * -EINTR (on meta) we got a signal
4917 * -EAGAIN (on meta) rcvtimeo expired
4918 * -ECONNRESET other side closed the connection
4919 * -ERESTARTSYS (on data) we got a signal
4920 * rv < 0 other than above: unexpected error!
4921 * rv == expected: full header or command
4922 * rv < expected: "woken" by signal during receive
4923 * rv == 0 : "connection shut down by peer"
4925 if (likely(rv
> 0)) {
4928 } else if (rv
== 0) {
4929 conn_err(tconn
, "meta connection shut down by peer.\n");
4931 } else if (rv
== -EAGAIN
) {
4932 /* If the data socket received something meanwhile,
4933 * that is good enough: peer is still alive. */
4934 if (time_after(tconn
->last_received
,
4935 jiffies
- tconn
->meta
.socket
->sk
->sk_rcvtimeo
))
4937 if (ping_timeout_active
) {
4938 conn_err(tconn
, "PingAck did not arrive in time.\n");
4941 set_bit(SEND_PING
, &tconn
->flags
);
4943 } else if (rv
== -EINTR
) {
4946 conn_err(tconn
, "sock_recvmsg returned %d\n", rv
);
4950 if (received
== expect
&& cmd
== NULL
) {
4951 if (decode_header(tconn
, tconn
->meta
.rbuf
, &pi
))
4953 cmd
= &asender_tbl
[pi
.cmd
];
4954 if (pi
.cmd
>= ARRAY_SIZE(asender_tbl
) || !cmd
->fn
) {
4955 conn_err(tconn
, "unknown command %d on meta (l: %d)\n",
4959 expect
= header_size
+ cmd
->pkt_size
;
4960 if (pi
.size
!= expect
- header_size
) {
4961 conn_err(tconn
, "Wrong packet size on meta (c: %d, l: %d)\n",
4966 if (received
== expect
) {
4969 err
= cmd
->fn(tconn
, &pi
);
4971 conn_err(tconn
, "%pf failed\n", cmd
->fn
);
4975 tconn
->last_received
= jiffies
;
4977 /* the idle_timeout (ping-int)
4978 * has been restored in got_PingAck() */
4979 if (cmd
== &asender_tbl
[P_PING_ACK
])
4980 ping_timeout_active
= 0;
4982 buf
= tconn
->meta
.rbuf
;
4984 expect
= header_size
;
4991 conn_request_state(tconn
, NS(conn
, C_NETWORK_FAILURE
), CS_HARD
);
4995 conn_request_state(tconn
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
4997 clear_bit(SIGNAL_ASENDER
, &tconn
->flags
);
4999 conn_info(tconn
, "asender terminated\n");