drbd: detach from frozen backing device
drivers/block/drbd/drbd_worker.c
1 /*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44 /* endio handlers:
45 * drbd_md_io_complete (defined here)
46 * drbd_request_endio (defined here)
47 * drbd_peer_request_endio (defined here)
48 * bm_async_io_complete (defined in drbd_bitmap.c)
49 *
50 * For all these callbacks, note the following:
51 * The callbacks will be called in irq context by the IDE drivers,
52 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53 * Try to get the locking right :)
54 *
55 */
56
57
58 /* About the global_state_lock
59    Each state transition on a device holds a read lock. In case we have
60 to evaluate the resync after dependencies, we grab a write lock, because
61 we need stable states on all devices for that. */
62 rwlock_t global_state_lock;
63
64 /* used for synchronous meta data and bitmap IO
65 * submitted by drbd_md_sync_page_io()
66 */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69 struct drbd_md_io *md_io;
70 struct drbd_conf *mdev;
71
72 md_io = (struct drbd_md_io *)bio->bi_private;
73 mdev = container_of(md_io, struct drbd_conf, md_io);
74
75 md_io->error = error;
76
77 md_io->done = 1;
78 wake_up(&mdev->misc_wait);
79 bio_put(bio);
80 drbd_md_put_buffer(mdev);
81 put_ldev(mdev);
82 }
83
84 /* reads on behalf of the partner,
85 * "submitted" by the receiver
86 */
87 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
88 {
89 unsigned long flags = 0;
90 struct drbd_conf *mdev = peer_req->w.mdev;
91
92 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
93 mdev->read_cnt += peer_req->i.size >> 9;
94 list_del(&peer_req->w.list);
95 if (list_empty(&mdev->read_ee))
96 wake_up(&mdev->ee_wait);
97 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
98 __drbd_chk_io_error(mdev, false);
99 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
100
101 drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
102 put_ldev(mdev);
103 }
104
105 /* writes on behalf of the partner, or resync writes,
106 * "submitted" by the receiver, final stage. */
107 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
108 {
109 unsigned long flags = 0;
110 struct drbd_conf *mdev = peer_req->w.mdev;
111 struct drbd_interval i;
112 int do_wake;
113 u64 block_id;
114 int do_al_complete_io;
115
116 /* after we moved peer_req to done_ee,
117 * we may no longer access it,
118 * it may be freed/reused already!
119 * (as soon as we release the req_lock) */
120 i = peer_req->i;
121 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
122 block_id = peer_req->block_id;
123
124 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
125 mdev->writ_cnt += peer_req->i.size >> 9;
126 list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
127 list_add_tail(&peer_req->w.list, &mdev->done_ee);
128
129 /*
130 * Do not remove from the write_requests tree here: we did not send the
131 * Ack yet and did not wake possibly waiting conflicting requests.
132 	 * Removed from the tree in "drbd_process_done_ee" within the
133 * appropriate w.cb (e_end_block/e_end_resync_block) or from
134 * _drbd_clear_done_ee.
135 */
136
137 do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
138
139 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
140 __drbd_chk_io_error(mdev, false);
141 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
142
143 if (block_id == ID_SYNCER)
144 drbd_rs_complete_io(mdev, i.sector);
145
146 if (do_wake)
147 wake_up(&mdev->ee_wait);
148
149 if (do_al_complete_io)
150 drbd_al_complete_io(mdev, &i);
151
152 wake_asender(mdev->tconn);
153 put_ldev(mdev);
154 }
155
156 /* writes on behalf of the partner, or resync writes,
157 * "submitted" by the receiver.
158 */
159 void drbd_peer_request_endio(struct bio *bio, int error)
160 {
161 struct drbd_peer_request *peer_req = bio->bi_private;
162 struct drbd_conf *mdev = peer_req->w.mdev;
163 int uptodate = bio_flagged(bio, BIO_UPTODATE);
164 int is_write = bio_data_dir(bio) == WRITE;
165
166 if (error && __ratelimit(&drbd_ratelimit_state))
167 dev_warn(DEV, "%s: error=%d s=%llus\n",
168 is_write ? "write" : "read", error,
169 (unsigned long long)peer_req->i.sector);
170 if (!error && !uptodate) {
171 if (__ratelimit(&drbd_ratelimit_state))
172 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
173 is_write ? "write" : "read",
174 (unsigned long long)peer_req->i.sector);
175 /* strange behavior of some lower level drivers...
176 * fail the request by clearing the uptodate flag,
177 * but do not return any error?! */
178 error = -EIO;
179 }
180
181 if (error)
182 set_bit(__EE_WAS_ERROR, &peer_req->flags);
183
184 bio_put(bio); /* no need for the bio anymore */
185 if (atomic_dec_and_test(&peer_req->pending_bios)) {
186 if (is_write)
187 drbd_endio_write_sec_final(peer_req);
188 else
189 drbd_endio_read_sec_final(peer_req);
190 }
191 }
192
193 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
194 */
195 void drbd_request_endio(struct bio *bio, int error)
196 {
197 unsigned long flags;
198 struct drbd_request *req = bio->bi_private;
199 struct drbd_conf *mdev = req->w.mdev;
200 struct bio_and_error m;
201 enum drbd_req_event what;
202 int uptodate = bio_flagged(bio, BIO_UPTODATE);
203
204 if (!error && !uptodate) {
205 dev_warn(DEV, "p %s: setting error to -EIO\n",
206 bio_data_dir(bio) == WRITE ? "write" : "read");
207 /* strange behavior of some lower level drivers...
208 * fail the request by clearing the uptodate flag,
209 * but do not return any error?! */
210 error = -EIO;
211 }
212
213 /* to avoid recursion in __req_mod */
214 if (unlikely(error)) {
215 what = (bio_data_dir(bio) == WRITE)
216 ? WRITE_COMPLETED_WITH_ERROR
217 : (bio_rw(bio) == READ)
218 ? READ_COMPLETED_WITH_ERROR
219 : READ_AHEAD_COMPLETED_WITH_ERROR;
220 } else
221 what = COMPLETED_OK;
222
223 bio_put(req->private_bio);
224 req->private_bio = ERR_PTR(error);
225
226 /* not req_mod(), we need irqsave here! */
227 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
228 __req_mod(req, what, &m);
229 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
230
231 if (m.bio)
232 complete_master_bio(mdev, &m);
233 }
234
235 int w_read_retry_remote(struct drbd_work *w, int cancel)
236 {
237 struct drbd_request *req = container_of(w, struct drbd_request, w);
238 struct drbd_conf *mdev = w->mdev;
239
240 /* We should not detach for read io-error,
241 * but try to WRITE the P_DATA_REPLY to the failed location,
242 * to give the disk the chance to relocate that block */
243
244 spin_lock_irq(&mdev->tconn->req_lock);
245 if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
246 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
247 spin_unlock_irq(&mdev->tconn->req_lock);
248 return 0;
249 }
250 spin_unlock_irq(&mdev->tconn->req_lock);
251
252 return w_send_read_req(w, 0);
253 }
254
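/* Compute a digest over a peer request or a bio.
 * drbd_csum_ee() walks the page chain of a peer request and feeds every
 * page into the hash; only the very last page may be partially used.
 * drbd_csum_bio() below does the same for the segments of a bio. */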
255 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
256 struct drbd_peer_request *peer_req, void *digest)
257 {
258 struct hash_desc desc;
259 struct scatterlist sg;
260 struct page *page = peer_req->pages;
261 struct page *tmp;
262 unsigned len;
263
264 desc.tfm = tfm;
265 desc.flags = 0;
266
267 sg_init_table(&sg, 1);
268 crypto_hash_init(&desc);
269
270 while ((tmp = page_chain_next(page))) {
271 /* all but the last page will be fully used */
272 sg_set_page(&sg, page, PAGE_SIZE, 0);
273 crypto_hash_update(&desc, &sg, sg.length);
274 page = tmp;
275 }
276 /* and now the last, possibly only partially used page */
277 len = peer_req->i.size & (PAGE_SIZE - 1);
278 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
279 crypto_hash_update(&desc, &sg, sg.length);
280 crypto_hash_final(&desc, digest);
281 }
282
283 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
284 {
285 struct hash_desc desc;
286 struct scatterlist sg;
287 struct bio_vec *bvec;
288 int i;
289
290 desc.tfm = tfm;
291 desc.flags = 0;
292
293 sg_init_table(&sg, 1);
294 crypto_hash_init(&desc);
295
296 __bio_for_each_segment(bvec, bio, i, 0) {
297 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
298 crypto_hash_update(&desc, &sg, sg.length);
299 }
300 crypto_hash_final(&desc, digest);
301 }
302
303 /* MAYBE merge common code with w_e_end_ov_req */
304 static int w_e_send_csum(struct drbd_work *w, int cancel)
305 {
306 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
307 struct drbd_conf *mdev = w->mdev;
308 int digest_size;
309 void *digest;
310 int err = 0;
311
312 if (unlikely(cancel))
313 goto out;
314
315 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
316 goto out;
317
318 digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
319 digest = kmalloc(digest_size, GFP_NOIO);
320 if (digest) {
321 sector_t sector = peer_req->i.sector;
322 unsigned int size = peer_req->i.size;
323 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
324 /* Free peer_req and pages before send.
325 * In case we block on congestion, we could otherwise run into
326 * some distributed deadlock, if the other side blocks on
327 * congestion as well, because our receiver blocks in
328 * drbd_alloc_pages due to pp_in_use > max_buffers. */
329 drbd_free_peer_req(mdev, peer_req);
330 peer_req = NULL;
331 inc_rs_pending(mdev);
332 err = drbd_send_drequest_csum(mdev, sector, size,
333 digest, digest_size,
334 P_CSUM_RS_REQUEST);
335 kfree(digest);
336 } else {
337 dev_err(DEV, "kmalloc() of digest failed.\n");
338 err = -ENOMEM;
339 }
340
341 out:
342 if (peer_req)
343 drbd_free_peer_req(mdev, peer_req);
344
345 if (unlikely(err))
346 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
347 return err;
348 }
349
350 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
351
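/* Read a block from the local disk so that its checksum can be sent to the
 * peer (checksum based resync). Returns -EIO if the local disk is gone,
 * -EAGAIN if the request should be retried later (throttled, no memory,
 * or submission failed), and 0 on success. */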
352 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
353 {
354 struct drbd_peer_request *peer_req;
355
356 if (!get_ldev(mdev))
357 return -EIO;
358
359 if (drbd_rs_should_slow_down(mdev, sector))
360 goto defer;
361
362 /* GFP_TRY, because if there is no memory available right now, this may
363 * be rescheduled for later. It is "only" background resync, after all. */
364 peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
365 size, GFP_TRY);
366 if (!peer_req)
367 goto defer;
368
369 peer_req->w.cb = w_e_send_csum;
370 spin_lock_irq(&mdev->tconn->req_lock);
371 list_add(&peer_req->w.list, &mdev->read_ee);
372 spin_unlock_irq(&mdev->tconn->req_lock);
373
374 atomic_add(size >> 9, &mdev->rs_sect_ev);
375 if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
376 return 0;
377
378 /* If it failed because of ENOMEM, retry should help. If it failed
379 * because bio_add_page failed (probably broken lower level driver),
380 * retry may or may not help.
381 * If it does not, you may need to force disconnect. */
382 spin_lock_irq(&mdev->tconn->req_lock);
383 list_del(&peer_req->w.list);
384 spin_unlock_irq(&mdev->tconn->req_lock);
385
386 drbd_free_peer_req(mdev, peer_req);
387 defer:
388 put_ldev(mdev);
389 return -EAGAIN;
390 }
391
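/* Worker callback for the resync timer: depending on the connection state
 * this either issues online verify requests or resync requests. */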
392 int w_resync_timer(struct drbd_work *w, int cancel)
393 {
394 struct drbd_conf *mdev = w->mdev;
395 switch (mdev->state.conn) {
396 case C_VERIFY_S:
397 w_make_ov_request(w, cancel);
398 break;
399 case C_SYNC_TARGET:
400 w_make_resync_request(w, cancel);
401 break;
402 }
403
404 return 0;
405 }
406
407 void resync_timer_fn(unsigned long data)
408 {
409 struct drbd_conf *mdev = (struct drbd_conf *) data;
410
411 if (list_empty(&mdev->resync_work.list))
412 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
413 }
414
415 static void fifo_set(struct fifo_buffer *fb, int value)
416 {
417 int i;
418
419 for (i = 0; i < fb->size; i++)
420 fb->values[i] = value;
421 }
422
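/* Ring buffer push: store the new value at the current head and return the
 * value that was overwritten there. The callers account for fb->total. */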
423 static int fifo_push(struct fifo_buffer *fb, int value)
424 {
425 int ov;
426
427 ov = fb->values[fb->head_index];
428 fb->values[fb->head_index++] = value;
429
430 if (fb->head_index >= fb->size)
431 fb->head_index = 0;
432
433 return ov;
434 }
435
436 static void fifo_add_val(struct fifo_buffer *fb, int value)
437 {
438 int i;
439
440 for (i = 0; i < fb->size; i++)
441 fb->values[i] += value;
442 }
443
444 struct fifo_buffer *fifo_alloc(int fifo_size)
445 {
446 struct fifo_buffer *fb;
447
448 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_KERNEL);
449 if (!fb)
450 return NULL;
451
452 fb->head_index = 0;
453 fb->size = fifo_size;
454 fb->total = 0;
455
456 return fb;
457 }
458
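/* Resync flow controller: based on the number of sectors that came back
 * since the last invocation (rs_sect_in), the sectors still in flight and
 * the planned corrections in the fifo, compute how many sectors to request
 * during the next SLEEP_TIME interval. The target is either a fixed amount
 * of data in the pipe (c_fill_target) or a round trip delay
 * (c_delay_target); the result is clamped to c_max_rate. */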
459 static int drbd_rs_controller(struct drbd_conf *mdev)
460 {
461 struct disk_conf *dc;
462 unsigned int sect_in; /* Number of sectors that came in since the last turn */
463 unsigned int want; /* The number of sectors we want in the proxy */
464 int req_sect; /* Number of sectors to request in this turn */
465 int correction; /* Number of sectors more we need in the proxy*/
466 int cps; /* correction per invocation of drbd_rs_controller() */
467 int steps; /* Number of time steps to plan ahead */
468 int curr_corr;
469 int max_sect;
470 struct fifo_buffer *plan;
471
472 sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
473 mdev->rs_in_flight -= sect_in;
474
475 dc = rcu_dereference(mdev->ldev->disk_conf);
476 plan = rcu_dereference(mdev->rs_plan_s);
477
478 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
479
480 if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
481 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
482 } else { /* normal path */
483 want = dc->c_fill_target ? dc->c_fill_target :
484 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
485 }
486
487 correction = want - mdev->rs_in_flight - plan->total;
488
489 /* Plan ahead */
490 cps = correction / steps;
491 fifo_add_val(plan, cps);
492 plan->total += cps * steps;
493
494 /* What we do in this step */
495 curr_corr = fifo_push(plan, 0);
496 plan->total -= curr_corr;
497
498 req_sect = sect_in + curr_corr;
499 if (req_sect < 0)
500 req_sect = 0;
501
502 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
503 if (req_sect > max_sect)
504 req_sect = max_sect;
505
506 /*
507 dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
508 sect_in, mdev->rs_in_flight, want, correction,
509 steps, cps, mdev->rs_planed, curr_corr, req_sect);
510 */
511
512 return req_sect;
513 }
514
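/* Translate the controller output (or the static resync_rate if the plan
 * fifo has size 0) into a number of BM_BLOCK_SIZE sized requests for the
 * next SLEEP_TIME interval, and record the effective rate in c_sync_rate.
 * Rough example, assuming SLEEP_TIME is HZ/10 (100ms) and BM_BLOCK_SIZE is
 * 4KiB: a static resync_rate of 250 (KiB/s) yields 250/40 = 6 requests per
 * interval, i.e. about 240 KiB/s on the wire. */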
515 static int drbd_rs_number_requests(struct drbd_conf *mdev)
516 {
517 int number;
518
519 rcu_read_lock();
520 if (rcu_dereference(mdev->rs_plan_s)->size) {
521 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
522 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
523 } else {
524 mdev->c_sync_rate = rcu_dereference(mdev->ldev->disk_conf)->resync_rate;
525 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
526 }
527 rcu_read_unlock();
528
529 /* ignore the amount of pending requests, the resync controller should
530 * throttle down to incoming reply rate soon enough anyways. */
531 return number;
532 }
533
534 int w_make_resync_request(struct drbd_work *w, int cancel)
535 {
536 struct drbd_conf *mdev = w->mdev;
537 unsigned long bit;
538 sector_t sector;
539 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
540 int max_bio_size;
541 int number, rollback_i, size;
542 int align, queued, sndbuf;
543 int i = 0;
544
545 if (unlikely(cancel))
546 return 0;
547
548 if (mdev->rs_total == 0) {
549 /* empty resync? */
550 drbd_resync_finished(mdev);
551 return 0;
552 }
553
554 if (!get_ldev(mdev)) {
555 		/* Since we only need to access mdev->rsync, a
556 		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
557 		   continuing the resync with a broken disk makes no sense at
558 		   all. */
559 dev_err(DEV, "Disk broke down during resync!\n");
560 return 0;
561 }
562
563 max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
564 number = drbd_rs_number_requests(mdev);
565 if (number == 0)
566 goto requeue;
567
568 for (i = 0; i < number; i++) {
569 /* Stop generating RS requests, when half of the send buffer is filled */
570 mutex_lock(&mdev->tconn->data.mutex);
571 if (mdev->tconn->data.socket) {
572 queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
573 sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
574 } else {
575 queued = 1;
576 sndbuf = 0;
577 }
578 mutex_unlock(&mdev->tconn->data.mutex);
579 if (queued > sndbuf / 2)
580 goto requeue;
581
582 next_sector:
583 size = BM_BLOCK_SIZE;
584 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
585
586 if (bit == DRBD_END_OF_BITMAP) {
587 mdev->bm_resync_fo = drbd_bm_bits(mdev);
588 put_ldev(mdev);
589 return 0;
590 }
591
592 sector = BM_BIT_TO_SECT(bit);
593
594 if (drbd_rs_should_slow_down(mdev, sector) ||
595 drbd_try_rs_begin_io(mdev, sector)) {
596 mdev->bm_resync_fo = bit;
597 goto requeue;
598 }
599 mdev->bm_resync_fo = bit + 1;
600
601 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
602 drbd_rs_complete_io(mdev, sector);
603 goto next_sector;
604 }
605
606 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
607 /* try to find some adjacent bits.
608 		 * we stop if we already have the maximum req size.
609 *
610 * Additionally always align bigger requests, in order to
611 * be prepared for all stripe sizes of software RAIDs.
612 */
613 align = 1;
614 rollback_i = i;
615 for (;;) {
616 if (size + BM_BLOCK_SIZE > max_bio_size)
617 break;
618
619 /* Be always aligned */
620 if (sector & ((1<<(align+3))-1))
621 break;
622
623 /* do not cross extent boundaries */
624 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
625 break;
626 /* now, is it actually dirty, after all?
627 * caution, drbd_bm_test_bit is tri-state for some
628 * obscure reason; ( b == 0 ) would get the out-of-band
629 * only accidentally right because of the "oddly sized"
630 * adjustment below */
631 if (drbd_bm_test_bit(mdev, bit+1) != 1)
632 break;
633 bit++;
634 size += BM_BLOCK_SIZE;
635 if ((BM_BLOCK_SIZE << align) <= size)
636 align++;
637 i++;
638 }
639 /* if we merged some,
640 * reset the offset to start the next drbd_bm_find_next from */
641 if (size > BM_BLOCK_SIZE)
642 mdev->bm_resync_fo = bit + 1;
643 #endif
644
645 /* adjust very last sectors, in case we are oddly sized */
646 if (sector + (size>>9) > capacity)
647 size = (capacity-sector)<<9;
648 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
649 switch (read_for_csum(mdev, sector, size)) {
650 case -EIO: /* Disk failure */
651 put_ldev(mdev);
652 return -EIO;
653 case -EAGAIN: /* allocation failed, or ldev busy */
654 drbd_rs_complete_io(mdev, sector);
655 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
656 i = rollback_i;
657 goto requeue;
658 case 0:
659 /* everything ok */
660 break;
661 default:
662 BUG();
663 }
664 } else {
665 int err;
666
667 inc_rs_pending(mdev);
668 err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
669 sector, size, ID_SYNCER);
670 if (err) {
671 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
672 dec_rs_pending(mdev);
673 put_ldev(mdev);
674 return err;
675 }
676 }
677 }
678
679 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
680 /* last syncer _request_ was sent,
681 * but the P_RS_DATA_REPLY not yet received. sync will end (and
682 * next sync group will resume), as soon as we receive the last
683 * resync data block, and the last bit is cleared.
684 * until then resync "work" is "inactive" ...
685 */
686 put_ldev(mdev);
687 return 0;
688 }
689
690 requeue:
691 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
692 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
693 put_ldev(mdev);
694 return 0;
695 }
696
697 static int w_make_ov_request(struct drbd_work *w, int cancel)
698 {
699 struct drbd_conf *mdev = w->mdev;
700 int number, i, size;
701 sector_t sector;
702 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
703
704 if (unlikely(cancel))
705 return 1;
706
707 number = drbd_rs_number_requests(mdev);
708
709 sector = mdev->ov_position;
710 for (i = 0; i < number; i++) {
711 if (sector >= capacity) {
712 return 1;
713 }
714
715 size = BM_BLOCK_SIZE;
716
717 if (drbd_rs_should_slow_down(mdev, sector) ||
718 drbd_try_rs_begin_io(mdev, sector)) {
719 mdev->ov_position = sector;
720 goto requeue;
721 }
722
723 if (sector + (size>>9) > capacity)
724 size = (capacity-sector)<<9;
725
726 inc_rs_pending(mdev);
727 if (drbd_send_ov_request(mdev, sector, size)) {
728 dec_rs_pending(mdev);
729 return 0;
730 }
731 sector += BM_SECT_PER_BIT;
732 }
733 mdev->ov_position = sector;
734
735 requeue:
736 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
737 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
738 return 1;
739 }
740
741 int w_ov_finished(struct drbd_work *w, int cancel)
742 {
743 struct drbd_conf *mdev = w->mdev;
744 kfree(w);
745 ov_out_of_sync_print(mdev);
746 drbd_resync_finished(mdev);
747
748 return 0;
749 }
750
751 static int w_resync_finished(struct drbd_work *w, int cancel)
752 {
753 struct drbd_conf *mdev = w->mdev;
754 kfree(w);
755
756 drbd_resync_finished(mdev);
757
758 return 0;
759 }
760
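/* Send a ping to the peer and wait until the ack arrives
 * or the connection is lost. */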
761 static void ping_peer(struct drbd_conf *mdev)
762 {
763 struct drbd_tconn *tconn = mdev->tconn;
764
765 clear_bit(GOT_PING_ACK, &tconn->flags);
766 request_ping(tconn);
767 wait_event(tconn->ping_wait,
768 test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
769 }
770
771 int drbd_resync_finished(struct drbd_conf *mdev)
772 {
773 unsigned long db, dt, dbdt;
774 unsigned long n_oos;
775 union drbd_state os, ns;
776 struct drbd_work *w;
777 char *khelper_cmd = NULL;
778 int verify_done = 0;
779
780 /* Remove all elements from the resync LRU. Since future actions
781 	 * might set bits in the (main) bitmap, the entries in the
782 * resync LRU would be wrong. */
783 if (drbd_rs_del_all(mdev)) {
784 /* In case this is not possible now, most probably because
785 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
786 		 * queue (or even the read operations for those packets
787 		 * are not finished by now). Retry in 100ms. */
788
789 schedule_timeout_interruptible(HZ / 10);
790 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
791 if (w) {
792 w->cb = w_resync_finished;
793 drbd_queue_work(&mdev->tconn->data.work, w);
794 return 1;
795 }
796 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
797 }
798
799 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
800 if (dt <= 0)
801 dt = 1;
802 db = mdev->rs_total;
803 dbdt = Bit2KB(db/dt);
804 mdev->rs_paused /= HZ;
805
806 if (!get_ldev(mdev))
807 goto out;
808
809 ping_peer(mdev);
810
811 spin_lock_irq(&mdev->tconn->req_lock);
812 os = drbd_read_state(mdev);
813
814 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
815
816 /* This protects us against multiple calls (that can happen in the presence
817 of application IO), and against connectivity loss just before we arrive here. */
818 if (os.conn <= C_CONNECTED)
819 goto out_unlock;
820
821 ns = os;
822 ns.conn = C_CONNECTED;
823
824 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
825 verify_done ? "Online verify " : "Resync",
826 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
827
828 n_oos = drbd_bm_total_weight(mdev);
829
830 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
831 if (n_oos) {
832 			dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
833 n_oos, Bit2KB(1));
834 khelper_cmd = "out-of-sync";
835 }
836 } else {
837 D_ASSERT((n_oos - mdev->rs_failed) == 0);
838
839 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
840 khelper_cmd = "after-resync-target";
841
842 if (mdev->tconn->csums_tfm && mdev->rs_total) {
843 const unsigned long s = mdev->rs_same_csum;
844 const unsigned long t = mdev->rs_total;
845 const int ratio =
846 (t == 0) ? 0 :
847 (t < 100000) ? ((s*100)/t) : (s/(t/100));
848 dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
849 "transferred %luK total %luK\n",
850 ratio,
851 Bit2KB(mdev->rs_same_csum),
852 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
853 Bit2KB(mdev->rs_total));
854 }
855 }
856
857 if (mdev->rs_failed) {
858 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
859
860 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
861 ns.disk = D_INCONSISTENT;
862 ns.pdsk = D_UP_TO_DATE;
863 } else {
864 ns.disk = D_UP_TO_DATE;
865 ns.pdsk = D_INCONSISTENT;
866 }
867 } else {
868 ns.disk = D_UP_TO_DATE;
869 ns.pdsk = D_UP_TO_DATE;
870
871 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
872 if (mdev->p_uuid) {
873 int i;
874 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
875 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
876 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
877 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
878 } else {
879 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
880 }
881 }
882
883 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
884 /* for verify runs, we don't update uuids here,
885 * so there would be nothing to report. */
886 drbd_uuid_set_bm(mdev, 0UL);
887 drbd_print_uuids(mdev, "updated UUIDs");
888 if (mdev->p_uuid) {
889 /* Now the two UUID sets are equal, update what we
890 * know of the peer. */
891 int i;
892 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
893 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
894 }
895 }
896 }
897
898 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
899 out_unlock:
900 spin_unlock_irq(&mdev->tconn->req_lock);
901 put_ldev(mdev);
902 out:
903 mdev->rs_total = 0;
904 mdev->rs_failed = 0;
905 mdev->rs_paused = 0;
906 if (verify_done)
907 mdev->ov_start_sector = 0;
908
909 drbd_md_sync(mdev);
910
911 if (khelper_cmd)
912 drbd_khelper(mdev, khelper_cmd);
913
914 return 1;
915 }
916
917 /* helper */
918 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
919 {
920 if (drbd_peer_req_has_active_page(peer_req)) {
921 /* This might happen if sendpage() has not finished */
922 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
923 atomic_add(i, &mdev->pp_in_use_by_net);
924 atomic_sub(i, &mdev->pp_in_use);
925 spin_lock_irq(&mdev->tconn->req_lock);
926 list_add_tail(&peer_req->w.list, &mdev->net_ee);
927 spin_unlock_irq(&mdev->tconn->req_lock);
928 wake_up(&drbd_pp_wait);
929 } else
930 drbd_free_peer_req(mdev, peer_req);
931 }
932
933 /**
934  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
935 * @mdev: DRBD device.
936 * @w: work object.
937 * @cancel: The connection will be closed anyways
938 */
939 int w_e_end_data_req(struct drbd_work *w, int cancel)
940 {
941 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
942 struct drbd_conf *mdev = w->mdev;
943 int err;
944
945 if (unlikely(cancel)) {
946 drbd_free_peer_req(mdev, peer_req);
947 dec_unacked(mdev);
948 return 0;
949 }
950
951 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
952 err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
953 } else {
954 if (__ratelimit(&drbd_ratelimit_state))
955 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
956 (unsigned long long)peer_req->i.sector);
957
958 err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
959 }
960
961 dec_unacked(mdev);
962
963 move_to_net_ee_or_free(mdev, peer_req);
964
965 if (unlikely(err))
966 dev_err(DEV, "drbd_send_block() failed\n");
967 return err;
968 }
969
970 /**
971  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
972 * @mdev: DRBD device.
973 * @w: work object.
974 * @cancel: The connection will be closed anyways
975 */
976 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
977 {
978 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
979 struct drbd_conf *mdev = w->mdev;
980 int err;
981
982 if (unlikely(cancel)) {
983 drbd_free_peer_req(mdev, peer_req);
984 dec_unacked(mdev);
985 return 0;
986 }
987
988 if (get_ldev_if_state(mdev, D_FAILED)) {
989 drbd_rs_complete_io(mdev, peer_req->i.sector);
990 put_ldev(mdev);
991 }
992
993 if (mdev->state.conn == C_AHEAD) {
994 err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
995 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
996 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
997 inc_rs_pending(mdev);
998 err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
999 } else {
1000 if (__ratelimit(&drbd_ratelimit_state))
1001 dev_err(DEV, "Not sending RSDataReply, "
1002 "partner DISKLESS!\n");
1003 err = 0;
1004 }
1005 } else {
1006 if (__ratelimit(&drbd_ratelimit_state))
1007 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1008 (unsigned long long)peer_req->i.sector);
1009
1010 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1011
1012 /* update resync data with failure */
1013 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
1014 }
1015
1016 dec_unacked(mdev);
1017
1018 move_to_net_ee_or_free(mdev, peer_req);
1019
1020 if (unlikely(err))
1021 dev_err(DEV, "drbd_send_block() failed\n");
1022 return err;
1023 }
1024
1025 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1026 {
1027 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1028 struct drbd_conf *mdev = w->mdev;
1029 struct digest_info *di;
1030 int digest_size;
1031 void *digest = NULL;
1032 int err, eq = 0;
1033
1034 if (unlikely(cancel)) {
1035 drbd_free_peer_req(mdev, peer_req);
1036 dec_unacked(mdev);
1037 return 0;
1038 }
1039
1040 if (get_ldev(mdev)) {
1041 drbd_rs_complete_io(mdev, peer_req->i.sector);
1042 put_ldev(mdev);
1043 }
1044
1045 di = peer_req->digest;
1046
1047 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1048 /* quick hack to try to avoid a race against reconfiguration.
1049 * a real fix would be much more involved,
1050 * introducing more locking mechanisms */
1051 if (mdev->tconn->csums_tfm) {
1052 digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1053 D_ASSERT(digest_size == di->digest_size);
1054 digest = kmalloc(digest_size, GFP_NOIO);
1055 }
1056 if (digest) {
1057 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1058 eq = !memcmp(digest, di->digest, digest_size);
1059 kfree(digest);
1060 }
1061
1062 if (eq) {
1063 drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1064 /* rs_same_csums unit is BM_BLOCK_SIZE */
1065 mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1066 err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1067 } else {
1068 inc_rs_pending(mdev);
1069 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1070 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1071 kfree(di);
1072 err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1073 }
1074 } else {
1075 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1076 if (__ratelimit(&drbd_ratelimit_state))
1077 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1078 }
1079
1080 dec_unacked(mdev);
1081 move_to_net_ee_or_free(mdev, peer_req);
1082
1083 if (unlikely(err))
1084 dev_err(DEV, "drbd_send_block/ack() failed\n");
1085 return err;
1086 }
1087
1088 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1089 {
1090 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1091 struct drbd_conf *mdev = w->mdev;
1092 sector_t sector = peer_req->i.sector;
1093 unsigned int size = peer_req->i.size;
1094 int digest_size;
1095 void *digest;
1096 int err = 0;
1097
1098 if (unlikely(cancel))
1099 goto out;
1100
1101 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1102 digest = kmalloc(digest_size, GFP_NOIO);
1103 if (!digest) {
1104 err = 1; /* terminate the connection in case the allocation failed */
1105 goto out;
1106 }
1107
1108 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1109 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1110 else
1111 memset(digest, 0, digest_size);
1112
1113 	/* Free peer_req and pages before send.
1114 * In case we block on congestion, we could otherwise run into
1115 * some distributed deadlock, if the other side blocks on
1116 * congestion as well, because our receiver blocks in
1117 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1118 drbd_free_peer_req(mdev, peer_req);
1119 peer_req = NULL;
1120 inc_rs_pending(mdev);
1121 err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1122 if (err)
1123 dec_rs_pending(mdev);
1124 kfree(digest);
1125
1126 out:
1127 if (peer_req)
1128 drbd_free_peer_req(mdev, peer_req);
1129 dec_unacked(mdev);
1130 return err;
1131 }
1132
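/* Record a block that online verify found to differ: extend the last
 * reported out-of-sync range if it is adjacent, otherwise start a new one,
 * and mark the area out of sync in the bitmap. */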
1133 void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1134 {
1135 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1136 mdev->ov_last_oos_size += size>>9;
1137 } else {
1138 mdev->ov_last_oos_start = sector;
1139 mdev->ov_last_oos_size = size>>9;
1140 }
1141 drbd_set_out_of_sync(mdev, sector, size);
1142 }
1143
1144 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1145 {
1146 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1147 struct drbd_conf *mdev = w->mdev;
1148 struct digest_info *di;
1149 void *digest;
1150 sector_t sector = peer_req->i.sector;
1151 unsigned int size = peer_req->i.size;
1152 int digest_size;
1153 int err, eq = 0;
1154
1155 if (unlikely(cancel)) {
1156 drbd_free_peer_req(mdev, peer_req);
1157 dec_unacked(mdev);
1158 return 0;
1159 }
1160
1161 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1162 * the resync lru has been cleaned up already */
1163 if (get_ldev(mdev)) {
1164 drbd_rs_complete_io(mdev, peer_req->i.sector);
1165 put_ldev(mdev);
1166 }
1167
1168 di = peer_req->digest;
1169
1170 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1171 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1172 digest = kmalloc(digest_size, GFP_NOIO);
1173 if (digest) {
1174 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1175
1176 D_ASSERT(digest_size == di->digest_size);
1177 eq = !memcmp(digest, di->digest, digest_size);
1178 kfree(digest);
1179 }
1180 }
1181
1182 /* Free peer_req and pages before send.
1183 * In case we block on congestion, we could otherwise run into
1184 * some distributed deadlock, if the other side blocks on
1185 * congestion as well, because our receiver blocks in
1186 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1187 drbd_free_peer_req(mdev, peer_req);
1188 if (!eq)
1189 drbd_ov_out_of_sync_found(mdev, sector, size);
1190 else
1191 ov_out_of_sync_print(mdev);
1192
1193 err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1194 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1195
1196 dec_unacked(mdev);
1197
1198 --mdev->ov_left;
1199
1200 /* let's advance progress step marks only for every other megabyte */
1201 if ((mdev->ov_left & 0x200) == 0x200)
1202 drbd_advance_rs_marks(mdev, mdev->ov_left);
1203
1204 if (mdev->ov_left == 0) {
1205 ov_out_of_sync_print(mdev);
1206 drbd_resync_finished(mdev);
1207 }
1208
1209 return err;
1210 }
1211
1212 int w_prev_work_done(struct drbd_work *w, int cancel)
1213 {
1214 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1215
1216 complete(&b->done);
1217 return 0;
1218 }
1219
1220 int w_send_barrier(struct drbd_work *w, int cancel)
1221 {
1222 struct drbd_socket *sock;
1223 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1224 struct drbd_conf *mdev = w->mdev;
1225 struct p_barrier *p;
1226
1227 /* really avoid racing with tl_clear. w.cb may have been referenced
1228 * just before it was reassigned and re-queued, so double check that.
1229 * actually, this race was harmless, since we only try to send the
1230 * barrier packet here, and otherwise do nothing with the object.
1231 * but compare with the head of w_clear_epoch */
1232 spin_lock_irq(&mdev->tconn->req_lock);
1233 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1234 cancel = 1;
1235 spin_unlock_irq(&mdev->tconn->req_lock);
1236 if (cancel)
1237 return 0;
1238
1239 sock = &mdev->tconn->data;
1240 p = drbd_prepare_command(mdev, sock);
1241 if (!p)
1242 return -EIO;
1243 p->barrier = b->br_number;
1244 /* inc_ap_pending was done where this was queued.
1245 * dec_ap_pending will be done in got_BarrierAck
1246 * or (on connection loss) in w_clear_epoch. */
1247 return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
1248 }
1249
1250 int w_send_write_hint(struct drbd_work *w, int cancel)
1251 {
1252 struct drbd_conf *mdev = w->mdev;
1253 struct drbd_socket *sock;
1254
1255 if (cancel)
1256 return 0;
1257 sock = &mdev->tconn->data;
1258 if (!drbd_prepare_command(mdev, sock))
1259 return -EIO;
1260 return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1261 }
1262
1263 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1264 {
1265 struct drbd_request *req = container_of(w, struct drbd_request, w);
1266 struct drbd_conf *mdev = w->mdev;
1267 int err;
1268
1269 if (unlikely(cancel)) {
1270 req_mod(req, SEND_CANCELED);
1271 return 0;
1272 }
1273
1274 err = drbd_send_out_of_sync(mdev, req);
1275 req_mod(req, OOS_HANDED_TO_NETWORK);
1276
1277 return err;
1278 }
1279
1280 /**
1281 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1282 * @mdev: DRBD device.
1283 * @w: work object.
1284 * @cancel: The connection will be closed anyways
1285 */
1286 int w_send_dblock(struct drbd_work *w, int cancel)
1287 {
1288 struct drbd_request *req = container_of(w, struct drbd_request, w);
1289 struct drbd_conf *mdev = w->mdev;
1290 int err;
1291
1292 if (unlikely(cancel)) {
1293 req_mod(req, SEND_CANCELED);
1294 return 0;
1295 }
1296
1297 err = drbd_send_dblock(mdev, req);
1298 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1299
1300 return err;
1301 }
1302
1303 /**
1304 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1305 * @mdev: DRBD device.
1306 * @w: work object.
1307 * @cancel: The connection will be closed anyways
1308 */
1309 int w_send_read_req(struct drbd_work *w, int cancel)
1310 {
1311 struct drbd_request *req = container_of(w, struct drbd_request, w);
1312 struct drbd_conf *mdev = w->mdev;
1313 int err;
1314
1315 if (unlikely(cancel)) {
1316 req_mod(req, SEND_CANCELED);
1317 return 0;
1318 }
1319
1320 err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1321 (unsigned long)req);
1322
1323 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1324
1325 return err;
1326 }
1327
1328 int w_restart_disk_io(struct drbd_work *w, int cancel)
1329 {
1330 struct drbd_request *req = container_of(w, struct drbd_request, w);
1331 struct drbd_conf *mdev = w->mdev;
1332
1333 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1334 drbd_al_begin_io(mdev, &req->i);
1335 	/* Calling drbd_al_begin_io() out of the worker might deadlock
1336 	   theoretically. Practically it cannot deadlock, since this is
1337 only used when unfreezing IOs. All the extents of the requests
1338 that made it into the TL are already active */
1339
1340 drbd_req_make_private_bio(req, req->master_bio);
1341 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1342 generic_make_request(req->private_bio);
1343
1344 return 0;
1345 }
1346
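/* Follow the resync-after dependency chain. Returns 1 if this device may
 * resync now, 0 if a device it depends on is itself resyncing or has any
 * of its "paused sync" flags set. */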
1347 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1348 {
1349 struct drbd_conf *odev = mdev;
1350 int resync_after;
1351
1352 while (1) {
1353 if (!odev->ldev)
1354 return 1;
1355 rcu_read_lock();
1356 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1357 rcu_read_unlock();
1358 if (resync_after == -1)
1359 return 1;
1360 odev = minor_to_mdev(resync_after);
1361 if (!expect(odev))
1362 return 1;
1363 if ((odev->state.conn >= C_SYNC_SOURCE &&
1364 odev->state.conn <= C_PAUSED_SYNC_T) ||
1365 odev->state.aftr_isp || odev->state.peer_isp ||
1366 odev->state.user_isp)
1367 return 0;
1368 }
1369 }
1370
1371 /**
1372 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1373 * @mdev: DRBD device.
1374 *
1375 * Called from process context only (admin command and after_state_ch).
1376 */
1377 static int _drbd_pause_after(struct drbd_conf *mdev)
1378 {
1379 struct drbd_conf *odev;
1380 int i, rv = 0;
1381
1382 rcu_read_lock();
1383 idr_for_each_entry(&minors, odev, i) {
1384 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1385 continue;
1386 if (!_drbd_may_sync_now(odev))
1387 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1388 != SS_NOTHING_TO_DO);
1389 }
1390 rcu_read_unlock();
1391
1392 return rv;
1393 }
1394
1395 /**
1396 * _drbd_resume_next() - Resume resync on all devices that may resync now
1397 * @mdev: DRBD device.
1398 *
1399 * Called from process context only (admin command and worker).
1400 */
1401 static int _drbd_resume_next(struct drbd_conf *mdev)
1402 {
1403 struct drbd_conf *odev;
1404 int i, rv = 0;
1405
1406 rcu_read_lock();
1407 idr_for_each_entry(&minors, odev, i) {
1408 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1409 continue;
1410 if (odev->state.aftr_isp) {
1411 if (_drbd_may_sync_now(odev))
1412 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1413 CS_HARD, NULL)
1414 != SS_NOTHING_TO_DO) ;
1415 }
1416 }
1417 rcu_read_unlock();
1418 return rv;
1419 }
1420
1421 void resume_next_sg(struct drbd_conf *mdev)
1422 {
1423 write_lock_irq(&global_state_lock);
1424 _drbd_resume_next(mdev);
1425 write_unlock_irq(&global_state_lock);
1426 }
1427
1428 void suspend_other_sg(struct drbd_conf *mdev)
1429 {
1430 write_lock_irq(&global_state_lock);
1431 _drbd_pause_after(mdev);
1432 write_unlock_irq(&global_state_lock);
1433 }
1434
1435 /* caller must hold global_state_lock */
1436 enum drbd_ret_code drbd_resync_after_valid(struct drbd_conf *mdev, int o_minor)
1437 {
1438 struct drbd_conf *odev;
1439 int resync_after;
1440
1441 if (o_minor == -1)
1442 return NO_ERROR;
1443 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1444 return ERR_RESYNC_AFTER;
1445
1446 /* check for loops */
1447 odev = minor_to_mdev(o_minor);
1448 while (1) {
1449 if (odev == mdev)
1450 return ERR_RESYNC_AFTER_CYCLE;
1451
1452 rcu_read_lock();
1453 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1454 rcu_read_unlock();
1455 /* dependency chain ends here, no cycles. */
1456 if (resync_after == -1)
1457 return NO_ERROR;
1458
1459 /* follow the dependency chain */
1460 odev = minor_to_mdev(resync_after);
1461 }
1462 }
1463
1464 /* caller must hold global_state_lock */
1465 void drbd_resync_after_changed(struct drbd_conf *mdev)
1466 {
1467 int changes;
1468
1469 do {
1470 changes = _drbd_pause_after(mdev);
1471 changes |= _drbd_resume_next(mdev);
1472 } while (changes);
1473 }
1474
1475 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1476 {
1477 struct fifo_buffer *plan;
1478
1479 atomic_set(&mdev->rs_sect_in, 0);
1480 atomic_set(&mdev->rs_sect_ev, 0);
1481 mdev->rs_in_flight = 0;
1482
1483 /* Updating the RCU protected object in place is necessary since
1484 this function gets called from atomic context.
1485 	   It is valid since all other updates also lead to a completely
1486 empty fifo */
1487 rcu_read_lock();
1488 plan = rcu_dereference(mdev->rs_plan_s);
1489 plan->total = 0;
1490 fifo_set(plan, 0);
1491 rcu_read_unlock();
1492 }
1493
1494 void start_resync_timer_fn(unsigned long data)
1495 {
1496 struct drbd_conf *mdev = (struct drbd_conf *) data;
1497
1498 drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1499 }
1500
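/* Deferred start of a resync as sync source (used e.g. when leaving Ahead
 * mode): retry in 100ms while there are still unacked packets or pending
 * resync replies, then actually start the resync. */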
1501 int w_start_resync(struct drbd_work *w, int cancel)
1502 {
1503 struct drbd_conf *mdev = w->mdev;
1504
1505 if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1506 dev_warn(DEV, "w_start_resync later...\n");
1507 mdev->start_resync_timer.expires = jiffies + HZ/10;
1508 add_timer(&mdev->start_resync_timer);
1509 return 0;
1510 }
1511
1512 drbd_start_resync(mdev, C_SYNC_SOURCE);
1513 clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1514 return 0;
1515 }
1516
1517 /**
1518 * drbd_start_resync() - Start the resync process
1519 * @mdev: DRBD device.
1520 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1521 *
1522 * This function might bring you directly into one of the
1523 * C_PAUSED_SYNC_* states.
1524 */
1525 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1526 {
1527 union drbd_state ns;
1528 int r;
1529
1530 if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1531 dev_err(DEV, "Resync already running!\n");
1532 return;
1533 }
1534
1535 if (mdev->state.conn < C_AHEAD) {
1536 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1537 drbd_rs_cancel_all(mdev);
1538 /* This should be done when we abort the resync. We definitely do not
1539 want to have this for connections going back and forth between
1540 Ahead/Behind and SyncSource/SyncTarget */
1541 }
1542
1543 if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1544 if (side == C_SYNC_TARGET) {
1545 /* Since application IO was locked out during C_WF_BITMAP_T and
1546 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1547 			   we check whether we may make the data inconsistent. */
1548 r = drbd_khelper(mdev, "before-resync-target");
1549 r = (r >> 8) & 0xff;
1550 if (r > 0) {
1551 dev_info(DEV, "before-resync-target handler returned %d, "
1552 "dropping connection.\n", r);
1553 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1554 return;
1555 }
1556 } else /* C_SYNC_SOURCE */ {
1557 r = drbd_khelper(mdev, "before-resync-source");
1558 r = (r >> 8) & 0xff;
1559 if (r > 0) {
1560 if (r == 3) {
1561 dev_info(DEV, "before-resync-source handler returned %d, "
1562 "ignoring. Old userland tools?", r);
1563 } else {
1564 dev_info(DEV, "before-resync-source handler returned %d, "
1565 "dropping connection.\n", r);
1566 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1567 return;
1568 }
1569 }
1570 }
1571 }
1572
1573 if (current == mdev->tconn->worker.task) {
1574 /* The worker should not sleep waiting for state_mutex,
1575 that can take long */
1576 if (!mutex_trylock(mdev->state_mutex)) {
1577 set_bit(B_RS_H_DONE, &mdev->flags);
1578 mdev->start_resync_timer.expires = jiffies + HZ/5;
1579 add_timer(&mdev->start_resync_timer);
1580 return;
1581 }
1582 } else {
1583 mutex_lock(mdev->state_mutex);
1584 }
1585 clear_bit(B_RS_H_DONE, &mdev->flags);
1586
1587 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1588 mutex_unlock(mdev->state_mutex);
1589 return;
1590 }
1591
1592 write_lock_irq(&global_state_lock);
1593 ns = drbd_read_state(mdev);
1594
1595 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1596
1597 ns.conn = side;
1598
1599 if (side == C_SYNC_TARGET)
1600 ns.disk = D_INCONSISTENT;
1601 else /* side == C_SYNC_SOURCE */
1602 ns.pdsk = D_INCONSISTENT;
1603
1604 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1605 ns = drbd_read_state(mdev);
1606
1607 if (ns.conn < C_CONNECTED)
1608 r = SS_UNKNOWN_ERROR;
1609
1610 if (r == SS_SUCCESS) {
1611 unsigned long tw = drbd_bm_total_weight(mdev);
1612 unsigned long now = jiffies;
1613 int i;
1614
1615 mdev->rs_failed = 0;
1616 mdev->rs_paused = 0;
1617 mdev->rs_same_csum = 0;
1618 mdev->rs_last_events = 0;
1619 mdev->rs_last_sect_ev = 0;
1620 mdev->rs_total = tw;
1621 mdev->rs_start = now;
1622 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1623 mdev->rs_mark_left[i] = tw;
1624 mdev->rs_mark_time[i] = now;
1625 }
1626 _drbd_pause_after(mdev);
1627 }
1628 write_unlock_irq(&global_state_lock);
1629
1630 if (r == SS_SUCCESS) {
1631 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1632 drbd_conn_str(ns.conn),
1633 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1634 (unsigned long) mdev->rs_total);
1635 if (side == C_SYNC_TARGET)
1636 mdev->bm_resync_fo = 0;
1637
1638 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1639 * with w_send_oos, or the sync target will get confused as to
1640 		 * how many bits to resync. We cannot do that always, because for an
1641 * empty resync and protocol < 95, we need to do it here, as we call
1642 * drbd_resync_finished from here in that case.
1643 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1644 * and from after_state_ch otherwise. */
1645 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1646 drbd_gen_and_send_sync_uuid(mdev);
1647
1648 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1649 /* This still has a race (about when exactly the peers
1650 * detect connection loss) that can lead to a full sync
1651 * on next handshake. In 8.3.9 we fixed this with explicit
1652 * resync-finished notifications, but the fix
1653 * introduces a protocol change. Sleeping for some
1654 * time longer than the ping interval + timeout on the
1655 * SyncSource, to give the SyncTarget the chance to
1656 * detect connection loss, then waiting for a ping
1657 * response (implicit in drbd_resync_finished) reduces
1658 * the race considerably, but does not solve it. */
1659 if (side == C_SYNC_SOURCE) {
1660 struct net_conf *nc;
1661 int timeo;
1662
1663 rcu_read_lock();
1664 nc = rcu_dereference(mdev->tconn->net_conf);
1665 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1666 rcu_read_unlock();
1667 schedule_timeout_interruptible(timeo);
1668 }
1669 drbd_resync_finished(mdev);
1670 }
1671
1672 drbd_rs_controller_reset(mdev);
1673 /* ns.conn may already be != mdev->state.conn,
1674 * we may have been paused in between, or become paused until
1675 * the timer triggers.
1676 * No matter, that is handled in resync_timer_fn() */
1677 if (ns.conn == C_SYNC_TARGET)
1678 mod_timer(&mdev->resync_timer, jiffies);
1679
1680 drbd_md_sync(mdev);
1681 }
1682 put_ldev(mdev);
1683 mutex_unlock(mdev->state_mutex);
1684 }
1685
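/* Main loop of the per-connection worker thread: sleep on the work
 * semaphore (uncorking the data socket while idle, corking it again before
 * processing), pick the next queued drbd_work and run its callback; a
 * failing callback while connected triggers C_NETWORK_FAILURE. On exit the
 * remaining queue entries are cancelled (cb(w, 1)) and all volumes of the
 * connection are cleaned up. */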
1686 int drbd_worker(struct drbd_thread *thi)
1687 {
1688 struct drbd_tconn *tconn = thi->tconn;
1689 struct drbd_work *w = NULL;
1690 struct drbd_conf *mdev;
1691 struct net_conf *nc;
1692 LIST_HEAD(work_list);
1693 int vnr, intr = 0;
1694 int cork;
1695
1696 while (get_t_state(thi) == RUNNING) {
1697 drbd_thread_current_set_cpu(thi);
1698
1699 if (down_trylock(&tconn->data.work.s)) {
1700 mutex_lock(&tconn->data.mutex);
1701
1702 rcu_read_lock();
1703 nc = rcu_dereference(tconn->net_conf);
1704 cork = nc ? nc->tcp_cork : 0;
1705 rcu_read_unlock();
1706
1707 if (tconn->data.socket && cork)
1708 drbd_tcp_uncork(tconn->data.socket);
1709 mutex_unlock(&tconn->data.mutex);
1710
1711 intr = down_interruptible(&tconn->data.work.s);
1712
1713 mutex_lock(&tconn->data.mutex);
1714 if (tconn->data.socket && cork)
1715 drbd_tcp_cork(tconn->data.socket);
1716 mutex_unlock(&tconn->data.mutex);
1717 }
1718
1719 if (intr) {
1720 flush_signals(current);
1721 if (get_t_state(thi) == RUNNING) {
1722 conn_warn(tconn, "Worker got an unexpected signal\n");
1723 continue;
1724 }
1725 break;
1726 }
1727
1728 if (get_t_state(thi) != RUNNING)
1729 break;
1730 /* With this break, we have done a down() but not consumed
1731 the entry from the list. The cleanup code takes care of
1732 this... */
1733
1734 w = NULL;
1735 spin_lock_irq(&tconn->data.work.q_lock);
1736 if (list_empty(&tconn->data.work.q)) {
1737 /* something terribly wrong in our logic.
1738 * we were able to down() the semaphore,
1739 * but the list is empty... doh.
1740 *
1741 * what is the best thing to do now?
1742 * try again from scratch, restarting the receiver,
1743 * asender, whatnot? could break even more ugly,
1744 * e.g. when we are primary, but no good local data.
1745 *
1746 * I'll try to get away just starting over this loop.
1747 */
1748 conn_warn(tconn, "Work list unexpectedly empty\n");
1749 spin_unlock_irq(&tconn->data.work.q_lock);
1750 continue;
1751 }
1752 w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1753 list_del_init(&w->list);
1754 spin_unlock_irq(&tconn->data.work.q_lock);
1755
1756 if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1757 /* dev_warn(DEV, "worker: a callback failed! \n"); */
1758 if (tconn->cstate >= C_WF_REPORT_PARAMS)
1759 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1760 }
1761 }
1762
1763 spin_lock_irq(&tconn->data.work.q_lock);
1764 while (!list_empty(&tconn->data.work.q)) {
1765 list_splice_init(&tconn->data.work.q, &work_list);
1766 spin_unlock_irq(&tconn->data.work.q_lock);
1767
1768 while (!list_empty(&work_list)) {
1769 w = list_entry(work_list.next, struct drbd_work, list);
1770 list_del_init(&w->list);
1771 w->cb(w, 1);
1772 }
1773
1774 spin_lock_irq(&tconn->data.work.q_lock);
1775 }
1776 sema_init(&tconn->data.work.s, 0);
1777 /* DANGEROUS race: if someone did queue his work within the spinlock,
1778 	 * but up()ed outside the spinlock, we could get an up() on the
1779 * semaphore without corresponding list entry.
1780 * So don't do that.
1781 */
1782 spin_unlock_irq(&tconn->data.work.q_lock);
1783
1784 rcu_read_lock();
1785 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1786 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1787 kref_get(&mdev->kref);
1788 rcu_read_unlock();
1789 drbd_mdev_cleanup(mdev);
1790 kref_put(&mdev->kref, &drbd_minor_destroy);
1791 rcu_read_lock();
1792 }
1793 rcu_read_unlock();
1794
1795 return 0;
1796 }