IM即时通讯系统设计

一个"简单"聊天室的教训

2018年,我们团队接到了一个需求:"做个内部聊天工具。"

我想:"不就是 WebSocket 推送消息吗?20行代码搞定。"

两周后,我被产品经理追着打了:

  • "为什么张三的消息李四没收到?"
  • "为什么消息顺序是乱的?"
  • "为什么用户 A 在线、用户 B 也在线,但消息发送失败?"
  • "为什么消息有重复?"
  • "为什么断网重连后消息丢失了?"

IM 系统的复杂度远超"发消息"本身。


二、IM 系统的核心挑战🔴

2.1 核心指标

指标目标
消息延迟< 100ms(P99)
可用性99.99%
消息到达率99.999%
支持在线用户100万+
消息存储10亿+ 条

2.2 核心问题

IM 系统的三大难题:

1. 消息可靠性
   - 不丢消息(消息持久化 + 确认机制)
   - 不重复消息(幂等去重)
   - 不乱序(消息 ID + 序列号)

2. 消息实时性
   - WebSocket 长连接
   - 心跳保活
   - 重连恢复

3. 消息一致性
   - 多端同步(手机 + PC + Web)
   - 消息已读未读
   - 消息漫游

三、系统架构设计🔴

3.1 整体架构

┌─────────────────────────────────────────────────────────┐
│                      用户端                              │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐             │
│  │   Web    │   │ Android │   │   iOS    │             │
│  └────┬─────┘   └────┬─────┘   └────┬─────┘             │
│       └──────────────┼──────────────┘                   │
└──────────────────────┼──────────────────────────────────┘
                       │ WebSocket / MQTT

┌─────────────────────────────────────────────────────────┐
│                    Gateway 集群                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │  Gateway-1  │  │  Gateway-2  │  │  Gateway-N  │      │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘      │
└─────────┼────────────────┼─────────────────┼──────────────┘
          │                │                 │
          └────────────────┼─────────────────┘
                           │ TCP / RPC

┌─────────────────────────────────────────────────────────┐
│                   Message Router                         │
│         (消息路由:根据用户 ID 找到用户的连接)            │
└──────────────────────────┬──────────────────────────────┘

          ┌────────────────┼────────────────┐
          ↓                ↓                ↓
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│  Session Service │ │  Push Service   │ │ Message Service │
│  (用户在线状态)  │ │  (消息推送)    │ │  (消息存储)    │
└─────────────────┘ └─────────────────┘ └─────────────────┘
          │                │                │
          └────────────────┼────────────────┘

┌─────────────────────────────────────────────────────────┐
│                    存储层                                │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐ │
│  │  Redis   │  │  MySQL   │  │   MQ    │  │  Kafka   │ │
│  │ 会话状态  │  │ 消息存储  │  │ 离线队列  │  │ 消息队列  │ │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘ │
└─────────────────────────────────────────────────────────┘

3.2 核心组件

组件职责
GatewayWebSocket 连接管理、长连接、心跳
Session Service用户在线状态、会话路由
Push Service消息推送、离线消息
Message Service消息存储、消息同步

四、消息发送流程🔴

4.1 核心流程

@Service
class MessageService {
    @Autowired
    private MessageRepository messageRepository;
    @Autowired
    private SessionService sessionService;
    @Autowired
    private PushService pushService;

    /**
     * 发送消息
     */
    public Message send(SendMessageRequest request) {
        // 1. 生成消息 ID(雪花算法)
        long messageId = idGenerator.nextId();

        // 2. 持久化消息
        Message message = new Message();
        message.setId(messageId);
        message.setFromUserId(request.getFromUserId());
        message.setToUserId(request.getToUserId());
        message.setContent(request.getContent());
        message.setTimestamp(System.currentTimeMillis());
        message.setStatus(MessageStatus.PENDING);
        messageRepository.save(message);

        // 3. 更新消息状态为成功
        message.setStatus(MessageStatus.DELIVERED);
        messageRepository.update(message);

        // 4. 推送消息
        pushService.push(message);

        // 5. 返回消息
        return message;
    }
}

4.2 在线用户消息推送

