SparkSQL的编程模型(DataFrame和DataSet)
1.2 SparkSQL的编程模型(DataFrame和DataSet)
1.2.1 ?编程模型简介
主要通过两种方式操作SparkSQL,一种就是SQL,另一种为DataFrame和Dataset。
-
SQL SQL不用多说,就和Hive操作一样,但是需要清楚一点的时候,SQL操作的是表,所以要想用SQL进行操作,就需要将SparkSQL对应的编程模型转化成为一张表才可以。 同时支持,通用SQL和HQL。
-
DataFrame和Dataset
DataFrame和Dataset是SparkSQL中的编程模型。DataFrame和Dataset我们都可以理解为是一张mysql中的二维表,表有什么?表头,表名,字段,字段类型。RDD其实说白了也是一张二维表,但是这张二维表相比较于DataFrame和Dataset却少了很多东西,比如表头,表名,字段,字段类型,只有数据。 Dataset是在spark1.6.2开始出现出现的api,DataFrame是1.3的时候出现的,早期的时候DataFrame叫SchemaRDD,SchemaRDD和SparkCore中的RDD相比较,就多了Schema,所谓约束信息,元数据信息。 一般的,将RDD称之为Spark体系中的第一代编程模型;DataFrame比RDD多了一个Schema元数据信息,被称之为Spark体系中的第二代编程模型;Dataset吸收了RDD的优点(强类型推断和强大的函数式编程)和DataFrame中的优化(SQL优化引擎,内存列存储),成为Spark的最新一代的编程模型。
1.2.2 RDD\DataSet\DataFrame
RDD
弹性分布式数据集,是Spark对数据进行的一种抽象,可以理解为Spark对数据的一种组织方式,更简单些说,RDD就是一种数据结构,里面包含了数据和操作数据的方法
从字面上就能看出的几个特点:
-
弹性:
数据可完全放内存或完全放磁盘,也可部分存放在内存,部分存放在磁盘,并可以自动切换
RDD出错后可自动重新计算(通过血缘自动容错)
可checkpoint(设置检查点,用于容错),可persist或cache(缓存)
里面的数据是分片的(也叫分区,partition),分片的大小可自由设置和细粒度调整
-
分布式:
RDD中的数据可存放在多个节点上
-
数据集:
数据的集合,没啥好说的
相对于与DataFrame和Dataset,RDD是Spark最底层的抽象,目前是开发者用的最多的,但逐步会转向DataFrame和Dataset(当然,这是Spark的发展趋势)
DataFrame
DataFrame:理解了RDD,DataFrame就容易理解些,DataFrame的思想来源于Python的pandas库,RDD是一个数据集,DataFrame在RDD的基础上加了Schema(描述数据的信息,可以认为是元数据,DataFrame曾经就有个名字叫SchemaRDD)
假设RDD中的两行数据长这样;
| 1 | 张三 | 20 |
|---|---|---|
| 2 | 李四 | 21 |
| 3 | 王五 | 22 |
那么在DataFrame中数据就变成这样;
| ID:Int | Name:String | Age:Int |
|---|---|---|
| 1 | 张三 | 20 |
| 2 | 李四 | 21 |
| 3 | 王五 | 22 |
从上面两个表格可以看出,DataFrame比RDD多了一个表头信息(Schema),像一张表了,DataFrame还配套了新的操作数据的方法等,有了DataFrame这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。,不仅如此,通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。
Dataset
相对于RDD,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束
假设RDD中的两行数据长这样;
| 1 | 张三 | 20 |
|---|---|---|
| 2 | 李四 | 21 |
| 3 | 王五 | 22 |
那么在DataFrame中数据就变成这样;
| ID:Int | Name:String | Age:Int |
|---|---|---|
| 1 | 张三 | 20 |
| 2 | 李四 | 21 |
| 3 | 王五 | 22 |
那么在DataSet中数据就变成这样;
| Person(id:Int,Name:String,Age:Int) |
|---|
| Person(1,张三,20) |
| Person(2,李四,21) |
| Person(3,王五,22) |
目前仅支持Scala、Java API,尚未提供Python的API(所以一定要学习Scala),相比DataFrame,Dataset提供了编译时类型检查,对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传、运行),到提交到集群运行时才发现错误,实在是想骂人,这也是引入Dataset的一个重要原因。 ? 使用DataFrame的代码中json文件中并没有score字段,但是能编译通过,但是运行时会报异常!如下图代码所示.
val df1 = spark.read.json( "/tmp/people.json")
// json文件中没有score字段,但是能编译通过
val df2 = df1.filter("score > 60")df2.show()
而使用Dataset实现,会在IDE中就报错,出错提前到了编译之前
val ds1 = spark.read.json( "/tmp/people.json" ).as[ People]
// 使用dataset这样写,在IDE中就能发现错误
val ds2 = ds1.filter(_.score < 60)
val ds3 = ds1.filter(_.age < 18)
// 打印
ds3.show( )
总体来说DS这种方式更加合理,并且更加人性化,比较适合程序员的开发及使用,而且Spark在2.X版本以后也在推行开发者开发中使用DS进行开发。
1.2.3 SparkSQL的编程入口
在SparkSQL中的编程模型,不在是SparkContext,但是创建需要依赖SparkContext。SparkSQL中的编程模型,在spark2.0以前的版本中为SQLContext和HiveContext,HiveContext是SQLContext的一个子类,提供Hive中特有的一些功能,比如row_number开窗函数等等,这是SQLContext所不具备的,在Spark2.0之后将这两个进行了合并——SparkSession。SparkSession的构建需要依赖SparkConf或者SparkContext。使用工厂构建器(Builder方式)模式创建SparkSession。
1.2.4 SparkSQL基本编程
创建SparkSQL的模块
创建工程省略,直接在原有工程引入Pom即可
<dependency>
? ? <groupId>org.apache.spark</groupId>
? ? <artifactId>spark-sql_2.12</artifactId>
? ? <version>${spark.version}</version>
</dependency>
1.2.5 SparkSQL编程初体验
-
SparkSession的构建
val spark = SparkSession.builder()
.appName("SparkSQLOps")
.master("local[*]")
//.enableHiveSupport()//支持hive的相关操作
.getOrCreate()
-
基本编程
object _01SparkSQLOps {
? ?def main(args: Array[String]): Unit = {
?
? ? ? ?val spark = SparkSession.builder()
? ? ? ? ? ? ? .appName("SparkSQLOps")
? ? ? ? ? ? ? .master("local[*]")
// ? ? ? ? ? ? ? .enableHiveSupport()//支持hive的相关操作
? ? ? ? ? ? ? .getOrCreate()
? ? ? ?//加载数据
? ? ? ?val pdf:DataFrame = spark.read.json("file:///E:/data/spark/sql/people.json")
? ? ? ?//二维表结构
? ? ? ?pdf.printSchema()
? ? ? ?//数据内容 select * from tbl
? ? ? ?pdf.show()
? ? ? ?//具体的查询 select name, age from tbl
? ? ? ?pdf.select("name", "age").show()
? ? ? ?import spark.implicits._//导入sparksession中的隐式转换操作,增强sql的功能
? ? ? ?pdf.select($"name",$"age").show()
? ? ? ?//列的运算,给每个人的年龄+10 select name, age+10,height-1 from tbl
? ? ? ?pdf.select($"name",$"height" - 1, new Column("age").+(10)).show()
? ? ? ?//起别名 select name, age+10 as age,height-1 as height from tbl
? ? ? ?pdf.select($"name",($"height" - 1).as("height"), new Column("age").+(10).as("age")).show()
? ? ? ?//做聚合统计 统计不同年龄的人数 select age, count(1) counts from tbl group by age
? ? ? ?pdf.select($"age").groupBy($"age").count().show()
? ? ? ?//条件查询 获取年龄超过18的用户 select * from tbl where age > 18
? ? ? ?// pdf.select("name", "age", "height").where($"age".>(18)).show()
? ? ? ?pdf.select("name", "age", "height").where("age > 18").show()
? ? ? ?//sql
? ? ? ?//pdf.registerTempTable()
? ? ? ?//在spark2.0之后处于维护状态,使用createOrReplaceTempView
? ? ? ?/*
? ? ? ? ? ?从使用范围上说,分为global和非global
? ? ? ? ? ? ? ?global是当前SparkApplication中可用,非global只在当前SparkSession中可用
? ? ? ? ? ?从创建的角度上说,分为createOrReplace和不Replace
? ? ? ? ? ? ? ?createOrReplace会覆盖之前的数据
? ? ? ? ? ? ? ?create不Replace,如果视图存在,会报错
? ? ? ? */
? ? ? ?pdf.createOrReplaceTempView("people")
? ? ? ?// 使用SQL语法进行处理
? ? ? ?spark.sql(
? ? ? ? ? ?"""
? ? ? ? ? ? ?|select
? ? ? ? ? ? ?| age,
? ? ? ? ? ? ?| count(1) as countz
? ? ? ? ? ? ?|from people
? ? ? ? ? ? ?|group by age
? ? ? ? ? ?""".stripMargin).show
? ? ? ?// 打印输出
? ? ? ?spark.stop()
? }
}
1.2.6 SparkSQL编程模型的操作
DataFrame的构建方式
构建方式有两,一种通过Javabean+反射的方式来进行构建;还有一种的话通过动态编码的方式来构建。
-
JavaBean+反射
import org.apache.spark.sql.{DataFrame, SparkSession}
?
/**
* 创建DataFrame方式
* 反射方式
*/
object _02SparkSQLCreateDFOps {
?def main(args: Array[String]): Unit = {
? ?// 创建执行入口
? ?val spark = SparkSession.builder()
? ? .appName("createDF")
? ? .master("local")
? ? .getOrCreate()
? ?// 创建集合数据,并将数据封装到样例类中
? ?val list = List(
? ? ?student(1,"王凯",0,23),
? ? ?student(2,"赵凯",0,32),
? ? ?student(3,"姜华劲",1,24)
? )
? ?// 导入隐式转换
? ?import spark.implicits._
? ?// 创建DF,创建DF的同时可以进行字段名重命名
? ?val df: DataFrame = list.toDF("ids","names","genders","ages")
? ?// 打印输出
? ?df.printSchema()
? ?df.show()
? ?// 关闭
? ?spark.stop()
}
}
// 构建反射方式的样例类
case class student(id:Int,name:String,gender:Int,age:Int)
-
动态编程
object _02SparkSQLDataFrameOps {
? ?def main(args: Array[String]): Unit = {
? ? ? ?val spark = SparkSession.builder()
? ? ? ? ? ? ? ? ? .master("local[*]")
? ? ? ? ? ? ? ? ? .appName("SparkSQLDataFrame")
? ? ? ? ? ? ? ? ? .getOrCreate()
/*
? ? ? ? ? ?使用动态编程的方式构建DataFrame
? ? ? ? ? ?Row-->行,就代表了二维表中的一行记录,jdbc中的resultset,就是java中的一个对象
? ? ? ? */
val row:RDD[Row] = spark.sparkContext.parallelize(List(
? ?Row(1, "李伟", 1, 180.0),
? ?Row(2, "汪松伟", 2, 179.0),
? ?Row(3, "常洪浩", 1, 183.0),
? ?Row(4, "麻宁娜", 0, 168.0)
))
//表对应的元数据信息
val schema = StructType(List(
? ?StructField("id", DataTypes.IntegerType, false),
? ?StructField("name", DataTypes.StringType, false),
? ?StructField("gender", DataTypes.IntegerType, false),
? ?StructField("height", DataTypes.DoubleType, false)
))
?
val df = spark.createDataFrame(row, schema)
df.printSchema()
df.show()
? }
}
说明:这里学习三个新的类:
Row:代表的是二维表中的一行记录,或者就是一个Java对象
StructType:是该二维表的元数据信息,是StructField的集合
StructField:是该二维表中某一个字段/列的元数据信息(主要包括,列名,类型,是否可以为null)
-
总结: 这两种方式,都是非常常用,但是动态编程更加的灵活,因为javabean的方式的话,提前要确定好数据格式类型,后期无法做改动。
Dataset的构建方式
Dataset是DataFrame的升级版,创建方式和DataFrame类似,但有不同。
//dataset的构建
object _03SparkSQLDatasetOps {
? ?def main(args: Array[String]): Unit = {
?
? ? ? ?val spark = SparkSession.builder()
? ? ? ? ? ? ? ? ? .appName("SparkSQLDataset")
? ? ? ? ? ? ? ? ? .master("local[*]")
? ? ? ? ? ? ? ? ? .getOrCreate()
?
? ? ? ?//dataset的构建
? ? ? ?val list = List(
? ? ? ? ? ?new Student(1, "王盛芃", 1, 19),
? ? ? ? ? ?new Student(2, "李金宝", 1, 49),
? ? ? ? ? ?new Student(3, "张海波", 1, 39),
? ? ? ? ? ?new Student(4, "张文悦", 0, 29)
? ? ? )
? ? ? ?import spark.implicits._
? ? ? ?val ds = spark.createDataset[Student](list)
? ? ? ?ds.printSchema()
? ? ? ?ds.show()
? ? ? ?spark.stop()
? }
}
case class Student(id:Int, name:String, gender:Int, age:Int)
注意:出现如下错误

