线程池执行流程

候选人小庞在面试美团 P6 时,面试官问道:

"线程池是怎么执行任务的?能说下完整流程吗?"

小庞说:"提交任务 → 核心线程执行 → 队列等待 → 最大线程数..."面试官追问:"Worker 线程是怎么不断获取任务的?getTask() 的逻辑是什么?"

小庞支支吾吾答不上来。面试官继续:"线程复用是怎么实现的?"

小庞彻底卡住了...

一、核心问题:线程池执行流程 🔴

1.1 问题拆解

第一层:完整流程(有哪些步骤?)
  "线程池执行任务的全流程是什么?"
  考察点:execute() 的三段判断逻辑

第二层:Worker 与任务获取(怎么循环?)
  "Worker 线程是怎么不断获取任务的?getTask() 的阻塞逻辑是什么?"
  考察点:Worker.run() → runWorker() → getTask() 的循环

第三层:线程复用(怎么复用?)
  "线程池是怎么实现线程复用的?"
  考察点:runWorker() 的 while 循环、task = null 的处理

1.2 ❌ 错误示范

候选人原话 A:"任务完成后线程就结束了,线程池会创建新线程处理下一个任务。"

问题诊断:线程池通过 runWorker() 的 while 循环 实现线程复用。任务完成后线程不退出,而是从队列中获取下一个任务。

候选人原话 B:"线程池满了就拒绝任务。"

问题诊断:流程是:核心线程满 → 队列 → 最大线程数 → 拒绝策略。不是"满了就拒绝"。

1.3 标准回答

P5 级别:execute() 三段判断

线程池执行任务的三段流程

public void execute(Runnable command) {
    if (command == null) throw new NullPointerException();

    // 阶段 1:当前线程数 < corePoolSize?创建核心线程
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))  // true = 核心线程
            return;
        c = ctl.get();
    }

    // 阶段 2:核心线程满,加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);  // 保证至少有一个 worker
    }

    // 阶段 3:队列满,尝试创建非核心线程
    else if (!addWorker(command, false))  // false = 非核心线程
        reject(command);  // 拒绝
}

关键顺序核心线程 → 队列 → 最大线程 → 拒绝

P6 级别:Worker 与任务获取循环

Worker 的设计

Worker 是线程池内部封装 Runnable 的类,同时管理线程生命周期:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;           // 实际的工作线程
    Runnable firstTask;             // 第一个任务(可为空)
    volatile long completedTasks;   // 完成的任务计数

    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
        this.thread = threadFactory.newThread(this);  // 创建线程
    }

    public void run() {
        runWorker(this);  // 执行任务
    }
}

runWorker() 的核心循环

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;  // 第一个任务
    w.firstTask = null;          // 清空,任务已取出

    boolean completedAbruptly = true;
    try {
        // 关键:while 循环 → 线程复用的核心
        while (task != null || (task = getTask()) != null) {
            // 执行前:获取锁(可中断、可超时)
            w.lock();

            // 清理中断标志
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();

            try {
                beforeExecute(wt, task);  // 钩子方法
                task.run();               // 执行任务
                afterExecute(task, thrown);  // 钩子方法
            } catch (Throwable ex) {
                afterExecute(task, ex);
                throw ex;
            } finally {
                task = null;  // 任务置 null,进入下一次 while 循环获取新任务
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);  // 退出处理
    }
}

线程复用的精髓

while (task != null || (task = getTask()) != null)

  • task != null:执行第一个任务
  • task = getTask()从队列获取下一个任务。如果队列为空,getTask() 会阻塞
  • getTask() 返回 null(队列为空 + 线程需要退出),退出 while 循环
  • 线程不退出,而是回到 while 循环,从队列获取新任务(如果队列有新任务)

P7 级别:getTask() 的阻塞逻辑

getTask() 的实现

