为了在多线程并发的情况下能够保证多个线程对数据进行合理的访问,避免出现并发情况下的线程安全问题,Java 从 JDK1.5 开始提供了 java.util.concurrent.atomic 包,它可以通过原子操作类来对某个变量实现更加简单、线程安全的更新操作。

概述

对于一个比较常见的线程安全问题:多个线程同时对某个对象名为 obj 的成员变量 i 进行自增操作。假设我们使用双核处理器来执行 A、B 两个线程,核 1 执行线程 A,核 2 执行线程 B,这两个线程都需要对变量 i=0 进行加 1 操作。此时就有可能发生 i 的最终结果是 1 而不是 2 的线程安全问题。

为什么会出现 i 的最终结果是 1 的情况

这是因为线程 A 先将内存中的变量 i 读取到核 1 中的算数运算单元,然后进行加 1 操作,再将这个计算结果写会到内存中。你可以看到,这个过程不是原子操作,只要线程 B 在线程 A 写回到内存之前,读取了内存中 i 的值(此时为 0),那么就一定会出现 i 的最终结果为 1 的情况。因为线程 A 和线程 B 读取到的 i 值都是 0,分别进行加 1 后的值都是 1,两个线程先后将计算结果写回到内存中,就发生了 i 被两次写入的值都是 1 的情况。

从计算机硬件方面讲,如果一段代码被认为是 Atomic(也就是原子的,不可再进行分割的),则表示这段代码在执行的过程中是不能被中断的。通常来说,原子指令由硬件提供,而原子方法则由软件实现。这里的原子方法指的是当某个线程进入到该方法后,不会被中断,而是直到其执行完成。例如,在 x86 平台上,CPU 提供了在指令执行期间对总线加锁的手段。CPU 芯片上有一条引线 #HLOCK pin,如果在汇编指令前添加一个LOCK前缀,则经过汇编以后的机器码就会使 CPU 在执行这条指令的时候把#HLOCK pin的电位拉低,一直持续到这条指令结束的时候才放开,从而把总线锁住,这样同一总线上的 CPU 就暂时不能通过总线访问内存了,这样就保证了这条指令在多处理器环境中的原子性。

对于以上这种线程不安全的更新操作,通常我们会使用 synchronized 将对象 obj 进行加锁来解决这个问题,但 synchronized 采用的是悲观锁的策略,并不是一种高效的手段。Java 在 JDK1.5 及其以后提供了 java.util.concurrent.atomic 包,供我们以一种简单、高效的方式来实现对变量进行更新的线程安全的操作。

java.util.concurrent.atomic 包提供了不同类型的原子更新方式,都采用的是以乐观锁的策略进行更新数据,其核心是 CAS操作。因此,在分析 Atomic 包之前,需要知道 Java 中 CAS 的概念。

CAS

多个线程在修改某一变量的时候需要加锁,加锁之后再进行修改,这样可以保证数据的正确性。如果想要在不上锁的情况下达到这种效果,则可以使用 CAS,即 Compare and Swap,底层使用汇编指令 CMPXCHG 实现。该指令有三个需要操作数:变量的内存值 V(value)、变量的当前预期值E(exception)以及变量想要更新的值U(update),当且仅当预期值 E 和内存值 V 相等时,将内存值 V 修改为想要更新的值 U 并返回 true,否则什么都不做返回 false。

我们可以利用 CAS 操作来解决上述变量自增遇到的问题,线程 A 将内存中的变量 i=0 读到一个临时变量中,然后再将 i 读取到核 1 的算术运算单元中,接下来进行加 1 操作,比较临时变量中的值和 i 当前的值是否相同,如果相同则用运算单元中的运算结果 1 覆盖内存中的 i 值(此过程属于原子操作,不能被中断并且其他线程中的 CAS 操作不能同时执行),否则执行失败。如果执行失败的话,说明另外一个线程 B 已经将 i 的值进行了修改。

因此可以看到,如果两个线程一开始读取 i 的都是 0,那么只能有一个线程的 CAS 操作能成功,因为 CAS 操作不能并发执行。对 CAS 操作执行失败的线程,只需要通过自旋(循环)操作,那么就一定会执行成功。注意到此过程没有发生阻塞。

如何理解自旋操作

