springboot rabbitmq 发布订阅 广播模式
2023-12-15 20:55:07
根据amqp协议、rabbitmq入门、springboot集成rabbitmq 可知,rabbitmq的广播模式关键是使用fanout类型的exchange,fanout exchange会忽略message中的routing-key、queue中的binding-key,发给绑定exchange的全部queue。
创建fanout类型的exchange
import org.springframework.amqp.core.*;
@Configuration
public class MqConfig {
/**
* 定义广播交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
final FanoutExchange fanoutExchange = new FanoutExchange("自定义广播类型的交换机名称");
return fanoutExchange;
}
}
发送
@Autowired
private AmqpTemplate amqpTemplate;
//发送到订阅数据的exchange中
amqpTemplate.convertAndSend("自定义广播类型的交换机名称",
//fanout类型的exchange会忽略routing-key,所以这里的binding key传空字符串
"",
message);
消费
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
/**
* 将数据发送给队列1
* @param message
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(“自定义队列1”),
exchange = @Exchange(value = "自定义广播类型的交换机名称",
type = ExchangeTypes.FANOUT),
//fanout类型exchange会忽略binding-key
key = ""))
public void doSynAddDataToJD(String message) {
log.info("广播模式,同步数据给订阅方");
}
/**
* 将数据发送给队列2
* @param message
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(“自定义队列2”),
exchange = @Exchange(value = "自定义广播类型的交换机名称",
type = ExchangeTypes.FANOUT),
key = ""))
public void doSynAddDataToJD(String message) {
log.info("广播模式,同步数据给订阅方");
}
总结
实现发布订阅(广播模式)的关键在于对exchange类型的理解,可参考amqp协议、rabbitmq入门、springboot集成rabbitmq 、AMQP 0-9-1 Model Explained,源码中的类型有如下几种
package org.springframework.amqp.core;
/**
* Constants for the standard Exchange type names.
*
* @author Mark Fisher
* @author Gary Russell
*/
public abstract class ExchangeTypes {
/**
* Direct exchange.
* routing key和binding key完全匹配
*/
public static final String DIRECT = "direct";
/**
* Topic exchange.
* binding key可使用通配符来匹配routing key
*/
public static final String TOPIC = "topic";
/**
* Fanout exchange.
* 会忽略routing key、binding key,消息发送到绑定exchange的全部queue
*/
public static final String FANOUT = "fanout";
/**
* Headers exchange.
* 使用headers中的属性来匹配,有只匹配一项或者全部匹配可选
*/
public static final String HEADERS = "headers";
/**
* System exchange.
* 这个类型,暂时缺乏相关资料。
*/
public static final String SYSTEM = "system";
}
文章来源:https://blog.csdn.net/wangjun5159/article/details/134995029
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!