首页 > 文章列表 > Java阻塞队列BlockingQueue的实例分析

Java阻塞队列BlockingQueue的实例分析

java BlockingQueue
365 2023-04-29

Java阻塞队列BlockingQueue实例分析

队列的类型

  • 无限队列(unbounded queue) 无容量限定,只随存储变化

  • 有限队列(bounded queue) 定义了最大容量

向无限队列添加元素的所有操作都将永远不会阻塞(也是线程安全的),因此它可以增长到非常大的容量。 使用无限阻塞队列 BlockingQueue 设计生产者 - 消费者模型时最重要的是消费者应该能够像生产者向队列添加消息一样快地消费消息 。否则可能内存不足而抛出 OutOfMemory 异常。

数据结构

  • 1.通常使用链表或数组实现

  • 2.一般具有 FIFO(先进先出) 特性,也可以设计为双端队列

  • 3.队列的主要操作:入队和出队

阻塞队列 BlockingQueue

定义:线程通信中,在任意时刻,无论并发有多高,在单个 JVM 上,同一时间永远只有一个线程能对队列进行入队或出队操作。BlockingQueue 可以在线程之间共享而无需任何显式同步

阻塞队列的类型:  

JAVA中的应用场景 : 线程池、SpringCloud-Eureka 三级缓存、Nacos、MQ、Netty 等

常见的阻塞队列

  • ArrayBlockingQueue : 由数组支持的有界队列

    • 应用场景: 线程池中有比较多的应用、生产者消费者模型

    • 工作原理: 基于 ReentrantLock 保证线程安全,根据Condition实现队列满时的阻塞

  • LinkedBlockingQueue : 基于链表的无界队列(理论上有界)

  • PriorityBlockingQueue : 由优先级堆支持的无界优先级队列

  • DelayQueue : 由优先级堆支持的、基于时间的调度队列,内部基于无界队列PriorityQueue 实现,而无界队列基于数组的扩容实现

    • 使用方法: 入队的对象必须要实现 Delayed 接口,而 Delayed 集成自 Comparable 接口

    • 应用场景: 售卖电影票等

    • 工作原理: 队列内部会根据时间优先级进行排序。延迟类线程池周期执行。

它们都实现了BlockingQueue接口,都有put()和take()等方法,创建方式如下:

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<> (666);

BlockingQueue API

添加元素:

方法含义
add()如果插入成功则返回 true,否则抛出 IllegalStateException 异常
put()将指定的元素插入队列,如果队列满了,会阻塞直到有空间插入
offer()如果插入成功则返回 true,否则返回 false
offer(E e, long timeout, TimeUnit unit)尝试将元素插入队列,如果队列已满,会阻塞直到有空间插入,阻塞有时间控制

检索元素:

方法含义
take()获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用
poll(long timeout, TimeUnit unit)检索并删除队列的头部,如有必要,等待指定的等待时间以使元素可用,如果超时,则返回 null

ArrayBlockingQueue 源码简解

实现:同步等待队列(CLH)+ 条件等待队列满足条件的元素在CLH队列中等待锁,不满足条件的队列挪到条件等待队列,满足条件后再从 tail 插入 CLH 队列

线程获取锁的条件: 在 CLH 队列里等待的 Node 节点,并且 Node 节点的前驱节点是 Singal。条件等待队列里的线程是无法获取锁的。

/**

 * 构造方法

 * 还有两个构造函数,一个无fair参数,一个可传入集合,创建时插入队列

 * @param capacity 固定容量

 * @param fair 默认是false:访问顺序未指定; true:按照FIFO顺序处理

 */

public ArrayBlockingQueue(int capacity, boolean fair) {

   if (capacity <= 0)

        throw new IllegalArgumentException();

    this.items = new Object[capacity];

    lock = new ReentrantLock(fair); // 根据fair创建对应的锁

    // 条件对象,配合容器能满足业务

    notEmpty = lock.newCondition(); // 出队条件对象

    notFull =  lock.newCondition(); // 入队条件对象

}

/**

 * 入队方法

 * 在队列的尾部插入指定的元素,如果队列已满,则等待空间可用

 */

public void put(E e) throws InterruptedException {

    checkNotNull(e); // 检查put对象是否为空,空抛出异常

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly(); // 若未被中断尝试获取锁,详见下文

    try {

        // 队列中元素的数量 等于 排队元素的长度

        while (count == items.length)

            notFull.await(); // 见下文

        enqueue(e); // 元素入队

    } finally {

        lock.unlock();

    }

}

/**

 * 出队方法

 * 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用

 */

public E take() throws InterruptedException {

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly(); // 见下文

    try {

        while (count == 0)

            notEmpty.await(); // 见下文

        return dequeue(); // 元素出队

    } finally {

        lock.unlock();

    }

}

令当前线程等待,直到收到信号或被中断详:与此 Condition 关联的锁被自动释放,进入等待,并且处于休眠状态,直到发生以下四种情况之一:

  • ①其他线程调用这个Condition的 signal 方法,当前线程恰好被选为要被唤醒的线程;

  • ②其他线程调用这个条件的 signalAll 方法

  • ③其他线程中断当前线程,支持中断线程挂起;

  • ④一个“虚假的唤醒”发生了。

在这些情况下,在此方法返回之前,当前线程必须重新获得与此条件相关联的锁。当线程返回时,保证它持有这个锁。

如果当前线程有以下两种情况之一:

  • ①在进入该方法时设置中断状态;

  • ②在等待时被中断,支持线程挂起的中断 抛出InterruptedException

生产者消费者模式

