RocketMq实战
目录
Mq生产消费架构
- 生产者设置nameServer地址, 往topic发信息
- mq服务启动topic时候,自动给每一个topic创建MessageQueue,默认1:2
- 默认Push模式推送消息, MessageQueue把消息推送给消费者, 一个MessageQueue只能绑定一个消费实例,一个消费实例能对应多个MessageQueue, 也是保证顺序消费的基础??
生产者
发送消息固定步骤
1.创建消息生产者producer,并指定生产者组名
2.指定Nameserver地址, 也可配置环境变量NAMESRV_ADDR
3.启动producer。 可以认为这是消息生产者与服务端建立连接的过程。
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer,释放资源
发送模式
1. 单向发送
// 发送单向消息
producer.sendOneway(buildOneMessage());
2. 同步发送
// 同步发送消息
SendResult sendResult = producer.send(buildOneMessage());
System.out.println("同步发送消息结果:" + sendResult);
3. 异步发送
// 异步发送消息, 发送完回调, 注意:回调完之前, producer不能关闭
producer.send(buildOneMessage(), new SendCallback() {
// 成功回调
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步发送消息结果:" + sendResult);
}
// broker处理失败回调, 超时回调, 进入onException也可能发送下游成功, 做好幂等性
@Override
public void onException(Throwable throwable) {
System.out.println("异步发送消息出现异常:" + throwable.getMessage());
}
});
生产消息完整代码
public class ProducerTest {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("192.168.6.128:9876");
// 启动生产者
producer.start();
// 发送单向消息
producer.sendOneway(buildOneMessage());
System.out.println("单向发送消息, 不知道结果");
// 同步发送消息
SendResult sendResult = producer.send(buildOneMessage());
System.out.println("同步发送消息结果:" + sendResult);
// 异步发送消息, 发送完回调, 注意:回调完之前, producer不能关闭
producer.send(buildOneMessage(), new SendCallback() {
// 成功回调
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步发送消息结果:" + sendResult);
}
// broker处理失败回调, 超时回调, 进入onException也可能发送下游成功, 做好幂等性
@Override
public void onException(Throwable throwable) {
System.out.println("异步发送消息出现异常:" + throwable.getMessage());
}
});
// producer.shutdown();
}
private static Message buildOneMessage() {
return new Message("kk_test_topic", "tagA", ("hello MQ" + new Random().nextInt(100)).getBytes());
}
}
消费者
消费消息固定步骤
1.创建消费者Consumer,必须指定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer。消费者会一直挂起,持续处理消息。
简单消费代码示例
public class SimpleConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("kk_consumer_group1");
consumer.setNamesrvAddr("192.168.6.128:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("kk_test_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println("收到消息:" + new String(messageExt.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("consumer start ...");
}
}
消费确认机制
消息消费状态有以下两种:
-
CONSUME_SUCCESS: 消费成功
-
RECONSUME_LATER: 等会再来消费, 默认最多重试16次.
如果消费者没有处理成功,返回的是RECONSUME_LATER,Broker就会过一段时间再发起消息重试。
重试逻辑:
????????每一个消费者组都会自动建一个重试队列, 如果消费者返回RECONSUME_LATER状态, 就会把消息转发到重试Topic,这样设计就可以不阻塞原有的messageQueue.如果一个消息经过多次(默认16)重试还无法正常处理, 就会把消息转发到对应的死信Topic.这时候, 可以对死信Topic的消息进行人工处理, 消息补偿或者直接删除.
消息模型
广播消息
消费者端可以设置两种消息模式, 广播模式和集群模式(默认)
- 集群模式: 同一个消费者群组下, 一条消息只能被该消费者群组下面其中一个消费者消费
- 广播模式: 会忽略消费者群组, 一条消息会被所有消费者消费
消费模式 | 集群模式 | 广播模式 |
---|---|---|
设置方式 | 默认 | MessageModel.BROADCASTING |
消费次数 | 每个消费者群组只会有一个实例消费 | 忽略群组概念,所有消费者实例都会消费 |
维护Offset | broker | 消费者自己维护,中途宕机会丢失Offset |
设置广播模式代码:
consumer.setMessageModel(MessageModel.BROADCASTING);
顺序消息
实现原理:?
- 生产者将一批消息发送到同一个messageQueue, 就能保证这批消息发送有序.
- 消费者一次锁定一个MessageQueue消息, 保证消费局部有序.
生产者代码:
@Test
public void sendOrderMessage() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("PG_kk_test_order");
producer.setNamesrvAddr("192.168.6.128:9876");
// 启动生产者
producer.start();
// 创建5个订单
for (int orderId = 0; orderId < 5; orderId++) {
// 每个订单都要顺序经历0,1,2,3,4这几个步骤
for (int i = 0; i < 5; i++) {
Message message = new Message("MQ_kk_test_order", "tagA", ("开始下单, order = " + orderId + ", step =" + i)
.getBytes());
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> queues, Message message, Object o) {
// 将相同orderId的消息放入同一个MessageQueue
int orderId = (int) o;
return queues.get(orderId % queues.size());
}
}, orderId);
System.out.println(sendResult);
}
}
producer.shutdown();
}
消费者代码:
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CG_kk_test_order");
consumer.setNamesrvAddr("192.168.6.128:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("MQ_kk_test_order", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt messageExt : list) {
System.out.println(" 收到消息:" + new String(messageExt.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("consumer start ...");
}
}
结果:
注意:
- 只能保证局部有序, 不能保证全局有序.如果要做全局有序, 那就只用一个MessageQueue,但是性能会下降
- 生产者应尽可能讲不同批次的有序消息打散到不同的MessageQueue, 避免queue排队竞争
- 消费者只能用同步的方式处理消息,不要使用异步处理。更不能自行使用批量处理
- 如果一条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进行重试。但是,如果消费者一直处理失败,超过最大重试次数,那么RocketMQ就会跳过这一条消息,处理后面的消息,这会造成消息乱序
- 消费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT替代
延迟消息
?延迟消息是指消息发送到Apache RocketMQ后,并不会马上投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。
生产者核心代码:
// 给消息设置延时级别
message.setDelayTimeLevel(5);
默认18个延时级别, 也可修改默认配置
实现原理:?
????????RocketMQ的实现方式是预设一个系统Topic,名字叫做SCHEDULE_TOPIC_XXXX。在这个Topic下,预设18个延迟queue,会把延时消息转发到SCHEDULE_TOPIC。然后每次只针对这18个队列里的消息进行延迟操作,这样就不用一直定时扫描所有TOPIC。
批量消息
将多条消息合并成一个批量消息,一次性发送出去。这样可以减少网络IO,提升消息发送的吞吐量
生产者代码:
@Test
public void sendBatchMessage() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("PG_kk_test_order");
producer.setNamesrvAddr("192.168.6.128:9876");
// 启动生产者
producer.start();
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Message message = new Message("MQ_kk_test_order", "tagA", ("hello, MQ i = " + i)
.getBytes());
messages.add(message);
}
// 一次发送多条消息
SendResult sendResult = producer.send(messages);
System.out.println(sendResult);
producer.shutdown();
}
过滤消息
简单过滤
生产者发送消息带上TAG标识
Message message = new Message("test_topic", "tagA", "hello".getBytes());
SendResult sendResult = producer.send(message);
消费者订阅指定TAG消息
consumer.subscribe("test_topic", "tagA");
SQL过滤
首先去bin/broker.conf文件打开配置,
1. 修改选择消息模式为 SelectorType.SQL92,默认模式是SelectorType.TAG。
selectorType=SelectorType.SQL92
2. 设置允许属性过滤?
enablePropertyFilter=true
?不设置会报错
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at org.apache.rocketmq.client.impl.MQClientAPIImpl.checkClientInBroker(MQClientAPIImpl.java:2315)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.checkClientInBroker(MQClientInstance.java:450)
at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:665)
at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:707)
at com.kk.mq.demo.simple.SqlFilterConsumer.main(SqlFilterConsumer.java:32)
生产者代码
@Test
public void sendSqlMessage() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("PG_SqlFilterTest2");
producer.setNamesrvAddr("192.168.6.128:9876");
// 启动生产者
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 15; i++) {
Message msg = new Message("SqlFilterTest2",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes()
);
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
消费者代码
public class SqlFilterConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CG_SqlFilterTest1");
consumer.setNamesrvAddr("192.168.6.128:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("test_sql_topic",
MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA', 'tagB'))" +
"and (a is not null and a between 0 and 5)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println("收到消息:" + new String(messageExt.getBody()) + "\n" + messageExt.getProperties() + "\n");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("consumer start ...");
}
}
事务消息
作用
????????和传统的分布式事务不同, 只保证生产者端事务一致, 消费者端自己保证
代码示例
1. 生产者发送消息代码
@Test
public void sendTransactionMessage() throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.6.128:9876");
producer.setSendMsgTimeout(8000);
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
// 依次发送10条消息, 每个Tag分别发两次
Message msg =
new Message("TopicTest_Transaction", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
2. 定义生产者事务监听器
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tags = msg.getTags();
if(StringUtils.contains(tags,"TagA")){
return LocalTransactionState.COMMIT_MESSAGE;
}else if(StringUtils.contains(tags,"TagB")){
return LocalTransactionState.ROLLBACK_MESSAGE;
}else{
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String tags = msg.getTags();
if(StringUtils.contains(tags,"TagC")){
return LocalTransactionState.COMMIT_MESSAGE;
}else if(StringUtils.contains(tags,"TagD")){
return LocalTransactionState.ROLLBACK_MESSAGE;
}else{
return LocalTransactionState.UNKNOW;
}
}
}
3. 消费者代码, 正常消费逻辑
public class SimpleConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CG_TopicTest_Transaction");
consumer.setNamesrvAddr("192.168.6.128:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest_Transaction", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
// System.out.println("收到消息:" + new String(messageExt.getBody()));
System.out.println(new Date() + " 收到消息:" + messageExt);
}
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("consumer start ...");
}
}
消费者运行结果:
可以看到消费者先收到两个TagA的消息, 过一分钟后收到两个TagC消息.类似于两阶段提交.
两阶段原理?
- 第一阶段, 生产者消息发送完, 先回调executeLocalTransaction方法, 检查状态
- 如果状态是COMMIT, 消息直接推送给消费者
- 如果状态是ROLLBACK, 消息丢弃, 生产者端回滚事务
- 如果状态是UNKNOW, 过一分钟(可配置)再来检查状态
- 如果1阶段返回是UNKNOW, 进入第二阶段, 调checkLocalTransaction方法,再次检查状态
- 如果状态是COMMIT, 消息直接推送给消费者
- 如果状态是ROLLBACK, 消息丢弃, 生产者端回滚事务
- 如果状态是UNKNOW, 过一分钟(可配置)再来检查状态, 递归调自己.最大重试15次(可配置)
broker配置项,配置间隔时间和重试次数, 应基于自身业务, 配置是broker级别.
?
# 最大重试次数
transactionCheckMax=15
# 重试间隔, 60秒
transactionCheckInterval=60000
RocketMq集成SpringBoot
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!