0%

如何在 UDF 里读取自定义的参数值

前言

本文是 Flink SQL 系列中的一篇,更多文章请关注Flink SQL 系列文档
内容全部是原创,如有错误,欢迎铁汁们指出。另外,未经同意,不得转载。

如何在 UDF 里读取自定义的参数

第一步:定义参数 pipeline.global-job-parameters

在FLINK 作业的 YAML 文件里加入参数: pipeline.global-job-parameters

1
pipeline.global-job-parameters: k1:v1,k2:v2

第二部:UDF 里定义 open 函数

假设用户在作业里定义了:

1
pipeline.global-job-parameters: int.value:1,fail-for-cached-file:true

Function 里在 open 方法获取自定义的参数值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SerialVersionUID(1L)
class RichFunc1 extends ScalarFunction {
var added: Int = Int.MaxValue

override def open(context: FunctionContext): Unit = {
added = context.getJobParameter("int.value", "0").toInt
if (context.getJobParameter("fail-for-cached-file", "false").toBoolean) {
context.getCachedFile("FAIL")
}
}

def eval(index: Int): Int = {
index + added
}

override def close(): Unit = {
added = Int.MaxValue
}
}