Netty 核心组件(Channel / EventLoop / Pipeline)
Netty 的组件很多,我当年学的时候也是一脸懵:Channel 是什么?Pipeline 和 Handler 是什么关系?EventLoop 和 EventExecutor 有什么区别?
后来仔细研究了一下源码,发现这些组件的设计其实非常清晰:Channel 是连接,Pipeline 是处理链,EventLoop 是执行引擎,Handler 是处理逻辑。
今天我们就来把这些核心组件彻底讲透。
一、Channel:连接的定义
1.1 Channel 是什么
Channel 是 Netty 对网络连接的抽象,类似于 JDK NIO 的 SocketChannel 和 ServerSocketChannel,但封装得更好。
// NIO 原生
SocketChannel channel = SocketChannel.open();
channel.connect(new InetSocketAddress("localhost", 8080));
// Netty
SocketChannel ch = ctx.channel(); // 获取当前连接的 Channel
ch.writeAndFlush(msg); // 写数据
ch.close(); // 关闭连接
1.2 常见的 Channel 类型
// 客户端引导
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class); // NIO 客户端
// 服务器引导
ServerBootstrap server = new ServerBootstrap();
server.channel(NioServerSocketChannel.class); // NIO 服务器
1.3 Channel 的核心方法
// 连接相关
ChannelFuture connect(SocketAddress remoteAddress); // 连接
ChannelFuture bind(SocketAddress localAddress); // 绑定端口
ChannelFuture close(); // 关闭连接
// 数据读写
ChannelFuture writeAndFlush(Object msg); // 写并发送
Channel read(); // 读取数据
// 状态查询
boolean isActive(); // 是否活跃
boolean isOpen(); // 是否打开
boolean isWritable(); // 是否可写
// 属性
ChannelId id(); // 唯一标识
ChannelPipeline pipeline(); // 获取 Pipeline
EventLoop eventLoop(); // 获取 EventLoop
二、ChannelPipeline:处理链
2.1 Pipeline 是什么
每个 Channel 都有一个自己的 Pipeline,它是一个 Handler 的双向链表:
ChannelPipeline 内部结构:
┌─────────────────────────────────────┐
│ ChannelPipeline │
│ │
Inbound: Head → Handler1 → Handler2 → Handler3 → Tail
│ │
Outbound: Tail ← Handler3 ← Handler2 ← Handler1 ← Head
│ │
└─────────────────────────────────────┘
Inbound 事件(如 channelRead)从 Head 流向 Tail,Outbound 事件(如 write)从 Tail 流向 Head。
2.2 Pipeline 的创建
Pipeline 是在 Channel 创建时自动创建的:
// NioSocketChannel 的构造方法
public NioSocketChannel(Channel parent, SocketChannel socket) {
// 创建 Pipeline
pipeline = newChannelPipeline();
}
// 创建 Pipeline 时,自动添加 HeadContext 和 TailContext
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
// Pipeline 构造函数
protected DefaultChannelPipeline(Channel channel) {
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
2.3 Handler 添加顺序
Handler 的添加顺序很重要!Inbound 和 Outbound 的执行顺序不同:
ChannelPipeline p = ch.pipeline();
// 添加顺序:1 → 2 → 3
p.addLast("handler1", new Handler1()); // 第一个
p.addLast("handler2", new Handler2()); // 第二个
p.addLast("handler3", new Handler3()); // 第三个
// Inbound 执行顺序: Head → 1 → 2 → 3 → Tail
// Outbound 执行顺序: Tail → 3 → 2 → 1 → Head
2.4 常见的编解码 Handler
// 典型配置
ch.pipeline()
// ====== Inbound 编解码 ======
.addLast("frame", new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)) // 解决粘包
.addLast("decoder", new ByteToMessageDecoder() { ... }) // 自定义解码
.addLast("encoder", new MessageToByteEncoder<Msg>() { ... }) // 自定义编码
// ====== 业务 Handler ======
.addLast("bizHandler", new BusinessHandler());
三、ChannelHandler:处理逻辑
3.1 Handler 的分类
ChannelHandler
│
├── ChannelInboundHandler(入站处理器)
│ │
│ └── ChannelOutboundHandler(出站处理器)
│
└── ChannelDuplexHandler(双向处理器)
3.2 ChannelInboundHandler
Inbound 事件会触发 InboundHandler 的方法:
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
// 连接建立
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("连接建立: " + ctx.channel());
ctx.fireChannelActive(); // 传播到下一个 Handler
}
// 收到数据(最常用)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到: " + buf.toString(CharsetUtil.UTF_8));
// 处理完后要释放 ByteBuf
ReferenceCountUtil.release(msg);
}
// 数据读完
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush(); // 把写缓冲区的数据发送出去
}
// 连接关闭
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("连接断开: " + ctx.channel());
}
// 异常处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close(); // 关闭连接
}
}
3.3 ChannelOutboundHandler
Outbound 事件会触发 OutboundHandler 的方法:
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
// 绑定端口
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
System.out.println("绑定端口: " + localAddress);
ctx.bind(localAddress, promise);
}
// 连接
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) {
System.out.println("连接到: " + remoteAddress);
ctx.connect(remoteAddress, localAddress, promise);
}
// 写数据
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
System.out.println("发送数据: " + msg);
ctx.write(msg, promise);
}
// 刷新(真正发送)
@Override
public void flush(ChannelHandlerContext ctx) {
ctx.flush();
}
}
3.4 SimpleChannelInboundHandler
对于不需要手动释放的 ByteBuf,可以使用 SimpleChannelInboundHandler:
// 自动释放 ByteBuf
public class AutoHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
// msg 已经是解码后的 String 类型
// 不用手动释放,Netty 会自动处理
System.out.println("收到消息: " + msg);
ctx.writeAndFlush("响应: " + msg);
}
}
四、ChannelHandlerContext:上下文
4.1 Context 是什么
每个 Handler 都有一个对应的 ChannelHandlerContext,它封装了 Handler 与 Pipeline 的关联:
ChannelPipeline pipeline = ch.pipeline();
ChannelHandlerContext ctx = pipeline.context(myHandler);
// ctx 可以做什么:
ctx.write(msg); // 从当前位置往后传播
ctx.fireChannelRead(msg); // 从当前位置往后传播(Inbound)
ctx.executor(); // 获取执行器
ctx.channel(); // 获取 Channel
ctx.pipeline(); // 获取 Pipeline
4.2 两类传播方法
// 传播事件(不改变位置)
ctx.fireChannelRead(msg); // Inbound 事件向上传播
ctx.fireChannelActive(); // 连接激活向上传播
// 处理事件(改变位置)
ctx.write(msg); // 从当前位置向前传播(Outbound)
ctx.channelRead(ctx, msg); // 从当前位置向前传播(Inbound)
4.3 动态修改 Pipeline
可以在运行时动态添加/删除 Handler:
// 动态添加 Handler
pipeline.addLast("newHandler", new NewHandler());
pipeline.addBefore("existing", "newHandler", new NewHandler());
pipeline.addAfter("existing", "newHandler", new NewHandler());
// 动态删除 Handler
pipeline.remove("handlerName");
pipeline.remove(myHandler);
// 动态替换 Handler
pipeline.replace("oldHandler", "newHandler", new NewHandler());
五、EventLoop:执行引擎
5.1 EventLoop 的前世今生
Netty 的 EventLoop 经历了多次演进:
Netty 3.x: Executor + ChannelHandler
Netty 4.x: EventExecutor → EventLoop → EventLoopGroup
Netty 5.x: 已废弃
5.2 EventExecutor vs EventLoop
// EventExecutor:任务的执行器
public interface EventExecutor extends ScheduledExecutorService {
EventExecutorGroup parent(); // 获取所属 Group
}
// EventLoop:可以处理 Channel IO 的执行器
public interface EventLoop extends EventExecutor {
EventLoopGroup parent(); // 获取所属 Group
ChannelFuture register(Channel channel); // 注册 Channel
}
简单说:EventLoop 是 EventExecutor 的子类,增加了 Channel 注册能力。
5.3 EventLoop 的分配策略
Channel 注册到 EventLoopGroup 时,Netty 使用轮询分配:
// 分配算法
public EventExecutor next() {
return chooser.next();
}
// 默认使用 PowerOfTwoEventExecutorChooser
// 保证分配均匀,且是 2 的幂次的选择
5.4 EventLoop 与 Channel 的绑定
Channel 一旦注册到某个 EventLoop,就在那个 EventLoop 上运行,永远不会改变:
// 注册过程
NioEventLoopGroup group = new NioEventLoopGroup(4);
// 第一个 Channel
Bootstrap b1 = new Bootstrap();
b1.group(group); // 使用同一个 Group
Channel ch1 = b1.bind(8080).sync().channel();
// ch1 被分配到 group 中的某个 EventLoop
// 第二个 Channel
Bootstrap b2 = new Bootstrap();
b2.group(group); // 同一个 Group
Channel ch2 = b2.bind(8081).sync().channel();
// ch2 被分配到 group 中的另一个 EventLoop
// ch1 和 ch2 可能在同一个 EventLoop,也可能在不同 EventLoop
// 一旦分配,就固定了
六、Bootstrap:引导配置
6.1 客户端 Bootstrap
// 客户端引导
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup) // 指定 EventLoopGroup
.channel(NioSocketChannel.class) // Channel 类型
.option(ChannelOption.TCP_NODELAY, true) // TCP 参数
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new MyCodec())
.addLast(new ClientHandler());
}
});
// 连接服务器
ChannelFuture future = bootstrap.connect("localhost", 8080);
future.addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
System.out.println("连接成功");
} else {
System.out.println("连接失败: " + f.cause());
}
});
// 阻塞等待连接关闭
future.channel().closeFuture().sync();
6.2 服务器 ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // Boss 和 Worker
.channel(NioServerSocketChannel.class) // Channel 类型
// ServerSocketChannel 参数(给 Boss 用)
.option(ChannelOption.SO_BACKLOG, 1024) // 连接队列
.option(ChannelOption.SO_REUSEADDR, true) // 地址复用
// SocketChannel 参数(给 Worker 用)
.childOption(ChannelOption.SO_KEEPALIVE, true) // TCP KeepAlive
.childOption(ChannelOption.TCP_NODELAY, true) // 禁用 Nagle
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 接收缓冲区
// Boss 的 Handler(可选)
.handler(new LoggingHandler(LogLevel.INFO))
// Worker 的 Handler
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline()
.addLast(new MyCodec())
.addLast(new ServerHandler());
}
});
// 绑定端口
ChannelFuture future = bootstrap.bind(8080);
future.syncUninterruptibly();
6.3 option vs childOption
// ServerSocketChannel 参数(option)
bootstrap.option(ChannelOption.SO_BACKLOG, 100) // TCP 半连接队列
// SocketChannel 参数(childOption)
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true) // 保活
七、ByteBuf:Netty 的数据容器
7.1 ByteBuf vs ByteBuffer
Netty 为什么要自己实现 ByteBuf?因为 JDK 的 ByteBuffer 太难用了:
// JDK ByteBuffer 的问题
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("hello".getBytes()); // 写完后 position 在最后
buffer.flip(); // 需要手动 flip
buffer.get(); // 读完后 position 又变了
// Netty ByteBuf 更方便
ByteBuf buf = Unpooled.buffer(1024);
buf.writeBytes("hello".getBytes()); // 自动扩展
System.out.println(buf.toString()); // 直接转 String
buf.release(); // 手动释放
7.2 ByteBuf 的组成
ByteBuf 内部结构:
┌──────────────────────────────────────────┐
│ ByteBuf │
├──────────────────────────────────────────┤
│ readerIndex │ writable area │ capacity│
│ ▼ │ │ │
│ ┌────┬────────┴────┬────────────────┐ │
│ │read│ discardRead │ writeable area │ │
│ │area│ bytes │ │ │
│ └────┴─────────────┬────────────────┘ │
│ ▼ │
│ writerIndex │
└───────────────────────────────────────────┘
7.3 ByteBuf 的类型
// 堆内 ByteBuf
ByteBuf heapBuf = Unpooled.buffer(1024);
// 直接内存 ByteBuf
ByteBuf directBuf = Unpooled.directBuffer(1024);
// 池化直接内存 ByteBuf(Netty 默认)
ByteBuf pooledBuf = PooledByteBufAllocator.DEFAULT.buffer(1024);
八、生产最佳实践
8.1 ❌ 错误示范:忘记释放 ByteBuf
public class BadHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
// ❌ 没有释放!
System.out.println(buf.toString());
}
}
正确做法:使用 SimpleChannelInboundHandler 或手动释放:
// 方法1:使用 SimpleChannelInboundHandler
public class GoodHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
System.out.println(msg.toString());
// Netty 自动释放
}
}
// 方法2:手动释放
public class ManualHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString());
} finally {
ReferenceCountUtil.release(msg);
}
}
}
8.2 ❌ 错误示范:ChannelHandler 顺序错误
// ❌ 错误顺序
ch.pipeline()
.addLast("decoder", new MyDecoder()) // 解码器
.addLast("handler", new BusinessHandler()) // 业务
.addLast("encoder", new MyEncoder()); // 编码器
// Inbound: decoder → handler → (自动) encoder?
// Outbound: encoder → handler → ...?
正确顺序:Inbound 在前面,Outbound 在后面:
// ✅ 正确顺序
ch.pipeline()
// Inbound 链(处理收到的数据)
.addLast("decoder", new MyDecoder()) // 解码
.addLast("handler", new BusinessHandler()) // 业务
// Outbound 链(处理发送的数据)
.addLast("encoder", new MyEncoder()); // 编码
8.3 ✅ 正确示范:资源释放顺序
// 关闭时注意顺序
public class GracefulShutdown {
public static void main(String[] args) {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss, worker);
Channel ch = b.bind(8080).sync().channel();
// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// 先关闭 Worker,再关闭 Boss
worker.shutdownGracefully();
boss.shutdownGracefully();
}));
ch.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
九、面试追问链
第一层:组件关系
面试官问:"Channel、Pipeline、Handler、EventLoop 是什么关系?"
Channel 是连接,代表一个 socket。Pipeline 是处理链,里面串着 Handler。Handler 是处理逻辑。EventLoop 是执行引擎,一个 EventLoop 服务多个 Channel。
第二层:Pipeline 执行顺序
面试官追问:"Inbound 和 Outbound 事件的执行顺序是什么?"
Inbound 事件从 Head 流向 Tail,Outbound 事件从 Tail 流向 Head。如果在 InboundHandler 中调用 ctx.write(),会转成 Outbound 事件继续向前传播。
第三层:ByteBuf 释放
面试官追问:"Netty 中 ByteBuf 为什么要手动释放?"
Netty 使用引用计数管理 ByteBuf。手动释放是为了及时回收内存,避免内存泄漏。SimpleChannelInboundHandler 会自动释放输入的 ByteBuf。
第四层:线程安全
面试官追问:"Handler 是线程安全的吗?"
Handler 本身不是线程安全的。如果多个 Channel 分配到同一个 EventLoop,同一个 Handler 实例可能被并发调用。如果需要线程安全,要在 Handler 内部加锁,或者每个 Channel 用独立的 Handler 实例。
【学习小结】
- Channel 是连接,Pipeline 是 Handler 链表,Handler 是处理逻辑
- Inbound 事件从头到尾,Outbound 事件从尾到头
- EventLoop 是执行引擎,管理 Selector 和任务队列
- Bootstrap 用于引导配置
- ByteBuf 需要手动释放(除非用 SimpleChannelInboundHandler)
- Handler 添加顺序很重要