本文将介绍如何利用Flink从Kafka中读取Topic的数据。
Flink与Spark类似,内置了读写Kafka Topic的功能,即Kafka连接器(Kafka Connectors)。Flink Kafka消费者与Flink的检查点机制(Checkpoint)进行了整合,从而提供了精确一次(exactly-once)的处理语义。为了实现这一功能,Flink不仅依赖于跟踪Kafka消费者的组偏移量,还将这些偏移量存储在其外部以进行追踪。
Flink和Kafka的集成API并没有包含在Flink包中,而是作为单独的模块进行管理。因此,如果我们需要在Flink中使用Kafka,必须将其相关的库添加到项目的pom.xml
文件中。本文以Flink 1.0.0版本和Scala 2.10.x为例,演示如何在pom.xml
中添加以下依赖:
xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
<version>1.0.0</version>
</dependency>
我们需要编写一个Kafka消费者,通过Flink计算引擎从Kafka相应的Topic中读取数据。在Flink中,可以通过FlinkKafkaConsumer08
类实现这一功能,该类提供了读取单个或多个Kafka Topic的机制。其构造函数接受以下参数:
bootstrap.servers
、zookeeper.connect
(仅在Kafka 0.8版本中需要)和group.id
属性。接下来,我们来看一下如何使用FlinkKafkaConsumer08
类。初始化过程如下:
scala
val properties = new Properties()
properties.setProperty("bootstrap.servers", "www.iteblog.com:9092")
// 仅在Kafka 0.8版本中需要
properties.setProperty("zookeeper.connect", "www.iteblog.com:2181")
properties.setProperty("group.id", "iteblog")
val stream = env.addSource(new FlinkKafkaConsumer08[String]("iteblog",
new SimpleStringSchema(), properties))
stream.print()
在这个例子中,我们使用了SimpleStringSchema
来反序列化消息。这是一个实现了DeserializationSchema
接口的类,重写了T deserialize(byte[] message)
方法。DeserializationSchema
接口仅提供了反序列化数据的方法,因此如果需要反序列化键值,则需要使用KeyedDeserializationSchema
的子类。KeyedDeserializationSchema
接口提供了T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
方法,可用于反序列化Kafka消息的数据和键。
为了方便使用,Flink提供了多种序列化方案,如TypeInformationSerializationSchema
和TypeInformationKeyValueSerializationSchema
,它们可以根据Flink的TypeInformation
信息自动选择合适的序列化方案。
如果我们启用了Flink的检查点机制,Flink Kafka消费者将从指定的Topic中消费消息,并定期将Kafka的偏移量、状态信息及其他操作信息保存到检查点中。因此,如果Flink作业出现故障,可以从最近的检查点恢复,并从上次的偏移量处继续读取消息。
要在执行环境中启用Flink的检查点机制,可以按照以下方式进行设置:
scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000) // 每隔5000毫秒进行一次检查点
需要注意的是,只有在有足够的处理插槽(slots)时,Flink才能从检查点恢复。在YARN模式下,Flink支持动态重启丢失的YARN容器。
如果没有启用检查点机制,Flink Kafka消费者会定期向Zookeeper提交偏移量。
完整代码如下:
```scala package com.iteblog
import java.util.Properties
import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08 import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object FlinkKafkaStreaming { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(5000) val properties = new Properties() properties.setProperty("bootstrap.servers", "www.iteblog.com:9092") // 仅在Kafka 0.8版本中需要 properties.setProperty("zookeeper.connect", "www.iteblog.com:2181") properties.setProperty("group.id", "iteblog")
val stream = env.addSource(new FlinkKafkaConsumer08[String]("iteblog",
new SimpleStringSchema(), properties))
stream.setParallelism(4).writeAsText("hdfs:///tmp/iteblog/data")
env.execute("IteblogFlinkKafkaStreaming")
} } ```
运行此程序将在hdfs:///tmp/iteblog/
目录下创建名为data
的文件,并将数据写入其中。