AI Agent 搭建完整流程指南
1. 环境准备
1.1 安装 Python
Windows
- 访问 Python 官网
- 下载最新的 Python 3.10 或 3.11 版本
- 运行安装程序,勾选 "Add Python to PATH"
- 点击 "Install Now"
验证安装
python --version
pip --version
1.2 创建虚拟环境
python -m venv ai-agent-env
# Windows:
ai-agent-env\Scripts\activate
# Linux / macOS:
source ai-agent-env/bin/activate
pip install --upgrade pip
1.3 安装必要工具
git --version
2. 核心组件安装
2.1 安装 LangChain
pip install langchain langchain-core langchain-community langchain-openai
2.2 安装模型相关库
pip install openai
pip install transformers torch torchvision torchaudio
2.3 安装文档处理库
pip install pypdf pdfplumber
pip install pytesseract pillow
pip install python-docx
pip install beautifulsoup4
2.4 安装向量数据库
pip install chromadb
pip install pinecone-client
# faiss 二选一:CPU 环境安装 faiss-cpu;有 NVIDIA GPU 且已配置 CUDA 时安装 faiss-gpu(两者不可同时安装)
pip install faiss-cpu
# pip install faiss-gpu
2.5 安装 Web 框架
pip install fastapi uvicorn
pip install flask
3. 项目结构设计
ai-agent/
├── app/
│ ├── main.py
│ ├── config/
│ │ └── settings.py
│ ├── components/
│ │ ├── document_loader.py
│ │ ├── text_splitter.py
│ │ ├── vector_store.py
│ │ └── llm_handler.py
│ ├── api/
│ │ ├── endpoints/
│ │ │ ├── docs.py
│ │ │ └── chat.py
│ │ └── routers.py
│ └── services/
│ ├── document_service.py
│ └── chat_service.py
├── data/
│ ├── raw/
│ └── processed/
├── models/
├── tests/
├── requirements.txt
├── .env
├── Dockerfile
└── README.md
4. 配置文件设置
4.1 创建 .env 文件
touch .env
4.2 配置环境变量
OPENAI_API_KEY=your-openai-api-key
OPENAI_MODEL_NAME=gpt-3.5-turbo
CHROMA_DB_PATH=./data/chroma_db
PINECONE_API_KEY=your-pinecone-api-key
PINECONE_ENVIRONMENT=your-pinecone-environment
PINECONE_INDEX_NAME=your-index-name
APP_NAME=AI Agent
APP_VERSION=1.0.0
DEBUG=True
4.3 创建配置加载文件
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
    """Application configuration loaded from environment variables / the .env file.

    Field names map case-insensitively onto the variables defined in .env
    (e.g. OPENAI_API_KEY -> openai_api_key).
    """

    # OpenAI (required: no default, so startup fails fast if the key is missing)
    openai_api_key: str
    openai_model_name: str = "gpt-3.5-turbo"

    # Vector stores
    chroma_db_path: str = "./data/chroma_db"
    pinecone_api_key: Optional[str] = None
    pinecone_environment: Optional[str] = None
    pinecone_index_name: Optional[str] = None

    # Application metadata
    app_name: str = "AI Agent"
    app_version: str = "1.0.0"
    debug: bool = True

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


# Single shared settings instance imported by the rest of the app.
settings = Settings()
5. 核心组件开发
5.1 文档加载器
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader, BSHTMLLoader
from langchain_community.document_loaders import UnstructuredFileLoader
from typing import List
from langchain_core.documents import Document
class DocumentLoader:
    """Load files of various formats into LangChain ``Document`` objects."""

    @staticmethod
    def load_document(file_path: str) -> List[Document]:
        """Load a single document, selecting a loader by file extension.

        Falls back to ``UnstructuredFileLoader`` for unrecognized extensions.
        """
        if file_path.endswith('.pdf'):
            loader = PyPDFLoader(file_path)
        elif file_path.endswith('.docx'):
            loader = Docx2txtLoader(file_path)
        elif file_path.endswith('.txt'):
            loader = TextLoader(file_path, encoding='utf-8')
        elif file_path.endswith('.html'):
            loader = BSHTMLLoader(file_path)
        else:
            # Generic fallback that handles many other formats.
            loader = UnstructuredFileLoader(file_path)
        return loader.load()

    @staticmethod
    def load_documents(file_paths: List[str]) -> List[Document]:
        """Load several files and concatenate all resulting documents."""
        documents: List[Document] = []
        for file_path in file_paths:
            documents.extend(DocumentLoader.load_document(file_path))
        return documents
