AIO(异步IO)原理

学完 BIO 和 NIO 之后,很多同学会问:既然 NIO 已经是非阻塞了,那 AIO 又是什么?异步和非阻塞有什么区别?

我当年也被这个问题困扰了很久。先说结论:NIO 是非阻塞,告诉你"数据还没准备好,你先去干别的";AIO 是异步,告诉你"数据好了,我来通知你"

一个是"轮询模式",一个是"回调模式"。今天我们就来彻底搞清楚 AIO。

一、异步 vs 非阻塞

1.1 三种 IO 模型

模型阻塞非阻塞异步
BIO阻塞等待
NIO非阻塞,轮询
AIO异步,回调

阻塞(Blocking):调用发起后,线程一直等到数据就绪才返回。线程在等待期间什么也干不了。

非阻塞(Non-blocking):调用发起后,立即返回。如果数据没准备好,返回一个标志告诉你"还没好"。你需要不断轮询来看数据好了没有。

异步(Asynchronous):调用发起后,立即返回。你什么都不用管,当数据就绪时,系统会主动通知你(通过回调)。你只需要在回调里处理数据。

1.2 【直观类比】三种模式

想象你去餐厅点餐:

  • 阻塞模式:你站在柜台前等,服务员做好才给你,其间你什么也干不了
  • 非阻塞模式:你拿到一个号码,然后每隔一分钟去柜台问一下"好了吗?"——你被解放出来,但还是要主动去问
  • 异步模式:你拿了号码,服务员说"好了我叫您"。你去玩手机,数据好了服务员主动喊你——你完全不用管

1.3 NIO vs AIO 的核心区别

NIO(Reactor 模式):
  应用线程 ──→ select() 阻塞 ──→ 返回就绪事件 ──→ 处理

                                           还需要主动调用 read()

AIO(Proactor 模式):
  应用线程 ──→ 发起异步读 ──→ 去做别的事

                    OS 完成读后 ──→ 回调通知应用

NIO 需要应用自己调用 read() 去取数据,AIO 的 read() 完成时数据已经在你的 Buffer 里了。

二、AIO 的核心 API

2.1 AsynchronousServerSocketChannel

JDK 7 提供了 AsynchronousServerSocketChannel,用于创建异步服务器:

AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();

// 绑定端口
serverChannel.bind(new InetSocketAddress(8080));

// 接受连接(异步)
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
    @Override
    public void completed(AsynchronousSocketChannel result, Void attachment) {
        // 新连接来了,处理它
        System.out.println("新连接: " + result.getRemoteAddress());
        // 继续接受下一个连接
        serverChannel.accept(null, this);
    }
    
    @Override
    public void failed(Throwable exc, Void attachment) {
        exc.printStackTrace();
    }
});

2.2 CompletionHandler 回调接口

CompletionHandler 是 AIO 的核心回调接口:

public interface CompletionHandler<V, A> {
    // 操作成功完成时调用
    void completed(V result, A attachment);
    
    // 操作失败时调用
    void failed(Throwable exc, A attachment);
}

