事件驱动架构

一次促销活动的血案

2023年双十一前,我们团队上线了一个促销活动。活动规则:用户下单后,根据金额赠送不同等级的优惠券。

原本的实现是同步调用:

@Service
class OrderService {
    @Autowired
    private CouponService couponService;

    public void createOrder(Order order) {
        orderDao.save(order);

        // 同步调用:所有操作串行执行
        couponService.grantCoupon(order); // 如果这里慢,订单就慢

        notificationService.notify(order);
        pointsService.addPoints(order);
        inventoryService.lock(order);
    }
}

上线后问题来了:优惠券系统响应变慢,导致整个订单接口 P99 延迟从 50ms 飙升到 3s。更要命的是,优惠券系统宕机,所有订单都失败了。

这就是同步耦合的代价:任何一个下游系统的故障,都会导致主流程失败。

事件驱动架构解决的就是这个问题:把同步调用变成异步事件,让系统之间充分解耦。


二、同步架构 vs 事件驱动🔴

2.1 同步架构的问题

客户端 → OrderService → CouponService → NotificationService → PointsService → 库存服务
         (全部串行,任意一个失败全部失败)

问题清单

  1. 紧耦合:订单服务需要知道所有下游服务的接口
  2. 单点故障:任何一个下游失败,整个流程失败
  3. 扩展性差:加一个下游就要改订单服务
  4. 性能瓶颈:所有操作串行执行,总时间 = 所有操作之和

2.2 事件驱动架构

客户端 → OrderService → (发布事件)

               Message Queue

         ┌──────────┼──────────┐
         ↓          ↓          ↓
   CouponService  PointsService  NotificationService
   (并行消费)     (并行消费)     (并行消费)

优势

  1. 松耦合:订单服务只管发事件,不知道谁订阅
  2. 容错性:下游失败不影响主流程(可以重试)
  3. 可扩展:新增下游只需要订阅事件,不需要改订单服务
  4. 性能高:并行处理,总时间 = 最慢操作的时间

三、事件驱动模式🟡

3.1 事件通知模式(Event Notification)

最简单的模式:事件只是通知,不携带完整数据。

// 事件只包含 ID,具体数据由消费者查询
class OrderCreatedEvent {
    private final String eventId;
    private final Long orderId;
    private final Instant timestamp;

    // 只传 orderId,不传完整订单数据
}

class CouponService {
    @KafkaListener(topics = "order.created")
    public void handleOrderCreated(OrderCreatedEvent event) {
        // 消费者自己查询完整数据
        Order order = orderService.getOrder(event.getOrderId());
        couponService.grantCoupon(order);
    }
}

适用场景:消费者不需要太多数据,或者数据变化不频繁。

3.2 事件携带状态模式(Event-Carried State Transfer)

事件携带完整的业务数据,消费者不需要再查询。

class OrderCreatedEvent {
    private final String eventId;
    private final Long orderId;
    private final Long userId;
    private final String userName;
    private final BigDecimal amount;
    private final List<OrderItemDTO> items;
    private final Instant timestamp;
}

class CouponService {
    @KafkaListener(topics = "order.created")
    public void handleOrderCreated(OrderCreatedEvent event) {
        // 不需要再查数据库,直接用事件中的数据
        couponService.grantCoupon(event.getUserId(), event.getAmount());
    }
}

优势:消费者不需要调用生产者的接口,减少网络开销。

劣势:数据可能过时(消费者拿到的是事件发送时的快照)。

3.3 领域事件模式(Domain Event)

在 DDD 中,聚合状态变化时发布领域事件:

class Order extends BaseAggregateRoot {
    private OrderId id;
    private OrderStatus status;

    public void pay(Money amount) {
        // 业务逻辑
        this.status = OrderStatus.PAID;

        // 发布领域事件
        registerEvent(new OrderPaidEvent(
            this.id,
            this.userId,
            this.totalAmount,
            Instant.now()
        ));
    }
}

// 事件发布
class OrderEventPublisher {
    @Autowired
    private ApplicationEventPublisher publisher;

