RocketMQ与SpringBoot实际项目中使用
2023-12-25 21:51:15
消息生产者
1)添加依赖
<!-- Inherit build configuration from the Spring Boot parent POM (2.0.1.RELEASE). -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<properties>
<!-- Version of the RocketMQ starter; referenced by the dependency declaration below. -->
<rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
</properties>
<dependencies>
<!-- RocketMQ Spring Boot starter; its version is taken from the property defined above. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter-version}</version>
</dependency>
<!-- Lombok, pinned to 1.18.6 (the listener classes in this article use @Slf4j). -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
<!-- Spring Boot test starter, test scope only; no version is declared here
     (presumably managed by the parent POM - confirm). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2)配置文件
# application.properties
# RocketMQ name servers: two host:port entries separated by ';'.
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
# Producer group name for the RocketMQ producer.
rocketmq.producer.group=my-group
3)测试类
/**
 * Spring Boot test that pushes a single text message through the injected
 * {@code RocketMQTemplate}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MQSpringBootApplication.class})
public class ProducerTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /** Sends one fixed string payload to the "springboot-mq" destination. */
    @Test
    public void test1() {
        final String destination = "springboot-mq";
        final String payload = "hello springboot rocketmq";
        rocketMQTemplate.convertAndSend(destination, payload);
    }
}
消息消费者
1)添加依赖
同消息生产者
2)配置文件
同消息生产者
3)消息监听器
/**
 * Listener for the "springboot-mq" topic, registered under the consumer group
 * "springboot-mq-consumer-1". Every received payload is written to the log.
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "springboot-mq-consumer-1", topic = "springboot-mq")
public class Consumer implements RocketMQListener<String> {

    /**
     * Logs the received message text at INFO level.
     *
     * @param message the message payload as a string
     */
    @Override
    public void onMessage(String message) {
        final String line = "Receive message:" + message;
        log.info(line);
    }
}
失败补偿机制
1 消息发送方
消息发送方
- 配置RocketMQ属性值
# RocketMQ name servers: two host:port entries separated by ';'.
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
# Producer group name for the order service producer.
rocketmq.producer.group=orderProducerGroup
# Application-defined keys below are read through ${...} placeholders in the Java snippets.
# Consumer group name used by the cancel-order listeners.
mq.order.consumer.group.name=order_orderTopic_cancel_group
# Topic the order messages are sent to.
mq.order.topic=orderTopic
# Tag for order-confirm messages (not referenced by the code shown in this article).
mq.order.tag.confirm=order_confirm
# Tag attached to cancel-order messages.
mq.order.tag.cancel=order_cancel
- 注入模板类和属性值信息
// Injected RocketMQ template; sendMessage() obtains the producer from it.
@Autowired
private RocketMQTemplate rocketMQTemplate;
// Topic name, injected from the mq.order.topic property.
@Value("${mq.order.topic}")
private String topic;
// Tag for cancel-order messages, injected from the mq.order.tag.cancel property.
@Value("${mq.order.tag.cancel}")
private String cancelTag;
- 发送下单失败消息
/**
 * Confirms an order. If any confirmation step throws, a cancel-order message carrying the
 * order's ids, goods quantity and paid amount is sent so the earlier steps can be compensated.
 *
 * @param order the order being confirmed
 * @return the SHOP_SUCCESS result when confirmation completes, otherwise the SHOP_FAIL result
 */
