消息积压处理

2020年双十一凌晨1点,某电商平台的订单取消服务突然失效:用户取消订单后,优惠券没有返还,积分没有扣除。

技术团队排查后发现:订单取消消息在MQ中积压了10万条,最早的消息已经等待了30分钟。

根因分析:优惠券服务在凌晨0点进行了一次代码发布,新代码里有个bug导致优惠券处理失败,失败消息被重新放入队列,导致队列持续积压。

更严重的是:重试机制导致积压越来越多,形成了"死亡螺旋"。

这次积压影响了约5万个订单的取消处理,用户投诉爆增。

【面试官手记】

消息积压是MQ系统最常见的问题之一。我面试过的候选人里,能说清楚"积压原因"的不超过30%,能说清楚"处理积压"方法的不超过20%。消息积压的关键是快速发现 + 正确处理 + 根因修复

一、消息积压的常见原因 🔴

1.1 五大原因

消息积压的五大原因:

1. 消费者处理慢
   - 消费者代码性能问题
   - 消费者资源不足(CPU、内存)
   - 消费者被限流
   - 症状:积压量持续增长

2. 消费者数量不足
   - 消费者实例数量少于预期
   - 消费者实例宕机
   - 消费者被踢出消费组
   - 症状:积压量突然增长

3. 消费失败重试
   - 消息处理失败被重试
   - 重试间隔短,队列持续积压
   - 症状:积压量周期性波动

4. 生产速度超过消费速度
   - 上游流量突增
   - 批量消息集中发送
   - 症状:积压量快速增长

5. 消息本身大
   - 单条消息体积过大
   - 网络传输慢
   - 序列化/反序列化慢
   - 症状:消费耗时异常高

1.2 量化指标

MQ积压监控指标:

积压量:
- 积压条数:积压的消息数量
- 积压时间:消息在队列中等待的时间

消费速度:
- 消费QPS:每秒消费的消息数
- 生产QPS:每秒生产的消息数

消费耗时:
- 平均消费耗时:每条消息的处理时间
- P99消费耗时:99%消息的处理时间

1.3 面试追问

面试官:消息积压了怎么处理?

候选人:两个方向:一是快速消费积压,二是排查积压原因。

面试官:怎么快速消费积压?

候选人:扩容消费者实例。如果消费者是 stateless 的,可以水平扩容。同时加大消费批次,每批处理更多消息。

【面试官心理】

消息积压的追问通常很务实。能回答出"扩容消费者"的候选人,说明知道基本方法;能说出"加大消费批次"的候选人,说明有优化意识;能说出"死信队列"的候选人,说明知道兜底方案。

二、排查流程 🔴

2.1 Kafka积压查看

# 查看消费组的积压情况
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group> --describe

# 输出示例:
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-cancel    0          123456789       123566789       110000
order-cancel    1          123456789       123566789       110000

# LAG列就是积压量

2.2 RocketMQ积压查看

# 查看消费组的积压情况
mqadmin consumerProgress -n localhost:9876 -g <group>

# 查看Topic的积压情况
mqadmin topicStatus -n localhost:9876 -t <topic>

2.3 RabbitMQ积压查看

# 查看队列消息数量
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged

# 查看消费者数量
rabbitmqctl list_queues name consumers

三、积压处理方法 🟡

3.1 扩容消费者

# Kubernetes水平扩容
kubectl scale deployment order-consumer --replicas=10

# 或调整消费者线程池
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 10        # 核心消费者数
        max-concurrency: 20    # 最大消费者数
        prefetch: 10           # 预取消息数

3.2 调整消费策略

// RocketMQ消费者配置
@Bean
public DefaultMQPushConsumer consumer() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group");
    consumer.setConsumeThreadMin(20);   // 最小线程
    consumer.setConsumeThreadMax(40);  // 最大线程
    consumer.setConsumeMessageBatchMaxSize(32);  // 批量消费大小
    consumer.setPullBatchSize(32);      // 拉取批次
    return consumer;
}

3.3 跳过问题消息

// 跳过无法消费的消息,避免死循环
@RocketMQMessageListener(
    consumerGroup = "my-consumer-group",
    topic = "my-topic",
    maxReconsumeTimes = 3  // 最大重试3次
)
public class MyConsumer implements RocketMQListener<OrderMessage> {

    @Override
    public void onMessage(OrderMessage message) {
        try {
            processOrder(message);
        } catch (Exception e) {
            // 记录日志,跳过继续处理
            log.error("处理订单失败,消息ID: {}", message.getMsgId(), e);
            // 抛异常触发重试,或直接ack跳过
        }
    }
}

四、死信队列处理 🟡

4.1 死信队列配置

