跳到主要内容AI Agent 生产级框架实战:架构设计与核心实现 | 极客日志PythonAI算法
AI Agent 生产级框架实战:架构设计与核心实现
AI Agent 开发面临记忆丢失、工具调用不稳定及成本失控等挑战。本文构建生产级 Agent 框架,涵盖核心架构设计、混合记忆管理、ReAct 推理范式及任务规划器实现。通过智能客服案例展示完整落地流程,并提供性能优化与成本控制策略,帮助开发者掌握从原型到生产环境的工程化实践。
flc13 浏览 2023 年 ChatGPT 引爆了大模型应用,2024 年多模态能力迅速成熟,而当前阶段,AI Agent 正从概念验证走向实际生产力落地。在实际工程中,我们常遇到上下文记忆丢失、工具调用成功率低、成本不可控以及复杂任务规划困难等问题。本文将带你从零构建一个生产级的 AI Agent 框架,解决上述痛点。
AI Agent 的核心架构
简单来说,AI Agent = LLM + 记忆 + 规划 + 工具。用户输入经过感知层进入大脑(LLM 推理引擎),结合记忆层的短期与长期信息,由规划层分解任务,最终通过工具层执行动作并输出结果。
在技术选型上,编排框架推荐 LangChain 或 LangGraph,运行时可选 AutoGen,向量数据库使用 Milvus 或 Chroma,工具生态依赖 OpenAI Function Calling,记忆管理可参考 MemGPT,评估则用 Ragas 或 TruLens。
从零搭建生产级 Agent 框架
项目结构设计
合理的目录结构是工程化的基础。核心模块包括 Agent 基类、记忆管理、任务规划器和工具注册器。记忆层需区分短期(Redis)和长期(向量 DB),工具层包含基类和内置功能,评估层负责成本和性能监控。
agent-framework/
├── core/
│ ├── agent.py # Agent 核心类
│ ├── memory.py # 记忆管理模块
│ ├── planner.py # 任务规划器
│ └── tools.py # 工具注册器
├── memory/
│ ├── short_term.py # 短期记忆(Redis)
│ ├── long_term.py # 长期记忆(向量 DB)
│ └── semantic.py # 语义记忆检索
├── tools/
│ ├── base.py # 工具基类
│ ├── registry.py # 工具注册中心
│ └── builtin/ # 内置工具
├── evaluators/
│ ├── cost.py # 成本评估
│ └── performance.py# 性能评估
└── utils/
├── logger.py # 日志系统
└── retry.py # 重试机制
核心代码:Agent 基类
Agent 的状态管理至关重要。我们需要定义空闲、思考、执行等状态,并维护消息历史。主循环负责迭代推理、检索上下文、构建提示词、LLM 推理及工具调用。
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import asyncio
import json
class AgentState():
IDLE =
THINKING =
ACTING =
WAITING =
ERROR =
:
role:
content:
tool_calls: [[]] =
timestamp: =
metadata: [, ] =
:
():
.llm = llm_client
.memory = memory_manager
.tools = tool_registry
.max_iterations = max_iterations
.verbose = verbose
.state = AgentState.IDLE
.conversation_history: [Message] = []
() -> :
.conversation_history.append(Message(role=, content=user_input))
.state = AgentState.THINKING
iteration (.max_iterations):
._log()
context = ._retrieve_context(user_input)
prompt = ._build_prompt(context)
response = ._llm_inference(prompt)
response.tool_calls:
.state = AgentState.ACTING
tool_results = ._execute_tools(response.tool_calls)
result tool_results:
.conversation_history.append(
Message(role=, content=result[], tool_name=result[])
)
:
.state = AgentState.IDLE
.conversation_history.append(Message(role=, content=response.content))
response.content
() -> :
.memory:
.memory.search(query, top_k=)
() -> :
system_prompt =
system_prompt
() -> :
messages = [{: , : prompt},
*[{: m.role, : m.content} m .conversation_history]]
response = .llm.chat.completions.create(
model=, messages=messages,
tools=.tools.get_tool_schemas() .tools ,
temperature=
)
response.choices[].message
() -> []:
results = []
call tool_calls:
tool_name = call[][]
arguments = json.loads(call[][])
._log()
:
tool = .tools.get_tool(tool_name)
result = tool.execute(**arguments)
results.append({: tool_name, : json.dumps(result, ensure_ascii=)})
Exception e:
results.append({: tool_name, : json.dumps({: (e)})})
results
():
.verbose:
()
Enum
"""Agent 状态枚举"""
"idle"
"thinking"
"acting"
"waiting"
"error"
@dataclass
class
Message
"""消息数据结构"""
str
str
Optional
List
Dict
None
float
None
Dict
str
Any
None
class
BaseAgent
"""生产级 Agent 基类"""
def
__init__
self, llm_client: Any, memory_manager: Any = None,
tool_registry: Any = None, max_iterations: int = 10, verbose: bool = True
self
self
self
self
self
self
self
List
async
def
run
self, user_input: str
str
"""Agent 主执行循环"""
self
"user"
self
for
in
range
self
self
f"迭代 {iteration + 1}/{self.max_iterations}"
await
self
self
await
self
if
self
await
self
for
in
self
"tool"
"content"
"tool_name"
else
self
self
"assistant"
return
return
"超过最大迭代次数,任务未完成"
async
def
_retrieve_context
self, query: str
str
if
not
self
return
""
return
await
self
3
def
_build_prompt
self, context: str
str
f"""你是一个智能 AI 助手。
# 可用工具 {self.tools.get_tool_descriptions() if self.tools else '无'}
# 相关记忆 {context}
# 任务要求
1. 分析用户需求
2. 如需信息查询或执行操作,调用相应工具
3. 基于工具结果给出准确答案
4. 如无法完成,明确说明原因
开始工作!"""
return
async
def
_llm_inference
self, prompt: str
Any
"role"
"system"
"content"
"role"
"content"
for
in
self
await
self
"gpt-4"
self
if
self
else
None
0.7
return
0
async
def
_execute_tools
self, tool_calls: List[Dict]
List
Dict
for
in
"function"
"name"
"function"
"arguments"
self
f"调用工具:{tool_name} | 参数:{arguments}"
try
self
await
"tool_name"
"content"
False
except
as
"tool_name"
"content"
"error"
str
return
def
_log
self, message: str
if
self
print
f"[Agent] {message}"
记忆管理系统
记忆是 Agent 的长期竞争力。混合记忆架构结合了 Redis 的会话记忆和向量数据库的语义记忆。短期记忆用于快速检索最近交互,长期记忆存储重要知识以便跨会话复用。
from abc import ABC, abstractmethod
from typing import List, Dict, Any
import redis
import numpy as np
from datetime import datetime, timedelta
class MemoryBackend(ABC):
@abstractmethod
async def add(self, content: str, metadata: Dict = None) -> str: pass
@abstractmethod
async def search(self, query: str, top_k: int = 5) -> List[Dict]: pass
class ShortTermMemory(MemoryBackend):
def __init__(self, redis_url: str = "redis://localhost:6379", ttl: int = 3600):
self.client = redis.from_url(redis_url)
self.ttl = ttl
async def add(self, content: str, metadata: Dict = None) -> str:
memory_id = f"mem:{datetime.now().timestamp()}"
memory_data = {"content": content, "metadata": metadata or {}, "timestamp": datetime.now().isoformat()}
self.client.setex(memory_id, self.ttl, json.dumps(memory_data, ensure_ascii=False))
return memory_id
async def search(self, query: str, top_k: int = 5) -> List[Dict]:
keys = self.client.keys("mem:*")
memories = []
for key in keys[-top_k:]:
data = json.loads(self.client.get(key))
memories.append(data)
return memories
class LongTermMemory(MemoryBackend):
def __init__(self, embedding_model: Any, vector_db: Any):
self.embedding_model = embedding_model
self.vector_db = vector_db
async def add(self, content: str, metadata: Dict = None) -> str:
embedding = await self.embedding_model.embed(content)
memory_id = self.vector_db.insert(vector=embedding, payload={"content": content, "metadata": metadata or {}})
return memory_id
async def search(self, query: str, top_k: int = 5) -> List[Dict]:
query_embedding = await self.embedding_model.embed(query)
results = self.vector_db.search(vector=query_embedding, top_k=top_k, score_threshold=0.7)
return results
class HybridMemory:
def __init__(self, short_term: ShortTermMemory, long_term: LongTermMemory):
self.short_term = short_term
self.long_term = long_term
async def remember(self, content: str, importance: float = 0.5, metadata: Dict = None):
await self.short_term.add(content, metadata)
if importance > 0.7:
await self.long_term.add(content, metadata)
async def recall(self, query: str, top_k: int = 5) -> List[Dict]:
short_results = await self.short_term.search(query, top_k // 2)
long_results = await self.long_term.search(query, top_k // 2)
all_results = short_results + long_results
return all_results[:top_k]
三大核心技术实现
ReAct 框架:推理 + 行动协同
ReAct(Reasoning + Acting)是目前最主流的推理范式。它强制模型在采取行动前进行思考,并通过观察工具结果来修正后续步骤。这种闭环机制显著提高了复杂任务的完成率。
class ReActAgent(BaseAgent):
def _build_react_prompt(self, question: str) -> str:
return f"""使用以下格式回答问题:
Question: {question}
Thought: 你应该思考做什么
Action: 要采取的操作,应该是 [{self.tools.get_tool_names()}] 中的一个
Observation: 操作的结果
...
Thought: 我现在知道最终答案了
Answer: 对原始问题的最终答案
开始!
Question: {question}
Thought:"""
async def run(self, user_input: str) -> str:
prompt = self._build_react_prompt(user_input)
for _ in range(self.max_iterations):
response = await self.llm.generate(prompt)
thought, action, action_input = self._parse_react_response(response)
if not action:
return thought
observation = await self._execute_action(action, action_input)
prompt += f"\n{response}\nObservation: {observation}\nThought:"
return "无法在指定迭代次数内完成"
def _parse_react_response(self, response: str) -> tuple:
lines = response.strip().split('\n')
thought, action, action_input = "", None, None
for line in lines:
if line.startswith("Thought:"): thought = line.replace("Thought:", "").strip()
elif line.startswith("Action:"): action = line.replace("Action:", "").strip()
elif line.startswith("Action Input:"): action_input = line.replace("Action Input:", "").strip()
return thought, action, action_input
工具调用系统
工具注册中心需要支持动态扩展。通过装饰器自动提取函数签名生成 Schema,简化了工具接入流程。注意处理异常返回,避免单个工具失败导致整个 Agent 崩溃。
from typing import Callable, Dict, Any, List
import inspect
from pydantic import BaseModel, Field
class Tool(BaseModel):
name: str = Field(description="工具名称")
description: str = Field(description="工具功能描述")
parameters: Dict[str, Any] = Field(default_factory=dict, description="参数 schema")
function: Callable = Field(description="工具执行函数")
class Config:
arbitrary_types_allowed = True
async def execute(self, **kwargs) -> Any:
return await self.function(**kwargs)
def to_openai_schema(self) -> Dict:
return {"type": "function", "function": {"name": self.name, "description": self.description, "parameters": self.parameters}}
def tool(name: str = None, description: str = None):
def decorator(func: Callable) -> Tool:
sig = inspect.signature(func)
parameters = {}
for param_name, param in sig.parameters.items():
param_type = param.annotation if param.annotation != inspect.Parameter.empty else "string"
parameters[param_name] = {
"type": param_type.__name__ if hasattr(param_type, "__name__") else "string",
"description": f"参数 {param_name}"
}
return Tool(
name=name or func.__name__,
description=description or func.__doc__ or "",
parameters={"type": "object", "properties": parameters, "required": [p for p in sig.parameters if p.default == inspect.Parameter.empty]},
function=func
)
return decorator
class ToolRegistry:
def __init__(self):
self._tools: Dict[str, Tool] = {}
def register(self, tool: Tool):
self._tools[tool.name] = tool
def get_tool(self, name: str) -> Tool:
return self._tools.get(name)
def get_tool_names(self) -> List[str]:
return list(self._tools.keys())
def get_tool_descriptions(self) -> str:
descriptions = []
for tool in self._tools.values():
descriptions.append(f"- {tool.name}: {tool.description}")
return "\n".join(descriptions)
def get_tool_schemas(self) -> List[Dict]:
return [tool.to_openai_schema() for tool in self._tools.values()]
任务规划器
对于多步任务,简单的单轮对话不够用。任务规划器将目标拆解为有序的子任务,并跟踪执行状态。这允许 Agent 处理更复杂的业务逻辑。
class TaskPlanner:
def __init__(self, llm_client: Any):
self.llm = llm_client
async def plan(self, goal: str) -> List[Dict]:
prompt = f"""将以下目标分解为具体的、可执行的子任务列表。
目标:{goal}
请按以下格式输出:
1. [任务描述]
2. [任务描述]
...
要求:
- 每个任务应该独立且可执行
- 任务之间应该有逻辑顺序
- 尽量细化到可以直接执行"""
response = await self.llm.generate(prompt)
tasks = []
for line in response.strip().split('\n'):
if line.strip():
task_desc = line.split('.', 1)[1].strip() if '.' in line else line.strip()
tasks.append({"task": task_desc, "order": len(tasks) + 1, "status": "pending"})
return tasks
async def execute_plan(self, agent: BaseAgent, tasks: List[Dict]) -> Dict:
completed, failed = [], []
for task in tasks:
print(f"\n执行任务 {task['order']}: {task['task']}")
try:
result = await agent.run(task['task'])
task['status'] = 'completed'
task['result'] = result
completed.append(task)
except Exception as e:
task['status'] = 'failed'
task['error'] = str(e)
failed.append(task)
return {
"success": len(failed) == 0,
"completed_tasks": completed,
"failed_tasks": failed,
"final_result": completed[-1]['result'] if completed else None
}
实战案例:智能客服 Agent
以政务大厅智能客服为例,该场景需要政策问答、办事流程引导、工单生成及人工转接能力。我们通过意图识别调整系统提示词,使 Agent 在不同场景下扮演不同角色。
import asyncio
from typing import Optional
class CustomerServiceAgent(ReActAgent):
def __init__(self, knowledge_base, ticket_system, *args, **kwargs):
super().__init__(*args, **kwargs)
self.knowledge_base = knowledge_base
self.ticket_system = ticket_system
self._register_customer_service_tools()
def _register_customer_service_tools(self):
@self.tools.register
@tool(name="search_policy", description="搜索政策信息")
async def search_policy(query: str):
results = await self.knowledge_base.search(query, top_k=3)
return "\n".join([r['content'] for r in results])
@self.tools.register
@tool(name="get_process_guide", description="获取办事流程")
async def get_process_guide(service_type: str):
guide = await self.knowledge_base.get_guide(service_type)
return guide
@self.tools.register
@tool(name="create_ticket", description="创建工单")
async def create_ticket(category: str, description: str, priority: str = "normal"):
ticket_id = await self.ticket_system.create(category=category, description=description, priority=priority)
return f"工单已创建,编号:{ticket_id},我们将在 1 个工作日内处理"
@self.tools.register
@tool(name="transfer_to_human", description="转人工客服")
async def transfer_to_human(reason: str):
queue_number = await self.ticket_system.human_transfer(reason)
return f"已为您转接人工客服,当前排队人数:{queue_number}人,预计等待时间:{queue_number * 2}分钟"
async def handle_customer_query(self, user_input: str) -> str:
intent = await self._detect_intent(user_input)
system_prompt = self._get_system_prompt(intent)
return await self.run(user_input)
async def _detect_intent(self, user_input: str) -> str:
intent_prompt = f"""分类以下用户咨询的意图类型:
用户输入:{user_input}
意图类型:
1. policy_inquiry - 政策咨询
2. process_guide - 办事流程咨询
3. complaint - 投诉建议
4. complex - 复杂问题需人工
只返回意图类型代码:"""
response = await self.llm.generate(intent_prompt)
return response.strip()
def _get_system_prompt(self, intent: str) -> str:
prompts = {
"policy_inquiry": "你是政策咨询专员,请准确引用政策文件内容...",
"process_guide": "你是办事引导员,请给出清晰的办事步骤...",
"complaint": "你是投诉处理专员,请先安抚情绪,再记录问题...",
"complex": "你是客服助理,对于复杂问题,请主动建议转人工..."
}
return prompts.get(intent, "你是智能客服助手...")
相比传统规则客服和基础 Chatbot,智能 Agent 在多轮对话、工具调用及学习进化能力上有显著提升,问题解决率可达 85% 以上。
性能优化与成本控制
生产环境中,Token 消耗是主要成本来源。通过混合模型策略(GPT-4/GPT-3.5)、Prompt 精简、缓存重复查询及动态裁剪上下文,可有效降低 40%-50% 的成本。
import hashlib
from functools import wraps
import time
def smart_cache(ttl: int = 3600):
cache = {}
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
key = hashlib.md5(f"{func.__name__}{args}{kwargs}".encode()).hexdigest()
if key in cache:
cache_data = cache[key]
if time.time() - cache_data['timestamp'] < ttl:
print("缓存命中!")
return cache_data['result']
result = await func(*args, **kwargs)
cache[key] = {'result': result, 'timestamp': time.time()}
return result
return wrapper
return decorator
class OptimizedAgent(BaseAgent):
@smart_cache(ttl=1800)
async def _llm_inference(self, prompt: str):
return await super()._llm_inference(prompt)
总结
AI Agent 正在从玩具向生产力工具转变。掌握 Agent 开发已成为工程师的核心竞争力。关键点在于:混合记忆架构是最佳实践,工具调用决定能力边界,成本控制是生产化关键,完善的评估体系必不可少。下一步建议深入学习 LangChain/LangGraph,研究多 Agent 协作模式,并建立 Agent 评估体系。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- 随机西班牙地址生成器
随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online