0%

Distinct MapState 状态复用优化

前言

本文是 Flink SQL 系列中的一篇,更多文章请关注Flink SQL 系列文档
内容全部是原创,如有错误,欢迎铁汁们指出。另外,未经同意,不得转载。

背景

1
2
3
4
5
SELECT
channel,
COUNT(DISTINCT device_id) AS uv
FROM source_table
GROUP BY channel

上面的SQL是为了统计一个应用各个频道的UV。

有两个业务特点,频道值是可枚举的,事先可以确定的;不同频道的用户重合度很高。

假设频道的枚举值有三个值:A,B,OTHER。这个原始的查询语句,设备的集合在状态中怎么存呢?

Group Key MapState: Key MapState: Value
A 1 1
A 2 1
B 1 1
B 3 1
OTHER 3 1

DISTINCT device_id 在状态中是个 MapState。MapState 的 Key 是 设备id;Value 是一个 Long 类型的值,一共64 bit,每个bit 表示这个频道下该设备是否出现了。在这个简单的场景下,Value 值就是 1。

A 频道下有两个设备来访问,ID 分别为 1 和 2。ID 为1 的设备也访问了 B 频道。ID 为 2 的设备也访问了 OTHER 频道。

我们很容易发现,每个频道有对应的Map,而且不同频道的 Map 的 Key 有大量的重合,怎么复用这些Key 呢?

原先的解决方案

可以用社区提供的方法手动改写 SQL,来复用 MapState 的 Key。

首先是行转列操作,把频道拍到COUNT DISTINCT聚合函数的 filter 条件里,每列表示每个频道的UV 值。在输出之前用UDTF 做列转行。

1
2
3
4
5
6
7
8
9
10
11
-- 手动改写SQL 的方案 --
SELECT
TMP.channel, TMP.uv
FROM (
SELECT
COUNT(DISTINCT device_id) FILTER (WHERE channel = 'A') AS uv_1,
COUNT(DISTINCT device_id) FILTER (WHERE channel = 'B') AS uv_2,
COUNT(DISTINCT device_id) FILTER (WHERE channel = 'OTHER') AS uv_3
FROM source_table
) T1, LATERAL TABLE(
toMultipleRow(uv_1, uv_2, uv_3)) AS TMP (channel, uv)

改写后,设备集合在状态中是怎么存的呢?

Group Key MapState: Key MapState: Value
empty 1 110
empty 2 010
empty 3 011

MapState 的Value 依然是一个 Long 型的值,一共64 bit,每个bit 表示各个频道下这个设备是否出现了。

比如 ID 为1的设备,Value 值是110,表示出现在了频道 A 和 B 中;ID 为3的设备,出现在了频道 B 和 OTHER 中。

收益

可以复用 MapState 的Key,节省状态。

缺点

  1. 手动改写 SQL 比较复杂,尤其一个维度下有多个枚举值或者有多个可枚举的维度
  2. 需要自己写 UDTF 做一行转多行的转换

新的解决方式

为了提高用户的使用体验,可以在Flink 引擎层面针对这种场景自动做优化。

SQL 语法上采用标准SQL:用户在 WHERE 条件里指定 GROUP KEY 的枚举值。

引擎层面识别到这种 Pattern后,自动做节点改写。
新的方案和老的方案相比,优势是 SQL 写法简单。

1
2
3
4
5
6
7
SELECT
channel,
COUNT(DISTINCT device_id) AS uv
FROM source_table
-- WHERE 条件指定维度的枚举值 ---
WHERE channel IN ('A','B','OTHER')
GROUP BY channel

限制条件

  1. 至少有一个可枚举维度且枚举值可以事先确定
  2. 当用于窗口聚合时,窗口函数必须是行语义,即不适用于集合语义的窗口。
    这个优化会调整聚合算子的 Group Key。调整完后,当前窗口收到的数据集合可能就变了,因此这个优化不适用于具有集合语义的窗口。

    什么是行语义,什么是集合语义?
    行语义:当前这条数据属于哪个窗口只取决于当前输入数据本身。比如 TUMBLE/HOP 窗口函数。
    集合语义:当前这条数据属于哪个窗口不仅取决于当前输入数据,还取决于这个窗口收到过的历史数据集合。比如 SESSION 窗口函数。

  3. 另外各个维度值下的Distinct Key 得有重合,才可以节约状态。假设维度值是省份 id,计算各个省份下的UV,基本可以认为不同省份的 device_id 是不同的,这个时候复用 distinct key 是没有收益的。

为什么语法上不采用 Calcite 的 PIVOT/UNPOVIT 显示地表达行转列和列转行。

  1. 条件不具备,Calcite 中1.26版本才开始引入 PIVOT,1.27 版本才开始引入 UNPOVIT。
    而 Flink 1.12 版本才开始依赖 Calcite 的1.26版本,至今依然是。
  2. 用 PIVOT 和 UNPIVOT 语法来表达,SQL 会比现在的冗长很多。

适用场景

  1. 适用于无限流聚合和窗口聚合
  2. 适用于单个可枚举的维度和多个可枚举的维度
  3. 适用于简单的分组聚合和多维聚合

SQL 表示

用户在 WHERE 条件里指定 GROUP KEY 的枚举值,其他方式不变。

单个可枚举维度的 Case

1
2
3
4
5
6
7
SELECT
channel,
COUNT(DISTINCT device_id) AS uv
FROM source_table
-- WHERE 条件指定维度的枚举值 ---
WHERE channel IN ('A','B','OTHER')
GROUP BY channel

多个可枚举维度的 Case

1
2
3
4
5
6
7
8
9
SELECT
channel,
platform,
entry_source,
COUNT(DISTINCT device_id) AS uv
FROM source_table
-- WHERE 条件指定维度的枚举值 ---
WHERE channel IN ('A','B','OTHER') AND platform in ('A', 'B', 'C')
GROUP BY channel, platform, entry_source

Groupset/Cube/Rollup 下可枚举纬度的 Case

Cube/Rollup/GroupSet 情况类似,这里只以 GroupSet 举例。

1
2
3
4
5
6
7
8
9
SELECT
product,
entry_source,
COUNT(DISTINCT device_id) AS uv,
GROUPING_ID(product, entry_source) AS gid,
FROM source_table
-- WHERE 条件指定维度的枚举值 ---
WHERE product in ('A','B','OTHER')
GROUP BY GROUPING SETS ((product, entry_source),())