RDD 特性——RDD 的分区和 Shuffle
目标
-
RDD 的分区操作
-
Shuffle 的原理
-
分区的作用
-
RDD 使用分区来分布式并行处理数据, 并且要做到尽量少的在不同的 Executor 之间使用网络交换数据, 所以当使用 RDD 读取数据的时候, 会尽量的在物理上靠近数据源, 比如说在读取 Cassandra 或者 HDFS 中数据的时候, 会尽量的保持 RDD 的分区和数据源的分区数, 分区模式等一 一对应
-
-
分区和 Shuffle 的关系
-
分区的主要作用是用来实现并行计算, 本质上和 Shuffle 没什么关系, 但是往往在进行数据处理的时候, 例如
reduceByKey
,groupByKey
等聚合操作, 需要把 Key 相同的 Value 拉取到一起进行计算, 这个时候因为这些 Key 相同的 Value 可能会坐落于不同的分区, 于是理解分区才能理解 Shuffle 的根本原理
-
-
Spark 中的 Shuffle 操作的特点
-
只有?
Key-Value
?型的 RDD 才会有 Shuffle 操作, 例如?RDD[(K, V)]
, 但是有一个特例, 就是?repartition
?算子可以对任何数据类型 Shuffle -
早期版本 Spark 的 Shuffle 算法是?
Hash base shuffle
, 后来改为?Sort base shuffle
, 更适合大吞吐量的场景
-
-
查看 RDD 分区
-
指定RDD分区
-
通过本地集合创建的时候指定分区数
-
通过读取文件创建的时候指定分区数
-
如何进行重分区 coalesce和repatition
-
coalesce
-
repatition
-
-
通过其他算子指定分区数
-
很多算子都可以指定分区数:
-
一般情况下设计 shuffle 操作的算子都运行指定分区数
-
一般这些算子,可以在最后一个参数的位置传入新的分区数
-
如果没有重新指定分区数,默认从父 RDD 中继承分区数
-
-
partitioner 分区函数 (一般算子的底层重载方法都有)
-
-
-
Shuffle 过程的简要说明
-
Shuffle 过程
val sourceRdd = sc.textFile("hdfs://master:9000/dataset/wordcount.txt") val flattenCountRdd = sourceRdd.flatMap(_.split (" "))map((_,1)) val aggCountRdd = flattenCountRdd.reduceByKey(_ + _) val result = aggCountRdd.collect ? result: Array[(String, Int)] = Array((spark,2), (hadoop,3), (flume,2))
reduceByKey
这个算子本质上就是先按照 Key 分组, 后对每一组数据进行?reduce
, 所面临的挑战就是 Key 相同的所有数据可能分布在不同的 Partition 分区中, 甚至可能在不同的节点中, 但是它们必须被共同计算.为了让来自相同 Key 的所有数据都在?
reduceByKey
的同一个?reduce
中处理, 需要执行一个?all-to-all
的操作, 需要在不同的节点(不同的分区)之间拷贝数据, 必须跨分区聚集相同 Key 的所有数据, 这个过程叫做?Shuffle
-
Shuffle 的原理
-
Spark 的 Shuffle 发展大致有两个阶段:?
Hash base shuffle
和?Sort base shuffle
partitioner
?用于计算一个数据应该发往哪个机器上hash base
和sort base
用于描述中间过程如何存放文件-
Hash base shuffle
大致的原理是分桶, 假设 Reducer 的个数为 R, 那么每个 Mapper 有 R 个桶, 按照 Key 的 Hash 将数据映射到不同的桶中, Reduce 找到每一个 Mapper 中对应自己的桶拉取数据.
假设 Mapper 的个数为 M, 整个集群的文件数量是?
M * R
, 如果有 1,000 个 Mapper 和 Reducer, 则会生成 1,000,000 个文件, 这个量非常大了.过多的文件会导致文件系统打开过多的文件描述符, 占用系统资源. 所以这种方式并不适合大规模数据的处理,只适合中等规模和小规模的数据处理, 在 Spark 1.2 版本中废弃了这种方式.
-
Sort base shuffle
对于 Sort base shuffle 来说, 每个 Map 侧的分区只有一个输出文件, Reduce 侧的 Task 来拉取, 大致流程如下
-
Map 侧将数据全部放入一个叫做 AppendOnlyMap 的组件中, 同时可以在这个特殊的数据结构中做聚合操作
-
然后通过一个类似于 MergeSort 的排序算法 TimSort 对 AppendOnlyMap 底层的 Array 排序
-
先按照 Partition ID 排序, 后按照 Key 的 HashCode 排序
-
-
最终每个 Map Task 生成一个 输出文件, Reduce Task 来拉取自己对应的数据
从上面可以得到结论, Sort base shuffle 确实可以大幅度减少所产生的中间文件, 从而能够更好的应对大吞吐量的场景, 在 Spark 1.2 以后, 已经默认采用这种方式.
但是需要大家知道的是, Spark 的 Shuffle 算法并不只是这一种, 即使是在最新版本, 也有三种 Shuffle 算法, 这三种算法对每个 Map 都只产生一个临时文件, 但是产生文件的方式不同, 一种是类似 Hash 的方式, 一种是刚才所说的 Sort, 一种是对 Sort 的一种优化(使用 Unsafe API 直接申请堆外内存)
-
-
-
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!