假设线程 A 要对数值 0 进行自增操作:

  • 首先线程 A 会拿到 0 这个值,然后进行加 1 操作,得到结果值为 1;
  • 在写回内存的时候(也就是在覆盖原值的时候)需要判断原始的 0 值和我最开始拿到的 0 值是否是相同的:
    • 如果相同,则说明没有其它线程对 0 值进行修改过,则线程 A 会将修改后的 1 进行写回操作;
    • 如果不相同,则说明其它线程已经对 0 值进行修改过了,假设修改成了 1,则线程 A 此时需要再次获取 1 这个值,再进行自增操作变成 2,然后还需要比较之前的 1 是否和线程 A 拿到的 1 相等:
      • 如果相等,则将计算后的结果 2 写回;
      • 如果不相等,则说明有其它的线程已经修改过了,则还需要再次获取值。
  • 因此这是一个不断自旋的操作,只要之前的值和当前读取到的值不相等,就一直自旋,具体可以通过 while 循环进行不断的自旋。

但需要注意的是,CAS 会面临以下三个问题

  • ABA 问题。如果一个值原来是 A,由于某些原因,变成了 B,结果又变成了 A。那么使用 CAS 在检查时会发现它没有发生变化,但实际上已经发生了变化。也就是说,其它线程修改多次以后,最后得到的值和原始值相同,那么线程 A 就会认为这个值没有被修改过,但事实上已经被修改过了,只是线程 A 看不到而已。
  • 循环时间长开销大。自旋 CAS 如果长时间不成功,则会给 CPU 带来很大的执行开销。
  • 只能保证一个共享变量的原子操作。当只有一个共享变量的时候,使用 CAS 来保证原子操作是没有问题的。而如果对多个共享变量进行操作时,则无法保证操作的原子性。

对于如何 ABA 问题,可以给原始值添加一个版本号,每次变量更新的时候就会增加,例如 A->B->C 会变成 1A->2B->3C。当然,也可以加时间戳。此外,还可以通过 Atomic 包下的 AtomicStampedReference 中的 compareAndSet 方法来解决 ABA 问题。

Atomic

Atomic 包下的类所实现的功能就是在多线程环境下,当有多个线程同时对的变量进行操作时,具有排他性,即当多个线程同时对某变量进行修改的时候,仅有一个线程会成功,未成功的线程可以通过自旋操作不断尝试,直到成功为止。

由于变量的类型有很多,例如基本类型和引用类型,因此 Atomic 包下大体可以分为 4 种类型的原子更新方式,即对基本类型进行原子更新、对数组进行原子更新、对引用类型进行原子更新以及对属性(字段)进行原子更新。

需要注意的是,Atomic 包下的类基本都是使用的 Unsafe 实现的包装类。即 Atomic 包下的类中的核心方法会调用 Unsafe 类中的几个本地(native)方法。sun.misc.Unsafe 包含了大量的对 C 代码的操作,包括直接对内存进行操作以及原子操作的调用。称之为 Unsafe 是因为对该类中的方法进行调用会发生安全隐患,需要小心使用,有可能会遇到类似于 C++ 中的指针越界等问题。

Java 无法直接访问底层操作系统,而是通过本地方法(native)来访问。Unsafe 提供了硬件级别的操作,例如获取某个属性在内存中的位置,或者修改对象的字段值,只不过平时很少用到。

Atomic 包提供的类如下图所示:

种类

按照不同的类型,可分为如下 4 类:

  • 对基本类型进行原子操作:
    • AtomicBoolean
    • AtomicInteger
    • AtomicLong
  • 对数组进行原子操作:这里操作的表示数组,而是数组中的每个元素。
    • AtomicIntegerArray
    • AtomicLongArray
    • AtomicReferenceArray
  • 对引用类型进行原子操作:
    • AtomicReference
    • AtomicMarkableReference
    • AtomicStampedReference
  • 对属性(字段)类型进行原子操作:
    • AtomicIntegerFieldUpdater
    • AtomicLongFieldUpdater
    • AtomicReferenceFieldUpdater

对于常用的 AtomicInteger、AtomicReference 以及 AtomicStampedReference,这里以 AtomicInteger 为例,介绍该类中的几个方法。

需要注意的是:JDK1.7 中 AtomicInteger 下的多个方法与 JDK1.8 中的多个方法的实现方式是不同的,这里以 JDK1.8 为例。

基本类型的原子操作

先来看 AtomicInteger 的构造方法

