RocketMQ源码 Broker-BrokerOuterAPI Broker向NameServer发送请求组件源码分析
2023-12-14 09:06:14
前言
BrokerOuterAPI 是 Broker 对外发起请求的组件,主要负责使用 remotingClient 网络通信客户端向 NameServer 集群发送请求。主要操作包括:注册 Broker 信息到 NameServer 集群、取消注册 Broker、询问 NameServer 集群是否需要更新 Broker 的注册信息,以及按指定地址获取所有 topic 配置、所有消费者 offset(消费进度)、所有延迟 offset、所有消费组订阅配置。
源码版本:4.9.3
源码架构图
核心数据结构
包含:
- 网络通信客户端组件,用于发起rpc请求;
- 地址管理组件,从Web Service URL 获取 NameServer URL 地址列表;
- broker外部调用线程池,用于异步并发执行网络请求。
// Component the broker uses to send requests to the NameServer cluster.
public class BrokerOuterAPI {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
// Network client used to issue the remote requests.
private final RemotingClient remotingClient;
// NameServer address lookup helper, constructed from the value returned by MixAll.getWSAddr().
private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
// Last NameServer address string recorded by this component; initially null.
private String nameSrvAddr = null;
// Executor for the broker's outbound calls, so NameServer requests can run asynchronously.
// Built with the arguments 4, 10, 1 minute and a bounded queue of 32 tasks; the original note
// describes these as core threads, max threads and keep-alive time
// (NOTE(review): confirm against the BrokerFixedThreadPoolExecutor constructor).
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
}
核心行为
主要支持的行为都在以下代码中写了关键注释。其中,注册 broker 节点信息到 NameServer 集群(registerBrokerAll)以及判断是否需要重新注册(needRegister)时,都使用了 CountDownLatch(倒计时门闩)配合线程池进行并发控制,并通过带超时的 await 限制最长等待时间,涉及并发编程知识点的实践,可以多关注一下。
// broker和NameServer通信组件
public class BrokerOuterAPI {
// Creates the component from the given Netty client configuration without an RPC hook:
// delegates to the two-argument constructor, passing null as the hook.
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
}
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.remotingClient.registerRPCHook(rpcHook);
}
// Starts the underlying network client by calling remotingClient.start().
public void start() {
this.remotingClient.start();
}
// Shuts this component down: first the network client, then the outbound-call executor.
public void shutdown() {
this.remotingClient.shutdown();
this.brokerOuterExecutor.shutdown();
}
/**
 * Fetches the NameServer address string through {@code topAddressing.fetchNSAddr()} and, when it
 * is non-null and differs from the cached {@code nameSrvAddr}, pushes the new addresses to the
 * remoting client and updates the cache.
 *
 * <p>Any exception raised while fetching or updating is logged and not rethrown.
 *
 * @return the cached NameServer address string after the attempt; may be {@code null} if no
 *         address has been recorded yet
 */
public String fetchNameServerAddr() {
    try {
        final String fetched = this.topAddressing.fetchNSAddr();
        final boolean addressChanged = fetched != null && !fetched.equals(this.nameSrvAddr);
        if (addressChanged) {
            log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, fetched);
            // Propagate the new list to the remoting client before caching it locally.
            this.updateNameServerAddressList(fetched);
            this.nameSrvAddr = fetched;
        }
    } catch (Exception e) {
        log.error("fetchNameServerAddr Exception", e);
    }
    return nameSrvAddr;
}
/**
 * Splits a semicolon-separated address string and hands the resulting list to the remoting
 * client via {@code updateNameServerAddressList}.
 *
 * @param addrs NameServer addresses separated by {@code ';'}; must not be {@code null}
 */
