RocketMQ 生产者源码分析:DefaultMQProducer、DefaultMQProducerImpl

2024-01-02 07:41:32

在这里插入图片描述

🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任 Java 开发,CSDN 优质创作者
📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
🌲文章所在专栏:RocketMQ
🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
💬 向我询问任何您想要的东西,ID:vnjohn
🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
😄 代词: vnjohn
? 有趣的事实:音乐、跑步、电影、游戏

目录

前言

生产者(Producer)就是消息的发送者,Apache RocketMQ 拥有丰富的消息类型,可以支持不同的应用场景,在不同的场景中,需要使用不同的消息进行发送。比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略,此时就需要用到延迟消息;电商场景中,业务上要求同一订单的消息保持严格顺序,此时就要用到顺序消息。在日志处理场景中,可以接受的比较大的发送延迟,但对吞吐量的要求很高,希望每秒能处理百万条日志,此时可以使用批量消息。在银行扣款的场景中,要保持上游的扣款操作和下游的短信通知保持一致,此时就要使用事务消息

MQProducer

MQProducer 是基于最顶层的接口,它将一些子类各自行为抽象为定义

在这里插入图片描述

方法介绍

常见的几个方法定义如下:

// 采用同步的方式发送消息,此方法仅在发送过程全部完成以后才返回
SendResult send(final Message msg);

// 采用异步的方式将其发送到 Broker 中,执行该方法会立即返回,在发送完成以后,将会执行 SellCallbak 函数的内容
void send(final Message msg, final SendCallback sendCallback);

// 此方法在返回之前不会等待broker的确认。显然,它具有最大的吞吐量,但也存在消息丢失的可能性
void sendOneway(final Message msg);

// 消息指定同一个 Message-Queue 来保证消息的顺序发送
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg);

// 该方法用于发送事务类型的消息,由 TransactionMQProducer 类具体去实现
TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg);

// 采用同步的方式,请求消息并回复消息时才返回
Message request(final Message msg, final long timeout);

// 采用异步的方式请求消息,该方法会立即返回,当收到回复消息以后,会调用 RequestCallback 函数的内容
void request(final Message msg, final RequestCallback requestCallback, final long timeout);

生产者发送消息有同步、异步、单向发送、顺序的方式,同步和异步的方式对消息来说是较为可靠的,因为它们会有发送是否成功的应答

在 send 方法处不 + SendCallback 实现,默认就是同步的发送方式

既然是投递消息,由于网络抖动或业务异常等原因,会影响到消息的正常投递结果,在 RocketMQ 内部有重投的机制来保证这种错误的发生

消息可靠性

生产者:重投保证消息可靠性

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。如下方法可以设置消息重试策略:

  1. retryTimesWhenSendFailed:同步发送失败重投次数,默认为 2,因此生产者会最多尝试发送 retryTimesWhenSendFailed + 1次。不会选择上次失败的 broker,尝试向其他 broker 发送,最大程度保证消息不丢失。超过重投次数,抛出异常,由客户端保证消息不丢。当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投

    由 RocketMQ 重投机制来保证的情况下,消息仍然是处于投递失败的情况,可能是因为 MQ 集群在此时处于一个不健康的状态下,在业务应用的场景下,当所处的业务特别之重要时,我们通常需要增加本地消息表来保证业务信息不丢失,通过定时任务的方式来扫描本地消息表确保业务消息数据处于健康态。

  2. retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他 broker,仅在同一个 broker 上做重试,不保证消息不丢失

    由于它处于异步的方式发送,在发送完成以后不做任何生产者侧状态的维护了,自然而然不会增加额外的维护成本去更换 Broker

  3. retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或 Slave 不可用(返回状态非 SEND_OK),是否尝试发送到其他broker,默认 false。十分重要消息可以开启

以上考虑的是在生产者侧的角度来看「如何保证消息不丢失」问题,追加消息准确的极致是采用同步的方式进行发送

单向发送:在日志传输的场景下特别常用

异步发送:追求的是响应时间,在一块业务上不要求 RT 太长.
顺序发送:业务上追加消息处理的顺序性

同步发送:企业围绕这块的重要业务开展工作的

在针对消息的重投机制,应用程序这侧要针对业务字段维护好消息的幂等性

Broker:配置降低吞吐保证消息可靠性

SEND_OK 消息发送成功,要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步 Master 服务器或同步刷盘,即 SYNC_MASTER 或 SYNC_FLUSH

因为在生产者投递消息到 Broker 时,Broker 使用默认的方式进行消息落盘时,运用的仍然是内核的 page cache 页缓存机制,它只有当内存的容量达到一定数量以后才会才 Dirty 刷写到磁盘中去,在此期间,发送了机器故障或机房停电等情况下,消息是有发送丢失风险的.

