在之前的《Java 中的线程池》一文中,在介绍线程池参数的时候,其中BlockingQueue<Runnable> workQueue用于存储等待执行的任务,提到了几种不同类型的等待队列。本文将详细的介绍阻塞队列的实现原理以及各自的使用方法。

概述

在并发编程中,有时候需要使用到线程安全的队列。实现一个线程安全的队列有两种方式:阻塞算法非阻塞算法

  • 阻塞算法:通过使用一个锁(出队和入队使用同一把锁)或者两个锁(出队和入队使用不同的锁)等方式来实现,如 BlockingQueue。
  • 非阻塞算法:使用循环 CAS 的方式实现,如 ConcurrentLinkedMap。

非阻塞队列的缺点在于:它不会对当前线程产生阻塞,例如在面对消费者/生产者模型的时候,就必须额外的实现同步策略和线程间的唤醒策略。而阻塞队列不同,它会对当前线程产生阻塞,例如某线程从一个空的阻塞队列中准备取元素,由于此时的队列中没有元素,因此该线程就会处于阻塞状态。当队列中有了元素之后,被阻塞的线程会自动被唤醒,这样就极大的提高了方便性。

本文将会介绍 7 种不同的 BlockingQueue,以及在 Java 中阻塞队列的 4 种不同的处理方式,最后结合具体的实例来分析 BlockingQueue 的原理及其使用场景。

阻塞队列

队列是一种先进先出(FIFO)的数据结构,具有在队尾插入元素以及在队头删除元素的功能。而阻塞队列就是在队列的基础上,又支持了两个附加的操作,可以支持阻塞式的插入元素和删除元素操作。即:

  • 当队列为满的时候,队列会阻塞想要插入元素的线程,直到队列不满;
  • 当队列为空的时候,获取元素的线程会等待队列变为非空。

那么如何实现上面的功能呢?这里需要介绍一些常用的方法。对于非阻塞队列来说,例如 PriorityQueue、LinkedList,它们常用的方法有:

  • add(E e):将元素 e 插入到队列的尾部。如果插入成功,则返回 true;如果插入失败(即队列已满),则抛出异常。
  • remove():移除队首元素。若移除成功,则返回 true;如果移除失败(即队列为空),则抛出异常。
  • offer(E e):将元素 e 插入到队列的尾部。如果插入成功,则返回 true;如果插入失败(即队列已满),则返回 false。
  • poll():移除并获取队首元素。如果成功,则返回队首元素,否则返回 null。
  • peek():获取队首元素。如果成功,则返回队首元素,否则返回 null。

以上都是非阻塞队列的方法,这里推荐使用 offer、poll、peek 方法,因为这三个方法可以通过返回值来判断相应的操作是否成功,而使用 add、remove 方法则得不到这种效果。还需要注意的是:非阻塞队列中的方法都不具有同步措施。

阻塞队列除了具有以上方法外,还将以上方法实现了同步。除此之外,阻塞队列还提供了另外 4 个常用的方法:

  • put(E e):向队尾添加元素。如果队列是满的,则等待。
  • take():从队首取元素。如果队列是空的,则等待。
  • offer(E e, long timeout, TimeUnit unit):向队尾插入元素。如果队列是满的,则等待一段 timeout 时间。当时间的期限达到时,如果还没有插入成功,则返回 false,否则返回 true。
  • poll(long timeout, TimeUnit unit):取队首元素。如果队列为空,则等待一段 timeout 时间。当时间的期限达到时,如果取不到,则返回 null,否则返回取得的元素。

可以看到,以上方法的处理方式,有的是抛出异常,有的是带有返回值,有的是一直阻塞,还有的是超时退出。这里需要注意的是:如果是无界阻塞队列,则队列不可能出现满的情况,所以使用 put 或者 offer 方法永远不会被阻塞,而且使用 offer 方法时,永远返回 true。

阻塞队列的种类

在 JDK1.5 及其以后的版本中,java.util.concurrent 包下的阻塞队列的种类如下图所示:

image.png

1.ArrayBlockingQueue

基于数组实现的有界阻塞队列,在创建时必须指定容量大小,即在调用 ArrayBlockingQueue(int capacity) 时,需要指定 capacity 的大小。此外,还可以通过调用 ArrayBlockingQueue(int capacity, boolean fair) 指定公平性和非公平性。默认情况下为 fasle,即以非公平的方式访问队列。此外,这是一个先进先出的队列。

公平与非公平的区别:

公平的访问队列是指阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列;

非公平访问队列是指对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资源,有可能先阻塞的线程最后才访问队列。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

2.LinkedBlockingQueue

基于链表实现的有界阻塞队列,在创建 LinkedBlockingQueue 时如果不指定具体的容量大小,则默认的大小为 Integer.MAX_VALUE。此外,这也是一个先进先出的队列。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}


public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

3.PriorityBlockingQueue

支持优先级排序的无界阻塞队列,默认情况下采取自然顺序升序排序,按照元素的优先级顺序出队,每次出队的元素都是优先级最高的。此外,该阻塞队列的容量没有上限,亦成为无界阻塞队列。如果不指定参数的话,那么 PriorityBlockingQueue 默认的初始化容量为 11,如果在添加元素的过程中超过了该容量,则 PriorityBlockingQueue 会动态的改变容量大小。正是因为其容量没有限制,所以在添加元素的时候有可能会添加失败,因为不停地添加元素有可能导致资源耗尽并且会发生内存溢出。

