Netty 核心组件(Channel / EventLoop / Pipeline)

Netty 的组件很多,我当年学的时候也是一脸懵:Channel 是什么?Pipeline 和 Handler 是什么关系?EventLoop 和 EventExecutor 有什么区别?

后来仔细研究了一下源码,发现这些组件的设计其实非常清晰:Channel 是连接,Pipeline 是处理链,EventLoop 是执行引擎,Handler 是处理逻辑

今天我们就来把这些核心组件彻底讲透。

一、Channel:连接的定义

1.1 Channel 是什么

Channel 是 Netty 对网络连接的抽象,类似于 JDK NIO 的 SocketChannelServerSocketChannel,但封装得更好。

// NIO 原生
SocketChannel channel = SocketChannel.open();
channel.connect(new InetSocketAddress("localhost", 8080));

// Netty
SocketChannel ch = ctx.channel(); // 获取当前连接的 Channel
ch.writeAndFlush(msg);            // 写数据
ch.close();                        // 关闭连接

1.2 常见的 Channel 类型

Channel 类型说明对应 NIO
NioSocketChannel客户端 TCP 连接SocketChannel
NioServerSocketChannel服务器监听ServerSocketChannel
NioDatagramChannelUDP 传输DatagramChannel
EpollSocketChannelLinux EPoll(Linux 特有,更高效)
OioSocketChannel阻塞 IOSocket(兼容性)
// 客户端引导
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);  // NIO 客户端

// 服务器引导
ServerBootstrap server = new ServerBootstrap();
server.channel(NioServerSocketChannel.class);  // NIO 服务器

1.3 Channel 的核心方法

// 连接相关
ChannelFuture connect(SocketAddress remoteAddress);  // 连接
ChannelFuture bind(SocketAddress localAddress);        // 绑定端口
ChannelFuture close();                                  // 关闭连接

// 数据读写
ChannelFuture writeAndFlush(Object msg);               // 写并发送
Channel read();                                        // 读取数据

// 状态查询
boolean isActive();    // 是否活跃
boolean isOpen();      // 是否打开
boolean isWritable();  // 是否可写

// 属性
ChannelId id();         // 唯一标识
ChannelPipeline pipeline();  // 获取 Pipeline
EventLoop eventLoop();      // 获取 EventLoop

二、ChannelPipeline:处理链

2.1 Pipeline 是什么

每个 Channel 都有一个自己的 Pipeline,它是一个 Handler 的双向链表:

ChannelPipeline 内部结构:

                    ┌─────────────────────────────────────┐
                    │           ChannelPipeline             │
                    │                                       │
   Inbound:    Head → Handler1 → Handler2 → Handler3 → Tail
                    │                                       │
   Outbound:   Tail ← Handler3 ← Handler2 ← Handler1 ← Head
                    │                                       │
                    └─────────────────────────────────────┘

Inbound 事件(如 channelRead)从 Head 流向 Tail,Outbound 事件(如 write)从 Tail 流向 Head。

2.2 Pipeline 的创建

Pipeline 是在 Channel 创建时自动创建的:

// NioSocketChannel 的构造方法
public NioSocketChannel(Channel parent, SocketChannel socket) {
    // 创建 Pipeline
    pipeline = newChannelPipeline();
}

// 创建 Pipeline 时,自动添加 HeadContext 和 TailContext
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

// Pipeline 构造函数
protected DefaultChannelPipeline(Channel channel) {
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}

2.3 Handler 添加顺序

Handler 的添加顺序很重要!Inbound 和 Outbound 的执行顺序不同:

ChannelPipeline p = ch.pipeline();

// 添加顺序:1 → 2 → 3
p.addLast("handler1", new Handler1());  // 第一个
p.addLast("handler2", new Handler2());  // 第二个
p.addLast("handler3", new Handler3());  // 第三个

// Inbound 执行顺序: Head → 1 → 2 → 3 → Tail
// Outbound 执行顺序: Tail → 3 → 2 → 1 → Head

2.4 常见的编解码 Handler

// 典型配置
ch.pipeline()
  // ====== Inbound 编解码 ======
  .addLast("frame", new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))  // 解决粘包
  .addLast("decoder", new ByteToMessageDecoder() { ... })                // 自定义解码
  .addLast("encoder", new MessageToByteEncoder<Msg>() { ... })          // 自定义编码
  
  // ====== 业务 Handler ======
  .addLast("bizHandler", new BusinessHandler());

三、ChannelHandler:处理逻辑

3.1 Handler 的分类

