Introduction
In this tutorial, we will build a Spring Boot microservice that acts as a Kafka consumer. The primary use case is to consume messages that another microservice publishes to a Kafka topic.
Kafka Consumer Characteristics
- Message Persistence:
- Kafka follows a log-based storage model. Once a message is published to a topic, it remains in the topic until Kafka’s retention policy deletes it.
- This means even after a consumer reads a message, that message still exists in the partition for other consumers (from different groups) to read.
- The default retention period is 168 hours (7 days), but this can be configured in Kafka’s settings.
- Multiple Consumers:
- Kafka supports having multiple consumer instances read from the same topic.
- These consumers can either be part of the same group (to share the load) or different groups (to process all messages independently).
- Each consumer in the same group reads messages from exclusive partitions to avoid duplication.
- Ordering Guarantees:
- Kafka guarantees strict ordering of messages within a single partition. If a producer sends messages A, B, C to partition 1, a consumer will always read them in that exact order.
- However, there is no global ordering between partitions. Messages from partition 0 may be read before or after messages in partition 2.
- This design choice helps Kafka achieve high throughput while maintaining predictable behavior within a partition.
- Parallelism and Scaling:
- With a single consumer instance, messages are pulled from all partitions. However, this approach limits throughput and fault tolerance.
- By running multiple instances of the same consumer microservice, Kafka can assign different partitions to different instances.
- For example, with three consumer instances and three partitions, each instance can process one partition’s messages independently and in parallel, leading to improved performance.
- Consumer Groups:
- A consumer group is a collection of consumer instances that work together to consume a topic.
- Kafka ensures that each partition is consumed by only one member of the group.
- This feature enables load balancing: messages are evenly distributed among the consumers, and each one processes its own set of messages.
- If a new instance joins the group, Kafka rebalances the assignment of partitions among all consumers in the group.
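To make these characteristics concrete before we wire anything up with Spring, here is a minimal sketch using the plain Kafka client: every instance started with the same group.id joins one consumer group and is assigned its own subset of partitions. The topic and group names are borrowed from later in this tutorial; the broker address is an assumption.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PlainConsumerGroupDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Every instance started with this group.id shares the topic's partitions.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "product-created-events");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("product-created-event-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Records from the same partition always arrive in order.
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```

Start two copies of this program and Kafka will split the topic's partitions between them; stop one, and its partitions are rebalanced to the surviving instance.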
Configuring application.properties for Kafka Consumer
Now that your project is set up, the next step is to configure Kafka consumer properties using the application.properties file.
1. Set the Server Port
server.port=0
- Setting the port to 0 allows the operating system to assign a random port number at runtime.
- This is especially useful during development or when you want to run multiple instances of your microservice without port conflicts.
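If you later need to know which port was actually assigned, Spring Boot exposes it under the local.server.port property once the embedded server has started. A minimal sketch (the PortLogger bean is ours, added for illustration):

```java
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Component
public class PortLogger {

    private final Environment environment;

    public PortLogger(Environment environment) {
        this.environment = environment;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void logPort() {
        // Spring Boot populates "local.server.port" after the embedded server starts.
        System.out.println("Running on port " + environment.getProperty("local.server.port"));
    }
}
```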
2. Specify Kafka Bootstrap Servers
spring.kafka.consumer.bootstrap-servers=localhost:9092,localhost:9093
- This property lists the initial Kafka brokers your consumer will contact to establish a connection with the Kafka cluster.
- Even though only one broker is required for connection, specifying multiple brokers improves fault tolerance.
- Once connected to a broker, the consumer can automatically discover and communicate with all other brokers in the cluster.
3. Set Key Deserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- Kafka messages consist of key-value pairs serialized as byte arrays.
- This property defines the deserializer class that converts the key from a byte array into its original format.
- In this case, we use the StringDeserializer to convert keys into readable string values.
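For intuition, this is roughly the conversion the consumer performs for every record key it receives; a standalone sketch calling the Kafka client's StringDeserializer directly (the sample key is made up):

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.StringDeserializer;

public class KeyDeserializerDemo {

    public static void main(String[] args) {
        // Kafka transports keys as raw bytes; this simulates an incoming key.
        byte[] rawKey = "product-123".getBytes(StandardCharsets.UTF_8);
        try (StringDeserializer deserializer = new StringDeserializer()) {
            // The consumer applies this conversion to every record key.
            String key = deserializer.deserialize("product-created-event-topic", rawKey);
            System.out.println(key); // product-123
        }
    }
}
```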
4. Set Value Deserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
- This property defines the deserializer class for the message value.
- The JsonDeserializer from Spring Kafka is used here to convert JSON-formatted byte arrays into Java objects.
- This is useful when your producer sends structured data such as POJOs serialized as JSON.
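For JsonDeserializer to work, the target type should be a plain Java object with a no-argument constructor and accessors. Here is a hypothetical sketch of the ProductCreatedEvent used in this tutorial; only the title field is referenced later, so the other fields are assumptions:

```java
import java.math.BigDecimal;

// A hypothetical shape for the event payload the producer publishes as JSON.
// The real class in the producer project may differ.
public class ProductCreatedEvent {

    private String productId;
    private String title;
    private BigDecimal price;

    public ProductCreatedEvent() {
        // JsonDeserializer needs a no-argument constructor.
    }

    public String getProductId() { return productId; }
    public void setProductId(String productId) { this.productId = productId; }

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }

    public BigDecimal getPrice() { return price; }
    public void setPrice(BigDecimal price) { this.price = price; }
}
```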
5. Configure Consumer Group ID
spring.kafka.consumer.group-id=product-created-events
- A consumer group ID uniquely identifies a set of consumers that share the workload of reading from a Kafka topic.
- All consumer instances with the same group ID coordinate to consume messages from different partitions of the same topic without overlapping.
- If a consumer in the group fails or shuts down, Kafka reassigns its partitions to other group members.
6. Define Trusted Packages for Deserialization
spring.kafka.consumer.properties.spring.json.trusted.packages=*
- During JSON deserialization, the JsonDeserializer needs to know which packages it is allowed to load classes from.
- Using * means all packages are trusted, which is acceptable for development.
- For production environments, it is best to restrict this to specific packages (e.g., com.appdeveloperblog.ws.shared.events) to reduce security risks.
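For example, a production configuration might trust only the shared events package mentioned above:
spring.kafka.consumer.properties.spring.json.trusted.packages=com.appdeveloperblog.ws.shared.events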
Kafka Consumer
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Shared event class published by the producer microservice.
import com.appdeveloperblog.ws.shared.events.ProductCreatedEvent;

@Component
@KafkaListener(topics = "product-created-event-topic")
public class ProductCreatedEventHandler {

    private static final Logger logger = LoggerFactory.getLogger(ProductCreatedEventHandler.class);

    @KafkaHandler
    public void handle(ProductCreatedEvent event) {
        logger.info("Received new product created event: {}", event.getTitle());
    }
}
```
In this case:
- The @KafkaListener annotation is placed at the class level, indicating that all methods inside this class listen to messages from the specified topic.
- The handle method is annotated with @KafkaHandler, which marks it as the handler for messages arriving on the product-created-event-topic; Spring routes each message to the handler whose parameter type matches the deserialized payload.
Now, if you want to handle multiple event types in the same class, you can add more methods and annotate each with @KafkaHandler, as shown in the sketch below.
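Here is a hedged sketch of what that could look like; ProductUpdatedEvent is a hypothetical second event type used only for illustration:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Assumed shared event classes; ProductUpdatedEvent is hypothetical.
import com.appdeveloperblog.ws.shared.events.ProductCreatedEvent;
import com.appdeveloperblog.ws.shared.events.ProductUpdatedEvent;

@Component
@KafkaListener(topics = "product-created-event-topic")
public class ProductEventsHandler {

    private static final Logger logger = LoggerFactory.getLogger(ProductEventsHandler.class);

    @KafkaHandler
    public void handle(ProductCreatedEvent event) {
        logger.info("Received product created event: {}", event.getTitle());
    }

    // Invoked only for messages that deserialize to ProductUpdatedEvent.
    @KafkaHandler
    public void handle(ProductUpdatedEvent event) {
        logger.info("Received product updated event: {}", event.getTitle());
    }

    // Optional fallback for payload types that match no other handler.
    @KafkaHandler(isDefault = true)
    public void handleUnknown(Object event) {
        logger.warn("Received event of unknown type: {}", event);
    }
}
```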
Kafka Consumer Spring Bean Configuration
We configured the Kafka consumer for our microservice using the application.properties file. While using application.properties is simple and centralized, it has a major drawback: when specifying key and value deserializers, we must type the fully qualified class names manually.

This can easily lead to typos, and the compiler won't catch them; errors will only surface when the application runs. To avoid this risk, we can configure the Kafka consumer directly in Java code by defining a Spring @Configuration class.

Now, we'll move the Kafka consumer configuration from the application.properties file into a dedicated Java configuration class using Spring beans.
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfiguration {

    @Autowired
    private Environment environment;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        // Property keys match the entries defined earlier in application.properties.
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                environment.getProperty("spring.kafka.consumer.bootstrap-servers"));
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configs.put(JsonDeserializer.TRUSTED_PACKAGES,
                environment.getProperty("spring.kafka.consumer.properties.spring.json.trusted.packages"));
        configs.put(ConsumerConfig.GROUP_ID_CONFIG,
                environment.getProperty("spring.kafka.consumer.group-id"));
        return new DefaultKafkaConsumerFactory<>(configs);
    }
}
```
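Spring Boot will usually auto-configure the listener container factory behind @KafkaListener using this ConsumerFactory bean, but defining it explicitly makes the wiring visible. A minimal sketch of an extra bean method you could add to the same KafkaConsumerConfiguration class, under that assumption:

```java
// Requires: import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

// Wires the custom ConsumerFactory into the containers that back @KafkaListener.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        ConsumerFactory<String, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    return factory;
}
```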