[Flink Extras] 10. Unit Testing Stateful or Timely UDFs and Custom Operators
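Flink article series
I. Flink column
The Flink column introduces one topic at a time in a systematic way and illustrates it with concrete examples.
- 1. Flink deployment series
  Covers the basics of deploying and configuring Flink.
- 2. Flink fundamentals series
  Covers Flink fundamentals such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.
- 3. Flink Table API and SQL basics series
  Covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, and catalogs.
- 4. Flink Table API and SQL advanced and applied series
  Covers applied use of the Table API and SQL: material that is closer to real production use and somewhat harder to develop.
- 5. Flink monitoring series
  Covers topics related to day-to-day operations and monitoring.
II. Flink examples column
The Flink examples column supplements the Flink column. It generally does not introduce concepts; instead it provides concrete, ready-to-use examples. This column is not divided into sections; the link titles show what each article covers.
Entry point for all articles in both columns: Flink article series index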
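This article focuses on unit testing stateful Flink operators and uses four examples to cover stateful unit tests for flatMap and process functions.
For more, see the author's Flink column, which covers these topics more systematically.
Apart from the Maven dependencies, this article has no other dependencies.
For more detail on unit testing, see: 50. Introduction to Flink unit testing, with examples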
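I. Introduction to unit testing stateful operators
Testing the functionality of a user-defined function that uses managed state or timers is harder, because it involves testing the interaction between user code and the Flink runtime. For this purpose Flink provides a set of so-called test harnesses, which can be used to test user-defined functions as well as custom operators:
- OneInputStreamOperatorTestHarness (for operators on a DataStream)
- KeyedOneInputStreamOperatorTestHarness (for operators on a KeyedStream)
- TwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two DataStreams)
- KeyedTwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two KeyedStreams)
To use the test harnesses, a few additional dependencies are required, such as the DataStream and Table API test dependencies.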
1. DataStream API test dependencies
If you want to develop tests for a job built with the DataStream API, add the following dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>1.17.2</version>
<scope>test</scope>
</dependency>
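Among its various test utilities, this module provides MiniCluster (a lightweight, configurable Flink cluster that runs inside JUnit tests), which can execute jobs directly.
2. Table API test dependencies
If you want to test Table API and SQL programs locally in your IDE, add the following dependency in addition to the flink-test-utils mentioned above: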
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-test-utils</artifactId>
<version>1.17.2</version>
<scope>test</scope>
</dependency>
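This automatically pulls in the query planner and the runtime, which are needed to plan and execute queries, respectively.
The flink-table-test-utils module was introduced in Flink 1.15 and is still considered experimental as of Flink 1.17.
3. Maven dependencies
Maven dependencies for the examples in this article. Note that the examples also import Lombok and Guava (com.google.common.collect.Lists), and that the test harness classes are expected to come from the flink-test-utils dependency shown in section 1; these are not declared below and are assumed to be available on the test classpath.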
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.0.0</version>
<scope>test</scope>
</dependency>
</dependencies>
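II. Unit testing a flatMap function
With a test harness you can push records and watermarks into a user-defined function or custom operator, control processing time, and finally assert on the operator's output (including side outputs).
1. OneInputStreamOperatorTestHarness example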
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description: unit test for flatMap; for an even input, emit the value and its square
*/
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;
public class TestStatefulFlatMapDemo3 {
static class AlanFlatMapFunction implements FlatMapFunction<Integer, Integer> {
@Override
public void flatMap(Integer value, Collector<Integer> out) throws Exception {
if (value % 2 == 0) {
out.collect(value);
out.collect(value * value);
}
}
}
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness;
@Before
public void setupTestHarness() throws Exception {
StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<Integer, Integer>(new AlanFlatMapFunction());
testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
testHarness.open();
}
@Test
public void testFlatMap2() throws Exception {
long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
testHarness.processWatermark(new Watermark(initialTime + 2));
testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));
expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));
expectedOutput.add(new Watermark(initialTime + 2));
expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));
expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));
expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));
expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
}
}
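To make the expected queue in the test above easier to follow, here is a toy model written in plain Java. It does not use any Flink class: ToyFlatMapHarness and its string-based records are hypothetical stand-ins. It assumes, as the test above does, that every emitted record carries the timestamp of the input element that produced it, and that a watermark appears in the output at the position where it arrived.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a one-input test harness; not a Flink class. */
class ToyFlatMapHarness {
    private final List<String> output = new ArrayList<>();

    /** Same logic as AlanFlatMapFunction: even values emit the value and its square. */
    void processElement(int value, long timestamp) {
        if (value % 2 == 0) {
            // Both outputs inherit the timestamp of the input element.
            output.add("Record(" + value + " @ " + timestamp + ")");
            output.add("Record(" + (value * value) + " @ " + timestamp + ")");
        }
    }

    /** The watermark is forwarded in arrival order. */
    void processWatermark(long timestamp) {
        output.add("Watermark(" + timestamp + ")");
    }

    List<String> getOutput() {
        return output;
    }

    public static void main(String[] args) {
        ToyFlatMapHarness harness = new ToyFlatMapHarness();
        harness.processElement(1, 1);
        harness.processElement(2, 2);
        harness.processWatermark(2);
        harness.processElement(3, 3);
        harness.processElement(4, 4);
        harness.getOutput().forEach(System.out::println);
    }
}
```

The odd inputs produce nothing, so the watermark sits directly after the two records derived from the element 2, which matches the order in which expectedOutput is filled in the real test.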
2. KeyedOneInputStreamOperatorTestHarness example
KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector and the TypeInformation of the key class.
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description: key by city and convert the city abbreviation to upper case
*/
import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
public class TestStatefulFlatMapDemo2 {
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private int id;
private String name;
private int age;
private String city;
}
static class AlanFlatMapFunction extends RichFlatMapFunction<User, User> {
// The state is only accessible by functions applied on a {@code KeyedStream}
ValueState<User> previousInput;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
previousInput = getRuntimeContext()
.getState(new ValueStateDescriptor<User>("previousInput", User.class));
}
@Override
public void flatMap(User input, Collector<User> out) throws Exception {
previousInput.update(input);
input.setCity(input.getCity().toUpperCase());
out.collect(input);
}
}
AlanFlatMapFunction alanFlatMapFunction = new AlanFlatMapFunction();
OneInputStreamOperatorTestHarness<User, User> testHarness;
@Before
public void setupTestHarness() throws Exception {
alanFlatMapFunction = new AlanFlatMapFunction();
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(alanFlatMapFunction),
new KeySelector<User, String>() {
@Override
public String getKey(User value) throws Exception {
return value.getCity();
}
}, Types.STRING);
testHarness.open();
}
@Test
public void testFlatMap() throws Exception {
testHarness.processElement(new User(1, "alanchan", 18, "sh"), 10);
ValueState<User> previousInput = alanFlatMapFunction.getRuntimeContext().getState(
new ValueStateDescriptor<>("previousInput", User.class));
User stateValue = previousInput.value();
Assert.assertEquals(
Lists.newArrayList(new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10)),
testHarness.extractOutputStreamRecords());
Assert.assertEquals(new User(1, "alanchan", 18, "sh".toUpperCase()), stateValue);
testHarness.processElement(new User(2, "alan", 19, "bj"), 10000);
Assert.assertEquals(
Lists.newArrayList(
new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10),
new StreamRecord<>(new User(2, "alan", 19, "bj".toUpperCase()), 10000)),
testHarness.extractOutputStreamRecords());
Assert.assertEquals(new User(2, "alan", 19, "bj".toUpperCase()), previousInput.value());
}
}
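One detail of the test above is easy to miss: flatMap calls previousInput.update(input) before it upper-cases the city, yet the test expects the state to hold the upper-cased value. A plausible explanation, which is an assumption here and not something the article states, is that the harness keeps keyed state as plain objects on the heap, so the stored state and the emitted record are the same object. The toy model below uses a plain HashMap and a hypothetical City class instead of any Flink API to show this aliasing effect, and what a store that copies the value (for example by serializing it) would return instead.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy illustration of reference aliasing in an on-heap store; not a Flink class. */
class ToyKeyedStateDemo {

    /** Mutable value standing in for the article's User class. */
    static class City {
        String code;

        City(String code) {
            this.code = code;
        }
    }

    /** Stores the reference itself, then mutates the same object. */
    static String storeThenMutate() {
        Map<String, City> state = new HashMap<>();
        City input = new City("sh");
        state.put(input.code, input);            // "update" the state first
        input.code = input.code.toUpperCase();   // later mutation is visible through the map
        return state.get("sh").code;
    }

    /** Stores a copy, so the later mutation does not reach the stored value. */
    static String copyThenMutate() {
        Map<String, City> state = new HashMap<>();
        City input = new City("sh");
        state.put(input.code, new City(input.code));
        input.code = input.code.toUpperCase();
        return state.get("sh").code;
    }

    public static void main(String[] args) {
        System.out.println("reference stored: " + storeThenMutate());
        System.out.println("copy stored:      " + copyThenMutate());
    }
}
```

If that assumption holds, the state assertions in the test depend on how the state is stored. Updating the state after the input has been modified, or storing a copy, would make the intent explicit.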
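III. Unit testing a Process Function
Besides the test harnesses above, which can be used directly to test a ProcessFunction, Flink also provides a test harness factory named ProcessFunctionTestHarnesses that simplifies harness instantiation.
1. OneInputStreamOperatorTestHarness example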
import com.google.common.collect.Lists;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
public class TestProcessOperatorDemo1 {
// public abstract class KeyedProcessFunction<K, I, O>
static class AlanProcessFunction extends KeyedProcessFunction<String, String, String> {
@Override
public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
Collector<String> out) throws Exception {
ctx.timerService().registerProcessingTimeTimer(50);
out.collect("vx->" + value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// Runs when the registered timer fires
out.collect(String.format("定时器在 %d 被触发", timestamp));
}
}
private OneInputStreamOperatorTestHarness<String, String> testHarness;
private AlanProcessFunction processFunction;
@Before
public void setupTestHarness() throws Exception {
processFunction = new AlanProcessFunction();
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
new KeyedProcessOperator<>(processFunction),
x -> "1",
Types.STRING);
// Function time is initialized to 0
testHarness.open();
}
@Test
public void testProcessElement() throws Exception {
testHarness.processElement("alanchanchn", 10);
Assert.assertEquals(
Lists.newArrayList(
new StreamRecord<>("vx->alanchanchn", 10)),
testHarness.extractOutputStreamRecords());
}
@Test
public void testOnTimer() throws Exception {
// test first record
testHarness.processElement("alanchanchn", 10);
Assert.assertEquals(1, testHarness.numProcessingTimeTimers());
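// Advance the function (processing) time to 100; the timer registered for 50 fires
testHarness.setProcessingTime(100);
Assert.assertEquals(
Lists.newArrayList(
new StreamRecord<>("vx->alanchanchn", 10),
// onTimer receives the timer's own timestamp (50), not the current time (100)
new StreamRecord<>("定时器在 50 被触发")),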
testHarness.extractOutputStreamRecords());
}
}
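In the timer test above, the value passed to onTimer is the timestamp the timer was registered for, not the time the clock was advanced to. The sketch below is a toy model in plain Java of that behaviour for a single key. ToyTimerService is a hypothetical class, not Flink's timer service, and it only assumes that a timer fires once the clock reaches or passes its timestamp and that registering the same timestamp twice leaves a single timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model of a processing-time timer queue for one key; not a Flink class. */
class ToyTimerService {
    private final TreeSet<Long> timers = new TreeSet<>();

    /** Registers a timer; a duplicate timestamp does not create a second timer. */
    void register(long timestamp) {
        timers.add(timestamp);
    }

    int numTimers() {
        return timers.size();
    }

    /**
     * Advances the clock to {@code now} and returns, in firing order,
     * the timestamps that would be handed to the timer callback.
     */
    List<Long> advanceTo(long now) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= now) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        ToyTimerService service = new ToyTimerService();
        service.register(50);
        System.out.println("fired at clock 100: " + service.advanceTo(100));
    }
}
```

Under this model, registering a timer for 50 and advancing the clock to 100 hands 50 to the callback, which is the value a format string such as the one in onTimer would print.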
2. ProcessFunctionTestHarnesses example
This example uses ProcessFunctionTestHarnesses to test ProcessFunction, KeyedProcessFunction, CoProcessFunction, KeyedCoProcessFunction, and BroadcastProcessFunction, which covers most of the process function types.
import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.util.BroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/*
* @Author: alanchan
*
* @LastEditors: alanchan
*
* @Description:
*/
public class TestProcessOperatorDemo3 {
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private int id;
private String name;
private int age;
private String city;
}
// Test processElement of ProcessFunction
@Test
public void testProcessFunction() throws Exception {
// public abstract class ProcessFunction<I, O>
ProcessFunction<String, String> function = new ProcessFunction<String, String>() {
@Override
public void processElement(
String value, Context ctx, Collector<String> out) throws Exception {
out.collect("vx->" + value);
}
};
OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
.forProcessFunction(function);
harness.processElement("alanchanchn", 10);
Assert.assertEquals(harness.extractOutputValues(), Collections.singletonList("vx->alanchanchn"));
}
// Test processElement of KeyedProcessFunction
@Test
public void testKeyedProcessFunction() throws Exception {
// public abstract class KeyedProcessFunction<K, I, O>
KeyedProcessFunction<String, String, String> function = new KeyedProcessFunction<String, String, String>() {
@Override
public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
Collector<String> out) throws Exception {
out.collect("vx->" + value);
}
};
OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
.forKeyedProcessFunction(function, x -> "name", BasicTypeInfo.STRING_TYPE_INFO);
harness.processElement("alanchan", 10);
Assert.assertEquals(Collections.singletonList("vx->alanchan"), harness.extractOutputValues());
}
// Test processElement1 and processElement2 of CoProcessFunction
@Test
public void testCoProcessFunction() throws Exception {
// public abstract class CoProcessFunction<IN1, IN2, OUT>
CoProcessFunction<String, User, User> function = new CoProcessFunction<String, User, User>() {
@Override
public void processElement1(String value, CoProcessFunction<String, User, User>.Context ctx,
Collector<User> out) throws Exception {
String[] userStr = value.split(",");
out.collect(
new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
}
@Override
public void processElement2(User value, CoProcessFunction<String, User, User>.Context ctx,
Collector<User> out) throws Exception {
out.collect(value);
}
};
TwoInputStreamOperatorTestHarness<String, User, User> harness = ProcessFunctionTestHarnesses
.forCoProcessFunction(function);
harness.processElement2(new User(2, "alan", 19, "bj"), 100);
harness.processElement1("1,alanchan,18,sh", 10);
// Output follows the call order: the element on input 2 was processed first
Assert.assertEquals(
Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
harness.extractOutputValues());
}
// Test processElement1 and processElement2 of KeyedCoProcessFunction
@Test
public void testKeyedCoProcessFunction() throws Exception {
// public abstract class KeyedCoProcessFunction<K, IN1, IN2, OUT>
KeyedCoProcessFunction<String, String, User, User> function = new KeyedCoProcessFunction<String, String, User, User>() {
@Override
public void processElement1(String value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
Collector<User> out) throws Exception {
String[] userStr = value.split(",");
out.collect(
new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
}
@Override
public void processElement2(User value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
Collector<User> out) throws Exception {
out.collect(value);
}
};
// public static <K,IN1,IN2,OUT>
// KeyedTwoInputStreamOperatorTestHarness<K,IN1,IN2,OUT>
// forKeyedCoProcessFunction(
// KeyedCoProcessFunction<K,IN1,IN2,OUT> function,
// KeySelector<IN1,K> keySelector1,
// KeySelector<IN2,K> keySelector2,
// TypeInformation<K> keyType)
KeyedTwoInputStreamOperatorTestHarness<String, String, User, User> harness = ProcessFunctionTestHarnesses
.forKeyedCoProcessFunction(function, new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value.split(",")[3];
}
}, new KeySelector<User, String>() {
@Override
public String getKey(User value) throws Exception {
return value.getCity();
}
}, TypeInformation.of(String.class));
harness.processElement2(new User(2, "alan", 19, "bj"), 100);
harness.processElement1("1,alanchan,18,sh", 10);
// Output follows the call order: the element on input 2 was processed first
Assert.assertEquals(
Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
harness.extractOutputValues());
}
// Test processElement and processBroadcastElement of BroadcastProcessFunction
@Test
public void testBroadcastOperator() throws Exception {
// Define the broadcast state descriptor
// Data format:
// sh,上海
// bj,北京
// public class MapStateDescriptor<UK, UV>
MapStateDescriptor<String, String> broadcastDesc = new MapStateDescriptor<>("Alan_RulesBroadcastState",
String.class,
String.class);
// public abstract class BroadcastProcessFunction<IN1, IN2, OUT>
// * @param <IN1> The input type of the non-broadcast side.
// * @param <IN2> The input type of the broadcast side.
// * @param <OUT> The output type of the operator.
BroadcastProcessFunction<User, String, User> function = new BroadcastProcessFunction<User, String, User>() {
// Handles elements of the broadcast stream
@Override
public void processBroadcastElement(String value, BroadcastProcessFunction<User, String, User>.Context ctx,
Collector<User> out) throws Exception {
System.out.println("Received broadcast data: " + value);
// Get the broadcast state and store the code-to-name mapping
ctx.getBroadcastState(broadcastDesc).put(value.split(",")[0], value.split(",")[1]);
}
// Handles the non-broadcast stream and joins it with the broadcast dimension data
@Override
public void processElement(User value, BroadcastProcessFunction<User, String, User>.ReadOnlyContext ctx,
Collector<User> out) throws Exception {
// Get the broadcast state (read-only on this side)
ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastDesc);
value.setCity(state.get(value.getCity()));
out.collect(value);
}
};
BroadcastOperatorTestHarness<User, String, User> harness = ProcessFunctionTestHarnesses
.forBroadcastProcessFunction(function, broadcastDesc);
harness.processBroadcastElement("sh,上海", 10);
harness.processBroadcastElement("bj,北京", 20);
harness.processElement(new User(2, "alan", 19, "bj"), 10);
harness.processElement(new User(1, "alanchan", 18, "sh"), 30);
// Output follows the call order: user 2 was processed before user 1
Assert.assertEquals(
Arrays.asList(new User(2, "alan", 19, "北京"), new User(1, "alanchan", 18, "上海")),
harness.extractOutputValues());
}
}
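The two-input and broadcast tests above compare extractOutputValues() with a list through assertEquals, and list equality is order-sensitive, so the expected list has to follow the order in which the process methods were called. Where a test should not depend on emission order, one option is to compare the lists as multisets. The helper below is a hypothetical sketch in plain Java, not part of Flink or JUnit; it assumes the element type implements equals and hashCode, which Lombok's @Data generates for the User class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for order-insensitive output checks; not a Flink or JUnit class. */
class OutputOrderDemo {

    /** Returns true when both lists hold the same elements with the same multiplicities. */
    static <T> boolean sameElementsAnyOrder(List<T> expected, List<T> actual) {
        if (expected.size() != actual.size()) {
            return false;
        }
        Map<T, Integer> counts = new HashMap<>();
        for (T e : expected) {
            counts.merge(e, 1, Integer::sum);
        }
        for (T a : actual) {
            Integer remaining = counts.get(a);
            if (remaining == null) {
                return false; // element not expected, or seen too many times
            }
            if (remaining == 1) {
                counts.remove(a);
            } else {
                counts.put(a, remaining - 1);
            }
        }
        return counts.isEmpty();
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("sh", "bj");
        List<String> actual = Arrays.asList("bj", "sh");
        System.out.println("List.equals:          " + expected.equals(actual));
        System.out.println("sameElementsAnyOrder: " + sameElementsAnyOrder(expected, actual));
    }
}
```

In a JUnit test this could be used as Assert.assertTrue(OutputOrderDemo.sameElementsAnyOrder(expected, harness.extractOutputValues())). Keeping the strict list comparison is still preferable when the emission order is part of what the test is meant to verify.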
In summary, this article focused on unit testing stateful Flink operators, using four examples to cover stateful unit tests for flatMap and process functions.