Spring boot 使用Redis 消息发布订阅

2023-12-14 19:33:53

Spring boot 使用Redis 消息发布订阅

最近在做请求风控的时候,在网上搜集了大量的解决方案,最后使用Redis 消息发布订阅 比较符合业务。做一下记录

img

Redis 消息发布订阅

img

Redis 发布订阅 命令:redis命令手册

1、Redis 中"pub/sub"的消息,为"即发即失",server 不会保存消息,如果 publish 的消息没有任何 client 处于 “subscribe” 状态,消息将会被丢弃;如果 client 在 subcribe 时,链接断开后重连,那在么此期间的消息也将丢失。

2、Redis server 将会"尽力"将消息发送给处于 subscribe 状态的 client,但是仍不会保证每条消息都能被正确接收。

**优点:**支持发布订阅,支持多组生产者、消费者处理消息

缺点:

  1. 消费者下线数据会丢失

  2. 不支持数据持久化,Redis宕机则数据也会丢失

  3. 消息堆积,缓存区溢出,消费者会被强制踢下线,数据也会丢失

Redis 发布订阅 命令
命令描述
Redis Unsubscribe 命令指退订给定的频道。
Redis Subscribe 命令订阅给定的一个或多个频道的信息。
Redis Pubsub 命令查看订阅与发布系统状态。
Redis Punsubscribe 命令退订所有给定模式的频道。
Redis Publish 命令将信息发送到指定的频道。
Redis Psubscribe 命令订阅一个或多个符合给定模式的频道。

Spring boot 实现消息发布订阅

1、引入 Redis 依赖

    <!--Spring Boot redis 启动器-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

2、Redis 数据库配置

spring:
  data:
    redis:
      database: 0
      host: localhost
      port: 6379
      password:

发布消息

	/**
     * redis 将信息发送到指定的频道
     * @param topic   :消息所属的主题/频道
     * @param context :消息内容
     * @return
     */
	redisTemplate.convertAndSend(topic, context);

@RequiredArgsConstructor
@Service
public class RequestRateLimiterService {

	private final RedisTemplate<String, Object> redisTemplate;

	// Redis 中的 key 前缀
	private static final String REDIS_KEY_PREFIX = "select_rate_limit:";

	// Redis 中的通道名称
	private static final String REDIS_CHANNEL = "select_rate_limit_channel";

    // 根据用户名 请求风控
	public boolean allowRequest(String username) {
		
		// 每分钟最大请求次数
		Long MAX_REQUESTS_PER_MINUTE = 60L;

		String key = REDIS_KEY_PREFIX + username;
		Long currentRequests = redisTemplate.opsForValue().increment(key);
		if (currentRequests != null && currentRequests > MAX_REQUESTS_PER_MINUTE) {
			redisTemplate.convertAndSend(REDIS_CHANNEL, username);
			return false; // 超过阈值,拒绝请求
		}
		if (currentRequests != null && currentRequests == 1) {
			redisTemplate.expire(key, 1, TimeUnit.MINUTES); // 设置过期时间为1分钟
		}
		return true; // 允许请求
	}

}

消息监听

1、 Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理。

/**
 * Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理
 * <p>
 * 1、可以直接实现 MessageListener 接口,也可以继承它的实现类 MessageListenerAdapter.
 * 2、自动多线程处理,打印日志即可看出,即使手动延迟,也不会影响后面消息的接收。
 *
 */
@Component
public class RequestRateLimitSubscriber implements MessageListener {
    // 直接从容器中获取
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    /**
     * 监听到的消息必须进行与发送时相同的方式进行反序列
     * 1、订阅端与发布端 Redis 序列化的方式必须相同,否则会乱码。
     *
     * @param message :消息实体
     * @param pattern :匹配模式
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 消息订阅的匹配规则,如 new PatternTopic("basic-*") 中的 basic-*
        String msgPattern = new String(pattern);
        // 消息所属的通道,可以根据不同的通道做不同的业务逻辑
        String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
        // 接收的消息内容,可以根据自己需要强转为自己需要的对象,但最好先使用 instanceof 判断一下
        Object body = redisTemplate.getValueSerializer().deserialize(message.getBody());
 
        log.info("收到 Redis 订阅消息: channel={} body={} pattern={} ", channel, body, msgPattern);
 
        // 模拟数据处理 ********
        // 发送警告通知,可以通过邮件、短信等方式进行通知
        log.info("------------数据处理完成.......");
    }
}

主题订阅

1、自定义 RedisTemplate 序列化方式(发布者和订阅者必须相同)。

2、配置主题订阅 - Redis 消息监听器绑定监听指定通道。

/**
 * 自定义 RedisTemplate 序列化方式
 * 配置主题订阅 - Redis 消息监听器绑定监听指定通道
 */
@Configuration
public class RedisConfig {
    // 自定义的消息订阅监听器,当收到阅订的消息时,会将消息交给这个类处理
    @Resource
    private RequestRateLimitSubscriber requestRateLimitSubscriber;
 
