/*
 * ring_buffer_backend.c
 *
 * Copyright (C) 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Dual LGPL v2.1/GPL v2 license.
 */
19 * lib_ring_buffer_backend_allocate - allocate a channel buffer
20 * @config: ring buffer instance configuration
21 * @buf: the buffer struct
22 * @size: total size of the buffer
23 * @num_subbuf: number of subbuffers
24 * @extra_reader_sb: need extra subbuffer for reader
27 int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config
*config
,
28 struct lib_ring_buffer_backend
*bufb
,
29 size_t size
, size_t num_subbuf
,
31 struct shm_header
*shm_header
)
33 struct channel_backend
*chanb
= &shmp(bufb
->chan
)->backend
;
34 unsigned long subbuf_size
, mmap_offset
= 0;
35 unsigned long num_subbuf_alloc
;
38 subbuf_size
= chanb
->subbuf_size
;
39 num_subbuf_alloc
= num_subbuf
;
44 set_shmp(bufb
->array
, zalloc_shm(shm_header
,
45 sizeof(*bufb
->array
) * num_subbuf_alloc
));
46 if (unlikely(!shmp(bufb
->array
)))
49 set_shmp(bufb
->memory_map
, zalloc_shm(shm_header
,
50 subbuf_size
* num_subbuf_alloc
));
51 if (unlikely(!shmp(bufb
->memory_map
)))
52 goto memory_map_error
;
54 /* Allocate backend pages array elements */
55 for (i
= 0; i
< num_subbuf_alloc
; i
++) {
56 set_shmp(bufb
->array
[i
],
57 zalloc_shm(shm_header
,
58 sizeof(struct lib_ring_buffer_backend_pages
) +
60 if (!shmp(bufb
->array
[i
]))
64 /* Allocate write-side subbuffer table */
65 bufb
->buf_wsb
= zalloc_shm(shm_header
,
66 sizeof(struct lib_ring_buffer_backend_subbuffer
)
68 if (unlikely(!shmp(bufb
->buf_wsb
)))
71 for (i
= 0; i
< num_subbuf
; i
++)
72 shmp(bufb
->buf_wsb
)[i
].id
= subbuffer_id(config
, 0, 1, i
);
74 /* Assign read-side subbuffer table */
76 bufb
->buf_rsb
.id
= subbuffer_id(config
, 0, 1,
77 num_subbuf_alloc
- 1);
79 bufb
->buf_rsb
.id
= subbuffer_id(config
, 0, 1, 0);
81 /* Assign pages to page index */
82 for (i
= 0; i
< num_subbuf_alloc
; i
++) {
83 set_shmp(shmp(bufb
->array
)[i
]->p
,
84 &shmp(bufb
->memory_map
)[i
* subbuf_size
]);
85 if (config
->output
== RING_BUFFER_MMAP
) {
86 shmp(bufb
->array
)[i
]->mmap_offset
= mmap_offset
;
87 mmap_offset
+= subbuf_size
;
94 /* bufb->array[i] will be freed by shm teardown */
96 /* bufb->array will be freed by shm teardown */
101 int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend
*bufb
,
102 struct channel_backend
*chanb
, int cpu
,
103 struct shm_header
*shm_header
)
105 const struct lib_ring_buffer_config
*config
= chanb
->config
;
107 set_shmp(&bufb
->chan
, caa_container_of(chanb
, struct channel
, backend
));
110 return lib_ring_buffer_backend_allocate(config
, bufb
, chanb
->buf_size
,
112 chanb
->extra_reader_sb
,
/**
 * lib_ring_buffer_backend_free - release a buffer backend
 * @bufb: buffer backend
 *
 * Intentionally empty: every backend allocation lives in the shared
 * memory area and is reclaimed wholesale when the shm is torn down.
 */
void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
{
	/* bufb->buf_wsb will be freed by shm teardown */
	/* bufb->array[i] will be freed by shm teardown */
	/* bufb->array will be freed by shm teardown */
}
124 void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend
*bufb
)
126 struct channel_backend
*chanb
= &shmp(bufb
->chan
)->backend
;
127 const struct lib_ring_buffer_config
*config
= chanb
->config
;
128 unsigned long num_subbuf_alloc
;
131 num_subbuf_alloc
= chanb
->num_subbuf
;
132 if (chanb
->extra_reader_sb
)
135 for (i
= 0; i
< chanb
->num_subbuf
; i
++)
136 shmp(bufb
->buf_wsb
)[i
].id
= subbuffer_id(config
, 0, 1, i
);
137 if (chanb
->extra_reader_sb
)
138 bufb
->buf_rsb
.id
= subbuffer_id(config
, 0, 1,
139 num_subbuf_alloc
- 1);
141 bufb
->buf_rsb
.id
= subbuffer_id(config
, 0, 1, 0);
143 for (i
= 0; i
< num_subbuf_alloc
; i
++) {
144 /* Don't reset mmap_offset */
145 v_set(config
, &shmp(bufb
->array
)[i
]->records_commit
, 0);
146 v_set(config
, &shmp(bufb
->array
)[i
]->records_unread
, 0);
147 shmp(bufb
->array
)[i
]->data_size
= 0;
148 /* Don't reset backend page and virt addresses */
150 /* Don't reset num_pages_per_subbuf, cpu, allocated */
151 v_set(config
, &bufb
->records_read
, 0);
155 * The frontend is responsible for also calling ring_buffer_backend_reset for
156 * each buffer when calling channel_backend_reset.
158 void channel_backend_reset(struct channel_backend
*chanb
)
160 struct channel
*chan
= caa_container_of(chanb
, struct channel
, backend
);
161 const struct lib_ring_buffer_config
*config
= chanb
->config
;
164 * Don't reset buf_size, subbuf_size, subbuf_size_order,
165 * num_subbuf_order, buf_size_order, extra_reader_sb, num_subbuf,
166 * priv, notifiers, config, cpumask and name.
168 chanb
->start_tsc
= config
->cb
.ring_buffer_clock_read(chan
);
172 * channel_backend_init - initialize a channel backend
173 * @chanb: channel backend
174 * @name: channel name
175 * @config: client ring buffer configuration
176 * @priv: client private data
177 * @parent: dentry of parent directory, %NULL for root directory
178 * @subbuf_size: size of sub-buffers (> PAGE_SIZE, power of 2)
179 * @num_subbuf: number of sub-buffers (power of 2)
180 * @shm_header: shared memory header
182 * Returns channel pointer if successful, %NULL otherwise.
184 * Creates per-cpu channel buffers using the sizes and attributes
185 * specified. The created channel buffer files will be named
186 * name_0...name_N-1. File permissions will be %S_IRUSR.
188 * Called with CPU hotplug disabled.
190 int channel_backend_init(struct channel_backend
*chanb
,
192 const struct lib_ring_buffer_config
*config
,
193 void *priv
, size_t subbuf_size
, size_t num_subbuf
,
194 struct shm_header
*shm_header
)
196 struct channel
*chan
= caa_container_of(chanb
, struct channel
, backend
);
203 if (!(subbuf_size
&& num_subbuf
))
206 /* Check that the subbuffer size is larger than a page. */
207 if (subbuf_size
< PAGE_SIZE
)
211 * Make sure the number of subbuffers and subbuffer size are power of 2.
213 CHAN_WARN_ON(chanb
, hweight32(subbuf_size
) != 1);
214 CHAN_WARN_ON(chanb
, hweight32(num_subbuf
) != 1);
216 ret
= subbuffer_id_check_index(config
, num_subbuf
);
221 chanb
->buf_size
= num_subbuf
* subbuf_size
;
222 chanb
->subbuf_size
= subbuf_size
;
223 chanb
->buf_size_order
= get_count_order(chanb
->buf_size
);
224 chanb
->subbuf_size_order
= get_count_order(subbuf_size
);
225 chanb
->num_subbuf_order
= get_count_order(num_subbuf
);
226 chanb
->extra_reader_sb
=
227 (config
->mode
== RING_BUFFER_OVERWRITE
) ? 1 : 0;
228 chanb
->num_subbuf
= num_subbuf
;
229 strncpy(chanb
->name
, name
, NAME_MAX
);
230 chanb
->name
[NAME_MAX
- 1] = '\0';
231 chanb
->config
= config
;
233 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
234 struct lib_ring_buffer
*buf
;
237 /* Allocating the buffer per-cpu structures */
238 alloc_size
= sizeof(struct lib_ring_buffer
);
239 buf
= zalloc_shm(shm_header
, alloc_size
* num_possible_cpus());
242 set_shmp(chanb
->buf
, buf
);
245 * We need to allocate for all possible cpus.
247 for_each_possible_cpu(i
) {
248 ret
= lib_ring_buffer_create(&shmp(chanb
->buf
)[i
],
249 chanb
, i
, shm_header
);
251 goto free_bufs
; /* cpu hotplug locked */
254 struct lib_ring_buffer
*buf
;
257 alloc_size
= sizeof(struct lib_ring_buffer
);
258 chanb
->buf
= zmalloc(sizeof(struct lib_ring_buffer
));
259 buf
= zalloc_shm(shm_header
, alloc_size
);
262 set_shmp(chanb
->buf
, buf
);
263 ret
= lib_ring_buffer_create(shmp(chanb
->buf
), chanb
, -1,
268 chanb
->start_tsc
= config
->cb
.ring_buffer_clock_read(chan
);
273 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
274 for_each_possible_cpu(i
) {
275 struct lib_ring_buffer
*buf
= &shmp(chanb
->buf
)[i
];
277 if (!buf
->backend
.allocated
)
279 lib_ring_buffer_free(buf
);
282 /* We only free the buffer data upon shm teardown */
288 * channel_backend_free - destroy the channel
291 * Destroy all channel buffers and frees the channel.
293 void channel_backend_free(struct channel_backend
*chanb
)
295 const struct lib_ring_buffer_config
*config
= chanb
->config
;
298 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
299 for_each_possible_cpu(i
) {
300 struct lib_ring_buffer
*buf
= &shmp(chanb
->buf
)[i
];
302 if (!buf
->backend
.allocated
)
304 lib_ring_buffer_free(buf
);
307 struct lib_ring_buffer
*buf
= shmp(chanb
->buf
);
309 CHAN_WARN_ON(chanb
, !buf
->backend
.allocated
);
310 lib_ring_buffer_free(buf
);
312 /* We only free the buffer data upon shm teardown */
316 * lib_ring_buffer_read - read data from ring_buffer_buffer.
317 * @bufb : buffer backend
318 * @offset : offset within the buffer
319 * @dest : destination address
320 * @len : length to copy to destination
322 * Should be protected by get_subbuf/put_subbuf.
323 * Returns the length copied.
325 size_t lib_ring_buffer_read(struct lib_ring_buffer_backend
*bufb
, size_t offset
,
326 void *dest
, size_t len
)
328 struct channel_backend
*chanb
= &shmp(bufb
->chan
)->backend
;
329 const struct lib_ring_buffer_config
*config
= chanb
->config
;
331 struct lib_ring_buffer_backend_pages
*rpages
;
332 unsigned long sb_bindex
, id
;
335 offset
&= chanb
->buf_size
- 1;
339 id
= bufb
->buf_rsb
.id
;
340 sb_bindex
= subbuffer_id_get_index(config
, id
);
341 rpages
= shmp(bufb
->array
)[sb_bindex
];
343 * Underlying layer should never ask for reads across
346 CHAN_WARN_ON(chanb
, offset
>= chanb
->buf_size
);
347 CHAN_WARN_ON(chanb
, config
->mode
== RING_BUFFER_OVERWRITE
348 && subbuffer_id_is_noref(config
, id
));
349 memcpy(dest
, shmp(rpages
->p
) + (offset
& ~(chanb
->subbuf_size
- 1)), len
);
354 * lib_ring_buffer_read_cstr - read a C-style string from ring_buffer.
355 * @bufb : buffer backend
356 * @offset : offset within the buffer
357 * @dest : destination address
358 * @len : destination's length
360 * return string's length
361 * Should be protected by get_subbuf/put_subbuf.
363 int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend
*bufb
, size_t offset
,
364 void *dest
, size_t len
)
366 struct channel_backend
*chanb
= &shmp(bufb
->chan
)->backend
;
367 const struct lib_ring_buffer_config
*config
= chanb
->config
;
368 ssize_t string_len
, orig_offset
;
370 struct lib_ring_buffer_backend_pages
*rpages
;
371 unsigned long sb_bindex
, id
;
373 offset
&= chanb
->buf_size
- 1;
374 orig_offset
= offset
;
375 id
= bufb
->buf_rsb
.id
;
376 sb_bindex
= subbuffer_id_get_index(config
, id
);
377 rpages
= shmp(bufb
->array
)[sb_bindex
];
379 * Underlying layer should never ask for reads across
382 CHAN_WARN_ON(chanb
, offset
>= chanb
->buf_size
);
383 CHAN_WARN_ON(chanb
, config
->mode
== RING_BUFFER_OVERWRITE
384 && subbuffer_id_is_noref(config
, id
));
385 str
= (char *)shmp(rpages
->p
) + (offset
& ~(chanb
->subbuf_size
- 1));
386 string_len
= strnlen(str
, len
);
388 memcpy(dest
, str
, string_len
);
389 ((char *)dest
)[0] = 0;
391 return offset
- orig_offset
;
395 * lib_ring_buffer_read_offset_address - get address of a buffer location
396 * @bufb : buffer backend
397 * @offset : offset within the buffer.
399 * Return the address where a given offset is located (for read).
400 * Should be used to get the current subbuffer header pointer. Given we know
401 * it's never on a page boundary, it's safe to write directly to this address,
402 * as long as the write is never bigger than a page size.
404 void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend
*bufb
,
407 struct lib_ring_buffer_backend_pages
*rpages
;
408 struct channel_backend
*chanb
= &shmp(bufb
->chan
)->backend
;
409 const struct lib_ring_buffer_config
*config
= chanb
->config
;
410 unsigned long sb_bindex
, id
;
412 offset
&= chanb
->buf_size
- 1;
413 id
= bufb
->buf_rsb
.id
;
414 sb_bindex
= subbuffer_id_get_index(config
, id
);
415 rpages
= shmp(bufb
->array
)[sb_bindex
];
416 CHAN_WARN_ON(chanb
, config
->mode
== RING_BUFFER_OVERWRITE
417 && subbuffer_id_is_noref(config
, id
));
418 return shmp(rpages
->p
) + (offset
& ~(chanb
->subbuf_size
- 1));
422 * lib_ring_buffer_offset_address - get address of a location within the buffer
423 * @bufb : buffer backend
424 * @offset : offset within the buffer.
426 * Return the address where a given offset is located.
427 * Should be used to get the current subbuffer header pointer. Given we know
428 * it's always at the beginning of a page, it's safe to write directly to this
429 * address, as long as the write is never bigger than a page size.
431 void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend
*bufb
,
435 struct lib_ring_buffer_backend_pages
*rpages
;
436 struct channel_backend
*chanb
= &shmp(bufb
->chan
)->backend
;
437 const struct lib_ring_buffer_config
*config
= chanb
->config
;
438 unsigned long sb_bindex
, id
;
440 offset
&= chanb
->buf_size
- 1;
441 sbidx
= offset
>> chanb
->subbuf_size_order
;
442 id
= shmp(bufb
->buf_wsb
)[sbidx
].id
;
443 sb_bindex
= subbuffer_id_get_index(config
, id
);
444 rpages
= shmp(bufb
->array
)[sb_bindex
];
445 CHAN_WARN_ON(chanb
, config
->mode
== RING_BUFFER_OVERWRITE
446 && subbuffer_id_is_noref(config
, id
));
447 return shmp(rpages
->p
) + (offset
& ~(chanb
->subbuf_size
- 1));