Flink watermark

关于 watermark

Nicki

是某一线互联网大厂的数据开发

最近

由于公司业务的发展, 以及业务对数据实时性要求变高

Nicki 开始使用 flink 进行实时数据开发

今天

Nicki 在使用 flink datastream api 进行开发

当她写完 watermark 分配器之后

脑海中突然有了一个疑问

watermark 的分配只能使用时间戳吗?

带着这个疑问,Nicki 找到了 lsp 数据羊

img

img

img

img

img

img

img

img

img

img

[白话解析] Flink的Watermark机制

0x00 摘要

对于Flink来说,Watermark是个很难绕过去的概念。本文将从整体的思路上来说,运用感性直觉的思考来帮大家梳理Watermark概念。

0x01 问题

关于Watermark,很容易产生几个问题

  • Flink 流处理应用中,常见的处理需求/应对方案是什么?
  • Watermark究竟应该翻译成水印还是水位线?
  • Watermark本质是什么?
  • Watermark是如何解决问题?

下面我们就来简要解答这些问题以给大家一个大致概念,在后文中,会再深入描述。

聚合类的处理 Flink可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。所以Flink引入了窗口概念。

窗口 窗口的作用为了周期性的获取数据。就是把传入的原始数据流切分成多个buckets,所有计算都在单一的buckets中进行。窗口(window)就是从 Streaming 到 Batch 的一个桥梁。

带来的问题:聚合类处理带来了新的问题,比如乱序/延迟。其解决方案就是 Watermark / allowLateNess / sideOutPut 这一组合拳。

Watermark 的作用是防止 数据乱序 / 指定时间内获取不到全部数据。

allowLateNess 是将窗口关闭时间再延迟一段时间。

sideOutPut 是最后兜底操作,当指定窗口已经彻底关闭后,就会把所有过期延迟数据放到侧输出流,让用户决定如何处理。

总结起来就是说

1
2
3
Windows -----> Watermark -----> allowLateNess -----> sideOutPut 
   
用Windows把流数据分块处理,用Watermark确定什么时候不再等待更早的数据/触发窗口进行计算,用allowLateNess 将窗口关闭时间再延迟一段时间。用sideOutPut 最后兜底把数据导出到其他地方。

问题2. Watermark应该翻译成水位线

**我最初看的一篇文章中把Watermark翻译成“*水印”。我当时比较晕。因为按说名字一定能够反应事物本质。但是我怎么也脑补不出这个”水印“的本质。

继续看文章内容,越来越觉得这个应该翻译成“水位线”。于是查了查,确实英文有如下翻译:high-water mark 高水位线(海水或洪水所达到的最高水位)。

后来逐渐看到其他文章中也有翻译成水位线,我才放心下来,终于不会出现第二个“套接字”这样神奇的翻译了。

问题3. Watermark本质是什么

Watermarks是基于已经收集的消息来估算是否还有消息未到达,本质上是一个时间戳。时间戳反映的是事件发生的时间,而不是事件处理的时间。

这个从Flink的源码就能看出来,唯一有意义的成员变量就是 timestamp。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final class Watermark extends StreamElement {
 /*The watermark that signifies end-of-event-time. */
 public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
 /* The timestamp of the watermark in milliseconds. */
 private final long timestamp;
 /* Creates a new watermark with the given timestamp in milliseconds.*/
 public Watermarklong timestamp) {
   this.timestamp = timestamp;
}
 /*Returns the timestamp associated with this {@link Watermark} in milliseconds.**/
 public long getTimestamp() {
   return timestamp;
}
}

问题4. Watermark如何解决问题

Watermark是一种告诉Flink一个消息延迟多少的方式。它定义了什么时候不再等待更早的数据。

可以把Watermarks理解为一个水位线,这个Watermarks在不断的变化。Watermark实际上作为数据流的一部分随数据流流动。

当Flink中的运算符接收到Watermarks时,它明白早于该时间的消息已经完全抵达计算引擎,即假设不会再有时间小于水位线的事件到达。

