【redis】redis系统实现发布订阅的标准模板

2023-12-19 23:59:51

简介

Redis发布订阅功能是Redis的一种消息传递模式,允许多个客户端之间通过消息通道进行实时的消息传递。在发布订阅模式下,消息的发送者被称为发布者(publisher),而接收消息的客户端被称为订阅者(subscriber)。

在Redis中,发布者可以将消息发布到一个或多个频道(channel),而订阅者可以选择订阅感兴趣的频道以接收相关的消息。同时,一个订阅者也可以订阅多个频道。当有消息发布到已被订阅的频道时,所有订阅该频道的客户端都能够接收并处理这些消息。

发布订阅功能在实时消息推送、事件通知、即时通讯等场景中具有广泛的应用。在分布式系统中,它也经常被用于解耦消息的发送和接收,实现松耦合的消息通信机制。

通过发布订阅功能,Redis为开发者提供了一种简单而高效的消息传递机制,使得不同的模块或系统之间可以实现解耦、实时通信和消息广播等操作。

参数配置

安装redis后,在redis.conf内,将bind 127.0.0.1一行注掉,然后重启redis(查找redis-server)

代码模板

import json
import redis

class RedisChannelManager:
    def __init__(self):
        # 初始化 Redis 连接和订阅发布功能
        # 通过传递这些参数,StrictRedis 类将创建一个与 Redis 服务器的连接
        # ,并使用指定的主机地址、端口号和数据库。你可以通过这个连接执行各种 Redis 命令和操作,包括发布和订阅消息。
        self.redisClient = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
        #方法用于创建一个 Redis 发布/订阅对象。通过这个对象,你可以订阅一个或多个通道,并接收其他客户端发布到这些通道的消息。
        self.pubsub = self.redisClient.pubsub()

    def subscribe(self,channelName):
        # 订阅指定通道
        self.pubsub.subscribe(channelName)
        self.channelName = channelName
        # pubsub.listen() 是 Redis 发布/订阅对象的方法,用于监听订阅的通道,并等待接收消息。
        # 当有消息到达时,listen() 方法将会阻塞当前线程,并返回一个生成器对象,通过这个生成器对象可以迭代获取消息。
        for item in self.pubsub.listen():
            if item['type'] == 'message':
                self.processMessage(item['data'])
    def processMessage(self, message):
        message=message.decode('utf-8')
        # 处理收到的消息
        print(f"Received message: {message}")
        data = json.loads(message)
        if data.get('type') == 'pic':
            self.handlePicBusiness(data)

    def handlePicBusiness(self, data):
        # 处理特定业务类型为 pic 的消息
        pic_list = data.get('pic_list')
        print(pic_list)
        # 进行业务操作,比如存储到数据库或者处理图片
        
    #发布报警信息
    def handleAlarm(self, data):
        self.publishMessage(data)
        
    def publishMessage(self, data,name):
        while True:
        # 发布消息到指定通道
            message = json.dumps(data)
            self.redisClient.publish(name, message)
            time.sleep(2)

# 创建两个通道管理对象
channel_manager1 = RedisChannelManager()
channel_manager2 = RedisChannelManager()


# 在不同的线程中订阅两个通道
import threading
import time
thread1 = threading.Thread(target=channel_manager1.subscribe,args=("c1",))

# 发布业务类型为 pic 的消息
pic_data = {
    'type': 'pic',
    'pic_list': ['pic1.jpg', 'pic2.jpg']
}

thread2 = threading.Thread(target=channel_manager1.publishMessage,args=(pic_data,"c1"))
thread2.start()
thread1.start()
thread2.join()
thread1.join()

文章来源:https://blog.csdn.net/hh1357102/article/details/135085395
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。