延时队列及 JDK Delay Queue 实现

最近项目用到了延时队列,是基于 RedisSorted Set 数据类型和 ZRANGEBYSCORE命令实现的。

延时队列还有其他实现方式,可以根据项目环境和业务需要选择适当的方案,这里做个汇总和记录。

延时队列

延时队列(Delay Queue):把需要延时的任务元素加入到一个队列中,队列具有延迟消费的特性,即可以指定队列中的元素在某个时间点才被消费。

延时队列中的元素必带的一个核心的参数就是时间时间戳相对时间差),或称为权重,本质是按时间排序,然后从队头取出消息消费。

这里自然延伸想到 优先队列,有最大优先队列,最大元素优先出队;最小优先队列,最小元素优先出队。

应用场景

  • 超时处理:自动取消超时未支付的订单;自动退还超时未被查收的红包。

  • 延迟处理:延时发送通知消息;预约任务需延迟处理。

  • 延迟重试:结果通知失败延迟重试。

    例如,支付结果异步通知失败重试,按 2s,4s,8s,16s, 32s 时间延迟重试通知。

实现方式

  • 定时任务轮询数据库
  • JDK 的 DelayQueue
  • Redis 的 Sort Set 实现
  • Redis 键过期通知
  • Redisson 的 Delayed Queue 实现
  • RabbitMQ 延时队列实现
  • Netty 时间轮(Time Wheel)算法实现

基于定时任务实现

使用定时任务来扫描数据库里符合条件的数据,例如,查询某个时间范围内(或加排序)符合条件的记录,并对其进行业务处理和更新。

可以使用 Spring-Scheduled,Quartz,Elastic-job 或 自己写个 Timer 也能实现。

定时轮询数据库这种方式存在明显的弊端,要不停的扫描数据库,如果数据量非常大,且任务执行间隔时间较短,会给数据库带来较大的压力。在某些大业务量或高并发的系统中,这是不允许的。

基于定时任务实现,因定时任务执行是有周期性的,可能元素已过期,但定时任务还未达到触发执行的时间点,就会造成已过期的元素处理不够及时,这依赖于定时任务的执行间隔,所以使用此方式需要考虑业务的及时性来设置合理的定时任务执行周期。

JDK DelayQueue 实现

JDK 提供了延时队列实现 java.util.concurrent.DelayQueue,DelayQueue 是一种无界阻塞队列(Unbounded Blocking Queue),底层实现是优先队列PriorityQueue。

DelayQueue 中的元素只有在其延迟过期后才能被获取。队列的头元素是延迟过期最久的元素,如果没有延迟,则没有头,获取元素的poll方法返回 null。当元素的 getDelay(TimeUnit.NANOSECONDS)方法返回值小于或等于零时,就会发生过期。

无法使用 takepoll删除未过期的元素,但它们仍被视为普通元素。例如,size方法返回过期和未过期元素的计数。此延时队列不允许空元素

此类及其迭代器实现了 CollectionIterator 接口的所有可选方法。iterator()方法提供的迭代不保证以任何特定顺序遍历 DelayQueue 元素。

部分源码

1
2
3
4
5
6
7
8
9
10
11
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();

private Thread leader = null;

private final Condition available = lock.newCondition();
//-------方法和内部类 Itr 省略-----------------
}

DelayQueue 类的声明可以看出,DelayQueue 是个泛型队列,它接受继承 Delayed 接口类型的元素,即在使用时定义的元素类需要实现 Delayed 接口,实现其 getDelay(TimeUnit unit)方法;而 Delayed 接口继承自 Comparable 接口,所以还要实现 Comparable 接口的 int compareTo(T o)方法 。

1
2
3
4
5
6
7
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

public interface Comparable<T> {
public int compareTo(T o);
}
  • long getDelay(TimeUnit unit):该方法确定元素是否到期(是否可以被获取)。当返回值小于或等于零时,表示元素已过期,可以被读取出来。
  • int compareTo(T o):该方法在往 DelayQueue 中添加元素时会被执行,以确定元素的排序位置。

DelayQueue 为获取元素提供了三个方法:

  • *take()*:检索并移除此队列的头,如果队列头为空,则等待阻塞等待,直到延迟过期的元素在此队列头上可用。
  • *poll(long timeout, TimeUnit unit)*:检索并移除此队列的头,如果队列头为空,则阻塞等待,此方法需指定的阻塞等待时间,如果等待超时仍没有过期元素,则返回 NULL。
  • *peek()*:如果队列头中没有可用的过期元素,则返回下一个将要过期的元素。

方法及类图

DelayQueue 方法及类图:

JDK-DelayQueue