这个假设是触发窗口计算的基础,只有水位线越过窗口对应的结束时间,窗口才会关闭和进行计算。

0x02 背景概念

流处理

流处理,最本质的是在处理数据的时候,接受一条处理一条数据。

批处理,则是累积数据到一定程度在处理。这是他们本质的区别。

在设计上Flink认为数据是流式的,批处理只是流处理的特例。同时对数据分为有界数据和无界数据。

  • 有界数据对应批处理,API对应Dateset。
  • 无界数据对应流处理,API对应DataStream。

乱序(out-of-order)

什么是乱序呢?可以理解为数据到达的顺序和其实际产生时间的排序不一致。导致这的原因有很多,比如延迟,消息积压,重试等等。

我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。

比如:

1
2
3
4
5
某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有5秒的延时,
也就是在实际时间的第1秒产生的数据有可能在第5秒中产生的数据之后到来(比如到Window处理节点)。

有1~10个事件。
乱序到达的序列是:2,3,4,5,1,6,3,8,9,10,7

0x03 Flink中的窗口概念

窗口

对于Flink,如果来一条消息计算一条,这样是可以的,但是这样计算是非常频繁而且消耗资源,如果想做一些统计这是不可能的。所以对于Spark和Flink都产生了窗口计算。

比如 是因为我们想看到过去一分钟,过去半小时的访问数据,这时候我们就需要窗口。

Window:Window是处理无界流的关键,Windows将流拆分为一个个有限大小的buckets,可以可以在每一个buckets中进行计算。

start_time,end_time:当Window时时间窗口的时候,每个window都会有一个开始时间和结束时间(前开后闭),这个时间是系统时间。

窗口生命周期

简而言之,只要属于此窗口的第一个元素到达,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定的允许延迟时,窗口将被完全删除。

例如:

1
2
3
4
5
使用基于事件时间的窗口策略,每5分钟创建一个不重叠(或翻滚)的窗口并允许延迟1分钟。
   
假定目前是12:00。

当具有落入该间隔的时间戳的第一个元素到达时,Flink将为12:00到12:05之间的间隔创建一个新窗口,当水位线(watermark)到12:06时间戳时将删除它。

窗口有如下组件:

Window Assigner: 用来决定某个元素被分配到哪个/哪些窗口中去。

Trigger: 触发器。决定了一个窗口何时能够被计算或清除。触发策略可能类似于“当窗口中的元素数量大于4”时,或“当水位线通过窗口结束时”。

Evictor: 它可以在 触发器触发后 & 应用函数之前和/或之后 从窗口中删除元素。

窗口还拥有函数,比如 ProcessWindowFunction,ReduceFunction,AggregateFunction或FoldFunction。该函数将包含要应用于窗口内容的计算,而触发器指定窗口被认为准备好应用该函数的条件。

Keyed vs Non-Keyed Windows

在定义窗口之前,要指定的第一件事是流是否需要Keyed,使用keyBy(…)将无界流分成逻辑的keyed stream。 如果未调用keyBy(…),则表示流不是keyed stream。

  • 对于Keyed流,可以将传入事件的任何属性用作key。 拥有Keyed stream将允许窗口计算由多个任务并行执行,因为每个逻辑Keyed流可以独立于其余任务进行处理。 相同Key的所有元素将被发送到同一个任务。
  • 在Non-Keyed流的情况下,原始流将不会被分成多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行性为1。

窗口分类

窗口分类可以分成:翻滚窗口(Tumbling Window,无重叠),滚动窗口(Sliding Window,有重叠),和会话窗口,(Session Window,活动间隙)

滚动窗口
滚动窗口分配器将每个元素分配给固定窗口大小的窗口。滚动窗口大小固定的并且不重叠。例如,如果指定大小为5分钟的滚动窗口,则将执行当前窗口,并且每五分钟将启动一个新窗口。

滑动窗口

滑动窗口与滚动窗口的区别就是滑动窗口有重复的计算部分。

