数据湖存储解决方案之Iceberg

2024-01-09 17:32:12

1.Iceberg是什么?

Apache Iceberg 是由 Netflix 开发开源的,其于2018年11月16日进入 Apache 孵化器,是 Netflix 公司数据仓库基础。Apache Iceberg设计初衷是为了解决Hive离线数仓计算慢的问题,经过多年迭代已经发展成为构建数据湖服务的表格式标准。

Iceberg 本质上是一种专为海量分析设计的表格式标准,可为主流计算引擎如 Presto、Spark 等提供高性能的读写和元数据管理能力。Iceberg 不关注底层存储(如 HDFS)与表结构(业务定义),它为两者之间提供了一个抽象层,将数据与元数据组织了起来。

Apache Iceberg官方网站是:Apache Iceberg

官方对Iceberg的定义如下:

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, Hive and Impala using a high-performance table format that works just like a SQL table.

上述翻译过来就是:

Apache Iceberg是一种用于大型分析数据集的开放表格式。Iceberg使用一种高性能的表格式将表添加到计算引擎中,这些引擎包括Spark、Trino、PrestgreSQL、Flink、Hive和Impala,该格式的工作方式类似于SQL表。

Apache Iceberg作为一款新兴的数据湖解决方案在实现上高度抽象,在存储上能够对接当前主流的HDFS,S3文件系统并且支持多种文件存储格式,例如Parquet、ORC、AVRO。相较于Hudi、Delta与Spark的强耦合,Iceberg可以与多种计算引擎对接,目前社区已经支持Spark读写Iceberg、Impala/Hive查询Iceberg。

从Iceberg的定义中不难看出,它的定位是在计算引擎之下,又在存储之上。同时,它也是一种数据存储格式,Iceberg则称其为"table format"。因此,这类技术可以看作介于计算引擎和数据存储格式中间的数据组织格式,通过特定的方式将数据和元数据组织起来,所以称之为数据组织格式更为合理。

Iceberg用户体验

Iceberg避免了令人不愉快的惊喜。模式演化工作正常,并且不会意外地取消删除数据。用户无需了解分区即可获得快速查询。

  • 模式演化(Schema Evolution):支持添加、删除、更新或重命名操作,并且没有副作用。
  • 隐藏的分区(Hidden Partitioning):可以防止用户错误导致静默不正确的结果或极慢的查询。
  • 分区布局演化(Partition Layout Evolution):可以根据数据量或查询模式的变化来更新表格的布局。
  • 时间旅行(Time Travel)?:可以实现使用完全相同的表格快照进行可重现的查询,或者让用户轻松查看更改。
  • 版本回滚(Version Rollback):允许用户通过将表格重置为良好状态来快速纠正问题。

Iceberg 主要特性

  • ACID:具备 ACID 能力,支持 row level update/delete;支持 serializable isolation 与 multiple concurrent writers
  • Table Evolution:支持 inplace table evolution(schema & partition),可像 SQL 一样操作 table schema;支持 hidden partitioning,用户无需显示指定
  • 接口通用化:为上层数据处理引擎提供丰富的表操作接口;屏蔽底层数据存储格式差异,提供对 Parquet、ORC 和 Avro 格式支持

依赖以上特性,Iceberg 可帮助用户低成本的实现 T+0 级数据湖。

可靠性和性能

Iceberg专为大型表格而构建。在生产环境中,Iceberg被用于单个表格可能包含数十PB的数据,甚至这些庞大的表格也可以在没有分布式SQL引擎的情况下进行读取。

  • 扫描计划很快:读取表格或查找文件不需要分布式SQL引擎。
  • 高级过滤:使用分区和列级统计信息以及表格元数据来修剪数据文件。
  • 最终一致性:Iceberg旨在解决最终一致性云对象存储中的正确性问题。
  • 适用于任何云存储:通过避免列表和重命名操作减少HDFS中的名称节点拥塞。
  • 可串行化隔离:表格更改是原子的,读取者永远不会看到部分或未提交的更改。
  • 多个并发写入器:使用乐观并发性,并将重试以确保兼容更新成功,即使写入冲突。

