观察者模式

一个订单状态通知的噩梦

2023年双十一,我们的订单系统出现了一个诡异的 bug:用户支付成功后,积分没到账,库存没释放,物流系统也没收到通知。

排查了 5 个小时,发现根因是一个 if 语句的改动:

// 原来的代码
class OrderService {
    public void pay(Order order) {
        order.setStatus(OrderStatus.PAID);
        orderDao.update(order);

        // 通知各个系统
        if (order.getAmount() > 100) {  // 开发同学加的"优化"
            notificationService.notify(order); // 漏掉了!
        }
        pointsService.addPoints(order);
        inventoryService.release(order);
        logisticsService.notify(order);
    }
}

order.getAmount() > 100 的判断导致小金额订单的通知全部丢失。

这就是强耦合的代价:订单服务和所有下游服务紧绑在一起,任何改动都可能影响其他系统。观察者模式解决的就是这个问题:让发布者和订阅者解耦。


二、观察者模式核心结构🔴

2.1 标准写法

// 主题(发布者)接口
interface Subject {
    void attach(Observer observer);
    void detach(Observer observer);
    void notifyObservers();
}

// 观察者接口
interface Observer {
    void update(Order order);
}

// 具体主题
class OrderSubject implements Subject {
    private final List<Observer> observers = new ArrayList<>();
    private Order order;

    @Override
    public void attach(Observer observer) {
        observers.add(observer);
    }

    @Override
    public void detach(Observer observer) {
        observers.remove(observer);
    }

    @Override
    public void notifyObservers() {
        for (Observer o : observers) {
            o.update(order);
        }
    }

    public void setOrder(Order order) {
        this.order = order;
        this.notifyObservers();
    }
}

// 具体观察者:积分服务
class PointsObserver implements Observer {
    @Override
    public void update(Order order) {
        pointsService.addPoints(order);
    }
}

// 具体观察者:库存服务
class InventoryObserver implements Observer {
    @Override
    public void update(Order order) {
        inventoryService.release(order);
    }
}

// 具体观察者:物流服务
class LogisticsObserver implements Observer {
    @Override
    public void update(Order order) {
        logisticsService.notify(order);
    }
}

2.2 使用方式

// 创建主题
OrderSubject orderSubject = new OrderSubject();

// 注册观察者
orderSubject.attach(new PointsObserver());
orderSubject.attach(new InventoryObserver());
orderSubject.attach(new LogisticsObserver());

// 订单状态变化,自动通知所有观察者
orderSubject.setOrder(paidOrder); // 三个观察者都会被调用

现在,无论加多少个下游系统,都不需要修改 OrderService

// 新增一个观察者,不需要改订单服务
class SmsObserver implements Observer {
    @Override
    public void update(Order order) {
        smsService.send(order);
    }
}

orderSubject.attach(new SmsObserver()); // 一行注册代码

三、JDK 内置的观察者模式🔴

3.1 Observable 类

JDK 1.0 就提供了 java.util.Observable

// JDK 1.0-1.8 的 Observable
class OrderObservable extends Observable {
    private Order order;

    public void setOrder(Order order) {
        this.order = order;
        setChanged();        // 标记状态已改变
        notifyObservers();   // 通知所有观察者
    }
}

// 观察者
Observer pointsObserver = (obs, arg) -> {
    Order order = (Order) arg;
    pointsService.addPoints(order);
};

orderObservable.addObserver(pointsObserver);

3.2 Observable 被废弃的原因

JDK 9 正式废弃了 Observable,原因如下:

// 问题一:Observable 不是接口,是类 —— 无法实现,只能继承
// 这意味着一个类只能有一个 Observable 祖先
class MyClass extends Observable { } // 只能继承一个类

// 问题二:线程安全需要自己处理
Observable observable = new Observable();
// 在多线程环境中,setChanged() 的调用需要加锁

// 问题三:序列化问题
// Observable 的内部状态无法正确序列化

// 问题四:无法组合多个 Observable

【架构权衡】 JDK 废弃 Observable 不是因为观察者模式错了,而是因为它的实现有问题。正确的做法是用接口 + 实现类的组合,而不是继承。Spring 的 ApplicationEventMulticaster 就是更好的实现。


四、Spring 事件驱动🟡

4.1 Spring 的观察者模式实现

Spring 用 ApplicationEventApplicationListener 重新实现了观察者模式:

// 事件:订单支付成功
public class OrderPaidEvent extends ApplicationEvent {
    private final Order order;

    public OrderPaidEvent(Object source, Order order) {
        super(source);
        this.order = order;
    }

    public Order getOrder() {
        return order;
    }
}

// 监听器:积分服务
@Component
public class PointsListener {
    @EventListener
    public void handleOrderPaid(OrderPaidEvent event) {
        pointsService.addPoints(event.getOrder());
    }
}

// 监听器:库存服务
@Component
public class InventoryListener {
    @EventListener
    public void handleOrderPaid(OrderPaidEvent event) {
        inventoryService.release(event.getOrder());
    }
}

// 发布事件
@Service
class OrderService {
    @Autowired
    private ApplicationEventPublisher publisher;

