前言
本文是 Flink SQL 系列中的一篇,更多文章请关注Flink SQL 系列文档。
内容全部是原创,如有错误,欢迎铁汁们指出。另外,未经同意,不得转载。
背景
1 | SELECT |
上面的SQL是为了统计一个应用各个频道的UV。
有两个业务特点,频道值是可枚举的,事先可以确定的;不同频道的用户重合度很高。
假设频道的枚举值有三个值:A,B,OTHER。这个原始的查询语句,设备的集合在状态中怎么存呢?
Group Key | MapState: Key | MapState: Value |
---|---|---|
A | 1 | 1 |
A | 2 | 1 |
B | 1 | 1 |
B | 3 | 1 |
OTHER | 3 | 1 |
DISTINCT device_id 在状态中是个 MapState。MapState 的 Key 是 设备id;Value 是一个 Long 类型的值,一共64 bit,每个bit 表示这个频道下该设备是否出现了。在这个简单的场景下,Value 值就是 1。
A 频道下有两个设备来访问,ID 分别为 1 和 2。ID 为1 的设备也访问了 B 频道。ID 为 2 的设备也访问了 OTHER 频道。
我们很容易发现,每个频道有对应的Map,而且不同频道的 Map 的 Key 有大量的重合,怎么复用这些Key 呢?
原先的解决方案
可以用社区提供的方法手动改写 SQL,来复用 MapState 的 Key。
首先是行转列操作,把频道拍到COUNT DISTINCT聚合函数的 filter 条件里,每列表示每个频道的UV 值。在输出之前用UDTF 做列转行。
1 | -- 手动改写SQL 的方案 -- |
改写后,设备集合在状态中是怎么存的呢?
Group Key | MapState: Key | MapState: Value |
---|---|---|
empty | 1 | 110 |
empty | 2 | 010 |
empty | 3 | 011 |
MapState 的Value 依然是一个 Long 型的值,一共64 bit,每个bit 表示各个频道下这个设备是否出现了。
比如 ID 为1的设备,Value 值是110,表示出现在了频道 A 和 B 中;ID 为3的设备,出现在了频道 B 和 OTHER 中。
收益
可以复用 MapState 的Key,节省状态。
缺点
- 手动改写 SQL 比较复杂,尤其一个维度下有多个枚举值或者有多个可枚举的维度
- 需要自己写 UDTF 做一行转多行的转换
新的解决方式
为了提高用户的使用体验,可以在Flink 引擎层面针对这种场景自动做优化。
SQL 语法上采用标准SQL:用户在 WHERE 条件里指定 GROUP KEY 的枚举值。
引擎层面识别到这种 Pattern后,自动做节点改写。
新的方案和老的方案相比,优势是 SQL 写法简单。
1 | SELECT |
限制条件
- 至少有一个可枚举维度且枚举值可以事先确定
- 当用于窗口聚合时,窗口函数必须是行语义,即不适用于集合语义的窗口。
这个优化会调整聚合算子的 Group Key。调整完后,当前窗口收到的数据集合可能就变了,因此这个优化不适用于具有集合语义的窗口。什么是行语义,什么是集合语义?
行语义:当前这条数据属于哪个窗口只取决于当前输入数据本身。比如 TUMBLE/HOP 窗口函数。
集合语义:当前这条数据属于哪个窗口不仅取决于当前输入数据,还取决于这个窗口收到过的历史数据集合。比如 SESSION 窗口函数。 - 另外各个维度值下的Distinct Key 得有重合,才可以节约状态。假设维度值是省份 id,计算各个省份下的UV,基本可以认为不同省份的 device_id 是不同的,这个时候复用 distinct key 是没有收益的。
为什么语法上不采用 Calcite 的 PIVOT/UNPOVIT 显示地表达行转列和列转行。
- 条件不具备,Calcite 中1.26版本才开始引入 PIVOT,1.27 版本才开始引入 UNPOVIT。
而 Flink 1.12 版本才开始依赖 Calcite 的1.26版本,至今依然是。- 用 PIVOT 和 UNPIVOT 语法来表达,SQL 会比现在的冗长很多。
适用场景
- 适用于无限流聚合和窗口聚合
- 适用于单个可枚举的维度和多个可枚举的维度
- 适用于简单的分组聚合和多维聚合
SQL 表示
用户在 WHERE 条件里指定 GROUP KEY 的枚举值,其他方式不变。
单个可枚举维度的 Case
1 | SELECT |
多个可枚举维度的 Case
1 | SELECT |
Groupset/Cube/Rollup 下可枚举纬度的 Case
Cube/Rollup/GroupSet 情况类似,这里只以 GroupSet 举例。
1 | SELECT |