【flink】基于flink全量同步postgres表到doris

2023-12-13 10:28:19

在不借助第三方组件的进行数据同步时,doris支持采用外部表进行insert select的方式进行导入,但是不适用于数据量大的表,除非自己手动做分片进行多次导入 。

flink提供了doris connector进行数据写入,实际是stream load方式(每个checkpoint都会开启一个streamload,期间sink算子接收的数据会通过http chunked机制流式往BE写),这个方式受streaming_load_max_mb参数限制,即一次checkpoint周期内数据的大小上限,因此我们期望的是流式读取postgres源表,同时周期性地触发checkpoint,每次checkpoint都将大表的部分数据归到一个streamload往doris提交,这样就能持续稳定地同步大表。

总的来说,在source算子中读取大表时需要支持两点:

  1. 流读数据,不能把数据一次性全部加载出来
  2. 在流读数据期间,可以触发并完成checkpoint

flink cdc connector

如果使用的是flink-cdc 2.4+ 版本,那么这是最好的方式。postgres cdc在这个版本才接入了增量快照框架,除了满足上述条件,还支持并行读取,断点续传等功能。

如果cdc版本小于2.4,是直接基于debezium实现 ,在全量同步阶段一直持有checkpoint lock,导致无法进行checkpoint.

  • 解决方法一

对于千万级这种不是太大的表,可以预估时间增加checkpoint间隔,或者调整execution.checkpointing.tolerable-failed-checkpoints,确保在数据全部读完后才触发第一次成功的checkpoint,同时由于全量数据都在同一个checkpoint提交到doris了,所以还需要适当调整streaming_load_max_mb。不公限制大,还很麻烦。

  • 解决方法二

全量阶段不再持有checkopointLock,支持checkpoint,并忽略在全量阶段触发的checkpoint

com.ververica.cdc.debezium.internal.DebeziumChangeFetcher#runFetchLoop:

if (isInDbSnapshotPhase) {
    List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
    LOG.info("Database snapshot phase can perform checkpoint, not acquired Checkpoint lock.");
    handleBatch(events);
    while (isRunning && isInDbSnapshotPhase) {
        handleBatch(handover.pollNext());
    }
    LOG.info("Received record from streaming binlog phase, none checkpoint lock to be released.");
  
}

com.ververica.cdc.debezium.DebeziumSourceFunction#snapshotState:

if (this.debeziumChangeFetcher == null
        || this.debeziumChangeFetcher.isInDbSnapshotPhase()) {
    LOG.info(
            "Is snapshot phase, skip checkpoint {}.",
            functionSnapshotContext.getCheckpointId());
    return;
}

相比于2.4版本,只能是单并行度读,且在job异常时可能需要整表重新导入,如果只想执行全量阶段,可以配置 debezium.snapshot.mode=initial_only,不方便升级的话这种方式也凑合,数据慢慢同步。

flink jdbc connector

基于jdbc连接器生成的是有界的流,source算子在数据读取过程中是支持checkpoint的,因为在InputFormatSourceFunction#run中是不会获取checkpoint锁。在使用该连接器读postgresql表时,需要注意两个配置:

  1. scan.auto-commit设置为false
  2. scan.fetch-size设置合理大小

这样才能实现流读的效果,不至于一次性把数据加载到ResultSet中。同时jdbc连接器还支持表按指定的列进行分区,以支持并行拉取数据,要注意的是,当将表分区并行读,如果source算子某些task分配不到分片或者数据倾斜而造成只有部分task处理RUNNING状态,那将一直触发不了checkpoint.

Failed to trigger checkpoint for job d8beca3ec399435cf54b09d029a75543 since Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running…

所以这种方式同步数据到doris,不能多并行度读。

自定义SourceFunction

  1. 继承SourceFunction,不能同步checkpoint锁
  2. 设置非自动提交及fetch size实现流读
  3. 禁止事务空闲超时idle_in_transaction_session_timeout=0
  4. 转换postgresql中timestamp类型列格式
  5. 输出json字符串格式给到doris sink.
    @Override
    public void run(SourceContext ctx) throws Exception {
        
        Class.forName("org.postgresql.Driver");
        Connection con = DriverManager.getConnection(url,usr,pwd);
        con.setAutoCommit(false);  // 结合fetch size实现流读pg
        Statement stmt = con.createStatement();
        stmt.setQueryTimeout(Integer.MAX_VALUE);
        stmt.execute("SET idle_in_transaction_session_timeout = 0");
        stmt.setFetchSize(10000);
        try {
            ResultSet rs = stmt.executeQuery(sql);
            ResultSetMetaData md = rs.getMetaData();
            int columnCount = md.getColumnCount(); // 获得列数
            HashMap<String, String> colTypes = new HashMap<>();
            for (int i = 1; i <= columnCount; i++) {
                colTypes.put(md.getColumnName(i), md.getColumnTypeName(i));
            }
            while (rs.next()) {
                Map<String, Object> rowData = new HashMap<>();
                for (String name : colTypes.keySet()) {
                    Object data = rs.getObject(name);
                    if (colTypes.get(name).equals("timestamp") && data != null) {
                        data = TemporalConversions.toLocalDateTime(data, ZoneId.of("Asia/Shanghai")).format(RFC3339_TIMESTAMP_FORMAT);
                    }
                    rowData.put(name, data);
                }
                ctx.collect(JSONUtil.toJsonStr(rowData));
            }
        } catch (Exception e) {
            logger.error("source异常,执行失败:{}", sql, e);
        }
    }

文章来源:https://blog.csdn.net/czmacd/article/details/134838515
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。