事件溯源Event Sourcing
一个监管合规的噩梦
2022年,我们金融支付系统收到了监管部门的审计要求:需要提供过去5年内任意账户任意时间点的余额快照,精确到每一次交易变更。
技术团队慌了。
我们的系统用的是传统增删改模式——订单表、账户表、流水表。只有当前状态,没有历史轨迹。想查2019年3月15日某个账户的余额?对不起,只能根据流水记录手工推算,但问题是:余额模型发生过两次大改版,2019年的交易记录格式和现在完全不同。
最后靠5个人连轴转了3周,人工还原数据,差点没赶上监管检查的死线。
这次事故之后,我们全面引入了 Event Sourcing(事件溯源)。
问题定义
传统系统设计只存储当前状态(Current State),也叫"快照模式"。这种模式的问题是:
- 丢失了过程:你只知道账户现在有多少钱,不知道这笔钱是怎么来的
- 审计成本高:要么靠流水表反推,要么加一整套审计日志
- 重放困难:业务逻辑变更后,无法用新逻辑重跑历史数据
- 调试盲区:生产问题排查时,只能靠日志"猜"当时发生了什么
【架构权衡】
事件溯源的核心思路是:不要存储状态,要存储事件。每次状态变更都是一条不可变的事件记录,当前状态由事件重放计算得出。这就像把"银行存折"和"余额"的关系反转了——不是用余额来验证流水,而是用流水来计算余额。
方案演进
我们先看看为什么传统方案会失败,再看事件溯源如何解决。
方案A:快照模式(传统做法)
// 账户表只存当前余额
class Account {
Long id;
String accountNo;
BigDecimal balance; // 当前余额快照
LocalDateTime updatedAt;
}
优点:简单,查询当前状态快
缺点:丢失历史,审计难,重放不可能
适用场景:状态不敏感、不需要审计的简单业务
方案B:流水账模式(打补丁做法)
class AccountTransaction {
Long id;
String accountNo;
BigDecimal amount;
String type; // DEPOSIT/WITHDRAW/TRANSFER
LocalDateTime createdAt;
}
优点:有审计轨迹,可追溯
缺点:重放需要业务逻辑和表结构耦合;重放慢(百万级流水重放可能需要数小时);事件本身可能包含业务含义变更(如手续费规则改变),直接重放会出错
适用场景:审计要求不高的业务
方案C:事件溯源(Event Sourcing)
// 每个状态变更都是一条不可变事件
abstract class DomainEvent {
String eventId; // 唯一标识
String aggregateId; // 聚合根ID
Long version; // 聚合版本号(乐观锁)
LocalDateTime occurredAt;// 事件发生时间
String eventType; // 事件类型
}
// 具体事件
class AccountOpened extends DomainEvent {
String accountNo;
String ownerName;
BigDecimal initialBalance;
}
class MoneyDeposited extends DomainEvent {
BigDecimal amount;
String currency;
String remark;
}
class MoneyWithdrawn extends DomainEvent {
BigDecimal amount;
String currency;
String remark;
}
class MoneyTransferred extends DomainEvent {
BigDecimal amount;
String currency;
String fromAccountNo;
String toAccountNo;
String traceId; // 关联追踪ID
}
优点:完整的历史、可重放、可重建任意时间点状态、审计天然合规
缺点:查询当前状态需要重放所有事件(慢);事件膨胀;事件 Schema 变更(版本化);最终一致性处理复杂
适用场景:金融、审计、订单、BPM等对历史过程敏感的业务
核心设计
聚合根与事件存储
事件溯源的单位是聚合根(Aggregate)。每个聚合根维护自己的事件序列,版本号用乐观锁控制。
// 聚合根:账户
class AccountAggregate extends AbstractAggregateRoot {
private String accountNo;
private BigDecimal balance;
private String status; // ACTIVE/FROZEN/CLOSED
private Long version;
// 私有构造函数,只能通过事件重建或创建
AccountAggregate() {}
// 工厂方法:创建账户
public static AccountAggregate create(String accountNo, String ownerName, BigDecimal initialBalance) {
AccountAggregate account = new AccountAggregate();
account.apply(new AccountOpened(UUID.randomUUID().toString(),
accountNo, ownerName, initialBalance, LocalDateTime.now()));
return account;
}
// 从事件重建
public void apply(DomainEvent event) {
if (event instanceof AccountOpened e) {
this.accountNo = e.getAccountNo();
this.balance = e.getInitialBalance();
this.status = "ACTIVE";
} else if (event instanceof MoneyDeposited e) {
this.balance = this.balance.add(e.getAmount());
} else if (event instanceof MoneyWithdrawn e) {
if (this.balance.compareTo(e.getAmount()) < 0) {
throw new IllegalStateException("余额不足");
}
this.balance = this.balance.subtract(e.getAmount());
}
// ...
this.version++;
}
// 命令方法:存款
public void deposit(BigDecimal amount, String remark) {
if (!"ACTIVE".equals(this.status)) {
throw new IllegalStateException("账户状态异常");
}
apply(new MoneyDeposited(UUID.randomUUID().toString(),
this.accountNo, amount, "CNY", remark, LocalDateTime.now()));
}
// 命令方法:取款
public void withdraw(BigDecimal amount, String remark) {
if (!"ACTIVE".equals(this.status)) {
throw new IllegalStateException("账户状态异常");
}
if (this.balance.compareTo(amount) < 0) {
throw new InsufficientBalanceException("余额不足,当前余额:" + this.balance);
}
apply(new MoneyWithdrawn(UUID.randomUUID().toString(),
this.accountNo, amount, "CNY", remark, LocalDateTime.now()));
}
}
关键设计点:
- 私有构造函数:防止外部直接 new,确保聚合根只能通过事件创建或重放
- apply 方法处理事件:根据事件类型更新状态,保证状态变更和事件记录一致
- 命令校验前置:在生成事件之前做业务规则校验,校验失败不产生事件
事件存储设计
// 事件存储(Event Store)
interface EventStore {
// 保存事件
void append(String aggregateId, List<DomainEvent> events, Long expectedVersion);
// 读取聚合根的所有事件
List<DomainEvent> getEventsForAggregate(String aggregateId);
// 读取指定时间范围内的事件(用于投影重建)
List<DomainEvent> getEventsSince(LocalDateTime since);
}
// MySQL 实现
class MySQLEventStore implements EventStore {
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public void append(String aggregateId, List<DomainEvent> events, Long expectedVersion) {
String sql = """
INSERT INTO domain_events
(event_id, aggregate_id, event_type, event_data, version, occurred_at)
VALUES (?, ?, ?, ?, ?, ?)
""";
jdbcTemplate.batchUpdate(sql, events, events.size(),
(ps, event) -> {
ps.setString(1, event.getEventId());
ps.setString(2, aggregateId);
ps.setString(3, event.getEventType());
ps.setString(4, toJson(event)); // 事件数据序列化
ps.setLong(5, event.getVersion());
ps.setObject(6, event.getOccurredAt());
});
}
@Override
public List<DomainEvent> getEventsForAggregate(String aggregateId) {
String sql = "SELECT * FROM domain_events WHERE aggregate_id = ? ORDER BY version";
// 从结果集重建事件对象(需要事件反序列化)
}
}
-- 事件表设计
CREATE TABLE domain_events (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
event_id VARCHAR(36) NOT NULL UNIQUE, -- 事件全局唯一ID
aggregate_id VARCHAR(64) NOT NULL, -- 聚合根ID
event_type VARCHAR(128) NOT NULL, -- 事件类型名
event_data JSON NOT NULL, -- 事件数据(JSON)
version BIGINT NOT NULL, -- 聚合版本号
occurred_at TIMESTAMP NOT NULL, -- 事件发生时间
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_aggregate_version (aggregate_id, version),
INDEX idx_occurred_at (occurred_at)
) ENGINE=InnoDB;
快照机制:解决重放性能问题
事件重放的核心问题是:事件数量会随着时间线性增长。一个活跃账户5年可能产生上百万条事件,每次查余额都要重放全部事件——这是不可接受的。
快照机制来解决这个问题。
// 快照存储
class SnapshotStore {
public void saveSnapshot(AccountAggregate aggregate) {
Snapshot snapshot = new Snapshot();
snapshot.setAggregateId(aggregate.getAccountNo());
snapshot.setVersion(aggregate.getVersion());
snapshot.setBalance(aggregate.getBalance());
snapshot.setStatus(aggregate.getStatus());
snapshot.setTakenAt(LocalDateTime.now());
snapshotRepository.save(snapshot);
}
// 从快照恢复 + 重放增量事件
public AccountAggregate restore(String aggregateId) {
Snapshot snapshot = snapshotRepository.findLatest(aggregateId);
AccountAggregate aggregate = new AccountAggregate();
if (snapshot != null) {
aggregate.restoreFromSnapshot(snapshot); // 快速恢复到快照版本
// 只重放快照之后的增量事件
List<DomainEvent> incrementalEvents =
eventStore.getEventsSince(snapshot.getTakenAt());
for (DomainEvent event : incrementalEvents) {
aggregate.apply(event);
}
} else {
// 没有快照,从零重放
List<DomainEvent> allEvents = eventStore.getEventsForAggregate(aggregateId);
for (DomainEvent event : allEvents) {
aggregate.apply(event);
}
}
return aggregate;
}
}
快照策略建议:
- 每处理1000个事件打一次快照(可配置)
- 快照异步存储,不阻塞主流程
- 快照保留最近N个版本,支持回溯
事件版本化:处理 Schema 变更
业务演进过程中,事件 Schema 必然会变。比如早期 MoneyTransferred 事件没有 remark 字段,后来加了。
// 事件版本化:Upcasting 模式
class MoneyTransferredUpcaster implements EventUpcaster {
@Override
public boolean canUpcast(String eventType, Long fromVersion, Long toVersion) {
return "MoneyTransferred".equals(eventType) && fromVersion < 2L;
}
@Override
public DomainEvent upcast(DomainEvent event, Long fromVersion, Long toVersion) {
// 从 v1 升级到 v2:补充 remark 字段
if (fromVersion == 1L) {
MoneyTransferredV2 upgraded = new MoneyTransferredV2();
upgraded.setAmount(event.getAmount());
upgraded.setCurrency(event.getCurrency());
upgraded.setFromAccountNo(event.getFromAccountNo());
upgraded.setToAccountNo(event.getToAccountNo());
upgraded.setRemark(""); // 旧事件无备注,补空字符串
return upgraded;
}
return event;
}
}
【架构权衡】
事件版本化是 Event Sourcing 在生产环境中最容易被低估的复杂度。网上很多文章只讲"存储事件"多美好,但不提 Schema 变更的坑:
- 每次事件字段变更都要写 Upcaster
- Upcaster 链顺序不能出错
- 历史事件 Upcast 后要不要持久化?持久化的话会改历史,不持久化每次重放都要 Upcast
生产避坑
坑1:事件数据被篡改
事件是不可变的,但这只是应用层的约定。如果有人在数据库里直接 UPDATE,后果不堪设想。
解决方案:
- 事件表去掉 UPDATE 和 DELETE 权限(DBA 层控制)
- 每个事件加 checksum,存储时校验
- 事件写入后立即异步计算 hash 并存储,读取时校验
坑2:投影重建时服务不可用
投影(Projection)是事件溯源的读模型。当投影重建时,如果数据量很大,服务在重建期间可能完全不可用。
解决方案:
- 投影重建走独立的离线任务,不影响在线读
- 蓝绿投影:新旧投影并行,切换时无感知
- 投影分片:按聚合根 ID 哈希分片,支持并行重建
坑3:并发写入导致版本冲突
两个命令同时处理同一聚合根,乐观锁版本号会冲突。如果处理不当,事件会丢失。
// 错误的并发处理
public void handle(DepositCommand cmd) {
AccountAggregate account = eventStore.getEventsForAggregate(cmd.getAccountNo())
.replay(); // 这里拿到了 v10
account.deposit(cmd.getAmount(), cmd.getRemark());
// 另一个线程已经写入了 v11
eventStore.append(cmd.getAccountNo(), account.getUncommittedEvents(), 10L); // 期望v10,但已是v11
// 乐观锁异常,事件丢失!
}
正确的并发处理:使用 EXPECTED_VERSION 乐观锁,捕获 ConcurrencyException 后重试整个命令。
public void handle(DepositCommand cmd) {
int maxRetries = 3;
for (int i = 0; i < maxRetries; i++) {
try {
AccountAggregate account = aggregateRepository.find(cmd.getAccountNo());
account.deposit(cmd.getAmount(), cmd.getRemark());
aggregateRepository.save(account); // save 内部使用乐观锁
return;
} catch (ConcurrencyException e) {
// 乐观锁冲突,重试
log.warn("乐观锁冲突,重试第{}次", i + 1);
}
}
throw new ConcurrencyException("重试次数超限");
}
工程代价评估
落地 Checklist
CQRS + Event Sourcing 组合
Event Sourcing 最好的搭档是 CQRS。写模型用事件存储,读模型用投影构建:
graph LR
A[命令] --> B[聚合根]
B --> C[事件存储]
C --> D[事件总线]
D --> E[投影重建器]
D --> F[另一个投影]
E --> G[读模型DB]
F --> H[读模型DB]
写入链路:命令 → 聚合根校验 → 生成事件 → 写入 Event Store → 发布到事件总线
读取链路:从投影 DB 直接查询(不经过聚合根)
这套组合在电商订单、金融账务、物流追踪等场景下非常有效,但组合的复杂度也是双倍的——建议只在真正需要审计追溯和复杂读模型的场景下使用。
【架构权衡】
Event Sourcing 不是银弹。它最适合的场景:
- 审计合规要求高(金融、医疗、法律)
- 业务过程本身是核心价值(订单全流程、审批流)
- 需要任意时间点的状态回溯
- CQRS 读写分离,查询模型非常复杂
不太适合的场景:
- CRUD 为主的简单业务(过度设计)
- 查询 QPS 极高(投影重建有延迟)
- 团队对 DDD 和事件驱动没有经验积累