Redisson provides reactive implementations for Redis data structures. Among them, RListReactive allows reactive interaction with Redis lists, enabling asynchronous operations, backpressure handling, and non-blocking I/O.
This tutorial focuses on:
- Creating a reactive list.
- Adding elements asynchronously.
- Handling insertion order in reactive programming.
- Bulk insertion to maintain order.
1. Setting Up a Reactive List
First, you need a reactive Redisson client and a reactive list:
    import org.redisson.api.RListReactive;
    import org.redisson.api.RedissonReactiveClient;
    import reactor.core.publisher.Flux;
    import reactor.test.StepVerifier;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.LongStream;

    public class ReactiveListExample {
        private RListReactive<Long> numberList;

        // getList on the reactive client returns an RListReactive;
        // the blocking RedissonClient would return a plain RList instead.
        public void setup(RedissonReactiveClient redisson) {
            numberList = redisson.getList("noInputList");
        }
    }
Explanation:
- RListReactive<Long>: a reactive list with Long elements.
- noInputList: the Redis key for the list.
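How the reactive client itself is obtained is not shown above. A minimal sketch, assuming a single Redis server at redis://127.0.0.1:6379 (an assumed address) and a recent Redisson 3.x release in which RedissonClient exposes reactive(); the class ReactiveClientSketch and its helper methods are hypothetical names:

```java
import org.redisson.Redisson;
import org.redisson.api.RListReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.config.Config;

public class ReactiveClientSketch {

    // Assumed address: a single local Redis server on the default port.
    static Config localConfig() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        return config;
    }

    // Connects to Redis and returns the reactive view of the client.
    static RedissonReactiveClient connect(Config config) {
        return Redisson.create(config).reactive();
    }

    public static void main(String[] args) {
        RedissonReactiveClient redisson = connect(localConfig());
        // getList on the reactive client returns an RListReactive.
        RListReactive<Long> numberList = redisson.getList("noInputList");
        // Prints the current number of elements stored under the key.
        System.out.println(numberList.size().block());
        redisson.shutdown();
    }
}
```

Older releases expose Redisson.createReactive(config) for the same purpose; check which one the version in use provides.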
2. Adding Elements Reactively
Method 1: Using flatMap (Reactive Insertion)
    Flux.range(0, 10)
        .flatMap(i -> numberList.add((long) i))
        .then()
        .subscribe();
Explanation:
- Flux.range(0, 10) generates numbers 0 to 9.
- Each number is sent as a separate reactive insert.
- flatMap executes all inserts concurrently.
- Network timing may cause out-of-order inserts in Redis.
Key point:
- In reactive programming, operations are asynchronous. Even though the logical sequence is 0 → 9, the order in which the inserts reach Redis may vary, because flatMap runs them concurrently and network latency differs per request.
- This is not a bug; it is the expected behavior of concurrent operators such as flatMap.
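When elements have to be inserted one at a time yet still land in order, one option (not part of the original example) is Reactor's concatMap, which subscribes to each inner publisher only after the previous one has completed. The sketch below simulates the remote list with an in-memory list and artificial delays, so no Redis connection is involved; insertInOrder and fakeAdd are hypothetical names:

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OrderedInsertSketch {

    // Hypothetical helper: applies `insert` to each element sequentially.
    // concatMap waits for one insert to complete before starting the next.
    static <T> Mono<Void> insertInOrder(Flux<T> source, Function<T, Mono<Boolean>> insert) {
        return source.concatMap(insert).then();
    }

    // Stand-in for a remote insert: smaller values take longer to complete,
    // so running these concurrently would tend to reverse the order.
    static Mono<Boolean> fakeAdd(List<Long> target, long value) {
        return Mono.delay(Duration.ofMillis(10 * (5 - value)))
                .map(tick -> target.add(value));
    }

    public static void main(String[] args) {
        List<Long> target = new CopyOnWriteArrayList<>();
        Flux<Long> source = Flux.range(0, 5).map(Integer::longValue);
        // block() only keeps this demo alive until the pipeline finishes.
        insertInOrder(source, v -> fakeAdd(target, v)).block();
        System.out.println(target); // prints [0, 1, 2, 3, 4]
    }
}
```

With the list from section 1, the call would look like insertInOrder(Flux.range(0, 10).map(Integer::longValue), numberList::add). The trade-off is throughput: each insert waits for the previous round trip to finish.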
Checking List Size
    // size() emits an Integer, so the expected value is 10 rather than 10L
    StepVerifier.create(numberList.size())
        .expectNext(10)
        .verifyComplete();
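- Ensures that all 10 elements were added. Because subscribe() returns immediately, run this check only after the insert pipeline has completed (in a test, for example, by calling block() on it instead of subscribe()).
- StepVerifier is useful for testing reactive flows.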
3. Observing Insertion Order Issue
If you try to read all elements:
    numberList.range(0, -1)
        .subscribe(System.out::println);
- You may not see the numbers in the expected order.
- Reason: flatMap sends multiple reactive insertions concurrently.
- The inserts are issued nearly simultaneously, and Redis appends them in the order the commands arrive, which need not match the order in which they were issued.
Solution: use bulk insertion to maintain order.
4. Bulk Insertion to Preserve Order
Method 2: Using a Single Collection Insert
    List<Long> longList = LongStream.rangeClosed(1, 10)
            .boxed()
            .collect(Collectors.toList());
    // Clear existing list
    numberList.delete().block();
    // Add all elements in a single call
    numberList.addAll(longList).block();
Explanation:
- LongStream.rangeClosed(1, 10) generates 1 to 10 in order.
- addAll inserts all elements in a single Redis operation.
- This ensures the order is preserved as intended.
- block() is used here for demonstration/testing purposes to wait for completion.
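Outside of tests, the same clear-then-fill sequence can be composed without block(). A minimal sketch, assuming that RListReactive's delete(), addAll() and readAll() return lazy Monos that only run on subscription; ReplaceSketch and replaceContents are hypothetical names:

```java
import java.util.List;
import org.redisson.api.RListReactive;
import reactor.core.publisher.Mono;

public class ReplaceSketch {

    // Deletes the key, pushes all values in one call, then reads the list back.
    // Each step starts only after the previous one completes, and nothing
    // runs until the returned Mono is subscribed.
    static Mono<List<Long>> replaceContents(RListReactive<Long> list, List<Long> values) {
        return list.delete()
                .then(list.addAll(values))
                .then(list.readAll());
    }
}
```

A caller would subscribe to the result, for example replaceContents(numberList, longList).subscribe(System.out::println), and the stored elements arrive as a single List once all three steps have finished.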
5. Verifying Correct Order
    // range(...) emits the whole range as a single List, so compare against longList
    StepVerifier.create(numberList.range(0, -1))
        .expectNext(longList)
        .verifyComplete();
- Confirms that elements are stored in the intended sequence.
- Useful for tests and demonstrations.
6. Key Takeaways
- Reactive insertion may be concurrent: Using flatMap or multiple asynchronous inserts can cause elements to appear out of order in Redis.
- Bulk insert preserves order: Use addAll to insert a collection atomically while maintaining the sequence.
- StepVerifier for testing: Always verify reactive flows using StepVerifier to ensure correct behavior and completion.
- Reactive lists are asynchronous: Operations are non-blocking; completion is not immediate. Always subscribe or block in tests to observe results.
