ArrayBlockingQueue
BlockingQueue (Java Platform SE 8 ) (oracle.com)
java.util.concurrent.BlockingQueue
BlockingQueue is a thread-safe data structure, so it can be shared between threads in a multithreaded environment without extra synchronization.
In addition to the methods of Queue, BlockingQueue offers a few others. The most important are:
put(): tries to add() an item to the Queue, but waits if necessary while the queue is full.
take(): tries to poll() an item from the Queue, but waits if necessary while there is no item to poll.
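The difference between the waiting and non-waiting methods can be sketched with a queue of capacity 1. This is a minimal illustration, not part of the demo that follows; the class name BlockingQueueMethodsSketch and the method describe() are made up for this example, and the 50 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueMethodsSketch {

    // Hypothetical helper: records what each call returns on a queue of capacity 1.
    static String describe() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder log = new StringBuilder();
        try {
            queue.put("a");                           // space available: returns immediately
            log.append(queue.offer("b")).append(','); // queue full: returns false instead of waiting
            // Timed offer: waits up to 50 ms for space, then gives up and returns false.
            log.append(queue.offer("b", 50, TimeUnit.MILLISECONDS)).append(',');
            log.append(queue.take()).append(',');     // item available: returns "a" immediately
            log.append(queue.poll());                 // queue empty: returns null instead of waiting
        } catch (InterruptedException e) {
            // put(), take() and the timed offer() can be interrupted while waiting.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while using the queue", e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints "false,false,a,null"
    }
}
```

In this sketch put() and take() never actually wait, because the queue has room or an item each time they are called. The waiting only shows up once a second thread is involved, as in the producer/consumer example.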
Example
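import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        ExecutorService producerExecutorService = Executors.newFixedThreadPool(2);
        ExecutorService consumerExecutorService = Executors.newFixedThreadPool(5);
        // Bounded queue shared by the producer and the consumer: at most 10 items.
        BlockingQueue<Integer> taskQueue = new ArrayBlockingQueue<>(10);
        // Only one task is submitted to each pool, so one thread per pool does the work.
        producerExecutorService.submit(() -> {
            while (true) {
                var producer = new Producer(taskQueue);
                producer.produce();
            }
        });
        // shutdown() only stops new submissions; the task above keeps running.
        producerExecutorService.shutdown();
        consumerExecutorService.submit(() -> {
            while (true) {
                var consumer = new Consumer(taskQueue);
                consumer.consume();
            }
        });
        consumerExecutorService.shutdown();
    }
}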
// Thread-safe: the only shared state is the BlockingQueue, which does its own locking.
class Producer {
    private final BlockingQueue<Integer> taskQueue;
    private static final Random randomGenerator = new Random();
    private final String name = UUID.randomUUID().toString();

    public Producer(BlockingQueue<Integer> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void produce() throws InterruptedException {
        Thread.sleep(1000); // simulate the time needed to create an item
        Integer item = randomGenerator.nextInt(1000);
        // put() waits while the queue is full. An InterruptedException is passed on
        // to the caller instead of being swallowed by a catch-all block.
        taskQueue.put(item);
        System.out.printf("Producer %s added item: %s. Queue size: %s%n", name, item, taskQueue.size());
    }
}
// Thread-safe: the only shared state is the BlockingQueue, which does its own locking.
class Consumer {
    private final BlockingQueue<Integer> taskQueue;
    private final String name = UUID.randomUUID().toString();

    public Consumer(BlockingQueue<Integer> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void consume() throws InterruptedException {
        Thread.sleep(2000); // simulate the time needed to process an item
        // take() waits while the queue is empty.
        Integer processedItem = taskQueue.take();
        System.out.printf("Consumer %s processed: %s. Queue size: %s%n", name, processedItem, taskQueue.size());
    }
}
Running it produces output like:
Producer 4cc2acf4-ebf8-4862-8723-d90cd4df19c2 added item: 454. Queue size: 1
Consumer 794cec64-9b18-4898-b07c-1c9ed0ef4c2d processed: 454. Queue size: 0
Producer 2ef21ccb-a2ef-4552-a142-c29bd07dc653 added item: 121. Queue size: 1
Producer b9d4e9c5-0d1a-4ce9-b048-6d0019957c36 added item: 659. Queue size: 2
Consumer 77ccaa29-7f3f-4494-9ddd-c50d4077ca39 processed: 121. Queue size: 1
Producer 3a46012e-82d2-415e-903e-fd10b4c42794 added item: 306. Queue size: 2
Producer 2c13922c-25df-492f-804a-6aee0d5e2e98 added item: 693. Queue size: 3
Consumer 8926c1a1-8b1a-4a3d-9c32-637949ccdb57 processed: 659. Queue size: 2
Producer 8746a306-b36f-4854-a1fe-a5989ecf6ea5 added item: 927. Queue size: 3
Producer d86d4820-aa55-4a92-9999-5760d986363d added item: 219. Queue size: 4
Consumer b4c5419a-ac6c-4119-9816-65743c1d64fd processed: 306. Queue size: 3
Producer a0a973c8-cff4-4129-9b6f-75b361c34b41 added item: 567. Queue size: 4
Producer 7b69e96e-937a-462e-a8fa-974aa52322b7 added item: 885. Queue size: 5
...
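Here, new ArrayBlockingQueue<>(10) creates a queue that holds at most 10 items. The producer adds an item every second while the consumer removes one only every two seconds, so the queue gradually fills up. Once it holds 10 items, put() waits until the consumer takes one out.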
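The capacity limit can be checked in isolation with a small sketch. The class CapacitySketch and its two helper methods are hypothetical names chosen for this illustration, and the 200 ms pause is an arbitrary value assumed to be long enough for the second thread to reach put().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class CapacitySketch {

    // Counts how many of `attempts` items a queue of the given capacity accepts
    // when using the non-waiting offer().
    static int fillWithoutWaiting(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    // Returns true if put() on a full queue is still waiting after a short pause
    // and completes once take() frees a slot.
    static boolean putWaitsUntilTake() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put(1); // the queue is now full
            Thread producer = new Thread(() -> {
                try {
                    queue.put(2); // has to wait: no free slot
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            producer.join(200);                        // give the producer time to reach put()
            boolean stillWaiting = producer.isAlive(); // put(2) has not returned yet
            int first = queue.take();                  // frees the slot, so put(2) can finish
            producer.join();
            return stillWaiting && first == 1 && queue.size() == 1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the sketch", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fillWithoutWaiting(10, 15)); // prints 10
        System.out.println(putWaitsUntilTake());
    }
}
```

With offer() the extra items are rejected, whereas with put() the calling thread is simply parked until space appears. The second behaviour is what keeps a fast producer from outrunning a slow consumer in the demo.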