@Override
public Result confirmOrder(TradeOrder order) {
    //1. Validate the order
    //2. Create the pre-order
    try {
        //3. Deduct stock
        //4. Deduct coupon
        //5. Use balance
        //6. Confirm the order

        // Success path: the method must return a value when no step fails.
        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
    } catch (Exception e) {
        // Keep the root cause visible instead of dropping it silently.
        e.printStackTrace();
        // Confirmation failed: build the compensation message.
        CancelOrderMQ cancelOrderMQ = new CancelOrderMQ();
        cancelOrderMQ.setOrderId(order.getOrderId());
        cancelOrderMQ.setCouponId(order.getCouponId());
        cancelOrderMQ.setGoodsId(order.getGoodsId());
        cancelOrderMQ.setGoodsNumber(order.getGoodsNumber());
        cancelOrderMQ.setUserId(order.getUserId());
        cancelOrderMQ.setUserMoney(order.getMoneyPaid());
        try {
            // The order id doubles as the message key.
            sendMessage(topic,
                    cancelTag,
                    cancelOrderMQ.getOrderId().toString(),
                    JSON.toJSONString(cancelOrderMQ));
        } catch (Exception e1) {
            e1.printStackTrace();
            // NOTE(review): CastException.cast presumably throws a business exception - confirm.
            CastException.cast(ShopCode.SHOP_MQ_SEND_MESSAGE_FAIL);
        }
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
}
/**
 * Builds a RocketMQ message from the given parts and sends it through the template's producer.
 *
 * @param topic destination topic; an empty value is reported via {@code CastException.cast}
 * @param tags  message tag
 * @param keys  message key
 * @param body  message payload text; an empty value is reported via {@code CastException.cast}
 * @throws Exception if the producer fails to send the message
 */
private void sendMessage(String topic, String tags, String keys, String body) throws Exception {
    // Reject an empty topic.
    if (StringUtils.isEmpty(topic)) {
        CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
    }
    // Reject an empty message body.
    if (StringUtils.isEmpty(body)) {
        CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
    }
    // Encode explicitly as UTF-8: the consumers decode the body with "UTF-8", whereas the
    // no-argument getBytes() would use the platform default charset.
    Message message = new Message(topic, tags, keys,
            body.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    // Send the message.
    rocketMQTemplate.getProducer().send(message);
}
2 消息接收方
- 配置RocketMQ属性值
# RocketMQ name servers: two host:port entries separated by ';'.
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
# Consumer group name; resolved by the listener's ${mq.order.consumer.group.name} placeholder.
mq.order.consumer.group.name=order_orderTopic_cancel_group
# Topic to subscribe to; resolved by the listener's ${mq.order.topic} placeholder.
mq.order.topic=orderTopic
- 创建监听类,消费消息
/**
 * Skeleton of a listener for the order topic. Topic and consumer group are resolved from the
 * mq.order.topic and mq.order.consumer.group.name properties, and the message model is set to
 * BROADCASTING.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",
consumerGroup = "${mq.order.consumer.group.name}",
messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{
/**
 * Called for each received message; receives the full MessageExt rather than a converted payload.
 */
@Override
public void onMessage(MessageExt messageExt) {
// Placeholder: the handling logic is elided in this snippet.
...
}
}
2.1)回退库存
- 消息消费者
/**
 * Consumes messages from the order topic and puts the goods quantity of a failed order back
 * into stock.
 *
 * <p>Processing is recorded in the consumer-log table, looked up by message tag, message key
 * and consumer group. A message that was already handled, or is currently being handled, is
 * skipped; a failed message is not processed again once its attempt count exceeds 3.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{

    /** Consumer group name; part of the consumer-log lookup key. */
    @Value("${mq.order.consumer.group.name}")
    private String groupName;

    @Autowired
    private TradeGoodsMapper goodsMapper;

    @Autowired
    private TradeMqConsumerLogMapper mqConsumerLogMapper;

    @Autowired
    private TradeGoodsNumberLogMapper goodsNumberLogMapper;

    /**
     * Restores stock for the order described by the message body, guarding against repeated
     * processing through the consumer-log record.
     *
     * @param messageExt the received message; its body is decoded as UTF-8 JSON
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        // Declared outside the try block so the failure handler can still record them.
        String msgId=null;
        String tags=null;
        String keys=null;
        String body=null;
        try {
            //1. Parse the message
            msgId = messageExt.getMsgId();
            tags= messageExt.getTags();
            keys= messageExt.getKeys();
            body= new String(messageExt.getBody(),"UTF-8");
            log.info("接受消息成功");

            //2. Look up the consumption record for this message
            TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
            primaryKey.setMsgTag(tags);
            primaryKey.setMsgKey(keys);
            primaryKey.setGroupName(groupName);
            TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);

            if(mqConsumerLog!=null){
                //3. The message has been seen before: decide by its recorded status
                Integer status = mqConsumerLog.getConsumerStatus();
                // Already processed successfully: nothing to do
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue()==status.intValue()){
                    log.info("消息:"+msgId+",已经处理过");
                    return;
                }
                // Currently being processed: nothing to do
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue()==status.intValue()){
                    log.info("消息:"+msgId+",正在处理");
                    return;
                }
                // Previously failed: retry unless the attempt limit is exceeded
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue()==status.intValue()){
                    Integer times = mqConsumerLog.getConsumerTimes();
                    if(times>3){
                        log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了");
                        return;
                    }
                    mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
                    // Conditional update: only succeeds while the attempt count is unchanged
                    TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();
                    TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();
                    criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());
                    criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());
                    criteria.andGroupNameEqualTo(groupName);
                    criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());
                    int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example);
                    if(r<=0){
                        // The record was not claimed (modified concurrently): stop here so the
                        // stock is not rolled back by two handlers.
                        log.info("并发修改,稍后处理");
                        return;
                    }
                }
            }else{
                //4. First time this message is seen: record it as "processing"
                mqConsumerLog = new TradeMqConsumerLog();
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgKey(keys);
                // The group name is part of the lookup key, so the stored row must carry it.
                mqConsumerLog.setGroupName(groupName);
                mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setMsgId(msgId);
                mqConsumerLog.setConsumerTimes(0);
                mqConsumerLogMapper.insert(mqConsumerLog);
            }

            //5. Roll back the stock
            // NOTE(review): the sender in this article serializes a CancelOrderMQ with a
            // goodsNumber property; confirm MQEntity.getGoodsNum() maps to the same JSON field.
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            Long goodsId = mqEntity.getGoodsId();
            TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);
            if(goods==null){
                // Fail with a descriptive error instead of a bare NullPointerException.
                throw new IllegalStateException("goods not found, goodsId=" + goodsId);
            }
            goods.setGoodsNumber(goods.getGoodsNumber()+mqEntity.getGoodsNum());
            goodsMapper.updateByPrimaryKey(goods);

            // Record the stock operation
            TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
            goodsNumberLog.setOrderId(mqEntity.getOrderId());
            goodsNumberLog.setGoodsId(goodsId);
            goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());
            goodsNumberLog.setLogTime(new Date());
            goodsNumberLogMapper.insert(goodsNumberLog);

            //6. Mark the message as successfully processed
            mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());
            mqConsumerLog.setConsumerTimestamp(new Date());
            mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);
            log.info("回退库存成功");
        } catch (Exception e) {
            // Log through the class logger so the stack trace reaches the application log.
            log.error("回退库存失败", e);
            TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
            primaryKey.setMsgTag(tags);
            primaryKey.setMsgKey(keys);
            primaryKey.setGroupName(groupName);
            TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
            if(mqConsumerLog==null){
                // No record yet: store the failure with a first attempt count
                mqConsumerLog = new TradeMqConsumerLog();
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgKey(keys);
                mqConsumerLog.setGroupName(groupName);
                mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setMsgId(msgId);
                mqConsumerLog.setConsumerTimes(1);
                mqConsumerLogMapper.insert(mqConsumerLog);
            }else{
                // Mark the record as failed; otherwise it stays "processing" and the retry
                // branch above can never be reached for it.
                // NOTE(review): steps 5-6 are not wrapped in a transaction here, so a retry
                // after a partial failure may repeat the stock update - confirm upstream.
                mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());
                mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1);
                mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);
            }
        }
    }
}
2.2)回退优惠券
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{
@Autowired
private TradeCouponMapper couponMapper;
@Override
public void onMessage(MessageExt message) {
try {
//1. 解析消息内容
String body = new String(message.getBody(), "UTF-8");
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
log.info("接收到消息");
//2. 查询优惠券信息
TradeCoupon coupon = couponMapper.selectByPrimaryKey(mqEntity.getCouponId());
//3.更改优惠券状态
coupon.setUsedTime(null);
coupon.setIsUsed(ShopCode.SHOP_COUPON_UNUSED.getCode());
coupon.setOrderId(null);
couponMapper.updateByPrimaryKey(coupon);
log.info("回退优惠券成功");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
log.error("回退优惠券失败");
}
}
}
2.3)回退余额
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{
@Autowired
private IUserService userService;
@Override
public void onMessage(MessageExt messageExt) {
try {
//1.解析消息
String body = new String(messageExt.getBody(), "UTF-8");
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
log.info("接收到消息");
if(mqEntity.getUserMoney()!=null && mqEntity.getUserMoney().compareTo(BigDecimal.ZERO)>0){
//2.调用业务层,进行余额修改
TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
userMoneyLog.setUseMoney(mqEntity.getUserMoney());
userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
userMoneyLog.setUserId(mqEntity.getUserId());
userMoneyLog.setOrderId(mqEntity.getOrderId());
userService.updateMoneyPaid(userMoneyLog);
log.info("余额回退成功");
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
log.error("余额回退失败");
}
}
}
2.4)取消订单
/**
 * Marks the order referenced by a cancel-order message as cancelled.
 *
 * @param messageExt the received message; its body is decoded as UTF-8 JSON
 */
