AI Agent 实战:生产级框架搭建与落地指南
!在这里插入图片描述 2023 年是 ChatGPT 元年,2024 年是多模态爆发之年,而 2026 年则是 AI Agent 的落地元年。 你可能用过 Coze 搭建过聊天机器人,也可能在 Dify 上配置过知识库问答,但当真正要把 AI Agent 投入生产环境时,你会发现: 上下文记忆经常丢失 工具调用成功率只有 60% 成本控制一团混乱 无法处理复杂的多步任务 将带你从零开始,手写一…

!在这里插入图片描述 2023 年是 ChatGPT 元年,2024 年是多模态爆发之年,而 2026 年则是 AI Agent 的落地元年。 你可能用过 Coze 搭建过聊天机器人,也可能在 Dify 上配置过知识库问答,但当真正要把 AI Agent 投入生产环境时,你会发现: 上下文记忆经常丢失 工具调用成功率只有 60% 成本控制一团混乱 无法处理复杂的多步任务 将带你从零开始,手写一…

2023 年是 ChatGPT 元年,2024 年是多模态爆发之年,而 2026 年则是 AI Agent 的落地元年。
你可能用过 Coze 搭建过聊天机器人,也可能在 Dify 上配置过知识库问答,但当真正要把 AI Agent 投入生产环境时,你会发现:
本文将带你从零开始,手写一个生产级 AI Agent 框架,解决上述所有问题。