其中泛型:

  • V:操作结果类型(如 AsynchronousSocketChannelInteger
  • A:附件对象类型(可以传 null)

2.3 读写操作的异步化

AsynchronousSocketChannel client = ...;
ByteBuffer buffer = ByteBuffer.allocate(1024);

// 异步读取
client.read(buffer, null, new CompletionHandler<Integer, Void>() {
    @Override
    public void completed(Integer result, Void attachment) {
        // result 是读取的字节数
        if (result == -1) {
            // 对端关闭了连接
            try { client.close(); } catch (IOException ignored) {}
            return;
        }
        
        buffer.flip();
        // 处理数据...
        System.out.println(new String(buffer.array(), 0, buffer.limit()));
        
        // 继续读取下一个数据包
        buffer.clear();
        client.read(buffer, null, this); // 继续异步读
    }
    
    @Override
    public void failed(Throwable exc, Void attachment) {
        exc.printStackTrace();
        try { client.close(); } catch (IOException ignored) {}
    }
});

// 这里主线程可以去做别的事
while (true) {
    Thread.sleep(1000);
}

2.4 异步写的操作

ByteBuffer buffer = ByteBuffer.wrap("Hello, AIO".getBytes());

client.write(buffer, null, new CompletionHandler<Integer, Void>() {
    @Override
    public void completed(Integer result, Void attachment) {
        System.out.println("发送了 " + result + " 字节");
    }
    
    @Override
    public void failed(Throwable exc, Void attachment) {
        exc.printStackTrace();
    }
});

三、用 Future 方式处理

除了 CompletionHandler,AIO 还提供了 Future 方式:

AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
server.bind(new InetSocketAddress(8080));

// 接受连接(返回 Future)
Future<AsynchronousSocketChannel> acceptFuture = server.accept();
AsynchronousSocketChannel client = acceptFuture.get(); // 阻塞直到连接到来

// 读数据(返回 Future)
ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> readFuture = client.read(buffer);
Integer bytesRead = readFuture.get(); // 阻塞直到读完

// 写数据
ByteBuffer response = ByteBuffer.wrap("ACK".getBytes());
Future<Integer> writeFuture = client.write(response);
writeFuture.get(); // 等待写完
💡

Future 方式和 CompletionHandler 本质一样,只是风格不同:

  • CompletionHandler:纯异步回调,适合事件驱动
  • Future:可以同步等待,适合需要等待结果的场景

实际开发中,CompletionHandler 更常用,因为它真正发挥了异步的优势。

四、AsynchronousChannelGroup

4.1 为什么需要 ChannelGroup

AIO 的底层需要一个线程池来执行回调通知。AsynchronousChannelGroup 就是管理这些线程的:

// 创建自己的线程池(1个线程处理事件)
AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(
    2,  // 线程数
    Executors.defaultDaemonThreadFactory()
);

AsynchronousServerSocketChannel serverChannel = 
    AsynchronousServerSocketChannel.open(group);

如果不指定 group,会使用系统默认的 group。

4.2 默认 group 的问题

默认 group 的线程数是有限的,如果回调处理太慢,可能影响其他事件:

// 问题:所有 AIO 通道共享同一个默认 group
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
// 它们可能竞争同一个 group 的资源

生产环境建议:使用单独的 AsynchronousChannelGroup

五、完整的 AIO 服务器示例

public class AioEchoServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(
            4, // 4 个线程处理回调
            Executors.defaultDaemonThreadFactory()
        );
        
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
        server.bind(new InetSocketAddress(8080));
        
        System.out.println("AIO 服务器启动,端口 8080");
        
        // 开始接受连接
        server.accept(null, new AcceptHandler(server));
        
        // 主线程不能退出
        Thread.sleep(Long.MAX_VALUE);
    }
}

class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
    private final AsynchronousServerSocketChannel server;
    
    AcceptHandler(AsynchronousServerSocketChannel server) {
        this.server = server;
    }
    
    @Override
    public void completed(AsynchronousSocketChannel client, Void attachment) {
        System.out.println("新连接: " + client.getRemoteAddress());
        
        // 继续接受下一个连接
        server.accept(null, this);
        
        // 处理这个连接
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        client.read(buffer, null, new ReadHandler(client));
    }
    
    @Override
    public void failed(Throwable exc, Void attachment) {
        exc.printStackTrace();
    }
}

class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousSocketChannel client;
    
    ReadHandler(AsynchronousSocketChannel client) {
        this.client = client;
    }
    
    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        if (result == -1) {
            // 客户端关闭
            try { client.close(); } catch (IOException ignored) {}
            return;
        }
        
        buffer.flip();
        System.out.println("收到: " + new String(buffer.array(), 0, buffer.limit()));
        
        // 回显
        client.write(buffer, null, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                // 继续读取下一个请求
                buffer.clear();
                client.read(buffer, null, ReadHandler.this);
            }
            
            @Override
            public void failed(Throwable exc, ByteBuffer buffer) {
                exc.printStackTrace();
                try { client.close(); } catch (IOException ignored) {}
            }
        });
    }
    
    @Override
    public void failed(Throwable exc, ByteBuffer buffer) {
        exc.printStackTrace();
        try { client.close(); } catch (IOException ignored) {}
    }
}

六、AIO vs NIO vs BIO

维度BIONIOAIO
线程模型1 连接 1 线程1 线程 N 连接1 线程 N 连接
IO 模式阻塞非阻塞(轮询)异步(回调)
数据就绪通知select/epoll 轮询内核回调通知
数据处理同步读取同步读取可能已经读完
API 复杂度简单中等较复杂
适用场景低并发高并发超高并发

