上篇文章《Java 中的 AQS》 对 AbstractQueuedSynchronizer 中的方法及实现方式做了详细的解释。有了 AQS 和 CAS 的基础以后,我们再来分析 ReentrantLock 就会更加容易一些。

概述

之前在 《Java 中的 Lock 及各种锁的概念》 中提到了 Lock 这个接口,讲到了里面涉及的几个方法。虽然也简单的介绍了 Lock 接口的唯一实现类 ReentrantLock,但只是简单介绍了里面的方法,而对于一些其它的具体实现(例如公平锁和非公平锁是如何实现的)则没有较为详细的介绍。因此,我想通过这篇文章对 ReentrantLock 进行一次较为细致的理解。或者也可以说,我就将对 ReentrantLock 的理解“输出”成一篇文章。如果有错误或者理解不到位的地方,还请在评论中指出。

在介绍 ReentrantLock 的时候难免会和 synchronized 做对比。在 Java 中实现锁的方式一般可以使用 ReentrantLock 和 synchronized,两者都可以实现线程同步,通过加锁的方式实现对临界资源的互斥访问。但前者实现起来更加灵活,功能更加丰富。例如 ReentrantLock 在加锁期间可以响应中断、设置超时时间等。

以下针对这两种锁在不同方面进行了对比,如下所示:

特性 synchronized ReentrantLock
可重入
响应中断
超时等待
公平锁
非公平锁
可尝试加锁
属于Java 内置特性
自动获取/释放锁

ReentrantLock 还提供了一些其它的方法用于满足不同的需求。例如,可以通过 isLocked() 方法查询当前的锁是否由任意的线程持有,如果有任意的一个线程持有这把锁,则返回 true,否则返回 false;可以通过 getHoldCount() 方法查询由当前线程加锁的次数。另外,synchronized 使用对象或类进行加锁,而 ReentrantLock 底层是通过 AQS 中的同步队列进行加锁。

接下来简单介绍一下以上提到的特定:

可重入

可重入指的是某个线程在执行过程中是否可以再次获取同一把锁。例如由两个方法 foo1() 和 foo2(),这两个方法中都进行了 lock.lock() 以及 lock.unlock() 操作,同时,在 foo1() 中的 try{ } 块中又调用了 foo2()。此时,当线程 A 进入到 foo1() 中后成功获得了锁,然后在没有释放锁的情况下,调用了 foo2() 方法。由于 foo1() 和 foo2() 使用时是同一把可重入锁,因此线程 A 是可以进入到 foo2() 中的,并且再次获得锁,线程 A 不会被阻塞。

而 ReentrantLock 中可重入的实现方式是:通过内部的 AQS 实现的,在 AQS 中有一个变量 state 用于记录同步状态。初始情况下为 0,表示 ReentrantLock 目前处于解锁状态。如果有线程调用 lock() 方法进行加锁,则 state 会加 1。如果该线程再次调用 lock() 方法加锁,则 state 会再次加 1。如果该线程调用 unlock() 方法则会执行 state–。通过查询 state 的值就可以知道 ReentrantLock 被重入的次数。

响应中断

响应中断指的是当某个线程在获取锁的过程中是否可以被其它线程打断,一般通过 Thread.interrupt() 方法中断一个线程,即请求另一个线程在它愿意并方便的时候停止它正在做的事,线程在获取锁的过程中假如被中断了,则会做出响应。synchronized 不可响应中断,一个线程获取不到锁就一直处于阻塞状态。而 ReentrantLock 提供了一个可以响应中断的方法 lockInterruptibly(),如果当前线程被中断了,则会抛出 InterruptedException 异常。

超时等待

ReentrantLock 除了能以中断的方式获取锁,还可以通过超时等待的方式获取锁,即如果线程在指定的超时时间内没有获取到锁,则将返回 false,而不是一直处于获取锁的状态。而 synchronized 不支持超时等待。

公平锁/非公平锁

公平与非公平指的是线程获取锁的方式。在公平模式下,线程在同步队列中以先进先出(FIFO)的方式获取锁,每个线程最终都能获取到锁。而在非公平模式下,线程会通过“插队”的方式去抢占锁,抢不到的则进入同步队列进行排队。默认情况下,ReentrantLock 属于非公平模式获取锁,但可以通过构造方法 ReentrantLock(boolean fair) 设置获取锁方式的公平与否。