1
2
3
4
5
6
7
8
private volatile int value;

public AtomicInteger() {
}

public AtomicInteger(int initialValue) {
    value = initialValue;
}

如果在初始化的时候指定了参数 initialValue,则会将该参数保存在被 volatile 修饰的 value 里,volatile 可以保证一个线程的修改能够被其它线程所看到,即内存的可见性。

addAndGet(int delta) 方法使用原子的方式将输入的值 delta 与上述的 value 值进行相加,并返回结果,如下所示:

1
2
3
public final int addAndGet(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

compareAndSet(int expect, int update) 属于 CAS 最核心的操作,如果当前值等于预期值,则以原子的方式将该值设置为给定的更新值,如下:

1
2
3
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

getAndIncrement() 方法以原子的方式将当前值加 1。需要注意的是该方法返回的是之前的值。如果想要返回更新后的值,可以使用incrementAndGet() 方法,如下所示:

1
2
3
4
5
6
7
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

下面就是一个简单的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public class AtomicIntegerTest {

    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(1);

        System.out.println(atomicInteger.get());
        System.out.println(atomicInteger.getAndIncrement());
        System.out.println(atomicInteger.get());
        System.out.println(atomicInteger.incrementAndGet());
    }
}

输出结果如下所示:

1
2
3
4
1
1
2
3

get() 方法返回当前的值,因为初始化的时候设置的值为 1,因此 get() 方法返回 1;getAndIncrement() 方法对 1 进行了自增操作,因此 value 是 2,但该方法返回的是之前的值,即 1;这里又一次调用了 get() 方法,因此返回的是 2;最后调用了 incrementAndGet() 方法,该方法对 2 进行了自增操作,然后返回的是更新后的值,也就是 3。

这里以 getAndIncrement() 的源码为例,来理解整个 CAS 的过程,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// AtomicInteger.java
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}


// Unsafe.class
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}


// Unsafe.class
public native int getIntVolatile(Object var1, long var2);


// Unsafe.class
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

在 JDK1.8 下,调用 AtomicInteger 类下的 getAndIncrement() 方法,实际调用的是 Unsafe 类下的 getAndAddInt(Object var1, long var2, int var4) 方法。因此这里的 var1 表示当前对象 this,var2 表示共享变量在当前对象上的内存偏移量,var4 表示传递过来的值是 1。

在 getAndAddInt(Object var1, long var2, int var4) 方法中,首先定义了 var5 表示期望的值,然后通过 do-while 直到更新成功才跳出循环。在 do{ } 中通过 getIntVolatile() 方法获取 value 的最新值。在 while() 中,var5 + var4 表示需要更新的值,如果 compareAndSwapInt() 方法返回 false,说明 value 值被其它线程修改了,于是就循环重试,再次获取最新值,再次计算需要跟新的值 var5 + var4,直到更新成功。

这里使用了 var1、var2、var3、var4、var5,看起来不是很直观,现将其修改如下:

1
2
3
4
5
6
7
8
public final int getAndAddInt(Object obj, long valueOffset, int var) {
    int expect;
    do {
        expect = this.getIntVolatile(obj, valueOffset);
    } while(!this.compareAndSwapInt(obj, valueOffset, expect, expect + var));

    return expect;
}

数组类型的原子操作

以 AtomicIntegerArray 为例,该类型是对数组中的每个元素以原子的方式进行更新。先看它的构造方法:

1
2
3
4
5
6
7
8
9
private final int[] array;

public AtomicIntegerArray(int length) {
    array = new int[length];
}

public AtomicIntegerArray(int[] array) {
    this.array = array.clone();
}

在初始化的时候,可以指定数组的长度,也可以传入一个指定的数组。需要注意的是,后面的这种方式会将数组复制一份,因此当 AtomicIntegerArray 对数组中的元素进行修改时,不会影响传入的数组。

下面看常用的方法:

1
2
3
4
5
6
7
public final int addAndGet(int i, int delta) {
    return getAndAdd(i, delta) + delta;
}

public final int getAndAdd(int i, int delta) {
    return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}

addAndGet() 方法表示以原子的方式将给定的值与索引为 i 的值进行相加,返回更新后的值。而 getAndAdd() 方法作用和 addAndGet() 方法是一样的,但不同的是,getAndAdd() 方法返回更新前的值

