【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。 -
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 -
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。 -
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。 -
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
本文通过两个示例介绍了时态表TemporalTableFunction的join操作。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版
一、maven依赖
本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。
二、时态表的join
假设有一张订单表Orders和一张汇率表Rates,那么订单来自于不同的地区,所以支付的币种各不一样,那么假设需要统计每个订单在下单时候Yen币种对应的金额。
1、统计需求对应的SQL
SELECT o.currency, o.amount, r.rate
o.amount * r.rate AS yen_amount
FROM
Orders AS o,
LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency
2、Without connnector 实现代码
就是使用静态数据实现,其验证结果在代码中的注释部分。
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
import static org.apache.flink.table.api.Expressions.$;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
public class TestTemporalTableFunctionDemo {
// 维表
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Rate {
private String currency;
private Integer rate;
private Long rate_time;
}
// 事实表
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
private Long total;
private String currency;
private Long order_time;
}
final static List<Rate> rateList = Arrays.asList(
new Rate("US Dollar", 102, 1L),
new Rate("Euro", 114, 1L),
new Rate("Yen", 1, 1L),
new Rate("Euro", 116, 5L),
new Rate("Euro", 119, 7L)
);
final static List<Order> orderList = Arrays.asList(
new Order(2L, "Euro", 2L),
new Order(1L, "US Dollar", 3L),
new Order(50L, "Yen", 4L),
new Order(3L, "Euro", 5L)
);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// order 实时流 事实表
DataStream<Order> orderDs = env.fromCollection(orderList)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((order, rTimeStamp) -> order.getOrder_time()));
// rate 实时流 维度表
DataStream<Rate> rateDs = env.fromCollection(rateList)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Rate>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((rate, rTimeStamp) -> rate.getRate_time()));
// 转变为Table
Table orderTable = tenv.fromDataStream(orderDs, $("total"), $("currency"), $("order_time").rowtime());
Table rateTable = tenv.fromDataStream(rateDs, $("currency"), $("rate"), $("rate_time").rowtime());
tenv.createTemporaryView("alan_orderTable", orderTable);
tenv.createTemporaryView("alan_rateTable", rateTable);
// 定义一个TemporalTableFunction
TemporalTableFunction rateDim = rateTable.createTemporalTableFunction($("rate_time"), $("currency"));
// 注册表函数
// tenv.registerFunction("alan_rateDim", rateDim);
tenv.createTemporarySystemFunction("alan_rateDim", rateDim);
String sql = "select o.*,r.rate from alan_orderTable as o,Lateral table (alan_rateDim(o.order_time)) r where r.currency = o.currency ";
// 关联查询
Table result = tenv.sqlQuery(sql);
// 打印输出
DataStream resultDs = tenv.toAppendStream(result, Row.class);
resultDs.print();
// rate 流数据(维度表)
// rateList
// order 流数据
// orderList
// 控制台输出
// 2> +I[2, Euro, 1970-01-01T00:00:00.002, 114]
// 5> +I[50, Yen, 1970-01-01T00:00:00.004, 1]
// 16> +I[1, US Dollar, 1970-01-01T00:00:00.003, 102]
// 2> +I[3, Euro, 1970-01-01T00:00:00.005, 116]
env.execute();
}
}
3、With connnector 实现代码
本处使用的是kafka作为数据源来实现。其验证结果在代码中的注释部分。
1)、bean定义
package org.tablesql.join.bean;
import java.io.Serializable;
import lombok.Data;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
@Data
public class CityInfo implements Serializable {
private Integer cityId;
private String cityName;
private Long ts;
}
package org.tablesql.join.bean;
import java.io.Serializable;
import lombok.Data;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
@Data
public class UserInfo implements Serializable {
private String userName;
private Integer cityId;
private Long ts;
}
2)、序列化定义
package org.tablesql.join.bean;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
public class CityInfoSchema implements DeserializationSchema<CityInfo> {
@Override
public CityInfo deserialize(byte[] message) throws IOException {
String jsonStr = new String(message, StandardCharsets.UTF_8);
CityInfo data = JSON.parseObject(jsonStr, new TypeReference<CityInfo>() {});
return data;
}
@Override
public boolean isEndOfStream(CityInfo nextElement) {
return false;
}
@Override
public TypeInformation<CityInfo> getProducedType() {
return TypeInformation.of(new TypeHint<CityInfo>() {
});
}
}
package org.tablesql.join.bean;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
public class UserInfoSchema implements DeserializationSchema<UserInfo> {
@Override
public UserInfo deserialize(byte[] message) throws IOException {
String jsonStr = new String(message, StandardCharsets.UTF_8);
UserInfo data = JSON.parseObject(jsonStr, new TypeReference<UserInfo>() {});
return data;
}
@Override
public boolean isEndOfStream(UserInfo nextElement) {
return false;
}
@Override
public TypeInformation<UserInfo> getProducedType() {
return TypeInformation.of(new TypeHint<UserInfo>() {
});
}
}
3)、实现
package org.tablesql.join;
import static org.apache.flink.table.api.Expressions.$;
import java.time.Duration;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.tablesql.join.bean.CityInfo;
import org.tablesql.join.bean.CityInfoSchema;
import org.tablesql.join.bean.UserInfo;
import org.tablesql.join.bean.UserInfoSchema;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
public class TestJoinDimByKafkaEventTimeDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Kafka的ip和要消费的topic,//Kafka设置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");
props.setProperty("group.id", "kafkatest");
// 读取用户信息Kafka
FlinkKafkaConsumer<UserInfo> userConsumer = new FlinkKafkaConsumer<UserInfo>("user", new UserInfoSchema(),props);
userConsumer.setStartFromEarliest();
userConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
.<UserInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((user, rTimeStamp) -> user.getTs()) // 该句如果不加,则是默认为kafka的事件时间
);
// 读取城市维度信息Kafka
FlinkKafkaConsumer<CityInfo> cityConsumer = new FlinkKafkaConsumer<CityInfo>("city", new CityInfoSchema(), props);
cityConsumer.setStartFromEarliest();
cityConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
.<CityInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((city, rTimeStamp) -> city.getTs()) // 该句如果不加,则是默认为kafka的事件时间
);
Table userTable = tableEnv.fromDataStream(env.addSource(userConsumer), $("userName"), $("cityId"), $("ts").rowtime());
Table cityTable = tableEnv.fromDataStream(env.addSource(cityConsumer), $("cityId"), $("cityName"),$("ts").rowtime());
tableEnv.createTemporaryView("userTable", userTable);
tableEnv.createTemporaryView("cityTable", cityTable);
// 定义一个TemporalTableFunction
TemporalTableFunction dimCity = cityTable.createTemporalTableFunction($("ts"), $("cityId"));
// 注册表函数
// tableEnv.registerFunction("dimCity", dimCity);
tableEnv.createTemporarySystemFunction("dimCity", dimCity);
Table u = tableEnv.sqlQuery("select * from userTable");
// u.printSchema();
tableEnv.toAppendStream(u, Row.class).print("user流接收到:");
Table c = tableEnv.sqlQuery("select * from cityTable");
// c.printSchema();
tableEnv.toAppendStream(c, Row.class).print("city流接收到:");
// 关联查询
Table result = tableEnv
.sqlQuery("select u.userName,u.cityId,d.cityName,u.ts " +
"from userTable as u " +
", Lateral table (dimCity(u.ts)) d " +
"where u.cityId=d.cityId");
// 打印输出
DataStream resultDs = tableEnv.toAppendStream(result, Row.class);
resultDs.print("\t关联输出:");
// 用户信息格式:
// {"userName":"user1","cityId":1,"ts":0}
// {"userName":"user1","cityId":1,"ts":1}
// {"userName":"user1","cityId":1,"ts":4}
// {"userName":"user1","cityId":1,"ts":5}
// {"userName":"user1","cityId":1,"ts":7}
// {"userName":"user1","cityId":1,"ts":9}
// {"userName":"user1","cityId":1,"ts":11}
// kafka-console-producer.sh --broker-list server1:9092 --topic user
// 城市维度格式:
// {"cityId":1,"cityName":"nanjing","ts":15}
// {"cityId":1,"cityName":"beijing","ts":1}
// {"cityId":1,"cityName":"shanghai","ts":5}
// {"cityId":1,"cityName":"shanghai","ts":7}
// {"cityId":1,"cityName":"wuhan","ts":10}
// kafka-console-producer.sh --broker-list server1:9092 --topic city
// 输出
// city流接收到::6> +I[1, beijing, 1970-01-01T00:00:00.001]
// user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.004]
// city流接收到::6> +I[1, shanghai, 1970-01-01T00:00:00.005]
// user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.005]
// city流接收到::6> +I[1, shanghai, 1970-01-01T00:00:00.007]
// user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.007]
// city流接收到::6> +I[1, wuhan, 1970-01-01T00:00:00.010]
// user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.009]
// user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.011]
// 关联输出::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.001]
// 关联输出::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.004]
// 关联输出::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.005]
// 关联输出::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.007]
// 关联输出::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.009]
env.execute("joinDemo");
}
}
以上,本文通过两个示例介绍了时态表TemporalTableFunction的join操作。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!