Learnitweb

BlockingQueue in Java

1. Introduction

In this tutorial we’ll discuss BlockingQueue interface which is defined in java.util.concurrent package.

BlockingQueue is a subinterface of java.util.Queue that also supports blocking operations that wait for the queue to become nonempty before retrieving an element and wait for space to become available in the queue before storing an element.

BlockingQueue is typically used in producer-consumer problem. It is a collections which can be used safely with multiple threads.

BlockingQueue has Collection, Iterable and Queue as Superinterfaces. Methods which BlockingQueue has from Queue are thread-safe. However, the methods which BlockingQueue inherits from Collection are not necessarily atomic. However, this depends on the implementation of methods in the implementing class of BlockingQueue.

A BlockingQueue does not accept null elements. NullPointerException is thrown if null is used with add, put and offer.

2. Implementing classes of BlockingQueue interface

Following are all known implementing classes of BlockingQueue interface:

  • ArrayBlockingQueue
  • DelayQueue
  • LinkedBlockingDeque
  • LinkedBlockingQueue
  • LinkedTransferQueue
  • PriorityBlockingQueue
  • SynchronousQueue

3. Methods in BlockingQueue

As mentioned in official documentation of Java, BlockingQueue methods have four forms:

  1. throws exception if the attempted operation is not possible or unsuccessful.
  2. returns a special value (either null or false based on the operation).
  3. blocks the current thread indefinitely until the operation can succeed.
  4. If the attempted operation is not possible immediately, it blocks but waits no longer than the given timeout.

The same can be summarized like this:

Throws exceptionSpecial valueBlocksTimes out
Insert add(e)offer(e)put(e)offer(e, time, unit)
Removeremove()poll()take()poll(time, unit)
Examineelement()peek()not applicablenot applicable

3.1 BlockingQueue methods example

In this example, you’ll see use of some common methods.

package com.learnitweb;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueExample {

	public static void main(String args[]) throws InterruptedException {
		BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();

		// add elements
		blockingQueue.add("one");
		blockingQueue.add("two");
		blockingQueue.add("three");

		System.out.println("blockingQueue after insert: " + blockingQueue);

		// retrieve and remove head of queue
		blockingQueue.remove();

		System.out.println("blockingQueue after remove: " + blockingQueue);

		// retrieve but not remove head
		System.out.println("head of blockingQueue: " + blockingQueue.peek());
		System.out.println("blockingQueue after peek: " + blockingQueue);

		// retrieve and remove head
		blockingQueue.poll();

		System.out.println("blockingQueue after poll: " + blockingQueue);
	}
}

4. Create BlockingQueue

Creating a BlockingQueue is simple. Suppose if you want to use LinkedBlockingQueue, you can do it like this:

BlockingQueue<Integer> q = new LinkedBlockingQueue<>();

Here LinkedBlockingQueue is the implementation of BlockingQueue interface.

5. Unbounded and bounded queue

BlockingQueue can be categorized in two types:

5.1 Unbounded queue

Queue which does not have capacity constraint. For unbounded queue, the remaining capacity is always Integer.MAX_VALUE. It means operation to add element to the unbounded will never block.

Creating an unbounded BlockingQueue is simple, do not provide any capacity while creating instance.

BlockingQueue<Integer> q = new LinkedBlockingQueue<>();

5.2 Bounded queue

You can create a bounded queue by passing the capacity as an argument to the constructor of the implementing class. In bounded queue, if the queue is full, depending on a method that was used to add it (offer(), add() or put()), it will block until space is available to add the object. Otherwise, the operations will fail.

BlockingQueue<Integer> q = new LinkedBlockingQueue<>(10);

6. Multithreaded producer-consumer example with BlockingQueue

We’ll now show the use of BlockingQueue with the help of an example. A common use of BlockingQueue is in producer-consumer problem. In this example, we’ll produce random numbers and create two threads to consume it. Note that BlockingQueue can safely be used with multiple producers and multiple consumers. We are passing instance of LinkedBlockingQueue to both producer and consumer.

Let us now write code for producer.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

public class Producer implements Runnable {
	private BlockingQueue<Integer> numbersQueue;

	public Producer(BlockingQueue<Integer> numbersQueue) {
		this.numbersQueue = numbersQueue;
	}

	public void run() {
		try {
			generateNumbers();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}

	private void generateNumbers() throws InterruptedException {
		for (int i = 0; i < 100; i++) {
			numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
		}
	}
}

Let us now write our consumer.

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
	private BlockingQueue<Integer> queue;

	public Consumer(BlockingQueue<Integer> queue) {
		this.queue = queue;
	}

	public void run() {
		try {
			while (true) {
				Integer number = queue.take();
				System.out.println(Thread.currentThread().getName() + " result: " + number);
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}
}

Following is the service class.

public class ProducerConsumerExample {

	public static void main(String[] args) {
		BlockingQueue<Integer> q = new LinkedBlockingQueue<>();
		Producer p = new Producer(q);
		Consumer c1 = new Consumer(q);
		Consumer c2 = new Consumer(q);
		new Thread(p).start();
		new Thread(c1).start();
		new Thread(c2).start();

	}
}

7. Conclusion

In this tutorial, we discussed about BlockingQueue. We discussed how to create a BlockingQueue and use it to solve producer-consumer problem.