Redis项目实战——黑马点评——从入门到深入

2023-12-18 18:33:26

项目介绍

项目图

图片.png

数据库

图片.png

登录功能实现

基于Session实现登录

流程图

图片.png

发送短信验证码

接口介绍

图片.png

代码实现

实现流程:
1.校验手机号 2.如果不符合,返回错误信息 3.符合,生成验证码 4.保存到Session 5.发送验证码 6.返回ok

注意:

  1. _在校验手机号时提供了正则表达的工具类_RegexUtil里面有各种格式的校验。
  2. 生成验证码使用RandomUtil的工具类
  3. 发送短信验证码,在瑞吉外卖项目已经实现过,详情可见瑞吉外卖总结
    public Result sendCode(String phone, HttpSession session) {
        //1.校验手机号
        if (RegexUtils.isPhoneInvalid(phone)) {
            //2.如果不符合,返回错误信息
            return Result.fail("手机号格式错误");
        }
        //3.符合,生成验证码
        String code = RandomUtil.randomNumbers(6);
        //4.保存到Session
        session.setAttribute("code",code);
        //5.发送验证码
        log.debug("发送短信验证码成功,验证码为:{}",code);
        //6.返回ok
        return Result.ok();
    }

短信登录

接口介绍

图片.png

代码实现

实现流程:

_1.校验手机号 2校验验证码 3.不一致,报错 4.一致,根据手机号查询用户 5.判断用户是否存在 6.不存在,创建新用户并保存 7.保存用户信息到session _

    public Result login(LoginFormDTO loginForm, HttpSession session) {
        //1.校验手机号
        String phone = loginForm.getPhone();
        if (RegexUtils.isPhoneInvalid(phone)) {
            //2.如果不符合,返回错误信息
            return Result.fail("手机号格式错误");
        }
        //2.校验验证码
        Object cacheCode = session.getAttribute("code");
        String code = loginForm.getCode();
        if (cacheCode == null || !cacheCode.toString().equals(code)){
            //3.不一致,报错
            return Result.fail("验证码错误");
        }
        //4.一致,根据手机号查询用户
        User user = query().eq("phone", phone).one();
        //5.判断用户是否存在
        if (user == null){
            //6.不存在,创建新用户并保存
            user = createUserWithPhone(phone);
        }
        //7.保存用户信息到session
        session.setAttribute("user",user);
        return Result.ok();
    }

登录验证功能

接口介绍

访问每个Controller之前都需要进行session验证,这样很麻烦,我们在访问Controller之前,用拦截器进行拦截,拦截到保存用户的信息,发送到Controller中,使用TreadLocal,线程安全。

代码实现

实现流程:
//1.获取session
//2.获取session中的用户
//3.判断用户是否存在
//4.不存在,拦截
//5.存在,保存用户信息到ThreadLocal
//6.放行

注意:

  1. 在使用ThreadLocal之前,在工具类中提供了UserHolder的工具类,里面定义了三个方法。save、get、remove方法。
  2. _在编写完拦截器之后拦截器并没有生效,需要编写配置类,实现WebMvcConfigurer接口。在里面实现方法addInterceptor。_registry.addInterceptor(new LoginInterceptor());然后配置要拦截的路径。
拦截器
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //1.获取session
        HttpSession session = request.getSession();
        //2.获取session中的用户
        Object user = session.getAttribute("user");
        //3.判断用户是否存在
        if (user == null){
            //4.不存在,拦截
            response.setStatus(401);
            return false;
        }
        //5.存在,保存用户信息到ThreadLocal
        UserHolder.saveUser((User)user);
        //6.放行
        return true;
    }

集群的session共享问题

session共享问题:多台Tomcat并不共享session存储空间,当请求切换到不同的tomcat服务器时,导致的数据丢失问题
session的替代方案应该满足:

  • 数据共享
  • 内存存储
  • key-value存储

图片.png

基于Redis实现共享session登录

流程图

图片.png
图片.png

代码实现

  1. 发送短信流程 :

1.校验手机号 //2.如果不符合,返回错误信息//3.符合,生成验证码//4.保存到redis//5.发送验证码//6.返回ok

    public Result sendCode(String phone, HttpSession session) {
        //1.校验手机号
        if (RegexUtils.isPhoneInvalid(phone)) {
            //2.如果不符合,返回错误信息
            return Result.fail("手机号格式错误");
        }
        //3.符合,生成验证码
        String code = RandomUtil.randomNumbers(6);
        //4.保存到redis
        stringRedisTemplate.opsForValue().set(RedisConstants.LOGIN_CODE_KEY +  phone,code,RedisConstants.LOGIN_CODE_TTL, TimeUnit.MINUTES);
        //5.发送验证码
        log.debug("发送短信验证码成功,验证码为:{}",code);
        //6.返回ok
        return Result.ok();
    }
  1. 登录流程:

//1.校验手机号//2.从redis获取并校验验证码//3.不一致,报错//4.一致,根据手机号查询用户//5.判断用户是否存在//6.不存在,创建新用户并保存//7.保存用户信息到redis//7.1随机生成token,作为登录令牌//7.2将user对象转为hash存储//7.3存储//7.4设置token有效期//8.返回token

 public Result login(LoginFormDTO loginForm, HttpSession session) {
        //1.校验手机号
        String phone = loginForm.getPhone();
        if (RegexUtils.isPhoneInvalid(phone)) {
            //2.如果不符合,返回错误信息
            return Result.fail("手机号格式错误");
        }
        //2.从redis获取并校验验证码
        String cacheCode = stringRedisTemplate.opsForValue().get(RedisConstants.LOGIN_CODE_KEY +  phone);
        String code = loginForm.getCode();
        if (cacheCode == null || !cacheCode.equals(code)){
            //3.不一致,报错
            return Result.fail("验证码错误");
        }
        //4.一致,根据手机号查询用户
        User user = query().eq("phone", phone).one();
        //5.判断用户是否存在
        if (user == null){
            //6.不存在,创建新用户并保存
            user = createUserWithPhone(phone);
        }
        //7.保存用户信息到redis
        //7.1随机生成token,作为登录令牌
        String token = UUID.randomUUID().toString(true);

        //7.2将user对象转为hash存储
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),
                CopyOptions.create()
                        .setIgnoreNullValue(true)
                        .setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));
        //7.3存储
        String tokenKey = "login:token:"+ token;
        stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);
        //7.4设置token有效期
        stringRedisTemplate.expire(tokenKey,30,TimeUnit.MINUTES);
        //8.返回token
        return Result.ok(token);
    }
  1. 拦截器流程:

//1.获取请求头中的token//2.基于token获取redis中的用户//3.判断用户是否存在//4.不存在,拦截//5.将查询到的Hash数据转换为UserDTO对象//6.存在,保存用户信息到ThreadLocal//7.刷新token的有效期//7.放行

    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //1.获取请求头中的token
        String token = request.getHeader("authorization");
        if (StrUtil.isBlank(token)){
            return false;
        }
        //2.基于token获取redis中的用户
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(RedisConstants.LOGIN_USER_KEY + token);
        //3.判断用户是否存在
        if (userMap.isEmpty()){
            //4.不存在,拦截
            response.setStatus(401);
            return false;
        }
        //5.将查询到的Hash数据转换为UserDTO对象
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);

        //6.存在,保存用户信息到ThreadLocal
        UserHolder.saveUser(userDTO);
        //7.刷新token的有效期
        String tokenKey = "login:token:"+ token;
        stringRedisTemplate.expire(tokenKey,30, TimeUnit.MINUTES);
        //7.放行
        return true;
    }