而下面的 compareAndSet() 方法表示如果当前值等于预期值,则以原子的方式将数组位置 i 的元素设置成 update 值。

1
2
3
4
5
6
7
public final boolean compareAndSet(int i, int expect, int update) {
    return compareAndSetRaw(checkedByteOffset(i), expect, update);
}

private boolean compareAndSetRaw(long offset, int expect, int update) {
    return unsafe.compareAndSwapInt(array, offset, expect, update);
}

下面通过一个示例进行说明:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class AtomicIntegerArrayTest {

    public static void main(String[] args) {

        AtomicIntegerArray ai = new AtomicIntegerArray(new int[]{1, 2, 3});

        System.out.println(ai.get(0));
        System.out.println(ai.addAndGet(0, 5));
        System.out.println("--");

        System.out.println(ai.getAndAdd(1, 5));
        System.out.println(ai.get(1));
        System.out.println("--");

        System.out.println(ai.compareAndSet(2, 10, 30));
        System.out.println(ai.get(2));
        System.out.println("--");

        System.out.println(ai.compareAndSet(2, 3, 30));
        System.out.println(ai.get(2));
    }
}

输出结果如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
1
6
--
2
7
--
false
3
--
true
30

这里一共分为 4 个部分:

  • 第一部分:
    • 首先获取数组索引为 0 的元素,因此输出 1;
    • 然后将索引为 0 的元素与 5 相加,返回更新后的值,因此输出 6。
  • 第二部分:
    • 首先将数组索引为 1 的元素与 5 相加,由于 getAndAdd() 方法返回更新前的值,因此输出 2;
    • 然后获取数组索引为 1 的元素,因此输出 7。
  • 第三部分:
    • 首先进行更新操作,这里想要将索引为 2 的元素 3 更新成 30,但由于期望值 10 与之前位置上的 3 不相等,因此返回 false;
    • 然后通过查看索引为 2 的元素可以看到,该位置上的元素还是 3,没有被更新成功。
  • 第四部分:
    • 首先也是进行更新操作,这里也是想要将索引为 2 的元素 3 更新成 30,由于期望值 3 与 之前的位置上的 3 相等,因此返回 true;
    • 然后通过查看索引为 2 的元素可以看到,该位置上的元素为 30,说明更新成功。

引用类型的原子操作

这里以 AtomicReference 为例,首先看一下它的构造方法:

1
2
3
4
5
6
7
8
private volatile V value;

public AtomicReference(V initialValue) {
    value = initialValue;
}

public AtomicReference() {
}

在创建对象的时候,可以给 AtomicReference 传入一个值,该值将会赋值给被 volatile 修饰的 value,以供其它的方法(如 get()、set() 等)使用。

下面的 get()、set() 方法也很简单:

1
2
3
4
5
6
7
public final V get() {
    return value;
}

public final void set(V newValue) {
    value = newValue;
}

接下来通过一个示例说明如何以原子的方式更新引用类型:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class AtomicReferenceTest {

    static AtomicReference<User> ar = new AtomicReference<>();

    public static void main(String[] args) {

        User user = new User("zhangsan", 25);
        ar.set(user);

        User updateUser = new User("lisi", 30);
        System.out.println(ar.compareAndSet(user, updateUser));

        System.out.println(ar.get().getName());
        System.out.println(ar.get().getAge());
    }

    static class User {
        private String name;
        private int age;

        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }

        public String getName() {
            return name;
        }

        public int getAge() {
            return age;
        }
    }
}

输出结果如下所示:

1
2
3
true
lisi
30

首先定义了一个内部类 User,并定义了两个字段 name 和 age,给出了 getter() 方法。然后创建了一个 user 对象 (“zhangsan”, 25),并通过 set() 方法将该对象设置到了 AtomicReference 中。最后调用了 compareAndSet() 方法进行了原子更新操作,实现的原理和 AtomicInteger 中的 compareAndSet() 方法相同。

如果将ar.set(user);语句进行注释,则 compareAndSet() 方法会返回 false,因为 AtomicReference 中的 value 没有被设置任务值,当通过 compareAndSet() 进行比较的时候,当前值不等于预期值,因此将会返回 false。

解决 ABA 问题