@Override
public void onMessage(MessageExt messageExt) {
    // Decoding with a Charset constant cannot throw the checked UnsupportedEncodingException,
    // which this method is not able to declare.
    String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    log.info("CancelOrderProcessor receive message:"+messageExt);
    CancelOrderMQ cancelOrderMQ = JSON.parseObject(body, CancelOrderMQ.class);
    TradeOrder order = orderService.findOne(cancelOrderMQ.getOrderId());
    if (order == null) {
        // Nothing to cancel; avoid a NullPointerException on the status update below.
        log.warn("订单:["+cancelOrderMQ.getOrderId()+"]不存在,无法取消");
        return;
    }
    order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
    orderService.changeOrderStatus(order);
    log.info("订单:["+order.getOrderId()+"]状态设置为取消");
    // The method is void, so no value is returned.
}
线程池优化消息发送逻辑
- 创建线程池对象
/**
 * Creates the task executor bean: 4 core threads, at most 8 threads, a queue of 100 tasks,
 * 60 seconds keep-alive, thread names starting with "Pool-A", and the caller-runs policy
 * for rejected tasks.
 *
 * @return the initialized executor
 */
@Bean
public ThreadPoolTaskExecutor getThreadPool() {
    final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

    // Naming and rejection behaviour
    pool.setThreadNamePrefix("Pool-A");
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    // Sizing
    pool.setCorePoolSize(4);
    pool.setMaxPoolSize(8);
    pool.setQueueCapacity(100);
    pool.setKeepAliveSeconds(60);

    pool.initialize();
    return pool;
}
- 使用线程池
// Injected task executor; callbackPayment() submits the message-sending task to it.
@Autowired
private ThreadPoolTaskExecutor executorService;
/**
 * Handles a payment callback. When the callback reports the paid status, the stored payment
 * is marked as paid, the outgoing message is saved to the producer temp table, and the
 * message is sent on the executor; the temp row is deleted once the send is confirmed.
 *
 * @param tradePay the payment data from the callback
 * @return the SHOP_SUCCESS result
 */
