Netty 线程模型(Reactor)
很多同学学 Netty 的时候,会被"BossGroup"、"WorkerGroup"、"NioEventLoopGroup" 这些概念绕晕。为什么要两组线程?它们是怎么配合工作的?
我当初也是看了很多遍才搞明白。后来发现,只要理解了一个词——Reactor(反应器),一切就清晰了。
Reactor 的核心思想是:事件驱动。你不用主动去问"有没有人连接",而是等操作系统告诉你"有人来了"。Netty 正是基于这个模式设计的。
今天这篇文章,我们从 Reactor 模式讲起,把 Netty 的线程模型彻底搞透。
一、Reactor 模式是什么
1.1 背景:BIO 的问题
在讲 Reactor 之前,先回顾一下 BIO 的问题:
// BIO 线程模型
while (true) {
Socket socket = server.accept(); // 阻塞
new Thread(() -> handle(socket)).start(); // 每连接一线程
}
问题:一个线程只能处理一个连接。连接多了,线程就爆炸了。
1.2 Reactor 的核心思想
Reactor 模式的核心是:把"等待事件"和"处理事件"分开。
传统模式: Reactor 模式:
连接1 ──→ 线程 ──→ 处理 事件分发器(单线程)
连接2 ──→ 线程 ──→ 处理 ───→ ─┬─ 连接1 ──→ Handler
连接3 ──→ 线程 ──→ 处理 ───→ ├─ 连接2 ──→ Handler
... ├─ 连接3 ──→ Handler
└─ ...
单线程的事件分发器(Reactor)负责:
- 监听所有连接的事件(accept、read、write)
- 事件就绪时,分发给对应的 Handler 处理
这样,一个线程就能管理成百上千个连接了。
1.3 Reactor 的三种模式
Netty 使用的是第三种:多 Reactor 多线程。
二、Netty 的线程模型
2.1 整体架构图
┌─────────────────────────────────────────┐
│ Client │
└─────────────────┬───────────────────────┘
│ TCP 连接
▼
┌─────────────────────────────────────────┐
│ BossGroup │
│ ┌─────────────────────────────────┐ │
│ │ NioEventLoop (1个) │ │
│ │ accept() 接受新连接 │ │
│ └─────────────────────────────────┘ │
└─────────────────┬───────────────────────┘
│ 注册到 Worker
▼
┌─────────────────────────────────────────┐
│ WorkerGroup │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ NioEvent │ │ NioEvent │ │ NioEvent │ │
│ │ Loop 1 │ │ Loop 2 │ │ Loop N │ │
│ │ read/write│ │ read/write│ │ read/write│ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────┘
2.2 BossGroup 和 WorkerGroup
// BossGroup 通常用 1 个线程
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// WorkerGroup 用 CPU 核心数的 2 倍
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 默认是 CPU*2
2.3 【直观类比】餐厅模型
【直观类比】
把 Netty 比作一家餐厅:
- BossGroup:前台接待员(通常 1 个)。只负责迎接客人(接受连接),客人坐下后就不管了。
- WorkerGroup:服务员团队(多个)。负责给客人点菜、上菜(读写数据)。
餐厅门口 ──→ 接待员(Boss)接受客人 ──→ 分配到服务员(Worker)──→ 服务员处理
只负责迎客 客人坐下了 后续服务
如果接待员和服务员是同一个人,那他接待客人的时候就没法给其他桌服务了。所以分开,各司其职。
三、NioEventLoop:核心执行单元
3.1 NioEventLoop 是什么
NioEventLoop 是 Netty 的核心执行单元,它组合了三个角色:
NioEventLoop = Selector + Thread + TaskQueue
// NioEventLoop 的核心循环
while (!terminated) {
// 1. 监听 IO 事件(阻塞直到有事件就绪)
int ready = selector.select();
// 2. 处理 IO 事件
processSelectedKeys();
// 3. 处理普通任务(TaskQueue 中的任务)
runAllTasks();
}
3.2 一个 NioEventLoop 的一生
public final class NioEventLoop extends SingleThreadEventLoop {
private final Selector selector;
private final Thread thread;
private final Queue<Runnable> taskQueue;
@Override
protected void run() {
for (;;) {
try {
// 阻塞等待事件
int ready = selectStrategy.calculateStrategy(
selectSupplier, hasTasks()
)();
// 处理就绪的 Keys
processSelectedKeys();
// 执行任务(队列中的任务)
runAllTasks();
} catch (Exception e) {
// 异常处理
}
// 检查是否应该退出
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
}
}
}
3.3 NioEventLoopGroup 管理多个 NioEventLoop
NioEventLoopGroup group = new NioEventLoopGroup(4); // 创建 4 个 NioEventLoop
// 分配 Channel 时,Netty 会轮询选择 NioEventLoop
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group); // WorkerGroup
bootstrap.channel(NioSocketChannel.class);
Netty 使用 PowerOfTwoEventExecutorChooser 实现轮询分配:
// 分配算法
private final AtomicInteger idx = new AtomicInteger();
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
四、完整的 Netty 服务器示例
4.1 标准 Echo 服务器
public class NettyEchoServer {
public static void main(String[] args) {
// 1. 创建 Boss 和 Worker 线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 2. 创建服务器启动引导
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // 绑定线程组
.channel(NioServerSocketChannel.class) // 使用 NIO
.option(ChannelOption.SO_BACKLOG, 128) // 连接队列大小
.childOption(ChannelOption.SO_KEEPALIVE, true) // TCP KeepAlive
.handler(new LoggingHandler(LogLevel.INFO)) // Boss 处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
.addLast(new EchoServerHandler());
}
});
// 3. 绑定端口
ChannelFuture future = bootstrap.bind(8080).sync();
System.out.println("Netty 服务器启动,端口 8080");
// 4. 等待服务器关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 5. 优雅关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 收到数据后写回
ByteBuf in = (ByteBuf) msg;
System.out.println("收到: " + in.toString(CharsetUtil.UTF_8));
ctx.write(in); // 写回给客户端
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush(); // 刷新数据
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
4.2 服务器启动流程解析
bootstrap.bind(8080).sync()
│
▼
ServerBootstrap.bind()
│
▼
init(serverChannel) // 初始化 ServerSocketChannel
│
▼
doBind(0.0.0.0:8080)
│
▼
headPipeline.fireChannelActive()
│
▼
NioServerSocketChannel.doBeginRead()
│
▼
SelectionKey.register(OP_ACCEPT)
│
▼
NioEventLoop.execute(() -> doBind...)
│
▼
NioEventLoop.run() 开始监听 accept 事件
五、Boss 和 Worker 的协作流程
5.1 连接建立的完整流程
1. Client ──→ TCP 三次握手 ──→ Server
│
2. Boss NioEventLoop:
selector.select() 返回 OP_ACCEPT 事件
│
3. ServerSocketChannel.accept() ──→ 创建 SocketChannel
│
4. SocketChannel 注册到 WorkerGroup:
socketChannel.register(workerSelector, OP_READ, attachment)
│
5. Worker NioEventLoop:
selector.select() 返回 OP_READ 事件
│
6. Pipeline.fireChannelRead() ──→ Handler 链处理
5.2 代码层面的协作
// Boss 的 ServerBootstrapAcceptor
// 这个 Handler 是 Netty 自动添加的,负责把新连接注册到 Worker
public void channelRead(ChannelHandlerContext ctx, Object msg) {
NioSocketChannel ch = (NioEventLoop) msg;
// 把新连接注册到 WorkerGroup
ch.pipeline().addLast(workerGroup, handler);
// 触发 channelActive 事件
ch.fireChannelActive();
}
5.3 线程模型的配置调优
// 默认配置:boss=1, worker=CPU*2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 调优场景:
// 1. 连接非常多(>10000)
EventLoopGroup bossGroup = new NioEventLoopGroup(2); // 增加 Boss 线程
// 2. 业务逻辑重
EventLoopGroup workerGroup = new NioEventLoopGroup(4); // 增加 Worker 线程
// 同时把业务处理丢到业务线程池
// 3. 需要区分 IO 和业务
EventLoopGroup ioGroup = new NioEventLoopGroup(); // 只做 IO
EventLoopGroup bizGroup = new NioEventLoopGroup(); // 只做业务
bootstrap.group(ioGroup).childGroup(bizGroup);
六、Pipeline 和 Handler
6.1 Pipeline 是什么
每个 Channel 都有一个 Pipeline,它是一个 Handler 的双向链表:
Inbound 顺序: Outbound 顺序:
│
ChannelRead ← ← ← ← ← ← ← ← ← ← ← │ ← ← ← ← ← ← ← ← ← ← Write
│ │
▼ ▼
┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐
│Handler│──▶│Handler│──▶│Handler│ │Handler│──▶│Handler│──▶│Handler│
│ 1 │ │ 2 │ │ 3 │ │ 1 │ │ 2 │ │ 3 │
└───────┘ └───────┘ └───────┘ └───────┘ └───────┘ └───────┘
Inbound: head → 1 → 2 → 3 → tail
Outbound: tail → 3 → 2 → 1 → head
6.2 Inbound 和 Outbound Handler
// Inbound: 数据进来时处理(读数据、解码等)
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 处理收到的数据
ctx.fireChannelRead(msg); // 传递到下一个 Handler
}
}
// Outbound: 数据出去时处理(写数据、编码等)
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// 处理要发送的数据
ctx.write(msg, promise);
}
}
6.3 ChannelHandlerContext
每个 Handler 都有自己的 Context,可以操作 Pipeline:
public class Handler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 1. 传递给下一个 Handler
ctx.fireChannelRead(msg);
// 2. 主动写数据给客户端
ctx.writeAndFlush("response");
// 3. 获取 Channel
Channel ch = ctx.channel();
// 4. 获取 Pipeline
ChannelPipeline pipeline = ctx.pipeline();
// 5. 在 Pipeline 中插入/移除 Handler
// pipeline.addBefore("thisHandler", "newHandler", newHandler);
// pipeline.remove(this);
}
}
七、生产最佳实践
7.1 ❌ 错误示范:在 IO 线程中做耗时操作
public class BadHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// ❌ 错误的做法:在 IO 线程中做耗时操作
String result = doHeavyCalculation(msg);
ctx.writeAndFlush(result);
}
}
问题:如果计算耗时 1 秒,这个 Worker 线程就卡住 1 秒,无法处理其他连接。
正确做法:把耗时操作丢到业务线程池:
public class GoodHandler extends ChannelInboundHandlerAdapter {
private final ExecutorService executor = Executors.newFixedThreadPool(10);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 丢到业务线程池
executor.submit(() -> {
String result = doHeavyCalculation(msg);
// 注意:回调时已经不在 IO 线程了
ctx.writeAndFlush(result);
});
}
}
7.2 ❌ 错误示范:Channel 未移除导致内存泄漏
// ❌ ByteBuf 没有释放
public class LeakyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
// 处理数据...
// ❌ 没有释放 buf!
System.out.println(buf.toString());
}
}
正确做法:使用 ReferenceCountUtil.release() 或使用 SimpleChannelInboundHandler:
// 方式1:手动释放
public class ManualHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
ByteBuf buf = (ByteBuf) msg;
// 处理数据...
} finally {
ReferenceCountUtil.release(msg); // 释放
}
}
}
// 方式2:使用 SimpleChannelInboundHandler(推荐)
public class AutoHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
// Netty 会自动释放 msg
System.out.println(msg.toString());
}
}
7.3 ✅ 正确示范:优雅关闭
public class ServerShutdown {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 启动服务器...
ChannelFuture future = bootstrap.bind(8080).sync();
// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// 优雅关闭:停止接受新连接,等待现有连接处理完
bossGroup.shutdownGracefully().awaitUninterruptibly();
workerGroup.shutdownGracefully().awaitUninterruptibly();
}));
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
八、面试追问链
第一层:基础概念
面试官问:"Netty 的线程模型是什么?"
Netty 使用的是 Reactor 模式,主从两组 EventLoopGroup:BossGroup 处理 OP_ACCEPT 事件(接受新连接),WorkerGroup 处理 OP_READ/OP_WRITE 事件(IO 读写)。
第二层:Boss 和 Worker 协作
面试官追问:"BossGroup 和 WorkerGroup 是怎么配合工作的?"
BossGroup 通常只有 1 个线程,它调用 accept() 接受新连接,然后把新建的 SocketChannel 注册到 WorkerGroup。WorkerGroup 有多个 NioEventLoop(默认 CPU*2),每个 EventLoop 有自己的 Selector,监听注册到它上面的 Channel 的 IO 事件。
第三层:NioEventLoop 的工作原理
面试官追问:"NioEventLoop 是怎么工作的?"
NioEventLoop 内部有一个 Selector、一个 Thread 和一个 TaskQueue。它的工作循环是:select() 监听 IO 事件 → processSelectedKeys() 处理事件 → runAllTasks() 执行普通任务。这样的设计让 IO 事件和普通任务在同一个线程中执行,避免了锁竞争。
第四层:性能调优
面试官追问:"什么情况下需要调整 BossGroup 和 WorkerGroup 的大小?"
通常 BossGroup 用 1 个线程就够了,除非有大量短连接导致连接风暴。如果业务逻辑耗时很长,应该把业务处理丢到独立的业务线程池,不阻塞 WorkerGroup。
【学习小结】
- Netty 使用 Reactor 模式:事件驱动
- BossGroup 处理 accept(通常 1 个线程)
- WorkerGroup 处理 IO(通常 CPU*2 个线程)
- NioEventLoop = Selector + Thread + TaskQueue
- Pipeline 是 Handler 的双向链表
- 耗时操作要丢到业务线程池,不能阻塞 IO 线程
- 注意 ByteBuf 的释放,避免内存泄漏