step2 实时分发数据到kafka的application的开发
2023-12-24 04:21:20
开发过程中代码会随着开发进度进行变更,解决bug和进行优化、功能新增等
最新的代码可以再Gitee中获取:SparkStreamingDemand: 完整的sparkStreaming项目开发代码记录
step2?实时分发数据到kafka的application的开发
模拟生成页面点击日志,传输到kafka
1、 单条数据准备
1.1 日志格式
时间 城市编码 城市名称 对应省份 用户ID 广告ID
1.2、系统时间
System.currentTimeMillis().toString
1.3、生成城市编码、城市名称、对应省份?
1.3.1 生成数据List
val cityInfoList: List[(String, String, String)] = List(
("0551", "合肥", "安徽"),
("0552", "蚌埠", "安徽"),
("0553", "芜湖", "安徽"),
("0554", "淮南", "安徽"),
("0555", "马鞍山", "安徽"),
("0556", "安庆", "安徽"),
("0557", "宿州", "安徽"),
("0558S", "阜阳", "安徽"),
("0558K", "亳州", "安徽"),
("0559", "黄山", "安徽"),
("0550", "滁州", "安徽"),
("0561", "淮北", "安徽"),
("0562", "铜陵", "安徽"),
("0563", "宣城", "安徽"),
("0564", "六安", "安徽"),
("0566", "池州", "安徽")
)
1.3.2 备用方案
?可以考虑加载数据文件封装成样例类到RDD中再collect成数组
可以考虑数据录入mysql表再读取?
1.3.3?随机获取的城市对应的
城市编码:cityInfoList(random.nextInt(15))._1
城市名称:cityInfoList(random.nextInt(15))._2
对应省份:cityInfoList(random.nextInt(15))._3
1.4 生成随机的用户id
->控制长度为8,全数字
//根据给定的长度返回随机数,可以自定义头尾,头尾默认为空
//length=8=>八个随机数拼成的字符串
def randomIntList(head: String = "", tail: String = "", length: Int): String = {
val random = new Random()
var result=""+head
for (i<- 1 to length){
result+=random.nextInt(9)+1
}
result += tail
result
}
1.5 生成随机的广告地址
->长度为12,包含字母和字符串,数字权重为9,字母为1
//返回自定义长度的随机字母+数字的字符串,可以自定义头尾,数字权重为9,字母为1
def randomLetterAndIntList(head: String = "", tail: String = "",length: Int): String = {
val letter: Array[String] = Array("a", "b", "c", "d", "e", "f", "g", "h", "r", "j", "k", "l", "m", "k", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z")
var result = "" + head
val random = new Random()
for (i <- 1 to length) {
if (random.nextInt(9) < 8) {
result += random.nextInt(9)
}else{
result += letter(random.nextInt(25))
}
}
result+=tail
result
}
2 数据生成输送到kafka
2 .1 获取kafkaProductor
def createKafkaProducer(): KafkaProducer[String, String] = {
val prop = new Properties()
// 添加配置
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092")
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](prop)
producer
}
2.2 每隔三秒生成50条日志信息
def productDataToKafka(): Unit = {
val producer: KafkaProducer[String, String] = dao.createKafkaProducer()
while(true){
productData.foreach(
data=>{
// 向Kafka中生成数据
val record = new ProducerRecord[String, String]("mySparkTopic", data)
producer.send(record)
println(data)
}
)
Thread.sleep(3000)
}
}
3 程序运行
Day2到此结束,Day3将介绍实时从kafka获取数据的application的开发思路和模板~
Gitee地址:https://gitee.com/sawa0725/spark-streaming-demand
代码还在开发中~?
文章来源:https://blog.csdn.net/qq_40607631/article/details/132788836
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!