Concurrency Attribute
You can set the concurrency for a particular listener by:
@KafkaListener(
groupId = "listener1",
topics = TOPIC_NAME,
containerFactory = "defaultMessageListenerFactory",
concurrency = "6"
)
public void listen(String event) throws InterruptedException {
System.out.println("Listened events: " + event + " " + Thread.currentThread().getName());
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
This means it will:
- For the same groupId, create 6 different consumers
- Let these 6 different consumers process parallely
So if the concurrency = "6"
this means our kafka listener will spread out between 6 partitions and process in 6 threads.
By default, the concurrency is 1, so if you don't provide, 1 consumer will try to listen for 6 paritions. (Single threaded). The event will be blocked.