0%

如何解决无限流聚合上 Distinct Key 的倾斜问题

前言

本文是 Flink SQL 系列中的一篇,更多文章请关注Flink SQL 系列文档
内容全部是原创,如有错误,欢迎铁汁们指出。另外,未经同意,不得转载。

背景

1
SELECT day, COUNT(DISTINCT user_id) AS UV FROM T GROUP BY day

对于计算 UV 的业务,比如下面的 SQL。COUNT DISTINCT 在 Local 聚合时,如果 DISINCT KEY 的去重率不高,Global 节点可能仍然存在热点。
如何解决该问题

解决方案

方案1:手动改写 SQL

第一种方式是手动改写SQL变成两层聚合(增加按照 distinct key 取模的打散层),比如改写成如下 SQL。

1
2
3
4
5
6
7
SELECT day, SUM(part_uv) total FROM (

SELECT day, COUNT(DISTINCT user_id) as part_uv FROM T

GROUP BY day, MOD(user_id, 1024)

) GROUP BY day

方案1的缺点

这种做法有两个缺点:

  • 用户需要手动改写SQL,增加了开发难度。

  • 很难处理有多个 COUNT DISTINCT 聚合函数的场景。比如同时计算 COUNT (DISTINCT user_id), COUNT DISTINCT(buyer_id) 的场景。

方案2: 引擎提供的自动打散策略

使用 Flink 框架提供的自动打散策略,即 Partial Final 优化,无需改写 SQL。更多关于引擎层 Partial Final 自动优化请参考社区的文章:拆分 distinct 聚合

相关配置参数如下:

1
2
3
4
5
6
7
# 显示打开自动打散策略,默认是 false

table.optimizer.distinct-agg.split.enabled : true

# 默认值1024,可以根据业务数据量和热点情况,设置这个取模值

table.optimizer.distinct-agg.split.bucket-num : 1024

如何判断是否生效:

  • Flink Web UI 观察最终生成拓扑图的节点名中是否包含 Expand 节点

  • 原来一层的聚合变成两层的聚合

  • 里层的聚合 Key 除了SQL的 GroupBy Key,还多了 Distinct Key

方案2的缺点

这种做法可以灵活的处理多个 COUNT DISTINCT 的需求,但是有以下缺点:

聚合里只可以包含如下聚合函数,暂不支持其他聚合函数的自动拆散,COUNT/AVG/MIN/MAX/SUM/LISTAGG/FIRST_VALUE/LAST_VALUE。