【RocketMQ每日一问】RocketMQ消息追踪如何实现的?
1.概述
RocketMQ 的消息追踪功能是在 4.3.0 版本中引入的。在这个版本中,RocketMQ 引入了一个名为 Tracing 的模块,用于实现消息追踪功能。
Tracing 模块基于 Opentracing 标准实现,提供消息追踪的功能,可以帮助用户追踪消息的生产和消费过程,以及定位潜在的性能问题和错误。
Tracing 模块包括生产者追踪、消费者追踪和消息事务追踪三种模式。生产者追踪可以追踪消息的发送过程,消费者追踪可以追踪消息的消费过程,消息事务追踪可以追踪消息事务的执行过程。
在消息追踪功能中,消息会被标记上一个 Trace ID,这个 Trace ID 可以跨越整个消息的生产和消费过程,从而实现对消息的全链路追踪和监控。同时,RocketMQ 还提供了相关的工具和接口,方便用户对消息追踪数据进行收集和分析。
2.生产trace
- 生产者开启消息追踪
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true);
-
开启之后会注册两个hook,一个正常发消息的,一个是事务消息的
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook); dispatcher.setHostProducer(this.defaultMQProducerImpl); traceDispatcher = dispatcher; this.defaultMQProducerImpl.registerSendMessageHook( new SendMessageTraceHookImpl(traceDispatcher)); this.defaultMQProducerImpl.registerEndTransactionHook( new EndTransactionTraceHookImpl(traceDispatcher));
-
org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl这个里面记录了追踪消息的格式,并且发送完成消息之后调用hook,将追踪消息记录到一个traceContextQueue中
-
然后有任务拿出来,达到4M之后AsyncDataSendTask异步发送到默认追踪topic中RMQ_SYS_TRACE_TOPIC
追踪消息的格式
?org.apache.rocketmq.client.trace.TraceContext
3.消费trace
- 同样的在消费者启动的时候注册了一个hookorg.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl#ConsumeMessageTraceHookImpl
- 同样的在ConsumeMessageTraceHookImpl类中构造出追踪context,不同的是contextType为ConsumeContextType
- 然后放入traceContextQueue中
- 然后有任务拿出来,达到4M之后AsyncDataSendTask异步发送到默认追踪topic中RMQ_SYS_TRACE_TOPIC
4.生产者追踪消息和消费者的追踪消息如何关联起来
在 RocketMQ 中,生产者发送的追踪消息和消费者发送的追踪消息通过msgId 属性值进行关联。在生产者发送消息时,生成一个唯一的 msgId,并将其作为 Trace 数据中的 msgId属性值保存到消息中。在消费者消费消息时,消费者会从消息中取出 msgId属性值,并将其作为 msgId属性 值,用于查询和匹配 Trace 数据。通过 msgId 属性值,可以将生产者发送的追踪消息和消费者发送的追踪消息进行关联,实现对消息全链路的追踪和监控。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!