coderj's Blog

talk is cheap show me the code

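Background

Around 10 a.m. I was pulled into a group chat where a colleague reported that latency on one endpoint had gone up and asked us to deal with it. I had never been responsible for this service, but my teammates had all tested positive 🐑, so I was called in to fight the fire 🐶! (I had no idea where to start.)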

JVM GC time/count monitoring

I first checked the monitoring dashboard, and the service's JVM was indeed unhealthy: GC time had climbed past 80 seconds 🐶
image.pngimage.png

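Endpoint traffic monitoring

Next I looked for changes in endpoint traffic and found that calls to one endpoint had started rising after 3 p.m. the previous day. My guess was that this increase is what triggered the GC anomaly.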
image.png

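Analyzing the dump file with MAT

A guess alone is not enough, though; I needed evidence. Unfortunately the live state of the abnormal JVM shown above was gone (another colleague had applied the restart cure). All I could do was reproduce the scenario in the test environment: I wrote a script that called the endpoint heavily, and sure enough GC in the test environment went abnormal too. I then took a heap dump of the JVM and analyzed it with my weapon of choice, MAT. MAT reported that a single instance of org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler was holding 1.65 GB of memory that could not be released. That class is just Spring's wrapper around a thread pool, so it is reasonable to suspect a memory leak caused by misusing a thread pool. So who is using this class?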
image.png
image.png

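Who uses ThreadPoolTaskScheduler

I read the source of this class and found that it has many subclasses and many callers, so at this point I had no lead. Out of options, I searched Baidu for a while and found a hint: Spring's @Async annotation.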
image.png

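Is the @Async annotation used?

I dug through the source to check whether this endpoint uses the annotation. As the screenshots below show, it does.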
image.png
image.png
image.png

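Why would the @Async annotation cause memory to run out

Let's reason it through first. ThreadPoolTaskScheduler is a thread pool, and the likely way a thread pool exhausts memory is that its work queue is unbounded. Let's verify that hypothesis.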
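Before reading the Spring source, the hypothesis can be checked against the JDK class that ThreadPoolTaskScheduler wraps by default. The following is a minimal sketch, not the production code: the class name UnboundedQueueDemo and the method backlogAfterBurst are made up for illustration. It blocks the only worker thread and shows that every further submission is accepted and parked in the queue, with no rejection.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;

class UnboundedQueueDemo {

    /** Submits {@code tasks} jobs while the only worker is blocked and returns the queue backlog. */
    static int backlogAfterBurst(int tasks) throws InterruptedException {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy the single worker thread so later submissions have to wait in the queue.
            pool.execute(() -> {
                workerBusy.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workerBusy.await();
            // Simulate a traffic burst: nothing is rejected, everything is queued.
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> { });
            }
            return pool.getQueue().size();
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("queued tasks: " + backlogAfterBurst(100_000));
    }
}
```

If each queued task also holds on to request data, a backlog like this is enough to explain a heap that GC cannot shrink.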

AsyncExecutionInterceptor

The entry point for handling the @Async annotation is the invoke method, which is implemented on top of AOP.

/**
 * Intercept the given method invocation, submit the actual calling of the method to
 * the correct task executor and return immediately to the caller.
 * @param invocation the method to intercept and make asynchronous
 * @return {@link Future} if the original method returns {@code Future}; {@code null}
 * otherwise.
 */
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    // Resolve the executor (thread pool) to use for this method
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    Callable<Object> task = () -> {
        try {
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        }
        catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        }
        catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };

    // Submit the task to the executor
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
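To make the interception idea concrete without Spring on the classpath, here is a rough sketch that uses a JDK dynamic proxy in place of Spring AOP. The names AsyncProxyDemo, Greeter, asyncProxy and callThroughProxy are invented for this example, and Spring's real interceptor does considerably more (executor lookup, error handling, return-type adaptation).

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class AsyncProxyDemo {

    /** Hypothetical service interface whose method should run asynchronously. */
    interface Greeter {
        Future<String> greet(String name);
    }

    /** Wraps the target so that each call is submitted to the executor and returns at once. */
    static Greeter asyncProxy(Greeter target, ExecutorService executor) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[] { Greeter.class },
                (proxy, method, args) -> executor.submit(() -> {
                    // Runs on a pool thread, like invocation.proceed() in the interceptor.
                    Object result = method.invoke(target, args);
                    return result instanceof Future ? ((Future<?>) result).get() : result;
                }));
    }

    static String callThroughProxy(String name) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Greeter target = n -> CompletableFuture.completedFuture("hello " + n);
            return asyncProxy(target, executor).greet(name).get(1, TimeUnit.SECONDS);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callThroughProxy("spring"));
    }
}
```

The point to take away is that every call becomes one task handed to whatever executor was resolved, so the executor's queue policy decides what happens under load.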
AsyncExecutionAspectSupport

This is where the executor is actually resolved. By default, a bean of type TaskExecutor is fetched from the Spring context.

/**
 * Create a new instance with a default {@link AsyncUncaughtExceptionHandler}.
 * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}
 * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific
 * executor has been requested via a qualifier on the async method, in which case the
 * executor will be looked up at invocation time against the enclosing bean factory
 */
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
    this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
    this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
}


// Resolve the concrete executor for the given method
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        else {
            // If the @Async annotation does not name an executor, use the default one
            targetExecutor = this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}


/**
 * Retrieve or build a default executor for this advice instance.
 * An executor returned from here will be cached for further use.
 * <p>The default implementation searches for a unique {@link TaskExecutor} bean
 * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
 * If neither of the two is resolvable, this implementation will return {@code null}.
 * @param beanFactory the BeanFactory to use for a default executor lookup
 * @return the default executor, or {@code null} if none available
 * @since 4.2.6
 * @see #findQualifiedExecutor(BeanFactory, String)
 * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
 */
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    if (beanFactory != null) {
        try {
            // Search for TaskExecutor bean... not plain Executor since that would
            // match with ScheduledExecutorService as well, which is unusable for
            // our purposes here. TaskExecutor is more clearly designed for it.
            // Fetch the executor bean instance from the bean factory
            return beanFactory.getBean(TaskExecutor.class);
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.debug("Could not find unique TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                            "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                            "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.debug("Could not find default TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.info("No task executor bean found for async processing: " +
                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
            }
            // Giving up -> either using local default executor or none at all...
        }
    }
    return null;
}
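The lookup order in getDefaultExecutor is easy to lose in the nested try/catch blocks, so here is a simplified model of it. This is only a sketch: a plain Map stands in for the bean factory, TaskExecutorLike is a made-up stand-in for Spring's TaskExecutor type, and primary-bean handling is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

class DefaultExecutorLookupDemo {

    /** Stand-in for Spring's TaskExecutor type (hypothetical, for illustration only). */
    interface TaskExecutorLike extends Executor { }

    /** Mirrors the lookup order: unique bean by type first, then the bean named "taskExecutor". */
    static Executor resolveDefault(Map<String, Executor> beans) {
        List<Executor> typed = new ArrayList<>();
        for (Executor bean : beans.values()) {
            if (bean instanceof TaskExecutorLike) {
                typed.add(bean);
            }
        }
        if (typed.size() == 1) {
            return typed.get(0); // exactly one match by type wins, whatever its name is
        }
        // None or several matches: fall back to the well-known bean name (may be null).
        return beans.get("taskExecutor");
    }

    public static void main(String[] args) {
        Map<String, Executor> beans = new LinkedHashMap<>();
        TaskExecutorLike scheduler = Runnable::run;
        beans.put("taskScheduler", scheduler);
        System.out.println(resolveDefault(beans) == scheduler);
    }
}
```

Under this model, a scheduler bean that happens to be the only TaskExecutor in the context gets picked for @Async work even though nobody asked for it by name.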
TaskSchedulingAutoConfiguration

This class auto-configures a ThreadPoolTaskScheduler (which implements TaskExecutor), so the Spring bean factory ends up containing a TaskExecutor instance.

@ConditionalOnClass(ThreadPoolTaskScheduler.class)
@Configuration
@EnableConfigurationProperties(TaskSchedulingProperties.class)
public class TaskSchedulingAutoConfiguration {

    @Bean
    @ConditionalOnBean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @ConditionalOnMissingBean({ SchedulingConfigurer.class, TaskScheduler.class,
            ScheduledExecutorService.class })
    public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) {
        return builder.build();
    }

    @Bean
    @ConditionalOnMissingBean
    public TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties,
            ObjectProvider<TaskSchedulerCustomizer> taskSchedulerCustomizers) {
        TaskSchedulerBuilder builder = new TaskSchedulerBuilder();
        builder = builder.poolSize(properties.getPool().getSize());
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
        builder = builder.customizers(taskSchedulerCustomizers);
        return builder;
    }

}
Class diagram of ThreadPoolTaskScheduler and TaskExecutor

image.png

ExecutorConfigurationSupport

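A helper class for initializing thread pools. It uses the template method pattern, leaving the concrete pool creation to subclasses, and it implements the InitializingBean interface.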

/**
 * Calls {@code initialize()} after the container applied all property values.
 * @see #initialize()
 */
@Override
public void afterPropertiesSet() {
    initialize();
}

/**
 * Set up the ExecutorService.
 */
public void initialize() {
    if (logger.isInfoEnabled()) {
        logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
    }
    if (!this.threadNamePrefixSet && this.beanName != null) {
        setThreadNamePrefix(this.beanName + "-");
    }
    this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
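For readers who have not met the template method pattern, the shape is roughly the following. This is a stripped-down sketch with invented names (TemplateMethodDemo, ExecutorSupport, SchedulerSupport), not the Spring classes themselves: the base class fixes the initialization sequence and the subclass only decides which executor to build.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

class TemplateMethodDemo {

    /** Base class: owns the fixed initialization steps. */
    abstract static class ExecutorSupport {
        private ExecutorService executor;

        /** The template method; subclasses cannot change the sequence. */
        final void initialize() {
            this.executor = initializeExecutor();
        }

        /** The hook each subclass fills in. */
        protected abstract ExecutorService initializeExecutor();

        ExecutorService getExecutor() {
            return executor;
        }
    }

    /** Subclass that chooses a scheduled pool, similar in spirit to ThreadPoolTaskScheduler. */
    static class SchedulerSupport extends ExecutorSupport {
        @Override
        protected ExecutorService initializeExecutor() {
            return new ScheduledThreadPoolExecutor(1);
        }
    }

    public static void main(String[] args) {
        SchedulerSupport support = new SchedulerSupport();
        support.initialize();
        System.out.println(support.getExecutor().getClass().getSimpleName());
        support.getExecutor().shutdown();
    }
}
```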

ThreadPoolTaskScheduler

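ThreadPoolTaskScheduler extends ExecutorConfigurationSupport. During initialization it creates a ScheduledThreadPoolExecutor, which by default uses DelayedWorkQueue, an unbounded queue.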

@Override
protected ExecutorService initializeExecutor(
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);

    if (this.removeOnCancelPolicy) {
        if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
            ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
        }
        else {
            logger.debug("Could not apply remove-on-cancel policy - not a ScheduledThreadPoolExecutor");
        }
    }

    return this.scheduledExecutor;
}


/**
 * Create a new {@link ScheduledExecutorService} instance.
 * <p>The default implementation creates a {@link ScheduledThreadPoolExecutor}.
 * Can be overridden in subclasses to provide custom {@link ScheduledExecutorService} instances.
 * @param poolSize the specified pool size
 * @param threadFactory the ThreadFactory to use
 * @param rejectedExecutionHandler the RejectedExecutionHandler to use
 * @return a new ScheduledExecutorService instance
 * @see #afterPropertiesSet()
 * @see java.util.concurrent.ScheduledThreadPoolExecutor
 */
protected ScheduledExecutorService createExecutor(
        int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
}


/**
 * Creates a new ScheduledThreadPoolExecutor with the given
 * initial parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 * {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    // The default work queue is DelayedWorkQueue, which is unbounded
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
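For contrast, here is how a pool with a bounded queue behaves under the same kind of burst. This is a sketch of one possible direction for a fix and an assumption on my part, not the change that was actually shipped; the class BoundedPoolDemo, the method rejectedCount and the chosen sizes are illustrative only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {

    /** Submits tasks while the single worker is blocked and counts how many are rejected. */
    static int rejectedCount(int queueCapacity, int submissions) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),   // bounded work queue
                new ThreadPoolExecutor.AbortPolicy());     // reject instead of queueing forever
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Keep the only worker busy so the queue is the only place new tasks can go.
            pool.execute(() -> {
                workerBusy.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workerBusy.await();
            int rejected = 0;
            for (int i = 0; i < submissions; i++) {
                try {
                    pool.execute(() -> { });
                } catch (RejectedExecutionException e) {
                    rejected++; // the queue is full, so the caller finds out immediately
                }
            }
            return rejected;
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected: " + rejectedCount(10, 25));
    }
}
```

With a bounded queue the backlog can never exceed its capacity, so overload shows up as rejections (or caller-side back pressure, depending on the policy) rather than as heap growth.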

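Background

To comply with the national Data Security Law, our company's data security department required every business unit to store the sensitive fields of all existing MySQL tables in encrypted form. The modules I am responsible for have a dozen or so tables spread across 3 to 4 microservices, so I wanted to design a general approach that encrypts these fields with as few code changes as possible.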

Version 1.0 design

Most of our projects use the MyBatis or MyBatis-Plus ORM framework, so the 1.0 design used a MyBatis Interceptor to encrypt and decrypt the data.
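Whatever layer does the interception, it needs a helper that turns a field value into ciphertext and back. The post does not say which algorithm was used, so the sketch below assumes AES-GCM from the JDK purely for illustration; FieldCipherDemo and its methods are invented names, and the MyBatis Interceptor wiring is deliberately left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

class FieldCipherDemo {

    private static final int IV_BYTES = 12;   // nonce size commonly used with GCM
    private static final int TAG_BITS = 128;  // authentication tag length
    private static final SecureRandom RANDOM = new SecureRandom();

    /** Generates a throwaway key; a real system would load it from key management. */
    static SecretKey newKey() throws Exception {
        KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(128);
        return generator.generateKey();
    }

    /** Encrypts a field value and returns Base64(iv + ciphertext) for storage in a text column. */
    static String encrypt(String plain, SecretKey key) throws Exception {
        byte[] iv = new byte[IV_BYTES];
        RANDOM.nextBytes(iv);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        byte[] body = cipher.doFinal(plain.getBytes(StandardCharsets.UTF_8));
        byte[] out = ByteBuffer.allocate(iv.length + body.length).put(iv).put(body).array();
        return Base64.getEncoder().encodeToString(out);
    }

    /** Reverses encrypt(): splits off the IV, then decrypts and verifies the rest. */
    static String decrypt(String stored, SecretKey key) throws Exception {
        byte[] in = Base64.getDecoder().decode(stored);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, in, 0, IV_BYTES));
        byte[] plain = cipher.doFinal(in, IV_BYTES, in.length - IV_BYTES);
        return new String(plain, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        SecretKey key = newKey();
        String stored = encrypt("13800000000", key);
        System.out.println(stored + " -> " + decrypt(stored, key));
    }
}
```

One design consequence worth noting: with a random IV the same plaintext encrypts to a different value each time, so equality conditions on an encrypted column cannot simply compare ciphertexts.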

Problems encountered

MyBatis Generator can produce mappers in several styles
Queries in the xxxExample style
Queries in the dynamic SQL style

Solution

Version 1.1 design

MyBatis interceptor + JSqlParser

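Version 2.0 design

Because of the various problems with the 1.0 and 1.1 designs, a more general design was needed. After a lot of searching on Baidu, I settled on the encrypt module of Apache ShardingSphere-JDBC for encryption and decryption.

Problems encountered

MySQL schema metadata cannot be obtained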

ShardingSphereResultSet

EncryptMergedResult:

public final class EncryptMergedResult implements MergedResult {

    private final EncryptAlgorithmMetaData metaData;

    private final MergedResult mergedResult;

    @Override
    public boolean next() throws SQLException {
        return mergedResult.next();
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        Optional<EncryptContext> encryptContext = metaData.findEncryptContext(columnIndex);
        if (!encryptContext.isPresent() || !metaData.isQueryWithCipherColumn(encryptContext.get().getTableName(), encryptContext.get().getColumnName())) {
            return mergedResult.getValue(columnIndex, type);
        }
        Optional<EncryptAlgorithm> encryptAlgorithm = metaData.findEncryptor(encryptContext.get().getTableName(), encryptContext.get().getColumnName());
        if (!encryptAlgorithm.isPresent()) {
            return mergedResult.getValue(columnIndex, type);
        }
        Object cipherValue = mergedResult.getValue(columnIndex, Object.class);
        return null == cipherValue ? null : encryptAlgorithm.get().decrypt(cipherValue, encryptContext.get());
    }

    @Override
    public Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) throws SQLException {
        return mergedResult.getCalendarValue(columnIndex, type, calendar);
    }

    @Override
    public InputStream getInputStream(final int columnIndex, final String type) throws SQLException {
        return mergedResult.getInputStream(columnIndex, type);
    }

    @Override
    public boolean wasNull() throws SQLException {
        return mergedResult.wasNull();
    }
}
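Stripped of the ShardingSphere types, EncryptMergedResult is a decorator: it wraps another result and decrypts only the columns that have an encryptor configured. The sketch below shows that shape with invented names (DecryptingResultDemo, Row, DecryptingRow) and a toy "decryptor"; it is not ShardingSphere's API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.UnaryOperator;

class DecryptingResultDemo {

    /** Minimal stand-in for a result row addressed by column index (hypothetical). */
    interface Row {
        Object getValue(int columnIndex);
    }

    /** Decorator that decrypts configured columns and passes every other column through. */
    static final class DecryptingRow implements Row {
        private final Row delegate;
        private final Map<Integer, UnaryOperator<Object>> decryptors;

        DecryptingRow(Row delegate, Map<Integer, UnaryOperator<Object>> decryptors) {
            this.delegate = delegate;
            this.decryptors = decryptors;
        }

        @Override
        public Object getValue(int columnIndex) {
            Object raw = delegate.getValue(columnIndex);
            UnaryOperator<Object> decryptor = decryptors.get(columnIndex);
            // No decryptor for this column, or a NULL value: return it untouched.
            return (decryptor == null || raw == null) ? raw : decryptor.apply(raw);
        }
    }

    public static void main(String[] args) {
        Row stored = index -> index == 2 ? "terces" : "alice";
        // Toy decryptor that reverses the string; a real one would call a cipher.
        UnaryOperator<Object> reverse = v -> new StringBuilder((String) v).reverse().toString();
        Row row = new DecryptingRow(stored, Collections.singletonMap(2, reverse));
        System.out.println(row.getValue(1) + " / " + row.getValue(2));
    }
}
```

The lookup that decides whether a column has a decryptor is the part that depends on metadata, which is why missing schema metadata matters for this class.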

Solution

Pull the ShardingSphere source, then modify it, compile it, package it, and publish it to our private repository

Summary

Entry point

// Startup entry point
SpringApplication.run(xxxxApplication.class, args);

// What run actually calls
new SpringApplication(primarySources).run(args);

SpringApplication constructor

public SpringApplication(ResourceLoader resourceLoader, Class<?>... primarySources) {
    this.resourceLoader = resourceLoader;
    Assert.notNull(primarySources, "PrimarySources must not be null");
    this.primarySources = new LinkedHashSet<>(Arrays.asList(primarySources));
    // Deduce the web application type from the classpath
    this.webApplicationType = WebApplicationType.deduceFromClasspath();
    /*
     * getSpringFactoriesInstances(ApplicationContextInitializer.class) loads the concrete
     * initializer implementations from META-INF/spring.factories and instantiates them.
     *
     * getSpringFactoriesInstances(ApplicationListener.class) loads the concrete listener
     * implementations from META-INF/spring.factories and instantiates them.
     */
    setInitializers((Collection) getSpringFactoriesInstances(
            ApplicationContextInitializer.class));
    setListeners((Collection) getSpringFactoriesInstances(ApplicationListener.class));
    this.mainApplicationClass = deduceMainApplicationClass();
}
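The loading step named in the comment above boils down to "read a properties file, split the value on commas, instantiate each class". The sketch below imitates that with plain JDK classes. FactoriesLoaderDemo and its methods are invented for illustration; Spring's real loader also merges files from every jar on the classpath and sorts the instances, which this version skips.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class FactoriesLoaderDemo {

    /** Parses spring.factories-style text and returns the class names listed for one key. */
    static List<String> factoryNames(String factoriesText, String factoryType) throws IOException {
        Properties props = new Properties();
        // Properties understands the trailing-backslash line continuation used in the file.
        props.load(new StringReader(factoriesText));
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(factoryType, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    /** Instantiates each class through its no-argument constructor. */
    static <T> List<T> instantiate(List<String> classNames, Class<T> type)
            throws ReflectiveOperationException {
        List<T> instances = new ArrayList<>();
        for (String className : classNames) {
            Class<?> clazz = Class.forName(className);
            instances.add(type.cast(clazz.getDeclaredConstructor().newInstance()));
        }
        return instances;
    }

    public static void main(String[] args) throws Exception {
        // JDK collection classes stand in for real initializers and listeners.
        String text = "java.util.List=\\\njava.util.ArrayList,\\\njava.util.LinkedList\n";
        List<String> names = factoryNames(text, "java.util.List");
        System.out.println(names + " -> " + instantiate(names, List.class).size() + " instances");
    }
}
```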

SpringApplication.run()

public ConfigurableApplicationContext run(String... args) {
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    ConfigurableApplicationContext context = null;
    Collection<SpringBootExceptionReporter> exceptionReporters = new ArrayList<>();
    configureHeadlessProperty();
    // Load EventPublishingRunListener from spring.factories and instantiate it
    SpringApplicationRunListeners listeners = getRunListeners(args);
    listeners.starting();
    try {
        ApplicationArguments applicationArguments = new DefaultApplicationArguments(
                args);
        // Prepare the environment and publish ApplicationEnvironmentPreparedEvent
        ConfigurableEnvironment environment = prepareEnvironment(listeners,
                applicationArguments);
        // Configure the "ignore BeanInfo" setting
        configureIgnoreBeanInfo(environment);
        Banner printedBanner = printBanner(environment);
        // Create the Spring application context
        context = createApplicationContext();
        exceptionReporters = getSpringFactoriesInstances(
                SpringBootExceptionReporter.class,
                new Class[] { ConfigurableApplicationContext.class }, context);
        // Prepare the Spring context, then refresh it (beans are initialized during refresh)
        prepareContext(context, environment, listeners, applicationArguments,
                printedBanner);
        refreshContext(context);
        afterRefresh(context, applicationArguments);
        stopWatch.stop();
        if (this.logStartupInfo) {
            new StartupInfoLogger(this.mainApplicationClass)
                    .logStarted(getApplicationLog(), stopWatch);
        }
        listeners.started(context);
        callRunners(context, applicationArguments);
    }
    catch (Throwable ex) {
        handleRunFailure(context, ex, exceptionReporters, listeners);
        throw new IllegalStateException(ex);
    }

    try {
        listeners.running(context);
    }
    catch (Throwable ex) {
        handleRunFailure(context, ex, exceptionReporters, null);
        throw new IllegalStateException(ex);
    }
    return context;
}

META-INF/spring.factories in the spring-boot jar

# PropertySource Loaders
org.springframework.boot.env.PropertySourceLoader=\
org.springframework.boot.env.PropertiesPropertySourceLoader,\
org.springframework.boot.env.YamlPropertySourceLoader

# Run Listeners
# EventPublishingRunListener is the important listener that publishes events
org.springframework.boot.SpringApplicationRunListener=\
org.springframework.boot.context.event.EventPublishingRunListener

# Error Reporters
org.springframework.boot.SpringBootExceptionReporter=\
org.springframework.boot.diagnostics.FailureAnalyzers

# Application Context Initializers
org.springframework.context.ApplicationContextInitializer=\
org.springframework.boot.context.ConfigurationWarningsApplicationContextInitializer,\
org.springframework.boot.context.ContextIdApplicationContextInitializer,\
org.springframework.boot.context.config.DelegatingApplicationContextInitializer,\
org.springframework.boot.web.context.ServerPortInfoApplicationContextInitializer

# Application Listeners
org.springframework.context.ApplicationListener=\
org.springframework.boot.ClearCachesApplicationListener,\
org.springframework.boot.builder.ParentContextCloserApplicationListener,\
org.springframework.boot.context.FileEncodingApplicationListener,\
org.springframework.boot.context.config.AnsiOutputApplicationListener,\
org.springframework.boot.context.config.ConfigFileApplicationListener,\
org.springframework.boot.context.config.DelegatingApplicationListener,\
org.springframework.boot.context.logging.ClasspathLoggingApplicationListener,\
org.springframework.boot.context.logging.LoggingApplicationListener,\
org.springframework.boot.liquibase.LiquibaseServiceLocatorApplicationListener

# Environment Post Processors
org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.boot.cloud.CloudFoundryVcapEnvironmentPostProcessor,\
org.springframework.boot.env.SpringApplicationJsonEnvironmentPostProcessor,\
org.springframework.boot.env.SystemEnvironmentPropertySourceEnvironmentPostProcessor

# Failure Analyzers
org.springframework.boot.diagnostics.FailureAnalyzer=\
org.springframework.boot.diagnostics.analyzer.BeanCurrentlyInCreationFailureAnalyzer,\
org.springframework.boot.diagnostics.analyzer.BeanDefinitionOverrideFailureAnalyzer,\
org.springframework.boot.diagnostics.analyzer.BeanNotOfRequiredTypeFailureAnalyzer,\
org.springframework.boot.diagnostics.analyzer.BindFailureAnalyzer,\
org.springframework.boot.diagnostics.analyzer.BindValidationFailureAnalyzer,\
org.springframework.boot.diagnostics.analyzer.UnboundConfigurationPropertyFailureAnalyzer,\
org.springframework.boot.diagnostics.analyzer.ConnectorStartFailureAnalyzer,\
org.springframework.boot.diagnostics.analyzer.NoSuchMethodFailureAnalyzer,\
org.springframework.boot.diagnostics.analyzer.NoUniqueBeanDefinitionFailureAnalyzer,\
org.springframework.boot.diagnostics.analyzer.PortInUseFailureAnalyzer,\
org.springframework.boot.diagnostics.analyzer.ValidationExceptionFailureAnalyzer,\
org.springframework.boot.diagnostics.analyzer.InvalidConfigurationPropertyNameFailureAnalyzer,\
org.springframework.boot.diagnostics.analyzer.InvalidConfigurationPropertyValueFailureAnalyzer

# FailureAnalysisReporters
org.springframework.boot.diagnostics.FailureAnalysisReporter=\
org.springframework.boot.diagnostics.LoggingFailureAnalysisReporter
// Listeners that consume the ApplicationEnvironmentPreparedEvent event
FileEncodingApplicationListener
AnsiOutputApplicationListener
ConfigFileApplicationListener
DelegatingApplicationListener
ClasspathLoggingApplicationListener
LoggingApplicationListener
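How one published event ends up at only some of the registered listeners can be sketched as a type check per listener. This is a simplified model with invented names (EventDispatchDemo, AppEvent, Listener, publish); Spring's actual multicaster resolves generic event types and ordering, which is not reproduced here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class EventDispatchDemo {

    static class AppEvent { }
    static class EnvironmentPreparedEvent extends AppEvent { }
    static class StartedEvent extends AppEvent { }

    /** A listener that is interested in a single event type (hypothetical, simplified). */
    static final class Listener {
        final String name;
        final Class<? extends AppEvent> eventType;

        Listener(String name, Class<? extends AppEvent> eventType) {
            this.name = name;
            this.eventType = eventType;
        }
    }

    /** Returns the names of the listeners that would receive the event, in registration order. */
    static List<String> publish(AppEvent event, List<Listener> listeners) {
        List<String> notified = new ArrayList<>();
        for (Listener listener : listeners) {
            if (listener.eventType.isInstance(event)) {
                notified.add(listener.name);
            }
        }
        return notified;
    }

    public static void main(String[] args) {
        List<Listener> listeners = Arrays.asList(
                new Listener("ConfigFileApplicationListener", EnvironmentPreparedEvent.class),
                new Listener("startedOnly", StartedEvent.class));
        System.out.println(publish(new EnvironmentPreparedEvent(), listeners));
    }
}
```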