#IM即时通讯系统设计
#一个"简单"聊天室的教训
2018年,我们团队接到了一个需求:"做个内部聊天工具。"
我想:"不就是 WebSocket 推送消息吗?20行代码搞定。"
两周后,我被产品经理追着打了:
- "为什么张三的消息李四没收到?"
- "为什么消息顺序是乱的?"
- "为什么用户 A 在线、用户 B 也在线,但消息发送失败?"
- "为什么消息有重复?"
- "为什么断网重连后消息丢失了?"
IM 系统的复杂度远超"发消息"本身。
#二、IM 系统的核心挑战🔴
#2.1 核心指标
| 指标 | 目标 |
|---|---|
| 消息延迟 | < 100ms(P99) |
| 可用性 | 99.99% |
| 消息到达率 | 99.999% |
| 支持在线用户 | 100万+ |
| 消息存储 | 10亿+ 条 |
#2.2 核心问题
IM 系统的三大难题:
1. 消息可靠性
- 不丢消息(消息持久化 + 确认机制)
- 不重复消息(幂等去重)
- 不乱序(消息 ID + 序列号)
2. 消息实时性
- WebSocket 长连接
- 心跳保活
- 重连恢复
3. 消息一致性
- 多端同步(手机 + PC + Web)
- 消息已读未读
- 消息漫游#三、系统架构设计🔴
#3.1 整体架构
┌─────────────────────────────────────────────────────────┐
│ 用户端 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Web │ │ Android │ │ iOS │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ └──────────────┼──────────────┘ │
└──────────────────────┼──────────────────────────────────┘
│ WebSocket / MQTT
↓
┌─────────────────────────────────────────────────────────┐
│ Gateway 集群 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Gateway-1 │ │ Gateway-2 │ │ Gateway-N │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
└─────────┼────────────────┼─────────────────┼──────────────┘
│ │ │
└────────────────┼─────────────────┘
│ TCP / RPC
↓
┌─────────────────────────────────────────────────────────┐
│ Message Router │
│ (消息路由:根据用户 ID 找到用户的连接) │
└──────────────────────────┬──────────────────────────────┘
│
┌────────────────┼────────────────┐
↓ ↓ ↓
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Session Service │ │ Push Service │ │ Message Service │
│ (用户在线状态) │ │ (消息推送) │ │ (消息存储) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└────────────────┼────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 存储层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Redis │ │ MySQL │ │ MQ │ │ Kafka │ │
│ │ 会话状态 │ │ 消息存储 │ │ 离线队列 │ │ 消息队列 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────┘#3.2 核心组件
| 组件 | 职责 |
|---|---|
| Gateway | WebSocket 连接管理、长连接、心跳 |
| Session Service | 用户在线状态、会话路由 |
| Push Service | 消息推送、离线消息 |
| Message Service | 消息存储、消息同步 |
#四、消息发送流程🔴
#4.1 核心流程
@Service
class MessageService {
@Autowired
private MessageRepository messageRepository;
@Autowired
private SessionService sessionService;
@Autowired
private PushService pushService;
/**
* 发送消息
*/
public Message send(SendMessageRequest request) {
// 1. 生成消息 ID(雪花算法)
long messageId = idGenerator.nextId();
// 2. 持久化消息
Message message = new Message();
message.setId(messageId);
message.setFromUserId(request.getFromUserId());
message.setToUserId(request.getToUserId());
message.setContent(request.getContent());
message.setTimestamp(System.currentTimeMillis());
message.setStatus(MessageStatus.PENDING);
messageRepository.save(message);
// 3. 更新消息状态为成功
message.setStatus(MessageStatus.DELIVERED);
messageRepository.update(message);
// 4. 推送消息
pushService.push(message);
// 5. 返回消息
return message;
}
}#4.2 在线用户消息推送
@Service
class PushService {
@Autowired
private SessionService sessionService;
@Autowired
private WebSocketServer webSocketServer;
public void push(Message message) {
Long toUserId = message.getToUserId();
// 1. 查询用户所有在线会话
List<Session> sessions = sessionService.getUserSessions(toUserId);
for (Session session : sessions) {
// 2. 通过 WebSocket 推送
webSocketServer.sendToSession(
session.getSessionId(),
message
);
}
// 3. 如果没有在线会话,存入离线消息队列
if (sessions.isEmpty()) {
saveOfflineMessage(message);
}
}
}#五、消息可靠性保证🔴
#5.1 消息确认机制(ACK)
// 客户端发送消息
class MessageService {
public void onClientSend(Message message) {
// 1. 客户端发送消息
message.setStatus(MessageStatus.SENDING);
messageRepository.save(message);
// 2. 服务端收到后返回 ACK
message.setStatus(MessageStatus.SENT);
messageRepository.update(message);
// 3. 客户端收到 ACK 后标记已送达
sendAckToClient(message.getId(), AckStatus.SENT);
}
}
// 消息已读回执
class ReadReceiptService {
public void handleReadReceipt(ReadReceipt receipt) {
// 用户已读消息
messageRepository.updateStatus(
receipt.getMessageId(),
MessageStatus.READ
);
// 通知发送方
Long senderId = messageRepository.getSenderId(receipt.getMessageId());
notifySenderRead(senderId, receipt.getMessageId());
}
}#5.2 消息重试机制
@Service
class MessageRetryService {
@Autowired
private MessageRepository messageRepository;
@Autowired
private PushService pushService;
/**
* 定时重试未送达的消息
*/
@Scheduled(fixedDelay = 5000)
public void retryUndeliveredMessages() {
// 查询 5 分钟内未送达的消息
List<Message> undelivered = messageRepository
.findUndeliveredMessages(
MessageStatus.SENT,
System.currentTimeMillis() - 5 * 60 * 1000
);
for (Message message : undelivered) {
try {
// 重试推送
pushService.push(message);
// 更新重试次数
message.incrementRetryCount();
messageRepository.update(message);
} catch (Exception e) {
log.error("重试失败: messageId={}", message.getId(), e);
}
}
}
}#5.3 幂等去重
@Service
class IdempotentService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String DEDUP_PREFIX = "msg:dedup:";
/**
* 消息去重
*/
public boolean isDuplicate(long messageId) {
String key = DEDUP_PREFIX + messageId;
// SETNX 保证原子性
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(key, "1", Duration.ofMinutes(5));
return !Boolean.TRUE.equals(result);
}
public void sendMessage(Message message) {
if (isDuplicate(message.getId())) {
log.info("消息重复: messageId={}", message.getId());
return;
}
// 正常发送
doSendMessage(message);
}
}#六、消息同步设计🟡
#6.1 多端同步
@Service
class MultiDeviceSyncService {
@Autowired
private SessionService sessionService;
@Autowired
private SyncService syncService;
/**
* 同步消息到所有在线设备
*/
public void syncToAllDevices(Long userId, Message message) {
List<Device> devices = deviceService.getUserDevices(userId);
for (Device device : devices) {
if (device.isOnline()) {
// 在线设备:实时推送
syncService.pushToDevice(device, message);
} else {
// 离线设备:存入离线消息
saveToOfflineQueue(device, message);
}
}
}
}#6.2 离线消息拉取
@Service
class OfflineMessageService {
@Autowired
private OfflineMessageRepository offlineMessageRepository;
/**
* 拉取离线消息
*/
public List<Message> pullOfflineMessages(Long userId, Long lastMessageId) {
// 查询 lastMessageId 之后的所有离线消息
List<OfflineMessage> offlineMessages =
offlineMessageRepository.findByUserIdAfter(userId, lastMessageId);
List<Message> messages = offlineMessages.stream()
.map(OfflineMessage::getMessage)
.collect(Collectors.toList());
// 清理已拉取的离线消息
offlineMessageRepository.deleteByUserIdAndMessageIdBefore(
userId, lastMessageId
);
return messages;
}
}#七、WebSocket 长连接管理🟡
#7.1 连接保活
@Configuration
class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
@Component
@ServerEndpoint("/ws/{userId}")
class WebSocketServer {
@OnOpen
public void onOpen(Session session, @PathParam("userId") Long userId) {
// 1. 保存会话
sessionService.registerSession(userId, session);
// 2. 记录在线状态
redisTemplate.opsForValue().set(
"online:user:" + userId,
"true",
Duration.ofMinutes(30)
);
// 3. 发送心跳
session.setMaxIdleTimeout(60000); // 60秒无活动断开
}
@OnMessage
public void onMessage(String data, Session session) {
// 处理心跳
if ("ping".equals(data)) {
session.getAsyncRemote().sendText("pong");
return;
}
// 处理业务消息
handleBusinessMessage(data, session);
}
@OnClose
public void onClose(Session session) {
// 清理会话
Long userId = getUserIdFromSession(session);
sessionService.unregisterSession(userId, session);
// 更新在线状态
redisTemplate.opsForValue().set(
"online:user:" + userId,
"false",
Duration.ofMinutes(5) // 短暂保留,等重连
);
}
}#7.2 消息路由
@Service
class MessageRouter {
@Autowired
private SessionService sessionService;
@Autowired
private OfflineMessageRepository offlineMessageRepository;
public void route(Message message) {
Long toUserId = message.getToUserId();
// 1. 查找用户的所有在线会话
List<Session> sessions = sessionService.getUserSessions(toUserId);
if (sessions.isEmpty()) {
// 2. 无在线会话,存入离线队列
offlineMessageRepository.save(
new OfflineMessage(toUserId, message)
);
} else {
// 3. 推送到所有在线会话
for (Session session : sessions) {
pushToSession(session, message);
}
}
}
}#八、生产避坑清单🟡
#8.1 消息乱序
// ❌ 错误:多端消息不同步
class MessageService {
public void send(Message message) {
messageRepository.save(message); // 保存到数据库
pushService.push(message); // 推送
}
}
// ✅ 正确:使用消息序列号保证顺序
class MessageService {
public void send(Message message) {
// 1. 分配序列号
long seq = redisTemplate.opsForValue().increment(
"msg:seq:" + message.getToUserId()
);
message.setSequence(seq);
// 2. 保存消息
messageRepository.save(message);
// 3. 推送
pushService.push(message);
}
}#8.2 连接风暴
// ❌ 错误:没有保护机制
@Component
class WebSocketServer {
public void broadcast(Message message) {
for (Session session : allSessions) {
session.getAsyncRemote().sendText(message); // 可能打爆服务器
}
}
}
// ✅ 正确:限流 + 批量发送
@Component
class WebSocketServer {
private final RateLimiter rateLimiter = RateLimiter.create(10000); // 限流
public void broadcast(Message message) {
for (Session session : allSessions) {
if (rateLimiter.tryAcquire()) {
session.getAsyncRemote().sendText(message);
}
}
}
}【架构权衡】 IM 系统的核心矛盾是"实时性 vs 可靠性"。为了实时性,需要长连接 + 快速推送;为了可靠性,需要消息持久化 + ACK + 重试。实际系统需要在两者之间找到平衡。
#九、面试总结
#9.1 核心追问
- "如何保证消息不丢?" —— 持久化 + ACK + 重试
- "如何保证消息不重复?" —— 幂等去重
- "如何保证消息顺序?" —— 序列号
- "断线重连后如何恢复消息?" —— 离线拉取 + 增量同步
#9.2 级别差异
| 级别 | 期望回答 |
|---|---|
| P5 | 能说出 WebSocket 基本用法 |
| P6 | 能说出消息确认、重试、幂等等机制 |
| P7 | 能设计完整的 IM 系统架构 |