ChannelHandler

    ├── ChannelInboundHandler(入站处理器)
    │       │
    │       └── ChannelOutboundHandler(出站处理器)

    └── ChannelDuplexHandler(双向处理器)
类型处理方向典型场景
InboundHandler数据从网络到应用解码、业务处理
OutboundHandler数据从应用到网络编码、连接管理
ChannelDuplexHandler两者都处理日志、监控

3.2 ChannelInboundHandler

Inbound 事件会触发 InboundHandler 的方法:

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    
    // 连接建立
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("连接建立: " + ctx.channel());
        ctx.fireChannelActive();  // 传播到下一个 Handler
    }
    
    // 收到数据(最常用)
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到: " + buf.toString(CharsetUtil.UTF_8));
        // 处理完后要释放 ByteBuf
        ReferenceCountUtil.release(msg);
    }
    
    // 数据读完
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();  // 把写缓冲区的数据发送出去
    }
    
    // 连接关闭
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("连接断开: " + ctx.channel());
    }
    
    // 异常处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();  // 关闭连接
    }
}

3.3 ChannelOutboundHandler

Outbound 事件会触发 OutboundHandler 的方法:

public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    
    // 绑定端口
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
        System.out.println("绑定端口: " + localAddress);
        ctx.bind(localAddress, promise);
    }
    
    // 连接
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, 
                       SocketAddress localAddress, ChannelPromise promise) {
        System.out.println("连接到: " + remoteAddress);
        ctx.connect(remoteAddress, localAddress, promise);
    }
    
    // 写数据
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("发送数据: " + msg);
        ctx.write(msg, promise);
    }
    
    // 刷新(真正发送)
    @Override
    public void flush(ChannelHandlerContext ctx) {
        ctx.flush();
    }
}

3.4 SimpleChannelInboundHandler

对于不需要手动释放的 ByteBuf,可以使用 SimpleChannelInboundHandler

// 自动释放 ByteBuf
public class AutoHandler extends SimpleChannelInboundHandler<String> {
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        // msg 已经是解码后的 String 类型
        // 不用手动释放,Netty 会自动处理
        System.out.println("收到消息: " + msg);
        ctx.writeAndFlush("响应: " + msg);
    }
}

四、ChannelHandlerContext:上下文

4.1 Context 是什么

每个 Handler 都有一个对应的 ChannelHandlerContext,它封装了 Handler 与 Pipeline 的关联:

ChannelPipeline pipeline = ch.pipeline();
ChannelHandlerContext ctx = pipeline.context(myHandler);

// ctx 可以做什么:
ctx.write(msg);              // 从当前位置往后传播
ctx.fireChannelRead(msg);   // 从当前位置往后传播(Inbound)
ctx.executor();             // 获取执行器
ctx.channel();              // 获取 Channel
ctx.pipeline();             // 获取 Pipeline

4.2 两类传播方法

// 传播事件(不改变位置)
ctx.fireChannelRead(msg);   // Inbound 事件向上传播
ctx.fireChannelActive();    // 连接激活向上传播

// 处理事件(改变位置)
ctx.write(msg);             // 从当前位置向前传播(Outbound)
ctx.channelRead(ctx, msg);  // 从当前位置向前传播(Inbound)

4.3 动态修改 Pipeline

可以在运行时动态添加/删除 Handler:

// 动态添加 Handler
pipeline.addLast("newHandler", new NewHandler());
pipeline.addBefore("existing", "newHandler", new NewHandler());
pipeline.addAfter("existing", "newHandler", new NewHandler());

// 动态删除 Handler
pipeline.remove("handlerName");
pipeline.remove(myHandler);

// 动态替换 Handler
pipeline.replace("oldHandler", "newHandler", new NewHandler());

五、EventLoop:执行引擎

5.1 EventLoop 的前世今生

Netty 的 EventLoop 经历了多次演进:

Netty 3.x:    Executor + ChannelHandler
Netty 4.x:    EventExecutor → EventLoop → EventLoopGroup
Netty 5.x:    已废弃

5.2 EventExecutor vs EventLoop

// EventExecutor:任务的执行器
public interface EventExecutor extends ScheduledExecutorService {
    EventExecutorGroup parent();  // 获取所属 Group
}

// EventLoop:可以处理 Channel IO 的执行器
public interface EventLoop extends EventExecutor {
    EventLoopGroup parent();  // 获取所属 Group
    ChannelFuture register(Channel channel);  // 注册 Channel
}

简单说:EventLoop 是 EventExecutor 的子类,增加了 Channel 注册能力。