public void updateNameServerAddressList(final String addrs) {
    final String[] parts = addrs.split(";");
    final List<String> addressList = new ArrayList<String>();
    for (int i = 0; i < parts.length; i++) {
        addressList.add(parts[i]);
    }
    this.remotingClient.updateNameServerAddressList(addressList);
}
/**
 * Registers this broker with every NameServer address currently known to the remoting client.
 *
 * <p>One task per NameServer is submitted to {@code brokerOuterExecutor}; the calling thread then
 * waits on a {@link CountDownLatch} for at most {@code timeoutMills} milliseconds. Results that
 * arrive after the wait has ended may still be appended to the returned list by the worker
 * threads, which is why a {@link CopyOnWriteArrayList} is used.
 *
 * @param clusterName        broker cluster name
 * @param brokerAddr         broker address
 * @param brokerName         broker name
 * @param brokerId           broker id
 * @param haServerAddr       HA server address sent in the request header
 * @param topicConfigWrapper topic configuration placed in the request body
 * @param filterServerList   filter server addresses placed in the request body
 * @param oneway             whether to send one-way; one-way calls yield no result entry
 * @param timeoutMills       per-request timeout and maximum overall wait, in milliseconds
 * @param compressed         whether the request body is encoded in compressed form
 * @return the non-null results collected so far; empty when there is no NameServer address,
 *         when {@code oneway} is set, or when every call failed
 */
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed
) {
    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        // Request header: identifies the broker being registered.
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);

        // Request body: topic configuration plus filter servers, encoded once and shared by all tasks.
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);

        // One latch count per NameServer so the caller can wait for all registrations.
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }
                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        // Always count down, otherwise a failed call would stall the waiting caller.
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            // await returns false when the timeout elapsed before every task finished.
            if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
                log.warn("registerBrokerAll did not complete on all name servers within {} ms", timeoutMills);
            }
        } catch (InterruptedException e) {
            log.warn("registerBrokerAll interrupted while waiting for name server responses", e);
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
    }
    return registerBrokerResultList;
}
/**
 * Sends a single {@code REGISTER_BROKER} request to one NameServer.
 *
 * @param namesrvAddr   address of the NameServer to call
 * @param oneway        when {@code true} the request is sent one-way and {@code null} is returned
 * @param timeoutMills  timeout passed to the remoting client, in milliseconds
 * @param requestHeader header describing the broker being registered
 * @param body          encoded request body
 * @return the registration result built from a {@code SUCCESS} response, or {@code null} for
 *         one-way calls
 * @throws MQBrokerException when the response code is anything other than {@code SUCCESS}
 */
private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    InterruptedException {
    final RemotingCommand registerRequest = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    registerRequest.setBody(body);

    if (oneway) {
        try {
            // Fire-and-forget registration: no response is read.
            this.remotingClient.invokeOneway(namesrvAddr, registerRequest, timeoutMills);
        } catch (RemotingTooMuchRequestException ignored) {
            // Ignore
        }
        return null;
    }

    // Synchronous registration: wait for the NameServer's reply.
    final RemotingCommand reply = this.remotingClient.invokeSync(namesrvAddr, registerRequest, timeoutMills);
    assert reply != null;

    if (reply.getCode() != ResponseCode.SUCCESS) {
        throw new MQBrokerException(reply.getCode(), reply.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
    }

    final RegisterBrokerResponseHeader replyHeader =
        (RegisterBrokerResponseHeader) reply.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
    final RegisterBrokerResult registration = new RegisterBrokerResult();
    registration.setMasterAddr(replyHeader.getMasterAddr());
    registration.setHaServerAddr(replyHeader.getHaServerAddr());
    if (reply.getBody() != null) {
        registration.setKvTable(KVTable.decode(reply.getBody(), KVTable.class));
    }
    return registration;
}
/**
 * Unregisters this broker from every NameServer address known to the remoting client.
 *
 * <p>NameServers are contacted one after another on the calling thread; a failure on one
 * address is logged and does not prevent the remaining addresses from being tried.
 *
 * @param clusterName broker cluster name
 * @param brokerAddr  broker address
 * @param brokerName  broker name
 * @param brokerId    broker id
 */
public void unregisterBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId
) {
    final List<String> targets = this.remotingClient.getNameServerAddressList();
    if (targets == null) {
        return;
    }
    for (final String namesrvAddr : targets) {
        try {
            this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
            log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
        } catch (Exception e) {
            log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
        }
    }
}
/**
 * Sends an {@code UNREGISTER_BROKER} request to one specific NameServer and waits for the
 * reply with a fixed 3000 ms timeout.
 *
 * @param namesrvAddr address of the NameServer to call
 * @param clusterName broker cluster name
 * @param brokerAddr  broker address
 * @param brokerName  broker name
 * @param brokerId    broker id
 * @throws MQBrokerException when the response code is anything other than {@code SUCCESS}
 */
