常见队列介绍与缓冲实现
一 常见队列二 常见队列的方法1.ArrayBlockingQueue2.LinkedBlockingQueue3.LinkedBlockingDeque4.Concurrent
SEO靠我LinkedQueue5.SynchronousQueue6.LinkedTransferQueue1.定义缓冲队列类2.定义数据对象3.测试代码
一 常见队列
队列说明ArrayBlockingQueu
SEO靠我e有界LinkedBlockingQueue有/无界LinkedBlockingDeque无界ConcurrentLinkedQueue无界SynchronousQueue无界LinkedTransf
SEO靠我erQueue无界DelayQueue延迟
二 常见队列的方法
1.ArrayBlockingQueue
public static void arrayBlockingQueue() throws Int
SEO靠我erruptedException {/*** 有界阻塞队列,初始化必须指定大小*/ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue
SEO靠我<>(1);/*** 阻塞插入,容量不足会阻塞,直到有空闲位置*/queue.put(1);/*** 非阻塞插入,返回插入状态 true/false*/queue.offer(1);/*** 阻塞指定
SEO靠我时长插入,阻塞时间内有空闲位置则可插入,返回插入状态 true/false*/queue.offer(1,1, TimeUnit.SECONDS);/*** 异常插入,无空闲位置则抛出异常*///qu
SEO靠我eue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue.poll();/*** 阻塞指定时长拉取,阻塞时间内有数据则直接取数,取出并移除元素*/queue.poll(1,TimeUn
SEO靠我it.SECONDS);/*** 非阻塞拉取,队首为空返回NULL,取出不移除元素*/queue.peek();/*** 阻塞拉取,直到有元素*/queue.take();}
2.LinkedBlockSEO靠我ingQueue
public static void linkedBlockingQueue() throws InterruptedException {/*** 指定长度即为有界* 不指定长度即为
SEO靠我无界*/LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);/*** 阻塞插入,容量不足会阻塞,直到有空闲位置*/que
SEO靠我ue.put(1);/*** 非阻塞插入,返回插入状态 true/false*/queue.offer(1);/*** 阻塞指定时长插入,阻塞时间内有空闲位置则可插入,返回插入状态 true/fals
SEO靠我e*/queue.offer(1,1, TimeUnit.SECONDS);/*** 异常插入,无空闲位置则抛出异常*///queue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue
SEO靠我.poll();/*** 阻塞指定时长拉取,阻塞时间内有数据则直接取数,取出并移除元素*/queue.poll(1,TimeUnit.SECONDS);/*** 非阻塞拉取,队首为空返回NULL,取出
SEO靠我不移除元素*/queue.peek();/*** 阻塞拉取,直到有元素*/queue.take();}
3.LinkedBlockingDeque
public static void linkedBlo
SEO靠我ckingDeque() throws InterruptedException {/*** 消费默认从头开始*/LinkedBlockingDeque<Integer> deque = new Li
SEO靠我nkedBlockingDeque<>(1);/*** 阻塞插入,容量不足会阻塞,直到有空闲位置*/deque.put(1);/*** 非阻塞插入,返回插入状态 true/false*/deque.o
SEO靠我ffer(1);/*** 阻塞指定时长插入,阻塞时间内有空闲位置则可插入,返回插入状态 true/false*/deque.offer(1,1, TimeUnit.SECONDS);/*** 异常插入
SEO靠我,无空闲位置则抛出异常*///deque.add(1);/*** 从头非阻塞插入,返回插入状态 true/false*/deque.offerFirst(1);/*** 从头阻塞指定时长插入,阻塞时间
SEO靠我内有空闲位置则可插入,返回插入状态 true/false*/deque.offerFirst(1,1,TimeUnit.SECONDS);/*** 从尾非阻塞插入,返回插入状态 true/false*
SEO靠我/deque.offerLast(1);/*** 从尾阻塞指定时长插入,阻塞时间内有空闲位置则可插入,返回插入状态 true/false*/deque.offerFirst(1,1,TimeUnit.
SEO靠我SECONDS);/*** 非阻塞拉取,取出并移除元素*/deque.poll();/*** 阻塞指定时长拉取,阻塞时间内有数据则直接取数,取出并移除元素*/deque.poll(1,TimeUnit
SEO靠我.SECONDS);/*** 非阻塞拉取头元素,取出并移除元素*/deque.pollFirst();/*** 阻塞指定时长拉取头元素,阻塞时间内有数据则直接取数,取出并移除元素*/deque.pol
SEO靠我lFirst(1,TimeUnit.SECONDS);/*** 非阻塞拉取尾元素,取出并移除元素*/deque.pollLast();/*** 阻塞指定时长拉取尾元素,阻塞时间内有数据则直接取数,取出
SEO靠我并移除元素*/deque.pollLast(1,TimeUnit.SECONDS);/*** 非阻塞拉取,队首为空返回NULL,取出不移除元素*/deque.peek();/*** 阻塞拉取,直到有元
SEO靠我素*/deque.take();/*** 阻塞拉取,直到头有元素*/deque.takeFirst();/*** 阻塞拉取,直到尾有元素*/deque.takeLast();}
4.ConcurrentSEO靠我LinkedQueue
public static void concurrentLinkedQueue(){/*** 无界,不支持指定大小* 无锁,通过 CAS 控制并发;可能存在读写不一致*/Con
SEO靠我currentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();/*** 非阻塞插入,返回插入状态 true/false*/queu
SEO靠我e.offer(1);/*** 内部调用了 offer 方法*/queue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue.poll();/*** 非阻塞拉取,队首为空返回NULL,
SEO靠我取出不移除元素*/queue.peek();}public static void synchronousQueue () throws InterruptedException {ThreadPoo
SEO靠我lExecutor executor = new ThreadPoolExecutor(1, 100,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue
SEO靠我<>());/*** 同步队列 不存储数据 匹配生产者和消费者线程*/SynchronousQueue<Integer> queue = new SynchronousQueue<>();/*** 阻
SEO靠我塞插入,阻塞直到有消费线程与之匹配*/executor.execute(()->{try {queue.put(1);} catch (InterruptedException e) {throw n
SEO靠我ew RuntimeException(e);}});/*** 非阻塞插入,没有消费线程直接返回 true/false*/queue.offer(1);/*** 阻塞指定时长插入,阻塞时间内有消费线程
SEO靠我与之匹配则可插入,返回插入状态 true/false*/queue.offer(1,3, TimeUnit.SECONDS);/*** 异常插入,无消费线程则抛出异常*///queue.add(1);
SEO靠我/*** 非阻塞拉取,如果有生产线程,则消费其数据*/System.out.println("First:" + queue.poll());/*** 阻塞指定时长拉取,阻塞时间内有生产线程与之匹配则
SEO靠我直接取数,取出并移除元素*/System.out.println("Second:" + queue.poll(3,TimeUnit.SECONDS));/*** 无效方法 直接返回 NULL*/qu
SEO靠我eue.peek();/*** 阻塞插入,阻塞直到有消费线程与之匹配*/executor.execute(()->{try {queue.offer(999999,3, TimeUnit.SECOND
SEO靠我S);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 阻塞拉取,直到有生产线程与之匹配*/System
SEO靠我.out.println("Third:" + queue.take());}
5.SynchronousQueue
public static void synchronousQueue () thro
SEO靠我ws InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 100,0L, TimeUnit.MI
SEO靠我LLISECONDS,new LinkedBlockingQueue<>());/*** 同步队列 不存储数据 匹配生产者和消费者线程*/SynchronousQueue<Integer> queue
SEO靠我 = new SynchronousQueue<>();/*** 阻塞插入,阻塞直到有消费线程与之匹配*/executor.execute(()->{try {queue.put(1);} catch
SEO靠我 (InterruptedException e) {throw new RuntimeException(e);}});/*** 非阻塞插入,没有消费线程直接返回 true/false*/queue
SEO靠我.offer(1);/*** 阻塞指定时长插入,阻塞时间内有消费线程与之匹配则可插入,返回插入状态 true/false*/queue.offer(1,3, TimeUnit.SECONDS);/**
SEO靠我* 异常插入,无消费线程则抛出异常*///queue.add(1);/*** 非阻塞拉取,如果有生产线程,则消费其数据*/System.out.println("First:" + queue.pol
SEO靠我l());/*** 阻塞指定时长拉取,阻塞时间内有生产线程与之匹配则直接取数,取出并移除元素*/System.out.println("Second:" + queue.poll(3,TimeUnit
SEO靠我.SECONDS));/*** 无效方法 直接返回 NULL*/queue.peek();/*** 阻塞插入,阻塞直到有消费线程与之匹配*/executor.execute(()->{try {que
SEO靠我ue.offer(999999,3, TimeUnit.SECONDS);} catch (InterruptedException e) {throw new RuntimeException(e)
SEO靠我;}});/*** 阻塞拉取,直到有生产线程与之匹配*/System.out.println("Third:" + queue.take());}
6.LinkedTransferQueue
public
SEO靠我 static void linkedTransferQueue() throws InterruptedException {/*** 无界非阻塞队列 类似 LinkedBlockingQueue
SEO靠我+ SynchronousQueue* 可以存储实体并实现生产者和消费者线程匹配*/LinkedTransferQueue<Integer> queue = new LinkedTransferQue
SEO靠我ue<>();/*** 非阻塞插入:实际都是调用 xfer(e, true, ASYNC, 0L)*/queue.put(1);queue.offer(2);queue.offer(3,1,TimeU
SEO靠我nit.SECONDS);queue.add(4);/*** 非阻塞获取,并移除元素*/System.out.println(queue.poll());/*** 阻塞指定时长获取,并移除元素*/Sy
SEO靠我stem.out.println(queue.poll(1, TimeUnit.SECONDS));/*** 非阻塞获取,不移除元素*/System.out.println(queue.peek())
SEO靠我;/*** 阻塞获取*/System.out.println(queue.take());}
三 缓冲队列实现
队列常用语 FIFO 场景的数据处理,同时如多线程生产单线程消费、多线程生产多线程消费和
SEO靠我 单线程生产多线程消费
1.定义缓冲队列类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
i
SEO靠我mport org.springframework.util.CollectionUtils;import java.util.ArrayList;
import java.util.
SEO靠我Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
SEO靠我
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecut
SEO靠我or;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;/*** 任务
SEO靠我缓冲队列 用于批量处理 或 顺序缓冲处理** @author * @date 2023-05-16 19:05* @since 1.8*/
public class TaskQueue
SEO靠我<T,R> {/*** 日志打印*/private static final Logger logger = LoggerFactory.getLogger(TaskQueue.class);/***
SEO靠我 任务批量大小*/private int taskSize = 300;/*** 延迟处理时间设置 默认 1 MIN*/private long handDelayTime = 1000 * 60 ;
SEO靠我/*** 缓冲队列大小 默认 1W*/private int capacity = 10000;/*** 无界阻塞队列,用于数据/任务缓冲*/private LinkedBlockingQueue<T
SEO靠我> queue = new LinkedBlockingQueue<>(capacity);/*** 待处理数据/任务集合*/private List<T> taskList = new ArrayL
SEO靠我ist<>(taskSize);/*** 单线程处理*/private Semaphore handSemaphore = new Semaphore(1);/*** 是否强制处理*/private
SEO靠我boolean isForceDeal = false;/*** 最迟处理超时时间(MS) 默认 3 * 60 * 1000*/private int lastDealTTL = 180000;/**
SEO靠我* 上次处理时间*/private long lastDealTime = 0;/*** 阻塞队列阻塞超时时间(MS) 默认 3 * 60 * 1000*/private int queueTTL =
SEO靠我 180000;/*** 同步信号量阻塞超时(MS) 默认 1MIN*/private int semaphoreTTL = 6000;/*** 定义消费线程池 仅通过核心线程循环消费队列数据即可*/
SEO靠我private ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new Link
SEO靠我edBlockingQueue<>(1));/*** 自定义处理方法*/private Function<List<T>,R> function;/*** 无参*/public TaskQueue()
SEO靠我{}/*** 初始化队列大小* @param capacity*/public TaskQueue(int capacity){this.capacity = capacity;}/*** 初始化锁
SEO靠我用于线程检查*/private static final Integer INIT_LOCK = 0;/*** 消费方法* @return*/private void taskHandlerThrea
SEO靠我d(){executor.execute(()->{//定义任务/数据对象T temp;//循环消费while (true){try {//取数据temp = queue.poll(queueTTL,
SEO靠我TimeUnit.MILLISECONDS);//判断是否为 NULL 或是否超过最迟处理时间if (temp == null || isNeedDeal()){//阻塞等待后仍没有新数据则直接处理t
SEO靠我askHandler();//阻塞直到有新数据进来if (temp == null){temp = queue.take();}}//填充到集合taskList.add(temp);// 判断是否需要
SEO靠我发送if (taskList.size() >= taskSize){taskHandler();}} catch (InterruptedException e) {logger.error("Ta
SEO靠我ke Consumer InterruptedException:",e);} catch (Exception e){logger.error("Take Consumer Exception:",
SEO靠我e);}}});}/*** 是否需要处理(针对超时情况)* 避免数据迟迟无法处理,否则最迟情况可能为 queueTTL * taskSize 时长无法处理* @return*/private bool
SEO靠我ean isNeedDeal(){return isForceDeal && System.currentTimeMillis() > lastDealTime;}/*** 数据/任务集合处理方法*
SEO靠我Spring 管理下可用 @PreDestroy 在实例销毁前触发一次处理*/private void taskHandler(){boolean acq = false;try {//本次许可超时则
SEO靠我等待下次触发(1MIN)acq = handSemaphore.tryAcquire(semaphoreTTL,TimeUnit.MILLISECONDS);//如果成功获取许可才发送if (acq)
SEO靠我{if (!CollectionUtils.isEmpty(taskList)){deal(taskList);}//重置时间lastDealTime = System.currentTimeMill
SEO靠我is() + lastDealTTL;//处理完成清空集合taskList.clear();}} catch (Exception e) {logger.error("Task List Handle
SEO靠我 Exception:",e);} finally {if (acq){handSemaphore.release();}}}/*** 检查线程池是否就绪*/private boolean isHan
SEO靠我dlerReady(){//如果变量为 nullif (executor == null ){synchronized (INIT_LOCK){if (executor == null ){{exec
SEO靠我utor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(1));}}}taskHa
SEO靠我ndlerThread();return false;} else {//如果活动任务数小于 1 重新添加任务int count = executor.getActiveCount();if (cou
SEO靠我nt < 1){synchronized (INIT_LOCK){count = executor.getActiveCount();if (count < 1){taskHandlerThread(
SEO靠我);}}return false;} else {//否则线程池健康 则返回 True 允许使用return true;}}}/*** 入队,失败则直接处理* @param t*/private bo
SEO靠我olean put(T t) {try {return queue.offer(t);} catch (Exception e) {logger.error("Offer Queue Exceptio
SEO靠我n:",e);return false;}}/*** 填充到阻塞队列,如果失败就立即发送* @param t*/private void handleByQueue(T t){boolean putS
SEO靠我uccess = this.put(t);if (!putSuccess){//否则直接处理this.deal(Collections.singletonList(t));}}/*** 前置过滤,校验
SEO靠我消费者状态* @param t*/public void taskHandlerPre(T t){// 默认使用连接池if (isHandlerReady()){this.handleByQueue(
SEO靠我t);} else {//否则直接处理deal(Collections.singletonList(t));}}/*** 设置 Function* @param function*/public vo
SEO靠我id setFunction(Function<List<T>,R> function){this.function = function;}/*** 重写处理方法* @param list*/pub
SEO靠我lic R deal(List<T> list) {if (null == function){return null;}return function.apply(list);}
}
SEO靠我
2.定义数据对象
public class User {private String code;private String name;private int age;public User(int a
SEO靠我ge,String name){this.age = age;this.name = name;}public User(String code,String name,int age){this.c
SEO靠我ode = code;this.name = name;this.age = age;}public String getCode() {return code;}public void setCod
SEO靠我e(String code) {this.code = code;}public String getName() {return name;}public void setName(String n
SEO靠我ame) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age
SEO靠我;}@Overridepublic String toString(){return "{\"Code\":\"" + code + "\",\"Name\":\"" + name + "\",\"a
SEO靠我ge\":" + age + "}";}
}
3.测试代码
public void task(){/*** 初始化延迟队列* User 数据/任务类* String 返回值类型* 其他初始
SEO靠我化参数自行修改或支持外部修改*/TaskQueue<User,String> queue = new TaskQueue<>();/*** 批量发送* 批量入库* 批量计算*/queue.setFun
SEO靠我ction(t->{t.forEach(k->{System.out.println(k);});return "";});/*** 模拟发送数据到缓冲队列*/for (int i=0;i<302;i
SEO靠我++){queue.taskHandlerPre(new User(1,"Alycia:" + i));}}