注意:

  1. 把对象存入redis中的map时每个属性的值都应为String类型,在此项目中dto的id为long类型,需要做如下配置
  2. redis的key需要唯一并且后面容易取到,类型选择也要注意。
        Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),
                CopyOptions.create()
                        .setIgnoreNullValue(true)
                        .setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));

登录拦截器的优化

流程图

图片.png

实现流程:
token拦截器:

 public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //1.获取请求头中的token
        String token = request.getHeader("authorization");
        if (StrUtil.isBlank(token)){
            return true;
        }
        //2.基于token获取redis中的用户
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(RedisConstants.LOGIN_USER_KEY + token);
        //3.判断用户是否存在
        if (userMap.isEmpty()){
            //4.不存在,拦截
            response.setStatus(401);
            return true;
        }
        //5.将查询到的Hash数据转换为UserDTO对象
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);

        //6.存在,保存用户信息到ThreadLocal
        UserHolder.saveUser(userDTO);
        //7.刷新token的有效期
        String tokenKey = "login:token:"+ token;
        stringRedisTemplate.expire(tokenKey,30, TimeUnit.MINUTES);
        //7.放行
        return true;
    }

登录拦截器

    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //1.判断是否需要拦截(ThreadLocal中是否有用户)
        if (UserHolder.getUser() == null){
            response.setStatus(401);
            return false;
        }
        return true;
    }

注意:在配置拦截器时,如果希望调节拦截器的执行顺序。可以调换注册顺序,也可以使用.order方法设置排序顺序,值越小的,越先执行。

商户缓存

什么是缓存?
缓存就是数据交换的缓冲区,称作cache,是存储数据的临时地方,一般读写性能比较高。

图片.png

查询商户添加redis缓存

模型图

图片.png

代码实现

业务流程

//1.从redis查询商铺缓存
//2.判断是否存在
//3.存在,直接返回
//4.不存在,根据id查询数据库
//5.不存在返回错误
//6.存在,写入redis
//7.返回

    public Result queryById(Long id) {
        String key ="cache:shop" + id;
        //1.从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if (StrUtil.isNotBlank(shopJson)) {
            //3.存在,直接返回
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            return Result.ok(shop);
        }
        //4.不存在,根据id查询数据库
        Shop shop = getById(id);
        //5.不存在返回错误
        if (shop == null){
            return Result.fail("店铺不存在");
        }
        //6.存在,写入redis
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop));
        //7.返回
        return Result.ok(shop);
    }

店铺类型缓存

缓存更新策略

图片.png

业务场景:

  • 低一致性需求:使用内存淘汰机制。例如店铺类型的缓存查询
  • 高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询。

主动更新策略

  1. Cache Aside Pattern:由缓存的调用者,在更新数据库时的同时更新缓存。
  2. Read/Write Though Pattern:缓存和数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心一致性问题。
  3. Write Behind Caching Pattern:调用者只操作缓存,其他线程异步的将缓存数据持久化到数据库,保证最终一致。

操作缓存和数据库时有三个问题需要考虑

  1. 删除缓存还是更新缓存?

更新缓存:每次更新数据库都更新缓存,无效写操作较多。

删除缓存:更新数据库时让缓存无效,查询时再更新缓存。
2. 如何保证缓存与数据库的操作的同时成功或失败?

  • 单体系统,将缓存与数据库操作放在一个事务里。
  • 分布式系统:利用TCC等分布式事务方案。
  1. 先操作缓存还是先操作数据库?
  • 先删除缓存,再操作数据库
  • 先操作数据库,再删除缓存

图片.png

缓存更新策略的最佳实践方案
  1. 低一致性需求:使用redis自带的内存淘汰机制
  2. 高一致性需求:主动更新,并以超时剔除作为兜底方案
  • 读操作:
    • 缓存命中则直接返回
    • 缓存未命中则查询数据库,并写入缓存,设置超时时间。
  • 写操作:
    • 先写数据库,再删除缓存操作
    • 要保证数据库与缓存操作的原子性

给查询商铺的缓存添加超时剔除和主动更新的策略

修改ShopController中的逻辑业务,满足下面的需求:

  1. 根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库写入缓存,并设置超时时间。
  2. 根据id修改店铺时,先改数据库,在删除缓存。
//查询:
    public Result queryById(Long id) {
        String key =RedisConstants.CACHE_SHOP_KEY+ id;
        //1.从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if (StrUtil.isNotBlank(shopJson)) {
            //3.存在,直接返回
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            return Result.ok(shop);
        }
        //4.不存在,根据id查询数据库
        Shop shop = getById(id);
        //5.不存在返回错误
        if (shop == null){
            return Result.fail("店铺不存在");
        }
        //6.存在,写入redis
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
        //7.返回
        return Result.ok(shop);
    }

更新:
    @Transactional
    public Result update(Shop shop) {
        Long id = shop.getId();
        if (id == null){
            return Result.fail("店铺id不能为空");
        }
        //1.更新数据库
        updateById(shop);
        //2.删除缓存
        stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY+ id);
        return Result.ok();
    }

缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会达到数据库

图片.png

解决方案

缓存空对象

图片.png

优点:实现简单、维护方便
缺点:额外的内存消耗(设置过期时间解决)、可能造成短期的不一致

布隆过滤

图片.png

优点:内存占用少,没有多余key
缺点:实现复杂、存在误判可能

代码解决

流程图

图片.png

代码改进
    public Result queryById(Long id) {
        String key =RedisConstants.CACHE_SHOP_KEY+ id;
        //1.从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if (StrUtil.isNotBlank(shopJson)) {
            //3.存在,直接返回
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            return Result.ok(shop);
        }
        //判断命中的是否是空值
        if (shopJson != null){
            //返回错误信息
            return Result.fail("店铺不存在");
        }
        //4.不存在,根据id查询数据库
        Shop shop = getById(id);
        //5.不存在返回错误
        if (shop == null){
            //将空值写入redis
            stringRedisTemplate.opsForValue().set(key,"", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
            //返回错误信息
            return Result.fail("店铺不存在");
        }
        //6.存在,写入redis
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
        //7.返回
        return Result.ok(shop);
    }

解决缓存穿透的两个重点:

  1. 如果提交的查询条件不存在,需要将空值写入redis
  2. 判断命中缓存后,命中的是否为null。

总结
  1. 缓存穿透产生的原因是什么?

用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来了巨大压力。

  1. 缓存穿透的结局方案有哪些?
  • 缓存null
  • 布隆过滤
  • 增加id的复杂度,避免被猜测id规律
  • 做好数据的基础格式校验
  • 加强用户权限校验
  • 做好热点参数校验

缓存雪崩