开放标准

Iceberg被设计和开发为一个开放的社区标准,具有规范以确保跨语言和实现的兼容性。

Apache Iceberg是开源的,并在Apache Software Foundation进行开发。

核心能力

(1) 灵活的文件组织

  • 提供了基于流式的增量计算模型和基于批处理的全量表计算模型,批任务和流任务可以使用相同的存储模型(HDFS、OZONE),数据不再孤立,以构建低成本的轻量级数据湖存储服务;
  • Iceberg支持隐藏分区(Hidden Partitioning)和分区布局变更(Partition Evolution),方便业务进行数据分区策略更新;
  • 支持Parquet、ORC、Avro等存储格式。

(2) 丰富的计算引擎

  • 优秀的内核抽象使之不绑定特定引擎,目前在支持的有Spark、Flink、Presto、Hive;
  • Iceberg提供了Java native API,不用特定引擎也可以访问Iceberg表。

(3) 优化数据入湖流程

  • Iceberg提供ACID事务能力,上游数据写入即可见,不影响当前数据处理任务,这大大简化了ETL;
  • Iceberg提供upsert/merge into行级别数据变更,可以极大地缩小数据入库延迟。

(4) 增量读取数据处理能力

  • Iceberg支持通过流式方式读取增量数据,实现主流开源计算引擎入湖和分析场景的完善对接;
  • Spark struct streaming支持;
  • Flink table source支持;
  • 支持历史版本回溯。

2.Iceberg实现原理

Iceberg实现核心思想

Iceberg的核心思想,就是在时间轴上跟踪表的所有变化:

  • 快照表示表数据文件的一个完整集合。
  • 每次更新操作会生成一个新的快照。

Iceberg元数据管理:

Iceberg将数据进行分层管理,主要分为:元数据管理层数据存储层

元数据管理层又可以细分为三层:

  • Metadata File:存储当前版本的元数据信息(所有snapshot信息)
  • Snapshot:表示当前操作的一个快照,每次commit都会生成一个快照,一个快照中包含多个Manifest。
  • Manifest:记录了当前操作生成数据所对应的文件地址,也就是data files的地址。

基于snapshot的管理方式,Iceberg能够进行time travel(历史版本读取以及增量读取),并且提供了serializable isolation。
数据存储层支持不同的文件格式,目前支持Parquet、ORC、AVRO。

3.数据文件结构

总体来讲Iceberg分为两部分数据,第一部分是数据文件,如下图中的 parquet 文件。第二部分是表元数据文件(Metadata 文件),包含 Snapshot 文件(snap-*.avro)、Manifest 文件(*.avro)、TableMetadata 文件(*.json)等。

元数据文件

其中metadata目录存放元数据管理层的数据,表的元数据是不可修改的,并且始终向前迭代;当前的快照可以回退。

  • Table Metadata:version[number].metadata.json:存储每个版本的数据更改项。
  • 快照(SnapShot):snap-[snapshotID]-[attemptID]-[commitUUID].avro,存储快照snapshot文件。
  • 清单文件(Manifest File):[commitUUID]-[attemptID]-[manifestCount].avro,manifest文件

快照:

快照代表一张Iceberg表在某一时刻的状态。也被称为清单列表(Manifest List),里面存储的是清单文件列表,每个清单文件占用一行数据。清单列表文件以snap开头,以avro后缀结尾,每次更新都产生一个清单列表文件。每行中存储了清单文件的路径。

清单文件里面存储数据文件的分区范围、增加了几个数据文件、删除了几个数据文件等信息。数据文件(Data Files)存储在不同的Manifest Files里面,Manifest Files存储在一个Manifest List文件里面,而一个Manifest List文件代表一个快照。

清单文件:

是以avro格式进行存储的,以avro后缀结尾,每次更新操作都会产生多个清单文件。其里面列出了组成某个快照(snapshot)的数据文件列表。每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)、文件的大小以及文件里面数据的行数等信息。其中列级别的统计信息在 Scan 的时候可以为算子下推提供数据,以便可以过滤掉不必要的文件。

数据文件

data目录组织形式类似于hive,都是以分区进行目录组织。

Iceberg的数据文件通常存放在data目录下。一共有三种存储格式(Avro、Orc和Parquet),主要是看选择哪种存储格式,后缀分别对应avro、orc或者parquet。在一个目录,通常会产生多个数据文件。

4.Iceberg实现细节

4.1快照设计方式

快照隔离

  • 读操作仅适用当前已生成快照;
  • 写操作会生成新的隔离快照,并在写完成后原子性提交。

增量读取数据

Iceberg的每个snapshot都包含前一个snapshot的所有数据,每次都相当于全量读取数据,对于整个链路来说,读取数据的代价是非常高的。

如果我们只想读取当前时刻的增量数据,就可以根据Iceberg中Snapshot的回溯机制来实现,仅读取Snapshot1到Snapshot2的增量数据,也就是下图中的紫色数据部分。

同理,S3也可以只读取红色部分的增量数据,也可以读取S1-S3的增量数据。

Iceberg支持读写分离,也就是说可以支持并发读和增量读。

原子性操作

对于文件列表的所有修改都是原子操作。

  • 在分区中追加数据;
  • 合并或是重写分区。

注意:Iceberg是以文件为粒度提交事务的,所以就没有办法做到以秒为单位提交事务,否则会造成文件数据量膨胀。
比如Flink是以CheckPoint为写入单位,物理数据在写入Iceberg之后并不能被直接查询,只有当触发了CheckPoint时才会写metadata,这时数据才会由不可见变成可见。而每次CheckPoint执行也需要一定的时间。

4.2 事务性提交

写操作要求

  • 记录当前元数据的版本--base version;
  • 创建新的元数据以及manifest文件;
  • 原子性的将base version 替换为新的版本。

原子性替换保证了线性的历史;

原子性替换需要依靠以下操作来保证。

冲突解决--乐观锁

  • 假定当前没有其他的写操作;
  • 遇到冲突则基于当前最新的元数据进行重试;
  • 元数据管理器所提供的能力;
  • HDFS或是本地文件系统所提供的原子化的rename能力。

5.Iceberg结合Flink场景

5.1 构建近实时Data Pipeline

Flink+Iceberg最经典的一个场景就是构建实时的Data Pipeline。业务端产生的大量日志数据,被导入到Kafka这样的消息队列。运用Flink流计算引擎执行ETL后,导入到Apache Iceberg原始表中。有一些业务场景需要直接跑分析作业来分析原始表的数据,而另外一些业务需要对数据做进一步的提纯。那么我们可以再新起一个Flink作业从Apache Iceberg表中消费增量数据,经过处理之后写入到提纯之后的Iceberg表中。此时,可能还有业务需要对数据做进一步的聚合,那么我们继续在Iceberg表上启动增量Flink作业,将聚合之后的数据结果写入到聚合表中。

该场景也可通过Flink+Hive方式实现,但Flink长期高频率地写入会造成Partition膨胀。而Iceberg容许实现1分钟甚至30秒的增量写入,这样就可以大大提高了端到端数据的实时性,上层的分析作业可以看到更新的数据,下游的增量作业可以读取到更新的数据。

5.2?CDC数据实时摄入摄出

可以用Flink+Iceberg来分析来自MySQL等关系型数据库的binlog等。一方面,Apache Flink已经原生地支持CDC数据解析,一条binlog数据通过ververica flink-cdc-connector拉取之后,自动转换成Flink Runtime能识别的INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER四种消息,供用户做进一步的实时计算。

