drbd: implicitly create unconfigured devices on sync-after dependencies
[deliverable/linux.git] / drivers / block / drbd / drbd_worker.c
CommitLineData
b411b363
PR
1/*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
b411b363 26#include <linux/module.h>
b411b363
PR
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/smp_lock.h>
30#include <linux/wait.h>
31#include <linux/mm.h>
32#include <linux/memcontrol.h>
33#include <linux/mm_inline.h>
34#include <linux/slab.h>
35#include <linux/random.h>
b411b363
PR
36#include <linux/string.h>
37#include <linux/scatterlist.h>
38
39#include "drbd_int.h"
40#include "drbd_req.h"
b411b363 41
b411b363
PR
42static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
43
44
45
46/* defined here:
47 drbd_md_io_complete
45bb912b 48 drbd_endio_sec
b411b363
PR
49 drbd_endio_pri
50
51 * more endio handlers:
52 atodb_endio in drbd_actlog.c
53 drbd_bm_async_io_complete in drbd_bitmap.c
54
55 * For all these callbacks, note the following:
56 * The callbacks will be called in irq context by the IDE drivers,
57 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
58 * Try to get the locking right :)
59 *
60 */
61
62
63/* About the global_state_lock
64 Each state transition on an device holds a read lock. In case we have
65 to evaluate the sync after dependencies, we grab a write lock, because
66 we need stable states on all devices for that. */
67rwlock_t global_state_lock;
68
69/* used for synchronous meta data and bitmap IO
70 * submitted by drbd_md_sync_page_io()
71 */
72void drbd_md_io_complete(struct bio *bio, int error)
73{
74 struct drbd_md_io *md_io;
75
76 md_io = (struct drbd_md_io *)bio->bi_private;
77 md_io->error = error;
78
b411b363
PR
79 complete(&md_io->event);
80}
81
82/* reads on behalf of the partner,
83 * "submitted" by the receiver
84 */
45bb912b 85void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
b411b363
PR
86{
87 unsigned long flags = 0;
45bb912b 88 struct drbd_conf *mdev = e->mdev;
b411b363
PR
89
90 D_ASSERT(e->block_id != ID_VACANT);
91
b411b363
PR
92 spin_lock_irqsave(&mdev->req_lock, flags);
93 mdev->read_cnt += e->size >> 9;
94 list_del(&e->w.list);
95 if (list_empty(&mdev->read_ee))
96 wake_up(&mdev->ee_wait);
45bb912b
LE
97 if (test_bit(__EE_WAS_ERROR, &e->flags))
98 __drbd_chk_io_error(mdev, FALSE);
b411b363
PR
99 spin_unlock_irqrestore(&mdev->req_lock, flags);
100
b411b363
PR
101 drbd_queue_work(&mdev->data.work, &e->w);
102 put_ldev(mdev);
b411b363
PR
103}
104
45bb912b
LE
105static int is_failed_barrier(int ee_flags)
106{
107 return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
108 == (EE_IS_BARRIER|EE_WAS_ERROR);
109}
110
b411b363 111/* writes on behalf of the partner, or resync writes,
45bb912b
LE
112 * "submitted" by the receiver, final stage. */
113static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
b411b363
PR
114{
115 unsigned long flags = 0;
45bb912b 116 struct drbd_conf *mdev = e->mdev;
b411b363
PR
117 sector_t e_sector;
118 int do_wake;
119 int is_syncer_req;
120 int do_al_complete_io;
b411b363 121
45bb912b
LE
122 /* if this is a failed barrier request, disable use of barriers,
123 * and schedule for resubmission */
124 if (is_failed_barrier(e->flags)) {
b411b363
PR
125 drbd_bump_write_ordering(mdev, WO_bdev_flush);
126 spin_lock_irqsave(&mdev->req_lock, flags);
127 list_del(&e->w.list);
fc8ce194 128 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
b411b363
PR
129 e->w.cb = w_e_reissue;
130 /* put_ldev actually happens below, once we come here again. */
131 __release(local);
132 spin_unlock_irqrestore(&mdev->req_lock, flags);
133 drbd_queue_work(&mdev->data.work, &e->w);
134 return;
135 }
136
137 D_ASSERT(e->block_id != ID_VACANT);
138
b411b363
PR
139 /* after we moved e to done_ee,
140 * we may no longer access it,
141 * it may be freed/reused already!
142 * (as soon as we release the req_lock) */
143 e_sector = e->sector;
144 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
45bb912b 145 is_syncer_req = is_syncer_block_id(e->block_id);
b411b363 146
45bb912b
LE
147 spin_lock_irqsave(&mdev->req_lock, flags);
148 mdev->writ_cnt += e->size >> 9;
b411b363
PR
149 list_del(&e->w.list); /* has been on active_ee or sync_ee */
150 list_add_tail(&e->w.list, &mdev->done_ee);
151
b411b363
PR
152 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
153 * neither did we wake possibly waiting conflicting requests.
154 * done from "drbd_process_done_ee" within the appropriate w.cb
155 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
156
157 do_wake = is_syncer_req
158 ? list_empty(&mdev->sync_ee)
159 : list_empty(&mdev->active_ee);
160
45bb912b 161 if (test_bit(__EE_WAS_ERROR, &e->flags))
b411b363
PR
162 __drbd_chk_io_error(mdev, FALSE);
163 spin_unlock_irqrestore(&mdev->req_lock, flags);
164
165 if (is_syncer_req)
166 drbd_rs_complete_io(mdev, e_sector);
167
168 if (do_wake)
169 wake_up(&mdev->ee_wait);
170
171 if (do_al_complete_io)
172 drbd_al_complete_io(mdev, e_sector);
173
174 wake_asender(mdev);
175 put_ldev(mdev);
45bb912b 176}
b411b363 177
45bb912b
LE
178/* writes on behalf of the partner, or resync writes,
179 * "submitted" by the receiver.
180 */
181void drbd_endio_sec(struct bio *bio, int error)
182{
183 struct drbd_epoch_entry *e = bio->bi_private;
184 struct drbd_conf *mdev = e->mdev;
185 int uptodate = bio_flagged(bio, BIO_UPTODATE);
186 int is_write = bio_data_dir(bio) == WRITE;
187
188 if (error)
189 dev_warn(DEV, "%s: error=%d s=%llus\n",
190 is_write ? "write" : "read", error,
191 (unsigned long long)e->sector);
192 if (!error && !uptodate) {
193 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
194 is_write ? "write" : "read",
195 (unsigned long long)e->sector);
196 /* strange behavior of some lower level drivers...
197 * fail the request by clearing the uptodate flag,
198 * but do not return any error?! */
199 error = -EIO;
200 }
201
202 if (error)
203 set_bit(__EE_WAS_ERROR, &e->flags);
204
205 bio_put(bio); /* no need for the bio anymore */
206 if (atomic_dec_and_test(&e->pending_bios)) {
207 if (is_write)
208 drbd_endio_write_sec_final(e);
209 else
210 drbd_endio_read_sec_final(e);
211 }
b411b363
PR
212}
213
214/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
215 */
216void drbd_endio_pri(struct bio *bio, int error)
217{
b411b363
PR
218 struct drbd_request *req = bio->bi_private;
219 struct drbd_conf *mdev = req->mdev;
b411b363
PR
220 enum drbd_req_event what;
221 int uptodate = bio_flagged(bio, BIO_UPTODATE);
222
b411b363
PR
223 if (!error && !uptodate) {
224 dev_warn(DEV, "p %s: setting error to -EIO\n",
225 bio_data_dir(bio) == WRITE ? "write" : "read");
226 /* strange behavior of some lower level drivers...
227 * fail the request by clearing the uptodate flag,
228 * but do not return any error?! */
229 error = -EIO;
230 }
231
b411b363
PR
232 /* to avoid recursion in __req_mod */
233 if (unlikely(error)) {
234 what = (bio_data_dir(bio) == WRITE)
235 ? write_completed_with_error
5c3c7e64 236 : (bio_rw(bio) == READ)
b411b363
PR
237 ? read_completed_with_error
238 : read_ahead_completed_with_error;
239 } else
240 what = completed_ok;
241
242 bio_put(req->private_bio);
243 req->private_bio = ERR_PTR(error);
244
0f0601f4 245 req_mod(req, what);
b411b363
PR
246}
247
b411b363
PR
248int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
249{
250 struct drbd_request *req = container_of(w, struct drbd_request, w);
251
252 /* We should not detach for read io-error,
253 * but try to WRITE the P_DATA_REPLY to the failed location,
254 * to give the disk the chance to relocate that block */
255
256 spin_lock_irq(&mdev->req_lock);
d255e5ff
LE
257 if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
258 _req_mod(req, read_retry_remote_canceled);
b411b363 259 spin_unlock_irq(&mdev->req_lock);
b411b363
PR
260 return 1;
261 }
262 spin_unlock_irq(&mdev->req_lock);
263
264 return w_send_read_req(mdev, w, 0);
265}
266
267int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
268{
269 ERR_IF(cancel) return 1;
270 dev_err(DEV, "resync inactive, but callback triggered??\n");
271 return 1; /* Simply ignore this! */
272}
273
45bb912b
LE
274void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
275{
276 struct hash_desc desc;
277 struct scatterlist sg;
278 struct page *page = e->pages;
279 struct page *tmp;
280 unsigned len;
281
282 desc.tfm = tfm;
283 desc.flags = 0;
284
285 sg_init_table(&sg, 1);
286 crypto_hash_init(&desc);
287
288 while ((tmp = page_chain_next(page))) {
289 /* all but the last page will be fully used */
290 sg_set_page(&sg, page, PAGE_SIZE, 0);
291 crypto_hash_update(&desc, &sg, sg.length);
292 page = tmp;
293 }
294 /* and now the last, possibly only partially used page */
295 len = e->size & (PAGE_SIZE - 1);
296 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
297 crypto_hash_update(&desc, &sg, sg.length);
298 crypto_hash_final(&desc, digest);
299}
300
301void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
b411b363
PR
302{
303 struct hash_desc desc;
304 struct scatterlist sg;
305 struct bio_vec *bvec;
306 int i;
307
308 desc.tfm = tfm;
309 desc.flags = 0;
310
311 sg_init_table(&sg, 1);
312 crypto_hash_init(&desc);
313
314 __bio_for_each_segment(bvec, bio, i, 0) {
315 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
316 crypto_hash_update(&desc, &sg, sg.length);
317 }
318 crypto_hash_final(&desc, digest);
319}
320
321static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
322{
323 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
324 int digest_size;
325 void *digest;
326 int ok;
327
328 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
329
330 if (unlikely(cancel)) {
331 drbd_free_ee(mdev, e);
332 return 1;
333 }
334
45bb912b 335 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
336 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
337 digest = kmalloc(digest_size, GFP_NOIO);
338 if (digest) {
45bb912b 339 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
b411b363
PR
340
341 inc_rs_pending(mdev);
342 ok = drbd_send_drequest_csum(mdev,
343 e->sector,
344 e->size,
345 digest,
346 digest_size,
347 P_CSUM_RS_REQUEST);
348 kfree(digest);
349 } else {
350 dev_err(DEV, "kmalloc() of digest failed.\n");
351 ok = 0;
352 }
353 } else
354 ok = 1;
355
356 drbd_free_ee(mdev, e);
357
358 if (unlikely(!ok))
359 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
360 return ok;
361}
362
363#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
364
365static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
366{
367 struct drbd_epoch_entry *e;
368
369 if (!get_ldev(mdev))
80a40e43 370 return -EIO;
b411b363 371
0f0601f4
LE
372 if (drbd_rs_should_slow_down(mdev))
373 goto defer;
374
b411b363
PR
375 /* GFP_TRY, because if there is no memory available right now, this may
376 * be rescheduled for later. It is "only" background resync, after all. */
377 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
45bb912b 378 if (!e)
80a40e43 379 goto defer;
b411b363 380
80a40e43 381 e->w.cb = w_e_send_csum;
b411b363
PR
382 spin_lock_irq(&mdev->req_lock);
383 list_add(&e->w.list, &mdev->read_ee);
384 spin_unlock_irq(&mdev->req_lock);
385
0f0601f4 386 atomic_add(size >> 9, &mdev->rs_sect_ev);
45bb912b 387 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
80a40e43 388 return 0;
b411b363 389
45bb912b 390 drbd_free_ee(mdev, e);
80a40e43 391defer:
45bb912b 392 put_ldev(mdev);
80a40e43 393 return -EAGAIN;
b411b363
PR
394}
395
396void resync_timer_fn(unsigned long data)
397{
398 unsigned long flags;
399 struct drbd_conf *mdev = (struct drbd_conf *) data;
400 int queue;
401
402 spin_lock_irqsave(&mdev->req_lock, flags);
403
404 if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
405 queue = 1;
406 if (mdev->state.conn == C_VERIFY_S)
407 mdev->resync_work.cb = w_make_ov_request;
408 else
409 mdev->resync_work.cb = w_make_resync_request;
410 } else {
411 queue = 0;
412 mdev->resync_work.cb = w_resync_inactive;
413 }
414
415 spin_unlock_irqrestore(&mdev->req_lock, flags);
416
417 /* harmless race: list_empty outside data.work.q_lock */
418 if (list_empty(&mdev->resync_work.list) && queue)
419 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
420}
421
778f271d
PR
422static void fifo_set(struct fifo_buffer *fb, int value)
423{
424 int i;
425
426 for (i = 0; i < fb->size; i++)
427 fb->values[i] += value;
428}
429
430static int fifo_push(struct fifo_buffer *fb, int value)
431{
432 int ov;
433
434 ov = fb->values[fb->head_index];
435 fb->values[fb->head_index++] = value;
436
437 if (fb->head_index >= fb->size)
438 fb->head_index = 0;
439
440 return ov;
441}
442
443static void fifo_add_val(struct fifo_buffer *fb, int value)
444{
445 int i;
446
447 for (i = 0; i < fb->size; i++)
448 fb->values[i] += value;
449}
450
451int drbd_rs_controller(struct drbd_conf *mdev)
452{
453 unsigned int sect_in; /* Number of sectors that came in since the last turn */
454 unsigned int want; /* The number of sectors we want in the proxy */
455 int req_sect; /* Number of sectors to request in this turn */
456 int correction; /* Number of sectors more we need in the proxy*/
457 int cps; /* correction per invocation of drbd_rs_controller() */
458 int steps; /* Number of time steps to plan ahead */
459 int curr_corr;
460 int max_sect;
461
462 sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
463 mdev->rs_in_flight -= sect_in;
464
465 spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
466
467 steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
468
469 if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
470 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
471 } else { /* normal path */
472 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
473 sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
474 }
475
476 correction = want - mdev->rs_in_flight - mdev->rs_planed;
477
478 /* Plan ahead */
479 cps = correction / steps;
480 fifo_add_val(&mdev->rs_plan_s, cps);
481 mdev->rs_planed += cps * steps;
482
483 /* What we do in this step */
484 curr_corr = fifo_push(&mdev->rs_plan_s, 0);
485 spin_unlock(&mdev->peer_seq_lock);
486 mdev->rs_planed -= curr_corr;
487
488 req_sect = sect_in + curr_corr;
489 if (req_sect < 0)
490 req_sect = 0;
491
492 max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
493 if (req_sect > max_sect)
494 req_sect = max_sect;
495
496 /*
497 dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
498 sect_in, mdev->rs_in_flight, want, correction,
499 steps, cps, mdev->rs_planed, curr_corr, req_sect);
500 */
501
502 return req_sect;
503}
504
b411b363
PR
505int w_make_resync_request(struct drbd_conf *mdev,
506 struct drbd_work *w, int cancel)
507{
508 unsigned long bit;
509 sector_t sector;
510 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
bb3d000c 511 int max_segment_size;
0f0601f4 512 int number, rollback_i, size, pe, mx;
b411b363 513 int align, queued, sndbuf;
0f0601f4 514 int i = 0;
b411b363
PR
515
516 if (unlikely(cancel))
517 return 1;
518
519 if (unlikely(mdev->state.conn < C_CONNECTED)) {
520 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
521 return 0;
522 }
523
524 if (mdev->state.conn != C_SYNC_TARGET)
525 dev_err(DEV, "%s in w_make_resync_request\n",
526 drbd_conn_str(mdev->state.conn));
527
528 if (!get_ldev(mdev)) {
529 /* Since we only need to access mdev->rsync a
530 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
531 to continue resync with a broken disk makes no sense at
532 all */
533 dev_err(DEV, "Disk broke down during resync!\n");
534 mdev->resync_work.cb = w_resync_inactive;
535 return 1;
536 }
537
bb3d000c
LE
538 /* starting with drbd 8.3.8, we can handle multi-bio EEs,
539 * if it should be necessary */
540 max_segment_size = mdev->agreed_pro_version < 94 ?
541 queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
542
778f271d
PR
543 if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
544 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
545 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
546 } else {
547 mdev->c_sync_rate = mdev->sync_conf.rate;
548 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
549 }
0f0601f4
LE
550
551 /* Throttle resync on lower level disk activity, which may also be
552 * caused by application IO on Primary/SyncTarget.
553 * Keep this after the call to drbd_rs_controller, as that assumes
554 * to be called as precisely as possible every SLEEP_TIME,
555 * and would be confused otherwise. */
556 if (drbd_rs_should_slow_down(mdev))
557 goto requeue;
b411b363
PR
558
559 mutex_lock(&mdev->data.mutex);
560 if (mdev->data.socket)
561 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
562 else
563 mx = 1;
564 mutex_unlock(&mdev->data.mutex);
565
566 /* For resync rates >160MB/sec, allow more pending RS requests */
567 if (number > mx)
568 mx = number;
569
570 /* Limit the number of pending RS requests to no more than the peer's receive buffer */
0f0601f4 571 pe = atomic_read(&mdev->rs_pending_cnt);
b411b363
PR
572 if ((pe + number) > mx) {
573 number = mx - pe;
574 }
575
576 for (i = 0; i < number; i++) {
577 /* Stop generating RS requests, when half of the send buffer is filled */
578 mutex_lock(&mdev->data.mutex);
579 if (mdev->data.socket) {
580 queued = mdev->data.socket->sk->sk_wmem_queued;
581 sndbuf = mdev->data.socket->sk->sk_sndbuf;
582 } else {
583 queued = 1;
584 sndbuf = 0;
585 }
586 mutex_unlock(&mdev->data.mutex);
587 if (queued > sndbuf / 2)
588 goto requeue;
589
590next_sector:
591 size = BM_BLOCK_SIZE;
592 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
593
594 if (bit == -1UL) {
595 mdev->bm_resync_fo = drbd_bm_bits(mdev);
596 mdev->resync_work.cb = w_resync_inactive;
597 put_ldev(mdev);
598 return 1;
599 }
600
601 sector = BM_BIT_TO_SECT(bit);
602
603 if (drbd_try_rs_begin_io(mdev, sector)) {
604 mdev->bm_resync_fo = bit;
605 goto requeue;
606 }
607 mdev->bm_resync_fo = bit + 1;
608
609 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
610 drbd_rs_complete_io(mdev, sector);
611 goto next_sector;
612 }
613
614#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
615 /* try to find some adjacent bits.
616 * we stop if we have already the maximum req size.
617 *
618 * Additionally always align bigger requests, in order to
619 * be prepared for all stripe sizes of software RAIDs.
b411b363
PR
620 */
621 align = 1;
d207450c 622 rollback_i = i;
b411b363
PR
623 for (;;) {
624 if (size + BM_BLOCK_SIZE > max_segment_size)
625 break;
626
627 /* Be always aligned */
628 if (sector & ((1<<(align+3))-1))
629 break;
630
631 /* do not cross extent boundaries */
632 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
633 break;
634 /* now, is it actually dirty, after all?
635 * caution, drbd_bm_test_bit is tri-state for some
636 * obscure reason; ( b == 0 ) would get the out-of-band
637 * only accidentally right because of the "oddly sized"
638 * adjustment below */
639 if (drbd_bm_test_bit(mdev, bit+1) != 1)
640 break;
641 bit++;
642 size += BM_BLOCK_SIZE;
643 if ((BM_BLOCK_SIZE << align) <= size)
644 align++;
645 i++;
646 }
647 /* if we merged some,
648 * reset the offset to start the next drbd_bm_find_next from */
649 if (size > BM_BLOCK_SIZE)
650 mdev->bm_resync_fo = bit + 1;
651#endif
652
653 /* adjust very last sectors, in case we are oddly sized */
654 if (sector + (size>>9) > capacity)
655 size = (capacity-sector)<<9;
656 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
657 switch (read_for_csum(mdev, sector, size)) {
80a40e43 658 case -EIO: /* Disk failure */
b411b363
PR
659 put_ldev(mdev);
660 return 0;
80a40e43 661 case -EAGAIN: /* allocation failed, or ldev busy */
b411b363
PR
662 drbd_rs_complete_io(mdev, sector);
663 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
d207450c 664 i = rollback_i;
b411b363 665 goto requeue;
80a40e43
LE
666 case 0:
667 /* everything ok */
668 break;
669 default:
670 BUG();
b411b363
PR
671 }
672 } else {
673 inc_rs_pending(mdev);
674 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
675 sector, size, ID_SYNCER)) {
676 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
677 dec_rs_pending(mdev);
678 put_ldev(mdev);
679 return 0;
680 }
681 }
682 }
683
684 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
685 /* last syncer _request_ was sent,
686 * but the P_RS_DATA_REPLY not yet received. sync will end (and
687 * next sync group will resume), as soon as we receive the last
688 * resync data block, and the last bit is cleared.
689 * until then resync "work" is "inactive" ...
690 */
691 mdev->resync_work.cb = w_resync_inactive;
692 put_ldev(mdev);
693 return 1;
694 }
695
696 requeue:
778f271d 697 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
b411b363
PR
698 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
699 put_ldev(mdev);
700 return 1;
701}
702
703static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
704{
705 int number, i, size;
706 sector_t sector;
707 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
708
709 if (unlikely(cancel))
710 return 1;
711
712 if (unlikely(mdev->state.conn < C_CONNECTED)) {
713 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
714 return 0;
715 }
716
717 number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
718 if (atomic_read(&mdev->rs_pending_cnt) > number)
719 goto requeue;
720
721 number -= atomic_read(&mdev->rs_pending_cnt);
722
723 sector = mdev->ov_position;
724 for (i = 0; i < number; i++) {
725 if (sector >= capacity) {
726 mdev->resync_work.cb = w_resync_inactive;
727 return 1;
728 }
729
730 size = BM_BLOCK_SIZE;
731
732 if (drbd_try_rs_begin_io(mdev, sector)) {
733 mdev->ov_position = sector;
734 goto requeue;
735 }
736
737 if (sector + (size>>9) > capacity)
738 size = (capacity-sector)<<9;
739
740 inc_rs_pending(mdev);
741 if (!drbd_send_ov_request(mdev, sector, size)) {
742 dec_rs_pending(mdev);
743 return 0;
744 }
745 sector += BM_SECT_PER_BIT;
746 }
747 mdev->ov_position = sector;
748
749 requeue:
750 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
751 return 1;
752}
753
754
755int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
756{
757 kfree(w);
758 ov_oos_print(mdev);
759 drbd_resync_finished(mdev);
760
761 return 1;
762}
763
764static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
765{
766 kfree(w);
767
768 drbd_resync_finished(mdev);
769
770 return 1;
771}
772
773int drbd_resync_finished(struct drbd_conf *mdev)
774{
775 unsigned long db, dt, dbdt;
776 unsigned long n_oos;
777 union drbd_state os, ns;
778 struct drbd_work *w;
779 char *khelper_cmd = NULL;
780
781 /* Remove all elements from the resync LRU. Since future actions
782 * might set bits in the (main) bitmap, then the entries in the
783 * resync LRU would be wrong. */
784 if (drbd_rs_del_all(mdev)) {
785 /* In case this is not possible now, most probably because
786 * there are P_RS_DATA_REPLY Packets lingering on the worker's
787 * queue (or even the read operations for those packets
788 * is not finished by now). Retry in 100ms. */
789
790 drbd_kick_lo(mdev);
791 __set_current_state(TASK_INTERRUPTIBLE);
792 schedule_timeout(HZ / 10);
793 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
794 if (w) {
795 w->cb = w_resync_finished;
796 drbd_queue_work(&mdev->data.work, w);
797 return 1;
798 }
799 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
800 }
801
802 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
803 if (dt <= 0)
804 dt = 1;
805 db = mdev->rs_total;
806 dbdt = Bit2KB(db/dt);
807 mdev->rs_paused /= HZ;
808
809 if (!get_ldev(mdev))
810 goto out;
811
812 spin_lock_irq(&mdev->req_lock);
813 os = mdev->state;
814
815 /* This protects us against multiple calls (that can happen in the presence
816 of application IO), and against connectivity loss just before we arrive here. */
817 if (os.conn <= C_CONNECTED)
818 goto out_unlock;
819
820 ns = os;
821 ns.conn = C_CONNECTED;
822
823 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
824 (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
825 "Online verify " : "Resync",
826 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
827
828 n_oos = drbd_bm_total_weight(mdev);
829
830 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
831 if (n_oos) {
832 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
833 n_oos, Bit2KB(1));
834 khelper_cmd = "out-of-sync";
835 }
836 } else {
837 D_ASSERT((n_oos - mdev->rs_failed) == 0);
838
839 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
840 khelper_cmd = "after-resync-target";
841
842 if (mdev->csums_tfm && mdev->rs_total) {
843 const unsigned long s = mdev->rs_same_csum;
844 const unsigned long t = mdev->rs_total;
845 const int ratio =
846 (t == 0) ? 0 :
847 (t < 100000) ? ((s*100)/t) : (s/(t/100));
848 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
849 "transferred %luK total %luK\n",
850 ratio,
851 Bit2KB(mdev->rs_same_csum),
852 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
853 Bit2KB(mdev->rs_total));
854 }
855 }
856
857 if (mdev->rs_failed) {
858 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
859
860 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
861 ns.disk = D_INCONSISTENT;
862 ns.pdsk = D_UP_TO_DATE;
863 } else {
864 ns.disk = D_UP_TO_DATE;
865 ns.pdsk = D_INCONSISTENT;
866 }
867 } else {
868 ns.disk = D_UP_TO_DATE;
869 ns.pdsk = D_UP_TO_DATE;
870
871 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
872 if (mdev->p_uuid) {
873 int i;
874 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
875 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
876 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
877 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
878 } else {
879 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
880 }
881 }
882
883 drbd_uuid_set_bm(mdev, 0UL);
884
885 if (mdev->p_uuid) {
886 /* Now the two UUID sets are equal, update what we
887 * know of the peer. */
888 int i;
889 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
890 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
891 }
892 }
893
894 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
895out_unlock:
896 spin_unlock_irq(&mdev->req_lock);
897 put_ldev(mdev);
898out:
899 mdev->rs_total = 0;
900 mdev->rs_failed = 0;
901 mdev->rs_paused = 0;
902 mdev->ov_start_sector = 0;
903
904 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
905 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
906 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
907 }
908
909 if (khelper_cmd)
910 drbd_khelper(mdev, khelper_cmd);
911
912 return 1;
913}
914
915/* helper */
916static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
917{
45bb912b 918 if (drbd_ee_has_active_page(e)) {
b411b363
PR
919 /* This might happen if sendpage() has not finished */
920 spin_lock_irq(&mdev->req_lock);
921 list_add_tail(&e->w.list, &mdev->net_ee);
922 spin_unlock_irq(&mdev->req_lock);
923 } else
924 drbd_free_ee(mdev, e);
925}
926
927/**
928 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
929 * @mdev: DRBD device.
930 * @w: work object.
931 * @cancel: The connection will be closed anyways
932 */
933int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
934{
935 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
936 int ok;
937
938 if (unlikely(cancel)) {
939 drbd_free_ee(mdev, e);
940 dec_unacked(mdev);
941 return 1;
942 }
943
45bb912b 944 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
945 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
946 } else {
947 if (__ratelimit(&drbd_ratelimit_state))
948 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
949 (unsigned long long)e->sector);
950
951 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
952 }
953
954 dec_unacked(mdev);
955
956 move_to_net_ee_or_free(mdev, e);
957
958 if (unlikely(!ok))
959 dev_err(DEV, "drbd_send_block() failed\n");
960 return ok;
961}
962
963/**
964 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
965 * @mdev: DRBD device.
966 * @w: work object.
967 * @cancel: The connection will be closed anyways
968 */
969int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
970{
971 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
972 int ok;
973
974 if (unlikely(cancel)) {
975 drbd_free_ee(mdev, e);
976 dec_unacked(mdev);
977 return 1;
978 }
979
980 if (get_ldev_if_state(mdev, D_FAILED)) {
981 drbd_rs_complete_io(mdev, e->sector);
982 put_ldev(mdev);
983 }
984
45bb912b 985 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
986 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
987 inc_rs_pending(mdev);
988 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
989 } else {
990 if (__ratelimit(&drbd_ratelimit_state))
991 dev_err(DEV, "Not sending RSDataReply, "
992 "partner DISKLESS!\n");
993 ok = 1;
994 }
995 } else {
996 if (__ratelimit(&drbd_ratelimit_state))
997 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
998 (unsigned long long)e->sector);
999
1000 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1001
1002 /* update resync data with failure */
1003 drbd_rs_failed_io(mdev, e->sector, e->size);
1004 }
1005
1006 dec_unacked(mdev);
1007
1008 move_to_net_ee_or_free(mdev, e);
1009
1010 if (unlikely(!ok))
1011 dev_err(DEV, "drbd_send_block() failed\n");
1012 return ok;
1013}
1014
1015int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1016{
1017 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1018 struct digest_info *di;
1019 int digest_size;
1020 void *digest = NULL;
1021 int ok, eq = 0;
1022
1023 if (unlikely(cancel)) {
1024 drbd_free_ee(mdev, e);
1025 dec_unacked(mdev);
1026 return 1;
1027 }
1028
1029 drbd_rs_complete_io(mdev, e->sector);
1030
85719573 1031 di = e->digest;
b411b363 1032
45bb912b 1033 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1034 /* quick hack to try to avoid a race against reconfiguration.
1035 * a real fix would be much more involved,
1036 * introducing more locking mechanisms */
1037 if (mdev->csums_tfm) {
1038 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1039 D_ASSERT(digest_size == di->digest_size);
1040 digest = kmalloc(digest_size, GFP_NOIO);
1041 }
1042 if (digest) {
45bb912b 1043 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
b411b363
PR
1044 eq = !memcmp(digest, di->digest, digest_size);
1045 kfree(digest);
1046 }
1047
1048 if (eq) {
1049 drbd_set_in_sync(mdev, e->sector, e->size);
676396d5
LE
1050 /* rs_same_csums unit is BM_BLOCK_SIZE */
1051 mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
b411b363
PR
1052 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1053 } else {
1054 inc_rs_pending(mdev);
204bba99
PR
1055 e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1056 e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1057 kfree(di);
b411b363
PR
1058 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1059 }
1060 } else {
1061 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1062 if (__ratelimit(&drbd_ratelimit_state))
1063 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1064 }
1065
1066 dec_unacked(mdev);
b411b363
PR
1067 move_to_net_ee_or_free(mdev, e);
1068
1069 if (unlikely(!ok))
1070 dev_err(DEV, "drbd_send_block/ack() failed\n");
1071 return ok;
1072}
1073
1074int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1075{
1076 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1077 int digest_size;
1078 void *digest;
1079 int ok = 1;
1080
1081 if (unlikely(cancel))
1082 goto out;
1083
45bb912b 1084 if (unlikely((e->flags & EE_WAS_ERROR) != 0))
b411b363
PR
1085 goto out;
1086
1087 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1088 /* FIXME if this allocation fails, online verify will not terminate! */
1089 digest = kmalloc(digest_size, GFP_NOIO);
1090 if (digest) {
45bb912b 1091 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
b411b363
PR
1092 inc_rs_pending(mdev);
1093 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1094 digest, digest_size, P_OV_REPLY);
1095 if (!ok)
1096 dec_rs_pending(mdev);
1097 kfree(digest);
1098 }
1099
1100out:
1101 drbd_free_ee(mdev, e);
1102
1103 dec_unacked(mdev);
1104
1105 return ok;
1106}
1107
1108void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1109{
1110 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1111 mdev->ov_last_oos_size += size>>9;
1112 } else {
1113 mdev->ov_last_oos_start = sector;
1114 mdev->ov_last_oos_size = size>>9;
1115 }
1116 drbd_set_out_of_sync(mdev, sector, size);
1117 set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1118}
1119
1120int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1121{
1122 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1123 struct digest_info *di;
1124 int digest_size;
1125 void *digest;
1126 int ok, eq = 0;
1127
1128 if (unlikely(cancel)) {
1129 drbd_free_ee(mdev, e);
1130 dec_unacked(mdev);
1131 return 1;
1132 }
1133
1134 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1135 * the resync lru has been cleaned up already */
1136 drbd_rs_complete_io(mdev, e->sector);
1137
85719573 1138 di = e->digest;
b411b363 1139
45bb912b 1140 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1141 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1142 digest = kmalloc(digest_size, GFP_NOIO);
1143 if (digest) {
45bb912b 1144 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
b411b363
PR
1145
1146 D_ASSERT(digest_size == di->digest_size);
1147 eq = !memcmp(digest, di->digest, digest_size);
1148 kfree(digest);
1149 }
1150 } else {
1151 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1152 if (__ratelimit(&drbd_ratelimit_state))
1153 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1154 }
1155
1156 dec_unacked(mdev);
b411b363
PR
1157 if (!eq)
1158 drbd_ov_oos_found(mdev, e->sector, e->size);
1159 else
1160 ov_oos_print(mdev);
1161
1162 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1163 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1164
1165 drbd_free_ee(mdev, e);
1166
1167 if (--mdev->ov_left == 0) {
1168 ov_oos_print(mdev);
1169 drbd_resync_finished(mdev);
1170 }
1171
1172 return ok;
1173}
1174
1175int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1176{
1177 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1178 complete(&b->done);
1179 return 1;
1180}
1181
1182int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1183{
1184 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1185 struct p_barrier *p = &mdev->data.sbuf.barrier;
1186 int ok = 1;
1187
1188 /* really avoid racing with tl_clear. w.cb may have been referenced
1189 * just before it was reassigned and re-queued, so double check that.
1190 * actually, this race was harmless, since we only try to send the
1191 * barrier packet here, and otherwise do nothing with the object.
1192 * but compare with the head of w_clear_epoch */
1193 spin_lock_irq(&mdev->req_lock);
1194 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1195 cancel = 1;
1196 spin_unlock_irq(&mdev->req_lock);
1197 if (cancel)
1198 return 1;
1199
1200 if (!drbd_get_data_sock(mdev))
1201 return 0;
1202 p->barrier = b->br_number;
1203 /* inc_ap_pending was done where this was queued.
1204 * dec_ap_pending will be done in got_BarrierAck
1205 * or (on connection loss) in w_clear_epoch. */
1206 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
0b70a13d 1207 (struct p_header80 *)p, sizeof(*p), 0);
b411b363
PR
1208 drbd_put_data_sock(mdev);
1209
1210 return ok;
1211}
1212
1213int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1214{
1215 if (cancel)
1216 return 1;
1217 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1218}
1219
1220/**
1221 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1222 * @mdev: DRBD device.
1223 * @w: work object.
1224 * @cancel: The connection will be closed anyways
1225 */
1226int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1227{
1228 struct drbd_request *req = container_of(w, struct drbd_request, w);
1229 int ok;
1230
1231 if (unlikely(cancel)) {
1232 req_mod(req, send_canceled);
1233 return 1;
1234 }
1235
1236 ok = drbd_send_dblock(mdev, req);
1237 req_mod(req, ok ? handed_over_to_network : send_failed);
1238
1239 return ok;
1240}
1241
1242/**
1243 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1244 * @mdev: DRBD device.
1245 * @w: work object.
1246 * @cancel: The connection will be closed anyways
1247 */
1248int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1249{
1250 struct drbd_request *req = container_of(w, struct drbd_request, w);
1251 int ok;
1252
1253 if (unlikely(cancel)) {
1254 req_mod(req, send_canceled);
1255 return 1;
1256 }
1257
1258 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1259 (unsigned long)req);
1260
1261 if (!ok) {
1262 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1263 * so this is probably redundant */
1264 if (mdev->state.conn >= C_CONNECTED)
1265 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1266 }
1267 req_mod(req, ok ? handed_over_to_network : send_failed);
1268
1269 return ok;
1270}
1271
265be2d0
PR
1272int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1273{
1274 struct drbd_request *req = container_of(w, struct drbd_request, w);
1275
0778286a 1276 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
265be2d0
PR
1277 drbd_al_begin_io(mdev, req->sector);
1278 /* Calling drbd_al_begin_io() out of the worker might deadlocks
1279 theoretically. Practically it can not deadlock, since this is
1280 only used when unfreezing IOs. All the extents of the requests
1281 that made it into the TL are already active */
1282
1283 drbd_req_make_private_bio(req, req->master_bio);
1284 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1285 generic_make_request(req->private_bio);
1286
1287 return 1;
1288}
1289
b411b363
PR
1290static int _drbd_may_sync_now(struct drbd_conf *mdev)
1291{
1292 struct drbd_conf *odev = mdev;
1293
1294 while (1) {
1295 if (odev->sync_conf.after == -1)
1296 return 1;
1297 odev = minor_to_mdev(odev->sync_conf.after);
1298 ERR_IF(!odev) return 1;
1299 if ((odev->state.conn >= C_SYNC_SOURCE &&
1300 odev->state.conn <= C_PAUSED_SYNC_T) ||
1301 odev->state.aftr_isp || odev->state.peer_isp ||
1302 odev->state.user_isp)
1303 return 0;
1304 }
1305}
1306
1307/**
1308 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1309 * @mdev: DRBD device.
1310 *
1311 * Called from process context only (admin command and after_state_ch).
1312 */
1313static int _drbd_pause_after(struct drbd_conf *mdev)
1314{
1315 struct drbd_conf *odev;
1316 int i, rv = 0;
1317
1318 for (i = 0; i < minor_count; i++) {
1319 odev = minor_to_mdev(i);
1320 if (!odev)
1321 continue;
1322 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1323 continue;
1324 if (!_drbd_may_sync_now(odev))
1325 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1326 != SS_NOTHING_TO_DO);
1327 }
1328
1329 return rv;
1330}
1331
1332/**
1333 * _drbd_resume_next() - Resume resync on all devices that may resync now
1334 * @mdev: DRBD device.
1335 *
1336 * Called from process context only (admin command and worker).
1337 */
1338static int _drbd_resume_next(struct drbd_conf *mdev)
1339{
1340 struct drbd_conf *odev;
1341 int i, rv = 0;
1342
1343 for (i = 0; i < minor_count; i++) {
1344 odev = minor_to_mdev(i);
1345 if (!odev)
1346 continue;
1347 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1348 continue;
1349 if (odev->state.aftr_isp) {
1350 if (_drbd_may_sync_now(odev))
1351 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1352 CS_HARD, NULL)
1353 != SS_NOTHING_TO_DO) ;
1354 }
1355 }
1356 return rv;
1357}
1358
1359void resume_next_sg(struct drbd_conf *mdev)
1360{
1361 write_lock_irq(&global_state_lock);
1362 _drbd_resume_next(mdev);
1363 write_unlock_irq(&global_state_lock);
1364}
1365
1366void suspend_other_sg(struct drbd_conf *mdev)
1367{
1368 write_lock_irq(&global_state_lock);
1369 _drbd_pause_after(mdev);
1370 write_unlock_irq(&global_state_lock);
1371}
1372
1373static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1374{
1375 struct drbd_conf *odev;
1376
1377 if (o_minor == -1)
1378 return NO_ERROR;
1379 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1380 return ERR_SYNC_AFTER;
1381
1382 /* check for loops */
1383 odev = minor_to_mdev(o_minor);
1384 while (1) {
1385 if (odev == mdev)
1386 return ERR_SYNC_AFTER_CYCLE;
1387
1388 /* dependency chain ends here, no cycles. */
1389 if (odev->sync_conf.after == -1)
1390 return NO_ERROR;
1391
1392 /* follow the dependency chain */
1393 odev = minor_to_mdev(odev->sync_conf.after);
1394 }
1395}
1396
1397int drbd_alter_sa(struct drbd_conf *mdev, int na)
1398{
1399 int changes;
1400 int retcode;
1401
1402 write_lock_irq(&global_state_lock);
1403 retcode = sync_after_error(mdev, na);
1404 if (retcode == NO_ERROR) {
1405 mdev->sync_conf.after = na;
1406 do {
1407 changes = _drbd_pause_after(mdev);
1408 changes |= _drbd_resume_next(mdev);
1409 } while (changes);
1410 }
1411 write_unlock_irq(&global_state_lock);
1412 return retcode;
1413}
1414
309d1608
PR
1415static void ping_peer(struct drbd_conf *mdev)
1416{
1417 clear_bit(GOT_PING_ACK, &mdev->flags);
1418 request_ping(mdev);
1419 wait_event(mdev->misc_wait,
1420 test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1421}
1422
b411b363
PR
1423/**
1424 * drbd_start_resync() - Start the resync process
1425 * @mdev: DRBD device.
1426 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1427 *
1428 * This function might bring you directly into one of the
1429 * C_PAUSED_SYNC_* states.
1430 */
1431void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1432{
1433 union drbd_state ns;
1434 int r;
1435
1436 if (mdev->state.conn >= C_SYNC_SOURCE) {
1437 dev_err(DEV, "Resync already running!\n");
1438 return;
1439 }
1440
b411b363
PR
1441 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1442 drbd_rs_cancel_all(mdev);
1443
1444 if (side == C_SYNC_TARGET) {
1445 /* Since application IO was locked out during C_WF_BITMAP_T and
1446 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1447 we check that we might make the data inconsistent. */
1448 r = drbd_khelper(mdev, "before-resync-target");
1449 r = (r >> 8) & 0xff;
1450 if (r > 0) {
1451 dev_info(DEV, "before-resync-target handler returned %d, "
1452 "dropping connection.\n", r);
1453 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1454 return;
1455 }
1456 }
1457
1458 drbd_state_lock(mdev);
1459
1460 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1461 drbd_state_unlock(mdev);
1462 return;
1463 }
1464
1465 if (side == C_SYNC_TARGET) {
1466 mdev->bm_resync_fo = 0;
1467 } else /* side == C_SYNC_SOURCE */ {
1468 u64 uuid;
1469
1470 get_random_bytes(&uuid, sizeof(u64));
1471 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1472 drbd_send_sync_uuid(mdev, uuid);
1473
1474 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1475 }
1476
1477 write_lock_irq(&global_state_lock);
1478 ns = mdev->state;
1479
1480 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1481
1482 ns.conn = side;
1483
1484 if (side == C_SYNC_TARGET)
1485 ns.disk = D_INCONSISTENT;
1486 else /* side == C_SYNC_SOURCE */
1487 ns.pdsk = D_INCONSISTENT;
1488
1489 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1490 ns = mdev->state;
1491
1492 if (ns.conn < C_CONNECTED)
1493 r = SS_UNKNOWN_ERROR;
1494
1495 if (r == SS_SUCCESS) {
1d7734a0
LE
1496 unsigned long tw = drbd_bm_total_weight(mdev);
1497 unsigned long now = jiffies;
1498 int i;
1499
b411b363
PR
1500 mdev->rs_failed = 0;
1501 mdev->rs_paused = 0;
b411b363 1502 mdev->rs_same_csum = 0;
0f0601f4
LE
1503 mdev->rs_last_events = 0;
1504 mdev->rs_last_sect_ev = 0;
1d7734a0
LE
1505 mdev->rs_total = tw;
1506 mdev->rs_start = now;
1507 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1508 mdev->rs_mark_left[i] = tw;
1509 mdev->rs_mark_time[i] = now;
1510 }
b411b363
PR
1511 _drbd_pause_after(mdev);
1512 }
1513 write_unlock_irq(&global_state_lock);
b411b363
PR
1514 put_ldev(mdev);
1515
1516 if (r == SS_SUCCESS) {
1517 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1518 drbd_conn_str(ns.conn),
1519 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1520 (unsigned long) mdev->rs_total);
1521
1522 if (mdev->rs_total == 0) {
1523 /* Peer still reachable? Beware of failing before-resync-target handlers! */
309d1608 1524 ping_peer(mdev);
b411b363 1525 drbd_resync_finished(mdev);
b411b363
PR
1526 }
1527
778f271d 1528 atomic_set(&mdev->rs_sect_in, 0);
0f0601f4 1529 atomic_set(&mdev->rs_sect_ev, 0);
778f271d
PR
1530 mdev->rs_in_flight = 0;
1531 mdev->rs_planed = 0;
1532 spin_lock(&mdev->peer_seq_lock);
1533 fifo_set(&mdev->rs_plan_s, 0);
1534 spin_unlock(&mdev->peer_seq_lock);
b411b363
PR
1535 /* ns.conn may already be != mdev->state.conn,
1536 * we may have been paused in between, or become paused until
1537 * the timer triggers.
1538 * No matter, that is handled in resync_timer_fn() */
1539 if (ns.conn == C_SYNC_TARGET)
1540 mod_timer(&mdev->resync_timer, jiffies);
1541
1542 drbd_md_sync(mdev);
1543 }
d0c3f60f 1544 drbd_state_unlock(mdev);
b411b363
PR
1545}
1546
1547int drbd_worker(struct drbd_thread *thi)
1548{
1549 struct drbd_conf *mdev = thi->mdev;
1550 struct drbd_work *w = NULL;
1551 LIST_HEAD(work_list);
1552 int intr = 0, i;
1553
1554 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1555
1556 while (get_t_state(thi) == Running) {
1557 drbd_thread_current_set_cpu(mdev);
1558
1559 if (down_trylock(&mdev->data.work.s)) {
1560 mutex_lock(&mdev->data.mutex);
1561 if (mdev->data.socket && !mdev->net_conf->no_cork)
1562 drbd_tcp_uncork(mdev->data.socket);
1563 mutex_unlock(&mdev->data.mutex);
1564
1565 intr = down_interruptible(&mdev->data.work.s);
1566
1567 mutex_lock(&mdev->data.mutex);
1568 if (mdev->data.socket && !mdev->net_conf->no_cork)
1569 drbd_tcp_cork(mdev->data.socket);
1570 mutex_unlock(&mdev->data.mutex);
1571 }
1572
1573 if (intr) {
1574 D_ASSERT(intr == -EINTR);
1575 flush_signals(current);
1576 ERR_IF (get_t_state(thi) == Running)
1577 continue;
1578 break;
1579 }
1580
1581 if (get_t_state(thi) != Running)
1582 break;
1583 /* With this break, we have done a down() but not consumed
1584 the entry from the list. The cleanup code takes care of
1585 this... */
1586
1587 w = NULL;
1588 spin_lock_irq(&mdev->data.work.q_lock);
1589 ERR_IF(list_empty(&mdev->data.work.q)) {
1590 /* something terribly wrong in our logic.
1591 * we were able to down() the semaphore,
1592 * but the list is empty... doh.
1593 *
1594 * what is the best thing to do now?
1595 * try again from scratch, restarting the receiver,
1596 * asender, whatnot? could break even more ugly,
1597 * e.g. when we are primary, but no good local data.
1598 *
1599 * I'll try to get away just starting over this loop.
1600 */
1601 spin_unlock_irq(&mdev->data.work.q_lock);
1602 continue;
1603 }
1604 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1605 list_del_init(&w->list);
1606 spin_unlock_irq(&mdev->data.work.q_lock);
1607
1608 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1609 /* dev_warn(DEV, "worker: a callback failed! \n"); */
1610 if (mdev->state.conn >= C_CONNECTED)
1611 drbd_force_state(mdev,
1612 NS(conn, C_NETWORK_FAILURE));
1613 }
1614 }
1615 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1616 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1617
1618 spin_lock_irq(&mdev->data.work.q_lock);
1619 i = 0;
1620 while (!list_empty(&mdev->data.work.q)) {
1621 list_splice_init(&mdev->data.work.q, &work_list);
1622 spin_unlock_irq(&mdev->data.work.q_lock);
1623
1624 while (!list_empty(&work_list)) {
1625 w = list_entry(work_list.next, struct drbd_work, list);
1626 list_del_init(&w->list);
1627 w->cb(mdev, w, 1);
1628 i++; /* dead debugging code */
1629 }
1630
1631 spin_lock_irq(&mdev->data.work.q_lock);
1632 }
1633 sema_init(&mdev->data.work.s, 0);
1634 /* DANGEROUS race: if someone did queue his work within the spinlock,
1635 * but up() ed outside the spinlock, we could get an up() on the
1636 * semaphore without corresponding list entry.
1637 * So don't do that.
1638 */
1639 spin_unlock_irq(&mdev->data.work.q_lock);
1640
1641 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1642 /* _drbd_set_state only uses stop_nowait.
1643 * wait here for the Exiting receiver. */
1644 drbd_thread_stop(&mdev->receiver);
1645 drbd_mdev_cleanup(mdev);
1646
1647 dev_info(DEV, "worker terminated\n");
1648
1649 clear_bit(DEVICE_DYING, &mdev->flags);
1650 clear_bit(CONFIG_PENDING, &mdev->flags);
1651 wake_up(&mdev->state_wait);
1652
1653 return 0;
1654}
This page took 0.137686 seconds and 5 git commands to generate.