Learnitweb

PriorityBlockingQueue in Java

The PriorityBlockingQueue in Java is a thread-safe, unbounded implementation of the BlockingQueue interface. It orders elements according to their natural ordering or by a Comparator provided at construction time. Unlike PriorityQueue, it’s designed for use in concurrent environments, where multiple threads might be adding and removing elements simultaneously.

Key Characteristics

  • Thread-Safety: It achieves thread-safety using a reentrant lock internally, ensuring that only one thread can modify the queue at a time.
  • Blocking Operations: Like other BlockingQueue implementations, it provides methods that wait for the queue to become non-empty when retrieving an element, or wait for space to become available when adding an element (though it’s unbounded, so adding never blocks).
  • Unbounded: The capacity is not fixed; it can grow as needed. This means that add and offer operations will never block.
  • Priority-Based Ordering: Elements are ordered based on priority, with the head of the queue being the element with the highest priority.
  • No Nulls: It does not permit null elements.

Creating a PriorityBlockingQueue

You can create a PriorityBlockingQueue in a few ways, similar to a PriorityQueue.

1. Default Constructor

This creates a PriorityBlockingQueue with a default initial capacity (11) that orders elements according to their natural ordering.

import java.util.concurrent.PriorityBlockingQueue;

// For integers, smallest number has the highest priority
PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();

2. Specifying Initial Capacity

You can set an initial capacity to avoid resizing, but remember, the queue is unbounded and will grow if needed.

PriorityBlockingQueue<Integer> pqWithCapacity = new PriorityBlockingQueue<>(20);

3. Using a Custom Comparator

To define a custom ordering, you can provide a Comparator in the constructor. This is essential for creating a max-heap or for ordering custom objects.

import java.util.Collections;

// Creates a max-heap where the largest number has the highest priority
PriorityBlockingQueue<Integer> maxHeap = new PriorityBlockingQueue<>(11, Collections.reverseOrder());

// Using a lambda for a max-heap
PriorityBlockingQueue<Integer> customPq = new PriorityBlockingQueue<>(11, (a, b) -> b - a);

Example: Producer-Consumer Model

A classic use case for PriorityBlockingQueue is in a multi-threaded producer-consumer scenario where tasks have different priorities.

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Task implements Comparable<Task> {
    private String name;
    private int priority;

    public Task(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }

    @Override
    public int compareTo(Task other) {
        // Lower priority number means higher priority (min-heap behavior)
        return Integer.compare(this.priority, other.priority);
    }

    @Override
    public String toString() {
        return "Task{" + "name='" + name + "', priority=" + priority + '}';
    }
}

class Producer implements Runnable {
    private final BlockingQueue<Task> queue;

    public Producer(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println("Producer adding tasks...");
            queue.put(new Task("Deploy App", 1));
            queue.put(new Task("Check Logs", 3));
            queue.put(new Task("Run Test Suite", 2));
            queue.put(new Task("Critical Bug Fix", 0)); // Highest priority
            System.out.println("Producer finished adding tasks.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue<Task> queue;

    public Consumer(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Task task = queue.take(); // Waits until a task is available
                System.out.println("Processing: " + task);
                Thread.sleep(1000); // Simulate work
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class PriorityBlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Task> taskQueue = new PriorityBlockingQueue<>();

        ExecutorService executor = Executors.newFixedThreadPool(2);

        // Start producer and consumer threads
        executor.submit(new Producer(taskQueue));
        executor.submit(new Consumer(taskQueue));

        // Let the threads run for a bit
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}

Expected Output

The output will show the tasks being processed in priority order, even though they were added out of order:

Producer adding tasks...
Producer finished adding tasks.
Processing: Task{name='Critical Bug Fix', priority=0}
Processing: Task{name='Deploy App', priority=1}
Processing: Task{name='Run Test Suite', priority=2}
Processing: Task{name='Check Logs', priority=3}

Internal Structure and Implementation

PriorityBlockingQueue uses a binary heap, just like PriorityQueue. The difference lies in its concurrency handling. It wraps the heap with a ReentrantLock and a Condition to manage blocking and waking up threads. The take() method, for example, acquires the lock and then waits on a condition variable if the queue is empty. When a new element is added via put(), the adding thread acquires the lock, inserts the element, and then signals the condition to wake up any waiting consumer threads. This locking mechanism ensures that multiple threads can’t corrupt the heap’s state simultaneously, making it safe for concurrent use.