消费可靠性投递-confirm确认模式
2024-01-08 11:28:42
package com.java1234.producer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
/**
* direct交换机名称
*/
public static final String DIRECT_EXCHANGE="directExchange";
/**
* direct交换机名称1
*/
public static final String DIRECT_EXCHANGE1="directExchange1";
/**
* fanout交换机名称
*/
public static final String FANOUT_EXCHANGE="fanoutExchange";
/**
* direct队列名称
*/
public static final String DIRECT_QUEUE="directQueue";
/**
* direct1队列名称
*/
public static final String DIRECT_QUEUE1="directQueue1";
/**
* direct2队列名称
*/
public static final String DIRECT_QUEUE2="directQueue2";
/**
* 订阅队列1名称
*/
public static final String SUB_QUEUE1="subQueue1";
/**
* 订阅队列2名称
*/
public static final String SUB_QUEUE2="subQueue2";
/**
* direct路由Key
*/
public static final String DIRECT_ROUTINGKEY="directRoutingKey";
/**
* topic队列名称1
*
*/
public static final String TOPIC_QUEUE1="topicQueue1";
/**
* topic队列名称2
*
*/
public static final String TOPIC_QUEUE2="topicQueue2";
/**
* direct交换机名称
*/
public static final String TOPIC_EXCHANGE="topicExchange";
/**
* 定义一个direct交换机
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE);
}
/**
* 定义一个direct交换机1
* @return
*/
@Bean
public DirectExchange directExchange1(){
return new DirectExchange(DIRECT_EXCHANGE1);
}
/**
* 定义一个direct交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUT_EXCHANGE);
}
/**
* 定义一个direct队列
* @return
*/
@Bean
public Queue directQueue(){
return new Queue(DIRECT_QUEUE);
}
/**
* 定义一个direct1队列
* @return
*/
@Bean
public Queue directQueue1(){
return new Queue(DIRECT_QUEUE1);
}
/**
* 定义一个direct2队列
* @return
*/
@Bean
public Queue directQueue2(){
return new Queue(DIRECT_QUEUE2);
}
/**
* 定义一个订阅队列1
* @return
*/
@Bean
public Queue subQueue1(){
return new Queue(SUB_QUEUE1);
}
/**
* 定义一个订阅队列2
* @return
*/
@Bean
public Queue subQueue2(){
return new Queue(SUB_QUEUE2);
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding(){
return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY);
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding fanoutBinding1(){
return BindingBuilder.bind(subQueue1()).to(fanoutExchange());
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding fanoutBinding2(){
return BindingBuilder.bind(subQueue2()).to(fanoutExchange());
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding1(){
return BindingBuilder.bind(directQueue1()).to(directExchange1()).with("error");
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding2(){
return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("info");
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding3(){
return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("error");
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding4(){
return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("warning");
}
/**
* 定义一个topic队列1
*/
@Bean
public Queue topicQueue1(){
return new Queue(TOPIC_QUEUE1);
}
/**
* 定义一个topic队列2
*/
@Bean
public Queue topicQueue2(){
return new Queue(TOPIC_QUEUE2);
}
/**
* 定义一个direct交换机
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding topicBinding1(){
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding topicBinding2(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding topicBinding3(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");
}
}
package com.java1234.consumer.service.impl;
import com.java1234.consumer.service.RabbitMqService;
import com.java1234.producer.config.RabbitMQConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService {
@Autowired
private AmqpTemplate amqpTemplate;
@Override
public void receiveMessage() {
String message=(String) amqpTemplate.receiveAndConvert(RabbitMQConfig.DIRECT_QUEUE);
System.out.println("接受到的mq消息:"+message);
}
@Override
@RabbitListener(queues = {RabbitMQConfig.TOPIC_QUEUE1})
public void receiveMessage2(String message) {
// System.out.println("消费者1:接收到的mq消息:"+message);
System.out.println("队列1接收日志消息:"+message);
}
@Override
@RabbitListener(queues = {RabbitMQConfig.TOPIC_QUEUE2})
public void receiveMessage3(String message) {
// System.out.println("消费者2:接收到的mq消息:"+message);
System.out.println("队列2接收日志消息:"+message);
}
@Override
@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE1})
public void receiveSubMessage1(String message) {
System.out.println("订阅者1:接收到的mq消息:"+message);
}
@Override
@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE2})
public void receiveSubMessage2(String message) {
System.out.println("订阅者2:接收到的mq消息:"+message);
}
}
package com.java1234.consumer.service;
public interface RabbitMqService {
/**
* 接受消息
*/
public void receiveMessage();
/**
* 接受消息
*/
public void receiveMessage2(String message);
/**
* 接受消息
*/
public void receiveMessage3(String message);
/**
* 接受订阅消息1
*/
public void receiveSubMessage1(String message);
/**
* 接受订阅消息2
*/
public void receiveSubMessage2(String message);
}
package com.java1234.consumer;
import com.java1234.consumer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(ConsumerApplication.class,args);
// RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
// rabbitMqService.receiveMessage();
}
}
package com.java1234.producer.service.impl;
import com.java1234.producer.config.RabbitMQConfig;
import com.java1234.producer.service.RabbitMqService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService, RabbitTemplate.ConfirmCallback {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* String exchange 交换机名称
* String routingKey 路由Key
* Object object 具体发送的消息
* @param message
*/
@Override
public void sendMessage(String message) {
// amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message);
CorrelationData correlationData=new CorrelationData("3453");
rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message,correlationData);
}
@Override
public void sendFanoutMessage(String message) {
amqpTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE,"",message);
}
@Override
public void sendRoutingMessage() {
amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE1,"warning2","发送warning2级别的消息");
}
@Override
public void sendTopicMessage() {
// amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.orange.rabbit","飞快的橘色兔子");
// amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"lazy.orange.elephant","慢腾腾的橘色大象");
// amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.orange.fox","quick.orange.fox");
// amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"lazy.brown.fox","lazy.brown.fox");
amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.brown.fox","quick.brown.fox");
}
/**
*
* @param correlationData 消息唯一标识
* @param ack 交换机是否成功收到消息 true成功 false失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了..."+correlationData);
if(ack){
System.out.println("交换机,消息接收成功"+cause);
}else{
System.out.println("交换机,消息接收失败"+cause);
//我们这里要做一些消息补发的措施
System.out.println("id="+correlationData.getId());
}
}
}
package com.java1234.producer.service;
public interface RabbitMqService {
/**
* 发送消息
* @param message
*/
public void sendMessage(String message);
/**
* 发送消息
* @param message
*/
public void sendFanoutMessage(String message);
/**
* 发送路由模式消息
*/
public void sendRoutingMessage();
/**
* 发送Topic模式消息
*/
public void sendTopicMessage();
}
package com.java1234.producer;
import com.java1234.producer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(ProducerApplication.class, args);
RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
// rabbitMqService.sendRoutingMessage();
// rabbitMqService.sendTopicMessage();
rabbitMqService.sendMessage("confirm确认测试消息");
// for(int i=0;i < 10;i++){
rabbitMqService.sendMessage("RabbitMQ大爷你好!!!"+i);
// rabbitMqService.sendFanoutMessage(i+"用户欠费了");
// }
}
}
server:
port: 80
spring:
rabbitmq:
host: 192.168.30.113
port: 5672
username: pzy
password: 123456
virtual-host: /
publisher-confirm-type: correlated
文章来源:https://blog.csdn.net/m0_68935893/article/details/135452496
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!