【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。 -
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 -
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。 -
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。 -
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文给出了通过Table API 和SQL 的两种方式创建视图,也就是虚表。同时为了更接近实用,通过Table API 创建了一张Hive的表,然后在该表上创建视图进行示例。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本文依赖hive、hadoop、kafka环境好用,代码中示例的hive配置文件路径根据你自己的环境而设置。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版
一、maven依赖
本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。
二、示例:通过Table API 和 SQL 创建视图
1、示例:通过SQL创建视图
本示例是通过sql创建一个简单的表,然后再通过sql创建一个视图,最后查询视图并输出结果。
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;
import com.google.common.collect.Lists;
/**
* @author alanchan
*
*/
public class TestTableAPIDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// SQL 创建输入表
String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n" +
" `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" +
" `partition` BIGINT METADATA VIRTUAL,\r\n" +
" `offset` BIGINT METADATA VIRTUAL,\r\n" +
" `user_id` BIGINT,\r\n" +
" `item_id` BIGINT,\r\n" +
" `behavior` STRING\r\n" +
") WITH (\r\n" +
" 'connector' = 'kafka',\r\n" +
" 'topic' = 'user_behavior',\r\n" +
" 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" +
" 'properties.group.id' = 'testGroup',\r\n" +
" 'scan.startup.mode' = 'earliest-offset',\r\n" +
" 'format' = 'csv'\r\n" +
");";
tenv.executeSql(sourceSql);
//
String sql = "select user_id , behavior from Alan_KafkaTable group by user_id ,behavior ";
Table resultQuery = tenv.sqlQuery(sql);
tenv.createTemporaryView("Alan_KafkaView", resultQuery);
String queryViewSQL = " select * from Alan_KafkaView ";
Table queryViewResult = tenv.sqlQuery(queryViewSQL);
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(queryViewResult, Row.class);
// 6、sink
resultDS.print();
// 7、执行
env.execute();
// kafka中输入测试数据
// 1,1001,login
// 1,2001,p_read
// 程序运行控制台输入如下
// 3> (true,+I[1, login])
// 14> (true,+I[1, p_read])
}
}
2、示例:通过Table API创建视图
本示例是通过Table API创建一个hive的表,将数据写入hive,然后再创建视图,最后查询视图输出。
本示例依赖hive、hadoop、kafka环境好用,代码中示例的hive配置文件路径根据你自己的环境而设置。
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;
import com.google.common.collect.Lists;
/**
* @author alanchan
*
*/
public class TestTableAPIDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// SQL 创建输入表
String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n" +
" `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" +
" `partition` BIGINT METADATA VIRTUAL,\r\n" +
" `offset` BIGINT METADATA VIRTUAL,\r\n" +
" `user_id` BIGINT,\r\n" +
" `item_id` BIGINT,\r\n" +
" `behavior` STRING\r\n" +
") WITH (\r\n" +
" 'connector' = 'kafka',\r\n" +
" 'topic' = 'user_behavior',\r\n" +
" 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" +
" 'properties.group.id' = 'testGroup',\r\n" +
" 'scan.startup.mode' = 'earliest-offset',\r\n" +
" 'format' = 'csv'\r\n" +
");";
tenv.executeSql(sourceSql);
// 创建视图
String catalogName = "alan_hive";
String defaultDatabase = "default";
String databaseName = "viewtest_db";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
tenv.registerCatalog(catalogName, hiveCatalog);
tenv.useCatalog(catalogName);
hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
}, true);
tenv.useDatabase(databaseName);
String viewName = "Alan_KafkaView";
String originalQuery = "select user_id , behavior from Alan_KafkaTable group by user_id ,behavior ";
String expandedQuery = "SELECT user_id , behavior FROM " + databaseName + "." + "Alan_KafkaTable group by user_id ,behavior ";
String comment = "this is a comment";
ObjectPath path = new ObjectPath(databaseName, viewName);
createView(originalQuery, expandedQuery, comment, hiveCatalog, path);
// 查询视图
String queryViewSQL = " select * from Alan_KafkaView ";
Table queryViewResult = tenv.sqlQuery(queryViewSQL);
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(queryViewResult, Row.class);
// 6、sink
resultDS.print();
// 7、执行
env.execute();
// kafka中输入测试数据
// 1,1001,login
// 1,2001,p_read
// 程序运行控制台输入如下
// 3> (true,+I[1, login])
// 14> (true,+I[1, p_read])
}
static void createView(String originalQuery, String expandedQuery, String comment, HiveCatalog hiveCatalog, ObjectPath path) throws Exception {
ResolvedSchema resolvedSchema = new ResolvedSchema(
Arrays.asList(
Column.physical("user_id", DataTypes.INT()),
Column.physical("behavior", DataTypes.STRING())),
Collections.emptyList(),
null);
CatalogView origin = CatalogView.of(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
comment,
originalQuery,
expandedQuery,
Collections.emptyMap());
CatalogView view = new ResolvedCatalogView(origin, resolvedSchema);
hiveCatalog.createTable(path, view, false);
}
}
以上,本文给出了通过Table API 和SQL 的两种方式创建视图,也就是虚表。同时为了更接近实用,通过Table API 创建了一张Hive的表,然后在该表上创建视图进行示例。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!