Learnitweb

Kafka Producer in Spring Boot

In this tutorial, we’ll create a Kafka producer in Spring Boot.

Dependency

To work with Kafka in Spring Boot, add the following dependency:

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

Configuration

Add the following configuration in the properties file, application.properties:

spring.kafka.producer.bootstrap-servers=localhost:9092,localhost:9094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Explaination:

  1. Bootstrap Servers
spring.kafka.producer.bootstrap-servers=localhost:9092,localhost:9094
  • Specifies the Kafka broker addresses where the producer should send messages.
  • Here, two brokers (localhost:9092 and localhost:9094) are listed, indicating a Kafka cluster setup.
  • The producer will try these brokers to establish a connection.

2. Key Serializer

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  • Defines the serializer for the message key.
  • org.apache.kafka.common.serialization.StringSerializer converts the key into a byte array before sending it to Kafka.
  • Typically, keys are used for partitioning messages.

3. Value Serializer

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
  • Defines the serializer for the message value.
  • org.springframework.kafka.support.serializer.JsonSerializer converts Java objects into JSON format before sending them to Kafka.
  • This is useful when sending structured data as JSON instead of plain strings.

Implementation

Following is the

package com.learnitweb.productmicroservice.service;

import com.learnitweb.productmicroservice.rest.CreateProductRes;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@Service
public class ProductServiceImpl implements ProductService{

    KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;

    public ProductServiceImpl(KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate){
        this.kafkaTemplate = kafkaTemplate;
    }
    @Override
    public String createProduct(CreateProductRes productRes) {
        String productId = UUID.randomUUID().toString();

        ProductCreatedEvent productCreatedEvent = new ProductCreatedEvent(productId,
                productRes.getTitle(), productRes.getPrice(), productRes.getQuantity());

        CompletableFuture<SendResult<String, ProductCreatedEvent>> future = kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent);

        future.whenComplete((result, exception) -> {
            if(exception != null) {
                System.out.println("failed to send message");
                exception.printStackTrace();
            } else {
                System.out.println("Message sent successfully: " + result.getRecordMetadata());
            }
        });

        future.join();
        return productId;
    }
}

Following is the important piece of code to send event:

CompletableFuture<SendResult<String, ProductCreatedEvent>> future = kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent);
  • Topic Name: "product-created-events-topic" → This is the Kafka topic where the event is being published.
  • Message Key: productId → This is the key for the message, which helps in partitioning.
  • Message Value: productCreatedEvent → This is the actual event (of type ProductCreatedEvent) that will be sent to Kafka.
  • The send method returns a CompletableFuture<SendResult<K, V>>, which allows asynchronous processing.
  • SendResult<String, ProductCreatedEvent> contains metadata about the message once it is successfully sent.

Note: Here, future.join(); means this is a synchronous call.

Other classes

ProductController.java

package com.learnitweb.productmicroservice.rest;

import com.learnitweb.productmicroservice.service.ProductService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/products")
public class ProductController {
    ProductService productService;

    public ProductController(ProductService productService){
        this.productService = productService;
    }

    @PostMapping
    public ResponseEntity<String> createProduct(@RequestBody CreateProductRes product){
        String productId = productService.createProduct(product);
        return ResponseEntity.status(HttpStatus.CREATED).body(productId);
    }
}

CreateProductRes.java

package com.learnitweb.productmicroservice.rest;

import java.math.BigDecimal;

public class CreateProductRes {
    private String title;
    private BigDecimal price;
    private Integer quantity;

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    public Integer getQuantity() {
        return quantity;
    }

    public void setQuantity(Integer quantity) {
        this.quantity = quantity;
    }
}

ProductCreatedEvent.java

package com.learnitweb.productmicroservice.service;

import java.math.BigDecimal;

public class ProductCreatedEvent {
    private String productId;
    private String title;
    private BigDecimal price;
    private Integer quantity;

    public ProductCreatedEvent(){

    }

    public ProductCreatedEvent(String productId, String title, BigDecimal price, Integer quantity) {
        this.productId = productId;
        this.title = title;
        this.price = price;
        this.quantity = quantity;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    public Integer getQuantity() {
        return quantity;
    }

    public void setQuantity(Integer quantity) {
        this.quantity = quantity;
    }
}

ProductService.java

package com.learnitweb.productmicroservice.service;

import com.learnitweb.productmicroservice.rest.CreateProductRes;

public interface ProductService {
    String createProduct(CreateProductRes productRes);
}

KafkaConfig.java

package com.learnitweb.productmicroservice;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

import java.util.Map;

@Configuration
public class KafkaConfig {

    @Bean
    NewTopic createTopic(){
        return TopicBuilder.name("product-created-events-topic")
                .partitions(3)
                .replicas(3)
                .configs(Map.of("min.insync.replicas","2"))
                .build();
    }
}

Trying it out

Once you start the application, the topic is created:

>kafka-topics.bat --bootstrap-server localhost:9092 --describe
Topic: product-created-events-topic     TopicId: yXka5-lURkeCBPKi9h4kwg PartitionCount: 3       ReplicationFactor: 3    Configs: min.insync.replicas=2,segment.bytes=1073741824
        Topic: product-created-events-topic     Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3      Elr:    LastKnownElr:
        Topic: product-created-events-topic     Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1      Elr:    LastKnownElr:
        Topic: product-created-events-topic     Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2      Elr:    LastKnownElr:

Once the application is started, try the endpoint with postman:

localhost:8080/products

{
    "title": "Car",
    "price":500000,
    "quantity":1
}

In the Kafka consumer, you should see like the following:

>kafka-console-consumer.bat -topic product-created-events-topic --bootstrap-server localhost:9092 --property print.key=true
58d57236-be73-48bc-b2ee-86c76c2ceb7f    {"productId":"58d57236-be73-48bc-b2ee-86c76c2ceb7f","title":"Car","price":500000,"quantity":1}

Send message synchronously

Sending messages synchronously ensures that your microservice application waits for an acknowledgment from all Kafka brokers, guaranteeing that the message is successfully stored in the Kafka topic. This contrasts with asynchronous message sending, where there is a risk of message loss if an error occurs.

Methods to Make Kafka Message Sending Synchronous

There are multiple ways to make Kafka message sending synchronous. One quick method is to call the join method on the CompletableFuture object. However, while this approach is simple, it can be misleading to other developers who might assume the code is asynchronous due to the presence of CompletableFuture keywords. Instead, a more explicit approach is to remove CompletableFuture entirely and use a blocking get method.

Using join Method

If you want a quick way to make Kafka message sending synchronous while keeping the existing structure intact, you can add:

future.join();

This ensures that the thread waits at this line until the CompletableFuture completes. The benefit of this approach is that it can be easily refactored back to an asynchronous process by removing the join method. However, it might confuse developers who assume the presence of CompletableFuture implies an asynchronous process.

Removing CompletableFuture for Explicit Synchronization

A more explicit approach to making Kafka message sending synchronous involves removing CompletableFuture and using the get method. Here are the required changes:

  1. Remove CompletableFuture: Since the send method will no longer be asynchronous, eliminate any references to CompletableFuture.
  2. Remove the whenComplete Block: Since we are not waiting for a CompletableFuture result, we no longer need the whenComplete callback.
  3. Rename the Variable: Change the variable name from future to result to reflect its new synchronous behavior.
  4. Use get Method: To ensure the send method waits for an acknowledgment from the Kafka broker, invoke the get method:
SendResult<String, ProductCreatedEvent> result = kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent).get();