/*
 * ltt/ltt-relay-lockless.c
 *
 * (C) Copyright 2005-2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
 *
 * LTTng lockless buffer space management (reader/writer).
 *
 * Author:
 *	Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
 *
 * Inspired from LTT :
 *	Karim Yaghmour (karim@opersys.com)
 *	Tom Zanussi (zanussi@us.ibm.com)
 *	Bob Wisniewski (bob@watson.ibm.com)
 * And from K42 :
 *	Bob Wisniewski (bob@watson.ibm.com)
 *
 * Changelog:
 *	19/10/05, Complete lockless mechanism.
 *	27/05/05, Modular redesign and rewrite.
 *
 * Userspace reader semantic :
 * while (poll fd != POLLHUP) {
 *   - ioctl RELAY_GET_SUBBUF_SIZE
 *   - splice 1 subbuffer worth of data to a pipe
 *   - splice the data from pipe to disk/network
 *   - ioctl PUT_SUBBUF, check error value
 *     if err val < 0, previous subbuffer was corrupted.
 * }
 *
 * Dual LGPL v2.1/GPL v2 license.
 */
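/*
 * A minimal userspace sketch of the reader loop described above.  This is
 * illustrative only: the exact ioctl names (RELAY_GET_SUBBUF,
 * RELAY_GET_SUBBUF_SIZE, RELAY_PUT_SUBBUF) and the output fd handling are
 * assumptions, not the ABI defined by this file.
 *
 *	#define _GNU_SOURCE
 *	#include <poll.h>
 *	#include <stdio.h>
 *	#include <unistd.h>
 *	#include <sys/ioctl.h>
 *
 *	static int read_chanbuf(int fd, int outfd)
 *	{
 *		struct pollfd pfd = { .fd = fd, .events = POLLIN };
 *		unsigned long consumed, sb_size;
 *		int pipefd[2];
 *
 *		if (pipe(pipefd) < 0)
 *			return -1;
 *		while (poll(&pfd, 1, -1) >= 0 && !(pfd.revents & POLLHUP)) {
 *			if (ioctl(fd, RELAY_GET_SUBBUF, &consumed) < 0)
 *				continue;			// no full subbuffer yet
 *			ioctl(fd, RELAY_GET_SUBBUF_SIZE, &sb_size); // padded size to move
 *			splice(fd, NULL, pipefd[1], NULL, sb_size, SPLICE_F_MOVE);
 *			splice(pipefd[0], NULL, outfd, NULL, sb_size, SPLICE_F_MOVE);
 *			if (ioctl(fd, RELAY_PUT_SUBBUF, &consumed) < 0)
 *				fprintf(stderr, "previous subbuffer was corrupted\n");
 *		}
 *		return 0;
 *	}
 */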
#include <linux/time.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/rcupdate.h>
#include <linux/timer.h>
#include <linux/sched.h>
#include <linux/bitops.h>
#include <linux/smp_lock.h>
#include <linux/stat.h>
#include <linux/cpu.h>
#include <linux/idle.h>
#include <linux/delay.h>
#include <linux/notifier.h>
#include <asm/atomic.h>
#include <asm/local.h>

#include "ltt-tracer.h"
#include "ltt-relay.h"
#include "ltt-relay-lockless.h"
#ifdef LTT_RELAY_DEBUG
#define printk_dbg(fmt, args...) printk(fmt, args)
#else
#define printk_dbg(fmt, args...)
#endif
struct ltt_reserve_switch_offsets {
	long begin, end, old;
	long begin_switch, end_switch_current, end_switch_old;
	size_t before_hdr_pad, size;
};
void ltt_force_switch(struct ltt_chanbuf *buf, enum force_switch_mode mode);

void ltt_relay_print_buffer_errors(struct ltt_chan *chan, unsigned int cpu);

static const struct file_operations ltt_file_operations;
void ltt_buffer_begin(struct ltt_chanbuf *buf, u64 tsc, unsigned int subbuf_idx)
{
	struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
	struct ltt_subbuffer_header *header =
		(struct ltt_subbuffer_header *)
			ltt_relay_offset_address(&buf->a,
						 subbuf_idx * chan->a.sb_size);

	header->cycle_count_begin = tsc;
	header->data_size = 0xFFFFFFFF;	/* for debugging */
	ltt_write_trace_header(chan->a.trace, header);
}
/*
 * offset is assumed to never be 0 here : never deliver a completely empty
 * subbuffer. The lost size is between 0 and subbuf_size-1.
 */
void ltt_buffer_end(struct ltt_chanbuf *buf, u64 tsc, unsigned int offset,
		    unsigned int subbuf_idx)
{
	struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
	struct ltt_subbuffer_header *header =
		(struct ltt_subbuffer_header *)
			ltt_relay_offset_address(&buf->a,
						 subbuf_idx * chan->a.sb_size);
	u32 data_size = SUBBUF_OFFSET(offset - 1, chan) + 1;

	header->data_size = data_size;
	header->sb_size = PAGE_ALIGN(data_size);
	header->cycle_count_end = tsc;
	header->events_lost = local_read(&buf->events_lost);
	header->subbuf_corrupt = local_read(&buf->corrupted_subbuffers);
}
/*
 * Must be called under trace lock or cpu hotplug protection.
 */
void ltt_chanbuf_free(struct ltt_chanbuf *buf)
{
	struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);

	ltt_relay_print_buffer_errors(chan, buf->a.cpu);
#ifdef CONFIG_LTT_VMCORE
	kfree(buf->commit_seq);
#endif
	kfree(buf->commit_count);

	ltt_chanbuf_alloc_free(&buf->a);
}
/*
 * Must be called under trace lock or cpu hotplug protection.
 */
int ltt_chanbuf_create(struct ltt_chanbuf *buf, struct ltt_chan_alloc *chana,
		       int cpu)
{
	struct ltt_chan *chan = container_of(chana, struct ltt_chan, a);
	struct ltt_trace *trace = chana->trace;
	unsigned int j, n_sb;
	int ret;

	/* Test for cpu hotplug */
	if (buf->a.allocated)
		return 0;

	ret = ltt_chanbuf_alloc_create(&buf->a, &chan->a, cpu);
	if (ret)
		return ret;

	buf->commit_count =
		kzalloc_node(ALIGN(sizeof(*buf->commit_count) * chan->a.n_sb,
				   1 << INTERNODE_CACHE_SHIFT),
			     GFP_KERNEL, cpu_to_node(cpu));
	if (!buf->commit_count) {
		ret = -ENOMEM;
		goto free_chanbuf;
	}

#ifdef CONFIG_LTT_VMCORE
	buf->commit_seq =
		kzalloc_node(ALIGN(sizeof(*buf->commit_seq) * chan->a.n_sb,
				   1 << INTERNODE_CACHE_SHIFT),
			     GFP_KERNEL, cpu_to_node(cpu));
	if (!buf->commit_seq) {
		kfree(buf->commit_count);
		ret = -ENOMEM;
		goto free_chanbuf;
	}
#endif

	local_set(&buf->offset, ltt_sb_header_size());
	atomic_long_set(&buf->consumed, 0);
	atomic_long_set(&buf->active_readers, 0);
	n_sb = chan->a.n_sb;
	for (j = 0; j < n_sb; j++) {
		local_set(&buf->commit_count[j].cc, 0);
		local_set(&buf->commit_count[j].cc_sb, 0);
		local_set(&buf->commit_count[j].events, 0);
	}
	init_waitqueue_head(&buf->write_wait);
	init_waitqueue_head(&buf->read_wait);
	spin_lock_init(&buf->full_lock);

	RCHAN_SB_CLEAR_NOREF(buf->a.buf_wsb[0].pages);
	ltt_buffer_begin(buf, trace->start_tsc, 0);
	/* atomic_add made on local variable on data that belongs to
	 * various CPUs : ok because tracing not started (for this cpu). */
	local_add(ltt_sb_header_size(), &buf->commit_count[0].cc);

	local_set(&buf->events_lost, 0);
	local_set(&buf->corrupted_subbuffers, 0);

	ret = ltt_chanbuf_create_file(chan->a.filename, chan->a.parent,
				      S_IRUSR, buf);
	if (ret)
		goto free_init;

	/*
	 * Ensure the buffer is ready before setting it to allocated.
	 * Used for cpu hotplug vs async wakeup.
	 */
	smp_wmb();
	buf->a.allocated = 1;

	return 0;

free_init:
#ifdef CONFIG_LTT_VMCORE
	kfree(buf->commit_seq);
#endif
	kfree(buf->commit_count);
free_chanbuf:
	ltt_chanbuf_alloc_free(&buf->a);
	return ret;
}
void ltt_chan_remove_files(struct ltt_chan *chan)
{
	ltt_ascii_remove(chan);
	ltt_chan_alloc_remove_files(&chan->a);
}
EXPORT_SYMBOL_GPL(ltt_chan_remove_files);
void ltt_chan_free(struct kref *kref)
{
	struct ltt_chan *chan = container_of(kref, struct ltt_chan, a.kref);

	ltt_chan_alloc_free(&chan->a);
}
EXPORT_SYMBOL_GPL(ltt_chan_free);
/*
 * ltt_chan_create - Create channel.
 */
int ltt_chan_create(const char *base_filename,
		    struct ltt_chan *chan, struct dentry *parent,
		    size_t sb_size, size_t n_sb,
		    int overwrite, struct ltt_trace *trace)
{
	int ret;

	chan->overwrite = overwrite;

	ret = ltt_chan_alloc_init(&chan->a, trace, base_filename, parent,
				  sb_size, n_sb, overwrite, overwrite);
	if (ret)
		return ret;

	chan->commit_count_mask = (~0UL >> chan->a.n_sb_order);

	ret = ltt_ascii_create(chan);
	if (ret)
		goto error_chan_alloc_free;

	return 0;

error_chan_alloc_free:
	ltt_chan_alloc_free(&chan->a);
	return ret;
}
EXPORT_SYMBOL_GPL(ltt_chan_create);
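/*
 * Worked example for commit_count_mask (a sketch of the intent, assuming a
 * 64-bit long and 8 sub-buffers, i.e. n_sb_order = 3):
 *
 *	commit_count_mask = ~0UL >> 3 = 0x1fffffffffffffff
 *
 * Write offsets are free-running, so BUFFER_TRUNC(offset, chan) >> n_sb_order
 * has its top n_sb_order bits dropped by the shift.  Masking the per-subbuffer
 * commit count with commit_count_mask drops the same top bits, which keeps the
 * two quantities comparable in the "is this subbuffer fully committed" checks
 * used by ltt_chanbuf_get_subbuf() and ltt_relay_try_reserve_slow() below.
 */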
int ltt_chanbuf_open_read(struct ltt_chanbuf *buf)
{
	kref_get(&buf->a.chan->kref);
	if (!atomic_long_add_unless(&buf->active_readers, 1, 1)) {
		kref_put(&buf->a.chan->kref, ltt_chan_free);
		return -EBUSY;
	}

	return 0;
}
EXPORT_SYMBOL_GPL(ltt_chanbuf_open_read);
void ltt_chanbuf_release_read(struct ltt_chanbuf *buf)
{
	//ltt_relay_destroy_buffer(&buf->a.chan->a, buf->a.cpu);
	WARN_ON(atomic_long_read(&buf->active_readers) != 1);
	atomic_long_dec(&buf->active_readers);
	kref_put(&buf->a.chan->kref, ltt_chan_free);
}
EXPORT_SYMBOL_GPL(ltt_chanbuf_release_read);
/*
 * This must be done after the trace is removed from the RCU list so that there
 * are no stalled writers.
 */
static void ltt_relay_wake_writers(struct ltt_chanbuf *buf)
{
	if (waitqueue_active(&buf->write_wait))
		wake_up_interruptible(&buf->write_wait);
}
/*
 * This function should not be called from NMI interrupt context
 */
static void ltt_buf_unfull(struct ltt_chanbuf *buf)
{
	ltt_relay_wake_writers(buf);
}
/*
 * Promote compiler barrier to a smp_mb().
 * For the specific LTTng case, this IPI call should be removed if the
 * architecture does not reorder writes. This should eventually be provided by
 * a separate architecture-specific infrastructure.
 */
static void remote_mb(void *info)
{
	smp_mb();
}
int ltt_chanbuf_get_subbuf(struct ltt_chanbuf *buf, unsigned long *consumed)
{
	struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
	long consumed_old, consumed_idx, commit_count, write_offset;
	int ret;

	consumed_old = atomic_long_read(&buf->consumed);
	consumed_idx = SUBBUF_INDEX(consumed_old, chan);
	commit_count = local_read(&buf->commit_count[consumed_idx].cc_sb);
	/*
	 * Make sure we read the commit count before reading the buffer
	 * data and the write offset. Correct consumed offset ordering
	 * wrt commit count is ensured by the use of cmpxchg to update
	 * the consumed offset.
	 * smp_call_function_single can fail if the remote CPU is offline,
	 * this is OK because then there is no wmb to execute there.
	 * If our thread is executing on the same CPU as the one the buffer
	 * belongs to, we don't have to synchronize it at all. If we are
	 * migrated, the scheduler will take care of the memory barriers.
	 * Normally, smp_call_function_single() should ensure program order when
	 * executing the remote function, which implies that it surrounds the
	 * function execution with :
	 *   smp_mb()
	 *   smp_call_function_single()
	 *     exec. function
	 *   smp_mb()
	 *
	 * However, smp_call_function_single() does not seem to clearly execute
	 * such barriers. It depends on spinlock semantic to provide the barrier
	 * before executing the IPI and, when busy-looping, csd_lock_wait only
	 * executes smp_mb() when it has to wait for the other CPU.
	 *
	 * I don't trust this code. Therefore, let's add the smp_mb() sequence
	 * required ourselves, even if duplicated. It has no performance impact
	 * anyway.
	 *
	 * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs
	 * read and write vs write. They do not ensure core synchronization. We
	 * really have to ensure total order between the 3 barriers running on
	 * the 2 CPUs.
	 */
#ifdef LTT_NO_IPI_BARRIER
	/*
	 * Local rmb to match the remote wmb to read the commit count before the
	 * buffer data and the write offset.
	 */
	smp_rmb();
#else
	if (raw_smp_processor_id() != buf->a.cpu) {
		smp_mb();	/* Total order with IPI handler smp_mb() */
		smp_call_function_single(buf->a.cpu, remote_mb, NULL, 1);
		smp_mb();	/* Total order with IPI handler smp_mb() */
	}
#endif
	write_offset = local_read(&buf->offset);
	/*
	 * Check that the subbuffer we are trying to consume has been
	 * already fully committed.
	 */
	if (((commit_count - chan->a.sb_size)
	     & chan->commit_count_mask)
	    - (BUFFER_TRUNC(consumed_old, chan)
	       >> chan->a.n_sb_order)
	    != 0)
		return -EAGAIN;

	/*
	 * Check that we are not about to read the same subbuffer in
	 * which the writer head is.
	 */
	if ((SUBBUF_TRUNC(write_offset, chan)
	     - SUBBUF_TRUNC(consumed_old, chan))
	    == 0)
		return -EAGAIN;

	ret = update_read_sb_index(&buf->a, &chan->a, consumed_idx);
	if (ret)
		return ret;

	*consumed = consumed_old;
	return 0;
}
EXPORT_SYMBOL_GPL(ltt_chanbuf_get_subbuf);
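/*
 * A minimal sketch of the ordering contract between the writer commit path and
 * ltt_chanbuf_get_subbuf() above.  Illustrative only: the writer fast path
 * lives outside this file, and the exact commit-count field touched (cc vs
 * cc_sb, moved at delivery time) is simplified here.
 *
 *	writer (on buf->a.cpu)			reader (any cpu)
 *	----------------------			----------------
 *	write event payload			cc_sb = local_read(&commit_count[idx].cc_sb);
 *	barrier();  upgraded to smp_mb()	smp_mb(); IPI remote_mb(); smp_mb();
 *	            by the remote_mb() IPI	read write offset and buffer data
 *	local_add(slot_size, &commit_count[idx].cc);
 *
 * The reader therefore never observes a commit count claiming a subbuffer is
 * fully committed while the corresponding payload writes are still pending on
 * the writer CPU.
 */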
int ltt_chanbuf_put_subbuf(struct ltt_chanbuf *buf, unsigned long consumed)
{
	struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
	long consumed_new, consumed_old;

	WARN_ON(atomic_long_read(&buf->active_readers) != 1);

	consumed_old = consumed;
	consumed_new = SUBBUF_ALIGN(consumed_old, chan);
	WARN_ON_ONCE(RCHAN_SB_IS_NOREF(buf->a.buf_rsb.pages));
	RCHAN_SB_SET_NOREF(buf->a.buf_rsb.pages);

	spin_lock(&buf->full_lock);
	if (atomic_long_cmpxchg(&buf->consumed, consumed_old, consumed_new)
	    != consumed_old) {
		/* We have been pushed by the writer. */
		spin_unlock(&buf->full_lock);
		/*
		 * We exchanged the subbuffer pages. No corruption possible
		 * even if the writer did push us. No more -EIO possible.
		 */
		return 0;
	} else {
		/* tell the client that buffer is now unfull */
		long index, data;

		index = SUBBUF_INDEX(consumed_old, chan);
		data = BUFFER_OFFSET(consumed_old, chan);
		ltt_buf_unfull(buf);
		spin_unlock(&buf->full_lock);
	}
	return 0;
}
EXPORT_SYMBOL_GPL(ltt_chanbuf_put_subbuf);
static void switch_buffer(unsigned long data)
{
	struct ltt_chanbuf *buf = (struct ltt_chanbuf *)data;
	struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);

	/*
	 * Only flush buffers periodically if readers are active.
	 */
	if (atomic_long_read(&buf->active_readers))
		ltt_force_switch(buf, FORCE_ACTIVE);

	mod_timer_pinned(&buf->switch_timer,
			 jiffies + chan->switch_timer_interval);
}
static void ltt_chanbuf_start_switch_timer(struct ltt_chanbuf *buf)
{
	struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);

	if (!chan->switch_timer_interval)
		return;

	init_timer_deferrable(&buf->switch_timer);
	buf->switch_timer.function = switch_buffer;
	buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
	buf->switch_timer.data = (unsigned long)buf;
	add_timer_on(&buf->switch_timer, buf->a.cpu);
}
/*
 * called with ltt traces lock held.
 */
void ltt_chan_start_switch_timer(struct ltt_chan *chan)
{
	int cpu;

	if (!chan->switch_timer_interval)
		return;

	for_each_online_cpu(cpu) {
		struct ltt_chanbuf *buf;

		buf = per_cpu_ptr(chan->a.buf, cpu);
		ltt_chanbuf_start_switch_timer(buf);
	}
}
static void ltt_chanbuf_stop_switch_timer(struct ltt_chanbuf *buf)
{
	struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);

	if (!chan->switch_timer_interval)
		return;

	del_timer_sync(&buf->switch_timer);
}
/*
 * called with ltt traces lock held.
 */
void ltt_chan_stop_switch_timer(struct ltt_chan *chan)
{
	int cpu;

	if (!chan->switch_timer_interval)
		return;

	for_each_online_cpu(cpu) {
		struct ltt_chanbuf *buf;

		buf = per_cpu_ptr(chan->a.buf, cpu);
		ltt_chanbuf_stop_switch_timer(buf);
	}
}
static void ltt_chanbuf_idle_switch(struct ltt_chanbuf *buf)
{
	struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);

	if (chan->switch_timer_interval)
		ltt_force_switch(buf, FORCE_ACTIVE);
}
/*
 * ltt_chanbuf_switch is called from a remote CPU to ensure that the buffers of
 * a cpu which went down are flushed. Note that if we execute concurrently
 * with trace allocation, a buffer might appear to be unallocated (because it
 * detects that the target CPU is offline).
 */
static void ltt_chanbuf_switch(struct ltt_chanbuf *buf)
{
	if (buf->a.allocated)
		ltt_force_switch(buf, FORCE_ACTIVE);
}
/*
 * ltt_chanbuf_hotcpu_callback - CPU hotplug callback
 * @nb: notifier block
 * @action: hotplug action to take
 * @hcpu: CPU number
 *
 * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
 */
int ltt_chanbuf_hotcpu_callback(struct notifier_block *nb,
				unsigned long action,
				void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;

	switch (action) {
	case CPU_DOWN_FAILED:
	case CPU_DOWN_FAILED_FROZEN:
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		/*
		 * CPU hotplug lock protects trace lock from this callback.
		 */
		ltt_chan_for_each_channel(ltt_chanbuf_start_switch_timer, cpu);
		return NOTIFY_OK;

	case CPU_DOWN_PREPARE:
	case CPU_DOWN_PREPARE_FROZEN:
		/*
		 * Performs an IPI to delete the timer locally on the target
		 * CPU. CPU hotplug lock protects trace lock from this
		 * callback.
		 */
		ltt_chan_for_each_channel(ltt_chanbuf_stop_switch_timer, cpu);
		return NOTIFY_OK;

	case CPU_DEAD:
	case CPU_DEAD_FROZEN:
		/*
		 * Performing a buffer switch on a remote CPU. Performed by
		 * the CPU responsible for doing the hotunplug after the target
		 * CPU stopped running completely. Ensures that all data
		 * from that remote CPU is flushed. CPU hotplug lock protects
		 * trace lock from this callback.
		 */
		ltt_chan_for_each_channel(ltt_chanbuf_switch, cpu);
		return NOTIFY_OK;

	default:
		return NOTIFY_OK;
	}
}
static int pm_idle_entry_callback(struct notifier_block *self,
				  unsigned long val, void *data)
{
	if (val == IDLE_START) {
		rcu_read_lock_sched_notrace();
		ltt_chan_for_each_channel(ltt_chanbuf_idle_switch,
					  smp_processor_id());
		rcu_read_unlock_sched_notrace();
	}
	return 0;
}

struct notifier_block pm_idle_entry_notifier = {
	.notifier_call = pm_idle_entry_callback,
	.priority = ~0U, /* smallest prio, run after tracing events */
};
void ltt_relay_print_written(struct ltt_chan *chan, long cons_off,
			     unsigned int cpu)
{
	struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu);
	long cons_idx, events_count;

	cons_idx = SUBBUF_INDEX(cons_off, chan);
	events_count = local_read(&buf->commit_count[cons_idx].events);

	if (events_count)
		printk(KERN_INFO
		       "LTT: %lu events written in channel %s "
		       "(cpu %u, index %lu)\n",
		       events_count, chan->a.filename, cpu, cons_idx);
}
void ltt_relay_print_subbuffer_errors(struct ltt_chanbuf *buf,
				      struct ltt_chan *chan, long cons_off,
				      unsigned int cpu)
{
	long cons_idx, commit_count, commit_count_sb, write_offset;

	cons_idx = SUBBUF_INDEX(cons_off, chan);
	commit_count = local_read(&buf->commit_count[cons_idx].cc);
	commit_count_sb = local_read(&buf->commit_count[cons_idx].cc_sb);
	/*
	 * No need to order commit_count and write_offset reads because we
	 * execute after trace is stopped when there are no readers left.
	 */
	write_offset = local_read(&buf->offset);
	printk(KERN_WARNING
	       "LTT : unread channel %s offset is %ld "
	       "and cons_off : %ld (cpu %u)\n",
	       chan->a.filename, write_offset, cons_off, cpu);
	/* Check each sub-buffer for non filled commit count */
	if (((commit_count - chan->a.sb_size) & chan->commit_count_mask)
	    - (BUFFER_TRUNC(cons_off, chan) >> chan->a.n_sb_order)
	    != 0)
		printk(KERN_ALERT
		       "LTT : %s : subbuffer %lu has non filled "
		       "commit count [cc, cc_sb] [%lu,%lu].\n",
		       chan->a.filename, cons_idx, commit_count,
		       commit_count_sb);
	printk(KERN_ALERT "LTT : %s : commit count : %lu, subbuf size %lu\n",
	       chan->a.filename, commit_count, chan->a.sb_size);
}
void ltt_relay_print_errors(struct ltt_chanbuf *buf, struct ltt_chan *chan,
			    struct ltt_trace *trace, int cpu)
{
	long cons_off;

	/*
	 * Can be called in the error path of allocation when
	 * trans_channel_data is not yet set.
	 */
	if (!trace)
		return;

	for (cons_off = 0; cons_off < chan->a.buf_size;
	     cons_off = SUBBUF_ALIGN(cons_off, chan))
		ltt_relay_print_written(chan, cons_off, cpu);
	for (cons_off = atomic_long_read(&buf->consumed);
	     (SUBBUF_TRUNC(local_read(&buf->offset), chan)
	      - cons_off) > 0;
	     cons_off = SUBBUF_ALIGN(cons_off, chan))
		ltt_relay_print_subbuffer_errors(buf, chan, cons_off, cpu);
}
void ltt_relay_print_buffer_errors(struct ltt_chan *chan, unsigned int cpu)
{
	struct ltt_trace *trace = chan->a.trace;
	struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu);

	if (local_read(&buf->events_lost))
		printk(KERN_ALERT
		       "LTT : %s : %ld events lost "
		       "in %s channel (cpu %u).\n",
		       chan->a.filename, local_read(&buf->events_lost),
		       chan->a.filename, cpu);
	if (local_read(&buf->corrupted_subbuffers))
		printk(KERN_ALERT
		       "LTT : %s : %ld corrupted subbuffers "
		       "in %s channel (cpu %u).\n",
		       chan->a.filename,
		       local_read(&buf->corrupted_subbuffers),
		       chan->a.filename, cpu);

	ltt_relay_print_errors(buf, chan, trace, cpu);
}
static void ltt_relay_remove_dirs(struct ltt_trace *trace)
{
	ltt_ascii_remove_dir(trace);
	debugfs_remove(trace->dentry.trace_root);
}
static int ltt_relay_create_dirs(struct ltt_trace *new_trace)
{
	struct dentry *ltt_root_dentry;
	int ret;

	ltt_root_dentry = get_ltt_root();
	if (!ltt_root_dentry)
		return -ENOENT;

	new_trace->dentry.trace_root = debugfs_create_dir(new_trace->trace_name,
							  ltt_root_dentry);
	if (new_trace->dentry.trace_root == NULL) {
		printk(KERN_ERR "LTT : Trace directory name %s already taken\n",
		       new_trace->trace_name);
		return -EEXIST;
	}
	ret = ltt_ascii_create_dir(new_trace);
	if (ret)
		printk(KERN_WARNING "LTT : Unable to create ascii output file "
		       "for trace %s\n", new_trace->trace_name);

	return 0;
}
/*
 * LTTng channel flush function.
 *
 * Must be called when no tracing is active in the channel, because of
 * accesses across CPUs.
 */
static notrace void ltt_relay_buffer_flush(struct ltt_chanbuf *buf)
{
	ltt_force_switch(buf, FORCE_FLUSH);
}
static void ltt_relay_async_wakeup_chan(struct ltt_chan *chan)
{
	unsigned int i;

	for_each_possible_cpu(i) {
		struct ltt_chanbuf *buf;

		buf = per_cpu_ptr(chan->a.buf, i);
		if (!buf->a.allocated)
			continue;
		/*
		 * Ensure the buffer has been allocated before reading its
		 * content. Sync cpu hotplug vs async wakeup.
		 */
		smp_rmb();
		if (ltt_poll_deliver(buf, chan))
			wake_up_interruptible(&buf->read_wait);
	}
}
static void ltt_relay_finish_buffer(struct ltt_chan *chan, unsigned int cpu)
{
	struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu);

	if (buf->a.allocated) {
		ltt_relay_buffer_flush(buf);
		ltt_relay_wake_writers(buf);
	}
}
static void ltt_relay_finish_channel(struct ltt_chan *chan)
{
	unsigned int i;

	for_each_possible_cpu(i)
		ltt_relay_finish_buffer(chan, i);
}
/*
 * This is called with preemption disabled when user space has requested
 * blocking mode. If one of the active traces has free space below a
 * specific threshold value, we reenable preemption and block.
 */
int ltt_relay_user_blocking(struct ltt_trace *trace, unsigned int chan_index,
			    size_t data_size, struct user_dbg_data *dbg)
{
	struct ltt_chanbuf *buf;
	struct ltt_chan *chan;
	int cpu;
	DECLARE_WAITQUEUE(wait, current);

	chan = &trace->channels[chan_index];
	cpu = smp_processor_id();
	buf = per_cpu_ptr(chan->a.buf, cpu);

	/*
	 * Check if data is too big for the channel : do not
	 * block for it.
	 */
	if (LTT_RESERVE_CRITICAL + data_size > chan->a.sb_size)
		return 0;

	/*
	 * If free space too low, we block. We restart from the
	 * beginning after we resume (cpu id may have changed
	 * while preemption is active).
	 */
	spin_lock(&buf->full_lock);
	if (!chan->overwrite) {
		dbg->write = local_read(&buf->offset);
		dbg->read = atomic_long_read(&buf->consumed);
		dbg->avail_size = dbg->write + LTT_RESERVE_CRITICAL + data_size
				  - SUBBUF_TRUNC(dbg->read, chan);
		if (dbg->avail_size > chan->a.buf_size) {
			__set_current_state(TASK_INTERRUPTIBLE);
			add_wait_queue(&buf->write_wait, &wait);
			spin_unlock(&buf->full_lock);
			preempt_enable();
			schedule();
			__set_current_state(TASK_RUNNING);
			remove_wait_queue(&buf->write_wait, &wait);
			if (signal_pending(current))
				return -ERESTARTSYS;
			preempt_disable();
			return 1;
		}
	}
	spin_unlock(&buf->full_lock);
	return 0;
}
void ltt_relay_print_user_errors(struct ltt_trace *trace,
				 unsigned int chan_index, size_t data_size,
				 struct user_dbg_data *dbg, int cpu)
{
	struct ltt_chanbuf *buf;
	struct ltt_chan *chan;

	chan = &trace->channels[chan_index];
	buf = per_cpu_ptr(chan->a.buf, cpu);

	printk(KERN_ERR "Error in LTT usertrace : "
	       "buffer full : event lost in blocking "
	       "mode. Increase LTT_RESERVE_CRITICAL.\n");
	printk(KERN_ERR "LTT nesting level is %u.\n",
	       per_cpu(ltt_nesting, cpu));
	printk(KERN_ERR "LTT available size %lu.\n",
	       dbg->avail_size);
	printk(KERN_ERR "available write : %lu, read : %lu\n",
	       dbg->write, dbg->read);

	dbg->write = local_read(&buf->offset);
	dbg->read = atomic_long_read(&buf->consumed);

	printk(KERN_ERR "LTT current size %lu.\n",
	       dbg->write + LTT_RESERVE_CRITICAL + data_size
	       - SUBBUF_TRUNC(dbg->read, chan));
	printk(KERN_ERR "current write : %lu, read : %lu\n",
	       dbg->write, dbg->read);
}
/*
 * ltt_reserve_switch_old_subbuf: switch old subbuffer
 *
 * Concurrency safe because we are the last and only thread to alter this
 * sub-buffer. As long as it is not delivered and read, no other thread can
 * alter the offset, alter the reserve_count or call the
 * client_buffer_end_callback on this sub-buffer.
 *
 * The only remaining threads could be the ones with pending commits. They will
 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
 * We detect corrupted subbuffers with commit and reserve counts. We keep a
 * corrupted sub-buffers count and push the readers across these sub-buffers.
 *
 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
 * switches in, finding out it's corrupted. The result will be that the old
 * (uncommitted) subbuffer will be declared corrupted, and that the new subbuffer
 * will be declared corrupted too because of the commit count adjustment.
 *
 * Note : offset_old should never be 0 here.
 */
void ltt_reserve_switch_old_subbuf(struct ltt_chanbuf *buf,
				   struct ltt_chan *chan,
				   struct ltt_reserve_switch_offsets *offsets,
				   u64 *tsc)
{
	long oldidx = SUBBUF_INDEX(offsets->old - 1, chan);
	long commit_count, padding_size;

	padding_size = chan->a.sb_size
		       - (SUBBUF_OFFSET(offsets->old - 1, chan) + 1);
	ltt_buffer_end(buf, *tsc, offsets->old, oldidx);

	/*
	 * Must write slot data before incrementing commit count.
	 * This compiler barrier is upgraded into a smp_wmb() by the IPI
	 * sent by get_subbuf() when it does its smp_rmb().
	 */
	barrier();
	local_add(padding_size, &buf->commit_count[oldidx].cc);
	commit_count = local_read(&buf->commit_count[oldidx].cc);
	ltt_check_deliver(buf, chan, offsets->old - 1, commit_count, oldidx);
	ltt_write_commit_counter(buf, chan, oldidx, offsets->old, commit_count,
				 padding_size);
}
/*
 * ltt_reserve_switch_new_subbuf: Populate new subbuffer.
 *
 * This code can be executed unordered : writers may already have written to the
 * sub-buffer before this code gets executed, caution. The commit makes sure
 * that this code is executed before the deliver of this sub-buffer.
 */
void ltt_reserve_switch_new_subbuf(struct ltt_chanbuf *buf,
				   struct ltt_chan *chan,
				   struct ltt_reserve_switch_offsets *offsets,
				   u64 *tsc)
{
	long beginidx = SUBBUF_INDEX(offsets->begin, chan);
	long commit_count;

	ltt_buffer_begin(buf, *tsc, beginidx);

	/*
	 * Must write slot data before incrementing commit count.
	 * This compiler barrier is upgraded into a smp_wmb() by the IPI
	 * sent by get_subbuf() when it does its smp_rmb().
	 */
	barrier();
	local_add(ltt_sb_header_size(), &buf->commit_count[beginidx].cc);
	commit_count = local_read(&buf->commit_count[beginidx].cc);
	/* Check if the written buffer has to be delivered */
	ltt_check_deliver(buf, chan, offsets->begin, commit_count, beginidx);
	ltt_write_commit_counter(buf, chan, beginidx, offsets->begin,
				 commit_count, ltt_sb_header_size());
}
/*
 * ltt_reserve_end_switch_current: finish switching current subbuffer
 *
 * Concurrency safe because we are the last and only thread to alter this
 * sub-buffer. As long as it is not delivered and read, no other thread can
 * alter the offset, alter the reserve_count or call the
 * client_buffer_end_callback on this sub-buffer.
 *
 * The only remaining threads could be the ones with pending commits. They will
 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
 * We detect corrupted subbuffers with commit and reserve counts. We keep a
 * corrupted sub-buffers count and push the readers across these sub-buffers.
 *
 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
 * switches in, finding out it's corrupted. The result will be that the old
 * (uncommitted) subbuffer will be declared corrupted, and that the new subbuffer
 * will be declared corrupted too because of the commit count adjustment.
 */
void ltt_reserve_end_switch_current(struct ltt_chanbuf *buf,
				    struct ltt_chan *chan,
				    struct ltt_reserve_switch_offsets *offsets,
				    u64 *tsc)
{
	long endidx = SUBBUF_INDEX(offsets->end - 1, chan);
	long commit_count, padding_size;

	padding_size = chan->a.sb_size
		       - (SUBBUF_OFFSET(offsets->end - 1, chan) + 1);

	ltt_buffer_end(buf, *tsc, offsets->end, endidx);

	/*
	 * Must write slot data before incrementing commit count.
	 * This compiler barrier is upgraded into a smp_wmb() by the IPI
	 * sent by get_subbuf() when it does its smp_rmb().
	 */
	barrier();
	local_add(padding_size, &buf->commit_count[endidx].cc);
	commit_count = local_read(&buf->commit_count[endidx].cc);
	ltt_check_deliver(buf, chan, offsets->end - 1, commit_count, endidx);
	ltt_write_commit_counter(buf, chan, endidx, offsets->end, commit_count,
				 padding_size);
}
/*
 * Returns :
 * 0 if ok
 * !0 if execution must be aborted.
 */
int ltt_relay_try_switch_slow(enum force_switch_mode mode,
			      struct ltt_chanbuf *buf, struct ltt_chan *chan,
			      struct ltt_reserve_switch_offsets *offsets,
			      u64 *tsc)
{
	long sb_index;
	long reserve_commit_diff;
	long off;

	offsets->begin = local_read(&buf->offset);
	offsets->old = offsets->begin;
	offsets->begin_switch = 0;
	offsets->end_switch_old = 0;

	*tsc = trace_clock_read64();

	off = SUBBUF_OFFSET(offsets->begin, chan);
	if ((mode != FORCE_ACTIVE && off > 0) || off > ltt_sb_header_size()) {
		offsets->begin = SUBBUF_ALIGN(offsets->begin, chan);
		offsets->end_switch_old = 1;
	} else {
		/* we do not have to switch : buffer is empty */
		return -1;
	}
	if (mode == FORCE_ACTIVE)
		offsets->begin += ltt_sb_header_size();
	/*
	 * Always begin_switch in FORCE_ACTIVE mode.
	 * Test new buffer integrity
	 */
	sb_index = SUBBUF_INDEX(offsets->begin, chan);
	reserve_commit_diff =
		(BUFFER_TRUNC(offsets->begin, chan)
		 >> chan->a.n_sb_order)
		- (local_read(&buf->commit_count[sb_index].cc_sb)
		   & chan->commit_count_mask);
	if (reserve_commit_diff == 0) {
		/* Next buffer not corrupted. */
		if (mode == FORCE_ACTIVE
		    && !chan->overwrite
		    && offsets->begin - atomic_long_read(&buf->consumed)
		       >= chan->a.buf_size) {
			/*
			 * We do not overwrite non consumed buffers and we are
			 * full : ignore switch while tracing is active.
			 */
			return -1;
		}
	} else {
		/*
		 * Next subbuffer corrupted. Force pushing reader even in normal
		 * mode.
		 */
	}
	offsets->end = offsets->begin;
	return 0;
}
/*
 * Force a sub-buffer switch for a per-cpu buffer. This operation is
 * completely reentrant : can be called while tracing is active with
 * absolutely no lock held.
 *
 * Note, however, that as a local_cmpxchg is used for some atomic
 * operations, this function must be called from the CPU which owns the buffer
 * for an ACTIVE flush.
 */
void ltt_force_switch_lockless_slow(struct ltt_chanbuf *buf,
				    enum force_switch_mode mode)
{
	struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
	struct ltt_reserve_switch_offsets offsets;
	u64 tsc;

	/*
	 * Perform retryable operations.
	 */
	do {
		if (ltt_relay_try_switch_slow(mode, buf, chan, &offsets, &tsc))
			return;
	} while (local_cmpxchg(&buf->offset, offsets.old, offsets.end)
		 != offsets.old);

	/*
	 * Atomically update last_tsc. This update races against concurrent
	 * atomic updates, but the race will always cause supplementary full TSC
	 * events, never the opposite (missing a full TSC event when it would be
	 * needed).
	 */
	save_last_tsc(buf, tsc);

	/*
	 * Push the reader if necessary
	 */
	if (mode == FORCE_ACTIVE) {
		ltt_reserve_push_reader(buf, chan, offsets.end - 1);
		ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.end - 1,
							   chan));
	}

	/*
	 * Switch old subbuffer if needed.
	 */
	if (offsets.end_switch_old) {
		ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.old - 1,
							   chan));
		ltt_reserve_switch_old_subbuf(buf, chan, &offsets, &tsc);
	}

	/*
	 * Populate new subbuffer.
	 */
	if (mode == FORCE_ACTIVE)
		ltt_reserve_switch_new_subbuf(buf, chan, &offsets, &tsc);
}
EXPORT_SYMBOL_GPL(ltt_force_switch_lockless_slow);
/*
 * Returns :
 * 0 if ok
 * !0 if execution must be aborted.
 */
int ltt_relay_try_reserve_slow(struct ltt_chanbuf *buf, struct ltt_chan *chan,
			       struct ltt_reserve_switch_offsets *offsets,
			       size_t data_size, u64 *tsc, unsigned int *rflags,
			       int largest_align)
{
	long sb_index;
	long reserve_commit_diff;

	offsets->begin = local_read(&buf->offset);
	offsets->old = offsets->begin;
	offsets->begin_switch = 0;
	offsets->end_switch_current = 0;
	offsets->end_switch_old = 0;

	*tsc = trace_clock_read64();
	if (last_tsc_overflow(buf, *tsc))
		*rflags = LTT_RFLAG_ID_SIZE_TSC;

	if (unlikely(SUBBUF_OFFSET(offsets->begin, chan) == 0)) {
		offsets->begin_switch = 1;		/* For offsets->begin */
	} else {
		offsets->size = ltt_get_header_size(chan, offsets->begin,
						    data_size,
						    &offsets->before_hdr_pad,
						    *rflags);
		offsets->size += ltt_align(offsets->begin + offsets->size,
					   largest_align)
				 + data_size;
		if (unlikely((SUBBUF_OFFSET(offsets->begin, chan) +
			      offsets->size) > chan->a.sb_size)) {
			offsets->end_switch_old = 1;	/* For offsets->old */
			offsets->begin_switch = 1;	/* For offsets->begin */
		}
	}
	if (unlikely(offsets->begin_switch)) {
		/*
		 * We are typically not filling the previous buffer completely.
		 */
		if (likely(offsets->end_switch_old))
			offsets->begin = SUBBUF_ALIGN(offsets->begin, chan);
		offsets->begin = offsets->begin + ltt_sb_header_size();
		/* Test new buffer integrity */
		sb_index = SUBBUF_INDEX(offsets->begin, chan);
		reserve_commit_diff =
			(BUFFER_TRUNC(offsets->begin, chan)
			 >> chan->a.n_sb_order)
			- (local_read(&buf->commit_count[sb_index].cc_sb)
			   & chan->commit_count_mask);
		if (likely(reserve_commit_diff == 0)) {
			/* Next buffer not corrupted. */
			if (unlikely(!chan->overwrite &&
				     (SUBBUF_TRUNC(offsets->begin, chan)
				      - SUBBUF_TRUNC(atomic_long_read(&buf->consumed),
						     chan))
				     >= chan->a.buf_size)) {
				/*
				 * We do not overwrite non consumed buffers
				 * and we are full : event is lost.
				 */
				local_inc(&buf->events_lost);
				return -1;
			}
			/*
			 * next buffer not corrupted, we are either in
			 * overwrite mode or the buffer is not full.
			 * It's safe to write in this new subbuffer.
			 */
		} else {
			/*
			 * Next subbuffer corrupted. Drop event in normal and
			 * overwrite mode. Caused by either a writer OOPS or
			 * too many nested writes over a reserve/commit pair.
			 */
			local_inc(&buf->events_lost);
			return -1;
		}
		offsets->size = ltt_get_header_size(chan, offsets->begin,
						    data_size,
						    &offsets->before_hdr_pad,
						    *rflags);
		offsets->size += ltt_align(offsets->begin + offsets->size,
					   largest_align)
				 + data_size;
		if (unlikely((SUBBUF_OFFSET(offsets->begin, chan)
			      + offsets->size) > chan->a.sb_size)) {
			/*
			 * Event too big for subbuffers, report error, don't
			 * complete the sub-buffer switch.
			 */
			local_inc(&buf->events_lost);
			return -1;
		}
		/*
		 * We just made a successful buffer switch and the event
		 * fits in the new subbuffer. Let's write.
		 */
	} else {
		/*
		 * Event fits in the current buffer and we are not on a switch
		 * boundary. It's safe to write.
		 */
	}
	offsets->end = offsets->begin + offsets->size;

	if (unlikely((SUBBUF_OFFSET(offsets->end, chan)) == 0)) {
		/*
		 * The offset_end will fall at the very beginning of the next
		 * subbuffer.
		 */
		offsets->end_switch_current = 1;	/* For offsets->begin */
	}
	return 0;
}
/*
 * ltt_relay_reserve_slot_lockless_slow - Atomic slot reservation in a buffer.
 * @trace: the trace structure to log to.
 * @ltt_channel: channel structure
 * @transport_data: data structure specific to ltt relay
 * @data_size: size of the variable length data to log.
 * @slot_size: pointer to total size of the slot (out)
 * @buf_offset : pointer to reserved buffer offset (out)
 * @tsc: pointer to the tsc at the slot reservation (out)
 *
 * Return : -ENOSPC if not enough space, else returns 0.
 * It will take care of sub-buffer switching.
 */
int ltt_reserve_slot_lockless_slow(struct ltt_chan *chan,
				   struct ltt_trace *trace, size_t data_size,
				   int largest_align, int cpu,
				   struct ltt_chanbuf **ret_buf,
				   size_t *slot_size, long *buf_offset,
				   u64 *tsc, unsigned int *rflags)
{
	struct ltt_chanbuf *buf = *ret_buf = per_cpu_ptr(chan->a.buf, cpu);
	struct ltt_reserve_switch_offsets offsets;

	do {
		if (unlikely(ltt_relay_try_reserve_slow(buf, chan, &offsets,
							data_size, tsc, rflags,
							largest_align)))
			return -ENOSPC;
	} while (unlikely(local_cmpxchg(&buf->offset, offsets.old, offsets.end)
			  != offsets.old));

	/*
	 * Atomically update last_tsc. This update races against concurrent
	 * atomic updates, but the race will always cause supplementary full TSC
	 * events, never the opposite (missing a full TSC event when it would be
	 * needed).
	 */
	save_last_tsc(buf, *tsc);

	/*
	 * Push the reader if necessary
	 */
	ltt_reserve_push_reader(buf, chan, offsets.end - 1);

	/*
	 * Clear noref flag for this subbuffer.
	 */
	ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.end - 1, chan));

	/*
	 * Switch old subbuffer if needed.
	 */
	if (unlikely(offsets.end_switch_old)) {
		ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.old - 1,
							   chan));
		ltt_reserve_switch_old_subbuf(buf, chan, &offsets, tsc);
	}

	/*
	 * Populate new subbuffer.
	 */
	if (unlikely(offsets.begin_switch))
		ltt_reserve_switch_new_subbuf(buf, chan, &offsets, tsc);

	if (unlikely(offsets.end_switch_current))
		ltt_reserve_end_switch_current(buf, chan, &offsets, tsc);

	*slot_size = offsets.size;
	*buf_offset = offsets.begin + offsets.before_hdr_pad;
	return 0;
}
EXPORT_SYMBOL_GPL(ltt_reserve_slot_lockless_slow);
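/*
 * Typical use of the slow-path reservation above, as seen from a probe
 * (a minimal sketch; ltt_commit_slot() and the payload-write step are assumed
 * to be provided by ltt-relay-lockless.h / the tracer core, and their exact
 * signatures are not defined in this file):
 *
 *	struct ltt_chanbuf *buf;
 *	size_t slot_size;
 *	long buf_offset;
 *	u64 tsc;
 *	unsigned int rflags = 0;
 *
 *	if (ltt_reserve_slot_lockless_slow(chan, trace, data_size,
 *					   largest_align, smp_processor_id(),
 *					   &buf, &slot_size, &buf_offset,
 *					   &tsc, &rflags))
 *		return;				// -ENOSPC : event is lost
 *	// write the event header and payload at buf_offset
 *	ltt_commit_slot(buf, chan, buf_offset, data_size, slot_size);
 */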
static struct ltt_transport ltt_relay_transport = {
	.owner = THIS_MODULE,
	.ops = {
		.create_dirs = ltt_relay_create_dirs,
		.remove_dirs = ltt_relay_remove_dirs,
		.create_channel = ltt_chan_create,
		.finish_channel = ltt_relay_finish_channel,
		.remove_channel = ltt_chan_free,
		.remove_channel_files = ltt_chan_remove_files,
		.wakeup_channel = ltt_relay_async_wakeup_chan,
		.user_blocking = ltt_relay_user_blocking,
		.user_errors = ltt_relay_print_user_errors,
		.start_switch_timer = ltt_chan_start_switch_timer,
		.stop_switch_timer = ltt_chan_stop_switch_timer,
	},
};
= {
1340 .notifier_call
= ltt_chanbuf_hotcpu_callback
,
int __init ltt_relay_init(void)
{
	printk(KERN_INFO "LTT : ltt-relay init\n");

	ltt_transport_register(&ltt_relay_transport);
	register_cpu_notifier(&fn_ltt_chanbuf_hotcpu_callback);
	register_idle_notifier(&pm_idle_entry_notifier);

	return 0;
}
void __exit ltt_relay_exit(void)
{
	printk(KERN_INFO "LTT : ltt-relay exit\n");

	unregister_idle_notifier(&pm_idle_entry_notifier);
	unregister_cpu_notifier(&fn_ltt_chanbuf_hotcpu_callback);
	ltt_transport_unregister(&ltt_relay_transport);
}
module_init(ltt_relay_init);
module_exit(ltt_relay_exit);

MODULE_LICENSE("GPL and additional rights");
MODULE_AUTHOR("Mathieu Desnoyers");
MODULE_DESCRIPTION("Linux Trace Toolkit Next Generation Lockless Relay");