IM消息可靠性保证

一条消息的三种命运

IM 系统中,每条消息都可能面临三种命运:

1. 消息丢失:发送方发了,接收方没收到
2. 消息重复:发送方发了一条,接收方收到了两条
3. 消息乱序:发送顺序是 A → B → C,接收顺序是 A → C → B

任何一种情况都会导致糟糕的用户体验。


二、消息不丢失🔴

2.1 消息链路

发送方 ──发送──▶ 服务器 ──存储──▶ 数据库
       │                    │
       │                    └───推送──▶ 接收方(在线)

       └───存入──▶ 离线消息队列(接收方不在线)

2.2 服务端存储 + ACK

@Service
class MessageService {
    @Autowired
    private MessageDao messageDao;
    @Autowired
    private PushService pushService;
    @Autowired
    private OfflineQueue offlineQueue;

    /**
     * 发送消息
     */
    public Message send(MessageRequest request) {
        // 1. 生成消息 ID
        long messageId = idGenerator.nextId();

        // 2. 持久化消息(最重要的一步)
        Message message = new Message();
        message.setId(messageId);
        message.setFromUserId(request.getFromUserId());
        message.setToUserId(request.getToUserId());
        message.setContent(request.getContent());
        message.setTimestamp(System.currentTimeMillis());
        message.setStatus(MessageStatus.PENDING);
        messageDao.save(message);

        // 3. 更新状态为已存储
        message.setStatus(MessageStatus.STORED);
        messageDao.update(message);

        // 4. 推送给接收方
        if (userService.isOnline(request.getToUserId())) {
            pushService.pushToUser(request.getToUserId(), message);
            message.setStatus(MessageStatus.DELIVERED);
        } else {
            // 接收方不在线,存入离线队列
            offlineQueue.push(request.getToUserId(), message);
            message.setStatus(MessageStatus.QUEUED);
        }

        messageDao.update(message);
        return message;
    }
}

2.3 客户端 ACK

class MessageClient {
    private final Map<Long, Message> pendingMessages = new ConcurrentHashMap<>();

    /**
     * 发送消息并等待 ACK
     */
    public CompletableFuture<Message> send(Message message) {
        pendingMessages.put(message.getId(), message);

        return CompletableFuture.supplyAsync(() -> {
            // 1. 发送消息到服务器
            server.send(message);

            // 2. 等待 ACK
            return waitForAck(message.getId(), 30000); // 30秒超时
        }).thenApply(ack -> {
            pendingMessages.remove(message.getId());
            return message;
        }).exceptionally(ex -> {
            // 发送失败,标记重试
            markForRetry(message);
            throw new RuntimeException(ex);
        });
    }
}

三、消息不重复🔴

3.1 服务端去重

@Service
class IdempotentService {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String DEDUP_PREFIX = "msg:dedup:";

    /**
     * 检查消息是否重复
     */
    public boolean isDuplicate(long messageId) {
        String key = DEDUP_PREFIX + messageId;

        // SETNX 原子操作
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(key, "1", Duration.ofDays(7));

        return !Boolean.TRUE.equals(result);
    }

    /**
     * 处理消息
     */
    public void handleMessage(Message message) {
        if (isDuplicate(message.getId())) {
            log.info("消息重复: {}", message.getId());
            return;
        }

        // 处理业务逻辑
        doProcess(message);
    }
}

3.2 客户端幂等

class MessageClient {
    private final Set<Long> receivedMessageIds = ConcurrentHashMap.newKeySet();

    /**
     * 处理收到的消息
     */
    public void onMessageReceived(Message message) {
        if (receivedMessageIds.contains(message.getId())) {
            // 重复消息,忽略
            return;
        }

        receivedMessageIds.add(message.getId());

        // 渲染消息
        renderMessage(message);

        // 发送已读回执
        sendReadReceipt(message.getId());
    }
}

四、消息顺序保证🟡

4.1 单会话顺序

@Service
class MessageService {
    /**
     * 获取会话消息(保证顺序)
     */
    public List<Message> getConversation(Long userId1, Long userId2,
                                        Long lastMessageId, int limit) {
        // 1. 按消息 ID 升序获取
        List<Message> messages = messageDao.findByConversation(
            userId1, userId2, lastMessageId, limit
        );

        // 2. 按时间戳排序(作为保底)
        messages.sort(Comparator.comparingLong(Message::getId));

        return messages;
    }
}

4.2 全局顺序 vs 会话顺序

类型说明实现方式
全局顺序所有消息按绝对时间排序单队列,所有消息排队
会话顺序每个会话内消息有序,不同会话可并行分会话队列
全局顺序(绝对有序):
  消息队列 ──单队列──▶ 顺序消费

会话顺序(相对有序):
  消息队列 ──按会话分片──▶ 多消费者并行消费

4.3 乱序处理

class MessageRenderer {
    private final Map<String, TreeMap<Long, Message>> pendingMessages =
        new ConcurrentHashMap<>();

    /**
     * 渲染消息,处理乱序
     */
    public void renderMessage(Long conversationId, Message message) {
        TreeMap<Long, Message> conversation =
            pendingMessages.computeIfAbsent(
                String.valueOf(conversationId),
                k -> new TreeMap<>()
            );

        conversation.put(message.getId(), message);

        // 渲染连续的消息
        Long lowestKey = conversation.firstKey();
        List<Message> toRender = new ArrayList<>();

        for (Map.Entry<Long, Message> entry : conversation.entrySet()) {
            if (entry.getKey() == lowestKey + toRender.size()) {
                toRender.add(entry.getValue());
            } else {
                break;
            }
        }

        // 渲染并移除已渲染的消息
        for (Message m : toRender) {
            render(m);
            conversation.remove(m.getId());
        }
    }
}

五、离线消息同步🟡

5.1 拉取离线消息

@Service
class OfflineMessageService {
    @Autowired
    private OfflineMessageDao offlineMessageDao;

    /**
     * 拉取离线消息
     */
    public List<Message> pullOfflineMessages(Long userId, Long lastMessageId) {
        // 1. 查询离线消息
        List<OfflineMessage> offlineMessages =
            offlineMessageDao.findByUserIdAfter(userId, lastMessageId);

        List<Message> messages = offlineMessages.stream()
            .map(OfflineMessage::getMessage)
            .collect(Collectors.toList());

        // 2. 清理已拉取的离线消息
        offlineMessageDao.deleteByUserIdAndMessageIdBefore(
            userId, lastMessageId
        );

        return messages;
    }
}

5.2 增量同步

@Service
class IncrementalSyncService {
    /**
     * 增量同步消息(基于时间戳)
     */
    public SyncResult sync(Long userId, Long lastSyncTime) {
        // 1. 查询增量消息
        List<Message> newMessages = messageDao.findAfter(userId, lastSyncTime);

        // 2. 查询已删除消息
        List<Long> deletedIds = deletedMessageDao.findAfter(userId, lastSyncTime);

        return new SyncResult(
            newMessages,
            deletedIds,
            System.currentTimeMillis()
        );
    }
}

【架构权衡】 IM 消息可靠性的核心是消息不丢。所有其他机制(幂等、去重、排序)都建立在这个基础之上。


六、面试总结

级别期望回答
P5能说出消息 ACK 机制
P6能说出幂等和顺序保证的实现
P7能设计完整的 IM 可靠性方案