5.2 文本分割器
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from typing import List
class TextSplitter:
    """Split documents/text into overlapping chunks for embedding."""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        # Recursive splitter: tries the separators in order, preferring to
        # break on paragraph, then line, then word boundaries.
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", " ", ""],
        )

    def split_documents(self, documents: List[Document]) -> List[Document]:
        """Split documents into chunks, preserving metadata."""
        return self.splitter.split_documents(documents)

    def split_text(self, text: str) -> List[str]:
        """Split a raw string into chunks."""
        return self.splitter.split_text(text)
5.3 向量存储
from langchain_community.vectorstores import Chroma, Pinecone
from langchain_openai import OpenAIEmbeddings
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_core.documents import Document
from typing import List, Optional
from app.config.settings import settings
class VectorStoreManager:
    """Create and load Chroma / Pinecone vector stores backed by OpenAI embeddings."""

    def __init__(self):
        self.embeddings = OpenAIEmbeddings(
            api_key=settings.openai_api_key
        )

    def create_chroma_vector_store(self, documents: List[Document], persist_directory: Optional[str] = None) -> Chroma:
        """Embed *documents* into a new (or existing) Chroma store and persist it to disk."""
        persist_dir = persist_directory or settings.chroma_db_path
        vector_store = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings,
            persist_directory=persist_dir,
        )
        vector_store.persist()
        return vector_store

    def load_chroma_vector_store(self, persist_directory: Optional[str] = None) -> Chroma:
        """Open an already-persisted Chroma store without re-embedding anything."""
        persist_dir = persist_directory or settings.chroma_db_path
        return Chroma(
            embedding_function=self.embeddings,
            persist_directory=persist_dir,
        )

    def create_pinecone_vector_store(self, documents: List[Document]) -> Pinecone:
        """Embed *documents* into a Pinecone index, creating the index if needed."""
        import pinecone

        pinecone.init(
            api_key=settings.pinecone_api_key,
            environment=settings.pinecone_environment,
        )
        index_name = settings.pinecone_index_name
        if index_name not in pinecone.list_indexes():
            # 1536 = output dimension of OpenAI's text-embedding-ada-002;
            # adjust if you switch embedding models.
            pinecone.create_index(
                name=index_name,
                dimension=1536,
                metric="cosine",
            )
        vector_store = Pinecone.from_documents(
            documents=documents,
            embedding=self.embeddings,
            index_name=index_name,
        )
        return vector_store

    def load_pinecone_vector_store(self) -> Pinecone:
        """Connect to an existing Pinecone index for retrieval."""
        import pinecone

        pinecone.init(
            api_key=settings.pinecone_api_key,
            environment=settings.pinecone_environment,
        )
        return Pinecone.from_existing_index(
            index_name=settings.pinecone_index_name,
            embedding=self.embeddings,
        )
5.4 LLM 处理器
from langchain_openai import ChatOpenAI
from langchain_community.chat_models import ChatOllama
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from app.config.settings import settings
class LLMHandler:
    """Thin wrapper around a ChatOpenAI model for direct and templated prompting."""

    def __init__(self):
        # Low temperature keeps answers deterministic for retrieval QA.
        self.llm = ChatOpenAI(
            api_key=settings.openai_api_key,
            model_name=settings.openai_model_name,
            temperature=0.1,
        )

    # NOTE: the original annotated system_prompt with `Optional` without
    # importing it; `str | None` (Python 3.10+) avoids the extra import.
    def generate_response(self, prompt: str, system_prompt: "str | None" = None) -> str:
        """Send *prompt* (optionally preceded by a system message) and return the reply text."""
        messages = []
        if system_prompt:
            messages.append(SystemMessage(content=system_prompt))
        messages.append(HumanMessage(content=prompt))
        response = self.llm.invoke(messages)
        return response.content

    def generate_with_template(self, template: str, input_variables: dict) -> str:
        """Render *template* with *input_variables*, run it through the LLM, and return plain text."""
        prompt_template = ChatPromptTemplate.from_template(template)
        chain = prompt_template | self.llm | StrOutputParser()
        return chain.invoke(input_variables)
