AI Agents in Practice: Building a Production-Grade Framework and Its Core Implementation
This hands-on guide covers the core techniques of AI Agent development: building a production-grade framework, memory management, tool calling, and task planning. It uses the ReAct paradigm and a hybrid memory architecture to tackle context loss and cost control, walks through a customer-service case study, and closes with performance-optimization strategies and a cost analysis to help developers build highly available AI systems.
1. The Core Architecture of an AI Agent
1.1 What Is an AI Agent?
In short: AI Agent = LLM + memory + planning + tools.
- Perception layer: user input
- Brain layer: the LLM reasoning engine
- Memory layer: short-term + long-term memory
- Planning layer: task decomposition
- Tool layer: API/function calls
- Decision & action layers: producing the final output
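The layer list above can be sketched as a minimal composition. This is illustrative only: `MiniAgent` is not from any specific framework, and `llm` here is any prompt-to-text callable rather than a real client.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

@dataclass
class MiniAgent:
    """Minimal sketch of the "LLM + memory + planning + tools" composition."""
    llm: Callable[[str], str]                                # brain: reasoning engine
    memory: List[str] = field(default_factory=list)          # memory: past turns
    tools: Dict[str, Callable[[str], str]] = field(default_factory=dict)  # tool layer

    def act(self, user_input: str) -> str:
        # perception -> memory -> brain -> action
        self.memory.append(user_input)
        context = "\n".join(self.memory[-5:])  # short-term context window
        return self.llm(context)

agent = MiniAgent(llm=lambda ctx: f"echo: {ctx.splitlines()[-1]}")
print(agent.act("hello"))  # → echo: hello
```

A real agent replaces the lambda with an LLM call and adds a planning loop, but the data flow is the same.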
1.2 The 2026 Agent Tech Stack at a Glance
| Layer | Mainstream frameworks/tools | Rating | Best suited for |
|---|---|---|---|
| Orchestration | LangChain / LangGraph | ⭐⭐⭐⭐⭐ | Complex workflow orchestration |
| Runtime | AutoGen / AgentScope | ⭐⭐⭐⭐ | Multi-agent collaboration |
| Vector database | Milvus / Chroma | ⭐⭐⭐⭐⭐ | RAG knowledge bases |
| Tool ecosystem | OpenAI Function Calling | ⭐⭐⭐⭐⭐ | Structured tool calls |
| Memory management | MemGPT | ⭐⭐⭐⭐ | Long conversations |
| Evaluation | Ragas / TruLens | ⭐⭐⭐⭐ | Production monitoring |
2. Building a Production-Grade Agent Framework from Scratch
2.1 Project Structure
agent-framework/
├── core/
│   ├── agent.py          # Agent core class
│   ├── memory.py         # Memory management module
│   ├── planner.py        # Task planner
│   └── tools.py          # Tool registration
├── memory/
│   ├── short_term.py     # Short-term memory (Redis)
│   ├── long_term.py      # Long-term memory (vector DB)
│   └── semantic.py       # Semantic memory retrieval
├── tools/
│   ├── base.py           # Tool base class
│   ├── registry.py       # Tool registry
│   └── builtin/          # Built-in tools
├── evaluators/
│   ├── cost.py           # Cost evaluation
│   └── performance.py    # Performance evaluation
└── utils/
    ├── logger.py         # Logging
    └── retry.py          # Retry logic
2.2 Core Code: The Agent Base Class
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import json

class AgentState(Enum):
    """Agent state enum"""
    IDLE = "idle"
    THINKING = "thinking"
    ACTING = "acting"
    WAITING = "waiting"
    ERROR = "error"

@dataclass
class Message:
    """Message data structure"""
    role: str
    content: str
    tool_calls: Optional[List[Dict]] = None
    tool_name: Optional[str] = None
    timestamp: Optional[float] = None
    metadata: Optional[Dict[str, Any]] = None

class BaseAgent:
    """Production-grade agent base class"""

    def __init__(self, llm_client: Any,
                 memory_manager: Any = None,
                 tool_registry: Any = None,
                 max_iterations: int = 10,
                 verbose: bool = True):
        self.llm = llm_client
        self.memory = memory_manager
        self.tools = tool_registry
        self.max_iterations = max_iterations
        self.verbose = verbose
        self.state = AgentState.IDLE
        self.conversation_history: List[Message] = []

    async def run(self, user_input: str) -> str:
        """Main agent loop: think, act, observe, repeat"""
        self.conversation_history.append(Message(role="user", content=user_input))
        self.state = AgentState.THINKING
        for iteration in range(self.max_iterations):
            self._log(f"Iteration {iteration + 1}/{self.max_iterations}")
            context = await self._retrieve_context(user_input)
            prompt = self._build_prompt(context)
            response = await self._llm_inference(prompt)
            if response.tool_calls:
                self.state = AgentState.ACTING
                tool_results = await self._execute_tools(response.tool_calls)
                for result in tool_results:
                    self.conversation_history.append(
                        Message(role="tool", content=result["content"],
                                tool_name=result["tool_name"])
                    )
            else:
                self.state = AgentState.IDLE
                self.conversation_history.append(
                    Message(role="assistant", content=response.content))
                return response.content
        return "Maximum iterations exceeded; task not completed"

    async def _retrieve_context(self, query: str) -> str:
        if not self.memory:
            return ""
        return await self.memory.search(query, top_k=3)

    def _build_prompt(self, context: str) -> str:
        tool_desc = self.tools.get_tool_descriptions() if self.tools else "none"
        return f"""You are an intelligent AI assistant.

# Available tools
{tool_desc}

# Relevant memories
{context}

# Task requirements
1. Analyze the user's request
2. Call the appropriate tool when a lookup or action is needed
3. Give an accurate answer based on the tool results
4. If the task cannot be completed, state the reason clearly

Begin!"""

    async def _llm_inference(self, prompt: str) -> Any:
        messages = [
            {"role": "system", "content": prompt},
            *[{"role": m.role, "content": m.content} for m in self.conversation_history]
        ]
        # Only pass `tools` when a registry is configured; `tools=None` is rejected by the API
        kwargs: Dict[str, Any] = {"model": "gpt-4", "messages": messages, "temperature": 0.7}
        if self.tools:
            kwargs["tools"] = self.tools.get_tool_schemas()
        response = await self.llm.chat.completions.create(**kwargs)
        return response.choices[0].message

    async def _execute_tools(self, tool_calls: List[Dict]) -> List[Dict]:
        results = []
        for call in tool_calls:
            tool_name = call["function"]["name"]
            arguments = json.loads(call["function"]["arguments"])
            self._log(f"Calling tool: {tool_name} | arguments: {arguments}")
            try:
                tool = self.tools.get_tool(tool_name)
                result = await tool.execute(**arguments)
                results.append({"tool_name": tool_name,
                                "content": json.dumps(result, ensure_ascii=False)})
            except Exception as e:
                results.append({"tool_name": tool_name,
                                "content": json.dumps({"error": str(e)})})
        return results

    def _log(self, message: str):
        if self.verbose:
            print(f"[Agent] {message}")
2.3 The Memory Management System
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from datetime import datetime
import json
import redis

class MemoryBackend(ABC):
    """Abstract base class for memory backends"""

    @abstractmethod
    async def add(self, content: str, metadata: Dict = None) -> str: ...

    @abstractmethod
    async def search(self, query: str, top_k: int = 5) -> List[Dict]: ...

class ShortTermMemory(MemoryBackend):
    """Short-term memory: Redis-backed session memory"""

    def __init__(self, redis_url: str = "redis://localhost:6379", ttl: int = 3600):
        self.client = redis.from_url(redis_url)
        self.ttl = ttl

    async def add(self, content: str, metadata: Dict = None) -> str:
        memory_id = f"mem:{datetime.now().timestamp()}"
        memory_data = {"content": content,
                       "metadata": metadata or {},
                       "timestamp": datetime.now().isoformat()}
        self.client.setex(memory_id, self.ttl, json.dumps(memory_data, ensure_ascii=False))
        return memory_id

    async def search(self, query: str, top_k: int = 5) -> List[Dict]:
        # Recency-based lookup: key names embed the timestamp, so sorting gives
        # the newest entries. KEYS is O(n) and fine for a demo; use SCAN or a
        # sorted set in production.
        keys = sorted(self.client.keys("mem:*"))
        return [json.loads(self.client.get(key)) for key in keys[-top_k:]]

class LongTermMemory(MemoryBackend):
    """Long-term memory: semantic memory backed by a vector database"""

    def __init__(self, embedding_model: Any, vector_db: Any):
        self.embedding_model = embedding_model
        self.vector_db = vector_db

    async def add(self, content: str, metadata: Dict = None) -> str:
        embedding = await self.embedding_model.embed(content)
        return self.vector_db.insert(
            vector=embedding,
            payload={"content": content, "metadata": metadata or {}})

    async def search(self, query: str, top_k: int = 5) -> List[Dict]:
        query_embedding = await self.embedding_model.embed(query)
        return self.vector_db.search(vector=query_embedding,
                                     top_k=top_k, score_threshold=0.7)

class HybridMemory:
    """Hybrid memory manager: combines short- and long-term memory"""

    def __init__(self, short_term: ShortTermMemory, long_term: LongTermMemory):
        self.short_term = short_term
        self.long_term = long_term

    async def remember(self, content: str, importance: float = 0.5, metadata: Dict = None):
        await self.short_term.add(content, metadata)
        if importance > 0.7:  # only important memories are promoted to long-term storage
            await self.long_term.add(content, metadata)

    async def recall(self, query: str, top_k: int = 5) -> List[Dict]:
        short_results = await self.short_term.search(query, top_k // 2)
        long_results = await self.long_term.search(query, top_k // 2)
        return (short_results + long_results)[:top_k]
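The promotion rule in `HybridMemory.remember` can be demonstrated without Redis or a vector DB; in this sketch, plain lists stand in for the two backends.

```python
# Importance-based write routing: everything lands in short-term memory,
# and only high-importance items are promoted to the long-term store.
short_term_store: list = []
long_term_store: list = []

def remember(content: str, importance: float = 0.5) -> None:
    short_term_store.append(content)
    if importance > 0.7:  # same promotion threshold as HybridMemory
        long_term_store.append(content)

remember("user said hello", importance=0.2)
remember("user prefers email contact", importance=0.9)
print(long_term_store)  # → ['user prefers email contact']
```

The threshold is the main tuning knob: too low and the long-term store fills with noise, too high and the agent forgets facts it should keep.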
3. Three Core Techniques
3.1 The ReAct Framework: Reasoning + Acting
ReAct (Reasoning + Acting) is currently the dominant reasoning paradigm for agents.
Flow: user question -> Thought (analyze the problem) -> Action (call a tool) -> Observation (tool result) -> Answer (final answer).
class ReActAgent(BaseAgent):
    """Agent built on the ReAct paradigm"""

    def _build_react_prompt(self, question: str) -> str:
        return f"""Answer the question using the following format:

Question: {question}
Thought: think about what to do next
Action: the action to take, one of [{self.tools.get_tool_names()}]
Action Input: the input to the action
Observation: the result of the action
...
Thought: I now know the final answer
Answer: the final answer to the original question

Begin!

Question: {question}
Thought:"""

    async def run(self, user_input: str) -> str:
        prompt = self._build_react_prompt(user_input)
        for _ in range(self.max_iterations):
            response = await self.llm.generate(prompt)
            thought, action, action_input = self._parse_react_response(response)
            if not action:
                return thought
            observation = await self._execute_action(action, action_input)
            prompt += f"\n{response}\nObservation: {observation}\nThought:"
        return "Could not finish within the allotted iterations"

    async def _execute_action(self, action: str, action_input: str) -> str:
        # Simplified dispatch: classic ReAct passes a single string to the tool
        tool = self.tools.get_tool(action)
        if tool is None:
            return f"Unknown tool: {action}"
        try:
            return str(await tool.execute(query=action_input))
        except Exception as e:
            return f"Tool error: {e}"

    def _parse_react_response(self, response: str) -> tuple:
        thought, action, action_input = "", None, None
        for line in response.strip().split('\n'):
            if line.startswith("Thought:"):
                thought = line.replace("Thought:", "").strip()
            elif line.startswith("Action:"):
                action = line.replace("Action:", "").strip()
            elif line.startswith("Action Input:"):
                action_input = line.replace("Action Input:", "").strip()
        return thought, action, action_input
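As a standalone sketch of the parsing step, here is a slightly more tolerant alternative to line-prefix matching, using multiline regexes. The `Thought:`/`Action:`/`Action Input:` labels follow the ReAct prompt format above; everything else is illustrative.

```python
import re

def parse_react(text: str):
    """Extract the Thought / Action / Action Input fields from a ReAct completion.

    Tolerates extra whitespace after each label; returns None for missing fields.
    """
    def grab(label: str):
        m = re.search(rf"^{label}:\s*(.*)$", text, re.MULTILINE)
        return m.group(1).strip() if m else None
    return grab("Thought"), grab("Action"), grab("Action Input")

sample = """Thought: I need the weather.
Action: get_weather
Action Input: Beijing"""
print(parse_react(sample))  # → ('I need the weather.', 'get_weather', 'Beijing')
```

Note that `^Action:` does not accidentally match the `Action Input:` line, because the colon must follow `Action` immediately.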
3.2 The Tool-Calling System
from typing import Callable, Dict, Any, List
import inspect
from pydantic import BaseModel, Field

class Tool(BaseModel):
    """Tool base class"""
    name: str = Field(description="Tool name")
    description: str = Field(description="What the tool does")
    parameters: Dict[str, Any] = Field(default_factory=dict, description="Parameter schema")
    function: Callable = Field(description="The function the tool executes")

    class Config:
        arbitrary_types_allowed = True

    async def execute(self, **kwargs) -> Any:
        return await self.function(**kwargs)

    def to_openai_schema(self) -> Dict:
        return {"type": "function",
                "function": {"name": self.name,
                             "description": self.description,
                             "parameters": self.parameters}}

# Python annotations mapped to JSON-schema type names
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}

def tool(name: str = None, description: str = None):
    """Decorator that turns an async function into a Tool"""
    def decorator(func: Callable) -> Tool:
        sig = inspect.signature(func)
        properties = {}
        for param_name, param in sig.parameters.items():
            param_type = param.annotation if param.annotation is not inspect.Parameter.empty else str
            properties[param_name] = {
                "type": _JSON_TYPES.get(param_type, "string"),
                "description": f"Parameter {param_name}"
            }
        required = [p for p, v in sig.parameters.items()
                    if v.default is inspect.Parameter.empty]
        return Tool(
            name=name or func.__name__,
            description=description or func.__doc__ or "",
            parameters={"type": "object", "properties": properties, "required": required},
            function=func
        )
    return decorator

@tool(name="search_web", description="Search the web")
async def search_web(query: str, num_results: int = 5):
    return f"Found {num_results} results for '{query}'"

@tool(name="get_weather", description="Get weather information")
async def get_weather(location: str):
    return f"Sunny in {location} today, 25°C"

class ToolRegistry:
    """Tool registry"""

    def __init__(self):
        self._tools: Dict[str, Tool] = {}

    def register(self, tool: Tool) -> Tool:
        # Return the tool so register() also works as a decorator
        self._tools[tool.name] = tool
        return tool

    def get_tool(self, name: str) -> Tool:
        return self._tools.get(name)

    def get_tool_names(self) -> List[str]:
        return list(self._tools.keys())

    def get_tool_descriptions(self) -> str:
        return "\n".join(f"- {t.name}: {t.description}" for t in self._tools.values())

    def get_tool_schemas(self) -> List[Dict]:
        return [t.to_openai_schema() for t in self._tools.values()]
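The signature-to-schema conversion inside the `@tool` decorator can be shown in isolation, without pydantic. The `str`/`int`/`float`/`bool` → JSON-schema mapping reflects what OpenAI function calling expects; `forecast` is a hypothetical example function, not part of the framework.

```python
import inspect

TYPE_MAP = {str: "string", int: "integer", float: "number", bool: "boolean"}

def schema_from_signature(func):
    """Derive a JSON-schema parameter description from a function signature.

    Parameters without defaults become required; unannotated ones fall back to string.
    """
    sig = inspect.signature(func)
    props, required = {}, []
    for name, param in sig.parameters.items():
        py_type = param.annotation if param.annotation is not inspect.Parameter.empty else str
        props[name] = {"type": TYPE_MAP.get(py_type, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {"type": "object", "properties": props, "required": required}

def forecast(location: str, days: int = 1):
    """Hypothetical tool function used only for this demo."""

print(schema_from_signature(forecast))
```

This is the piece that lets the LLM see each tool's calling convention without any hand-written schemas.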
3.3 The Task Planner
class TaskPlanner:
    """Task decomposition and planning"""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def plan(self, goal: str) -> List[Dict]:
        prompt = f"""Break the following goal down into a list of concrete, executable subtasks.

Goal: {goal}

Output in this format:
1. [task description]
2. [task description]
...

Requirements:
- Each task should be independent and executable
- Tasks should be in logical order
- Make each task concrete enough to execute directly"""
        response = await self.llm.generate(prompt)
        tasks = []
        for line in response.strip().split('\n'):
            if line.strip():
                task_desc = line.split('.', 1)[1].strip() if '.' in line else line.strip()
                tasks.append({"task": task_desc, "order": len(tasks) + 1, "status": "pending"})
        return tasks

    async def execute_plan(self, agent: BaseAgent, tasks: List[Dict]) -> Dict:
        completed, failed = [], []
        for task in tasks:
            print(f"\nExecuting task {task['order']}: {task['task']}")
            try:
                result = await agent.run(task['task'])
                task['status'] = 'completed'
                task['result'] = result
                completed.append(task)
            except Exception as e:
                task['status'] = 'failed'
                task['error'] = str(e)
                failed.append(task)
        return {
            "success": len(failed) == 0,
            "completed_tasks": completed,
            "failed_tasks": failed,
            "final_result": completed[-1]['result'] if completed else None
        }
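The line parsing inside `plan()` assumes well-formed `1.` prefixes. A small regex makes the same idea survive `1)` variants and leading whitespace; this sketch uses a hard-coded sample in place of a real LLM response.

```python
import re

def parse_plan(text: str):
    """Parse a numbered task list into task dicts, tolerating '1.' and '1)' styles."""
    tasks = []
    for line in text.strip().splitlines():
        m = re.match(r"\s*\d+[.)]\s*(.+)", line)
        if m:
            tasks.append({"task": m.group(1).strip(),
                          "order": len(tasks) + 1,
                          "status": "pending"})
    return tasks

plan_text = """1. Collect requirements
2) Search the knowledge base
3. Draft the answer"""
print(parse_plan(plan_text))
```

Non-matching lines (headers, blank lines, stray commentary from the model) are simply skipped, which is usually the right default for LLM output.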
4. Case Study: A Customer-Service Agent
4.1 Scenario
Customer service is the most typical AI Agent application. Here we build a government service-hall assistant that can answer policy questions, guide users through procedures, create tickets, and transfer to human agents.
4.2 Full Implementation
import asyncio

class CustomerServiceAgent(ReActAgent):
    """Customer-service agent"""

    def __init__(self, knowledge_base, ticket_system, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.knowledge_base = knowledge_base
        self.ticket_system = ticket_system
        self._register_customer_service_tools()

    def _register_customer_service_tools(self):
        @self.tools.register
        @tool(name="search_policy", description="Search policy information")
        async def search_policy(query: str):
            results = await self.knowledge_base.search(query, top_k=3)
            return "\n".join(r['content'] for r in results)

        @self.tools.register
        @tool(name="get_process_guide", description="Get a step-by-step service guide")
        async def get_process_guide(service_type: str):
            return await self.knowledge_base.get_guide(service_type)

        @self.tools.register
        @tool(name="create_ticket", description="Create a support ticket")
        async def create_ticket(category: str, description: str, priority: str = "normal"):
            ticket_id = await self.ticket_system.create(
                category=category, description=description, priority=priority)
            return f"Ticket created, ID: {ticket_id}. We will handle it within 1 business day."

        @self.tools.register
        @tool(name="transfer_to_human", description="Transfer to a human agent")
        async def transfer_to_human(reason: str):
            queue_number = await self.ticket_system.human_transfer(reason)
            return (f"Transferring you to a human agent. Queue position: {queue_number}, "
                    f"estimated wait: {queue_number * 2} minutes.")

    async def handle_customer_query(self, user_input: str) -> str:
        intent = await self._detect_intent(user_input)
        # Prepend the intent-specific system prompt so the ReAct loop actually sees it
        prompt = f"{self._get_system_prompt(intent)}\n\n{user_input}"
        return await self.run(prompt)

    async def _detect_intent(self, user_input: str) -> str:
        intent_prompt = f"""Classify the intent of the following customer query.

User input: {user_input}

Intent types:
1. policy_inquiry - policy questions
2. process_guide - questions about service procedures
3. complaint - complaints and feedback
4. complex - complex issues that need a human

Return only the intent code:"""
        response = await self.llm.generate(intent_prompt)
        return response.strip()

    def _get_system_prompt(self, intent: str) -> str:
        prompts = {
            "policy_inquiry": "You are a policy specialist; quote policy documents accurately...",
            "process_guide": "You are a service guide; give clear step-by-step instructions...",
            "complaint": "You are a complaints handler; calm the customer first, then record the issue...",
            "complex": "You are a service assistant; proactively suggest a human agent for complex issues..."
        }
        return prompts.get(intent, "You are a customer-service assistant...")

async def main():
    from openai import AsyncOpenAI
    llm_client = AsyncOpenAI(api_key="your-api-key")
    # MockKnowledgeBase / MockTicketSystem are stand-in implementations (not shown)
    knowledge_base = MockKnowledgeBase()
    ticket_system = MockTicketSystem()
    # LongTermMemory requires a real embedding model and vector DB client
    agent = CustomerServiceAgent(
        llm_client=llm_client,
        memory_manager=HybridMemory(
            short_term=ShortTermMemory(),
            long_term=LongTermMemory(embedding_model=embedding_model, vector_db=vector_db)),
        tool_registry=ToolRegistry(),
        knowledge_base=knowledge_base,
        ticket_system=ticket_system
    )
    response = await agent.handle_customer_query(
        "I want to apply for a social security card. What materials do I need?")
    print(response)

if __name__ == "__main__":
    asyncio.run(main())
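`_detect_intent` trusts the LLM to return a valid intent code. In production it is worth having a cheap rule-based fallback for when the call fails or returns something unexpected; the keyword lists below are purely illustrative, not a tuned classifier.

```python
FALLBACK_KEYWORDS = {
    "policy_inquiry": ["policy", "regulation", "subsidy"],
    "process_guide": ["how do i", "procedure", "materials", "apply"],
    "complaint": ["complaint", "unhappy", "report"],
}

def fallback_intent(user_input: str, llm_intent: str = "") -> str:
    """Use the LLM's intent if it is a known code, else fall back to keyword rules."""
    valid = set(FALLBACK_KEYWORDS) | {"complex"}
    if llm_intent in valid:
        return llm_intent
    text = user_input.lower()
    for intent, words in FALLBACK_KEYWORDS.items():
        if any(w in text for w in words):
            return intent
    return "complex"  # unknown -> safest to route toward a human

print(fallback_intent("How do I apply for a social security card?"))  # → process_guide
```

Defaulting unknowns to `complex` biases failures toward a human handoff rather than a confidently wrong answer.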
4.3 Performance Comparison
| Metric | Rule-based support | Basic chatbot | Agent |
|---|---|---|---|
| Resolution rate | 35% | 60% | 85% |
| Average response time | 5 min | 2 s | 3 s |
| Multi-turn dialogue | ❌ | ⚠️ | ✅ |
| Tool calling | ❌ | ❌ | ✅ |
| Learning/improvement | ❌ | ⚠️ | ✅ |
| Operating cost | High | Low | Medium |
5. Performance Optimization and Cost Control
5.1 Cost Analysis
The main cost drivers of an AI Agent are LLM token consumption, the vector database, the Redis cache, and external API calls.
5.2 Optimization Strategies
| Area | Strategy | Expected savings |
|---|---|---|
| Prompt optimization | Trim the system prompt | 20-30% |
| Model selection | Mix GPT-4 and GPT-3.5 | 40-50% |
| Caching | Serve repeated questions from cache | 30-40% |
| Token limits | Dynamically trim context | 15-20% |
| Batching | Merge multiple requests | 10-15% |
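The model-mixing row in the table can be realized with a simple router that keeps the expensive model for long or multi-step requests. The thresholds, marker phrases, and model names here are illustrative assumptions, not tuned values.

```python
def pick_model(prompt: str, needs_tools: bool) -> str:
    """Route cheap queries to a small model, complex ones to the large model."""
    complex_markers = ("step by step", "compare", "plan")
    if needs_tools or len(prompt) > 500 or any(m in prompt.lower() for m in complex_markers):
        return "gpt-4"
    return "gpt-3.5-turbo"

print(pick_model("What's your refund policy?", needs_tools=False))          # → gpt-3.5-turbo
print(pick_model("Plan my application step by step", needs_tools=True))     # → gpt-4
```

In practice the routing signal is often a dedicated classifier or the token count of the retrieved context, but even crude heuristics like these capture much of the 40-50% savings the table claims.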
import hashlib
import time
from functools import wraps

def smart_cache(ttl: int = 3600):
    """Smart caching decorator"""
    cache = {}

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key = hashlib.md5(f"{func.__name__}{args}{kwargs}".encode()).hexdigest()
            if key in cache:
                cache_data = cache[key]
                if time.time() - cache_data['timestamp'] < ttl:
                    print("Cache hit!")
                    return cache_data['result']
            result = await func(*args, **kwargs)
            cache[key] = {'result': result, 'timestamp': time.time()}
            return result
        return wrapper
    return decorator

class OptimizedAgent(BaseAgent):
    @smart_cache(ttl=1800)
    async def _llm_inference(self, prompt: str):
        return await super()._llm_inference(prompt)
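To make the cache hit visible, here is a synchronous variant of `smart_cache` with a call counter. The tuple key replaces the MD5 digest, which works because the positional arguments here are hashable; the `ttl_cache` name and sample function are illustrative.

```python
import time
from functools import wraps

def ttl_cache(ttl: float = 60.0):
    """Memoize a function's results for `ttl` seconds (sync sketch of smart_cache)."""
    store = {}
    def decorator(func):
        @wraps(func)
        def wrapper(*args):
            now = time.time()
            if args in store and now - store[args][1] < ttl:
                return store[args][0]  # fresh cached result: skip the call
            result = func(*args)
            store[args] = (result, now)
            return result
        return wrapper
    return decorator

calls = {"n": 0}

@ttl_cache(ttl=60)
def expensive(query: str) -> str:
    calls["n"] += 1  # count how often the underlying "LLM call" actually runs
    return f"answer to {query}"

expensive("a"); expensive("a"); expensive("b")
print(calls["n"])  # → 2 (the second "a" was served from cache)
```

The same counter trick is a quick way to verify in tests that the async `smart_cache` above is actually short-circuiting LLM calls.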
6. Conclusion
AI Agents are evolving from toys into productivity tools, and agent development is becoming a core competency for AI engineers.
Key takeaways
- Memory is the agent's core strength: a hybrid short- plus long-term memory architecture is current best practice
- Tool calling defines the agent's capability boundary: a richer tool ecosystem means a more capable agent
- Cost control is the key to productionization: smart caching plus model mixing can save 50%+ on costs
- An evaluation pipeline is indispensable: build thorough monitoring and evaluation into production from day one