In this lecture, we will learn how to perform atomic transactions in Redis, using a reactive approach. Transactions help ensure that a series of operations either all succeed or all fail, which is crucial for scenarios like banking or balance transfers.
1. Problem Overview
- Without transactions, performing multiple operations can lead to inconsistent state if an error occurs midway.
- Example: Transferring money between two accounts.
- If the debit succeeds but the credit fails, the total amount becomes inconsistent.
- Redis provides transaction mechanisms to ensure all operations in a block are atomic.
2. Non-Transactional Example
Setup:
- Assume two users with balances:
long userOneBalance = 100; // User 1 long userTwoBalance = 0; // User 2
- Transfer logic without transaction:
public Mono<Void> transferNonTransactional(Bucket from, Bucket to, long amount) { return from.get() .zipWith(to.get(), (fromBalance, toBalance) -> Tuples.of(fromBalance, toBalance)) .flatMap(tuple -> { if (tuple.getT1() >= amount) { from.set(tuple.getT1() - amount); to.set(tuple.getT2() + amount); } return Mono.empty(); }); }
- Issue: If an error occurs after debiting the first account, the second account might not be credited, leading to inconsistency.
Simulated Error Scenario
- We simulate an error in the pipeline:
.map(balance -> { if (someCondition) throw new RuntimeException("Simulated error"); return balance; });
- Without a transaction:
- User 1 may have decreased balance.
- User 2 may not receive the amount.
- Data is inconsistent.
3. Transactional Approach
Step 1: Create a Transaction Object
- Redis allows creating a transaction object with optional transaction options:
TransactionOptions options = TransactionOptions.defaults(); ReactiveTransaction transaction = redisClient.createTransaction(options);
- Transaction options can specify:
- Timeout
- Retry policies
- Rollback behavior on failure
Step 2: Create Buckets for Users
- Buckets represent reactive access to Redis keys:
RBucketReactive<Long> userOneBucket = transaction.getBucket("user:1", Long.class); RBucketReactive<Long> userTwoBucket = transaction.getBucket("user:2", Long.class);
Step 3: Transfer Logic Within Transaction
- Use the same logic as before but inside the transaction:
public Mono<Void> transferTransactional(RBucketReactive<Long> from, RBucketReactive<Long> to, long amount) { return from.get() .zipWith(to.get(), (fromBalance, toBalance) -> Tuples.of(fromBalance, toBalance)) .flatMap(tuple -> { if (tuple.getT1() >= amount) { from.set(tuple.getT1() - amount); to.set(tuple.getT2() + amount); } return Mono.empty(); }); }
Step 4: Commit or Rollback
- If any operation fails, the transaction is rolled back automatically.
- If all operations succeed, commit:
transaction.execute() .doOnError(e -> { System.out.println("Transaction failed, rolling back"); transaction.rollback(); }) .subscribe();
4. Example Test Case
- Data Setup:
userOneBucket.set(100L).block(); userTwoBucket.set(0L).block();
- Transfer 50 units:
transferTransactional(userOneBucket, userTwoBucket, 50L).block();
- Verify balances:
userOneBucket.get().subscribe(balance -> System.out.println("User 1 Balance: " + balance)); userTwoBucket.get().subscribe(balance -> System.out.println("User 2 Balance: " + balance));
- Output:
User 1 Balance: 50 User 2 Balance: 50
- Simulate Error:
- If an error occurs mid-transfer, the balances remain unchanged, ensuring consistency.
5. Summary
- Transactions in Redis guarantee atomicity: either all operations succeed or none.
- Non-transactional operations may lead to inconsistent states.
- Key steps for using transactions:
- Create a transaction object with options.
- Create reactive buckets for keys.
- Perform operations inside the transaction.
- Commit if successful, rollback on error.
- Use reactive programming for asynchronous, non-blocking transaction handling.