之前的《Java 中的 Lock 及各种锁的概念》简单介绍了 Lock 的使用以及 Lock 的唯一实现类 ReentrantLock 中的方法。我们一般会拿 synchronized 和 ReentrantLock 做比较。而在了解 ReentrantLock 之前需要理解 AQS 的实现。因此,本文分析了 AQS 中主要的源码,包括独占模式与共享模式,为更好的认识 ReentrantLock 打下基础。

概述

ReentrantLock 是基于 AQS 实现的,而 AQS 又是基于 CAS 实现的。因此,最好先熟悉一下 CAS 的实现原理,对于这点我在《Java 中的 Atomic 包》中提到过。这样的话,接下来应该介绍的是 AQS(AbstractQueuedSynchronizer,即抽象的队列式的同步器),其内部的结构是基于队列锁实现的。因此,还需要提前介绍一下队列锁相关的内容。

我在准备起笔 AQS 文章时,在网上搜集了许多相关的文章,绝大部分都是直奔主题,直接从 AQS 讲起。这样直接讲的话会缺少必要的背景知识,看过文章之后使读者不清楚 AQS 的来龙去脉。我在这里一方面是抛砖引玉,另一方面是尽量保证 AQS 介绍的完整性。因此,本文的总体目录结构如下所示

  • 队列锁(Queue Lock)
  • CAS(Compare and Swap,比较且交换)
  • AQS

需要提及的是,CAS 已经在《Java 中的 Atomic 包》中提到过,包括它的概念以及需要注意的问题,当然我也会在本文中提到 CAS。

队列锁

概念

这里所说的队列锁属于互斥算法(Mutual exclusion algorithms)的一种,互斥算法可以用来对共享内存进行读、修改、写操作。而在多处理器的环境下,现代共享内存多处理器提供了比读写更强的操作。例如所谓的“原子”操作 CAS,假如内存中有一个变量 i 的值为 0,当处理器想要对 i 进行加 1 操作的时候,该处理器首先会将内存中的 0 读到算数运算单元,然后将其加 1 后变成 1。在写回到内存之前,处理器需要判断之前首次读进算数运算单元的值与内存中的值是否相等,由于之前首次读进算数运算单元的值为 0 并且内存中的值也为 0,因此,处理器会将计算后的结果(也就是 1)写回到内存中,此时内存中变量 i 的值就变成了 1。如果不相等的话,处理器会再次从内存中读取 i 的值,再次比较。(详细部分可见上面提到的原文)

当然,原子操作不仅仅可以实现读写,还有一些其它的用处。比如,可以用来实现队列锁算法。这种队列和普通的队列大体上是相同的,也具有入队、出队操作以及队头、队尾的描述形式。但由于多核处理器在硬件上不支持这种队列,因此可以使用 CAS 来实现。

自旋和阻塞

如果一个线程 B 想要获得某个已经被线程 A 使用的锁(临界资源)的时候,将会发生两种情况:自旋(Spinning)和阻塞(Blocking):

自旋:在进行自旋的时候,线程 B 会一直保持尝试获取锁的这种状态,即一直循环检测锁是否被释放,而不是进入线程挂起或睡眠状态。我们可以使用 while 进行实现,即设置某个变量,不断地通过 while 检查该变量的值,等到在某些条件下等于这个值并且返回 true 的时候,就说明线程 B 获取锁成功了。如果线程 B 等待时间很短的话,这种自旋的实现方式是很有效的。而如果自旋时间过长,则会增加计算机的负担,反而会降低其执行效率。

阻塞:也就是互斥,如果产生阻塞的话,线程 B 会阻塞自己知道临界资源解除占用,然后会再次尝试获取。

如果是竞争非常激烈的场景,在使用自旋的时候会遇到一些其它的问题:

  • 可能导致某些线程自始至终都无法获取到锁,这些线程会一直处于饥饿状态;
  • 自旋的时候需要依赖一个共享的锁标识,在竞争激烈的环境下,锁标识的同步也需要消耗大量的资源;
  • 如果想要通过自旋的方式实现公平锁(先到先获取),还需要额外的变量。

