Redisson provides reactive implementations for Redis data structures. Among them, RListReactive
allows reactive interaction with Redis lists, enabling asynchronous operations, backpressure handling, and non-blocking I/O.
This tutorial focuses on:
- Creating a reactive list.
- Adding elements asynchronously.
- Handling insertion order in reactive programming.
- Bulk insertion to maintain order.
1. Setting Up a Reactive List
First, you need a reactive Redisson client and a reactive list:
import org.redisson.api.RListReactive; import org.redisson.api.RedissonClient; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; import java.util.List; import java.util.stream.Collectors; import java.util.stream.LongStream; public class ReactiveListExample { private RListReactive<Long> numberList; public void setup(RedissonClient redisson) { numberList = redisson.getList("noInputList"); } }
Explanation:
RListReactive<Long>
– reactive list withLong
elements.noInputList
– Redis key for the list.
2. Adding Elements Reactively
Method 1: Using flatMap
(Reactive Insertion)
Flux.range(0, 10) .flatMap(i -> numberList.add((long) i)) .then() .subscribe();
Explanation:
Flux.range(0, 10)
generates numbers 0 to 9.- Each number is sent as a separate reactive insert.
flatMap
executes all inserts concurrently.- Network timing may cause out-of-order inserts in Redis.
Key point:
- In reactive programming, operations are asynchronous. Even though the logical sequence is 0 → 9, the actual insertion order may vary due to concurrency and network latency.
- This is not a bug; it’s a normal behavior of reactive streams.
Checking List Size
StepVerifier.create(numberList.size()) .expectNext(10L) .verifyComplete();
- Ensures that all 10 elements are added successfully.
StepVerifier
is useful for testing reactive flows.
3. Observing Insertion Order Issue
If you try to read all elements:
numberList.range(0, -1) .subscribe(System.out::println);
- You may not see numbers in the expected order.
- Reason:
flatMap
sends multiple reactive insertions concurrently. - Elements are inserted nearly simultaneously, and Redis receives them asynchronously.
Solution: Use bulk insertion for maintaining order.
4. Bulk Insertion to Preserve Order
Method 2: Using a Single Collection Insert
List<Long> longList = LongStream.rangeClosed(1, 10) .boxed() .collect(Collectors.toList()); // Clear existing list numberList.delete().block(); // Add all elements in a single call numberList.addAll(longList).block();
Explanation:
LongStream.rangeClosed(1, 10)
generates 1–10 in order.addAll
inserts all elements in a single Redis operation.- Ensures the order is preserved as intended.
block()
is used here for demonstration/testing purposes to wait for completion.
5. Verifying Correct Order
StepVerifier.create(numberList.range(0, -1)) .expectNext(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L) .verifyComplete();
- Confirms that elements are stored in the intended sequence.
- Useful for tests and demonstrations.
6. Key Takeaways
- Reactive insertion may be concurrent:
UsingflatMap
or multiple asynchronous inserts can cause elements to appear out-of-order in Redis. - Bulk insert preserves order:
UseaddAll
to insert a collection atomically while maintaining the sequence. - StepVerifier for testing:
Always verify reactive flows usingStepVerifier
to ensure correct behavior and completion. - Reactive lists are asynchronous:
Operations are non-blocking; completion is not immediate. Always subscribe or block in tests to observe results.