0%

窗口聚合支持多维分析

前言

本文是 Flink SQL 系列中的一篇,更多文章请关注Flink SQL 系列文档
内容全部是原创,如有错误,欢迎铁汁们指出。另外,未经同意,不得转载。

进入正题

Flink SQL 里提供窗口聚合的功能,开源 Flink 1.13 版本之前窗口聚合的需求采用 Group Window Aggregate 语法表示。在 1.13 版本引入 Window TVF 的概念,官方推荐窗口聚合的需求尽量使用 Window TVF Aggregate 语法,原有的 Group Window Aggregate 的语法依然保留,但是会在未来的某个版本里会被废弃。

开源版本提供的 Window TVF Aggregate 已经支持 Rollup/Cube/GroupSets 的语法,用法和无限流聚合使用 Rollup/Cube/GroupSets 的方式基本一致。但是,社区版本并没有提供基于 Group Window Aggregate 的多维分析能力,笔者开发了一个提供基于 Group Window Aggrgeate 多维分析的 PR,有需要的老铁可以把这个 PR CP 到自己的 Flink 版本里使用。

使用 Window Aggregate 进行多维分析

开源版本提供的 Window TVF Aggregate 已经支持 Rollup/Cube/GroupSets 的语法,用法和无限流聚合使用 Rollup/Cube/GroupSets 的方式基本一致。
下面我们来介绍如何基于 Window TVF Aggregate 进行多维分析,这里以 Groupset 举例,Cube 和 Rollup 的例子可以参考官网文档 Window TVF aggregation

1
2
3
4
5
6
7
8
9
10
11
SELECT 
CASE WHEN GROUPING_ID(task_id, theme_id) = 1 THEN task_id
ELSE 'ALL'
END AS task_id,
CASE WHEN GROUPING_ID(task_id, theme_id) = 2 THEN theme_id
ELSE 'ALL' END AS theme_id,
COUNT(DISTINCT user_id) as uv,
window_end
FROM TABLE(
TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, GROUPING SETS((), (task_id), (theme_id))

上面的 SQL 的结果和下面的 SQL 结果是一样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
SELECT
'ALL' AS task_id,
'ALL' AS theme_id,
COUNT(DISTINCT user_id) as uv,
window_end
FROM TABLE(
TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end
UNION ALL
SELECT
task_id,
'ALL' AS theme_id,
COUNT(DISTINCT user_id) as uv,
window_end
FROM TABLE(
TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, task_id
UNION ALL
SELECT
'ALL' AS task_id,
theme_id,
COUNT(DISTINCT user_id) as uv,
window_end
FROM TABLE(
TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, theme_id

使用 Group Window Aggregate 进行多维分析

原有的 Group Window Aggregate 的语法依然保留,但是会在未来的某个版本里会被废弃。因此推荐使用 Window Aggregate 来解决。
而且,社区版本并没有提供基于 Group Window Aggregate 的多维分析能力,笔者开发了一个提供基于 Group Window Aggrgeate 多维分析的 PR,有需要的老铁可以把这个 PR CP 到自己的 Flink 版本里使用。

1
2
3
4
5
6
7
8
9
10
11
12
SELECT
CASE WHEN GROUPING_ID(task_id, theme_id) = 1 THEN task_id
ELSE 'ALL'
END AS task_id,
CASE WHEN GROUPING_ID(task_id, theme_id) = 2 THEN theme_id
ELSE 'ALL' END AS theme_id,
COUNT(DISTINCT user_id) as uv,
TUMBLE_END(rowtime, INTERVAL '1' MINUTE)
FROM MyTable
GROUP BY
GROUPING SETS((), (task_id), (theme_id)),
TUMBLE(rowtime, INTERVAL '1' MINUTE)

上面的 SQL 的结果和下面的 SQL 结果是一样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
SELECT
'ALL' AS task_id,
'ALL' AS theme_id,
COUNT(DISTINCT user_id) as uv,
TUMBLE_END(rowtime, INTERVAL '1' MINUTE)
FROM MyTable
GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)
UNION ALL
SELECT
task_id,
'ALL' AS theme_id,
COUNT(DISTINCT user_id) as uv,
TUMBLE_END(rowtime, INTERVAL '1' MINUTE)
FROM MyTable
GROUP BY task_id, TUMBLE(rowtime, INTERVAL '1' MINUTE)
UNION ALL
SELECT
'ALL' AS task_id,
theme_id,
COUNT(DISTINCT user_id) as uv,
TUMBLE_END(rowtime, INTERVAL '1' MINUTE)
FROM MyTable
GROUP BY theme_id, TUMBLE(rowtime, INTERVAL '1' MINUTE)