Spark Streaming之DStream转换操作

2023-12-14 17:36:29

一、无状态转化操作

1、普通的rdd操作

每个批次做自己单独的转化操作,没有状态的记录。批次就是RDD,微批次处理

private val wordCount: DStream[(String, Int)] = 
value.map(_.value()) .map((_, 1)) 
.reduceByKey(_ + _) 
wordCount.print()

?结果:实现wordCount

2、Transform

还有通用的Transform操作,即RDD->RDD其中的操作自定义,不局限于提供的map等基本算子

private val strData: DStream[String] = 
value.transform(rdd => 
    { 
        rdd.map( 
            _.value() + "lalala" 
        ) 
    }
) strData.print()

?结果:实现字符串拼接。中间的wordcount是之前的程序的打印

3、join

两个数据流的关联,两个数据流需要批次大小一致

应用:两个数据流传入的单词统计

启动两个主题生产数据

在hadoop01:
bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic mySparkTopic
在hadoop02:
bin/kafka-console-producer.sh --broker-list hadoop02:9092 --topic MySparkTopic

两个数据流的join

package SparkStream

import SparkSQL.UDF.UdfDemo.getStreamingContext
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object join  extends App {
 Logger.getLogger("org").setLevel(Level.ERROR)
 private val ssc: StreamingContext = getStreamingContext()
 ssc.checkpoint("cp") //需要设定检查点路径
 //定义 Kafka 参数
 val kafkaPara: Map[String, Object] = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->"hadoop01:9092,hadoop02:9092,hadoop03:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "mySparkTopic",
  "key.deserializer" ->  "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" ->  "org.apache.kafka.common.serialization.StringDeserializer"
 )
 //消费两个数据流
 private val value: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Set("mySparkTopic"), kafkaPara)
 )
 private val value1: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Set("MySparkTopic"), kafkaPara)
 )

 //两个数据流关联
 private val joinResult: DStream[(String, (String, String))] =
  value.map(data=>(data.value(),"ni")).join(
   value1.map(data=>(data.value(),"wo"))
  )
 joinResult.print()
 ssc.start()
 ssc.awaitTermination()
}

结果:两个主题生产的数据进行join

二、有状态转化操作?

1、updateStateByKey

在历史状态的基础上进行更新,比如刚才的wordCount是每个周期的统计,是无状态转化。

有状态转化就是更新上一个周期的结果,对整个的单词进行统计,不仅仅是那一个周期

private val updateStateResult: DStream[(String, Int)] = value.map(_.value())
 .map((_, 1)).updateStateByKey(
 (seq: Seq[Int], op: Option[Int]) => {
  Option(op.getOrElse(0) + seq.sum)
 }
)
updateStateResult.print()
/*
 seq是这一批次里的rdd的value
 op 是缓冲区的值
 Option[A] 是一个类型为 A 的可选值的容器:
 如果值存在, Option[A] 就是一个 Some[A]
  如果不存在, Option[A] 就是对象 None。Some与None是Option的两个子类
*/

代码操作练习:

package SparkStream

import SparkSQL.UDF.UdfDemo.getStreamingContext
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object MyReceiverStreaming  extends App {
 Logger.getLogger("org").setLevel(Level.ERROR)
 private val ssc: StreamingContext = getStreamingContext()
 ssc.checkpoint("cp") //需要设定检查点路径
 //定义 Kafka 参数
 val kafkaPara: Map[String, Object] = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->"hadoop01:9092,hadoop02:9092,hadoop03:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "mySparkTopic",
  "key.deserializer" ->  "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" ->  "org.apache.kafka.common.serialization.StringDeserializer"
 )
 
 private val value: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Set("mySparkTopic"), kafkaPara)
 )
 private val wordCount: DStream[(String, Int)] =
  value.map(_.value())
   .map((_, 1))
   .reduceByKey(_ + _)
 //wordCount.print()
 //以上为无状态转化的操作,数据没有累加
 private val updateStateResult: DStream[(String, Int)] = value.map(_.value())
  .map((_, 1)).updateStateByKey(
  (seq: Seq[Int], op: Option[Int]) => {
   Option(op.getOrElse(0) + seq.sum)
  }
 )
 updateStateResult.print()
 /*
  seq是这一批次里的rdd的value
  op 是缓冲区的值
  Option[A] 是一个类型为 A 的可选值的容器:
  如果值存在, Option[A] 就是一个 Some[A]
   如果不存在, Option[A] 就是对象 None。Some与None是Option的两个子类
 */
 private val strData: DStream[String] = value.transform(rdd => {
  rdd.map(
   _.value() + "lalala"
  )
 })
 strData.print()
 ssc.start()
 ssc.awaitTermination()
}

三、窗口操作WindowOperations

Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许

状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

? 窗口时长:计算内容的时间范围;

? 滑动步长:隔多久触发一次计算。

注意:这两者都必须为采集周期大小的整数倍。

(1)window

(1)window(windowLength, slideInterval):

窗口长度为三秒,六秒间隔

value.map(data=>(data.value(),1))
    .window(Seconds(3),Seconds(6))
    .reduceByKey(_+_).print()

生产数据:

消费结果

窗口长度>滑步长度->有重复数据:

窗口长度:6s,滑步长度:3s,后3s的数据会在第一次计算和第二次计算中重复,第一次计算是1-6s的数据,第二次计算是4-9s的数据

窗口长度缺失数据:

窗口长度:3s,滑步长度:6s,3-6s的数据既不在第一次计算也不再第二次计算中,第一次计算是1-3s的数据,第二次计算是6-9s的数据

?(2)countByWindow

countByWindow(windowLength, slideInterval),对窗口内的元素做计数

相当于 window+count

//  (2)countByWindow(windowLength, slideInterval)
value.map(data=>(data.value(),1)).window(Seconds(6),Seconds(6)).count().print()
value.map(data=>(data.value(),1)).countByWindow(Seconds(6),Seconds(6)).print()

?两者结果相同

(3)reduceByWindow?

(3)reduceByWindow(func, windowLength, slideInterval),相当于window+reduce,将一个窗口内的数据聚合到一起,自定义聚合方式

//  (3)reduceByWindow(func, windowLength, slideInterval)
value.map(_.value()).reduceByWindow(
 (x,y)=>x+"---"+y
 ,Seconds(6),Seconds(6)).print()

?

(4)reduceByKeyAndWindow

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

(5)reduceByKeyAndWindow

(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

相当于window+reduceByKey

两个函数参数,第一个是和后面新增的数据的操作,第二个是和前面减掉的数据的操作

比如累加,第一个就是相加,第二个就是x-y

比如字符串拼接,第一个就是当前字符串拼接后面的字符串,第二个就是当前字符串去掉前面的划窗划走的字符串

四、DStream 输出

类似于RDD的collect、foreach

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。

与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。

如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动。

输出操作如下:

? print():在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。

这用于开发和调试。在 Python API 中,同样的操作叫 print()。

? saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个 DStream 的内容。

每一批次的存储文件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”。

? saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中的数据保存为SequenceFiles .

每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。

? saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的数据保存为 Hadoop files.

每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。

? foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个RDD。

其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将RDD 存入文件或者通过网络将其写入数据库。

通用的输出操作 foreachRDD(),它用来对 DStream 中的 RDD 运行任意计算。这和 transform() 有些类似,都可以让我们访问任意 RDD。

在 foreachRDD()中,可以重用我们在 Spark 中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。

DStream 输出的具体练习见后续博客~

文章来源:https://blog.csdn.net/qq_40607631/article/details/132743262
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。