物流实时数仓:数仓搭建(DWD)二

2023-12-18 09:38:10

系列文章目录

物流实时数仓:采集通道搭建
物流实时数仓:数仓搭建
物流实时数仓:数仓搭建(DIM)
物流实时数仓:数仓搭建(DWD)一
物流实时数仓:数仓搭建(DWD)二



前言

这次博客我们完成剩下的DWD层的建设
在这里插入图片描述
由流程图可知,我们还需要编写两个Flink程序


提示:以下是本篇文章正文内容,下面案例可供参考

一、代码编写

1.文件创建

我们需要在添加5个bean文件
在这里插入图片描述
还有两个app文件。
在这里插入图片描述

2.物流域运输完成事实表

1.DwdTransTransFinishBean

package com.atguigu.tms.realtime.beans;

import lombok.Data;

import java.math.BigDecimal;

/**
 * 物流域:运输完成事实表实体类
 */
@Data
public class DwdTransTransFinishBean {
    // 编号(主键)
    String id;

    // 班次ID
    String shiftId;

    // 线路ID
    String lineId;

    // 起始机构ID
    String startOrgId;

    // 起始机构名称
    String startOrgName;

    // 目的机构id
    String endOrgId;

    // 目的机构名称
    String endOrgName;

    // 运单个数
    Integer orderNum;

    // 司机1 ID
    String driver1EmpId;

    // 司机1名称
    String driver1Name;

    // 司机2 ID
    String driver2EmpId;

    // 司机2名称
    String driver2Name;

    // 卡车ID
    String truckId;

    // 卡车号牌
    String truckNo;

    // 实际启动时间
    String actualStartTime;

    // 实际到达时间
    String actualEndTime;

    // 运输时长
    Long transportTime;

    // 实际行驶距离
    BigDecimal actualDistance;

    // 时间戳
    Long ts;
}

2.DwdTransTransFinish

package com.atguigu.tms.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.DwdTransTransFinishBean;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// 物流域:运输完成事实表
public class DwdTransTransFinish {
    public static void main(String[] args) throws Exception {
        // 准备环境
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // 从kafka读取数据
        String topic = "tms_ods";
        String groupId = "dwd_trans_tran_finish_group";
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka_source")
                .uid("Kafka_source");

        // 筛选出运输完成的数据
        SingleOutputStreamOperator<String> filterDS = kafkaStrDS.filter(
                new FilterFunction<String>() {
                    @Override
                    public boolean filter(String jsonStr) throws Exception {
                        JSONObject jsonObj = JSON.parseObject(jsonStr);
                        // 将transport_task 表的操作数据过滤处理
                        String table = jsonObj.getJSONObject("source").getString("table");
                        if (!"transport_task".equals(table)) {
                            return false;
                        }
                        String op = jsonObj.getString("op");
                        JSONObject beforeJsonObj = jsonObj.getJSONObject("before");
                        if (beforeJsonObj == null) {
                            return false;
                        }
                        JSONObject afterJsonObj = jsonObj.getJSONObject("after");
                        String oldActualEndTime = beforeJsonObj.getString("actual_end_time");
                        String newActualEndTime = afterJsonObj.getString("actual_end_time");

                        return "u".equals(op) && oldActualEndTime == null && newActualEndTime != null;
                    }
                }
        );


        // 筛选出的数据进行处理
        SingleOutputStreamOperator<String> processDS = filterDS.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
                        JSONObject jsonObj = JSON.parseObject(jsonStr);
                        DwdTransTransFinishBean finishBean = jsonObj.getObject("after", DwdTransTransFinishBean.class);

                        // 补充运输时常字段
                        finishBean.setTransportTime(Long.parseLong(finishBean.getActualStartTime()) - Long.parseLong(finishBean.getActualEndTime()));

                        // 将运输结束时间转换为毫秒-8小时 赋值给事件时间字段ts
                        finishBean.setTs(Long.parseLong(finishBean.getActualEndTime()) - 8 * 60 * 60 * 1000L);

                        // 处理时间问题
                        finishBean.setActualStartTime(DateFormatUtil.toYmdHms(Long.parseLong(finishBean.getActualStartTime()) - 8 * 60 * 60 * 1000L));
                        finishBean.setActualEndTime(DateFormatUtil.toYmdHms(Long.parseLong(finishBean.getActualEndTime()) - 8 * 60 * 60 * 1000L));

                        // 脱敏
                        String driver1Name = finishBean.getDriver1Name();
                        String driver2Name = finishBean.getDriver2Name();
                        String truckNo = finishBean.getTruckNo();

                        driver1Name = driver1Name.charAt(0) +
                                driver1Name.substring(1).replaceAll(".", "\\*");
                        driver2Name = driver2Name == null ? driver2Name : driver2Name.charAt(0) +
                                driver2Name.substring(1).replaceAll(".", "\\*");
                        truckNo = DigestUtils.md5Hex(truckNo);

                        finishBean.setDriver1Name(driver1Name);
                        finishBean.setDriver2Name(driver2Name);
                        finishBean.setTruckNo(truckNo);

                        out.collect(JSON.toJSONString(finishBean));
                    }
                }
        );
        // 处理后的数据写入kafka
        String sinkTopic = "tms_dwd_trans_trans_finish";
        processDS.sinkTo(KafkaUtil.getKafkaSink(sinkTopic,args)).uid("kafka_sink");

        env.execute();
    }
}

