FLink Function DDL支持(FLink FLIP-79翻译、总结停顿)
作者头像
  • 宝强哥
  • 2019-12-04 09:36:41 6

一、目标

FLIP-79 的主要目标是支持 function DDL,确保 SQL 语法和语义的准确性,并且能够从外部库(如 jar 或文件)注册 UDF。Flink DDL 的初始讨论由 Chen Shuyi 等人在设计文档中提出,主要涉及 Table(表)、Type(类型)和 View(视图)。FLIP-69 对 DDL 进行了扩展,涵盖了 catalog、database 和 function 方面的内容。FLIP-79 是 FLIP-69 的一部分,旨在通过社区讨论,明确现有工作的设计,并确保不同语言的 UDF 定义一致。

二、改进方案

外部需求

在深入讨论 DDL SQL 之前,我们需要讨论 Flink runtime function 的主要需求:

  1. 外部库注册

    • 该需求源自 Hive 集成,增强了 Flink 批处理功能。Hive SQL 支持类似以下语法: sql CREATE FUNCTION addfunc AS 'com.example.hiveserver2.udf.add' USING JAR 'hdfs:///path/to/jar'
  2. 语言差异

    • 由于 Scala 字节码的特殊性,从 Scala 函数中提取类型信息存在限制。同时,支持 Python UDF 是正在进行的工作之一。
    • 因此,SQL 语法需要支持多种语言,例如 MySQL 创建函数语法: sql CREATE FUNCTION hello (s CHAR(20)) RETURNS CHAR(50) DETERMINISTIC RETURN CONCAT('Hello, ', s, '!') LANGUAGE SQL
  3. 临时函数支持

    • FLIP-57 建议将临时函数和非临时函数分开处理,因为临时函数只注册在当前会话。因此,需要一个标志来区分函数的解析顺序: sql CREATE TEMPORARY FUNCTION addfunc AS 'com.example.hiveserver2.udf.add' USING JAR 'hdfs:///path/to/jar'
  4. 函数限定符

    • 函数限定符(.)的作用范围包括特定的 catalog 和 database。所有 function DDL 都需要支持以下路径格式: sql CREATE FUNCTION catalog1.db1.addfunc AS 'com.example.hiveserver2.udf.add' LANGUAGE JVM

三、Function DDL 语法

我们提出了以下 Function DDL 语法:

创建函数语句 sql CREATE [TEMPORARY|SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS identifier [LANGUAGE JVM|PYTHON] [USING JAR|FILE|ARCHIVE 'resource_path' [, USING JAR|FILE|ARCHIVE 'path']*];

删除函数语句 sql DROP [TEMPORARY|SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.] function_name;

修改函数语句 sql ALTER [TEMPORARY|SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.] function_name RENAME TO new_name;

显示函数语句 sql SHOW FUNCTION [catalog_name.][db_name]

四、应用场景

我们希望通过 function DDL 支持尽可能多的应用场景。以下是一些明确可行的场景:

  1. 从类路径加载 UDF sql CREATE TEMPORARY FUNCTION catalog1.db1.func1 AS 'com.xxx.udf.func1UDF' LANGUAGE 'JVM' DROP FUNCTION catalog1.db1.geofence

  2. 从远程资源加载 UDF sql CREATE FUNCTION catalog1.db1.func2 AS 'com.xxx.udf.func2UDF' LANGUAGE JVM USING 'http://artifactory.uber.internal:4587/artifactory/libs-snapshot-local/com/xxx/xxx/xxx-udf/1.0.1-SNAPSHOT/xxx-udf-1.0.1-20180502.011548-12.jar'

  3. 从远程资源加载 Python UDF sql CREATE FUNCTION catalog1.db1.func3 AS 'com.xxx.udf.func3UDF' LANGUAGE 'PYTHON' USING 'http://external.resources/flink-udf.py'

五、新增或更改的公共接口

首先,需要在 CatalogFunction 接口中添加更多函数:

java public interface CatalogFunction { String getClassName(); Enum getLanguage(); // TODO Map<String, String> getProperties(); CatalogFunction copy(); Optional<List<String>> getResourcePaths(); // TODO Optional<String> getDescription(); Optional<String> getDetailedDescription(); }

其次,为了支持从外部库加载 UDF,需要在 ExecutionEnvironment 中添加一个注册外部库的函数:

java public void registerUserJarFile(String jarFile) { Path path = new Path(jarFile); this.userJars.add(path); }

在作业提交前,注册的用户 jar 将被添加到 StreamGraph 中,然后加入到 JobGraphGenerator 的 JobGraph 中。

六、资源隔离

为了考虑不同会话的类加载隔离性,可以在 StreamExecutionEnvironment 中添加一个新的接口:

java public void registerUserJarFiles(String classloaderName, String... jarFiles) { // ... }

这个接口可以使用特定的 key(即 classloaderName)注册一组 jar 文件。在外部,它使用与 registerCachedFile 类似的路径,后者使用 Flink 的 Blob 服务器将 jar 文件分发到运行时。

此外,在 RuntimeContext 中添加一个新接口,使用在 name 下注册的 jar 文件集,创建并缓存一个自定义UserCodeClassLoader。

java public ClassLoader getClassLoaderByName(String classloaderName) { // ... }

在 UDF 函数的代码生成过程中,它将把多个 jar 文件加载到自定义 ClassLoader 中,并使用反射方式调用该函数。

此外,在 RuntimeContext 实现中,我们将保留自定义 ClassLoader 的缓存,避免多次加载同一个库。

七、实施计划

Flink 1.10 版本

  1. 在 flink-sql-parser 中添加 function 相关语法。
  2. 在 flink-sql-parser 模块中定义 SqlCreateFunction 和 SqlDropFunction。
  3. 桥接 DDL 和 TableEnvironment,将 function 注册到 TableEnvironment 中。

Flink 1.10 后续版本

  1. 支持从 Java 外部资源加载 UDF(即支持 USING JAR 语句)。
  2. 添加 Scala function 相关的支持。

FLIP-65 作为 Table API UDF 的类型推断接口,阻碍了向 TableEnvImpl 中添加 Scala function。因此,上述 1、2、3 目前仅支持 Java 语言。

一旦 FLIP-65 完成,可以继续 Scala function DDL 的工作,并将相应的函数注册到 TableEnvImpl 中。

八、参考文献

  1. Flink SQL DDL 设计
  2. FLIP-69 Flink SQL DDL 强化
  3. FLIP-64 支持表模块中的临时对象
  4. FLIP-65 新类型推理
  5. FLIP-78 Flink Python UDF 环境和依赖项管理

九、社区问题和 PR 进展

  1. 支持创建、删除、修改函数语法

    • 已提交 PR GitHub Pull Request #9689,支持以下语句: sql CREATE FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS class_name; DROP FUNCTION [IF EXISTS] [catalog_name.db_name.]function_name; ALTER FUNCTION [IF EXISTS] [catalog_name.db_name.]function_name RENAME TO new_name;
  2. 为 function DDL 添加 "USING JAR/FILE/ARCHIVE" 功能

    • 子 issue FLINK-14055 正在处理。
  3. 在 {Stream}ExecutionEnvironment 中注册用户 jar 文件

    • 已提交 PR GitHub Pull Request #9841,投票中。
    本文来源:图灵汇
责任编辑: : 宝强哥
声明:本文系图灵汇原创稿件,版权属图灵汇所有,未经授权不得转载,已经协议授权的媒体下载使用时须注明"稿件来源:图灵汇",违者将依法追究责任。
    分享
FLink停顿Function翻译总结支持FLIPDDL79
    下一篇