gdbtrace: Move plugins to their own sub-directory
[deliverable/tracecompass.git] / org.eclipse.tracecompass.common.core.tests / src / org / eclipse / tracecompass / common / core / tests / collect / BufferedBlockingQueueTest.java
CommitLineData
9d979fda
MK
1/*******************************************************************************
2 * Copyright (c) 2015 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Matthew Khouzam - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.tracecompass.common.core.tests.collect;
14
9d979fda
MK
15import static org.junit.Assert.assertEquals;
16import static org.junit.Assert.assertFalse;
17import static org.junit.Assert.assertTrue;
fadd7888 18import static org.junit.Assert.fail;
9d979fda 19
47c79d9f
AM
20import java.util.Collection;
21import java.util.Deque;
d6e2666b 22import java.util.HashSet;
9d979fda 23import java.util.LinkedList;
fadd7888 24import java.util.List;
9d979fda 25import java.util.Random;
d6e2666b 26import java.util.Set;
9d979fda
MK
27import java.util.concurrent.Callable;
28import java.util.concurrent.ExecutionException;
29import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
31import java.util.concurrent.Future;
32import java.util.concurrent.TimeUnit;
33
fadd7888 34import org.eclipse.jdt.annotation.NonNull;
9d979fda
MK
35import org.eclipse.tracecompass.common.core.NonNullUtils;
36import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue;
37import org.junit.Before;
38import org.junit.Rule;
39import org.junit.Test;
40import org.junit.rules.TestRule;
41import org.junit.rules.Timeout;
42
fadd7888
AM
43import com.google.common.base.Functions;
44import com.google.common.collect.FluentIterable;
47c79d9f 45import com.google.common.collect.HashMultiset;
fadd7888 46import com.google.common.collect.Iterables;
47c79d9f 47import com.google.common.collect.Iterators;
fadd7888 48import com.google.common.primitives.Chars;
47c79d9f 49
9d979fda
MK
50/**
51 * Test suite for the {@link BufferedBlockingQueue}
52 */
53public class BufferedBlockingQueueTest {
54
55 /** Timeout the tests after 2 minutes */
56 @Rule
57 public TestRule timeoutRule = new Timeout(120000);
58
47c79d9f
AM
59 private static final String testString = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
60 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
61 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
62
9d979fda
MK
63 private BufferedBlockingQueue<Character> charQueue;
64
65 /**
66 * Test setup
67 */
68 @Before
69 public void init() {
70 charQueue = new BufferedBlockingQueue<>(15, 15);
71 }
72
73 /**
74 * Test inserting one element and removing it.
75 */
76 @Test
77 public void testSingleInsertion() {
78 Character element = 'x';
79 charQueue.put(element);
80 charQueue.flushInputBuffer();
81
82 Character out = charQueue.take();
83 assertEquals(element, out);
84 }
85
86 /**
87 * Test insertion of elements that fit into the input buffer.
88 */
89 @Test
90 public void testSimpleInsertion() {
91 String string = "Hello world!";
92 for (char elem : string.toCharArray()) {
93 charQueue.put(elem);
94 }
95 charQueue.flushInputBuffer();
96
97 StringBuilder sb = new StringBuilder();
98 while (!charQueue.isEmpty()) {
99 sb.append(charQueue.take());
100 }
101 assertEquals(string, sb.toString());
102 }
103
104 /**
105 * Test insertion of elements that will require more than one input buffer.
106 */
107 @Test
108 public void testLargeInsertion() {
109 String string = testString.substring(0, 222);
110 for (char elem : string.toCharArray()) {
111 charQueue.put(elem);
112 }
113 charQueue.flushInputBuffer();
114
115 StringBuilder sb = new StringBuilder();
116 while (!charQueue.isEmpty()) {
117 sb.append(charQueue.take());
118 }
119 assertEquals(string, sb.toString());
120 }
121
122 /**
123 * Test the state of the {@link BufferedBlockingQueue#isEmpty()} method at
124 * various moments.
125 */
126 @Test
127 public void testIsEmpty() {
128 BufferedBlockingQueue<String> stringQueue = new BufferedBlockingQueue<>(15, 15);
129 assertTrue(stringQueue.isEmpty());
130
131 stringQueue.put("Hello");
132 assertFalse(stringQueue.isEmpty());
133
134 stringQueue.flushInputBuffer();
135 assertFalse(stringQueue.isEmpty());
136
137 stringQueue.flushInputBuffer();
138 assertFalse(stringQueue.isEmpty());
139
140 stringQueue.flushInputBuffer();
141 stringQueue.take();
142 assertTrue(stringQueue.isEmpty());
143
144 stringQueue.flushInputBuffer();
145 assertTrue(stringQueue.isEmpty());
146 }
147
148 /**
149 * Write random data in and read it, several times.
150 */
151 @Test
152 public void testOddInsertions() {
153 BufferedBlockingQueue<Object> objectQueue = new BufferedBlockingQueue<>(15, 15);
154 LinkedList<Object> expectedValues = new LinkedList<>();
155 Random rnd = new Random();
156 rnd.setSeed(123);
157
158 for (int i = 0; i < 10; i++) {
159 /*
160 * The queue's total size is 225 (15x15). We must make sure to not
161 * fill it up here!
162 */
163 for (int j = 0; j < 50; j++) {
164 Integer testInt = NonNullUtils.checkNotNull(rnd.nextInt());
165 Long testLong = NonNullUtils.checkNotNull(rnd.nextLong());
166 Double testDouble = NonNullUtils.checkNotNull(rnd.nextDouble());
167 Double testGaussian = NonNullUtils.checkNotNull(rnd.nextGaussian());
168
169 expectedValues.add(testInt);
170 expectedValues.add(testLong);
171 expectedValues.add(testDouble);
172 expectedValues.add(testGaussian);
173 objectQueue.put(testInt);
174 objectQueue.put(testLong);
175 objectQueue.put(testDouble);
176 objectQueue.put(testGaussian);
177 }
178 objectQueue.flushInputBuffer();
179
180 while (!expectedValues.isEmpty()) {
181 Object expected = expectedValues.removeFirst();
182 Object actual = objectQueue.take();
183 assertEquals(expected, actual);
184 }
185 }
186 }
187
188 /**
189 * Read with a producer and a consumer
190 *
191 * @throws InterruptedException
192 * The test was interrupted
193 */
194 @Test
195 public void testMultiThread() throws InterruptedException {
196 /* A character not found in the test string */
197 final Character lastElement = '%';
198
199 Thread producer = new Thread() {
200 @Override
201 public void run() {
202 for (char c : testString.toCharArray()) {
203 charQueue.put(c);
204 }
205 charQueue.put(lastElement);
206 charQueue.flushInputBuffer();
207 }
208 };
209 producer.start();
210
211 Thread consumer = new Thread() {
212 @Override
213 public void run() {
214 Character s = charQueue.take();
215 while (!s.equals(lastElement)) {
216 s = charQueue.take();
217 }
218 }
219 };
220 consumer.start();
221
222 consumer.join();
223 producer.join();
224 }
225
47c79d9f
AM
226 /**
227 * Test the contents returned by {@link BufferedBlockingQueue#iterator()}.
228 *
229 * The test is sequential, because the iterator has no guarantee wrt to its
230 * contents when run concurrently.
231 */
232 @Test
233 public void testIteratorContents() {
234 Deque<Character> expected = new LinkedList<>();
235
236 /* Iterator should be empty initially */
237 assertFalse(charQueue.iterator().hasNext());
238
239 /* Insert the first 50 elements */
240 for (int i = 0; i < 50; i++) {
241 char c = testString.charAt(i);
242 charQueue.put(c);
243 expected.addFirst(c);
244 }
245 LinkedList<Character> actual = new LinkedList<>();
246 Iterators.addAll(actual, charQueue.iterator());
247 assertSameElements(expected, actual);
248
249 /*
250 * Insert more elements, flush the input buffer (should not affect the
251 * iteration).
252 */
253 for (int i = 50; i < 60; i++) {
254 char c = testString.charAt(i);
255 charQueue.put(c);
256 charQueue.flushInputBuffer();
257 expected.addFirst(c);
258 }
259 actual = new LinkedList<>();
260 Iterators.addAll(actual, charQueue.iterator());
261 assertSameElements(expected, actual);
262
263 /* Consume the 30 last elements from the queue */
264 for (int i = 0; i < 30; i++) {
265 charQueue.take();
266 expected.removeLast();
267 }
268 actual = new LinkedList<>();
269 Iterators.addAll(actual, charQueue.iterator());
270 assertSameElements(expected, actual);
271
272 /* Now empty the queue */
273 while (!charQueue.isEmpty()) {
274 charQueue.take();
275 expected.removeLast();
276 }
277 assertFalse(charQueue.iterator().hasNext());
278 }
279
280 /**
281 * Utility method to verify that two collections contain the exact same
282 * elements, not necessarily in the same iteration order.
283 *
284 * {@link Collection#equals} requires the iteration order to be the same,
285 * which we do not want here.
286 *
287 * Using a {@link Set} or {@link Collection#containsAll} is not sufficient
288 * either, because those will throw away duplicate elements.
289 */
290 private static <T> void assertSameElements(Collection<T> c1, Collection<T> c2) {
291 assertEquals(HashMultiset.create(c1), HashMultiset.create(c2));
292 }
293
9d979fda 294 /**
d6e2666b
AM
295 * Test iterating on the queue while a producer and a consumer threads are
296 * using it. The iteration should not affect the elements taken by the
297 * consumer.
9d979fda
MK
298 */
299 @Test
fadd7888 300 public void testConcurrentIteration() {
d6e2666b 301 final BufferedBlockingQueue<String> queue = new BufferedBlockingQueue<>(15, 15);
fadd7888 302 final String poisonPill = "That's all folks!";
9d979fda 303
fadd7888
AM
304 /*
305 * Convert the test's testBuffer into an array of String, one for each
306 * character. There are probably simpler ways of doing this, but it
307 * would not look as impressive.
308 */
309 FluentIterable<Character> fi = FluentIterable.from(Chars.asList(testString.toCharArray()));
310 List<String> strings = fi.transform(Functions.toStringFunction()).toList();
9d979fda 311
fadd7888
AM
312 Iterable<Iterable<String>> results =
313 runConcurrencyTest(queue, strings, poisonPill, 1, 1, 1);
314
315 assertEquals(strings, Iterables.getOnlyElement(results));
316 }
9d979fda 317
fadd7888
AM
318 /**
319 * Run a concurrency test on a {@link BufferedBlockingQueue}, with the
320 * specified number of producer, consumer and observer/iterator threads.
321 *
322 * The returned value represents the elements consumed by each consumer
323 * thread. Thus, if there is one consumer, the top-level {@link Iterable}
324 * will be of size 1, and the inner one should contain all the elements that
325 * were inserted.
326 *
327 * @param queue
328 * The queue to run the test on
329 * @param testBuffer
330 * The data set to insert in the queue. Every producer will
331 * insert one entire set.
332 * @param poisonPill
333 * The "poison pill" to indicate the end. Simply make sure it is
334 * a element of type <T> that is not present in the 'testBuffer'.
335 * @param nbProducerThreads
336 * Number of producer threads. There should be at least 1.
337 * @param nbConsumerThreads
338 * Number of consumer threads. There should be at least 1.
339 * @param nbObserverThreads
340 * Number of observer threads. It should be >= 0.
341 * @return The consumed elements, as seen by each consumer thread.
342 */
343 private static <T> Iterable<Iterable<T>> runConcurrencyTest(final BufferedBlockingQueue<T> queue,
344 final List<T> testBuffer,
345 final @NonNull T poisonPill,
346 int nbProducerThreads,
347 int nbConsumerThreads,
348 int nbObserverThreads) {
349
350 final class ProducerThread implements Runnable {
9d979fda
MK
351 @Override
352 public void run() {
fadd7888
AM
353 for (int i = 0; i < testBuffer.size(); i++) {
354 T elem = testBuffer.get(i);
355 if (elem == null) {
356 // TODO replace with List<@NonNull T> once we can
357 throw new IllegalArgumentException();
358 }
359 queue.put(elem);
9d979fda 360 }
d6e2666b
AM
361 queue.put(poisonPill);
362 queue.flushInputBuffer();
9d979fda 363 }
fadd7888 364 }
9d979fda 365
fadd7888
AM
366 /**
367 * The consumer thread will return the elements it read via its Future.
368 *
369 * Note that if there are multiple consumers, there is no guarantee with
370 * regards the contents of an individual one.
371 */
372 final class ConsumerThread implements Callable<Iterable<T>> {
9d979fda 373 @Override
fadd7888
AM
374 public Iterable<T> call() {
375 List<T> results = new LinkedList<>();
376 T elem = queue.take();
377 while (!elem.equals(poisonPill)) {
378 results.add(elem);
379 elem = queue.take();
9d979fda 380 }
fadd7888 381 return results;
9d979fda 382 }
fadd7888 383 }
9d979fda 384
fadd7888 385 final class ObserverThread implements Runnable {
9d979fda
MK
386 @Override
387 public void run() {
fadd7888
AM
388 for (int i = 0; i < 5; i++) {
389 final Set<T> results = new HashSet<>();
390 for (T input : queue) {
391 /*
392 * Do something with the element so that this iteration
393 * does not get optimized out.
394 */
d6e2666b 395 results.add(input);
9d979fda
MK
396 }
397 }
9d979fda 398 }
fadd7888 399 }
9d979fda 400
fadd7888
AM
401 if (nbProducerThreads < 1 || nbConsumerThreads < 1 || nbObserverThreads < 0) {
402 throw new IllegalArgumentException();
403 }
9d979fda 404
fadd7888
AM
405 final ExecutorService pool = Executors.newFixedThreadPool(
406 nbProducerThreads + nbConsumerThreads + nbObserverThreads);
9d979fda 407
fadd7888
AM
408 /* Consumed elements, per consumer thread */
409 List<Future<Iterable<T>>> consumedElements = new LinkedList<>();
9d979fda 410
fadd7888
AM
411 for (int i = 0; i < nbProducerThreads; i++) {
412 pool.submit(new ProducerThread());
413 }
414 for (int i = 0; i < nbConsumerThreads; i++) {
415 consumedElements.add(pool.submit(new ConsumerThread()));
416 }
417 for (int i = 0; i < nbObserverThreads; i++) {
418 pool.submit(new ObserverThread());
419 }
420
421 List<Iterable<T>> results = new LinkedList<>();
422 try {
423 /* Convert the Future's to the actual return value */
424 for (Future<Iterable<T>> future : consumedElements) {
425 Iterable<T> threadResult = future.get();
426 results.add(threadResult);
427 }
428
429 pool.shutdown();
430 boolean success = pool.awaitTermination(2, TimeUnit.MINUTES);
431 if (!success) {
432 throw new InterruptedException();
433 }
434
435 } catch (ExecutionException | InterruptedException e) {
436 fail(e.getMessage());
437 }
438
439 return results;
440 }
47c79d9f 441
fadd7888 442}
This page took 0.051257 seconds and 5 git commands to generate.