Impala4.x源码阅读笔记(二)——Impala如何高效读取Iceberg表

2023-12-13 16:55:08

前言

本文为笔者个人阅读Apache Impala源码时的笔记,仅代表我个人对代码的理解,个人水平有限,文章可能存在理解错误、遗漏或者过时之处。如果有任何错误或者有更好的见解,欢迎指正。

Iceberg表是一种用于存储大规模结构化数据的开源表格式,旨在提供高效的数据存储和丰富的查询能力。不同于Parquet,Orc等文件格式定义了数据如何在文件中存储和索引,Iceberg作为一种表格式定义的是数据文件如何组织,换句话说就是一系列的数据文件如何构成一张表以及我们如何从大量数据文件中找到我们需要的。Iceberg表支持事务性写入和多版本并发控制,这使得它适合于需要大规模数据存储和高效查询的数据湖或数据仓库环境。Iceberg表还提供了对数据架构变化的良好支持,使得数据架构的演进变得更加容易。

Iceberg表的基本结构包括数据文件和元数据文件两部分,这些文件被组织在目录结构中,其中数据文件是实际存储数据的地方,支持Parquet、Orc等业界主流数据文件格式。元数据文件除了记录表的结构信息和演进记录外,还有一种Manifest文件用于索引数据文件,支持Iceberg使用快照(Snapshot)的概念来维护表的版本信息,可以快速定位某个版本的表包括了哪些数据文件,这使得我们可以方便地在Iceberg表上进行时间旅行查询和回滚操作。另外在Iceberg V2格式的表中,还在之前V1格式表的基础之上增加了数据的行级更新与删除能力,通过写入专门的Delete File和MOR(Merge On Read)技术,可以在不重写现有数据文件的前提下实现行级别的删除。

在impala步入4.x的大版本后,对Iceberg表的支持一直是社区关注的重点。在社区的重点投入下,截止Apache Impala 4.3.0版本,Impala对Iceberg表的支持度已经相当高了,除了建表删表修改字段等常规表支持的操作外,对Iceberg特有的时间旅行、版本回滚、清理快照也进行了支持,另外还提供了从Hive表迁移到Iceberg的功能。社区目前正在大力推进对于Iceberg V2表的支持,目前已经完整支持了对Iceberg表的行级Delete和MOR读取。Impala在实现MOR时没有使用Iceberg API提供的读取方法,而是使用了自身由C++实现的执行引擎进行读取。得益于Impala本身对于HDFS+Parquet表的长期优化,使得Impala在Iceberg的MOR读取性能方面也具有优势。

本文主要根据源码就Impala如何高效读取地Iceberg表进行分析,这里的读取包括了同时包括了对Iceberg表的时间旅行和MOR的支持。分析的过程中着重于扫描Iceberg表的执行计划如何制定,以及期间做了哪些优化。

准备工作

为了分析Impala如何读取Iceberg表,我们可以从一个简单但是功能覆盖充分的例子入手。首先我们使用Impala创建一张Iceberg V2表,并进行一些数据写入和删除操作:

-- 创建一张V2格式的Iceberg表
CREATE TABLE ice_v2 (id int, name string) STORED BY ICEBERG TBLPROPERTIES('format-version'='2');
-- 插入最初的两条数据
INSERT INTO ice_v2 VALUES (1, 'a'), (2, 'b');
-- 删除id为2的一条数据
DELETE FROM ice_v2 WHERE id = 2;
-- 再插入一条数据
INSERT INTO ice_v2 VALUES (3, 'c');

现在我们就准备好了一张包含Delete数据的Iceberg表了,因为我们总共进行了三次DML操作,现在这张Iceberg表就有3个快照了。在Impala中可以通过执行DESCRIBE HISTORY语句查看Iceberg表历史版本:

在这里插入图片描述

也可以看见数据文件中有两个Insert文件和一个Delete文件,都使用Parquet格式储存:

在这里插入图片描述

