FastAPI 完全指南:现代 Python Web 开发的终极选择

FastAPI 完全指南:现代 Python Web 开发的终极选择

目录

  1. 引言:为什么选择 FastAPI?
  2. 环境搭建与基础配置
  3. 核心概念深度解析
  4. 路由与请求处理
  5. 数据验证与序列化
  6. 依赖注入系统
  7. 数据库集成
  8. 认证与安全
  9. 中间件与后台任务
  10. 测试与部署
  11. 性能优化最佳实践

引言:为什么选择 FastAPI?

FastAPI 是由 Sebastián Ramírez 于 2018 年创建的现代、高性能 Web 框架。它基于 Starlette(ASGI 工具集)和 Pydantic(数据验证库),为 Python 开发者带来了革命性的开发体验。

核心优势

特性说明
极致性能与 Node.js 和 Go 相当,是 Python 最快的框架之一
自动文档内置 Swagger UI 和 ReDoc,零配置生成交互式 API 文档
类型安全基于 Python 3.6+ 类型提示,减少 40% 的人为错误
智能编辑器支持完整的自动补全和类型检查
异步原生原生支持 async/await,轻松处理高并发

环境搭建与基础配置

安装 FastAPI

# 基础安装 pip install fastapi ​ # 生产环境需要 ASGI 服务器 pip install "uvicorn[standard]" ​ # 完整安装(包含所有常用依赖) pip install fastapi[all]

最小可行应用(MVP)

创建一个 main.py

