Flink 实时写入数据到 ElasticSearch 功能调优
作者头像
  • 漫谈互联网
  • 2019-10-25 12:42:42 5

简介

线上业务在使用 Flink 消费 Kafka 中的轨迹数据时出现了严重的 backpressure,数据积压情况明显。每次批量写入的数据量为 3000 条/50MB/30 秒,并且并行度设置为 48。为了防止对线上业务造成影响,我们申请了一个与现有集群配置相同的 Elasticsearch 集群来进行调试。

背景分析

在线业务中,Flink 消费 Kafka 中的轨迹数据时出现了严重的 backpressure,导致数据积压严重。每次批量写入的数据量为 3000 条/50MB/30 秒,并且并行度为 48。为了应对这种情况,我们建立了一个与生产环境配置一致的 Elasticsearch 集群进行调优测试。

测试环境

  • Elasticsearch: 版本 2.3.3
  • Flink: 版本 1.6.3
  • flink-connector-elasticsearch: 版本 2.2.11
  • 硬件配置: 八台 SSD 服务器,56 核心处理器(3 主节点,5 从节点)
  • 压力测试工具: 使用 Rally 对 Elasticsearch 集群进行分布式压力测试

通过压力测试,我们发现集群的平均写入性能大约为每秒 10 万条文档。

Flink 写入测试

配置文件

java config.put("cluster.name", ConfigUtil.getString(ES_CLUSTER_NAME, "flinktest")); config.put("bulk.flush.max.actions", ConfigUtil.getString(ES_BULK_FLUSH_MAX_ACTIONS, "3000")); config.put("bulk.flush.max.size.mb", ConfigUtil.getString(ES_BULK_FLUSH_MAX_SIZE_MB, "50")); config.put("bulk.flush.interval.ms", ConfigUtil.getString(ES_BULK_FLUSH_INTERVAL, "3000"));

执行代码片段

```java final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); initEnv(env); Properties properties = ConfigUtil.getProperties(CONFIGFILEPATH);

// 从 Kafka 获取轨迹数据 FlinkKafkaConsumer010 flinkKafkaConsumer010 = new FlinkKafkaConsumer010(properties.getProperty("topic.name"), new SimpleStringSchema(), properties); flinkKafkaConsumer010.setStartFromLatest();

DataStreamSource streamSource = env.addSource(flinkKafkaConsumer010);

// 将数据写入 Elasticsearch streamSource.map(s -> JSONObject.parseObject(s, Trajectory.class)) .addSink(EsSinkFactory.createSinkFunction(new TrajectoryDetailEsSinkFunction())) .name("esSink");

env.execute("flinktest"); ```

运行时配置

任务容器数量为 24 个,总共 48 个并行度。保存点时间为 15 分钟。

运行现象

  1. Source 和 Map 算子 出现较高反压。
  2. Elasticsearch 集群 目标索引写入速度显著下降,平均 QPS 为 12k 左右。
  3. 取消 Sink 算子 后,QPS 提升至 116k 左右。

有无 Sink 参照实验的结论

取消写入 Elasticsearch 的操作后,QPS 达到 110k,是之前的 10 倍。这表明 Elasticsearch 集群的写入性能是导致下游反压的主要原因。

优化方向

索引字段类型调整

  • 问题:部分字段格式被自动检测为日期格式,导致空字符串无法解析报错。
  • 解决方案:关闭索引的自动检测功能。

降低副本数

  • 命令curl -XPUT {集群地址}/{索引名称}/_settings?timeout=3m -H "Content-Type: application/json" -d '{"number_of_replicas":"0"}'

提高刷新间隔

  • 命令curl -XPUT {集群地址}/{索引名称}/_settings?timeout=3m -H "Content-Type: application/json" -d '{ "settings": { "index": {"refresh_interval" : -1 } } }'

检查集群节点 CPU 核数

  • 操作:通过 Grafana 监控节点 CPU 使用率,并通过 Linux 命令查看 CPU 核数。将高负载节点的索引分片移动到 CPU 核数更多的节点上。

线程队列优化

  • 操作:调整 Elasticsearch 线程池的队列长度,使其与集群硬件配置相匹配。

Index Buffer Size 优化

  • 参考:根据官方建议,将 indices.memory.index_buffer_size 设置为 10%。

Translog 优化

  • 操作:修改 translog 的持久化策略为异步模式,增加刷新阈值和刷新间隔。

优化效果

经过上述优化措施,我们发现写入性能有所提升,Flink 的反压情况也得到了缓解。特别是在取消写入 Elasticsearch 的操作后,Kafka 中的 topic 积压明显减少。

总结

当遇到 Elasticsearch 写入性能瓶颈时,可以采取以下步骤进行排查和优化:

  1. 检查日志,确认是否有字段类型不匹配或脏数据。
  2. 检查 CPU 使用情况,确认集群是否存在异构问题。
  3. 检查客户端配置,确认是否使用了批量写入。
  4. 查看线程堆栈,确认耗时最长的方法调用。
  5. 根据业务类型(如 ToB 或 ToC),调整副本数和刷新时间。
  6. 进行 index buffer size 和 translog 的优化。
  7. 重启集群,验证优化效果。
    本文来源:图灵汇
责任编辑: : 漫谈互联网
声明:本文系图灵汇原创稿件,版权属图灵汇所有,未经授权不得转载,已经协议授权的媒体下载使用时须注明"稿件来源:图灵汇",违者将依法追究责任。
    分享
写入ElasticSearch实时功能数据Flink
    下一篇