Flink流批一体计算(24):Flink SQL之mysql维表实时关联

2023-12-13 05:56:58

目录

1.维表

2.数据准备

创建源数据

创建维度表

创建Sink表

3.配置任务

Flink SQL创建kafka源表

Flink SQL创建MySQL维表

Flink SQL创建MySQL结果表

编写计算任务

核验数据


1.维表

目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎存储一些维度数据,然后在DataStream API中调用MySQL、Hbase、redis客户端去获取到维度数据进行维度扩充。

本案例采用MySQL创建维表,与创建MySQL sink表语法相同。

2.数据准备

创建源数据

重启kafka,创建Topic:? case_kafka_mysql

写入json格式的数据

? {"ts": "20201011","id": 8,"price_amt":211}

创建维度表

在MySQL中创建名为product_dim的表

CREATE TABLE `product_dim` (
? `id` bigint(11) NOT NULL,
? `coupon_price_amt` bigint(11) DEFAULT NULL,
? PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

向数据表插入如下数据:

INSERT INTO `product_dim` VALUES (1, 1);
INSERT INTO `product_dim` VALUES (3, 1);
INSERT INTO `product_dim` VALUES (8, 1);
创建Sink表

在MySQL中创建名为sync_test_3的表

CREATE TABLE `sync_test_3` (
? `id` bigint(11) NOT NULL AUTO_INCREMENT,
? `ts` varchar(64) DEFAULT NULL,
? `total_gmv` bigint(11) DEFAULT NULL,
? PRIMARY KEY (`id`),
? UNIQUE KEY `uidx` (`ts`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

3.配置任务

Flink SQL创建kafka源表
create table flink_test_3 (
? id BIGINT,
? ts VARCHAR,
? price_amt BIGINT,
? proctime AS PROCTIME ()
)
?with (
??? 'connector' = 'kafka',
??? 'topic' = 'case_kafka_mysql',
??? 'properties.bootstrap.servers' = '127.0.0.1:9092',
??? 'properties.group.id' = 'flink_gp_test3',
??? 'scan.startup.mode' = 'earliest-offset',
??? 'format' = 'json',
??? 'json.fail-on-missing-field' = 'false',
??? 'json.ignore-parse-errors' = 'true',
??? 'properties.zookeeper.connect' = '127.0.0.1:2181/kafka'
? );
Flink SQL创建MySQL维表
create table flink_test_3_dim (
? id BIGINT,
? coupon_price_amt BIGINT
)
WITH (
?? 'connector' = 'jdbc',
?? 'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
?? 'table-name' = 'product_dim',
?? 'username' = 'root',
?? 'password' = 'Admin',
?? 'lookup.max-retries' = '3',
?? 'lookup.cache.max-rows' = 1000
?);

WITH参数

参数

说明

类型

备注

lookup.cache.max-rows

指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。

Integer

默认情况下,维表Cache是未开启的。

lookup.cache.ttl

指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。

Duration

默认情况下,维表Cache是未开启的。你可以设置lookup.cache.max-rows?lookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。

lookup.cache.caching-missing-key

是否缓存空的查询结果。

Boolean

参数取值如下:

true(默认值):缓存空的查询结果。

false:不缓存空的查询结果。

lookup.max-retries

查询数据库失败的最大重试次数。

Integer

默认值为3

Flink SQL创建MySQL结果表
CREATE TABLE sync_test_3 (
?????????????????? ts string,
?????????????????? total_gmv bigint,
?????????????????? PRIMARY KEY (ts) NOT ENFORCED
?) WITH (
?? 'connector' = 'jdbc',
?? 'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
?? 'table-name' = 'sync_test_3',
?? 'username' = 'root',
?? 'password' = 'Admin'
?);
编写计算任务
INSERT INTO sync_test_3
SELECT
? ts,
? SUM(price_amt - coupon_price_amt) AS total_gmv
FROM
? (
??? SELECT
????? a.ts as ts,
????? a.price_amt as price_amt,
????? b.coupon_price_amt as coupon_price_amt
??? FROM
????? flink_test_3 as a
????? LEFT JOIN flink_test_3_dim? FOR SYSTEM_TIME AS OF? a.proctime? as b
???? ON b.id = a.id
? )
GROUP BY ts;
核验数据
SELECT id, ts, total_gmv FROM sync_test_3;

文章来源:https://blog.csdn.net/victory0508/article/details/134832112
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。