Flink 义务运转日志搜集方案,这方案很直接
作者头像
  • 韶华杂谈
  • 2019-11-04 10:01:36 2

场景描述:Flink 日志管理

Flink 日志包括系统日志以及用户代码中的日志输出,这些信息可以在 Flink 的 Web 页面上查看。当前的部署模式主要是 YARN,因此在 YARN 的界面上也可以查看到这些日志。在开发或测试环境中,日志量相对较小,便于查看。然而,在生产环境中,由于任务需要 24/7 运行,日志量会显著增加。这会导致在 Flink Web 页面上查看日志时出现卡顿,难以通过日志排查问题。因此,有必要将日志发送到外部的搜索系统中,以便更方便地进行日志搜索。

关键词:Flink 日志收集

方案概述

对于开源的日志收集方案,最常用的是 Elasticsearch + Logstash + Kibana(即 ELK)。该方案通过 Logstash 收集日志并发送到 Elasticsearch,然后通过 Kibana 查询 Elasticsearch 中的数据。主要需要考虑的问题是如何让 Logstash 收集 Flink 的日志。通常需要在每个 NodeManager 节点上安装 Logstash 来收集日志,但由于任务可能会重启,日志目录也会随之变化,这会占用集群资源。此外,当集群扩容时也需要在新节点上安装 Logstash。鉴于这些因素,我们采用了另一种解决方案:通过 Log4j 将日志发送到 Kafka,再由 Logstash 消费 Kafka 中的数据。

Flink 日志打印

Flink 提供了 Log4j 和 Logback 两种日志打印方式,这里选择较为熟悉的 Log4j。Log4j 可以通过成熟的插件 KafkaLog4jAppender 将日志发送到 Kafka,但默认的日志格式并不是 Elasticsearch 可以解析的 JSON 格式。此外,Flink 的日志中并不包含任务对应的 applicationId 信息,这在 Kibana 中查看会比较麻烦。因此,我们采用自定义的 Log4j Appender 方式,将数据发送到 Kafka,并定义符合标准的数据格式,同时获取到任务的 applicationId。

在 Flink 的配置目录 conf 下,有一个 log4j.properties 文件,负责日志的相关配置。默认情况下,该文件中有一个 ${log.file} 变量,它代表日志路径,路径中包含了 applicationId。我们可以将这个变量传递给自定义的 Appender,然后解析出 applicationId。以下是具体实现步骤:

实现步骤

  1. 继承 AppenderSkeleton 类:在自定义的 Appender 中定义 Kafka 相关参数和日志路径参数。
  2. 日志打印:在 append 方法中处理日志打印逻辑。为了避免影响 Flink 任务的执行,这里使用了 try/catch 结构,并且异步发送日志到 Kafka。
  3. 数据格式化与发送:在 AppendKafkaTask 类中处理数据格式化和 applicationId 解析,并将数据发送到 Kafka。

Log4j 配置

在 log4j.properties 文件中进行相应的配置,确保日志能够按照预期格式发送到 Kafka。

完成以上配置后,将自定义的 Log4j Appender 打包并放置在 Flink 的 lib 目录下,即可在 Kibana 中查看到日志信息。

    本文来源:图灵汇
责任编辑: : 韶华杂谈
声明:本文系图灵汇原创稿件,版权属图灵汇所有,未经授权不得转载,已经协议授权的媒体下载使用时须注明"稿件来源:图灵汇",违者将依法追究责任。
    分享
方案运转搜集义务直接Flink日志
    下一篇