DefaultMQProducer

DefaultMQProducer 是作为一个生产消息的应用程序入口,上面介绍的一些相关的参数配置都在该类能够体现

属性介绍

// 生产者组
private String producerGroup;
// 创建每个默认 Topic 的队列数
private volatile int defaultTopicQueueNums = 4;
// 发送消息超时时间
private int sendMsgTimeout = 3000;
// 压缩消息体的阈值,即默认压缩大于 4K 的消息体
private int compressMsgBodyOverHowmuch = 1024 * 4;
// 在同步模式下声明发送失败之前,要在内部执行的最大重试次数
private int retryTimesWhenSendFailed = 2;
// 在异步模式下声明发送失败之前,要在内部执行的最大重试次数
private int retryTimesWhenSendAsyncFailed = 2;
// 是否在内部重试失败时重试另外一个 Broker
private boolean retryAnotherBrokerWhenNotStoreOK = false;
// 最大允许的消息大小,以字节为单位,4M
private int maxMessageSize = 1024 * 1024 * 4;
// 异步传输的数据接口
private TraceDispatcher traceDispatcher = null;


// DefaultMQProducer 类所有方法进行内部包装实现,互相持有对方的引用!!!
// DefaultMQProducer -> DefaultMQProducerImpl
// DefaultMQProducerImpl -> DefaultMQProducer
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;


// 17、14、1、16、204、205
// 当发送消息到 Broker 处时,Broker 返回的是以下这些状态码,代表是需要进行重投的!
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
  ResponseCode.TOPIC_NOT_EXIST,
  ResponseCode.SERVICE_NOT_AVAILABLE,
  ResponseCode.SYSTEM_ERROR,
  ResponseCode.NO_PERMISSION,
  ResponseCode.NO_BUYER_ID,
  ResponseCode.NOT_IN_CURRENT_UNIT
));

构造方法

对于 DefaultMQProducer 有以下几个方法的构造:

// 默认构造,默认组名:DEFAULT_PRODUCER
public DefaultMQProducer() {
  this(null, MixAll.DEFAULT_PRODUCER_GROUP, null);
}
// 构造一个指定钩子的生产者实例
// RPCHook 当执行每个远程命令调用时会触发的,可用于做一些统计指标
public DefaultMQProducer(RPCHook rpcHook) {
  this(null, MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
}

// 构造一个指定生产者组的实例
public DefaultMQProducer(final String producerGroup) {
  this(null, producerGroup, null);
}

// 构造一个指定生产者组的实例
// namespace 用于隔离生产者实例
public DefaultMQProducer(final String namespace, final String producerGroup) {
  this(namespace, producerGroup, null);
}

// 构建一个指定的命名空间、生产者组以及 Rpc 钩子
// DefaultMQProducerImpl(this:才是最重要的!!
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
  this.namespace = namespace;
  this.producerGroup = producerGroup;
  defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

参数配置

public void setProducerGroup(String producerGroup) {
  this.producerGroup = producerGroup;
}

public void setCreateTopicKey(String createTopicKey) {
  this.createTopicKey = createTopicKey;
}

public void setSendMsgTimeout(int sendMsgTimeout) {
  this.sendMsgTimeout = sendMsgTimeout;
}

public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
  this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
}

public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {
  this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
}

public void setMaxMessageSize(int maxMessageSize) {
  this.maxMessageSize = maxMessageSize;
}

public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
  this.defaultTopicQueueNums = defaultTopicQueueNums;
}

public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
  this.defaultTopicQueueNums = defaultTopicQueueNums;
}

public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
  this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}

public void setNotAvailableDuration(final long[] notAvailableDuration) {
  this.defaultMQProducerImpl.setNotAvailableDuration(notAvailableDuration);
}

public void setLatencyMax(final long[] latencyMax) {
  this.defaultMQProducerImpl.setLatencyMax(latencyMax);
}

public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) {
  this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
}
名称描述参数类型默认值
producerGroup生产组的名称,一类 Producer 的标识StringDEFAULT_PRODUCER
createTopicKey发送消息的时候,如果没有找到 Topic,若想自动创建该 Topic,需要一个 Key Topic,这个值即是 Key Topic 的值StringTopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC
defaultTopicQueueNums自动创建 Topic的话,默认 Queue 数量是多少int4
sendMsgTimeout默认的发送超时时间int默认值 3000,单位毫秒
compressMsgBody
OverHowmuch
消息 Body 需要压缩的阈值int默认值 1024*4,4K
retryTimesWhenSendFailed同步发送失败的话,RocketMQ 内部重试多少次int2
retryTimes
WhenSendAsyncFailed
异步发送失败的话,RocketMQ 内部重试多少次int2
retryAnotherBroker
WhenNotStoreOK
发送的结果如果不是 SEND_OK 状态,是否当作失败处理而尝试往其他 Broker 重发booleanfalse
maxMessageSize客户端验证,允许发送的最大消息体大小int1014 * 1024 * 4,4M
traceDispatcher异步传输数据接口TraceDispatcher

