How to Set Up Kafka in Spring
Apache Kafka is a distributed streaming platform often used for building real-time data pipelines and event-driven architectures. This guide walks you through configuring Kafka Producer and Consumer in a Spring Boot application.
π§ Dependencies
Ensure you have the following dependencies in your pom.xml
(for Maven) or build.gradle
(for Gradle):
spring-boot-starter
spring-kafka
lombok
(optional)
π¨ Kafka Producer Configuration
We first configure the Kafka producer, which sends messages to Kafka topics.
1. Configure ProducerFactory
private Map<String, Object> producerConfigs() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return configProps;
}
private <T> ProducerFactory<String, T> createProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
2. Define the KafkaTemplate
private <T> KafkaTemplate<String, T> createKafkaTemplate() {
return new KafkaTemplate<>(createProducerFactory());
}
@Bean
public KafkaTemplate<String, MessageDTO> kafkaTemplate() {
return createKafkaTemplate();
}
β KafkaTemplate is a high-level API to send messages to Kafka topics.
3. Define Kafka Topics
Create topics programmatically during the application startup using Spring's NewTopic
.
private NewTopic createTopic(String topicName, int partitions, int replicas) {
return TopicBuilder.name(topicName)
.partitions(partitions)
.replicas(replicas)
.config(TopicConfig.RETENTION_MS_CONFIG, "604800000") // 7 days
.config(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
.build();
}
@Bean
public NewTopic chatTopic() {
return createTopic(chatTopicName, chatPartitions, chatReplicas);
}
4. Sending Messages
Create a service to publish messages to a Kafka topic.
@Service
@RequiredArgsConstructor
public class KafkaChatProducerService {
private final KafkaTemplate<String, MessageDTO> kafkaTemplate;
public void sendMessage(MessageDTO message) {
kafkaTemplate.send("topic-name", message.roomId().toString(), message);
}
}
π‘ Make sure to use the correct topic name and keys to ensure message routing works as expected.
π₯ Kafka Consumer Configuration
To consume messages from Kafka, you must configure a ConsumerFactory
and a ConcurrentKafkaListenerContainerFactory
.
1. Configure ConsumerFactory
public Map<String, Object> configProperties() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
private <T> ConsumerFactory<String, T> createConsumerFactory(Class<T> valueType) {
JsonDeserializer<T> deserializer = new JsonDeserializer<>(valueType);
deserializer.setUseTypeMapperForKey(true);
return new DefaultKafkaConsumerFactory<>(
configProperties(),
new StringDeserializer(),
deserializer);
}
π Using
JsonDeserializer
helps convert JSON messages back to Java objects.
2. Set up the Kafka Listener Container Factory
private <T> ConcurrentKafkaListenerContainerFactory<String, T> createKafkaListenerFactory(Class<T> valueType) {
ConcurrentKafkaListenerContainerFactory<String, T> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(createConsumerFactory(valueType));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageDTO> messageKafkaListenerFactory() {
return createKafkaListenerFactory(MessageDTO.class);
}
3. Listening for Messages
Finally, consume messages using the @KafkaListener
annotation.
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaMessageConsumer {
@KafkaListener(
topics = "${kafka-config.chat-topic-name}",
containerFactory = "messageKafkaListenerFactory"
)
public void consumeChat(MessageDTO message) {
log.info("Message consumed: {}", message);
}
}
π Real-World Usage
This Kafka configuration is part of the actual implementation of the Chat Service.
βοΈ Author
Le Minh Duc A passionate Java & Spring developer building scalable microservices.