在创建Dataset的时候,需要注意数据的格式,必须使用case class,或者基本数据类型,同时需要通过import spark.implicts._来完成数据类型的编码,而抽取出对应的元数据信息,否则编译无法通过
RDD和DataFrame以及DataSet的互相转换
RDD--->DataFrame
? ?def beanRDD2DataFrame(spark:SparkSession): Unit = {
? ? ? ?val stuRDD:RDD[Student] = spark.sparkContext.parallelize(List(
? ? ? ? ? ?new Student(1, "王盛芃", 1, 19),
? ? ? ? ? ?new Student(2, "李金宝", 1, 49),
? ? ? ? ? ?new Student(3, "张海波", 1, 39),
? ? ? ? ? ?new Student(4, "张文悦", 0, 29)
? ? ? ))
? ? ? ?val sdf =spark.createDataFrame(stuRDD, classOf[Student])
? ? ? ?sdf.printSchema()
? ? ? ?sdf.show()
? }
RDD--->Dataset
def ?rdd2Dataset(spark:SparkSession): Unit = {
? ?val stuRDD = spark.sparkContext.parallelize(List(
? ? ? ?Student(1, "王盛芃", 1, 19),
? ? ? ?Student(2, "李金宝", 1, 49),
? ? ? ?Student(3, "张海波", 1, 39),
? ? ? ?Student(4, "张文悦", 0, 29)
? ))
? ?import spark.implicits._
? ?val ds:Dataset[Student] = spark.createDataset(stuRDD)
?
? ?ds.show()
}
case class Student(id:Int, name:String, gender:Int, age:Int)
在RDD转换为DataFrame和Dataset的时候可以有更加简单的方式
import spark.implicits._ rdd.toDF() rdd.toDS()
DataFrame--->RDD
val rdd:RDD[Row] = df.rdd
rdd.foreach(row => {
? ?// ? ? ? ? ? println(row)
? ?val id = row.getInt(0)
? ?val name = row.getString(1)
? ?val gender = row.getInt(2)
? ?val height = row.getAs[Double]("height")
? ?println(s"id=${id},name=$name,gender=$gender,height=$height")
})
DataFrame--->Dataset
无法直接将DataFrame转化为Dataset
Dataset --->RDD
val stuDS: Dataset[Student] = list2Dataset(spark) ? ? ? ?//dataset --> rdd val stuRDD:RDD[Student] = stuDS.rdd stuRDD.foreach(println)
Dataset--->DataFrame
val stuDS: Dataset[Student] = list2Dataset(spark) ? ? ? //dataset --->dataframe val df:DataFrame = stuDS.toDF() df.show()

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!