0%

针对天级别聚合且按分钟打点的需求,方案演进之路

前言

本文是 Flink SQL 系列中的一篇,更多文章请关注Flink SQL 系列文档
内容全部是原创,如有错误,欢迎铁汁们指出。另外,未经同意,不得转载。

背景

业务上经常有天级别聚合,按分钟打点的需求,比如计算当天的 UV和PV,但是分钟级别打点。目前 Flink 有两种方案来支持这种需求。

老方案是 Early fire 方案,即开天级别 Tumble Window,每分钟触发一次。
Early fire 用于在窗口结束前以固定周期把中间结果发给下游,发出去的结果和触发的时机没有严格的对应关系。
基于 Early fire 机制,并没有把数据做严格的窗口划分,有如下限制:

  1. 各个并发触发的时间是不对齐且不确定的
  2. max(ts) 的结果作为看板的时间戳,每个max(ts) 时间点对应的 sum(num) 值,并不保证表示这个时间点及以前的所有累计值,可能有部分这个业务时间上发生的数据打到后面的时间戳上。

因为 Early fire 机制,并没有做到数据级别严格划窗口,这种方案会出现:

  1. 漏点,导致曲线不平滑
  2. 掉坑
  3. 分维度的累计值和总维度的累计值不相同的各种问题。
    针对Early fire 方案上面的三个缺陷,虽然分别采用了一些临时方案绕过,但是这些临时方案都有缺陷,下文会详细介绍。这说明 Early fire 方案并不是解决该业务需求的完美方案。

为了彻底解决 Early fire 的痛点,推出新方案 Cumualate Window 。这种方案可以一举解决老方案上面提到的所有问题。该方案在在各项活动里得到生产验证。
本文会说明为何 Cumualate Window 方案可以一举解决这些问题。

Early fire 方案缺点剖析及临时解决方案

缺陷1:自增指标曲线有凹坑

问题描述

大部分活动作业都引入两层聚合来解决上一节中提到的数据倾斜问题。两层聚合的作业(不管使用 Unbounded Aggregate 还是 Window Aggregate)偶发出现理论上自增的聚合指标(PV/UV/…),在曲线上出现掉坑。该问题的本质原因是 Flink 引擎的 retraction 机制没有把 update before 消息和 update after 编码在同一个消息里,而是分在两个单独的消息里,下游算子也会分开处理 update before 消息和 update after 消息。以如下的 SQL 为例详细分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
INSERT INTO Sink
SELECT
max(server_timestamp) as server_timestamp,
sum(cash_cnt) AS cash_cnt
FROM
(
SELECT
max(cast(server_timestamp AS BIGINT)) AS server_timestamp,
tumble_start(server_timestamp, INTERVAL '1' DAY) AS w_start
FROM kwaistream_intermedia.wrapped_source_table
GROUP BY
mod(user_id, 2048),
tumble(server_timestamp,INTERVAL '1' DAY)
)t
GROUP BY w_start;

为了解决倾斜问题,里层的聚合按照 user_id 分成2048个桶计算每个桶下当天的累计 PV,外层聚合合并桶计算当天累计的总 PV 值。假设 01:02:00 这个时刻,作业的各个算子的状态如下图所示。里层聚合的两个桶的 State 分别是 (01:00:00,1)、 (01:00:00,2),外层聚合的 State 是 (01:00:00,3)。
null

01:02:01(墙上时钟)进入两条数据 (01:01:01, 1)、(01:02:00, 1),期待的曲线如下。
null

两条输入数据分别分配到 Bucket1 和 Bucket2。作业收到这两条数据后,数据流如下:

  1. Flink 引擎的 retraction 机制没有把 update before 消息和 update after 编码在同一个消息里,而是分在两个单独的消息里。Bucket1 收到 (01:01:01, 1) 后,发送两条消息,Update Before 消息 (01:00:00,1)、Update After 消息 (01:01:01,2)。Bucket2 收到 (01:02:00, 1) 后,发送两条消息, Update Before 消息(01:00:00,2),Update After 消息(01:02:00,3)。
  2. 4条数据经过网络后,外层聚合算子收到消息的顺序有可能变成 UB (01:00:00,1)、UB(01:00:00,2)、UA(01:01:01,2)、UA(01:02:00,3)。外层聚合算子依次处理这4条消息,先后发到下游 Kafka Sink 算子 8条消息 UB (01:00:00,3)、UA (01:00:00,2)、UB (01:00:00,2)、UA (00:59:00,0)、UB (00:59:00,0)、UA (01:01:01,2)、UB (01:01:01,2)、UA (01:02:00,5)。
  3. Kafka Sink 往 Kafka 写入的时候丢掉 Update Before 消息。
  4. Kafka 往 Druid 同步数据,Druid 做分钟粒度的聚合,取 1min 内的最大值。
    null
    实际的数据曲线如下,01:01 的数据比 01:00 小,原本应该自增的曲线上有凹坑。
    null