此外,CDC数据成功入湖Iceberg之后,还会打通常见的计算引擎,例如Presto、Spark、Hive等,他们都可以实时地读取到Iceberg表中的最新数据。

针对CDC场景而言,Hudi在这方面的能力比Iceberg更成熟,所以厂商更多选用Hudi。

5.3?从Iceberg历史数据启动Flink任务

采用Iceberg全量数据和Kafka的增量数据来驱动新的Flink作业。如果需要过去很长时间例如一年的数据,可以采用常见的Lambda架构,离线链路通过kafka->flink->iceberg同步写入到数据湖,由于Kafka成本较高,保留最近7天数据即可,Iceberg存储成本较低,可以存储全量的历史数据,启动新Flink作业的时候,只需要去拉Iceberg的数据,跑完之后平滑地对接到kafka数据即可。

5.4?通过Iceberg数据来修正实时聚合结果

在Lambda架构下,实时链路由于事件丢失或者到达顺序的问题,可能导致流计算端结果不一定完全准确,这时候一般都需要全量的历史数据来订正实时计算的结果。而我们的Iceberg可以很好地充当这个角色,因为它可以高性价比地管理好历史数据。

5.5?近实时场景的流批统一

原有的lambda架构,分为离线链路和实时链路。可通过iceberg进行存储中间数据,Flink统一进行计算,实现近实时场景的流批一体。

6.Iceberg的优缺点

无论是用于加速数据的可见性、构建CDC还是用Iceberg替代Hive表的低效查询,都会带来一定的性能提升。

使用Iceberg构建湖仓的优点主要包括:

  • 高效的数据存储和处理:Iceberg支持实时/批量数据写入和读取,支持多种计算引擎,包括Spark、Flink等。它还支持事务ACID,可以高效地处理大规模数据。
  • 灵活的数据组织:Iceberg支持隐藏分区和分区变更,方便业务进行数据分区策略。同时,它还支持快照数据重复查询,具备版本回滚功能。
  • 高效的查询性能:Iceberg通过表元数据来查询进行高效过滤,扫描计划非常快,可以快速返回查询结果。它还支持星型模型的数据分布组织,支持按照维度表字段对事实表数据进行排序组织和索引,进一步提高了查询效率。
  • 兼容性和集成性:Iceberg与传统的SQL on Hadoop技术栈兼容,可以作为Hive表存储格式,无需额外的学习和认知门槛。同时,它也可以与已有的大数据平台工具和服务进行集成,降低了集成成本。
  • 并发写入和数据一致性:Iceberg使用基于乐观锁的并发控制机制,提供了多线程并发写入能力并保证数据线性一致。

然而,使用Iceberg构建湖仓也存在一些缺点:

  • 版本控制:虽然Iceberg提供了快照机制,但并未提供类似HDFS的版本控制机制,对于需要频繁修改的数据集可能不太适用。
  • 数据一致性问题:在分布式环境下,由于种种原因可能导致数据一致性问题。虽然Iceberg提供了快照机制,但仍然需要谨慎处理数据一致性问题。
  • 生态成熟度不足:相比于一些成熟的开源项目,Iceberg的生态成熟度相对较低,还需要进一步完善和改进。
  • 社区规模较小:Iceberg的社区规模相对较小,这可能会影响到项目的长期发展和维护。
  • 依赖项较多:Iceberg依赖于较多的其他开源项目和组件,这可能会增加部署和维护的难度。
  • 适用场景有限:虽然Iceberg适用于大规模的分析场景,但对于需要实时处理和分析的场景可能不太适用。同时,它还需要在数据隐私和安全方面进一步加强。
  • 文档和社区支持:虽然Iceberg的文档相对较为完善,但社区支持和活跃度还有待提高。对于新手用户来说,可能会遇到一些困难和问题需要寻求帮助。