缓存雪崩是指在同一时段,大量的缓存key同时失效或者Redis服务宕机,导致大量的请求到达数据库,带来巨大压力。

图片.png

解决方案

  1. 给不同的key的TTL添加随机值(批量导入时,随机设置TTL)
  2. 利用Redis集群提高服务的可用性
  3. 给缓存业务添加降级限流策略
  4. 给业务添加多级缓存

缓存击穿

缓存击穿问题也叫热点key问题,就是一个被高并发访问并且缓存重建业务比较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来的巨大冲击

图片.png

解决方案

互斥锁

图片.png

逻辑过期

图片.png

两种方案对比

图片.png

基于互斥锁方式解决缓存击穿的问题

需求:修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题

流程图:
图片.png

利用setnx命令原理(如果key不存在,新建key,如果key已经存在,那么不进行操作)来实现互斥锁。

生成锁方法:

    private boolean tryLock(String key){
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

释放锁方法:

    private void unlock(String key){
        stringRedisTemplate.delete(key);
    }

互斥锁实现缓存击穿代码:

    public Shop queryWithMutex(Long id){
        String key =RedisConstants.CACHE_SHOP_KEY+ id;
        //1.从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if (StrUtil.isNotBlank(shopJson)) {
            //3.存在,直接返回
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            return shop;
        }
        //判断命中的是否是空值
        if (shopJson != null){
            //返回错误信息
            return null;
        }
        //4.实现缓存重建
        //4.1获取互斥锁
        String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
        Shop shop = null;
        try {
            boolean isLock = tryLock(lockKey);
            //4.2判断是否成功
            if (!isLock){
                //4.3失败则休眠并重试
                Thread.sleep(50);
                return queryWithMutex(id);
            }
            //4.4成功,根据id查询数据库
            shop = getById(id);
            //5.不存在返回错误
            if (shop == null){
                //将空值写入redis
                stringRedisTemplate.opsForValue().set(key,"", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
                //返回错误信息
                return null;
            }
            //6.存在,写入redis
            stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            //7.释放互斥锁
            unlock(lockKey);
        }
        //7.返回
        return shop;
    }

基于逻辑过期方式解决缓存击穿问题

逻辑过期不是真正的过期,在存入redis额外的添加一个过期时间的字段。不用设置ttl(过期时间),过期时间不是由redis控制,而是由程序员控制。

流程图:

图片.png

代码实现:

    /**
     * 封装店铺信息逻辑过期时间的方法
     * @param id
     * @param expireSeconds
     */
    private void saveShop2Redis(Long id,Long expireSeconds){
        //1.查询店铺数据
        Shop shop = getById(id);
        //2.封装逻辑过期时间
        RedisData redisData = new RedisData();
        redisData.setData(shop);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
        //3.写入Redis
        stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
    }
    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
    /**
     * 逻辑过期解决缓存击穿
     * @param id
     * @return
     */
    public Shop queryWithLogicalExpire(Long id){
        String key =RedisConstants.CACHE_SHOP_KEY+ id;
        //1.从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if (StrUtil.isBlank(shopJson)) {
            //3.不存在,直接返回
            return null;
        }
        //4.命中,需要把json反序列化为对象
        RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
        LocalDateTime expireTime = redisData.getExpireTime();
        JSONObject data = (JSONObject) redisData.getData();
        Shop shop = JSONUtil.toBean(data, Shop.class);
        //5.判断是否过期
        if (expireTime.isAfter(LocalDateTime.now())){
            //5.1未过期,直接返回
            return shop;
        }
        //5.2已过期,需要缓存重建
        //6.缓存重建
        //6.1获取互斥锁
        String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
        boolean isLock = tryLock(lockKey);
        //6.2判断是否获取互斥锁成功
        if (isLock){
            //6.3成功,开启独立线程,实现缓存重建
            CACHE_REBUILD_EXECUTOR.submit(()->{
               //重建缓存
                try {
                    this.saveShop2Redis(id,20L);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }finally {
                    //释放锁
                    unlock(lockKey);
                }
            });
        }

        //6.4返回过期的商铺信息
        return shop;
    }

缓存工具类的封装

基于StringRedisTemplete封装一个缓存工具类,满足下面的需求

  • 方法1:将任意java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
  • 方法2:将任意ava对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
  • 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
  • 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
    /**
     * 将任意java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
     * @param key
     * @param value
     * @param time
     * @param unit
     */
    public void  set(String key, Object value, Long time, TimeUnit unit){
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
    }
    /**
     * 将任意ava对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
     * @param key
     * @param value
     * @param time
     * @param unit
     */
    public void  setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){
        //所谓逻辑过期,在存入redis时,给数据里加入一个逻辑过期的字段
        //设置逻辑过期
        RedisData redisData = new RedisData();
        redisData.setData(value);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
    }
    /**
     * 解决缓存穿透的工具方法
     * @param id
     * @param type
     * @return
     * @param <R>
     */
    public <R,ID> R queryWithPassThrough(
            String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback,Long time, TimeUnit unit){
        String key = keyPrefix + id;
        //1.从redis查询商铺缓存
        String json = stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if (StrUtil.isNotBlank(json)) {
            //3.存在,直接返回
            return JSONUtil.toBean(json,type);
        }
        //判断命中的是否是空值
        if (json != null){
            //返回错误信息
            return null;
        }
        //4.不存在,根据id查询数据库
        R r = dbFallback.apply(id);
        //5.不存在返回错误
        if (r == null){
            //将空值写入redis
            stringRedisTemplate.opsForValue().set(key,"", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
            //返回错误信息
            return null;
        }
        //6.存在,写入redis
       this.set(key,r,time,unit);
        //7.返回
        return r;
    }
 private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
    /**
     * 逻辑过期解决缓存击穿
     * @param id
     * @return
     */
    public <R,ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type,Function<ID,R> dbFallback,Long time, TimeUnit unit){
        String key = keyPrefix + id;
        //1.从redis查询商铺缓存
        String json = stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if (StrUtil.isBlank(json)) {
            //3.不存在,直接返回
            return null;
        }
        //4.命中,需要把json反序列化为对象
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        LocalDateTime expireTime = redisData.getExpireTime();
        JSONObject data = (JSONObject) redisData.getData();
        R r = JSONUtil.toBean(data,type);
        //5.判断是否过期
        if (expireTime.isAfter(LocalDateTime.now())){
            //5.1未过期,直接返回
            return r;
        }
        //5.2已过期,需要缓存重建
        //6.缓存重建
        //6.1获取互斥锁
        String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
        boolean isLock = tryLock(lockKey);
        //6.2判断是否获取互斥锁成功
        if (isLock){
            //6.3成功,开启独立线程,实现缓存重建
            CACHE_REBUILD_EXECUTOR.submit(()->{
                //重建缓存
                try {
                    //查询数据库
                    R r1 = dbFallback.apply(id);
                    //写入redis
                    this.setWithLogicalExpire(key,r1,time,unit);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }finally {
                    //释放锁
                    unlock(lockKey);
                }
            });
        }
        //6.4返回过期的商铺信息
        return r;
    }

    /**
     * 获取锁
     * @param key
     * @return
     */
    private boolean tryLock(String key){
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    /**
     * 释放锁
     * @param key
     */
    private void unlock(String key){
        stringRedisTemplate.delete(key);
    }

}