临时解决方案

在这种方案里,Window 聚合算子输出的 Retract 流,而 Flink 引擎的 retraction 机制没有把 update before 消息和 update after 编码在同一个消息里,而是分在两个单独的消息里,下游算子也会分开处理 update before 消息和 update after 消息。虽然可以暂时用临时方案绕过,即在外层聚合里引入 UDAG (sum_max_long) 替换 sum 函数,sum_max_long UDAG 里记录历史上出现过的最大的 sum 值(max_sum),发送数据时取 max(sum, max_sum)。
该方案有以下缺点:
(1)比较 hack,而且特别容易误用,这个特殊的 UDAG 只能用在递增的指标里,而且只能用在双层聚合的最外层聚合上,否则会出现正确性问题。
(2)只能做到最终的曲线不出现凹坑,即只是平滑了掉坑的点,而不能把错误的点修正成正确值。

缺陷2:当天累计UV/PV类曲线作业的不可回溯

问题描述

业务上有绘制当天累计的UV/PV 等指标曲线的需求,同时要求这类作业在主动回追数据或者作业发生failover,曲线基本和尽量保证曲线符合原始曲线。
如果只为了满足绘制天级别的数据曲线需求,Unbounded Aggregate 和 Window Aggregate 都可以满足需求。但是用 Unbounded Aggregate,一旦作业发生 failover 或者用户主动回追数据,曲线会和原始曲线有较大偏差。因此用 Unbounded Aggregate 不能满足数据曲线的可回溯性需求。因此锁定用 Window Aggregate 来解决该需求。开了1天的窗口,如果不开启 Early fire 机制,默认等窗口结束才会输出数据。这显然不满足用户的需求。基于 Early fire,可以在窗口结束前以固定周期把中间结果发给下游。但是引擎层提供的 Early fire 机制是基于 proctime ,在作业发生 failover 或者 主动回溯数据时,也会遇到和 Unbounded Aggregate 类似的问题,曲线会和原始曲线有较大偏差。如何尽量保证当天累计 UV/PV 类曲线作业的可回溯性?

临时解决方案

基于 event time 的 Early fire 方案,即开天级别窗口,基于 event timer 每分钟触发一次。

这种方案基于 Early fire 机制,并没有把数据做严格的窗口划分,有如下限制:

  1. 各个并发触发的时间是不对齐且不确定的
  2. max(ts) 的结果作为看板的时间戳,每个max(ts) 时间点对应的 sum(num) 值,并不保证表示这个时间点及以前的所有累计值,可能有部分这个业务时间上发生的数据打到后面的时间戳上。

这种方案基于 Early fire based on processing time 上改动较小,风险更小。但是由于 Early fire 机制并没有做到数据级别严格划窗口,这种方案的缺点:

  1. 回溯数据时,数据不平滑,会出现跳变。配合watermark 限速可以缓解这个问题。
  2. 分维度的累计值和总维度的累计值不相同的各种问题(见问题4)。

注意:在稀疏流的场景下,选择基于 rowtime 的 early fire 机制,某些并发上可能没有数据,导致作业整体的watermark 不前进,影响数据产出的实时性。比如在每天凌晨业务低峰期,数据量相对较少,不能保证每个并发上都有数据,作业整体 watermark 不推进,反应在用户的数据曲线上就是虽然墙上时钟已经推进到 4:00,但是看到的最新的一个时间点是 3:00。这种特殊场景,依然推荐用户沿用基于process time 的 early fire。

缺陷3:看板实时维度加和与汇总不一致

问题描述

用户反馈这种方案看板实时维度累积值加和与汇总累积值不一致。Early fire 用于在窗口结束前以固定周期把中间结果发给下游,发出去的结果和触发的时机没有严格的对应关系。基于 Early fire 机制,只能保证一天窗口结束的时候,发出来的数据分维度上汇总值加和等于总的汇总值。并不能做到看板上各个时间点分维度的汇总值求和等于总的汇总值。主要有两个原因导致:

  1. 各个并发触发的时间是不对齐且不确定的

  2. max(ts) 的结果作为看板的时间戳,每个max(ts) 时间点对应的 sum(num) 值,并不保证表示这个时间点及以前的所有累计值,可能有部分这个业务时间上发生的数据打到后面的时间戳上。

    1
    2
    3
    4
    5
    6
    7
    SELECT 
    type, sum(num), max(ts)
    FROM T GROUP BY type, TUMBLE(ts, INTERVAL '1' DAY)
    UNION ALL
    SELECT
    'ALL', sum(num), max(ts)
    FROM T GROUP BY TUMBLE(ts, INTERVAL '1' DAY)

