跳到主要内容Prompt 驱动的 ETL 流程自动化实战指南 | 极客日志PythonAI算法
Prompt 驱动的 ETL 流程自动化实战指南
本文探讨利用自然语言提示词(Prompt)引导大模型实现 ETL 流程自动化的方案。通过抽取、转换、加载三阶段的 Prompt 设计,解决传统 ETL 技术门槛高、迭代慢的问题。涵盖 MySQL、API、CSV 等数据源处理,以及数据清洗、字段计算、多表关联等转换操作,并提供完整 Python 代码模板与实战案例,助力实现低门槛、高效率的数据自动化处理。
晚风叙旧3 浏览 Prompt 驱动的 ETL 流程自动化实战指南
在数据驱动决策的时代,ETL(Extract-抽取、Transform-转换、Load-加载)作为数据管道的核心环节,直接决定了数据从原始状态到可用资产的转化效率。传统 ETL 流程依赖工程师编写大量代码(如 SQL、Python)、配置复杂工具,往往面临技术门槛高、迭代效率低、场景适配难三大痛点。
而利用自然语言指令定义 ETL 规则,让大语言模型(LLM)自动生成处理逻辑或代码,能从根本上解决上述问题。本文将聚焦这一模式的核心价值,分阶段详解抽取、转换、加载的 Prompt 设计逻辑与实战案例,并提供完整代码模板,助力实现自然语言驱动的 ETL 自动化转型。
一、核心价值:为什么用 Prompt 做 ETL?
- 降本提效:将搭建周期从'天级'压缩至'小时级',非技术人员可通过自然语言参与流程设计,减少跨岗位沟通成本。
- 灵活适配:无需修改底层代码,仅通过调整 Prompt 即可适配新数据源、新业务规则,尤其擅长处理半结构化/非结构化数据的预处理。
- 低门槛复用:将复杂 ETL 逻辑封装为'Prompt 模板',后续同类场景仅需替换变量(如数据源路径、字段映射关系)即可快速复用。
二、Prompt 设计逻辑与原则
ETL 流程的三阶段目标不同,对应的 Prompt 设计需遵循'场景化指令 + 明确约束 + 示例引导'的核心逻辑。
(一)通用设计原则
无论针对哪个阶段,Prompt 需满足以下四点,确保 LLM 输出符合预期:
- 目标明确化:清晰说明该阶段需达成的具体结果,避免模糊表述。
- 上下文完整化:提供数据源信息、业务规则、工具限制。
- 格式约束化:指定输出格式(如'生成可直接运行的 Python 代码')。
- 示例引导化:若场景复杂,可提供简单示例降低理解成本。
(二)各阶段侧重点
| ETL 阶段 | 核心目标 | Prompt 设计侧重点 |
|---|
| 抽取 (Extract) | 准确获取目标数据,避免遗漏或冗余 | 明确数据源类型、访问方式、筛选条件、数据预览 |
| 转换 (Transform) | 清洗、整合数据,确保数据质量 | 拆解转换步骤、明确业务规则、校验指标 |
| 加载 (Load) | 写入目标存储,确保格式兼容、高效 | 指定目标类型、写入策略、日志输出 |
抽取阶段的核心是'精准获取目标数据'。
场景 1:从 MySQL 数据库抽取数据
Prompt 指令要点:
- 明确数据库地址、端口、库名、表名及字段。
- 定义筛选条件(时间范围、状态排除等)。
- 要求分批读取防止内存溢出,并包含异常处理。
代码实现思路:
我们通常使用 pymysql 连接,配合 LIMIT + OFFSET 进行分页查询。敏感信息(如密码)应设为变量而非硬编码。以下是生成的 Python 代码示例:
import pymysql
pandas pd
pymysql.cursors DictCursor
():
db_config = {
: ,
: ,
: ,
: ,
: ,
:
}
target_fields =
start_time =
end_time =
batch_size =
all_orders = []
:
connection = pymysql.connect(**db_config, cursorclass=DictCursor)
cursor = connection.cursor()
()
count_sql =
cursor.execute(count_sql)
total_count = cursor.fetchone()[]
()
offset (, total_count, batch_size):
extract_sql =
cursor.execute(extract_sql)
batch_data = cursor.fetchall()
all_orders.extend(batch_data)
()
all_orders:
df_preview = pd.DataFrame(all_orders).head()
df_preview[] = df_preview[].astype().[-:]
()
(df_preview.to_string(index=))
pd.DataFrame(all_orders)
Exception e:
()
:
():
cursor.close()
() connection.:
connection.close()
()
__name__ == :
extracted_data = extract_mysql_orders()
extracted_data :
()
import
as
from
import
def
extract_mysql_orders
"host"
"192.168.1.100"
"port"
3306
"user"
"your_username"
"password"
"your_password"
"db"
"ecommerce_db"
"charset"
"utf8mb4"
"order_id, user_id, order_time, amount, pay_status"
"2024-01-01 00:00:00"
"2024-02-01 00:00:00"
1000
try
print
"MySQL 数据库连接成功"
f"""
SELECT COUNT(*) AS total FROM orders
WHERE order_time >= '{start_time}' AND order_time < '{end_time}'
AND pay_status != '退款'
"""
"total"
print
f"符合条件的总数据量:{total_count} 条"
for
in
range
0
f"""
SELECT {target_fields} FROM orders
WHERE order_time >= '{start_time}' AND order_time < '{end_time}'
AND pay_status != '退款' LIMIT {batch_size} OFFSET {offset}
"""
print
f"已抽取 {len(all_orders)}/{total_count} 条数据"
if
5
"user_id"
"user_id"
str
str
4
print
"\n数据预览(前 5 条):"
print
False
return
except
as
print
f"抽取数据失败:{str(e)}"
return
None
finally
if
"cursor"
in
locals
if
"connection"
in
locals
and
open
print
"MySQL 数据库连接已关闭"
if
"__main__"
if
is
not
None
print
f"\n抽取完成,共获取 {len(extracted_data)} 条订单数据"
- 安全设计:密码设为变量,避免泄露;分批读取适合百万级数据。
- 用户体验:添加进度提示和数据预览,方便验证。
- 异常处理:覆盖连接失败、查询报错场景。
场景 2:从 API 接口抽取数据
对于 API 数据,稳定性至关重要。我们需要在 Prompt 中强调重试机制和超时设置。
代码实现思路:
使用 requests 库,配合循环遍历分页,并在请求失败时增加重试逻辑。
import requests
import time
import csv
from typing import List, Dict
def extract_api_products():
API_URL = "https://api.ecommerce.com/v1/products"
APP_KEY = "your_app_key"
CATEGORY_ID = 3
PAGE_SIZE = 20
TIMEOUT = 10
RETRY_TIMES = 2
RETRY_INTERVAL = 3
OUTPUT_CSV = "./products_electronic.csv"
all_products: List[Dict] = []
current_page = 1
total_page = 1
print(f"开始抽取分类 {CATEGORY_ID}(电子产品)商品数据")
try:
while current_page <= total_page:
params = {
"app_key": APP_KEY,
"category_id": CATEGORY_ID,
"page": current_page,
"page_size": PAGE_SIZE
}
retry_count = 0
while retry_count < RETRY_TIMES:
try:
response = requests.get(API_URL, params=params, timeout=TIMEOUT)
response.raise_for_status()
result = response.json()
if result.get("code") != 200:
raise Exception(f"API 响应错误:code={result.get('code')}")
page_products = result.get("data", [])
total_page = result.get("total_page", 1)
all_products.extend(page_products)
print(f"第 {current_page} 页抽取成功,获取 {len(page_products)} 条数据")
break
except Exception as e:
retry_count += 1
if retry_count >= RETRY_TIMES:
raise Exception(f"第 {current_page} 页抽取失败:{str(e)}")
print(f"第 {current_page} 页抽取失败,{RETRY_INTERVAL} 秒后重试")
time.sleep(RETRY_INTERVAL)
current_page += 1
if all_products:
fieldnames = all_products[0].keys()
with open(OUTPUT_CSV, "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(all_products)
print(f"\n抽取完成,共获取 {len(all_products)} 条商品数据")
return all_products
except Exception as e:
print(f"\n抽取过程异常终止:{str(e)}")
return None
if __name__ == "__main__":
extract_api_products()
四、转换阶段(Transform)实战
转换是 ETL 中最复杂的环节,涉及清洗、计算、关联等。
场景 1:数据清洗(缺失值、异常值)
- 明确缺失值填充规则(如'未知用户')。
- 定义异常值修正逻辑(如金额<0 修正为 0)。
- 要求保留原始数据副本,新增标记字段。
import pandas as pd
import numpy as np
def transform_clean_orders(df_orders: pd.DataFrame) -> pd.DataFrame:
df_orders_clean = df_orders.copy()
print("=== 开始订单数据清洗 ===")
missing_before = df_orders_clean.isnull().sum()
print(f"清洗前缺失值统计:{missing_before[missing_before > 0]}")
user_id_missing_mask = df_orders_clean["user_id"].isnull()
df_orders_clean.loc[user_id_missing_mask, "user_id"] = (
"未知用户_" + df_orders_clean.loc[user_id_missing_mask, "order_id"]
)
order_time_missing_mask = df_orders_clean["order_time"].isnull()
df_orders_clean.loc[order_time_missing_mask, "order_time"] = "2099-12-31 23:59:59"
df_orders_clean["amount_flag"] = "正常"
neg_mask = df_orders_clean["amount"] < 0
if neg_mask.sum() > 0:
df_orders_clean.loc[neg_mask, "amount"] = 0
df_orders_clean.loc[neg_mask, "amount_flag"] = "异常 - 金额为负"
high_mask = df_orders_clean["amount"] > 10000
if high_mask.sum() > 0:
df_orders_clean.loc[high_mask, "amount"] = 10000
df_orders_clean.loc[high_mask, "amount_flag"] = "异常 - 超最高限额"
print("=== 订单数据清洗完成 ===")
return df_orders_clean
场景 2:多表关联
- 明确关联键(如
user_id)和关联方式(内连接)。
- 定义派生字段计算逻辑(如用户资历天数)。
import pandas as pd
def transform_join_tables(df_orders_transformed: pd.DataFrame, df_users: pd.DataFrame) -> pd.DataFrame:
print("=== 开始订单表与用户表关联 ===")
df_users_processed = df_users.copy()
df_users_processed["register_time"] = pd.to_datetime(
df_users_processed["register_time"], format="%Y-%m-%d"
)
df_joined = pd.merge(
left=df_orders_transformed,
right=df_users_processed,
on="user_id",
how="inner",
suffixes=("", "_user")
)
df_joined["user_seniority"] = (
df_joined["order_time"] - df_joined["register_time"]
).dt.days
df_joined.loc[df_joined["user_seniority"] < 0, "user_seniority"] = 0
def get_level_discount(level):
if level == "钻石": return 0.05
elif level == "黄金": return 0.03
else: return 0.00
df_joined["user_level_discount"] = df_joined["user_level"].apply(get_level_discount)
df_joined["user_level_discount_str"] = df_joined["user_level_discount"].apply(lambda x: f"{x*100:.1f}%")
print("=== 关联完成 ===")
return df_joined
五、加载阶段(Load)实战
加载阶段需将数据写入目标存储,常见包括文件(CSV/Parquet)和数据库(MySQL/Hive)。
场景 1:加载到 CSV/Parquet 文件
- 指定文件名规则(含日期)。
- 要求自动创建目录,失败时清理不完整文件。
- 校验写入后的数据完整性。
import pandas as pd
import os
from datetime import datetime
import pyarrow as pa
import pyarrow.parquet as pq
def load_to_files(df_joined: pd.DataFrame) -> bool:
print("=== 开始将数据加载到文件 ===")
load_success = False
try:
current_date = datetime.now().strftime("%Y%m%d")
output_dir = "./etl_output/"
csv_filename = f"orders_joined_{current_date}.csv"
parquet_filename = f"orders_joined_{current_date}.parquet"
csv_path = os.path.join(output_dir, csv_filename)
parquet_path = os.path.join(output_dir, parquet_filename)
if not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
df_joined.to_csv(path_or_buf=csv_path, index=False, encoding="utf-8", na_rep="NA")
print(f"CSV 文件写入完成")
table = pa.Table.from_pandas(df_joined)
pq.write_to_dataset(table, root_path=parquet_path, compression="snappy", overwrite=True)
print(f"Parquet 文件写入完成")
load_success = True
except Exception as e:
print(f"加载失败:{str(e)}")
if os.path.exists(csv_path): os.remove(csv_path)
if os.path.exists(parquet_path): shutil.rmtree(parquet_path)
return load_success
场景 2:加载到 MySQL 数据库
- 定义表结构(字段类型、主键)。
- 要求批量插入优化性能。
- 加载前后校验行数一致性。
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
def load_to_mysql(df_joined: pd.DataFrame) -> bool:
print("=== 开始将数据加载到 MySQL 数据库 ===")
load_success = False
engine = None
try:
db_config = {
"host": "192.168.1.100",
"port": 3306,
"db": "ecommerce_etl",
"user": "etl_user",
"password": "etl_password",
"charset": "utf8mb4"
}
table_name = "orders_joined"
conn_str = f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['db']}?charset={db_config['charset']}"
engine = create_engine(conn_str)
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
order_id VARCHAR(20) NOT NULL PRIMARY KEY,
user_id VARCHAR(20) NOT NULL,
amount DECIMAL(10,2) NOT NULL,
order_time DATETIME NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
with engine.connect() as conn:
conn.execute(create_table_sql)
conn.execute(f"TRUNCATE TABLE {table_name};")
df_joined.to_sql(name=table_name, con=engine, if_exists="append", index=False, chunksize=1000, method="multi")
with engine.connect() as conn:
result = conn.execute(f"SELECT COUNT(*) AS total FROM {table_name}.").fetchone()
if result["total"] == len(df_joined):
load_success = True
print("加载校验成功")
except SQLAlchemyError as e:
print(f"数据库错误:{str(e)}")
finally:
if engine is not None:
engine.dispose()
return load_success
六、完整 ETL 流程模板
将上述逻辑整合,可形成一套完整的 Prompt 模板,便于后续调用。
请作为数据工程师,生成一套完整的 ETL 自动化 Python 代码,处理'电商订单数据',具体要求如下:
#
1. 订单数据源:MySQL 数据库...
2. 用户数据源:CSV 文件...
#
#
#
#
#
1. 生成完整 Python 代码,包含函数封装...
2. 敏感信息设为变量...
基于此模板生成的代码将包含 extract_order_data, transform_data, load_data 等模块,并具备完整的日志打印和异常捕获机制。
七、优势与注意事项
核心优势
- 降低技术门槛:非技术人员可通过自然语言定义规则。
- 提升迭代效率:业务规则变化时,仅需修改 Prompt。
- 场景适配灵活:擅长处理半结构化数据。
关键注意事项
- 敏感信息保护:Prompt 中避免直接填写密码,使用占位符。
- 数据质量校验:LLM 生成的代码可能存在逻辑漏洞,务必小批量测试。
- 性能优化边界:超大规模数据需手动优化批量大小或索引。
- 模型选型适配:复杂流程建议使用 ChatGPT-4 等强模型,开源模型需搭配 Few-Shot 示例。
八、课后练习
基础练习
修改 Prompt 指令,实现'从 CSV 订单数据中抽取 2024 年 2 月的未支付订单,清洗金额异常值(>5000 标记为'待审核'),并加载到 Excel 文件'。
进阶实战
设计 Prompt 并生成代码,实现'多数据源(MySQL 订单表 + API 用户表 + CSV 商品表)的 ETL 自动化,最终加载到 Hive 数据仓库'。
通过以上练习,可深入掌握 Prompt 与 ETL 流程的结合逻辑,逐步实现从'手动编写 ETL 代码'到'自然语言驱动自动化'的转型。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- 随机西班牙地址生成器
随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online