Rabbit MQ工作模式
2024-01-03 17:21:39
1. 简单模式
简单模式就是消息队列的最直观的收发消息
<dependency> ? ?<groupId>com.rabbitmq</groupId> ? ?<artifactId>amqp-client</artifactId> ? ?<version>3.6.5</version> </dependency>
package com.qf.rabbitmq.util;
?
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class ConnectionUtil {
?
? ?public static final String SIMPLE_QUEUE = "simple_queue";
?
? ?private static ConnectionFactory factory;
? ?static {
? ? ? ?factory = new ConnectionFactory();
? ? ? ?factory.setHost("localhost");//设置IP
? ? ? ?factory.setPort(5672);//设置端口
? ? ? ?factory.setUsername("admin");//设置账号
? ? ? ?factory.setPassword("admin");//设置密码
? ? ? ?factory.setVirtualHost("/");//设置能够访问的虚拟主机
? }
?
? ?public static Connection getConnection() throws IOException, TimeoutException {
? ? ? ?return factory.newConnection();//建立一个新的连接
? }
}
?
package com.qf.rabbitmq.simple;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class SimpleProducer {
?
? ?public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ?//在连接上开辟信道
? ? ? ?Channel channel = connection.createChannel();
? ? ? ?//信道直接声明与消息队列挂钩
? ? ? ?//第一个参数是队列的名称
? ? ? ?//第二个参数是队列是否持久化
? ? ? ?//第三个参数是队列在当前连接中是否排他
? ? ? ?//第四个参数是消息被消费了后是否自动删除
? ? ? ?//第五参数是队列的属性
? ? ? ?channel.queueDeclare(ConnectionUtil.SIMPLE_QUEUE,false,false,false,null);
? ? ? ?//通过信道发送消息
? ? ? ?String msg = "简单模式";
? ? ? ?//发布消息至队列中
? ? ? ?//第一个参数是交换机的名称
? ? ? ?//第二个参数是队列的名称
? ? ? ?//第三个参数是消息的头部信息
? ? ? ?//第四个参数是消息的内容
? ? ? ?channel.basicPublish("", ConnectionUtil.SIMPLE_QUEUE, null, msg.getBytes());
? ? ? ?System.out.println("发送了消息:" + msg);
? ? ? ?channel.close();
? ? ? ?connection.close();
? }
}
?
package com.qf.rabbitmq.simple;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class SimpleConsumer {
?
? ?public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ?Channel channel = connection.createChannel();
? ? ? ?channel.queueDeclare(ConnectionUtil.SIMPLE_QUEUE, false, false, false, null);
? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
? ? ? ? ? ?@Override
? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
? ? ? ? ? ? ? ?//确认消息已经被消费
? ? ? ? ? ? ? ?channel.basicAck(envelope.getDeliveryTag(), false);
? ? ? ? ? }
? ? ? };
? ? ? ?//监听队列,只要队列中有消息,就会消费队列中的消息
? ? ? ?//第一个参数是队列的名称
? ? ? ?//第二个参数是是否自动发送回执(消费了消息的回执信息)
? ? ? ?//第三个参数是消息消费者对象
? ? ? ?channel.basicConsume(ConnectionUtil.SIMPLE_QUEUE, false, consumer);
? }
}
在简单模式下,当生产者生产消息速度远远大于消费者消费消息的速度时,容易出现消息堆积。此时就需要使用多个消费者来消费生产者生产的消息,这就需要使用到工作模式。
2. 工作模
工作模式体现了能者多劳。如果消费者的性能存在较大差异,那么他们消费消息的数量也应该存在差异。
package com.qf.rabbitmq.work;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class WorkProducer {
?
? ?public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ?Channel channel = connection.createChannel();
? ? ? ?channel.queueDeclare(ConnectionUtil.WORK_QUEUE,false,false,false,null);
? ? ? ?for(int i=0; i<100; i++){
? ? ? ? ? ?String msg = "工作模式" + i;
? ? ? ? ? ?channel.basicPublish("", ConnectionUtil.WORK_QUEUE, null, msg.getBytes());
? ? ? ? ? ?System.out.println("发送了消息:" + msg);
? ? ? }
? ? ? ?channel.close();
? ? ? ?connection.close();
? }
}
?
package com.qf.rabbitmq.work;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class WorkConsumer1 {
?
? ?public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ?Channel channel = connection.createChannel();
? ? ? ?channel.queueDeclare(ConnectionUtil.WORK_QUEUE, false, false, false, null);
? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
? ? ? ? ? ?@Override
? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?Thread.sleep(500L);
? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? ? ? }
? ? ? ? ? ? ? ?channel.basicAck(envelope.getDeliveryTag(), false);
? ? ? ? ? }
? ? ? };
? ? ? ?channel.basicQos(1);
? ? ? ?channel.basicConsume(ConnectionUtil.WORK_QUEUE, false, consumer);
? }
}
?
package com.qf.rabbitmq.work;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class WorkConsumer2 {
?
? ?public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ?Channel channel = connection.createChannel();
? ? ? ?channel.queueDeclare(ConnectionUtil.WORK_QUEUE, false, false, false, null);
? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
? ? ? ? ? ?@Override
? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?Thread.sleep(100L);
? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? ? ? }
? ? ? ? ? ? ? ?channel.basicAck(envelope.getDeliveryTag(), false);
? ? ? ? ? }
? ? ? };
? ? ? ?channel.basicQos(1);
? ? ? ?channel.basicConsume(ConnectionUtil.WORK_QUEUE, false, consumer);
? }
}
在工作模式下,可以解决消息堆积的问题,但是又有新的问题产生,所有消费者都需要接收生产者发送的消息(也就是生产者发了一个广播通知),此时就需要使用到交换机了,可以使用交换机的发布订阅模式来解决问题。
3. 发布订阅模式
生产者将消息发送到交换机,但交换机本身并没有存储消息的能力,因此,需要消费者先建立队列绑定到交换机,这样,生产者发送的消息通过交换机直接到达所有绑定的队列中,消费者就可以从队列中消费这些消息了。
package com.qf.rabbitmq.fanout;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class FanoutProducer {
?
? ?public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ?Channel channel = connection.createChannel();
? ? ? ?//声明一个类型为fanout的交换机
? ? ? ?channel.exchangeDeclare(ConnectionUtil.FANOUT_EXCHANGE, "fanout");
? ? ? ?String msg = "发布订阅模式,广播通知";
? ? ? ?channel.basicPublish(ConnectionUtil.FANOUT_EXCHANGE, "", null, msg.getBytes());
? ? ? ?System.out.println("发送了消息:" + msg);
? ? ? ?channel.close();
? ? ? ?connection.close();
? }
}
?
?
package com.qf.rabbitmq.fanout;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class FanoutConsumer1 {
?
? ?private static final String QUEUE_NAME = "fanout_queue_1";
?
? ?public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ?Channel channel = connection.createChannel();
? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.FANOUT_EXCHANGE,"");
? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
? ? ? ? ? ?@Override
? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
? ? ? ? ? }
? ? ? };
? ? ? ?//因为是广播,所以这里需要设置为自动回执
? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer);
? }
}
?
?
package com.qf.rabbitmq.fanout;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class FanoutConsumer2 {
?
? ?private static final String QUEUE_NAME = "fanout_queue_2";
?
? ?public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ?Channel channel = connection.createChannel();
? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.FANOUT_EXCHANGE,"");
? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
? ? ? ? ? ?@Override
? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
? ? ? ? ? ? ? ?channel.basicAck(envelope.getDeliveryTag(), false);
? ? ? ? ? }
? ? ? };
? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer);
? }
}
在实际生产环境,通常会出现消费者消费指定的消息,发布订阅模式显然做不到,因此,可以采用路由键的方式,将队列和交换机绑定在一起,从而消费指定的消息。
4. 路由模式
生产者生产的消息,在发送给交换机时需要指定一个路由键(routing key),交换机接收到消息时,将消息递交给与路由键(routing key)完全匹配的队列,如果不存在匹配的队列,那么这条消息会被丢弃。最后,每个队列中的消息可能就不一样了,那么消费者就可以消费自己订阅的消息。
package com.qf.rabbitmq.route;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
?
public class RouteProducer {
?
? ?public static void main(String[] argv) throws Exception {
? ? ? ?// 获取到连接
? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ?// 获取通道
? ? ? ?Channel channel = connection.createChannel();
? ? ? ?// 声明exchange,指定类型为direct
? ? ? ?channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHANGE, "direct");
? ? ? ?// 消息内容
? ? ? ?String message1 = "成都很美丽";
? ? ? ?String message2 = "杭州很美丽";
? ? ? ?String message3 = "北京很繁华";
? ? ? ?String message4 = "深圳很繁华";
? ? ? ?channel.basicPublish(ConnectionUtil.DIRECT_EXCHANGE, "city1", null, message1.getBytes());
? ? ? ?System.out.println("发送消息:"+message1);
? ? ? ?channel.basicPublish(ConnectionUtil.DIRECT_EXCHANGE, "city1", null, message2.getBytes());
? ? ? ?System.out.println("发送消息:"+message2);
? ? ? ?channel.basicPublish(ConnectionUtil.DIRECT_EXCHANGE, "city2", null, message3.getBytes());
? ? ? ?System.out.println("发送消息:"+message3);
? ? ? ?channel.basicPublish(ConnectionUtil.DIRECT_EXCHANGE, "city2", null, message4.getBytes());
? ? ? ?System.out.println("发送消息:"+message4);
? ? ? ?channel.close();
? ? ? ?connection.close();
? }
}
?
?
package com.qf.rabbitmq.route;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
?
public class RouteConsumer1 {
?
? ? ? ?private final static String QUEUE_NAME = "direct_queue_1";
?
? ? ? ?public static void main(String[] argv) throws Exception {
? ? ? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ? ? ?Channel channel = connection.createChannel();
? ? ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ? ? ?// 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
? ? ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.DIRECT_EXCHANGE, "city1");
? ? ? ? ? ?//channel.queueBind(QUEUE_NAME, ConnectionUtil.DIRECT_EXCHANGE, "city2");
?
? ? ? ? ? ?// 定义队列的消费者
? ? ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel) {
? ? ? ? ? ? ? ?// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
? ? ? ? ? ? ? ?@Override
? ? ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? byte[] body) throws IOException {
? ? ? ? ? ? ? ? ? ?System.out.println("接收到消息: " + new String(body));
? ? ? ? ? ? ? }
? ? ? ? ? };
? ? ? ? ? ?// 监听队列,自动ACK
? ? ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer);
? ? ? }
}
?
package com.qf.rabbitmq.route;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
?
public class RouteConsumer2 {
?
? ? ? ?private final static String QUEUE_NAME = "direct_queue_2";
?
? ? ? ?public static void main(String[] argv) throws Exception {
? ? ? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ? ? ?Channel channel = connection.createChannel();
? ? ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.DIRECT_EXCHANGE, "city2");
?
? ? ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel) {
? ? ? ? ? ? ? ?@Override
? ? ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? byte[] body) throws IOException {
? ? ? ? ? ? ? ? ? ?System.out.println("接收到消息: " + new String(body));
? ? ? ? ? ? ? }
? ? ? ? ? };
? ? ? ? ? ?// 监听队列,自动ACK
? ? ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer);
? ? ? }
}
在路由模式下,如果需要匹配多个路由键,就显得很累赘了,而主题模式支持模糊匹配,正好可以满足这一需求。
5. 主题模式
每个消费者监听自己的队列,并且设置带通配符的routingkey,生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割。
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
package com.qf.rabbitmq.topic;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
?
public class TopicProducer {
?
? ?public static void main(String[] argv) throws Exception {
? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ?Channel channel = connection.createChannel();
? ? ? ?//声明exchange,指定类型为topic
? ? ? ?channel.exchangeDeclare(ConnectionUtil.TOPIC_EXCHANGE, "topic");
? ? ? ?String message1 = "匹配A";
? ? ? ?String message2 = "匹配AB";
? ? ? ?// 发送消息,并且指定routing key为:quick.orange.rabbit
? ? ? ?channel.basicPublish(ConnectionUtil.TOPIC_EXCHANGE, "com.qf.aa", null, message1.getBytes());
? ? ? ?System.out.println("发送消息:"+message1);
? ? ? ?channel.basicPublish(ConnectionUtil.TOPIC_EXCHANGE, "com.qf.aa.bb", null, message2.getBytes());
? ? ? ?System.out.println("发送消息:"+message2);
?
? ? ? ?channel.close();
? ? ? ?connection.close();
? }
}
?
package com.qf.rabbitmq.topic;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class TopicConsumer1 {
?
? ?private static final String QUEUE_NAME = "topic_queue_1";
?
? ?public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ?Channel channel = connection.createChannel();
? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.TOPIC_EXCHANGE,"com.qf.*");
? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
? ? ? ? ? ?@Override
? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
? ? ? ? ? }
? ? ? };
? ? ? ?//因为是广播,所以这里需要设置为自动回执
? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer);
? }
}
?
package com.qf.rabbitmq.topic;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class TopicConsumer2 {
?
? ?private static final String QUEUE_NAME = "topic_queue_2";
?
? ?public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ?Connection connection = ConnectionUtil.getConnection();
? ? ? ?Channel channel = connection.createChannel();
? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.TOPIC_EXCHANGE,"com.qf.#");
? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
? ? ? ? ? ?@Override
? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
? ? ? ? ? }
? ? ? };
? ? ? ?//因为是广播,所以这里需要设置为自动回执
? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer);
? }
}
6. RPC模式

文章来源:https://blog.csdn.net/m0_68933188/article/details/135339669
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!