Flink-1.17集群部署
2023-12-26 14:38:50
1、部署
1.1、修改flink-conf.yaml
1.1.1、flink-1.17
# ---------------------------------------------------------------------------
# flink-conf.yaml for the Flink 1.17 cluster (JobManager on boshi-122)
# ---------------------------------------------------------------------------

# --- JobManager RPC endpoint ---
jobmanager.rpc.address: boshi-122
jobmanager.rpc.port: 6123

# --- Slots, parallelism and failover ---
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
jobmanager.execution.failover-strategy: region

# --- JobManager memory ---
# Total process memory of the JobManager
jobmanager.memory.process.size: 2048m

# --- TaskManager memory ---
# Total process memory of each TaskManager. The components configured below
# add up to this value when network memory and JVM overhead resolve to their
# configured maximums: 3072 + 128 + 128 + 128 + 128 + 256 + 256 = 4096 MiB.
taskmanager.memory.process.size: 4096mb
# Heap reserved for user code (task heap)
taskmanager.memory.task.heap.size: 3072m
# Heap reserved for the Flink framework
taskmanager.memory.framework.heap.size: 128m
# Off-heap memory reserved for the Flink framework
taskmanager.memory.framework.off-heap.size: 128m
# Managed memory
taskmanager.memory.managed.size: 128m
# Upper bound for network buffer memory
taskmanager.memory.network.max: 128m
# JVM metaspace size and upper bound for JVM overhead
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 256m

# --- Timeouts ---
akka.ask.timeout: 50s
web.timeout: 50000
heartbeat.timeout: 180000
taskmanager.network.request-backoff.max: 240000

# --- State storage (local file system paths) ---
state.checkpoints.dir: file:///data/flink/checkpoint/
state.savepoints.dir: file:///data/flink/savepoint/

# --- Class loading and JVM options ---
classloader.check-leaked-classloader: false
# CMS collector starting at 75% occupancy, pre-touched heap, heap dump on OOM
env.java.opts: -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
1.1.2、flink-1.13
# ---------------------------------------------------------------------------
# flink-conf.yaml for the Flink 1.13 cluster (JobManager on boshi-146)
# ---------------------------------------------------------------------------

# --- JobManager RPC endpoint ---
jobmanager.rpc.address: boshi-146
jobmanager.rpc.port: 6123

# --- Slots, parallelism and failover ---
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2
jobmanager.execution.failover-strategy: region

# --- JobManager memory ---
# Total process memory of the JobManager
jobmanager.memory.process.size: 4096m

# --- TaskManager memory ---
# Total process memory of each TaskManager. The components configured below
# add up to this value when network memory and JVM overhead resolve to their
# configured maximums: 15360 + 128 + 128 + 128 + 128 + 256 + 256 = 16384 MiB.
taskmanager.memory.process.size: 16384mb
# Heap reserved for user code (task heap)
taskmanager.memory.task.heap.size: 15360m
# Heap reserved for the Flink framework
taskmanager.memory.framework.heap.size: 128m
# Off-heap memory reserved for the Flink framework
taskmanager.memory.framework.off-heap.size: 128m
# Managed memory
taskmanager.memory.managed.size: 128m
# Upper bound for network buffer memory
taskmanager.memory.network.max: 128m
# JVM metaspace size and upper bound for JVM overhead
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 256m

# --- Timeouts ---
akka.ask.timeout: 100s
web.timeout: 100000
heartbeat.timeout: 180000
taskmanager.network.request-backoff.max: 240000

# --- State storage (HDFS paths) ---
state.checkpoints.dir: hdfs://hdfs-ha/flink/checkpoint/
state.savepoints.dir: hdfs://hdfs-ha/flink/savepoint/

