Flink流批一体计算(24):Flink SQL之mysql维表实时关联
2023-12-13 05:56:58
目录
1.维表
目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎存储一些维度数据,然后在DataStream API中调用MySQL、Hbase、redis客户端去获取到维度数据进行维度扩充。
本案例采用MySQL创建维表,与创建MySQL sink表语法相同。
2.数据准备
创建源数据
重启kafka,创建Topic:? case_kafka_mysql
写入json格式的数据
? {"ts": "20201011","id": 8,"price_amt":211}
创建维度表
在MySQL中创建名为product_dim的表
CREATE TABLE `product_dim` (
? `id` bigint(11) NOT NULL,
? `coupon_price_amt` bigint(11) DEFAULT NULL,
? PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
向数据表插入如下数据:
INSERT INTO `product_dim` VALUES (1, 1);
INSERT INTO `product_dim` VALUES (3, 1);
INSERT INTO `product_dim` VALUES (8, 1);
创建Sink表
在MySQL中创建名为sync_test_3的表
CREATE TABLE `sync_test_3` (
? `id` bigint(11) NOT NULL AUTO_INCREMENT,
? `ts` varchar(64) DEFAULT NULL,
? `total_gmv` bigint(11) DEFAULT NULL,
? PRIMARY KEY (`id`),
? UNIQUE KEY `uidx` (`ts`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
3.配置任务
Flink SQL创建kafka源表
create table flink_test_3 (
? id BIGINT,
? ts VARCHAR,
? price_amt BIGINT,
? proctime AS PROCTIME ()
)
?with (
??? 'connector' = 'kafka',
??? 'topic' = 'case_kafka_mysql',
??? 'properties.bootstrap.servers' = '127.0.0.1:9092',
??? 'properties.group.id' = 'flink_gp_test3',
??? 'scan.startup.mode' = 'earliest-offset',
??? 'format' = 'json',
??? 'json.fail-on-missing-field' = 'false',
??? 'json.ignore-parse-errors' = 'true',
??? 'properties.zookeeper.connect' = '127.0.0.1:2181/kafka'
? );
Flink SQL创建MySQL维表
create table flink_test_3_dim (
? id BIGINT,
? coupon_price_amt BIGINT
)
WITH (
?? 'connector' = 'jdbc',
?? 'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
?? 'table-name' = 'product_dim',
?? 'username' = 'root',
?? 'password' = 'Admin',
?? 'lookup.max-retries' = '3',
?? 'lookup.cache.max-rows' = 1000
?);
WITH参数
参数 | 说明 | 类型 | 备注 |
lookup.cache.max-rows | 指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。 | Integer | 默认情况下,维表Cache是未开启的。 |
lookup.cache.ttl | 指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。 | Duration | 默认情况下,维表Cache是未开启的。你可以设置lookup.cache.max-rows和?lookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。 |
lookup.cache.caching-missing-key | 是否缓存空的查询结果。 | Boolean | 参数取值如下: true(默认值):缓存空的查询结果。 false:不缓存空的查询结果。 |
lookup.max-retries | 查询数据库失败的最大重试次数。 | Integer | 默认值为3。 |
Flink SQL创建MySQL结果表
CREATE TABLE sync_test_3 (
?????????????????? ts string,
?????????????????? total_gmv bigint,
?????????????????? PRIMARY KEY (ts) NOT ENFORCED
?) WITH (
?? 'connector' = 'jdbc',
?? 'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
?? 'table-name' = 'sync_test_3',
?? 'username' = 'root',
?? 'password' = 'Admin'
?);
编写计算任务
INSERT INTO sync_test_3
SELECT
? ts,
? SUM(price_amt - coupon_price_amt) AS total_gmv
FROM
? (
??? SELECT
????? a.ts as ts,
????? a.price_amt as price_amt,
????? b.coupon_price_amt as coupon_price_amt
??? FROM
????? flink_test_3 as a
????? LEFT JOIN flink_test_3_dim? FOR SYSTEM_TIME AS OF? a.proctime? as b
???? ON b.id = a.id
? )
GROUP BY ts;
核验数据
SELECT id, ts, total_gmv FROM sync_test_3;
文章来源:https://blog.csdn.net/victory0508/article/details/134832112
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!