block, drivers, fs: rename REQ_FLUSH to REQ_PREFLUSH
drivers/block/drbd/drbd_receiver.c
1 /*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
50
51 #define PRO_FEATURES (FF_TRIM)
52
53 struct packet_info {
54 enum drbd_packet cmd;
55 unsigned int size;
56 unsigned int vnr;
57 void *data;
58 };
59
60 enum finish_epoch {
61 FE_STILL_LIVE,
62 FE_DESTROYED,
63 FE_RECYCLED,
64 };
65
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
72
73
74 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
75
76 /*
77 * some helper functions to deal with single linked page lists,
78 * page->private being our "next" pointer.
79 */
80
81 /* If at least n pages are linked at head, get n pages off.
82 * Otherwise, don't modify head, and return NULL.
83 * Locking is the responsibility of the caller.
84 */
85 static struct page *page_chain_del(struct page **head, int n)
86 {
87 struct page *page;
88 struct page *tmp;
89
90 BUG_ON(!n);
91 BUG_ON(!head);
92
93 page = *head;
94
95 if (!page)
96 return NULL;
97
98 while (page) {
99 tmp = page_chain_next(page);
100 if (--n == 0)
101 break; /* found sufficient pages */
102 if (tmp == NULL)
103 /* insufficient pages, don't use any of them. */
104 return NULL;
105 page = tmp;
106 }
107
108 /* add end of list marker for the returned list */
109 set_page_private(page, 0);
110 /* actual return value, and adjustment of head */
111 page = *head;
112 *head = tmp;
113 return page;
114 }
115
116 /* may be used outside of locks to find the tail of a (usually short)
117 * "private" page chain, before adding it back to a global chain head
118 * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
120 {
121 struct page *tmp;
122 int i = 1;
123 while ((tmp = page_chain_next(page)))
124 ++i, page = tmp;
125 if (len)
126 *len = i;
127 return page;
128 }
129
130 static int page_chain_free(struct page *page)
131 {
132 struct page *tmp;
133 int i = 0;
134 page_chain_for_each_safe(page, tmp) {
135 put_page(page);
136 ++i;
137 }
138 return i;
139 }
140
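/* Prepend the chain chain_first..chain_last to *head.
 * As with page_chain_del(), locking (drbd_pp_lock) is the caller's responsibility. */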
141 static void page_chain_add(struct page **head,
142 struct page *chain_first, struct page *chain_last)
143 {
144 #if 1
145 struct page *tmp;
146 tmp = page_chain_tail(chain_first, NULL);
147 BUG_ON(tmp != chain_last);
148 #endif
149
150 /* add chain to head */
151 set_page_private(chain_last, (unsigned long)*head);
152 *head = chain_first;
153 }
154
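/* Try to get @number pages: first as one chunk from the global drbd_pp_pool,
 * otherwise page by page from the allocator. On partial failure, the pages
 * already allocated are given back to the pool and NULL is returned. */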
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156 unsigned int number)
157 {
158 struct page *page = NULL;
159 struct page *tmp = NULL;
160 unsigned int i = 0;
161
162 /* Yes, testing drbd_pp_vacant outside the lock is racy.
163 * So what. It saves a spin_lock. */
164 if (drbd_pp_vacant >= number) {
165 spin_lock(&drbd_pp_lock);
166 page = page_chain_del(&drbd_pp_pool, number);
167 if (page)
168 drbd_pp_vacant -= number;
169 spin_unlock(&drbd_pp_lock);
170 if (page)
171 return page;
172 }
173
174 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175 * "criss-cross" setup, that might cause write-out on some other DRBD,
176 * which in turn might block on the other node at this very place. */
177 for (i = 0; i < number; i++) {
178 tmp = alloc_page(GFP_TRY);
179 if (!tmp)
180 break;
181 set_page_private(tmp, (unsigned long)page);
182 page = tmp;
183 }
184
185 if (i == number)
186 return page;
187
188 /* Not enough pages immediately available this time.
189 * No need to jump around here, drbd_alloc_pages will retry this
190 * function "soon". */
191 if (page) {
192 tmp = page_chain_tail(page, NULL);
193 spin_lock(&drbd_pp_lock);
194 page_chain_add(&drbd_pp_pool, page, tmp);
195 drbd_pp_vacant += i;
196 spin_unlock(&drbd_pp_lock);
197 }
198 return NULL;
199 }
200
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202 struct list_head *to_be_freed)
203 {
204 struct drbd_peer_request *peer_req, *tmp;
205
206 /* The EEs are always appended to the end of the list. Since
207 they are sent in order over the wire, they have to finish
208 	   in order. As soon as we see the first unfinished one, we can
209 	   stop examining the list... */
210
211 list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212 if (drbd_peer_req_has_active_page(peer_req))
213 break;
214 list_move(&peer_req->w.list, to_be_freed);
215 }
216 }
217
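/* Collect finished net_ee entries under the req_lock, then free them outside of it. */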
218 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
219 {
220 LIST_HEAD(reclaimed);
221 struct drbd_peer_request *peer_req, *t;
222
223 spin_lock_irq(&device->resource->req_lock);
224 reclaim_finished_net_peer_reqs(device, &reclaimed);
225 spin_unlock_irq(&device->resource->req_lock);
226 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
227 drbd_free_net_peer_req(device, peer_req);
228 }
229
230 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
231 {
232 struct drbd_peer_device *peer_device;
233 int vnr;
234
235 rcu_read_lock();
236 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
237 struct drbd_device *device = peer_device->device;
238 if (!atomic_read(&device->pp_in_use_by_net))
239 continue;
240
241 kref_get(&device->kref);
242 rcu_read_unlock();
243 drbd_reclaim_net_peer_reqs(device);
244 kref_put(&device->kref, drbd_destroy_device);
245 rcu_read_lock();
246 }
247 rcu_read_unlock();
248 }
249
250 /**
251 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
252  * @peer_device:	DRBD peer device.
253 * @number: number of pages requested
254 * @retry: whether to retry, if not enough pages are available right now
255 *
256  * Tries to allocate @number pages, first from our own page pool, then from
257 * the kernel.
258 * Possibly retry until DRBD frees sufficient pages somewhere else.
259 *
260 * If this allocation would exceed the max_buffers setting, we throttle
261 * allocation (schedule_timeout) to give the system some room to breathe.
262 *
263  * We do not use max-buffers as a hard limit, because it could lead to
264 * congestion and further to a distributed deadlock during online-verify or
265 * (checksum based) resync, if the max-buffers, socket buffer sizes and
266 * resync-rate settings are mis-configured.
267 *
268 * Returns a page chain linked via page->private.
269 */
270 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
271 bool retry)
272 {
273 struct drbd_device *device = peer_device->device;
274 struct page *page = NULL;
275 struct net_conf *nc;
276 DEFINE_WAIT(wait);
277 unsigned int mxb;
278
279 rcu_read_lock();
280 nc = rcu_dereference(peer_device->connection->net_conf);
281 mxb = nc ? nc->max_buffers : 1000000;
282 rcu_read_unlock();
283
284 if (atomic_read(&device->pp_in_use) < mxb)
285 page = __drbd_alloc_pages(device, number);
286
287 /* Try to keep the fast path fast, but occasionally we need
288  * to reclaim the pages we lent to the network stack. */
289 if (page && atomic_read(&device->pp_in_use_by_net) > 512)
290 drbd_reclaim_net_peer_reqs(device);
291
292 while (page == NULL) {
293 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
294
295 drbd_reclaim_net_peer_reqs(device);
296
297 if (atomic_read(&device->pp_in_use) < mxb) {
298 page = __drbd_alloc_pages(device, number);
299 if (page)
300 break;
301 }
302
303 if (!retry)
304 break;
305
306 if (signal_pending(current)) {
307 drbd_warn(device, "drbd_alloc_pages interrupted!\n");
308 break;
309 }
310
311 if (schedule_timeout(HZ/10) == 0)
312 mxb = UINT_MAX;
313 }
314 finish_wait(&drbd_pp_wait, &wait);
315
316 if (page)
317 atomic_add(number, &device->pp_in_use);
318 return page;
319 }
320
321 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
322  * Is also used from inside another spin_lock_irq(&resource->req_lock);
323 * Either links the page chain back to the global pool,
324 * or returns all pages to the system. */
325 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
326 {
327 atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
328 int i;
329
330 if (page == NULL)
331 return;
332
333 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
334 i = page_chain_free(page);
335 else {
336 struct page *tmp;
337 tmp = page_chain_tail(page, &i);
338 spin_lock(&drbd_pp_lock);
339 page_chain_add(&drbd_pp_pool, page, tmp);
340 drbd_pp_vacant += i;
341 spin_unlock(&drbd_pp_lock);
342 }
343 i = atomic_sub_return(i, a);
344 if (i < 0)
345 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
346 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
347 wake_up(&drbd_pp_wait);
348 }
349
350 /*
351 You need to hold the req_lock:
352 _drbd_wait_ee_list_empty()
353
354 You must not have the req_lock:
355 drbd_free_peer_req()
356 drbd_alloc_peer_req()
357 drbd_free_peer_reqs()
358 drbd_ee_fix_bhs()
359 drbd_finish_peer_reqs()
360 drbd_clear_done_ee()
361 drbd_wait_ee_list_empty()
362 */
363
364 struct drbd_peer_request *
365 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
366 unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
367 {
368 struct drbd_device *device = peer_device->device;
369 struct drbd_peer_request *peer_req;
370 struct page *page = NULL;
371 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
372
373 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
374 return NULL;
375
376 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
377 if (!peer_req) {
378 if (!(gfp_mask & __GFP_NOWARN))
379 drbd_err(device, "%s: allocation failed\n", __func__);
380 return NULL;
381 }
382
383 if (has_payload && data_size) {
384 page = drbd_alloc_pages(peer_device, nr_pages,
385 gfpflags_allow_blocking(gfp_mask));
386 if (!page)
387 goto fail;
388 }
389
390 memset(peer_req, 0, sizeof(*peer_req));
391 INIT_LIST_HEAD(&peer_req->w.list);
392 drbd_clear_interval(&peer_req->i);
393 peer_req->i.size = data_size;
394 peer_req->i.sector = sector;
395 peer_req->submit_jif = jiffies;
396 peer_req->peer_device = peer_device;
397 peer_req->pages = page;
398 /*
399 * The block_id is opaque to the receiver. It is not endianness
400 * converted, and sent back to the sender unchanged.
401 */
402 peer_req->block_id = id;
403
404 return peer_req;
405
406 fail:
407 mempool_free(peer_req, drbd_ee_mempool);
408 return NULL;
409 }
410
411 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
412 int is_net)
413 {
414 might_sleep();
415 if (peer_req->flags & EE_HAS_DIGEST)
416 kfree(peer_req->digest);
417 drbd_free_pages(device, peer_req->pages, is_net);
418 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
419 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
420 if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
421 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
422 drbd_al_complete_io(device, &peer_req->i);
423 }
424 mempool_free(peer_req, drbd_ee_mempool);
425 }
426
427 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
428 {
429 LIST_HEAD(work_list);
430 struct drbd_peer_request *peer_req, *t;
431 int count = 0;
432 int is_net = list == &device->net_ee;
433
434 spin_lock_irq(&device->resource->req_lock);
435 list_splice_init(list, &work_list);
436 spin_unlock_irq(&device->resource->req_lock);
437
438 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
439 __drbd_free_peer_req(device, peer_req, is_net);
440 count++;
441 }
442 return count;
443 }
444
445 /*
446 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
447 */
448 static int drbd_finish_peer_reqs(struct drbd_device *device)
449 {
450 LIST_HEAD(work_list);
451 LIST_HEAD(reclaimed);
452 struct drbd_peer_request *peer_req, *t;
453 int err = 0;
454
455 spin_lock_irq(&device->resource->req_lock);
456 reclaim_finished_net_peer_reqs(device, &reclaimed);
457 list_splice_init(&device->done_ee, &work_list);
458 spin_unlock_irq(&device->resource->req_lock);
459
460 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
461 drbd_free_net_peer_req(device, peer_req);
462
463 /* possible callbacks here:
464 	 * e_end_block, e_end_resync_block, and e_send_superseded;
465 * all ignore the last argument.
466 */
467 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
468 int err2;
469
470 /* list_del not necessary, next/prev members not touched */
471 err2 = peer_req->w.cb(&peer_req->w, !!err);
472 if (!err)
473 err = err2;
474 drbd_free_peer_req(device, peer_req);
475 }
476 wake_up(&device->ee_wait);
477
478 return err;
479 }
480
481 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
482 struct list_head *head)
483 {
484 DEFINE_WAIT(wait);
485
486 /* avoids spin_lock/unlock
487 * and calling prepare_to_wait in the fast path */
488 while (!list_empty(head)) {
489 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
490 spin_unlock_irq(&device->resource->req_lock);
491 io_schedule();
492 finish_wait(&device->ee_wait, &wait);
493 spin_lock_irq(&device->resource->req_lock);
494 }
495 }
496
497 static void drbd_wait_ee_list_empty(struct drbd_device *device,
498 struct list_head *head)
499 {
500 spin_lock_irq(&device->resource->req_lock);
501 _drbd_wait_ee_list_empty(device, head);
502 spin_unlock_irq(&device->resource->req_lock);
503 }
504
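/* Receive up to @size bytes into @buf; with flags == 0 this blocks until the
 * full size has arrived (MSG_WAITALL). Returns bytes received or a negative errno. */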
505 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
506 {
507 struct kvec iov = {
508 .iov_base = buf,
509 .iov_len = size,
510 };
511 struct msghdr msg = {
512 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
513 };
514 return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
515 }
516
517 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
518 {
519 int rv;
520
521 rv = drbd_recv_short(connection->data.socket, buf, size, 0);
522
523 if (rv < 0) {
524 if (rv == -ECONNRESET)
525 drbd_info(connection, "sock was reset by peer\n");
526 else if (rv != -ERESTARTSYS)
527 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
528 } else if (rv == 0) {
529 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
530 long t;
531 rcu_read_lock();
532 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
533 rcu_read_unlock();
534
535 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
536
537 if (t)
538 goto out;
539 }
540 drbd_info(connection, "sock was shut down by peer\n");
541 }
542
543 if (rv != size)
544 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
545
546 out:
547 return rv;
548 }
549
550 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
551 {
552 int err;
553
554 err = drbd_recv(connection, buf, size);
555 if (err != size) {
556 if (err >= 0)
557 err = -EIO;
558 } else
559 err = 0;
560 return err;
561 }
562
563 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
564 {
565 int err;
566
567 err = drbd_recv_all(connection, buf, size);
568 if (err && !signal_pending(current))
569 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
570 return err;
571 }
572
573 /* quoting tcp(7):
574 * On individual connections, the socket buffer size must be set prior to the
575 * listen(2) or connect(2) calls in order to have it take effect.
576 * This is our wrapper to do so.
577 */
578 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
579 unsigned int rcv)
580 {
581 /* open coded SO_SNDBUF, SO_RCVBUF */
582 if (snd) {
583 sock->sk->sk_sndbuf = snd;
584 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
585 }
586 if (rcv) {
587 sock->sk->sk_rcvbuf = rcv;
588 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
589 }
590 }
591
592 static struct socket *drbd_try_connect(struct drbd_connection *connection)
593 {
594 const char *what;
595 struct socket *sock;
596 struct sockaddr_in6 src_in6;
597 struct sockaddr_in6 peer_in6;
598 struct net_conf *nc;
599 int err, peer_addr_len, my_addr_len;
600 int sndbuf_size, rcvbuf_size, connect_int;
601 int disconnect_on_error = 1;
602
603 rcu_read_lock();
604 nc = rcu_dereference(connection->net_conf);
605 if (!nc) {
606 rcu_read_unlock();
607 return NULL;
608 }
609 sndbuf_size = nc->sndbuf_size;
610 rcvbuf_size = nc->rcvbuf_size;
611 connect_int = nc->connect_int;
612 rcu_read_unlock();
613
614 my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
615 memcpy(&src_in6, &connection->my_addr, my_addr_len);
616
617 if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
618 src_in6.sin6_port = 0;
619 else
620 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
621
622 peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
623 memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
624
625 what = "sock_create_kern";
626 err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
627 SOCK_STREAM, IPPROTO_TCP, &sock);
628 if (err < 0) {
629 sock = NULL;
630 goto out;
631 }
632
633 sock->sk->sk_rcvtimeo =
634 sock->sk->sk_sndtimeo = connect_int * HZ;
635 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
636
637 /* explicitly bind to the configured IP as source IP
638 * for the outgoing connections.
639 * This is needed for multihomed hosts and to be
640 * able to use lo: interfaces for drbd.
641 	 * Make sure to use 0 as the port number, so Linux selects
642 * a free one dynamically.
643 */
644 what = "bind before connect";
645 err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
646 if (err < 0)
647 goto out;
648
649 /* connect may fail, peer not yet available.
650 * stay C_WF_CONNECTION, don't go Disconnecting! */
651 disconnect_on_error = 0;
652 what = "connect";
653 err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
654
655 out:
656 if (err < 0) {
657 if (sock) {
658 sock_release(sock);
659 sock = NULL;
660 }
661 switch (-err) {
662 /* timeout, busy, signal pending */
663 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
664 case EINTR: case ERESTARTSYS:
665 /* peer not (yet) available, network problem */
666 case ECONNREFUSED: case ENETUNREACH:
667 case EHOSTDOWN: case EHOSTUNREACH:
668 disconnect_on_error = 0;
669 break;
670 default:
671 drbd_err(connection, "%s failed, err = %d\n", what, err);
672 }
673 if (disconnect_on_error)
674 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
675 }
676
677 return sock;
678 }
679
680 struct accept_wait_data {
681 struct drbd_connection *connection;
682 struct socket *s_listen;
683 struct completion door_bell;
684 void (*original_sk_state_change)(struct sock *sk);
685
686 };
687
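/* Replacement sk_state_change callback for the listen socket: ring the door_bell
 * completion once an incoming connection reaches TCP_ESTABLISHED, then chain to
 * the original callback. */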
688 static void drbd_incoming_connection(struct sock *sk)
689 {
690 struct accept_wait_data *ad = sk->sk_user_data;
691 void (*state_change)(struct sock *sk);
692
693 state_change = ad->original_sk_state_change;
694 if (sk->sk_state == TCP_ESTABLISHED)
695 complete(&ad->door_bell);
696 state_change(sk);
697 }
698
699 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
700 {
701 int err, sndbuf_size, rcvbuf_size, my_addr_len;
702 struct sockaddr_in6 my_addr;
703 struct socket *s_listen;
704 struct net_conf *nc;
705 const char *what;
706
707 rcu_read_lock();
708 nc = rcu_dereference(connection->net_conf);
709 if (!nc) {
710 rcu_read_unlock();
711 return -EIO;
712 }
713 sndbuf_size = nc->sndbuf_size;
714 rcvbuf_size = nc->rcvbuf_size;
715 rcu_read_unlock();
716
717 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
718 memcpy(&my_addr, &connection->my_addr, my_addr_len);
719
720 what = "sock_create_kern";
721 err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
722 SOCK_STREAM, IPPROTO_TCP, &s_listen);
723 if (err) {
724 s_listen = NULL;
725 goto out;
726 }
727
728 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
729 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
730
731 what = "bind before listen";
732 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
733 if (err < 0)
734 goto out;
735
736 ad->s_listen = s_listen;
737 write_lock_bh(&s_listen->sk->sk_callback_lock);
738 ad->original_sk_state_change = s_listen->sk->sk_state_change;
739 s_listen->sk->sk_state_change = drbd_incoming_connection;
740 s_listen->sk->sk_user_data = ad;
741 write_unlock_bh(&s_listen->sk->sk_callback_lock);
742
743 what = "listen";
744 err = s_listen->ops->listen(s_listen, 5);
745 if (err < 0)
746 goto out;
747
748 return 0;
749 out:
750 if (s_listen)
751 sock_release(s_listen);
752 if (err < 0) {
753 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
754 drbd_err(connection, "%s failed, err = %d\n", what, err);
755 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
756 }
757 }
758
759 return -EIO;
760 }
761
762 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
763 {
764 write_lock_bh(&sk->sk_callback_lock);
765 sk->sk_state_change = ad->original_sk_state_change;
766 sk->sk_user_data = NULL;
767 write_unlock_bh(&sk->sk_callback_lock);
768 }
769
770 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
771 {
772 int timeo, connect_int, err = 0;
773 struct socket *s_estab = NULL;
774 struct net_conf *nc;
775
776 rcu_read_lock();
777 nc = rcu_dereference(connection->net_conf);
778 if (!nc) {
779 rcu_read_unlock();
780 return NULL;
781 }
782 connect_int = nc->connect_int;
783 rcu_read_unlock();
784
785 timeo = connect_int * HZ;
786 /* 28.5% random jitter */
787 timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
788
789 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
790 if (err <= 0)
791 return NULL;
792
793 err = kernel_accept(ad->s_listen, &s_estab, 0);
794 if (err < 0) {
795 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
796 drbd_err(connection, "accept failed, err = %d\n", err);
797 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
798 }
799 }
800
801 if (s_estab)
802 unregister_state_change(s_estab->sk, ad);
803
804 return s_estab;
805 }
806
807 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
808
809 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
810 enum drbd_packet cmd)
811 {
812 if (!conn_prepare_command(connection, sock))
813 return -EIO;
814 return conn_send_command(connection, sock, cmd, 0, NULL, 0);
815 }
816
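/* Read and decode the very first packet on a freshly connected socket; returns
 * the decoded packet command (expected: P_INITIAL_DATA or P_INITIAL_META) or a
 * negative error. */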
817 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
818 {
819 unsigned int header_size = drbd_header_size(connection);
820 struct packet_info pi;
821 struct net_conf *nc;
822 int err;
823
824 rcu_read_lock();
825 nc = rcu_dereference(connection->net_conf);
826 if (!nc) {
827 rcu_read_unlock();
828 return -EIO;
829 }
830 sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
831 rcu_read_unlock();
832
833 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
834 if (err != header_size) {
835 if (err >= 0)
836 err = -EIO;
837 return err;
838 }
839 err = decode_header(connection, connection->data.rbuf, &pi);
840 if (err)
841 return err;
842 return pi.cmd;
843 }
844
845 /**
846 * drbd_socket_okay() - Free the socket if its connection is not okay
847 * @sock: pointer to the pointer to the socket.
848 */
849 static bool drbd_socket_okay(struct socket **sock)
850 {
851 int rr;
852 char tb[4];
853
854 if (!*sock)
855 return false;
856
857 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
858
859 if (rr > 0 || rr == -EAGAIN) {
860 return true;
861 } else {
862 sock_release(*sock);
863 *sock = NULL;
864 return false;
865 }
866 }
867
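/* Both sockets must exist and still look healthy after a short settle delay
 * (sock_check_timeo, falling back to ping_timeo) before we consider the
 * connection established. */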
868 static bool connection_established(struct drbd_connection *connection,
869 struct socket **sock1,
870 struct socket **sock2)
871 {
872 struct net_conf *nc;
873 int timeout;
874 bool ok;
875
876 if (!*sock1 || !*sock2)
877 return false;
878
879 rcu_read_lock();
880 nc = rcu_dereference(connection->net_conf);
881 timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
882 rcu_read_unlock();
883 schedule_timeout_interruptible(timeout);
884
885 ok = drbd_socket_okay(sock1);
886 ok = drbd_socket_okay(sock2) && ok;
887
888 return ok;
889 }
890
891 /* Gets called if a connection is established, or if a new minor gets created
892 in a connection */
893 int drbd_connected(struct drbd_peer_device *peer_device)
894 {
895 struct drbd_device *device = peer_device->device;
896 int err;
897
898 atomic_set(&device->packet_seq, 0);
899 device->peer_seq = 0;
900
901 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
902 &peer_device->connection->cstate_mutex :
903 &device->own_state_mutex;
904
905 err = drbd_send_sync_param(peer_device);
906 if (!err)
907 err = drbd_send_sizes(peer_device, 0, 0);
908 if (!err)
909 err = drbd_send_uuids(peer_device);
910 if (!err)
911 err = drbd_send_current_state(peer_device);
912 clear_bit(USE_DEGR_WFC_T, &device->flags);
913 clear_bit(RESIZE_PENDING, &device->flags);
914 atomic_set(&device->ap_in_flight, 0);
915 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
916 return err;
917 }
918
919 /*
920 * return values:
921 * 1 yes, we have a valid connection
922 * 0 oops, did not work out, please try again
923 * -1 peer talks different language,
924 * no point in trying again, please go standalone.
925 * -2 We do not have a network config...
926 */
927 static int conn_connect(struct drbd_connection *connection)
928 {
929 struct drbd_socket sock, msock;
930 struct drbd_peer_device *peer_device;
931 struct net_conf *nc;
932 int vnr, timeout, h;
933 bool discard_my_data, ok;
934 enum drbd_state_rv rv;
935 struct accept_wait_data ad = {
936 .connection = connection,
937 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
938 };
939
940 clear_bit(DISCONNECT_SENT, &connection->flags);
941 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
942 return -2;
943
944 mutex_init(&sock.mutex);
945 sock.sbuf = connection->data.sbuf;
946 sock.rbuf = connection->data.rbuf;
947 sock.socket = NULL;
948 mutex_init(&msock.mutex);
949 msock.sbuf = connection->meta.sbuf;
950 msock.rbuf = connection->meta.rbuf;
951 msock.socket = NULL;
952
953 /* Assume that the peer only understands protocol 80 until we know better. */
954 connection->agreed_pro_version = 80;
955
956 if (prepare_listen_socket(connection, &ad))
957 return 0;
958
959 do {
960 struct socket *s;
961
962 s = drbd_try_connect(connection);
963 if (s) {
964 if (!sock.socket) {
965 sock.socket = s;
966 send_first_packet(connection, &sock, P_INITIAL_DATA);
967 } else if (!msock.socket) {
968 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
969 msock.socket = s;
970 send_first_packet(connection, &msock, P_INITIAL_META);
971 } else {
972 drbd_err(connection, "Logic error in conn_connect()\n");
973 goto out_release_sockets;
974 }
975 }
976
977 if (connection_established(connection, &sock.socket, &msock.socket))
978 break;
979
980 retry:
981 s = drbd_wait_for_connect(connection, &ad);
982 if (s) {
983 int fp = receive_first_packet(connection, s);
984 drbd_socket_okay(&sock.socket);
985 drbd_socket_okay(&msock.socket);
986 switch (fp) {
987 case P_INITIAL_DATA:
988 if (sock.socket) {
989 drbd_warn(connection, "initial packet S crossed\n");
990 sock_release(sock.socket);
991 sock.socket = s;
992 goto randomize;
993 }
994 sock.socket = s;
995 break;
996 case P_INITIAL_META:
997 set_bit(RESOLVE_CONFLICTS, &connection->flags);
998 if (msock.socket) {
999 drbd_warn(connection, "initial packet M crossed\n");
1000 sock_release(msock.socket);
1001 msock.socket = s;
1002 goto randomize;
1003 }
1004 msock.socket = s;
1005 break;
1006 default:
1007 drbd_warn(connection, "Error receiving initial packet\n");
1008 sock_release(s);
1009 randomize:
1010 if (prandom_u32() & 1)
1011 goto retry;
1012 }
1013 }
1014
1015 if (connection->cstate <= C_DISCONNECTING)
1016 goto out_release_sockets;
1017 if (signal_pending(current)) {
1018 flush_signals(current);
1019 smp_rmb();
1020 if (get_t_state(&connection->receiver) == EXITING)
1021 goto out_release_sockets;
1022 }
1023
1024 ok = connection_established(connection, &sock.socket, &msock.socket);
1025 } while (!ok);
1026
1027 if (ad.s_listen)
1028 sock_release(ad.s_listen);
1029
1030 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1031 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1032
1033 sock.socket->sk->sk_allocation = GFP_NOIO;
1034 msock.socket->sk->sk_allocation = GFP_NOIO;
1035
1036 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1037 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1038
1039 /* NOT YET ...
1040 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1041 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1042 * first set it to the P_CONNECTION_FEATURES timeout,
1043 * which we set to 4x the configured ping_timeout. */
1044 rcu_read_lock();
1045 nc = rcu_dereference(connection->net_conf);
1046
1047 sock.socket->sk->sk_sndtimeo =
1048 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1049
1050 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1051 timeout = nc->timeout * HZ / 10;
1052 discard_my_data = nc->discard_my_data;
1053 rcu_read_unlock();
1054
1055 msock.socket->sk->sk_sndtimeo = timeout;
1056
1057 /* we don't want delays.
1058 * we use TCP_CORK where appropriate, though */
1059 drbd_tcp_nodelay(sock.socket);
1060 drbd_tcp_nodelay(msock.socket);
1061
1062 connection->data.socket = sock.socket;
1063 connection->meta.socket = msock.socket;
1064 connection->last_received = jiffies;
1065
1066 h = drbd_do_features(connection);
1067 if (h <= 0)
1068 return h;
1069
1070 if (connection->cram_hmac_tfm) {
1071 /* drbd_request_state(device, NS(conn, WFAuth)); */
1072 switch (drbd_do_auth(connection)) {
1073 case -1:
1074 drbd_err(connection, "Authentication of peer failed\n");
1075 return -1;
1076 case 0:
1077 drbd_err(connection, "Authentication of peer failed, trying again.\n");
1078 return 0;
1079 }
1080 }
1081
1082 connection->data.socket->sk->sk_sndtimeo = timeout;
1083 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1084
1085 if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1086 return -1;
1087
1088 /* Prevent a race between resync-handshake and
1089 * being promoted to Primary.
1090 *
1091 * Grab and release the state mutex, so we know that any current
1092 * drbd_set_role() is finished, and any incoming drbd_set_role
1093 * will see the STATE_SENT flag, and wait for it to be cleared.
1094 */
1095 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1096 mutex_lock(peer_device->device->state_mutex);
1097
1098 set_bit(STATE_SENT, &connection->flags);
1099
1100 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101 mutex_unlock(peer_device->device->state_mutex);
1102
1103 rcu_read_lock();
1104 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1105 struct drbd_device *device = peer_device->device;
1106 kref_get(&device->kref);
1107 rcu_read_unlock();
1108
1109 if (discard_my_data)
1110 set_bit(DISCARD_MY_DATA, &device->flags);
1111 else
1112 clear_bit(DISCARD_MY_DATA, &device->flags);
1113
1114 drbd_connected(peer_device);
1115 kref_put(&device->kref, drbd_destroy_device);
1116 rcu_read_lock();
1117 }
1118 rcu_read_unlock();
1119
1120 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1121 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1122 clear_bit(STATE_SENT, &connection->flags);
1123 return 0;
1124 }
1125
1126 drbd_thread_start(&connection->ack_receiver);
1127 /* opencoded create_singlethread_workqueue(),
1128 * to be able to use format string arguments */
1129 connection->ack_sender =
1130 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1131 if (!connection->ack_sender) {
1132 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1133 return 0;
1134 }
1135
1136 mutex_lock(&connection->resource->conf_update);
1137 /* The discard_my_data flag is a single-shot modifier to the next
1138 * connection attempt, the handshake of which is now well underway.
1139 * No need for rcu style copying of the whole struct
1140 * just to clear a single value. */
1141 connection->net_conf->discard_my_data = 0;
1142 mutex_unlock(&connection->resource->conf_update);
1143
1144 return h;
1145
1146 out_release_sockets:
1147 if (ad.s_listen)
1148 sock_release(ad.s_listen);
1149 if (sock.socket)
1150 sock_release(sock.socket);
1151 if (msock.socket)
1152 sock_release(msock.socket);
1153 return -1;
1154 }
1155
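/* Decode one of the three on-the-wire header formats (h80, h95, h100), selected
 * by the agreed protocol version, into @pi. */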
1156 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1157 {
1158 unsigned int header_size = drbd_header_size(connection);
1159
1160 if (header_size == sizeof(struct p_header100) &&
1161 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1162 struct p_header100 *h = header;
1163 if (h->pad != 0) {
1164 drbd_err(connection, "Header padding is not zero\n");
1165 return -EINVAL;
1166 }
1167 pi->vnr = be16_to_cpu(h->volume);
1168 pi->cmd = be16_to_cpu(h->command);
1169 pi->size = be32_to_cpu(h->length);
1170 } else if (header_size == sizeof(struct p_header95) &&
1171 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1172 struct p_header95 *h = header;
1173 pi->cmd = be16_to_cpu(h->command);
1174 pi->size = be32_to_cpu(h->length);
1175 pi->vnr = 0;
1176 } else if (header_size == sizeof(struct p_header80) &&
1177 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1178 struct p_header80 *h = header;
1179 pi->cmd = be16_to_cpu(h->command);
1180 pi->size = be16_to_cpu(h->length);
1181 pi->vnr = 0;
1182 } else {
1183 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1184 be32_to_cpu(*(__be32 *)header),
1185 connection->agreed_pro_version);
1186 return -EINVAL;
1187 }
1188 pi->data = header + header_size;
1189 return 0;
1190 }
1191
1192 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1193 {
1194 void *buffer = connection->data.rbuf;
1195 int err;
1196
1197 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1198 if (err)
1199 return err;
1200
1201 err = decode_header(connection, buffer, pi);
1202 connection->last_received = jiffies;
1203
1204 return err;
1205 }
1206
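/* Flush the backing devices of all attached volumes of this connection, if the
 * configured write ordering requires it; on a flush failure, degrade to
 * drain-based write ordering (WO_DRAIN_IO). */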
1207 static void drbd_flush(struct drbd_connection *connection)
1208 {
1209 int rv;
1210 struct drbd_peer_device *peer_device;
1211 int vnr;
1212
1213 if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1214 rcu_read_lock();
1215 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1216 struct drbd_device *device = peer_device->device;
1217
1218 if (!get_ldev(device))
1219 continue;
1220 kref_get(&device->kref);
1221 rcu_read_unlock();
1222
1223 /* Right now, we have only this one synchronous code path
1224 * for flushes between request epochs.
1225 * We may want to make those asynchronous,
1226 * or at least parallelize the flushes to the volume devices.
1227 */
1228 device->flush_jif = jiffies;
1229 set_bit(FLUSH_PENDING, &device->flags);
1230 rv = blkdev_issue_flush(device->ldev->backing_bdev,
1231 GFP_NOIO, NULL);
1232 clear_bit(FLUSH_PENDING, &device->flags);
1233 if (rv) {
1234 drbd_info(device, "local disk flush failed with status %d\n", rv);
1235 /* would rather check on EOPNOTSUPP, but that is not reliable.
1236 * don't try again for ANY return value != 0
1237 * if (rv == -EOPNOTSUPP) */
1238 drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1239 }
1240 put_ldev(device);
1241 kref_put(&device->kref, drbd_destroy_device);
1242
1243 rcu_read_lock();
1244 if (rv)
1245 break;
1246 }
1247 rcu_read_unlock();
1248 }
1249 }
1250
1251 /**
1252  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1253  * @connection:	DRBD connection.
1254 * @epoch: Epoch object.
1255 * @ev: Epoch event.
1256 */
1257 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1258 struct drbd_epoch *epoch,
1259 enum epoch_event ev)
1260 {
1261 int epoch_size;
1262 struct drbd_epoch *next_epoch;
1263 enum finish_epoch rv = FE_STILL_LIVE;
1264
1265 spin_lock(&connection->epoch_lock);
1266 do {
1267 next_epoch = NULL;
1268
1269 epoch_size = atomic_read(&epoch->epoch_size);
1270
1271 switch (ev & ~EV_CLEANUP) {
1272 case EV_PUT:
1273 atomic_dec(&epoch->active);
1274 break;
1275 case EV_GOT_BARRIER_NR:
1276 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1277 break;
1278 case EV_BECAME_LAST:
1279 			/* nothing to do */
1280 break;
1281 }
1282
1283 if (epoch_size != 0 &&
1284 atomic_read(&epoch->active) == 0 &&
1285 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1286 if (!(ev & EV_CLEANUP)) {
1287 spin_unlock(&connection->epoch_lock);
1288 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1289 spin_lock(&connection->epoch_lock);
1290 }
1291 #if 0
1292 /* FIXME: dec unacked on connection, once we have
1293 * something to count pending connection packets in. */
1294 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1295 dec_unacked(epoch->connection);
1296 #endif
1297
1298 if (connection->current_epoch != epoch) {
1299 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1300 list_del(&epoch->list);
1301 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1302 connection->epochs--;
1303 kfree(epoch);
1304
1305 if (rv == FE_STILL_LIVE)
1306 rv = FE_DESTROYED;
1307 } else {
1308 epoch->flags = 0;
1309 atomic_set(&epoch->epoch_size, 0);
1310 /* atomic_set(&epoch->active, 0); is already zero */
1311 if (rv == FE_STILL_LIVE)
1312 rv = FE_RECYCLED;
1313 }
1314 }
1315
1316 if (!next_epoch)
1317 break;
1318
1319 epoch = next_epoch;
1320 } while (1);
1321
1322 spin_unlock(&connection->epoch_lock);
1323
1324 return rv;
1325 }
1326
1327 static enum write_ordering_e
1328 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1329 {
1330 struct disk_conf *dc;
1331
1332 dc = rcu_dereference(bdev->disk_conf);
1333
1334 if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1335 wo = WO_DRAIN_IO;
1336 if (wo == WO_DRAIN_IO && !dc->disk_drain)
1337 wo = WO_NONE;
1338
1339 return wo;
1340 }
1341
1342 /**
1343  * drbd_bump_write_ordering() - Fall back to another write ordering method
1344  * @resource:	DRBD resource; @bdev: backing device to take into account, or NULL.
1345  * @wo:		Write ordering method to try.
1346 */
1347 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1348 enum write_ordering_e wo)
1349 {
1350 struct drbd_device *device;
1351 enum write_ordering_e pwo;
1352 int vnr;
1353 static char *write_ordering_str[] = {
1354 [WO_NONE] = "none",
1355 [WO_DRAIN_IO] = "drain",
1356 [WO_BDEV_FLUSH] = "flush",
1357 };
1358
1359 pwo = resource->write_ordering;
1360 if (wo != WO_BDEV_FLUSH)
1361 wo = min(pwo, wo);
1362 rcu_read_lock();
1363 idr_for_each_entry(&resource->devices, device, vnr) {
1364 if (get_ldev(device)) {
1365 wo = max_allowed_wo(device->ldev, wo);
1366 if (device->ldev == bdev)
1367 bdev = NULL;
1368 put_ldev(device);
1369 }
1370 }
1371
1372 if (bdev)
1373 wo = max_allowed_wo(bdev, wo);
1374
1375 rcu_read_unlock();
1376
1377 resource->write_ordering = wo;
1378 if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1379 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1380 }
1381
1382 /**
1383 * drbd_submit_peer_request()
1384 * @device: DRBD device.
1385 * @peer_req: peer request
1386  * @op, @op_flags:	REQ_OP_* operation and additional bio flags
1387 *
1388 * May spread the pages to multiple bios,
1389 * depending on bio_add_page restrictions.
1390 *
1391 * Returns 0 if all bios have been submitted,
1392 * -ENOMEM if we could not allocate enough bios,
1393 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1394 * single page to an empty bio (which should never happen and likely indicates
1395 * that the lower level IO stack is in some way broken). This has been observed
1396 * on certain Xen deployments.
1397 */
1398 /* TODO allocate from our own bio_set. */
1399 int drbd_submit_peer_request(struct drbd_device *device,
1400 struct drbd_peer_request *peer_req,
1401 const unsigned op, const unsigned op_flags,
1402 const int fault_type)
1403 {
1404 struct bio *bios = NULL;
1405 struct bio *bio;
1406 struct page *page = peer_req->pages;
1407 sector_t sector = peer_req->i.sector;
1408 unsigned data_size = peer_req->i.size;
1409 unsigned n_bios = 0;
1410 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1411 int err = -ENOMEM;
1412
1413 if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1414 /* wait for all pending IO completions, before we start
1415 * zeroing things out. */
1416 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1417 /* add it to the active list now,
1418 * so we can find it to present it in debugfs */
1419 peer_req->submit_jif = jiffies;
1420 peer_req->flags |= EE_SUBMITTED;
1421 spin_lock_irq(&device->resource->req_lock);
1422 list_add_tail(&peer_req->w.list, &device->active_ee);
1423 spin_unlock_irq(&device->resource->req_lock);
1424 if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1425 sector, data_size >> 9, GFP_NOIO, false))
1426 peer_req->flags |= EE_WAS_ERROR;
1427 drbd_endio_write_sec_final(peer_req);
1428 return 0;
1429 }
1430
1431 /* Discards don't have any payload.
1432 * But the scsi layer still expects a bio_vec it can use internally,
1433 * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1434 if (peer_req->flags & EE_IS_TRIM)
1435 nr_pages = 1;
1436
1437 /* In most cases, we will only need one bio. But in case the lower
1438 * level restrictions happen to be different at this offset on this
1439 * side than those of the sending peer, we may need to submit the
1440 * request in more than one bio.
1441 *
1442 * Plain bio_alloc is good enough here, this is no DRBD internally
1443 * generated bio, but a bio allocated on behalf of the peer.
1444 */
1445 next_bio:
1446 bio = bio_alloc(GFP_NOIO, nr_pages);
1447 if (!bio) {
1448 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1449 goto fail;
1450 }
1451 /* > peer_req->i.sector, unless this is the first bio */
1452 bio->bi_iter.bi_sector = sector;
1453 bio->bi_bdev = device->ldev->backing_bdev;
1454 bio_set_op_attrs(bio, op, op_flags);
1455 bio->bi_private = peer_req;
1456 bio->bi_end_io = drbd_peer_request_endio;
1457
1458 bio->bi_next = bios;
1459 bios = bio;
1460 ++n_bios;
1461
1462 if (op == REQ_OP_DISCARD) {
1463 bio->bi_iter.bi_size = data_size;
1464 goto submit;
1465 }
1466
1467 page_chain_for_each(page) {
1468 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1469 if (!bio_add_page(bio, page, len, 0)) {
1470 /* A single page must always be possible!
1471 		 * But in case it fails anyway,
1472 * we deal with it, and complain (below). */
1473 if (bio->bi_vcnt == 0) {
1474 drbd_err(device,
1475 "bio_add_page failed for len=%u, "
1476 "bi_vcnt=0 (bi_sector=%llu)\n",
1477 len, (uint64_t)bio->bi_iter.bi_sector);
1478 err = -ENOSPC;
1479 goto fail;
1480 }
1481 goto next_bio;
1482 }
1483 data_size -= len;
1484 sector += len >> 9;
1485 --nr_pages;
1486 }
1487 D_ASSERT(device, data_size == 0);
1488 submit:
1489 D_ASSERT(device, page == NULL);
1490
1491 atomic_set(&peer_req->pending_bios, n_bios);
1492 /* for debugfs: update timestamp, mark as submitted */
1493 peer_req->submit_jif = jiffies;
1494 peer_req->flags |= EE_SUBMITTED;
1495 do {
1496 bio = bios;
1497 bios = bios->bi_next;
1498 bio->bi_next = NULL;
1499
1500 drbd_generic_make_request(device, fault_type, bio);
1501 } while (bios);
1502 return 0;
1503
1504 fail:
1505 while (bios) {
1506 bio = bios;
1507 bios = bios->bi_next;
1508 bio_put(bio);
1509 }
1510 return err;
1511 }
1512
1513 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1514 struct drbd_peer_request *peer_req)
1515 {
1516 struct drbd_interval *i = &peer_req->i;
1517
1518 drbd_remove_interval(&device->write_requests, i);
1519 drbd_clear_interval(i);
1520
1521 /* Wake up any processes waiting for this peer request to complete. */
1522 if (i->waiting)
1523 wake_up(&device->misc_wait);
1524 }
1525
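/* Wait until the active_ee list of every volume of this connection has drained. */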
1526 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1527 {
1528 struct drbd_peer_device *peer_device;
1529 int vnr;
1530
1531 rcu_read_lock();
1532 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1533 struct drbd_device *device = peer_device->device;
1534
1535 kref_get(&device->kref);
1536 rcu_read_unlock();
1537 drbd_wait_ee_list_empty(device, &device->active_ee);
1538 kref_put(&device->kref, drbd_destroy_device);
1539 rcu_read_lock();
1540 }
1541 rcu_read_unlock();
1542 }
1543
1544 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1545 {
1546 int rv;
1547 struct p_barrier *p = pi->data;
1548 struct drbd_epoch *epoch;
1549
1550 /* FIXME these are unacked on connection,
1551 * not a specific (peer)device.
1552 */
1553 connection->current_epoch->barrier_nr = p->barrier;
1554 connection->current_epoch->connection = connection;
1555 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1556
1557 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1558 * the activity log, which means it would not be resynced in case the
1559 * R_PRIMARY crashes now.
1560 * Therefore we must send the barrier_ack after the barrier request was
1561 * completed. */
1562 switch (connection->resource->write_ordering) {
1563 case WO_NONE:
1564 if (rv == FE_RECYCLED)
1565 return 0;
1566
1567 /* receiver context, in the writeout path of the other node.
1568 * avoid potential distributed deadlock */
1569 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1570 if (epoch)
1571 break;
1572 else
1573 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1574 /* Fall through */
1575
1576 case WO_BDEV_FLUSH:
1577 case WO_DRAIN_IO:
1578 conn_wait_active_ee_empty(connection);
1579 drbd_flush(connection);
1580
1581 if (atomic_read(&connection->current_epoch->epoch_size)) {
1582 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1583 if (epoch)
1584 break;
1585 }
1586
1587 return 0;
1588 default:
1589 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1590 connection->resource->write_ordering);
1591 return -EIO;
1592 }
1593
1594 epoch->flags = 0;
1595 atomic_set(&epoch->epoch_size, 0);
1596 atomic_set(&epoch->active, 0);
1597
1598 spin_lock(&connection->epoch_lock);
1599 if (atomic_read(&connection->current_epoch->epoch_size)) {
1600 list_add(&epoch->list, &connection->current_epoch->list);
1601 connection->current_epoch = epoch;
1602 connection->epochs++;
1603 } else {
1604 /* The current_epoch got recycled while we allocated this one... */
1605 kfree(epoch);
1606 }
1607 spin_unlock(&connection->epoch_lock);
1608
1609 return 0;
1610 }
1611
1612 /* used from receive_RSDataReply (recv_resync_read)
1613 * and from receive_Data */
1614 static struct drbd_peer_request *
1615 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1616 struct packet_info *pi) __must_hold(local)
1617 {
1618 struct drbd_device *device = peer_device->device;
1619 const sector_t capacity = drbd_get_capacity(device->this_bdev);
1620 struct drbd_peer_request *peer_req;
1621 struct page *page;
1622 int digest_size, err;
1623 unsigned int data_size = pi->size, ds;
1624 void *dig_in = peer_device->connection->int_dig_in;
1625 void *dig_vv = peer_device->connection->int_dig_vv;
1626 unsigned long *data;
1627 struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1628
1629 digest_size = 0;
1630 if (!trim && peer_device->connection->peer_integrity_tfm) {
1631 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1632 /*
1633 * FIXME: Receive the incoming digest into the receive buffer
1634 * here, together with its struct p_data?
1635 */
1636 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1637 if (err)
1638 return NULL;
1639 data_size -= digest_size;
1640 }
1641
1642 if (trim) {
1643 D_ASSERT(peer_device, data_size == 0);
1644 data_size = be32_to_cpu(trim->size);
1645 }
1646
1647 if (!expect(IS_ALIGNED(data_size, 512)))
1648 return NULL;
1649 /* prepare for larger trim requests. */
1650 if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1651 return NULL;
1652
1653 	/* even though we trust our peer,
1654 	 * we sometimes have to double-check. */
1655 if (sector + (data_size>>9) > capacity) {
1656 drbd_err(device, "request from peer beyond end of local disk: "
1657 "capacity: %llus < sector: %llus + size: %u\n",
1658 (unsigned long long)capacity,
1659 (unsigned long long)sector, data_size);
1660 return NULL;
1661 }
1662
1663 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1664 * "criss-cross" setup, that might cause write-out on some other DRBD,
1665 * which in turn might block on the other node at this very place. */
1666 peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1667 if (!peer_req)
1668 return NULL;
1669
1670 peer_req->flags |= EE_WRITE;
1671 if (trim)
1672 return peer_req;
1673
1674 ds = data_size;
1675 page = peer_req->pages;
1676 page_chain_for_each(page) {
1677 unsigned len = min_t(int, ds, PAGE_SIZE);
1678 data = kmap(page);
1679 err = drbd_recv_all_warn(peer_device->connection, data, len);
1680 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1681 drbd_err(device, "Fault injection: Corrupting data on receive\n");
1682 data[0] = data[0] ^ (unsigned long)-1;
1683 }
1684 kunmap(page);
1685 if (err) {
1686 drbd_free_peer_req(device, peer_req);
1687 return NULL;
1688 }
1689 ds -= len;
1690 }
1691
1692 if (digest_size) {
1693 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1694 if (memcmp(dig_in, dig_vv, digest_size)) {
1695 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1696 (unsigned long long)sector, data_size);
1697 drbd_free_peer_req(device, peer_req);
1698 return NULL;
1699 }
1700 }
1701 device->recv_cnt += data_size >> 9;
1702 return peer_req;
1703 }
1704
1705 /* drbd_drain_block() just takes a data block
1706 * out of the socket input buffer, and discards it.
1707 */
1708 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1709 {
1710 struct page *page;
1711 int err = 0;
1712 void *data;
1713
1714 if (!data_size)
1715 return 0;
1716
1717 page = drbd_alloc_pages(peer_device, 1, 1);
1718
1719 data = kmap(page);
1720 while (data_size) {
1721 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1722
1723 err = drbd_recv_all_warn(peer_device->connection, data, len);
1724 if (err)
1725 break;
1726 data_size -= len;
1727 }
1728 kunmap(page);
1729 drbd_free_pages(peer_device->device, page, 0);
1730 return err;
1731 }
1732
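/* Receive a read reply for a "diskless" read directly into the pages of the
 * original request bio, and verify the data digest if one is configured. */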
1733 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1734 sector_t sector, int data_size)
1735 {
1736 struct bio_vec bvec;
1737 struct bvec_iter iter;
1738 struct bio *bio;
1739 int digest_size, err, expect;
1740 void *dig_in = peer_device->connection->int_dig_in;
1741 void *dig_vv = peer_device->connection->int_dig_vv;
1742
1743 digest_size = 0;
1744 if (peer_device->connection->peer_integrity_tfm) {
1745 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1746 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1747 if (err)
1748 return err;
1749 data_size -= digest_size;
1750 }
1751
1752 /* optimistically update recv_cnt. if receiving fails below,
1753 	 * we disconnect anyway, and counters will be reset. */
1754 peer_device->device->recv_cnt += data_size>>9;
1755
1756 bio = req->master_bio;
1757 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1758
1759 bio_for_each_segment(bvec, bio, iter) {
1760 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1761 expect = min_t(int, data_size, bvec.bv_len);
1762 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1763 kunmap(bvec.bv_page);
1764 if (err)
1765 return err;
1766 data_size -= expect;
1767 }
1768
1769 if (digest_size) {
1770 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1771 if (memcmp(dig_in, dig_vv, digest_size)) {
1772 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1773 return -EINVAL;
1774 }
1775 }
1776
1777 D_ASSERT(peer_device->device, data_size == 0);
1778 return 0;
1779 }
1780
1781 /*
1782 * e_end_resync_block() is called in ack_sender context via
1783 * drbd_finish_peer_reqs().
1784 */
1785 static int e_end_resync_block(struct drbd_work *w, int unused)
1786 {
1787 struct drbd_peer_request *peer_req =
1788 container_of(w, struct drbd_peer_request, w);
1789 struct drbd_peer_device *peer_device = peer_req->peer_device;
1790 struct drbd_device *device = peer_device->device;
1791 sector_t sector = peer_req->i.sector;
1792 int err;
1793
1794 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1795
1796 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1797 drbd_set_in_sync(device, sector, peer_req->i.size);
1798 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1799 } else {
1800 /* Record failure to sync */
1801 drbd_rs_failed_io(device, sector, peer_req->i.size);
1802
1803 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1804 }
1805 dec_unacked(device);
1806
1807 return err;
1808 }
1809
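/* Receive one resync data block, queue it on sync_ee and submit it as a local
 * write; the ack (P_RS_WRITE_ACK or P_NEG_ACK) is sent from e_end_resync_block(). */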
1810 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1811 struct packet_info *pi) __releases(local)
1812 {
1813 struct drbd_device *device = peer_device->device;
1814 struct drbd_peer_request *peer_req;
1815
1816 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1817 if (!peer_req)
1818 goto fail;
1819
1820 dec_rs_pending(device);
1821
1822 inc_unacked(device);
1823 /* corresponding dec_unacked() in e_end_resync_block()
1824 	 * or in _drbd_clear_done_ee, respectively */
1825
1826 peer_req->w.cb = e_end_resync_block;
1827 peer_req->submit_jif = jiffies;
1828
1829 spin_lock_irq(&device->resource->req_lock);
1830 list_add_tail(&peer_req->w.list, &device->sync_ee);
1831 spin_unlock_irq(&device->resource->req_lock);
1832
1833 atomic_add(pi->size >> 9, &device->rs_sect_ev);
1834 if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
1835 DRBD_FAULT_RS_WR) == 0)
1836 return 0;
1837
1838 	/* we don't care about the reason here */
1839 drbd_err(device, "submit failed, triggering re-connect\n");
1840 spin_lock_irq(&device->resource->req_lock);
1841 list_del(&peer_req->w.list);
1842 spin_unlock_irq(&device->resource->req_lock);
1843
1844 drbd_free_peer_req(device, peer_req);
1845 fail:
1846 put_ldev(device);
1847 return -EIO;
1848 }
1849
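/* Turn the peer's opaque block_id back into our request pointer and double-check
 * that it is really in @root at @sector; returns NULL (and, unless @missing_ok,
 * complains) otherwise. */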
1850 static struct drbd_request *
1851 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1852 sector_t sector, bool missing_ok, const char *func)
1853 {
1854 struct drbd_request *req;
1855
1856 /* Request object according to our peer */
1857 req = (struct drbd_request *)(unsigned long)id;
1858 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1859 return req;
1860 if (!missing_ok) {
1861 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1862 (unsigned long)id, (unsigned long long)sector);
1863 }
1864 return NULL;
1865 }
1866
1867 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1868 {
1869 struct drbd_peer_device *peer_device;
1870 struct drbd_device *device;
1871 struct drbd_request *req;
1872 sector_t sector;
1873 int err;
1874 struct p_data *p = pi->data;
1875
1876 peer_device = conn_peer_device(connection, pi->vnr);
1877 if (!peer_device)
1878 return -EIO;
1879 device = peer_device->device;
1880
1881 sector = be64_to_cpu(p->sector);
1882
1883 spin_lock_irq(&device->resource->req_lock);
1884 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1885 spin_unlock_irq(&device->resource->req_lock);
1886 if (unlikely(!req))
1887 return -EIO;
1888
1889 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1890 * special casing it there for the various failure cases.
1891 * still no race with drbd_fail_pending_reads */
1892 err = recv_dless_read(peer_device, req, sector, pi->size);
1893 if (!err)
1894 req_mod(req, DATA_RECEIVED);
1895 /* else: nothing. handled from drbd_disconnect...
1896 * I don't think we may complete this just yet
1897 * in case we are "on-disconnect: freeze" */
1898
1899 return err;
1900 }
1901
1902 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1903 {
1904 struct drbd_peer_device *peer_device;
1905 struct drbd_device *device;
1906 sector_t sector;
1907 int err;
1908 struct p_data *p = pi->data;
1909
1910 peer_device = conn_peer_device(connection, pi->vnr);
1911 if (!peer_device)
1912 return -EIO;
1913 device = peer_device->device;
1914
1915 sector = be64_to_cpu(p->sector);
1916 D_ASSERT(device, p->block_id == ID_SYNCER);
1917
1918 if (get_ldev(device)) {
1919 /* data is submitted to disk within recv_resync_read.
1920 * corresponding put_ldev done below on error,
1921 * or in drbd_peer_request_endio. */
1922 err = recv_resync_read(peer_device, sector, pi);
1923 } else {
1924 if (__ratelimit(&drbd_ratelimit_state))
1925 drbd_err(device, "Can not write resync data to local disk.\n");
1926
1927 err = drbd_drain_block(peer_device, pi->size);
1928
1929 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1930 }
1931
1932 atomic_add(pi->size >> 9, &device->rs_sect_in);
1933
1934 return err;
1935 }
1936
1937 static void restart_conflicting_writes(struct drbd_device *device,
1938 sector_t sector, int size)
1939 {
1940 struct drbd_interval *i;
1941 struct drbd_request *req;
1942
1943 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1944 if (!i->local)
1945 continue;
1946 req = container_of(i, struct drbd_request, i);
1947 if (req->rq_state & RQ_LOCAL_PENDING ||
1948 !(req->rq_state & RQ_POSTPONED))
1949 continue;
1950 /* as it is RQ_POSTPONED, this will cause it to
1951 * be queued on the retry workqueue. */
1952 __req_mod(req, CONFLICT_RESOLVED, NULL);
1953 }
1954 }
1955
1956 /*
1957 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
1958 */
1959 static int e_end_block(struct drbd_work *w, int cancel)
1960 {
1961 struct drbd_peer_request *peer_req =
1962 container_of(w, struct drbd_peer_request, w);
1963 struct drbd_peer_device *peer_device = peer_req->peer_device;
1964 struct drbd_device *device = peer_device->device;
1965 sector_t sector = peer_req->i.sector;
1966 int err = 0, pcmd;
1967
1968 if (peer_req->flags & EE_SEND_WRITE_ACK) {
1969 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1970 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1971 device->state.conn <= C_PAUSED_SYNC_T &&
1972 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1973 P_RS_WRITE_ACK : P_WRITE_ACK;
1974 err = drbd_send_ack(peer_device, pcmd, peer_req);
1975 if (pcmd == P_RS_WRITE_ACK)
1976 drbd_set_in_sync(device, sector, peer_req->i.size);
1977 } else {
1978 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1979 /* we expect it to be marked out of sync anyways...
1980 * maybe assert this? */
1981 }
1982 dec_unacked(device);
1983 }
1984
1985 /* we delete from the conflict detection hash _after_ we sent out the
1986 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1987 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1988 spin_lock_irq(&device->resource->req_lock);
1989 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1990 drbd_remove_epoch_entry_interval(device, peer_req);
1991 if (peer_req->flags & EE_RESTART_REQUESTS)
1992 restart_conflicting_writes(device, sector, peer_req->i.size);
1993 spin_unlock_irq(&device->resource->req_lock);
1994 } else
1995 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1996
1997 drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1998
1999 return err;
2000 }
2001
2002 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2003 {
2004 struct drbd_peer_request *peer_req =
2005 container_of(w, struct drbd_peer_request, w);
2006 struct drbd_peer_device *peer_device = peer_req->peer_device;
2007 int err;
2008
2009 err = drbd_send_ack(peer_device, ack, peer_req);
2010 dec_unacked(peer_device->device);
2011
2012 return err;
2013 }
2014
2015 static int e_send_superseded(struct drbd_work *w, int unused)
2016 {
2017 return e_send_ack(w, P_SUPERSEDED);
2018 }
2019
2020 static int e_send_retry_write(struct drbd_work *w, int unused)
2021 {
2022 struct drbd_peer_request *peer_req =
2023 container_of(w, struct drbd_peer_request, w);
2024 struct drbd_connection *connection = peer_req->peer_device->connection;
2025
2026 return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2027 P_RETRY_WRITE : P_SUPERSEDED);
2028 }
2029
2030 static bool seq_greater(u32 a, u32 b)
2031 {
2032 /*
2033 * We assume 32-bit wrap-around here.
2034 * For 24-bit wrap-around, we would have to shift:
2035 * a <<= 8; b <<= 8;
2036 */
2037 return (s32)a - (s32)b > 0;
2038 }
2039
2040 static u32 seq_max(u32 a, u32 b)
2041 {
2042 return seq_greater(a, b) ? a : b;
2043 }
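
/*
 * Editor's note: a worked example of the wrap-around comparison above
 * (illustrative only, not part of the driver).  With 32-bit sequence
 * numbers, 0x00000002 is considered "after" 0xfffffffe:
 *
 *   seq_greater(0x00000002, 0xfffffffe)
 *     == ((s32)0x00000002 - (s32)0xfffffffe > 0)
 *     == (2 - (-2) > 0) == true
 *
 * and consequently seq_max(0x00000002, 0xfffffffe) == 0x00000002, which
 * is what update_peer_seq() below relies on across a sequence-number wrap.
 */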
2044
2045 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2046 {
2047 struct drbd_device *device = peer_device->device;
2048 unsigned int newest_peer_seq;
2049
2050 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2051 spin_lock(&device->peer_seq_lock);
2052 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2053 device->peer_seq = newest_peer_seq;
2054 spin_unlock(&device->peer_seq_lock);
2055 /* wake up only if we actually changed device->peer_seq */
2056 if (peer_seq == newest_peer_seq)
2057 wake_up(&device->seq_wait);
2058 }
2059 }
2060
2061 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2062 {
2063 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2064 }
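
/*
 * Editor's note: in overlaps() the lengths l1/l2 are in bytes while the
 * sectors are 512-byte units, hence the ">> 9".  Illustrative example:
 * a 4 KiB request at sector 0 covers sectors 0..7, so
 * overlaps(0, 4096, 8, 512) == 0 (the ranges only touch), while
 * overlaps(0, 4096, 7, 512) == 1.
 */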
2065
2066 /* maybe change sync_ee into interval trees as well? */
2067 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2068 {
2069 struct drbd_peer_request *rs_req;
2070 bool rv = false;
2071
2072 spin_lock_irq(&device->resource->req_lock);
2073 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2074 if (overlaps(peer_req->i.sector, peer_req->i.size,
2075 rs_req->i.sector, rs_req->i.size)) {
2076 rv = true;
2077 break;
2078 }
2079 }
2080 spin_unlock_irq(&device->resource->req_lock);
2081
2082 return rv;
2083 }
2084
2085 /* Called from receive_Data.
2086 * Synchronize packets on sock with packets on msock.
2087 *
2088 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2089 * packet traveling on msock, they are still processed in the order they have
2090 * been sent.
2091 *
2092 * Note: we don't care for Ack packets overtaking P_DATA packets.
2093 *
2094 * In case packet_seq is larger than device->peer_seq number, there are
2095 * outstanding packets on the msock. We wait for them to arrive.
2096 * In case we are the logically next packet, we update device->peer_seq
2097 * ourselves. Correctly handles 32bit wrap around.
2098 *
2099 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2100 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2101 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2102 * 1<<11 == 2048 seconds, i.e. ages, for the 32bit wrap around...
2103 *
2104 * returns 0 if we may process the packet,
2105 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2106 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2107 {
2108 struct drbd_device *device = peer_device->device;
2109 DEFINE_WAIT(wait);
2110 long timeout;
2111 int ret = 0, tp;
2112
2113 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2114 return 0;
2115
2116 spin_lock(&device->peer_seq_lock);
2117 for (;;) {
2118 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2119 device->peer_seq = seq_max(device->peer_seq, peer_seq);
2120 break;
2121 }
2122
2123 if (signal_pending(current)) {
2124 ret = -ERESTARTSYS;
2125 break;
2126 }
2127
2128 rcu_read_lock();
2129 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2130 rcu_read_unlock();
2131
2132 if (!tp)
2133 break;
2134
2135 /* Only need to wait if two_primaries is enabled */
2136 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2137 spin_unlock(&device->peer_seq_lock);
2138 rcu_read_lock();
2139 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2140 rcu_read_unlock();
2141 timeout = schedule_timeout(timeout);
2142 spin_lock(&device->peer_seq_lock);
2143 if (!timeout) {
2144 ret = -ETIMEDOUT;
2145 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2146 break;
2147 }
2148 }
2149 spin_unlock(&device->peer_seq_lock);
2150 finish_wait(&device->seq_wait, &wait);
2151 return ret;
2152 }
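
/*
 * Editor's note: a short illustration of the wait condition above (the
 * numbers are made up).  Suppose device->peer_seq == 5.  A packet with
 * peer_seq == 6 is the logically next one: seq_greater(6 - 1, 5) is
 * false, so we update peer_seq to 6 and process it immediately.  A
 * packet with peer_seq == 8 implies that packets carrying seq 6 and 7
 * are still outstanding on the msock, so we sleep on seq_wait until
 * they have been processed, or until the ping_timeo based timeout hits.
 */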
2153
2154 /* see also bio_flags_to_wire()
2155 * We go through DRBD_REQ_* values because we need to map the flags to data
2156 * packet flags and back semantically; the peer may run a different kernel version. */
2157 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2158 {
2159 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2160 (dpf & DP_FUA ? REQ_FUA : 0) |
2161 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2162 }
2163
2164 static unsigned long wire_flags_to_bio_op(u32 dpf)
2165 {
2166 if (dpf & DP_DISCARD)
2167 return REQ_OP_DISCARD;
2168 else
2169 return REQ_OP_WRITE;
2170 }
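
/*
 * Editor's note: putting the two helpers above together (illustrative
 * only): a P_DATA packet received with dp_flags == (DP_FLUSH | DP_FUA)
 * is resubmitted locally as op == REQ_OP_WRITE with
 * op_flags == (REQ_PREFLUSH | REQ_FUA), i.e. the write-ordering
 * semantics requested by the peer are preserved on this node's
 * backing device.
 */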
2171
2172 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2173 unsigned int size)
2174 {
2175 struct drbd_interval *i;
2176
2177 repeat:
2178 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2179 struct drbd_request *req;
2180 struct bio_and_error m;
2181
2182 if (!i->local)
2183 continue;
2184 req = container_of(i, struct drbd_request, i);
2185 if (!(req->rq_state & RQ_POSTPONED))
2186 continue;
2187 req->rq_state &= ~RQ_POSTPONED;
2188 __req_mod(req, NEG_ACKED, &m);
2189 spin_unlock_irq(&device->resource->req_lock);
2190 if (m.bio)
2191 complete_master_bio(device, &m);
2192 spin_lock_irq(&device->resource->req_lock);
2193 goto repeat;
2194 }
2195 }
2196
2197 static int handle_write_conflicts(struct drbd_device *device,
2198 struct drbd_peer_request *peer_req)
2199 {
2200 struct drbd_connection *connection = peer_req->peer_device->connection;
2201 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2202 sector_t sector = peer_req->i.sector;
2203 const unsigned int size = peer_req->i.size;
2204 struct drbd_interval *i;
2205 bool equal;
2206 int err;
2207
2208 /*
2209 * Inserting the peer request into the write_requests tree will prevent
2210 * new conflicting local requests from being added.
2211 */
2212 drbd_insert_interval(&device->write_requests, &peer_req->i);
2213
2214 repeat:
2215 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2216 if (i == &peer_req->i)
2217 continue;
2218 if (i->completed)
2219 continue;
2220
2221 if (!i->local) {
2222 /*
2223 * Our peer has sent a conflicting remote request; this
2224 * should not happen in a two-node setup. Wait for the
2225 * earlier peer request to complete.
2226 */
2227 err = drbd_wait_misc(device, i);
2228 if (err)
2229 goto out;
2230 goto repeat;
2231 }
2232
2233 equal = i->sector == sector && i->size == size;
2234 if (resolve_conflicts) {
2235 /*
2236 * If the peer request is fully contained within the
2237 * overlapping request, it can be considered overwritten
2238 * and thus superseded; otherwise, it will be retried
2239 * once all overlapping requests have completed.
2240 */
2241 bool superseded = i->sector <= sector && i->sector +
2242 (i->size >> 9) >= sector + (size >> 9);
2243
2244 if (!equal)
2245 drbd_alert(device, "Concurrent writes detected: "
2246 "local=%llus +%u, remote=%llus +%u, "
2247 "assuming %s came first\n",
2248 (unsigned long long)i->sector, i->size,
2249 (unsigned long long)sector, size,
2250 superseded ? "local" : "remote");
2251
2252 peer_req->w.cb = superseded ? e_send_superseded :
2253 e_send_retry_write;
2254 list_add_tail(&peer_req->w.list, &device->done_ee);
2255 queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2256
2257 err = -ENOENT;
2258 goto out;
2259 } else {
2260 struct drbd_request *req =
2261 container_of(i, struct drbd_request, i);
2262
2263 if (!equal)
2264 drbd_alert(device, "Concurrent writes detected: "
2265 "local=%llus +%u, remote=%llus +%u\n",
2266 (unsigned long long)i->sector, i->size,
2267 (unsigned long long)sector, size);
2268
2269 if (req->rq_state & RQ_LOCAL_PENDING ||
2270 !(req->rq_state & RQ_POSTPONED)) {
2271 /*
2272 * Wait for the node with the discard flag to
2273 * decide if this request has been superseded
2274 * or needs to be retried.
2275 * Requests that have been superseded will
2276 * disappear from the write_requests tree.
2277 *
2278 * In addition, wait for the conflicting
2279 * request to finish locally before submitting
2280 * the conflicting peer request.
2281 */
2282 err = drbd_wait_misc(device, &req->i);
2283 if (err) {
2284 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2285 fail_postponed_requests(device, sector, size);
2286 goto out;
2287 }
2288 goto repeat;
2289 }
2290 /*
2291 * Remember to restart the conflicting requests after
2292 * the new peer request has completed.
2293 */
2294 peer_req->flags |= EE_RESTART_REQUESTS;
2295 }
2296 }
2297 err = 0;
2298
2299 out:
2300 if (err)
2301 drbd_remove_epoch_entry_interval(device, peer_req);
2302 return err;
2303 }
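
/*
 * Editor's note: containment example for the "superseded" test above
 * (illustrative sector numbers).  A peer write of 4096 bytes at
 * sector 4 (sectors 4..11) that conflicts with a local request
 * covering sectors 0..31 is fully contained and therefore superseded;
 * the same peer write conflicting with a local request covering only
 * sectors 0..7 overlaps but is not contained, so it is answered with
 * P_RETRY_WRITE (P_SUPERSEDED on peers older than protocol 100) and
 * retried by the peer once the overlapping requests have completed.
 */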
2304
2305 /* mirrored write */
2306 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2307 {
2308 struct drbd_peer_device *peer_device;
2309 struct drbd_device *device;
2310 struct net_conf *nc;
2311 sector_t sector;
2312 struct drbd_peer_request *peer_req;
2313 struct p_data *p = pi->data;
2314 u32 peer_seq = be32_to_cpu(p->seq_num);
2315 int op, op_flags;
2316 u32 dp_flags;
2317 int err, tp;
2318
2319 peer_device = conn_peer_device(connection, pi->vnr);
2320 if (!peer_device)
2321 return -EIO;
2322 device = peer_device->device;
2323
2324 if (!get_ldev(device)) {
2325 int err2;
2326
2327 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2328 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2329 atomic_inc(&connection->current_epoch->epoch_size);
2330 err2 = drbd_drain_block(peer_device, pi->size);
2331 if (!err)
2332 err = err2;
2333 return err;
2334 }
2335
2336 /*
2337 * Corresponding put_ldev done either below (on various errors), or in
2338 * drbd_peer_request_endio, if we successfully submit the data at the
2339 * end of this function.
2340 */
2341
2342 sector = be64_to_cpu(p->sector);
2343 peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2344 if (!peer_req) {
2345 put_ldev(device);
2346 return -EIO;
2347 }
2348
2349 peer_req->w.cb = e_end_block;
2350 peer_req->submit_jif = jiffies;
2351 peer_req->flags |= EE_APPLICATION;
2352
2353 dp_flags = be32_to_cpu(p->dp_flags);
2354 op = wire_flags_to_bio_op(dp_flags);
2355 op_flags = wire_flags_to_bio_flags(dp_flags);
2356 if (pi->cmd == P_TRIM) {
2357 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2358 peer_req->flags |= EE_IS_TRIM;
2359 if (!blk_queue_discard(q))
2360 peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2361 D_ASSERT(peer_device, peer_req->i.size > 0);
2362 D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2363 D_ASSERT(peer_device, peer_req->pages == NULL);
2364 } else if (peer_req->pages == NULL) {
2365 D_ASSERT(device, peer_req->i.size == 0);
2366 D_ASSERT(device, dp_flags & DP_FLUSH);
2367 }
2368
2369 if (dp_flags & DP_MAY_SET_IN_SYNC)
2370 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2371
2372 spin_lock(&connection->epoch_lock);
2373 peer_req->epoch = connection->current_epoch;
2374 atomic_inc(&peer_req->epoch->epoch_size);
2375 atomic_inc(&peer_req->epoch->active);
2376 spin_unlock(&connection->epoch_lock);
2377
2378 rcu_read_lock();
2379 nc = rcu_dereference(peer_device->connection->net_conf);
2380 tp = nc->two_primaries;
2381 if (peer_device->connection->agreed_pro_version < 100) {
2382 switch (nc->wire_protocol) {
2383 case DRBD_PROT_C:
2384 dp_flags |= DP_SEND_WRITE_ACK;
2385 break;
2386 case DRBD_PROT_B:
2387 dp_flags |= DP_SEND_RECEIVE_ACK;
2388 break;
2389 }
2390 }
2391 rcu_read_unlock();
2392
2393 if (dp_flags & DP_SEND_WRITE_ACK) {
2394 peer_req->flags |= EE_SEND_WRITE_ACK;
2395 inc_unacked(device);
2396 /* corresponding dec_unacked() in e_end_block()
2397 * respective _drbd_clear_done_ee */
2398 }
2399
2400 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2401 /* I really don't like it that the receiver thread
2402 * sends on the msock, but so be it */
2403 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2404 }
2405
2406 if (tp) {
2407 /* two primaries implies protocol C */
2408 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2409 peer_req->flags |= EE_IN_INTERVAL_TREE;
2410 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2411 if (err)
2412 goto out_interrupted;
2413 spin_lock_irq(&device->resource->req_lock);
2414 err = handle_write_conflicts(device, peer_req);
2415 if (err) {
2416 spin_unlock_irq(&device->resource->req_lock);
2417 if (err == -ENOENT) {
2418 put_ldev(device);
2419 return 0;
2420 }
2421 goto out_interrupted;
2422 }
2423 } else {
2424 update_peer_seq(peer_device, peer_seq);
2425 spin_lock_irq(&device->resource->req_lock);
2426 }
2427 /* if we use the zeroout fallback code, we process synchronously
2428 * and wait for all pending requests, i.e. wait for active_ee
2429 * to become empty, in drbd_submit_peer_request();
2430 * better not add ourselves here. */
2431 if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2432 list_add_tail(&peer_req->w.list, &device->active_ee);
2433 spin_unlock_irq(&device->resource->req_lock);
2434
2435 if (device->state.conn == C_SYNC_TARGET)
2436 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2437
2438 if (device->state.pdsk < D_INCONSISTENT) {
2439 /* In case ours is the only usable disk in the cluster, record the peer as out of sync: */
2440 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2441 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2442 drbd_al_begin_io(device, &peer_req->i);
2443 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2444 }
2445
2446 err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2447 DRBD_FAULT_DT_WR);
2448 if (!err)
2449 return 0;
2450
2451 /* don't care for the reason here */
2452 drbd_err(device, "submit failed, triggering re-connect\n");
2453 spin_lock_irq(&device->resource->req_lock);
2454 list_del(&peer_req->w.list);
2455 drbd_remove_epoch_entry_interval(device, peer_req);
2456 spin_unlock_irq(&device->resource->req_lock);
2457 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2458 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2459 drbd_al_complete_io(device, &peer_req->i);
2460 }
2461
2462 out_interrupted:
2463 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2464 put_ldev(device);
2465 drbd_free_peer_req(device, peer_req);
2466 return err;
2467 }
2468
2469 /* We may throttle resync, if the lower device seems to be busy,
2470 * and current sync rate is above c_min_rate.
2471 *
2472 * To decide whether or not the lower device is busy, we use a scheme similar
2473 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2474 * activity (more than 64 sectors) that we cannot account for with our own
2475 * resync activity, the device obviously is "busy".
2476 *
2477 * The current sync rate used here uses only the most recent two step marks,
2478 * to have a short time average so we can react faster.
2479 */
2480 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2481 bool throttle_if_app_is_waiting)
2482 {
2483 struct lc_element *tmp;
2484 bool throttle = drbd_rs_c_min_rate_throttle(device);
2485
2486 if (!throttle || throttle_if_app_is_waiting)
2487 return throttle;
2488
2489 spin_lock_irq(&device->al_lock);
2490 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2491 if (tmp) {
2492 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2493 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2494 throttle = false;
2495 /* Do not slow down if app IO is already waiting for this extent:
2496 * our progress is then necessary for that application IO to complete. */
2497 }
2498 spin_unlock_irq(&device->al_lock);
2499
2500 return throttle;
2501 }
2502
2503 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2504 {
2505 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2506 unsigned long db, dt, dbdt;
2507 unsigned int c_min_rate;
2508 int curr_events;
2509
2510 rcu_read_lock();
2511 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2512 rcu_read_unlock();
2513
2514 /* feature disabled? */
2515 if (c_min_rate == 0)
2516 return false;
2517
2518 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2519 (int)part_stat_read(&disk->part0, sectors[1]) -
2520 atomic_read(&device->rs_sect_ev);
2521
2522 if (atomic_read(&device->ap_actlog_cnt)
2523 || curr_events - device->rs_last_events > 64) {
2524 unsigned long rs_left;
2525 int i;
2526
2527 device->rs_last_events = curr_events;
2528
2529 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2530 * approx. */
2531 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2532
2533 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2534 rs_left = device->ov_left;
2535 else
2536 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2537
2538 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2539 if (!dt)
2540 dt++;
2541 db = device->rs_mark_left[i] - rs_left;
2542 dbdt = Bit2KB(db/dt);
2543
2544 if (dbdt > c_min_rate)
2545 return true;
2546 }
2547 return false;
2548 }
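
/*
 * Editor's note: a worked example of the rate check above.  The numbers
 * are illustrative and assume the usual 4 KiB of resync data per bitmap
 * bit, i.e. Bit2KB(x) == 4 * x.  If the last two sync marks are 4
 * seconds apart (dt == 4) and 3000 bitmap bits were cleared in that
 * window (db == 3000), then dbdt == Bit2KB(3000 / 4) == 3000 KiB/s;
 * with c_min_rate configured at, say, 250 KiB/s the resync is already
 * fast enough and drbd_rs_c_min_rate_throttle() returns true.
 */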
2549
2550 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2551 {
2552 struct drbd_peer_device *peer_device;
2553 struct drbd_device *device;
2554 sector_t sector;
2555 sector_t capacity;
2556 struct drbd_peer_request *peer_req;
2557 struct digest_info *di = NULL;
2558 int size, verb;
2559 unsigned int fault_type;
2560 struct p_block_req *p = pi->data;
2561
2562 peer_device = conn_peer_device(connection, pi->vnr);
2563 if (!peer_device)
2564 return -EIO;
2565 device = peer_device->device;
2566 capacity = drbd_get_capacity(device->this_bdev);
2567
2568 sector = be64_to_cpu(p->sector);
2569 size = be32_to_cpu(p->blksize);
2570
2571 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2572 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2573 (unsigned long long)sector, size);
2574 return -EINVAL;
2575 }
2576 if (sector + (size>>9) > capacity) {
2577 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2578 (unsigned long long)sector, size);
2579 return -EINVAL;
2580 }
2581
2582 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2583 verb = 1;
2584 switch (pi->cmd) {
2585 case P_DATA_REQUEST:
2586 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2587 break;
2588 case P_RS_DATA_REQUEST:
2589 case P_CSUM_RS_REQUEST:
2590 case P_OV_REQUEST:
2591 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2592 break;
2593 case P_OV_REPLY:
2594 verb = 0;
2595 dec_rs_pending(device);
2596 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2597 break;
2598 default:
2599 BUG();
2600 }
2601 if (verb && __ratelimit(&drbd_ratelimit_state))
2602 drbd_err(device, "Can not satisfy peer's read request, "
2603 "no local data.\n");
2604
2605 /* drain the possible payload */
2606 return drbd_drain_block(peer_device, pi->size);
2607 }
2608
2609 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2610 * "criss-cross" setup, that might cause write-out on some other DRBD,
2611 * which in turn might block on the other node at this very place. */
2612 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2613 true /* has real payload */, GFP_NOIO);
2614 if (!peer_req) {
2615 put_ldev(device);
2616 return -ENOMEM;
2617 }
2618
2619 switch (pi->cmd) {
2620 case P_DATA_REQUEST:
2621 peer_req->w.cb = w_e_end_data_req;
2622 fault_type = DRBD_FAULT_DT_RD;
2623 /* application IO, don't drbd_rs_begin_io */
2624 peer_req->flags |= EE_APPLICATION;
2625 goto submit;
2626
2627 case P_RS_DATA_REQUEST:
2628 peer_req->w.cb = w_e_end_rsdata_req;
2629 fault_type = DRBD_FAULT_RS_RD;
2630 /* used in the sector offset progress display */
2631 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2632 break;
2633
2634 case P_OV_REPLY:
2635 case P_CSUM_RS_REQUEST:
2636 fault_type = DRBD_FAULT_RS_RD;
2637 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2638 if (!di)
2639 goto out_free_e;
2640
2641 di->digest_size = pi->size;
2642 di->digest = (((char *)di)+sizeof(struct digest_info));
2643
2644 peer_req->digest = di;
2645 peer_req->flags |= EE_HAS_DIGEST;
2646
2647 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2648 goto out_free_e;
2649
2650 if (pi->cmd == P_CSUM_RS_REQUEST) {
2651 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2652 peer_req->w.cb = w_e_end_csum_rs_req;
2653 /* used in the sector offset progress display */
2654 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2655 /* remember to report stats in drbd_resync_finished */
2656 device->use_csums = true;
2657 } else if (pi->cmd == P_OV_REPLY) {
2658 /* track progress, we may need to throttle */
2659 atomic_add(size >> 9, &device->rs_sect_in);
2660 peer_req->w.cb = w_e_end_ov_reply;
2661 dec_rs_pending(device);
2662 /* drbd_rs_begin_io done when we sent this request,
2663 * but accounting still needs to be done. */
2664 goto submit_for_resync;
2665 }
2666 break;
2667
2668 case P_OV_REQUEST:
2669 if (device->ov_start_sector == ~(sector_t)0 &&
2670 peer_device->connection->agreed_pro_version >= 90) {
2671 unsigned long now = jiffies;
2672 int i;
2673 device->ov_start_sector = sector;
2674 device->ov_position = sector;
2675 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2676 device->rs_total = device->ov_left;
2677 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2678 device->rs_mark_left[i] = device->ov_left;
2679 device->rs_mark_time[i] = now;
2680 }
2681 drbd_info(device, "Online Verify start sector: %llu\n",
2682 (unsigned long long)sector);
2683 }
2684 peer_req->w.cb = w_e_end_ov_req;
2685 fault_type = DRBD_FAULT_RS_RD;
2686 break;
2687
2688 default:
2689 BUG();
2690 }
2691
2692 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2693 * wrt the receiver, but it is not as straightforward as it may seem.
2694 * Various places in the resync start and stop logic assume resync
2695 * requests are processed in order, requeuing this on the worker thread
2696 * introduces a bunch of new code for synchronization between threads.
2697 *
2698 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2699 * "forever", throttling after drbd_rs_begin_io will lock that extent
2700 * for application writes for the same time. For now, just throttle
2701 * here, where the rest of the code expects the receiver to sleep for
2702 * a while, anyways.
2703 */
2704
2705 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2706 * this defers syncer requests for some time, before letting at least
2707 * one request through. The resync controller on the receiving side
2708 * will adapt to the incoming rate accordingly.
2709 *
2710 * We cannot throttle here if remote is Primary/SyncTarget:
2711 * we would also throttle its application reads.
2712 * In that case, throttling is done on the SyncTarget only.
2713 */
2714
2715 /* Even though this may be a resync request, we do add to "read_ee";
2716 * "sync_ee" is only used for resync WRITEs.
2717 * Add to list early, so debugfs can find this request
2718 * even if we have to sleep below. */
2719 spin_lock_irq(&device->resource->req_lock);
2720 list_add_tail(&peer_req->w.list, &device->read_ee);
2721 spin_unlock_irq(&device->resource->req_lock);
2722
2723 update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2724 if (device->state.peer != R_PRIMARY
2725 && drbd_rs_should_slow_down(device, sector, false))
2726 schedule_timeout_uninterruptible(HZ/10);
2727 update_receiver_timing_details(connection, drbd_rs_begin_io);
2728 if (drbd_rs_begin_io(device, sector))
2729 goto out_free_e;
2730
2731 submit_for_resync:
2732 atomic_add(size >> 9, &device->rs_sect_ev);
2733
2734 submit:
2735 update_receiver_timing_details(connection, drbd_submit_peer_request);
2736 inc_unacked(device);
2737 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2738 fault_type) == 0)
2739 return 0;
2740
2741 /* don't care for the reason here */
2742 drbd_err(device, "submit failed, triggering re-connect\n");
2743
2744 out_free_e:
2745 spin_lock_irq(&device->resource->req_lock);
2746 list_del(&peer_req->w.list);
2747 spin_unlock_irq(&device->resource->req_lock);
2748 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2749
2750 put_ldev(device);
2751 drbd_free_peer_req(device, peer_req);
2752 return -EIO;
2753 }
2754
2755 /**
2756 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
2757 */
2758 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2759 {
2760 struct drbd_device *device = peer_device->device;
2761 int self, peer, rv = -100;
2762 unsigned long ch_self, ch_peer;
2763 enum drbd_after_sb_p after_sb_0p;
2764
2765 self = device->ldev->md.uuid[UI_BITMAP] & 1;
2766 peer = device->p_uuid[UI_BITMAP] & 1;
2767
2768 ch_peer = device->p_uuid[UI_SIZE];
2769 ch_self = device->comm_bm_set;
2770
2771 rcu_read_lock();
2772 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2773 rcu_read_unlock();
2774 switch (after_sb_0p) {
2775 case ASB_CONSENSUS:
2776 case ASB_DISCARD_SECONDARY:
2777 case ASB_CALL_HELPER:
2778 case ASB_VIOLENTLY:
2779 drbd_err(device, "Configuration error.\n");
2780 break;
2781 case ASB_DISCONNECT:
2782 break;
2783 case ASB_DISCARD_YOUNGER_PRI:
2784 if (self == 0 && peer == 1) {
2785 rv = -1;
2786 break;
2787 }
2788 if (self == 1 && peer == 0) {
2789 rv = 1;
2790 break;
2791 }
2792 /* Else fall through to one of the other strategies... */
2793 case ASB_DISCARD_OLDER_PRI:
2794 if (self == 0 && peer == 1) {
2795 rv = 1;
2796 break;
2797 }
2798 if (self == 1 && peer == 0) {
2799 rv = -1;
2800 break;
2801 }
2802 /* Else fall through to one of the other strategies... */
2803 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2804 "Using discard-least-changes instead\n");
2805 case ASB_DISCARD_ZERO_CHG:
2806 if (ch_peer == 0 && ch_self == 0) {
2807 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2808 ? -1 : 1;
2809 break;
2810 } else {
2811 if (ch_peer == 0) { rv = 1; break; }
2812 if (ch_self == 0) { rv = -1; break; }
2813 }
2814 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2815 break;
2816 case ASB_DISCARD_LEAST_CHG:
2817 if (ch_self < ch_peer)
2818 rv = -1;
2819 else if (ch_self > ch_peer)
2820 rv = 1;
2821 else /* ( ch_self == ch_peer ) */
2822 /* Well, then use something else. */
2823 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2824 ? -1 : 1;
2825 break;
2826 case ASB_DISCARD_LOCAL:
2827 rv = -1;
2828 break;
2829 case ASB_DISCARD_REMOTE:
2830 rv = 1;
2831 }
2832
2833 return rv;
2834 }
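
/*
 * Editor's note: example decision (illustrative numbers).  With
 * after-sb-0pri set to discard-least-changes, ch_self == 100 and
 * ch_peer == 4000, the function returns -1: we discard our fewer
 * changes and drbd_sync_handshake() below makes this node the
 * sync target.
 */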
2835
2836 /**
2837 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
2838 */
2839 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2840 {
2841 struct drbd_device *device = peer_device->device;
2842 int hg, rv = -100;
2843 enum drbd_after_sb_p after_sb_1p;
2844
2845 rcu_read_lock();
2846 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2847 rcu_read_unlock();
2848 switch (after_sb_1p) {
2849 case ASB_DISCARD_YOUNGER_PRI:
2850 case ASB_DISCARD_OLDER_PRI:
2851 case ASB_DISCARD_LEAST_CHG:
2852 case ASB_DISCARD_LOCAL:
2853 case ASB_DISCARD_REMOTE:
2854 case ASB_DISCARD_ZERO_CHG:
2855 drbd_err(device, "Configuration error.\n");
2856 break;
2857 case ASB_DISCONNECT:
2858 break;
2859 case ASB_CONSENSUS:
2860 hg = drbd_asb_recover_0p(peer_device);
2861 if (hg == -1 && device->state.role == R_SECONDARY)
2862 rv = hg;
2863 if (hg == 1 && device->state.role == R_PRIMARY)
2864 rv = hg;
2865 break;
2866 case ASB_VIOLENTLY:
2867 rv = drbd_asb_recover_0p(peer_device);
2868 break;
2869 case ASB_DISCARD_SECONDARY:
2870 return device->state.role == R_PRIMARY ? 1 : -1;
2871 case ASB_CALL_HELPER:
2872 hg = drbd_asb_recover_0p(peer_device);
2873 if (hg == -1 && device->state.role == R_PRIMARY) {
2874 enum drbd_state_rv rv2;
2875
2876 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2877 * we might be here in C_WF_REPORT_PARAMS which is transient.
2878 * we do not need to wait for the after state change work either. */
2879 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2880 if (rv2 != SS_SUCCESS) {
2881 drbd_khelper(device, "pri-lost-after-sb");
2882 } else {
2883 drbd_warn(device, "Successfully gave up primary role.\n");
2884 rv = hg;
2885 }
2886 } else
2887 rv = hg;
2888 }
2889
2890 return rv;
2891 }
2892
2893 /**
2894 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
2895 */
2896 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2897 {
2898 struct drbd_device *device = peer_device->device;
2899 int hg, rv = -100;
2900 enum drbd_after_sb_p after_sb_2p;
2901
2902 rcu_read_lock();
2903 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2904 rcu_read_unlock();
2905 switch (after_sb_2p) {
2906 case ASB_DISCARD_YOUNGER_PRI:
2907 case ASB_DISCARD_OLDER_PRI:
2908 case ASB_DISCARD_LEAST_CHG:
2909 case ASB_DISCARD_LOCAL:
2910 case ASB_DISCARD_REMOTE:
2911 case ASB_CONSENSUS:
2912 case ASB_DISCARD_SECONDARY:
2913 case ASB_DISCARD_ZERO_CHG:
2914 drbd_err(device, "Configuration error.\n");
2915 break;
2916 case ASB_VIOLENTLY:
2917 rv = drbd_asb_recover_0p(peer_device);
2918 break;
2919 case ASB_DISCONNECT:
2920 break;
2921 case ASB_CALL_HELPER:
2922 hg = drbd_asb_recover_0p(peer_device);
2923 if (hg == -1) {
2924 enum drbd_state_rv rv2;
2925
2926 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2927 * we might be here in C_WF_REPORT_PARAMS which is transient.
2928 * we do not need to wait for the after state change work either. */
2929 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2930 if (rv2 != SS_SUCCESS) {
2931 drbd_khelper(device, "pri-lost-after-sb");
2932 } else {
2933 drbd_warn(device, "Successfully gave up primary role.\n");
2934 rv = hg;
2935 }
2936 } else
2937 rv = hg;
2938 }
2939
2940 return rv;
2941 }
2942
2943 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2944 u64 bits, u64 flags)
2945 {
2946 if (!uuid) {
2947 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2948 return;
2949 }
2950 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2951 text,
2952 (unsigned long long)uuid[UI_CURRENT],
2953 (unsigned long long)uuid[UI_BITMAP],
2954 (unsigned long long)uuid[UI_HISTORY_START],
2955 (unsigned long long)uuid[UI_HISTORY_END],
2956 (unsigned long long)bits,
2957 (unsigned long long)flags);
2958 }
2959
2960 /*
2961 100 after split brain try auto recover
2962 2 C_SYNC_SOURCE set BitMap
2963 1 C_SYNC_SOURCE use BitMap
2964 0 no Sync
2965 -1 C_SYNC_TARGET use BitMap
2966 -2 C_SYNC_TARGET set BitMap
2967 -100 after split brain, disconnect
2968 -1000 unrelated data
2969 -1091 requires proto 91
2970 -1096 requires proto 96
2971 */
2972 static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
2973 {
2974 struct drbd_peer_device *const peer_device = first_peer_device(device);
2975 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
2976 u64 self, peer;
2977 int i, j;
2978
2979 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2980 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2981
2982 *rule_nr = 10;
2983 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2984 return 0;
2985
2986 *rule_nr = 20;
2987 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2988 peer != UUID_JUST_CREATED)
2989 return -2;
2990
2991 *rule_nr = 30;
2992 if (self != UUID_JUST_CREATED &&
2993 (peer == UUID_JUST_CREATED || peer == (u64)0))
2994 return 2;
2995
2996 if (self == peer) {
2997 int rct, dc; /* roles at crash time */
2998
2999 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3000
3001 if (connection->agreed_pro_version < 91)
3002 return -1091;
3003
3004 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3005 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3006 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3007 drbd_uuid_move_history(device);
3008 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3009 device->ldev->md.uuid[UI_BITMAP] = 0;
3010
3011 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3012 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3013 *rule_nr = 34;
3014 } else {
3015 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3016 *rule_nr = 36;
3017 }
3018
3019 return 1;
3020 }
3021
3022 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3023
3024 if (connection->agreed_pro_version < 91)
3025 return -1091;
3026
3027 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3028 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3029 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3030
3031 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3032 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3033 device->p_uuid[UI_BITMAP] = 0UL;
3034
3035 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3036 *rule_nr = 35;
3037 } else {
3038 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3039 *rule_nr = 37;
3040 }
3041
3042 return -1;
3043 }
3044
3045 /* Common power [off|failure] */
3046 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3047 (device->p_uuid[UI_FLAGS] & 2);
3048 /* lowest bit is set when we were primary,
3049 * next bit (weight 2) is set when peer was primary */
3050 *rule_nr = 40;
3051
3052 switch (rct) {
3053 case 0: /* !self_pri && !peer_pri */ return 0;
3054 case 1: /* self_pri && !peer_pri */ return 1;
3055 case 2: /* !self_pri && peer_pri */ return -1;
3056 case 3: /* self_pri && peer_pri */
3057 dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3058 return dc ? -1 : 1;
3059 }
3060 }
3061
3062 *rule_nr = 50;
3063 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3064 if (self == peer)
3065 return -1;
3066
3067 *rule_nr = 51;
3068 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3069 if (self == peer) {
3070 if (connection->agreed_pro_version < 96 ?
3071 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3072 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3073 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3074 /* The last P_SYNC_UUID did not get through. Undo the modifications
3075 the peer made to its UUIDs when it last started a resync as sync source. */
3076
3077 if (connection->agreed_pro_version < 91)
3078 return -1091;
3079
3080 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3081 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3082
3083 drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3084 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3085
3086 return -1;
3087 }
3088 }
3089
3090 *rule_nr = 60;
3091 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3092 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3093 peer = device->p_uuid[i] & ~((u64)1);
3094 if (self == peer)
3095 return -2;
3096 }
3097
3098 *rule_nr = 70;
3099 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3100 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3101 if (self == peer)
3102 return 1;
3103
3104 *rule_nr = 71;
3105 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3106 if (self == peer) {
3107 if (connection->agreed_pro_version < 96 ?
3108 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3109 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3110 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3111 /* The last P_SYNC_UUID did not get through. Undo the modifications
3112 we made to our UUIDs when we last started a resync as sync source. */
3113
3114 if (connection->agreed_pro_version < 91)
3115 return -1091;
3116
3117 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3118 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3119
3120 drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3121 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3122 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3123
3124 return 1;
3125 }
3126 }
3127
3128
3129 *rule_nr = 80;
3130 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3131 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3132 self = device->ldev->md.uuid[i] & ~((u64)1);
3133 if (self == peer)
3134 return 2;
3135 }
3136
3137 *rule_nr = 90;
3138 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3139 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3140 if (self == peer && self != ((u64)0))
3141 return 100;
3142
3143 *rule_nr = 100;
3144 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3145 self = device->ldev->md.uuid[i] & ~((u64)1);
3146 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3147 peer = device->p_uuid[j] & ~((u64)1);
3148 if (self == peer)
3149 return -100;
3150 }
3151 }
3152
3153 return -1000;
3154 }
3155
3156 /* drbd_sync_handshake() returns the new conn state on success, or
3157 CONN_MASK (-1) on failure.
3158 */
3159 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3160 enum drbd_role peer_role,
3161 enum drbd_disk_state peer_disk) __must_hold(local)
3162 {
3163 struct drbd_device *device = peer_device->device;
3164 enum drbd_conns rv = C_MASK;
3165 enum drbd_disk_state mydisk;
3166 struct net_conf *nc;
3167 int hg, rule_nr, rr_conflict, tentative;
3168
3169 mydisk = device->state.disk;
3170 if (mydisk == D_NEGOTIATING)
3171 mydisk = device->new_state_tmp.disk;
3172
3173 drbd_info(device, "drbd_sync_handshake:\n");
3174
3175 spin_lock_irq(&device->ldev->md.uuid_lock);
3176 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3177 drbd_uuid_dump(device, "peer", device->p_uuid,
3178 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3179
3180 hg = drbd_uuid_compare(device, &rule_nr);
3181 spin_unlock_irq(&device->ldev->md.uuid_lock);
3182
3183 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3184
3185 if (hg == -1000) {
3186 drbd_alert(device, "Unrelated data, aborting!\n");
3187 return C_MASK;
3188 }
3189 if (hg < -1000) {
3190 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3191 return C_MASK;
3192 }
3193
3194 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3195 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3196 int f = (hg == -100) || abs(hg) == 2;
3197 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3198 if (f)
3199 hg = hg*2;
3200 drbd_info(device, "Becoming sync %s due to disk states.\n",
3201 hg > 0 ? "source" : "target");
3202 }
3203
3204 if (abs(hg) == 100)
3205 drbd_khelper(device, "initial-split-brain");
3206
3207 rcu_read_lock();
3208 nc = rcu_dereference(peer_device->connection->net_conf);
3209
3210 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3211 int pcount = (device->state.role == R_PRIMARY)
3212 + (peer_role == R_PRIMARY);
3213 int forced = (hg == -100);
3214
3215 switch (pcount) {
3216 case 0:
3217 hg = drbd_asb_recover_0p(peer_device);
3218 break;
3219 case 1:
3220 hg = drbd_asb_recover_1p(peer_device);
3221 break;
3222 case 2:
3223 hg = drbd_asb_recover_2p(peer_device);
3224 break;
3225 }
3226 if (abs(hg) < 100) {
3227 drbd_warn(device, "Split-Brain detected, %d primaries, "
3228 "automatically solved. Sync from %s node\n",
3229 pcount, (hg < 0) ? "peer" : "this");
3230 if (forced) {
3231 drbd_warn(device, "Doing a full sync, since"
3232 " UUIDs where ambiguous.\n");
3233 hg = hg*2;
3234 }
3235 }
3236 }
3237
3238 if (hg == -100) {
3239 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3240 hg = -1;
3241 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3242 hg = 1;
3243
3244 if (abs(hg) < 100)
3245 drbd_warn(device, "Split-Brain detected, manually solved. "
3246 "Sync from %s node\n",
3247 (hg < 0) ? "peer" : "this");
3248 }
3249 rr_conflict = nc->rr_conflict;
3250 tentative = nc->tentative;
3251 rcu_read_unlock();
3252
3253 if (hg == -100) {
3254 /* FIXME this log message is not correct if we end up here
3255 * after an attempted attach on a diskless node.
3256 * We just refuse to attach -- well, we drop the "connection"
3257 * to that disk, in a way... */
3258 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3259 drbd_khelper(device, "split-brain");
3260 return C_MASK;
3261 }
3262
3263 if (hg > 0 && mydisk <= D_INCONSISTENT) {
3264 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3265 return C_MASK;
3266 }
3267
3268 if (hg < 0 && /* by intention we do not use mydisk here. */
3269 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3270 switch (rr_conflict) {
3271 case ASB_CALL_HELPER:
3272 drbd_khelper(device, "pri-lost");
3273 /* fall through */
3274 case ASB_DISCONNECT:
3275 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3276 return C_MASK;
3277 case ASB_VIOLENTLY:
3278 drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3279 "assumption\n");
3280 }
3281 }
3282
3283 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3284 if (hg == 0)
3285 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3286 else
3287 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3288 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3289 abs(hg) >= 2 ? "full" : "bit-map based");
3290 return C_MASK;
3291 }
3292
3293 if (abs(hg) >= 2) {
3294 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3295 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3296 BM_LOCKED_SET_ALLOWED))
3297 return C_MASK;
3298 }
3299
3300 if (hg > 0) { /* become sync source. */
3301 rv = C_WF_BITMAP_S;
3302 } else if (hg < 0) { /* become sync target */
3303 rv = C_WF_BITMAP_T;
3304 } else {
3305 rv = C_CONNECTED;
3306 if (drbd_bm_total_weight(device)) {
3307 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3308 drbd_bm_total_weight(device));
3309 }
3310 }
3311
3312 return rv;
3313 }
3314
3315 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3316 {
3317 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3318 if (peer == ASB_DISCARD_REMOTE)
3319 return ASB_DISCARD_LOCAL;
3320
3321 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3322 if (peer == ASB_DISCARD_LOCAL)
3323 return ASB_DISCARD_REMOTE;
3324
3325 /* everything else is valid if they are equal on both sides. */
3326 return peer;
3327 }
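
/*
 * Editor's note: e.g. a peer configured with ASB_DISCARD_REMOTE maps to
 * ASB_DISCARD_LOCAL from our point of view; receive_protocol() below
 * compares convert_after_sb(peer setting) against our own after-sb-*
 * setting, so discard-remote on one node is only compatible with
 * discard-local on the other, and any other asymmetry is rejected as
 * "incompatible settings".
 */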
3328
3329 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3330 {
3331 struct p_protocol *p = pi->data;
3332 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3333 int p_proto, p_discard_my_data, p_two_primaries, cf;
3334 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3335 char integrity_alg[SHARED_SECRET_MAX] = "";
3336 struct crypto_ahash *peer_integrity_tfm = NULL;
3337 void *int_dig_in = NULL, *int_dig_vv = NULL;
3338
3339 p_proto = be32_to_cpu(p->protocol);
3340 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3341 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3342 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3343 p_two_primaries = be32_to_cpu(p->two_primaries);
3344 cf = be32_to_cpu(p->conn_flags);
3345 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3346
3347 if (connection->agreed_pro_version >= 87) {
3348 int err;
3349
3350 if (pi->size > sizeof(integrity_alg))
3351 return -EIO;
3352 err = drbd_recv_all(connection, integrity_alg, pi->size);
3353 if (err)
3354 return err;
3355 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3356 }
3357
3358 if (pi->cmd != P_PROTOCOL_UPDATE) {
3359 clear_bit(CONN_DRY_RUN, &connection->flags);
3360
3361 if (cf & CF_DRY_RUN)
3362 set_bit(CONN_DRY_RUN, &connection->flags);
3363
3364 rcu_read_lock();
3365 nc = rcu_dereference(connection->net_conf);
3366
3367 if (p_proto != nc->wire_protocol) {
3368 drbd_err(connection, "incompatible %s settings\n", "protocol");
3369 goto disconnect_rcu_unlock;
3370 }
3371
3372 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3373 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3374 goto disconnect_rcu_unlock;
3375 }
3376
3377 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3378 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3379 goto disconnect_rcu_unlock;
3380 }
3381
3382 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3383 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3384 goto disconnect_rcu_unlock;
3385 }
3386
3387 if (p_discard_my_data && nc->discard_my_data) {
3388 drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3389 goto disconnect_rcu_unlock;
3390 }
3391
3392 if (p_two_primaries != nc->two_primaries) {
3393 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3394 goto disconnect_rcu_unlock;
3395 }
3396
3397 if (strcmp(integrity_alg, nc->integrity_alg)) {
3398 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3399 goto disconnect_rcu_unlock;
3400 }
3401
3402 rcu_read_unlock();
3403 }
3404
3405 if (integrity_alg[0]) {
3406 int hash_size;
3407
3408 /*
3409 * We can only change the peer data integrity algorithm
3410 * here. Changing our own data integrity algorithm
3411 * requires that we send a P_PROTOCOL_UPDATE packet at
3412 * the same time; otherwise, the peer has no way to
3413 * know at which packet boundary the algorithm
3414 * changes.
3415 */
3416
3417 peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3418 if (!peer_integrity_tfm) {
3419 drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3420 integrity_alg);
3421 goto disconnect;
3422 }
3423
3424 hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3425 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3426 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3427 if (!(int_dig_in && int_dig_vv)) {
3428 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3429 goto disconnect;
3430 }
3431 }
3432
3433 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3434 if (!new_net_conf) {
3435 drbd_err(connection, "Allocation of new net_conf failed\n");
3436 goto disconnect;
3437 }
3438
3439 mutex_lock(&connection->data.mutex);
3440 mutex_lock(&connection->resource->conf_update);
3441 old_net_conf = connection->net_conf;
3442 *new_net_conf = *old_net_conf;
3443
3444 new_net_conf->wire_protocol = p_proto;
3445 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3446 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3447 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3448 new_net_conf->two_primaries = p_two_primaries;
3449
3450 rcu_assign_pointer(connection->net_conf, new_net_conf);
3451 mutex_unlock(&connection->resource->conf_update);
3452 mutex_unlock(&connection->data.mutex);
3453
3454 crypto_free_ahash(connection->peer_integrity_tfm);
3455 kfree(connection->int_dig_in);
3456 kfree(connection->int_dig_vv);
3457 connection->peer_integrity_tfm = peer_integrity_tfm;
3458 connection->int_dig_in = int_dig_in;
3459 connection->int_dig_vv = int_dig_vv;
3460
3461 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3462 drbd_info(connection, "peer data-integrity-alg: %s\n",
3463 integrity_alg[0] ? integrity_alg : "(none)");
3464
3465 synchronize_rcu();
3466 kfree(old_net_conf);
3467 return 0;
3468
3469 disconnect_rcu_unlock:
3470 rcu_read_unlock();
3471 disconnect:
3472 crypto_free_ahash(peer_integrity_tfm);
3473 kfree(int_dig_in);
3474 kfree(int_dig_vv);
3475 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3476 return -EIO;
3477 }
3478
3479 /* helper function
3480 * input: alg name, feature name
3481 * return: NULL (alg name was "")
3482 * ERR_PTR(error) if something goes wrong
3483 * or the crypto hash ptr, if it worked out ok. */
3484 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3485 const char *alg, const char *name)
3486 {
3487 struct crypto_ahash *tfm;
3488
3489 if (!alg[0])
3490 return NULL;
3491
3492 tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3493 if (IS_ERR(tfm)) {
3494 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3495 alg, name, PTR_ERR(tfm));
3496 return tfm;
3497 }
3498 return tfm;
3499 }
3500
3501 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3502 {
3503 void *buffer = connection->data.rbuf;
3504 int size = pi->size;
3505
3506 while (size) {
3507 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3508 s = drbd_recv(connection, buffer, s);
3509 if (s <= 0) {
3510 if (s < 0)
3511 return s;
3512 break;
3513 }
3514 size -= s;
3515 }
3516 if (size)
3517 return -EIO;
3518 return 0;
3519 }
3520
3521 /*
3522 * config_unknown_volume - device configuration command for unknown volume
3523 *
3524 * When a device is added to an existing connection, the node on which the
3525 * device is added first will send configuration commands to its peer but the
3526 * peer will not know about the device yet. It will warn and ignore these
3527 * commands. Once the device is added on the second node, the second node will
3528 * send the same device configuration commands, but in the other direction.
3529 *
3530 * (We can also end up here if drbd is misconfigured.)
3531 */
3532 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3533 {
3534 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3535 cmdname(pi->cmd), pi->vnr);
3536 return ignore_remaining_packet(connection, pi);
3537 }
3538
3539 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3540 {
3541 struct drbd_peer_device *peer_device;
3542 struct drbd_device *device;
3543 struct p_rs_param_95 *p;
3544 unsigned int header_size, data_size, exp_max_sz;
3545 struct crypto_ahash *verify_tfm = NULL;
3546 struct crypto_ahash *csums_tfm = NULL;
3547 struct net_conf *old_net_conf, *new_net_conf = NULL;
3548 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3549 const int apv = connection->agreed_pro_version;
3550 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3551 int fifo_size = 0;
3552 int err;
3553
3554 peer_device = conn_peer_device(connection, pi->vnr);
3555 if (!peer_device)
3556 return config_unknown_volume(connection, pi);
3557 device = peer_device->device;
3558
3559 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3560 : apv == 88 ? sizeof(struct p_rs_param)
3561 + SHARED_SECRET_MAX
3562 : apv <= 94 ? sizeof(struct p_rs_param_89)
3563 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3564
3565 if (pi->size > exp_max_sz) {
3566 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3567 pi->size, exp_max_sz);
3568 return -EIO;
3569 }
3570
3571 if (apv <= 88) {
3572 header_size = sizeof(struct p_rs_param);
3573 data_size = pi->size - header_size;
3574 } else if (apv <= 94) {
3575 header_size = sizeof(struct p_rs_param_89);
3576 data_size = pi->size - header_size;
3577 D_ASSERT(device, data_size == 0);
3578 } else {
3579 header_size = sizeof(struct p_rs_param_95);
3580 data_size = pi->size - header_size;
3581 D_ASSERT(device, data_size == 0);
3582 }
3583
3584 /* initialize verify_alg and csums_alg */
3585 p = pi->data;
3586 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3587
3588 err = drbd_recv_all(peer_device->connection, p, header_size);
3589 if (err)
3590 return err;
3591
3592 mutex_lock(&connection->resource->conf_update);
3593 old_net_conf = peer_device->connection->net_conf;
3594 if (get_ldev(device)) {
3595 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3596 if (!new_disk_conf) {
3597 put_ldev(device);
3598 mutex_unlock(&connection->resource->conf_update);
3599 drbd_err(device, "Allocation of new disk_conf failed\n");
3600 return -ENOMEM;
3601 }
3602
3603 old_disk_conf = device->ldev->disk_conf;
3604 *new_disk_conf = *old_disk_conf;
3605
3606 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3607 }
3608
3609 if (apv >= 88) {
3610 if (apv == 88) {
3611 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3612 drbd_err(device, "verify-alg of wrong size, "
3613 "peer wants %u, accepting only up to %u byte\n",
3614 data_size, SHARED_SECRET_MAX);
3615 err = -EIO;
3616 goto reconnect;
3617 }
3618
3619 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3620 if (err)
3621 goto reconnect;
3622 /* we expect NUL terminated string */
3623 /* but just in case someone tries to be evil */
3624 D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3625 p->verify_alg[data_size-1] = 0;
3626
3627 } else /* apv >= 89 */ {
3628 /* we still expect NUL terminated strings */
3629 /* but just in case someone tries to be evil */
3630 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3631 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3632 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3633 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3634 }
3635
3636 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3637 if (device->state.conn == C_WF_REPORT_PARAMS) {
3638 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3639 old_net_conf->verify_alg, p->verify_alg);
3640 goto disconnect;
3641 }
3642 verify_tfm = drbd_crypto_alloc_digest_safe(device,
3643 p->verify_alg, "verify-alg");
3644 if (IS_ERR(verify_tfm)) {
3645 verify_tfm = NULL;
3646 goto disconnect;
3647 }
3648 }
3649
3650 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3651 if (device->state.conn == C_WF_REPORT_PARAMS) {
3652 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3653 old_net_conf->csums_alg, p->csums_alg);
3654 goto disconnect;
3655 }
3656 csums_tfm = drbd_crypto_alloc_digest_safe(device,
3657 p->csums_alg, "csums-alg");
3658 if (IS_ERR(csums_tfm)) {
3659 csums_tfm = NULL;
3660 goto disconnect;
3661 }
3662 }
3663
3664 if (apv > 94 && new_disk_conf) {
3665 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3666 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3667 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3668 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3669
3670 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3671 if (fifo_size != device->rs_plan_s->size) {
3672 new_plan = fifo_alloc(fifo_size);
3673 if (!new_plan) {
3674 drbd_err(device, "kmalloc of fifo_buffer failed");
3675 put_ldev(device);
3676 goto disconnect;
3677 }
3678 }
3679 }
3680
3681 if (verify_tfm || csums_tfm) {
3682 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3683 if (!new_net_conf) {
3684 drbd_err(device, "Allocation of new net_conf failed\n");
3685 goto disconnect;
3686 }
3687
3688 *new_net_conf = *old_net_conf;
3689
3690 if (verify_tfm) {
3691 strcpy(new_net_conf->verify_alg, p->verify_alg);
3692 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3693 crypto_free_ahash(peer_device->connection->verify_tfm);
3694 peer_device->connection->verify_tfm = verify_tfm;
3695 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3696 }
3697 if (csums_tfm) {
3698 strcpy(new_net_conf->csums_alg, p->csums_alg);
3699 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3700 crypto_free_ahash(peer_device->connection->csums_tfm);
3701 peer_device->connection->csums_tfm = csums_tfm;
3702 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3703 }
3704 rcu_assign_pointer(connection->net_conf, new_net_conf);
3705 }
3706 }
3707
3708 if (new_disk_conf) {
3709 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3710 put_ldev(device);
3711 }
3712
3713 if (new_plan) {
3714 old_plan = device->rs_plan_s;
3715 rcu_assign_pointer(device->rs_plan_s, new_plan);
3716 }
3717
3718 mutex_unlock(&connection->resource->conf_update);
3719 synchronize_rcu();
3720 if (new_net_conf)
3721 kfree(old_net_conf);
3722 kfree(old_disk_conf);
3723 kfree(old_plan);
3724
3725 return 0;
3726
3727 reconnect:
3728 if (new_disk_conf) {
3729 put_ldev(device);
3730 kfree(new_disk_conf);
3731 }
3732 mutex_unlock(&connection->resource->conf_update);
3733 return -EIO;
3734
3735 disconnect:
3736 kfree(new_plan);
3737 if (new_disk_conf) {
3738 put_ldev(device);
3739 kfree(new_disk_conf);
3740 }
3741 mutex_unlock(&connection->resource->conf_update);
3742 /* just for completeness: actually not needed,
3743 * as this is not reached if csums_tfm was ok. */
3744 crypto_free_ahash(csums_tfm);
3745 /* but free the verify_tfm again, if csums_tfm did not work out */
3746 crypto_free_ahash(verify_tfm);
3747 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3748 return -EIO;
3749 }
3750
3751 /* warn if the arguments differ by more than 12.5% */
3752 static void warn_if_differ_considerably(struct drbd_device *device,
3753 const char *s, sector_t a, sector_t b)
3754 {
3755 sector_t d;
3756 if (a == 0 || b == 0)
3757 return;
3758 d = (a > b) ? (a - b) : (b - a);
3759 if (d > (a>>3) || d > (b>>3))
3760 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3761 (unsigned long long)a, (unsigned long long)b);
3762 }
3763
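/* Handle P_SIZES: remember the peer's disk and user requested sizes,
 * refuse to shrink a device with usable data while connecting, adopt a
 * changed u_size, and re-evaluate our own capacity (which may trigger a
 * resync after online grow). */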
3764 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3765 {
3766 struct drbd_peer_device *peer_device;
3767 struct drbd_device *device;
3768 struct p_sizes *p = pi->data;
3769 enum determine_dev_size dd = DS_UNCHANGED;
3770 sector_t p_size, p_usize, p_csize, my_usize;
3771 int ldsc = 0; /* local disk size changed */
3772 enum dds_flags ddsf;
3773
3774 peer_device = conn_peer_device(connection, pi->vnr);
3775 if (!peer_device)
3776 return config_unknown_volume(connection, pi);
3777 device = peer_device->device;
3778
3779 p_size = be64_to_cpu(p->d_size);
3780 p_usize = be64_to_cpu(p->u_size);
3781 p_csize = be64_to_cpu(p->c_size);
3782
3783 /* just store the peer's disk size for now.
3784 * we still need to figure out whether we accept that. */
3785 device->p_size = p_size;
3786
3787 if (get_ldev(device)) {
3788 rcu_read_lock();
3789 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3790 rcu_read_unlock();
3791
3792 warn_if_differ_considerably(device, "lower level device sizes",
3793 p_size, drbd_get_max_capacity(device->ldev));
3794 warn_if_differ_considerably(device, "user requested size",
3795 p_usize, my_usize);
3796
3797 /* if this is the first connect, or an otherwise expected
3798 * param exchange, choose the minimum */
3799 if (device->state.conn == C_WF_REPORT_PARAMS)
3800 p_usize = min_not_zero(my_usize, p_usize);
3801
3802 /* Never shrink a device with usable data during connect.
3803 But allow online shrinking if we are connected. */
3804 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3805 drbd_get_capacity(device->this_bdev) &&
3806 device->state.disk >= D_OUTDATED &&
3807 device->state.conn < C_CONNECTED) {
3808 drbd_err(device, "The peer's disk size is too small!\n");
3809 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3810 put_ldev(device);
3811 return -EIO;
3812 }
3813
3814 if (my_usize != p_usize) {
3815 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3816
3817 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3818 if (!new_disk_conf) {
3819 drbd_err(device, "Allocation of new disk_conf failed\n");
3820 put_ldev(device);
3821 return -ENOMEM;
3822 }
3823
3824 mutex_lock(&connection->resource->conf_update);
3825 old_disk_conf = device->ldev->disk_conf;
3826 *new_disk_conf = *old_disk_conf;
3827 new_disk_conf->disk_size = p_usize;
3828
3829 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3830 mutex_unlock(&connection->resource->conf_update);
3831 synchronize_rcu();
3832 kfree(old_disk_conf);
3833
3834 			drbd_info(device, "Peer sets u_size to %lu sectors\n",
3835 				 (unsigned long)p_usize);
3836 }
3837
3838 put_ldev(device);
3839 }
3840
3841 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3842 	/* Keep the call to drbd_reconsider_max_bio_size() before drbd_determine_dev_size():
3843 In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3844 drbd_reconsider_max_bio_size(), we can be sure that after
3845 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3846
3847 ddsf = be16_to_cpu(p->dds_flags);
3848 if (get_ldev(device)) {
3849 drbd_reconsider_max_bio_size(device, device->ldev);
3850 dd = drbd_determine_dev_size(device, ddsf, NULL);
3851 put_ldev(device);
3852 if (dd == DS_ERROR)
3853 return -EIO;
3854 drbd_md_sync(device);
3855 } else {
3856 /*
3857 * I am diskless, need to accept the peer's *current* size.
3858 		 * I must NOT accept the peer's backing disk size,
3859 * it may have been larger than mine all along...
3860 *
3861 * At this point, the peer knows more about my disk, or at
3862 * least about what we last agreed upon, than myself.
3863 * So if his c_size is less than his d_size, the most likely
3864 * reason is that *my* d_size was smaller last time we checked.
3865 *
3866 * However, if he sends a zero current size,
3867 		 * take his (user-capped or) backing disk size anyway.
3868 */
3869 drbd_reconsider_max_bio_size(device, NULL);
3870 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3871 }
3872
3873 if (get_ldev(device)) {
3874 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3875 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3876 ldsc = 1;
3877 }
3878
3879 put_ldev(device);
3880 }
3881
3882 if (device->state.conn > C_WF_REPORT_PARAMS) {
3883 if (be64_to_cpu(p->c_size) !=
3884 drbd_get_capacity(device->this_bdev) || ldsc) {
3885 /* we have different sizes, probably peer
3886 * needs to know my new size... */
3887 drbd_send_sizes(peer_device, 0, ddsf);
3888 }
3889 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3890 (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3891 if (device->state.pdsk >= D_INCONSISTENT &&
3892 device->state.disk >= D_INCONSISTENT) {
3893 if (ddsf & DDSF_NO_RESYNC)
3894 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3895 else
3896 resync_after_online_grow(device);
3897 } else
3898 set_bit(RESYNC_AFTER_NEG, &device->flags);
3899 }
3900 }
3901
3902 return 0;
3903 }
3904
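/* Handle P_UUIDS: take over the peer's UUID set, check it against our
 * exposed data UUID, and skip the initial full sync if both sides agree
 * the device was just created (UUID_JUST_CREATED shortcut). */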
3905 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3906 {
3907 struct drbd_peer_device *peer_device;
3908 struct drbd_device *device;
3909 struct p_uuids *p = pi->data;
3910 u64 *p_uuid;
3911 int i, updated_uuids = 0;
3912
3913 peer_device = conn_peer_device(connection, pi->vnr);
3914 if (!peer_device)
3915 return config_unknown_volume(connection, pi);
3916 device = peer_device->device;
3917
3918 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3919 if (!p_uuid) {
3920 drbd_err(device, "kmalloc of p_uuid failed\n");
3921 return false;
3922 }
3923
3924 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3925 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3926
3927 kfree(device->p_uuid);
3928 device->p_uuid = p_uuid;
3929
3930 if (device->state.conn < C_CONNECTED &&
3931 device->state.disk < D_INCONSISTENT &&
3932 device->state.role == R_PRIMARY &&
3933 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3934 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3935 (unsigned long long)device->ed_uuid);
3936 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3937 return -EIO;
3938 }
3939
3940 if (get_ldev(device)) {
3941 int skip_initial_sync =
3942 device->state.conn == C_CONNECTED &&
3943 peer_device->connection->agreed_pro_version >= 90 &&
3944 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3945 (p_uuid[UI_FLAGS] & 8);
3946 if (skip_initial_sync) {
3947 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3948 drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3949 "clear_n_write from receive_uuids",
3950 BM_LOCKED_TEST_ALLOWED);
3951 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3952 _drbd_uuid_set(device, UI_BITMAP, 0);
3953 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3954 CS_VERBOSE, NULL);
3955 drbd_md_sync(device);
3956 updated_uuids = 1;
3957 }
3958 put_ldev(device);
3959 } else if (device->state.disk < D_INCONSISTENT &&
3960 device->state.role == R_PRIMARY) {
3961 /* I am a diskless primary, the peer just created a new current UUID
3962 for me. */
3963 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3964 }
3965
3966 	/* Before we test the disk state, wait until any possibly ongoing
3967 	   cluster wide state change has finished. That is important if
3968 	   we are primary and are detaching from our disk. We need to see the
3969 	   new disk state... */
3970 mutex_lock(device->state_mutex);
3971 mutex_unlock(device->state_mutex);
3972 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3973 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3974
3975 if (updated_uuids)
3976 drbd_print_uuids(device, "receiver updated UUIDs to");
3977
3978 return 0;
3979 }
3980
3981 /**
3982 * convert_state() - Converts the peer's view of the cluster state to our point of view
3983 * @ps: The state as seen by the peer.
3984 */
3985 static union drbd_state convert_state(union drbd_state ps)
3986 {
3987 union drbd_state ms;
3988
3989 static enum drbd_conns c_tab[] = {
3990 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3991 [C_CONNECTED] = C_CONNECTED,
3992
3993 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3994 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3995 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3996 [C_VERIFY_S] = C_VERIFY_T,
3997 [C_MASK] = C_MASK,
3998 };
3999
4000 ms.i = ps.i;
4001
4002 ms.conn = c_tab[ps.conn];
4003 ms.peer = ps.role;
4004 ms.role = ps.peer;
4005 ms.pdsk = ps.disk;
4006 ms.disk = ps.pdsk;
4007 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4008
4009 return ms;
4010 }
4011
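/* Handle P_STATE_CHG_REQ: a state change the peer asks us to apply to
 * one volume.  The request is converted to our point of view, applied,
 * and the result is returned via drbd_send_sr_reply(). */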
4012 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4013 {
4014 struct drbd_peer_device *peer_device;
4015 struct drbd_device *device;
4016 struct p_req_state *p = pi->data;
4017 union drbd_state mask, val;
4018 enum drbd_state_rv rv;
4019
4020 peer_device = conn_peer_device(connection, pi->vnr);
4021 if (!peer_device)
4022 return -EIO;
4023 device = peer_device->device;
4024
4025 mask.i = be32_to_cpu(p->mask);
4026 val.i = be32_to_cpu(p->val);
4027
4028 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4029 mutex_is_locked(device->state_mutex)) {
4030 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4031 return 0;
4032 }
4033
4034 mask = convert_state(mask);
4035 val = convert_state(val);
4036
4037 rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4038 drbd_send_sr_reply(peer_device, rv);
4039
4040 drbd_md_sync(device);
4041
4042 return 0;
4043 }
4044
4045 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4046 {
4047 struct p_req_state *p = pi->data;
4048 union drbd_state mask, val;
4049 enum drbd_state_rv rv;
4050
4051 mask.i = be32_to_cpu(p->mask);
4052 val.i = be32_to_cpu(p->val);
4053
4054 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4055 mutex_is_locked(&connection->cstate_mutex)) {
4056 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4057 return 0;
4058 }
4059
4060 mask = convert_state(mask);
4061 val = convert_state(val);
4062
4063 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4064 conn_send_sr_reply(connection, rv);
4065
4066 return 0;
4067 }
4068
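/* Handle P_STATE: fold the peer's reported state into our own.  On a
 * (re)established connection this is where drbd_sync_handshake() decides
 * whether, and in which direction, a resync is needed. */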
4069 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4070 {
4071 struct drbd_peer_device *peer_device;
4072 struct drbd_device *device;
4073 struct p_state *p = pi->data;
4074 union drbd_state os, ns, peer_state;
4075 enum drbd_disk_state real_peer_disk;
4076 enum chg_state_flags cs_flags;
4077 int rv;
4078
4079 peer_device = conn_peer_device(connection, pi->vnr);
4080 if (!peer_device)
4081 return config_unknown_volume(connection, pi);
4082 device = peer_device->device;
4083
4084 peer_state.i = be32_to_cpu(p->state);
4085
4086 real_peer_disk = peer_state.disk;
4087 if (peer_state.disk == D_NEGOTIATING) {
4088 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4089 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4090 }
4091
4092 spin_lock_irq(&device->resource->req_lock);
4093 retry:
4094 os = ns = drbd_read_state(device);
4095 spin_unlock_irq(&device->resource->req_lock);
4096
4097 /* If some other part of the code (ack_receiver thread, timeout)
4098 * already decided to close the connection again,
4099 * we must not "re-establish" it here. */
4100 if (os.conn <= C_TEAR_DOWN)
4101 return -ECONNRESET;
4102
4103 /* If this is the "end of sync" confirmation, usually the peer disk
4104 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4105 * set) resync started in PausedSyncT, or if the timing of pause-/
4106 * unpause-sync events has been "just right", the peer disk may
4107 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4108 */
4109 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4110 real_peer_disk == D_UP_TO_DATE &&
4111 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4112 /* If we are (becoming) SyncSource, but peer is still in sync
4113 * preparation, ignore its uptodate-ness to avoid flapping, it
4114 * will change to inconsistent once the peer reaches active
4115 * syncing states.
4116 * It may have changed syncer-paused flags, however, so we
4117 * cannot ignore this completely. */
4118 if (peer_state.conn > C_CONNECTED &&
4119 peer_state.conn < C_SYNC_SOURCE)
4120 real_peer_disk = D_INCONSISTENT;
4121
4122 /* if peer_state changes to connected at the same time,
4123 * it explicitly notifies us that it finished resync.
4124 * Maybe we should finish it up, too? */
4125 else if (os.conn >= C_SYNC_SOURCE &&
4126 peer_state.conn == C_CONNECTED) {
4127 if (drbd_bm_total_weight(device) <= device->rs_failed)
4128 drbd_resync_finished(device);
4129 return 0;
4130 }
4131 }
4132
4133 /* explicit verify finished notification, stop sector reached. */
4134 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4135 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4136 ov_out_of_sync_print(device);
4137 drbd_resync_finished(device);
4138 return 0;
4139 }
4140
4141 /* peer says his disk is inconsistent, while we think it is uptodate,
4142 * and this happens while the peer still thinks we have a sync going on,
4143 * but we think we are already done with the sync.
4144 * We ignore this to avoid flapping pdsk.
4145 * This should not happen, if the peer is a recent version of drbd. */
4146 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4147 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4148 real_peer_disk = D_UP_TO_DATE;
4149
4150 if (ns.conn == C_WF_REPORT_PARAMS)
4151 ns.conn = C_CONNECTED;
4152
4153 if (peer_state.conn == C_AHEAD)
4154 ns.conn = C_BEHIND;
4155
4156 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4157 get_ldev_if_state(device, D_NEGOTIATING)) {
4158 int cr; /* consider resync */
4159
4160 /* if we established a new connection */
4161 cr = (os.conn < C_CONNECTED);
4162 /* if we had an established connection
4163 * and one of the nodes newly attaches a disk */
4164 cr |= (os.conn == C_CONNECTED &&
4165 (peer_state.disk == D_NEGOTIATING ||
4166 os.disk == D_NEGOTIATING));
4167 /* if we have both been inconsistent, and the peer has been
4168 * forced to be UpToDate with --overwrite-data */
4169 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4170 /* if we had been plain connected, and the admin requested to
4171 * start a sync by "invalidate" or "invalidate-remote" */
4172 cr |= (os.conn == C_CONNECTED &&
4173 (peer_state.conn >= C_STARTING_SYNC_S &&
4174 peer_state.conn <= C_WF_BITMAP_T));
4175
4176 if (cr)
4177 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4178
4179 put_ldev(device);
4180 if (ns.conn == C_MASK) {
4181 ns.conn = C_CONNECTED;
4182 if (device->state.disk == D_NEGOTIATING) {
4183 drbd_force_state(device, NS(disk, D_FAILED));
4184 } else if (peer_state.disk == D_NEGOTIATING) {
4185 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4186 peer_state.disk = D_DISKLESS;
4187 real_peer_disk = D_DISKLESS;
4188 } else {
4189 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4190 return -EIO;
4191 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4192 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4193 return -EIO;
4194 }
4195 }
4196 }
4197
4198 spin_lock_irq(&device->resource->req_lock);
4199 if (os.i != drbd_read_state(device).i)
4200 goto retry;
4201 clear_bit(CONSIDER_RESYNC, &device->flags);
4202 ns.peer = peer_state.role;
4203 ns.pdsk = real_peer_disk;
4204 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4205 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4206 ns.disk = device->new_state_tmp.disk;
4207 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4208 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4209 test_bit(NEW_CUR_UUID, &device->flags)) {
4210 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4211 		   for temporary network outages! */
4212 spin_unlock_irq(&device->resource->req_lock);
4213 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4214 tl_clear(peer_device->connection);
4215 drbd_uuid_new_current(device);
4216 clear_bit(NEW_CUR_UUID, &device->flags);
4217 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4218 return -EIO;
4219 }
4220 rv = _drbd_set_state(device, ns, cs_flags, NULL);
4221 ns = drbd_read_state(device);
4222 spin_unlock_irq(&device->resource->req_lock);
4223
4224 if (rv < SS_SUCCESS) {
4225 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4226 return -EIO;
4227 }
4228
4229 if (os.conn > C_WF_REPORT_PARAMS) {
4230 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4231 peer_state.disk != D_NEGOTIATING ) {
4232 /* we want resync, peer has not yet decided to sync... */
4233 /* Nowadays only used when forcing a node into primary role and
4234 setting its disk to UpToDate with that */
4235 drbd_send_uuids(peer_device);
4236 drbd_send_current_state(peer_device);
4237 }
4238 }
4239
4240 clear_bit(DISCARD_MY_DATA, &device->flags);
4241
4242 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4243
4244 return 0;
4245 }
4246
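/* Handle P_SYNC_UUID: adopt the sync UUID generated by the SyncSource
 * for this resync and start the resync as sync target. */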
4247 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4248 {
4249 struct drbd_peer_device *peer_device;
4250 struct drbd_device *device;
4251 struct p_rs_uuid *p = pi->data;
4252
4253 peer_device = conn_peer_device(connection, pi->vnr);
4254 if (!peer_device)
4255 return -EIO;
4256 device = peer_device->device;
4257
4258 wait_event(device->misc_wait,
4259 device->state.conn == C_WF_SYNC_UUID ||
4260 device->state.conn == C_BEHIND ||
4261 device->state.conn < C_CONNECTED ||
4262 device->state.disk < D_NEGOTIATING);
4263
4264 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */
4265
4266 /* Here the _drbd_uuid_ functions are right, current should
4267 _not_ be rotated into the history */
4268 if (get_ldev_if_state(device, D_NEGOTIATING)) {
4269 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4270 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4271
4272 drbd_print_uuids(device, "updated sync uuid");
4273 drbd_start_resync(device, C_SYNC_TARGET);
4274
4275 put_ldev(device);
4276 } else
4277 drbd_err(device, "Ignoring SyncUUID packet!\n");
4278
4279 return 0;
4280 }
4281
4282 /**
4283 * receive_bitmap_plain
4284 *
4285 * Return 0 when done, 1 when another iteration is needed, and a negative error
4286 * code upon failure.
4287 */
4288 static int
4289 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4290 unsigned long *p, struct bm_xfer_ctx *c)
4291 {
4292 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4293 drbd_header_size(peer_device->connection);
4294 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4295 c->bm_words - c->word_offset);
4296 unsigned int want = num_words * sizeof(*p);
4297 int err;
4298
4299 if (want != size) {
4300 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4301 return -EIO;
4302 }
4303 if (want == 0)
4304 return 0;
4305 err = drbd_recv_all(peer_device->connection, p, want);
4306 if (err)
4307 return err;
4308
4309 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4310
4311 c->word_offset += num_words;
4312 c->bit_offset = c->word_offset * BITS_PER_LONG;
4313 if (c->bit_offset > c->bm_bits)
4314 c->bit_offset = c->bm_bits;
4315
4316 return 1;
4317 }
4318
4319 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4320 {
4321 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4322 }
4323
4324 static int dcbp_get_start(struct p_compressed_bm *p)
4325 {
4326 return (p->encoding & 0x80) != 0;
4327 }
4328
4329 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4330 {
4331 return (p->encoding >> 4) & 0x7;
4332 }
4333
4334 /**
4335 * recv_bm_rle_bits
4336 *
4337 * Return 0 when done, 1 when another iteration is needed, and a negative error
4338 * code upon failure.
4339 */
4340 static int
4341 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4342 struct p_compressed_bm *p,
4343 struct bm_xfer_ctx *c,
4344 unsigned int len)
4345 {
4346 struct bitstream bs;
4347 u64 look_ahead;
4348 u64 rl;
4349 u64 tmp;
4350 unsigned long s = c->bit_offset;
4351 unsigned long e;
4352 int toggle = dcbp_get_start(p);
4353 int have;
4354 int bits;
4355
4356 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4357
4358 bits = bitstream_get_bits(&bs, &look_ahead, 64);
4359 if (bits < 0)
4360 return -EIO;
4361
4362 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4363 bits = vli_decode_bits(&rl, look_ahead);
4364 if (bits <= 0)
4365 return -EIO;
4366
4367 if (toggle) {
4368 e = s + rl -1;
4369 if (e >= c->bm_bits) {
4370 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4371 return -EIO;
4372 }
4373 _drbd_bm_set_bits(peer_device->device, s, e);
4374 }
4375
4376 if (have < bits) {
4377 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4378 have, bits, look_ahead,
4379 (unsigned int)(bs.cur.b - p->code),
4380 (unsigned int)bs.buf_len);
4381 return -EIO;
4382 }
4383 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4384 if (likely(bits < 64))
4385 look_ahead >>= bits;
4386 else
4387 look_ahead = 0;
4388 have -= bits;
4389
4390 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4391 if (bits < 0)
4392 return -EIO;
4393 look_ahead |= tmp << have;
4394 have += bits;
4395 }
4396
4397 c->bit_offset = s;
4398 bm_xfer_ctx_bit_to_word_offset(c);
4399
4400 return (s != c->bm_bits);
4401 }
4402
4403 /**
4404 * decode_bitmap_c
4405 *
4406 * Return 0 when done, 1 when another iteration is needed, and a negative error
4407 * code upon failure.
4408 */
4409 static int
4410 decode_bitmap_c(struct drbd_peer_device *peer_device,
4411 struct p_compressed_bm *p,
4412 struct bm_xfer_ctx *c,
4413 unsigned int len)
4414 {
4415 if (dcbp_get_code(p) == RLE_VLI_Bits)
4416 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4417
4418 /* other variants had been implemented for evaluation,
4419 * but have been dropped as this one turned out to be "best"
4420 * during all our tests. */
4421
4422 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4423 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4424 return -EIO;
4425 }
4426
4427 void INFO_bm_xfer_stats(struct drbd_device *device,
4428 const char *direction, struct bm_xfer_ctx *c)
4429 {
4430 /* what would it take to transfer it "plaintext" */
4431 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4432 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4433 unsigned int plain =
4434 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4435 c->bm_words * sizeof(unsigned long);
4436 unsigned int total = c->bytes[0] + c->bytes[1];
4437 unsigned int r;
4438
4439 /* total can not be zero. but just in case: */
4440 if (total == 0)
4441 return;
4442
4443 /* don't report if not compressed */
4444 if (total >= plain)
4445 return;
4446
4447 /* total < plain. check for overflow, still */
4448 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4449 : (1000 * total / plain);
4450
4451 if (r > 1000)
4452 r = 1000;
4453
4454 r = 1000 - r;
4455 drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4456 "total %u; compression: %u.%u%%\n",
4457 direction,
4458 c->bytes[1], c->packets[1],
4459 c->bytes[0], c->packets[0],
4460 total, r/10, r % 10);
4461 }
4462
4463 /* Since we are processing the bitfield from lower addresses to higher,
4464    it does not matter whether we process it in 32 bit chunks or 64 bit
4465    chunks, as long as it is little endian. (Understand it as a byte stream,
4466    beginning with the lowest byte...) If we used big endian
4467    we would need to process it from the highest address to the lowest,
4468    in order to be agnostic to the 32 vs 64 bit issue.
4469
4470    Returns 0 on success, a negative error code otherwise. */
4471 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4472 {
4473 struct drbd_peer_device *peer_device;
4474 struct drbd_device *device;
4475 struct bm_xfer_ctx c;
4476 int err;
4477
4478 peer_device = conn_peer_device(connection, pi->vnr);
4479 if (!peer_device)
4480 return -EIO;
4481 device = peer_device->device;
4482
4483 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4484 /* you are supposed to send additional out-of-sync information
4485 * if you actually set bits during this phase */
4486
4487 c = (struct bm_xfer_ctx) {
4488 .bm_bits = drbd_bm_bits(device),
4489 .bm_words = drbd_bm_words(device),
4490 };
4491
4492 for(;;) {
4493 if (pi->cmd == P_BITMAP)
4494 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4495 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4496 /* MAYBE: sanity check that we speak proto >= 90,
4497 * and the feature is enabled! */
4498 struct p_compressed_bm *p = pi->data;
4499
4500 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4501 drbd_err(device, "ReportCBitmap packet too large\n");
4502 err = -EIO;
4503 goto out;
4504 }
4505 if (pi->size <= sizeof(*p)) {
4506 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4507 err = -EIO;
4508 goto out;
4509 }
4510 err = drbd_recv_all(peer_device->connection, p, pi->size);
4511 if (err)
4512 goto out;
4513 err = decode_bitmap_c(peer_device, p, &c, pi->size);
4514 } else {
4515 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4516 err = -EIO;
4517 goto out;
4518 }
4519
4520 c.packets[pi->cmd == P_BITMAP]++;
4521 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4522
4523 if (err <= 0) {
4524 if (err < 0)
4525 goto out;
4526 break;
4527 }
4528 err = drbd_recv_header(peer_device->connection, pi);
4529 if (err)
4530 goto out;
4531 }
4532
4533 INFO_bm_xfer_stats(device, "receive", &c);
4534
4535 if (device->state.conn == C_WF_BITMAP_T) {
4536 enum drbd_state_rv rv;
4537
4538 err = drbd_send_bitmap(device);
4539 if (err)
4540 goto out;
4541 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4542 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4543 D_ASSERT(device, rv == SS_SUCCESS);
4544 } else if (device->state.conn != C_WF_BITMAP_S) {
4545 /* admin may have requested C_DISCONNECTING,
4546 * other threads may have noticed network errors */
4547 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4548 drbd_conn_str(device->state.conn));
4549 }
4550 err = 0;
4551
4552 out:
4553 drbd_bm_unlock(device);
4554 if (!err && device->state.conn == C_WF_BITMAP_S)
4555 drbd_start_resync(device, C_SYNC_SOURCE);
4556 return err;
4557 }
4558
4559 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4560 {
4561 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4562 pi->cmd, pi->size);
4563
4564 return ignore_remaining_packet(connection, pi);
4565 }
4566
4567 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4568 {
4569 /* Make sure we've acked all the TCP data associated
4570 * with the data requests being unplugged */
4571 drbd_tcp_quickack(connection->data.socket);
4572
4573 return 0;
4574 }
4575
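/* Handle P_OUT_OF_SYNC: the peer tells us about a block it did not
 * replicate (typically while it was Ahead of us); mark it out of sync
 * locally so a later resync picks it up. */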
4576 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4577 {
4578 struct drbd_peer_device *peer_device;
4579 struct drbd_device *device;
4580 struct p_block_desc *p = pi->data;
4581
4582 peer_device = conn_peer_device(connection, pi->vnr);
4583 if (!peer_device)
4584 return -EIO;
4585 device = peer_device->device;
4586
4587 switch (device->state.conn) {
4588 case C_WF_SYNC_UUID:
4589 case C_WF_BITMAP_T:
4590 case C_BEHIND:
4591 break;
4592 default:
4593 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4594 drbd_conn_str(device->state.conn));
4595 }
4596
4597 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4598
4599 return 0;
4600 }
4601
4602 struct data_cmd {
4603 int expect_payload;
4604 size_t pkt_size;
4605 int (*fn)(struct drbd_connection *, struct packet_info *);
4606 };
4607
4608 static struct data_cmd drbd_cmd_handler[] = {
4609 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4610 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4611 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4612 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4613 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4614 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4615 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4616 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4617 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4618 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4619 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4620 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4621 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4622 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4623 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4624 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4625 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4626 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4627 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4628 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4629 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4630 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4631 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4632 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4633 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data },
4634 };
4635
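/* Main dispatch loop of the receiver: read a packet header, validate
 * command and payload size against drbd_cmd_handler[], receive the
 * fixed-size sub-header and hand off to the per-packet handler.  Any
 * error tears the connection down via C_PROTOCOL_ERROR. */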
4636 static void drbdd(struct drbd_connection *connection)
4637 {
4638 struct packet_info pi;
4639 size_t shs; /* sub header size */
4640 int err;
4641
4642 while (get_t_state(&connection->receiver) == RUNNING) {
4643 struct data_cmd *cmd;
4644
4645 drbd_thread_current_set_cpu(&connection->receiver);
4646 update_receiver_timing_details(connection, drbd_recv_header);
4647 if (drbd_recv_header(connection, &pi))
4648 goto err_out;
4649
4650 cmd = &drbd_cmd_handler[pi.cmd];
4651 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4652 drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4653 cmdname(pi.cmd), pi.cmd);
4654 goto err_out;
4655 }
4656
4657 shs = cmd->pkt_size;
4658 if (pi.size > shs && !cmd->expect_payload) {
4659 drbd_err(connection, "No payload expected %s l:%d\n",
4660 cmdname(pi.cmd), pi.size);
4661 goto err_out;
4662 }
4663
4664 if (shs) {
4665 update_receiver_timing_details(connection, drbd_recv_all_warn);
4666 err = drbd_recv_all_warn(connection, pi.data, shs);
4667 if (err)
4668 goto err_out;
4669 pi.size -= shs;
4670 }
4671
4672 update_receiver_timing_details(connection, cmd->fn);
4673 err = cmd->fn(connection, &pi);
4674 if (err) {
4675 drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4676 cmdname(pi.cmd), err, pi.size);
4677 goto err_out;
4678 }
4679 }
4680 return;
4681
4682 err_out:
4683 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4684 }
4685
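/* Clean up after connection loss: stop the ack receiver and ack sender,
 * close the sockets, run drbd_disconnected() for every volume, and move
 * the connection to C_UNCONNECTED (or C_STANDALONE when disconnecting). */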
4686 static void conn_disconnect(struct drbd_connection *connection)
4687 {
4688 struct drbd_peer_device *peer_device;
4689 enum drbd_conns oc;
4690 int vnr;
4691
4692 if (connection->cstate == C_STANDALONE)
4693 return;
4694
4695 /* We are about to start the cleanup after connection loss.
4696 * Make sure drbd_make_request knows about that.
4697 * Usually we should be in some network failure state already,
4698 * but just in case we are not, we fix it up here.
4699 */
4700 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4701
4702 /* ack_receiver does not clean up anything. it must not interfere, either */
4703 drbd_thread_stop(&connection->ack_receiver);
4704 if (connection->ack_sender) {
4705 destroy_workqueue(connection->ack_sender);
4706 connection->ack_sender = NULL;
4707 }
4708 drbd_free_sock(connection);
4709
4710 rcu_read_lock();
4711 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4712 struct drbd_device *device = peer_device->device;
4713 kref_get(&device->kref);
4714 rcu_read_unlock();
4715 drbd_disconnected(peer_device);
4716 kref_put(&device->kref, drbd_destroy_device);
4717 rcu_read_lock();
4718 }
4719 rcu_read_unlock();
4720
4721 if (!list_empty(&connection->current_epoch->list))
4722 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4723 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4724 atomic_set(&connection->current_epoch->epoch_size, 0);
4725 connection->send.seen_any_write_yet = false;
4726
4727 drbd_info(connection, "Connection closed\n");
4728
4729 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4730 conn_try_outdate_peer_async(connection);
4731
4732 spin_lock_irq(&connection->resource->req_lock);
4733 oc = connection->cstate;
4734 if (oc >= C_UNCONNECTED)
4735 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4736
4737 spin_unlock_irq(&connection->resource->req_lock);
4738
4739 if (oc == C_DISCONNECTING)
4740 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4741 }
4742
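/* Per-volume cleanup after the connection was lost: wait for in-flight
 * peer requests, cancel resync bookkeeping, flush the sender work queue,
 * forget the peer's UUIDs and clear the transfer log unless I/O is
 * suspended. */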
4743 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4744 {
4745 struct drbd_device *device = peer_device->device;
4746 unsigned int i;
4747
4748 /* wait for current activity to cease. */
4749 spin_lock_irq(&device->resource->req_lock);
4750 _drbd_wait_ee_list_empty(device, &device->active_ee);
4751 _drbd_wait_ee_list_empty(device, &device->sync_ee);
4752 _drbd_wait_ee_list_empty(device, &device->read_ee);
4753 spin_unlock_irq(&device->resource->req_lock);
4754
4755 /* We do not have data structures that would allow us to
4756 * get the rs_pending_cnt down to 0 again.
4757 * * On C_SYNC_TARGET we do not have any data structures describing
4758 * the pending RSDataRequest's we have sent.
4759 * * On C_SYNC_SOURCE there is no data structure that tracks
4760 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4761 * And no, it is not the sum of the reference counts in the
4762 * resync_LRU. The resync_LRU tracks the whole operation including
4763 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4764 * on the fly. */
4765 drbd_rs_cancel_all(device);
4766 device->rs_total = 0;
4767 device->rs_failed = 0;
4768 atomic_set(&device->rs_pending_cnt, 0);
4769 wake_up(&device->misc_wait);
4770
4771 del_timer_sync(&device->resync_timer);
4772 resync_timer_fn((unsigned long)device);
4773
4774 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4775 * w_make_resync_request etc. which may still be on the worker queue
4776 * to be "canceled" */
4777 drbd_flush_workqueue(&peer_device->connection->sender_work);
4778
4779 drbd_finish_peer_reqs(device);
4780
4781 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4782 	   might have queued work again. The one before drbd_finish_peer_reqs() is
4783 	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4784 drbd_flush_workqueue(&peer_device->connection->sender_work);
4785
4786 /* need to do it again, drbd_finish_peer_reqs() may have populated it
4787 * again via drbd_try_clear_on_disk_bm(). */
4788 drbd_rs_cancel_all(device);
4789
4790 kfree(device->p_uuid);
4791 device->p_uuid = NULL;
4792
4793 if (!drbd_suspended(device))
4794 tl_clear(peer_device->connection);
4795
4796 drbd_md_sync(device);
4797
4798 /* serialize with bitmap writeout triggered by the state change,
4799 * if any. */
4800 wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4801
4802 /* tcp_close and release of sendpage pages can be deferred. I don't
4803 * want to use SO_LINGER, because apparently it can be deferred for
4804 * more than 20 seconds (longest time I checked).
4805 *
4806 	 * Actually we don't care exactly when the network stack does its
4807 * put_page(), but release our reference on these pages right here.
4808 */
4809 i = drbd_free_peer_reqs(device, &device->net_ee);
4810 if (i)
4811 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4812 i = atomic_read(&device->pp_in_use_by_net);
4813 if (i)
4814 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4815 i = atomic_read(&device->pp_in_use);
4816 if (i)
4817 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4818
4819 D_ASSERT(device, list_empty(&device->read_ee));
4820 D_ASSERT(device, list_empty(&device->active_ee));
4821 D_ASSERT(device, list_empty(&device->sync_ee));
4822 D_ASSERT(device, list_empty(&device->done_ee));
4823
4824 return 0;
4825 }
4826
4827 /*
4828 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4829 * we can agree on is stored in agreed_pro_version.
4830 *
4831  * The feature flags and the reserved array should leave enough room for future
4832  * enhancements of the handshake protocol, and possible plugins...
4833  *
4834  * For now, they are expected to be zero, but are ignored in any case.
4835 */
4836 static int drbd_send_features(struct drbd_connection *connection)
4837 {
4838 struct drbd_socket *sock;
4839 struct p_connection_features *p;
4840
4841 sock = &connection->data;
4842 p = conn_prepare_command(connection, sock);
4843 if (!p)
4844 return -EIO;
4845 memset(p, 0, sizeof(*p));
4846 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4847 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4848 p->feature_flags = cpu_to_be32(PRO_FEATURES);
4849 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4850 }
4851
4852 /*
4853 * return values:
4854 * 1 yes, we have a valid connection
4855 * 0 oops, did not work out, please try again
4856 * -1 peer talks different language,
4857 * no point in trying again, please go standalone.
4858 */
4859 static int drbd_do_features(struct drbd_connection *connection)
4860 {
4861 /* ASSERT current == connection->receiver ... */
4862 struct p_connection_features *p;
4863 const int expect = sizeof(struct p_connection_features);
4864 struct packet_info pi;
4865 int err;
4866
4867 err = drbd_send_features(connection);
4868 if (err)
4869 return 0;
4870
4871 err = drbd_recv_header(connection, &pi);
4872 if (err)
4873 return 0;
4874
4875 if (pi.cmd != P_CONNECTION_FEATURES) {
4876 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4877 cmdname(pi.cmd), pi.cmd);
4878 return -1;
4879 }
4880
4881 if (pi.size != expect) {
4882 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4883 expect, pi.size);
4884 return -1;
4885 }
4886
4887 p = pi.data;
4888 err = drbd_recv_all_warn(connection, p, expect);
4889 if (err)
4890 return 0;
4891
4892 p->protocol_min = be32_to_cpu(p->protocol_min);
4893 p->protocol_max = be32_to_cpu(p->protocol_max);
4894 if (p->protocol_max == 0)
4895 p->protocol_max = p->protocol_min;
4896
4897 if (PRO_VERSION_MAX < p->protocol_min ||
4898 PRO_VERSION_MIN > p->protocol_max)
4899 goto incompat;
4900
4901 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4902 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4903
4904 drbd_info(connection, "Handshake successful: "
4905 "Agreed network protocol version %d\n", connection->agreed_pro_version);
4906
4907 drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4908 connection->agreed_features & FF_TRIM ? " " : " not ");
4909
4910 return 1;
4911
4912 incompat:
4913 drbd_err(connection, "incompatible DRBD dialects: "
4914 "I support %d-%d, peer supports %d-%d\n",
4915 PRO_VERSION_MIN, PRO_VERSION_MAX,
4916 p->protocol_min, p->protocol_max);
4917 return -1;
4918 }
4919
4920 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4921 static int drbd_do_auth(struct drbd_connection *connection)
4922 {
4923 	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4924 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4925 return -1;
4926 }
4927 #else
4928 #define CHALLENGE_LEN 64
4929
4930 /* Return value:
4931 1 - auth succeeded,
4932 0 - failed, try again (network error),
4933 -1 - auth failed, don't try again.
4934 */
4935
4936 static int drbd_do_auth(struct drbd_connection *connection)
4937 {
4938 struct drbd_socket *sock;
4939 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4940 char *response = NULL;
4941 char *right_response = NULL;
4942 char *peers_ch = NULL;
4943 unsigned int key_len;
4944 char secret[SHARED_SECRET_MAX]; /* 64 byte */
4945 unsigned int resp_size;
4946 SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
4947 struct packet_info pi;
4948 struct net_conf *nc;
4949 int err, rv;
4950
4951 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
4952
4953 rcu_read_lock();
4954 nc = rcu_dereference(connection->net_conf);
4955 key_len = strlen(nc->shared_secret);
4956 memcpy(secret, nc->shared_secret, key_len);
4957 rcu_read_unlock();
4958
4959 desc->tfm = connection->cram_hmac_tfm;
4960 desc->flags = 0;
4961
4962 rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4963 if (rv) {
4964 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
4965 rv = -1;
4966 goto fail;
4967 }
4968
4969 get_random_bytes(my_challenge, CHALLENGE_LEN);
4970
4971 sock = &connection->data;
4972 if (!conn_prepare_command(connection, sock)) {
4973 rv = 0;
4974 goto fail;
4975 }
4976 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4977 my_challenge, CHALLENGE_LEN);
4978 if (!rv)
4979 goto fail;
4980
4981 err = drbd_recv_header(connection, &pi);
4982 if (err) {
4983 rv = 0;
4984 goto fail;
4985 }
4986
4987 if (pi.cmd != P_AUTH_CHALLENGE) {
4988 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4989 cmdname(pi.cmd), pi.cmd);
4990 rv = 0;
4991 goto fail;
4992 }
4993
4994 if (pi.size > CHALLENGE_LEN * 2) {
4995 drbd_err(connection, "expected AuthChallenge payload too big.\n");
4996 rv = -1;
4997 goto fail;
4998 }
4999
5000 if (pi.size < CHALLENGE_LEN) {
5001 drbd_err(connection, "AuthChallenge payload too small.\n");
5002 rv = -1;
5003 goto fail;
5004 }
5005
5006 peers_ch = kmalloc(pi.size, GFP_NOIO);
5007 if (peers_ch == NULL) {
5008 drbd_err(connection, "kmalloc of peers_ch failed\n");
5009 rv = -1;
5010 goto fail;
5011 }
5012
5013 err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5014 if (err) {
5015 rv = 0;
5016 goto fail;
5017 }
5018
5019 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5020 drbd_err(connection, "Peer presented the same challenge!\n");
5021 rv = -1;
5022 goto fail;
5023 }
5024
5025 resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5026 response = kmalloc(resp_size, GFP_NOIO);
5027 if (response == NULL) {
5028 drbd_err(connection, "kmalloc of response failed\n");
5029 rv = -1;
5030 goto fail;
5031 }
5032
5033 rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5034 if (rv) {
5035 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5036 rv = -1;
5037 goto fail;
5038 }
5039
5040 if (!conn_prepare_command(connection, sock)) {
5041 rv = 0;
5042 goto fail;
5043 }
5044 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5045 response, resp_size);
5046 if (!rv)
5047 goto fail;
5048
5049 err = drbd_recv_header(connection, &pi);
5050 if (err) {
5051 rv = 0;
5052 goto fail;
5053 }
5054
5055 if (pi.cmd != P_AUTH_RESPONSE) {
5056 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5057 cmdname(pi.cmd), pi.cmd);
5058 rv = 0;
5059 goto fail;
5060 }
5061
5062 if (pi.size != resp_size) {
5063 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5064 rv = 0;
5065 goto fail;
5066 }
5067
5068 	err = drbd_recv_all_warn(connection, response, resp_size);
5069 if (err) {
5070 rv = 0;
5071 goto fail;
5072 }
5073
5074 right_response = kmalloc(resp_size, GFP_NOIO);
5075 if (right_response == NULL) {
5076 drbd_err(connection, "kmalloc of right_response failed\n");
5077 rv = -1;
5078 goto fail;
5079 }
5080
5081 rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5082 right_response);
5083 if (rv) {
5084 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5085 rv = -1;
5086 goto fail;
5087 }
5088
5089 rv = !memcmp(response, right_response, resp_size);
5090
5091 if (rv)
5092 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5093 resp_size);
5094 else
5095 rv = -1;
5096
5097 fail:
5098 kfree(peers_ch);
5099 kfree(response);
5100 kfree(right_response);
5101 shash_desc_zero(desc);
5102
5103 return rv;
5104 }
5105 #endif
5106
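/* Entry point of the receiver thread: (re)establish the connection,
 * run the drbdd() dispatch loop until it fails or the thread is asked
 * to stop, then tear the connection down again. */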
5107 int drbd_receiver(struct drbd_thread *thi)
5108 {
5109 struct drbd_connection *connection = thi->connection;
5110 int h;
5111
5112 drbd_info(connection, "receiver (re)started\n");
5113
5114 do {
5115 h = conn_connect(connection);
5116 if (h == 0) {
5117 conn_disconnect(connection);
5118 schedule_timeout_interruptible(HZ);
5119 }
5120 if (h == -1) {
5121 drbd_warn(connection, "Discarding network configuration.\n");
5122 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5123 }
5124 } while (h == 0);
5125
5126 if (h > 0)
5127 drbdd(connection);
5128
5129 conn_disconnect(connection);
5130
5131 drbd_info(connection, "receiver terminated\n");
5132 return 0;
5133 }
5134
5135 /* ********* acknowledge sender ******** */
5136
5137 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5138 {
5139 struct p_req_state_reply *p = pi->data;
5140 int retcode = be32_to_cpu(p->retcode);
5141
5142 if (retcode >= SS_SUCCESS) {
5143 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5144 } else {
5145 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5146 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5147 drbd_set_st_err_str(retcode), retcode);
5148 }
5149 wake_up(&connection->ping_wait);
5150
5151 return 0;
5152 }
5153
5154 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5155 {
5156 struct drbd_peer_device *peer_device;
5157 struct drbd_device *device;
5158 struct p_req_state_reply *p = pi->data;
5159 int retcode = be32_to_cpu(p->retcode);
5160
5161 peer_device = conn_peer_device(connection, pi->vnr);
5162 if (!peer_device)
5163 return -EIO;
5164 device = peer_device->device;
5165
5166 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5167 D_ASSERT(device, connection->agreed_pro_version < 100);
5168 return got_conn_RqSReply(connection, pi);
5169 }
5170
5171 if (retcode >= SS_SUCCESS) {
5172 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5173 } else {
5174 set_bit(CL_ST_CHG_FAIL, &device->flags);
5175 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5176 drbd_set_st_err_str(retcode), retcode);
5177 }
5178 wake_up(&device->state_wait);
5179
5180 return 0;
5181 }
5182
5183 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5184 {
5185 return drbd_send_ping_ack(connection);
5186
5187 }
5188
5189 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5190 {
5191 /* restore idle timeout */
5192 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5193 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5194 wake_up(&connection->ping_wait);
5195
5196 return 0;
5197 }
5198
5199 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5200 {
5201 struct drbd_peer_device *peer_device;
5202 struct drbd_device *device;
5203 struct p_block_ack *p = pi->data;
5204 sector_t sector = be64_to_cpu(p->sector);
5205 int blksize = be32_to_cpu(p->blksize);
5206
5207 peer_device = conn_peer_device(connection, pi->vnr);
5208 if (!peer_device)
5209 return -EIO;
5210 device = peer_device->device;
5211
5212 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5213
5214 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5215
5216 if (get_ldev(device)) {
5217 drbd_rs_complete_io(device, sector);
5218 drbd_set_in_sync(device, sector, blksize);
5219 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5220 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5221 put_ldev(device);
5222 }
5223 dec_rs_pending(device);
5224 atomic_add(blksize >> 9, &device->rs_sect_in);
5225
5226 return 0;
5227 }
5228
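/* Look up the request a peer ack refers to (by block_id and sector) in
 * the given tree and apply the request event 'what' to it under the
 * req_lock; complete the master bio if this ack was the missing piece. */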
5229 static int
5230 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5231 struct rb_root *root, const char *func,
5232 enum drbd_req_event what, bool missing_ok)
5233 {
5234 struct drbd_request *req;
5235 struct bio_and_error m;
5236
5237 spin_lock_irq(&device->resource->req_lock);
5238 req = find_request(device, root, id, sector, missing_ok, func);
5239 if (unlikely(!req)) {
5240 spin_unlock_irq(&device->resource->req_lock);
5241 return -EIO;
5242 }
5243 __req_mod(req, what, &m);
5244 spin_unlock_irq(&device->resource->req_lock);
5245
5246 if (m.bio)
5247 complete_master_bio(device, &m);
5248 return 0;
5249 }
5250
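/* P_WRITE_ACK, P_RECV_ACK and friends: resync acks (ID_SYNCER) just mark
 * the area in sync; everything else is mapped to the matching request
 * event and applied to the corresponding write request. */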
5251 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5252 {
5253 struct drbd_peer_device *peer_device;
5254 struct drbd_device *device;
5255 struct p_block_ack *p = pi->data;
5256 sector_t sector = be64_to_cpu(p->sector);
5257 int blksize = be32_to_cpu(p->blksize);
5258 enum drbd_req_event what;
5259
5260 peer_device = conn_peer_device(connection, pi->vnr);
5261 if (!peer_device)
5262 return -EIO;
5263 device = peer_device->device;
5264
5265 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5266
5267 if (p->block_id == ID_SYNCER) {
5268 drbd_set_in_sync(device, sector, blksize);
5269 dec_rs_pending(device);
5270 return 0;
5271 }
5272 switch (pi->cmd) {
5273 case P_RS_WRITE_ACK:
5274 what = WRITE_ACKED_BY_PEER_AND_SIS;
5275 break;
5276 case P_WRITE_ACK:
5277 what = WRITE_ACKED_BY_PEER;
5278 break;
5279 case P_RECV_ACK:
5280 what = RECV_ACKED_BY_PEER;
5281 break;
5282 case P_SUPERSEDED:
5283 what = CONFLICT_RESOLVED;
5284 break;
5285 case P_RETRY_WRITE:
5286 what = POSTPONE_WRITE;
5287 break;
5288 default:
5289 BUG();
5290 }
5291
5292 return validate_req_change_req_state(device, p->block_id, sector,
5293 &device->write_requests, __func__,
5294 what, false);
5295 }
5296
5297 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5298 {
5299 struct drbd_peer_device *peer_device;
5300 struct drbd_device *device;
5301 struct p_block_ack *p = pi->data;
5302 sector_t sector = be64_to_cpu(p->sector);
5303 int size = be32_to_cpu(p->blksize);
5304 int err;
5305
5306 peer_device = conn_peer_device(connection, pi->vnr);
5307 if (!peer_device)
5308 return -EIO;
5309 device = peer_device->device;
5310
5311 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5312
5313 if (p->block_id == ID_SYNCER) {
5314 dec_rs_pending(device);
5315 drbd_rs_failed_io(device, sector, size);
5316 return 0;
5317 }
5318
5319 err = validate_req_change_req_state(device, p->block_id, sector,
5320 &device->write_requests, __func__,
5321 NEG_ACKED, true);
5322 if (err) {
5323 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5324 The master bio might already be completed, therefore the
5325 request is no longer in the collision hash. */
5326 /* In Protocol B we might already have got a P_RECV_ACK
5327 but then get a P_NEG_ACK afterwards. */
5328 drbd_set_out_of_sync(device, sector, size);
5329 }
5330 return 0;
5331 }
5332
5333 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5334 {
5335 struct drbd_peer_device *peer_device;
5336 struct drbd_device *device;
5337 struct p_block_ack *p = pi->data;
5338 sector_t sector = be64_to_cpu(p->sector);
5339
5340 peer_device = conn_peer_device(connection, pi->vnr);
5341 if (!peer_device)
5342 return -EIO;
5343 device = peer_device->device;
5344
5345 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5346
5347 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5348 (unsigned long long)sector, be32_to_cpu(p->blksize));
5349
5350 return validate_req_change_req_state(device, p->block_id, sector,
5351 &device->read_requests, __func__,
5352 NEG_ACKED, false);
5353 }
5354
5355 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5356 {
5357 struct drbd_peer_device *peer_device;
5358 struct drbd_device *device;
5359 sector_t sector;
5360 int size;
5361 struct p_block_ack *p = pi->data;
5362
5363 peer_device = conn_peer_device(connection, pi->vnr);
5364 if (!peer_device)
5365 return -EIO;
5366 device = peer_device->device;
5367
5368 sector = be64_to_cpu(p->sector);
5369 size = be32_to_cpu(p->blksize);
5370
5371 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5372
5373 dec_rs_pending(device);
5374
5375 if (get_ldev_if_state(device, D_FAILED)) {
5376 drbd_rs_complete_io(device, sector);
5377 switch (pi->cmd) {
5378 case P_NEG_RS_DREPLY:
5379 			drbd_rs_failed_io(device, sector, size); /* fall through */
5380 case P_RS_CANCEL:
5381 break;
5382 default:
5383 BUG();
5384 }
5385 put_ldev(device);
5386 }
5387
5388 return 0;
5389 }
5390
5391 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5392 {
5393 struct p_barrier_ack *p = pi->data;
5394 struct drbd_peer_device *peer_device;
5395 int vnr;
5396
5397 tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5398
5399 rcu_read_lock();
5400 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5401 struct drbd_device *device = peer_device->device;
5402
5403 if (device->state.conn == C_AHEAD &&
5404 atomic_read(&device->ap_in_flight) == 0 &&
5405 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5406 device->start_resync_timer.expires = jiffies + HZ;
5407 add_timer(&device->start_resync_timer);
5408 }
5409 }
5410 rcu_read_unlock();
5411
5412 return 0;
5413 }
5414
5415 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5416 {
5417 struct drbd_peer_device *peer_device;
5418 struct drbd_device *device;
5419 struct p_block_ack *p = pi->data;
5420 struct drbd_device_work *dw;
5421 sector_t sector;
5422 int size;
5423
5424 peer_device = conn_peer_device(connection, pi->vnr);
5425 if (!peer_device)
5426 return -EIO;
5427 device = peer_device->device;
5428
5429 sector = be64_to_cpu(p->sector);
5430 size = be32_to_cpu(p->blksize);
5431
5432 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5433
5434 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5435 drbd_ov_out_of_sync_found(device, sector, size);
5436 else
5437 ov_out_of_sync_print(device);
5438
5439 if (!get_ldev(device))
5440 return 0;
5441
5442 drbd_rs_complete_io(device, sector);
5443 dec_rs_pending(device);
5444
5445 --device->ov_left;
5446
5447 /* let's advance progress step marks only for every other megabyte */
5448 if ((device->ov_left & 0x200) == 0x200)
5449 drbd_advance_rs_marks(device, device->ov_left);
5450
5451 if (device->ov_left == 0) {
5452 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5453 if (dw) {
5454 dw->w.cb = w_ov_finished;
5455 dw->device = device;
5456 drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5457 } else {
5458 			drbd_err(device, "kmalloc(dw) failed.\n");
5459 ov_out_of_sync_print(device);
5460 drbd_resync_finished(device);
5461 }
5462 }
5463 put_ldev(device);
5464 return 0;
5465 }
5466
5467 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5468 {
5469 return 0;
5470 }
5471
5472 struct meta_sock_cmd {
5473 size_t pkt_size;
5474 int (*fn)(struct drbd_connection *connection, struct packet_info *);
5475 };
5476
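/*
 * Adjust the receive timeout of the meta socket.  nc->ping_int is
 * configured in seconds, nc->ping_timeo in tenths of a second, hence the
 * extra /10 after scaling to jiffies.
 */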
5477 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5478 {
5479 long t;
5480 struct net_conf *nc;
5481
5482 rcu_read_lock();
5483 nc = rcu_dereference(connection->net_conf);
5484 t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5485 rcu_read_unlock();
5486
5487 t *= HZ;
5488 if (ping_timeout)
5489 t /= 10;
5490
5491 connection->meta.socket->sk->sk_rcvtimeo = t;
5492 }
5493
5494 static void set_ping_timeout(struct drbd_connection *connection)
5495 {
5496 set_rcvtimeo(connection, 1);
5497 }
5498
5499 static void set_idle_timeout(struct drbd_connection *connection)
5500 {
5501 set_rcvtimeo(connection, 0);
5502 }
5503
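/*
 * Dispatch table for the meta (ack) socket: drbd_ack_receiver() below
 * indexes it by the decoded packet command; ->pkt_size is the payload
 * expected after the header, and commands outside the table or without a
 * handler are treated as unexpected meta packets.  A P_WRITE_ACK, for
 * example, ends up in got_BlockAck(connection, &pi).
 */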
5504 static struct meta_sock_cmd ack_receiver_tbl[] = {
5505 [P_PING] = { 0, got_Ping },
5506 [P_PING_ACK] = { 0, got_PingAck },
5507 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5508 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5509 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5510 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
5511 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5512 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5513 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5514 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5515 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5516 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5517 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5518 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5519 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5520 	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5521 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
5522 };
5523
5524 int drbd_ack_receiver(struct drbd_thread *thi)
5525 {
5526 struct drbd_connection *connection = thi->connection;
5527 struct meta_sock_cmd *cmd = NULL;
5528 struct packet_info pi;
5529 unsigned long pre_recv_jif;
5530 int rv;
5531 void *buf = connection->meta.rbuf;
5532 int received = 0;
5533 unsigned int header_size = drbd_header_size(connection);
5534 int expect = header_size;
5535 bool ping_timeout_active = false;
5536 struct sched_param param = { .sched_priority = 2 };
5537
5538 rv = sched_setscheduler(current, SCHED_RR, &param);
5539 if (rv < 0)
5540 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5541
5542 while (get_t_state(thi) == RUNNING) {
5543 drbd_thread_current_set_cpu(thi);
5544
5545 conn_reclaim_net_peer_reqs(connection);
5546
5547 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5548 if (drbd_send_ping(connection)) {
5549 drbd_err(connection, "drbd_send_ping has failed\n");
5550 goto reconnect;
5551 }
5552 set_ping_timeout(connection);
5553 ping_timeout_active = true;
5554 }
5555
5556 pre_recv_jif = jiffies;
5557 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5558
5559 /* Note:
5560 * -EINTR (on meta) we got a signal
5561 * -EAGAIN (on meta) rcvtimeo expired
5562 * -ECONNRESET other side closed the connection
5563 * -ERESTARTSYS (on data) we got a signal
5564 * rv < 0 other than above: unexpected error!
5565 * rv == expected: full header or command
5566 * rv < expected: "woken" by signal during receive
5567 * rv == 0 : "connection shut down by peer"
5568 */
5569 if (likely(rv > 0)) {
5570 received += rv;
5571 buf += rv;
5572 } else if (rv == 0) {
5573 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5574 long t;
5575 rcu_read_lock();
5576 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5577 rcu_read_unlock();
5578
5579 t = wait_event_timeout(connection->ping_wait,
5580 connection->cstate < C_WF_REPORT_PARAMS,
5581 t);
5582 if (t)
5583 break;
5584 }
5585 drbd_err(connection, "meta connection shut down by peer.\n");
5586 goto reconnect;
5587 } else if (rv == -EAGAIN) {
5588 /* If the data socket received something meanwhile,
5589 * that is good enough: peer is still alive. */
5590 if (time_after(connection->last_received, pre_recv_jif))
5591 continue;
5592 if (ping_timeout_active) {
5593 drbd_err(connection, "PingAck did not arrive in time.\n");
5594 goto reconnect;
5595 }
5596 set_bit(SEND_PING, &connection->flags);
5597 continue;
5598 } else if (rv == -EINTR) {
5599 /* maybe drbd_thread_stop(): the while condition will notice.
5600 * maybe woken for send_ping: we'll send a ping above,
5601 * and change the rcvtimeo */
5602 flush_signals(current);
5603 continue;
5604 } else {
5605 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5606 goto reconnect;
5607 }
5608
5609 if (received == expect && cmd == NULL) {
5610 if (decode_header(connection, connection->meta.rbuf, &pi))
5611 goto reconnect;
5612 cmd = &ack_receiver_tbl[pi.cmd];
5613 if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5614 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5615 cmdname(pi.cmd), pi.cmd);
5616 goto disconnect;
5617 }
5618 expect = header_size + cmd->pkt_size;
5619 if (pi.size != expect - header_size) {
5620 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5621 pi.cmd, pi.size);
5622 goto reconnect;
5623 }
5624 }
5625 if (received == expect) {
5626 bool err;
5627
5628 err = cmd->fn(connection, &pi);
5629 if (err) {
5630 drbd_err(connection, "%pf failed\n", cmd->fn);
5631 goto reconnect;
5632 }
5633
5634 connection->last_received = jiffies;
5635
5636 if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5637 set_idle_timeout(connection);
5638 ping_timeout_active = false;
5639 }
5640
5641 buf = connection->meta.rbuf;
5642 received = 0;
5643 expect = header_size;
5644 cmd = NULL;
5645 }
5646 }
5647
5648 if (0) {
5649 reconnect:
5650 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5651 conn_md_sync(connection);
5652 }
5653 if (0) {
5654 disconnect:
5655 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5656 }
5657
5658 drbd_info(connection, "ack_receiver terminated\n");
5659
5660 return 0;
5661 }
5662
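/*
 * Work callback: send all pending acks for the peer requests of one
 * device, optionally corking the meta socket so they go out in one batch.
 */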
5663 void drbd_send_acks_wf(struct work_struct *ws)
5664 {
5665 struct drbd_peer_device *peer_device =
5666 container_of(ws, struct drbd_peer_device, send_acks_work);
5667 struct drbd_connection *connection = peer_device->connection;
5668 struct drbd_device *device = peer_device->device;
5669 struct net_conf *nc;
5670 int tcp_cork, err;
5671
5672 rcu_read_lock();
5673 nc = rcu_dereference(connection->net_conf);
5674 tcp_cork = nc->tcp_cork;
5675 rcu_read_unlock();
5676
5677 if (tcp_cork)
5678 drbd_tcp_cork(connection->meta.socket);
5679
5680 err = drbd_finish_peer_reqs(device);
5681 kref_put(&device->kref, drbd_destroy_device);
5682 /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
5683 struct work_struct send_acks_work alive, which is in the peer_device object */
5684
5685 if (err) {
5686 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5687 return;
5688 }
5689
5690 if (tcp_cork)
5691 drbd_tcp_uncork(connection->meta.socket);
5692
5693 return;
5694 }