Springboot整合MQ学习记录

2024-01-07 17:22:56

Mq介绍

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。

引入Pom

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

具体的模型。概念都不说了。直接看代码

控制器测试代码

package com.example.demoamqp.controller;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import com.example.demoamqp.entity.Order;
import com.example.demoamqp.send.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

/**
 * @version 1.0.0
 * @className: TestSendMqController
 * @description: 消息发送者
 * @author: zhangjunfa
 * @date: 2023/6/16 11:05
 */
@Slf4j
@RequestMapping
@RestController
public class TestSendMqController {

    private Sender sender;
    private FanoutSender fanoutSender;

    private TopicSender topicSender;
    private DeadSender deadSender;
    private DelayQueueSender delayQueueSender;
    private Delay2Sender delay2Sender;


    public TestSendMqController(Sender sender, FanoutSender fanoutSender, TopicSender topicSender, DeadSender deadSender, DelayQueueSender delayQueueSender, Delay2Sender delay2Sender) {
        this.sender = sender;
        this.fanoutSender = fanoutSender;
        this.topicSender = topicSender;
        this.deadSender = deadSender;
        this.delayQueueSender = delayQueueSender;
        this.delay2Sender = delay2Sender;
    }


    @PostMapping("/send")
    public Object send(@RequestParam(name = "param") String param) throws InterruptedException {
        Thread.sleep(3000);
        sender.send(param);
        return "success";
    }

    @PostMapping("/sendSimple")
    public Object sendSimple(@RequestParam(name = "orderName") String orderName) throws InterruptedException {
        Order order = new Order();
        order.setId(IdUtil.getSnowflakeNextId());
        order.setOrderName(orderName);
        order.setOrderNo(IdUtil.nanoId());
        order.setCreatedTime(DateUtil.date());
        sender.sendSimple(order);
        return "success";
    }

    @PostMapping("/sendCode")
    public Object sendCode() throws InterruptedException {
        int randomInt = RandomUtil.randomInt(100000, 999999);
        log.info("生产者生成了一个验证码:{}", randomInt);
        this.fanoutSender.sendCode(String.valueOf(randomInt));
        return "success";
    }

    @PostMapping("/sendTopic")
    public Object sendTopic(@RequestParam(name = "msg") String msg, @RequestParam(name = "routingKey") String routingKey) throws InterruptedException {
        this.topicSender.sendMsg(msg, routingKey);
        return "success";
    }

    /**
     * 延迟队列(死信)设计
     *
     * @param msg
     * @return
     * @throws InterruptedException
     */
    @PostMapping("/sendDead")
    public Object sendDead(@RequestParam(name = "msg") String msg) throws InterruptedException {
        this.deadSender.sendDelay(msg, 2000);
        return "success 我是死信队列";
    }

    /**
     * 延迟队列设计
     *
     * @param msg
     * @return
     * @throws InterruptedException
     */
    @PostMapping("/sendDelay")
    public Object sendDelay(@RequestParam(name = "msg") String msg) throws InterruptedException {
        this.delayQueueSender.sendMsg(msg);
        return "success 我是延迟队列";
    }

    /**
     * 延迟队列设计
     *
     * @param msg
     * @return
     * @throws InterruptedException
     */
    @PostMapping("/sendDelay2")
    public Object sendDelay2(@RequestParam(name = "msg") String msg,@RequestParam(name = "delayTime") Integer delayTime) throws InterruptedException {
        this.delay2Sender.sendDelay2(msg,delayTime);
        return "success 我是延迟队列";
    }
}

配置类代码

package com.example.demoamqp.config;

import com.example.demoamqp.conatants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 创建消息队列并注入容器中
 *
 * @author ross
 */

@Configuration
public class QueueConfig {

    /**
     * 创建队列
     *
     * @return
     */
    @Bean
    public Queue createQueue() {
        return new Queue("ross_amq");
    }


    /******************************  发布、订阅者模式  *********************************/
    @Bean // 邮箱的队列
    public Queue mailQueue(){
        return new Queue(Constants.MQ_MAIL_QUEUE,
                Constants.durable,
                Constants.exclusive,
                Constants.autoDelete);
    }

