<JavaEE> 经典设计模式之 -- 使用阻塞队列实现“生产者-消费者模型”

2023-12-13 16:32:47

目录

一、阻塞队列和“生产者-消费者模型”之间的关系

二、标准库提供了阻塞队列

三、实现自己的阻塞队列

3.1?基于数组实现普通的环形队列

3.2?将上述代码改造为线程安全

3.3 增加阻塞功能

四、使用阻塞队列实现“生产者-消费者模型”


一、阻塞队列和“生产者-消费者模型”之间的关系

1)什么是阻塞队列?

队列是一种“先进先出”的数据结构,阻塞队列是一种带有阻塞功能、线程安全的队列。

当队列满时,继续入队列则会阻塞,直到队列不为满时,才会继续入队列。

当队列空时,继续出队列则会阻塞,直到队列不为空时,才会继续出队列。

2)什么是“生产者-消费者模型”?

“生产者-消费者模型”是一种经典的开发模型,是阻塞队列的典型应用场景。

“生产者-消费者模型”主要由生产者-容器(阻塞队列)-消费者构成。

3)阻塞队列在“生产者-消费者模型”中的作用
<1>

阻塞队列可以让生产者和消费者之间解耦合。

使用阻塞队列后,生产者和消费者之间不再直接通信,而是通过阻塞队列进行沟通。

<2>阻塞队列相当于生产者和消费者之间的缓冲区,可以平衡“生产速度”和“消费速度”。

图示演示“生产者-消费者模型”:


二、标准库提供了阻塞队列

1)标准库提供了哪些阻塞队列?

在 Java 标准库中内置了阻塞队列 —— BlockingQueue 。?

BlockingQueue 是一个接口,接口的实现类包括 ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue 等。

2)BlockingQueue 的常用方法

常用方法有:

put() 方法,带阻塞功能,用于入队列。

take() 方法,带阻塞功能,用于出队列。

BlockingQueue 也提供了 offer()、poll()、peek() 等方法,但这些方法没有阻塞功能。

三、实现自己的阻塞队列

3.1?基于数组实现普通的环形队列

基于数组实现普通的环形队列

队列中应至少包括以下内容:

一个用于存放元素的数组 elems
指向队首元素的索引 head
指向队尾元素的索引 tail
用于记录队列内有效元素个数的 size
用于构造对象,参数为数组容量的构造方法
用于入队列的 put() 方法
用于出队列的 take() 方法

代码演示基于数组实现普通的环形队列:

class MyBlockingQueue {
    //数组存放元素;
    private String[] elems = null;
    //头节点;
    private int head = 0;
    //尾节点;
    private int tail = 0;
    //队列内有效元素个数;
    private int size = 0;

    //参数为数组容量的构造方法;
    public MyBlockingQueue(int capacity){
        elems = new String[capacity];
    }

    //入队列方法;
    public void put(String elem){
        if(size == elems.length){
            return;
        }
        elems[tail] = elem;
        tail++;
        if(tail >= elems.length){
            tail = 0;
        }
        size++;
    }
    
    //出队列方法;
    public String take(){
        if(size == 0){
            return "";
        }
        String elem = elems[head];
        elems[head] = null;
        head++;
        if(head >= elems.length){
            head = 0;
        }
        size--;
        return elem;
    }
}

3.2?将上述代码改造为线程安全

将上述代码改造为线程安全

以 put() 方法为例,对以下三部分进行分析:

<1> 在多线程环境下,多个线程可以同时修改共享变量 head、tail、size 等,因此需要使用 volatile 对共享变量进行修饰,保证其内存可见性。

<2> 图示分析“写操作”:

<3> 图示分析“读写操作非原子”:

依照上述三个部分的分析结果,可以对代码进行改造,改造结果如下:

class MyBlockingQueue {
    //数组存放元素;
    private String[] elems = null;
    //头节点;
    private volatile int head = 0;
    //尾节点;
    private volatile int tail = 0;
    //队列内有效元素个数;
    private volatile int size = 0;