为了后续一些概念的理解,我们再看下这三个文件中的数据:

在这里插入图片描述

可以发现Insert文件中的数据和我们执行的Insert语句是对应的,而Delete文件中的数据这是另外的Schema,记录的是删除的行所在的文件和位置。在有了这些信息之后,我们就可以开始分析Impala是如何读取一张Iceberg表的了。

执行计划

首先我们先看一下扫描Iceberg表的执行计划长什么样,执行如下SQL:

-- Result: (3, 'c'), (1, 'a')
SELECT * FROM ice_v2 FOR SYSTEM_VERSION AS OF 5109113003992490801

其中FOR SYSTEM_VERSION AS OF子句就是对Iceberg表进行时间旅行查询使用的,5109113003992490801DESCRIBE HISTORY中显示的Iceberg表最新的快照ID。这样我们可以指定一个Iceberg快照版本进行查询,当然不指定时Impala会默认查询最新的快照,因此实际上这个查询子句加与不加结果是一样的,只是为了后面触发时间旅行查询相关的逻辑。我们也可以用FOR SYSTEM_TIME AS OF子句来指定一个时间点来查询这个时间点时这张Iceberg表的数据,就像进行了时间旅行穿越回过去直接查询这张表一样,也就是所谓的时间旅行查询了。虽然这只是一个简单的SELECT *查询,但是它在Impala中的执行计划却不简单:

在这里插入图片描述

从SQL角度来说,一条SQL在Impala中的处理过程逻辑上大体可以分为四个阶段,解析、分析、计划与执行。其中解析就是解析SQL,通过SQL Parser将SQL字符串转换为语句类StatementBase的子类对象,比如说上文中的查询会被转换为SelectStmt,其中又包含SelectListFromClause等子句部分,FromClause又包括了一个TableRef的列表表示FROM子句后面的各个表或者类似于表的对象,在TableRef中又包括一个TimeTravelSpec对象对应了我们的时间旅行子句。这些语句对象都实现了自己的analyze方法,会在后面的分析过程中调用。比如说SelectStmtanalyze会调用FromClauseanalyze,同时自己还会进行星号展开、注册Slot等工作。FromClauseanalyze则会分析路径、建立别名并调用各个TableRefanalyze方法。在TableRefanalyze方法中除了分析Join、Hints之外也会调用TimeTravelSpecanalyze方法。TimeTravelSpecanalyze方法则会计算AS OF后的表达式得到目标时间旅行的版本或者时间,为后续索引数据文件做准备。

经过了复杂的分析之后就可以进入计划阶段了,计划阶段的任务就是根据语句类对象和先前的分析结果给查询制定一个执行计划。执行计划是一个由各种PlanNode的子类对象构成的一个树状结构PlanTree,它会指导查询如何进行执行,具体包括执行需要哪些结点参与,结点间的数据流向等等。执行计划会被转为Thrift结构体传递给Impala的C++执行引擎进行执行,执行时PlanNode会转变为对应ExecNode的子类对象,每个结点都负责各自对应的工作,比如说ScanNode负责一张表的扫描任务,是树中的叶子结点。而JoinNodeUnionNode分别负责连接任务与合并任务,都有多个输入和一个输出。数据在结点间以行批的形式传递,最终汇聚到根结点并返回给客户端完成查询执行。

计划阶段又可以大体分为两部分,首先是制定单点执行计划,给出一颗完整的执行计划树。但是单点执行计划还不足以指导查询执行,Impala作为一个MPP架构的执行引擎可以在分布式集群的多个执行者上并发执行查询,因此还需要将单点执行计划转变为分布式执行计划。分布式执行计划实际上就是将单点执行计划切分为多个片段Fragment,并在其间插入一些交换节点ExchangeNode用于在Fragment之间传递数据。在上面的执行计划树图中我们可以看到结点间有虚线和实线两种连接方式,实线连接的结点就是同属于一个Fragment的,而虚线连接的是不同的FragmentFragment是查询执行阶段可以调度的最小执行单元,调度器可以根据Fragment的性质将其调度到一个或多个执行者上创建执行实例Instance并开始实际执行。比如说对于只有一个文件的扫描结点的Fragment,我们只需要将其调度到一个执行者上执行就够了,而那些扫描文件很多或者需要高并发执行的Fragment,我们则需要将其调度到多个执行者并发加速执行。

