在构建基于 Kafka 的消息处理系统中,消息丢失是一个需要深入研究的重要问题。强大的系统不仅依赖于其功能,而且依赖于其可靠性。因此,理解消息丢失的原因,并采取必要的措施确保消息的一致性和完整性,是构建高效可靠消息系统的重要组成部分。本文将详细分析 Kafka 消息丢失的主要原因,并提供一系列策略来解决这个问题。
消息丢失的原因
- 生产者端问题: 在 Kafka 系统中,生产者负责发送消息。然而,由于网络故障或其他未知问题,生产者可能无法成功发送消息到 Kafka 服务器。
- Kafka 服务端问题: Kafka 服务器可能会因为硬件故障、磁盘满或其他异常情况导致消息丢失。
- 消费者端问题: 消费者负责处理接收到的消息。但是,消费者在处理消息时可能会出现错误或崩溃,导致消息未被正确处理。
解决方案与措施
生产者端相关方案与措施
- 发送消息处理回调方法
由于消息的常规发送采用的异步方式,所以通常会忽略掉回调处理,为了保证消息的发送质量,一定需要对回调信息进行处理或者改为同步发送。
producer.send(new ProducerRecord<>(topic, messageKey, messageStr), new CallBack({...});
- 设置有效的重试策略以及 acks 配置
我们可以在生产者端设置一个有效的重试策略,保证消息成功发送。例如,我们可以使用指数退避算法进行重试。这种算法会在每次重试失败后等待更长的时间,从而减轻服务器的压力,并增加消息成功发送的概率。
通过设置 Producer acks 机制,我们可以确保生产者收到 Kafka 服务器的确认,知晓消息是否被成功提交。
- acks=0: 生产者在发送消息后不会等待任何确认,直接将消息视为发送成功。这种设置下,可能会出现消息丢失的情况,因为生产者不会等待服务器的任何确认即认为消息发送成功。
- acks=1: 生产者在发送消息后会等待 Leader Broker 的确认,确认后即视为消息发送成功。这种设置下,消息的可靠性得到一定程度的保证,但仍有可能发生 Leader Broker 宕机导致消息丢失的情况。
- acks=all: 生产者在发送消息后会等待 Leader Broker 和所有副本的确认,确认后才视为消息发送成功。这种设置下,消息的可靠性和一致性得到最高级别的保证,但同时也会增加网络延迟和资源消耗。
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerExample {
private static final String TOPIC_NAME = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置重试次数为3次
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 指数退避算法参数
// 初始重试间隔为1秒
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
// 最大重试间隔为30秒
props.put(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, 30000);
// 设置 acks 配置为 all
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// ... 其他业务逻辑
}
}
- 启用 Kafka 日志压缩
Kafka 提供了日志压缩功能,这可以减少磁盘空间的使用,并提高消息存储的可靠性。通过这种方式,我们可以减少因磁盘满导致的消息丢失风险。
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerExample {
private static final String TOPIC_NAME = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 其他设置...
// 启用日志压缩,使用gzip压缩算法
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// ... 其他业务逻辑
}
}
Kafka 服务端相关方案与措施
在 Kafka 服务端,要保证消息的可靠性,需要从几方面考虑
合理的配置
- 根据数据量级,配置合理的 partition 数量,提高吞吐量,避免性能瓶颈导致消息丢失
高可用:
- 多副本配置: 在 Kafka 集群中设置多个副本,以确保即使某些 Broker 发生故障,仍然能够保证消息的可用性和一致性
- ISR(In-Sync Replicas)机制: 使用 ISR 机制来确保所有副本之间的同步性,只有在所有 ISR 中的副本都同步成功后才认为消息发送成功
- Controller 选举: 配置 Controller 选举机制,确保 Kafka 集群中的 Controller 能够及时处理 Broker 的故障和切换
数据持久化和日志管理:
- 数据持久化策略: 配置合适的数据持久化策略,例如使用持久化日志来存储消息,保证消息不会因为服务重启或异常导致丢失。
- 日志管理和清理: 定期清理过期的日志段(log segment),避免日志文件过大导致磁盘空间不足,同时确保消息的及时清理和归档。
监控和故障恢复:
- 监控和报警: 配置监控系统,实时监测 Kafka 集群的运行状态和性能指标,并设置报警机制,在出现异常或性能下降时及时发现并处理。
- 故障自愈: 配置自动故障恢复机制,例如使用 Kafka 的 Controller 自动进行 Broker 的故障检测和切换,确保集群在发生故障时能够快速恢复。
消费者端解决方案与措施
手动维护 offset
- 将
enable.auto.commit
设置为false
,并且在消费者端手动提交偏移量。 - 使用较小的
auto.commit.interval.ms
值,以减少自动提交偏移量的时间间隔,提高偏移量的提交频率。
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import redis.clients.jedis.Jedis;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerWithRedisOffsetOnStartup {
public static void main(String[] args) {
String kafkaBootstrapServers = "kafka-broker1:9092,kafka-broker2:9092";
String kafkaTopic = "test-topic";
String groupId = "test-consumer-group";
String redisHost = "localhost";
int redisPort = 6379;
// Kafka Consumer 配置
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交偏移量
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // 设置自动提交偏移量的时间间隔为 100 毫秒
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Redis 连接
Jedis jedis = new Jedis(redisHost, redisPort);
try {
// 从 Redis 中获取偏移量
Map<TopicPartition, Long> offsets = getOffsetsFromRedis(jedis, kafkaTopic, groupId);
// 订阅主题并设置偏移量
consumer.subscribe(Collections.singleton(kafkaTopic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
consumer.seek(partition, offsets.getOrDefault(partition, 0L));
}
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
// 处理消息...
// 手动提交偏移量到 Redis
saveOffsetInRedis(jedis, record.topic(), record.partition(), record.offset());
// 手动提交偏移量到 Kafka
consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));
}
}
} finally {
consumer.close();
jedis.close();
}
}
private static Map<TopicPartition, Long> getOffsetsFromRedis(Jedis jedis, String topic, String groupId) {
Map<TopicPartition, Long> offsets = new HashMap<>();
for (String key : jedis.keys("offset-" + topic + "-*")) {
String[] parts = key.split("-");
int partition = Integer.parseInt(parts[parts.length - 1]);
long offset = Long.parseLong(jedis.get(key));
offsets.put(new TopicPartition(topic, partition), offset);
}
return offsets;
}
private static void saveOffsetInRedis(Jedis jedis, String topic, int partition, long offset) {
String key = "offset-" + topic + "-" + partition;
jedis.set(key, String.valueOf(offset));
}
}
总结
总得来说,关于消息可靠性的保证,需要从生产端、服务端、消费端三个方面综合考虑,不是仅仅一个方面的问题。
拓展
Kafka 官方常用工具
# 查看某个topic的message数量
$ ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_topic
# 查看consumer Group列表
$ ./kafka-consumer-groups.sh --list --bootstrap-server 192.168.88.108:9092
# 查看 offset 消费情况
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-1152 --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-1152 test_topic 0 - 4 - consumer-console-consumer-1152-1-2703ea2b-b62d-4cfd-8950-34e8c321b942 /127.0.0.1 consumer-console-consumer-1152-1
Kafka 日志刷盘机制
# 推荐采用默认值,即不配置该配置,交由操作系统自行决定何时落盘,以提升性能。
# 针对 broker 配置:
log.flush.interval.messages=10000 # 日志落盘消息条数间隔,即每接收到一定条数消息,即进行log落盘。
log.flush.interval.ms=1000 # 日志落盘时间间隔,单位ms,即每隔一定时间,即进行log落盘。
# 针对 topic 配置:
flush.messages.flush.ms=1000 # topic下每1s刷盘
flush.messages=1 # topic下每个消息都落盘
# 查看 Linux 后台线程执行配置
$ sysctl -a | grep dirty
vm.dirty_background_bytes = 0
vm.dirty_background_ratio = 10 # 表示当脏页占总内存的的百分比超过这个值时,后台线程开始刷新脏页。
vm.dirty_bytes = 0
vm.dirty_expire_centisecs = 3000 # 表示脏数据多久会被刷新到磁盘上(30秒)。
vm.dirty_ratio = 20
vm.dirty_writeback_centisecs = 500 # 表示多久唤醒一次刷新脏页的后台线程(5秒)。
vm.dirtytime_expire_seconds = 43200