rabbitmq-常见七种消息队列-控制台界面管理-python-实现简单访问

2023-12-18 12:39:54

1.消息的基本概念

1.1.生产者和消费者

生产者(Producer)
消息的创建者。
负责创建和推送数据到消息服务器。

消费者(Consumer)
消息的接收方。
负责接收消息和处理数据。

1.2.消息队列(Queue)

消息队列是RabbitMQ的内部对象,用于存储生产者的消息直到发送给消费者,它是消费者接收消息的地方。

消息队列的重要属性:

持久性
broker重启前都有效。
自动删除
在所有消费者停止使用之后自动删除。
惰性
没有主动声明队列,调用会导致异常。
排他性
一旦启用,声明它的消费者才能使用。

1.3.交换机(Exchange)

交换机用于接收,分配消息。

  1. 生产者要先指定一个routing key,然后将消息发送到交换机。
  2. routing key需要与exchange type和binding key联合使用才能最终生效。
  3. 交换机将消息路由到一个或多个队列中,或丢弃。

交换机包含4中类型: direct, topic, fanout, headers。

direct(直连交换机)
具有路由功能的交换机,绑定到此交换机的时候需要指定一个routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列。先匹配,再投送。Direct Exchange是RabbitMQ的默认交换机模式。这是最简单的模式。它根据routing key全文匹配去寻找队列。

1.4.消息确认

消息确认是指当一个消息从队列中投递给消费者(consumer)后,消费者会通知一下消息代理(broker)。消息确认可以自动,也可以由处理消息的开发者手动执行。当启用消息确认后,消息代理需要收到来自消费者的确认回执后,才完全将消息从队列中删除。

2.七种队列模式

2.1.简单模式(Hello World)

在这里插入图片描述

做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B。
单生产者,单消费者,单队列。

应用场景:

将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人。

2.2.工作队列模式(Work queues)

在这里插入图片描述

在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者。适用于资源密集型任务, 单个消费者处理不过来,需要多个消费者进行处理的场景。单生产者,多消费者,单队列。

应用场景:

一个订单的处理需要10s,有多个订单可以同时放到消息队列, 然后让多个消费者同时并行处理,而不是单个消费者的串行消费。

2.3.发布订阅模式(Publish/Subscribe)

在这里插入图片描述
一次向许多消费者发送消息,将消息将广播到所有的消费者。单生产者,多消费者,多队列。

应用场景:

更新商品库存后需要通知多个缓存和多个数据库。

结构如下:

一个fanout类型交换机扇出两个消息队列,分别为缓存消息队列、数据库消息队列
一个缓存消息队列对应着多个缓存消费者
一个数据库消息队列对应着多个数据库消费者

2.4.路由模式(Routing)

在这里插入图片描述
根据Routing Key有选择地接收消息。多消费者,选择性多队列,每个队列通过routing key全文匹配。发送消息到交换机并且要指定路由键(Routing key) 。消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息。

应用场景:

在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12 promote,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息。

2.5.主题模式(Topics)

在这里插入图片描述
主题交换机方式接收消息,将routing key和模式进行匹配。多消费者,选择性多队列,每个队列通过模式匹配。队列需要绑定在一个模式上。#匹配一个词或多个词,*只匹配一个词。

应用场景:

iphone促销活动可以接收主题为多种iPhone的消息,如iphone12、iphone13等。

2.6.远程过程调用(RPC)

在这里插入图片描述

在远程计算机上运行功能并等待结果。

应用场景:

需要等待接口返回数据,如订单支付。

2.7.发布者确认(Publisher Confirms)

与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理。

应用场景:

对于消息可靠性要求较高,比如钱包扣款。

3.rabbitmq控制台管理

打开浏览器。访问 http://127.0.0.1:15672
出现管理页面:
账号:guest
密码:guest

登录后,您将看到 RabbitMQ 的控制台界面。该界面将显示以下几个主要部分:

Overview:概览页面提供了关于 RabbitMQ 节点、队列和交换机等的统计信息。
Connections:连接页面提供了有关当前客户端连接的详细信息。
Channels:通道页面提供了有关当前打开通道的详细信息。
Exchanges:交换机页面提供了有关 RabbitMQ 服务器上已声明的交换机的详细信息。
Queues:队列页面提供了有关 RabbitMQ 服务器上已声明的队列的详细信息。
Admin:管理页面提供了一些高级管理功能,例如添加用户、设置权限和定义策略等。

通过 RabbitMQ 控制台,您可以执行许多操作,例如创建和删除队列、交换机和绑定、查看队列和交换机的详细信息、监视连接和通道等。控制台还提供了一些高级管理功能,例如添加用户、设置权限和定义策略等。

3.1.三种基本队列

队列是具有两个主要操作的顺序数据结构:入队和出队。RabbitMQ中的队列是FIFO(先进先出)。一些队列因为一些特性,即消费者的优先级和重新排队,会影响消费者消费的顺序。
在这里插入图片描述

