Netty 线程模型(Reactor)

很多同学学 Netty 的时候,会被"BossGroup"、"WorkerGroup"、"NioEventLoopGroup" 这些概念绕晕。为什么要两组线程?它们是怎么配合工作的?

我当初也是看了很多遍才搞明白。后来发现,只要理解了一个词——Reactor(反应器),一切就清晰了。

Reactor 的核心思想是:事件驱动。你不用主动去问"有没有人连接",而是等操作系统告诉你"有人来了"。Netty 正是基于这个模式设计的。

今天这篇文章,我们从 Reactor 模式讲起,把 Netty 的线程模型彻底搞透。

一、Reactor 模式是什么

1.1 背景:BIO 的问题

在讲 Reactor 之前,先回顾一下 BIO 的问题:

// BIO 线程模型
while (true) {
    Socket socket = server.accept();  // 阻塞
    new Thread(() -> handle(socket)).start();  // 每连接一线程
}

问题:一个线程只能处理一个连接。连接多了,线程就爆炸了。

1.2 Reactor 的核心思想

Reactor 模式的核心是:把"等待事件"和"处理事件"分开

传统模式:                         Reactor 模式:
                                 
 连接1 ──→ 线程 ──→ 处理           事件分发器(单线程)
 连接2 ──→ 线程 ──→ 处理    ───→  ─┬─ 连接1 ──→ Handler
 连接3 ──→ 线程 ──→ 处理     ───→  ├─ 连接2 ──→ Handler
      ...                           ├─ 连接3 ──→ Handler
                                    └─ ...

单线程的事件分发器(Reactor)负责:

  1. 监听所有连接的事件(accept、read、write)
  2. 事件就绪时,分发给对应的 Handler 处理

这样,一个线程就能管理成百上千个连接了。

1.3 Reactor 的三种模式

模式说明适用场景
单 Reactor 单线程一个 Reactor 处理所有事件简单场景,不适合 CPU 密集型
单 Reactor 多线程一个 Reactor + 线程池处理业务常用模式
多 Reactor 多线程主 Reactor 处理 accept,子 Reactor 处理 IONetty 使用

Netty 使用的是第三种:多 Reactor 多线程

二、Netty 的线程模型

2.1 整体架构图

                    ┌─────────────────────────────────────────┐
                    │                  Client                  │
                    └─────────────────┬───────────────────────┘
                                        │ TCP 连接

                    ┌─────────────────────────────────────────┐
                    │              BossGroup                    │
                    │  ┌─────────────────────────────────┐    │
                    │  │       NioEventLoop (1个)         │    │
                    │  │   accept() 接受新连接            │    │
                    │  └─────────────────────────────────┘    │
                    └─────────────────┬───────────────────────┘
                                        │ 注册到 Worker

                    ┌─────────────────────────────────────────┐
                    │              WorkerGroup                  │
                    │  ┌──────────┐ ┌──────────┐ ┌──────────┐  │
                    │  │ NioEvent │ │ NioEvent │ │ NioEvent │  │
                    │  │  Loop 1  │ │  Loop 2  │ │  Loop N  │  │
                    │  │ read/write│ │ read/write│ │ read/write│ │
                    │  └──────────┘ └──────────┘ └──────────┘  │
                    └─────────────────────────────────────────┘

2.2 BossGroup 和 WorkerGroup

线程组作用线程数量负责事件
BossGroup接受新连接通常 1 个OP_ACCEPT
WorkerGroup处理 IO 读写通常 2 * CPU 核心数OP_READ、OP_WRITE
// BossGroup 通常用 1 个线程
EventLoopGroup bossGroup = new NioEventLoopGroup(1);

// WorkerGroup 用 CPU 核心数的 2 倍
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 默认是 CPU*2

2.3 【直观类比】餐厅模型

【直观类比】

