Flink入门之DataStream API及kafka消费者
2023-12-13 03:27:04
DataStream API
- 主要流程:
- 获取执行环境
- 读取数据源
- 转换操作
- 输出数据
- Execute触发执行
- 获取执行环境
- 根据实际情况获取StreamExceptionEnvironment.getExecutionEnvironment(conf)
- 创建本地环境StreamExecutionEnvironment.createLocalEnvironment()
- 创建远程环境createRemoteEnvironment(“hadoop102”, 37784, “jar/1.jar”)
- 参数1:主机号
- 参数2:端口号
- 参数3:作业jar包的路径
- 获取数据源
- 简单数据源
- 从集合中读取数据env.fromCollection(集合)
- 从元素列表中获取数据env.fromElements()
- 从文件中读取数据,env.readTextFIle(路径), 已废弃
- 从端口读取数据,env.socketTextStream()
- 文件数据源
- kafka数据源
- DataGen数据源
- 自定义数据源
- 简单数据源
文件数据源
使用文件数据源前,需要先添加相关依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
public class Flink02_FileSource {
public static void main(String[] args) throw Exception {
//1.创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认是最大并行度
env.setParallelism(1);
//file source
FileSource.FileSourceBuilder<String> fileSourceBuilder = FileSource
.<String>forRecordStreamFormat(new TextLineInputFormat("utf-8"), new Path("input/word.txt"));
FileSource<String> fileSource = fileSourceBuilder.build();
//source 算子
DataStreamSource<String> ds = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
ds.print();
env.execute();
}
}
DataGen数据源
主要用于生成模拟数据,也需要导入相关依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
public class Flink04_DataGenSource {
public static void main(String[] args) {
//1.创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认是最大并行度
env.setParallelism(1);
DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
new GeneratorFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return UUID.randomUUID() + "->" + value;
}
},
100,
RateLimiterStrategy.perSecond(1),
Types.STRING
);
DataStreamSource<String> dataGenDs = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGenDs");
dataGenDs.print();
try {
env.execute();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Kafka消费者
-
消费方式:拉取
-
消费者对象:KafkaConsumenr
-
消费原则:
一个主题的一个分区只能被一个消费者组中的一个消费者消费
一个消费者组中的一个消费者可以消费一个主题中的多个分区 -
消费者相关的参数:
- key.deserializer 反序列化
- value.deserializer
- bootstrap.servers 集群的位置
- group.id 消费者组id (为何分组,方便同一组的消费者进行断点续传)
- auto.commit.interval.ms 自动提交间隔 默认5s
- enable.auto.commit: 开启自动提交offset偏移量
- auto.offset.reset: 当offset不存在时,offset重置,默认是最末尾的位置
- ①新的消费者组,之前没有消费过,没有记录的offset
- ②当前要消费的offset在kafka中已经不存在,可能是因为时间久了,对应的数据清理掉了
- 重置策略:
- earliest: 头,能消费到分区中现有的数据
- latest: 尾,只能消费到分区中新来的数据
- isolation.level:事务隔离级别
- 读未提交
- 读已提交
-
消费数据存在的问题
- 漏消费,导致数据丢失
- 重复消费,导致数据重复
-
shell 创建生产者对象:kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
public class Flink03_KafkaSource {
public static void main(String[] args) {
//1.创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认是最大并行度
env.setParallelism(1);
KafkaSource<String> stringKafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop102:9092,hadoop103:9092")
.setGroupId("flink")
.setTopics("first")
//优先使用消费者组记录的Offset进行消费,如果offset不存在,根据策略进行重置
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
//如果还有别的配置需要指定,统一使用通用方法
// .setProperty("isolation.level", "read_committed")
.build();
DataStreamSource<String> kafkaDS = env.fromSource(stringKafkaSource, WatermarkStrategy.noWatermarks(), "kafkaDS");
kafkaDS.print();
try {
env.execute();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
文章来源:https://blog.csdn.net/qq_44273739/article/details/134814641
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!