Flink sink schema 字段设计小技巧

1.1sink schema 中添加 version 版本字段

如 title,直接上实践案例和使用方式。

实践案例及使用方式

  • 「非故障场景下产出的每条记录的 version 字段值为 1」
  • 「故障场景下,可以在同一 sink 中产出 version > 1(非 1)的数据,代表故障修复数据提供给下游消费」

可应对的故障场景

上游 flink 任务 A 发生故障导致产出脏数据至 kafka X,并且下游消费方消费了脏数据(下游消费方按照下面两类进行划分):

  • 「下游为 flink 任务」:flink 任务 B 消费 kafka X 中的脏数据,结果计算并产出错误数据
  • 「下游为 OLAP 引擎以及 BI 看板:结果导致看板展示数据异常

首先介绍下避免以及处理上述问题的整体思路:

  • 「1.优化逻辑,保障上游任务稳定性」:首先通过一些优化手段,尽可能保证上游 flink 任务 A 不出现故障
  • 「2.配置作业监控报警」:针对整条链路配置对应的监控报警等,以及时发现和定位问题
  • 「3.制定故障处理、修复预案」:需要制定对应的故障处理、修复预案,一旦出现故障,需要有可处理故障的能力
  • 「4.下游针对数据源特性改进消费和处理方式」:保障即使消费了脏数据也不会对业务逻辑产生影响

下文主要介绍**「第 3 点」**,出现上述故障时修复的方案,针对以上场景,目前有如下 3 种可选方案修复数据:

  • 「方案 1 - 离线方式修复」:通过离线方式产出修复数据,对脏数据进行覆盖操作。缺点是故障修复延迟较高,需要切换离线、实时数据源,人工操作成本较高
  • 「方案 2 - 实时方式修复」:重跑修数逻辑,产出修复数据至 kafka X-fix,下游 flink 任务 B 重新从 kafka X-fix 中的指定 offset 开始消费,计算并产出正确的数据。此方案对下游 flink 任务 B 来说,需要改动代码逻辑,存在修数 topic 和原 topic 切换逻辑,修复逻辑较为复杂
  • 「方案 3 - 实时方式修复(本小节 version 字段方案)」:为避免下游产生数据源切换操作带来的高成本操作,可在原有 kafka topic 中产出修复数据,通过 version 字段区分正常产出数据以及修复数据,相对方案 1 和 2 的优点在于,不存在数据源切换逻辑,下游通过控制 version 字段值就可消费到对应的修复数据,明显降低人工操作成本,且修复逻辑相对简单

❝Note: 方案 3 需要对 Kafka X 预留一定的 buffer,否则在产出修复数据时,由于写入或读出 Kafka X 的 QPS 过高,会影响正常产出数据的任务。 ❞

1.2sink schema 中添加时间戳字段

实践案例及使用方式

有窗口场景中,sink schema 中可添加以下字段:

  • 「flink_process_start_time(long):代表 flink 窗口开始逻辑处理的时间戳」
  • 「flink_process_end_time(long):代表 flink 窗口结束逻辑处理的时间戳」
  • 「window_start(long):代表 flink 窗口开始时间戳」
  • 「window_end(long):代表 flink 窗口结束时间戳」

生产实践案例

  • 「flink_process_start_time,flink_process_end_time 在开发、测试、验数阶段可帮助用户定位数据偏差原因」
  • 「window_start,window_end 可以帮助用户定位每个窗口处理是否有丢数,及每个窗口处理的具体数据」

总结

本文主要介绍了在 sink schema 中添加 version(版本),时间戳扩展字段的小技巧,以帮助用户在生产环境中提升实时数据故障修复效率以及可用性。

2.Flink 使用 union 代替 join、cogroup

2.1需求场景分析

需求场景

需求诱诱诱来了。。。数据产品妹妹想要统计单个短视频粒度的**「点赞,播放,评论,分享,举报」五类实时指标,并且汇总成 photo_id、1 分钟时间粒度的实时视频消费宽表(即宽表字段至少为:****「photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp」)产出至实时大屏。

