redis中使用pipeline批量处理请求提升系统性能

2023-12-13 10:57:33

在操作数据库时,为了加快程序的执行速度,在新增或更新数据时,可以通过批量提交的方式来减少应用和数据库间的传输次数;在redis中也有这样的技术实现批量处理,也就是管道——Pipeline。它也是通过批量提交数据的方式来实现的,将要执行的redis命令提交到pipeline中,pipeline一次性的将数据发送给服务器,服务器再逐条执行命令。在执行命令过程中不是原子性的,可以插入其他命令执行。
下面演示在jedis中使用管道:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.3.0</version>
</dependency>

先通过一个测试示例代码看一下运行时间差异:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;

import java.time.Duration;

public class JedisUtil {

    /**
     * 连接地址
     */
    private String host;
    /**
     * 连接端口号
     */
    private int port;
    /**
     * 密码
     */
    private String password;
    /**
     * 连接池
     */
    private JedisPool jedisPool;

    /**
     * 连接初始化
     * @param host
     * @param port
     * @param password
     */
    public JedisUtil(String host, int port, String password) {
        this.host = host;
        this.port = port;
        this.password = password;

        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(256);
        config.setMaxIdle(256);
        config.setMinIdle(1);
        config.setMaxWait(Duration.ofMillis(300));

        if(password != null && !"".equals(password)) {
            jedisPool = new JedisPool(config, host, port, 500, password);
        } else {
            jedisPool = new JedisPool(config, this.host, this.port, 500);
        }
    }

    /**
     * 关闭连接池
     */
    public void close() {
        if(jedisPool != null && !jedisPool.isClosed()) {
            jedisPool.clear();
            jedisPool.close();
        }
    }

    /**
     * 获取连接
     * @return
     */
    public Jedis getJedis() {
        if(jedisPool != null && !jedisPool.isClosed()) {
            return jedisPool.getResource();
        }
        return null;
    }

    /**
     * 归还jedis对象
     * @param jedis
     */
    public void returnJedis(Jedis jedis) {
        if(jedis != null) {
            jedis.close();
        }
    }

    public static void main(String[] args) {
        // 获取jedis连接
        JedisUtil util = new JedisUtil("192.168.56.101", 6379, "");
        Jedis jedis = util.getJedis();

        // 设置键的数量:100万
        int KEY_COUNT = 1_000_000;

        // 普通方式set
        long start1 = System.currentTimeMillis();
        for(int i = 0; i < KEY_COUNT; i++) {
            jedis.set("key1_" + i, "value1_" + i);
        }
        System.out.println("use time : " + (System.currentTimeMillis() - start1) + "ms");

        // 清理数据库的key,线上系统不要使用
        jedis.flushDB();
        
        // 使用管道set
        long start2 = System.currentTimeMillis();
        Pipeline pipeline = jedis.pipelined();
        int num = 0;
        for(int i = 0; i < KEY_COUNT; i++) {
            pipeline.set("key2_" + i, "value2_" + i);
            if(num++ >= 200) {
//                pipeline.syncAndReturnAll();
                pipeline.sync();
                pipeline.close();

                pipeline = jedis.pipelined();
                num = 0;
            }
        }
        if(num != 0) {
            pipeline.syncAndReturnAll();
            pipeline.close();
        }
        System.out.println("pipeline : " + (System.currentTimeMillis() - start2) + "ms");

        // 清理数据库的key,线上系统不要使用
        jedis.flushDB();
    }
}

上面的代码运行两次,调整先后顺序分别运行,得到的运行时间:

use time : 79297ms
pipeline : 2036ms

pipeline : 1747ms
use time : 85078ms

可以看到两次运行的时间差异还是非常明显的,基本上差距40~50倍,再实际运行时可以多次测试并调整每次pipeline提交命令的条数,找到每次提交数据时性能最好的数据条数。pipeline每次提交数据量不宜过多,太多的命令一次提交会导致客户端等待结果时间比较长,也会让连接的缓冲区数据量过大。

pipeline本身没有过多内容需要讲解,下面介绍一下如何在redisTemplate中使用pipeline,redisTemplate中已经提供了对应方法executePipelined()可以直接调用,它支持两个类型的参数:RedisCallback更接近redis原生命令,但是需要自己将键和值都转换为字节码传递过去;SessionCallback对操作进行了封装,可以根据操作不同的数据类型进行转换,方便api使用。

List<Object> datas = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        connection.set("key1".getBytes(StandardCharsets.UTF_8), "value1".getBytes(StandardCharsets.UTF_8));
        connection.set("key2".getBytes(StandardCharsets.UTF_8), "value2".getBytes(StandardCharsets.UTF_8));
        connection.set("key3".getBytes(StandardCharsets.UTF_8), "value3".getBytes(StandardCharsets.UTF_8));
        connection.set("key4".getBytes(StandardCharsets.UTF_8), "value4".getBytes(StandardCharsets.UTF_8));
        connection.set("key5".getBytes(StandardCharsets.UTF_8), "value5".getBytes(StandardCharsets.UTF_8));
        connection.set("key6".getBytes(StandardCharsets.UTF_8), "value6".getBytes(StandardCharsets.UTF_8));
        connection.get("key1".getBytes(StandardCharsets.UTF_8));

        // 这里必须返回null,在 connection.closePipeline() 时覆盖原来的返回值,所以返回值没有必要设置,设置会报错
        return null;
    }
});
List<Object> datas = redisTemplate.executePipelined(new SessionCallback<Object>() {
    @Override
    public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
        ValueOperations<String, String> op1 = (ValueOperations<String, String>) operations.opsForValue();
        op1.set("key7", "value7");
        op1.set("key8", "value8");
        op1.get("key2");

        SetOperations<String, String> op2 = (SetOperations<String, String>) operations.opsForSet();
        op2.add("set_demo", "value1", "value2", "value3");
        op2.randomMember("set_demo");

        return null;
    }
});

pipeline非常显著的提升系统性能,对于redis这种内存数据库,每天的请求量会非常高,对于系统优化来说,管道技术的使用应该成为代码的一个优化点。

文章来源:https://blog.csdn.net/dream_xin2013/article/details/134827186
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。