RabbitMQ消息模型之Routing-Topic

2023-12-13 03:58:03

Routing Topic

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符!这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insertitem.#item.*

image-20220526183808035

统配符

* (star) can substitute for exactly one word.    匹配不多不少恰好1个词
# (hash) can substitute for zero or more words.  匹配[0-n]个词

a.#   可以匹配 a.b、a.b.c、a.b.c.d 等只要是a.开头的情况
a.*   只能匹配 a.b 这种后面只有一个单词的情况

通配符可以出现在

注意:RoutingKey的任意位置。

创建生产者

public class MyProducer {

    @Test
    public void test() throws Exception {
        // 交换机
        String exchange = "logs_topic";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(exchange, "topic");
            // 发布消息
            channel.basicPublish(exchange, "a.b", null, "a.b".getBytes());
            channel.basicPublish(exchange, "a.b.c", null, "a.b.c".getBytes());
            channel.basicPublish(exchange, "a.b.c.d", null, "a.b.c.d" .getBytes());
            channel.basicPublish(exchange, "a.b.c.d.e", null, "a.b.c.d.e".getBytes());
    }
}

创建消费者1

public class MyConsumer1 {

    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs_topic";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机
        channel.exchangeDeclare(exchange, "topic");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 将临时队列绑定exchange
        channel.queueBind(queue, exchange, "a.*");
        channel.queueBind(queue, exchange, "#.d.#");
        // 处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: " + new String(body));
                // TODO 业务处理
            }
        });
    }
}

创建消费者2

public class MyConsumer2 {

    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs_topic";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机
        channel.exchangeDeclare(exchange, "topic");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 将临时队列绑定exchange
        channel.queueBind(queue, exchange, "#.b.#");
        // 处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2: " + new String(body));
                // TODO 业务处理
            }
        });
    }
}

生产者生产的消息:a.ba.b.ca.b.c.da.b.c.d.e

消费者1接受的消息规则为:

channel.queueBind(queue, exchange, "a.*");
channel.queueBind(queue, exchange, "#.d.#");

所以消费者1将会接收到:a.ba.b.c.da.b.c.d.e

消费者2接受的消息规则为:

channel.queueBind(queue, exchange, "#.b.#");

所以消费者2将会接收到:a.ba.b.ca.b.c.da.b.c.d.e

image-20220526185946980

文章来源:https://blog.csdn.net/weixin_53287520/article/details/134701423
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。