学习 Flink(四):水印

更新至 Flink 1.9.0 版本

时间

要理解水印(Wartermark),首先需要理解三个时间概念:

  • 事件时间(Event Time)
  • 摄入时间(Ingestion Time)
  • 处理时间(Process Time)

Time Clock

事件时间是每一个独立的事件在产生它的设备中发生的时间。以用户行为分析为例,事件时间就是用户访问页面点击按钮时的时间。

摄入时间是事件进入 Flink 的时间。以用户行为分析为例,摄入时间就是 Flink Kafka Connector 从 Kafaka 消费埋点数据的时间。

处理时间是执行相应操作的机器的系统时间。以用户行为分析为例,处理时间就是埋点数据执行操作的 JobManager 机器上的系统时间。

理想情况,产生的数据被立即处理,那么事件时间等于处理时间。现实情况,因底层输入数据源、执行引擎和硬件的差异,事件时间和处理时间会有非常大的不同。

Time Domain Mapping

X 轴表示事件时间,Y 轴表示处理时间,虚线表示理想情况,红色实线表示现实情况。通过观察可以发现处理时间滞后(Processing Time Lag)和事件时间倾斜(Event Time Skew)的情况。

水印

在事件时间场景下,Flink 支持水印按事件时间处理可能的延迟和乱序事件。水印的作用:告知算子之后不会有小于或等于水印时间戳的事件

按事件时间处理:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  

最佳实践是在尽可能靠近数据源甚至在 SourceFuntion 分配时间戳和生产水印。

Flink 提供了两个预定义实现类:

  • org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor 适用于时间戳递增的情况;
  • org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 适用于乱序但最大延迟已知的情况。

AscendingTimestampExtractor 🌰:

stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event] {  
    override def extractAscendingTimestamp(element: Event): Long = { ... }
})

BoundedOutOfOrdernessTimestampExtractor 🌰:

stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.minutes(1L)) {  
    override def extractTimestamp(element: String): Long = { ... }
})

同时,也可以自定义实现类,通过实现👇接口:

  • org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks 用于周期生成水印,通过 ExecutionConfig.setAutoWatermarkInterval(long interval):ExecutionConfig 方法设置周期;
  • org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks 用于根据特定的元素生成水印。

TimestampAssigner

参考