需要注意的是,PriorityBlockingQueue 中不可以插入 null 对象,这是因为在插入元素时会对该元素进行判空检查,如果该元素是 null,则会抛出 NullPointerException。源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
  * Default array capacity.
  */
private static final int DEFAULT_INITIAL_CAPACITY = 11;


public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}


public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}


public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();

    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}


public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    //...
}

当然也可以根据 PriorityBlockingQueue 不同的构造函数,通过使用自定义实现类 compareTo() 方法来指定元素的排序规则,或者初始化 PriorityBlockingQueue 时。指定参数 Comparator 来对元素进行排序。但是对于排序的元素也要注意:PriorityBlockingQueue 不能保证同优先级元素的顺序。

4.DelayQueue

使用优先级队列 PriorityQueue 实现的一个支持延时获取元素的无界阻塞队列。DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取元素。如果元素没有达到延时时间,则会阻塞当前线程。由于属于无界阻塞队列,因此往队列中插入元素的时永远不会阻塞,而获取元素时有可能会发生阻塞。

1
2
3
4
5
6
public DelayQueue() {}


public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

正是因为 DelayQueue 具有延时的效果,因此可以适用于以下场景:

  • 设计缓存系统:使用 DelayQueue 存储缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能够从 DelayQueue 中获取到元素,则说明缓存的有效期到了。
  • 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行。

5.SynchronousQueue

不存储元素的阻塞队列,每个 put 操作必须等待每个 take 操作,否则不能继续添加元素。此外,SynchronousQueue 默认情况下采用非公平性策略访问队列。并且,该队列不允许 null 元素。

1
2
3
4
5
6
7
public SynchronousQueue() {
    this(false);
}

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

6.LinkedTransferQueue

由链表结构组成的无界阻塞队列,基于 TransferQueue 实现的。相对于其它阻塞队列,LinkedTransferQueue 提供了 tryTransfer 和 transfer 方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public boolean tryTransfer(E e) {
    return xfer(e, true, NOW, 0) == null;
}


public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}

transfer 方法的逻辑是:如果当前有消费者正在等待接收元素,通过 transfer 方法可以把生产者传入的元素立刻传输给消费者。如果没有消费者在等待接收元素,则 transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回。

而 tryTransfer 方法用来试探生产者传入的元素是否能够直接传输给消费者。如果没有消费者等待接收元素,则返回 fase。和 transfer 方法的区别在于:tryTransfer 方法无论消费者是否接收,则方法会立即返回,而 transfer 方法必须等到消费者消费了才返回。

7.LinkedBlockingDeque

基于链表实现的双向阻塞队列,可以实现在队头进行插入、删除操作,也可以在队尾进行插入、删除操作。与其它阻塞队列的区别在于:LinkedBlockingDeque 新增了 addFirst、addLast、offerFirst、offerLast、peekFirst、peekLast 方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public LinkedBlockingDeque() {
    this(Integer.MAX_VALUE);
}


public LinkedBlockingDeque(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
}

public LinkedBlockingDeque(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock lock = this.lock;
    lock.lock(); // Never contended, but necessary for visibility
    try {
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (!linkLast(new Node<E>(e)))
                throw new IllegalStateException("Deque full");
        }
    } finally {
        lock.unlock();
    }
}

阻塞队列的应用

下面以阻塞队列 ArrayListBlockingQueue 实现一个生产者/消费者模型:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class Test {

    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(queueSize);

    public static void main(String[] args) {
        Test test = new Test();

        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();

        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {
        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                try {
                    blockingQueue.take();
                    System.out.println("从队列中取出元素,队列剩余: " + blockingQueue.size() + " 个元素...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Producer extends Thread {
        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                try {
                    blockingQueue.put(1);
                    System.out.println("向队列中添加元素,队列剩余: " + blockingQueue.size() + " 个元素...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

输出结果如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
向队列中添加元素,队列剩余: 1 个元素...
向队列中添加元素,队列剩余: 2 个元素...
向队列中添加元素,队列剩余: 3 个元素...
向队列中添加元素,队列剩余: 4 个元素...
向队列中添加元素,队列剩余: 5 个元素...
向队列中添加元素,队列剩余: 6 个元素...
向队列中添加元素,队列剩余: 7 个元素...
向队列中添加元素,队列剩余: 8 个元素...
向队列中添加元素,队列剩余: 9 个元素...
向队列中添加元素,队列剩余: 10 个元素...
从队列中取出元素,队列剩余: 0 个元素...
从队列中取出元素,队列剩余: 9 个元素...
从队列中取出元素,队列剩余: 9 个元素...
从队列中取出元素,队列剩余: 8 个元素...
从队列中取出元素,队列剩余: 7 个元素...
从队列中取出元素,队列剩余: 6 个元素...
从队列中取出元素,队列剩余: 5 个元素...
从队列中取出元素,队列剩余: 4 个元素...
从队列中取出元素,队列剩余: 3 个元素...
从队列中取出元素,队列剩余: 2 个元素...
从队列中取出元素,队列剩余: 1 个元素...
从队列中取出元素,队列剩余: 0 个元素...
向队列中添加元素,队列剩余: 10 个元素...
(以下省略)

参考