滑动窗口分配器将每个元素分配给固定窗口大小的窗口。类似于滚动窗口分配器,窗口的大小由窗口大小参数配置。另外一个窗口滑动参数控制滑动窗口的启动频率(how frequently a sliding window is started)。因此,如果滑动大小小于窗口大小,滑动窗可以重叠。在这种情况下,元素被分配到多个窗口。

例如,你可以使用窗口大小为10分钟的窗口,滑动大小为5分钟。这样,每5分钟会生成一个窗口,包含最后10分钟内到达的事件。

会话窗口
会话窗口分配器通过活动会话分组元素。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时会关闭。

例如,不活动的间隙时。会话窗口分配器配置会话间隙,定义所需的不活动时间长度(defines how long is the required period of inactivity)。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。

0x04 Flink中的时间概念

Flink在流处理程序支持不同的时间概念。分别为Event Time/Processing Time/Ingestion Time,也就是事件时间、处理时间、提取时间。

从时间序列角度来说,发生的先后顺序是:

1
事件时间(Event Time)----> 提取时间(Ingestion Time)----> 处理时间(Processing Time)
  • Event Time 是事件在现实世界中发生的时间,它通常由事件中的时间戳描述。
  • Ingestion Time 是数据进入Apache Flink流处理系统的时间,也就是Flink读取数据源时间。
  • Processing Time 是数据流入到具体某个算子 (消息被计算处理) 时候相应的系统时间。也就是Flink程序处理该事件时当前系统时间。

但是我们讲解时,会从后往前讲解,把最重要的Event Time放在最后。

处理时间

是数据流入到具体某个算子时候相应的系统时间。

这个系统时间指的是执行相应操作的机器的系统时间。当一个流程序通过处理时间来运行时,所有基于时间的操作(如: 时间窗口)将使用各自操作所在的物理机的系统时间。

ProcessingTime 有最好的性能和最低的延迟。但在分布式计算环境或者异步环境中,ProcessingTime具有不确定性,相同数据流多次运行有可能产生不同的计算结果。因为它容易受到从记录到达系统的速度(例如从消息队列)到记录在系统内的operator之间流动的速度的影响(停电,调度或其他)。

提取时间

IngestionTime是数据进入Apache Flink框架的时间,是在Source Operator中设置的。每个记录将源的当前时间作为时间戳,并且后续基于时间的操作(如时间窗口)引用该时间戳。

提取时间在概念上位于事件时间和处理时间之间。与处理时间相比,它稍早一些。IngestionTime与ProcessingTime相比可以提供更可预测的结果,因为IngestionTime的时间戳比较稳定(在源处只记录一次),所以同一数据在流经不同窗口操作时将使用相同的时间戳,而对于ProcessingTime同一数据在流经不同窗口算子会有不同的处理时间戳。

与事件时间相比,提取时间程序无法处理任何无序事件或后期数据,但程序不必指定如何生成水位线。

在内部,提取时间与事件时间非常相似,但具有自动时间戳分配和自动水位线生成功能。

事件时间

事件时间就是事件在真实世界的发生时间,即每个事件在产生它的设备上发生的时间(当地时间)。比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间。

在进入Apache Flink框架之前EventTime通常要嵌入到记录中,并且EventTime也可以从记录中提取出来。在实际的网上购物订单等业务场景中,大多会使用EventTime来进行数据计算。

基于事件时间处理的强大之处在于即使在乱序事件,延迟事件,历史数据以及从备份或持久化日志中的重复数据也能获得正确的结果。对于事件时间,时间的进度取决于数据,而不是任何时钟。

事件时间程序必须指定如何生成事件时间的Watermarks,这是表示事件时间进度的机制。

现在假设我们正在创建一个排序的数据流。这意味着应用程序处理流中的乱序到达的事件,并生成同样事件但按时间戳(事件时间)排序的新数据流。

比如:

1
2
3
有1~10个事件。
乱序到达的序列是:1,2,4,5,6,3,8,9,10,7
经过按 事件时间 处理后的序列是:1,2,3,4,5,6,7,8,9,10

为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每条数据都需要分配其事件时间戳。这通常通过提取每条数据中的固定字段来完成时间戳的获取。

