Flink自定义Source模拟数据流
2024-01-08 06:00:45
maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zxl</groupId>
<artifactId>FlinkJoin</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.22</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
<!--com.mysql.cj.jdbc.Driver-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!--这个一定要加,否则会报错(5001好像是,记不清了)-->
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>2.34</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
实体类
订单类
package com.zxl.bean;
// TODO: 2024/1/6 订单类
public class Orders {
//订单ID
private Long order_id;
//用户ID
private Long user_id;
//订单日期
private Long order_date;
//订单金额
private Integer order_amount;
//商品ID
private Integer product_id;
//订单数量
private Long order_num;
public Long getOrder_id() {
return order_id;
}
public void setOrder_id(Long order_id) {
this.order_id = order_id;
}
public Long getUser_id() {
return user_id;
}
public void setUser_id(Long user_id) {
this.user_id = user_id;
}
public Long getOrder_date() {
return order_date;
}
public void setOrder_date(Long order_date) {
this.order_date = order_date;
}
public Integer getOrder_amount() {
return order_amount;
}
public void setOrder_amount(Integer order_amount) {
this.order_amount = order_amount;
}
public Integer getProduct_id() {
return product_id;
}
public void setProduct_id(Integer product_id) {
this.product_id = product_id;
}
public Long getOrder_num() {
return order_num;
}
public void setOrder_num(Long order_num) {
this.order_num = order_num;
}
public Orders() {
}
public Orders(Long order_id, Long user_id, Long order_date, Integer order_amount, Integer product_id, Long order_num) {
this.order_id = order_id;
this.user_id = user_id;
this.order_date = order_date;
this.order_amount = order_amount;
this.product_id = product_id;
this.order_num = order_num;
}
@Override
public String toString() {
return "Orders{" +
"order_id=" + order_id +
", user_id=" + user_id +
", order_date=" + order_date +
", order_amount=" + order_amount +
", product_id=" + product_id +
", order_num=" + order_num +
'}';
}
}
支付类
package com.zxl.bean;
// TODO: 2024/1/6 支付类
public class Payments {
//支付ID
private Long payment_id;
//订单号
private Long order_id;
//支付金额
private Integer payment_amount;
//支付类型
private String payment_type;
//支付日期
private Long payment_date;
public Long getPayment_id() {
return payment_id;
}
public void setPayment_id(Long payment_id) {
this.payment_id = payment_id;
}
public Long getOrder_id() {
return order_id;
}
public void setOrder_id(Long order_id) {
this.order_id = order_id;
}
public Integer getPayment_amount() {
return payment_amount;
}
public void setPayment_amount(Integer payment_amount) {
this.payment_amount = payment_amount;
}
public String getPayment_type() {
return payment_type;
}
public void setPayment_type(String payment_type) {
this.payment_type = payment_type;
}
public Long getPayment_date() {
return payment_date;
}
public void setPayment_date(Long payment_date) {
this.payment_date = payment_date;
}
public Payments() {
}
public Payments(Long payment_id, Long order_id, Integer payment_amount, String payment_type, Long payment_date) {
this.payment_id = payment_id;
this.order_id = order_id;
this.payment_amount = payment_amount;
this.payment_type = payment_type;
this.payment_date = payment_date;
}
@Override
public String toString() {
return "payments{" +
"payment_id=" + payment_id +
", order_id=" + order_id +
", payment_amount=" + payment_amount +
", payment_type='" + payment_type + '\'' +
", payment_date=" + payment_date +
'}';
}
}
商品类
用作维表测试
package com.zxl.bean;
// TODO: 2024/1/6 商品类
public class Products {
//商品ID
private Integer product_id;
//商品名称
private String product_name;
//商品价格
private Integer product_price;
//商品库存
private Long product_num;
//商品分类
private String product_type;
public Integer getProduct_id() {
return product_id;
}
public void setProduct_id(Integer product_id) {
this.product_id = product_id;
}
public String getProduct_name() {
return product_name;
}
public void setProduct_name(String product_name) {
this.product_name = product_name;
}
public Integer getProduct_price() {
return product_price;
}
public void setProduct_price(Integer product_price) {
this.product_price = product_price;
}
public Long getProduct_num() {
return product_num;
}
public void setProduct_num(Long product_num) {
this.product_num = product_num;
}
public String getProduct_type() {
return product_type;
}
public void setProduct_type(String product_type) {
this.product_type = product_type;
}
public Products() {
}
public Products(Integer product_id, String product_name, Integer product_price, Long product_num, String product_type) {
this.product_id = product_id;
this.product_name = product_name;
this.product_price = product_price;
this.product_num = product_num;
this.product_type = product_type;
}
@Override
public String toString() {
return "products{" +
"product_id=" + product_id +
", product_name='" + product_name + '\'' +
", product_price=" + product_price +
", product_num=" + product_num +
", product_type='" + product_type + '\'' +
'}';
}
}
数据生成
订单数据生成
package com.zxl.datas;
import com.zxl.bean.Orders;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
public class OrdersData implements SourceFunction<Orders> {
private static Random random = new Random();
private static boolean isRunning = true;
private static Integer num = 0;
//订单ID
private static Long getOrder_id() {
num++;
long aLong = Long.parseLong(num.toString());
return aLong;
}
//订单日期
private static Long getOrder_date() {
//为了模拟数据延迟所里利用随机数进行模拟时间
int i = random.nextInt(15);
return Long.valueOf(i);
}
//用户ID
private static Long getUser_id() {
return random.nextLong();
}
//订单金额
private static Integer getOrder_amount() {
return random.nextInt(100);
}
//商品ID
private static Integer getProduct_id() {
return random.nextInt(100);
}
//订单数量
private static Long getOrder_num() {
return random.nextLong();
}
//订单类
private static Orders getOrders() {
Orders orders = new Orders(getOrder_id(), getUser_id(), getOrder_date(), getOrder_amount(), getProduct_id(), getOrder_num());
return orders;
}
@Override
public void run(SourceContext<Orders> sourceContext) throws Exception {
while (isRunning) {
sourceContext.collect(getOrders());
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
支付数据生成
package com.zxl.datas;
import com.zxl.bean.Payments;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Date;
import java.util.Random;
public class PaymentData implements SourceFunction<Payments> {
private static Random random = new Random();
private static boolean isRunning = true;
private static Integer num = 0;
//支付ID
private static Long getPayment_id(){
return random.nextLong();
}
//订单ID
private static Long getOrder_id() {
num++;
long aLong = Long.parseLong(num.toString());
return aLong;
}
//支付金额
private static Integer getPayment_amount(){
return random.nextInt(1000);
}
//支付类型
private static String getPayment_type(){
String[] type = {"银行卡", "支付宝", "微信", "美团", "抖音", "现金"};
int are= random.nextInt(6);
String area=type[are];
return area;
}
//支付日期
private static Long getPayment_date(){
//为了模拟数据延迟所里利用随机数进行模拟时间
int i = random.nextInt(15);
return Long.valueOf(i);
}
//支付类
private static Payments getPayments(){
Payments payments = new Payments(getPayment_id(),getOrder_id(),getPayment_amount(),getPayment_type(),getPayment_date());
return payments;
}
@Override
public void run(SourceContext<Payments> sourceContext) throws Exception {
while (isRunning) {
sourceContext.collect(getPayments());
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
测试数据打印
package com.zxl.flink;
import com.zxl.bean.Orders;
import com.zxl.bean.Payments;
import com.zxl.datas.OrdersData;
import com.zxl.datas.PaymentData;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class flinkWorks {
public static void main(String[] args) throws Exception {
//创建Flink流处理执行环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度为1
environment.setParallelism(1);
//调用Flink自定义Source
// TODO: 2024/1/6 订单数据
DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
// TODO: 2024/1/6 支付数据
DataStreamSource<Payments> paymentsDataStreamSource = environment.addSource(new PaymentData());
//打印数据
paymentsDataStreamSource.print();
ordersDataStreamSource.print();
//启动程序
environment.execute();
}
}
文章来源:https://blog.csdn.net/m0_52606060/article/details/135436048
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!