5.3 EventLoop 的分配策略

Channel 注册到 EventLoopGroup 时,Netty 使用轮询分配:

// 分配算法
public EventExecutor next() {
    return chooser.next();
}

// 默认使用 PowerOfTwoEventExecutorChooser
// 保证分配均匀,且是 2 的幂次的选择

5.4 EventLoop 与 Channel 的绑定

Channel 一旦注册到某个 EventLoop,就在那个 EventLoop 上运行,永远不会改变:

// 注册过程
NioEventLoopGroup group = new NioEventLoopGroup(4);

// 第一个 Channel
Bootstrap b1 = new Bootstrap();
b1.group(group);  // 使用同一个 Group
Channel ch1 = b1.bind(8080).sync().channel();
// ch1 被分配到 group 中的某个 EventLoop

// 第二个 Channel  
Bootstrap b2 = new Bootstrap();
b2.group(group);  // 同一个 Group
Channel ch2 = b2.bind(8081).sync().channel();
// ch2 被分配到 group 中的另一个 EventLoop

// ch1 和 ch2 可能在同一个 EventLoop,也可能在不同 EventLoop
// 一旦分配,就固定了

六、Bootstrap:引导配置

6.1 客户端 Bootstrap

// 客户端引导
Bootstrap bootstrap = new Bootstrap();

bootstrap.group(workerGroup)                    // 指定 EventLoopGroup
        .channel(NioSocketChannel.class)        // Channel 类型
        .option(ChannelOption.TCP_NODELAY, true)   // TCP 参数
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)  // 连接超时
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                ch.pipeline()
                  .addLast(new MyCodec())
                  .addLast(new ClientHandler());
            }
        });

// 连接服务器
ChannelFuture future = bootstrap.connect("localhost", 8080);
future.addListener((ChannelFutureListener) f -> {
    if (f.isSuccess()) {
        System.out.println("连接成功");
    } else {
        System.out.println("连接失败: " + f.cause());
    }
});

// 阻塞等待连接关闭
future.channel().closeFuture().sync();

6.2 服务器 ServerBootstrap

ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(bossGroup, workerGroup)         // Boss 和 Worker
         .channel(NioServerSocketChannel.class) // Channel 类型
         
         // ServerSocketChannel 参数(给 Boss 用)
         .option(ChannelOption.SO_BACKLOG, 1024)     // 连接队列
         .option(ChannelOption.SO_REUSEADDR, true)   // 地址复用
         
         // SocketChannel 参数(给 Worker 用)
         .childOption(ChannelOption.SO_KEEPALIVE, true)   // TCP KeepAlive
         .childOption(ChannelOption.TCP_NODELAY, true)    // 禁用 Nagle
         .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 接收缓冲区
         
         // Boss 的 Handler(可选)
         .handler(new LoggingHandler(LogLevel.INFO))
         
         // Worker 的 Handler
         .childHandler(new ChannelInitializer<NioSocketChannel>() {
             @Override
             protected void initChannel(NioSocketChannel ch) {
                 ch.pipeline()
                   .addLast(new MyCodec())
                   .addLast(new ServerHandler());
             }
         });

// 绑定端口
ChannelFuture future = bootstrap.bind(8080);
future.syncUninterruptibly();

6.3 option vs childOption

参数作用对象说明
optionServerSocketChannelBoss 处理的参数
childOptionSocketChannelWorker 处理的参数
// ServerSocketChannel 参数(option)
bootstrap.option(ChannelOption.SO_BACKLOG, 100)    // TCP 半连接队列

// SocketChannel 参数(childOption)
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true)  // 保活

七、ByteBuf:Netty 的数据容器

7.1 ByteBuf vs ByteBuffer

Netty 为什么要自己实现 ByteBuf?因为 JDK 的 ByteBuffer 太难用了:

// JDK ByteBuffer 的问题
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("hello".getBytes());  // 写完后 position 在最后
buffer.flip();                   // 需要手动 flip
buffer.get();                    // 读完后 position 又变了

// Netty ByteBuf 更方便
ByteBuf buf = Unpooled.buffer(1024);
buf.writeBytes("hello".getBytes());  // 自动扩展
System.out.println(buf.toString());   // 直接转 String
buf.release();                        // 手动释放

7.2 ByteBuf 的组成

ByteBuf 内部结构:

