Flink系列之:窗口函数Windowing table-valued functions (Windowing TVFs)
Flink系列之:窗口函数Windowing table-valued functions Windowing TVFs
- 适用流、批
Windows 是处理无限流的核心。 Windows 将流分割成有限大小的“桶”,我们可以在其中应用计算。本文档重点介绍 Flink SQL 中如何执行窗口以及程序员如何从其提供的功能中获得最大收益。
Apache Flink 提供了多个窗口表值函数 (TVF) 将表的元素划分为窗口,包括:
- Tumble Windows
- Hop Windows
- Cumulate Windows
- 会话窗口(即将支持)
请注意,每个元素在逻辑上可以属于多个窗口,具体取决于您使用的窗口表值函数。例如,HOP 开窗创建重叠窗口,其中单个元素可以分配给多个窗口。
窗口 TVF 是 Fl??ink 定义的多态表函数(缩写为 PTF)。 PTF 是 SQL 2016 标准的一部分,是一种特殊的表函数,但可以将表作为参数。 PTF 是一个强大的功能,可以改变表格的形状。由于 PTF 在语义上的使用与表类似,因此它们的调用发生在 SELECT 语句的 FROM 子句中。
窗口 TVF 是传统分组窗口函数的替代品。窗口 TVF 更符合 SQL 标准,并且更强大,可以支持复杂的基于窗口的计算,例如窗口 TopN、窗口连接。但是,分组窗口函数只能支持窗口聚合。
了解更多如何基于加窗 TVF 应用进一步计算:
- 窗口聚合
- 窗口顶部N
- 窗口连接
- 窗口重复数据删除
一、窗口函数
Apache Flink 提供了 3 个内置窗口 TVF:TUMBLE、HOP 和 CUMULATE。窗口TVF的返回值是一个新的关系,包括原始关系的所有列以及额外的3列,名为“window_start”,“window_end”,“window_time”以指示分配的窗口。在流模式下,“window_time”字段是窗口的时间属性。在批处理模式下,“window_time”字段是基于输入时间字段类型的 TIMESTAMP 或 TIMESTAMP_LTZ 类型的属性。 “window_time”字段可用于后续基于时间的操作,例如另一个基于聚合的窗口 TVF 或间隔连接。 window_time 的值始终等于 window_end - 1ms。
二、Tumble Windows
TUMBLE 函数将每个元素分配给指定窗口大小的窗口。翻滚窗口具有固定大小且不重叠。例如,假设您指定大小为 5 分钟的滚动窗口。在这种情况下,Flink 将评估当前窗口,并每五分钟启动一个新窗口,如下图所示。
TUMBLE 函数根据时间属性字段为关系的每一行分配一个窗口。在流模式下,时间属性字段必须是事件或处理时间属性。在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMP或TIMESTAMP_LTZ类型的属性。 TUMBLE 的返回值是一个新的关系,其中包括原始关系的所有列以及额外的 3 列,名为“window_start”、“window_end”、“window_time”以指示分配的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。
TUMBLE 函数采用 3 个必需参数和 1 个可选参数:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
- data:是表参数,可以是与时间属性列的任意关系。
- timecol:是一个列描述符,指示数据的哪些时间属性列应映射到滚动窗口。
- size:是指定翻滚窗口宽度的持续时间。
- offset:是一个可选参数,用于指定窗口开始移动的偏移量。
以下是对 Bid 表的调用示例:
-- 表必须具有时间属性,例如该表中的“bidtime”
Flink SQL> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
| name | type | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
| price | DECIMAL(10, 2) | true | | | |
| item | STRING | true | | | |
+-------------+------------------------+------+-----+--------+---------------------------------+
Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+
| bidtime | price | item |
+------------------+-------+------+
| 2020-04-15 08:05 | 4.00 | C |
| 2020-04-15 08:07 | 2.00 | A |
| 2020-04-15 08:09 | 5.00 | D |
| 2020-04-15 08:11 | 3.00 | B |
| 2020-04-15 08:13 | 1.00 | E |
| 2020-04-15 08:17 | 6.00 | F |
+------------------+-------+------+
Flink SQL> SELECT * FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
-- 或使用命名参数
-- 注意:DATA 参数必须是第一个
Flink SQL> SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SIZE => INTERVAL '10' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
-- 在翻滚窗口表上应用聚合
Flink SQL> SELECT window_start, window_end, SUM(price)
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+
注意:为了更好地理解窗口的行为,我们简化了时间戳值的显示,不显示尾随零,例如如果类型为 TIMESTAMP(3),2020-04-15 08:05 在 Flink SQL Client 中应显示为 2020-04-15 08:05:00.000。
三、Hop Windows
HOP 函数将元素分配给固定长度的窗口。与 TUMBLE 窗口函数一样,窗口的大小由窗口大小参数配置。附加的窗口滑动参数控制跳跃窗口启动的频率。因此,如果幻灯片小于窗口大小,则跳跃窗口可能会重叠。在这种情况下,元素被分配给多个窗口。跳跃窗口也称为“滑动窗口”。
例如,您可以将大小为 10 分钟的窗口滑动 5 分钟。这样,您每 5 分钟就会获得一个窗口,其中包含过去 10 分钟内到达的事件,如下图所示。
HOP 函数分配覆盖大小间隔内的行的窗口,并根据时间属性字段移动每张幻灯片。在流模式下,时间属性字段必须是事件或处理时间属性。在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMP或TIMESTAMP_LTZ类型的属性。 HOP的返回值是一个新的关系,包括原始关系的所有列以及额外的3列,名为“window_start”,“window_end”,“window_time”以指示分配的窗口。原始时间属性“timecol”将是加窗TVF后的常规时间戳列。
HOP 采用 4 个必需参数和 1 个可选参数:
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
- data:是一个表参数,可以是与时间属性列的任意关系。
- timecol:是一个列描述符,指示数据的哪些时间属性列应映射到跳跃窗口。
- Slide:是指定连续跳跃窗口开始之间的持续时间的持续时间
- size:是指定跳跃窗口宽度的持续时间。
- offset:是一个可选参数,用于指定窗口开始移动的偏移量。
以下是对 Bid 表的调用示例:
> SELECT * FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
-- 或使用命名参数
-- 注意:DATA 参数必须是第一个
> SELECT * FROM TABLE(
HOP(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SLIDE => INTERVAL '5' MINUTES,
SIZE => INTERVAL '10' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:15 | 2020-04-15 08:25 | 2020-04-15 08:24:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
-- 在跳跃窗口表上应用聚合
> SELECT window_start, window_end, SUM(price)
FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 |
+------------------+------------------+-------+
四、Cumulate Windows
累积窗口在某些场景中非常有用,例如在固定窗口间隔内提前触发的翻滚窗口。例如,每日仪表板绘制从 00:00 到每分钟的累积 UV,10:00 的 UV 代表从 00:00 到 10:00 的 UV 总数。这可以通过累积窗口轻松有效地实现。
CUMULATE 函数将元素分配给覆盖初始步长间隔内的行的窗口,并每一步扩展为一个步长(保持窗口起始位置固定),直到达到最大窗口大小。您可以将 CUMULATE 函数视为首先应用最大窗口大小的 TUMBLE 窗口,并将每个翻滚窗口拆分为具有相同窗口开始和窗口结束步长差异的多个窗口。因此累积窗口确实会重叠并且没有固定的大小。
例如,您可以有一个 1 小时步长和 1 天最大大小的累积窗口,您将获得窗口:[00:00, 01:00)、[00:00, 02:00)、[00:00, 03:00), …, [00:00, 24:00) 每天。
CUMULATE 函数根据时间属性列分配窗口。在流模式下,时间属性字段必须是事件或处理时间属性。在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMP或TIMESTAMP_LTZ类型的属性。 CUMULATE 的返回值是一个新的关系,其中包括原始关系的所有列以及额外的 3 个名为“window_start”、“window_end”、“window_time”的列来指示分配的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。
CUMULATE 采用四个必需参数和一个可选参数:
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
- data:是一个表参数,可以是与时间属性列的任意关系。
- timecol:是一个列描述符,指示数据的哪些时间属性列应映射到累积窗口。
- 步骤:是指定连续累积窗口末尾之间增加的窗口大小的持续时间。
- size:是指定累积窗口最大宽度的持续时间。 size 必须是step 的整数倍。
- offset:是一个可选参数,用于指定窗口开始移动的偏移量。
以下是对 Bid 表的调用示例:
> SELECT * FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
-- 或使用命名参数
-- 注意:DATA 参数必须是第一个
> SELECT * FROM TABLE(
CUMULATE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
STEP => INTERVAL '2' MINUTES,
SIZE => INTERVAL '10' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 |
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
-- 对累积窗口表应用聚合
> SELECT window_start, window_end, SUM(price)
FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+
四、Window Offset
Offset 是一个可选参数,可用于更改窗口分配。它可以是正持续时间和负持续时间。窗口偏移的默认值为0。如果设置不同的偏移值,同一条记录可能会分配到不同的窗口。
例如,对于大小为 10 MINUTE 的 Tumble 窗口,时间戳为 2021-06-30 00:00:04 的记录将分配给哪个窗口?
- 如果偏移值为-16 MINUTE,则记录分配给窗口[2021-06-29 23:54:00, 2021-06-30 00:04:00)。
- 如果偏移值为-6 MINUTE,则记录分配给窗口[2021-06-29 23:54:00, 2021-06-30 00:04:00)。
- 如果偏移量为-4 MINUTE,则记录分配给窗口[2021-06-29 23:56:00, 2021-06-30 00:06:00)。
- 如果 offset 为 0,则记录分配给窗口 [2021-06-30 00:00:00, 2021-06-30 00:10:00)。
- 如果偏移量为 4 MINUTE,则记录分配给窗口 [2021-06-29 23:54:00, 2021-06-30 00:04:00)。
- 如果偏移量为 6 MINUTE,则记录分配给窗口 [2021-06-29 23:56:00, 2021-06-30 00:06:00)。
- 如果偏移量为 16 MINUTE,则记录分配给窗口 [2021-06-29 23:56:00, 2021-06-30 00:06:00)。我们可以发现,一些窗口偏移参数可能对窗口的分配有同样的影响。在上述情况下,-16 MINUTE、-6 MINUTE 和 4 MINUTE 对于大小为 10 MINUTE 的翻滚窗口具有相同的效果。
注意:窗口偏移的作用只是更新窗口分配,对 Watermark 没有影响。
我们通过一个例子来描述如何在下面的 SQL 中使用 Tumble 窗口中的偏移量。
-- 注意:目前 Flink 不支持评估单个窗口表值函数,
-- 窗口表值函数应与聚合操作一起使用,
-- 此示例仅用于解释表值函数的语法和产生的数据。
Flink SQL> SELECT * FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
-- 或使用命名参数
-- 注意:DATA 参数必须是第一个
Flink SQL> SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SIZE => INTERVAL '10' MINUTES,
OFFSET => INTERVAL '1' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
-- 在翻滚窗口表上应用聚合
Flink SQL> SELECT window_start, window_end, SUM(price)
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:01 | 2020-04-15 08:11 | 11.00 |
| 2020-04-15 08:11 | 2020-04-15 08:21 | 10.00 |
+------------------+------------------+-------+
注意:为了更好地理解窗口的行为,我们简化了时间戳值的显示,不显示尾随零,例如如果类型为 TIMESTAMP(3),2020-04-15 08:05 在 Flink SQL Client 中应显示为 2020-04-15 08:05:00.000。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!