FastAPI 打造基于 LLM 的 Web 接口实战教程
随着大语言模型(LLM)的蓬勃发展,利用 FastAPI 构建高性能、异步的 AI 应用接口已成为开发者的首选方案。本文将围绕 FastAPI、OpenAI API 以及 SQLModel 和 FastCRUD,创建一个个性化的电子邮件写作助手,展示如何结合这些技术来构建强大的应用程序。
一、安装环境
首先,我们创建一个项目文件夹并进入其中:
mkdir email-assistant-api
cd email-assistant-api
推荐使用 Poetry 管理 Python 依赖。如果尚未安装 Poetry,请先执行:
pip install poetry
在 email-assistant-api 文件夹中初始化项目:
poetry init
按提示输入项目名称等信息,默认选项即可。确认后会生成 pyproject.toml 文件,这是 Poetry 管理依赖的核心配置文件。
接下来添加核心依赖项:
poetry add fastapi fastcrud sqlmodel openai aiosqlite greenlet python-jose bcrypt uvicorn[standard]
此时会生成 poetry.lock 文件,锁定已安装包的具体版本以确保环境一致性。
二、项目结构
一个标准的 FastAPI 应用程序通常包含模型、架构和端点三个主要部分。针对本项目的规模,推荐如下目录结构:
email_assistant_api/
├── app/
│ ├── __init__.py
│ ├── main.py # 应用入口及生命周期管理
│ ├── routes.py # API 路由定义与端点逻辑
│ ├── database.py # 数据库连接与会话管理
│ ├── models.py # SQLModel 数据模型定义
│ ├── crud.py # 使用 FastCRUD 实现的 CRUD 操作
│ ├── schemas.py # 请求与响应数据验证模式
│ └── .env # 环境变量配置
├── pyproject.toml # 项目配置与依赖
├── README.md # 项目文档
└── .gitignore # 版本控制忽略文件
- models.py:定义数据库表的抽象模型。
- schemas.py:用于 Pydantic/SQLModel 的数据验证与序列化。
- routes.py:定义具体的 API 路由。
- database.py:处理数据库连接池与 Session 管理。
- crud.py:封装数据库增删改查逻辑。
- .env:存储敏感信息如 API Key。
- main.py:FastAPI 实例化及中间件配置。
注意:此结构适用于中小型应用。对于大型项目,建议参考更复杂的模板,例如 FastAPI Boilerplate。
2.1 数据库建模
我们需要两个核心模型:User(用户)和 EmailLog(邮件日志)。
- User:包含用户名、姓名、邮箱及哈希后的密码。
- EmailLog:记录用户的输入、生成的邮件内容、上下文信息及时间戳。一个用户可以拥有多条日志。
models.py 代码示例
# app/models.py
from sqlmodel import SQLModel, Field
from typing import Optional
from datetime import datetime
class User(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(..., min_length=2, max_length=30)
username: str = Field(..., min_length=2, max_length=20)
email: str
hashed_password: str
class EmailLog(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
user_id: int = Field(foreign_key="user.id")
user_input: str
reply_to: Optional[str] = Field(default=None)
context: Optional[str] = Field(default=None)
length: Optional[int] = Field(default=None)
tone: str
generated_email: str
timestamp: datetime = Field(default_factory=datetime.now)
为了与数据库交互,我们在 crud.py 中为每个模型实例化 FastCRUD:
# app/crud.py
from fastcrud import FastCRUD
from .models import User, EmailLog
crud_user = FastCRUD(User)
crud_email_log = FastCRUD(EmailLog)
2.2 创建 Schemas
Schemas 用于验证请求数据和序列化响应数据。
schemas.py 代码示例
# app/schemas.py
from datetime import datetime
from typing import Optional
from sqlmodel import SQLModel, Field
from zoneinfo import ZoneInfo
UTC = ZoneInfo("UTC")
# ------- User Schemas -------
class UserCreate(SQLModel):
name: str
username: str
email: str
password: str
class UserRead(SQLModel):
id: int
name: str
username: str
email: str
class UserCreateInternal(SQLModel):
name: str
username: str
email: str
hashed_password: str
# ------- Email Request/Response -------
class EmailRequest(SQLModel):
user_input: str
reply_to: Optional[str] = None
context: Optional[str] = None
length: int = 120
tone: str = "formal"
class EmailResponse(SQLModel):
generated_email: str
# ------- Email Log Schemas -------
class EmailLogCreate(SQLModel):
user_id: int
user_input: str
reply_to: Optional[str] = None
context: Optional[str] = None
length: Optional[int] = None
tone: Optional[str] = None
generated_email: str
timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC))
class EmailLogRead(SQLModel):
user_id: int
user_input: str
reply_to: Optional[str]
context: Optional[str]
length: Optional[int]
tone: Optional[str]
generated_email: str
timestamp: datetime
2.3 初始化应用程序与数据库
尽管有了模型和 Schema,我们还需要建立数据库连接和应用实例。
database.py 代码示例
# app/database.py
from sqlmodel import SQLModel, create_engine, AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "sqlite+aiosqlite:///./emailassistant.db"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session_maker = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def create_db_and_tables():
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
async def get_session() -> AsyncSession:
async with async_session_maker() as session:
yield session
main.py 代码示例
# app/main.py
from fastapi import FastAPI
from .database import create_db_and_tables
async def lifespan(app: FastAPI):
await create_db_and_tables()
yield
app = FastAPI(lifespan=lifespan)
运行以下命令启动开发服务器:
poetry run uvicorn app.main:app --reload
访问 http://127.0.0.1:8000/docs 即可查看自动生成的 Swagger UI 文档。
三、实现核心业务逻辑
3.1 集成 OpenAI API
首先,在 .env 文件中配置 OpenAI API Key,并确保该文件已被 .gitignore 忽略。
# app/.env
OPENAI_API_KEY="your_actual_api_key_here"
在 routes.py 中加载环境变量并初始化客户端:
# app/routes.py
import os
from starlette.config import Config
from openai import OpenAI
current_file_dir = os.path.dirname(os.path.realpath(__file__))
env_path = os.path.join(current_file_dir, ".env")
config = Config(env_path)
OPENAI_API_KEY = config("OPENAI_API_KEY")
open_ai_client = OpenAI(api_key=OPENAI_API_KEY)
3.2 创建邮件生成端点
我们将创建一个系统提示符以规范输出格式,并将用户输入传递给 OpenAI 客户端。
# app/routes.py (部分)
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from .schemas import EmailRequest, EmailResponse
from .database import get_session
from .crud import crud_email_logs
email_router = APIRouter()
@email_router.post("/", response_model=EmailResponse)
async def generate_email(
request: EmailRequest,
db: AsyncSession = Depends(get_session),
):
try:
system_prompt = """
You are a helpful email assistant.
You get a prompt to write an email,
you reply with the email and nothing else.
"""
prompt = f"""
Write an email based on the following input:
- User Input: {request.user_input}
- Reply To: {request.reply_to if request.reply_to else 'N/A'}
- Context: {request.context if request.context else 'N/A'}
- Length: {request.length} characters
- Tone: {request.tone}
"""
response = await open_ai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt},
],
max_tokens=request.length
)
generated_email = response.choices[0].message.content.strip()
log_entry = EmailLogCreate(
user_id=1, # 暂时硬编码,后续接入认证
user_input=request.user_input,
reply_to=request.reply_to,
context=request.context,
length=request.length,
tone=request.tone,
generated_email=generated_email,
)
await crud_email_logs.create(db, log_entry)
return EmailResponse(generated_email=generated_email)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
在 main.py 中包含路由器:
# app/main.py
from .routes import email_router
app.include_router(email_router, prefix="/generate", tags=["Email"])
四、用户功能、身份验证和安全性
生产环境必须包含用户认证。我们将使用 JWT (JSON Web Token) 和 OAuth2 Password Bearer。
4.1 安全工具类
创建 helper.py 处理密码哈希和令牌生成。
# app/helper.py
import os
from datetime import UTC, datetime, timedelta
from typing import Any, Annotated
import bcrypt
from jose import JWTError, jwt
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.config import Config
from .database import get_session
from .crud import crud_users
current_file_dir = os.path.dirname(os.path.realpath(__file__))
env_path = os.path.join(current_file_dir, ".env")
config = Config(env_path)
SECRET_KEY = config("SECRET_KEY")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/login")
class Token(SQLModel):
access_token: str
token_type: str
class TokenData(SQLModel):
username_or_email: str
async def verify_password(plain_password: str, hashed_password: str) -> bool:
return bcrypt.checkpw(plain_password.encode(), hashed_password.encode())
def get_password_hash(password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
async def create_access_token(data: dict[str, Any], expires_delta: timedelta | None = None) -> str:
to_encode = data.copy()
if expires_delta:
expire = datetime.now(UTC).replace(tzinfo=None) + expires_delta
else:
expire = datetime.now(UTC).replace(tzinfo=None) + timedelta(minutes=15)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")
async def verify_token(token: str, db: AsyncSession) -> TokenData | None:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
username_or_email: str = payload.get("sub")
if username_or_email is None:
return None
return TokenData(username_or_email=username_or_email)
except JWTError:
return None
async def authenticate_user(username_or_email: str, password: str, db: AsyncSession):
if "@" in username_or_email:
db_user = await crud_users.get(db=db, email=username_or_email, is_deleted=False)
else:
db_user = await crud_users.get(db=db, username=username_or_email, is_deleted=False)
if not db_user:
return False
elif not await verify_password(password, db_user["hashed_password"]):
return False
return db_user
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: Annotated[AsyncSession, Depends(get_session)]
) -> dict[str, Any] | None:
token_data = await verify_token(token, db)
if token_data is None:
raise HTTPException(status_code=401, detail="User not authenticated.")
if "@" in token_data.username_or_email:
user = await crud_users.get(db=db, email=token_data.username_or_email, is_deleted=False)
else:
user = await crud_users.get(db=db, username=token_data.username_or_email, is_deleted=False)
if user:
return user
raise HTTPException(status_code=401, detail="User not authenticated.")
4.2 注册与登录端点
# app/routes.py (User Router)
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from .schemas import UserCreate, UserRead
from .helper import (
get_password_hash,
authenticate_user,
create_access_token,
get_current_user,
Token
)
user_router = APIRouter()
@user_router.post("/register", response_model=UserRead)
async def register_user(
user: UserCreate,
db: AsyncSession = Depends(get_session)
):
hashed_password = get_password_hash(user.password)
user_data = user.dict()
user_data["hashed_password"] = hashed_password
del user_data["password"]
new_user = await crud_users.create(
db,
object=UserCreateInternal(**user_data)
)
return new_user
@user_router.post("/login", response_model=Token)
async def login_user(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: AsyncSession = Depends(get_session)
):
user = await authenticate_user(
username_or_email=form_data.username,
password=form_data.password,
db=db
)
if not user:
raise HTTPException(status_code=400, detail="Invalid credentials")
access_token_expires = timedelta(minutes=30)
access_token = await create_access_token(
data={"sub": user["username"]},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
4.3 保护端点
将 get_current_user 依赖注入到需要认证的端点中。
# app/routes.py (Protected Email Endpoint)
@email_router.post("/", response_model=EmailResponse)
async def generate_email(
request: EmailRequest,
db: AsyncSession = Depends(get_session),
current_user: dict = Depends(get_current_user)
):
# ... (Prompt logic same as before)
# ... (Call OpenAI)
# ... (Save log with current_user['id'])
pass
五、测试与部署建议
为了确保应用的健壮性,建议在本地完成单元测试,并进行生产环境部署。
5.1 接口测试
可以使用 curl 或 Postman 进行测试。获取 Token 后,在 Header 中添加 Authorization: Bearer <token>。
curl -X POST http://127.0.0.1:8000/generate/ \n-H "Authorization: Bearer YOUR_TOKEN" \n-H "Content-Type: application/json" \n-d '{"user_input": "Hello", "tone": "friendly"}'
5.2 生产部署
在生产环境中,建议使用多进程 Worker 模式运行 Uvicorn,并配合 Gunicorn 管理进程。
pip install gunicorn
启动命令:
gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
同时,务必确保 .env 文件中的密钥不上传至代码仓库,并使用环境变量管理敏感配置。
5.3 安全最佳实践
- 速率限制:防止 API 滥用,可使用
slowapi库对端点进行限流。 - CORS 配置:在
main.py中正确配置跨域资源共享策略。 - HTTPS:生产环境必须启用 HTTPS 加密传输。
六、总结
本文详细介绍了如何使用 FastAPI 结合 OpenAI API 构建一个具备用户认证功能的邮件生成服务。通过 SQLModel 进行数据建模,FastCRUD 简化数据库操作,并利用 JWT 保障接口安全。该架构具备良好的扩展性,可轻松迁移至其他 LLM 服务或增加 RAG(检索增强生成)能力。开发者可根据实际需求进一步丰富功能,如添加多模态支持或知识库检索。