public Result callbackPayment(TradePay tradePay) {
    if (tradePay.getIsPaid().equals(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode())) {
        // Reload the stored payment by id and flag it as paid.
        tradePay = tradePayMapper.selectByPrimaryKey(tradePay.getPayId());
        if (tradePay == null) {
            // NOTE(review): CastException.cast presumably throws - confirm.
            CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
        }
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        int i = tradePayMapper.updateByPrimaryKeySelective(tradePay);
        // Exactly one updated row means the payment was recorded.
        if (i == 1) {
            // Persist the message first so it is not lost if sending fails.
            TradeMqProducerTemp mqProducerTemp = new TradeMqProducerTemp();
            mqProducerTemp.setId(String.valueOf(idWorker.nextId()));
            mqProducerTemp.setGroupName("payProducerGroup");
            mqProducerTemp.setMsgKey(String.valueOf(tradePay.getPayId()));
            // Store the tag the message is actually sent with (not the topic).
            // NOTE(review): the topic is not saved in this row - confirm whether
            // TradeMqProducerTemp has a separate topic field that should be set too.
            mqProducerTemp.setMsgTag(tag);
            mqProducerTemp.setMsgBody(JSON.toJSONString(tradePay));
            mqProducerTemp.setCreateTime(new Date());
            mqProducerTempMapper.insert(mqProducerTemp);
            TradePay finalTradePay = tradePay;
            // Send on the executor so the caller does not wait for the broker.
            executorService.submit(() -> {
                try {
                    SendResult sendResult = sendMessage(topic,
                            tag,
                            finalTradePay.getPayId(),
                            JSON.toJSONString(finalTradePay));
                    log.info(JSON.toJSONString(sendResult));
                    if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                        // Confirmed by the broker: the saved copy is no longer needed.
                        mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());
                        log.info("删除消息表成功");
                    } else {
                        // Keep the temp row so the unsent message stays visible.
                        log.warn("消息发送未成功,状态:" + sendResult.getSendStatus());
                    }
                } catch (Exception e) {
                    // Log with the cause; the temp row is kept.
                    log.error("消息发送失败", e);
                }
            });
        } else {
            CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
        }
    }
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}
文章来源:https://blog.csdn.net/Forbidden_City/article/details/135207202
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!