优缺点

  • 优点:JDK 自带,不需要引入第三方依赖,实现简单。
  • 缺点:内存存储,就不支持分部署,若发生故障会可能会造成数据丢失,无界队列还存在 OOM 的风险。

实现示例

定义队列元素类,实现 Delayed 接口,重写其中的两个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Data
public class Message implements Delayed {
/**
* 内容
*/
private String content;
/**
* 发送时间
*/
private Long extTime;

public Message(String content, Long extTime) {
this.content = content;
this.extTime = extTime;
}

@Override
public long getDelay(TimeUnit unit) {
return extTime - System.currentTimeMillis();
}

@Override
public int compareTo(Delayed o) {
Message msg = (Message) o;
return this.extTime - msg.extTime <= 0 ? -1 : 1;
}
}

添加和获取元素,打断点可以看到,每次插入数据都会重新排序,过期时间越小的越排在队列前面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MainTest {

public static void main(String[] args) throws InterruptedException {
Message msg1 = new Message("A", System.currentTimeMillis() + 30000L);
Message msg2 = new Message("B", System.currentTimeMillis() + 20000L);
Message msg3 = new Message("C", System.currentTimeMillis() + 50000L);

DelayQueue<Message> delayQueue = new DelayQueue<>();
delayQueue.add(msg1);
delayQueue.add(msg2);
delayQueue.add(msg3);

int size = delayQueue.size();

System.out.println("start------------------------------------------------" + System.currentTimeMillis());
for (int i = 0; i < size; i++) {
System.out.println(delayQueue.take() + "------------" + System.currentTimeMillis());
}
System.out.println();
}
}

输出结果

1
2
3
4
start------------------------------------------------1615274332477
Message(content=B, extTime=1615274352475)------------1615274352475
Message(content=A, extTime=1615274362475)------------1615274362475
Message(content=C, extTime=1615274382475)------------1615274382475

实际使用时可以随应用启动时开启一个线程执行一个死循环,循环体内获取延时队列中的任务来处理业务。简单示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Component
public class DelayQueueManager implements CommandLineRunner {
private DelayQueue<MessageTask> delayQueue = new DelayQueue<MessageTask>();
private ExecutorService executor = Executors.newSingleThreadExecutor();

@Override
public void run(String... args) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(new Thread(this::executeTask));
}

/**
* 延时任务执行线程
*/
private void executeTask() {
while (true) {
try {
MessageTask task = delayQueue.take();
//............处理业务............
} catch (InterruptedException e) {
break;
}
}
}

/**
* 添加元素
* DelayQueue 的 put 方法是线程安全的
* @param messageTask
*/
private void put(MessageTask messageTask){
delayQueue.put(messageTask);
}

}

服务重启问题

基于 JDK 的 DelayQueue 实现延时队列,在服务重启后会丢失数据,需要有补偿机制。

可以在应用启动时去数据库扫描任务数据,将未处理的任务加到延时队列中,这里面涉及任务状态,过期时间检测。

创建描述库中的数据处理 Bean,实现 ApplicationRunner 接口的 run() 方法,在 run() 实现任务的补偿处理。

同样要处理任务状态的一致性,防止任务被重复处理。

相关参考

  1. 使用Redis实现延时任务一:总结了几种延时任务实现方式,给出了示例代码
  2. 使用Redis实现延时任务二:对延时任务进行了分片和监控,有示例代码
  3. 实现一个延时队列:模拟 DelayQueue 实现自定义的延时对列,对理解 DelayQueue 实现原理非常有帮助。
  4. 有赞延迟队列设计:基于 Redis 实现,把定时任务和消费进行了拆分。
  5. 延时队列实现思路:Redis,RabbitMQ,Kafka,Netty,DelayQueue,没有示例代码。
  6. 定时任务实现几种方式:@schedule 注解,Timer & TimerTask,Quartz,ScheduleExecutorService。
  7. 美图延时队列实现-LMSTFY:基于 Redis 实现,LMSTFY Github地址
  8. Redis实现消息队列:借助了 Redis 的 List 的 BLPOP 或 BRPOP 阻塞消费消息。
  9. Lua Guava-EventBus 实现延时队列,这个实现思路值得参考。
  10. 10种延迟任务实现方式:做了汇总,有示例代码,可参考。
  11. Redus 过期 Key 监听与发布订阅功能:有详情的代码示例参考。
  12. Spring Messaging with Redis:Spring 官方手册,基于 Redis 的 发布/订阅 来发送消息。
  13. Spring Messaging with RabbitMQ:Spring 官方手册,基于 RabbitMQ 的 发布/订阅 来发送消息。
作者

光星

发布于

2021-03-15

更新于

2022-07-12

许可协议

评论