Springboot整合Redis实现消息发布订阅
2023-12-13 20:10:45
一、前言
有时候在实际项目中,某些业务场景下我们需要使用消息的发布订阅功能,以实现某些特殊的需求,那么我们实际上可以有多种选择,比如使用常见的消息中间件Rabbitmq,Kafka,Activemq等,但这几个都算是重量级的消息队列,使用成本相比较于Redis要高,而在有些业务中,我们可能只是想实现消息的发布订阅,并不是需要保证消息的完全的可靠性没有很高的要求,那么使用Redis无疑是最好的选择。
二、如何实现?
1.在pom.xml引入Redis相关maven依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.在application.yml中添加redis配置。
spring:
data:
redis:
host: localhost
port: 6379
password: password
3.实现自定义的Redis 消息订阅-消息监听器,处理收到的订阅消息。
import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Random;
/**
* Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理
* 可以直接实现 MessageListener 接口,也可以继承它的实现类 MessageListenerAdapter.
* 自动多线程处理,打印日志即可看出,即使手动延迟,也不会影响后面消息的接收。
*
*/
@Component
@Slf4j
public class RedisReceiveListener implements MessageListener {
// 直接从容器中获取
@Resource
private RedisTemplate redisTemplate;
/**
* 监听到的消息必须进行与发送时相同的方式进行反序列
* 订阅端与发布端 Redis 序列化的方式必须相同,否则会乱码。
* @param message :消息实体
* @param pattern :匹配模式
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 消息订阅的匹配规则,如 new PatternTopic("user-*") 中的 user-*
String msgPattern = new String(pattern);
// 消息所属的通道,可以根据不同的通道做不同的业务逻辑
String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
// 接收的消息内容,可以根据自己需要强转为自己需要的对象。
Object body = redisTemplate.getValueSerializer().deserialize(message.getBody());
log.info("收到Redis订阅消息: channel={} body={} pattern={} ", channel, body, msgPattern);
// 手动延迟,模拟数据处理
ThreadUtil.safeSleep(700 + new Random().nextInt(2000));
log.info("--------------------数据处理完成");
}
}
4.配置Redis-将Redis 消息监听器绑定监听指定通道。
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import javax.annotation.Resource;
/**
* 自定义 RedisTemplate 序列化方式
* 配置主题订阅 - Redis消息监听器绑定监听指定通道
*/
@Configuration
public class RedisConfig {
// 自定义的消息订阅监听器
@Resource
private RedisReceiveListener redisReceiveListener;
/**
* 自定义 RedisTemplate 序列化方式
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
//绑定 RedisConnectionFactory
redisTemplate.setConnectionFactory(redisConnectionFactory);
//创建 Jackson2JsonRedisSerializer 序列方式,对象类型使用 Object 类型,
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(new LaissezFaireSubTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 设置 RedisTemplate 序列化规则,因为 key 通常是普通的字符串,所以使用StringRedisSerializer即可,而 value 是对象时,才需要使用序列化与反序列化。
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// hash key 序列化规则
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
//属性设置后操作
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* 配置主题订阅
* 可以添加多个监听器,监听多个通道,只需要将消息监听器与订阅的通道/主题绑定即可。
* addMessageListener(MessageListener listener, Collection<? extends Topic> topics):将消息监听器与多个订阅的通道/主题绑定
* addMessageListener(MessageListener listener, Topic topic):将消息监听器与订阅的通道/主题绑定
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置连接工厂,RedisConnectionFactory 可以直接从容器中取,也可以从 RedisTemplate 中取
container.setConnectionFactory(connectionFactory);
// 订阅名称叫cache的通道, 类似 Redis 中的subscribe命令
container.addMessageListener(redisReceiveListener, new ChannelTopic("cache"));
// 订阅名称以 'user-' 开头的全部通道, 类似 Redis 的 pSubscribe 命令
container.addMessageListener(redisReceiveListener, new PatternTopic("user-*"));
return container;
}
}
5.publisher发布者发布消息
@EnableScheduling //开启定时器功能
@Component
public class RedisMessageSender {
@Resource
private RedisTemplate redisTemplate;
@Scheduled(fixedRate = 5000) //间隔5s 通过redisTemplate对象向redis消息队列cache通道发布消息
public void sendMessage(){
redisTemplate.convertAndSend("cache",String.valueOf(Math.random()));
redisTemplate.convertAndSend("user-my",String.valueOf(Math.random()));
}
}
这样的话,发布的消息就可以被Redis消息监听器收到并处理。
6.我们也可以在Controller测试下。
@RestController
@RequestMapping("redis")
@Slf4j
public class PublishController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping(value = "/publish")
public String pubMsg(){
redisTemplate.convertAndSend("user-id","124232");
redisTemplate.convertAndSend("cache","myCache");
log.info("发布者发送Topic消息... ");
return "成功";
}
}
大家有时间可以尝试下。
文章来源:https://blog.csdn.net/qq_42077317/article/details/134952084
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!