线上业务在使用 Flink 消费 Kafka 中的轨迹数据时出现了严重的 backpressure,数据积压情况明显。每次批量写入的数据量为 3000 条/50MB/30 秒,并且并行度设置为 48。为了防止对线上业务造成影响,我们申请了一个与现有集群配置相同的 Elasticsearch 集群来进行调试。
在线业务中,Flink 消费 Kafka 中的轨迹数据时出现了严重的 backpressure,导致数据积压严重。每次批量写入的数据量为 3000 条/50MB/30 秒,并且并行度为 48。为了应对这种情况,我们建立了一个与生产环境配置一致的 Elasticsearch 集群进行调优测试。
通过压力测试,我们发现集群的平均写入性能大约为每秒 10 万条文档。
java
config.put("cluster.name", ConfigUtil.getString(ES_CLUSTER_NAME, "flinktest"));
config.put("bulk.flush.max.actions", ConfigUtil.getString(ES_BULK_FLUSH_MAX_ACTIONS, "3000"));
config.put("bulk.flush.max.size.mb", ConfigUtil.getString(ES_BULK_FLUSH_MAX_SIZE_MB, "50"));
config.put("bulk.flush.interval.ms", ConfigUtil.getString(ES_BULK_FLUSH_INTERVAL, "3000"));
```java final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); initEnv(env); Properties properties = ConfigUtil.getProperties(CONFIGFILEPATH);
// 从 Kafka 获取轨迹数据 FlinkKafkaConsumer010 flinkKafkaConsumer010 = new FlinkKafkaConsumer010(properties.getProperty("topic.name"), new SimpleStringSchema(), properties); flinkKafkaConsumer010.setStartFromLatest();
DataStreamSource streamSource = env.addSource(flinkKafkaConsumer010);
// 将数据写入 Elasticsearch streamSource.map(s -> JSONObject.parseObject(s, Trajectory.class)) .addSink(EsSinkFactory.createSinkFunction(new TrajectoryDetailEsSinkFunction())) .name("esSink");
env.execute("flinktest"); ```
任务容器数量为 24 个,总共 48 个并行度。保存点时间为 15 分钟。
取消写入 Elasticsearch 的操作后,QPS 达到 110k,是之前的 10 倍。这表明 Elasticsearch 集群的写入性能是导致下游反压的主要原因。
curl -XPUT {集群地址}/{索引名称}/_settings?timeout=3m -H "Content-Type: application/json" -d '{"number_of_replicas":"0"}'
curl -XPUT {集群地址}/{索引名称}/_settings?timeout=3m -H "Content-Type: application/json" -d '{ "settings": { "index": {"refresh_interval" : -1 } } }'
indices.memory.index_buffer_size
设置为 10%。经过上述优化措施,我们发现写入性能有所提升,Flink 的反压情况也得到了缓解。特别是在取消写入 Elasticsearch 的操作后,Kafka 中的 topic 积压明显减少。
当遇到 Elasticsearch 写入性能瓶颈时,可以采取以下步骤进行排查和优化: