Fix: ring buffer: handle concurrent update in nested buffer wrap around check
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Mon, 1 Jul 2013 21:01:56 +0000 (17:01 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Mon, 1 Jul 2013 21:14:53 +0000 (17:14 -0400)
With stress-test loads that trigger sub-buffer switch very frequently
(small 4kB sub-buffers, frequent flush), we currently observe this kind
of warnings once every few minutes:

[65335.896208] ring buffer relay-overwrite-mmap, cpu 5: records were lost. Caused by:
[65335.896208]   [ 0 buffer full, 1 nest buffer wrap-around, 0 event too big ]

It appears that the check for nested buffer wrap-around does not take
into account that a concurrent execution contexts (either nested for
per-cpu buffers, or from another CPU or nested for global buffers) can
update the commit_count value concurrently.

What we really want to do with this check is to ensure that if we enter
a sub-buffer that had an unbalanced reserve/commit count, assuming there
is no hope that this gets rebalanced promptly, we detect this and drop
the current event. However, in the case where the commit counter has
been concurrently updated by another reserve or a switch, we want to
retry the entire reserve operation.

One way to detect this is to sample the reserve offset twice, around the
commit counter read, along with the appropriate memory barriers.
Therefore, we can detect if the mismatch between reserve and commit
counter is actually caused by a concurrent update, which necessarily has
updated the reserve counter.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
lib/ringbuffer/frontend_internal.h
lib/ringbuffer/ring_buffer_frontend.c

index bc284333f64c505e6e3fca646248073a8927f306..dbebdeec86f3362c30d59642ef9ab6910ca94a28 100644 (file)
@@ -331,6 +331,12 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
                 * The subbuffer size is least 2 bytes (minimum size: 1 page).
                 * This guarantees that old_commit_count + 1 != commit_count.
                 */
+
+               /*
+                * Order prior updates to reserve count prior to the
+                * commit_cold cc_sb update.
+                */
+               smp_wmb();
                if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb,
                                         old_commit_count, old_commit_count + 1)
                           == old_commit_count)) {
@@ -373,6 +379,11 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
                        /* End of exclusive subbuffer access */
                        v_set(config, &buf->commit_cold[idx].cc_sb,
                              commit_count);
+                       /*
+                        * Order later updates to reserve count after
+                        * the commit_cold cc_sb update.
+                        */
+                       smp_wmb();
                        lib_ring_buffer_vmcore_check_deliver(config, buf,
                                                         commit_count, idx);
 
index 077be3c8c82c530e7a8c3aef1c6da5f7857b6b00..22cc62fcc02e7de82122801378a6bc6c3d89d7d1 100644 (file)
@@ -1559,9 +1559,10 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                                     struct lib_ring_buffer_ctx *ctx)
 {
        const struct lib_ring_buffer_config *config = &chan->backend.config;
-       unsigned long reserve_commit_diff;
+       unsigned long reserve_commit_diff, offset_cmp;
 
-       offsets->begin = v_read(config, &buf->offset);
+retry:
+       offsets->begin = offset_cmp = v_read(config, &buf->offset);
        offsets->old = offsets->begin;
        offsets->switch_new_start = 0;
        offsets->switch_new_end = 0;
@@ -1593,7 +1594,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                }
        }
        if (unlikely(offsets->switch_new_start)) {
-               unsigned long sb_index;
+               unsigned long sb_index, commit_count;
 
                /*
                 * We are typically not filling the previous buffer completely.
@@ -1604,12 +1605,31 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                                 + config->cb.subbuffer_header_size();
                /* Test new buffer integrity */
                sb_index = subbuf_index(offsets->begin, chan);
+               /*
+                * Read buf->offset before buf->commit_cold[sb_index].cc_sb.
+                * lib_ring_buffer_check_deliver() has the matching
+                * memory barriers required around commit_cold cc_sb
+                * updates to ensure reserve and commit counter updates
+                * are not seen reordered when updated by another CPU.
+                */
+               smp_rmb();
+               commit_count = v_read(config,
+                               &buf->commit_cold[sb_index].cc_sb);
+               /* Read buf->commit_cold[sb_index].cc_sb before buf->offset. */
+               smp_rmb();
+               if (unlikely(offset_cmp != v_read(config, &buf->offset))) {
+                       /*
+                        * The reserve counter have been concurrently updated
+                        * while we read the commit counter. This means the
+                        * commit counter we read might not match buf->offset
+                        * due to concurrent update. We therefore need to retry.
+                        */
+                       goto retry;
+               }
                reserve_commit_diff =
                  (buf_trunc(offsets->begin, chan)
                   >> chan->backend.num_subbuf_order)
-                 - ((unsigned long) v_read(config,
-                                           &buf->commit_cold[sb_index].cc_sb)
-                    & chan->commit_count_mask);
+                 - (commit_count & chan->commit_count_mask);
                if (likely(reserve_commit_diff == 0)) {
                        /* Next subbuffer not being written to. */
                        if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
@@ -1634,9 +1654,10 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                } else {
                        /*
                         * Next subbuffer reserve offset does not match the
-                        * commit offset. Drop record in producer-consumer and
-                        * overwrite mode. Caused by either a writer OOPS or too
-                        * many nested writes over a reserve/commit pair.
+                        * commit offset, and this did not involve update to the
+                        * reserve counter. Drop record in producer-consumer and
+                        * overwrite mode.  Caused by either a writer OOPS or
+                        * too many nested writes over a reserve/commit pair.
                         */
                        v_inc(config, &buf->records_lost_wrap);
                        return -EIO;
This page took 0.029524 seconds and 5 git commands to generate.