在构建基于 Kafka 的消息处理系统中,消息丢失是一个需要深入研究的重要问题。强大的系统不仅依赖于其功能,而且依赖于其可靠性。因此,理解消息丢失的原因,并采取必要的措施确保消息的一致性和完整性,是构建高效可靠消息系统的重要组成部分。本文将详细分析 Kafka 消息丢失的主要原因,并提供一系列策略来解决这个问题。

消息丢失的原因

  1. 生产者端问题: 在 Kafka 系统中,生产者负责发送消息。然而,由于网络故障或其他未知问题,生产者可能无法成功发送消息到 Kafka 服务器。
  2. Kafka 服务端问题: Kafka 服务器可能会因为硬件故障、磁盘满或其他异常情况导致消息丢失。
  3. 消费者端问题: 消费者负责处理接收到的消息。但是,消费者在处理消息时可能会出现错误或崩溃,导致消息未被正确处理。

解决方案与措施

生产者端相关方案与措施

  1. 发送消息处理回调方法

由于消息的常规发送采用的异步方式,所以通常会忽略掉回调处理,为了保证消息的发送质量,一定需要对回调信息进行处理或者改为同步发送。

producer.send(new ProducerRecord<>(topic, messageKey, messageStr),  new CallBack({...});
  1. 设置有效的重试策略以及 acks 配置

我们可以在生产者端设置一个有效的重试策略,保证消息成功发送。例如,我们可以使用指数退避算法进行重试。这种算法会在每次重试失败后等待更长的时间,从而减轻服务器的压力,并增加消息成功发送的概率。

通过设置 Producer acks 机制,我们可以确保生产者收到 Kafka 服务器的确认,知晓消息是否被成功提交。

  • acks=0: 生产者在发送消息后不会等待任何确认,直接将消息视为发送成功。这种设置下,可能会出现消息丢失的情况,因为生产者不会等待服务器的任何确认即认为消息发送成功。
  • acks=1: 生产者在发送消息后会等待 Leader Broker 的确认,确认后即视为消息发送成功。这种设置下,消息的可靠性得到一定程度的保证,但仍有可能发生 Leader Broker 宕机导致消息丢失的情况。
  • acks=all: 生产者在发送消息后会等待 Leader Broker 和所有副本的确认,确认后才视为消息发送成功。这种设置下,消息的可靠性和一致性得到最高级别的保证,但同时也会增加网络延迟和资源消耗。
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaProducerExample {

    private static final String TOPIC_NAME = "my-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 设置重试次数为3次
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

		// 指数退避算法参数
		// 初始重试间隔为1秒
		props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
		// 最大重试间隔为30秒
		props.put(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, 30000);
		// 设置 acks 配置为 all
		properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // ... 其他业务逻辑
    }
}
  1. 启用 Kafka 日志压缩

Kafka 提供了日志压缩功能,这可以减少磁盘空间的使用,并提高消息存储的可靠性。通过这种方式,我们可以减少因磁盘满导致的消息丢失风险。

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaProducerExample {

    private static final String TOPIC_NAME = "my-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 其他设置...
        // 启用日志压缩,使用gzip压缩算法
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // ... 其他业务逻辑
    }
}

Kafka 服务端相关方案与措施

在 Kafka 服务端,要保证消息的可靠性,需要从几方面考虑

合理的配置

  1. 根据数据量级,配置合理的 partition 数量,提高吞吐量,避免性能瓶颈导致消息丢失

高可用:

  1. 多副本配置: 在 Kafka 集群中设置多个副本,以确保即使某些 Broker 发生故障,仍然能够保证消息的可用性和一致性
  2. ISR(In-Sync Replicas)机制: 使用 ISR 机制来确保所有副本之间的同步性,只有在所有 ISR 中的副本都同步成功后才认为消息发送成功
  3. Controller 选举: 配置 Controller 选举机制,确保 Kafka 集群中的 Controller 能够及时处理 Broker 的故障和切换

数据持久化和日志管理:

  • 数据持久化策略: 配置合适的数据持久化策略,例如使用持久化日志来存储消息,保证消息不会因为服务重启或异常导致丢失。
  • 日志管理和清理: 定期清理过期的日志段(log segment),避免日志文件过大导致磁盘空间不足,同时确保消息的及时清理和归档。

