之前的文章,比如在多线程环境中使用 synchronizedLock 可以通过锁的方式实现线程之间的同步互斥访问临界资源。也就是说,只有当一个线程释放了锁之后,另一个线程才能获得锁,从而对数据进行操作。但在有些情况下,需要多个线程之间进行协作,以使得多个线程可以一起去解决某个问题,此时就需要采用线程通信-协作机制。

概述

总体来讲,线程之间有以下几种通信方式:

  • 等待/通知机制,即 wait() 和 notify();
  • 使用 join();
  • 管道输入输出流,使用内存作为传输媒介;
  • 生产者-消费者模型;
  • 使用 Condition 控制线程通信;
  • 使用阻塞队列(BlockingQueue)控制线程通信。

进程之间的通信方式主要有以下几种:

  • 管道(pipe)
    • 有名管道(nameed pipe)
  • 信号量机制(semophore)
    • 信号(signal)
  • 消息队列(message queue)
  • 共享内存(shared memory)
  • 套接字(socket)

假如我们不使用以上的方式,而是自己实现两个线程之间的通信的话,可以使用不断轮询的方法,也就是使用 while 语句来让一个线程不断检测是否满足某个条件。这样做的缺点是浪费 CPU 的资源,我们通过以下例子进行说明。

该例子包含三个类:

  • MyList:即资源类,用于设置临界资源,让线程 A 和线程 B 进行访问;
  • ThreadA:即线程 A,不断地向资源类中添加元素;
  • ThreadB:即线程 B,使用 while(ture) 语句不断轮询,等到临界资源的大小达到某一值的时候做出响应。

如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// 资源类
class MyList {

    // 临界资源
    private volatile List<String> list = new ArrayList<>();

    public void add() {
        list.add("123");
    }

    public int size() {
        return list.size();
    }
}


// 线程 A
class ThreadA extends Thread {
    private MyList list;

