Flink实时电商数仓(八)
2023-12-29 17:32:29
用户域登录各窗口汇总表
- 主要任务:从kafka页面日志主题读取数据,统计
- 七日回流用户:之前活跃的用户,有一段时间不活跃了,之后又开始活跃,称为回流用户
- 当日独立用户数:同一个用户当天重复登录,只算作一个独立用户。
思路分析
- 读取kafka页面主题数据
- 转换数据结构:
String -> JSONObject
- 过滤数据,uid不为null
- 登录的两种情况
- 用户打开应用后自动登录
- 用户打印应用后没有登录,浏览后跳转到登录页面
- 过滤条件:
- uid不为null且last_page_id is null
- last_page_id = login
- 登录的两种情况
- 设置水位线
- 按照uid分组
- 统计回流用户数和独立用户数
- 开窗聚合
- 写入doris
具体实现
- 设置端口、并行度、消费者组、kafka主题
- 读取dwd页面主题数据
-stream.print()
- 对数据进行清洗过滤:uid不为空
stream.flatMap()
使用flatMap过滤new FlatMapFunction<>(){}
在该方法内部转换为JSONObject
, 并且获取uid和lastPageId, try-catch这段代码- 判断是否满足思路分析中的条件,如果中途发生异常,直接catch后打印到控制台清理掉即可。
- 先注册水位线
jsonObjStream.assignTimestampAndWatermark
new SerializableTimestampAssigner<>
, 提取数据中的ts
- 按照uid分组
stream.keyby()
按照uid进行分组
- 判断独立用户和回流用户
- 创建
UserLoginBean
, 使用状态保存用户的登录信息 - 在open方法中,
getRuntimeContext().getState(new ValueStateDescriptor<>("last_login_dt",String.class))
创建状态记录用户上一次的登录时间 - 在
processElement
方法中比较当前登录的日期和状态存储的日期- 如果
lastLoginDt==null
是新用户 - 如果不为空,判断上次登录时间和当前时间的差值是否大于7天;如果大于7天,说明是回流用户。
- 如果小于7天,还需要判断上次登录时间是否是今天,如果不是今天,则说明该用户本次是独立用户。
- 如果
- 创建
- 开窗聚合
- 使用滚动窗口开窗聚合
- 在
reduce
算子中写聚合逻辑 - 在
process
算子中获取窗口信息
- 写入doris
- 创建
doris sink
,写出到doris
- 创建
核心代码
public static void main(String[] args) {
new DwsUserUserLoginWindow().start(10024,4,"dws_user_user_login_window", Constant.TOPIC_DWD_TRAFFIC_PAGE);
}
@Override
public void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {
//1.读取dwd页面数据
//stream.print();
//2. 对数据进行清洗过滤
SingleOutputStreamOperator<JSONObject> jsonObjStream = etl(stream);
//3. 注册水位线
SingleOutputStreamOperator<JSONObject> withWatermarkStream = addWatermark(jsonObjStream);
//4. 按照uid分组
KeyedStream<JSONObject, String> keyedStream = getKeyedStream(withWatermarkStream);
//5. 判断独立用户和回流用户
SingleOutputStreamOperator<UserLoginBean> processedStream = getUserLoginBeanStream(keyedStream);
//processedStream.print();
//开窗聚合
SingleOutputStreamOperator<UserLoginBean> reducedStream = getReducedStream(processedStream);
//reducedStream.print();
//写入Doris
reducedStream.map(new DorisMapFunction<>())
.sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_USER_USER_LOGIN_WINDOW));
}
[gitee仓库地址:(https://gitee.com/langpaian/gmall2023-realtime)
文章来源:https://blog.csdn.net/qq_44273739/article/details/135267859
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!