Flink之keyby状态
2023-12-13 05:44:24
Keyed State
- 值状态:维护一个具体的值
- ValueState继承自State
- T value(): 从状态中获取维护的数据
- update(): 更新状态
- 列表状态:可以当成List使用,维护多个值
- add(): 添加一个状态
- addAll():添加多个状态,不会覆盖原有的状态
- get():获取状态
- update():将指定集合的数据直接更新到状态中,会覆盖原有的状态
- clear():清空状态
- Map状态:维护一个Map结构的状态
- get(): 通过key获取状态
- put(): 添加kv到状态中
- putAll():将指定的map添加到状态中
- remove():通过key移除某个状态
- contains():判断是否包含指定的key
- entries():将状态中所有的kv获取出来
- keys():获取所有的key
- values(): 获取所有的value
- isEmpty():判断map是否为空
- 规约状态:类似值状态,但是提供了规约功能,规约时输入类型和输出类型一致
- get(): 获取状态中的值
- add():将指定的数据添加到状态中
- 聚合状态: 类似值状态,但是提供了聚合功能,聚合时输入类型和输出类型可以不一致
示例代码:
public class Flink05_KeyedValueState {
public static void main(String[] args) {
//1.创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认是最大并行度
env.setParallelism(1);
//s1,100,1000
SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("hadoop102", 8888)
.map(
line -> {
String[] fields = line.split(",");
return new WaterSensor(fields[0].trim(), Long.valueOf(fields[1].trim()), Integer.valueOf(fields[2].trim()));
}
);
//检测每种传感器的水位值,如果连续的两个水位值超过10,就报警
ds.keyBy(WaterSensor::getId)
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
//声明状态
private ValueState<Integer> valueState;
//初始化
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> valueStateDescriptor = new ValueStateDescriptor<>("valueState", Integer.class);
valueState = getRuntimeContext().getState(valueStateDescriptor);
}
/**
*
* @param value The input value.
* @param ctx A {@link Context} that allows querying the timestamp of the element and getting a
* {@link TimerService} for registering timers and querying the time. The context is only
* valid during the invocation of this method, do not store it.
* @param out The collector for returning result values.
* @throws Exception
*/
@Override
public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
//从状态中获取上一次的水位值
Integer lastVc = valueState.value();
Integer curVc = value.getVc();
//判空
if(lastVc == null){
//第一条数据,直接更新到状态中
valueState.update(value.getVc());
out.collect("本次的水位值:"+value.getVc());
}else{
int diff = Math.abs(lastVc - curVc);
if(diff>10)
out.collect("本次水位值:"+curVc+",上次水位值:"+lastVc+"差值超过10");
else{
out.collect("本次的水位值:"+value.getVc());
valueState.update(curVc);
}
}
}
}
).print();
try {
env.execute();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
状态的生存时间
TTL(Time to live), 用来控制状态的清理时间,当超过某个时间不对状态操作后,Flink会自动清理该状态。
TTL的设置:
- 在状态的初始化时可以设置TTL
- 需要先通过StateTtlConfig.newBuilder()来设置一些常见参数
- 更新类型:
- 读或者写,重置过期时间
- 只关注修改,不关注读操作
- 可见性:即过期后是否提示已经过期,还是直接不可见
- 更新类型:
- 使用状态解释器.enableTimeToLive(ttlConfig)来设置TTL
示例代码:
private ListState<Integer> listState;
//初始化
@Override
public void open(Configuration parameters) throws Exception {
ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<>("listState", Integer.class);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
listStateDescriptor.enableTimeToLive(ttlConfig);
listState = getRuntimeContext().getListState(listStateDescriptor);
}
状态后端
- 本地化状态管理
- 持久化状态,基于检查点机制,将本地的状态持久化到存储设备中,一般是HDFS
分类:
- HashMapStateBackend: 将状态存储到TaskManager的内存中,读写效果高,不支持大状态存储
- EmbeddedRocksDBBackend: RocksDB是Flink内置的数据库,存储为序列化的字节数组,读写效率比HashMap低,支持超大状态,支持增量检查点。
状态后端的配置
- 代码配置:
env.setStateBackend(new EmbeddedRocksDBStateBackend());
- 提交作业的时候指定状态后端,推荐方式:flink run -Dstate.backend=hashmap|embed
- 在xml文件中指定,state.backend.type
注意:EmbeddedRocksDBStateBackend后端需要添加相关依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>
文章来源:https://blog.csdn.net/qq_44273739/article/details/134948544
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!