运用Flink读取Kafka中的音讯
作者头像
  • 2019-10-20 12:58:00 6

本文将介绍如何利用Flink从Kafka中读取Topic的数据。

Flink与Spark类似,内置了读写Kafka Topic的功能,即Kafka连接器(Kafka Connectors)。Flink Kafka消费者与Flink的检查点机制(Checkpoint)进行了整合,从而提供了精确一次(exactly-once)的处理语义。为了实现这一功能,Flink不仅依赖于跟踪Kafka消费者的组偏移量,还将这些偏移量存储在其外部以进行追踪。

Flink和Kafka的集成API并没有包含在Flink包中,而是作为单独的模块进行管理。因此,如果我们需要在Flink中使用Kafka,必须将其相关的库添加到项目的pom.xml文件中。本文以Flink 1.0.0版本和Scala 2.10.x为例,演示如何在pom.xml中添加以下依赖:

xml <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.8_2.10</artifactId> <version>1.0.0</version> </dependency>

我们需要编写一个Kafka消费者,通过Flink计算引擎从Kafka相应的Topic中读取数据。在Flink中,可以通过FlinkKafkaConsumer08类实现这一功能,该类提供了读取单个或多个Kafka Topic的机制。其构造函数接受以下参数:

  1. Topic名称,可以是字符串(用于读取一个Topic)或列表(用于读取多个Topic);
  2. 提供一个反序列化方案(DeserializationSchema)或键值反序列化方案(KeyedDeserializationSchema),用于反序列化Kafka中的字节数组;
  3. Kafka消费者的配置信息,包括bootstrap.serverszookeeper.connect(仅在Kafka 0.8版本中需要)和group.id属性。

接下来,我们来看一下如何使用FlinkKafkaConsumer08类。初始化过程如下:

scala val properties = new Properties() properties.setProperty("bootstrap.servers", "www.iteblog.com:9092") // 仅在Kafka 0.8版本中需要 properties.setProperty("zookeeper.connect", "www.iteblog.com:2181") properties.setProperty("group.id", "iteblog") val stream = env.addSource(new FlinkKafkaConsumer08[String]("iteblog", new SimpleStringSchema(), properties)) stream.print()

在这个例子中,我们使用了SimpleStringSchema来反序列化消息。这是一个实现了DeserializationSchema接口的类,重写了T deserialize(byte[] message)方法。DeserializationSchema接口仅提供了反序列化数据的方法,因此如果需要反序列化键值,则需要使用KeyedDeserializationSchema的子类。KeyedDeserializationSchema接口提供了T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)方法,可用于反序列化Kafka消息的数据和键。

为了方便使用,Flink提供了多种序列化方案,如TypeInformationSerializationSchemaTypeInformationKeyValueSerializationSchema,它们可以根据Flink的TypeInformation信息自动选择合适的序列化方案。

如果我们启用了Flink的检查点机制,Flink Kafka消费者将从指定的Topic中消费消息,并定期将Kafka的偏移量、状态信息及其他操作信息保存到检查点中。因此,如果Flink作业出现故障,可以从最近的检查点恢复,并从上次的偏移量处继续读取消息。

要在执行环境中启用Flink的检查点机制,可以按照以下方式进行设置:

scala val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(5000) // 每隔5000毫秒进行一次检查点

需要注意的是,只有在有足够的处理插槽(slots)时,Flink才能从检查点恢复。在YARN模式下,Flink支持动态重启丢失的YARN容器。

如果没有启用检查点机制,Flink Kafka消费者会定期向Zookeeper提交偏移量。

完整代码如下:

```scala package com.iteblog

import java.util.Properties

import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08 import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object FlinkKafkaStreaming { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(5000) val properties = new Properties() properties.setProperty("bootstrap.servers", "www.iteblog.com:9092") // 仅在Kafka 0.8版本中需要 properties.setProperty("zookeeper.connect", "www.iteblog.com:2181") properties.setProperty("group.id", "iteblog")

val stream = env.addSource(new FlinkKafkaConsumer08[String]("iteblog", 
  new SimpleStringSchema(), properties))
stream.setParallelism(4).writeAsText("hdfs:///tmp/iteblog/data")

env.execute("IteblogFlinkKafkaStreaming")

} } ```

运行此程序将在hdfs:///tmp/iteblog/目录下创建名为data的文件,并将数据写入其中。

    本文来源:图灵汇
责任编辑: :
声明:本文系图灵汇原创稿件,版权属图灵汇所有,未经授权不得转载,已经协议授权的媒体下载使用时须注明"稿件来源:图灵汇",违者将依法追究责任。
    分享
音讯读取运用FlinkKafka
    下一篇