Spark常见算子汇总
2023-12-13 14:42:57
创建RDD
在Spark中创建RDD的方式分为三种:
- 从外部存储创建RDD
- 从集合中创建RDD
- 从其他RDD创建
textfile
调用SparkContext.textFile()方法,从外部存储中读取数据来创建 RDD
parallelize
调用SparkContext 的 parallelize()方法,将一个存在的集合,变成一个RDD
makeRDD
方法一
/** Distribute a local Scala collection to form an RDD.
*
* This method is identical to `parallelize`.
*/
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
方法二:分配一个本地Scala集合形成一个RDD,为每个集合对象创建一个最佳分区。
/**
* Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item.
*/
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
assertNotStopped()
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
}
?举例
scala> val rdd = sc.parallelize(1 to 6, 2)
val rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:1
scala> rdd.collect()
val res4: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> val seq = List(("American Person", List("Tom", "Jim")), ("China Person", List("LiLei", "HanMeiMei")), ("Color Type", List("Red", "Blue")))
val seq: List[(String, List[String])] = List((American Person,List(Tom, Jim)), (China Person,List(LiLei, HanMeiMei)), (Color Type,List(Red, Blue)))
scala> val rdd2 = sc.makeRDD(seq)
val rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:1
scala> rdd2.partitions.size
val res0: Int = 3
scala> rdd2.foreach(println)
American Person
Color Type
China Person
scala> val rdd1 = sc.parallelize(seq)
val rdd1: org.apache.spark.rdd.RDD[(String, List[String])] = ParallelCollectionRDD[1] at parallelize at <console>:1
scala> rdd1.partitions.size
val res1: Int = 2
scala> rdd2.collect()
val res2: Array[String] = Array(American Person, China Person, Color Type)
scala> rdd1.collect()
val res3: Array[(String, List[String])] = Array((American Person,List(Tom, Jim)), (China Person,List(LiLei, HanMeiMei)), (Color Type,List(Red, Blue)))
scala> var lines = sc.textFile("/root/tmp/a.txt",3)
var lines: org.apache.spark.rdd.RDD[String] = /root/tmp/a.txt MapPartitionsRDD[4] at textFile at <console>:1
scala> lines.collect()
val res6: Array[String] = Array(a,b,c)
scala> lines.partitions.size
val res7: Int = 3
转换算子
flatMap
map
mapPartitions
map与mapPartitions的区别
map: 比如一个partition中有1万条数据;那么你的function要执行和计算1万次。
MapPartitions:一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。
reduceByKey
groupByKey
举例
scala> var lines = sc.textFile("/root/tmp/a.txt",3)
var lines: org.apache.spark.rdd.RDD[String] = /root/tmp/a.txt MapPartitionsRDD[13] at textFile at <console>:1
scala> lines.flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey((a,b)=>a+b).foreach(println)
(c,2)
(b,1)
(d,1)
(a,2)
scala> lines.collect()
val res27: Array[String] = Array(a,b,c, c, a,d)
scala> lines.map(_.split(",")).collect()
val res25: Array[Array[String]] = Array(Array(a, b, c), Array(c), Array(a, d))
scala> lines.flatMap(_.split(",")).collect()
val res26: Array[String] = Array(a, b, c, c, a, d)
行动算子
groupByKey
reduceByKey
aggregateByKey
RDD 的话建议使用 reduceByKey 或者 aggregateByKey 算子来替代掉 groupByKey 算子。 因为 reduceByKey 和 aggregateByKey 算子都会使用用户自定义的函数对每个节点本地的相 同 key 进行预聚合。而 groupByKey 算子是不会进行预聚合的,全量的数据会在集群的各个 节点之间分发和传输,性能相对来说比较差。
SparkSQL 本身的 HashAggregte 就会实现本地预聚合+全局聚合。
重分区算子
coalesce
repartition?
文章来源:https://blog.csdn.net/weixin_40035038/article/details/134825780
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!