在之前的《Java 中的线程池》一文中,在介绍线程池参数的时候,其中BlockingQueue<Runnable> workQueue
用于存储等待执行的任务,提到了几种不同类型的等待队列。本文将详细的介绍阻塞队列的实现原理以及各自的使用方法。
概述
在并发编程中,有时候需要使用到线程安全的队列。实现一个线程安全的队列有两种方式:阻塞算法
和非阻塞算法
。
- 阻塞算法:通过使用一个锁(出队和入队使用同一把锁)或者两个锁(出队和入队使用不同的锁)等方式来实现,如 BlockingQueue。
- 非阻塞算法:使用循环 CAS 的方式实现,如 ConcurrentLinkedMap。
非阻塞队列的缺点在于:它不会对当前线程产生阻塞,例如在面对消费者/生产者模型的时候,就必须额外的实现同步策略和线程间的唤醒策略。而阻塞队列不同,它会对当前线程产生阻塞,例如某线程从一个空的阻塞队列中准备取元素,由于此时的队列中没有元素,因此该线程就会处于阻塞状态。当队列中有了元素之后,被阻塞的线程会自动被唤醒,这样就极大的提高了方便性。
本文将会介绍 7 种不同的 BlockingQueue,以及在 Java 中阻塞队列的 4 种不同的处理方式,最后结合具体的实例来分析 BlockingQueue 的原理及其使用场景。
阻塞队列
队列是一种先进先出(FIFO)的数据结构,具有在队尾插入元素以及在队头删除元素的功能。而阻塞队列就是在队列的基础上,又支持了两个附加的操作,可以支持阻塞式
的插入元素和删除元素操作。即:
- 当队列为满的时候,队列会阻塞想要插入元素的线程,直到队列不满;
- 当队列为空的时候,获取元素的线程会等待队列变为非空。
那么如何实现上面的功能呢?这里需要介绍一些常用的方法。对于非阻塞队列来说,例如 PriorityQueue、LinkedList,它们常用的方法有:
- add(E e):将元素 e 插入到队列的尾部。如果插入成功,则返回 true;如果插入失败(即队列已满),则抛出异常。
- remove():移除队首元素。若移除成功,则返回 true;如果移除失败(即队列为空),则抛出异常。
- offer(E e):将元素 e 插入到队列的尾部。如果插入成功,则返回 true;如果插入失败(即队列已满),则返回 false。
- poll():移除并获取队首元素。如果成功,则返回队首元素,否则返回 null。
- peek():获取队首元素。如果成功,则返回队首元素,否则返回 null。
以上都是非阻塞队列的方法,这里推荐使用 offer、poll、peek 方法,因为这三个方法可以通过返回值来判断相应的操作是否成功,而使用 add、remove 方法则得不到这种效果。还需要注意的是:非阻塞队列中的方法都不具有同步措施。
阻塞队列除了具有以上方法外,还将以上方法实现了同步。除此之外,阻塞队列还提供了另外 4 个常用的方法:
- put(E e):向队尾添加元素。如果队列是满的,则等待。
- take():从队首取元素。如果队列是空的,则等待。
- offer(E e, long timeout, TimeUnit unit):向队尾插入元素。如果队列是满的,则等待一段 timeout 时间。当时间的期限达到时,如果还没有插入成功,则返回 false,否则返回 true。
- poll(long timeout, TimeUnit unit):取队首元素。如果队列为空,则等待一段 timeout 时间。当时间的期限达到时,如果取不到,则返回 null,否则返回取得的元素。
可以看到,以上方法的处理方式,有的是抛出异常,有的是带有返回值,有的是一直阻塞,还有的是超时退出。这里需要注意的是:如果是无界阻塞队列,则队列不可能出现满的情况,所以使用 put 或者 offer 方法永远不会被阻塞,而且使用 offer 方法时,永远返回 true。
阻塞队列的种类
在 JDK1.5 及其以后的版本中,java.util.concurrent 包下的阻塞队列的种类如下图所示:
1.ArrayBlockingQueue
基于数组实现的有界阻塞队列,在创建时必须指定容量大小,即在调用 ArrayBlockingQueue(int capacity) 时,需要指定 capacity 的大小。此外,还可以通过调用 ArrayBlockingQueue(int capacity, boolean fair) 指定公平性和非公平性。默认情况下为 fasle,即以非公平
的方式访问队列。此外,这是一个先进先出的队列。
公平与非公平的区别:
公平的访问队列
是指阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列;
非公平访问队列
是指对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资源,有可能先阻塞的线程最后才访问队列。
1
2
3
4
5
6
7
8
9
10
11
12
|
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
|
2.LinkedBlockingQueue
基于链表实现的有界阻塞队列,在创建 LinkedBlockingQueue 时如果不指定具体的容量大小,则默认的大小为 Integer.MAX_VALUE。此外,这也是一个先进先出的队列。
1
2
3
4
5
6
7
8
9
10
|
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
|
3.PriorityBlockingQueue
支持优先级排序的无界阻塞队列,默认情况下采取自然顺序升序排序,按照元素的优先级顺序出队,每次出队的元素都是优先级最高的。此外,该阻塞队列的容量没有上限,亦成为无界阻塞队列。如果不指定参数的话,那么 PriorityBlockingQueue 默认的初始化容量为 11,如果在添加元素的过程中超过了该容量,则 PriorityBlockingQueue 会动态的改变容量大小。正是因为其容量没有限制,所以在添加元素的时候有可能会添加失败,因为不停地添加元素有可能导致资源耗尽并且会发生内存溢出。
需要注意的是,PriorityBlockingQueue 中不可以插入 null 对象,这是因为在插入元素时会对该元素进行判空检查,如果该元素是 null,则会抛出 NullPointerException。源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
/**
* Default array capacity.
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
//...
}
|
当然也可以根据 PriorityBlockingQueue 不同的构造函数,通过使用自定义实现类 compareTo() 方法来指定元素的排序规则,或者初始化 PriorityBlockingQueue 时。指定参数 Comparator 来对元素进行排序。但是对于排序的元素也要注意:PriorityBlockingQueue 不能保证同优先级元素的顺序。
4.DelayQueue
使用优先级队列 PriorityQueue 实现的一个支持延时获取元素的无界阻塞队列。DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取元素。如果元素没有达到延时时间,则会阻塞当前线程。由于属于无界阻塞队列,因此往队列中插入元素的时永远不会阻塞,而获取元素时有可能会发生阻塞。
1
2
3
4
5
6
|
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
|
正是因为 DelayQueue 具有延时的效果,因此可以适用于以下场景:
- 设计缓存系统:使用 DelayQueue 存储缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能够从 DelayQueue 中获取到元素,则说明缓存的有效期到了。
- 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行。
5.SynchronousQueue
不存储元素的阻塞队列,每个 put 操作必须等待每个 take 操作,否则不能继续添加元素。此外,SynchronousQueue 默认情况下采用非公平性策略访问队列。并且,该队列不允许 null 元素。
1
2
3
4
5
6
7
|
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
|
6.LinkedTransferQueue
由链表结构组成的无界阻塞队列,基于 TransferQueue 实现的。相对于其它阻塞队列,LinkedTransferQueue 提供了 tryTransfer 和 transfer 方法。
1
2
3
4
5
6
7
8
9
10
11
|
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
|
transfer 方法的逻辑是:如果当前有消费者正在等待接收元素,通过 transfer 方法可以把生产者传入的元素立刻传输给消费者。如果没有消费者在等待接收元素,则 transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回。
而 tryTransfer 方法用来试探生产者传入的元素是否能够直接传输给消费者。如果没有消费者等待接收元素,则返回 fase。和 transfer 方法的区别在于:tryTransfer 方法无论消费者是否接收,则方法会立即返回,而 transfer 方法必须等到消费者消费了才返回。
7.LinkedBlockingDeque
基于链表实现的双向阻塞队列,可以实现在队头进行插入、删除操作,也可以在队尾进行插入、删除操作。与其它阻塞队列的区别在于:LinkedBlockingDeque 新增了 addFirst、addLast、offerFirst、offerLast、peekFirst、peekLast 方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
public LinkedBlockingDeque(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock lock = this.lock;
lock.lock(); // Never contended, but necessary for visibility
try {
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (!linkLast(new Node<E>(e)))
throw new IllegalStateException("Deque full");
}
} finally {
lock.unlock();
}
}
|
阻塞队列的应用
下面以阻塞队列 ArrayListBlockingQueue 实现一个生产者/消费者模型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
public class Test {
private int queueSize = 10;
private ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(queueSize);
public static void main(String[] args) {
Test test = new Test();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread {
@Override
public void run() {
consume();
}
private void consume() {
while (true) {
try {
blockingQueue.take();
System.out.println("从队列中取出元素,队列剩余: " + blockingQueue.size() + " 个元素...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Producer extends Thread {
@Override
public void run() {
produce();
}
private void produce() {
while (true) {
try {
blockingQueue.put(1);
System.out.println("向队列中添加元素,队列剩余: " + blockingQueue.size() + " 个元素...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
|
输出结果如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
向队列中添加元素,队列剩余: 1 个元素...
向队列中添加元素,队列剩余: 2 个元素...
向队列中添加元素,队列剩余: 3 个元素...
向队列中添加元素,队列剩余: 4 个元素...
向队列中添加元素,队列剩余: 5 个元素...
向队列中添加元素,队列剩余: 6 个元素...
向队列中添加元素,队列剩余: 7 个元素...
向队列中添加元素,队列剩余: 8 个元素...
向队列中添加元素,队列剩余: 9 个元素...
向队列中添加元素,队列剩余: 10 个元素...
从队列中取出元素,队列剩余: 0 个元素...
从队列中取出元素,队列剩余: 9 个元素...
从队列中取出元素,队列剩余: 9 个元素...
从队列中取出元素,队列剩余: 8 个元素...
从队列中取出元素,队列剩余: 7 个元素...
从队列中取出元素,队列剩余: 6 个元素...
从队列中取出元素,队列剩余: 5 个元素...
从队列中取出元素,队列剩余: 4 个元素...
从队列中取出元素,队列剩余: 3 个元素...
从队列中取出元素,队列剩余: 2 个元素...
从队列中取出元素,队列剩余: 1 个元素...
从队列中取出元素,队列剩余: 0 个元素...
向队列中添加元素,队列剩余: 10 个元素...
(以下省略)
|
参考