Spring 提供了任务执行器(TaskExecutor)来实现多线程和并发编程。使用ThreadPoolTaskExecutor可实现一个基于线程池的TaskExecutor。
要实现任务异步执行,在配置为中添加@EnableAsync开启对异步任务的支持,在执行Bean的方法中使用@Async注解来声明一个异步方法。
**注意:**创建本地实例(new)调用@Async注解的方法是,异步执行是不起效的。实例必须在@Configuration 类中创建或由@ComponentScan 扫描,意味必须是 Spring Bean 之间调用异步方法才会生效。
定义异步线程池
创建定义异步线程池的配置类,继承 AsyncConfigurer,重写 getAsyncExecutor 方法,返回 ThreadPoolTaskExecutor。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 
 | 
 
 @Configuration
 @EnableAsync
 public class AsyncTaskExecutePool implements AsyncConfigurer {
 private Logger logger = LoggerFactory.getLogger(AsyncTaskExecutePool.class);
 
 @Override
 public Executor getAsyncExecutor(){
 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 
 executor.setCorePoolSize(5);
 
 executor.setMaxPoolSize(50);
 
 executor.setKeepAliveSeconds(60);
 
 executor.setQueueCapacity(10000);
 
 executor.setThreadNamePrefix("....");
 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
 executor.initialize();
 return executor;
 }
 
 @Override
 public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
 return new AsyncUncaughtExceptionHandler() {
 @Override
 public void handleUncaughtException(Throwable ex, Method method, Object... params) {
 logger.error(ex.getMessage() + "--------" + ex);
 logger.error("exception method:" + method.getName());
 }
 };
 
 }
 }
 
 | 
AsyncConfigurer:为异步执行提供配置。源码如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 
 | @Configurationpublic abstract class AbstractAsyncConfiguration implements ImportAware {
 
 @Nullable
 protected AnnotationAttributes enableAsync;
 
 @Nullable
 protected Supplier<Executor> executor;
 
 @Nullable
 protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
 
 
 @Override
 public void setImportMetadata(AnnotationMetadata importMetadata) {
 this.enableAsync = AnnotationAttributes.fromMap(
 importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
 if (this.enableAsync == null) {
 throw new IllegalArgumentException(
 "@EnableAsync is not present on importing class " + importMetadata.getClassName());
 }
 }
 
 
 
 
 @Autowired(required = false)
 void setConfigurers(Collection<AsyncConfigurer> configurers) {
 if (CollectionUtils.isEmpty(configurers)) {
 return;
 }
 if (configurers.size() > 1) {
 throw new IllegalStateException("Only one AsyncConfigurer may exist");
 }
 AsyncConfigurer configurer = configurers.iterator().next();
 this.executor = configurer::getAsyncExecutor;
 this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
 }
 
 }
 
 | 
Spring 项目,如果没有自定义异步线程池,Spring 会创建一个 SimpleAsyncTaskExecutor并使用它。
**AsyncExecutionInterceptor:**AOP 实现的异步执行拦截器。源码如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 
 | 
 
 
 
 
 
 @Override
 @Nullable
 protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
 Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
 return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
 }
 
 | 
**Spring Boot 项目,**在启动时会自动配置一个 ThreadPoolTaskExecutor Bean,beanName 为 applicationTaskExecutor 和 taskExecutor。源码如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 
 | 
 
 
 
 
 
 @ConditionalOnClass(ThreadPoolTaskExecutor.class)
 @Configuration(proxyBeanMethods = false)
 @EnableConfigurationProperties(TaskExecutionProperties.class)
 public class TaskExecutionAutoConfiguration {
 
 
 
 
 public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";
 
 @Bean
 @ConditionalOnMissingBean
 public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
 ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
 ObjectProvider<TaskDecorator> taskDecorator) {
 TaskExecutionProperties.Pool pool = properties.getPool();
 TaskExecutorBuilder builder = new TaskExecutorBuilder();
 builder = builder.queueCapacity(pool.getQueueCapacity());
 builder = builder.corePoolSize(pool.getCoreSize());
 builder = builder.maxPoolSize(pool.getMaxSize());
 builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
 builder = builder.keepAlive(pool.getKeepAlive());
 Shutdown shutdown = properties.getShutdown();
 builder = builder.awaitTermination(shutdown.isAwaitTermination());
 builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
 builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
 builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
 builder = builder.taskDecorator(taskDecorator.getIfUnique());
 return builder;
 }
 
 
 @Lazy
 @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
 AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
 @ConditionalOnMissingBean(Executor.class)
 public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
 return builder.build();
 }
 
 }
 
 | 
