Home 成长之路 手撕生产者消费者(BlockingQueue)

手撕生产者消费者(BlockingQueue)

参加北森评测时遇到了,当时我用的是C语言写的生产者消费者

今天学习了NIO,BlockingQueue(阻塞队列),ThreadPool(线程池相关概念),为复习巩固记录一个简易的生产者消费者Demo

代码地址:ThreadPool\MyBlockingQueue

 

 

生产者:

package MyBlockingQueue;

import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * @Author Chengzhi
 * @Date 2021/4/18 0:47
 * @Version 1.0
 *
 * 角色:生产者
 * 说明:以实现 Runnable 接口的形式
 *      将自己包装成单个线程
 *      从而实现多线程
 */
public class Producer implements Runnable{

    /* 生产者与消费者之间应该有 BlockingQueue,那么这个 Queue 应该定义在外部更合理
       因此需要传入Queue  */
    private BlockingQueue queue;
    private volatile boolean  isRunning = true;//是否在运行标志
    public Producer(BlockingQueue queue){
        this.queue = queue;
    }

    /* 重写 run() 方法
       实现消息的创建,并放置到 Queue 中 */
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " 号生产者---启动");
        try {
            while (isRunning) {
                //输入要发送的消息
                Scanner scanner = new Scanner(System.in);
                String msg = scanner.nextLine();
                System.out.println(Thread.currentThread().getName() + " 号生产者---输入成功!!!内容为:" + msg);

                //将消息放入阻塞队列,设定等待时间为5秒,如果5秒没加进去则返回 false
                boolean isSuccess = queue.offer(msg,5, TimeUnit.SECONDS);

                if(isSuccess){
                    System.out.println(Thread.currentThread().getName() + " 号生产者---生产成功");

                    //生产成功后休息十秒,给其他线程生产机会
                    Thread.sleep(10000);
                }
            }
        }catch (Exception e){
            e.printStackTrace();

            //改变线程中断状态
            Thread.currentThread().interrupt();
        }finally {
            System.out.println(Thread.currentThread().getName() + " 号生产者---退出");
        }
    }
    public void stop() {
        isRunning = false;
    }
}

消费者:
package MyBlockingQueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * @Author Chengzhi
 * @Date 2021/4/18 1:02
 * @Version 1.0
 *
 * 角色:消费者
 * 说明:以实现 Runnable 接口的形式
 *      将自己包装成单个线程
 *      从而实现多线程
 */
public class Customer implements Runnable{

    /* 同样需要传入阻塞队列 */
    private BlockingQueue<String> queue;

    public Customer(BlockingQueue queue){
        this.queue = queue;
    }

    /* 重写 run() 方法
       实现消息的消费,将消息从 Queue 中移除 */
    @Override
    public void run() {

        System.out.println(Thread.currentThread().getName() + " 号[消费者]---启动");
        // isRunning 用于判断是否有消息来到
        boolean isRunning = true;
        try {
            while (isRunning) {
                System.out.println(Thread.currentThread().getName() + " 号正在从阻塞队列 Queue 中获得消息...");
                //获取需要消费的消息
                System.out.println(Thread.currentThread().getName() + " 号[消费者]---等待五秒");

                //从阻塞队列取出一个队首的对象,如果在5秒内队列一旦有数据可取,则立即返回队列中的数据,否则返回false
                String msg = queue.poll(5, TimeUnit.SECONDS);//有数据时直接从队列的队首取走,无数据时阻塞,在5s内有数据,取走,超过5s还没数据,返回null

                //判断是否拿到消息
                if(null != msg){
                    //拿到数据并输出
                    System.out.println(Thread.currentThread().getName() + " 号[消费者]---拿到消息:"+ msg);
                }
                else{
                    // 超过5s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
                    isRunning = false;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();

            //改变线程中断状态
            Thread.currentThread().interrupt();
        }finally {
            System.out.println(Thread.currentThread().getName() + " 号[消费者]---退出");
        }
    }
}

测试类:
package MyBlockingQueue;

import java.util.concurrent.*;

/**
 * @Author Chengzhi
 * @Date 2021/4/18 1:35
 * @Version 1.0
 */
public class MyBlockingTest {
    private static final int CORE_POOL_SIZE = 3;
    private static final int MAX_POOL_SIZE = 5;
    private static final int QUEUE_CAPACITY = 5;
    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) throws InterruptedException {

        BlockingQueue<String> Queue = new LinkedBlockingDeque<>(10);
        //创建一个生产者线程
        Producer Producer1 = new Producer(Queue);
        Producer Producer2 = new Producer(Queue);
        Producer Producer3 = new Producer(Queue);
        //创建一个消费者线程
        Customer customer1 = new Customer(Queue);

        //创建线程池接收线程任务
        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        //启动线程
        executor.execute(Producer1);
        executor.execute(Producer2);
        executor.execute(Producer3);
        executor.execute(customer1);

        // 执行60s
        Thread.sleep(10 * 1000);
        Producer1.stop();

        Thread.sleep(2000);
        // 退出Executor
        executor.shutdown();
    }
}

SIMILAR ARTICLES

发表评论

发表评论