因此,对于上面所发生的问题,我们可以使用队列锁来实现,即让线程排队获取。队列锁的实现方法主要有 Anderson、Graunke/Thakkar、Mellor-Crummey/Scott(MCS)、Craig-Landin-Hagersten(CLH)。由于 AQS 所使用的队列锁是在 CLH 的基础上改进的,所以这里主要介绍 CLH 的实现方式。

CLH

由 Craig、Landin、Hagersten 三人首字母命名的自旋锁 CLH 是基于链表实现的无界且公平的先进先出(FIFO)队列,线程在本地局部变量上自旋,不断轮询前置节点的状态。如果发现前置节点已经释放了锁,则会结束自旋操作并获取锁。而 Java 中的 AQS 使用的是 CLH 的一种变体。

原始的 CLH 锁的内部有两个被 ThreadLocal 修饰的变量,分别表示当前的节点 node 以及其前驱(前置)节点 preNode。此外,还有一个被 AtomicReference 修饰的原子引用变量 tail 指向队列的尾部。具体加锁和释放锁的过程如下所示:

  • 首先会创建一个节点,并将该节点置为 true,表示尝试获取锁;
  • 然后将 tail 指向该节点,表示该节点为当前队列的尾部,并且会获得前置节点的引用,即 locked;
  • 判断前置节点的状态,如果为 true,则在前置节点上自旋,直到获取到锁;
  • 当前线程释放锁时,将 locked 状态置为 false,并将当前节点指向前置节点。

以下是 CLH 的简单实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CLH implements Lock {
    private final ThreadLocal<Node> preNode = ThreadLocal.withInitial(() -> null);
    private final ThreadLocal<Node> node = ThreadLocal.withInitial(Node::new);
    private final AtomicReference<Node> tail = new AtomicReference<>(new Node());

    private static class Node {
        private volatile boolean locked;
    }

    @Override
    public void lock() {
        final Node node = this.node.get();
        node.locked = true;
        Node pre = this.tail.getAndSet(node);
        this.preNode.set(pre);
        while (pre.locked) ;
    }

    @Override
    public void unlock() {
        final Node node = this.node.get();
        node.locked = false;
        this.node.set(this.preNode.get());
    }
}

这里顺带一提 CLH 和 MCS 的区别,这俩都是自旋锁的实现,其区别如下:

  • 链表结构不同
    • CLH 中的链表包含的是当前节点以及前驱节点,而 MCS 中的链表包含的是当前节点以及后继节点。
  • 自旋对象不同
    • CLH 的自旋对象是前驱节点,而 MCS 自旋对象是自身节点,也就是当前节点。

AQS

CountDownLatch、ThreadPoolExecutor、ReentrantLock、ReentrantReadWriteLock 以及 Semaphore 中的内部类都继承了 AbstractQueuedSynchronizer。因此,对于 ReentrantLock 的理解是需要建立在 AQS 之上的。AQS 的核心思想是:如果被请求的共享资源是空闲的,那么就将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态。而如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁的分配。该机制就是使用 CLH 队列的变体实现的,即将暂时获取不到的线程放进队列中

上面提到的 CLH 是基于单向链表实现的,而 AQS 中的 CLH 变体是基于双向队列实现的,AQS 通过将每个请求共享资源的线程封装成一个节点来实现锁的分配,当多线程在争用资源被阻塞时会进入此队列。整体实现结构如下图所示:

在 AQS 的内部类 Node 中定义了该结构,该 Node 表示的就是上图中的每个节点。Node 节点是对每个等待获取资源的线程的封装,它包含了需要同步的线程本身及其等待状态,例如是否被阻塞、是否等待唤醒、是否已经被取消等。

除此之外,AQS 还维护了一个被 volatile 修饰的 state,代表共享资源,访问 state 的方式有以下三种:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS 定义了两种资源共享的方式:EXCLUSIVE 和 SHARED。EXCLUSIVE 表示独占模式,只能有一个线程执行,例如 ReentrantLock;SHARED 表示共享模式,即多个线程可以同时执行,例如 CountDownLatch、Semaphore。

