ring-buffer: use BUF_PAGE_HDR_SIZE in calculating index
[deliverable/linux.git] / kernel / trace / ring_buffer.c
index 94530236869125aa91bbb5fd54723309c73acc76..6b17a11e42a24aed7674be4f1b00cf6791797bc6 100644 (file)
@@ -205,6 +205,7 @@ EXPORT_SYMBOL_GPL(tracing_is_on);
 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
 #define RB_ALIGNMENT           4U
 #define RB_MAX_SMALL_DATA      (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
+#define RB_EVNT_MIN_SIZE       8U      /* two 32bit words */
 
 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
@@ -370,6 +371,9 @@ static inline int test_time_stamp(u64 delta)
 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */
 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))
 
+/* Max number of timestamps that can fit on a page */
+#define RB_TIMESTAMPS_PER_PAGE (BUF_PAGE_SIZE / RB_LEN_TIME_STAMP)
+
 int ring_buffer_print_page_header(struct trace_seq *s)
 {
        struct buffer_data_page field;
@@ -411,6 +415,8 @@ struct ring_buffer_per_cpu {
        unsigned long                   overrun;
        unsigned long                   read;
        local_t                         entries;
+       local_t                         committing;
+       local_t                         commits;
        u64                             write_stamp;
        u64                             read_stamp;
        atomic_t                        record_disabled;
@@ -423,6 +429,8 @@ struct ring_buffer {
        atomic_t                        record_disabled;
        cpumask_var_t                   cpumask;
 
+       struct lock_class_key           *reader_lock_key;
+
        struct mutex                    mutex;
 
        struct ring_buffer_per_cpu      **buffers;
@@ -562,6 +570,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
        cpu_buffer->cpu = cpu;
        cpu_buffer->buffer = buffer;
        spin_lock_init(&cpu_buffer->reader_lock);
+       lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
        cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
        INIT_LIST_HEAD(&cpu_buffer->pages);
 
@@ -632,7 +641,8 @@ static int rb_cpu_notify(struct notifier_block *self,
  * when the buffer wraps. If this flag is not set, the buffer will
  * drop data when the tail hits the head.
  */
-struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
+struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
+                                       struct lock_class_key *key)
 {
        struct ring_buffer *buffer;
        int bsize;
@@ -655,6 +665,7 @@ struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
        buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
        buffer->flags = flags;
        buffer->clock = trace_clock_local;
+       buffer->reader_lock_key = key;
 
        /* need at least two pages */
        if (buffer->pages == 1)
@@ -712,7 +723,7 @@ struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
        kfree(buffer);
        return NULL;
 }
-EXPORT_SYMBOL_GPL(ring_buffer_alloc);
+EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
 
 /**
  * ring_buffer_free - free a ring buffer.
@@ -1002,12 +1013,12 @@ rb_event_index(struct ring_buffer_event *event)
 {
        unsigned long addr = (unsigned long)event;
 
-       return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE);
+       return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
 }
 
 static inline int
-rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
-            struct ring_buffer_event *event)
+rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
+                  struct ring_buffer_event *event)
 {
        unsigned long addr = (unsigned long)event;
        unsigned long index;
@@ -1019,31 +1030,6 @@ rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
                rb_commit_index(cpu_buffer) == index;
 }
 
-static void
-rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
-                   struct ring_buffer_event *event)
-{
-       unsigned long addr = (unsigned long)event;
-       unsigned long index;
-
-       index = rb_event_index(event);
-       addr &= PAGE_MASK;
-
-       while (cpu_buffer->commit_page->page != (void *)addr) {
-               if (RB_WARN_ON(cpu_buffer,
-                         cpu_buffer->commit_page == cpu_buffer->tail_page))
-                       return;
-               cpu_buffer->commit_page->page->commit =
-                       cpu_buffer->commit_page->write;
-               rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
-               cpu_buffer->write_stamp =
-                       cpu_buffer->commit_page->page->time_stamp;
-       }
-
-       /* Now set the commit to the event's index */
-       local_set(&cpu_buffer->commit_page->page->commit, index);
-}
-
 static void
 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
 {
@@ -1162,6 +1148,59 @@ static unsigned rb_calculate_event_length(unsigned length)
        return length;
 }
 
+static inline void
+rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
+             struct buffer_page *tail_page,
+             unsigned long tail, unsigned long length)
+{
+       struct ring_buffer_event *event;
+
+       /*
+        * Only the event that crossed the page boundary
+        * must fill the old tail_page with padding.
+        */
+       if (tail >= BUF_PAGE_SIZE) {
+               local_sub(length, &tail_page->write);
+               return;
+       }
+
+       event = __rb_page_index(tail_page, tail);
+
+       /*
+        * If this event is bigger than the minimum size, then
+        * we need to be careful that we don't subtract the
+        * write counter enough to allow another writer to slip
+        * in on this page.
+        * We put in a discarded commit instead, to make sure
+        * that this space is not used again.
+        *
+        * If we are less than the minimum size, we don't need to
+        * worry about it.
+        */
+       if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
+               /* No room for any events */
+
+               /* Mark the rest of the page with padding */
+               rb_event_set_padding(event);
+
+               /* Set the write back to the previous setting */
+               local_sub(length, &tail_page->write);
+               return;
+       }
+
+       /* Put in a discarded event */
+       event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
+       event->type_len = RINGBUF_TYPE_PADDING;
+       /* time delta must be non zero */
+       event->time_delta = 1;
+       /* Account for this as an entry */
+       local_inc(&tail_page->entries);
+       local_inc(&cpu_buffer->entries);
+
+       /* Set write to end of buffer */
+       length = (tail + length) - BUF_PAGE_SIZE;
+       local_sub(length, &tail_page->write);
+}
 
 static struct ring_buffer_event *
 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