// RocketMQ死信队列
// 消息重试3次后仍失败,进入死信队列
// 死信队列Topic: %DLQ%<ConsumerGroup>

// 死信队列消费
@Bean
public DefaultMQPushConsumer deadLetterConsumer() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-dlq-consumer-group");
    consumer.setConsumeThreadMin(5);
    consumer.setConsumeThreadMax(10);
    consumer.subscribe("%DLQ%my-consumer-group", "*");
    return consumer;
}

4.2 死信消息处理

// 死信队列消费者
public class DeadLetterConsumer {

    public void processDeadLetter(Message message) {
        // 1. 分析死信原因
        String reason = message.getUserProperty("REASON");
        log.error("死信原因: {}, 消息: {}", reason, message);

        // 2. 根据原因处理
        if ("INVALID_MESSAGE".equals(reason)) {
            // 无效消息,丢弃或记录
            discardMessage(message);
        } else if ("PROCESS_FAILED".equals(reason)) {
            // 处理失败,可以人工处理
            notifyOps(message);
        }

        // 3. 确认消费
    }
}

五、预防措施 🟡

5.1 监控告警

# Prometheus告警规则
groups:
- name: mq-alerts
  rules:
  - alert: MQBacklogHigh
    expr: kafka_consumer_lag_sum > 100000
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "消息积压超过10万"

  - alert: MQConsumerDown
    expr: kafka_consumer_group_count == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "消费者实例数为0"

5.2 限流保护

// 生产者限流
@RateLimiter(name = "order-producer", permitsPerSecond = 10000)
public void sendOrderMessage(Order order) {
    mqProducer.send(order);
}

// 消费者限流
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrent(10);
    factory.setMaxConcurrent(20);
    factory.setPrefetchCount(10);  // 限流:每次只拿10条
    return factory;
}

六、生产避坑 🟡

6.1 消息积压的五大坑

坑1:无限重试导致积压加剧

问题:消费失败后无限重试,积压越来越多
场景:死循环重试
解决方案:
- 设置最大重试次数
- 重试间隔要足够长
- 最终进入死信队列

坑2:消费失败但不抛异常

问题:消费失败但没有抛异常,消息被确认丢失
场景:catch块里只记日志
解决方案:
- 消费失败必须抛异常
- 或者返回CONSUME_SUCCESS以外的状态

坑3:只扩容不优化消费逻辑

问题:扩容消费者,但每条消息处理仍然很慢
场景:消费逻辑本身有问题
解决方案:
- 先优化消费逻辑
- 再扩容

坑4:忽略了消息顺序

问题:提高并发消费,但消息顺序乱了
场景:订单取消需要按顺序处理
解决方案:
- 顺序消息只用一个消费者
- 或者用MessageSelector过滤

坑5:没有监控死信队列

问题:只关注主队列,忽略死信队列
场景:死信队列积压也没发现
解决方案:
- 死信队列也要监控
- 告警规则和主队列一样

七、真实面试回放 🟡

面试官:消息队列积压了怎么处理?

候选人(小王):分两步:紧急处理和根因排查。

紧急处理:扩容消费者实例,加快消费速度。如果消费者是stateless的,可以水平扩容。同时可以调整消费批次大小,每批处理更多消息。

根因排查:看是消费者处理慢,还是消费失败重试。如果消费者慢就优化代码;如果是失败重试,就看日志找原因。

面试官:如果积压太多,一时半会消费不完怎么办?

小王:两个方案:

一是先让新的消息正常消费,老的积压消息慢慢消费。可以用新消费者组,只消费最新的消息。

二是评估影响范围。如果积压的消息已经过期,可以直接丢弃。比如订单取消消息,超过30分钟没处理可以认为无效。

面试官:怎么预防消息积压?

小王:三个措施:

一是监控告警。积压超过阈值(如1万条)就告警。

二是限流保护。生产者限流,不要让流量超过消费能力。

三是熔断降级。消费失败率超过阈值时,暂停消费,防止积压加剧。

【面试官手记】

小王这场面试的亮点:

  1. 知道紧急处理和根因排查两步走

  2. 知道新消费者组只消费新消息的方案

  3. 知道评估过期消息是否需要丢弃

  4. 知道限流和熔断的预防措施

消息积压是P6工程师必备技能,能完整回答的候选人,说明有实际排查经验。

消息积压处理的核心是快速消费 + 根因修复。记住三个要点:

  1. 紧急处理:扩容消费者 + 加大批次 + 跳过过期消息
  2. 根因排查:是消费者慢还是重试导致
  3. 预防措施:监控告警 + 限流保护 + 死信队列兜底

消息积压的尽头不是无限的扩容,而是找到消费慢的根因。