When designing a Kafka consumer application, you must decide how to handle failures.
Failures can happen for two main reasons:
- Temporary environment issues (like network or database failures)
- Bad or invalid messages (wrong format, corrupted data)
Spring Kafka allows you to classify exceptions into two types:
- Retryable Exceptions: We should retry these because they might succeed later.
- Non-Retryable Exceptions: Retrying is pointless; these messages must be moved out of the way immediately (typically to a dead letter topic).
1. Retryable Exceptions
A retryable exception is one caused by a temporary issue that is likely to resolve itself if we wait and retry after some time.
Retrying the same message later makes sense because the underlying cause might disappear automatically (network restored, database up again, external system healthy again).
Common real-world situations that cause retryable exceptions:
- Database temporarily unavailable due to maintenance, restart, or network partition.
- External API call fails because the service is temporarily down but will come back soon.
- Network latency or packet loss causing timeouts when making downstream calls.
- Kafka Broker temporary unavailability during cluster rebalancing.
Java/Spring exceptions that are usually retryable:
- org.springframework.dao.TransientDataAccessException — temporary database failure.
- java.net.SocketTimeoutException — network call timeout.
- org.springframework.web.client.ResourceAccessException — REST API access failure.
- org.apache.kafka.common.errors.RetriableException — general Kafka retriable error.
Behavior for retryable exceptions:
- The consumer pauses briefly and retries processing the same message.
- Retry attempts are governed by a backoff policy (how many retries, how long to wait); a backoff sketch follows this list.
- If the retries are exhausted and still failing, then the message is moved to a Dead Letter Topic (DLT) for future investigation.
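The backoff policy is expressed as a Spring BackOff object. The configuration in section 3 below uses a FixedBackOff, but an exponential policy is a common alternative. A minimal sketch; the retry count and intervals here are illustrative assumptions, not recommendations:

import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;

// Illustrative values: up to 4 retries, starting at 1 second and doubling
// each time, capped at 10 seconds between attempts.
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(4);
backOff.setInitialInterval(1000L);
backOff.setMultiplier(2.0);
backOff.setMaxInterval(10000L);

// The error handler (introduced in section 3 below) takes the policy directly.
DefaultErrorHandler errorHandler = new DefaultErrorHandler(backOff);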
2. Non-Retryable Exceptions
A non-retryable exception occurs when the problem is with the message content itself, or the failure is a permanent logic problem in your application.
Retrying such a message will not solve anything because the message is fundamentally invalid, corrupted, or logically wrong.
Retrying it 1000 times would still fail.
Common real-world situations that cause non-retryable exceptions:
- Badly formatted JSON that your deserializer cannot parse.
- Missing mandatory fields like ID or name in your payload.
- Unexpected data types — for example, getting a String instead of an expected Integer.
- Schema mismatch — when two teams don’t coordinate on message structure.
- Wrong topic being consumed — getting a completely different type of message.
Java/Spring exceptions that are usually non-retryable:
- org.springframework.kafka.support.serializer.DeserializationException — failed to convert the Kafka byte array to a Java object.
- com.fasterxml.jackson.databind.JsonMappingException — JSON structure mismatch.
- java.lang.IllegalArgumentException — bad arguments or missing fields.
- org.apache.kafka.common.errors.SerializationException — error during message serialization/deserialization.
Behavior for non-retryable exceptions:
- The consumer does NOT retry these messages because retrying is pointless.
- The message is immediately sent to a Dead Letter Topic (DLT) or discarded, depending on your configuration (a routing sketch follows this list).
- This keeps your consumer moving forward without getting stuck on bad messages.
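Where the failed message goes is controlled by the recoverer. A minimal sketch, assuming the kafkaTemplate bean from the configuration in section 3 below; the custom resolver shown simply reproduces the default ".DLT"-suffix convention and is purely illustrative:

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

// By default the recoverer publishes to "<original-topic>.DLT" on the same
// partition; a custom resolver lets you pick the destination per record.
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
        kafkaTemplate,
        (consumerRecord, exception) ->
                new TopicPartition(consumerRecord.topic() + ".DLT", consumerRecord.partition()));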
3. How to Configure Retryable and Non-Retryable Exceptions in Spring Kafka
In Spring Kafka, you can control how different exceptions are treated by configuring a DefaultErrorHandler.
You can explicitly tell Spring:
- “Please retry these exceptions.”
- “Please do NOT retry these exceptions.”
Example configuration:
import com.appsdeveloperblog.ws.emailnotification.error.NotRetryableException;
import com.appsdeveloperblog.ws.emailnotification.error.RetryableException;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaConsumerConfiguration {

    @Autowired
    Environment environment;

    @Bean
    ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                environment.getProperty("spring.kafka.consumer.bootstrap-servers"));
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // ErrorHandlingDeserializer catches deserialization failures so the
        // container can handle them instead of looping on a poison record.
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES,
                environment.getProperty("spring.kafka.consumer.properties.spring.json.trusted.packages"));
        config.put(ConsumerConfig.GROUP_ID_CONFIG,
                environment.getProperty("spring.kafka.consumer.group-id"));
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            KafkaTemplate<String, Object> kafkaTemplate) {
        // Failed records are published to a dead letter topic once retries are
        // exhausted; FixedBackOff(5000, 3) = 3 retries, 5 seconds apart.
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate),
                new FixedBackOff(5000, 3));
        errorHandler.addNotRetryableExceptions(NotRetryableException.class);
        errorHandler.addRetryableExceptions(RetryableException.class);

        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }

    @Bean
    KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                environment.getProperty("spring.kafka.consumer.bootstrap-servers"));
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }
}
RetryableException.java
public class RetryableException extends RuntimeException {

    public RetryableException(String message) {
        super(message);
    }

    public RetryableException(Throwable cause) {
        super(cause);
    }
}
NotRetryableException.java
public class NotRetryableException extends RuntimeException {

    public NotRetryableException(String message) {
        super(message);
    }

    public NotRetryableException(Throwable cause) {
        super(cause);
    }
}
Important points in this configuration:
- DeadLetterPublishingRecoverer is used to move failed messages to a DLT.
- FixedBackOff(5000, 3) configures the retry behavior: wait 5 seconds between attempts, with at most 3 retries after the initial delivery (4 attempts in total).
- addNotRetryableExceptions() defines which exceptions are never retried.
- addRetryableExceptions() defines which exceptions are retried (a sketch of both follows this list).
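Note that DefaultErrorHandler already treats a few exceptions as fatal out of the box, including DeserializationException and ClassCastException, so those are never retried even without explicit configuration. Both methods also accept standard exception types directly, so the custom wrapper classes are optional; a brief sketch, where the choice of exceptions is illustrative:

// Classify standard exceptions directly, instead of (or in addition to)
// the custom wrapper types registered above.
errorHandler.addRetryableExceptions(java.net.SocketTimeoutException.class);
errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);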
Throwing Exceptions in Kafka Listener
Now, inside your Kafka listener, you can catch different types of exceptions and intelligently rethrow them as either RetryableException or NotRetryableException.
@KafkaListener(topics = "email-notifications", groupId = "email-group") public void listen(EmailEvent event) { try { // Try to process the message emailService.sendEmail(event); } catch (HttpServerErrorException e) { // Temporary HTTP error (Retry) throw new RetryableException(e); } catch (IllegalArgumentException e) { // Bad data in message (Do not Retry) throw new NonRetryableException("Invalid Email Event received"); } catch (Exception e) { // Default: Retryable for unexpected issues throw new RetryableException(e); } }
Detailed Explanation:
- If a temporary HTTP server error occurs, we wrap it in a RetryableException and rethrow.
- If the incoming message is invalid (bad payload, bad format), we throw a NotRetryableException.
- For unexpected, unknown exceptions, we default to retryable, assuming the failure might be recoverable.
This ensures maximum reliability while preventing infinite retries for hopeless cases.
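To make the DLT useful in practice, you would typically attach a separate listener to it. A minimal sketch, assuming the default topic naming used by DeadLetterPublishingRecoverer (original topic plus a ".DLT" suffix) and a hypothetical dlt-monitor group id; log is an SLF4J logger:

@KafkaListener(topics = "email-notifications.DLT", groupId = "dlt-monitor")
public void handleDeadLetter(ConsumerRecord<String, Object> record) {
    // The recoverer also copies exception details into record headers
    // (the kafka_dlt-* headers) to aid diagnosis.
    log.error("Dead-lettered record at partition {} offset {}: {}",
            record.partition(), record.offset(), record.value());
}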
4. Real-life Message Flow in a Kafka Consumer (with retryable and non-retryable exceptions)
| Step | Details |
| --- | --- |
| 1 | Consumer reads a new message from the Kafka topic. |
| 2 | Consumer attempts to process the message. |
| 3 | An exception occurs during processing. |
| 4A | If the exception is retryable: retry the message according to the backoff policy (in the configuration above: 3 retries, 5 seconds apart). |
| 4B | If the exception is non-retryable: immediately send the message to a Dead Letter Topic (DLT) without retries. |
| 5 | If retries are exhausted for a retryable exception, the message is also moved to the DLT. |
| 6 | Consumer continues processing the next message without getting stuck. |
5. Why Is It Critical to Separate Retryable and Non-Retryable Exceptions?
Understanding and implementing this separation properly brings huge benefits:
- Prevents infinite loops: Without proper separation, your consumer might get stuck retrying a bad message forever.
- Improves consumer reliability: You can keep your consumers healthy by moving bad data out quickly.
- Helps debugging: All bad messages land in the DLT, which you can inspect separately and fix root causes.
- Optimizes resource usage: Prevents wasting CPU/memory trying to reprocess hopeless messages.
- Allows better system recovery: Retryable errors can be self-healed without human intervention.