latencyMax、notAvailableDuration:用于设置容错策略不可用时每个级别对应的退避时长,在发送消息时计算发送成功的前后时间然后对比策略时长,围绕的是如下数组从后到前遍历的,比如:在往 Broker-A 同步投递消息时,发送消息的时长为 3000 毫秒 3 秒,那么下次我逃避它不再往这个 Broker 上投递了,先退避 180000 毫秒 3 分钟

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
// notAvailableDuration 参数调整的就是这个退避时长,每一个元素{可}以 latencyMax 下标一一对应
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

retryTimesWhenSendAsyncFailed:在异步模式下声明发送失败之前,要在内部执行的最大重试次数

isAutoCreateTopicEnable:生产环境强烈建议管理所有主题的生命周期,关闭自动创建参数,以避免生产集群出现大量无效主题,无法管理和回收,造成集群注册压力增大,影响生产集群的稳定性。

DefaultMQProducerImpl

MQProducer 与 DefaultMQProducerImpl 是互相持有的一个关系,DefaultMQProducerImpl 的实例化工作交由给 MQProducer 来完成,在这里又做了一层抽象,DefaultMQProducer 的方法定义来自于 MQProducer,DefaultMQProducer 的实现来自于 DefaultMQProducerImpl

属性介绍

下面,我们来介绍 DefaultMQProducerImpl 类下具体的一些属性,如下:

// 每次发送消息都会生成一个随机数,记录日志
private final Random random = new Random();
// 持有来自实例化当前类的引用
private final DefaultMQProducer defaultMQProducer;
// 每个 Topic 所持有的 MessageQueue 以及 Broker 元数据信息
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
  new ConcurrentHashMap<String, TopicPublishInfo>();
// 来自发送普通消息前、后要处理的钩子
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>();
// 执行远程调用命令触发前后的钩子
private final RPCHook rpcHook;
// 通过异步方式发送普通消息时采用的线程池以及阻塞队列
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
// 通过异步方式检查本地事务消息时采用的线程池以及阻塞队列
protected BlockingQueue<Runnable> checkRequestQueue;
protected ExecutorService checkExecutor;
// 维护生产者服务的状态
private ServiceState serviceState = ServiceState.CREATE_JUST;
// 生产者、消费者都会维护一个这样的客户端示例,来与 broker 做消息的处理
private MQClientInstance mQClientFactory;
private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
// MQ 提供的容错机制,针对 sendLatencyFaultEnable 参数是否开启故障转移的功能
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
// defaultAsyncSenderExecutor | asyncSenderExecutor 在发送消息采用的那个线程池
private ExecutorService asyncSenderExecutor;

介绍一些核心的属性,在生产者这一侧做了一些特别的处理:

  1. topicPublishInfoTable 属性:维护的是一个 Map 集合,Key -> Topic、Value -> Topic MessageQueue 以及 Broker 元数据信息,注意的是,它不是在一开始实例化时将所有 Topic 信息都加载好的,而是采用延迟的方式在启动生产者客户端实例通过定时线程去加载的,核心方法是:tryToFindTopicPublishInfo

  2. defaultAsyncSenderExecutor | asyncSenderExecutor、asyncSenderThreadPoolQueue:在这里有两个线程池,二选一的决策,我们在构建生产者实例时是可以自定义一个线程池「分配资源」去投递消息的,自定义的线程池说的就是 asyncSenderExecutor,未自定义线程池时用的就是 defaultAsyncSenderExecutor,配合使用的阻塞队列就是 asyncSenderThreadPoolQueue

    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
      Runtime.getRuntime().availableProcessors(),
      Runtime.getRuntime().availableProcessors(),
      1000 * 60,
      TimeUnit.MILLISECONDS,
      this.asyncSenderThreadPoolQueue,
      new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);
    
        @Override
        public Thread newThread(Runnable r) {
          return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
        }
      });
    
  3. checkExecutor、checkRequestQueue:该线程池是用于处理事务型消息的,检查本地事务消息的方法是采用异步回调的方式,也是支持自定义线程池的,若不指定,默认采用的是单线程的方式去检查本地事务消息状态的
    在这里插入图片描述

  4. mQClientFactory:在每个应用程序内,都是单个实例的存在,无论是生产者端还是消费者端,都会有这么一个类去充当 RocketMQ 的客户端角色,它用来与服务端 Broker 交互,在它内部有一个 MQClientAPIImpl 客户端 API 类,它用来发起远程调用请求以及响应请求的,具体的核心作用我们后续再介绍

  5. mqFaultStrategy:MQ 提供的容错机制,在生产者投递消息时做故障转移以及降级作用的