数据治理方面iceberg也存在一些缺点:

  • 海量小文件:Iceberg每次commit时都会生成大量文件,要求的commit时间越短,它的小文件就会越多,几天过去,这张表的小文件数可能达到几百万,甚至上千万,这个时候再去查询,Query Plan就会跑不动,变得非常慢。
  • Query Plan时延:Iceberg保存了多副本,每一次commit都会产生一个元数据的快照,快照里面包含了很多信息,元数据的数量将越来越大。如果未做一些元数据的清理或者合并,那么只是生成执行计划就需要大量耗时。

当单链路的数据量达到分钟级,每日达到万亿规模时,湖仓一体的性能问题就需要格外重视。

  • 平衡读写性能:写和读的对于性能的要求不同,如何能够平衡写和读是非常重要的一个问题。
  • 发挥极速性能:Iceberg和Hudi很多高阶的特性,比如索引之类,如何应用其高阶特性也是一个重要问题。

湖仓一体实时性限制

抛开内核,无论是Iceberg还是Hudi,本质上都是海量文件的组织方式,无法摆脱存储的限制,我们通常会把它存到内部的HDFS上,云上则会存到对象存储中。但对象存储也有它的限制,吞吐量较大,但延迟会较高。

如果需要流读,我们通常在构建实时链路的时候,会选择消息队列,它的存储模型完全不同,是低延迟高响应,顺序读写。它的存储能力决定了计算,流式计算的访问方式和离线计算的访问方式不同。

这个时候就会出现两个问题:

  • 如何平衡流式的访问和批的访问?既能做到高性能和高效,又能做到低成本?
  • 传统的Iceberg和Hudi,实现分钟级已经接近极限,如果继续加速该如何优化?

7.各厂商基于iceberg的湖仓架构

由于Iceberg的特性,大多数的厂商都是基于iceberg构建湖仓。针对Iceberg的缺点,各厂商也对内核进行了优化,保证湖仓的稳定运行。

7.1 腾讯

腾讯构建的近实时流批一体架构如下图所示。腾讯针对Iceberg的不足对内核做了大量的优化。功能上增加了大宽表支持、跨源查询支持、流转批、流式写入支持去重、增量读取、流量控制等。性能上完成元数据读取加速、复杂类型列剪支优化、V2表layout改进与合并加速、向量化,Async-IO,CBO等查询加速。通过小文件合并、自动重分布优化、自动优化实现自动数据治理。

参考Paimon方案,腾讯做了类似的方案。在这个方案中,流和批选择了不同的存储,流选择使用消息队列,批则是底层使用数据湖的格式,封装在一起就成为了流批表。有了流批表,则能够对外提供统一的流和批的读写接口。

针对Flink场景,写的时候会双写到LogStore和Filestore这两个系统中,根据不同的场景读不同的系统。如果是流式则读LogStore,批则读Filestore。

7.2 网易数帆

网易主要研发了 Arctic 产品,它是基于 Iceberg 去构建湖仓一体的系统。其定位是在 hive 和 Iceberg 之上,在计算引擎之下的一个 TableService, 并提供表结构优化以及 Kafka 以及 redis, Hbase 等 KV 存储封装的实时湖仓系统。

Arctic 的优势首先是基于 iceberg,兼容 iceberg 所有功能,同时对 hive 兼容性好,在短时间业务升级阻力更低;支持动态调度自动触发合并任务,提供分钟级别延迟数仓的 merge on read;在开启 hidden on queue 的情况下,提供流批一体的功能包括秒级实时订阅和实时 join;还提供方便管理的运维平台,方便业务更快的上手。

网易选择使用 Apache Iceberg,主要考虑是因为 Iceberg 本身的元数据管理是面向文件的,有非常全的 manifest 机制,可以把表中的所有文件管理起来,Iceberg 作为底座提供了 ACID 的事务保证以及 MVCC 功能,可以保证数据的一致性,同时又具有可扩展性。在 Iceberg 的基础上,团队又自研了实时摄取、文件索引、数据合并,以及一整套元数据管理服务。

