Kafka is a distributed messaging system originally developed and open-sourced by LinkedIn, and it is now a top-level Apache project. Its core characteristics are high throughput and a pull-based consumption model, and its initial design goal was log collection and transport. Starting with version 0.8, Kafka supports data replication, but it does not support transactions and imposes no strict guarantees against duplicated, lost, or malformed messages, which makes it well suited to data-collection scenarios for internet services that produce large volumes of data.
Kafka Basics
POM Configuration
Kafka client library
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.1</version>
</dependency>
```
Spring Kafka library
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.6.RELEASE</version>
</dependency>
```
Consumer Configuration
```java
// bootstrap.servers: the broker cluster to connect to
// group.id: the consumer group id
// enable.auto.commit: whether offsets are committed automatically
// fetch.max.bytes: the maximum number of bytes returned by a single fetch request
// max.poll.records: the maximum number of records returned by a single poll
```
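A minimal sketch of these settings as plain consumer properties is shown below; the broker addresses, group id, and numeric limits are placeholder assumptions rather than the project's actual values.

```java
import java.util.Properties;

public class ConsumerConfigExample {

    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder broker list
        props.put("group.id", "ng-log-consumer");                    // placeholder consumer group
        props.put("enable.auto.commit", "false");                    // offsets are committed manually
        props.put("fetch.max.bytes", 52428800);                      // placeholder: max bytes per fetch request
        props.put("max.poll.records", 500);                          // placeholder: max records per poll
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```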
Producer Configuration
The tracking data is already stored in the big-data team's Kafka, so we only need to consume it. Later we developed an external SDK; by importing the SDK, integrating systems can write their analytics data into the Kafka cluster we provide.
Because consumption involves complex business logic, the consumer service is submitted to a thread pool and executed asynchronously.
Spring thread pool configuration
```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ThreadPoolConfig {

    @Bean(name = "kafkaConsumerTaskExecutor")
    public ThreadPoolTaskExecutor kafkaConsumerTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(20);
        executor.initialize();
        return executor;
    }
}
```
Consumer code
```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class NgLogConsumerListener implements BatchAcknowledgingMessageListener<String, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(NgLogConsumerListener.class);

    @Autowired
    private NgLogConsumerService ngLogConsumerService;

    @Autowired
    @Qualifier("kafkaConsumerTaskExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Override
    public void onMessage(List<ConsumerRecord<String, String>> data, Acknowledgment acknowledgment) {
        if (data.isEmpty()) {
            return;
        }
        // Hand the batch off to the thread pool so the poll loop is not blocked by business logic.
        try {
            threadPoolTaskExecutor.submit(() -> ngLogConsumerService.handleData(data));
        } catch (Exception e) {
            LOGGER.error("Failed to submit task to the consumer thread pool. data={}, errorMsg={}", data, e.getMessage(), e);
        }
        // Acknowledge the batch so its offsets get committed.
        try {
            acknowledgment.acknowledge();
        } catch (Exception e) {
            LOGGER.error("Failed to acknowledge offsets. data={}, errorMsg={}", data, e.getMessage(), e);
        }
    }
}
```
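For the Acknowledgment argument to be supplied, the listener has to run inside a Spring Kafka listener container with manual ack mode. The following is a minimal wiring sketch, assuming spring-kafka 1.1.x; the topic name, concurrency, and consumer properties are placeholder assumptions rather than the project's actual values.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;

@Configuration
public class KafkaConsumerContainerConfig {

    @Autowired
    private NgLogConsumerListener ngLogConsumerListener;

    @Bean
    public ConcurrentMessageListenerContainer<String, String> ngLogContainer() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder broker list
        props.put("group.id", "ng-log-consumer");                    // placeholder consumer group
        props.put("enable.auto.commit", "false");                    // manual acks require auto-commit off
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ContainerProperties containerProps = new ContainerProperties("ng_log_topic"); // placeholder topic
        containerProps.setMessageListener(ngLogConsumerListener);
        // MANUAL ack mode: offsets are committed only when acknowledgment.acknowledge() is called.
        containerProps.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(new DefaultKafkaConsumerFactory<>(props), containerProps);
        container.setConcurrency(3); // placeholder number of consumer threads
        return container;
    }
}
```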
At the beginning of the project, the tracking data lived on the big-data side. As the project evolved, other projects needed to hook into the monitoring system, so we provided our own Kafka environment and an SDK.
Data format for integrating systems
The business data sent by an integrating system is the Java bean AgentMessage, which carries three fields (a sketch of the bean follows the list):
- appName: the project name
- type: the business type
- source: the business data
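The bean itself is not shown in the source; the following is a minimal sketch built only from the three fields listed above.

```java
// Minimal sketch of the AgentMessage bean described above; the field names come from the text,
// everything else (accessors, defaults) is assumed.
public class AgentMessage {

    private String appName; // project name
    private String type;    // business type
    private String source;  // business data

    public String getAppName() { return appName; }
    public void setAppName(String appName) { this.appName = appName; }

    public String getType() { return type; }
    public void setType(String type) { this.type = type; }

    public String getSource() { return source; }
    public void setSource(String source) { this.source = source; }
}
```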
SDK configuration
SDK utility class
```java
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;

public class MonitorUtil {

    private static final Logger LOGGER = LoggerFactory.getLogger(MonitorUtil.class);

    private static volatile KafkaProducer<String, String> producer;
    private static volatile ThreadPoolExecutor executor;
    // Fallback application name; how it is initialized (e.g. from local configuration) is not shown here.
    private static volatile String appName;

    public static void sendData(AgentMessage message) {
        final AgentMessage finalMessage = message;
        ThreadPoolExecutor executor = getExecutor();
        try {
            // Send asynchronously so the caller is never blocked by Kafka.
            executor.submit(() -> send(finalMessage));
        } catch (Exception e) {
            LOGGER.error("Failed to submit task to the send thread pool, message={}, errorMsg={}",
                    JSON.toJSONString(finalMessage), e.getMessage(), e);
        }
    }

    private static void send(AgentMessage message) {
        final KafkaProducer<String, String> producer = getProducer();
        if (StringUtils.isBlank(message.getAppName())) {
            if (!StringUtils.isBlank(appName)) {
                message.setAppName(appName);
            } else {
                LOGGER.warn("Unable to resolve appName");
                return;
            }
        }
        // Each application writes to its own topic, prefixed with "monitor_".
        String topic = "monitor_" + message.getAppName();
        producer.send(new ProducerRecord<>(topic, JSON.toJSONString(message)), (metadata, exception) -> {
            if (exception != null) {
                LOGGER.error("Failed to send record, msg={}, errorMsg={}",
                        JSON.toJSONString(message), exception.getMessage(), exception);
            }
        });
    }

    private static ThreadPoolExecutor getExecutor() {
        if (executor == null) {
            synchronized (MonitorUtil.class) {
                if (executor == null) {
                    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(3000);
                    RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
                    executor = new ThreadPoolExecutor(25, 50, 300, TimeUnit.SECONDS, queue, handler);
                }
            }
        }
        return executor;
    }

    private static KafkaProducer<String, String> getProducer() {
        if (producer == null) {
            synchronized (MonitorUtil.class) {
                if (producer == null) {
                    producer = new KafkaProducer<>(createProps());
                }
            }
        }
        return producer;
    }

    private static Properties createProps() {
        // ConfigManager is the project's own configuration helper.
        Properties props = new Properties();
        props.put("bootstrap.servers", ConfigManager.get("monitor.kafka.bootstrap.servers"));
        props.put("acks", ConfigManager.get("monitor.kafka.acks", "all"));
        props.put("retries", ConfigManager.getInteger("monitor.kafka.retries", 3));
        props.put("batch.size", ConfigManager.getInteger("monitor.kafka.batch.size", 16384));
        props.put("linger.ms", ConfigManager.getInteger("monitor.kafka.linger.ms", 10));
        props.put("buffer.memory", ConfigManager.getInteger("monitor.kafka.buffer.memory", 33554432));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```
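On the integrating side, sending a data point then comes down to a single call; the field values below are illustrative only.

```java
// Illustrative usage of the SDK; the appName/type/source values are made up for the example.
public class MonitorUtilUsageExample {

    public static void main(String[] args) {
        AgentMessage message = new AgentMessage();
        message.setAppName("order-service");       // placeholder project name
        message.setType("order_created");          // placeholder business type
        message.setSource("{\"orderId\":10001}");  // placeholder business data
        // Queued to the SDK thread pool and sent asynchronously to topic "monitor_order-service".
        MonitorUtil.sendData(message);
    }
}
```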
Problem 1: Consumer configuration
The fetch.message.max.bytes setting is obsolete and should be replaced with fetch.max.bytes.
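Continuing the consumer properties sketch above, the fix is a simple property rename; the size value is a placeholder.

```java
// Obsolete setting (remove):
// props.put("fetch.message.max.bytes", 52428800);
// Replacement (placeholder value, 50 MB):
props.put("fetch.max.bytes", 52428800);
```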
Problem 2: Consuming historical data
Problem 3: Offset commit failures
With Kafka configuration and consumption in place, the next section describes how the data is written to Elasticsearch.