Kafka 消息丢失与重复消费
双十一活动结束后,订单中心发现了一个严重问题:出现了大量重复订单。
开发团队排查了三天,发现罪魁祸首是一个Kafka消费者——在Rebalance期间,部分消息被重复消费了。
更糟糕的是,由于消费者使用了自动提交offset,Rebalance前处理的消息在offset提交后丢失了——这两件事同时发生了。
这是一个Kafka消息可靠性的经典"组合拳"翻车事故。
【面试官心理】
这道题我用来测试候选人对Kafka可靠性问题的系统理解。90%的候选人能说出"消息丢失"或"重复消费"其中一个,但说不清两者的因果关系——消息丢失和重复消费往往是一枚硬币的两面,很多"防丢失"的方案反而会加剧"重复消费",反之亦然。能从生产端、Broker端、消费端三端完整分析可靠性的,是有实战经验的P6+。
一、核心问题:消息丢失的三端分析 🔴
1.1 问题拆解
第一层:场景认知
面试官问:"Kafka会出现消息丢失吗?在什么环节会丢?"
候选人答:"...网络不好的时候?Broker挂的时候?"
考察点:基本认知
第二层:三端分析
面试官追问:"消息从生产到消费,经过了哪些环节?每个环节都可能丢吗?"
候选人答:...(拉开点1)
考察点:全链路思维
第三层:解决方案
面试官追问:"怎么防止消息丢失?生产端、Broker端、消费端分别怎么配置?"
候选人答:...(拉开点2)
考察点:工程实践
第四层:重复消费的关联
面试官追问:"防止丢失的配置会导致重复消费吗?丢失和重复怎么平衡?"
候选人答:...(P7区分点)
考察点:Trade-off思维
1.2 错误示范
候选人原话 A:"Kafka消息不会丢,因为它有副本机制。"
问题诊断:
- 副本机制不等于不丢消息
- 如果写入时只等Leader确认,Follower还没同步完就宕机,数据照样丢
- 说出这话说明对ACK机制完全不了解
候选人原话 B:"把acks=all和retries设大就不会丢消息了。"
问题诊断:
- 知道用
acks=all是对的,但不理解其代价
- 忽略了Broker端和Consumer端的配置
- 没有形成完整的可靠性体系思维
候选人原话 C:"消费者处理完后手动提交offset就行了。"
问题诊断:
- 手动提交只能保证Consumer端不丢
- 如果在处理前就提交了offset,处理失败也会丢
- 不知道先处理再提交的基本原则
1.3 标准回答
消息丢失分布在三个环节,每个环节的根因和解决方案都不同:
第一端:生产端丢消息
触发条件:acks=0 + 网络抖动
场景:
Producer.send(record) → Broker没收到 → 网络抖动丢包
→ Fire-and-Forget,发出去就不管了
实际场景:
订单服务发消息 → 网络抖动 → Kafka没收到
→ 订单已创建,但MQ里没有
→ 下游系统收不到通知,订单状态不一致
// 生产端防丢失配置
props.put("acks", "all"); // 等待所有ISR副本确认
props.put("retries", Integer.MAX_VALUE); // 无限重试
props.put("retry.backoff.ms", "1000"); // 重试间隔1秒
props.put("enable.idempotence", "true"); // 幂等,防止重复
第二端:Broker端丢消息
触发条件:acks=1 + Leader宕机
场景:
Producer发消息 → Leader写入成功返回ACK
→ Follower还没同步 → Leader宕机
→ 新Leader没有这条消息 → 消息丢失
触发条件2:unclean leader election
所有ISR副本都挂了
unclean.leader.election.enable=true
→ 从非ISR的落后副本选Leader
→ 落后副本丢失的数据成为新Leader的数据
→ 消息彻底丢失
# Broker端防丢失配置
default.replication.factor=3 # 副本数3
min.insync.replicas=2 # ISR最少2个
unclean.leader.election.enable=false # 不允许从不完整副本选Leader
⚠️
Broker端丢消息是最隐蔽的——配置看起来都对,Leader也正常,但一旦Leader切换,数据就不完整了。unclean.leader.election.enable=false 是必须配置的关键项,但90%的团队会忽略它。
第三端:消费端丢消息
触发条件:自动提交offset + Consumer处理失败
场景:
Consumer.poll() → 拉取消息
→ 自动提交offset(偏移量已提交)
→ 处理消息 → 异常!
→ 消息没处理成功,但offset已经提交
→ Consumer重启后从新offset开始消费
→ 旧offset之后的消息被跳过 → 消息丢失
更隐蔽的场景:
Consumer.poll() → 拉取1000条消息
→ 处理到第500条时宕机
→ 自动提交每5秒一次,已经提交了offset=500
→ 500条消息丢失
// 消费端防丢失配置
props.put("enable.auto.commit", "false"); // 关闭自动提交
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
processMessage(record);
consumer.commitSync(); // 处理成功后手动提交
} catch (Exception e) {
// 方案1:不提交offset,下次重新消费这条消息
// 方案2:记录失败消息,人工/定时重试
handleFailure(record);
}
}
}
【面试官心理】
三端丢消息的问题,我最关心的是候选人能不能串起来讲——不只是知道"怎么防",更要知道"为什么这样防"。能说出"防止丢失的配置会导致重复消费,而重复消费需要幂等兜底"这个因果链的,说明有全局思维。
1.4 追问升级
P6/P7 差距拉开点:
面试官问:"如果我既要防止丢失,又要避免重复消费,能同时做到吗?"
这道题的分水岭:
- P5:不能同时做到,丢失和重复是矛盾的
- P6:能说出"理论上可以,用Exactly-Once",但说不清具体实现
- P7:能说出Kafka的Exactly-Once只解决Producer端问题,Consumer端仍需幂等;能解释"本地事务+消息处理"的原子性方案
二、延伸问题:重复消费的根源与处理 🟡
2.1 重复消费的三大场景
场景一:Producer 重试导致重复发送
触发路径:
Producer.send() → Broker收到 → 返回ACK时网络超时
→ Producer重试 → Broker再次收到同一条消息
→ 消息被写入两次
PID+SequenceNumber 解决:
每个Producer有唯一PID
每条消息带Sequence Number
Broker记录每个PID+Partition的最新序列号
重复消息被识别并丢弃
场景二:Consumer Rebalance 导致重复消费
触发路径:
Consumer消费到第100条消息时,处理中
→ Consumer心跳超时,被踢出Consumer Group
→ Rebalance发生
→ 新Consumer从offset=0开始消费
→ 第0~100条消息被重新消费
这是Rebalance最讨厌的地方:
- 消费者正在处理的消息被打断
- 新消费者从头开始
→ 重复消费几乎不可避免
场景三:Consumer 重启后从头消费
触发路径:
Consumer上次消费到offset=5000
→ 重启
→ 自动提交offset设置了5000
→ 但业务处理是异步的,处理到offset=4800时宕机
→ 实际处理到了4800,但提交的是5000
→ 重启后从5000开始,4800~4999被跳过
手动提交offset的情况下:
重启后从最后一次提交的offset继续
→ 如果上次提交的是offset=4800(处理成功),正常
→ 如果上次提交的是offset=5000(处理失败),消息丢失
2.2 幂等消费者的实现
Consumer端防重复消费的核心是业务幂等:
// 方案1:Redis去重(推荐,适用于高频场景)
public void processMessage(ConsumerRecord<String, String> record) {
String msgId = record.key(); // 消息的唯一ID
String redisKey = "msg:processed:" + msgId;
// SETNX + TTL,防止重复处理
boolean alreadyProcessed = redis.setIfAbsent(redisKey, "1",
java.time.Duration.ofDays(7));
if (alreadyProcessed) {
// 第一次处理,正常执行
doProcess(record);
} else {
// 已处理过,跳过
log.info("消息已处理过,跳过: {}", msgId);
}
}
// 方案2:数据库唯一键去重(适用于低频场景)
public void processMessage(OrderMessage msg) {
try {
// 尝试插入,如果唯一键冲突则忽略
orderMapper.insertIgnore(msg.getOrderId(), msg.getContent());
} catch (DuplicateKeyException e) {
// 唯一键冲突,说明已处理过,跳过
log.info("订单已处理过,跳过: {}", msg.getOrderId());
}
}
// 方案3:状态机校验(适用于有明确状态流转的场景)
public void processMessage(OrderMessage msg) {
Order order = orderMapper.selectById(msg.getOrderId());
// 状态机校验:只有从"待支付"才能流转到"已支付"
if ("待支付".equals(order.getStatus()) && "支付".equals(msg.getAction())) {
orderMapper.updateStatus(msg.getOrderId(), "已支付");
} else {
log.info("状态不符合,跳过: orderId={}, status={}, action={}",
msg.getOrderId(), order.getStatus(), msg.getAction());
}
}
💡
幂等方案的选择原则:
- 高频消息(每秒>1000条):用Redis,内存操作,延迟低
- 低频消息(每秒
<100条):用数据库唯一键,实现简单
- 有状态流转的消息:用状态机,保证业务逻辑正确
三、生产避坑:丢消息和重复消费的经典组合拳
3.1 坑一:acks=all + 自动提交 导致"既丢又重"
场景:生产端配置了acks=all,消费端用了enable.auto.commit=true。
发生了什么:
1. Consumer poll() 拉取1000条消息
2. 自动提交offset=1000(每秒一次)
3. 处理到第500条时宕机
4. 重启后从offset=1000继续
5. offset 500~999的消息丢失!
同时,Broker端acks=all:
6. Producer发消息到3个副本,都确认了
7. 但ISR切换时,未完全同步的消息被截断
8. Consumer收到的是截断后的消息
→ 部分消息丢失,部分消息重复
正确做法:
- 消费端必须用手动提交
- 先处理业务,再提交offset
- 处理失败不提交offset,下次重新消费
3.2 坑二:幂等方案选错导致性能问题
场景:高频消息(每秒10万条),用数据库唯一键做幂等。
问题:
- 每次消息处理都要插入数据库
- 数据库成为瓶颈,吞吐量骤降
- 甚至可能出现数据库连接池耗尽
解决方案:
- Redis SETNX 代替数据库插入
- 批量去重:先收集1000条消息,再批量查Redis
- 或者使用Kafka本身提供的幂等producer
四、工程选型:可靠性配置的工程代价
4.1 场景化配置矩阵
4.2 Exactly-Once 的真相
Kafka的Exactly-Once语义需要三重保障:
Exactly-Once = 幂等生产者(防重复发送)
+ 事务(原子性保证)
+ 消费者幂等(防重复消费)
代价:
- 吞吐量下降 30%~50%(事务开销)
- 端到端延迟增加(事务需要协调)
- 运维复杂度提升(事务日志管理)
所以:
日志采集场景:用At-Least-Once + 消费者幂等就够了
交易链路场景:用Exactly-Once + 幂等消费者
【面试官心理】
面试这道题,我最想听的是候选人对"可靠性不是免费的"的理解。能说出"提高可靠性会降低吞吐、增加延迟、提升复杂度",能根据业务场景选择合适配置的,是有工程判断力的P6+。只会背配置参数、不知道权衡取舍的,是典型的P5。