设定时间特性

Flink DataStream 程序的第一部分通常是设置基本时间特性。 该设置定义了数据流源的行为方式(例如:它们是否将分配时间戳),以及像 KeyedStream.timeWindow(Time.seconds(30)) 这样的窗口操作应该使用上面哪种时间概念。

比如:

1
2
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

0x05 Watermark

前文讲到了事件时间,这个真实发生的时间是我们业务在实时处理程序中非常关心的。在一个理想的情况下,事件时间处理将产生完全一致和确定的结果,无论事件何时到达或其排序。但是在现实中,消息不在是按照顺序发送,产生了乱序,这时候该怎么处理?

Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。watermark是用于处理乱序事件或延迟数据的,这通常用watermark机制结合window来实现(Watermarks用来触发window窗口计算)。

比如对于late element,我们不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。 可以把Watermark看作是一种告诉Flink一个消息延迟多少的方式。定义了什么时候不再等待更早的数据。

1. 窗口触发条件

上面谈到了对数据乱序问题的处理机制是watermark+window,那么window什么时候该被触发呢?

基于Event Time的事件处理,Flink默认的事件触发条件为:

对于out-of-order及正常的数据而言

  • watermark的时间戳 > = window endTime
  • 在 [window_start_time,window_end_time] 中有数据存在。

对于late element太多的数据而言

  • Event Time > watermark的时间戳

WaterMark相当于一个EndLine,一旦Watermarks大于了某个window的end_time,就意味着windows_end_time时间和WaterMark时间相同的窗口开始计算执行了。

就是说,我们根据一定规则,计算出Watermarks,并且设置一些延迟,给迟到的数据一些机会,也就是说正常来讲,对于迟到的数据,我只等你一段时间,再不来就没有机会了。

WaterMark时间可以用Flink系统现实时间,也可以用处理数据所携带的Event time。

使用Flink系统现实时间,在并行和多线程中需要注意的问题较少,因为都是以现实时间为标准。

如果使用处理数据所携带的Event time作为WaterMark时间,需要注意两点:

  • 因为数据到达并不是循序的,注意保存一个当前最大时间戳作为WaterMark时间
  • 并行同步问题

2. WaterMark设定方法

标点水位线(Punctuated Watermark)

标点水位线(Punctuated Watermark)通过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于何时收到标记事件。

在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

定期水位线(Periodic Watermark)

周期性的(允许一定时间间隔或者达到一定的记录条数)产生一个Watermark。水位线提升的时间间隔是由用户设置的,在两次水位线提升时隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的水位线。

在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

举个例子,最简单的水位线算法就是取目前为止最大的事件时间,然而这种方式比较暴力,对乱序事件的容忍程度比较低,容易出现大量迟到事件。

3. 迟到事件

虽说水位线表明着早于它的事件不应该再出现,但是上如上文所讲,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。

迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种:

  • 重新激活已经关闭的窗口并重新计算以修正结果。
  • 将迟到事件收集起来另外处理。
  • 将迟到事件视为错误消息并丢弃。

Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用Side OutputAllowed Lateness

Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。

Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

这里总结机制为:

  • 窗口window 的作用是为了周期性的获取数据。
  • watermark的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
  • allowLateNess是将窗口关闭时间再延迟一段时间。
  • sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。

4. 实例

采用系统时间做Watermark

我们将水位线设置为当前系统时间间-5秒。

1
2
3
override def getCurrentWatermark(): Watermark = {       
new Watermark(System.currentTimeMillis - 5000)
}

通常最好保持接收到的最大时间戳,并创建具有最大预期延迟的水位线,而不是从当前系统时间减去。

采用Event Time做watermark

例如基于Event Time的数据,自身都包含一个类型为timestamp的字段,假设叫做rowtime,例如1543903383(2018-12-04 14:03:03),定义一个基于rowtime列,策略为偏移3s的watermark,这条数据的水位线时间戳则是:

1
1543903383-3000 = 1543900383(2018-12-04 14:03:00)

