如何在 Flink 1.9 中运用 Hive?
作者头像
  • 前沿君
  • 2019-10-23 07:15:11 6

Flink与Hive集成介绍

在大数据领域,SQL引擎是不可或缺的一部分。为了提升Flink的功能,尤其是增强其在批处理方面的能力,我们决定强化FlinkSQL的功能,使用户可以通过Flink实现更多任务。Flink 1.9.0版本开始引入了与Apache Hive集成的新功能,使得用户能够通过Flink访问和操作Hive中的元数据及数据表。

设计架构

与Hive集成主要包括对元数据和实际数据表的访问,以下是该项目的架构设计:

  1. 元数据

    为了更好地访问外部系统的元数据,Flink提供了一个新的Catalog接口来替代原有的ExternalCatalog。这个新Catalog接口不仅支持多种元数据对象,还能在同一用户会话中管理多个外部系统的元数据。此外,它支持插件化,允许用户自定义实现。目前有两个Catalog实现:GenericInMemoryCatalog和HiveCatalog。前者将所有元数据保存在内存中,后者则与Hive Metastore连接,实现元数据的持久化。

  2. 表数据

    我们通过Hive Data Connector来读写Hive中的表数据。这一组件尽可能复用了Hive的Input/Output Format和SerDe等类,以确保与Hive的高度兼容性。目前支持的Hive版本为2.3.4和1.2.1。

项目进展

在Flink 1.9.0版本中,Hive集成功能作为试验性功能发布。以下是一些已经实现的功能: - 支持简单的DDL命令,例如show databasesshow tablesdescribe table等。 - 可以通过Catalog API修改Hive元数据,如创建和删除表。 - 支持读取Hive中的数据,包括分区表和非分区表。 - 支持写入非分区表。 - 支持多种文件格式,如Text、ORC、Parquet、SequenceFile等。 - 支持调用Hive中的用户自定义函数(UDF)。

然而,还有一些功能尚待完善,包括不支持INSERT OVERWRITE、写入分区表、ACID表、Bucket表和视图等。

如何使用

  1. 添加依赖

    使用Flink与Hive集成功能需要先添加相关依赖。如果是通过SQL客户端使用,需将依赖的jar文件添加到Flink的lib目录;如果是通过Table API使用,则需要将依赖添加到项目的构建配置文件中(如pom.xml)。目前支持的Hive版本包括2.3.4和1.2.1。

  2. 配置HiveCatalog

    若要与Hive交互,必须使用HiveCatalog。在SQL客户端中,需要在sql-client-defaults.yaml文件中指定所需的Catalog。例如: ```yaml catalogs:

    • name: myhive type: hive hive-conf-dir: /path/to/hiveconfdir hive-version: 2.3.4 ```

    在Table API中,可以创建并注册HiveCatalog到TableEnvironment中: java String name = "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/path/to/hive_conf_dir"; String version = "2.3.4"; TableEnvironment tableEnv = ...; // 创建TableEnvironment HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tableEnv.registerCatalog(name, hiveCatalog); tableEnv.useCatalog(name);

  3. 读写Hive表

    配置好HiveCatalog后,即可通过SQL客户端或Table API读写Hive中的表。例如,使用SQL客户端读取表: sql Flink SQL> describe src; root |-- key: STRING |-- value: STRING Flink SQL> select * from src; Flink SQL> insert into src values ('newKey', 'newVal');

    使用Table API读写表: java TableEnvironment tableEnv = ...; // 创建TableEnvironment tableEnv.registerCatalog("myhive", hiveCatalog); tableEnv.useCatalog("myhive"); Table src = tableEnv.sqlQuery("select * from src"); tableEnv.sqlUpdate("insert into src values ('newKey', 'newVal')"); tableEnv.execute("insert into src");

  4. 支持不同的Hive版本

    Flink 1.9.0版本中支持Hive 2.3.4和1.2.1版本。如果使用的Hive版本与支持的版本不符,可以指定一个支持的版本来试用与Hive集成的功能。

  5. 执行模式与Planner的选择

    在Flink 1.9.0版本中,Hive的TableSink仅能在批处理模式下工作。建议使用新的blink Planner,因为它功能更为全面。例如,在SQL客户端中,可以在sql-client-defaults.yaml文件中指定执行模式和Planner: yaml execution: planner: blink type: batch

    对应的Table API写法如下: java EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings);

后期规划

未来版本中,我们将进一步完善与Hive集成的功能,目标是在1.10.0版本中实现生产就绪。具体计划包括支持更完整的数据类型、写入分区表、支持INSERT OVERWRITE、支持视图、更完整的DDL和DML支持、支持Hive的TableSink在流处理模式下工作、测试并支持更多Hive版本以及优化Bucket表功能。

欢迎试用Flink 1.9版本中的Hive功能,如有任何问题可通过钉钉、邮件列表等方式联系我们。

    本文来源:图灵汇
责任编辑: : 前沿君
声明:本文系图灵汇原创稿件,版权属图灵汇所有,未经授权不得转载,已经协议授权的媒体下载使用时须注明"稿件来源:图灵汇",违者将依法追究责任。
    分享
何在运用FlinkHive1.9
    下一篇