在大数据领域,SQL引擎是不可或缺的一部分。为了提升Flink的功能,尤其是增强其在批处理方面的能力,我们决定强化FlinkSQL的功能,使用户可以通过Flink实现更多任务。Flink 1.9.0版本开始引入了与Apache Hive集成的新功能,使得用户能够通过Flink访问和操作Hive中的元数据及数据表。
与Hive集成主要包括对元数据和实际数据表的访问,以下是该项目的架构设计:
元数据
为了更好地访问外部系统的元数据,Flink提供了一个新的Catalog接口来替代原有的ExternalCatalog。这个新Catalog接口不仅支持多种元数据对象,还能在同一用户会话中管理多个外部系统的元数据。此外,它支持插件化,允许用户自定义实现。目前有两个Catalog实现:GenericInMemoryCatalog和HiveCatalog。前者将所有元数据保存在内存中,后者则与Hive Metastore连接,实现元数据的持久化。
表数据
我们通过Hive Data Connector来读写Hive中的表数据。这一组件尽可能复用了Hive的Input/Output Format和SerDe等类,以确保与Hive的高度兼容性。目前支持的Hive版本为2.3.4和1.2.1。
在Flink 1.9.0版本中,Hive集成功能作为试验性功能发布。以下是一些已经实现的功能:
- 支持简单的DDL命令,例如show databases
、show tables
、describe table
等。
- 可以通过Catalog API修改Hive元数据,如创建和删除表。
- 支持读取Hive中的数据,包括分区表和非分区表。
- 支持写入非分区表。
- 支持多种文件格式,如Text、ORC、Parquet、SequenceFile等。
- 支持调用Hive中的用户自定义函数(UDF)。
然而,还有一些功能尚待完善,包括不支持INSERT OVERWRITE、写入分区表、ACID表、Bucket表和视图等。
添加依赖
使用Flink与Hive集成功能需要先添加相关依赖。如果是通过SQL客户端使用,需将依赖的jar文件添加到Flink的lib目录;如果是通过Table API使用,则需要将依赖添加到项目的构建配置文件中(如pom.xml)。目前支持的Hive版本包括2.3.4和1.2.1。
配置HiveCatalog
若要与Hive交互,必须使用HiveCatalog。在SQL客户端中,需要在sql-client-defaults.yaml
文件中指定所需的Catalog。例如:
```yaml
catalogs:
在Table API中,可以创建并注册HiveCatalog到TableEnvironment中:
java
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive_conf_dir";
String version = "2.3.4";
TableEnvironment tableEnv = ...; // 创建TableEnvironment
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog(name, hiveCatalog);
tableEnv.useCatalog(name);
读写Hive表
配置好HiveCatalog后,即可通过SQL客户端或Table API读写Hive中的表。例如,使用SQL客户端读取表:
sql
Flink SQL> describe src;
root
|-- key: STRING
|-- value: STRING
Flink SQL> select * from src;
Flink SQL> insert into src values ('newKey', 'newVal');
使用Table API读写表:
java
TableEnvironment tableEnv = ...; // 创建TableEnvironment
tableEnv.registerCatalog("myhive", hiveCatalog);
tableEnv.useCatalog("myhive");
Table src = tableEnv.sqlQuery("select * from src");
tableEnv.sqlUpdate("insert into src values ('newKey', 'newVal')");
tableEnv.execute("insert into src");
支持不同的Hive版本
Flink 1.9.0版本中支持Hive 2.3.4和1.2.1版本。如果使用的Hive版本与支持的版本不符,可以指定一个支持的版本来试用与Hive集成的功能。
执行模式与Planner的选择
在Flink 1.9.0版本中,Hive的TableSink仅能在批处理模式下工作。建议使用新的blink Planner,因为它功能更为全面。例如,在SQL客户端中,可以在sql-client-defaults.yaml
文件中指定执行模式和Planner:
yaml
execution:
planner: blink
type: batch
对应的Table API写法如下:
java
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
未来版本中,我们将进一步完善与Hive集成的功能,目标是在1.10.0版本中实现生产就绪。具体计划包括支持更完整的数据类型、写入分区表、支持INSERT OVERWRITE、支持视图、更完整的DDL和DML支持、支持Hive的TableSink在流处理模式下工作、测试并支持更多Hive版本以及优化Bucket表功能。
欢迎试用Flink 1.9版本中的Hive功能,如有任何问题可通过钉钉、邮件列表等方式联系我们。