6.1 性能对比

吞吐量(越高越好):
AIO > NIO > BIO

编程复杂度(越低越好):
BIO < NIO < AIO

延迟(同并发量下,越低越好):
AIO ≈ NIO > BIO

6.2 什么场景选 AIO

// 适合 AIO 的场景:
// 1. 连接数非常多(>10000)
// 2. 每个连接的数据量不大
// 3. 需要极致性能
// 4. 能接受较复杂的异步编程模型

七、Linux 下的 AIO 实现

7.1 AIO 的底层实现

在 Linux 系统上,AIO 有两种实现:

实现说明
POSIX AIO(glibc)用户态实现,用线程模拟异步
Linux native AIO(io_uring)内核级支持,JDK 11+ 使用

JDK 7 的 AIO 在 Linux 上早期用的是 epoll(和 NIO 一样),只是 API 不同。

JDK 11 引入对 io_uring 的支持,进一步提升性能。

7.2 io_uring:新一代异步 IO

io_uring 是 Linux 5.1+ 的新特性,提供了真正的异步 IO:

// JDK 11+ 如果系统支持 io_uring,会自动使用
// 不需要改代码,只是底层实现不同
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();

io_uring 的优势:

  • 提交 IO 请求和获取结果用两个环形队列
  • 减少了系统调用次数
  • 支持批量提交 IO 请求

八、生产避坑

8.1 ❌ 错误示范:忘记注册下一次读

client.read(buffer, null, new CompletionHandler<Integer, Void>() {
    @Override
    public void completed(Integer result, Void attachment) {
        // 处理数据...
        
        // ❌ 忘记继续读,数据只会收到一次
        // client.read(buffer, null, this); // 应该加上这行
    }
});

8.2 ❌ 错误示范:在回调里阻塞

client.read(buffer, null, new CompletionHandler<Integer, Void>() {
    @Override
    public void completed(Integer result, Void attachment) {
        // ❌ 错误的做法:在这里阻塞
        String data = processHeavyOperation(); // 这个操作很慢,会阻塞回调线程
        // 回调线程被占满,group 里的线程很快会用完
    }
});

正确做法:把耗时操作丢给线程池。

8.3 ❌ 错误示范:Buffer 被重用

ByteBuffer buffer = ByteBuffer.allocate(1024);

client.read(buffer, null, new CompletionHandler<Integer, Void>() {
    @Override
    public void completed(Integer result, Void attachment) {
        // 处理 buffer 的数据
        buffer.flip();
        process(buffer);
        
        // ❌ 同一个 buffer 又开始读,数据可能丢失
        buffer.clear();
        client.read(buffer, null, this);
    }
});

正确做法:每个连接用自己的 Buffer,或者在回调开始时创建新 Buffer。

九、面试追问链

第一层:基础概念

面试官问:"AIO 和 NIO 的区别是什么?"

NIO 是非阻塞,通过 Selector 轮询 Channel 是否就绪;AIO 是异步,数据就绪后系统主动回调通知。NIO 仍需应用自己读数据,AIO 可能在回调触发时数据已经可以用了。

第二层:回调机制

面试官追问:"CompletionHandler 的 completed 和 failed 什么时候会被调用?"

completed 在操作成功完成时调用,failed 在操作失败时调用。对于 read,completed 还会在读到数据或对端关闭时调用。

第三层:底层原理

面试官追问:"Linux 上 AIO 是怎么实现的?"

早期用 epoll 做事件探测,但读取还是同步的。JDK 11+ 引入了 io_uring 支持,真正实现了内核级的异步 IO,应用只需要提交请求和取结果。

第四层:选型建议

面试官追问:"什么场景用 AIO?"

AIO 适合超高并发(10 万+连接)、每个连接数据量不大、需要极致性能的场景。缺点是编程模型复杂,需要处理回调嵌套。普通高并发场景用 NIO/Netty 足够。

【学习小结】

  • AIO 是真正的异步 IO,数据就绪后主动回调
  • NIO 是非阻塞,需要应用自己轮询是否就绪
  • CompletionHandler 是 AIO 的核心回调接口
  • AsynchronousChannelGroup 管理 AIO 的线程池
  • AIO 适合超高并发,但编程复杂度高
  • 生产注意回调里不能阻塞、Buffer 不能乱重用