Spring boot 使用Redis 消息发布订阅
Spring boot 使用Redis 消息发布订阅
文章目录
最近在做请求风控的时候,在网上搜集了大量的解决方案,最后使用Redis 消息发布订阅 比较符合业务。做一下记录
Redis 消息发布订阅
Redis 发布订阅 命令:redis命令手册
1、Redis 中"pub/sub"的消息,为"即发即失",server 不会保存消息,如果 publish 的消息没有任何 client 处于 “subscribe” 状态,消息将会被丢弃;如果 client 在 subcribe 时,链接断开后重连,那在么此期间的消息也将丢失。
2、Redis server 将会"尽力"将消息发送给处于 subscribe 状态的 client,但是仍不会保证每条消息都能被正确接收。
**优点:**支持发布订阅,支持多组生产者、消费者处理消息
缺点:
-
消费者下线数据会丢失
-
不支持数据持久化,Redis宕机则数据也会丢失
-
消息堆积,缓存区溢出,消费者会被强制踢下线,数据也会丢失
Redis 发布订阅 命令
命令 | 描述 |
---|---|
Redis Unsubscribe 命令 | 指退订给定的频道。 |
Redis Subscribe 命令 | 订阅给定的一个或多个频道的信息。 |
Redis Pubsub 命令 | 查看订阅与发布系统状态。 |
Redis Punsubscribe 命令 | 退订所有给定模式的频道。 |
Redis Publish 命令 | 将信息发送到指定的频道。 |
Redis Psubscribe 命令 | 订阅一个或多个符合给定模式的频道。 |
Spring boot 实现消息发布订阅
1、引入 Redis 依赖
<!--Spring Boot redis 启动器-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2、Redis 数据库配置
spring:
data:
redis:
database: 0
host: localhost
port: 6379
password:
发布消息
/**
* redis 将信息发送到指定的频道
* @param topic :消息所属的主题/频道
* @param context :消息内容
* @return
*/
redisTemplate.convertAndSend(topic, context);
@RequiredArgsConstructor
@Service
public class RequestRateLimiterService {
private final RedisTemplate<String, Object> redisTemplate;
// Redis 中的 key 前缀
private static final String REDIS_KEY_PREFIX = "select_rate_limit:";
// Redis 中的通道名称
private static final String REDIS_CHANNEL = "select_rate_limit_channel";
// 根据用户名 请求风控
public boolean allowRequest(String username) {
// 每分钟最大请求次数
Long MAX_REQUESTS_PER_MINUTE = 60L;
String key = REDIS_KEY_PREFIX + username;
Long currentRequests = redisTemplate.opsForValue().increment(key);
if (currentRequests != null && currentRequests > MAX_REQUESTS_PER_MINUTE) {
redisTemplate.convertAndSend(REDIS_CHANNEL, username);
return false; // 超过阈值,拒绝请求
}
if (currentRequests != null && currentRequests == 1) {
redisTemplate.expire(key, 1, TimeUnit.MINUTES); // 设置过期时间为1分钟
}
return true; // 允许请求
}
}
消息监听
1、 Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理。
/**
* Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理
* <p>
* 1、可以直接实现 MessageListener 接口,也可以继承它的实现类 MessageListenerAdapter.
* 2、自动多线程处理,打印日志即可看出,即使手动延迟,也不会影响后面消息的接收。
*
*/
@Component
public class RequestRateLimitSubscriber implements MessageListener {
// 直接从容器中获取
@Resource
private RedisTemplate<String, Object> redisTemplate;
/**
* 监听到的消息必须进行与发送时相同的方式进行反序列
* 1、订阅端与发布端 Redis 序列化的方式必须相同,否则会乱码。
*
* @param message :消息实体
* @param pattern :匹配模式
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 消息订阅的匹配规则,如 new PatternTopic("basic-*") 中的 basic-*
String msgPattern = new String(pattern);
// 消息所属的通道,可以根据不同的通道做不同的业务逻辑
String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
// 接收的消息内容,可以根据自己需要强转为自己需要的对象,但最好先使用 instanceof 判断一下
Object body = redisTemplate.getValueSerializer().deserialize(message.getBody());
log.info("收到 Redis 订阅消息: channel={} body={} pattern={} ", channel, body, msgPattern);
// 模拟数据处理 ********
// 发送警告通知,可以通过邮件、短信等方式进行通知
log.info("------------数据处理完成.......");
}
}
主题订阅
1、自定义 RedisTemplate 序列化方式(发布者和订阅者必须相同)。
2、配置主题订阅 - Redis 消息监听器绑定监听指定通道。
/**
* 自定义 RedisTemplate 序列化方式
* 配置主题订阅 - Redis 消息监听器绑定监听指定通道
*/
@Configuration
public class RedisConfig {
// 自定义的消息订阅监听器,当收到阅订的消息时,会将消息交给这个类处理
@Resource
private RequestRateLimitSubscriber requestRateLimitSubscriber;
// 自定义 RedisTemplate 序列化方式
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setKeySerializer(RedisSerializer.string());// key 序列化规则
redisTemplate.setHashKeySerializer(RedisSerializer.string());// hash key 序列化规则
redisTemplate.setValueSerializer(RedisSerializer.java());// value 序列化规则
redisTemplate.setHashValueSerializer(RedisSerializer.java()); // hash value 序列化规则
redisTemplate.setConnectionFactory(factory); //绑定 RedisConnectionFactory
return redisTemplate; //返回设置好的 RedisTemplate
}
/**
* 配置主题订阅
* RedisMessageListenerContainer - Redis 消息监听器绑定监听指定通道
* 1、可以添加多个监听器,监听多个通道,只需要将消息监听器与订阅的通道/主题绑定即可。
* 2、订阅的通道可以配置在全局配置文件中,也可以配置在数据库中,
* <p>
* addMessageListener(MessageListener listener, Collection<? extends Topic> topics):将消息监听器与多个订阅的通道/主题绑定
* addMessageListener(MessageListener listener, Topic topic):将消息监听器与订阅的通道/主题绑定
*
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置连接工厂,RedisConnectionFactory 可以直接从容器中取,也可以从 RedisTemplate 中取
container.setConnectionFactory(factory);
// 订阅名称叫 select_rate_limit_channel 的通道, 类似 Redis 中的 subscribe 命令
container.addMessageListener(requestRateLimitSubscriber, new ChannelTopic("*"));
// 订阅名称以 'basic-' 开头的全部通道, 类似 Redis 的 pSubscribe 命令
container.addMessageListener(requestRateLimitSubscriber, new PatternTopic("*"));
return container;
}
}
Spring boot 监听 Key 过期事件
1、Redis 数据库可以通过命令设置 Key 的有效时间,当一个 Key 过期后会自动从数据库中删除,释放空间。得益于于这个特性,可以很轻松地实现诸多类似于 “Session” 管理、数据缓存等功能。它们都有一个共同点就是,数据不会永久保存!
2、在有些场景中,可能希望在某些 Key 过期的时候获取到通知,进行一些业务处理。或者是干脆用于 “定时通知/任务” 功能,例如:下单 30 分钟后未支付,则取消订单。那么可以在用户下单的时候使用订单号作为 key 设置到 Redis 数据库中,并且设置过期时间为 30 分钟。当超时后,可以在 “key 过期通知” 中获取到 key 也就是订单号,判断用户是否已经支付从而是否取消订单。
3、Redis 的 Key 过期通知功能本质上是通过 发布/订阅 功能实现的,所以它「不能保证通知消息的交付」,当 Key 过期时如果服务器停机、重启后则该通知消息会永久丢失。
消息监听
1、Spring Data Redis 专门提供了一个密钥过期事件消息侦听器:KeyExpirationEventMessageListener,自定义监听器类继承它,然后覆写 doHandleMessage(Message message) 方法即可。
2、doHandleMessage 方法用于处理 Redis Key 过期通知事件,其中 Message 参数表示通知消息,只有 2 属性,分别表示消息正文(在这里就是过期的 Key 名称)以及来自于哪个 channel。
3、在 Redis Key 过期事件中,「只能获取到已过期的 Key 的名称,不能获取到值。」
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
* Redis 缓存 Key 过期监听器
* Spring Data Redis 专门提供了一个密钥过期事件消息侦听器:KeyExpirationEventMessageListener,
* 自定义监听器类继承它,然后覆写 doHandleMessage(Message message) 方法即可。
*/
@Component
public class KeyExpireListener extends KeyExpirationEventMessageListener {
private static final Logger logger = LoggerFactory.getLogger(KeyExpireListener.class);
/**
* 通过构造函数注入 RedisMessageListenerContainer 给 KeyExpirationEventMessageListener
*
* @param listenerContainer : Redis消息侦听器容器
*/
public KeyExpireListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* doHandleMessage 方法用于处理 Redis Key 过期通知事件,
* 在 Redis Key 过期事件中,「只能获取到已过期的 Key 的名称,不能获取到值。」
*
* @param message:通知消息,只有 2 属性,分别表示消息正文(在这里就是过期的 Key 名称)以及来自于哪个 channel。
*/
@Override
public void doHandleMessage(Message message) {
// 过期的 key
String key = new String(message.getBody());
// 消息通道
String channel = new String(message.getChannel());
logger.info("过期key={} 消息通道(channel)={}", key, channel);
}
}
主题订阅
1、与上面稍微有点不同,因为 key 过期事件属于 Redis 内部消息,内部频道/通道,所以只需要往容器中注入 RedisMessageListenerContainer 就行,不需要 addMessageListener 手动设置监听器 监听指定的通道/频道(topic 表达式)。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisConfig {
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
// container.setTaskExecutor(null); // 设置用于执行监听器方法的 Executor
// container.setErrorHandler(null); // 设置监听器方法执行过程中出现异常的处理器
// container.addMessageListener(null, null); // 手动设置监听器 & 监听的 topic 表达式
return container;
}
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!