RDD算子——Action 操作

2024-01-09 11:38:04
  • reduce

    • reducereduceByKey 有什么区别:

      1. reduce 是一个 Action 算子,reduceByKey 是一个转换算子

      2. 假设一个 RDD 里面有一万条数据,大部分 Key 是相同的,有十个不同的 Key。

        rdd.reduceByKey 生成 10 条数据 而rdd.reduce 生成一条数据

        reduceByKey本质上是先按照Key分组,然后把每组聚合

        reduce 是针对于一整个数据集来进行聚合

      3. reduceByKey 是针对 KV 型数据来进行计算,reduce 可以针对所有类型的数据进行计算

    • reduce 算子是一个shuffle操作吗?

      • Shuffle 操作分为 mapper 和 reducer,mapper 将数据放入 paritioner 的函数计算,求得分为哪个 reducer,后分到对应的 reducer 中

      • reduce 操作并没有 mapper 和 reducer, 因为 reduce 算子会作用于 RDD 中的每一个分区,然后在分区上求得局部结果,最终汇总到 Driver 中求得最终结果

    • RDD中有五大属性,Paritioner 在 Shufle 过程中使用 Partitioner 只有 KV 型的 RDD 才有

    • 作用

      • 对整个结果集规约, 最终生成一条数据, 是整个数据集的汇总

    • 调用

      • reduce( (currValue[T], agg[T]) ? T )

    • 注意点

      • reduce 和 reduceByKey 是完全不同的, reduce 是一个 action, 并不是 Shuffled 操作

      • 本质上 reduce 就是现在每个 partition 上求值, 最终把每个 partition 的结果再汇总

    • code

      @Test
      def reduce(): Unit = {
       ?//注意,函数中传入的curr不是value而是整条数据:("手机", 10)。reduce整体上的结果, 只有一个
       ?//生成的结果类型是(“商品”,price)
       ?val rdd = sc.parallelize(Seq(("手机", 10), ("手机", 15), ("电脑", 20)))
       ?val rdd1 = rdd.reduce((curr, agg)=>("总价",curr._2+agg._2)) // agg 是局部结果,("总价",curr._2+agg)用curr._2+agg._2求总和
       ?println(rdd1)
      }
  • foreach

    • code

      @Test
      def foreach(): Unit = {
       ?// //注意点。item的收集是一个异步的过程,并行执行。所以结果可能不是按顺序来的
       ?val rdd = sc.parallelize(Seq(1, 2, 3))
       ?rdd.foreach(item => println(item))
      }
  • countcountByKey(两个都是求数量的)

    /*
     ?* count 和 countByKey 的结果相聚很远,每次调用Action都会生成一个job,job会运行获取结果
     ?* 所以在两个job之间有大量的log打出,其实就是在启动job
     ?*
     ?* countByKey 的运行结果是 Map(key,value 对应的是 Key的 count)
     ?* 如果要解决数据倾斜的问题,可以通过 countByKey 查看 key 对应的数据总数,从而解决倾斜
     ?* */
    @Test
    def count(): Unit = {
     ?val rdd = sc.parallelize(Seq(("a",1),("b",2),("c",3),("a",4)))
     ?println(rdd.count()) // 4
     ?println(rdd.countByKey()) // Map(a -> 2, b -> 1, c -> 1)
    }

  • first(只获取第一个元素) take(获取前几个元素)takesample(直接拿到结果和之前的sample类似)

    /*
     ?* first take takesample
     ?* 注意点:
     ?* take按顺序获取。takesample是采样获取
     ?* first:一般情况下,action会从所有的分区获取数据,相对来说比较慢,但是first只会获取第一个元素。处理第一个分区,无需处理所有的数据比较快
     ?* */
    @Test
    def take(): Unit = {
     ?val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))
     ?rdd.take(3).foreach(item => println(item)) // 取前3个
     ?println(rdd.first()) // 取第1个
     ?rdd.takeSample(withReplacement = true, 3).foreach(item => println(item))
     //withReplacement = true,有放回抽3个
    }

  1. collect (以数组形式返回元素)

  2. 总结

    1. 算子功能上进行分类:

      1. 转换算子transformation

      2. 动作算子Action

    2. RDD中存放的数据类型:

      1. 基本类型 String 对象

      2. KV 类型

      3. 数字类型

文章来源:https://blog.csdn.net/m0_56181660/article/details/135467230
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。