ETL 概述
ETL(Extract-Transform-Load)是数据仓库构建的核心环节,负责将分散的数据源汇聚、清洗并加载到目标存储中。
数据源 → Extract → Transform → Load → 数据仓库
传统批处理模式在实际业务中逐渐暴露出一些瓶颈:
- 延迟高:通常是 T+1 甚至更久,无法满足实时决策需求
- 资源波动大:处理时间长,容易在夜间形成资源峰值
- 容错难:一旦任务失败,排查和重跑成本较高
批处理 ETL 方案
Sqoop 与 HDFS 交互
Sqoop 是早期常用的数据库与 Hadoop 生态之间的桥梁工具。它支持全量和增量导入,配置相对简单。
# 全量导入示例
sqoop import \
--connect jdbc:mysql://mysql:3306/order_db \
--username root \
--password password \
--table orders \
--target-dir /data/orders \
--delete-target-dir \
--num-mappers 4 \
--fields-terminated-by ','
# 增量导入示例
sqoop import \
--connect jdbc:mysql://mysql:3306/order_db \
--username root \
--password password \
--table orders \
--target-dir /data/orders \
--incremental append \
--check-column order_id \
--last-value 10000 \
--num-mappers 2
Spark Batch ETL
随着大数据计算引擎的发展,Spark 成为了批处理的主流选择。利用 PySpark 可以方便地编写转换逻辑。
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, count
spark = SparkSession.builder \
.appName("DailyOrderETL") \
.getOrCreate()
# 读取数据
orders_df = spark.read \
.format("jdbc") \
.option("url", "jdbc:mysql://mysql:3306/order_db") \
.option("dbtable", "orders") \
.option("user", "root") \
.option("password", "password").load()
order_items_df = spark.read \
.format("jdbc") \
.option("url", ) \
.option(, ) \
.option(, ) \
.option(, ).load()
daily_stats = orders_df \
.(col() == ) \
.join(order_items_df, orders_df.order_id == order_items_df.order_id) \
.groupBy(, ) \
.agg(
count().alias(),
().alias(),
avg().alias()
)
daily_stats.write \
.() \
.mode() \
.partitionBy() \
.saveAsTable()


