记一次Flink写入Kafka坑点
作者头像
  • 诺贝尔家
  • 2019-11-13 09:37:19 5

最近遇到了一个将数据写入Kafka的需求,涉及到一些sink部分的代码实现。以下是原始代码:

scala val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](sinkTopic, new StringKeyedSerializationSchema, producerConfig, sinkSemantic) ds.addSink(kafkaProducer).setParallelism(sinkParallelism)

其中,StringKeyedSerializationSchema 是一个自定义的序列化器,用于序列化写入Kafka的键值对。经过初步测试,所有操作看起来都很顺利,但在一段时间后,Kafka团队反馈说只有部分分区接收到数据,其他分区没有任何数据写入。

经过调查,发现数据写入Kafka的分区策略如下:

  1. 如果指定了分区,则数据会写入指定的分区。
  2. 如果没有指定分区但指定了键(key),则会根据键的哈希值对分区进行取模分配。
  3. 如果既没有指定分区也没有指定键,则会按照轮询方式分配。

由于我们已经指定了键,问题应该出现在分区策略上。检查源码后发现,FlinkKafkaProducer011 中确实指定了分区逻辑:

scala if (flinkKafkaPartitioner != null) { record = new ProducerRecord( targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue ) } else { record = new ProducerRecord(targetTopic, null, timestamp, serializedKey, serializedValue) }

这里使用的分区器是默认的 FlinkFixedPartitioner,它通过以下方式确定分区:

scala partitions[parallelInstanceId % partitions.length]

parallelInstanceId 表示当前任务的索引,而 partitions 则是Kafka主题的分区数量。在这种情况下,由于 sinkParallelism 设置为4,而Kafka主题的分区数为6,因此某些分区无法获得数据。

为了解决这个问题,可以采取以下几种方法:

  1. sinkParallelism 设置为与Kafka主题的分区数一致。
  2. flinkKafkaPartitioner 设置为空,并指定写入Kafka的键。
  3. 自定义一个 FlinkKafkaPartitioner 并重写其 partition 方法。

最终选择了第三种方案,修改代码如下:

scala val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String]( sinkTopic, new StringKeyedSerializationSchema, producerConfig, Optional.ofNullable(null), sinkSemantic, 5 )

同时,将 StringKeyedSerializationSchemaserializeKey 返回值设为 null。再次运行任务后,所有分区均成功接收到数据,问题得以解决。

    本文来源:图灵汇
责任编辑: : 诺贝尔家
声明:本文系图灵汇原创稿件,版权属图灵汇所有,未经授权不得转载,已经协议授权的媒体下载使用时须注明"稿件来源:图灵汇",违者将依法追究责任。
    分享
写入KafkaFlink
    下一篇