drbd: This code is dead now
drivers/block/drbd/drbd_worker.c
1/*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_req.h"
40
41static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42static int w_make_resync_request(struct drbd_conf *mdev,
43 struct drbd_work *w, int cancel);
44
45
46
47/* defined here:
48 drbd_md_io_complete
49 drbd_endio_sec
50 drbd_endio_pri
51
52 * more endio handlers:
53 atodb_endio in drbd_actlog.c
54 drbd_bm_async_io_complete in drbd_bitmap.c
55
56 * For all these callbacks, note the following:
57 * The callbacks will be called in irq context by the IDE drivers,
58 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
59 * Try to get the locking right :)
60 *
61 */
62
63
64/* About the global_state_lock
65 Each state transition on a device holds a read lock. In case we have
66 to evaluate the sync after dependencies, we grab a write lock, because
67 we need stable states on all devices for that. */
68rwlock_t global_state_lock;
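/* In this file the write lock is taken by resume_next_sg(), suspend_other_sg(),
 * drbd_alter_sa() and drbd_start_resync() while the sync-after dependencies are
 * (re)evaluated. */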
69
70/* used for synchronous meta data and bitmap IO
71 * submitted by drbd_md_sync_page_io()
72 */
73void drbd_md_io_complete(struct bio *bio, int error)
74{
75 struct drbd_md_io *md_io;
76
77 md_io = (struct drbd_md_io *)bio->bi_private;
78 md_io->error = error;
79
80 complete(&md_io->event);
81}
82
83/* reads on behalf of the partner,
84 * "submitted" by the receiver
85 */
86void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
87{
88 unsigned long flags = 0;
89 struct drbd_conf *mdev = e->mdev;
90
91 D_ASSERT(e->block_id != ID_VACANT);
92
93 spin_lock_irqsave(&mdev->req_lock, flags);
94 mdev->read_cnt += e->size >> 9;
95 list_del(&e->w.list);
96 if (list_empty(&mdev->read_ee))
97 wake_up(&mdev->ee_wait);
98 if (test_bit(__EE_WAS_ERROR, &e->flags))
99 __drbd_chk_io_error(mdev, FALSE);
100 spin_unlock_irqrestore(&mdev->req_lock, flags);
101
102 drbd_queue_work(&mdev->data.work, &e->w);
103 put_ldev(mdev);
104}
105
106/* writes on behalf of the partner, or resync writes,
107 * "submitted" by the receiver, final stage. */
108static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
109{
110 unsigned long flags = 0;
111 struct drbd_conf *mdev = e->mdev;
112 sector_t e_sector;
113 int do_wake;
114 int is_syncer_req;
115 int do_al_complete_io;
116
117 D_ASSERT(e->block_id != ID_VACANT);
118
119 /* after we moved e to done_ee,
120 * we may no longer access it,
121 * it may be freed/reused already!
122 * (as soon as we release the req_lock) */
123 e_sector = e->sector;
124 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
125 is_syncer_req = is_syncer_block_id(e->block_id);
126
127 spin_lock_irqsave(&mdev->req_lock, flags);
128 mdev->writ_cnt += e->size >> 9;
129 list_del(&e->w.list); /* has been on active_ee or sync_ee */
130 list_add_tail(&e->w.list, &mdev->done_ee);
131
132 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
133 * neither did we wake possibly waiting conflicting requests.
134 * done from "drbd_process_done_ee" within the appropriate w.cb
135 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
136
137 do_wake = is_syncer_req
138 ? list_empty(&mdev->sync_ee)
139 : list_empty(&mdev->active_ee);
140
141 if (test_bit(__EE_WAS_ERROR, &e->flags))
142 __drbd_chk_io_error(mdev, FALSE);
143 spin_unlock_irqrestore(&mdev->req_lock, flags);
144
145 if (is_syncer_req)
146 drbd_rs_complete_io(mdev, e_sector);
147
148 if (do_wake)
149 wake_up(&mdev->ee_wait);
150
151 if (do_al_complete_io)
152 drbd_al_complete_io(mdev, e_sector);
153
154 wake_asender(mdev);
155 put_ldev(mdev);
156}
157
158/* writes on behalf of the partner, or resync writes,
159 * "submitted" by the receiver.
160 */
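/* Note: one epoch entry may have been split into several bios (multi-bio EEs);
 * e->pending_bios counts them, and only the completion of the last bio hands
 * the entry on to drbd_endio_read_sec_final()/drbd_endio_write_sec_final() above. */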
161void drbd_endio_sec(struct bio *bio, int error)
162{
163 struct drbd_epoch_entry *e = bio->bi_private;
164 struct drbd_conf *mdev = e->mdev;
165 int uptodate = bio_flagged(bio, BIO_UPTODATE);
166 int is_write = bio_data_dir(bio) == WRITE;
167
168 if (error)
169 dev_warn(DEV, "%s: error=%d s=%llus\n",
170 is_write ? "write" : "read", error,
171 (unsigned long long)e->sector);
172 if (!error && !uptodate) {
173 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
174 is_write ? "write" : "read",
175 (unsigned long long)e->sector);
176 /* strange behavior of some lower level drivers...
177 * fail the request by clearing the uptodate flag,
178 * but do not return any error?! */
179 error = -EIO;
180 }
181
182 if (error)
183 set_bit(__EE_WAS_ERROR, &e->flags);
184
185 bio_put(bio); /* no need for the bio anymore */
186 if (atomic_dec_and_test(&e->pending_bios)) {
187 if (is_write)
188 drbd_endio_write_sec_final(e);
189 else
190 drbd_endio_read_sec_final(e);
191 }
192}
193
194/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
195 */
196void drbd_endio_pri(struct bio *bio, int error)
197{
198 unsigned long flags;
199 struct drbd_request *req = bio->bi_private;
200 struct drbd_conf *mdev = req->mdev;
201 struct bio_and_error m;
202 enum drbd_req_event what;
203 int uptodate = bio_flagged(bio, BIO_UPTODATE);
204
205 if (!error && !uptodate) {
206 dev_warn(DEV, "p %s: setting error to -EIO\n",
207 bio_data_dir(bio) == WRITE ? "write" : "read");
208 /* strange behavior of some lower level drivers...
209 * fail the request by clearing the uptodate flag,
210 * but do not return any error?! */
211 error = -EIO;
212 }
213
214 /* to avoid recursion in __req_mod */
215 if (unlikely(error)) {
216 what = (bio_data_dir(bio) == WRITE)
217 ? write_completed_with_error
218 : (bio_rw(bio) == READ)
219 ? read_completed_with_error
220 : read_ahead_completed_with_error;
221 } else
222 what = completed_ok;
223
224 bio_put(req->private_bio);
225 req->private_bio = ERR_PTR(error);
226
227 /* not req_mod(), we need irqsave here! */
228 spin_lock_irqsave(&mdev->req_lock, flags);
229 __req_mod(req, what, &m);
230 spin_unlock_irqrestore(&mdev->req_lock, flags);
231
232 if (m.bio)
233 complete_master_bio(mdev, &m);
234}
235
236int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
237{
238 struct drbd_request *req = container_of(w, struct drbd_request, w);
239
240 /* We should not detach for read io-error,
241 * but try to WRITE the P_DATA_REPLY to the failed location,
242 * to give the disk the chance to relocate that block */
243
244 spin_lock_irq(&mdev->req_lock);
245 if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
246 _req_mod(req, read_retry_remote_canceled);
247 spin_unlock_irq(&mdev->req_lock);
248 return 1;
249 }
250 spin_unlock_irq(&mdev->req_lock);
251
252 return w_send_read_req(mdev, w, 0);
253}
254
255int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
256{
257 ERR_IF(cancel) return 1;
258 dev_err(DEV, "resync inactive, but callback triggered??\n");
259 return 1; /* Simply ignore this! */
260}
261
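/* Hash all payload pages of an epoch entry with the given transform: every page
 * of the chain except the last is hashed in full, the last one only up to
 * e->size modulo PAGE_SIZE (or in full if e->size is an exact multiple). */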
262void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
263{
264 struct hash_desc desc;
265 struct scatterlist sg;
266 struct page *page = e->pages;
267 struct page *tmp;
268 unsigned len;
269
270 desc.tfm = tfm;
271 desc.flags = 0;
272
273 sg_init_table(&sg, 1);
274 crypto_hash_init(&desc);
275
276 while ((tmp = page_chain_next(page))) {
277 /* all but the last page will be fully used */
278 sg_set_page(&sg, page, PAGE_SIZE, 0);
279 crypto_hash_update(&desc, &sg, sg.length);
280 page = tmp;
281 }
282 /* and now the last, possibly only partially used page */
283 len = e->size & (PAGE_SIZE - 1);
284 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
285 crypto_hash_update(&desc, &sg, sg.length);
286 crypto_hash_final(&desc, digest);
287}
288
289void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
290{
291 struct hash_desc desc;
292 struct scatterlist sg;
293 struct bio_vec *bvec;
294 int i;
295
296 desc.tfm = tfm;
297 desc.flags = 0;
298
299 sg_init_table(&sg, 1);
300 crypto_hash_init(&desc);
301
302 __bio_for_each_segment(bvec, bio, i, 0) {
303 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
304 crypto_hash_update(&desc, &sg, sg.length);
305 }
306 crypto_hash_final(&desc, digest);
307}
308
309static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
310{
311 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
312 int digest_size;
313 void *digest;
314 int ok;
315
316 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
317
318 if (unlikely(cancel)) {
319 drbd_free_ee(mdev, e);
320 return 1;
321 }
322
323 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
324 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
325 digest = kmalloc(digest_size, GFP_NOIO);
326 if (digest) {
327 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
328
329 inc_rs_pending(mdev);
330 ok = drbd_send_drequest_csum(mdev,
331 e->sector,
332 e->size,
333 digest,
334 digest_size,
335 P_CSUM_RS_REQUEST);
336 kfree(digest);
337 } else {
338 dev_err(DEV, "kmalloc() of digest failed.\n");
339 ok = 0;
340 }
341 } else
342 ok = 1;
343
344 drbd_free_ee(mdev, e);
345
346 if (unlikely(!ok))
347 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
348 return ok;
349}
350
351#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
352
353static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
354{
355 struct drbd_epoch_entry *e;
356
357 if (!get_ldev(mdev))
358 return -EIO;
359
360 if (drbd_rs_should_slow_down(mdev, sector))
361 goto defer;
362
363 /* GFP_TRY, because if there is no memory available right now, this may
364 * be rescheduled for later. It is "only" background resync, after all. */
365 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
366 if (!e)
367 goto defer;
368
369 e->w.cb = w_e_send_csum;
370 spin_lock_irq(&mdev->req_lock);
371 list_add(&e->w.list, &mdev->read_ee);
372 spin_unlock_irq(&mdev->req_lock);
373
374 atomic_add(size >> 9, &mdev->rs_sect_ev);
375 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
376 return 0;
377
378 /* drbd_submit_ee currently fails for one reason only:
379 * not being able to allocate enough bios.
380 * Is dropping the connection going to help? */
381 spin_lock_irq(&mdev->req_lock);
382 list_del(&e->w.list);
383 spin_unlock_irq(&mdev->req_lock);
384
385 drbd_free_ee(mdev, e);
386defer:
387 put_ldev(mdev);
388 return -EAGAIN;
389}
390
391void resync_timer_fn(unsigned long data)
392{
393 struct drbd_conf *mdev = (struct drbd_conf *) data;
394 int queue;
395
396 queue = 1;
397 switch (mdev->state.conn) {
398 case C_VERIFY_S:
399 mdev->resync_work.cb = w_make_ov_request;
400 break;
401 case C_SYNC_TARGET:
402 mdev->resync_work.cb = w_make_resync_request;
403 break;
404 default:
405 queue = 0;
406 mdev->resync_work.cb = w_resync_inactive;
407 }
408
409 /* harmless race: list_empty outside data.work.q_lock */
410 if (list_empty(&mdev->resync_work.list) && queue)
411 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
412}
413
414static void fifo_set(struct fifo_buffer *fb, int value)
415{
416 int i;
417
418 for (i = 0; i < fb->size; i++)
419 fb->values[i] = value;
420}
421
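/* rs_plan_s is used by drbd_rs_controller() as a fixed-length delay line:
 * fifo_push() overwrites the slot at head_index with the new value and returns
 * whatever had accumulated there, i.e. the correction that falls due now. */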
422static int fifo_push(struct fifo_buffer *fb, int value)
423{
424 int ov;
425
426 ov = fb->values[fb->head_index];
427 fb->values[fb->head_index++] = value;
428
429 if (fb->head_index >= fb->size)
430 fb->head_index = 0;
431
432 return ov;
433}
434
435static void fifo_add_val(struct fifo_buffer *fb, int value)
436{
437 int i;
438
439 for (i = 0; i < fb->size; i++)
440 fb->values[i] += value;
441}
442
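/* The dynamic resync-rate controller, called once per SLEEP_TIME step (100ms,
 * assuming the usual HZ/10 definition of SLEEP_TIME): it compares the resync
 * traffic that arrived since the last step (rs_sect_in) with the configured
 * fill/delay target and spreads the needed correction over the next "steps"
 * slots of the rs_plan_s fifo. Returns the number of sectors to request in
 * this step, capped by c_max_rate. */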
443static int drbd_rs_controller(struct drbd_conf *mdev)
444{
445 unsigned int sect_in; /* Number of sectors that came in since the last turn */
446 unsigned int want; /* The number of sectors we want in the proxy */
447 int req_sect; /* Number of sectors to request in this turn */
448 int correction; /* Number of sectors more we need in the proxy*/
449 int cps; /* correction per invocation of drbd_rs_controller() */
450 int steps; /* Number of time steps to plan ahead */
451 int curr_corr;
452 int max_sect;
453
454 sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
455 mdev->rs_in_flight -= sect_in;
456
457 spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
458
459 steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
460
461 if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
462 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
463 } else { /* normal path */
464 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
465 sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
466 }
467
468 correction = want - mdev->rs_in_flight - mdev->rs_planed;
469
470 /* Plan ahead */
471 cps = correction / steps;
472 fifo_add_val(&mdev->rs_plan_s, cps);
473 mdev->rs_planed += cps * steps;
474
475 /* What we do in this step */
476 curr_corr = fifo_push(&mdev->rs_plan_s, 0);
477 spin_unlock(&mdev->peer_seq_lock);
478 mdev->rs_planed -= curr_corr;
479
480 req_sect = sect_in + curr_corr;
481 if (req_sect < 0)
482 req_sect = 0;
483
484 max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
485 if (req_sect > max_sect)
486 req_sect = max_sect;
487
488 /*
489 dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
490 sect_in, mdev->rs_in_flight, want, correction,
491 steps, cps, mdev->rs_planed, curr_corr, req_sect);
492 */
493
494 return req_sect;
495}
496
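/* Convert the current sync rate into a number of BM_BLOCK_SIZE (4 KiB) sized
 * requests to issue during the next SLEEP_TIME step: with the dynamic controller
 * enabled (rs_plan_s.size != 0, i.e. c_plan_ahead > 0) the rate comes from
 * drbd_rs_controller(), otherwise the static sync_conf.rate (KiB/s) is used. */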
497static int drbd_rs_number_requests(struct drbd_conf *mdev)
498{
499 int number;
500 if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
501 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
502 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
503 } else {
504 mdev->c_sync_rate = mdev->sync_conf.rate;
505 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
506 }
507
508 /* ignore the amount of pending requests, the resync controller should
509 * throttle down to incoming reply rate soon enough anyways. */
510 return number;
511}
512
513static int w_make_resync_request(struct drbd_conf *mdev,
514 struct drbd_work *w, int cancel)
515{
516 unsigned long bit;
517 sector_t sector;
518 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
519 int max_bio_size;
520 int number, rollback_i, size;
521 int align, queued, sndbuf;
522 int i = 0;
523
524 if (unlikely(cancel))
525 return 1;
526
527 if (unlikely(mdev->state.conn < C_CONNECTED)) {
528 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
529 return 0;
530 }
531
532 if (mdev->state.conn != C_SYNC_TARGET)
533 dev_err(DEV, "%s in w_make_resync_request\n",
534 drbd_conn_str(mdev->state.conn));
535
536 if (mdev->rs_total == 0) {
537 /* empty resync? */
538 drbd_resync_finished(mdev);
539 return 1;
540 }
541
542 if (!get_ldev(mdev)) {
543 /* Since we only need to access mdev->rsync a
544 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
545 to continue resync with a broken disk makes no sense at
546 all */
547 dev_err(DEV, "Disk broke down during resync!\n");
548 mdev->resync_work.cb = w_resync_inactive;
549 return 1;
550 }
551
552 /* starting with drbd 8.3.8, we can handle multi-bio EEs,
553 * if it should be necessary */
554 max_bio_size =
555 mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
556 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
557
558 number = drbd_rs_number_requests(mdev);
559 if (number == 0)
560 goto requeue;
561
562 for (i = 0; i < number; i++) {
563 /* Stop generating RS requests, when half of the send buffer is filled */
564 mutex_lock(&mdev->data.mutex);
565 if (mdev->data.socket) {
566 queued = mdev->data.socket->sk->sk_wmem_queued;
567 sndbuf = mdev->data.socket->sk->sk_sndbuf;
568 } else {
569 queued = 1;
570 sndbuf = 0;
571 }
572 mutex_unlock(&mdev->data.mutex);
573 if (queued > sndbuf / 2)
574 goto requeue;
575
576next_sector:
577 size = BM_BLOCK_SIZE;
578 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
579
580 if (bit == -1UL) {
581 mdev->bm_resync_fo = drbd_bm_bits(mdev);
582 mdev->resync_work.cb = w_resync_inactive;
583 put_ldev(mdev);
584 return 1;
585 }
586
587 sector = BM_BIT_TO_SECT(bit);
588
589 if (drbd_rs_should_slow_down(mdev, sector) ||
590 drbd_try_rs_begin_io(mdev, sector)) {
591 mdev->bm_resync_fo = bit;
592 goto requeue;
593 }
594 mdev->bm_resync_fo = bit + 1;
595
596 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
597 drbd_rs_complete_io(mdev, sector);
598 goto next_sector;
599 }
600
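/* The block below tries to merge adjacent dirty bits into one larger request:
 * it may grow up to max_bio_size, bigger requests are kept aligned to their own
 * size (align+3, since one 4 KiB bitmap block spans 2^3 sectors of 512 bytes),
 * and a request never crosses a bitmap extent boundary. */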
601#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
602 /* try to find some adjacent bits.
603 * we stop if we have already the maximum req size.
604 *
605 * Additionally always align bigger requests, in order to
606 * be prepared for all stripe sizes of software RAIDs.
607 */
608 align = 1;
609 rollback_i = i;
610 for (;;) {
611 if (size + BM_BLOCK_SIZE > max_bio_size)
612 break;
613
614 /* Be always aligned */
615 if (sector & ((1<<(align+3))-1))
616 break;
617
618 /* do not cross extent boundaries */
619 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
620 break;
621 /* now, is it actually dirty, after all?
622 * caution, drbd_bm_test_bit is tri-state for some
623 * obscure reason; ( b == 0 ) would get the out-of-band
624 * only accidentally right because of the "oddly sized"
625 * adjustment below */
626 if (drbd_bm_test_bit(mdev, bit+1) != 1)
627 break;
628 bit++;
629 size += BM_BLOCK_SIZE;
630 if ((BM_BLOCK_SIZE << align) <= size)
631 align++;
632 i++;
633 }
634 /* if we merged some,
635 * reset the offset to start the next drbd_bm_find_next from */
636 if (size > BM_BLOCK_SIZE)
637 mdev->bm_resync_fo = bit + 1;
638#endif
639
640 /* adjust very last sectors, in case we are oddly sized */
641 if (sector + (size>>9) > capacity)
642 size = (capacity-sector)<<9;
643 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
644 switch (read_for_csum(mdev, sector, size)) {
645 case -EIO: /* Disk failure */
646 put_ldev(mdev);
647 return 0;
648 case -EAGAIN: /* allocation failed, or ldev busy */
649 drbd_rs_complete_io(mdev, sector);
650 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
651 i = rollback_i;
652 goto requeue;
653 case 0:
654 /* everything ok */
655 break;
656 default:
657 BUG();
658 }
659 } else {
660 inc_rs_pending(mdev);
661 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
662 sector, size, ID_SYNCER)) {
663 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
664 dec_rs_pending(mdev);
665 put_ldev(mdev);
666 return 0;
667 }
668 }
669 }
670
671 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
672 /* last syncer _request_ was sent,
673 * but the P_RS_DATA_REPLY not yet received. sync will end (and
674 * next sync group will resume), as soon as we receive the last
675 * resync data block, and the last bit is cleared.
676 * until then resync "work" is "inactive" ...
677 */
678 mdev->resync_work.cb = w_resync_inactive;
679 put_ldev(mdev);
680 return 1;
681 }
682
683 requeue:
684 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
685 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
686 put_ldev(mdev);
687 return 1;
688}
689
690static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
691{
692 int number, i, size;
693 sector_t sector;
694 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
695
696 if (unlikely(cancel))
697 return 1;
698
699 if (unlikely(mdev->state.conn < C_CONNECTED)) {
700 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
701 return 0;
702 }
703
704 number = drbd_rs_number_requests(mdev);
705
706 sector = mdev->ov_position;
707 for (i = 0; i < number; i++) {
708 if (sector >= capacity) {
709 mdev->resync_work.cb = w_resync_inactive;
710 return 1;
711 }
712
713 size = BM_BLOCK_SIZE;
714
715 if (drbd_rs_should_slow_down(mdev, sector) ||
716 drbd_try_rs_begin_io(mdev, sector)) {
717 mdev->ov_position = sector;
718 goto requeue;
719 }
720
721 if (sector + (size>>9) > capacity)
722 size = (capacity-sector)<<9;
723
724 inc_rs_pending(mdev);
725 if (!drbd_send_ov_request(mdev, sector, size)) {
726 dec_rs_pending(mdev);
727 return 0;
728 }
729 sector += BM_SECT_PER_BIT;
730 }
731 mdev->ov_position = sector;
732
733 requeue:
734 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
735 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
736 return 1;
737}
738
739
740int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
741{
742 drbd_start_resync(mdev, C_SYNC_SOURCE);
743
744 return 1;
745}
746
747int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
748{
749 kfree(w);
750 ov_oos_print(mdev);
751 drbd_resync_finished(mdev);
752
753 return 1;
754}
755
756static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
757{
758 kfree(w);
759
760 drbd_resync_finished(mdev);
761
762 return 1;
763}
764
765static void ping_peer(struct drbd_conf *mdev)
766{
767 clear_bit(GOT_PING_ACK, &mdev->flags);
768 request_ping(mdev);
769 wait_event(mdev->misc_wait,
770 test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
771}
772
773int drbd_resync_finished(struct drbd_conf *mdev)
774{
775 unsigned long db, dt, dbdt;
776 unsigned long n_oos;
777 union drbd_state os, ns;
778 struct drbd_work *w;
779 char *khelper_cmd = NULL;
780 int verify_done = 0;
781
782 /* Remove all elements from the resync LRU. Since future actions
783 * might set bits in the (main) bitmap, then the entries in the
784 * resync LRU would be wrong. */
785 if (drbd_rs_del_all(mdev)) {
786 /* In case this is not possible now, most probably because
787 * there are P_RS_DATA_REPLY Packets lingering on the worker's
788 * queue (or even the read operations for those packets
789 * is not finished by now). Retry in 100ms. */
790
791 __set_current_state(TASK_INTERRUPTIBLE);
792 schedule_timeout(HZ / 10);
793 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
794 if (w) {
795 w->cb = w_resync_finished;
796 drbd_queue_work(&mdev->data.work, w);
797 return 1;
798 }
799 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
800 }
801
802 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
803 if (dt <= 0)
804 dt = 1;
805 db = mdev->rs_total;
806 dbdt = Bit2KB(db/dt);
807 mdev->rs_paused /= HZ;
808
809 if (!get_ldev(mdev))
810 goto out;
811
812 ping_peer(mdev);
813
814 spin_lock_irq(&mdev->req_lock);
815 os = mdev->state;
816
817 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
818
819 /* This protects us against multiple calls (that can happen in the presence
820 of application IO), and against connectivity loss just before we arrive here. */
821 if (os.conn <= C_CONNECTED)
822 goto out_unlock;
823
824 ns = os;
825 ns.conn = C_CONNECTED;
826
827 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
828 verify_done ? "Online verify " : "Resync",
829 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
830
831 n_oos = drbd_bm_total_weight(mdev);
832
833 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
834 if (n_oos) {
835 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
836 n_oos, Bit2KB(1));
837 khelper_cmd = "out-of-sync";
838 }
839 } else {
840 D_ASSERT((n_oos - mdev->rs_failed) == 0);
841
842 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
843 khelper_cmd = "after-resync-target";
844
845 if (mdev->csums_tfm && mdev->rs_total) {
846 const unsigned long s = mdev->rs_same_csum;
847 const unsigned long t = mdev->rs_total;
848 const int ratio =
849 (t == 0) ? 0 :
850 (t < 100000) ? ((s*100)/t) : (s/(t/100));
851 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
852 "transferred %luK total %luK\n",
853 ratio,
854 Bit2KB(mdev->rs_same_csum),
855 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
856 Bit2KB(mdev->rs_total));
857 }
858 }
859
860 if (mdev->rs_failed) {
861 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
862
863 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
864 ns.disk = D_INCONSISTENT;
865 ns.pdsk = D_UP_TO_DATE;
866 } else {
867 ns.disk = D_UP_TO_DATE;
868 ns.pdsk = D_INCONSISTENT;
869 }
870 } else {
871 ns.disk = D_UP_TO_DATE;
872 ns.pdsk = D_UP_TO_DATE;
873
874 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
875 if (mdev->p_uuid) {
876 int i;
877 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
878 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
879 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
880 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
881 } else {
882 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
883 }
884 }
885
886 drbd_uuid_set_bm(mdev, 0UL);
887
888 if (mdev->p_uuid) {
889 /* Now the two UUID sets are equal, update what we
890 * know of the peer. */
891 int i;
892 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
893 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
894 }
895 }
896
897 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
898out_unlock:
899 spin_unlock_irq(&mdev->req_lock);
900 put_ldev(mdev);
901out:
902 mdev->rs_total = 0;
903 mdev->rs_failed = 0;
904 mdev->rs_paused = 0;
905 if (verify_done)
906 mdev->ov_start_sector = 0;
907
908 drbd_md_sync(mdev);
909
910 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
911 dev_info(DEV, "Writing the whole bitmap\n");
912 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
913 }
914
915 if (khelper_cmd)
916 drbd_khelper(mdev, khelper_cmd);
917
918 return 1;
919}
920
921/* helper */
922static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
923{
924 if (drbd_ee_has_active_page(e)) {
925 /* This might happen if sendpage() has not finished */
926 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
927 atomic_add(i, &mdev->pp_in_use_by_net);
928 atomic_sub(i, &mdev->pp_in_use);
929 spin_lock_irq(&mdev->req_lock);
930 list_add_tail(&e->w.list, &mdev->net_ee);
931 spin_unlock_irq(&mdev->req_lock);
932 wake_up(&drbd_pp_wait);
933 } else
934 drbd_free_ee(mdev, e);
935}
936
937/**
938 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
939 * @mdev: DRBD device.
940 * @w: work object.
941 * @cancel: The connection will be closed anyways
942 */
943int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
944{
945 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
946 int ok;
947
948 if (unlikely(cancel)) {
949 drbd_free_ee(mdev, e);
950 dec_unacked(mdev);
951 return 1;
952 }
953
954 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
955 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
956 } else {
957 if (__ratelimit(&drbd_ratelimit_state))
958 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
959 (unsigned long long)e->sector);
960
961 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
962 }
963
964 dec_unacked(mdev);
965
966 move_to_net_ee_or_free(mdev, e);
967
968 if (unlikely(!ok))
969 dev_err(DEV, "drbd_send_block() failed\n");
970 return ok;
971}
972
973/**
974 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
975 * @mdev: DRBD device.
976 * @w: work object.
977 * @cancel: The connection will be closed anyways
978 */
979int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
980{
981 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
982 int ok;
983
984 if (unlikely(cancel)) {
985 drbd_free_ee(mdev, e);
986 dec_unacked(mdev);
987 return 1;
988 }
989
990 if (get_ldev_if_state(mdev, D_FAILED)) {
991 drbd_rs_complete_io(mdev, e->sector);
992 put_ldev(mdev);
993 }
994
995 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
996 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
997 inc_rs_pending(mdev);
998 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
999 } else {
1000 if (__ratelimit(&drbd_ratelimit_state))
1001 dev_err(DEV, "Not sending RSDataReply, "
1002 "partner DISKLESS!\n");
1003 ok = 1;
1004 }
1005 } else {
1006 if (__ratelimit(&drbd_ratelimit_state))
1007 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1008 (unsigned long long)e->sector);
1009
1010 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1011
1012 /* update resync data with failure */
1013 drbd_rs_failed_io(mdev, e->sector, e->size);
1014 }
1015
1016 dec_unacked(mdev);
1017
1018 move_to_net_ee_or_free(mdev, e);
1019
1020 if (unlikely(!ok))
1021 dev_err(DEV, "drbd_send_block() failed\n");
1022 return ok;
1023}
1024
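/* Worker callback for checksum based resync requests (P_CSUM_RS_REQUEST): if the
 * locally read block has the same digest as the one the peer sent, only a
 * P_RS_IS_IN_SYNC ack goes back; otherwise the full block is sent with
 * P_RS_DATA_REPLY. */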
1025int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1026{
1027 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1028 struct digest_info *di;
1029 int digest_size;
1030 void *digest = NULL;
1031 int ok, eq = 0;
1032
1033 if (unlikely(cancel)) {
1034 drbd_free_ee(mdev, e);
1035 dec_unacked(mdev);
1036 return 1;
1037 }
1038
1039 if (get_ldev(mdev)) {
1040 drbd_rs_complete_io(mdev, e->sector);
1041 put_ldev(mdev);
1042 }
1043
1044 di = e->digest;
1045
1046 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1047 /* quick hack to try to avoid a race against reconfiguration.
1048 * a real fix would be much more involved,
1049 * introducing more locking mechanisms */
1050 if (mdev->csums_tfm) {
1051 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1052 D_ASSERT(digest_size == di->digest_size);
1053 digest = kmalloc(digest_size, GFP_NOIO);
1054 }
1055 if (digest) {
1056 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1057 eq = !memcmp(digest, di->digest, digest_size);
1058 kfree(digest);
1059 }
1060
1061 if (eq) {
1062 drbd_set_in_sync(mdev, e->sector, e->size);
1063 /* rs_same_csums unit is BM_BLOCK_SIZE */
1064 mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1065 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1066 } else {
1067 inc_rs_pending(mdev);
1068 e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1069 e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1070 kfree(di);
1071 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1072 }
1073 } else {
1074 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1075 if (__ratelimit(&drbd_ratelimit_state))
1076 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1077 }
1078
1079 dec_unacked(mdev);
1080 move_to_net_ee_or_free(mdev, e);
1081
1082 if (unlikely(!ok))
1083 dev_err(DEV, "drbd_send_block/ack() failed\n");
1084 return ok;
1085}
1086
1087int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1088{
1089 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1090 int digest_size;
1091 void *digest;
1092 int ok = 1;
1093
1094 if (unlikely(cancel))
1095 goto out;
1096
1097 if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1098 goto out;
1099
1100 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1101 /* FIXME if this allocation fails, online verify will not terminate! */
1102 digest = kmalloc(digest_size, GFP_NOIO);
1103 if (digest) {
1104 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1105 inc_rs_pending(mdev);
1106 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1107 digest, digest_size, P_OV_REPLY);
1108 if (!ok)
1109 dec_rs_pending(mdev);
1110 kfree(digest);
1111 }
1112
1113out:
1114 drbd_free_ee(mdev, e);
1115
1116 dec_unacked(mdev);
1117
1118 return ok;
1119}
1120
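/* Record a block that online verify found to differ: contiguous out-of-sync
 * blocks are merged into one range (ov_last_oos_start/ov_last_oos_size), which
 * is later reported via ov_oos_print(); the bits are also set in the bitmap so
 * the range can be resynced afterwards. */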
1121void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1122{
1123 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1124 mdev->ov_last_oos_size += size>>9;
1125 } else {
1126 mdev->ov_last_oos_start = sector;
1127 mdev->ov_last_oos_size = size>>9;
1128 }
1129 drbd_set_out_of_sync(mdev, sector, size);
1130 set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1131}
1132
1133int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1134{
1135 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1136 struct digest_info *di;
1137 int digest_size;
1138 void *digest;
1139 int ok, eq = 0;
1140
1141 if (unlikely(cancel)) {
1142 drbd_free_ee(mdev, e);
1143 dec_unacked(mdev);
1144 return 1;
1145 }
1146
1147 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1148 * the resync lru has been cleaned up already */
1149 if (get_ldev(mdev)) {
1150 drbd_rs_complete_io(mdev, e->sector);
1151 put_ldev(mdev);
1152 }
1153
1154 di = e->digest;
1155
1156 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1157 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1158 digest = kmalloc(digest_size, GFP_NOIO);
1159 if (digest) {
1160 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1161
1162 D_ASSERT(digest_size == di->digest_size);
1163 eq = !memcmp(digest, di->digest, digest_size);
1164 kfree(digest);
1165 }
1166 } else {
1167 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1168 if (__ratelimit(&drbd_ratelimit_state))
1169 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1170 }
1171
1172 dec_unacked(mdev);
1173 if (!eq)
1174 drbd_ov_oos_found(mdev, e->sector, e->size);
1175 else
1176 ov_oos_print(mdev);
1177
1178 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1179 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1180
1181 drbd_free_ee(mdev, e);
1182
1183 --mdev->ov_left;
1184
1185 /* let's advance progress step marks only for every other megabyte */
1186 if ((mdev->ov_left & 0x200) == 0x200)
1187 drbd_advance_rs_marks(mdev, mdev->ov_left);
1188
1189 if (mdev->ov_left == 0) {
1190 ov_oos_print(mdev);
1191 drbd_resync_finished(mdev);
1192 }
1193
1194 return ok;
1195}
1196
1197int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1198{
1199 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1200 complete(&b->done);
1201 return 1;
1202}
1203
1204int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1205{
1206 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1207 struct p_barrier *p = &mdev->data.sbuf.barrier;
1208 int ok = 1;
1209
1210 /* really avoid racing with tl_clear. w.cb may have been referenced
1211 * just before it was reassigned and re-queued, so double check that.
1212 * actually, this race was harmless, since we only try to send the
1213 * barrier packet here, and otherwise do nothing with the object.
1214 * but compare with the head of w_clear_epoch */
1215 spin_lock_irq(&mdev->req_lock);
1216 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1217 cancel = 1;
1218 spin_unlock_irq(&mdev->req_lock);
1219 if (cancel)
1220 return 1;
1221
1222 if (!drbd_get_data_sock(mdev))
1223 return 0;
1224 p->barrier = b->br_number;
1225 /* inc_ap_pending was done where this was queued.
1226 * dec_ap_pending will be done in got_BarrierAck
1227 * or (on connection loss) in w_clear_epoch. */
1228 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1229 (struct p_header80 *)p, sizeof(*p), 0);
1230 drbd_put_data_sock(mdev);
1231
1232 return ok;
1233}
1234
1235int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1236{
1237 if (cancel)
1238 return 1;
1239 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1240}
1241
1242int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1243{
1244 struct drbd_request *req = container_of(w, struct drbd_request, w);
1245 int ok;
1246
1247 if (unlikely(cancel)) {
1248 req_mod(req, send_canceled);
1249 return 1;
1250 }
1251
1252 ok = drbd_send_oos(mdev, req);
1253 req_mod(req, oos_handed_to_network);
1254
1255 return ok;
1256}
1257
1258/**
1259 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1260 * @mdev: DRBD device.
1261 * @w: work object.
1262 * @cancel: The connection will be closed anyways
1263 */
1264int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1265{
1266 struct drbd_request *req = container_of(w, struct drbd_request, w);
1267 int ok;
1268
1269 if (unlikely(cancel)) {
1270 req_mod(req, send_canceled);
1271 return 1;
1272 }
1273
1274 ok = drbd_send_dblock(mdev, req);
1275 req_mod(req, ok ? handed_over_to_network : send_failed);
1276
1277 return ok;
1278}
1279
1280/**
1281 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1282 * @mdev: DRBD device.
1283 * @w: work object.
1284 * @cancel: The connection will be closed anyways
1285 */
1286int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1287{
1288 struct drbd_request *req = container_of(w, struct drbd_request, w);
1289 int ok;
1290
1291 if (unlikely(cancel)) {
1292 req_mod(req, send_canceled);
1293 return 1;
1294 }
1295
1296 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1297 (unsigned long)req);
1298
1299 if (!ok) {
1300 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1301 * so this is probably redundant */
1302 if (mdev->state.conn >= C_CONNECTED)
1303 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1304 }
1305 req_mod(req, ok ? handed_over_to_network : send_failed);
1306
1307 return ok;
1308}
1309
1310int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1311{
1312 struct drbd_request *req = container_of(w, struct drbd_request, w);
1313
1314 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1315 drbd_al_begin_io(mdev, req->sector);
1316 /* Calling drbd_al_begin_io() out of the worker might deadlock
1317 theoretically. Practically it cannot deadlock, since this is
1318 only used when unfreezing IOs. All the extents of the requests
1319 that made it into the TL are already active */
1320
1321 drbd_req_make_private_bio(req, req->master_bio);
1322 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1323 generic_make_request(req->private_bio);
1324
1325 return 1;
1326}
1327
1328static int _drbd_may_sync_now(struct drbd_conf *mdev)
1329{
1330 struct drbd_conf *odev = mdev;
1331
1332 while (1) {
1333 if (odev->sync_conf.after == -1)
1334 return 1;
1335 odev = minor_to_mdev(odev->sync_conf.after);
1336 ERR_IF(!odev) return 1;
1337 if ((odev->state.conn >= C_SYNC_SOURCE &&
1338 odev->state.conn <= C_PAUSED_SYNC_T) ||
1339 odev->state.aftr_isp || odev->state.peer_isp ||
1340 odev->state.user_isp)
1341 return 0;
1342 }
1343}
1344
1345/**
1346 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1347 * @mdev: DRBD device.
1348 *
1349 * Called from process context only (admin command and after_state_ch).
1350 */
1351static int _drbd_pause_after(struct drbd_conf *mdev)
1352{
1353 struct drbd_conf *odev;
1354 int i, rv = 0;
1355
1356 for (i = 0; i < minor_count; i++) {
1357 odev = minor_to_mdev(i);
1358 if (!odev)
1359 continue;
1360 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1361 continue;
1362 if (!_drbd_may_sync_now(odev))
1363 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1364 != SS_NOTHING_TO_DO);
1365 }
1366
1367 return rv;
1368}
1369
1370/**
1371 * _drbd_resume_next() - Resume resync on all devices that may resync now
1372 * @mdev: DRBD device.
1373 *
1374 * Called from process context only (admin command and worker).
1375 */
1376static int _drbd_resume_next(struct drbd_conf *mdev)
1377{
1378 struct drbd_conf *odev;
1379 int i, rv = 0;
1380
1381 for (i = 0; i < minor_count; i++) {
1382 odev = minor_to_mdev(i);
1383 if (!odev)
1384 continue;
1385 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1386 continue;
1387 if (odev->state.aftr_isp) {
1388 if (_drbd_may_sync_now(odev))
1389 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1390 CS_HARD, NULL)
1391 != SS_NOTHING_TO_DO) ;
1392 }
1393 }
1394 return rv;
1395}
1396
1397void resume_next_sg(struct drbd_conf *mdev)
1398{
1399 write_lock_irq(&global_state_lock);
1400 _drbd_resume_next(mdev);
1401 write_unlock_irq(&global_state_lock);
1402}
1403
1404void suspend_other_sg(struct drbd_conf *mdev)
1405{
1406 write_lock_irq(&global_state_lock);
1407 _drbd_pause_after(mdev);
1408 write_unlock_irq(&global_state_lock);
1409}
1410
1411static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1412{
1413 struct drbd_conf *odev;
1414
1415 if (o_minor == -1)
1416 return NO_ERROR;
1417 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1418 return ERR_SYNC_AFTER;
1419
1420 /* check for loops */
1421 odev = minor_to_mdev(o_minor);
1422 while (1) {
1423 if (odev == mdev)
1424 return ERR_SYNC_AFTER_CYCLE;
1425
1426 /* dependency chain ends here, no cycles. */
1427 if (odev->sync_conf.after == -1)
1428 return NO_ERROR;
1429
1430 /* follow the dependency chain */
1431 odev = minor_to_mdev(odev->sync_conf.after);
1432 }
1433}
1434
1435int drbd_alter_sa(struct drbd_conf *mdev, int na)
1436{
1437 int changes;
1438 int retcode;
1439
1440 write_lock_irq(&global_state_lock);
1441 retcode = sync_after_error(mdev, na);
1442 if (retcode == NO_ERROR) {
1443 mdev->sync_conf.after = na;
1444 do {
1445 changes = _drbd_pause_after(mdev);
1446 changes |= _drbd_resume_next(mdev);
1447 } while (changes);
1448 }
1449 write_unlock_irq(&global_state_lock);
1450 return retcode;
1451}
1452
1453void drbd_rs_controller_reset(struct drbd_conf *mdev)
1454{
1455 atomic_set(&mdev->rs_sect_in, 0);
1456 atomic_set(&mdev->rs_sect_ev, 0);
1457 mdev->rs_in_flight = 0;
1458 mdev->rs_planed = 0;
1459 spin_lock(&mdev->peer_seq_lock);
1460 fifo_set(&mdev->rs_plan_s, 0);
1461 spin_unlock(&mdev->peer_seq_lock);
1462}
1463
1464/**
1465 * drbd_start_resync() - Start the resync process
1466 * @mdev: DRBD device.
1467 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1468 *
1469 * This function might bring you directly into one of the
1470 * C_PAUSED_SYNC_* states.
1471 */
1472void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1473{
1474 union drbd_state ns;
1475 int r;
1476
1477 if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1478 dev_err(DEV, "Resync already running!\n");
1479 return;
1480 }
1481
1482 if (mdev->state.conn < C_AHEAD) {
1483 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1484 drbd_rs_cancel_all(mdev);
1485 /* This should be done when we abort the resync. We definitely do not
1486 want to have this for connections going back and forth between
1487 Ahead/Behind and SyncSource/SyncTarget */
1488 }
1489
1490 if (side == C_SYNC_TARGET) {
1491 /* Since application IO was locked out during C_WF_BITMAP_T and
1492 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1493 we check that we might make the data inconsistent. */
1494 r = drbd_khelper(mdev, "before-resync-target");
1495 r = (r >> 8) & 0xff;
1496 if (r > 0) {
1497 dev_info(DEV, "before-resync-target handler returned %d, "
1498 "dropping connection.\n", r);
1499 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1500 return;
1501 }
1502 } else /* C_SYNC_SOURCE */ {
1503 r = drbd_khelper(mdev, "before-resync-source");
1504 r = (r >> 8) & 0xff;
1505 if (r > 0) {
1506 if (r == 3) {
1507 dev_info(DEV, "before-resync-source handler returned %d, "
1508 "ignoring. Old userland tools?", r);
1509 } else {
1510 dev_info(DEV, "before-resync-source handler returned %d, "
1511 "dropping connection.\n", r);
1512 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1513 return;
1514 }
1515 }
1516 }
1517
1518 drbd_state_lock(mdev);
1519
1520 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1521 drbd_state_unlock(mdev);
1522 return;
1523 }
1524
1525 if (side == C_SYNC_TARGET) {
1526 mdev->bm_resync_fo = 0;
1527 } else /* side == C_SYNC_SOURCE */ {
1528 u64 uuid;
1529
1530 get_random_bytes(&uuid, sizeof(u64));
1531 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1532 drbd_send_sync_uuid(mdev, uuid);
1533
1534 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1535 }
1536
1537 write_lock_irq(&global_state_lock);
1538 ns = mdev->state;
1539
1540 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1541
1542 ns.conn = side;
1543
1544 if (side == C_SYNC_TARGET)
1545 ns.disk = D_INCONSISTENT;
1546 else /* side == C_SYNC_SOURCE */
1547 ns.pdsk = D_INCONSISTENT;
1548
1549 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1550 ns = mdev->state;
1551
1552 if (ns.conn < C_CONNECTED)
1553 r = SS_UNKNOWN_ERROR;
1554
1555 if (r == SS_SUCCESS) {
1556 unsigned long tw = drbd_bm_total_weight(mdev);
1557 unsigned long now = jiffies;
1558 int i;
1559
1560 mdev->rs_failed = 0;
1561 mdev->rs_paused = 0;
1562 mdev->rs_same_csum = 0;
1563 mdev->rs_last_events = 0;
1564 mdev->rs_last_sect_ev = 0;
1565 mdev->rs_total = tw;
1566 mdev->rs_start = now;
1567 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1568 mdev->rs_mark_left[i] = tw;
1569 mdev->rs_mark_time[i] = now;
1570 }
1571 _drbd_pause_after(mdev);
1572 }
1573 write_unlock_irq(&global_state_lock);
1574 put_ldev(mdev);
1575
1576 if (r == SS_SUCCESS) {
1577 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1578 drbd_conn_str(ns.conn),
1579 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1580 (unsigned long) mdev->rs_total);
1581
1582 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1583 /* This still has a race (about when exactly the peers
1584 * detect connection loss) that can lead to a full sync
1585 * on next handshake. In 8.3.9 we fixed this with explicit
1586 * resync-finished notifications, but the fix
1587 * introduces a protocol change. Sleeping for some
1588 * time longer than the ping interval + timeout on the
1589 * SyncSource, to give the SyncTarget the chance to
1590 * detect connection loss, then waiting for a ping
1591 * response (implicit in drbd_resync_finished) reduces
1592 * the race considerably, but does not solve it. */
1593 if (side == C_SYNC_SOURCE)
1594 schedule_timeout_interruptible(
1595 mdev->net_conf->ping_int * HZ +
1596 mdev->net_conf->ping_timeo*HZ/9);
1597 drbd_resync_finished(mdev);
1598 }
1599
1600 drbd_rs_controller_reset(mdev);
1601 /* ns.conn may already be != mdev->state.conn,
1602 * we may have been paused in between, or become paused until
1603 * the timer triggers.
1604 * No matter, that is handled in resync_timer_fn() */
1605 if (ns.conn == C_SYNC_TARGET)
1606 mod_timer(&mdev->resync_timer, jiffies);
1607
1608 drbd_md_sync(mdev);
1609 }
1610 drbd_state_unlock(mdev);
1611}
1612
1613int drbd_worker(struct drbd_thread *thi)
1614{
1615 struct drbd_conf *mdev = thi->mdev;
1616 struct drbd_work *w = NULL;
1617 LIST_HEAD(work_list);
1618 int intr = 0, i;
1619
1620 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1621
1622 while (get_t_state(thi) == Running) {
1623 drbd_thread_current_set_cpu(mdev);
1624
1625 if (down_trylock(&mdev->data.work.s)) {
1626 mutex_lock(&mdev->data.mutex);
1627 if (mdev->data.socket && !mdev->net_conf->no_cork)
1628 drbd_tcp_uncork(mdev->data.socket);
1629 mutex_unlock(&mdev->data.mutex);
1630
1631 intr = down_interruptible(&mdev->data.work.s);
1632
1633 mutex_lock(&mdev->data.mutex);
1634 if (mdev->data.socket && !mdev->net_conf->no_cork)
1635 drbd_tcp_cork(mdev->data.socket);
1636 mutex_unlock(&mdev->data.mutex);
1637 }
1638
1639 if (intr) {
1640 D_ASSERT(intr == -EINTR);
1641 flush_signals(current);
1642 ERR_IF (get_t_state(thi) == Running)
1643 continue;
1644 break;
1645 }
1646
1647 if (get_t_state(thi) != Running)
1648 break;
1649 /* With this break, we have done a down() but not consumed
1650 the entry from the list. The cleanup code takes care of
1651 this... */
1652
1653 w = NULL;
1654 spin_lock_irq(&mdev->data.work.q_lock);
1655 ERR_IF(list_empty(&mdev->data.work.q)) {
1656 /* something terribly wrong in our logic.
1657 * we were able to down() the semaphore,
1658 * but the list is empty... doh.
1659 *
1660 * what is the best thing to do now?
1661 * try again from scratch, restarting the receiver,
1662 * asender, whatnot? could break even more ugly,
1663 * e.g. when we are primary, but no good local data.
1664 *
1665 * I'll try to get away just starting over this loop.
1666 */
1667 spin_unlock_irq(&mdev->data.work.q_lock);
1668 continue;
1669 }
1670 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1671 list_del_init(&w->list);
1672 spin_unlock_irq(&mdev->data.work.q_lock);
1673
1674 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1675 /* dev_warn(DEV, "worker: a callback failed! \n"); */
1676 if (mdev->state.conn >= C_CONNECTED)
1677 drbd_force_state(mdev,
1678 NS(conn, C_NETWORK_FAILURE));
1679 }
1680 }
1681 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1682 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1683
1684 spin_lock_irq(&mdev->data.work.q_lock);
1685 i = 0;
1686 while (!list_empty(&mdev->data.work.q)) {
1687 list_splice_init(&mdev->data.work.q, &work_list);
1688 spin_unlock_irq(&mdev->data.work.q_lock);
1689
1690 while (!list_empty(&work_list)) {
1691 w = list_entry(work_list.next, struct drbd_work, list);
1692 list_del_init(&w->list);
1693 w->cb(mdev, w, 1);
1694 i++; /* dead debugging code */
1695 }
1696
1697 spin_lock_irq(&mdev->data.work.q_lock);
1698 }
1699 sema_init(&mdev->data.work.s, 0);
1700 /* DANGEROUS race: if someone did queue his work within the spinlock,
1701 * but up() ed outside the spinlock, we could get an up() on the
1702 * semaphore without corresponding list entry.
1703 * So don't do that.
1704 */
1705 spin_unlock_irq(&mdev->data.work.q_lock);
1706
1707 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1708 /* _drbd_set_state only uses stop_nowait.
1709 * wait here for the Exiting receiver. */
1710 drbd_thread_stop(&mdev->receiver);
1711 drbd_mdev_cleanup(mdev);
1712
1713 dev_info(DEV, "worker terminated\n");
1714
1715 clear_bit(DEVICE_DYING, &mdev->flags);
1716 clear_bit(CONFIG_PENDING, &mdev->flags);
1717 wake_up(&mdev->state_wait);
1718
1719 return 0;
1720}