@@ -1171,7 +1210,6 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
 {
        struct buffer_page *next_page, *head_page, *reader_page;
        struct ring_buffer *buffer = cpu_buffer->buffer;
-       struct ring_buffer_event *event;
        bool lock_taken = false;
        unsigned long flags;
 
@@ -1256,26 +1294,7 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
                cpu_buffer->tail_page->page->time_stamp = *ts;
        }
 
-       /*
-        * The actual tail page has moved forward.
-        */
-       if (tail < BUF_PAGE_SIZE) {
-               /* Mark the rest of the page with padding */
-               event = __rb_page_index(tail_page, tail);
-               rb_event_set_padding(event);
-       }
-
-       /* Set the write back to the previous setting */
-       local_sub(length, &tail_page->write);
-
-       /*
-        * If this was a commit entry that failed,
-        * increment that too
-        */
-       if (tail_page == cpu_buffer->commit_page &&
-           tail == rb_commit_index(cpu_buffer)) {
-               rb_set_commit_to_write(cpu_buffer);
-       }
+       rb_reset_tail(cpu_buffer, tail_page, tail, length);
 
        __raw_spin_unlock(&cpu_buffer->lock);
        local_irq_restore(flags);
@@ -1285,7 +1304,7 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
 
  out_reset:
        /* reset write */
-       local_sub(length, &tail_page->write);
+       rb_reset_tail(cpu_buffer, tail_page, tail, length);
 
        if (likely(lock_taken))
                __raw_spin_unlock(&cpu_buffer->lock);
@@ -1326,15 +1345,47 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
                local_inc(&tail_page->entries);
 
        /*
-        * If this is a commit and the tail is zero, then update
-        * this page's time stamp.
+        * If this is the first commit on the page, then update
+        * its timestamp.
         */
-       if (!tail && rb_is_commit(cpu_buffer, event))
-               cpu_buffer->commit_page->page->time_stamp = *ts;
+       if (!tail)
+               tail_page->page->time_stamp = *ts;
 
        return event;
 }
 
+static inline int
+rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
+                 struct ring_buffer_event *event)
+{
+       unsigned long new_index, old_index;
+       struct buffer_page *bpage;
+       unsigned long index;
+       unsigned long addr;
+
+       new_index = rb_event_index(event);
+       old_index = new_index + rb_event_length(event);
+       addr = (unsigned long)event;
+       addr &= PAGE_MASK;
+
+       bpage = cpu_buffer->tail_page;
+
+       if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
+               /*
+                * This is on the tail page. It is possible that
+                * a write could come in and move the tail page
+                * and write to the next page. That is fine
+                * because we just shorten what is on this page.
+                */
+               index = local_cmpxchg(&bpage->write, old_index, new_index);
+               if (index == old_index)
+                       return 1;
+       }
+
+       /* could not discard */
+       return 0;
+}
+
 static int
 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
                  u64 *ts, u64 *delta)
@@ -1367,26 +1418,33 @@ rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
                return -EAGAIN;
 
        /* Only a commited time event can update the write stamp */
