Flink 1.9 实战:运用 SQL 读取 Kafka 并写入 MySQL
作者头像
  • 黑科技评测室
  • 2019-10-23 07:05:16 9

上周六在深圳分享了《Flink SQL 1.9.0 技术内情和最佳实践》,会后不少听众对最后演示环节中的 Demo 代码表示出了浓厚兴趣,迫切希望能够亲自尝试一下。因此,本文旨在分享这段代码,希望能对 Flink SQL 初学者有所帮助。完整分享内容可以查看 Meetup 的视频回顾:https://developer.aliyun.com/live/1416。

这段代码已开源至 GitHub:https://github.com/wuchong/flink-sql-submit

代码主要分为两部分:一是用于提交 SQL 文件的 SqlSubmit 实现;二是包含 SQL 示例、Kafka 启动脚本、测试数据集以及 Kafka 数据源生成器。

学习要点

通过本次实战,你可以掌握: 1. 如何使用 Blink Planner 2. 如何通过 DDL 创建 Kafka 源表和 MySQL 结果表 3. 如何运行一个从 Kafka 读取数据、计算 PV 和 UV,并将结果写入 MySQL 的任务 4. 如何设置调优参数并观察其对任务性能的影响

SqlSubmit 实现

起初,作者考虑使用 SQL Client 来实现整个演示过程,但由于 Flink 1.9 版本的 SQL CLI 不支持处理 CREATE TABLE 语句,作者最终决定自己编写一个简单的提交脚本。这样做不仅可以让听众了解如何通过 SQL 方式使用 Flink SQL,还能让他们了解编程方式的应用。

SqlSubmit 的主要功能是执行并提交一个 SQL 文件。其实现相对简单,通过正则表达式匹配每个语句块。如果是 CREATE TABLE 或 INSERT INTO 开头,则调用 tEnv.sqlUpdate() 方法;如果是 SET 开头,则将配置设置到 TableConfig 上。以下是核心代码片段:

```java EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings);

List sql = Files.readAllLines(path); List calls = SqlCommandParser.parse(sql);

for (SqlCommandCall call : calls) { switch (call.command) { case SET: String key = call.operands[0]; String value = call.operands[1]; tEnv.getConfig().getConfiguration().setString(key, value); break; case CREATETABLE: String ddl = call.operands[0]; tEnv.sqlUpdate(ddl); break; case INSERTINTO: String dml = call.operands[0]; tEnv.sqlUpdate(dml); break; default: throw new RuntimeException("Unsupported command: " + call.command); } }

tEnv.execute("SQL Job"); ```

使用 DDL 连接 Kafka 源表

在 flink-sql-submit 项目中,有一份测试数据集(来自阿里云天池公开数据集),位于 src/main/resources/user_behavior.log。数据以 JSON 格式编码,示例如下:

json {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

为了模拟真实的 Kafka 数据源,作者编写了一个 source-generator.sh 脚本(感兴趣的读者可以查看源码),该脚本会自动读取 user_behavior.log 文件的数据,并以默认每毫秒一条的速度发送到 Kafka 的 user_behavior 主题中。

有了数据源之后,我们可以通过 DDL 创建并连接 Kafka 主题(详见 src/main/resources/q1.sql):

sql CREATE TABLE user_log ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'user_behavior', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.0.key' = 'zookeeper.connect', 'connector.properties.0.value' = 'localhost:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'localhost:9092', 'update-mode' = 'append', 'format.type' = 'json', 'format.derive-schema' = 'true' )

使用 DDL 连接 MySQL 结果表

连接 MySQL 可以使用 Flink 提供的 JDBC 连接器。例如:

sql CREATE TABLE pvuv_sink ( dt VARCHAR, pv BIGINT, uv BIGINT ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', 'connector.table' = 'pvuv_sink', 'connector.username' = 'root', 'connector.password' = '123456', 'connector.write.flush.max-rows' = '1' )

PV 和 UV 计算

假设需求是计算每小时全网用户的访问量和独立用户数。很多人可能会想到使用滚动窗口来计算。这里介绍另一种方法,即通过 GROUP AGGREGATION 的方式。

sql INSERT INTO pvuv_sink SELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv FROM user_log GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

该查询使用内置函数 DATE_FORMAT 将时间戳归一化为“年月日小时”的格式,并根据此格式分组,从而计算每小时的访问量和独立用户数。这种方式具有较高的实时性,但也会产生较大的输入量。

实战演示

环境准备

本次实战需要安装一些必备服务,包括: - Flink 本地集群:用于运行 Flink SQL 任务 - Kafka 本地集群:作为数据源 - MySQL 数据库:作为结果表

Flink 本地集群安装
  1. 下载 Flink 1.9.0 安装包并解压:https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
  2. 下载以下依赖 JAR 包并复制到 flink-1.9.0/lib/ 目录下:
    • flink-sql-connector-kafka2.11-1.9.0.jar:http://central.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka2.11/1.9.0/flink-sql-connector-kafka2.11-1.9.0.jar
    • flink-json-1.9.0-sql-jar.jar:http://central.maven.org/maven2/org/apache/flink/flink-json/1.9.0/flink-json-1.9.0-sql-jar.jar
    • flink-jdbc2.11-1.9.0.jar:http://central.maven.org/maven2/org/apache/flink/flink-jdbc2.11/1.9.0/flink-jdbc2.11-1.9.0.jar
    • mysql-connector-java-5.1.48.jar:https://dev.mysql.com/downloads/connector/j/5.1.html
  3. 修改 flink-1.9.0/conf/flink-conf.yaml 文件中的 taskmanager.numberOfTaskSlots 参数为 10。
  4. flink-1.9.0 目录下执行 ./bin/start-cluster.sh 启动集群。

成功启动后,可在 http://localhost:8081 访问 Flink Web UI。

Kafka 本地集群安装
  1. 下载 Kafka 2.2.0 安装包并解压:https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
  2. 将安装路径添加到 flink-sql-submit 项目的 env.sh 文件中,例如 KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0
  3. flink-sql-submit 目录下执行 ./start-kafka.sh 启动 Kafka 集群。

成功启动后,在命令行执行 jps 命令,如果看到 Kafka 和 QuorumPeerMain 进程,则表明启动成功。

MySQL 安装

可以在 MySQL 官方页面下载并安装 MySQL:https://dev.mysql.com/downloads/mysql/

如果有 Docker 环境,也可以使用 Docker 安装:

bash $ docker pull mysql $ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

然后在 MySQL 中创建一个名为 flink-test 的数据库,并按上文所述的模式创建 pvuv_sink 表。

提交 SQL 任务

  1. flink-sql-submit 目录下执行 ./source-generator.sh,自动创建 user_behavior 主题,并实时向其中灌入数据。
  2. flink-sql-submit 目录下执行 ./run.sh q1 提交任务,成功后可在 Web UI 中查看拓扑图。
  3. 在 MySQL 客户端中,可以看到每小时的 PV 和 UV 值不断变化。

结论

本文介绍了如何搭建基础集群环境,并使用 SqlSubmit 提交纯 SQL 任务来学习如何连接外部系统。flink-sql-submit/src/main/resources/q1.sql 文件中还有一些注释掉的调优参数,感兴趣的读者可以启用这些参数,观察它们对任务性能的影响。

关于这些调优参数的详细解释,可以参考我在深圳 Meetup 上的分享《Flink SQL 1.9.0 技术内情和最佳实践》。

    本文来源:图灵汇
责任编辑: : 黑科技评测室
声明:本文系图灵汇原创稿件,版权属图灵汇所有,未经授权不得转载,已经协议授权的媒体下载使用时须注明"稿件来源:图灵汇",违者将依法追究责任。
    分享
写入实战读取运用KafkaMySQLFlinkSQL1.9
    下一篇