    public ThreadA(MyList list, String name) {
        super(name);
        this.list = list;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                list.add();
                System.out.println("添加了 " + (i + 1) + " 个元素.");
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

// 线程 B
class ThreadB extends Thread {
    private MyList list;

    public ThreadB(MyList list, String name) {
        super(name);
        this.list = list;
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (list.size() == 4) {
                    System.out.println("list 的大小为 4 了,线程 B 要退出了!");
                    throw new InterruptedException();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test{
    public static void main(String[] args) {
        MyList service = new MyList();

        ThreadA a = new ThreadA(service, "A");
        ThreadB b = new ThreadB(service, "B");

        a.start();
        b.start();
    }
}

输出结果如下所示:

1
2
3
4
5
6
7
8
添加了 1 个元素.
添加了 2 个元素.
添加了 3 个元素.
添加了 4 个元素.
list 的大小为 4 了,线程 B 要退出了!
java.lang.InterruptedException
	at Part_05.ThreadB.run(Test.java:60)
添加了 5 个元素.

从结果可以看出,线程 A 不断地向 list 中添加元素,等到 list 的大小为 4 时线程 B 才退出。显然这种通信方式效率不高,CPU 的利用率也不大,并且非常耗费资源。因为线程 B 每次都需要去检查 list 中元素的个数,每次都需要做出判断。

所以,为了提高线程之间的通信效率,提高 CPU 的利用率,下面则介绍几种线程之间通信的方式。

wait/notify 机制

wait/notify 机制,即等待/通知机制,这里的等待/通知就像我们去银行办理业务。一般情况下,我们到达银行后需要取号,如果没有空闲的业务窗口的话,则需要坐在一旁等待一段时间,在等待的过程中,如果业务员通过广播叫到了我们自己手中的号码,则就去对应的窗口办理业务。你看,这里的等待就相当于 wait,而业务员的叫号操作就相当于 notify,也就是通知。

等待/通知机制主要使用的是 Object 类中的三个方法:wait()、notify()、notify()。

既然是与线程通信相关,为什么这三个方法是属于 Object 类而不属于 Thread 类呢?

因为对于每一个对象来说,它们都拥有一个锁,即 monitor。让当前线程等待某个对象的锁,则应该通过这个对象来操作,而不是用当前线程来操作。因为当前线程可能会等待多个线程的锁,如果通过线程来操作的话,那会显得十分复杂。

wait()

Object 类中的 wait() 方法里又调用了含参的 wait(long timeout) 方法,当然除了这两个方法外,还有一个含有两个参数的 wait(long timeout, int nanos) 方法,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final void wait() throws InterruptedException {
    wait(0);
}

public final native void wait(long timeout) throws InterruptedException;

public final void wait(long timeout, int nanos) throws InterruptedException {
    if (timeout < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (nanos < 0 || nanos > 999999) {
        throw new IllegalArgumentException(
                            "nanosecond timeout value out of range");
    }

    if (nanos > 0) {
        timeout++;
    }

    wait(timeout);
}

注意到以上的三个 wait 方法都是被 final 修饰的,而 wait(long timeout) 还被 native 修饰,属于本地方法。wait 方法的作用是让当前线程释放对象的锁并进入等待状态,直到另一个线程调用该对象的 notify() 方法或 notifyAll() 方法。

详细的说:当前线程指的是 Thread.concurrentThread() 方法所返回的线程。并且当前线程必须拥有这个对象的锁(monitor),该线程释放对该锁(monitor)的所有权,然后等待另一个线程通过调用 notify 方法或 notifyAll 方法通知等待该对象的锁的唤醒。该线程会一直处于等待状态,直到它能重新获得锁的所有权并恢复执行。需要注意的是,该方法只能由这个对象的锁(monitor)的所有者线程调用。

wait() 方法的小结:

  • wait() 方法的作用是使当前执行代码的线程进入等待状态,该方法会将线程放入“预执行队列”中,并且会在 wait() 所在的代码处停止执行,直到收到通知或被中断为止;
  • 在调用 wait() 方法之前,线程必须获得该对象的锁,即只能在同步方法或同步代码块中调用 wait() 方法;
  • wait() 是释放锁的,也就是说在执行到 wait() 方法之后,当前线程会释放锁,当从 wait() 方法返回之前,线程与其它线程会相互竞争以重新获得锁。

notify()/notifyAll()

notify() 方法会唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,以便在当前线程释放锁后接着竞争锁,进而得到 CPU 的执行。如下所示:

1
public final native void notify();

注意这里的唤醒方式,如果有许多线程在这个对象上等待,则会只选择其中一个线程被唤醒,这个选择是任意的,由实现者自己决定。

而 notifyAll() 唤醒的方式是唤醒所有正在等待相应对象锁的线程,这里需要注意一下。源码如下所示:

1
public final native void notifyAll();

notify()/notifyAll() 小结:

  • 与 wait() 方法一样,notify()/notifyAll() 方法也需要在同步方法或者同步代码块中调用,也就是说,必须在调用该方法前,首先应该获得该对象的锁;
  • 需要注意的是,执行完 notify() 方法后,当前线程不会立即释放其拥有的对象的锁,而是执行完之后才会释放该对象的锁,被通知的线程也不会立即获得对象的锁,而是等到 notify() 方法执行完以后,释放了该对象的锁,才可以重新获得对象的锁。

此外,还要注意一点的是,每个锁(monitor)都有两个队列:就绪队列和阻塞队列。就绪队列存储了已就绪的线程,即准备下一步将要竞争锁的线程,而阻塞队列存储了被阻塞的线程。当一个阻塞线程被唤醒后,才会进入就绪队列,进而等待 CPU 的调度;而当一个线程被 wait 后,则会进入阻塞队列,等待被唤醒。

下面通过一个实例来演示等待/通知机制:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Test {
    public static Object object = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread1 thread1 = new Thread1();
        Thread2 thread2 = new Thread2();

        thread1.start();
        Thread.sleep(2000);
        thread2.start();
    }

    static class Thread1 extends Thread {
        @Override
        public void run() {
            synchronized (object) {
                System.out.println("线程 " + Thread.currentThread().getName() + " 获取到了锁...");
                try {
                    System.out.println("线程 " + Thread.currentThread().getName() + " 阻塞并释放锁...");
                    object.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("线程 " + Thread.currentThread().getName() + " 执行完毕...");
            }
        }
    }

    static class Thread2 extends Thread {
        @Override
        public void run() {
            synchronized (object) {
                System.out.println("线程 " + Thread.currentThread().getName() + " 获取到了锁...");
                object.notify();
                System.out.println("线程 " + Thread.currentThread().getName() + " 唤醒了正在 wait 的线程...");
            }
            System.out.println("线程 " + Thread.currentThread().getName() + " 执行完毕...");
        }
    }
}

输出结果如下所示:

1
2
3
4
5
6
线程 Thread-0 获取到了锁...
线程 Thread-0 阻塞并释放锁...
线程 Thread-1 获取到了锁...
线程 Thread-1 唤醒了正在 wait 的线程...
线程 Thread-1 执行完毕...
线程 Thread-0 执行完毕...

此外,还可以将文章开头的代码进行修改,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class MyList {
    private static List<Integer> list = new ArrayList<>();

    public static void add() {
        list.add(123);
    }

    public static int size() {
        return list.size();
    }
}

class ThreadA extends Thread {
    private Object object;

    public ThreadA(Object object, String name) {
        super(name);
        this.object = object;
    }

    @Override
    public void run() {
        try {
            synchronized (object) {
                if (MyList.size() != 5) {
                    System.out.println("开始:" + System.currentTimeMillis());
                    object.wait();
                    System.out.println("结束:" + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class ThreadB extends Thread {
    private Object object;

    public ThreadB(Object object, String name) {
        super(name);
        this.object = object;
    }

    @Override
    public void run() {
        try {
            synchronized (object) {
                for (int i = 0; i < 10; i++) {
                    MyList.add();
                    System.out.println("添加了 " + (i + 1) + " 个元素...");
                    if (MyList.size() == 5) {
                        object.notify();
                        System.out.println("已调用 notify() 方法...");
                    }
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test {
    public static void main(String[] args) {
        try {
            Object object = new Object();

            ThreadA a = new ThreadA(object, "A");
            a.start();
            Thread.sleep(50);

            ThreadB b = new ThreadB(object, "B");
            b.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

输出结果如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
开始:1586680853048
添加了 1 个元素...
添加了 2 个元素...
添加了 3 个元素...
添加了 4 个元素...
添加了 5 个元素...
已调用 notify() 方法...
添加了 6 个元素...
添加了 7 个元素...
添加了 8 个元素...
添加了 9 个元素...
添加了 10 个元素...
结束:1586680863185

由此可见,wait 方法会释放锁而 notify 方法不会释放锁。

模拟阻塞队列

阻塞队列,也就是 BlockingQueue,初始时队列的最大长度为 5。如果想要往队列中添加元素,则需要判断队列的长度是否是 5,如果是 5 则先等待而不是插入;如果想要从队列中消费元素,则需要判断队列的长度是否是 0,如果是 0 则先等待而不是消费。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public class MyBlockingQueue {

    // list 用于存放元素
    private final LinkedList<Object> list = new LinkedList<>();

    // 计数器
    private final AtomicInteger count = new AtomicInteger();

    private final int maxSize = 5;
    private final int minSize = 0;

    private final Object lock = new Object();

    public void put(Object obj) {
        synchronized (lock) {
            // 在添加元素的时候,如果已经达到了最大容量,则等待
            while (count.get() == maxSize) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 添加元素
            list.add(obj);
            // 计数器增加
            count.getAndIncrement();
            System.out.println("元素 " + obj + " 被添加到了 list 中...");
            // 通知另外一个阻塞的线程方法
            lock.notify();
        }
    }

    public Object get() {
        Object temp;
        synchronized (lock) {
            // 队列中没有元素了,则等待
            while (count.get() == minSize) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            count.getAndDecrement();
            temp = list.removeFirst();
            System.out.println("元素 " + temp + " 被消费了...");
            lock.notify();
        }
        return temp;
    }

    public int size() {
        return count.get();
    }

    public static void initMyBlockingQueue(MyBlockingQueue myBlockingQueue) {
        myBlockingQueue.put("1");
        myBlockingQueue.put("2");
        myBlockingQueue.put("3");
        myBlockingQueue.put("4");
        myBlockingQueue.put("5");
        System.out.println("阻塞队列中当前元素的个数:" + myBlockingQueue.size());
    }

    public static void main(String[] args) throws InterruptedException {

        final MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
        initMyBlockingQueue(myBlockingQueue);

        Thread t1 = new Thread(() -> {
            myBlockingQueue.put("6");
            myBlockingQueue.put("7");
        }, "t1");

        Thread t2 = new Thread(() -> {
            try {
                Thread.sleep(2000);
                myBlockingQueue.get();
                Thread.sleep(2000);
                myBlockingQueue.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2");

        t1.start();
        Thread.sleep(1000);
        t2.start();
    }
}

输出结果如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
元素 1 被添加到了 list 中...
元素 2 被添加到了 list 中...
元素 3 被添加到了 list 中...
元素 4 被添加到了 list 中...
元素 5 被添加到了 list 中...
阻塞队列中当前元素的个数:5
元素 1 被消费了...
元素 6 被添加到了 list 中...
元素 2 被消费了...
元素 7 被添加到了 list 中...

Condition

Condition 是一个接口,需要结合 Lock 进行使用。之前的文章《Java 中的 Lock 及各种锁的概念》介绍过 Lock 的概念与使用,它需要显式地执行获取锁和释放锁的操作,一般使用的是它的实现类 ReentrantLock。

Condition 中的 await() 和 signal() 方法可以代替上面提到的传统的 wait() 和 notify() 方法,并且 Condition 在实现线程通信时更加安全和高效。一般通过lock.newCondition()的方式创建多个 Condition,因此可以通过多个 Condition 来控制现有的等待和通知机制。多个线程可以竞争同一把锁,一把锁也可以关联多个 Condition,以便多个线程之间进行通信和协作。

需要注意的是,Condition 中的 await() 和 signal() 必须放在 lock.lock() 和 lockunlock() 之间。其中 Condition 中的这两个方法与 Object 类中的方法是一一对应的:

  • Condition 中的 await() 对应 Object 类中的 wait();
  • Condition 中的 signal() 对应 Object 类中的 notify();
  • Condition 中的 signalAll() 对应 Object 类中的 notifyAll()。

下面通过一个实例来展示 Condition 的使用方式,以下使用的是同一个 LockConditionDemo 对象:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class LockConditionDemo {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        LockConditionDemo demo = new LockConditionDemo();

        new Thread(() -> demo.awaitMethod(), "thread-1").start();
        Thread.sleep(3000);
        new Thread(() -> demo.signalMethod(), "thread-2").start();
    }

    private void awaitMethod() {
        try {
            lock.lock();
            System.out.println("开始等待 await...ThreadName: " + Thread.currentThread().getName());
            condition.await();
            System.out.println("等待 await 结束...ThreadName: " + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void signalMethod() {
        lock.lock();
        System.out.println("发送通知 signal...ThreadName: " + Thread.currentThread().getName());
        condition.signal();
        lock.unlock();
    }
}

输出结果如下所示:

1
2
3
开始等待 await...ThreadName: thread-1
发送通知 signal...ThreadName: thread-2
等待 await 结束...ThreadName: thread-1

下面通过使用 Lock 对象和多个 Condition 实现等待/通知机制:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class ThreadA extends Thread {
    private MyService service;

    public ThreadA(MyService service, String name) {
        super(name);
        this.service = service;
    }

    @Override
    public void run() {
        service.awaitA();
    }
}


class ThreadB extends Thread {
    private MyService service;

    public ThreadB(MyService service, String name) {
        super(name);
        this.service = service;
    }

    @Override
    public void run() {
        service.awaitB();
    }
}

class MyService {
    private Lock lock = new ReentrantLock();
    // 使用多个 Condition 实现通知部分线程
    public Condition conditionA = lock.newCondition();
    public Condition conditionB = lock.newCondition();

    public void awaitA() {
        lock.lock();
        try {
            System.out.println("begin awaitA 时间为: " + System.currentTimeMillis()
                    + " ThreadName: " + Thread.currentThread().getName());
            conditionA.await();
            System.out.println("end awaitA 时间为: " + System.currentTimeMillis()
                    + " ThreadName: " + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void awaitB() {
        lock.lock();
        try {
            System.out.println("begin awaitB 时间为: " + System.currentTimeMillis()
                    + " ThreadName: " + Thread.currentThread().getName());
            conditionB.await();
            System.out.println("end awaitB 时间为: " + System.currentTimeMillis()
                    + " ThreadName: " + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }


    public void signalAll_A() {
        try {
            lock.lock();
            System.out.println("signalAll_A 时间为: " + System.currentTimeMillis()
                    + " ThreadName: " + Thread.currentThread().getName());
            conditionA.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void signalAll_B() {
        try {
            lock.lock();
            System.out.println("signalAll_B 时间为: " + System.currentTimeMillis()
                    + " ThreadName: " + Thread.currentThread().getName());
            conditionB.signalAll();
        } finally {
            lock.unlock();
        }
    }
}


public class LockConditionDemo1 {
    public static void main(String[] args) throws InterruptedException {
        MyService service = new MyService();

        ThreadA a = new ThreadA(service, "A");
        a.start();

        ThreadA b = new ThreadA(service, "B");
        b.start();

        Thread.sleep(3000);
        service.signalAll_A();
    }
}

输出结果如下所示:

1
2
3
4
5
begin awaitA 时间为: 1586697501006 ThreadName: A
begin awaitA 时间为: 1586697501006 ThreadName: B
signalAll_A 时间为: 1586697504000 ThreadName: main
end awaitA 时间为: 1586697504000 ThreadName: A
end awaitA 时间为: 1586697504000 ThreadName: B

从运行结果可以看出,Condition 实现了一种分组机制,它将所有对临界资源进行访问的线程进行分组,以便实现线程间更细化的协作,例如通知部分线程。

生产者-消费者模型

等待/通知机制最经典的应用就是生产者-消费者模型,下面分别采用两种方式,实现该模型:

  • 传统实现方式:synchronized + wait + notify 方式;
  • Condition方式:Lock + Condition 方式。

其它的实现方式可以参见:Part_11

传统实现方式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// 资源
class MyQueue {
    private List list = new ArrayList();

    // 负责生产
    public synchronized void push() {
        try {
            while (list.size() == 1) {
                System.out.println("队列已满,线程 " + Thread.currentThread().getName() + " 呈 wait 状态...");
                wait();
            }
            list.add(" " + Math.random());
            System.out.println("线程 " + Thread.currentThread().getName() + " 生产了,队列已满...");
            notifyAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 负责消费
    public synchronized String pop() {
        String temp = "";
        try {
            while (list.size() == 0) {
                System.out.println("队列已空,线程 " + Thread.currentThread().getName() + " 呈 wait 状态...");
                wait();
            }
            temp = "" + list.get(0);
            list.remove(0);
            System.out.println("线程 " + Thread.currentThread().getName() + " 消费了,队列已空...");
            notifyAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return temp;
    }
}


// 生产者线程
class ProducerThread extends Thread {
    private MyQueue queue;

    public ProducerThread(MyQueue queue, String name) {
        super(name);
        this.queue = queue;
    }

    public void pushService() {
        queue.push();
    }

    @Override
    public void run() {
        queue.push();
    }
}


// 消费者线程
class ConsumerThread extends Thread {
    private MyQueue queue;

    public ConsumerThread(MyQueue queue, String name) {
        super(name);
        this.queue = queue;
    }

    @Override
    public void run() {
        queue.pop();
    }
}


public class Test {

    public static void main(String[] args) {
        MyQueue queue = new MyQueue();

        new ProducerThread(queue, "Producer1").start();
        new ProducerThread(queue, "Producer2").start();
        new ProducerThread(queue, "Producer3").start();
        new ProducerThread(queue, "Producer4").start();
        new ProducerThread(queue, "Producer5").start();

        new ConsumerThread(queue, "Consumer1").start();
        new ConsumerThread(queue, "Consumer2").start();
        new ConsumerThread(queue, "Consumer3").start();
        new ConsumerThread(queue, "Consumer4").start();
        new ConsumerThread(queue, "Consumer5").start();
        new ConsumerThread(queue, "Consumer6").start();
        new ConsumerThread(queue, "Consumer7").start();
    }
}

输出结果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
线程 Producer1 生产了,队列已满...
队列已满,线程 Producer2 呈 wait 状态...
队列已满,线程 Producer3 呈 wait 状态...
队列已满,线程 Producer4 呈 wait 状态...
队列已满,线程 Producer5 呈 wait 状态...
线程 Consumer1 消费了,队列已空...
线程 Producer5 生产了,队列已满...
队列已满,线程 Producer4 呈 wait 状态...
队列已满,线程 Producer3 呈 wait 状态...
队列已满,线程 Producer2 呈 wait 状态...
线程 Consumer2 消费了,队列已空...
线程 Producer2 生产了,队列已满...
队列已满,线程 Producer3 呈 wait 状态...
队列已满,线程 Producer4 呈 wait 状态...
线程 Consumer3 消费了,队列已空...
线程 Producer4 生产了,队列已满...
队列已满,线程 Producer3 呈 wait 状态...
线程 Consumer4 消费了,队列已空...
线程 Producer3 生产了,队列已满...
线程 Consumer6 消费了,队列已空...
队列已空,线程 Consumer7 呈 wait 状态...
队列已空,线程 Consumer5 呈 wait 状态...

Condition 方式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// 资源类
class MyService {
    private ReentrantLock lock = new ReentrantLock();
    // 生产线程
    private Condition conditionA = lock.newCondition();
    // 消费线程
    private Condition conditionB = lock.newCondition();
    private boolean hasValue = false;

    public void set() {
        try {
            lock.lock();
            while (hasValue == true) {
                System.out.println("[生产线程] " + "线程: " + Thread.currentThread().getName() + " await...");
                conditionA.await();
            }
            System.out.println("[生产中] " + "线程: " + Thread.currentThread().getName() + " 生产~");
            Thread.sleep(1000);
            hasValue = true;
            System.out.println("线程: " + Thread.currentThread().getName() + " 生产完毕...");
            System.out.println("[唤醒所有消费线程] " + " 线程: " + Thread.currentThread().getName() + "...");
            conditionB.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void get() {
        try {
            lock.lock();
            while (hasValue == false) {
                System.out.println("[消费线程] " + "线程: " + Thread.currentThread().getName() + " await...");
                conditionB.await();
            }
            System.out.println("[消费中] " + " 线程: " + Thread.currentThread().getName() + " 消费~");
            Thread.sleep(1000);
            System.out.println("线程: " + Thread.currentThread().getName() + " 消费完毕...");
            hasValue = false;
            System.out.println("[唤醒所有生产线程] " + " 线程: " + Thread.currentThread().getName() + "...");
            conditionA.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

class MyThreadA extends Thread {
    private MyService myService;

    public MyThreadA(MyService myService, String name) {
        super(name);
        this.myService = myService;
    }

    @Override
    public void run() {
        while (true) {
            myService.set();
        }
    }
}


class MyThreadB extends Thread {
    private MyService myService;

    public MyThreadB(MyService myService, String name) {
        super(name);
        this.myService = myService;
    }

    @Override
    public void run() {
        while (true) {
            myService.get();
        }
    }
}

public class Test {
    public static void main(String[] args) {
        MyService myService = new MyService();

        MyThreadA[] threadA = new MyThreadA[10];
        MyThreadB[] threadB = new MyThreadB[10];

        for (int i = 0; i < 10; i++) {
            threadA[i] = new MyThreadA(myService, "ThreadA-" + i);
            threadB[i] = new MyThreadB(myService, "ThreadB-" + i);
            threadA[i].start();
            threadB[i].start();
        }
    }
}

输出结果如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
[生产中] 线程: ThreadA-0 生产~
线程: ThreadA-0 生产完毕...
[唤醒所有消费线程]  线程: ThreadA-0...
[生产线程] 线程: ThreadA-0 await...
[消费中]  线程: ThreadB-0 消费~
线程: ThreadB-0 消费完毕...
[唤醒所有生产线程]  线程: ThreadB-0...
[生产中] 线程: ThreadA-1 生产~
线程: ThreadA-1 生产完毕...
[唤醒所有消费线程]  线程: ThreadA-1...
[消费中]  线程: ThreadB-1 消费~
线程: ThreadB-1 消费完毕...
[唤醒所有生产线程]  线程: ThreadB-1...
[消费线程] 线程: ThreadB-1 await...
[消费线程] 线程: ThreadB-2 await...
...

由此可见,等待/通知机制的经典范式,等待方需要遵循如下原则:

    1. 获取对象的锁;
    1. 如果条件不满足,则调用对象的 wait() 方法,被通知后仍要检查条件;
    1. 条件满足则执行对应的逻辑。

伪代码如下:

1
2
3
4
5
6
synchronized (对象) {
    while (条件不满足) {
        对象.wait();
    }
    对应的处理逻辑
}

通知方需要遵循如下原则:

    1. 获得对象的锁;
    1. 改变条件;
    1. 通知所有等待在对象上的线程。

伪代码如下:

1
2
3
4
synchronized (对象) {
    改变条件
    对象.notifyAll();
}

管道输入/输出流

管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于:管道输入/输出流主要用于线程之间的通信,也就是数据传输,而传输的媒介是内存。

管道输入/输出流主要由 4 种具体实现:

  • 面向字节
    • PipedOutputStream
    • PipedInputStream
  • 面向字符
    • PipedReader
    • PipedWriter

一个 PipedOutputStream 必须和一个 PipedInputStream 实例对象进行连接(调用 connect 方法)而产生一个通信管道。PipedOutputStream 用于向管道中写入数据,而 PipedInputStream 用于从管道中取出数据。

PipedOutputStream 类似于生产者,PipedInputStream 类似于消费者。在 PipedInputStream 中有一个 buffer 字节数组缓冲区,默认大小为 1024,存放生产者所提供的资源。变量 in 表示生产者生产资源的多少,变量 out 表示消费者消费资源的多少。in=-1 表示消费完了,in==out 表示生产满了。

下面通过一个实例进行说明。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Test {
    public static void main(String[] args) throws IOException {
        PipedWriter out = new PipedWriter();
        PipedReader in = new PipedReader();
        // 将输入流和输出流进行连接
        out.connect(in);

        Thread thread = new Thread(new Print(in), "PrintThread");
        thread.start();

        int receive = 0;
        try {
            while ((receive = System.in.read()) != -1) {
                out.write(receive);
            }
        } finally {
            out.close();
        }
    }

    static class Print implements Runnable {
        private PipedReader in;

        public Print(PipedReader in) {
            this.in = in;
        }

        @Override
        public void run() {
            int receive = 0;
            try {
                while ((receive = in.read()) != -1) {
                    System.out.print((char)receive);
                }
            } catch (IOException e) {
            }
        }
    }
}

输出结果如下所示:

1
2
123
123

由于我输入的是 123,则对应的输出也是 123。因此,以上程序的功能就是创建了一个线程 thread,用来接收 main 线程的输入,main 线程的任何输入都通过 PipedWriter 写入管道,而 thread 在管道的另一端通过 PipedReader 将内容读出并打印。

join() 方法

如果在 main 线程中调用了 thread.join() 方法,则 main 线程会等待 thread 线程执行完或等待一定的时间。如果调用的是无参的 join 方法,则等待 thread 执行完毕;如果是调用了有参的 join 方法,则需要等待一段指定的时间。

Thread 类中有 3 个重载的 join 方法:

1
2
3
4
5
6
7
public final void join() throws InterruptedException {
    join(0);
}

public final synchronized void join(long millis) throws InterruptedException {...}

public final synchronized void join(long millis, int nanos) throws InterruptedException {...}

如果直接调用 thread.join() 方法的话,实际上调用的是 join(0) 方法,而在 join(long millis) 方法中,由于传入的参数为 0,因此会再调用 Object 类中的 wait(0) 方法。如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Thread 类中的 join 方法
public final synchronized void join(long millis) throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        while (isAlive()) {
            wait(0);
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}


// Object 类中的 wait 方法
public final native void wait(long timeout) throws InterruptedException;

下面通过具体的实例进行说明:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Test {

    public static void main(String[] args) {
        System.out.println("进入线程: " + Thread.currentThread().getName());
        Test test = new Test();
        MyThread thread = test.new MyThread();
        thread.start();

        try {
            System.out.println("线程: " + thread.currentThread().getName() + " 开始等待...");
            thread.join();
            System.out.println("线程: " + thread.currentThread().getName() + " 继续执行...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println("进入线程: " + Thread.currentThread().getName());
            try {
                Thread.currentThread().sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程: " + Thread.currentThread().getName() + " 执行完毕...");
        }
    }
}

输出结果如下所示:

1
2
3
4
5
进入线程: main
线程: main 开始等待...
进入线程: Thread-0
线程: Thread-0 执行完毕...
线程: main 继续执行...

通过结果可以看出,当 mian 线程执行到 thread.join() 方法时,main 线程会获得线程对象 thread 的锁,也就是因为执行了 wait 方法而拿到该对象的锁。只要 thread 线程一直存活,就会调用该对象锁的 wait 方法阻塞 mian 线程。

这里需要注意的是,既然调用了 wait 方法阻塞了 main 线程,那么什么时候将 main 线程唤醒呢?从 JVM 源码中可以看到,其内部调用了 notifyAll 方法,会通知所有等待在该线程对象上的线程,因此 main 线程也就能够继续执行了。

小结:

  • join 方法会调用 wait 方法,让宿主线程(例如上面的 main 线程)进入阻塞状态,并且会让线程交出 CPU 执行权限;
  • join 方法会让线程释放对一个对象持有的锁;
  • 如果调用了 join 方法,则必须捕获 InterruptedException 异常或向上抛出。

参考