最近遇到了一个将数据写入Kafka的需求,涉及到一些sink部分的代码实现。以下是原始代码:
scala
val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](sinkTopic, new StringKeyedSerializationSchema, producerConfig, sinkSemantic)
ds.addSink(kafkaProducer).setParallelism(sinkParallelism)
其中,StringKeyedSerializationSchema
是一个自定义的序列化器,用于序列化写入Kafka的键值对。经过初步测试,所有操作看起来都很顺利,但在一段时间后,Kafka团队反馈说只有部分分区接收到数据,其他分区没有任何数据写入。
经过调查,发现数据写入Kafka的分区策略如下:
由于我们已经指定了键,问题应该出现在分区策略上。检查源码后发现,FlinkKafkaProducer011
中确实指定了分区逻辑:
scala
if (flinkKafkaPartitioner != null) {
record = new ProducerRecord(
targetTopic,
flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
timestamp,
serializedKey,
serializedValue
)
} else {
record = new ProducerRecord(targetTopic, null, timestamp, serializedKey, serializedValue)
}
这里使用的分区器是默认的 FlinkFixedPartitioner
,它通过以下方式确定分区:
scala
partitions[parallelInstanceId % partitions.length]
parallelInstanceId
表示当前任务的索引,而 partitions
则是Kafka主题的分区数量。在这种情况下,由于 sinkParallelism
设置为4,而Kafka主题的分区数为6,因此某些分区无法获得数据。
为了解决这个问题,可以采取以下几种方法:
sinkParallelism
设置为与Kafka主题的分区数一致。flinkKafkaPartitioner
设置为空,并指定写入Kafka的键。FlinkKafkaPartitioner
并重写其 partition
方法。最终选择了第三种方案,修改代码如下:
scala
val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](
sinkTopic,
new StringKeyedSerializationSchema,
producerConfig,
Optional.ofNullable(null),
sinkSemantic,
5
)
同时,将 StringKeyedSerializationSchema
的 serializeKey
返回值设为 null
。再次运行任务后,所有分区均成功接收到数据,问题得以解决。