Flink 日志包括系统日志以及用户代码中的日志输出,这些信息可以在 Flink 的 Web 页面上查看。当前的部署模式主要是 YARN,因此在 YARN 的界面上也可以查看到这些日志。在开发或测试环境中,日志量相对较小,便于查看。然而,在生产环境中,由于任务需要 24/7 运行,日志量会显著增加。这会导致在 Flink Web 页面上查看日志时出现卡顿,难以通过日志排查问题。因此,有必要将日志发送到外部的搜索系统中,以便更方便地进行日志搜索。
对于开源的日志收集方案,最常用的是 Elasticsearch + Logstash + Kibana(即 ELK)。该方案通过 Logstash 收集日志并发送到 Elasticsearch,然后通过 Kibana 查询 Elasticsearch 中的数据。主要需要考虑的问题是如何让 Logstash 收集 Flink 的日志。通常需要在每个 NodeManager 节点上安装 Logstash 来收集日志,但由于任务可能会重启,日志目录也会随之变化,这会占用集群资源。此外,当集群扩容时也需要在新节点上安装 Logstash。鉴于这些因素,我们采用了另一种解决方案:通过 Log4j 将日志发送到 Kafka,再由 Logstash 消费 Kafka 中的数据。
Flink 提供了 Log4j 和 Logback 两种日志打印方式,这里选择较为熟悉的 Log4j。Log4j 可以通过成熟的插件 KafkaLog4jAppender 将日志发送到 Kafka,但默认的日志格式并不是 Elasticsearch 可以解析的 JSON 格式。此外,Flink 的日志中并不包含任务对应的 applicationId 信息,这在 Kibana 中查看会比较麻烦。因此,我们采用自定义的 Log4j Appender 方式,将数据发送到 Kafka,并定义符合标准的数据格式,同时获取到任务的 applicationId。
在 Flink 的配置目录 conf 下,有一个 log4j.properties 文件,负责日志的相关配置。默认情况下,该文件中有一个 ${log.file} 变量,它代表日志路径,路径中包含了 applicationId。我们可以将这个变量传递给自定义的 Appender,然后解析出 applicationId。以下是具体实现步骤:
在 log4j.properties 文件中进行相应的配置,确保日志能够按照预期格式发送到 Kafka。
完成以上配置后,将自定义的 Log4j Appender 打包并放置在 Flink 的 lib 目录下,即可在 Kibana 中查看到日志信息。