    public void publish(DomainEvent event) {
        publisher.publishEvent(event);
    }
}

3.4 CQRS 事件流

结合 CQRS,读模型通过消费事件来更新:

// 订单聚合发布事件
class Order extends BaseAggregateRoot {
    public void create(...) {
        // 创建订单逻辑
        registerEvent(new OrderCreatedEvent(this));
    }
}

// 读模型订阅事件
@KafkaListener(topics = "order.events")
class OrderReadModelUpdater {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Transactional
    public void handleOrderCreated(OrderCreatedEvent event) {
        jdbcTemplate.update(
            "INSERT INTO order_read_model (id, user_id, amount, status, created_at) " +
            "VALUES (?, ?, ?, ?, ?)",
            event.getOrderId(), event.getUserId(), event.getAmount(),
            event.getStatus(), event.getCreatedAt()
        );
    }

    @Transactional
    public void handleOrderPaid(OrderPaidEvent event) {
        jdbcTemplate.update(
            "UPDATE order_read_model SET status = ?, paid_at = ? WHERE id = ?",
            event.getStatus(), event.getPaidAt(), event.getOrderId()
        );
    }
}

四、Kafka 生产实践🟡

4.1 消息分区策略

// 按用户 ID 分区:同一用户的消息在同一个分区(保证顺序)
@Configuration
class KafkaConfig {
    @Bean
    public NewTopic orderTopic() {
        return TopicBuilder.name("order.events")
            .partitions(12)  // 分区数 = 消费者数量
            .replicas(1)
            .build();
    }
}

// 生产者指定分区键
@Service
class OrderService {
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public void createOrder(Order order) {
        orderDao.save(order);

        OrderCreatedEvent event = new OrderCreatedEvent(order);
        // 分区键:userId,保证同一用户的订单在同一个分区
        kafkaTemplate.send("order.events", order.getUserId().toString(), event);
    }
}

// 消费者:按分区消费
@KafkaListener(topics = "order.events", groupId = "coupon-service")
class CouponConsumer {
    @KafkaHandler
    public void handleOrderCreated(OrderCreatedEvent event) {
        // 消费逻辑
    }
}

4.2 消息顺序保证

Kafka 只保证单分区有序。如果需要全局有序,需要:

  1. 只用一个分区(性能瓶颈)
  2. 或者在业务层面处理乱序(如加版本号)
// 业务层面的顺序处理
class OrderProcessor {
    private final Map<String, Long> lastVersion = new ConcurrentHashMap<>();

    public void processOrderEvent(OrderEvent event) {
        String key = event.getOrderId();
        Long current = lastVersion.get(key);

        if (current != null && event.getVersion() <= current) {
            // 版本号不递增,说明是乱序消息,丢弃或延迟处理
            log.warn("Out-of-order event: {} <= {}", event.getVersion(), current);
            return;
        }

        lastVersion.put(key, event.getVersion());
        doProcess(event);
    }
}

4.3 消息幂等处理

消息队列中最重要的问题:消息重复。消费者的处理逻辑必须是幂等的。

// 方式一:数据库唯一键约束
@Service
class CouponService {
    @Transactional
    public void grantCoupon(Long orderId, BigDecimal amount) {
        // 幂等键:orderId
        // 如果重复插入,数据库会报错(或用 INSERT IGNORE)
        couponDao.insertIgnore(
            Coupon.builder()
                .id(UUID.randomUUID().toString())
                .orderId(orderId) // 唯一键
                .amount(calculate(amount))
                .build()
        );
    }
}

// 方式二:消费者幂等表
@Service
class IdempotentConsumer {
    @Autowired
    private RedisTemplate<String, String> redis;

    public void handleEvent(OrderEvent event) {
        String key = "processed:" + event.getEventId();

        // SETNX 保证幂等
        if (Boolean.FALSE.equals(redis.opsForValue().setIfAbsent(key, "1", Duration.ofDays(7)))) {
            log.info("Event already processed: {}", event.getEventId());
            return;
        }

        doProcess(event);
    }
}

