Redisson provides blocking queues and deques, along with reactive variants, which can be used to implement a message queue. Multiple producers and consumers can then communicate asynchronously through Redis, much as microservices do through a messaging system.
In this tutorial, we will cover:
- Setting up a Redisson message queue.
- Implementing a producer that adds messages.
- Implementing multiple consumers that consume messages.
- Observing the behavior in a concurrent environment.
1. Setting Up the Message Queue
We will use a blocking deque to simulate a message queue.
    private RBlockingDeque<Long> messageQueue;

    @BeforeEach
    void setup() {
        messageQueue = redisson.getBlockingDeque("messageQueue");
        messageQueue.clear(); // Ensure the queue is empty before starting
    }
Explanation:
- RBlockingDeque<Long> supports blocking operations like take() and put().
- A blocking deque allows consumers to wait until a new item is available.
- We clear the queue at the start to avoid leftover items from previous runs.
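The waiting behavior described above can be tried without a Redis server. Redisson's RBlockingDeque extends java.util.concurrent.BlockingDeque, so the sketch below is written against that interface and uses the JDK's in-memory LinkedBlockingDeque as a stand-in. The class and method names (BlockingTakeSketch, takeAfterDelayedPut) are hypothetical and exist only for illustration.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

final class BlockingTakeSketch {

    // Starts a consumer that waits in take(), publishes `value` after a short
    // pause, and returns whatever the consumer received.
    static long takeAfterDelayedPut(BlockingDeque<Long> queue, long value) {
        CompletableFuture<Long> received = CompletableFuture.supplyAsync(() -> {
            try {
                return queue.take(); // waits here while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        });
        try {
            Thread.sleep(200); // give the consumer time to start waiting
            queue.put(value);  // makes an item available to the waiting consumer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while producing", e);
        }
        // Fail instead of hanging if nothing arrives within two seconds.
        return received.orTimeout(2, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        // In-memory stand-in (assumption); with Redisson this would be
        // redisson.getBlockingDeque("messageQueue").
        BlockingDeque<Long> queue = new LinkedBlockingDeque<>();
        System.out.println("Received: " + takeAfterDelayedPut(queue, 42L));
    }
}
```

Because the method only depends on the BlockingDeque interface, the same call should also accept the messageQueue field from the setup code, in which case the waiting happens against Redis rather than in memory.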
2. Implementing a Consumer
A consumer continuously polls items from the queue and processes them.
    public void consumerOne() {
        Flux.interval(Duration.ofMillis(500))
            .flatMap(i -> Mono.fromCallable(() -> messageQueue.take())) // Blocking take
            .doOnNext(item -> System.out.println("Consumer 1 received: " + item))
            .subscribe();
    }
Explanation:
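- messageQueue.take() blocks the calling thread until an item is available.
- Flux.interval emits a tick every 500 ms, and each tick triggers one take(), which simulates a polling loop. Because take() is a blocking call, production code would normally run it on a scheduler meant for blocking work, such as Schedulers.boundedElastic().
- Multiple consumers can independently consume items from the same queue.
- Items are processed as they arrive; in real applications, these could be payments, reports, or tasks.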
Adding a Second Consumer
    public void consumerTwo() {
        Flux.interval(Duration.ofMillis(500))
            .flatMap(i -> Mono.fromCallable(() -> messageQueue.take()))
            .doOnNext(item -> System.out.println("Consumer 2 received: " + item))
            .subscribe();
    }
- When the second consumer starts, it competes with the first consumer for items.
- Each item is delivered to exactly one consumer, so the work is split between them automatically.
- Example: if the queue has 1, 2, 3, 4:
  - Consumer 1 might get 1 and 3
  - Consumer 2 might get 2 and 4
This is similar to load balancing in a microservice setup.
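The competing-consumer behavior can be checked locally with a small sketch. It again assumes the JDK's LinkedBlockingDeque as an in-memory stand-in for the Redisson deque, and it uses plain threads instead of Flux to keep the example dependency-free. CompetingConsumersSketch and consumeAll are hypothetical names. How the items are split between consumers depends on thread timing, so the sketch makes no claim about which consumer receives which item, only that every item is received exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

final class CompetingConsumersSketch {

    // Starts `consumerCount` threads that all take() from the same queue and
    // returns one list per consumer with the items that consumer received.
    static List<List<Long>> consumeAll(BlockingDeque<Long> queue,
                                       int consumerCount,
                                       int expectedItems) {
        CountDownLatch remaining = new CountDownLatch(expectedItems);
        List<List<Long>> perConsumer = new ArrayList<>();
        List<Thread> workers = new ArrayList<>();

        for (int c = 1; c <= consumerCount; c++) {
            List<Long> received = new ArrayList<>(); // written by one worker only
            perConsumer.add(received);
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        received.add(queue.take()); // each item goes to one taker
                        remaining.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop signal from the caller
                }
            }, "consumer-" + c);
            worker.setDaemon(true);
            worker.start();
            workers.add(worker);
        }

        try {
            if (!remaining.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for items");
            }
            for (Thread w : workers) {
                w.interrupt(); // wake workers that are waiting in take()
            }
            for (Thread w : workers) {
                w.join();      // after join, each worker's list is safe to read
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        BlockingDeque<Long> queue = new LinkedBlockingDeque<>();
        for (long i = 1; i <= 4; i++) {
            queue.add(i);
        }
        List<List<Long>> result = consumeAll(queue, 2, 4);
        System.out.println("Consumer 1 received: " + result.get(0));
        System.out.println("Consumer 2 received: " + result.get(1));
    }
}
```

Running it several times may show different splits, including one consumer draining the queue before the other has started.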
3. Implementing a Producer
The producer continuously adds items to the queue:
    public void producer() {
        Flux.range(1, 100_000)
            .delayElements(Duration.ofMillis(500)) // Simulate some delay
            .flatMap(i -> Mono.fromCallable(() -> {
                messageQueue.put(i.longValue());
                return i;
            }))
            .doOnNext(i -> System.out.println("Produced: " + i))
            .subscribe();
    }
Explanation:
- put() adds items to the blocking deque.
- delayElements() simulates a realistic production rate.
- Flux.range can be configured for the number of messages to produce.
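For comparison, the same producer logic can be sketched without Reactor as a plain loop. This is only an illustration under assumptions: ProducerSketch and produce are hypothetical names, Thread.sleep stands in for delayElements(), and LinkedBlockingDeque stands in for the Redisson deque.

```java
import java.time.Duration;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

final class ProducerSketch {

    // Puts the values 1..count on the queue, pausing `delay` before each one.
    static void produce(BlockingDeque<Long> queue, int count, Duration delay) {
        try {
            for (long i = 1; i <= count; i++) {
                Thread.sleep(delay.toMillis()); // stand-in for delayElements(...)
                queue.put(i);                   // appends to the tail of the deque
                System.out.println("Produced: " + i);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop producing when interrupted
        }
    }

    public static void main(String[] args) {
        BlockingDeque<Long> queue = new LinkedBlockingDeque<>();
        produce(queue, 3, Duration.ofMillis(500));
        System.out.println("Queue contents: " + queue);
    }
}
```

Since put() appends at the tail and take() removes from the head, consumers see the messages in the order they were produced.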
4. Running the System
- Start the producer.
- Start consumer 1.
- Optionally start consumer 2 (or more consumers).
Behavior:
- Consumers wait if the queue is empty.
- As the producer adds messages, consumers process them as soon as they arrive.
- Multiple consumers share the load automatically.
Example Flow:
- Producer adds 1, 2, 3, 4…
- Consumer 1 might receive 1, 3, 5…
- Consumer 2 might receive 2, 4, 6…
- System continues until all messages are consumed.
5. Observations
- Blocking behavior: Consumers wait for items if the queue is empty.
- Load balancing: Multiple consumers share messages automatically.
- Scalability: You can add more consumers (or microservice instances) dynamically.
- Real-life use cases: Payment processing, report generation, task processing, event-driven microservices.
6. Summary
- Redisson blocking queues/deques can act as a message queue.
- Producer: Adds items using put().
- Consumer: Consumes items using take() or poll().
- Multiple consumers: Enable parallel processing and load balancing.
- Reactive simulation: Use Flux or Mono to integrate with reactive streams.
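The difference between take() and poll() mentioned in the summary comes down to what happens on an empty queue: take() waits indefinitely, while the timed poll(timeout, unit) from the BlockingQueue interface gives up and returns null. A minimal sketch, assuming an in-memory LinkedBlockingDeque as a stand-in and using the hypothetical names PollVersusTakeSketch and pollOrNull:

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

final class PollVersusTakeSketch {

    // Waits up to `timeoutMillis` for a message and returns null if none
    // arrived in time (or if the waiting thread was interrupted).
    static Long pollOrNull(BlockingDeque<Long> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingDeque<Long> queue = new LinkedBlockingDeque<>();
        System.out.println("Empty queue: " + pollOrNull(queue, 100));
        queue.add(7L);
        System.out.println("After add:   " + pollOrNull(queue, 100));
    }
}
```

A timed poll is a reasonable choice when a consumer needs to check a shutdown flag or do other work periodically instead of waiting forever.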
This approach allows microservices or threads to communicate asynchronously without any extra messaging infrastructure, provided Redis is up and running.