延迟消息实现
2024-01-09 13:11:16
package com.java1234.producer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
/**
* direct交换机名称
*/
public static final String DIRECT_EXCHANGE="directExchange";
/**
* delayedDirect交换机名称
*/
public static final String DELAYED_DIRECT_EXCHANGE="delayedDirectExchange";
/**
* TTL_direct交换机名称
*/
public static final String TTL_DIRECT_EXCHANGE="ttldirectExchange";
/**
* dlx_direct交换机名称
*/
public static final String DLX_DIRECT_EXCHANGE="dlxldirectExchange";
/**
* direct交换机名称1
*/
public static final String DIRECT_EXCHANGE1="directExchange1";
/**
* fanout交换机名称
*/
public static final String FANOUT_EXCHANGE="fanoutExchange";
/**
* direct队列名称
*/
public static final String DIRECT_QUEUE="directQueue";
/**
* delayed direct队列名称
*/
public static final String DELAYED_DIRECT_QUEUE="delayedDirectQueue";
/**
* ttl_direct队列名称
*/
public static final String TTL_DIRECT_QUEUE="ttldirectQueue";
/**
* dlx_direct队列名称
*/
public static final String DLX_DIRECT_QUEUE="dlxdirectQueue";
/**
* direct1队列名称
*/
public static final String DIRECT_QUEUE1="directQueue1";
/**
* direct2队列名称
*/
public static final String DIRECT_QUEUE2="directQueue2";
/**
* 订阅队列1名称
*/
public static final String SUB_QUEUE1="subQueue1";
/**
* 订阅队列2名称
*/
public static final String SUB_QUEUE2="subQueue2";
/**
* direct路由Key
*/
public static final String DIRECT_ROUTINGKEY="directRoutingKey";
/**
* ttl_direct路由Key
*/
public static final String TTL_DIRECT_ROUTINGKEY="ttl_directRoutingKey";
/**
* dlx_direct路由Key
*/
public static final String DLX_DIRECT_ROUTINGKEY="dlx_directRoutingKey";
/**
* delayed_direct路由Key
*/
public static final String DELAYED_DIRECT_ROUTINGKEY="delayed_directRoutingKey";
/**
* topic队列名称1
*
*/
public static final String TOPIC_QUEUE1="topicQueue1";
/**
* topic队列名称2
*
*/
public static final String TOPIC_QUEUE2="topicQueue2";
/**
* direct交换机名称
*/
public static final String TOPIC_EXCHANGE="topicExchange";
/**
* 定义一个direct交换机
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE);
}
/**
* 定义一个TTL direct交换机
* @return
*/
@Bean
public DirectExchange ttlDirectExchange(){
return new DirectExchange(TTL_DIRECT_EXCHANGE);
}
/**
* 定义一个delayed direct交换机
* @return
*/
@Bean
public CustomExchange delayedDirectExchange(){
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_DIRECT_EXCHANGE,"x-delayed-message", true, false, args);
}
/**
* 定义一个DLX direct交换机
* @return
*/
@Bean
public DirectExchange dlxDirectExchange(){
return new DirectExchange(DLX_DIRECT_EXCHANGE);
}
/**
* 定义一个direct交换机1
* @return
*/
@Bean
public DirectExchange directExchange1(){
return new DirectExchange(DIRECT_EXCHANGE1);
}
/**
* 定义一个fanout交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUT_EXCHANGE);
}
/**
* 定义一个TTL direct队列
* @return
*/
@Bean
public Queue ttlDirectQueue(){
Map<String,Object> map=new HashMap<>();
map.put("x-message-ttl",1000000);
map.put("x-dead-letter-exchange",DLX_DIRECT_EXCHANGE);
map.put("x-dead-letter-routing-key",DLX_DIRECT_ROUTINGKEY);
map.put("x-max-length",10);
return new Queue(TTL_DIRECT_QUEUE,true,false,false,map);
}
/**
* 定义一个DELAYED direct队列
* @return
*/
@Bean
public Queue delayedDirectQueue(){
return new Queue(DELAYED_DIRECT_QUEUE);
}
/**
* 定义一个DLX direct队列
* @return
*/
@Bean
public Queue dlxDirectQueue(){
return new Queue(DLX_DIRECT_QUEUE);
}
/**
* 定义一个direct队列
* @return
*/
@Bean
public Queue directQueue(){
return new Queue(DIRECT_QUEUE);
}
/**
* 定义一个direct1队列
* @return
*/
@Bean
public Queue directQueue1(){
return new Queue(DIRECT_QUEUE1);
}
/**
* 定义一个direct2队列
* @return
*/
@Bean
public Queue directQueue2(){
return new Queue(DIRECT_QUEUE2);
}
/**
* 定义一个订阅队列1
* @return
*/
@Bean
public Queue subQueue1(){
return new Queue(SUB_QUEUE1);
}
/**
* 定义一个订阅队列2
* @return
*/
@Bean
public Queue subQueue2(){
return new Queue(SUB_QUEUE2);
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding(){
return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY);
}
/**
* TTL定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding ttlDirectBinding(){
return BindingBuilder.bind(ttlDirectQueue()).to(ttlDirectExchange()).with(TTL_DIRECT_ROUTINGKEY);
}
/**
* dlx定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding dlxDirectBinding(){
return BindingBuilder.bind(dlxDirectQueue()).to(dlxDirectExchange()).with(DLX_DIRECT_ROUTINGKEY);
}
/**
* dlx定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding delayedDirectBinding(){
return BindingBuilder.bind(delayedDirectQueue()).to(delayedDirectExchange()).with(DLX_DIRECT_ROUTINGKEY).noargs();
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding fanoutBinding1(){
return BindingBuilder.bind(subQueue1()).to(fanoutExchange());
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding fanoutBinding2(){
return BindingBuilder.bind(subQueue2()).to(fanoutExchange());
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding1(){
return BindingBuilder.bind(directQueue1()).to(directExchange1()).with("error");
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding2(){
return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("info");
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding3(){
return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("error");
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding4(){
return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("warning");
}
/**
* 定义一个topic队列1
*/
@Bean
public Queue topicQueue1(){
return new Queue(TOPIC_QUEUE1);
}
/**
* 定义一个topic队列2
*/
@Bean
public Queue topicQueue2(){
return new Queue(TOPIC_QUEUE2);
}
/**
* 定义一个direct交换机
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding topicBinding1(){
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding topicBinding2(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding topicBinding3(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");
}
}
package com.java1234.consumer.service.impl;
import com.java1234.consumer.service.RabbitMqService;
import com.java1234.producer.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Date;
import org.springframework.context.annotation.Bean;
@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
CachingConnectionFactory cachingConnectionFactory;
@Bean(name="limitContainerFactory")
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setPrefetchCount(3);
return factory;
}
@Override
@RabbitListener(queues = {RabbitMQConfig.TTL_DIRECT_QUEUE})
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try{
System.out.println(System.currentTimeMillis()+"接受到的mq消息:"+message);
//处理业务
System.out.println("处理业务"+1/0);
System.out.println("deliveryTag="+deliveryTag);
channel.basicAck(deliveryTag,true);
// if(deliveryTag==5){
// channel.basicAck(deliveryTag,true);
// }
}catch (Exception e){
e.printStackTrace();
try {
channel.basicNack(deliveryTag,false,false);
// channel.basicReject(deliveryTag,true);
// Thread.sleep(1000);
}catch (Exception e1){
e1.printStackTrace();
}
}
}
// @Override
// @RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE},containerFactory = "limitContainerFactory")
// public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
// try{
// System.out.println("接受到的mq消息:"+message);
// //处理业务
// System.out.println("处理业务");
// System.out.println("deliveryTag="+deliveryTag);
// if(deliveryTag==5){
// channel.basicAck(deliveryTag,true);
// }
// }catch (Exception e){
// e.printStackTrace();
// try {
// channel.basicNack(deliveryTag,false,true);
channel.basicReject(deliveryTag,true);
Thread.sleep(1000);
// }catch (Exception e1){
// e1.printStackTrace();
// }
//
// }
//
// }
@Override
@RabbitListener(queues = {RabbitMQConfig.DELAYED_DIRECT_QUEUE})
public void receiveMessage2(String message) {
// System.out.println("消费者1:接收到的mq消息:"+message);
System.out.println("队列1接收日志消息:"+message+":"+new Date().toString());
}
@Override
@RabbitListener(queues = {RabbitMQConfig.TOPIC_QUEUE2})
public void receiveMessage3(String message) {
// System.out.println("消费者2:接收到的mq消息:"+message);
System.out.println("队列2接收日志消息:"+message);
}
@Override
@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE1})
public void receiveSubMessage1(String message) {
System.out.println("订阅者1:接收到的mq消息:"+message);
}
@Override
@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE2})
public void receiveSubMessage2(String message) {
System.out.println("订阅者2:接收到的mq消息:"+message);
}
}
package com.java1234.consumer.service;
import com.rabbitmq.client.Channel;
public interface RabbitMqService {
/**
* 接受消息
*/
public void receiveMessage(String message, Channel channel, long deliveryTag);
/**
* 接受消息
*/
public void receiveMessage2(String message);
/**
* 接受消息
*/
public void receiveMessage3(String message);
/**
* 接受订阅消息1
*/
public void receiveSubMessage1(String message);
/**
* 接受订阅消息2
*/
public void receiveSubMessage2(String message);
}
package com.java1234.consumer;
import com.java1234.consumer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(ConsumerApplication.class,args);
// RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
// rabbitMqService.receiveMessage();
}
}
server:
port: 81
spring:
rabbitmq:
host: 192.168.30.113
port: 5672
username: pzy
password: 123456
virtual-host: /
package com.java1234.producer.service.impl;
import com.java1234.producer.config.RabbitMQConfig;
import com.java1234.producer.service.RabbitMqService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService, RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* String exchange 交换机名称
* String routingKey 路由Key
* Object object 具体发送的消息
* @param message
*/
@Override
public void sendMessage(String message) {
// amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message);
CorrelationData correlationData=new CorrelationData("3453");
// rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message,correlationData);
rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message,correlationData);
}
@Override
public void sendTTLMessage(String message) {
// MessageProperties messageProperties=new MessageProperties();
// messageProperties.setExpiration("20000");//设置过期时间 20秒
// Message msg=new Message(message.getBytes(),messageProperties);
rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_DIRECT_EXCHANGE,RabbitMQConfig.TTL_DIRECT_ROUTINGKEY,message);
}
@Override
public void sendFanoutMessage(String message) {
amqpTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE,"",message);
}
@Override
public void sendRoutingMessage() {
amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE1,"warning2","发送warning2级别的消息");
}
@Override
public void sendTopicMessage() {
// amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.orange.rabbit","飞快的橘色兔子");
// amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"lazy.orange.elephant","慢腾腾的橘色大象");
// amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.orange.fox","quick.orange.fox");
// amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"lazy.brown.fox","lazy.brown.fox");
amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.brown.fox","quick.brown.fox");
}
@Override
public void sendDelayedMessage(String message, Integer delayTime) {
amqpTemplate.convertAndSend(RabbitMQConfig.DELAYED_DIRECT_EXCHANGE,RabbitMQConfig.DELAYED_DIRECT_ROUTINGKEY,message,a->{
a.getMessageProperties().setDelay(delayTime);
return a;
});
}
/**
*
* @param correlationData 消息唯一标识
* @param ack 交换机是否成功收到消息 true成功 false失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了..."+correlationData);
if(ack){
System.out.println("交换机,消息接收成功"+cause);
}else{
System.out.println("交换机,消息接收失败"+cause);
//我们这里要做一些消息补发的措施
System.out.println("id="+correlationData.getId());
}
}
/**
*
* @param message 消息主体
* @param replyCode 返回code
* @param replyText 返回信息
* @param exchange 交换机
* @param routingKey 路由key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return方法被执行...");
System.out.println("消息主体:"+new String(message.getBody()));
System.out.println("replyCode:"+replyCode);
System.out.println("replyText:"+replyText);
System.out.println("exchange:"+exchange);
System.out.println("routingKey:"+routingKey);
}
}
package com.java1234.producer.service;
public interface RabbitMqService {
/**
* 发送消息
* @param message
*/
public void sendMessage(String message);
/**
* 发送TTL消息
* @param message
*/
public void sendTTLMessage(String message);
/**
* 发送消息
* @param message
*/
public void sendFanoutMessage(String message);
/**
* 发送路由模式消息
*/
public void sendRoutingMessage();
/**
* 发送Topic模式消息
*/
public void sendTopicMessage();
/**
* 发送延迟消息
* @param message
* @param delayTime
*/
public void sendDelayedMessage(String message,Integer delayTime);
}
package com.java1234.producer;
import com.java1234.producer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(ProducerApplication.class, args);
RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
// rabbitMqService.sendTTLMessage("测试消息TTL");
rabbitMqService.sendDelayedMessage("测试延迟消息10秒",10000);
rabbitMqService.sendDelayedMessage("测试延迟消息20秒",20000);
// rabbitMqService.sendRoutingMessage();
// rabbitMqService.sendTopicMessage();
// rabbitMqService.sendMessage("confirm确认测试消息");
// for(int i=0;i < 20;i++){
rabbitMqService.sendMessage("测试消息"+i);
// rabbitMqService.sendTTLMessage("测试消息TTL"+i);
// rabbitMqService.sendMessage("RabbitMQ大爷你好!!!"+i);
rabbitMqService.sendFanoutMessage(i+"用户欠费了");
// }
}
}
server:
port: 80
spring:
rabbitmq:
host: 192.168.30.113
port: 5672
username: pzy
password: 123456
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
文章来源:https://blog.csdn.net/m0_68935893/article/details/135476900
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!