FastAPI 完全指南:现代 Python Web 开发的终极选择

FastAPI 完全指南:现代 Python Web 开发的终极选择

目录

  1. 引言:为什么选择 FastAPI?
  2. 环境搭建与基础配置
  3. 核心概念深度解析
  4. 路由与请求处理
  5. 数据验证与序列化
  6. 依赖注入系统
  7. 数据库集成
  8. 认证与安全
  9. 中间件与后台任务
  10. 测试与部署
  11. 性能优化最佳实践

引言:为什么选择 FastAPI?

FastAPI 是由 Sebastián Ramírez 于 2018 年创建的现代、高性能 Web 框架。它基于 Starlette(ASGI 工具集)和 Pydantic(数据验证库),为 Python 开发者带来了革命性的开发体验。

核心优势

特性说明
极致性能与 Node.js 和 Go 相当,是 Python 最快的框架之一
自动文档内置 Swagger UI 和 ReDoc,零配置生成交互式 API 文档
类型安全基于 Python 3.6+ 类型提示,减少 40% 的人为错误
智能编辑器支持完整的自动补全和类型检查
异步原生原生支持 async/await,轻松处理高并发

环境搭建与基础配置

安装 FastAPI

# 基础安装 pip install fastapi ​ # 生产环境需要 ASGI 服务器 pip install "uvicorn[standard]" ​ # 完整安装(包含所有常用依赖) pip install fastapi[all]

最小可行应用(MVP)

创建一个 main.py

