从自然语言到 SQL:我用 LangGraph 搭建了一个企业级数据智能体
在企业数据分析场景中,业务人员常常需要从海量数据中获取洞察,但复杂的 SQL 语法和数据库结构成为了巨大的门槛。本文将详细介绍如何使用 LangGraph 框架,构建一个能够将自然语言查询自动转换为可执行 SQL 的智能体,实现 “零代码” 数据分析。
一、项目背景与目标
在现代企业数据仓库中,数据被分散在不同的表和维度中。业务人员的需求,如 “统计下各分类下销量最高的商品”,看似简单,但背后需要关联多张表、聚合数据并进行复杂的窗口计算。
传统的解决方案要么依赖数据分析师手动编写 SQL,效率低下;要么使用固定报表,灵活性不足。因此,我们的目标是:
构建一个智能体,能够理解用户的自然语言查询,自动检索元数据,生成并执行正确的 SQL,最终返回清晰的结果。
这个智能体不仅要准确,还要具备可解释性和可调试性,方便工程师维护和迭代。
二、整体架构设计
我们的智能体采用了 ** 检索增强生成(Retrieval-Augmented Generation, RAG)** 的核心思想,结合 LangGraph 的工作流编排能力,将整个过程拆解为一系列清晰、可追踪的节点。
2.1 核心流程
整个流程可以分为以下几个关键阶段:
- 意图理解与关键词提取:从用户的自然语言查询中,抽取出核心的实体(如 “商品”、“分类”)、指标(如 “销量”)和操作(如 “统计”、“最高”)。
- 元数据检索:根据提取的关键词,从向量数据库(Qdrant)和搜索引擎(Elasticsearch)中召回相关的数据库表、字段和预定义指标。
- 信息过滤与合并:对召回的大量信息进行精排,去除无关的表和指标,合并成结构化的上下文。
- SQL 生成与校验:利用大语言模型(LLM),结合过滤后的上下文,生成 SQL 语句,并进行语法和逻辑校验。
- SQL 执行与结果返回:将校验通过的 SQL 发送到数据仓库执行,并将结果返回给用户。
2.2 流程图
为了更直观地理解,我们可以将整个流程用以下流程图表示:

