@Scheduled源码解析
作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO
联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬
首先,丝毫不用怀疑,定时任务绝对不是Spring首创的,JDK本身就提供了很多种实现定时任务的方式。
来看一种最简单的实现:
实际上JDK还提供了定时任务线程池ScheduledThreadPool,我们可以直接通过Executors工具类获取:
和一般的线程池不一样的是,ScheduledThreadPool会不断地、定时地执行提交的任务。
定时任务方法的命名方式都大同小异,都叫scheduleXxx(),后面还会在别处见到这种命名方式,注意一下。
那Spring是如何实现@Scheduled的呢?
还是从@Scheduled注解入手:
我们发现@Scheduled注解是由ScheduledAnnotationBeanPostProcessor这个后置处理器处理的:
框出来的内容意思是:
当前这个PostProcessor负责注册那些加了@Scheduled注解的方法,然后根据@Scheduled注解的fixedRate、fixedDelay等属性分别交给TaskScheduler(定时任务线程池)执行。
现在有两个问题:
- 这个后置处理器怎么被加入Spring容器的?
- @Scheduled标注的方法是如何被TaskScheduler定时任务线程池处理的?
@EnableScheduling的作用
先解决第一个问题。
我们从类注释可以看到,ScheduledAnnotationBeanPostProcessor可以通过XML形式的<task:annotation-driven>标签或者注解形式的@EnableScheduling导入到Spring容器。我们主要观察@EnableScheduling:
发现@EnableScheduling导入了一个配置类:
哦,原来@EnableScheduling经过一连串的套娃操作后最终会通过@Bean的方式把ScheduledAnnotationBeanPostProcessor加入到Spring容器。
@Scheduled方法的执行流程
接下来解决第二个问题:@Scheduled标注的方法是如何被定时执行的?
分两步:
- 收集@Scheduled方法
- 执行定时任务
还是以上一篇的定时任务为例,我们通过debug的方式过一遍流程好了。
收集@Scheduled方法
每个Bean(包括TaskOne)会经过@EnableScheduling导入的ScheduledAnnotationBeanPostProcessor:
forEach会调用本类的另一个processScheduled方法,它会根据每个定时任务@Scheduled注解中的fixedRate、fixedDelay以及cron属性,分别将每个任务存储到ScheduledTaskRegistrar的不同TaskList。
跟进ScheduledTaskRegistrar#scheduleCronTask()看看:
好了,至此所有@Scheduled方法都已经被包装成Task存储到不同的任务列表中了。那么,这些任务怎么被定时调用的呢?
执行定时任务
在之前使用Java的ScheduledThreadPool执行定时任务时,都存在一个提交任务的动作(scheduleXxx):
而Spring的@Scheduled底层也是用线程池的。我们来找找。
由于ScheduledAnnotationBeanPostProcessor实现了ApplicationListener的onApplicationEvent(),所以在整个Spring容器启动完毕后最终会调用finishRegistration(),在所有定时任务注册完毕后做一些事。
你会发现,finishRegistration()内部会调用registrar.afterPropertiesSet()。
嗯?registrar?就是收集了各种定时任务的那个ScheduledTaskRegistrar吗?
没错,你猜到了,接下来就是把ScheduledTaskRegistrar的任务逐个提交给taskScheduler线程池即可。afterPropertiesSet()调用了scheduleTasks(),这个方法会把每个任务提交给线程池执行。
Executors.newSingleThreadScheduledExecutor(),这解释了为什么SpringBoot默认的定时任务是单线程的。
addScheduledTask()内部调用了scheduleCronTask(task),把任务提交给线程池:
定时任务线程池不是这么配的!
其实,在上一篇文章中,我自己也犯了一个错误。在注意到SpringBoot默认的定时任务是单线程后,想都不想直接配置了一个线程池:
还在定时任务上加了@Async:
实际上这并不是一个很好的选择。
如果我们在ScheduledTaskRegistrar#scheduleTasks()上打断点观察,会发现其实我们配置的线程池并没有赋值给taskExecutor:
此时定时任务线程池仍然是默认的SingleThreadScheduledExecutor。
你可能会想:你是不是搞错了,我明明看到控制台打印的线程名称是自定义的线程池啊。
是的,正是因为打印的是自定义线程名称,我才疏忽了。虽然从最终效果看,确实走了自定义线程池的线程,但这是因为我们在方法上加了@Async。
此时定时任务还是单线程,只不过当这个方法执行时,刚好因为加了@Async,所以走了异步。SpringBoot单线程的定时任务只需要把每个任务推到异步线程池就可以扭头执行下一个了,时间很短,看起来好像不是串行的!
也就是说,我们误打误撞地以异步的方式解决了单线程阻塞的问题,但却没有从根本改变定时任务的线程池,它还是单线程!
那么,怎样才能真正替换默认的SingleThreadScheduledExecutor呢?
还是要回到ScheduledAnnotationBeanPostProcessor#finishRegistration()方法:
private void finishRegistration() {
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
// 1 --- 查找是否有SchedulingConfigurer类型的自定义的bean(后面解释)
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, SchedulingConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
// 2 --- 如果经过上一步registrar中的taskScheduler仍然未被赋值
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
// 2.1 --- 查找Spring容器中TaskScheduler类型的Bean
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
// 2.2 --- 存在多个TaskScheduler类型的Bean
catch (NoUniqueBeanDefinitionException ex) {
logger.debug("Could not find unique TaskScheduler bean", ex);
// 2.1.2 --- 由于存在多个,这次改为按名字来确定,DEFAULT_TASK_SCHEDULER_BEAN_NAME = "taskScheduler";
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
}
// 2.1.3 --- 找不到名为"taskScheduler"的Bean,抛异常
catch (NoSuchBeanDefinitionException ex2) {
// 注意下面的异常信息,解释得很清楚了,甚至告诉我们如何自定义定时任务线程池
if (logger.isInfoEnabled()) {
logger.info("More than one TaskScheduler bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex.getBeanNamesFound());
}
}
}
// 2.3 --- 不存在TaskScheduler类型的Bean
catch (NoSuchBeanDefinitionException ex) {
logger.debug("Could not find default TaskScheduler bean", ex);
// 2.3.1 --- 查找Spring容器中ScheduledExecutorService类型的Bean
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
}
// 2.3.2 --- 存在多个ScheduledExecutorService类型的Bean
catch (NoUniqueBeanDefinitionException ex2) {
logger.debug("Could not find unique ScheduledExecutorService bean", ex2);
// 2.3.2 --- 由于存在多个,这次改为按名字来确定,DEFAULT_TASK_SCHEDULER_BEAN_NAME = "taskScheduler";
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
}
// 2.3.3 --- 找不到名为"taskScheduler"的Bean,抛异常
catch (NoSuchBeanDefinitionException ex3) {
// 注意下面的异常信息,解释得很清楚了,甚至告诉我们如何自定义定时任务线程池
if (logger.isInfoEnabled()) {
logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex2.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex2) {
logger.debug("Could not find default ScheduledExecutorService bean", ex2);
// Giving up -> falling back to default scheduler within the registrar...
// 翻译:放弃,沿用registrar内部默认的scheduler(单线程)
logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
}
}
}
this.registrar.afterPropertiesSet();
}
从源码可以看出来,SpringBoot对外暴露了三种自定义定时任务线程池的方法:
- 如果存在SchedulingConfigurer类型的Bean,用SchedulingConfigurer
- 如果存在TaskScheduler类型的Bean,用TaskScheduler
- 如果存在ScheduledExecutorService类型的Bean,用ScheduledExecutorService
- 以上都没有,就用默认的
找到以上任意一种定时任务线程池,都会调用ScheduledTaskRegistrar#setScheduler()为taskScheduler赋值:
而我们之前配置线程池是这样的,不属于上面任意一种:
所以,最终会使用默认的单线程定时任务,然后通过@Async走异步,看起来好像不是串行。
配置定时任务线程池的3种方式
分析源码后,我们很容易得到以下三种配置定时任务线程池的方式(请把@Async去掉,不需要)。
方式1:重写SchedulingConfigurer#configureTasks()
@Slf4j
@Configuration
public class ThreadPoolTaskConfig implements WebMvcConfigurer {
// 方式1: SchedulingConfigurer 其实内部还是设置一个TaskScheduler
@Bean
public SchedulingConfigurer schedulingConfigurer() {
return new MySchedulingConfigurer();
}
static class MySchedulingConfigurer implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(3);
taskScheduler.setThreadNamePrefix("schedule-task-");
taskScheduler.setRejectedExecutionHandler(
new RejectedExecutionHandler() {
/**
* 自定义线程池拒绝策略(模拟发送告警邮件)
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info("发送告警邮件======>:嘿沙雕,线上定时任务卡爆了, 当前线程名称为:{}, 当前线程池队列长度为:{}",
r.toString(),
executor.getQueue().size());
}
});
taskScheduler.initialize();
taskRegistrar.setScheduler(taskScheduler);
}
}
}
这样看起来有点别扭,其实可以把MySchedulingConfigurer单独作为一个类,加上@Component即可。
@Slf4j
@Component
class SchedulingConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(3);
taskScheduler.setThreadNamePrefix("schedule-task-");
taskScheduler.setRejectedExecutionHandler(
new RejectedExecutionHandler() {
/**
* 自定义线程池拒绝策略(模拟发送告警邮件)
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info("发送告警邮件======>:嘿沙雕,线上定时任务卡爆了, 当前线程名称为:{}, 当前线程池队列长度为:{}",
r.toString(),
executor.getQueue().size());
}
});
taskScheduler.initialize();
taskRegistrar.setScheduler(taskScheduler);
}
}
方式2:@Bean + ThreadPoolTaskScheduler
@Slf4j
@Configuration
public class ThreadPoolTaskConfig implements WebMvcConfigurer {
// 方式2: taskScheduler
@Bean("taskScheduler")
public Executor taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(3);
taskScheduler.setThreadNamePrefix("schedule-task-");
taskScheduler.setRejectedExecutionHandler(
new RejectedExecutionHandler() {
/**
* 自定义线程池拒绝策略(模拟发送告警邮件)
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info("发送告警邮件======>:嘿沙雕,线上定时任务卡爆了, 当前线程名称为:{}, 当前线程池队列长度为:{}",
r.toString(),
executor.getQueue().size());
}
});
taskScheduler.initialize();
return taskScheduler;
}
}
方式3:@Bean + ScheduledThreadPoolExecutor
@Slf4j
@Configuration
public class ThreadPoolTaskConfig implements WebMvcConfigurer {
// 方式3
@Bean("taskScheduler")
public Executor taskScheduler() {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
3,
new RejectedExecutionHandler() {
/**
* 自定义线程池拒绝策略(模拟发送告警邮件)
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info("发送告警邮件======>:嘿沙雕,线上定时任务卡爆了, 当前线程名称为:{}, 当前线程池队列长度为:{}",
r.toString(),
executor.getQueue().size());
}
}
);
executor.setMaximumPoolSize(5);
executor.setKeepAliveTime(60, TimeUnit.SECONDS);
return executor;
}
}
个人比较推荐方式1和方式2。
方式1的另一个作用是可以实现动态配置定时任务的时间,但真的很麻烦。
通过配置文件也可改变定时任务线程数
上面几种方式看起来比较复杂,因为我们采用的是自定义线程池的方式,除了改变线程数还重写了拒绝策略等。如果仅仅想要增加定时任务线程池的线程数,可以直接在配置文件中更改:
最后再次强调SpringBoot定时任务和@Async没有任何关系,不需要加@Async。
写这一篇的目的不是为了深究SpringBoot定时任务,因为上一篇说了,它有先天的不足,最致命的就是不支持分布式部署。但我希望通过这一篇的源码分析,大家能再次回顾BeanPostProcessor的机制以及重视线程池的分类。
作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO
进群,大家一起学习,一起进步,一起对抗互联网寒冬
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!