【flink】基于flink全量同步postgres表到doris
在不借助第三方组件的进行数据同步时,doris支持采用外部表进行insert select的方式进行导入,但是不适用于数据量大的表,除非自己手动做分片进行多次导入 。
flink提供了doris connector进行数据写入,实际是stream load方式(每个checkpoint都会开启一个streamload,期间sink算子接收的数据会通过http chunked机制流式往BE写),这个方式受streaming_load_max_mb
参数限制,即一次checkpoint周期内数据的大小上限,因此我们期望的是流式读取postgres源表,同时周期性地触发checkpoint,每次checkpoint都将大表的部分数据归到一个streamload往doris提交,这样就能持续稳定地同步大表。
总的来说,在source算子中读取大表时需要支持两点:
- 流读数据,不能把数据一次性全部加载出来
- 在流读数据期间,可以触发并完成checkpoint
flink cdc connector
如果使用的是flink-cdc 2.4+ 版本,那么这是最好的方式。postgres cdc在这个版本才接入了增量快照框架,除了满足上述条件,还支持并行读取,断点续传等功能。
如果cdc版本小于2.4,是直接基于debezium实现 ,在全量同步阶段一直持有checkpoint lock,导致无法进行checkpoint.
- 解决方法一
对于千万级这种不是太大的表,可以预估时间增加checkpoint间隔,或者调整execution.checkpointing.tolerable-failed-checkpoints
,确保在数据全部读完后才触发第一次成功的checkpoint,同时由于全量数据都在同一个checkpoint提交到doris了,所以还需要适当调整streaming_load_max_mb
。不公限制大,还很麻烦。
- 解决方法二
全量阶段不再持有checkopointLock,支持checkpoint,并忽略在全量阶段触发的checkpoint
com.ververica.cdc.debezium.internal.DebeziumChangeFetcher#runFetchLoop:
if (isInDbSnapshotPhase) {
List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
LOG.info("Database snapshot phase can perform checkpoint, not acquired Checkpoint lock.");
handleBatch(events);
while (isRunning && isInDbSnapshotPhase) {
handleBatch(handover.pollNext());
}
LOG.info("Received record from streaming binlog phase, none checkpoint lock to be released.");
}
com.ververica.cdc.debezium.DebeziumSourceFunction#snapshotState:
if (this.debeziumChangeFetcher == null
|| this.debeziumChangeFetcher.isInDbSnapshotPhase()) {
LOG.info(
"Is snapshot phase, skip checkpoint {}.",
functionSnapshotContext.getCheckpointId());
return;
}
相比于2.4版本,只能是单并行度读,且在job异常时可能需要整表重新导入,如果只想执行全量阶段,可以配置 debezium.snapshot.mode=initial_only,不方便升级的话这种方式也凑合,数据慢慢同步。
flink jdbc connector
基于jdbc连接器生成的是有界的流,source算子在数据读取过程中是支持checkpoint的,因为在InputFormatSourceFunction#run
中是不会获取checkpoint锁。在使用该连接器读postgresql表时,需要注意两个配置:
- scan.auto-commit设置为false
- scan.fetch-size设置合理大小
这样才能实现流读的效果,不至于一次性把数据加载到ResultSet中。同时jdbc连接器还支持表按指定的列进行分区,以支持并行拉取数据,要注意的是,当将表分区并行读,如果source算子某些task分配不到分片或者数据倾斜而造成只有部分task处理RUNNING状态,那将一直触发不了checkpoint.
Failed to trigger checkpoint for job d8beca3ec399435cf54b09d029a75543 since Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running…
所以这种方式同步数据到doris,不能多并行度读。
自定义SourceFunction
- 继承
SourceFunction
,不能同步checkpoint锁 - 设置非自动提交及fetch size实现流读
- 禁止事务空闲超时idle_in_transaction_session_timeout=0
- 转换postgresql中timestamp类型列格式
- 输出json字符串格式给到doris sink.
@Override
public void run(SourceContext ctx) throws Exception {
Class.forName("org.postgresql.Driver");
Connection con = DriverManager.getConnection(url,usr,pwd);
con.setAutoCommit(false); // 结合fetch size实现流读pg
Statement stmt = con.createStatement();
stmt.setQueryTimeout(Integer.MAX_VALUE);
stmt.execute("SET idle_in_transaction_session_timeout = 0");
stmt.setFetchSize(10000);
try {
ResultSet rs = stmt.executeQuery(sql);
ResultSetMetaData md = rs.getMetaData();
int columnCount = md.getColumnCount(); // 获得列数
HashMap<String, String> colTypes = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
colTypes.put(md.getColumnName(i), md.getColumnTypeName(i));
}
while (rs.next()) {
Map<String, Object> rowData = new HashMap<>();
for (String name : colTypes.keySet()) {
Object data = rs.getObject(name);
if (colTypes.get(name).equals("timestamp") && data != null) {
data = TemporalConversions.toLocalDateTime(data, ZoneId.of("Asia/Shanghai")).format(RFC3339_TIMESTAMP_FORMAT);
}
rowData.put(name, data);
}
ctx.collect(JSONUtil.toJsonStr(rowData));
}
} catch (Exception e) {
logger.error("source异常,执行失败:{}", sql, e);
}
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!