from fastapi import FastAPI ​ app = FastAPI(    title="My Awesome API",    description="这是一个展示 FastAPI 强大功能的示例 API",    version="1.0.0",    docs_url="/docs",      # Swagger UI 路径    redoc_url="/redoc"     # ReDoc 路径 ) ​ @app.get("/") async def root():    return {"message": "Hello FastAPI!", "framework": "FastAPI"} ​ @app.get("/items/{item_id}") async def read_item(item_id: int, q: str | None = None):    """   获取特定项目信息       - **item_id**: 项目的唯一标识符   - **q**: 可选的查询参数   """    return {"item_id": item_id, "q": q}

启动应用

# 开发模式(热重载) uvicorn main:app --reload --host 0.0.0.0 --port 8000 ​ # 生产模式(多 worker) uvicorn main:app --workers 4 --host 0.0.0.0 --port 8000

启动后访问:

  • API 文档:http://localhost:8000/docs
  • 替代文档:http://localhost:8000/redoc

核心概念深度解析

1. 路径操作(Path Operations)

FastAPI 使用装饰器定义路由,支持所有 HTTP 方法:

from fastapi import FastAPI, HTTPException, status from enum import Enum ​ app = FastAPI() ​ class ModelName(str, Enum):    alexnet = "alexnet"    resnet = "resnet"    lenet = "lenet" ​ # GET 请求 @app.get("/models/{model_name}") async def get_model(model_name: ModelName):    if model_name == ModelName.alexnet:        return {"model_name": model_name, "message": "Deep Learning FTW!"}    if model_name.value == "lenet":        return {"model_name": model_name, "message": "LeCNN all the images"}    return {"model_name": model_name, "message": "Have some residuals"} ​ # POST 请求 - 创建资源 @app.post("/items/", status_code=status.HTTP_201_CREATED) async def create_item(item: dict):    return {"item": item, "message": "Item created successfully"} ​ # PUT 请求 - 完整更新 @app.put("/items/{item_id}") async def update_item(item_id: int, item: dict):    return {"item_id": item_id, **item} ​ # PATCH 请求 - 部分更新 @app.patch("/items/{item_id}") async def partial_update(item_id: int, item: dict):    return {"item_id": item_id, "updated_fields": list(item.keys())} ​ # DELETE 请求 @app.delete("/items/{item_id}") async def delete_item(item_id: int):    return {"message": f"Item {item_id} deleted"}

2. 请求参数详解

FastAPI 支持多种参数类型,自动进行验证和转换:

from fastapi import FastAPI, Query, Path, Body, File, UploadFile from typing import Annotated ​ app = FastAPI() ​ # 查询参数(Query Parameters) @app.get("/items/") async def read_items(    # 必填参数    q: str,    # 可选参数(带默认值)    skip: int = 0,    limit: int = 10,    # 复杂验证    price: Annotated[        float | None,        Query(            gt=0,           # 大于 0            le=1000,        # 小于等于 1000            description="价格范围必须在 0-1000 之间",            alias="item-price"  # 别名:?item-price=100       )   ] = None,    # 列表参数:?tags=foo&tags=bar    tags: Annotated[list[str] | None, Query()] = None,    # 布尔参数:?is_active=true    is_active: bool = True ):    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}], "skip": skip, "limit": limit}    if q:        results.update({"q": q})    return results ​ # 路径参数(Path Parameters)- 带验证 @app.get("/items/{item_id}") async def read_item(    item_id: Annotated[        int,        Path(            title="项目 ID",            description="项目的唯一标识符",            ge=1,  # 大于等于 1            le=1000       )   ] ):    return {"item_id": item_id} ​ # 请求体(Request Body) @app.post("/items/") async def create_item(    # 单个请求体    item: Annotated[dict, Body()],    # 嵌套请求体    user: Annotated[dict, Body(embed=True)],  # {"user": {...}}    # 额外数据    importance: Annotated[int, Body()] = 0 ):    return {        "item": item,        "user": user,        "importance": importance   } ​ # 文件上传 @app.post("/uploadfile/") async def create_upload_file(    file: UploadFile = File(..., description="上传的文件"),    # 多文件上传    files: list[UploadFile] = File(default=[], description="多个文件") ):    content = await file.read()    return {        "filename": file.filename,        "content_type": file.content_type,        "size": len(content),        "files_count": len(files)   }


数据验证与序列化

Pydantic 模型:FastAPI 的核心

from pydantic import BaseModel, Field, EmailStr, validator, root_validator from typing import Optional, List from datetime import datetime from enum import Enum ​ # 基础模型 class ItemBase(BaseModel):    title: str = Field(..., min_length=3, max_length=50, description="项目标题")    description: Optional[str] = Field(        None,        max_length=300,        title="项目描述",        example="这是一个示例描述"   )    price: float = Field(..., gt=0, description="必须大于 0")    tax: Optional[float] = None ​ # 创建时模型(无 ID) class ItemCreate(ItemBase):    pass ​ # 响应模型(包含 ID 和时间戳) class Item(ItemBase):    id: int    created_at: datetime    updated_at: Optional[datetime] = None        class Config:        orm_mode = True  # 支持 ORM 对象转换        schema_extra = {            "example": {                "title": "示例项目",                "description": "详细描述",                "price": 35.4,                "tax": 3.2,                "id": 1,                "created_at": "2024-01-01T00:00:00"           }       } ​ # 复杂验证示例 class UserCreate(BaseModel):    username: str = Field(..., min_length=3, max_length=20, regex="^[a-zA-Z0-9_]+$")    email: EmailStr    password: str = Field(..., min_length=8)    password_confirm: str    age: Optional[int] = Field(None, ge=18, le=120)        @validator('username')    def username_must_be_unique(cls, v):        # 模拟数据库检查        forbidden = ["admin", "root", "superuser"]        if v.lower() in forbidden:            raise ValueError(f'用户名 "{v}" 不可用')        return v        @root_validator    def passwords_match(cls, values):        pw1 = values.get('password')        pw2 = values.get('password_confirm')        if pw1 != pw2:            raise ValueError('两次输入的密码不匹配')        return values ​ # 使用模型 from fastapi import FastAPI ​ app = FastAPI() ​ @app.post("/items/", response_model=Item, status_code=201) async def create_item(item: ItemCreate):    # 模拟数据库操作    db_item = Item(        id=1,        created_at=datetime.now(),        **item.dict()   )    return db_item ​ @app.post("/users/", response_model=dict) async def create_user(user: UserCreate):    return {        "username": user.username,        "email": user.email,        "message": "用户创建成功"   }

嵌套模型与复杂数据结构

from pydantic import BaseModel, HttpUrl from typing import Set, List, Dict ​ class Image(BaseModel):    url: HttpUrl    name: str ​ class Item(BaseModel):    name: str    description: Optional[str] = None    price: float    tax: Optional[float] = None    tags: Set[str] = set()    images: Optional[List[Image]] = None        # 深度嵌套    metadata: Optional[Dict[str, str]] = None ​ # 使用示例 item_data = {    "name": "Foo",    "description": "The pretender",    "price": 42.0,    "tax": 3.2,    "tags": ["rock", "metal", "rock"],  # 自动去重为 Set    "images": [       {"url": "http://example.com/baz.jpg", "name": "The Foo live"},       {"url": "http://example.com/dave.jpg", "name": "The Baz"}   ],    "metadata": {"key1": "value1", "key2": "value2"} }


依赖注入系统

FastAPI 的依赖注入(Dependency Injection)是其最强大的特性之一,支持嵌套依赖、子依赖、路径操作装饰器依赖等。

from fastapi import Depends, HTTPException, status, Header from typing import Annotated from functools import lru_cache import jwt from datetime import datetime, timedelta ​ # 基础依赖 async def common_parameters(    q: str | None = None,    skip: int = 0,    limit: int = 100 ):    return {"q": q, "skip": skip, "limit": limit} ​ @app.get("/items/") async def read_items(commons: Annotated[dict, Depends(common_parameters)]):    return commons ​ # 类作为依赖(更复杂的场景) class CommonQueryParams:    def __init__(        self,        q: str | None = None,        skip: int = 0,        limit: int = 100   ):        self.q = q        self.skip = skip        self.limit = limit ​ @app.get("/users/") async def read_users(commons: Annotated[CommonQueryParams, Depends()]):    return commons ​ # 数据库依赖(实际应用模式) from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, Session ​ SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db" engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) ​ def get_db():    db = SessionLocal()    try:        yield db    finally:        db.close() ​ @app.get("/users/{user_id}") async def read_user(user_id: int, db: Annotated[Session, Depends(get_db)]):    # 使用 db 进行查询    return {"user_id": user_id} ​ # 认证依赖 SECRET_KEY = "your-secret-key" ALGORITHM = "HS256" ​ async def get_current_user(token: Annotated[str, Header(alias="X-Token")]):    credentials_exception = HTTPException(        status_code=status.HTTP_401_UNAUTHORIZED,        detail="Could not validate credentials",        headers={"WWW-Authenticate": "Bearer"},   )    try:        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])        username: str = payload.get("sub")        if username is None:            raise credentials_exception    except jwt.PyJWTError:        raise credentials_exception        # 模拟获取用户    user = {"username": username, "id": 1}    if user is None:        raise credentials_exception    return user ​ async def get_current_active_user(    current_user: Annotated[dict, Depends(get_current_user)] ):    if current_user.get("disabled"):        raise HTTPException(status_code=400, detail="Inactive user")    return current_user ​ @app.get("/users/me") async def read_users_me(    current_user: Annotated[dict, Depends(get_current_active_user)] ):    return current_user ​ # 路径操作装饰器依赖(适用于整个路由) async def verify_token(x_token: Annotated[str, Header()]):    if x_token != "fake-super-secret-token":        raise HTTPException(status_code=400, detail="X-Token header invalid") ​ async def verify_key(x_key: Annotated[str, Header()]):    if x_key != "fake-super-secret-key":        raise HTTPException(status_code=400, detail="X-Key header invalid")    return x_key ​ @app.get("/items/", dependencies=[Depends(verify_token), Depends(verify_key)]) async def read_items():    return [{"item": "Foo"}, {"item": "Bar"}] ​ # 带 yield 的依赖(用于资源管理) async def get_db_with_transaction():    db = SessionLocal()    try:        yield db        db.commit()  # 成功时提交    except Exception:        db.rollback()  # 失败时回滚        raise    finally:        db.close()


数据库集成

SQLAlchemy 2.0 + FastAPI 最佳实践

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey from sqlalchemy.orm import declarative_base, relationship, Session, joinedload from sqlalchemy.sql import func from fastapi import FastAPI, Depends, HTTPException from pydantic import BaseModel from typing import List, Optional from contextlib import asynccontextmanager ​ # 数据库配置 DATABASE_URL = "postgresql://user:password@localhost/dbname" engine = create_engine(DATABASE_URL, pool_pre_ping=True, pool_size=10, max_overflow=20) Base = declarative_base() ​ # 模型定义 class User(Base):    __tablename__ = "users"        id = Column(Integer, primary_key=True, index=True)    email = Column(String, unique=True, index=True, nullable=False)    hashed_password = Column(String, nullable=False)    full_name = Column(String)    is_active = Column(Integer, default=1)    created_at = Column(DateTime(timezone=True), server_default=func.now())        items = relationship("Item", back_populates="owner", cascade="all, delete-orphan") ​ class Item(Base):    __tablename__ = "items"        id = Column(Integer, primary_key=True, index=True)    title = Column(String, index=True)    description = Column(String)    price = Column(Float)    owner_id = Column(Integer, ForeignKey("users.id"))        owner = relationship("User", back_populates="items") ​ # Pydantic 模型 class UserBase(BaseModel):    email: str    full_name: Optional[str] = None ​ class UserCreate(UserBase):    password: str ​ class UserResponse(UserBase):    id: int    is_active: bool    created_at: Optional[str] = None        class Config:        from_attributes = True ​ class ItemBase(BaseModel):    title: str    description: Optional[str] = None    price: float ​ class ItemCreate(ItemBase):    pass ​ class ItemResponse(ItemBase):    id: int    owner_id: int        class Config:        from_attributes = True ​ class UserWithItems(UserResponse):    items: List[ItemResponse] = [] ​ # 数据库会话管理 def get_db():    db = Session(bind=engine)    try:        yield db    finally:        db.close() ​ # 生命周期管理 @asynccontextmanager async def lifespan(app: FastAPI):    # 启动时创建表(生产环境使用 Alembic 迁移)    Base.metadata.create_all(bind=engine)    yield    # 关闭时清理    engine.dispose() ​ app = FastAPI(lifespan=lifespan) ​ # CRUD 操作 @app.post("/users/", response_model=UserResponse, status_code=201) def create_user(user: UserCreate, db: Session = Depends(get_db)):    # 检查邮箱是否已存在    db_user = db.query(User).filter(User.email == user.email).first()    if db_user:        raise HTTPException(status_code=400, detail="Email already registered")        # 创建用户(实际应用需要哈希密码)    fake_hashed_password = user.password + "notreallyhashed"    db_user = User(        email=user.email,        hashed_password=fake_hashed_password,        full_name=user.full_name   )    db.add(db_user)    db.commit()    db.refresh(db_user)    return db_user ​ @app.get("/users/", response_model=List[UserResponse]) def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):    users = db.query(User).offset(skip).limit(limit).all()    return users ​ @app.get("/users/{user_id}", response_model=UserWithItems) def read_user(user_id: int, db: Session = Depends(get_db)):    # 使用 joinedload 避免 N+1 查询    user = db.query(User).options(joinedload(User.items)).filter(User.id == user_id).first()    if user is None:        raise HTTPException(status_code=404, detail="User not found")    return user ​ @app.post("/users/{user_id}/items/", response_model=ItemResponse) def create_item_for_user(    user_id: int,    item: ItemCreate,    db: Session = Depends(get_db) ):    db_user = db.query(User).filter(User.id == user_id).first()    if not db_user:        raise HTTPException(status_code=404, detail="User not found")        db_item = Item(**item.dict(), owner_id=user_id)    db.add(db_item)    db.commit()    db.refresh(db_item)    return db_item ​ @app.delete("/users/{user_id}") def delete_user(user_id: int, db: Session = Depends(get_db)):    user = db.query(User).filter(User.id == user_id).first()    if not user:        raise HTTPException(status_code=404, detail="User not found")        db.delete(user)    db.commit()    return {"message": "User deleted successfully"}

异步数据库(推荐用于高并发)

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.future import select ​ # 使用异步驱动 ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname" async_engine = create_async_engine(ASYNC_DATABASE_URL, echo=True) AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False) ​ async def get_async_db():    async with AsyncSessionLocal() as session:        yield session ​ @app.get("/users/async", response_model=List[UserResponse]) async def read_users_async(    skip: int = 0,    limit: int = 100,    db: AsyncSession = Depends(get_async_db) ):    result = await db.execute(select(User).offset(skip).limit(limit))    users = result.scalars().all()    return users


认证与安全

JWT 认证完整实现

from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel ​ # 配置 SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 ​ # 密码加密上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") ​ # OAuth2 方案 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") ​ # 模型 class Token(BaseModel):    access_token: str    token_type: str ​ class TokenData(BaseModel):    username: Optional[str] = None ​ class User(BaseModel):    username: str    email: Optional[str] = None    full_name: Optional[str] = None    disabled: Optional[bool] = None ​ class UserInDB(User):    hashed_password: str ​ # 模拟数据库 fake_users_db = {    "johndoe": {        "username": "johndoe",        "full_name": "John Doe",        "email": "[email protected]",        "hashed_password": pwd_context.hash("secret"),        "disabled": False,   } } ​ # 工具函数 def verify_password(plain_password, hashed_password):    return pwd_context.verify(plain_password, hashed_password) ​ def get_password_hash(password):    return pwd_context.hash(password) ​ def get_user(db, username: str):    if username in db:        user_dict = db[username]        return UserInDB(**user_dict) ​ def authenticate_user(fake_db, username: str, password: str):    user = get_user(fake_db, username)    if not user:        return False    if not verify_password(password, user.hashed_password):        return False    return user ​ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):    to_encode = data.copy()    if expires_delta:        expire = datetime.utcnow() + expires_delta    else:        expire = datetime.utcnow() + timedelta(minutes=15)    to_encode.update({"exp": expire})    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)    return encoded_jwt ​ # 依赖 async def get_current_user(token: str = Depends(oauth2_scheme)):    credentials_exception = HTTPException(        status_code=status.HTTP_401_UNAUTHORIZED,        detail="Could not validate credentials",        headers={"WWW-Authenticate": "Bearer"},   )    try:        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])        username: str = payload.get("sub")        if username is None:            raise credentials_exception        token_data = TokenData(username=username)    except JWTError:        raise credentials_exception    user = get_user(fake_users_db, username=token_data.username)    if user is None:        raise credentials_exception    return user ​ async def get_current_active_user(current_user: User = Depends(get_current_user)):    if current_user.disabled:        raise HTTPException(status_code=400, detail="Inactive user")    return current_user ​ # 路由 @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):    user = authenticate_user(fake_users_db, form_data.username, form_data.password)    if not user:        raise HTTPException(            status_code=status.HTTP_401_UNAUTHORIZED,            detail="Incorrect username or password",            headers={"WWW-Authenticate": "Bearer"},       )    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)    access_token = create_access_token(        data={"sub": user.username}, expires_delta=access_token_expires   )    return {"access_token": access_token, "token_type": "bearer"} ​ @app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)):    return current_user ​ @app.get("/users/me/items/") async def read_own_items(current_user: User = Depends(get_current_active_user)):    return [{"item_id": "Foo", "owner": current_user.username}]

CORS 配置与安全头

from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware from fastapi.middleware.gzip import GZipMiddleware ​ app = FastAPI() ​ # CORS 配置 app.add_middleware(    CORSMiddleware,    allow_origins=["https://example.com", "https://www.example.com"],    allow_credentials=True,    allow_methods=["GET", "POST", "PUT", "DELETE"],    allow_headers=["*"],    expose_headers=["X-Custom-Header"],    max_age=600,  # 预检请求缓存时间 ) ​ # 受信任主机 app.add_middleware(    TrustedHostMiddleware,    allowed_hosts=["example.com", "*.example.com"] ) ​ # Gzip 压缩 app.add_middleware(GZipMiddleware, minimum_size=1000) ​ # 安全头中间件 @app.middleware("http") async def add_security_headers(request, call_next):    response = await call_next(request)    response.headers["X-Content-Type-Options"] = "nosniff"    response.headers["X-Frame-Options"] = "DENY"    response.headers["X-XSS-Protection"] = "1; mode=block"    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"    return response


中间件与后台任务

自定义中间件

import time import logging from fastapi import Request from starlette.middleware.base import BaseHTTPMiddleware ​ # 日志配置 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) ​ class TimingMiddleware(BaseHTTPMiddleware):    async def dispatch(self, request: Request, call_next):        start_time = time.time()                # 记录请求信息        logger.info(f"Request: {request.method} {request.url}")                response = await call_next(request)                process_time = time.time() - start_time        response.headers["X-Process-Time"] = str(process_time)                logger.info(f"Response time: {process_time:.4f}s - Status: {response.status_code}")        return response ​ class RateLimitMiddleware(BaseHTTPMiddleware):    def __init__(self, app, max_requests: int = 100, window_seconds: int = 60):        super().__init__(app)        self.max_requests = max_requests        self.window_seconds = window_seconds        self.requests = {}        async def dispatch(self, request: Request, call_next):        client_ip = request.client.host        current_time = time.time()                # 清理过期记录        self.requests = {            ip: [t for t in times if current_time - t < self.window_seconds]            for ip, times in self.requests.items()       }                # 检查限制        if len(self.requests.get(client_ip, [])) >= self.max_requests:            from fastapi.responses import JSONResponse            return JSONResponse(                status_code=429,                content={"detail": "Rate limit exceeded"}           )                # 记录请求        if client_ip not in self.requests:            self.requests[client_ip] = []        self.requests[client_ip].append(current_time)                return await call_next(request) ​ app.add_middleware(TimingMiddleware) app.add_middleware(RateLimitMiddleware, max_requests=100, window_seconds=60)

后台任务

from fastapi import BackgroundTasks, Depends from typing import Annotated import asyncio import smtplib from email.mime.text import MIMEText ​ def send_email(email_to: str, subject: str, body: str):    """模拟发送邮件(实际应用中配置 SMTP)"""    print(f"Sending email to {email_to}")    # smtp_server = smtplib.SMTP("smtp.gmail.com", 587)    # ... ​ async def process_large_file(file_path: str):    """异步处理大文件"""    await asyncio.sleep(10)  # 模拟长时间处理    print(f"File {file_path} processed") ​ @app.post("/send-notification/{email}") async def send_notification(    email: str,    background_tasks: BackgroundTasks,    message: str = "Hello from FastAPI" ):    background_tasks.add_task(send_email, email, "Notification", message)    return {"message": "Notification sent in the background"} ​ @app.post("/upload/") async def upload_file(    background_tasks: BackgroundTasks,    file: UploadFile = File(...) ):    file_path = f"/tmp/{file.filename}"    with open(file_path, "wb") as f:        content = await file.read()        f.write(content)        # 添加后台任务    background_tasks.add_task(process_large_file, file_path)        return {        "filename": file.filename,        "status": "File uploaded, processing in background"   } ​ # 使用 Celery 进行更复杂的后台任务(生产环境推荐) """ from celery import Celery ​ celery_app = Celery(   "tasks",   broker="redis://localhost:6379/0",   backend="redis://localhost:6379/0" ) ​ @celery_app.task def heavy_computation(data: dict):   import time   time.sleep(30)   return {"result": "completed", "data": data} ​ @app.post("/heavy-task/") async def trigger_heavy_task(data: dict):   task = heavy_computation.delay(data)   return {"task_id": task.id, "status": "processing"} """


测试与部署

测试 FastAPI 应用

# test_main.py import pytest from fastapi.testclient import TestClient from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import StaticPool ​ from main import app, get_db, Base ​ # 内存数据库用于测试 SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:" ​ engine = create_engine(    SQLALCHEMY_DATABASE_URL,    connect_args={"check_same_thread": False},    poolclass=StaticPool, ) TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) ​ def override_get_db():    try:        db = TestingSessionLocal()        yield db    finally:        db.close() ​ app.dependency_overrides[get_db] = override_get_db client = TestClient(app) ​ @pytest.fixture(scope="function") def setup_database():    Base.metadata.create_all(bind=engine)    yield    Base.metadata.drop_all(bind=engine) ​ def test_create_user(setup_database):    response = client.post(        "/users/",        json={"email": "[email protected]", "password": "testpass123", "full_name": "Test User"}   )    assert response.status_code == 201    data = response.json()    assert data["email"] == "[email protected]"    assert "id" in data ​ def test_read_user(setup_database):    # 先创建用户    client.post(        "/users/",        json={"email": "[email protected]", "password": "testpass123"}   )        response = client.get("/users/1")    assert response.status_code == 200    assert response.json()["email"] == "[email protected]" ​ def test_read_nonexistent_user(setup_database):    response = client.get("/users/999")    assert response.status_code == 404 ​ def test_invalid_user_data(setup_database):    response = client.post(        "/users/",        json={"email": "invalid-email", "password": "short"}   )    assert response.status_code == 422  # 验证错误 ​ # 异步测试 import httpx import pytest_asyncio ​ @pytest_asyncio.fixture async def async_client():    async with httpx.AsyncClient(app=app, base_url="http://test") as ac:        yield ac ​ @pytest.mark.asyncio async def test_async_endpoint(async_client):    response = await async_client.get("/")    assert response.status_code == 200

Docker 部署

# Dockerfile FROM python:3.11-slim ​ WORKDIR /app ​ # 安装依赖 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt ​ # 复制应用代码 COPY . . ​ # 非 root 用户运行 RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app USER appuser ​ # 暴露端口 EXPOSE 8000 ​ # 启动命令(生产环境使用 gunicorn + uvicorn) CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000"] # docker-compose.yml version: '3.8' ​ services: web:   build: .   ports:     - "8000:8000"   environment:     - DATABASE_URL=postgresql://postgres:password@db:5432/fastapi_db     - SECRET_KEY=${SECRET_KEY}   depends_on:     - db     - redis   volumes:     - ./:/app   command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload ​ db:   image: postgres:15   environment:     - POSTGRES_USER=postgres     - POSTGRES_PASSWORD=password     - POSTGRES_DB=fastapi_db   volumes:     - postgres_data:/var/lib/postgresql/data   ports:     - "5432:5432" ​ redis:   image: redis:7-alpine   ports:     - "6379:6379" ​ volumes: postgres_data:

Kubernetes 部署配置

# k8s-deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: fastapi-app spec: replicas: 3 selector:   matchLabels:     app: fastapi template:   metadata:     labels:       app: fastapi   spec:     containers:     - name: fastapi       image: your-registry/fastapi-app:latest       ports:       - containerPort: 8000       env:       - name: DATABASE_URL         valueFrom:           secretKeyRef:             name: db-secret             key: url       resources:         requests:           memory: "256Mi"           cpu: "250m"         limits:           memory: "512Mi"           cpu: "500m"       livenessProbe:         httpGet:           path: /health           port: 8000         initialDelaySeconds: 30         periodSeconds: 10       readinessProbe:         httpGet:           path: /ready           port: 8000         initialDelaySeconds: 5         periodSeconds: 5 --- apiVersion: v1 kind: Service metadata: name: fastapi-service spec: selector:   app: fastapi ports: - port: 80   targetPort: 8000 type: LoadBalancer


性能优化最佳实践

1. 异步数据库连接池

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker import aioredis ​ # 优化连接池配置 async_engine = create_async_engine(    "postgresql+asyncpg://user:pass@localhost/db",    pool_size=20,              # 基础连接数    max_overflow=10,           # 最大溢出连接    pool_pre_ping=True,        # 连接前 ping 检查    pool_recycle=3600,         # 连接回收时间    echo=False                 # 生产环境关闭 SQL 日志 ) ​ # Redis 连接池 redis_pool = aioredis.ConnectionPool.from_url(    "redis://localhost",    max_connections=100,    decode_responses=True )

2. 缓存策略

from fastapi_cache import FastAPICache from fastapi_cache.backends.redis import RedisBackend from fastapi_cache.decorator import cache from redis import asyncio as aioredis ​ @app.on_event("startup") async def startup():    redis = aioredis.from_url("redis://localhost", encoding="utf8", decode_responses=True)    FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache") ​ @app.get("/heavy-computation") @cache(expire=3600)  # 缓存 1 小时 async def heavy_computation():    # 模拟耗时操作    await asyncio.sleep(2)    return {"result": "expensive data"} ​ # 手动缓存控制 from fastapi_cache.coder import JsonCoder ​ @app.get("/users/{user_id}") async def get_user(user_id: int):    cache_key = f"user:{user_id}"        # 尝试从缓存获取    cached = await FastAPICache.get_backend().get(cache_key)    if cached:        return json.loads(cached)        # 查询数据库    user = await fetch_user_from_db(user_id)        # 写入缓存    await FastAPICache.get_backend().set(        cache_key,        json.dumps(user),        expire=300   )    return user

3. 响应模型优化

from fastapi import Response from fastapi.responses import ORJSONResponse, StreamingResponse import orjson ​ # 使用更快的 JSON 库 @app.get("/items/", response_class=ORJSONResponse) async def get_items():    return [{"id": i, "data": "value"} for i in range(1000)] ​ # 流式响应(大文件) @app.get("/large-file/") async def get_large_file():    def file_generator():        with open("large_file.zip", "rb") as f:            while chunk := f.read(8192):                yield chunk        return StreamingResponse(        file_generator(),        media_type="application/zip",        headers={"Content-Disposition": "attachment; filename=large_file.zip"}   ) ​ # 分页优化 from fastapi import Query from sqlalchemy import func ​ @app.get("/items/paginated/") async def get_items_paginated(    page: int = Query(1, ge=1),    size: int = Query(20, ge=1, le=100),    db: AsyncSession = Depends(get_async_db) ):    offset = (page - 1) * size        # 使用 count(*) OVER() 优化总数查询    result = await db.execute(        select(            Item,            func.count().over().label("total")       )       .offset(offset)       .limit(size)   )        rows = result.all()    total = rows[0].total if rows else 0        return {        "items": [row.Item for row in rows],        "total": total,        "page": page,        "size": size,        "pages": (total + size - 1) // size   }

4. 性能监控

from prometheus_client import Counter, Histogram, generate_latest from starlette.middleware.base import BaseHTTPMiddleware ​ # Prometheus 指标 REQUEST_COUNT = Counter(    'http_requests_total',    'Total HTTP requests',   ['method', 'endpoint', 'status'] ) ​ REQUEST_LATENCY = Histogram(    'http_request_duration_seconds',    'HTTP request latency',   ['method', 'endpoint'] ) ​ class PrometheusMiddleware(BaseHTTPMiddleware):    async def dispatch(self, request, call_next):        start_time = time.time()        response = await call_next(request)        duration = time.time() - start_time                REQUEST_COUNT.labels(            method=request.method,            endpoint=request.url.path,            status=response.status_code       ).inc()                REQUEST_LATENCY.labels(            method=request.method,            endpoint=request.url.path       ).observe(duration)                return response ​ app.add_middleware(PrometheusMiddleware) ​ @app.get("/metrics") async def metrics():    return Response(generate_latest(), media_type="text/plain")


总结

FastAPI 代表了 Python Web 开发的现代化方向,它通过以下特性成为构建高性能 API 的首选框架:

  1. 开发效率:类型提示带来的智能补全和自动文档生成
  2. 运行时性能:异步原生设计,媲美 Node.js 和 Go
  3. 工程规范:内置数据验证、依赖注入、认证授权等生产级功能
  4. 生态兼容:与 SQLAlchemy、Pydantic、Celery 等主流库无缝集成

学习路径建议

  1. 入门:掌握基础路由、请求参数、Pydantic 模型
  2. 进阶:深入依赖注入系统、数据库集成、异步编程
  3. 生产:学习 Docker 部署、Kubernetes 编排、监控告警
  4. 架构:微服务拆分、事件驱动架构、CQRS 模式

FastAPI 不仅是一个框架,更是一种现代 Python 工程实践的体现。无论你是构建小型微服务还是大型分布式系统,FastAPI 都能提供坚实的技术支撑。


本文代码示例基于 FastAPI 0.104+ 和 Python 3.11+,建议在实际项目中使用最新稳定版本。

Read more

【前端地图】地理编码与逆地理编码 —— 让地址和坐标不再“鸡同鸭讲”

【前端地图】地理编码与逆地理编码 —— 让地址和坐标不再“鸡同鸭讲”

🌏第 7 节:地理编码与逆地理编码 —— 让地址和坐标不再“鸡同鸭讲” 🎙️ 一、 老曹引言:地址与坐标的“爱恨情仇” 🗣️ 各位同学好,我是老曹。今天咱们来聊第 7 节,地理编码与逆地理编码。说实话,这玩意儿在地图开发里属于“看似简单,实则坑深似海”的类型。你们是不是觉得,不就是把“成都市青羊区”变成一串数字,或者把一串数字变回“成都市青羊区”吗?太天真了!在实际项目中,我见过太多因为坐标系没搞对,导致物流配送员对着地图上的标记点骂娘,明明就在楼下,导航非让他去河里捞船。这节内容,就是为了让你们少挨骂,少加班,把地址和坐标之间的翻译工作做得明明白白。 🤔 很多新人刚上手地图 SDK 的时候,最喜欢干的事就是直接调用 geocoder.getLocation,然后指望它能返回一个精准无比的 coordinate。结果呢?高德的坐标放到百度地图上,偏移了几百米;或者在国内用了

什么是MalformedStreamException,和WebKitFormBoundary有什么关系

org.apache.tomcat.util.http.fileupload.MultipartStream$MalformedStreamException https://stackoverflow.org.cn/questions/53500627  总结: MalformedStreamException是因为没读取到上传文件请求体的末尾分隔符,上传文件使用的是multipart/form-data格式,对分隔符有要求 关联问题: * MalformedStreamException报错的原因 * 什么是:multipart/form-data 的边界(boundary) * multipart/form-data 的内容格式一般是什么样子 * 这个和WebKitFormBoundary 有关系吗,为什么有些分隔符是-----------------------------149742642616556 这个错误是什么? org.apache.tomcat.util.http.fileupload.MultipartStream$Malform

前端也能玩转:用Fabric.js轻松实现图形拖拽缩放旋转(附实战技巧)

前端也能玩转:用Fabric.js轻松实现图形拖拽缩放旋转(附实战技巧)

前端也能玩转:用Fabric.js轻松实现图形拖拽缩放旋转(附实战技巧) * 小白前端也能玩转:用Fabric.js轻松实现图形拖拽缩放旋转(附实战技巧) * 原生API写拖拽?那简直是手搓发动机啊 * Fabric.js其实就是Canvas的"美颜滤镜"加"自动挡" * 三分钟搭个能"动"的画布 * 平移旋转缩放?鼠标手势随心所欲 * 那些文档里不写的坑,我血都吐出来了 * 性能优化:别让网页变成PPT * 懒人开发技巧:让代码又稳又省事 * 你以为这就完了?Fabric还能搞更多骚操作 * 线上翻车实录:文档没告诉你的那些破事 * 调试Fabric就像修水管 * 别把Fabric当银弹,但也别自己手搓轮子 小白前端也能玩转:用Fabric.js轻松实现图形拖拽缩放旋转(附实战技巧) 小白前端也能玩转:用Fabric.js轻松实现图形拖拽缩放旋转(附实战技巧) 说实话,第一次接到要在网页里做个"

Open-WebUI—开箱即用的AI对话可视化神器

Open-WebUI—开箱即用的AI对话可视化神器

你是否曾兴奋地在本地部署了Ollama,却很快被冰冷的命令行和繁琐的指令劝退?是否羡慕ChatGPT那样优雅的聊天界面,却又希望数据能牢牢掌握在自己手中?OpenWebUI。这个在GitHub上狂揽 110,000 Stars 的明星项目,完美地解决了所有痛点 github地址: https://github.com/open-webui/open-webui 1.什么是Open WebUI? Open WebUI 是一款专为大型语言模型(LLM)设计的 开源可视化交互框架,它通过简洁的Web界面,让用户无需编写代码即可与本地部署的AI模型/各大服务商提供大模型API(如DeepSeek、Llama、ChatGLM等)进行自然对话。其核心使命是 “让LLM私有化部署像打开浏览器一样简单” ,尤其适合需要快速搭建企业级AI平台或追求数据隐私的开发者。 2. 核心价值 * 开箱即用:无需复杂的前端开发,快速搭建 AI 交互界面。完全开源,可自由部署、修改和二次开发,无商业使用限制。 * 多模型支持:兼容 Ollama、