问题在于对同一个视频,五类视频消费行为的触发机制以及上报时间是不同,也就决定了对实时处理来说五类行为日志对应着五个不同的数据源。sql boy 们自然就想到了 join 操作将五类消费行为日志合并,可是实时 join(cogroup) 真的那么完美咩~,下文细谈。

source 输入以及特点

首先分析下需求中的 source 特点:

  • photo_id 粒度 play(播放)、like(点赞)、comment(评论)、share(分享)、negative(举报)明细数据,****「用户播放(点赞、评论…)n 次,客户端\服务端就会上传 n 条播放(点赞、评论…)日志至数据源」
  • 五类视频消费行为日志的 source schema 都为:****「photo_id + timestamp + 其他维度」

sink 输出以及特点

sink 特点如下:

  • photo_id 粒度 play(播放)、like(点赞)、comment(评论)、share(分享)、negative(举报)****「1 分钟级别窗口聚合数据」
  • 实时视频消费宽表 sink schema 为:****「photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp」

source、sink 样例数据

source 数据:

photo_idtimestampuser_id说明
12020/10/3 11:30:333播放
12020/10/3 11:30:334播放
12020/10/3 11:30:335播放
12020/10/3 11:30:334点赞
22020/10/3 11:30:335点赞
12020/10/3 11:30:335评论

sink 数据:

photo_idtimestampplay_cntlike_cntcomment_cnt
12020/10/3 11:30:00311
22020/10/3 11:30:00010

我们已经对数据源输入和输出有了完整的分析,那就瞧瞧有什么方案可以实现上述需求吧。

2.2实现方案

  • 方案1: 「本小节 cogroup 方案」 直接消费原始日志数据,对五类不同的视频消费行为日志使用 cogroup 或者 join 进行窗口聚合计算
  • 方案2:对五类不同的视频消费行为日志分别单独聚合计算出分钟粒度指标数据,下游再对聚合好的指标数据按照 photo_id 进行合并
  • 方案3: 「本小节 union 方案」 既然数据源 schema 相同,直接对五类不同的视频消费行为日志做 union 操作,在后续的窗口函数中对五类指标进行聚合计算。后文介绍 union 方案的设计过程

先上 cogroup 方案的示例代码。

2.2.1cogroup

cogroup 实现示例如下,示例代码直接使用了处理时间(也可替换为事件时间~),因此对数据源的时间戳做了简化(直接干掉):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class Cogroup {

   public static void main(String[] args) throws Exception {

       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       // Long -> photo_id 播放一次
       DataStream<Long> play = SourceFactory.getDataStream(xxx);
       // Long -> photo_id 点赞一次
       DataStream<Long> like = SourceFactory.getDataStream(xxx);
       // Long -> photo_id 评论一次
       DataStream<Long> comment = SourceFactory.getDataStream(xxx);
       // Long -> photo_id 分享一次
       DataStream<Long> share = SourceFactory.getDataStream(xxx);
       // Long -> photo_id 举报一次
       DataStream<Long> negative = SourceFactory.getDataStream(xxx);

       // Tuple3<Long, Long, Long> -> photo_id + play_cnt + like_cnt 播放和点赞的数据合并
       DataStream<Tuple3<Long, Long, Long>> playAndLikeCnt = play
          .coGroup(like)
          .where(KeySelectorFactory.get(Function.identity()))
          .equalTo(KeySelectorFactory.get(Function.identity()))
          .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
          .apply(xxx1);

       // Tuple4<Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt 播放、点赞、评论的数据合并
       DataStream<Tuple4<Long, Long, Long, Long, Long>> playAndLikeAndComment = playAndLikeCnt
          .coGroup(comment)
          .where(KeySelectorFactory.get(playAndLikeModel -> playAndLikeModel.f0))
          .equalTo(KeySelectorFactory.get(Function.identity()))
          .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
          .apply(xxx2);

       // Tuple5<Long, Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + share_cnt 播放、点赞、评论、分享的数据合并
       DataStream<Tuple5<Long, Long, Long, Long, Long, Long>> playAndLikeAndCommentAndShare = playAndLikeAndComment
          .coGroup(share)
          .where(KeySelectorFactory.get(playAndLikeAndCommentModel -> playAndLikeAndCommentModel.f0))
          .equalTo(KeySelectorFactory.get(Function.identity()))
          .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
          .apply(xxx2);

       // Tuple7<Long, Long, Long, Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp 播放、点赞、评论、分享、举报的数据合并
       // 同上~
       DataStream<Tuple7<Long, Long, Long, Long, Long, Long, Long>> playAndLikeAndCommentAndShare = ***;

       env.execute();
  }
}

