前言
本文是 Flink SQL 系列中的一篇,更多文章请关注Flink SQL 系列文档。
内容全部是原创,如有错误,欢迎铁汁们指出。另外,未经同意,不得转载。
如何在 UDF 里读取自定义的参数
第一步:定义参数 pipeline.global-job-parameters
在FLINK 作业的 YAML 文件里加入参数: pipeline.global-job-parameters
1
| pipeline.global-job-parameters: k1:v1,k2:v2
|
第二部:UDF 里定义 open 函数
假设用户在作业里定义了:
1
| pipeline.global-job-parameters: int.value:1,fail-for-cached-file:true
|
Function 里在 open 方法获取自定义的参数值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @SerialVersionUID(1L) class RichFunc1 extends ScalarFunction { var added: Int = Int.MaxValue
override def open(context: FunctionContext): Unit = { added = context.getJobParameter("int.value", "0").toInt if (context.getJobParameter("fail-for-cached-file", "false").toBoolean) { context.getCachedFile("FAIL") } }
def eval(index: Int): Int = { index + added }
override def close(): Unit = { added = Int.MaxValue } }
|