RocketMQ源码 Broker-BrokerFastFailure 快速失败请求组件源码分析
2023-12-14 15:44:39
前言
BrokerFastFailure 快速失败处理组件,主要负责,在系统激活broker快速失败能力时,以后台定时调度任务方式进行过期任务清理。主要对以下几类任务进行处理:
- 消息存储组件中的请求。如果消息存储组件中的OSPageCacheBusy(系统页缓存繁忙)为true,则对它的请求循环进行快速失败处理。
- 清理发送线程池队列中的过期请求。
- 清理拉取线程池队列中的过期请求。
- 清理心跳线程池队列中的过期请求。
- 清理事务线程池队列中的过期请求。
源码版本:4.9.3
源码分析
源码比较短,仅仅100多行,源码 + 注释 如下:
// broker快速失败组件
public class BrokerFastFailure {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
// broker快速失败调度线程池
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerFastFailureScheduledThread"));
private final BrokerController brokerController;
public BrokerFastFailure(final BrokerController brokerController) {
this.brokerController = brokerController;
}
// 对线程进行转换
public static RequestTask castRunnable(final Runnable runnable) {
try {
if (runnable instanceof FutureTaskExt) {
FutureTaskExt object = (FutureTaskExt) runnable;
return (RequestTask) object.getRunnable();
}
} catch (Throwable e) {
log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
}
return null;
}
public void start() {
// 注册定时调度任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 如果激活快速失败功能
if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
// 清理过期请求
cleanExpiredRequest();
}
}
}, 1000, 10, TimeUnit.MILLISECONDS);
}
private void cleanExpiredRequest() {
// 如果消息存储组件中的OSPageCacheBusy(系统页缓存繁忙)为true,则进行快速失败处理
while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
try {
// 取出发送线程池队列中的线程,并进行快速失败处理
if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
if (null == runnable) {
break;
}
final RequestTask rt = castRunnable(runnable);
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
} else {
break;
}
} catch (Throwable ignored) {
}
}
// 清理发送线程池队列中的过期请求
cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
// 清理拉取线程池队列中的过期请求
cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
// 清理心跳线程池队列中的过期请求
cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
// 清理事务线程池队列中的过期请求
cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
}
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
while (true) {
try {
if (!blockingQueue.isEmpty()) {
// 死循环访问队列,将队列头部所有过期请求清理掉,直到处理到一个没有过期的请求
final Runnable runnable = blockingQueue.peek();
if (null == runnable) {
break;
}
final RequestTask rt = castRunnable(runnable);
if (rt == null || rt.isStopRun()) {
break;
}
// 计算当前请求是否过期
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
if (behind >= maxWaitTimeMillsInQueue) {
// 过期后移除当前请求
if (blockingQueue.remove(runnable)) {
// 设置当前请求为停止状态
rt.setStopRun(true);
// 返回当前请求响应
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
}
} else {
break;
}
} else {
break;
}
} catch (Throwable ignored) {
}
}
}
public void shutdown() {
this.scheduledExecutorService.shutdown();
}
文章来源:https://blog.csdn.net/hzwangmr/article/details/134995596
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!