A 直接给 B 发送数据,就是耦合性比较强,开发 A 的时候就得考虑 B 是如何接收的,开发 B 的时候就得考虑 A 是如何发送的。极端情况下 A 出现问题挂了 可以能也造成 B 出现问题导致 B 也挂了,反之 B 出现了问题,也会牵连 A 导致 A 挂了。

于是在阻塞队列的影响下,A 和 B 不再直接交互
开发阶段:A 只用考虑自己和队列如何交互,B 也只用考虑自己和队列如何交互,A 和 B 之间都不需要知道对方的存在。
部署阶段:A 如果挂了,对 B 没有任何影响;B 如果挂了,对 A 没有任何影响。

  1. 能够做到 “削峰填谷” ,提高整个系统抗风险能力。


程序猿无法控制外网有多少个用户在访问 A,当出现极端情况,外网访问请求大量涌入的时候,A 把所有请求的数据一并转让给 B 的时候,B 就容易扛不住而挂掉。

在阻塞队列的影响下
多出来的压力队列承担了,队列里多存一会儿数据就行了,即使 A 的压力比较大,B 仍按照固定的频率来取数据。


标准库中的阻塞队列

在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可

  • BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
  • put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
  • BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.

生产者消费者模型:

public class Demo20 {    public static void main(String[] args) {        BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();        Thread customer = new Thread(()->{           while (true){               try {                   int value = queue.take();                   System.out.println("消费元素:" + value);               } catch (InterruptedException e) {                   e.printStackTrace();               }           }        });        customer.start();        Thread producer = new Thread(()->{            int n = 0;            while (true){                try {                    System.out.println("生产元素:" + n);                    queue.put(n);                    n++;                    Thread.sleep(500);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });        producer.start();    }}

运行结果演示图:

阻塞队列实现:

  • 自己模拟实现一个阻塞队列
  • 基于数组的方式来实现队列
  • 两个核心方法:1. put 入队列; 2. take 出队列
class MyBlockingQueue {    // 假定最大是 1000 个元素,当然也可以设定成可配置的    private int[] items = new int[1000];    //队首的位置    private int head = 0;    //队尾的位置    private int tail = 0;    //队列的元素个数    private int size = 0;    //入队列    public void put (int value) throws InterruptedException {        synchronized (this) {            while (size == items.length) {                //队列已满,继续等待                this.wait();            }            items[tail] = value;            tail++;            if (tail == items.length) {                //注意 如果 tail 到达数组末尾,就需要从头开始                tail = 0;            }            size++;            //即使没人在等待,多调用几次 notify 也没事,没负面影响            this.notify();        }    }    //出队列    public Integer take() throws InterruptedException {        int ret = 0;        synchronized (this) {            while (size == 0) {                //队列为空,就等待                this.wait();            }            ret = items[head];            head++;            if (head == items.length) {                head = 0;            }            size--;            this.notify();        }        return ret;    }}public class Demo21 {    public static void main(String[] args) throws InterruptedException {        MyBlockingQueue queue = new MyBlockingQueue();        queue.put(100);        queue.take();    }}
  • 入队列中的 wait出队列中的 notify 对应,满了之后,入队列就要阻塞等待,此时在取走元素之后,就可以尝试唤醒了。
  • 入队列中的 notify出队列中的 wait 对应,队列为空,也要阻塞,此时在插入成功之后,队列就不为空了,就能够把 take 的等待唤醒。
  • 一个线程中无法做到又等待又唤醒
  • 阻塞之后,就要唤醒,阻塞和唤醒之间是沧海桑田,虽然按照当下代码是有元素插入成功了,条件不成立,等待结束。但是更稳妥的做法是把 if 换成 while ,在唤醒之后,再判断一次条件!万一条件又成立了呢?万一接下来要继续阻塞等待呢?

测试代码:

public class Demo21 {    public static void main(String[] args) {        MyBlockingQueue queue = new MyBlockingQueue();        Thread customer = new Thread(()->{           while (true){               int value = 0;               try {                   value = queue.take();                   System.out.println("消费:" + value);                   Thread.sleep(500);               } catch (InterruptedException e) {                   e.printStackTrace();               }           }        });        customer.start();        Thread producer = new Thread(()->{           int value = 0;           while (true){               try {                   queue.put(value);                   System.out.println("生产:" + value);                   value++;               } catch (InterruptedException e) {                   e.printStackTrace();               }           }        });        producer.start();    }}

延缓了消费代码,也可以把生产代码延缓,调用 sleep 即可

  • 延缓消费代码
  • 延缓生产代码