Redisson supports reactive and blocking lists/deques, which can be leveraged to implement a message queue. This allows multiple producers and consumers to communicate asynchronously, similar to a microservice messaging system.
In this tutorial, we will cover:
- Setting up a Redisson message queue.
- Implementing a producer that adds messages.
- Implementing multiple consumers that consume messages.
- Observing the behavior in a concurrent environment.
1. Setting Up the Message Queue
We will use a blocking deque to simulate a message queue.
    private RBlockingDeque<Long> messageQueue;

    @BeforeEach
    void setup() {
        messageQueue = redisson.getBlockingDeque("messageQueue");
        messageQueue.clear(); // Ensure the queue is empty before starting
    }
Explanation:
- RBlockingDeque<Long> supports blocking operations like take() and put().
- A blocking deque allows consumers to wait until a new item is available.
- We clear the queue at the start to avoid leftover items from previous runs.
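The blocking behavior described above can be tried without a Redis server. The sketch below is not Redisson code: it assumes that, because RBlockingDeque extends java.util.concurrent.BlockingDeque, the same calls behave alike on an in-memory LinkedBlockingDeque, which is used here purely as a stand-in. The class and method names (BlockingDequeBasics, pollEmpty, putThenTake) are hypothetical.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

class BlockingDequeBasics {

    // Timed poll on an empty deque: waits up to 100 ms, then gives up and returns null.
    static Long pollEmpty(BlockingDeque<Long> queue) {
        try {
            return queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // put() appends at the tail and take() removes from the head, so items come back in FIFO order.
    static long[] putThenTake(BlockingDeque<Long> queue) {
        try {
            queue.put(1L);
            queue.put(2L);
            return new long[] { queue.take(), queue.take() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // In-memory stand-in (assumption); with Redisson this would be
        // redisson.getBlockingDeque("messageQueue").
        BlockingDeque<Long> queue = new LinkedBlockingDeque<>();
        System.out.println("poll on empty queue: " + pollEmpty(queue));
        long[] taken = putThenTake(queue);
        System.out.println("taken in order: " + taken[0] + ", " + taken[1]);
    }
}
```

Unlike the timed poll, take() on an empty deque would wait indefinitely, which is why the sketch only calls it after items have been added.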
2. Implementing a Consumer
A consumer continuously polls items from the queue and processes them.
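    public void consumerOne() {
        Flux.interval(Duration.ofMillis(500))
            .onBackpressureDrop() // Skip ticks while a take() is still waiting
            // take() blocks, so run it on boundedElastic (reactor.core.scheduler.Schedulers), one call at a time
            .flatMap(i -> Mono.fromCallable(() -> messageQueue.take())
                .subscribeOn(Schedulers.boundedElastic()), 1)
            .doOnNext(item -> System.out.println("Consumer 1 received: " + item))
            .subscribe();
    }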
Explanation:
- messageQueue.take() blocks until an item is available.
- Flux.interval simulates a polling loop for reactive behavior.
- Multiple consumers can independently consume items from the same queue.
- Items are processed as they arrive; in real applications, this could be payments, reports, or tasks.
Adding a Second Consumer
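    public void consumerTwo() {
        Flux.interval(Duration.ofMillis(500))
            .onBackpressureDrop() // Skip ticks while a take() is still waiting
            // take() blocks, so run it on boundedElastic (reactor.core.scheduler.Schedulers), one call at a time
            .flatMap(i -> Mono.fromCallable(() -> messageQueue.take())
                .subscribeOn(Schedulers.boundedElastic()), 1)
            .doOnNext(item -> System.out.println("Consumer 2 received: " + item))
            .subscribe();
    }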
- When the second consumer starts, it competes with the first consumer for the same items.
- Each item is delivered to exactly one consumer; how the items split between consumers depends on timing, not on a fixed rotation.
- Example: if the queue has 1, 2, 3, 4:
- Consumer 1 might get 1 and 3
- Consumer 2 might get 2 and 4
This is similar to load balancing in a microservice setup.
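The competing-consumer behavior can be checked with a small sketch. It again uses an in-memory LinkedBlockingDeque as a stand-in for the Redisson deque (an assumption made so the example runs without Redis), and the names CompetingConsumers and drain are hypothetical. Each worker uses a timed poll instead of take() so that it stops once the queue stays empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

class CompetingConsumers {

    // Drains the queue with the given number of consumer threads and
    // returns one list of received items per consumer.
    static List<List<Long>> drain(BlockingDeque<Long> queue, int consumers) {
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        try {
            Callable<List<Long>> worker = () -> {
                List<Long> received = new ArrayList<>();
                Long item;
                // Stop once no item has arrived for 200 ms.
                while ((item = queue.poll(200, TimeUnit.MILLISECONDS)) != null) {
                    received.add(item);
                }
                return received;
            };
            List<Future<List<Long>>> futures = new ArrayList<>();
            for (int c = 0; c < consumers; c++) {
                futures.add(pool.submit(worker));
            }
            List<List<Long>> perConsumer = new ArrayList<>();
            for (Future<List<Long>> future : futures) {
                perConsumer.add(future.get());
            }
            return perConsumer;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // In-memory stand-in (assumption) for redisson.getBlockingDeque("messageQueue").
        BlockingDeque<Long> queue = new LinkedBlockingDeque<>();
        for (long i = 1; i <= 4; i++) {
            queue.add(i);
        }
        List<List<Long>> result = drain(queue, 2);
        System.out.println("Consumer 1 received: " + result.get(0));
        System.out.println("Consumer 2 received: " + result.get(1));
    }
}
```

The split between the two lists varies from run to run, but every item appears in exactly one of them.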
3. Implementing a Producer
The producer continuously adds items to the queue:
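    public void producer() {
        Flux.range(1, 100_000)
            .delayElements(Duration.ofMillis(500)) // Simulate some delay
            // put() is a blocking call: concatMap keeps the order, boundedElastic keeps it off the timer thread
            .concatMap(i -> Mono.fromCallable(() -> {
                messageQueue.put(i.longValue());
                return i;
            }).subscribeOn(Schedulers.boundedElastic()))
            .doOnNext(i -> System.out.println("Produced: " + i))
            .subscribe();
    }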
Explanation:
- put() adds items to the blocking deque.
- delayElements() simulates a realistic production rate.
- Flux range can be configured for the number of messages to produce.
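For comparison, the same producer idea can be sketched without Reactor as a plain loop. This is an illustration under assumptions, not the tutorial's implementation: an in-memory LinkedBlockingDeque stands in for the Redisson deque, and the names PacedProducer and produce are hypothetical.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

class PacedProducer {

    // Adds the values 1..count to the tail of the queue, pausing delayMillis after each one.
    static void produce(BlockingDeque<Long> queue, int count, long delayMillis) {
        for (long i = 1; i <= count; i++) {
            try {
                queue.put(i);
                Thread.sleep(delayMillis); // Simulate some delay between messages
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // Stop producing when interrupted
            }
        }
    }

    public static void main(String[] args) {
        // In-memory stand-in (assumption) for redisson.getBlockingDeque("messageQueue").
        BlockingDeque<Long> queue = new LinkedBlockingDeque<>();
        produce(queue, 5, 100);
        System.out.println("Queue contents: " + queue);
    }
}
```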
4. Running the System
- Start the producer.
- Start consumer 1.
- Optionally start consumer 2 (or more consumers).
Behavior:
- Consumers wait if the queue is empty.
- As the producer adds messages, consumers process them in real-time.
- Multiple consumers share the load automatically.
Example Flow:
- Producer adds 1, 2, 3, 4…
- Consumer 1 might receive 1, 3, 5…
- Consumer 2 might receive 2, 4, 6…
- The system keeps running until all messages are consumed; the consumers then block on take(), waiting for more.
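Because take() waits indefinitely on an empty queue, a test run needs some way to tell consumers that no more messages are coming. One common option, sketched below, is a sentinel value (a "poison pill") added once per consumer after the last real message. The sentinel STOP and the names QueueRun and run are hypothetical and not part of Redisson, and an in-memory LinkedBlockingDeque again stands in for the Redisson deque.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

class QueueRun {

    static final long STOP = -1L; // Hypothetical sentinel; real messages here are positive

    // Starts the consumers, produces 1..messages followed by one STOP per consumer,
    // and returns how many real messages each consumer processed.
    static int[] run(BlockingDeque<Long> queue, int messages, int consumers) {
        int[] counts = new int[consumers];
        List<Thread> workers = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            final int id = c;
            Thread worker = new Thread(() -> {
                try {
                    // Each consumer exits after taking exactly one STOP marker.
                    while (queue.take() != STOP) {
                        counts[id]++;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            workers.add(worker);
        }
        try {
            for (long i = 1; i <= messages; i++) {
                queue.put(i);
            }
            for (int c = 0; c < consumers; c++) {
                queue.put(STOP);
            }
            for (Thread worker : workers) {
                worker.join(); // After join, each worker's count is safely visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counts;
    }

    public static void main(String[] args) {
        // In-memory stand-in (assumption) for redisson.getBlockingDeque("messageQueue").
        int[] counts = run(new LinkedBlockingDeque<Long>(), 20, 2);
        System.out.println("Consumer 1 processed " + counts[0] + " messages");
        System.out.println("Consumer 2 processed " + counts[1] + " messages");
    }
}
```

The per-consumer counts differ between runs, but they always add up to the number of messages produced, since the STOP markers sit behind every real message in the queue.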
5. Observations
- Blocking behavior: Consumers wait for items if the queue is empty.
- Load balancing: Multiple consumers share messages automatically.
- Scalability: You can add more consumers (or microservice instances) dynamically.
- Real-life use cases: Payment processing, report generation, task processing, event-driven microservices.
6. Summary
- Redisson blocking lists/deques can act as a message queue.
- Producer: Adds items using put().
- Consumer: Consumes items using take() or poll().
- Multiple consumers: Enable parallel processing and load balancing.
- Reactive simulation: Use Flux or Mono to integrate with reactive streams.
This approach allows microservices or threads to communicate asynchronously without any extra messaging infrastructure, provided Redis is up and running.
