一文弄懂@Async代理执行原理(从源码的角度深入理解@EnableAsync 注解开启原理)
视频讲解:https://www.bilibili.com/video/BV1zi4y1e7fA
一直只知道 @Async是通过代理来实现的,在同一个方法里面调用为什么不可以,只是懵懂知道一点,抽时间刚好研究一下它的原理,发现和 @Transactional 的实现原理完全一样。
文章目录
一、理论
想要使用 @Async 异步执行,首先需要开启异步功能,也就是在启动类上加开启异步的注解 @EnableAsync
,然后在需要异步的方法上面加上异步注解即可 @Async
当在启动类上加了@EnableAsync 之后,它就会往容器里面注入一个 AsyncAnnotationBeanPostProcessor,它间接实现了BeanPostProcessor 和 BeanFactoryAware,重写了 setBeanFactory 和 postProcessAfterInitialization方法。
- BeanFactoryAware 初始化了一个 AsyncAnnotationAdvisor 里面包含了 advice 和 pointcut
-
- advice 里面定义了代理之后的逻辑 AnnotationAsyncExecutionInterceptor.invoke
-
- pointcut 里面定义了对使用 @Async 注解的类和方法进行拦截
- postProcessAfterInitialization 对满足pointcut 的bean 生成代理对象
二、代理对象的生成过程
2-1、开启异步 EnableAsync
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
Class<? extends Annotation> annotation() default Annotation.class;
boolean proxyTargetClass() default false;
// 默认使用 Proxy
AdviceMode mode() default AdviceMode.PROXY;
int order() default Ordered.LOWEST_PRECEDENCE;
}
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
2-2、注入后置处理器 BeanPostProcessor
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
// 设置 beforeExistingAdvisors 为true
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
设置 beforeExistingAdvisors 为true,因为一个对象可能需要被多种规则代理,比如某个方法上同时加了 @Transactional 和 @Async,设置beforeExistingAdvisors 为 true的会让 @Async 放在最前面,下面代码会看到
public AsyncAnnotationBeanPostProcessor() {
setBeforeExistingAdvisors(true);
}
AsyncAnnotationBeanPostProcessor类继承关系
2-3、代理之Advisor
AsyncAnnotationBeanPostProcessor 直接重写了 BeanFactoryAware的 setBeanFactory 方法,在该方法里面生成了 AsyncAnnotationAdvisor
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
AsyncAnnotationAdvisor 创建的时候会生成 advice 也就是代理拦截器,和 pointcut 生成代理的拦截规则
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
2-3-1、代理执行拦截器 interceptor
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
AnnotationAsyncExecutionInterceptor 的继承关系如下,在AsyncExecutionInterceptor 里面有个 invoke方法,代理之后执行就会走这个方法,下面详细讲解
2-3-2、拦截规则 pointcut
asyncAnnotationTypes 在上面可以看到传递的就是 Async.class ,这里生成了2个Pointcut,一个基于类,一个基于方法
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
2-4、代理对象生成
AsyncAnnotationBeanPostProcessor 的继承关系上面已经给出,在AbstractAdvisingBeanPostProcessor 重写了 BeanPostProcessor的 bean后置处理方法postProcessAfterInitialization ,在这个方法里面完成了代理对象的生成
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (this.advisor != null && !(bean instanceof AopInfrastructureBean)) {
// 如果当前对象是代理对象,会走这个。 比如方法上面有 @Transactional ,走到这里之前就已经是代理对象了
if (bean instanceof Advised) {
Advised advised = (Advised)bean;
if (!advised.isFrozen() && this.isEligible(AopUtils.getTargetClass(bean))) {
// 这个字段已经为 true, 所以当前这个 advisor 会被放在最前,
// 想想,异步要放在最前这个是合理的
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
} else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
// 正常情况我们的bean不是一个代理对象,所以走的是这个
// 这个就没什么好说的了,直接基于规则生成代理对象了
// isEligible 方法判断当前 bean是不是要被当前的 advisor 拦截代理
if (this.isEligible(bean, beanName)) {
ProxyFactory proxyFactory = this.prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
this.evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
this.customizeProxyFactory(proxyFactory);
ClassLoader classLoader = this.getProxyClassLoader();
if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {
classLoader = ((SmartClassLoader)classLoader).getOriginalClassLoader();
}
return proxyFactory.getProxy(classLoader);
} else {
return bean;
}
} else {
return bean;
}
}
org.springframework.aop.framework.AbstractAdvisingBeanPostProcessor#isEligible(java.lang.Class<?>)
protected boolean isEligible(Class<?> targetClass) {
Boolean eligible = (Boolean)this.eligibleBeans.get(targetClass);
if (eligible != null) {
return eligible;
} else if (this.advisor == null) {
return false;
} else {
eligible = AopUtils.canApply(this.advisor, targetClass);
this.eligibleBeans.put(targetClass, eligible);
return eligible;
}
}
2-5、扩展:bean后置处理方法怎么调用的
BeanPostProcessor 接口里面只有两个方法,bean的前置处理和后置处理
public interface BeanPostProcessor {
@Nullable
default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Nullable
default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
}
postProcessAfterInitialization 被调用的地方也只有一个 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory#applyBeanPostProcessorsAfterInitialization,简单理解Spring在bean完成初始化所有动作之后就开始进行调用这个方法进行bean的后置处理
@Override
public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName)
throws BeansException {
Object result = existingBean;
// 找到所有的BeanPostProcessors, 一个个去匹配看是否能匹配的上
for (BeanPostProcessor processor : getBeanPostProcessors()) {
// 后置处理
Object current = processor.postProcessAfterInitialization(result, beanName);
if (current == null) {
return result;
}
result = current;
}
return result;
}
每个bean都会走这个方法,我们可以用debug条件固定我们想要看到的bean
三、代理之后异步调用的原理
相对于 @Transactional 代理,@Async 代理的拦截操作要简单的多,简单来说就是把当前任务丢到线程池里面去运行
再来看一下异步代理拦截的继承关系图
在 AsyncExecutionInterceptor 里面有个 invoke 方法,代理之后的对象执行都走这个方法
public Object invoke(MethodInvocation invocation) throws Throwable {
// 找到当前要执行的类和方法
Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 找到执行当前方法的线程池
AsyncTaskExecutor executor = this.determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");
} else {
// 组装当前任务
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future)result).get();
}
} catch (ExecutionException var4) {
this.handleError(var4.getCause(), userDeclaredMethod, invocation.getArguments());
} catch (Throwable var5) {
this.handleError(var5, userDeclaredMethod, invocation.getArguments());
}
return null;
};
// 由线程池去执行任务
return this.doSubmit(task, executor, invocation.getMethod().getReturnType());
}
}
3-1、异步执行线程池的选择
在使用异步注解的时候我们有两种写法
- @Async 使用默认的线程池去执行
- @Async(“myExecutor”) 使用自定义线程池去执行
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
// 如果之前这个方法已经执行过了,就从缓存中拿到上次执行的线程池
AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
if (executor == null) {
// 获取当前方法 @Async 中的 value值
String qualifier = this.getExecutorQualifier(method);
Executor targetExecutor;
// 如果 value不为空
if (StringUtils.hasLength(qualifier)) {
// 从bean容器中拿到这个线程池
targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
} else {
// 使用默认的线程池
targetExecutor = (Executor)this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
// 缓存一下
this.executors.put(method, executor);
}
return (AsyncTaskExecutor)executor;
}
从BeanFactory 中拿到自定义的线程池
protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {
if (beanFactory == null) {
throw new IllegalStateException("BeanFactory must be set on " + this.getClass().getSimpleName() + " to access qualified executor '" + qualifier + "'");
} else {
return (Executor)BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
}
}
使用默认的线程池
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// 这个会报错,会找到2个
return (Executor)beanFactory.getBean(TaskExecutor.class);
} catch (NoUniqueBeanDefinitionException var6) {
try {
// 最终是走这个拿到名为 taskExecutor 的线程池
return (Executor)beanFactory.getBean("taskExecutor", Executor.class);
} catch (NoSuchBeanDefinitionException var4) {
// ...
}
} catch (NoSuchBeanDefinitionException var7) {
// ...
}
}
return null;
}
通过ApplicationContext拿到这个 taskExecutor,发现它的配置核心线程数8,最大线程数无限大
3-2、获取异步执行的结果
任务提交doSubmit方法如下,它使用了call方法,返回了一个 CompletableFuture
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
} catch (Throwable var2) {
throw new CompletionException(var2);
}
}, executor);
} else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor)executor).submitListenable(task);
} else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
} else {
executor.submit(task);
return null;
}
}
获取返回值样例
@GetMapping("/one")
public String fun() throws ExecutionException, InterruptedException {
CompletableFuture<String> stringCompletableFuture = asyncService.funA();
System.out.println("A的返回值: " + stringCompletableFuture.get());
return "ok";
}
@Async
public CompletableFuture<String> funA() {
System.out.println("AsyncServiceImpl.funA : " + Thread.currentThread().getName());
return CompletableFuture.completedFuture("funA");
}
四、继承异步
上面的结论没有提到一种特殊的场景,继承
public class Father {
@Async
public void fun1(){
System.out.println("father : " + Thread.currentThread().getName());
}
}
@Service
public class Son extends Father {
@Override
public void fun1(){
super.fun1();
System.out.println("son : " + Thread.currentThread().getName());
}
}
测试代码:
@Autowired
private Son son;
@GetMapping("/two")
public String two() {
System.out.println("main : " + Thread.currentThread().getName());
son.fun1();
return "ok";
}
执行结果:
main : main
father : task-1
son : task-1
是不是挺意外的? 理论上我们觉得 main和son 是一个线程, father是另外一个线程,但事实不是这样的。
这涉及到 CGLIB代理对象字节码生成逻辑挺复杂的,下次研究后再单独讲,这里先记三个点
- 使用 super调用父类方法的时候,和调用本类方法是一样的,不会异步
- 子类会继承父类方法上的 @Async,不管是否在重写的方法里面使用 super 关键字
- 如果子类不重写 fun1方法,直接调用父类的 fun1 也是异步
注:这一切将会在后面 CGLIB 代理对象生成的字节码里面揭晓
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!