上周六在深圳分享了《Flink SQL 1.9.0 技术内情和最佳实践》,会后不少听众对最后演示环节中的 Demo 代码表示出了浓厚兴趣,迫切希望能够亲自尝试一下。因此,本文旨在分享这段代码,希望能对 Flink SQL 初学者有所帮助。完整分享内容可以查看 Meetup 的视频回顾:https://developer.aliyun.com/live/1416。
这段代码已开源至 GitHub:https://github.com/wuchong/flink-sql-submit
代码主要分为两部分:一是用于提交 SQL 文件的 SqlSubmit 实现;二是包含 SQL 示例、Kafka 启动脚本、测试数据集以及 Kafka 数据源生成器。
通过本次实战,你可以掌握: 1. 如何使用 Blink Planner 2. 如何通过 DDL 创建 Kafka 源表和 MySQL 结果表 3. 如何运行一个从 Kafka 读取数据、计算 PV 和 UV,并将结果写入 MySQL 的任务 4. 如何设置调优参数并观察其对任务性能的影响
起初,作者考虑使用 SQL Client 来实现整个演示过程,但由于 Flink 1.9 版本的 SQL CLI 不支持处理 CREATE TABLE 语句,作者最终决定自己编写一个简单的提交脚本。这样做不仅可以让听众了解如何通过 SQL 方式使用 Flink SQL,还能让他们了解编程方式的应用。
SqlSubmit 的主要功能是执行并提交一个 SQL 文件。其实现相对简单,通过正则表达式匹配每个语句块。如果是 CREATE TABLE 或 INSERT INTO 开头,则调用 tEnv.sqlUpdate() 方法;如果是 SET 开头,则将配置设置到 TableConfig 上。以下是核心代码片段:
```java EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings);
List
for (SqlCommandCall call : calls) { switch (call.command) { case SET: String key = call.operands[0]; String value = call.operands[1]; tEnv.getConfig().getConfiguration().setString(key, value); break; case CREATETABLE: String ddl = call.operands[0]; tEnv.sqlUpdate(ddl); break; case INSERTINTO: String dml = call.operands[0]; tEnv.sqlUpdate(dml); break; default: throw new RuntimeException("Unsupported command: " + call.command); } }
tEnv.execute("SQL Job"); ```
在 flink-sql-submit 项目中,有一份测试数据集(来自阿里云天池公开数据集),位于 src/main/resources/user_behavior.log
。数据以 JSON 格式编码,示例如下:
json
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
为了模拟真实的 Kafka 数据源,作者编写了一个 source-generator.sh
脚本(感兴趣的读者可以查看源码),该脚本会自动读取 user_behavior.log
文件的数据,并以默认每毫秒一条的速度发送到 Kafka 的 user_behavior
主题中。
有了数据源之后,我们可以通过 DDL 创建并连接 Kafka 主题(详见 src/main/resources/q1.sql
):
sql
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
)
连接 MySQL 可以使用 Flink 提供的 JDBC 连接器。例如:
sql
CREATE TABLE pvuv_sink (
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
'connector.table' = 'pvuv_sink',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.max-rows' = '1'
)
假设需求是计算每小时全网用户的访问量和独立用户数。很多人可能会想到使用滚动窗口来计算。这里介绍另一种方法,即通过 GROUP AGGREGATION 的方式。
sql
INSERT INTO pvuv_sink
SELECT
DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
COUNT(*) AS pv,
COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')
该查询使用内置函数 DATE_FORMAT 将时间戳归一化为“年月日小时”的格式,并根据此格式分组,从而计算每小时的访问量和独立用户数。这种方式具有较高的实时性,但也会产生较大的输入量。
本次实战需要安装一些必备服务,包括: - Flink 本地集群:用于运行 Flink SQL 任务 - Kafka 本地集群:作为数据源 - MySQL 数据库:作为结果表
flink-1.9.0/lib/
目录下:
flink-1.9.0/conf/flink-conf.yaml
文件中的 taskmanager.numberOfTaskSlots
参数为 10。flink-1.9.0
目录下执行 ./bin/start-cluster.sh
启动集群。成功启动后,可在 http://localhost:8081
访问 Flink Web UI。
flink-sql-submit
项目的 env.sh
文件中,例如 KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0
flink-sql-submit
目录下执行 ./start-kafka.sh
启动 Kafka 集群。成功启动后,在命令行执行 jps
命令,如果看到 Kafka 和 QuorumPeerMain 进程,则表明启动成功。
可以在 MySQL 官方页面下载并安装 MySQL:https://dev.mysql.com/downloads/mysql/
如果有 Docker 环境,也可以使用 Docker 安装:
bash
$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql
然后在 MySQL 中创建一个名为 flink-test
的数据库,并按上文所述的模式创建 pvuv_sink
表。
flink-sql-submit
目录下执行 ./source-generator.sh
,自动创建 user_behavior
主题,并实时向其中灌入数据。flink-sql-submit
目录下执行 ./run.sh q1
提交任务,成功后可在 Web UI 中查看拓扑图。本文介绍了如何搭建基础集群环境,并使用 SqlSubmit 提交纯 SQL 任务来学习如何连接外部系统。flink-sql-submit/src/main/resources/q1.sql
文件中还有一些注释掉的调优参数,感兴趣的读者可以启用这些参数,观察它们对任务性能的影响。
关于这些调优参数的详细解释,可以参考我在深圳 Meetup 上的分享《Flink SQL 1.9.0 技术内情和最佳实践》。