前言
本文是 Flink SQL 系列中的一篇,更多文章请关注Flink SQL 系列文档。
内容全部是原创,如有错误,欢迎铁汁们指出。另外,未经同意,不得转载。
背景
业务同学反馈使用 CUMULATE 窗口聚合计算 UV,从 savepoint 恢复后数据曲线偶尔会出现数据尖刺。
问题已经修复到 Flink 1.15 版本。这篇文章主要分析这个问题是怎么产生的以及解决方案。
复现问题的条件
数据源乱序比较严重,导致从 savepoint 恢复作业后,watermark 重建出的新 watermark 回退了。
问题分析
该问题的根本原因是受两点原因综合影响:
- CUMULATE WINDOW 聚合算子对迟到数据的处理方式
- watermark 不记录到状态里,从savepoint/checkpoint 恢复后 watermark 会重建
下面分别说明这两个原因。
CUMULATE WINDOW 聚合算子对迟到数据的处理方式
目前 Cumulate Window Aggregate 对迟到数据的处理方式是判断该数据所属的大窗口是否已经迟到,如果大窗口迟到了,直接丢弃;如果大窗口还没有迟到,把该条数据记录到 merged state 里,不会修正已经发出的窗口数据,但是后续的同属于一个大窗口的结果数据会算上这条记录。
1 | SELECT window_start, window_end, COUNT(USER_ID) |
以上面的例子为例,当 watermark 到了 2021-10-11 11:01,往下游发送 [00:00, 11:01) 窗口的结果,比如是 INSERT (00:00, 11:01, 4)。
这个时候来了一个迟到数据,event time 是 2021-10-11 11:00,这条数据所属的大窗口还没有迟到,因此直接把该条记录记录到 merged state 里,后续窗口的结果(比如 [00:00, 11:02),[00:00, 11:03)… )会算上该条迟到数据,但是不会修正已经发送出去的窗口数据。
从savepoint/checkpoint 恢复后 Watermark 重建
目前 FLINK 里,watermark 不会记录到 state 里,从savepoint 和 checkpoint 恢复后,watermark 会发生重建,这会带来一些非预期的行为,比如原本的迟到数据再重启后被当成了非迟到数据。
还以上面的例子来说,当 watermark 到了 2021-10-11 11:01,往下游发送 [00:00, 11:01) 窗口的结果,比如是 INSERT (00:00, 11:01, 4)。
这个时候做 savepoint,并让作业从 savepoint 重启,由于 watermark 会重建,重建后的 watermark 可能会比重启前的小,尤其是遇到乱序数据时,比如 watermark 可能回退到了11:00,
这个时候来了一条数据,event time 是 2021-10-11 11:00。这个数据在重启前会被认为是迟到数据,但是重启后,由于发生了 watermark 重建,该条数据不会被认为是迟到数据,当 watermark 再次到达 11:01 时,会往下游再次发送一条窗口是 [00:00, 11:01] 的结果。
因此下游会收到两次 [00:00, 11:01] 窗口的 INSERT 结果。可能会引发数据质量问题,比如外层进行合桶聚合时。
解决方案
上面介绍了,该问题的根本原因是受两点原因综合影响:
- CUMULATE WINDOW 聚合算子对迟到数据的处理方式
- Watermark 不记录到状态里,从savepoint/checkpoint 恢复后 watermark 会重建
相应的,解决方案也有两种,只需要破坏上面的两点原因中的任何一点即可。
- 把 Watermark 记录到 Cumulate Window 聚合算子 的 state 里。
- 修改 CUMULATE WINDOW 聚合算子对迟到的数据方式,对已经发送的窗口数据,以 retract 流的方式进行更新。但是不推荐这种方案,因为
a. 实现上需要对 CUMULATE WINDOW 聚合算子进行较大的修改,调整状态保留机制和对迟到数据的处理方式
b. 我们不希望 CUMULATE WINDOW 算子发送 retract 流,否则又可能会出现两级聚合后带来的数据曲线问题,见文章 Flink SQL 系列文章: 针对天级别聚合且按分钟打点的需求,方案演进之路
因此,我们选择方案1的方式来修复。把 Watermark 记录到 Window 算子的 Operator State 里。感兴趣的同学可以参考 FLINK-24501 了解细节。
另外,不光 Window Operator 存在这个问题,IntervalJoin 等依赖 rowtime 的算子都存在这个问题,根本的解决方案是考虑一个通用的方案,当作业依赖 watermark 时,把 watermark 记录到 State 里。