SparkStreaming常见transformation算子
1.4 SparkStreaming常见transformation算子
1.4.1 常见的算子操作
对cogroup做一简单的说明:cogroup就是groupByKey的另外一种变体,groupByKey是操作一个K-V键值对,而cogroup一次操作两个,有点像join,不同之处在于返回值结果:
#cogroup() val ds1:DStream[(K, V)] val ds2:DStream[(K, w)] val cg:DStream[(K, (Iterable[V], Iterable[W]))] = ds1.cogroup(ds2) #groupByKey() val gp:DStream[(K, Iterable[V])] = ds1.groupByKey()
1.4.2 updateStateByKey
updateStateByKey(func) 根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的Dstream。 ? 人话:统计截止到目前为止key的状态。 ? 通过分析,我们需要清楚:在这个操作中需要两个数据,一个是key的前置状态,一个是key的新增(当前批次的数据);还有历史数据(前置状态)得需要存储在磁盘,不应该保存在内存中。 ? 同时key的前置状态可能有可能没有。
案例实现——wordcount
代码实现
package com.qianfeng.sparkstreaming ? import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} ? /** * 有状态计算 */ object Demo04_WCWithUpdateStataByKey { ?def main(args: Array[String]): Unit = { ? ?val conf = new SparkConf() ? ? .setAppName("WordCountUpdateStateByKey") ? ? .setMaster("local[*]") ? ? ?val duration = Seconds(5) ? ?val ssc = new StreamingContext(conf, duration) ? ?ssc.checkpoint("/Users/liyadong/data/sparkdata/streamingdata/chk1") ?//checkpoint监测点设置 ? ?val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666) ? ?//数据加工 ? ?val pairs:DStream[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_, 1)) ? ?//有状态累加 ? ?val usb:DStream[(String, Int)] = pairs.updateStateByKey(updateFunc) ? ?usb.print() ? ?//启动流程序 ? ?ssc.start() ? ?ssc.awaitTermination() } ? ?/* ? ? ?状态更新函数 ? ? ?根据key的前置状态和key的最新值,聚合得到截止到目前为止key的状态 ? ? ? ? ?seq:为当前key的对应值列表 ? ? ? ? ?option为key对应的历史值 ? */ ?def updateFunc(seq: Seq[Int], option: Option[Int]): Option[Int] = { ? ?println("option:" + option + ?"> seq: " + seq.mkString("[", ",", "]")) ? ?/* ? ? ? var sum = 0 ? ? ? ? ? ?for(i <- seq) sum += i- ? ? ? ? ? ?if(option.isDefined) { ? ? ? ? ? ? ? ?sum += option.get ? ? ? ? ? ?} ? ? ? ? ? ?Option(sum)*/ ? ?Option(seq.sum + option.getOrElse(0)) } }
Spark Streaming是Apache Spark的一个扩展模块,用于实现可扩展、高吞吐量和容错的实时数据流处理。在Spark Streaming中,常见的数据转换算子(transformation)用于对实时数据流进行各种高级操作,下面列举了一些常用的转换算子:
1. **map**:对DStream中的每个元素应用一个函数,返回一个由应用该函数后的元素组成的新DStream。
2. **flatMap**:类似于map,但是用于处理多维数据,如将一个DStream中的每个元素替换为另一个DStream。
3. **filter**:根据给定的条件筛选DStream中的元素,返回一个新的DStream,其中只包含符合条件的元素。
4. **reduceByKey**:将DStream中的元素按照某个键进行分组,然后对每个组内的元素应用一个reduce函数,以减少数据量。
5. **join**:将两个DStream按照某个共同的键进行连接,生成一个新的DStream,其中包含所有匹配的元素对。
6. **window**:对DStream中的元素进行滑动窗口分割,每个窗口内的元素可以应用各种计算,如reduce、count等。
7. **countByValue**:统计DStream中每个元素出现的次数。
8. **countByKey**:对DStream中的元素按照某个键进行分组,并统计每个组内的元素数量。
9. **saveAsTextFiles**:将DStream中的元素保存到文件系统中的文本文件中。
10. **foreachRDD**:对DStream中的每个RDD执行一个函数,该函数通常用于将数据输出到外部系统,如数据库或Redis。
这些算子可以组合使用,以构建复杂的流处理应用程序。通过这些算子的作用,Spark Streaming可以处理来自不同数据源的数据,如Kafka、Flume、ZeroMQ等,并将处理结果输出到不同的系统中,如HDFS、Redis或数据库。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!