3.中转域中转流程分流应用

1.DwdOrderOrgBoundOriginBean

package com.atguigu.tms.realtime.beans;

import lombok.Data;

/**
 * 中转实体类
 */
@Data
public class DwdOrderOrgBoundOriginBean {
    // 编号(主键)
    String id;

    // 运单编号
    String orderId;

    // 机构id
    String orgId;

    // 状态 出库 入库
    String status;

    // 入库时间
    String inboundTime;

    // 入库人员id
    String inboundEmpId;

    // 分拣时间
    String sortTime;

    // 分拣人员id
    String sorterEmpId;

    // 出库时间
    String outboundTime;

    // 出库人员id
    String outboundEmpId;

    // 创建时间
    String createTime;

    // 修改时间
    String updateTime;

    // 删除标志
    String isDeleted;
}

2.DwdBoundInboundBean

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * 中转域:入库实体类
 */
@Data
@Builder
public class DwdBoundInboundBean {
    // 编号(主键)
    String id;

    // 运单编号
    String orderId;

    // 机构id
    String orgId;

    // 入库时间
    String inboundTime;

    // 入库人员id
    String inboundEmpId;

    // 时间戳
    Long ts;
}

3.DwdBoundSortBean

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * 中转域:分拣实体类
 */
@Data
@Builder
public class DwdBoundSortBean {
    // 编号(主键)
    String id;

    // 运单编号
    String orderId;

    // 机构id
    String orgId;

    // 分拣时间
    String sortTime;

    // 分拣人员id
    String sorterEmpId;

    // 时间戳
    Long ts;
}

4.DwdBoundOutboundBean

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * 中转域:出库实体类
 */
@Data
@Builder
public class DwdBoundOutboundBean {
    // 编号(主键)
    String id;

    // 运单编号
    String orderId;

    // 机构id
    String orgId;

    // 出库时间
    String outboundTime;

    // 出库人员id
    String outboundEmpId;

    // 时间戳
    Long ts;
}

5.DwdBoundRelevantApp

