btrfs: Added btrfs_workqueue_struct implemented ordered execution based on kernel...
[deliverable/linux.git] / fs / btrfs / async-thread.c
1 /*
2 * Copyright (C) 2007 Oracle. All rights reserved.
3 * Copyright (C) 2014 Fujitsu. All rights reserved.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public
7 * License v2 as published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public
15 * License along with this program; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 021110-1307, USA.
18 */
19
20 #include <linux/kthread.h>
21 #include <linux/slab.h>
22 #include <linux/list.h>
23 #include <linux/spinlock.h>
24 #include <linux/freezer.h>
25 #include <linux/workqueue.h>
26 #include "async-thread.h"
27
28 #define WORK_QUEUED_BIT 0
29 #define WORK_DONE_BIT 1
30 #define WORK_ORDER_DONE_BIT 2
31 #define WORK_HIGH_PRIO_BIT 3
32
33 /*
34 * container for the kthread task pointer and the list of pending work
35 * One of these is allocated per thread.
36 */
37 struct btrfs_worker_thread {
38 /* pool we belong to */
39 struct btrfs_workers *workers;
40
41 /* list of struct btrfs_work that are waiting for service */
42 struct list_head pending;
43 struct list_head prio_pending;
44
45 /* list of worker threads from struct btrfs_workers */
46 struct list_head worker_list;
47
48 /* kthread */
49 struct task_struct *task;
50
51 /* number of things on the pending list */
52 atomic_t num_pending;
53
54 /* reference counter for this struct */
55 atomic_t refs;
56
57 unsigned long sequence;
58
59 /* protects the pending list. */
60 spinlock_t lock;
61
62 /* set to non-zero when this thread is already awake and kicking */
63 int working;
64
65 /* are we currently idle */
66 int idle;
67 };
68
69 static int __btrfs_start_workers(struct btrfs_workers *workers);
70
71 /*
72 * btrfs_start_workers uses kthread_run, which can block waiting for memory
73 * for a very long time. It will actually throttle on page writeback,
74 * and so it may not make progress until after our btrfs worker threads
75 * process all of the pending work structs in their queue
76 *
77 * This means we can't use btrfs_start_workers from inside a btrfs worker
78 * thread that is used as part of cleaning dirty memory, which pretty much
79 * involves all of the worker threads.
80 *
81 * Instead we have a helper queue who never has more than one thread
82 * where we scheduler thread start operations. This worker_start struct
83 * is used to contain the work and hold a pointer to the queue that needs
84 * another worker.
85 */
86 struct worker_start {
87 struct btrfs_work work;
88 struct btrfs_workers *queue;
89 };
90
91 static void start_new_worker_func(struct btrfs_work *work)
92 {
93 struct worker_start *start;
94 start = container_of(work, struct worker_start, work);
95 __btrfs_start_workers(start->queue);
96 kfree(start);
97 }
98
99 /*
100 * helper function to move a thread onto the idle list after it
101 * has finished some requests.
102 */
103 static void check_idle_worker(struct btrfs_worker_thread *worker)
104 {
105 if (!worker->idle && atomic_read(&worker->num_pending) <
106 worker->workers->idle_thresh / 2) {
107 unsigned long flags;
108 spin_lock_irqsave(&worker->workers->lock, flags);
109 worker->idle = 1;
110
111 /* the list may be empty if the worker is just starting */
112 if (!list_empty(&worker->worker_list) &&
113 !worker->workers->stopping) {
114 list_move(&worker->worker_list,
115 &worker->workers->idle_list);
116 }
117 spin_unlock_irqrestore(&worker->workers->lock, flags);
118 }
119 }
120
121 /*
122 * helper function to move a thread off the idle list after new
123 * pending work is added.
124 */
125 static void check_busy_worker(struct btrfs_worker_thread *worker)
126 {
127 if (worker->idle && atomic_read(&worker->num_pending) >=
128 worker->workers->idle_thresh) {
129 unsigned long flags;
130 spin_lock_irqsave(&worker->workers->lock, flags);
131 worker->idle = 0;
132
133 if (!list_empty(&worker->worker_list) &&
134 !worker->workers->stopping) {
135 list_move_tail(&worker->worker_list,
136 &worker->workers->worker_list);
137 }
138 spin_unlock_irqrestore(&worker->workers->lock, flags);
139 }
140 }
141
142 static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
143 {
144 struct btrfs_workers *workers = worker->workers;
145 struct worker_start *start;
146 unsigned long flags;
147
148 rmb();
149 if (!workers->atomic_start_pending)
150 return;
151
152 start = kzalloc(sizeof(*start), GFP_NOFS);
153 if (!start)
154 return;
155
156 start->work.func = start_new_worker_func;
157 start->queue = workers;
158
159 spin_lock_irqsave(&workers->lock, flags);
160 if (!workers->atomic_start_pending)
161 goto out;
162
163 workers->atomic_start_pending = 0;
164 if (workers->num_workers + workers->num_workers_starting >=
165 workers->max_workers)
166 goto out;
167
168 workers->num_workers_starting += 1;
169 spin_unlock_irqrestore(&workers->lock, flags);
170 btrfs_queue_worker(workers->atomic_worker_start, &start->work);
171 return;
172
173 out:
174 kfree(start);
175 spin_unlock_irqrestore(&workers->lock, flags);
176 }
177
178 static noinline void run_ordered_completions(struct btrfs_workers *workers,
179 struct btrfs_work *work)
180 {
181 if (!workers->ordered)
182 return;
183
184 set_bit(WORK_DONE_BIT, &work->flags);
185
186 spin_lock(&workers->order_lock);
187
188 while (1) {
189 if (!list_empty(&workers->prio_order_list)) {
190 work = list_entry(workers->prio_order_list.next,
191 struct btrfs_work, order_list);
192 } else if (!list_empty(&workers->order_list)) {
193 work = list_entry(workers->order_list.next,
194 struct btrfs_work, order_list);
195 } else {
196 break;
197 }
198 if (!test_bit(WORK_DONE_BIT, &work->flags))
199 break;
200
201 /* we are going to call the ordered done function, but
202 * we leave the work item on the list as a barrier so
203 * that later work items that are done don't have their
204 * functions called before this one returns
205 */
206 if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
207 break;
208
209 spin_unlock(&workers->order_lock);
210
211 work->ordered_func(work);
212
213 /* now take the lock again and drop our item from the list */
214 spin_lock(&workers->order_lock);
215 list_del(&work->order_list);
216 spin_unlock(&workers->order_lock);
217
218 /*
219 * we don't want to call the ordered free functions
220 * with the lock held though
221 */
222 work->ordered_free(work);
223 spin_lock(&workers->order_lock);
224 }
225
226 spin_unlock(&workers->order_lock);
227 }
228
229 static void put_worker(struct btrfs_worker_thread *worker)
230 {
231 if (atomic_dec_and_test(&worker->refs))
232 kfree(worker);
233 }
234
235 static int try_worker_shutdown(struct btrfs_worker_thread *worker)
236 {
237 int freeit = 0;
238
239 spin_lock_irq(&worker->lock);
240 spin_lock(&worker->workers->lock);
241 if (worker->workers->num_workers > 1 &&
242 worker->idle &&
243 !worker->working &&
244 !list_empty(&worker->worker_list) &&
245 list_empty(&worker->prio_pending) &&
246 list_empty(&worker->pending) &&
247 atomic_read(&worker->num_pending) == 0) {
248 freeit = 1;
249 list_del_init(&worker->worker_list);
250 worker->workers->num_workers--;
251 }
252 spin_unlock(&worker->workers->lock);
253 spin_unlock_irq(&worker->lock);
254
255 if (freeit)
256 put_worker(worker);
257 return freeit;
258 }
259
260 static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
261 struct list_head *prio_head,
262 struct list_head *head)
263 {
264 struct btrfs_work *work = NULL;
265 struct list_head *cur = NULL;
266
267 if (!list_empty(prio_head)) {
268 cur = prio_head->next;
269 goto out;
270 }
271
272 smp_mb();
273 if (!list_empty(&worker->prio_pending))
274 goto refill;
275
276 if (!list_empty(head)) {
277 cur = head->next;
278 goto out;
279 }
280
281 refill:
282 spin_lock_irq(&worker->lock);
283 list_splice_tail_init(&worker->prio_pending, prio_head);
284 list_splice_tail_init(&worker->pending, head);
285
286 if (!list_empty(prio_head))
287 cur = prio_head->next;
288 else if (!list_empty(head))
289 cur = head->next;
290 spin_unlock_irq(&worker->lock);
291
292 if (!cur)
293 goto out_fail;
294
295 out:
296 work = list_entry(cur, struct btrfs_work, list);
297
298 out_fail:
299 return work;
300 }
301
302 /*
303 * main loop for servicing work items
304 */
305 static int worker_loop(void *arg)
306 {
307 struct btrfs_worker_thread *worker = arg;
308 struct list_head head;
309 struct list_head prio_head;
310 struct btrfs_work *work;
311
312 INIT_LIST_HEAD(&head);
313 INIT_LIST_HEAD(&prio_head);
314
315 do {
316 again:
317 while (1) {
318
319
320 work = get_next_work(worker, &prio_head, &head);
321 if (!work)
322 break;
323
324 list_del(&work->list);
325 clear_bit(WORK_QUEUED_BIT, &work->flags);
326
327 work->worker = worker;
328
329 work->func(work);
330
331 atomic_dec(&worker->num_pending);
332 /*
333 * unless this is an ordered work queue,
334 * 'work' was probably freed by func above.
335 */
336 run_ordered_completions(worker->workers, work);
337
338 check_pending_worker_creates(worker);
339 cond_resched();
340 }
341
342 spin_lock_irq(&worker->lock);
343 check_idle_worker(worker);
344
345 if (freezing(current)) {
346 worker->working = 0;
347 spin_unlock_irq(&worker->lock);
348 try_to_freeze();
349 } else {
350 spin_unlock_irq(&worker->lock);
351 if (!kthread_should_stop()) {
352 cpu_relax();
353 /*
354 * we've dropped the lock, did someone else
355 * jump_in?
356 */
357 smp_mb();
358 if (!list_empty(&worker->pending) ||
359 !list_empty(&worker->prio_pending))
360 continue;
361
362 /*
363 * this short schedule allows more work to
364 * come in without the queue functions
365 * needing to go through wake_up_process()
366 *
367 * worker->working is still 1, so nobody
368 * is going to try and wake us up
369 */
370 schedule_timeout(1);
371 smp_mb();
372 if (!list_empty(&worker->pending) ||
373 !list_empty(&worker->prio_pending))
374 continue;
375
376 if (kthread_should_stop())
377 break;
378
379 /* still no more work?, sleep for real */
380 spin_lock_irq(&worker->lock);
381 set_current_state(TASK_INTERRUPTIBLE);
382 if (!list_empty(&worker->pending) ||
383 !list_empty(&worker->prio_pending)) {
384 spin_unlock_irq(&worker->lock);
385 set_current_state(TASK_RUNNING);
386 goto again;
387 }
388
389 /*
390 * this makes sure we get a wakeup when someone
391 * adds something new to the queue
392 */
393 worker->working = 0;
394 spin_unlock_irq(&worker->lock);
395
396 if (!kthread_should_stop()) {
397 schedule_timeout(HZ * 120);
398 if (!worker->working &&
399 try_worker_shutdown(worker)) {
400 return 0;
401 }
402 }
403 }
404 __set_current_state(TASK_RUNNING);
405 }
406 } while (!kthread_should_stop());
407 return 0;
408 }
409
410 /*
411 * this will wait for all the worker threads to shutdown
412 */
413 void btrfs_stop_workers(struct btrfs_workers *workers)
414 {
415 struct list_head *cur;
416 struct btrfs_worker_thread *worker;
417 int can_stop;
418
419 spin_lock_irq(&workers->lock);
420 workers->stopping = 1;
421 list_splice_init(&workers->idle_list, &workers->worker_list);
422 while (!list_empty(&workers->worker_list)) {
423 cur = workers->worker_list.next;
424 worker = list_entry(cur, struct btrfs_worker_thread,
425 worker_list);
426
427 atomic_inc(&worker->refs);
428 workers->num_workers -= 1;
429 if (!list_empty(&worker->worker_list)) {
430 list_del_init(&worker->worker_list);
431 put_worker(worker);
432 can_stop = 1;
433 } else
434 can_stop = 0;
435 spin_unlock_irq(&workers->lock);
436 if (can_stop)
437 kthread_stop(worker->task);
438 spin_lock_irq(&workers->lock);
439 put_worker(worker);
440 }
441 spin_unlock_irq(&workers->lock);
442 }
443
444 /*
445 * simple init on struct btrfs_workers
446 */
447 void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
448 struct btrfs_workers *async_helper)
449 {
450 workers->num_workers = 0;
451 workers->num_workers_starting = 0;
452 INIT_LIST_HEAD(&workers->worker_list);
453 INIT_LIST_HEAD(&workers->idle_list);
454 INIT_LIST_HEAD(&workers->order_list);
455 INIT_LIST_HEAD(&workers->prio_order_list);
456 spin_lock_init(&workers->lock);
457 spin_lock_init(&workers->order_lock);
458 workers->max_workers = max;
459 workers->idle_thresh = 32;
460 workers->name = name;
461 workers->ordered = 0;
462 workers->atomic_start_pending = 0;
463 workers->atomic_worker_start = async_helper;
464 workers->stopping = 0;
465 }
466
467 /*
468 * starts new worker threads. This does not enforce the max worker
469 * count in case you need to temporarily go past it.
470 */
471 static int __btrfs_start_workers(struct btrfs_workers *workers)
472 {
473 struct btrfs_worker_thread *worker;
474 int ret = 0;
475
476 worker = kzalloc(sizeof(*worker), GFP_NOFS);
477 if (!worker) {
478 ret = -ENOMEM;
479 goto fail;
480 }
481
482 INIT_LIST_HEAD(&worker->pending);
483 INIT_LIST_HEAD(&worker->prio_pending);
484 INIT_LIST_HEAD(&worker->worker_list);
485 spin_lock_init(&worker->lock);
486
487 atomic_set(&worker->num_pending, 0);
488 atomic_set(&worker->refs, 1);
489 worker->workers = workers;
490 worker->task = kthread_create(worker_loop, worker,
491 "btrfs-%s-%d", workers->name,
492 workers->num_workers + 1);
493 if (IS_ERR(worker->task)) {
494 ret = PTR_ERR(worker->task);
495 goto fail;
496 }
497
498 spin_lock_irq(&workers->lock);
499 if (workers->stopping) {
500 spin_unlock_irq(&workers->lock);
501 ret = -EINVAL;
502 goto fail_kthread;
503 }
504 list_add_tail(&worker->worker_list, &workers->idle_list);
505 worker->idle = 1;
506 workers->num_workers++;
507 workers->num_workers_starting--;
508 WARN_ON(workers->num_workers_starting < 0);
509 spin_unlock_irq(&workers->lock);
510
511 wake_up_process(worker->task);
512 return 0;
513
514 fail_kthread:
515 kthread_stop(worker->task);
516 fail:
517 kfree(worker);
518 spin_lock_irq(&workers->lock);
519 workers->num_workers_starting--;
520 spin_unlock_irq(&workers->lock);
521 return ret;
522 }
523
524 int btrfs_start_workers(struct btrfs_workers *workers)
525 {
526 spin_lock_irq(&workers->lock);
527 workers->num_workers_starting++;
528 spin_unlock_irq(&workers->lock);
529 return __btrfs_start_workers(workers);
530 }
531
532 /*
533 * run through the list and find a worker thread that doesn't have a lot
534 * to do right now. This can return null if we aren't yet at the thread
535 * count limit and all of the threads are busy.
536 */
537 static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
538 {
539 struct btrfs_worker_thread *worker;
540 struct list_head *next;
541 int enforce_min;
542
543 enforce_min = (workers->num_workers + workers->num_workers_starting) <
544 workers->max_workers;
545
546 /*
547 * if we find an idle thread, don't move it to the end of the
548 * idle list. This improves the chance that the next submission
549 * will reuse the same thread, and maybe catch it while it is still
550 * working
551 */
552 if (!list_empty(&workers->idle_list)) {
553 next = workers->idle_list.next;
554 worker = list_entry(next, struct btrfs_worker_thread,
555 worker_list);
556 return worker;
557 }
558 if (enforce_min || list_empty(&workers->worker_list))
559 return NULL;
560
561 /*
562 * if we pick a busy task, move the task to the end of the list.
563 * hopefully this will keep things somewhat evenly balanced.
564 * Do the move in batches based on the sequence number. This groups
565 * requests submitted at roughly the same time onto the same worker.
566 */
567 next = workers->worker_list.next;
568 worker = list_entry(next, struct btrfs_worker_thread, worker_list);
569 worker->sequence++;
570
571 if (worker->sequence % workers->idle_thresh == 0)
572 list_move_tail(next, &workers->worker_list);
573 return worker;
574 }
575
576 /*
577 * selects a worker thread to take the next job. This will either find
578 * an idle worker, start a new worker up to the max count, or just return
579 * one of the existing busy workers.
580 */
581 static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
582 {
583 struct btrfs_worker_thread *worker;
584 unsigned long flags;
585 struct list_head *fallback;
586 int ret;
587
588 spin_lock_irqsave(&workers->lock, flags);
589 again:
590 worker = next_worker(workers);
591
592 if (!worker) {
593 if (workers->num_workers + workers->num_workers_starting >=
594 workers->max_workers) {
595 goto fallback;
596 } else if (workers->atomic_worker_start) {
597 workers->atomic_start_pending = 1;
598 goto fallback;
599 } else {
600 workers->num_workers_starting++;
601 spin_unlock_irqrestore(&workers->lock, flags);
602 /* we're below the limit, start another worker */
603 ret = __btrfs_start_workers(workers);
604 spin_lock_irqsave(&workers->lock, flags);
605 if (ret)
606 goto fallback;
607 goto again;
608 }
609 }
610 goto found;
611
612 fallback:
613 fallback = NULL;
614 /*
615 * we have failed to find any workers, just
616 * return the first one we can find.
617 */
618 if (!list_empty(&workers->worker_list))
619 fallback = workers->worker_list.next;
620 if (!list_empty(&workers->idle_list))
621 fallback = workers->idle_list.next;
622 BUG_ON(!fallback);
623 worker = list_entry(fallback,
624 struct btrfs_worker_thread, worker_list);
625 found:
626 /*
627 * this makes sure the worker doesn't exit before it is placed
628 * onto a busy/idle list
629 */
630 atomic_inc(&worker->num_pending);
631 spin_unlock_irqrestore(&workers->lock, flags);
632 return worker;
633 }
634
635 /*
636 * btrfs_requeue_work just puts the work item back on the tail of the list
637 * it was taken from. It is intended for use with long running work functions
638 * that make some progress and want to give the cpu up for others.
639 */
640 void btrfs_requeue_work(struct btrfs_work *work)
641 {
642 struct btrfs_worker_thread *worker = work->worker;
643 unsigned long flags;
644 int wake = 0;
645
646 if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
647 return;
648
649 spin_lock_irqsave(&worker->lock, flags);
650 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
651 list_add_tail(&work->list, &worker->prio_pending);
652 else
653 list_add_tail(&work->list, &worker->pending);
654 atomic_inc(&worker->num_pending);
655
656 /* by definition we're busy, take ourselves off the idle
657 * list
658 */
659 if (worker->idle) {
660 spin_lock(&worker->workers->lock);
661 worker->idle = 0;
662 list_move_tail(&worker->worker_list,
663 &worker->workers->worker_list);
664 spin_unlock(&worker->workers->lock);
665 }
666 if (!worker->working) {
667 wake = 1;
668 worker->working = 1;
669 }
670
671 if (wake)
672 wake_up_process(worker->task);
673 spin_unlock_irqrestore(&worker->lock, flags);
674 }
675
676 void btrfs_set_work_high_prio(struct btrfs_work *work)
677 {
678 set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
679 }
680
681 /*
682 * places a struct btrfs_work into the pending queue of one of the kthreads
683 */
684 void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
685 {
686 struct btrfs_worker_thread *worker;
687 unsigned long flags;
688 int wake = 0;
689
690 /* don't requeue something already on a list */
691 if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
692 return;
693
694 worker = find_worker(workers);
695 if (workers->ordered) {
696 /*
697 * you're not allowed to do ordered queues from an
698 * interrupt handler
699 */
700 spin_lock(&workers->order_lock);
701 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
702 list_add_tail(&work->order_list,
703 &workers->prio_order_list);
704 } else {
705 list_add_tail(&work->order_list, &workers->order_list);
706 }
707 spin_unlock(&workers->order_lock);
708 } else {
709 INIT_LIST_HEAD(&work->order_list);
710 }
711
712 spin_lock_irqsave(&worker->lock, flags);
713
714 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
715 list_add_tail(&work->list, &worker->prio_pending);
716 else
717 list_add_tail(&work->list, &worker->pending);
718 check_busy_worker(worker);
719
720 /*
721 * avoid calling into wake_up_process if this thread has already
722 * been kicked
723 */
724 if (!worker->working)
725 wake = 1;
726 worker->working = 1;
727
728 if (wake)
729 wake_up_process(worker->task);
730 spin_unlock_irqrestore(&worker->lock, flags);
731 }
732
733 struct btrfs_workqueue_struct {
734 struct workqueue_struct *normal_wq;
735 /* List head pointing to ordered work list */
736 struct list_head ordered_list;
737
738 /* Spinlock for ordered_list */
739 spinlock_t list_lock;
740 };
741
742 struct btrfs_workqueue_struct *btrfs_alloc_workqueue(char *name,
743 int flags,
744 int max_active)
745 {
746 struct btrfs_workqueue_struct *ret = kzalloc(sizeof(*ret), GFP_NOFS);
747
748 if (unlikely(!ret))
749 return NULL;
750
751 ret->normal_wq = alloc_workqueue("%s-%s", flags, max_active,
752 "btrfs", name);
753 if (unlikely(!ret->normal_wq)) {
754 kfree(ret);
755 return NULL;
756 }
757
758 INIT_LIST_HEAD(&ret->ordered_list);
759 spin_lock_init(&ret->list_lock);
760 return ret;
761 }
762
763 static void run_ordered_work(struct btrfs_workqueue_struct *wq)
764 {
765 struct list_head *list = &wq->ordered_list;
766 struct btrfs_work_struct *work;
767 spinlock_t *lock = &wq->list_lock;
768 unsigned long flags;
769
770 while (1) {
771 spin_lock_irqsave(lock, flags);
772 if (list_empty(list))
773 break;
774 work = list_entry(list->next, struct btrfs_work_struct,
775 ordered_list);
776 if (!test_bit(WORK_DONE_BIT, &work->flags))
777 break;
778
779 /*
780 * we are going to call the ordered done function, but
781 * we leave the work item on the list as a barrier so
782 * that later work items that are done don't have their
783 * functions called before this one returns
784 */
785 if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
786 break;
787 spin_unlock_irqrestore(lock, flags);
788 work->ordered_func(work);
789
790 /* now take the lock again and drop our item from the list */
791 spin_lock_irqsave(lock, flags);
792 list_del(&work->ordered_list);
793 spin_unlock_irqrestore(lock, flags);
794
795 /*
796 * we don't want to call the ordered free functions
797 * with the lock held though
798 */
799 work->ordered_free(work);
800 }
801 spin_unlock_irqrestore(lock, flags);
802 }
803
804 static void normal_work_helper(struct work_struct *arg)
805 {
806 struct btrfs_work_struct *work;
807 struct btrfs_workqueue_struct *wq;
808 int need_order = 0;
809
810 work = container_of(arg, struct btrfs_work_struct, normal_work);
811 /*
812 * We should not touch things inside work in the following cases:
813 * 1) after work->func() if it has no ordered_free
814 * Since the struct is freed in work->func().
815 * 2) after setting WORK_DONE_BIT
816 * The work may be freed in other threads almost instantly.
817 * So we save the needed things here.
818 */
819 if (work->ordered_func)
820 need_order = 1;
821 wq = work->wq;
822
823 work->func(work);
824 if (need_order) {
825 set_bit(WORK_DONE_BIT, &work->flags);
826 run_ordered_work(wq);
827 }
828 }
829
830 void btrfs_init_work(struct btrfs_work_struct *work,
831 void (*func)(struct btrfs_work_struct *),
832 void (*ordered_func)(struct btrfs_work_struct *),
833 void (*ordered_free)(struct btrfs_work_struct *))
834 {
835 work->func = func;
836 work->ordered_func = ordered_func;
837 work->ordered_free = ordered_free;
838 INIT_WORK(&work->normal_work, normal_work_helper);
839 INIT_LIST_HEAD(&work->ordered_list);
840 work->flags = 0;
841 }
842
843 void btrfs_queue_work(struct btrfs_workqueue_struct *wq,
844 struct btrfs_work_struct *work)
845 {
846 unsigned long flags;
847
848 work->wq = wq;
849 if (work->ordered_func) {
850 spin_lock_irqsave(&wq->list_lock, flags);
851 list_add_tail(&work->ordered_list, &wq->ordered_list);
852 spin_unlock_irqrestore(&wq->list_lock, flags);
853 }
854 queue_work(wq->normal_wq, &work->normal_work);
855 }
856
857 void btrfs_destroy_workqueue(struct btrfs_workqueue_struct *wq)
858 {
859 destroy_workqueue(wq->normal_wq);
860 kfree(wq);
861 }
862
863 void btrfs_workqueue_set_max(struct btrfs_workqueue_struct *wq, int max)
864 {
865 workqueue_set_max_active(wq->normal_wq, max);
866 }
This page took 0.107343 seconds and 5 git commands to generate.