Spark Streaming之DStream转换操作
一、无状态转化操作
1、普通的rdd操作
每个批次做自己单独的转化操作,没有状态的记录。批次就是RDD,微批次处理
private val wordCount: DStream[(String, Int)] =
value.map(_.value()) .map((_, 1))
.reduceByKey(_ + _)
wordCount.print()
?结果:实现wordCount
2、Transform
还有通用的Transform操作,即RDD->RDD其中的操作自定义,不局限于提供的map等基本算子
private val strData: DStream[String] =
value.transform(rdd =>
{
rdd.map(
_.value() + "lalala"
)
}
) strData.print()
?结果:实现字符串拼接。中间的wordcount是之前的程序的打印
3、join
两个数据流的关联,两个数据流需要批次大小一致
应用:两个数据流传入的单词统计
启动两个主题生产数据
在hadoop01:
bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic mySparkTopic
在hadoop02:
bin/kafka-console-producer.sh --broker-list hadoop02:9092 --topic MySparkTopic
两个数据流的join
package SparkStream
import SparkSQL.UDF.UdfDemo.getStreamingContext
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object join extends App {
Logger.getLogger("org").setLevel(Level.ERROR)
private val ssc: StreamingContext = getStreamingContext()
ssc.checkpoint("cp") //需要设定检查点路径
//定义 Kafka 参数
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->"hadoop01:9092,hadoop02:9092,hadoop03:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "mySparkTopic",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
//消费两个数据流
private val value: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("mySparkTopic"), kafkaPara)
)
private val value1: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("MySparkTopic"), kafkaPara)
)
//两个数据流关联
private val joinResult: DStream[(String, (String, String))] =
value.map(data=>(data.value(),"ni")).join(
value1.map(data=>(data.value(),"wo"))
)
joinResult.print()
ssc.start()
ssc.awaitTermination()
}
结果:两个主题生产的数据进行join
二、有状态转化操作?
1、updateStateByKey
在历史状态的基础上进行更新,比如刚才的wordCount是每个周期的统计,是无状态转化。
有状态转化就是更新上一个周期的结果,对整个的单词进行统计,不仅仅是那一个周期
private val updateStateResult: DStream[(String, Int)] = value.map(_.value())
.map((_, 1)).updateStateByKey(
(seq: Seq[Int], op: Option[Int]) => {
Option(op.getOrElse(0) + seq.sum)
}
)
updateStateResult.print()
/*
seq是这一批次里的rdd的value
op 是缓冲区的值
Option[A] 是一个类型为 A 的可选值的容器:
如果值存在, Option[A] 就是一个 Some[A]
如果不存在, Option[A] 就是对象 None。Some与None是Option的两个子类
*/
代码操作练习:
package SparkStream
import SparkSQL.UDF.UdfDemo.getStreamingContext
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object MyReceiverStreaming extends App {
Logger.getLogger("org").setLevel(Level.ERROR)
private val ssc: StreamingContext = getStreamingContext()
ssc.checkpoint("cp") //需要设定检查点路径
//定义 Kafka 参数
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->"hadoop01:9092,hadoop02:9092,hadoop03:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "mySparkTopic",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
private val value: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("mySparkTopic"), kafkaPara)
)
private val wordCount: DStream[(String, Int)] =
value.map(_.value())
.map((_, 1))
.reduceByKey(_ + _)
//wordCount.print()
//以上为无状态转化的操作,数据没有累加
private val updateStateResult: DStream[(String, Int)] = value.map(_.value())
.map((_, 1)).updateStateByKey(
(seq: Seq[Int], op: Option[Int]) => {
Option(op.getOrElse(0) + seq.sum)
}
)
updateStateResult.print()
/*
seq是这一批次里的rdd的value
op 是缓冲区的值
Option[A] 是一个类型为 A 的可选值的容器:
如果值存在, Option[A] 就是一个 Some[A]
如果不存在, Option[A] 就是对象 None。Some与None是Option的两个子类
*/
private val strData: DStream[String] = value.transform(rdd => {
rdd.map(
_.value() + "lalala"
)
})
strData.print()
ssc.start()
ssc.awaitTermination()
}
三、窗口操作WindowOperations
Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许
状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
? 窗口时长:计算内容的时间范围;
? 滑动步长:隔多久触发一次计算。
注意:这两者都必须为采集周期大小的整数倍。
(1)window
(1)window(windowLength, slideInterval):
窗口长度为三秒,六秒间隔
value.map(data=>(data.value(),1))
.window(Seconds(3),Seconds(6))
.reduceByKey(_+_).print()
生产数据:
消费结果
窗口长度>滑步长度->有重复数据:
窗口长度:6s,滑步长度:3s,后3s的数据会在第一次计算和第二次计算中重复,第一次计算是1-6s的数据,第二次计算是4-9s的数据
窗口长度缺失数据:
窗口长度:3s,滑步长度:6s,3-6s的数据既不在第一次计算也不再第二次计算中,第一次计算是1-3s的数据,第二次计算是6-9s的数据
?(2)countByWindow
countByWindow(windowLength, slideInterval),对窗口内的元素做计数
相当于 window+count
// (2)countByWindow(windowLength, slideInterval)
value.map(data=>(data.value(),1)).window(Seconds(6),Seconds(6)).count().print()
value.map(data=>(data.value(),1)).countByWindow(Seconds(6),Seconds(6)).print()
?两者结果相同
(3)reduceByWindow?
(3)reduceByWindow(func, windowLength, slideInterval),相当于window+reduce,将一个窗口内的数据聚合到一起,自定义聚合方式
// (3)reduceByWindow(func, windowLength, slideInterval)
value.map(_.value()).reduceByWindow(
(x,y)=>x+"---"+y
,Seconds(6),Seconds(6)).print()
?
(4)reduceByKeyAndWindow
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
(5)reduceByKeyAndWindow
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
相当于window+reduceByKey
两个函数参数,第一个是和后面新增的数据的操作,第二个是和前面减掉的数据的操作
比如累加,第一个就是相加,第二个就是x-y
比如字符串拼接,第一个就是当前字符串拼接后面的字符串,第二个就是当前字符串去掉前面的划窗划走的字符串
四、DStream 输出
类似于RDD的collect、foreach
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。
与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。
如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动。
输出操作如下:
? print():在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。
这用于开发和调试。在 Python API 中,同样的操作叫 print()。
? saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个 DStream 的内容。
每一批次的存储文件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”。
? saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中的数据保存为SequenceFiles .
每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。
? saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的数据保存为 Hadoop files.
每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。
? foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个RDD。
其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将RDD 存入文件或者通过网络将其写入数据库。
通用的输出操作 foreachRDD(),它用来对 DStream 中的 RDD 运行任意计算。这和 transform() 有些类似,都可以让我们访问任意 RDD。
在 foreachRDD()中,可以重用我们在 Spark 中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。
DStream 输出的具体练习见后续博客~
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!