    //  自定义 RedisTemplate 序列化方式   
    @Bean
	public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
		RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
		redisTemplate.setKeySerializer(RedisSerializer.string());// key 序列化规则
		redisTemplate.setHashKeySerializer(RedisSerializer.string());// hash key 序列化规则
		redisTemplate.setValueSerializer(RedisSerializer.java());// value 序列化规则
		redisTemplate.setHashValueSerializer(RedisSerializer.java()); // hash value 序列化规则
		redisTemplate.setConnectionFactory(factory); //绑定 RedisConnectionFactory
		return redisTemplate; //返回设置好的 RedisTemplate
	}
    /**
     * 配置主题订阅
     * RedisMessageListenerContainer - Redis 消息监听器绑定监听指定通道
     * 1、可以添加多个监听器,监听多个通道,只需要将消息监听器与订阅的通道/主题绑定即可。
     * 2、订阅的通道可以配置在全局配置文件中,也可以配置在数据库中,
     * <p>
     * addMessageListener(MessageListener listener, Collection<? extends Topic> topics):将消息监听器与多个订阅的通道/主题绑定
     * addMessageListener(MessageListener listener, Topic topic):将消息监听器与订阅的通道/主题绑定
     *
     * @param connectionFactory
     * @return
     */
	@Bean
	public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
		RedisMessageListenerContainer container = new RedisMessageListenerContainer();
		// 设置连接工厂,RedisConnectionFactory 可以直接从容器中取,也可以从 RedisTemplate 中取
		container.setConnectionFactory(factory);
		// 订阅名称叫 select_rate_limit_channel 的通道, 类似 Redis 中的 subscribe 命令
		container.addMessageListener(requestRateLimitSubscriber, new ChannelTopic("*"));
		// 订阅名称以 'basic-' 开头的全部通道, 类似 Redis 的 pSubscribe 命令
		container.addMessageListener(requestRateLimitSubscriber, new PatternTopic("*"));
		return container;

	}
}

Spring boot 监听 Key 过期事件

1、Redis 数据库可以通过命令设置 Key 的有效时间,当一个 Key 过期后会自动从数据库中删除,释放空间。得益于于这个特性,可以很轻松地实现诸多类似于 “Session” 管理、数据缓存等功能。它们都有一个共同点就是,数据不会永久保存!

2、在有些场景中,可能希望在某些 Key 过期的时候获取到通知,进行一些业务处理。或者是干脆用于 “定时通知/任务” 功能,例如:下单 30 分钟后未支付,则取消订单。那么可以在用户下单的时候使用订单号作为 key 设置到 Redis 数据库中,并且设置过期时间为 30 分钟。当超时后,可以在 “key 过期通知” 中获取到 key 也就是订单号,判断用户是否已经支付从而是否取消订单。

3、Redis 的 Key 过期通知功能本质上是通过 发布/订阅 功能实现的,所以它「不能保证通知消息的交付」,当 Key 过期时如果服务器停机、重启后则该通知消息会永久丢失。

消息监听

1、Spring Data Redis 专门提供了一个密钥过期事件消息侦听器:KeyExpirationEventMessageListener,自定义监听器类继承它,然后覆写 doHandleMessage(Message message) 方法即可。

2、doHandleMessage 方法用于处理 Redis Key 过期通知事件,其中 Message 参数表示通知消息,只有 2 属性,分别表示消息正文(在这里就是过期的 Key 名称)以及来自于哪个 channel。

3、在 Redis Key 过期事件中,「只能获取到已过期的 Key 的名称,不能获取到值。」

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
 * Redis 缓存 Key 过期监听器
 * Spring Data Redis 专门提供了一个密钥过期事件消息侦听器:KeyExpirationEventMessageListener,
 * 自定义监听器类继承它,然后覆写 doHandleMessage(Message message) 方法即可。
 */
@Component
public class KeyExpireListener extends KeyExpirationEventMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(KeyExpireListener.class);
    /**
     * 通过构造函数注入 RedisMessageListenerContainer 给 KeyExpirationEventMessageListener
     *
     * @param listenerContainer : Redis消息侦听器容器
     */
    public KeyExpireListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
    /**
     * doHandleMessage 方法用于处理 Redis Key 过期通知事件,
     * 在 Redis Key 过期事件中,「只能获取到已过期的 Key 的名称,不能获取到值。」
     *
     * @param message:通知消息,只有 2 属性,分别表示消息正文(在这里就是过期的 Key 名称)以及来自于哪个 channel。
     */
    @Override
    public void doHandleMessage(Message message) {
        // 过期的 key
        String key = new String(message.getBody());
        // 消息通道
        String channel = new String(message.getChannel());
        logger.info("过期key={} 消息通道(channel)={}", key, channel);
    }
}

主题订阅

1、与上面稍微有点不同,因为 key 过期事件属于 Redis 内部消息,内部频道/通道,所以只需要往容器中注入 RedisMessageListenerContainer 就行,不需要 addMessageListener 手动设置监听器 监听指定的通道/频道(topic 表达式)。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisConfig {
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        
    //  container.setTaskExecutor(null);            // 设置用于执行监听器方法的 Executor
    //  container.setErrorHandler(null);            // 设置监听器方法执行过程中出现异常的处理器
    //  container.addMessageListener(null, null);   // 手动设置监听器 & 监听的 topic 表达式
        return container;
    }
}

文章来源:https://blog.csdn.net/qq_35098526/article/details/134846059
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。