Flink之Task重启策略
2024-01-09 17:39:16
Task重启策略
1 策略API
-
noRestart
无参数,task失败后不重启,整个job同时失败,默认策略.
代码示例
RestartStrategies.noRestart();
-
fixedDelayRestart
参数 注释 restartAttempts 最大重启次数 delayBetweenAttempts 重启时间间隔 代码示例
// 最多重启5次,每次任务失败后间隔1s重启 RestartStrategies.fixedDelayRestart(5, 1000);
-
exponentialDelayRestart
参数 注释 initialBackoff 重启间隔惩罚时长初始值(重启延迟时间) maxBackoff 重启间隔最大惩罚时长 backoffMultiplier 重启间隔时长的惩罚倍数 resetBackoffThreshold 重置惩罚时长的平稳运行时长(平稳运行时长达到这个阈值后,再次发生故障则重启延迟时间恢复到初始值) jitterFactor 取一个随机数,来加在重启时间点上,已让每次重启的时间呈现一定随机性(避免某一时刻集群中有大量的task同时重启,如果task重启时间是规律性的就可能发生这种情况) 代码示例
// 第一次失败后间隔1s重启任务,如果稳定运行时长没有达到120s就发生task失败,则重启间隔时长=上一次重启间隔时长*1.2,如果稳定运行时长超过120秒则重启间隔时长恢复到1s RestartStrategies.exponentialDelayRestart(Time.seconds(1), Time.seconds(30), 1.2, Time.seconds(120), 0.56);
-
failureRateRestart
参数 注释 failureRate 指定时间范围内的最大Task任务失败率(次数) failureInterval 指定时间范围 delayInterval 重启时间间隔 代码示例
// task失败重启间隔为1s,只要在30分钟内task失败重启次数没超过3次就可以一直执行这个策略,如果超过则job停止 RestartStrategies.failureRateRestart(3, Time.minutes(30), Time.seconds(1));
-
fallBackRestart
无参数,常用于自定义的RestartStrategy,即用户自定义了重启策略,且将其配置在了flink-conf.yaml文件中,也就是说调用这个方法时会读取集群的配置文件,根据配置文件的内容调整策略
代码示例
RestartStrategies.fallBackRestart();
2 代码详情
public class FlinkCheckpoint {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启并设置checkpoint的时间间隔
env.enableCheckpointing(3000);
// 设置checkpoint的存储位置
env.getCheckpointConfig().setCheckpointStorage(new Path("hdfs://lx01:8020/flink-ck"));
// 允许checkpoint失败的最大次数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
// checkpoint的算法模式,是否需要对其(EXACTLY_ONCE)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// job取消是否保留checkpoint数据
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置checkpoint对齐的超时时间
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofMillis(10000));
// 两次checkpoint的最小时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
// 并行最大的checkpoint数
env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);
// 选择后端状态(默认HashMapStateBackend)
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// TODO Task重启策略
RestartStrategies.RestartStrategyConfiguration restartStrategy = null;
// 第一次失败后间隔1s重启任务,如果稳定运行时长没有达到120s就发生task失败,则重启间隔时长=上一次重启间隔时长*1.2,如果稳定运行时长超过120秒则重启间隔时长恢复到1s
restartStrategy = RestartStrategies.exponentialDelayRestart(Time.seconds(1), Time.seconds(30), 1.2, Time.seconds(120), 0.56);
// 配置Task重启策略
env.setRestartStrategy(restartStrategy);
// ...业务代码
env.execute();
}
}
文章来源:https://blog.csdn.net/AnameJL/article/details/135484753
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!