DOU士杨
2022/10/26阅读:30主题:默认主题
线程池使用不当造成的bug
问题描述
在生产环境突然发现一个监听容器内消息的类接收不到消息了...
这期间上过一次线【针对压测过程中出现的问题进行的一次优化】,上线之前是可以正常接收到消息的,但在上线之后就收不到消息了
于是开启了漫漫解bug的路程........
问题排查
第一步:优先排查上线内容
上线内容是增加了一个RedisStream的Config类。立即返回测试环境进行测试,测试结果是 存在相同的问题。第一次看,这个Config类与消息监听器完全没有关系。但现实情况确实是因为增加了这个类导致的
第二步:寻找两个类的关联性
贴代码如下:
监听消息的监听器
@Service
@Slf4j
@EnableAsync
public class MessageEventListener implements ApplicationListener<MessageEvent> {
@Autowired
private PaasService paasService;
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
/**
* PAAS异步消息发送
*
* @param event 事件对象
*/
@Override
@Async
public void onApplicationEvent(MessageEvent event) {
MessageReqDTO messageReq = event.getPaasMessageReq();
paasService.sendMessage(messageReq);
log.info("消息事件处理完成:messageReq: {}", messageReq);
}
}
新增加的Redis的Stream配置类
@Slf4j
@Configuration
public class LoginStreamConfig {
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private LoginStreamListener listener;
@Value("${***:login-stream}")
public String streamName;
@Value("${***:login-group}")
private String groupName;
@Value("${***:login-consumer}")
private String consumerName;
@Bean(name = "messageListener")
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> getStreamMessageListenerContainer(RedisConnectionFactory factory) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofSeconds(1)).batchSize(10).targetType(String.class)
.executor(threadPoolTaskExecutor).build();
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer
.create(factory, options);
//指定消费者对象
container.register(buildReadRequest(streamName,groupName,consumerName), listener);
container.start();
return container;
}
/**
* 构建readRequest
*
* @param streamName stream名
* @param groupName 组名
* @param consumerName 消费者名
* @return readRequest
*/
private StreamMessageListenerContainer.StreamReadRequest<String> buildReadRequest(String streamName, String groupName,
String consumerName) {
//指定消费最新的消息
StreamOffset<String> streamOffset = StreamOffset.create(streamName, ReadOffset.lastConsumed());
//创建消费者
Consumer consumer = Consumer.from(groupName, consumerName);
return StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset)
.errorHandler((error) -> log.error(error.getMessage()))
.cancelOnError(e -> false).consumer(consumer)
//关闭自动ack确认
.autoAcknowledge(false).build();
}
第一次找这两个类,确实是没有任何关联....
思考中~~~
在代码排查的过程中,发现了原来使用的【另外一个监听器类】
番外:这个监听器原来只给当前的模块一使用,所以在这个模块下有一个监听器,后来因其他的模块也需要这个监听器,所以将监听器Copy了一份到公共模块中,将原来的监听器做了全部注释掉的操作, 所以项目中有了两个监听器,我们暂且将公共模块中的监听器 称为A监听器,原模块一中的监听器 称之为B监听器
尝试将公共模块中的监听器全部注释掉,打开B监听器,发现B监听器 可以正常的接收消息。
经对比发现,两个监听器的唯一区别是 在方法上 A监听器有@Async注解,而B监听器是没有这个异步的注解。
诶,难不成问题 出在这个异步的注解上??但没上线之前 是没有问题的啊。。
再去仔细看本次上线的配置类,其中的一个 @Autowired 引起我的注意
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
这是一个线程池的引入,它的作用是为了异步启动Redis流的监听器,保证这个类处于持续的监听中
终于找到了突破口,这个问题一定与线程池有关,很可能是线程池的线程占满了,导致异步没有生效~
验证猜想
思路如下:
首先-回退代码
A、判读ThreadPoolTaskExecutor 与 异步@Async的线程池的关系
B、判断是否是线程池中的线程全部被占满
A、打印当前异步线程与ThreadPoolTaskExecutor线程池的名称,结果如下

很明显可以看出,@Async使用的线程池也是ThreadPoolTaskExecutor线程池,也就是两者会共享此线程池中的线程数
B、由下图可以看出,ThreadPoolTaskExecutor线程池核心线程数共计8个,目前已被占用数也是8个

结合线程池工作原理,如下图

可以看出,当核心线程数全部被使用之后,任务会被加入到队列中进行等待,而当前项目中恰好已经配置了8个RedisStreamConfig类,从而导致了收不到消息的问题
解决方式
因为Spring的事件监听器本身就是已经是异步的处理方式了,所以无须再使用@Async继续去异步执行
反思
项目中使用到ThreadPoolTaskExecutor的地方,需要注意配置的数量,如:当前项目已经配置了8个RedisStreamConfig的类,并且是在项目初始化时就占用了8个核心线程数,所以 在必要时 需要对线程池进行重新配置。
作者介绍