深入理解 MyBatis-Plus 批量保存方法

2024-01-09 15:37:22

前言

在项目开发中,需要插入批量插入20多万条数据,通过日志观察,发现在调用MyBatis-Plus中的saveBatch()方法性能非常的差,本篇文章主要分享一下saveBatch()的原理以及使用的注意事项

原理

我们通过源码的形式进行解析saveBatch()方法的原理

    @Transactional(rollbackFor = Exception.class)
    default boolean saveBatch(Collection<T> entityList) {
        //DEFAULT_BATCH_SIZE 默认是1000
        return saveBatch(entityList, DEFAULT_BATCH_SIZE);
    }
    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean saveBatch(Collection<T> entityList, int batchSize) {
        String sqlStatement = getSqlStatement(SqlMethod.INSERT_ONE);
        //分批执行SQL
        return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert(sqlStatement, entity));
    }

我们看下saveBatch是怎么批量执行的

    public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {
        Assert.isFalse(batchSize < 1, "batchSize must not be less than one");
        return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, sqlSession -> {
            int size = list.size();
            int i = 1;
            for (E element : list) {
                //数据最终保存在StatementImpl.batchArgs中,用于批量保存
                consumer.accept(sqlSession, element);
                if ((i % batchSize == 0) || i == size) {
                    //批量保存StatementImpl.batchArgs中数据
                    sqlSession.flushStatements();
                }
                i++;
            }
        });
    }

通过flushStatements()方法我们可以看到最终调用的是StatementImpl中的executeBatchInternal()方法。注意:代码过长,下面方法做了删减。

protected long[] executeBatchInternal() throws SQLException {
        JdbcConnection locallyScopedConn = checkClosed();

        synchronized (locallyScopedConn.getConnectionMutex()) {
            try {
                resetCancelledState();

                statementBegins();

                try {
                    this.retrieveGeneratedKeys = true; // The JDBC spec doesn't forbid this, but doesn't provide for it either...we do..

                    long[] updateCounts = null;

                    if (batchedArgs != null) {
                        int nbrCommands = batchedArgs.size();

                        this.batchedGeneratedKeys = new ArrayList<>(batchedArgs.size());

                        //批量执行 SQL 的开关
                        boolean multiQueriesEnabled = locallyScopedConn.getPropertySet().getBooleanProperty(PropertyKey.allowMultiQueries).getValue();
                        //rewriteBatchedStatements参数(以将多条SQL语句合并为一条,提高数据库操作的效率)并且数量大于4
                        if (multiQueriesEnabled || this.rewriteBatchedStatements.getValue() && nbrCommands > 4) {
                            return executeBatchUsingMultiQueries(multiQueriesEnabled, nbrCommands, individualStatementTimeout);
                        }

            } finally {

                stopQueryTimer(timeoutTask, false, false);
                resetCancelledState();

                setTimeoutInMillis(individualStatementTimeout);

                clearBatch();
            }
        }
    }

我们再看下insert做了什么事情

  public int insert(String statement, Object parameter) {
    return update(statement, parameter);
  }
  
  public int update(String statement, Object parameter) {
    try {
      dirty = true;
      MappedStatement ms = configuration.getMappedStatement(statement);
      return executor.update(ms, wrapCollection(parameter));
    } catch (Exception e) {
      throw ExceptionFactory.wrapException("Error updating database.  Cause: " + e, e);
    } finally {
      ErrorContext.instance().reset();
    }
  }
  public int update(MappedStatement ms, Object parameter) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    clearLocalCache();
    return doUpdate(ms, parameter);
  }

重点方法在doUpdate(ms,parameter). 完成SQL的拼装

@Override
  public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
    final Configuration configuration = ms.getConfiguration();
    final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
    final BoundSql boundSql = handler.getBoundSql();
    final String sql = boundSql.getSql();
    final Statement stmt;
     // 数据的SQL语句必须完全一致,包括表名和列
    if (sql.equals(currentSql) && ms.equals(currentStatement)) {
      int last = statementList.size() - 1;
      stmt = statementList.get(last);
      applyTransactionTimeout(stmt);
      handler.parameterize(stmt);// fix Issues 322
      BatchResult batchResult = batchResultList.get(last);
      batchResult.addParameterObject(parameterObject);
    } else {
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection, transaction.getTimeout());
      handler.parameterize(stmt);    // fix Issues 322
      currentSql = sql;
      currentStatement = ms;
      statementList.add(stmt);
      batchResultList.add(new BatchResult(ms, sql, parameterObject));
    }
    handler.batch(stmt);
    return BATCH_UPDATE_RETURN_VALUE;
  }

以上就是saveBatch的原理。

总结

1: 想要批量执行操作 数据库链接参数加上rewriteBatchedStatements=true

rewriteBatchedStatements参数需要保证5.1.13以上版本的驱动才能实现高性能的批量插入

2: 根据doUpdate(ms,parameter). 完成SQL的拼装的原理可以得出,如果批量插入的数据,有些数据字段值为null,不会批量查询,而是单独拼装一个SQL执行。

例如:

public class Student {
    
    private String name;
    
    private String address;
}

100个Student,其中 20个name=null,其中 50个address==null。通过日志我们看下这种不会批量插入。

文章来源:https://blog.csdn.net/weixin_50087960/article/details/135480108
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。