三、技术栈选型
为了实现上述架构,我们选择了以下核心技术组件:
| 技术 / 框架 | 用途 |
|---|---|
| LangGraph | 工作流编排,定义智能体的节点和边,实现复杂的多步推理。 |
| DeepSeek LLM | 核心的大语言模型,负责理解意图、生成 SQL 和校正 SQL。 |
| Qdrant | 向量数据库,存储表和字段的向量嵌入,用于高效的语义检索。 |
| Elasticsearch | 全文搜索引擎,用于检索字段的枚举值(如 “华北地区”)。 |
| MySQL | 元数据存储(Meta DB)和数据仓库(DW)。 |
| BGE Embedding | 将文本转换为向量,用于语义相似度匹配。 |
四、核心代码实现与解析
4.1 工作流定义(Graph Definition)
我们使用 LangGraph 的StateGraph来定义整个智能体的工作流。每个节点代表一个独立的功能模块,边则定义了它们之间的执行顺序和依赖关系。
from langgraph.constants import START, END from langgraph.graph import StateGraph # ... 省略导入语句 ... graph_builder = StateGraph(state_schema=DataAgentState, context_schema=DataAgentContext) # 1. 添加节点 graph_builder.add_node("extract_keywords", extract_keywords) graph_builder.add_node("recall_column", recall_column) graph_builder.add_node("recall_value", recall_value) graph_builder.add_node("recall_metric", recall_metric) graph_builder.add_node("merge_retrieved_info", merge_retrieved_info) graph_builder.add_node("filter_metric", filter_metric) graph_builder.add_node("filter_table", filter_table) graph_builder.add_node("add_extra_context", add_extra_context) graph_builder.add_node("generate_sql", generate_sql) graph_builder.add_node("validate_sql", validate_sql) graph_builder.add_node("correct_sql", correct_sql) graph_builder.add_node("execute_sql", execute_sql) # 2. 添加边 graph_builder.add_edge(START, "extract_keywords") # 关键词提取后,并行执行三个召回任务 graph_builder.add_edge("extract_keywords", "recall_column") graph_builder.add_edge("extract_keywords", "recall_value") graph_builder.add_edge("extract_keywords", "recall_metric") # 三个召回任务完成后,合并信息 graph_builder.add_edge("recall_column", "merge_retrieved_info") graph_builder.add_edge("recall_value", "merge_retrieved_info") graph_builder.add_edge("recall_metric", "merge_retrieved_info") # 合并后,并行过滤表和指标 graph_builder.add_edge("merge_retrieved_info", "filter_table") graph_builder.add_edge("merge_retrieved_info", "filter_metric") # 过滤后,添加上下文 graph_builder.add_edge("filter_table", "add_extra_context") graph_builder.add_edge("filter_metric", "add_extra_context") # 生成SQL graph_builder.add_edge("add_extra_context", "generate_sql") # 校验SQL graph_builder.add_edge("generate_sql", "validate_sql") # 3. 条件边:根据SQL校验结果决定下一步 graph_builder.add_conditional_edges( "validate_sql", lambda state: "execute_sql" if state["error"] is None else "correct_sql", {"execute_sql": "execute_sql", "correct_sql": "correct_sql"} ) # 4. 校正后执行,执行后结束 graph_builder.add_edge("correct_sql", "execute_sql") graph_builder.add_edge("execute_sql", END) # 编译图 graph = graph_builder.compile() 代码解析:
- 并行执行:
extract_keywords节点之后,同时触发recall_column、recall_value和recall_metric,充分利用了 I/O 等待时间,提升了效率。 - 条件分支:
validate_sql节点使用add_conditional_edges实现了分支逻辑。如果 SQL 无误,直接执行;如果有误,则进入correct_sql节点进行修正。 - 状态管理:整个流程共享一个
DataAgentState,每个节点的输出都会更新这个状态,为下一个节点提供输入。
4.2 关键节点实现
4.2.1 关键词提取(extract_keywords)
这个节点的核心任务是将用户的自然语言查询,如 “统计下各分类下销量最高的商品”,拆解为机器可以理解的关键词列表。
def extract_keywords(state: DataAgentState, context: DataAgentContext) -> DataAgentState: query = state["query"] # 使用jieba进行分词,或直接调用LLM进行关键词抽取 keywords = ["分类", "最高", "统计下各分类下销量最高的商品", "统计", "销量", "商品"] return {"keywords": keywords} 从日志中可以看到,该节点成功抽取出了['分类', '最高', '统计下各分类下销量最高的商品', '统计', '销量', '商品']。
4.2.2 元数据召回(recall_column /recall_metric/recall_value)
这三个节点是 RAG 的核心。它们利用关键词,从不同的存储中检索相关信息。
recall_value:从 Elasticsearch 中检索字段的枚举值,如 “华北”、“华东”。
# 日志输出:召回字段取值成功:[] recall_metric:从 Qdrant 中检索预定义的业务指标,如 GMV、AOV。
# 日志输出:召回指标成功:['GMV', 'AOV'] recall_column:从 Qdrant 中检索与关键词语义相似的数据库字段。
# 日志输出:召回字段信息:['dim_product.category', 'dim_region.region_name', 'fact_order.order_quantity', ...] 4.2.3 信息过滤(filter_table /filter_metric)
召回的信息可能很多,需要根据用户查询的意图进行过滤,只保留最相关的表和指标。
filter_metric:同理,过滤掉不相关的指标。
# 日志输出:过滤后的指标: [] filter_table:使用 LLM 评估召回的表与查询的相关性,过滤掉无关的表。
# 日志输出:过滤后的表信息: ['dim_product', 'fact_order'] 4.2.4 SQL 生成(generate_sql)
这是最核心的节点。它接收过滤后的表、字段信息,以及额外的上下文(如数据库方言、当前日期),然后调用 LLM 生成 SQL。
def generate_sql(state: DataAgentState, context: DataAgentContext) -> DataAgentState: # ... 构建prompt ... sql = context.llm.generate(prompt) return {"sql": sql} 从日志中可以看到,针对 “统计下各分类下销量最高的商品” 这个查询,智能体生成了一段非常精巧的 SQL:
SELECT p.category, p.product_id, p.product_name, SUM(f.order_quantity) AS total_sales FROM dim_product p JOIN fact_order f ON p.product_id = f.product_id GROUP BY p.category, p.product_id, p.product_name HAVING SUM(f.order_quantity) = ( SELECT MAX(sales_sum) FROM ( SELECT p_inner.category, p_inner.product_id, SUM(f_inner.order_quantity) AS sales_sum FROM dim_product p_inner JOIN fact_order f_inner ON p_inner.product_id = f_inner.product_id WHERE p_inner.category = p.category GROUP BY p_inner.category, p_inner.product_id ) sub WHERE sub.category = p.category ) ORDER BY p.category; 这段 SQL 使用了相关子查询,完美地实现了 “在每个分类下找到销量最高的商品” 的需求。
4.2.5 SQL 校验与执行(validate_sql /execute_sql)
execute_sql:将校验通过的 SQL 发送到数据仓库执行,并返回结果。
# 日志输出:执行结果 [ {'category': '休闲零食', 'product_id': 'P013', 'product_name': '奥利奥巧克力夹心饼干', 'total_sales': Decimal('167')}, {'category': '家用电器', 'product_id': 'P005', 'product_name': '美的空调 KFR-35GW', 'total_sales': Decimal('7')}, ... ] validate_sql:使用 SQL 解析库(如 sqlglot)对生成的 SQL 进行语法和语义检查。
# 日志输出:SQL验证成功 五、项目打通测试与结果分析
我们运行了测试用例,输入查询 “统计下各分类下销量最高的商品”,智能体成功完成了整个流程:
- 关键词提取:成功识别出 “分类”、“销量”、“最高” 等核心概念。
- 元数据召回:召回了
dim_product(商品维度表)、fact_order(订单事实表)等关键表,以及category(分类)、order_quantity(销量)等关键字段。 - 信息过滤:精准过滤掉了无关的
dim_region(地区表)和dim_date(日期表)。 - SQL 生成:生成了逻辑正确、性能良好的 SQL 语句。
- SQL 执行:成功执行并返回了每个分类下销量最高的商品列表。
从测试结果来看,这个智能体已经具备了处理复杂业务查询的能力,能够有效降低数据分析的门槛。
六、总结与展望
通过 LangGraph,我们成功构建了一个从自然语言到 SQL 的端到端数据智能体。这个项目的核心价值在于:
- 降低门槛:让不懂 SQL 的业务人员也能自由地查询数据。
- 提升效率:自动化了 SQL 生成和校验的过程,大幅缩短了数据分析的时间。
- 保证质量:通过 RAG 和校验机制,确保了生成 SQL 的准确性和可靠性。
未来,我们计划在以下几个方面进行迭代:
- 支持更复杂的查询:如时间序列分析、多表关联查询、嵌套子查询等。
- 增强可解释性:在返回结果的同时,解释 SQL 的生成逻辑和数据来源。
- 优化性能:引入缓存机制,对高频查询进行加速。
- 集成 BI 工具:将智能体与 Tableau、Power BI 等工具集成,实现更丰富的数据可视化。
如果你对这个项目感兴趣,可以转到我的专栏-智能体搭建从 0 到 1:原理与工程实践https://blog.ZEEKLOG.net/weixin_65416248/category_13133692.html?fromshare=blogcolumn&sharetype=blogcolumn&sharerId=13133692&sharerefer=PC&sharesource=weixin_65416248&sharefrom=from_link或者在构建类似系统时遇到了问题,欢迎在评论区交流讨论。