Flink-1.17集群部署

2023-12-26 14:38:50

1、部署

1.1、修改flink-conf.yaml

1.1.1、flink-17
jobmanager.rpc.address: boshi-122
jobmanager.rpc.port: 6123
# 设置jobmanager总内存
jobmanager.memory.process.size: 2048m
# 设置taskmanager的运行总内存
taskmanager.memory.process.size: 4096mb
# 设置用户代码运行内存
taskmanager.memory.task.heap.size: 3072m
# 设置flink框架内存
taskmanager.memory.framework.heap.size: 128m
# 设置managed memory内存
taskmanager.memory.managed.size: 128m
# 设置堆外内存
taskmanager.memory.framework.off-heap.size: 128m
# 设置网络缓存
taskmanager.memory.network.max: 128m
# 设置JVM内存
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 256m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
jobmanager.execution.failover-strategy: region
classloader.check-leaked-classloader: false
akka.ask.timeout: 50s
web.timeout: 50000
heartbeat.timeout: 180000
taskmanager.network.request-backoff.max: 240000
state.savepoints.dir: file:///data/flink/savepoint/
state.checkpoints.dir: file:///data/flink/checkpoint/
env.java.opts: -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
1.1.2、flink-1-13
jobmanager.rpc.address: boshi-146
jobmanager.rpc.port: 6123
# 设置jobmanager总内存
jobmanager.memory.process.size: 4096m
# 设置taskmanager的运行总内存
taskmanager.memory.process.size: 16384mb
# 设置用户代码运行内存
taskmanager.memory.task.heap.size: 15360m
# 设置flink框架内存
taskmanager.memory.framework.heap.size: 128m
# 设置managed memory内存
taskmanager.memory.managed.size: 128m
# 设置堆外内存
taskmanager.memory.framework.off-heap.size: 128m
# 设置网络缓存
taskmanager.memory.network.max: 128m
# 设置JVM内存
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 256m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2
jobmanager.execution.failover-strategy: region
classloader.check-leaked-classloader: false
akka.ask.timeout: 100s
web.timeout: 100000
heartbeat.timeout: 180000
taskmanager.network.request-backoff.max: 240000
state.savepoints.dir: hdfs://hdfs-ha/flink/savepoint/
state.checkpoints.dir: hdfs://hdfs-ha/flink/checkpoint/
env.java.opts: -server -XX:+UseG1GC -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=100M

1.2、masters

boshi-122:8081

1.3、workers

boshi-129
boshi-137
boshi-144
boshi-166

2、提交任务

2.1、mysql-to-kafka-starrocks

--MySQL表
CREATE TABLE mysql_crawl_enterprise_website (
  `id` int,
  `eid` varchar,
  `enterprise_name` varchar,
  `website` varchar,
  `html` varchar,
   PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'ip',
'port' = '3306',
'username' = 'root',
'password' = '',
'database-name' = 'db_enterprise_outer_resource',
'table-name' = 'crawl_enterprise_website',
'scan.incremental.snapshot.enabled' = 'false'
);

--Kafka表
CREATE TABLE kafka_crawl_enterprise_website (
  `id` int,
  `eid` varchar,
  `enterprise_name` varchar,
  `website` varchar,
  `html` varchar,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods_crawl_enterprise_website',
    'properties.bootstrap.servers' = 'ip:6667,ip:6667,ip:6667',
    'properties.group.id' = 'source_province',
	  'properties.max.request.size' = '512000000',
    'properties.session.timeout.ms' = '60000',
    'properties.request.timeout.ms' = '40000',
    -- 'properties.max.poll.records' = '100',
    -- 'properties.auto.offset.reset' = 'latest',
    'key.format' = 'json',
    'value.format' = 'json'
);

--Starrocks表
CREATE TABLE starrock_ods_crawl_enterprise_website (
  `id` int,
  `eid` varchar,
  `enterprise_name` varchar,
  `website` varchar,
  `html` varchar,
   PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'load-url' = 'ip:8030',
  'database-name' = 'ods',
  'table-name' = 'ods_crawl_enterprise_website',
  'username' = 'starrocks',
  'password' = '',
  'sink.max-retries' = '5',
  -- 'sink.parallelism' = '2',
  -- 'sink.version' = 'V1',
  -- 'sink.buffer-flush.max-rows' = '64000',
  'sink.buffer-flush.max-bytes' = '256000000',
  'sink.buffer-flush.interval-ms' = '3000',
  -- 'sink.properties.label' ='ods_crawl_enterprise_website',
  'sink.properties.format' = 'json',
  'sink.properties.strip_outer_array' = 'true',
  'sink.properties.ignore_json_size' = 'true' --忽略对 JSON Body 大小的检查

);

--MySQL数据同步到Kafka
insert into kafka_crawl_enterprise_website select * from mysql_crawl_enterprise_website;
--Kafka数据同步到Starrocks
insert into starrock_ods_crawl_enterprise_website select * from kafka_crawl_enterprise_website;

2.2、提交参数

jobmanager.memory.process.size=4096m
taskmanager.memory.process.size=8192m
taskmanager.memory.task.heap.size=7168m
taskmanager.memory.framework.heap.size=128m
taskmanager.memory.framework.off-heap.size=128m
taskmanager.memory.managed.size=128m
taskmanager.memory.network.max=128m
taskmanager.memory.jvm-metaspace.size=256m
taskmanager.memory.jvm-overhead.max=256m
parallelism.default=3
taskmanager.numberOfTaskSlots=1
yarn.containers.vcores=1

文章来源:https://blog.csdn.net/docsz/article/details/135219580
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。