ArrayBlockingQueue
[BlockingQueue (Java Platform SE 8) (oracle.com)](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html)
java.util.concurrent.BlockingQueue
`BlockingQueue` is a thread-safe queue interface: its implementations synchronise access internally, so a single instance can be shared safely between threads in a multithreaded environment.
In addition to the methods inherited from `Queue`, `BlockingQueue` offers a few others. In particular:

- `put()`: tries to `add()` an item to the queue, but waits if necessary while the queue is full.
- `take()`: tries to `poll()` an item from the queue, but waits if necessary while there is no item to poll.
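The waiting behaviour of `put()` and `take()` can be sketched with a queue of capacity 1. This is only a minimal illustration: the class name `PutTakeSketch` and the method `handOff` are hypothetical names chosen for this sketch, and the capacity of 1 is chosen so that the second `put()` has to wait.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PutTakeSketch {

    // Hypothetical helper: hands two items from a producer thread to the caller.
    static List<Integer> handOff() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                queue.put(1); // the queue is empty, so this returns immediately
                queue.put(2); // waits if the queue is still full, until 1 has been taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            received.add(queue.take()); // waits until an item is available
            received.add(queue.take());
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(handOff()); // prints [1, 2]
    }
}
```

Because `ArrayBlockingQueue` orders elements FIFO, the items arrive in the order they were put, regardless of how the two threads are scheduled.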
Example
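```java
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// @ThreadSafe is not part of the JDK; this import assumes the JCIP annotations library.
import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public class ProducerConsumerDemo {
    public static void main(String[] args) {
        ExecutorService producerExecutorService = Executors.newFixedThreadPool(2);
        ExecutorService consumerExecutorService = Executors.newFixedThreadPool(5);
        // Bounded queue shared by the producer and consumer tasks.
        BlockingQueue<Integer> taskQueue = new ArrayBlockingQueue<>(10);

        // The loop never completes normally, so the lambda is resolved as a Callable,
        // which may throw the InterruptedException declared by produce().
        producerExecutorService.submit(() -> {
            while (true) {
                var producer = new Producer(taskQueue);
                producer.produce();
            }
        });
        // shutdown() rejects new tasks; the task already submitted keeps running.
        producerExecutorService.shutdown();

        consumerExecutorService.submit(() -> {
            while (true) {
                var consumer = new Consumer(taskQueue);
                consumer.consume();
            }
        });
        consumerExecutorService.shutdown();
    }
}

@ThreadSafe
class Producer {
    // final rather than volatile: the reference is assigned once, in the constructor.
    private final BlockingQueue<Integer> taskQueue;
    private static final Random randomGenerator = new Random();
    private final String name = UUID.randomUUID().toString();

    public Producer(BlockingQueue<Integer> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void produce() throws InterruptedException {
        Thread.sleep(1000);
        Integer item = randomGenerator.nextInt(1000);
        // put() blocks while the queue is full. The InterruptedException is
        // propagated to the caller instead of being swallowed by a broad catch.
        taskQueue.put(item);
        System.out.printf("Producer %s added item: %s. Queue size: %s%n", name, item, taskQueue.size());
    }
}

@ThreadSafe
class Consumer {
    private final BlockingQueue<Integer> taskQueue;
    private final String name = UUID.randomUUID().toString();

    public Consumer(BlockingQueue<Integer> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void consume() throws InterruptedException {
        Thread.sleep(2000);
        // take() blocks while the queue is empty.
        Integer processedItem = taskQueue.take();
        System.out.printf("Consumer %s processed: %s. Queue size: %s%n", name, processedItem, taskQueue.size());
    }
}
```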
Which produces:
```
Producer 4cc2acf4-ebf8-4862-8723-d90cd4df19c2 added item: 454. Queue size: 1
Consumer 794cec64-9b18-4898-b07c-1c9ed0ef4c2d processed: 454. Queue size: 0
Producer 2ef21ccb-a2ef-4552-a142-c29bd07dc653 added item: 121. Queue size: 1
Producer b9d4e9c5-0d1a-4ce9-b048-6d0019957c36 added item: 659. Queue size: 2
Consumer 77ccaa29-7f3f-4494-9ddd-c50d4077ca39 processed: 121. Queue size: 1
Producer 3a46012e-82d2-415e-903e-fd10b4c42794 added item: 306. Queue size: 2
Producer 2c13922c-25df-492f-804a-6aee0d5e2e98 added item: 693. Queue size: 3
Consumer 8926c1a1-8b1a-4a3d-9c32-637949ccdb57 processed: 659. Queue size: 2
Producer 8746a306-b36f-4854-a1fe-a5989ecf6ea5 added item: 927. Queue size: 3
Producer d86d4820-aa55-4a92-9999-5760d986363d added item: 219. Queue size: 4
Consumer b4c5419a-ac6c-4119-9816-65743c1d64fd processed: 306. Queue size: 3
Producer a0a973c8-cff4-4129-9b6f-75b361c34b41 added item: 567. Queue size: 4
Producer 7b69e96e-937a-462e-a8fa-974aa52322b7 added item: 885. Queue size: 5
...
```
Here, `new ArrayBlockingQueue<>(10)` creates a queue with a fixed capacity of 10 items. Once the queue holds 10 items, `put()` blocks until a consumer removes one.
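The capacity limit can be observed without blocking by using `offer()`, which returns `false` when the queue is full instead of waiting. The following is a small sketch under that assumption; `CapacitySketch` and `offerBeyondCapacity` are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class CapacitySketch {

    // Hypothetical helper: fills a queue to its capacity, then reports
    // whether one additional offer() is accepted.
    static boolean offerBeyondCapacity(int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer(i); // succeeds while there is still room
        }
        // The queue is now full (remainingCapacity() is 0), so offer()
        // returns false immediately, where put() would wait.
        return queue.offer(capacity);
    }

    public static void main(String[] args) {
        System.out.println(offerBeyondCapacity(10)); // prints false
    }
}
```

In the producer/consumer example, the producer uses `put()` rather than `offer()`, so a full queue slows the producer down instead of dropping items.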