drbd: Rename drbd_free_ee() and variants to *_peer_req()
drivers/block/drbd/drbd_receiver.c (deliverable/linux.git)
1 /*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52 enum drbd_packet cmd;
53 unsigned int size;
54 unsigned int vnr;
55 void *data;
56 };
57
58 enum finish_epoch {
59 FE_STILL_LIVE,
60 FE_DESTROYED,
61 FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(int vnr, void *p, void *data);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75 * some helper functions to deal with singly linked page lists,
76 * page->private being our "next" pointer.
77 */
78
79 /* If at least n pages are linked at head, get n pages off.
80 * Otherwise, don't modify head, and return NULL.
81 * Locking is the responsibility of the caller.
82 */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85 struct page *page;
86 struct page *tmp;
87
88 BUG_ON(!n);
89 BUG_ON(!head);
90
91 page = *head;
92
93 if (!page)
94 return NULL;
95
96 while (page) {
97 tmp = page_chain_next(page);
98 if (--n == 0)
99 break; /* found sufficient pages */
100 if (tmp == NULL)
101 /* insufficient pages, don't use any of them. */
102 return NULL;
103 page = tmp;
104 }
105
106 /* add end of list marker for the returned list */
107 set_page_private(page, 0);
108 /* actual return value, and adjustment of head */
109 page = *head;
110 *head = tmp;
111 return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115 * "private" page chain, before adding it back to a global chain head
116 * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119 struct page *tmp;
120 int i = 1;
121 while ((tmp = page_chain_next(page)))
122 ++i, page = tmp;
123 if (len)
124 *len = i;
125 return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130 struct page *tmp;
131 int i = 0;
132 page_chain_for_each_safe(page, tmp) {
133 put_page(page);
134 ++i;
135 }
136 return i;
137 }
138
139 static void page_chain_add(struct page **head,
140 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143 struct page *tmp;
144 tmp = page_chain_tail(chain_first, NULL);
145 BUG_ON(tmp != chain_last);
146 #endif
147
148 /* add chain to head */
149 set_page_private(chain_last, (unsigned long)*head);
150 *head = chain_first;
151 }
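/*
 * Illustrative sketch (not part of the driver source): with page->private
 * acting as the "next" pointer, walking a chain is simply
 *
 *	int nr = 0;
 *	struct page *p;
 *
 *	for (p = chain; p; p = page_chain_next(p))
 *		nr++;
 *
 * All helpers above rely on the same invariant: the last page of a chain
 * carries 0 in its private field, the end-of-list marker that
 * page_chain_del() sets on the piece it hands out.
 */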
152
153 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
154 {
155 struct page *page = NULL;
156 struct page *tmp = NULL;
157 int i = 0;
158
159 /* Yes, testing drbd_pp_vacant outside the lock is racy.
160 * So what. It saves a spin_lock. */
161 if (drbd_pp_vacant >= number) {
162 spin_lock(&drbd_pp_lock);
163 page = page_chain_del(&drbd_pp_pool, number);
164 if (page)
165 drbd_pp_vacant -= number;
166 spin_unlock(&drbd_pp_lock);
167 if (page)
168 return page;
169 }
170
171 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
172 * "criss-cross" setup, that might cause write-out on some other DRBD,
173 * which in turn might block on the other node at this very place. */
174 for (i = 0; i < number; i++) {
175 tmp = alloc_page(GFP_TRY);
176 if (!tmp)
177 break;
178 set_page_private(tmp, (unsigned long)page);
179 page = tmp;
180 }
181
182 if (i == number)
183 return page;
184
185 /* Not enough pages immediately available this time.
186 * No need to jump around here, drbd_pp_alloc will retry this
187 * function "soon". */
188 if (page) {
189 tmp = page_chain_tail(page, NULL);
190 spin_lock(&drbd_pp_lock);
191 page_chain_add(&drbd_pp_pool, page, tmp);
192 drbd_pp_vacant += i;
193 spin_unlock(&drbd_pp_lock);
194 }
195 return NULL;
196 }
197
198 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
199 {
200 struct drbd_peer_request *peer_req;
201 struct list_head *le, *tle;
202
203 /* The EEs are always appended to the end of the list. Since
204 they are sent in order over the wire, they have to finish
205 in order. As soon as we see the first one that has not finished, we can
206 stop examining the list... */
207
208 list_for_each_safe(le, tle, &mdev->net_ee) {
209 peer_req = list_entry(le, struct drbd_peer_request, w.list);
210 if (drbd_ee_has_active_page(peer_req))
211 break;
212 list_move(le, to_be_freed);
213 }
214 }
215
216 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
217 {
218 LIST_HEAD(reclaimed);
219 struct drbd_peer_request *peer_req, *t;
220
221 spin_lock_irq(&mdev->tconn->req_lock);
222 reclaim_net_ee(mdev, &reclaimed);
223 spin_unlock_irq(&mdev->tconn->req_lock);
224
225 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
226 drbd_free_net_peer_req(mdev, peer_req);
227 }
228
229 /**
230 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
231 * @mdev: DRBD device.
232 * @number: number of pages requested
233 * @retry: whether to retry, if not enough pages are available right now
234 *
235 * Tries to allocate number pages, first from our own page pool, then from
236 * the kernel, unless this allocation would exceed the max_buffers setting.
237 * Possibly retry until DRBD frees sufficient pages somewhere else.
238 *
239 * Returns a page chain linked via page->private.
240 */
241 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
242 {
243 struct page *page = NULL;
244 DEFINE_WAIT(wait);
245
246 /* Yes, we may run up to @number over max_buffers. If we
247 * follow it strictly, the admin will get it wrong anyways. */
248 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
249 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250
251 while (page == NULL) {
252 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
253
254 drbd_kick_lo_and_reclaim_net(mdev);
255
256 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
257 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
258 if (page)
259 break;
260 }
261
262 if (!retry)
263 break;
264
265 if (signal_pending(current)) {
266 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
267 break;
268 }
269
270 schedule();
271 }
272 finish_wait(&drbd_pp_wait, &wait);
273
274 if (page)
275 atomic_add(number, &mdev->pp_in_use);
276 return page;
277 }
278
279 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
280 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
281 * Either links the page chain back to the global pool,
282 * or returns all pages to the system. */
283 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
284 {
285 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
286 int i;
287
288 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
289 i = page_chain_free(page);
290 else {
291 struct page *tmp;
292 tmp = page_chain_tail(page, &i);
293 spin_lock(&drbd_pp_lock);
294 page_chain_add(&drbd_pp_pool, page, tmp);
295 drbd_pp_vacant += i;
296 spin_unlock(&drbd_pp_lock);
297 }
298 i = atomic_sub_return(i, a);
299 if (i < 0)
300 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
301 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
302 wake_up(&drbd_pp_wait);
303 }
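/*
 * Accounting note (summary, not in the original source): drbd_pp_alloc() and
 * drbd_pp_free() form a pair. The allocating side adds @number to
 * mdev->pp_in_use; the freeing side subtracts the chain length again (from
 * pp_in_use_by_net instead, for pages that were still referenced by the
 * network layer) and wakes drbd_pp_wait, so that a waiter looping in
 * drbd_pp_alloc() gets another chance.
 */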
304
305 /*
306 You need to hold the req_lock:
307 _drbd_wait_ee_list_empty()
308
309 You must not have the req_lock:
310 drbd_free_peer_req()
311 drbd_alloc_peer_req()
312 drbd_release_ee()
313 drbd_ee_fix_bhs()
314 drbd_process_done_ee()
315 drbd_clear_done_ee()
316 drbd_wait_ee_list_empty()
317 */
318
319 struct drbd_peer_request *
320 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
321 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
322 {
323 struct drbd_peer_request *peer_req;
324 struct page *page;
325 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
326
327 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
328 return NULL;
329
330 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
331 if (!peer_req) {
332 if (!(gfp_mask & __GFP_NOWARN))
333 dev_err(DEV, "%s: allocation failed\n", __func__);
334 return NULL;
335 }
336
337 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
338 if (!page)
339 goto fail;
340
341 drbd_clear_interval(&peer_req->i);
342 peer_req->i.size = data_size;
343 peer_req->i.sector = sector;
344 peer_req->i.local = false;
345 peer_req->i.waiting = false;
346
347 peer_req->epoch = NULL;
348 peer_req->w.mdev = mdev;
349 peer_req->pages = page;
350 atomic_set(&peer_req->pending_bios, 0);
351 peer_req->flags = 0;
352 /*
353 * The block_id is opaque to the receiver. It is not endianness
354 * converted, and sent back to the sender unchanged.
355 */
356 peer_req->block_id = id;
357
358 return peer_req;
359
360 fail:
361 mempool_free(peer_req, drbd_ee_mempool);
362 return NULL;
363 }
364
365 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
366 int is_net)
367 {
368 if (peer_req->flags & EE_HAS_DIGEST)
369 kfree(peer_req->digest);
370 drbd_pp_free(mdev, peer_req->pages, is_net);
371 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
372 D_ASSERT(drbd_interval_empty(&peer_req->i));
373 mempool_free(peer_req, drbd_ee_mempool);
374 }
375
376 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
377 {
378 LIST_HEAD(work_list);
379 struct drbd_peer_request *peer_req, *t;
380 int count = 0;
381 int is_net = list == &mdev->net_ee;
382
383 spin_lock_irq(&mdev->tconn->req_lock);
384 list_splice_init(list, &work_list);
385 spin_unlock_irq(&mdev->tconn->req_lock);
386
387 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
388 __drbd_free_peer_req(mdev, peer_req, is_net);
389 count++;
390 }
391 return count;
392 }
393
394
395 /* See also comments in _req_mod(,BARRIER_ACKED)
396 * and receive_Barrier.
397 *
398 * Move entries from net_ee to done_ee, if ready.
399 * Grab done_ee, call all callbacks, free the entries.
400 * The callbacks typically send out ACKs.
401 */
402 static int drbd_process_done_ee(struct drbd_conf *mdev)
403 {
404 LIST_HEAD(work_list);
405 LIST_HEAD(reclaimed);
406 struct drbd_peer_request *peer_req, *t;
407 int err = 0;
408
409 spin_lock_irq(&mdev->tconn->req_lock);
410 reclaim_net_ee(mdev, &reclaimed);
411 list_splice_init(&mdev->done_ee, &work_list);
412 spin_unlock_irq(&mdev->tconn->req_lock);
413
414 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
415 drbd_free_net_peer_req(mdev, peer_req);
416
417 /* possible callbacks here:
418 * e_end_block, and e_end_resync_block, e_send_discard_write.
419 * all ignore the last argument.
420 */
421 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
422 int err2;
423
424 /* list_del not necessary, next/prev members not touched */
425 err2 = peer_req->w.cb(&peer_req->w, !!err);
426 if (!err)
427 err = err2;
428 drbd_free_peer_req(mdev, peer_req);
429 }
430 wake_up(&mdev->ee_wait);
431
432 return err;
433 }
434
435 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
436 {
437 DEFINE_WAIT(wait);
438
439 /* avoids spin_lock/unlock
440 * and calling prepare_to_wait in the fast path */
441 while (!list_empty(head)) {
442 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
443 spin_unlock_irq(&mdev->tconn->req_lock);
444 io_schedule();
445 finish_wait(&mdev->ee_wait, &wait);
446 spin_lock_irq(&mdev->tconn->req_lock);
447 }
448 }
449
450 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
451 {
452 spin_lock_irq(&mdev->tconn->req_lock);
453 _drbd_wait_ee_list_empty(mdev, head);
454 spin_unlock_irq(&mdev->tconn->req_lock);
455 }
456
457 /* see also kernel_accept(), which is only present since 2.6.18.
458 * also, we want to log exactly which part of it failed */
459 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
460 {
461 struct sock *sk = sock->sk;
462 int err = 0;
463
464 *what = "listen";
465 err = sock->ops->listen(sock, 5);
466 if (err < 0)
467 goto out;
468
469 *what = "sock_create_lite";
470 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
471 newsock);
472 if (err < 0)
473 goto out;
474
475 *what = "accept";
476 err = sock->ops->accept(sock, *newsock, 0);
477 if (err < 0) {
478 sock_release(*newsock);
479 *newsock = NULL;
480 goto out;
481 }
482 (*newsock)->ops = sock->ops;
483
484 out:
485 return err;
486 }
487
488 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
489 {
490 mm_segment_t oldfs;
491 struct kvec iov = {
492 .iov_base = buf,
493 .iov_len = size,
494 };
495 struct msghdr msg = {
496 .msg_iovlen = 1,
497 .msg_iov = (struct iovec *)&iov,
498 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
499 };
500 int rv;
501
502 oldfs = get_fs();
503 set_fs(KERNEL_DS);
504 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
505 set_fs(oldfs);
506
507 return rv;
508 }
509
510 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
511 {
512 mm_segment_t oldfs;
513 struct kvec iov = {
514 .iov_base = buf,
515 .iov_len = size,
516 };
517 struct msghdr msg = {
518 .msg_iovlen = 1,
519 .msg_iov = (struct iovec *)&iov,
520 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
521 };
522 int rv;
523
524 oldfs = get_fs();
525 set_fs(KERNEL_DS);
526
527 for (;;) {
528 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
529 if (rv == size)
530 break;
531
532 /* Note:
533 * ECONNRESET other side closed the connection
534 * ERESTARTSYS (on sock) we got a signal
535 */
536
537 if (rv < 0) {
538 if (rv == -ECONNRESET)
539 conn_info(tconn, "sock was reset by peer\n");
540 else if (rv != -ERESTARTSYS)
541 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
542 break;
543 } else if (rv == 0) {
544 conn_info(tconn, "sock was shut down by peer\n");
545 break;
546 } else {
547 /* signal came in, or peer/link went down,
548 * after we read a partial message
549 */
550 /* D_ASSERT(signal_pending(current)); */
551 break;
552 }
553 };
554
555 set_fs(oldfs);
556
557 if (rv != size)
558 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
559
560 return rv;
561 }
562
563 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
564 {
565 int err;
566
567 err = drbd_recv(tconn, buf, size);
568 if (err != size) {
569 if (err >= 0)
570 err = -EIO;
571 } else
572 err = 0;
573 return err;
574 }
575
576 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
577 {
578 int err;
579
580 err = drbd_recv_all(tconn, buf, size);
581 if (err && !signal_pending(current))
582 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
583 return err;
584 }
585
586 /* quoting tcp(7):
587 * On individual connections, the socket buffer size must be set prior to the
588 * listen(2) or connect(2) calls in order to have it take effect.
589 * This is our wrapper to do so.
590 */
591 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
592 unsigned int rcv)
593 {
594 /* open coded SO_SNDBUF, SO_RCVBUF */
595 if (snd) {
596 sock->sk->sk_sndbuf = snd;
597 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
598 }
599 if (rcv) {
600 sock->sk->sk_rcvbuf = rcv;
601 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
602 }
603 }
604
605 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
606 {
607 const char *what;
608 struct socket *sock;
609 struct sockaddr_in6 src_in6;
610 int err;
611 int disconnect_on_error = 1;
612
613 if (!get_net_conf(tconn))
614 return NULL;
615
616 what = "sock_create_kern";
617 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
618 SOCK_STREAM, IPPROTO_TCP, &sock);
619 if (err < 0) {
620 sock = NULL;
621 goto out;
622 }
623
624 sock->sk->sk_rcvtimeo =
625 sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
626 drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
627 tconn->net_conf->rcvbuf_size);
628
629 /* explicitly bind to the configured IP as source IP
630 * for the outgoing connections.
631 * This is needed for multihomed hosts and to be
632 * able to use lo: interfaces for drbd.
633 * Make sure to use 0 as port number, so linux selects
634 * a free one dynamically.
635 */
636 memcpy(&src_in6, tconn->net_conf->my_addr,
637 min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
638 if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
639 src_in6.sin6_port = 0;
640 else
641 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
642
643 what = "bind before connect";
644 err = sock->ops->bind(sock,
645 (struct sockaddr *) &src_in6,
646 tconn->net_conf->my_addr_len);
647 if (err < 0)
648 goto out;
649
650 /* connect may fail, peer not yet available.
651 * stay C_WF_CONNECTION, don't go Disconnecting! */
652 disconnect_on_error = 0;
653 what = "connect";
654 err = sock->ops->connect(sock,
655 (struct sockaddr *)tconn->net_conf->peer_addr,
656 tconn->net_conf->peer_addr_len, 0);
657
658 out:
659 if (err < 0) {
660 if (sock) {
661 sock_release(sock);
662 sock = NULL;
663 }
664 switch (-err) {
665 /* timeout, busy, signal pending */
666 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
667 case EINTR: case ERESTARTSYS:
668 /* peer not (yet) available, network problem */
669 case ECONNREFUSED: case ENETUNREACH:
670 case EHOSTDOWN: case EHOSTUNREACH:
671 disconnect_on_error = 0;
672 break;
673 default:
674 conn_err(tconn, "%s failed, err = %d\n", what, err);
675 }
676 if (disconnect_on_error)
677 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
678 }
679 put_net_conf(tconn);
680 return sock;
681 }
682
683 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
684 {
685 int timeo, err;
686 struct socket *s_estab = NULL, *s_listen;
687 const char *what;
688
689 if (!get_net_conf(tconn))
690 return NULL;
691
692 what = "sock_create_kern";
693 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
694 SOCK_STREAM, IPPROTO_TCP, &s_listen);
695 if (err) {
696 s_listen = NULL;
697 goto out;
698 }
699
700 timeo = tconn->net_conf->try_connect_int * HZ;
701 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
702
703 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
704 s_listen->sk->sk_rcvtimeo = timeo;
705 s_listen->sk->sk_sndtimeo = timeo;
706 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
707 tconn->net_conf->rcvbuf_size);
708
709 what = "bind before listen";
710 err = s_listen->ops->bind(s_listen,
711 (struct sockaddr *) tconn->net_conf->my_addr,
712 tconn->net_conf->my_addr_len);
713 if (err < 0)
714 goto out;
715
716 err = drbd_accept(&what, s_listen, &s_estab);
717
718 out:
719 if (s_listen)
720 sock_release(s_listen);
721 if (err < 0) {
722 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
723 conn_err(tconn, "%s failed, err = %d\n", what, err);
724 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
725 }
726 }
727 put_net_conf(tconn);
728
729 return s_estab;
730 }
731
732 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
733
734 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
735 enum drbd_packet cmd)
736 {
737 if (!conn_prepare_command(tconn, sock))
738 return -EIO;
739 return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
740 }
741
742 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
743 {
744 unsigned int header_size = drbd_header_size(tconn);
745 struct packet_info pi;
746 int err;
747
748 err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
749 if (err != header_size) {
750 if (err >= 0)
751 err = -EIO;
752 return err;
753 }
754 err = decode_header(tconn, tconn->data.rbuf, &pi);
755 if (err)
756 return err;
757 return pi.cmd;
758 }
759
760 /**
761 * drbd_socket_okay() - Free the socket if its connection is not okay
762 * @sock: pointer to the pointer to the socket.
763 */
764 static int drbd_socket_okay(struct socket **sock)
765 {
766 int rr;
767 char tb[4];
768
769 if (!*sock)
770 return false;
771
772 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
773
774 if (rr > 0 || rr == -EAGAIN) {
775 return true;
776 } else {
777 sock_release(*sock);
778 *sock = NULL;
779 return false;
780 }
781 }
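/*
 * Explanatory note (not in the original source): the probe in
 * drbd_socket_okay() above is a non-blocking MSG_PEEK read of up to four
 * bytes. Pending data or -EAGAIN means the TCP connection is still usable;
 * a return of 0 (orderly shutdown by the peer) or any other error releases
 * the socket and reports it as dead.
 */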
782 /* Gets called if a connection is established, or if a new minor gets created
783 in a connection */
784 int drbd_connected(int vnr, void *p, void *data)
785 {
786 struct drbd_conf *mdev = (struct drbd_conf *)p;
787 int err;
788
789 atomic_set(&mdev->packet_seq, 0);
790 mdev->peer_seq = 0;
791
792 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
793 &mdev->tconn->cstate_mutex :
794 &mdev->own_state_mutex;
795
796 err = drbd_send_sync_param(mdev);
797 if (!err)
798 err = drbd_send_sizes(mdev, 0, 0);
799 if (!err)
800 err = drbd_send_uuids(mdev);
801 if (!err)
802 err = drbd_send_state(mdev);
803 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
804 clear_bit(RESIZE_PENDING, &mdev->flags);
805 mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
806 return err;
807 }
808
809 /*
810 * return values:
811 * 1 yes, we have a valid connection
812 * 0 oops, did not work out, please try again
813 * -1 peer talks different language,
814 * no point in trying again, please go standalone.
815 * -2 We do not have a network config...
816 */
817 static int drbd_connect(struct drbd_tconn *tconn)
818 {
819 struct socket *sock, *msock;
820 int try, h, ok;
821
822 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
823 return -2;
824
825 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
826
827 /* Assume that the peer only understands protocol 80 until we know better. */
828 tconn->agreed_pro_version = 80;
829
830 do {
831 struct socket *s;
832
833 for (try = 0;;) {
834 /* 3 tries, this should take less than a second! */
835 s = drbd_try_connect(tconn);
836 if (s || ++try >= 3)
837 break;
838 /* give the other side time to call bind() & listen() */
839 schedule_timeout_interruptible(HZ / 10);
840 }
841
842 if (s) {
843 if (!tconn->data.socket) {
844 tconn->data.socket = s;
845 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
846 } else if (!tconn->meta.socket) {
847 tconn->meta.socket = s;
848 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
849 } else {
850 conn_err(tconn, "Logic error in drbd_connect()\n");
851 goto out_release_sockets;
852 }
853 }
854
855 if (tconn->data.socket && tconn->meta.socket) {
856 schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
857 ok = drbd_socket_okay(&tconn->data.socket);
858 ok = drbd_socket_okay(&tconn->meta.socket) && ok;
859 if (ok)
860 break;
861 }
862
863 retry:
864 s = drbd_wait_for_connect(tconn);
865 if (s) {
866 try = receive_first_packet(tconn, s);
867 drbd_socket_okay(&tconn->data.socket);
868 drbd_socket_okay(&tconn->meta.socket);
869 switch (try) {
870 case P_INITIAL_DATA:
871 if (tconn->data.socket) {
872 conn_warn(tconn, "initial packet S crossed\n");
873 sock_release(tconn->data.socket);
874 }
875 tconn->data.socket = s;
876 break;
877 case P_INITIAL_META:
878 if (tconn->meta.socket) {
879 conn_warn(tconn, "initial packet M crossed\n");
880 sock_release(tconn->meta.socket);
881 }
882 tconn->meta.socket = s;
883 set_bit(DISCARD_CONCURRENT, &tconn->flags);
884 break;
885 default:
886 conn_warn(tconn, "Error receiving initial packet\n");
887 sock_release(s);
888 if (random32() & 1)
889 goto retry;
890 }
891 }
892
893 if (tconn->cstate <= C_DISCONNECTING)
894 goto out_release_sockets;
895 if (signal_pending(current)) {
896 flush_signals(current);
897 smp_rmb();
898 if (get_t_state(&tconn->receiver) == EXITING)
899 goto out_release_sockets;
900 }
901
902 if (tconn->data.socket && tconn->meta.socket) {
903 ok = drbd_socket_okay(&tconn->data.socket);
904 ok = drbd_socket_okay(&tconn->meta.socket) && ok;
905 if (ok)
906 break;
907 }
908 } while (1);
909
910 sock = tconn->data.socket;
911 msock = tconn->meta.socket;
912
913 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
914 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
915
916 sock->sk->sk_allocation = GFP_NOIO;
917 msock->sk->sk_allocation = GFP_NOIO;
918
919 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
920 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
921
922 /* NOT YET ...
923 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
924 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
925 * first set it to the P_CONNECTION_FEATURES timeout,
926 * which we set to 4x the configured ping_timeout. */
927 sock->sk->sk_sndtimeo =
928 sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
929
930 msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
931 msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
932
933 /* we don't want delays.
934 * we use TCP_CORK where appropriate, though */
935 drbd_tcp_nodelay(sock);
936 drbd_tcp_nodelay(msock);
937
938 tconn->last_received = jiffies;
939
940 h = drbd_do_features(tconn);
941 if (h <= 0)
942 return h;
943
944 if (tconn->cram_hmac_tfm) {
945 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
946 switch (drbd_do_auth(tconn)) {
947 case -1:
948 conn_err(tconn, "Authentication of peer failed\n");
949 return -1;
950 case 0:
951 conn_err(tconn, "Authentication of peer failed, trying again.\n");
952 return 0;
953 }
954 }
955
956 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
957 return 0;
958
959 sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
960 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
961
962 drbd_thread_start(&tconn->asender);
963
964 if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
965 return -1;
966
967 return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
968
969 out_release_sockets:
970 if (tconn->data.socket) {
971 sock_release(tconn->data.socket);
972 tconn->data.socket = NULL;
973 }
974 if (tconn->meta.socket) {
975 sock_release(tconn->meta.socket);
976 tconn->meta.socket = NULL;
977 }
978 return -1;
979 }
980
981 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
982 {
983 unsigned int header_size = drbd_header_size(tconn);
984
985 if (header_size == sizeof(struct p_header100) &&
986 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
987 struct p_header100 *h = header;
988 if (h->pad != 0) {
989 conn_err(tconn, "Header padding is not zero\n");
990 return -EINVAL;
991 }
992 pi->vnr = be16_to_cpu(h->volume);
993 pi->cmd = be16_to_cpu(h->command);
994 pi->size = be32_to_cpu(h->length);
995 } else if (header_size == sizeof(struct p_header95) &&
996 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
997 struct p_header95 *h = header;
998 pi->cmd = be16_to_cpu(h->command);
999 pi->size = be32_to_cpu(h->length);
1000 pi->vnr = 0;
1001 } else if (header_size == sizeof(struct p_header80) &&
1002 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1003 struct p_header80 *h = header;
1004 pi->cmd = be16_to_cpu(h->command);
1005 pi->size = be16_to_cpu(h->length);
1006 pi->vnr = 0;
1007 } else {
1008 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1009 be32_to_cpu(*(__be32 *)header),
1010 tconn->agreed_pro_version);
1011 return -EINVAL;
1012 }
1013 pi->data = header + header_size;
1014 return 0;
1015 }
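/*
 * Header format summary (derived from the decoder above; the struct
 * definitions themselves are not in this file): p_header80 carries a 32-bit
 * magic, a 16-bit command and a 16-bit length; p_header95 starts with a
 * 16-bit magic and widens the length to 32 bits; p_header100 adds a 16-bit
 * volume number and a pad field that must be zero. Which of the three is
 * expected follows from the agreed protocol version via drbd_header_size(),
 * and a magic that does not match that expectation is rejected as -EINVAL.
 */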
1016
1017 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1018 {
1019 void *buffer = tconn->data.rbuf;
1020 int err;
1021
1022 err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1023 if (err)
1024 return err;
1025
1026 err = decode_header(tconn, buffer, pi);
1027 tconn->last_received = jiffies;
1028
1029 return err;
1030 }
1031
1032 static void drbd_flush(struct drbd_conf *mdev)
1033 {
1034 int rv;
1035
1036 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1037 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1038 NULL);
1039 if (rv) {
1040 dev_err(DEV, "local disk flush failed with status %d\n", rv);
1041 /* would rather check on EOPNOTSUPP, but that is not reliable.
1042 * don't try again for ANY return value != 0
1043 * if (rv == -EOPNOTSUPP) */
1044 drbd_bump_write_ordering(mdev, WO_drain_io);
1045 }
1046 put_ldev(mdev);
1047 }
1048 }
1049
1050 /**
1051 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1052 * @mdev: DRBD device.
1053 * @epoch: Epoch object.
1054 * @ev: Epoch event.
1055 */
1056 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1057 struct drbd_epoch *epoch,
1058 enum epoch_event ev)
1059 {
1060 int epoch_size;
1061 struct drbd_epoch *next_epoch;
1062 enum finish_epoch rv = FE_STILL_LIVE;
1063
1064 spin_lock(&mdev->epoch_lock);
1065 do {
1066 next_epoch = NULL;
1067
1068 epoch_size = atomic_read(&epoch->epoch_size);
1069
1070 switch (ev & ~EV_CLEANUP) {
1071 case EV_PUT:
1072 atomic_dec(&epoch->active);
1073 break;
1074 case EV_GOT_BARRIER_NR:
1075 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1076 break;
1077 case EV_BECAME_LAST:
1078 /* nothing to do*/
1079 break;
1080 }
1081
1082 if (epoch_size != 0 &&
1083 atomic_read(&epoch->active) == 0 &&
1084 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1085 if (!(ev & EV_CLEANUP)) {
1086 spin_unlock(&mdev->epoch_lock);
1087 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1088 spin_lock(&mdev->epoch_lock);
1089 }
1090 dec_unacked(mdev);
1091
1092 if (mdev->current_epoch != epoch) {
1093 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1094 list_del(&epoch->list);
1095 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1096 mdev->epochs--;
1097 kfree(epoch);
1098
1099 if (rv == FE_STILL_LIVE)
1100 rv = FE_DESTROYED;
1101 } else {
1102 epoch->flags = 0;
1103 atomic_set(&epoch->epoch_size, 0);
1104 /* atomic_set(&epoch->active, 0); is already zero */
1105 if (rv == FE_STILL_LIVE)
1106 rv = FE_RECYCLED;
1107 wake_up(&mdev->ee_wait);
1108 }
1109 }
1110
1111 if (!next_epoch)
1112 break;
1113
1114 epoch = next_epoch;
1115 } while (1);
1116
1117 spin_unlock(&mdev->epoch_lock);
1118
1119 return rv;
1120 }
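/*
 * Reader's note (not in the original source): an epoch is finished only when
 * all three conditions checked above hold at once: it contains at least one
 * write (epoch_size != 0), none of those writes is still in flight
 * (active == 0), and the corresponding barrier number has been received
 * (DE_HAVE_BARRIER_NUMBER). Only then is the P_BARRIER_ACK sent, unless this
 * is a cleanup pass (EV_CLEANUP).
 */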
1121
1122 /**
1123 * drbd_bump_write_ordering() - Fall back to another write ordering method
1124 * @mdev: DRBD device.
1125 * @wo: Write ordering method to try.
1126 */
1127 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1128 {
1129 enum write_ordering_e pwo;
1130 static char *write_ordering_str[] = {
1131 [WO_none] = "none",
1132 [WO_drain_io] = "drain",
1133 [WO_bdev_flush] = "flush",
1134 };
1135
1136 pwo = mdev->write_ordering;
1137 wo = min(pwo, wo);
1138 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1139 wo = WO_drain_io;
1140 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1141 wo = WO_none;
1142 mdev->write_ordering = wo;
1143 if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1144 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1145 }
1146
1147 /**
1148 * drbd_submit_peer_request()
1149 * @mdev: DRBD device.
1150 * @peer_req: peer request
1151 * @rw: flag field, see bio->bi_rw
1152 *
1153 * May spread the pages to multiple bios,
1154 * depending on bio_add_page restrictions.
1155 *
1156 * Returns 0 if all bios have been submitted,
1157 * -ENOMEM if we could not allocate enough bios,
1158 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1159 * single page to an empty bio (which should never happen and likely indicates
1160 * that the lower level IO stack is in some way broken). This has been observed
1161 * on certain Xen deployments.
1162 */
1163 /* TODO allocate from our own bio_set. */
1164 int drbd_submit_peer_request(struct drbd_conf *mdev,
1165 struct drbd_peer_request *peer_req,
1166 const unsigned rw, const int fault_type)
1167 {
1168 struct bio *bios = NULL;
1169 struct bio *bio;
1170 struct page *page = peer_req->pages;
1171 sector_t sector = peer_req->i.sector;
1172 unsigned ds = peer_req->i.size;
1173 unsigned n_bios = 0;
1174 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1175 int err = -ENOMEM;
1176
1177 /* In most cases, we will only need one bio. But in case the lower
1178 * level restrictions happen to be different at this offset on this
1179 * side than those of the sending peer, we may need to submit the
1180 * request in more than one bio.
1181 *
1182 * Plain bio_alloc is good enough here, this is no DRBD internally
1183 * generated bio, but a bio allocated on behalf of the peer.
1184 */
1185 next_bio:
1186 bio = bio_alloc(GFP_NOIO, nr_pages);
1187 if (!bio) {
1188 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1189 goto fail;
1190 }
1191 /* > peer_req->i.sector, unless this is the first bio */
1192 bio->bi_sector = sector;
1193 bio->bi_bdev = mdev->ldev->backing_bdev;
1194 bio->bi_rw = rw;
1195 bio->bi_private = peer_req;
1196 bio->bi_end_io = drbd_peer_request_endio;
1197
1198 bio->bi_next = bios;
1199 bios = bio;
1200 ++n_bios;
1201
1202 page_chain_for_each(page) {
1203 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1204 if (!bio_add_page(bio, page, len, 0)) {
1205 /* A single page must always be possible!
1206 * But in case it fails anyways,
1207 * we deal with it, and complain (below). */
1208 if (bio->bi_vcnt == 0) {
1209 dev_err(DEV,
1210 "bio_add_page failed for len=%u, "
1211 "bi_vcnt=0 (bi_sector=%llu)\n",
1212 len, (unsigned long long)bio->bi_sector);
1213 err = -ENOSPC;
1214 goto fail;
1215 }
1216 goto next_bio;
1217 }
1218 ds -= len;
1219 sector += len >> 9;
1220 --nr_pages;
1221 }
1222 D_ASSERT(page == NULL);
1223 D_ASSERT(ds == 0);
1224
1225 atomic_set(&peer_req->pending_bios, n_bios);
1226 do {
1227 bio = bios;
1228 bios = bios->bi_next;
1229 bio->bi_next = NULL;
1230
1231 drbd_generic_make_request(mdev, fault_type, bio);
1232 } while (bios);
1233 return 0;
1234
1235 fail:
1236 while (bios) {
1237 bio = bios;
1238 bios = bios->bi_next;
1239 bio_put(bio);
1240 }
1241 return err;
1242 }
1243
1244 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1245 struct drbd_peer_request *peer_req)
1246 {
1247 struct drbd_interval *i = &peer_req->i;
1248
1249 drbd_remove_interval(&mdev->write_requests, i);
1250 drbd_clear_interval(i);
1251
1252 /* Wake up any processes waiting for this peer request to complete. */
1253 if (i->waiting)
1254 wake_up(&mdev->misc_wait);
1255 }
1256
1257 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1258 {
1259 struct drbd_conf *mdev;
1260 int rv;
1261 struct p_barrier *p = pi->data;
1262 struct drbd_epoch *epoch;
1263
1264 mdev = vnr_to_mdev(tconn, pi->vnr);
1265 if (!mdev)
1266 return -EIO;
1267
1268 inc_unacked(mdev);
1269
1270 mdev->current_epoch->barrier_nr = p->barrier;
1271 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1272
1273 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1274 * the activity log, which means it would not be resynced in case the
1275 * R_PRIMARY crashes now.
1276 * Therefore we must send the barrier_ack after the barrier request was
1277 * completed. */
1278 switch (mdev->write_ordering) {
1279 case WO_none:
1280 if (rv == FE_RECYCLED)
1281 return 0;
1282
1283 /* receiver context, in the writeout path of the other node.
1284 * avoid potential distributed deadlock */
1285 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1286 if (epoch)
1287 break;
1288 else
1289 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1290 /* Fall through */
1291
1292 case WO_bdev_flush:
1293 case WO_drain_io:
1294 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1295 drbd_flush(mdev);
1296
1297 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1298 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1299 if (epoch)
1300 break;
1301 }
1302
1303 epoch = mdev->current_epoch;
1304 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1305
1306 D_ASSERT(atomic_read(&epoch->active) == 0);
1307 D_ASSERT(epoch->flags == 0);
1308
1309 return 0;
1310 default:
1311 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1312 return -EIO;
1313 }
1314
1315 epoch->flags = 0;
1316 atomic_set(&epoch->epoch_size, 0);
1317 atomic_set(&epoch->active, 0);
1318
1319 spin_lock(&mdev->epoch_lock);
1320 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1321 list_add(&epoch->list, &mdev->current_epoch->list);
1322 mdev->current_epoch = epoch;
1323 mdev->epochs++;
1324 } else {
1325 /* The current_epoch got recycled while we allocated this one... */
1326 kfree(epoch);
1327 }
1328 spin_unlock(&mdev->epoch_lock);
1329
1330 return 0;
1331 }
1332
1333 /* used from receive_RSDataReply (recv_resync_read)
1334 * and from receive_Data */
1335 static struct drbd_peer_request *
1336 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1337 int data_size) __must_hold(local)
1338 {
1339 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1340 struct drbd_peer_request *peer_req;
1341 struct page *page;
1342 int dgs, ds, err;
1343 void *dig_in = mdev->tconn->int_dig_in;
1344 void *dig_vv = mdev->tconn->int_dig_vv;
1345 unsigned long *data;
1346
1347 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1348 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1349
1350 if (dgs) {
1351 /*
1352 * FIXME: Receive the incoming digest into the receive buffer
1353 * here, together with its struct p_data?
1354 */
1355 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1356 if (err)
1357 return NULL;
1358 }
1359
1360 data_size -= dgs;
1361
1362 if (!expect(data_size != 0))
1363 return NULL;
1364 if (!expect(IS_ALIGNED(data_size, 512)))
1365 return NULL;
1366 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1367 return NULL;
1368
1369 /* even though we trust our peer,
1370 * we sometimes have to double check. */
1371 if (sector + (data_size>>9) > capacity) {
1372 dev_err(DEV, "request from peer beyond end of local disk: "
1373 "capacity: %llus < sector: %llus + size: %u\n",
1374 (unsigned long long)capacity,
1375 (unsigned long long)sector, data_size);
1376 return NULL;
1377 }
1378
1379 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1380 * "criss-cross" setup, that might cause write-out on some other DRBD,
1381 * which in turn might block on the other node at this very place. */
1382 peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1383 if (!peer_req)
1384 return NULL;
1385
1386 ds = data_size;
1387 page = peer_req->pages;
1388 page_chain_for_each(page) {
1389 unsigned len = min_t(int, ds, PAGE_SIZE);
1390 data = kmap(page);
1391 err = drbd_recv_all_warn(mdev->tconn, data, len);
1392 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1393 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1394 data[0] = data[0] ^ (unsigned long)-1;
1395 }
1396 kunmap(page);
1397 if (err) {
1398 drbd_free_peer_req(mdev, peer_req);
1399 return NULL;
1400 }
1401 ds -= len;
1402 }
1403
1404 if (dgs) {
1405 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
1406 if (memcmp(dig_in, dig_vv, dgs)) {
1407 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1408 (unsigned long long)sector, data_size);
1409 drbd_free_peer_req(mdev, peer_req);
1410 return NULL;
1411 }
1412 }
1413 mdev->recv_cnt += data_size>>9;
1414 return peer_req;
1415 }
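/*
 * Wire layout handled above (explanatory note): if a data integrity
 * algorithm was negotiated (protocol >= 87 and integrity_r_tfm set), every
 * data packet payload is preceded by a digest of dgs bytes. read_in_block()
 * first receives that digest, then the payload into the freshly allocated
 * page chain, recomputes the checksum with drbd_csum_ee() and drops the
 * peer request if the two digests differ.
 */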
1416
1417 /* drbd_drain_block() just takes a data block
1418 * out of the socket input buffer, and discards it.
1419 */
1420 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1421 {
1422 struct page *page;
1423 int err = 0;
1424 void *data;
1425
1426 if (!data_size)
1427 return 0;
1428
1429 page = drbd_pp_alloc(mdev, 1, 1);
1430
1431 data = kmap(page);
1432 while (data_size) {
1433 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1434
1435 err = drbd_recv_all_warn(mdev->tconn, data, len);
1436 if (err)
1437 break;
1438 data_size -= len;
1439 }
1440 kunmap(page);
1441 drbd_pp_free(mdev, page, 0);
1442 return err;
1443 }
1444
1445 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1446 sector_t sector, int data_size)
1447 {
1448 struct bio_vec *bvec;
1449 struct bio *bio;
1450 int dgs, err, i, expect;
1451 void *dig_in = mdev->tconn->int_dig_in;
1452 void *dig_vv = mdev->tconn->int_dig_vv;
1453
1454 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1455 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1456
1457 if (dgs) {
1458 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1459 if (err)
1460 return err;
1461 }
1462
1463 data_size -= dgs;
1464
1465 /* optimistically update recv_cnt. if receiving fails below,
1466 * we disconnect anyways, and counters will be reset. */
1467 mdev->recv_cnt += data_size>>9;
1468
1469 bio = req->master_bio;
1470 D_ASSERT(sector == bio->bi_sector);
1471
1472 bio_for_each_segment(bvec, bio, i) {
1473 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1474 expect = min_t(int, data_size, bvec->bv_len);
1475 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1476 kunmap(bvec->bv_page);
1477 if (err)
1478 return err;
1479 data_size -= expect;
1480 }
1481
1482 if (dgs) {
1483 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1484 if (memcmp(dig_in, dig_vv, dgs)) {
1485 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1486 return -EINVAL;
1487 }
1488 }
1489
1490 D_ASSERT(data_size == 0);
1491 return 0;
1492 }
1493
1494 /* e_end_resync_block() is called via
1495 * drbd_process_done_ee() by asender only */
1496 static int e_end_resync_block(struct drbd_work *w, int unused)
1497 {
1498 struct drbd_peer_request *peer_req =
1499 container_of(w, struct drbd_peer_request, w);
1500 struct drbd_conf *mdev = w->mdev;
1501 sector_t sector = peer_req->i.sector;
1502 int err;
1503
1504 D_ASSERT(drbd_interval_empty(&peer_req->i));
1505
1506 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1507 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1508 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1509 } else {
1510 /* Record failure to sync */
1511 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1512
1513 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1514 }
1515 dec_unacked(mdev);
1516
1517 return err;
1518 }
1519
1520 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1521 {
1522 struct drbd_peer_request *peer_req;
1523
1524 peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1525 if (!peer_req)
1526 goto fail;
1527
1528 dec_rs_pending(mdev);
1529
1530 inc_unacked(mdev);
1531 /* corresponding dec_unacked() in e_end_resync_block()
1532 * respective _drbd_clear_done_ee */
1533
1534 peer_req->w.cb = e_end_resync_block;
1535
1536 spin_lock_irq(&mdev->tconn->req_lock);
1537 list_add(&peer_req->w.list, &mdev->sync_ee);
1538 spin_unlock_irq(&mdev->tconn->req_lock);
1539
1540 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1541 if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1542 return 0;
1543
1544 /* don't care for the reason here */
1545 dev_err(DEV, "submit failed, triggering re-connect\n");
1546 spin_lock_irq(&mdev->tconn->req_lock);
1547 list_del(&peer_req->w.list);
1548 spin_unlock_irq(&mdev->tconn->req_lock);
1549
1550 drbd_free_peer_req(mdev, peer_req);
1551 fail:
1552 put_ldev(mdev);
1553 return -EIO;
1554 }
1555
1556 static struct drbd_request *
1557 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1558 sector_t sector, bool missing_ok, const char *func)
1559 {
1560 struct drbd_request *req;
1561
1562 /* Request object according to our peer */
1563 req = (struct drbd_request *)(unsigned long)id;
1564 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1565 return req;
1566 if (!missing_ok) {
1567 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1568 (unsigned long)id, (unsigned long long)sector);
1569 }
1570 return NULL;
1571 }
1572
1573 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1574 {
1575 struct drbd_conf *mdev;
1576 struct drbd_request *req;
1577 sector_t sector;
1578 int err;
1579 struct p_data *p = pi->data;
1580
1581 mdev = vnr_to_mdev(tconn, pi->vnr);
1582 if (!mdev)
1583 return -EIO;
1584
1585 sector = be64_to_cpu(p->sector);
1586
1587 spin_lock_irq(&mdev->tconn->req_lock);
1588 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1589 spin_unlock_irq(&mdev->tconn->req_lock);
1590 if (unlikely(!req))
1591 return -EIO;
1592
1593 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1594 * special casing it there for the various failure cases.
1595 * still no race with drbd_fail_pending_reads */
1596 err = recv_dless_read(mdev, req, sector, pi->size);
1597 if (!err)
1598 req_mod(req, DATA_RECEIVED);
1599 /* else: nothing. handled from drbd_disconnect...
1600 * I don't think we may complete this just yet
1601 * in case we are "on-disconnect: freeze" */
1602
1603 return err;
1604 }
1605
1606 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1607 {
1608 struct drbd_conf *mdev;
1609 sector_t sector;
1610 int err;
1611 struct p_data *p = pi->data;
1612
1613 mdev = vnr_to_mdev(tconn, pi->vnr);
1614 if (!mdev)
1615 return -EIO;
1616
1617 sector = be64_to_cpu(p->sector);
1618 D_ASSERT(p->block_id == ID_SYNCER);
1619
1620 if (get_ldev(mdev)) {
1621 /* data is submitted to disk within recv_resync_read.
1622 * corresponding put_ldev done below on error,
1623 * or in drbd_peer_request_endio. */
1624 err = recv_resync_read(mdev, sector, pi->size);
1625 } else {
1626 if (__ratelimit(&drbd_ratelimit_state))
1627 dev_err(DEV, "Can not write resync data to local disk.\n");
1628
1629 err = drbd_drain_block(mdev, pi->size);
1630
1631 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1632 }
1633
1634 atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1635
1636 return err;
1637 }
1638
1639 static int w_restart_write(struct drbd_work *w, int cancel)
1640 {
1641 struct drbd_request *req = container_of(w, struct drbd_request, w);
1642 struct drbd_conf *mdev = w->mdev;
1643 struct bio *bio;
1644 unsigned long start_time;
1645 unsigned long flags;
1646
1647 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1648 if (!expect(req->rq_state & RQ_POSTPONED)) {
1649 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1650 return -EIO;
1651 }
1652 bio = req->master_bio;
1653 start_time = req->start_time;
1654 /* Postponed requests will not have their master_bio completed! */
1655 __req_mod(req, DISCARD_WRITE, NULL);
1656 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1657
1658 while (__drbd_make_request(mdev, bio, start_time))
1659 /* retry */ ;
1660 return 0;
1661 }
1662
1663 static void restart_conflicting_writes(struct drbd_conf *mdev,
1664 sector_t sector, int size)
1665 {
1666 struct drbd_interval *i;
1667 struct drbd_request *req;
1668
1669 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1670 if (!i->local)
1671 continue;
1672 req = container_of(i, struct drbd_request, i);
1673 if (req->rq_state & RQ_LOCAL_PENDING ||
1674 !(req->rq_state & RQ_POSTPONED))
1675 continue;
1676 if (expect(list_empty(&req->w.list))) {
1677 req->w.mdev = mdev;
1678 req->w.cb = w_restart_write;
1679 drbd_queue_work(&mdev->tconn->data.work, &req->w);
1680 }
1681 }
1682 }
1683
1684 /* e_end_block() is called via drbd_process_done_ee().
1685 * this means this function only runs in the asender thread
1686 */
1687 static int e_end_block(struct drbd_work *w, int cancel)
1688 {
1689 struct drbd_peer_request *peer_req =
1690 container_of(w, struct drbd_peer_request, w);
1691 struct drbd_conf *mdev = w->mdev;
1692 sector_t sector = peer_req->i.sector;
1693 int err = 0, pcmd;
1694
1695 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
1696 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1697 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1698 mdev->state.conn <= C_PAUSED_SYNC_T &&
1699 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1700 P_RS_WRITE_ACK : P_WRITE_ACK;
1701 err = drbd_send_ack(mdev, pcmd, peer_req);
1702 if (pcmd == P_RS_WRITE_ACK)
1703 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1704 } else {
1705 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1706 /* we expect it to be marked out of sync anyways...
1707 * maybe assert this? */
1708 }
1709 dec_unacked(mdev);
1710 }
1711 /* we delete from the conflict detection hash _after_ we sent out the
1712 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1713 if (mdev->tconn->net_conf->two_primaries) {
1714 spin_lock_irq(&mdev->tconn->req_lock);
1715 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1716 drbd_remove_epoch_entry_interval(mdev, peer_req);
1717 if (peer_req->flags & EE_RESTART_REQUESTS)
1718 restart_conflicting_writes(mdev, sector, peer_req->i.size);
1719 spin_unlock_irq(&mdev->tconn->req_lock);
1720 } else
1721 D_ASSERT(drbd_interval_empty(&peer_req->i));
1722
1723 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1724
1725 return err;
1726 }
1727
1728 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1729 {
1730 struct drbd_conf *mdev = w->mdev;
1731 struct drbd_peer_request *peer_req =
1732 container_of(w, struct drbd_peer_request, w);
1733 int err;
1734
1735 err = drbd_send_ack(mdev, ack, peer_req);
1736 dec_unacked(mdev);
1737
1738 return err;
1739 }
1740
1741 static int e_send_discard_write(struct drbd_work *w, int unused)
1742 {
1743 return e_send_ack(w, P_DISCARD_WRITE);
1744 }
1745
1746 static int e_send_retry_write(struct drbd_work *w, int unused)
1747 {
1748 struct drbd_tconn *tconn = w->mdev->tconn;
1749
1750 return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1751 P_RETRY_WRITE : P_DISCARD_WRITE);
1752 }
1753
1754 static bool seq_greater(u32 a, u32 b)
1755 {
1756 /*
1757 * We assume 32-bit wrap-around here.
1758 * For 24-bit wrap-around, we would have to shift:
1759 * a <<= 8; b <<= 8;
1760 */
1761 return (s32)a - (s32)b > 0;
1762 }
1763
1764 static u32 seq_max(u32 a, u32 b)
1765 {
1766 return seq_greater(a, b) ? a : b;
1767 }
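/*
 * Worked example (not part of the original source): seq_greater(1, 0xffffffff)
 * is true, because (s32)1 - (s32)0xffffffff == 1 - (-1) == 2 > 0. The signed
 * subtraction is what keeps the comparison correct across the 32-bit
 * wrap-around described above.
 */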
1768
1769 static bool need_peer_seq(struct drbd_conf *mdev)
1770 {
1771 struct drbd_tconn *tconn = mdev->tconn;
1772
1773 /*
1774 * We only need to keep track of the last packet_seq number of our peer
1775 * if we are in dual-primary mode and we have the discard flag set; see
1776 * handle_write_conflicts().
1777 */
1778 return tconn->net_conf->two_primaries &&
1779 test_bit(DISCARD_CONCURRENT, &tconn->flags);
1780 }
1781
1782 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1783 {
1784 unsigned int newest_peer_seq;
1785
1786 if (need_peer_seq(mdev)) {
1787 spin_lock(&mdev->peer_seq_lock);
1788 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1789 mdev->peer_seq = newest_peer_seq;
1790 spin_unlock(&mdev->peer_seq_lock);
1791 /* wake up only if we actually changed mdev->peer_seq */
1792 if (peer_seq == newest_peer_seq)
1793 wake_up(&mdev->seq_wait);
1794 }
1795 }
1796
1797 /* Called from receive_Data.
1798 * Synchronize packets on sock with packets on msock.
1799 *
1800 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1801 * packet traveling on msock, they are still processed in the order they have
1802 * been sent.
1803 *
1804 * Note: we don't care for Ack packets overtaking P_DATA packets.
1805 *
1806 * In case packet_seq is larger than mdev->peer_seq number, there are
1807 * outstanding packets on the msock. We wait for them to arrive.
1808 * In case we are the logically next packet, we update mdev->peer_seq
1809 * ourselves. Correctly handles 32bit wrap around.
1810 *
1811 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1812 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1813 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1814 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1815 *
1816 * returns 0 if we may process the packet,
1817 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1818 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1819 {
1820 DEFINE_WAIT(wait);
1821 long timeout;
1822 int ret;
1823
1824 if (!need_peer_seq(mdev))
1825 return 0;
1826
1827 spin_lock(&mdev->peer_seq_lock);
1828 for (;;) {
1829 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1830 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1831 ret = 0;
1832 break;
1833 }
1834 if (signal_pending(current)) {
1835 ret = -ERESTARTSYS;
1836 break;
1837 }
1838 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1839 spin_unlock(&mdev->peer_seq_lock);
1840 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1841 timeout = schedule_timeout(timeout);
1842 spin_lock(&mdev->peer_seq_lock);
1843 if (!timeout) {
1844 ret = -ETIMEDOUT;
1845 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1846 break;
1847 }
1848 }
1849 spin_unlock(&mdev->peer_seq_lock);
1850 finish_wait(&mdev->seq_wait, &wait);
1851 return ret;
1852 }
1853
1854 /* see also bio_flags_to_wire()
1855 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1856 * flags and back. We may replicate to other kernel versions. */
1857 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1858 {
1859 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1860 (dpf & DP_FUA ? REQ_FUA : 0) |
1861 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1862 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1863 }
1864
1865 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1866 unsigned int size)
1867 {
1868 struct drbd_interval *i;
1869
1870 repeat:
1871 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1872 struct drbd_request *req;
1873 struct bio_and_error m;
1874
1875 if (!i->local)
1876 continue;
1877 req = container_of(i, struct drbd_request, i);
1878 if (!(req->rq_state & RQ_POSTPONED))
1879 continue;
1880 req->rq_state &= ~RQ_POSTPONED;
1881 __req_mod(req, NEG_ACKED, &m);
1882 spin_unlock_irq(&mdev->tconn->req_lock);
1883 if (m.bio)
1884 complete_master_bio(mdev, &m);
1885 spin_lock_irq(&mdev->tconn->req_lock);
1886 goto repeat;
1887 }
1888 }
1889
1890 static int handle_write_conflicts(struct drbd_conf *mdev,
1891 struct drbd_peer_request *peer_req)
1892 {
1893 struct drbd_tconn *tconn = mdev->tconn;
1894 bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1895 sector_t sector = peer_req->i.sector;
1896 const unsigned int size = peer_req->i.size;
1897 struct drbd_interval *i;
1898 bool equal;
1899 int err;
1900
1901 /*
1902 * Inserting the peer request into the write_requests tree will prevent
1903 * new conflicting local requests from being added.
1904 */
1905 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1906
1907 repeat:
1908 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1909 if (i == &peer_req->i)
1910 continue;
1911
1912 if (!i->local) {
1913 /*
1914 * Our peer has sent a conflicting remote request; this
1915 * should not happen in a two-node setup. Wait for the
1916 * earlier peer request to complete.
1917 */
1918 err = drbd_wait_misc(mdev, i);
1919 if (err)
1920 goto out;
1921 goto repeat;
1922 }
1923
1924 equal = i->sector == sector && i->size == size;
1925 if (resolve_conflicts) {
1926 /*
1927 * If the peer request is fully contained within the
1928 * overlapping request, it can be discarded; otherwise,
1929 * it will be retried once all overlapping requests
1930 * have completed.
1931 */
1932 bool discard = i->sector <= sector && i->sector +
1933 (i->size >> 9) >= sector + (size >> 9);
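/* Worked example (illustrative numbers): a local write at sector 8 of
 * 8192 bytes (16 sectors) fully contains a peer write at sector 12 of
 * 2048 bytes (4 sectors): 8 <= 12 and 8 + 16 >= 12 + 4, so discard is
 * true and the peer request can be dropped. */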
1934
1935 if (!equal)
1936 dev_alert(DEV, "Concurrent writes detected: "
1937 "local=%llus +%u, remote=%llus +%u, "
1938 "assuming %s came first\n",
1939 (unsigned long long)i->sector, i->size,
1940 (unsigned long long)sector, size,
1941 discard ? "local" : "remote");
1942
1943 inc_unacked(mdev);
1944 peer_req->w.cb = discard ? e_send_discard_write :
1945 e_send_retry_write;
1946 list_add_tail(&peer_req->w.list, &mdev->done_ee);
1947 wake_asender(mdev->tconn);
1948
1949 err = -ENOENT;
1950 goto out;
1951 } else {
1952 struct drbd_request *req =
1953 container_of(i, struct drbd_request, i);
1954
1955 if (!equal)
1956 dev_alert(DEV, "Concurrent writes detected: "
1957 "local=%llus +%u, remote=%llus +%u\n",
1958 (unsigned long long)i->sector, i->size,
1959 (unsigned long long)sector, size);
1960
1961 if (req->rq_state & RQ_LOCAL_PENDING ||
1962 !(req->rq_state & RQ_POSTPONED)) {
1963 /*
1964 * Wait for the node with the discard flag to
1965 * decide if this request will be discarded or
1966 * retried. Requests that are discarded will
1967 * disappear from the write_requests tree.
1968 *
1969 * In addition, wait for the conflicting
1970 * request to finish locally before submitting
1971 * the conflicting peer request.
1972 */
1973 err = drbd_wait_misc(mdev, &req->i);
1974 if (err) {
1975 _conn_request_state(mdev->tconn,
1976 NS(conn, C_TIMEOUT),
1977 CS_HARD);
1978 fail_postponed_requests(mdev, sector, size);
1979 goto out;
1980 }
1981 goto repeat;
1982 }
1983 /*
1984 * Remember to restart the conflicting requests after
1985 * the new peer request has completed.
1986 */
1987 peer_req->flags |= EE_RESTART_REQUESTS;
1988 }
1989 }
1990 err = 0;
1991
1992 out:
1993 if (err)
1994 drbd_remove_epoch_entry_interval(mdev, peer_req);
1995 return err;
1996 }
1997
1998 /* mirrored write */
1999 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2000 {
2001 struct drbd_conf *mdev;
2002 sector_t sector;
2003 struct drbd_peer_request *peer_req;
2004 struct p_data *p = pi->data;
2005 u32 peer_seq = be32_to_cpu(p->seq_num);
2006 int rw = WRITE;
2007 u32 dp_flags;
2008 int err;
2009
2010 mdev = vnr_to_mdev(tconn, pi->vnr);
2011 if (!mdev)
2012 return -EIO;
2013
2014 if (!get_ldev(mdev)) {
2015 int err2;
2016
2017 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2018 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2019 atomic_inc(&mdev->current_epoch->epoch_size);
2020 err2 = drbd_drain_block(mdev, pi->size);
2021 if (!err)
2022 err = err2;
2023 return err;
2024 }
2025
2026 /*
2027 * Corresponding put_ldev done either below (on various errors), or in
2028 * drbd_peer_request_endio, if we successfully submit the data at the
2029 * end of this function.
2030 */
2031
2032 sector = be64_to_cpu(p->sector);
2033 peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2034 if (!peer_req) {
2035 put_ldev(mdev);
2036 return -EIO;
2037 }
2038
2039 peer_req->w.cb = e_end_block;
2040
2041 dp_flags = be32_to_cpu(p->dp_flags);
2042 rw |= wire_flags_to_bio(mdev, dp_flags);
2043
2044 if (dp_flags & DP_MAY_SET_IN_SYNC)
2045 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2046
2047 spin_lock(&mdev->epoch_lock);
2048 peer_req->epoch = mdev->current_epoch;
2049 atomic_inc(&peer_req->epoch->epoch_size);
2050 atomic_inc(&peer_req->epoch->active);
2051 spin_unlock(&mdev->epoch_lock);
2052
2053 if (mdev->tconn->net_conf->two_primaries) {
2054 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2055 if (err)
2056 goto out_interrupted;
2057 spin_lock_irq(&mdev->tconn->req_lock);
2058 err = handle_write_conflicts(mdev, peer_req);
2059 if (err) {
2060 spin_unlock_irq(&mdev->tconn->req_lock);
2061 if (err == -ENOENT) {
2062 put_ldev(mdev);
2063 return 0;
2064 }
2065 goto out_interrupted;
2066 }
2067 } else
2068 spin_lock_irq(&mdev->tconn->req_lock);
2069 list_add(&peer_req->w.list, &mdev->active_ee);
2070 spin_unlock_irq(&mdev->tconn->req_lock);
2071
2072 switch (mdev->tconn->net_conf->wire_protocol) {
2073 case DRBD_PROT_C:
2074 inc_unacked(mdev);
2075 /* corresponding dec_unacked() in e_end_block(),
2076 * or in _drbd_clear_done_ee(), respectively */
2077 break;
2078 case DRBD_PROT_B:
2079 /* I really don't like it that the receiver thread
2080 * sends on the msock, but anyways */
2081 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2082 break;
2083 case DRBD_PROT_A:
2084 /* nothing to do */
2085 break;
2086 }
2087
2088 if (mdev->state.pdsk < D_INCONSISTENT) {
2089 /* In case we have the only disk of the cluster, mark the block out of
 * sync and cover this write with an activity log transaction. */
2090 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2091 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2092 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2093 drbd_al_begin_io(mdev, &peer_req->i);
2094 }
2095
2096 err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2097 if (!err)
2098 return 0;
2099
2100 /* don't care for the reason here */
2101 dev_err(DEV, "submit failed, triggering re-connect\n");
2102 spin_lock_irq(&mdev->tconn->req_lock);
2103 list_del(&peer_req->w.list);
2104 drbd_remove_epoch_entry_interval(mdev, peer_req);
2105 spin_unlock_irq(&mdev->tconn->req_lock);
2106 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2107 drbd_al_complete_io(mdev, &peer_req->i);
2108
2109 out_interrupted:
2110 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2111 put_ldev(mdev);
2112 drbd_free_peer_req(mdev, peer_req);
2113 return err;
2114 }
2115
2116 /* We may throttle resync, if the lower device seems to be busy,
2117 * and current sync rate is above c_min_rate.
2118 *
2119 * To decide whether or not the lower device is busy, we use a scheme similar
2120 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2121 * (more than 64 sectors) of activity we cannot account for with our own resync
2122 * activity, it obviously is "busy".
2123 *
2124 * The current sync rate used here uses only the most recent two step marks,
2125 * to have a short time average so we can react faster.
2126 */
2127 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2128 {
2129 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2130 unsigned long db, dt, dbdt;
2131 struct lc_element *tmp;
2132 int curr_events;
2133 int throttle = 0;
2134
2135 /* feature disabled? */
2136 if (mdev->ldev->dc.c_min_rate == 0)
2137 return 0;
2138
2139 spin_lock_irq(&mdev->al_lock);
2140 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2141 if (tmp) {
2142 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2143 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2144 spin_unlock_irq(&mdev->al_lock);
2145 return 0;
2146 }
2147 /* Do not slow down if app IO is already waiting for this extent */
2148 }
2149 spin_unlock_irq(&mdev->al_lock);
2150
2151 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2152 (int)part_stat_read(&disk->part0, sectors[1]) -
2153 atomic_read(&mdev->rs_sect_ev);
2154
2155 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2156 unsigned long rs_left;
2157 int i;
2158
2159 mdev->rs_last_events = curr_events;
2160
2161 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2162 * approx. */
2163 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2164
2165 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2166 rs_left = mdev->ov_left;
2167 else
2168 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2169
2170 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2171 if (!dt)
2172 dt++;
2173 db = mdev->rs_mark_left[i] - rs_left;
2174 dbdt = Bit2KB(db/dt);
2175
2176 if (dbdt > mdev->ldev->dc.c_min_rate)
2177 throttle = 1;
2178 }
2179 return throttle;
2180 }
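/* Rough numeric sketch (illustrative, not normative), assuming the usual
 * 4 KiB of data per bitmap bit: if the two marks show db = 6000 bits
 * cleared over dt = 6 seconds, then dbdt = Bit2KB(6000 / 6) = 4000 KiB/s;
 * with c_min_rate = 250 KiB/s that exceeds the floor, so, provided the
 * backing device also showed unaccounted activity above, we return 1 and
 * the caller throttles the resync request for a short while. */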
2181
2182
2183 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2184 {
2185 struct drbd_conf *mdev;
2186 sector_t sector;
2187 sector_t capacity;
2188 struct drbd_peer_request *peer_req;
2189 struct digest_info *di = NULL;
2190 int size, verb;
2191 unsigned int fault_type;
2192 struct p_block_req *p = pi->data;
2193
2194 mdev = vnr_to_mdev(tconn, pi->vnr);
2195 if (!mdev)
2196 return -EIO;
2197 capacity = drbd_get_capacity(mdev->this_bdev);
2198
2199 sector = be64_to_cpu(p->sector);
2200 size = be32_to_cpu(p->blksize);
2201
2202 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2203 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2204 (unsigned long long)sector, size);
2205 return -EINVAL;
2206 }
2207 if (sector + (size>>9) > capacity) {
2208 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2209 (unsigned long long)sector, size);
2210 return -EINVAL;
2211 }
2212
2213 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2214 verb = 1;
2215 switch (pi->cmd) {
2216 case P_DATA_REQUEST:
2217 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2218 break;
2219 case P_RS_DATA_REQUEST:
2220 case P_CSUM_RS_REQUEST:
2221 case P_OV_REQUEST:
2222 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2223 break;
2224 case P_OV_REPLY:
2225 verb = 0;
2226 dec_rs_pending(mdev);
2227 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2228 break;
2229 default:
2230 BUG();
2231 }
2232 if (verb && __ratelimit(&drbd_ratelimit_state))
2233 dev_err(DEV, "Can not satisfy peer's read request, "
2234 "no local data.\n");
2235
2236 /* drain the payload, if any */
2237 return drbd_drain_block(mdev, pi->size);
2238 }
2239
2240 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2241 * "criss-cross" setup, that might cause write-out on some other DRBD,
2242 * which in turn might block on the other node at this very place. */
2243 peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2244 if (!peer_req) {
2245 put_ldev(mdev);
2246 return -ENOMEM;
2247 }
2248
2249 switch (pi->cmd) {
2250 case P_DATA_REQUEST:
2251 peer_req->w.cb = w_e_end_data_req;
2252 fault_type = DRBD_FAULT_DT_RD;
2253 /* application IO, don't drbd_rs_begin_io */
2254 goto submit;
2255
2256 case P_RS_DATA_REQUEST:
2257 peer_req->w.cb = w_e_end_rsdata_req;
2258 fault_type = DRBD_FAULT_RS_RD;
2259 /* used in the sector offset progress display */
2260 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2261 break;
2262
2263 case P_OV_REPLY:
2264 case P_CSUM_RS_REQUEST:
2265 fault_type = DRBD_FAULT_RS_RD;
2266 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2267 if (!di)
2268 goto out_free_e;
2269
2270 di->digest_size = pi->size;
2271 di->digest = (((char *)di)+sizeof(struct digest_info));
2272
2273 peer_req->digest = di;
2274 peer_req->flags |= EE_HAS_DIGEST;
2275
2276 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2277 goto out_free_e;
2278
2279 if (pi->cmd == P_CSUM_RS_REQUEST) {
2280 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2281 peer_req->w.cb = w_e_end_csum_rs_req;
2282 /* used in the sector offset progress display */
2283 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2284 } else if (pi->cmd == P_OV_REPLY) {
2285 /* track progress, we may need to throttle */
2286 atomic_add(size >> 9, &mdev->rs_sect_in);
2287 peer_req->w.cb = w_e_end_ov_reply;
2288 dec_rs_pending(mdev);
2289 /* drbd_rs_begin_io done when we sent this request,
2290 * but accounting still needs to be done. */
2291 goto submit_for_resync;
2292 }
2293 break;
2294
2295 case P_OV_REQUEST:
2296 if (mdev->ov_start_sector == ~(sector_t)0 &&
2297 mdev->tconn->agreed_pro_version >= 90) {
2298 unsigned long now = jiffies;
2299 int i;
2300 mdev->ov_start_sector = sector;
2301 mdev->ov_position = sector;
2302 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2303 mdev->rs_total = mdev->ov_left;
2304 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2305 mdev->rs_mark_left[i] = mdev->ov_left;
2306 mdev->rs_mark_time[i] = now;
2307 }
2308 dev_info(DEV, "Online Verify start sector: %llu\n",
2309 (unsigned long long)sector);
2310 }
2311 peer_req->w.cb = w_e_end_ov_req;
2312 fault_type = DRBD_FAULT_RS_RD;
2313 break;
2314
2315 default:
2316 BUG();
2317 }
2318
2319 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2320 * wrt the receiver, but it is not as straightforward as it may seem.
2321 * Various places in the resync start and stop logic assume resync
2322 * requests are processed in order, requeuing this on the worker thread
2323 * introduces a bunch of new code for synchronization between threads.
2324 *
2325 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2326 * "forever", throttling after drbd_rs_begin_io will lock that extent
2327 * for application writes for the same time. For now, just throttle
2328 * here, where the rest of the code expects the receiver to sleep for
2329 * a while, anyways.
2330 */
2331
2332 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2333 * this defers syncer requests for some time, before letting at least
2334 * one request through. The resync controller on the receiving side
2335 * will adapt to the incoming rate accordingly.
2336 *
2337 * We cannot throttle here if remote is Primary/SyncTarget:
2338 * we would also throttle its application reads.
2339 * In that case, throttling is done on the SyncTarget only.
2340 */
2341 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2342 schedule_timeout_uninterruptible(HZ/10);
2343 if (drbd_rs_begin_io(mdev, sector))
2344 goto out_free_e;
2345
2346 submit_for_resync:
2347 atomic_add(size >> 9, &mdev->rs_sect_ev);
2348
2349 submit:
2350 inc_unacked(mdev);
2351 spin_lock_irq(&mdev->tconn->req_lock);
2352 list_add_tail(&peer_req->w.list, &mdev->read_ee);
2353 spin_unlock_irq(&mdev->tconn->req_lock);
2354
2355 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2356 return 0;
2357
2358 /* don't care for the reason here */
2359 dev_err(DEV, "submit failed, triggering re-connect\n");
2360 spin_lock_irq(&mdev->tconn->req_lock);
2361 list_del(&peer_req->w.list);
2362 spin_unlock_irq(&mdev->tconn->req_lock);
2363 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2364
2365 out_free_e:
2366 put_ldev(mdev);
2367 drbd_free_peer_req(mdev, peer_req);
2368 return -EIO;
2369 }
2370
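/* Note on the after-split-brain recovery helpers below (_0p/_1p/_2p):
 * the shared return convention, as consumed by drbd_sync_handshake(), is
 *    1  -> discard the peer's data (this node becomes sync source),
 *   -1  -> discard our own data   (this node becomes sync target),
 *  -100 -> no automatic decision could be made. */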
2371 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2372 {
2373 int self, peer, rv = -100;
2374 unsigned long ch_self, ch_peer;
2375
2376 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2377 peer = mdev->p_uuid[UI_BITMAP] & 1;
2378
2379 ch_peer = mdev->p_uuid[UI_SIZE];
2380 ch_self = mdev->comm_bm_set;
2381
2382 switch (mdev->tconn->net_conf->after_sb_0p) {
2383 case ASB_CONSENSUS:
2384 case ASB_DISCARD_SECONDARY:
2385 case ASB_CALL_HELPER:
2386 dev_err(DEV, "Configuration error.\n");
2387 break;
2388 case ASB_DISCONNECT:
2389 break;
2390 case ASB_DISCARD_YOUNGER_PRI:
2391 if (self == 0 && peer == 1) {
2392 rv = -1;
2393 break;
2394 }
2395 if (self == 1 && peer == 0) {
2396 rv = 1;
2397 break;
2398 }
2399 /* Else fall through to one of the other strategies... */
2400 case ASB_DISCARD_OLDER_PRI:
2401 if (self == 0 && peer == 1) {
2402 rv = 1;
2403 break;
2404 }
2405 if (self == 1 && peer == 0) {
2406 rv = -1;
2407 break;
2408 }
2409 /* Else fall through to one of the other strategies... */
2410 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2411 "Using discard-least-changes instead\n");
2412 case ASB_DISCARD_ZERO_CHG:
2413 if (ch_peer == 0 && ch_self == 0) {
2414 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2415 ? -1 : 1;
2416 break;
2417 } else {
2418 if (ch_peer == 0) { rv = 1; break; }
2419 if (ch_self == 0) { rv = -1; break; }
2420 }
2421 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2422 break;
2423 case ASB_DISCARD_LEAST_CHG:
2424 if (ch_self < ch_peer)
2425 rv = -1;
2426 else if (ch_self > ch_peer)
2427 rv = 1;
2428 else /* ( ch_self == ch_peer ) */
2429 /* Well, then use something else. */
2430 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2431 ? -1 : 1;
2432 break;
2433 case ASB_DISCARD_LOCAL:
2434 rv = -1;
2435 break;
2436 case ASB_DISCARD_REMOTE:
2437 rv = 1;
2438 }
2439
2440 return rv;
2441 }
2442
2443 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2444 {
2445 int hg, rv = -100;
2446
2447 switch (mdev->tconn->net_conf->after_sb_1p) {
2448 case ASB_DISCARD_YOUNGER_PRI:
2449 case ASB_DISCARD_OLDER_PRI:
2450 case ASB_DISCARD_LEAST_CHG:
2451 case ASB_DISCARD_LOCAL:
2452 case ASB_DISCARD_REMOTE:
2453 dev_err(DEV, "Configuration error.\n");
2454 break;
2455 case ASB_DISCONNECT:
2456 break;
2457 case ASB_CONSENSUS:
2458 hg = drbd_asb_recover_0p(mdev);
2459 if (hg == -1 && mdev->state.role == R_SECONDARY)
2460 rv = hg;
2461 if (hg == 1 && mdev->state.role == R_PRIMARY)
2462 rv = hg;
2463 break;
2464 case ASB_VIOLENTLY:
2465 rv = drbd_asb_recover_0p(mdev);
2466 break;
2467 case ASB_DISCARD_SECONDARY:
2468 return mdev->state.role == R_PRIMARY ? 1 : -1;
2469 case ASB_CALL_HELPER:
2470 hg = drbd_asb_recover_0p(mdev);
2471 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2472 enum drbd_state_rv rv2;
2473
2474 drbd_set_role(mdev, R_SECONDARY, 0);
2475 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2476 * we might be here in C_WF_REPORT_PARAMS which is transient.
2477 * we do not need to wait for the after state change work either. */
2478 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2479 if (rv2 != SS_SUCCESS) {
2480 drbd_khelper(mdev, "pri-lost-after-sb");
2481 } else {
2482 dev_warn(DEV, "Successfully gave up primary role.\n");
2483 rv = hg;
2484 }
2485 } else
2486 rv = hg;
2487 }
2488
2489 return rv;
2490 }
2491
2492 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2493 {
2494 int hg, rv = -100;
2495
2496 switch (mdev->tconn->net_conf->after_sb_2p) {
2497 case ASB_DISCARD_YOUNGER_PRI:
2498 case ASB_DISCARD_OLDER_PRI:
2499 case ASB_DISCARD_LEAST_CHG:
2500 case ASB_DISCARD_LOCAL:
2501 case ASB_DISCARD_REMOTE:
2502 case ASB_CONSENSUS:
2503 case ASB_DISCARD_SECONDARY:
2504 dev_err(DEV, "Configuration error.\n");
2505 break;
2506 case ASB_VIOLENTLY:
2507 rv = drbd_asb_recover_0p(mdev);
2508 break;
2509 case ASB_DISCONNECT:
2510 break;
2511 case ASB_CALL_HELPER:
2512 hg = drbd_asb_recover_0p(mdev);
2513 if (hg == -1) {
2514 enum drbd_state_rv rv2;
2515
2516 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2517 * we might be here in C_WF_REPORT_PARAMS which is transient.
2518 * we do not need to wait for the after state change work either. */
2519 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2520 if (rv2 != SS_SUCCESS) {
2521 drbd_khelper(mdev, "pri-lost-after-sb");
2522 } else {
2523 dev_warn(DEV, "Successfully gave up primary role.\n");
2524 rv = hg;
2525 }
2526 } else
2527 rv = hg;
2528 }
2529
2530 return rv;
2531 }
2532
2533 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2534 u64 bits, u64 flags)
2535 {
2536 if (!uuid) {
2537 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2538 return;
2539 }
2540 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2541 text,
2542 (unsigned long long)uuid[UI_CURRENT],
2543 (unsigned long long)uuid[UI_BITMAP],
2544 (unsigned long long)uuid[UI_HISTORY_START],
2545 (unsigned long long)uuid[UI_HISTORY_END],
2546 (unsigned long long)bits,
2547 (unsigned long long)flags);
2548 }
2549
2550 /*
2551 100 after split brain try auto recover
2552 2 C_SYNC_SOURCE set BitMap
2553 1 C_SYNC_SOURCE use BitMap
2554 0 no Sync
2555 -1 C_SYNC_TARGET use BitMap
2556 -2 C_SYNC_TARGET set BitMap
2557 -100 after split brain, disconnect
2558 -1000 unrelated data
2559 -1091 requires proto 91
2560 -1096 requires proto 96
2561 */
2562 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2563 {
2564 u64 self, peer;
2565 int i, j;
2566
2567 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2568 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2569
2570 *rule_nr = 10;
2571 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2572 return 0;
2573
2574 *rule_nr = 20;
2575 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2576 peer != UUID_JUST_CREATED)
2577 return -2;
2578
2579 *rule_nr = 30;
2580 if (self != UUID_JUST_CREATED &&
2581 (peer == UUID_JUST_CREATED || peer == (u64)0))
2582 return 2;
2583
2584 if (self == peer) {
2585 int rct, dc; /* roles at crash time */
2586
2587 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2588
2589 if (mdev->tconn->agreed_pro_version < 91)
2590 return -1091;
2591
2592 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2593 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2594 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2595 drbd_uuid_set_bm(mdev, 0UL);
2596
2597 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2598 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2599 *rule_nr = 34;
2600 } else {
2601 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2602 *rule_nr = 36;
2603 }
2604
2605 return 1;
2606 }
2607
2608 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2609
2610 if (mdev->tconn->agreed_pro_version < 91)
2611 return -1091;
2612
2613 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2614 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2615 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2616
2617 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2618 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2619 mdev->p_uuid[UI_BITMAP] = 0UL;
2620
2621 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2622 *rule_nr = 35;
2623 } else {
2624 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2625 *rule_nr = 37;
2626 }
2627
2628 return -1;
2629 }
2630
2631 /* Common power [off|failure] */
2632 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2633 (mdev->p_uuid[UI_FLAGS] & 2);
2634 /* lowest bit is set when we were primary,
2635 * next bit (weight 2) is set when peer was primary */
2636 *rule_nr = 40;
2637
2638 switch (rct) {
2639 case 0: /* !self_pri && !peer_pri */ return 0;
2640 case 1: /* self_pri && !peer_pri */ return 1;
2641 case 2: /* !self_pri && peer_pri */ return -1;
2642 case 3: /* self_pri && peer_pri */
2643 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2644 return dc ? -1 : 1;
2645 }
2646 }
2647
2648 *rule_nr = 50;
2649 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2650 if (self == peer)
2651 return -1;
2652
2653 *rule_nr = 51;
2654 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2655 if (self == peer) {
2656 if (mdev->tconn->agreed_pro_version < 96 ?
2657 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2658 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2659 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2660 /* The last P_SYNC_UUID did not get through. Undo the modifications
2661 the peer made to its UUIDs when it last started a resync as sync source. */
2662
2663 if (mdev->tconn->agreed_pro_version < 91)
2664 return -1091;
2665
2666 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2667 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2668
2669 dev_info(DEV, "Did not get the last syncUUID packet, corrected:\n");
2670 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2671
2672 return -1;
2673 }
2674 }
2675
2676 *rule_nr = 60;
2677 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2678 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2679 peer = mdev->p_uuid[i] & ~((u64)1);
2680 if (self == peer)
2681 return -2;
2682 }
2683
2684 *rule_nr = 70;
2685 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2686 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2687 if (self == peer)
2688 return 1;
2689
2690 *rule_nr = 71;
2691 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2692 if (self == peer) {
2693 if (mdev->tconn->agreed_pro_version < 96 ?
2694 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2695 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2696 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2697 /* The last P_SYNC_UUID did not get through. Undo the modifications
2698 we made to our own UUIDs when we last started a resync as sync source. */
2699
2700 if (mdev->tconn->agreed_pro_version < 91)
2701 return -1091;
2702
2703 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2704 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2705
2706 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2707 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2708 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2709
2710 return 1;
2711 }
2712 }
2713
2714
2715 *rule_nr = 80;
2716 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2717 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2718 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2719 if (self == peer)
2720 return 2;
2721 }
2722
2723 *rule_nr = 90;
2724 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2725 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2726 if (self == peer && self != ((u64)0))
2727 return 100;
2728
2729 *rule_nr = 100;
2730 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2731 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2732 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2733 peer = mdev->p_uuid[j] & ~((u64)1);
2734 if (self == peer)
2735 return -100;
2736 }
2737 }
2738
2739 return -1000;
2740 }
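/* Example: after a completed resync both nodes carry the same current
 * UUID; self == peer takes the "common ancestor" branch, and with neither
 * crashed-primary bit set (rct == 0, rule 40) the result is 0, i.e. no
 * resync is necessary. */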
2741
2742 /* drbd_sync_handshake() returns the new conn state on success, or
2743 C_MASK on failure.
2744 */
2745 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2746 enum drbd_disk_state peer_disk) __must_hold(local)
2747 {
2748 int hg, rule_nr;
2749 enum drbd_conns rv = C_MASK;
2750 enum drbd_disk_state mydisk;
2751
2752 mydisk = mdev->state.disk;
2753 if (mydisk == D_NEGOTIATING)
2754 mydisk = mdev->new_state_tmp.disk;
2755
2756 dev_info(DEV, "drbd_sync_handshake:\n");
2757 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2758 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2759 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2760
2761 hg = drbd_uuid_compare(mdev, &rule_nr);
2762
2763 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2764
2765 if (hg == -1000) {
2766 dev_alert(DEV, "Unrelated data, aborting!\n");
2767 return C_MASK;
2768 }
2769 if (hg < -1000) {
2770 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2771 return C_MASK;
2772 }
2773
2774 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2775 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2776 int f = (hg == -100) || abs(hg) == 2;
2777 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2778 if (f)
2779 hg = hg*2;
2780 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2781 hg > 0 ? "source" : "target");
2782 }
2783
2784 if (abs(hg) == 100)
2785 drbd_khelper(mdev, "initial-split-brain");
2786
2787 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2788 int pcount = (mdev->state.role == R_PRIMARY)
2789 + (peer_role == R_PRIMARY);
2790 int forced = (hg == -100);
2791
2792 switch (pcount) {
2793 case 0:
2794 hg = drbd_asb_recover_0p(mdev);
2795 break;
2796 case 1:
2797 hg = drbd_asb_recover_1p(mdev);
2798 break;
2799 case 2:
2800 hg = drbd_asb_recover_2p(mdev);
2801 break;
2802 }
2803 if (abs(hg) < 100) {
2804 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2805 "automatically solved. Sync from %s node\n",
2806 pcount, (hg < 0) ? "peer" : "this");
2807 if (forced) {
2808 dev_warn(DEV, "Doing a full sync, since"
2809 " UUIDs where ambiguous.\n");
2810 hg = hg*2;
2811 }
2812 }
2813 }
2814
2815 if (hg == -100) {
2816 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2817 hg = -1;
2818 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2819 hg = 1;
2820
2821 if (abs(hg) < 100)
2822 dev_warn(DEV, "Split-Brain detected, manually solved. "
2823 "Sync from %s node\n",
2824 (hg < 0) ? "peer" : "this");
2825 }
2826
2827 if (hg == -100) {
2828 /* FIXME this log message is not correct if we end up here
2829 * after an attempted attach on a diskless node.
2830 * We just refuse to attach -- well, we drop the "connection"
2831 * to that disk, in a way... */
2832 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2833 drbd_khelper(mdev, "split-brain");
2834 return C_MASK;
2835 }
2836
2837 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2838 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2839 return C_MASK;
2840 }
2841
2842 if (hg < 0 && /* by intention we do not use mydisk here. */
2843 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2844 switch (mdev->tconn->net_conf->rr_conflict) {
2845 case ASB_CALL_HELPER:
2846 drbd_khelper(mdev, "pri-lost");
2847 /* fall through */
2848 case ASB_DISCONNECT:
2849 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2850 return C_MASK;
2851 case ASB_VIOLENTLY:
2852 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2853 "assumption\n");
2854 }
2855 }
2856
2857 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2858 if (hg == 0)
2859 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2860 else
2861 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2862 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2863 abs(hg) >= 2 ? "full" : "bit-map based");
2864 return C_MASK;
2865 }
2866
2867 if (abs(hg) >= 2) {
2868 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2869 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2870 BM_LOCKED_SET_ALLOWED))
2871 return C_MASK;
2872 }
2873
2874 if (hg > 0) { /* become sync source. */
2875 rv = C_WF_BITMAP_S;
2876 } else if (hg < 0) { /* become sync target */
2877 rv = C_WF_BITMAP_T;
2878 } else {
2879 rv = C_CONNECTED;
2880 if (drbd_bm_total_weight(mdev)) {
2881 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2882 drbd_bm_total_weight(mdev));
2883 }
2884 }
2885
2886 return rv;
2887 }
2888
2889 /* returns 1 if invalid */
2890 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2891 {
2892 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2893 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2894 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2895 return 0;
2896
2897 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2898 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2899 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2900 return 1;
2901
2902 /* everything else is valid if they are equal on both sides. */
2903 if (peer == self)
2904 return 0;
2905
2906 /* everything else is invalid. */
2907 return 1;
2908 }
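/* Example of the rule above: peer=discard-remote with self=discard-local
 * is accepted, since both sides agree on whose data is thrown away; both
 * sides configured as discard-remote would each try to overwrite the
 * other and are therefore rejected as invalid. */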
2909
2910 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2911 {
2912 struct p_protocol *p = pi->data;
2913 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2914 int p_want_lose, p_two_primaries, cf;
2915 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2916
2917 p_proto = be32_to_cpu(p->protocol);
2918 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2919 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2920 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2921 p_two_primaries = be32_to_cpu(p->two_primaries);
2922 cf = be32_to_cpu(p->conn_flags);
2923 p_want_lose = cf & CF_WANT_LOSE;
2924
2925 clear_bit(CONN_DRY_RUN, &tconn->flags);
2926
2927 if (cf & CF_DRY_RUN)
2928 set_bit(CONN_DRY_RUN, &tconn->flags);
2929
2930 if (p_proto != tconn->net_conf->wire_protocol) {
2931 conn_err(tconn, "incompatible communication protocols\n");
2932 goto disconnect;
2933 }
2934
2935 if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2936 conn_err(tconn, "incompatible after-sb-0pri settings\n");
2937 goto disconnect;
2938 }
2939
2940 if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2941 conn_err(tconn, "incompatible after-sb-1pri settings\n");
2942 goto disconnect;
2943 }
2944
2945 if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2946 conn_err(tconn, "incompatible after-sb-2pri settings\n");
2947 goto disconnect;
2948 }
2949
2950 if (p_want_lose && tconn->net_conf->want_lose) {
2951 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
2952 goto disconnect;
2953 }
2954
2955 if (p_two_primaries != tconn->net_conf->two_primaries) {
2956 conn_err(tconn, "incompatible setting of the two-primaries options\n");
2957 goto disconnect;
2958 }
2959
2960 if (tconn->agreed_pro_version >= 87) {
2961 unsigned char *my_alg = tconn->net_conf->integrity_alg;
2962 int err;
2963
2964 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
2965 if (err)
2966 return err;
2967
2968 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2969 if (strcmp(p_integrity_alg, my_alg)) {
2970 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
2971 goto disconnect;
2972 }
2973 conn_info(tconn, "data-integrity-alg: %s\n",
2974 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2975 }
2976
2977 return 0;
2978
2979 disconnect:
2980 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
2981 return -EIO;
2982 }
2983
2984 /* helper function
2985 * input: alg name, feature name
2986 * return: NULL (alg name was "")
2987 * ERR_PTR(error) if something goes wrong
2988 * or the crypto hash ptr, if it worked out ok. */
2989 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2990 const char *alg, const char *name)
2991 {
2992 struct crypto_hash *tfm;
2993
2994 if (!alg[0])
2995 return NULL;
2996
2997 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2998 if (IS_ERR(tfm)) {
2999 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3000 alg, name, PTR_ERR(tfm));
3001 return tfm;
3002 }
3003 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
3004 crypto_free_hash(tfm);
3005 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
3006 return ERR_PTR(-EINVAL);
3007 }
3008 return tfm;
3009 }
3010
3011 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3012 {
3013 void *buffer = tconn->data.rbuf;
3014 int size = pi->size;
3015
3016 while (size) {
3017 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3018 s = drbd_recv(tconn, buffer, s);
3019 if (s <= 0) {
3020 if (s < 0)
3021 return s;
3022 break;
3023 }
3024 size -= s;
3025 }
3026 if (size)
3027 return -EIO;
3028 return 0;
3029 }
3030
3031 /*
3032 * config_unknown_volume - device configuration command for unknown volume
3033 *
3034 * When a device is added to an existing connection, the node on which the
3035 * device is added first will send configuration commands to its peer but the
3036 * peer will not know about the device yet. It will warn and ignore these
3037 * commands. Once the device is added on the second node, the second node will
3038 * send the same device configuration commands, but in the other direction.
3039 *
3040 * (We can also end up here if drbd is misconfigured.)
3041 */
3042 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3043 {
3044 conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3045 pi->vnr, cmdname(pi->cmd));
3046 return ignore_remaining_packet(tconn, pi);
3047 }
3048
3049 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3050 {
3051 struct drbd_conf *mdev;
3052 struct p_rs_param_95 *p;
3053 unsigned int header_size, data_size, exp_max_sz;
3054 struct crypto_hash *verify_tfm = NULL;
3055 struct crypto_hash *csums_tfm = NULL;
3056 const int apv = tconn->agreed_pro_version;
3057 int *rs_plan_s = NULL;
3058 int fifo_size = 0;
3059 int err;
3060
3061 mdev = vnr_to_mdev(tconn, pi->vnr);
3062 if (!mdev)
3063 return config_unknown_volume(tconn, pi);
3064
3065 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3066 : apv == 88 ? sizeof(struct p_rs_param)
3067 + SHARED_SECRET_MAX
3068 : apv <= 94 ? sizeof(struct p_rs_param_89)
3069 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3070
3071 if (pi->size > exp_max_sz) {
3072 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3073 pi->size, exp_max_sz);
3074 return -EIO;
3075 }
3076
3077 if (apv <= 88) {
3078 header_size = sizeof(struct p_rs_param);
3079 data_size = pi->size - header_size;
3080 } else if (apv <= 94) {
3081 header_size = sizeof(struct p_rs_param_89);
3082 data_size = pi->size - header_size;
3083 D_ASSERT(data_size == 0);
3084 } else {
3085 header_size = sizeof(struct p_rs_param_95);
3086 data_size = pi->size - header_size;
3087 D_ASSERT(data_size == 0);
3088 }
3089
3090 /* initialize verify_alg and csums_alg */
3091 p = pi->data;
3092 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3093
3094 err = drbd_recv_all(mdev->tconn, p, header_size);
3095 if (err)
3096 return err;
3097
3098 if (get_ldev(mdev)) {
3099 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3100 put_ldev(mdev);
3101 }
3102
3103 if (apv >= 88) {
3104 if (apv == 88) {
3105 if (data_size > SHARED_SECRET_MAX) {
3106 dev_err(DEV, "verify-alg too long, "
3107 "peer wants %u, accepting only %u byte\n",
3108 data_size, SHARED_SECRET_MAX);
3109 return -EIO;
3110 }
3111
3112 err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3113 if (err)
3114 return err;
3115
3116 /* we expect NUL terminated string */
3117 /* but just in case someone tries to be evil */
3118 D_ASSERT(p->verify_alg[data_size-1] == 0);
3119 p->verify_alg[data_size-1] = 0;
3120
3121 } else /* apv >= 89 */ {
3122 /* we still expect NUL terminated strings */
3123 /* but just in case someone tries to be evil */
3124 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3125 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3126 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3127 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3128 }
3129
3130 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3131 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3132 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3133 mdev->tconn->net_conf->verify_alg, p->verify_alg);
3134 goto disconnect;
3135 }
3136 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3137 p->verify_alg, "verify-alg");
3138 if (IS_ERR(verify_tfm)) {
3139 verify_tfm = NULL;
3140 goto disconnect;
3141 }
3142 }
3143
3144 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3145 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3146 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3147 mdev->tconn->net_conf->csums_alg, p->csums_alg);
3148 goto disconnect;
3149 }
3150 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3151 p->csums_alg, "csums-alg");
3152 if (IS_ERR(csums_tfm)) {
3153 csums_tfm = NULL;
3154 goto disconnect;
3155 }
3156 }
3157
3158 if (apv > 94 && get_ldev(mdev)) {
3159 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3160 mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3161 mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3162 mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3163 mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3164
3165 fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3166 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3167 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3168 if (!rs_plan_s) {
3169 dev_err(DEV, "allocation of fifo_buffer failed\n");
3170 put_ldev(mdev);
3171 goto disconnect;
3172 }
3173 }
3174 put_ldev(mdev);
3175 }
3176
3177 spin_lock(&mdev->peer_seq_lock);
3178 /* lock against drbd_nl_syncer_conf() */
3179 if (verify_tfm) {
3180 strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3181 mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3182 crypto_free_hash(mdev->tconn->verify_tfm);
3183 mdev->tconn->verify_tfm = verify_tfm;
3184 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3185 }
3186 if (csums_tfm) {
3187 strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3188 mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3189 crypto_free_hash(mdev->tconn->csums_tfm);
3190 mdev->tconn->csums_tfm = csums_tfm;
3191 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3192 }
3193 if (fifo_size != mdev->rs_plan_s.size) {
3194 kfree(mdev->rs_plan_s.values);
3195 mdev->rs_plan_s.values = rs_plan_s;
3196 mdev->rs_plan_s.size = fifo_size;
3197 mdev->rs_planed = 0;
3198 }
3199 spin_unlock(&mdev->peer_seq_lock);
3200 }
3201 return 0;
3202
3203 disconnect:
3204 /* just for completeness: actually not needed,
3205 * as this is not reached if csums_tfm was ok. */
3206 crypto_free_hash(csums_tfm);
3207 /* but free the verify_tfm again, if csums_tfm did not work out */
3208 crypto_free_hash(verify_tfm);
3209 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3210 return -EIO;
3211 }
3212
3213 /* warn if the arguments differ by more than 12.5% */
3214 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3215 const char *s, sector_t a, sector_t b)
3216 {
3217 sector_t d;
3218 if (a == 0 || b == 0)
3219 return;
3220 d = (a > b) ? (a - b) : (b - a);
3221 if (d > (a>>3) || d > (b>>3))
3222 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3223 (unsigned long long)a, (unsigned long long)b);
3224 }
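/* e.g. (illustrative numbers) a = 1000 sectors vs b = 800 sectors:
 * d = 200 > (1000 >> 3) = 125, so the values differ by more than 12.5%
 * and the warning is emitted. */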
3225
3226 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3227 {
3228 struct drbd_conf *mdev;
3229 struct p_sizes *p = pi->data;
3230 enum determine_dev_size dd = unchanged;
3231 sector_t p_size, p_usize, my_usize;
3232 int ldsc = 0; /* local disk size changed */
3233 enum dds_flags ddsf;
3234
3235 mdev = vnr_to_mdev(tconn, pi->vnr);
3236 if (!mdev)
3237 return config_unknown_volume(tconn, pi);
3238
3239 p_size = be64_to_cpu(p->d_size);
3240 p_usize = be64_to_cpu(p->u_size);
3241
3242 /* just store the peer's disk size for now.
3243 * we still need to figure out whether we accept that. */
3244 mdev->p_size = p_size;
3245
3246 if (get_ldev(mdev)) {
3247 warn_if_differ_considerably(mdev, "lower level device sizes",
3248 p_size, drbd_get_max_capacity(mdev->ldev));
3249 warn_if_differ_considerably(mdev, "user requested size",
3250 p_usize, mdev->ldev->dc.disk_size);
3251
3252 /* if this is the first connect, or an otherwise expected
3253 * param exchange, choose the minimum */
3254 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3255 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3256 p_usize);
3257
3258 my_usize = mdev->ldev->dc.disk_size;
3259
3260 if (mdev->ldev->dc.disk_size != p_usize) {
3261 mdev->ldev->dc.disk_size = p_usize;
3262 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3263 (unsigned long)mdev->ldev->dc.disk_size);
3264 }
3265
3266 /* Never shrink a device with usable data during connect.
3267 But allow online shrinking if we are connected. */
3268 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3269 drbd_get_capacity(mdev->this_bdev) &&
3270 mdev->state.disk >= D_OUTDATED &&
3271 mdev->state.conn < C_CONNECTED) {
3272 dev_err(DEV, "The peer's disk size is too small!\n");
3273 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3274 mdev->ldev->dc.disk_size = my_usize;
3275 put_ldev(mdev);
3276 return -EIO;
3277 }
3278 put_ldev(mdev);
3279 }
3280
3281 ddsf = be16_to_cpu(p->dds_flags);
3282 if (get_ldev(mdev)) {
3283 dd = drbd_determine_dev_size(mdev, ddsf);
3284 put_ldev(mdev);
3285 if (dd == dev_size_error)
3286 return -EIO;
3287 drbd_md_sync(mdev);
3288 } else {
3289 /* I am diskless, need to accept the peer's size. */
3290 drbd_set_my_capacity(mdev, p_size);
3291 }
3292
3293 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3294 drbd_reconsider_max_bio_size(mdev);
3295
3296 if (get_ldev(mdev)) {
3297 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3298 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3299 ldsc = 1;
3300 }
3301
3302 put_ldev(mdev);
3303 }
3304
3305 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3306 if (be64_to_cpu(p->c_size) !=
3307 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3308 /* we have different sizes, probably peer
3309 * needs to know my new size... */
3310 drbd_send_sizes(mdev, 0, ddsf);
3311 }
3312 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3313 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3314 if (mdev->state.pdsk >= D_INCONSISTENT &&
3315 mdev->state.disk >= D_INCONSISTENT) {
3316 if (ddsf & DDSF_NO_RESYNC)
3317 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3318 else
3319 resync_after_online_grow(mdev);
3320 } else
3321 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3322 }
3323 }
3324
3325 return 0;
3326 }
3327
3328 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3329 {
3330 struct drbd_conf *mdev;
3331 struct p_uuids *p = pi->data;
3332 u64 *p_uuid;
3333 int i, updated_uuids = 0;
3334
3335 mdev = vnr_to_mdev(tconn, pi->vnr);
3336 if (!mdev)
3337 return config_unknown_volume(tconn, pi);
3338
3339 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
if (!p_uuid) {
/* tiny GFP_NOIO allocation; if it ever fails, ignore this UUID
 * packet rather than dereference NULL below */
dev_err(DEV, "kmalloc of p_uuid failed\n");
return false;
}
3340
3341 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3342 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3343
3344 kfree(mdev->p_uuid);
3345 mdev->p_uuid = p_uuid;
3346
3347 if (mdev->state.conn < C_CONNECTED &&
3348 mdev->state.disk < D_INCONSISTENT &&
3349 mdev->state.role == R_PRIMARY &&
3350 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3351 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3352 (unsigned long long)mdev->ed_uuid);
3353 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3354 return -EIO;
3355 }
3356
3357 if (get_ldev(mdev)) {
3358 int skip_initial_sync =
3359 mdev->state.conn == C_CONNECTED &&
3360 mdev->tconn->agreed_pro_version >= 90 &&
3361 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3362 (p_uuid[UI_FLAGS] & 8);
3363 if (skip_initial_sync) {
3364 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3365 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3366 "clear_n_write from receive_uuids",
3367 BM_LOCKED_TEST_ALLOWED);
3368 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3369 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3370 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3371 CS_VERBOSE, NULL);
3372 drbd_md_sync(mdev);
3373 updated_uuids = 1;
3374 }
3375 put_ldev(mdev);
3376 } else if (mdev->state.disk < D_INCONSISTENT &&
3377 mdev->state.role == R_PRIMARY) {
3378 /* I am a diskless primary, the peer just created a new current UUID
3379 for me. */
3380 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3381 }
3382
3383 /* Before we test the disk state, we should wait until any ongoing
3384 cluster-wide state change has finished. That is important if we are
3385 primary and are detaching from our disk: we need to see the
3386 new disk state... */
3387 mutex_lock(mdev->state_mutex);
3388 mutex_unlock(mdev->state_mutex);
3389 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3390 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3391
3392 if (updated_uuids)
3393 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3394
3395 return 0;
3396 }
3397
3398 /**
3399 * convert_state() - Converts the peer's view of the cluster state to our point of view
3400 * @ps: The state as seen by the peer.
3401 */
3402 static union drbd_state convert_state(union drbd_state ps)
3403 {
3404 union drbd_state ms;
3405
3406 static enum drbd_conns c_tab[] = {
3407 [C_CONNECTED] = C_CONNECTED,
3408
3409 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3410 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3411 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3412 [C_VERIFY_S] = C_VERIFY_T,
3413 [C_MASK] = C_MASK,
3414 };
3415
3416 ms.i = ps.i;
3417
3418 ms.conn = c_tab[ps.conn];
3419 ms.peer = ps.role;
3420 ms.role = ps.peer;
3421 ms.pdsk = ps.disk;
3422 ms.disk = ps.pdsk;
3423 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3424
3425 return ms;
3426 }
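/* Illustration: a peer reporting role=R_PRIMARY, peer=R_SECONDARY,
 * disk=D_UP_TO_DATE, pdsk=D_INCONSISTENT is seen from our side as
 * role=R_SECONDARY, peer=R_PRIMARY, disk=D_INCONSISTENT, pdsk=D_UP_TO_DATE;
 * connection states are mirrored via c_tab (e.g. C_STARTING_SYNC_S
 * becomes C_STARTING_SYNC_T). */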
3427
3428 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3429 {
3430 struct drbd_conf *mdev;
3431 struct p_req_state *p = pi->data;
3432 union drbd_state mask, val;
3433 enum drbd_state_rv rv;
3434
3435 mdev = vnr_to_mdev(tconn, pi->vnr);
3436 if (!mdev)
3437 return -EIO;
3438
3439 mask.i = be32_to_cpu(p->mask);
3440 val.i = be32_to_cpu(p->val);
3441
3442 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3443 mutex_is_locked(mdev->state_mutex)) {
3444 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3445 return 0;
3446 }
3447
3448 mask = convert_state(mask);
3449 val = convert_state(val);
3450
3451 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3452 drbd_send_sr_reply(mdev, rv);
3453
3454 drbd_md_sync(mdev);
3455
3456 return 0;
3457 }
3458
3459 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3460 {
3461 struct p_req_state *p = pi->data;
3462 union drbd_state mask, val;
3463 enum drbd_state_rv rv;
3464
3465 mask.i = be32_to_cpu(p->mask);
3466 val.i = be32_to_cpu(p->val);
3467
3468 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3469 mutex_is_locked(&tconn->cstate_mutex)) {
3470 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3471 return 0;
3472 }
3473
3474 mask = convert_state(mask);
3475 val = convert_state(val);
3476
3477 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3478 conn_send_sr_reply(tconn, rv);
3479
3480 return 0;
3481 }
3482
3483 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3484 {
3485 struct drbd_conf *mdev;
3486 struct p_state *p = pi->data;
3487 union drbd_state os, ns, peer_state;
3488 enum drbd_disk_state real_peer_disk;
3489 enum chg_state_flags cs_flags;
3490 int rv;
3491
3492 mdev = vnr_to_mdev(tconn, pi->vnr);
3493 if (!mdev)
3494 return config_unknown_volume(tconn, pi);
3495
3496 peer_state.i = be32_to_cpu(p->state);
3497
3498 real_peer_disk = peer_state.disk;
3499 if (peer_state.disk == D_NEGOTIATING) {
3500 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3501 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3502 }
3503
3504 spin_lock_irq(&mdev->tconn->req_lock);
3505 retry:
3506 os = ns = drbd_read_state(mdev);
3507 spin_unlock_irq(&mdev->tconn->req_lock);
3508
3509 /* peer says his disk is uptodate, while we think it is inconsistent,
3510 * and this happens while we think we have a sync going on. */
3511 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3512 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3513 /* If we are (becoming) SyncSource, but peer is still in sync
3514 * preparation, ignore its uptodate-ness to avoid flapping, it
3515 * will change to inconsistent once the peer reaches active
3516 * syncing states.
3517 * It may have changed syncer-paused flags, however, so we
3518 * cannot ignore this completely. */
3519 if (peer_state.conn > C_CONNECTED &&
3520 peer_state.conn < C_SYNC_SOURCE)
3521 real_peer_disk = D_INCONSISTENT;
3522
3523 /* if peer_state changes to connected at the same time,
3524 * it explicitly notifies us that it finished resync.
3525 * Maybe we should finish it up, too? */
3526 else if (os.conn >= C_SYNC_SOURCE &&
3527 peer_state.conn == C_CONNECTED) {
3528 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3529 drbd_resync_finished(mdev);
3530 return 0;
3531 }
3532 }
3533
3534 /* peer says his disk is inconsistent, while we think it is uptodate,
3535 * and this happens while the peer still thinks we have a sync going on,
3536 * but we think we are already done with the sync.
3537 * We ignore this to avoid flapping pdsk.
3538 * This should not happen, if the peer is a recent version of drbd. */
3539 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3540 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3541 real_peer_disk = D_UP_TO_DATE;
3542
3543 if (ns.conn == C_WF_REPORT_PARAMS)
3544 ns.conn = C_CONNECTED;
3545
3546 if (peer_state.conn == C_AHEAD)
3547 ns.conn = C_BEHIND;
3548
3549 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3550 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3551 int cr; /* consider resync */
3552
3553 /* if we established a new connection */
3554 cr = (os.conn < C_CONNECTED);
3555 /* if we had an established connection
3556 * and one of the nodes newly attaches a disk */
3557 cr |= (os.conn == C_CONNECTED &&
3558 (peer_state.disk == D_NEGOTIATING ||
3559 os.disk == D_NEGOTIATING));
3560 /* if we have both been inconsistent, and the peer has been
3561 * forced to be UpToDate with --overwrite-data */
3562 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3563 /* if we had been plain connected, and the admin requested to
3564 * start a sync by "invalidate" or "invalidate-remote" */
3565 cr |= (os.conn == C_CONNECTED &&
3566 (peer_state.conn >= C_STARTING_SYNC_S &&
3567 peer_state.conn <= C_WF_BITMAP_T));
3568
3569 if (cr)
3570 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3571
3572 put_ldev(mdev);
3573 if (ns.conn == C_MASK) {
3574 ns.conn = C_CONNECTED;
3575 if (mdev->state.disk == D_NEGOTIATING) {
3576 drbd_force_state(mdev, NS(disk, D_FAILED));
3577 } else if (peer_state.disk == D_NEGOTIATING) {
3578 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3579 peer_state.disk = D_DISKLESS;
3580 real_peer_disk = D_DISKLESS;
3581 } else {
3582 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3583 return -EIO;
3584 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3585 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3586 return -EIO;
3587 }
3588 }
3589 }
3590
3591 spin_lock_irq(&mdev->tconn->req_lock);
3592 if (os.i != drbd_read_state(mdev).i)
3593 goto retry;
3594 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3595 ns.peer = peer_state.role;
3596 ns.pdsk = real_peer_disk;
3597 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3598 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3599 ns.disk = mdev->new_state_tmp.disk;
3600 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3601 if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3602 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3603 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3604 for temporary network outages! */
3605 spin_unlock_irq(&mdev->tconn->req_lock);
3606 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3607 tl_clear(mdev->tconn);
3608 drbd_uuid_new_current(mdev);
3609 clear_bit(NEW_CUR_UUID, &mdev->flags);
3610 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3611 return -EIO;
3612 }
3613 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3614 ns = drbd_read_state(mdev);
3615 spin_unlock_irq(&mdev->tconn->req_lock);
3616
3617 if (rv < SS_SUCCESS) {
3618 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3619 return -EIO;
3620 }
3621
3622 if (os.conn > C_WF_REPORT_PARAMS) {
3623 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3624 peer_state.disk != D_NEGOTIATING ) {
3625 /* we want resync, peer has not yet decided to sync... */
3626 /* Nowadays only used when forcing a node into primary role and
3627 setting its disk to UpToDate with that */
3628 drbd_send_uuids(mdev);
3629 drbd_send_state(mdev);
3630 }
3631 }
3632
3633 mdev->tconn->net_conf->want_lose = 0;
3634
3635 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3636
3637 return 0;
3638 }
3639
3640 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3641 {
3642 struct drbd_conf *mdev;
3643 struct p_rs_uuid *p = pi->data;
3644
3645 mdev = vnr_to_mdev(tconn, pi->vnr);
3646 if (!mdev)
3647 return -EIO;
3648
3649 wait_event(mdev->misc_wait,
3650 mdev->state.conn == C_WF_SYNC_UUID ||
3651 mdev->state.conn == C_BEHIND ||
3652 mdev->state.conn < C_CONNECTED ||
3653 mdev->state.disk < D_NEGOTIATING);
3654
3655 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3656
3657 /* Here the _drbd_uuid_ functions are right, current should
3658 _not_ be rotated into the history */
3659 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3660 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3661 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3662
3663 drbd_print_uuids(mdev, "updated sync uuid");
3664 drbd_start_resync(mdev, C_SYNC_TARGET);
3665
3666 put_ldev(mdev);
3667 } else
3668 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3669
3670 return 0;
3671 }
3672
3673 /**
3674 * receive_bitmap_plain() - receive one chunk of an uncompressed bitmap and merge it into the local bitmap
3675 *
3676 * Return 0 when done, 1 when another iteration is needed, and a negative error
3677 * code upon failure.
3678 */
3679 static int
3680 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3681 unsigned long *p, struct bm_xfer_ctx *c)
3682 {
3683 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3684 drbd_header_size(mdev->tconn);
3685 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3686 c->bm_words - c->word_offset);
3687 unsigned int want = num_words * sizeof(*p);
3688 int err;
3689
3690 if (want != size) {
3691 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3692 return -EIO;
3693 }
3694 if (want == 0)
3695 return 0;
3696 err = drbd_recv_all(mdev->tconn, p, want);
3697 if (err)
3698 return err;
3699
3700 drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3701
3702 c->word_offset += num_words;
3703 c->bit_offset = c->word_offset * BITS_PER_LONG;
3704 if (c->bit_offset > c->bm_bits)
3705 c->bit_offset = c->bm_bits;
3706
3707 return 1;
3708 }
3709
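/* The "encoding" byte of a compressed bitmap sub header packs three fields:
 * bits 0-3: the encoding scheme (enum drbd_bitmap_code),
 * bits 4-6: number of unused pad bits at the end of the bit stream,
 * bit    7: whether the first run length describes set or cleared bits.
 * The dcbp_* helpers below pick these fields apart. */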
3710 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3711 {
3712 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3713 }
3714
3715 static int dcbp_get_start(struct p_compressed_bm *p)
3716 {
3717 return (p->encoding & 0x80) != 0;
3718 }
3719
3720 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3721 {
3722 return (p->encoding >> 4) & 0x7;
3723 }
3724
3725 /**
3726 * recv_bm_rle_bits() - decode one run-length encoded bitmap chunk and apply the set bits
3727 *
3728 * Return 0 when done, 1 when another iteration is needed, and a negative error
3729 * code upon failure.
3730 */
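/* The payload is a sequence of VLI encoded run lengths.  Runs alternate
 * between cleared and set bits; dcbp_get_start() tells which kind comes
 * first.  We keep up to 64 bits of look-ahead, decode one run length at a
 * time, apply runs of set bits to the bitmap, and refill the look-ahead
 * from the bit stream as it drains. */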
3731 static int
3732 recv_bm_rle_bits(struct drbd_conf *mdev,
3733 struct p_compressed_bm *p,
3734 struct bm_xfer_ctx *c,
3735 unsigned int len)
3736 {
3737 struct bitstream bs;
3738 u64 look_ahead;
3739 u64 rl;
3740 u64 tmp;
3741 unsigned long s = c->bit_offset;
3742 unsigned long e;
3743 int toggle = dcbp_get_start(p);
3744 int have;
3745 int bits;
3746
3747 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3748
3749 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3750 if (bits < 0)
3751 return -EIO;
3752
3753 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3754 bits = vli_decode_bits(&rl, look_ahead);
3755 if (bits <= 0)
3756 return -EIO;
3757
3758 if (toggle) {
3759 e = s + rl - 1;
3760 if (e >= c->bm_bits) {
3761 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3762 return -EIO;
3763 }
3764 _drbd_bm_set_bits(mdev, s, e);
3765 }
3766
3767 if (have < bits) {
3768 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3769 have, bits, look_ahead,
3770 (unsigned int)(bs.cur.b - p->code),
3771 (unsigned int)bs.buf_len);
3772 return -EIO;
3773 }
3774 look_ahead >>= bits;
3775 have -= bits;
3776
3777 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3778 if (bits < 0)
3779 return -EIO;
3780 look_ahead |= tmp << have;
3781 have += bits;
3782 }
3783
3784 c->bit_offset = s;
3785 bm_xfer_ctx_bit_to_word_offset(c);
3786
3787 return (s != c->bm_bits);
3788 }
3789
3790 /**
3791 * decode_bitmap_c() - decode one chunk of a compressed bitmap packet
3792 *
3793 * Return 0 when done, 1 when another iteration is needed, and a negative error
3794 * code upon failure.
3795 */
3796 static int
3797 decode_bitmap_c(struct drbd_conf *mdev,
3798 struct p_compressed_bm *p,
3799 struct bm_xfer_ctx *c,
3800 unsigned int len)
3801 {
3802 if (dcbp_get_code(p) == RLE_VLI_Bits)
3803 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3804
3805 /* other variants had been implemented for evaluation,
3806 * but have been dropped as this one turned out to be "best"
3807 * during all our tests. */
3808
3809 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3810 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3811 return -EIO;
3812 }
3813
3814 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3815 const char *direction, struct bm_xfer_ctx *c)
3816 {
3817 /* what would it take to transfer it "plaintext" */
3818 unsigned int header_size = drbd_header_size(mdev->tconn);
3819 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3820 unsigned int plain =
3821 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3822 c->bm_words * sizeof(unsigned long);
3823 unsigned int total = c->bytes[0] + c->bytes[1];
3824 unsigned int r;
3825
3826 /* total cannot be zero, but just in case: */
3827 if (total == 0)
3828 return;
3829
3830 /* don't report if not compressed */
3831 if (total >= plain)
3832 return;
3833
3834 /* total < plain. check for overflow, still */
3835 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3836 : (1000 * total / plain);
3837
3838 if (r > 1000)
3839 r = 1000;
3840
3841 r = 1000 - r;
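/* r is now the saving in per mille, printed as a percentage below;
 * e.g. plain = 1000 bytes, total = 250 bytes -> r = 1000 - 250 = 750,
 * reported as "compression: 75.0%". */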
3842 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3843 "total %u; compression: %u.%u%%\n",
3844 direction,
3845 c->bytes[1], c->packets[1],
3846 c->bytes[0], c->packets[0],
3847 total, r/10, r % 10);
3848 }
3849
3850 /* Since we are processing the bitfield from lower addresses to higher,
3851 it does not matter whether we process it in 32 bit chunks or 64 bit
3852 chunks, as long as it is little endian. (Understand it as a byte stream,
3853 beginning with the lowest byte...) If we used big endian
3854 we would need to process it from the highest address to the lowest,
3855 in order to be agnostic to the 32 vs 64 bit issue.
3856
3857 Returns 0 on success, and a negative error code on failure. */
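/* The bitmap exchange is a sequence of P_BITMAP and/or P_COMPRESSED_BITMAP
 * packets; we keep calling the per packet decoder until it reports "done"
 * (zero) and read the next packet header ourselves in between. */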
3858 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3859 {
3860 struct drbd_conf *mdev;
3861 struct bm_xfer_ctx c;
3862 int err;
3863
3864 mdev = vnr_to_mdev(tconn, pi->vnr);
3865 if (!mdev)
3866 return -EIO;
3867
3868 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3869 /* you are supposed to send additional out-of-sync information
3870 * if you actually set bits during this phase */
3871
3872 c = (struct bm_xfer_ctx) {
3873 .bm_bits = drbd_bm_bits(mdev),
3874 .bm_words = drbd_bm_words(mdev),
3875 };
3876
3877 for(;;) {
3878 if (pi->cmd == P_BITMAP)
3879 err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
3880 else if (pi->cmd == P_COMPRESSED_BITMAP) {
3881 /* MAYBE: sanity check that we speak proto >= 90,
3882 * and the feature is enabled! */
3883 struct p_compressed_bm *p = pi->data;
3884
3885 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
3886 dev_err(DEV, "ReportCBitmap packet too large\n");
3887 err = -EIO;
3888 goto out;
3889 }
3890 if (pi->size <= sizeof(*p)) {
3891 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3892 err = -EIO;
3893 goto out;
3894 }
3895 err = drbd_recv_all(mdev->tconn, p, pi->size);
3896 if (err)
3897 goto out;
3898 err = decode_bitmap_c(mdev, p, &c, pi->size);
3899 } else {
3900 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
3901 err = -EIO;
3902 goto out;
3903 }
3904
3905 c.packets[pi->cmd == P_BITMAP]++;
3906 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
3907
3908 if (err <= 0) {
3909 if (err < 0)
3910 goto out;
3911 break;
3912 }
3913 err = drbd_recv_header(mdev->tconn, pi);
3914 if (err)
3915 goto out;
3916 }
3917
3918 INFO_bm_xfer_stats(mdev, "receive", &c);
3919
3920 if (mdev->state.conn == C_WF_BITMAP_T) {
3921 enum drbd_state_rv rv;
3922
3923 err = drbd_send_bitmap(mdev);
3924 if (err)
3925 goto out;
3926 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3927 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3928 D_ASSERT(rv == SS_SUCCESS);
3929 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3930 /* admin may have requested C_DISCONNECTING,
3931 * other threads may have noticed network errors */
3932 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3933 drbd_conn_str(mdev->state.conn));
3934 }
3935 err = 0;
3936
3937 out:
3938 drbd_bm_unlock(mdev);
3939 if (!err && mdev->state.conn == C_WF_BITMAP_S)
3940 drbd_start_resync(mdev, C_SYNC_SOURCE);
3941 return err;
3942 }
3943
3944 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
3945 {
3946 conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
3947 pi->cmd, pi->size);
3948
3949 return ignore_remaining_packet(tconn, pi);
3950 }
3951
3952 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
3953 {
3954 /* Make sure we've acked all the TCP data associated
3955 * with the data requests being unplugged */
3956 drbd_tcp_quickack(tconn->data.socket);
3957
3958 return 0;
3959 }
3960
3961 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
3962 {
3963 struct drbd_conf *mdev;
3964 struct p_block_desc *p = pi->data;
3965
3966 mdev = vnr_to_mdev(tconn, pi->vnr);
3967 if (!mdev)
3968 return -EIO;
3969
3970 switch (mdev->state.conn) {
3971 case C_WF_SYNC_UUID:
3972 case C_WF_BITMAP_T:
3973 case C_BEHIND:
3974 break;
3975 default:
3976 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3977 drbd_conn_str(mdev->state.conn));
3978 }
3979
3980 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3981
3982 return 0;
3983 }
3984
3985 struct data_cmd {
3986 int expect_payload;
3987 size_t pkt_size;
3988 int (*fn)(struct drbd_tconn *, struct packet_info *);
3989 };
3990
3991 static struct data_cmd drbd_cmd_handler[] = {
3992 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3993 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3994 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply },
3995 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier },
3996 [P_BITMAP] = { 1, 0, receive_bitmap },
3997 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
3998 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
3999 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4000 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4001 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4002 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4003 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4004 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4005 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4006 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4007 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4008 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4009 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4010 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4011 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4012 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4013 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4014 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4015 };
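/* drbdd() below dispatches through this table: pkt_size bytes of sub header
 * are read into pi.data before the handler runs, and expect_payload says
 * whether additional payload beyond that sub header is allowed.  A command
 * outside the table, or one without a handler, is a protocol error. */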
4016
4017 static void drbdd(struct drbd_tconn *tconn)
4018 {
4019 struct packet_info pi;
4020 size_t shs; /* sub header size */
4021 int err;
4022
4023 while (get_t_state(&tconn->receiver) == RUNNING) {
4024 struct data_cmd *cmd;
4025
4026 drbd_thread_current_set_cpu(&tconn->receiver);
4027 if (drbd_recv_header(tconn, &pi))
4028 goto err_out;
4029
4030 cmd = &drbd_cmd_handler[pi.cmd];
4031 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4032 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4033 goto err_out;
4034 }
4035
4036 shs = cmd->pkt_size;
4037 if (pi.size > shs && !cmd->expect_payload) {
4038 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4039 goto err_out;
4040 }
4041
4042 if (shs) {
4043 err = drbd_recv_all_warn(tconn, pi.data, shs);
4044 if (err)
4045 goto err_out;
4046 pi.size -= shs;
4047 }
4048
4049 err = cmd->fn(tconn, &pi);
4050 if (err) {
4051 conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4052 cmdname(pi.cmd), err, pi.size);
4053 goto err_out;
4054 }
4055 }
4056 return;
4057
4058 err_out:
4059 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4060 }
4061
4062 void conn_flush_workqueue(struct drbd_tconn *tconn)
4063 {
4064 struct drbd_wq_barrier barr;
4065
4066 barr.w.cb = w_prev_work_done;
4067 barr.w.tconn = tconn;
4068 init_completion(&barr.done);
4069 drbd_queue_work(&tconn->data.work, &barr.w);
4070 wait_for_completion(&barr.done);
4071 }
4072
4073 static void drbd_disconnect(struct drbd_tconn *tconn)
4074 {
4075 enum drbd_conns oc;
4076 int rv = SS_UNKNOWN_ERROR;
4077
4078 if (tconn->cstate == C_STANDALONE)
4079 return;
4080
4081 /* asender does not clean up anything. it must not interfere, either */
4082 drbd_thread_stop(&tconn->asender);
4083 drbd_free_sock(tconn);
4084
4085 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4086 conn_info(tconn, "Connection closed\n");
4087
4088 if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4089 conn_try_outdate_peer_async(tconn);
4090
4091 spin_lock_irq(&tconn->req_lock);
4092 oc = tconn->cstate;
4093 if (oc >= C_UNCONNECTED)
4094 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4095
4096 spin_unlock_irq(&tconn->req_lock);
4097
4098 if (oc == C_DISCONNECTING) {
4099 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4100
4101 crypto_free_hash(tconn->cram_hmac_tfm);
4102 tconn->cram_hmac_tfm = NULL;
4103
4104 kfree(tconn->net_conf);
4105 tconn->net_conf = NULL;
4106 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4107 }
4108 }
4109
4110 static int drbd_disconnected(int vnr, void *p, void *data)
4111 {
4112 struct drbd_conf *mdev = (struct drbd_conf *)p;
4113 enum drbd_fencing_p fp;
4114 unsigned int i;
4115
4116 /* wait for current activity to cease. */
4117 spin_lock_irq(&mdev->tconn->req_lock);
4118 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4119 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4120 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4121 spin_unlock_irq(&mdev->tconn->req_lock);
4122
4123 /* We do not have data structures that would allow us to
4124 * get the rs_pending_cnt down to 0 again.
4125 * * On C_SYNC_TARGET we do not have any data structures describing
4126 * the pending RSDataRequest's we have sent.
4127 * * On C_SYNC_SOURCE there is no data structure that tracks
4128 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4129 * And no, it is not the sum of the reference counts in the
4130 * resync_LRU. The resync_LRU tracks the whole operation including
4131 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4132 * on the fly. */
4133 drbd_rs_cancel_all(mdev);
4134 mdev->rs_total = 0;
4135 mdev->rs_failed = 0;
4136 atomic_set(&mdev->rs_pending_cnt, 0);
4137 wake_up(&mdev->misc_wait);
4138
4139 del_timer(&mdev->request_timer);
4140
4141 del_timer_sync(&mdev->resync_timer);
4142 resync_timer_fn((unsigned long)mdev);
4143
4144 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4145 * w_make_resync_request etc. which may still be on the worker queue
4146 * to be "canceled" */
4147 drbd_flush_workqueue(mdev);
4148
4149 /* This also does reclaim_net_ee(). If we do this too early, we might
4150 * miss some resync ee and pages.*/
4151 drbd_process_done_ee(mdev);
4152
4153 kfree(mdev->p_uuid);
4154 mdev->p_uuid = NULL;
4155
4156 if (!drbd_suspended(mdev))
4157 tl_clear(mdev->tconn);
4158
4159 drbd_md_sync(mdev);
4160
4161 fp = FP_DONT_CARE;
4162 if (get_ldev(mdev)) {
4163 fp = mdev->ldev->dc.fencing;
4164 put_ldev(mdev);
4165 }
4166
4167 /* serialize with bitmap writeout triggered by the state change,
4168 * if any. */
4169 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4170
4171 /* tcp_close and release of sendpage pages can be deferred. I don't
4172 * want to use SO_LINGER, because apparently it can be deferred for
4173 * more than 20 seconds (longest time I checked).
4174 *
4175 * Actually we don't care for exactly when the network stack does its
4176 * put_page(), but release our reference on these pages right here.
4177 */
4178 i = drbd_release_ee(mdev, &mdev->net_ee);
4179 if (i)
4180 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4181 i = atomic_read(&mdev->pp_in_use_by_net);
4182 if (i)
4183 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4184 i = atomic_read(&mdev->pp_in_use);
4185 if (i)
4186 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4187
4188 D_ASSERT(list_empty(&mdev->read_ee));
4189 D_ASSERT(list_empty(&mdev->active_ee));
4190 D_ASSERT(list_empty(&mdev->sync_ee));
4191 D_ASSERT(list_empty(&mdev->done_ee));
4192
4193 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4194 atomic_set(&mdev->current_epoch->epoch_size, 0);
4195 D_ASSERT(list_empty(&mdev->current_epoch->list));
4196
4197 return 0;
4198 }
4199
4200 /*
4201 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4202 * we can agree on is stored in agreed_pro_version.
4203 *
4204 * feature flags and the reserved array should be enough room for future
4205 * enhancements of the handshake protocol, and possible plugins...
4206 *
4207 * for now, they are expected to be zero, but ignored.
4208 */
4209 static int drbd_send_features(struct drbd_tconn *tconn)
4210 {
4211 struct drbd_socket *sock;
4212 struct p_connection_features *p;
4213
4214 sock = &tconn->data;
4215 p = conn_prepare_command(tconn, sock);
4216 if (!p)
4217 return -EIO;
4218 memset(p, 0, sizeof(*p));
4219 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4220 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4221 return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4222 }
4223
4224 /*
4225 * return values:
4226 * 1 yes, we have a valid connection
4227 * 0 oops, did not work out, please try again
4228 * -1 peer talks different language,
4229 * no point in trying again, please go standalone.
4230 */
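/* Both sides announce the range [PRO_VERSION_MIN, PRO_VERSION_MAX]; if the
 * ranges overlap we settle on min(our max, peer's max), otherwise we give
 * up.  A peer announcing protocol_max == 0 is presumably an older DRBD that
 * speaks exactly protocol_min and nothing else. */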
4231 static int drbd_do_features(struct drbd_tconn *tconn)
4232 {
4233 /* ASSERT current == tconn->receiver ... */
4234 struct p_connection_features *p;
4235 const int expect = sizeof(struct p_connection_features);
4236 struct packet_info pi;
4237 int err;
4238
4239 err = drbd_send_features(tconn);
4240 if (err)
4241 return 0;
4242
4243 err = drbd_recv_header(tconn, &pi);
4244 if (err)
4245 return 0;
4246
4247 if (pi.cmd != P_CONNECTION_FEATURES) {
4248 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4249 cmdname(pi.cmd), pi.cmd);
4250 return -1;
4251 }
4252
4253 if (pi.size != expect) {
4254 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4255 expect, pi.size);
4256 return -1;
4257 }
4258
4259 p = pi.data;
4260 err = drbd_recv_all_warn(tconn, p, expect);
4261 if (err)
4262 return 0;
4263
4264 p->protocol_min = be32_to_cpu(p->protocol_min);
4265 p->protocol_max = be32_to_cpu(p->protocol_max);
4266 if (p->protocol_max == 0)
4267 p->protocol_max = p->protocol_min;
4268
4269 if (PRO_VERSION_MAX < p->protocol_min ||
4270 PRO_VERSION_MIN > p->protocol_max)
4271 goto incompat;
4272
4273 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4274
4275 conn_info(tconn, "Handshake successful: "
4276 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4277
4278 return 1;
4279
4280 incompat:
4281 conn_err(tconn, "incompatible DRBD dialects: "
4282 "I support %d-%d, peer supports %d-%d\n",
4283 PRO_VERSION_MIN, PRO_VERSION_MAX,
4284 p->protocol_min, p->protocol_max);
4285 return -1;
4286 }
4287
4288 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4289 static int drbd_do_auth(struct drbd_tconn *tconn)
4290 {
4291 conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4292 conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4293 return -1;
4294 }
4295 #else
4296 #define CHALLENGE_LEN 64
4297
4298 /* Return value:
4299 1 - auth succeeded,
4300 0 - failed, try again (network error),
4301 -1 - auth failed, don't try again.
4302 */
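/* Symmetric challenge/response using the configured cram-hmac-alg: we send
 * a random challenge, the peer sends its own, each side answers with
 * HMAC(shared_secret, peer's challenge), and we compare the peer's answer
 * against the HMAC we compute over the challenge we sent. */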
4303
4304 static int drbd_do_auth(struct drbd_tconn *tconn)
4305 {
4306 struct drbd_socket *sock;
4307 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4308 struct scatterlist sg;
4309 char *response = NULL;
4310 char *right_response = NULL;
4311 char *peers_ch = NULL;
4312 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4313 unsigned int resp_size;
4314 struct hash_desc desc;
4315 struct packet_info pi;
4316 int err, rv;
4317
4318 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
4319
4320 desc.tfm = tconn->cram_hmac_tfm;
4321 desc.flags = 0;
4322
4323 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4324 (u8 *)tconn->net_conf->shared_secret, key_len);
4325 if (rv) {
4326 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4327 rv = -1;
4328 goto fail;
4329 }
4330
4331 get_random_bytes(my_challenge, CHALLENGE_LEN);
4332
4333 sock = &tconn->data;
4334 if (!conn_prepare_command(tconn, sock)) {
4335 rv = 0;
4336 goto fail;
4337 }
4338 rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4339 my_challenge, CHALLENGE_LEN);
4340 if (!rv)
4341 goto fail;
4342
4343 err = drbd_recv_header(tconn, &pi);
4344 if (err) {
4345 rv = 0;
4346 goto fail;
4347 }
4348
4349 if (pi.cmd != P_AUTH_CHALLENGE) {
4350 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4351 cmdname(pi.cmd), pi.cmd);
4352 rv = 0;
4353 goto fail;
4354 }
4355
4356 if (pi.size > CHALLENGE_LEN * 2) {
4357 conn_err(tconn, "expected AuthChallenge payload too big.\n");
4358 rv = -1;
4359 goto fail;
4360 }
4361
4362 peers_ch = kmalloc(pi.size, GFP_NOIO);
4363 if (peers_ch == NULL) {
4364 conn_err(tconn, "kmalloc of peers_ch failed\n");
4365 rv = -1;
4366 goto fail;
4367 }
4368
4369 err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4370 if (err) {
4371 rv = 0;
4372 goto fail;
4373 }
4374
4375 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4376 response = kmalloc(resp_size, GFP_NOIO);
4377 if (response == NULL) {
4378 conn_err(tconn, "kmalloc of response failed\n");
4379 rv = -1;
4380 goto fail;
4381 }
4382
4383 sg_init_table(&sg, 1);
4384 sg_set_buf(&sg, peers_ch, pi.size);
4385
4386 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4387 if (rv) {
4388 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4389 rv = -1;
4390 goto fail;
4391 }
4392
4393 if (!conn_prepare_command(tconn, sock)) {
4394 rv = 0;
4395 goto fail;
4396 }
4397 rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4398 response, resp_size);
4399 if (!rv)
4400 goto fail;
4401
4402 err = drbd_recv_header(tconn, &pi);
4403 if (err) {
4404 rv = 0;
4405 goto fail;
4406 }
4407
4408 if (pi.cmd != P_AUTH_RESPONSE) {
4409 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4410 cmdname(pi.cmd), pi.cmd);
4411 rv = 0;
4412 goto fail;
4413 }
4414
4415 if (pi.size != resp_size) {
4416 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
4417 rv = 0;
4418 goto fail;
4419 }
4420
4421 err = drbd_recv_all_warn(tconn, response, resp_size);
4422 if (err) {
4423 rv = 0;
4424 goto fail;
4425 }
4426
4427 right_response = kmalloc(resp_size, GFP_NOIO);
4428 if (right_response == NULL) {
4429 conn_err(tconn, "kmalloc of right_response failed\n");
4430 rv = -1;
4431 goto fail;
4432 }
4433
4434 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4435
4436 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4437 if (rv) {
4438 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4439 rv = -1;
4440 goto fail;
4441 }
4442
4443 rv = !memcmp(response, right_response, resp_size);
4444
4445 if (rv)
4446 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4447 resp_size, tconn->net_conf->cram_hmac_alg);
4448 else
4449 rv = -1;
4450
4451 fail:
4452 kfree(peers_ch);
4453 kfree(response);
4454 kfree(right_response);
4455
4456 return rv;
4457 }
4458 #endif
4459
4460 int drbdd_init(struct drbd_thread *thi)
4461 {
4462 struct drbd_tconn *tconn = thi->tconn;
4463 int h;
4464
4465 conn_info(tconn, "receiver (re)started\n");
4466
4467 do {
4468 h = drbd_connect(tconn);
4469 if (h == 0) {
4470 drbd_disconnect(tconn);
4471 schedule_timeout_interruptible(HZ);
4472 }
4473 if (h == -1) {
4474 conn_warn(tconn, "Discarding network configuration.\n");
4475 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4476 }
4477 } while (h == 0);
4478
4479 if (h > 0) {
4480 if (get_net_conf(tconn)) {
4481 drbdd(tconn);
4482 put_net_conf(tconn);
4483 }
4484 }
4485
4486 drbd_disconnect(tconn);
4487
4488 conn_info(tconn, "receiver terminated\n");
4489 return 0;
4490 }
4491
4492 /* ********* acknowledge sender ******** */
4493
4494 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4495 {
4496 struct p_req_state_reply *p = pi->data;
4497 int retcode = be32_to_cpu(p->retcode);
4498
4499 if (retcode >= SS_SUCCESS) {
4500 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4501 } else {
4502 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4503 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4504 drbd_set_st_err_str(retcode), retcode);
4505 }
4506 wake_up(&tconn->ping_wait);
4507
4508 return 0;
4509 }
4510
4511 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4512 {
4513 struct drbd_conf *mdev;
4514 struct p_req_state_reply *p = pi->data;
4515 int retcode = be32_to_cpu(p->retcode);
4516
4517 mdev = vnr_to_mdev(tconn, pi->vnr);
4518 if (!mdev)
4519 return -EIO;
4520
4521 if (retcode >= SS_SUCCESS) {
4522 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4523 } else {
4524 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4525 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4526 drbd_set_st_err_str(retcode), retcode);
4527 }
4528 wake_up(&mdev->state_wait);
4529
4530 return 0;
4531 }
4532
4533 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4534 {
4535 return drbd_send_ping_ack(tconn);
4536
4537 }
4538
4539 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4540 {
4541 /* restore idle timeout */
4542 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4543 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4544 wake_up(&tconn->ping_wait);
4545
4546 return 0;
4547 }
4548
4549 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4550 {
4551 struct drbd_conf *mdev;
4552 struct p_block_ack *p = pi->data;
4553 sector_t sector = be64_to_cpu(p->sector);
4554 int blksize = be32_to_cpu(p->blksize);
4555
4556 mdev = vnr_to_mdev(tconn, pi->vnr);
4557 if (!mdev)
4558 return -EIO;
4559
4560 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4561
4562 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4563
4564 if (get_ldev(mdev)) {
4565 drbd_rs_complete_io(mdev, sector);
4566 drbd_set_in_sync(mdev, sector, blksize);
4567 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4568 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4569 put_ldev(mdev);
4570 }
4571 dec_rs_pending(mdev);
4572 atomic_add(blksize >> 9, &mdev->rs_sect_in);
4573
4574 return 0;
4575 }
4576
4577 static int
4578 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4579 struct rb_root *root, const char *func,
4580 enum drbd_req_event what, bool missing_ok)
4581 {
4582 struct drbd_request *req;
4583 struct bio_and_error m;
4584
4585 spin_lock_irq(&mdev->tconn->req_lock);
4586 req = find_request(mdev, root, id, sector, missing_ok, func);
4587 if (unlikely(!req)) {
4588 spin_unlock_irq(&mdev->tconn->req_lock);
4589 return -EIO;
4590 }
4591 __req_mod(req, what, &m);
4592 spin_unlock_irq(&mdev->tconn->req_lock);
4593
4594 if (m.bio)
4595 complete_master_bio(mdev, &m);
4596 return 0;
4597 }
4598
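/* Acks for resync traffic carry block_id == ID_SYNCER and are accounted
 * directly against the resync counters; everything else must match a
 * pending application write in the write_requests tree. */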
4599 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4600 {
4601 struct drbd_conf *mdev;
4602 struct p_block_ack *p = pi->data;
4603 sector_t sector = be64_to_cpu(p->sector);
4604 int blksize = be32_to_cpu(p->blksize);
4605 enum drbd_req_event what;
4606
4607 mdev = vnr_to_mdev(tconn, pi->vnr);
4608 if (!mdev)
4609 return -EIO;
4610
4611 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4612
4613 if (p->block_id == ID_SYNCER) {
4614 drbd_set_in_sync(mdev, sector, blksize);
4615 dec_rs_pending(mdev);
4616 return 0;
4617 }
4618 switch (pi->cmd) {
4619 case P_RS_WRITE_ACK:
4620 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4621 what = WRITE_ACKED_BY_PEER_AND_SIS;
4622 break;
4623 case P_WRITE_ACK:
4624 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4625 what = WRITE_ACKED_BY_PEER;
4626 break;
4627 case P_RECV_ACK:
4628 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
4629 what = RECV_ACKED_BY_PEER;
4630 break;
4631 case P_DISCARD_WRITE:
4632 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4633 what = DISCARD_WRITE;
4634 break;
4635 case P_RETRY_WRITE:
4636 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4637 what = POSTPONE_WRITE;
4638 break;
4639 default:
4640 BUG();
4641 }
4642
4643 return validate_req_change_req_state(mdev, p->block_id, sector,
4644 &mdev->write_requests, __func__,
4645 what, false);
4646 }
4647
4648 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4649 {
4650 struct drbd_conf *mdev;
4651 struct p_block_ack *p = pi->data;
4652 sector_t sector = be64_to_cpu(p->sector);
4653 int size = be32_to_cpu(p->blksize);
4654 bool missing_ok = tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4655 tconn->net_conf->wire_protocol == DRBD_PROT_B;
4656 int err;
4657
4658 mdev = vnr_to_mdev(tconn, pi->vnr);
4659 if (!mdev)
4660 return -EIO;
4661
4662 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4663
4664 if (p->block_id == ID_SYNCER) {
4665 dec_rs_pending(mdev);
4666 drbd_rs_failed_io(mdev, sector, size);
4667 return 0;
4668 }
4669
4670 err = validate_req_change_req_state(mdev, p->block_id, sector,
4671 &mdev->write_requests, __func__,
4672 NEG_ACKED, missing_ok);
4673 if (err) {
4674 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4675 The master bio might already be completed, therefore the
4676 request is no longer in the collision hash. */
4677 /* In Protocol B we might already have got a P_RECV_ACK
4678 but then get a P_NEG_ACK afterwards. */
4679 if (!missing_ok)
4680 return err;
4681 drbd_set_out_of_sync(mdev, sector, size);
4682 }
4683 return 0;
4684 }
4685
4686 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4687 {
4688 struct drbd_conf *mdev;
4689 struct p_block_ack *p = pi->data;
4690 sector_t sector = be64_to_cpu(p->sector);
4691
4692 mdev = vnr_to_mdev(tconn, pi->vnr);
4693 if (!mdev)
4694 return -EIO;
4695
4696 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4697
4698 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4699 (unsigned long long)sector, be32_to_cpu(p->blksize));
4700
4701 return validate_req_change_req_state(mdev, p->block_id, sector,
4702 &mdev->read_requests, __func__,
4703 NEG_ACKED, false);
4704 }
4705
4706 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4707 {
4708 struct drbd_conf *mdev;
4709 sector_t sector;
4710 int size;
4711 struct p_block_ack *p = pi->data;
4712
4713 mdev = vnr_to_mdev(tconn, pi->vnr);
4714 if (!mdev)
4715 return -EIO;
4716
4717 sector = be64_to_cpu(p->sector);
4718 size = be32_to_cpu(p->blksize);
4719
4720 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4721
4722 dec_rs_pending(mdev);
4723
4724 if (get_ldev_if_state(mdev, D_FAILED)) {
4725 drbd_rs_complete_io(mdev, sector);
4726 switch (pi->cmd) {
4727 case P_NEG_RS_DREPLY:
4728 drbd_rs_failed_io(mdev, sector, size);
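/* fall through to the P_RS_CANCEL break */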
4729 case P_RS_CANCEL:
4730 break;
4731 default:
4732 BUG();
4733 }
4734 put_ldev(mdev);
4735 }
4736
4737 return 0;
4738 }
4739
4740 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4741 {
4742 struct drbd_conf *mdev;
4743 struct p_barrier_ack *p = pi->data;
4744
4745 mdev = vnr_to_mdev(tconn, pi->vnr);
4746 if (!mdev)
4747 return -EIO;
4748
4749 tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4750
4751 if (mdev->state.conn == C_AHEAD &&
4752 atomic_read(&mdev->ap_in_flight) == 0 &&
4753 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4754 mdev->start_resync_timer.expires = jiffies + HZ;
4755 add_timer(&mdev->start_resync_timer);
4756 }
4757
4758 return 0;
4759 }
4760
4761 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4762 {
4763 struct drbd_conf *mdev;
4764 struct p_block_ack *p = pi->data;
4765 struct drbd_work *w;
4766 sector_t sector;
4767 int size;
4768
4769 mdev = vnr_to_mdev(tconn, pi->vnr);
4770 if (!mdev)
4771 return -EIO;
4772
4773 sector = be64_to_cpu(p->sector);
4774 size = be32_to_cpu(p->blksize);
4775
4776 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4777
4778 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4779 drbd_ov_out_of_sync_found(mdev, sector, size);
4780 else
4781 ov_out_of_sync_print(mdev);
4782
4783 if (!get_ldev(mdev))
4784 return 0;
4785
4786 drbd_rs_complete_io(mdev, sector);
4787 dec_rs_pending(mdev);
4788
4789 --mdev->ov_left;
4790
4791 /* let's advance progress step marks only for every other megabyte */
4792 if ((mdev->ov_left & 0x200) == 0x200)
4793 drbd_advance_rs_marks(mdev, mdev->ov_left);
4794
4795 if (mdev->ov_left == 0) {
4796 w = kmalloc(sizeof(*w), GFP_NOIO);
4797 if (w) {
4798 w->cb = w_ov_finished;
4799 w->mdev = mdev;
4800 drbd_queue_work_front(&mdev->tconn->data.work, w);
4801 } else {
4802 dev_err(DEV, "kmalloc(w) failed.");
4803 ov_out_of_sync_print(mdev);
4804 drbd_resync_finished(mdev);
4805 }
4806 }
4807 put_ldev(mdev);
4808 return 0;
4809 }
4810
4811 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4812 {
4813 return 0;
4814 }
4815
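/* Drain the done_ee lists of every volume on this connection; loop until
 * none of them has grown new entries behind our back. */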
4816 static int tconn_process_done_ee(struct drbd_tconn *tconn)
4817 {
4818 struct drbd_conf *mdev;
4819 int i, not_empty = 0;
4820
4821 do {
4822 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4823 flush_signals(current);
4824 idr_for_each_entry(&tconn->volumes, mdev, i) {
4825 if (drbd_process_done_ee(mdev))
4826 return 1; /* error */
4827 }
4828 set_bit(SIGNAL_ASENDER, &tconn->flags);
4829
4830 spin_lock_irq(&tconn->req_lock);
4831 idr_for_each_entry(&tconn->volumes, mdev, i) {
4832 not_empty = !list_empty(&mdev->done_ee);
4833 if (not_empty)
4834 break;
4835 }
4836 spin_unlock_irq(&tconn->req_lock);
4837 } while (not_empty);
4838
4839 return 0;
4840 }
4841
4842 struct asender_cmd {
4843 size_t pkt_size;
4844 int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
4845 };
4846
4847 static struct asender_cmd asender_tbl[] = {
4848 [P_PING] = { 0, got_Ping },
4849 [P_PING_ACK] = { 0, got_PingAck },
4850 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4851 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4852 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4853 [P_DISCARD_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
4854 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4855 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4856 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
4857 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4858 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4859 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4860 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
4861 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
4862 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
4863 [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
4864 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
4865 };
4866
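/* The asender ("acknowledge sender") thread owns the meta socket: it sends
 * and answers pings, flushes the done_ee lists (which is what ultimately
 * sends the acks), and receives meta packets one at a time, dispatching
 * them through asender_tbl above. */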
4867 int drbd_asender(struct drbd_thread *thi)
4868 {
4869 struct drbd_tconn *tconn = thi->tconn;
4870 struct asender_cmd *cmd = NULL;
4871 struct packet_info pi;
4872 int rv;
4873 void *buf = tconn->meta.rbuf;
4874 int received = 0;
4875 unsigned int header_size = drbd_header_size(tconn);
4876 int expect = header_size;
4877 int ping_timeout_active = 0;
4878
4879 current->policy = SCHED_RR; /* Make this a realtime task! */
4880 current->rt_priority = 2; /* more important than all other tasks */
4881
4882 while (get_t_state(thi) == RUNNING) {
4883 drbd_thread_current_set_cpu(thi);
4884 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
4885 if (drbd_send_ping(tconn)) {
4886 conn_err(tconn, "drbd_send_ping has failed\n");
4887 goto reconnect;
4888 }
4889 tconn->meta.socket->sk->sk_rcvtimeo =
4890 tconn->net_conf->ping_timeo*HZ/10;
4891 ping_timeout_active = 1;
4892 }
4893
4894 /* TODO: conditionally cork; it may hurt latency if we cork without
4895 much to send */
4896 if (!tconn->net_conf->no_cork)
4897 drbd_tcp_cork(tconn->meta.socket);
4898 if (tconn_process_done_ee(tconn)) {
4899 conn_err(tconn, "tconn_process_done_ee() failed\n");
4900 goto reconnect;
4901 }
4902 /* but unconditionally uncork unless disabled */
4903 if (!tconn->net_conf->no_cork)
4904 drbd_tcp_uncork(tconn->meta.socket);
4905
4906 /* short circuit, recv_msg would return EINTR anyways. */
4907 if (signal_pending(current))
4908 continue;
4909
4910 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4911 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4912
4913 flush_signals(current);
4914
4915 /* Note:
4916 * -EINTR (on meta) we got a signal
4917 * -EAGAIN (on meta) rcvtimeo expired
4918 * -ECONNRESET other side closed the connection
4919 * -ERESTARTSYS (on data) we got a signal
4920 * rv < 0 other than above: unexpected error!
4921 * rv == expected: full header or command
4922 * rv < expected: "woken" by signal during receive
4923 * rv == 0 : "connection shut down by peer"
4924 */
4925 if (likely(rv > 0)) {
4926 received += rv;
4927 buf += rv;
4928 } else if (rv == 0) {
4929 conn_err(tconn, "meta connection shut down by peer.\n");
4930 goto reconnect;
4931 } else if (rv == -EAGAIN) {
4932 /* If the data socket received something meanwhile,
4933 * that is good enough: peer is still alive. */
4934 if (time_after(tconn->last_received,
4935 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
4936 continue;
4937 if (ping_timeout_active) {
4938 conn_err(tconn, "PingAck did not arrive in time.\n");
4939 goto reconnect;
4940 }
4941 set_bit(SEND_PING, &tconn->flags);
4942 continue;
4943 } else if (rv == -EINTR) {
4944 continue;
4945 } else {
4946 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
4947 goto reconnect;
4948 }
4949
4950 if (received == expect && cmd == NULL) {
4951 if (decode_header(tconn, tconn->meta.rbuf, &pi))
4952 goto reconnect;
4953 cmd = &asender_tbl[pi.cmd];
4954 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
4955 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
4956 pi.cmd, pi.size);
4957 goto disconnect;
4958 }
4959 expect = header_size + cmd->pkt_size;
4960 if (pi.size != expect - header_size) {
4961 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
4962 pi.cmd, pi.size);
4963 goto reconnect;
4964 }
4965 }
4966 if (received == expect) {
4967 bool err;
4968
4969 err = cmd->fn(tconn, &pi);
4970 if (err) {
4971 conn_err(tconn, "%pf failed\n", cmd->fn);
4972 goto reconnect;
4973 }
4974
4975 tconn->last_received = jiffies;
4976
4977 /* the idle_timeout (ping-int)
4978 * has been restored in got_PingAck() */
4979 if (cmd == &asender_tbl[P_PING_ACK])
4980 ping_timeout_active = 0;
4981
4982 buf = tconn->meta.rbuf;
4983 received = 0;
4984 expect = header_size;
4985 cmd = NULL;
4986 }
4987 }
4988
4989 if (0) {
4990 reconnect:
4991 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4992 }
4993 if (0) {
4994 disconnect:
4995 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4996 }
4997 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4998
4999 conn_info(tconn, "asender terminated\n");
5000
5001 return 0;
5002 }