AIO(异步IO)原理
学完 BIO 和 NIO 之后,很多同学会问:既然 NIO 已经是非阻塞了,那 AIO 又是什么?异步和非阻塞有什么区别?
我当年也被这个问题困扰了很久。先说结论:NIO 是非阻塞,告诉你"数据还没准备好,你先去干别的";AIO 是异步,告诉你"数据好了,我来通知你"。
一个是"轮询模式",一个是"回调模式"。今天我们就来彻底搞清楚 AIO。
一、异步 vs 非阻塞
1.1 三种 IO 模型
阻塞(Blocking):调用发起后,线程一直等到数据就绪才返回。线程在等待期间什么也干不了。
非阻塞(Non-blocking):调用发起后,立即返回。如果数据没准备好,返回一个标志告诉你"还没好"。你需要不断轮询来看数据好了没有。
异步(Asynchronous):调用发起后,立即返回。你什么都不用管,当数据就绪时,系统会主动通知你(通过回调)。你只需要在回调里处理数据。
1.2 【直观类比】三种模式
想象你去餐厅点餐:
- 阻塞模式:你站在柜台前等,服务员做好才给你,其间你什么也干不了
- 非阻塞模式:你拿到一个号码,然后每隔一分钟去柜台问一下"好了吗?"——你被解放出来,但还是要主动去问
- 异步模式:你拿了号码,服务员说"好了我叫您"。你去玩手机,数据好了服务员主动喊你——你完全不用管
1.3 NIO vs AIO 的核心区别
NIO(Reactor 模式):
应用线程 ──→ select() 阻塞 ──→ 返回就绪事件 ──→ 处理
│
还需要主动调用 read()
AIO(Proactor 模式):
应用线程 ──→ 发起异步读 ──→ 去做别的事
│
OS 完成读后 ──→ 回调通知应用
NIO 需要应用自己调用 read() 去取数据,AIO 的 read() 完成时数据已经在你的 Buffer 里了。
二、AIO 的核心 API
2.1 AsynchronousServerSocketChannel
JDK 7 提供了 AsynchronousServerSocketChannel,用于创建异步服务器:
AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
// 绑定端口
serverChannel.bind(new InetSocketAddress(8080));
// 接受连接(异步)
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel result, Void attachment) {
// 新连接来了,处理它
System.out.println("新连接: " + result.getRemoteAddress());
// 继续接受下一个连接
serverChannel.accept(null, this);
}
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
}
});
2.2 CompletionHandler 回调接口
CompletionHandler 是 AIO 的核心回调接口:
public interface CompletionHandler<V, A> {
// 操作成功完成时调用
void completed(V result, A attachment);
// 操作失败时调用
void failed(Throwable exc, A attachment);
}
其中泛型:
V:操作结果类型(如 AsynchronousSocketChannel、Integer)
A:附件对象类型(可以传 null)
2.3 读写操作的异步化
AsynchronousSocketChannel client = ...;
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 异步读取
client.read(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
// result 是读取的字节数
if (result == -1) {
// 对端关闭了连接
try { client.close(); } catch (IOException ignored) {}
return;
}
buffer.flip();
// 处理数据...
System.out.println(new String(buffer.array(), 0, buffer.limit()));
// 继续读取下一个数据包
buffer.clear();
client.read(buffer, null, this); // 继续异步读
}
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
try { client.close(); } catch (IOException ignored) {}
}
});
// 这里主线程可以去做别的事
while (true) {
Thread.sleep(1000);
}
2.4 异步写的操作
ByteBuffer buffer = ByteBuffer.wrap("Hello, AIO".getBytes());
client.write(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
System.out.println("发送了 " + result + " 字节");
}
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
}
});
三、用 Future 方式处理
除了 CompletionHandler,AIO 还提供了 Future 方式:
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
server.bind(new InetSocketAddress(8080));
// 接受连接(返回 Future)
Future<AsynchronousSocketChannel> acceptFuture = server.accept();
AsynchronousSocketChannel client = acceptFuture.get(); // 阻塞直到连接到来
// 读数据(返回 Future)
ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> readFuture = client.read(buffer);
Integer bytesRead = readFuture.get(); // 阻塞直到读完
// 写数据
ByteBuffer response = ByteBuffer.wrap("ACK".getBytes());
Future<Integer> writeFuture = client.write(response);
writeFuture.get(); // 等待写完
💡
Future 方式和 CompletionHandler 本质一样,只是风格不同:
- CompletionHandler:纯异步回调,适合事件驱动
- Future:可以同步等待,适合需要等待结果的场景
实际开发中,CompletionHandler 更常用,因为它真正发挥了异步的优势。
四、AsynchronousChannelGroup
4.1 为什么需要 ChannelGroup
AIO 的底层需要一个线程池来执行回调通知。AsynchronousChannelGroup 就是管理这些线程的:
// 创建自己的线程池(1个线程处理事件)
AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(
2, // 线程数
Executors.defaultDaemonThreadFactory()
);
AsynchronousServerSocketChannel serverChannel =
AsynchronousServerSocketChannel.open(group);
如果不指定 group,会使用系统默认的 group。
4.2 默认 group 的问题
默认 group 的线程数是有限的,如果回调处理太慢,可能影响其他事件:
// 问题:所有 AIO 通道共享同一个默认 group
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
// 它们可能竞争同一个 group 的资源
生产环境建议:使用单独的 AsynchronousChannelGroup。
五、完整的 AIO 服务器示例
public class AioEchoServer {
public static void main(String[] args) throws IOException, InterruptedException {
AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(
4, // 4 个线程处理回调
Executors.defaultDaemonThreadFactory()
);
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
server.bind(new InetSocketAddress(8080));
System.out.println("AIO 服务器启动,端口 8080");
// 开始接受连接
server.accept(null, new AcceptHandler(server));
// 主线程不能退出
Thread.sleep(Long.MAX_VALUE);
}
}
class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
private final AsynchronousServerSocketChannel server;
AcceptHandler(AsynchronousServerSocketChannel server) {
this.server = server;
}
@Override
public void completed(AsynchronousSocketChannel client, Void attachment) {
System.out.println("新连接: " + client.getRemoteAddress());
// 继续接受下一个连接
server.accept(null, this);
// 处理这个连接
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.read(buffer, null, new ReadHandler(client));
}
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
}
}
class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel client;
ReadHandler(AsynchronousSocketChannel client) {
this.client = client;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (result == -1) {
// 客户端关闭
try { client.close(); } catch (IOException ignored) {}
return;
}
buffer.flip();
System.out.println("收到: " + new String(buffer.array(), 0, buffer.limit()));
// 回显
client.write(buffer, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
// 继续读取下一个请求
buffer.clear();
client.read(buffer, null, ReadHandler.this);
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
exc.printStackTrace();
try { client.close(); } catch (IOException ignored) {}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
exc.printStackTrace();
try { client.close(); } catch (IOException ignored) {}
}
}
六、AIO vs NIO vs BIO
6.1 性能对比
吞吐量(越高越好):
AIO > NIO > BIO
编程复杂度(越低越好):
BIO < NIO < AIO
延迟(同并发量下,越低越好):
AIO ≈ NIO > BIO
6.2 什么场景选 AIO
// 适合 AIO 的场景:
// 1. 连接数非常多(>10000)
// 2. 每个连接的数据量不大
// 3. 需要极致性能
// 4. 能接受较复杂的异步编程模型
七、Linux 下的 AIO 实现
7.1 AIO 的底层实现
在 Linux 系统上,AIO 有两种实现:
JDK 7 的 AIO 在 Linux 上早期用的是 epoll(和 NIO 一样),只是 API 不同。
JDK 11 引入对 io_uring 的支持,进一步提升性能。
7.2 io_uring:新一代异步 IO
io_uring 是 Linux 5.1+ 的新特性,提供了真正的异步 IO:
// JDK 11+ 如果系统支持 io_uring,会自动使用
// 不需要改代码,只是底层实现不同
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
io_uring 的优势:
- 提交 IO 请求和获取结果用两个环形队列
- 减少了系统调用次数
- 支持批量提交 IO 请求
八、生产避坑
8.1 ❌ 错误示范:忘记注册下一次读
client.read(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
// 处理数据...
// ❌ 忘记继续读,数据只会收到一次
// client.read(buffer, null, this); // 应该加上这行
}
});
8.2 ❌ 错误示范:在回调里阻塞
client.read(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
// ❌ 错误的做法:在这里阻塞
String data = processHeavyOperation(); // 这个操作很慢,会阻塞回调线程
// 回调线程被占满,group 里的线程很快会用完
}
});
正确做法:把耗时操作丢给线程池。
8.3 ❌ 错误示范:Buffer 被重用
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.read(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
// 处理 buffer 的数据
buffer.flip();
process(buffer);
// ❌ 同一个 buffer 又开始读,数据可能丢失
buffer.clear();
client.read(buffer, null, this);
}
});
正确做法:每个连接用自己的 Buffer,或者在回调开始时创建新 Buffer。
九、面试追问链
第一层:基础概念
面试官问:"AIO 和 NIO 的区别是什么?"
NIO 是非阻塞,通过 Selector 轮询 Channel 是否就绪;AIO 是异步,数据就绪后系统主动回调通知。NIO 仍需应用自己读数据,AIO 可能在回调触发时数据已经可以用了。
第二层:回调机制
面试官追问:"CompletionHandler 的 completed 和 failed 什么时候会被调用?"
completed 在操作成功完成时调用,failed 在操作失败时调用。对于 read,completed 还会在读到数据或对端关闭时调用。
第三层:底层原理
面试官追问:"Linux 上 AIO 是怎么实现的?"
早期用 epoll 做事件探测,但读取还是同步的。JDK 11+ 引入了 io_uring 支持,真正实现了内核级的异步 IO,应用只需要提交请求和取结果。
第四层:选型建议
面试官追问:"什么场景用 AIO?"
AIO 适合超高并发(10 万+连接)、每个连接数据量不大、需要极致性能的场景。缺点是编程模型复杂,需要处理回调嵌套。普通高并发场景用 NIO/Netty 足够。
【学习小结】
- AIO 是真正的异步 IO,数据就绪后主动回调
- NIO 是非阻塞,需要应用自己轮询是否就绪
- CompletionHandler 是 AIO 的核心回调接口
- AsynchronousChannelGroup 管理 AIO 的线程池
- AIO 适合超高并发,但编程复杂度高
- 生产注意回调里不能阻塞、Buffer 不能乱重用