3.1.2.Classic经典队列-单机环境队列

这个是RabbitMQ最经典的队列类型。在单机环境中,拥有比较高的消息可靠性。
在这里插入图片描述我们在创建队列的时候,根据上图可以看到,经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性。

Durability:是否持久化,可选项为持久化(Durable)和 临时(Transient)。Durable相对消息安全性更高。但是同时需要有更多的IO操作,所以生产和消费消息的性能,相比Transient会比较低。
Auto delete:是否自动删除,如果选择是,则消息会被其中一个消费者消费之后,队列会自动销毁,其他消费者也会断开连接。一般不会自动删除。

3.1.3.Quorum仲裁队列-高可用队列

仲裁队列,是RabbitMQ从3.8.0版本引入的新的队列类型,整个3.8.X版本,也是针对仲裁队列进行完善和优化。Quorum相比Classic在分布式环境下对消息的可靠性保障更高。官方文档中表示,未来会使用Quorum代替Classic。

在这里插入图片描述
Quorum是基于Raft一致性协议实现的一种新型的分布式消息队列,他实现了持久化,多备份的FIFO队列,主要就是针对RabbitMQ的镜像模式设计的。简单理解就是Quorum队列中的消息需要有集群中多半节点同意确认后,才会写入到队列中。这种队列类似于RocketMQ当中的DLedger集群。这种方式可以保证消息在集群内部不会丢失。同时,Quorum是以牺牲很多高级队列特性为代价,来进一步保证消息在分布式环境下的高可靠。从整体功能上来说,Quorum队列是在Classic经典队列的基础上做减法,因此对于RabbitMQ的长期使用者而言,其实是会影响使用体验的。他与普通队列的区别:

特性ClassicQuorum
非持久化队列(Non-durable queues支持不支持
独占队列(Exclusivity支持不支持
每条消息的持久化(Per message persistence每条消息总是
会员变更(Membership changes自动手动
消息TTLMessage TTL支持支持(3.10版本开始)
队列TTLQueue TTL支持支持
队列长度限制(Queue length limits支持支持
懒加载(Lazy behaviour支持始终
消息优先级(Message priority支持不支持
消费者优先级(Consumer priority支持支持
死信交换(Dead letter exchanges支持支持
毒消息处理(Poison message handling不支持支持
全局QosGlobal QoS Prefetch支持不支持

Exclusivity表示独占队列,即表示队列只能由声明该队列的Connection连接来进行使用,包括队列创建、删除、收发消息等,并且独占队列会在声明该队列的Connection断开后自动删除。其中有个特例就是这个Poison Message。所谓毒消息是指消息一直不能被消费者正常消费(可能是由于消费者失败或者消费逻辑有问题等),就会导致消息不断的重新入队,这样这些消息就成为了毒消息。这些读消息应该有保障机制进行标记并及时删除。
Quorum队列会持续跟踪消息的失败投递尝试次数,并记录在x-delivery-count这样一个头部参数中。然后,就可以通过设置 Delivery limit参数来定制一个毒消息的删除策略。当消息的重复投递次数超过了Delivery limit参数阈值时,RabbitMQ就会删除这些毒消息。当然,如果配置了死信队列的话,就会进入对应的死信队列。
Quorum队列更适合于 队列长期存在,并且对容错、数据安全方面的要求比低延迟、不持久等高级队列更能要求更严格的场景。例如 电商系统的订单,引入MQ后,处理速度可以慢一点,但是订单不能丢失。

也对应以下一些不适合使用的场景:

一些临时使用的队列:比如transient临时队列,exclusive独占队列,或者经常会修改和删除的队列。
对消息低延迟要求高: 一致性算法会影响消息的延迟。
对数据安全性要求不高:Quorum队列需要消费者手动通知或者生产者手动确认。
队列消息积压严重 : 如果队列中的消息很大,或者积压的消息很多,就不要使用Quorum队列。Quorum队列当前会将所有消息始终保存在内存中,直到达到内存使用极限。

3.1.3.Stream队列-大规模队列

Stream队列是RabbitMQ自3.9.0版本开始引入的一种新的数据队列类型,也是目前官方最为推荐的队列类型。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景。
在这里插入图片描述

Stream队列的核心是以append-only只添加的日志来记录消息,整体来说,就是消息将以append-only的方式持久化到日志文件中,然后通过调整每个消费者的消费进度offset,来实现消息的多次分发。这种队列提供了RabbitMQ已有的其他队列类型不太好实现的四个特点:

大规模分发(large fan-outs)

当想要向多个订阅者发送相同的消息时,以往的队列类型必须为每个消费者绑定一个专用的队列。如果消费者的数量很大,这就会导致性能低下。而Stream队列允许任意数量的消费者使用同一个队列的消息,从而消除绑定多个队列的需求。

消息回溯(Replay/Time-travelling)

RabbitMQ已有的这些队列类型,在消费者处理完消息后,消息都会从队列中删除,因此,无法重新读取已经消费过的消息。而Stream队列允许用户在日志的任何一个连接点开始重新读取数据。

高吞吐性能(Throughput Performance)

Stream队列的设计以性能为主要目标,对消息传递吞吐量的提升非常明显。

大日志(Large logs)

RabbitMQ一直以来有一个让人诟病的地方,就是当队列中积累的消息过多时,性能下降会非常明显。但是Stream队列的设计目标就是以最小的内存开销高效地存储大量的数据。

3.2.生产者和消费者

生产者是消息的提供方,消费者是消息的执行方,代表的是行为的不同,rabbitmq通过登录来验证不同的用户,不同的行为来代表不同人的特征。

3.3.交换机(exchange)

在RabbitMQ中,所有的producer都不会直接把message发送到queue中,甚至producer都不知道message在发出后有没有发送到queue中,事实上,producer只能将message发送给exchange,由exchange来决定发送到哪个queue中。

exchange的一端用来从producer中接收message,另一端用来发送message到queue,exchange的类型规定了怎么处理接收到的message,发布订阅模式使用到的exchange类型为 fanout ,这种exchange类型非常简单,就是将接收到的message广播给已知的(即绑定到此exchange的)所有consumer。

当然,如果不想使用特定的exchange,可以使用 exchange=‘’ 表示使用默认的exchange,默认的exchange会将消息发送到 routing_key 指定的queue,可以参考工作(任务)队列模式和Hello world模式。

4.Python访问rabbitmq

在python语言环境下,使用pika库来访问访问操作rabbitmq中间件。

pip install pika

在这里插入图片描述

4.1.生产者提交消息

# coding=utf-8
# producer
import pika

# 指定远程 rabbitmq 的用户名密码并创建凭证
credentials = pika.PlainCredentials(username="guest", password="guest")

# 1. 创建 connect 连接
connect = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials))

# 2. 在 connect 上创建一个 channel
channel = connect.channel()

# 3. 在 channel 上声明交换器 exchange
channel.exchange_declare(exchange='hello', exchange_type='direct', passive=False, durable=True, auto_delete=False)

# 4. 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue='hello')

# 5. 通过键 'world' 将队列和交换器绑定
channel.queue_bind(queue='hello', exchange='hello', routing_key='world')

# 6. 创建纯文本消息
msg_props = pika.BasicProperties()
msg_props.content_type = 'text/plain'

# 7. 将消息发送到 RabbitMQ
message = 'quit'
channel.basic_publish(exchange='hello',  routing_key='world',  properties=msg_props, body=message)

# 8. 关闭通道
channel.close()

# 9. 当生产者发送完消息后,可选择关闭连接
connect.close()

4.2.消费者执行消息

#!/usr/bin/env python
# coding=utf-8

# consumer

import pika

# 指定远程 rabbitmq 的用户名密码并创建凭证
credentials = pika.PlainCredentials(username="guest", password="guest")

# 1. 创建 connect 连接
connect = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials))

# 2. 在 connect 上创建一个 channel
channel = connect.channel()

# 3. 在 channel 上声明交换器 exchange
channel.exchange_declare(exchange='hello', exchange_type='direct', passive=False, durable=True, auto_delete=False)

# 4. 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue='hello')

# 5. 通过键 'world' 将队列和交换器绑定
channel.queue_bind(queue='hello', exchange='hello', routing_key='world')

# 6. 定义一个回调函数,用来接收生产者发送的消息
'''

在 Python3 中,bytes 和 str 的互相转换方式是
str.encode('utf-8')
bytes.decode('utf-8')

'''
def callback(channel, method, properties, body):
    # 消息确认
    channel.basic_ack(delivery_tag=method.delivery_tag)

    if body.decode('utf-8') == "quit":
        # 停止消费,并退出
        channel.basic_cancel(consumer_tag='hello-consumer')
        channel.close()
        connect.close()
    else:
        print("msg is {}".format(body))
        # print("msg type {}".format(type(body)))
        # print("msg eval after type {}".format(type(eval(body))))


# 7. 消费者消费
channel.basic_consume('hello', callback,  auto_ack=False)

# 8. 开始循环取消息
channel.start_consuming()

5.总结

通过使用rabbitmq技术,可以实现生产者和消费者模式,并实现两者的解耦,生产者负责通过交换机将数据存入队列,而消费者从队列中取数据,并执行相应的消息。可以用在服务器复杂耗时任务的并行计算中使用,与常用的web服务器(如apache等)解耦,提高服务器计算资源的利用效率。

文章来源:https://blog.csdn.net/m0_67316550/article/details/135041282
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。