消息积压处理
2020年双十一凌晨1点,某电商平台的订单取消服务突然失效:用户取消订单后,优惠券没有返还,积分没有扣除。
技术团队排查后发现:订单取消消息在MQ中积压了10万条,最早的消息已经等待了30分钟。
根因分析:优惠券服务在凌晨0点进行了一次代码发布,新代码里有个bug导致优惠券处理失败,失败消息被重新放入队列,导致队列持续积压。
更严重的是:重试机制导致积压越来越多,形成了"死亡螺旋"。
这次积压影响了约5万个订单的取消处理,用户投诉爆增。
【面试官手记】
消息积压是MQ系统最常见的问题之一。我面试过的候选人里,能说清楚"积压原因"的不超过30%,能说清楚"处理积压"方法的不超过20%。消息积压的关键是快速发现 + 正确处理 + 根因修复。
一、消息积压的常见原因 🔴
1.1 五大原因
消息积压的五大原因:
1. 消费者处理慢
- 消费者代码性能问题
- 消费者资源不足(CPU、内存)
- 消费者被限流
- 症状:积压量持续增长
2. 消费者数量不足
- 消费者实例数量少于预期
- 消费者实例宕机
- 消费者被踢出消费组
- 症状:积压量突然增长
3. 消费失败重试
- 消息处理失败被重试
- 重试间隔短,队列持续积压
- 症状:积压量周期性波动
4. 生产速度超过消费速度
- 上游流量突增
- 批量消息集中发送
- 症状:积压量快速增长
5. 消息本身大
- 单条消息体积过大
- 网络传输慢
- 序列化/反序列化慢
- 症状:消费耗时异常高
1.2 量化指标
MQ积压监控指标:
积压量:
- 积压条数:积压的消息数量
- 积压时间:消息在队列中等待的时间
消费速度:
- 消费QPS:每秒消费的消息数
- 生产QPS:每秒生产的消息数
消费耗时:
- 平均消费耗时:每条消息的处理时间
- P99消费耗时:99%消息的处理时间
1.3 面试追问
面试官:消息积压了怎么处理?
候选人:两个方向:一是快速消费积压,二是排查积压原因。
面试官:怎么快速消费积压?
候选人:扩容消费者实例。如果消费者是 stateless 的,可以水平扩容。同时加大消费批次,每批处理更多消息。
【面试官心理】
消息积压的追问通常很务实。能回答出"扩容消费者"的候选人,说明知道基本方法;能说出"加大消费批次"的候选人,说明有优化意识;能说出"死信队列"的候选人,说明知道兜底方案。
二、排查流程 🔴
2.1 Kafka积压查看
# 查看消费组的积压情况
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group> --describe
# 输出示例:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
order-cancel 0 123456789 123566789 110000
order-cancel 1 123456789 123566789 110000
# LAG列就是积压量
2.2 RocketMQ积压查看
# 查看消费组的积压情况
mqadmin consumerProgress -n localhost:9876 -g <group>
# 查看Topic的积压情况
mqadmin topicStatus -n localhost:9876 -t <topic>
2.3 RabbitMQ积压查看
# 查看队列消息数量
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
# 查看消费者数量
rabbitmqctl list_queues name consumers
三、积压处理方法 🟡
3.1 扩容消费者
# Kubernetes水平扩容
kubectl scale deployment order-consumer --replicas=10
# 或调整消费者线程池
spring:
rabbitmq:
listener:
simple:
concurrency: 10 # 核心消费者数
max-concurrency: 20 # 最大消费者数
prefetch: 10 # 预取消息数
3.2 调整消费策略
// RocketMQ消费者配置
@Bean
public DefaultMQPushConsumer consumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group");
consumer.setConsumeThreadMin(20); // 最小线程
consumer.setConsumeThreadMax(40); // 最大线程
consumer.setConsumeMessageBatchMaxSize(32); // 批量消费大小
consumer.setPullBatchSize(32); // 拉取批次
return consumer;
}
3.3 跳过问题消息
// 跳过无法消费的消息,避免死循环
@RocketMQMessageListener(
consumerGroup = "my-consumer-group",
topic = "my-topic",
maxReconsumeTimes = 3 // 最大重试3次
)
public class MyConsumer implements RocketMQListener<OrderMessage> {
@Override
public void onMessage(OrderMessage message) {
try {
processOrder(message);
} catch (Exception e) {
// 记录日志,跳过继续处理
log.error("处理订单失败,消息ID: {}", message.getMsgId(), e);
// 抛异常触发重试,或直接ack跳过
}
}
}
四、死信队列处理 🟡
4.1 死信队列配置
// RocketMQ死信队列
// 消息重试3次后仍失败,进入死信队列
// 死信队列Topic: %DLQ%<ConsumerGroup>
// 死信队列消费
@Bean
public DefaultMQPushConsumer deadLetterConsumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-dlq-consumer-group");
consumer.setConsumeThreadMin(5);
consumer.setConsumeThreadMax(10);
consumer.subscribe("%DLQ%my-consumer-group", "*");
return consumer;
}
4.2 死信消息处理
// 死信队列消费者
public class DeadLetterConsumer {
public void processDeadLetter(Message message) {
// 1. 分析死信原因
String reason = message.getUserProperty("REASON");
log.error("死信原因: {}, 消息: {}", reason, message);
// 2. 根据原因处理
if ("INVALID_MESSAGE".equals(reason)) {
// 无效消息,丢弃或记录
discardMessage(message);
} else if ("PROCESS_FAILED".equals(reason)) {
// 处理失败,可以人工处理
notifyOps(message);
}
// 3. 确认消费
}
}
五、预防措施 🟡
5.1 监控告警
# Prometheus告警规则
groups:
- name: mq-alerts
rules:
- alert: MQBacklogHigh
expr: kafka_consumer_lag_sum > 100000
for: 5m
labels:
severity: critical
annotations:
summary: "消息积压超过10万"
- alert: MQConsumerDown
expr: kafka_consumer_group_count == 0
for: 1m
labels:
severity: critical
annotations:
summary: "消费者实例数为0"
5.2 限流保护
// 生产者限流
@RateLimiter(name = "order-producer", permitsPerSecond = 10000)
public void sendOrderMessage(Order order) {
mqProducer.send(order);
}
// 消费者限流
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrent(10);
factory.setMaxConcurrent(20);
factory.setPrefetchCount(10); // 限流:每次只拿10条
return factory;
}
六、生产避坑 🟡
6.1 消息积压的五大坑
坑1:无限重试导致积压加剧
问题:消费失败后无限重试,积压越来越多
场景:死循环重试
解决方案:
- 设置最大重试次数
- 重试间隔要足够长
- 最终进入死信队列
坑2:消费失败但不抛异常
问题:消费失败但没有抛异常,消息被确认丢失
场景:catch块里只记日志
解决方案:
- 消费失败必须抛异常
- 或者返回CONSUME_SUCCESS以外的状态
坑3:只扩容不优化消费逻辑
问题:扩容消费者,但每条消息处理仍然很慢
场景:消费逻辑本身有问题
解决方案:
- 先优化消费逻辑
- 再扩容
坑4:忽略了消息顺序
问题:提高并发消费,但消息顺序乱了
场景:订单取消需要按顺序处理
解决方案:
- 顺序消息只用一个消费者
- 或者用MessageSelector过滤
坑5:没有监控死信队列
问题:只关注主队列,忽略死信队列
场景:死信队列积压也没发现
解决方案:
- 死信队列也要监控
- 告警规则和主队列一样
七、真实面试回放 🟡
面试官:消息队列积压了怎么处理?
候选人(小王):分两步:紧急处理和根因排查。
紧急处理:扩容消费者实例,加快消费速度。如果消费者是stateless的,可以水平扩容。同时可以调整消费批次大小,每批处理更多消息。
根因排查:看是消费者处理慢,还是消费失败重试。如果消费者慢就优化代码;如果是失败重试,就看日志找原因。
面试官:如果积压太多,一时半会消费不完怎么办?
小王:两个方案:
一是先让新的消息正常消费,老的积压消息慢慢消费。可以用新消费者组,只消费最新的消息。
二是评估影响范围。如果积压的消息已经过期,可以直接丢弃。比如订单取消消息,超过30分钟没处理可以认为无效。
面试官:怎么预防消息积压?
小王:三个措施:
一是监控告警。积压超过阈值(如1万条)就告警。
二是限流保护。生产者限流,不要让流量超过消费能力。
三是熔断降级。消费失败率超过阈值时,暂停消费,防止积压加剧。
【面试官手记】
小王这场面试的亮点:
-
知道紧急处理和根因排查两步走
-
知道新消费者组只消费新消息的方案
-
知道评估过期消息是否需要丢弃
-
知道限流和熔断的预防措施
消息积压是P6工程师必备技能,能完整回答的候选人,说明有实际排查经验。
消息积压处理的核心是快速消费 + 根因修复。记住三个要点:
- 紧急处理:扩容消费者 + 加大批次 + 跳过过期消息
- 根因排查:是消费者慢还是重试导致
- 预防措施:监控告警 + 限流保护 + 死信队列兜底
消息积压的尽头不是无限的扩容,而是找到消费慢的根因。