TransferQueue

LinkedTransferQueue (Java Platform SE 8 ) (oracle.com)

java.util.concurrent.LinkedTransferQueue<E>

Is a combination of SynchronousQueue and LinkedBlockingQueue

For transfer(), it blocks until a consumer take it using either take() or poll().

However take() and put() works similar to LinkedBlockingQueue

[!note]
TransferQueue has no capacity since it always can produce max 1 item

ProducerConsumer Pattern

For example:

@ThreadSafe
public class ProducerConsumerDemo {
    public static void main(String[] args) {
        ExecutorService producerExecutorService = Executors.newFixedThreadPool(2);
        ExecutorService consumerExecutorService = Executors.newFixedThreadPool(5);

        TransferQueue<Integer> taskQueue = new LinkedTransferQueue<>();

        producerExecutorService.submit(() -> {
            while (true) {
                var producer = new Producer(taskQueue);
                producer.produce();
            }
        });

        producerExecutorService.shutdown();

        consumerExecutorService.submit(() -> {
            while (true) {
                var consumer = new Consumer(taskQueue);
                consumer.consume();
            }
        });

        consumerExecutorService.shutdown();
    }
}

@ThreadSafe
class Producer {
    private volatile TransferQueue<Integer> taskQueue;
    private static final Random randomGenerator = new Random();
    private final String name = UUID.randomUUID().toString();
    public Producer(TransferQueue<Integer> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void produce() throws InterruptedException {
        Thread.sleep(1000);
        try {
            Integer item = randomGenerator.nextInt(1000);
            this.taskQueue.transfer(item);
            System.out.printf("Producer %s added item: %s. Queue size: %s%n", name, item, taskQueue.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

@ThreadSafe
class Consumer {
    private volatile BlockingQueue<Integer> taskQueue;
    private final String name = UUID.randomUUID().toString();

    public Consumer(BlockingQueue<Integer> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void consume() throws InterruptedException {
        Thread.sleep(2000);
        Integer processedItem = taskQueue.take();
        System.out.printf("Consumer %s processed: %s. Queue size: %s%n", name, processedItem, taskQueue.size());
    }
}

Which will produce

Consumer cf3ff7f3-3a1a-4691-9d0c-e5ccb3433f04 processed: 719. Queue size: 0
Producer ff4720a2-b9d9-4a5c-954e-f5afc2f063d5 added item: 719. Queue size: 0
Consumer a50f2cdb-7396-494d-a0ff-a4fb1219f65b processed: 289. Queue size: 0
Producer 13c1e62e-577d-4d25-a5e8-59b41d01dd62 added item: 289. Queue size: 0
Consumer 9db028c2-c81f-463b-8550-650217130cbc processed: 871. Queue size: 0
Producer aaa25d5e-af4c-4499-838f-7e5e45372ad9 added item: 871. Queue size: 0
Consumer 54e2bfcf-1b1e-4e54-9c89-18950e3d0624 processed: 462. Queue size: 0
Producer 6209e3ec-9070-4960-9405-27b2cdd0003e added item: 462. Queue size: 0