这里再多说一点,在公平模式下的每个线程最终都会获取到锁,但效率较低。而非公平模式下线程在获取锁的过程中可能会出现饥饿的现象,即可能某些线程会始终获取不到锁,每次即将获取到锁时都被其它线程给抢了,但该模式下效率较高。效率高的原因在于:非公平模式下获取锁的时候,在恢复一个被挂起的线程与线程真正开始运行之间存在着严重的延迟。具体的说,假如线程 A 持有一把锁,并且线程 B 想请求这把锁。由于该锁已经被线程 A 持有,则线程 B 将会被挂起。只有当线程 A 将锁释放时,线程 B 才会被唤醒,紧接着再去获取锁。如果线程 C 也想获取这把锁,那么很有可能线程 C 会在线程 B 被完全唤醒前获取、使用以及释放了这把锁。这种情况下,线程 B 获取锁的时刻并没有推迟,而线程 C 更早的获得了锁。因此,这种方式执行的效率要高。

可尝试加锁

ReentrantLock 通过使用 tryLock() 方法去尝试获取锁,如果该锁没有被另外一个线程持有,则此方法会立即返回 true,并将锁持有的次数加 1。需要注意的是,即使在公平模式下,只要该锁可用,则调用 tryLock() 方法将会立即获得锁,而不管其它线程当前是否处于等待锁的状态。因此,ReentrantLock 在获取锁的方式上比 synchronized 更具多样性。

属于Java 内置特性

有关该特性的说明,已经在《Java 中的 Lock 及各种锁的概念》一文中的概述部分提到过了。简单地说,synchronized 是 Java 的关键字,因此是 Java 的内置特性,是基于 JVM 层面实现的;而 ReentrantLock 是基于 JDK 层面实现的,通过具体的方法实现对线程的同步。

自动获取/释放锁

采用 synchronized 方式不需要用户去手动释放锁,当 synchronized 方法或者 synchronized 代码块执行完之后,系统会自动让线程释放对锁的占用;而 ReentrantLock 则必须要用户去手动获取/释放锁 (发生异常时,不会自动释放锁)。特别的,如果没有主动释放锁,就有可能导致死锁现象。

ReentrantLock 结构

在之前介绍 AQS 的文章中讲到,ReentrantLock 是基于 AQS 实现,即 ReentrantLock 通过重写获取锁释放锁的方式来实现公平锁非公平锁。内部类Sync继承自 AbstractQueuedSynchronizer,并重写了它的 tryRelease() 和 isHeldExclusively() 方法。而另外两个内部类FairSyncNonfairSync都继承自 Sync,分别用来实现公平锁和非公平锁。在这两个类中,都重写了 AbstractQueuedSynchronizer 中的 tryAcquire() 方法。如下图所示:

通过重写的方法可以看到,ReentrantLock 实现的是 AQS 的独占模式,在类型上属于悲观锁。AQS 很好的封装了同步队列的管理、线程的阻塞与唤醒等基础操作。下面看一看 ReentrantLock 的构造器:

1
2
3
4
5
6
7
8
9
private final Sync sync;

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

正如开头所说,ReentrantLock 默认使用的是非公平锁,也就是说,当我们使用new ReentrantLock();进行初始化的时候,等价于使用new ReentrantLock(false);,通过给构造函数传递一个true/false来实现锁的公平与否。

获取锁

在介绍获取锁的过程之前,先来回顾一下 AQS 中的同步等待队列,该队列是 CLH 队列的变体,是一种基于双向链表的同步队列。线程在获取同步状态失败的情况下,都会被封装成节点,然后加该入队列中,如下所示:

1
2
3
4
5
6
7
8
      head                                tail
        ↓                                   ↓
     +------+   prev   +------+   prev   +------+
     |||||||| <------- |      | <------- |      |
     |||||||| -------> |      | -------> |      |
     +------+   next   +------+   next   +------+
     头节点已
   获得同步状态

在队列中,头节点 head 是获取同步状态的节点,其它节点在尝试获取同步状态失败后,会进入等待队列等待。当头节点释放同步状态后,会唤醒其后继节点。后继节点会将自己设置为头节点,并将之前的头节点从队列中移除(这时大体上的流程,如果你看过 AQS 的源码后会发现,里面还设计到许多细节)。如下所示:

