[Architecture in Practice] ETL Architecture Evolution: From Batch Processing to Real-Time Stream Processing
I. ETL Overview
ETL (Extract-Transform-Load) is the core process of a data warehouse:
```
Data Source → Extract → Transform → Load → Data Warehouse
                            ↓
          data cleaning / data conversion / data aggregation
```

Problems with traditional ETL:
- Batch-oriented, with high latency (T+1 or longer)
- Long processing windows and high peak resource usage
- Poor fit for real-time requirements
- Errors are hard to trace back
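The pipeline above can be reduced to a toy sketch in plain Python, with in-memory lists standing in for the real source and warehouse (all names here are illustrative, not part of any ETL framework):

```python
def extract(source_rows):
    """Extract: pull raw records from the source (here, a list of dicts)."""
    return list(source_rows)

def transform(rows):
    """Transform: clean (drop invalid rows) and aggregate (sum by shop)."""
    valid = [r for r in rows if r["amount"] > 0]          # cleaning
    totals = {}
    for r in valid:                                        # aggregation
        totals[r["shop_id"]] = totals.get(r["shop_id"], 0) + r["amount"]
    return totals

def load(totals, warehouse):
    """Load: write the aggregated result into the target store."""
    warehouse.update(totals)

source = [
    {"shop_id": 1, "amount": 100},
    {"shop_id": 1, "amount": -5},   # bad record, dropped in Transform
    {"shop_id": 2, "amount": 30},
]
warehouse = {}
load(transform(extract(source)), warehouse)
print(warehouse)  # {1: 100, 2: 30}
```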
II. Batch ETL
1. Sqoop (database ↔ HDFS)
```bash
# Full import
sqoop import \
  --connect jdbc:mysql://mysql:3306/order_db \
  --username root \
  --password password \
  --table orders \
  --target-dir /data/orders \
  --delete-target-dir \
  --num-mappers 4 \
  --fields-terminated-by ','

# Incremental import
sqoop import \
  --connect jdbc:mysql://mysql:3306/order_db \
  --username root \
  --password password \
  --table orders \
  --target-dir /data/orders \
  --incremental append \
  --check-column order_id \
  --last-value 10000 \
  --num-mappers 2
```
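What `--incremental append` does conceptually: only rows whose check column exceeds the saved `--last-value` are pulled, and the checkpoint advances afterwards. A minimal pure-Python stand-in for that logic (not Sqoop itself; names are illustrative):

```python
def incremental_extract(rows, check_column, last_value):
    """Return rows newer than last_value, plus the advanced checkpoint."""
    new_rows = [r for r in rows if r[check_column] > last_value]
    new_last = max((r[check_column] for r in new_rows), default=last_value)
    return new_rows, new_last

# Source table currently holds order_ids 9999..10003
orders = [{"order_id": i} for i in range(9999, 10004)]

# Only rows past the saved checkpoint (10000) are extracted
batch, checkpoint = incremental_extract(orders, "order_id", 10000)
print([r["order_id"] for r in batch], checkpoint)  # [10001, 10002, 10003] 10003
```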
2. Spark Batch ETL

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, sum as _sum  # avoid shadowing builtins

spark = (SparkSession.builder
         .appName("DailyOrderETL")
         .getOrCreate())

# Read source tables
orders_df = (spark.read.format("jdbc")
             .option("url", "jdbc:mysql://mysql:3306/order_db")
             .option("dbtable", "orders")
             .option("user", "root")
             .option("password", "password")
             .load())

order_items_df = (spark.read.format("jdbc")
                  .option("url", "jdbc:mysql://mysql:3306/order_db")
                  .option("dbtable", "order_items")
                  .option("user", "root")
                  .option("password", "password")
                  .load())

# Transform: join and aggregate one day of orders
daily_stats = (orders_df
    .filter(col("order_date") == "2024-01-15")
    .join(order_items_df, orders_df.order_id == order_items_df.order_id)
    .groupBy("order_date", "shop_id")
    .agg(
        count(orders_df.order_id).alias("order_count"),
        _sum(order_items_df.amount).alias("total_amount"),
        avg(order_items_df.amount).alias("avg_amount"),
    ))

# Load into the warehouse
(daily_stats.write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("order_date")
    .saveAsTable("warehouse.daily_shop_stats"))
```
3. Data Cleaning

```python
from pyspark.sql.functions import col, regexp_replace, to_timestamp

def clean_data(df):
    # Drop exact duplicates
    df = df.dropDuplicates()
    # Fill missing contact fields
    df = df.fillna({"phone": "UNKNOWN", "address": "UNKNOWN"})
    # Normalize: keep only digits in phone, parse the order timestamp
    df = df.withColumn("phone", regexp_replace(col("phone"), "[^0-9]", ""))
    df = df.withColumn("order_time",
                       to_timestamp(col("order_time"), "yyyy-MM-dd HH:mm:ss"))
    # Drop out-of-range amounts
    df = df.filter((col("amount") > 0) & (col("amount") < 1000000))
    return df
```
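The same cleaning rules can be mirrored in plain Python for a single record, so the logic can be followed without a Spark cluster (a sketch only; field names follow the example above):

```python
import re

def clean_row(row):
    """Mirror of clean_data for one dict: fill missing contact fields,
    strip non-digits from phone, reject out-of-range amounts (None)."""
    row = dict(row)
    for field in ("phone", "address"):
        if not row.get(field):
            row[field] = "UNKNOWN"
    row["phone"] = re.sub(r"[^0-9]", "", row["phone"]) or "UNKNOWN"
    if not (0 < row["amount"] < 1_000_000):
        return None
    return row

rows = [
    {"phone": "+86 138-0000", "address": None, "amount": 99},
    {"phone": None, "address": "x", "amount": -1},   # rejected: bad amount
]
cleaned = [r for r in map(clean_row, rows) if r]
print(cleaned)  # [{'phone': '861380000', 'address': 'UNKNOWN', 'amount': 99}]
```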
III. Real-Time Stream ETL
1. Real-Time ETL with Flink
Advantages of stream processing:
- Low latency (seconds, even milliseconds)
- Smoother, more even resource usage
- Supports real-time monitoring and alerting
- Smaller blast radius when errors occur

A WordCount example:
```java
public class FlinkWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Source: read lines from a local socket
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // Transform and aggregate; the explicit returns(...) is needed
        // because the lambda's generic types are erased at compile time
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum(1);

        // Sink
        counts.print();
        env.execute("WordCount");
    }
}
```
2. Real-Time Data Synchronization

```java
public class KafkaToHiveETL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000);  // checkpoint every minute
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setGroupId("order-etl-consumer")
                .setTopics("orders")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.committedOffsets())
                .build();

        DataStream<String> stream = env.fromSource(
                source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Parse JSON and drop invalid records
        DataStream<Order> orders = stream
                .map(json -> JSON.parseObject(json, Order.class))
                .filter(order -> order.getAmount() > 0);

        // Write to Hive (FlinkHiveConnector is a project-local helper, not a Flink API)
        FlinkHiveConnector.writeToHive(orders, "warehouse.orders");

        env.execute("KafkaToHiveETL");
    }
}
```
3. Real-Time Aggregation

```java
public class RealTimeShopStats {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Read from Kafka
        DataStream<Order> orders = env.addSource(new FlinkKafkaConsumer<>(
                "orders",
                new OrderDeserializationSchema(),
                KafkaConfig.getProperties()));

        // Real-time statistics over a sliding window
        DataStream<ShopStats> stats = orders
                .keyBy(Order::getShopId)
                .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
                .aggregate(new ShopStatsAggregator());

        // Sink to Kafka, for downstream consumers
        stats.addSink(new FlinkKafkaProducer<>(
                "shop-realtime-stats",
                new ShopStatsSerializationSchema(),
                KafkaConfig.getProperties()));

        // Sink to Redis, for dashboard queries
        stats.addSink(new RedisSink<>(redisConfig));

        env.execute("RealTimeShopStats");
    }
}

// Aggregator: AggregateFunction requires all four methods
public class ShopStatsAggregator
        implements AggregateFunction<Order, ShopStatsAccumulator, ShopStats> {

    @Override
    public ShopStatsAccumulator createAccumulator() {
        return new ShopStatsAccumulator();
    }

    @Override
    public ShopStatsAccumulator add(Order value, ShopStatsAccumulator acc) {
        acc.orderCount++;
        acc.totalAmount += value.getAmount();
        acc.maxAmount = Math.max(acc.maxAmount, value.getAmount());
        return acc;
    }

    @Override
    public ShopStats getResult(ShopStatsAccumulator acc) {
        return new ShopStats(
                acc.getShopId(),
                acc.orderCount,
                acc.totalAmount,
                acc.totalAmount / acc.orderCount,
                acc.maxAmount);
    }

    @Override
    public ShopStatsAccumulator merge(ShopStatsAccumulator a, ShopStatsAccumulator b) {
        a.orderCount += b.orderCount;
        a.totalAmount += b.totalAmount;
        a.maxAmount = Math.max(a.maxAmount, b.maxAmount);
        return a;
    }
}
```
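The `AggregateFunction` contract (create an accumulator, add one element at a time, merge partial results, finalize once) can be mirrored in plain Python to show why it aggregates incrementally instead of buffering whole windows (illustrative names, not Flink API):

```python
class ShopStatsAggregator:
    """Pure-Python mirror of Flink's AggregateFunction contract."""

    def create_accumulator(self):
        return {"count": 0, "total": 0.0, "max": float("-inf")}

    def add(self, amount, acc):
        # O(1) state update per element -- no per-window buffering
        acc["count"] += 1
        acc["total"] += amount
        acc["max"] = max(acc["max"], amount)
        return acc

    def merge(self, a, b):
        # Combine partial accumulators (e.g. from overlapping panes)
        return {"count": a["count"] + b["count"],
                "total": a["total"] + b["total"],
                "max": max(a["max"], b["max"])}

    def get_result(self, acc):
        return {"order_count": acc["count"],
                "total_amount": acc["total"],
                "avg_amount": acc["total"] / acc["count"],
                "max_amount": acc["max"]}

agg = ShopStatsAggregator()
acc = agg.create_accumulator()
for amount in (100, 250, 50):
    acc = agg.add(amount, acc)
print(agg.get_result(acc))  # count 3, total 400.0, max 250
```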
IV. Lambda Architecture
1. Architecture Design
```
     Speed Layer                        Batch Layer
         │                                  │
         └──► Stream processing             └──► Batch computation
              (Flink/Spark Streaming)            (Hive/Spark, full history)
         │                                  │
         └──► Real-time views (Redis)       └──► Batch views
                         │                  │
                         └────────┬─────────┘
                            Serving Layer
                  (merge the results at query time)
```

2. Implementation Example
```python
# Batch layer: compute the full-history metrics
def batch_layer():
    # Daily per-shop statistics (order_date must appear in SELECT,
    # since we group and partition by it)
    daily_stats = spark.sql("""
        SELECT shop_id,
               order_date,
               COUNT(*)    AS order_count,
               SUM(amount) AS total_amount,
               MAX(amount) AS max_amount
        FROM orders
        WHERE order_date >= '2023-01-01'
        GROUP BY shop_id, order_date
    """)
    (daily_stats.write
        .format("delta")
        .mode("overwrite")
        .partitionBy("order_date")
        .saveAsTable("warehouse.shop_daily_stats_batch"))

# Speed layer: compute incremental metrics (Flink-style pseudocode)
def speed_layer():
    # Statistics over the last 5 minutes, sliding every minute
    stats = (orders
        .keyBy("shop_id")
        .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
        .aggregate(ShopStatsAggregator()))
    (stats.write
        .format("delta")
        .mode("append")
        .saveAsTable("warehouse.shop_realtime_stats"))

# Serving layer: merge batch and real-time results at query time
def serving_layer_query(shop_id):
    result = spark.sql(f"""
        SELECT a.shop_id,
               a.total_amount + COALESCE(b.realtime_amount, 0) AS total_amount,
               a.order_count  + COALESCE(b.realtime_count, 0)  AS order_count
        FROM (
            SELECT shop_id, total_amount, order_count
            FROM warehouse.shop_daily_stats_batch
            WHERE shop_id = {shop_id}
              AND order_date = CURRENT_DATE - 1
        ) a
        LEFT JOIN (
            SELECT shop_id,
                   SUM(total_amount) AS realtime_amount,
                   SUM(order_count)  AS realtime_count
            FROM warehouse.shop_realtime_stats
            WHERE shop_id = {shop_id}
            GROUP BY shop_id
        ) b ON a.shop_id = b.shop_id
    """)
    return result.collect()[0]
```
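Stripped of SQL and Spark, the serving-layer merge is just "batch view up to yesterday plus today's speed-layer increments". A minimal dict-based sketch (all values illustrative):

```python
# Batch view: totals up to end of yesterday, recomputed nightly
batch_view = {101: {"order_count": 500, "total_amount": 90000}}
# Speed view: today's increments, updated continuously
speed_view = {101: {"order_count": 12, "total_amount": 1800}}

EMPTY = {"order_count": 0, "total_amount": 0}

def serving_query(shop_id):
    """Merge the two views at query time -- the Serving Layer's job."""
    b = batch_view.get(shop_id, EMPTY)
    s = speed_view.get(shop_id, EMPTY)
    return {"shop_id": shop_id,
            "order_count": b["order_count"] + s["order_count"],
            "total_amount": b["total_amount"] + s["total_amount"]}

print(serving_query(101))  # {'shop_id': 101, 'order_count': 512, 'total_amount': 91800}
```

The price of this design is visible even in the sketch: every metric exists twice, in two code paths that must be kept consistent, which is exactly the complexity Kappa tries to remove.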
V. Kappa Architecture
1. Architecture Design
Stream processing only; the batch layer is dropped entirely:

```
Data Source → Kafka → Flink stream job → result store
                │
                ├──► real-time computation
                │
                └──► full historical recomputation (replay from the log)
```
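The single-code-path idea in the diagram can be shown in miniature: the same processing function is first fed a replay of the historical log, then the live events, so there is no separate batch program (plain lists stand in for Kafka; a sketch, not Flink code):

```python
def process(events, state):
    """The single code path: fold (shop_id, amount) pairs into per-shop totals."""
    for shop_id, amount in events:
        state[shop_id] = state.get(shop_id, 0) + amount
    return state

historical_log = [(1, 100), (2, 30), (1, 50)]   # replayed from offset 0
live_events    = [(2, 20), (1, 25)]             # arriving in real time

state = {}
process(historical_log, state)   # full recomputation = replaying the log
process(live_events, state)      # the same function keeps running on live data
print(state)  # {1: 175, 2: 50}
```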
2. Full Historical Recomputation

```java
public class KappaETL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // In Kappa there is no separate historical pipeline: a full
        // recomputation is the same job, restarted from the earliest
        // retained Kafka offset (replay). Normal operation would resume
        // from the committed offsets instead.
        FlinkKafkaConsumer<Order> consumer = new FlinkKafkaConsumer<>(
                "orders",
                new OrderDeserializationSchema(),
                KafkaConfig.getProperties());
        consumer.setStartFromEarliest();  // replay the full log

        DataStream<Order> allOrders = env.addSource(consumer);

        // Stream computation -- one code path for history and live data
        DataStream<ShopStats> stats = allOrders
                .keyBy(Order::getShopId)
                .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
                .aggregate(new ShopStatsAggregator());

        // Sink the results
        stats.addSink(new RedisSink<>(redisConfig));

        env.execute("KappaETL");
    }
}
```
VI. ETL Tool Comparison

| Tool | Type | Pros | Cons | Typical use |
|---|---|---|---|---|
| Sqoop | Batch | Simple to use | Narrow feature set | Database ↔ HDFS |
| DataX | Batch | Many supported sources | No stream processing | Offline sync |
| Flink | Streaming | Very powerful | High complexity | Real-time ETL |
| Spark Streaming | Streaming | Integrates with Spark | Higher latency (micro-batch) | Near real-time |
| Kafka Streams | Streaming | Lightweight | Limited features | Simple stream jobs |
VII. Summary
ETL architecture has evolved from batch processing to real-time stream processing:
- Batch: T+1 latency; suited to historical analysis
- Streaming: second-level latency; suited to real-time monitoring
- Lambda: combines both, at the cost of extra complexity
- Kappa: simplifies the stack by using stream processing for everything

Selection advice:
- Offline analysis: Spark batch
- Real-time monitoring: Flink streaming
- Complex scenarios: Lambda or Kappa architecture
Personal opinion, for reference only.