#事件驱动架构
#一次促销活动的血案
2023年双十一前,我们团队上线了一个促销活动。活动规则:用户下单后,根据金额赠送不同等级的优惠券。
原本的实现是同步调用:
@Service
class OrderService {
@Autowired
private CouponService couponService;
public void createOrder(Order order) {
orderDao.save(order);
// 同步调用:所有操作串行执行
couponService.grantCoupon(order); // 如果这里慢,订单就慢
notificationService.notify(order);
pointsService.addPoints(order);
inventoryService.lock(order);
}
}上线后问题来了:优惠券系统响应变慢,导致整个订单接口 P99 延迟从 50ms 飙升到 3s。更要命的是,优惠券系统宕机,所有订单都失败了。
这就是同步耦合的代价:任何一个下游系统的故障,都会导致主流程失败。
事件驱动架构解决的就是这个问题:把同步调用变成异步事件,让系统之间充分解耦。
#二、同步架构 vs 事件驱动🔴
#2.1 同步架构的问题
客户端 → OrderService → CouponService → NotificationService → PointsService → 库存服务
(全部串行,任意一个失败全部失败)问题清单:
- 紧耦合:订单服务需要知道所有下游服务的接口
- 单点故障:任何一个下游失败,整个流程失败
- 扩展性差:加一个下游就要改订单服务
- 性能瓶颈:所有操作串行执行,总时间 = 所有操作之和
#2.2 事件驱动架构
客户端 → OrderService → (发布事件)
↓
Message Queue
↓
┌──────────┼──────────┐
↓ ↓ ↓
CouponService PointsService NotificationService
(并行消费) (并行消费) (并行消费)优势:
- 松耦合:订单服务只管发事件,不知道谁订阅
- 容错性:下游失败不影响主流程(可以重试)
- 可扩展:新增下游只需要订阅事件,不需要改订单服务
- 性能高:并行处理,总时间 = 最慢操作的时间
#三、事件驱动模式🟡
#3.1 事件通知模式(Event Notification)
最简单的模式:事件只是通知,不携带完整数据。
// 事件只包含 ID,具体数据由消费者查询
class OrderCreatedEvent {
private final String eventId;
private final Long orderId;
private final Instant timestamp;
// 只传 orderId,不传完整订单数据
}
class CouponService {
@KafkaListener(topics = "order.created")
public void handleOrderCreated(OrderCreatedEvent event) {
// 消费者自己查询完整数据
Order order = orderService.getOrder(event.getOrderId());
couponService.grantCoupon(order);
}
}适用场景:消费者不需要太多数据,或者数据变化不频繁。
#3.2 事件携带状态模式(Event-Carried State Transfer)
事件携带完整的业务数据,消费者不需要再查询。
class OrderCreatedEvent {
private final String eventId;
private final Long orderId;
private final Long userId;
private final String userName;
private final BigDecimal amount;
private final List<OrderItemDTO> items;
private final Instant timestamp;
}
class CouponService {
@KafkaListener(topics = "order.created")
public void handleOrderCreated(OrderCreatedEvent event) {
// 不需要再查数据库,直接用事件中的数据
couponService.grantCoupon(event.getUserId(), event.getAmount());
}
}优势:消费者不需要调用生产者的接口,减少网络开销。
劣势:数据可能过时(消费者拿到的是事件发送时的快照)。
#3.3 领域事件模式(Domain Event)
在 DDD 中,聚合状态变化时发布领域事件:
class Order extends BaseAggregateRoot {
private OrderId id;
private OrderStatus status;
public void pay(Money amount) {
// 业务逻辑
this.status = OrderStatus.PAID;
// 发布领域事件
registerEvent(new OrderPaidEvent(
this.id,
this.userId,
this.totalAmount,
Instant.now()
));
}
}
// 事件发布
class OrderEventPublisher {
@Autowired
private ApplicationEventPublisher publisher;
public void publish(DomainEvent event) {
publisher.publishEvent(event);
}
}#3.4 CQRS 事件流
结合 CQRS,读模型通过消费事件来更新:
// 订单聚合发布事件
class Order extends BaseAggregateRoot {
public void create(...) {
// 创建订单逻辑
registerEvent(new OrderCreatedEvent(this));
}
}
// 读模型订阅事件
@KafkaListener(topics = "order.events")
class OrderReadModelUpdater {
@Autowired
private JdbcTemplate jdbcTemplate;
@Transactional
public void handleOrderCreated(OrderCreatedEvent event) {
jdbcTemplate.update(
"INSERT INTO order_read_model (id, user_id, amount, status, created_at) " +
"VALUES (?, ?, ?, ?, ?)",
event.getOrderId(), event.getUserId(), event.getAmount(),
event.getStatus(), event.getCreatedAt()
);
}
@Transactional
public void handleOrderPaid(OrderPaidEvent event) {
jdbcTemplate.update(
"UPDATE order_read_model SET status = ?, paid_at = ? WHERE id = ?",
event.getStatus(), event.getPaidAt(), event.getOrderId()
);
}
}#四、Kafka 生产实践🟡
#4.1 消息分区策略
// 按用户 ID 分区:同一用户的消息在同一个分区(保证顺序)
@Configuration
class KafkaConfig {
@Bean
public NewTopic orderTopic() {
return TopicBuilder.name("order.events")
.partitions(12) // 分区数 = 消费者数量
.replicas(1)
.build();
}
}
// 生产者指定分区键
@Service
class OrderService {
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void createOrder(Order order) {
orderDao.save(order);
OrderCreatedEvent event = new OrderCreatedEvent(order);
// 分区键:userId,保证同一用户的订单在同一个分区
kafkaTemplate.send("order.events", order.getUserId().toString(), event);
}
}
// 消费者:按分区消费
@KafkaListener(topics = "order.events", groupId = "coupon-service")
class CouponConsumer {
@KafkaHandler
public void handleOrderCreated(OrderCreatedEvent event) {
// 消费逻辑
}
}#4.2 消息顺序保证
Kafka 只保证单分区有序。如果需要全局有序,需要:
- 只用一个分区(性能瓶颈)
- 或者在业务层面处理乱序(如加版本号)
// 业务层面的顺序处理
class OrderProcessor {
private final Map<String, Long> lastVersion = new ConcurrentHashMap<>();
public void processOrderEvent(OrderEvent event) {
String key = event.getOrderId();
Long current = lastVersion.get(key);
if (current != null && event.getVersion() <= current) {
// 版本号不递增,说明是乱序消息,丢弃或延迟处理
log.warn("Out-of-order event: {} <= {}", event.getVersion(), current);
return;
}
lastVersion.put(key, event.getVersion());
doProcess(event);
}
}#4.3 消息幂等处理
消息队列中最重要的问题:消息重复。消费者的处理逻辑必须是幂等的。
// 方式一:数据库唯一键约束
@Service
class CouponService {
@Transactional
public void grantCoupon(Long orderId, BigDecimal amount) {
// 幂等键:orderId
// 如果重复插入,数据库会报错(或用 INSERT IGNORE)
couponDao.insertIgnore(
Coupon.builder()
.id(UUID.randomUUID().toString())
.orderId(orderId) // 唯一键
.amount(calculate(amount))
.build()
);
}
}
// 方式二:消费者幂等表
@Service
class IdempotentConsumer {
@Autowired
private RedisTemplate<String, String> redis;
public void handleEvent(OrderEvent event) {
String key = "processed:" + event.getEventId();
// SETNX 保证幂等
if (Boolean.FALSE.equals(redis.opsForValue().setIfAbsent(key, "1", Duration.ofDays(7)))) {
log.info("Event already processed: {}", event.getEventId());
return;
}
doProcess(event);
}
}#五、事务消息🟡
#5.1 问题
业务操作和消息发送如何保证一致性?
// ❌ 错误:业务和消息不在同一事务
@Service
class OrderService {
@Transactional
public void createOrder(Order order) {
orderDao.save(order);
// 事务提交后,消息发送失败
kafkaTemplate.send("order.events", order.getId().toString(), event);
}
}
// 结果:数据库有订单,但消息没发出去#5.2 RocketMQ 事务消息
// RocketMQ 事务消息:两阶段提交
@Service
class OrderService {
@Autowired
private TransactionMQProducer producer;
public void createOrder(Order order) {
orderDao.save(order);
OrderEvent event = new OrderCreatedEvent(order);
// 发送半消息(prepare)
producer.sendMessageInTransaction(
new Message("order.events", JSON.toJSONString(event)),
(msg, arg) -> {
// 本地事务执行(这里扣库存等操作)
orderService.executeLocalTransaction(order);
return LocalTransactionState.COMMIT_MESSAGE;
},
null
);
}
}
// 事务监听器
class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 本地事务
try {
executeLocalTransaction((Order) arg);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回调检查:RocketMQ 自动调用,查询本地事务是否成功
// 根据订单状态判断
Order order = orderDao.findById(parseOrderId(msg));
if (order != null) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}【架构权衡】 RocketMQ 事务消息的代价:
- 架构复杂度增加
- 延迟(需要等待事务状态确认)
- 回调接口可能重复调用(需要幂等处理)
如果业务允许最终一致性,用"本地消息表"方案更简单。
#六、生产避坑清单
#6.1 消息丢失
// ❌ 生产者:发送后不确认
kafkaTemplate.send("topic", event); // fire-and-forget,可能丢失
// ✅ 正确:确认发送结果
ListenableFuture<SendResult> future = kafkaTemplate.send("topic", event);
future.addCallback(
result -> log.info("Message sent: {}", result.getRecordMetadata()),
ex -> {
log.error("Failed to send message", ex);
// 重试或告警
}
);#6.2 消息堆积
// ❌ 消费者:处理太慢
@KafkaListener
public void handle(OrderEvent event) {
// 同步调用数据库,耗时 500ms
// 如果生产速度 > 消费速度,消息堆积
Thread.sleep(500);
}
// ✅ 正确:批量处理 + 并行消费
@Bean
public ConcurrentKafkaListenerContainerFactory factory(
KafkaProperties properties) {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConcurrency(6); // 并行消费线程数
factory.setBatchListener(true); // 批量消费
return factory;
}
@KafkaListener
public void handleBatch(List<OrderEvent> events) {
// 批量处理,数据库批量写入
orderService.processBatch(events);
}#6.3 重复消费
// 幂等处理是必须的
@Service
class ConsumerService {
@Autowired
private OrderDao orderDao;
public void processEvent(OrderCreatedEvent event) {
// 幂等检查
if (orderDao.existsByOrderId(event.getOrderId())) {
log.info("Event already processed: {}", event.getOrderId());
return;
}
// 处理逻辑
orderDao.updateStatus(event.getOrderId(), event.getStatus());
}
}#七、面试总结
#7.1 核心追问
- "同步调用和事件驱动的区别?" —— 解耦、容错、性能
- "消息队列怎么保证顺序?" —— 单分区有序
- "消息丢失怎么办?" —— 确认机制、重试
- "消息重复怎么办?" —— 幂等处理
- "事务消息怎么实现?" —— 两阶段提交、RocketMQ
#7.2 级别差异
| 级别 | 期望回答 |
|---|---|
| P5 | 能说出同步和异步的区别,知道消息队列基本概念 |
| P6 | 能说出分区策略、幂等处理、顺序保证 |
| P7 | 能设计事件驱动架构,知道事务消息的权衡 |