Flink之keyby状态

2023-12-13 05:44:24

Keyed State

  1. 值状态:维护一个具体的值
    • ValueState继承自State
    • T value(): 从状态中获取维护的数据
    • update(): 更新状态
  2. 列表状态:可以当成List使用,维护多个值
    • add(): 添加一个状态
    • addAll():添加多个状态,不会覆盖原有的状态
    • get():获取状态
    • update():将指定集合的数据直接更新到状态中,会覆盖原有的状态
    • clear():清空状态
  3. Map状态:维护一个Map结构的状态
    • get(): 通过key获取状态
    • put(): 添加kv到状态中
    • putAll():将指定的map添加到状态中
    • remove():通过key移除某个状态
    • contains():判断是否包含指定的key
    • entries():将状态中所有的kv获取出来
    • keys():获取所有的key
    • values(): 获取所有的value
    • isEmpty():判断map是否为空
  4. 规约状态:类似值状态,但是提供了规约功能,规约时输入类型和输出类型一致
    • get(): 获取状态中的值
    • add():将指定的数据添加到状态中
  5. 聚合状态: 类似值状态,但是提供了聚合功能,聚合时输入类型和输出类型可以不一致

示例代码:

public class Flink05_KeyedValueState {
    public static void main(String[] args) {
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        //s1,100,1000
        SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("hadoop102", 8888)
                .map(
                        line -> {
                            String[] fields = line.split(",");
                            return new WaterSensor(fields[0].trim(), Long.valueOf(fields[1].trim()), Integer.valueOf(fields[2].trim()));
                        }
                );


        //检测每种传感器的水位值,如果连续的两个水位值超过10,就报警

        ds.keyBy(WaterSensor::getId)
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {

                            //声明状态
                            private ValueState<Integer> valueState;

                            //初始化
                            @Override
                            public void open(Configuration parameters) throws Exception {
                                ValueStateDescriptor<Integer> valueStateDescriptor = new ValueStateDescriptor<>("valueState", Integer.class);
                                valueState = getRuntimeContext().getState(valueStateDescriptor);
                            }

                            /**
                             *
                             * @param value The input value.
                             * @param ctx A {@link Context} that allows querying the timestamp of the element and getting a
                             *     {@link TimerService} for registering timers and querying the time. The context is only
                             *     valid during the invocation of this method, do not store it.
                             * @param out The collector for returning result values.
                             * @throws Exception
                             */
                            @Override
                            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                                //从状态中获取上一次的水位值
                                Integer lastVc = valueState.value();
                                Integer curVc = value.getVc();
                                //判空
                                if(lastVc == null){
                                    //第一条数据,直接更新到状态中
                                    valueState.update(value.getVc());
                                    out.collect("本次的水位值:"+value.getVc());
                                }else{
                                    int diff = Math.abs(lastVc - curVc);
                                    if(diff>10)
                                        out.collect("本次水位值:"+curVc+",上次水位值:"+lastVc+"差值超过10");
                                    else{
                                        out.collect("本次的水位值:"+value.getVc());
                                        valueState.update(curVc);
                                    }
                                }
                            }
                        }
                ).print();

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

状态的生存时间

TTL(Time to live), 用来控制状态的清理时间,当超过某个时间不对状态操作后,Flink会自动清理该状态。

TTL的设置:

  1. 在状态的初始化时可以设置TTL
  2. 需要先通过StateTtlConfig.newBuilder()来设置一些常见参数
    • 更新类型:
      • 读或者写,重置过期时间
      • 只关注修改,不关注读操作
    • 可见性:即过期后是否提示已经过期,还是直接不可见
  3. 使用状态解释器.enableTimeToLive(ttlConfig)来设置TTL

示例代码:

private ListState<Integer> listState;

//初始化
  @Override
  public void open(Configuration parameters) throws Exception {
      ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<>("listState", Integer.class);
      StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))
              .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
              .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
              .build();
      listStateDescriptor.enableTimeToLive(ttlConfig);
      listState = getRuntimeContext().getListState(listStateDescriptor);
  }

状态后端

  1. 本地化状态管理
  2. 持久化状态,基于检查点机制,将本地的状态持久化到存储设备中,一般是HDFS

分类:

  • HashMapStateBackend: 将状态存储到TaskManager的内存中,读写效果高,不支持大状态存储
  • EmbeddedRocksDBBackend: RocksDB是Flink内置的数据库,存储为序列化的字节数组,读写效率比HashMap低,支持超大状态,支持增量检查点。

状态后端的配置

  1. 代码配置:env.setStateBackend(new EmbeddedRocksDBStateBackend());
  2. 提交作业的时候指定状态后端,推荐方式:flink run -Dstate.backend=hashmap|embed
  3. 在xml文件中指定,state.backend.type

注意:EmbeddedRocksDBStateBackend后端需要添加相关依赖

  <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>${flink.version}</version>
        </dependency>

文章来源:https://blog.csdn.net/qq_44273739/article/details/134948544
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。