    public void pay(Order order) {
        order.setStatus(OrderStatus.PAID);
        orderDao.update(order);

        publisher.publishEvent(new OrderPaidEvent(this, order));
        // 三个 Listener 都会被调用,不需要改 OrderService
    }
}

4.2 @EventListener 的条件过滤

@Component
public class VipOrderListener {
    @EventListener
    public void handleVipOrder(OrderPaidEvent event) {
        if (event.getOrder().getUser().isVip()) {
            // VIP 订单特殊处理
        }
    }
}

// 更好的方式:条件表达式
@EventListener(condition = "#event.order.user.vip")
public void handleVipOrder(OrderPaidEvent event) {
    // 只有 VIP 用户的订单才会触发
}

4.3 异步事件处理

@Configuration
public class AsyncConfig {
    @Bean
    public ApplicationEventMulticaster applicationEventMulticaster() {
        SimpleApplicationEventMulticaster multicaster =
            new SimpleApplicationEventMulticaster();
        multicaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return multicaster;
    }
}

// 或者直接在 Listener 上标注异步
@Component
public class NotificationListener {
    @Async
    @EventListener
    public void handleOrderPaid(OrderPaidEvent event) {
        // 异步执行,不阻塞主流程
        notificationService.send(event.getOrder());
    }
}

五、线程安全问题🟡

5.1 基础实现的线程问题

class OrderSubject {
    private final List<Observer> observers = new ArrayList<>();

    public void attach(Observer o) {
        observers.add(o); // 非线程安全!
    }

    public void detach(Observer o) {
        observers.remove(o); // 非线程安全!
    }

    public void notifyObservers() {
        for (Observer o : observers) { // 迭代过程中可能修改
            o.update(order);
        }
    }
}

并发场景下的两个问题:

  1. 并发修改异常ConcurrentModificationException
  2. 事件丢失或重复发送

5.2 线程安全版本

class OrderSubject {
    // 方式一:CopyOnWriteArrayList(读多写少场景)
    private final CopyOnWriteArrayList<Observer> observers = new CopyOnWriteArrayList<>();

    // 方式二:synchronized + 快照
    private final List<Observer> observers = new ArrayList<>();

    public void attach(Observer o) {
        synchronized (observers) {
            observers.add(o);
        }
    }

    public void notifyObservers() {
        List<Observer> snapshot;
        synchronized (observers) {
            snapshot = new ArrayList<>(observers);
        }
        for (Observer o : snapshot) { // 迭代快照,不影响后续修改
            o.update(order);
        }
    }
}

5.3 同步 vs 异步通知

方式优点缺点适用场景
同步通知实现简单,事件立即生效阻塞主流程,一个观察者慢影响全部观察者处理快
异步通知不阻塞主流程事件可能乱序,需要处理失败重试观察者处理慢
事务同步事件和主操作在同一个事务中实现复杂需要数据一致性
⚠️

Spring 的 @Async 事件处理有一个坑:事务不会自动传播到异步线程。如果观察者需要事务支持,需要手动开启事务或使用 TransactionSynchronization


六、生产避坑清单

6.1 循环依赖问题

// ❌ 危险:观察者中发布新事件
class PointsListener {
    @EventListener
    public void handleOrderPaid(OrderPaidEvent event) {
        pointsService.addPoints(event.getOrder());
        // 在这里发布新事件?小心循环依赖!
        publisher.publishEvent(new PointsAddedEvent(this, points));
    }
}

6.2 异常处理

// ❌ 错误:一个观察者抛异常,其他观察者不执行
@EventListener
public void handleOrderPaid(OrderPaidEvent event) {
    throw new RuntimeException("DB error!"); // 其他观察者被跳过
}

// ✅ 正确:使用 ErrorHandler 或 try-catch
@Bean
public ApplicationEventMulticaster applicationEventMulticaster() {
    SimpleApplicationEventMulticaster multicaster =
        new SimpleApplicationEventMulticaster();
    multicaster.setErrorHandler(e -> {
        log.error("Event handling failed", e);
        // 记录日志,不影响其他观察者
    });
    return multicaster;
}

6.3 事件排序

Spring 默认按照 @OrderOrdered 接口排序:

@EventListener
@Order(Ordered.HIGHEST_PRECEDENCE) // 最先执行
public void handleAudit(OrderPaidEvent event) {
    // 审计日志
}

@EventListener
@Order(Ordered.LOWEST_PRECEDENCE) // 最后执行
public void handleNotification(OrderPaidEvent event) {
    // 发送通知
}

七、面试总结

7.1 核心追问

  1. "JDK 的 Observable 为什么被废弃?" —— 继承而非接口、线程安全、无法组合
  2. "Spring 事件驱动和传统观察者模式的区别?" —— 更简洁、支持异步、条件过滤
  3. "观察者模式有哪些线程安全问题?" —— 并发修改、迭代过程中修改
  4. "观察者模式和 MQ 的区别?" —— 同步 vs 异步、本进程 vs 跨进程

7.2 级别差异

级别期望回答
P5能写出观察者模式基本结构,知道 Spring @EventListener
P6能说出 JDK Observable 废弃的原因,知道线程安全处理
P7能对比 MQ 和观察者模式,能分析同步/异步权衡,知道如何处理异常