管道-过滤器架构
一个 ETL 系统的崩塌
2020年双十一大促前夜,我们上线了新版数据中台 ETL 系统。
大促当天,实时数据从 Kafka 流入 ETL 管道:解析 → 清洗 → 转换 → 聚合 → 写入下游。凌晨0点大促开始,流量峰值是平时的 20 倍。
ETL 管道在凌晨 0:15 开始出现严重延迟——消息处理不过来了。最要命的是,整个 ETL 是一个单体程序,解析、清洗、转换、聚合全耦合在一起。要扩容,只能整体扩容;要修一个环节的 bug,只能停掉整个管道。
从凌晨 0:15 发现问题,到凌晨 2:30 紧急扩容完成恢复,延迟了将近 2 个小时。运营日报的数据全部延迟,品牌方的实时大屏也黑了。
这次事故之后,我们重构成了管道-过滤器架构。
问题定义
传统单体 ETL 或数据处理系统的问题:
- 紧耦合:每个处理环节依赖其他环节的数据结构和状态
- 难以扩展:某个环节成为瓶颈时,只能整体扩容
- 难以测试:无法单独测试某个处理步骤
- 难以复用:相同的解析逻辑在多个地方重复
- 单点故障:一个环节崩了,整个管道停摆
【架构权衡】
管道-过滤器架构的核心思想是:将数据处理过程分解为独立的、可组合的处理单元。每个过滤器(Filter)只做一件事,单元之间通过管道(Pipe)连接,数据像水流一样在管道中流动。
这就像工厂流水线——每个工位只完成一道工序,工序之间用传送带连接。任何工位可以单独优化、单独替换、单独扩容。
核心概念
graph LR
A[数据源] --> B[Filter 1<br/>解析]
B --> C[Filter 2<br/>清洗]
C --> D[Filter 3<br/>转换]
D --> E[Filter 4<br/>聚合]
E --> F[数据汇]
style B fill:#e1f5ff
style C fill:#e1f5ff
style D fill:#e1f5ff
style E fill:#e1f5ff
管道(Pipe):负责在过滤器之间传递数据,通常是阻塞队列或异步消息通道
过滤器(Filter):独立的处理单元,输入数据,输出处理后的数据
数据源(Source):数据入口,如 Kafka、文件、数据库
数据汇(Sink):数据出口,如数据库、消息队列、文件系统
方案演进
方案A:单体管道(传统做法)
// 单体 ETL:所有逻辑耦合在一起
class MonolithicETL {
public void process(List<String> rawData) {
for (String record : rawData) {
// 解析
OrderEvent event = parse(record);
// 清洗
if (!validate(event)) continue;
// 转换
OrderModel model = transform(event);
// 聚合
aggregator.aggregate(model);
}
}
}
优点:简单,开发周期短,适合小数据量
缺点:紧耦合,无法独立扩展,单点故障
适用场景:数据量稳定、处理逻辑简单、无高并发要求
方案B:管道-过滤器(轻量版)
// 管道接口
interface Pipe<T> {
void write(T data);
void connect(Filter<T, ?> next);
}
// 过滤器接口
interface Filter<T, R> {
R process(T input);
}
// 解析过滤器
class ParserFilter implements Filter<String, OrderEvent> {
@Override
public OrderEvent process(String input) {
return JSON.parseObject(input, OrderEvent.class);
}
}
// 清洗过滤器
class ValidationFilter implements Filter<OrderEvent, OrderEvent> {
@Override
public OrderEvent process(OrderEvent input) {
if (input.getAmount() == null || input.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
return null; // 过滤掉非法数据
}
return input;
}
}
// 转换过滤器
class TransformFilter implements Filter<OrderEvent, OrderModel> {
@Override
public OrderModel process(OrderEvent input) {
if (input == null) return null;
OrderModel model = new OrderModel();
model.setOrderId(input.getId());
model.setAmount(input.getAmount());
model.setTimestamp(input.getCreatedAt());
model.setCategory(categoryMapper.map(input.getCategoryCode()));
return model;
}
}
// 聚合过滤器(状态ful,需要内部状态)
class AggregationFilter implements Filter<OrderModel, Void> {
private Map<String, OrderStats> statsMap = new ConcurrentHashMap<>();
@Override
public Void process(OrderModel input) {
if (input == null) return null;
statsMap.compute(input.getCategory(), (k, v) -> {
if (v == null) v = new OrderStats();
v.setOrderCount(v.getOrderCount() + 1);
v.setTotalAmount(v.getTotalAmount().add(input.getAmount()));
return v;
});
return null;
}
public Map<String, OrderStats> getStats() { return statsMap; }
}
// 管道组装
class Pipeline {
private Filter head;
private Filter tail;
public Pipeline add(Filter filter) {
if (head == null) {
head = tail = filter;
} else {
tail.connect(filter);
tail = filter;
}
return this;
}
}
优点:过滤器独立,可单独测试,可复用
缺点:Java 接口方式有对象创建开销;在 JVM 中不如流式 API 高效
适用场景:中等规模数据处理,需要一定灵活性
方案C:Java Stream 管道(生产推荐)
Java 8+ 的 Stream API 天然就是管道-过滤器的实现。
// 使用 Stream 实现管道过滤器
public class StreamBasedETL {
public Stream<OrderModel> buildPipeline(Stream<String> rawData) {
return rawData
.map(this::parse) // Filter: 解析
.filter(Objects::nonNull) // Filter: 清洗
.filter(this::validate)
.map(this::transform) // Filter: 转换
.filter(Objects::nonNull);
}
// 大促期间用并行流加速
public Stream<OrderModel> buildParallelPipeline(Stream<String> rawData) {
return rawData
.parallel() // 并行处理
.map(this::parse)
.filter(Objects::nonNull)
.filter(this::validate)
.map(this::transform)
.filter(Objects::nonNull);
}
private OrderEvent parse(String record) {
try {
return objectMapper.readValue(record, OrderEvent.class);
} catch (JsonProcessingException e) {
log.warn("解析失败: {}", record, e);
return null;
}
}
private boolean validate(OrderEvent event) {
return event.getAmount() != null
&& event.getAmount().compareTo(BigDecimal.ZERO) > 0
&& event.getId() != null;
}
private OrderModel transform(OrderEvent event) {
OrderModel model = new OrderModel();
model.setOrderId(event.getId());
model.setAmount(event.getAmount());
model.setTimestamp(event.getCreatedAt());
model.setCategory(categoryMapper.map(event.getCategoryCode()));
return model;
}
}
方案D:分布式管道(高并发场景)
当单机无法承载数据量时,需要分布式管道。
graph TD
A[Kafka Source] --> B[Parse Filter<br/>Flink Operator]
B --> C[Kafka Topic<br/>中间队列]
C --> D[Validate Filter<br/>Flink Operator]
D --> E[Kafka Topic<br/>中间队列]
E --> F[Transform Filter<br/>Flink Operator]
F --> G[Kafka Topic<br/>中间队列]
G --> H[Aggregate Filter<br/>Flink Operator]
H --> I[Data Warehouse]
B1[Parse Instance 2] --> C
B2[Parse Instance 3] --> C
D1[Validate Instance 2] --> E
D2[Validate Instance 3] --> E
// Flink 分布式管道实现
public class FlinkETLPipeline {
public void buildPipeline(StreamExecutionEnvironment env) {
// Source: 从 Kafka 读取原始数据
DataStream<String> rawStream = env.addSource(
new FlinkKafkaConsumer<>(
"order-events",
new SimpleStringSchema(),
kafkaProps
)
);
// 解析
DataStream<OrderEvent> parsedStream = rawStream
.map(record -> {
try {
return objectMapper.readValue(record, OrderEvent.class);
} catch (JsonProcessingException e) {
return null;
}
})
.filter(Objects::nonNull);
// 清洗
DataStream<OrderEvent> validatedStream = parsedStream
.filter(e -> e.getAmount() != null
&& e.getAmount().compareTo(BigDecimal.ZERO) > 0
&& e.getId() != null);
// 按类目分组的聚合(10秒滚动窗口)
DataStream<CategoryStats> statsStream = validatedStream
.keyBy(OrderEvent::getCategory)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.aggregate(new CategoryAggregator());
// Sink: 写入下游
statsStream.addSink(
new FlinkKafkaProducer<>("order-stats", new JsonSerializationSchema())
);
}
}
// 自定义聚合器
class CategoryAggregator implements AggregateFunction<OrderEvent, OrderStats, CategoryStats> {
@Override
public OrderStats createAccumulator() {
return new OrderStats();
}
@Override
public OrderStats add(OrderEvent value, OrderStats accumulator) {
accumulator.addOrder(value);
return accumulator;
}
@Override
public CategoryStats getResult(OrderStats accumulator) {
return accumulator.toCategoryStats();
}
@Override
public OrderStats merge(OrderStats a, OrderStats b) {
a.merge(b);
return a;
}
}
关键设计决策
管道缓冲与反压
过滤器之间用有界队列(Bounded Queue)连接。当下游处理速度慢于上游时,队列会满,上游自动被反压(Back Pressure)。
// BlockingQueue 实现管道缓冲
class BlockingQueuePipe<T> implements Pipe<T> {
private BlockingQueue<T> queue;
private Filter<T, ?> nextFilter;
private ExecutorService executor;
public BlockingQueuePipe(int capacity) {
this.queue = new LinkedBlockingQueue<>(capacity);
}
@Override
public void write(T data) {
boolean offered = queue.offer(data, 100, TimeUnit.MILLISECONDS);
if (!offered) {
// 队列满了,触发反压策略
// 1. 丢弃(熔断)
// 2. 阻塞等待
// 3. 告警 + 降级
handleBackPressure(data);
}
}
@Override
public void connect(Filter<T, ?> next) {
this.nextFilter = next;
this.executor = Executors.newSingleThreadExecutor();
this.executor.submit(this::consumeLoop);
}
private void consumeLoop() {
while (true) {
try {
T data = queue.poll(1, TimeUnit.SECONDS);
if (data != null && nextFilter != null) {
Object result = nextFilter.process(data);
if (result != null && nextPipe != null) {
nextPipe.write(result);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
【架构权衡】
管道缓冲大小的选择是一个关键权衡:
- 队列太大:内存占用高,数据延迟大(数据在队列里等很久)
- 队列太小:容易触发反压,上游被迫降速或丢弃数据
经验值:
- 处理延迟敏感:
queue size = 100~1000
- 吞吐量优先:
queue size = 10000~50000
- 内存敏感:
queue size = 1000,配置好告警和熔断
过滤器热插拔
管道-过滤器架构最大的优势之一是可以在运行时修改管道拓扑。
// 动态管道管理器
class DynamicPipelineManager {
private Pipeline pipeline = new Pipeline();
private Map<String, Filter> registeredFilters = new ConcurrentHashMap<>();
// 注册过滤器
public void registerFilter(String name, Filter filter) {
registeredFilters.put(name, filter);
}
// 动态添加过滤器(在转换和聚合之间插入一个新的过滤器)
public void insertFilter(String after, String name) {
Filter target = registeredFilters.get(after);
Filter newFilter = registeredFilters.get(name);
pipeline.insertAfter(target, newFilter);
}
// 动态移除过滤器(不重启服务)
public void removeFilter(String name) {
Filter filter = registeredFilters.get(name);
pipeline.remove(filter);
}
// 下游出问题时,快速切换到降级管道
public void switchToDegradedPipeline() {
pipeline.clear();
pipeline.add(registeredFilters.get("parse"))
.add(registeredFilters.get("validate"))
.add(registeredFilters.get("simpleTransform")) // 降级:跳过复杂转换
.add(registeredFilters.get("basicAggregate"));
}
}
生产避坑
坑1:过滤器顺序不当导致数据倾斜
聚合过滤器如果放在管道前端,会导致大量中间数据堆积。
场景:将 GroupByKey 聚合放在管道前 10% 的位置,所有流量都打到同一个聚合实例。
正确做法:遵循"先过滤、再转换、最后聚合"的原则。减少数据量后再做重量级操作。
坑2:NullPointerException 在管道中扩散
一个过滤器返回 null,如果下游没有做空值保护,会导致整个管道崩溃。
// 错误的写法:没有空值保护
DataStream<OrderModel> result = rawData
.map(this::transform) // transform 可能返回 null
.map(model -> model.toStats()); // NPE!
// 正确的写法:每一步都保护
DataStream<OrderModel> result = rawData
.map(this::transform)
.filter(Objects::nonNull) // 关键:过滤掉 null
.map(model -> model.toStats());
坑3:状态ful 过滤器无法并行
聚合类过滤器有内部状态,如果不小心用了并行流,状态会错乱。
// 错误:并行流 + 有状态聚合 = 数据错乱
rawData.parallelStream()
.map(this::parse)
.filter(this::validate)
.forEach(model -> {
statsMap.compute(...) // 并行写入同一个 Map,数据会错乱
});
// 正确:用串行流或者分区聚合
rawData.stream() // 串行流
.map(this::parse)
.forEach(model -> statsMap.compute(...));
坑4:管道启动时数据丢失
新过滤器上线时,Kafka 或消息队列中积压的数据可能被重复消费或漏掉。
解决方案:
- 使用 Flink 的 exactly-once 语义(checkpoint)
- 管道启动前记录消费位点,启动后从断点继续
- 关键管道上线前做小流量灰度
工程代价评估
落地 Checklist
常见应用场景
场景1:日志处理流水线
rawLogStream
.map(LogParser::parse) // 解析
.filter(Log::isValid) // 过滤无效日志
.map(LogNormalizer::normalize) // 标准化
.keyBy(Log::getTraceId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new LogAggregator()) // 聚合
.addSink(ElasticsearchSink);
场景2:ETL 数据管道
// Source -> Parse -> Dedupe -> Transform -> Enrich -> Sink
Pipeline etl = new Pipeline();
etl.add(new BinaryParser()) // 二进制解析
.add(new DedupeFilter(redisClient)) // 去重(基于 Redis)
.add(new SchemaTransform()) // Schema 转换
.add(new EnrichFilter(userService)) // 数据丰富
.add(new ElasticSearchSink()); // 写入 ES
场景3:实时告警处理
// Metric -> Parse -> ThresholdCheck -> GroupBy -> Aggregate -> Alert
metricStream
.map(MetricParser::parse)
.filter(m -> m.getValue() > threshold)
.keyBy(Metric::getMetricName)
.window(Time.minutes(5))
.aggregate(new AlertAggregator(threshold))
.filter(stats -> stats.getCount() > alertMinCount)
.addSink(AlertNotifier);
【架构权衡】
管道-过滤器是处理"数据流"问题的经典架构,但要注意它不是万能的:
- 当管道层次数超过 10 层时,延迟会成为主要瓶颈,考虑合并轻量级过滤器
- 当过滤器之间的数据格式差异很大时,转换过滤器会变成维护噩梦,考虑引入 schema registry
- 当需要复杂的条件路由(不是线性管道而是 DAG)时,考虑引入 Apache Camel 或 Flink 的侧输出