Java高并发专题之37、如何实现一个通用的延迟队列?

电商大家都用过吧,下单后若未支付,通常都有一段支付倒计时,比如15分钟,若时间到了之后,还未支付的,订单将被关闭,库存将被释放。

这种业务就需要用到延迟队列的功能,将任务丢到延迟队列、设置一个延迟时间、回调函数,到了时间之后,延迟队列将回调指定的函数消费指定的任务。

下面代码是一个通用的延迟队列的实现,大家可以直接拿去用。

代码还是比较简单的,技术要点:

  • 调用addTask方法将任务丢到延迟队列中,主要参数(延迟时间、任务信息、回调【任务到期后会进行回调】)
  • 使用到了java中的延迟队列DelayQueue来存放延迟任务
  • 下面的构造方法会自动调用一个start方法,start方法中会自动启动一个线程,线程轮询从延迟队列中拉取到期的任务,然后丢到线程池executorService.submit中进行处理,会自动调用创建延迟任务中指定的回调函数
  • main方法中有使用步骤
  1. import java.util.concurrent.*;
  2. import java.util.function.Consumer;
  3. import java.util.logging.Logger;
  4. public class DelayQueueService<T> {
  5. Logger logger = Logger.getLogger(DelayQueueService.class.getName());
  6. //延迟队列名称
  7. private String delayQueueName;
  8. private DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
  9. //处理队列中任务的线程池
  10. private ExecutorService executorService;
  11. public DelayQueueService(String delayQueueName) {
  12. this(delayQueueName, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4));
  13. }
  14. public DelayQueueService(String delayQueueName, ExecutorService executorService) {
  15. this.delayQueueName = delayQueueName;
  16. this.executorService = executorService;
  17. //启动队列消费
  18. this.start();
  19. }
  20. /**
  21. * 添加任务
  22. *
  23. * @param delayedTimeUnit 延迟时间单位
  24. * @param delayedTime 延迟时间
  25. * @param task 任务
  26. * @param consumer 任务消费者(到期了会回调)
  27. */
  28. public void addTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer<T> consumer) {
  29. this.delayQueue.offer(new DelayedTask(delayedTimeUnit, delayedTime, task, consumer));
  30. }
  31. private void start() {
  32. //轮询从延迟队列中拉取任务,然后调用线程池进行处理
  33. Thread pollThread = new Thread(() -> {
  34. while (true) {
  35. try {
  36. DelayedTask delayedTask = this.delayQueue.poll(100, TimeUnit.MILLISECONDS);
  37. if (this.executorService.isShutdown()) {
  38. break;
  39. }
  40. if (delayedTask != null) {
  41. executorService.submit(() -> {
  42. delayedTask.consumer.accept(delayedTask.task);
  43. });
  44. }
  45. } catch (InterruptedException e) {
  46. logger.warning(e.getMessage());
  47. }
  48. }
  49. });
  50. pollThread.setDaemon(Thread.currentThread().isDaemon());
  51. pollThread.setName(this.getClass().getName() + -pollThread- + this.delayQueueName);
  52. pollThread.start();
  53. }
  54. public void close() {
  55. if (!this.executorService.isShutdown()) {
  56. this.executorService.shutdown();
  57. }
  58. }
  59. public class DelayedTask implements Delayed {
  60. //延迟时间单位
  61. private TimeUnit delayedTimeUnit;
  62. //延迟时间
  63. private long delayedTime;
  64. //到期时间(毫秒)
  65. private long endTime;
  66. //延迟任务信息
  67. private T task;
  68. //消费者
  69. private Consumer<T> consumer;
  70. public DelayedTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer<T> consumer) {
  71. this.delayedTimeUnit = delayedTimeUnit;
  72. this.delayedTime = delayedTime;
  73. this.task = task;
  74. this.endTime = System.currentTimeMillis() + delayedTimeUnit.toMillis(delayedTime);
  75. this.consumer = consumer;
  76. }
  77. @Override
  78. public long getDelay(TimeUnit unit) {
  79. return unit.convert(this.endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  80. }
  81. @Override
  82. public int compareTo(Delayed o) {
  83. DelayedTask task = (DelayedTask) o;
  84. return Long.compare(this.endTime, task.endTime);
  85. }
  86. }
  87. public static void main(String[] args) {
  88. //创建一个延迟队列:用来对超过支付日期的订单进行关闭
  89. String delayQueueName = orderCloseDelayQueue;
  90. //1、创建延迟队列
  91. DelayQueueService<String> orderCloseDelayQueue = new DelayQueueService<String>(delayQueueName);
  92. for (int i = 1; i <= 10; i++) {
  93. //2、调用addTask将延迟任务加入延迟队列
  94. orderCloseDelayQueue.addTask(TimeUnit.SECONDS, i, 订单 + i, new Consumer<String>() {
  95. @Override
  96. public void accept(String s) {
  97. System.out.println(System.currentTimeMillis() + , + Thread.currentThread() + ,关闭订单: + s);
  98. }
  99. });
  100. }
  101. //3、系统关闭的时候,调用延迟队列的close方法
  102. //orderCloseDelayQueue.close();
  103. }
  104. }

main方法中模拟了10个延迟任务,运行看看效果,输出

  1. 1614346780438,Thread[pool-1-thread-1,5,main],关闭订单:订单1
  2. 1614346781437,Thread[pool-1-thread-2,5,main],关闭订单:订单2
  3. 1614346782436,Thread[pool-1-thread-3,5,main],关闭订单:订单3
  4. 1614346783437,Thread[pool-1-thread-4,5,main],关闭订单:订单4
  5. 1614346784437,Thread[pool-1-thread-5,5,main],关闭订单:订单5
  6. 1614346785437,Thread[pool-1-thread-6,5,main],关闭订单:订单6
  7. 1614346786437,Thread[pool-1-thread-7,5,main],关闭订单:订单7
  8. 1614346787436,Thread[pool-1-thread-8,5,main],关闭订单:订单8
  9. 1614346788437,Thread[pool-1-thread-9,5,main],关闭订单:订单9
  10. 1614346789437,Thread[pool-1-thread-10,5,main],关闭订单:订单10
来源:http://itsoku.com/course/1/205