Kafka、Elasticsearch、Grafana搭建业务监控系统(二)Kafka
作者头像
  • 米蔚然
  • 2019-10-31 09:24:02 0

Kafka、Elasticsearch、Grafana搭建业务监控系统(二)Kafka

一、Kafka简介

Kafka是由LinkedIn开发并开源的分布式消息系统,现已成为Apache顶级项目。Kafka的核心特点包括高吞吐量和基于Pull模式的消息消费机制。最初设计目标是用于日志收集和传输。从0.8版本起,Kafka支持数据复制,但不支持事务处理,对于消息的重复、丢失和错误处理没有严格的要求,适用于产生大量数据的互联网服务的数据采集场景。

Kafka基础概念

  • Producer: 发布消息到Kafka集群的服务。
  • Broker: Kafka集群中的服务器。
  • Topic: 发布到Kafka集群的消息类别。
  • Partition: 物理概念上的划分,每个Topic包含一个或多个Partition。
  • Consumer: 从Kafka集群中消费消息的服务。
  • Consumer Group: 高级消费者API中,每个消费者都属于一个Consumer Group,每条消息只能被一个Consumer Group中的一个消费者消费。
  • Replica: Topic的副本,确保数据的高可用性。
  • Leader: 副本中的角色,生产者和消费者只与Leader交互。
  • Follower: 副本中的角色,从Leader复制数据。
  • Controller: Kafka集群中的服务器之一,负责leader选举和故障转移。
  • Zookeeper: Kafka通过Zookeeper存储集群元数据。

二、Spring-Kafka配置

pom文件配置

  • Kafka客户端库

    xml <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.1</version> </dependency>

  • Spring Kafka库

    xml <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.6.RELEASE</version> </dependency>

消费者配置

java // bootstrap.servers: 部署的Brokers集群 // group.id: 消费者群组ID // enable.auto.commit: 是否自动提交offset // fetch.max.bytes: 一次fetch请求的最大字节数 // max.poll.records: 一次poll请求的最大记录数

生产者配置

埋点数据已经存储在大数据方的Kafka中,我们只需要消费数据。后期开发了对外的SDK,通过引入SDK可以将分析数据存储到我们提供的Kafka中。

三、代码实现

由于消费过程中涉及复杂的业务逻辑,我们将消费Service放入线程池中异步执行。

Spring线程池配置

java @Configuration public class ThreadPoolConfig { @Bean(name = "kafkaConsumerTaskExecutor") public ThreadPoolTaskExecutor kafkaConsumerTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); executor.setQueueCapacity(20); executor.initialize(); return executor; } }

消费代码

```java @Component public class NgLogConsumerListener implements BatchAcknowledgingMessageListener {

private static final Logger LOGGER = LoggerFactory.getLogger(NgLogConsumerListener.class);

@Autowired
private NgLogConsumerService ngLogConsumerService;

@Autowired
@Qualifier("kafkaConsumerTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;

@Override
public void onMessage(List<ConsumerRecord<String, String>> data, Acknowledgment acknowledgment) {
    if (data.isEmpty()) {
        return;
    }

    try {
        threadPoolTaskExecutor.submit(() -> {
            ngLogConsumerService.handleData(data);
        });
    } catch (Exception e) {
        LOGGER.error("往消费线程池添加任务异常. data={}, ExceptionMsg={}", data, e.getMessage(), e);
    }

    try {
        acknowledgment.acknowledge();
    } catch (Exception e) {
        LOGGER.error("提交确认失败. data={}, ExceptionMsg={}", data, e.getMessage(), e);
    }
}

} ```

四、提供SDK

项目初期,埋点数据存储在大数据端。随着项目的迭代,其他项目需要接入监控系统,因此提供了自用的Kafka环境和SDK。

接入方数据格式

接入系统的业务数据为Java Bean AgentMessage,包含三个参数: - appName: 项目名称 - type: 业务类型 - source: 业务数据

SDK配置

```xml org.apache.kafka kafka-clients 0.11.0.1

com.alibaba fastjson 1.2.29 ```

SDK工具类

```java public class MonitorUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(MonitorUtil.class);

private static volatile KafkaProducer<String, String> producer;
private static volatile ThreadPoolExecutor executor;

public static void sendData(AgentMessage message) {
    final AgentMessage finalMessage = message;
    ThreadPoolExecutor executor = getExecutor();
    try {
        executor.submit(() -> {
            send(finalMessage);
        });
    } catch (Exception e) {
        LOGGER.error("往消费线程池添加任务异常,message={}, errorMsg={}", JSON.toJSONString(finalMessage), e.getMessage(), e);
    }
}

private static void send(AgentMessage message) {
    final KafkaProducer<String, String> producer = getProducer();
    if (StringUtils.isBlank(message.getAppName())) {
        if (!StringUtils.isBlank(appName)) {
            message.setAppName(appName);
        } else {
            LOGGER.warn("无法获取appName");
            return;
        }
    }

    String topic = "monitor_" + message.getAppName();
    producer.send(new ProducerRecord<>(topic, JSON.toJSONString(message)), (metadata, exception) -> {
        if (exception != null) {
            LOGGER.error("发送记录失败,msg={}, errorMsg={}", JSON.toJSONString(message), exception.getMessage(), exception);
        }
    });
}

private static ThreadPoolExecutor getExecutor() {
    if (executor == null) {
        synchronized (MonitorUtil.class) {
            if (executor == null) {
                BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(3000);
                RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
                executor = new ThreadPoolExecutor(25, 50, 300, TimeUnit.SECONDS, queue, handler);
            }
        }
    }
    return executor;
}

private static KafkaProducer<String, String> getProducer() {
    if (producer == null) {
        synchronized (MonitorUtil.class) {
            if (producer == null) {
                producer = new KafkaProducer<>(createProps());
            }
        }
    }
    return producer;
}

private static Properties createProps() {
    Properties props = new Properties();
    props.put("bootstrap.servers", ConfigManager.get("monitor.kafka.bootstrap.servers"));
    props.put("acks", ConfigManager.get("monitor.kafka.acks", "all"));
    props.put("retries", ConfigManager.getInteger("monitor.kafka.retries", 3));
    props.put("batch.size", ConfigManager.getInteger("monitor.kafka.batch.size", 16384));
    props.put("linger.ms", ConfigManager.getInteger("monitor.kafka.linger.ms", 10));
    props.put("buffer.memory", ConfigManager.getInteger("monitor.kafka.buffer.memory", 33554432));
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
}

} ```

五、遇到的问题及解决办法

问题一:消费者配置问题

  • 旧配置fetch.message.max.bytes 已经过时,应替换为 fetch.max.bytes
  • 解决方案:更新配置文件中的相关参数。

问题二:消费历史数据

  • 原因:消费方对已存在的topic建立了新的group ID,设置了消费的初始offset。
  • 解决方案:提前识别风险,停止消费数据,寻求Kafka管理员协助处理。

问题三:提交offset失败

  • 原因:可能由于JVM处理速度慢或网络波动导致。
  • 解决方案:改为手动提交offset。

六、总结

Kafka的配置和使用已经顺利完成,接下来将介绍如何将数据写入Elasticsearch。

    本文来源:图灵汇
责任编辑: : 米蔚然
声明:本文系图灵汇原创稿件,版权属图灵汇所有,未经授权不得转载,已经协议授权的媒体下载使用时须注明"稿件来源:图灵汇",违者将依法追究责任。
    分享
Kafka监控系统Elasticsearch搭建Grafana业务
    下一篇