粗暴一想,上面这样一搞不就结束了么,事情没那么简单,我们来做一个详细点的分析。

上述实现可能会存在的问题点

  • 「从 flink 消费到 play 数据源的一条数据到最终产出这条数据被聚合后的数据,整个过程的数据延迟 > 3 分钟…」
  • 「如果数据源持续增加(比如添加其他视频消费操作数据源),则整个任务算子变多,数据链路更长,任务稳定性会变差,产出数据延迟也会随着窗口计算变多,延迟更久」

「数据产品妹妹」:?,小哥哥好棒,既然问题点都分析出来了,技术小哥哥就帮人家解决一下嘛~ 「头文字 ∩ 技术小哥哥」:搞。

「头文字 ∩ 技术小哥哥」:既然可能由于过多的窗口导致数据产出延迟,job 不稳定,那有没有什么方法减少窗口数量呢,思路转换一下。直接以整个 job 中只包含一个窗口算子操作为基点,逆推一下,则有以下数据链路。 ❞

逆推链路

1 - 5 为逆推的整条链路。

  • 「1.五类指标的数据都在单个窗口中计算」
  • 「2.五类指标的窗口 model 相同」
  • 「3.keyby 中的 key 一致(photo_id)」
  • 「4.五类指标的数据源都为 photo_id 粒度,并且五类数据源的 model 都必须相同,并且可以做合并」
  • 「5.union 算子可以对五类数据源做合并!!!」

img

话不多说直接上 union 方案代码。

2.2.2union

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Union {

   public static void main(String[] args) throws Exception {

       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       // Tuple2<Long, String> -> photo_id + "PLAY"标签
       DataStream<Tuple2<Long, String>> play = SourceFactory.getDataStream(xxx);
       // Tuple2<Long, String> -> photo_id + "LIKE"标签
       DataStream<Tuple2<Long, String>> like = SourceFactory.getDataStream(xxx);
       // Tuple2<Long, String> -> photo_id + "COMMENT"标签
       DataStream<Tuple2<Long, String>> comment = SourceFactory.getDataStream(xxx);
       // Tuple2<Long, String> -> photo_id + "SHARE"标签
       DataStream<Tuple2<Long, String>> share = SourceFactory.getDataStream(xxx);
       // Tuple2<Long, String> -> photo_id + "NEGATIVE"标签
       DataStream<Tuple2<Long, String>> negative = SourceFactory.getDataStream(xxx);

       // Tuple5<Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + window_start_timestamp
       DataStream<Tuple3<Long, Long, Long>> playAndLikeCnt = play
          .union(like)
          .union(comment)
          .union(share)
          .union(negative)
          .keyBy(KeySelectorFactory.get(i -> i.f0))
          .timeWindow(Time.seconds(60))
          .process(xxx);

       env.execute();
  }
}

可以发现,无论上游数据源怎样进行变化,上述 union 方案中始终可以保持只有一个窗口算子处理和计算数据,则可以解决之前列举的数据延迟以及 flink 任务算子过多的问题。

在数据源的 schema 相同(或者不同但经过处理之后可以 format 成相同格式)的情况下,或者处理逻辑相同的话,可以使用 union 进行逻辑简化。

总结

本文首先介绍了需求场景,第二部分分析了使用 cogroup(案例代码)是如何解决此需求场景,再分析了此实现方案可能会存在一些问题,并引出了 union 解决方案的逆推和设计思路。在第三部分针对此场景使用 union 代替 cogroup 进行了一定程度上的优化。如果针对此场景,大佬们有更好的优化方案的话,期待留言喔。