bcache: Pull on disk data structures out into a separate header
diff --git a/drivers/md/bcache/journal.c b/drivers/md/bcache/journal.c
index 8435f81e5d858012e8aca6be8204e923a34b1d01..ecdaa671bd50457bf38d1cf9f896ffd1c8352546 100644
--- a/drivers/md/bcache/journal.c
+++ b/drivers/md/bcache/journal.c
@@ -7,7 +7,6 @@
 #include "bcache.h"
 #include "btree.h"
 #include "debug.h"
-#include "request.h"
 
 #include <trace/events/bcache.h>
 
@@ -31,17 +30,20 @@ static void journal_read_endio(struct bio *bio, int error)
 }
 
 static int journal_read_bucket(struct cache *ca, struct list_head *list,
-                              struct btree_op *op, unsigned bucket_index)
+                              unsigned bucket_index)
 {
        struct journal_device *ja = &ca->journal;
        struct bio *bio = &ja->bio;
 
        struct journal_replay *i;
        struct jset *j, *data = ca->set->journal.w[0].data;
+       struct closure cl;
        unsigned len, left, offset = 0;
        int ret = 0;
        sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]);
 
+       closure_init_stack(&cl);
+
        pr_debug("reading %llu", (uint64_t) bucket);
 
        while (offset < ca->sb.bucket_size) {
@@ -55,11 +57,11 @@ reread:             left = ca->sb.bucket_size - offset;
                bio->bi_size    = len << 9;
 
                bio->bi_end_io  = journal_read_endio;
-               bio->bi_private = &op->cl;
+               bio->bi_private = &cl;
                bch_bio_map(bio, data);
 
-               closure_bio_submit(bio, &op->cl, ca);
-               closure_sync(&op->cl);
+               closure_bio_submit(bio, &cl, ca);
+               closure_sync(&cl);
 
                /* This function could be simpler now since we no longer write
                 * journal entries that overlap bucket boundaries; this means
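The synchronous read above no longer borrows op->cl from a struct btree_op; it now rides on the closure initialised on the stack at the top of journal_read_bucket(). For reference, the completion handler wired up in this hunk simply drops the reference that closure_bio_submit() took, which is what closure_sync() blocks on. A minimal sketch of that handler (unchanged by this patch, shown only for context):

	static void journal_read_endio(struct bio *bio, int error)
	{
		struct closure *cl = bio->bi_private;

		/* Drop the ref taken by closure_bio_submit(); the
		 * closure_sync(&cl) in journal_read_bucket() returns
		 * once this has run. */
		closure_put(cl);
	}
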
@@ -72,7 +74,7 @@ reread:               left = ca->sb.bucket_size - offset;
                        struct list_head *where;
                        size_t blocks, bytes = set_bytes(j);
 
-                       if (j->magic != jset_magic(ca->set))
+                       if (j->magic != jset_magic(&ca->sb))
                                return ret;
 
                        if (bytes > left << 9)
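jset_magic() used to take the cache_set; with the on-disk structures pulled out into their own header it only needs the superblock, which is why the call sites here and in journal_write_unlocked() below now pass &ca->sb / &c->sb. A sketch of the assumed helper after the move (the real definition lives in the new header, not in this file):

	/* Assumed shape after the header split (sketch): */
	static inline uint64_t jset_magic(struct cache_sb *sb)
	{
		return sb->set_magic ^ JSET_MAGIC;
	}
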
@@ -129,12 +131,11 @@ next_set:
        return ret;
 }
 
-int bch_journal_read(struct cache_set *c, struct list_head *list,
-                       struct btree_op *op)
+int bch_journal_read(struct cache_set *c, struct list_head *list)
 {
 #define read_bucket(b)                                                 \
        ({                                                              \
-               int ret = journal_read_bucket(ca, list, op, b);         \
+               int ret = journal_read_bucket(ca, list, b);             \
                __set_bit(b, bitmap);                                   \
                if (ret < 0)                                            \
                        return ret;                                     \
@@ -292,8 +293,7 @@ void bch_journal_mark(struct cache_set *c, struct list_head *list)
        }
 }
 
-int bch_journal_replay(struct cache_set *s, struct list_head *list,
-                         struct btree_op *op)
+int bch_journal_replay(struct cache_set *s, struct list_head *list)
 {
        int ret = 0, keys = 0, entries = 0;
        struct bkey *k;
@@ -301,31 +301,30 @@ int bch_journal_replay(struct cache_set *s, struct list_head *list,
                list_entry(list->prev, struct journal_replay, list);
 
        uint64_t start = i->j.last_seq, end = i->j.seq, n = start;
+       struct keylist keylist;
+
+       bch_keylist_init(&keylist);
 
        list_for_each_entry(i, list, list) {
                BUG_ON(i->pin && atomic_read(i->pin) != 1);
 
-               if (n != i->j.seq)
-                       pr_err(
-               "journal entries %llu-%llu missing! (replaying %llu-%llu)\n",
-               n, i->j.seq - 1, start, end);
+               cache_set_err_on(n != i->j.seq, s,
+"bcache: journal entries %llu-%llu missing! (replaying %llu-%llu)",
+                                n, i->j.seq - 1, start, end);
 
                for (k = i->j.start;
                     k < end(&i->j);
                     k = bkey_next(k)) {
                        trace_bcache_journal_replay_key(k);
 
-                       bkey_copy(op->keys.top, k);
-                       bch_keylist_push(&op->keys);
-
-                       op->journal = i->pin;
-                       atomic_inc(op->journal);
+                       bkey_copy(keylist.top, k);
+                       bch_keylist_push(&keylist);
 
-                       ret = bch_btree_insert(op, s);
+                       ret = bch_btree_insert(s, &keylist, i->pin, NULL);
                        if (ret)
                                goto err;
 
-                       BUG_ON(!bch_keylist_empty(&op->keys));
+                       BUG_ON(!bch_keylist_empty(&keylist));
                        keys++;
 
                        cond_resched();
@@ -339,14 +338,13 @@ int bch_journal_replay(struct cache_set *s, struct list_head *list,
 
        pr_info("journal replay done, %i keys in %i entries, seq %llu",
                keys, entries, end);
-
+err:
        while (!list_empty(list)) {
                i = list_first_entry(list, struct journal_replay, list);
                list_del(&i->list);
                kfree(i);
        }
-err:
-       closure_sync(&op->cl);
+
        return ret;
 }
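Replay now builds its own on-stack keylist instead of reusing op->keys, and the journal pin is handed straight to the btree layer rather than being stashed in op->journal and bumped by hand. The prototype implied by the call site above, as a sketch (the name of the fourth parameter, passed as NULL here, is an assumption):

	int bch_btree_insert(struct cache_set *c, struct keylist *keys,
			     atomic_t *journal_ref, struct bkey *replace_key);

Note also that the err: label moved above the list teardown, so journal_replay entries are freed on the error path as well, and the closure_sync() on op->cl is gone along with the btree_op.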
 
@@ -358,48 +356,35 @@ static void btree_flush_write(struct cache_set *c)
         * Try to find the btree node with that references the oldest journal
         * entry, best is our current candidate and is locked if non NULL:
         */
-       struct btree *b, *best = NULL;
-       unsigned iter;
+       struct btree *b, *best;
+       unsigned i;
+retry:
+       best = NULL;
+
+       for_each_cached_btree(b, c, i)
+               if (btree_current_write(b)->journal) {
+                       if (!best)
+                               best = b;
+                       else if (journal_pin_cmp(c,
+                                       btree_current_write(best)->journal,
+                                       btree_current_write(b)->journal)) {
+                               best = b;
+                       }
+               }
 
-       for_each_cached_btree(b, c, iter) {
-               if (!down_write_trylock(&b->lock))
-                       continue;
+       b = best;
+       if (b) {
+               rw_lock(true, b, b->level);
 
-               if (!btree_node_dirty(b) ||
-                   !btree_current_write(b)->journal) {
+               if (!btree_current_write(b)->journal) {
                        rw_unlock(true, b);
-                       continue;
+                       /* We raced */
+                       goto retry;
                }
 
-               if (!best)
-                       best = b;
-               else if (journal_pin_cmp(c,
-                                        btree_current_write(best),
-                                        btree_current_write(b))) {
-                       rw_unlock(true, best);
-                       best = b;
-               } else
-                       rw_unlock(true, b);
+               bch_btree_node_write(b, NULL);
+               rw_unlock(true, b);
        }
-
-       if (best)
-               goto out;
-
-       /* We can't find the best btree node, just pick the first */
-       list_for_each_entry(b, &c->btree_cache, list)
-               if (!b->level && btree_node_dirty(b)) {
-                       best = b;
-                       rw_lock(true, best, best->level);
-                       goto found;
-               }
-
-out:
-       if (!best)
-               return;
-found:
-       if (btree_node_dirty(best))
-               bch_btree_node_write(best, NULL);
-       rw_unlock(true, best);
 }
 
 #define last_seq(j)    ((j)->seq - fifo_used(&(j)->pin) + 1)
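btree_flush_write() no longer trylocks every cached btree node while scanning: it picks the best candidate with no node locks held, then takes the lock and rechecks ->journal, retrying from the top if it lost a race. journal_pin_cmp() is now handed the journal pins themselves instead of the btree_write structs; assuming the usual fifo-index definition in journal.h (a sketch, not part of this diff):

	/* Later pins sit at higher indices in journal.pin, so this keeps the
	 * node whose dirty keys pin the oldest journal entry. */
	#define journal_pin_cmp(c, l, r)				\
		(fifo_idx(&(c)->journal.pin, (l)) > fifo_idx(&(c)->journal.pin, (r)))
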
@@ -495,7 +480,7 @@ static void journal_reclaim(struct cache_set *c)
                do_journal_discard(ca);
 
        if (c->journal.blocks_free)
-               return;
+               goto out;
 
        /*
         * Allocate:
@@ -521,7 +506,7 @@ static void journal_reclaim(struct cache_set *c)
 
        if (n)
                c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;
-
+out:
        if (!journal_full(&c->journal))
                __closure_wake_up(&c->journal.wait);
 }
@@ -554,32 +539,26 @@ static void journal_write_endio(struct bio *bio, int error)
        struct journal_write *w = bio->bi_private;
 
        cache_set_err_on(error, w->c, "journal io error");
-       closure_put(&w->c->journal.io.cl);
+       closure_put(&w->c->journal.io);
 }
 
 static void journal_write(struct closure *);
 
 static void journal_write_done(struct closure *cl)
 {
-       struct journal *j = container_of(cl, struct journal, io.cl);
-       struct cache_set *c = container_of(j, struct cache_set, journal);
-
+       struct journal *j = container_of(cl, struct journal, io);
        struct journal_write *w = (j->cur == j->w)
                ? &j->w[1]
                : &j->w[0];
 
        __closure_wake_up(&w->wait);
-
-       if (c->journal_delay_ms)
-               closure_delay(&j->io, msecs_to_jiffies(c->journal_delay_ms));
-
-       continue_at(cl, journal_write, system_wq);
+       continue_at_nobarrier(cl, journal_write, system_wq);
 }
 
 static void journal_write_unlocked(struct closure *cl)
        __releases(c->journal.lock)
 {
-       struct cache_set *c = container_of(cl, struct cache_set, journal.io.cl);
+       struct cache_set *c = container_of(cl, struct cache_set, journal.io);
        struct cache *ca;
        struct journal_write *w = c->journal.cur;
        struct bkey *k = &c->journal.key;
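journal.io loses its timer: it is now a plain closure, and the delay that journal_write_done() used to apply with closure_delay() is handled instead by the delayed work item scheduled from bch_journal() further down. A sketch of the assumed field changes in struct journal (journal.h is not part of this diff, so the surrounding fields are elided):

	struct journal {
		/* ... */
		struct closure		io;	/* previously a closure-with-timer, hence the old io.cl */
		struct delayed_work	work;	/* runs journal_write_work() */
		/* ... */
	};
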
@@ -617,7 +596,7 @@ static void journal_write_unlocked(struct closure *cl)
        for_each_cache(ca, c, i)
                w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0];
 
-       w->data->magic          = jset_magic(c);
+       w->data->magic          = jset_magic(&c->sb);
        w->data->version        = BCACHE_JSET_VERSION;
        w->data->last_seq       = last_seq(&c->journal);
        w->data->csum           = csum_set(w->data);
@@ -660,121 +639,134 @@ static void journal_write_unlocked(struct closure *cl)
 
 static void journal_write(struct closure *cl)
 {
-       struct cache_set *c = container_of(cl, struct cache_set, journal.io.cl);
+       struct cache_set *c = container_of(cl, struct cache_set, journal.io);
 
        spin_lock(&c->journal.lock);
        journal_write_unlocked(cl);
 }
 
-static void __journal_try_write(struct cache_set *c, bool noflush)
+static void journal_try_write(struct cache_set *c)
        __releases(c->journal.lock)
 {
-       struct closure *cl = &c->journal.io.cl;
+       struct closure *cl = &c->journal.io;
+       struct journal_write *w = c->journal.cur;
 
-       if (!closure_trylock(cl, &c->cl))
-               spin_unlock(&c->journal.lock);
-       else if (noflush && journal_full(&c->journal)) {
-               spin_unlock(&c->journal.lock);
-               continue_at(cl, journal_write, system_wq);
-       } else
+       w->need_write = true;
+
+       if (closure_trylock(cl, &c->cl))
                journal_write_unlocked(cl);
+       else
+               spin_unlock(&c->journal.lock);
 }
 
-#define journal_try_write(c)   __journal_try_write(c, false)
-
-void bch_journal_meta(struct cache_set *c, struct closure *cl)
+static struct journal_write *journal_wait_for_write(struct cache_set *c,
+                                                   unsigned nkeys)
 {
-       struct journal_write *w;
+       size_t sectors;
+       struct closure cl;
 
-       if (CACHE_SYNC(&c->sb)) {
-               spin_lock(&c->journal.lock);
+       closure_init_stack(&cl);
+
+       spin_lock(&c->journal.lock);
 
-               w = c->journal.cur;
-               w->need_write = true;
+       while (1) {
+               struct journal_write *w = c->journal.cur;
 
-               if (cl)
-                       BUG_ON(!closure_wait(&w->wait, cl));
+               sectors = __set_blocks(w->data, w->data->keys + nkeys,
+                                      c) * c->sb.block_size;
 
-               closure_flush(&c->journal.io);
-               __journal_try_write(c, true);
+               if (sectors <= min_t(size_t,
+                                    c->journal.blocks_free * c->sb.block_size,
+                                    PAGE_SECTORS << JSET_BITS))
+                       return w;
+
+               /* XXX: tracepoint */
+               if (!journal_full(&c->journal)) {
+                       trace_bcache_journal_entry_full(c);
+
+                       /*
+                        * XXX: If we were inserting so many keys that they
+                        * won't fit in an _empty_ journal write, we'll
+                        * deadlock. For now, handle this in
+                        * bch_keylist_realloc() - but something to think about.
+                        */
+                       BUG_ON(!w->data->keys);
+
+                       closure_wait(&w->wait, &cl);
+                       journal_try_write(c); /* unlocks */
+               } else {
+                       trace_bcache_journal_full(c);
+
+                       closure_wait(&c->journal.wait, &cl);
+                       journal_reclaim(c);
+                       spin_unlock(&c->journal.lock);
+
+                       btree_flush_write(c);
+               }
+
+               closure_sync(&cl);
+               spin_lock(&c->journal.lock);
        }
 }
 
+static void journal_write_work(struct work_struct *work)
+{
+       struct cache_set *c = container_of(to_delayed_work(work),
+                                          struct cache_set,
+                                          journal.work);
+       spin_lock(&c->journal.lock);
+       journal_try_write(c);
+}
+
 /*
  * Entry point to the journalling code - bio_insert() and btree_invalidate()
  * pass bch_journal() a list of keys to be journalled, and then
  * bch_journal() hands those same keys off to btree_insert_async()
  */
 
-void bch_journal(struct closure *cl)
+atomic_t *bch_journal(struct cache_set *c,
+                     struct keylist *keys,
+                     struct closure *parent)
 {
-       struct btree_op *op = container_of(cl, struct btree_op, cl);
-       struct cache_set *c = op->c;
        struct journal_write *w;
-       size_t b, n = ((uint64_t *) op->keys.top) - op->keys.list;
-
-       if (op->type != BTREE_INSERT ||
-           !CACHE_SYNC(&c->sb))
-               goto out;
+       atomic_t *ret;
 
-       /*
-        * If we're looping because we errored, might already be waiting on
-        * another journal write:
-        */
-       while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING)
-               closure_sync(cl->parent);
+       if (!CACHE_SYNC(&c->sb))
+               return NULL;
 
-       spin_lock(&c->journal.lock);
+       w = journal_wait_for_write(c, bch_keylist_nkeys(keys));
 
-       if (journal_full(&c->journal)) {
-               trace_bcache_journal_full(c);
+       memcpy(end(w->data), keys->keys, bch_keylist_bytes(keys));
+       w->data->keys += bch_keylist_nkeys(keys);
 
-               closure_wait(&c->journal.wait, cl);
+       ret = &fifo_back(&c->journal.pin);
+       atomic_inc(ret);
 
-               journal_reclaim(c);
+       if (parent) {
+               closure_wait(&w->wait, parent);
+               journal_try_write(c);
+       } else if (!w->need_write) {
+               schedule_delayed_work(&c->journal.work,
+                                     msecs_to_jiffies(c->journal_delay_ms));
+               spin_unlock(&c->journal.lock);
+       } else {
                spin_unlock(&c->journal.lock);
-
-               btree_flush_write(c);
-               continue_at(cl, bch_journal, bcache_wq);
        }
 
-       w = c->journal.cur;
-       w->need_write = true;
-       b = __set_blocks(w->data, w->data->keys + n, c);
-
-       if (b * c->sb.block_size > PAGE_SECTORS << JSET_BITS ||
-           b > c->journal.blocks_free) {
-               trace_bcache_journal_entry_full(c);
-
-               /*
-                * XXX: If we were inserting so many keys that they won't fit in
-                * an _empty_ journal write, we'll deadlock. For now, handle
-                * this in bch_keylist_realloc() - but something to think about.
-                */
-               BUG_ON(!w->data->keys);
-
-               BUG_ON(!closure_wait(&w->wait, cl));
-
-               closure_flush(&c->journal.io);
 
-               journal_try_write(c);
-               continue_at(cl, bch_journal, bcache_wq);
-       }
-
-       memcpy(end(w->data), op->keys.list, n * sizeof(uint64_t));
-       w->data->keys += n;
+       return ret;
+}
 
-       op->journal = &fifo_back(&c->journal.pin);
-       atomic_inc(op->journal);
+void bch_journal_meta(struct cache_set *c, struct closure *cl)
+{
+       struct keylist keys;
+       atomic_t *ref;
 
-       if (op->flush_journal) {
-               closure_flush(&c->journal.io);
-               closure_wait(&w->wait, cl->parent);
-       }
+       bch_keylist_init(&keys);
 
-       journal_try_write(c);
-out:
-       bch_btree_insert_async(cl);
+       ref = bch_journal(c, &keys, cl);
+       if (ref)
+               atomic_dec_bug(ref);
 }
 
 void bch_journal_free(struct cache_set *c)
@@ -790,6 +782,7 @@ int bch_journal_alloc(struct cache_set *c)
 
        closure_init_unlocked(&j->io);
        spin_lock_init(&j->lock);
+       INIT_DELAYED_WORK(&j->work, journal_write_work);
 
        c->journal_delay_ms = 100;
 