6. 业务逻辑服务
6.1 文档处理服务
from app.components.document_loader import DocumentLoader
from app.components.text_splitter import TextSplitter
from app.components.vector_store import VectorStoreManager
from langchain_core.documents import Document
from typing import List
import os
class DocumentService:
    """High-level pipeline: load -> split -> embed/store documents."""

    def __init__(self):
        self.loader = DocumentLoader()
        self.splitter = TextSplitter()
        self.vector_store_manager = VectorStoreManager()

    def process_and_store_document(self, file_path: str) -> bool:
        """Process a single file end-to-end; returns True on success, False on any error."""
        try:
            documents = self.loader.load_document(file_path)
            split_docs = self.splitter.split_documents(documents)
            self.vector_store_manager.create_chroma_vector_store(split_docs)
            return True
        except Exception as e:
            # Best-effort: report and continue so one bad file doesn't abort a batch.
            print(f"处理文档时出错:{e}")
            return False

    def process_and_store_documents(self, file_paths: List[str]) -> dict:
        """Process many files; returns {"success": [...], "failed": [...]} of paths."""
        results = {"success": [], "failed": []}
        for file_path in file_paths:
            if self.process_and_store_document(file_path):
                results["success"].append(file_path)
            else:
                results["failed"].append(file_path)
        return results

    def add_folder_documents(self, folder_path: str) -> dict:
        """Recursively collect supported files under *folder_path* and process them."""
        supported_extensions = ('.pdf', '.docx', '.txt', '.html')
        file_paths = []
        for root, _dirs, files in os.walk(folder_path):
            for file in files:
                # str.endswith accepts a tuple, so one call checks all extensions.
                if file.endswith(supported_extensions):
                    file_paths.append(os.path.join(root, file))
        return self.process_and_store_documents(file_paths)
6.2 聊天服务
from app.components.vector_store import VectorStoreManager
from app.components.llm_handler import LLMHandler
from langchain.chains import RetrievalQA
from langchain_core.prompts import ChatPromptTemplate
class ChatService:
    """Retrieval-augmented QA over the persisted Chroma store."""

    def __init__(self):
        self.vector_store_manager = VectorStoreManager()
        self.llm_handler = LLMHandler()
        self.vector_store = self.vector_store_manager.load_chroma_vector_store()
        # Top-5 similarity search feeds the "stuff" chain below.
        self.retriever = self.vector_store.as_retriever(
            search_type="similarity",
            search_kwargs={"k": 5},
        )
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm_handler.llm,
            chain_type="stuff",
            retriever=self.retriever,
            return_source_documents=True,
        )

    def get_answer(self, question: str) -> dict:
        """Answer *question*; returns {"answer": str, "sources": [{"content", "metadata"}, ...]}."""
        result = self.qa_chain.invoke({"query": question})
        sources = []
        for doc in result["source_documents"]:
            # Truncate long excerpts so API responses stay small.
            content = doc.page_content
            sources.append({
                "content": content[:200] + "..." if len(content) > 200 else content,
                "metadata": doc.metadata,
            })
        return {"answer": result["result"], "sources": sources}

    def chat_with_context(self, question: str, chat_history: "list | None" = None) -> dict:
        """Answer *question*, prefixing prior Q/A turns (dicts with "question"/"answer") as context."""
        if chat_history:
            context = "\n".join(
                f"Q: {msg['question']}\nA: {msg['answer']}" for msg in chat_history
            )
            prompt = f"对话历史:\n{context}\n\n当前问题:{question}"
        else:
            prompt = question
        return self.get_answer(prompt)
7. API 开发
7.1 主应用入口
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.routers import router
from app.config.settings import settings
# Application factory section: FastAPI instance, CORS, and routing.
app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    debug=settings.debug,
)

# Wide-open CORS is convenient for local development; tighten allow_origins
# to the real frontend origin before deploying.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# All business endpoints live under /api (see app/api/routers.py).
app.include_router(router, prefix="/api")


@app.get("/")
def root():
    """Landing endpoint with app name and version."""
    return {"message": f"Welcome to {settings.app_name}", "version": settings.app_version}


@app.get("/health")
def health_check():
    """Liveness probe for load balancers / containers."""
    return {"status": "healthy"}
