2024 .1.7 Day05_Spark_HomeWork; Spark_SQL

2024-01-07 23:12:25

目录

1. 简述Spark? SQL与HIVE的对比

2. Spark SQL是什么?

3.代码题

需求1?直接基于DataFrame来处理,完成SparkSQL版的WordCount词频统计。DSL和SQL两种方式都要实现

4.创建Spark DataFrame的几种方式?

5.? 创建得到DataFrame的方式有哪些,各自适用场景是怎么样的?

? ? ? ? ? ? ? ? 3.1 text方式读取:

? ? ? ? ? ? ? ? 3.2? CSV方式读取:

? ? ? ? ? ? ? ? ? ? 3.3 JSON读取数据:


1. 简述Spark? SQL与HIVE的对比

相同点:?

? ? ? ? 1.都是分布式SQL计算引擎

? ? ? ? 2.都可以处理大规模的结构化数据

? ? ? ? 3.都可以建立在YARN集群之上运行

不同点:

? ? ? ? 1. Sparksql是基于内存计算 , Hivesql底层是运行在Mr上,也就是基于磁盘进行计算

? ? ? ? 2. sparksql没有元数据管理服务, hivesql是有metastore元数据管理服务的

? ? ? ? 3. Sparksql底层执行RDD程序 , HIVEsql底层执行MapReduce

? ? ? ? 4. Sparksql可以编写sql也可以编写代码, HIVEsql只能编写sql

2. Spark SQL是什么?

SparkSQL是建立在Spark上的一个工具模块,用于处理结构化的数据

3.代码题

需求1?直接基于DataFrame来处理,完成SparkSQL版的WordCount词频统计。DSL和SQL两种方式都要实现

测试数据: hello spark hadoop hive oozie sqoop hello hive hadoop java java python hadoop hive hadoop

?

import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructField

if __name__ == '__main__':
# 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .appName('需求1词频统计')\
        .master('local[*]')\
        .getOrCreate()
# 2- 数据输入
    init_df = spark.read.text(
        paths='hdfs://node1:8020/input/day05_home_work.txt'
    )
    # 创建侧视图
    init_df.createTempView('words')
# 3- 数据处理
    print("SQL方式进行词频统计")
    spark.sql("""
    select word,count(1)as cnt 
    from
        (select explode(split(value,' ')) as word 
        from words)
        group by word
        order by cnt desc 
    """).show()
    '''
+------+---+
|  word|cnt|
+------+---+
|hadoop|  4|
|  hive|  3|
| hello|  2|
|  java|  2|
| spark|  1|
| oozie|  1|
| sqoop|  1|
|python|  1|
+------+---+
'''
    print('DSL方式实现词频统计')
    init_df.select(
        F.explode(F.split('value',' ')).alias('word')
    ).groupby('word').agg(
        F.count('word').alias('cnt'),
    ).orderBy('cnt',ascending=False).show()

    '''
    +------+---+
    |  word|cnt|
    +------+---+
    |hadoop|  4|
    |  hive|  3|
    |  java|  2|
    | hello|  2|
    | sqoop|  1|
    | spark|  1|
    |python|  1|
    | oozie|  1|
    +------+---+
    '''

# 4- 数据输出

# 5- 释放资源
    spark.stop()

4.创建Spark DataFrame的几种方式?

????????1 . 通过RDD得到DataFrame

????????2. 内部初始化数据得到DataFrame

????????3.? 读取外部文件得到DataFrame

5.? 创建得到DataFrame的方式有哪些,各自适用场景是怎么样的?

? ? ? ? 1 . RDD转DataFrame , 场景 : RDD可以存储任意结构的数据类型,而DataFrame只能存储二维表结构化数据, 在使用Spark处理数据的初期,可能输入进来的数据是半结构化或者非结构化的,那么可以先通过RDD对数据进行ETL处理成结构化数据,再使用开发高效率的SparkSQL进行后续数据处理;

? ? ? ? 2.? 内部初始化数据得到DataFrame ,? 通过createDataFrame创建DataFrame , 一般用在开发和测试中.因为只能处理少量的数据

? ? ? ? 3.? 读取外部文件得到DataFrame , Text方式\CSV方式\JSON方式 ;?

? ? ? ? ? ? ? ? 3.1 text方式读取:

????????????????????????不管文件内容如何,会将所有内容放到一个列中;

? ? ? ? ? ? ? ? ? ? ? ? 默认生成的列名叫做value,数据类型String;并且只能修改value的名称,其他内容无法修改;

? ? ? ? ? ? ? ? 3.2? CSV方式读取:

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 常设置的参数

? ? ? ? ? ? ? ? ? ? ? ? ????????path:指定文件路径,本地或者hdfs

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? schema:手动指定元数据信息
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? sep:指定字段间的分隔符
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? encoding:指定文件的编码方式
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? header:指定文件中的第一行是否是字段名称
? ? ? ? ????????????????????????inferSchema:根据数据内容自动推断数据类型。但是,推断结果可能不精确

? ? ? ? ? ? ? ? ? ? 3.3 JSON读取数据:

? ? ? ? ? ? ? ? ? ? ? ? 需要手动指定schema信息.如果手动指定的时候,名称字段与json中的key名称不一致,会解析不成功, 以null值填充

? ? ? ? ? ? ? ? ? ? ? ? csv/json中schema的结构,如果是字符串类型,那么字段名称和字段数据类型间,只能以空格分隔

文章来源:https://blog.csdn.net/m0_49956154/article/details/135444138
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。