2024.1.7 Spark SQL , DataFrame
目录
一 . SparkSQL简介
Spark SQL只能处理结构化数据 ,属于Spark框架一个部分? Schema:元数据信息
特点: 融合性 ,统一数据访问,hive兼容 , 标准化连接??
将hive sql翻译成Spark上对应的RDD操作 ,底层运行SparkRDD?
DataFrames是在RDD上面增加与省略了一些东西
DataFrame? =? RDD -泛型 +Schema? +方便到的SQL操作 + 优化? ,是个特殊的RDD
RDD存储任意结构数据? ;? ? ? ? ?DataFrame存储二维表结构数据
二 . Spark SQL与HIVE的异同?
1- Spark SQL是基于内存计算, 而HIVE SQL是基于磁盘进行计算的
2- Spark SQL没有元数据管理服务(自己维护), 而HIVE SQL是有metastore的元数据管理服务的
3- Spark SQL底层执行Spark RDD程序, 而HIVE SQL底层执行是MapReduce
4- Spark SQL可以编写SQL也可以编写代码,但是HIVE SQL仅能编写SQL语句
?三 . DataFrame
DataFrame表示的是一个二维的表。二维表,必然存在行、列等表结构描述信息
表结构描述信息(元数据Schema): StructType对象
字段: StructField对象,可以描述字段名称、字段数据类型、是否可以为空
行: Row对象
列: Column对象,包含字段名称和字段值
在一个StructType对象下,由多个StructField组成,构建成一个完整的元数据信息
1. 创建 DataFrame
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
spark = SparkSession.builder.appName('创建DataFrame')\
.master('local[*]').getOrCreate()
init_df = spark.createDataFrame(
data=[(1,'张三',18),(2,'李四',40),(3,'王五',60)],
schema='id:int,name:string,age:int'
)
init_df2 = spark.createDataFrame(
data=[(1, '张三', 18), (2, '李四', 30),(3,'王五',60)],
schema=["id","name","age"]
)
init_df.show()
'''
+---+----+---+
| id|name|age|
+---+----+---+
| 1|张三| 18|
| 2|李四| 30|
+---+----+---+
'''
init_df2.show()
init_df.printSchema()
'''
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
'''
init_df2.printSchema()
'''
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
'''
spark.stop()
2. RDD转换DataFrame
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.appName('rdd_2_dataframe')\
.master('local[*]')\
.getOrCreate()
# 通过SparkSession得到SparkContext
sc = spark.sparkContext
# 2- 数据输入
# 2.1- 创建一个RDD
init_rdd = sc.parallelize(["1,李白,20","2,安其拉,18"])
# 2.2- 将RDD的数据结构转换成二维结构
new_rdd = init_rdd.map(lambda line: (
int(line.split(",")[0]),
line.split(",")[1],
int(line.split(",")[2])
)
)
# 将RDD转成DataFrame:方式一
# schema方式一
schema = StructType()\
.add('id',IntegerType(),False)\
.add('name',StringType(),False)\
.add('age',IntegerType(),False)
# schema方式二
schema = StructType([
StructField('id',IntegerType(),False),
StructField('name',StringType(),False),
StructField('age',IntegerType(),False)
])
# schema方式三
schema = "id:int,name:string,age:int"
# schema方式四
schema = ["id","name","age"]
init_df = spark.createDataFrame(
data=new_rdd,
schema=schema
)
# 将RDD转成DataFrame:方式二
"""
toDF:中的schema既可以传List,也可以传字符串形式的schema信息
"""
# init_df = new_rdd.toDF(schema=["id","name","age"])
init_df = new_rdd.toDF(schema="id:int,name:string,age:int")
# 3- 数据处理
# 4- 数据输出
init_df.show()
init_df.printSchema()
# 5- 释放资源
sc.stop()
spark.stop()
?
四 . 操作DataFrame
?SQL方式:
df.createTempView('视图名称'): 创建一个临时的视图(表名)
df.createOrReplaceTempView('视图名称'): 创建一个临时的视图(表名),如果视图存在,直接替换
临时视图,仅能在当前这个Spark Session的会话中使用
df.createGlobalTempView('视图名称'): 创建一个全局视图,运行在一个Spark应用中多个spark会话中都可以使用。在使用的时候必须通过 global_temp.视图名称 方式才可以加载到。较少使用
DSL方式:
-
show():用于展示DF中数据, 默认仅展示前20行
-
参数1:设置默认展示多少行 默认为20
-
参数2:是否为阶段列, 默认仅展示前20个字符数据, 如果过长, 不展示(一般不设置)
-
-
printSchema():用于打印当前这个DF的表结构信息
-
select():类似于SQL中select, SQL中select后面可以写什么, 这样同样也一样
-
filter()和 where():用于对数据进行过滤操作, 一般在spark SQL中主要使用where
-
groupBy():用于执行分组操作
-
orderBy():用于执行排序操作
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!