0b78bf28ff5daf06b9b4ca92caa5d56f6b375a60
[deliverable/linux.git] / fs / btrfs / async-thread.c
1 /*
2 * Copyright (C) 2007 Oracle. All rights reserved.
3 *
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public
6 * License v2 as published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public
14 * License along with this program; if not, write to the
15 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
16 * Boston, MA 021110-1307, USA.
17 */
18
19 #include <linux/kthread.h>
20 #include <linux/slab.h>
21 #include <linux/list.h>
22 #include <linux/spinlock.h>
23 #include <linux/freezer.h>
24 #include "async-thread.h"
25
26 #define WORK_QUEUED_BIT 0
27 #define WORK_DONE_BIT 1
28 #define WORK_ORDER_DONE_BIT 2
29 #define WORK_HIGH_PRIO_BIT 3
30
31 /*
32 * container for the kthread task pointer and the list of pending work
33 * One of these is allocated per thread.
34 */
35 struct btrfs_worker_thread {
36 /* pool we belong to */
37 struct btrfs_workers *workers;
38
39 /* list of struct btrfs_work that are waiting for service */
40 struct list_head pending;
41 struct list_head prio_pending;
42
43 /* list of worker threads from struct btrfs_workers */
44 struct list_head worker_list;
45
46 /* kthread */
47 struct task_struct *task;
48
49 /* number of things on the pending list */
50 atomic_t num_pending;
51
52 /* reference counter for this struct */
53 atomic_t refs;
54
55 unsigned long sequence;
56
57 /* protects the pending list. */
58 spinlock_t lock;
59
60 /* set to non-zero when this thread is already awake and kicking */
61 int working;
62
63 /* are we currently idle */
64 int idle;
65 };
66
67 static int __btrfs_start_workers(struct btrfs_workers *workers);
68
69 /*
70 * btrfs_start_workers uses kthread_run, which can block waiting for memory
71 * for a very long time. It will actually throttle on page writeback,
72 * and so it may not make progress until after our btrfs worker threads
73 * process all of the pending work structs in their queue
74 *
75 * This means we can't use btrfs_start_workers from inside a btrfs worker
76 * thread that is used as part of cleaning dirty memory, which pretty much
77 * involves all of the worker threads.
78 *
79 * Instead we have a helper queue who never has more than one thread
80 * where we scheduler thread start operations. This worker_start struct
81 * is used to contain the work and hold a pointer to the queue that needs
82 * another worker.
83 */
84 struct worker_start {
85 struct btrfs_work work;
86 struct btrfs_workers *queue;
87 };
88
89 static void start_new_worker_func(struct btrfs_work *work)
90 {
91 struct worker_start *start;
92 start = container_of(work, struct worker_start, work);
93 __btrfs_start_workers(start->queue);
94 kfree(start);
95 }
96
97 /*
98 * helper function to move a thread onto the idle list after it
99 * has finished some requests.
100 */
101 static void check_idle_worker(struct btrfs_worker_thread *worker)
102 {
103 if (!worker->idle && atomic_read(&worker->num_pending) <
104 worker->workers->idle_thresh / 2) {
105 unsigned long flags;
106 spin_lock_irqsave(&worker->workers->lock, flags);
107 worker->idle = 1;
108
109 /* the list may be empty if the worker is just starting */
110 if (!list_empty(&worker->worker_list) &&
111 !worker->workers->stopping) {
112 list_move(&worker->worker_list,
113 &worker->workers->idle_list);
114 }
115 spin_unlock_irqrestore(&worker->workers->lock, flags);
116 }
117 }
118
119 /*
120 * helper function to move a thread off the idle list after new
121 * pending work is added.
122 */
123 static void check_busy_worker(struct btrfs_worker_thread *worker)
124 {
125 if (worker->idle && atomic_read(&worker->num_pending) >=
126 worker->workers->idle_thresh) {
127 unsigned long flags;
128 spin_lock_irqsave(&worker->workers->lock, flags);
129 worker->idle = 0;
130
131 if (!list_empty(&worker->worker_list) &&
132 !worker->workers->stopping) {
133 list_move_tail(&worker->worker_list,
134 &worker->workers->worker_list);
135 }
136 spin_unlock_irqrestore(&worker->workers->lock, flags);
137 }
138 }
139
140 static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
141 {
142 struct btrfs_workers *workers = worker->workers;
143 struct worker_start *start;
144 unsigned long flags;
145
146 rmb();
147 if (!workers->atomic_start_pending)
148 return;
149
150 start = kzalloc(sizeof(*start), GFP_NOFS);
151 if (!start)
152 return;
153
154 start->work.func = start_new_worker_func;
155 start->queue = workers;
156
157 spin_lock_irqsave(&workers->lock, flags);
158 if (!workers->atomic_start_pending)
159 goto out;
160
161 workers->atomic_start_pending = 0;
162 if (workers->num_workers + workers->num_workers_starting >=
163 workers->max_workers)
164 goto out;
165
166 workers->num_workers_starting += 1;
167 spin_unlock_irqrestore(&workers->lock, flags);
168 btrfs_queue_worker(workers->atomic_worker_start, &start->work);
169 return;
170
171 out:
172 kfree(start);
173 spin_unlock_irqrestore(&workers->lock, flags);
174 }
175
176 static noinline void run_ordered_completions(struct btrfs_workers *workers,
177 struct btrfs_work *work)
178 {
179 if (!workers->ordered)
180 return;
181
182 set_bit(WORK_DONE_BIT, &work->flags);
183
184 spin_lock(&workers->order_lock);
185
186 while (1) {
187 if (!list_empty(&workers->prio_order_list)) {
188 work = list_entry(workers->prio_order_list.next,
189 struct btrfs_work, order_list);
190 } else if (!list_empty(&workers->order_list)) {
191 work = list_entry(workers->order_list.next,
192 struct btrfs_work, order_list);
193 } else {
194 break;
195 }
196 if (!test_bit(WORK_DONE_BIT, &work->flags))
197 break;
198
199 /* we are going to call the ordered done function, but
200 * we leave the work item on the list as a barrier so
201 * that later work items that are done don't have their
202 * functions called before this one returns
203 */
204 if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
205 break;
206
207 spin_unlock(&workers->order_lock);
208
209 work->ordered_func(work);
210
211 /* now take the lock again and drop our item from the list */
212 spin_lock(&workers->order_lock);
213 list_del(&work->order_list);
214 spin_unlock(&workers->order_lock);
215
216 /*
217 * we don't want to call the ordered free functions
218 * with the lock held though
219 */
220 work->ordered_free(work);
221 spin_lock(&workers->order_lock);
222 }
223
224 spin_unlock(&workers->order_lock);
225 }
226
227 static void put_worker(struct btrfs_worker_thread *worker)
228 {
229 if (atomic_dec_and_test(&worker->refs))
230 kfree(worker);
231 }
232
233 static int try_worker_shutdown(struct btrfs_worker_thread *worker)
234 {
235 int freeit = 0;
236
237 spin_lock_irq(&worker->lock);
238 spin_lock(&worker->workers->lock);
239 if (worker->workers->num_workers > 1 &&
240 worker->idle &&
241 !worker->working &&
242 !list_empty(&worker->worker_list) &&
243 list_empty(&worker->prio_pending) &&
244 list_empty(&worker->pending) &&
245 atomic_read(&worker->num_pending) == 0) {
246 freeit = 1;
247 list_del_init(&worker->worker_list);
248 worker->workers->num_workers--;
249 }
250 spin_unlock(&worker->workers->lock);
251 spin_unlock_irq(&worker->lock);
252
253 if (freeit)
254 put_worker(worker);
255 return freeit;
256 }
257
258 static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
259 struct list_head *prio_head,
260 struct list_head *head)
261 {
262 struct btrfs_work *work = NULL;
263 struct list_head *cur = NULL;
264
265 if (!list_empty(prio_head)) {
266 cur = prio_head->next;
267 goto out;
268 }
269
270 smp_mb();
271 if (!list_empty(&worker->prio_pending))
272 goto refill;
273
274 if (!list_empty(head)) {
275 cur = head->next;
276 goto out;
277 }
278
279 refill:
280 spin_lock_irq(&worker->lock);
281 list_splice_tail_init(&worker->prio_pending, prio_head);
282 list_splice_tail_init(&worker->pending, head);
283
284 if (!list_empty(prio_head))
285 cur = prio_head->next;
286 else if (!list_empty(head))
287 cur = head->next;
288 spin_unlock_irq(&worker->lock);
289
290 if (!cur)
291 goto out_fail;
292
293 out:
294 work = list_entry(cur, struct btrfs_work, list);
295
296 out_fail:
297 return work;
298 }
299
300 /*
301 * main loop for servicing work items
302 */
303 static int worker_loop(void *arg)
304 {
305 struct btrfs_worker_thread *worker = arg;
306 struct list_head head;
307 struct list_head prio_head;
308 struct btrfs_work *work;
309
310 INIT_LIST_HEAD(&head);
311 INIT_LIST_HEAD(&prio_head);
312
313 do {
314 again:
315 while (1) {
316
317
318 work = get_next_work(worker, &prio_head, &head);
319 if (!work)
320 break;
321
322 list_del(&work->list);
323 clear_bit(WORK_QUEUED_BIT, &work->flags);
324
325 work->worker = worker;
326
327 work->func(work);
328
329 atomic_dec(&worker->num_pending);
330 /*
331 * unless this is an ordered work queue,
332 * 'work' was probably freed by func above.
333 */
334 run_ordered_completions(worker->workers, work);
335
336 check_pending_worker_creates(worker);
337 cond_resched();
338 }
339
340 spin_lock_irq(&worker->lock);
341 check_idle_worker(worker);
342
343 if (freezing(current)) {
344 worker->working = 0;
345 spin_unlock_irq(&worker->lock);
346 try_to_freeze();
347 } else {
348 spin_unlock_irq(&worker->lock);
349 if (!kthread_should_stop()) {
350 cpu_relax();
351 /*
352 * we've dropped the lock, did someone else
353 * jump_in?
354 */
355 smp_mb();
356 if (!list_empty(&worker->pending) ||
357 !list_empty(&worker->prio_pending))
358 continue;
359
360 /*
361 * this short schedule allows more work to
362 * come in without the queue functions
363 * needing to go through wake_up_process()
364 *
365 * worker->working is still 1, so nobody
366 * is going to try and wake us up
367 */
368 schedule_timeout(1);
369 smp_mb();
370 if (!list_empty(&worker->pending) ||
371 !list_empty(&worker->prio_pending))
372 continue;
373
374 if (kthread_should_stop())
375 break;
376
377 /* still no more work?, sleep for real */
378 spin_lock_irq(&worker->lock);
379 set_current_state(TASK_INTERRUPTIBLE);
380 if (!list_empty(&worker->pending) ||
381 !list_empty(&worker->prio_pending)) {
382 spin_unlock_irq(&worker->lock);
383 set_current_state(TASK_RUNNING);
384 goto again;
385 }
386
387 /*
388 * this makes sure we get a wakeup when someone
389 * adds something new to the queue
390 */
391 worker->working = 0;
392 spin_unlock_irq(&worker->lock);
393
394 if (!kthread_should_stop()) {
395 schedule_timeout(HZ * 120);
396 if (!worker->working &&
397 try_worker_shutdown(worker)) {
398 return 0;
399 }
400 }
401 }
402 __set_current_state(TASK_RUNNING);
403 }
404 } while (!kthread_should_stop());
405 return 0;
406 }
407
408 /*
409 * this will wait for all the worker threads to shutdown
410 */
411 void btrfs_stop_workers(struct btrfs_workers *workers)
412 {
413 struct list_head *cur;
414 struct btrfs_worker_thread *worker;
415 int can_stop;
416
417 spin_lock_irq(&workers->lock);
418 workers->stopping = 1;
419 list_splice_init(&workers->idle_list, &workers->worker_list);
420 while (!list_empty(&workers->worker_list)) {
421 cur = workers->worker_list.next;
422 worker = list_entry(cur, struct btrfs_worker_thread,
423 worker_list);
424
425 atomic_inc(&worker->refs);
426 workers->num_workers -= 1;
427 if (!list_empty(&worker->worker_list)) {
428 list_del_init(&worker->worker_list);
429 put_worker(worker);
430 can_stop = 1;
431 } else
432 can_stop = 0;
433 spin_unlock_irq(&workers->lock);
434 if (can_stop)
435 kthread_stop(worker->task);
436 spin_lock_irq(&workers->lock);
437 put_worker(worker);
438 }
439 spin_unlock_irq(&workers->lock);
440 }
441
442 /*
443 * simple init on struct btrfs_workers
444 */
445 void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
446 struct btrfs_workers *async_helper)
447 {
448 workers->num_workers = 0;
449 workers->num_workers_starting = 0;
450 INIT_LIST_HEAD(&workers->worker_list);
451 INIT_LIST_HEAD(&workers->idle_list);
452 INIT_LIST_HEAD(&workers->order_list);
453 INIT_LIST_HEAD(&workers->prio_order_list);
454 spin_lock_init(&workers->lock);
455 spin_lock_init(&workers->order_lock);
456 workers->max_workers = max;
457 workers->idle_thresh = 32;
458 workers->name = name;
459 workers->ordered = 0;
460 workers->atomic_start_pending = 0;
461 workers->atomic_worker_start = async_helper;
462 workers->stopping = 0;
463 }
464
465 /*
466 * starts new worker threads. This does not enforce the max worker
467 * count in case you need to temporarily go past it.
468 */
469 static int __btrfs_start_workers(struct btrfs_workers *workers)
470 {
471 struct btrfs_worker_thread *worker;
472 int ret = 0;
473
474 worker = kzalloc(sizeof(*worker), GFP_NOFS);
475 if (!worker) {
476 ret = -ENOMEM;
477 goto fail;
478 }
479
480 INIT_LIST_HEAD(&worker->pending);
481 INIT_LIST_HEAD(&worker->prio_pending);
482 INIT_LIST_HEAD(&worker->worker_list);
483 spin_lock_init(&worker->lock);
484
485 atomic_set(&worker->num_pending, 0);
486 atomic_set(&worker->refs, 1);
487 worker->workers = workers;
488 worker->task = kthread_create(worker_loop, worker,
489 "btrfs-%s-%d", workers->name,
490 workers->num_workers + 1);
491 if (IS_ERR(worker->task)) {
492 ret = PTR_ERR(worker->task);
493 goto fail;
494 }
495
496 spin_lock_irq(&workers->lock);
497 if (workers->stopping) {
498 spin_unlock_irq(&workers->lock);
499 ret = -EINVAL;
500 goto fail_kthread;
501 }
502 list_add_tail(&worker->worker_list, &workers->idle_list);
503 worker->idle = 1;
504 workers->num_workers++;
505 workers->num_workers_starting--;
506 WARN_ON(workers->num_workers_starting < 0);
507 spin_unlock_irq(&workers->lock);
508
509 wake_up_process(worker->task);
510 return 0;
511
512 fail_kthread:
513 kthread_stop(worker->task);
514 fail:
515 kfree(worker);
516 spin_lock_irq(&workers->lock);
517 workers->num_workers_starting--;
518 spin_unlock_irq(&workers->lock);
519 return ret;
520 }
521
522 int btrfs_start_workers(struct btrfs_workers *workers)
523 {
524 spin_lock_irq(&workers->lock);
525 workers->num_workers_starting++;
526 spin_unlock_irq(&workers->lock);
527 return __btrfs_start_workers(workers);
528 }
529
530 /*
531 * run through the list and find a worker thread that doesn't have a lot
532 * to do right now. This can return null if we aren't yet at the thread
533 * count limit and all of the threads are busy.
534 */
535 static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
536 {
537 struct btrfs_worker_thread *worker;
538 struct list_head *next;
539 int enforce_min;
540
541 enforce_min = (workers->num_workers + workers->num_workers_starting) <
542 workers->max_workers;
543
544 /*
545 * if we find an idle thread, don't move it to the end of the
546 * idle list. This improves the chance that the next submission
547 * will reuse the same thread, and maybe catch it while it is still
548 * working
549 */
550 if (!list_empty(&workers->idle_list)) {
551 next = workers->idle_list.next;
552 worker = list_entry(next, struct btrfs_worker_thread,
553 worker_list);
554 return worker;
555 }
556 if (enforce_min || list_empty(&workers->worker_list))
557 return NULL;
558
559 /*
560 * if we pick a busy task, move the task to the end of the list.
561 * hopefully this will keep things somewhat evenly balanced.
562 * Do the move in batches based on the sequence number. This groups
563 * requests submitted at roughly the same time onto the same worker.
564 */
565 next = workers->worker_list.next;
566 worker = list_entry(next, struct btrfs_worker_thread, worker_list);
567 worker->sequence++;
568
569 if (worker->sequence % workers->idle_thresh == 0)
570 list_move_tail(next, &workers->worker_list);
571 return worker;
572 }
573
574 /*
575 * selects a worker thread to take the next job. This will either find
576 * an idle worker, start a new worker up to the max count, or just return
577 * one of the existing busy workers.
578 */
579 static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
580 {
581 struct btrfs_worker_thread *worker;
582 unsigned long flags;
583 struct list_head *fallback;
584 int ret;
585
586 spin_lock_irqsave(&workers->lock, flags);
587 again:
588 worker = next_worker(workers);
589
590 if (!worker) {
591 if (workers->num_workers + workers->num_workers_starting >=
592 workers->max_workers) {
593 goto fallback;
594 } else if (workers->atomic_worker_start) {
595 workers->atomic_start_pending = 1;
596 goto fallback;
597 } else {
598 workers->num_workers_starting++;
599 spin_unlock_irqrestore(&workers->lock, flags);
600 /* we're below the limit, start another worker */
601 ret = __btrfs_start_workers(workers);
602 spin_lock_irqsave(&workers->lock, flags);
603 if (ret)
604 goto fallback;
605 goto again;
606 }
607 }
608 goto found;
609
610 fallback:
611 fallback = NULL;
612 /*
613 * we have failed to find any workers, just
614 * return the first one we can find.
615 */
616 if (!list_empty(&workers->worker_list))
617 fallback = workers->worker_list.next;
618 if (!list_empty(&workers->idle_list))
619 fallback = workers->idle_list.next;
620 BUG_ON(!fallback);
621 worker = list_entry(fallback,
622 struct btrfs_worker_thread, worker_list);
623 found:
624 /*
625 * this makes sure the worker doesn't exit before it is placed
626 * onto a busy/idle list
627 */
628 atomic_inc(&worker->num_pending);
629 spin_unlock_irqrestore(&workers->lock, flags);
630 return worker;
631 }
632
633 /*
634 * btrfs_requeue_work just puts the work item back on the tail of the list
635 * it was taken from. It is intended for use with long running work functions
636 * that make some progress and want to give the cpu up for others.
637 */
638 void btrfs_requeue_work(struct btrfs_work *work)
639 {
640 struct btrfs_worker_thread *worker = work->worker;
641 unsigned long flags;
642 int wake = 0;
643
644 if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
645 return;
646
647 spin_lock_irqsave(&worker->lock, flags);
648 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
649 list_add_tail(&work->list, &worker->prio_pending);
650 else
651 list_add_tail(&work->list, &worker->pending);
652 atomic_inc(&worker->num_pending);
653
654 /* by definition we're busy, take ourselves off the idle
655 * list
656 */
657 if (worker->idle) {
658 spin_lock(&worker->workers->lock);
659 worker->idle = 0;
660 list_move_tail(&worker->worker_list,
661 &worker->workers->worker_list);
662 spin_unlock(&worker->workers->lock);
663 }
664 if (!worker->working) {
665 wake = 1;
666 worker->working = 1;
667 }
668
669 if (wake)
670 wake_up_process(worker->task);
671 spin_unlock_irqrestore(&worker->lock, flags);
672 }
673
674 void btrfs_set_work_high_prio(struct btrfs_work *work)
675 {
676 set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
677 }
678
679 /*
680 * places a struct btrfs_work into the pending queue of one of the kthreads
681 */
682 void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
683 {
684 struct btrfs_worker_thread *worker;
685 unsigned long flags;
686 int wake = 0;
687
688 /* don't requeue something already on a list */
689 if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
690 return;
691
692 worker = find_worker(workers);
693 if (workers->ordered) {
694 /*
695 * you're not allowed to do ordered queues from an
696 * interrupt handler
697 */
698 spin_lock(&workers->order_lock);
699 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
700 list_add_tail(&work->order_list,
701 &workers->prio_order_list);
702 } else {
703 list_add_tail(&work->order_list, &workers->order_list);
704 }
705 spin_unlock(&workers->order_lock);
706 } else {
707 INIT_LIST_HEAD(&work->order_list);
708 }
709
710 spin_lock_irqsave(&worker->lock, flags);
711
712 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
713 list_add_tail(&work->list, &worker->prio_pending);
714 else
715 list_add_tail(&work->list, &worker->pending);
716 check_busy_worker(worker);
717
718 /*
719 * avoid calling into wake_up_process if this thread has already
720 * been kicked
721 */
722 if (!worker->working)
723 wake = 1;
724 worker->working = 1;
725
726 if (wake)
727 wake_up_process(worker->task);
728 spin_unlock_irqrestore(&worker->lock, flags);
729 }
This page took 0.0439 seconds and 4 git commands to generate.