drbd: detach from frozen backing device
[deliverable/linux.git] / drivers / block / drbd / drbd_main.c
1 /*
2 drbd.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11 from Logicworks, Inc. for making SDP replication support possible.
12
13 drbd is free software; you can redistribute it and/or modify
14 it under the terms of the GNU General Public License as published by
15 the Free Software Foundation; either version 2, or (at your option)
16 any later version.
17
18 drbd is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 GNU General Public License for more details.
22
23 You should have received a copy of the GNU General Public License
24 along with drbd; see the file COPYING. If not, write to
25 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27 */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73 "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78 __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not-module),
85 * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details; /* Detail level in proc drbd*/
111
112 /* Module parameter for setting the user mode helper program
113 * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119 * as member "struct gendisk *vdisk;"
120 */
121 struct idr minors;
122 struct list_head drbd_tconns; /* list of struct drbd_tconn */
123
124 struct kmem_cache *drbd_request_cache;
125 struct kmem_cache *drbd_ee_cache; /* peer requests */
126 struct kmem_cache *drbd_bm_ext_cache; /* bitmap extents */
127 struct kmem_cache *drbd_al_ext_cache; /* activity log extents */
128 mempool_t *drbd_request_mempool;
129 mempool_t *drbd_ee_mempool;
130 mempool_t *drbd_md_io_page_pool;
131 struct bio_set *drbd_md_io_bio_set;
132
133 /* I do not use a standard mempool, because:
134 1) I want to hand out the pre-allocated objects first.
135 2) I want to be able to interrupt sleeping allocation with a signal.
136 Note: This is a singly linked list; the next pointer is the private
137 member of struct page.
138 */
139 struct page *drbd_pp_pool;
140 spinlock_t drbd_pp_lock;
141 int drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
143
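/*
 * Illustrative sketch only (hypothetical helpers, not used anywhere in
 * DRBD): pushing to and popping from a singly linked page chain that is
 * threaded through the ->private member, as described above.  Compare
 * the page_chain_for_each()/page_chain_next() helpers used further down
 * in this file.
 */
static inline void example_pp_push(struct page **pool, struct page *page)
{
	set_page_private(page, (unsigned long)*pool);	/* ->private is "next" */
	*pool = page;
}

static inline struct page *example_pp_pop(struct page **pool)
{
	struct page *page = *pool;

	if (page)
		*pool = (struct page *)page_private(page);
	return page;
}
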
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147 .owner = THIS_MODULE,
148 .open = drbd_open,
149 .release = drbd_release,
150 };
151
152 static void bio_destructor_drbd(struct bio *bio)
153 {
154 bio_free(bio, drbd_md_io_bio_set);
155 }
156
157 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
158 {
159 struct bio *bio;
160
161 if (!drbd_md_io_bio_set)
162 return bio_alloc(gfp_mask, 1);
163
164 bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
165 if (!bio)
166 return NULL;
167 bio->bi_destructor = bio_destructor_drbd;
168 return bio;
169 }
170
171 #ifdef __CHECKER__
172 /* When checking with sparse, and this is an inline function, sparse will
173 give tons of false positives. When this is a real function, sparse works.
174 */
175 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
176 {
177 int io_allowed;
178
179 atomic_inc(&mdev->local_cnt);
180 io_allowed = (mdev->state.disk >= mins);
181 if (!io_allowed) {
182 if (atomic_dec_and_test(&mdev->local_cnt))
183 wake_up(&mdev->misc_wait);
184 }
185 return io_allowed;
186 }
187
188 #endif
189
190 /**
191 * DOC: The transfer log
192 *
193 * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
194 * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195 * of the list. There is always at least one &struct drbd_tl_epoch object.
196 *
197 * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
198 * attached.
199 */
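
/*
 * Illustrative sketch only (not called anywhere): walking the transfer
 * log as laid out above, oldest epoch first via the singly linked ->next
 * pointers; each epoch carries its requests on a circular, doubly linked
 * list threaded through drbd_request.tl_requests.
 */
static inline void example_walk_transfer_log(struct drbd_tconn *tconn)
{
	struct drbd_tl_epoch *b;
	struct drbd_request *req;

	for (b = tconn->oldest_tle; b; b = b->next)
		list_for_each_entry(req, &b->requests, tl_requests) {
			/* inspect req here */
		}
}
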
200 static int tl_init(struct drbd_tconn *tconn)
201 {
202 struct drbd_tl_epoch *b;
203
204 /* during device minor initialization, we may well use GFP_KERNEL */
205 b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
206 if (!b)
207 return 0;
208 INIT_LIST_HEAD(&b->requests);
209 INIT_LIST_HEAD(&b->w.list);
210 b->next = NULL;
211 b->br_number = 4711;
212 b->n_writes = 0;
213 b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
214
215 tconn->oldest_tle = b;
216 tconn->newest_tle = b;
217 INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218 INIT_LIST_HEAD(&tconn->barrier_acked_requests);
219
220 return 1;
221 }
222
223 static void tl_cleanup(struct drbd_tconn *tconn)
224 {
225 if (tconn->oldest_tle != tconn->newest_tle)
226 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227 if (!list_empty(&tconn->out_of_sequence_requests))
228 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229 kfree(tconn->oldest_tle);
230 tconn->oldest_tle = NULL;
231 kfree(tconn->unused_spare_tle);
232 tconn->unused_spare_tle = NULL;
233 }
234
235 /**
236 * _tl_add_barrier() - Adds a barrier to the transfer log
237 * @tconn: DRBD connection.
238 * @new: Barrier to be added before the current head of the TL.
239 *
240 * The caller must hold the req_lock.
241 */
242 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
243 {
244 struct drbd_tl_epoch *newest_before;
245
246 INIT_LIST_HEAD(&new->requests);
247 INIT_LIST_HEAD(&new->w.list);
248 new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
249 new->next = NULL;
250 new->n_writes = 0;
251
252 newest_before = tconn->newest_tle;
253 /* never send a barrier number == 0, because that is special-cased
254 * when using TCQ for our write ordering code */
255 new->br_number = (newest_before->br_number+1) ?: 1;
256 if (tconn->newest_tle != new) {
257 tconn->newest_tle->next = new;
258 tconn->newest_tle = new;
259 }
260 }
261
262 /**
263 * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
264 * @tconn: DRBD connection.
265 * @barrier_nr: Expected identifier of the DRBD write barrier packet.
266 * @set_size: Expected number of requests before that barrier.
267 *
268 * In case the passed barrier_nr or set_size does not match the oldest
269 * &struct drbd_tl_epoch object, this function will cause a termination
270 * of the connection.
271 */
272 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
273 unsigned int set_size)
274 {
275 struct drbd_conf *mdev;
276 struct drbd_tl_epoch *b, *nob; /* next old barrier */
277 struct list_head *le, *tle;
278 struct drbd_request *r;
279
280 spin_lock_irq(&tconn->req_lock);
281
282 b = tconn->oldest_tle;
283
284 /* first some paranoia code */
285 if (b == NULL) {
286 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
287 barrier_nr);
288 goto bail;
289 }
290 if (b->br_number != barrier_nr) {
291 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
292 barrier_nr, b->br_number);
293 goto bail;
294 }
295 if (b->n_writes != set_size) {
296 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
297 barrier_nr, set_size, b->n_writes);
298 goto bail;
299 }
300
301 /* Clean up list of requests processed during current epoch */
302 list_for_each_safe(le, tle, &b->requests) {
303 r = list_entry(le, struct drbd_request, tl_requests);
304 _req_mod(r, BARRIER_ACKED);
305 }
306 /* There could be requests on the list waiting for completion
307 of the write to the local disk. To avoid corruption of
308 slab's data structures we have to remove the list's head.
309
310 Also there could have been a barrier ack out of sequence, overtaking
311 the write acks - which would be a bug and violate write ordering.
312 To avoid deadlocking in case we lose the connection while such
313 requests are still pending, we need some way to find them for
314 _req_mod(CONNECTION_LOST_WHILE_PENDING).
315
316 These have been list_move'd to the out_of_sequence_requests list in
317 _req_mod(, BARRIER_ACKED) above.
318 */
319 list_splice_init(&b->requests, &tconn->barrier_acked_requests);
320 mdev = b->w.mdev;
321
322 nob = b->next;
323 if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
324 _tl_add_barrier(tconn, b);
325 if (nob)
326 tconn->oldest_tle = nob;
327 /* if nob == NULL, b was the only barrier and becomes the new
328 barrier. Therefore tconn->oldest_tle already points to b */
329 } else {
330 D_ASSERT(nob != NULL);
331 tconn->oldest_tle = nob;
332 kfree(b);
333 }
334
335 spin_unlock_irq(&tconn->req_lock);
336 dec_ap_pending(mdev);
337
338 return;
339
340 bail:
341 spin_unlock_irq(&tconn->req_lock);
342 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
343 }
344
345
346 /**
347 * _tl_restart() - Walks the transfer log, and applies an action to all requests
348 * @tconn: DRBD connection.
349 * @what: The action/event to perform with all request objects
350 *
351 * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
352 * RESTART_FROZEN_DISK_IO.
353 */
354 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
355 {
356 struct drbd_tl_epoch *b, *tmp, **pn;
357 struct list_head *le, *tle, carry_reads;
358 struct drbd_request *req;
359 int rv, n_writes, n_reads;
360
361 b = tconn->oldest_tle;
362 pn = &tconn->oldest_tle;
363 while (b) {
364 n_writes = 0;
365 n_reads = 0;
366 INIT_LIST_HEAD(&carry_reads);
367 list_for_each_safe(le, tle, &b->requests) {
368 req = list_entry(le, struct drbd_request, tl_requests);
369 rv = _req_mod(req, what);
370
371 n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
372 n_reads += (rv & MR_READ) >> MR_READ_SHIFT;
373 }
374 tmp = b->next;
375
376 if (n_writes) {
377 if (what == RESEND) {
378 b->n_writes = n_writes;
379 if (b->w.cb == NULL) {
380 b->w.cb = w_send_barrier;
381 inc_ap_pending(b->w.mdev);
382 set_bit(CREATE_BARRIER, &b->w.mdev->flags);
383 }
384
385 drbd_queue_work(&tconn->data.work, &b->w);
386 }
387 pn = &b->next;
388 } else {
389 if (n_reads)
390 list_add(&carry_reads, &b->requests);
391 /* there could still be requests on that ring list,
392 * in case local io is still pending */
393 list_del(&b->requests);
394
395 /* dec_ap_pending corresponding to queue_barrier.
396 * the newest barrier may not have been queued yet,
397 * in which case w.cb is still NULL. */
398 if (b->w.cb != NULL)
399 dec_ap_pending(b->w.mdev);
400
401 if (b == tconn->newest_tle) {
402 /* recycle, but reinit! */
403 if (tmp != NULL)
404 conn_err(tconn, "ASSERT FAILED tmp == NULL\n");
405 INIT_LIST_HEAD(&b->requests);
406 list_splice(&carry_reads, &b->requests);
407 INIT_LIST_HEAD(&b->w.list);
408 b->w.cb = NULL;
409 b->br_number = net_random();
410 b->n_writes = 0;
411
412 *pn = b;
413 break;
414 }
415 *pn = tmp;
416 kfree(b);
417 }
418 b = tmp;
419 list_splice(&carry_reads, &b->requests);
420 }
421
422 /* Actions operating on the disk state, also want to work on
423 requests that got barrier acked. */
424 switch (what) {
425 case FAIL_FROZEN_DISK_IO:
426 case RESTART_FROZEN_DISK_IO:
427 list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
428 req = list_entry(le, struct drbd_request, tl_requests);
429 _req_mod(req, what);
430 }
431 case CONNECTION_LOST_WHILE_PENDING:
432 case RESEND:
433 break;
434 default:
435 conn_err(tconn, "what = %d in _tl_restart()\n", what);
436 }
437 }
438
439 /**
440 * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
441 * @tconn: DRBD connection.
442 *
443 * This is called after the connection to the peer was lost. The storage covered
444 * by the requests on the transfer log gets marked as out of sync. Called from the
445 * receiver thread and the worker thread.
446 */
447 void tl_clear(struct drbd_tconn *tconn)
448 {
449 struct drbd_conf *mdev;
450 struct list_head *le, *tle;
451 struct drbd_request *r;
452 int vnr;
453
454 spin_lock_irq(&tconn->req_lock);
455
456 _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
457
458 /* we expect this list to be empty. */
459 if (!list_empty(&tconn->out_of_sequence_requests))
460 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
461
462 /* but just in case, clean it up anyway! */
463 list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
464 r = list_entry(le, struct drbd_request, tl_requests);
465 /* It would be nice to complete outside of spinlock.
466 * But this is easier for now. */
467 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
468 }
469
470 /* ensure bit indicating barrier is required is clear */
471 rcu_read_lock();
472 idr_for_each_entry(&tconn->volumes, mdev, vnr)
473 clear_bit(CREATE_BARRIER, &mdev->flags);
474 rcu_read_unlock();
475
476 spin_unlock_irq(&tconn->req_lock);
477 }
478
479 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
480 {
481 spin_lock_irq(&tconn->req_lock);
482 _tl_restart(tconn, what);
483 spin_unlock_irq(&tconn->req_lock);
484 }
485
486 /**
487 * tl_apply() - Applies an event to all requests for a certain mdev in the TL
488 * @mdev: DRBD device.
489 * @what: The action/event to perform with all request objects
490 *
491 * @what might only be ABORT_DISK_IO.
492 */
493 void tl_apply(struct drbd_conf *mdev, enum drbd_req_event what)
494 {
495 struct drbd_tconn *tconn = mdev->tconn;
496 struct drbd_tl_epoch *b;
497 struct list_head *le, *tle;
498 struct drbd_request *req;
499
500 D_ASSERT(what == ABORT_DISK_IO);
501
502 spin_lock_irq(&tconn->req_lock);
503 b = tconn->oldest_tle;
504 while (b) {
505 list_for_each_safe(le, tle, &b->requests) {
506 req = list_entry(le, struct drbd_request, tl_requests);
507 if (req->w.mdev == mdev)
508 _req_mod(req, what);
509 }
510 b = b->next;
511 }
512
513 list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
514 req = list_entry(le, struct drbd_request, tl_requests);
515 if (req->w.mdev == mdev)
516 _req_mod(req, what);
517 }
518
519 spin_unlock_irq(&tconn->req_lock);
520 }
521
522 static int drbd_thread_setup(void *arg)
523 {
524 struct drbd_thread *thi = (struct drbd_thread *) arg;
525 struct drbd_tconn *tconn = thi->tconn;
526 unsigned long flags;
527 int retval;
528
529 snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
530 thi->name[0], thi->tconn->name);
531
532 restart:
533 retval = thi->function(thi);
534
535 spin_lock_irqsave(&thi->t_lock, flags);
536
537 /* if the receiver has been "EXITING", the last thing it did
538 * was set the conn state to "StandAlone",
539 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
540 * and the receiver thread will be "started".
541 * drbd_thread_start needs to set "RESTARTING" in that case.
542 * t_state check and assignment need to be within the same spinlock,
543 * so either thread_start sees EXITING, and can remap to RESTARTING,
544 * or thread_start sees NONE, and can proceed as normal.
545 */
546
547 if (thi->t_state == RESTARTING) {
548 conn_info(tconn, "Restarting %s thread\n", thi->name);
549 thi->t_state = RUNNING;
550 spin_unlock_irqrestore(&thi->t_lock, flags);
551 goto restart;
552 }
553
554 thi->task = NULL;
555 thi->t_state = NONE;
556 smp_mb();
557 complete_all(&thi->stop);
558 spin_unlock_irqrestore(&thi->t_lock, flags);
559
560 conn_info(tconn, "Terminating %s\n", current->comm);
561
562 /* Release mod reference taken when thread was started */
563
564 kref_put(&tconn->kref, &conn_destroy);
565 module_put(THIS_MODULE);
566 return retval;
567 }
568
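/*
 * Informational summary of the drbd_thread t_state transitions, derived
 * from drbd_thread_setup() above and the start/stop helpers below:
 *
 *   NONE       -> RUNNING     drbd_thread_start()
 *   RUNNING    -> EXITING     _drbd_thread_stop(, restart = 0, )
 *   RUNNING    -> RESTARTING  _drbd_thread_stop(, restart = 1, )
 *   EXITING    -> RESTARTING  drbd_thread_start() on a still exiting thread
 *   RESTARTING -> RUNNING     drbd_thread_setup(), via its restart: label
 *   otherwise  -> NONE        once the thread function finally returns
 */
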
569 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
570 int (*func) (struct drbd_thread *), char *name)
571 {
572 spin_lock_init(&thi->t_lock);
573 thi->task = NULL;
574 thi->t_state = NONE;
575 thi->function = func;
576 thi->tconn = tconn;
577 strncpy(thi->name, name, ARRAY_SIZE(thi->name));
578 }
579
580 int drbd_thread_start(struct drbd_thread *thi)
581 {
582 struct drbd_tconn *tconn = thi->tconn;
583 struct task_struct *nt;
584 unsigned long flags;
585
586 /* is used from state engine doing drbd_thread_stop_nowait,
587 * while holding the req lock irqsave */
588 spin_lock_irqsave(&thi->t_lock, flags);
589
590 switch (thi->t_state) {
591 case NONE:
592 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
593 thi->name, current->comm, current->pid);
594
595 /* Get ref on module for thread - this is released when thread exits */
596 if (!try_module_get(THIS_MODULE)) {
597 conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
598 spin_unlock_irqrestore(&thi->t_lock, flags);
599 return false;
600 }
601
602 kref_get(&thi->tconn->kref);
603
604 init_completion(&thi->stop);
605 thi->reset_cpu_mask = 1;
606 thi->t_state = RUNNING;
607 spin_unlock_irqrestore(&thi->t_lock, flags);
608 flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
609
610 nt = kthread_create(drbd_thread_setup, (void *) thi,
611 "drbd_%c_%s", thi->name[0], thi->tconn->name);
612
613 if (IS_ERR(nt)) {
614 conn_err(tconn, "Couldn't start thread\n");
615
616 kref_put(&tconn->kref, &conn_destroy);
617 module_put(THIS_MODULE);
618 return false;
619 }
620 spin_lock_irqsave(&thi->t_lock, flags);
621 thi->task = nt;
622 thi->t_state = RUNNING;
623 spin_unlock_irqrestore(&thi->t_lock, flags);
624 wake_up_process(nt);
625 break;
626 case EXITING:
627 thi->t_state = RESTARTING;
628 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
629 thi->name, current->comm, current->pid);
630 /* fall through */
631 case RUNNING:
632 case RESTARTING:
633 default:
634 spin_unlock_irqrestore(&thi->t_lock, flags);
635 break;
636 }
637
638 return true;
639 }
640
641
642 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
643 {
644 unsigned long flags;
645
646 enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
647
648 /* may be called from state engine, holding the req lock irqsave */
649 spin_lock_irqsave(&thi->t_lock, flags);
650
651 if (thi->t_state == NONE) {
652 spin_unlock_irqrestore(&thi->t_lock, flags);
653 if (restart)
654 drbd_thread_start(thi);
655 return;
656 }
657
658 if (thi->t_state != ns) {
659 if (thi->task == NULL) {
660 spin_unlock_irqrestore(&thi->t_lock, flags);
661 return;
662 }
663
664 thi->t_state = ns;
665 smp_mb();
666 init_completion(&thi->stop);
667 if (thi->task != current)
668 force_sig(DRBD_SIGKILL, thi->task);
669 }
670
671 spin_unlock_irqrestore(&thi->t_lock, flags);
672
673 if (wait)
674 wait_for_completion(&thi->stop);
675 }
676
677 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
678 {
679 struct drbd_thread *thi =
680 task == tconn->receiver.task ? &tconn->receiver :
681 task == tconn->asender.task ? &tconn->asender :
682 task == tconn->worker.task ? &tconn->worker : NULL;
683
684 return thi;
685 }
686
687 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
688 {
689 struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
690 return thi ? thi->name : task->comm;
691 }
692
693 int conn_lowest_minor(struct drbd_tconn *tconn)
694 {
695 struct drbd_conf *mdev;
696 int vnr = 0, m;
697
698 rcu_read_lock();
699 mdev = idr_get_next(&tconn->volumes, &vnr);
700 m = mdev ? mdev_to_minor(mdev) : -1;
701 rcu_read_unlock();
702
703 return m;
704 }
705
706 #ifdef CONFIG_SMP
707 /**
708 * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
709 * @tconn: DRBD connection.
710 *
711 * Forces all threads of a connection onto the same CPU. This is beneficial for
712 * DRBD's performance. May be overridden by the user's configuration.
713 */
714 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
715 {
716 int ord, cpu;
717
718 /* user override. */
719 if (cpumask_weight(tconn->cpu_mask))
720 return;
721
722 ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
723 for_each_online_cpu(cpu) {
724 if (ord-- == 0) {
725 cpumask_set_cpu(cpu, tconn->cpu_mask);
726 return;
727 }
728 }
729 /* should not be reached */
730 cpumask_setall(tconn->cpu_mask);
731 }
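
/*
 * Worked example for drbd_calc_cpu_mask() above (illustrative numbers):
 * with four CPUs online and no user supplied cpu-mask, the connection
 * whose lowest minor is 0 is pinned to CPU 0, minor 1 to CPU 1, minor 2
 * to CPU 2, minor 3 to CPU 3, and minor 4 wraps around to CPU 0 again
 * (lowest minor modulo the number of online CPUs).
 */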
732
733 /**
734 * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
735 * @mdev: DRBD device.
736 * @thi: drbd_thread object
737 *
738 * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
739 * prematurely.
740 */
741 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
742 {
743 struct task_struct *p = current;
744
745 if (!thi->reset_cpu_mask)
746 return;
747 thi->reset_cpu_mask = 0;
748 set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
749 }
750 #endif
751
752 /**
753 * drbd_header_size - size of a packet header
754 *
755 * The header size is a multiple of 8, so any payload following the header is
756 * word aligned on 64-bit architectures. (The bitmap send and receive code
757 * relies on this.)
758 */
759 unsigned int drbd_header_size(struct drbd_tconn *tconn)
760 {
761 if (tconn->agreed_pro_version >= 100) {
762 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
763 return sizeof(struct p_header100);
764 } else {
765 BUILD_BUG_ON(sizeof(struct p_header80) !=
766 sizeof(struct p_header95));
767 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
768 return sizeof(struct p_header80);
769 }
770 }
771
772 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
773 {
774 h->magic = cpu_to_be32(DRBD_MAGIC);
775 h->command = cpu_to_be16(cmd);
776 h->length = cpu_to_be16(size);
777 return sizeof(struct p_header80);
778 }
779
780 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
781 {
782 h->magic = cpu_to_be16(DRBD_MAGIC_BIG);
783 h->command = cpu_to_be16(cmd);
784 h->length = cpu_to_be32(size);
785 return sizeof(struct p_header95);
786 }
787
788 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
789 int size, int vnr)
790 {
791 h->magic = cpu_to_be32(DRBD_MAGIC_100);
792 h->volume = cpu_to_be16(vnr);
793 h->command = cpu_to_be16(cmd);
794 h->length = cpu_to_be32(size);
795 h->pad = 0;
796 return sizeof(struct p_header100);
797 }
798
799 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
800 void *buffer, enum drbd_packet cmd, int size)
801 {
802 if (tconn->agreed_pro_version >= 100)
803 return prepare_header100(buffer, cmd, size, vnr);
804 else if (tconn->agreed_pro_version >= 95 &&
805 size > DRBD_MAX_SIZE_H80_PACKET)
806 return prepare_header95(buffer, cmd, size);
807 else
808 return prepare_header80(buffer, cmd, size);
809 }
810
811 static void *__conn_prepare_command(struct drbd_tconn *tconn,
812 struct drbd_socket *sock)
813 {
814 if (!sock->socket)
815 return NULL;
816 return sock->sbuf + drbd_header_size(tconn);
817 }
818
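/*
 * Locking convention for the command helpers below: a successful
 * conn_prepare_command()/drbd_prepare_command() takes sock->mutex and
 * returns a pointer just behind the packet header inside the
 * pre-allocated send buffer (on failure the mutex is dropped again and
 * NULL is returned).  The matching conn_send_command() or
 * drbd_send_command() fills in the header, transmits, and releases the
 * mutex.
 */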
819 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
820 {
821 void *p;
822
823 mutex_lock(&sock->mutex);
824 p = __conn_prepare_command(tconn, sock);
825 if (!p)
826 mutex_unlock(&sock->mutex);
827
828 return p;
829 }
830
831 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
832 {
833 return conn_prepare_command(mdev->tconn, sock);
834 }
835
836 static int __send_command(struct drbd_tconn *tconn, int vnr,
837 struct drbd_socket *sock, enum drbd_packet cmd,
838 unsigned int header_size, void *data,
839 unsigned int size)
840 {
841 int msg_flags;
842 int err;
843
844 /*
845 * Called with @data == NULL and the size of the data blocks in @size
846 * for commands that send data blocks. For those commands, omit the
847 * MSG_MORE flag: this will increase the likelihood that data blocks
848 * which are page aligned on the sender will end up page aligned on the
849 * receiver.
850 */
851 msg_flags = data ? MSG_MORE : 0;
852
853 header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
854 header_size + size);
855 err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
856 msg_flags);
857 if (data && !err)
858 err = drbd_send_all(tconn, sock->socket, data, size, 0);
859 return err;
860 }
861
862 static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
863 enum drbd_packet cmd, unsigned int header_size,
864 void *data, unsigned int size)
865 {
866 return __send_command(tconn, 0, sock, cmd, header_size, data, size);
867 }
868
869 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
870 enum drbd_packet cmd, unsigned int header_size,
871 void *data, unsigned int size)
872 {
873 int err;
874
875 err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
876 mutex_unlock(&sock->mutex);
877 return err;
878 }
879
880 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
881 enum drbd_packet cmd, unsigned int header_size,
882 void *data, unsigned int size)
883 {
884 int err;
885
886 err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
887 data, size);
888 mutex_unlock(&sock->mutex);
889 return err;
890 }
891
892 int drbd_send_ping(struct drbd_tconn *tconn)
893 {
894 struct drbd_socket *sock;
895
896 sock = &tconn->meta;
897 if (!conn_prepare_command(tconn, sock))
898 return -EIO;
899 return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
900 }
901
902 int drbd_send_ping_ack(struct drbd_tconn *tconn)
903 {
904 struct drbd_socket *sock;
905
906 sock = &tconn->meta;
907 if (!conn_prepare_command(tconn, sock))
908 return -EIO;
909 return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
910 }
911
912 int drbd_send_sync_param(struct drbd_conf *mdev)
913 {
914 struct drbd_socket *sock;
915 struct p_rs_param_95 *p;
916 int size;
917 const int apv = mdev->tconn->agreed_pro_version;
918 enum drbd_packet cmd;
919 struct net_conf *nc;
920 struct disk_conf *dc;
921
922 sock = &mdev->tconn->data;
923 p = drbd_prepare_command(mdev, sock);
924 if (!p)
925 return -EIO;
926
927 rcu_read_lock();
928 nc = rcu_dereference(mdev->tconn->net_conf);
929
930 size = apv <= 87 ? sizeof(struct p_rs_param)
931 : apv == 88 ? sizeof(struct p_rs_param)
932 + strlen(nc->verify_alg) + 1
933 : apv <= 94 ? sizeof(struct p_rs_param_89)
934 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
935
936 cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
937
938 /* initialize verify_alg and csums_alg */
939 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
940
941 if (get_ldev(mdev)) {
942 dc = rcu_dereference(mdev->ldev->disk_conf);
943 p->resync_rate = cpu_to_be32(dc->resync_rate);
944 p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
945 p->c_delay_target = cpu_to_be32(dc->c_delay_target);
946 p->c_fill_target = cpu_to_be32(dc->c_fill_target);
947 p->c_max_rate = cpu_to_be32(dc->c_max_rate);
948 put_ldev(mdev);
949 } else {
950 p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
951 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
952 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
953 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
954 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
955 }
956
957 if (apv >= 88)
958 strcpy(p->verify_alg, nc->verify_alg);
959 if (apv >= 89)
960 strcpy(p->csums_alg, nc->csums_alg);
961 rcu_read_unlock();
962
963 return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
964 }
965
966 int __drbd_send_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd)
967 {
968 struct drbd_socket *sock;
969 struct p_protocol *p;
970 struct net_conf *nc;
971 int size, cf;
972
973 sock = &tconn->data;
974 p = __conn_prepare_command(tconn, sock);
975 if (!p)
976 return -EIO;
977
978 rcu_read_lock();
979 nc = rcu_dereference(tconn->net_conf);
980
981 if (nc->tentative && tconn->agreed_pro_version < 92) {
982 rcu_read_unlock();
983 mutex_unlock(&sock->mutex);
984 conn_err(tconn, "--dry-run is not supported by peer");
985 return -EOPNOTSUPP;
986 }
987
988 size = sizeof(*p);
989 if (tconn->agreed_pro_version >= 87)
990 size += strlen(nc->integrity_alg) + 1;
991
992 p->protocol = cpu_to_be32(nc->wire_protocol);
993 p->after_sb_0p = cpu_to_be32(nc->after_sb_0p);
994 p->after_sb_1p = cpu_to_be32(nc->after_sb_1p);
995 p->after_sb_2p = cpu_to_be32(nc->after_sb_2p);
996 p->two_primaries = cpu_to_be32(nc->two_primaries);
997 cf = 0;
998 if (nc->discard_my_data)
999 cf |= CF_DISCARD_MY_DATA;
1000 if (nc->tentative)
1001 cf |= CF_DRY_RUN;
1002 p->conn_flags = cpu_to_be32(cf);
1003
1004 if (tconn->agreed_pro_version >= 87)
1005 strcpy(p->integrity_alg, nc->integrity_alg);
1006 rcu_read_unlock();
1007
1008 return __conn_send_command(tconn, sock, cmd, size, NULL, 0);
1009 }
1010
1011 int drbd_send_protocol(struct drbd_tconn *tconn)
1012 {
1013 int err;
1014
1015 mutex_lock(&tconn->data.mutex);
1016 err = __drbd_send_protocol(tconn, P_PROTOCOL);
1017 mutex_unlock(&tconn->data.mutex);
1018
1019 return err;
1020 }
1021
1022 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1023 {
1024 struct drbd_socket *sock;
1025 struct p_uuids *p;
1026 int i;
1027
1028 if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1029 return 0;
1030
1031 sock = &mdev->tconn->data;
1032 p = drbd_prepare_command(mdev, sock);
1033 if (!p) {
1034 put_ldev(mdev);
1035 return -EIO;
1036 }
1037 for (i = UI_CURRENT; i < UI_SIZE; i++)
1038 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1039
1040 mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1041 p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1042 rcu_read_lock();
1043 uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->discard_my_data ? 1 : 0;
1044 rcu_read_unlock();
1045 uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1046 uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1047 p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1048
1049 put_ldev(mdev);
1050 return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
1051 }
1052
1053 int drbd_send_uuids(struct drbd_conf *mdev)
1054 {
1055 return _drbd_send_uuids(mdev, 0);
1056 }
1057
1058 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1059 {
1060 return _drbd_send_uuids(mdev, 8);
1061 }
1062
1063 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
1064 {
1065 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1066 u64 *uuid = mdev->ldev->md.uuid;
1067 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
1068 text,
1069 (unsigned long long)uuid[UI_CURRENT],
1070 (unsigned long long)uuid[UI_BITMAP],
1071 (unsigned long long)uuid[UI_HISTORY_START],
1072 (unsigned long long)uuid[UI_HISTORY_END]);
1073 put_ldev(mdev);
1074 } else {
1075 dev_info(DEV, "%s effective data uuid: %016llX\n",
1076 text,
1077 (unsigned long long)mdev->ed_uuid);
1078 }
1079 }
1080
1081 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1082 {
1083 struct drbd_socket *sock;
1084 struct p_rs_uuid *p;
1085 u64 uuid;
1086
1087 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1088
1089 uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
1090 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1091 drbd_print_uuids(mdev, "updated sync UUID");
1092 drbd_md_sync(mdev);
1093
1094 sock = &mdev->tconn->data;
1095 p = drbd_prepare_command(mdev, sock);
1096 if (p) {
1097 p->uuid = cpu_to_be64(uuid);
1098 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
1099 }
1100 }
1101
1102 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1103 {
1104 struct drbd_socket *sock;
1105 struct p_sizes *p;
1106 sector_t d_size, u_size;
1107 int q_order_type, max_bio_size;
1108
1109 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1110 D_ASSERT(mdev->ldev->backing_bdev);
1111 d_size = drbd_get_max_capacity(mdev->ldev);
1112 rcu_read_lock();
1113 u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
1114 rcu_read_unlock();
1115 q_order_type = drbd_queue_order_type(mdev);
1116 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1117 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1118 put_ldev(mdev);
1119 } else {
1120 d_size = 0;
1121 u_size = 0;
1122 q_order_type = QUEUE_ORDERED_NONE;
1123 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1124 }
1125
1126 sock = &mdev->tconn->data;
1127 p = drbd_prepare_command(mdev, sock);
1128 if (!p)
1129 return -EIO;
1130
1131 if (mdev->tconn->agreed_pro_version <= 94)
1132 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
1133 else if (mdev->tconn->agreed_pro_version < 100)
1134 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE_P95);
1135
1136 p->d_size = cpu_to_be64(d_size);
1137 p->u_size = cpu_to_be64(u_size);
1138 p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1139 p->max_bio_size = cpu_to_be32(max_bio_size);
1140 p->queue_order_type = cpu_to_be16(q_order_type);
1141 p->dds_flags = cpu_to_be16(flags);
1142 return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1143 }
1144
1145 /**
1146 * drbd_send_state() - Sends the drbd state to the peer
1147 * @mdev: DRBD device.
1148 */
1149 int drbd_send_state(struct drbd_conf *mdev)
1150 {
1151 struct drbd_socket *sock;
1152 struct p_state *p;
1153
1154 sock = &mdev->tconn->data;
1155 p = drbd_prepare_command(mdev, sock);
1156 if (!p)
1157 return -EIO;
1158 p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1159 return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1160 }
1161
1162 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1163 {
1164 struct drbd_socket *sock;
1165 struct p_req_state *p;
1166
1167 sock = &mdev->tconn->data;
1168 p = drbd_prepare_command(mdev, sock);
1169 if (!p)
1170 return -EIO;
1171 p->mask = cpu_to_be32(mask.i);
1172 p->val = cpu_to_be32(val.i);
1173 return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1174
1175 }
1176
1177 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1178 {
1179 enum drbd_packet cmd;
1180 struct drbd_socket *sock;
1181 struct p_req_state *p;
1182
1183 cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1184 sock = &tconn->data;
1185 p = conn_prepare_command(tconn, sock);
1186 if (!p)
1187 return -EIO;
1188 p->mask = cpu_to_be32(mask.i);
1189 p->val = cpu_to_be32(val.i);
1190 return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1191 }
1192
1193 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1194 {
1195 struct drbd_socket *sock;
1196 struct p_req_state_reply *p;
1197
1198 sock = &mdev->tconn->meta;
1199 p = drbd_prepare_command(mdev, sock);
1200 if (p) {
1201 p->retcode = cpu_to_be32(retcode);
1202 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1203 }
1204 }
1205
1206 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1207 {
1208 struct drbd_socket *sock;
1209 struct p_req_state_reply *p;
1210 enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1211
1212 sock = &tconn->meta;
1213 p = conn_prepare_command(tconn, sock);
1214 if (p) {
1215 p->retcode = cpu_to_be32(retcode);
1216 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1217 }
1218 }
1219
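/*
 * Layout of p_compressed_bm->encoding, as maintained by the dcbp_set_*
 * helpers below (derived from the masks they use):
 *   bit  7    - whether the first run described is a run of set bits
 *   bits 6..4 - number of pad bits in the last code byte
 *   bits 3..0 - bitmap encoding code, e.g. RLE_VLI_Bits
 */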
1220 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1221 {
1222 BUG_ON(code & ~0xf);
1223 p->encoding = (p->encoding & ~0xf) | code;
1224 }
1225
1226 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1227 {
1228 p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1229 }
1230
1231 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1232 {
1233 BUG_ON(n & ~0x7);
1234 p->encoding = (p->encoding & ~(0x7 << 4)) | (n << 4);
1235 }
1236
1237 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1238 struct p_compressed_bm *p,
1239 unsigned int size,
1240 struct bm_xfer_ctx *c)
1241 {
1242 struct bitstream bs;
1243 unsigned long plain_bits;
1244 unsigned long tmp;
1245 unsigned long rl;
1246 unsigned len;
1247 unsigned toggle;
1248 int bits, use_rle;
1249
1250 /* may we use this feature? */
1251 rcu_read_lock();
1252 use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
1253 rcu_read_unlock();
1254 if (!use_rle || mdev->tconn->agreed_pro_version < 90)
1255 return 0;
1256
1257 if (c->bit_offset >= c->bm_bits)
1258 return 0; /* nothing to do. */
1259
1260 /* use at most thus many bytes */
1261 bitstream_init(&bs, p->code, size, 0);
1262 memset(p->code, 0, size);
1263 /* plain bits covered in this code string */
1264 plain_bits = 0;
1265
1266 /* p->encoding & 0x80 stores whether the first run length is set.
1267 * bit offset is implicit.
1268 * start with toggle == 2 to be able to tell the first iteration */
1269 toggle = 2;
1270
1271 /* see how much plain bits we can stuff into one packet
1272 * using RLE and VLI. */
1273 do {
1274 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1275 : _drbd_bm_find_next(mdev, c->bit_offset);
1276 if (tmp == -1UL)
1277 tmp = c->bm_bits;
1278 rl = tmp - c->bit_offset;
1279
1280 if (toggle == 2) { /* first iteration */
1281 if (rl == 0) {
1282 /* the first checked bit was set,
1283 * store start value, */
1284 dcbp_set_start(p, 1);
1285 /* but skip encoding of zero run length */
1286 toggle = !toggle;
1287 continue;
1288 }
1289 dcbp_set_start(p, 0);
1290 }
1291
1292 /* paranoia: catch zero runlength.
1293 * can only happen if bitmap is modified while we scan it. */
1294 if (rl == 0) {
1295 dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1296 "t:%u bo:%lu\n", toggle, c->bit_offset);
1297 return -1;
1298 }
1299
1300 bits = vli_encode_bits(&bs, rl);
1301 if (bits == -ENOBUFS) /* buffer full */
1302 break;
1303 if (bits <= 0) {
1304 dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1305 return 0;
1306 }
1307
1308 toggle = !toggle;
1309 plain_bits += rl;
1310 c->bit_offset = tmp;
1311 } while (c->bit_offset < c->bm_bits);
1312
1313 len = bs.cur.b - p->code + !!bs.cur.bit;
1314
1315 if (plain_bits < (len << 3)) {
1316 /* incompressible with this method.
1317 * we need to rewind both word and bit position. */
1318 c->bit_offset -= plain_bits;
1319 bm_xfer_ctx_bit_to_word_offset(c);
1320 c->bit_offset = c->word_offset * BITS_PER_LONG;
1321 return 0;
1322 }
1323
1324 /* RLE + VLI was able to compress it just fine.
1325 * update c->word_offset. */
1326 bm_xfer_ctx_bit_to_word_offset(c);
1327
1328 /* store pad_bits */
1329 dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1330
1331 return len;
1332 }
1333
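/*
 * Worked example for the incompressibility check in
 * fill_bitmap_rle_bits() above (illustrative numbers only): a code
 * string of len = 100 bytes must cover more than len << 3 = 800 plain
 * bitmap bits to be worthwhile.  Covering 4096 plain bits compresses
 * roughly 5:1 and is sent as P_COMPRESSED_BITMAP; covering only 600
 * bits would expand the data, so the function rewinds and returns 0,
 * and the caller falls back to a plain P_BITMAP packet.
 */
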
1334 /**
1335 * send_bitmap_rle_or_plain
1336 *
1337 * Return 0 when done, 1 when another iteration is needed, and a negative error
1338 * code upon failure.
1339 */
1340 static int
1341 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1342 {
1343 struct drbd_socket *sock = &mdev->tconn->data;
1344 unsigned int header_size = drbd_header_size(mdev->tconn);
1345 struct p_compressed_bm *p = sock->sbuf + header_size;
1346 int len, err;
1347
1348 len = fill_bitmap_rle_bits(mdev, p,
1349 DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1350 if (len < 0)
1351 return -EIO;
1352
1353 if (len) {
1354 dcbp_set_code(p, RLE_VLI_Bits);
1355 err = __send_command(mdev->tconn, mdev->vnr, sock,
1356 P_COMPRESSED_BITMAP, sizeof(*p) + len,
1357 NULL, 0);
1358 c->packets[0]++;
1359 c->bytes[0] += header_size + sizeof(*p) + len;
1360
1361 if (c->bit_offset >= c->bm_bits)
1362 len = 0; /* DONE */
1363 } else {
1364 /* was not compressible.
1365 * send a buffer full of plain text bits instead. */
1366 unsigned int data_size;
1367 unsigned long num_words;
1368 unsigned long *p = sock->sbuf + header_size;
1369
1370 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1371 num_words = min_t(size_t, data_size / sizeof(*p),
1372 c->bm_words - c->word_offset);
1373 len = num_words * sizeof(*p);
1374 if (len)
1375 drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1376 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1377 c->word_offset += num_words;
1378 c->bit_offset = c->word_offset * BITS_PER_LONG;
1379
1380 c->packets[1]++;
1381 c->bytes[1] += header_size + len;
1382
1383 if (c->bit_offset > c->bm_bits)
1384 c->bit_offset = c->bm_bits;
1385 }
1386 if (!err) {
1387 if (len == 0) {
1388 INFO_bm_xfer_stats(mdev, "send", c);
1389 return 0;
1390 } else
1391 return 1;
1392 }
1393 return -EIO;
1394 }
1395
1396 /* See the comment at receive_bitmap() */
1397 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1398 {
1399 struct bm_xfer_ctx c;
1400 int err;
1401
1402 if (!expect(mdev->bitmap))
1403 return false;
1404
1405 if (get_ldev(mdev)) {
1406 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1407 dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1408 drbd_bm_set_all(mdev);
1409 if (drbd_bm_write(mdev)) {
1410 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1411 * but otherwise process as per normal - need to tell other
1412 * side that a full resync is required! */
1413 dev_err(DEV, "Failed to write bitmap to disk!\n");
1414 } else {
1415 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1416 drbd_md_sync(mdev);
1417 }
1418 }
1419 put_ldev(mdev);
1420 }
1421
1422 c = (struct bm_xfer_ctx) {
1423 .bm_bits = drbd_bm_bits(mdev),
1424 .bm_words = drbd_bm_words(mdev),
1425 };
1426
1427 do {
1428 err = send_bitmap_rle_or_plain(mdev, &c);
1429 } while (err > 0);
1430
1431 return err == 0;
1432 }
1433
1434 int drbd_send_bitmap(struct drbd_conf *mdev)
1435 {
1436 struct drbd_socket *sock = &mdev->tconn->data;
1437 int err = -1;
1438
1439 mutex_lock(&sock->mutex);
1440 if (sock->socket)
1441 err = !_drbd_send_bitmap(mdev);
1442 mutex_unlock(&sock->mutex);
1443 return err;
1444 }
1445
1446 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1447 {
1448 struct drbd_socket *sock;
1449 struct p_barrier_ack *p;
1450
1451 if (mdev->state.conn < C_CONNECTED)
1452 return;
1453
1454 sock = &mdev->tconn->meta;
1455 p = drbd_prepare_command(mdev, sock);
1456 if (!p)
1457 return;
1458 p->barrier = barrier_nr;
1459 p->set_size = cpu_to_be32(set_size);
1460 drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1461 }
1462
1463 /**
1464 * _drbd_send_ack() - Sends an ack packet
1465 * @mdev: DRBD device.
1466 * @cmd: Packet command code.
1467 * @sector: sector, needs to be in big endian byte order
1468 * @blksize: size in byte, needs to be in big endian byte order
1469 * @block_id: Id, big endian byte order
1470 */
1471 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1472 u64 sector, u32 blksize, u64 block_id)
1473 {
1474 struct drbd_socket *sock;
1475 struct p_block_ack *p;
1476
1477 if (mdev->state.conn < C_CONNECTED)
1478 return -EIO;
1479
1480 sock = &mdev->tconn->meta;
1481 p = drbd_prepare_command(mdev, sock);
1482 if (!p)
1483 return -EIO;
1484 p->sector = sector;
1485 p->block_id = block_id;
1486 p->blksize = blksize;
1487 p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1488 return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1489 }
1490
1491 /* dp->sector and dp->block_id already/still in network byte order,
1492 * data_size is payload size according to dp->head,
1493 * and may need to be corrected for digest size. */
1494 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1495 struct p_data *dp, int data_size)
1496 {
1497 if (mdev->tconn->peer_integrity_tfm)
1498 data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1499 _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1500 dp->block_id);
1501 }
1502
1503 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1504 struct p_block_req *rp)
1505 {
1506 _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1507 }
1508
1509 /**
1510 * drbd_send_ack() - Sends an ack packet
1511 * @mdev: DRBD device
1512 * @cmd: packet command code
1513 * @peer_req: peer request
1514 */
1515 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1516 struct drbd_peer_request *peer_req)
1517 {
1518 return _drbd_send_ack(mdev, cmd,
1519 cpu_to_be64(peer_req->i.sector),
1520 cpu_to_be32(peer_req->i.size),
1521 peer_req->block_id);
1522 }
1523
1524 /* This function misuses the block_id field to signal if the blocks
1525 * are in sync or not. */
1526 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1527 sector_t sector, int blksize, u64 block_id)
1528 {
1529 return _drbd_send_ack(mdev, cmd,
1530 cpu_to_be64(sector),
1531 cpu_to_be32(blksize),
1532 cpu_to_be64(block_id));
1533 }
1534
1535 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1536 sector_t sector, int size, u64 block_id)
1537 {
1538 struct drbd_socket *sock;
1539 struct p_block_req *p;
1540
1541 sock = &mdev->tconn->data;
1542 p = drbd_prepare_command(mdev, sock);
1543 if (!p)
1544 return -EIO;
1545 p->sector = cpu_to_be64(sector);
1546 p->block_id = block_id;
1547 p->blksize = cpu_to_be32(size);
1548 return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1549 }
1550
1551 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1552 void *digest, int digest_size, enum drbd_packet cmd)
1553 {
1554 struct drbd_socket *sock;
1555 struct p_block_req *p;
1556
1557 /* FIXME: Put the digest into the preallocated socket buffer. */
1558
1559 sock = &mdev->tconn->data;
1560 p = drbd_prepare_command(mdev, sock);
1561 if (!p)
1562 return -EIO;
1563 p->sector = cpu_to_be64(sector);
1564 p->block_id = ID_SYNCER /* unused */;
1565 p->blksize = cpu_to_be32(size);
1566 return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1567 digest, digest_size);
1568 }
1569
1570 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1571 {
1572 struct drbd_socket *sock;
1573 struct p_block_req *p;
1574
1575 sock = &mdev->tconn->data;
1576 p = drbd_prepare_command(mdev, sock);
1577 if (!p)
1578 return -EIO;
1579 p->sector = cpu_to_be64(sector);
1580 p->block_id = ID_SYNCER /* unused */;
1581 p->blksize = cpu_to_be32(size);
1582 return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1583 }
1584
1585 /* called on sndtimeo
1586 * returns false if we should retry,
1587 * true if we think connection is dead
1588 */
1589 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1590 {
1591 int drop_it;
1592 /* long elapsed = (long)(jiffies - mdev->last_received); */
1593
1594 drop_it = tconn->meta.socket == sock
1595 || !tconn->asender.task
1596 || get_t_state(&tconn->asender) != RUNNING
1597 || tconn->cstate < C_WF_REPORT_PARAMS;
1598
1599 if (drop_it)
1600 return true;
1601
1602 drop_it = !--tconn->ko_count;
1603 if (!drop_it) {
1604 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1605 current->comm, current->pid, tconn->ko_count);
1606 request_ping(tconn);
1607 }
1608
1609 return drop_it; /* && (mdev->state == R_PRIMARY) */;
1610 }
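
/*
 * Example of the ko_count logic above (illustrative): with ko-count
 * configured as 7, each send timeout on the data socket only requests a
 * ping and retries, until the counter, re-armed to 7 at the start of
 * drbd_send(), has been decremented to zero by the 7th consecutive
 * timeout; only then is the connection considered dead.
 */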
1611
1612 static void drbd_update_congested(struct drbd_tconn *tconn)
1613 {
1614 struct sock *sk = tconn->data.socket->sk;
1615 if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1616 set_bit(NET_CONGESTED, &tconn->flags);
1617 }
1618
1619 /* The idea of sendpage seems to be to put some kind of reference
1620 * to the page into the skb, and to hand it over to the NIC. In
1621 * this process get_page() gets called.
1622 *
1623 * As soon as the page was really sent over the network put_page()
1624 * gets called by some part of the network layer. [ NIC driver? ]
1625 *
1626 * [ get_page() / put_page() increment/decrement the count. If count
1627 * reaches 0 the page will be freed. ]
1628 *
1629 * This works nicely with pages from FSs.
1630 * But this means that in protocol A we might signal IO completion too early!
1631 *
1632 * In order not to corrupt data during a resync we must make sure
1633 * that we do not reuse our own buffer pages (EEs) too early, therefore
1634 * we have the net_ee list.
1635 *
1636 * XFS seems to have problems, still, it submits pages with page_count == 0!
1637 * As a workaround, we disable sendpage on pages
1638 * with page_count == 0 or PageSlab.
1639 */
1640 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1641 int offset, size_t size, unsigned msg_flags)
1642 {
1643 struct socket *socket;
1644 void *addr;
1645 int err;
1646
1647 socket = mdev->tconn->data.socket;
1648 addr = kmap(page) + offset;
1649 err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1650 kunmap(page);
1651 if (!err)
1652 mdev->send_cnt += size >> 9;
1653 return err;
1654 }
1655
1656 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1657 int offset, size_t size, unsigned msg_flags)
1658 {
1659 struct socket *socket = mdev->tconn->data.socket;
1660 mm_segment_t oldfs = get_fs();
1661 int len = size;
1662 int err = -EIO;
1663
1664 /* e.g. XFS meta- & log-data is in slab pages, which have a
1665 * page_count of 0 and/or have PageSlab() set.
1666 * we cannot use send_page for those, as that does get_page();
1667 * put_page(); and would cause either a VM_BUG directly, or
1668 * __page_cache_release a page that would actually still be referenced
1669 * by someone, leading to some obscure delayed Oops somewhere else. */
1670 if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1671 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1672
1673 msg_flags |= MSG_NOSIGNAL;
1674 drbd_update_congested(mdev->tconn);
1675 set_fs(KERNEL_DS);
1676 do {
1677 int sent;
1678
1679 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1680 if (sent <= 0) {
1681 if (sent == -EAGAIN) {
1682 if (we_should_drop_the_connection(mdev->tconn, socket))
1683 break;
1684 continue;
1685 }
1686 dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1687 __func__, (int)size, len, sent);
1688 if (sent < 0)
1689 err = sent;
1690 break;
1691 }
1692 len -= sent;
1693 offset += sent;
1694 } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1695 set_fs(oldfs);
1696 clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1697
1698 if (len == 0) {
1699 err = 0;
1700 mdev->send_cnt += size >> 9;
1701 }
1702 return err;
1703 }
1704
1705 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1706 {
1707 struct bio_vec *bvec;
1708 int i;
1709 /* hint all but last page with MSG_MORE */
1710 __bio_for_each_segment(bvec, bio, i, 0) {
1711 int err;
1712
1713 err = _drbd_no_send_page(mdev, bvec->bv_page,
1714 bvec->bv_offset, bvec->bv_len,
1715 i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1716 if (err)
1717 return err;
1718 }
1719 return 0;
1720 }
1721
1722 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1723 {
1724 struct bio_vec *bvec;
1725 int i;
1726 /* hint all but last page with MSG_MORE */
1727 __bio_for_each_segment(bvec, bio, i, 0) {
1728 int err;
1729
1730 err = _drbd_send_page(mdev, bvec->bv_page,
1731 bvec->bv_offset, bvec->bv_len,
1732 i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1733 if (err)
1734 return err;
1735 }
1736 return 0;
1737 }
1738
1739 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1740 struct drbd_peer_request *peer_req)
1741 {
1742 struct page *page = peer_req->pages;
1743 unsigned len = peer_req->i.size;
1744 int err;
1745
1746 /* hint all but last page with MSG_MORE */
1747 page_chain_for_each(page) {
1748 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1749
1750 err = _drbd_send_page(mdev, page, 0, l,
1751 page_chain_next(page) ? MSG_MORE : 0);
1752 if (err)
1753 return err;
1754 len -= l;
1755 }
1756 return 0;
1757 }
1758
1759 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1760 {
1761 if (mdev->tconn->agreed_pro_version >= 95)
1762 return (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1763 (bi_rw & REQ_FUA ? DP_FUA : 0) |
1764 (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1765 (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1766 else
1767 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1768 }
1769
1770 /* Used to send write requests
1771 * R_PRIMARY -> Peer (P_DATA)
1772 */
1773 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1774 {
1775 struct drbd_socket *sock;
1776 struct p_data *p;
1777 unsigned int dp_flags = 0;
1778 int dgs;
1779 int err;
1780
1781 sock = &mdev->tconn->data;
1782 p = drbd_prepare_command(mdev, sock);
1783 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1784 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1785
1786 if (!p)
1787 return -EIO;
1788 p->sector = cpu_to_be64(req->i.sector);
1789 p->block_id = (unsigned long)req;
1790 p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1791 dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1792 if (mdev->state.conn >= C_SYNC_SOURCE &&
1793 mdev->state.conn <= C_PAUSED_SYNC_T)
1794 dp_flags |= DP_MAY_SET_IN_SYNC;
1795 if (mdev->tconn->agreed_pro_version >= 100) {
1796 if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1797 dp_flags |= DP_SEND_RECEIVE_ACK;
1798 if (req->rq_state & RQ_EXP_WRITE_ACK)
1799 dp_flags |= DP_SEND_WRITE_ACK;
1800 }
1801 p->dp_flags = cpu_to_be32(dp_flags);
1802 if (dgs)
1803 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
1804 err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1805 if (!err) {
1806 /* For protocol A, we have to memcpy the payload into
1807 * socket buffers, as we may complete right away
1808 * as soon as we handed it over to tcp, at which point the data
1809 * pages may become invalid.
1810 *
1811 * For data-integrity enabled, we copy it as well, so we can be
1812 * sure that even if the bio pages may still be modified, it
1813 * won't change the data on the wire, thus if the digest checks
1814 * out ok after sending on this side, but does not fit on the
1815 * receiving side, we sure have detected corruption elsewhere.
1816 */
1817 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
1818 err = _drbd_send_bio(mdev, req->master_bio);
1819 else
1820 err = _drbd_send_zc_bio(mdev, req->master_bio);
1821
1822 /* double check digest, sometimes buffers have been modified in flight. */
1823 if (dgs > 0 && dgs <= 64) {
1824 /* 64 byte, 512 bit, is the largest digest size
1825 * currently supported in kernel crypto. */
1826 unsigned char digest[64];
1827 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
1828 if (memcmp(p + 1, digest, dgs)) {
1829 dev_warn(DEV,
1830 "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1831 (unsigned long long)req->i.sector, req->i.size);
1832 }
1833 } /* else if (dgs > 64) {
1834 ... Be noisy about digest too large ...
1835 } */
1836 }
1837 mutex_unlock(&sock->mutex); /* locked by drbd_prepare_command() */
1838
1839 return err;
1840 }
1841
1842 /* answer packet, used to send data back for read requests:
1843 * Peer -> (diskless) R_PRIMARY (P_DATA_REPLY)
1844 * C_SYNC_SOURCE -> C_SYNC_TARGET (P_RS_DATA_REPLY)
1845 */
1846 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1847 struct drbd_peer_request *peer_req)
1848 {
1849 struct drbd_socket *sock;
1850 struct p_data *p;
1851 int err;
1852 int dgs;
1853
1854 sock = &mdev->tconn->data;
1855 p = drbd_prepare_command(mdev, sock);
1856
1857 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1858 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1859
1860 if (!p)
1861 return -EIO;
1862 p->sector = cpu_to_be64(peer_req->i.sector);
1863 p->block_id = peer_req->block_id;
1864 p->seq_num = 0; /* unused */
1865 if (dgs)
1866 drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
1867 err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1868 if (!err)
1869 err = _drbd_send_zc_ee(mdev, peer_req);
1870 mutex_unlock(&sock->mutex); /* locked by drbd_prepare_command() */
1871
1872 return err;
1873 }
1874
1875 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1876 {
1877 struct drbd_socket *sock;
1878 struct p_block_desc *p;
1879
1880 sock = &mdev->tconn->data;
1881 p = drbd_prepare_command(mdev, sock);
1882 if (!p)
1883 return -EIO;
1884 p->sector = cpu_to_be64(req->i.sector);
1885 p->blksize = cpu_to_be32(req->i.size);
1886 return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1887 }
1888
1889 /*
1890 drbd_send distinguishes two cases:
1891
1892 Packets sent via the data socket "sock"
1893 and packets sent via the meta data socket "msock"
1894
1895 sock msock
1896 -----------------+-------------------------+------------------------------
1897 timeout conf.timeout / 2 conf.timeout / 2
1898 timeout action send a ping via msock Abort communication
1899 and close all sockets
1900 */
1901
1902 /*
1903 * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1904 */
1905 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1906 void *buf, size_t size, unsigned msg_flags)
1907 {
1908 struct kvec iov;
1909 struct msghdr msg;
1910 int rv, sent = 0;
1911
1912 if (!sock)
1913 return -EBADR;
1914
1915 /* THINK if (signal_pending) return ... ? */
1916
1917 iov.iov_base = buf;
1918 iov.iov_len = size;
1919
1920 msg.msg_name = NULL;
1921 msg.msg_namelen = 0;
1922 msg.msg_control = NULL;
1923 msg.msg_controllen = 0;
1924 msg.msg_flags = msg_flags | MSG_NOSIGNAL;
1925
1926 if (sock == tconn->data.socket) {
1927 rcu_read_lock();
1928 tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
1929 rcu_read_unlock();
1930 drbd_update_congested(tconn);
1931 }
1932 do {
1933 /* STRANGE
1934 * tcp_sendmsg does _not_ use its size parameter at all ?
1935 *
1936 * -EAGAIN on timeout, -EINTR on signal.
1937 */
1938 /* THINK
1939 * do we need to block DRBD_SIG if sock == &meta.socket ??
1940 * otherwise wake_asender() might interrupt some send_*Ack !
1941 */
1942 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1943 if (rv == -EAGAIN) {
1944 if (we_should_drop_the_connection(tconn, sock))
1945 break;
1946 else
1947 continue;
1948 }
1949 if (rv == -EINTR) {
1950 flush_signals(current);
1951 rv = 0;
1952 }
1953 if (rv < 0)
1954 break;
1955 sent += rv;
1956 iov.iov_base += rv;
1957 iov.iov_len -= rv;
1958 } while (sent < size);
1959
1960 if (sock == tconn->data.socket)
1961 clear_bit(NET_CONGESTED, &tconn->flags);
1962
1963 if (rv <= 0) {
1964 if (rv != -EAGAIN) {
1965 conn_err(tconn, "%s_sendmsg returned %d\n",
1966 sock == tconn->meta.socket ? "msock" : "sock",
1967 rv);
1968 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1969 } else
1970 conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1971 }
1972
1973 return sent;
1974 }
1975
1976 /**
1977 * drbd_send_all - Send an entire buffer
1978 *
1979 * Returns 0 upon success and a negative error value otherwise.
1980 */
1981 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1982 size_t size, unsigned msg_flags)
1983 {
1984 int err;
1985
1986 err = drbd_send(tconn, sock, buffer, size, msg_flags);
1987 if (err < 0)
1988 return err;
1989 if (err != size)
1990 return -EIO;
1991 return 0;
1992 }
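/*
 * Example (hypothetical caller; "hdr" is made up): drbd_send() returns the
 * number of bytes actually sent, which may be short; drbd_send_all() folds a
 * short send into -EIO, so callers that need the complete buffer on the wire
 * use it like this:
 *
 *	err = drbd_send_all(tconn, tconn->data.socket, hdr, sizeof(*hdr), 0);
 *	if (err)
 *		return err;	// header was not sent in full
 */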
1993
1994 static int drbd_open(struct block_device *bdev, fmode_t mode)
1995 {
1996 struct drbd_conf *mdev = bdev->bd_disk->private_data;
1997 unsigned long flags;
1998 int rv = 0;
1999
2000 mutex_lock(&drbd_main_mutex);
2001 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
2002 /* to have a stable mdev->state.role
2003 * and no race with updating open_cnt */
2004
2005 if (mdev->state.role != R_PRIMARY) {
2006 if (mode & FMODE_WRITE)
2007 rv = -EROFS;
2008 else if (!allow_oos)
2009 rv = -EMEDIUMTYPE;
2010 }
2011
2012 if (!rv)
2013 mdev->open_cnt++;
2014 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
2015 mutex_unlock(&drbd_main_mutex);
2016
2017 return rv;
2018 }
2019
2020 static int drbd_release(struct gendisk *gd, fmode_t mode)
2021 {
2022 struct drbd_conf *mdev = gd->private_data;
2023 mutex_lock(&drbd_main_mutex);
2024 mdev->open_cnt--;
2025 mutex_unlock(&drbd_main_mutex);
2026 return 0;
2027 }
2028
2029 static void drbd_set_defaults(struct drbd_conf *mdev)
2030 {
2031 /* Beware! The actual layout differs
2032 * between big endian and little endian */
2033 mdev->state = (union drbd_dev_state) {
2034 { .role = R_SECONDARY,
2035 .peer = R_UNKNOWN,
2036 .conn = C_STANDALONE,
2037 .disk = D_DISKLESS,
2038 .pdsk = D_UNKNOWN,
2039 } };
2040 }
2041
2042 void drbd_init_set_defaults(struct drbd_conf *mdev)
2043 {
2044 /* the memset(,0,) did most of this.
2045 * note: only assignments, no allocation in here */
2046
2047 drbd_set_defaults(mdev);
2048
2049 atomic_set(&mdev->ap_bio_cnt, 0);
2050 atomic_set(&mdev->ap_pending_cnt, 0);
2051 atomic_set(&mdev->rs_pending_cnt, 0);
2052 atomic_set(&mdev->unacked_cnt, 0);
2053 atomic_set(&mdev->local_cnt, 0);
2054 atomic_set(&mdev->pp_in_use_by_net, 0);
2055 atomic_set(&mdev->rs_sect_in, 0);
2056 atomic_set(&mdev->rs_sect_ev, 0);
2057 atomic_set(&mdev->ap_in_flight, 0);
2058 atomic_set(&mdev->md_io_in_use, 0);
2059
2060 mutex_init(&mdev->own_state_mutex);
2061 mdev->state_mutex = &mdev->own_state_mutex;
2062
2063 spin_lock_init(&mdev->al_lock);
2064 spin_lock_init(&mdev->peer_seq_lock);
2065 spin_lock_init(&mdev->epoch_lock);
2066
2067 INIT_LIST_HEAD(&mdev->active_ee);
2068 INIT_LIST_HEAD(&mdev->sync_ee);
2069 INIT_LIST_HEAD(&mdev->done_ee);
2070 INIT_LIST_HEAD(&mdev->read_ee);
2071 INIT_LIST_HEAD(&mdev->net_ee);
2072 INIT_LIST_HEAD(&mdev->resync_reads);
2073 INIT_LIST_HEAD(&mdev->resync_work.list);
2074 INIT_LIST_HEAD(&mdev->unplug_work.list);
2075 INIT_LIST_HEAD(&mdev->go_diskless.list);
2076 INIT_LIST_HEAD(&mdev->md_sync_work.list);
2077 INIT_LIST_HEAD(&mdev->start_resync_work.list);
2078 INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2079
2080 mdev->resync_work.cb = w_resync_timer;
2081 mdev->unplug_work.cb = w_send_write_hint;
2082 mdev->go_diskless.cb = w_go_diskless;
2083 mdev->md_sync_work.cb = w_md_sync;
2084 mdev->bm_io_work.w.cb = w_bitmap_io;
2085 mdev->start_resync_work.cb = w_start_resync;
2086
2087 mdev->resync_work.mdev = mdev;
2088 mdev->unplug_work.mdev = mdev;
2089 mdev->go_diskless.mdev = mdev;
2090 mdev->md_sync_work.mdev = mdev;
2091 mdev->bm_io_work.w.mdev = mdev;
2092 mdev->start_resync_work.mdev = mdev;
2093
2094 init_timer(&mdev->resync_timer);
2095 init_timer(&mdev->md_sync_timer);
2096 init_timer(&mdev->start_resync_timer);
2097 init_timer(&mdev->request_timer);
2098 mdev->resync_timer.function = resync_timer_fn;
2099 mdev->resync_timer.data = (unsigned long) mdev;
2100 mdev->md_sync_timer.function = md_sync_timer_fn;
2101 mdev->md_sync_timer.data = (unsigned long) mdev;
2102 mdev->start_resync_timer.function = start_resync_timer_fn;
2103 mdev->start_resync_timer.data = (unsigned long) mdev;
2104 mdev->request_timer.function = request_timer_fn;
2105 mdev->request_timer.data = (unsigned long) mdev;
2106
2107 init_waitqueue_head(&mdev->misc_wait);
2108 init_waitqueue_head(&mdev->state_wait);
2109 init_waitqueue_head(&mdev->ee_wait);
2110 init_waitqueue_head(&mdev->al_wait);
2111 init_waitqueue_head(&mdev->seq_wait);
2112
2113 mdev->write_ordering = WO_bdev_flush;
2114 mdev->resync_wenr = LC_FREE;
2115 mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2116 mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2117 }
2118
2119 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2120 {
2121 int i;
2122 if (mdev->tconn->receiver.t_state != NONE)
2123 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2124 mdev->tconn->receiver.t_state);
2125
2126 /* no need to lock it, I'm the only thread alive */
2127 if (atomic_read(&mdev->current_epoch->epoch_size) != 0)
2128 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2129 mdev->al_writ_cnt =
2130 mdev->bm_writ_cnt =
2131 mdev->read_cnt =
2132 mdev->recv_cnt =
2133 mdev->send_cnt =
2134 mdev->writ_cnt =
2135 mdev->p_size =
2136 mdev->rs_start =
2137 mdev->rs_total =
2138 mdev->rs_failed = 0;
2139 mdev->rs_last_events = 0;
2140 mdev->rs_last_sect_ev = 0;
2141 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2142 mdev->rs_mark_left[i] = 0;
2143 mdev->rs_mark_time[i] = 0;
2144 }
2145 D_ASSERT(mdev->tconn->net_conf == NULL);
2146
2147 drbd_set_my_capacity(mdev, 0);
2148 if (mdev->bitmap) {
2149 /* maybe never allocated. */
2150 drbd_bm_resize(mdev, 0, 1);
2151 drbd_bm_cleanup(mdev);
2152 }
2153
2154 drbd_free_bc(mdev->ldev);
2155 mdev->ldev = NULL;
2156
2157 clear_bit(AL_SUSPENDED, &mdev->flags);
2158
2159 D_ASSERT(list_empty(&mdev->active_ee));
2160 D_ASSERT(list_empty(&mdev->sync_ee));
2161 D_ASSERT(list_empty(&mdev->done_ee));
2162 D_ASSERT(list_empty(&mdev->read_ee));
2163 D_ASSERT(list_empty(&mdev->net_ee));
2164 D_ASSERT(list_empty(&mdev->resync_reads));
2165 D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2166 D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
2167 D_ASSERT(list_empty(&mdev->resync_work.list));
2168 D_ASSERT(list_empty(&mdev->unplug_work.list));
2169 D_ASSERT(list_empty(&mdev->go_diskless.list));
2170
2171 drbd_set_defaults(mdev);
2172 }
2173
2174
2175 static void drbd_destroy_mempools(void)
2176 {
2177 struct page *page;
2178
2179 while (drbd_pp_pool) {
2180 page = drbd_pp_pool;
2181 drbd_pp_pool = (struct page *)page_private(page);
2182 __free_page(page);
2183 drbd_pp_vacant--;
2184 }
2185
2186 /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2187
2188 if (drbd_md_io_bio_set)
2189 bioset_free(drbd_md_io_bio_set);
2190 if (drbd_md_io_page_pool)
2191 mempool_destroy(drbd_md_io_page_pool);
2192 if (drbd_ee_mempool)
2193 mempool_destroy(drbd_ee_mempool);
2194 if (drbd_request_mempool)
2195 mempool_destroy(drbd_request_mempool);
2196 if (drbd_ee_cache)
2197 kmem_cache_destroy(drbd_ee_cache);
2198 if (drbd_request_cache)
2199 kmem_cache_destroy(drbd_request_cache);
2200 if (drbd_bm_ext_cache)
2201 kmem_cache_destroy(drbd_bm_ext_cache);
2202 if (drbd_al_ext_cache)
2203 kmem_cache_destroy(drbd_al_ext_cache);
2204
2205 drbd_md_io_bio_set = NULL;
2206 drbd_md_io_page_pool = NULL;
2207 drbd_ee_mempool = NULL;
2208 drbd_request_mempool = NULL;
2209 drbd_ee_cache = NULL;
2210 drbd_request_cache = NULL;
2211 drbd_bm_ext_cache = NULL;
2212 drbd_al_ext_cache = NULL;
2213
2214 return;
2215 }
2216
2217 static int drbd_create_mempools(void)
2218 {
2219 struct page *page;
2220 const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2221 int i;
2222
2223 /* prepare our caches and mempools */
2224 drbd_request_mempool = NULL;
2225 drbd_ee_cache = NULL;
2226 drbd_request_cache = NULL;
2227 drbd_bm_ext_cache = NULL;
2228 drbd_al_ext_cache = NULL;
2229 drbd_pp_pool = NULL;
2230 drbd_md_io_page_pool = NULL;
2231 drbd_md_io_bio_set = NULL;
2232
2233 /* caches */
2234 drbd_request_cache = kmem_cache_create(
2235 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2236 if (drbd_request_cache == NULL)
2237 goto Enomem;
2238
2239 drbd_ee_cache = kmem_cache_create(
2240 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2241 if (drbd_ee_cache == NULL)
2242 goto Enomem;
2243
2244 drbd_bm_ext_cache = kmem_cache_create(
2245 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2246 if (drbd_bm_ext_cache == NULL)
2247 goto Enomem;
2248
2249 drbd_al_ext_cache = kmem_cache_create(
2250 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2251 if (drbd_al_ext_cache == NULL)
2252 goto Enomem;
2253
2254 /* mempools */
2255 drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2256 if (drbd_md_io_bio_set == NULL)
2257 goto Enomem;
2258
2259 drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2260 if (drbd_md_io_page_pool == NULL)
2261 goto Enomem;
2262
2263 drbd_request_mempool = mempool_create(number,
2264 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2265 if (drbd_request_mempool == NULL)
2266 goto Enomem;
2267
2268 drbd_ee_mempool = mempool_create(number,
2269 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2270 if (drbd_ee_mempool == NULL)
2271 goto Enomem;
2272
2273 /* drbd's page pool */
2274 spin_lock_init(&drbd_pp_lock);
2275
2276 for (i = 0; i < number; i++) {
2277 page = alloc_page(GFP_HIGHUSER);
2278 if (!page)
2279 goto Enomem;
2280 set_page_private(page, (unsigned long)drbd_pp_pool);
2281 drbd_pp_pool = page;
2282 }
2283 drbd_pp_vacant = number;
2284
2285 return 0;
2286
2287 Enomem:
2288 drbd_destroy_mempools(); /* in case we allocated some */
2289 return -ENOMEM;
2290 }
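/*
 * Rough sizing example (assuming a DRBD_MAX_BIO_SIZE of 1 MiB, 4 KiB pages
 * and the default minor_count of 32): the loop above pre-allocates
 * (1 MiB / 4 KiB) * 32 = 8192 pages, i.e. about 32 MiB of GFP_HIGHUSER
 * memory for drbd's private page pool.
 */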
2291
2292 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2293 void *unused)
2294 {
2295 /* just so we have it. you never know what interesting things we
2296 * might want to do here some day...
2297 */
2298
2299 return NOTIFY_DONE;
2300 }
2301
2302 static struct notifier_block drbd_notifier = {
2303 .notifier_call = drbd_notify_sys,
2304 };
2305
2306 static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
2307 {
2308 int rr;
2309
2310 rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
2311 if (rr)
2312 dev_err(DEV, "%d EEs in active list found!\n", rr);
2313
2314 rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
2315 if (rr)
2316 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2317
2318 rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
2319 if (rr)
2320 dev_err(DEV, "%d EEs in read list found!\n", rr);
2321
2322 rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
2323 if (rr)
2324 dev_err(DEV, "%d EEs in done list found!\n", rr);
2325
2326 rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
2327 if (rr)
2328 dev_err(DEV, "%d EEs in net list found!\n", rr);
2329 }
2330
2331 /* caution. no locking. */
2332 void drbd_minor_destroy(struct kref *kref)
2333 {
2334 struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
2335 struct drbd_tconn *tconn = mdev->tconn;
2336
2337 del_timer_sync(&mdev->request_timer);
2338
2339 /* paranoia asserts */
2340 D_ASSERT(mdev->open_cnt == 0);
2341 D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2342 /* end paranoia asserts */
2343
2344 /* cleanup stuff that may have been allocated during
2345 * device (re-)configuration or state changes */
2346
2347 if (mdev->this_bdev)
2348 bdput(mdev->this_bdev);
2349
2350 drbd_free_bc(mdev->ldev);
2351 mdev->ldev = NULL;
2352
2353 drbd_release_all_peer_reqs(mdev);
2354
2355 lc_destroy(mdev->act_log);
2356 lc_destroy(mdev->resync);
2357
2358 kfree(mdev->p_uuid);
2359 /* mdev->p_uuid = NULL; */
2360
2361 kfree(mdev->current_epoch);
2362 if (mdev->bitmap) /* should no longer be there. */
2363 drbd_bm_cleanup(mdev);
2364 __free_page(mdev->md_io_page);
2365 put_disk(mdev->vdisk);
2366 blk_cleanup_queue(mdev->rq_queue);
2367 kfree(mdev->rs_plan_s);
2368 kfree(mdev);
2369
2370 kref_put(&tconn->kref, &conn_destroy);
2371 }
2372
2373 static void drbd_cleanup(void)
2374 {
2375 unsigned int i;
2376 struct drbd_conf *mdev;
2377 struct drbd_tconn *tconn, *tmp;
2378
2379 unregister_reboot_notifier(&drbd_notifier);
2380
2381 /* First remove proc;
2382 * drbdsetup uses its presence to detect
2383 * whether DRBD is loaded.
2384 * If we got stuck in proc removal
2385 * while netlink was already deregistered,
2386 * some drbdsetup commands might wait forever
2387 * for an answer.
2388 */
2389 if (drbd_proc)
2390 remove_proc_entry("drbd", NULL);
2391
2392 drbd_genl_unregister();
2393
2394 idr_for_each_entry(&minors, mdev, i) {
2395 idr_remove(&minors, mdev_to_minor(mdev));
2396 idr_remove(&mdev->tconn->volumes, mdev->vnr);
2397 del_gendisk(mdev->vdisk);
2398 /* synchronize_rcu(); No other threads running at this point */
2399 kref_put(&mdev->kref, &drbd_minor_destroy);
2400 }
2401
2402 /* not _rcu, since there is no other updater anymore; genl is already unregistered */
2403 list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
2404 list_del(&tconn->all_tconn); /* not _rcu: no proc, no other threads */
2405 /* synchronize_rcu(); */
2406 kref_put(&tconn->kref, &conn_destroy);
2407 }
2408
2409 drbd_destroy_mempools();
2410 unregister_blkdev(DRBD_MAJOR, "drbd");
2411
2412 idr_destroy(&minors);
2413
2414 printk(KERN_INFO "drbd: module cleanup done.\n");
2415 }
2416
2417 /**
2418 * drbd_congested() - Callback for pdflush
2419 * @congested_data: User data
2420 * @bdi_bits: Bits pdflush is currently interested in
2421 *
2422 * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2423 */
2424 static int drbd_congested(void *congested_data, int bdi_bits)
2425 {
2426 struct drbd_conf *mdev = congested_data;
2427 struct request_queue *q;
2428 char reason = '-';
2429 int r = 0;
2430
2431 if (!may_inc_ap_bio(mdev)) {
2432 /* DRBD has frozen IO */
2433 r = bdi_bits;
2434 reason = 'd';
2435 goto out;
2436 }
2437
2438 if (get_ldev(mdev)) {
2439 q = bdev_get_queue(mdev->ldev->backing_bdev);
2440 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2441 put_ldev(mdev);
2442 if (r)
2443 reason = 'b';
2444 }
2445
2446 if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2447 r |= (1 << BDI_async_congested);
2448 reason = reason == 'b' ? 'a' : 'n';
2449 }
2450
2451 out:
2452 mdev->congestion_reason = reason;
2453 return r;
2454 }
2455
2456 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2457 {
2458 sema_init(&wq->s, 0);
2459 spin_lock_init(&wq->q_lock);
2460 INIT_LIST_HEAD(&wq->q);
2461 }
2462
2463 struct drbd_tconn *conn_get_by_name(const char *name)
2464 {
2465 struct drbd_tconn *tconn;
2466
2467 if (!name || !name[0])
2468 return NULL;
2469
2470 rcu_read_lock();
2471 list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2472 if (!strcmp(tconn->name, name)) {
2473 kref_get(&tconn->kref);
2474 goto found;
2475 }
2476 }
2477 tconn = NULL;
2478 found:
2479 rcu_read_unlock();
2480 return tconn;
2481 }
2482
2483 struct drbd_tconn *conn_get_by_addrs(void *my_addr, int my_addr_len,
2484 void *peer_addr, int peer_addr_len)
2485 {
2486 struct drbd_tconn *tconn;
2487
2488 rcu_read_lock();
2489 list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2490 if (tconn->my_addr_len == my_addr_len &&
2491 tconn->peer_addr_len == peer_addr_len &&
2492 !memcmp(&tconn->my_addr, my_addr, my_addr_len) &&
2493 !memcmp(&tconn->peer_addr, peer_addr, peer_addr_len)) {
2494 kref_get(&tconn->kref);
2495 goto found;
2496 }
2497 }
2498 tconn = NULL;
2499 found:
2500 rcu_read_unlock();
2501 return tconn;
2502 }
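/*
 * Usage sketch (the resource name "r0" is made up): both lookup helpers
 * above take an extra reference under rcu_read_lock(), so a successful
 * caller must drop it again when done:
 *
 *	struct drbd_tconn *tconn = conn_get_by_name("r0");
 *	if (tconn) {
 *		// ... use tconn ...
 *		kref_put(&tconn->kref, &conn_destroy);
 *	}
 */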
2503
2504 static int drbd_alloc_socket(struct drbd_socket *socket)
2505 {
2506 socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2507 if (!socket->rbuf)
2508 return -ENOMEM;
2509 socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2510 if (!socket->sbuf)
2511 return -ENOMEM;
2512 return 0;
2513 }
2514
2515 static void drbd_free_socket(struct drbd_socket *socket)
2516 {
2517 free_page((unsigned long) socket->sbuf);
2518 free_page((unsigned long) socket->rbuf);
2519 }
2520
2521 void conn_free_crypto(struct drbd_tconn *tconn)
2522 {
2523 drbd_free_sock(tconn);
2524
2525 crypto_free_hash(tconn->csums_tfm);
2526 crypto_free_hash(tconn->verify_tfm);
2527 crypto_free_hash(tconn->cram_hmac_tfm);
2528 crypto_free_hash(tconn->integrity_tfm);
2529 crypto_free_hash(tconn->peer_integrity_tfm);
2530 kfree(tconn->int_dig_in);
2531 kfree(tconn->int_dig_vv);
2532
2533 tconn->csums_tfm = NULL;
2534 tconn->verify_tfm = NULL;
2535 tconn->cram_hmac_tfm = NULL;
2536 tconn->integrity_tfm = NULL;
2537 tconn->peer_integrity_tfm = NULL;
2538 tconn->int_dig_in = NULL;
2539 tconn->int_dig_vv = NULL;
2540 }
2541
2542 int set_resource_options(struct drbd_tconn *tconn, struct res_opts *res_opts)
2543 {
2544 cpumask_var_t new_cpu_mask;
2545 int err;
2546
2547 if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
2548 return -ENOMEM;
2549 /*
2550 retcode = ERR_NOMEM;
2551 drbd_msg_put_info("unable to allocate cpumask");
2552 */
2553
2554 /* silently ignore cpu mask on UP kernel */
2555 if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
2556 /* FIXME: Get rid of constant 32 here */
2557 err = __bitmap_parse(res_opts->cpu_mask, 32, 0,
2558 cpumask_bits(new_cpu_mask), nr_cpu_ids);
2559 if (err) {
2560 conn_warn(tconn, "__bitmap_parse() failed with %d\n", err);
2561 /* retcode = ERR_CPU_MASK_PARSE; */
2562 goto fail;
2563 }
2564 }
2565 tconn->res_opts = *res_opts;
2566 if (!cpumask_equal(tconn->cpu_mask, new_cpu_mask)) {
2567 cpumask_copy(tconn->cpu_mask, new_cpu_mask);
2568 drbd_calc_cpu_mask(tconn);
2569 tconn->receiver.reset_cpu_mask = 1;
2570 tconn->asender.reset_cpu_mask = 1;
2571 tconn->worker.reset_cpu_mask = 1;
2572 }
2573 err = 0;
2574
2575 fail:
2576 free_cpumask_var(new_cpu_mask);
2577 return err;
2578
2579 }
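/*
 * Illustration (hypothetical configuration): res_opts->cpu_mask is a hex
 * bitmap string as understood by __bitmap_parse(), so a mask of "3" would
 * restrict this resource's receiver, asender and worker threads to CPUs 0
 * and 1; on a UP kernel the mask is silently ignored.
 */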
2580
2581 /* caller must be under genl_lock() */
2582 struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
2583 {
2584 struct drbd_tconn *tconn;
2585
2586 tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2587 if (!tconn)
2588 return NULL;
2589
2590 tconn->name = kstrdup(name, GFP_KERNEL);
2591 if (!tconn->name)
2592 goto fail;
2593
2594 if (drbd_alloc_socket(&tconn->data))
2595 goto fail;
2596 if (drbd_alloc_socket(&tconn->meta))
2597 goto fail;
2598
2599 if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2600 goto fail;
2601
2602 if (set_resource_options(tconn, res_opts))
2603 goto fail;
2604
2605 if (!tl_init(tconn))
2606 goto fail;
2607
2608 tconn->cstate = C_STANDALONE;
2609 mutex_init(&tconn->cstate_mutex);
2610 spin_lock_init(&tconn->req_lock);
2611 mutex_init(&tconn->conf_update);
2612 init_waitqueue_head(&tconn->ping_wait);
2613 idr_init(&tconn->volumes);
2614
2615 drbd_init_workqueue(&tconn->data.work);
2616 mutex_init(&tconn->data.mutex);
2617
2618 drbd_init_workqueue(&tconn->meta.work);
2619 mutex_init(&tconn->meta.mutex);
2620
2621 drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2622 drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2623 drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2624
2625 kref_init(&tconn->kref);
2626 list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);
2627
2628 return tconn;
2629
2630 fail:
2631 tl_cleanup(tconn);
2632 free_cpumask_var(tconn->cpu_mask);
2633 drbd_free_socket(&tconn->meta);
2634 drbd_free_socket(&tconn->data);
2635 kfree(tconn->name);
2636 kfree(tconn);
2637
2638 return NULL;
2639 }
2640
2641 void conn_destroy(struct kref *kref)
2642 {
2643 struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
2644
2645 idr_destroy(&tconn->volumes);
2646
2647 free_cpumask_var(tconn->cpu_mask);
2648 drbd_free_socket(&tconn->meta);
2649 drbd_free_socket(&tconn->data);
2650 kfree(tconn->name);
2651 kfree(tconn->int_dig_in);
2652 kfree(tconn->int_dig_vv);
2653 kfree(tconn);
2654 }
2655
2656 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2657 {
2658 struct drbd_conf *mdev;
2659 struct gendisk *disk;
2660 struct request_queue *q;
2661 int vnr_got = vnr;
2662 int minor_got = minor;
2663 enum drbd_ret_code err = ERR_NOMEM;
2664
2665 mdev = minor_to_mdev(minor);
2666 if (mdev)
2667 return ERR_MINOR_EXISTS;
2668
2669 /* GFP_KERNEL, we are outside of all write-out paths */
2670 mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2671 if (!mdev)
2672 return ERR_NOMEM;
2673
2674 kref_get(&tconn->kref);
2675 mdev->tconn = tconn;
2676
2677 mdev->minor = minor;
2678 mdev->vnr = vnr;
2679
2680 drbd_init_set_defaults(mdev);
2681
2682 q = blk_alloc_queue(GFP_KERNEL);
2683 if (!q)
2684 goto out_no_q;
2685 mdev->rq_queue = q;
2686 q->queuedata = mdev;
2687
2688 disk = alloc_disk(1);
2689 if (!disk)
2690 goto out_no_disk;
2691 mdev->vdisk = disk;
2692
2693 set_disk_ro(disk, true);
2694
2695 disk->queue = q;
2696 disk->major = DRBD_MAJOR;
2697 disk->first_minor = minor;
2698 disk->fops = &drbd_ops;
2699 sprintf(disk->disk_name, "drbd%d", minor);
2700 disk->private_data = mdev;
2701
2702 mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2703 /* we have no partitions. we contain only ourselves. */
2704 mdev->this_bdev->bd_contains = mdev->this_bdev;
2705
2706 q->backing_dev_info.congested_fn = drbd_congested;
2707 q->backing_dev_info.congested_data = mdev;
2708
2709 blk_queue_make_request(q, drbd_make_request);
2710 /* Setting the max_hw_sectors to an odd value of 8 KiB here
2711 triggers a max_bio_size message upon first attach or connect */
2712 blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2713 blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2714 blk_queue_merge_bvec(q, drbd_merge_bvec);
2715 q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2716
2717 mdev->md_io_page = alloc_page(GFP_KERNEL);
2718 if (!mdev->md_io_page)
2719 goto out_no_io_page;
2720
2721 if (drbd_bm_init(mdev))
2722 goto out_no_bitmap;
2723 mdev->read_requests = RB_ROOT;
2724 mdev->write_requests = RB_ROOT;
2725
2726 mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2727 if (!mdev->current_epoch)
2728 goto out_no_epoch;
2729
2730 INIT_LIST_HEAD(&mdev->current_epoch->list);
2731 mdev->epochs = 1;
2732
2733 if (!idr_pre_get(&minors, GFP_KERNEL))
2734 goto out_no_minor_idr;
2735 if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2736 goto out_no_minor_idr;
2737 if (minor_got != minor) {
2738 err = ERR_MINOR_EXISTS;
2739 drbd_msg_put_info("requested minor exists already");
2740 goto out_idr_remove_minor;
2741 }
2742
2743 if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2744 goto out_idr_remove_minor;
2745 if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2746 goto out_idr_remove_minor;
2747 if (vnr_got != vnr) {
2748 err = ERR_INVALID_REQUEST;
2749 drbd_msg_put_info("requested volume exists already");
2750 goto out_idr_remove_vol;
2751 }
2752 add_disk(disk);
2753 kref_init(&mdev->kref); /* one ref for both idrs and the add_disk */
2754
2755 /* inherit the connection state */
2756 mdev->state.conn = tconn->cstate;
2757 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2758 drbd_connected(mdev);
2759
2760 return NO_ERROR;
2761
2762 out_idr_remove_vol:
2763 idr_remove(&tconn->volumes, vnr_got);
2764 out_idr_remove_minor:
2765 idr_remove(&minors, minor_got);
2766 synchronize_rcu();
2767 out_no_minor_idr:
2768 kfree(mdev->current_epoch);
2769 out_no_epoch:
2770 drbd_bm_cleanup(mdev);
2771 out_no_bitmap:
2772 __free_page(mdev->md_io_page);
2773 out_no_io_page:
2774 put_disk(disk);
2775 out_no_disk:
2776 blk_cleanup_queue(q);
2777 out_no_q:
2778 kfree(mdev);
2779 kref_put(&tconn->kref, &conn_destroy);
2780 return err;
2781 }
2782
2783 int __init drbd_init(void)
2784 {
2785 int err;
2786
2787 if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2788 printk(KERN_ERR
2789 "drbd: invalid minor_count (%d)\n", minor_count);
2790 #ifdef MODULE
2791 return -EINVAL;
2792 #else
2793 minor_count = DRBD_MINOR_COUNT_DEF;
2794 #endif
2795 }
2796
2797 err = register_blkdev(DRBD_MAJOR, "drbd");
2798 if (err) {
2799 printk(KERN_ERR
2800 "drbd: unable to register block device major %d\n",
2801 DRBD_MAJOR);
2802 return err;
2803 }
2804
2805 err = drbd_genl_register();
2806 if (err) {
2807 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2808 goto fail;
2809 }
2810
2811
2812 register_reboot_notifier(&drbd_notifier);
2813
2814 /*
2815 * allocate all necessary structs
2816 */
2817 err = -ENOMEM;
2818
2819 init_waitqueue_head(&drbd_pp_wait);
2820
2821 drbd_proc = NULL; /* play safe for drbd_cleanup */
2822 idr_init(&minors);
2823
2824 err = drbd_create_mempools();
2825 if (err)
2826 goto fail;
2827
2828 drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2829 if (!drbd_proc) {
2830 printk(KERN_ERR "drbd: unable to register proc file\n");
2831 goto fail;
2832 }
2833
2834 rwlock_init(&global_state_lock);
2835 INIT_LIST_HEAD(&drbd_tconns);
2836
2837 printk(KERN_INFO "drbd: initialized. "
2838 "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2839 API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2840 printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2841 printk(KERN_INFO "drbd: registered as block device major %d\n",
2842 DRBD_MAJOR);
2843
2844 return 0; /* Success! */
2845
2846 fail:
2847 drbd_cleanup();
2848 if (err == -ENOMEM)
2849 /* currently always the case */
2850 printk(KERN_ERR "drbd: ran out of memory\n");
2851 else
2852 printk(KERN_ERR "drbd: initialization failure\n");
2853 return err;
2854 }
2855
2856 void drbd_free_bc(struct drbd_backing_dev *ldev)
2857 {
2858 if (ldev == NULL)
2859 return;
2860
2861 blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2862 blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2863
2864 kfree(ldev);
2865 }
2866
2867 void drbd_free_sock(struct drbd_tconn *tconn)
2868 {
2869 if (tconn->data.socket) {
2870 mutex_lock(&tconn->data.mutex);
2871 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2872 sock_release(tconn->data.socket);
2873 tconn->data.socket = NULL;
2874 mutex_unlock(&tconn->data.mutex);
2875 }
2876 if (tconn->meta.socket) {
2877 mutex_lock(&tconn->meta.mutex);
2878 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2879 sock_release(tconn->meta.socket);
2880 tconn->meta.socket = NULL;
2881 mutex_unlock(&tconn->meta.mutex);
2882 }
2883 }
2884
2885 /* meta data management */
2886
2887 struct meta_data_on_disk {
2888 u64 la_size; /* last agreed size. */
2889 u64 uuid[UI_SIZE]; /* UUIDs. */
2890 u64 device_uuid;
2891 u64 reserved_u64_1;
2892 u32 flags; /* MDF */
2893 u32 magic;
2894 u32 md_size_sect;
2895 u32 al_offset; /* offset to this block */
2896 u32 al_nr_extents; /* important for restoring the AL */
2897 /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2898 u32 bm_offset; /* offset to the bitmap, from here */
2899 u32 bm_bytes_per_bit; /* BM_BLOCK_SIZE */
2900 u32 la_peer_max_bio_size; /* last peer max_bio_size */
2901 u32 reserved_u32[3];
2902
2903 } __packed;
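/*
 * Note: all multi-byte fields of this on-disk layout are stored big-endian,
 * independent of host byte order; drbd_md_sync() below converts on the way
 * out and drbd_md_read() converts back on the way in, for example:
 *
 *	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);		// write side
 *	if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC))	// read side
 */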
2904
2905 /**
2906 * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2907 * @mdev: DRBD device.
2908 */
2909 void drbd_md_sync(struct drbd_conf *mdev)
2910 {
2911 struct meta_data_on_disk *buffer;
2912 sector_t sector;
2913 int i;
2914
2915 del_timer(&mdev->md_sync_timer);
2916 /* timer may be rearmed by drbd_md_mark_dirty() now. */
2917 if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2918 return;
2919
2920 /* We use D_FAILED here, and not D_ATTACHING, because we try to write
2921 * metadata even if we detach due to a disk failure! */
2922 if (!get_ldev_if_state(mdev, D_FAILED))
2923 return;
2924
2925 buffer = drbd_md_get_buffer(mdev);
2926 if (!buffer)
2927 goto out;
2928
2929 memset(buffer, 0, 512);
2930
2931 buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2932 for (i = UI_CURRENT; i < UI_SIZE; i++)
2933 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2934 buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2935 buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2936
2937 buffer->md_size_sect = cpu_to_be32(mdev->ldev->md.md_size_sect);
2938 buffer->al_offset = cpu_to_be32(mdev->ldev->md.al_offset);
2939 buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2940 buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2941 buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2942
2943 buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2944 buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2945
2946 D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2947 sector = mdev->ldev->md.md_offset;
2948
2949 if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2950 /* this was a try anyways ... */
2951 dev_err(DEV, "meta data update failed!\n");
2952 drbd_chk_io_error(mdev, 1, true);
2953 }
2954
2955 /* Update mdev->ldev->md.la_size_sect,
2956 * since we just updated it in the on-disk metadata. */
2957 mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2958
2959 drbd_md_put_buffer(mdev);
2960 out:
2961 put_ldev(mdev);
2962 }
2963
2964 /**
2965 * drbd_md_read() - Reads in the meta data super block
2966 * @mdev: DRBD device.
2967 * @bdev: Device from which the meta data should be read in.
2968 *
2969 * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2970 * something goes wrong. Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2971 */
2972 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2973 {
2974 struct meta_data_on_disk *buffer;
2975 int i, rv = NO_ERROR;
2976
2977 if (!get_ldev_if_state(mdev, D_ATTACHING))
2978 return ERR_IO_MD_DISK;
2979
2980 buffer = drbd_md_get_buffer(mdev);
2981 if (!buffer)
2982 goto out;
2983
2984 if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2985 /* NOTE: can't do normal error processing here as this is
2986 called BEFORE disk is attached */
2987 dev_err(DEV, "Error while reading metadata.\n");
2988 rv = ERR_IO_MD_DISK;
2989 goto err;
2990 }
2991
2992 if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2993 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2994 rv = ERR_MD_INVALID;
2995 goto err;
2996 }
2997 if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2998 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2999 be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3000 rv = ERR_MD_INVALID;
3001 goto err;
3002 }
3003 if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3004 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3005 be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3006 rv = ERR_MD_INVALID;
3007 goto err;
3008 }
3009 if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3010 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3011 be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3012 rv = ERR_MD_INVALID;
3013 goto err;
3014 }
3015
3016 if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3017 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3018 be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3019 rv = ERR_MD_INVALID;
3020 goto err;
3021 }
3022
3023 bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3024 for (i = UI_CURRENT; i < UI_SIZE; i++)
3025 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3026 bdev->md.flags = be32_to_cpu(buffer->flags);
3027 bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3028
3029 spin_lock_irq(&mdev->tconn->req_lock);
3030 if (mdev->state.conn < C_CONNECTED) {
3031 int peer;
3032 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3033 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
3034 mdev->peer_max_bio_size = peer;
3035 }
3036 spin_unlock_irq(&mdev->tconn->req_lock);
3037
3038 /* This block wants to get removed... */
3039 bdev->disk_conf->al_extents = be32_to_cpu(buffer->al_nr_extents);
3040 if (bdev->disk_conf->al_extents < DRBD_AL_EXTENTS_MIN)
3041 bdev->disk_conf->al_extents = DRBD_AL_EXTENTS_DEF;
3042
3043 err:
3044 drbd_md_put_buffer(mdev);
3045 out:
3046 put_ldev(mdev);
3047
3048 return rv;
3049 }
3050
3051 /**
3052 * drbd_md_mark_dirty() - Mark meta data super block as dirty
3053 * @mdev: DRBD device.
3054 *
3055 * Call this function if you change anything that should be written to
3056 * the meta-data super block. This function sets MD_DIRTY, and starts a
3057 * timer that ensures drbd_md_sync() is called within five seconds.
3058 */
3059 #ifdef DEBUG
3060 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3061 {
3062 if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3063 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3064 mdev->last_md_mark_dirty.line = line;
3065 mdev->last_md_mark_dirty.func = func;
3066 }
3067 }
3068 #else
3069 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3070 {
3071 if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3072 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3073 }
3074 #endif
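/*
 * Typical pattern: modify the in-memory metadata, then mark it dirty; the
 * timer armed here later fires md_sync_timer_fn() -> w_md_sync() ->
 * drbd_md_sync(), which writes the superblock out.  _drbd_uuid_set() below
 * is one such caller:
 *
 *	mdev->ldev->md.uuid[idx] = val;
 *	drbd_md_mark_dirty(mdev);
 */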
3075
3076 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3077 {
3078 int i;
3079
3080 for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3081 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3082 }
3083
3084 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3085 {
3086 if (idx == UI_CURRENT) {
3087 if (mdev->state.role == R_PRIMARY)
3088 val |= 1;
3089 else
3090 val &= ~((u64)1);
3091
3092 drbd_set_ed_uuid(mdev, val);
3093 }
3094
3095 mdev->ldev->md.uuid[idx] = val;
3096 drbd_md_mark_dirty(mdev);
3097 }
3098
3099
3100 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3101 {
3102 if (mdev->ldev->md.uuid[idx]) {
3103 drbd_uuid_move_history(mdev);
3104 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3105 }
3106 _drbd_uuid_set(mdev, idx, val);
3107 }
3108
3109 /**
3110 * drbd_uuid_new_current() - Creates a new current UUID
3111 * @mdev: DRBD device.
3112 *
3113 * Creates a new current UUID, and rotates the old current UUID into
3114 * the bitmap slot. Causes an incremental resync upon next connect.
3115 */
3116 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3117 {
3118 u64 val;
3119 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3120
3121 if (bm_uuid)
3122 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3123
3124 mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3125
3126 get_random_bytes(&val, sizeof(u64));
3127 _drbd_uuid_set(mdev, UI_CURRENT, val);
3128 drbd_print_uuids(mdev, "new current UUID");
3129 /* get it to stable storage _now_ */
3130 drbd_md_sync(mdev);
3131 }
3132
3133 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3134 {
3135 if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3136 return;
3137
3138 if (val == 0) {
3139 drbd_uuid_move_history(mdev);
3140 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3141 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3142 } else {
3143 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3144 if (bm_uuid)
3145 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3146
3147 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3148 }
3149 drbd_md_mark_dirty(mdev);
3150 }
3151
3152 /**
3153 * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3154 * @mdev: DRBD device.
3155 *
3156 * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3157 */
3158 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3159 {
3160 int rv = -EIO;
3161
3162 if (get_ldev_if_state(mdev, D_ATTACHING)) {
3163 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3164 drbd_md_sync(mdev);
3165 drbd_bm_set_all(mdev);
3166
3167 rv = drbd_bm_write(mdev);
3168
3169 if (!rv) {
3170 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3171 drbd_md_sync(mdev);
3172 }
3173
3174 put_ldev(mdev);
3175 }
3176
3177 return rv;
3178 }
3179
3180 /**
3181 * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3182 * @mdev: DRBD device.
3183 *
3184 * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3185 */
3186 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3187 {
3188 int rv = -EIO;
3189
3190 drbd_resume_al(mdev);
3191 if (get_ldev_if_state(mdev, D_ATTACHING)) {
3192 drbd_bm_clear_all(mdev);
3193 rv = drbd_bm_write(mdev);
3194 put_ldev(mdev);
3195 }
3196
3197 return rv;
3198 }
3199
3200 static int w_bitmap_io(struct drbd_work *w, int unused)
3201 {
3202 struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3203 struct drbd_conf *mdev = w->mdev;
3204 int rv = -EIO;
3205
3206 D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3207
3208 if (get_ldev(mdev)) {
3209 drbd_bm_lock(mdev, work->why, work->flags);
3210 rv = work->io_fn(mdev);
3211 drbd_bm_unlock(mdev);
3212 put_ldev(mdev);
3213 }
3214
3215 clear_bit_unlock(BITMAP_IO, &mdev->flags);
3216 wake_up(&mdev->misc_wait);
3217
3218 if (work->done)
3219 work->done(mdev, rv);
3220
3221 clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3222 work->why = NULL;
3223 work->flags = 0;
3224
3225 return 0;
3226 }
3227
3228 void drbd_ldev_destroy(struct drbd_conf *mdev)
3229 {
3230 lc_destroy(mdev->resync);
3231 mdev->resync = NULL;
3232 lc_destroy(mdev->act_log);
3233 mdev->act_log = NULL;
3234 __no_warn(local,
3235 drbd_free_bc(mdev->ldev);
3236 mdev->ldev = NULL;);
3237
3238 clear_bit(GO_DISKLESS, &mdev->flags);
3239 }
3240
3241 static int w_go_diskless(struct drbd_work *w, int unused)
3242 {
3243 struct drbd_conf *mdev = w->mdev;
3244
3245 D_ASSERT(mdev->state.disk == D_FAILED);
3246 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3247 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3248 * the protected members anymore, though, so once put_ldev reaches zero
3249 * again, it will be safe to free them. */
3250 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3251 return 0;
3252 }
3253
3254 void drbd_go_diskless(struct drbd_conf *mdev)
3255 {
3256 D_ASSERT(mdev->state.disk == D_FAILED);
3257 if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3258 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3259 }
3260
3261 /**
3262 * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3263 * @mdev: DRBD device.
3264 * @io_fn: IO callback to be called when bitmap IO is possible
3265 * @done: callback to be called after the bitmap IO was performed
3266 * @why: Descriptive text of the reason for doing the IO
3267 *
3268 * While IO on the bitmap happens we freeze application IO, which ensures
3269 * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
3270 * called from worker context. It MUST NOT be used while a previous such
3271 * work is still pending!
3272 */
3273 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3274 int (*io_fn)(struct drbd_conf *),
3275 void (*done)(struct drbd_conf *, int),
3276 char *why, enum bm_flag flags)
3277 {
3278 D_ASSERT(current == mdev->tconn->worker.task);
3279
3280 D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3281 D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3282 D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3283 if (mdev->bm_io_work.why)
3284 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3285 why, mdev->bm_io_work.why);
3286
3287 mdev->bm_io_work.io_fn = io_fn;
3288 mdev->bm_io_work.done = done;
3289 mdev->bm_io_work.why = why;
3290 mdev->bm_io_work.flags = flags;
3291
3292 spin_lock_irq(&mdev->tconn->req_lock);
3293 set_bit(BITMAP_IO, &mdev->flags);
3294 if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3295 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3296 drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3297 }
3298 spin_unlock_irq(&mdev->tconn->req_lock);
3299 }
3300
3301 /**
3302 * drbd_bitmap_io() - Does an IO operation on the whole bitmap
3303 * @mdev: DRBD device.
3304 * @io_fn: IO callback to be called when bitmap IO is possible
3305 * @why: Descriptive text of the reason for doing the IO
3306 *
3307 * Freezes application IO while the actual IO operation runs. This
3308 * function MAY NOT be called from worker context.
3309 */
3310 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3311 char *why, enum bm_flag flags)
3312 {
3313 int rv;
3314
3315 D_ASSERT(current != mdev->tconn->worker.task);
3316
3317 if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3318 drbd_suspend_io(mdev);
3319
3320 drbd_bm_lock(mdev, why, flags);
3321 rv = io_fn(mdev);
3322 drbd_bm_unlock(mdev);
3323
3324 if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3325 drbd_resume_io(mdev);
3326
3327 return rv;
3328 }
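/*
 * Usage sketch (the "why" string and flag choice are only examples): a
 * non-worker caller that wants to set all bits and flush the bitmap could
 * combine the helpers above roughly like this:
 *
 *	err = drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *			     "set_n_write from example", BM_LOCKED_MASK);
 */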
3329
3330 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3331 {
3332 if ((mdev->ldev->md.flags & flag) != flag) {
3333 drbd_md_mark_dirty(mdev);
3334 mdev->ldev->md.flags |= flag;
3335 }
3336 }
3337
3338 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3339 {
3340 if ((mdev->ldev->md.flags & flag) != 0) {
3341 drbd_md_mark_dirty(mdev);
3342 mdev->ldev->md.flags &= ~flag;
3343 }
3344 }
3345 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3346 {
3347 return (bdev->md.flags & flag) != 0;
3348 }
3349
3350 static void md_sync_timer_fn(unsigned long data)
3351 {
3352 struct drbd_conf *mdev = (struct drbd_conf *) data;
3353
3354 drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3355 }
3356
3357 static int w_md_sync(struct drbd_work *w, int unused)
3358 {
3359 struct drbd_conf *mdev = w->mdev;
3360
3361 dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3362 #ifdef DEBUG
3363 dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3364 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3365 #endif
3366 drbd_md_sync(mdev);
3367 return 0;
3368 }
3369
3370 const char *cmdname(enum drbd_packet cmd)
3371 {
3372 /* THINK may need to become several global tables
3373 * when we want to support more than
3374 * one PRO_VERSION */
3375 static const char *cmdnames[] = {
3376 [P_DATA] = "Data",
3377 [P_DATA_REPLY] = "DataReply",
3378 [P_RS_DATA_REPLY] = "RSDataReply",
3379 [P_BARRIER] = "Barrier",
3380 [P_BITMAP] = "ReportBitMap",
3381 [P_BECOME_SYNC_TARGET] = "BecomeSyncTarget",
3382 [P_BECOME_SYNC_SOURCE] = "BecomeSyncSource",
3383 [P_UNPLUG_REMOTE] = "UnplugRemote",
3384 [P_DATA_REQUEST] = "DataRequest",
3385 [P_RS_DATA_REQUEST] = "RSDataRequest",
3386 [P_SYNC_PARAM] = "SyncParam",
3387 [P_SYNC_PARAM89] = "SyncParam89",
3388 [P_PROTOCOL] = "ReportProtocol",
3389 [P_UUIDS] = "ReportUUIDs",
3390 [P_SIZES] = "ReportSizes",
3391 [P_STATE] = "ReportState",
3392 [P_SYNC_UUID] = "ReportSyncUUID",
3393 [P_AUTH_CHALLENGE] = "AuthChallenge",
3394 [P_AUTH_RESPONSE] = "AuthResponse",
3395 [P_PING] = "Ping",
3396 [P_PING_ACK] = "PingAck",
3397 [P_RECV_ACK] = "RecvAck",
3398 [P_WRITE_ACK] = "WriteAck",
3399 [P_RS_WRITE_ACK] = "RSWriteAck",
3400 [P_DISCARD_WRITE] = "DiscardWrite",
3401 [P_NEG_ACK] = "NegAck",
3402 [P_NEG_DREPLY] = "NegDReply",
3403 [P_NEG_RS_DREPLY] = "NegRSDReply",
3404 [P_BARRIER_ACK] = "BarrierAck",
3405 [P_STATE_CHG_REQ] = "StateChgRequest",
3406 [P_STATE_CHG_REPLY] = "StateChgReply",
3407 [P_OV_REQUEST] = "OVRequest",
3408 [P_OV_REPLY] = "OVReply",
3409 [P_OV_RESULT] = "OVResult",
3410 [P_CSUM_RS_REQUEST] = "CsumRSRequest",
3411 [P_RS_IS_IN_SYNC] = "CsumRSIsInSync",
3412 [P_COMPRESSED_BITMAP] = "CBitmap",
3413 [P_DELAY_PROBE] = "DelayProbe",
3414 [P_OUT_OF_SYNC] = "OutOfSync",
3415 [P_RETRY_WRITE] = "RetryWrite",
3416 [P_RS_CANCEL] = "RSCancel",
3417 [P_CONN_ST_CHG_REQ] = "conn_st_chg_req",
3418 [P_CONN_ST_CHG_REPLY] = "conn_st_chg_reply",
3419 [P_RETRY_WRITE] = "retry_write",
3420 [P_PROTOCOL_UPDATE] = "protocol_update",
3421
3422 /* enum drbd_packet, but not commands - obsoleted flags:
3423 * P_MAY_IGNORE
3424 * P_MAX_OPT_CMD
3425 */
3426 };
3427
3428 /* too big for the array: 0xfffX */
3429 if (cmd == P_INITIAL_META)
3430 return "InitialMeta";
3431 if (cmd == P_INITIAL_DATA)
3432 return "InitialData";
3433 if (cmd == P_CONNECTION_FEATURES)
3434 return "ConnectionFeatures";
3435 if (cmd >= ARRAY_SIZE(cmdnames))
3436 return "Unknown";
3437 return cmdnames[cmd];
3438 }
3439
3440 /**
3441 * drbd_wait_misc - wait for a request to make progress
3442 * @mdev: device associated with the request
3443 * @i: the struct drbd_interval embedded in struct drbd_request or
3444 * struct drbd_peer_request
3445 */
3446 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3447 {
3448 struct net_conf *nc;
3449 DEFINE_WAIT(wait);
3450 long timeout;
3451
3452 rcu_read_lock();
3453 nc = rcu_dereference(mdev->tconn->net_conf);
3454 if (!nc) {
3455 rcu_read_unlock();
3456 return -ETIMEDOUT;
3457 }
3458 timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
3459 rcu_read_unlock();
3460
3461 /* Indicate to wake up mdev->misc_wait on progress. */
3462 i->waiting = true;
3463 prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3464 spin_unlock_irq(&mdev->tconn->req_lock);
3465 timeout = schedule_timeout(timeout);
3466 finish_wait(&mdev->misc_wait, &wait);
3467 spin_lock_irq(&mdev->tconn->req_lock);
3468 if (!timeout || mdev->state.conn < C_CONNECTED)
3469 return -ETIMEDOUT;
3470 if (signal_pending(current))
3471 return -ERESTARTSYS;
3472 return 0;
3473 }
3474
3475 #ifdef CONFIG_DRBD_FAULT_INJECTION
3476 /* Fault insertion support including random number generator shamelessly
3477 * stolen from kernel/rcutorture.c */
3478 struct fault_random_state {
3479 unsigned long state;
3480 unsigned long count;
3481 };
3482
3483 #define FAULT_RANDOM_MULT 39916801 /* prime */
3484 #define FAULT_RANDOM_ADD 479001701 /* prime */
3485 #define FAULT_RANDOM_REFRESH 10000
3486
3487 /*
3488 * Crude but fast random-number generator. Uses a linear congruential
3489 * generator, with occasional help from get_random_bytes().
3490 */
3491 static unsigned long
3492 _drbd_fault_random(struct fault_random_state *rsp)
3493 {
3494 long refresh;
3495
3496 if (!rsp->count--) {
3497 get_random_bytes(&refresh, sizeof(refresh));
3498 rsp->state += refresh;
3499 rsp->count = FAULT_RANDOM_REFRESH;
3500 }
3501 rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3502 return swahw32(rsp->state);
3503 }
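/*
 * The generator above is the classic linear congruential recurrence
 *
 *	state[n+1] = state[n] * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD
 *
 * (taken modulo 2^BITS_PER_LONG via unsigned overflow), reseeded from
 * get_random_bytes() every FAULT_RANDOM_REFRESH draws and word-swapped by
 * swahw32() before use.
 */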
3504
3505 static char *
3506 _drbd_fault_str(unsigned int type) {
3507 static char *_faults[] = {
3508 [DRBD_FAULT_MD_WR] = "Meta-data write",
3509 [DRBD_FAULT_MD_RD] = "Meta-data read",
3510 [DRBD_FAULT_RS_WR] = "Resync write",
3511 [DRBD_FAULT_RS_RD] = "Resync read",
3512 [DRBD_FAULT_DT_WR] = "Data write",
3513 [DRBD_FAULT_DT_RD] = "Data read",
3514 [DRBD_FAULT_DT_RA] = "Data read ahead",
3515 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3516 [DRBD_FAULT_AL_EE] = "EE allocation",
3517 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3518 };
3519
3520 return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3521 }
3522
3523 unsigned int
3524 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3525 {
3526 static struct fault_random_state rrs = {0, 0};
3527
3528 unsigned int ret = (
3529 (fault_devs == 0 ||
3530 ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3531 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3532
3533 if (ret) {
3534 fault_count++;
3535
3536 if (__ratelimit(&drbd_ratelimit_state))
3537 dev_warn(DEV, "***Simulating %s failure\n",
3538 _drbd_fault_str(type));
3539 }
3540
3541 return ret;
3542 }
3543 #endif
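/*
 * Illustration (hypothetical module parameters): with fault injection
 * compiled in, fault_rate=10 and fault_devs=0x2 make _drbd_insert_fault()
 * trigger on roughly 10% of the selected operations, but only for the
 * device with minor number 1 (bit 1 of the fault_devs mask);
 * fault_devs=0 applies the rate to all minors.
 */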
3544
3545 const char *drbd_buildtag(void)
3546 {
3547 /* A DRBD module built from external sources carries a reference to the
3548 git hash of the source code here. */
3549
3550 static char buildtag[38] = "\0uilt-in";
3551
3552 if (buildtag[0] == 0) {
3553 #ifdef CONFIG_MODULES
3554 if (THIS_MODULE != NULL)
3555 sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3556 else
3557 #endif
3558 buildtag[0] = 'b';
3559 }
3560
3561 return buildtag;
3562 }
3563
3564 module_init(drbd_init)
3565 module_exit(drbd_cleanup)
3566
3567 EXPORT_SYMBOL(drbd_conn_str);
3568 EXPORT_SYMBOL(drbd_role_str);
3569 EXPORT_SYMBOL(drbd_disk_str);
3570 EXPORT_SYMBOL(drbd_set_st_err_str);