Flink实时电商数仓(九)
2023-12-29 17:34:03
用户注册汇总表
需求分析
- 统计各窗口的注册用户数,写入Doris
思路分析
- 读取kafka用户注册主题数据
- 转换数据结构
string -> JSONObject->javaBean - 使用user_info表中的数据代表用户注册
- 设置水位线
- 开窗聚合
- 写入Doris
具体实现
- 创建用户注册统计类继承BaseApp,设置端口,并行度,kafka消费者组,kafka主题(Topic_user_register)
- 启动zookeeper, HDFS, kafka, maxwell等框架
- 测试能够收到数据
stream.print() - 数据清洗过滤,并且转换数据结构为javaBean
JSONObject.parseObject(value);转换格式json.getString();获取对应字段- 判断对应字段是否为空,不为空则
out.collect()写出
- 添加水位线
assignTimestampsAndWatermark()- 使用
WatermarkStrategy.<泛型>乱序流 DateFormatUtil.dateTimeToTs(element.getCreateTime());提取数据中的时间
- 分组开窗聚合
- 启动doris, 在hadoop102:8030打开web页面
- 在doris页面建立相应的表格
- 创建对应的doris sink
context.window()获取窗口windowwindow.getStart()和window.getEnd()
- 写出到doris,
stream.sinkTo(doris sink);
用户加购汇总表
需求分析
统计各窗口加购独立用户数,写入Doris
思路分析
和上面一样
具体实现
- 数据的清洗过滤,判断user_id和ts不能为空
- 使用try-catch包裹转换判断代码
- 修改ts的位数,原先是10位的秒级单位,*1000更改为毫秒级
- 添加水位线,获取数据中的ts
- 水位线可以保证数据是有序到达的
- 按照user_id进行分组
- 判断是否为独立用户
- 创建独立用户加购类
CartAddUuBean - 在open方法中存储用户上次登录日期
lastLoginDtState- 设置状态的生存时间:
lastLoginDtDesc.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).builder)
- 设置状态的生存时间:
- 在processElement方法中,判断当前数据的时间和状态中的上次登录时间
- 如果上次登录时间为空或者上次登录时间不等于今天,就是独立用户
lastLoginDtState.update(curDt);更新当前的状态- 如果是独立访客,才需要out.collect()写出
- 创建独立用户加购类
- 开窗聚合
v1.set(v1.get + v2.get)对度量值进行聚合TimeWindow window = context.window()获取窗口信息
- 测试开窗聚合信息是否完成
- 写出到Doris,
.map(转换为蛇形字符串) .sinkTo(doris sink);
[gitee仓库地址:(https://gitee.com/langpaian/gmall2023-realtime)
文章来源:https://blog.csdn.net/qq_44273739/article/details/135282804
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!