from fastapi import FastAPI ​ app = FastAPI(    title="My Awesome API",    description="这是一个展示 FastAPI 强大功能的示例 API",    version="1.0.0",    docs_url="/docs",      # Swagger UI 路径    redoc_url="/redoc"     # ReDoc 路径 ) ​ @app.get("/") async def root():    return {"message": "Hello FastAPI!", "framework": "FastAPI"} ​ @app.get("/items/{item_id}") async def read_item(item_id: int, q: str | None = None):    """   获取特定项目信息       - **item_id**: 项目的唯一标识符   - **q**: 可选的查询参数   """    return {"item_id": item_id, "q": q}

启动应用

# 开发模式(热重载) uvicorn main:app --reload --host 0.0.0.0 --port 8000 ​ # 生产模式(多 worker) uvicorn main:app --workers 4 --host 0.0.0.0 --port 8000

启动后访问:

  • API 文档:http://localhost:8000/docs
  • 替代文档:http://localhost:8000/redoc

核心概念深度解析

1. 路径操作(Path Operations)

FastAPI 使用装饰器定义路由,支持所有 HTTP 方法:

from fastapi import FastAPI, HTTPException, status from enum import Enum ​ app = FastAPI() ​ class ModelName(str, Enum):    alexnet = "alexnet"    resnet = "resnet"    lenet = "lenet" ​ # GET 请求 @app.get("/models/{model_name}") async def get_model(model_name: ModelName):    if model_name == ModelName.alexnet:        return {"model_name": model_name, "message": "Deep Learning FTW!"}    if model_name.value == "lenet":        return {"model_name": model_name, "message": "LeCNN all the images"}    return {"model_name": model_name, "message": "Have some residuals"} ​ # POST 请求 - 创建资源 @app.post("/items/", status_code=status.HTTP_201_CREATED) async def create_item(item: dict):    return {"item": item, "message": "Item created successfully"} ​ # PUT 请求 - 完整更新 @app.put("/items/{item_id}") async def update_item(item_id: int, item: dict):    return {"item_id": item_id, **item} ​ # PATCH 请求 - 部分更新 @app.patch("/items/{item_id}") async def partial_update(item_id: int, item: dict):    return {"item_id": item_id, "updated_fields": list(item.keys())} ​ # DELETE 请求 @app.delete("/items/{item_id}") async def delete_item(item_id: int):    return {"message": f"Item {item_id} deleted"}

2. 请求参数详解

FastAPI 支持多种参数类型,自动进行验证和转换:

from fastapi import FastAPI, Query, Path, Body, File, UploadFile from typing import Annotated ​ app = FastAPI() ​ # 查询参数(Query Parameters) @app.get("/items/") async def read_items(    # 必填参数    q: str,    # 可选参数(带默认值)    skip: int = 0,    limit: int = 10,    # 复杂验证    price: Annotated[        float | None,        Query(            gt=0,           # 大于 0            le=1000,        # 小于等于 1000            description="价格范围必须在 0-1000 之间",            alias="item-price"  # 别名:?item-price=100       )   ] = None,    # 列表参数:?tags=foo&tags=bar    tags: Annotated[list[str] | None, Query()] = None,    # 布尔参数:?is_active=true    is_active: bool = True ):    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}], "skip": skip, "limit": limit}    if q:        results.update({"q": q})    return results ​ # 路径参数(Path Parameters)- 带验证 @app.get("/items/{item_id}") async def read_item(    item_id: Annotated[        int,        Path(            title="项目 ID",            description="项目的唯一标识符",            ge=1,  # 大于等于 1            le=1000       )   ] ):    return {"item_id": item_id} ​ # 请求体(Request Body) @app.post("/items/") async def create_item(    # 单个请求体    item: Annotated[dict, Body()],    # 嵌套请求体    user: Annotated[dict, Body(embed=True)],  # {"user": {...}}    # 额外数据    importance: Annotated[int, Body()] = 0 ):    return {        "item": item,        "user": user,        "importance": importance   } ​ # 文件上传 @app.post("/uploadfile/") async def create_upload_file(    file: UploadFile = File(..., description="上传的文件"),    # 多文件上传    files: list[UploadFile] = File(default=[], description="多个文件") ):    content = await file.read()    return {        "filename": file.filename,        "content_type": file.content_type,        "size": len(content),        "files_count": len(files)   }


数据验证与序列化

Pydantic 模型:FastAPI 的核心

from pydantic import BaseModel, Field, EmailStr, validator, root_validator from typing import Optional, List from datetime import datetime from enum import Enum ​ # 基础模型 class ItemBase(BaseModel):    title: str = Field(..., min_length=3, max_length=50, description="项目标题")    description: Optional[str] = Field(        None,        max_length=300,        title="项目描述",        example="这是一个示例描述"   )    price: float = Field(..., gt=0, description="必须大于 0")    tax: Optional[float] = None ​ # 创建时模型(无 ID) class ItemCreate(ItemBase):    pass ​ # 响应模型(包含 ID 和时间戳) class Item(ItemBase):    id: int    created_at: datetime    updated_at: Optional[datetime] = None        class Config:        orm_mode = True  # 支持 ORM 对象转换        schema_extra = {            "example": {                "title": "示例项目",                "description": "详细描述",                "price": 35.4,                "tax": 3.2,                "id": 1,                "created_at": "2024-01-01T00:00:00"           }       } ​ # 复杂验证示例 class UserCreate(BaseModel):    username: str = Field(..., min_length=3, max_length=20, regex="^[a-zA-Z0-9_]+$")    email: EmailStr    password: str = Field(..., min_length=8)    password_confirm: str    age: Optional[int] = Field(None, ge=18, le=120)        @validator('username')    def username_must_be_unique(cls, v):        # 模拟数据库检查        forbidden = ["admin", "root", "superuser"]        if v.lower() in forbidden:            raise ValueError(f'用户名 "{v}" 不可用')        return v        @root_validator    def passwords_match(cls, values):        pw1 = values.get('password')        pw2 = values.get('password_confirm')        if pw1 != pw2:            raise ValueError('两次输入的密码不匹配')        return values ​ # 使用模型 from fastapi import FastAPI ​ app = FastAPI() ​ @app.post("/items/", response_model=Item, status_code=201) async def create_item(item: ItemCreate):    # 模拟数据库操作    db_item = Item(        id=1,        created_at=datetime.now(),        **item.dict()   )    return db_item ​ @app.post("/users/", response_model=dict) async def create_user(user: UserCreate):    return {        "username": user.username,        "email": user.email,        "message": "用户创建成功"   }

嵌套模型与复杂数据结构

from pydantic import BaseModel, HttpUrl from typing import Set, List, Dict ​ class Image(BaseModel):    url: HttpUrl    name: str ​ class Item(BaseModel):    name: str    description: Optional[str] = None    price: float    tax: Optional[float] = None    tags: Set[str] = set()    images: Optional[List[Image]] = None        # 深度嵌套    metadata: Optional[Dict[str, str]] = None ​ # 使用示例 item_data = {    "name": "Foo",    "description": "The pretender",    "price": 42.0,    "tax": 3.2,    "tags": ["rock", "metal", "rock"],  # 自动去重为 Set    "images": [       {"url": "http://example.com/baz.jpg", "name": "The Foo live"},       {"url": "http://example.com/dave.jpg", "name": "The Baz"}   ],    "metadata": {"key1": "value1", "key2": "value2"} }


依赖注入系统

FastAPI 的依赖注入(Dependency Injection)是其最强大的特性之一,支持嵌套依赖、子依赖、路径操作装饰器依赖等。

from fastapi import Depends, HTTPException, status, Header from typing import Annotated from functools import lru_cache import jwt from datetime import datetime, timedelta ​ # 基础依赖 async def common_parameters(    q: str | None = None,    skip: int = 0,    limit: int = 100 ):    return {"q": q, "skip": skip, "limit": limit} ​ @app.get("/items/") async def read_items(commons: Annotated[dict, Depends(common_parameters)]):    return commons ​ # 类作为依赖(更复杂的场景) class CommonQueryParams:    def __init__(        self,        q: str | None = None,        skip: int = 0,        limit: int = 100   ):        self.q = q        self.skip = skip        self.limit = limit ​ @app.get("/users/") async def read_users(commons: Annotated[CommonQueryParams, Depends()]):    return commons ​ # 数据库依赖(实际应用模式) from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, Session ​ SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db" engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) ​ def get_db():    db = SessionLocal()    try:        yield db    finally:        db.close() ​ @app.get("/users/{user_id}") async def read_user(user_id: int, db: Annotated[Session, Depends(get_db)]):    # 使用 db 进行查询    return {"user_id": user_id} ​ # 认证依赖 SECRET_KEY = "your-secret-key" ALGORITHM = "HS256" ​ async def get_current_user(token: Annotated[str, Header(alias="X-Token")]):    credentials_exception = HTTPException(        status_code=status.HTTP_401_UNAUTHORIZED,        detail="Could not validate credentials",        headers={"WWW-Authenticate": "Bearer"},   )    try:        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])        username: str = payload.get("sub")        if username is None:            raise credentials_exception    except jwt.PyJWTError:        raise credentials_exception        # 模拟获取用户    user = {"username": username, "id": 1}    if user is None:        raise credentials_exception    return user ​ async def get_current_active_user(    current_user: Annotated[dict, Depends(get_current_user)] ):    if current_user.get("disabled"):        raise HTTPException(status_code=400, detail="Inactive user")    return current_user ​ @app.get("/users/me") async def read_users_me(    current_user: Annotated[dict, Depends(get_current_active_user)] ):    return current_user ​ # 路径操作装饰器依赖(适用于整个路由) async def verify_token(x_token: Annotated[str, Header()]):    if x_token != "fake-super-secret-token":        raise HTTPException(status_code=400, detail="X-Token header invalid") ​ async def verify_key(x_key: Annotated[str, Header()]):    if x_key != "fake-super-secret-key":        raise HTTPException(status_code=400, detail="X-Key header invalid")    return x_key ​ @app.get("/items/", dependencies=[Depends(verify_token), Depends(verify_key)]) async def read_items():    return [{"item": "Foo"}, {"item": "Bar"}] ​ # 带 yield 的依赖(用于资源管理) async def get_db_with_transaction():    db = SessionLocal()    try:        yield db        db.commit()  # 成功时提交    except Exception:        db.rollback()  # 失败时回滚        raise    finally:        db.close()


数据库集成

SQLAlchemy 2.0 + FastAPI 最佳实践

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey from sqlalchemy.orm import declarative_base, relationship, Session, joinedload from sqlalchemy.sql import func from fastapi import FastAPI, Depends, HTTPException from pydantic import BaseModel from typing import List, Optional from contextlib import asynccontextmanager ​ # 数据库配置 DATABASE_URL = "postgresql://user:password@localhost/dbname" engine = create_engine(DATABASE_URL, pool_pre_ping=True, pool_size=10, max_overflow=20) Base = declarative_base() ​ # 模型定义 class User(Base):    __tablename__ = "users"        id = Column(Integer, primary_key=True, index=True)    email = Column(String, unique=True, index=True, nullable=False)    hashed_password = Column(String, nullable=False)    full_name = Column(String)    is_active = Column(Integer, default=1)    created_at = Column(DateTime(timezone=True), server_default=func.now())        items = relationship("Item", back_populates="owner", cascade="all, delete-orphan") ​ class Item(Base):    __tablename__ = "items"        id = Column(Integer, primary_key=True, index=True)    title = Column(String, index=True)    description = Column(String)    price = Column(Float)    owner_id = Column(Integer, ForeignKey("users.id"))        owner = relationship("User", back_populates="items") ​ # Pydantic 模型 class UserBase(BaseModel):    email: str    full_name: Optional[str] = None ​ class UserCreate(UserBase):    password: str ​ class UserResponse(UserBase):    id: int    is_active: bool    created_at: Optional[str] = None        class Config:        from_attributes = True ​ class ItemBase(BaseModel):    title: str    description: Optional[str] = None    price: float ​ class ItemCreate(ItemBase):    pass ​ class ItemResponse(ItemBase):    id: int    owner_id: int        class Config:        from_attributes = True ​ class UserWithItems(UserResponse):    items: List[ItemResponse] = [] ​ # 数据库会话管理 def get_db():    db = Session(bind=engine)    try:        yield db    finally:        db.close() ​ # 生命周期管理 @asynccontextmanager async def lifespan(app: FastAPI):    # 启动时创建表(生产环境使用 Alembic 迁移)    Base.metadata.create_all(bind=engine)    yield    # 关闭时清理    engine.dispose() ​ app = FastAPI(lifespan=lifespan) ​ # CRUD 操作 @app.post("/users/", response_model=UserResponse, status_code=201) def create_user(user: UserCreate, db: Session = Depends(get_db)):    # 检查邮箱是否已存在    db_user = db.query(User).filter(User.email == user.email).first()    if db_user:        raise HTTPException(status_code=400, detail="Email already registered")        # 创建用户(实际应用需要哈希密码)    fake_hashed_password = user.password + "notreallyhashed"    db_user = User(        email=user.email,        hashed_password=fake_hashed_password,        full_name=user.full_name   )    db.add(db_user)    db.commit()    db.refresh(db_user)    return db_user ​ @app.get("/users/", response_model=List[UserResponse]) def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):    users = db.query(User).offset(skip).limit(limit).all()    return users ​ @app.get("/users/{user_id}", response_model=UserWithItems) def read_user(user_id: int, db: Session = Depends(get_db)):    # 使用 joinedload 避免 N+1 查询    user = db.query(User).options(joinedload(User.items)).filter(User.id == user_id).first()    if user is None:        raise HTTPException(status_code=404, detail="User not found")    return user ​ @app.post("/users/{user_id}/items/", response_model=ItemResponse) def create_item_for_user(    user_id: int,    item: ItemCreate,    db: Session = Depends(get_db) ):    db_user = db.query(User).filter(User.id == user_id).first()    if not db_user:        raise HTTPException(status_code=404, detail="User not found")        db_item = Item(**item.dict(), owner_id=user_id)    db.add(db_item)    db.commit()    db.refresh(db_item)    return db_item ​ @app.delete("/users/{user_id}") def delete_user(user_id: int, db: Session = Depends(get_db)):    user = db.query(User).filter(User.id == user_id).first()    if not user:        raise HTTPException(status_code=404, detail="User not found")        db.delete(user)    db.commit()    return {"message": "User deleted successfully"}

异步数据库(推荐用于高并发)

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.future import select ​ # 使用异步驱动 ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname" async_engine = create_async_engine(ASYNC_DATABASE_URL, echo=True) AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False) ​ async def get_async_db():    async with AsyncSessionLocal() as session:        yield session ​ @app.get("/users/async", response_model=List[UserResponse]) async def read_users_async(    skip: int = 0,    limit: int = 100,    db: AsyncSession = Depends(get_async_db) ):    result = await db.execute(select(User).offset(skip).limit(limit))    users = result.scalars().all()    return users


认证与安全

JWT 认证完整实现

from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel ​ # 配置 SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 ​ # 密码加密上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") ​ # OAuth2 方案 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") ​ # 模型 class Token(BaseModel):    access_token: str    token_type: str ​ class TokenData(BaseModel):    username: Optional[str] = None ​ class User(BaseModel):    username: str    email: Optional[str] = None    full_name: Optional[str] = None    disabled: Optional[bool] = None ​ class UserInDB(User):    hashed_password: str ​ # 模拟数据库 fake_users_db = {    "johndoe": {        "username": "johndoe",        "full_name": "John Doe",        "email": "[email protected]",        "hashed_password": pwd_context.hash("secret"),        "disabled": False,   } } ​ # 工具函数 def verify_password(plain_password, hashed_password):    return pwd_context.verify(plain_password, hashed_password) ​ def get_password_hash(password):    return pwd_context.hash(password) ​ def get_user(db, username: str):    if username in db:        user_dict = db[username]        return UserInDB(**user_dict) ​ def authenticate_user(fake_db, username: str, password: str):    user = get_user(fake_db, username)    if not user:        return False    if not verify_password(password, user.hashed_password):        return False    return user ​ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):    to_encode = data.copy()    if expires_delta:        expire = datetime.utcnow() + expires_delta    else:        expire = datetime.utcnow() + timedelta(minutes=15)    to_encode.update({"exp": expire})    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)    return encoded_jwt ​ # 依赖 async def get_current_user(token: str = Depends(oauth2_scheme)):    credentials_exception = HTTPException(        status_code=status.HTTP_401_UNAUTHORIZED,        detail="Could not validate credentials",        headers={"WWW-Authenticate": "Bearer"},   )    try:        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])        username: str = payload.get("sub")        if username is None:            raise credentials_exception        token_data = TokenData(username=username)    except JWTError:        raise credentials_exception    user = get_user(fake_users_db, username=token_data.username)    if user is None:        raise credentials_exception    return user ​ async def get_current_active_user(current_user: User = Depends(get_current_user)):    if current_user.disabled:        raise HTTPException(status_code=400, detail="Inactive user")    return current_user ​ # 路由 @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):    user = authenticate_user(fake_users_db, form_data.username, form_data.password)    if not user:        raise HTTPException(            status_code=status.HTTP_401_UNAUTHORIZED,            detail="Incorrect username or password",            headers={"WWW-Authenticate": "Bearer"},       )    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)    access_token = create_access_token(        data={"sub": user.username}, expires_delta=access_token_expires   )    return {"access_token": access_token, "token_type": "bearer"} ​ @app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)):    return current_user ​ @app.get("/users/me/items/") async def read_own_items(current_user: User = Depends(get_current_active_user)):    return [{"item_id": "Foo", "owner": current_user.username}]

CORS 配置与安全头

from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware from fastapi.middleware.gzip import GZipMiddleware ​ app = FastAPI() ​ # CORS 配置 app.add_middleware(    CORSMiddleware,    allow_origins=["https://example.com", "https://www.example.com"],    allow_credentials=True,    allow_methods=["GET", "POST", "PUT", "DELETE"],    allow_headers=["*"],    expose_headers=["X-Custom-Header"],    max_age=600,  # 预检请求缓存时间 ) ​ # 受信任主机 app.add_middleware(    TrustedHostMiddleware,    allowed_hosts=["example.com", "*.example.com"] ) ​ # Gzip 压缩 app.add_middleware(GZipMiddleware, minimum_size=1000) ​ # 安全头中间件 @app.middleware("http") async def add_security_headers(request, call_next):    response = await call_next(request)    response.headers["X-Content-Type-Options"] = "nosniff"    response.headers["X-Frame-Options"] = "DENY"    response.headers["X-XSS-Protection"] = "1; mode=block"    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"    return response


中间件与后台任务

自定义中间件

import time import logging from fastapi import Request from starlette.middleware.base import BaseHTTPMiddleware ​ # 日志配置 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) ​ class TimingMiddleware(BaseHTTPMiddleware):    async def dispatch(self, request: Request, call_next):        start_time = time.time()                # 记录请求信息        logger.info(f"Request: {request.method} {request.url}")                response = await call_next(request)                process_time = time.time() - start_time        response.headers["X-Process-Time"] = str(process_time)                logger.info(f"Response time: {process_time:.4f}s - Status: {response.status_code}")        return response ​ class RateLimitMiddleware(BaseHTTPMiddleware):    def __init__(self, app, max_requests: int = 100, window_seconds: int = 60):        super().__init__(app)        self.max_requests = max_requests        self.window_seconds = window_seconds        self.requests = {}        async def dispatch(self, request: Request, call_next):        client_ip = request.client.host        current_time = time.time()                # 清理过期记录        self.requests = {            ip: [t for t in times if current_time - t < self.window_seconds]            for ip, times in self.requests.items()       }                # 检查限制        if len(self.requests.get(client_ip, [])) >= self.max_requests:            from fastapi.responses import JSONResponse            return JSONResponse(                status_code=429,                content={"detail": "Rate limit exceeded"}           )                # 记录请求        if client_ip not in self.requests:            self.requests[client_ip] = []        self.requests[client_ip].append(current_time)                return await call_next(request) ​ app.add_middleware(TimingMiddleware) app.add_middleware(RateLimitMiddleware, max_requests=100, window_seconds=60)

后台任务

from fastapi import BackgroundTasks, Depends from typing import Annotated import asyncio import smtplib from email.mime.text import MIMEText ​ def send_email(email_to: str, subject: str, body: str):    """模拟发送邮件(实际应用中配置 SMTP)"""    print(f"Sending email to {email_to}")    # smtp_server = smtplib.SMTP("smtp.gmail.com", 587)    # ... ​ async def process_large_file(file_path: str):    """异步处理大文件"""    await asyncio.sleep(10)  # 模拟长时间处理    print(f"File {file_path} processed") ​ @app.post("/send-notification/{email}") async def send_notification(    email: str,    background_tasks: BackgroundTasks,    message: str = "Hello from FastAPI" ):    background_tasks.add_task(send_email, email, "Notification", message)    return {"message": "Notification sent in the background"} ​ @app.post("/upload/") async def upload_file(    background_tasks: BackgroundTasks,    file: UploadFile = File(...) ):    file_path = f"/tmp/{file.filename}"    with open(file_path, "wb") as f:        content = await file.read()        f.write(content)        # 添加后台任务    background_tasks.add_task(process_large_file, file_path)        return {        "filename": file.filename,        "status": "File uploaded, processing in background"   } ​ # 使用 Celery 进行更复杂的后台任务(生产环境推荐) """ from celery import Celery ​ celery_app = Celery(   "tasks",   broker="redis://localhost:6379/0",   backend="redis://localhost:6379/0" ) ​ @celery_app.task def heavy_computation(data: dict):   import time   time.sleep(30)   return {"result": "completed", "data": data} ​ @app.post("/heavy-task/") async def trigger_heavy_task(data: dict):   task = heavy_computation.delay(data)   return {"task_id": task.id, "status": "processing"} """


测试与部署

测试 FastAPI 应用

# test_main.py import pytest from fastapi.testclient import TestClient from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import StaticPool ​ from main import app, get_db, Base ​ # 内存数据库用于测试 SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:" ​ engine = create_engine(    SQLALCHEMY_DATABASE_URL,    connect_args={"check_same_thread": False},    poolclass=StaticPool, ) TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) ​ def override_get_db():    try:        db = TestingSessionLocal()        yield db    finally:        db.close() ​ app.dependency_overrides[get_db] = override_get_db client = TestClient(app) ​ @pytest.fixture(scope="function") def setup_database():    Base.metadata.create_all(bind=engine)    yield    Base.metadata.drop_all(bind=engine) ​ def test_create_user(setup_database):    response = client.post(        "/users/",        json={"email": "[email protected]", "password": "testpass123", "full_name": "Test User"}   )    assert response.status_code == 201    data = response.json()    assert data["email"] == "[email protected]"    assert "id" in data ​ def test_read_user(setup_database):    # 先创建用户    client.post(        "/users/",        json={"email": "[email protected]", "password": "testpass123"}   )        response = client.get("/users/1")    assert response.status_code == 200    assert response.json()["email"] == "[email protected]" ​ def test_read_nonexistent_user(setup_database):    response = client.get("/users/999")    assert response.status_code == 404 ​ def test_invalid_user_data(setup_database):    response = client.post(        "/users/",        json={"email": "invalid-email", "password": "short"}   )    assert response.status_code == 422  # 验证错误 ​ # 异步测试 import httpx import pytest_asyncio ​ @pytest_asyncio.fixture async def async_client():    async with httpx.AsyncClient(app=app, base_url="http://test") as ac:        yield ac ​ @pytest.mark.asyncio async def test_async_endpoint(async_client):    response = await async_client.get("/")    assert response.status_code == 200

Docker 部署

# Dockerfile FROM python:3.11-slim ​ WORKDIR /app ​ # 安装依赖 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt ​ # 复制应用代码 COPY . . ​ # 非 root 用户运行 RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app USER appuser ​ # 暴露端口 EXPOSE 8000 ​ # 启动命令(生产环境使用 gunicorn + uvicorn) CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000"] # docker-compose.yml version: '3.8' ​ services: web:   build: .   ports:     - "8000:8000"   environment:     - DATABASE_URL=postgresql://postgres:password@db:5432/fastapi_db     - SECRET_KEY=${SECRET_KEY}   depends_on:     - db     - redis   volumes:     - ./:/app   command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload ​ db:   image: postgres:15   environment:     - POSTGRES_USER=postgres     - POSTGRES_PASSWORD=password     - POSTGRES_DB=fastapi_db   volumes:     - postgres_data:/var/lib/postgresql/data   ports:     - "5432:5432" ​ redis:   image: redis:7-alpine   ports:     - "6379:6379" ​ volumes: postgres_data:

Kubernetes 部署配置

# k8s-deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: fastapi-app spec: replicas: 3 selector:   matchLabels:     app: fastapi template:   metadata:     labels:       app: fastapi   spec:     containers:     - name: fastapi       image: your-registry/fastapi-app:latest       ports:       - containerPort: 8000       env:       - name: DATABASE_URL         valueFrom:           secretKeyRef:             name: db-secret             key: url       resources:         requests:           memory: "256Mi"           cpu: "250m"         limits:           memory: "512Mi"           cpu: "500m"       livenessProbe:         httpGet:           path: /health           port: 8000         initialDelaySeconds: 30         periodSeconds: 10       readinessProbe:         httpGet:           path: /ready           port: 8000         initialDelaySeconds: 5         periodSeconds: 5 --- apiVersion: v1 kind: Service metadata: name: fastapi-service spec: selector:   app: fastapi ports: - port: 80   targetPort: 8000 type: LoadBalancer


性能优化最佳实践

1. 异步数据库连接池

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker import aioredis ​ # 优化连接池配置 async_engine = create_async_engine(    "postgresql+asyncpg://user:pass@localhost/db",    pool_size=20,              # 基础连接数    max_overflow=10,           # 最大溢出连接    pool_pre_ping=True,        # 连接前 ping 检查    pool_recycle=3600,         # 连接回收时间    echo=False                 # 生产环境关闭 SQL 日志 ) ​ # Redis 连接池 redis_pool = aioredis.ConnectionPool.from_url(    "redis://localhost",    max_connections=100,    decode_responses=True )

2. 缓存策略

from fastapi_cache import FastAPICache from fastapi_cache.backends.redis import RedisBackend from fastapi_cache.decorator import cache from redis import asyncio as aioredis ​ @app.on_event("startup") async def startup():    redis = aioredis.from_url("redis://localhost", encoding="utf8", decode_responses=True)    FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache") ​ @app.get("/heavy-computation") @cache(expire=3600)  # 缓存 1 小时 async def heavy_computation():    # 模拟耗时操作    await asyncio.sleep(2)    return {"result": "expensive data"} ​ # 手动缓存控制 from fastapi_cache.coder import JsonCoder ​ @app.get("/users/{user_id}") async def get_user(user_id: int):    cache_key = f"user:{user_id}"        # 尝试从缓存获取    cached = await FastAPICache.get_backend().get(cache_key)    if cached:        return json.loads(cached)        # 查询数据库    user = await fetch_user_from_db(user_id)        # 写入缓存    await FastAPICache.get_backend().set(        cache_key,        json.dumps(user),        expire=300   )    return user

3. 响应模型优化

from fastapi import Response from fastapi.responses import ORJSONResponse, StreamingResponse import orjson ​ # 使用更快的 JSON 库 @app.get("/items/", response_class=ORJSONResponse) async def get_items():    return [{"id": i, "data": "value"} for i in range(1000)] ​ # 流式响应(大文件) @app.get("/large-file/") async def get_large_file():    def file_generator():        with open("large_file.zip", "rb") as f:            while chunk := f.read(8192):                yield chunk        return StreamingResponse(        file_generator(),        media_type="application/zip",        headers={"Content-Disposition": "attachment; filename=large_file.zip"}   ) ​ # 分页优化 from fastapi import Query from sqlalchemy import func ​ @app.get("/items/paginated/") async def get_items_paginated(    page: int = Query(1, ge=1),    size: int = Query(20, ge=1, le=100),    db: AsyncSession = Depends(get_async_db) ):    offset = (page - 1) * size        # 使用 count(*) OVER() 优化总数查询    result = await db.execute(        select(            Item,            func.count().over().label("total")       )       .offset(offset)       .limit(size)   )        rows = result.all()    total = rows[0].total if rows else 0        return {        "items": [row.Item for row in rows],        "total": total,        "page": page,        "size": size,        "pages": (total + size - 1) // size   }

4. 性能监控

from prometheus_client import Counter, Histogram, generate_latest from starlette.middleware.base import BaseHTTPMiddleware ​ # Prometheus 指标 REQUEST_COUNT = Counter(    'http_requests_total',    'Total HTTP requests',   ['method', 'endpoint', 'status'] ) ​ REQUEST_LATENCY = Histogram(    'http_request_duration_seconds',    'HTTP request latency',   ['method', 'endpoint'] ) ​ class PrometheusMiddleware(BaseHTTPMiddleware):    async def dispatch(self, request, call_next):        start_time = time.time()        response = await call_next(request)        duration = time.time() - start_time                REQUEST_COUNT.labels(            method=request.method,            endpoint=request.url.path,            status=response.status_code       ).inc()                REQUEST_LATENCY.labels(            method=request.method,            endpoint=request.url.path       ).observe(duration)                return response ​ app.add_middleware(PrometheusMiddleware) ​ @app.get("/metrics") async def metrics():    return Response(generate_latest(), media_type="text/plain")


总结

FastAPI 代表了 Python Web 开发的现代化方向,它通过以下特性成为构建高性能 API 的首选框架:

  1. 开发效率:类型提示带来的智能补全和自动文档生成
  2. 运行时性能:异步原生设计,媲美 Node.js 和 Go
  3. 工程规范:内置数据验证、依赖注入、认证授权等生产级功能
  4. 生态兼容:与 SQLAlchemy、Pydantic、Celery 等主流库无缝集成

学习路径建议

  1. 入门:掌握基础路由、请求参数、Pydantic 模型
  2. 进阶:深入依赖注入系统、数据库集成、异步编程
  3. 生产:学习 Docker 部署、Kubernetes 编排、监控告警
  4. 架构:微服务拆分、事件驱动架构、CQRS 模式

FastAPI 不仅是一个框架,更是一种现代 Python 工程实践的体现。无论你是构建小型微服务还是大型分布式系统,FastAPI 都能提供坚实的技术支撑。


本文代码示例基于 FastAPI 0.104+ 和 Python 3.11+,建议在实际项目中使用最新稳定版本。

Read more

开源AI桌宠AIRI完整部署指南

开源AI桌宠AIRI完整部署指南

本文手把手记录了 AIRI 的快速部署全流程:从设置中文界面、接入阿里百炼 API,到配置本地 TTS 服务,适合想低成本体验多模态 AI 桌宠的普通用户。 序言 最近在用一个叫 AIRI 的开源 AI 项目,部署简单、还能生成一个可调形象的桌面桌宠。我花了一整天_time_实测了从模型接入、TTS 语音、到跨平台联动的全过程——没有花哨概念,只有真实可用的细节。如果你也想拥有一个能聊天、会说话、还能接入 Discord 或游戏的“数字伙伴”,这篇就是你想要的落地指南。 演示 该项目支持ios、windwos、linux下载,看清型号配置点击下载即可安装,下载链接放在文章最后了。 打开软件,桌面上就会出现一个桌宠,这里的形象、大小都是可以调整的,感兴趣的可以尝试自定义,可以先连接上大模型再优化这个形象。 在设置>外观&

2026年UI设计师必备10款AI工具,从新手到大神都够用

回望UI设计软件的迭代之路,每年都有新工具冒尖,也有曾经的经典慢慢淡出视野。如今市面上的UI设计工具越来越多,选择范围广了,可不少设计师反而犯了难——到底该选哪款才适配自己的工作? 下面我就结合实际使用体验,整理了10款实用UI设计工具,每款在功能、协作性和创新性上各有侧重,不管是刚入门的新手,还是追求高效的资深设计师,都能找到合适的款,一起来看看吧! 1. UXbot 这是一款很懂国内设计师需求的国产新工具,上手完全没有难度。UXbot 聚焦产品原型、UI 设计与前端开发全链路的 AI智能平台。用户无需代码基础,通过文字描述即可生成高保真多页面原型,支持像素级编辑与沉浸式交互设计;基于云端共享功能,可实现跨角色高效协同,显著提升团队沟通与迭代效率。 核心亮点: * 多页面项目生成:输入文字描述或示例截图,UXbot 即可智能解析需求、构建用户旅程图谱并自主选择生成页面,一次性输出整套界面体系,大幅提升构想落地效率。 * 自由编辑:集成自然语言交互与专业级精密编辑器,支持像素级细节调整,兼顾创意灵活性与设计专业性。 * 即时交互原型:一键生成含真实用户流程的可交互演示

【博客之星2025年度总评选】2025年度技术博客总结:从Python基础到AI前沿的进阶之旅

【博客之星2025年度总评选】2025年度技术博客总结:从Python基础到AI前沿的进阶之旅

本文目录 一、个人成长与突破盘点 1.1 技术深度与广度的双重突破 1.2 问题解决能力的显著提升 1.3 技术视野的前瞻性拓展 二、年度创作历程回顾 2.1 从基础到高级的系统化梳理 2.2 内容质量的持续提升 三、个人生活与博客事业的融合与平衡 四、结语         2025年对于我而言,是技术深耕与突破的关键一年。作为一位专注于Python技术栈的开发者,在这一年中不仅实现了个人技术能力的飞跃,更通过高质量的博客内容为众多开发者提供了实用的技术指南。以下是对2025年度博客创作的全面总结。 一、个人成长与突破盘点 1.1 技术深度与广度的双重突破         2025年的技术探索从Python基础逐步深入到高级应用与前沿领域。年初,专注于Python核心模块的深度解析,如random、math、operator等模块的高级用法,展现了扎实的Python基础功底。随着年份推进和技术视野不断拓展,逐步覆盖了AI绘画、OpenAI API集成、Gemini 3.0等前沿技术领域。         特别值得一提的是,