HelloWorld搭建(第一种模型)

2024-01-01 17:31:56

1.创建Springboot项目并且引入依赖

<!-- 引入RabbitMQ的相关依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>

2.第一种模式(直连)

P:生产者,也就是发送消息的程序。

C:消费者:消息的接收者,会一直等待消息的程序。

queue:消息队列,类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消费消息。

3.定义生产者消费者案例

3.1封装工具类

public class RabbitMqUtil {
    private static ConnectionFactory connectionFactory;

    static {
        //创建连接mq的连接工厂对象
        connectionFactory = new ConnectionFactory();
        //设置连接rabbitmq的主机
        connectionFactory.setHost("127.0.0.1");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置连接哪个虚拟主机
        connectionFactory.setVirtualHost("/ems");
        //设置访问虚拟主机的用户,需要用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");
    }
    //获取连接对象
    public static Connection getConnection(){
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    //关闭通道和关闭连接工具方法
    public static void closeConnectionAndChannel(Channel channel, Connection connection){
        try{
            if(channel!=null) channel.close();
            if(connection!=null) connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

?

3.2生产者

public class Provider {
    //生产消息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {

        //获取连接对象
        Connection connection = RabbitMqUtil.getConnection();
        //获取连接通道
        Channel channel = connection.createChannel();
        /*
            通道绑定对应消息队列
            queue:队列名称 如果队列不存在会自动创建
            durable:用来定义队列特性是否要持久化 true:持久化队列  false:不持久化,如果是不持久化,消息队列重启队列就会全部消失,消息也会丢失
            exclusive:是否独占队列 true 独占  false 不独占
            autoDelete:在消费者完成消费并与队列断开连接后是否自动删除队列
            arguments:额外附加参数
         */
        channel.queueDeclare("hello",true,false,false,null);
        //发布消息(这一步才是关键,指明了消息到底发到哪个队列去了)
        /*
            参数:交换机名称
                队列名称
                传递消息额外设置   MessageProperties.PERSISTENT_TEXT_PLAIN表示消息持久化
                消息具体内容
         */
        channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
        //关闭通道   关闭连接
        RabbitMqUtil.closeConnectionAndChannel(channel,connection);
    }
}

3.3消费者

public class Consumer {
    //消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接rabbitmq的主机
        connectionFactory.setHost("127.0.0.1");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置连接哪个虚拟主机
        connectionFactory.setVirtualHost("/ems");
        //设置访问虚拟主机的用户,需要用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");
        //获取连接对象
        Connection connection = connectionFactory.newConnection();
        //获取连接通道
        Channel channel = connection.createChannel();
        /*
            通道绑定对应消息队列
            queue:队列名称 如果队列不存在会自动创建
            durable:用来定义队列特性是否要持久化 true:持久化队列  false:不持久化
            exclusive:是否独占队列
            autoDelete:是否在消费完成后自动删除队列
            arguments:额外附加参数
         */
        channel.queueDeclare("hello",true,false,false,null);
        /*
            参数1:消费的队列名称
            参数2:开始消息的自动确定机制
            参数3:消费时的回调接口
         */
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("=============="+new String(body));
            }
        });
        //注意这里不能关闭通道和连接,因为要一直监听
    }
}

文章来源:https://blog.csdn.net/m0_62565675/article/details/135326221
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。