BlockingQueue 可以在线程之间共享而无需任何显式同步,在生产者消费者之间,只需要将阻塞队列以参数的形式进行传递即可。它内部的机制会自动保证线程的安全性。

生产者:实现了 Runnable 接口,每个生产者生产100种商品和1个中断标记后完成线程任务

@Slf4j

@Slf4j

public class Producer implements Runnable{

    // 作为参数的阻塞队列

    private BlockingQueue<Integer> blockingQueue;

    private final int stopTag;

    /**

     * 构造方法

     * @param blockingQueue

     * @param stopTag

     */

    public Producer(BlockingQueue<Integer> blockingQueue,int stopTag) {

        this.blockingQueue = blockingQueue;

        this.stopTag = stopTag;

    }

    @Override

    public void run() {

        try {

            generateNumbers();

        } catch (InterruptedException e) {

            Thread.currentThread().interrupt();

        }

    }

   private void generateNumbers() throws InterruptedException {

        // 每个生产者都随机生产10种商品

        for (int i = 0; i < 10; i++) {

            int product = ThreadLocalRandom.current().nextInt(1000,1100);

            log.info("生产者{}号,生产了商品,编号为{}",Thread.currentThread().getId(),product);

            blockingQueue.put(product);

        }

        // 生产终止标记

        blockingQueue.put(stopTag);

        log.info("生产者{}号,生产了第终止标记编号{}",Thread.currentThread().getId(),Thread.currentThread().getId());

    }

}

消费者:消费者拿到终止消费标记终止消费,否则消费商品,拿到终止标记后完成线程任务

@Slf4j

public class Consumer implements Runnable{

    // 作为参数的阻塞队列

    private BlockingQueue<Integer> queue;

    private final int stopTage;

    public Consumer(BlockingQueue<Integer> queue, int stopTage) {

        this.queue = queue;

        this.stopTage = stopTage;

    }

    @Override

    public void run() {

        try {

            while (true) {

                Integer product = queue.take();

                if (product.equals(stopTage)) {

                    log.info("{}号消费者,停止消费,因为拿到了停止消费标记",Thread.currentThread().getId());

                    return;

                }

                log.info("{}号消费者,拿到的商品编号:{}",Thread.currentThread().getId(),product);

            }

        } catch (InterruptedException e) {

            Thread.currentThread().interrupt();

        }

    }

}

客户端类: 创建与计算机 CPU 核数相同的线程数,与 16个生产者

public class ProductConsumerTest {

    public static void main(String[] args) {

        // 阻塞队列容量

        int blockingQueueSize = 10;

        // 生产者数量

        int producerSize = 16;

        // 消费者数量 = 计算机线程核数 8

        int consumerSize = Runtime.getRuntime().availableProcessors();

        // 终止消费标记

        int stopTag = Integer.MAX_VALUE;

        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize);

        // 创建16个生产者线程

        for (int i = 0; i < producerSize; i++) {

            new Thread(new Producer(blockingQueue, stopTag)).start();

        }

        // 创建8个消费者线程

        for (int j = 0; j < consumerSize; j++) {

            new Thread(new Consumer(blockingQueue, stopTag)).start();

        }

    }

}

延迟队列 DelayQueue

定义: Java 延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用 poll() 方法会返回 null 值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于 0 来判断。延时队列不能存放空元素。

/**

 * 电影票类,实现了Delayed接口,重写 compareTo 和 getDelay方法

 */

public class MovieTicket implements Delayed {

    //延迟时间

    private final long delay;

    //到期时间

    private final long expire;

    //数据

    private final String msg;

    //创建时间

    private final long now;

    public long getDelay() {

        return delay;

    }

    public long getExpire() {

        return expire;

    }

    public String getMsg() {

        return msg;

    }

    public long getNow() {

        return now;

    }

    /**

     * @param msg 消息

     * @param delay 延期时间

     */

    public MovieTicket(String msg , long delay) {

        this.delay = delay;

        this.msg = msg;

        expire = System.currentTimeMillis() + delay;    //到期时间 = 当前时间+延迟时间

        now = System.currentTimeMillis();

    }

    /**

     * @param msg

     */

    public MovieTicket(String msg){

        this(msg,1000);

    }

    public MovieTicket(){

        this(null,1000);

    }

    /**

     * 获得延迟时间   用过期时间-当前时间,时间单位毫秒

     * @param unit

     * @return

     */

    @Override

    public long getDelay(TimeUnit unit) {

        return unit.convert(this.expire

                - System.currentTimeMillis() , TimeUnit.MILLISECONDS);

    }

    /**

     * 用于延迟队列内部比较排序  当前时间的延迟时间 - 比较对象的延迟时间

     * 越早过期的时间在队列中越靠前

     * @param delayed

     * @return

     */

    @Override

    public int compareTo(Delayed delayed) {

        return (int) (this.getDelay(TimeUnit.MILLISECONDS)

                - delayed.getDelay(TimeUnit.MILLISECONDS));

    }

}

测试类:

public static void main(String[] args) {

    DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>();

    MovieTicket ticket = new MovieTicket("电影票1",10000);

    delayQueue.put(ticket);

    MovieTicket ticket1 = new MovieTicket("电影票2",5000);

    delayQueue.put(ticket1);

    MovieTicket ticket2 = new MovieTicket("电影票3",8000);

    delayQueue.put(ticket2);

    log.info("message:--->入队完毕");

    while( delayQueue.size() > 0 ){

        try {

            ticket = delayQueue.take();

            log.info("电影票出队:{}",ticket.getMsg());

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

    }

}

从运行结果可以看出队列是延迟出队,间隔和我们所设置的时间相同