不同的自定义同步器争用共享资源的方式也是不同的,在实现自定义同步器时只需要实现共享资源 state 的获取和释放方式即可,至于线程等待队列的维护(例如获取资源失败入队/唤醒出队),AQS 已经在底层实现好了。如果想要自定义同步器,则需要实现以下几个方法:

  • boolean isHeldExclusively():表示该线程是否正在独占资源,只有用到 Condition 的时候才需要实现它;
  • boolean tryAcquire(int arg):表示在独占模式下尝试获取资源,成功返回 true,否则返回 false;
  • boolean tryRelease(int arg):表示在独占模式表尝试释放资源,成功返回 true,否则返回 false;
  • int tryAcquireShared(int arg):表示在共享模式下尝试获取资源。
    • 返回负数表示失败;
    • 返回 0 表示成功,但没有剩余可用资源;
    • 返回正数表示成功,且有剩余资源。
  • boolean tryReleaseShared(int arg):表示在共享模式下尝试释放资源,如果释放后允许唤醒后续等待节点则返回 true,否则返回 false。

以 ReentrantLock 为例,state 初始化为 0,表示处于未锁定状态。线程 A 执行 lock() 时,会调用 tryAcquire() 独占该锁并将 state+1。此后其它线程再 tryAcquire() 时就会失败,直到线程 A 执行 unlock() 到 state=0 时释放锁为止,其它线程才有机会获取该锁。需要注意的是,在释放锁之前,由于线程 A 已经获取到了锁,因此线程 A 还可以重复获取这个锁,state 会累加,这就是可重入的概念。但获取了多少次就应该释放多少次,这样才能保证 state 回到 0 状态。

以 CountDownLatch 为例,将某个任务分为 N 个子任务并行执行(N 与线程个数一致),state 初始化为 N。每个子任务执行完后每执行 countDown() 一次,state 就会 CAS 减 1。等到所有子任务都执行完以后,即 state=0 时,会 unpark() 主线程,然后主线程就会从 await() 函数返回,继续后续的执行。

一般情况下,自定义同步器可以使用独占方式或共享方式,如果使用独占方式,则实现 tryAcquire() 和 tryRelease() 即可;而如果实现共享方式,则实现 tryAcquireShared() 和 tryReleaseShared() 即可。如果想要同时支持独占和共享的话,可以使用 AQS 提供的 ReentrantReadWriteLock。

waitStatus

在内部类 Node 中,使用 waitStatus 表示当前节点在队列中的等待状态,共有 5 种取值。如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
static final class Node {
    // 表示一个节点在共享模式下处于等待状态,即共享模式
    static final Node SHARED = new Node();
    // 表示一个节点在独占模式下处于等待状态,即独占模式
    static final Node EXCLUSIVE = null;

    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;
}
  • CANCELLED = 1:表示当前节点已取消调度,即当前线程取消获取锁,当 timeout 或被中断,会触发变更为此状态,进入该状态后的节点将不再发生状态的变化;
  • SIGNAL = -1:表示后继节点等待被当前节点唤醒,也就是说后继节点在被唤醒前是处于阻塞状态的。当后继节点入队时,会将前驱节点的状态更新为 SIGNAL;
  • CONDITION = -2:表示当前节点等待在 Condition 上,即当前线程位于条件队列,当其它线程调用了 Condition 的 signal() 方法后,CONDITION 状态的节点将从等待队列转移到同步队列中,等待获取同步锁;
  • PROPAGATE = -3:表示在共享模式下,前驱节点不仅会唤醒其后继节点,同时也可能会唤醒后继的后继节点。也就是说,处于共享模式下节点的最终状态,确保在 doReleaseShared 的时候将共享状态继续传播下去;
  • 0:如果当前节点不处于以上任何一种状态的话,则会处于 0 状态,即新节点入队时的默认状态。

