在 JDK 1.5 中的 java.util.concurrent 包下,有一些在并发情况下控制线程的工具类,有的可以控制并发流程,而有的可以交换线程之间的数据。通过这些辅助类可以更好的帮助我们进行并发编程,而本文主要介绍 CountDownLatch、CyclicBarrier、Semaphore、Exchanger 这四个并发工具类。
CountDownLatch
CountDownLatch 可以允许一个或多个线程等待其它线程完成操作。通俗的说,假如有一个任务 A,它需要等待其它多个任务执行完之后才能执行。对于这样的场景,我们就可以使用 CountDownLatch 实现。我们不妨将 CountDownLatch 这几个单词拆开来看,count down 相当于倒数的意思,latch 相当于门闩(读作 shuan,一声)。可以理解为:在创建 CountDownLatch 对象的时候需要传入一个 count 参数,在这个 count “倒数”到 0 之前,主线程需要等在门口,而这个“倒数”的动作需要执行中的线程来驱动,每个线程执行完一次就“倒数”一次,等到这些执行中的线程都执行完之后,再将结果汇总(非必须),然后主线程才可以继续往下执行。
这里首先使用线程的 join 方法来实现类似于上面的功能,如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public class JoinTest {
public static void main(String[] args) throws InterruptedException {
System.out.println("主线程 main 开始执行...");
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程 t1 完成...");
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程 t2 完成...");
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("t1 和 t2 均执行完毕...");
System.out.println("继续执行主线程 main...");
System.out.println("主线程 main 执行完毕...");
}
}
|
输出结果如下所示:
1
2
3
4
5
6
|
主线程 main 开始执行...
线程 t1 完成...
线程 t2 完成...
t1 和 t2 均执行完毕...
继续执行主线程 main...
主线程 main 执行完毕...
|
需要注意的是,第 2 行和第 3 行输出语句的顺序可能是不同的,因为当运行程序以后,有可能是 t1 先获得 CPU 时间片,则 t1 先执行,也有可能是 t2 先获得 CPU 时间片,t2 先执行,因此输出结果就有可能不同了。
以上实现的功能是,通过使用 join 方法让当前执行的线程
等待join 线程
执行结束后再执行。也就是说,join 方法让主线程 main 等待线程 t1 和线程 t2 执行完以后再执行。实现的原理是,不停的检查 join 线程是否存活,如果 join 线程存活则让当前线程永远等待。
join 方法内部调用了 wait(0) 方法,等待 join 线程执行完以后,线程的 notifyAll() 方法就会被调用,此调用方式是在 JVM 层面实现的。核心源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public final void join() throws InterruptedException {
join(0);
}
public final synchronized void join(long millis) throws InterruptedException {
...
if (millis < 0) {
...
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
...
}
}
|
以上稍微提到了一些 join 方法的使用与注意事项,当然,我们也可以使用 CountDownLatch 来实现 join 的功能,而且功能比 join 更多,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
public class CountDownLatchTest {
private static CountDownLatch latch = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
try {
System.out.println("子线程 " + Thread.currentThread().getName() + " 正在执行...");
Thread.sleep(3000);
System.out.println("子线程 " + Thread.currentThread().getName() + " 执行完毕...");
// 将 latch 减 1
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
System.out.println("子线程 " + Thread.currentThread().getName() + " 正在执行...");
Thread.sleep(3000);
System.out.println("子线程 " + Thread.currentThread().getName() + " 执行完毕...");
// 将 latch 减 1
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
System.out.println("等待两个子线程执行完毕...");
// 如果 latch 中的 count 值还没有减到 0 的话,
// 则调用 await 方法的宿主线程将会一直处于阻塞状态,
// 直到 count 的值减为 0,则宿主线程才会继续执行下去
latch.await();
System.out.println("两个子线程已经执行完毕...");
System.out.println("继续执行主线程 main...");
}
}
|
输出结果如下所示:
1
2
3
4
5
6
7
|
子线程 t1 正在执行...
等待两个子线程执行完毕...
子线程 t2 正在执行...
子线程 t1 执行完毕...
子线程 t2 执行完毕...
两个子线程已经执行完毕...
继续执行主线程 main...
|
以上代码的逻辑是:首先创建了一个给定 count 值为 2 的 CountDownLatch 对象,然后开启了两个线程,这两个线程所完成的工作都是一样的,这里用Thread.sleep(3000);
模拟线程正在工作(当然有些不恰当,但将其理解成线程正在处理任务即可),当执行完以后调用 countDown() 方法,表示将 count 减 1,在最后调用了 await() 方法表示使调用该方法的线程处于等待状态,这里就是主线程 mian 调用了 await() 方法从而阻塞了,一直等到 t1 和 t2 都执行完之后,主线程 main 才继续执行。
countDown() 方法一般由执行任务的线程调用,同一个线程可以调用多次,即每次都会使计数器减 1。既可以将该方法在多个线程内执行,也可以属于 1 个线程的多个执行步骤。
await() 方法一般由主线程调用,但也没有规定 await() 方法只能由一个线程执行该方法,如果多个线程同时执行 await() 方法,那么这几个线程都将处于等待状态,以共享模式享有同一把锁。此外,如果不想让主线程一直等待其它线程执行完毕,还可以使用 await(long timeout, TimeUnit unit) 方法,表明等待特定的时间后,就不再阻塞主线程了。
CountDownLatch 的使用场景:
CountDownLatch 特别适合将一个问题分成 N 个子部分的场景,将 N 个子部分交给 N 个线程去处理,等到所有的子部分都完成以后再交给主线程处理。由此可见,总的执行时间取决于执行最慢的任务,但平均来看,可谓是大大减少了总的执行时间。例如某个列表页面需要获取许多图片,如果使用的单个线程顺序获取的话,那么等待时间会很长。而使用 CountDownLatch 的话,可以对获取图片的操作进行拆分,并行的获取图片,这样就缩短了总的获取时间。
主要源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
|
需要注意的是,CountDownLatch 是基于 AbstractQueuedSynchronizer 实现的,即 CountDownLatch 的内部类 Sync 继承了 AbstractQueuedSynchronizer。对于 AbstractQueuedSynchronizer 的分析,可以查看文章AbstractQueuedSynchronizer。
CyclicBarrier
从字面意思上看,CyclicBarrier 就是循环屏障的意思,或者说循环栅栏。它的作用是让一组线程到达某个屏障的时候被阻塞,直到最后一个线程到达该屏障时,这个屏障才会打开,最后所有被屏障拦截的线程才会继续往下运行。也就是说,只有所有的线程都到齐之后,这个栅栏才能打开,此时所有的线程才能继续执行下去。叫做“循环”是因为当所有的线程被释放掉以后,那么这个屏障还是可以被使用到的。
在使用 CyclicBarrier 的时候,需要传入一个参数 parties,表明屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,也就是处于 Barrier 的状态,然后当前线程会被阻塞。而参数 barrierAction 表示当这些线程都到达了这个屏障的时候,优先执行 barrierAction,方便处理更复杂的业务场景。
1
2
3
4
5
6
7
8
9
10
11
|
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
|
await 方法有两个重载的版本,无参的 await 方法用来阻塞当前线程,直至所有线程都到达屏障的时候再同时执行后序任务,而含参的 await 方法则是让这些线程等待一段时间,等到时间结束后,如果还有线程没有到达屏障状态,则直接让已到达屏障状态的线程继续往下执行。源码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
|
下面通过示例来理解 CyclicBarrier 的使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
public class CyclicBarrierTest {
static CyclicBarrier cb = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(() -> {
System.out.println("线程 " + Thread.currentThread().getName() + " 准备阻塞...");
try {
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程 " + Thread.currentThread().getName() + " 执行完毕...");
}, "t1").start();
System.out.println("线程 " + Thread.currentThread().getName() + " 准备阻塞...");
try {
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程 " + Thread.currentThread().getName() + " 执行完毕...");
}
}
|
输出结果如下所示(输出顺序不唯一):
1
2
3
4
|
线程 main 准备阻塞...
线程 t1 准备阻塞...
线程 t1 执行完毕...
线程 main 执行完毕...
|
这里有一个主线程 main 和一个子线程 t1,对于上面的这种输出结果来说,由于 CyclicBarrier 初始化的时候被设置为 2,所以首先执行了线程 t1,此时调用了 await 方法,由于线程数量不等于 2,则此时 t1 阻塞在了 Barrier 状态;然后又执行了主线程 main,调用了 await 方法,由于处于 Barrier 状态的线程数量达到 2 了,所以两个程序就会继续执行下去。
而如果将 CyclicBarrier 的初始化值设置为 3,则程序永远不会停止。因为当前程序最多只有两个线程到达 Barrier 状态,此时这两个线程就会一直处于阻塞状态,而不会继续执行下去。
当每个线程到达栅栏位置(即处于 Barrier 状态)的时候,都会调用 await 方法进入阻塞状态,只有当最后一个线程也到达之后,这些阻塞的线程才能全部继续往下执行
。而如果想在全部继续往下执行
之前,做一些其它事情的话,可以使用 CyclicBarrier(int parties, Runnable barrierAction) 构造函数。如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
public class CyclicBarrierTest_02 {
public static void main(String[] args) {
CyclicBarrier cb = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
System.out.println("123");
}
});
for (int i = 0; i < 4; i++) {
new Writer(cb).start();
}
}
static class Writer extends Thread {
private CyclicBarrier cb;
public Writer(CyclicBarrier cb) {
this.cb = cb;
}
@Override
public void run() {
System.out.println("线程 " + Thread.currentThread().getName() + " 正在写入数据...");
try {
Thread.sleep(5000); // 模拟写入数据动作
System.out.println("线程 " + Thread.currentThread().getName() + " 写入数据完毕,等待其它线程写入完毕...");
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其它任务...");
}
}
}
|
输出结果如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
线程 Thread-0 正在写入数据...
线程 Thread-1 正在写入数据...
线程 Thread-3 正在写入数据...
线程 Thread-2 正在写入数据...
线程 Thread-1 写入数据完毕,等待其它线程写入完毕...
线程 Thread-0 写入数据完毕,等待其它线程写入完毕...
线程 Thread-3 写入数据完毕,等待其它线程写入完毕...
线程 Thread-2 写入数据完毕,等待其它线程写入完毕...
123
所有线程写入完毕,继续处理其它任务...
所有线程写入完毕,继续处理其它任务...
所有线程写入完毕,继续处理其它任务...
所有线程写入完毕,继续处理其它任务...
|
当 4 个线程都到达 Barrier 状态后,会在 4 个线程执行之前先会输出 123,然后这 4 个线程才继续往下执行。通过这种实现方式,我们可以想到 CyclicBarrier 的使用场景,例如用于多线程计算数据,让每个线程分别去计算各自的数据,然后将各个线程所计算出的结果进行汇总即可。
CountDownLatch 和 CyclicBarrier 的区别?
虽然这两个并发工具类都可以将其理解成维护了一个计数器,但这两者还是有一些差别的:
- 等待方式不同:
- CountDownLatch 一般用于某个线程 A 等待若干个其它线程执行完以后,线程 A 才继续执行,它强调的是一个线程等待其它线程完成某件事;
- CyclicBarrier 一般用于一组(多个)线程相互等待至某个状态,然后这一组(多个)线程再同时执行,它强调的是线程之间相互等待,等到大家都完成,再一起继续往下执行。
- 执行方式不同:
- CountDownLatch 是在多个线程都进行了 latch.countDown() 后才会触发事件,唤醒 await 在 latch 上的线程,而执行 countDown() 的线程在执行完 countDown() 后会继续执行自己线程的工作;
- CyclicBarrier 用于同步所有调用 await 方法的线程,线程执行了 await 方法后并不会执行之后的代码,而只有当执行 await() 方法的线程数量等于指定的 parties 之后,这些执行了 await 方法的线程才会同时运行。
- 循环次数不同:
- CountDownLatch 的计数器只能使用一次,即计数器减为 0 就不能被重置了;
- CyclicBarrier 的计数器可以通过 reset() 方法重置。
Semaphore
Semaphore 用于控制线程并发的数量,它维护了一组许可证
,在构造 Semaphore 的时候,需要传入想要管理的线程数量,通过调用 acquire() 方法获取一个许可,通过 release() 释放一个许可,因此 Semaphore 相当于一个并发控制器。构造方法如下:
1
2
3
4
5
6
7
8
|
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
|
参数 permits 表示的实际上是可用的许可的数量,如果设置为 5 的话,则表明允许 5 个线程获取许可证,即最大并发数是 5。参数 fair 表示可以设置公平锁还是非公平锁。Semaphore 中的重要方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
|
无参的 acquire() 方法表示获取一个许可,如果没有许可可以获得的话,则会一直等待,直到能够获取许可。而有参的 acquire(int permits) 方法表示获取多个许可。相应的,release() 方法表示释放一个许可,有参的 release(int permits) 方法表示释放多个许可。需要注意的是,在释放许可之前需要先获得许可。
由于以上方法有可能发生阻塞,所以还有几个 tryAcquire 方法,在没有获得许可的情况下会立即返回。下面通过示例说明 Semaphore 的使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
public class SemaphoreTest {
public static void main(String[] args) {
// 工人的数量
int N = 8;
// 机器的数量
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < N; i++) {
new Worker(i, semaphore).start();
}
}
static class Worker extends Thread {
private int num;
private Semaphore semaphore;
public Worker (int num, Semaphore semaphore) {
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人 " + num + " 占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人 " + num + " 释放出机器...");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
|
输出结果如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
工人 0 占用一个机器在生产...
工人 1 占用一个机器在生产...
工人 2 占用一个机器在生产...
工人 3 占用一个机器在生产...
工人 4 占用一个机器在生产...
工人 0 释放出机器...
工人 6 占用一个机器在生产...
工人 1 释放出机器...
工人 5 占用一个机器在生产...
工人 2 释放出机器...
工人 7 占用一个机器在生产...
工人 3 释放出机器...
工人 4 释放出机器...
工人 6 释放出机器...
工人 5 释放出机器...
工人 7 释放出机器...
|
由于工人的数量大于机器的数量,即 8 > 5,因此只能够允许 5 台机器同时被 5 个工人使用,也就是说只允许 5 台机器并发执行。需要注意的是,一台机器同时只能被一个工人使用,只有使用完了,其它工人才能继续使用。
Exchanger
顾名思义,Exchanger 类用于线程间进行数据的交换,如果第一个线程先执行 exchange() 方法,则该线程会一直等待另一个线程也调用 exchange() 方法,等到这两个线程都到达同步点
以后,此时就可以交换数据了。需要注意的是,如果一个线程 A 调用了 exchange() 方法而另一个线程 B 始终没有调用 exchange() 方法,则线程 A 会一直等待下去,因此可以使用重载的 exchange(V x, long timeout, TimeUnit unit) 方法设置最大等待时长,来避免一直等待的情况发生。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
public class ExchangerTest extends Thread {
private String str;
private Exchanger<String> exchanger;
private int sleepSecond;
public ExchangerTest(String str, Exchanger<String> exchanger, int sleepSecond) {
this.str = str;
this.exchanger = exchanger;
this.sleepSecond = sleepSecond;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " 启动,原始数据为: " + str + ",时间为: " + sleepSecond);
Thread.sleep(sleepSecond * 1000);
// 交换数据
str = exchanger.exchange(str);
System.out.println(Thread.currentThread().getName() + " 交换了数据,交换后的数据为: " + str + ",时间为: " + sleepSecond);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
ExchangerTest test1 = new ExchangerTest("111", exchanger, 3);
ExchangerTest test2 = new ExchangerTest("222", exchanger, 2);
test1.start();
test2.start();
}
}
|
输出结果如下所示:
1
2
3
4
|
Thread-0 启动,原始数据为: 111,时间为: 3
Thread-1 启动,原始数据为: 222,时间为: 2
Thread-0 交换了数据,交换后的数据为: 222,时间为: 3
Thread-1 交换了数据,交换后的数据为: 111,时间为: 2
|
对于线程 test1 和线程 test2,它们 sleep 的时间是不同的。睡眠 2 秒的线程肯定需要等待睡眠 3 秒的线程,因为前面也说了,既然需要交换数据,那么需要两个线程要到达一个时间点
或者同步点
,只有同时到达这个点以后,才能交换相应的数据。
参考