总结

缓存穿透

图片.png

缓存雪崩

图片.png

缓存击穿(热点key失效)

图片.png图片.png

优惠卷秒杀

全局唯一ID

当用户抢购时,就会生成订单并保存到tb_voucher_oder这张表中,而订单表如果使用数据库自增ID就会存在一些问题:

  • id的规律太明显
  • 受单表数据量的显示

全局id生成器

是一种在分布式系统下用来生成全局的唯一id的工具,一般要满足下列特性:

图片.png

为了增加ID的安全性,我们不可以直接使用Redis自增的数值,而是拼接一些其他信息:

图片.png

ID的组成成分:

  • 符号位:1bit,永远为0
  • 时间戳:31bit,以秒为单位,可以使用69年
  • 序列号:32bit,秒内计数器,支持每秒产生2的32次方个不同的ID

代码:

public class RedisIdWorker {
    //开始的时间戳
    private static final long BEGIN_TIMESTAMP = 1640995200L;
    //序列号的位数
    private static final int COUNT_BITS = 32;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    public long nextId(String keyPrefix){
        //1.生成时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;
        //2.生成序列化
        //2.1获取当前的日期,精确到天
        String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
        //2.2自增长
        Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
        //3.拼接返回

        return timestamp << COUNT_BITS | count;
    }

}

总结

全局唯一id生成策略:

  • UUID
  • Redis自增
  • 雪花算法
  • 数据库自增

Redis自增ID策略:

  • 每天一个key,方便统计订单量
  • ID构造是:时间戳+计数器

实现优惠券的秒杀下单

每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,特价券需要秒杀抢购。

表关系如下:

  • tb_voucher:优惠券的基本信息,优惠金额、使用规则等
  • tb_seckill_voucher:优惠券的库存、开始抢购时间、结束抢购时间。特价优惠券才需要填写这些信息。

接口介绍

图片.png

注意事项

下单时需要注意两点:

  1. 秒杀是否开始或结束,如果未开始或者已结束则无法下单
  2. 库存是否充足,不足则无法下单

流程图

图片.png

代码实现

代码流程:
//1.查询优惠卷
//2.判断秒杀是否开始
//3.判断秒杀是否结束
//4.判断库存是否充足
//5.扣除库存
//6.创建菜单
//7.返回订单id

    @Resource
    private ISeckillVoucherService seckillVoucherService;
    @Resource
    private RedisIdWorker redisIdWorker;
    @Transactional
    @Override
    public Result seckillVoucher(Long voucherId) {
        //1.查询优惠卷
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        //2.判断秒杀是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            //尚未开始
            return Result.fail("秒杀尚未开始");
        }
        //3.判断秒杀是否结束
        if (voucher.getEndTime().isBefore(LocalDateTime.now())){
            //秒杀结束
            return Result.fail("秒杀时间结束");
        }
        //4.判断库存是否充足
        if (voucher.getStock()<1){
            return Result.fail("库存不足");
        }
        //5.扣除库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherId).update();
        if (!success){
            //扣件失败
            return Result.fail("库存不足");
        }
        //6.创建菜单
        VoucherOrder voucherOrder = new VoucherOrder();
        //6.1.订单id
        long orderId = redisIdWorker.nextId("oder");
        voucherOrder.setId(orderId);
        //6.2.用户id
        Long userId = UserHolder.getUser().getId();
        voucherOrder.setUserId(userId);
        //6.3.代金券id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);
        //7.返回订单id
        return Result.ok(orderId);
    }

超卖问题

超卖问题是典型的多线程安全问题。

图片.png

解决方案

针对这一问题的常见方案就是加锁

图片.png

乐观锁解决

乐观锁的关键是判断之前查询到的数据是否有被修改过,常见的方式有两种:

  • 版本号法

图片.png

  • CAS法

图片.png

总结

  1. 悲观锁:添加同步锁,让线程串行执行
    1. 优点:简单粗暴
    2. 缺点:性能一般
  2. 乐观锁:不加锁,在更新时判断是否有其他的线程在修改
    1. 优点:性能好
    2. 缺点:存在成功率比较低的问题

一人一单

需求:修改秒杀业务,要求一个优惠卷,一个用户只能下一单

流程图

图片.png

代码

通过对用户的id进行加锁实现了一人一单的问题

    @Resource
    private ISeckillVoucherService seckillVoucherService;
    @Resource
    private RedisIdWorker redisIdWorker;

    @Override
    public Result seckillVoucher(Long voucherId) {
        //1.查询优惠卷
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        //2.判断秒杀是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            //尚未开始
            return Result.fail("秒杀尚未开始");
        }
        //3.判断秒杀是否结束
        if (voucher.getEndTime().isBefore(LocalDateTime.now())){
            //秒杀结束
            return Result.fail("秒杀时间结束");
        }
        //4.判断库存是否充足
        if (voucher.getStock()<1){
            return Result.fail("库存不足");
        }
        Long userId = UserHolder.getUser().getId();
        synchronized (userId.toString().intern()) {
            //获取代理对象(事务)
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();

            return proxy.createVoucherOrder(voucherId);
        }
    }
    @Transactional
    public  Result createVoucherOrder(Long voucherId) {
        //6.一人一单
        Long userId = UserHolder.getUser().getId();

            //6.1查询订单
            Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
            //6.2判断是否存在
            if (count > 0) {
                //用户已经购买
                return Result.fail("用户购买已经购买过");
            }
            //5.扣除库存
            boolean success = seckillVoucherService.update()
                    .setSql("stock = stock - 1")
                    .eq("voucher_id", voucherId).gt("stock", 0)
                    .update();
            if (!success) {
                //扣件失败
                return Result.fail("库存不足");
            }

            //6.创建菜单
            VoucherOrder voucherOrder = new VoucherOrder();
            //6.1.订单id
            long orderId = redisIdWorker.nextId("oder");
            voucherOrder.setId(orderId);
            //6.2.用户id

            voucherOrder.setUserId(userId);
            //6.3.代金券id
            voucherOrder.setVoucherId(voucherId);
            save(voucherOrder);
            //7.返回订单id
            return Result.ok(orderId);
        }

一人一单的并发安全问题

图片.png

通过加锁可以解决在单机情况下的宜人一单的安全问题,但是在集群模式下就不行了。
在集群模式下,多个jvm里面多个锁监视器,导致锁失效。因为只能保持单个jvm。

分布式锁

满足分布式系统或集群模式下多进程可见并且互斥的锁

图片.png
图片.png

分布式锁的实现

分布式锁的核心是实现多进程之间的互斥,而满足这一点的方式有很多,常见的有三种

图片.png

基于Redis的分布式锁

实现分布式锁时需要实现的两个基本方法

  1. 获取锁:
    1. 互斥:确保只有一个线程获取锁:setnx key value
    2. 添加过期时间,避免服务器宕机引起的死锁:expire key 10(time)
    3. 非阻塞:尝试一次,成功返回true,失败返回false
  2. 释放锁:
    1. 手动释放:del key
    2. 超时释放:获取锁时添加一个过期时间

