drbd: improve log message if received sector offset exceeds local capacity
[deliverable/linux.git] / drivers / block / drbd / drbd_worker.c
1/*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_req.h"
40
41static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42static int w_make_resync_request(struct drbd_conf *mdev,
43 struct drbd_work *w, int cancel);
44
45
46
47/* defined here:
48 drbd_md_io_complete
49 drbd_endio_sec
50 drbd_endio_pri
51
52 * more endio handlers:
53 atodb_endio in drbd_actlog.c
54 drbd_bm_async_io_complete in drbd_bitmap.c
55
56 * For all these callbacks, note the following:
57 * The callbacks will be called in irq context by the IDE drivers,
58 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
59 * Try to get the locking right :)
60 *
61 */
62
63
64/* About the global_state_lock
65 Each state transition on an device holds a read lock. In case we have
66 to evaluate the sync after dependencies, we grab a write lock, because
67 we need stable states on all devices for that. */
68rwlock_t global_state_lock;
69
70/* used for synchronous meta data and bitmap IO
71 * submitted by drbd_md_sync_page_io()
72 */
73void drbd_md_io_complete(struct bio *bio, int error)
74{
75 struct drbd_md_io *md_io;
76
77 md_io = (struct drbd_md_io *)bio->bi_private;
78 md_io->error = error;
79
80 complete(&md_io->event);
81}
82
83/* reads on behalf of the partner,
84 * "submitted" by the receiver
85 */
86void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
87{
88 unsigned long flags = 0;
89 struct drbd_conf *mdev = e->mdev;
90
91 D_ASSERT(e->block_id != ID_VACANT);
92
93 spin_lock_irqsave(&mdev->req_lock, flags);
94 mdev->read_cnt += e->size >> 9;
95 list_del(&e->w.list);
96 if (list_empty(&mdev->read_ee))
97 wake_up(&mdev->ee_wait);
98 if (test_bit(__EE_WAS_ERROR, &e->flags))
99 __drbd_chk_io_error(mdev, false);
100 spin_unlock_irqrestore(&mdev->req_lock, flags);
101
102 drbd_queue_work(&mdev->data.work, &e->w);
103 put_ldev(mdev);
104}
105
106/* writes on behalf of the partner, or resync writes,
107 * "submitted" by the receiver, final stage. */
108static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
109{
110 unsigned long flags = 0;
111 struct drbd_conf *mdev = e->mdev;
112 sector_t e_sector;
113 int do_wake;
114 int is_syncer_req;
115 int do_al_complete_io;
116
117 D_ASSERT(e->block_id != ID_VACANT);
118
119 /* after we moved e to done_ee,
120 * we may no longer access it,
121 * it may be freed/reused already!
122 * (as soon as we release the req_lock) */
123 e_sector = e->sector;
124 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
125 is_syncer_req = is_syncer_block_id(e->block_id);
126
127 spin_lock_irqsave(&mdev->req_lock, flags);
128 mdev->writ_cnt += e->size >> 9;
129 list_del(&e->w.list); /* has been on active_ee or sync_ee */
130 list_add_tail(&e->w.list, &mdev->done_ee);
131
132 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
133 * neither did we wake possibly waiting conflicting requests.
134 * done from "drbd_process_done_ee" within the appropriate w.cb
135 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
136
137 do_wake = is_syncer_req
138 ? list_empty(&mdev->sync_ee)
139 : list_empty(&mdev->active_ee);
140
141 if (test_bit(__EE_WAS_ERROR, &e->flags))
142 __drbd_chk_io_error(mdev, false);
143 spin_unlock_irqrestore(&mdev->req_lock, flags);
144
145 if (is_syncer_req)
146 drbd_rs_complete_io(mdev, e_sector);
147
148 if (do_wake)
149 wake_up(&mdev->ee_wait);
150
151 if (do_al_complete_io)
152 drbd_al_complete_io(mdev, e_sector);
153
154 wake_asender(mdev);
155 put_ldev(mdev);
156}
157
158/* writes on behalf of the partner, or resync writes,
159 * "submitted" by the receiver.
160 */
161void drbd_endio_sec(struct bio *bio, int error)
162{
163 struct drbd_epoch_entry *e = bio->bi_private;
164 struct drbd_conf *mdev = e->mdev;
165 int uptodate = bio_flagged(bio, BIO_UPTODATE);
166 int is_write = bio_data_dir(bio) == WRITE;
167
168 if (error && __ratelimit(&drbd_ratelimit_state))
169 dev_warn(DEV, "%s: error=%d s=%llus\n",
170 is_write ? "write" : "read", error,
171 (unsigned long long)e->sector);
172 if (!error && !uptodate) {
173 if (__ratelimit(&drbd_ratelimit_state))
174 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
175 is_write ? "write" : "read",
176 (unsigned long long)e->sector);
177 /* strange behavior of some lower level drivers...
178 * fail the request by clearing the uptodate flag,
179 * but do not return any error?! */
180 error = -EIO;
181 }
182
183 if (error)
184 set_bit(__EE_WAS_ERROR, &e->flags);
185
186 bio_put(bio); /* no need for the bio anymore */
187 if (atomic_dec_and_test(&e->pending_bios)) {
188 if (is_write)
189 drbd_endio_write_sec_final(e);
190 else
191 drbd_endio_read_sec_final(e);
192 }
193}
194
195/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
196 */
197void drbd_endio_pri(struct bio *bio, int error)
198{
199 unsigned long flags;
200 struct drbd_request *req = bio->bi_private;
201 struct drbd_conf *mdev = req->mdev;
202 struct bio_and_error m;
203 enum drbd_req_event what;
204 int uptodate = bio_flagged(bio, BIO_UPTODATE);
205
206 if (!error && !uptodate) {
207 dev_warn(DEV, "p %s: setting error to -EIO\n",
208 bio_data_dir(bio) == WRITE ? "write" : "read");
209 /* strange behavior of some lower level drivers...
210 * fail the request by clearing the uptodate flag,
211 * but do not return any error?! */
212 error = -EIO;
213 }
214
215 /* to avoid recursion in __req_mod */
216 if (unlikely(error)) {
217 what = (bio_data_dir(bio) == WRITE)
218 ? write_completed_with_error
219 : (bio_rw(bio) == READ)
220 ? read_completed_with_error
221 : read_ahead_completed_with_error;
222 } else
223 what = completed_ok;
224
225 bio_put(req->private_bio);
226 req->private_bio = ERR_PTR(error);
227
228 /* not req_mod(), we need irqsave here! */
229 spin_lock_irqsave(&mdev->req_lock, flags);
230 __req_mod(req, what, &m);
231 spin_unlock_irqrestore(&mdev->req_lock, flags);
232
233 if (m.bio)
234 complete_master_bio(mdev, &m);
235}
236
237int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
238{
239 struct drbd_request *req = container_of(w, struct drbd_request, w);
240
241 /* We should not detach for read io-error,
242 * but try to WRITE the P_DATA_REPLY to the failed location,
243 * to give the disk the chance to relocate that block */
244
245 spin_lock_irq(&mdev->req_lock);
246 if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
247 _req_mod(req, read_retry_remote_canceled);
248 spin_unlock_irq(&mdev->req_lock);
249 return 1;
250 }
251 spin_unlock_irq(&mdev->req_lock);
252
253 return w_send_read_req(mdev, w, 0);
254}
255
256void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
257{
258 struct hash_desc desc;
259 struct scatterlist sg;
260 struct page *page = e->pages;
261 struct page *tmp;
262 unsigned len;
263
264 desc.tfm = tfm;
265 desc.flags = 0;
266
267 sg_init_table(&sg, 1);
268 crypto_hash_init(&desc);
269
270 while ((tmp = page_chain_next(page))) {
271 /* all but the last page will be fully used */
272 sg_set_page(&sg, page, PAGE_SIZE, 0);
273 crypto_hash_update(&desc, &sg, sg.length);
274 page = tmp;
275 }
276 /* and now the last, possibly only partially used page */
277 len = e->size & (PAGE_SIZE - 1);
278 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
279 crypto_hash_update(&desc, &sg, sg.length);
280 crypto_hash_final(&desc, digest);
281}
282
283void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
284{
285 struct hash_desc desc;
286 struct scatterlist sg;
287 struct bio_vec *bvec;
288 int i;
289
290 desc.tfm = tfm;
291 desc.flags = 0;
292
293 sg_init_table(&sg, 1);
294 crypto_hash_init(&desc);
295
296 __bio_for_each_segment(bvec, bio, i, 0) {
297 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
298 crypto_hash_update(&desc, &sg, sg.length);
299 }
300 crypto_hash_final(&desc, digest);
301}
302
303static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
304{
305 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
306 int digest_size;
307 void *digest;
308 int ok;
309
310 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
311
312 if (unlikely(cancel)) {
313 drbd_free_ee(mdev, e);
314 return 1;
315 }
316
317 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
318 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
319 digest = kmalloc(digest_size, GFP_NOIO);
320 if (digest) {
321 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
322
323 inc_rs_pending(mdev);
324 ok = drbd_send_drequest_csum(mdev,
325 e->sector,
326 e->size,
327 digest,
328 digest_size,
329 P_CSUM_RS_REQUEST);
330 kfree(digest);
331 } else {
332 dev_err(DEV, "kmalloc() of digest failed.\n");
333 ok = 0;
334 }
335 } else
336 ok = 1;
337
338 drbd_free_ee(mdev, e);
339
340 if (unlikely(!ok))
341 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
342 return ok;
343}
344
345#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
346
347static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
348{
349 struct drbd_epoch_entry *e;
350
351 if (!get_ldev(mdev))
352 return -EIO;
353
354 if (drbd_rs_should_slow_down(mdev, sector))
355 goto defer;
356
357 /* GFP_TRY, because if there is no memory available right now, this may
358 * be rescheduled for later. It is "only" background resync, after all. */
359 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
360 if (!e)
361 goto defer;
362
363 e->w.cb = w_e_send_csum;
364 spin_lock_irq(&mdev->req_lock);
365 list_add(&e->w.list, &mdev->read_ee);
366 spin_unlock_irq(&mdev->req_lock);
367
368 atomic_add(size >> 9, &mdev->rs_sect_ev);
369 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
370 return 0;
371
372 /* If it failed because of ENOMEM, retry should help. If it failed
373 * because bio_add_page failed (probably broken lower level driver),
374 * retry may or may not help.
375 * If it does not, you may need to force disconnect. */
376 spin_lock_irq(&mdev->req_lock);
377 list_del(&e->w.list);
378 spin_unlock_irq(&mdev->req_lock);
379
380 drbd_free_ee(mdev, e);
381defer:
382 put_ldev(mdev);
383 return -EAGAIN;
384}
385
386int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
387{
388 switch (mdev->state.conn) {
389 case C_VERIFY_S:
390 w_make_ov_request(mdev, w, cancel);
391 break;
392 case C_SYNC_TARGET:
393 w_make_resync_request(mdev, w, cancel);
394 break;
395 }
396
397 return 1;
398}
399
400void resync_timer_fn(unsigned long data)
401{
402 struct drbd_conf *mdev = (struct drbd_conf *) data;
403
404 if (list_empty(&mdev->resync_work.list))
405 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
406}
407
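/* The fifo_* helpers below operate on the fixed-size ring buffer
 * (struct fifo_buffer) that the resync-rate controller further down uses to
 * spread planned corrections over its next 'size' invocations. */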
408static void fifo_set(struct fifo_buffer *fb, int value)
409{
410 int i;
411
412 for (i = 0; i < fb->size; i++)
413 fb->values[i] = value;
414}
415
416static int fifo_push(struct fifo_buffer *fb, int value)
417{
418 int ov;
419
420 ov = fb->values[fb->head_index];
421 fb->values[fb->head_index++] = value;
422
423 if (fb->head_index >= fb->size)
424 fb->head_index = 0;
425
426 return ov;
427}
428
429static void fifo_add_val(struct fifo_buffer *fb, int value)
430{
431 int i;
432
433 for (i = 0; i < fb->size; i++)
434 fb->values[i] += value;
435}
436
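/* Dynamic resync-rate controller: based on the number of sectors that came in
 * since the last invocation (rs_sect_in) and the configured fill/delay targets,
 * it plans a correction spread over 'steps' future invocations and returns how
 * many sectors to request during the next SLEEP_TIME interval, capped by
 * c_max_rate. */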
437static int drbd_rs_controller(struct drbd_conf *mdev)
438{
439 unsigned int sect_in; /* Number of sectors that came in since the last turn */
440 unsigned int want; /* The number of sectors we want in the proxy */
441 int req_sect; /* Number of sectors to request in this turn */
442 int correction; /* Number of sectors more we need in the proxy*/
443 int cps; /* correction per invocation of drbd_rs_controller() */
444 int steps; /* Number of time steps to plan ahead */
445 int curr_corr;
446 int max_sect;
447
448 sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
449 mdev->rs_in_flight -= sect_in;
450
451 spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
452
453 steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
454
455 if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
456 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
457 } else { /* normal path */
458 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
459 sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
460 }
461
462 correction = want - mdev->rs_in_flight - mdev->rs_planed;
463
464 /* Plan ahead */
465 cps = correction / steps;
466 fifo_add_val(&mdev->rs_plan_s, cps);
467 mdev->rs_planed += cps * steps;
468
469 /* What we do in this step */
470 curr_corr = fifo_push(&mdev->rs_plan_s, 0);
471 spin_unlock(&mdev->peer_seq_lock);
472 mdev->rs_planed -= curr_corr;
473
474 req_sect = sect_in + curr_corr;
475 if (req_sect < 0)
476 req_sect = 0;
477
478 max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
479 if (req_sect > max_sect)
480 req_sect = max_sect;
481
482 /*
483 dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
484 sect_in, mdev->rs_in_flight, want, correction,
485 steps, cps, mdev->rs_planed, curr_corr, req_sect);
486 */
487
488 return req_sect;
489}
490
491static int drbd_rs_number_requests(struct drbd_conf *mdev)
492{
493 int number;
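	/* The controller works in sectors; shifting by (BM_BLOCK_SHIFT - 9)
	 * converts that into a count of BM_BLOCK_SIZE-sized resync requests. */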
494 if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
495 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
496 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
497 } else {
498 mdev->c_sync_rate = mdev->sync_conf.rate;
499 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
500 }
501
502 /* ignore the amount of pending requests, the resync controller should
503 * throttle down to incoming reply rate soon enough anyways. */
504 return number;
505}
506
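/* Worker callback: walk the resync bitmap from bm_resync_fo onwards and issue
 * up to drbd_rs_number_requests() resync requests (checksum-based ones via
 * read_for_csum() when a csums_tfm is configured and the peer supports it),
 * then re-arm the resync timer. */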
507static int w_make_resync_request(struct drbd_conf *mdev,
508 struct drbd_work *w, int cancel)
509{
510 unsigned long bit;
511 sector_t sector;
512 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
513 int max_bio_size;
514 int number, rollback_i, size;
515 int align, queued, sndbuf;
516 int i = 0;
517
518 if (unlikely(cancel))
519 return 1;
520
521 if (mdev->rs_total == 0) {
522 /* empty resync? */
523 drbd_resync_finished(mdev);
524 return 1;
525 }
526
527 if (!get_ldev(mdev)) {
528 /* Since we only need to access mdev->rsync, a
529 get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
530 continuing resync with a broken disk makes no sense at
531 all */
532 dev_err(DEV, "Disk broke down during resync!\n");
533 return 1;
534 }
535
536 /* starting with drbd 8.3.8, we can handle multi-bio EEs,
537 * if it should be necessary */
538 max_bio_size =
539 mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
540 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
541
542 number = drbd_rs_number_requests(mdev);
543 if (number == 0)
544 goto requeue;
545
546 for (i = 0; i < number; i++) {
547 /* Stop generating RS requests, when half of the send buffer is filled */
548 mutex_lock(&mdev->data.mutex);
549 if (mdev->data.socket) {
550 queued = mdev->data.socket->sk->sk_wmem_queued;
551 sndbuf = mdev->data.socket->sk->sk_sndbuf;
552 } else {
553 queued = 1;
554 sndbuf = 0;
555 }
556 mutex_unlock(&mdev->data.mutex);
557 if (queued > sndbuf / 2)
558 goto requeue;
559
560next_sector:
561 size = BM_BLOCK_SIZE;
562 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
563
564 if (bit == DRBD_END_OF_BITMAP) {
565 mdev->bm_resync_fo = drbd_bm_bits(mdev);
566 put_ldev(mdev);
567 return 1;
568 }
569
570 sector = BM_BIT_TO_SECT(bit);
571
572 if (drbd_rs_should_slow_down(mdev, sector) ||
573 drbd_try_rs_begin_io(mdev, sector)) {
574 mdev->bm_resync_fo = bit;
575 goto requeue;
576 }
577 mdev->bm_resync_fo = bit + 1;
578
579 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
580 drbd_rs_complete_io(mdev, sector);
581 goto next_sector;
582 }
583
584#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
585 /* try to find some adjacent bits.
586 * we stop if we have already the maximum req size.
587 *
588 * Additionally always align bigger requests, in order to
589 * be prepared for all stripe sizes of software RAIDs.
590 */
591 align = 1;
592 rollback_i = i;
593 for (;;) {
594 if (size + BM_BLOCK_SIZE > max_bio_size)
595 break;
596
597 /* Be always aligned */
598 if (sector & ((1<<(align+3))-1))
599 break;
600
601 /* do not cross extent boundaries */
602 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
603 break;
604 /* now, is it actually dirty, after all?
605 * caution, drbd_bm_test_bit is tri-state for some
606 * obscure reason; ( b == 0 ) would get the out-of-band
607 * only accidentally right because of the "oddly sized"
608 * adjustment below */
609 if (drbd_bm_test_bit(mdev, bit+1) != 1)
610 break;
611 bit++;
612 size += BM_BLOCK_SIZE;
613 if ((BM_BLOCK_SIZE << align) <= size)
614 align++;
615 i++;
616 }
617 /* if we merged some,
618 * reset the offset to start the next drbd_bm_find_next from */
619 if (size > BM_BLOCK_SIZE)
620 mdev->bm_resync_fo = bit + 1;
621#endif
622
623 /* adjust very last sectors, in case we are oddly sized */
624 if (sector + (size>>9) > capacity)
625 size = (capacity-sector)<<9;
626 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
627 switch (read_for_csum(mdev, sector, size)) {
628 case -EIO: /* Disk failure */
629 put_ldev(mdev);
630 return 0;
631 case -EAGAIN: /* allocation failed, or ldev busy */
632 drbd_rs_complete_io(mdev, sector);
633 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
634 i = rollback_i;
635 goto requeue;
636 case 0:
637 /* everything ok */
638 break;
639 default:
640 BUG();
641 }
642 } else {
643 inc_rs_pending(mdev);
644 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
645 sector, size, ID_SYNCER)) {
646 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
647 dec_rs_pending(mdev);
648 put_ldev(mdev);
649 return 0;
650 }
651 }
652 }
653
654 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
655 /* last syncer _request_ was sent,
656 * but the P_RS_DATA_REPLY not yet received. sync will end (and
657 * next sync group will resume), as soon as we receive the last
658 * resync data block, and the last bit is cleared.
659 * until then resync "work" is "inactive" ...
660 */
661 put_ldev(mdev);
662 return 1;
663 }
664
665 requeue:
666 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
667 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
668 put_ldev(mdev);
669 return 1;
670}
671
672static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
673{
674 int number, i, size;
675 sector_t sector;
676 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
677
678 if (unlikely(cancel))
679 return 1;
680
681 number = drbd_rs_number_requests(mdev);
682
683 sector = mdev->ov_position;
684 for (i = 0; i < number; i++) {
685 if (sector >= capacity) {
686 return 1;
687 }
688
689 size = BM_BLOCK_SIZE;
690
691 if (drbd_rs_should_slow_down(mdev, sector) ||
692 drbd_try_rs_begin_io(mdev, sector)) {
693 mdev->ov_position = sector;
694 goto requeue;
695 }
696
697 if (sector + (size>>9) > capacity)
698 size = (capacity-sector)<<9;
699
700 inc_rs_pending(mdev);
701 if (!drbd_send_ov_request(mdev, sector, size)) {
702 dec_rs_pending(mdev);
703 return 0;
704 }
705 sector += BM_SECT_PER_BIT;
706 }
707 mdev->ov_position = sector;
708
709 requeue:
710 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
711 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
712 return 1;
713}
714
715
716void start_resync_timer_fn(unsigned long data)
717{
718 struct drbd_conf *mdev = (struct drbd_conf *) data;
719
720 drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
721}
722
723int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
724{
725 if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
726 dev_warn(DEV, "w_start_resync later...\n");
727 mdev->start_resync_timer.expires = jiffies + HZ/10;
728 add_timer(&mdev->start_resync_timer);
729 return 1;
730 }
731
732 drbd_start_resync(mdev, C_SYNC_SOURCE);
733 clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
734 return 1;
735}
736
737int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
738{
739 kfree(w);
740 ov_oos_print(mdev);
741 drbd_resync_finished(mdev);
742
743 return 1;
744}
745
746static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
747{
748 kfree(w);
749
750 drbd_resync_finished(mdev);
751
752 return 1;
753}
754
755static void ping_peer(struct drbd_conf *mdev)
756{
757 clear_bit(GOT_PING_ACK, &mdev->flags);
758 request_ping(mdev);
759 wait_event(mdev->misc_wait,
760 test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
761}
762
763int drbd_resync_finished(struct drbd_conf *mdev)
764{
765 unsigned long db, dt, dbdt;
766 unsigned long n_oos;
767 union drbd_state os, ns;
768 struct drbd_work *w;
769 char *khelper_cmd = NULL;
770 int verify_done = 0;
771
772 /* Remove all elements from the resync LRU. Since future actions
773 * might set bits in the (main) bitmap, then the entries in the
774 * resync LRU would be wrong. */
775 if (drbd_rs_del_all(mdev)) {
776 /* In case this is not possible now, most probably because
777 * there are P_RS_DATA_REPLY Packets lingering on the worker's
778 * queue (or even the read operations for those packets
779 * is not finished by now). Retry in 100ms. */
780
781 schedule_timeout_interruptible(HZ / 10);
782 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
783 if (w) {
784 w->cb = w_resync_finished;
785 drbd_queue_work(&mdev->data.work, w);
786 return 1;
787 }
788 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
789 }
790
791 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
792 if (dt <= 0)
793 dt = 1;
794 db = mdev->rs_total;
795 dbdt = Bit2KB(db/dt);
796 mdev->rs_paused /= HZ;
797
798 if (!get_ldev(mdev))
799 goto out;
800
801 ping_peer(mdev);
802
803 spin_lock_irq(&mdev->req_lock);
804 os = mdev->state;
805
806 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
807
808 /* This protects us against multiple calls (that can happen in the presence
809 of application IO), and against connectivity loss just before we arrive here. */
810 if (os.conn <= C_CONNECTED)
811 goto out_unlock;
812
813 ns = os;
814 ns.conn = C_CONNECTED;
815
816 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
817 verify_done ? "Online verify " : "Resync",
818 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
819
820 n_oos = drbd_bm_total_weight(mdev);
821
822 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
823 if (n_oos) {
824 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
825 n_oos, Bit2KB(1));
826 khelper_cmd = "out-of-sync";
827 }
828 } else {
829 D_ASSERT((n_oos - mdev->rs_failed) == 0);
830
831 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
832 khelper_cmd = "after-resync-target";
833
834 if (mdev->csums_tfm && mdev->rs_total) {
835 const unsigned long s = mdev->rs_same_csum;
836 const unsigned long t = mdev->rs_total;
837 const int ratio =
838 (t == 0) ? 0 :
839 (t < 100000) ? ((s*100)/t) : (s/(t/100));
840 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
841 "transferred %luK total %luK\n",
842 ratio,
843 Bit2KB(mdev->rs_same_csum),
844 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
845 Bit2KB(mdev->rs_total));
846 }
847 }
848
849 if (mdev->rs_failed) {
850 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
851
852 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
853 ns.disk = D_INCONSISTENT;
854 ns.pdsk = D_UP_TO_DATE;
855 } else {
856 ns.disk = D_UP_TO_DATE;
857 ns.pdsk = D_INCONSISTENT;
858 }
859 } else {
860 ns.disk = D_UP_TO_DATE;
861 ns.pdsk = D_UP_TO_DATE;
862
863 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
864 if (mdev->p_uuid) {
865 int i;
866 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
867 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
868 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
869 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
870 } else {
871 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
872 }
873 }
874
875 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
876 /* for verify runs, we don't update uuids here,
877 * so there would be nothing to report. */
878 drbd_uuid_set_bm(mdev, 0UL);
879 drbd_print_uuids(mdev, "updated UUIDs");
880 if (mdev->p_uuid) {
881 /* Now the two UUID sets are equal, update what we
882 * know of the peer. */
883 int i;
884 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
885 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
886 }
887 }
888 }
889
890 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
891out_unlock:
892 spin_unlock_irq(&mdev->req_lock);
893 put_ldev(mdev);
894out:
895 mdev->rs_total = 0;
896 mdev->rs_failed = 0;
897 mdev->rs_paused = 0;
898 if (verify_done)
899 mdev->ov_start_sector = 0;
900
901 drbd_md_sync(mdev);
902
903 if (khelper_cmd)
904 drbd_khelper(mdev, khelper_cmd);
905
906 return 1;
907}
908
909/* helper */
910static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
911{
912 if (drbd_ee_has_active_page(e)) {
913 /* This might happen if sendpage() has not finished */
914 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
915 atomic_add(i, &mdev->pp_in_use_by_net);
916 atomic_sub(i, &mdev->pp_in_use);
917 spin_lock_irq(&mdev->req_lock);
918 list_add_tail(&e->w.list, &mdev->net_ee);
919 spin_unlock_irq(&mdev->req_lock);
920 wake_up(&drbd_pp_wait);
921 } else
922 drbd_free_ee(mdev, e);
923}
924
925/**
926 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
927 * @mdev: DRBD device.
928 * @w: work object.
929 * @cancel: The connection will be closed anyways
930 */
931int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
932{
933 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
934 int ok;
935
936 if (unlikely(cancel)) {
937 drbd_free_ee(mdev, e);
938 dec_unacked(mdev);
939 return 1;
940 }
941
942 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
943 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
944 } else {
945 if (__ratelimit(&drbd_ratelimit_state))
946 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
947 (unsigned long long)e->sector);
948
949 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
950 }
951
952 dec_unacked(mdev);
953
954 move_to_net_ee_or_free(mdev, e);
955
956 if (unlikely(!ok))
957 dev_err(DEV, "drbd_send_block() failed\n");
958 return ok;
959}
960
961/**
962 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
963 * @mdev: DRBD device.
964 * @w: work object.
965 * @cancel: The connection will be closed anyways
966 */
967int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
968{
969 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
970 int ok;
971
972 if (unlikely(cancel)) {
973 drbd_free_ee(mdev, e);
974 dec_unacked(mdev);
975 return 1;
976 }
977
978 if (get_ldev_if_state(mdev, D_FAILED)) {
979 drbd_rs_complete_io(mdev, e->sector);
980 put_ldev(mdev);
981 }
982
983 if (mdev->state.conn == C_AHEAD) {
984 ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
985 } else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
986 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
987 inc_rs_pending(mdev);
988 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
989 } else {
990 if (__ratelimit(&drbd_ratelimit_state))
991 dev_err(DEV, "Not sending RSDataReply, "
992 "partner DISKLESS!\n");
993 ok = 1;
994 }
995 } else {
996 if (__ratelimit(&drbd_ratelimit_state))
997 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
998 (unsigned long long)e->sector);
999
1000 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1001
1002 /* update resync data with failure */
1003 drbd_rs_failed_io(mdev, e->sector, e->size);
1004 }
1005
1006 dec_unacked(mdev);
1007
1008 move_to_net_ee_or_free(mdev, e);
1009
1010 if (unlikely(!ok))
1011 dev_err(DEV, "drbd_send_block() failed\n");
1012 return ok;
1013}
1014
1015int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1016{
1017 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1018 struct digest_info *di;
1019 int digest_size;
1020 void *digest = NULL;
1021 int ok, eq = 0;
1022
1023 if (unlikely(cancel)) {
1024 drbd_free_ee(mdev, e);
1025 dec_unacked(mdev);
1026 return 1;
1027 }
1028
1029 if (get_ldev(mdev)) {
1030 drbd_rs_complete_io(mdev, e->sector);
1031 put_ldev(mdev);
1032 }
1033
1034 di = e->digest;
1035
1036 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1037 /* quick hack to try to avoid a race against reconfiguration.
1038 * a real fix would be much more involved,
1039 * introducing more locking mechanisms */
1040 if (mdev->csums_tfm) {
1041 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1042 D_ASSERT(digest_size == di->digest_size);
1043 digest = kmalloc(digest_size, GFP_NOIO);
1044 }
1045 if (digest) {
1046 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1047 eq = !memcmp(digest, di->digest, digest_size);
1048 kfree(digest);
1049 }
1050
1051 if (eq) {
1052 drbd_set_in_sync(mdev, e->sector, e->size);
1053 /* rs_same_csums unit is BM_BLOCK_SIZE */
1054 mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1055 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1056 } else {
1057 inc_rs_pending(mdev);
1058 e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1059 e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1060 kfree(di);
1061 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1062 }
1063 } else {
1064 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1065 if (__ratelimit(&drbd_ratelimit_state))
1066 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1067 }
1068
1069 dec_unacked(mdev);
1070 move_to_net_ee_or_free(mdev, e);
1071
1072 if (unlikely(!ok))
1073 dev_err(DEV, "drbd_send_block/ack() failed\n");
1074 return ok;
1075}
1076
1077int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1078{
1079 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1080 int digest_size;
1081 void *digest;
1082 int ok = 1;
1083
1084 if (unlikely(cancel))
1085 goto out;
1086
1087 if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1088 goto out;
1089
1090 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1091 /* FIXME if this allocation fails, online verify will not terminate! */
1092 digest = kmalloc(digest_size, GFP_NOIO);
1093 if (digest) {
1094 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1095 inc_rs_pending(mdev);
1096 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1097 digest, digest_size, P_OV_REPLY);
1098 if (!ok)
1099 dec_rs_pending(mdev);
1100 kfree(digest);
1101 }
1102
1103out:
1104 drbd_free_ee(mdev, e);
1105
1106 dec_unacked(mdev);
1107
1108 return ok;
1109}
1110
1111void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1112{
1113 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1114 mdev->ov_last_oos_size += size>>9;
1115 } else {
1116 mdev->ov_last_oos_start = sector;
1117 mdev->ov_last_oos_size = size>>9;
1118 }
1119 drbd_set_out_of_sync(mdev, sector, size);
1120}
1121
1122int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1123{
1124 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1125 struct digest_info *di;
1126 int digest_size;
1127 void *digest;
1128 int ok, eq = 0;
1129
1130 if (unlikely(cancel)) {
1131 drbd_free_ee(mdev, e);
1132 dec_unacked(mdev);
1133 return 1;
1134 }
1135
1136 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1137 * the resync lru has been cleaned up already */
1138 if (get_ldev(mdev)) {
1139 drbd_rs_complete_io(mdev, e->sector);
1140 put_ldev(mdev);
1141 }
1142
1143 di = e->digest;
1144
1145 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1146 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1147 digest = kmalloc(digest_size, GFP_NOIO);
1148 if (digest) {
1149 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1150
1151 D_ASSERT(digest_size == di->digest_size);
1152 eq = !memcmp(digest, di->digest, digest_size);
1153 kfree(digest);
1154 }
1155 } else {
1156 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1157 if (__ratelimit(&drbd_ratelimit_state))
1158 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1159 }
1160
1161 dec_unacked(mdev);
1162 if (!eq)
1163 drbd_ov_oos_found(mdev, e->sector, e->size);
1164 else
1165 ov_oos_print(mdev);
1166
1167 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1168 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1169
1170 drbd_free_ee(mdev, e);
1171
1172 --mdev->ov_left;
1173
1174 /* let's advance progress step marks only for every other megabyte */
1175 if ((mdev->ov_left & 0x200) == 0x200)
1176 drbd_advance_rs_marks(mdev, mdev->ov_left);
1177
1178 if (mdev->ov_left == 0) {
1179 ov_oos_print(mdev);
1180 drbd_resync_finished(mdev);
1181 }
1182
1183 return ok;
1184}
1185
1186int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1187{
1188 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1189 complete(&b->done);
1190 return 1;
1191}
1192
1193int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1194{
1195 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1196 struct p_barrier *p = &mdev->data.sbuf.barrier;
1197 int ok = 1;
1198
1199 /* really avoid racing with tl_clear. w.cb may have been referenced
1200 * just before it was reassigned and re-queued, so double check that.
1201 * actually, this race was harmless, since we only try to send the
1202 * barrier packet here, and otherwise do nothing with the object.
1203 * but compare with the head of w_clear_epoch */
1204 spin_lock_irq(&mdev->req_lock);
1205 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1206 cancel = 1;
1207 spin_unlock_irq(&mdev->req_lock);
1208 if (cancel)
1209 return 1;
1210
1211 if (!drbd_get_data_sock(mdev))
1212 return 0;
1213 p->barrier = b->br_number;
1214 /* inc_ap_pending was done where this was queued.
1215 * dec_ap_pending will be done in got_BarrierAck
1216 * or (on connection loss) in w_clear_epoch. */
1217 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1218 (struct p_header80 *)p, sizeof(*p), 0);
1219 drbd_put_data_sock(mdev);
1220
1221 return ok;
1222}
1223
1224int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1225{
1226 if (cancel)
1227 return 1;
1228 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1229}
1230
1231int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1232{
1233 struct drbd_request *req = container_of(w, struct drbd_request, w);
1234 int ok;
1235
1236 if (unlikely(cancel)) {
1237 req_mod(req, send_canceled);
1238 return 1;
1239 }
1240
1241 ok = drbd_send_oos(mdev, req);
1242 req_mod(req, oos_handed_to_network);
1243
1244 return ok;
1245}
1246
1247/**
1248 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1249 * @mdev: DRBD device.
1250 * @w: work object.
1251 * @cancel: The connection will be closed anyways
1252 */
1253int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1254{
1255 struct drbd_request *req = container_of(w, struct drbd_request, w);
1256 int ok;
1257
1258 if (unlikely(cancel)) {
1259 req_mod(req, send_canceled);
1260 return 1;
1261 }
1262
1263 ok = drbd_send_dblock(mdev, req);
1264 req_mod(req, ok ? handed_over_to_network : send_failed);
1265
1266 return ok;
1267}
1268
1269/**
1270 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1271 * @mdev: DRBD device.
1272 * @w: work object.
1273 * @cancel: The connection will be closed anyways
1274 */
1275int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1276{
1277 struct drbd_request *req = container_of(w, struct drbd_request, w);
1278 int ok;
1279
1280 if (unlikely(cancel)) {
1281 req_mod(req, send_canceled);
1282 return 1;
1283 }
1284
1285 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1286 (unsigned long)req);
1287
1288 if (!ok) {
1289 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1290 * so this is probably redundant */
1291 if (mdev->state.conn >= C_CONNECTED)
1292 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1293 }
1294 req_mod(req, ok ? handed_over_to_network : send_failed);
1295
1296 return ok;
1297}
1298
1299int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1300{
1301 struct drbd_request *req = container_of(w, struct drbd_request, w);
1302
1303 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1304 drbd_al_begin_io(mdev, req->sector);
1305 /* Calling drbd_al_begin_io() out of the worker might deadlock
1306 theoretically. Practically it can not deadlock, since this is
1307 only used when unfreezing IOs. All the extents of the requests
1308 that made it into the TL are already active */
1309
1310 drbd_req_make_private_bio(req, req->master_bio);
1311 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1312 generic_make_request(req->private_bio);
1313
1314 return 1;
1315}
1316
1317static int _drbd_may_sync_now(struct drbd_conf *mdev)
1318{
1319 struct drbd_conf *odev = mdev;
1320
1321 while (1) {
1322 if (odev->sync_conf.after == -1)
1323 return 1;
1324 odev = minor_to_mdev(odev->sync_conf.after);
1325 ERR_IF(!odev) return 1;
1326 if ((odev->state.conn >= C_SYNC_SOURCE &&
1327 odev->state.conn <= C_PAUSED_SYNC_T) ||
1328 odev->state.aftr_isp || odev->state.peer_isp ||
1329 odev->state.user_isp)
1330 return 0;
1331 }
1332}
1333
1334/**
1335 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1336 * @mdev: DRBD device.
1337 *
1338 * Called from process context only (admin command and after_state_ch).
1339 */
1340static int _drbd_pause_after(struct drbd_conf *mdev)
1341{
1342 struct drbd_conf *odev;
1343 int i, rv = 0;
1344
1345 for (i = 0; i < minor_count; i++) {
1346 odev = minor_to_mdev(i);
1347 if (!odev)
1348 continue;
1349 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1350 continue;
1351 if (!_drbd_may_sync_now(odev))
1352 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1353 != SS_NOTHING_TO_DO);
1354 }
1355
1356 return rv;
1357}
1358
1359/**
1360 * _drbd_resume_next() - Resume resync on all devices that may resync now
1361 * @mdev: DRBD device.
1362 *
1363 * Called from process context only (admin command and worker).
1364 */
1365static int _drbd_resume_next(struct drbd_conf *mdev)
1366{
1367 struct drbd_conf *odev;
1368 int i, rv = 0;
1369
1370 for (i = 0; i < minor_count; i++) {
1371 odev = minor_to_mdev(i);
1372 if (!odev)
1373 continue;
1374 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1375 continue;
1376 if (odev->state.aftr_isp) {
1377 if (_drbd_may_sync_now(odev))
1378 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1379 CS_HARD, NULL)
1380 != SS_NOTHING_TO_DO) ;
1381 }
1382 }
1383 return rv;
1384}
1385
1386void resume_next_sg(struct drbd_conf *mdev)
1387{
1388 write_lock_irq(&global_state_lock);
1389 _drbd_resume_next(mdev);
1390 write_unlock_irq(&global_state_lock);
1391}
1392
1393void suspend_other_sg(struct drbd_conf *mdev)
1394{
1395 write_lock_irq(&global_state_lock);
1396 _drbd_pause_after(mdev);
1397 write_unlock_irq(&global_state_lock);
1398}
1399
1400static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1401{
1402 struct drbd_conf *odev;
1403
1404 if (o_minor == -1)
1405 return NO_ERROR;
1406 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1407 return ERR_SYNC_AFTER;
1408
1409 /* check for loops */
1410 odev = minor_to_mdev(o_minor);
1411 while (1) {
1412 if (odev == mdev)
1413 return ERR_SYNC_AFTER_CYCLE;
1414
1415 /* dependency chain ends here, no cycles. */
1416 if (odev->sync_conf.after == -1)
1417 return NO_ERROR;
1418
1419 /* follow the dependency chain */
1420 odev = minor_to_mdev(odev->sync_conf.after);
1421 }
1422}
1423
1424int drbd_alter_sa(struct drbd_conf *mdev, int na)
1425{
1426 int changes;
1427 int retcode;
1428
1429 write_lock_irq(&global_state_lock);
1430 retcode = sync_after_error(mdev, na);
1431 if (retcode == NO_ERROR) {
1432 mdev->sync_conf.after = na;
1433 do {
1434 changes = _drbd_pause_after(mdev);
1435 changes |= _drbd_resume_next(mdev);
1436 } while (changes);
1437 }
1438 write_unlock_irq(&global_state_lock);
1439 return retcode;
1440}
1441
1442void drbd_rs_controller_reset(struct drbd_conf *mdev)
1443{
1444 atomic_set(&mdev->rs_sect_in, 0);
1445 atomic_set(&mdev->rs_sect_ev, 0);
1446 mdev->rs_in_flight = 0;
1447 mdev->rs_planed = 0;
1448 spin_lock(&mdev->peer_seq_lock);
1449 fifo_set(&mdev->rs_plan_s, 0);
1450 spin_unlock(&mdev->peer_seq_lock);
1451}
1452
1453/**
1454 * drbd_start_resync() - Start the resync process
1455 * @mdev: DRBD device.
1456 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1457 *
1458 * This function might bring you directly into one of the
1459 * C_PAUSED_SYNC_* states.
1460 */
1461void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1462{
1463 union drbd_state ns;
1464 int r;
1465
1466 if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1467 dev_err(DEV, "Resync already running!\n");
1468 return;
1469 }
1470
1471 if (mdev->state.conn < C_AHEAD) {
1472 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1473 drbd_rs_cancel_all(mdev);
1474 /* This should be done when we abort the resync. We definitely do not
1475 want to have this for connections going back and forth between
1476 Ahead/Behind and SyncSource/SyncTarget */
1477 }
1478
1479 if (side == C_SYNC_TARGET) {
1480 /* Since application IO was locked out during C_WF_BITMAP_T and
1481 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1482 we check that we might make the data inconsistent. */
1483 r = drbd_khelper(mdev, "before-resync-target");
1484 r = (r >> 8) & 0xff;
1485 if (r > 0) {
1486 dev_info(DEV, "before-resync-target handler returned %d, "
1487 "dropping connection.\n", r);
1488 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1489 return;
1490 }
1491 } else /* C_SYNC_SOURCE */ {
1492 r = drbd_khelper(mdev, "before-resync-source");
1493 r = (r >> 8) & 0xff;
1494 if (r > 0) {
1495 if (r == 3) {
1496 dev_info(DEV, "before-resync-source handler returned %d, "
1497 "ignoring. Old userland tools?", r);
1498 } else {
1499 dev_info(DEV, "before-resync-source handler returned %d, "
1500 "dropping connection.\n", r);
1501 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1502 return;
1503 }
1504 }
1505 }
1506
1507 drbd_state_lock(mdev);
1508
1509 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1510 drbd_state_unlock(mdev);
1511 return;
1512 }
1513
1514 write_lock_irq(&global_state_lock);
1515 ns = mdev->state;
1516
1517 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1518
1519 ns.conn = side;
1520
1521 if (side == C_SYNC_TARGET)
1522 ns.disk = D_INCONSISTENT;
1523 else /* side == C_SYNC_SOURCE */
1524 ns.pdsk = D_INCONSISTENT;
1525
1526 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1527 ns = mdev->state;
1528
1529 if (ns.conn < C_CONNECTED)
1530 r = SS_UNKNOWN_ERROR;
1531
1532 if (r == SS_SUCCESS) {
1533 unsigned long tw = drbd_bm_total_weight(mdev);
1534 unsigned long now = jiffies;
1535 int i;
1536
1537 mdev->rs_failed = 0;
1538 mdev->rs_paused = 0;
1539 mdev->rs_same_csum = 0;
1540 mdev->rs_last_events = 0;
1541 mdev->rs_last_sect_ev = 0;
1542 mdev->rs_total = tw;
1543 mdev->rs_start = now;
1544 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1545 mdev->rs_mark_left[i] = tw;
1546 mdev->rs_mark_time[i] = now;
1547 }
1548 _drbd_pause_after(mdev);
1549 }
1550 write_unlock_irq(&global_state_lock);
1551
1552 if (r == SS_SUCCESS) {
1553 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1554 drbd_conn_str(ns.conn),
1555 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1556 (unsigned long) mdev->rs_total);
1557 if (side == C_SYNC_TARGET)
1558 mdev->bm_resync_fo = 0;
1559
1560 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1561 * with w_send_oos, or the sync target will get confused as to
1562 * how much bits to resync. We cannot do that always, because for an
1563 * empty resync and protocol < 95, we need to do it here, as we call
1564 * drbd_resync_finished from here in that case.
1565 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1566 * and from after_state_ch otherwise. */
1567 if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1568 drbd_gen_and_send_sync_uuid(mdev);
1569
1570 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1571 /* This still has a race (about when exactly the peers
1572 * detect connection loss) that can lead to a full sync
1573 * on next handshake. In 8.3.9 we fixed this with explicit
1574 * resync-finished notifications, but the fix
1575 * introduces a protocol change. Sleeping for some
1576 * time longer than the ping interval + timeout on the
1577 * SyncSource, to give the SyncTarget the chance to
1578 * detect connection loss, then waiting for a ping
1579 * response (implicit in drbd_resync_finished) reduces
1580 * the race considerably, but does not solve it. */
1581 if (side == C_SYNC_SOURCE)
1582 schedule_timeout_interruptible(
1583 mdev->net_conf->ping_int * HZ +
1584 mdev->net_conf->ping_timeo*HZ/9);
1585 drbd_resync_finished(mdev);
1586 }
1587
1588 drbd_rs_controller_reset(mdev);
1589 /* ns.conn may already be != mdev->state.conn,
1590 * we may have been paused in between, or become paused until
1591 * the timer triggers.
1592 * No matter, that is handled in resync_timer_fn() */
1593 if (ns.conn == C_SYNC_TARGET)
1594 mod_timer(&mdev->resync_timer, jiffies);
1595
1596 drbd_md_sync(mdev);
1597 }
1598 put_ldev(mdev);
1599 drbd_state_unlock(mdev);
1600}
1601
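/* Main per-device worker thread: drains mdev->data.work, corking/uncorking the
 * data socket around blocking waits, runs each queued work callback, and forces
 * the connection into C_NETWORK_FAILURE if a callback fails while connected; on
 * exit it cancels any remaining work and waits for the receiver thread to stop. */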
1602int drbd_worker(struct drbd_thread *thi)
1603{
1604 struct drbd_conf *mdev = thi->mdev;
1605 struct drbd_work *w = NULL;
1606 LIST_HEAD(work_list);
1607 int intr = 0, i;
1608
1609 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1610
1611 while (get_t_state(thi) == Running) {
1612 drbd_thread_current_set_cpu(mdev);
1613
1614 if (down_trylock(&mdev->data.work.s)) {
1615 mutex_lock(&mdev->data.mutex);
1616 if (mdev->data.socket && !mdev->net_conf->no_cork)
1617 drbd_tcp_uncork(mdev->data.socket);
1618 mutex_unlock(&mdev->data.mutex);
1619
1620 intr = down_interruptible(&mdev->data.work.s);
1621
1622 mutex_lock(&mdev->data.mutex);
1623 if (mdev->data.socket && !mdev->net_conf->no_cork)
1624 drbd_tcp_cork(mdev->data.socket);
1625 mutex_unlock(&mdev->data.mutex);
1626 }
1627
1628 if (intr) {
1629 D_ASSERT(intr == -EINTR);
1630 flush_signals(current);
1631 ERR_IF (get_t_state(thi) == Running)
1632 continue;
1633 break;
1634 }
1635
1636 if (get_t_state(thi) != Running)
1637 break;
1638 /* With this break, we have done a down() but not consumed
1639 the entry from the list. The cleanup code takes care of
1640 this... */
1641
1642 w = NULL;
1643 spin_lock_irq(&mdev->data.work.q_lock);
1644 ERR_IF(list_empty(&mdev->data.work.q)) {
1645 /* something terribly wrong in our logic.
1646 * we were able to down() the semaphore,
1647 * but the list is empty... doh.
1648 *
1649 * what is the best thing to do now?
1650 * try again from scratch, restarting the receiver,
1651 * asender, whatnot? could break even more ugly,
1652 * e.g. when we are primary, but no good local data.
1653 *
1654 * I'll try to get away just starting over this loop.
1655 */
1656 spin_unlock_irq(&mdev->data.work.q_lock);
1657 continue;
1658 }
1659 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1660 list_del_init(&w->list);
1661 spin_unlock_irq(&mdev->data.work.q_lock);
1662
1663 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1664 /* dev_warn(DEV, "worker: a callback failed! \n"); */
1665 if (mdev->state.conn >= C_CONNECTED)
1666 drbd_force_state(mdev,
1667 NS(conn, C_NETWORK_FAILURE));
1668 }
1669 }
1670 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1671 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1672
1673 spin_lock_irq(&mdev->data.work.q_lock);
1674 i = 0;
1675 while (!list_empty(&mdev->data.work.q)) {
1676 list_splice_init(&mdev->data.work.q, &work_list);
1677 spin_unlock_irq(&mdev->data.work.q_lock);
1678
1679 while (!list_empty(&work_list)) {
1680 w = list_entry(work_list.next, struct drbd_work, list);
1681 list_del_init(&w->list);
1682 w->cb(mdev, w, 1);
1683 i++; /* dead debugging code */
1684 }
1685
1686 spin_lock_irq(&mdev->data.work.q_lock);
1687 }
1688 sema_init(&mdev->data.work.s, 0);
1689 /* DANGEROUS race: if someone did queue his work within the spinlock,
1690 * but up() ed outside the spinlock, we could get an up() on the
1691 * semaphore without corresponding list entry.
1692 * So don't do that.
1693 */
1694 spin_unlock_irq(&mdev->data.work.q_lock);
1695
1696 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1697 /* _drbd_set_state only uses stop_nowait.
1698 * wait here for the Exiting receiver. */
1699 drbd_thread_stop(&mdev->receiver);
1700 drbd_mdev_cleanup(mdev);
1701
1702 dev_info(DEV, "worker terminated\n");
1703
1704 clear_bit(DEVICE_DYING, &mdev->flags);
1705 clear_bit(CONFIG_PENDING, &mdev->flags);
1706 wake_up(&mdev->state_wait);
1707
1708 return 0;
1709}