这里可以使用 AtomicStampedReference 来解决 ABA 问题,它在实现 compareAndSet() 方法的时候除了要比较当前对象的值和预期值以外,还需要比较当前的戳值和预期戳值,只有全部相同时,CAS 才能操作成功。每次更新的时候,戳值都会发生变化,我们可以自己设置该戳值。

1
2
3
4
5
6
7
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) {
    Pair<V> current = pair;
    return expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
            newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp)));
}

以上就是使用 AtomicStampedReference 来解决 ABA 问题的主要方法,但是你会发现,这里的 Pair 又是什么呢?这里的 Pair 其实是 AtomicStampedReference 的一个内部类,它将元素值和版本号绑定在了一起,存储在 Pair 的 reference 和 stamp 中,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
private static class Pair<T> {
    final T reference;
    final int stamp;
    private Pair(T reference, int stamp) {
        this.reference = reference;
        this.stamp = stamp;
    }
    static <T> Pair<T> of(T reference, int stamp) {
        return new Pair<T>(reference, stamp);
    }
}

当了解了内部类 Pair 之后,现在我们看一下 compareAndSet() 方法到底是如何执行的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) {
    // 获取当前的(元素值,版本号)对
    Pair<V> current = pair;
    return
            // 如果引用没有变
            expectedReference == current.reference &&
            // 如果版本号也没有变
            expectedStamp == current.stamp &&
            // 如果新的引用等于旧的引用
            ((newReference == current.reference &&
            // 如果新的时间戳等于旧的时间戳
            newStamp == current.stamp) || 
            // 构造新的 Pair 对象并使用 CAS 的方式更新
            casPair(current, Pair.of(newReference, newStamp)));
}


private boolean casPair(Pair<V> cmp, Pair<V> val) {
    // 调用 Unsafe 中的 compareAndSwapObject 方法通过 CAS 的方式更新 Pair 的引用为新的引用
    return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}

因此,总体的流程为:

  • 如果元素值和版本号都没有发生变化,并且和新的也相同,那么返回 true;
  • 如果元素值和版本号都没有发生变化,并且和新的不完全相同,那么就构造一个新的 Pair 对象并通过 CAS 的方式更新 Pair。

当然也可以使用 AtomicMarkableReference,它的内部存储方式也是使用了一个内部类 Pair,但与 AtomicStampedReference 不同的是,AtomicMarkableReference 维护的不是一个版本号,而是维护一个 boolean 类型的标记 mark,根据这个标记 mark 是否修改过,来解决 ABA 问题。

属性(字段)类型的原子操作

这里以 AtomicIntegerFieldUpdater 为例,它属于以原子的方式更新整型字段的更新器。要想原子的更新字段类,首先由于原子更新类都是抽象类,因此使用的时候必须使用静态方法 newUpdater() 创建一个更新器,并且需要设置想要更新的类和属性,然后在更新类的字段或属性的时候需要使用 public volatile 进行修饰。如以下示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class AtomicIntegerFieldUpdaterTest {
    private static AtomicIntegerFieldUpdater<User> atomicIntegerFieldUpdater = 
                    AtomicIntegerFieldUpdater.newUpdater(User.class, "age");

    public static void main(String[] args) {
        User user = new User("zhangsan", 25);
        System.out.println(atomicIntegerFieldUpdater.getAndIncrement(user));
        System.out.println(atomicIntegerFieldUpdater.get(user));
    }


    static class User {
        private String name;
        // 注意这里需要使用 public volatile 修饰 age
        public volatile int age;

        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }

        public String getName() {
            return name;
        }

        public int getAge() {
            return age;
        }
    }
}

输出结果如下所示:

1
2
25
26

首先创建了用于原子操作的更新器 AtomicIntegerFieldUpdater,并指明了需要更新的对象类以及对象的属性,这里指定属性age作为需要更新的属性,但前提是需要在 User 类中将 age 属性使用 public volatile 修饰。然后通过 getAndIncrement() 方法将 age 字段进行加 1 操作,但需要注意的是,该方法返回的是更新之前的值,即 25,最后通过调用 get() 方法给定对象的当前 age 的值,也就是更新后的值,即 26。

这里的 getAndIncrement() 方法也是通过 CAS 进行不断的自旋进行操作的,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public int getAndIncrement(T obj) {
    int prev, next;
    do {
        prev = get(obj);
        next = prev + 1;
    } while (!compareAndSet(obj, prev, next));
    return prev;
}

public abstract int get(T obj);

参考