在构建实时流媒体应用程序时,事件时间处理是一项不可或缺的功能。因为在现实世界的大多数应用场景中,消息可能会无序到达,所以需要一种机制让系统理解消息可能迟到,并据此进行处理。本文将探讨为什么我们需要事件时间处理,以及如何在Apache Flink中启用这一功能。
事件时间指的是消息在实际环境中发生的时间,而处理时间则是Flink系统处理消息的时间。为了理解事件时间处理的重要性,我们首先构建一个基于处理时间的系统,观察其存在的问题。
我们将创建一个大小为10秒的滑动窗口,每5秒滑动一次。在窗口结束时,系统会统计在这个时间段内收到的消息数量。理解事件时间处理与滑动窗口的关系后,我们再来看看如何在滚动窗口中应用它。接下来,我们开始探讨。
在这个例子中,我们期望消息具有值和时间戳两种属性,其中值是消息的内容,时间戳是消息生成的时间。由于我们正在构建基于处理时间的系统,以下代码将忽略时间戳部分。
理解消息应包含生成时间的信息是至关重要的。Flink或任何其他系统都不是一个魔法盒子,能够自动识别这一点。稍后我们会看到,事件时间处理可以从消息中提取时间戳信息,以便处理延迟到达的消息。
scala
val text = senv.socketTextStream("localhost", 9999)
val counts = text.map { m => (m.split(",")(0), 1) }
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1)
counts.print
senv.execute("处理时间处理示例")
假设源分别在第13秒、第13秒和第16秒生成了三个类型a的消息。(小时和分钟不重要,因为窗口大小只有10秒)。
这些消息将落入窗口中,前两个消息在第13秒生成,会落入窗口1 [5秒-15秒]和窗口2 [10秒-20秒];第三个消息在第16秒生成,会落入窗口2 [10秒-20秒]和窗口3 [15秒-25秒]。每个窗口最终输出的数量分别为(a,2)、(a,3)和(a,1)。
这个输入被认为是预期的行为。现在,我们来看一下当消息到达系统的时间发生变化时会发生什么。
假设其中一个消息(在第13秒生成)延迟了6秒(在第19秒)到达,可能是由于网络拥堵。你能猜到这个消息会落入哪个窗口?
延迟的消息会落入窗口2和窗口3,因为19秒在10秒-20秒和15秒-25秒之间。在窗口2中计算结果没有问题(因为消息应该落入该窗口),但这会影响窗口1和窗口3的结果。现在,我们将尝试使用事件时间处理解决这个问题。
要启用事件时间处理,我们需要一个时间戳提取器,从消息中提取事件时间信息。请记住,消息的格式为值和时间戳。extractTimestamp
方法获取时间戳部分并转换为long型。
scala
class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
element.split(",")(1).toLong
}
override def getCurrentWatermark(): Watermark = {
new Watermark(System.currentTimeMillis())
}
}
我们需要设置这个时间戳提取器,并将时间特性设置为事件时间。其余代码与处理时间的情况保持一致。
scala
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999)
.assignTimestampsAndWatermarks(new TimestampExtractor)
val counts = text.map { m => (m.split(",")(0), 1) }
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1)
counts.print
senv.execute("事件时间处理示例")
运行上述代码的结果如下:
结果看起来更好,窗口2和窗口3现在输出了正确的结果,但窗口1仍然有问题。Flink不会将延迟的消息分配给窗口3,因为它现在检查了消息的事件时间,并意识到它不在该窗口内。那么,为什么消息没有分配给窗口1呢?原因是在延迟的消息到达系统时(第19秒),窗口1的评估已经完成(第15秒)。现在,我们将通过引入水印来解决这个问题。
水印是一个非常关键且有趣的概念,我将尽量简洁地介绍。如果你有兴趣了解更多,可以参考谷歌上的精彩演讲或DataArtisans的博客。水印本质上是一个时间戳。当Flink中的运算符接收到水印时,它知道不会再看到比该时间戳更早的消息。
为了简化说明,我们可以把水印看作一种方式,告诉Flink消息可以延迟多久。在之前的尝试中,我们将水印设置为当前系统时间,这意味着不期望有任何延迟的消息。现在,我们将水印设置为当前时间减去5秒,这意味着告诉Flink消息最多可能延迟5秒。这是因为每个窗口只有在水印经过时才会进行评估。因此,第一个窗口[5秒-15秒]将在第20秒被评估,第二个窗口[10秒-20秒]将在第25秒被评估,以此类推。
scala
override def getCurrentWatermark(): Watermark = {
new Watermark(System.currentTimeMillis() - 5000)
}
通常情况下,最好保持接收的最大时间戳,并创建具有最大预期延迟的水印,而不是简单地从当前系统时间减去。
进行上述修改后,运行代码的结果如下:
最终我们得到了正确结果,所有三个窗口现在都按照预期的方式输出计数,即(a,2)、(a,3)和(a,1)。
更新:我们还可以使用AllowedLateness
功能来设置消息的最大允许延迟,从而解决这个问题。
结论:
实时流处理系统的应用越来越广泛,处理延迟消息是构建此类系统的重要组成部分。在这篇文章中,我们探讨了到达的消息延迟如何影响系统的结果,以及如何使用Apache Flink的事件时间处理功能来解决这些问题。感谢您的阅读!