step2: Developing the application that distributes data to Kafka in real time


The code will change as development progresses, with bug fixes, optimizations, new features, and so on.

The latest code is available on Gitee: SparkStreamingDemand – a complete record of the Spark Streaming project's development code.


Goal: simulate the generation of page-click logs and send them to Kafka.

1. Preparing a single record

1.1 Log format

timestamp  city code  city name  province  user ID  ad ID

1.2 System time

System.currentTimeMillis().toString
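
System.currentTimeMillis() yields raw epoch milliseconds. If a human-readable timestamp is preferred in the log, it could be formatted instead; a minimal sketch (the pattern string is an assumption, the original simply uses the millisecond value):

import java.text.SimpleDateFormat
import java.util.Date

// Format the current time as "yyyy-MM-dd HH:mm:ss" instead of epoch milliseconds
val formattedTime: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())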

1.3 Generating the city code, city name, and province

1.3.1 Building the data List

val cityInfoList: List[(String, String, String)] = List(
 ("0551", "合肥", "安徽"),
 ("0552", "蚌埠", "安徽"),
 ("0553", "芜湖", "安徽"),
 ("0554", "淮南", "安徽"),
 ("0555", "马鞍山", "安徽"),
 ("0556", "安庆", "安徽"),
 ("0557", "宿州", "安徽"),
 ("0558S", "阜阳", "安徽"),
 ("0558K", "亳州", "安徽"),
 ("0559", "黄山", "安徽"),
 ("0550", "滁州", "安徽"),
 ("0561", "淮北", "安徽"),
 ("0562", "铜陵", "安徽"),
 ("0563", "宣城", "安徽"),
 ("0564", "六安", "安徽"),
 ("0566", "池州", "安徽")
)

1.3.2 Alternative approaches

One option is to load a data file, wrap each line in a case class within an RDD, and collect it into an array (see the sketch below).

Another option is to load the data into a MySQL table and read it from there.
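
A minimal sketch of the file-based alternative, assuming a local file input/city_info.txt with space-separated lines such as "0551 合肥 安徽" and an existing SparkContext named sc (the file path, separator, and case class are assumptions, not part of the original code):

// Case class wrapping one city record
case class CityInfo(cityCode: String, cityName: String, province: String)

// Load the file, parse each line into a CityInfo, then collect the RDD to a local array
val cityInfoArray: Array[CityInfo] = sc.textFile("input/city_info.txt")
  .map(line => {
    val fields = line.split(" ")
    CityInfo(fields(0), fields(1), fields(2))
  })
  .collect()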

1.3.3 Randomly selecting a city

Note: drawing a separate random index for each field would mix the code, name, and province of different cities, and nextInt(15) can never reach the last of the 16 entries. Select one random tuple and take all three fields from it:

val cityInfo: (String, String, String) = cityInfoList(random.nextInt(cityInfoList.size))

City code: cityInfo._1

City name: cityInfo._2

Province: cityInfo._3

1.4 Generating a random user ID

-> length fixed at 8, digits only

import scala.util.Random

// Returns a string of `length` random digits with an optional custom head and tail
// (both default to the empty string). length = 8 yields an 8-digit user ID.
// Note: random.nextInt(9) + 1 produces a digit in 1..9, so 0 never appears.
def randomIntList(head: String = "", tail: String = "", length: Int): String = {
  val random = new Random()
  var result = "" + head
  for (_ <- 1 to length) {
    result += random.nextInt(9) + 1
  }
  result += tail
  result
}
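
A quick usage example with a named argument (calling with only length works because head and tail have defaults; the output shown is purely illustrative):

val userId: String = randomIntList(length = 8) // e.g. "58372619"; the actual value is random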

1.5 Generating a random ad ID

-> length 12, mixing digits and letters, with digits weighted 9 and letters weighted 1

// Returns a string of `length` random characters mixing digits and lowercase letters,
// with an optional custom head and tail. Digits are weighted 9 and letters 1,
// i.e. each position is a digit with probability 9/10 and a letter with probability 1/10.
def randomLetterAndIntList(head: String = "", tail: String = "", length: Int): String = {
  val letter: Array[String] = Array("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m",
    "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z")
  var result = "" + head
  val random = new Random()
  for (_ <- 1 to length) {
    if (random.nextInt(10) < 9) {
      result += random.nextInt(10)         // a random digit 0..9
    } else {
      result += letter(random.nextInt(26)) // a random lowercase letter a..z
    }
  }
  result += tail
  result
}
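
Section 2.2 below iterates over productData, which the post does not show. A minimal sketch of what it could look like, assembling 50 space-separated log lines per call from the pieces built in step 1 (the record count and the space separator are assumptions based on the log format in 1.1 and the heading of 2.2):

// Assemble one batch of simulated page-click log lines:
// timestamp cityCode cityName province userId adId
def productData: List[String] = {
  val random = new Random()
  (1 to 50).map(_ => {
    val cityInfo = cityInfoList(random.nextInt(cityInfoList.size))
    val timestamp = System.currentTimeMillis().toString
    val userId = randomIntList(length = 8)
    val adId = randomLetterAndIntList(length = 12)
    s"$timestamp ${cityInfo._1} ${cityInfo._2} ${cityInfo._3} $userId $adId"
  }).toList
}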

2. Generating data and sending it to Kafka

2.1 Obtaining a KafkaProducer

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

def createKafkaProducer(): KafkaProducer[String, String] = {
  val prop = new Properties()
  // Broker address and key/value serializers
  prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092")
  prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](prop)
  producer
}

2.2 Generating 50 log records every three seconds

import org.apache.kafka.clients.producer.ProducerRecord

def productDataToKafka(): Unit = {
  // createKafkaProducer() from section 2.1, exposed through the project's dao object
  val producer: KafkaProducer[String, String] = dao.createKafkaProducer()
  while (true) {
    // productData supplies the batch of simulated log lines (see step 1)
    productData.foreach(
      data => {
        // Send each record to the Kafka topic
        val record = new ProducerRecord[String, String]("mySparkTopic", data)
        producer.send(record)
        println(data)
      }
    )
    // Wait three seconds before the next batch
    Thread.sleep(3000)
  }
}

3. Running the program
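
The post does not show the entry point; a minimal sketch of how the generator could be started (the object name is an assumption, productDataToKafka comes from section 2.2):

object MockDataApplication {
  def main(args: Array[String]): Unit = {
    // Start the endless generate-and-send loop from section 2.2
    productDataToKafka()
  }
}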

That wraps up Day 2. Day 3 will cover the design approach and template for the application that consumes data from Kafka in real time.

Gitee repository: https://gitee.com/sawa0725/spark-streaming-demand

The code is still under development.

Original article: https://blog.csdn.net/qq_40607631/article/details/132788836