该条数据的水位线时间含义:timestamp小于1543900383(2018-12-04 14:03:00)的数据,都已经到达了。

1
2
3
4
5
6
7
8
9
10
11
12
13
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
   val maxOutOfOrderness = 3000L; // 3 seconds
   var currentMaxTimestamp: Long;
   override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
       val timestamp = element.getCreationTime()
       currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
       timestamp;
  }
   override def getCurrentWatermark(): Watermark = {
       // return the watermark as current highest timestamp minus the out-of-orderness bound
       new Watermark(currentMaxTimestamp - maxOutOfOrderness);
  }
}

看看如何触发窗口

我们明白了窗口的触发机制,这里我们添加了水位线,到底是个怎么个情况?我们来看下面

假如我们设置10s的时间窗口(window),那么010s,1020s都是一个窗口,以0~10s为例,0为start-time,10为end-time。假如有4个数据的event-time分别是8(A),12.5(B),9©,13.5(D),我们设置Watermarks为当前所有到达数据event-time的最大值减去延迟值3.5秒

当A到达的时候,Watermarks为max{8}-3.5=8-3.5 = 4.5 < 10,不会触发计算**
当B到达的时候,Watermarks为max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当C到达的时候,Watermarks为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当D到达的时候,Watermarks为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,触发计算
**触发计算的时候,会将A,C(因为他们都小于10)都计算进去,其中C是迟到的。

max这个很关键,就是当前窗口内,所有事件的最大事件。

这里的延迟3.5s是我们假设一个数据到达的时候,比他早3.5s的数据肯定也都到达了,这个是需要根据经验推算。假设加入D到达以后有到达了一个E,event-time=6,但是由于0~10的时间窗口已经开始计算了,所以E就丢了。

从这里上面E的丢失说明,水位线也不是万能的,但是如果根据我们自己的生产经验+侧道输出等方案,可以做到数据不丢失。

0x06 Flink源码

数据结构定义

在Flink DataStream中流动着不同的元素,统称为StreamElement,StreamElement可以是StreamRecord、Watermark、StreamStatus、LatencyMarker中任何一种类型。

StreamElement

StreamElement是一个抽象类(是Flink 承载消息的基类),其他四种类型继承StreamElement。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public abstract class StreamElement {
 //判断是否是Watermark
 public final boolean isWatermark() {
   return getClass() == Watermark.class;
}
 //判断是否为StreamStatus
 public final boolean isStreamStatus() {
   return getClass() == StreamStatus.class;
}
 //判断是否为StreamRecord
 public final boolean isRecord() {
   return getClass() == StreamRecord.class;
}
 //判断是否为LatencyMarker
 public final boolean isLatencyMarker() {
   return getClass() == LatencyMarker.class;
}
 //转换为StreamRecord
 public final <E> StreamRecord<E> asRecord() {
   return (StreamRecord<E>) this;
}
 //转换为Watermark
 public final Watermark asWatermark() {
   return (Watermark) this;
}
 //转换为StreamStatus
 public final StreamStatus asStreamStatus() {
   return (StreamStatus) this;
}
 //转换为LatencyMarker
 public final LatencyMarker asLatencyMarker() {
   return (LatencyMarker) this;
}
}

Watermark

Watermark继承了StreamElement。Watermark 是和事件一个级别的抽象,其内部包含一个成员变量时间戳timestamp,标识当前数据的时间进度。Watermark实际上作为数据流的一部分随数据流流动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@PublicEvolving
public final class Watermark extends StreamElement {
 /*The watermark that signifies end-of-event-time. */
 public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
 /* The timestamp of the watermark in milliseconds. */
 private final long timestamp;
 /* Creates a new watermark with the given timestamp in milliseconds.*/
 public Watermarklong timestamp) {
this.timestamp = timestamp;
}
 /*Returns the timestamp associated with this {@link Watermark} in milliseconds.**/
 public long getTimestamp() {
   return timestamp;
}
}

Flink如何生成&处理Watermark

在实际使用中大多数情况下会选择周期性生成方式也就是AssignerWithPeriodicWatermarks方式.

