JedisCluster 通过 Pipeline 实现两套数据轮换更新

2023-12-16 09:30:31

其他系列文章导航

Java基础合集
数据结构与算法合集

设计模式合集

多线程合集

分布式合集

ES合集


文章目录

其他系列文章导航

文章目录

前言

一、整体流程

1.1 大致流程

1.2 流程代码解释

二、从数据库里查数据

2.1 SQL语句

三、更新当前前缀

3.1 设置前缀常量

3.2 初始化 currentPrefixIndex

3.3 获取当日前缀?

?3.4?更新 currentPrefixIndex

四、往redis集群更新数据

4.1 大致流程

五、JedisCluster 实现?Pipeline 操作

5.1 实现过程


前言

本文实现了通过定时任务来调用接口,使两套数据轮换更新。

因为要区分两套数据,所以 key 要设置前缀。

例如:一天数据一换,今天查的 A 开头的 key ,明天查 B 开头的 key ,后天又查 A 开头的 key 。今天查完后,明天更新 B 开头的 key ,但是?A 开头的 key 暂时不动,后天再查的时候,A开头的 key 要进行更新,先删再更新。


一、整体流程

1.1 大致流程

  1. 从数据库里查数据。
  2. 更新当前前缀。
  3. 往redis集群更新数据。

1.2 流程代码解释

    @Override
    public R<String> updateCampToJedis() {
        R<String> r = new R<>();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM");
        String currentMonth = dateFormat.format(new Date());
        //1. 从数据库里查数据
        List<UserWideInfo> UserWideInfoList = UserWideInfoMapper.selectFromTable(currentMonth);
        if (UserWideInfoList.size() == 0) {
            r.setCode(R.ERROR_CODE);
            r.setMsg("没有数据");
            return r;
        }
        //2. 更新当前前缀
        updateCurrentPrefixIndex();
        r.setCode(R.SUCCESS_CODE);
        //3. 往redis集群存入数据
        insertToJedis(ZhmsUserWideInfoList);
        return r;
    }

二、从数据库里查数据

2.1 SQL语句

这里因为每个月查询的是不同月份的表,所有用到动态 sql 。

    <select  id="selectFromTable" resultType="com.hopedata.zhmscloud.camp.entity.po.ZhmsUserWideInfo">
        SELECT * FROM USER_WIDE_INFO_M_${SysMonth}
    </select>

三、更新当前前缀

要做到更新当前前缀,需要有两套前缀不同的 key ,还需要一个能区分前缀的前缀索引?currentPrefixIndex 。

3.1 设置前缀常量

用 A 和 B 来区分两组 key 。

代码如下:?

    private static final String PREFIX_A = "A";
    private static final String PREFIX_B = "B";

3.2 初始化 currentPrefixIndex

向 redis集群中存入初始的?currentPrefixIndex 。

代码如下:?

    @GetMapping("/init")
    public String init() {
        return jedisCluster.set("currentPrefixIndex", "0");
    }

3.3 获取当日前缀?

先取出当日的前缀索引 currentPrefixIndex ,与 2 取余数 ,来获取当日的前缀。

代码如下:?

     //获取当日前缀
    private String getKeyPrefix() {
        int currentPrefixIndex = Integer.parseInt(jedisCluster.get("currentPrefixIndex"));
        if (currentPrefixIndex % 2 == 0) {
            return PREFIX_A;
        } else {
            return PREFIX_B;
        }
    }

?3.4?更新 currentPrefixIndex

每天需要更新前缀索引 currentPrefixIndex ,让 currentPrefixIndex + 1 ,?使区分读的数据。

代码如下:?

    // 重新设置currentPrefixIndex
    private void updateCurrentPrefixIndex() {
        String currentValue = jedisCluster.get("currentPrefixIndex");
        int newValue = Integer.parseInt(currentValue) + 1;
        jedisCluster.set("currentPrefixIndex", String.valueOf(newValue));
    }

四、往redis集群更新数据

这其实是最重要的一步,因为同时存入大量的数据,所以要使用到 Pipeline 来实现。

4.1 大致流程

  1. 获取到当前前缀,查出相关的 key ,更新数据之前把旧数据删除。
  2. 把新数据解析后更新到 redis 集群。

注意:因为数据量大,为了减少网络性能消耗,删除和更新都要用?Pipeline 来操作。