把 Netty 比作一家餐厅:

  • BossGroup:前台接待员(通常 1 个)。只负责迎接客人(接受连接),客人坐下后就不管了。
  • WorkerGroup:服务员团队(多个)。负责给客人点菜、上菜(读写数据)。
餐厅门口 ──→ 接待员(Boss)接受客人 ──→ 分配到服务员(Worker)──→ 服务员处理
              只负责迎客                     客人坐下了                   后续服务

如果接待员和服务员是同一个人,那他接待客人的时候就没法给其他桌服务了。所以分开,各司其职。

三、NioEventLoop:核心执行单元

3.1 NioEventLoop 是什么

NioEventLoop 是 Netty 的核心执行单元,它组合了三个角色:

NioEventLoop = Selector + Thread + TaskQueue
组件作用
Selector监听 Channel 的 IO 事件
Thread执行任务的线程
TaskQueue存放待执行的任务
// NioEventLoop 的核心循环
while (!terminated) {
    // 1. 监听 IO 事件(阻塞直到有事件就绪)
    int ready = selector.select();
    
    // 2. 处理 IO 事件
    processSelectedKeys();
    
    // 3. 处理普通任务(TaskQueue 中的任务)
    runAllTasks();
}

3.2 一个 NioEventLoop 的一生

public final class NioEventLoop extends SingleThreadEventLoop {
    
    private final Selector selector;
    private final Thread thread;
    private final Queue<Runnable> taskQueue;
    
    @Override
    protected void run() {
        for (;;) {
            try {
                // 阻塞等待事件
                int ready = selectStrategy.calculateStrategy(
                    selectSupplier, hasTasks()
                )();
                
                // 处理就绪的 Keys
                processSelectedKeys();
                
                // 执行任务(队列中的任务)
                runAllTasks();
                
            } catch (Exception e) {
                // 异常处理
            }
            
            // 检查是否应该退出
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        }
    }
}

3.3 NioEventLoopGroup 管理多个 NioEventLoop

NioEventLoopGroup group = new NioEventLoopGroup(4); // 创建 4 个 NioEventLoop

// 分配 Channel 时,Netty 会轮询选择 NioEventLoop
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);  // WorkerGroup
bootstrap.channel(NioSocketChannel.class);

Netty 使用 PowerOfTwoEventExecutorChooser 实现轮询分配:

// 分配算法
private final AtomicInteger idx = new AtomicInteger();

public EventExecutor next() {
    return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}

四、完整的 Netty 服务器示例

4.1 标准 Echo 服务器

public class NettyEchoServer {
    public static void main(String[] args) {
        // 1. 创建 Boss 和 Worker 线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            // 2. 创建服务器启动引导
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)      // 绑定线程组
                     .channel(NioServerSocketChannel.class)  // 使用 NIO
                     .option(ChannelOption.SO_BACKLOG, 128)  // 连接队列大小
                     .childOption(ChannelOption.SO_KEEPALIVE, true)  // TCP KeepAlive
                     .handler(new LoggingHandler(LogLevel.INFO))       // Boss 处理器
                     .childHandler(new ChannelInitializer<SocketChannel>() {
                         @Override
                         protected void initChannel(SocketChannel ch) {
                             ch.pipeline()
                               .addLast(new LineBasedFrameDecoder(1024))
                               .addLast(new StringDecoder())
                               .addLast(new EchoServerHandler());
                         }
                     });
            
            // 3. 绑定端口
            ChannelFuture future = bootstrap.bind(8080).sync();
            System.out.println("Netty 服务器启动,端口 8080");
            
            // 4. 等待服务器关闭
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 5. 优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 收到数据后写回
        ByteBuf in = (ByteBuf) msg;
        System.out.println("收到: " + in.toString(CharsetUtil.UTF_8));
        ctx.write(in);  // 写回给客户端
    }
    
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();  // 刷新数据
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

4.2 服务器启动流程解析

bootstrap.bind(8080).sync()


ServerBootstrap.bind()


init(serverChannel)  // 初始化 ServerSocketChannel


doBind(0.0.0.0:8080)