private Runnable getTask() {
    boolean timedOut = false;  // 上次 poll 是否超时

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 1. 线程池已停止或队列为空 → 返回 null(退出)
        if (rs `>=` SHUTDOWN && (rs `>=` STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 2. 判断是否需要超时控制
        // timed = 允许核心线程超时 或 当前线程数 > 核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 3. 超时控制
        if ((wc > maximumPoolSize || (timed && timedOut)) && wc > 1) {
            // 减少 worker 数量
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        // 4. 从队列获取任务
        Runnable r;
        if (timed) {
            // 超时模式:poll(keepAliveTime) → 等待一段时间
            r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            if (r == null) timedOut = true;  // 超时标记
        } else {
            // 阻塞模式:take() → 无限等待
            r = workQueue.take();
        }

        if (r != null) return r;  // 拿到任务
        // r == null → timedOut=true,重新进入循环,判断是否退出
    }
}

getTask() 的两种获取模式

  • 核心线程timed = false):调用 take() 无限阻塞,直到队列有新任务
  • 非核心线程timed = true):调用 poll(timeout) 等待一段时间,超时返回 null

【面试官心理】 这道题我能问到 P7 级别,是因为 runWorker() 和 getTask() 的循环是线程池复用机制的核心。能说清 getTask() 的 timed vs non-timed 模式的候选人说明他理解了线程回收机制。能说清 completedAbruptly 异常退出的候选人说明他对边界情况有思考。

1.4 追问升级

追问 1:线程池怎么实现线程复用的?

通过 runWorker() 的 while 循环实现。线程执行完第一个任务后,不退出,而是调用 getTask() 从队列获取下一个任务。只要队列不空,线程就一直从队列获取任务并执行——这就是复用。

追问 2:如果线程在执行任务时抛出异常,会怎样?

runWorker() 的 try-catch 捕获异常(Throwable),调用 afterExecute(),然后重新进入 while 循环获取下一个任务。但该线程不会退出,而是由 try-finally 中的 completedAbruptly = true 触发 processWorkerExit(),如果 completedAbruptly = true,线程会被移除。

二、生产避坑 🟡

2.1 任务抛出异常导致线程消失

pool.execute(() -> {
    throw new RuntimeException("Task error");
});

如果任务抛出未捕获的异常,且线程工厂创建的线程没有设置 UncaughtExceptionHandler,JVM 默认会打印堆栈但线程不会消失(runWorker 的 catch 会处理)。

但如果使用 submit() 返回 Future,异常会被包装在 Future 中。

2.2 拒绝策略的选择

// CallerRunsPolicy:将任务退回给调用方
// 适合:需要限流,但不希望丢弃任务的场景
new ThreadPoolExecutor.CallerRunsPolicy()

// 问题:如果调用方也是线程池线程,可能导致线程池饱和
pool.submit(() -> pool.execute(task));  // 调用方是线程池 → 可能死锁

三、钩子方法与监控 🟢

3.1 beforeExecute 和 afterExecute

ThreadPoolExecutor executor = new ThreadPoolExecutor(...) {
    @Override
>    protected void beforeExecute(Thread t, Runnable r) {
>        // 记录开始时间、监控
>    }
>
>    @Override
>    protected void afterExecute(Runnable r, Throwable t) {
>        // 记录结束时间
>        // 如果 t != null,记录异常信息
>    }
};

3.2 监控指标

pool.getActiveCount();    // 当前活跃线程数
pool.getCompletedTaskCount();  // 已完成任务数
pool.getQueue().size();   // 队列大小
pool.getLargestPoolSize();    // 历史最大线程数
💡

面试加分点:能说出"JDK 8 的 CompletableFuture 内部使用 ForkJoinPool.commonPool(),这个线程池默认的线程数是 availableProcessors() - 1(如果 <= 1 则为 1)",说明他对 JDK 8 的异步工具有了解。

⚠️

面试陷阱:被问到"getTask() 中 timed=false 的核心线程在队列为空时会一直阻塞吗",很多人会说"是"。准确答案是:会的,直到有新任务加入队列(take() 阻塞)或线程池被 shutdown。核心线程在没有任务时会一直等待,不会被回收。