线程池拒绝策略

候选人小赵在面试拼多多 P6 时,面试官问道:

"线程池的拒绝策略有哪些?什么场景用什么策略?"

小赵说:"有抛异常的、丢弃的..."面试官追问:"CallerRunsPolicy 是怎么限流的?"

小赵答不上来。面试官继续:"自定义拒绝策略能做什么?"

小赵彻底卡住了...

一、核心问题:拒绝策略 🔴

1.1 问题拆解

第一层:四种策略(有哪些?)
  "线程池有哪几种内置的拒绝策略?"
  考察点:Abort/CallerRuns/Discard/DiscardOldest

第二层:CallerRunsPolicy(怎么限流?)
  "CallerRunsPolicy 是怎么实现限流的?"
  考察点:调用方线程执行、限流原理

第三层:自定义策略(怎么实现?)
  "RejectedExecutionHandler 接口怎么用?"
  考察点:日志、监控、持久化

1.2 ❌ 错误示范

候选人原话 A:"CallerRunsPolicy 就是把任务丢弃了。"

问题诊断:CallerRunsPolicy 不是丢弃,而是由提交任务的线程(调用方)执行该任务。这样调用方线程会被阻塞,直到任务完成,间接起到限流作用。

候选人原话 B:"DiscardOldestPolicy 丢弃的是新任务。"

问题诊断:DiscardOldestPolicy 丢弃的是队列中最老的任务(即将被执行的任务),不是新任务。

1.3 标准回答

P5 级别:四种内置策略

四大内置拒绝策略

策略行为
AbortPolicynew ThreadPoolExecutor.AbortPolicy()抛 RejectedExecutionException
CallerRunsPolicynew ThreadPoolExecutor.CallerRunsPolicy()由调用方线程执行任务
DiscardPolicynew ThreadPoolExecutor.DiscardPolicy()静默丢弃任务,不抛异常
DiscardOldestPolicynew ThreadPoolExecutor.DiscardOldestPolicy()丢弃队列中最老的任务,尝试重试新任务

AbortPolicy(默认)

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException(
        "Task " + r + " rejected from " + e);
}

抛出 RejectedExecutionException,调用方需要处理这个异常。

P6 级别:CallerRunsPolicy 的限流原理

CallerRunsPolicy 的实现

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();  // 由调用方线程直接执行!
    }
}

限流原理

调用方线程(比如 Tomcat worker 线程)

调用 pool.execute(task) 被拒绝

CallerRunsPolicy 让调用方线程执行 task

调用方线程被阻塞,直到 task 完成

调用方线程无法继续提交其他任务 → 间接限流

CallerRunsPolicy 的副作用

  1. 任务执行延迟增加:调用方线程被阻塞,HTTP 请求响应时间增加
  2. 线程池饱和传播:如果调用方也是线程池线程,可能导致整个线程池链路的任务堆积
  3. CallerRunsPolicy 不是完美的限流:它无法控制调用方线程的执行速率

P7 级别:自定义拒绝策略

RejectedExecutionHandler 接口

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor e);
}

自定义策略:带监控和告警

class MonitoredRejectedPolicy implements RejectedExecutionHandler {
    private final MetricRegistry metrics;
    private final Logger logger;

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 1. 记录指标
        metrics.counter("threadpool.rejected").inc();

        // 2. 记录日志
        logger.warn("Task {} rejected from {}", r, e);

        // 3. 持久化任务(可选)
        persistTask(r);

        // 4. 根据策略决定是否抛异常
        throw new RejectedExecutionException("Task rejected");
    }
}

自定义策略:队列替换

class TransferQueuePolicy implements RejectedExecutionHandler {
    private final BlockingQueue<Runnable> transferQueue;

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 尝试将任务转移到另一个队列
        try {
            transferQueue.offer(r, 5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Transfer failed", ex);
        }
    }
}

【面试官心理】 这道题我能问到 P7 级别,是因为拒绝策略涉及了线程池的边界处理、限流策略、监控告警等工程实践。能实现带监控的拒绝策略的候选人说明他有生产运营的思考。

1.4 追问升级

追问 1:为什么需要四种策略?它们各自的适用场景是什么?

  • AbortPolicy:需要感知任务被拒绝的场景(如关键业务任务)
  • CallerRunsPolicy:需要限流但允许任务延迟处理的场景(如异步任务处理)
  • DiscardPolicy:不关心任务是否被执行(如日志收集、监控数据)
  • DiscardOldestPolicy:需要丢弃低优先级任务以处理高优先级任务的场景

追问 2:Dubbo 和 Spring 的线程池拒绝策略有什么特点?

Dubbo 的 AbortPolicy 会打印线程池状态:RejectedExecutionException("Thread pool is EXHAUSTED!...")

Spring 的 TaskDecorator 可以在任务执行前后注入上下文,支持在拒绝策略中获取上下文信息。

二、生产避坑 🟡

2.1 丢弃任务导致的问题

使用 DiscardPolicy 或 DiscardOldestPolicy 时,任务被丢弃但没有任何告警,可能导致业务逻辑缺失。

解决:使用自定义拒绝策略,记录丢弃的任务。

2.2 CallerRunsPolicy 的线程池嵌套死锁

pool.execute(() -> {
    pool.execute(innerTask);  // 死锁风险!
});

场景:线程池已满 + 队列满 + CallerRunsPolicy。如果调用方是线程池线程,它在执行 innerTask 时可能需要等待 pool 的其他资源,而其他资源又被阻塞。

解决:不要在线程池任务中向同一线程池提交关键任务,或使用不同的线程池。

三、最佳实践 🟢

3.1 推荐的自定义策略

ThreadPoolExecutor pool = new ThreadPoolExecutor(
    coreSize, maxSize, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000),
    new ThreadFactoryBuilder().setNameFormat("Worker-%d").build(),
    (r, executor) -> {
>        // 1. 记录被拒绝的任务
>        log.warn("Task {} rejected from {}", r, executor);
>        // 2. 尝试持久化
>        persistTask(r);
>        // 3. 抛异常或记录
>        throw new RejectedExecutionException("Task rejected");
>    }
);

3.2 监控指标

// 关键监控指标
pool.getActiveCount();      // 活跃线程数
pool.getQueue().size();    // 队列长度
pool.getCompletedTaskCount();  // 完成的任务数

// 被拒绝的任务数需要自行统计
AtomicLong rejectedCount = new AtomicLong();
💡

面试加分点:能说出"Guava 的 ListeningExecutorService 配合 MoreExecutors.rejectionHandler(...) 可以提供更丰富的拒绝策略支持",说明他对 Guava 的并发工具有了解。

⚠️

面试陷阱:被问到"拒绝策略是在什么时候触发的",很多人会说"队列满的时候"。准确答案是:拒绝策略在 addWorker(command, false) 返回 false 时触发,这发生在:(1) 线程数已达 maximumPoolSize 且 (2) 队列已满 或 (3) 线程池已停止。