流程图:
图片.png

初级版本

需求:定义一个类,实现下面的接口,利用redis实现分布式锁功能

图片.png
创建锁释放锁的工具类:

public class SimpleRedisLock implements ILock{
    private StringRedisTemplate stringRedisTemplate;
    private String name;
    public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.name = name;
    }
    private static final String key_prefix = "lock:";
    @Override
    public boolean tryLock(long timeoutSec) {
        //获取线程标识
        long threadId = Thread.currentThread().getId();
        //获取锁
        Boolean seccess = stringRedisTemplate.opsForValue().setIfAbsent(key_prefix+name, threadId + " ", timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(seccess);
    }
    @Override
    public void unlock() {
        stringRedisTemplate.delete(key_prefix+name);
    }
}

代码实现

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Resource
    private ISeckillVoucherService seckillVoucherService;
    @Resource
    private RedisIdWorker redisIdWorker;
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public Result seckillVoucher(Long voucherId) {
        //1.查询优惠卷
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        //2.判断秒杀是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            //尚未开始
            return Result.fail("秒杀尚未开始");
        }
        //3.判断秒杀是否结束
        if (voucher.getEndTime().isBefore(LocalDateTime.now())){
            //秒杀结束
            return Result.fail("秒杀时间结束");
        }
        //4.判断库存是否充足
        if (voucher.getStock()<1){
            return Result.fail("库存不足");
        }
        Long userId = UserHolder.getUser().getId();
        //创建锁对象
        SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "oder" + userId);
        //获取锁
        boolean islock = lock.tryLock(5);
        //判断是否获取锁成功
        if (!islock){
            //获取锁失败,返回错误或重试
            return Result.fail("不允许重复下单");
        }
        try {
            //获取代理对象(事务)
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        } catch (IllegalStateException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
    @Transactional
    public  Result createVoucherOrder(Long voucherId) {
        //6.一人一单
        Long userId = UserHolder.getUser().getId();

            //6.1查询订单
            Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
            //6.2判断是否存在
            if (count > 0) {
                //用户已经购买
                return Result.fail("用户购买已经购买过");
            }
            //5.扣除库存
            boolean success = seckillVoucherService.update()
                    .setSql("stock = stock - 1")
                    .eq("voucher_id", voucherId).gt("stock", 0)
                    .update();
            if (!success) {
                //扣件失败
                return Result.fail("库存不足");
            }

            //6.创建菜单
            VoucherOrder voucherOrder = new VoucherOrder();
            //6.1.订单id
            long orderId = redisIdWorker.nextId("oder");
            voucherOrder.setId(orderId);
            //6.2.用户id

            voucherOrder.setUserId(userId);
            //6.3.代金券id
            voucherOrder.setVoucherId(voucherId);
            save(voucherOrder);
            //7.返回订单id
            return Result.ok(orderId);
        }

}

极端情况

图片.png

线程1释放了别的线程的锁。
在释放锁时要判断锁的标识是否是自己的

改进Redis的分布式锁

需求:修改之前的分布式锁实现,满足:

  1. 在获取锁时存入线程标识(可用UUID表示)
  2. 在释放锁时先获取锁中的线程标识,判断是否是当前的线程标识
    1. 如果一致则释放锁
    2. 如果不一致则不释放锁

流程图:

图片.png
代码实现:

public class SimpleRedisLock implements ILock{
    private StringRedisTemplate stringRedisTemplate;
    private String name;
    public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.name = name;
    }
    private static final String key_prefix = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";
    @Override
    public boolean tryLock(long timeoutSec) {
        //获取线程标识
        String threadId = ID_PREFIX+Thread.currentThread().getId();
        //获取锁
        Boolean seccess = stringRedisTemplate.opsForValue().setIfAbsent(key_prefix+name, threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(seccess);
    }
    @Override
    public void unlock() {
        //获取线程标识
        String threadId = ID_PREFIX+Thread.currentThread().getId();
        //获取锁中的标识
        String id = stringRedisTemplate.opsForValue().get(key_prefix + name);
        //判断标识是否一致
        if (threadId.equals(id)){
            stringRedisTemplate.delete(key_prefix+name);
        }
    }
}

又存在的问题

图片.png

redis的lua脚本

图片.png

调用脚本

图片.png

实例

图片.png

再次改进Redis的分布式锁

需求:基于Lua脚本实现分布式锁的释放锁逻辑
提示:RedisTemplete调用Lua脚本的API如下:

图片.png
代码实现:

    @Override
    public void unlock() {
        //调用lua脚本
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(key_prefix+name),
                ID_PREFIX+Thread.currentThread().getId());
    }
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }

lua脚本:

--比较线程标识与锁中的标识是否一致
if (redis.call('get',KEYS[1])==ARGV[1]) then
    -- 释放锁 del key
    return redis.call('del',KEYS[1])
end
return 0

总结

基于redis的分布式锁的实现思路:

  1. 利用setnx获取锁,并设置过期时间,保存线程标识
  2. 释放锁时先判断线程标识是否与自己的一致,一致则删除锁。

特性:

  1. 利用setnx满足互斥性
  2. 利用setex保证故障时锁依然能够释放,避免死锁,提高安全性。
  3. 利用redis集群保证高可用性和高并发性的特性

基于redis的分布式锁的优化

基于setnx实现的分布式锁存在下面的问题:

图片.png

Redisson

Redisson是一个在redis基础上实现的Java驻内存数据网络。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

图片.png官网地址:https://redison.org
GitHub地址:https://github.com/redisson/redisson

Redisson入门

图片.png图片.png

Redisson可重入锁的原理

图片.png

lua脚本解决:获取锁

图片.png

lua脚本解决释放锁:

图片.png

整个流程图

图片.png

总结

Redisson分布式锁的原理:

  • 可重入:利用hash结构记录线程id和重入次数:
    • 每次获取锁时都判断锁是否存在,如果不存在直接获取,如果存在,对比线程标识,如果当前的线程标识是当前线程,那么可以再次获取,并把重入次数加一。在以后释放锁时,把重用次数减一。重用次数减到0时,代表业务走到最外层,就可以真正的释放锁。
  • 可重试:利用信号量和PubSub功能实现等待、唤醒、获取锁失败的重试机制。
    • 在第一次重试获取锁失败后,不立即失败,而是等待PubSub释放锁的消息,就可以重新获取锁。有等待的超时时间。
  • 超时续约:利用watchdaog,每隔一段时间(releaseTime/3),重置超时时间。
    • 获取锁成功以后,开启一个定时任务。这个定时任务每隔一段时间,重置超时时间。

Redisson分布式锁主从一致问题
  1. 不可重入的Redis分布式锁
  • 原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标识
  • 缺陷:不可重入、无法重试、锁超时失效
  1. 可重入的Redis分布式锁
  • **原理:**利用hash结构,记录线程标识和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
  • 缺陷:redis宕机失效问题。
  1. Redisson的multiLock:
  • 原理:多个独立的Redis节点,必须在所有的节点都获取重入锁,才算获取锁成功。
  • 缺陷:运维成本高、实现复杂。