7.2 路由注册
from fastapi import APIRouter
from app.api.endpoints import docs, chat
# Top-level API router: aggregates the feature routers so main.py only
# mounts one object under /api.
router = APIRouter()
# Document management endpoints -> /api/docs/...
router.include_router(docs.router, prefix="/docs", tags=["documents"])
# Chat / QA endpoints -> /api/chat/...
router.include_router(chat.router, prefix="/chat", tags=["chat"])
7.3 文档 API 端点
from fastapi import APIRouter, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
from typing import List
import os
import shutil
from app.services.document_service import DocumentService
router = APIRouter()
document_service = DocumentService()

# Uploaded files are staged here before being embedded.
UPLOAD_DIR = "./data/raw"
os.makedirs(UPLOAD_DIR, exist_ok=True)


@router.post("/upload")
async def upload_document(file: UploadFile = File(...)):
    """Upload and process a single document."""
    try:
        file_path = os.path.join(UPLOAD_DIR, file.filename)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        success = document_service.process_and_store_document(file_path)
        if success:
            return JSONResponse(
                status_code=200,
                content={"message": f"文档 {file.filename} 上传并处理成功"},
            )
        else:
            raise HTTPException(status_code=500, detail=f"文档 {file.filename} 处理失败")
    except HTTPException:
        # Re-raise deliberate HTTP errors instead of wrapping them below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"上传文档时出错:{str(e)}")


@router.post("/upload-multiple")
async def upload_documents(files: List[UploadFile] = File(...)):
    """Upload and process several documents in one request (form field: "files")."""
    try:
        file_paths = []
        for file in files:
            file_path = os.path.join(UPLOAD_DIR, file.filename)
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
            file_paths.append(file_path)
        results = document_service.process_and_store_documents(file_paths)
        return JSONResponse(
            status_code=200,
            content={"message": "批量上传完成", "results": results},
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"上传文档时出错:{str(e)}")


@router.post("/add-folder")
async def add_folder(folder_path: str):
    """Index every supported document found under a server-side folder."""
    try:
        if not os.path.exists(folder_path):
            raise HTTPException(status_code=404, detail="文件夹不存在")
        results = document_service.add_folder_documents(folder_path)
        return JSONResponse(
            status_code=200,
            content={"message": "文件夹处理完成", "results": results},
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"处理文件夹时出错:{str(e)}")
7.4 聊天 API 端点
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from app.services.chat_service import ChatService
router = APIRouter()
chat_service = ChatService()


class ChatRequest(BaseModel):
    """Request body for /query.

    chat_history is a list of prior turns, each a dict with
    "question" and "answer" keys (see ChatService.chat_with_context).
    """

    question: str
    # Fix: the original `list = None` gives a non-optional field an invalid
    # None default; declare it explicitly optional instead.
    chat_history: "list | None" = None


