Iceberg 1.4.2 + MinIO: creating a table and inserting data with Spark

2023-12-26 06:17:50

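Iceberg is an open table format. It addresses the lack of a unified way to manage structured, semi-structured and unstructured data in big-data systems. It manages data at the table level, supporting inserts, deletes, updates and queries, as well as rolling back to earlier versions (time travel). Underneath, it works with storage such as Hadoop, S3 and other object stores; above, it serves engines such as Hive, Spark and Flink. Sitting between the two layers, it decouples them and provides a common standard for accessing and managing table data. With that standard in place, a table can be read and modified no matter which engine created it. For example, a table created with Spark can be read by Flink, so there is no problem of one component failing to recognize another component's tables.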
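To make the operations mentioned above concrete, here is a minimal sketch of the Spark SQL statements involved, held as plain Scala strings so that the snippet compiles without a Spark session. The catalog and table names (`hadoop_prod`, `mydb.mytest`) match the example later in this post; the object name `IcebergSqlSketch`, its helper methods and the sample row are illustrative assumptions. In Spark, `UPDATE`, `DELETE` and `CALL` on Iceberg tables need the Iceberg SQL extensions to be enabled.

```scala
// Sketch only: Iceberg statements as plain strings.
// In a real session, pass each one to spark.sql(...).
object IcebergSqlSketch {
  // catalog.database.table, as used later in this post
  val table = "hadoop_prod.mydb.mytest"

  // Row-level changes (the sample row is made up for illustration)
  val insert = s"INSERT INTO $table VALUES (4, 'zl', 21)"
  val update = s"UPDATE $table SET age = 22 WHERE id = 4"
  val delete = s"DELETE FROM $table WHERE id = 4"

  // Time travel: read the table as it was at a given snapshot id
  def versionAsOf(snapshotId: Long): String =
    s"SELECT * FROM $table VERSION AS OF $snapshotId"

  // Rollback: make an earlier snapshot the current table state
  def rollbackTo(snapshotId: Long): String =
    s"CALL hadoop_prod.system.rollback_to_snapshot('mydb.mytest', $snapshotId)"
}
```

Snapshot ids are generated by Iceberg on each commit, so the value passed to `versionAsOf` or `rollbackTo` has to come from the table's own `snapshots` or `history` metadata table.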

Configure pom.xml in IDEA

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.gbicc</groupId>
  <artifactId>bigdata</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.12.18</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->

    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-core</artifactId>
      <version>1.4.2</version>
    </dependency>

    <dependency>
      <groupId>io.minio</groupId>
      <artifactId>minio</artifactId>
      <version>8.5.7</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-s3 -->
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-s3</artifactId>
      <version>1.12.620</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-aws</artifactId>
      <version>3.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.2.2</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-data -->
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-data</artifactId>
      <version>1.4.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.4.2</version> <!-- choose the version that matches your environment -->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.4.2</version> <!-- choose the version that matches your environment -->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>3.4.2</version> <!-- choose the version that matches your environment -->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark -->
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-spark</artifactId>
      <version>1.4.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.4 -->
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-spark-runtime-3.4_2.12</artifactId>
      <version>1.4.2</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.14.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-aws</artifactId>
      <version>1.4.2</version>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-bundle</artifactId>
      <version>1.11.375</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-parquet</artifactId>
      <version>1.4.2</version>
    </dependency>
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-core_2.12</artifactId>
      <version>2.4.0</version>
    </dependency>
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-spark_2.12</artifactId>
      <version>3.0.0</version>
    </dependency>
  </dependencies>


  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

Next, write the code

package org.icebergtest

import org.apache.spark.sql.{DataFrame, SparkSession}
object icebergspark {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local").appName("test")
      // S3A settings for a local MinIO instance
      .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
      .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
      .config("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
      .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
      .config("spark.hadoop.fs.s3a.path.style.access", "true")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
      // Register a Hadoop-type Iceberg catalog named hadoop_prod
      .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.access.key", "minioadmin")
      .config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.secret.key", "minioadmin")
      .config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
      // The warehouse is the MinIO bucket test1
      .config("spark.sql.catalog.hadoop_prod.warehouse", "s3a://test1/")
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .getOrCreate()
    // 1. Create the Iceberg table (if it does not exist yet), then insert data
    spark.sql("create table if not exists hadoop_prod.mydb.mytest (id int, name string, age int) using iceberg")

    spark.sql(
      """
        |insert into hadoop_prod.mydb.mytest values (1,"zs",18),(2,"ls",19),(3,"ww",20)
      """.stripMargin)
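    // Read the Iceberg data with SQL
    // spark.sql("select * from hadoop_prod.mydb.mytest").show()
    // Time travel: VERSION AS OF takes a snapshot id from this table's own history
    // (4696493712637386339 on my table), so look up the earliest snapshot id first.
    val snapshotId: Long = spark.table("hadoop_prod.mydb.mytest.snapshots")
      .orderBy("committed_at").select("snapshot_id").head().getLong(0)
    spark.sql(s"select * from hadoop_prod.mydb.mytest VERSION AS OF $snapshotId").show()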
    /**
      * 2. Besides SQL, Iceberg tables can also be queried through the DataFrame API; SQL is the recommended way.
      */
    // Option 1: query the table's metadata tables (snapshots, history, manifests, files) as DataFrames
    val frame1: DataFrame = spark.table("hadoop_prod.mydb.mytest.snapshots")
    frame1.show()
    val frame2: DataFrame = spark.table("hadoop_prod.mydb.mytest.history")
    frame2.show()
    // spark.read.option("snapshot-id", "4696493712637386339").format("iceberg").load("s3a://test1/mydb/mytest")
    // Option 2: load the table data with the DataFrame reader
    val frame3: DataFrame = spark.read.format("iceberg").load("hadoop_prod.mydb.mytest")
    frame3.show()
  }
}

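The example above can be copied and run directly, given a MinIO instance reachable at http://127.0.0.1:9000 with a bucket named test1.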
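S3A keys are easy to mistype when written out one by one. A doubled `spark.hadoop.` prefix, for instance, produces a Hadoop key that the S3A connector never reads. As one possible way to reduce that risk, the sketch below collects the MinIO settings in a single map and adds the prefix in one place. The object name `MinioS3aConf` and its parameters are hypothetical; the keys and values are the ones used in the example above.

```scala
// Sketch only: builds the Spark config entries for the S3A connector.
// MinioS3aConf is a hypothetical helper, not part of Spark or Iceberg.
object MinioS3aConf {
  def apply(endpoint: String, accessKey: String, secretKey: String): Map[String, String] =
    Map(
      "fs.s3a.endpoint" -> endpoint,
      "fs.s3a.access.key" -> accessKey,
      "fs.s3a.secret.key" -> secretKey,
      "fs.s3a.path.style.access" -> "true",      // MinIO is addressed by path, not by virtual host
      "fs.s3a.connection.ssl.enabled" -> "false", // plain HTTP endpoint
      "fs.s3a.impl" -> "org.apache.hadoop.fs.s3a.S3AFileSystem",
      "fs.s3a.aws.credentials.provider" ->
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    ).map { case (key, value) => s"spark.hadoop.$key" -> value } // prefix added exactly once
}

// Usage with a real session (needs Spark on the classpath):
//   val builder = MinioS3aConf("http://127.0.0.1:9000", "minioadmin", "minioadmin")
//     .foldLeft(SparkSession.builder().master("local")) { case (b, (k, v)) => b.config(k, v) }
```

The catalog-level settings (`spark.sql.catalog.hadoop_prod.*`) would still be added to the builder separately, since they use a different prefix.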

Source: https://blog.csdn.net/smileyboy2009/article/details/135195350