Spring Kafka Quick Start

A quick start to get a Spring Kafka application up and running.

build.gradle

In build.gradle, we need to include the Spring Boot plugin and the spring-kafka dependency.


plugins {
    id 'application'
    id 'org.springframework.boot' version '3.2.0'
}

apply plugin: 'io.spring.dependency-management'

dependencies {
    implementation 'org.springframework.kafka:spring-kafka:3.1.0'
    implementation 'org.springframework.boot:spring-boot-starter-web'
}

...

App.java

App implements ApplicationRunner so that Spring Boot calls run() once the application has started. That is where we send the producer event.

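import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication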
public class App implements ApplicationRunner {  
  
    @Autowired  
    private DefaultEventProducer defaultEventProducer;  
  
    public static void main(String[] args) {  
        SpringApplication.run(App.class, args);  
    }  
  
    @Override  
    public void run(ApplicationArguments args) throws Exception {  
        defaultEventProducer.sendMessage("hello world");  
    }  
}

DefaultEventProducer.java

Producer class that sends the Kafka message.

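import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service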
public class DefaultEventProducer {  
  
  private final KafkaTemplate<String, String> kafkaTemplate;  
  
  public DefaultEventProducer(KafkaTemplate<String, String> kafkaTemplate) {  
    this.kafkaTemplate = kafkaTemplate;  
  }  
  
  public void sendMessage(String message) {  
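    // TOPIC_NAME is defined in KafkaConfiguration (assumed to be in the same package)
    kafkaTemplate.send(KafkaConfiguration.TOPIC_NAME, message);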
  }  
}

DefaultEventListener.java

Listener class that receives the events.

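import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service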
public class DefaultEventListener {  
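  // TOPIC_NAME is defined in KafkaConfiguration (assumed to be in the same package)
  @KafkaListener(id = "listener1", topics = KafkaConfiguration.TOPIC_NAME, containerFactory = "defaultMessageListenerFactory")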
  public void listen(String event) {  
    System.out.println("Listened events: " + event);  
  }  
}

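Here, id is the id of the listener container, and we can choose it freely. Because no groupId is set on the annotation, the id is also used as the consumer group id (idIsGroup defaults to true).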
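
To make the relationship between id and the consumer group id concrete, here is a minimal plain-Java sketch of the precedence as described in the @KafkaListener Javadoc: an explicit groupId wins, otherwise the id is used when idIsGroup is true, otherwise the group.id from the consumer factory applies. The class GroupIdResolution and its resolve method are hypothetical names made up for this illustration. This is a model of the rule, not Spring's actual code.

```java
// Hypothetical model of how the effective consumer group id is chosen.
// It only illustrates the documented precedence; it is not Spring code.
class GroupIdResolution {

    static String resolve(String groupIdAttr, String idAttr, boolean idIsGroup, String factoryGroupId) {
        // 1. An explicit groupId on the annotation takes priority.
        if (groupIdAttr != null && !groupIdAttr.isEmpty()) {
            return groupIdAttr;
        }
        // 2. Otherwise the listener id doubles as the group id when idIsGroup is true.
        if (idIsGroup && idAttr != null && !idAttr.isEmpty()) {
            return idAttr;
        }
        // 3. Otherwise fall back to group.id from the consumer factory (may be null).
        return factoryGroupId;
    }

    public static void main(String[] args) {
        // Mirrors the listener above: id = "listener1", no groupId, idIsGroup left at its default.
        System.out.println(resolve("", "listener1", true, null));
    }
}
```

With the listener in this quick start, the sketch resolves to listener1, which is the group you would expect to see for this consumer.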

KafkaConfiguration.java

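import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration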
public class KafkaConfiguration {  
  
  public static final String TOPIC_NAME = "mytopic";  
  
  @Bean  
  public NewTopic topic() {  
    return TopicBuilder.name(TOPIC_NAME)  
        .partitions(1).replicas(1).build();  
  }  
  
  @Bean  
  public KafkaAdmin admin() {  
    // Only for creating a new topic  
    Map<String, Object> configs = new HashMap<>();  
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");  
    return new KafkaAdmin(configs);  
  }  
  
  
  @Bean  
  public ConsumerFactory<String, String> consumerFactory() {  
    return new DefaultKafkaConsumerFactory<>(Map.of(  
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093",  
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,  
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class  
    ));  
  }  
  
  
  @Bean  
  public ProducerFactory<String, String> producerFactory() {  
    return new DefaultKafkaProducerFactory<>(  
      Map.of(  
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093",  
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,  
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class  
      ));  
  }  
  
  @Bean  
  public KafkaTemplate<String, String> kafkaTemplate() {  
    return new KafkaTemplate<>(producerFactory());  
  }  
  
  @Bean  
  public ConcurrentKafkaListenerContainerFactory<String, String> defaultMessageListenerFactory() {  
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();  
    containerFactory.setConsumerFactory(consumerFactory());  
    return containerFactory;  
  }  
}

Here, @KafkaListener refers to defaultMessageListenerFactory through its containerFactory attribute, and that factory wraps consumerFactory, so the listener uses a consumer with this configuration. We configure the consumer to connect to localhost:9093, which is where the broker is exposed.

Similarly, KafkaTemplate is built from producerFactory, so our producer also publishes to localhost:9093.

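The serialiser and deserialiser settings are compulsory: Kafka stores and transmits keys and values as bytes, so the producer has to serialise each message to binary and the consumer has to deserialise it again.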
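
As a rough illustration of what this serialise/deserialise step means for a String payload, the following plain-Java sketch does the same round trip by hand. It assumes UTF-8, which is the default encoding of Kafka's StringSerializer and StringDeserializer. The class StringRoundTrip and its methods are hypothetical helpers for this example and are not part of the Kafka client.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that mimics a String serialiser/deserialiser pair.
class StringRoundTrip {

    // Producer side: turn the message into the bytes that go on the wire.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the received bytes back into a String.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("hello world");
        System.out.println(wire.length + " bytes on the wire");
        System.out.println("decoded: " + deserialize(wire));
    }
}
```

If the producer and the consumer disagree on the format (for example a JSON serialiser on one side and StringDeserializer on the other), the consumer would still receive bytes but might not decode them into what the producer intended.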

KafkaAdmin lets the application create a new topic, … etc. from the code. It also needs to talk to localhost:9093, since that is where we are going to expose our broker.
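
Since the admin client, the consumer and the producer all point at localhost:9093, it can help to confirm that something is listening on that port before starting the application. The sketch below is one possible way to do that with a plain TCP connect. BrokerPortCheck and isReachable are hypothetical names, and a successful connect only shows that the port is open, not that the broker is healthy or that its advertised listeners are correct.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
class BrokerPortCheck {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws if nothing accepts the connection within the timeout
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:9093 is the address used throughout this quick start
        System.out.println("localhost:9093 reachable: " + isReachable("localhost", 9093, 1000));
    }
}
```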

Docker Compose

version: '3'  
services:  
  zookeeper:  
    image: wurstmeister/zookeeper  
    container_name: zookeeper  
    ports:  
      - "2181:2181"  
    networks:  
      - kafka-net  
  kafka:  
    image: wurstmeister/kafka  
    container_name: kafka  
    ports:  
      - "9093:9093"  
    environment:  
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9093  
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT  
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093  
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE  
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181  
      KAFKA_CREATE_TOPICS: "mytopic:1:1"  
    networks:  
      - kafka-net  
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8090:8080
    depends_on:
      - kafka
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
      KAFKA_CLUSTERS_0_NAME: wizard_test
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    networks:
      - kafka-net
networks:  
  kafka-net:  
    driver: bridge

KAFKA_INTER_BROKER_LISTENER_NAME sets inter.broker.listener.name in Apache Kafka, which names the listener that the brokers use to communicate with each other.

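INSIDE and OUTSIDE are listener names of our own choosing. We can use them here because KAFKA_LISTENER_SECURITY_PROTOCOL_MAP maps each of them to a security protocol (PLAINTEXT in both cases). Containers on kafka-net, such as kafka-ui, connect through INSIDE (kafka:9092), while the application on the host connects through OUTSIDE (localhost:9093).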
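
To show how the listener names relate to addresses, here is a small sketch that splits the KAFKA_ADVERTISED_LISTENERS value from the compose file into a name-to-address map. The class AdvertisedListeners and its parse method are hypothetical and exist only for this illustration. The broker does its own parsing; this only mirrors the NAME://host:port,NAME://host:port shape of the value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: splits "NAME://host:port,NAME://host:port" into name -> "host:port".
class AdvertisedListeners {

    static Map<String, String> parse(String value) {
        Map<String, String> byName = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] parts = entry.trim().split("://", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected NAME://host:port but got: " + entry);
            }
            byName.put(parts[0], parts[1]);
        }
        return byName;
    }

    public static void main(String[] args) {
        Map<String, String> listeners = parse("INSIDE://kafka:9092,OUTSIDE://localhost:9093");
        // Clients inside the Docker network (for example kafka-ui) use the INSIDE address
        System.out.println("Containers on kafka-net use: " + listeners.get("INSIDE"));
        // The Spring application running on the host uses the OUTSIDE address
        System.out.println("Apps on the host use: " + listeners.get("OUTSIDE"));
    }
}
```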

In mytopic:x:y, mytopic is the topic name, x is the number of partitions and y is the number of replicas.
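
The name:partitions:replicas format can be read as in the sketch below. TopicSpec is a hypothetical class written for this example. It is not part of Kafka or of the Docker image, which does its own parsing of KAFKA_CREATE_TOPICS.

```java
// Hypothetical value class for one "name:partitions:replicas" entry.
class TopicSpec {
    final String name;
    final int partitions;
    final int replicas;

    TopicSpec(String name, int partitions, int replicas) {
        this.name = name;
        this.partitions = partitions;
        this.replicas = replicas;
    }

    // Parses a single entry such as "mytopic:1:1"; any extra fields after the third are ignored.
    static TopicSpec parse(String entry) {
        String[] parts = entry.trim().split(":");
        if (parts.length < 3) {
            throw new IllegalArgumentException("Expected name:partitions:replicas but got: " + entry);
        }
        return new TopicSpec(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]));
    }

    public static void main(String[] args) {
        TopicSpec spec = parse("mytopic:1:1");
        System.out.println(spec.name + " partitions=" + spec.partitions + " replicas=" + spec.replicas);
    }
}
```

For the compose file above, this yields the same values as the NewTopic bean in KafkaConfiguration: one partition and one replica for mytopic.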