┌──────────────────────────────────────────┐
│              ByteBuf                      │
├──────────────────────────────────────────┤
│  readerIndex  │  writable area  │ capacity│
│       ▼        │                 │        │
│  ┌────┬────────┴────┬────────────────┐   │
│  │read│  discardRead │ writeable area │   │
│  │area│   bytes      │                │   │
│  └────┴─────────────┬────────────────┘   │
│                     ▼                      │
│                 writerIndex                │
└───────────────────────────────────────────┘
区域说明
readerIndex读指针
writerIndex写指针
discardReadBytes已读区域(可以 discard 回收)
readableBytes可读字节数
writableBytes可写字节数

7.3 ByteBuf 的类型

类型特点使用场景
HeapByteBufJVM 堆内分配,GC 管理普通使用
DirectByteBuf堆外内存,不受 GC 管理网络传输(减少一次拷贝)
PooledByteBuf对象池复用,减少分配开销高性能场景
// 堆内 ByteBuf
ByteBuf heapBuf = Unpooled.buffer(1024);

// 直接内存 ByteBuf
ByteBuf directBuf = Unpooled.directBuffer(1024);

// 池化直接内存 ByteBuf(Netty 默认)
ByteBuf pooledBuf = PooledByteBufAllocator.DEFAULT.buffer(1024);

八、生产最佳实践

8.1 ❌ 错误示范:忘记释放 ByteBuf

public class BadHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;
        // ❌ 没有释放!
        System.out.println(buf.toString());
    }
}

正确做法:使用 SimpleChannelInboundHandler 或手动释放:

// 方法1:使用 SimpleChannelInboundHandler
public class GoodHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        System.out.println(msg.toString());
        // Netty 自动释放
    }
}

// 方法2:手动释放
public class ManualHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            ByteBuf buf = (ByteBuf) msg;
            System.out.println(buf.toString());
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
}

8.2 ❌ 错误示范:ChannelHandler 顺序错误

// ❌ 错误顺序
ch.pipeline()
  .addLast("decoder", new MyDecoder())      // 解码器
  .addLast("handler", new BusinessHandler()) // 业务
  .addLast("encoder", new MyEncoder());       // 编码器
// Inbound: decoder → handler → (自动) encoder?
// Outbound: encoder → handler → ...?

正确顺序:Inbound 在前面,Outbound 在后面:

// ✅ 正确顺序
ch.pipeline()
  // Inbound 链(处理收到的数据)
  .addLast("decoder", new MyDecoder())      // 解码
  .addLast("handler", new BusinessHandler()) // 业务
  
  // Outbound 链(处理发送的数据)
  .addLast("encoder", new MyEncoder());     // 编码

8.3 ✅ 正确示范:资源释放顺序

// 关闭时注意顺序
public class GracefulShutdown {
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();
        
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, worker);
            Channel ch = b.bind(8080).sync().channel();
            
            // 添加关闭钩子
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                // 先关闭 Worker,再关闭 Boss
                worker.shutdownGracefully();
                boss.shutdownGracefully();
            }));
            
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

九、面试追问链

第一层:组件关系

面试官问:"Channel、Pipeline、Handler、EventLoop 是什么关系?"

Channel 是连接,代表一个 socket。Pipeline 是处理链,里面串着 Handler。Handler 是处理逻辑。EventLoop 是执行引擎,一个 EventLoop 服务多个 Channel。

第二层:Pipeline 执行顺序

面试官追问:"Inbound 和 Outbound 事件的执行顺序是什么?"

Inbound 事件从 Head 流向 Tail,Outbound 事件从 Tail 流向 Head。如果在 InboundHandler 中调用 ctx.write(),会转成 Outbound 事件继续向前传播。

第三层:ByteBuf 释放

面试官追问:"Netty 中 ByteBuf 为什么要手动释放?"

Netty 使用引用计数管理 ByteBuf。手动释放是为了及时回收内存,避免内存泄漏。SimpleChannelInboundHandler 会自动释放输入的 ByteBuf。

第四层:线程安全

面试官追问:"Handler 是线程安全的吗?"

Handler 本身不是线程安全的。如果多个 Channel 分配到同一个 EventLoop,同一个 Handler 实例可能被并发调用。如果需要线程安全,要在 Handler 内部加锁,或者每个 Channel 用独立的 Handler 实例。

【学习小结】

  • Channel 是连接,Pipeline 是 Handler 链表,Handler 是处理逻辑
  • Inbound 事件从头到尾,Outbound 事件从尾到头
  • EventLoop 是执行引擎,管理 Selector 和任务队列
  • Bootstrap 用于引导配置
  • ByteBuf 需要手动释放(除非用 SimpleChannelInboundHandler)
  • Handler 添加顺序很重要