监控和故障恢复:

  • 监控和报警: 配置监控系统,实时监测 Kafka 集群的运行状态和性能指标,并设置报警机制,在出现异常或性能下降时及时发现并处理。
  • 故障自愈: 配置自动故障恢复机制,例如使用 Kafka 的 Controller 自动进行 Broker 的故障检测和切换,确保集群在发生故障时能够快速恢复。

消费者端解决方案与措施

手动维护 offset

  • enable.auto.commit 设置为 false,并且在消费者端手动提交偏移量。
  • 使用较小的 auto.commit.interval.ms 值,以减少自动提交偏移量的时间间隔,提高偏移量的提交频率。
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import redis.clients.jedis.Jedis;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerWithRedisOffsetOnStartup {
    public static void main(String[] args) {
        String kafkaBootstrapServers = "kafka-broker1:9092,kafka-broker2:9092";
        String kafkaTopic = "test-topic";
        String groupId = "test-consumer-group";
        String redisHost = "localhost";
        int redisPort = 6379;

        // Kafka Consumer 配置
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交偏移量
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // 设置自动提交偏移量的时间间隔为 100 毫秒

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Redis 连接
        Jedis jedis = new Jedis(redisHost, redisPort);

        try {
            // 从 Redis 中获取偏移量
            Map<TopicPartition, Long> offsets = getOffsetsFromRedis(jedis, kafkaTopic, groupId);
            // 订阅主题并设置偏移量
            consumer.subscribe(Collections.singleton(kafkaTopic), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    for (TopicPartition partition : partitions) {
                        consumer.seek(partition, offsets.getOrDefault(partition, 0L));
                    }
                }
            });

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value());
                    // 处理消息...
                    // 手动提交偏移量到 Redis
                    saveOffsetInRedis(jedis, record.topic(), record.partition(), record.offset());
                    // 手动提交偏移量到 Kafka
                    consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        } finally {
            consumer.close();
            jedis.close();
        }
    }

    private static Map<TopicPartition, Long> getOffsetsFromRedis(Jedis jedis, String topic, String groupId) {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (String key : jedis.keys("offset-" + topic + "-*")) {
            String[] parts = key.split("-");
            int partition = Integer.parseInt(parts[parts.length - 1]);
            long offset = Long.parseLong(jedis.get(key));
            offsets.put(new TopicPartition(topic, partition), offset);
        }
        return offsets;
    }

    private static void saveOffsetInRedis(Jedis jedis, String topic, int partition, long offset) {
        String key = "offset-" + topic + "-" + partition;
        jedis.set(key, String.valueOf(offset));
    }
}

总结

总得来说,关于消息可靠性的保证,需要从生产端、服务端、消费端三个方面综合考虑,不是仅仅一个方面的问题。

拓展

Kafka 官方常用工具

# 查看某个topic的message数量
$ ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_topic
# 查看consumer Group列表
$ ./kafka-consumer-groups.sh  --list  --bootstrap-server 192.168.88.108:9092
# 查看 offset 消费情况
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-1152 --describe
GROUP                 TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                           HOST            CLIENT-ID
console-consumer-1152 test_topic      0          -               4               -               consumer-console-consumer-1152-1-2703ea2b-b62d-4cfd-8950-34e8c321b942 /127.0.0.1      consumer-console-consumer-1152-1

Kafka  日志刷盘机制

# 推荐采用默认值,即不配置该配置,交由操作系统自行决定何时落盘,以提升性能。
# 针对 broker 配置:
log.flush.interval.messages=10000 # 日志落盘消息条数间隔,即每接收到一定条数消息,即进行log落盘。
log.flush.interval.ms=1000        # 日志落盘时间间隔,单位ms,即每隔一定时间,即进行log落盘。

# 针对 topic 配置:
flush.messages.flush.ms=1000  # topic下每1s刷盘
flush.messages=1              # topic下每个消息都落盘

# 查看 Linux 后台线程执行配置
$ sysctl -a | grep dirty
vm.dirty_background_bytes = 0
vm.dirty_background_ratio = 10      # 表示当脏页占总内存的的百分比超过这个值时,后台线程开始刷新脏页。
vm.dirty_bytes = 0
vm.dirty_expire_centisecs = 3000    # 表示脏数据多久会被刷新到磁盘上(30秒)。
vm.dirty_ratio = 20
vm.dirty_writeback_centisecs = 500  # 表示多久唤醒一次刷新脏页的后台线程(5秒)。
vm.dirtytime_expire_seconds = 43200