此 ThreadPoolTaskExecutor 使用 TaskExecutionProperties 设置线程池参数,可以通过 spring.task.execution前缀配置参数值。如下:
| 12
 3
 4
 5
 6
 
 | spring.task.execution.pool.core-size=8spring.task.execution.pool.max-size=200
 spring.task.execution.pool.keep-alive=60s
 spring.task.execution.pool.queue-capacity=10240
 spring.task.execution.pool.allow-core-thread-timeout=true
 spring.task.execution.thread-name-prefix=Async--
 
 | 
当线程数量高于线程池的处理速度时,任务会被缓存到本地的队列中,如果超过队列容量,就会执行拒绝策略。Spring Boot 自动配置的ThreadPoolTaskExecutor的拒绝策略没有属性配置项,使用的是默认的拒绝策略 AbortPolicy。通常会配置以下 2 种拒绝策略:
- **AbortPolicy:**直接抛出 RejectedExecutionException 异常。
- **CallerRunsPolicy:**主线程直接执行该任务(同步了),执行完后尝试将下一个任务添加到线程池,这样可以有效降低线程池内添加任务的速度。
 建议使用 CallerRunsPolicy 策略,任务不会丢弃。因为当任务满后,如果直接抛异常,这个任务就会被丢弃。
@EnableAsync
在配置类添加注解 @EnableAsync 开启异步任务支持。
@Async声明异步
@Async作用在方法上表示该方法是异步的;如果作用在类上,则表示该类下的所有方法都是异步的。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 
 | import org.springframework.scheduling.annotation.Async;import org.springframework.stereotype.Service;
 
 
 
 
 
 @Service
 public class AsyncTaskService {
 
 @Async
 public void executeAsyncTask(Integer i) {
 System.out.println("执行异步任务:" + i);
 }
 
 @Async
 public void executeAsyncTaskPlus(Integer i) {
 System.out.println("执行异步任务+1:" + (i+1));
 }
 }
 
 | 
运行测试:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 
 | import org.springframework.context.annotation.AnnotationConfigApplicationContext;
 public class TaskMain {
 
 public static void main(String[] args) {
 AnnotationConfigApplicationContext context =
 new AnnotationConfigApplicationContext(TaskExecutorConfig.class);
 
 AsyncTaskService asyncTaskService = context.getBean(AsyncTaskService.class);
 
 for (int i = 0; i < 10; i++) {
 asyncTaskService.executeAsyncTask(i);
 asyncTaskService.executeAsyncTaskPlus(i);
 }
 
 context.close();
 }
 }
 
 | 
执行结果:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 
 | 执行异步任务+1:1执行异步任务+1:3
 执行异步任务:0
 执行异步任务+1:4
 执行异步任务:4
 执行异步任务+1:5
 执行异步任务:5
 执行异步任务+1:6
 执行异步任务:6
 执行异步任务+1:7
 执行异步任务:7
 执行异步任务:3
 执行异步任务:8
 执行异步任务+1:9
 执行异步任务:9
 执行异步任务+1:10
 执行异步任务:1
 执行异步任务+1:8
 执行异步任务+1:2
 执行异步任务:2
 
 | 
相关参考
- Sping:Creating Asynchronous Methods
- Spring Doc EnableAsync:Annotation Type EnableAsync