Learnitweb

Working with Reactive Lists in Redisson

Redisson provides reactive implementations for Redis data structures. Among them, RListReactive allows reactive interaction with Redis lists, enabling asynchronous operations, backpressure handling, and non-blocking I/O.

This tutorial focuses on:

  • Creating a reactive list.
  • Adding elements asynchronously.
  • Handling insertion order in reactive programming.
  • Bulk insertion to maintain order.

1. Setting Up a Reactive List

First, you need a reactive Redisson client and a reactive list:

import org.redisson.api.RListReactive;
import org.redisson.api.RedissonReactiveClient;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ReactiveListExample {

    private RListReactive<Long> numberList;

    // The reactive client (for example, obtained via Redisson.create(config).reactive())
    // returns RListReactive; the plain RedissonClient would return a blocking RList.
    public void setup(RedissonReactiveClient redisson) {
        numberList = redisson.getList("noInputList");
    }
}

Explanation:

  • RListReactive<Long> – reactive list with Long elements.
  • noInputList – Redis key for the list.

2. Adding Elements Reactively

Method 1: Using flatMap (Reactive Insertion)

Flux.range(0, 10)
    .flatMap(i -> numberList.add((long) i))
    .then()
    .subscribe();

Explanation:

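  • Flux.range(0, 10) generates the numbers 0 to 9.
  • Each number is sent as a separate reactive insert.
  • flatMap subscribes to the inner inserts eagerly, so all 10 are in flight at the same time (its default concurrency limit of 256 is well above 10).
  • Because the commands are in flight together, they can reach Redis in a different order from the one in which they were emitted.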

Key point:

  • With flatMap, the inserts run asynchronously and concurrently. Even though the logical sequence is 0 → 9, the actual insertion order may vary with scheduling and network latency.
  • This is not a bug; it is the expected behavior of concurrent asynchronous operations.
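
The effect described above can be reproduced without Redis. The following is a minimal sketch using only the JDK: it is a simulation, not Redisson or Reactor code, and the class InsertOrderSketch, the helper insertAsync, and the 1 to 30 ms random delay are all hypothetical stand-ins for a remote insert. Starting every insert at once mirrors what flatMap does, while chaining each insert after the previous one shows one way the order could be kept.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

class InsertOrderSketch {

    // Hypothetical stand-in for one remote insert: the value reaches the
    // shared list only after a random "network" delay of 1 to 30 ms.
    static CompletableFuture<Void> insertAsync(List<Long> target, long value) {
        long delayMs = ThreadLocalRandom.current().nextLong(1, 31);
        return CompletableFuture.runAsync(
                () -> target.add(value),
                CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS));
    }

    // Starts every insert immediately, so arrival order depends on the delays.
    static List<Long> insertConcurrently(int count) {
        List<Long> target = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            pending.add(insertAsync(target, i));
        }
        // Wait until all inserts have finished before reading the result.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return new ArrayList<>(target);
    }

    // Starts each insert only after the previous one has completed.
    static List<Long> insertSequentially(int count) {
        List<Long> target = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (long i = 0; i < count; i++) {
            final long value = i;
            chain = chain.thenCompose(ignored -> insertAsync(target, value));
        }
        chain.join();
        return new ArrayList<>(target);
    }

    public static void main(String[] args) {
        // The concurrent result always has 10 elements, usually in a shuffled order.
        System.out.println("concurrent: " + insertConcurrently(10));
        // The sequential result is always 0 through 9 in order.
        System.out.println("sequential: " + insertSequentially(10));
    }
}
```

In both cases all 10 values arrive; only the order differs. The sequential variant pays for its ordering by waiting out every delay one after another.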

Checking List Size

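StepVerifier.create(numberList.size())
    .expectNext(10)
    .verifyComplete();

  • size() emits an Integer, so the expected value is 10 rather than 10L.
  • The check passes only if the list was empty beforehand and all 10 inserts have completed; the fire-and-forget subscribe() above does not wait for them.
  • StepVerifier is useful for testing reactive flows.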

3. Observing Insertion Order Issue

If you try to read all elements:

numberList.range(0, -1)
    .subscribe(System.out::println);

  • range(0, -1) emits the whole list as a single List; -1 denotes the last index.
  • The numbers may not appear in the expected order.
  • Reason: flatMap issues the inserts concurrently, so Redis appends them in whatever order the commands arrive.

Solution: Use bulk insertion for maintaining order.


4. Bulk Insertion to Preserve Order

Method 2: Using a Single Collection Insert

List<Long> longList = LongStream.rangeClosed(1, 10)
                                .boxed()
                                .collect(Collectors.toList());

// Clear existing list
numberList.delete().block();

// Add all elements in a single call
numberList.addAll(longList).block();

Explanation:

  • LongStream.rangeClosed(1, 10) generates 1–10 in order.
  • addAll sends the whole collection to Redis in a single call.
  • Because the elements travel together, they are appended in the collection's own order.
  • block() is used here only for demonstration and testing, to wait for completion; avoid it inside a reactive pipeline.
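
To illustrate why a single bulk call keeps its elements in order, here is a small JDK-only model. It is a sketch under stated assumptions, not Redisson code: FakeListServer is a hypothetical in-memory stand-in that, like Redis, executes one command at a time, and pushAll plays the role of one bulk append. Two simulated clients push their batches at about the same time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BatchAppendSketch {

    // Hypothetical in-memory stand-in for a server that runs one command at a time.
    static final class FakeListServer implements AutoCloseable {
        private final List<Long> data = new ArrayList<>();
        private final ExecutorService commandLoop = Executors.newSingleThreadExecutor();

        // One "command": appends the whole batch in argument order.
        CompletableFuture<Integer> pushAll(List<Long> values) {
            List<Long> batch = List.copyOf(values);
            return CompletableFuture.supplyAsync(() -> {
                data.addAll(batch);
                return data.size();
            }, commandLoop);
        }

        // Another "command": returns a snapshot of the stored list.
        CompletableFuture<List<Long>> readAll() {
            return CompletableFuture.supplyAsync(() -> List.copyOf(data), commandLoop);
        }

        @Override
        public void close() {
            commandLoop.shutdown();
        }
    }

    // Two simulated clients send one batch each, concurrently.
    static List<Long> pushFromTwoClients(List<Long> batchA, List<Long> batchB) {
        try (FakeListServer server = new FakeListServer()) {
            CompletableFuture<Void> clientA =
                    CompletableFuture.runAsync(() -> server.pushAll(batchA).join());
            CompletableFuture<Void> clientB =
                    CompletableFuture.runAsync(() -> server.pushAll(batchB).join());
            CompletableFuture.allOf(clientA, clientB).join();
            return server.readAll().join();
        }
    }

    // True if the batch appears in the list as one contiguous, ordered run.
    static boolean keptTogether(List<Long> all, List<Long> batch) {
        return Collections.indexOfSubList(all, batch) >= 0;
    }

    public static void main(String[] args) {
        List<Long> a = List.of(1L, 2L, 3L, 4L, 5L);
        List<Long> b = List.of(101L, 102L, 103L, 104L, 105L);
        List<Long> all = pushFromTwoClients(a, b);
        // Which batch lands first can vary, but neither batch is ever interleaved.
        System.out.println(all + " a intact: " + keptTogether(all, a)
                + ", b intact: " + keptTogether(all, b));
    }
}
```

In this model the order between the two batches is still undetermined, but the order inside each batch is fixed at the moment the single command is sent.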

5. Verifying Correct Order

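StepVerifier.create(numberList.range(0, -1))
    .expectNext(List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
    .verifyComplete();

  • range emits the contents as one List, so the expectation is a single list rather than ten separate values.
  • Confirms that elements are stored in the intended sequence.
  • Useful for tests and demonstrations.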

6. Key Takeaways

  1. Reactive insertion may be concurrent:
    Using flatMap or multiple asynchronous inserts can cause elements to appear out-of-order in Redis.
  2. Bulk insert preserves order:
    Use addAll to insert a collection atomically while maintaining the sequence.
  3. StepVerifier for testing:
    Always verify reactive flows using StepVerifier to ensure correct behavior and completion.
  4. Reactive lists are asynchronous:
    Operations are non-blocking, and nothing runs until the returned publisher is subscribed to. In tests, subscribe (for example via StepVerifier) or block to observe results.