消息积压处理策略

凌晨2点,监控告警响起:Kafka消费严重积压,lag超过100万条。

开发同学从床上爬起来,打开监控面板一看,发现是某个Consumer处理一条消息要5秒——因为上游服务接口超时,导致每条消息都在等待。

更糟糕的是,这个Consumer所在的Consumer Group只有2个实例,而Topic有8个分区。

消息积压越来越严重,每小时增长50万条,照这个速度,明天早上就能积压1000万条。

【面试官心理】

这道题我考察的是候选人对MQ生产故障的处理能力。消息积压是MQ使用中最常见的事故之一,90%的候选人知道"增加消费者",但90%说不清"什么时候该扩容"、"扩容后数据怎么不乱"、"怎么避免重复消费"。能完整处理消息积压问题的,是有生产运维经验的P6+。

一、核心问题:消息积压的原因分析 🔴

1.1 问题拆解

第一层:识别问题 面试官问:"消息积压是什么?怎么判断有没有积压?" 候选人答:"consumer lag变大,消息堆积..." 考察点:基本认知

第二层:原因分析 面试官追问:"积压的原因有哪些?怎么定位是哪个环节的问题?" 候选人答:...(拉开点1) 考察点:排查能力

第三层:应急处理 面试官追问:"线上积压了100万条,怎么快速处理?" 候选人答:...(核心拉开点) 考察点:应急处理能力

第四层:根治方案 面试官追问:"处理完之后怎么防止再积压?" 候选人答:...(P7区分点) 考察点:系统性思维

1.2 错误示范

候选人原话 A:"消息积压就增加消费者,简单。"

问题诊断

  • 增加消费者能解决问题,但不是所有场景都有效
  • 如果Consumer处理慢是瓶颈,加再多Consumer也没用
  • 不诊断根因就扩容是乱弹琴

候选人原话 B:"积压了就等它慢慢消费完,不着急。"

问题诊断

  • 积压意味着下游状态和上游严重不一致
  • 业务时效性要求高的场景(如秒杀),积压会导致大量超时
  • 缺乏紧迫感

候选人原话 C:"消费者挂了,重启就好了。"

问题诊断

  • 如果不重启,消费者会继续积压
  • 如果重启,要小心offset提交问题(已消费的消息不能重复)
  • 不分析挂的原因,下次还会挂

1.3 标准回答

消息积压的本质:消费速度跟不上生产速度

积压公式:
  积压量 = 生产速度 × 时间 - 消费速度 × 时间

当 积压量 > 0 时,消息持续堆积

临界点:
  消费速度 >= 生产速度 → 积压量不再增长(但也不会下降)
  消费速度 > 生产速度 → 积压量开始下降

积压的五大原因

原因1:Consumer 挂了

症状:
  - Consumer Lag瞬间飙升到积压总量
  - Consumer Group的成员数量减少
  - 其他Consumer的Lag也飙升(因为接管了原本该接管的分区)

排查命令(Kafka):
  kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
    --group consumer-group --describe

  # 输出中查看:
  # CONSUMERS: 成员数量
  # LAG: 当前积压量
  # STATUS: Consumer状态(Stable/PreparingRebalance等)

原因2:Consumer 处理速度慢

症状:
  - Consumer Lag缓慢增长
  - Consumer Group成员数量正常
  - 所有Consumer的Lag都在涨

常见原因:
  1. 外部服务调用超时(如数据库慢查询、RPC超时)
  2. 单条消息处理逻辑过重(如复杂的业务计算)
  3. Consumer配置不当(如max.poll.records设太大)
  4. 消费者线程池配置不当(并发数不足)

原因3:生产者速度过快

症状:
  - 消息积压突然增加
  - Consumer处理速度正常
  - 可能是营销活动、突发流量

排查:
  - 查看生产端日志,看是否有异常流量
  - 查看上游服务的调用量
  - 分析消息的产生规律

原因4:消息格式/反序列化问题

症状:
  - Consumer消费报错(反序列化异常)
  - 大量消息消费失败,进入重试
  - 重试消息和正常消息混在一起

排查:
  - Consumer日志中搜索异常关键字
  - 查看死信队列(DLQ)是否有大量消息

原因5:消费者 Rebalance 风暴

症状:
  - Consumer Lag周期性增长
  - Consumer Group状态频繁变化
  - 多个Consumer反复加入/离开

原因:
  - session.timeout.ms设得太小
  - Consumer处理超时(超过max.poll.interval.ms)
  - 网络抖动导致心跳丢失

【面试官心理】