Redis优化秒杀

流程图

图片.png

实现

图片.png

案例 改进秒杀业务,提高并发性能

需求:

  1. 新增秒杀优惠券的同时,将优惠券信息保存到redis中
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
  1. 基于lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
--1.参数列表
--1.1优惠券id
local voucherId = ARGV[1]
--1.2.用户id
local userId = ARGV[2]
--2.数据key
--2.1.库存key
local stockKey = 'seckill:stock' .. voucherId

--2.2订单key
local orderKey = 'seckill:order' .. voucherId
--3.脚本业务
--3.1.判断库存是否充足  get stockKey是否大于0
if (tonumber(redis.call('get',stockKey)) <= 0) then
    --库存不足,返回1
    return 1
end
--3.2判断用户是否下单  SISMEMBER oderKey userId 判断用户的id是否在里面
if (redis.call('sismember',orderKey,userId) == 1) then
    --存在,说明重复下单  返回2
    return 2
end
--3.4.扣库存  incrby stockKey -1
redis.call('incrby',stockKey,-1)
--3.5下单(保存用户)sadd orderKey userId 把userId保存到orderKey里面
redis.call('sadd',orderKey,userId)
  1. 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
    @Override
    public Result seckillVoucher(Long voucherId) {
        //获取用户
        Long userId = UserHolder.getUser().getId();
        //1.执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString());
        //2.判断结果是否为0
        int r = result.intValue();
        if (r != 0){
            //2.1.不为0,没有购买资格
            return Result.fail(r == 1?"库存不足":"不能重复下单");
        }
        //2.2.为0,有购买资格,把下单信息保存到阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        //2.3.订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //2.4.用户id
        voucherOrder.setUserId(userId);
        //2.5.代金券id
        voucherOrder.setVoucherId(voucherId);
        //2.6放入阻塞队列
        orderTasks.add(voucherOrder);
        //3.返回订单id
        return Result.ok(orderId);
    }
  1. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
 private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    private IVoucherOrderService proxy;
    //此注解代表,当前类初始化完成后方法开始执行
    @PostConstruct
    private void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }
    private class VoucherOrderHandler implements Runnable{

        @Override
        public void run() {
            while (true){

                try {
                    //1.获取队列中的订单信息
                    VoucherOrder voucherOrder = orderTasks.take();
                    //2.创建订单
                    handleVoucherOrder(voucherOrder);
                } catch (Exception e) {
                    log.error("处理订单异常");
                }
            }
        }
    }

    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        //获取用户
        Long userId = voucherOrder.getUserId();
        //创建锁对象
        RLock lock = redissonClient.getLock("lock:oder" + userId);
        //获取锁
        boolean islock = lock.tryLock();
        //判断是否获取锁成功
        if (!islock){
            //获取锁失败,返回错误或重试
            log.error("不允许重复下单");
            return;
        }
        try {
            return proxy.createVoucherOrder(voucherOrder);

        } catch (IllegalStateException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }

总结

  1. 秒杀业务的优化是什么?
    1. 先利用Redis完成库存量、一人一单的判断,完成抢单的业务。
    2. 再将下单业务放入阻塞队列,利用独立的线程异步下单
  2. 基于阻塞队列的异步秒杀存在哪些问题
    1. 内存限制问题
    2. 数据安全问题

Redis消息队列实现异步秒杀

消息队列

字面意思就是存放消息的队列,最简单的消息队列模型包括三个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

Redis提供了三种不同的方式来实现消息队列

  • list结构:基于list结构模拟的消息队列
  • PubSub:基于点对点的消息模型
  • Stream:比较完善的消息队列模型

Redis中的消息队列

基于List的结构模拟的消息队列

Redis的list数据结构是一个双向链表,很容易模拟出队列效果:
队列的出口和入口不是一边,因此我们可以利用:LPUSH结合RPOP或者RPUSH结合LPOP来实现。
不过要注意的是,当队列中没有消息是RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或BLPOP来实现阻塞效果。

图片.png

基于List消息队列有哪些优缺点?
优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息的有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub的消息队列

图片.png图片.png

基于Stream的消息队列

Stream是Redis5.0引入的一种新的数据类型,可以实现一个功能非常完善的消息队列

图片.png例如:
图片.png图片.png
图片.png图片.png

Stream消息队列-消费者组

将多个消费者划分到一个组中,监听同一个队列,具备一下特点:

图片.png图片.png图片.png

总结

图片.png

基于Stream作为消息队列,实现异步秒杀下单

图片.png

达人探店

发布探店笔记

探店笔记类似于点评网站的评价,往往是图文结合。对应的表有两个:

  • tb_blog:探店笔记表,包含笔记中的标题、文字、图片等
  • tb_blog_comments:其他用户对探店笔记的评价

图片.png

查看发布探店笔记

接口介绍

图片.png

代码实现

    @Override
    public Result queryBlogById(Long id) {
        //1.查询blog
        Blog blog = getById(id);
        if (blog == null){
            return Result.fail("博客不存在");
        }
        //2.查询blog有关的用户
        queryBlogByUser(blog);
        return Result.ok(blog);
    }

    private void queryBlogByUser(Blog blog) {
        Long userId = blog.getUserId();
        User user = userService.getById(userId);
        blog.setName(user.getNickName());
        blog.setIcon(user.getIcon());
    }

点赞功能

需求

  • 同一个用户只能点赞一次,再次点击则取消赞
  • 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)

实现步骤

  1. 给Blog类中添加一个isLike字段,标识是否被当前用户点赞
  2. 修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数加一,已点赞过则点赞数减一
  3. 修改根据id查询Blog的业务,判断当前用户是否点赞过,赋值给isLike字段
  4. 修改分页查询Blog,判断当前登录用户是否点赞过,赋值给isLike字段

代码实现

 public Result queryHotBlog(Integer current) {
        // 根据用户查询
        Page<Blog> page = query()
                .orderByDesc("liked")
                .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
        // 获取当前页数据
        List<Blog> records = page.getRecords();
        // 查询用户
        records.forEach(blog -> {
            this.queryBlogByUser(blog);
            this.isBlockLiked(blog);
        });
        return Result.ok(records);
    }

    @Override
    public Result queryBlogById(Long id) {
        //1.查询blog
        Blog blog = getById(id);
        if (blog == null){
            return Result.fail("博客不存在");
        }
        //2.查询blog有关的用户
        queryBlogByUser(blog);
        //3.查询blog是否备点赞
        isBlockLiked(blog);
        return Result.ok(blog);
    }

    private void isBlockLiked(Blog blog) {
        //1.获取当前登录用户
        Long userId = UserHolder.getUser().getId();
        //2.判断当前登录用户是否点赞
        String key = RedisConstants.BLOG_LIKED_KEY + blog.getId();
        Boolean isMember = stringRedisTemplate.opsForSet().isMember(key,userId.toString());
        blog.setIsLike(BooleanUtil.isTrue(isMember));
    }

    @Override
    public Result likeBlog(Long id) {
        //1.获取当前登录用户
        Long userId = UserHolder.getUser().getId();
        //2.判断当前登录用户是否点赞
        String key = RedisConstants.BLOG_LIKED_KEY + id;
        Boolean isMember = stringRedisTemplate.opsForSet().isMember(key,userId.toString());

        if (BooleanUtil.isFalse(isMember)){
            //3.如果未点赞,可以点赞
            //3.1.数据库点赞加一
            boolean isSuccess = update().setSql("liked = liked +1").eq("id", id).update();
            //3.2保存用户到redis的set集合
            if (isSuccess){
                stringRedisTemplate.opsForSet().add(key,userId.toString());
            }
        }else {
            //4.如果一点赞,取消点赞
            //4.1数据库点赞数减一
            boolean isSuccess = update().setSql("liked = liked -1").eq("id", id).update();
            if (isSuccess){
                //4.2.把用于从Redis的set结合移除
                stringRedisTemplate.opsForSet().add(key,userId.toString());
            }
        }
        return Result.ok();
    }

