1. Introduction
Kafka transactions ensure that either all Kafka operations within a transaction succeed, or none of them are applied. This guarantees:
- Atomicity: A batch of messages is either fully written or not written at all.
- Exactly-once semantics: Messages are delivered only once, even in case of failures.
Kafka achieves this by tagging producers with transactional IDs, enabling Kafka to keep track of the state of transactions even across application crashes or restarts.
2. Enable Transactions via application.properties
To enable Kafka transactions in a Spring Boot microservice acting as a producer, modify your application.properties file:
spring.kafka.producer.transaction-id-prefix=transfer-service-
Here:
- spring.kafka.producer.transaction-id-prefix enables transactional support.
- The value transfer-service- is the prefix used to generate unique transaction IDs for each producer instance.
You can use any value as the prefix, not just transfer-service-.
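Putting it together, a producer-side configuration might look like the following sketch (the broker address and serializer choices are illustrative assumptions, not values prescribed by this article):

```properties
# Illustrative producer configuration (host/port are assumptions)
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Enables transactions; Spring Kafka appends an index per pooled producer
spring.kafka.producer.transaction-id-prefix=transfer-service-
```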
3. Understanding the Transaction ID Prefix
Spring Kafka uses a ProducerFactory to manage producers. When transactions are enabled using the transaction-id-prefix, Spring Kafka:
- Creates a pool of producers.
- Assigns each producer a unique transactional ID using the given prefix plus an auto-generated index.
For example:
transfer-service-0
transfer-service-1
transfer-service-2
The suffix (-0, -1, etc.) is added automatically.
The transaction ID helps Kafka:
- Keep track of which producer sent which message.
- Recover properly after crashes or restarts.
- Avoid duplicating messages or committing incomplete transactions.
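The naming scheme described above can be illustrated with a toy sketch: a shared counter appends an auto-generated index to the configured prefix. (This mirrors the behavior, not Spring Kafka's actual implementation.)

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy sketch of the naming scheme: prefix + auto-generated index
// for each pooled producer. Not Spring Kafka's real implementation.
public class TxIdDemo {
    private static final AtomicInteger index = new AtomicInteger();

    static String nextTransactionalId(String prefix) {
        // Each call hands out the next id: transfer-service-0, -1, -2, ...
        return prefix + index.getAndIncrement();
    }

    public static void main(String[] args) {
        System.out.println(nextTransactionalId("transfer-service-"));
        System.out.println(nextTransactionalId("transfer-service-"));
        System.out.println(nextTransactionalId("transfer-service-"));
    }
}
```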
4. Making Transaction ID Prefix Unique Across Instances
When multiple instances of your microservice run (e.g., in a Kubernetes cluster), each instance must have a unique transaction ID prefix.
To generate a unique prefix dynamically, you can append a random value using Spring Boot’s expression language:
spring.kafka.producer.transaction-id-prefix=transfer-service-${random.value}-
This generates a random alphanumeric string, ensuring each instance gets a unique ID like:
transfer-service-3f6b2a91-
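A similar per-instance prefix can also be derived programmatically at startup; here is a sketch using java.util.UUID (an alternative you would wire up yourself, not something ${random.value} or Spring does for you):

```java
import java.util.UUID;

// Sketch: build a per-instance transactional-id prefix at startup,
// similar in spirit to transfer-service-${random.value}-.
public class UniquePrefix {
    static String uniquePrefix(String base) {
        // First 8 hex characters of a random UUID keep the prefix short
        return base + UUID.randomUUID().toString().substring(0, 8) + "-";
    }

    public static void main(String[] args) {
        System.out.println(uniquePrefix("transfer-service-"));
    }
}
```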
5. Enable Logging for Kafka Transactions
To inspect transactional behavior during runtime, enable debug or trace logs.
For Kafka Transaction Logs:
logging.level.org.springframework.kafka.transaction=DEBUG
For Spring Transaction Infrastructure:
logging.level.org.springframework.transaction=TRACE
6. Enabling Kafka Transactions with Manually Configured ProducerFactory
In scenarios where you configure ProducerFactory manually in Java code (via a @Bean method), enabling transactions requires additional steps.
Let's say you are working in a microservice called TransferService, and you've manually created the producer configuration in a class like KafkaConfig.
Step 1: Define a Member Variable to Hold the Transaction ID Prefix
In your KafkaConfig class, define a variable that reads the prefix from application.properties:
@Value("${spring.kafka.producer.transaction-id-prefix}")
private String transactionalIdPrefix;
Step 2: Add Transactional ID to Producer Config
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    DefaultKafkaProducerFactory<String, String> factory =
            new DefaultKafkaProducerFactory<>(configProps);
    // Setting the prefix is what enables transactions; Spring Kafka appends
    // an index (-0, -1, ...) to form each pooled producer's transactional.id,
    // so there is no need to set ProducerConfig.TRANSACTIONAL_ID_CONFIG directly.
    factory.setTransactionIdPrefix(transactionalIdPrefix);
    return factory;
}
Step 3: Configure KafkaTemplate and KafkaTransactionManager
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}

@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) {
    return new KafkaTransactionManager<>(producerFactory);
}
7. Sending Multiple Kafka Messages Within a Single Transaction
Annotate your message-sending method with @Transactional:
@Transactional("kafkaTransactionManager")
public void transfer(TransferRequest request) {
    kafkaTemplate.send("withdraw-money", request.getWithdrawInfo());
    kafkaTemplate.send("deposit-money", request.getDepositInfo());
}
This ensures all messages in the method are part of one transaction.
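If you prefer not to rely on the annotation, KafkaTemplate also offers a programmatic alternative, executeInTransaction. A sketch, reusing the transfer example's topic names:

```java
// Programmatic alternative to @Transactional: both sends are committed
// together, and any exception thrown inside the callback aborts them both.
public void transfer(TransferRequest request) {
    kafkaTemplate.executeInTransaction(operations -> {
        operations.send("withdraw-money", request.getWithdrawInfo());
        operations.send("deposit-money", request.getDepositInfo());
        return true;
    });
}
```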
8. Controlling Transaction Rollbacks with Checked and Unchecked Exceptions
By default, Spring rolls back the transaction for unchecked exceptions (RuntimeException and Error), but not for checked exceptions.
Custom Rollback:
@Transactional(value = "kafkaTransactionManager", rollbackFor = { SQLException.class, ConnectException.class })
Prevent Rollback:
@Transactional(value = "kafkaTransactionManager", noRollbackFor = { MyIgnoredException.class })
If your microservice is simple, the default behavior of @Transactional is usually sufficient.
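In context, the rollbackFor variant might look like the sketch below; auditRepository.record is a hypothetical call standing in for any work in the same method that can throw a checked SQLException:

```java
@Transactional(value = "kafkaTransactionManager",
               rollbackFor = { SQLException.class })
public void transfer(TransferRequest request) throws SQLException {
    kafkaTemplate.send("withdraw-money", request.getWithdrawInfo());
    // Hypothetical non-Kafka work; a thrown SQLException now also
    // rolls back the Kafka transaction, which it would not by default.
    auditRepository.record(request);
    kafkaTemplate.send("deposit-money", request.getDepositInfo());
}
```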
9. Configuring Kafka Consumers to Read Only Committed Messages
Kafka consumers, by default, read all messages, including those from uncommitted or aborted transactions. To ensure consumers read only successfully committed messages, set the following property:
If using application.properties:
spring.kafka.consumer.isolation-level=read_committed
If configuring ConsumerFactory in Java code:
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
        env.getProperty("spring.kafka.consumer.isolation-level", "read_committed")
           .toLowerCase());
Note: in application.properties, Spring Boot's relaxed binding accepts the enum name in either case (READ_COMMITTED or read_committed). When setting ISOLATION_LEVEL_CONFIG directly in Java, however, the Kafka client accepts only the lowercase string read_committed; an uppercase value causes a startup error, which is why the code above calls toLowerCase().
This ensures your consumers only process messages that are part of successfully committed transactions.
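A consumer-side configuration for this scenario might therefore look like the following sketch (the group id and deserializer choices are illustrative assumptions):

```properties
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=transfer-consumer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Skip records from uncommitted or aborted transactions
spring.kafka.consumer.isolation-level=read_committed
```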