RabbitMQ路由模式

2024-01-07 23:46:33
package com.java1234.producer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {


    /**
     * direct交换机名称
     */
    public static final String DIRECT_EXCHANGE="directExchange";

    /**
     * direct交换机名称1
     */
    public static final String DIRECT_EXCHANGE1="directExchange1";


    /**
     * fanout交换机名称
     */
    public static final String FANOUT_EXCHANGE="fanoutExchange";

    /**
     * direct队列名称
     */
    public static final String DIRECT_QUEUE="directQueue";

    /**
     * direct1队列名称
     */
    public static final String DIRECT_QUEUE1="directQueue1";

    /**
     * direct2队列名称
     */
    public static final String DIRECT_QUEUE2="directQueue2";

    /**
     * 订阅队列1名称
     */
    public static final String SUB_QUEUE1="subQueue1";
    /**
     * 订阅队列2名称
     */
    public static final String SUB_QUEUE2="subQueue2";

    /**
     * direct路由Key
     */
    public static final String DIRECT_ROUTINGKEY="directRoutingKey";

    /**
     * 定义一个direct交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(DIRECT_EXCHANGE);
    }


    /**
     * 定义一个direct交换机1
     * @return
     */
    @Bean
    public DirectExchange directExchange1(){
        return new DirectExchange(DIRECT_EXCHANGE1);
    }


    /**
     * 定义一个direct交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    /**
     * 定义一个direct队列
     * @return
     */
    @Bean
    public Queue directQueue(){
        return new Queue(DIRECT_QUEUE);
    }


    /**
     * 定义一个direct1队列
     * @return
     */
    @Bean
    public Queue directQueue1(){
        return new Queue(DIRECT_QUEUE1);
    }


    /**
     * 定义一个direct2队列
     * @return
     */
    @Bean
    public Queue directQueue2(){
        return new Queue(DIRECT_QUEUE2);
    }

    /**
     * 定义一个订阅队列1
     * @return
     */
    @Bean
    public Queue subQueue1(){
        return new Queue(SUB_QUEUE1);
    }

    /**
     * 定义一个订阅队列2
     * @return
     */
    @Bean
    public Queue subQueue2(){
        return new Queue(SUB_QUEUE2);
    }

    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
     public Binding directBinding(){
        return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY);
     }

    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding fanoutBinding1(){
        return BindingBuilder.bind(subQueue1()).to(fanoutExchange());
    }

    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding fanoutBinding2(){
        return BindingBuilder.bind(subQueue2()).to(fanoutExchange());
    }

    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding directBinding1(){
        return BindingBuilder.bind(directQueue1()).to(directExchange1()).with("error");
    }

    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding directBinding2(){
        return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("info");
    }

    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding directBinding3(){
        return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("error");
    }
    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding directBinding4(){
        return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("warning");
    }

}

package com.java1234.consumer.service.impl;

import com.java1234.consumer.service.RabbitMqService;
import com.java1234.producer.config.RabbitMQConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void receiveMessage() {
        String message=(String) amqpTemplate.receiveAndConvert(RabbitMQConfig.DIRECT_QUEUE);
        System.out.println("接受到的mq消息:"+message);
    }

    @Override
    @RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE1})
    public void receiveMessage2(String message) {

//        System.out.println("消费者1:接收到的mq消息:"+message);
                System.out.println("队列1接收日志消息:"+message);

    }


    @Override
    @RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE2})
    public void receiveMessage3(String message) {
//        System.out.println("消费者2:接收到的mq消息:"+message);
                System.out.println("队列2接收日志消息:"+message);

    }

    @Override
    @RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE1})
    public void receiveSubMessage1(String message) {
        System.out.println("订阅者1:接收到的mq消息:"+message);
    }

    @Override
    @RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE2})
    public void receiveSubMessage2(String message) {
        System.out.println("订阅者2:接收到的mq消息:"+message);
    }

}

package com.java1234.consumer.service;

public interface RabbitMqService {

    /**
     * 接受消息
     */
    public void receiveMessage();

    /**
     * 接受消息
     */
    public void receiveMessage2(String message);

    /**
     * 接受消息
     */
    public void receiveMessage3(String message);


    /**
     * 接受订阅消息1
     */
    public void receiveSubMessage1(String message);
    /**
     * 接受订阅消息2
     */
    public void receiveSubMessage2(String message);
}

package com.java1234.consumer;

import com.java1234.consumer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
       ApplicationContext ac = SpringApplication.run(ConsumerApplication.class,args);
//        RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
//        rabbitMqService.receiveMessage();
    }
}

package com.java1234.producer.service.impl;

import com.java1234.producer.config.RabbitMQConfig;
import com.java1234.producer.service.RabbitMqService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * String exchange 交换机名称
     * String routingKey 路由Key
     * Object object 具体发送的消息
     * @param message
     */
    @Override
    public void sendMessage(String message) {
        amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message);
    }

    @Override
    public void sendFanoutMessage(String message) {
        amqpTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE,"",message);
    }

    @Override
    public void sendRoutingMessage() {
        amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE1,"warning","发送warning级别的消息");
    }
}

package com.java1234.producer.service;

public interface RabbitMqService {

    /**
     * 发送消息
     * @param message
     */
    public void sendMessage(String message);

    /**
     * 发送消息
     * @param message
     */
    public void sendFanoutMessage(String message);


    /**
     * 发送路由模式消息
     */
    public void sendRoutingMessage();
}

package com.java1234.producer;

import com.java1234.producer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        ApplicationContext ac = SpringApplication.run(ProducerApplication.class, args);
        RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
        rabbitMqService.sendRoutingMessage();
        //        for(int i=0;i < 10;i++){
            rabbitMqService.sendMessage("RabbitMQ大爷你好!!!"+i);
//             rabbitMqService.sendFanoutMessage(i+"用户欠费了");
//        }
    }
}

文章来源:https://blog.csdn.net/m0_68935893/article/details/135446618
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。