点赞排行榜

在探店笔记的详情页面,应该把给笔记点赞的人显示出来,比如最早点赞的TOP5,形成点赞排行榜:

接口介绍

图片.png

需求

按照点赞时间先后顺序,返回top5的用户

图片.png

改善点赞功能

可以使用SortedSet里面的排序,以时间戳作为score排序,然后显示排行。
用score方法判断是否存在,如果存在返回score不存在返回null的原理

  private void isBlockLiked(Blog blog) {
        //1.获取当前登录用户
        Long userId = UserHolder.getUser().getId();
        //2.判断当前登录用户是否点赞
        String key = RedisConstants.BLOG_LIKED_KEY + blog.getId();
        Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
        blog.setIsLike(score != null);
    }

    @Override
    public Result likeBlog(Long id) {
        //1.获取当前登录用户
        Long userId = UserHolder.getUser().getId();
        //2.判断当前登录用户是否点赞
        String key = RedisConstants.BLOG_LIKED_KEY + id;
        Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());

        if (score == null){
            //3.如果未点赞,可以点赞
            //3.1.数据库点赞加一
            boolean isSuccess = update().setSql("liked = liked +1").eq("id", id).update();
            //3.2保存用户到redis的set集合
            if (isSuccess){
                stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
            }
        }else {
            //4.如果一点赞,取消点赞
            //4.1数据库点赞数减一
            boolean isSuccess = update().setSql("liked = liked -1").eq("id", id).update();
            if (isSuccess){
                //4.2.把用于从Redis的set结合移除
                stringRedisTemplate.opsForZSet().remove(key,userId.toString());
            }
        }
        return Result.ok();
    }

点赞top5实现

逻辑:
//1.查询top5的点赞用户 zrange key 0 4
//2.解析出其中的用户id
//3.根据用户id查询用户
//4.返回

代码实现:

    public Result queryBlogLikes(Long id) {
        String key = RedisConstants.BLOG_LIKED_KEY +id;
        //1.查询top5的点赞用户 zrange key 0 4
        Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
        if (top5 == null || top5.isEmpty()){
            return Result.ok(Collections.emptyList());
        }
        //2.解析出其中的用户id
        List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
        String idStr = StringUtil.join(ids,",");
        //3.根据用户id查询用户
        List<UserDTO> userDtos = userService.query()
                .in("id",ids)
                .last("order by FIELD(id," + idStr + ")").list()
                .stream()
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
        //4.返回
        return Result.ok(userDtos);
    }

好友关注

关注和取关

需求:基于该表数据结构,实现两个接口

  1. 关注和取关接口
  2. 判断是否关注的接口

关注是User之间的关系,是博主与粉丝之间的关系,数据库中有一张表tb_follow标识

图片.png

关注 取关

业务流程:
//1.判断是关注还是取关
//2.关注,新增数据
//3.取关,删除数据

    public Result follow(Long followUserId, Boolean isFollow) {
        //1.获取登录用户的id
        Long userId = UserHolder.getUser().getId();
        //1.判断是关注还是取关
        if (isFollow) {
            //2.关注,新增数据
            Follow follow = new Follow();
            follow.setUserId(userId);
            follow.setFollowUserId(followUserId);
            save(follow);
        }else {
            //3.取关,删除数据
            remove(new QueryWrapper<Follow>().eq("user_id",userId).eq("follow_user_id",followUserId));
        }
        return Result.ok();
    }

判断是否关注
    @Override
    public Result isFollow(Long followUserId) {
        //1.获取登录用户的id
        Long userId = UserHolder.getUser().getId();
        //1.查询是否关注
        Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();

        return Result.ok(count>0);
    }

共同关注

需求

利用redis中恰当的数据结构,实现共同关注的功能。在博主的个人页面展示出当前用户与博主的共同好友。

实现思路

取博主和登录用户关注的人的交集,利用redis的set集合的 实现,改造之前的关注功能,在关注时,将用户的id作为key,被关注的用户的id作为值存入redis中,然后查的时候取交集。

    public Result follow(Long followUserId, Boolean isFollow) {
        //1.获取登录用户的id
        Long userId = UserHolder.getUser().getId();
        String key = "follows:"+userId;
        //1.判断是关注还是取关
        if (isFollow) {
            //2.关注,新增数据
            Follow follow = new Follow();
            follow.setUserId(userId);
            follow.setFollowUserId(followUserId);
            boolean isSuccess = save(follow);
            if (isSuccess){
                //把关注的用户的id,放入redis的set集合  add userId foolowUserId

                stringRedisTemplate.opsForSet().add(key,followUserId.toString());
            }
        }else {
            //3.取关,删除数据
            remove(new QueryWrapper<Follow>().eq("user_id",userId).eq("follow_user_id",followUserId));
            //把关注用户的id从Redis中移除
            stringRedisTemplate.opsForSet().remove(key,followUserId);
        }
        return Result.ok();
    }

共同关注的接口介绍

图片.png

代码实现

实现思路:
//1.获取当前登录用户
//2.求交集

//3.解析id集合
//4.查询用户

代码:

    @Override
    public Result followCommons(Long id) {
        //1.获取当前登录用户
        Long userId = UserHolder.getUser().getId();
        String key = "follows:" +userId;
        //2.求交集
        String key2 = "follows:"+id;
        Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
        if (intersect == null || intersect.isEmpty()){
            return Result.ok(Collections.emptyList());
        }
        //3.解析id集合
        List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
        //4.查询用户
        List<UserDTO> users = userService.listByIds(ids)
                .stream()
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());

        return Result.ok(users);
    }

关注推送

关注推送也叫Feed流,直译为投喂,为用户提供“沉浸式”体验。通过无线下拉刷新获取新的消息

Feed流的模式

Feed流产品有两种常见的形式

  1. Timeline:不做内容筛选,简单的按照内容发布时间顺序,常用语好友或关注。例如朋友圈
    1. 优点:信息全面,不会有确实,实现也相对简单。
    2. 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低。
  2. 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣的信息,来吸引用户
    1. 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷。
    2. 缺点:算法不准确,可能起到反作用。

例如本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现有三种方案:

  1. 拉模式:博主发布消息后(有时间戳),在用户读的时候,拉取博主发布的消息。
    1. 缺点:延迟

