/*
 * ltt-ring-buffer-client.h
 *
 * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * LTTng lib ring buffer client template.
 *
 * Dual LGPL v2.1/GPL v2 license.
 */

#include <linux/module.h>
#include <linux/types.h>
#include "wrapper/vmalloc.h"	/* for wrapper_vmalloc_sync_all() */
#include "wrapper/trace-clock.h"
#include "ltt-events.h"
#include "ltt-tracer.h"

/*
 * Keep the natural field alignment for _each field_ within this structure if
 * you ever add/remove a field from this header. Packed attribute is not used
 * because gcc generates poor code on at least powerpc and mips. Don't ever
 * let gcc add padding between the structure elements.
 */
struct packet_header {
        uint32_t magic;                 /*
                                         * Trace magic number.
                                         * Contains endianness information.
                                         */
        uint8_t uuid[16];
        uint32_t stream_id;
        uint64_t timestamp_begin;       /* Cycle count at subbuffer start */
        uint64_t timestamp_end;         /* Cycle count at subbuffer end */
        uint32_t content_size;          /* Size of data in subbuffer */
        uint32_t packet_size;           /* Subbuffer size (includes padding) */
        uint32_t events_lost;           /*
                                         * Events lost in this subbuffer since
                                         * the beginning of the trace.
                                         * (may overflow)
                                         */
#if 0
        uint64_t start_time_sec;        /* NTP-corrected start time */
        uint64_t start_time_usec;
        uint64_t start_freq;            /*
                                         * Frequency at trace start,
                                         * used all along the trace.
                                         */
        uint32_t freq_scale;            /* Frequency scaling (divisor) */
#endif /* 0 */
        uint8_t header_end[0];          /* End of header */
};
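
/*
 * Illustrative layout sketch (not part of the original file): with the
 * natural field alignment requested above and no compiler-inserted padding,
 * the field offsets should work out to a 52-byte header, as checked below.
 * The helper name is hypothetical; a client could call it once (e.g. from
 * its init function) to make sure the checks are compiled in.
 */
static inline void packet_header_layout_check(void)
{
        BUILD_BUG_ON(offsetof(struct packet_header, uuid) != 4);
        BUILD_BUG_ON(offsetof(struct packet_header, stream_id) != 20);
        BUILD_BUG_ON(offsetof(struct packet_header, timestamp_begin) != 24);
        BUILD_BUG_ON(offsetof(struct packet_header, timestamp_end) != 32);
        BUILD_BUG_ON(offsetof(struct packet_header, content_size) != 40);
        BUILD_BUG_ON(offsetof(struct packet_header, packet_size) != 44);
        BUILD_BUG_ON(offsetof(struct packet_header, events_lost) != 48);
        BUILD_BUG_ON(offsetof(struct packet_header, header_end) != 52);
}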

static inline notrace u64 lib_ring_buffer_clock_read(struct channel *chan)
{
        return trace_clock_read64();
}

/*
 * record_header_size - Calculate the header size and padding necessary.
 * @config: ring buffer instance configuration
 * @chan: channel
 * @offset: offset in the write buffer
 * @data_size: size of the payload
 * @pre_header_padding: padding to add before the header (output)
 * @rflags: reservation flags
 * @ctx: reservation context
 *
 * Returns the event header size (including padding).
 *
 * Important note:
 * The event header must be 32-bits. The total offset calculated here:
 *
 * Alignment of header struct on 32 bits (min arch size, header size)
 * + sizeof(header struct) (32-bits)
 * + (opt) u16 (ext. event id)
 * + (opt) u16 (event_size)
 *   (if event_size == LTT_MAX_SMALL_SIZE, has ext. event size)
 * + (opt) u32 (ext. event size)
 * + (opt) u64 full TSC (aligned on min(64-bits, arch size))
 *
 * The payload must itself determine its own alignment from the biggest type it
 * contains.
 */
static __inline__
unsigned char record_header_size(const struct lib_ring_buffer_config *config,
                                 struct channel *chan, size_t offset,
                                 size_t data_size, size_t *pre_header_padding,
                                 unsigned int rflags,
                                 struct lib_ring_buffer_ctx *ctx)
{
        size_t orig_offset = offset;
        size_t padding;

        BUILD_BUG_ON(sizeof(struct event_header) != sizeof(u32));

        padding = lib_ring_buffer_align(offset,
                                        sizeof(struct event_header));
        offset += padding;
        offset += sizeof(struct event_header);

        if (unlikely(rflags)) {
                switch (rflags) {
                case LTT_RFLAG_ID_SIZE_TSC:
                        offset += sizeof(u16) + sizeof(u16);
                        if (data_size >= LTT_MAX_SMALL_SIZE)
                                offset += sizeof(u32);
                        offset += lib_ring_buffer_align(offset, sizeof(u64));
                        offset += sizeof(u64);
                        break;
                case LTT_RFLAG_ID_SIZE:
                        offset += sizeof(u16) + sizeof(u16);
                        if (data_size >= LTT_MAX_SMALL_SIZE)
                                offset += sizeof(u32);
                        break;
                case LTT_RFLAG_ID:
                        offset += sizeof(u16);
                        break;
                }
        }

        *pre_header_padding = padding;
        return offset - orig_offset;
}
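
/*
 * Worked example (illustrative, assuming the reserve offset is already
 * 64-bit aligned): for LTT_RFLAG_ID_SIZE_TSC with data_size >=
 * LTT_MAX_SMALL_SIZE, the header takes 4 (event header) + 2 (ext. event id)
 * + 2 (small size) + 4 (ext. event size) = 12 bytes, then 4 bytes of padding
 * to align the full TSC on 64 bits, plus 8 bytes of TSC, for a total of
 * 24 bytes returned by record_header_size().
 */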

#include "wrapper/ringbuffer/api.h"

extern
void ltt_write_event_header_slow(const struct lib_ring_buffer_config *config,
                                 struct lib_ring_buffer_ctx *ctx,
                                 u16 eID, u32 event_size);

/*
 * ltt_write_event_header
 *
 * Writes the event header to the offset (already aligned on 32-bits).
 *
 * @config: ring buffer instance configuration
 * @ctx: reservation context
 * @eID: event ID
 * @event_size: size of the event, excluding the event header.
 */
static __inline__
void ltt_write_event_header(const struct lib_ring_buffer_config *config,
                            struct lib_ring_buffer_ctx *ctx,
                            u16 eID, u32 event_size)
{
        struct event_header header;

        if (unlikely(ctx->rflags))
                goto slow_path;

        header.id_time = eID << LTT_TSC_BITS;
        header.id_time |= (u32)ctx->tsc & LTT_TSC_MASK;
        lib_ring_buffer_write(config, ctx, &header, sizeof(header));
        return;

slow_path:
        ltt_write_event_header_slow(config, ctx, eID, event_size);
}

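/*
 * Extended-header slow path. Judging from the switch below, the id values
 * 29, 30 and 31 appear to act as reserved escape codes in the compact
 * header: they tell the trace reader that the real event ID (and optionally
 * the event size and full 64-bit TSC) follow as separate fields.
 */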
void ltt_write_event_header_slow(const struct lib_ring_buffer_config *config,
                                 struct lib_ring_buffer_ctx *ctx,
                                 u16 eID, u32 event_size)
{
        struct event_header header;
        u16 small_size;

        switch (ctx->rflags) {
        case LTT_RFLAG_ID_SIZE_TSC:
                header.id_time = 29 << LTT_TSC_BITS;
                break;
        case LTT_RFLAG_ID_SIZE:
                header.id_time = 30 << LTT_TSC_BITS;
                break;
        case LTT_RFLAG_ID:
                header.id_time = 31 << LTT_TSC_BITS;
                break;
        default:
                WARN_ON_ONCE(1);
                header.id_time = 0;
        }

        header.id_time |= (u32)ctx->tsc & LTT_TSC_MASK;
        lib_ring_buffer_write(config, ctx, &header, sizeof(header));

        switch (ctx->rflags) {
        case LTT_RFLAG_ID_SIZE_TSC:
                small_size = (u16)min_t(u32, event_size, LTT_MAX_SMALL_SIZE);
                lib_ring_buffer_write(config, ctx, &eID, sizeof(u16));
                lib_ring_buffer_write(config, ctx, &small_size, sizeof(u16));
                if (small_size == LTT_MAX_SMALL_SIZE)
                        lib_ring_buffer_write(config, ctx, &event_size,
                                              sizeof(u32));
                lib_ring_buffer_align_ctx(ctx, sizeof(u64));
                lib_ring_buffer_write(config, ctx, &ctx->tsc, sizeof(u64));
                break;
        case LTT_RFLAG_ID_SIZE:
                small_size = (u16)min_t(u32, event_size, LTT_MAX_SMALL_SIZE);
                lib_ring_buffer_write(config, ctx, &eID, sizeof(u16));
                lib_ring_buffer_write(config, ctx, &small_size, sizeof(u16));
                if (small_size == LTT_MAX_SMALL_SIZE)
                        lib_ring_buffer_write(config, ctx, &event_size,
                                              sizeof(u32));
                break;
        case LTT_RFLAG_ID:
                lib_ring_buffer_write(config, ctx, &eID, sizeof(u16));
                break;
        }
}

static const struct lib_ring_buffer_config client_config;

static u64 client_ring_buffer_clock_read(struct channel *chan)
{
        return lib_ring_buffer_clock_read(chan);
}

static
size_t client_record_header_size(const struct lib_ring_buffer_config *config,
                                 struct channel *chan, size_t offset,
                                 size_t data_size,
                                 size_t *pre_header_padding,
                                 unsigned int rflags,
                                 struct lib_ring_buffer_ctx *ctx)
{
        return record_header_size(config, chan, offset, data_size,
                                  pre_header_padding, rflags, ctx);
}

/**
 * client_packet_header_size - called on buffer-switch to a new sub-buffer
 *
 * Return header size without padding after the structure. Don't use packed
 * structure because gcc generates inefficient code on some architectures
 * (powerpc, mips..)
 */
static size_t client_packet_header_size(void)
{
        return offsetof(struct packet_header, header_end);
}

static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc,
                                unsigned int subbuf_idx)
{
        struct channel *chan = buf->backend.chan;
        struct packet_header *header =
                (struct packet_header *)
                        lib_ring_buffer_offset_address(&buf->backend,
                                subbuf_idx * chan->backend.subbuf_size);
        struct ltt_session *session = channel_get_private(chan);

        header->magic = CTF_MAGIC_NUMBER;
        memcpy(header->uuid, session->uuid.b, sizeof(session->uuid));
        header->timestamp_begin = tsc;
        header->timestamp_end = 0;
        header->content_size = 0xFFFFFFFF; /* for debugging */
        header->packet_size = 0xFFFFFFFF;
        header->events_lost = 0;
#if 0
        header->start_time_sec = ltt_chan->session->start_time.tv_sec;
        header->start_time_usec = ltt_chan->session->start_time.tv_usec;
        header->start_freq = ltt_chan->session->start_freq;
        header->freq_scale = ltt_chan->session->freq_scale;
#endif /* 0 */
}

/*
 * offset is assumed to never be 0 here: never deliver a completely empty
 * subbuffer. data_size is between 1 and subbuf_size.
 */
static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc,
                              unsigned int subbuf_idx, unsigned long data_size)
{
        struct channel *chan = buf->backend.chan;
        struct packet_header *header =
                (struct packet_header *)
                        lib_ring_buffer_offset_address(&buf->backend,
                                subbuf_idx * chan->backend.subbuf_size);
        unsigned long records_lost = 0;

        header->timestamp_end = tsc;
        header->content_size = data_size;
        header->packet_size = PAGE_ALIGN(data_size);
        records_lost += lib_ring_buffer_get_records_lost_full(&client_config, buf);
        records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, buf);
        records_lost += lib_ring_buffer_get_records_lost_big(&client_config, buf);
        header->events_lost = records_lost;
}

static int client_buffer_create(struct lib_ring_buffer *buf, void *priv,
                                int cpu, const char *name)
{
        return 0;
}

static void client_buffer_finalize(struct lib_ring_buffer *buf, void *priv, int cpu)
{
}

static const struct lib_ring_buffer_config client_config = {
        .cb.ring_buffer_clock_read = client_ring_buffer_clock_read,
        .cb.record_header_size = client_record_header_size,
        .cb.subbuffer_header_size = client_packet_header_size,
        .cb.buffer_begin = client_buffer_begin,
        .cb.buffer_end = client_buffer_end,
        .cb.buffer_create = client_buffer_create,
        .cb.buffer_finalize = client_buffer_finalize,

        .tsc_bits = 32,
        .alloc = RING_BUFFER_ALLOC_PER_CPU,
        .sync = RING_BUFFER_SYNC_PER_CPU,
        .mode = RING_BUFFER_MODE_TEMPLATE,
        .backend = RING_BUFFER_PAGE,
        .output = RING_BUFFER_SPLICE,
        .oops = RING_BUFFER_OOPS_CONSISTENCY,
        .ipi = RING_BUFFER_IPI_BARRIER,
        .wakeup = RING_BUFFER_WAKEUP_BY_TIMER,
};
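
/*
 * Descriptive note (not from the original file): this configuration uses
 * per-CPU buffer allocation and synchronization, page-backed sub-buffers
 * consumed through splice, and (presumably, going by the flag names)
 * oops-consistent buffers, IPI-based ordering between writer and reader,
 * and reader wakeup driven by the read timer rather than by the write path.
 */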

static
struct channel *_channel_create(const char *name,
                                struct ltt_session *session, void *buf_addr,
                                size_t subbuf_size, size_t num_subbuf,
                                unsigned int switch_timer_interval,
                                unsigned int read_timer_interval)
{
        return channel_create(&client_config, name, session, buf_addr,
                              subbuf_size, num_subbuf, switch_timer_interval,
                              read_timer_interval);
}

static
void ltt_channel_destroy(struct channel *chan)
{
        channel_destroy(chan);
}

static
struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan)
{
        struct lib_ring_buffer *buf;
        int cpu;

        for_each_channel_cpu(cpu, chan) {
                buf = channel_get_ring_buffer(&client_config, chan, cpu);
                if (!lib_ring_buffer_open_read(buf))
                        return buf;
        }
        return NULL;
}

static
void ltt_buffer_read_close(struct lib_ring_buffer *buf)
{
        lib_ring_buffer_release_read(buf);
}

static
int ltt_event_reserve(struct lib_ring_buffer_ctx *ctx)
{
        int ret, cpu;

        cpu = lib_ring_buffer_get_cpu(&client_config);
        if (cpu < 0)
                return -EPERM;
        ctx->cpu = cpu;

        ret = lib_ring_buffer_reserve(&client_config, ctx);
        if (ret)
                goto put;
        return ret;

put:
        lib_ring_buffer_put_cpu(&client_config);
        return ret;
}

static
void ltt_event_commit(struct lib_ring_buffer_ctx *ctx)
{
        lib_ring_buffer_commit(&client_config, ctx);
        lib_ring_buffer_put_cpu(&client_config);
}

static
void ltt_event_write(struct lib_ring_buffer_ctx *ctx, const void *src,
                     size_t len)
{
        lib_ring_buffer_write(&client_config, ctx, src, len);
}

static
wait_queue_head_t *ltt_get_reader_wait_queue(struct ltt_channel *chan)
{
        return &chan->chan->read_wait;
}

static struct ltt_transport ltt_relay_transport = {
        .name = "relay-" RING_BUFFER_MODE_TEMPLATE_STRING,
        .owner = THIS_MODULE,
        .ops = {
                .channel_create = _channel_create,
                .channel_destroy = ltt_channel_destroy,
                .buffer_read_open = ltt_buffer_read_open,
                .buffer_read_close = ltt_buffer_read_close,
                .event_reserve = ltt_event_reserve,
                .event_commit = ltt_event_commit,
                .event_write = ltt_event_write,
                .packet_avail_size = NULL,      /* Would be racy anyway */
                .get_reader_wait_queue = ltt_get_reader_wait_queue,
        },
};

static int __init ltt_ring_buffer_client_init(void)
{
        /*
         * This vmalloc sync all also takes care of the lib ring buffer
         * vmalloc'd module pages when it is built as a module into LTTng.
         */
        wrapper_vmalloc_sync_all();
        printk(KERN_INFO "LTT : ltt ring buffer client init\n");
        ltt_transport_register(&ltt_relay_transport);
        return 0;
}

module_init(ltt_ring_buffer_client_init);

static void __exit ltt_ring_buffer_client_exit(void)
{
        printk(KERN_INFO "LTT : ltt ring buffer client exit\n");
        ltt_transport_unregister(&ltt_relay_transport);
}

module_exit(ltt_ring_buffer_client_exit);

MODULE_LICENSE("GPL and additional rights");
MODULE_AUTHOR("Mathieu Desnoyers");
MODULE_DESCRIPTION("LTTng ring buffer " RING_BUFFER_MODE_TEMPLATE_STRING
                   " client");