A Worked Example: Building a Wide Table with Flink and Ingesting It in Real Time
2024-01-10 11:21:55
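1. Prepare the installation packages
- Flink 1.14.5 installation package
- Flink CDC connector for MySQL
- Flink SQL connector for SequoiaDB (SDB)
- MySQL driver
- SDB driver
- Flink JDBC connector (used for MySQL)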
2. Ingestion flow diagram
3. Install and deploy Flink
- Upload the Flink archive to the server and extract it
    tar -zxvf flink-1.14.5-bin-scala_2.11.tgz -C /opt/
- Copy the dependencies into Flink's lib directory
    cp sdb-flink-connector-3.4.8-jar-with-dependencies.jar /opt/flink-1.14.5/lib
    cp sequoiadb-driver-3.4.8.jre8.jar /opt/flink-1.14.5/lib
    cp flink-sql-connector-mysql-cdc-2.2.1.jar /opt/flink-1.14.5/lib
    cp flink-connector-jdbc_2.11-1.14.6.jar /opt/flink-1.14.5/lib
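A missing jar in lib usually only shows up later, as a "connector not found" style error in the SQL client, so it can help to check the directory right after copying. The following is a minimal sketch, not part of the original procedure; the function name check_flink_jars is hypothetical, and the jar names are the four copied above.

```shell
#!/usr/bin/env bash
# Report which of the connector/driver jars copied above are missing from a
# Flink lib directory. Prints one "MISSING: <jar>" line per absent jar and
# returns non-zero if any jar is missing.
check_flink_jars() {
    local lib_dir="$1"
    local missing=0
    local jar
    for jar in \
        sdb-flink-connector-3.4.8-jar-with-dependencies.jar \
        sequoiadb-driver-3.4.8.jre8.jar \
        flink-sql-connector-mysql-cdc-2.2.1.jar \
        flink-connector-jdbc_2.11-1.14.6.jar
    do
        if [ ! -f "$lib_dir/$jar" ]; then
            echo "MISSING: $jar"
            missing=1
        fi
    done
    return "$missing"
}
```

With the layout used here, it would be called as: check_flink_jars /opt/flink-1.14.5/lib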
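- Edit flink-conf.yaml
vi conf/flink-conf.yaml
    ### Hostname (or IP address) of the master; it must resolve to the JobManager host (upgrade1 elsewhere in this walkthrough)
    jobmanager.rpc.address: sdb1
    ### Temporary directory used by each TaskManager
    io.tmp.dirs: /opt/flink-1.14.5/tmp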
- Edit the masters file
vi conf/masters
    # Host and port of the master
    upgrade1:8081
- Edit the workers file
vi conf/workers
    # Cluster hostnames, one per line
    upgrade1
    upgrade2
    upgrade3
- Copy the Flink directory to the other machines in the cluster
    scp -r /opt/flink-1.14.5 sdbadmin@upgrade2:/opt/
    scp -r /opt/flink-1.14.5 sdbadmin@upgrade3:/opt/
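For a larger cluster, the scp commands can be derived from conf/workers instead of being typed by hand. The sketch below only prints the commands (a dry run) so they can be reviewed first; the function name print_scp_commands is hypothetical, and it assumes one hostname per line, the sdbadmin user, and the /opt/ target used above.

```shell
#!/usr/bin/env bash
# Print one scp command per worker listed in a workers file, skipping blank
# lines, comment lines, and the local host (which already has the directory).
print_scp_commands() {
    local workers_file="$1"
    local local_host="$2"
    local flink_dir="$3"
    local host
    while IFS= read -r host || [ -n "$host" ]; do
        case "$host" in
            ''|'#'*) continue ;;   # skip blank lines and comments
        esac
        if [ "$host" = "$local_host" ]; then
            continue
        fi
        echo "scp -r $flink_dir sdbadmin@$host:/opt/"
    done < "$workers_file"
}
```

Example: print_scp_commands conf/workers upgrade1 /opt/flink-1.14.5. Once the printed commands look right, the output can be piped to sh to run them.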
- Start the Flink cluster
    [sdbadmin@upgrade1 flink-1.14.5]$ ./bin/start-cluster.sh
- Start the Flink SQL client
    [sdbadmin@upgrade1 flink-1.14.5]$ ./bin/sql-client.sh
4. Real-time ingestion
Write a data generator program to produce the test data.
4.1 Environment preparation
4.1.1 Enable the MySQL binlog
- Create the binlog directory
    [sdbadmin@upgrade1 mysql]$ mkdir /opt/sequoiasql/mysql/database/3306/binlog
- Enable the binlog
vim /opt/sequoiasql/mysql/database/3306/auto.cnf
    Add the following settings:
    log_bin=/opt/sequoiasql/mysql/database/3306/binlog
    binlog_format=ROW
    expire_logs_days=1
    server_id=1
After the configuration is complete, restart MySQL
    [sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl stop myinst
    [sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl start myinst
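The mysql-cdc connector reads row-level changes from the binlog, so a wrong or missing setting here tends to surface only when the Flink job starts. As a quick pre-check before restarting, the configuration file can be scanned for the options listed above. This is a minimal sketch; check_binlog_conf is a hypothetical helper, and it only inspects the file, not the running server.

```shell
#!/usr/bin/env bash
# Check that a MySQL config file sets the binlog options used in this section.
# Prints one line per problem and returns non-zero if any check fails.
check_binlog_conf() {
    local conf_file="$1"
    local status=0
    # log_bin and server_id only need to be present.
    grep -Eq '^[[:space:]]*log[_-]bin[[:space:]]*(=|$)' "$conf_file" \
        || { echo "log_bin is not set"; status=1; }
    grep -Eq '^[[:space:]]*server[_-]id[[:space:]]*=' "$conf_file" \
        || { echo "server_id is not set"; status=1; }
    # Row-level change capture needs binlog_format=ROW.
    grep -Eiq '^[[:space:]]*binlog[_-]format[[:space:]]*=[[:space:]]*ROW[[:space:]]*$' "$conf_file" \
        || { echo "binlog_format is not ROW"; status=1; }
    return "$status"
}
```

Example: check_binlog_conf /opt/sequoiasql/mysql/database/3306/auto.cnf. After the restart, running SHOW VARIABLES LIKE 'log_bin'; in a MySQL session confirms that the server actually picked the setting up.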
4.1.2 Create the MySQL tables
Create the database
    create database sbtest;
    use sbtest;
Create the source tables
    CREATE TABLE sbtest1 (
        id INT UNSIGNED AUTO_INCREMENT,
        uuid INT(10),
        name1 CHAR(120),
        age INT(4),
        time1 DATETIME,
        PRIMARY KEY(id)
    );
    CREATE TABLE sbtest2 (
        id INT UNSIGNED AUTO_INCREMENT,
        uuid INT(10),
        name2 CHAR(120),
        age INT(4),
        time1 DATETIME,
        PRIMARY KEY(id)
    );
    CREATE TABLE sbtest3 (
        id INT UNSIGNED AUTO_INCREMENT,
        uuid INT(10),
        name3 CHAR(120),
        age INT(4),
        time1 DATETIME,
        PRIMARY KEY(id)
    );
Create the wide table that Flink writes into
    CREATE TABLE sbtest4 (
        id INT UNSIGNED AUTO_INCREMENT,
        uuid INT(10),
        name1 CHAR(120),
        name2 CHAR(120),
        name3 CHAR(120),
        age INT(4),
        time1 DATETIME,
        PRIMARY KEY(id)
    );
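Section 4 calls for a data generator but does not show one. The following is one possible sketch, not the author's program: it prints INSERT statements for sbtest1, sbtest2 and sbtest3 with explicit, matching id values, because the wide-table query later joins the three tables on id. The function name gen_inserts and the generated values (uuid, names, age) are made up for illustration.

```shell
#!/usr/bin/env bash
# Print INSERT statements that add <count> rows to each of sbtest1..sbtest3.
# Rows in the three tables share the same id so that the join on id matches.
gen_inserts() {
    local count="$1"
    local i t
    echo "USE sbtest;"
    i=1
    while [ "$i" -le "$count" ]; do
        for t in 1 2 3; do
            # name$t expands to the per-table column name1, name2 or name3.
            echo "INSERT INTO sbtest$t (id, uuid, name$t, age, time1) VALUES ($i, $((1000 + i)), 'name${t}_$i', $((20 + i % 50)), NOW());"
        done
        i=$((i + 1))
    done
}
```

The output can be reviewed first and then piped into the mysql client, for example: gen_inserts 100 | mysql -h 192.168.223.135 -P 3306 -uroot -p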
4.1.3 Create the Flink mapping tables
The source mapping tables require flink-sql-connector-mysql-cdc-2.2.1.jar.
    CREATE TABLE sbtest1_mysql (
        id INT,
        uuid INT,
        name1 CHAR(120),
        age INT,
        time1 TIMESTAMP,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '192.168.223.135',
        'port' = '3306',
        'username' = 'root',
        'password' = 'root',
        'database-name' = 'sbtest',
        'table-name' = 'sbtest1'
    );
    CREATE TABLE sbtest2_mysql (
        id INT,
        uuid INT,
        name2 CHAR(120),
        age INT,
        time1 TIMESTAMP,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '192.168.223.135',
        'port' = '3306',
        'username' = 'root',
        'password' = 'root',
        'database-name' = 'sbtest',
        'table-name' = 'sbtest2'
    );
    CREATE TABLE sbtest3_mysql (
        id INT,
        uuid INT,
        name3 CHAR(120),
        age INT,
        time1 TIMESTAMP,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '192.168.223.135',
        'port' = '3306',
        'username' = 'root',
        'password' = 'root',
        'database-name' = 'sbtest',
        'table-name' = 'sbtest3'
    );
Create the Flink --> MySQL sink mapping table
This table requires flink-connector-jdbc_2.11-1.14.6.jar; the JDBC connector also needs the MySQL driver from section 1 in Flink's lib directory.
    CREATE TABLE sbtest4_mysql (
        id BIGINT,
        uuid INT,
        name1 CHAR(120),
        name2 CHAR(120),
        name3 CHAR(120),
        age INT,
        time1 TIMESTAMP,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://192.168.223.135:3306/sbtest',
        'username' = 'root',
        'password' = 'root',
        'table-name' = 'sbtest4'
    );
Create the Flink --> SequoiaDB (SDB) sink mapping table
This table requires sdb-flink-connector-3.4.8-jar-with-dependencies.jar.
    CREATE TABLE sbtest_sdb (
        id BIGINT,
        uuid INT,
        name1 CHAR(120),
        name2 CHAR(120),
        name3 CHAR(120),
        age INT,
        time1 TIMESTAMP,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'sequoiadb',
        'bulksize' = '1',
        'hosts' = '192.168.223.135:11810',
        'collectionspace' = 'sbtest',
        'collection' = 'sbtest4'
    );
4.2 Real-time ingestion into MySQL
4.2.1 Flink left join
    select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
    from sbtest1_mysql sdb1
    left join sbtest2_mysql sdb2 on sdb1.id = sdb2.id
    left join sbtest3_mysql sdb3 on sdb1.id = sdb3.id;
4.2.2 Write the join result to MySQL in real time
    insert into sbtest4_mysql
    select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
    from sbtest1_mysql sdb1
    left join sbtest2_mysql sdb2 on sdb1.id = sdb2.id
    left join sbtest3_mysql sdb3 on sdb1.id = sdb3.id;
Check the Flink job
The data is written to MySQL successfully
4.3 Real-time ingestion into SDB
4.3.1 Flink left join
    select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
    from sbtest1_mysql sdb1
    left join sbtest2_mysql sdb2 on sdb1.id = sdb2.id
    left join sbtest3_mysql sdb3 on sdb1.id = sdb3.id;
4.3.2 Write the join result to SDB in real time
    insert into sbtest_sdb
    select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
    from sbtest1_mysql sdb1
    left join sbtest2_mysql sdb2 on sdb1.id = sdb2.id
    left join sbtest3_mysql sdb3 on sdb1.id = sdb3.id;
Check the Flink job
The data is written to SDB successfully
Source: https://blog.csdn.net/u014439239/article/details/135495181