drbd: new configuration parameter c-min-rate
drivers/block/drbd/drbd_worker.c
/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/smp_lock.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_req.h"

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);



/* defined here:
   drbd_md_io_complete
   drbd_endio_sec
   drbd_endio_pri

 * more endio handlers:
   atodb_endio in drbd_actlog.c
   drbd_bm_async_io_complete in drbd_bitmap.c

 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */


/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that. */
rwlock_t global_state_lock;

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_md_io *md_io;

	md_io = (struct drbd_md_io *)bio->bi_private;
	md_io->error = error;

	complete(&md_io->event);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;

	D_ASSERT(e->block_id != ID_VACANT);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->read_cnt += e->size >> 9;
	list_del(&e->w.list);
	if (list_empty(&mdev->read_ee))
		wake_up(&mdev->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	drbd_queue_work(&mdev->data.work, &e->w);
	put_ldev(mdev);
}

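/* Detect the first failure of a barrier write: EE_RESUBMITTED guards
 * against matching the same request twice, so the resubmission path in
 * drbd_endio_write_sec_final() is taken at most once per request. */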
static int is_failed_barrier(int ee_flags)
{
	return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
			== (EE_IS_BARRIER|EE_WAS_ERROR);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage. */
static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;
	sector_t e_sector;
	int do_wake;
	int is_syncer_req;
	int do_al_complete_io;

	/* if this is a failed barrier request, disable use of barriers,
	 * and schedule for resubmission */
	if (is_failed_barrier(e->flags)) {
		drbd_bump_write_ordering(mdev, WO_bdev_flush);
		spin_lock_irqsave(&mdev->req_lock, flags);
		list_del(&e->w.list);
		e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
		e->w.cb = w_e_reissue;
		/* put_ldev actually happens below, once we come here again. */
		__release(local);
		spin_unlock_irqrestore(&mdev->req_lock, flags);
		drbd_queue_work(&mdev->data.work, &e->w);
		return;
	}

	D_ASSERT(e->block_id != ID_VACANT);

	/* after we moved e to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	e_sector = e->sector;
	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
	is_syncer_req = is_syncer_block_id(e->block_id);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->writ_cnt += e->size >> 9;
	list_del(&e->w.list); /* has been on active_ee or sync_ee */
	list_add_tail(&e->w.list, &mdev->done_ee);

	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
	 * neither did we wake possibly waiting conflicting requests.
	 * done from "drbd_process_done_ee" within the appropriate w.cb
	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */

	do_wake = is_syncer_req
		? list_empty(&mdev->sync_ee)
		: list_empty(&mdev->active_ee);

	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	if (is_syncer_req)
		drbd_rs_complete_io(mdev, e_sector);

	if (do_wake)
		wake_up(&mdev->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(mdev, e_sector);

	wake_asender(mdev);
	put_ldev(mdev);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_endio_sec(struct bio *bio, int error)
{
	struct drbd_epoch_entry *e = bio->bi_private;
	struct drbd_conf *mdev = e->mdev;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;

	if (error)
		dev_warn(DEV, "%s: error=%d s=%llus\n",
				is_write ? "write" : "read", error,
				(unsigned long long)e->sector);
	if (!error && !uptodate) {
		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
				is_write ? "write" : "read",
				(unsigned long long)e->sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
		set_bit(__EE_WAS_ERROR, &e->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&e->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(e);
		else
			drbd_endio_read_sec_final(e);
	}
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_endio_pri(struct bio *bio, int error)
{
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->mdev;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
			 bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? write_completed_with_error
			: (bio_rw(bio) == READ)
			  ? read_completed_with_error
			  : read_ahead_completed_with_error;
	} else
		what = completed_ok;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	req_mod(req, what);
}

int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* We should not detach for read io-error,
	 * but try to WRITE the P_DATA_REPLY to the failed location,
	 * to give the disk the chance to relocate that block */

	spin_lock_irq(&mdev->req_lock);
	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
		_req_mod(req, read_retry_remote_canceled);
		spin_unlock_irq(&mdev->req_lock);
		return 1;
	}
	spin_unlock_irq(&mdev->req_lock);

	return w_send_read_req(mdev, w, 0);
}

int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	ERR_IF(cancel) return 1;
	dev_err(DEV, "resync inactive, but callback triggered??\n");
	return 1; /* Simply ignore this! */
}

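/* Hash the payload of an epoch entry: walk the page chain; every page but
 * the last is fully used, the last one may be only partially filled. */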
void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = e->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = e->size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}

void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;
	int i;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	__bio_for_each_segment(bvec, bio, i, 0) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}

static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok;

	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		return 1;
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);

			inc_rs_pending(mdev);
			ok = drbd_send_drequest_csum(mdev,
						     e->sector,
						     e->size,
						     digest,
						     digest_size,
						     P_CSUM_RS_REQUEST);
			kfree(digest);
		} else {
			dev_err(DEV, "kmalloc() of digest failed.\n");
			ok = 0;
		}
	} else
		ok = 1;

	drbd_free_ee(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
	return ok;
}

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

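/* Schedule a resync read whose result will be checksummed and compared with
 * the peer's block (w_e_send_csum).  The drbd_rs_should_slow_down() check
 * defers the read while application I/O keeps the backing device busy; this
 * is the throttle governed by the new c-min-rate configuration parameter. */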
static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
{
	struct drbd_epoch_entry *e;

	if (!get_ldev(mdev))
		return -EIO;

	if (drbd_rs_should_slow_down(mdev))
		goto defer;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
	if (!e)
		goto defer;

	e->w.cb = w_e_send_csum;
	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->req_lock);

	atomic_add(size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
		return 0;

	drbd_free_ee(mdev, e);
defer:
	put_ldev(mdev);
	return -EAGAIN;
}

void resync_timer_fn(unsigned long data)
{
	unsigned long flags;
	struct drbd_conf *mdev = (struct drbd_conf *) data;
	int queue;

	spin_lock_irqsave(&mdev->req_lock, flags);

	if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
		queue = 1;
		if (mdev->state.conn == C_VERIFY_S)
			mdev->resync_work.cb = w_make_ov_request;
		else
			mdev->resync_work.cb = w_make_resync_request;
	} else {
		queue = 0;
		mdev->resync_work.cb = w_resync_inactive;
	}

	spin_unlock_irqrestore(&mdev->req_lock, flags);

	/* harmless race: list_empty outside data.work.q_lock */
	if (list_empty(&mdev->resync_work.list) && queue)
		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
}

static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] = value;
}

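/* Push a value into the ring buffer and return the value it displaces,
 * i.e. the correction that was planned for the current tick. */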
static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}

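/* Dynamic resync speed controller, invoked once per SLEEP_TIME tick while a
 * plan (c-plan-ahead) is active: from the number of sectors that drained
 * since the last tick, compute how many sectors to request next so that
 * either c-fill-target sectors stay in flight, or the c-delay-target latency
 * is met; the result is capped by c-max-rate.  Corrections are spread over
 * 'steps' future ticks via the rs_plan_s ring buffer. */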
int drbd_rs_controller(struct drbd_conf *mdev)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	unsigned int want;     /* The number of sectors we want in the proxy */
	int req_sect; /* Number of sectors to request in this turn */
	int correction; /* Number of sectors more we need in the proxy*/
	int cps; /* correction per invocation of drbd_rs_controller() */
	int steps; /* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;

	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
	mdev->rs_in_flight -= sect_in;

	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */

	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - mdev->rs_in_flight - mdev->rs_planed;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(&mdev->rs_plan_s, cps);
	mdev->rs_planed += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
	mdev->rs_planed -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, mdev->rs_in_flight, want, correction,
		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}

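/* Scan the bitmap for out-of-sync blocks and send up to 'number' resync data
 * requests per tick; 'number' comes from the fixed sync rate, or from
 * drbd_rs_controller() when dynamic planning is enabled. */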
int w_make_resync_request(struct drbd_conf *mdev,
		struct drbd_work *w, int cancel)
{
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	int max_segment_size;
	int number, rollback_i, size, pe, mx;
	int align, queued, sndbuf;
	int i = 0;

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
		return 0;
	}

	if (mdev->state.conn != C_SYNC_TARGET)
		dev_err(DEV, "%s in w_make_resync_request\n",
			drbd_conn_str(mdev->state.conn));

	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->resync a
		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		dev_err(DEV, "Disk broke down during resync!\n");
		mdev->resync_work.cb = w_resync_inactive;
		return 1;
	}

	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
	 * if it should be necessary */
	max_segment_size = mdev->agreed_pro_version < 94 ?
		queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;

	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		mdev->c_sync_rate = mdev->sync_conf.rate;
		number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
	}

	/* Throttle resync on lower level disk activity, which may also be
	 * caused by application IO on Primary/SyncTarget.
	 * Keep this after the call to drbd_rs_controller, as that assumes
	 * it is called as precisely as possible every SLEEP_TIME,
	 * and would be confused otherwise. */
	if (drbd_rs_should_slow_down(mdev))
		goto requeue;

	mutex_lock(&mdev->data.mutex);
	if (mdev->data.socket)
		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
	else
		mx = 1;
	mutex_unlock(&mdev->data.mutex);

	/* For resync rates >160MB/sec, allow more pending RS requests */
	if (number > mx)
		mx = number;

	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
	pe = atomic_read(&mdev->rs_pending_cnt);
	if ((pe + number) > mx) {
		number = mx - pe;
	}

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests, when half of the send buffer is filled */
		mutex_lock(&mdev->data.mutex);
		if (mdev->data.socket) {
			queued = mdev->data.socket->sk->sk_wmem_queued;
			sndbuf = mdev->data.socket->sk->sk_sndbuf;
		} else {
			queued = 1;
			sndbuf = 0;
		}
		mutex_unlock(&mdev->data.mutex);
		if (queued > sndbuf / 2)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);

		if (bit == -1UL) {
			mdev->bm_resync_fo = drbd_bm_bits(mdev);
			mdev->resync_work.cb = w_resync_inactive;
			put_ldev(mdev);
			return 1;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->bm_resync_fo = bit;
			goto requeue;
		}
		mdev->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
			drbd_rs_complete_io(mdev, sector);
			goto next_sector;
		}

#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we already have the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		for (;;) {
			if (size + BM_BLOCK_SIZE > max_segment_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(mdev, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			mdev->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
			switch (read_for_csum(mdev, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(mdev);
				return 0;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(mdev, sector);
				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			inc_rs_pending(mdev);
			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
					       sector, size, ID_SYNCER)) {
				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(mdev);
				put_ldev(mdev);
				return 0;
			}
		}
	}

	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		mdev->resync_work.cb = w_resync_inactive;
		put_ldev(mdev);
		return 1;
	}

requeue:
	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(mdev);
	return 1;
}

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
		return 0;
	}

	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
	if (atomic_read(&mdev->rs_pending_cnt) > number)
		goto requeue;

	number -= atomic_read(&mdev->rs_pending_cnt);

	sector = mdev->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity) {
			mdev->resync_work.cb = w_resync_inactive;
			return 1;
		}

		size = BM_BLOCK_SIZE;

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(mdev);
		if (!drbd_send_ov_request(mdev, sector, size)) {
			dec_rs_pending(mdev);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	mdev->ov_position = sector;

requeue:
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}


int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);
	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}

static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);

	drbd_resync_finished(mdev);

	return 1;
}

int drbd_resync_finished(struct drbd_conf *mdev)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_work *w;
	char *khelper_cmd = NULL;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * are not finished by now). Retry in 100ms. */

		drbd_kick_lo(mdev);
		__set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
		if (w) {
			w->cb = w_resync_finished;
			drbd_queue_work(&mdev->data.work, w);
			return 1;
		}
		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
	}

	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;
	db = mdev->rs_total;
	dbdt = Bit2KB(db/dt);
	mdev->rs_paused /= HZ;

	if (!get_ldev(mdev))
		goto out;

	spin_lock_irq(&mdev->req_lock);
	os = mdev->state;

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
	     "Online verify " : "Resync",
	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(mdev);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		D_ASSERT((n_oos - mdev->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (mdev->csums_tfm && mdev->rs_total) {
			const unsigned long s = mdev->rs_same_csum;
			const unsigned long t = mdev->rs_total;
			const int ratio =
				(t == 0)     ? 0 :
				(t < 100000) ? ((s*100)/t) : (s/(t/100));
			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total));
		}
	}

	if (mdev->rs_failed) {
		dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (mdev->p_uuid) {
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
			} else {
				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
			}
		}

		drbd_uuid_set_bm(mdev, 0UL);

		if (mdev->p_uuid) {
			/* Now the two UUID sets are equal, update what we
			 * know of the peer. */
			int i;
			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
		}
	}

	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&mdev->req_lock);
	put_ldev(mdev);
out:
	mdev->rs_total = 0;
	mdev->rs_failed = 0;
	mdev->rs_paused = 0;
	mdev->ov_start_sector = 0;

	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
	}

	if (khelper_cmd)
		drbd_khelper(mdev, khelper_cmd);

	return 1;
}

/* helper */
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
	if (drbd_ee_has_active_page(e)) {
		/* This might happen if sendpage() has not finished */
		spin_lock_irq(&mdev->req_lock);
		list_add_tail(&e->w.list, &mdev->net_ee);
		spin_unlock_irq(&mdev->req_lock);
	} else
		drbd_free_ee(mdev, e);
}

/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @mdev: DRBD device.
 * @w: work object.
 * @cancel: The connection will be closed anyway
 */
int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}

/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @mdev: DRBD device.
 * @w: work object.
 * @cancel: The connection will be closed anyway
 */
int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(mdev);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				dev_err(DEV, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");
			ok = 1;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);

		/* update resync data with failure */
		drbd_rs_failed_io(mdev, e->sector, e->size);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}

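/* Checksum-based resync reply: compare the peer's digest with a locally
 * computed one; on a match just ACK the block as in sync, otherwise send
 * the full data block back. */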
int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	drbd_rs_complete_io(mdev, e->sector);

	di = e->digest;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (mdev->csums_tfm) {
			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
			D_ASSERT(digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(mdev, e->sector, e->size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
		} else {
			inc_rs_pending(mdev);
			e->block_id = ID_SYNCER;
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block/ack() failed\n");
	return ok;
}

int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;

	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	/* FIXME if this allocation fails, online verify will not terminate! */
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
					     digest, digest_size, P_OV_REPLY);
		if (!ok)
			dec_rs_pending(mdev);
		kfree(digest);
	}

out:
	drbd_free_ee(mdev, e);

	dec_unacked(mdev);

	return ok;
}

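/* Record an out-of-sync block found by online verify; adjacent findings are
 * merged into one range so reporting via ov_oos_print() stays readable. */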
void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
}

int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	drbd_rs_complete_io(mdev, e->sector);

	di = e->digest;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	if (!eq)
		drbd_ov_oos_found(mdev, e->sector, e->size);
	else
		ov_oos_print(mdev);

	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	drbd_free_ee(mdev, e);

	if (--mdev->ov_left == 0) {
		ov_oos_print(mdev);
		drbd_resync_finished(mdev);
	}

	return ok;
}

int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
	complete(&b->done);
	return 1;
}

int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct p_barrier *p = &mdev->data.sbuf.barrier;
	int ok = 1;

	/* really avoid racing with tl_clear.  w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
		cancel = 1;
	spin_unlock_irq(&mdev->req_lock);
	if (cancel)
		return 1;

	if (!drbd_get_data_sock(mdev))
		return 0;
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch. */
	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
				(struct p_header *)p, sizeof(*p), 0);
	drbd_put_data_sock(mdev);

	return ok;
}

int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	if (cancel)
		return 1;
	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
}

/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev: DRBD device.
 * @w: work object.
 * @cancel: The connection will be closed anyway
 */
int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_dblock(mdev, req);
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev: DRBD device.
 * @w: work object.
 * @cancel: The connection will be closed anyway
 */
int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
				(unsigned long)req);

	if (!ok) {
		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
		 * so this is probably redundant */
		if (mdev->state.conn >= C_CONNECTED)
			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
	}
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}

int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	if (bio_data_dir(req->master_bio) == WRITE)
		drbd_al_begin_io(mdev, req->sector);
	/* Calling drbd_al_begin_io() out of the worker might deadlock,
	   theoretically. In practice it cannot deadlock, since this is
	   only used when unfreezing IOs. All the extents of the requests
	   that made it into the TL are already active */

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
	generic_make_request(req->private_bio);

	return 1;
}

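/* Walk the sync-after dependency chain: this device may resync only if no
 * device it depends on is itself resyncing or paused. */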
static int _drbd_may_sync_now(struct drbd_conf *mdev)
{
	struct drbd_conf *odev = mdev;

	while (1) {
		if (odev->sync_conf.after == -1)
			return 1;
		odev = minor_to_mdev(odev->sync_conf.after);
		ERR_IF(!odev) return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}

/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev: DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}

	return rv;
}

/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev: DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO);
		}
	}
	return rv;
}

void resume_next_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
}

static int sync_after_error(struct drbd_conf *mdev, int o_minor)
{
	struct drbd_conf *odev;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
	while (1) {
		if (odev == mdev)
			return ERR_SYNC_AFTER_CYCLE;

		/* dependency chain ends here, no cycles. */
		if (odev->sync_conf.after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->sync_conf.after);
	}
}

int drbd_alter_sa(struct drbd_conf *mdev, int na)
{
	int changes;
	int retcode;

	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->sync_conf.after = na;
		do {
			changes  = _drbd_pause_after(mdev);
			changes |= _drbd_resume_next(mdev);
		} while (changes);
	}
	write_unlock_irq(&global_state_lock);
	return retcode;
}

static void ping_peer(struct drbd_conf *mdev)
{
	clear_bit(GOT_PING_ACK, &mdev->flags);
	request_ping(mdev);
	wait_event(mdev->misc_wait,
		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
}

/**
 * drbd_start_resync() - Start the resync process
 * @mdev: DRBD device.
 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
{
	union drbd_state ns;
	int r;

	if (mdev->state.conn >= C_SYNC_SOURCE) {
		dev_err(DEV, "Resync already running!\n");
		return;
	}

	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
	drbd_rs_cancel_all(mdev);

	if (side == C_SYNC_TARGET) {
		/* Since application IO was locked out during C_WF_BITMAP_T and
		   C_WF_SYNC_UUID we are still unmodified. Before going to
		   C_SYNC_TARGET, ask the before-resync-target handler whether
		   we may make the data inconsistent. */
		r = drbd_khelper(mdev, "before-resync-target");
		r = (r >> 8) & 0xff;
		if (r > 0) {
			dev_info(DEV, "before-resync-target handler returned %d, "
			     "dropping connection.\n", r);
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
			return;
		}
	}

	drbd_state_lock(mdev);

	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
		drbd_state_unlock(mdev);
		return;
	}

	if (side == C_SYNC_TARGET) {
		mdev->bm_resync_fo = 0;
	} else /* side == C_SYNC_SOURCE */ {
		u64 uuid;

		get_random_bytes(&uuid, sizeof(u64));
		drbd_uuid_set(mdev, UI_BITMAP, uuid);
		drbd_send_sync_uuid(mdev, uuid);

		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
	}

	write_lock_irq(&global_state_lock);
	ns = mdev->state;

	ns.aftr_isp = !_drbd_may_sync_now(mdev);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
	ns = mdev->state;

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(mdev);
		unsigned long now = jiffies;
		int i;

		mdev->rs_failed = 0;
		mdev->rs_paused = 0;
		mdev->rs_same_csum = 0;
		mdev->rs_last_events = 0;
		mdev->rs_last_sect_ev = 0;
		mdev->rs_total = tw;
		mdev->rs_start = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			mdev->rs_mark_left[i] = tw;
			mdev->rs_mark_time[i] = now;
		}
		_drbd_pause_after(mdev);
	}
	write_unlock_irq(&global_state_lock);
	put_ldev(mdev);

	if (r == SS_SUCCESS) {
		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) mdev->rs_total);

		if (mdev->rs_total == 0) {
			/* Peer still reachable? Beware of failing before-resync-target handlers! */
			ping_peer(mdev);
			drbd_resync_finished(mdev);
		}

		atomic_set(&mdev->rs_sect_in, 0);
		atomic_set(&mdev->rs_sect_ev, 0);
		mdev->rs_in_flight = 0;
		mdev->rs_planed = 0;
		spin_lock(&mdev->peer_seq_lock);
		fifo_set(&mdev->rs_plan_s, 0);
		spin_unlock(&mdev->peer_seq_lock);
		/* ns.conn may already be != mdev->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&mdev->resync_timer, jiffies);

		drbd_md_sync(mdev);
	}
	drbd_state_unlock(mdev);
}

int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct drbd_work *w = NULL;
	LIST_HEAD(work_list);
	int intr = 0, i;

	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));

	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);

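		/* No work queued right now: uncork the data socket so batched
		 * packets go out, sleep until work arrives, then cork again
		 * so subsequent packets are batched. */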
		if (down_trylock(&mdev->data.work.s)) {
			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_uncork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);

			intr = down_interruptible(&mdev->data.work.s);

			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_cork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);
		}

		if (intr) {
			D_ASSERT(intr == -EINTR);
			flush_signals(current);
			ERR_IF (get_t_state(thi) == Running)
				continue;
			break;
		}

		if (get_t_state(thi) != Running)
			break;
		/* With this break, we have done a down() but not consumed
		   the entry from the list. The cleanup code takes care of
		   this...   */

		w = NULL;
		spin_lock_irq(&mdev->data.work.q_lock);
		ERR_IF(list_empty(&mdev->data.work.q)) {
			/* something terribly wrong in our logic.
			 * we were able to down() the semaphore,
			 * but the list is empty... doh.
			 *
			 * what is the best thing to do now?
			 * try again from scratch, restarting the receiver,
			 * asender, whatnot? could break even more ugly,
			 * e.g. when we are primary, but no good local data.
			 *
			 * I'll try to get away just starting over this loop.
			 */
			spin_unlock_irq(&mdev->data.work.q_lock);
			continue;
		}
		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
		list_del_init(&w->list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
			/* dev_warn(DEV, "worker: a callback failed! \n"); */
			if (mdev->state.conn >= C_CONNECTED)
				drbd_force_state(mdev,
						NS(conn, C_NETWORK_FAILURE));
		}
	}
	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));

	spin_lock_irq(&mdev->data.work.q_lock);
	i = 0;
	while (!list_empty(&mdev->data.work.q)) {
		list_splice_init(&mdev->data.work.q, &work_list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		while (!list_empty(&work_list)) {
			w = list_entry(work_list.next, struct drbd_work, list);
			list_del_init(&w->list);
			w->cb(mdev, w, 1);
			i++; /* dead debugging code */
		}

		spin_lock_irq(&mdev->data.work.q_lock);
	}
	sema_init(&mdev->data.work.s, 0);
	/* DANGEROUS race: if someone did queue his work within the spinlock,
	 * but up() ed outside the spinlock, we could get an up() on the
	 * semaphore without corresponding list entry.
	 * So don't do that.
	 */
	spin_unlock_irq(&mdev->data.work.q_lock);

	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
	/* _drbd_set_state only uses stop_nowait.
	 * wait here for the Exiting receiver. */
	drbd_thread_stop(&mdev->receiver);
	drbd_mdev_cleanup(mdev);

	dev_info(DEV, "worker terminated\n");

	clear_bit(DEVICE_DYING, &mdev->flags);
	clear_bit(CONFIG_PENDING, &mdev->flags);
	wake_up(&mdev->state_wait);

	return 0;
}