package com.atguigu.tms.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.DwdBoundInboundBean;
import com.atguigu.tms.realtime.beans.DwdBoundOutboundBean;
import com.atguigu.tms.realtime.beans.DwdBoundSortBean;
import com.atguigu.tms.realtime.beans.DwdOrderOrgBoundOriginBean;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DwdBoundRelevantApp {
    public static void main(String[] args) throws Exception {
        // 环境准备
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // 从kafka读取数据
        String topic = "tms_ods";
        String groupId = "dwd_bound_group";

        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // 筛选出订单机构中转表
        SingleOutputStreamOperator<String> filterDS = kafkaStrDS.filter(
                new FilterFunction<String>() {
                    @Override
                    public boolean filter(String jsonStr) throws Exception {
                        JSONObject jsonObj = JSON.parseObject(jsonStr);
                        String table = jsonObj.getJSONObject("source").getString("table");
                        return "order_org_bound".equals(table);
                    }
                }
        );


        // 定义侧输出流标签
        OutputTag<String> sortTag = new OutputTag<String>("sortTag") {
        };
        OutputTag<String> outboundTag = new OutputTag<String>("outboundTag") {
        };



        // 分流 入库->主流  分流->分拣侧输出流 出库->出库侧输出流
        SingleOutputStreamOperator<String> inboundDS = filterDS.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
                JSONObject jsonObj = JSON.parseObject(jsonStr);
                String op = jsonObj.getString("op");

                DwdOrderOrgBoundOriginBean after = jsonObj.getObject("after", DwdOrderOrgBoundOriginBean.class);
                DwdOrderOrgBoundOriginBean before = jsonObj.getObject("before", DwdOrderOrgBoundOriginBean.class);

                // 获取中转数据id
                String id = after.getId();
                // 获取运单id
                String orderId = after.getOrderId();
                // 获取机构id
                String orgId = after.getOrgId();

                if ("c".equals(op)) {
                    long ts = Long.parseLong(after.getInboundEmpId()) - 8 * 60 * 60 * 1000L;
                    String inboundTime = DateFormatUtil.toYmdHms(ts);
                    String inboundEmpId = after.getInboundEmpId();

                    // 入库
                    DwdBoundInboundBean inboundBean = DwdBoundInboundBean.builder()
                            .id(id)
                            .orderId(orderId)
                            .orgId(orgId)
                            .inboundTime(inboundTime)
                            .inboundEmpId(inboundEmpId)
                            .ts(ts)
                            .build();
                    out.collect(JSON.toJSONString(inboundBean));
                } else {
                    // 将分拣数据放到侧输出流
                    String beforeSortTime = before.getSortTime();
                    String afterSortTime = after.getSortTime();
                    if (beforeSortTime == null && afterSortTime != null) {
                        long ts = Long.parseLong(after.getSortTime()) - 8 * 60 * 60 * 1000L;
                        String sortTime = DateFormatUtil.toYmdHms(ts);
                        String sorterEmpId = after.getSorterEmpId();
                        DwdBoundSortBean sortBean = DwdBoundSortBean.builder()
                                .id(id)
                                .orderId(orderId)
                                .orgId(orgId)
                                .sortTime(sortTime)
                                .sorterEmpId(sorterEmpId)
                                .ts(ts)
                                .build();
                        ctx.output(sortTag, JSON.toJSONString(sortBean));
                    }

                    // 筛选储库操作 将数据库放到出库侧输出流
                    String beforeOutboundTime = before.getOutboundTime();
                    String afterOutboundTime = after.getOutboundTime();

                    if (beforeOutboundTime == null && afterOutboundTime != null) {
                        long ts = Long.parseLong(after.getOutboundTime()) - 8 * 60 * 60 * 1000L;
                        String outboundTime = DateFormatUtil.toYmdHms(ts);
                        String outboundEmpId = after.getOutboundEmpId();

                        DwdBoundOutboundBean outboundBean = DwdBoundOutboundBean.builder()
                                .id(id)
                                .orderId(orderId)
                                .orgId(orgId)
                                .outboundTime(outboundTime)
                                .outboundEmpId(outboundEmpId)
                                .ts(ts)
                                .build();
                        ctx.output(outboundTag, JSON.toJSONString(outboundBean));
                    }

                }

            }
        });

        // 从主流中提取侧输出流
        // 分拣流
        SideOutputDataStream<String> sortDS = inboundDS.getSideOutput(sortTag);
        // 出库流
        SideOutputDataStream<String> outboundDS = inboundDS.getSideOutput(outboundTag);

        // 将不同流数据写到kafka主题
        //中转域入库事实主题
        String inboundTopic = "tms_dwd_bound_inbound";
        //中转域分拣事实主题
        String sortTopic = "tms_dwd_bound_sort";
        //中转域出库事实主题
        String outboundTopic = "tms_dwd_bound_outbound";

        inboundDS.sinkTo(KafkaUtil.getKafkaSink(inboundTopic, args)).uid("inbound_sink");
        sortDS.sinkTo(KafkaUtil.getKafkaSink(sortTopic, args)).uid("sort_sink");
        outboundDS.sinkTo(KafkaUtil.getKafkaSink(outboundTopic, args)).uid("outbound_sink");

        env.execute();
    }
}

二、代码测试

1.环境启动

hadoop,zk,kf和odsapp全部启动

2.运行flink程序

启动DwdTransTransFinish
然后开一个kafka消费者

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_trans_finish

然后继续生产数据

在这里插入图片描述
kafka消费到数据之后,程序就可以关闭了。

启动DwdBoundRelevantApp
然后开启3个消费者。

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_bound_inbound
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_bound_sort
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_bound_outbound

在这里插入图片描述在这里插入图片描述
在这里插入图片描述


总结

至此数仓Dwd层搭建完成。

文章来源:https://blog.csdn.net/weixin_50835854/article/details/135018846
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。