How to Set Up Kafka in Spring

Apache Kafka is a distributed streaming platform often used for building real-time data pipelines and event-driven architectures. This guide walks you through configuring Kafka Producer and Consumer in a Spring Boot application.


πŸ”§ Dependencies

Ensure you have the following dependencies in your pom.xml (for Maven) or build.gradle (for Gradle):

  • spring-boot-starter
  • spring-kafka
  • lombok (optional)

πŸ“¨ Kafka Producer Configuration

We first configure the Kafka producer, which sends messages to Kafka topics.

1. Configure ProducerFactory

private Map<String, Object> producerConfigs() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return configProps;
}
 
private <T> ProducerFactory<String, T> createProducerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

2. Define the KafkaTemplate

private <T> KafkaTemplate<String, T> createKafkaTemplate() {
    return new KafkaTemplate<>(createProducerFactory());
}
 
@Bean
public KafkaTemplate<String, MessageDTO> kafkaTemplate() {
    return createKafkaTemplate();
}

βœ… KafkaTemplate is a high-level API to send messages to Kafka topics.


3. Define Kafka Topics

Create topics programmatically during the application startup using Spring's NewTopic.

private NewTopic createTopic(String topicName, int partitions, int replicas) {
    return TopicBuilder.name(topicName)
        .partitions(partitions)
        .replicas(replicas)
        .config(TopicConfig.RETENTION_MS_CONFIG, "604800000") // 7 days
        .config(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
        .build();
}
 
@Bean
public NewTopic chatTopic() {
    return createTopic(chatTopicName, chatPartitions, chatReplicas);
}

4. Sending Messages

Create a service to publish messages to a Kafka topic.

@Service
@RequiredArgsConstructor
public class KafkaChatProducerService {
 
    private final KafkaTemplate<String, MessageDTO> kafkaTemplate;
 
    public void sendMessage(MessageDTO message) {
        kafkaTemplate.send("topic-name", message.roomId().toString(), message);
    }
}

πŸ’‘ Make sure to use the correct topic name and keys to ensure message routing works as expected.


πŸ“₯ Kafka Consumer Configuration

To consume messages from Kafka, you must configure a ConsumerFactory and a ConcurrentKafkaListenerContainerFactory.

1. Configure ConsumerFactory

public Map<String, Object> configProperties() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
}
 
private <T> ConsumerFactory<String, T> createConsumerFactory(Class<T> valueType) {
    JsonDeserializer<T> deserializer = new JsonDeserializer<>(valueType);
    deserializer.setUseTypeMapperForKey(true);
    return new DefaultKafkaConsumerFactory<>(
        configProperties(),
        new StringDeserializer(),
        deserializer);
}

πŸ” Using JsonDeserializer helps convert JSON messages back to Java objects.


2. Set up the Kafka Listener Container Factory

private <T> ConcurrentKafkaListenerContainerFactory<String, T> createKafkaListenerFactory(Class<T> valueType) {
    ConcurrentKafkaListenerContainerFactory<String, T> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(createConsumerFactory(valueType));
    return factory;
}
 
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageDTO> messageKafkaListenerFactory() {
    return createKafkaListenerFactory(MessageDTO.class);
}

3. Listening for Messages

Finally, consume messages using the @KafkaListener annotation.

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaMessageConsumer {
 
    @KafkaListener(
        topics = "${kafka-config.chat-topic-name}",
        containerFactory = "messageKafkaListenerFactory"
    )
    public void consumeChat(MessageDTO message) {
        log.info("Message consumed: {}", message);
    }
}

πŸ“Œ Real-World Usage

This Kafka configuration is part of the actual implementation of the Chat Service.


✍️ Author

Le Minh Duc A passionate Java & Spring developer building scalable microservices.


πŸ’¬ "Turning ideas into scalable code."

Β© 2025 LΓͺ Minh Đức. Stay curious.