Flink导入StarRocks
2023-12-27 15:30:22
1、pom依赖
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.6</flink.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- Apache Flink 的依赖, 这些依赖项,生产环境可以不打包到JAR文件中. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink-connector-starrocks -->
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>1.2.5_flink-1.13_2.12</version>
</dependency>
</dependencies>
2、代码编写
public class LoadJsonRecords {
public static void main(String[] args) throws Exception {
// To run the example, you should prepare in the following steps
// 1. create a primary key table in your StarRocks cluster. The DDL is
// CREATE DATABASE `test`;
// CREATE TABLE `test`.`score_board`
// (
// `id` int(11) NOT NULL COMMENT "",
// `name` varchar(65533) NULL DEFAULT "" COMMENT "",
// `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
// )
// ENGINE=OLAP
// PRIMARY KEY(`id`)
// COMMENT "OLAP"
// DISTRIBUTED BY HASH(`id`)
// PROPERTIES(
// "replication_num" = "1"
// );
//
// 2. replace the connector options "jdbc-url" and "load-url" with your cluster configurations
MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://fe-ip:9030");
String loadUrl = params.get("loadUrl", "be-ip:8040;be-ip:8040;be-ip:8040");
//String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://fe-ip:9030");
//String loadUrl = params.get("loadUrl", "be-ip:8040;be-ip:8040;be-ip:8040");
//String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://fe-ip:9030");
//String loadUrl = params.get("loadUrl", "be-ip:8040,be-ip:8040,be-ip:8040");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Generate json-format records. Each record has three fields correspond to
// the columns `id`, `name`, and `score` in StarRocks table.
String[] records = new String[]{
"{\"id\":1111, \"name\":\"starrocks-json\", \"score\":100}",
"{\"id\":2222, \"name\":\"flink-json\", \"score\":100}",
};
DataStream<String> source = env.fromElements(records);
// Configure the connector with the required properties, and you also need to add properties
// "sink.properties.format" and "sink.properties.strip_outer_array" to tell the connector the
// input records are json-format.
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
.withProperty("load-url", loadUrl)
.withProperty("database-name", "tmp")
.withProperty("table-name", "score_board")
.withProperty("username", "")
.withProperty("password", "")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
.withProperty("sink.parallelism","1")
//.withProperty("sink.version","V1")
.build();
// Create the sink with the options
SinkFunction<String> starRockSink = StarRocksSink.sink(options);
source.addSink(starRockSink);
env.execute("LoadJsonRecords");
}
}
文章来源:https://blog.csdn.net/docsz/article/details/135241980
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!