@Service
class PushService {
    @Autowired
    private SessionService sessionService;
    @Autowired
    private WebSocketServer webSocketServer;

    public void push(Message message) {
        Long toUserId = message.getToUserId();

        // 1. 查询用户所有在线会话
        List<Session> sessions = sessionService.getUserSessions(toUserId);

        for (Session session : sessions) {
            // 2. 通过 WebSocket 推送
            webSocketServer.sendToSession(
                session.getSessionId(),
                message
            );
        }

        // 3. 如果没有在线会话,存入离线消息队列
        if (sessions.isEmpty()) {
            saveOfflineMessage(message);
        }
    }
}

五、消息可靠性保证🔴

5.1 消息确认机制(ACK)

// 客户端发送消息
class MessageService {
    public void onClientSend(Message message) {
        // 1. 客户端发送消息
        message.setStatus(MessageStatus.SENDING);
        messageRepository.save(message);

        // 2. 服务端收到后返回 ACK
        message.setStatus(MessageStatus.SENT);
        messageRepository.update(message);

        // 3. 客户端收到 ACK 后标记已送达
        sendAckToClient(message.getId(), AckStatus.SENT);
    }
}

// 消息已读回执
class ReadReceiptService {
    public void handleReadReceipt(ReadReceipt receipt) {
        // 用户已读消息
        messageRepository.updateStatus(
            receipt.getMessageId(),
            MessageStatus.READ
        );

        // 通知发送方
        Long senderId = messageRepository.getSenderId(receipt.getMessageId());
        notifySenderRead(senderId, receipt.getMessageId());
    }
}

5.2 消息重试机制

@Service
class MessageRetryService {
    @Autowired
    private MessageRepository messageRepository;
    @Autowired
    private PushService pushService;

    /**
     * 定时重试未送达的消息
     */
    @Scheduled(fixedDelay = 5000)
    public void retryUndeliveredMessages() {
        // 查询 5 分钟内未送达的消息
        List<Message> undelivered = messageRepository
            .findUndeliveredMessages(
                MessageStatus.SENT,
                System.currentTimeMillis() - 5 * 60 * 1000
            );

        for (Message message : undelivered) {
            try {
                // 重试推送
                pushService.push(message);

                // 更新重试次数
                message.incrementRetryCount();
                messageRepository.update(message);

            } catch (Exception e) {
                log.error("重试失败: messageId={}", message.getId(), e);
            }
        }
    }
}

5.3 幂等去重

@Service
class IdempotentService {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String DEDUP_PREFIX = "msg:dedup:";

    /**
     * 消息去重
     */
    public boolean isDuplicate(long messageId) {
        String key = DEDUP_PREFIX + messageId;

        // SETNX 保证原子性
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(key, "1", Duration.ofMinutes(5));

        return !Boolean.TRUE.equals(result);
    }

    public void sendMessage(Message message) {
        if (isDuplicate(message.getId())) {
            log.info("消息重复: messageId={}", message.getId());
            return;
        }

        // 正常发送
        doSendMessage(message);
    }
}

六、消息同步设计🟡

6.1 多端同步

@Service
class MultiDeviceSyncService {
    @Autowired
    private SessionService sessionService;
    @Autowired
    private SyncService syncService;

    /**
     * 同步消息到所有在线设备
     */
    public void syncToAllDevices(Long userId, Message message) {
        List<Device> devices = deviceService.getUserDevices(userId);

        for (Device device : devices) {
            if (device.isOnline()) {
                // 在线设备:实时推送
                syncService.pushToDevice(device, message);
            } else {
                // 离线设备:存入离线消息
                saveToOfflineQueue(device, message);
            }
        }
    }
}

6.2 离线消息拉取

@Service
class OfflineMessageService {
    @Autowired
    private OfflineMessageRepository offlineMessageRepository;

    /**
     * 拉取离线消息
     */
    public List<Message> pullOfflineMessages(Long userId, Long lastMessageId) {
        // 查询 lastMessageId 之后的所有离线消息
        List<OfflineMessage> offlineMessages =
            offlineMessageRepository.findByUserIdAfter(userId, lastMessageId);

        List<Message> messages = offlineMessages.stream()
            .map(OfflineMessage::getMessage)
            .collect(Collectors.toList());

        // 清理已拉取的离线消息
        offlineMessageRepository.deleteByUserIdAndMessageIdBefore(
            userId, lastMessageId
        );

        return messages;
    }
}

