2024 1.9 Spark_SQL , 数据清洗API , 写出操作
目录
? ? ? ? 2. SparkSQL的shuffle分区设置
一. DataFrame 详解
? ? ? ? 1. 数据清洗API?
? ? ? ? 1.1 去重 :
????????????????DropDupilcates :
????????????????????????? init_df.dropDuplicates().show()
? ? ????????????????????????init_df.dropDuplicates(subset=["id","name"]).show()用来删除重复数据。如果没有指定参数subset,那么要比对行中的所有字段内容,
? ? ? ? ? ? 如果全部相同,就认为是重复数据,会被删除;如果有指定参数subset,那么只比对subset中指定的字段范围
? ? ? ? 1.2?去除空:
????????????????Dropna:
?????????????????????????????????init_df.dropna().show()
? ????????????????????????????????? init_df.dropna(subset=["id","name"]).show()????????????????????????????????????????init_df.dropna(thresh=2,subset=["name","age","address"]).show()
? ????????????????????????? init_df.dropna(thresh=2).show()????????????????????????????????dropna(thresh,subset):删除缺失值数据.
? ? ? ? ? ? 1- 如果不传递任何参数,只要有任意一个字段值为null,那么就删除整行数据
? ? ? ? ? ? 2- 如果只指定了subset,那么空值的检查,就只会限定在subset指定的范围内
? ? ? ? ? ? 3- 如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh(>=thresh)个字段的值不为空,才不会被删除
? ? ? ? 1.3?填充替换 :
? ? ? ? ? ? ? ? fillna :?
????????????????????????init_df.fillna(value=999).show()
? ????????????????????????? init_df.fillna(value=999,subset=["id","name"]).show()
? ? ????????????????????????init_df.fillna(value={"id":111,"name":"未知姓名","age":100,"address":"北京"}).show()?????????????????????????????????fillna(value,subset):替换缺失值数据
? ? ? ? ? ? value:必须要传递参数.是用来填充缺失值的
? ? ? ? ? ? subset:限定缺失值替换范围
? ? ? ? 注意:
? ? ? ? ? ? 1-value如果不是字典,那么只会替换字段类型匹配的空值
? ? ? ? ? ? 2-最常用的是value传递字典的形式
? ? ? ? 2. SparkSQL的shuffle分区设置
????????????????如何调整shuffle分区数量呢? spark.sql.shuffle.partitions
方案二(常用,推荐使用): 在客户端通过submit命令提交的时候, 动态设置shuffle的分区数量。部署、上线的时候、基于spark-submit提交运行的时候
?? ?./spark-submit --conf "spark.sql.shuffle.partitions=20"
?方案三(比较常用): 在代码中设置。主要在测试环境中使用, 但是一般在部署上线的时候, 会删除(写死)。优先级也是最高的。一般的使用场景是,当你的数据量未来不会发生太大的波动。
?? ? sparkSession.conf.set('spark.sql.shuffle.partitions',20)
? ? ? ? 3 . SparkSQL 数据写出操作
? ? ? ? ????????3.1 ?写出到文件系统
常用参数说明:
??? 1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统
??? 2- mode:当输出目录中文件已经存在的时候处理办法
??????? 2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件
??????? 2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去
??????? 2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作
??????? 2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path file:xxx already exists.
??? 3- sep:字段间的分隔符
??? 4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True
??? 5- encoding:文件输出的编码方式
????????????????对应的简写API格式如下,以CSV为例:
init_df.write.csv(
? ? path='存储路径',
? ? mode='模式',
? ? header=True,
? ? sep='\001',
? ? encoding='UTF-8'
)
?????????????????# 数据输出到文件系统:复杂API
?
? ? ? ? 设置mode,需要单独调用mode()方法
? ? result.write\
? ? ? ? .format('json')\
? ? ? ? .mode("overwrite")\
? ? ? ? .option("encoding","UTF-8")\
? ? ? ? .save('存储路径')
? ? ? ? ? ? ? ? 3.2? 写出到数据库
?# 数据输出到数据
? ? ? ? 创建数据库命令:create database 库名 character set utf8;
? ? result.write.jdbc(
? ? ? ? url='jdbc:mysql://node1:3306/库名?useUnicode=true&characterEncoding=utf-8',
? ? ? ? table='表名',
? ? ? ? mode='append',
? ? ? ? properties={ 'user' : '用户名', 'password' : '密码' }
? ? )
?
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!