实现定时任务的几种方式

常用的定时任务的实现方式:

  • Spring Scheduled
  • Quartz Scheduler
  • JDK ScheduledExecutorService
  • JDK Timer
  • 分布式任务调度系统:xxl-job,elsatic-job

定时任务启动方式

定时任务通常是随系统启运而运行,如果实现定时任务的方式自身不能随系统启动而运行,就需要与系统启动绑定来启动运行。

可以实现 CommandLineRunner 接口的 run 方法,或将任务处理类注册为 Bean,借助 Bean 的初始化和参数设置方法来实现。

实现CommandLineRunner的run方法

创建任务处理 Bean,实现 CommandLineRunnerrun 方法,在 run 方法里轮询获取和处理延时任务。

@PostConstruct注解到init方法上

创建任务处理 Bean,定义 init()方法,在方法上添加 @PostConstruct注解,@PostConstruct 是 Spring Bean 的生命周期注解,在对象创建完实例后会执行 @PostConstruct 注解到的 init()方法,以实现一些初始化操作。

init()方法里实现定时任务轮询处理。–不建议此方式,虽然能实现业务,但定时任务的生命周期应该是随应用启动而开始,而不是依赖于某个 Bean。

实现InitializingBean的afterPropertiesSet

创建任务处理 Bean,实现 InitializingBean 接口的 afterPropertiesSet() 方法,此方法允许 bean 实例在设置了所有bean 属性后执行,其目的是对总体配置的验证和最终初始化。其替代实现是 @PostConstruct 注解的 init() 方法。

–不建议此方式,虽然能实现业务,但定时任务的生命周期应该是随应用启动而开始,而不是依赖于某个 Bean。

定时任务实现方式

Spring Scheduled

  1. 基于 Spring Scheduled 实现,在 Spring Boot 入口类上添加 @EnableScheduling注解开启任务调度,定义任务处理类和周期轮询方法,方法上添加 @Scheduled注解标记这是一个任务,设置轮循策略。如下示例:

    入口类:

    1
    2
    3
    4
    5
    6
    7
    8
    @EnableScheduling
    @SpringBootApplication
    public class ManagerProviderApplication {

    public static void main(String[] args) {
    SpringApplication.run(ManagerProviderApplication.class, args);
    }
    }

    任务处理类和方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    @Component
    public class ScheduledTaskManager {
    private static final Logger logger = LogManager.getLogger(ScheduledTaskManager.class);

    /**
    * corn 表达式:
    * [0 0/2 * * * ?]每2分种执行一次
    * [0/5 * * * * ?]每5秒扫许一次
    * 如果本次任务的执行时间稍长点, 则难以评估下次任务的开始时间
    */
    @Scheduled(cron = "0/2 * * * * *")
    public void scheduled() {
    logger.info("=====>>>>>cron表达式 start: {}", System.currentTimeMillis() / 1000);
    }

    /**
    * fixedRate:指定本次任务开始到下次任务开始的周期间隔,包含了本次任务的执行时间
    * fixedRate = 下次任务开始时间 - 本次任务开始时间
    * 如果本次任务的执行时间小于间隔时间,下次任务开始时间 = 本次任务开始时间 + 执行时间
    * 如果本次任务的执行时间大于等于间隔时间, 下次任务开始时间 = 则本次任务的结束时间
    */
    @Scheduled(fixedRate = 2000)
    public void scheduled1() {
    logger.info("=====>>>>>fixedRate start: {}", System.currentTimeMillis() / 1000);
    }

    /**
    * fixedDelay: 指定本次任务结束到下次任务开始的周期间隔
    * 下次任务开始时间 = 本次任务结束时间 + 周期间隔, 不受本次任务的执行时间影响
    */
    @Scheduled(fixedDelay = 2000)
    public void scheduled2() {
    logger.info("=====>>>>>fixedDelay start: {}", System.currentTimeMillis() / 1000);
    }
    }

在一个类里面开启多个定时任务,不开启异步线程的情况下是使用同一个线程串行执行,如果一个任务卡死,则会导致其它任务无法执行。这是不允许的,就需要开启异步线程来执行。

异步线程

自定义任务线程池,在类上添加 @EnableAsync 注解开启异线程。

如果线程池类实现了 AsyncConfigurer 接口,重写了 getAsyncExecutor() 方法,则线程池会自动被注册为 Bean,推荐这种方式,可以重写 getAsyncUncaughtExceptionHandler() 方法捕获线程异常;否则需要在方法上使用 @Bean来把线程池注册为 Bean。

然后在定时任务的方法上使用 @Async 注解来标识该任务是异常线程执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Configuration
@EnableAsync
public class AsyncTaskExecutePool implements AsyncConfigurer {
private Logger logger = LoggerFactory.getLogger(AsyncTaskExecutePool.class);

private static int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
private static int maxPoolSize = corePoolSize * 10;
private static int keepAliveSeconds = 60;
private static int queueCapacity = 2000;
private static String threadNamePrefix = ".......Async";

/* @Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.initialize();
return executor;
}*/

@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
/*核收线程数*/
executor.setCorePoolSize(corePoolSize);
/*最大线程数*/
executor.setMaxPoolSize(maxPoolSize);
/*线程被关闭前的空闲时间*/
executor.setKeepAliveSeconds(keepAliveSeconds);
/*队列长度*/
executor.setQueueCapacity(queueCapacity);
/*设置线程名前缀*/
executor.setThreadNamePrefix(threadNamePrefix);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
logger.error(ex.getMessage() + "--------" + ex);
logger.error("exception method:" + method.getName());
}
};
}
}

Quartz Scheduler

