基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
2023-12-13 05:58:46
04:数据源
-
目标:了解数据源的格式及实现模拟数据的生成
-
路径
- step1:数据格式
- step2:数据生成
-
实施
-
数据格式
消息时间 发件人昵称 发件人账号 发件人性别 发件人IP 发件人系统 发件人手机型号 发件人网络制式 发件人GPS 收件人昵称 收件人IP 收件人账号 收件人系统 收件人手机型号 收件人网络制式 收件人GPS 收件人性别 消息类型 双方距离 消息 msg_time sender_nickyname sender_account sender_sex sender_ip sender_os sender_phone_type sender_network sender_gps receiver_nickyname receiver_ip receiver_account receiver_os receiver_phone_type receiver_network receiver_gps receiver_sex msg_type distance message 2020/05/08 15:11:33 古博易 14747877194 男 48.147.134.255 Android 8.0 小米 Redmi K30 4G 94.704577,36.247553 莱优 97.61.25.52 17832829395 IOS 10.0 Apple iPhone 10 4G 84.034145,41.423804 女 TEXT 77.82KM 天涯海角惆怅渡,牛郎织女隔天河。佛祖座前长顿首,只求共度一百年。 -
数据生成
-
创建原始文件目录
mkdir /export/data/momo_init
-
上传模拟数据程序
cd /export/data/momo_init rz
-
创建模拟数据目录
mkdir /export/data/momo_data
-
运行程序生成数据
-
语法
java -jar /export/data/momo_init/MoMo_DataGen.jar 原始数据路径 模拟数据路径 随机产生数据间隔ms时间
-
测试:每500ms生成一条数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \ /export/data/momo_init/MoMo_Data.xlsx \ /export/data/momo_data/ \ 500
-
结果:生成模拟数据文件MOMO_DATA.dat,并且每条数据中字段分隔符为\001
-
-
-
-
小结
- 了解数据源的格式及实现模拟数据的生成
05:技术架构及技术选型
-
目标:掌握实时案例的技术架构及技术选型
-
路径
- step1:需求分析
- step2:技术选型
- step3:技术架构
-
实施
-
需求分析
- 离线存储计算
- 提供离线T + 1的统计分析
- 提供离线数据的即时查询
- 实时存储计算
- 提供实时统计分析
- 离线存储计算
-
技术选型
- 离线
- 数据采集:Flume
- 离线存储:Hbase
- 离线分析:Hive:复杂计算
- 即时查询:Phoenix:高效查询
- 实时
- 数据采集:Flume
- 实时存储:Kafka
- 实时计算:Flink
- 实时应用:MySQL + FineBI 或者 Redis + JavaWeb可视化
- 离线
-
技术架构
- 为什么不直接将Flume的数据给Hbase,而统一的给了Kafka,再由Kafka到Hbase?
- 避免高并发写导致机器负载过高、实现架构解耦、实现异步高效
- 保证数据一致性
- 为什么不直接将Flume的数据给Hbase,而统一的给了Kafka,再由Kafka到Hbase?
-
-
小结
- 掌握实时案例的技术架构及技术选型
06:Flume的回顾及安装
-
目标:回顾Flume基本使用及实现Flume的安装测试
-
路径
- step1:Flume回顾
- step2:Flume的安装
- step3:Flume的测试
-
实施
-
Flume的回顾
- 功能:实时对文件或者网络端口进行数据流监听采集
- 场景:文件实时采集
- 开发
- step1:先开发一个配置文件:properties【K=V】
- step2:运行这个文件即可
- 组成
- Agent:一个Agent就是一个Flume程序
- Source:负责监听数据源,将数据源的动态数据变成每一条Event数据,将Event数据流放入Channel
- Channel:负责临时存储Source发送过来的数据,供Sink来取数据
- Sink:负责从Channel拉取数据写入目标地
- Event:代表一条数据对象
- head:Map集合[KV]
- body:byte[]
-
Flume的安装
-
上传安装包
cd /export/software/ rz
-
解压安装
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/ cd /export/server mv apache-flume-1.9.0-bin flume-1.9.0-bin
-
修改配置
#集成HDFS,拷贝HDFS配置文件 cd /export/server/flume-1.9.0-bin cp /export/server/hadoop/etc/hadoop/core-site.xml ./conf/ #修改Flume环境变量 cd /export/server/flume-1.9.0-bin/conf/ mv flume-env.sh.template flume-env.sh vim flume-env.sh
#修改22行 export JAVA_HOME=/export/server/jdk1.8.0_65 #修改34行 export HADOOP_HOME=/export/server/hadoop-3.3.0
-
删除Flume自带的guava包,替换成Hadoop的
cd /export/server/flume-1.9.0-bin rm -rf lib/guava-11.0.2.jar cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/
-
创建目录
cd /export/server/flume-1.9.0-bin #程序配置文件存储目录 mkdir usercase #Taildir元数据存储目录 mkdir position
-
-
Flume的测试
-
需求:采集聊天数据,写入HDFS
-
分析
- Source:taildir:动态监听多个文件实现实时数据采集
- Channel:mem:将数据缓存在内存
- Sink:hdfs
-
开发
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
# define a1 a1.sources = s1 a1.channels = c1 a1.sinks = k1 #define s1 a1.sources.s1.type = TAILDIR #指定一个元数据记录文件 a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json #将所有需要监控的数据源变成一个组 a1.sources.s1.filegroups = f1 #指定了f1是谁:监控目录下所有文件 a1.sources.s1.filegroups.f1 = /export/data/momo_data/.* #指定f1采集到的数据的header中包含一个KV对 a1.sources.s1.headers.f1.type = momo a1.sources.s1.fileHeader = true #define c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 1000 #define k1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d a1.sinks.k1.hdfs.fileType = DataStream #指定按照时间生成文件,一般关闭 a1.sinks.k1.hdfs.rollInterval = 0 #指定文件大小生成文件,一般120 ~ 125M对应的字节数 a1.sinks.k1.hdfs.rollSize = 102400 #指定event个数生成文件,一般关闭 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = momo a1.sinks.k1.hdfs.fileSuffix = .log a1.sinks.k1.hdfs.useLocalTimeStamp = true #bound a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动HDFS
start-dfs.sh
-
运行Flume
cd /export/server/flume-1.9.0-bin bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console
-
运行模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \ /export/data/momo_init/MoMo_Data.xlsx \ /export/data/momo_data/ \ 100
-
查看结果
-
-
-
小结
- 回顾Flume基本使用及实现Flume的安装测试
07:Flume采集程序开发
-
目标:实现案例Flume采集程序的开发
-
路径
- step1:需求分析
- step2:程序开发
- step3:测试实现
-
实施
-
需求分析
-
需求:采集聊天数据,实时写入Kafka
-
Source:taildir
-
Channel:mem
-
Sink:Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.topic = mytopic a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.sinks.k1.kafka.producer.compression.type = snappy
-
-
程序开发
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
# define a1 a1.sources = s1 a1.channels = c1 a1.sinks = k1 #define s1 a1.sources.s1.type = TAILDIR #指定一个元数据记录文件 a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json #将所有需要监控的数据源变成一个组 a1.sources.s1.filegroups = f1 #指定了f1是谁:监控目录下所有文件 a1.sources.s1.filegroups.f1 = /export/data/momo_data/.* #指定f1采集到的数据的header中包含一个KV对 a1.sources.s1.headers.f1.type = momo a1.sources.s1.fileHeader = true #define c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 1000 #define k1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = MOMO_MSG a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092 a1.sinks.k1.kafka.flumeBatchSize = 10 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 100 #bound a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
测试实现
-
启动Kafka
start-zk-all.sh start-kafka.sh
-
创建Topic
kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
注意:Kafka2.11版本用–zookeeper 替代
kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --zookeeper node01:9092 -
列举
kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
-
启动消费者
kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
-
启动Flume程序
cd /export/server/flume-1.9.0-bin bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
-
启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \ /export/data/momo_init/MoMo_Data.xlsx \ /export/data/momo_data/ \ 50
-
观察结果
-
-
-
小结
- 实现案例Flume采集程序的开发
文章来源:https://blog.csdn.net/xianyu120/article/details/133811713
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!