Concurrency Attribute

You can set the concurrency for a particular listener by:

@KafkaListener(  
    groupId = "listener1",  
    topics = TOPIC_NAME,  
    containerFactory = "defaultMessageListenerFactory",  
    concurrency = "6"  
)
public void listen(String event) throws InterruptedException {  
  System.out.println("Listened events: " + event + " " + Thread.currentThread().getName());  
  try {  
    Thread.sleep(30000);  
  } catch (InterruptedException e) {  
    throw new RuntimeException(e);  
  }  
}

This means it will:

  • For the same groupId, create 6 different consumers
  • Let these 6 different consumers process parallely

So if the concurrency = "6" this means our kafka listener will spread out between 6 partitions and process in 6 threads.

By default, the concurrency is 1, so if you don't provide, 1 consumer will try to listen for 6 paritions. (Single threaded). The event will be blocked.