drbd: Rename integrity_w_tfm -> integrity_tfm
[deliverable/linux.git] / drivers/block/drbd/drbd_main.c
1 /*
2 drbd.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11 from Logicworks, Inc. for making SDP replication support possible.
12
13 drbd is free software; you can redistribute it and/or modify
14 it under the terms of the GNU General Public License as published by
15 the Free Software Foundation; either version 2, or (at your option)
16 any later version.
17
18 drbd is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 GNU General Public License for more details.
22
23 You should have received a copy of the GNU General Public License
24 along with drbd; see the file COPYING. If not, write to
25 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27 */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73 "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78 __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not-module),
85 * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
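/* Illustrative only (editor's note, not in the original source): when DRBD is
 * built into the kernel, the module parameters above become boot parameters,
 * e.g.
 *     drbd.minor_count=16 drbd.disable_sendpage=1
 * on the kernel command line; built as a module, the same settings are passed
 * as
 *     modprobe drbd minor_count=16 disable_sendpage=1
 */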
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in proc drbd */
111
112 /* Module parameter for setting the user mode helper program
113 * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119 * as member "struct gendisk *vdisk;"
120 */
121 struct idr minors;
122 struct list_head drbd_tconns; /* list of struct drbd_tconn */
123 DECLARE_RWSEM(drbd_cfg_rwsem);
124
125 struct kmem_cache *drbd_request_cache;
126 struct kmem_cache *drbd_ee_cache; /* peer requests */
127 struct kmem_cache *drbd_bm_ext_cache; /* bitmap extents */
128 struct kmem_cache *drbd_al_ext_cache; /* activity log extents */
129 mempool_t *drbd_request_mempool;
130 mempool_t *drbd_ee_mempool;
131 mempool_t *drbd_md_io_page_pool;
132 struct bio_set *drbd_md_io_bio_set;
133
134 /* I do not use a standard mempool, because:
135 1) I want to hand out the pre-allocated objects first.
136 2) I want to be able to interrupt sleeping allocation with a signal.
137    Note: This is a singly linked list, the next pointer is the private
138 member of struct page.
139 */
140 struct page *drbd_pp_pool;
141 spinlock_t drbd_pp_lock;
142 int drbd_pp_vacant;
143 wait_queue_head_t drbd_pp_wait;
144
145 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
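/* i.e. at most 5 rate-limited messages per 5 second window (interval 5*HZ, burst 5) -- editor's note */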
146
147 static const struct block_device_operations drbd_ops = {
148 .owner = THIS_MODULE,
149 .open = drbd_open,
150 .release = drbd_release,
151 };
152
153 static void bio_destructor_drbd(struct bio *bio)
154 {
155 bio_free(bio, drbd_md_io_bio_set);
156 }
157
158 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
159 {
160 struct bio *bio;
161
162 if (!drbd_md_io_bio_set)
163 return bio_alloc(gfp_mask, 1);
164
165 bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
166 if (!bio)
167 return NULL;
168 bio->bi_destructor = bio_destructor_drbd;
169 return bio;
170 }
171
172 #ifdef __CHECKER__
173 /* When checking with sparse, and this is an inline function, sparse will
174    give tons of false positives. When this is a real function, sparse works.
175 */
176 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
177 {
178 int io_allowed;
179
180 atomic_inc(&mdev->local_cnt);
181 io_allowed = (mdev->state.disk >= mins);
182 if (!io_allowed) {
183 if (atomic_dec_and_test(&mdev->local_cnt))
184 wake_up(&mdev->misc_wait);
185 }
186 return io_allowed;
187 }
188
189 #endif
190
191 /**
192 * DOC: The transfer log
193 *
194  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
195 * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
196 * of the list. There is always at least one &struct drbd_tl_epoch object.
197 *
198 * Each &struct drbd_tl_epoch has a circular double linked list of requests
199 * attached.
200 */
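/*
 * Illustrative sketch (editor's note, not part of the original source): the
 * epoch list is walked from tconn->oldest_tle via ->next, e.g.
 *
 *	struct drbd_tl_epoch *b;
 *	for (b = tconn->oldest_tle; b != NULL; b = b->next)
 *		;	// b->requests holds the requests of this epoch
 *
 * which is the traversal pattern used by _tl_restart() below.
 */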
201 static int tl_init(struct drbd_tconn *tconn)
202 {
203 struct drbd_tl_epoch *b;
204
205 /* during device minor initialization, we may well use GFP_KERNEL */
206 b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
207 if (!b)
208 return 0;
209 INIT_LIST_HEAD(&b->requests);
210 INIT_LIST_HEAD(&b->w.list);
211 b->next = NULL;
212 b->br_number = 4711;
213 b->n_writes = 0;
214 b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
215
216 tconn->oldest_tle = b;
217 tconn->newest_tle = b;
218 INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
219
220 return 1;
221 }
222
223 static void tl_cleanup(struct drbd_tconn *tconn)
224 {
225 if (tconn->oldest_tle != tconn->newest_tle)
226 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227 if (!list_empty(&tconn->out_of_sequence_requests))
228 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229 kfree(tconn->oldest_tle);
230 tconn->oldest_tle = NULL;
231 kfree(tconn->unused_spare_tle);
232 tconn->unused_spare_tle = NULL;
233 }
234
235 /**
236 * _tl_add_barrier() - Adds a barrier to the transfer log
237  * @tconn:	DRBD connection.
238 * @new: Barrier to be added before the current head of the TL.
239 *
240 * The caller must hold the req_lock.
241 */
242 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
243 {
244 struct drbd_tl_epoch *newest_before;
245
246 INIT_LIST_HEAD(&new->requests);
247 INIT_LIST_HEAD(&new->w.list);
248 new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
249 new->next = NULL;
250 new->n_writes = 0;
251
252 newest_before = tconn->newest_tle;
253 /* never send a barrier number == 0, because that is special-cased
254 * when using TCQ for our write ordering code */
255 new->br_number = (newest_before->br_number+1) ?: 1;
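	/* Worked example (editor's note): if br_number was 0xffffffff, the u32
	 * sum wraps to 0 and the GNU "?:" extension picks 1 instead, so a
	 * barrier number of 0 never goes out on the wire. */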
256 if (tconn->newest_tle != new) {
257 tconn->newest_tle->next = new;
258 tconn->newest_tle = new;
259 }
260 }
261
262 /**
263 * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
264  * @tconn:	DRBD connection.
265 * @barrier_nr: Expected identifier of the DRBD write barrier packet.
266 * @set_size: Expected number of requests before that barrier.
267 *
268 * In case the passed barrier_nr or set_size does not match the oldest
269  * &struct drbd_tl_epoch object, this function will cause a termination
270 * of the connection.
271 */
272 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
273 unsigned int set_size)
274 {
275 struct drbd_conf *mdev;
276 struct drbd_tl_epoch *b, *nob; /* next old barrier */
277 struct list_head *le, *tle;
278 struct drbd_request *r;
279
280 spin_lock_irq(&tconn->req_lock);
281
282 b = tconn->oldest_tle;
283
284 /* first some paranoia code */
285 if (b == NULL) {
286 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
287 barrier_nr);
288 goto bail;
289 }
290 if (b->br_number != barrier_nr) {
291 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
292 barrier_nr, b->br_number);
293 goto bail;
294 }
295 if (b->n_writes != set_size) {
296 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
297 barrier_nr, set_size, b->n_writes);
298 goto bail;
299 }
300
301 /* Clean up list of requests processed during current epoch */
302 list_for_each_safe(le, tle, &b->requests) {
303 r = list_entry(le, struct drbd_request, tl_requests);
304 _req_mod(r, BARRIER_ACKED);
305 }
306 /* There could be requests on the list waiting for completion
307 	   of the write to the local disk. To avoid corruption of the
308 	   slab's data structures we have to remove the list's head.
309
310 Also there could have been a barrier ack out of sequence, overtaking
311 the write acks - which would be a bug and violating write ordering.
312 To not deadlock in case we lose connection while such requests are
313 still pending, we need some way to find them for the
314 	   _req_mod(CONNECTION_LOST_WHILE_PENDING).
315
316 These have been list_move'd to the out_of_sequence_requests list in
317 _req_mod(, BARRIER_ACKED) above.
318 */
319 list_del_init(&b->requests);
320 mdev = b->w.mdev;
321
322 nob = b->next;
323 if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
324 _tl_add_barrier(tconn, b);
325 if (nob)
326 tconn->oldest_tle = nob;
327 		/* if nob == NULL, b was the only barrier, and becomes the new
328 		   barrier. Therefore tconn->oldest_tle already points to b */
329 } else {
330 D_ASSERT(nob != NULL);
331 tconn->oldest_tle = nob;
332 kfree(b);
333 }
334
335 spin_unlock_irq(&tconn->req_lock);
336 dec_ap_pending(mdev);
337
338 return;
339
340 bail:
341 spin_unlock_irq(&tconn->req_lock);
342 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
343 }
344
345
346 /**
347 * _tl_restart() - Walks the transfer log, and applies an action to all requests
348  * @tconn:	DRBD connection.
349 * @what: The action/event to perform with all request objects
350 *
351 * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
352 * RESTART_FROZEN_DISK_IO.
353 */
354 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
355 {
356 struct drbd_tl_epoch *b, *tmp, **pn;
357 struct list_head *le, *tle, carry_reads;
358 struct drbd_request *req;
359 int rv, n_writes, n_reads;
360
361 b = tconn->oldest_tle;
362 pn = &tconn->oldest_tle;
363 while (b) {
364 n_writes = 0;
365 n_reads = 0;
366 INIT_LIST_HEAD(&carry_reads);
367 list_for_each_safe(le, tle, &b->requests) {
368 req = list_entry(le, struct drbd_request, tl_requests);
369 rv = _req_mod(req, what);
370
371 n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
372 n_reads += (rv & MR_READ) >> MR_READ_SHIFT;
373 }
374 tmp = b->next;
375
376 if (n_writes) {
377 if (what == RESEND) {
378 b->n_writes = n_writes;
379 if (b->w.cb == NULL) {
380 b->w.cb = w_send_barrier;
381 inc_ap_pending(b->w.mdev);
382 set_bit(CREATE_BARRIER, &b->w.mdev->flags);
383 }
384
385 drbd_queue_work(&tconn->data.work, &b->w);
386 }
387 pn = &b->next;
388 } else {
389 if (n_reads)
390 list_add(&carry_reads, &b->requests);
391 /* there could still be requests on that ring list,
392 * in case local io is still pending */
393 list_del(&b->requests);
394
395 /* dec_ap_pending corresponding to queue_barrier.
396 * the newest barrier may not have been queued yet,
397 * in which case w.cb is still NULL. */
398 if (b->w.cb != NULL)
399 dec_ap_pending(b->w.mdev);
400
401 if (b == tconn->newest_tle) {
402 /* recycle, but reinit! */
403 if (tmp != NULL)
404 					conn_err(tconn, "ASSERT FAILED tmp == NULL\n");
405 INIT_LIST_HEAD(&b->requests);
406 list_splice(&carry_reads, &b->requests);
407 INIT_LIST_HEAD(&b->w.list);
408 b->w.cb = NULL;
409 b->br_number = net_random();
410 b->n_writes = 0;
411
412 *pn = b;
413 break;
414 }
415 *pn = tmp;
416 kfree(b);
417 }
418 b = tmp;
419 list_splice(&carry_reads, &b->requests);
420 }
421 }
422
423
424 /**
425 * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
426  * @tconn:	DRBD connection.
427 *
428 * This is called after the connection to the peer was lost. The storage covered
429  * by the requests on the transfer log gets marked as out of sync. Called from the
430 * receiver thread and the worker thread.
431 */
432 void tl_clear(struct drbd_tconn *tconn)
433 {
434 struct drbd_conf *mdev;
435 struct list_head *le, *tle;
436 struct drbd_request *r;
437 int vnr;
438
439 spin_lock_irq(&tconn->req_lock);
440
441 _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
442
443 /* we expect this list to be empty. */
444 if (!list_empty(&tconn->out_of_sequence_requests))
445 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
446
447 /* but just in case, clean it up anyways! */
448 list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
449 r = list_entry(le, struct drbd_request, tl_requests);
450 /* It would be nice to complete outside of spinlock.
451 * But this is easier for now. */
452 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
453 }
454
455 /* ensure bit indicating barrier is required is clear */
456 rcu_read_lock();
457 idr_for_each_entry(&tconn->volumes, mdev, vnr)
458 clear_bit(CREATE_BARRIER, &mdev->flags);
459 rcu_read_unlock();
460
461 spin_unlock_irq(&tconn->req_lock);
462 }
463
464 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
465 {
466 spin_lock_irq(&tconn->req_lock);
467 _tl_restart(tconn, what);
468 spin_unlock_irq(&tconn->req_lock);
469 }
470
471 static int drbd_thread_setup(void *arg)
472 {
473 struct drbd_thread *thi = (struct drbd_thread *) arg;
474 struct drbd_tconn *tconn = thi->tconn;
475 unsigned long flags;
476 int retval;
477
478 snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
479 thi->name[0], thi->tconn->name);
480
481 restart:
482 retval = thi->function(thi);
483
484 spin_lock_irqsave(&thi->t_lock, flags);
485
486 /* if the receiver has been "EXITING", the last thing it did
487 * was set the conn state to "StandAlone",
488 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
489 * and receiver thread will be "started".
490 * drbd_thread_start needs to set "RESTARTING" in that case.
491 * t_state check and assignment needs to be within the same spinlock,
492 * so either thread_start sees EXITING, and can remap to RESTARTING,
493 	 * or thread_start sees NONE, and can proceed as normal.
494 */
495
496 if (thi->t_state == RESTARTING) {
497 conn_info(tconn, "Restarting %s thread\n", thi->name);
498 thi->t_state = RUNNING;
499 spin_unlock_irqrestore(&thi->t_lock, flags);
500 goto restart;
501 }
502
503 thi->task = NULL;
504 thi->t_state = NONE;
505 smp_mb();
506 complete(&thi->stop);
507 spin_unlock_irqrestore(&thi->t_lock, flags);
508
509 conn_info(tconn, "Terminating %s\n", current->comm);
510
511 /* Release mod reference taken when thread was started */
512
513 kref_put(&tconn->kref, &conn_destroy);
514 module_put(THIS_MODULE);
515 return retval;
516 }
517
518 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
519 int (*func) (struct drbd_thread *), char *name)
520 {
521 spin_lock_init(&thi->t_lock);
522 thi->task = NULL;
523 thi->t_state = NONE;
524 thi->function = func;
525 thi->tconn = tconn;
526 strncpy(thi->name, name, ARRAY_SIZE(thi->name));
527 }
528
529 int drbd_thread_start(struct drbd_thread *thi)
530 {
531 struct drbd_tconn *tconn = thi->tconn;
532 struct task_struct *nt;
533 unsigned long flags;
534
535 /* is used from state engine doing drbd_thread_stop_nowait,
536 * while holding the req lock irqsave */
537 spin_lock_irqsave(&thi->t_lock, flags);
538
539 switch (thi->t_state) {
540 case NONE:
541 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
542 thi->name, current->comm, current->pid);
543
544 /* Get ref on module for thread - this is released when thread exits */
545 if (!try_module_get(THIS_MODULE)) {
546 conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
547 spin_unlock_irqrestore(&thi->t_lock, flags);
548 return false;
549 }
550
551 kref_get(&thi->tconn->kref);
552
553 init_completion(&thi->stop);
554 thi->reset_cpu_mask = 1;
555 thi->t_state = RUNNING;
556 spin_unlock_irqrestore(&thi->t_lock, flags);
557 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
558
559 nt = kthread_create(drbd_thread_setup, (void *) thi,
560 "drbd_%c_%s", thi->name[0], thi->tconn->name);
561
562 if (IS_ERR(nt)) {
563 conn_err(tconn, "Couldn't start thread\n");
564
565 kref_put(&tconn->kref, &conn_destroy);
566 module_put(THIS_MODULE);
567 return false;
568 }
569 spin_lock_irqsave(&thi->t_lock, flags);
570 thi->task = nt;
571 thi->t_state = RUNNING;
572 spin_unlock_irqrestore(&thi->t_lock, flags);
573 wake_up_process(nt);
574 break;
575 case EXITING:
576 thi->t_state = RESTARTING;
577 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
578 thi->name, current->comm, current->pid);
579 /* fall through */
580 case RUNNING:
581 case RESTARTING:
582 default:
583 spin_unlock_irqrestore(&thi->t_lock, flags);
584 break;
585 }
586
587 return true;
588 }
589
590
591 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
592 {
593 unsigned long flags;
594
595 enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
596
597 /* may be called from state engine, holding the req lock irqsave */
598 spin_lock_irqsave(&thi->t_lock, flags);
599
600 if (thi->t_state == NONE) {
601 spin_unlock_irqrestore(&thi->t_lock, flags);
602 if (restart)
603 drbd_thread_start(thi);
604 return;
605 }
606
607 if (thi->t_state != ns) {
608 if (thi->task == NULL) {
609 spin_unlock_irqrestore(&thi->t_lock, flags);
610 return;
611 }
612
613 thi->t_state = ns;
614 smp_mb();
615 init_completion(&thi->stop);
616 if (thi->task != current)
617 force_sig(DRBD_SIGKILL, thi->task);
618 }
619
620 spin_unlock_irqrestore(&thi->t_lock, flags);
621
622 if (wait)
623 wait_for_completion(&thi->stop);
624 }
625
626 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
627 {
628 struct drbd_thread *thi =
629 task == tconn->receiver.task ? &tconn->receiver :
630 task == tconn->asender.task ? &tconn->asender :
631 task == tconn->worker.task ? &tconn->worker : NULL;
632
633 return thi;
634 }
635
636 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
637 {
638 struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
639 return thi ? thi->name : task->comm;
640 }
641
642 int conn_lowest_minor(struct drbd_tconn *tconn)
643 {
644 struct drbd_conf *mdev;
645 int vnr = 0, m;
646
647 rcu_read_lock();
648 mdev = idr_get_next(&tconn->volumes, &vnr);
649 m = mdev ? mdev_to_minor(mdev) : -1;
650 rcu_read_unlock();
651
652 return m;
653 }
654
655 #ifdef CONFIG_SMP
656 /**
657 * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
658  * @tconn:	DRBD connection.
659 *
660 * Forces all threads of a device onto the same CPU. This is beneficial for
661  * DRBD's performance. May be overridden by the user's configuration.
662 */
663 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
664 {
665 int ord, cpu;
666
667 /* user override. */
668 if (cpumask_weight(tconn->cpu_mask))
669 return;
670
671 ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
672 for_each_online_cpu(cpu) {
673 if (ord-- == 0) {
674 cpumask_set_cpu(cpu, tconn->cpu_mask);
675 return;
676 }
677 }
678 /* should not be reached */
679 cpumask_setall(tconn->cpu_mask);
680 }
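/* Worked example (editor's note, not in the original): with 4 CPUs online, a
 * connection whose lowest minor is 5 gets ord = 5 % 4 = 1, so its cpu_mask is
 * set to the second online CPU; drbd_thread_current_set_cpu() below then
 * applies that mask to each of the connection's threads. */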
681
682 /**
683 * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
684 * @mdev: DRBD device.
685 * @thi: drbd_thread object
686 *
687 * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
688 * prematurely.
689 */
690 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
691 {
692 struct task_struct *p = current;
693
694 if (!thi->reset_cpu_mask)
695 return;
696 thi->reset_cpu_mask = 0;
697 set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
698 }
699 #endif
700
701 /**
702 * drbd_header_size - size of a packet header
703 *
704 * The header size is a multiple of 8, so any payload following the header is
705 * word aligned on 64-bit architectures. (The bitmap send and receive code
706 * relies on this.)
707 */
708 unsigned int drbd_header_size(struct drbd_tconn *tconn)
709 {
710 if (tconn->agreed_pro_version >= 100) {
711 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
712 return sizeof(struct p_header100);
713 } else {
714 BUILD_BUG_ON(sizeof(struct p_header80) !=
715 sizeof(struct p_header95));
716 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
717 return sizeof(struct p_header80);
718 }
719 }
720
721 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
722 {
723 h->magic = cpu_to_be32(DRBD_MAGIC);
724 h->command = cpu_to_be16(cmd);
725 h->length = cpu_to_be16(size);
726 return sizeof(struct p_header80);
727 }
728
729 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
730 {
731 h->magic = cpu_to_be16(DRBD_MAGIC_BIG);
732 h->command = cpu_to_be16(cmd);
733 h->length = cpu_to_be32(size);
734 return sizeof(struct p_header95);
735 }
736
737 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
738 int size, int vnr)
739 {
740 h->magic = cpu_to_be32(DRBD_MAGIC_100);
741 h->volume = cpu_to_be16(vnr);
742 h->command = cpu_to_be16(cmd);
743 h->length = cpu_to_be32(size);
744 h->pad = 0;
745 return sizeof(struct p_header100);
746 }
747
748 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
749 void *buffer, enum drbd_packet cmd, int size)
750 {
751 if (tconn->agreed_pro_version >= 100)
752 return prepare_header100(buffer, cmd, size, vnr);
753 else if (tconn->agreed_pro_version >= 95 &&
754 size > DRBD_MAX_SIZE_H80_PACKET)
755 return prepare_header95(buffer, cmd, size);
756 else
757 return prepare_header80(buffer, cmd, size);
758 }
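/*
 * Summary of the three on-wire header layouts built above (editor's note;
 * field widths follow the cpu_to_be*() conversions used here, the actual
 * struct definitions live in the protocol headers):
 *
 *   p_header80 : be32 magic, be16 command, be16 length
 *   p_header95 : be16 magic, be16 command, be32 length
 *   p_header100: be32 magic, be16 volume, be16 command, be32 length, pad
 *
 * The BUILD_BUG_ON()s in drbd_header_size() keep the sizes multiples of 8,
 * so the payload stays word aligned on 64-bit architectures.
 */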
759
760 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
761 {
762 mutex_lock(&sock->mutex);
763 if (!sock->socket) {
764 mutex_unlock(&sock->mutex);
765 return NULL;
766 }
767 return sock->sbuf + drbd_header_size(tconn);
768 }
769
770 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
771 {
772 return conn_prepare_command(mdev->tconn, sock);
773 }
774
775 static int __send_command(struct drbd_tconn *tconn, int vnr,
776 struct drbd_socket *sock, enum drbd_packet cmd,
777 unsigned int header_size, void *data,
778 unsigned int size)
779 {
780 int msg_flags;
781 int err;
782
783 /*
784 * Called with @data == NULL and the size of the data blocks in @size
785 * for commands that send data blocks. For those commands, omit the
786 * MSG_MORE flag: this will increase the likelihood that data blocks
787 * which are page aligned on the sender will end up page aligned on the
788 * receiver.
789 */
790 msg_flags = data ? MSG_MORE : 0;
791
792 header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
793 header_size + size);
794 err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
795 msg_flags);
796 if (data && !err)
797 err = drbd_send_all(tconn, sock->socket, data, size, 0);
798 return err;
799 }
800
801 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
802 enum drbd_packet cmd, unsigned int header_size,
803 void *data, unsigned int size)
804 {
805 int err;
806
807 err = __send_command(tconn, 0, sock, cmd, header_size, data, size);
808 mutex_unlock(&sock->mutex);
809 return err;
810 }
811
812 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
813 enum drbd_packet cmd, unsigned int header_size,
814 void *data, unsigned int size)
815 {
816 int err;
817
818 err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
819 data, size);
820 mutex_unlock(&sock->mutex);
821 return err;
822 }
823
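/*
 * Editor's note on the calling convention (derived from the code above):
 * conn_prepare_command()/drbd_prepare_command() take sock->mutex and return a
 * pointer just past where the packet header will be placed in sock->sbuf;
 * conn_send_command()/drbd_send_command() release the mutex after sending.
 * A typical caller therefore looks like
 *
 *	p = drbd_prepare_command(mdev, sock);
 *	if (!p)
 *		return -EIO;
 *	// ... fill in *p ...
 *	return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
 *
 * drbd_send_ping() below is a minimal real instance of this pattern.
 */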
824 int drbd_send_ping(struct drbd_tconn *tconn)
825 {
826 struct drbd_socket *sock;
827
828 sock = &tconn->meta;
829 if (!conn_prepare_command(tconn, sock))
830 return -EIO;
831 return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
832 }
833
834 int drbd_send_ping_ack(struct drbd_tconn *tconn)
835 {
836 struct drbd_socket *sock;
837
838 sock = &tconn->meta;
839 if (!conn_prepare_command(tconn, sock))
840 return -EIO;
841 return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
842 }
843
844 int drbd_send_sync_param(struct drbd_conf *mdev)
845 {
846 struct drbd_socket *sock;
847 struct p_rs_param_95 *p;
848 int size;
849 const int apv = mdev->tconn->agreed_pro_version;
850 enum drbd_packet cmd;
851 struct net_conf *nc;
852
853 sock = &mdev->tconn->data;
854 p = drbd_prepare_command(mdev, sock);
855 if (!p)
856 return -EIO;
857
858 rcu_read_lock();
859 nc = rcu_dereference(mdev->tconn->net_conf);
860
861 size = apv <= 87 ? sizeof(struct p_rs_param)
862 : apv == 88 ? sizeof(struct p_rs_param)
863 + strlen(nc->verify_alg) + 1
864 : apv <= 94 ? sizeof(struct p_rs_param_89)
865 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
866
867 cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
868
869 /* initialize verify_alg and csums_alg */
870 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
871
872 if (get_ldev(mdev)) {
873 p->rate = cpu_to_be32(mdev->ldev->dc.resync_rate);
874 p->c_plan_ahead = cpu_to_be32(mdev->ldev->dc.c_plan_ahead);
875 p->c_delay_target = cpu_to_be32(mdev->ldev->dc.c_delay_target);
876 p->c_fill_target = cpu_to_be32(mdev->ldev->dc.c_fill_target);
877 p->c_max_rate = cpu_to_be32(mdev->ldev->dc.c_max_rate);
878 put_ldev(mdev);
879 } else {
880 p->rate = cpu_to_be32(DRBD_RATE_DEF);
881 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
882 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
883 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
884 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
885 }
886
887 if (apv >= 88)
888 strcpy(p->verify_alg, nc->verify_alg);
889 if (apv >= 89)
890 strcpy(p->csums_alg, nc->csums_alg);
891 rcu_read_unlock();
892
893 return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
894 }
895
896 int drbd_send_protocol(struct drbd_tconn *tconn)
897 {
898 struct drbd_socket *sock;
899 struct p_protocol *p;
900 struct net_conf *nc;
901 int size, cf;
902
903 sock = &tconn->data;
904 p = conn_prepare_command(tconn, sock);
905 if (!p)
906 return -EIO;
907
908 rcu_read_lock();
909 nc = rcu_dereference(tconn->net_conf);
910
911 if (nc->dry_run && tconn->agreed_pro_version < 92) {
912 rcu_read_unlock();
913 mutex_unlock(&sock->mutex);
914 conn_err(tconn, "--dry-run is not supported by peer");
915 return -EOPNOTSUPP;
916 }
917
918 size = sizeof(*p);
919 if (tconn->agreed_pro_version >= 87)
920 size += strlen(nc->integrity_alg) + 1;
921
922 p->protocol = cpu_to_be32(nc->wire_protocol);
923 p->after_sb_0p = cpu_to_be32(nc->after_sb_0p);
924 p->after_sb_1p = cpu_to_be32(nc->after_sb_1p);
925 p->after_sb_2p = cpu_to_be32(nc->after_sb_2p);
926 p->two_primaries = cpu_to_be32(nc->two_primaries);
927 cf = 0;
928 if (nc->want_lose)
929 cf |= CF_WANT_LOSE;
930 if (nc->dry_run)
931 cf |= CF_DRY_RUN;
932 p->conn_flags = cpu_to_be32(cf);
933
934 if (tconn->agreed_pro_version >= 87)
935 strcpy(p->integrity_alg, nc->integrity_alg);
936 rcu_read_unlock();
937
938 return conn_send_command(tconn, sock, P_PROTOCOL, size, NULL, 0);
939 }
940
941 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
942 {
943 struct drbd_socket *sock;
944 struct p_uuids *p;
945 int i;
946
947 if (!get_ldev_if_state(mdev, D_NEGOTIATING))
948 return 0;
949
950 sock = &mdev->tconn->data;
951 p = drbd_prepare_command(mdev, sock);
952 if (!p) {
953 put_ldev(mdev);
954 return -EIO;
955 }
956 for (i = UI_CURRENT; i < UI_SIZE; i++)
957 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
958
959 mdev->comm_bm_set = drbd_bm_total_weight(mdev);
960 p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
961 rcu_read_lock();
962 uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->want_lose ? 1 : 0;
963 rcu_read_unlock();
964 uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
965 uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
966 p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
967
968 put_ldev(mdev);
969 return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
970 }
971
972 int drbd_send_uuids(struct drbd_conf *mdev)
973 {
974 return _drbd_send_uuids(mdev, 0);
975 }
976
977 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
978 {
979 return _drbd_send_uuids(mdev, 8);
980 }
981
982 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
983 {
984 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
985 u64 *uuid = mdev->ldev->md.uuid;
986 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
987 text,
988 (unsigned long long)uuid[UI_CURRENT],
989 (unsigned long long)uuid[UI_BITMAP],
990 (unsigned long long)uuid[UI_HISTORY_START],
991 (unsigned long long)uuid[UI_HISTORY_END]);
992 put_ldev(mdev);
993 } else {
994 dev_info(DEV, "%s effective data uuid: %016llX\n",
995 text,
996 (unsigned long long)mdev->ed_uuid);
997 }
998 }
999
1000 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1001 {
1002 struct drbd_socket *sock;
1003 struct p_rs_uuid *p;
1004 u64 uuid;
1005
1006 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1007
1008 uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
1009 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1010 drbd_print_uuids(mdev, "updated sync UUID");
1011 drbd_md_sync(mdev);
1012
1013 sock = &mdev->tconn->data;
1014 p = drbd_prepare_command(mdev, sock);
1015 if (p) {
1016 p->uuid = cpu_to_be64(uuid);
1017 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
1018 }
1019 }
1020
1021 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1022 {
1023 struct drbd_socket *sock;
1024 struct p_sizes *p;
1025 sector_t d_size, u_size;
1026 int q_order_type, max_bio_size;
1027
1028 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1029 D_ASSERT(mdev->ldev->backing_bdev);
1030 d_size = drbd_get_max_capacity(mdev->ldev);
1031 u_size = mdev->ldev->dc.disk_size;
1032 q_order_type = drbd_queue_order_type(mdev);
1033 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1034 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1035 put_ldev(mdev);
1036 } else {
1037 d_size = 0;
1038 u_size = 0;
1039 q_order_type = QUEUE_ORDERED_NONE;
1040 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1041 }
1042
1043 sock = &mdev->tconn->data;
1044 p = drbd_prepare_command(mdev, sock);
1045 if (!p)
1046 return -EIO;
1047 p->d_size = cpu_to_be64(d_size);
1048 p->u_size = cpu_to_be64(u_size);
1049 p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1050 p->max_bio_size = cpu_to_be32(max_bio_size);
1051 p->queue_order_type = cpu_to_be16(q_order_type);
1052 p->dds_flags = cpu_to_be16(flags);
1053 return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1054 }
1055
1056 /**
1057 * drbd_send_state() - Sends the drbd state to the peer
1058 * @mdev: DRBD device.
1059 */
1060 int drbd_send_state(struct drbd_conf *mdev)
1061 {
1062 struct drbd_socket *sock;
1063 struct p_state *p;
1064
1065 sock = &mdev->tconn->data;
1066 p = drbd_prepare_command(mdev, sock);
1067 if (!p)
1068 return -EIO;
1069 p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1070 return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1071 }
1072
1073 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1074 {
1075 struct drbd_socket *sock;
1076 struct p_req_state *p;
1077
1078 sock = &mdev->tconn->data;
1079 p = drbd_prepare_command(mdev, sock);
1080 if (!p)
1081 return -EIO;
1082 p->mask = cpu_to_be32(mask.i);
1083 p->val = cpu_to_be32(val.i);
1084 return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1085
1086 }
1087
1088 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1089 {
1090 enum drbd_packet cmd;
1091 struct drbd_socket *sock;
1092 struct p_req_state *p;
1093
1094 cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1095 sock = &tconn->data;
1096 p = conn_prepare_command(tconn, sock);
1097 if (!p)
1098 return -EIO;
1099 p->mask = cpu_to_be32(mask.i);
1100 p->val = cpu_to_be32(val.i);
1101 return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1102 }
1103
1104 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1105 {
1106 struct drbd_socket *sock;
1107 struct p_req_state_reply *p;
1108
1109 sock = &mdev->tconn->meta;
1110 p = drbd_prepare_command(mdev, sock);
1111 if (p) {
1112 p->retcode = cpu_to_be32(retcode);
1113 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1114 }
1115 }
1116
1117 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1118 {
1119 struct drbd_socket *sock;
1120 struct p_req_state_reply *p;
1121 enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1122
1123 sock = &tconn->meta;
1124 p = conn_prepare_command(tconn, sock);
1125 if (p) {
1126 p->retcode = cpu_to_be32(retcode);
1127 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1128 }
1129 }
1130
1131 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1132 {
1133 BUG_ON(code & ~0xf);
1134 p->encoding = (p->encoding & ~0xf) | code;
1135 }
1136
1137 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1138 {
1139 p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1140 }
1141
1142 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1143 {
1144 BUG_ON(n & ~0x7);
1145 p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1146 }
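/* Layout of p_compressed_bm->encoding as set by the helpers above (editor's
 * note): bits 0-3 = bitmap encoding code, bits 4-6 = number of pad bits in
 * the last byte, bit 7 = value of the first run (set/unset). */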
1147
1148 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1149 struct p_compressed_bm *p,
1150 unsigned int size,
1151 struct bm_xfer_ctx *c)
1152 {
1153 struct bitstream bs;
1154 unsigned long plain_bits;
1155 unsigned long tmp;
1156 unsigned long rl;
1157 unsigned len;
1158 unsigned toggle;
1159 int bits, use_rle;
1160
1161 /* may we use this feature? */
1162 rcu_read_lock();
1163 use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
1164 rcu_read_unlock();
1165 if (!use_rle || mdev->tconn->agreed_pro_version < 90)
1166 return 0;
1167
1168 if (c->bit_offset >= c->bm_bits)
1169 return 0; /* nothing to do. */
1170
1171 	/* use at most this many bytes */
1172 bitstream_init(&bs, p->code, size, 0);
1173 memset(p->code, 0, size);
1174 /* plain bits covered in this code string */
1175 plain_bits = 0;
1176
1177 /* p->encoding & 0x80 stores whether the first run length is set.
1178 * bit offset is implicit.
1179 * start with toggle == 2 to be able to tell the first iteration */
1180 toggle = 2;
1181
1182 	/* see how many plain bits we can stuff into one packet
1183 * using RLE and VLI. */
1184 do {
1185 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1186 : _drbd_bm_find_next(mdev, c->bit_offset);
1187 if (tmp == -1UL)
1188 tmp = c->bm_bits;
1189 rl = tmp - c->bit_offset;
1190
1191 if (toggle == 2) { /* first iteration */
1192 if (rl == 0) {
1193 /* the first checked bit was set,
1194 * store start value, */
1195 dcbp_set_start(p, 1);
1196 /* but skip encoding of zero run length */
1197 toggle = !toggle;
1198 continue;
1199 }
1200 dcbp_set_start(p, 0);
1201 }
1202
1203 /* paranoia: catch zero runlength.
1204 * can only happen if bitmap is modified while we scan it. */
1205 if (rl == 0) {
1206 dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1207 "t:%u bo:%lu\n", toggle, c->bit_offset);
1208 return -1;
1209 }
1210
1211 bits = vli_encode_bits(&bs, rl);
1212 if (bits == -ENOBUFS) /* buffer full */
1213 break;
1214 if (bits <= 0) {
1215 dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1216 return 0;
1217 }
1218
1219 toggle = !toggle;
1220 plain_bits += rl;
1221 c->bit_offset = tmp;
1222 } while (c->bit_offset < c->bm_bits);
1223
1224 len = bs.cur.b - p->code + !!bs.cur.bit;
1225
1226 if (plain_bits < (len << 3)) {
1227 /* incompressible with this method.
1228 * we need to rewind both word and bit position. */
1229 c->bit_offset -= plain_bits;
1230 bm_xfer_ctx_bit_to_word_offset(c);
1231 c->bit_offset = c->word_offset * BITS_PER_LONG;
1232 return 0;
1233 }
1234
1235 /* RLE + VLI was able to compress it just fine.
1236 * update c->word_offset. */
1237 bm_xfer_ctx_bit_to_word_offset(c);
1238
1239 /* store pad_bits */
1240 dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1241
1242 return len;
1243 }
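/* Worked example for the plain_bits < (len << 3) check above (editor's note):
 * if the VLI code string ended up 16 bytes long it must cover at least
 * 16 * 8 = 128 plain bitmap bits; otherwise the chunk is considered
 * incompressible, the position is rewound and 0 is returned so the caller
 * sends the plain bitmap words instead. */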
1244
1245 /**
1246 * send_bitmap_rle_or_plain
1247 *
1248 * Return 0 when done, 1 when another iteration is needed, and a negative error
1249 * code upon failure.
1250 */
1251 static int
1252 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1253 {
1254 struct drbd_socket *sock = &mdev->tconn->data;
1255 unsigned int header_size = drbd_header_size(mdev->tconn);
1256 struct p_compressed_bm *p = sock->sbuf + header_size;
1257 int len, err;
1258
1259 len = fill_bitmap_rle_bits(mdev, p,
1260 DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1261 if (len < 0)
1262 return -EIO;
1263
1264 if (len) {
1265 dcbp_set_code(p, RLE_VLI_Bits);
1266 err = __send_command(mdev->tconn, mdev->vnr, sock,
1267 P_COMPRESSED_BITMAP, sizeof(*p) + len,
1268 NULL, 0);
1269 c->packets[0]++;
1270 c->bytes[0] += header_size + sizeof(*p) + len;
1271
1272 if (c->bit_offset >= c->bm_bits)
1273 len = 0; /* DONE */
1274 } else {
1275 /* was not compressible.
1276 * send a buffer full of plain text bits instead. */
1277 unsigned int data_size;
1278 unsigned long num_words;
1279 unsigned long *p = sock->sbuf + header_size;
1280
1281 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1282 num_words = min_t(size_t, data_size / sizeof(*p),
1283 c->bm_words - c->word_offset);
1284 len = num_words * sizeof(*p);
1285 if (len)
1286 drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1287 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1288 c->word_offset += num_words;
1289 c->bit_offset = c->word_offset * BITS_PER_LONG;
1290
1291 c->packets[1]++;
1292 c->bytes[1] += header_size + len;
1293
1294 if (c->bit_offset > c->bm_bits)
1295 c->bit_offset = c->bm_bits;
1296 }
1297 if (!err) {
1298 if (len == 0) {
1299 INFO_bm_xfer_stats(mdev, "send", c);
1300 return 0;
1301 } else
1302 return 1;
1303 }
1304 return -EIO;
1305 }
1306
1307 /* See the comment at receive_bitmap() */
1308 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1309 {
1310 struct bm_xfer_ctx c;
1311 int err;
1312
1313 if (!expect(mdev->bitmap))
1314 return false;
1315
1316 if (get_ldev(mdev)) {
1317 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1318 dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1319 drbd_bm_set_all(mdev);
1320 if (drbd_bm_write(mdev)) {
1321 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1322 * but otherwise process as per normal - need to tell other
1323 * side that a full resync is required! */
1324 dev_err(DEV, "Failed to write bitmap to disk!\n");
1325 } else {
1326 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1327 drbd_md_sync(mdev);
1328 }
1329 }
1330 put_ldev(mdev);
1331 }
1332
1333 c = (struct bm_xfer_ctx) {
1334 .bm_bits = drbd_bm_bits(mdev),
1335 .bm_words = drbd_bm_words(mdev),
1336 };
1337
1338 do {
1339 err = send_bitmap_rle_or_plain(mdev, &c);
1340 } while (err > 0);
1341
1342 return err == 0;
1343 }
1344
1345 int drbd_send_bitmap(struct drbd_conf *mdev)
1346 {
1347 struct drbd_socket *sock = &mdev->tconn->data;
1348 int err = -1;
1349
1350 mutex_lock(&sock->mutex);
1351 if (sock->socket)
1352 err = !_drbd_send_bitmap(mdev);
1353 mutex_unlock(&sock->mutex);
1354 return err;
1355 }
1356
1357 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1358 {
1359 struct drbd_socket *sock;
1360 struct p_barrier_ack *p;
1361
1362 if (mdev->state.conn < C_CONNECTED)
1363 return;
1364
1365 sock = &mdev->tconn->meta;
1366 p = drbd_prepare_command(mdev, sock);
1367 if (!p)
1368 return;
1369 p->barrier = barrier_nr;
1370 p->set_size = cpu_to_be32(set_size);
1371 drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1372 }
1373
1374 /**
1375 * _drbd_send_ack() - Sends an ack packet
1376 * @mdev: DRBD device.
1377 * @cmd: Packet command code.
1378 * @sector: sector, needs to be in big endian byte order
1379 * @blksize: size in byte, needs to be in big endian byte order
1380 * @block_id: Id, big endian byte order
1381 */
1382 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1383 u64 sector, u32 blksize, u64 block_id)
1384 {
1385 struct drbd_socket *sock;
1386 struct p_block_ack *p;
1387
1388 if (mdev->state.conn < C_CONNECTED)
1389 return -EIO;
1390
1391 sock = &mdev->tconn->meta;
1392 p = drbd_prepare_command(mdev, sock);
1393 if (!p)
1394 return -EIO;
1395 p->sector = sector;
1396 p->block_id = block_id;
1397 p->blksize = blksize;
1398 p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1399 return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1400 }
1401
1402 /* dp->sector and dp->block_id already/still in network byte order,
1403 * data_size is payload size according to dp->head,
1404 * and may need to be corrected for digest size. */
1405 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1406 struct p_data *dp, int data_size)
1407 {
1408 data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1409 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1410 _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1411 dp->block_id);
1412 }
1413
1414 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1415 struct p_block_req *rp)
1416 {
1417 _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1418 }
1419
1420 /**
1421 * drbd_send_ack() - Sends an ack packet
1422 * @mdev: DRBD device
1423 * @cmd: packet command code
1424 * @peer_req: peer request
1425 */
1426 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1427 struct drbd_peer_request *peer_req)
1428 {
1429 return _drbd_send_ack(mdev, cmd,
1430 cpu_to_be64(peer_req->i.sector),
1431 cpu_to_be32(peer_req->i.size),
1432 peer_req->block_id);
1433 }
1434
1435 /* This function misuses the block_id field to signal if the blocks
1436  * are in sync or not. */
1437 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1438 sector_t sector, int blksize, u64 block_id)
1439 {
1440 return _drbd_send_ack(mdev, cmd,
1441 cpu_to_be64(sector),
1442 cpu_to_be32(blksize),
1443 cpu_to_be64(block_id));
1444 }
1445
1446 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1447 sector_t sector, int size, u64 block_id)
1448 {
1449 struct drbd_socket *sock;
1450 struct p_block_req *p;
1451
1452 sock = &mdev->tconn->data;
1453 p = drbd_prepare_command(mdev, sock);
1454 if (!p)
1455 return -EIO;
1456 p->sector = cpu_to_be64(sector);
1457 p->block_id = block_id;
1458 p->blksize = cpu_to_be32(size);
1459 return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1460 }
1461
1462 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1463 void *digest, int digest_size, enum drbd_packet cmd)
1464 {
1465 struct drbd_socket *sock;
1466 struct p_block_req *p;
1467
1468 /* FIXME: Put the digest into the preallocated socket buffer. */
1469
1470 sock = &mdev->tconn->data;
1471 p = drbd_prepare_command(mdev, sock);
1472 if (!p)
1473 return -EIO;
1474 p->sector = cpu_to_be64(sector);
1475 p->block_id = ID_SYNCER /* unused */;
1476 p->blksize = cpu_to_be32(size);
1477 return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1478 digest, digest_size);
1479 }
1480
1481 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1482 {
1483 struct drbd_socket *sock;
1484 struct p_block_req *p;
1485
1486 sock = &mdev->tconn->data;
1487 p = drbd_prepare_command(mdev, sock);
1488 if (!p)
1489 return -EIO;
1490 p->sector = cpu_to_be64(sector);
1491 p->block_id = ID_SYNCER /* unused */;
1492 p->blksize = cpu_to_be32(size);
1493 return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1494 }
1495
1496 /* called on sndtimeo
1497 * returns false if we should retry,
1498 * true if we think connection is dead
1499 */
1500 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1501 {
1502 int drop_it;
1503 /* long elapsed = (long)(jiffies - mdev->last_received); */
1504
1505 drop_it = tconn->meta.socket == sock
1506 || !tconn->asender.task
1507 || get_t_state(&tconn->asender) != RUNNING
1508 || tconn->cstate < C_WF_REPORT_PARAMS;
1509
1510 if (drop_it)
1511 return true;
1512
1513 drop_it = !--tconn->ko_count;
1514 if (!drop_it) {
1515 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1516 current->comm, current->pid, tconn->ko_count);
1517 request_ping(tconn);
1518 }
1519
1520 return drop_it; /* && (mdev->state == R_PRIMARY) */;
1521 }
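/* Editor's note: each send timeout (-EAGAIN) on the data socket ends up here
 * and decrements tconn->ko_count, which drbd_send() reloads from
 * net_conf->ko_count at the start of every send on that socket. While the
 * counter is still non-zero we log and request a ping; only when it reaches
 * zero do we give up on the connection. */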
1522
1523 static void drbd_update_congested(struct drbd_tconn *tconn)
1524 {
1525 struct sock *sk = tconn->data.socket->sk;
1526 if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1527 set_bit(NET_CONGESTED, &tconn->flags);
1528 }
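/* Worked example (editor's note): with a 1 MiB socket send buffer the
 * NET_CONGESTED flag is set once more than 4/5 of it (roughly 820 KiB) is
 * queued in the socket write queue but not yet sent. */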
1529
1530 /* The idea of sendpage seems to be to put some kind of reference
1531 * to the page into the skb, and to hand it over to the NIC. In
1532 * this process get_page() gets called.
1533 *
1534 * As soon as the page was really sent over the network put_page()
1535 * gets called by some part of the network layer. [ NIC driver? ]
1536 *
1537 * [ get_page() / put_page() increment/decrement the count. If count
1538 * reaches 0 the page will be freed. ]
1539 *
1540 * This works nicely with pages from FSs.
1541 * But this means that in protocol A we might signal IO completion too early!
1542 *
1543 * In order not to corrupt data during a resync we must make sure
1544  * that we do not reuse our own buffer pages (EEs) too early, therefore
1545 * we have the net_ee list.
1546 *
1547 * XFS seems to have problems, still, it submits pages with page_count == 0!
1548 * As a workaround, we disable sendpage on pages
1549 * with page_count == 0 or PageSlab.
1550 */
1551 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1552 int offset, size_t size, unsigned msg_flags)
1553 {
1554 struct socket *socket;
1555 void *addr;
1556 int err;
1557
1558 socket = mdev->tconn->data.socket;
1559 addr = kmap(page) + offset;
1560 err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1561 kunmap(page);
1562 if (!err)
1563 mdev->send_cnt += size >> 9;
1564 return err;
1565 }
1566
1567 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1568 int offset, size_t size, unsigned msg_flags)
1569 {
1570 struct socket *socket = mdev->tconn->data.socket;
1571 mm_segment_t oldfs = get_fs();
1572 int len = size;
1573 int err = -EIO;
1574
1575 /* e.g. XFS meta- & log-data is in slab pages, which have a
1576 * page_count of 0 and/or have PageSlab() set.
1577 * we cannot use send_page for those, as that does get_page();
1578 * put_page(); and would cause either a VM_BUG directly, or
1579 * __page_cache_release a page that would actually still be referenced
1580 * by someone, leading to some obscure delayed Oops somewhere else. */
1581 if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1582 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1583
1584 msg_flags |= MSG_NOSIGNAL;
1585 drbd_update_congested(mdev->tconn);
1586 set_fs(KERNEL_DS);
1587 do {
1588 int sent;
1589
1590 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1591 if (sent <= 0) {
1592 if (sent == -EAGAIN) {
1593 if (we_should_drop_the_connection(mdev->tconn, socket))
1594 break;
1595 continue;
1596 }
1597 dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1598 __func__, (int)size, len, sent);
1599 if (sent < 0)
1600 err = sent;
1601 break;
1602 }
1603 len -= sent;
1604 offset += sent;
1605 } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1606 set_fs(oldfs);
1607 clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1608
1609 if (len == 0) {
1610 err = 0;
1611 mdev->send_cnt += size >> 9;
1612 }
1613 return err;
1614 }
1615
1616 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1617 {
1618 struct bio_vec *bvec;
1619 int i;
1620 /* hint all but last page with MSG_MORE */
1621 __bio_for_each_segment(bvec, bio, i, 0) {
1622 int err;
1623
1624 err = _drbd_no_send_page(mdev, bvec->bv_page,
1625 bvec->bv_offset, bvec->bv_len,
1626 i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1627 if (err)
1628 return err;
1629 }
1630 return 0;
1631 }
1632
1633 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1634 {
1635 struct bio_vec *bvec;
1636 int i;
1637 /* hint all but last page with MSG_MORE */
1638 __bio_for_each_segment(bvec, bio, i, 0) {
1639 int err;
1640
1641 err = _drbd_send_page(mdev, bvec->bv_page,
1642 bvec->bv_offset, bvec->bv_len,
1643 i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1644 if (err)
1645 return err;
1646 }
1647 return 0;
1648 }
1649
1650 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1651 struct drbd_peer_request *peer_req)
1652 {
1653 struct page *page = peer_req->pages;
1654 unsigned len = peer_req->i.size;
1655 int err;
1656
1657 /* hint all but last page with MSG_MORE */
1658 page_chain_for_each(page) {
1659 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1660
1661 err = _drbd_send_page(mdev, page, 0, l,
1662 page_chain_next(page) ? MSG_MORE : 0);
1663 if (err)
1664 return err;
1665 len -= l;
1666 }
1667 return 0;
1668 }
1669
1670 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1671 {
1672 if (mdev->tconn->agreed_pro_version >= 95)
1673 return (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1674 (bi_rw & REQ_FUA ? DP_FUA : 0) |
1675 (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1676 (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1677 else
1678 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1679 }
1680
1681 /* Used to send write requests
1682 * R_PRIMARY -> Peer (P_DATA)
1683 */
1684 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1685 {
1686 struct drbd_socket *sock;
1687 struct p_data *p;
1688 unsigned int dp_flags = 0;
1689 int dgs;
1690 int err;
1691
1692 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1693 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1694
1695 sock = &mdev->tconn->data;
1696 p = drbd_prepare_command(mdev, sock);
1697 if (!p)
1698 return -EIO;
1699 p->sector = cpu_to_be64(req->i.sector);
1700 p->block_id = (unsigned long)req;
1701 p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1702 dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1703 if (mdev->state.conn >= C_SYNC_SOURCE &&
1704 mdev->state.conn <= C_PAUSED_SYNC_T)
1705 dp_flags |= DP_MAY_SET_IN_SYNC;
1706 if (mdev->tconn->agreed_pro_version >= 100) {
1707 if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1708 dp_flags |= DP_SEND_RECEIVE_ACK;
1709 if (req->rq_state & RQ_EXP_WRITE_ACK)
1710 dp_flags |= DP_SEND_WRITE_ACK;
1711 }
1712 p->dp_flags = cpu_to_be32(dp_flags);
1713 if (dgs)
1714 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
1715 err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1716 if (!err) {
1717 /* For protocol A, we have to memcpy the payload into
1718 * socket buffers, as we may complete right away
1719 * as soon as we handed it over to tcp, at which point the data
1720 * pages may become invalid.
1721 *
1722 * For data-integrity enabled, we copy it as well, so we can be
1723 * sure that even if the bio pages may still be modified, it
1724 * won't change the data on the wire, thus if the digest checks
1725 * out ok after sending on this side, but does not fit on the
1726 * receiving side, we sure have detected corruption elsewhere.
1727 */
1728 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
1729 err = _drbd_send_bio(mdev, req->master_bio);
1730 else
1731 err = _drbd_send_zc_bio(mdev, req->master_bio);
1732
1733 /* double check digest, sometimes buffers have been modified in flight. */
1734 if (dgs > 0 && dgs <= 64) {
1735 /* 64 byte, 512 bit, is the largest digest size
1736 * currently supported in kernel crypto. */
1737 unsigned char digest[64];
1738 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
1739 if (memcmp(p + 1, digest, dgs)) {
1740 dev_warn(DEV,
1741 "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1742 (unsigned long long)req->i.sector, req->i.size);
1743 }
1744 } /* else if (dgs > 64) {
1745 ... Be noisy about digest too large ...
1746 } */
1747 }
1748 mutex_unlock(&sock->mutex); /* locked by drbd_prepare_command() */
1749
1750 return err;
1751 }
1752
1753 /* answer packet, used to send data back for read requests:
1754 * Peer -> (diskless) R_PRIMARY (P_DATA_REPLY)
1755 * C_SYNC_SOURCE -> C_SYNC_TARGET (P_RS_DATA_REPLY)
1756 */
1757 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1758 struct drbd_peer_request *peer_req)
1759 {
1760 struct drbd_socket *sock;
1761 struct p_data *p;
1762 int err;
1763 int dgs;
1764
1765 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1766 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1767
1768 sock = &mdev->tconn->data;
1769 p = drbd_prepare_command(mdev, sock);
1770 if (!p)
1771 return -EIO;
1772 p->sector = cpu_to_be64(peer_req->i.sector);
1773 p->block_id = peer_req->block_id;
1774 p->seq_num = 0; /* unused */
1775 if (dgs)
1776 drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
1777 err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1778 if (!err)
1779 err = _drbd_send_zc_ee(mdev, peer_req);
1780 mutex_unlock(&sock->mutex); /* locked by drbd_prepare_command() */
1781
1782 return err;
1783 }
1784
1785 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1786 {
1787 struct drbd_socket *sock;
1788 struct p_block_desc *p;
1789
1790 sock = &mdev->tconn->data;
1791 p = drbd_prepare_command(mdev, sock);
1792 if (!p)
1793 return -EIO;
1794 p->sector = cpu_to_be64(req->i.sector);
1795 p->blksize = cpu_to_be32(req->i.size);
1796 return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1797 }
1798
1799 /*
1800 drbd_send distinguishes two cases:
1801
1802 Packets sent via the data socket "sock"
1803 and packets sent via the meta data socket "msock"
1804
1805 sock msock
1806 -----------------+-------------------------+------------------------------
1807 timeout conf.timeout / 2 conf.timeout / 2
1808 timeout action send a ping via msock Abort communication
1809 and close all sockets
1810 */
1811
1812 /*
1813 * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1814 */
1815 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1816 void *buf, size_t size, unsigned msg_flags)
1817 {
1818 struct kvec iov;
1819 struct msghdr msg;
1820 int rv, sent = 0;
1821
1822 if (!sock)
1823 return -EBADR;
1824
1825 /* THINK if (signal_pending) return ... ? */
1826
1827 iov.iov_base = buf;
1828 iov.iov_len = size;
1829
1830 msg.msg_name = NULL;
1831 msg.msg_namelen = 0;
1832 msg.msg_control = NULL;
1833 msg.msg_controllen = 0;
1834 msg.msg_flags = msg_flags | MSG_NOSIGNAL;
1835
1836 if (sock == tconn->data.socket) {
1837 rcu_read_lock();
1838 tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
1839 rcu_read_unlock();
1840 drbd_update_congested(tconn);
1841 }
1842 do {
1843 /* STRANGE
1844 * tcp_sendmsg does _not_ use its size parameter at all ?
1845 *
1846 * -EAGAIN on timeout, -EINTR on signal.
1847 */
1848 /* THINK
1849 * do we need to block DRBD_SIG if sock == &meta.socket ??
1850 * otherwise wake_asender() might interrupt some send_*Ack !
1851 */
1852 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1853 if (rv == -EAGAIN) {
1854 if (we_should_drop_the_connection(tconn, sock))
1855 break;
1856 else
1857 continue;
1858 }
1859 if (rv == -EINTR) {
1860 flush_signals(current);
1861 rv = 0;
1862 }
1863 if (rv < 0)
1864 break;
1865 sent += rv;
1866 iov.iov_base += rv;
1867 iov.iov_len -= rv;
1868 } while (sent < size);
1869
1870 if (sock == tconn->data.socket)
1871 clear_bit(NET_CONGESTED, &tconn->flags);
1872
1873 if (rv <= 0) {
1874 if (rv != -EAGAIN) {
1875 conn_err(tconn, "%s_sendmsg returned %d\n",
1876 sock == tconn->meta.socket ? "msock" : "sock",
1877 rv);
1878 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1879 } else
1880 conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1881 }
1882
1883 return sent;
1884 }
1885
1886 /**
1887 * drbd_send_all - Send an entire buffer
1888 *
1889 * Returns 0 upon success and a negative error value otherwise.
1890 */
1891 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1892 size_t size, unsigned msg_flags)
1893 {
1894 int err;
1895
1896 err = drbd_send(tconn, sock, buffer, size, msg_flags);
1897 if (err < 0)
1898 return err;
1899 if (err != size)
1900 return -EIO;
1901 return 0;
1902 }
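/*
 * A hypothetical caller of drbd_send_all() (a sketch only; buf and size are
 * placeholders, and the real callers hold the socket mutex, e.g. via
 * drbd_prepare_command()):
 *
 *	mutex_lock(&tconn->data.mutex);
 *	err = drbd_send_all(tconn, tconn->data.socket, buf, size, 0);
 *	mutex_unlock(&tconn->data.mutex);
 *	if (err)
 *		goto reconnect;
 *
 * drbd_send() itself returns the number of bytes sent; drbd_send_all()
 * collapses a partial send into -EIO.
 */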
1903
1904 static int drbd_open(struct block_device *bdev, fmode_t mode)
1905 {
1906 struct drbd_conf *mdev = bdev->bd_disk->private_data;
1907 unsigned long flags;
1908 int rv = 0;
1909
1910 mutex_lock(&drbd_main_mutex);
1911 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1912 /* to have a stable mdev->state.role
1913 * and no race with updating open_cnt */
1914
1915 if (mdev->state.role != R_PRIMARY) {
1916 if (mode & FMODE_WRITE)
1917 rv = -EROFS;
1918 else if (!allow_oos)
1919 rv = -EMEDIUMTYPE;
1920 }
1921
1922 if (!rv)
1923 mdev->open_cnt++;
1924 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1925 mutex_unlock(&drbd_main_mutex);
1926
1927 return rv;
1928 }
1929
1930 static int drbd_release(struct gendisk *gd, fmode_t mode)
1931 {
1932 struct drbd_conf *mdev = gd->private_data;
1933 mutex_lock(&drbd_main_mutex);
1934 mdev->open_cnt--;
1935 mutex_unlock(&drbd_main_mutex);
1936 return 0;
1937 }
1938
1939 static void drbd_set_defaults(struct drbd_conf *mdev)
1940 {
1941 /* Beware! The actual layout differs
1942 * between big endian and little endian */
1943 mdev->state = (union drbd_dev_state) {
1944 { .role = R_SECONDARY,
1945 .peer = R_UNKNOWN,
1946 .conn = C_STANDALONE,
1947 .disk = D_DISKLESS,
1948 .pdsk = D_UNKNOWN,
1949 } };
1950 }
1951
1952 void drbd_init_set_defaults(struct drbd_conf *mdev)
1953 {
1954 /* the memset(,0,) did most of this.
1955 * note: only assignments, no allocation in here */
1956
1957 drbd_set_defaults(mdev);
1958
1959 atomic_set(&mdev->ap_bio_cnt, 0);
1960 atomic_set(&mdev->ap_pending_cnt, 0);
1961 atomic_set(&mdev->rs_pending_cnt, 0);
1962 atomic_set(&mdev->unacked_cnt, 0);
1963 atomic_set(&mdev->local_cnt, 0);
1964 atomic_set(&mdev->pp_in_use_by_net, 0);
1965 atomic_set(&mdev->rs_sect_in, 0);
1966 atomic_set(&mdev->rs_sect_ev, 0);
1967 atomic_set(&mdev->ap_in_flight, 0);
1968
1969 mutex_init(&mdev->md_io_mutex);
1970 mutex_init(&mdev->own_state_mutex);
1971 mdev->state_mutex = &mdev->own_state_mutex;
1972
1973 spin_lock_init(&mdev->al_lock);
1974 spin_lock_init(&mdev->peer_seq_lock);
1975 spin_lock_init(&mdev->epoch_lock);
1976
1977 INIT_LIST_HEAD(&mdev->active_ee);
1978 INIT_LIST_HEAD(&mdev->sync_ee);
1979 INIT_LIST_HEAD(&mdev->done_ee);
1980 INIT_LIST_HEAD(&mdev->read_ee);
1981 INIT_LIST_HEAD(&mdev->net_ee);
1982 INIT_LIST_HEAD(&mdev->resync_reads);
1983 INIT_LIST_HEAD(&mdev->resync_work.list);
1984 INIT_LIST_HEAD(&mdev->unplug_work.list);
1985 INIT_LIST_HEAD(&mdev->go_diskless.list);
1986 INIT_LIST_HEAD(&mdev->md_sync_work.list);
1987 INIT_LIST_HEAD(&mdev->start_resync_work.list);
1988 INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
1989
1990 mdev->resync_work.cb = w_resync_timer;
1991 mdev->unplug_work.cb = w_send_write_hint;
1992 mdev->go_diskless.cb = w_go_diskless;
1993 mdev->md_sync_work.cb = w_md_sync;
1994 mdev->bm_io_work.w.cb = w_bitmap_io;
1995 mdev->start_resync_work.cb = w_start_resync;
1996
1997 mdev->resync_work.mdev = mdev;
1998 mdev->unplug_work.mdev = mdev;
1999 mdev->go_diskless.mdev = mdev;
2000 mdev->md_sync_work.mdev = mdev;
2001 mdev->bm_io_work.w.mdev = mdev;
2002 mdev->start_resync_work.mdev = mdev;
2003
2004 init_timer(&mdev->resync_timer);
2005 init_timer(&mdev->md_sync_timer);
2006 init_timer(&mdev->start_resync_timer);
2007 init_timer(&mdev->request_timer);
2008 mdev->resync_timer.function = resync_timer_fn;
2009 mdev->resync_timer.data = (unsigned long) mdev;
2010 mdev->md_sync_timer.function = md_sync_timer_fn;
2011 mdev->md_sync_timer.data = (unsigned long) mdev;
2012 mdev->start_resync_timer.function = start_resync_timer_fn;
2013 mdev->start_resync_timer.data = (unsigned long) mdev;
2014 mdev->request_timer.function = request_timer_fn;
2015 mdev->request_timer.data = (unsigned long) mdev;
2016
2017 init_waitqueue_head(&mdev->misc_wait);
2018 init_waitqueue_head(&mdev->state_wait);
2019 init_waitqueue_head(&mdev->ee_wait);
2020 init_waitqueue_head(&mdev->al_wait);
2021 init_waitqueue_head(&mdev->seq_wait);
2022
2023 /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
2024 mdev->write_ordering = WO_bdev_flush;
2025 mdev->resync_wenr = LC_FREE;
2026 mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2027 mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2028 }
2029
2030 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2031 {
2032 int i;
2033 if (mdev->tconn->receiver.t_state != NONE)
2034 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2035 mdev->tconn->receiver.t_state);
2036
2037 /* no need to lock it, I'm the only thread alive */
2038 if (atomic_read(&mdev->current_epoch->epoch_size) != 0)
2039 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2040 mdev->al_writ_cnt =
2041 mdev->bm_writ_cnt =
2042 mdev->read_cnt =
2043 mdev->recv_cnt =
2044 mdev->send_cnt =
2045 mdev->writ_cnt =
2046 mdev->p_size =
2047 mdev->rs_start =
2048 mdev->rs_total =
2049 mdev->rs_failed = 0;
2050 mdev->rs_last_events = 0;
2051 mdev->rs_last_sect_ev = 0;
2052 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2053 mdev->rs_mark_left[i] = 0;
2054 mdev->rs_mark_time[i] = 0;
2055 }
2056 D_ASSERT(mdev->tconn->net_conf == NULL);
2057
2058 drbd_set_my_capacity(mdev, 0);
2059 if (mdev->bitmap) {
2060 /* maybe never allocated. */
2061 drbd_bm_resize(mdev, 0, 1);
2062 drbd_bm_cleanup(mdev);
2063 }
2064
2065 drbd_free_bc(mdev->ldev);
2066 mdev->ldev = NULL;
2067
2068 clear_bit(AL_SUSPENDED, &mdev->flags);
2069
2070 D_ASSERT(list_empty(&mdev->active_ee));
2071 D_ASSERT(list_empty(&mdev->sync_ee));
2072 D_ASSERT(list_empty(&mdev->done_ee));
2073 D_ASSERT(list_empty(&mdev->read_ee));
2074 D_ASSERT(list_empty(&mdev->net_ee));
2075 D_ASSERT(list_empty(&mdev->resync_reads));
2076 D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2077 D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
2078 D_ASSERT(list_empty(&mdev->resync_work.list));
2079 D_ASSERT(list_empty(&mdev->unplug_work.list));
2080 D_ASSERT(list_empty(&mdev->go_diskless.list));
2081
2082 drbd_set_defaults(mdev);
2083 }
2084
2085
2086 static void drbd_destroy_mempools(void)
2087 {
2088 struct page *page;
2089
2090 while (drbd_pp_pool) {
2091 page = drbd_pp_pool;
2092 drbd_pp_pool = (struct page *)page_private(page);
2093 __free_page(page);
2094 drbd_pp_vacant--;
2095 }
2096
2097 /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2098
2099 if (drbd_md_io_bio_set)
2100 bioset_free(drbd_md_io_bio_set);
2101 if (drbd_md_io_page_pool)
2102 mempool_destroy(drbd_md_io_page_pool);
2103 if (drbd_ee_mempool)
2104 mempool_destroy(drbd_ee_mempool);
2105 if (drbd_request_mempool)
2106 mempool_destroy(drbd_request_mempool);
2107 if (drbd_ee_cache)
2108 kmem_cache_destroy(drbd_ee_cache);
2109 if (drbd_request_cache)
2110 kmem_cache_destroy(drbd_request_cache);
2111 if (drbd_bm_ext_cache)
2112 kmem_cache_destroy(drbd_bm_ext_cache);
2113 if (drbd_al_ext_cache)
2114 kmem_cache_destroy(drbd_al_ext_cache);
2115
2116 drbd_md_io_bio_set = NULL;
2117 drbd_md_io_page_pool = NULL;
2118 drbd_ee_mempool = NULL;
2119 drbd_request_mempool = NULL;
2120 drbd_ee_cache = NULL;
2121 drbd_request_cache = NULL;
2122 drbd_bm_ext_cache = NULL;
2123 drbd_al_ext_cache = NULL;
2124
2125 return;
2126 }
2127
2128 static int drbd_create_mempools(void)
2129 {
2130 struct page *page;
2131 const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2132 int i;
2133
2134 /* prepare our caches and mempools */
2135 drbd_request_mempool = NULL;
2136 drbd_ee_cache = NULL;
2137 drbd_request_cache = NULL;
2138 drbd_bm_ext_cache = NULL;
2139 drbd_al_ext_cache = NULL;
2140 drbd_pp_pool = NULL;
2141 drbd_md_io_page_pool = NULL;
2142 drbd_md_io_bio_set = NULL;
2143
2144 /* caches */
2145 drbd_request_cache = kmem_cache_create(
2146 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2147 if (drbd_request_cache == NULL)
2148 goto Enomem;
2149
2150 drbd_ee_cache = kmem_cache_create(
2151 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2152 if (drbd_ee_cache == NULL)
2153 goto Enomem;
2154
2155 drbd_bm_ext_cache = kmem_cache_create(
2156 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2157 if (drbd_bm_ext_cache == NULL)
2158 goto Enomem;
2159
2160 drbd_al_ext_cache = kmem_cache_create(
2161 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2162 if (drbd_al_ext_cache == NULL)
2163 goto Enomem;
2164
2165 /* mempools */
2166 drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2167 if (drbd_md_io_bio_set == NULL)
2168 goto Enomem;
2169
2170 drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2171 if (drbd_md_io_page_pool == NULL)
2172 goto Enomem;
2173
2174 drbd_request_mempool = mempool_create(number,
2175 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2176 if (drbd_request_mempool == NULL)
2177 goto Enomem;
2178
2179 drbd_ee_mempool = mempool_create(number,
2180 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2181 if (drbd_ee_mempool == NULL)
2182 goto Enomem;
2183
2184 /* drbd's page pool */
2185 spin_lock_init(&drbd_pp_lock);
2186
2187 for (i = 0; i < number; i++) {
2188 page = alloc_page(GFP_HIGHUSER);
2189 if (!page)
2190 goto Enomem;
2191 set_page_private(page, (unsigned long)drbd_pp_pool);
2192 drbd_pp_pool = page;
2193 }
2194 drbd_pp_vacant = number;
2195
2196 return 0;
2197
2198 Enomem:
2199 drbd_destroy_mempools(); /* in case we allocated some */
2200 return -ENOMEM;
2201 }
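/*
 * drbd_pp_pool, filled above, is a simple LIFO of pages chained through
 * page_private(): the loop pushes each new page onto the head, and the
 * matching pop (see drbd_destroy_mempools() above) is
 *
 *	page = drbd_pp_pool;
 *	drbd_pp_pool = (struct page *)page_private(page);
 *
 * with drbd_pp_lock (initialized above) protecting the list on the
 * allocation paths.
 */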
2202
2203 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2204 void *unused)
2205 {
2206 /* just so we have it. you never know what interesting things we
2207 * might want to do here some day...
2208 */
2209
2210 return NOTIFY_DONE;
2211 }
2212
2213 static struct notifier_block drbd_notifier = {
2214 .notifier_call = drbd_notify_sys,
2215 };
2216
2217 static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
2218 {
2219 int rr;
2220
2221 rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
2222 if (rr)
2223 dev_err(DEV, "%d EEs in active list found!\n", rr);
2224
2225 rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
2226 if (rr)
2227 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2228
2229 rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
2230 if (rr)
2231 dev_err(DEV, "%d EEs in read list found!\n", rr);
2232
2233 rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
2234 if (rr)
2235 dev_err(DEV, "%d EEs in done list found!\n", rr);
2236
2237 rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
2238 if (rr)
2239 dev_err(DEV, "%d EEs in net list found!\n", rr);
2240 }
2241
2242 /* caution. no locking. */
2243 void drbd_delete_device(struct drbd_conf *mdev)
2244 {
2245 struct drbd_tconn *tconn = mdev->tconn;
2246
2247 idr_remove(&mdev->tconn->volumes, mdev->vnr);
2248 idr_remove(&minors, mdev_to_minor(mdev));
2249 synchronize_rcu();
2250
2251 /* paranoia asserts */
2252 D_ASSERT(mdev->open_cnt == 0);
2253 D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2254 /* end paranoia asserts */
2255
2256 del_gendisk(mdev->vdisk);
2257
2258 /* cleanup stuff that may have been allocated during
2259 * device (re-)configuration or state changes */
2260
2261 if (mdev->this_bdev)
2262 bdput(mdev->this_bdev);
2263
2264 drbd_free_bc(mdev->ldev);
2265 mdev->ldev = NULL;
2266
2267 drbd_release_all_peer_reqs(mdev);
2268
2269 lc_destroy(mdev->act_log);
2270 lc_destroy(mdev->resync);
2271
2272 kfree(mdev->p_uuid);
2273 /* mdev->p_uuid = NULL; */
2274
2275 kfree(mdev->current_epoch);
2276 if (mdev->bitmap) /* should no longer be there. */
2277 drbd_bm_cleanup(mdev);
2278 __free_page(mdev->md_io_page);
2279 put_disk(mdev->vdisk);
2280 blk_cleanup_queue(mdev->rq_queue);
2281 kfree(mdev);
2282
2283 kref_put(&tconn->kref, &conn_destroy);
2284 }
2285
2286 static void drbd_cleanup(void)
2287 {
2288 unsigned int i;
2289 struct drbd_conf *mdev;
2290
2291 unregister_reboot_notifier(&drbd_notifier);
2292
2293 /* first remove proc,
2294 	 * drbdsetup uses its presence to detect
2295 * whether DRBD is loaded.
2296 	 * If we got stuck in proc removal
2297 	 * while netlink is already deregistered,
2298 * some drbdsetup commands may wait forever
2299 * for an answer.
2300 */
2301 if (drbd_proc)
2302 remove_proc_entry("drbd", NULL);
2303
2304 drbd_genl_unregister();
2305
2306 down_write(&drbd_cfg_rwsem);
2307 idr_for_each_entry(&minors, mdev, i)
2308 drbd_delete_device(mdev);
2309 up_write(&drbd_cfg_rwsem);
2310
2311 drbd_destroy_mempools();
2312 unregister_blkdev(DRBD_MAJOR, "drbd");
2313
2314 idr_destroy(&minors);
2315
2316 printk(KERN_INFO "drbd: module cleanup done.\n");
2317 }
2318
2319 /**
2320 * drbd_congested() - Callback for pdflush
2321 * @congested_data: User data
2322 * @bdi_bits: Bits pdflush is currently interested in
2323 *
2324 * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2325 */
2326 static int drbd_congested(void *congested_data, int bdi_bits)
2327 {
2328 struct drbd_conf *mdev = congested_data;
2329 struct request_queue *q;
2330 char reason = '-';
2331 int r = 0;
2332
2333 if (!may_inc_ap_bio(mdev)) {
2334 /* DRBD has frozen IO */
2335 r = bdi_bits;
2336 reason = 'd';
2337 goto out;
2338 }
2339
2340 if (get_ldev(mdev)) {
2341 q = bdev_get_queue(mdev->ldev->backing_bdev);
2342 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2343 put_ldev(mdev);
2344 if (r)
2345 reason = 'b';
2346 }
2347
2348 if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2349 r |= (1 << BDI_async_congested);
2350 reason = reason == 'b' ? 'a' : 'n';
2351 }
2352
2353 out:
2354 mdev->congestion_reason = reason;
2355 return r;
2356 }
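/*
 * The character stored in congestion_reason above is purely diagnostic:
 * 'd' means DRBD itself has frozen IO, 'b' the backing device is congested,
 * 'n' the network send path is congested, 'a' both backing device and
 * network, and '-' not congested.
 */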
2357
2358 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2359 {
2360 sema_init(&wq->s, 0);
2361 spin_lock_init(&wq->q_lock);
2362 INIT_LIST_HEAD(&wq->q);
2363 }
2364
2365 struct drbd_tconn *conn_get_by_name(const char *name)
2366 {
2367 struct drbd_tconn *tconn;
2368
2369 if (!name || !name[0])
2370 return NULL;
2371
2372 down_read(&drbd_cfg_rwsem);
2373 list_for_each_entry(tconn, &drbd_tconns, all_tconn) {
2374 if (!strcmp(tconn->name, name)) {
2375 kref_get(&tconn->kref);
2376 goto found;
2377 }
2378 }
2379 tconn = NULL;
2380 found:
2381 up_read(&drbd_cfg_rwsem);
2382 return tconn;
2383 }
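/*
 * conn_get_by_name() returns the tconn with an extra kref held; a caller
 * is expected to drop it again, along the lines of
 *
 *	tconn = conn_get_by_name(name);
 *	if (tconn) {
 *		...
 *		kref_put(&tconn->kref, &conn_destroy);
 *	}
 *
 * (a sketch of the convention; conn_destroy() below is the release
 * function used throughout this file.)
 */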
2384
2385 static int drbd_alloc_socket(struct drbd_socket *socket)
2386 {
2387 socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2388 if (!socket->rbuf)
2389 return -ENOMEM;
2390 socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2391 if (!socket->sbuf)
2392 return -ENOMEM;
2393 return 0;
2394 }
2395
2396 static void drbd_free_socket(struct drbd_socket *socket)
2397 {
2398 free_page((unsigned long) socket->sbuf);
2399 free_page((unsigned long) socket->rbuf);
2400 }
2401
2402 void conn_free_crypto(struct drbd_tconn *tconn)
2403 {
2404 drbd_free_sock(tconn);
2405
2406 crypto_free_hash(tconn->csums_tfm);
2407 crypto_free_hash(tconn->verify_tfm);
2408 crypto_free_hash(tconn->cram_hmac_tfm);
2409 crypto_free_hash(tconn->integrity_tfm);
2410 crypto_free_hash(tconn->integrity_r_tfm);
2411 kfree(tconn->int_dig_in);
2412 kfree(tconn->int_dig_vv);
2413
2414 tconn->csums_tfm = NULL;
2415 tconn->verify_tfm = NULL;
2416 tconn->cram_hmac_tfm = NULL;
2417 tconn->integrity_tfm = NULL;
2418 tconn->integrity_r_tfm = NULL;
2419 tconn->int_dig_in = NULL;
2420 tconn->int_dig_vv = NULL;
2421 }
2422
2423 struct drbd_tconn *conn_create(const char *name)
2424 {
2425 struct drbd_tconn *tconn;
2426
2427 tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2428 if (!tconn)
2429 return NULL;
2430
2431 tconn->name = kstrdup(name, GFP_KERNEL);
2432 if (!tconn->name)
2433 goto fail;
2434
2435 if (drbd_alloc_socket(&tconn->data))
2436 goto fail;
2437 if (drbd_alloc_socket(&tconn->meta))
2438 goto fail;
2439
2440 if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2441 goto fail;
2442
2443 if (!tl_init(tconn))
2444 goto fail;
2445
2446 tconn->cstate = C_STANDALONE;
2447 mutex_init(&tconn->cstate_mutex);
2448 spin_lock_init(&tconn->req_lock);
2449 mutex_init(&tconn->net_conf_update);
2450 init_waitqueue_head(&tconn->ping_wait);
2451 idr_init(&tconn->volumes);
2452
2453 drbd_init_workqueue(&tconn->data.work);
2454 mutex_init(&tconn->data.mutex);
2455
2456 drbd_init_workqueue(&tconn->meta.work);
2457 mutex_init(&tconn->meta.mutex);
2458
2459 drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2460 drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2461 drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2462
2463 drbd_set_res_opts_default(&tconn->res_opts);
2464
2465 down_write(&drbd_cfg_rwsem);
2466 kref_init(&tconn->kref);
2467 list_add_tail(&tconn->all_tconn, &drbd_tconns);
2468 up_write(&drbd_cfg_rwsem);
2469
2470 return tconn;
2471
2472 fail:
2473 tl_cleanup(tconn);
2474 free_cpumask_var(tconn->cpu_mask);
2475 drbd_free_socket(&tconn->meta);
2476 drbd_free_socket(&tconn->data);
2477 kfree(tconn->name);
2478 kfree(tconn);
2479
2480 return NULL;
2481 }
2482
2483 void conn_destroy(struct kref *kref)
2484 {
2485 struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
2486
2487 idr_destroy(&tconn->volumes);
2488
2489 free_cpumask_var(tconn->cpu_mask);
2490 drbd_free_socket(&tconn->meta);
2491 drbd_free_socket(&tconn->data);
2492 kfree(tconn->name);
2493 kfree(tconn->int_dig_in);
2494 kfree(tconn->int_dig_vv);
2495 kfree(tconn);
2496 }
2497
2498 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2499 {
2500 struct drbd_conf *mdev;
2501 struct gendisk *disk;
2502 struct request_queue *q;
2503 int vnr_got = vnr;
2504 int minor_got = minor;
2505 enum drbd_ret_code err = ERR_NOMEM;
2506
2507 mdev = minor_to_mdev(minor);
2508 if (mdev)
2509 return ERR_MINOR_EXISTS;
2510
2511 /* GFP_KERNEL, we are outside of all write-out paths */
2512 mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2513 if (!mdev)
2514 return ERR_NOMEM;
2515
2516 kref_get(&tconn->kref);
2517 mdev->tconn = tconn;
2518
2519 mdev->minor = minor;
2520 mdev->vnr = vnr;
2521
2522 drbd_init_set_defaults(mdev);
2523
2524 q = blk_alloc_queue(GFP_KERNEL);
2525 if (!q)
2526 goto out_no_q;
2527 mdev->rq_queue = q;
2528 q->queuedata = mdev;
2529
2530 disk = alloc_disk(1);
2531 if (!disk)
2532 goto out_no_disk;
2533 mdev->vdisk = disk;
2534
2535 set_disk_ro(disk, true);
2536
2537 disk->queue = q;
2538 disk->major = DRBD_MAJOR;
2539 disk->first_minor = minor;
2540 disk->fops = &drbd_ops;
2541 sprintf(disk->disk_name, "drbd%d", minor);
2542 disk->private_data = mdev;
2543
2544 mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2545 /* we have no partitions. we contain only ourselves. */
2546 mdev->this_bdev->bd_contains = mdev->this_bdev;
2547
2548 q->backing_dev_info.congested_fn = drbd_congested;
2549 q->backing_dev_info.congested_data = mdev;
2550
2551 blk_queue_make_request(q, drbd_make_request);
2552 	/* Setting the max_hw_sectors to an odd value of 8 KiB here
2553 	   triggers a max_bio_size message upon first attach or connect */
2554 blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2555 blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2556 blk_queue_merge_bvec(q, drbd_merge_bvec);
2557 q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2558
2559 mdev->md_io_page = alloc_page(GFP_KERNEL);
2560 if (!mdev->md_io_page)
2561 goto out_no_io_page;
2562
2563 if (drbd_bm_init(mdev))
2564 goto out_no_bitmap;
2565 mdev->read_requests = RB_ROOT;
2566 mdev->write_requests = RB_ROOT;
2567
2568 mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2569 if (!mdev->current_epoch)
2570 goto out_no_epoch;
2571
2572 INIT_LIST_HEAD(&mdev->current_epoch->list);
2573 mdev->epochs = 1;
2574
2575 if (!idr_pre_get(&minors, GFP_KERNEL))
2576 goto out_no_minor_idr;
2577 if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2578 goto out_no_minor_idr;
2579 if (minor_got != minor) {
2580 err = ERR_MINOR_EXISTS;
2581 drbd_msg_put_info("requested minor exists already");
2582 goto out_idr_remove_minor;
2583 }
2584
2585 if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2586 goto out_idr_remove_minor;
2587 if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2588 goto out_idr_remove_minor;
2589 if (vnr_got != vnr) {
2590 err = ERR_INVALID_REQUEST;
2591 drbd_msg_put_info("requested volume exists already");
2592 goto out_idr_remove_vol;
2593 }
2594 add_disk(disk);
2595
2596 /* inherit the connection state */
2597 mdev->state.conn = tconn->cstate;
2598 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2599 drbd_connected(vnr, mdev, tconn);
2600
2601 return NO_ERROR;
2602
2603 out_idr_remove_vol:
2604 idr_remove(&tconn->volumes, vnr_got);
2605 out_idr_remove_minor:
2606 idr_remove(&minors, minor_got);
2607 synchronize_rcu();
2608 out_no_minor_idr:
2609 kfree(mdev->current_epoch);
2610 out_no_epoch:
2611 drbd_bm_cleanup(mdev);
2612 out_no_bitmap:
2613 __free_page(mdev->md_io_page);
2614 out_no_io_page:
2615 put_disk(disk);
2616 out_no_disk:
2617 blk_cleanup_queue(q);
2618 out_no_q:
2619 kfree(mdev);
2620 kref_put(&tconn->kref, &conn_destroy);
2621 return err;
2622 }
2623
2624 int __init drbd_init(void)
2625 {
2626 int err;
2627
2628 if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2629 printk(KERN_ERR
2630 "drbd: invalid minor_count (%d)\n", minor_count);
2631 #ifdef MODULE
2632 return -EINVAL;
2633 #else
2634 minor_count = 8;
2635 #endif
2636 }
2637
2638 err = register_blkdev(DRBD_MAJOR, "drbd");
2639 if (err) {
2640 printk(KERN_ERR
2641 "drbd: unable to register block device major %d\n",
2642 DRBD_MAJOR);
2643 return err;
2644 }
2645
2646 err = drbd_genl_register();
2647 if (err) {
2648 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2649 goto fail;
2650 }
2651
2652
2653 register_reboot_notifier(&drbd_notifier);
2654
2655 /*
2656 * allocate all necessary structs
2657 */
2658 err = -ENOMEM;
2659
2660 init_waitqueue_head(&drbd_pp_wait);
2661
2662 drbd_proc = NULL; /* play safe for drbd_cleanup */
2663 idr_init(&minors);
2664
2665 err = drbd_create_mempools();
2666 if (err)
2667 goto fail;
2668
2669 drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2670 if (!drbd_proc) {
2671 printk(KERN_ERR "drbd: unable to register proc file\n");
2672 goto fail;
2673 }
2674
2675 rwlock_init(&global_state_lock);
2676 INIT_LIST_HEAD(&drbd_tconns);
2677
2678 printk(KERN_INFO "drbd: initialized. "
2679 "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2680 API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2681 printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2682 printk(KERN_INFO "drbd: registered as block device major %d\n",
2683 DRBD_MAJOR);
2684
2685 return 0; /* Success! */
2686
2687 fail:
2688 drbd_cleanup();
2689 if (err == -ENOMEM)
2690 /* currently always the case */
2691 printk(KERN_ERR "drbd: ran out of memory\n");
2692 else
2693 printk(KERN_ERR "drbd: initialization failure\n");
2694 return err;
2695 }
2696
2697 void drbd_free_bc(struct drbd_backing_dev *ldev)
2698 {
2699 if (ldev == NULL)
2700 return;
2701
2702 blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2703 blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2704
2705 kfree(ldev);
2706 }
2707
2708 void drbd_free_sock(struct drbd_tconn *tconn)
2709 {
2710 if (tconn->data.socket) {
2711 mutex_lock(&tconn->data.mutex);
2712 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2713 sock_release(tconn->data.socket);
2714 tconn->data.socket = NULL;
2715 mutex_unlock(&tconn->data.mutex);
2716 }
2717 if (tconn->meta.socket) {
2718 mutex_lock(&tconn->meta.mutex);
2719 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2720 sock_release(tconn->meta.socket);
2721 tconn->meta.socket = NULL;
2722 mutex_unlock(&tconn->meta.mutex);
2723 }
2724 }
2725
2726 /* meta data management */
2727
2728 struct meta_data_on_disk {
2729 u64 la_size; /* last agreed size. */
2730 u64 uuid[UI_SIZE]; /* UUIDs. */
2731 u64 device_uuid;
2732 u64 reserved_u64_1;
2733 u32 flags; /* MDF */
2734 u32 magic;
2735 u32 md_size_sect;
2736 u32 al_offset; /* offset to this block */
2737 u32 al_nr_extents; /* important for restoring the AL */
2738 /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2739 u32 bm_offset; /* offset to the bitmap, from here */
2740 u32 bm_bytes_per_bit; /* BM_BLOCK_SIZE */
2741 u32 la_peer_max_bio_size; /* last peer max_bio_size */
2742 u32 reserved_u32[3];
2743
2744 } __packed;
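/*
 * All fields of struct meta_data_on_disk are stored big-endian (see the
 * cpu_to_be*() conversions in drbd_md_sync() below), and the structure is
 * written into a 512 byte buffer that is zeroed first, so the reserved
 * fields and padding hit the disk as zeros.
 */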
2745
2746 /**
2747 * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2748 * @mdev: DRBD device.
2749 */
2750 void drbd_md_sync(struct drbd_conf *mdev)
2751 {
2752 struct meta_data_on_disk *buffer;
2753 sector_t sector;
2754 int i;
2755
2756 del_timer(&mdev->md_sync_timer);
2757 /* timer may be rearmed by drbd_md_mark_dirty() now. */
2758 if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2759 return;
2760
2761 	/* We use D_FAILED here and not D_ATTACHING because we try to write
2762 * metadata even if we detach due to a disk failure! */
2763 if (!get_ldev_if_state(mdev, D_FAILED))
2764 return;
2765
2766 mutex_lock(&mdev->md_io_mutex);
2767 buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2768 memset(buffer, 0, 512);
2769
2770 buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2771 for (i = UI_CURRENT; i < UI_SIZE; i++)
2772 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2773 buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2774 buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2775
2776 buffer->md_size_sect = cpu_to_be32(mdev->ldev->md.md_size_sect);
2777 buffer->al_offset = cpu_to_be32(mdev->ldev->md.al_offset);
2778 buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2779 buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2780 buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2781
2782 buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2783 buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2784
2785 D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2786 sector = mdev->ldev->md.md_offset;
2787
2788 if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2789 /* this was a try anyways ... */
2790 dev_err(DEV, "meta data update failed!\n");
2791 drbd_chk_io_error(mdev, 1, true);
2792 }
2793
2794 /* Update mdev->ldev->md.la_size_sect,
2795 * since we updated it on metadata. */
2796 mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2797
2798 mutex_unlock(&mdev->md_io_mutex);
2799 put_ldev(mdev);
2800 }
2801
2802 /**
2803 * drbd_md_read() - Reads in the meta data super block
2804 * @mdev: DRBD device.
2805 * @bdev: Device from which the meta data should be read in.
2806 *
2807 * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2808 * something goes wrong. Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2809 */
2810 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2811 {
2812 struct meta_data_on_disk *buffer;
2813 int i, rv = NO_ERROR;
2814
2815 if (!get_ldev_if_state(mdev, D_ATTACHING))
2816 return ERR_IO_MD_DISK;
2817
2818 mutex_lock(&mdev->md_io_mutex);
2819 buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2820
2821 if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2822 /* NOTE: can't do normal error processing here as this is
2823 called BEFORE disk is attached */
2824 dev_err(DEV, "Error while reading metadata.\n");
2825 rv = ERR_IO_MD_DISK;
2826 goto err;
2827 }
2828
2829 if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2830 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2831 rv = ERR_MD_INVALID;
2832 goto err;
2833 }
2834 if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2835 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2836 be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2837 rv = ERR_MD_INVALID;
2838 goto err;
2839 }
2840 if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2841 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2842 be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2843 rv = ERR_MD_INVALID;
2844 goto err;
2845 }
2846 if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2847 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2848 be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2849 rv = ERR_MD_INVALID;
2850 goto err;
2851 }
2852
2853 if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2854 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2855 be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2856 rv = ERR_MD_INVALID;
2857 goto err;
2858 }
2859
2860 bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2861 for (i = UI_CURRENT; i < UI_SIZE; i++)
2862 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2863 bdev->md.flags = be32_to_cpu(buffer->flags);
2864 bdev->dc.al_extents = be32_to_cpu(buffer->al_nr_extents);
2865 bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2866
2867 spin_lock_irq(&mdev->tconn->req_lock);
2868 if (mdev->state.conn < C_CONNECTED) {
2869 int peer;
2870 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2871 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2872 mdev->peer_max_bio_size = peer;
2873 }
2874 spin_unlock_irq(&mdev->tconn->req_lock);
2875
2876 if (bdev->dc.al_extents < 7)
2877 bdev->dc.al_extents = 127;
2878
2879 err:
2880 mutex_unlock(&mdev->md_io_mutex);
2881 put_ldev(mdev);
2882
2883 return rv;
2884 }
2885
2886 /**
2887 * drbd_md_mark_dirty() - Mark meta data super block as dirty
2888 * @mdev: DRBD device.
2889 *
2890 * Call this function if you change anything that should be written to
2891  * the meta-data super block. This function sets MD_DIRTY, and starts a
2892  * timer that ensures drbd_md_sync() is called within five seconds.
2893 */
2894 #ifdef DEBUG
2895 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2896 {
2897 if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2898 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2899 mdev->last_md_mark_dirty.line = line;
2900 mdev->last_md_mark_dirty.func = func;
2901 }
2902 }
2903 #else
2904 void drbd_md_mark_dirty(struct drbd_conf *mdev)
2905 {
2906 if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
2907 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
2908 }
2909 #endif
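/*
 * The intended pattern: modify the in-core meta data, call
 * drbd_md_mark_dirty(), and let the timer do the rest. md_sync_timer_fn()
 * queues md_sync_work, whose callback w_md_sync() (further below) calls
 * drbd_md_sync() from the worker. _drbd_uuid_set() below is a typical
 * example of this pattern.
 */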
2910
2911 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
2912 {
2913 int i;
2914
2915 for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
2916 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
2917 }
2918
2919 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2920 {
2921 if (idx == UI_CURRENT) {
2922 if (mdev->state.role == R_PRIMARY)
2923 val |= 1;
2924 else
2925 val &= ~((u64)1);
2926
2927 drbd_set_ed_uuid(mdev, val);
2928 }
2929
2930 mdev->ldev->md.uuid[idx] = val;
2931 drbd_md_mark_dirty(mdev);
2932 }
2933
2934
2935 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2936 {
2937 if (mdev->ldev->md.uuid[idx]) {
2938 drbd_uuid_move_history(mdev);
2939 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
2940 }
2941 _drbd_uuid_set(mdev, idx, val);
2942 }
2943
2944 /**
2945 * drbd_uuid_new_current() - Creates a new current UUID
2946 * @mdev: DRBD device.
2947 *
2948 * Creates a new current UUID, and rotates the old current UUID into
2949 * the bitmap slot. Causes an incremental resync upon next connect.
2950 */
2951 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
2952 {
2953 u64 val;
2954 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2955
2956 if (bm_uuid)
2957 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2958
2959 mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
2960
2961 get_random_bytes(&val, sizeof(u64));
2962 _drbd_uuid_set(mdev, UI_CURRENT, val);
2963 drbd_print_uuids(mdev, "new current UUID");
2964 /* get it to stable storage _now_ */
2965 drbd_md_sync(mdev);
2966 }
2967
2968 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
2969 {
2970 if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
2971 return;
2972
2973 if (val == 0) {
2974 drbd_uuid_move_history(mdev);
2975 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2976 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2977 } else {
2978 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2979 if (bm_uuid)
2980 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2981
2982 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
2983 }
2984 drbd_md_mark_dirty(mdev);
2985 }
2986
2987 /**
2988 * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2989 * @mdev: DRBD device.
2990 *
2991 * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
2992 */
2993 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
2994 {
2995 int rv = -EIO;
2996
2997 if (get_ldev_if_state(mdev, D_ATTACHING)) {
2998 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
2999 drbd_md_sync(mdev);
3000 drbd_bm_set_all(mdev);
3001
3002 rv = drbd_bm_write(mdev);
3003
3004 if (!rv) {
3005 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3006 drbd_md_sync(mdev);
3007 }
3008
3009 put_ldev(mdev);
3010 }
3011
3012 return rv;
3013 }
3014
3015 /**
3016 * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3017 * @mdev: DRBD device.
3018 *
3019 * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3020 */
3021 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3022 {
3023 int rv = -EIO;
3024
3025 drbd_resume_al(mdev);
3026 if (get_ldev_if_state(mdev, D_ATTACHING)) {
3027 drbd_bm_clear_all(mdev);
3028 rv = drbd_bm_write(mdev);
3029 put_ldev(mdev);
3030 }
3031
3032 return rv;
3033 }
3034
3035 static int w_bitmap_io(struct drbd_work *w, int unused)
3036 {
3037 struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3038 struct drbd_conf *mdev = w->mdev;
3039 int rv = -EIO;
3040
3041 D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3042
3043 if (get_ldev(mdev)) {
3044 drbd_bm_lock(mdev, work->why, work->flags);
3045 rv = work->io_fn(mdev);
3046 drbd_bm_unlock(mdev);
3047 put_ldev(mdev);
3048 }
3049
3050 clear_bit_unlock(BITMAP_IO, &mdev->flags);
3051 wake_up(&mdev->misc_wait);
3052
3053 if (work->done)
3054 work->done(mdev, rv);
3055
3056 clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3057 work->why = NULL;
3058 work->flags = 0;
3059
3060 return 0;
3061 }
3062
3063 void drbd_ldev_destroy(struct drbd_conf *mdev)
3064 {
3065 lc_destroy(mdev->resync);
3066 mdev->resync = NULL;
3067 lc_destroy(mdev->act_log);
3068 mdev->act_log = NULL;
3069 __no_warn(local,
3070 drbd_free_bc(mdev->ldev);
3071 mdev->ldev = NULL;);
3072
3073 clear_bit(GO_DISKLESS, &mdev->flags);
3074 }
3075
3076 static int w_go_diskless(struct drbd_work *w, int unused)
3077 {
3078 struct drbd_conf *mdev = w->mdev;
3079
3080 D_ASSERT(mdev->state.disk == D_FAILED);
3081 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3082 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3083 * the protected members anymore, though, so once put_ldev reaches zero
3084 * again, it will be safe to free them. */
3085 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3086 return 0;
3087 }
3088
3089 void drbd_go_diskless(struct drbd_conf *mdev)
3090 {
3091 D_ASSERT(mdev->state.disk == D_FAILED);
3092 if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3093 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3094 }
3095
3096 /**
3097 * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3098 * @mdev: DRBD device.
3099 * @io_fn: IO callback to be called when bitmap IO is possible
3100 * @done: callback to be called after the bitmap IO was performed
3101 * @why: Descriptive text of the reason for doing the IO
3102 *
3103  * While IO on the bitmap happens we freeze application IO, thus ensuring
3104  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3105 * called from worker context. It MUST NOT be used while a previous such
3106 * work is still pending!
3107 */
3108 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3109 int (*io_fn)(struct drbd_conf *),
3110 void (*done)(struct drbd_conf *, int),
3111 char *why, enum bm_flag flags)
3112 {
3113 D_ASSERT(current == mdev->tconn->worker.task);
3114
3115 D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3116 D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3117 D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3118 if (mdev->bm_io_work.why)
3119 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3120 why, mdev->bm_io_work.why);
3121
3122 mdev->bm_io_work.io_fn = io_fn;
3123 mdev->bm_io_work.done = done;
3124 mdev->bm_io_work.why = why;
3125 mdev->bm_io_work.flags = flags;
3126
3127 spin_lock_irq(&mdev->tconn->req_lock);
3128 set_bit(BITMAP_IO, &mdev->flags);
3129 if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3130 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3131 drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3132 }
3133 spin_unlock_irq(&mdev->tconn->req_lock);
3134 }
3135
3136 /**
3137 * drbd_bitmap_io() - Does an IO operation on the whole bitmap
3138 * @mdev: DRBD device.
3139 * @io_fn: IO callback to be called when bitmap IO is possible
3140 * @why: Descriptive text of the reason for doing the IO
3141 *
3142  * Freezes application IO while the actual IO operation runs. This
3143  * function MAY NOT be called from worker context.
3144 */
3145 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3146 char *why, enum bm_flag flags)
3147 {
3148 int rv;
3149
3150 D_ASSERT(current != mdev->tconn->worker.task);
3151
3152 if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3153 drbd_suspend_io(mdev);
3154
3155 drbd_bm_lock(mdev, why, flags);
3156 rv = io_fn(mdev);
3157 drbd_bm_unlock(mdev);
3158
3159 if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3160 drbd_resume_io(mdev);
3161
3162 return rv;
3163 }
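/*
 * A hypothetical use of the two entry points above (a sketch, not an
 * actual call site in this file): synchronous callers outside the worker
 * combine drbd_bitmap_io() with one of the io_fn helpers, e.g.
 *
 *	drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *		       "set_n_write for full sync", BM_LOCKED_SET_ALLOWED);
 *
 * while the worker schedules the same helpers asynchronously through
 * drbd_queue_bitmap_io().
 */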
3164
3165 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3166 {
3167 if ((mdev->ldev->md.flags & flag) != flag) {
3168 drbd_md_mark_dirty(mdev);
3169 mdev->ldev->md.flags |= flag;
3170 }
3171 }
3172
3173 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3174 {
3175 if ((mdev->ldev->md.flags & flag) != 0) {
3176 drbd_md_mark_dirty(mdev);
3177 mdev->ldev->md.flags &= ~flag;
3178 }
3179 }
3180 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3181 {
3182 return (bdev->md.flags & flag) != 0;
3183 }
3184
3185 static void md_sync_timer_fn(unsigned long data)
3186 {
3187 struct drbd_conf *mdev = (struct drbd_conf *) data;
3188
3189 drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3190 }
3191
3192 static int w_md_sync(struct drbd_work *w, int unused)
3193 {
3194 struct drbd_conf *mdev = w->mdev;
3195
3196 dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3197 #ifdef DEBUG
3198 dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3199 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3200 #endif
3201 drbd_md_sync(mdev);
3202 return 0;
3203 }
3204
3205 const char *cmdname(enum drbd_packet cmd)
3206 {
3207 /* THINK may need to become several global tables
3208 * when we want to support more than
3209 * one PRO_VERSION */
3210 static const char *cmdnames[] = {
3211 [P_DATA] = "Data",
3212 [P_DATA_REPLY] = "DataReply",
3213 [P_RS_DATA_REPLY] = "RSDataReply",
3214 [P_BARRIER] = "Barrier",
3215 [P_BITMAP] = "ReportBitMap",
3216 [P_BECOME_SYNC_TARGET] = "BecomeSyncTarget",
3217 [P_BECOME_SYNC_SOURCE] = "BecomeSyncSource",
3218 [P_UNPLUG_REMOTE] = "UnplugRemote",
3219 [P_DATA_REQUEST] = "DataRequest",
3220 [P_RS_DATA_REQUEST] = "RSDataRequest",
3221 [P_SYNC_PARAM] = "SyncParam",
3222 [P_SYNC_PARAM89] = "SyncParam89",
3223 [P_PROTOCOL] = "ReportProtocol",
3224 [P_UUIDS] = "ReportUUIDs",
3225 [P_SIZES] = "ReportSizes",
3226 [P_STATE] = "ReportState",
3227 [P_SYNC_UUID] = "ReportSyncUUID",
3228 [P_AUTH_CHALLENGE] = "AuthChallenge",
3229 [P_AUTH_RESPONSE] = "AuthResponse",
3230 [P_PING] = "Ping",
3231 [P_PING_ACK] = "PingAck",
3232 [P_RECV_ACK] = "RecvAck",
3233 [P_WRITE_ACK] = "WriteAck",
3234 [P_RS_WRITE_ACK] = "RSWriteAck",
3235 [P_DISCARD_WRITE] = "DiscardWrite",
3236 [P_NEG_ACK] = "NegAck",
3237 [P_NEG_DREPLY] = "NegDReply",
3238 [P_NEG_RS_DREPLY] = "NegRSDReply",
3239 [P_BARRIER_ACK] = "BarrierAck",
3240 [P_STATE_CHG_REQ] = "StateChgRequest",
3241 [P_STATE_CHG_REPLY] = "StateChgReply",
3242 [P_OV_REQUEST] = "OVRequest",
3243 [P_OV_REPLY] = "OVReply",
3244 [P_OV_RESULT] = "OVResult",
3245 [P_CSUM_RS_REQUEST] = "CsumRSRequest",
3246 [P_RS_IS_IN_SYNC] = "CsumRSIsInSync",
3247 [P_COMPRESSED_BITMAP] = "CBitmap",
3248 [P_DELAY_PROBE] = "DelayProbe",
3249 [P_OUT_OF_SYNC] = "OutOfSync",
3250 [P_RETRY_WRITE] = "RetryWrite",
3251 [P_RS_CANCEL] = "RSCancel",
3252 [P_CONN_ST_CHG_REQ] = "conn_st_chg_req",
3253 [P_CONN_ST_CHG_REPLY] = "conn_st_chg_reply",
3254
3255 /* enum drbd_packet, but not commands - obsoleted flags:
3256 * P_MAY_IGNORE
3257 * P_MAX_OPT_CMD
3258 */
3259 };
3260
3261 /* too big for the array: 0xfffX */
3262 if (cmd == P_INITIAL_META)
3263 return "InitialMeta";
3264 if (cmd == P_INITIAL_DATA)
3265 return "InitialData";
3266 if (cmd == P_CONNECTION_FEATURES)
3267 return "ConnectionFeatures";
3268 if (cmd >= ARRAY_SIZE(cmdnames))
3269 return "Unknown";
3270 return cmdnames[cmd];
3271 }
3272
3273 /**
3274 * drbd_wait_misc - wait for a request to make progress
3275 * @mdev: device associated with the request
3276 * @i: the struct drbd_interval embedded in struct drbd_request or
3277 * struct drbd_peer_request
3278 */
3279 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3280 {
3281 struct net_conf *nc;
3282 DEFINE_WAIT(wait);
3283 long timeout;
3284
3285 rcu_read_lock();
3286 nc = rcu_dereference(mdev->tconn->net_conf);
3287 if (!nc) {
3288 rcu_read_unlock();
3289 return -ETIMEDOUT;
3290 }
3291 timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
3292 rcu_read_unlock();
3293
3294 	/* Indicate that mdev->misc_wait should be woken up on progress. */
3295 i->waiting = true;
3296 prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3297 spin_unlock_irq(&mdev->tconn->req_lock);
3298 timeout = schedule_timeout(timeout);
3299 finish_wait(&mdev->misc_wait, &wait);
3300 spin_lock_irq(&mdev->tconn->req_lock);
3301 if (!timeout || mdev->state.conn < C_CONNECTED)
3302 return -ETIMEDOUT;
3303 if (signal_pending(current))
3304 return -ERESTARTSYS;
3305 return 0;
3306 }
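/*
 * Note on drbd_wait_misc(): it is entered and left with req_lock held and
 * temporarily drops the lock around schedule_timeout(). The timeout budget
 * is nc->timeout (configured in units of 0.1 seconds, hence the * HZ / 10)
 * multiplied by ko_count, or unlimited if ko_count is 0.
 */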
3307
3308 #ifdef CONFIG_DRBD_FAULT_INJECTION
3309 /* Fault insertion support including random number generator shamelessly
3310 * stolen from kernel/rcutorture.c */
3311 struct fault_random_state {
3312 unsigned long state;
3313 unsigned long count;
3314 };
3315
3316 #define FAULT_RANDOM_MULT 39916801 /* prime */
3317 #define FAULT_RANDOM_ADD 479001701 /* prime */
3318 #define FAULT_RANDOM_REFRESH 10000
3319
3320 /*
3321 * Crude but fast random-number generator. Uses a linear congruential
3322 * generator, with occasional help from get_random_bytes().
3323 */
3324 static unsigned long
3325 _drbd_fault_random(struct fault_random_state *rsp)
3326 {
3327 long refresh;
3328
3329 if (!rsp->count--) {
3330 get_random_bytes(&refresh, sizeof(refresh));
3331 rsp->state += refresh;
3332 rsp->count = FAULT_RANDOM_REFRESH;
3333 }
3334 rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3335 return swahw32(rsp->state);
3336 }
3337
3338 static char *
3339 _drbd_fault_str(unsigned int type) {
3340 static char *_faults[] = {
3341 [DRBD_FAULT_MD_WR] = "Meta-data write",
3342 [DRBD_FAULT_MD_RD] = "Meta-data read",
3343 [DRBD_FAULT_RS_WR] = "Resync write",
3344 [DRBD_FAULT_RS_RD] = "Resync read",
3345 [DRBD_FAULT_DT_WR] = "Data write",
3346 [DRBD_FAULT_DT_RD] = "Data read",
3347 [DRBD_FAULT_DT_RA] = "Data read ahead",
3348 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3349 [DRBD_FAULT_AL_EE] = "EE allocation",
3350 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3351 };
3352
3353 return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3354 }
3355
3356 unsigned int
3357 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3358 {
3359 static struct fault_random_state rrs = {0, 0};
3360
3361 unsigned int ret = (
3362 (fault_devs == 0 ||
3363 ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3364 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3365
3366 if (ret) {
3367 fault_count++;
3368
3369 if (__ratelimit(&drbd_ratelimit_state))
3370 dev_warn(DEV, "***Simulating %s failure\n",
3371 _drbd_fault_str(type));
3372 }
3373
3374 return ret;
3375 }
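/*
 * For the module parameters used above: fault_devs is a bitmask of minor
 * numbers (bit (1 << minor) selects a device, 0 means all devices), and
 * fault_rate is a percentage, so on average fault_rate out of 100 eligible
 * requests are failed.
 */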
3376 #endif
3377
3378 const char *drbd_buildtag(void)
3379 {
3380 	/* DRBD built from external sources carries a reference to the
3381 	   git hash of the source code here. */
3382
3383 static char buildtag[38] = "\0uilt-in";
3384
3385 if (buildtag[0] == 0) {
3386 #ifdef CONFIG_MODULES
3387 if (THIS_MODULE != NULL)
3388 sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3389 else
3390 #endif
3391 buildtag[0] = 'b';
3392 }
3393
3394 return buildtag;
3395 }
3396
3397 module_init(drbd_init)
3398 module_exit(drbd_cleanup)
3399
3400 EXPORT_SYMBOL(drbd_conn_str);
3401 EXPORT_SYMBOL(drbd_role_str);
3402 EXPORT_SYMBOL(drbd_disk_str);
3403 EXPORT_SYMBOL(drbd_set_st_err_str);