RDD算子——Action 操作
-
reduce
-
reduce
和reduceByKey
有什么区别:-
reduce
是一个 Action 算子,reduceByKey
是一个转换算子 -
假设一个 RDD 里面有一万条数据,大部分 Key 是相同的,有十个不同的 Key。
rdd.reduceByKey
生成 10 条数据 而rdd.reduce
生成一条数据reduceByKey
本质上是先按照Key分组,然后把每组聚合reduce
是针对于一整个数据集来进行聚合 -
reduceByKey
是针对 KV 型数据来进行计算,reduce 可以针对所有类型的数据进行计算
-
-
reduce 算子是一个shuffle操作吗?
-
Shuffle 操作分为 mapper 和 reducer,mapper 将数据放入 paritioner 的函数计算,求得分为哪个 reducer,后分到对应的 reducer 中
-
reduce
操作并没有 mapper 和 reducer, 因为reduce
算子会作用于 RDD 中的每一个分区,然后在分区上求得局部结果,最终汇总到 Driver 中求得最终结果
-
-
RDD中有五大属性,Paritioner 在 Shufle 过程中使用 Partitioner 只有 KV 型的 RDD 才有
-
作用
-
对整个结果集规约, 最终生成一条数据, 是整个数据集的汇总
-
-
调用
-
reduce( (currValue[T], agg[T]) ? T )
-
-
注意点
-
reduce 和 reduceByKey 是完全不同的, reduce 是一个 action, 并不是 Shuffled 操作
-
本质上 reduce 就是现在每个 partition 上求值, 最终把每个 partition 的结果再汇总
-
-
code
@Test def reduce(): Unit = { ?//注意,函数中传入的curr不是value而是整条数据:("手机", 10)。reduce整体上的结果, 只有一个 ?//生成的结果类型是(“商品”,price) ?val rdd = sc.parallelize(Seq(("手机", 10), ("手机", 15), ("电脑", 20))) ?val rdd1 = rdd.reduce((curr, agg)=>("总价",curr._2+agg._2)) // agg 是局部结果,("总价",curr._2+agg)用curr._2+agg._2求总和 ?println(rdd1) }
-
-
foreach
-
code
@Test def foreach(): Unit = { ?// //注意点。item的收集是一个异步的过程,并行执行。所以结果可能不是按顺序来的 ?val rdd = sc.parallelize(Seq(1, 2, 3)) ?rdd.foreach(item => println(item)) }
-
-
count
和countByKey
(两个都是求数量的)/* ?* count 和 countByKey 的结果相聚很远,每次调用Action都会生成一个job,job会运行获取结果 ?* 所以在两个job之间有大量的log打出,其实就是在启动job ?* ?* countByKey 的运行结果是 Map(key,value 对应的是 Key的 count) ?* 如果要解决数据倾斜的问题,可以通过 countByKey 查看 key 对应的数据总数,从而解决倾斜 ?* */ @Test def count(): Unit = { ?val rdd = sc.parallelize(Seq(("a",1),("b",2),("c",3),("a",4))) ?println(rdd.count()) // 4 ?println(rdd.countByKey()) // Map(a -> 2, b -> 1, c -> 1) }
-
first
(只获取第一个元素)take
(获取前几个元素)takesample
(直接拿到结果和之前的sample类似)/* ?* first take takesample ?* 注意点: ?* take按顺序获取。takesample是采样获取 ?* first:一般情况下,action会从所有的分区获取数据,相对来说比较慢,但是first只会获取第一个元素。处理第一个分区,无需处理所有的数据比较快 ?* */ @Test def take(): Unit = { ?val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6)) ?rdd.take(3).foreach(item => println(item)) // 取前3个 ?println(rdd.first()) // 取第1个 ?rdd.takeSample(withReplacement = true, 3).foreach(item => println(item)) //withReplacement = true,有放回抽3个 }
-
collect
(以数组形式返回元素) -
总结
-
算子功能上进行分类:
-
转换算子transformation
-
动作算子Action
-
-
RDD中存放的数据类型:
-
基本类型 String 对象
-
KV 类型
-
数字类型
-
-
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!