public void unregisterBroker(
    final String namesrvAddr,
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
    final UnRegisterBrokerRequestHeader header = new UnRegisterBrokerRequestHeader();
    header.setBrokerAddr(brokerAddr);
    header.setBrokerId(brokerId);
    header.setBrokerName(brokerName);
    header.setClusterName(clusterName);

    final RemotingCommand unregisterRequest = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, header);
    final RemotingCommand reply = this.remotingClient.invokeSync(namesrvAddr, unregisterRequest, 3000);
    assert reply != null;

    if (reply.getCode() == ResponseCode.SUCCESS) {
        return;
    }
    throw new MQBrokerException(reply.getCode(), reply.getRemark(), brokerAddr);
}
/**
 * Asks every known NameServer whether this broker's registration data is out of date.
 *
 * <p>For each NameServer a {@code QUERY_DATA_VERSION} request carrying the broker's current data
 * version is sent from {@code brokerOuterExecutor}. A {@code Boolean.TRUE} is appended to the
 * result when the NameServer reports a change (or reports nothing), when the data version in
 * the response body differs from the broker's, or when the query throws. The calling thread
 * waits at most {@code timeoutMills} milliseconds; queries finishing later may still append to
 * the returned list.
 *
 * @param clusterName        broker cluster name
 * @param brokerAddr         broker address
 * @param brokerName         broker name
 * @param brokerId           broker id
 * @param topicConfigWrapper source of the broker's current data version
 * @param timeoutMills       per-request timeout and maximum overall wait, in milliseconds
 * @return one {@code TRUE} entry per NameServer that needs (re-)registration; empty when none do
 *         or when no NameServer address is configured
 */
public List<Boolean> needRegister(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final int timeoutMills) {
    final List<Boolean> changedList = new CopyOnWriteArrayList<>();
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        // One latch count per NameServer so the caller can wait for all queries, bounded by the timeout.
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
                        requestHeader.setBrokerAddr(brokerAddr);
                        requestHeader.setBrokerId(brokerId);
                        requestHeader.setBrokerName(brokerName);
                        requestHeader.setClusterName(clusterName);
                        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
                        request.setBody(topicConfigWrapper.getDataVersion().encode());
                        RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
                        DataVersion nameServerDataVersion = null;
                        Boolean changed = false;
                        switch (response.getCode()) {
                            case ResponseCode.SUCCESS: {
                                QueryDataVersionResponseHeader queryDataVersionResponseHeader =
                                    (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                changed = queryDataVersionResponseHeader.getChanged();
                                byte[] body = response.getBody();
                                if (body != null) {
                                    nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
                                    // A differing version on the NameServer side overrides the header flag.
                                    if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
                                        changed = true;
                                    }
                                }
                                // A missing flag (null) is treated the same as "changed".
                                if (changed == null || changed) {
                                    changedList.add(Boolean.TRUE);
                                }
                                // Explicit break: the original fell through into default.
                                break;
                            }
                            default:
                                break;
                        }
                        log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
                    } catch (Exception e) {
                        // A failed query counts as "needs registration".
                        changedList.add(Boolean.TRUE);
                        // Single placeholder: the trailing exception is logged as a Throwable with its stack trace.
                        log.error("Query data version from name server {} Exception", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("query dataversion from nameserver countDownLatch await Exception", e);
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
    }
    return changedList;
}
/**
 * Requests all topic configuration ({@code GET_ALL_TOPIC_CONFIG}) with a fixed 3000 ms timeout.
 * The target address is {@code addr} passed through {@code MixAll.brokerVIPChannel(true, addr)}.
 *
 * @param addr address of the remote peer to query
 * @return the topic configuration decoded from the response body
 * @throws MQBrokerException when the response code is anything other than {@code SUCCESS}
 */