在Impala的执行计划中一般都是一个ScanNode对应负责一张表的扫描任务,而从图中可以发现,我们为了扫描这张Iceberg表却出动了三个ScanNode,甚至还有一个JoinNode和一个UnionNode,这主要是因为Iceberg表中有Delete文件出现了。我们可以对比看一下没有Delete文件时的Iceberg表扫描查询计划是怎样的,执行SQL:

-- Result: (1, 'a'), (2, 'b')
SELECT * FROM ice_v2 FOR SYSTEM_TIME AS OF '2023-12-07 16:00:05.5';

这次我们使用时间旅行回到表进行了第一次插入但是还没有进行Delete的时间点进行查询,执行计划如下:

在这里插入图片描述

这回就是简简单单一个ScanNode了。Impala究竟是如何为Iceberg表制定扫描计划的?我们可以从代码中找到答案。

代码分析

本文要分析的Iceberg表的扫描计划的制定是单点执行计划的制定的一部分,完整的执行计划制定代码是十分庞大和复杂的,幸运的是我们只需要关注其中的一小部分就能得到需要的答案。首先我们先快速深入的目标代码的位置,调用路径如下:

getPlannedExecRequest() -> createExecRequest() -> createPlans() -> createPlanFragments()
-> createSingleNodePlan() -> createQueryPlan() -> createSelectPlan() -> createTableRefsPlan()
-> createTableRefNode() -> createScanNode() {
  // 如果TableRef是实际的表就会调用createScanNode()方法为其创建扫描结点
  ...
  FeTable table = tblRef.getTable();
  // 通过表的类型选择对应的方法创建ScanNode
  // Iceberg表是FeIcebergTable同时也实现了FeFsTable接口,同时也是一种文件系统表(Hdfs表)
  if (table instanceof FeFsTable) {
    if (table instanceof FeIcebergTable) {
      // 对于Iceberg表有专门的IcebergScanPlanner创建扫描结点
      // IcebergScanPlanner构造时需要传入分析信息、查询上下文、表引用、连词和聚合信息
      // 同时还会在构造方法中完成谓词抽取,即将conjuncts中可以下推到Iceberg API中的谓词抽取出来并转换为Iceberg的谓词对象
      IcebergScanPlanner icebergPlanner = new IcebergScanPlanner(analyzer, ctx_, tblRef,
          conjuncts, aggInfo);
      return icebergPlanner.createIcebergScanPlan();
    }
    return createHdfsScanPlan(tblRef, aggInfo, conjuncts, analyzer);
  } else if (table instanceof FeDataSourceTable) {
  ...
}

接下来就进入正题了,我们看下IcebergScanPlanner::createIcebergScanPlan()是如何给Iceberg表创建扫描计划的:

public PlanNode createIcebergScanPlan() throws ImpalaException {
  // 首先是一个重要的判断,决定是否需要调用Iceberg的planFiles() API来列出待扫描文件
  // needIcebergForPlanning()等价于 !impalaIcebergPredicateMapping_.isEmpty() || tblRef_.getTimeTravelSpec() != null;
  // 即如果有谓词下推或者有时间旅行子句的话,我们是需要调用planFiles()来列出待扫描文件的
  // 因为这种情况下要扫描的文件往往不是最新快照的全部数据文件,而是其的一个子集或者是其他快照版本包含的数据文件
  if (!needIcebergForPlanning()) {
    // 为连词中引用到的字段进行槽位物化,槽位物化可以理解为在数据行的内存定义中保留一块内存存放对应字段的数据
    // 因为谓词评估需要使用这些字段进行计算,所以即使select list中没有它们,我们必须在数据行定义中给它们预留位置
    analyzer_.materializeSlots(conjuncts_);
    // 对于不需要planFiles()的扫描,我们可以直接调用setFileDescriptorsBasedOnFileStore()方法
    // 使用缓存的最新快照的文件集合FileStore来设置文件描述符FileDescriptors,用于之后的扫描计划创建
    setFileDescriptorsBasedOnFileStore();
    // 然后调用createIcebergScanPlanImpl()进行具体的扫描计划创建
    return createIcebergScanPlanImpl();
  }

  // 对于进行了谓词下推或包含时间旅行的查询,我们无法根据缓存的最新快照的文件集合来得到需要扫描的FileDescriptors
  // 需要调用filterFileDescriptors()来根据谓词或时间旅行子句来过滤得到需要FileDescriptors
  filterFileDescriptors();
  // 调用filterConjuncts()来过滤谓词,因为在filterFileDescriptors()中我们可能已经将部分谓词下推到Iceberg中了
  // 这部分谓词不需要在执行时再次计算,因为Iceberg API已经在文件层面将不符合谓词的数据文件都过滤了
  // 剩余需要扫描的文件中的数据肯定符合这些已经下推的谓词,因此我们需要将其过滤掉,避免扫描时多余的谓词计算
  // 只将那些没有被下推成功的谓词保留,并传递给扫描结点,由Impala的执行引擎负责计算和过滤数据
  filterConjuncts();
  analyzer_.materializeSlots(conjuncts_);
  return createIcebergScanPlanImpl();
}

对于上文中有时间旅行子句查询,没法进入createIcebergScanPlan()中的if分支,需要调用filterFileDescriptors()来得到应该扫描的文件,我们继续看下其是如何实现的:

private void filterFileDescriptors() throws ImpalaException {
  TimeTravelSpec timeTravelSpec = tblRef_.getTimeTravelSpec();
  // 调用IcebergUtil::planFiles()并传递Iceberg表、之前抽取并转换为Iceberg谓词的谓词列表和时间旅行描述对象
  // 其封装了Iceberg API的TableScan::planFiles()方法,可以将谓词和时间旅行版本传递给Iceberg API
  // 其返回的fileScanTasks就包含了所有我们需要扫描的数据文件,包括Delete文件
  try (CloseableIterable<FileScanTask> fileScanTasks =
      IcebergUtil.planFiles(getIceTable(),
          new ArrayList<>(impalaIcebergPredicateMapping_.keySet()), timeTravelSpec)) {
    long dataFilesCacheMisses = 0;
    for (FileScanTask fileScanTask : fileScanTasks) {
      // 遍历每个FileScanTask,首先处理残余表达式,也就是没有被下推到Iceberg的谓词
      // 这些谓词无法在文件级别应用,需要Impala的扫描结点在行级别应用,将其先全部加入到集合residualExpressions_中
      Expression residualExpr = fileScanTask.residual();
      if (residualExpr != null && !(residualExpr instanceof True)) {
        residualExpressions_.add(residualExpr);
      }
      // 调用getFileDescriptor()来获取文件描述符FileDescriptor和是否命中缓存的标志
      // getFileDescriptor()中封装了一层缓存文件描述符的逻辑,即使未命中缓存也会创建新的并添加到缓存
      Pair<FileDescriptor, Boolean> fileDesc = getFileDescriptor(fileScanTask.file());
      if (!fileDesc.second) ++dataFilesCacheMisses;
      // 如果这个文件没有对应的Delete文件,我们将其加入到无Delete文件的数据文件列表dataFilesWithoutDeletes_中
      if (fileScanTask.deletes().isEmpty()) {
        dataFilesWithoutDeletes_.add(fileDesc.first);
      } else {
        // 否则将其加入到有Delete文件的数据文件列表dataFilesWithDeletes_中
        dataFilesWithDeletes_.add(fileDesc.first);
        // 同时还需要遍历其所有的Delete文件,将其加入到Delete文件列表deleteFiles_中
        for (DeleteFile delFile : fileScanTask.deletes()) {
          // 截止本文撰写时,Impala还未支持等值删除EQUALITY_DELETES,只支持位置删除POSITION_DELETES
          // 如果发现目标表有等值删除文件,我们无法扫描,需要引发一个异常
          if (delFile.content() == FileContent.EQUALITY_DELETES) {
            throw new ImpalaRuntimeException(String.format(
                "Iceberg table %s has EQUALITY delete file which is currently " +
                "not supported by Impala, for example: %s", getIceTable().getFullName(),
                delFile.path()));
          }
          Pair<FileDescriptor, Boolean> delFileDesc = getFileDescriptor(delFile);
          if (!delFileDesc.second) ++dataFilesCacheMisses;
          deleteFiles_.add(delFileDesc.first);
        }
      }
    }
    if (dataFilesCacheMisses > 0) {
      Preconditions.checkState(timeTravelSpec != null);
      LOG.info("File descriptors had to be loaded on demand during time travel: " +
          String.valueOf(dataFilesCacheMisses));
    }
  } catch (IOException | TableLoadingException e) {
    throw new ImpalaRuntimeException(String.format(
        "Failed to load data files for Iceberg table: %s", getIceTable().getFullName()),
        e);
  }
  // 最后更新下Delete文件的统计信息,包括数据行数和行数据量,用于后续预估内存消耗等
  updateDeleteStatistics();
}

对谓词的处理并非本文的关注的重点,所以我们跳过filterConjuncts()方法,继续看关键的createIcebergScanPlanImpl()方法最后如何为Iceberg扫描创建扫描结点的:

private PlanNode createIcebergScanPlanImpl() throws ImpalaException {
  // 对于没有Delete文件的情况,我们只需要一个扫描结点来扫描数据文件就行了,也就是上文中第二个查询的情况
  if (deleteFiles_.isEmpty()) {
    Preconditions.checkState(dataFilesWithDeletes_.isEmpty());
    // 创建一个新的Iceberg扫描结点对象,并传递结点ID、表引用、连词列表、聚合信息、文件列表
    // 非IDENTITY列的连词列表和需要跳过连词列表,然后调用init()方法进行初始化
    // IcebergScanNode继承了HdfsScanNode,大部分逻辑也是复用的HdfsScanNode的
    // 所以很多Hdfs Scan的优化和功能也能在Iceberg扫描中使用
    PlanNode ret = new IcebergScanNode(ctx_.getNextNodeId(), tblRef_, conjuncts_,
        aggInfo_, dataFilesWithoutDeletes_, nonIdentityConjuncts_,
        getSkippedConjuncts());
    ret.init(analyzer_);
    return ret;
  }
  // 如果有Delete文件,那么一个扫描结点就不够用了,我们需要另外一个扫描结点专门负责扫描Delete文件
  // 然后还需要一个反连接结点将两者的数据进行ANTI JOIN,实现删除行的效果
  // 这部分逻辑由createPositionJoinNode()方法实现,我们后面分析,总之它会返回一个JoinNode
  PlanNode joinNode = createPositionJoinNode();

  // 如果先前的分析阶段发现这是可以进行优化的针对Iceberg V2表的count(*)查询
  // 则剩余的无Delete文件的数据文件的不需要实际扫描了,可从元数据中得到行数
  // 因此这里可以直接返回所有数据文件和所有删除文件之间的ANTI JOIN
  // 不需要处理后面的无Delete文件对应的数据文件,它们的行数会被一个ArithmeticExpr直接加到结果中
  if (ctx_.getQueryCtx().isOptimize_count_star_for_iceberg_v2()) return joinNode;

  // 当然,当所有数据文件都有相应的删除文件,即dataFilesWithoutDeletes_为空时
  // 我们也只需要返回所有数据文件和所有删除文件之间的ANTI JOIN
  if (dataFilesWithoutDeletes_.isEmpty()) return joinNode;

  // 如果还有无Delete文件对应的数据文件的话,我们则还需要这些文件创建一个扫描结点
  IcebergScanNode dataScanNode = new IcebergScanNode(
      ctx_.getNextNodeId(), tblRef_, conjuncts_, aggInfo_, dataFilesWithoutDeletes_,
      nonIdentityConjuncts_, getSkippedConjuncts());
  dataScanNode.init(analyzer_);
  // 然后根据表引用的槽位描述符创建输出表达式,并依此创建一个合并结点UnionNode来合并数据
  List<Expr> outputExprs = tblRef_.getDesc().getSlots().stream().map(
      entry -> new SlotRef(entry)).collect(Collectors.toList());
  UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), tblRef_.getId(),
      outputExprs, false);
  // 将之前创建的负责无Delete文件的数据扫描结点和负责有Delete文件的连接结点作为UnionNode的输入
  // 并完成初始化后返回,这样这张Iceberg表的扫描计划就创建完成了
  // 根结点为一个UnionNode,也就对应了上文中第一个查询的情况
  unionNode.addChild(dataScanNode, outputExprs);
  unionNode.addChild(joinNode, outputExprs);
  unionNode.init(analyzer_);
  Preconditions.checkState(unionNode.getChildCount() == 2);
  Preconditions.checkState(unionNode.getFirstNonPassthroughChildIndex() == 2);
  return unionNode;
}