headPipeline.fireChannelActive()


NioServerSocketChannel.doBeginRead()


SelectionKey.register(OP_ACCEPT)


NioEventLoop.execute(() -> doBind...)


NioEventLoop.run() 开始监听 accept 事件

五、Boss 和 Worker 的协作流程

5.1 连接建立的完整流程

1. Client ──→ TCP 三次握手 ──→ Server

2. Boss NioEventLoop:
   selector.select() 返回 OP_ACCEPT 事件

3. ServerSocketChannel.accept() ──→ 创建 SocketChannel

4. SocketChannel 注册到 WorkerGroup:
   socketChannel.register(workerSelector, OP_READ, attachment)

5. Worker NioEventLoop:
   selector.select() 返回 OP_READ 事件

6. Pipeline.fireChannelRead() ──→ Handler 链处理

5.2 代码层面的协作

// Boss 的 ServerBootstrapAcceptor
// 这个 Handler 是 Netty 自动添加的,负责把新连接注册到 Worker
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    NioSocketChannel ch = (NioEventLoop) msg;
    
    // 把新连接注册到 WorkerGroup
    ch.pipeline().addLast(workerGroup, handler);
    
    // 触发 channelActive 事件
    ch.fireChannelActive();
}

5.3 线程模型的配置调优

// 默认配置:boss=1, worker=CPU*2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

// 调优场景:
// 1. 连接非常多(>10000)
EventLoopGroup bossGroup = new NioEventLoopGroup(2); // 增加 Boss 线程

// 2. 业务逻辑重
EventLoopGroup workerGroup = new NioEventLoopGroup(4); // 增加 Worker 线程
// 同时把业务处理丢到业务线程池

// 3. 需要区分 IO 和业务
EventLoopGroup ioGroup = new NioEventLoopGroup();  // 只做 IO
EventLoopGroup bizGroup = new NioEventLoopGroup(); // 只做业务
bootstrap.group(ioGroup).childGroup(bizGroup);

六、Pipeline 和 Handler

6.1 Pipeline 是什么

每个 Channel 都有一个 Pipeline,它是一个 Handler 的双向链表:

Inbound 顺序:                    Outbound 顺序:

ChannelRead ← ← ← ← ← ← ← ← ← ← ← │ ← ← ← ← ← ← ← ← ← ← Write
    │                                    │
    ▼                                    ▼
┌───────┐   ┌───────┐   ┌───────┐   ┌───────┐
│Handler│──▶│Handler│──▶│Handler│   │Handler│──▶│Handler│──▶│Handler│
│   1   │   │   2   │   │   3   │   │   1   │   │   2   │   │   3   │
└───────┘   └───────┘   └───────┘   └───────┘   └───────┘   └───────┘
Inbound:  head → 1 → 2 → 3 → tail
Outbound: tail → 3 → 2 → 1 → head

6.2 Inbound 和 Outbound Handler

// Inbound: 数据进来时处理(读数据、解码等)
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 处理收到的数据
        ctx.fireChannelRead(msg); // 传递到下一个 Handler
    }
}

// Outbound: 数据出去时处理(写数据、编码等)
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        // 处理要发送的数据
        ctx.write(msg, promise);
    }
}

6.3 ChannelHandlerContext

每个 Handler 都有自己的 Context,可以操作 Pipeline:

public class Handler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 1. 传递给下一个 Handler
        ctx.fireChannelRead(msg);
        
        // 2. 主动写数据给客户端
        ctx.writeAndFlush("response");
        
        // 3. 获取 Channel
        Channel ch = ctx.channel();
        
        // 4. 获取 Pipeline
        ChannelPipeline pipeline = ctx.pipeline();
        
        // 5. 在 Pipeline 中插入/移除 Handler
        // pipeline.addBefore("thisHandler", "newHandler", newHandler);
        // pipeline.remove(this);
    }
}

七、生产最佳实践

