Listening To Message
1. Using KafkaMessageListenerContainer
We can declare in our KafkaConfiguration
@Configuration
public class KafkaConfiguration {
public static final String TOPIC_NAME = "mytopic";
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class,
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class
));
}
@Bean
public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer() {
var properties = new ContainerProperties(TOPIC_NAME);
properties.setGroupId("some name");
var listenerContainer = new KafkaMessageListenerContainer<>(
consumerFactory(),
properties
);
listenerContainer.setupMessageListener(messageListener());
return listenerContainer;
}
@Bean
public MessageListener<String, String> messageListener() {
return data -> System.out.println("listener received: " + data.value());
}
}
After doing this, everytime the producer sends a message, we will log out the listener received:
Note that data
is of type ConsumerRecord<String, String>
which contains the HEADER
, TIMESTAMP
and so on⦠In here we're interested in the value
.
2. Use @KafkaListener
Using kafka listener we can just do it simpler:
Configuration
@Configuration
public class KafkaConfiguration {
public static final String TOPIC_NAME = "mytopic";
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class,
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class
));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> defaultMessageListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(consumerFactory());
return containerFactory;
}
}
Service
@Service
public class DefaultEventListener {
@KafkaListener(id = "listener1", topics = TOPIC_NAME, containerFactory = "defaultMessageListenerFactory")
public void listen(String event) {
System.out.println("Listened events: " + event);
}
}