Dead Letter Topic (DLT) in Kafka with Spring Boot

1. Introduction

In a Kafka-based microservices system, consumers continuously read and process messages from topics. Sometimes, however, a consumer encounters problematic messages:

  • Messages that cannot be deserialized due to format mismatches.
  • Messages that throw exceptions during processing.

If we don’t handle these bad messages carefully, they can crash the consumer or trap it in an endless retry loop.

One way to deal with such issues is by using a Dead Letter Topic (DLT) — a special Kafka topic where we can send failed messages instead of endlessly retrying or ignoring them.

2. Why Use Dead Letter Topics?

  • Isolate problematic messages from normal message flow.
  • Inspect bad messages later for debugging.
  • Avoid consumer crashes and endless retry loops.
  • Create a separate flow to deal with invalid messages (e.g., alert someone, fix manually).

3. Basic Idea

  • Producer sends a message to Kafka topic products.
  • Consumer reads and processes the message.
  • If deserialization or processing fails, instead of retrying endlessly:
    • The consumer sends the bad message to another topic, called products.DLT (DLT = Dead Letter Topic).
    • The consumer then skips the faulty message and moves on to the next one.

4. Example Scenario

System Components:

  • Products Microservice: Sends Product Created events as JSON to Kafka topic products.
  • Email Notification Microservice: Listens to the products topic and sends notification emails.
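
For context, the producing side might look roughly like this. This is a sketch, not part of the tutorial's code: the ProductService class name is illustrative, and it assumes a KafkaTemplate<String, ProductCreatedEvent> bean configured with a String key serializer and a JSON value serializer.

package com.example.kafka.service;

import com.example.kafka.model.ProductCreatedEvent;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class ProductService {

    private final KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;

    public ProductService(KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishProductCreated(ProductCreatedEvent event) {
        // Key by product id so events for the same product preserve order
        kafkaTemplate.send("products", event.getId(), event);
    }
}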

Problem:

  • A new Admin Microservice mistakenly sends a plain String (not JSON) to the products topic.
  • The Email Notification Microservice fails to deserialize that message (because it expects JSON).
  • Without a DLT setup, the consumer would fail on that message again and again.

5. Dead Letter Topic Approach

We configure Email Notification Microservice to:

  • Catch deserialization or processing errors.
  • Send bad messages to a products.DLT topic.
  • Move on and continue processing good messages.

6. Step-by-Step Setup in Spring Boot

6.1 Project Setup

Add the following dependencies in pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
</dependencies>
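
Note: Spring Kafka's JsonSerializer and JsonDeserializer need Jackson's jackson-databind on the classpath, and spring-kafka declares it only as an optional dependency. If nothing else in your project pulls it in (for example spring-boot-starter-web), add it explicitly; the version is managed by Spring Boot:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>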

6.2 Create ProductCreatedEvent class

package com.example.kafka.model;

public class ProductCreatedEvent {
    private String id;
    private String name;
    private double price;

    // Jackson needs a no-arg constructor (the implicit default here) and
    // setters to populate these fields during JSON deserialization.
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    // Remaining getters and setters omitted for brevity
}

6.3 Kafka Consumer Configuration

Create a configuration class KafkaConsumerConfig.java:

package com.example.kafka.config;

import com.example.kafka.model.ProductCreatedEvent;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.backoff.FixedBackOff;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // 1. Kafka Listener Factory with Error Handling
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ProductCreatedEvent> kafkaListenerContainerFactory(
            ConsumerFactory<String, ProductCreatedEvent> consumerFactory,
            KafkaTemplate<String, Object> kafkaTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, ProductCreatedEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);

        // Set Error Handler
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 0)); // No retries

        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }

    // 2. Consumer Factory
    @Bean
    public ConsumerFactory<String, ProductCreatedEvent> consumerFactory() {
        JsonDeserializer<ProductCreatedEvent> deserializer = new JsonDeserializer<>(ProductCreatedEvent.class);
        deserializer.addTrustedPackages("*");

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "email-notification-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Wrap both deserializers in ErrorHandlingDeserializer so that a bad
        // payload surfaces as a DeserializationException the error handler can
        // route to the DLT, instead of repeatedly failing inside poll().
        return new DefaultKafkaConsumerFactory<>(props,
                new ErrorHandlingDeserializer<>(new StringDeserializer()),
                new ErrorHandlingDeserializer<>(deserializer));
    }

    // 3. Kafka Template (for Dead Letter Publishing Recoverer)
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // 4. Producer Factory
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
}
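
One caveat about the producer factory above: when the failure is a deserialization error, the recoverer publishes the record's original raw byte[] payload, and JsonSerializer would run those bytes through Jackson (producing a Base64-encoded JSON string) rather than writing them unchanged. The Spring Kafka reference suggests a DelegatingByTypeSerializer for this case. A sketch of a drop-in replacement for producerFactory(), with the extra imports noted in comments:

    // Alternative producer factory: raw byte[] payloads (records that failed
    // deserialization) go through ByteArraySerializer unchanged, while normal
    // events still go through JsonSerializer.
    // Extra imports: org.apache.kafka.common.serialization.ByteArraySerializer,
    //                org.springframework.kafka.support.serializer.DelegatingByTypeSerializer
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new DefaultKafkaProducerFactory<>(configProps,
                new StringSerializer(),
                new DelegatingByTypeSerializer(Map.of(
                        byte[].class, new ByteArraySerializer(),
                        ProductCreatedEvent.class, new JsonSerializer<>())));
    }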

6.4 Kafka Listener

Create a listener in KafkaConsumerService.java:

package com.example.kafka.service;

import com.example.kafka.model.ProductCreatedEvent;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "products", groupId = "email-notification-group")
    public void consume(ProductCreatedEvent event) {
        System.out.println("Consumed Event: " + event.getName());

        // Simulate a processing failure for a specific product name
        // (literal first, so a null name cannot cause a NullPointerException)
        if ("bad-product".equalsIgnoreCase(event.getName())) {
            throw new RuntimeException("Simulated processing error");
        }
    }
}

6.5 Application Properties

Configure Kafka in application.properties. Note that the configuration class above reads only bootstrap-servers (via @Value); the consumer group id and offset reset are already set programmatically in the consumer factory:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=email-notification-group

7. What Happens at Runtime?

  • If deserialization succeeds and processing completes, the consumer behaves normally.
  • If deserialization fails or an exception is thrown inside the listener method:
    • The DefaultErrorHandler invokes the DeadLetterPublishingRecoverer, which publishes the problematic message to products.DLT. (Note that the consumer application performs this publish, not the Kafka broker.)
    • The consumer then moves past the bad record, and the message is kept in the DLT for further analysis or reprocessing.
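
You can reproduce the failure path by mimicking the Admin Microservice's mistake and publishing a plain (non-JSON) string to products. A minimal sketch using the plain Kafka producer client (the class name, payload, and bootstrap address are illustrative):

package com.example.kafka;

import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BadMessageProducer {

    public static void main(String[] args) {
        Map<String, Object> props = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // "not-a-json-payload" is not valid JSON, so the consumer's
        // JsonDeserializer fails and the record should land in products.DLT.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("products", "not-a-json-payload"));
        }
    }
}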

8. Notes

  • FixedBackOff(0L, 0) means no retries: fail immediately and publish to the DLT. To retry before giving up, pass something like new FixedBackOff(1000L, 2) (two retries, one second apart).
  • The default Dead Letter Topic naming convention is <original-topic>.DLT. The record is also sent to the same partition number as the original by default, so the DLT should have at least as many partitions as the source topic (or supply a custom destination resolver).
  • The recoverer attaches headers such as kafka_dlt-exception-message and kafka_dlt-original-topic that describe the failure and where the record came from.
  • You can create a separate consumer for Dead Letter Topics to process failures or raise alerts; a sketch follows below.
  • It’s good practice to monitor DLT topics and alert when messages accumulate.
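
A minimal sketch of such a DLT consumer. The group id and the dltContainerFactory bean are illustrative; that factory should use a lenient value deserializer such as ByteArrayDeserializer, because records that failed JSON deserialization carry raw bytes that are not valid JSON:

package com.example.kafka.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;

@Service
public class DeadLetterConsumerService {

    @KafkaListener(topics = "products.DLT",
                   groupId = "products-dlt-group",
                   containerFactory = "dltContainerFactory")
    public void handleDeadLetter(ConsumerRecord<String, byte[]> record) {
        // DeadLetterPublishingRecoverer stores the failure reason in a header
        Header reason = record.headers().lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE);

        System.out.println("Dead letter payload: " + new String(record.value()));
        if (reason != null) {
            System.out.println("Failure cause: " + new String(reason.value()));
        }
        // From here: alert someone, persist for inspection, or republish after a fix
    }
}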