Spring Kafka Quick Start
A quick start to get a Spring Kafka application up and running.
build.gradle
For build.gradle, we need to include the org.springframework.boot plugin and the spring-kafka dependency.
plugins {
    id 'application'
    id 'org.springframework.boot' version '3.2.0'
}

apply plugin: 'io.spring.dependency-management'

dependencies {
    implementation 'org.springframework.kafka:spring-kafka:3.1.0'
    implementation 'org.springframework.boot:spring-boot-starter-web'
}
...
App.java
We implement ApplicationRunner
so that our app executes run()
on startup and sends the producer events.
@SpringBootApplication
public class App implements ApplicationRunner {

    @Autowired
    private DefaultEventProducer defaultEventProducer;

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        defaultEventProducer.sendMessage("hello world");
    }
}
DefaultEventProducer.java
Producer class that sends the Kafka message.
@Service
public class DefaultEventProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public DefaultEventProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC_NAME, message);
    }
}
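If you want to confirm delivery, send() in spring-kafka 3.x returns a CompletableFuture<SendResult<...>> you can attach a callback to. A minimal sketch of an alternative send method (sendMessageLogged is just an illustrative name):

public void sendMessageLogged(String message) {
    kafkaTemplate.send(TOPIC_NAME, message)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    // The broker rejected the record or was unreachable
                    System.err.println("Failed to send: " + ex.getMessage());
                } else {
                    // RecordMetadata tells us where the record landed
                    System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                            + " at offset " + result.getRecordMetadata().offset());
                }
            });
}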
DefaultEventListener.java
Listener class to receive events
@Service
public class DefaultEventListener {

    @KafkaListener(id = "listener1", topics = TOPIC_NAME, containerFactory = "defaultMessageListenerFactory")
    public void listen(String event) {
        System.out.println("Listened events: " + event);
    }
}
Here, the id also acts as the consumer group id (as long as no groupId is set explicitly), and we can pick any value we like for it.
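If you prefer not to reuse the listener id as the consumer group, @KafkaListener also accepts an explicit groupId. A sketch (the group name is only an example):

@KafkaListener(id = "listener1", groupId = "quickstart-group",
        topics = TOPIC_NAME, containerFactory = "defaultMessageListenerFactory")
public void listen(String event) {
    System.out.println("Listened events: " + event);
}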
KafkaConfiguration.java
@Configuration
public class KafkaConfiguration {

    public static final String TOPIC_NAME = "mytopic";

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(TOPIC_NAME)
                .partitions(1).replicas(1).build();
    }

    @Bean
    public KafkaAdmin admin() {
        // Only for creating a new topic
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
        return new KafkaAdmin(configs);
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
        ));
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
        ));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> defaultMessageListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }
}
Here, the consumerFactory
is wired into the container factory named in @KafkaListener,
so the listener consumes with that configuration. We make sure the consumer is configured to connect to localhost:9093,
which is where the broker is exposed.
Similarly, for KafkaTemplate,
we also want our producer to send to localhost:9093.
The serialiser
and deserialiser
are required to convert the message to and from its binary wire format.
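If the payload is a POJO rather than a plain String, spring-kafka also ships JSON (de)serialisers in org.springframework.kafka.support.serializer that plug into the same factories. A sketch, assuming a hypothetical OrderEvent payload class:

@Bean
public ProducerFactory<String, OrderEvent> orderProducerFactory() {
    return new DefaultKafkaProducerFactory<>(
            Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"),
            new StringSerializer(),               // key stays a String
            new JsonSerializer<OrderEvent>());    // value serialised as JSON
}

@Bean
public ConsumerFactory<String, OrderEvent> orderConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
            Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"),
            new StringDeserializer(),
            new JsonDeserializer<>(OrderEvent.class)); // target type for deserialisation
}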
KafkaAdmin
lets us create topics (and perform other admin operations) from code. It also needs to point at localhost:9093,
since that is where our broker will be.
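If you later need more than one topic, one option (assuming spring-kafka 2.7+) is to declare them together in a KafkaAdmin.NewTopics bean, which KafkaAdmin picks up the same way as single NewTopic beans; the extra topic names below are purely illustrative:

@Bean
public KafkaAdmin.NewTopics extraTopics() {
    return new KafkaAdmin.NewTopics(
            TopicBuilder.name("mytopic-retry").partitions(1).replicas(1).build(),
            TopicBuilder.name("mytopic-dlt").partitions(1).replicas(1).build());
}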
Docker Compose
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka-net
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "mytopic:1:1"
    networks:
      - kafka-net
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8090:8080
    depends_on:
      - kafka
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
      KAFKA_CLUSTERS_0_NAME: wizard_test
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    networks:
      - kafka-net
networks:
  kafka-net:
    driver: bridge
KAFKA_INTER_BROKER_LISTENER_NAME
sets inter.broker.listener.name
in Apache Kafka, which is used primarily for internal broker-to-broker communication.
We can use the INSIDE
and OUTSIDE
listener names here because they are defined in KAFKA_LISTENER_SECURITY_PROTOCOL_MAP.
In mytopic:x:y,
mytopic is the topic name, x
is the number of partitions and y
is the replication factor.