Learnitweb

Using Redisson Lists as a Message Queue (Producer-Consumer Pattern)

Redisson supports reactive and blocking lists/deques, which can be leveraged to implement a message queue. This allows multiple producers and consumers to communicate asynchronously, similar to a microservice messaging system.

In this tutorial, we will cover:

  • Setting up a Redisson message queue.
  • Implementing a producer that adds messages.
  • Implementing multiple consumers that consume messages.
  • Observing the behavior in a concurrent environment.

1. Setting Up the Message Queue

We will use a blocking deque to simulate a message queue.

private RBlockingDeque<Long> messageQueue;

@BeforeEach
void setup() {
    messageQueue = redisson.getBlockingDeque("messageQueue");
    messageQueue.clear(); // Ensure the queue is empty before starting
}

Explanation:

  • RBlockingDeque<Long> supports blocking operations like take() and put().
  • Blocking deque allows consumers to wait until a new item is available.
  • We clear the queue at the start to avoid leftover items from previous runs.

2. Implementing a Consumer

A consumer continuously polls items from the queue and processes them.

public void consumerOne() {
    Flux.interval(Duration.ofMillis(500))
        .flatMap(i -> Mono.fromCallable(() -> messageQueue.take())) // Blocking take
        .doOnNext(item -> System.out.println("Consumer 1 received: " + item))
        .subscribe();
}

Explanation:

  • messageQueue.take() blocks until an item is available.
  • Flux.interval simulates a polling loop for reactive behavior.
  • Multiple consumers can independently consume items from the same queue.
  • Items are processed as they arrive; in real applications, this could be payments, reports, or tasks.

Adding a Second Consumer

public void consumerTwo() {
    Flux.interval(Duration.ofMillis(500))
        .flatMap(i -> Mono.fromCallable(() -> messageQueue.take()))
        .doOnNext(item -> System.out.println("Consumer 2 received: " + item))
        .subscribe();
}
  • When the second consumer starts, it competes with the first consumer.
  • Items are distributed between consumers automatically.
  • Example: if the queue has 1, 2, 3, 4:
    • Consumer 1 might get 1 and 3
    • Consumer 2 might get 2 and 4

This is similar to load balancing in a microservice setup.


3. Implementing a Producer

The producer continuously adds items to the queue:

public void producer() {
    Flux.range(1, 100_000)
        .delayElements(Duration.ofMillis(500)) // Simulate some delay
        .flatMap(i -> Mono.fromCallable(() -> {
            messageQueue.put(i.longValue());
            return i;
        }))
        .doOnNext(i -> System.out.println("Produced: " + i))
        .subscribe();
}

Explanation:

  • put() adds items to the blocking deque.
  • delayElements() simulates a realistic production rate.
  • Flux range can be configured for the number of messages to produce.

4. Running the System

  1. Start the producer.
  2. Start consumer 1.
  3. Optionally start consumer 2 (or more consumers).

Behavior:

  • Consumers wait if the queue is empty.
  • As the producer adds messages, consumers process them in real-time.
  • Multiple consumers share the load automatically.

Example Flow:

  • Producer adds 1, 2, 3, 4…
  • Consumer 1 receives 1, 3, 5…
  • Consumer 2 receives 2, 4, 6…
  • System continues until all messages are consumed.

5. Observations

  • Blocking behavior: Consumers wait for items if the queue is empty.
  • Load balancing: Multiple consumers share messages automatically.
  • Scalability: You can add more consumers (or microservice instances) dynamically.
  • Real-life use cases: Payment processing, report generation, task processing, event-driven microservices.

6. Summary

  • Redisson blocking lists/deques can act as a message queue.
  • Producer: Adds items using put().
  • Consumer: Consumes items using take() or poll().
  • Multiple consumers: Enable parallel processing and load balancing.
  • Reactive simulation: Use Flux or Mono to integrate with reactive streams.

This approach allows microservices or threads to communicate asynchronously without any extra messaging infrastructure, provided Redis is up and running.