在这里介绍一个知识点,在 RocketMQ 无论是生产消息还是消费消息都是采用负载均衡的方式去进行处理的,针对的其实就是 TopicPublishInfo 的处理,Producer 生产者端在发送消息的时候,会先根据 Topic 找到指定的 TopicPublishInfo,在获取了 TopicPublishInfo 路由信息后,RocketMQ 的客户端在默认方式下 MQFaultStrategy#selectOneMessageQueue 方法会从 TopicPublishInfo 中 messageQueueList 中选择一个队列(MessageQueue)进行发送消息,同时在这里结合了容错策略的处理是以上介绍的另外一个属性:mqFaultStrategy

具体的容错策略均在 MQFaultStrategy 这个类中定义。这里有一个 sendLatencyFaultEnable 开关变量,如果开启,在随机递增取模的基础上,再过滤掉 not available 的 Broker 代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的 latency 超过 550L ms,就退避 30000Lms;超过 1000L,就退避 60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance 机制的开启是实现消息发送高可用的核心关键所

在一个 Topic 下有多个 MessageQueue,在集群模式下, 一组 MessageQueue 是分散在多个 Broker 节点下的

在这里插入图片描述

构造方法

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
  this(defaultMQProducer, null);
}

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
  this.defaultMQProducer = defaultMQProducer;
  this.rpcHook = rpcHook;

  this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
  this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors(),
    Runtime.getRuntime().availableProcessors(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.asyncSenderThreadPoolQueue,
    new ThreadFactory() {
      private AtomicInteger threadIndex = new AtomicInteger(0);

      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
      }
    });
}

在它的构造方法中,只是将投递异步消息默认的线程池构造了出来,并未做其他的处理,

ClientConfig

客户端常用配置:适用于生产者、消费者

DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer 都继承于 ClientConfig 类,ClientConfig 为客户端的公共配置类。客户端的配置都是 getter、setter 形式,每个参数都可以用 Spring 来配置,也可以在代码中配置,例如 namesrvAddr 这个参数可以这样配置,producer.setNamesrvAddr(“192.168.0.1:9876”),其他参数同理

名称描述参数类型默认值
namesrvAddrNameServer的地址列表String从 -D 系统参数 rocketmq.namesrv.addr 或环境变量
instanceName客户端实例名称String从 -D 系统参数 rocketmq.client.name 获取,否则就是 DEFAULT
clientIP客户端IPStringRemotingUtil.getLocalAddress()
namespace客户端命名空间String
accessChannel设置访问通道AccessChannelLOCAL
clientCallbackExecutorThreads客户端通信层接收到网络请求的时候,处理器的核数intRuntime.getRuntime().availableProcessors()
pollNameServerInterval轮询从 NameServer 获取路由信息的时间间隔int默认值 30000,单位毫秒
heartbeatBrokerInterval定期发送注册心跳到 broker 的间隔int默认值 30000,单位毫秒
persistConsumerOffsetInterval作用于 Consumer,持久化消费进度的间隔int默认值 5000,单位毫秒
pullTimeDelayMillsWhenException拉取消息出现异常的延迟时间设置long默认值 1000,单位毫秒
vipChannelEnabled是否启用vip netty通道以发送消息boolean从 -D com.rocketmq.sendMessageWithVIPChannel 参数的值,若无则是 true
mqClientApiTimeoutMQ 客户端 API 超时设置默认值 3000,单位毫秒

总结

该篇博文主要介绍生产者侧相关的类:MQProducer 能力定义者、DefaultMQProducer 能力抽象者、DefaultMQProducerImpl 能力实现者、ClientConfig RocketMQ 客户端公共的配置类,主要围绕它们内部的参数配置以及属性先初步开展介绍了一下,后续主要在启动 Producer 以及通过 Producer 发送消息时,在生产者这一侧处理的工作,希望该篇博文你能够喜欢,感谢三连支持??

🌟🌟🌟愿你我都能够在寒冬中相互取暖,互相成长,只有不断积累、沉淀自己,后面有机会自然能破冰而行!

博文放在 RocketMQ 专栏里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注?? + 点赞👍 + 收藏?」就是我创作的最大动力!谢谢大家的支持,我们下文见!

文章来源:https://blog.csdn.net/vnjohn/article/details/135331910
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。