ArrayBlockingQueue

BlockingQueue (Java Platform SE 8 ) (oracle.com)](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html)

java.util.concurrent.BlockingQueue

BlockingQueue is a datastructure that is synchronise. As a result, it works in a multithreaded environment.

BlockingQueue in additional to Queue offer a few other methods. Especially

  • put() — try to add() an item in the Queue but wait if necessary in case the queue is full
  • take() — try to poll() an item in the Queue but wait if necessary if there is no item to poll.

Example

@ThreadSafe
public class ProducerConsumerDemo {
    public static void main(String[] args) {
        ExecutorService producerExecutorService = Executors.newFixedThreadPool(2);
        ExecutorService consumerExecutorService = Executors.newFixedThreadPool(5);

        BlockingQueue<Integer> taskQueue = new ArrayBlockingQueue<>(10);

        producerExecutorService.submit(() -> {
            while (true) {
                var producer = new Producer(taskQueue);
                producer.produce();
            }
        });

        producerExecutorService.shutdown();

        consumerExecutorService.submit(() -> {
            while (true) {
                var consumer = new Consumer(taskQueue);
                consumer.consume();
            }
        });

        consumerExecutorService.shutdown();
    }
}

@ThreadSafe
class Producer {
    private volatile BlockingQueue<Integer> taskQueue;
    private static final Random randomGenerator = new Random();
    private final String name = UUID.randomUUID().toString();
    public Producer(BlockingQueue<Integer> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void produce() throws InterruptedException {
        Thread.sleep(1000);
        try {
            Integer item = randomGenerator.nextInt(1000);
            this.taskQueue.put(item);
            System.out.printf("Producer %s added item: %s. Queue size: %s%n", name, item, taskQueue.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

@ThreadSafe
class Consumer {
    private volatile BlockingQueue<Integer> taskQueue;
    private final String name = UUID.randomUUID().toString();

    public Consumer(BlockingQueue<Integer> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void consume() throws InterruptedException {
        Thread.sleep(2000);
        Integer processedItem = taskQueue.take();
        System.out.printf("Consumer %s processed: %s. Queue size: %s%n", name, processedItem, taskQueue.size());
    }
}

Which produces:

Producer 4cc2acf4-ebf8-4862-8723-d90cd4df19c2 added item: 454. Queue size: 1
Consumer 794cec64-9b18-4898-b07c-1c9ed0ef4c2d processed: 454. Queue size: 0
Producer 2ef21ccb-a2ef-4552-a142-c29bd07dc653 added item: 121. Queue size: 1
Producer b9d4e9c5-0d1a-4ce9-b048-6d0019957c36 added item: 659. Queue size: 2
Consumer 77ccaa29-7f3f-4494-9ddd-c50d4077ca39 processed: 121. Queue size: 1
Producer 3a46012e-82d2-415e-903e-fd10b4c42794 added item: 306. Queue size: 2
Producer 2c13922c-25df-492f-804a-6aee0d5e2e98 added item: 693. Queue size: 3
Consumer 8926c1a1-8b1a-4a3d-9c32-637949ccdb57 processed: 659. Queue size: 2
Producer 8746a306-b36f-4854-a1fe-a5989ecf6ea5 added item: 927. Queue size: 3
Producer d86d4820-aa55-4a92-9999-5760d986363d added item: 219. Queue size: 4
Consumer b4c5419a-ac6c-4119-9816-65743c1d64fd processed: 306. Queue size: 3
Producer a0a973c8-cff4-4129-9b6f-75b361c34b41 added item: 567. Queue size: 4
Producer 7b69e96e-937a-462e-a8fa-974aa52322b7 added item: 885. Queue size: 5
...

In here, the ArrayBlockingQueue<>(10) limits 10 item in the queue.