#IM消息可靠性保证
#一条消息的三种命运
IM 系统中,每条消息都可能面临三种命运:
1. 消息丢失:发送方发了,接收方没收到
2. 消息重复:发送方发了一条,接收方收到了两条
3. 消息乱序:发送顺序是 A → B → C,接收顺序是 A → C → B任何一种情况都会导致糟糕的用户体验。
#二、消息不丢失🔴
#2.1 消息链路
发送方 ──发送──▶ 服务器 ──存储──▶ 数据库
│ │
│ └───推送──▶ 接收方(在线)
│
└───存入──▶ 离线消息队列(接收方不在线)#2.2 服务端存储 + ACK
@Service
class MessageService {
@Autowired
private MessageDao messageDao;
@Autowired
private PushService pushService;
@Autowired
private OfflineQueue offlineQueue;
/**
* 发送消息
*/
public Message send(MessageRequest request) {
// 1. 生成消息 ID
long messageId = idGenerator.nextId();
// 2. 持久化消息(最重要的一步)
Message message = new Message();
message.setId(messageId);
message.setFromUserId(request.getFromUserId());
message.setToUserId(request.getToUserId());
message.setContent(request.getContent());
message.setTimestamp(System.currentTimeMillis());
message.setStatus(MessageStatus.PENDING);
messageDao.save(message);
// 3. 更新状态为已存储
message.setStatus(MessageStatus.STORED);
messageDao.update(message);
// 4. 推送给接收方
if (userService.isOnline(request.getToUserId())) {
pushService.pushToUser(request.getToUserId(), message);
message.setStatus(MessageStatus.DELIVERED);
} else {
// 接收方不在线,存入离线队列
offlineQueue.push(request.getToUserId(), message);
message.setStatus(MessageStatus.QUEUED);
}
messageDao.update(message);
return message;
}
}#2.3 客户端 ACK
class MessageClient {
private final Map<Long, Message> pendingMessages = new ConcurrentHashMap<>();
/**
* 发送消息并等待 ACK
*/
public CompletableFuture<Message> send(Message message) {
pendingMessages.put(message.getId(), message);
return CompletableFuture.supplyAsync(() -> {
// 1. 发送消息到服务器
server.send(message);
// 2. 等待 ACK
return waitForAck(message.getId(), 30000); // 30秒超时
}).thenApply(ack -> {
pendingMessages.remove(message.getId());
return message;
}).exceptionally(ex -> {
// 发送失败,标记重试
markForRetry(message);
throw new RuntimeException(ex);
});
}
}#三、消息不重复🔴
#3.1 服务端去重
@Service
class IdempotentService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String DEDUP_PREFIX = "msg:dedup:";
/**
* 检查消息是否重复
*/
public boolean isDuplicate(long messageId) {
String key = DEDUP_PREFIX + messageId;
// SETNX 原子操作
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(key, "1", Duration.ofDays(7));
return !Boolean.TRUE.equals(result);
}
/**
* 处理消息
*/
public void handleMessage(Message message) {
if (isDuplicate(message.getId())) {
log.info("消息重复: {}", message.getId());
return;
}
// 处理业务逻辑
doProcess(message);
}
}#3.2 客户端幂等
class MessageClient {
private final Set<Long> receivedMessageIds = ConcurrentHashMap.newKeySet();
/**
* 处理收到的消息
*/
public void onMessageReceived(Message message) {
if (receivedMessageIds.contains(message.getId())) {
// 重复消息,忽略
return;
}
receivedMessageIds.add(message.getId());
// 渲染消息
renderMessage(message);
// 发送已读回执
sendReadReceipt(message.getId());
}
}#四、消息顺序保证🟡
#4.1 单会话顺序
@Service
class MessageService {
/**
* 获取会话消息(保证顺序)
*/
public List<Message> getConversation(Long userId1, Long userId2,
Long lastMessageId, int limit) {
// 1. 按消息 ID 升序获取
List<Message> messages = messageDao.findByConversation(
userId1, userId2, lastMessageId, limit
);
// 2. 按时间戳排序(作为保底)
messages.sort(Comparator.comparingLong(Message::getId));
return messages;
}
}#4.2 全局顺序 vs 会话顺序
| 类型 | 说明 | 实现方式 |
|---|---|---|
| 全局顺序 | 所有消息按绝对时间排序 | 单队列,所有消息排队 |
| 会话顺序 | 每个会话内消息有序,不同会话可并行 | 分会话队列 |
全局顺序(绝对有序):
消息队列 ──单队列──▶ 顺序消费
会话顺序(相对有序):
消息队列 ──按会话分片──▶ 多消费者并行消费#4.3 乱序处理
class MessageRenderer {
private final Map<String, TreeMap<Long, Message>> pendingMessages =
new ConcurrentHashMap<>();
/**
* 渲染消息,处理乱序
*/
public void renderMessage(Long conversationId, Message message) {
TreeMap<Long, Message> conversation =
pendingMessages.computeIfAbsent(
String.valueOf(conversationId),
k -> new TreeMap<>()
);
conversation.put(message.getId(), message);
// 渲染连续的消息
Long lowestKey = conversation.firstKey();
List<Message> toRender = new ArrayList<>();
for (Map.Entry<Long, Message> entry : conversation.entrySet()) {
if (entry.getKey() == lowestKey + toRender.size()) {
toRender.add(entry.getValue());
} else {
break;
}
}
// 渲染并移除已渲染的消息
for (Message m : toRender) {
render(m);
conversation.remove(m.getId());
}
}
}#五、离线消息同步🟡
#5.1 拉取离线消息
@Service
class OfflineMessageService {
@Autowired
private OfflineMessageDao offlineMessageDao;
/**
* 拉取离线消息
*/
public List<Message> pullOfflineMessages(Long userId, Long lastMessageId) {
// 1. 查询离线消息
List<OfflineMessage> offlineMessages =
offlineMessageDao.findByUserIdAfter(userId, lastMessageId);
List<Message> messages = offlineMessages.stream()
.map(OfflineMessage::getMessage)
.collect(Collectors.toList());
// 2. 清理已拉取的离线消息
offlineMessageDao.deleteByUserIdAndMessageIdBefore(
userId, lastMessageId
);
return messages;
}
}#5.2 增量同步
@Service
class IncrementalSyncService {
/**
* 增量同步消息(基于时间戳)
*/
public SyncResult sync(Long userId, Long lastSyncTime) {
// 1. 查询增量消息
List<Message> newMessages = messageDao.findAfter(userId, lastSyncTime);
// 2. 查询已删除消息
List<Long> deletedIds = deletedMessageDao.findAfter(userId, lastSyncTime);
return new SyncResult(
newMessages,
deletedIds,
System.currentTimeMillis()
);
}
}【架构权衡】 IM 消息可靠性的核心是消息不丢。所有其他机制(幂等、去重、排序)都建立在这个基础之上。
#六、面试总结
| 级别 | 期望回答 |
|---|---|
| P5 | 能说出消息 ACK 机制 |
| P6 | 能说出幂等和顺序保证的实现 |
| P7 | 能设计完整的 IM 可靠性方案 |