In this tutorial, we’ll create a Kafka producer in Spring Boot.
Dependency
To work with Kafka in Spring Boot, add the following dependency:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Configuration
Add the following configuration in the properties file, application.properties:

spring.kafka.producer.bootstrap-servers=localhost:9092,localhost:9094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
Explanation:
1. Bootstrap Servers

spring.kafka.producer.bootstrap-servers=localhost:9092,localhost:9094

- Specifies the Kafka broker addresses where the producer should send messages.
- Here, two brokers (localhost:9092 and localhost:9094) are listed, indicating a Kafka cluster setup.
- The producer will try these brokers to establish a connection.
2. Key Serializer

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

- Defines the serializer for the message key.
- org.apache.kafka.common.serialization.StringSerializer converts the key into a byte array before sending it to Kafka.
- Typically, keys are used for partitioning messages.
3. Value Serializer

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

- Defines the serializer for the message value.
- org.springframework.kafka.support.serializer.JsonSerializer converts Java objects into JSON format before sending them to Kafka.
- This is useful when sending structured data as JSON instead of plain strings.
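The same producer settings can also be expressed in Java configuration instead of application.properties. The following is an optional, minimal sketch rather than part of the tutorial's code: the class name KafkaProducerConfig and its bean methods are illustrative, and it assumes the ProductCreatedEvent class introduced later in the Implementation section.

package com.learnitweb.productmicroservice;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.learnitweb.productmicroservice.service.ProductCreatedEvent;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, ProductCreatedEvent> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Same values as in application.properties
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}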
Implementation
Following is the ProductServiceImpl class, which builds a ProductCreatedEvent and publishes it to Kafka:
package com.learnitweb.productmicroservice.service;

import com.learnitweb.productmicroservice.rest.CreateProductRes;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@Service
public class ProductServiceImpl implements ProductService {

    KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;

    public ProductServiceImpl(KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public String createProduct(CreateProductRes productRes) {
        String productId = UUID.randomUUID().toString();
        ProductCreatedEvent productCreatedEvent = new ProductCreatedEvent(productId,
                productRes.getTitle(), productRes.getPrice(), productRes.getQuantity());

        // Publish the event asynchronously; the future completes when the broker acknowledges the send
        CompletableFuture<SendResult<String, ProductCreatedEvent>> future =
                kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent);

        future.whenComplete((result, exception) -> {
            if (exception != null) {
                System.out.println("failed to send message");
                exception.printStackTrace();
            } else {
                System.out.println("Message sent successfully: " + result.getRecordMetadata());
            }
        });

        // Block until the send completes, making this call effectively synchronous
        future.join();

        return productId;
    }
}
The following is the important piece of code that sends the event:

CompletableFuture<SendResult<String, ProductCreatedEvent>> future =
        kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent);
- Topic Name: "product-created-events-topic" → This is the Kafka topic where the event is published.
- Message Key: productId → This is the key for the message, which helps in partitioning.
- Message Value: productCreatedEvent → This is the actual event (of type ProductCreatedEvent) that will be sent to Kafka.
- The send method returns a CompletableFuture<SendResult<K, V>>, which allows asynchronous processing. SendResult<String, ProductCreatedEvent> contains metadata about the message once it is successfully sent (see the sketch below).
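For example, inside the whenComplete callback you can read the topic, partition, and offset from the record metadata. This is a small sketch of the success branch only; it assumes an import of org.apache.kafka.clients.producer.RecordMetadata, and the log statements are just illustrative.

future.whenComplete((result, exception) -> {
    if (exception == null) {
        // RecordMetadata tells us where the broker stored the record
        RecordMetadata metadata = result.getRecordMetadata();
        System.out.println("topic:     " + metadata.topic());
        System.out.println("partition: " + metadata.partition());
        System.out.println("offset:    " + metadata.offset());
    }
});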
Note: Here, future.join(); blocks the calling thread until the send completes, so the overall call is effectively synchronous.
Other classes
ProductController.java
package com.learnitweb.productmicroservice.rest;

import com.learnitweb.productmicroservice.service.ProductService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/products")
public class ProductController {

    ProductService productService;

    public ProductController(ProductService productService) {
        this.productService = productService;
    }

    @PostMapping
    public ResponseEntity<String> createProduct(@RequestBody CreateProductRes product) {
        String productId = productService.createProduct(product);
        return ResponseEntity.status(HttpStatus.CREATED).body(productId);
    }
}
CreateProductRes.java
package com.learnitweb.productmicroservice.rest;

import java.math.BigDecimal;

public class CreateProductRes {

    private String title;
    private BigDecimal price;
    private Integer quantity;

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    public Integer getQuantity() {
        return quantity;
    }

    public void setQuantity(Integer quantity) {
        this.quantity = quantity;
    }
}
ProductCreatedEvent.java
package com.learnitweb.productmicroservice.service;

import java.math.BigDecimal;

public class ProductCreatedEvent {

    private String productId;
    private String title;
    private BigDecimal price;
    private Integer quantity;

    public ProductCreatedEvent() {
    }

    public ProductCreatedEvent(String productId, String title, BigDecimal price, Integer quantity) {
        this.productId = productId;
        this.title = title;
        this.price = price;
        this.quantity = quantity;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    public Integer getQuantity() {
        return quantity;
    }

    public void setQuantity(Integer quantity) {
        this.quantity = quantity;
    }
}
ProductService.java
package com.learnitweb.productmicroservice.service;

import com.learnitweb.productmicroservice.rest.CreateProductRes;

public interface ProductService {
    String createProduct(CreateProductRes productRes);
}
KafkaConfig.java
package com.learnitweb.productmicroservice;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

import java.util.Map;

@Configuration
public class KafkaConfig {

    @Bean
    NewTopic createTopic() {
        return TopicBuilder.name("product-created-events-topic")
                .partitions(3)
                .replicas(3)
                .configs(Map.of("min.insync.replicas", "2"))
                .build();
    }
}
Trying it out
Once you start the application, the topic is created:
>kafka-topics.bat --bootstrap-server localhost:9092 --describe
Topic: product-created-events-topic    TopicId: yXka5-lURkeCBPKi9h4kwg    PartitionCount: 3    ReplicationFactor: 3    Configs: min.insync.replicas=2,segment.bytes=1073741824
    Topic: product-created-events-topic    Partition: 0    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3    Elr:    LastKnownElr:
    Topic: product-created-events-topic    Partition: 1    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1    Elr:    LastKnownElr:
    Topic: product-created-events-topic    Partition: 2    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2    Elr:    LastKnownElr:
With the application running, try the endpoint with Postman:
localhost:8080/products
{
    "title": "Car",
    "price": 500000,
    "quantity": 1
}
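If you prefer the command line over Postman, the same request can be sent with curl (shown here for a bash-like shell; adjust the quoting if you are on Windows cmd):

curl -X POST http://localhost:8080/products -H "Content-Type: application/json" -d '{"title": "Car", "price": 500000, "quantity": 1}'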
In the Kafka console consumer, you should see output like the following:
>kafka-console-consumer.bat --topic product-created-events-topic --bootstrap-server localhost:9092 --property print.key=true
58d57236-be73-48bc-b2ee-86c76c2ceb7f    {"productId":"58d57236-be73-48bc-b2ee-86c76c2ceb7f","title":"Car","price":500000,"quantity":1}
Send message synchronously
Sending messages synchronously ensures that your microservice waits for the Kafka broker's acknowledgment before continuing, confirming that the message has been written to the Kafka topic. This contrasts with asynchronous sending, where the application moves on immediately and a failed send can go unnoticed (and the message lost) unless it is handled in a callback.
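How strong that acknowledgment is depends on the producer's acks setting. To make the broker wait until all in-sync replicas have the record (which pairs well with the min.insync.replicas=2 topic setting used above), you can optionally set it explicitly; the rest of the tutorial does not require this property:

spring.kafka.producer.acks=all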
Methods to Make Kafka Message Sending Synchronous
There are multiple ways to make Kafka message sending synchronous. One quick method is to call the join method on the CompletableFuture object. While this approach is simple, it can mislead other developers, who might assume the code is asynchronous because a CompletableFuture is present. A more explicit approach is to remove the CompletableFuture entirely and use the blocking get method.
Using the join Method
If you want a quick way to make Kafka message sending synchronous while keeping the existing structure intact, you can add:
future.join();
This ensures that the thread waits at this line until the CompletableFuture completes. The benefit of this approach is that it can be easily refactored back to an asynchronous process by removing the join method. However, it might confuse developers who assume the presence of CompletableFuture implies an asynchronous process.
Removing CompletableFuture for Explicit Synchronization
A more explicit approach to making Kafka message sending synchronous involves removing CompletableFuture and using the get method. Here are the required changes:
- Remove CompletableFuture: Since the send method will no longer be treated as asynchronous, eliminate any references to CompletableFuture.
- Remove the whenComplete Block: Since we are not waiting for a CompletableFuture result, we no longer need the whenComplete callback.
- Rename the Variable: Change the variable name from future to result to reflect its new synchronous behavior.
- Use the get Method: To ensure the send call waits for an acknowledgment from the Kafka broker, invoke the get method:
SendResult<String, ProductCreatedEvent> result = kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent).get();
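Note that get() throws checked exceptions (InterruptedException and ExecutionException), so the one-liner above needs some exception handling around it. The following is a minimal sketch of the synchronous createProduct method, assuming the checked exceptions are wrapped in a RuntimeException so the ProductService interface can stay unchanged (handling them differently is equally valid); the CompletableFuture import can be replaced with java.util.concurrent.ExecutionException.

@Override
public String createProduct(CreateProductRes productRes) {
    String productId = UUID.randomUUID().toString();
    ProductCreatedEvent productCreatedEvent = new ProductCreatedEvent(productId,
            productRes.getTitle(), productRes.getPrice(), productRes.getQuantity());

    try {
        // get() blocks until the broker acknowledges the send (or the send fails)
        SendResult<String, ProductCreatedEvent> result =
                kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent).get();
        System.out.println("Message sent successfully: " + result.getRecordMetadata());
    } catch (InterruptedException | ExecutionException e) {
        // Wrapping in a RuntimeException keeps the ProductService interface unchanged;
        // alternatively, declare the checked exceptions on the interface.
        throw new RuntimeException("Failed to send message", e);
    }

    return productId;
}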