Commit | Line | Data |
---|---|---|
9d979fda MK |
1 | /******************************************************************************* |
2 | * Copyright (c) 2015 Ericsson | |
3 | * | |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Matthew Khouzam - Initial API and implementation | |
11 | *******************************************************************************/ | |
12 | ||
13 | package org.eclipse.tracecompass.common.core.tests.collect; | |
14 | ||
9d979fda MK |
15 | import static org.junit.Assert.assertEquals; |
16 | import static org.junit.Assert.assertFalse; | |
17 | import static org.junit.Assert.assertTrue; | |
fadd7888 | 18 | import static org.junit.Assert.fail; |
9d979fda | 19 | |
47c79d9f AM |
20 | import java.util.Collection; |
21 | import java.util.Deque; | |
d6e2666b | 22 | import java.util.HashSet; |
9d979fda | 23 | import java.util.LinkedList; |
fadd7888 | 24 | import java.util.List; |
9d979fda | 25 | import java.util.Random; |
d6e2666b | 26 | import java.util.Set; |
9d979fda MK |
27 | import java.util.concurrent.Callable; |
28 | import java.util.concurrent.ExecutionException; | |
29 | import java.util.concurrent.ExecutorService; | |
30 | import java.util.concurrent.Executors; | |
31 | import java.util.concurrent.Future; | |
32 | import java.util.concurrent.TimeUnit; | |
33 | ||
fadd7888 | 34 | import org.eclipse.jdt.annotation.NonNull; |
9d979fda MK |
35 | import org.eclipse.tracecompass.common.core.NonNullUtils; |
36 | import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue; | |
37 | import org.junit.Before; | |
38 | import org.junit.Rule; | |
39 | import org.junit.Test; | |
40 | import org.junit.rules.TestRule; | |
41 | import org.junit.rules.Timeout; | |
42 | ||
fadd7888 AM |
43 | import com.google.common.base.Functions; |
44 | import com.google.common.collect.FluentIterable; | |
47c79d9f | 45 | import com.google.common.collect.HashMultiset; |
fadd7888 | 46 | import com.google.common.collect.Iterables; |
47c79d9f | 47 | import com.google.common.collect.Iterators; |
fadd7888 | 48 | import com.google.common.primitives.Chars; |
47c79d9f | 49 | |
9d979fda MK |
50 | /** |
51 | * Test suite for the {@link BufferedBlockingQueue} | |
52 | */ | |
53 | public class BufferedBlockingQueueTest { | |
54 | ||
55 | /** Timeout the tests after 2 minutes */ | |
56 | @Rule | |
57 | public TestRule timeoutRule = new Timeout(120000); | |
58 | ||
47c79d9f AM |
59 | private static final String testString = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + |
60 | "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + | |
61 | "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz"; | |
62 | ||
9d979fda MK |
63 | private BufferedBlockingQueue<Character> charQueue; |
64 | ||
65 | /** | |
66 | * Test setup | |
67 | */ | |
68 | @Before | |
69 | public void init() { | |
70 | charQueue = new BufferedBlockingQueue<>(15, 15); | |
71 | } | |
72 | ||
73 | /** | |
74 | * Test inserting one element and removing it. | |
75 | */ | |
76 | @Test | |
77 | public void testSingleInsertion() { | |
78 | Character element = 'x'; | |
79 | charQueue.put(element); | |
80 | charQueue.flushInputBuffer(); | |
81 | ||
82 | Character out = charQueue.take(); | |
83 | assertEquals(element, out); | |
84 | } | |
85 | ||
86 | /** | |
87 | * Test insertion of elements that fit into the input buffer. | |
88 | */ | |
89 | @Test | |
90 | public void testSimpleInsertion() { | |
91 | String string = "Hello world!"; | |
92 | for (char elem : string.toCharArray()) { | |
93 | charQueue.put(elem); | |
94 | } | |
95 | charQueue.flushInputBuffer(); | |
96 | ||
97 | StringBuilder sb = new StringBuilder(); | |
98 | while (!charQueue.isEmpty()) { | |
99 | sb.append(charQueue.take()); | |
100 | } | |
101 | assertEquals(string, sb.toString()); | |
102 | } | |
103 | ||
104 | /** | |
105 | * Test insertion of elements that will require more than one input buffer. | |
106 | */ | |
107 | @Test | |
108 | public void testLargeInsertion() { | |
109 | String string = testString.substring(0, 222); | |
110 | for (char elem : string.toCharArray()) { | |
111 | charQueue.put(elem); | |
112 | } | |
113 | charQueue.flushInputBuffer(); | |
114 | ||
115 | StringBuilder sb = new StringBuilder(); | |
116 | while (!charQueue.isEmpty()) { | |
117 | sb.append(charQueue.take()); | |
118 | } | |
119 | assertEquals(string, sb.toString()); | |
120 | } | |
121 | ||
122 | /** | |
123 | * Test the state of the {@link BufferedBlockingQueue#isEmpty()} method at | |
124 | * various moments. | |
125 | */ | |
126 | @Test | |
127 | public void testIsEmpty() { | |
128 | BufferedBlockingQueue<String> stringQueue = new BufferedBlockingQueue<>(15, 15); | |
129 | assertTrue(stringQueue.isEmpty()); | |
130 | ||
131 | stringQueue.put("Hello"); | |
132 | assertFalse(stringQueue.isEmpty()); | |
133 | ||
134 | stringQueue.flushInputBuffer(); | |
135 | assertFalse(stringQueue.isEmpty()); | |
136 | ||
137 | stringQueue.flushInputBuffer(); | |
138 | assertFalse(stringQueue.isEmpty()); | |
139 | ||
140 | stringQueue.flushInputBuffer(); | |
141 | stringQueue.take(); | |
142 | assertTrue(stringQueue.isEmpty()); | |
143 | ||
144 | stringQueue.flushInputBuffer(); | |
145 | assertTrue(stringQueue.isEmpty()); | |
146 | } | |
147 | ||
148 | /** | |
149 | * Write random data in and read it, several times. | |
150 | */ | |
151 | @Test | |
152 | public void testOddInsertions() { | |
153 | BufferedBlockingQueue<Object> objectQueue = new BufferedBlockingQueue<>(15, 15); | |
154 | LinkedList<Object> expectedValues = new LinkedList<>(); | |
155 | Random rnd = new Random(); | |
156 | rnd.setSeed(123); | |
157 | ||
158 | for (int i = 0; i < 10; i++) { | |
159 | /* | |
160 | * The queue's total size is 225 (15x15). We must make sure to not | |
161 | * fill it up here! | |
162 | */ | |
163 | for (int j = 0; j < 50; j++) { | |
164 | Integer testInt = NonNullUtils.checkNotNull(rnd.nextInt()); | |
165 | Long testLong = NonNullUtils.checkNotNull(rnd.nextLong()); | |
166 | Double testDouble = NonNullUtils.checkNotNull(rnd.nextDouble()); | |
167 | Double testGaussian = NonNullUtils.checkNotNull(rnd.nextGaussian()); | |
168 | ||
169 | expectedValues.add(testInt); | |
170 | expectedValues.add(testLong); | |
171 | expectedValues.add(testDouble); | |
172 | expectedValues.add(testGaussian); | |
173 | objectQueue.put(testInt); | |
174 | objectQueue.put(testLong); | |
175 | objectQueue.put(testDouble); | |
176 | objectQueue.put(testGaussian); | |
177 | } | |
178 | objectQueue.flushInputBuffer(); | |
179 | ||
180 | while (!expectedValues.isEmpty()) { | |
181 | Object expected = expectedValues.removeFirst(); | |
182 | Object actual = objectQueue.take(); | |
183 | assertEquals(expected, actual); | |
184 | } | |
185 | } | |
186 | } | |
187 | ||
188 | /** | |
189 | * Read with a producer and a consumer | |
190 | * | |
191 | * @throws InterruptedException | |
192 | * The test was interrupted | |
193 | */ | |
194 | @Test | |
195 | public void testMultiThread() throws InterruptedException { | |
196 | /* A character not found in the test string */ | |
197 | final Character lastElement = '%'; | |
198 | ||
199 | Thread producer = new Thread() { | |
200 | @Override | |
201 | public void run() { | |
202 | for (char c : testString.toCharArray()) { | |
203 | charQueue.put(c); | |
204 | } | |
205 | charQueue.put(lastElement); | |
206 | charQueue.flushInputBuffer(); | |
207 | } | |
208 | }; | |
209 | producer.start(); | |
210 | ||
211 | Thread consumer = new Thread() { | |
212 | @Override | |
213 | public void run() { | |
214 | Character s = charQueue.take(); | |
215 | while (!s.equals(lastElement)) { | |
216 | s = charQueue.take(); | |
217 | } | |
218 | } | |
219 | }; | |
220 | consumer.start(); | |
221 | ||
222 | consumer.join(); | |
223 | producer.join(); | |
224 | } | |
225 | ||
47c79d9f AM |
226 | /** |
227 | * Test the contents returned by {@link BufferedBlockingQueue#iterator()}. | |
228 | * | |
229 | * The test is sequential, because the iterator has no guarantee wrt to its | |
230 | * contents when run concurrently. | |
231 | */ | |
232 | @Test | |
233 | public void testIteratorContents() { | |
234 | Deque<Character> expected = new LinkedList<>(); | |
235 | ||
236 | /* Iterator should be empty initially */ | |
237 | assertFalse(charQueue.iterator().hasNext()); | |
238 | ||
239 | /* Insert the first 50 elements */ | |
240 | for (int i = 0; i < 50; i++) { | |
241 | char c = testString.charAt(i); | |
242 | charQueue.put(c); | |
243 | expected.addFirst(c); | |
244 | } | |
245 | LinkedList<Character> actual = new LinkedList<>(); | |
246 | Iterators.addAll(actual, charQueue.iterator()); | |
247 | assertSameElements(expected, actual); | |
248 | ||
249 | /* | |
250 | * Insert more elements, flush the input buffer (should not affect the | |
251 | * iteration). | |
252 | */ | |
253 | for (int i = 50; i < 60; i++) { | |
254 | char c = testString.charAt(i); | |
255 | charQueue.put(c); | |
256 | charQueue.flushInputBuffer(); | |
257 | expected.addFirst(c); | |
258 | } | |
259 | actual = new LinkedList<>(); | |
260 | Iterators.addAll(actual, charQueue.iterator()); | |
261 | assertSameElements(expected, actual); | |
262 | ||
263 | /* Consume the 30 last elements from the queue */ | |
264 | for (int i = 0; i < 30; i++) { | |
265 | charQueue.take(); | |
266 | expected.removeLast(); | |
267 | } | |
268 | actual = new LinkedList<>(); | |
269 | Iterators.addAll(actual, charQueue.iterator()); | |
270 | assertSameElements(expected, actual); | |
271 | ||
272 | /* Now empty the queue */ | |
273 | while (!charQueue.isEmpty()) { | |
274 | charQueue.take(); | |
275 | expected.removeLast(); | |
276 | } | |
277 | assertFalse(charQueue.iterator().hasNext()); | |
278 | } | |
279 | ||
280 | /** | |
281 | * Utility method to verify that two collections contain the exact same | |
282 | * elements, not necessarily in the same iteration order. | |
283 | * | |
284 | * {@link Collection#equals} requires the iteration order to be the same, | |
285 | * which we do not want here. | |
286 | * | |
287 | * Using a {@link Set} or {@link Collection#containsAll} is not sufficient | |
288 | * either, because those will throw away duplicate elements. | |
289 | */ | |
290 | private static <T> void assertSameElements(Collection<T> c1, Collection<T> c2) { | |
291 | assertEquals(HashMultiset.create(c1), HashMultiset.create(c2)); | |
292 | } | |
293 | ||
9d979fda | 294 | /** |
d6e2666b AM |
295 | * Test iterating on the queue while a producer and a consumer threads are |
296 | * using it. The iteration should not affect the elements taken by the | |
297 | * consumer. | |
9d979fda MK |
298 | */ |
299 | @Test | |
fadd7888 | 300 | public void testConcurrentIteration() { |
d6e2666b | 301 | final BufferedBlockingQueue<String> queue = new BufferedBlockingQueue<>(15, 15); |
fadd7888 | 302 | final String poisonPill = "That's all folks!"; |
9d979fda | 303 | |
fadd7888 AM |
304 | /* |
305 | * Convert the test's testBuffer into an array of String, one for each | |
306 | * character. There are probably simpler ways of doing this, but it | |
307 | * would not look as impressive. | |
308 | */ | |
309 | FluentIterable<Character> fi = FluentIterable.from(Chars.asList(testString.toCharArray())); | |
310 | List<String> strings = fi.transform(Functions.toStringFunction()).toList(); | |
9d979fda | 311 | |
fadd7888 AM |
312 | Iterable<Iterable<String>> results = |
313 | runConcurrencyTest(queue, strings, poisonPill, 1, 1, 1); | |
314 | ||
315 | assertEquals(strings, Iterables.getOnlyElement(results)); | |
316 | } | |
9d979fda | 317 | |
fadd7888 AM |
318 | /** |
319 | * Run a concurrency test on a {@link BufferedBlockingQueue}, with the | |
320 | * specified number of producer, consumer and observer/iterator threads. | |
321 | * | |
322 | * The returned value represents the elements consumed by each consumer | |
323 | * thread. Thus, if there is one consumer, the top-level {@link Iterable} | |
324 | * will be of size 1, and the inner one should contain all the elements that | |
325 | * were inserted. | |
326 | * | |
327 | * @param queue | |
328 | * The queue to run the test on | |
329 | * @param testBuffer | |
330 | * The data set to insert in the queue. Every producer will | |
331 | * insert one entire set. | |
332 | * @param poisonPill | |
333 | * The "poison pill" to indicate the end. Simply make sure it is | |
334 | * a element of type <T> that is not present in the 'testBuffer'. | |
335 | * @param nbProducerThreads | |
336 | * Number of producer threads. There should be at least 1. | |
337 | * @param nbConsumerThreads | |
338 | * Number of consumer threads. There should be at least 1. | |
339 | * @param nbObserverThreads | |
340 | * Number of observer threads. It should be >= 0. | |
341 | * @return The consumed elements, as seen by each consumer thread. | |
342 | */ | |
343 | private static <T> Iterable<Iterable<T>> runConcurrencyTest(final BufferedBlockingQueue<T> queue, | |
344 | final List<T> testBuffer, | |
345 | final @NonNull T poisonPill, | |
346 | int nbProducerThreads, | |
347 | int nbConsumerThreads, | |
348 | int nbObserverThreads) { | |
349 | ||
350 | final class ProducerThread implements Runnable { | |
9d979fda MK |
351 | @Override |
352 | public void run() { | |
fadd7888 AM |
353 | for (int i = 0; i < testBuffer.size(); i++) { |
354 | T elem = testBuffer.get(i); | |
355 | if (elem == null) { | |
356 | // TODO replace with List<@NonNull T> once we can | |
357 | throw new IllegalArgumentException(); | |
358 | } | |
359 | queue.put(elem); | |
9d979fda | 360 | } |
d6e2666b AM |
361 | queue.put(poisonPill); |
362 | queue.flushInputBuffer(); | |
9d979fda | 363 | } |
fadd7888 | 364 | } |
9d979fda | 365 | |
fadd7888 AM |
366 | /** |
367 | * The consumer thread will return the elements it read via its Future. | |
368 | * | |
369 | * Note that if there are multiple consumers, there is no guarantee with | |
370 | * regards the contents of an individual one. | |
371 | */ | |
372 | final class ConsumerThread implements Callable<Iterable<T>> { | |
9d979fda | 373 | @Override |
fadd7888 AM |
374 | public Iterable<T> call() { |
375 | List<T> results = new LinkedList<>(); | |
376 | T elem = queue.take(); | |
377 | while (!elem.equals(poisonPill)) { | |
378 | results.add(elem); | |
379 | elem = queue.take(); | |
9d979fda | 380 | } |
fadd7888 | 381 | return results; |
9d979fda | 382 | } |
fadd7888 | 383 | } |
9d979fda | 384 | |
fadd7888 | 385 | final class ObserverThread implements Runnable { |
9d979fda MK |
386 | @Override |
387 | public void run() { | |
fadd7888 AM |
388 | for (int i = 0; i < 5; i++) { |
389 | final Set<T> results = new HashSet<>(); | |
390 | for (T input : queue) { | |
391 | /* | |
392 | * Do something with the element so that this iteration | |
393 | * does not get optimized out. | |
394 | */ | |
d6e2666b | 395 | results.add(input); |
9d979fda MK |
396 | } |
397 | } | |
9d979fda | 398 | } |
fadd7888 | 399 | } |
9d979fda | 400 | |
fadd7888 AM |
401 | if (nbProducerThreads < 1 || nbConsumerThreads < 1 || nbObserverThreads < 0) { |
402 | throw new IllegalArgumentException(); | |
403 | } | |
9d979fda | 404 | |
fadd7888 AM |
405 | final ExecutorService pool = Executors.newFixedThreadPool( |
406 | nbProducerThreads + nbConsumerThreads + nbObserverThreads); | |
9d979fda | 407 | |
fadd7888 AM |
408 | /* Consumed elements, per consumer thread */ |
409 | List<Future<Iterable<T>>> consumedElements = new LinkedList<>(); | |
9d979fda | 410 | |
fadd7888 AM |
411 | for (int i = 0; i < nbProducerThreads; i++) { |
412 | pool.submit(new ProducerThread()); | |
413 | } | |
414 | for (int i = 0; i < nbConsumerThreads; i++) { | |
415 | consumedElements.add(pool.submit(new ConsumerThread())); | |
416 | } | |
417 | for (int i = 0; i < nbObserverThreads; i++) { | |
418 | pool.submit(new ObserverThread()); | |
419 | } | |
420 | ||
421 | List<Iterable<T>> results = new LinkedList<>(); | |
422 | try { | |
423 | /* Convert the Future's to the actual return value */ | |
424 | for (Future<Iterable<T>> future : consumedElements) { | |
425 | Iterable<T> threadResult = future.get(); | |
426 | results.add(threadResult); | |
427 | } | |
428 | ||
429 | pool.shutdown(); | |
430 | boolean success = pool.awaitTermination(2, TimeUnit.MINUTES); | |
431 | if (!success) { | |
432 | throw new InterruptedException(); | |
433 | } | |
434 | ||
435 | } catch (ExecutionException | InterruptedException e) { | |
436 | fail(e.getMessage()); | |
437 | } | |
438 | ||
439 | return results; | |
440 | } | |
47c79d9f | 441 | |
fadd7888 | 442 | } |