    public MyBlockingQueue(int capacity){
        elems = new String[capacity];
    }

    //入队列方法;
    public void put(String elem){
        //加锁;
        synchronized (this){
            if(size == elems.length){
                return;
            }
            elems[tail] = elem;
            tail++;
            if(tail >= elems.length){
                tail = 0;
            }
            size++;
        }
    }

    //出队列方法;
    public String take(){
        String elem = null;
        //加锁;
        synchronized (this){
            if(size == 0){
                return "";
            }
            elem = elems[head];
            elems[head] = null;
            head++;
            if(head >= elems.length){
                head = 0;
            }
            size--;
        }
        return elem;
    }
}

3.3 增加阻塞功能

将上述代码增加阻塞功能

1)什么时候阻塞?

入队列时,如果队列已满,则线程阻塞。

出队列时,如果队列已空,则线程阻塞。

2)什么时候唤醒?

有新元素入队列,则唤醒阻塞等待的出队列线程。

有新元素出队列,则唤醒阻塞等待的入队列线程。

以 put() 方法为例,图示演示分析代码需要改动的部分:

代码演示阻塞队列:

class MyBlockingQueue {
    //数组存放元素;
    private String[] elems = null;
    //头节点;
    private volatile int head = 0;
    //尾节点;
    private volatile int tail = 0;
    //队列内有效元素个数;
    private volatile int size = 0;

    public MyBlockingQueue(int capacity){
        elems = new String[capacity];
    }

    //入队列方法;
    public void put(String elem) throws InterruptedException {
        synchronized (this){
            while (size == elems.length){
                this.wait();
            }
            elems[tail] = elem;
            tail++;
            if(tail >= elems.length){
                tail = 0;
            }
            size++;
            this.notifyAll();
        }
    }
    //出队列方法;
    public String take() throws InterruptedException {
        String elem = null;
        synchronized (this){
            while (size == 0){
                this.wait();
            }
            elem = elems[head];
            elems[head] = null;
            head++;
            if(head >= elems.length){
                head = 0;
            }
            size--;
            this.notify();
        }
        return elem;
    }
}

四、使用阻塞队列实现“生产者-消费者模型”

代码内容分析

有一个阻塞队列和两个线程,其中

线程 t1 做为“生产者”,不断“生产”自增的数字 num ,并将数字入队列。

线程 t2 做为“消费者”,不断“消费”队列中的数字 num ,即将 num 从队列中取出并打印。

使用上述自己实现的阻塞队列,代码演示“生产者-消费者模型”:

    public static void main(String[] args) throws InterruptedException {
        //新建容量为10的阻塞队列;
        MyBlockingQueue queue = new MyBlockingQueue(10);

        //生产者:不断生产自增的num;
        Thread t1 = new Thread(()->{
            int num = 0;
            while (true){
                try {
                    queue.put(num + "");
                    System.out.println("生产者:" + num);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                num++;
            }
        });

        //消费者:每隔1秒消费一个num;
        Thread t2 = new Thread(()->{
            while (true){
                try {
                    System.out.println("消费者:" + queue.take());
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        t1.start();
        t2.start();
    }

//运行结果:
生产者:0
消费者:0
生产者:1
生产者:2
生产者:3
生产者:4
生产者:5
生产者:6
生产者:7
生产者:8
生产者:9
生产者:10
消费者:1
生产者:11
消费者:2
生产者:12
消费者:3
生产者:13
...

可以看到在生产者将阻塞队列放满后,开始阻塞,
等待消费者取出元素后,才又开始生产元素。

阅读指针 -> 《经典设计模式之“定时器”》

<JavaEE> 经典设计模式之 -- 定时器-CSDN博客介绍什么是定时器,以及 Java 标准库中的定时器类。实现自己的定时器类。https://blog.csdn.net/zzy734437202/article/details/134837039

文章来源:https://blog.csdn.net/zzy734437202/article/details/134807241
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。