以如下SQL 来详细分析。
输入数据
数据到达机器时间-> ts, type, num
00:01:01 -> 00:01:00,A,10
00:01:20 -> 00:01:15,A,5
00:02:10 -> 00:02:01,A,5

输出结果
计算分维度的作业上,A 所在的并发上 timer 的触发时间是 XX:XX:12,
触发时间 -> type, sum(num),max(ts)
00:01:12 -> A,10,00:01:00
00:02:12 -> A,20,00:02:01

计算总维度的作业上,timer 触发时间是 XX:XX:23
触发时间 00:01:23 -> ALL,15,00:01:15
触发时间 00:02:23 -> ALL,20, 00:02:01
在看板上观测到的效果是 00:01 这分钟,A对应的累计值10 != ALL 对应的累计值 15。

临时解决方案

把维度列的累积值拍平到多列上,输出的时候用UDTF把一行上多个累积值转成多行。
该方案有以下缺点:
(1)不能处理维度列不能枚举的Case
(2)增加了用户的开发成本:SQL 变长,而且需要开发 UDTF。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SELECT TMP.* FROM
(
-- 各个维度的指标拍到多列上
SELECT
sum(num) filter (WHERE type = 'A') AS sum_A,
sum(num) filter (WHERE type = 'B') AS sum_B,
sum(num) AS sum_ALL,
max(ts) AS max_timestamp
FROM T GROUP BY TUMBLE(ts, INTERVAL '1' DAY)
) T1, LATERAL TABLE(
-- toMultipleRow 做一行转多行的转换
toMultipleRow(sum_A, sum_B, sum_ALL, max_timestamp))
AS TMP(
type,
sum_val,
max_timestamp)

toMultipleRow 对应的 UDTF 代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class MultipleRowConverter extends TableFunction<Row> {


public void eval(Long sumA, Long sumB, Long sumAll, Long maxTimestamp) {
// 拆成三行
collect(Row.of("A", sumA, maxTimestamp));
collect(Row.of("B", sumB, maxTimestamp));
collect(Row.of("ALL", sumAll, maxTimestamp));
}

@Override
public TypeInformation<Row> getResultType() {
// 定义输出行的 Schema
return new RowTypeInfo(Types.STRING, Types.LONG, Types.LONG);
}
}

终极解决方案:Cumulate Window 方案

为了彻底解决 Early fire 的痛点,引入新方案 Cumualate Window 方案,即开渐进式窗口,即[00:0000:01],[00:0000:02].. 这种窗口。这种方案可以一举解决老方案上面提到的所有问题。

注意,Flink 开源版本里, Cumulate Window 是在 1.13 版本才开始支持,且只支持 Window Aggregate,老的 Group Window Aggregate 暂时不支持。

下面会分析 Cumualate Window 方案可以一举解决这些问题。

Cumulate Window 为何能避免老方案的三个缺陷

输出Append 流而不是 Retract 流解决缺陷1:自增指标曲线有凹坑

Cumulate Window 算子输出流是 Append 流,而不再是 Update 流,因此避免了该问题。
注意:虽然该方案解决了窗口聚合里曲线掉坑的问题,但是不能解决无限流聚合里的曲线掉坑问题。无限流聚合场景下的曲线掉坑问题的根本解决方案是完善 retraction 机制,把 update before 消息和 update after 消息作为一个原子消息发送和处理。PS:目前这个优先级不高因为于无限流聚合不好回溯历史数据,且failover时曲线会有跳变,一般需要曲线看板类的需求都使用窗口聚合。

数据按照rowtime进行严格窗口划分解决缺陷2:当天累计UV/PV类曲线作业的不可回溯

Cumulate Window 算子的触发时机是确定的,每个并发都是在 Watermark 越过每分钟窗口的时间阈值时触发一次,输出当前窗口的最终累计结果,而不再一分钟的中间多次输出。且采用窗口的 EndTime 作为看板上的打点时间戳,这个方案可以保证曲线的可回溯性。

窗口触发时间确定解决缺陷3:看板实时维度加和与汇总不一致

由于按分钟打点当天累计UV/PV类的需求都使用 Cumulate Window 方案,输出当前窗口的最终累计结果,而不再一分钟的中间多次输出。且采用窗口的 EndTime 作为看板上的打点时间戳。因此不需要用户修改 SQL 就可以解决该问题。