需要注意的是,负值表示当前节点处于有效等待状态,而正值表示当前节点已被取消。

acquire(int arg)

AbstractQueuedSynchronizer 类中的 acquire() 方法用于在独占模式下获取资源,如果获取到资源,则线程直接返回,否则进入等待队列,直到获取到资源为止,该过程忽略中断的影响。如下所示:

1
2
3
4
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire(int arg) 方法的流程是:

  • tryAcquire(arg) 尝试直接获取资源,如果成功则直接返回,这里体现了非公平锁,每个线程在获取锁时都会尝试进行抢占;
  • addWaiter() 将该线程加入到等待队列的尾部,并标记为独占模式;
  • acquireQueued() 使线程阻塞在等待队列中获取资源,一直获取到资源后才返回,如果在整个等待过程中被中断过,则返回 true,否则返回 false;
  • 如果线程在等待过程中被中断过,它是不会响应的,只是获取资源后再进行自我中断,即 selfInterrupt()。

tryAcquire(int arg)

该方法在独占模式下尝试获取资源,获取成功返回 true,否则返回 false。如下所示:

1
2
3
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

此方法直接抛出异常,是因为 AQS 只是一个框架,具体获取和释放操作由自定义同步器来实现。因此,该方法需要被重写,具体重写的方式是:可以规定为如果返回 true,则表示获取锁成功,反之失败。

回到 acquire(int arg) 方法中,如果 tryAcquire(int arg) 获取锁成功了,则 acquire 方法直接返回,如果失败了,则继续后面的操作,即将线程放进等待队列中,也就是 && 符号后面的 acquireQueued() 方法。但在此之前,需要先了解 addWaiter(Node mode) 方法,

addWaiter(Node mode)

该方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的节点,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 位于 Node 类中
Node nextWaiter;

// 位于 Node 类中
Node(Thread thread, Node mode) {
    this.nextWaiter = mode;
    this.thread = thread;
}

// 位于 AbstractQueuedSynchronizer 类中
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);

    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

流程如下:

  • 使用 Node 含参构造器创建一个 Node 对象,即构造一个当前线程的节点 node;
  • 将刚刚创建的 node 放到队尾:
    • 首先获取尾节点,如果尾节点不为 null,则将 node 放到队尾,通过 CAS 的方式设置队尾节点,如果成功,则将刚刚创建的节点 node 设置为原来队尾节点的 next,然后返回。
  • 如果尾节点 tail 是 null,则调用 enq() 方法。

enq() 方法如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

该方法用于将 node 加入队尾,采用 CAS 自旋的方式直到成功加入到队尾。首先获取到队尾 t,如果 t 为空,说明队列没有进行初始化,因此需要进行初始化操作。这里采用 CAS 的方式创建一个空的标志节点作为 head 节点,并将 tail 指向 head。而如果 t 不为空,则直接执行 CAS 操作将 node 插入到队列尾部。

再次回到 acquire(int arg),此时需要了解的是 acquireQueued() 方法。

acquireQueued(final Node node, int arg)

前面执行了 tryAcquire() 方法和 addWaiter() 方法,此时表明该线程获取资源失败,已经被放进等待队列尾部了。下一步就是进入等待状态,直到其它线程释放资源后唤醒该线程,然后该线程再去获取资源。acquireQueued() 方法就是让线程阻塞在等待队列中获取资源,一直获取到资源后才返回,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final boolean acquireQueued(final Node node, int arg) {
    // 表示是否成功拿到资源
    boolean failed = true;
    try {
        // 表示等待过程中是否被中断过
        boolean interrupted = false;
        // 自旋操作
        for (;;) {
            // 获得当前节点 node 的前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点是头节点,则说明当前节点 node 是第二个节点,
            // 因此便有资格去尝试获取资源
            if (p == head && tryAcquire(arg)) {
                // 将 head 指向当前节点 node
                setHead(node);
                // 经过 setHead() 后,node.prev 已经指向了 null,
                // 此时再将 p.next 置为 null,便于 GC 回收,
                // 也就意味着之前拿完资源的节点出队了
                p.next = null;
                // 表示成功获取到资源
                failed = false;
                // 返回等待过程中是否被中断过
                return interrupted;
            }
            // 如果当前线程可以休息了,就通过 park() 进入 waiting 状态,直到 unpark(),
            // 如果在不可中断的情况下被中断了,则会从 park() 中醒过来,会发现拿不到资源,
            // 从而继续进入 park() 等待
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                // 如果等待过程中被中断过,就将 interrupted 标记为 true
                interrupted = true;
        }
    } finally {
        // 如果等待过程中没有成功获取到资源,例如 timeout 或可中断的情况下被中断了,
        // 那么取消节点在队列中的等待
        if (failed)
            cancelAcquire(node);
    }
}