1
2
3
4
5
6
7
8
//指定为evenTime时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//生成watermark的周期
env.getConfig.setAutoWatermarkInterval(watermarkInterval)
//指定方式
dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)) {
  override def extractTimestamp(element: Element): Long = element.dT
})

BoundedOutOfOrdernessTimestampExtractor 是Flink内置提供的允许乱序最大延时的watermark生成方式,只需要重写其extractTimestamp方法即可。

assignTimestampsAndWatermarks 可以理解为是一个算子转换操作,等同于map/window一样理解,可以为其设置并行度、名称,也是一个transformation/operator,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

// match parallelism to input, otherwise dop=1 sources could lead to some strange
// behaviour: the watermark will creep along very slowly because the elements
// from the source go to each extraction operator round robin.
final int inputParallelism = getTransformation().getParallelism();
final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

TimestampsAndPeriodicWatermarksOperator<T> operator =
new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
.setParallelism(inputParallelism);
}

其使用的StreamOperator类型TimestampsAndPeriodicWatermarksOperator,继承了AbstractUdfStreamOperator,实现了OneInputStreamOperator接口与ProcessingTimeCallback接口,

TimestampsAndPeriodicWatermarksOperator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
* A stream operator that extracts timestamps from stream elements and
* generates periodic watermarks.
*
* @param <T> The type of the input elements
*/
public class TimestampsAndPeriodicWatermarksOperator<T>
extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

private static final long serialVersionUID = 1L;
private transient long watermarkInterval;
private transient long currentWatermark;

public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
super(assigner);
this.chainingStrategy = ChainingStrategy.ALWAYS;
}

@Override
public void open() throws Exception {
super.open();
       //初始化默认当前watermark
currentWatermark = Long.MIN_VALUE;
       //生成watermark周期时间配置
watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
       //注册定时其配置
if (watermarkInterval > 0) {
long now = getProcessingTimeService().getCurrentProcessingTime();
           //注册一个watermarkInterval后触发的定时器,传入回调参数是this,也就是会调用当前对象的onProcessingTime方法
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
}

@Override
public void processElement(StreamRecord<T> element) throws Exception {
       //提取当前的事件时间
final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
       //保存当前最大的事件时间。
output.collect(element.replace(element.getValue(), newTimestamp));
}

@Override
public void onProcessingTime(long timestamp) throws Exception {
       //此方法表示的就是定时回调的方法,将符合要求的watermark发送出去并且注册下一个定时器。
// register next timer
Watermark newWatermark = userFunction.getCurrentWatermark();
       //当新的watermark大于当前的watermark
if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
currentWatermark = newWatermark.getTimestamp();
           //将符合要求的watermark发送出去
// emit watermark
output.emitWatermark(newWatermark);
}
       //注册下一次触发时间
long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}

/**
* Override the base implementation to completely ignore watermarks propagated from
* upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
* watermarks from here).
*/
@Override
public void processWatermark(Watermark mark) throws Exception {
       //用来处理上游发送过来的watermark,可以认为不做任何处理,下游的watermark只与其上游最近的生成方式相关。
// if we receive a Long.MAX_VALUE watermark we forward it since it is used
// to signal the end of input and to not block watermark progress downstream
if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
currentWatermark = Long.MAX_VALUE;
output.emitWatermark(mark);
}
}

@Override
public void close() throws Exception {
super.close();

// emit a final watermark
Watermark newWatermark = userFunction.getCurrentWatermark();
if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
currentWatermark = newWatermark.getTimestamp();
// emit watermark
output.emitWatermark(newWatermark);
}
}
}

Flink如何处理迟到数据

这里我们使用 Side Output机制来说明。Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。

生成新的Watermark

Flink会替换StreamRecord 对象中的Timestamp,如果 根据当前事件的Timestamp 生成的Watermark 大于上一次的Watermark,就发出新的Watermark。