积压原因的分析是这道题的核心。我会追问:"如果consumer_lag=100万,你怎么定位是哪条消息导致的积压?"能说出"从消费最慢的那条消息入手"、"分析单条消息的处理时间分布"的P6+。能说出"用分布式追踪定位慢消息"的P7。

1.4 追问升级

P6/P7 差距拉开点:

面试官问:"Consumer重启后,积压的消息会重复消费吗?怎么避免?"

这道题的分水岭:

  • P5:不知道会不会重复
  • P6:知道用手动提交offset避免重复消费
  • P7:能说出"重启前记录消费进度"、"幂等消费"、"分批处理积压消息"的完整方案

二、延伸问题:积压的应急处理 🟡

2.1 紧急扩容:三步走

第一步:快速扩容消费者

# 扩容前:2个Consumer,8个分区
# 每个Consumer平均消费4个分区

# 扩容后:8个Consumer,8个分区
# 每个Consumer平均消费1个分区
# 消费能力理论上提升4倍

# 但扩容过程中会触发Rebalance!
# 建议:用StickyAssignor,减少Rebalance影响
// 如果是Kafka,扩容Consumer实例
// 注意:Consumer数不能超过Partition数
// 如果超过了,多余的Consumer会空转

// 查看当前分区数
kafka-topics.sh --describe --topic order-events --bootstrap-server kafka:9092

// 如果分区数不够,先增加分区
kafka-topics.sh --alter --topic order-events \
  --partitions 16 --bootstrap-server kafka:9092

第二步:限流生产端

// 生产端限流:降低生产速度,为消费端争取时间

// 方案1:降低发送频率
Thread.sleep(100);  // 每条消息间隔100ms

// 方案2:使用令牌桶限流
RateLimiter limiter = RateLimiter.create(1000);  // 每秒1000条
for (Message msg : messages) {
    limiter.acquire();
    producer.send(msg);
}

// 方案3:直接暂停生产(极端情况)
producer.pause();  // 暂停发送
// 处理完积压后
producer.resume();  // 恢复发送

第三步:优化消费逻辑

// 积压期间,临时优化Consumer配置

// 1. 增加每次拉取的消息数量
props.put("max.poll.records", "2000");  // 默认500条

// 2. 减少poll间隔
props.put("fetch.min.bytes", "1");  // 有消息就拉

// 3. 关闭部分校验逻辑(积压期间降级)
// 积压处理优先,不做详细的业务校验

// 4. 使用批量处理
List<Message> batch = new ArrayList<>();
while ((msg = consumer.poll(100)) != null) {
    batch.add(msg);
    if (batch.size() >= 100) {
        processBatch(batch);  // 批量处理
        batch.clear();
    }
}
💡

扩容不是银弹。如果Consumer处理慢的根因没解决,扩容只能缓解症状。比如,如果每个Consumer调用同一个慢接口,扩容10倍不会让接口变快,反而可能把上游服务打挂。

2.2 积压消息的特殊处理

场景:积压的消息有大量是超时无效的(如订单已取消),不需要处理。

// 方案:优先消费"有价值"的消息,跳过"无效"消息

while (true) {
    List<MessageExt> msgs = consumer.poll(1000);

    // 按消息年龄分组
    Map<Long, List<MessageExt>> grouped =
        msgs.stream().collect(Collectors.groupingBy(msg -> {
            return (System.currentTimeMillis() - msg.getBornTimestamp()) / 60000;
        }));

    // 优先处理最近的消息(可能还在时效内)
    grouped.keySet().stream().sorted()
        .forEach(age -> {
            for (MessageExt msg : grouped.get(age)) {
                if (isStillValid(msg)) {
                    processMessage(msg);
                } else {
                    skipMessage(msg);  // 跳过无效消息
                }
            }
        });
}

三、生产避坑:积压的预防机制

3.1 坑一:Consumer处理失败导致无限重试

场景:某条消息处理失败后无限重试,导致后续消息全部卡住。

问题:
  ConsumeConcurrentlyContext.setDelayLevelWhenQueueConsumeSpan(3000);
  → 消费失败后延迟3秒重试
  → 如果这条消息一直失败(死循环、格式错误)
  → 这条消息会一直重试,阻塞后续消息

影响:
  顺序消费模式下,当前Queue的消息全部卡住
  并行消费模式下,如果重试队列满了,新消息也无法处理

解决方案

// 1. 设置重试上限
int maxRetry = 3;
int retryCount = getRetryCount(msg.getMsgId());  // 从Redis/DB获取重试次数

if (retryCount >= maxRetry) {
    // 超过重试次数,投递到死信队列(DLQ)
    sendToDeadLetterQueue(msg);
    return ConsumeStatus.SUCCESS;  // 返回成功,不再重试
}

