消息积压处理策略
凌晨2点,监控告警响起:Kafka消费严重积压,lag超过100万条。
开发同学从床上爬起来,打开监控面板一看,发现是某个Consumer处理一条消息要5秒——因为上游服务接口超时,导致每条消息都在等待。
更糟糕的是,这个Consumer所在的Consumer Group只有2个实例,而Topic有8个分区。
消息积压越来越严重,每小时增长50万条,照这个速度,明天早上就能积压1000万条。
【面试官心理】
这道题我考察的是候选人对MQ生产故障的处理能力。消息积压是MQ使用中最常见的事故之一,90%的候选人知道"增加消费者",但90%说不清"什么时候该扩容"、"扩容后数据怎么不乱"、"怎么避免重复消费"。能完整处理消息积压问题的,是有生产运维经验的P6+。
一、核心问题:消息积压的原因分析 🔴
1.1 问题拆解
第一层:识别问题
面试官问:"消息积压是什么?怎么判断有没有积压?"
候选人答:"consumer lag变大,消息堆积..."
考察点:基本认知
第二层:原因分析
面试官追问:"积压的原因有哪些?怎么定位是哪个环节的问题?"
候选人答:...(拉开点1)
考察点:排查能力
第三层:应急处理
面试官追问:"线上积压了100万条,怎么快速处理?"
候选人答:...(核心拉开点)
考察点:应急处理能力
第四层:根治方案
面试官追问:"处理完之后怎么防止再积压?"
候选人答:...(P7区分点)
考察点:系统性思维
1.2 错误示范
候选人原话 A:"消息积压就增加消费者,简单。"
问题诊断:
- 增加消费者能解决问题,但不是所有场景都有效
- 如果Consumer处理慢是瓶颈,加再多Consumer也没用
- 不诊断根因就扩容是乱弹琴
候选人原话 B:"积压了就等它慢慢消费完,不着急。"
问题诊断:
- 积压意味着下游状态和上游严重不一致
- 业务时效性要求高的场景(如秒杀),积压会导致大量超时
- 缺乏紧迫感
候选人原话 C:"消费者挂了,重启就好了。"
问题诊断:
- 如果不重启,消费者会继续积压
- 如果重启,要小心offset提交问题(已消费的消息不能重复)
- 不分析挂的原因,下次还会挂
1.3 标准回答
消息积压的本质:消费速度跟不上生产速度
积压公式:
积压量 = 生产速度 × 时间 - 消费速度 × 时间
当 积压量 > 0 时,消息持续堆积
临界点:
消费速度 >= 生产速度 → 积压量不再增长(但也不会下降)
消费速度 > 生产速度 → 积压量开始下降
积压的五大原因:
原因1:Consumer 挂了
症状:
- Consumer Lag瞬间飙升到积压总量
- Consumer Group的成员数量减少
- 其他Consumer的Lag也飙升(因为接管了原本该接管的分区)
排查命令(Kafka):
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group consumer-group --describe
# 输出中查看:
# CONSUMERS: 成员数量
# LAG: 当前积压量
# STATUS: Consumer状态(Stable/PreparingRebalance等)
原因2:Consumer 处理速度慢
症状:
- Consumer Lag缓慢增长
- Consumer Group成员数量正常
- 所有Consumer的Lag都在涨
常见原因:
1. 外部服务调用超时(如数据库慢查询、RPC超时)
2. 单条消息处理逻辑过重(如复杂的业务计算)
3. Consumer配置不当(如max.poll.records设太大)
4. 消费者线程池配置不当(并发数不足)
原因3:生产者速度过快
症状:
- 消息积压突然增加
- Consumer处理速度正常
- 可能是营销活动、突发流量
排查:
- 查看生产端日志,看是否有异常流量
- 查看上游服务的调用量
- 分析消息的产生规律
原因4:消息格式/反序列化问题
症状:
- Consumer消费报错(反序列化异常)
- 大量消息消费失败,进入重试
- 重试消息和正常消息混在一起
排查:
- Consumer日志中搜索异常关键字
- 查看死信队列(DLQ)是否有大量消息
原因5:消费者 Rebalance 风暴
症状:
- Consumer Lag周期性增长
- Consumer Group状态频繁变化
- 多个Consumer反复加入/离开
原因:
- session.timeout.ms设得太小
- Consumer处理超时(超过max.poll.interval.ms)
- 网络抖动导致心跳丢失
【面试官心理】
积压原因的分析是这道题的核心。我会追问:"如果consumer_lag=100万,你怎么定位是哪条消息导致的积压?"能说出"从消费最慢的那条消息入手"、"分析单条消息的处理时间分布"的P6+。能说出"用分布式追踪定位慢消息"的P7。
1.4 追问升级
P6/P7 差距拉开点:
面试官问:"Consumer重启后,积压的消息会重复消费吗?怎么避免?"
这道题的分水岭:
- P5:不知道会不会重复
- P6:知道用手动提交offset避免重复消费
- P7:能说出"重启前记录消费进度"、"幂等消费"、"分批处理积压消息"的完整方案
二、延伸问题:积压的应急处理 🟡
2.1 紧急扩容:三步走
第一步:快速扩容消费者
# 扩容前:2个Consumer,8个分区
# 每个Consumer平均消费4个分区
# 扩容后:8个Consumer,8个分区
# 每个Consumer平均消费1个分区
# 消费能力理论上提升4倍
# 但扩容过程中会触发Rebalance!
# 建议:用StickyAssignor,减少Rebalance影响
// 如果是Kafka,扩容Consumer实例
// 注意:Consumer数不能超过Partition数
// 如果超过了,多余的Consumer会空转
// 查看当前分区数
kafka-topics.sh --describe --topic order-events --bootstrap-server kafka:9092
// 如果分区数不够,先增加分区
kafka-topics.sh --alter --topic order-events \
--partitions 16 --bootstrap-server kafka:9092
第二步:限流生产端
// 生产端限流:降低生产速度,为消费端争取时间
// 方案1:降低发送频率
Thread.sleep(100); // 每条消息间隔100ms
// 方案2:使用令牌桶限流
RateLimiter limiter = RateLimiter.create(1000); // 每秒1000条
for (Message msg : messages) {
limiter.acquire();
producer.send(msg);
}
// 方案3:直接暂停生产(极端情况)
producer.pause(); // 暂停发送
// 处理完积压后
producer.resume(); // 恢复发送
第三步:优化消费逻辑
// 积压期间,临时优化Consumer配置
// 1. 增加每次拉取的消息数量
props.put("max.poll.records", "2000"); // 默认500条
// 2. 减少poll间隔
props.put("fetch.min.bytes", "1"); // 有消息就拉
// 3. 关闭部分校验逻辑(积压期间降级)
// 积压处理优先,不做详细的业务校验
// 4. 使用批量处理
List<Message> batch = new ArrayList<>();
while ((msg = consumer.poll(100)) != null) {
batch.add(msg);
if (batch.size() >= 100) {
processBatch(batch); // 批量处理
batch.clear();
}
}
💡
扩容不是银弹。如果Consumer处理慢的根因没解决,扩容只能缓解症状。比如,如果每个Consumer调用同一个慢接口,扩容10倍不会让接口变快,反而可能把上游服务打挂。
2.2 积压消息的特殊处理
场景:积压的消息有大量是超时无效的(如订单已取消),不需要处理。
// 方案:优先消费"有价值"的消息,跳过"无效"消息
while (true) {
List<MessageExt> msgs = consumer.poll(1000);
// 按消息年龄分组
Map<Long, List<MessageExt>> grouped =
msgs.stream().collect(Collectors.groupingBy(msg -> {
return (System.currentTimeMillis() - msg.getBornTimestamp()) / 60000;
}));
// 优先处理最近的消息(可能还在时效内)
grouped.keySet().stream().sorted()
.forEach(age -> {
for (MessageExt msg : grouped.get(age)) {
if (isStillValid(msg)) {
processMessage(msg);
} else {
skipMessage(msg); // 跳过无效消息
}
}
});
}
三、生产避坑:积压的预防机制
3.1 坑一:Consumer处理失败导致无限重试
场景:某条消息处理失败后无限重试,导致后续消息全部卡住。
问题:
ConsumeConcurrentlyContext.setDelayLevelWhenQueueConsumeSpan(3000);
→ 消费失败后延迟3秒重试
→ 如果这条消息一直失败(死循环、格式错误)
→ 这条消息会一直重试,阻塞后续消息
影响:
顺序消费模式下,当前Queue的消息全部卡住
并行消费模式下,如果重试队列满了,新消息也无法处理
解决方案:
// 1. 设置重试上限
int maxRetry = 3;
int retryCount = getRetryCount(msg.getMsgId()); // 从Redis/DB获取重试次数
if (retryCount >= maxRetry) {
// 超过重试次数,投递到死信队列(DLQ)
sendToDeadLetterQueue(msg);
return ConsumeStatus.SUCCESS; // 返回成功,不再重试
}
// 2. 区分可重试和不可重试的异常
try {
processMessage(msg);
} catch (TransientException e) {
// 临时异常(网络、数据库),可以重试
return ConsumeStatus.RECONSUME_LATER;
} catch (FatalException e) {
// 致命异常(数据格式、业务错误),直接投递DLQ
sendToDeadLetterQueue(msg);
return ConsumeStatus.SUCCESS;
}
3.2 坑二:消费者扩容后消息乱序
场景:扩容后,部分消息被多个Consumer重复消费。
问题:
Consumer-1 正在处理消息A
Consumer-1 被判定为离线(Rebalance发生)
Consumer-2 接管了原本属于Consumer-1的分区
Consumer-2 重新消费了消息A
→ 消息A被消费了两次
解决方案:
// 1. 幂等消费:业务层去重
public void processMessage(Message msg) {
String msgId = msg.getMsgId();
String key = "consumed:msg:" + msgId;
// Redis SETNX 去重
boolean alreadyConsumed = redis.setIfAbsent(key, "1",
Duration.ofDays(7));
if (alreadyConsumed) {
doProcess(msg);
} else {
log.info("消息已处理过,跳过: {}", msgId);
}
}
// 2. 扩容时避免Rebalance
// 使用StickyAssignor
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.StickyAssignor");
// 或者:扩容时先暂停消费,处理完积压再扩容
3.3 坑三:消息堆积导致磁盘爆满
场景:Kafka积压太久,磁盘空间不足。
问题:
Kafka retention默认7天
如果积压超过7天,旧消息会被删除
但如果Consumer完全停摆了,消息会一直堆积
Kafka磁盘空间计算:
每条消息约 1KB
100万消息 = 1GB
峰值10万QPS积压1小时 = 360GB
磁盘必须预留足够空间
处理方案:
# 1. 查看Broker磁盘使用情况
kafka-log-dirs.sh --describe \
--bootstrap-server kafka:9092 \
--topic-list order-events
# 2. 如果磁盘紧张,可以临时调整retention
# 但这会导致旧消息被删除,可能丢失数据
kafka-configs.sh --alter \
--bootstrap-server kafka:9092 \
--topic order-events \
--add-config retention.bytes=1073741824 # 限制单Partition大小
# 3. 扩容磁盘(长期方案)
四、工程选型:积压监控与预防体系
4.1 核心监控指标
4.2 告警配置
# Prometheus + AlertManager 告警规则
groups:
- name: kafka-backlog
rules:
- alert: ConsumerLagHigh
expr: kafka_consumer_lag_sum > 100000
for: 5m
labels:
severity: critical
annotations:
summary: "Kafka消费严重积压,lag={{ $value }}"
- alert: ConsumerLagGrowing
expr: increase(kafka_consumer_lag_sum[5m]) > 50000
for: 1m
labels:
severity: warning
annotations:
summary: "Kafka积压增长速度过快"
- alert: ConsumerProcessSlow
expr: kafka_consumer_fetch_manager_records_consumed_total / kafka_consumer_fetch_manager_poll_total < 100
for: 5m
labels:
severity: warning
annotations:
summary: "Consumer处理速度异常慢"
4.3 积压预防 checklist
日常巡检:
[ ] 每日检查各Consumer Group的Lag
[ ] 监控Consumer处理时间的P99
[ ] 检查Consumer Group成员数量是否有异常波动
容量规划:
[ ] 预估峰值QPS,提前扩容
[ ] Consumer数量 = Partition数量(或略少)
[ ] 预留20%的消费余量
高可用设计:
[ ] Consumer部署多个实例(至少2个)
[ ] Consumer异常退出时自动拉起
[ ] 设置合理的session.timeout和heartbeat
快速响应:
[ ] 积压告警7x24小时通知
[ ] 制定积压应急响应SOP
[ ] 定期演练扩容流程
【面试官心理】
面试这道题,我会问一个终极问题:"如果积压了1000万条消息,你怎么设计一个紧急处理方案?"能说出"快速扩容 + 限流生产端 + 优先处理有效消息 + 幂等消费 + 事后复盘"的P6+。能进一步说出"如何避免对上游服务造成冲击"、"如何保证业务正确性不受影响"的,是有全局视野的P7。