drbd: Add interval tree data structure
drivers/block/drbd/drbd_worker.c
1/*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_req.h"
40
41static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42static int w_make_resync_request(struct drbd_conf *mdev,
43 struct drbd_work *w, int cancel);
44
45
46
47/* endio handlers:
48 * drbd_md_io_complete (defined here)
49 * drbd_endio_pri (defined here)
50 * drbd_endio_sec (defined here)
51 * bm_async_io_complete (defined in drbd_bitmap.c)
52 *
53 * For all these callbacks, note the following:
54 * The callbacks will be called in irq context by the IDE drivers,
55 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
56 * Try to get the locking right :)
57 *
58 */
59
60
61/* About the global_state_lock
62 Each state transition on a device holds a read lock. In case we have
63 to evaluate the sync after dependencies, we grab a write lock, because
64 we need stable states on all devices for that. */
65rwlock_t global_state_lock;
66
67/* used for synchronous meta data and bitmap IO
68 * submitted by drbd_md_sync_page_io()
69 */
70void drbd_md_io_complete(struct bio *bio, int error)
71{
72 struct drbd_md_io *md_io;
73
74 md_io = (struct drbd_md_io *)bio->bi_private;
75 md_io->error = error;
76
77 complete(&md_io->event);
78}
79
80/* reads on behalf of the partner,
81 * "submitted" by the receiver
82 */
83void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
84{
85 unsigned long flags = 0;
86 struct drbd_conf *mdev = e->mdev;
87
88 spin_lock_irqsave(&mdev->req_lock, flags);
89 mdev->read_cnt += e->size >> 9;
90 list_del(&e->w.list);
91 if (list_empty(&mdev->read_ee))
92 wake_up(&mdev->ee_wait);
93 if (test_bit(__EE_WAS_ERROR, &e->flags))
94 __drbd_chk_io_error(mdev, false);
95 spin_unlock_irqrestore(&mdev->req_lock, flags);
96
97 drbd_queue_work(&mdev->data.work, &e->w);
98 put_ldev(mdev);
99}
100
101/* writes on behalf of the partner, or resync writes,
102 * "submitted" by the receiver, final stage. */
103static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
104{
105 unsigned long flags = 0;
106 struct drbd_conf *mdev = e->mdev;
107 sector_t e_sector;
108 int do_wake;
109 u64 block_id;
110 int do_al_complete_io;
111
112 /* after we moved e to done_ee,
113 * we may no longer access it,
114 * it may be freed/reused already!
115 * (as soon as we release the req_lock) */
116 e_sector = e->sector;
117 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
118 block_id = e->block_id;
119
120 spin_lock_irqsave(&mdev->req_lock, flags);
121 mdev->writ_cnt += e->size >> 9;
122 list_del(&e->w.list); /* has been on active_ee or sync_ee */
123 list_add_tail(&e->w.list, &mdev->done_ee);
124
125 /* No hlist_del_init(&e->collision) here, we did not send the Ack yet,
126 * neither did we wake possibly waiting conflicting requests.
127 * done from "drbd_process_done_ee" within the appropriate w.cb
128 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
129
130 do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
131
132 if (test_bit(__EE_WAS_ERROR, &e->flags))
133 __drbd_chk_io_error(mdev, false);
134 spin_unlock_irqrestore(&mdev->req_lock, flags);
135
136 if (block_id == ID_SYNCER)
137 drbd_rs_complete_io(mdev, e_sector);
138
139 if (do_wake)
140 wake_up(&mdev->ee_wait);
141
142 if (do_al_complete_io)
143 drbd_al_complete_io(mdev, e_sector);
144
145 wake_asender(mdev);
146 put_ldev(mdev);
147}
148
149/* writes on behalf of the partner, or resync writes,
150 * "submitted" by the receiver.
151 */
152void drbd_endio_sec(struct bio *bio, int error)
153{
154 struct drbd_epoch_entry *e = bio->bi_private;
155 struct drbd_conf *mdev = e->mdev;
156 int uptodate = bio_flagged(bio, BIO_UPTODATE);
157 int is_write = bio_data_dir(bio) == WRITE;
158
159 if (error && __ratelimit(&drbd_ratelimit_state))
160 dev_warn(DEV, "%s: error=%d s=%llus\n",
161 is_write ? "write" : "read", error,
162 (unsigned long long)e->sector);
163 if (!error && !uptodate) {
164 if (__ratelimit(&drbd_ratelimit_state))
165 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
166 is_write ? "write" : "read",
167 (unsigned long long)e->sector);
168 /* strange behavior of some lower level drivers...
169 * fail the request by clearing the uptodate flag,
170 * but do not return any error?! */
171 error = -EIO;
172 }
173
174 if (error)
175 set_bit(__EE_WAS_ERROR, &e->flags);
176
177 bio_put(bio); /* no need for the bio anymore */
178 if (atomic_dec_and_test(&e->pending_bios)) {
179 if (is_write)
180 drbd_endio_write_sec_final(e);
181 else
182 drbd_endio_read_sec_final(e);
183 }
184}
185
186/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
187 */
188void drbd_endio_pri(struct bio *bio, int error)
189{
190 unsigned long flags;
191 struct drbd_request *req = bio->bi_private;
192 struct drbd_conf *mdev = req->mdev;
193 struct bio_and_error m;
194 enum drbd_req_event what;
195 int uptodate = bio_flagged(bio, BIO_UPTODATE);
196
197 if (!error && !uptodate) {
198 dev_warn(DEV, "p %s: setting error to -EIO\n",
199 bio_data_dir(bio) == WRITE ? "write" : "read");
200 /* strange behavior of some lower level drivers...
201 * fail the request by clearing the uptodate flag,
202 * but do not return any error?! */
203 error = -EIO;
204 }
205
206 /* to avoid recursion in __req_mod */
207 if (unlikely(error)) {
208 what = (bio_data_dir(bio) == WRITE)
209 ? write_completed_with_error
210 : (bio_rw(bio) == READ)
211 ? read_completed_with_error
212 : read_ahead_completed_with_error;
213 } else
214 what = completed_ok;
215
216 bio_put(req->private_bio);
217 req->private_bio = ERR_PTR(error);
218
219 /* not req_mod(), we need irqsave here! */
220 spin_lock_irqsave(&mdev->req_lock, flags);
221 __req_mod(req, what, &m);
222 spin_unlock_irqrestore(&mdev->req_lock, flags);
223
224 if (m.bio)
225 complete_master_bio(mdev, &m);
226}
227
228int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
229{
230 struct drbd_request *req = container_of(w, struct drbd_request, w);
231
232 /* We should not detach for read io-error,
233 * but try to WRITE the P_DATA_REPLY to the failed location,
234 * to give the disk the chance to relocate that block */
235
236 spin_lock_irq(&mdev->req_lock);
237 if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
238 _req_mod(req, read_retry_remote_canceled);
239 spin_unlock_irq(&mdev->req_lock);
240 return 1;
241 }
242 spin_unlock_irq(&mdev->req_lock);
243
244 return w_send_read_req(mdev, w, 0);
245}
246
247void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
248{
249 struct hash_desc desc;
250 struct scatterlist sg;
251 struct page *page = e->pages;
252 struct page *tmp;
253 unsigned len;
254
255 desc.tfm = tfm;
256 desc.flags = 0;
257
258 sg_init_table(&sg, 1);
259 crypto_hash_init(&desc);
260
261 while ((tmp = page_chain_next(page))) {
262 /* all but the last page will be fully used */
263 sg_set_page(&sg, page, PAGE_SIZE, 0);
264 crypto_hash_update(&desc, &sg, sg.length);
265 page = tmp;
266 }
267 /* and now the last, possibly only partially used page */
268 len = e->size & (PAGE_SIZE - 1);
269 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
270 crypto_hash_update(&desc, &sg, sg.length);
271 crypto_hash_final(&desc, digest);
272}
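/*
 * Note on drbd_csum_ee() above: every page of the page chain except the
 * last one is hashed at full PAGE_SIZE; the last page is hashed with
 * len = e->size & (PAGE_SIZE - 1), and the "len ?: PAGE_SIZE" fallback
 * covers the case where e->size is an exact multiple of PAGE_SIZE, so
 * that final page is then hashed in full as well.
 */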
273
274void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
275{
276 struct hash_desc desc;
277 struct scatterlist sg;
278 struct bio_vec *bvec;
279 int i;
280
281 desc.tfm = tfm;
282 desc.flags = 0;
283
284 sg_init_table(&sg, 1);
285 crypto_hash_init(&desc);
286
287 __bio_for_each_segment(bvec, bio, i, 0) {
288 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
289 crypto_hash_update(&desc, &sg, sg.length);
290 }
291 crypto_hash_final(&desc, digest);
292}
293
294/* TODO merge common code with w_e_end_ov_req */
295int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
296{
297 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
298 int digest_size;
299 void *digest;
300 int ok = 1;
301
302 if (unlikely(cancel))
303 goto out;
304
305 if (likely((e->flags & EE_WAS_ERROR) != 0))
306 goto out;
307
308 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
309 digest = kmalloc(digest_size, GFP_NOIO);
310 if (digest) {
311 sector_t sector = e->sector;
312 unsigned int size = e->size;
313 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
314 /* Free e and pages before send.
315 * In case we block on congestion, we could otherwise run into
316 * some distributed deadlock, if the other side blocks on
317 * congestion as well, because our receiver blocks in
318 * drbd_pp_alloc due to pp_in_use > max_buffers. */
319 drbd_free_ee(mdev, e);
320 e = NULL;
321 inc_rs_pending(mdev);
322 ok = drbd_send_drequest_csum(mdev, sector, size,
323 digest, digest_size,
324 P_CSUM_RS_REQUEST);
325 kfree(digest);
326 } else {
327 dev_err(DEV, "kmalloc() of digest failed.\n");
328 ok = 0;
329 }
330
331out:
332 if (e)
333 drbd_free_ee(mdev, e);
334
335 if (unlikely(!ok))
336 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
337 return ok;
338}
339
340#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
341
342static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
343{
344 struct drbd_epoch_entry *e;
345
346 if (!get_ldev(mdev))
347 return -EIO;
348
349 if (drbd_rs_should_slow_down(mdev, sector))
350 goto defer;
351
352 /* GFP_TRY, because if there is no memory available right now, this may
353 * be rescheduled for later. It is "only" background resync, after all. */
354 e = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
355 if (!e)
356 goto defer;
357
358 e->w.cb = w_e_send_csum;
359 spin_lock_irq(&mdev->req_lock);
360 list_add(&e->w.list, &mdev->read_ee);
361 spin_unlock_irq(&mdev->req_lock);
362
363 atomic_add(size >> 9, &mdev->rs_sect_ev);
364 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
365 return 0;
366
367 /* If it failed because of ENOMEM, retry should help. If it failed
368 * because bio_add_page failed (probably broken lower level driver),
369 * retry may or may not help.
370 * If it does not, you may need to force disconnect. */
371 spin_lock_irq(&mdev->req_lock);
372 list_del(&e->w.list);
373 spin_unlock_irq(&mdev->req_lock);
374
375 drbd_free_ee(mdev, e);
376defer:
377 put_ldev(mdev);
378 return -EAGAIN;
379}
380
381int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
382{
383 switch (mdev->state.conn) {
384 case C_VERIFY_S:
385 w_make_ov_request(mdev, w, cancel);
386 break;
387 case C_SYNC_TARGET:
388 w_make_resync_request(mdev, w, cancel);
389 break;
390 }
391
392 return 1;
393}
394
395void resync_timer_fn(unsigned long data)
396{
397 struct drbd_conf *mdev = (struct drbd_conf *) data;
398
399 if (list_empty(&mdev->resync_work.list))
400 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
401}
402
403static void fifo_set(struct fifo_buffer *fb, int value)
404{
405 int i;
406
407 for (i = 0; i < fb->size; i++)
408 fb->values[i] = value;
409}
410
411static int fifo_push(struct fifo_buffer *fb, int value)
412{
413 int ov;
414
415 ov = fb->values[fb->head_index];
416 fb->values[fb->head_index++] = value;
417
418 if (fb->head_index >= fb->size)
419 fb->head_index = 0;
420
421 return ov;
422}
423
424static void fifo_add_val(struct fifo_buffer *fb, int value)
425{
426 int i;
427
428 for (i = 0; i < fb->size; i++)
429 fb->values[i] += value;
430}
431
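/*
 * Note on drbd_rs_controller() below (illustrative numbers, not from the
 * original source): the controller tries to keep "want" sectors on the
 * wire towards the peer.  With steps == 10, want == 4000, rs_in_flight ==
 * 1000 and rs_planed == 2000, the correction is 1000 sectors; cps == 100
 * is smeared over all ten plan slots by fifo_add_val(), the slot returned
 * by fifo_push() becomes curr_corr for this step, and the request amount
 * is req_sect = sect_in + curr_corr, clamped to [0, max_sect].
 */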
432static int drbd_rs_controller(struct drbd_conf *mdev)
433{
434 unsigned int sect_in; /* Number of sectors that came in since the last turn */
435 unsigned int want; /* The number of sectors we want in the proxy */
436 int req_sect; /* Number of sectors to request in this turn */
437 int correction; /* Number of sectors more we need in the proxy*/
438 int cps; /* correction per invocation of drbd_rs_controller() */
439 int steps; /* Number of time steps to plan ahead */
440 int curr_corr;
441 int max_sect;
442
443 sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
444 mdev->rs_in_flight -= sect_in;
445
446 spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
447
448 steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
449
450 if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
451 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
452 } else { /* normal path */
453 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
454 sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
455 }
456
457 correction = want - mdev->rs_in_flight - mdev->rs_planed;
458
459 /* Plan ahead */
460 cps = correction / steps;
461 fifo_add_val(&mdev->rs_plan_s, cps);
462 mdev->rs_planed += cps * steps;
463
464 /* What we do in this step */
465 curr_corr = fifo_push(&mdev->rs_plan_s, 0);
466 spin_unlock(&mdev->peer_seq_lock);
467 mdev->rs_planed -= curr_corr;
468
469 req_sect = sect_in + curr_corr;
470 if (req_sect < 0)
471 req_sect = 0;
472
473 max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
474 if (req_sect > max_sect)
475 req_sect = max_sect;
476
477 /*
478 dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
479 sect_in, mdev->rs_in_flight, want, correction,
480 steps, cps, mdev->rs_planed, curr_corr, req_sect);
481 */
482
483 return req_sect;
484}
485
486static int drbd_rs_number_requests(struct drbd_conf *mdev)
487{
488 int number;
489 if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
490 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
491 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
492 } else {
493 mdev->c_sync_rate = mdev->sync_conf.rate;
494 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
495 }
496
497 /* ignore the amount of pending requests, the resync controller should
498 * throttle down to incoming reply rate soon enough anyways. */
499 return number;
500}
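/*
 * Note on drbd_rs_number_requests() above: "number" counts BM_BLOCK_SIZE
 * (4 KiB) resync requests per wakeup.  Worked example for the fixed-rate
 * branch, assuming SLEEP_TIME is HZ/10 and a configured rate of 10240
 * (KiB/s):  number = (HZ/10) * 10240 / ((4096/1024) * HZ) = 256,
 * i.e. 256 * 4 KiB = 1 MiB queued per step, ten steps per second,
 * matching the requested 10 MiB/s.
 */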
501
502static int w_make_resync_request(struct drbd_conf *mdev,
503 struct drbd_work *w, int cancel)
504{
505 unsigned long bit;
506 sector_t sector;
507 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
508 int max_bio_size;
509 int number, rollback_i, size;
510 int align, queued, sndbuf;
511 int i = 0;
512
513 if (unlikely(cancel))
514 return 1;
515
516 if (mdev->rs_total == 0) {
517 /* empty resync? */
518 drbd_resync_finished(mdev);
519 return 1;
520 }
521
522 if (!get_ldev(mdev)) {
523 /* Since we only need to access mdev->rsync a
524 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
525 to continue resync with a broken disk makes no sense at
526 all */
527 dev_err(DEV, "Disk broke down during resync!\n");
528 return 1;
529 }
530
531 max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
532 number = drbd_rs_number_requests(mdev);
533 if (number == 0)
534 goto requeue;
535
536 for (i = 0; i < number; i++) {
537 /* Stop generating RS requests, when half of the send buffer is filled */
538 mutex_lock(&mdev->data.mutex);
539 if (mdev->data.socket) {
540 queued = mdev->data.socket->sk->sk_wmem_queued;
541 sndbuf = mdev->data.socket->sk->sk_sndbuf;
542 } else {
543 queued = 1;
544 sndbuf = 0;
545 }
546 mutex_unlock(&mdev->data.mutex);
547 if (queued > sndbuf / 2)
548 goto requeue;
549
550next_sector:
551 size = BM_BLOCK_SIZE;
552 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
553
554 if (bit == DRBD_END_OF_BITMAP) {
555 mdev->bm_resync_fo = drbd_bm_bits(mdev);
556 put_ldev(mdev);
557 return 1;
558 }
559
560 sector = BM_BIT_TO_SECT(bit);
561
562 if (drbd_rs_should_slow_down(mdev, sector) ||
563 drbd_try_rs_begin_io(mdev, sector)) {
564 mdev->bm_resync_fo = bit;
565 goto requeue;
566 }
567 mdev->bm_resync_fo = bit + 1;
568
569 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
570 drbd_rs_complete_io(mdev, sector);
571 goto next_sector;
572 }
573
574#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
575 /* try to find some adjacent bits.
576 * we stop if we have already the maximum req size.
577 *
578 * Additionally always align bigger requests, in order to
579 * be prepared for all stripe sizes of software RAIDs.
580 */
581 align = 1;
582 rollback_i = i;
583 for (;;) {
584 if (size + BM_BLOCK_SIZE > max_bio_size)
585 break;
586
587 /* Be always aligned */
588 if (sector & ((1<<(align+3))-1))
589 break;
590
591 /* do not cross extent boundaries */
592 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
593 break;
594 /* now, is it actually dirty, after all?
595 * caution, drbd_bm_test_bit is tri-state for some
596 * obscure reason; ( b == 0 ) would get the out-of-band
597 * only accidentally right because of the "oddly sized"
598 * adjustment below */
599 if (drbd_bm_test_bit(mdev, bit+1) != 1)
600 break;
601 bit++;
602 size += BM_BLOCK_SIZE;
603 if ((BM_BLOCK_SIZE << align) <= size)
604 align++;
605 i++;
606 }
607 /* if we merged some,
608 * reset the offset to start the next drbd_bm_find_next from */
609 if (size > BM_BLOCK_SIZE)
610 mdev->bm_resync_fo = bit + 1;
611#endif
612
613 /* adjust very last sectors, in case we are oddly sized */
614 if (sector + (size>>9) > capacity)
615 size = (capacity-sector)<<9;
616 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
617 switch (read_for_csum(mdev, sector, size)) {
618 case -EIO: /* Disk failure */
619 put_ldev(mdev);
620 return 0;
621 case -EAGAIN: /* allocation failed, or ldev busy */
622 drbd_rs_complete_io(mdev, sector);
623 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
624 i = rollback_i;
625 goto requeue;
626 case 0:
627 /* everything ok */
628 break;
629 default:
630 BUG();
631 }
632 } else {
633 inc_rs_pending(mdev);
634 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
635 sector, size, ID_SYNCER)) {
636 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
637 dec_rs_pending(mdev);
638 put_ldev(mdev);
639 return 0;
640 }
641 }
642 }
643
644 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
645 /* last syncer _request_ was sent,
646 * but the P_RS_DATA_REPLY not yet received. sync will end (and
647 * next sync group will resume), as soon as we receive the last
648 * resync data block, and the last bit is cleared.
649 * until then resync "work" is "inactive" ...
650 */
651 put_ldev(mdev);
652 return 1;
653 }
654
655 requeue:
656 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
657 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
658 put_ldev(mdev);
659 return 1;
660}
661
662static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
663{
664 int number, i, size;
665 sector_t sector;
666 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
667
668 if (unlikely(cancel))
669 return 1;
670
671 number = drbd_rs_number_requests(mdev);
672
673 sector = mdev->ov_position;
674 for (i = 0; i < number; i++) {
675 if (sector >= capacity) {
676 return 1;
677 }
678
679 size = BM_BLOCK_SIZE;
680
681 if (drbd_rs_should_slow_down(mdev, sector) ||
682 drbd_try_rs_begin_io(mdev, sector)) {
683 mdev->ov_position = sector;
684 goto requeue;
685 }
686
687 if (sector + (size>>9) > capacity)
688 size = (capacity-sector)<<9;
689
690 inc_rs_pending(mdev);
691 if (!drbd_send_ov_request(mdev, sector, size)) {
692 dec_rs_pending(mdev);
693 return 0;
694 }
695 sector += BM_SECT_PER_BIT;
696 }
697 mdev->ov_position = sector;
698
699 requeue:
700 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
701 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
702 return 1;
703}
704
705
706void start_resync_timer_fn(unsigned long data)
707{
708 struct drbd_conf *mdev = (struct drbd_conf *) data;
709
710 drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
711}
712
713int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
714{
715 if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
716 dev_warn(DEV, "w_start_resync later...\n");
717 mdev->start_resync_timer.expires = jiffies + HZ/10;
718 add_timer(&mdev->start_resync_timer);
719 return 1;
720 }
721
722 drbd_start_resync(mdev, C_SYNC_SOURCE);
723 clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
724 return 1;
725}
726
727int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
728{
729 kfree(w);
730 ov_oos_print(mdev);
731 drbd_resync_finished(mdev);
732
733 return 1;
734}
735
736static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
737{
738 kfree(w);
739
740 drbd_resync_finished(mdev);
741
742 return 1;
743}
744
745static void ping_peer(struct drbd_conf *mdev)
746{
747 clear_bit(GOT_PING_ACK, &mdev->flags);
748 request_ping(mdev);
749 wait_event(mdev->misc_wait,
750 test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
751}
752
753int drbd_resync_finished(struct drbd_conf *mdev)
754{
755 unsigned long db, dt, dbdt;
756 unsigned long n_oos;
757 union drbd_state os, ns;
758 struct drbd_work *w;
759 char *khelper_cmd = NULL;
760 int verify_done = 0;
761
762 /* Remove all elements from the resync LRU. Since future actions
763 * might set bits in the (main) bitmap, then the entries in the
764 * resync LRU would be wrong. */
765 if (drbd_rs_del_all(mdev)) {
766 /* In case this is not possible now, most probably because
767 * there are P_RS_DATA_REPLY Packets lingering on the worker's
768 * queue (or even the read operations for those packets
769 * is not finished by now). Retry in 100ms. */
770
771 schedule_timeout_interruptible(HZ / 10);
772 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
773 if (w) {
774 w->cb = w_resync_finished;
775 drbd_queue_work(&mdev->data.work, w);
776 return 1;
777 }
778 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
779 }
780
781 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
782 if (dt <= 0)
783 dt = 1;
784 db = mdev->rs_total;
785 dbdt = Bit2KB(db/dt);
786 mdev->rs_paused /= HZ;
787
788 if (!get_ldev(mdev))
789 goto out;
790
791 ping_peer(mdev);
792
793 spin_lock_irq(&mdev->req_lock);
794 os = mdev->state;
795
796 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
797
798 /* This protects us against multiple calls (that can happen in the presence
799 of application IO), and against connectivity loss just before we arrive here. */
800 if (os.conn <= C_CONNECTED)
801 goto out_unlock;
802
803 ns = os;
804 ns.conn = C_CONNECTED;
805
806 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
807 verify_done ? "Online verify " : "Resync",
808 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
809
810 n_oos = drbd_bm_total_weight(mdev);
811
812 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
813 if (n_oos) {
814 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
815 n_oos, Bit2KB(1));
816 khelper_cmd = "out-of-sync";
817 }
818 } else {
819 D_ASSERT((n_oos - mdev->rs_failed) == 0);
820
821 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
822 khelper_cmd = "after-resync-target";
823
824 if (mdev->csums_tfm && mdev->rs_total) {
825 const unsigned long s = mdev->rs_same_csum;
826 const unsigned long t = mdev->rs_total;
827 const int ratio =
828 (t == 0) ? 0 :
829 (t < 100000) ? ((s*100)/t) : (s/(t/100));
830 dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
831 "transferred %luK total %luK\n",
832 ratio,
833 Bit2KB(mdev->rs_same_csum),
834 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
835 Bit2KB(mdev->rs_total));
836 }
837 }
838
839 if (mdev->rs_failed) {
840 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
841
842 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
843 ns.disk = D_INCONSISTENT;
844 ns.pdsk = D_UP_TO_DATE;
845 } else {
846 ns.disk = D_UP_TO_DATE;
847 ns.pdsk = D_INCONSISTENT;
848 }
849 } else {
850 ns.disk = D_UP_TO_DATE;
851 ns.pdsk = D_UP_TO_DATE;
852
853 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
854 if (mdev->p_uuid) {
855 int i;
856 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
857 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
858 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
859 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
860 } else {
861 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
862 }
863 }
864
865 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
866 /* for verify runs, we don't update uuids here,
867 * so there would be nothing to report. */
868 drbd_uuid_set_bm(mdev, 0UL);
869 drbd_print_uuids(mdev, "updated UUIDs");
870 if (mdev->p_uuid) {
871 /* Now the two UUID sets are equal, update what we
872 * know of the peer. */
873 int i;
874 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
875 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
876 }
877 }
878 }
879
880 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
881out_unlock:
882 spin_unlock_irq(&mdev->req_lock);
883 put_ldev(mdev);
884out:
885 mdev->rs_total = 0;
886 mdev->rs_failed = 0;
887 mdev->rs_paused = 0;
888 if (verify_done)
889 mdev->ov_start_sector = 0;
890
891 drbd_md_sync(mdev);
892
893 if (khelper_cmd)
894 drbd_khelper(mdev, khelper_cmd);
895
896 return 1;
897}
898
899/* helper */
900static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
901{
902 if (drbd_ee_has_active_page(e)) {
903 /* This might happen if sendpage() has not finished */
904 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
905 atomic_add(i, &mdev->pp_in_use_by_net);
906 atomic_sub(i, &mdev->pp_in_use);
907 spin_lock_irq(&mdev->req_lock);
908 list_add_tail(&e->w.list, &mdev->net_ee);
909 spin_unlock_irq(&mdev->req_lock);
910 wake_up(&drbd_pp_wait);
911 } else
912 drbd_free_ee(mdev, e);
913}
914
915/**
916 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
917 * @mdev: DRBD device.
918 * @w: work object.
919 * @cancel: The connection will be closed anyways
920 */
921int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
922{
923 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
924 int ok;
925
926 if (unlikely(cancel)) {
927 drbd_free_ee(mdev, e);
928 dec_unacked(mdev);
929 return 1;
930 }
931
932 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
933 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
934 } else {
935 if (__ratelimit(&drbd_ratelimit_state))
936 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
937 (unsigned long long)e->sector);
938
939 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
940 }
941
942 dec_unacked(mdev);
943
944 move_to_net_ee_or_free(mdev, e);
945
946 if (unlikely(!ok))
947 dev_err(DEV, "drbd_send_block() failed\n");
948 return ok;
949}
950
951/**
952 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
953 * @mdev: DRBD device.
954 * @w: work object.
955 * @cancel: The connection will be closed anyways
956 */
957int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
958{
959 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
960 int ok;
961
962 if (unlikely(cancel)) {
963 drbd_free_ee(mdev, e);
964 dec_unacked(mdev);
965 return 1;
966 }
967
968 if (get_ldev_if_state(mdev, D_FAILED)) {
969 drbd_rs_complete_io(mdev, e->sector);
970 put_ldev(mdev);
971 }
972
973 if (mdev->state.conn == C_AHEAD) {
974 ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
975 } else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
976 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
977 inc_rs_pending(mdev);
978 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
979 } else {
980 if (__ratelimit(&drbd_ratelimit_state))
981 dev_err(DEV, "Not sending RSDataReply, "
982 "partner DISKLESS!\n");
983 ok = 1;
984 }
985 } else {
986 if (__ratelimit(&drbd_ratelimit_state))
987 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
988 (unsigned long long)e->sector);
989
990 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
991
992 /* update resync data with failure */
993 drbd_rs_failed_io(mdev, e->sector, e->size);
994 }
995
996 dec_unacked(mdev);
997
998 move_to_net_ee_or_free(mdev, e);
999
1000 if (unlikely(!ok))
1001 dev_err(DEV, "drbd_send_block() failed\n");
1002 return ok;
1003}
1004
1005int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1006{
1007 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1008 struct digest_info *di;
1009 int digest_size;
1010 void *digest = NULL;
1011 int ok, eq = 0;
1012
1013 if (unlikely(cancel)) {
1014 drbd_free_ee(mdev, e);
1015 dec_unacked(mdev);
1016 return 1;
1017 }
1018
1019 if (get_ldev(mdev)) {
1020 drbd_rs_complete_io(mdev, e->sector);
1021 put_ldev(mdev);
1022 }
1023
1024 di = e->digest;
1025
1026 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1027 /* quick hack to try to avoid a race against reconfiguration.
1028 * a real fix would be much more involved,
1029 * introducing more locking mechanisms */
1030 if (mdev->csums_tfm) {
1031 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1032 D_ASSERT(digest_size == di->digest_size);
1033 digest = kmalloc(digest_size, GFP_NOIO);
1034 }
1035 if (digest) {
1036 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1037 eq = !memcmp(digest, di->digest, digest_size);
1038 kfree(digest);
1039 }
1040
1041 if (eq) {
1042 drbd_set_in_sync(mdev, e->sector, e->size);
1043 /* rs_same_csums unit is BM_BLOCK_SIZE */
1044 mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1045 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1046 } else {
1047 inc_rs_pending(mdev);
1048 e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1049 e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1050 kfree(di);
1051 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1052 }
1053 } else {
1054 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1055 if (__ratelimit(&drbd_ratelimit_state))
1056 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1057 }
1058
1059 dec_unacked(mdev);
1060 move_to_net_ee_or_free(mdev, e);
1061
1062 if (unlikely(!ok))
1063 dev_err(DEV, "drbd_send_block/ack() failed\n");
1064 return ok;
1065}
1066
1067/* TODO merge common code with w_e_send_csum */
1068int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1069{
1070 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1071 sector_t sector = e->sector;
1072 unsigned int size = e->size;
1073 int digest_size;
1074 void *digest;
1075 int ok = 1;
1076
1077 if (unlikely(cancel))
1078 goto out;
1079
1080 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1081 digest = kmalloc(digest_size, GFP_NOIO);
1082 if (!digest) {
1083 ok = 0; /* terminate the connection in case the allocation failed */
1084 goto out;
b411b363
PR
1085 }
1086
1087 if (likely(!(e->flags & EE_WAS_ERROR)))
1088 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1089 else
1090 memset(digest, 0, digest_size);
1091
1092 /* Free e and pages before send.
1093 * In case we block on congestion, we could otherwise run into
1094 * some distributed deadlock, if the other side blocks on
1095 * congestion as well, because our receiver blocks in
1096 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1097 drbd_free_ee(mdev, e);
1098 e = NULL;
1099 inc_rs_pending(mdev);
1100 ok = drbd_send_drequest_csum(mdev, sector, size,
1101 digest, digest_size,
1102 P_OV_REPLY);
1103 if (!ok)
1104 dec_rs_pending(mdev);
1105 kfree(digest);
1106
1107out:
1108 if (e)
1109 drbd_free_ee(mdev, e);
1110 dec_unacked(mdev);
1111 return ok;
1112}
1113
1114void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1115{
1116 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1117 mdev->ov_last_oos_size += size>>9;
1118 } else {
1119 mdev->ov_last_oos_start = sector;
1120 mdev->ov_last_oos_size = size>>9;
1121 }
1122 drbd_set_out_of_sync(mdev, sector, size);
1123}
1124
1125int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1126{
1127 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1128 struct digest_info *di;
1129 void *digest;
1130 sector_t sector = e->sector;
1131 unsigned int size = e->size;
1132 int digest_size;
1133 int ok, eq = 0;
1134
1135 if (unlikely(cancel)) {
1136 drbd_free_ee(mdev, e);
1137 dec_unacked(mdev);
1138 return 1;
1139 }
1140
1141 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1142 * the resync lru has been cleaned up already */
1143 if (get_ldev(mdev)) {
1144 drbd_rs_complete_io(mdev, e->sector);
1145 put_ldev(mdev);
1146 }
1147
1148 di = e->digest;
1149
1150 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1151 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1152 digest = kmalloc(digest_size, GFP_NOIO);
1153 if (digest) {
1154 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1155
1156 D_ASSERT(digest_size == di->digest_size);
1157 eq = !memcmp(digest, di->digest, digest_size);
1158 kfree(digest);
1159 }
1160 }
1161
1162 /* Free e and pages before send.
1163 * In case we block on congestion, we could otherwise run into
1164 * some distributed deadlock, if the other side blocks on
1165 * congestion as well, because our receiver blocks in
1166 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1167 drbd_free_ee(mdev, e);
1168 if (!eq)
1169 drbd_ov_oos_found(mdev, sector, size);
1170 else
1171 ov_oos_print(mdev);
1172
1173 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1174 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1175
1176 dec_unacked(mdev);
1177
1178 --mdev->ov_left;
1179
1180 /* let's advance progress step marks only for every other megabyte */
1181 if ((mdev->ov_left & 0x200) == 0x200)
1182 drbd_advance_rs_marks(mdev, mdev->ov_left);
1183
1184 if (mdev->ov_left == 0) {
1185 ov_oos_print(mdev);
1186 drbd_resync_finished(mdev);
1187 }
1188
1189 return ok;
1190}
1191
1192int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1193{
1194 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1195 complete(&b->done);
1196 return 1;
1197}
1198
1199int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1200{
1201 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1202 struct p_barrier *p = &mdev->data.sbuf.barrier;
1203 int ok = 1;
1204
1205 /* really avoid racing with tl_clear. w.cb may have been referenced
1206 * just before it was reassigned and re-queued, so double check that.
1207 * actually, this race was harmless, since we only try to send the
1208 * barrier packet here, and otherwise do nothing with the object.
1209 * but compare with the head of w_clear_epoch */
1210 spin_lock_irq(&mdev->req_lock);
1211 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1212 cancel = 1;
1213 spin_unlock_irq(&mdev->req_lock);
1214 if (cancel)
1215 return 1;
1216
1217 if (!drbd_get_data_sock(mdev))
1218 return 0;
1219 p->barrier = b->br_number;
1220 /* inc_ap_pending was done where this was queued.
1221 * dec_ap_pending will be done in got_BarrierAck
1222 * or (on connection loss) in w_clear_epoch. */
1223 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1224 (struct p_header80 *)p, sizeof(*p), 0);
1225 drbd_put_data_sock(mdev);
1226
1227 return ok;
1228}
1229
1230int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1231{
1232 if (cancel)
1233 return 1;
1234 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1235}
1236
1237int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1238{
1239 struct drbd_request *req = container_of(w, struct drbd_request, w);
1240 int ok;
1241
1242 if (unlikely(cancel)) {
1243 req_mod(req, send_canceled);
1244 return 1;
1245 }
1246
1247 ok = drbd_send_oos(mdev, req);
1248 req_mod(req, oos_handed_to_network);
1249
1250 return ok;
1251}
1252
1253/**
1254 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1255 * @mdev: DRBD device.
1256 * @w: work object.
1257 * @cancel: The connection will be closed anyways
1258 */
1259int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1260{
1261 struct drbd_request *req = container_of(w, struct drbd_request, w);
1262 int ok;
1263
1264 if (unlikely(cancel)) {
1265 req_mod(req, send_canceled);
1266 return 1;
1267 }
1268
1269 ok = drbd_send_dblock(mdev, req);
1270 req_mod(req, ok ? handed_over_to_network : send_failed);
1271
1272 return ok;
1273}
1274
1275/**
1276 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1277 * @mdev: DRBD device.
1278 * @w: work object.
1279 * @cancel: The connection will be closed anyways
1280 */
1281int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1282{
1283 struct drbd_request *req = container_of(w, struct drbd_request, w);
1284 int ok;
1285
1286 if (unlikely(cancel)) {
1287 req_mod(req, send_canceled);
1288 return 1;
1289 }
1290
1291 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1292 (unsigned long)req);
1293
1294 if (!ok) {
1295 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1296 * so this is probably redundant */
1297 if (mdev->state.conn >= C_CONNECTED)
1298 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1299 }
1300 req_mod(req, ok ? handed_over_to_network : send_failed);
1301
1302 return ok;
1303}
1304
1305int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1306{
1307 struct drbd_request *req = container_of(w, struct drbd_request, w);
1308
1309 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1310 drbd_al_begin_io(mdev, req->sector);
1311 /* Calling drbd_al_begin_io() out of the worker might deadlock
1312 theoretically. Practically it can not deadlock, since this is
1313 only used when unfreezing IOs. All the extents of the requests
1314 that made it into the TL are already active */
1315
1316 drbd_req_make_private_bio(req, req->master_bio);
1317 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1318 generic_make_request(req->private_bio);
1319
1320 return 1;
1321}
1322
1323static int _drbd_may_sync_now(struct drbd_conf *mdev)
1324{
1325 struct drbd_conf *odev = mdev;
1326
1327 while (1) {
1328 if (odev->sync_conf.after == -1)
1329 return 1;
1330 odev = minor_to_mdev(odev->sync_conf.after);
1331 ERR_IF(!odev) return 1;
1332 if ((odev->state.conn >= C_SYNC_SOURCE &&
1333 odev->state.conn <= C_PAUSED_SYNC_T) ||
1334 odev->state.aftr_isp || odev->state.peer_isp ||
1335 odev->state.user_isp)
1336 return 0;
1337 }
1338}
1339
1340/**
1341 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1342 * @mdev: DRBD device.
1343 *
1344 * Called from process context only (admin command and after_state_ch).
1345 */
1346static int _drbd_pause_after(struct drbd_conf *mdev)
1347{
1348 struct drbd_conf *odev;
1349 int i, rv = 0;
1350
1351 for (i = 0; i < minor_count; i++) {
1352 odev = minor_to_mdev(i);
1353 if (!odev)
1354 continue;
1355 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1356 continue;
1357 if (!_drbd_may_sync_now(odev))
1358 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1359 != SS_NOTHING_TO_DO);
1360 }
1361
1362 return rv;
1363}
1364
1365/**
1366 * _drbd_resume_next() - Resume resync on all devices that may resync now
1367 * @mdev: DRBD device.
1368 *
1369 * Called from process context only (admin command and worker).
1370 */
1371static int _drbd_resume_next(struct drbd_conf *mdev)
1372{
1373 struct drbd_conf *odev;
1374 int i, rv = 0;
1375
1376 for (i = 0; i < minor_count; i++) {
1377 odev = minor_to_mdev(i);
1378 if (!odev)
1379 continue;
1380 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1381 continue;
1382 if (odev->state.aftr_isp) {
1383 if (_drbd_may_sync_now(odev))
1384 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1385 CS_HARD, NULL)
1386 != SS_NOTHING_TO_DO) ;
1387 }
1388 }
1389 return rv;
1390}
1391
1392void resume_next_sg(struct drbd_conf *mdev)
1393{
1394 write_lock_irq(&global_state_lock);
1395 _drbd_resume_next(mdev);
1396 write_unlock_irq(&global_state_lock);
1397}
1398
1399void suspend_other_sg(struct drbd_conf *mdev)
1400{
1401 write_lock_irq(&global_state_lock);
1402 _drbd_pause_after(mdev);
1403 write_unlock_irq(&global_state_lock);
1404}
1405
1406static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1407{
1408 struct drbd_conf *odev;
1409
1410 if (o_minor == -1)
1411 return NO_ERROR;
1412 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1413 return ERR_SYNC_AFTER;
1414
1415 /* check for loops */
1416 odev = minor_to_mdev(o_minor);
1417 while (1) {
1418 if (odev == mdev)
1419 return ERR_SYNC_AFTER_CYCLE;
1420
1421 /* dependency chain ends here, no cycles. */
1422 if (odev->sync_conf.after == -1)
1423 return NO_ERROR;
1424
1425 /* follow the dependency chain */
1426 odev = minor_to_mdev(odev->sync_conf.after);
1427 }
1428}
1429
1430int drbd_alter_sa(struct drbd_conf *mdev, int na)
1431{
1432 int changes;
1433 int retcode;
1434
1435 write_lock_irq(&global_state_lock);
1436 retcode = sync_after_error(mdev, na);
1437 if (retcode == NO_ERROR) {
1438 mdev->sync_conf.after = na;
1439 do {
1440 changes = _drbd_pause_after(mdev);
1441 changes |= _drbd_resume_next(mdev);
1442 } while (changes);
1443 }
1444 write_unlock_irq(&global_state_lock);
1445 return retcode;
1446}
1447
1448void drbd_rs_controller_reset(struct drbd_conf *mdev)
1449{
1450 atomic_set(&mdev->rs_sect_in, 0);
1451 atomic_set(&mdev->rs_sect_ev, 0);
1452 mdev->rs_in_flight = 0;
1453 mdev->rs_planed = 0;
1454 spin_lock(&mdev->peer_seq_lock);
1455 fifo_set(&mdev->rs_plan_s, 0);
1456 spin_unlock(&mdev->peer_seq_lock);
1457}
1458
1459/**
1460 * drbd_start_resync() - Start the resync process
1461 * @mdev: DRBD device.
1462 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1463 *
1464 * This function might bring you directly into one of the
1465 * C_PAUSED_SYNC_* states.
1466 */
1467void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1468{
1469 union drbd_state ns;
1470 int r;
1471
1472 if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1473 dev_err(DEV, "Resync already running!\n");
1474 return;
1475 }
1476
1477 if (mdev->state.conn < C_AHEAD) {
1478 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1479 drbd_rs_cancel_all(mdev);
1480 /* This should be done when we abort the resync. We definitely do not
1481 want to have this for connections going back and forth between
1482 Ahead/Behind and SyncSource/SyncTarget */
1483 }
1484
1485 if (side == C_SYNC_TARGET) {
1486 /* Since application IO was locked out during C_WF_BITMAP_T and
1487 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1488 we check that we might make the data inconsistent. */
1489 r = drbd_khelper(mdev, "before-resync-target");
1490 r = (r >> 8) & 0xff;
1491 if (r > 0) {
1492 dev_info(DEV, "before-resync-target handler returned %d, "
1493 "dropping connection.\n", r);
1494 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1495 return;
1496 }
1497 } else /* C_SYNC_SOURCE */ {
1498 r = drbd_khelper(mdev, "before-resync-source");
1499 r = (r >> 8) & 0xff;
1500 if (r > 0) {
1501 if (r == 3) {
1502 dev_info(DEV, "before-resync-source handler returned %d, "
1503 "ignoring. Old userland tools?", r);
1504 } else {
1505 dev_info(DEV, "before-resync-source handler returned %d, "
1506 "dropping connection.\n", r);
1507 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1508 return;
1509 }
1510 }
1511 }
1512
1513 drbd_state_lock(mdev);
1514
1515 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1516 drbd_state_unlock(mdev);
1517 return;
1518 }
1519
1520 write_lock_irq(&global_state_lock);
1521 ns = mdev->state;
1522
1523 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1524
1525 ns.conn = side;
1526
1527 if (side == C_SYNC_TARGET)
1528 ns.disk = D_INCONSISTENT;
1529 else /* side == C_SYNC_SOURCE */
1530 ns.pdsk = D_INCONSISTENT;
1531
1532 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1533 ns = mdev->state;
1534
1535 if (ns.conn < C_CONNECTED)
1536 r = SS_UNKNOWN_ERROR;
1537
1538 if (r == SS_SUCCESS) {
1539 unsigned long tw = drbd_bm_total_weight(mdev);
1540 unsigned long now = jiffies;
1541 int i;
1542
1543 mdev->rs_failed = 0;
1544 mdev->rs_paused = 0;
1545 mdev->rs_same_csum = 0;
1546 mdev->rs_last_events = 0;
1547 mdev->rs_last_sect_ev = 0;
1548 mdev->rs_total = tw;
1549 mdev->rs_start = now;
1550 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1551 mdev->rs_mark_left[i] = tw;
1552 mdev->rs_mark_time[i] = now;
1553 }
1554 _drbd_pause_after(mdev);
1555 }
1556 write_unlock_irq(&global_state_lock);
1557
1558 if (r == SS_SUCCESS) {
1559 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1560 drbd_conn_str(ns.conn),
1561 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1562 (unsigned long) mdev->rs_total);
1563 if (side == C_SYNC_TARGET)
1564 mdev->bm_resync_fo = 0;
1565
1566 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1567 * with w_send_oos, or the sync target will get confused as to
1568 * how much bits to resync. We cannot do that always, because for an
1569 * empty resync and protocol < 95, we need to do it here, as we call
1570 * drbd_resync_finished from here in that case.
1571 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1572 * and from after_state_ch otherwise. */
1573 if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1574 drbd_gen_and_send_sync_uuid(mdev);
1575
1576 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1577 /* This still has a race (about when exactly the peers
1578 * detect connection loss) that can lead to a full sync
1579 * on next handshake. In 8.3.9 we fixed this with explicit
1580 * resync-finished notifications, but the fix
1581 * introduces a protocol change. Sleeping for some
1582 * time longer than the ping interval + timeout on the
1583 * SyncSource, to give the SyncTarget the chance to
1584 * detect connection loss, then waiting for a ping
1585 * response (implicit in drbd_resync_finished) reduces
1586 * the race considerably, but does not solve it. */
1587 if (side == C_SYNC_SOURCE)
1588 schedule_timeout_interruptible(
1589 mdev->net_conf->ping_int * HZ +
1590 mdev->net_conf->ping_timeo*HZ/9);
1591 drbd_resync_finished(mdev);
1592 }
1593
1594 drbd_rs_controller_reset(mdev);
1595 /* ns.conn may already be != mdev->state.conn,
1596 * we may have been paused in between, or become paused until
1597 * the timer triggers.
1598 * No matter, that is handled in resync_timer_fn() */
1599 if (ns.conn == C_SYNC_TARGET)
1600 mod_timer(&mdev->resync_timer, jiffies);
1601
1602 drbd_md_sync(mdev);
1603 }
1604 put_ldev(mdev);
1605 drbd_state_unlock(mdev);
1606}
1607
1608int drbd_worker(struct drbd_thread *thi)
1609{
1610 struct drbd_conf *mdev = thi->mdev;
1611 struct drbd_work *w = NULL;
1612 LIST_HEAD(work_list);
1613 int intr = 0, i;
1614
1615 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1616
1617 while (get_t_state(thi) == Running) {
1618 drbd_thread_current_set_cpu(mdev);
1619
1620 if (down_trylock(&mdev->data.work.s)) {
1621 mutex_lock(&mdev->data.mutex);
1622 if (mdev->data.socket && !mdev->net_conf->no_cork)
1623 drbd_tcp_uncork(mdev->data.socket);
1624 mutex_unlock(&mdev->data.mutex);
1625
1626 intr = down_interruptible(&mdev->data.work.s);
1627
1628 mutex_lock(&mdev->data.mutex);
1629 if (mdev->data.socket && !mdev->net_conf->no_cork)
1630 drbd_tcp_cork(mdev->data.socket);
1631 mutex_unlock(&mdev->data.mutex);
1632 }
1633
1634 if (intr) {
1635 D_ASSERT(intr == -EINTR);
1636 flush_signals(current);
1637 ERR_IF (get_t_state(thi) == Running)
1638 continue;
1639 break;
1640 }
1641
1642 if (get_t_state(thi) != Running)
1643 break;
1644 /* With this break, we have done a down() but not consumed
1645 the entry from the list. The cleanup code takes care of
1646 this... */
1647
1648 w = NULL;
1649 spin_lock_irq(&mdev->data.work.q_lock);
1650 ERR_IF(list_empty(&mdev->data.work.q)) {
1651 /* something terribly wrong in our logic.
1652 * we were able to down() the semaphore,
1653 * but the list is empty... doh.
1654 *
1655 * what is the best thing to do now?
1656 * try again from scratch, restarting the receiver,
1657 * asender, whatnot? could break even more ugly,
1658 * e.g. when we are primary, but no good local data.
1659 *
1660 * I'll try to get away just starting over this loop.
1661 */
1662 spin_unlock_irq(&mdev->data.work.q_lock);
1663 continue;
1664 }
1665 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1666 list_del_init(&w->list);
1667 spin_unlock_irq(&mdev->data.work.q_lock);
1668
1669 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1670 /* dev_warn(DEV, "worker: a callback failed! \n"); */
1671 if (mdev->state.conn >= C_CONNECTED)
1672 drbd_force_state(mdev,
1673 NS(conn, C_NETWORK_FAILURE));
1674 }
1675 }
1676 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1677 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1678
1679 spin_lock_irq(&mdev->data.work.q_lock);
1680 i = 0;
1681 while (!list_empty(&mdev->data.work.q)) {
1682 list_splice_init(&mdev->data.work.q, &work_list);
1683 spin_unlock_irq(&mdev->data.work.q_lock);
1684
1685 while (!list_empty(&work_list)) {
1686 w = list_entry(work_list.next, struct drbd_work, list);
1687 list_del_init(&w->list);
1688 w->cb(mdev, w, 1);
1689 i++; /* dead debugging code */
1690 }
1691
1692 spin_lock_irq(&mdev->data.work.q_lock);
1693 }
1694 sema_init(&mdev->data.work.s, 0);
1695 /* DANGEROUS race: if someone did queue his work within the spinlock,
1696 * but up() ed outside the spinlock, we could get an up() on the
1697 * semaphore without corresponding list entry.
1698 * So don't do that.
1699 */
1700 spin_unlock_irq(&mdev->data.work.q_lock);
1701
1702 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1703 /* _drbd_set_state only uses stop_nowait.
1704 * wait here for the Exiting receiver. */
1705 drbd_thread_stop(&mdev->receiver);
1706 drbd_mdev_cleanup(mdev);
1707
1708 dev_info(DEV, "worker terminated\n");
1709
1710 clear_bit(DEVICE_DYING, &mdev->flags);
1711 clear_bit(CONFIG_PENDING, &mdev->flags);
1712 wake_up(&mdev->state_wait);
1713
1714 return 0;
1715}