-       if (rb_is_commit(cpu_buffer, event)) {
+       if (rb_event_is_commit(cpu_buffer, event)) {
                /*
-                * If this is the first on the page, then we need to
-                * update the page itself, and just put in a zero.
+                * If this is the first on the page, then it was
+                * updated with the page itself. Try to discard it
+                * and if we can't just make it zero.
                 */
                if (rb_event_index(event)) {
                        event->time_delta = *delta & TS_MASK;
                        event->array[0] = *delta >> TS_SHIFT;
                } else {
-                       cpu_buffer->commit_page->page->time_stamp = *ts;
-                       event->time_delta = 0;
-                       event->array[0] = 0;
+                       /* try to discard, since we do not need this */
+                       if (!rb_try_to_discard(cpu_buffer, event)) {
+                               /* nope, just zero it */
+                               event->time_delta = 0;
+                               event->array[0] = 0;
+                       }
                }
                cpu_buffer->write_stamp = *ts;
                /* let the caller know this was the commit */
                ret = 1;
        } else {
-               /* Darn, this is just wasted space */
-               event->time_delta = 0;
-               event->array[0] = 0;
+               /* Try to discard the event */
+               if (!rb_try_to_discard(cpu_buffer, event)) {
+                       /* Darn, this is just wasted space */
+                       event->time_delta = 0;
+                       event->array[0] = 0;
+               }
                ret = 0;
        }
 
@@ -1395,6 +1453,44 @@ rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
        return ret;
 }
 
+static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
+{
+       local_inc(&cpu_buffer->committing);
+       local_inc(&cpu_buffer->commits);
+}
+
+static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
+{
+       unsigned long commits;
+
+       if (RB_WARN_ON(cpu_buffer,
+                      !local_read(&cpu_buffer->committing)))
+               return;
+
+ again:
+       commits = local_read(&cpu_buffer->commits);
+       /* synchronize with interrupts */
+       barrier();
+       if (local_read(&cpu_buffer->committing) == 1)
+               rb_set_commit_to_write(cpu_buffer);
+
+       local_dec(&cpu_buffer->committing);
+
+       /* synchronize with interrupts */
+       barrier();
+
+       /*
+        * Need to account for interrupts coming in between the
+        * updating of the commit page and the clearing of the
+        * committing counter.
+        */
+       if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
+           !local_read(&cpu_buffer->committing)) {
+               local_inc(&cpu_buffer->committing);
+               goto again;
+       }
+}
+
 static struct ring_buffer_event *
 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
                      unsigned long length)
@@ -1404,6 +1500,8 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
        int commit = 0;
        int nr_loops = 0;
 
+       rb_start_commit(cpu_buffer);
+
        length = rb_calculate_event_length(length);
  again:
        /*
@@ -1416,7 +1514,7 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
         * Bail!
         */
        if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
-               return NULL;
+               goto out_fail;
 
        ts = rb_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu);
 
@@ -1447,7 +1545,7 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
 
                        commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);
                        if (commit == -EBUSY)
-                               return NULL;
+                               goto out_fail;
 
                        if (commit == -EAGAIN)
                                goto again;
@@ -1461,28 +1559,19 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
        if (unlikely(PTR_ERR(event) == -EAGAIN))
                goto again;
 
-       if (!event) {
-               if (unlikely(commit))
-                       /*
-                        * Ouch! We needed a timestamp and it was commited. But
-                        * we didn't get our event reserved.
-                        */
-                       rb_set_commit_to_write(cpu_buffer);
-               return NULL;
-       }
+       if (!event)
+               goto out_fail;
 
-       /*
-        * If the timestamp was commited, make the commit our entry
-        * now so that we will update it when needed.
-        */
-       if (unlikely(commit))
-               rb_set_commit_event(cpu_buffer, event);
-       else if (!rb_is_commit(cpu_buffer, event))
+       if (!rb_event_is_commit(cpu_buffer, event))
                delta = 0;
 
        event->time_delta = delta;
 
        return event;
+
+ out_fail:
+       rb_end_commit(cpu_buffer);
+       return NULL;
 }
 
 #define TRACE_RECURSIVE_DEPTH 16
@@ -1592,13 +1681,14 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
 {
        local_inc(&cpu_buffer->entries);
 
-       /* Only process further if we own the commit */
-       if (!rb_is_commit(cpu_buffer, event))
-               return;
-
-       cpu_buffer->write_stamp += event->time_delta;
+       /*
+        * The event first in the commit queue updates the
+        * time stamp.
+        */
+       if (rb_event_is_commit(cpu_buffer, event))
+               cpu_buffer->write_stamp += event->time_delta;
 
-       rb_set_commit_to_write(cpu_buffer);
+       rb_end_commit(cpu_buffer);
 }
 
 /**
@@ -1682,43 +1772,23 @@ void ring_buffer_discard_commit(struct ring_buffer *buffer,
                                struct ring_buffer_event *event)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
-       unsigned long new_index, old_index;
-       struct buffer_page *bpage;
-       unsigned long index;
-       unsigned long addr;
        int cpu;
 
        /* The event is discarded regardless */
        rb_event_discard(event);
 