看完关键的createIcebergScanPlanImpl()方法后,Iceberg表的扫描计划是如何制定的就已经很清晰了,但是还有最后一块也是最核心的逻辑还需要再进一步分析,也就是期间我们调用的createPositionJoinNode()方法,它被用来创建实现Delete文件和数据文件之间数据行删除逻辑的ANTI JOIN结点,代码分析如下:

  private PlanNode createPositionJoinNode() throws ImpalaException {
    Preconditions.checkState(deletesRecordCount_ != 0);
    Preconditions.checkState(dataFilesWithDeletesSumPaths_ != 0);
    Preconditions.checkState(dataFilesWithDeletesMaxPath_ != 0);
	// 我们需要为数据文件和Delete文件分别创建一个扫描结点,不过在此之前还有些准备工作
    PlanNodeId dataScanNodeId = ctx_.getNextNodeId();
    PlanNodeId deleteScanNodeId = ctx_.getNextNodeId();
    // 首先需要为Delete文件先创建一张虚拟表,因为扫描结点都是以表为基础的,而Delete文件并不是实际存在的表
    IcebergPositionDeleteTable deleteTable = new IcebergPositionDeleteTable(getIceTable(),
        getIceTable().getName() + "-POSITION-DELETE-" + deleteScanNodeId.toString(),
        deleteFiles_, deletesRecordCount_, getFilePathStats());
    analyzer_.addVirtualTable(deleteTable);
    // 有了对应Delete文件的虚拟表IcebergPositionDeleteTable后,再为其创建表引用对象TableRef
    TableRef deleteDeltaRef = TableRef.newTableRef(analyzer_,
        Arrays.asList(deleteTable.getDb().getName(), deleteTable.getName()),
        tblRef_.getUniqueAlias() + "-position-delete");
    // 为了数据文件和Delete文件之间能够进行ANTI JOIN,我们还需要为数据文件的表添加相关的虚拟列
    // 这些虚拟列并不是实际存在于表中的列,而是为了作为Join Key存在的
    // addDataVirtualPositionSlots()方法会为数据表添加两个虚拟列,分别为INPUT__FILE__NAME和FILE__POSITION
    // Impala已经为Parquet和Orc文件实现了这两个虚拟列,其数据可以在扫描时填充,含义分别是文件名和行号
    addDataVirtualPositionSlots(tblRef_);
    // addDeletePositionSlots()方法会为Delete文件的虚拟表添加两个实际列,分别为file_path和pos
    // 这两个列是在Delete文件中实际存在的,可以看上文中解析Delete文件得到的结果
    addDeletePositionSlots(deleteDeltaRef);
    // 然后就可以为数据文件和Delete文件分别创建一个扫描结点并初始化了
    IcebergScanNode dataScanNode = new IcebergScanNode(
        dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_,
        nonIdentityConjuncts_, getSkippedConjuncts(), deleteScanNodeId);
    dataScanNode.init(analyzer_);
    IcebergScanNode deleteScanNode = new IcebergScanNode(
        deleteScanNodeId,
        deleteDeltaRef,
        Collections.emptyList(), /*conjuncts*/
        aggInfo_,
        Lists.newArrayList(deleteFiles_),
        Collections.emptyList(), /*nonIdentityConjuncts*/
        Collections.emptyList()); /*skippedConjuncts*/
    deleteScanNode.init(analyzer_);

    // 然后是ANTI JOIN结点的创建,首先需要调用createPositionJoinConjuncts()创建连接谓词
    // 它会生成两个EQ谓词,相当于INPUT__FILE__NAME = file_path AND FILE__POSITION = pos
    // 这样数据文件中的行和Delete文件中的行匹配上时就可以在ANTI JOIN中被剔除了,实现了删除的效果
    List<BinaryPredicate> positionJoinConjuncts = createPositionJoinConjuncts(
            analyzer_, tblRef_.getDesc(), deleteDeltaRef.getDesc());
    // Impala还实现了一种专门针对Iceberg Delete文件DELETE JOIN结点IcebergDeleteNode
    // 如果没设置query option disable_optimized_iceberg_v2_read的话
    // 就会使用IcebergDeleteNode代替LEFT_ANTI_JOIN的HashJoinNode来进行ANTI JOIN
    // IcebergDeleteNode继承了HashJoinNode,同时专门针对Iceberg Delete进行了优化
    TQueryOptions queryOpts = analyzer_.getQueryCtx().client_request.query_options;
    JoinNode joinNode = null;
    if (queryOpts.disable_optimized_iceberg_v2_read) {
      joinNode = new HashJoinNode(dataScanNode, deleteScanNode,
          /*straight_join=*/true, DistributionMode.NONE, JoinOperator.LEFT_ANTI_JOIN,
          positionJoinConjuncts, /*otherJoinConjuncts=*/Collections.emptyList());
    } else {
      joinNode =
          new IcebergDeleteNode(dataScanNode, deleteScanNode, positionJoinConjuncts);
    }
    // ANTI JOIN结点的创建好之后再进行一些初始化工作就可以返回了
    joinNode.setId(ctx_.getNextNodeId());
    joinNode.init(analyzer_);
    joinNode.setIsDeleteRowsJoin();
    return joinNode;
  }

至此,Iceberg表的扫描计划的制定就结束了,后续再完成一些别的必须的工作就可以下发到执行引擎中进行执行了。

总结

这篇文章主要是在执行计划的制定方面分析了Iceberg表在Impala中是如何扫描的,总的来说为了实现高性能的Iceberg的MOR功能,对于一张包含Delete数据的Iceberg表Impala会可能会使用多个扫描结点以及ANTI JOIN结点和UNION结点协同工作,配合完成任务,这样的设计和实现既能复用很多现有的Impala代码,也能利用起现有的很多针对HDFS表的扫描优化。但是文章篇幅有限,实际上代码中还有许多内容无法详细展开分析,对于扫描的实际执行也还未能涉猎,有机会的话会在后续的文章中继续介绍。有兴趣的读者也可以自行阅读,Impala的代码注释还是比较丰富的,代码风格规范也比较好,阅读难度不大。

文章来源:https://blog.csdn.net/Eyizoha/article/details/134974480
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。