代码如下:

    private void insertToJedis(List<UserWideInfo> UserWideInfoList) {
        String keyPrefix = getKeyPrefix();
        List<String> keys = new ArrayList<>();
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        for (JedisPool node : clusterNodes.values()) {
            try (Jedis jedis = node.getResource()) {
                Set<String> nodeKeys = jedis.keys(keyPrefix + "*");
                keys.addAll(nodeKeys);
            }
        }
        Map<JedisPool, List<String>> delKey = assignKey(keys, jedisCluster);
        //先删旧的
        for (JedisPool jedisPool : delKey.keySet()) {
            try (Jedis jedis = jedisPool.getResource()){
                Pipeline pipelined = jedis.pipelined();
                List<String> keysList = delKey.get(jedisPool);
                for (String key : keysList) {
                    pipelined.del(key);
                }
                pipelined.sync();
            }
        }
        List<String> keyList =new ArrayList<>();
        HashMap<String, String> map = new HashMap<>();
        //填充keyList和value
        for (UserWideInfo UserWideInfo : UserWideInfoList) {
            String key = keyPrefix + "_" + UserWideInfo.getBillNo();
            keyList.add(key);
            //构建value
            ...
            ...
            map.put(key, value);
        }
        Map<JedisPool, List<String>> result = assignKey(keyList, jedisCluster);
        for (JedisPool jedisPool : result.keySet()) {
            try (Jedis jedis = jedisPool.getResource()){
                Pipeline pipelined = jedis.pipelined();
                // 获取当前JedisPool对应的键列表
                List<String> keysList = result.get(jedisPool); 
                // 将命令添加到Pipeline中
                for (String key : keysList) {
                    String value = map.get(key);
                    pipelined.set(key, value);
                }
                // 执行Pipeline中的所有命令
                pipelined.sync();
            }
        }
    }

五、JedisCluster 实现?Pipeline 操作

5.1 实现过程

因为 JedisCluster 不支持?Pipeline 操作,所以需要自己来实现。

代码如下:

@Slf4j
public class JedisPipelineUtil {

    /**
     * jedis集群下使用pipeline之前先将key分配管道
     * Map<String, List<String>> 键值为节点ip和端口号 192.168.1.1:6397 value为redis存入的key
     *
     * @param list         存redis的key
     * @param jedisCluster
     * @return
     */
    public static Map<String, List<String>> assignSlot(List<String> list, JedisCluster jedisCluster) {
        Map<String, List<String>> hostPhoneMap = new HashMap<>();
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        Map.Entry<String, JedisPool> next = clusterNodes.entrySet().iterator().next();
        JedisPool jedisPool = next.getValue();
        Jedis jedis = jedisPool.getResource();
        Map<Integer, String> slots = discoverClusterSlots(jedis);
        for (String s : list) {
            String hostAndPort = slots.get(JedisClusterCRC16.getSlot(s));
            if (hostPhoneMap.containsKey(hostAndPort)) {
                hostPhoneMap.get(hostAndPort).add(s);
            } else {
                List<String> newList = new ArrayList<>();
                newList.add(s);
                hostPhoneMap.put(hostAndPort, newList);
            }
        }
        jedis.close();
        return hostPhoneMap;
    }

    /**
     * jedis集群下使用pipeline之前先将key分配管道
     * Map<JedisPool, List<String>> 键值为节JedisPool value为redis存入的key
     *
     * @param list         存redis的key
     * @param jedisCluster
     * @return
     */
    public static Map<JedisPool, List<String>> assignKey(List<String> list, JedisCluster jedisCluster) {
        Map<JedisPool, List<String>> map = new HashMap<>();
        Map<String, List<String>> var1 = assignSlot(list, jedisCluster);
        Iterator<Map.Entry<String, List<String>>> iterator = var1.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, List<String>> next = iterator.next();
            JedisPool jedisPool = jedisCluster.getClusterNodes().get(next.getKey());
            map.put(jedisPool, next.getValue());
        }
        return map;

    }

    private static Map<Integer, String> discoverClusterSlots(Jedis jedis) {
        Map<Integer, String> slotsMap = new HashMap<>();
        List<Object> slots = jedis.clusterSlots();
        Iterator var3 = slots.iterator();
        while (var3.hasNext()) {
            Object slotInfoObj = var3.next();
            List<Object> slotInfo = (List) slotInfoObj;
            if (slotInfo.size() > 2) {
                List<Integer> slotNums = getAssignedSlotArray(slotInfo);
                List<Object> hostInfos = (List) slotInfo.get(2);
                if (!hostInfos.isEmpty()) {
                    String targetNode = generateHostAndPort(hostInfos);
                    Iterator<Integer> var4 = slotNums.iterator();
                    while (var4.hasNext()) {
                        Integer slot = var4.next();
                        slotsMap.put(slot, targetNode);
                    }
                }
            }
        }
        return slotsMap;
    }

    private static List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
        List<Integer> slotNums = new ArrayList<>();

        for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); ++slot) {
            slotNums.add(slot);
        }

        return slotNums;
    }

    private static String generateHostAndPort(List<Object> hostInfos) {
        String host = SafeEncoder.encode((byte[]) hostInfos.get(0));
        int port = ((Long) hostInfos.get(1)).intValue();
        return host + ":" + port;
    }
}

使用?assignKey 方法就可以分配管道。


文章来源:https://blog.csdn.net/kologin/article/details/134825927
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。