底层技术选型方面,Hudi 的文件索引采用的是 Bloomfilter 以及 HBase 的机制,这两种机制都不是特别理想,HBase 需要引入第三方 KV 数据库,对商业输出不利,而 Bloomfilter 比较重,会让实时性大打折扣,因此都不太适合网易数帆的技术选型。网易数帆对 Arctic 核心功能的想法和设计,也跟 Hudi 有出入。而没选 Delta Lake 则是因为它对实时性并不是看得很重。网易数据团队通过研究相关论文发现,Delta Lake 更多还是把 Spark 的生态作为第一优先级,这与团队做湖仓一体的目标还是有一些区别。相比之下,Iceberg 相对更开放,对计算引擎的集成、对上层元数据的集成、对不同系统的集成都做得比较好,可以满足团队高度定制化的需求。

网易集团内部主要进行 Lambda 架构的改造,而针对企业客户,主要的实践是 Kappa 架构。

7.3 网易严选

严选根据当时社区发展情况和严选当时的需求场景最终选择了Iceberg,主要考虑因素如下:

  • Hudi在严选方案调研期间和spark是强绑定,同期与同样依赖spark的Deltalake相比功能并不是很完善(hudi现在已经不强依赖spark)。
  • DeltaLake功能完善,merge功能也非常简单易用,非常适合严选的binlog同步场景。但是它需要用spark streaming来做数据同步,而在严选流式计算主要是flink计算引擎,两者在长期发展路线上并不匹配。
  • Iceberg定位是一种表格式,其在设计上做了很好的抽象,没有强绑定计算引擎和存储组件,并且当时社区版本也支持upsert等功能。

严选通过缓存统计信息过滤优化、小文件合并、重写deleteFile、重排序等一系列优化,大大提高查询效率。

未来,严选在查询体验上,计划让presto也接入iceberg的支持,引入Alluxio缓存来加速元数据的加载和缓存数据,加入Z-order数据重排序和Bloom-Filter文件索引等功能提升查询效率。另外把文件监控、健康检查等功能产品化以提升易用性。

7.4 Bilibili

B站同样选择Iceberg,因为Iceberg在三个里面是表存储格式抽象的最好的,包括读写引擎、Table Schema、文件存储格式都是pluggable的,可以进行比较灵活的扩展,并保证和开源以及之前版本的兼容性。

下图是B站整体的湖仓一体架构,支持开放的Spark、Flink等引擎从Kafka、HDFS接入数据,然后Magnus服务会异步地拉起Spark任务对Iceberg数据进行重新的存储组织优化,主要是用Trino作为查询引擎,并引入Alluxio做Iceberg的元数据和索引数据的缓存加速。

Magnus是B站湖仓一体架构的核心组件,它负责管理优化所有的Iceberg表中的数据。Iceberg本身是一个表存储格式,虽然其项目本身提供了基于Spark、Flink等用于合并小文件,合并metadata文件或者清理过期Snapshot数据等Action Job,但是要依赖外部服务调度这些Action Job,而Magnus正是承担这个角色。B站对Iceberg进行了扩展,当Iceberg表发生更新的时候,会发送一个event信息到Magnus服务中,Magnus服务维护一个队列用于保存这些commit event信息,同时Magnus内部的Scheduler调度器会持续消费event队列,并根据对应Iceberg表的元数据信息及相关的策略决定是否及如何拉起Spark任务优化Iceberg表的数据组织。

基于Iceberg的湖仓一体方案在B站的数据分析场景正逐渐落地,目前已经支撑PB级的数据量,每天响应几万个查询,其中P90的查询可以在1s内响应,满足了多个运营分析数据服务交互式分析的需求。

7.5 汽车之家

汽车之家首先打通Append入湖链路、Flink SQL 入湖链路,然后内部对接预算体系、权限体系等。在对接过程中通过方法重写优化解决iceberg小文件等问题,从而实现湖仓的构建,保证湖仓的性能。

文章来源:https://blog.csdn.net/marui156/article/details/135455585
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。