五、事务消息🟡

5.1 问题

业务操作和消息发送如何保证一致性?

// ❌ 错误:业务和消息不在同一事务
@Service
class OrderService {
    @Transactional
    public void createOrder(Order order) {
        orderDao.save(order);
        // 事务提交后,消息发送失败
        kafkaTemplate.send("order.events", order.getId().toString(), event);
    }
}

// 结果:数据库有订单,但消息没发出去

5.2 RocketMQ 事务消息

// RocketMQ 事务消息:两阶段提交
@Service
class OrderService {
    @Autowired
    private TransactionMQProducer producer;

    public void createOrder(Order order) {
        orderDao.save(order);

        OrderEvent event = new OrderCreatedEvent(order);
        // 发送半消息(prepare)
        producer.sendMessageInTransaction(
            new Message("order.events", JSON.toJSONString(event)),
            (msg, arg) -> {
                // 本地事务执行(这里扣库存等操作)
                orderService.executeLocalTransaction(order);
                return LocalTransactionState.COMMIT_MESSAGE;
            },
            null
        );
    }
}

// 事务监听器
class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 本地事务
        try {
            executeLocalTransaction((Order) arg);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 回调检查:RocketMQ 自动调用,查询本地事务是否成功
        // 根据订单状态判断
        Order order = orderDao.findById(parseOrderId(msg));
        if (order != null) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

【架构权衡】 RocketMQ 事务消息的代价:

  1. 架构复杂度增加
  2. 延迟(需要等待事务状态确认)
  3. 回调接口可能重复调用(需要幂等处理)

如果业务允许最终一致性,用"本地消息表"方案更简单。


六、生产避坑清单

6.1 消息丢失

// ❌ 生产者:发送后不确认
kafkaTemplate.send("topic", event); // fire-and-forget,可能丢失

// ✅ 正确:确认发送结果
ListenableFuture<SendResult> future = kafkaTemplate.send("topic", event);
future.addCallback(
    result -> log.info("Message sent: {}", result.getRecordMetadata()),
    ex -> {
        log.error("Failed to send message", ex);
        // 重试或告警
    }
);

6.2 消息堆积

// ❌ 消费者:处理太慢
@KafkaListener
public void handle(OrderEvent event) {
    // 同步调用数据库,耗时 500ms
    // 如果生产速度 > 消费速度,消息堆积
    Thread.sleep(500);
}

// ✅ 正确:批量处理 + 并行消费
@Bean
public ConcurrentKafkaListenerContainerFactory factory(
        KafkaProperties properties) {
    ConcurrentKafkaListenerContainerFactory factory =
        new ConcurrentKafkaListenerContainerFactory();
    factory.setConcurrency(6); // 并行消费线程数
    factory.setBatchListener(true); // 批量消费
    return factory;
}

@KafkaListener
public void handleBatch(List<OrderEvent> events) {
    // 批量处理,数据库批量写入
    orderService.processBatch(events);
}

6.3 重复消费

// 幂等处理是必须的
@Service
class ConsumerService {
    @Autowired
    private OrderDao orderDao;

    public void processEvent(OrderCreatedEvent event) {
        // 幂等检查
        if (orderDao.existsByOrderId(event.getOrderId())) {
            log.info("Event already processed: {}", event.getOrderId());
            return;
        }

        // 处理逻辑
        orderDao.updateStatus(event.getOrderId(), event.getStatus());
    }
}

七、面试总结

7.1 核心追问

  1. "同步调用和事件驱动的区别?" —— 解耦、容错、性能
  2. "消息队列怎么保证顺序?" —— 单分区有序
  3. "消息丢失怎么办?" —— 确认机制、重试
  4. "消息重复怎么办?" —— 幂等处理
  5. "事务消息怎么实现?" —— 两阶段提交、RocketMQ

7.2 级别差异

级别期望回答
P5能说出同步和异步的区别,知道消息队列基本概念
P6能说出分区策略、幂等处理、顺序保证
P7能设计事件驱动架构,知道事务消息的权衡