从注释中可以看到基本的执行流程,在梳理流程之前,这里再看一下 shouldParkAfterFailedAcquire() 和 parkAndCheckInterrupt() 方法的作用。

首先看一下 shouldParkAfterFailedAcquire() 方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获得前驱节点 pred 的状态
    int ws = pred.waitStatus;
    // 这个地方怎么理解呢?
    // 如果之前已经告诉前驱节点:你拿到资源后要通知一下我哈
    // 也就是说,如果前驱节点处于这种 SIGNAL 状态的时候,
    // 那么我就可以直接返回了
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // 来到这里说明前驱节点取消获取锁的操作,那就一直往前找,
        // 直到找到最近的一个处于等待状态的节点,并插在该节点的后面,
        // 那些已经略过的节点,即那些大于 0 状态的节点都会失效,被 GC 回收
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果前驱节点正常,则通过 CAS 的方式将前驱节点设置成 SIGNAL 状态,
        // 告诉它:获取到资源后通知一下我(当前节点)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

再来看一下 parkAndCheckInterrupt() 方法:

1
2
3
4
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

该方法会通过 park() 让当前线程进入 waiting 状态,等到有别的线程唤醒,然后返回当前线程是否被中断了。需要注意的是,Thread.interrupted() 方法会清除当前线程的中断标记位,即清除中断状态。

再次回到 acquireQueued() 方法,具体流程就是将刚刚创建的线程节点挂起,然后等待唤醒,如果被唤醒了,则将自己设置为 head,最后返回是否被中断。

将以上方法介绍完以后,再次回到 acquire(int arg) 方法:

1
2
3
4
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

现在总结一下 acquire() 方法的流程:

  • 首先通过 tryAcquire(arg) 方法尝试获取资源,如果成功则直接返回;
  • 如果没有成功,则需要通过 addWaiter() 方法将该线程加入等待队列的尾部,并标记为独占模式;
  • acquireQueued() 使线程在等待队列中休息,轮到自己获取资源的时候会被 unpark(),然后去获取资源,进行返回。如果在整个等待过程中被中断过,则返回 true,否则返回 false;
  • 如果线程在整个等待过程中被中断过,它是不响应的。只是获取资源后才进行自我中断 selfInterrupt()。

下图来自水岩

release(int arg)

该方法用于在独占模式下释放共享资源,可以用来实现 unlock() 方法,如下所示:

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

首先通过 tryRelease(arg) 尝试释放资源,如果成功,则获取头节点,当头节点不为空并且还有资源的话,则唤醒等待队列里的下一个线程。需要注意的是,release(int arg) 方法的返回值是根据 tryRelease() 方法的返回值来判断线程是否已经完成释放掉资源了。

tryRelease(int arg)

1
2
3
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

release() 方法被 final 修饰,因此不能重写,但我们可以重写 tryRelease() 方法。该方法在独占模式下通过设置 state 的状态来尝试释放资源。由于是独占模式,因此如果某个线程要释放资源的话,那么它肯定已经拿到独占的资源了,直接通过state -= arg减掉相应的资源即可。如果已经将资源彻底释放了(state=0),则返回 true,否则返回 false。

unparkSuccessor(Node node)

