Flink之状态编程

2023-12-13 05:47:42

状态

对人来说,状态是指当下的各种条件的具体情况就是状态;对于数据来说,状态就是当下需要维护的额外的数据。

算子

  • 无状态算子:无状态算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果。
  • 有状态算子:每次计算后需要保存中间结果,等下一个数据到来时再进入到计算过程。

状态的分类

  1. 托管状态:由Flink统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由Flink实现。
    • 算子状态:状态由算子内所有数据共享,不做key的隔离。实现CheckpointFunction接口,否则跟本地变量没有区别
    • 按键分区状态:状态是根据key进行隔离的,每个并行子任务维护着对应的状态。
  2. 原始状态:自定义的状态,相当于就是开辟一块内存,需要我们自己管理,实现状态的序列化和故障恢复。

ListState状态算子

列表状态,可以当成一个list来使用,状态的创建、管理、备份(需要依赖检查点,按键分区时自动备份)、恢复都由Flink完成。使用时需要实现CheckpointedFunction接口。

其中接口需要实现两个方法:

  • snapshotState():状态的快照备份,随着检查点周期性调用
  • initializeState():状态初始化,每个并行度执行一次

创建步骤分解:

  1. 定义数据流
  2. 使用map算子,在map算子中调用自定义的MapFunction
  3. 自定义的MyMapFunction实现 MapFunction<>和CheckpointFunction接口
    • map(): MapFunction接口需要重写的方法,实现map逻辑
    • initializeState(): 状态的初始化,如果有历史状态,需要将历史状态恢复到当前的状态中
    • snapshotState(): 进行状态的快照备份,无需添加代码,Flink有内部机制实现了

示例代码:
在shell端输入nc -lk 8888 输入数据,输入x模拟发生异常,发生异常后Flink会尝试重试,并恢复状态,测试时可以查看状态是否正常恢复。

public class Flink02_ListStateOperator {
    public static void main(String[] args) {

        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(2);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));

        //开启了检查点,会无限重启,检查点会保存状态,重启是有意义的
        env.enableCheckpointing(2000L);

        DataStreamSink<String> ds = env.socketTextStream("hadoop102", 8888)
                .map(
                        //将读取的字符串数据维护到一个集合中,最后输出到控制台
                        new MyMapFunction()
                ).addSink(
                        new MyPrintSink()
                );


        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }


    }

    public static class MyPrintSink implements SinkFunction<String>{
        @Override
        public void invoke(String value, Context context) throws Exception {
            if(value.contains("x")){
                throw new RuntimeException("抛异常了");
            }
            System.out.println("sink:"+value);
        }
    }


    public static class MyMapFunction implements MapFunction<String, String>, CheckpointedFunction {

        //定义集合,用于维护每个数据
        //原始状态
//        private List<String> datas = new ArrayList<>();

        //列表状态
        private ListState<String> listState;

        @Override
        public String map(String value) throws Exception {
            listState.add(value);
            return listState.get().toString();
        }


        /**
         * 状态备份
         * @param context the context for drawing a snapshot of the operator
         * @throws Exception
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("备份中...");
        }

        /**
         * 状态的初始化
         *
         *  两种情况:
         *      1. 第一次启动程序,没有历史状态,就相当于直接创建一个状态处理
         *      2. 恢复启动,有历史的状态,需要将历史的状态恢复到当前的状态中
         * @param context the context for initializing the operator
         * @throws Exception
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("初始化");
            //存储历史状态,通过store来判断是否存在历史
            OperatorStateStore operatorStateStore = context.getOperatorStateStore();
            ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("listState", String.class);
            //初始化
            listState = operatorStateStore.getListState(descriptor);
        }
    }
}

UnionListState状态算子:本质上也是ListState,唯一的区别是在初始化调用的方法为listState = operatorStateStore.getUnionListState(descriptor);,以及在出现异常后,状态恢复时会将所有状态发给所有并发子节点。而原先的ListState会使用轮询的方式恢复到原有的状态。

BroadcastState广播状态

广播状态也是Flink托管状态的一类,一般用于广播少量数据,比如一些通用的配置。大概的实现过程:

  • 首先要得到一条广播流(由一条普通的数据流进行广播操作后得到)
  • 通过广播流和普通数据流进行connect操作,在对数据进行处理的过程中使用广播状态。

详细步骤及核心代码如下:

  1. 定义普通数据流DataStreamSource<String> dataDs = env.socketTextStream("hadoop102", 8888);
  2. 定义配置数据流DataStreamSource<String> confDs = env.socketTextStream("hadoop102", 9999);
  3. 配置流进行broadcast广播操作,得到广播流broadcastStream : BroadcastStream<String> broadcastStream = confDs.broadcast(mapStateDescriptor);
  4. 普通数据流和广播流进行connect操作后,再使用process算子分别对广播流和数据流进行相应操作,一般是通过广播流的配置来修改数据的相应处理逻辑。
  5. 广播状态本质上是一个map结构,通过ctx上下文来获取广播状态对象
    • broadcastState.put(“flag”, value); 广播配置
    • String flag = readOnlyBroadcastState.get(“flag”); 获取广播配置

完整示例代码:

public class Flink04_BroadcastStateOperator {
    public static void main(String[] args) {
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        DataStreamSource<String> dataDs = env.socketTextStream("hadoop102", 8888);

        //广播流,广播配置
        //配置流
        DataStreamSource<String> confDs = env.socketTextStream("hadoop102", 9999);

        MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<>("mapState", String.class, String.class);
        BroadcastStream<String> broadcastStream = confDs.broadcast(mapStateDescriptor);

        //数据流与广播流connect
        dataDs.connect(broadcastStream)
                .process(
                        new BroadcastProcessFunction<String, String, String>() {
                            /**
                             * 处理数据流
                             * @param value The stream element.
                             * @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element,
                             *     querying the current processing/event time and updating the broadcast state. The context
                             *     is only valid during the invocation of this method, do not store it.
                             * @param out The collector to emit resulting elements to
                             * @throws Exception
                             */
                            @Override
                            public void processElement(String value, BroadcastProcessFunction<String, String, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {

                                //从广播状态中获取flag

                                ReadOnlyBroadcastState<String, String> readOnlyBroadcastState = ctx.getBroadcastState(mapStateDescriptor);

                                String flag = readOnlyBroadcastState.get("flag");

//                                String flag = null;

                                if("1".equals(flag)){
                                    System.out.println("执行1号分支");
                                }else if("2".equals(flag)){
                                    System.out.println("执行2号分支");
                                }else{
                                    System.out.println("执行默认逻辑");
                                }

                                out.collect(value);
                            }

                            /**
                             * 处理广播流
                             * @param value The stream element.
                             * @param ctx A {@link Context} that allows querying the timestamp of the element, querying the
                             *     current processing/event time and updating the broadcast state. The context is only valid
                             *     during the invocation of this method, do not store it.
                             * @param out The collector to emit resulting elements to
                             * @throws Exception
                             */
                            @Override
                            public void processBroadcastElement(String value, BroadcastProcessFunction<String, String, String>.Context ctx, Collector<String> out) throws Exception {
                                BroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
                                //广播配置
                                broadcastState.put("flag", value);
                            }
                        }
                ).print();

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

文章来源:https://blog.csdn.net/qq_44273739/article/details/134940976
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。