drbd: don't count sendpage()d pages only referenced by tcp as in use
[deliverable/linux.git] drivers/block/drbd/drbd_worker.c
/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/smp_lock.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_req.h"

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);



/* defined here:
   drbd_md_io_complete
   drbd_endio_sec
   drbd_endio_pri

 * more endio handlers:
   atodb_endio in drbd_actlog.c
   drbd_bm_async_io_complete in drbd_bitmap.c

 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */


/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_md_io *md_io;

	md_io = (struct drbd_md_io *)bio->bi_private;
	md_io->error = error;

	complete(&md_io->event);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;

	D_ASSERT(e->block_id != ID_VACANT);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->read_cnt += e->size >> 9;
	list_del(&e->w.list);
	if (list_empty(&mdev->read_ee))
		wake_up(&mdev->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	drbd_queue_work(&mdev->data.work, &e->w);
	put_ldev(mdev);
}

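/* Added commentary (not in the original source): the helper below matches
 * epoch entries that were submitted as barrier requests, completed with an
 * error, and have not been resubmitted yet.  The write completion path uses
 * it to fall back to a weaker write ordering and requeue the entry exactly
 * once (EE_RESUBMITTED keeps it from looping). */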
static int is_failed_barrier(int ee_flags)
{
	return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
			== (EE_IS_BARRIER|EE_WAS_ERROR);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.  */
static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;
	sector_t e_sector;
	int do_wake;
	int is_syncer_req;
	int do_al_complete_io;

	/* if this is a failed barrier request, disable use of barriers,
	 * and schedule for resubmission */
	if (is_failed_barrier(e->flags)) {
		drbd_bump_write_ordering(mdev, WO_bdev_flush);
		spin_lock_irqsave(&mdev->req_lock, flags);
		list_del(&e->w.list);
		e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
		e->w.cb = w_e_reissue;
		/* put_ldev actually happens below, once we come here again. */
		__release(local);
		spin_unlock_irqrestore(&mdev->req_lock, flags);
		drbd_queue_work(&mdev->data.work, &e->w);
		return;
	}

	D_ASSERT(e->block_id != ID_VACANT);

	/* after we moved e to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	e_sector = e->sector;
	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
	is_syncer_req = is_syncer_block_id(e->block_id);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->writ_cnt += e->size >> 9;
	list_del(&e->w.list); /* has been on active_ee or sync_ee */
	list_add_tail(&e->w.list, &mdev->done_ee);

	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
	 * neither did we wake possibly waiting conflicting requests.
	 * done from "drbd_process_done_ee" within the appropriate w.cb
	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */

	do_wake = is_syncer_req
		? list_empty(&mdev->sync_ee)
		: list_empty(&mdev->active_ee);

	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	if (is_syncer_req)
		drbd_rs_complete_io(mdev, e_sector);

	if (do_wake)
		wake_up(&mdev->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(mdev, e_sector);

	wake_asender(mdev);
	put_ldev(mdev);
}

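/* Added commentary: drbd_endio_sec() below is the bi_end_io callback for
 * reads and writes submitted on behalf of the peer.  It may run in hard-irq
 * or softirq context, so it limits itself to atomic flag/refcount updates;
 * list handling and wake-ups happen in the *_sec_final() helpers above,
 * which take req_lock with interrupts disabled. */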
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_endio_sec(struct bio *bio, int error)
{
	struct drbd_epoch_entry *e = bio->bi_private;
	struct drbd_conf *mdev = e->mdev;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;

	if (error)
		dev_warn(DEV, "%s: error=%d s=%llus\n",
				is_write ? "write" : "read", error,
				(unsigned long long)e->sector);
	if (!error && !uptodate) {
		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
				is_write ? "write" : "read",
				(unsigned long long)e->sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
		set_bit(__EE_WAS_ERROR, &e->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&e->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(e);
		else
			drbd_endio_read_sec_final(e);
	}
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_endio_pri(struct bio *bio, int error)
{
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->mdev;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
			 bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? write_completed_with_error
			: (bio_rw(bio) == READ)
			  ? read_completed_with_error
			  : read_ahead_completed_with_error;
	} else
		what = completed_ok;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	req_mod(req, what);
}

int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* We should not detach for read io-error,
	 * but try to WRITE the P_DATA_REPLY to the failed location,
	 * to give the disk the chance to relocate that block */

	spin_lock_irq(&mdev->req_lock);
	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
		_req_mod(req, read_retry_remote_canceled);
		spin_unlock_irq(&mdev->req_lock);
		return 1;
	}
	spin_unlock_irq(&mdev->req_lock);

	return w_send_read_req(mdev, w, 0);
}

int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	ERR_IF(cancel) return 1;
	dev_err(DEV, "resync inactive, but callback triggered??\n");
	return 1; /* Simply ignore this! */
}

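/* Added commentary: drbd_csum_ee() below assumes the epoch entry's pages
 * form a chain (page_chain_next()) in which every page but the last is fully
 * used.  That is why the loop hashes PAGE_SIZE for all but the last page and
 * then "e->size & (PAGE_SIZE - 1)" bytes (or a full page, if e->size is an
 * exact multiple of PAGE_SIZE) for the final one. */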
void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = e->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = e->size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}

void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;
	int i;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	__bio_for_each_segment(bvec, bio, i, 0) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}

static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok;

	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		return 1;
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);

			inc_rs_pending(mdev);
			ok = drbd_send_drequest_csum(mdev,
						     e->sector,
						     e->size,
						     digest,
						     digest_size,
						     P_CSUM_RS_REQUEST);
			kfree(digest);
		} else {
			dev_err(DEV, "kmalloc() of digest failed.\n");
			ok = 0;
		}
	} else
		ok = 1;

	drbd_free_ee(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
	return ok;
}

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
{
	struct drbd_epoch_entry *e;

	if (!get_ldev(mdev))
		return -EIO;

	if (drbd_rs_should_slow_down(mdev))
		goto defer;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
	if (!e)
		goto defer;

	e->w.cb = w_e_send_csum;
	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->req_lock);

	atomic_add(size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
		return 0;

	drbd_free_ee(mdev, e);
defer:
	put_ldev(mdev);
	return -EAGAIN;
}

void resync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;
	int queue;

	queue = 1;
	switch (mdev->state.conn) {
	case C_VERIFY_S:
		mdev->resync_work.cb = w_make_ov_request;
		break;
	case C_SYNC_TARGET:
		mdev->resync_work.cb = w_make_resync_request;
		break;
	default:
		queue = 0;
		mdev->resync_work.cb = w_resync_inactive;
	}

	/* harmless race: list_empty outside data.work.q_lock */
	if (list_empty(&mdev->resync_work.list) && queue)
		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
}

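/* Added commentary: the three helpers below operate on mdev->rs_plan_s, a
 * small ring buffer of planned per-step corrections for the resync speed
 * controller.  fifo_push() stores the newest value and returns the oldest
 * (the correction to apply in the current step); fifo_set() reinitializes
 * all slots; fifo_add_val() shifts the whole plan by a constant.  Callers
 * serialize access through mdev->peer_seq_lock. */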
static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] = value;
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}

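/* Worked example (added commentary, all configuration values assumed):
 * with SLEEP_TIME = HZ/10 and c_plan_ahead = 20, the plan spans steps = 20
 * ticks of 100 ms.  If c_fill_target is unset, c_delay_target = 10 (i.e.
 * one second) and 2000 sectors arrived during the last tick, then
 *	want = sect_in * c_delay_target * HZ / (SLEEP_TIME * 10)
 *	     = 2000 * 10 * HZ / ((HZ/10) * 10) = 20000 sectors
 * should be in flight or planned; the difference to what is actually in
 * flight and already planned is spread evenly over the 20 plan steps. */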
int drbd_rs_controller(struct drbd_conf *mdev)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	unsigned int want;     /* The number of sectors we want in the proxy */
	int req_sect; /* Number of sectors to request in this turn */
	int correction; /* Number of sectors more we need in the proxy*/
	int cps; /* correction per invocation of drbd_rs_controller() */
	int steps; /* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;

	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
	mdev->rs_in_flight -= sect_in;

	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */

	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - mdev->rs_in_flight - mdev->rs_planed;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(&mdev->rs_plan_s, cps);
	mdev->rs_planed += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
	mdev->rs_planed -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, mdev->rs_in_flight, want, correction,
		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}

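/* Worked example (added commentary, values assumed): without the fifo plan
 * (rs_plan_s.size == 0), a configured rate of 10000 KiB/s, SLEEP_TIME = HZ/10
 * and BM_BLOCK_SIZE = 4 KiB gives
 *	number = SLEEP_TIME * c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ)
 *	       = (HZ/10) * 10000 / (4 * HZ) = 250
 * resync requests of one bitmap block each per 100 ms tick, i.e. the
 * configured 10000 KiB/s, further limited below by the peer's receive
 * buffer and our own send buffer fill level. */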
int w_make_resync_request(struct drbd_conf *mdev,
		struct drbd_work *w, int cancel)
{
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	int max_segment_size;
	int number, rollback_i, size, pe, mx;
	int align, queued, sndbuf;
	int i = 0;

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
		return 0;
	}

	if (mdev->state.conn != C_SYNC_TARGET)
		dev_err(DEV, "%s in w_make_resync_request\n",
			drbd_conn_str(mdev->state.conn));

	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->rsync a
		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		dev_err(DEV, "Disk broke down during resync!\n");
		mdev->resync_work.cb = w_resync_inactive;
		return 1;
	}

	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
	 * if it should be necessary */
	max_segment_size = mdev->agreed_pro_version < 94 ?
		queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;

	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		mdev->c_sync_rate = mdev->sync_conf.rate;
		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
	}

	/* Throttle resync on lower level disk activity, which may also be
	 * caused by application IO on Primary/SyncTarget.
	 * Keep this after the call to drbd_rs_controller, as that assumes
	 * to be called as precisely as possible every SLEEP_TIME,
	 * and would be confused otherwise. */
	if (drbd_rs_should_slow_down(mdev))
		goto requeue;

	mutex_lock(&mdev->data.mutex);
	if (mdev->data.socket)
		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
	else
		mx = 1;
	mutex_unlock(&mdev->data.mutex);

	/* For resync rates >160MB/sec, allow more pending RS requests */
	if (number > mx)
		mx = number;

	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
	pe = atomic_read(&mdev->rs_pending_cnt);
	if ((pe + number) > mx) {
		number = mx - pe;
	}

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests, when half of the send buffer is filled */
		mutex_lock(&mdev->data.mutex);
		if (mdev->data.socket) {
			queued = mdev->data.socket->sk->sk_wmem_queued;
			sndbuf = mdev->data.socket->sk->sk_sndbuf;
		} else {
			queued = 1;
			sndbuf = 0;
		}
		mutex_unlock(&mdev->data.mutex);
		if (queued > sndbuf / 2)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);

		if (bit == -1UL) {
			mdev->bm_resync_fo = drbd_bm_bits(mdev);
			mdev->resync_work.cb = w_resync_inactive;
			put_ldev(mdev);
			return 1;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->bm_resync_fo = bit;
			goto requeue;
		}
		mdev->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
			drbd_rs_complete_io(mdev, sector);
			goto next_sector;
		}

#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		for (;;) {
			if (size + BM_BLOCK_SIZE > max_segment_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(mdev, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			mdev->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
			switch (read_for_csum(mdev, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(mdev);
				return 0;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(mdev, sector);
				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			inc_rs_pending(mdev);
			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
					       sector, size, ID_SYNCER)) {
				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(mdev);
				put_ldev(mdev);
				return 0;
			}
		}
	}

	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		mdev->resync_work.cb = w_resync_inactive;
		put_ldev(mdev);
		return 1;
	}

 requeue:
	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(mdev);
	return 1;
}

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
		return 0;
	}

	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
	if (atomic_read(&mdev->rs_pending_cnt) > number)
		goto requeue;

	number -= atomic_read(&mdev->rs_pending_cnt);

	sector = mdev->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity) {
			mdev->resync_work.cb = w_resync_inactive;
			return 1;
		}

		size = BM_BLOCK_SIZE;

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(mdev);
		if (!drbd_send_ov_request(mdev, sector, size)) {
			dec_rs_pending(mdev);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	mdev->ov_position = sector;

 requeue:
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}


int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);
	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}

static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);

	drbd_resync_finished(mdev);

	return 1;
}

int drbd_resync_finished(struct drbd_conf *mdev)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_work *w;
	char *khelper_cmd = NULL;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now). Retry in 100ms. */

		drbd_kick_lo(mdev);
		__set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
		if (w) {
			w->cb = w_resync_finished;
			drbd_queue_work(&mdev->data.work, w);
			return 1;
		}
		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
	}

	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;
	db = mdev->rs_total;
	dbdt = Bit2KB(db/dt);
	mdev->rs_paused /= HZ;

	if (!get_ldev(mdev))
		goto out;

	spin_lock_irq(&mdev->req_lock);
	os = mdev->state;

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
	     "Online verify " : "Resync",
	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(mdev);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		D_ASSERT((n_oos - mdev->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (mdev->csums_tfm && mdev->rs_total) {
			const unsigned long s = mdev->rs_same_csum;
			const unsigned long t = mdev->rs_total;
			const int ratio =
				(t == 0)     ? 0 :
				(t < 100000) ? ((s*100)/t) : (s/(t/100));
			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total));
		}
	}

	if (mdev->rs_failed) {
		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (mdev->p_uuid) {
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
			} else {
				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
			}
		}

		drbd_uuid_set_bm(mdev, 0UL);

		if (mdev->p_uuid) {
			/* Now the two UUID sets are equal, update what we
			 * know of the peer. */
			int i;
			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
		}
	}

	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&mdev->req_lock);
	put_ldev(mdev);
out:
	mdev->rs_total  = 0;
	mdev->rs_failed = 0;
	mdev->rs_paused = 0;
	mdev->ov_start_sector = 0;

	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
	}

	if (khelper_cmd)
		drbd_khelper(mdev, khelper_cmd);

	return 1;
}

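/* Added commentary: as the commit summary above says, pages handed to the
 * network stack via sendpage() may still be referenced by TCP after DRBD is
 * done with them.  move_to_net_ee_or_free() below therefore parks such epoch
 * entries on net_ee and moves their page count from pp_in_use to
 * pp_in_use_by_net, so pages only referenced by TCP no longer count against
 * the "in use" limit; drbd_pp_wait is woken to let allocators retry. */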
/* helper */
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
	if (drbd_ee_has_active_page(e)) {
		/* This might happen if sendpage() has not finished */
		int i = DIV_ROUND_UP(e->size, PAGE_SIZE);
		atomic_add(i, &mdev->pp_in_use_by_net);
		atomic_sub(i, &mdev->pp_in_use);
		spin_lock_irq(&mdev->req_lock);
		list_add_tail(&e->w.list, &mdev->net_ee);
		spin_unlock_irq(&mdev->req_lock);
		wake_up(&drbd_pp_wait);
	} else
		drbd_free_ee(mdev, e);
}

/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}

/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(mdev);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				dev_err(DEV, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");
			ok = 1;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);

		/* update resync data with failure */
		drbd_rs_failed_io(mdev, e->sector, e->size);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}

int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	drbd_rs_complete_io(mdev, e->sector);

	di = e->digest;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (mdev->csums_tfm) {
			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
			D_ASSERT(digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(mdev, e->sector, e->size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
		} else {
			inc_rs_pending(mdev);
			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
			kfree(di);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block/ack() failed\n");
	return ok;
}

int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;

	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	/* FIXME if this allocation fails, online verify will not terminate! */
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
					     digest, digest_size, P_OV_REPLY);
		if (!ok)
			dec_rs_pending(mdev);
		kfree(digest);
	}

out:
	drbd_free_ee(mdev, e);

	dec_unacked(mdev);

	return ok;
}

void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
}

int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	drbd_rs_complete_io(mdev, e->sector);

	di = e->digest;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	if (!eq)
		drbd_ov_oos_found(mdev, e->sector, e->size);
	else
		ov_oos_print(mdev);

	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	drbd_free_ee(mdev, e);

	if (--mdev->ov_left == 0) {
		ov_oos_print(mdev);
		drbd_resync_finished(mdev);
	}

	return ok;
}

int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
	complete(&b->done);
	return 1;
}

int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct p_barrier *p = &mdev->data.sbuf.barrier;
	int ok = 1;

	/* really avoid racing with tl_clear.  w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
		cancel = 1;
	spin_unlock_irq(&mdev->req_lock);
	if (cancel)
		return 1;

	if (!drbd_get_data_sock(mdev))
		return 0;
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch.  */
	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
				(struct p_header80 *)p, sizeof(*p), 0);
	drbd_put_data_sock(mdev);

	return ok;
}

int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	if (cancel)
		return 1;
	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
}

/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_dblock(mdev, req);
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
				(unsigned long)req);

	if (!ok) {
		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
		 * so this is probably redundant */
		if (mdev->state.conn >= C_CONNECTED)
			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
	}
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}

int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_begin_io(mdev, req->sector);
	/* Calling drbd_al_begin_io() out of the worker might deadlock
	   theoretically. Practically it cannot deadlock, since this is
	   only used when unfreezing IOs. All the extents of the requests
	   that made it into the TL are already active */

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
	generic_make_request(req->private_bio);

	return 1;
}

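/* Added commentary: the helpers below walk the resync-after dependency chain
 * configured via sync_conf.after (a device minor, or -1 for none).  A device
 * may only resync while every device it depends on is neither resyncing nor
 * paused; e.g. (hypothetical setup) if minor 2 is set to sync after minor 1,
 * _drbd_may_sync_now() for minor 2 returns 0 while minor 1 is SyncSource or
 * SyncTarget. */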
static int _drbd_may_sync_now(struct drbd_conf *mdev)
{
	struct drbd_conf *odev = mdev;

	while (1) {
		if (odev->sync_conf.after == -1)
			return 1;
		odev = minor_to_mdev(odev->sync_conf.after);
		ERR_IF(!odev) return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}

/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}

	return rv;
}

/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO) ;
		}
	}
	return rv;
}

void resume_next_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
}

static int sync_after_error(struct drbd_conf *mdev, int o_minor)
{
	struct drbd_conf *odev;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
	while (1) {
		if (odev == mdev)
			return ERR_SYNC_AFTER_CYCLE;

		/* dependency chain ends here, no cycles. */
		if (odev->sync_conf.after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->sync_conf.after);
	}
}

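/* Added commentary: drbd_alter_sa() below changes the sync-after dependency
 * under the global state write lock and then alternates _drbd_pause_after()
 * and _drbd_resume_next() until neither reports a change, i.e. until the
 * aftr_isp flags of all devices reach a fixed point consistent with the new
 * dependency graph. */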
int drbd_alter_sa(struct drbd_conf *mdev, int na)
{
	int changes;
	int retcode;

	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->sync_conf.after = na;
		do {
			changes  = _drbd_pause_after(mdev);
			changes |= _drbd_resume_next(mdev);
		} while (changes);
	}
	write_unlock_irq(&global_state_lock);
	return retcode;
}

static void ping_peer(struct drbd_conf *mdev)
{
	clear_bit(GOT_PING_ACK, &mdev->flags);
	request_ping(mdev);
	wait_event(mdev->misc_wait,
		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
}

/**
 * drbd_start_resync() - Start the resync process
 * @mdev:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
{
	union drbd_state ns;
	int r;

	if (mdev->state.conn >= C_SYNC_SOURCE) {
		dev_err(DEV, "Resync already running!\n");
		return;
	}

	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
	drbd_rs_cancel_all(mdev);

	if (side == C_SYNC_TARGET) {
		/* Since application IO was locked out during C_WF_BITMAP_T and
		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
		   we check that we might make the data inconsistent. */
		r = drbd_khelper(mdev, "before-resync-target");
		r = (r >> 8) & 0xff;
		if (r > 0) {
			dev_info(DEV, "before-resync-target handler returned %d, "
			     "dropping connection.\n", r);
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
			return;
		}
	}

	drbd_state_lock(mdev);

	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
		drbd_state_unlock(mdev);
		return;
	}

	if (side == C_SYNC_TARGET) {
		mdev->bm_resync_fo = 0;
	} else /* side == C_SYNC_SOURCE */ {
		u64 uuid;

		get_random_bytes(&uuid, sizeof(u64));
		drbd_uuid_set(mdev, UI_BITMAP, uuid);
		drbd_send_sync_uuid(mdev, uuid);

		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
	}

	write_lock_irq(&global_state_lock);
	ns = mdev->state;

	ns.aftr_isp = !_drbd_may_sync_now(mdev);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
	ns = mdev->state;

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(mdev);
		unsigned long now = jiffies;
		int i;

		mdev->rs_failed    = 0;
		mdev->rs_paused    = 0;
		mdev->rs_same_csum = 0;
		mdev->rs_last_events = 0;
		mdev->rs_last_sect_ev = 0;
		mdev->rs_total     = tw;
		mdev->rs_start     = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			mdev->rs_mark_left[i] = tw;
			mdev->rs_mark_time[i] = now;
		}
		_drbd_pause_after(mdev);
	}
	write_unlock_irq(&global_state_lock);
	put_ldev(mdev);

	if (r == SS_SUCCESS) {
		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) mdev->rs_total);

		if (mdev->rs_total == 0) {
			/* Peer still reachable? Beware of failing before-resync-target handlers! */
			ping_peer(mdev);
			drbd_resync_finished(mdev);
		}

		atomic_set(&mdev->rs_sect_in, 0);
		atomic_set(&mdev->rs_sect_ev, 0);
		mdev->rs_in_flight = 0;
		mdev->rs_planed = 0;
		spin_lock(&mdev->peer_seq_lock);
		fifo_set(&mdev->rs_plan_s, 0);
		spin_unlock(&mdev->peer_seq_lock);
		/* ns.conn may already be != mdev->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&mdev->resync_timer, jiffies);

		drbd_md_sync(mdev);
	}
	drbd_state_unlock(mdev);
}

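/* Added commentary: the worker loop below drains the work queue as long as
 * work is immediately available (down_trylock() succeeds).  Only when it is
 * about to block does it uncork the data socket so batched packets go out,
 * and it corks the socket again once new work arrives, allowing consecutive
 * packets to be coalesced while the worker is busy (unless no_cork is set in
 * the net configuration). */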
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct drbd_work *w = NULL;
	LIST_HEAD(work_list);
	int intr = 0, i;

	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));

	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);

		if (down_trylock(&mdev->data.work.s)) {
			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_uncork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);

			intr = down_interruptible(&mdev->data.work.s);

			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket  && !mdev->net_conf->no_cork)
				drbd_tcp_cork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);
		}

		if (intr) {
			D_ASSERT(intr == -EINTR);
			flush_signals(current);
			ERR_IF (get_t_state(thi) == Running)
				continue;
			break;
		}

		if (get_t_state(thi) != Running)
			break;
		/* With this break, we have done a down() but not consumed
		   the entry from the list. The cleanup code takes care of
		   this...   */

		w = NULL;
		spin_lock_irq(&mdev->data.work.q_lock);
		ERR_IF(list_empty(&mdev->data.work.q)) {
			/* something terribly wrong in our logic.
			 * we were able to down() the semaphore,
			 * but the list is empty... doh.
			 *
			 * what is the best thing to do now?
			 * try again from scratch, restarting the receiver,
			 * asender, whatnot? could break even more ugly,
			 * e.g. when we are primary, but no good local data.
			 *
			 * I'll try to get away just starting over this loop.
			 */
			spin_unlock_irq(&mdev->data.work.q_lock);
			continue;
		}
		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
		list_del_init(&w->list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
			/* dev_warn(DEV, "worker: a callback failed! \n"); */
			if (mdev->state.conn >= C_CONNECTED)
				drbd_force_state(mdev,
						NS(conn, C_NETWORK_FAILURE));
		}
	}
	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));

	spin_lock_irq(&mdev->data.work.q_lock);
	i = 0;
	while (!list_empty(&mdev->data.work.q)) {
		list_splice_init(&mdev->data.work.q, &work_list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		while (!list_empty(&work_list)) {
			w = list_entry(work_list.next, struct drbd_work, list);
			list_del_init(&w->list);
			w->cb(mdev, w, 1);
			i++; /* dead debugging code */
		}

		spin_lock_irq(&mdev->data.work.q_lock);
	}
	sema_init(&mdev->data.work.s, 0);
	/* DANGEROUS race: if someone did queue his work within the spinlock,
	 * but up() ed outside the spinlock, we could get an up() on the
	 * semaphore without corresponding list entry.
	 * So don't do that.
	 */
	spin_unlock_irq(&mdev->data.work.q_lock);

	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
	/* _drbd_set_state only uses stop_nowait.
	 * wait here for the Exiting receiver. */
	drbd_thread_stop(&mdev->receiver);
	drbd_mdev_cleanup(mdev);

	dev_info(DEV, "worker terminated\n");

	clear_bit(DEVICE_DYING, &mdev->flags);
	clear_bit(CONFIG_PENDING, &mdev->flags);
	wake_up(&mdev->state_wait);

	return 0;
}