七、WebSocket 长连接管理🟡

7.1 连接保活

@Configuration
class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

@Component
@ServerEndpoint("/ws/{userId}")
class WebSocketServer {

    @OnOpen
    public void onOpen(Session session, @PathParam("userId") Long userId) {
        // 1. 保存会话
        sessionService.registerSession(userId, session);

        // 2. 记录在线状态
        redisTemplate.opsForValue().set(
            "online:user:" + userId,
            "true",
            Duration.ofMinutes(30)
        );

        // 3. 发送心跳
        session.setMaxIdleTimeout(60000); // 60秒无活动断开
    }

    @OnMessage
    public void onMessage(String data, Session session) {
        // 处理心跳
        if ("ping".equals(data)) {
            session.getAsyncRemote().sendText("pong");
            return;
        }

        // 处理业务消息
        handleBusinessMessage(data, session);
    }

    @OnClose
    public void onClose(Session session) {
        // 清理会话
        Long userId = getUserIdFromSession(session);
        sessionService.unregisterSession(userId, session);

        // 更新在线状态
        redisTemplate.opsForValue().set(
            "online:user:" + userId,
            "false",
            Duration.ofMinutes(5) // 短暂保留,等重连
        );
    }
}

7.2 消息路由

@Service
class MessageRouter {
    @Autowired
    private SessionService sessionService;
    @Autowired
    private OfflineMessageRepository offlineMessageRepository;

    public void route(Message message) {
        Long toUserId = message.getToUserId();

        // 1. 查找用户的所有在线会话
        List<Session> sessions = sessionService.getUserSessions(toUserId);

        if (sessions.isEmpty()) {
            // 2. 无在线会话,存入离线队列
            offlineMessageRepository.save(
                new OfflineMessage(toUserId, message)
            );
        } else {
            // 3. 推送到所有在线会话
            for (Session session : sessions) {
                pushToSession(session, message);
            }
        }
    }
}

八、生产避坑清单🟡

8.1 消息乱序

// ❌ 错误:多端消息不同步
class MessageService {
    public void send(Message message) {
        messageRepository.save(message); // 保存到数据库
        pushService.push(message);       // 推送
    }
}

// ✅ 正确:使用消息序列号保证顺序
class MessageService {
    public void send(Message message) {
        // 1. 分配序列号
        long seq = redisTemplate.opsForValue().increment(
            "msg:seq:" + message.getToUserId()
        );
        message.setSequence(seq);

        // 2. 保存消息
        messageRepository.save(message);

        // 3. 推送
        pushService.push(message);
    }
}

8.2 连接风暴

// ❌ 错误:没有保护机制
@Component
class WebSocketServer {
    public void broadcast(Message message) {
        for (Session session : allSessions) {
            session.getAsyncRemote().sendText(message); // 可能打爆服务器
        }
    }
}

// ✅ 正确:限流 + 批量发送
@Component
class WebSocketServer {
    private final RateLimiter rateLimiter = RateLimiter.create(10000); // 限流

    public void broadcast(Message message) {
        for (Session session : allSessions) {
            if (rateLimiter.tryAcquire()) {
                session.getAsyncRemote().sendText(message);
            }
        }
    }
}

【架构权衡】 IM 系统的核心矛盾是"实时性 vs 可靠性"。为了实时性,需要长连接 + 快速推送;为了可靠性,需要消息持久化 + ACK + 重试。实际系统需要在两者之间找到平衡。


九、面试总结

9.1 核心追问

  1. "如何保证消息不丢?" —— 持久化 + ACK + 重试
  2. "如何保证消息不重复?" —— 幂等去重
  3. "如何保证消息顺序?" —— 序列号
  4. "断线重连后如何恢复消息?" —— 离线拉取 + 增量同步

9.2 级别差异

级别期望回答
P5能说出 WebSocket 基本用法
P6能说出消息确认、重试、幂等等机制
P7能设计完整的 IM 系统架构