解决方案

Java 常见队列的使用与缓冲实现

seo靠我 2023-09-23 23:26:02

常见队列介绍与缓冲实现

一 常见队列二 常见队列的方法1.ArrayBlockingQueue2.LinkedBlockingQueue3.LinkedBlockingDeque4.ConcurrentSEO靠我LinkedQueue5.SynchronousQueue6.LinkedTransferQueue1.定义缓冲队列类2.定义数据对象3.测试代码

一 常见队列

队列说明ArrayBlockingQueuSEO靠我e有界LinkedBlockingQueue有/无界LinkedBlockingDeque无界ConcurrentLinkedQueue无界SynchronousQueue无界LinkedTransfSEO靠我erQueue无界DelayQueue延迟

二 常见队列的方法

1.ArrayBlockingQueue

public static void arrayBlockingQueue() throws IntSEO靠我erruptedException {/*** 有界阻塞队列,初始化必须指定大小*/ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueueSEO靠我<>(1);/*** 阻塞插入,容量不足会阻塞,直到有空闲位置*/queue.put(1);/*** 非阻塞插入,返回插入状态 true/false*/queue.offer(1);/*** 阻塞指定SEO靠我时长插入,阻塞时间内有空闲位置则可插入,返回插入状态 true/false*/queue.offer(1,1, TimeUnit.SECONDS);/*** 异常插入,无空闲位置则抛出异常*///quSEO靠我eue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue.poll();/*** 阻塞指定时长拉取,阻塞时间内有数据则直接取数,取出并移除元素*/queue.poll(1,TimeUnSEO靠我it.SECONDS);/*** 非阻塞拉取,队首为空返回NULL,取出不移除元素*/queue.peek();/*** 阻塞拉取,直到有元素*/queue.take();}

2.LinkedBlockSEO靠我ingQueue

public static void linkedBlockingQueue() throws InterruptedException {/*** 指定长度即为有界* 不指定长度即为SEO靠我无界*/LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);/*** 阻塞插入,容量不足会阻塞,直到有空闲位置*/queSEO靠我ue.put(1);/*** 非阻塞插入,返回插入状态 true/false*/queue.offer(1);/*** 阻塞指定时长插入,阻塞时间内有空闲位置则可插入,返回插入状态 true/falsSEO靠我e*/queue.offer(1,1, TimeUnit.SECONDS);/*** 异常插入,无空闲位置则抛出异常*///queue.add(1);/*** 非阻塞拉取,取出并移除元素*/queueSEO靠我.poll();/*** 阻塞指定时长拉取,阻塞时间内有数据则直接取数,取出并移除元素*/queue.poll(1,TimeUnit.SECONDS);/*** 非阻塞拉取,队首为空返回NULL,取出SEO靠我不移除元素*/queue.peek();/*** 阻塞拉取,直到有元素*/queue.take();}

3.LinkedBlockingDeque

public static void linkedBloSEO靠我ckingDeque() throws InterruptedException {/*** 消费默认从头开始*/LinkedBlockingDeque<Integer> deque = new LiSEO靠我nkedBlockingDeque<>(1);/*** 阻塞插入,容量不足会阻塞,直到有空闲位置*/deque.put(1);/*** 非阻塞插入,返回插入状态 true/false*/deque.oSEO靠我ffer(1);/*** 阻塞指定时长插入,阻塞时间内有空闲位置则可插入,返回插入状态 true/false*/deque.offer(1,1, TimeUnit.SECONDS);/*** 异常插入SEO靠我,无空闲位置则抛出异常*///deque.add(1);/*** 从头非阻塞插入,返回插入状态 true/false*/deque.offerFirst(1);/*** 从头阻塞指定时长插入,阻塞时间SEO靠我内有空闲位置则可插入,返回插入状态 true/false*/deque.offerFirst(1,1,TimeUnit.SECONDS);/*** 从尾非阻塞插入,返回插入状态 true/false*SEO靠我/deque.offerLast(1);/*** 从尾阻塞指定时长插入,阻塞时间内有空闲位置则可插入,返回插入状态 true/false*/deque.offerFirst(1,1,TimeUnit.SEO靠我SECONDS);/*** 非阻塞拉取,取出并移除元素*/deque.poll();/*** 阻塞指定时长拉取,阻塞时间内有数据则直接取数,取出并移除元素*/deque.poll(1,TimeUnitSEO靠我.SECONDS);/*** 非阻塞拉取头元素,取出并移除元素*/deque.pollFirst();/*** 阻塞指定时长拉取头元素,阻塞时间内有数据则直接取数,取出并移除元素*/deque.polSEO靠我lFirst(1,TimeUnit.SECONDS);/*** 非阻塞拉取尾元素,取出并移除元素*/deque.pollLast();/*** 阻塞指定时长拉取尾元素,阻塞时间内有数据则直接取数,取出SEO靠我并移除元素*/deque.pollLast(1,TimeUnit.SECONDS);/*** 非阻塞拉取,队首为空返回NULL,取出不移除元素*/deque.peek();/*** 阻塞拉取,直到有元SEO靠我素*/deque.take();/*** 阻塞拉取,直到头有元素*/deque.takeFirst();/*** 阻塞拉取,直到尾有元素*/deque.takeLast();}

4.ConcurrentSEO靠我LinkedQueue

public static void concurrentLinkedQueue(){/*** 无界,不支持指定大小* 无锁,通过 CAS 控制并发;可能存在读写不一致*/ConSEO靠我currentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();/*** 非阻塞插入,返回插入状态 true/false*/queuSEO靠我e.offer(1);/*** 内部调用了 offer 方法*/queue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue.poll();/*** 非阻塞拉取,队首为空返回NULL,SEO靠我取出不移除元素*/queue.peek();}public static void synchronousQueue () throws InterruptedException {ThreadPooSEO靠我lExecutor executor = new ThreadPoolExecutor(1, 100,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueSEO靠我<>());/*** 同步队列 不存储数据 匹配生产者和消费者线程*/SynchronousQueue<Integer> queue = new SynchronousQueue<>();/*** 阻SEO靠我塞插入,阻塞直到有消费线程与之匹配*/executor.execute(()->{try {queue.put(1);} catch (InterruptedException e) {throw nSEO靠我ew RuntimeException(e);}});/*** 非阻塞插入,没有消费线程直接返回 true/false*/queue.offer(1);/*** 阻塞指定时长插入,阻塞时间内有消费线程SEO靠我与之匹配则可插入,返回插入状态 true/false*/queue.offer(1,3, TimeUnit.SECONDS);/*** 异常插入,无消费线程则抛出异常*///queue.add(1);SEO靠我/*** 非阻塞拉取,如果有生产线程,则消费其数据*/System.out.println("First:" + queue.poll());/*** 阻塞指定时长拉取,阻塞时间内有生产线程与之匹配则SEO靠我直接取数,取出并移除元素*/System.out.println("Second:" + queue.poll(3,TimeUnit.SECONDS));/*** 无效方法 直接返回 NULL*/quSEO靠我eue.peek();/*** 阻塞插入,阻塞直到有消费线程与之匹配*/executor.execute(()->{try {queue.offer(999999,3, TimeUnit.SECONDSEO靠我S);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 阻塞拉取,直到有生产线程与之匹配*/SystemSEO靠我.out.println("Third:" + queue.take());}

5.SynchronousQueue

public static void synchronousQueue () throSEO靠我ws InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 100,0L, TimeUnit.MISEO靠我LLISECONDS,new LinkedBlockingQueue<>());/*** 同步队列 不存储数据 匹配生产者和消费者线程*/SynchronousQueue<Integer> queueSEO靠我 = new SynchronousQueue<>();/*** 阻塞插入,阻塞直到有消费线程与之匹配*/executor.execute(()->{try {queue.put(1);} catchSEO靠我 (InterruptedException e) {throw new RuntimeException(e);}});/*** 非阻塞插入,没有消费线程直接返回 true/false*/queueSEO靠我.offer(1);/*** 阻塞指定时长插入,阻塞时间内有消费线程与之匹配则可插入,返回插入状态 true/false*/queue.offer(1,3, TimeUnit.SECONDS);/**SEO靠我* 异常插入,无消费线程则抛出异常*///queue.add(1);/*** 非阻塞拉取,如果有生产线程,则消费其数据*/System.out.println("First:" + queue.polSEO靠我l());/*** 阻塞指定时长拉取,阻塞时间内有生产线程与之匹配则直接取数,取出并移除元素*/System.out.println("Second:" + queue.poll(3,TimeUnitSEO靠我.SECONDS));/*** 无效方法 直接返回 NULL*/queue.peek();/*** 阻塞插入,阻塞直到有消费线程与之匹配*/executor.execute(()->{try {queSEO靠我ue.offer(999999,3, TimeUnit.SECONDS);} catch (InterruptedException e) {throw new RuntimeException(e)SEO靠我;}});/*** 阻塞拉取,直到有生产线程与之匹配*/System.out.println("Third:" + queue.take());}

6.LinkedTransferQueue

publicSEO靠我 static void linkedTransferQueue() throws InterruptedException {/*** 无界非阻塞队列 类似 LinkedBlockingQueue SEO靠我+ SynchronousQueue* 可以存储实体并实现生产者和消费者线程匹配*/LinkedTransferQueue<Integer> queue = new LinkedTransferQueSEO靠我ue<>();/*** 非阻塞插入:实际都是调用 xfer(e, true, ASYNC, 0L)*/queue.put(1);queue.offer(2);queue.offer(3,1,TimeUSEO靠我nit.SECONDS);queue.add(4);/*** 非阻塞获取,并移除元素*/System.out.println(queue.poll());/*** 阻塞指定时长获取,并移除元素*/SySEO靠我stem.out.println(queue.poll(1, TimeUnit.SECONDS));/*** 非阻塞获取,不移除元素*/System.out.println(queue.peek())SEO靠我;/*** 阻塞获取*/System.out.println(queue.take());}

三 缓冲队列实现

队列常用语 FIFO 场景的数据处理,同时如多线程生产单线程消费、多线程生产多线程消费和 SEO靠我 单线程生产多线程消费

1.定义缓冲队列类

import org.slf4j.Logger; import org.slf4j.LoggerFactory; iSEO靠我mport org.springframework.util.CollectionUtils;import java.util.ArrayList; import java.util.SEO靠我Collections; import java.util.List; import java.util.concurrent.LinkedBlockingQueue;SEO靠我 import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutSEO靠我or; import java.util.concurrent.TimeUnit; import java.util.function.Function;/*** 任务SEO靠我缓冲队列 用于批量处理 或 顺序缓冲处理** @author * @date 2023-05-16 19:05* @since 1.8*/ public class TaskQueueSEO靠我<T,R> {/*** 日志打印*/private static final Logger logger = LoggerFactory.getLogger(TaskQueue.class);/***SEO靠我 任务批量大小*/private int taskSize = 300;/*** 延迟处理时间设置 默认 1 MIN*/private long handDelayTime = 1000 * 60 ;SEO靠我/*** 缓冲队列大小 默认 1W*/private int capacity = 10000;/*** 无界阻塞队列,用于数据/任务缓冲*/private LinkedBlockingQueue<TSEO靠我> queue = new LinkedBlockingQueue<>(capacity);/*** 待处理数据/任务集合*/private List<T> taskList = new ArrayLSEO靠我ist<>(taskSize);/*** 单线程处理*/private Semaphore handSemaphore = new Semaphore(1);/*** 是否强制处理*/private SEO靠我boolean isForceDeal = false;/*** 最迟处理超时时间(MS) 默认 3 * 60 * 1000*/private int lastDealTTL = 180000;/**SEO靠我* 上次处理时间*/private long lastDealTime = 0;/*** 阻塞队列阻塞超时时间(MS) 默认 3 * 60 * 1000*/private int queueTTL =SEO靠我 180000;/*** 同步信号量阻塞超时(MS) 默认 1MIN*/private int semaphoreTTL = 6000;/*** 定义消费线程池 仅通过核心线程循环消费队列数据即可*/SEO靠我private ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkSEO靠我edBlockingQueue<>(1));/*** 自定义处理方法*/private Function<List<T>,R> function;/*** 无参*/public TaskQueue()SEO靠我{}/*** 初始化队列大小* @param capacity*/public TaskQueue(int capacity){this.capacity = capacity;}/*** 初始化锁 SEO靠我用于线程检查*/private static final Integer INIT_LOCK = 0;/*** 消费方法* @return*/private void taskHandlerThreaSEO靠我d(){executor.execute(()->{//定义任务/数据对象T temp;//循环消费while (true){try {//取数据temp = queue.poll(queueTTL,SEO靠我TimeUnit.MILLISECONDS);//判断是否为 NULL 或是否超过最迟处理时间if (temp == null || isNeedDeal()){//阻塞等待后仍没有新数据则直接处理tSEO靠我askHandler();//阻塞直到有新数据进来if (temp == null){temp = queue.take();}}//填充到集合taskList.add(temp);// 判断是否需要SEO靠我发送if (taskList.size() >= taskSize){taskHandler();}} catch (InterruptedException e) {logger.error("TaSEO靠我ke Consumer InterruptedException:",e);} catch (Exception e){logger.error("Take Consumer Exception:",SEO靠我e);}}});}/*** 是否需要处理(针对超时情况)* 避免数据迟迟无法处理,否则最迟情况可能为 queueTTL * taskSize 时长无法处理* @return*/private boolSEO靠我ean isNeedDeal(){return isForceDeal && System.currentTimeMillis() > lastDealTime;}/*** 数据/任务集合处理方法* SEO靠我Spring 管理下可用 @PreDestroy 在实例销毁前触发一次处理*/private void taskHandler(){boolean acq = false;try {//本次许可超时则SEO靠我等待下次触发(1MIN)acq = handSemaphore.tryAcquire(semaphoreTTL,TimeUnit.MILLISECONDS);//如果成功获取许可才发送if (acq)SEO靠我{if (!CollectionUtils.isEmpty(taskList)){deal(taskList);}//重置时间lastDealTime = System.currentTimeMillSEO靠我is() + lastDealTTL;//处理完成清空集合taskList.clear();}} catch (Exception e) {logger.error("Task List HandleSEO靠我 Exception:",e);} finally {if (acq){handSemaphore.release();}}}/*** 检查线程池是否就绪*/private boolean isHanSEO靠我dlerReady(){//如果变量为 nullif (executor == null ){synchronized (INIT_LOCK){if (executor == null ){{execSEO靠我utor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(1));}}}taskHaSEO靠我ndlerThread();return false;} else {//如果活动任务数小于 1 重新添加任务int count = executor.getActiveCount();if (couSEO靠我nt < 1){synchronized (INIT_LOCK){count = executor.getActiveCount();if (count < 1){taskHandlerThread(SEO靠我);}}return false;} else {//否则线程池健康 则返回 True 允许使用return true;}}}/*** 入队,失败则直接处理* @param t*/private boSEO靠我olean put(T t) {try {return queue.offer(t);} catch (Exception e) {logger.error("Offer Queue ExceptioSEO靠我n:",e);return false;}}/*** 填充到阻塞队列,如果失败就立即发送* @param t*/private void handleByQueue(T t){boolean putSSEO靠我uccess = this.put(t);if (!putSuccess){//否则直接处理this.deal(Collections.singletonList(t));}}/*** 前置过滤,校验SEO靠我消费者状态* @param t*/public void taskHandlerPre(T t){// 默认使用连接池if (isHandlerReady()){this.handleByQueue(SEO靠我t);} else {//否则直接处理deal(Collections.singletonList(t));}}/*** 设置 Function* @param function*/public voSEO靠我id setFunction(Function<List<T>,R> function){this.function = function;}/*** 重写处理方法* @param list*/pubSEO靠我lic R deal(List<T> list) {if (null == function){return null;}return function.apply(list);} }SEO靠我

2.定义数据对象

public class User {private String code;private String name;private int age;public User(int aSEO靠我ge,String name){this.age = age;this.name = name;}public User(String code,String name,int age){this.cSEO靠我ode = code;this.name = name;this.age = age;}public String getCode() {return code;}public void setCodSEO靠我e(String code) {this.code = code;}public String getName() {return name;}public void setName(String nSEO靠我ame) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = ageSEO靠我;}@Overridepublic String toString(){return "{\"Code\":\"" + code + "\",\"Name\":\"" + name + "\",\"aSEO靠我ge\":" + age + "}";} }

3.测试代码

public void task(){/*** 初始化延迟队列* User 数据/任务类* String 返回值类型* 其他初始SEO靠我化参数自行修改或支持外部修改*/TaskQueue<User,String> queue = new TaskQueue<>();/*** 批量发送* 批量入库* 批量计算*/queue.setFunSEO靠我ction(t->{t.forEach(k->{System.out.println(k);});return "";});/*** 模拟发送数据到缓冲队列*/for (int i=0;i<302;iSEO靠我++){queue.taskHandlerPre(new User(1,"Alycia:" + i));}}
“SEO靠我”的新闻页面文章、图片、音频、视频等稿件均为自媒体人、第三方机构发布或转载。如稿件涉及版权等问题,请与 我们联系删除或处理,客服邮箱:html5sh@163.com,稿件内容仅为传递更多信息之目的,不代表本网观点,亦不代表本网站赞同 其观点或证实其内容的真实性。

网站备案号:浙ICP备17034767号-2