In this tutorial, we’ll create a Kafka producer in Spring Boot.
Dependency
To work with Kafka in Spring Boot, add the following dependency:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
Configuration
Add the following configuration in the properties file, application.properties:
spring.kafka.producer.bootstrap-servers=localhost:9092,localhost:9094 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
Explaination:
- Bootstrap Servers
spring.kafka.producer.bootstrap-servers=localhost:9092,localhost:9094
- Specifies the Kafka broker addresses where the producer should send messages.
- Here, two brokers (
localhost:9092andlocalhost:9094) are listed, indicating a Kafka cluster setup. - The producer will try these brokers to establish a connection.
2. Key Serializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
- Defines the serializer for the message key.
org.apache.kafka.common.serialization.StringSerializerconverts the key into a byte array before sending it to Kafka.- Typically, keys are used for partitioning messages.
3. Value Serializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
- Defines the serializer for the message value.
org.springframework.kafka.support.serializer.JsonSerializerconverts Java objects into JSON format before sending them to Kafka.- This is useful when sending structured data as JSON instead of plain strings.
Implementation
Following is the
package com.learnitweb.productmicroservice.service;
import com.learnitweb.productmicroservice.rest.CreateProductRes;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@Service
public class ProductServiceImpl implements ProductService{
KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;
public ProductServiceImpl(KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate){
this.kafkaTemplate = kafkaTemplate;
}
@Override
public String createProduct(CreateProductRes productRes) {
String productId = UUID.randomUUID().toString();
ProductCreatedEvent productCreatedEvent = new ProductCreatedEvent(productId,
productRes.getTitle(), productRes.getPrice(), productRes.getQuantity());
CompletableFuture<SendResult<String, ProductCreatedEvent>> future = kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent);
future.whenComplete((result, exception) -> {
if(exception != null) {
System.out.println("failed to send message");
exception.printStackTrace();
} else {
System.out.println("Message sent successfully: " + result.getRecordMetadata());
}
});
future.join();
return productId;
}
}
Following is the important piece of code to send event:
CompletableFuture<SendResult<String, ProductCreatedEvent>> future = kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent);
- Topic Name:
"product-created-events-topic"→ This is the Kafka topic where the event is being published. - Message Key:
productId→ This is the key for the message, which helps in partitioning. - Message Value:
productCreatedEvent→ This is the actual event (of typeProductCreatedEvent) that will be sent to Kafka. - The
sendmethod returns aCompletableFuture<SendResult<K, V>>, which allows asynchronous processing. SendResult<String, ProductCreatedEvent>contains metadata about the message once it is successfully sent.
Note: Here, future.join(); means this is a synchronous call.
Other classes
ProductController.java
package com.learnitweb.productmicroservice.rest;
import com.learnitweb.productmicroservice.service.ProductService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/products")
public class ProductController {
ProductService productService;
public ProductController(ProductService productService){
this.productService = productService;
}
@PostMapping
public ResponseEntity<String> createProduct(@RequestBody CreateProductRes product){
String productId = productService.createProduct(product);
return ResponseEntity.status(HttpStatus.CREATED).body(productId);
}
}
CreateProductRes.java
package com.learnitweb.productmicroservice.rest;
import java.math.BigDecimal;
public class CreateProductRes {
private String title;
private BigDecimal price;
private Integer quantity;
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public BigDecimal getPrice() {
return price;
}
public void setPrice(BigDecimal price) {
this.price = price;
}
public Integer getQuantity() {
return quantity;
}
public void setQuantity(Integer quantity) {
this.quantity = quantity;
}
}
ProductCreatedEvent.java
package com.learnitweb.productmicroservice.service;
import java.math.BigDecimal;
public class ProductCreatedEvent {
private String productId;
private String title;
private BigDecimal price;
private Integer quantity;
public ProductCreatedEvent(){
}
public ProductCreatedEvent(String productId, String title, BigDecimal price, Integer quantity) {
this.productId = productId;
this.title = title;
this.price = price;
this.quantity = quantity;
}
public String getProductId() {
return productId;
}
public void setProductId(String productId) {
this.productId = productId;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public BigDecimal getPrice() {
return price;
}
public void setPrice(BigDecimal price) {
this.price = price;
}
public Integer getQuantity() {
return quantity;
}
public void setQuantity(Integer quantity) {
this.quantity = quantity;
}
}
ProductService.java
package com.learnitweb.productmicroservice.service;
import com.learnitweb.productmicroservice.rest.CreateProductRes;
public interface ProductService {
String createProduct(CreateProductRes productRes);
}
KafkaConfig.java
package com.learnitweb.productmicroservice;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Bean
NewTopic createTopic(){
return TopicBuilder.name("product-created-events-topic")
.partitions(3)
.replicas(3)
.configs(Map.of("min.insync.replicas","2"))
.build();
}
}
Trying it out
Once you start the application, the topic is created:
>kafka-topics.bat --bootstrap-server localhost:9092 --describe
Topic: product-created-events-topic TopicId: yXka5-lURkeCBPKi9h4kwg PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824
Topic: product-created-events-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Elr: LastKnownElr:
Topic: product-created-events-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Elr: LastKnownElr:
Topic: product-created-events-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Elr: LastKnownElr:
Once the application is started, try the endpoint with postman:
localhost:8080/products
{
"title": "Car",
"price":500000,
"quantity":1
}
In the Kafka consumer, you should see like the following:
>kafka-console-consumer.bat -topic product-created-events-topic --bootstrap-server localhost:9092 --property print.key=true
58d57236-be73-48bc-b2ee-86c76c2ceb7f {"productId":"58d57236-be73-48bc-b2ee-86c76c2ceb7f","title":"Car","price":500000,"quantity":1}
Send message synchronously
Sending messages synchronously ensures that your microservice application waits for an acknowledgment from all Kafka brokers, guaranteeing that the message is successfully stored in the Kafka topic. This contrasts with asynchronous message sending, where there is a risk of message loss if an error occurs.
Methods to Make Kafka Message Sending Synchronous
There are multiple ways to make Kafka message sending synchronous. One quick method is to call the join method on the CompletableFuture object. However, while this approach is simple, it can be misleading to other developers who might assume the code is asynchronous due to the presence of CompletableFuture keywords. Instead, a more explicit approach is to remove CompletableFuture entirely and use a blocking get method.
Using join Method
If you want a quick way to make Kafka message sending synchronous while keeping the existing structure intact, you can add:
future.join();
This ensures that the thread waits at this line until the CompletableFuture completes. The benefit of this approach is that it can be easily refactored back to an asynchronous process by removing the join method. However, it might confuse developers who assume the presence of CompletableFuture implies an asynchronous process.
Removing CompletableFuture for Explicit Synchronization
A more explicit approach to making Kafka message sending synchronous involves removing CompletableFuture and using the get method. Here are the required changes:
- Remove
CompletableFuture: Since thesendmethod will no longer be asynchronous, eliminate any references toCompletableFuture. - Remove the
whenCompleteBlock: Since we are not waiting for aCompletableFutureresult, we no longer need thewhenCompletecallback. - Rename the Variable: Change the variable name from
futuretoresultto reflect its new synchronous behavior. - Use
getMethod: To ensure thesendmethod waits for an acknowledgment from the Kafka broker, invoke thegetmethod:
SendResult<String, ProductCreatedEvent> result = kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent).get();
