之前的文章,比如在多线程环境中使用 synchronized 和 Lock 可以通过锁的方式实现线程之间的同步互斥访问临界资源。也就是说,只有当一个线程释放了锁之后,另一个线程才能获得锁,从而对数据进行操作。但在有些情况下,需要多个线程之间进行协作,以使得多个线程可以一起去解决某个问题,此时就需要采用线程通信-协作机制。
概述
总体来讲,线程之间有以下几种通信方式:
- 等待/通知机制,即 wait() 和 notify();
- 使用 join();
- 管道输入输出流,使用内存作为传输媒介;
- 生产者-消费者模型;
- 使用 Condition 控制线程通信;
- 使用阻塞队列(BlockingQueue)控制线程通信。
而进程之间的通信方式主要有以下几种:
- 管道(pipe)
- 信号量机制(semophore)
- 消息队列(message queue)
- 共享内存(shared memory)
- 套接字(socket)
假如我们不使用以上的方式,而是自己实现两个线程之间的通信的话,可以使用不断轮询的方法,也就是使用 while 语句来让一个线程不断检测是否满足某个条件。这样做的缺点是浪费 CPU 的资源,我们通过以下例子进行说明。
该例子包含三个类:
- MyList:即资源类,用于设置临界资源,让线程 A 和线程 B 进行访问;
- ThreadA:即线程 A,不断地向资源类中添加元素;
- ThreadB:即线程 B,使用 while(ture) 语句不断轮询,等到临界资源的大小达到某一值的时候做出响应。
如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
// 资源类
class MyList {
// 临界资源
private volatile List<String> list = new ArrayList<>();
public void add() {
list.add("123");
}
public int size() {
return list.size();
}
}
// 线程 A
class ThreadA extends Thread {
private MyList list;
public ThreadA(MyList list, String name) {
super(name);
this.list = list;
}
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
list.add();
System.out.println("添加了 " + (i + 1) + " 个元素.");
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 线程 B
class ThreadB extends Thread {
private MyList list;
public ThreadB(MyList list, String name) {
super(name);
this.list = list;
}
@Override
public void run() {
try {
while (true) {
if (list.size() == 4) {
System.out.println("list 的大小为 4 了,线程 B 要退出了!");
throw new InterruptedException();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test{
public static void main(String[] args) {
MyList service = new MyList();
ThreadA a = new ThreadA(service, "A");
ThreadB b = new ThreadB(service, "B");
a.start();
b.start();
}
}
|
输出结果如下所示:
1
2
3
4
5
6
7
8
|
添加了 1 个元素.
添加了 2 个元素.
添加了 3 个元素.
添加了 4 个元素.
list 的大小为 4 了,线程 B 要退出了!
java.lang.InterruptedException
at Part_05.ThreadB.run(Test.java:60)
添加了 5 个元素.
|
从结果可以看出,线程 A 不断地向 list 中添加元素,等到 list 的大小为 4 时线程 B 才退出。显然这种通信方式效率不高,CPU 的利用率也不大,并且非常耗费资源。因为线程 B 每次都需要去检查 list 中元素的个数,每次都需要做出判断。
所以,为了提高线程之间的通信效率,提高 CPU 的利用率,下面则介绍几种线程之间通信的方式。
wait/notify 机制
wait/notify 机制,即等待/通知机制,这里的等待/通知就像我们去银行办理业务。一般情况下,我们到达银行后需要取号,如果没有空闲的业务窗口的话,则需要坐在一旁等待一段时间,在等待的过程中,如果业务员通过广播叫到了我们自己手中的号码,则就去对应的窗口办理业务。你看,这里的等待就相当于 wait,而业务员的叫号操作就相当于 notify,也就是通知。
等待/通知机制主要使用的是 Object 类中的三个方法:wait()、notify()、notify()。
既然是与线程通信相关,为什么这三个方法是属于 Object 类而不属于 Thread 类呢?
因为对于每一个对象来说,它们都拥有一个锁,即 monitor。让当前线程等待某个对象的锁,则应该通过这个对象来操作,而不是用当前线程来操作。因为当前线程可能会等待多个线程的锁,如果通过线程来操作的话,那会显得十分复杂。
wait()
Object 类中的 wait() 方法里又调用了含参的 wait(long timeout) 方法,当然除了这两个方法外,还有一个含有两个参数的 wait(long timeout, int nanos) 方法,如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public final void wait() throws InterruptedException {
wait(0);
}
public final native void wait(long timeout) throws InterruptedException;
public final void wait(long timeout, int nanos) throws InterruptedException {
if (timeout < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (nanos < 0 || nanos > 999999) {
throw new IllegalArgumentException(
"nanosecond timeout value out of range");
}
if (nanos > 0) {
timeout++;
}
wait(timeout);
}
|
注意到以上的三个 wait 方法都是被 final 修饰的,而 wait(long timeout) 还被 native 修饰,属于本地方法。wait 方法的作用是让当前线程释放对象的锁并进入等待状态,直到另一个线程调用该对象的 notify() 方法或 notifyAll() 方法。。
详细的说:当前线程指的是 Thread.concurrentThread() 方法所返回的线程。并且当前线程必须拥有这个对象的锁(monitor),该线程释放对该锁(monitor)的所有权,然后等待另一个线程通过调用 notify 方法或 notifyAll 方法通知等待该对象的锁的唤醒。该线程会一直处于等待状态,直到它能重新获得锁的所有权并恢复执行。需要注意的是,该方法只能由这个对象的锁(monitor)的所有者线程调用。
wait() 方法的小结:
- wait() 方法的作用是使当前执行代码的线程进入等待状态,该方法会将线程放入“预执行队列”中,并且会在 wait() 所在的代码处停止执行,直到收到通知或被中断为止;
- 在调用 wait() 方法之前,线程必须获得该对象的锁,即只能在同步方法或同步代码块中调用 wait() 方法;
- wait() 是释放锁的,也就是说在执行到 wait() 方法之后,当前线程会释放锁,当从 wait() 方法返回之前,线程与其它线程会相互竞争以重新获得锁。
notify()/notifyAll()
notify() 方法会唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,以便在当前线程释放锁后接着竞争锁,进而得到 CPU 的执行。如下所示:
1
|
public final native void notify();
|
注意这里的唤醒方式,如果有许多线程在这个对象上等待,则会只选择其中一个线程被唤醒,这个选择是任意的,由实现者自己决定。
而 notifyAll() 唤醒的方式是唤醒所有正在等待相应对象锁的线程,这里需要注意一下。源码如下所示:
1
|
public final native void notifyAll();
|
notify()/notifyAll() 小结:
- 与 wait() 方法一样,notify()/notifyAll() 方法也需要在同步方法或者同步代码块中调用,也就是说,必须在调用该方法前,首先应该获得该对象的锁;
- 需要注意的是,执行完 notify() 方法后,当前线程不会立即释放其拥有的对象的锁,而是执行完之后才会释放该对象的锁,被通知的线程也不会立即获得对象的锁,而是等到 notify() 方法执行完以后,释放了该对象的锁,才可以重新获得对象的锁。
此外,还要注意一点的是,每个锁(monitor)都有两个队列:就绪队列和阻塞队列。就绪队列存储了已就绪的线程,即准备下一步将要竞争锁的线程,而阻塞队列存储了被阻塞的线程。当一个阻塞线程被唤醒后,才会进入就绪队列,进而等待 CPU 的调度;而当一个线程被 wait 后,则会进入阻塞队列,等待被唤醒。
下面通过一个实例来演示等待/通知机制:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
public class Test {
public static Object object = new Object();
public static void main(String[] args) throws InterruptedException {
Thread1 thread1 = new Thread1();
Thread2 thread2 = new Thread2();
thread1.start();
Thread.sleep(2000);
thread2.start();
}
static class Thread1 extends Thread {
@Override
public void run() {
synchronized (object) {
System.out.println("线程 " + Thread.currentThread().getName() + " 获取到了锁...");
try {
System.out.println("线程 " + Thread.currentThread().getName() + " 阻塞并释放锁...");
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程 " + Thread.currentThread().getName() + " 执行完毕...");
}
}
}
static class Thread2 extends Thread {
@Override
public void run() {
synchronized (object) {
System.out.println("线程 " + Thread.currentThread().getName() + " 获取到了锁...");
object.notify();
System.out.println("线程 " + Thread.currentThread().getName() + " 唤醒了正在 wait 的线程...");
}
System.out.println("线程 " + Thread.currentThread().getName() + " 执行完毕...");
}
}
}
|
输出结果如下所示:
1
2
3
4
5
6
|
线程 Thread-0 获取到了锁...
线程 Thread-0 阻塞并释放锁...
线程 Thread-1 获取到了锁...
线程 Thread-1 唤醒了正在 wait 的线程...
线程 Thread-1 执行完毕...
线程 Thread-0 执行完毕...
|
此外,还可以将文章开头的代码进行修改,如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
class MyList {
private static List<Integer> list = new ArrayList<>();
public static void add() {
list.add(123);
}
public static int size() {
return list.size();
}
}
class ThreadA extends Thread {
private Object object;
public ThreadA(Object object, String name) {
super(name);
this.object = object;
}
@Override
public void run() {
try {
synchronized (object) {
if (MyList.size() != 5) {
System.out.println("开始:" + System.currentTimeMillis());
object.wait();
System.out.println("结束:" + System.currentTimeMillis());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ThreadB extends Thread {
private Object object;
public ThreadB(Object object, String name) {
super(name);
this.object = object;
}
@Override
public void run() {
try {
synchronized (object) {
for (int i = 0; i < 10; i++) {
MyList.add();
System.out.println("添加了 " + (i + 1) + " 个元素...");
if (MyList.size() == 5) {
object.notify();
System.out.println("已调用 notify() 方法...");
}
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args) {
try {
Object object = new Object();
ThreadA a = new ThreadA(object, "A");
a.start();
Thread.sleep(50);
ThreadB b = new ThreadB(object, "B");
b.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
|
输出结果如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
开始:1586680853048
添加了 1 个元素...
添加了 2 个元素...
添加了 3 个元素...
添加了 4 个元素...
添加了 5 个元素...
已调用 notify() 方法...
添加了 6 个元素...
添加了 7 个元素...
添加了 8 个元素...
添加了 9 个元素...
添加了 10 个元素...
结束:1586680863185
|
由此可见,wait 方法会释放锁而 notify 方法不会释放锁。
模拟阻塞队列
阻塞队列,也就是 BlockingQueue,初始时队列的最大长度为 5。如果想要往队列中添加元素,则需要判断队列的长度是否是 5,如果是 5 则先等待而不是插入;如果想要从队列中消费元素,则需要判断队列的长度是否是 0,如果是 0 则先等待而不是消费。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
public class MyBlockingQueue {
// list 用于存放元素
private final LinkedList<Object> list = new LinkedList<>();
// 计数器
private final AtomicInteger count = new AtomicInteger();
private final int maxSize = 5;
private final int minSize = 0;
private final Object lock = new Object();
public void put(Object obj) {
synchronized (lock) {
// 在添加元素的时候,如果已经达到了最大容量,则等待
while (count.get() == maxSize) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 添加元素
list.add(obj);
// 计数器增加
count.getAndIncrement();
System.out.println("元素 " + obj + " 被添加到了 list 中...");
// 通知另外一个阻塞的线程方法
lock.notify();
}
}
public Object get() {
Object temp;
synchronized (lock) {
// 队列中没有元素了,则等待
while (count.get() == minSize) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count.getAndDecrement();
temp = list.removeFirst();
System.out.println("元素 " + temp + " 被消费了...");
lock.notify();
}
return temp;
}
public int size() {
return count.get();
}
public static void initMyBlockingQueue(MyBlockingQueue myBlockingQueue) {
myBlockingQueue.put("1");
myBlockingQueue.put("2");
myBlockingQueue.put("3");
myBlockingQueue.put("4");
myBlockingQueue.put("5");
System.out.println("阻塞队列中当前元素的个数:" + myBlockingQueue.size());
}
public static void main(String[] args) throws InterruptedException {
final MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
initMyBlockingQueue(myBlockingQueue);
Thread t1 = new Thread(() -> {
myBlockingQueue.put("6");
myBlockingQueue.put("7");
}, "t1");
Thread t2 = new Thread(() -> {
try {
Thread.sleep(2000);
myBlockingQueue.get();
Thread.sleep(2000);
myBlockingQueue.get();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2");
t1.start();
Thread.sleep(1000);
t2.start();
}
}
|
输出结果如下所示:
1
2
3
4
5
6
7
8
9
10
|
元素 1 被添加到了 list 中...
元素 2 被添加到了 list 中...
元素 3 被添加到了 list 中...
元素 4 被添加到了 list 中...
元素 5 被添加到了 list 中...
阻塞队列中当前元素的个数:5
元素 1 被消费了...
元素 6 被添加到了 list 中...
元素 2 被消费了...
元素 7 被添加到了 list 中...
|
Condition
Condition 是一个接口,需要结合 Lock 进行使用。之前的文章《Java 中的 Lock 及各种锁的概念》介绍过 Lock 的概念与使用,它需要显式地执行获取锁和释放锁的操作,一般使用的是它的实现类 ReentrantLock。
Condition 中的 await() 和 signal() 方法可以代替上面提到的传统的 wait() 和 notify() 方法,并且 Condition 在实现线程通信时更加安全和高效。一般通过lock.newCondition()
的方式创建多个 Condition,因此可以通过多个 Condition 来控制现有的等待和通知机制。多个线程可以竞争同一把锁,一把锁也可以关联多个 Condition,以便多个线程之间进行通信和协作。
需要注意的是,Condition 中的 await() 和 signal() 必须放在 lock.lock() 和 lockunlock() 之间。其中 Condition 中的这两个方法与 Object 类中的方法是一一对应的:
- Condition 中的 await() 对应 Object 类中的 wait();
- Condition 中的 signal() 对应 Object 类中的 notify();
- Condition 中的 signalAll() 对应 Object 类中的 notifyAll()。
下面通过一个实例来展示 Condition 的使用方式,以下使用的是同一个 LockConditionDemo 对象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public class LockConditionDemo {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
LockConditionDemo demo = new LockConditionDemo();
new Thread(() -> demo.awaitMethod(), "thread-1").start();
Thread.sleep(3000);
new Thread(() -> demo.signalMethod(), "thread-2").start();
}
private void awaitMethod() {
try {
lock.lock();
System.out.println("开始等待 await...ThreadName: " + Thread.currentThread().getName());
condition.await();
System.out.println("等待 await 结束...ThreadName: " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void signalMethod() {
lock.lock();
System.out.println("发送通知 signal...ThreadName: " + Thread.currentThread().getName());
condition.signal();
lock.unlock();
}
}
|
输出结果如下所示:
1
2
3
|
开始等待 await...ThreadName: thread-1
发送通知 signal...ThreadName: thread-2
等待 await 结束...ThreadName: thread-1
|
下面通过使用 Lock 对象和多个 Condition 实现等待/通知机制:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
class ThreadA extends Thread {
private MyService service;
public ThreadA(MyService service, String name) {
super(name);
this.service = service;
}
@Override
public void run() {
service.awaitA();
}
}
class ThreadB extends Thread {
private MyService service;
public ThreadB(MyService service, String name) {
super(name);
this.service = service;
}
@Override
public void run() {
service.awaitB();
}
}
class MyService {
private Lock lock = new ReentrantLock();
// 使用多个 Condition 实现通知部分线程
public Condition conditionA = lock.newCondition();
public Condition conditionB = lock.newCondition();
public void awaitA() {
lock.lock();
try {
System.out.println("begin awaitA 时间为: " + System.currentTimeMillis()
+ " ThreadName: " + Thread.currentThread().getName());
conditionA.await();
System.out.println("end awaitA 时间为: " + System.currentTimeMillis()
+ " ThreadName: " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void awaitB() {
lock.lock();
try {
System.out.println("begin awaitB 时间为: " + System.currentTimeMillis()
+ " ThreadName: " + Thread.currentThread().getName());
conditionB.await();
System.out.println("end awaitB 时间为: " + System.currentTimeMillis()
+ " ThreadName: " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void signalAll_A() {
try {
lock.lock();
System.out.println("signalAll_A 时间为: " + System.currentTimeMillis()
+ " ThreadName: " + Thread.currentThread().getName());
conditionA.signalAll();
} finally {
lock.unlock();
}
}
public void signalAll_B() {
try {
lock.lock();
System.out.println("signalAll_B 时间为: " + System.currentTimeMillis()
+ " ThreadName: " + Thread.currentThread().getName());
conditionB.signalAll();
} finally {
lock.unlock();
}
}
}
public class LockConditionDemo1 {
public static void main(String[] args) throws InterruptedException {
MyService service = new MyService();
ThreadA a = new ThreadA(service, "A");
a.start();
ThreadA b = new ThreadA(service, "B");
b.start();
Thread.sleep(3000);
service.signalAll_A();
}
}
|
输出结果如下所示:
1
2
3
4
5
|
begin awaitA 时间为: 1586697501006 ThreadName: A
begin awaitA 时间为: 1586697501006 ThreadName: B
signalAll_A 时间为: 1586697504000 ThreadName: main
end awaitA 时间为: 1586697504000 ThreadName: A
end awaitA 时间为: 1586697504000 ThreadName: B
|
从运行结果可以看出,Condition 实现了一种分组机制,它将所有对临界资源进行访问的线程进行分组,以便实现线程间更细化的协作,例如通知部分线程。
生产者-消费者模型
等待/通知机制最经典的应用就是生产者-消费者模型,下面分别采用两种方式,实现该模型:
- 传统实现方式:synchronized + wait + notify 方式;
- Condition方式:Lock + Condition 方式。
其它的实现方式可以参见:Part_11
传统实现方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
// 资源
class MyQueue {
private List list = new ArrayList();
// 负责生产
public synchronized void push() {
try {
while (list.size() == 1) {
System.out.println("队列已满,线程 " + Thread.currentThread().getName() + " 呈 wait 状态...");
wait();
}
list.add(" " + Math.random());
System.out.println("线程 " + Thread.currentThread().getName() + " 生产了,队列已满...");
notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 负责消费
public synchronized String pop() {
String temp = "";
try {
while (list.size() == 0) {
System.out.println("队列已空,线程 " + Thread.currentThread().getName() + " 呈 wait 状态...");
wait();
}
temp = "" + list.get(0);
list.remove(0);
System.out.println("线程 " + Thread.currentThread().getName() + " 消费了,队列已空...");
notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
return temp;
}
}
// 生产者线程
class ProducerThread extends Thread {
private MyQueue queue;
public ProducerThread(MyQueue queue, String name) {
super(name);
this.queue = queue;
}
public void pushService() {
queue.push();
}
@Override
public void run() {
queue.push();
}
}
// 消费者线程
class ConsumerThread extends Thread {
private MyQueue queue;
public ConsumerThread(MyQueue queue, String name) {
super(name);
this.queue = queue;
}
@Override
public void run() {
queue.pop();
}
}
public class Test {
public static void main(String[] args) {
MyQueue queue = new MyQueue();
new ProducerThread(queue, "Producer1").start();
new ProducerThread(queue, "Producer2").start();
new ProducerThread(queue, "Producer3").start();
new ProducerThread(queue, "Producer4").start();
new ProducerThread(queue, "Producer5").start();
new ConsumerThread(queue, "Consumer1").start();
new ConsumerThread(queue, "Consumer2").start();
new ConsumerThread(queue, "Consumer3").start();
new ConsumerThread(queue, "Consumer4").start();
new ConsumerThread(queue, "Consumer5").start();
new ConsumerThread(queue, "Consumer6").start();
new ConsumerThread(queue, "Consumer7").start();
}
}
|
输出结果如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
线程 Producer1 生产了,队列已满...
队列已满,线程 Producer2 呈 wait 状态...
队列已满,线程 Producer3 呈 wait 状态...
队列已满,线程 Producer4 呈 wait 状态...
队列已满,线程 Producer5 呈 wait 状态...
线程 Consumer1 消费了,队列已空...
线程 Producer5 生产了,队列已满...
队列已满,线程 Producer4 呈 wait 状态...
队列已满,线程 Producer3 呈 wait 状态...
队列已满,线程 Producer2 呈 wait 状态...
线程 Consumer2 消费了,队列已空...
线程 Producer2 生产了,队列已满...
队列已满,线程 Producer3 呈 wait 状态...
队列已满,线程 Producer4 呈 wait 状态...
线程 Consumer3 消费了,队列已空...
线程 Producer4 生产了,队列已满...
队列已满,线程 Producer3 呈 wait 状态...
线程 Consumer4 消费了,队列已空...
线程 Producer3 生产了,队列已满...
线程 Consumer6 消费了,队列已空...
队列已空,线程 Consumer7 呈 wait 状态...
队列已空,线程 Consumer5 呈 wait 状态...
|
Condition 方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
// 资源类
class MyService {
private ReentrantLock lock = new ReentrantLock();
// 生产线程
private Condition conditionA = lock.newCondition();
// 消费线程
private Condition conditionB = lock.newCondition();
private boolean hasValue = false;
public void set() {
try {
lock.lock();
while (hasValue == true) {
System.out.println("[生产线程] " + "线程: " + Thread.currentThread().getName() + " await...");
conditionA.await();
}
System.out.println("[生产中] " + "线程: " + Thread.currentThread().getName() + " 生产~");
Thread.sleep(1000);
hasValue = true;
System.out.println("线程: " + Thread.currentThread().getName() + " 生产完毕...");
System.out.println("[唤醒所有消费线程] " + " 线程: " + Thread.currentThread().getName() + "...");
conditionB.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void get() {
try {
lock.lock();
while (hasValue == false) {
System.out.println("[消费线程] " + "线程: " + Thread.currentThread().getName() + " await...");
conditionB.await();
}
System.out.println("[消费中] " + " 线程: " + Thread.currentThread().getName() + " 消费~");
Thread.sleep(1000);
System.out.println("线程: " + Thread.currentThread().getName() + " 消费完毕...");
hasValue = false;
System.out.println("[唤醒所有生产线程] " + " 线程: " + Thread.currentThread().getName() + "...");
conditionA.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
class MyThreadA extends Thread {
private MyService myService;
public MyThreadA(MyService myService, String name) {
super(name);
this.myService = myService;
}
@Override
public void run() {
while (true) {
myService.set();
}
}
}
class MyThreadB extends Thread {
private MyService myService;
public MyThreadB(MyService myService, String name) {
super(name);
this.myService = myService;
}
@Override
public void run() {
while (true) {
myService.get();
}
}
}
public class Test {
public static void main(String[] args) {
MyService myService = new MyService();
MyThreadA[] threadA = new MyThreadA[10];
MyThreadB[] threadB = new MyThreadB[10];
for (int i = 0; i < 10; i++) {
threadA[i] = new MyThreadA(myService, "ThreadA-" + i);
threadB[i] = new MyThreadB(myService, "ThreadB-" + i);
threadA[i].start();
threadB[i].start();
}
}
}
|
输出结果如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
[生产中] 线程: ThreadA-0 生产~
线程: ThreadA-0 生产完毕...
[唤醒所有消费线程] 线程: ThreadA-0...
[生产线程] 线程: ThreadA-0 await...
[消费中] 线程: ThreadB-0 消费~
线程: ThreadB-0 消费完毕...
[唤醒所有生产线程] 线程: ThreadB-0...
[生产中] 线程: ThreadA-1 生产~
线程: ThreadA-1 生产完毕...
[唤醒所有消费线程] 线程: ThreadA-1...
[消费中] 线程: ThreadB-1 消费~
线程: ThreadB-1 消费完毕...
[唤醒所有生产线程] 线程: ThreadB-1...
[消费线程] 线程: ThreadB-1 await...
[消费线程] 线程: ThreadB-2 await...
...
|
由此可见,等待/通知机制的经典范式,等待方
需要遵循如下原则:
-
- 获取对象的锁;
-
- 如果条件不满足,则调用对象的 wait() 方法,被通知后仍要检查条件;
-
- 条件满足则执行对应的逻辑。
伪代码如下:
1
2
3
4
5
6
|
synchronized (对象) {
while (条件不满足) {
对象.wait();
}
对应的处理逻辑
}
|
通知方
需要遵循如下原则:
-
- 获得对象的锁;
-
- 改变条件;
-
- 通知所有等待在对象上的线程。
伪代码如下:
1
2
3
4
|
synchronized (对象) {
改变条件
对象.notifyAll();
}
|
管道输入/输出流
管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于:管道输入/输出流主要用于线程之间的通信,也就是数据传输,而传输的媒介是内存。
管道输入/输出流主要由 4 种具体实现:
- 面向字节
- PipedOutputStream
- PipedInputStream
- 面向字符
一个 PipedOutputStream 必须和一个 PipedInputStream 实例对象进行连接(调用 connect 方法)而产生一个通信管道。PipedOutputStream 用于向管道中写入数据,而 PipedInputStream 用于从管道中取出数据。
PipedOutputStream 类似于生产者,PipedInputStream 类似于消费者。在 PipedInputStream 中有一个 buffer 字节数组缓冲区,默认大小为 1024,存放生产者所提供的资源。变量 in 表示生产者生产资源的多少,变量 out 表示消费者消费资源的多少。in=-1 表示消费完了,in==out 表示生产满了。
下面通过一个实例进行说明。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
public class Test {
public static void main(String[] args) throws IOException {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
// 将输入流和输出流进行连接
out.connect(in);
Thread thread = new Thread(new Print(in), "PrintThread");
thread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
out.write(receive);
}
} finally {
out.close();
}
}
static class Print implements Runnable {
private PipedReader in;
public Print(PipedReader in) {
this.in = in;
}
@Override
public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1) {
System.out.print((char)receive);
}
} catch (IOException e) {
}
}
}
}
|
输出结果如下所示:
由于我输入的是 123,则对应的输出也是 123。因此,以上程序的功能就是创建了一个线程 thread,用来接收 main 线程的输入,main 线程的任何输入都通过 PipedWriter 写入管道,而 thread 在管道的另一端通过 PipedReader 将内容读出并打印。
join() 方法
如果在 main 线程中调用了 thread.join() 方法,则 main 线程会等待 thread 线程执行完或等待一定的时间。如果调用的是无参的 join 方法,则等待 thread 执行完毕;如果是调用了有参的 join 方法,则需要等待一段指定的时间。
Thread 类中有 3 个重载的 join 方法:
1
2
3
4
5
6
7
|
public final void join() throws InterruptedException {
join(0);
}
public final synchronized void join(long millis) throws InterruptedException {...}
public final synchronized void join(long millis, int nanos) throws InterruptedException {...}
|
如果直接调用 thread.join() 方法的话,实际上调用的是 join(0) 方法,而在 join(long millis) 方法中,由于传入的参数为 0,因此会再调用 Object 类中的 wait(0) 方法。如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// Thread 类中的 join 方法
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
// Object 类中的 wait 方法
public final native void wait(long timeout) throws InterruptedException;
|
下面通过具体的实例进行说明:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public class Test {
public static void main(String[] args) {
System.out.println("进入线程: " + Thread.currentThread().getName());
Test test = new Test();
MyThread thread = test.new MyThread();
thread.start();
try {
System.out.println("线程: " + thread.currentThread().getName() + " 开始等待...");
thread.join();
System.out.println("线程: " + thread.currentThread().getName() + " 继续执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println("进入线程: " + Thread.currentThread().getName());
try {
Thread.currentThread().sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程: " + Thread.currentThread().getName() + " 执行完毕...");
}
}
}
|
输出结果如下所示:
1
2
3
4
5
|
进入线程: main
线程: main 开始等待...
进入线程: Thread-0
线程: Thread-0 执行完毕...
线程: main 继续执行...
|
通过结果可以看出,当 mian 线程执行到 thread.join() 方法时,main 线程会获得线程对象 thread 的锁,也就是因为执行了 wait 方法而拿到该对象的锁。只要 thread 线程一直存活,就会调用该对象锁的 wait 方法阻塞 mian 线程。
这里需要注意的是,既然调用了 wait 方法阻塞了 main 线程,那么什么时候将 main 线程唤醒呢?从 JVM 源码中可以看到,其内部调用了 notifyAll 方法,会通知所有等待在该线程对象上的线程,因此 main 线程也就能够继续执行了。
小结:
- join 方法会调用 wait 方法,让宿主线程(例如上面的 main 线程)进入阻塞状态,并且会让线程交出 CPU 执行权限;
- join 方法会让线程释放对一个对象持有的锁;
- 如果调用了 join 方法,则必须捕获 InterruptedException 异常或向上抛出。
参考