阻塞队列是一种特殊的队列,也遵守 “先进先出” 的原则。

阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:

  • 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素;
  • 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素。

JAVA标准库中已经实现了阻塞队列,我们可以直接进行使用

BlockingQueue

BlockingQueue是一个接口,阻塞队列也和普通队列一样有两种实现方式:数组和链表。

注:创建阻塞队列时需要传入队列的长度参数。

BlockingQueue queue = new ArrayBlockingQueue(10);

由于BlockingQueue继承自Queue所以普通队列的接口也可以正常使用,但是没有阻塞效果。

BlockingQueue提供了两个带有阻塞效果且线程安全的方法:put()和take()。

public static void main(String[] args) throws InterruptedException {//创建一个长度为10的阻塞队列BlockingQueue queue = new ArrayBlockingQueue(10);//入队五次queue.put("1");queue.put("2");queue.put("3");queue.put("4");queue.put("5");//出队列六次System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());//由于此时队列为空,所以会出现阻塞System.out.println(queue.take());}

为了更好的理解阻塞队列我们可以自己设计一个简单的阻塞队列。

模拟实现

先写一个普通的循环队列

public class MyBlockingQueue {private String[] queue = new String[10];//表示队列内元素数量private int size;//头尾指针private int head;private int tail;//入队列public boolean put(String str) {if (this.size == 10) {//队列满return false;}this.queue[this.tail++] = str;this.tail %= 10;this.size++;return true;}//出队列public String take() {if (this.size == 0) {//队列空return null;}String ret = this.queue[this.head];this.head = (++this.head+10) % 10;this.size--;return ret;}}

现在它是线程不安全的所以我们应该加锁,因为里面的两个方法几乎每一步都有修改操作所以我们直接给整个方法都加上锁

//入队列public synchronized boolean put(String str) {……}//出队列public synchronized String take() {……}

为了防止编译器优化我们对值会被修改的属性都使用volatile进行修饰

public class MyBlockingQueue {private String[] queue = new String[10];//表示队列内元素数量private volatile int size;//头尾指针private volatile int head;private volatile int tail;}

接下来我们还需加上阻塞的特性即:

  • 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素;
  • 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素。

我们只需在put()方法判断队列满之后将返回修改为等待即可。

//入队列public synchronized boolean put(String str) {if (this.size == 10) {//队列满try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}this.queue[this.tail++] = str;this.tail %= 10;this.size++;return true;}

当任意线程调用take()方法后put()方法就应该继续执行入队操作,所以在tack方法的最后应该加上notify()方法来唤醒线程。

//出队列public synchronized String take() {if (this.size == 0) {//队列空return null;}String ret = this.queue[this.head];this.head = (++this.head+10) % 10;this.size--;this.notify();return ret;}

出队列的阻塞也和入队列的阻塞原理相同。

//入队列public synchronized boolean put(String str) {if (this.size == 10) {//队列满try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}this.queue[this.tail++] = str;this.tail %= 10;this.size++;this.notify();return true;}//出队列public synchronized String take() {if (this.size == 0) {//队列空try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}String ret = this.queue[this.head];this.head = (++this.head+10) % 10;this.size--;this.notify();return ret;}

wait()和notify()的对应关系如下:

此时代码还是有一个非常隐蔽的BUG。那就是wait()除了可以被notify()唤醒外还可以被interrupt唤醒所以应该将if判断改为while循环。

//入队列public synchronized boolean put(String str) {while (this.size == 10) {//队列满try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}……}//出队列public synchronized String take() {while (this.size == 0) {//队列空try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}……}

测试

public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();//入队五次queue.put("1");queue.put("2");queue.put("3");queue.put("4");queue.put("5");//出队列六次System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());//由于此时队列为空,所以会出现阻塞System.out.println(queue.take());}

public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();//入队11次queue.put("1");queue.put("2");queue.put("3");queue.put("4");queue.put("5");queue.put("6");queue.put("7");queue.put("8");queue.put("9");queue.put("10");//由于队列满出现阻塞queue.put("11");}

在jconsole 中查看线程状态为WAITING

完整代码

public class MyBlockingQueue {private String[] queue = new String[10];//表示队列内元素数量private volatile int size;//头尾指针private volatile int head;private volatile int tail;//入队列public synchronized boolean put(String str) {while (this.size == 10) {//队列满try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}this.queue[this.tail++] = str;this.tail %= 10;this.size++;this.notify();return true;}//出队列public synchronized String take() {while (this.size == 0) {//队列空try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}String ret = this.queue[this.head];this.head = (++this.head+10) % 10;this.size--;this.notify();return ret;}}