【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。 -
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 -
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。 -
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。 -
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版
一、maven依赖
本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。
二、示例:表的join操作(内联接、外联接以及联接自定义函数等)
本部分介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。
关于自定义函数的联接将在flink 自定义函数中介绍,因为使用函数和联接本身关系不是非常密切。
19、Flink 的Table API 和 SQL 中的自定义函数(2)
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.call;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
public class TestTableAPIJoinOperationDemo {
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class User {
private long id;
private String name;
private double balance;
private Long rowtime;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
private long id;
private long user_id;
private double amount;
private Long rowtime;
}
final static List<User> userList = Arrays.asList(
new User(1L, "alan", 18, 1698742358391L),
new User(2L, "alan", 19, 1698742359396L),
new User(3L, "alan", 25, 1698742360407L),
new User(4L, "alanchan", 28, 1698742361409L),
new User(5L, "alanchan", 29, 1698742362424L)
);
final static List<Order> orderList = Arrays.asList(
new Order(1L, 1, 18, 1698742358391L),
new Order(2L, 2, 19, 1698742359396L),
new Order(3L, 1, 25, 1698742360407L),
new Order(4L, 3, 28, 1698742361409L),
new Order(5L, 1, 29, 1698742362424L),
new Order(6L, 4, 49, 1698742362424L)
);
static void testInnerJoin() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<User> users = env.fromCollection(userList);
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"),$("balance"),$("rowtime"));
DataStream<Order> orders = env.fromCollection(orderList);
Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));
Table left = usersTable.select($("id").as("userId"), $("name"), $("balance"),$("rowtime").as("u_rowtime"));
Table right = ordersTable.select($("id").as("orderId"), $("user_id"), $("amount"),$("rowtime").as("o_rowtime"));
Table result = left.join(right)
.where($("user_id").isEqual($("userId")))
.select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"));
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
resultDS.print();
// 15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
// 12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
// 3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
// 12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
// 12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
// 12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])
env.execute();
}
/**
* 和 SQL LEFT/RIGHT/FULL OUTER JOIN 子句类似。 关联两张表。 两张表必须有不同的字段名,并且必须定义至少一个等式连接谓词。
* @throws Exception
*/
static void testOuterJoin() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<User> users = env.fromCollection(userList);
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"),$("balance"),$("rowtime"));
DataStream<Order> orders = env.fromCollection(orderList);
Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));
Table left = usersTable.select($("id").as("userId"), $("name"), $("balance"),$("rowtime").as("u_rowtime"));
Table right = ordersTable.select($("id").as("orderId"), $("user_id"), $("amount"),$("rowtime").as("o_rowtime"));
Table leftOuterResult = left.leftOuterJoin(right, $("user_id").isEqual($("userId")))
.select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"));
DataStream<Tuple2<Boolean, Row>> leftOuterResultDS = tenv.toRetractStream(leftOuterResult, Row.class);
// leftOuterResultDS.print();
// 12> (true,+I[null, null, null, null, alan, 18])
// 3> (true,+I[null, null, null, null, alanchan, 28])
// 12> (false,-D[null, null, null, null, alan, 18])
// 12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
// 15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
// 12> (true,+I[null, null, null, null, alan, 19])
// 3> (false,-D[null, null, null, null, alanchan, 28])
// 12> (false,-D[null, null, null, null, alan, 19])
// 3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
// 12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
// 12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
// 3> (true,+I[null, null, null, null, alanchan, 29])
// 12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])
Table rightOuterResult = left.rightOuterJoin(right, $("user_id").isEqual($("userId")))
.select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"));
DataStream<Tuple2<Boolean, Row>> rightOuterResultDS = tenv.toRetractStream(rightOuterResult, Row.class);
// rightOuterResultDS.print();
// 12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
// 3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
// 15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
// 12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
// 12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
// 12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])
Table fullOuterResult = left.fullOuterJoin(right, $("user_id").isEqual($("userId")))
.select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"));
DataStream<Tuple2<Boolean, Row>> fullOuterResultDS = tenv.toRetractStream(fullOuterResult, Row.class);
fullOuterResultDS.print();
// 3> (true,+I[6, 4, 49.0, 1698742362424, null, null])
// 12> (true,+I[1, 1, 18.0, 1698742358391, null, null])
// 15> (true,+I[4, 3, 28.0, 1698742361409, null, null])
// 12> (false,-D[1, 1, 18.0, 1698742358391, null, null])
// 3> (false,-D[6, 4, 49.0, 1698742362424, null, null])
// 12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
// 3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
// 3> (true,+I[null, null, null, null, alanchan, 29])
// 12> (true,+I[2, 2, 19.0, 1698742359396, null, null])
// 12> (false,-D[2, 2, 19.0, 1698742359396, null, null])
// 12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
// 15> (false,-D[4, 3, 28.0, 1698742361409, null, null])
// 12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
// 15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
// 12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])
env.execute();
}
/**
* Interval join 是可以通过流模式处理的常规 join 的子集。
* Interval join 至少需要一个 equi-join 谓词和一个限制双方时间界限的 join 条件。
* 这种条件可以由两个合适的范围谓词(<、<=、>=、>)或一个比较两个输入表相同时间属性(即处理时间或事件时间)的等值谓词来定义。
* @throws Exception
*/
static void testIntervalJoin() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<User> users = env.fromCollection(userList);
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"),$("balance"),$("rowtime"));
DataStream<Order> orders = env.fromCollection(orderList);
Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));
Table left = usersTable.select($("id").as("userId"), $("name"), $("balance"),$("rowtime").as("u_rowtime"));
Table right = ordersTable.select($("id").as("orderId"), $("user_id"), $("amount"),$("rowtime").as("o_rowtime"));
Table result = left.join(right)
.where(
and(
$("user_id").isEqual($("userId")),
$("user_id").isLess(3)
// $("u_rowtime").isGreaterOrEqual($("o_rowtime").minus(lit(5).minutes())),
// $("u_rowtime").isLess($("o_rowtime").plus(lit(10).minutes()))
)
)
.select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"))
;
result.printSchema();
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
resultDS.print();
// 12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18.0])
// 12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19.0])
// 12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18.0])
// 12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18.0])
env.execute();
}
/**
* join 表和表函数的结果。左(外部)表的每一行都会 join 表函数相应调用产生的所有行。
* 如果表函数调用返回空结果,则删除左侧(外部)表的一行。
* 该示例为示例性的,具体的验证将在自定义函数中进行说明
*
* @throws Exception
*/
static void testInnerJoinWithUDTF() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 注册 User-Defined Table Function
TableFunction<Tuple3<String,String,String>> split = new SplitFunction();
tenv.registerFunction("split", split);
// join
DataStream<Order> orders = env.fromCollection(orderList);
Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));
Table result = ordersTable
.joinLateral(call("split", $("c")).as("s", "t", "v"))
.select($("a"), $("b"), $("s"), $("t"), $("v"));
env.execute();
}
/**
* join 表和表函数的结果。左(外部)表的每一行都会 join 表函数相应调用产生的所有行。
* 如果表函数调用返回空结果,则保留相应的 outer(外部连接)行并用空值填充右侧结果。
* 目前,表函数左外连接的谓词只能为空或字面(常量)真。
* 该示例为示例性的,具体的验证将在自定义函数中进行说明
*
* @throws Exception
*/
static void testLeftOuterJoinWithUDTF() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 注册 User-Defined Table Function
TableFunction<Tuple3<String,String,String>> split = new SplitFunction();
tenv.registerFunction("split", split);
// join
DataStream<Order> orders = env.fromCollection(orderList);
Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));
Table result = ordersTable
.leftOuterJoinLateral(call("split", $("c")).as("s", "t", "v"))
.select($("a"), $("b"), $("s"), $("t"), $("v"));
env.execute();
}
/**
* Temporal table 是跟踪随时间变化的表。
* Temporal table 函数提供对特定时间点 temporal table 状态的访问。
* 表与 temporal table 函数进行 join 的语法和使用表函数进行 inner join 的语法相同。
* 目前仅支持与 temporal table 的 inner join。
*
* @throws Exception
*/
static void testJoinWithTemporalTable() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
Table ratesHistory = tenv.from("RatesHistory");
// 注册带有时间属性和主键的 temporal table function
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
$("r_proctime"),
$("r_currency")
);
tenv.registerFunction("rates", rates);
// 基于时间属性和键与“Orders”表关联
Table orders = tenv.from("Orders");
Table result = orders
.joinLateral(call("rates", $("o_proctime")), $("o_currency").isEqual($("r_currency")));
env.execute();
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// testInnerJoin();
// testOuterJoin();
// testIntervalJoin();
testInnerJoinWithUDTF();
}
static class SplitFunction extends TableFunction<Tuple3<String,String,String>>{
public void eval(Tuple3<String,String,String> tp) {
// for (String s : str.split(",")) {
// // use collect(...) to emit a row
collect(Row.of(s, s.length()));
// }
}
}
}
以上,本文介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!