RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及事务消息收发、最大重试消费实战
2023-12-18 11:31:16
事务消息收发
流程如下:
- 发送给 MQ 一条任务操作
- MQ 的 Broker 成功收到后,那么发送方就开始执行原子 db 业务
- 如果执行原子 db 业务失败,并没有将执行成功状态同步给 Broker
- 那么 Broker 会去检查 db 事务是否成功,最后要么事务提交,可以被生产者消费,要么事务回滚,生产者无法消费
事务消息收发流程图如下:
事务消息收发消费者如下:
public class TransactionConsumer {
public static void main(String[] args) throws Exception {
// 1、创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TransactionConsumer");
// 2、为消费者对象设置 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 3、订阅主题
consumer.subscribe("Transaction-Test-Topic", "*");
// 4、注册监听消息,并打印消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
String printMsg = new String(msg.getBody()) + ", recvTime: "
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
System.out.println(printMsg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5、把消费者直接启动起来
consumer.start();
System.out.println("Consumer Started Finished.");
}
}
这里模拟事务成功执行的生产者,执行该生产者之后,消费者可以收到消息并消费:
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer(
"transaction_producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
/**
* 这里执行本地事务,如果本地事务执行成功,就返回成功
* 如果本地事务失败,就返回失败
*/
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 触发事务的检查,提供给到生产者一个检查事务是否成功提交的机会
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
List<Order> list = new ArrayList<>();
for (int i = 0; i < 12; i ++) {
Order order = new Order();
order.orderId = i;
order.desc = "desc:" + i;
order.tag = "tag" + i % 3;
list.add(order);
}
for (Order order : list) {
Message msg = new Message(
"Transaction-Test-Topic",
order.tag,
(order.toString()).getBytes());
msg.setKeys("Transaction_Tag");
msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));
// 直接将 msg 发送出去
producer.sendMessageInTransaction(msg, null);
}
System.out.println("Send Finished.");
}
public static class Order {
int orderId;
String desc;
String tag;
@Override
public String toString() {
return "orderId="+orderId+", desc="+desc+", tag="+tag;
}
}
}
这里模拟事务执行失败的生产者,执行该生产者之后,消费者不会收到消息:
public class TransactionProducerFail {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer(
"transaction_producer_group_fail");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
/**
* 这里执行本地事务,如果本地事务执行成功,就返回成功
* 如果本地事务失败,就返回失败
*/
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 触发事务的检查,提供给到生产者一个检查事务是否成功提交的机会
return LocalTransactionState.ROLLBACK_MESSAGE;
}
});
producer.start();
List<TransactionProducer.Order> list = new ArrayList<>();
for (int i = 0; i < 12; i ++) {
TransactionProducer.Order order = new TransactionProducer.Order();
order.orderId = i;
order.desc = "desc:" + i;
order.tag = "tag" + i % 3;
list.add(order);
}
for (TransactionProducer.Order order : list) {
Message msg = new Message(
"Transaction-Test-Topic",
order.tag,
(order.toString()).getBytes());
msg.setKeys("Transaction_Tag");
msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));
// 直接将 msg 发送出去
producer.sendMessageInTransaction(msg, null);
}
System.out.println("Send Finished.");
}
public static class Order {
int orderId;
String desc;
String tag;
@Override
public String toString() {
return "orderId="+orderId+", desc="+desc+", tag="+tag;
}
}
}
最大重试消费
重试分为两种:生产者重试、消费者重试
生产者重试设置:
-
生产者配置重试次数
// 同步 producer.setRetryTimesWhenSendFailed(3) // 异步 producer.setRetryTimesWhenSendAsyncFailed(3); // 如果发送失败,是否尝试发送到其他 Broker 节点 producer.setRetryAnotherBrokerWhenNotStoreOK(true);
-
生产者设置重试的策略
producer.addRetryResponseCode(ResponseCode.FLUSH_DISK_TIMEOUT);
?
消费者重试设置:
- 消费者有序消费时,如果消费失败,返回
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
即可 - 消费者并发消费时,如果消费失败,返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
即可
生产者代码如下(消费者代码就不贴了,只需要消费时返回需要重试的状态码即可):
public class RetryProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(
"producer_group",
true);
producer.setNamesrvAddr("127.0.0.1:9876");
// 设置一些重试的策略
producer.addRetryResponseCode(ResponseCode.FLUSH_DISK_TIMEOUT);
// 设置发送失败最大重试次数
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryTimesWhenSendAsyncFailed(3);
producer.start();
List<Order> list = new ArrayList<>();
for (int i = 0; i < 12; i ++) {
Order order = new Order();
order.orderId = i;
order.desc = "desc:" + i;
order.tag = "tag" + i % 3;
list.add(order);
}
for (Order order : list) {
Message msg = new Message(
"Filter-Test-Topic",
order.tag,
(order.toString()).getBytes());
msg.setKeys("Filter_Tag");
msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));
// 直接将 msg 发送出去
producer.send(msg);
}
System.out.println("Send Finished.");
}
public static class Order {
int orderId;
String desc;
String tag;
@Override
public String toString() {
return "orderId="+orderId+", desc="+desc+", tag="+tag;
}
}
}
文章来源:https://blog.csdn.net/qq_45260619/article/details/135052598
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!