# --- Class loading and JVM options ---
classloader.check-leaked-classloader: false
# G1 collector with detailed, rotated GC logging (20 files x 100M)
env.java.opts: -server -XX:+UseG1GC -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=100M
1.2、masters
boshi-122:8081
1.3、workers
boshi-129
boshi-137
boshi-144
boshi-166
2、提交任务
2.1、mysql-to-kafka-starrocks
-- MySQL source table: reads db_enterprise_outer_resource.crawl_enterprise_website
-- through the mysql-cdc connector, with incremental snapshot disabled.
-- Text columns are declared STRING because a bare VARCHAR in Flink SQL means
-- VARCHAR(1), which does not describe columns such as `html` or `website`.
-- NOTE(review): 'hostname' = 'ip' and the empty password look like redacted
-- placeholders — replace them with real values before running.
CREATE TABLE mysql_crawl_enterprise_website (
    `id` INT,
    `eid` STRING,
    `enterprise_name` STRING,
    `website` STRING,
    `html` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'ip',
    'port' = '3306',
    'username' = 'root',
    'password' = '',
    'database-name' = 'db_enterprise_outer_resource',
    'table-name' = 'crawl_enterprise_website',
    'scan.incremental.snapshot.enabled' = 'false'
);
-- Kafka table: upsert-kafka table on topic ods_crawl_enterprise_website,
-- keyed by `id`, with JSON key and value formats.
-- Text columns are declared STRING because a bare VARCHAR in Flink SQL means
-- VARCHAR(1), which does not describe columns such as `html` or `website`.
-- NOTE(review): the 'ip:6667' broker entries look like redacted placeholders —
-- replace them with the real bootstrap servers before running.
CREATE TABLE kafka_crawl_enterprise_website (
    `id` INT,
    `eid` STRING,
    `enterprise_name` STRING,
    `website` STRING,
    `html` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods_crawl_enterprise_website',
    'properties.bootstrap.servers' = 'ip:6667,ip:6667,ip:6667',
    'properties.group.id' = 'source_province',
    -- raised request size limit (about 512 MB)
    'properties.max.request.size' = '512000000',
    'properties.session.timeout.ms' = '60000',
    'properties.request.timeout.ms' = '40000',
    -- optional client settings, currently disabled:
    -- 'properties.max.poll.records' = '100',
    -- 'properties.auto.offset.reset' = 'latest',
    'key.format' = 'json',
    'value.format' = 'json'
);
-- StarRocks sink table: writes to ods.ods_crawl_enterprise_website through the
-- starrocks connector, loading rows as JSON.
-- Text columns are declared STRING because a bare VARCHAR in Flink SQL means
-- VARCHAR(1), which does not describe columns such as `html` or `website`.
-- NOTE(review): the 'ip' hosts and the empty password look like redacted
-- placeholders — replace them with real values before running.
CREATE TABLE starrock_ods_crawl_enterprise_website (
    `id` INT,
    `eid` STRING,
    `enterprise_name` STRING,
    `website` STRING,
    `html` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED -- must also be declared here when the synchronized database table defines a primary key
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://ip:9030',
    'load-url' = 'ip:8030',
    'database-name' = 'ods',
    'table-name' = 'ods_crawl_enterprise_website',
    'username' = 'starrocks',
    'password' = '',
    'sink.max-retries' = '5',
    -- optional sink settings, currently disabled:
    -- 'sink.parallelism' = '2',
    -- 'sink.version' = 'V1',
    -- 'sink.buffer-flush.max-rows' = '64000',
    'sink.buffer-flush.max-bytes' = '256000000',
    'sink.buffer-flush.interval-ms' = '3000',
    -- 'sink.properties.label' ='ods_crawl_enterprise_website',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true',
    'sink.properties.ignore_json_size' = 'true' -- skip the check on the JSON body size
);
-- Sync MySQL data into Kafka.
-- Columns are listed explicitly on both sides so the mapping does not depend
-- on the declaration order of either table.
INSERT INTO kafka_crawl_enterprise_website (
    `id`,
    `eid`,
    `enterprise_name`,
    `website`,
    `html`
)
SELECT
    `id`,
    `eid`,
    `enterprise_name`,
    `website`,
    `html`
FROM mysql_crawl_enterprise_website;
-- Sync Kafka data into StarRocks.
-- Columns are listed explicitly on both sides so the mapping does not depend
-- on the declaration order of either table.
INSERT INTO starrock_ods_crawl_enterprise_website (
    `id`,
    `eid`,
    `enterprise_name`,
    `website`,
    `html`
)
SELECT
    `id`,
    `eid`,
    `enterprise_name`,
    `website`,
    `html`
FROM kafka_crawl_enterprise_website;
2.2、提交参数
jobmanager.memory.process.size=4096m
taskmanager.memory.process.size=8192m
taskmanager.memory.task.heap.size=7168m
taskmanager.memory.framework.heap.size=128m
taskmanager.memory.framework.off-heap.size=128m
taskmanager.memory.managed.size=128m
taskmanager.memory.network.max=128m
taskmanager.memory.jvm-metaspace.size=256m
taskmanager.memory.jvm-overhead.max=256m
parallelism.default=3
taskmanager.numberOfTaskSlots=1
yarn.containers.vcores=1
文章来源:https://blog.csdn.net/docsz/article/details/135219580
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!