    @Bean // 电话的队列
    public Queue phoneQueue(){
        return new Queue(Constants.MQ_PHONE_QUEUE,
                Constants.durable,
                Constants.exclusive,
                Constants.autoDelete);
    }
    @Bean // 交换机
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(Constants.MQ_FANOUT_EXCHANGE,
                Constants.durable,
                Constants.autoDelete);
    }

    /**
     * 邮箱绑定交换机
     * @return
     */
    @Bean
    public Binding mailBinding(){
        return BindingBuilder.bind(mailQueue())
                .to(fanoutExchange());
    }

    /**
     * 电话绑定交换机
     * @return
     */
    @Bean
    public Binding phoneBinding(){
        return BindingBuilder.bind(phoneQueue())
                .to(fanoutExchange());
    }




    /*----------------------------------------------------*/
    @Bean // A队列
    public Queue topicAQueue(){
        return new Queue(Constants.MQ_TOPIC_QUEUE_A,
                Constants.durable,
                Constants.exclusive,
                Constants.autoDelete);
    }

    /**
     * topic模式相关配置
     */

    @Bean // B队列
    public Queue topicBQueue(){
        return new Queue(Constants.MQ_TOPIC_QUEUE_B,
                Constants.durable,
                Constants.exclusive,
                Constants.autoDelete);
    }

    @Bean // topic的交换机
    public TopicExchange topicMyExchange(){
        return new TopicExchange(Constants.MQ_TOPIC_EXCHANGE,
                Constants.durable,
                Constants.autoDelete);
    }


    @Bean
    public Binding topicAQueueBinding(){
        return BindingBuilder
                .bind(topicAQueue())
                .to(topicMyExchange())
                .with("topic.xxx"); // 规则 topic.xxx
    }

    @Bean
    public Binding topicBQueueBinding(){
        return BindingBuilder
                .bind(topicBQueue())
                .to(topicMyExchange())
                .with("topic.*"); // 规则 topic.xxx
    }
}

消费者

package com.example.demoamqp.receiver;

import cn.hutool.core.date.DateUtil;
import com.example.demoamqp.conatants.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 死信队列消费者
 * @className: MqDelayReceiver
 * @projectName: demo-one
 * @auth: rosszhang
 * @date: 2023/12/28 16:53
 */
@Slf4j
@Component
public class MqDelayReceiver {
    @RabbitListener(queues = Constants.MQ_DELAY_QUEUE)
    public void delayConsume(String msg) {
        log.debug("[消费者消费信息:{},时间:{}", msg, DateUtil.date());
    }

}

发送者代码

package com.example.demoamqp.send;

import cn.hutool.core.date.DateUtil;
import com.example.demoamqp.conatants.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 死信队列
 *
 * @className: DeadSender
 * @projectName: demo-one
 * @auth: rosszhang
 * @date: 2023/12/28 16:49
 */
@Slf4j
@Component
public class DeadSender {
    private AmqpTemplate rabbitAmqpTemplate;

    @Autowired
    public void setRabbitAmqpTemplate(AmqpTemplate rabbitAmqpTemplate) {
        this.rabbitAmqpTemplate = rabbitAmqpTemplate;
    }

    /**
     * 死信队列
     *
     * @param msg
     * @param delayTime
     */
    public void sendDelay(String msg, int delayTime) {
        rabbitAmqpTemplate.convertAndSend(
                Constants.MQ_NORMAL_EXCHANGE,
                Constants.MQ_NORMAL_ROUTING_KEY,
                msg,
                process -> {
                    process.getMessageProperties().setExpiration(String.valueOf(delayTime));
                    return process;
                }
        );
        log.debug("[生产者:]发送消息:{},时间{},延迟{}秒", msg, DateUtil.date(), delayTime / 1000);

    }
}

源码大家可以看我Gitte地址
Gitte仓库地址

下面这个是我的个人公共号 只会写Bug的程序猿,大家可以关注一下,一键三连。相互交流学习。
在这里插入图片描述

文章来源:https://blog.csdn.net/u013432938/article/details/135424681
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。