Spring Boot 为集成 Quartz 提供spring-boot-starter-quartz包。Quartz 是个强大的任务调度组件,可以将任务详情,任务执行情况持久化到数据库。

Quartz 详细参考 boot-features-quartz官网 Quartz scheduler

简单示列

添加 Quartz 依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

添加 Quartz 配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class QuartzConfig {

@Bean
public JobDetail quartzDetail(){
return JobBuilder.newJob(QuartzTask.class).withIdentity("quartzTask").storeDurably().build();
}

@Bean
public Trigger testQuartzTrigger(){
SimpleScheduleBuilder scheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(2) //设置时间周期,单位秒
.repeatForever();
return TriggerBuilder.newTrigger().forJob(quartzDetail())
.withIdentity("quartzTrigger")
.withSchedule(scheduleBuilder)
.build();
}
}

任务处理类:jobExecutionContext 执行上下文可以获取任务及详情

1
2
3
4
5
6
7
8
public class QuartzTask extends QuartzJobBean {
private static final Logger logger = LoggerFactory.getLogger(QuartzTask.class);

@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
logger.info("====>>>>执行任务:{}", System.currentTimeMillis() / 1000);
}
}

JDK Timer 实现

JDK 提供的 Timer 是用于按排任务以在后台线程中执行的工具。任务可以安排为一次性执行,也可以定期重复执行。

1
2
3
4
5
6
7
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
// 处理延时任务
}
}, (3* 1000));

该方式存在一定的问题:

  • Timer 在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间长度大于其周期时间长度,那么就会导致这一次的任务还在执行,而下一个周期的任务已经需要开始执行而未执行。

    当然在一个线程内这两个任务只能顺序执行,有两种情况:对于之前需要执行但还没有执行的任务,一是当前任务执行完马上执行那些任务(按顺序来),二是干脆把那些任务丢掉,不去执行它们。

  • Timer线程是不会捕获异常的,如果 TimerTask 抛出了未检查异常则会导致 Timer 线程终止,同时 Timer 也不会重新恢复线程的执行,它会错误的认为整个 Timer 线程都会取消。

    同时,已经被安排但尚未执行的 TimerTask 也不会再执行了,新的任务也不能被调度。故如果 TimerTask抛 出未检查的异常,Timer 将会产生无法预料的行为。

针对上述问题,可以使用ScheduledThreadPoolExecutor替代。但是由于ScheduledThreadPoolExecutor是使用线程池多线程处理,可以设置核心线程数。

注意ScheduledThreadPoolExecutor 的所有构造方法设置了默认的最大线程数(Integer.MAX_VALUE),即不同任务会被放到线程池中的不同线程来处理。可能处理极端情况,当任务结束释放线程的速度小于任务新增的速度时,就会不断创建新的线程直到默认的最大线程数,创建大量的线程是要消耗内存的,就容易出现无可用线程池甚至内存溢出等异常。

所以在创建 ScheduledThreadPoolExecutor对象时,虽然不可以在构造方法中设置最大线程数,因其继承自 ThreadPoolExecutor,可以调用父类的 setXXXX()方法设置线程池的核心参数,包括:核心线程数,最大线程数,空闲时间,线程工厂,拒绝策略

ScheduledExecutorService

JDK 提供的调度线程池执行器 ScheduledExecutorService,其继承自 ThreadPoolExecutor ,实现了 ScheduledExecutorService 接口,主要用来在给定的 延迟时间时运行周期执行

  • ThreadPoolExecutor:提供了 execute() 和 submit() 方法提交异步任务的基础功能。
  • ScheduledExecutorService:提供了延时执行或周期执行任务的功能。

使用示例

实现 CommandLineRunner 接口的 run 方法,创建调度线程池 ScheduledExecutorService,在 run 方法里面使用线程池执行任务调度处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class ScheduledTaskExecutor implements CommandLineRunner {
private static final Logger logger = LogManager.getLogger(ScheduledTaskExecutor.class);

private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

@Override
public void run(String... args) throws Exception {
this.taskProcess();
}

private void taskProcess() {
logger.info("任务处理启动:{}", System.currentTimeMillis() / 1000);

executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
logger.info("处理业务:{}", System.currentTimeMillis() / 1000);
}
}, 3, 5, TimeUnit.SECONDS); //延后3秒启动,周期间隔5秒
}
}

分布式任务调度系统

例如使用 xxl-Job,Elastic-Job 等分布式任务调度系统来执行周期性任务。

相关参考

  1. 实现一个延时队列:模拟 DelayQueue 实现自定义的延时对列,对理解 DelayQueue 实现原理非常有帮助。
  2. 有赞延迟队列设计:基于 Redis 实现,把定时任务和消费进行了拆分。
  3. 延时队列实现思路:Redis,RabbitMQ,Kafka,Netty,DelayQueue,没有示例代码。
  4. 定时任务实现几种方式:@schedule 注解,Timer & TimerTask,Quartz,ScheduleExecutorService。
  5. 美图延时队列实现-LMSTFY:基于 Redis 实现,LMSTFY Github地址
  6. Redis实现消息队列:借助了 Redis 的 List 的 BLPOP 或 BRPOP 阻塞消费消息。
  7. Lua Guava-EventBus 实现延时队列,这个实现思路值得参考。
  8. 10种延迟任务实现方式:做了汇总,有示例代码,可参考。
  9. Redus 过期 Key 监听与发布订阅功能:有详情的代码示例参考。
  10. Spring Messaging with Redis:Spring 官方手册,基于 Redis 的 发布/订阅 来发送消息。
  11. Spring Messaging with RabbitMQ:Spring 官方手册,基于 RabbitMQ 的 发布/订阅 来发送消息。
作者

光星

发布于

2021-03-16

更新于

2022-07-12

许可协议

评论