LangGraph 状态机:复杂 Agent 任务流程管理实战
LangGraph 是专为 LLM 应用设计的状态机工作流编排框架,用于管理复杂任务中的状态流转与异常处理。 LangGraph 的核心概念,包括状态定义、节点转换、持久化存储及错误恢复机制。通过智能客服系统的实战案例,展示了如何构建包含问候、意图识别、查询处理和结果确认的流程图。文章还总结了状态设计、转换逻辑优化、错误处理策略及性能优化的最佳实践,并分析了状态爆炸、死锁和一致性等常见陷阱的解决方案。

LangGraph 是专为 LLM 应用设计的状态机工作流编排框架,用于管理复杂任务中的状态流转与异常处理。 LangGraph 的核心概念,包括状态定义、节点转换、持久化存储及错误恢复机制。通过智能客服系统的实战案例,展示了如何构建包含问候、意图识别、查询处理和结果确认的流程图。文章还总结了状态设计、转换逻辑优化、错误处理策略及性能优化的最佳实践,并分析了状态爆炸、死锁和一致性等常见陷阱的解决方案。

LangGraph 是一个专门为 LLM(大语言模型)应用设计的工作流编排框架。它的核心理念是将复杂任务拆分为状态和转换,管理状态之间的流转逻辑,并处理任务执行过程中的各种异常情况。
想象一下购物过程:浏览商品 → 加入购物车 → 结算 → 支付,LangGraph 就是帮助我们管理这种流程的工具。它允许开发者构建具有循环、分支和持久化能力的有向图,非常适合需要多步推理或长期记忆的 Agent 系统。
状态就像是任务执行过程中的'检查点',定义了当前时刻系统的所有关键数据。在 Python 中,通常使用 TypedDict 来定义类型安全的状态结构。
from typing import TypedDict, List
class ShoppingState(TypedDict):
# 当前步骤标识
current_step: str
# 购物车商品列表
cart_items: List[str]
# 总金额
total_amount: float
# 用户输入历史
user_input: str
状态转换定义了任务流程的'路线图'。通过添加节点和边,可以控制数据如何在不同处理单元之间流动。支持条件边(Conditional Edges)以实现动态路由。
from langgraph.graph import StateGraph, END
class ShoppingController:
def define_transitions(self):
# 初始化图
self.graph = StateGraph(ShoppingState)
# 添加节点
self.graph.add_node("browse", self.browse_products)
self.graph.add_node("add_to_cart", self.add_to_cart)
self.graph.add_node("checkout", self.checkout)
self.graph.add_node("payment", self.payment)
# 添加边
self.graph.add_edge("browse", "add_to_cart")
self.graph.add_edge("add_to_cart", "browse")
self.graph.add_edge("add_to_cart", "checkout")
self.graph.add_edge("checkout", "payment")
# 设置入口点
self.graph.set_entry_point("browse")
def should_move_to_cart(self, state: ShoppingState) -> bool:
"""判断是否应该转换到购物车状态"""
return "add to cart" in state["user_input"].lower()
为了保证系统的可靠性,特别是在分布式环境或长时间运行的任务中,我们需要持久化状态信息。LangGraph 内置了 Checkpointer 机制,也可以结合 Redis 等外部存储实现自定义持久化。
import redis
import json
class StateManager:
def __init__(self):
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
def save_state(self, session_id: str, state: dict):
"""保存状态到 Redis"""
self.redis_client.set(
f"shopping_state:{session_id}",
json.dumps(state),
ex=3600 # 1 小时过期
)
def load_state(self, session_id: str) -> dict:
"""从 Redis 加载状态"""
state_data = self.redis_client.get(f"shopping_state:{session_id}")
return json.loads(state_data) if state_data else None
任何步骤都可能出错,我们需要优雅地处理这些情况,包括重试、回滚和最终错误处理。
import asyncio
class ErrorHandler:
def __init__(self):
self.max_retries = 3
async def with_retry(self, func, state: dict):
"""带重试机制的函数执行"""
retries = 0
while retries < self.max_retries:
try:
return await func(state)
except Exception as e:
retries += 1
if retries == self.max_retries:
return self.handle_final_error(e, state)
await self.handle_retry(e, state, retries)
def handle_final_error(self, error, state: dict):
"""处理最终错误"""
state["error"] = str(error)
return self.rollback_to_last_stable_state(state)
让我们看一个实际的例子 - 智能客服系统。该系统需要维护对话历史、识别用户意图并确认问题解决状态。
from langgraph.graph import StateGraph
from typing import TypedDict, List
class CustomerServiceState(TypedDict):
conversation_history: List[str]
current_intent: str
user_info: dict
resolved: bool
class CustomerServiceGraph(StateGraph):
def __init__(self):
super().__init__(CustomerServiceState)
# 初始化状态节点
self.add_node("greeting", self.greet_customer)
self.add_node("understand_intent", self.analyze_intent)
self.add_node("handle_query", self.process_query)
self.add_node("confirm_resolution", self.check_resolution)
# 定义流程
self.add_edge("greeting", "understand_intent")
self.add_edge("understand_intent", "handle_query")
self.add_edge("handle_query", "confirm_resolution")
self.add_conditional_edges(
"confirm_resolution",
lambda s: "end" if s["resolved"] else "handle_query"
)
async def greet_customer(self, state: State):
"""欢迎客户"""
response = await self.llm.generate(
prompt=f"""
历史对话:{state['conversation_history']}
任务:生成合适的欢迎语
要求:
1. 保持专业友好
2. 如果是老客户,表示认出了他们
3. 询问如何帮助
"""
)
state['conversation_history'].append(f"Assistant: {response}")
return state
async def analyze_intent(self, state: State):
"""理解用户意图"""
response = await self.llm.generate(
prompt=f"""
历史对话:{state['conversation_history']}
任务:分析用户意图
输出格式:
{{
"intent": "退款/咨询/投诉/其他",
"confidence": 0.95,
"details": "具体描述"
}}
"""
)
state['current_intent'] = json.loads(response)
return state
# 初始化系统
graph = CustomerServiceGraph().compile()
state_manager = StateManager()
error_handler = ErrorHandler()
async def handle_customer_query(user_id: str, message: str):
# 加载或创建状态
state = state_manager.load_state(user_id) or {
"conversation_history": [],
"current_intent": None,
"user_info": {},
"resolved": False
}
# 添加用户消息
state["conversation_history"].append(f"User: {message}")
# 执行状态机流程
try:
result = await graph.ainvoke(state)
# 保存状态
state_manager.save_state(user_id, result)
return result["conversation_history"][-1]
except Exception as e:
return await error_handler.with_retry(
graph.ainvoke,
state
)
add_conditional_edges 实现动态路由。set_entry_point 和结束节点。await 和 ainvoke 提高并发处理能力。LangGraph 状态机为复杂 AI Agent 任务流程管理提供了一个强大的解决方案。通过清晰的任务流程管理、可靠的状态持久化、完善的错误处理以及灵活的扩展能力,开发者可以构建更加健壮和智能的应用程序。在实际项目中,建议根据业务需求合理设计状态图,并重视异常处理和性能优化,以确保系统的稳定性和用户体验。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online