+       cpu = smp_processor_id();
+       cpu_buffer = buffer->buffers[cpu];
+
        /*
         * This must only be called if the event has not been
         * committed yet. Thus we can assume that preemption
         * is still disabled.
         */
-       RB_WARN_ON(buffer, preemptible());
-
-       cpu = smp_processor_id();
-       cpu_buffer = buffer->buffers[cpu];
+       RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
 
-       new_index = rb_event_index(event);
-       old_index = new_index + rb_event_length(event);
-       addr = (unsigned long)event;
-       addr &= PAGE_MASK;
-
-       bpage = cpu_buffer->tail_page;
-
-       if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
-               /*
-                * This is on the tail page. It is possible that
-                * a write could come in and move the tail page
-                * and write to the next page. That is fine
-                * because we just shorten what is on this page.
-                */
-               index = local_cmpxchg(&bpage->write, old_index, new_index);
-               if (index == old_index)
-                       goto out;
-       }
+       if (!rb_try_to_discard(cpu_buffer, event))
+               goto out;
 
        /*
         * The commit is still visible by the reader, so we
@@ -1726,13 +1796,7 @@ void ring_buffer_discard_commit(struct ring_buffer *buffer,
         */
        local_inc(&cpu_buffer->entries);
  out:
-       /*
-        * If a write came in and pushed the tail page
-        * we still need to update the commit pointer
-        * if we were the commit.
-        */
-       if (rb_is_commit(cpu_buffer, event))
-               rb_set_commit_to_write(cpu_buffer);
+       rb_end_commit(cpu_buffer);
 
        trace_recursive_unlock();
 
@@ -2253,8 +2317,8 @@ static void rb_advance_iter(struct ring_buffer_iter *iter)
         * Check if we are at the end of the buffer.
         */
        if (iter->head >= rb_page_size(iter->head_page)) {
-               if (RB_WARN_ON(buffer,
-                              iter->head_page == cpu_buffer->commit_page))
+               /* discarded commits can make the page empty */
+               if (iter->head_page == cpu_buffer->commit_page)
                        return;
                rb_inc_iter(iter);
                return;
@@ -2297,12 +2361,10 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
        /*
         * We repeat when a timestamp is encountered. It is possible
         * to get multiple timestamps from an interrupt entering just
-        * as one timestamp is about to be written. The max times
-        * that this can happen is the number of nested interrupts we
-        * can have.  Nesting 10 deep of interrupts is clearly
-        * an anomaly.
+        * as one timestamp is about to be written, or from discarded
+        * commits. The most that we can have is the number on a single page.
         */
-       if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
+       if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE))
                return NULL;
 
        reader = rb_get_reader_page(cpu_buffer);
@@ -2368,14 +2430,14 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
 
  again:
        /*
-        * We repeat when a timestamp is encountered. It is possible
-        * to get multiple timestamps from an interrupt entering just
-        * as one timestamp is about to be written. The max times
-        * that this can happen is the number of nested interrupts we
-        * can have. Nesting 10 deep of interrupts is clearly
-        * an anomaly.
+        * We repeat when a timestamp is encountered.
+        * We can get multiple timestamps by nested interrupts or also
+        * if filtering is on (discarding commits). Since discarding
+        * commits can be frequent we can get a lot of timestamps.
+        * But we limit them by not adding timestamps if they begin
+        * at the start of a page.
         */
-       if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
+       if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE))
                return NULL;
 
        if (rb_per_cpu_empty(cpu_buffer))
@@ -2652,6 +2714,8 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
        cpu_buffer->overrun = 0;
        cpu_buffer->read = 0;
        local_set(&cpu_buffer->entries, 0);
+       local_set(&cpu_buffer->committing, 0);
+       local_set(&cpu_buffer->commits, 0);
 
        cpu_buffer->write_stamp = 0;
        cpu_buffer->read_stamp = 0;
@@ -3080,7 +3144,7 @@ static int rb_cpu_notify(struct notifier_block *self,
        switch (action) {
        case CPU_UP_PREPARE:
        case CPU_UP_PREPARE_FROZEN:
-               if (cpu_isset(cpu, *buffer->cpumask))
+               if (cpumask_test_cpu(cpu, buffer->cpumask))
                        return NOTIFY_OK;
 
                buffer->buffers[cpu] =
@@ -3091,7 +3155,7 @@ static int rb_cpu_notify(struct notifier_block *self,
                        return NOTIFY_OK;
                }
                smp_wmb();
-               cpu_set(cpu, *buffer->cpumask);
+               cpumask_set_cpu(cpu, buffer->cpumask);
                break;
        case CPU_DOWN_PREPARE:
        case CPU_DOWN_PREPARE_FROZEN:
This page took 0.035792 seconds and 5 git commands to generate.