简单来说,AI Agent = LLM + 记忆 + 规划 + 工具
用户输入 -> 感知层 Perception -> 大脑层 Brain (LLM 推理引擎) -> 记忆层 Memory (短期 + 长期记忆) -> 规划层 Planning (任务分解) -> 工具层 Tools (API/函数调用) -> 决策层 Decision -> 行动层 Action -> 输出结果
| 技术层级 | 主流框架/工具 | 推荐指数 | 适用场景 |
|---|---|---|---|
| 编排框架 | LangChain / LangGraph | ⭐⭐⭐⭐⭐ | 复杂工作流编排 |
| 运行时 | AutoGen / AgentScope | ⭐⭐⭐⭐ | 多 Agent 协作 |
| 向量数据库 | Milvus / Chroma | ⭐⭐⭐⭐⭐ | RAG 知识库 |
| 工具生态 | OpenAI Function Calling | ⭐⭐⭐⭐⭐ | 结构化工具调用 |
| 记忆管理 | MemGPT | ⭐⭐⭐⭐ | 长对话场景 |
| 评估框架 | Ragas / TruLens | ⭐⭐⭐⭐ | 生产环境监控 |
agent-framework/
├── core/
│ ├── agent.py # Agent 核心类
│ ├── memory.py # 记忆管理模块
│ ├── planner.py # 任务规划器
│ └── tools.py # 工具注册器
├── memory/
│ ├── short_term.py # 短期记忆(Redis)
│ ├── long_term.py # 长期记忆(向量 DB)
│ └── semantic.py # 语义记忆检索
├── tools/
│ ├── base.py # 工具基类
│ ├── registry.py # 工具注册中心
│ └── builtin/ # 内置工具
├── evaluators/
│ ├── cost.py # 成本评估
│ └── performance.py# 性能评估
└── utils/
├── logger.py # 日志系统
└── retry.py # 重试机制
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import asyncio
import json
class AgentState(Enum):
"""Agent 状态枚举"""
IDLE = "idle" # 空闲
THINKING = "thinking" # 思考中
ACTING = "acting" # 执行中
WAITING = "waiting" # 等待外部输入
ERROR = "error" # 错误状态
@dataclass
class Message:
"""消息数据结构"""
role: str # user / assistant / system / tool
content: str # 消息内容
tool_calls: Optional[List[Dict]] = None # 工具调用
timestamp: float = None # 时间戳
metadata: Dict[str, Any] = None # 元数据
class BaseAgent:
"""生产级 Agent 基类"""
def __init__(
self,
llm_client: ,
memory_manager: = ,
tool_registry: = ,
max_iterations: = ,
verbose: =
):
.llm = llm_client
.memory = memory_manager
.tools = tool_registry
.max_iterations = max_iterations
.verbose = verbose
.state = AgentState.IDLE
.conversation_history: [Message] = []
() -> :
.conversation_history.append(Message(role=, content=user_input))
.state = AgentState.THINKING
iteration (.max_iterations):
._log()
context = ._retrieve_context(user_input)
prompt = ._build_prompt(context)
response = ._llm_inference(prompt)
response.tool_calls:
.state = AgentState.ACTING
tool_results = ._execute_tools(response.tool_calls)
result tool_results:
.conversation_history.append(
Message(role=, content=result[], tool_name=result[])
)
:
.state = AgentState.IDLE
.conversation_history.append(Message(role=, content=response.content))
response.content
() -> :
.memory:
.memory.search(query, top_k=)
() -> :
system_prompt =
system_prompt
() -> :
messages = [
{: , : prompt},
*[{: m.role, : m.content} m .conversation_history]
]
response = .llm.chat.completions.create(
model=,
messages=messages,
tools=.tools.get_tool_schemas() .tools ,
temperature=
)
response.choices[].message
() -> []:
results = []
call tool_calls:
tool_name = call[][]
arguments = json.loads(call[][])
._log()
:
tool = .tools.get_tool(tool_name)
result = tool.execute(**arguments)
results.append({
: tool_name,
: json.dumps(result, ensure_ascii=)
})
Exception e:
results.append({
: tool_name,
: json.dumps({: (e)})
})
results
():
.verbose:
()
from abc import ABC, abstractmethod
from typing import List, Dict, Any
import redis
import numpy as np
from datetime import datetime, timedelta
class MemoryBackend(ABC):
"""记忆后端抽象基类"""
@abstractmethod
async def add(self, content: str, metadata: Dict = None) -> str:
"""添加记忆"""
pass
@abstractmethod
async def search(self, query: str, top_k: int = 5) -> List[Dict]:
"""搜索记忆"""
pass
class ShortTermMemory(MemoryBackend):
"""短期记忆:基于 Redis 的会话记忆"""
def __init__(self, redis_url: str = "redis://localhost:6379", ttl: int = 3600):
self.client = redis.from_url(redis_url)
self.ttl = ttl # 记忆过期时间(秒)
async () -> :
memory_id =
memory_data = {
: content,
: metadata {},
: datetime.now().isoformat()
}
.client.setex(memory_id, .ttl, json.dumps(memory_data, ensure_ascii=))
memory_id
() -> []:
keys = .client.keys()
memories = []
key keys[-top_k:]:
data = json.loads(.client.get(key))
memories.append(data)
memories
():
():
.embedding_model = embedding_model
.vector_db = vector_db
() -> :
embedding = .embedding_model.embed(content)
memory_id = .vector_db.insert(vector=embedding, payload={: content, : metadata {}})
memory_id
() -> []:
query_embedding = .embedding_model.embed(query)
results = .vector_db.search(vector=query_embedding, top_k=top_k, score_threshold=)
results
:
():
.short_term = short_term
.long_term = long_term
():
.short_term.add(content, metadata)
importance > :
.long_term.add(content, metadata)
() -> []:
short_results = .short_term.search(query, top_k // )
long_results = .long_term.search(query, top_k // )
all_results = short_results + long_results
all_results[:top_k]
ReAct(Reasoning + Acting)是目前 Agent 最主流的推理范式。
代码实现:
class ReActAgent(BaseAgent):
"""基于 ReAct 范式的 Agent"""
def _build_react_prompt(self, question: str) -> str:
return f"""使用以下格式回答问题:
Question: {question}
Thought: 你应该思考做什么
Action: 要采取的操作,应该是 [{self.tools.get_tool_names()}] 中的一个
Observation: 操作的结果
... (这个 Thought/Action/Observation 可以重复 N 次)
Thought: 我现在知道最终答案了
Answer: 对原始问题的最终答案
开始!
Question: {question}
Thought:"""
async def run(self, user_input: str) -> str:
"""ReAct 循环执行"""
prompt = self._build_react_prompt(user_input)
for _ in range(self.max_iterations):
# LLM 生成下一步动作
response = await self.llm.generate(prompt)
# 解析响应
thought, action, action_input = self._parse_react_response(response)
if not action:
# 没有动作,说明已有答案
return thought
# 执行动作
observation = await self._execute_action(action, action_input)
# 更新提示词
prompt += f"\n{response}\nObservation: {observation}\nThought:"
return "无法在指定迭代次数内完成"
() -> :
lines = response.strip().split()
thought =
action =
action_input =
line lines:
line.startswith():
thought = line.replace(, ).strip()
line.startswith():
action = line.replace(, ).strip()
line.startswith():
action_input = line.replace(, ).strip()
thought, action, action_input
from typing import Callable, Dict, Any, List
import inspect
from pydantic import BaseModel, Field
class Tool(BaseModel):
"""工具基类"""
name: str = Field(description="工具名称")
description: str = Field(description="工具功能描述")
parameters: Dict[str, Any] = Field(default_factory=dict, description="参数 schema")
function: Callable = Field(description="工具执行函数")
class Config:
arbitrary_types_allowed = True
async def execute(self, **kwargs) -> Any:
"""执行工具"""
return await self.function(**kwargs)
def to_openai_schema(self) -> Dict:
"""转换为 OpenAI 函数调用格式"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": .parameters
}
}
():
() -> Tool:
sig = inspect.signature(func)
parameters = {}
param_name, param sig.parameters.items():
param_type = param.annotation param.annotation != inspect.Parameter.empty
parameters[param_name] = {
: param_type.__name__ (param_type, ) ,
:
}
Tool(
name=name func.__name__,
description=description func.__doc__ ,
parameters={
: ,
: parameters,
: [p p sig.parameters p.default == inspect.Parameter.empty]
},
function=func
)
decorator
():
():
:
():
._tools: [, Tool] = {}
():
._tools[tool.name] = tool
() -> Tool:
._tools.get(name)
() -> []:
(._tools.keys())
() -> :
descriptions = []
tool ._tools.values():
descriptions.append()
.join(descriptions)
() -> []:
[tool.to_openai_schema() tool ._tools.values()]
class TaskPlanner:
"""任务分解与规划器"""
def __init__(self, llm_client: Any):
self.llm = llm_client
async def plan(self, goal: str) -> List[Dict]:
"""将目标分解为子任务列表
Returns:
[ {"task": "任务描述", "order": 1, "dependencies": []}, ... ]
"""
prompt = f"""将以下目标分解为具体的、可执行的子任务列表。
目标:{goal}
请按以下格式输出:
1. [任务描述]
2. [任务描述]
...
要求:
- 每个任务应该独立且可执行
- 任务之间应该有逻辑顺序
- 尽量细化到可以直接执行"""
response = await self.llm.generate(prompt)
# 解析任务列表
tasks = []
for line in response.strip().split('\n'):
if line.strip():
# 提取任务描述
task_desc = line.split('.', 1)[1].strip() if '.' in line else line.strip()
tasks.append({"task": task_desc, "order": len(tasks) + 1, "status": "pending"})
return tasks
async def execute_plan(self, agent: BaseAgent, tasks: List[Dict]) -> Dict:
completed = []
failed = []
task tasks:
()
:
result = agent.run(task[])
task[] =
task[] = result
completed.append(task)
Exception e:
task[] =
task[] = (e)
failed.append(task)
{
: (failed) == ,
: completed,
: failed,
: completed[-][] completed
}
智能客服是 AI Agent 最典型的应用场景。我们来实现一个政务大厅智能客服,具备:
import asyncio
from typing import Optional
class CustomerServiceAgent(ReActAgent):
"""智能客服 Agent"""
def __init__(self, knowledge_base, ticket_system, *args, **kwargs):
super().__init__(*args, **kwargs)
self.knowledge_base = knowledge_base
self.ticket_system = ticket_system
# 注册客服专用工具
self._register_customer_service_tools()
def _register_customer_service_tools(self):
"""注册客服工具"""
@self.tools.register
@tool(name="search_policy", description="搜索政策信息")
async def search_policy(query: str):
"""从知识库搜索相关政策
Args:
query: 政策关键词
"""
results = await self.knowledge_base.search(query, top_k=3)
return "\n".join([r['content'] for r in results])
@self.tools.register
@tool(name="get_process_guide", description="获取办事流程")
async def get_process_guide(service_type: ):
guide = .knowledge_base.get_guide(service_type)
guide
():
ticket_id = .ticket_system.create(
category=category,
description=description,
priority=priority
)
():
queue_number = .ticket_system.human_transfer(reason)
() -> :
intent = ._detect_intent(user_input)
system_prompt = ._get_system_prompt(intent)
.run(user_input)
() -> :
intent_prompt =
response = .llm.generate(intent_prompt)
response.strip()
() -> :
prompts = {
: ,
: ,
: ,
:
}
prompts.get(intent, )
():
openai AsyncOpenAI
llm_client = AsyncOpenAI(api_key=)
knowledge_base = MockKnowledgeBase()
ticket_system = MockTicketSystem()
agent = CustomerServiceAgent(
llm_client=llm_client,
memory_manager=HybridMemory(
short_term=ShortTermMemory(),
long_term=LongTermMemory()
),
tool_registry=ToolRegistry(),
knowledge_base=knowledge_base,
ticket_system=ticket_system
)
response = agent.handle_customer_query()
(response)
__name__ == :
asyncio.run(main())
| 指标 | 传统规则客服 | 基础 Chatbot | 智能 Agent |
|---|---|---|---|
| 问题解决率 | 35% | 60% | 85% |
| 平均响应时间 | 5 分钟 | 2 秒 | 3 秒 |
| 多轮对话能力 | ❌ | ⚠️ | ✅ |
| 工具调用能力 | ❌ | ❌ | ✅ |
| 学习进化能力 | ❌ | ⚠️ | ✅ |
| 运营成本 | 高 | 低 | 中 |
AI Agent 的主要成本来源:
| 优化项 | 策略 | 预期节省 |
|---|---|---|
| Prompt 优化 | 精简系统提示词 | 20-30% |
| 模型选择 | 混合使用 GPT-4/GPT-3.5 | 40-50% |
| 缓存策略 | 重复问题命中缓存 | 30-40% |
| Token 限制 | 动态裁剪上下文 | 15-20% |
| 批量处理 | 合并多个请求 | 10-15% |
智能缓存实现:
import hashlib
from functools import wraps
import time
def smart_cache(ttl: int = 3600):
"""智能缓存装饰器"""
cache = {}
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 生成缓存键
key = hashlib.md5(f"{func.__name__}{args}{kwargs}".encode()).hexdigest()
# 检查缓存
if key in cache:
cache_data = cache[key]
if time.time() - cache_data['timestamp'] < ttl:
print("缓存命中!")
return cache_data['result']
# 执行函数
result = await func(*args, **kwargs)
# 存储缓存
cache[key] = {'result': result, 'timestamp': time.time()}
return result
return wrapper
return decorator
# 使用示例
class OptimizedAgent(BaseAgent):
@smart_cache(ttl=1800)
async ():
()._llm_inference(prompt)
AI Agent 正在从"玩具"向"生产力工具"转变。2026 年,掌握 Agent 开发将成为 AI 工程师的核心竞争力。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online