Listening To Message

1. Using KafkaMessageListenerContainer

We can declare in our KafkaConfiguration

@Configuration  
public class KafkaConfiguration {

	public static final String TOPIC_NAME = "mytopic";

	@Bean  
	public ConsumerFactory<String, String> consumerFactory() {  
	  return new DefaultKafkaConsumerFactory<>(Map.of(  
	      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093",  
	      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,  
	      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,  
	      ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class,  
	      ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class  
	  ));  
	}

	@Bean
	public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer() {
	    var properties = new ContainerProperties(TOPIC_NAME);
	
	    properties.setGroupId("some name");
	    
	    var listenerContainer = new KafkaMessageListenerContainer<>(
	        consumerFactory(),
	        properties
	    );
	
	    listenerContainer.setupMessageListener(messageListener());
	    return listenerContainer;
	}
	
	@Bean  
	public MessageListener<String, String> messageListener() {  
	  return data -> System.out.println("listener received: " + data.value());  
	}
}

After doing this, everytime the producer sends a message, we will log out the listener received:

Note that data is of type ConsumerRecord<String, String> which contains the HEADER, TIMESTAMP and so on… In here we're interested in the value.

2. Use @KafkaListener

Using kafka listener we can just do it simpler:

Configuration

@Configuration  
public class KafkaConfiguration {

  public static final String TOPIC_NAME = "mytopic";
	
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(Map.of(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
        ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class,
        ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class
    ));
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> defaultMessageListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    return containerFactory;
  }
}

Service

@Service  
public class DefaultEventListener {  
  @KafkaListener(id = "listener1", topics = TOPIC_NAME, containerFactory = "defaultMessageListenerFactory")  
  public void listen(String event) {  
    System.out.println("Listened events: " + event);  
  }  
}