Apache Flink 是一个支持流式数据处理的分布式计算框架。Flink 中的窗口功能可以将无限的数据流切割成有限的数据块,这是处理流数据的核心组件。目前,Flink 中的窗口可以是基于时间的(Time Window),也可以是基于数据量的(Count Window)。
以下是在 Flink 中使用窗口的两个示例:
java
// 示例代码1
// 示例代码2
在上一部分中,我们已经介绍了窗口的一些基本概念及其相关的API。现在,我们将通过一个实际例子来展示如何使用这些API。
以下是来自 flink-examples 的代码示例:
java
// 示例代码
在这个例子中,我们首先会对每条数据进行时间提取,然后进行 keyBy 操作,接着依次调用 window()、evictor() 和 trigger() 以及 maxBy() 方法。接下来我们将重点介绍 window()、evictor() 和 trigger() 这些方法。
window()
方法接收一个 WindowAssigner
参数,负责将每条数据分配到正确的窗口中(一条数据可以同时分配到多个窗口)。Flink 提供了几种常见的窗口分配器:滚动窗口(元素间无重叠)、滑动窗口(元素可以重叠)、会话窗口和全局窗口。如果需要自定义数据分配策略,可以通过继承 WindowAssigner
类实现。
evictor()
主要用于对数据进行自定义操作,可以在执行用户代码之前或之后进行。Flink 提供了以下几种通用的驱逐器:
- CountEvictor:保留指定数量的元素。
- DeltaEvictor:通过用户提供的 DeltaFunction
和预设的阈值判断是否删除元素。
- TimeEvictor:设定一个阈值间隔,删除不在 max_ts - interval
范围内的元素,其中 max_ts
是窗口内时间戳的最大值。
trigger()
用于判断一个窗口是否需要触发。每个 WindowAssigner
都自带一个默认的触发器,如果默认触发器无法满足需求,可以通过继承 Trigger
类来自定义触发器。触发器包含以下方法:
- onElement()
:每当向窗口添加一个元素时触发。
- onEventTime()
:当事件时间定时器触发时调用。
- onProcessingTime()
:当处理时间定时器触发时调用。
- onMerge()
:合并两个触发器的状态。
- clear()
:窗口销毁时调用。
触发器返回的结果有以下几种选择:
- CONTINUE
:不做任何事情。
- FIRE
:触发窗口。
- PURGE
:清空窗口中的所有元素并销毁窗口。
- FIRE_AND_PURGE
:触发窗口,然后销毁窗口。
在理解了时间驱动的窗口后,我们需要进一步了解时间(Time)和水印(Watermark)的概念。
在分布式环境中,时间是一个重要的概念。在 Flink 中,时间可分为三种类型:事件时间(Event-Time)、处理时间(Processing-Time)和摄入时间(Ingestion-Time)。这三种时间的关系可以通过下图来理解:
在 Flink 中,可以通过以下方式设置时间类型:
java
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 设置使用事件时间
了解了时间之后,我们还需要了解水印的概念。假设一个应用程序记录用户的点击行为,并将日志回传(在网络不佳的情况下,先保存在本地,稍后再回传)。用户A在11:02操作应用,用户B在11:03操作应用,但由于网络不稳定,用户A的日志延迟到达,导致服务器先收到用户B的11:03日志,随后才收到用户A的11:02日志,从而出现乱序。
为了保证基于事件时间的窗口在销毁时已经处理完所有数据,就需要使用水印。水印携带一个单调递增的时间戳t,表示所有时间戳不大于t的数据都已经到达,未来小于等于t的数据不会再来,因此可以放心地触发和销毁窗口。下图给出了一个乱序数据流中的水印示例:
水印可以应对乱序的数据,但在真实环境中,我们可能无法获得完美的水印值,因此实际工作中会使用近似的水印。即使生成了水印t,仍有可能接收到时间戳t之前的数据,这些数据被称为“迟到的数据”。在Flink中,可以设置允许的最大延迟时间,默认为0。可以通过以下代码设置:
java
// 示例代码
设置允许的最大延迟时间后,迟到的数据仍然可以触发窗口,进行处理。通过Flink的侧输出机制,我们可以获取这些迟到的数据,具体用法如下:
java
// 示例代码
需要注意的是,设置了允许的最大延迟时间后,迟到的数据也可能触发窗口,特别是对于会话窗口来说,可能会导致窗口合并,产生意外行为。
在讨论窗口的内部实现时,我们再次通过下图回顾窗口的生命周期:
- 每条数据过来后,由 WindowAssigner
分配到对应的窗口。
- 当窗口被触发后,交给 Evictor
(如果没有设置 Evictor
则跳过),然后处理用户函数。
其中,WindowAssigner
、Trigger
和 Evictor
我们已经在前面讨论过,而用户函数则是用户自己编写的代码。
在整个流程中,还有一个问题需要讨论:窗口中的状态存储。我们知道Flink支持精确一次(Exactly Once)处理语义,那么窗口中的状态存储和普通状态存储有什么不同呢?
首先,从接口上看,两者没有区别,但每个窗口属于不同的命名空间,而非窗口场景下则都属于 VoidNamespace
。最终由状态/检查点机制保证数据的精确一次语义。以下是来自 org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
的一段代码说明:
java
// 示例代码
从这段代码中可以看出,窗口中的元素同样通过状态机制进行维护,然后由检查点机制保证精确一次语义。
至此,本文涵盖了窗口的所有相关内容,主要包括为什么需要窗口;窗口中的三个核心组件:窗口分配器、触发器和驱逐器;如何处理乱序数据、允许延迟的数据以及如何处理迟到的数据;最后,我们梳理了整个窗口的数据流程以及窗口中如何保证精确一次语义。