public TopicConfigSerializeWrapper getAllTopicConfig(
    final String addr) throws RemotingConnectException, RemotingSendRequestException,
    RemotingTimeoutException, InterruptedException, MQBrokerException {
    final RemotingCommand query = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
    final String targetAddr = MixAll.brokerVIPChannel(true, addr);
    final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, query, 3000);
    assert reply != null;

    if (reply.getCode() != ResponseCode.SUCCESS) {
        throw new MQBrokerException(reply.getCode(), reply.getRemark(), addr);
    }
    return TopicConfigSerializeWrapper.decode(reply.getBody(), TopicConfigSerializeWrapper.class);
}
/**
 * Requests all consumer offsets ({@code GET_ALL_CONSUMER_OFFSET}) from {@code addr} with a
 * fixed 3000 ms timeout.
 *
 * @param addr address of the remote peer to query
 * @return the consumer offsets decoded from the response body
 * @throws MQBrokerException when the response code is anything other than {@code SUCCESS}
 */
public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    final RemotingCommand query = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
    final RemotingCommand reply = this.remotingClient.invokeSync(addr, query, 3000);
    assert reply != null;

    if (reply.getCode() != ResponseCode.SUCCESS) {
        throw new MQBrokerException(reply.getCode(), reply.getRemark(), addr);
    }
    return ConsumerOffsetSerializeWrapper.decode(reply.getBody(), ConsumerOffsetSerializeWrapper.class);
}
/**
 * Sends a {@code GET_ALL_DELAY_OFFSET} request to {@code addr} with a fixed 3000 ms timeout and
 * returns the response body as text.
 *
 * @param addr address of the remote peer to query
 * @return the response body decoded with {@code MixAll.DEFAULT_CHARSET}
 * @throws MQBrokerException            when the response code is anything other than {@code SUCCESS}
 * @throws UnsupportedEncodingException when the charset name is not supported
 */
public String getAllDelayOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
    RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
    final RemotingCommand query = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
    final RemotingCommand reply = this.remotingClient.invokeSync(addr, query, 3000);
    assert reply != null;

    if (reply.getCode() != ResponseCode.SUCCESS) {
        throw new MQBrokerException(reply.getCode(), reply.getRemark(), addr);
    }
    return new String(reply.getBody(), MixAll.DEFAULT_CHARSET);
}
/**
 * Requests all consumer-group subscription configuration
 * ({@code GET_ALL_SUBSCRIPTIONGROUP_CONFIG}) from {@code addr} with a fixed 3000 ms timeout.
 *
 * @param addr address of the remote peer to query
 *             (NOTE(review): the original comment labels this a name server address — confirm
 *             against the callers)
 * @return the subscription group configuration decoded from the response body
 * @throws MQBrokerException when the response code is anything other than {@code SUCCESS}
 */
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
    final String addr
) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    final RemotingCommand query = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
    final RemotingCommand reply = this.remotingClient.invokeSync(addr, query, 3000);
    assert reply != null;

    if (reply.getCode() != ResponseCode.SUCCESS) {
        throw new MQBrokerException(reply.getCode(), reply.getRemark(), addr);
    }
    return SubscriptionGroupWrapper.decode(reply.getBody(), SubscriptionGroupWrapper.class);
}
// Registers an RPC hook by forwarding it to remotingClient.registerRPCHook.
public void registerRPCHook(RPCHook rpcHook) {
remotingClient.registerRPCHook(rpcHook);
}
}
文章来源:https://blog.csdn.net/hzwangmr/article/details/134977101
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!