The PriorityBlockingQueue in Java is a thread-safe, unbounded implementation of the BlockingQueue interface. It orders elements according to their natural ordering or by a Comparator provided at construction time. Unlike PriorityQueue, it’s designed for use in concurrent environments, where multiple threads might be adding and removing elements simultaneously.
Key Characteristics
- Thread-Safety: It achieves thread-safety using a reentrant lock internally, ensuring that only one thread can modify the queue at a time.
- Blocking Operations: Like other BlockingQueue implementations, it provides retrieval methods such as take() that wait for the queue to become non-empty. Insertion methods never wait for space, because the queue is unbounded.
- Unbounded: The capacity is not fixed; it can grow as needed. This means that add, offer, and put operations never block, although an insertion can still fail with an OutOfMemoryError if resources are exhausted.
- Priority-Based Ordering: Elements are ordered based on priority, with the head of the queue being the element with the highest priority, that is, the least element according to the ordering in use.
- No Nulls: It does not permit null elements.
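The characteristics listed above can be checked with a small single-threaded sketch. The class name QueueCharacteristicsDemo and its helper methods are made up for illustration; only the PriorityBlockingQueue calls (offer, poll, and the timed poll) come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class QueueCharacteristicsDemo {

    // Adds the values with offer() and drains them with poll(),
    // returning them in the order the queue hands them out.
    static List<Integer> drainInPriorityOrder(int... values) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        for (int value : values) {
            queue.offer(value); // never waits for space: the queue is unbounded
        }
        List<Integer> drained = new ArrayList<>();
        Integer head;
        while ((head = queue.poll()) != null) { // poll() returns null when empty
            drained.add(head);
        }
        return drained;
    }

    // Returns true if the queue rejects a null element.
    static boolean rejectsNull() {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // A timed poll on an empty queue gives up after the timeout and
    // returns null, instead of waiting indefinitely like take().
    static Integer timedPollOnEmptyQueue() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        try {
            return queue.poll(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(drainInPriorityOrder(5, 1, 4, 2, 3)); // [1, 2, 3, 4, 5]
        System.out.println(rejectsNull());                        // true
        System.out.println(timedPollOnEmptyQueue());              // null
    }
}
```

Draining with poll() is what yields the sorted order here; the sketch makes no claim about the order in which an iterator would visit the elements.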
Creating a PriorityBlockingQueue
You can create a PriorityBlockingQueue in a few ways, similar to a PriorityQueue.
1. Default Constructor
This creates a PriorityBlockingQueue with a default initial capacity (11) that orders elements according to their natural ordering.
import java.util.concurrent.PriorityBlockingQueue;

// For integers, smallest number has the highest priority
PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
2. Specifying Initial Capacity
You can set an initial capacity to avoid resizing, but remember, the queue is unbounded and will grow if needed.
PriorityBlockingQueue<Integer> pqWithCapacity = new PriorityBlockingQueue<>(20);
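To make the point about growth concrete, here is a minimal sketch that starts with a deliberately tiny initial capacity and adds far more elements than that. The class name InitialCapacityDemo and the helper sizeAfterAdding are illustrative names, not JDK APIs.

```java
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class InitialCapacityDemo {

    // Starts with room for two elements and adds `count` of them.
    static int sizeAfterAdding(int count) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
        for (int i = 0; i < count; i++) {
            queue.add(i); // the queue grows its storage as needed
        }
        return queue.size();
    }

    // An unbounded queue always reports Integer.MAX_VALUE remaining capacity.
    static int remainingCapacityOfSmallQueue() {
        return new PriorityBlockingQueue<Integer>(2).remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterAdding(1_000));                                  // 1000
        System.out.println(remainingCapacityOfSmallQueue() == Integer.MAX_VALUE);    // true
    }
}
```

The initial capacity is therefore only a sizing hint; it does not cap how many elements the queue can hold.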
3. Using a Custom Comparator
To define a custom ordering, you can provide a Comparator in the constructor. This is how you create a max-heap or order custom objects that do not implement Comparable.
import java.util.Collections;
import java.util.concurrent.PriorityBlockingQueue;

// Creates a max-heap where the largest number has the highest priority
PriorityBlockingQueue<Integer> maxHeap = new PriorityBlockingQueue<>(11, Collections.reverseOrder());

// Using a lambda for a max-heap; Integer.compare avoids the overflow that b - a can cause
PriorityBlockingQueue<Integer> customPq = new PriorityBlockingQueue<>(11, (a, b) -> Integer.compare(b, a));
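A comparator written as a subtraction, such as (a, b) -> b - a, is a common shortcut for a max-heap, but the subtraction can overflow for values near the int limits. The sketch below compares it with Comparator.reverseOrder(); the class ComparatorOverflowDemo and the helper headOf are hypothetical names used only for this illustration.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class ComparatorOverflowDemo {

    // Returns the head of a two-element queue ordered by the given comparator.
    static int headOf(Comparator<Integer> order, int first, int second) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(11, order);
        queue.add(first);
        queue.add(second);
        return queue.peek();
    }

    public static void main(String[] args) {
        Comparator<Integer> bySubtraction = (a, b) -> b - a;       // can overflow
        Comparator<Integer> byReverseOrder = Comparator.reverseOrder(); // safe for every int

        // For small values both comparators agree that 7 is the head.
        System.out.println(headOf(bySubtraction, 3, 7));   // 7
        System.out.println(headOf(byReverseOrder, 3, 7));  // 7

        // With Integer.MIN_VALUE the subtraction wraps around, so the
        // smallest number stays at the head of the intended max-heap.
        System.out.println(headOf(bySubtraction, Integer.MIN_VALUE, 1));  // -2147483648
        System.out.println(headOf(byReverseOrder, Integer.MIN_VALUE, 1)); // 1
    }
}
```

If the priorities are known to stay in a small range the shortcut happens to work, but Integer.compare(b, a) or Comparator.reverseOrder() costs nothing extra and is correct for all inputs.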
Example: Producer-Consumer Model
A classic use case for PriorityBlockingQueue is in a multi-threaded producer-consumer scenario where tasks have different priorities.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class Task implements Comparable<Task> {
    private final String name;
    private final int priority;

    public Task(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }

    @Override
    public int compareTo(Task other) {
        // Lower priority number means higher priority (min-heap behavior)
        return Integer.compare(this.priority, other.priority);
    }

    @Override
    public String toString() {
        return "Task{" + "name='" + name + "', priority=" + priority + '}';
    }
}

class Producer implements Runnable {
    private final BlockingQueue<Task> queue;

    public Producer(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println("Producer adding tasks...");
            queue.put(new Task("Deploy App", 1));
            queue.put(new Task("Check Logs", 3));
            queue.put(new Task("Run Test Suite", 2));
            queue.put(new Task("Critical Bug Fix", 0)); // Highest priority
            System.out.println("Producer finished adding tasks.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue<Task> queue;

    public Consumer(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Task task = queue.take(); // Waits until a task is available
                System.out.println("Processing: " + task);
                Thread.sleep(1000); // Simulate work
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting or sleeping: restore the flag and stop
            Thread.currentThread().interrupt();
        }
    }
}

public class PriorityBlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Task> taskQueue = new PriorityBlockingQueue<>();
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // Start producer and consumer threads
        executor.submit(new Producer(taskQueue));
        executor.submit(new Consumer(taskQueue));

        // Let the threads run for a bit
        executor.shutdown();
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            // The consumer loops forever, so interrupt it to let the JVM exit
            executor.shutdownNow();
        }
    }
}
Expected Output
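The output will typically show the tasks being processed in priority order, even though they were added out of order. Because the producer and consumer run concurrently, the exact interleaving can vary: if the consumer calls take() before all four tasks are queued, it processes the highest-priority task available at that moment.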
Producer adding tasks...
Producer finished adding tasks.
Processing: Task{name='Critical Bug Fix', priority=0}
Processing: Task{name='Deploy App', priority=1}
Processing: Task{name='Run Test Suite', priority=2}
Processing: Task{name='Check Logs', priority=3}
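The order shown above depends on all four tasks being queued before the consumer takes the first one, which the example does not enforce. One way to make the order deterministic, sketched here with plain integer priorities, is to hold the consumer back with a CountDownLatch until the producer is done. The class DeterministicOrderDemo and the method produceThenConsume are illustrative names, and the latch is an addition that the original example does not use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class DeterministicOrderDemo {

    // Queues the priorities on one thread, then takes them on another
    // thread that starts consuming only after the producer has finished.
    static List<Integer> produceThenConsume(int... priorities) {
        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        CountDownLatch allQueued = new CountDownLatch(1);
        List<Integer> processed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            for (int priority : priorities) {
                queue.add(priority);
            }
            allQueued.countDown(); // signal that every element is queued
        });

        Thread consumer = new Thread(() -> {
            try {
                allQueued.await(); // do not take anything until the producer is done
                for (int i = 0; i < priorities.length; i++) {
                    processed.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        // Same priorities and insertion order as the tasks in the example.
        System.out.println(produceThenConsume(1, 3, 2, 0)); // [0, 1, 2, 3]
    }
}
```

In a real system the consumer usually cannot wait for all work to arrive, so each take() simply returns the best element queued so far.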
Internal Structure and Implementation
PriorityBlockingQueue uses a binary heap, just like PriorityQueue. The difference lies in its concurrency handling. It wraps the heap with a ReentrantLock and a Condition to manage blocking and waking up threads. The take() method, for example, acquires the lock and then waits on a condition variable if the queue is empty, which releases the lock for the duration of the wait. When a new element is added via put(), the adding thread acquires the lock, inserts the element, and then signals the condition to wake up a waiting consumer thread. This locking mechanism ensures that multiple threads can’t corrupt the heap’s state simultaneously, making it safe for concurrent use.
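The lock-and-condition pattern described above can be sketched in a few dozen lines. This is a simplified teaching model, not the JDK source: the class SketchPriorityBlockingQueue is a made-up name, it delegates the heap to a plain java.util.PriorityQueue, and it supports only natural ordering.

```java
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical teaching class: a simplified model, not the JDK implementation.
class SketchPriorityBlockingQueue<E extends Comparable<? super E>> {
    private final PriorityQueue<E> heap = new PriorityQueue<>(); // binary heap
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    // Inserts an element and wakes one waiting consumer. Never waits for space.
    public void put(E element) {
        if (element == null) throw new NullPointerException();
        lock.lock();
        try {
            heap.add(element);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // Removes the head, waiting while the queue is empty.
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (heap.isEmpty()) { // loop guards against spurious wakeups
                notEmpty.await();    // releases the lock while waiting
            }
            return heap.poll();
        } finally {
            lock.unlock();
        }
    }

    // Non-blocking removal: returns null if the queue is empty.
    public E poll() {
        lock.lock();
        try {
            return heap.poll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SketchPriorityBlockingQueue<Integer> queue = new SketchPriorityBlockingQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                // Waits if the queue is still empty when this line runs.
                System.out.println("took " + queue.take()); // took 42
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.put(42);
        consumer.join();
    }
}
```

Every operation holds the same lock, so the heap is only ever touched by one thread at a time, and the while loop around await() rechecks the empty condition after each wakeup.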