该方法通过 unpark() 来唤醒等待队列中的下一个线程,也就是当前节点的后继节点。需要注意的是,需要确保后继节是存在的,如果后继节点不存在,则无法唤醒。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
private void unparkSuccessor(Node node) {
    // 获得当前节点的状态
    int ws = node.waitStatus;
    // 通过 CAS 将当前节点的状态置为 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 找到下一个需要唤醒的节点,也就是当前节点的后继节点
    Node s = node.next;
    // 如果后继节点为空或后继节点已经取消获取锁了
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从队尾往队头开始找,找到第一个状态是小于等于 0 的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 找到后将其唤醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

需要注意的是,release() 只是将队列中第一个满足条件的线程唤醒,所以接下来的逻辑还是在 acquireQueued() 方法中,继续尝试 tryAcquire(),如果成功,则会被出队(当前节点设为头节点),线程继续执行,否则继续等待。

综上,release() 方法是独占模式下线程释放共享资源的顶级入口,它会释放指定数量的资源,如果彻底释放了(state=0),则它会唤醒等待队列里的其它线程来获取资源

acquireShared(int arg)

该方法是处于共享模式下线程获取资源的顶级入口,它会获取指定数量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取成功为止,整个过程是忽略中断的。如下所示:

1
2
3
4
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

这里的 tryAcquireShared(arg) 表示在共享模式下尝试获取资源,它的返回值也代表了不同的含义:

  • 负值代表获取失败;
  • 0 值代表获取成功,但没有剩余资源;
  • 正数代表获取成功,还有剩余资源,其它线程还可以去获取。

而 acquireShared(int arg) 的整体流程就是:首先通过 tryAcquireShared(arg) 尝试获取资源,成功则直接返回,如果失败,则通过 doAcquireShared(arg) 进入等待队列,直到获取到资源为止再返回。

doAcquireShared(int arg)

该方法是以共享的、不可中断的方式获取资源为基础的,将当前线程加入等待队列的尾部,直到其它线程释放资源唤醒自己,自己成功拿到对应数量的资源时再返回,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private void doAcquireShared(int arg) {
    // 创建节点并加入到队列的尾部
    final Node node = addWaiter(Node.SHARED);
    // 用于标识是否成功
    boolean failed = true;
    try {
        // 用于标识等待过程中是否被中断过
        boolean interrupted = false;
        for (;;) {
            // 获得当前节点的前驱
            final Node p = node.predecessor();
            // 由于 head 是拿到资源的线程,
            // 如果当前节点的前驱是 head,则当前节点 node 需要被唤醒,
            // 也就是 head 用完了资源来唤醒当前节点 node
            if (p == head) {
                // 尝试获取资源
                int r = tryAcquireShared(arg);
                // 获取成功
                if (r >= 0) {
                    // 将 head 指向当前节点 node,
                    // 如果还有剩余资源,则可以再唤醒 node 之后的线程
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // 如果等待过程中被中断过,则补上中断
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 如果当前节点 node 的前驱不是 head,
            // 则寻找安全点,进入 waiting 状态,等待被 unpark() 或被 interrput()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

以上方法的执行过程和 acquireQueued(final Node node, int arg) 类似,这里只有线程是 head.next 的时候才回去尝试获取资源,还有剩余的话则会唤醒之后的线程。

假设如下场景:如果 head 节点释放了 5 个资源,而 node 节点需要 6 个,node.next 需要 1 个,node.next.next 需要 2 个,head 先唤醒了 node,node 一看资源不够,那它会把资源让给 node.next 吗?

不会的,node 由于资源不够,它会一直 park() 等待其它线程释放资源,也不会去唤醒 node.next 和 node.next.next。在独占模式下,同一时刻只有一个线程去执行,这样是可以的。而共享模式下,多线程可以同时执行,由于 node 需求量大,而将 node.next 和 node.next.next 阻塞了,这样做是由于 AQS 的公平性,保证了唤醒的顺序是按照入队顺序执行的,但可以看出,这样做虽然保证了公平,但降低了并发量。

setHeadAndPropagate(Node node, int propagate)

从下面的方法中可以看出,在唤醒自己(当前节点 ndoe)的同时,(如果还有资源的话)还回去唤醒后继节点,这是因为处于共享模式下的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // 让 head 指向当前节点 node
    setHead(node);
    // 如果还有剩余资源,则唤醒当前节点 node 之后的线程
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

这里对 acquireShared(int arg) 做一个小结

  • 通过 tryAcquireShared(arg) 尝试获取资源,成功则直接返回;
  • 失败则通过 doAcquireShared(arg) 进入等待队列进行 park() 操作,直到被 unpark() 或 interrupt() 并成功获取到资源在返回,整个过程是忽略中断的。

共享模式体现在:当前节点 node 拿到资源后,它会唤醒后继节点(线程),让后继节点再去准备获取资源。

releaseShared(int arg)

该方法是在共享模式下释放指定量的资源,如果成功释放且允许唤醒等待的线程,则它会唤醒等待队列里的其它线程来获取资源,如下所示:

1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
    // 尝试释放资源
    if (tryReleaseShared(arg)) {
        // 唤醒后继节点
        doReleaseShared();
        return true;
    }
    return false;
}

与独占模式下的 tryRelease() 不同的是:tryRelease() 是在完全释放完资源(state=0)后,才会返回 true 并唤醒其它线程;而共享模式下的 releaseShared() 则没有这种完全的要求,而是拥有资源的线程在释放掉部分资源后就可以唤醒后续等待的线程。

例如资源总量是 13,线程 A 获取了 5 个资源,线程 B 获取了 7 个资源,然后并发执行,此时还剩下 1 个资源。等到线程 C 到来时,由于 C 需要获取 4 个资源才能运行,因此线程 C 目前由于资源不够而处于等待状态。当线程 A 释放了 2 个资源后,通过 tryReleaseShared(2) 返回 true 唤醒线程 C,线程 C 发现目前有 3 个资源,还是继续等待。然后线程 B 又释放了 2 个资源,通过 tryReleaseShared(2) 返回 true 唤醒线程 C,线程 C 看到总共有 5 个资源够用了,因此线程 C 就会和线程 A 和 B 一起运行了。

需要注意的是,ReentrantReadWriteLock 的 tryReleaseShared() 只有在完全释放掉资源(state=0)时才返回 true,因此自定义同步器可以根据需要来决定 tryReleaseShared() 的返回值。

doReleaseShared()

该方法用于在共享模式下唤醒当前节点的后继节点,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒后继节点
                unparkSuccessor(h);
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

小结

以上的全部就是对 AQS 的源码分析了,下面再对独占模式共享模式的主要方法做一个总结:

  • boolean isHeldExclusively():表示该线程是否正在独占资源,只有用到 Condition 的时候才需要实现它;
  • boolean tryAcquire(int arg):表示在独占模式下尝试获取资源,成功返回 true,否则返回 false;
  • boolean tryRelease(int arg):表示在独占模式表尝试释放资源,成功返回 true,否则返回 false;
  • int tryAcquireShared(int arg):表示在共享模式下尝试获取资源。
    • 返回负数表示失败;
    • 返回 0 表示成功,但没有剩余可用资源;
    • 返回正数表示成功,且有剩余资源。
  • boolean tryReleaseShared(int arg):表示在共享模式下尝试释放资源,如果释放后允许唤醒后续等待节点则返回 true,否则返回 false。

因此,总体的思想就是在实现自定义同步器的时候,我们只需要实现共享资源 state 的获取方式和释放方式即可,而具体的线程等待与维护的细节,AQS 已经实现好了。

本文是在阅读参考文章后完成的,不得不说,AQS 的内部细节需要注意的地方太多,理解起来也不是那么的容易。也可能现在不是很理解,但我想,等到过了一个月、三个月或者更长时间后,再来回顾这篇文章,通过不断的学习,等到再次遇到 AQS 的时候,我会对它有一个更加清晰的认识。

参考