/*
 * ring_buffer_backend.c
 *
 * Copyright (C) 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Dual LGPL v2.1/GPL v2 license.
 */

#include <linux/stddef.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/bitops.h>
#include <linux/delay.h>
#include <linux/errno.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/mm.h>

#include "../../wrapper/vmalloc.h"	/* for wrapper_vmalloc_sync_all() */
#include "../../wrapper/ringbuffer/config.h"
#include "../../wrapper/ringbuffer/backend.h"
#include "../../wrapper/ringbuffer/frontend.h"

/**
 * lib_ring_buffer_backend_allocate - allocate a channel buffer
 * @config: ring buffer instance configuration
 * @bufb: the buffer backend struct
 * @size: total size of the buffer
 * @num_subbuf: number of subbuffers
 * @extra_reader_sb: need extra subbuffer for reader
 */
static
int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config,
				     struct lib_ring_buffer_backend *bufb,
				     size_t size, size_t num_subbuf,
				     int extra_reader_sb)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	unsigned long j, num_pages, num_pages_per_subbuf, page_idx = 0;
	unsigned long subbuf_size, mmap_offset = 0;
	unsigned long num_subbuf_alloc;
	struct page **pages;
	void **virt;
	unsigned long i;

	num_pages = size >> PAGE_SHIFT;
	num_pages_per_subbuf = num_pages >> get_count_order(num_subbuf);
	subbuf_size = chanb->subbuf_size;
	num_subbuf_alloc = num_subbuf;

	if (extra_reader_sb) {
		num_pages += num_pages_per_subbuf; /* Add pages for reader */
		num_subbuf_alloc++;
	}

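	/*
	 * Temporary arrays of page pointers and kernel virtual addresses,
	 * used only while the backend tables are populated below; both are
	 * freed before this function returns.
	 */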
	pages = kmalloc_node(ALIGN(sizeof(*pages) * num_pages,
				   1 << INTERNODE_CACHE_SHIFT),
			GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
	if (unlikely(!pages))
		goto pages_error;

	virt = kmalloc_node(ALIGN(sizeof(*virt) * num_pages,
				  1 << INTERNODE_CACHE_SHIFT),
			GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
	if (unlikely(!virt))
		goto virt_error;

	bufb->array = kmalloc_node(ALIGN(sizeof(*bufb->array)
					 * num_subbuf_alloc,
				   1 << INTERNODE_CACHE_SHIFT),
			GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
	if (unlikely(!bufb->array))
		goto array_error;

	for (i = 0; i < num_pages; i++) {
		pages[i] = alloc_pages_node(cpu_to_node(max(bufb->cpu, 0)),
					    GFP_KERNEL | __GFP_ZERO, 0);
		if (unlikely(!pages[i]))
			goto depopulate;
		virt[i] = page_address(pages[i]);
	}
	bufb->num_pages_per_subbuf = num_pages_per_subbuf;

	/* Allocate backend pages array elements */
	for (i = 0; i < num_subbuf_alloc; i++) {
		bufb->array[i] =
			kzalloc_node(ALIGN(
				sizeof(struct lib_ring_buffer_backend_pages) +
				sizeof(struct lib_ring_buffer_backend_page)
				* num_pages_per_subbuf,
				1 << INTERNODE_CACHE_SHIFT),
				GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
		if (!bufb->array[i])
			goto free_array;
	}

	/* Allocate write-side subbuffer table */
	bufb->buf_wsb = kzalloc_node(ALIGN(
				sizeof(struct lib_ring_buffer_backend_subbuffer)
				* num_subbuf,
				1 << INTERNODE_CACHE_SHIFT),
				GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
	if (unlikely(!bufb->buf_wsb))
		goto free_array;

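	/*
	 * Each write-side subbuffer initially maps to the backend pages
	 * entry of the same index.
	 */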
	for (i = 0; i < num_subbuf; i++)
		bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);

	/* Assign read-side subbuffer table */
	if (extra_reader_sb)
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
						num_subbuf_alloc - 1);
	else
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);

	/* Assign pages to page index */
	for (i = 0; i < num_subbuf_alloc; i++) {
		for (j = 0; j < num_pages_per_subbuf; j++) {
			CHAN_WARN_ON(chanb, page_idx > num_pages);
			bufb->array[i]->p[j].virt = virt[page_idx];
			bufb->array[i]->p[j].page = pages[page_idx];
			page_idx++;
		}
		if (config->output == RING_BUFFER_MMAP) {
			bufb->array[i]->mmap_offset = mmap_offset;
			mmap_offset += subbuf_size;
		}
	}

	/*
	 * If kmalloc ever uses vmalloc underneath, make sure the buffer pages
	 * will not fault.
	 */
	wrapper_vmalloc_sync_all();
	kfree(virt);
	kfree(pages);
	return 0;

free_array:
	for (i = 0; (i < num_subbuf_alloc && bufb->array[i]); i++)
		kfree(bufb->array[i]);
depopulate:
	/* Free all allocated pages */
	for (i = 0; (i < num_pages && pages[i]); i++)
		__free_page(pages[i]);
	kfree(bufb->array);
array_error:
	kfree(virt);
virt_error:
	kfree(pages);
pages_error:
	return -ENOMEM;
}

int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
				   struct channel_backend *chanb, int cpu)
{
	const struct lib_ring_buffer_config *config = &chanb->config;

	bufb->chan = container_of(chanb, struct channel, backend);
	bufb->cpu = cpu;

	return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
						chanb->num_subbuf,
						chanb->extra_reader_sb);
}

void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	unsigned long i, j, num_subbuf_alloc;

	num_subbuf_alloc = chanb->num_subbuf;
	if (chanb->extra_reader_sb)
		num_subbuf_alloc++;

	kfree(bufb->buf_wsb);
	for (i = 0; i < num_subbuf_alloc; i++) {
		for (j = 0; j < bufb->num_pages_per_subbuf; j++)
			__free_page(bufb->array[i]->p[j].page);
		kfree(bufb->array[i]);
	}
	kfree(bufb->array);
	bufb->allocated = 0;
}

void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	unsigned long num_subbuf_alloc;
	unsigned int i;

	num_subbuf_alloc = chanb->num_subbuf;
	if (chanb->extra_reader_sb)
		num_subbuf_alloc++;

	for (i = 0; i < chanb->num_subbuf; i++)
		bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);
	if (chanb->extra_reader_sb)
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
						num_subbuf_alloc - 1);
	else
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);

	for (i = 0; i < num_subbuf_alloc; i++) {
		/* Don't reset mmap_offset */
		v_set(config, &bufb->array[i]->records_commit, 0);
		v_set(config, &bufb->array[i]->records_unread, 0);
		bufb->array[i]->data_size = 0;
		/* Don't reset backend page and virt addresses */
	}
	/* Don't reset num_pages_per_subbuf, cpu, allocated */
	v_set(config, &bufb->records_read, 0);
}

/*
 * The frontend is responsible for also calling ring_buffer_backend_reset for
 * each buffer when calling channel_backend_reset.
 */
void channel_backend_reset(struct channel_backend *chanb)
{
	struct channel *chan = container_of(chanb, struct channel, backend);
	const struct lib_ring_buffer_config *config = &chanb->config;

	/*
	 * Don't reset buf_size, subbuf_size, subbuf_size_order,
	 * num_subbuf_order, buf_size_order, extra_reader_sb, num_subbuf,
	 * priv, notifiers, config, cpumask and name.
	 */
	chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
}

#ifdef CONFIG_HOTPLUG_CPU
/**
 * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
 * @nb: notifier block
 * @action: hotplug action to take
 * @hcpu: CPU number
 *
 * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
 */
static
int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
					      unsigned long action,
					      void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct channel_backend *chanb = container_of(nb, struct channel_backend,
						     cpu_hp_notifier);
	const struct lib_ring_buffer_config *config = &chanb->config;
	struct lib_ring_buffer *buf;
	int ret;

	CHAN_WARN_ON(chanb, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	switch (action) {
	case CPU_UP_PREPARE:
	case CPU_UP_PREPARE_FROZEN:
		buf = per_cpu_ptr(chanb->buf, cpu);
		ret = lib_ring_buffer_create(buf, chanb, cpu);
		if (ret) {
			printk(KERN_ERR
			       "ring_buffer_cpu_hp_callback: cpu %d "
			       "buffer creation failed\n", cpu);
			return NOTIFY_BAD;
		}
		break;
	case CPU_DEAD:
	case CPU_DEAD_FROZEN:
		/*
		 * No need to do a buffer switch here, because it will happen
		 * when tracing is stopped, or will be done by switch timer
		 * CPU DEAD callback.
		 */
		break;
	}
	return NOTIFY_OK;
}
#endif

/**
 * channel_backend_init - initialize a channel backend
 * @chanb: channel backend
 * @name: channel name
 * @config: client ring buffer configuration
 * @priv: client private data
 * @subbuf_size: size of sub-buffers (>= PAGE_SIZE, power of 2)
 * @num_subbuf: number of sub-buffers (power of 2)
 *
 * Returns 0 on success, a negative error code otherwise.
 *
 * Creates per-cpu channel buffers using the sizes and attributes
 * specified. The created channel buffer files will be named
 * name_0...name_N-1. File permissions will be %S_IRUSR.
 *
 * Called with CPU hotplug disabled.
 */
int channel_backend_init(struct channel_backend *chanb,
			 const char *name,
			 const struct lib_ring_buffer_config *config,
			 void *priv, size_t subbuf_size, size_t num_subbuf)
{
	struct channel *chan = container_of(chanb, struct channel, backend);
	unsigned int i;
	int ret;

	if (!name)
		return -EPERM;

	if (!(subbuf_size && num_subbuf))
		return -EPERM;

	/* Check that the subbuffer size is at least a page. */
	if (subbuf_size < PAGE_SIZE)
		return -EINVAL;

	/*
	 * Make sure the number of subbuffers and subbuffer size are power of 2.
	 */
	CHAN_WARN_ON(chanb, hweight32(subbuf_size) != 1);
	CHAN_WARN_ON(chanb, hweight32(num_subbuf) != 1);

	ret = subbuffer_id_check_index(config, num_subbuf);
	if (ret)
		return ret;

	chanb->priv = priv;
	chanb->buf_size = num_subbuf * subbuf_size;
	chanb->subbuf_size = subbuf_size;
	chanb->buf_size_order = get_count_order(chanb->buf_size);
	chanb->subbuf_size_order = get_count_order(subbuf_size);
	chanb->num_subbuf_order = get_count_order(num_subbuf);
	chanb->extra_reader_sb =
		(config->mode == RING_BUFFER_OVERWRITE) ? 1 : 0;
	chanb->num_subbuf = num_subbuf;
	strlcpy(chanb->name, name, NAME_MAX);
	memcpy(&chanb->config, config, sizeof(chanb->config));

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		if (!zalloc_cpumask_var(&chanb->cpumask, GFP_KERNEL))
			return -ENOMEM;
	}

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		/* Allocating the buffer per-cpu structures */
		chanb->buf = alloc_percpu(struct lib_ring_buffer);
		if (!chanb->buf)
			goto free_cpumask;

		/*
		 * In case of non-hotplug cpu, if the ring-buffer is allocated
		 * in early initcall, it will not be notified of secondary cpus.
		 * In that case, we need to allocate for all possible cpus.
		 */
#ifdef CONFIG_HOTPLUG_CPU
		/*
		 * buf->backend.allocated test takes care of concurrent CPU
		 * hotplug.
		 * Priority higher than frontend, so we create the ring buffer
		 * before we start the timer.
		 */
		chanb->cpu_hp_notifier.notifier_call =
			lib_ring_buffer_cpu_hp_callback;
		chanb->cpu_hp_notifier.priority = 5;
		register_hotcpu_notifier(&chanb->cpu_hp_notifier);

		get_online_cpus();
		for_each_online_cpu(i) {
			ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
						     chanb, i);
			if (ret)
				goto free_bufs;	/* cpu hotplug locked */
		}
		put_online_cpus();
#else
		for_each_possible_cpu(i) {
			ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
						     chanb, i);
			if (ret)
				goto free_bufs;
		}
#endif
	} else {
		chanb->buf = kzalloc(sizeof(struct lib_ring_buffer), GFP_KERNEL);
		if (!chanb->buf)
			goto free_cpumask;
		ret = lib_ring_buffer_create(chanb->buf, chanb, -1);
		if (ret)
			goto free_bufs;
	}
	chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);

	return 0;

free_bufs:
	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		for_each_possible_cpu(i) {
			struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i);

			if (!buf->backend.allocated)
				continue;
			lib_ring_buffer_free(buf);
		}
#ifdef CONFIG_HOTPLUG_CPU
		put_online_cpus();
#endif
		free_percpu(chanb->buf);
	} else
		kfree(chanb->buf);
free_cpumask:
	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		free_cpumask_var(chanb->cpumask);
	return -ENOMEM;
}
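
/*
 * Illustrative sketch only: a client typically embeds struct channel_backend
 * in its channel object and initializes it with power-of-2 sizing, e.g.
 *
 *	ret = channel_backend_init(&chan->backend, "chan0", &client_config,
 *				   priv, 2 * PAGE_SIZE, 8);
 *
 * where "chan0", client_config and priv are placeholders for the client's
 * own channel name, ring buffer configuration and private data.
 */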

/**
 * channel_backend_unregister_notifiers - unregister notifiers
 * @chanb: the channel backend
 *
 * Holds CPU hotplug.
 */
void channel_backend_unregister_notifiers(struct channel_backend *chanb)
{
	const struct lib_ring_buffer_config *config = &chanb->config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		unregister_hotcpu_notifier(&chanb->cpu_hp_notifier);
}

/**
 * channel_backend_free - destroy the channel backend
 * @chanb: the channel backend
 *
 * Destroys all channel buffers and releases the backend's resources.
 */
void channel_backend_free(struct channel_backend *chanb)
{
	const struct lib_ring_buffer_config *config = &chanb->config;
	unsigned int i;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		for_each_possible_cpu(i) {
			struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i);

			if (!buf->backend.allocated)
				continue;
			lib_ring_buffer_free(buf);
		}
		free_cpumask_var(chanb->cpumask);
		free_percpu(chanb->buf);
	} else {
		struct lib_ring_buffer *buf = chanb->buf;

		CHAN_WARN_ON(chanb, !buf->backend.allocated);
		lib_ring_buffer_free(buf);
		kfree(buf);
	}
}

/**
 * _lib_ring_buffer_write - write data to a ring_buffer buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @src : source address
 * @len : length to write
 * @pagecpy : number of bytes already copied
 */
void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb, size_t offset,
			    const void *src, size_t len, ssize_t pagecpy)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	size_t sbidx, index;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

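	/*
	 * Copy page by page: each iteration first advances past the bytes
	 * already handled (pagecpy), then copies at most up to the end of
	 * the current backend page.
	 */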
	do {
		len -= pagecpy;
		src += pagecpy;
		offset += pagecpy;
		sbidx = offset >> chanb->subbuf_size_order;
		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;

		/*
		 * Underlying layer should never ask for writes across
		 * subbuffers.
		 */
		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);

		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
		id = bufb->buf_wsb[sbidx].id;
		sb_bindex = subbuffer_id_get_index(config, id);
		rpages = bufb->array[sb_bindex];
		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
			     && subbuffer_id_is_noref(config, id));
		lib_ring_buffer_do_copy(config,
					rpages->p[index].virt
					+ (offset & ~PAGE_MASK),
					src, pagecpy);
	} while (unlikely(len != pagecpy));
}
EXPORT_SYMBOL_GPL(_lib_ring_buffer_write);

/**
 * _lib_ring_buffer_memset - write len bytes of c to a ring_buffer buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @c : the byte to write
 * @len : length to write
 * @pagecpy : number of bytes already copied
 */
void _lib_ring_buffer_memset(struct lib_ring_buffer_backend *bufb,
			     size_t offset,
			     int c, size_t len, ssize_t pagecpy)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	size_t sbidx, index;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

	do {
		len -= pagecpy;
		offset += pagecpy;
		sbidx = offset >> chanb->subbuf_size_order;
		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;

		/*
		 * Underlying layer should never ask for writes across
		 * subbuffers.
		 */
		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);

		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
		id = bufb->buf_wsb[sbidx].id;
		sb_bindex = subbuffer_id_get_index(config, id);
		rpages = bufb->array[sb_bindex];
		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
			     && subbuffer_id_is_noref(config, id));
		lib_ring_buffer_do_memset(rpages->p[index].virt
					  + (offset & ~PAGE_MASK),
					  c, pagecpy);
	} while (unlikely(len != pagecpy));
}
EXPORT_SYMBOL_GPL(_lib_ring_buffer_memset);

/**
 * _lib_ring_buffer_copy_from_user - write user data to a ring_buffer buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @src : source address
 * @len : length to write
 * @pagecpy : number of bytes already copied
 *
 * This function deals with userspace pointers; it should never be called
 * without the src pointer having been checked with access_ok() first.
 */
void _lib_ring_buffer_copy_from_user(struct lib_ring_buffer_backend *bufb,
				     size_t offset,
				     const void __user *src, size_t len,
				     ssize_t pagecpy)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	size_t sbidx, index;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;
	int ret;

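	/*
	 * Same page-by-page walk as _lib_ring_buffer_write(), except that a
	 * faulting userspace copy aborts the walk and the remainder of the
	 * record is zero-filled.
	 */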
	do {
		len -= pagecpy;
		src += pagecpy;
		offset += pagecpy;
		sbidx = offset >> chanb->subbuf_size_order;
		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;

		/*
		 * Underlying layer should never ask for writes across
		 * subbuffers.
		 */
		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);

		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
		id = bufb->buf_wsb[sbidx].id;
		sb_bindex = subbuffer_id_get_index(config, id);
		rpages = bufb->array[sb_bindex];
		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
			     && subbuffer_id_is_noref(config, id));
		ret = lib_ring_buffer_do_copy_from_user(rpages->p[index].virt
							+ (offset & ~PAGE_MASK),
							src, pagecpy) != 0;
		if (ret > 0) {
			/* Copy failed: zero-fill the rest of the record. */
			_lib_ring_buffer_memset(bufb, offset, 0, len, 0);
			break; /* stop copy */
		}
	} while (unlikely(len != pagecpy));
}
EXPORT_SYMBOL_GPL(_lib_ring_buffer_copy_from_user);

/**
 * lib_ring_buffer_read - read data from a ring_buffer buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination address
 * @len : length to copy to destination
 *
 * Should be protected by get_subbuf/put_subbuf.
 * Returns the length copied.
 */
size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
			    void *dest, size_t len)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	size_t index;
	ssize_t pagecpy, orig_len;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

	orig_len = len;
	offset &= chanb->buf_size - 1;
	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
	if (unlikely(!len))
		return 0;
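	/*
	 * Read-side accessors go through bufb->buf_rsb, the subbuffer
	 * currently held by the reader; the copy walks it page by page.
	 */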
	for (;;) {
		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
		id = bufb->buf_rsb.id;
		sb_bindex = subbuffer_id_get_index(config, id);
		rpages = bufb->array[sb_bindex];
		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
			     && subbuffer_id_is_noref(config, id));
		memcpy(dest, rpages->p[index].virt + (offset & ~PAGE_MASK),
		       pagecpy);
		len -= pagecpy;
		if (likely(!len))
			break;
		dest += pagecpy;
		offset += pagecpy;
		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
		/*
		 * Underlying layer should never ask for reads across
		 * subbuffers.
		 */
		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
	}
	return orig_len;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_read);

/**
 * __lib_ring_buffer_copy_to_user - read data from ring_buffer to userspace
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination userspace address
 * @len : length to copy to destination
 *
 * Should be protected by get_subbuf/put_subbuf.
 * access_ok() must have been performed on dest addresses prior to calling
 * this function.
 * Returns -EFAULT on error, 0 if ok.
 */
int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb,
				   size_t offset, void __user *dest, size_t len)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	size_t index;
	ssize_t pagecpy;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
	if (unlikely(!len))
		return 0;
	for (;;) {
		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
		id = bufb->buf_rsb.id;
		sb_bindex = subbuffer_id_get_index(config, id);
		rpages = bufb->array[sb_bindex];
		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
			     && subbuffer_id_is_noref(config, id));
		if (__copy_to_user(dest,
				   rpages->p[index].virt + (offset & ~PAGE_MASK),
				   pagecpy))
			return -EFAULT;
		len -= pagecpy;
		if (likely(!len))
			break;
		dest += pagecpy;
		offset += pagecpy;
		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
		/*
		 * Underlying layer should never ask for reads across
		 * subbuffers.
		 */
		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
	}
	return 0;
}
EXPORT_SYMBOL_GPL(__lib_ring_buffer_copy_to_user);

/**
 * lib_ring_buffer_read_cstr - read a C-style string from ring_buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination address
 * @len : destination's length
 *
 * Returns the length of the string found in the buffer (which may be larger
 * than @len).
 * Should be protected by get_subbuf/put_subbuf.
 */
int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset,
			      void *dest, size_t len)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	size_t index;
	ssize_t pagecpy, pagelen, strpagelen, orig_offset;
	char *str;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
	orig_offset = offset;
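	/*
	 * Walk the string page by page: copy at most @len bytes into @dest,
	 * stop at the first NUL found in the buffer, and NUL-terminate the
	 * output if space remains.
	 */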
	for (;;) {
		id = bufb->buf_rsb.id;
		sb_bindex = subbuffer_id_get_index(config, id);
		rpages = bufb->array[sb_bindex];
		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
			     && subbuffer_id_is_noref(config, id));
		str = (char *)rpages->p[index].virt + (offset & ~PAGE_MASK);
		pagelen = PAGE_SIZE - (offset & ~PAGE_MASK);
		strpagelen = strnlen(str, pagelen);
		if (len) {
			pagecpy = min_t(size_t, len, strpagelen);
			if (dest) {
				memcpy(dest, str, pagecpy);
				dest += pagecpy;
			}
			len -= pagecpy;
		}
		offset += strpagelen;
		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
		if (strpagelen < pagelen)
			break;
		/*
		 * Underlying layer should never ask for reads across
		 * subbuffers.
		 */
		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
	}
	if (dest && len)
		((char *)dest)[0] = 0;
	return offset - orig_offset;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_read_cstr);

/**
 * lib_ring_buffer_read_get_page - Get a whole page to read from
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @virt : pointer to page address (output)
 *
 * Should be protected by get_subbuf/put_subbuf.
 * Returns the pointer to the page struct pointer.
 */
struct page **lib_ring_buffer_read_get_page(struct lib_ring_buffer_backend *bufb,
					    size_t offset, void ***virt)
{
	size_t index;
	struct lib_ring_buffer_backend_pages *rpages;
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
	id = bufb->buf_rsb.id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = bufb->array[sb_bindex];
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	*virt = &rpages->p[index].virt;
	return &rpages->p[index].page;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_read_get_page);

/**
 * lib_ring_buffer_read_offset_address - get address of a buffer location
 * @bufb : buffer backend
 * @offset : offset within the buffer.
 *
 * Return the address where a given offset is located (for read).
 * Should be used to get the current subbuffer header pointer. Given we know
 * it's never on a page boundary, it's safe to write directly to this address,
 * as long as the write is never bigger than a page size.
 */
void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
					  size_t offset)
{
	size_t index;
	struct lib_ring_buffer_backend_pages *rpages;
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
	id = bufb->buf_rsb.id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = bufb->array[sb_bindex];
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	return rpages->p[index].virt + (offset & ~PAGE_MASK);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_read_offset_address);

/**
 * lib_ring_buffer_offset_address - get address of a location within the buffer
 * @bufb : buffer backend
 * @offset : offset within the buffer.
 *
 * Return the address where a given offset is located.
 * Should be used to get the current subbuffer header pointer. Given we know
 * it's always at the beginning of a page, it's safe to write directly to this
 * address, as long as the write is never bigger than a page size.
 */
void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
				     size_t offset)
{
	size_t sbidx, index;
	struct lib_ring_buffer_backend_pages *rpages;
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	sbidx = offset >> chanb->subbuf_size_order;
	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
	id = bufb->buf_wsb[sbidx].id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = bufb->array[sb_bindex];
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	return rpages->p[index].virt + (offset & ~PAGE_MASK);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_offset_address);