7.1 ❌ 错误示范:在 IO 线程中做耗时操作

public class BadHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // ❌ 错误的做法:在 IO 线程中做耗时操作
        String result = doHeavyCalculation(msg);
        ctx.writeAndFlush(result);
    }
}

问题:如果计算耗时 1 秒,这个 Worker 线程就卡住 1 秒,无法处理其他连接。

正确做法:把耗时操作丢到业务线程池:

public class GoodHandler extends ChannelInboundHandlerAdapter {
    private final ExecutorService executor = Executors.newFixedThreadPool(10);
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 丢到业务线程池
        executor.submit(() -> {
            String result = doHeavyCalculation(msg);
            // 注意:回调时已经不在 IO 线程了
            ctx.writeAndFlush(result);
        });
    }
}

7.2 ❌ 错误示范:Channel 未移除导致内存泄漏

// ❌ ByteBuf 没有释放
public class LeakyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;
        // 处理数据...
        // ❌ 没有释放 buf!
        System.out.println(buf.toString());
    }
}

正确做法:使用 ReferenceCountUtil.release() 或使用 SimpleChannelInboundHandler

// 方式1:手动释放
public class ManualHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            ByteBuf buf = (ByteBuf) msg;
            // 处理数据...
        } finally {
            ReferenceCountUtil.release(msg); // 释放
        }
    }
}

// 方式2:使用 SimpleChannelInboundHandler(推荐)
public class AutoHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        // Netty 会自动释放 msg
        System.out.println(msg.toString());
    }
}

7.3 ✅ 正确示范:优雅关闭

public class ServerShutdown {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            // 启动服务器...
            ChannelFuture future = bootstrap.bind(8080).sync();
            
            // 添加关闭钩子
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                // 优雅关闭:停止接受新连接,等待现有连接处理完
                bossGroup.shutdownGracefully().awaitUninterruptibly();
                workerGroup.shutdownGracefully().awaitUninterruptibly();
            }));
            
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

八、面试追问链

第一层:基础概念

面试官问:"Netty 的线程模型是什么?"

Netty 使用的是 Reactor 模式,主从两组 EventLoopGroup:BossGroup 处理 OP_ACCEPT 事件(接受新连接),WorkerGroup 处理 OP_READ/OP_WRITE 事件(IO 读写)。

第二层:Boss 和 Worker 协作

面试官追问:"BossGroup 和 WorkerGroup 是怎么配合工作的?"

BossGroup 通常只有 1 个线程,它调用 accept() 接受新连接,然后把新建的 SocketChannel 注册到 WorkerGroup。WorkerGroup 有多个 NioEventLoop(默认 CPU*2),每个 EventLoop 有自己的 Selector,监听注册到它上面的 Channel 的 IO 事件。

第三层:NioEventLoop 的工作原理

面试官追问:"NioEventLoop 是怎么工作的?"

NioEventLoop 内部有一个 Selector、一个 Thread 和一个 TaskQueue。它的工作循环是:select() 监听 IO 事件 → processSelectedKeys() 处理事件 → runAllTasks() 执行普通任务。这样的设计让 IO 事件和普通任务在同一个线程中执行,避免了锁竞争。

第四层:性能调优

面试官追问:"什么情况下需要调整 BossGroup 和 WorkerGroup 的大小?"

通常 BossGroup 用 1 个线程就够了,除非有大量短连接导致连接风暴。如果业务逻辑耗时很长,应该把业务处理丢到独立的业务线程池,不阻塞 WorkerGroup。

【学习小结】

  • Netty 使用 Reactor 模式:事件驱动
  • BossGroup 处理 accept(通常 1 个线程)
  • WorkerGroup 处理 IO(通常 CPU*2 个线程)
  • NioEventLoop = Selector + Thread + TaskQueue
  • Pipeline 是 Handler 的双向链表
  • 耗时操作要丢到业务线程池,不能阻塞 IO 线程
  • 注意 ByteBuf 的释放,避免内存泄漏