@router.post("/query")
async def query_agent(request: ChatRequest):
    """Query the AI agent, optionally continuing a conversation."""
    try:
        result = chat_service.chat_with_context(request.question, request.chat_history)
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"查询时出错:{str(e)}")
8. 前端界面开发(可选)
8.1 创建前端项目
node --version
npm --version
npm create vite@latest ai-agent-frontend -- --template react
cd ai-agent-frontend
npm install
npm install axios react-dropzone
8.2 主要组件开发
App.jsx
import { useState } from 'react'
import './App.css'
import ChatInterface from './components/ChatInterface'
import DocumentUpload from './components/DocumentUpload'
function App() {
const [activeTab, setActiveTab] = useState('chat')
return (
<div className="app">
<header className="app-header">
<h1>AI Agent</h1>
<nav>
<button className={activeTab === 'chat' ? 'active' : ''} onClick={() => setActiveTab('chat')}>
聊天
</button>
<button className={activeTab === 'upload' ? 'active' : ''} onClick={() => setActiveTab('upload')}>
上传文档
</button>
{activeTab === 'chat' && }
{activeTab === 'upload' && }
)
}
ChatInterface.jsx
import { useState, useEffect } from 'react'
import axios from 'axios'
function ChatInterface() {
const [messages, setMessages] = useState([])
const [input, setInput] = useState('')
const [loading, setLoading] = useState(false)
const sendMessage = async () => {
if (!input.trim()) return
const userMessage = { role: 'user', content: input }
setMessages(prev => [...prev, userMessage])
setInput('')
setLoading(true)
try {
const response = await axios.post('http://localhost:8000/api/chat/query', {
question: input,
chat_history: messages.map(msg => ({
question: msg.role === 'user' ? msg.content : '',
answer: msg.role === 'assistant' ? msg. :
})).( msg.)
})
assistantMessage = { : , : response.. }
( [...prev, assistantMessage])
} (error) {
.(, error)
errorMessage = { : , : }
( [...prev, errorMessage])
} {
()
}
}
(
)
}
DocumentUpload.jsx
import { useCallback } from 'react'
import { useDropzone } from 'react-dropzone'
import axios from 'axios'
function DocumentUpload() {
const [uploading, setUploading] = useState(false)
const [result, setResult] = useState(null)
const onDrop = useCallback(async (acceptedFiles) => {
setUploading(true)
setResult(null)
const formData = new FormData()
acceptedFiles.forEach(file => {
formData.append('files', file)
})
try {
const response = await axios.post('http://localhost:8000/api/docs/upload-multiple', formData, {
headers: {
'Content-Type': 'multipart/form-data'
}
})
setResult(response.data)
} catch (error) {
console.error('上传文件失败:', error)
setResult({ error: '文件上传失败' })
} finally {
()
}
}, [])
{ getRootProps, getInputProps, isDragActive } = ({ onDrop })
(
)}
</div>
)
}
8.3 启动前端开发服务器
npm run dev
9. 测试与调试
9.1 运行后端服务
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
9.2 访问 API 文档
打开浏览器访问 http://localhost:8000/docs,查看自动生成的 API 文档。
9.3 测试 API 端点
使用 Postman 或 curl 测试 API 端点:
curl -X POST -H "Content-Type: application/json" -d '{"question": "你好"}' http://localhost:8000/api/chat/query
9.4 单元测试
pip install pytest
pytest tests/
10. 部署与监控
10.1 Docker 部署
Dockerfile
# Slim Python base keeps the image small; version matches the tutorial's 3.10 target.
FROM python:3.10-slim
WORKDIR /app
# Install system build dependencies (needed to compile some Python wheels),
# then clean the apt cache to keep the layer small.
RUN apt-get update && apt-get install -y \
build-essential \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy only the dependency manifest first so this layer is cached
# unless requirements.txt changes.
COPY requirements.txt .
# Install Python dependencies without a pip cache.
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application code.
COPY . .
# FastAPI/uvicorn listens on this port.
EXPOSE 8000
# Start the API server, binding to all interfaces inside the container.
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
构建和运行 Docker 镜像
docker build -t ai-agent .
docker run -d -p 8000:8000 --name ai-agent ai-agent
10.2 监控
安装 Prometheus 和 Grafana
docker run -d -p 9090:9090 -v /path/to/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus
docker run -d -p 3000:3000 grafana/grafana
配置 Prometheus
# Prometheus scrape configuration for the AI Agent service.
global:
  # Poll all targets every 15 seconds.
  scrape_interval: 15s

scrape_configs:
  - job_name: 'ai-agent'
    static_configs:
      # Container name + port of the API service (same Docker network).
      - targets: ['ai-agent:8000']
11. 优化与迭代
11.1 性能优化
- 模型优化:
- 使用模型量化
- 调整 chunk 大小和 overlap
- 使用更高效的嵌入模型
- 向量数据库优化:
- 调整索引参数
- 使用 GPU 加速(如果可用)
- 定期清理无用数据
- API 优化:
11.2 功能迭代
- 添加更多文档格式支持
- 实现多语言支持
- 添加文档摘要功能
- 实现文档问答功能
- 添加文档分类功能
- 实现文档搜索功能
12. 常见问题与解决方案
12.1 文档加载失败
- 检查文件格式是否支持
- 检查文件编码是否正确
- 检查文件是否损坏
12.2 向量存储连接失败
- 检查环境变量配置
- 检查数据库服务是否运行
- 检查网络连接
12.3 API 调用失败
- 检查 API 端点是否正确
- 检查请求格式是否正确
- 检查服务器日志
12.4 响应速度慢
- 调整 chunk 大小
- 调整检索参数
- 考虑使用更强大的模型或硬件
13. 资源推荐
13.1 学习资源
13.2 开源项目
13.3 工具
- Postman:API 测试工具
- Docker:容器化工具
- Prometheus + Grafana:监控工具
- VS Code:代码编辑器
14. 总结
本教程提供了一个完整的 AI Agent 搭建流程,从环境准备到部署监控,涵盖了所有必要的步骤。您可以根据自己的需求和资源情况,选择适合的组件和配置。
AI Agent 的搭建是一个持续优化的过程,您可以根据实际使用情况不断调整和改进。