Redis项目实战——黑马点评——从入门到深入
项目介绍
项目图
数据库
登录功能实现
基于Session实现登录
流程图
发送短信验证码
接口介绍
代码实现
实现流程:
1.校验手机号 2.如果不符合,返回错误信息 3.符合,生成验证码 4.保存到Session 5.发送验证码 6.返回ok
注意:
- _在校验手机号时提供了正则表达的工具类_RegexUtil里面有各种格式的校验。
- 生成验证码使用RandomUtil的工具类
- 发送短信验证码,在瑞吉外卖项目已经实现过,详情可见瑞吉外卖总结
public Result sendCode(String phone, HttpSession session) {
//1.校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
//2.如果不符合,返回错误信息
return Result.fail("手机号格式错误");
}
//3.符合,生成验证码
String code = RandomUtil.randomNumbers(6);
//4.保存到Session
session.setAttribute("code",code);
//5.发送验证码
log.debug("发送短信验证码成功,验证码为:{}",code);
//6.返回ok
return Result.ok();
}
短信登录
接口介绍
代码实现
实现流程:
_1.校验手机号 2校验验证码 3.不一致,报错 4.一致,根据手机号查询用户 5.判断用户是否存在 6.不存在,创建新用户并保存 7.保存用户信息到session _
public Result login(LoginFormDTO loginForm, HttpSession session) {
//1.校验手机号
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
//2.如果不符合,返回错误信息
return Result.fail("手机号格式错误");
}
//2.校验验证码
Object cacheCode = session.getAttribute("code");
String code = loginForm.getCode();
if (cacheCode == null || !cacheCode.toString().equals(code)){
//3.不一致,报错
return Result.fail("验证码错误");
}
//4.一致,根据手机号查询用户
User user = query().eq("phone", phone).one();
//5.判断用户是否存在
if (user == null){
//6.不存在,创建新用户并保存
user = createUserWithPhone(phone);
}
//7.保存用户信息到session
session.setAttribute("user",user);
return Result.ok();
}
登录验证功能
接口介绍
访问每个Controller之前都需要进行session验证,这样很麻烦,我们在访问Controller之前,用拦截器进行拦截,拦截到保存用户的信息,发送到Controller中,使用TreadLocal,线程安全。
代码实现
实现流程:
//1.获取session
//2.获取session中的用户
//3.判断用户是否存在
//4.不存在,拦截
//5.存在,保存用户信息到ThreadLocal
//6.放行
注意:
- 在使用ThreadLocal之前,在工具类中提供了UserHolder的工具类,里面定义了三个方法。save、get、remove方法。
- _在编写完拦截器之后拦截器并没有生效,需要编写配置类,实现WebMvcConfigurer接口。在里面实现方法addInterceptor。_registry.addInterceptor(new LoginInterceptor());然后配置要拦截的路径。
拦截器
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.获取session
HttpSession session = request.getSession();
//2.获取session中的用户
Object user = session.getAttribute("user");
//3.判断用户是否存在
if (user == null){
//4.不存在,拦截
response.setStatus(401);
return false;
}
//5.存在,保存用户信息到ThreadLocal
UserHolder.saveUser((User)user);
//6.放行
return true;
}
集群的session共享问题
session共享问题:多台Tomcat并不共享session存储空间,当请求切换到不同的tomcat服务器时,导致的数据丢失问题
session的替代方案应该满足:
- 数据共享
- 内存存储
- key-value存储
基于Redis实现共享session登录
流程图
代码实现
- 发送短信流程 :
1.校验手机号 //2.如果不符合,返回错误信息//3.符合,生成验证码//4.保存到redis//5.发送验证码//6.返回ok
public Result sendCode(String phone, HttpSession session) {
//1.校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
//2.如果不符合,返回错误信息
return Result.fail("手机号格式错误");
}
//3.符合,生成验证码
String code = RandomUtil.randomNumbers(6);
//4.保存到redis
stringRedisTemplate.opsForValue().set(RedisConstants.LOGIN_CODE_KEY + phone,code,RedisConstants.LOGIN_CODE_TTL, TimeUnit.MINUTES);
//5.发送验证码
log.debug("发送短信验证码成功,验证码为:{}",code);
//6.返回ok
return Result.ok();
}
- 登录流程:
//1.校验手机号//2.从redis获取并校验验证码//3.不一致,报错//4.一致,根据手机号查询用户//5.判断用户是否存在//6.不存在,创建新用户并保存//7.保存用户信息到redis//7.1随机生成token,作为登录令牌//7.2将user对象转为hash存储//7.3存储//7.4设置token有效期//8.返回token
public Result login(LoginFormDTO loginForm, HttpSession session) {
//1.校验手机号
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
//2.如果不符合,返回错误信息
return Result.fail("手机号格式错误");
}
//2.从redis获取并校验验证码
String cacheCode = stringRedisTemplate.opsForValue().get(RedisConstants.LOGIN_CODE_KEY + phone);
String code = loginForm.getCode();
if (cacheCode == null || !cacheCode.equals(code)){
//3.不一致,报错
return Result.fail("验证码错误");
}
//4.一致,根据手机号查询用户
User user = query().eq("phone", phone).one();
//5.判断用户是否存在
if (user == null){
//6.不存在,创建新用户并保存
user = createUserWithPhone(phone);
}
//7.保存用户信息到redis
//7.1随机生成token,作为登录令牌
String token = UUID.randomUUID().toString(true);
//7.2将user对象转为hash存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));
//7.3存储
String tokenKey = "login:token:"+ token;
stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);
//7.4设置token有效期
stringRedisTemplate.expire(tokenKey,30,TimeUnit.MINUTES);
//8.返回token
return Result.ok(token);
}
- 拦截器流程:
//1.获取请求头中的token//2.基于token获取redis中的用户//3.判断用户是否存在//4.不存在,拦截//5.将查询到的Hash数据转换为UserDTO对象//6.存在,保存用户信息到ThreadLocal//7.刷新token的有效期//7.放行
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.获取请求头中的token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)){
return false;
}
//2.基于token获取redis中的用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(RedisConstants.LOGIN_USER_KEY + token);
//3.判断用户是否存在
if (userMap.isEmpty()){
//4.不存在,拦截
response.setStatus(401);
return false;
}
//5.将查询到的Hash数据转换为UserDTO对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
//6.存在,保存用户信息到ThreadLocal
UserHolder.saveUser(userDTO);
//7.刷新token的有效期
String tokenKey = "login:token:"+ token;
stringRedisTemplate.expire(tokenKey,30, TimeUnit.MINUTES);
//7.放行
return true;
}
注意:
- 把对象存入redis中的map时每个属性的值都应为String类型,在此项目中dto的id为long类型,需要做如下配置
- redis的key需要唯一并且后面容易取到,类型选择也要注意。
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));
登录拦截器的优化
流程图
实现流程:
token拦截器:
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.获取请求头中的token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)){
return true;
}
//2.基于token获取redis中的用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(RedisConstants.LOGIN_USER_KEY + token);
//3.判断用户是否存在
if (userMap.isEmpty()){
//4.不存在,拦截
response.setStatus(401);
return true;
}
//5.将查询到的Hash数据转换为UserDTO对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
//6.存在,保存用户信息到ThreadLocal
UserHolder.saveUser(userDTO);
//7.刷新token的有效期
String tokenKey = "login:token:"+ token;
stringRedisTemplate.expire(tokenKey,30, TimeUnit.MINUTES);
//7.放行
return true;
}
登录拦截器
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.判断是否需要拦截(ThreadLocal中是否有用户)
if (UserHolder.getUser() == null){
response.setStatus(401);
return false;
}
return true;
}
注意:在配置拦截器时,如果希望调节拦截器的执行顺序。可以调换注册顺序,也可以使用.order方法设置排序顺序,值越小的,越先执行。
商户缓存
什么是缓存?
缓存就是数据交换的缓冲区,称作cache,是存储数据的临时地方,一般读写性能比较高。
查询商户添加redis缓存
模型图
代码实现
业务流程
//1.从redis查询商铺缓存
//2.判断是否存在
//3.存在,直接返回
//4.不存在,根据id查询数据库
//5.不存在返回错误
//6.存在,写入redis
//7.返回
public Result queryById(Long id) {
String key ="cache:shop" + id;
//1.从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//4.不存在,根据id查询数据库
Shop shop = getById(id);
//5.不存在返回错误
if (shop == null){
return Result.fail("店铺不存在");
}
//6.存在,写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop));
//7.返回
return Result.ok(shop);
}
店铺类型缓存
缓存更新策略
业务场景:
- 低一致性需求:使用内存淘汰机制。例如店铺类型的缓存查询
- 高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询。
主动更新策略
- Cache Aside Pattern:由缓存的调用者,在更新数据库时的同时更新缓存。
- Read/Write Though Pattern:缓存和数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心一致性问题。
- Write Behind Caching Pattern:调用者只操作缓存,其他线程异步的将缓存数据持久化到数据库,保证最终一致。
操作缓存和数据库时有三个问题需要考虑
- 删除缓存还是更新缓存?
更新缓存:每次更新数据库都更新缓存,无效写操作较多。
删除缓存:更新数据库时让缓存无效,查询时再更新缓存。
2. 如何保证缓存与数据库的操作的同时成功或失败?
- 单体系统,将缓存与数据库操作放在一个事务里。
- 分布式系统:利用TCC等分布式事务方案。
- 先操作缓存还是先操作数据库?
- 先删除缓存,再操作数据库
- 先操作数据库,再删除缓存
缓存更新策略的最佳实践方案
- 低一致性需求:使用redis自带的内存淘汰机制
- 高一致性需求:主动更新,并以超时剔除作为兜底方案
- 读操作:
- 缓存命中则直接返回
- 缓存未命中则查询数据库,并写入缓存,设置超时时间。
- 写操作:
- 先写数据库,再删除缓存操作
- 要保证数据库与缓存操作的原子性
给查询商铺的缓存添加超时剔除和主动更新的策略
修改ShopController中的逻辑业务,满足下面的需求:
- 根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库写入缓存,并设置超时时间。
- 根据id修改店铺时,先改数据库,在删除缓存。
//查询:
public Result queryById(Long id) {
String key =RedisConstants.CACHE_SHOP_KEY+ id;
//1.从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//4.不存在,根据id查询数据库
Shop shop = getById(id);
//5.不存在返回错误
if (shop == null){
return Result.fail("店铺不存在");
}
//6.存在,写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
//7.返回
return Result.ok(shop);
}
更新:
@Transactional
public Result update(Shop shop) {
Long id = shop.getId();
if (id == null){
return Result.fail("店铺id不能为空");
}
//1.更新数据库
updateById(shop);
//2.删除缓存
stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY+ id);
return Result.ok();
}
缓存穿透
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会达到数据库
解决方案
缓存空对象
优点:实现简单、维护方便
缺点:额外的内存消耗(设置过期时间解决)、可能造成短期的不一致
布隆过滤
优点:内存占用少,没有多余key
缺点:实现复杂、存在误判可能
代码解决
流程图
代码改进
public Result queryById(Long id) {
String key =RedisConstants.CACHE_SHOP_KEY+ id;
//1.从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//判断命中的是否是空值
if (shopJson != null){
//返回错误信息
return Result.fail("店铺不存在");
}
//4.不存在,根据id查询数据库
Shop shop = getById(id);
//5.不存在返回错误
if (shop == null){
//将空值写入redis
stringRedisTemplate.opsForValue().set(key,"", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
//返回错误信息
return Result.fail("店铺不存在");
}
//6.存在,写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
//7.返回
return Result.ok(shop);
}
解决缓存穿透的两个重点:
- 如果提交的查询条件不存在,需要将空值写入redis
- 判断命中缓存后,命中的是否为null。
总结
- 缓存穿透产生的原因是什么?
用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来了巨大压力。
- 缓存穿透的结局方案有哪些?
- 缓存null
- 布隆过滤
- 增加id的复杂度,避免被猜测id规律
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数校验
缓存雪崩
缓存雪崩是指在同一时段,大量的缓存key同时失效或者Redis服务宕机,导致大量的请求到达数据库,带来巨大压力。
解决方案
- 给不同的key的TTL添加随机值(批量导入时,随机设置TTL)
- 利用Redis集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存
缓存击穿
缓存击穿问题也叫热点key问题,就是一个被高并发访问并且缓存重建业务比较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来的巨大冲击
解决方案
互斥锁
逻辑过期
两种方案对比
基于互斥锁方式解决缓存击穿的问题
需求:修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题
流程图:
利用setnx命令原理(如果key不存在,新建key,如果key已经存在,那么不进行操作)来实现互斥锁。
生成锁方法:
private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
释放锁方法:
private void unlock(String key){
stringRedisTemplate.delete(key);
}
互斥锁实现缓存击穿代码:
public Shop queryWithMutex(Long id){
String key =RedisConstants.CACHE_SHOP_KEY+ id;
//1.从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return shop;
}
//判断命中的是否是空值
if (shopJson != null){
//返回错误信息
return null;
}
//4.实现缓存重建
//4.1获取互斥锁
String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
Shop shop = null;
try {
boolean isLock = tryLock(lockKey);
//4.2判断是否成功
if (!isLock){
//4.3失败则休眠并重试
Thread.sleep(50);
return queryWithMutex(id);
}
//4.4成功,根据id查询数据库
shop = getById(id);
//5.不存在返回错误
if (shop == null){
//将空值写入redis
stringRedisTemplate.opsForValue().set(key,"", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
//返回错误信息
return null;
}
//6.存在,写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
//7.释放互斥锁
unlock(lockKey);
}
//7.返回
return shop;
}
基于逻辑过期方式解决缓存击穿问题
逻辑过期不是真正的过期,在存入redis额外的添加一个过期时间的字段。不用设置ttl(过期时间),过期时间不是由redis控制,而是由程序员控制。
流程图:
代码实现:
/**
* 封装店铺信息逻辑过期时间的方法
* @param id
* @param expireSeconds
*/
private void saveShop2Redis(Long id,Long expireSeconds){
//1.查询店铺数据
Shop shop = getById(id);
//2.封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
//3.写入Redis
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
}
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
/**
* 逻辑过期解决缓存击穿
* @param id
* @return
*/
public Shop queryWithLogicalExpire(Long id){
String key =RedisConstants.CACHE_SHOP_KEY+ id;
//1.从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isBlank(shopJson)) {
//3.不存在,直接返回
return null;
}
//4.命中,需要把json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
LocalDateTime expireTime = redisData.getExpireTime();
JSONObject data = (JSONObject) redisData.getData();
Shop shop = JSONUtil.toBean(data, Shop.class);
//5.判断是否过期
if (expireTime.isAfter(LocalDateTime.now())){
//5.1未过期,直接返回
return shop;
}
//5.2已过期,需要缓存重建
//6.缓存重建
//6.1获取互斥锁
String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
//6.2判断是否获取互斥锁成功
if (isLock){
//6.3成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(()->{
//重建缓存
try {
this.saveShop2Redis(id,20L);
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
//释放锁
unlock(lockKey);
}
});
}
//6.4返回过期的商铺信息
return shop;
}
缓存工具类的封装
基于StringRedisTemplete封装一个缓存工具类,满足下面的需求
- 方法1:将任意java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
- 方法2:将任意ava对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
- 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
- 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
/**
* 将任意java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
* @param key
* @param value
* @param time
* @param unit
*/
public void set(String key, Object value, Long time, TimeUnit unit){
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
}
/**
* 将任意ava对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
* @param key
* @param value
* @param time
* @param unit
*/
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){
//所谓逻辑过期,在存入redis时,给数据里加入一个逻辑过期的字段
//设置逻辑过期
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}
/**
* 解决缓存穿透的工具方法
* @param id
* @param type
* @return
* @param <R>
*/
public <R,ID> R queryWithPassThrough(
String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback,Long time, TimeUnit unit){
String key = keyPrefix + id;
//1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isNotBlank(json)) {
//3.存在,直接返回
return JSONUtil.toBean(json,type);
}
//判断命中的是否是空值
if (json != null){
//返回错误信息
return null;
}
//4.不存在,根据id查询数据库
R r = dbFallback.apply(id);
//5.不存在返回错误
if (r == null){
//将空值写入redis
stringRedisTemplate.opsForValue().set(key,"", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
//返回错误信息
return null;
}
//6.存在,写入redis
this.set(key,r,time,unit);
//7.返回
return r;
}
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
/**
* 逻辑过期解决缓存击穿
* @param id
* @return
*/
public <R,ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type,Function<ID,R> dbFallback,Long time, TimeUnit unit){
String key = keyPrefix + id;
//1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isBlank(json)) {
//3.不存在,直接返回
return null;
}
//4.命中,需要把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
LocalDateTime expireTime = redisData.getExpireTime();
JSONObject data = (JSONObject) redisData.getData();
R r = JSONUtil.toBean(data,type);
//5.判断是否过期
if (expireTime.isAfter(LocalDateTime.now())){
//5.1未过期,直接返回
return r;
}
//5.2已过期,需要缓存重建
//6.缓存重建
//6.1获取互斥锁
String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
//6.2判断是否获取互斥锁成功
if (isLock){
//6.3成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(()->{
//重建缓存
try {
//查询数据库
R r1 = dbFallback.apply(id);
//写入redis
this.setWithLogicalExpire(key,r1,time,unit);
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
//释放锁
unlock(lockKey);
}
});
}
//6.4返回过期的商铺信息
return r;
}
/**
* 获取锁
* @param key
* @return
*/
private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
/**
* 释放锁
* @param key
*/
private void unlock(String key){
stringRedisTemplate.delete(key);
}
}
总结
缓存穿透
缓存雪崩
缓存击穿(热点key失效)
优惠卷秒杀
全局唯一ID
当用户抢购时,就会生成订单并保存到tb_voucher_oder这张表中,而订单表如果使用数据库自增ID就会存在一些问题:
- id的规律太明显
- 受单表数据量的显示
全局id生成器
是一种在分布式系统下用来生成全局的唯一id的工具,一般要满足下列特性:
为了增加ID的安全性,我们不可以直接使用Redis自增的数值,而是拼接一些其他信息:
ID的组成成分:
- 符号位:1bit,永远为0
- 时间戳:31bit,以秒为单位,可以使用69年
- 序列号:32bit,秒内计数器,支持每秒产生2的32次方个不同的ID
代码:
public class RedisIdWorker {
//开始的时间戳
private static final long BEGIN_TIMESTAMP = 1640995200L;
//序列号的位数
private static final int COUNT_BITS = 32;
@Resource
private StringRedisTemplate stringRedisTemplate;
public long nextId(String keyPrefix){
//1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
//2.生成序列化
//2.1获取当前的日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
//2.2自增长
Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
//3.拼接返回
return timestamp << COUNT_BITS | count;
}
}
总结
全局唯一id生成策略:
- UUID
- Redis自增
- 雪花算法
- 数据库自增
Redis自增ID策略:
- 每天一个key,方便统计订单量
- ID构造是:时间戳+计数器
实现优惠券的秒杀下单
每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,特价券需要秒杀抢购。
表关系如下:
- tb_voucher:优惠券的基本信息,优惠金额、使用规则等
- tb_seckill_voucher:优惠券的库存、开始抢购时间、结束抢购时间。特价优惠券才需要填写这些信息。
接口介绍
注意事项
下单时需要注意两点:
- 秒杀是否开始或结束,如果未开始或者已结束则无法下单
- 库存是否充足,不足则无法下单
流程图
代码实现
代码流程:
//1.查询优惠卷
//2.判断秒杀是否开始
//3.判断秒杀是否结束
//4.判断库存是否充足
//5.扣除库存
//6.创建菜单
//7.返回订单id
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Transactional
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀尚未开始");
}
//3.判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())){
//秒杀结束
return Result.fail("秒杀时间结束");
}
//4.判断库存是否充足
if (voucher.getStock()<1){
return Result.fail("库存不足");
}
//5.扣除库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).update();
if (!success){
//扣件失败
return Result.fail("库存不足");
}
//6.创建菜单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1.订单id
long orderId = redisIdWorker.nextId("oder");
voucherOrder.setId(orderId);
//6.2.用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//7.返回订单id
return Result.ok(orderId);
}
超卖问题
超卖问题是典型的多线程安全问题。
解决方案
针对这一问题的常见方案就是加锁
乐观锁解决
乐观锁的关键是判断之前查询到的数据是否有被修改过,常见的方式有两种:
- 版本号法
- CAS法
总结
- 悲观锁:添加同步锁,让线程串行执行
- 优点:简单粗暴
- 缺点:性能一般
- 乐观锁:不加锁,在更新时判断是否有其他的线程在修改
- 优点:性能好
- 缺点:存在成功率比较低的问题
一人一单
需求:修改秒杀业务,要求一个优惠卷,一个用户只能下一单
流程图
代码
通过对用户的id进行加锁实现了一人一单的问题
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀尚未开始");
}
//3.判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())){
//秒杀结束
return Result.fail("秒杀时间结束");
}
//4.判断库存是否充足
if (voucher.getStock()<1){
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
//6.一人一单
Long userId = UserHolder.getUser().getId();
//6.1查询订单
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//6.2判断是否存在
if (count > 0) {
//用户已经购买
return Result.fail("用户购买已经购买过");
}
//5.扣除库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock", 0)
.update();
if (!success) {
//扣件失败
return Result.fail("库存不足");
}
//6.创建菜单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1.订单id
long orderId = redisIdWorker.nextId("oder");
voucherOrder.setId(orderId);
//6.2.用户id
voucherOrder.setUserId(userId);
//6.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//7.返回订单id
return Result.ok(orderId);
}
一人一单的并发安全问题
通过加锁可以解决在单机情况下的宜人一单的安全问题,但是在集群模式下就不行了。
在集群模式下,多个jvm里面多个锁监视器,导致锁失效。因为只能保持单个jvm。
分布式锁
满足分布式系统或集群模式下多进程可见并且互斥的锁
分布式锁的实现
分布式锁的核心是实现多进程之间的互斥,而满足这一点的方式有很多,常见的有三种
基于Redis的分布式锁
实现分布式锁时需要实现的两个基本方法
- 获取锁:
- 互斥:确保只有一个线程获取锁:setnx key value
- 添加过期时间,避免服务器宕机引起的死锁:expire key 10(time)
- 非阻塞:尝试一次,成功返回true,失败返回false
- 释放锁:
- 手动释放:del key
- 超时释放:获取锁时添加一个过期时间
流程图:
初级版本
需求:定义一个类,实现下面的接口,利用redis实现分布式锁功能
创建锁释放锁的工具类:
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String name;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
private static final String key_prefix = "lock:";
@Override
public boolean tryLock(long timeoutSec) {
//获取线程标识
long threadId = Thread.currentThread().getId();
//获取锁
Boolean seccess = stringRedisTemplate.opsForValue().setIfAbsent(key_prefix+name, threadId + " ", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(seccess);
}
@Override
public void unlock() {
stringRedisTemplate.delete(key_prefix+name);
}
}
代码实现
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀尚未开始");
}
//3.判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())){
//秒杀结束
return Result.fail("秒杀时间结束");
}
//4.判断库存是否充足
if (voucher.getStock()<1){
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
//创建锁对象
SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "oder" + userId);
//获取锁
boolean islock = lock.tryLock(5);
//判断是否获取锁成功
if (!islock){
//获取锁失败,返回错误或重试
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} catch (IllegalStateException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
//6.一人一单
Long userId = UserHolder.getUser().getId();
//6.1查询订单
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//6.2判断是否存在
if (count > 0) {
//用户已经购买
return Result.fail("用户购买已经购买过");
}
//5.扣除库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock", 0)
.update();
if (!success) {
//扣件失败
return Result.fail("库存不足");
}
//6.创建菜单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1.订单id
long orderId = redisIdWorker.nextId("oder");
voucherOrder.setId(orderId);
//6.2.用户id
voucherOrder.setUserId(userId);
//6.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//7.返回订单id
return Result.ok(orderId);
}
}
极端情况
线程1释放了别的线程的锁。
在释放锁时要判断锁的标识是否是自己的
改进Redis的分布式锁
需求:修改之前的分布式锁实现,满足:
- 在获取锁时存入线程标识(可用UUID表示)
- 在释放锁时先获取锁中的线程标识,判断是否是当前的线程标识
- 如果一致则释放锁
- 如果不一致则不释放锁
流程图:
代码实现:
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String name;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
private static final String key_prefix = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";
@Override
public boolean tryLock(long timeoutSec) {
//获取线程标识
String threadId = ID_PREFIX+Thread.currentThread().getId();
//获取锁
Boolean seccess = stringRedisTemplate.opsForValue().setIfAbsent(key_prefix+name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(seccess);
}
@Override
public void unlock() {
//获取线程标识
String threadId = ID_PREFIX+Thread.currentThread().getId();
//获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(key_prefix + name);
//判断标识是否一致
if (threadId.equals(id)){
stringRedisTemplate.delete(key_prefix+name);
}
}
}
又存在的问题
redis的lua脚本
调用脚本
实例
再次改进Redis的分布式锁
需求:基于Lua脚本实现分布式锁的释放锁逻辑
提示:RedisTemplete调用Lua脚本的API如下:
代码实现:
@Override
public void unlock() {
//调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(key_prefix+name),
ID_PREFIX+Thread.currentThread().getId());
}
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
lua脚本:
--比较线程标识与锁中的标识是否一致
if (redis.call('get',KEYS[1])==ARGV[1]) then
-- 释放锁 del key
return redis.call('del',KEYS[1])
end
return 0
总结
基于redis的分布式锁的实现思路:
- 利用setnx获取锁,并设置过期时间,保存线程标识
- 释放锁时先判断线程标识是否与自己的一致,一致则删除锁。
特性:
- 利用setnx满足互斥性
- 利用setex保证故障时锁依然能够释放,避免死锁,提高安全性。
- 利用redis集群保证高可用性和高并发性的特性
基于redis的分布式锁的优化
基于setnx实现的分布式锁存在下面的问题:
Redisson
Redisson是一个在redis基础上实现的Java驻内存数据网络。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
官网地址:https://redison.org
GitHub地址:https://github.com/redisson/redisson
Redisson入门
Redisson可重入锁的原理
lua脚本解决:获取锁
lua脚本解决释放锁:
整个流程图
总结
Redisson分布式锁的原理:
- 可重入:利用hash结构记录线程id和重入次数:
- 每次获取锁时都判断锁是否存在,如果不存在直接获取,如果存在,对比线程标识,如果当前的线程标识是当前线程,那么可以再次获取,并把重入次数加一。在以后释放锁时,把重用次数减一。重用次数减到0时,代表业务走到最外层,就可以真正的释放锁。
- 可重试:利用信号量和PubSub功能实现等待、唤醒、获取锁失败的重试机制。
- 在第一次重试获取锁失败后,不立即失败,而是等待PubSub释放锁的消息,就可以重新获取锁。有等待的超时时间。
- 超时续约:利用watchdaog,每隔一段时间(releaseTime/3),重置超时时间。
- 获取锁成功以后,开启一个定时任务。这个定时任务每隔一段时间,重置超时时间。
Redisson分布式锁主从一致问题
- 不可重入的Redis分布式锁
- 原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标识
- 缺陷:不可重入、无法重试、锁超时失效
- 可重入的Redis分布式锁
- **原理:**利用hash结构,记录线程标识和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
- 缺陷:redis宕机失效问题。
- Redisson的multiLock:
- 原理:多个独立的Redis节点,必须在所有的节点都获取重入锁,才算获取锁成功。
- 缺陷:运维成本高、实现复杂。
Redis优化秒杀
流程图
实现
案例 改进秒杀业务,提高并发性能
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到redis中
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
- 基于lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
--1.参数列表
--1.1优惠券id
local voucherId = ARGV[1]
--1.2.用户id
local userId = ARGV[2]
--2.数据key
--2.1.库存key
local stockKey = 'seckill:stock' .. voucherId
--2.2订单key
local orderKey = 'seckill:order' .. voucherId
--3.脚本业务
--3.1.判断库存是否充足 get stockKey是否大于0
if (tonumber(redis.call('get',stockKey)) <= 0) then
--库存不足,返回1
return 1
end
--3.2判断用户是否下单 SISMEMBER oderKey userId 判断用户的id是否在里面
if (redis.call('sismember',orderKey,userId) == 1) then
--存在,说明重复下单 返回2
return 2
end
--3.4.扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
--3.5下单(保存用户)sadd orderKey userId 把userId保存到orderKey里面
redis.call('sadd',orderKey,userId)
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
@Override
public Result seckillVoucher(Long voucherId) {
//获取用户
Long userId = UserHolder.getUser().getId();
//1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString());
//2.判断结果是否为0
int r = result.intValue();
if (r != 0){
//2.1.不为0,没有购买资格
return Result.fail(r == 1?"库存不足":"不能重复下单");
}
//2.2.为0,有购买资格,把下单信息保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
//2.3.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//2.4.用户id
voucherOrder.setUserId(userId);
//2.5.代金券id
voucherOrder.setVoucherId(voucherId);
//2.6放入阻塞队列
orderTasks.add(voucherOrder);
//3.返回订单id
return Result.ok(orderId);
}
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
private IVoucherOrderService proxy;
//此注解代表,当前类初始化完成后方法开始执行
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while (true){
try {
//1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
//2.创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常");
}
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//获取用户
Long userId = voucherOrder.getUserId();
//创建锁对象
RLock lock = redissonClient.getLock("lock:oder" + userId);
//获取锁
boolean islock = lock.tryLock();
//判断是否获取锁成功
if (!islock){
//获取锁失败,返回错误或重试
log.error("不允许重复下单");
return;
}
try {
return proxy.createVoucherOrder(voucherOrder);
} catch (IllegalStateException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
总结
- 秒杀业务的优化是什么?
- 先利用Redis完成库存量、一人一单的判断,完成抢单的业务。
- 再将下单业务放入阻塞队列,利用独立的线程异步下单
- 基于阻塞队列的异步秒杀存在哪些问题
- 内存限制问题
- 数据安全问题
Redis消息队列实现异步秒杀
消息队列
字面意思就是存放消息的队列,最简单的消息队列模型包括三个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
Redis提供了三种不同的方式来实现消息队列
- list结构:基于list结构模拟的消息队列
- PubSub:基于点对点的消息模型
- Stream:比较完善的消息队列模型
Redis中的消息队列
基于List的结构模拟的消息队列
Redis的list数据结构是一个双向链表,很容易模拟出队列效果:
队列的出口和入口不是一边,因此我们可以利用:LPUSH结合RPOP或者RPUSH结合LPOP来实现。
不过要注意的是,当队列中没有消息是RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或BLPOP来实现阻塞效果。
基于List消息队列有哪些优缺点?
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息的有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
基于PubSub的消息队列
基于Stream的消息队列
Stream是Redis5.0引入的一种新的数据类型,可以实现一个功能非常完善的消息队列
例如:
Stream消息队列-消费者组
将多个消费者划分到一个组中,监听同一个队列,具备一下特点:
总结
基于Stream作为消息队列,实现异步秒杀下单
达人探店
发布探店笔记
探店笔记类似于点评网站的评价,往往是图文结合。对应的表有两个:
- tb_blog:探店笔记表,包含笔记中的标题、文字、图片等
- tb_blog_comments:其他用户对探店笔记的评价
查看发布探店笔记
接口介绍
代码实现
@Override
public Result queryBlogById(Long id) {
//1.查询blog
Blog blog = getById(id);
if (blog == null){
return Result.fail("博客不存在");
}
//2.查询blog有关的用户
queryBlogByUser(blog);
return Result.ok(blog);
}
private void queryBlogByUser(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
点赞功能
需求
- 同一个用户只能点赞一次,再次点击则取消赞
- 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)
实现步骤
- 给Blog类中添加一个isLike字段,标识是否被当前用户点赞
- 修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数加一,已点赞过则点赞数减一
- 修改根据id查询Blog的业务,判断当前用户是否点赞过,赋值给isLike字段
- 修改分页查询Blog,判断当前登录用户是否点赞过,赋值给isLike字段
代码实现
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(blog -> {
this.queryBlogByUser(blog);
this.isBlockLiked(blog);
});
return Result.ok(records);
}
@Override
public Result queryBlogById(Long id) {
//1.查询blog
Blog blog = getById(id);
if (blog == null){
return Result.fail("博客不存在");
}
//2.查询blog有关的用户
queryBlogByUser(blog);
//3.查询blog是否备点赞
isBlockLiked(blog);
return Result.ok(blog);
}
private void isBlockLiked(Blog blog) {
//1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
//2.判断当前登录用户是否点赞
String key = RedisConstants.BLOG_LIKED_KEY + blog.getId();
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key,userId.toString());
blog.setIsLike(BooleanUtil.isTrue(isMember));
}
@Override
public Result likeBlog(Long id) {
//1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
//2.判断当前登录用户是否点赞
String key = RedisConstants.BLOG_LIKED_KEY + id;
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key,userId.toString());
if (BooleanUtil.isFalse(isMember)){
//3.如果未点赞,可以点赞
//3.1.数据库点赞加一
boolean isSuccess = update().setSql("liked = liked +1").eq("id", id).update();
//3.2保存用户到redis的set集合
if (isSuccess){
stringRedisTemplate.opsForSet().add(key,userId.toString());
}
}else {
//4.如果一点赞,取消点赞
//4.1数据库点赞数减一
boolean isSuccess = update().setSql("liked = liked -1").eq("id", id).update();
if (isSuccess){
//4.2.把用于从Redis的set结合移除
stringRedisTemplate.opsForSet().add(key,userId.toString());
}
}
return Result.ok();
}
点赞排行榜
在探店笔记的详情页面,应该把给笔记点赞的人显示出来,比如最早点赞的TOP5,形成点赞排行榜:
接口介绍
需求
按照点赞时间先后顺序,返回top5的用户
改善点赞功能
可以使用SortedSet里面的排序,以时间戳作为score排序,然后显示排行。
用score方法判断是否存在,如果存在返回score不存在返回null的原理
private void isBlockLiked(Blog blog) {
//1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
//2.判断当前登录用户是否点赞
String key = RedisConstants.BLOG_LIKED_KEY + blog.getId();
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
blog.setIsLike(score != null);
}
@Override
public Result likeBlog(Long id) {
//1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
//2.判断当前登录用户是否点赞
String key = RedisConstants.BLOG_LIKED_KEY + id;
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
if (score == null){
//3.如果未点赞,可以点赞
//3.1.数据库点赞加一
boolean isSuccess = update().setSql("liked = liked +1").eq("id", id).update();
//3.2保存用户到redis的set集合
if (isSuccess){
stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
}
}else {
//4.如果一点赞,取消点赞
//4.1数据库点赞数减一
boolean isSuccess = update().setSql("liked = liked -1").eq("id", id).update();
if (isSuccess){
//4.2.把用于从Redis的set结合移除
stringRedisTemplate.opsForZSet().remove(key,userId.toString());
}
}
return Result.ok();
}
点赞top5实现
逻辑:
//1.查询top5的点赞用户 zrange key 0 4
//2.解析出其中的用户id
//3.根据用户id查询用户
//4.返回
代码实现:
public Result queryBlogLikes(Long id) {
String key = RedisConstants.BLOG_LIKED_KEY +id;
//1.查询top5的点赞用户 zrange key 0 4
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if (top5 == null || top5.isEmpty()){
return Result.ok(Collections.emptyList());
}
//2.解析出其中的用户id
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
String idStr = StringUtil.join(ids,",");
//3.根据用户id查询用户
List<UserDTO> userDtos = userService.query()
.in("id",ids)
.last("order by FIELD(id," + idStr + ")").list()
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
//4.返回
return Result.ok(userDtos);
}
好友关注
关注和取关
需求:基于该表数据结构,实现两个接口
- 关注和取关接口
- 判断是否关注的接口
关注是User之间的关系,是博主与粉丝之间的关系,数据库中有一张表tb_follow标识
关注 取关
业务流程:
//1.判断是关注还是取关
//2.关注,新增数据
//3.取关,删除数据
public Result follow(Long followUserId, Boolean isFollow) {
//1.获取登录用户的id
Long userId = UserHolder.getUser().getId();
//1.判断是关注还是取关
if (isFollow) {
//2.关注,新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
save(follow);
}else {
//3.取关,删除数据
remove(new QueryWrapper<Follow>().eq("user_id",userId).eq("follow_user_id",followUserId));
}
return Result.ok();
}
判断是否关注
@Override
public Result isFollow(Long followUserId) {
//1.获取登录用户的id
Long userId = UserHolder.getUser().getId();
//1.查询是否关注
Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
return Result.ok(count>0);
}
共同关注
需求
利用redis中恰当的数据结构,实现共同关注的功能。在博主的个人页面展示出当前用户与博主的共同好友。
实现思路
取博主和登录用户关注的人的交集,利用redis的set集合的 实现,改造之前的关注功能,在关注时,将用户的id作为key,被关注的用户的id作为值存入redis中,然后查的时候取交集。
public Result follow(Long followUserId, Boolean isFollow) {
//1.获取登录用户的id
Long userId = UserHolder.getUser().getId();
String key = "follows:"+userId;
//1.判断是关注还是取关
if (isFollow) {
//2.关注,新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
boolean isSuccess = save(follow);
if (isSuccess){
//把关注的用户的id,放入redis的set集合 add userId foolowUserId
stringRedisTemplate.opsForSet().add(key,followUserId.toString());
}
}else {
//3.取关,删除数据
remove(new QueryWrapper<Follow>().eq("user_id",userId).eq("follow_user_id",followUserId));
//把关注用户的id从Redis中移除
stringRedisTemplate.opsForSet().remove(key,followUserId);
}
return Result.ok();
}
共同关注的接口介绍
代码实现
实现思路:
//1.获取当前登录用户
//2.求交集
//3.解析id集合
//4.查询用户
代码:
@Override
public Result followCommons(Long id) {
//1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
String key = "follows:" +userId;
//2.求交集
String key2 = "follows:"+id;
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
if (intersect == null || intersect.isEmpty()){
return Result.ok(Collections.emptyList());
}
//3.解析id集合
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
//4.查询用户
List<UserDTO> users = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(users);
}
关注推送
关注推送也叫Feed流,直译为投喂,为用户提供“沉浸式”体验。通过无线下拉刷新获取新的消息
Feed流的模式
Feed流产品有两种常见的形式
- Timeline:不做内容筛选,简单的按照内容发布时间顺序,常用语好友或关注。例如朋友圈
- 优点:信息全面,不会有确实,实现也相对简单。
- 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低。
- 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣的信息,来吸引用户
- 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷。
- 缺点:算法不准确,可能起到反作用。
例如本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现有三种方案:
- 拉模式:博主发布消息后(有时间戳),在用户读的时候,拉取博主发布的消息。
- 缺点:延迟
- 推模式:在博主发布消息后,直接推送给关注他的人。
- 缺点:占内存
- 推拉结合:大v推送给活跃粉丝,普通粉丝用拉取
基于推模式实现关注推送功能
需求:
- 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
public Result saveBlog(Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
boolean isSuccess = save(blog);
if (!isSuccess){
return Result.fail("新增笔记失败");
}
//3.查询笔记作者的所有粉丝
List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
//4.推送笔记id给所有粉丝
for (Follow follow : follows) {
//4.1.获取粉丝的id
Long userId = follow.getUserId();
//4.2.推送
String key = "feed:"+userId;
stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
}
// 返回id
return Result.ok(blog.getId());
}
- 收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
- 查询收件箱数据时,可以实现分页查询
滚动分页查询参数:
- max(时间戳):当前时间戳 | 上一次查询的最小的时间戳
- min:0
- offset (偏移量):0 | 在上一次的结果中,与最小值一样的元素个数
- count(查询数量):查几条
接口:
Feed流的分页问题
Feed流中的数据会不断更新,所以数据的角标也在发生变化,因此不能采用传统的分页模式。
Feed流的滚动分页
实现逻辑:
//1.获取当前用户
//2.查询收件箱
//3.解析数据:blogId、score(时间戳)、offset
//4.根据id查询blog
//5.封装并返回
代码实现:
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
//1.获取当前用户
Long userId = UserHolder.getUser().getId();
//2.查询收件箱 ZREVRANGBYSCORE key max Min LIMIT offset count
String key = "feed:"+userId;
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, max, offset, 3);
//3.非空判断
if (typedTuples == null || typedTuples.isEmpty()){
return null;
}
long minTime =0;
int os = 1;
//3.解析数据:blogId、score(时间戳)、offset
List<Long> ids = new ArrayList<>(typedTuples.size());
for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
//4.1获取id
ids.add(Long.valueOf(tuple.getValue()));
//4.2获取分数(时间戳)
Long time = tuple.getScore().longValue();
if (time == minTime){
os++;
}else {
minTime = time;
os = 1;
}
}
String idStr = StringUtil.join(ids, ",");
//4.根据id查询blog
List<Blog> blogs = query()
.in("id",ids)
.last("order by FIELD(id," + idStr + ")").list();
for (Blog blog : blogs) {
//2.查询blog有关的用户
queryBlogByUser(blog);
//3.查询blog是否备点赞
isBlockLiked(blog);
}
//5.封装并返回
ScrollResult r = new ScrollResult();
r.setList(blogs);
r.setOffset(offset);
r.setMinTime(minTime);
return Result.ok(r);
}
附近商户
GEO数据结构
附近商户搜索
接口介绍
实现思路
按照商户类型做分组,类型相同的商户作为同一组,以typeId作为key存入同一个GEO集合中即可。
代码实现
用户签到
BitMap
假如我们用一张表来存储用户的登录信息,其结构应该如下。
我们按月来统计用户信息。签到记录为1,未签到则为0
把每一个bit位对应当月的每一天,形成了映射关系。用0和1标识业务状态,这种思路称为位图BitMap
Redis中利用String类型的数据结构实现BitMap,因此最大上限是52M,转换为bit则是2的32次个bit位。
BitMap的操作命令
实现签到功能
需求
实现签到接口,将当前的用户当天的签到信息保存到Redis中
注意:因为BitMap底层是基于String数据结构,因此其操作都封装在字符串相关的操作中。
代码实现
实现逻辑
//1.获取当前登录用户
//2.获取日期
//3.拼接key
//4.获取今天是本月的第几天
//5.写入redis SETBIT key offset 1
代码
public Result sign() {
//1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
//2.获取日期
LocalDateTime now = LocalDateTime.now();
//3.拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = RedisConstants.USER_SIGN_KEY + userId + keySuffix;
//4.获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
//5.写入redis SETBIT key offset 1
stringRedisTemplate.opsForValue().setBit(key,dayOfMonth-1,true);
return Result.ok();
}
签到统计
问题
需求
实现逻辑
//1.获取当前登录用户
//2.获取日期
//3.拼接key
//4.获取今天是本月的第几天
//5.获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字
//6.循环遍历
//7.让这个数字与1做与运算,得到数字的最后的一个bit位
//判断这个bit位是否为0
//如果为0,说明未签到,结束
//如果不为0。说明已签到。计数器+1
//把数字右移一位,抛弃最后一个bit位,继续下一个bit位
代码实现
public Result signCount() {
//1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
//2.获取日期
LocalDateTime now = LocalDateTime.now();
//3.拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = RedisConstants.USER_SIGN_KEY + userId + keySuffix;
//4.获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
//5.获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0));
if (result == null || result.isEmpty()){
//没有任何签到结果
return Result.ok(0);
}
Long num = result.get(0);
if (num == null || num ==0){
return Result.ok(0);
}
//6.循环遍历
int count = 0;
while (true){
//7.让这个数字与1做与运算,得到数字的最后的一个bit位
//判断这个bit位是否为0
if ((num & 1) == 0){
//如果为0,说明未签到,结束
break;
}else {
//如果不为0。说明已签到。计数器+1
count++;
}
//把数字右移一位,抛弃最后一个bit位,继续下一个bit位
num >>>= 1;
}
return Result.ok(count);
}
HyperLogLog
HyperLogLog的用法
Redis高级篇
单点Redis的问题:
Redis持久化
RDB
RDB全称Redis Database Backup file (Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。
快照文件成为RDB文件,默认是保存在当前运行目录。
Redis停机时会自动执行一次RDB
Redis内部有触发RDB的机制,可以在redis.conf文件中找到。格式如下:
RDB的其他配置也可以在redis.conf文件中设置
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!