具体代码在 TimestampsAndPunctuatedWatermarksOperator.processElement。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void processElement(StreamRecord<T> element) throws Exception {
final T value = element.getValue();
   // 调用 用户实现的 extractTimestamp 获取新的Timestamp
final long newTimestamp = userFunction.extractTimestamp(value,
element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
   // 用新Timestamp 替换StreamRecord中的旧Timestamp
output.collect(element.replace(element.getValue(), newTimestamp));
   // 调用 用户实现的 checkAndGetNextWatermark 方法获取下一个Watermark
final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
   // 如果下一个Watermark 大于当前Watermark,就发出新的Watermark
if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
currentWatermark = nextWatermark.getTimestamp();
output.emitWatermark(nextWatermark);
}
}

处理迟到数据

首先,判断是否是迟到数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
for (W window: elementWindows) {
// drop if the window is already late
               // 如果窗口已经迟到了,则处理下一条数据
if (isWindowLate(window)) {
continue;
}  
          }
  ......
}

/**
Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness of the given window.
*/
protected boolean isWindowLate(W window) {
   // 当前机制是 事件时间 && 窗口元素的最大时间戳 + 允许迟到时间 <= 当前水位线 的时候为true(即当前窗口元素迟到了)
return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}

/**
* Returns the cleanup time for a window, which is
* {@code window.maxTimestamp + allowedLateness}. In
* case this leads to a value greater than {@link Long#MAX_VALUE}
* then a cleanup time of {@link Long#MAX_VALUE} is
* returned.
*
* @param window the window whose cleanup time we are computing.
*/
private long cleanupTime(W window) {
if (windowAssigner.isEventTime()) {
long cleanupTime = window.maxTimestamp() + allowedLateness;
   //返回窗口的 cleanup 时间 : 窗口元素的最大时间戳 + 允许延迟的时间
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
} else {
return window.maxTimestamp();
}
}

其次,处理迟到数据的具体代码在WindowOperator.processElement 方法的最后一段。这里就是旁路输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
   
  ......
   // 其他操作
  ......
   
   // side output input event if element not handled by any window late arriving tag has been set
   // 如果没有window处理过这条数据,isSkippedElement = true,如果上面判断为迟到数据,isSkippedElement = false
   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
   if (isSkippedElement && isElementLate(element)) {
     if (lateDataOutputTag != null){
         //旁路输出
         //这就是我们之前提到的,Flink 的 Side Output 机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。
       sideOutput(element);
    } else {
       this.numLateRecordsDropped.inc();
    }
  }
}

/**
* Decide if a record is currently late, based on current watermark and allowed lateness.
* 当前机制是 事件时间 && (元素时间戳 + 允许延迟的时间) <= 当前水位线
* @param element The element to check
* @return The element for which should be considered when sideoutputs
*/
protected boolean isElementLate(StreamRecord<IN> element){
return (windowAssigner.isEventTime()) &&
(element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
}

/**
* Write skipped late arriving element to SideOutput.
* // 把数据输出到旁路,供用户决定如何处理。
* @param element skipped late arriving element to side output
*/
protected void sideOutput(StreamRecord<IN> element){
   output.collect(lateDataOutputTag, element);
}

0x06 参考

Flink实时性、容错机制、窗口等介绍

彻底明白Flink系统学习11:【Flink1.7】事件时间、处理时间、提取时间有什么区别

彻底明白Flink系统学习10:【Flink1.7】窗口生命周期、Keyed和非Keyed及分配器诠释

Flink 轻松理解Watermark

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_time.html#event-time–processing-time–ingestion-time

http://smartsi.club/flink-stream-event-time-and-processing-time.html

Flink Event Time和WaterMark结合优势分析

Flink WaterMark(水位线)分布式执行理解

初学Flink,对Watermarks的一些理解和感悟

浅谈WaterMark

Flink WaterMark实例

Apache Flink 漫谈系列(03) - Watermark

Flink 的Event Time

Flink流计算编程–watermark(水位线)简介

Flink Watermark 机制浅析(透彻)

Flink Time和Watermark的理解

【源码解析】Flink 是如何处理迟到数据

flink之延迟数据处理watermark allowedLateness() sideOutputLateData()

Flink中Watermark定时生成源码分析