1
2
3
4
5
6
7
8
                        head               tail
                          ↓                  ↓
     +------+          +------+   prev   +------+
     |      |          |||||||| <------- |      |
     |      |          |||||||| -------> |      |
     +------+          +------+   next   +------+
     原头节点           头节点已
                      获得同步状态

公平锁

在内部类 FairSync 中可以看到公平锁的加锁流程,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// ReentrantLock.FairSync#lock
final void lock() {
    // 调用 AQS 的 acquire 方法
    acquire(1);
}

// AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// ReentrantLock.FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 获得同步状态
    int c = getState();
    // 同步状态为 0 说明锁暂时没被其它线程获取(占用),不等于 0 说明锁被占用着
    if (c == 0) {
        // 判断是否有其它线程比当前线程等待的时间更长。
        // 如果有,则应该先让等待时间更长的节点获得锁;
        // 如果没有,则让当前线程称为锁的所有者,并调用 compareAndSetState() 尝试设置同步状态。
        // 这里的逻辑就是实现公平锁的机制,即让先来的线程获取锁,后来的不能抢先获取。
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            // 将当前线程设置为持有锁的线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果当前线程为持有锁的线程,则执行重入逻辑
    else if (current == getExclusiveOwnerThread()) {
        // 计算重入后的同步状态,acquires 为 1
        int nextc = c + acquires;
        // 如果重入次数超过限制,则抛出异常
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 设置重入后的同步状态
        setState(nextc);
        return true;
    }
    // 如果同步队列中有线程存在且锁的所有者不是当前线程,则返回 false
    return false;
}

