2024.1.6 Spark_Core 分词处理,RDD持久化,内核调度

2024-01-10 10:53:11

目录

一 .分词处理

二 . RDD持久化

?????????1. 使用缓存:

?????????2. RDD的checkpoint检查点:

????????3. 缓存和 checkpoint的区别:?

?三 . Spark内核调度

????????1.RDD依赖

? ? ? ? 2. DAG 和 Stage?

? ? ? ? 3.shuffle阶段

? ? ? ? ?4.JOB调度流程

? ? ? ? ?5. Spark RDD并行度


一 .分词处理

? ? ? ? 1.创建SparkContext对象

? ? ? ? 2.数据输入

? ? ? ? 3.数据处理

? ? ? ? 4.数据输出

? ? ? ? 5.释放资源

说明:
发现在数据中,并没有直接的关键词,关键词数据是包含在搜索词中,而且一个搜索词中包含了多个关键词,所有如何想基于关键词进行统计, 首先需求先拆分搜索词,获取关键词,思考:如何做呢?

借助第三方的分词工具实现中文分词
	Java语言:IK分词器
	Python语言:jieba(结巴)分词器

如何使用jieba分词器呢?
1- 需要在系统中安装jieba分词库: local模式只需要安装在node1即可 如果集群模式运行 需要各个节点都要安装
	安装命令:  pip install -i https://pypi.tuna.tsinghua.edu.cn/simple  jieba

2- 分词器使用
from pyspark import SparkConf, SparkContext
import os
import jieba

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    content = "鸡你太美蔡徐坤"

    # 精简分词模式
    print(list(jieba.cut(content)))

    # 全模式(切分会更加精细)
    print(list(jieba.cut(content, cut_all=True)))

    # 搜索引擎分词模式
    print(list(jieba.cut_for_search(content)))

二 . RDD持久化

?????????1. 使用缓存:

提升Spark程序的计算效率

rdd被重复使用,rdd的计算逻辑复杂,容错

RDD缓存主要是将数据存储在内存中,临时存储,不太稳定

使用了缓存,有向无环图会有个绿色的cache,?

使用缓存的代码?etl_rdd.persist(storageLevel:MEMORY_AND_DISK)? ,优先放到内存, 内存不够了再放到磁盘中

设置缓存的API:?
?? ?rdd.cache(): 将RDD的数据缓存储内存中
?? ?rdd.persist(缓存的级别/位置): 将RDD的数据存储在指定位置

?????????2. RDD的checkpoint检查点:

提升Spark 程序的容错性

RDD缓存主要是将数据存储在内存中,临时存储,不太稳定;?RDD 的检查点主要是将数据存储在HDFS上,是持久化存储 , 因为HDFS的三大机制让数据变的安全可靠

? ? ? ?对指定RDD启动checkpoint

? ? ? ? etl_rdd.checkpoint()? ?checkpoint设置后会将依赖效果丢弃掉

????????相关API:
?? ?sc.setCheckpointDir(存储路径): 设置checkpoint数据存放路径
?? ?rdd.checkpoint(): 对指定RDD启用checkpoint
?? ?rdd.count(): 触发checkpoint

????????3. 缓存和 checkpoint的区别:?

1- 数据存储位置不同
?? ?缓存: 存储在内存或者磁盘 或者 堆外内存中
?? ?checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上

2- 数据生命周期:
?? ?缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除
?? ?checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除

3- 血缘关系:
?? ?缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作
?? ?checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行
?? ?
4- 主要作用不同:
?? ?缓存: 提高Spark程序的运行效率
?? ?checkpoint检查点: 提高Spark程序的容错性

在一个项目中,推荐缓存和检查点配合使用, 在代码中先设置缓存, 再设置检查点, 然后再一同使用Action算子触发,推荐使用count算子

?三 . Spark内核调度

????????1.RDD依赖

?窄依赖 ,让spark程序并行计算 ,一个分区数据计算有问题,其他分区不受影响 , 父RDD 的分区和子RDD的分区关系是一对一的关系

宽依赖 , 也叫shuffle依赖 , 父RDD的一个分区被多个 子RDD的分区依赖 ,shuffle下游的其他操作,必须等待shuffle执行完成以后才能够继续执行

? ? ? ? 2. DAG 和 Stage?

Spark遇到一个Action算子,就会触发一个job任务, 一个job就会有一个有向无环图

一个DAG有向无环图 有多个Stage ,Stage的数量取决于宽依赖

一个Stage 有多个Task线程

一个RDD 有多个分区

一个分区 被一个Task线程所处理

DAG底层的形成

Stage内部的形成流程

Shuffle阶段的过程

? ? ? ? 3.shuffle阶段

????????MapReduce:?

????????sort shuffle : 普通机制和bypass机制??

?普通机制:每个上游Task线程处理数据,数据处理完以后,先放在内存中,接着对内存中的数据进行分区排序.将内存中的数据溢写到磁盘,形成一个个的小文件.写完后将多个小文件合并成为一个大的磁盘文件;并针对每个大的磁盘文件 ,会提供一个索引文件, 下游Task根据索引文件来读取相应的数据 ;?

bypass机制 : 在普通机制的基础上,省略了排序的过程?

bypass触发条件: 上游RDD的分区数量不能超过200个, 上游不能对数据进行提前聚合(提前聚合的话就会要分组,分组就会排序)

? ? ? ? ?4.JOB调度流程

1- Driver进程启动后,底层PY4J创建SparkContext顶级对象。在创建该对象的过程中,还会创建另外两个对象,分别是: DAGScheduler和TaskScheduler
?? ?DAGScheduler: DAG调度器。将Job任务形成DAG有向无环图和划分Stage的阶段
?? ?TaskScheduler: Task调度器。将Task线程分配给到具体的Executor执行

2- 一个Spark程序遇到一个Action算子就会触发产生一个Job任务。SparkContext将Job任务给到DAG调度器,拿到Job任务后,会将Job任务形成DAG有向无环图和划分Stage的阶段。并且会确定每个Stage阶段有多少个Task线程,会将众多的Task线程放到TaskSet的集合中。DAG调度器将TaskSet集合给到Task调度器

3- Task调度器拿到TaskSet集合以后,将Task分配给到给到具体的Executor执行。底层是基于SchedulerBackend调度队列来实现的。

4- Executor开始执行任务。并且Driver会监控各个Executor的执行状态,直到所有的Executor执行完成,就认为任务运行结束

5- 后续过程和之前一样

? ? ? ? ?5. Spark RDD并行度

说明: spark.default.parallelism该参数是SparkCore中的参数。该参数只会影响shuffle以后的分区数量。另外该参数对parallelize并行化本地集合创建的RDD不起作用。

文章来源:https://blog.csdn.net/m0_49956154/article/details/135421731
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。