图片.png

  1. 推模式:在博主发布消息后,直接推送给关注他的人。
    1. 缺点:占内存

图片.png

  1. 推拉结合:大v推送给活跃粉丝,普通粉丝用拉取

图片.png图片.png

基于推模式实现关注推送功能

需求:

  1. 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
    public Result saveBlog(Blog blog) {
        // 获取登录用户
        UserDTO user = UserHolder.getUser();
        blog.setUserId(user.getId());
        // 保存探店博文
        boolean isSuccess = save(blog);

        if (!isSuccess){
            return Result.fail("新增笔记失败");
        }

        //3.查询笔记作者的所有粉丝
        List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
        //4.推送笔记id给所有粉丝
        for (Follow follow : follows) {
            //4.1.获取粉丝的id
            Long userId = follow.getUserId();
            //4.2.推送
            String key = "feed:"+userId;
            stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
        }
        // 返回id
        return Result.ok(blog.getId());
    }
  1. 收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
  2. 查询收件箱数据时,可以实现分页查询

滚动分页查询参数:

  • max(时间戳):当前时间戳 | 上一次查询的最小的时间戳
  • min:0
  • offset (偏移量):0 | 在上一次的结果中,与最小值一样的元素个数
  • count(查询数量):查几条

接口:
图片.png

Feed流的分页问题
Feed流中的数据会不断更新,所以数据的角标也在发生变化,因此不能采用传统的分页模式。

图片.png

Feed流的滚动分页

图片.png

实现逻辑:
//1.获取当前用户
//2.查询收件箱
//3.解析数据:blogId、score(时间戳)、offset
//4.根据id查询blog
//5.封装并返回

代码实现:

    @Override
    public Result queryBlogOfFollow(Long max, Integer offset) {
        //1.获取当前用户
        Long userId = UserHolder.getUser().getId();
        //2.查询收件箱 ZREVRANGBYSCORE key max Min LIMIT offset count
        String key = "feed:"+userId;
        Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
                .reverseRangeByScoreWithScores(key, 0, max, offset, 3);
        //3.非空判断
        if (typedTuples == null || typedTuples.isEmpty()){
            return null;
        }
        long minTime =0;
        int os = 1;
        //3.解析数据:blogId、score(时间戳)、offset
        List<Long> ids = new ArrayList<>(typedTuples.size());
        for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {

            //4.1获取id
            ids.add(Long.valueOf(tuple.getValue()));

            //4.2获取分数(时间戳)

            Long time = tuple.getScore().longValue();
            if (time == minTime){
                os++;
            }else {
                minTime = time;
                os = 1;
            }
        }
        String idStr = StringUtil.join(ids, ",");
        //4.根据id查询blog
        List<Blog> blogs = query()
                .in("id",ids)
                .last("order by FIELD(id," + idStr + ")").list();
        for (Blog blog : blogs) {
            //2.查询blog有关的用户
            queryBlogByUser(blog);
            //3.查询blog是否备点赞
            isBlockLiked(blog);
        }
        //5.封装并返回
        ScrollResult r = new ScrollResult();
        r.setList(blogs);
        r.setOffset(offset);
        r.setMinTime(minTime);
        return Result.ok(r);
    }

附近商户

GEO数据结构

图片.png

附近商户搜索

接口介绍

图片.png

实现思路

按照商户类型做分组,类型相同的商户作为同一组,以typeId作为key存入同一个GEO集合中即可。

图片.png
图片.png

代码实现

用户签到

BitMap

假如我们用一张表来存储用户的登录信息,其结构应该如下。

图片.png

我们按月来统计用户信息。签到记录为1,未签到则为0
把每一个bit位对应当月的每一天,形成了映射关系。用0和1标识业务状态,这种思路称为位图BitMap

图片.png

Redis中利用String类型的数据结构实现BitMap,因此最大上限是52M,转换为bit则是2的32次个bit位。

BitMap的操作命令

图片.png

实现签到功能

需求

实现签到接口,将当前的用户当天的签到信息保存到Redis中

图片.png

注意:因为BitMap底层是基于String数据结构,因此其操作都封装在字符串相关的操作中。

图片.png

代码实现

实现逻辑

//1.获取当前登录用户
//2.获取日期
//3.拼接key
//4.获取今天是本月的第几天
//5.写入redis SETBIT key offset 1

代码
    public Result sign() {
        //1.获取当前登录用户
        Long userId = UserHolder.getUser().getId();
        //2.获取日期
        LocalDateTime now = LocalDateTime.now();
        //3.拼接key
        String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
        String key = RedisConstants.USER_SIGN_KEY + userId + keySuffix;
        //4.获取今天是本月的第几天
        int dayOfMonth = now.getDayOfMonth();
        //5.写入redis  SETBIT key offset 1
        stringRedisTemplate.opsForValue().setBit(key,dayOfMonth-1,true);
        return Result.ok();
    }

签到统计

问题

图片.png

需求

图片.png

实现逻辑

//1.获取当前登录用户
//2.获取日期
//3.拼接key
//4.获取今天是本月的第几天

//5.获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字
//6.循环遍历
//7.让这个数字与1做与运算,得到数字的最后的一个bit位
//判断这个bit位是否为0
//如果为0,说明未签到,结束
//如果不为0。说明已签到。计数器+1
//把数字右移一位,抛弃最后一个bit位,继续下一个bit位

代码实现

    public Result signCount() {
        //1.获取当前登录用户
        Long userId = UserHolder.getUser().getId();
        //2.获取日期
        LocalDateTime now = LocalDateTime.now();
        //3.拼接key
        String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
        String key = RedisConstants.USER_SIGN_KEY + userId + keySuffix;
        //4.获取今天是本月的第几天
        int dayOfMonth = now.getDayOfMonth();
        //5.获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字
        List<Long> result = stringRedisTemplate.opsForValue().bitField(
                key,
                BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0));
        if (result == null || result.isEmpty()){
            //没有任何签到结果
            return Result.ok(0);
        }
        Long num = result.get(0);
        if (num == null || num ==0){
            return Result.ok(0);
        }
        //6.循环遍历
        int count = 0;
        while (true){
            //7.让这个数字与1做与运算,得到数字的最后的一个bit位
            //判断这个bit位是否为0
            if ((num & 1) == 0){
                //如果为0,说明未签到,结束
                break;
            }else {
                //如果不为0。说明已签到。计数器+1
                count++;
            }
            //把数字右移一位,抛弃最后一个bit位,继续下一个bit位
            num >>>= 1;
        }
        return Result.ok(count);
    }

HyperLogLog

HyperLogLog的用法

图片.png

Redis高级篇

单点Redis的问题:

图片.png

Redis持久化

RDB

RDB全称Redis Database Backup file (Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。
快照文件成为RDB文件,默认是保存在当前运行目录。

图片.png

Redis停机时会自动执行一次RDB

Redis内部有触发RDB的机制,可以在redis.conf文件中找到。格式如下:

图片.png

RDB的其他配置也可以在redis.conf文件中设置

图片.png

文章来源:https://blog.csdn.net/m0_65180797/article/details/132756744
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。