FLIP-79 的主要目标是支持 function DDL,确保 SQL 语法和语义的准确性,并且能够从外部库(如 jar 或文件)注册 UDF。Flink DDL 的初始讨论由 Chen Shuyi 等人在设计文档中提出,主要涉及 Table(表)、Type(类型)和 View(视图)。FLIP-69 对 DDL 进行了扩展,涵盖了 catalog、database 和 function 方面的内容。FLIP-79 是 FLIP-69 的一部分,旨在通过社区讨论,明确现有工作的设计,并确保不同语言的 UDF 定义一致。
在深入讨论 DDL SQL 之前,我们需要讨论 Flink runtime function 的主要需求:
外部库注册
sql
CREATE FUNCTION addfunc AS 'com.example.hiveserver2.udf.add' USING JAR 'hdfs:///path/to/jar'
语言差异
sql
CREATE FUNCTION hello (s CHAR(20)) RETURNS CHAR(50) DETERMINISTIC RETURN CONCAT('Hello, ', s, '!') LANGUAGE SQL
临时函数支持
sql
CREATE TEMPORARY FUNCTION addfunc AS 'com.example.hiveserver2.udf.add' USING JAR 'hdfs:///path/to/jar'
函数限定符
sql
CREATE FUNCTION catalog1.db1.addfunc AS 'com.example.hiveserver2.udf.add' LANGUAGE JVM
我们提出了以下 Function DDL 语法:
创建函数语句
sql
CREATE [TEMPORARY|SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS identifier [LANGUAGE JVM|PYTHON] [USING JAR|FILE|ARCHIVE 'resource_path' [, USING JAR|FILE|ARCHIVE 'path']*];
删除函数语句
sql
DROP [TEMPORARY|SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.] function_name;
修改函数语句
sql
ALTER [TEMPORARY|SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.] function_name RENAME TO new_name;
显示函数语句
sql
SHOW FUNCTION [catalog_name.][db_name]
我们希望通过 function DDL 支持尽可能多的应用场景。以下是一些明确可行的场景:
从类路径加载 UDF
sql
CREATE TEMPORARY FUNCTION catalog1.db1.func1 AS 'com.xxx.udf.func1UDF' LANGUAGE 'JVM'
DROP FUNCTION catalog1.db1.geofence
从远程资源加载 UDF
sql
CREATE FUNCTION catalog1.db1.func2 AS 'com.xxx.udf.func2UDF' LANGUAGE JVM USING 'http://artifactory.uber.internal:4587/artifactory/libs-snapshot-local/com/xxx/xxx/xxx-udf/1.0.1-SNAPSHOT/xxx-udf-1.0.1-20180502.011548-12.jar'
从远程资源加载 Python UDF
sql
CREATE FUNCTION catalog1.db1.func3 AS 'com.xxx.udf.func3UDF' LANGUAGE 'PYTHON' USING 'http://external.resources/flink-udf.py'
首先,需要在 CatalogFunction 接口中添加更多函数:
java
public interface CatalogFunction {
String getClassName();
Enum getLanguage(); // TODO
Map<String, String> getProperties();
CatalogFunction copy();
Optional<List<String>> getResourcePaths(); // TODO
Optional<String> getDescription();
Optional<String> getDetailedDescription();
}
其次,为了支持从外部库加载 UDF,需要在 ExecutionEnvironment 中添加一个注册外部库的函数:
java
public void registerUserJarFile(String jarFile) {
Path path = new Path(jarFile);
this.userJars.add(path);
}
在作业提交前,注册的用户 jar 将被添加到 StreamGraph 中,然后加入到 JobGraphGenerator 的 JobGraph 中。
为了考虑不同会话的类加载隔离性,可以在 StreamExecutionEnvironment 中添加一个新的接口:
java
public void registerUserJarFiles(String classloaderName, String... jarFiles) {
// ...
}
这个接口可以使用特定的 key(即 classloaderName)注册一组 jar 文件。在外部,它使用与 registerCachedFile 类似的路径,后者使用 Flink 的 Blob 服务器将 jar 文件分发到运行时。
此外,在 RuntimeContext 中添加一个新接口,使用在 name 下注册的 jar 文件集,创建并缓存一个自定义UserCodeClassLoader。
java
public ClassLoader getClassLoaderByName(String classloaderName) {
// ...
}
在 UDF 函数的代码生成过程中,它将把多个 jar 文件加载到自定义 ClassLoader 中,并使用反射方式调用该函数。
此外,在 RuntimeContext 实现中,我们将保留自定义 ClassLoader 的缓存,避免多次加载同一个库。
Flink 1.10 版本
Flink 1.10 后续版本
FLIP-65 作为 Table API UDF 的类型推断接口,阻碍了向 TableEnvImpl 中添加 Scala function。因此,上述 1、2、3 目前仅支持 Java 语言。
一旦 FLIP-65 完成,可以继续 Scala function DDL 的工作,并将相应的函数注册到 TableEnvImpl 中。
支持创建、删除、修改函数语法
sql
CREATE FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS class_name;
DROP FUNCTION [IF EXISTS] [catalog_name.db_name.]function_name;
ALTER FUNCTION [IF EXISTS] [catalog_name.db_name.]function_name RENAME TO new_name;
为 function DDL 添加 "USING JAR/FILE/ARCHIVE" 功能
在 {Stream}ExecutionEnvironment 中注册用户 jar 文件