Spark-Streaming+HDFS+Hive实战

2023-12-16 13:32:15


前言

在本文中,我们将介绍如何使用Spark Streaming从GBIF接口获取数据,并将其处理为HDFS文件,并映射为Hive外部表。我们将详细说明Spark Streaming、HDFS和Hive的简介,并给出了从GBIF接口获取数据并处理为HDFS文件并映射为Hive外部表的实际需求说明。


一、简介

1. Spark-Streaming简介

Spark Streaming是Apache Spark提供的一种可扩展、高吞吐量的实时数据处理引擎。它允许您使用Spark的强大功能来处理实时数据流,并提供了与批处理作业相似的编程模型。Spark Streaming支持各种数据源,包括Kafka、Flume、HDFS等,并提供了丰富的操作和转换函数来处理流数据。

2. HDFS简介

HDFS(Hadoop Distributed File System)是Apache Hadoop生态系统的核心组件之一,用于存储和处理大规模数据集。HDFS采用分布式存储和计算的方式,将大文件切分为多个数据块,并将这些数据块存储在集群中的多个节点上,以实现高容错性和高吞吐量的数据存储和处理能力。

3. Hive简介

Hive是基于Hadoop的数据仓库基础设施,提供了类似于传统数据库的查询和分析功能。Hive使用HiveQL(类似于SQL)作为查询语言,并将查询转换为MapReduce任务或Spark任务来执行。Hive还支持表的分区和存储格式的定义,以及外部表的创建,使得数据的管理和查询更加灵活和高效。

二、需求说明

需求说明:从GBIF接口获取数据并处理为HDFS文件并映射为Hive外部表

1. 目标:

  • 从GBIF(Global Biodiversity Information Facility)接口获取数据。
  • 使用Spark Streaming处理数据。
  • 将处理后的数据保存到HDFS文件系统。
  • 创建Hive外部表,将HDFS文件映射为表。

2. 数据源:

  • GBIF接口(https://api.gbif.org/v1/dataset)提供了生物多样性相关的数据集。

3. 数据处理流程:

  • 使用HTTP请求从GBIF接口获取数据集。
  • 使用Spark Streaming处理数据集,可以使用httpclient获取数据。
  • 对获取的数据进行必要的转换、清洗和处理,以满足需求。
  • 将处理后的数据保存到HDFS文件系统。

4. HDFS文件保存:

  • 使用Spark Streaming将处理后的数据保存到HDFS文件系统。
  • 可以选择合适的文件格式(如Parquet、ORC、Avro等)进行保存。

5. Hive外部表映射:

  • 在Hive中创建一个外部表,将HDFS文件映射为表。
  • 外部表可以直接引用HDFS文件中的数据,而无需将数据复制到Hive的仓库目录。
  • 可以定义表的结构和分区等元数据信息。

三、实战示例演练

1. 编写gbifdataset.properties配置文件

hdfsUri=hdfs://192.168.198.101:8020
pathDir=/opt/bigdata/dataset

# 请求url
url=https://api.gbif.org/v1/dataset

2. 导入依赖

这部分代码是Maven的依赖配置,用于引入所需的Spark、Kafka和MySQL相关的库。

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.14</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-autoconfigure</artifactId>
    <version>2.7.15</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.28</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.9.1</version>
</dependency>

3. 编写ConfigUtils类

package com.zcs.utils;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Properties;

public class ConfigUtils implements Serializable {
   
    public static String getDatasetProp(String key) throws IOException {
   
        Properties properties = new Properties();
        InputStream resource = ConfigUtils.class.getResourceAsStream("/gbifdataset.properties");
        properties.load(resource);
        return properties.getProperty(key);
    }
}

4. 编写FieldUtils类

package com.zcs.utils;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.List;

public class FieldUtils implements Serializable 

文章来源:https://blog.csdn.net/zcs2312852665/article/details/135029749
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。