// 2. 区分可重试和不可重试的异常
try {
    processMessage(msg);
} catch (TransientException e) {
    // 临时异常(网络、数据库),可以重试
    return ConsumeStatus.RECONSUME_LATER;
} catch (FatalException e) {
    // 致命异常(数据格式、业务错误),直接投递DLQ
    sendToDeadLetterQueue(msg);
    return ConsumeStatus.SUCCESS;
}

3.2 坑二:消费者扩容后消息乱序

场景:扩容后,部分消息被多个Consumer重复消费。

问题:
  Consumer-1 正在处理消息A
  Consumer-1 被判定为离线(Rebalance发生)
  Consumer-2 接管了原本属于Consumer-1的分区
  Consumer-2 重新消费了消息A
  → 消息A被消费了两次

解决方案

// 1. 幂等消费:业务层去重
public void processMessage(Message msg) {
    String msgId = msg.getMsgId();
    String key = "consumed:msg:" + msgId;

    // Redis SETNX 去重
    boolean alreadyConsumed = redis.setIfAbsent(key, "1",
        Duration.ofDays(7));

    if (alreadyConsumed) {
        doProcess(msg);
    } else {
        log.info("消息已处理过,跳过: {}", msgId);
    }
}

// 2. 扩容时避免Rebalance
// 使用StickyAssignor
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.StickyAssignor");

// 或者:扩容时先暂停消费,处理完积压再扩容

3.3 坑三:消息堆积导致磁盘爆满

场景:Kafka积压太久,磁盘空间不足。

问题:
  Kafka retention默认7天
  如果积压超过7天,旧消息会被删除
  但如果Consumer完全停摆了,消息会一直堆积

Kafka磁盘空间计算:
  每条消息约 1KB
  100万消息 = 1GB
  峰值10万QPS积压1小时 = 360GB
  磁盘必须预留足够空间

处理方案

# 1. 查看Broker磁盘使用情况
kafka-log-dirs.sh --describe \
  --bootstrap-server kafka:9092 \
  --topic-list order-events

# 2. 如果磁盘紧张,可以临时调整retention
# 但这会导致旧消息被删除,可能丢失数据
kafka-configs.sh --alter \
  --bootstrap-server kafka:9092 \
  --topic order-events \
  --add-config retention.bytes=1073741824  # 限制单Partition大小

# 3. 扩容磁盘(长期方案)

四、工程选型:积压监控与预防体系

4.1 核心监控指标

指标告警阈值说明
consumer_lag> 10000消费延迟超过1万条
consumer_lag_rate> 1000/min积压增长速度过快
under_replicated_partitions> 0副本不足,数据有丢失风险
partition_end_offset_diff> 10000单分区积压异常
consume_time_avg> 1000ms单条消息平均处理时间过长

4.2 告警配置

# Prometheus + AlertManager 告警规则
groups:
  - name: kafka-backlog
    rules:
      - alert: ConsumerLagHigh
        expr: kafka_consumer_lag_sum > 100000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Kafka消费严重积压,lag={{ $value }}"

      - alert: ConsumerLagGrowing
        expr: increase(kafka_consumer_lag_sum[5m]) > 50000
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Kafka积压增长速度过快"

      - alert: ConsumerProcessSlow
        expr: kafka_consumer_fetch_manager_records_consumed_total / kafka_consumer_fetch_manager_poll_total < 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer处理速度异常慢"

4.3 积压预防 checklist

日常巡检:
  [ ] 每日检查各Consumer Group的Lag
  [ ] 监控Consumer处理时间的P99
  [ ] 检查Consumer Group成员数量是否有异常波动

容量规划:
  [ ] 预估峰值QPS,提前扩容
  [ ] Consumer数量 = Partition数量(或略少)
  [ ] 预留20%的消费余量

高可用设计:
  [ ] Consumer部署多个实例(至少2个)
  [ ] Consumer异常退出时自动拉起
  [ ] 设置合理的session.timeout和heartbeat

快速响应:
  [ ] 积压告警7x24小时通知
  [ ] 制定积压应急响应SOP
  [ ] 定期演练扩容流程

【面试官心理】

面试这道题,我会问一个终极问题:"如果积压了1000万条消息,你怎么设计一个紧急处理方案?"能说出"快速扩容 + 限流生产端 + 优先处理有效消息 + 幂等消费 + 事后复盘"的P6+。能进一步说出"如何避免对上游服务造成冲击"、"如何保证业务正确性不受影响"的,是有全局视野的P7。