coderj's Blog

talk is cheap show me the code

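Analyzing a Heap Memory Overflow Caused by the @Async Annotation

Background

Around 10 a.m. I was pulled into a group chat where a colleague reported that the latency of one interface had gone up and asked us to look into it. I had never been responsible for this service, but my colleagues were all out sick 🐑, so I was called in to fight the fire 🐶! (I had no idea where to start.)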

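JVM GC time/count monitoring

I first checked the monitoring for the service's JVM, and sure enough it was abnormal: GC time had climbed past 80 seconds 🐶

[Images: JVM GC time and GC count monitoring charts]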

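Interface traffic monitoring

Next I looked at what had changed in the interface traffic. The call volume of one interface had started to rise after 3 p.m. the previous day, so I suspected that this was what triggered the GC anomaly.

[Image: call volume chart for the interface]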

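Analyzing the dump file with MAT

A guess alone is not enough, though; I needed evidence. Unfortunately, the abnormal JVM state shown in the GC charts above was gone (another colleague had applied the "just restart it" fix). All I could do was reproduce the scenario in the test environment: I wrote a script that called this interface heavily, and sure enough GC in the test environment became abnormal as well. I then dumped the whole JVM heap and analyzed it with my weapon of choice, MAT. MAT reported that a single instance of org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler was holding 1.65 GB of memory that could not be released. That class is simply Spring's wrapper around a thread pool, so it was reasonable to suspect a memory leak caused by improper use of the thread pool. But who was using this class?

[Images: MAT analysis of the heap dump]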

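Who uses ThreadPoolTaskScheduler?

I looked at the source of this class and found that it has many subclasses and many callers, so at this point I had no lead at all. With no better option, I searched Baidu for a while and found some inspiration: Spring's @Async annotation.

[Image: search result pointing to the @Async annotation]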

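Is the @Async annotation used?

I dug through the source code to see whether this interface used the annotation. As the screenshots below show, it did.

[Images: source code on the interface's call path using the @Async annotation]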

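Why does the @Async annotation cause a memory overflow?

Let's reason about it first. ThreadPoolTaskScheduler is a thread pool, and the most likely way for a thread pool to exhaust memory is that its work queue is unbounded: when tasks are submitted faster than they are executed, the backlog grows without limit. Let's verify this hypothesis.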
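The hypothesis can be checked in isolation with plain JDK classes. The following is a minimal sketch, not the code of the affected service: the class and method names are made up for illustration. It submits tasks that cannot finish to a one-thread ScheduledThreadPoolExecutor and reads the queue size afterwards.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Hypothetical demo class; the names are illustrative only.
class UnboundedQueueDemo {

    /** Submits taskCount blocked tasks to a one-thread scheduler and returns the backlog size. */
    static int backlogAfterSubmitting(int taskCount) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                // Every task blocks, so the single worker can never drain the queue.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Nothing was rejected: everything the worker has not picked up is still queued.
            return pool.getQueue().size();
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued tasks: " + backlogAfterSubmitting(10_000));
    }
}
```

No submission is ever rejected, so the backlog is limited only by the heap. Each queued task also keeps its arguments reachable, which would be consistent with one scheduler instance retaining a large amount of memory.
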

AsyncExecutionInterceptor

The entry point for handling the @Async annotation is the invoke method, which is implemented on top of AOP.

/**
 * Intercept the given method invocation, submit the actual calling of the method to
 * the correct task executor and return immediately to the caller.
 * @param invocation the method to intercept and make asynchronous
 * @return {@link Future} if the original method returns {@code Future}; {@code null}
 * otherwise.
 */
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    // Resolve the thread pool (executor) that will run this method
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    Callable<Object> task = () -> {
        try {
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        }
        catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        }
        catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };

    // Submit the task to the thread pool
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

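Stripped of the Spring plumbing, the interceptor wraps the intercepted call in a Callable and hands it to an executor, so the caller gets control back immediately. A rough JDK-only sketch of that idea follows. AsyncInvokeSketch, invokeAsync and the thread name "async-worker" are invented for illustration and are not Spring APIs.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the interceptor; not Spring's actual implementation.
class AsyncInvokeSketch {

    /** Wraps the target call in a task and submits it instead of calling it directly. */
    static <T> Future<T> invokeAsync(Callable<T> targetMethod, ExecutorService executor) {
        Callable<T> task = () -> targetMethod.call();
        return executor.submit(task);
    }

    /** Runs a task through invokeAsync and reports which thread executed it. */
    static String executingThreadName() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "async-worker"));
        try {
            return invokeAsync(() -> Thread.currentThread().getName(), executor).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // The work runs on the pool thread, not on the calling thread.
        System.out.println("executed on: " + executingThreadName());
    }
}
```

Because submission returns right away, the caller feels no back pressure. How many tasks can pile up is decided entirely by the executor that receives them.
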
AsyncExecutionAspectSupport

This class contains the logic that resolves the thread pool. By default it fetches a bean of type TaskExecutor from the Spring container.

/**
 * Create a new instance with a default {@link AsyncUncaughtExceptionHandler}.
 * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}
 * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific
 * executor has been requested via a qualifier on the async method, in which case the
 * executor will be looked up at invocation time against the enclosing bean factory
 */
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
    this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
    this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
}


// Resolve the thread pool to use for the given method
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        else {
            // If the @Async annotation does not specify a thread pool, use the default one
            targetExecutor = this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}


/**
 * Retrieve or build a default executor for this advice instance.
 * An executor returned from here will be cached for further use.
 * <p>The default implementation searches for a unique {@link TaskExecutor} bean
 * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
 * If neither of the two is resolvable, this implementation will return {@code null}.
 * @param beanFactory the BeanFactory to use for a default executor lookup
 * @return the default executor, or {@code null} if none available
 * @since 4.2.6
 * @see #findQualifiedExecutor(BeanFactory, String)
 * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
 */
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    if (beanFactory != null) {
        try {
            // Search for TaskExecutor bean... not plain Executor since that would
            // match with ScheduledExecutorService as well, which is unusable for
            // our purposes here. TaskExecutor is more clearly designed for it.
            // Fetch the thread pool bean instance from the bean factory
            return beanFactory.getBean(TaskExecutor.class);
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.debug("Could not find unique TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                            "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                            "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.debug("Could not find default TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.info("No task executor bean found for async processing: " +
                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
            }
            // Giving up -> either using local default executor or none at all...
        }
    }
    return null;
}

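The fallback order in getDefaultExecutor is easier to see in a toy model. The sketch below is a simplification under stated assumptions: a plain Map stands in for the bean factory, the nested TaskExecutor interface is a local stand-in for Spring's, and primary-bean handling is ignored. None of these names are Spring APIs.

```java
import java.util.Map;
import java.util.concurrent.Executor;

// Toy model of the lookup order; a Map plays the role of the bean factory.
class DefaultExecutorLookupSketch {

    /** Local stand-in for Spring's TaskExecutor marker type. */
    interface TaskExecutor extends Executor {}

    /** Returns the unique TaskExecutor bean, else the bean named "taskExecutor", else null. */
    static Executor resolveDefault(Map<String, ? extends Executor> beans) {
        Executor unique = null;
        int count = 0;
        for (Executor bean : beans.values()) {
            if (bean instanceof TaskExecutor) {
                unique = bean;
                count++;
            }
        }
        if (count == 1) {
            return unique; // exactly one bean of the TaskExecutor type: use it
        }
        // Zero or several candidates: fall back to the well-known bean name.
        return beans.get("taskExecutor");
    }

    public static void main(String[] args) {
        TaskExecutor scheduler = Runnable::run;
        Executor chosen = resolveDefault(Map.of("taskScheduler", scheduler));
        System.out.println("scheduler picked as default: " + (chosen == scheduler));
    }
}
```

In this model, a scheduler that happens to be the only TaskExecutor-typed bean wins the lookup even though it was never meant for @Async work. That matches the behavior traced in this post.
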
TaskSchedulingAutoConfiguration

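This class auto-configures a ThreadPoolTaskScheduler (which implements TaskExecutor) when the conditions on the bean method below are met, so the Spring bean factory ends up containing a TaskExecutor instance.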

@ConditionalOnClass(ThreadPoolTaskScheduler.class)
@Configuration
@EnableConfigurationProperties(TaskSchedulingProperties.class)
public class TaskSchedulingAutoConfiguration {

    @Bean
    @ConditionalOnBean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @ConditionalOnMissingBean({ SchedulingConfigurer.class, TaskScheduler.class,
            ScheduledExecutorService.class })
    public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) {
        return builder.build();
    }

    @Bean
    @ConditionalOnMissingBean
    public TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties,
            ObjectProvider<TaskSchedulerCustomizer> taskSchedulerCustomizers) {
        TaskSchedulerBuilder builder = new TaskSchedulerBuilder();
        builder = builder.poolSize(properties.getPool().getSize());
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
        builder = builder.customizers(taskSchedulerCustomizers);
        return builder;
    }

}

Class diagram of ThreadPoolTaskScheduler and TaskExecutor

[Image: class diagram showing that ThreadPoolTaskScheduler implements TaskExecutor]

ExecutorConfigurationSupport

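A helper class for initializing thread pools. It uses the template method pattern: the subclass defines how the pool is actually created. The class also implements the InitializingBean interface, so initialization runs once the container has set all properties.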

/**
 * Calls {@code initialize()} after the container applied all property values.
 * @see #initialize()
 */
@Override
public void afterPropertiesSet() {
    initialize();
}

/**
 * Set up the ExecutorService.
 */
public void initialize() {
    if (logger.isInfoEnabled()) {
        logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
    }
    if (!this.threadNamePrefixSet && this.beanName != null) {
        setThreadNamePrefix(this.beanName + "-");
    }
    this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
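
The template method structure can be reduced to a few lines. This is only an illustrative sketch with made-up class names (ExecutorSupportSketch, SchedulerSketch), not Spring's code: the base class fixes the initialization skeleton, and the subclass decides which pool to build.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Hypothetical base class mirroring the role of ExecutorConfigurationSupport.
abstract class ExecutorSupportSketch {
    private ExecutorService executor;

    /** Template method: the skeleton is fixed, the creation step is delegated. */
    final void initialize() {
        this.executor = initializeExecutor();
    }

    /** Hook that each subclass implements to build its own kind of pool. */
    protected abstract ExecutorService initializeExecutor();

    ExecutorService getExecutor() {
        return this.executor;
    }
}

// Hypothetical subclass mirroring the role of ThreadPoolTaskScheduler.
class SchedulerSketch extends ExecutorSupportSketch {
    @Override
    protected ExecutorService initializeExecutor() {
        return new ScheduledThreadPoolExecutor(1);
    }

    /** Initializes a scheduler and reports the concrete pool type that was created. */
    static String createdExecutorType() {
        SchedulerSketch scheduler = new SchedulerSketch();
        scheduler.initialize();
        ExecutorService created = scheduler.getExecutor();
        created.shutdown();
        return created.getClass().getSimpleName();
    }
}
```

The queue type is therefore never chosen in the base class. It comes from whatever executor the subclass constructs, which is why the subclass is the next place to look.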

ThreadPoolTaskScheduler

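This class extends ExecutorConfigurationSupport. During initialization it creates a ScheduledThreadPoolExecutor, which by default uses DelayedWorkQueue, an unbounded queue.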

@Override
protected ExecutorService initializeExecutor(
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);

    if (this.removeOnCancelPolicy) {
        if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
            ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
        }
        else {
            logger.debug("Could not apply remove-on-cancel policy - not a ScheduledThreadPoolExecutor");
        }
    }

    return this.scheduledExecutor;
}


/**
 * Create a new {@link ScheduledExecutorService} instance.
 * <p>The default implementation creates a {@link ScheduledThreadPoolExecutor}.
 * Can be overridden in subclasses to provide custom {@link ScheduledExecutorService} instances.
 * @param poolSize the specified pool size
 * @param threadFactory the ThreadFactory to use
 * @param rejectedExecutionHandler the RejectedExecutionHandler to use
 * @return a new ScheduledExecutorService instance
 * @see #afterPropertiesSet()
 * @see java.util.concurrent.ScheduledThreadPoolExecutor
 */
protected ScheduledExecutorService createExecutor(
        int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
}


// The constructor below is from the JDK class java.util.concurrent.ScheduledThreadPoolExecutor
/**
 * Creates a new ScheduledThreadPoolExecutor with the given
 * initial parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 * {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    // DelayedWorkQueue, an unbounded queue, is used by default
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
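
For contrast, here is how a pool with a bounded queue behaves under the same kind of overload. This is a hedged, JDK-only sketch and not the configuration of the service discussed here. The class name, pool size and queue capacity are arbitrary choices for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names and sizes are illustrative only.
class BoundedQueueDemo {

    /** Submits blocked tasks until the pool rejects one; returns how many were accepted. */
    static int acceptedBeforeRejection(int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (true) {
                // Each task blocks, so one occupies the worker and the rest fill the queue.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Queue full and no spare thread: the pool pushes back instead of buffering.
            return accepted;
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted before rejection: " + acceptedBeforeRejection(2));
    }
}
```

With one worker and a queue capacity of 2, exactly three tasks are accepted before the rejection handler fires. The backlog has a hard ceiling here, which is exactly what DelayedWorkQueue lacks.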