// AbstractQueuedSynchronizer#hasQueuedPredecessors
// 用于判断同步队列中是否有比当前线程等待时间更长的线程
public final boolean hasQueuedPredecessors() {
    // 在同步队列中,头节点是已经获取了锁的节点,头节点的后继节点是即将获取锁的节点
    Node t = tail;
    Node h = head;
    Node s;
    // 如果某个线程等待的时间比当前线程长,则返回 true,否则返回 false
    return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

当调用公平锁的 lock.lock() 方法后,将会调用 AQS 的 acquire(1) 方法。在 acquire(1) 方法中,首先通过 tryAcquire(arg) 尝试获取锁,该方法由 AQS 的继承类(FairSync)实现,获取成功直接返回。如果 tryAcquire(arg) 返回 false,则调用 addWaiter() 方法,将当前线程封装成节点,并将节点加入到同步队列的尾部。最后调用 acquireQueued() 方法让同步队列中的节点循环尝试获取锁。

如果在同步队列中成功获取锁,则将自己(当前线程)设置为持有锁的线程,然后返回。如果同步状态不为 0,且当前线程为持有锁的线程,则执行重入逻辑。

非公平锁

在内部类 NonfairSync 中可以看到非公平锁的加锁流程,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// ReentrantLock.NonfairSync
final void lock() {
    // 非公平锁在加锁的时候,会直接调用 CAS 设置 state 变量,如果设置成功,则说明加锁成功。
    // 这里并没有像公平锁那样调用 acquire() 方法让线程进入同步队列进行排队,而是直接调用
    // CAS 进行抢占。抢占失败后,才调用 acquire() 方法将线程放入同步队列尾部进行排队
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

// AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

// ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 获取同步状态
    int c = getState();
    // 同步状态为 0 说明锁暂时没被其它线程获取(占用),此时可以加锁
    if (c == 0) {
        // 由于状态可能在检查后被立即修改,因此此处使用 compareAndSetState 来
        // 原子地更新状态,表示这个锁已经被占有,并确保状态在最后一次检查以后
        // 就没有被修改过
        if (compareAndSetState(0, acquires)) {
            // 将当前线程设置为持有锁的线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果当前线程已经持有锁,即占用锁的线程是自己,则此处为 true,
    // 表明线程再次获取锁,即重入
    else if (current == getExclusiveOwnerThread()) {
        // 计算重入后的同步状态,acquires 为 1
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 设置新的同步状态值
        setState(nextc);
        return true;
    }
    // 如果 state 不为 0,且锁的持有者也不是自己,则返回 false,
    // 然后线程就会进入到同步队列中
    return false;
}

非公平锁的加锁方式是,首先直接调用 compareAndSetState() 方法进行抢占式的加锁,加锁成功后将当前线程设置为持有锁的线程,然后返回。如果加锁失败,则将会调用 acquire() 方法,将当前线程放入同步队列的尾部进行等待。如果线程在同步队列中成功获取锁,则将自己设置为持有锁的线程,然后返回。如果同步状态不为 0,并且当前线程已经是持有锁的线程,则执行重入操作。

释放锁

不管是公平锁还是非公平锁,它们释放锁的操作都是相同的,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// ReentrantLock#unlock
public void unlock() {
    // 调用 AQS 中的 release 方法
    sync.release(1);
}

// AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
    // 调用 ReentrantLock.Sync 中的 tryRelease 方法尝试释放锁
    if (tryRelease(arg)) {
        Node h = head;
        // 如果出现 head == null 的情况,表示初始的时候 head 指向的是 null,当第一个节点入队后,
        // head 会被初始化为一个 dummy 节点。如果在其它节点入队之前就调用了 release() 方法,释放同步状态的话,
        // 则会出现 head == null 的情况。
        // 如果头节点不等于 null 且等待状态为 0,则表示头节点的后继节点(线程)是活跃的(正在运行的),无需唤醒。
        // 因此,这里的头节点 h 的等待状态不是 0 的话,说明后继节点对应的线程可能被阻塞了,
        // 则应该唤醒头节点的后继节点
        if (h != null && h.waitStatus != 0)
            // 唤醒头节点的后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {
    // 用 getState() 方法得到同步状态减去所释放的量 release,得到本次释放锁后的同步状态量。
    // 当 state 为 0 的时候,表示锁才完全释放完毕
    int c = getState() - releases;
    // 判断当前线程是否已经持有锁,仅允许持有锁的线程执行锁释放的逻辑
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果 c 为 0,则说明已经完全释放锁了,此时需要将持锁线程设为 null
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 设置新的同步状态
    setState(c);
    return free;
}

需要注意的是,在if (c == 0) 进行判断的时候,如果当前线程释放锁之后的状态 state 为 0,则表示锁没有重入了,那么直接将锁的持有者设置为 null 即可,并返回 true,这就说明可以唤醒其它线程去获取锁了。而如果当前线程释放锁之后的状态 state 不为 0,则说明锁被重入了,则会返回 false,代表锁还未完全释放,不需要去唤醒其它线程。

区别

在获取锁的过程中,公平与非公平的区别在于是否含有 hasQueuedPredecessors() 方法,该方法用于判断是否其它线程比当前线程在同步队列中等待的时间更长,有的话返回 true,否则返回 false。如下所示:

1
2
3
4
5
6
7
8
      head               node1            node2              tail
        ↓                  ↓                ↓                  ↓
     +------+   prev   +------+   prev   +------+   prev   +------+
     |||||||| <------- |      | <------- |      | <------- |      |
     |||||||| -------> |      | -------> |      | -------> |      |
     +------+   next   +------+   next   +------+   next   +------+
     头节点已
   获得同步状态

对于上图来说,节点 node1 对应的线程比节点 node2 对应的线程在等待队列中等待的时间更长。如果 node2 调用 hasQueuedPredecessors() 方法的话,则会返回 true,而 node1 调用 hasQueuedPredecessors() 方法的话,则会返回 false。这时因为 node1 的前驱节点是头节点,它已经获取了同步状态,不处于等待状态。

如果在公平锁中将此方法去掉,则公平锁将不再“公平”,而会像非公平锁那样,抢占失败才会入队。

可中断方式获取锁

正如文章开头的表格中所提到的,ReentrantLock 可以响应可中断,即以中断的方式获取锁,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// ReentrantLock#lockInterruptibly
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

// AbstractQueuedSynchronizer#acquireInterruptibly
public final void acquireInterruptibly(int arg) throws InterruptedException {
    // 如果当前线程已经中断了,则抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 如果当前线程仍未成功获取锁,则调用 doAcquireInterruptibly() 方法,
    // 该方法在独占的可中断的模式下获取锁
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

小结

对于以上获取锁与释放锁的底层实现,其实也在 AQS 那篇文章中讲到了,但本文从 ReentrantLock 层面分析了获取/释放锁的流程,也算是对 AQS 的一次复习和总结。在进行展开的时候,可以先从 synchronized 和 ReentrantLock 的区别说起,然后再逐渐过渡到共享模式和独占模式,最后再谈到在不同模式下的获取锁和释放锁的区别。

参考