FastAPI 为何脱颖而出
FastAPI 的三大杀手锏:
FastAPI 基于 Python 类型提示实现自动数据验证,原生支持异步 IO 提升性能。文章深入解析依赖注入三层架构、后台任务系统设计与 WebSocket 实时通信实战。涵盖中间件安全配置、性能监控指标收集及生产环境 Docker 部署方案。提供企业级代码示例与常见避坑指南,助力构建高性能现代化 API 服务。

FastAPI 的三大杀手锏:
FastAPI 从底层为异步设计,不是简单包装。对比测试数据:
| 场景 | FastAPI | Flask+gevent | 性能提升 |
|---|---|---|---|
| 1000 并发 IO 操作 | 1.8 秒 | 2.9 秒 | 61% |
| 混合负载 | 2.1 秒 | 3.4 秒 | 62% |
| 内存占用 | 85MB | 120MB | 29% |
# Genuinely asynchronous request handling
from fastapi import FastAPI, BackgroundTasks
import asyncio
from datetime import datetime

app = FastAPI()


@app.get("/api/status")
async def get_status():
    """Fully asynchronous endpoint: runs three I/O lookups concurrently."""
    started_at = datetime.now()
    # Fan out the three independent lookups and await them together.
    user_data, order_data, product_data = await asyncio.gather(
        fetch_user_data(),
        fetch_order_data(),
        fetch_product_data(),
    )
    elapsed = (datetime.now() - started_at).total_seconds()
    return {
        "time": elapsed,
        "data": [user_data, order_data, product_data],
    }
async def fetch_user_data():
    """Stub user lookup; the sleep simulates a database query."""
    await asyncio.sleep(0.1)
    return {"users": 150}


async def fetch_order_data():
    """Stub order lookup."""
    await asyncio.sleep(0.2)
    return {"orders": 45}


async def fetch_product_data():
    """Stub product lookup."""
    await asyncio.sleep(0.15)
    return {"products": 89}
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from typing import Optional, Generator
from contextlib import contextmanager
import redis
from datetime import datetime, timedelta
# 1. Authentication dependency
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")


async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token into a user dict (id, username, roles).

    Any failure — bad signature, missing claims — maps to a 401.
    """
    try:
        # decode_jwt is provided elsewhere; it validates and decodes the token.
        payload = decode_jwt(token)
        return {
            "id": payload["user_id"],
            "username": payload["sub"],
            "roles": payload.get("roles", []),
        }
    except Exception:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的认证令牌"
        )
# 2. Role-based authorization dependency
def require_roles(required_roles: list):
    """Dependency factory: allow only users holding at least one required role."""
    async def check_roles(current_user: dict = Depends(get_current_user)):
        user_roles = set(current_user.get("roles", []))
        # Any overlap between the user's roles and the required set passes.
        if not set(required_roles) & user_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="权限不足"
            )
        return current_user

    return check_roles
# 3. Database session dependency
class DatabaseSession:
    """Callable dependency: yields a DB session, closes it after the request."""

    def __init__(self, readonly: bool = False):
        # Flag forwarded verbatim to create_db_session when a session is built.
        self.readonly = readonly

    async def __call__(self):
        session = create_db_session(readonly=self.readonly)
        try:
            yield session
        finally:
            session.close()
# 4. Redis cache dependency
#
# NOTE: this must be a *plain* generator function, not wrapped in
# @contextmanager. FastAPI treats a generator function passed to Depends()
# as a yield-dependency and injects the yielded value; with @contextmanager
# the route would instead receive the context-manager object itself and
# calls like `cache.get(...)` would fail.
def get_redis_connection():
    """Yield a Redis client; the connection is closed once the request ends."""
    client = redis.Redis.from_url("redis://localhost:6379")
    try:
        yield client
    finally:
        client.close()
# 5. Wiring the dependencies into a route
@app.get("/admin/users")
async def get_users(
    db=Depends(DatabaseSession()),
    cache=Depends(get_redis_connection),
    # require_roles(...) returns the authenticated user dict, not a bool —
    # the original `admin_only: bool` annotation was misleading.
    current_admin: dict = Depends(require_roles(["admin"])),
):
    """Admin-only user listing with a 5-minute Redis cache in front of the DB."""
    import json  # local import: json was not imported in this section

    # Serve from cache when possible.
    cached = cache.get("all_users")
    if cached:
        return json.loads(cached)
    # Cache miss: query the database.
    users = db.query(User).all()
    result = [user.dict() for user in users]
    # Populate the cache with a 300-second TTL.
    cache.setex("all_users", 300, json.dumps(result))
    return result
from fastapi import BackgroundTasks
from pydantic import BaseModel, EmailStr
from typing import List
import smtplib
from email.mime.text import MIMEText
import logging
from datetime import datetime
import asyncio
logger = logging.getLogger(__name__)


class EmailService:
    """In-memory bulk email sender with per-message retry."""

    def __init__(self):
        # task_id -> status record; populated by the API layer.
        self.tasks = {}

    async def send_bulk_email(self, recipients: List[str], subject: str, content: str):
        """Send `content` to every recipient and return success/failure counts.

        Fix over the original: a message that fails its first attempt but is
        delivered by `_retry` now counts as a success instead of a failure.
        """
        success = 0
        failed = 0
        for email in recipients:
            try:
                await self._send_single(email, subject, content)
                success += 1
            except Exception as e:
                logger.error(f"发送失败到 {email}: {e}")
                # Retry with exponential backoff; only count as failed when
                # all retries are exhausted too.
                if await self._retry(email, subject, content):
                    success += 1
                else:
                    failed += 1
        return {"success": success, "failed": failed}

    async def _send_single(self, to: str, subject: str, content: str):
        """Send one email (simulated here with a short sleep)."""
        await asyncio.sleep(0.5)
        logger.info(f"邮件已发送到:{to}")

    async def _retry(self, to: str, subject: str, content: str, max_retries: int = 3):
        """Retry sending with exponential backoff; True on eventual success."""
        for i in range(max_retries):
            try:
                await asyncio.sleep(2 ** i)  # backoff: 1s, 2s, 4s
                await self._send_single(to, subject, content)
                return True
            except Exception:
                continue
        return False
# Request payload for the newsletter endpoint.
class EmailRequest(BaseModel):
    to: List[EmailStr]
    subject: str
    content: str
    is_html: bool = False


# Module-level singleton shared by the routes below.
email_service = EmailService()
@app.post("/send-newsletter")
async def send_newsletter(request: EmailRequest, background_tasks: BackgroundTasks):
"""发送新闻邮件"""
# 记录任务开始
task_id = f"newsletter_{datetime.now().timestamp()}"
email_service.tasks[task_id] = {
"status": "pending",
"started_at": None,
"finished_at": None,
"result": None
}
# 定义任务函数
async def send_task():
email_service.tasks[task_id]["status"] = "running"
email_service.tasks[task_id]["started_at"] = datetime.now()
try:
result = await email_service.send_bulk_email(
request.to, request.subject, request.content
)
email_service.tasks[task_id].update({
"status": "completed",
"finished_at": datetime.now(),
"result": result
})
except Exception as e:
email_service.tasks[task_id].update({
"status": "failed",
"finished_at": datetime.now(),
"result": str(e)
})
# 添加到后台任务
background_tasks.add_task(send_task)
return {
"task_id": task_id,
"message": "邮件发送任务已启动",
"recipients_count": len(request.to)
}
@app.get("/task/{task_id}/status")
async def get_task_status(task_id: str):
"""获取任务状态"""
task = email_service.tasks.get(task_id)
if not task:
raise HTTPException(404, detail="任务不存在")
return task
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set, List
import asyncio
import json
import uuid
from datetime import datetime
import logging
logger = logging.getLogger(__name__)


class ConnectionManager:
    """Registry of live WebSocket connections and chat-room membership."""

    def __init__(self):
        # connection_id -> live WebSocket
        self.active_connections: Dict[str, WebSocket] = {}
        # user_id -> room_ids the user has joined
        self.user_rooms: Dict[str, Set[str]] = {}
        # room_id -> connection_ids currently in the room
        self.room_connections: Dict[str, Set[str]] = {}
        # connection_id -> {"user_id": ..., "connected_at": ...}
        self.connection_info: Dict[str, dict] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        """Accept the socket, register it, and return its connection id."""
        await websocket.accept()
        connection_id = str(uuid.uuid4())
        self.active_connections[connection_id] = websocket
        self.connection_info[connection_id] = {
            "user_id": user_id,
            "connected_at": datetime.now()
        }
        logger.info(f"用户 {user_id} 已连接")
        return connection_id

    async def disconnect(self, connection_id: str):
        """Unregister a connection and pull its user out of all their rooms."""
        if connection_id in self.active_connections:
            self.active_connections.pop(connection_id)
            info = self.connection_info.pop(connection_id, {})
            user_id = info.get("user_id")
            if user_id in self.user_rooms:
                # Iterate over a copy: leave_room mutates room membership.
                for room_id in list(self.user_rooms[user_id]):
                    await self.leave_room(connection_id, room_id)
                del self.user_rooms[user_id]
            logger.info(f"用户 {user_id} 已断开")

    async def join_room(self, connection_id: str, room_id: str):
        """Add the connection to a room and notify everyone involved."""
        info = self.connection_info.get(connection_id)
        if not info:
            return
        user_id = info["user_id"]
        # Lazily create the per-user and per-room sets.
        if user_id not in self.user_rooms:
            self.user_rooms[user_id] = set()
        if room_id not in self.room_connections:
            self.room_connections[room_id] = set()
        self.user_rooms[user_id].add(room_id)
        self.room_connections[room_id].add(connection_id)
        # Tell the room someone arrived ...
        await self.broadcast_to_room(room_id, {
            "type": "user_joined",
            "user_id": user_id,
            "room_id": room_id,
            "timestamp": datetime.now().isoformat()
        })
        # ... and send the newcomer the current member list.
        members = await self.get_room_members(room_id)
        await self.send_to(connection_id, {
            "type": "room_info",
            "room_id": room_id,
            "members": members
        })

    async def leave_room(self, connection_id: str, room_id: str):
        """Remove the connection from a room; drops the room when it empties."""
        if room_id not in self.room_connections:
            return
        self.room_connections[room_id].discard(connection_id)
        if not self.room_connections[room_id]:
            # Last member left: delete the room (the broadcast below then
            # no-ops because nobody remains to receive it).
            del self.room_connections[room_id]
        info = self.connection_info.get(connection_id)
        if info:
            await self.broadcast_to_room(room_id, {
                "type": "user_left",
                "user_id": info["user_id"],
                "room_id": room_id,
                "timestamp": datetime.now().isoformat()
            })

    async def send_to(self, connection_id: str, message: dict):
        """Deliver one JSON message; a failing socket gets disconnected."""
        if connection_id in self.active_connections:
            try:
                await self.active_connections[connection_id].send_json(message)
            except Exception:
                # Was a bare `except:` — that would also swallow
                # KeyboardInterrupt/SystemExit. Narrowed to Exception.
                await self.disconnect(connection_id)

    async def broadcast_to_room(self, room_id: str, message: dict):
        """Send a message to every connection currently in the room."""
        if room_id not in self.room_connections:
            return
        # Copy the set: send_to may disconnect (and thus mutate) membership.
        for connection_id in list(self.room_connections[room_id]):
            await self.send_to(connection_id, message)

    async def get_room_members(self, room_id: str) -> List[str]:
        """List the user_ids of everyone in the room (empty if unknown)."""
        if room_id not in self.room_connections:
            return []
        members = []
        for connection_id in self.room_connections[room_id]:
            info = self.connection_info.get(connection_id)
            if info:
                members.append(info["user_id"])
        return members
# Module-level singletons for the chat demo.
manager = ConnectionManager()
app = FastAPI()


@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    """Main WebSocket entry: greet the client, then dispatch incoming messages."""
    connection_id = await manager.connect(websocket, user_id)
    try:
        # Acknowledge the connection.
        await manager.send_to(connection_id, {
            "type": "connected",
            "user_id": user_id,
            "timestamp": datetime.now().isoformat()
        })
        # Main receive loop: one message per iteration.
        while True:
            try:
                data = await websocket.receive_json()
            except json.JSONDecodeError:
                await manager.send_to(connection_id, {
                    "type": "error",
                    "message": "无效的消息格式"
                })
                continue

            message_type = data.get("type")
            room_id = data.get("room_id")

            if message_type == "join" and room_id:
                await manager.join_room(connection_id, room_id)
            elif message_type == "message":
                content = data.get("content")
                if room_id and content:
                    info = manager.connection_info.get(connection_id)
                    if info and room_id in manager.room_connections:
                        await manager.broadcast_to_room(room_id, {
                            "type": "message",
                            "user_id": info["user_id"],
                            "room_id": room_id,
                            "content": content,
                            "timestamp": datetime.now().isoformat()
                        })
            elif message_type == "typing" and room_id:
                info = manager.connection_info.get(connection_id)
                if info:
                    # Notify everyone in the room except the sender.
                    for conn_id in manager.room_connections.get(room_id, []):
                        if conn_id != connection_id:
                            await manager.send_to(conn_id, {
                                "type": "user_typing",
                                "user_id": info["user_id"],
                                "room_id": room_id
                            })
            elif message_type == "leave" and room_id:
                await manager.leave_room(connection_id, room_id)
    except WebSocketDisconnect:
        logger.info(f"连接断开:{connection_id}")
    finally:
        # Always clean up registry state, whatever ended the loop.
        await manager.disconnect(connection_id)
# Read-only status endpoint
@app.get("/chat/rooms/{room_id}")
async def get_room_info(room_id: str):
    """Snapshot of a chat room: members, member count, and liveness."""
    members = await manager.get_room_members(room_id)
    return {
        "room_id": room_id,
        "member_count": len(members),
        "members": members,
        "is_active": room_id in manager.room_connections
    }
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
import time
import hashlib
from typing import Dict
from datetime import datetime, timedelta
from collections import defaultdict
import logging
logger = logging.getLogger(__name__)


class SecurityMiddleware(BaseHTTPMiddleware):
    """Per-IP rate limiting, request logging, and security response headers."""

    def __init__(self, app, rate_limit: int = 100):
        super().__init__(app)
        # Max requests per client IP within the 60-second sliding window.
        self.rate_limit = rate_limit
        # client_ip -> timestamps of requests inside the current window.
        # NOTE(review): grows with distinct IPs; consider periodic pruning.
        self.request_counts: Dict[str, list] = defaultdict(list)

    async def dispatch(self, request: Request, call_next):
        # request.client can be None (e.g. under some test clients).
        client_ip = request.client.host if request.client else "unknown"

        # Rate-limit check. An HTTPException raised inside BaseHTTPMiddleware
        # is NOT converted by FastAPI's exception handlers (they run inside
        # the router, below the middleware), so the 429 response must be
        # built here directly instead of raised.
        if not self.check_rate_limit(client_ip):
            from starlette.responses import JSONResponse
            return JSONResponse(
                status_code=429,
                content={"detail": "请求过于频繁,请稍后再试"},
                headers={"Retry-After": "60"}
            )

        # Short request id for log correlation.
        request_id = hashlib.md5(
            f"{client_ip}:{time.time()}".encode()
        ).hexdigest()[:8]

        start_time = time.time()
        logger.info(f"[{request_id}] {request.method} {request.url.path}")
        try:
            response = await call_next(request)
        except Exception as e:
            # Log and re-raise: the server's error handling takes over.
            process_time = (time.time() - start_time) * 1000
            logger.error(
                f"[{request_id}] 异常 - "
                f"错误:{str(e)} - "
                f"耗时:{process_time:.2f}ms"
            )
            raise

        process_time = (time.time() - start_time) * 1000
        logger.info(
            f"[{request_id}] 完成 - "
            f"状态:{response.status_code} - "
            f"耗时:{process_time:.2f}ms"
        )
        # Standard security headers plus tracing metadata.
        response.headers.update({
            "X-Request-ID": request_id,
            "X-Process-Time": f"{process_time:.2f}ms",
            "X-Content-Type-Options": "nosniff",
            "X-Frame-Options": "DENY",
            "X-XSS-Protection": "1; mode=block",
            "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
        })
        return response

    def check_rate_limit(self, client_ip: str) -> bool:
        """True if this request fits in the 60 s window; records it if so."""
        now = time.time()
        # Drop timestamps that have aged out of the window.
        self.request_counts[client_ip] = [
            ts for ts in self.request_counts[client_ip]
            if now - ts < 60
        ]
        if len(self.request_counts[client_ip]) >= self.rate_limit:
            return False
        self.request_counts[client_ip].append(now)
        return True
class PerformanceMiddleware(BaseHTTPMiddleware):
    """Times every request, logs slow ones, and adds a duration header."""

    async def dispatch(self, request: Request, call_next):
        started = time.perf_counter()
        # Expose the start time to downstream handlers.
        request.state.start_time = started
        response = await call_next(request)
        elapsed = time.perf_counter() - started
        # Anything above one second is flagged as a slow request.
        if elapsed > 1.0:
            logger.warning(
                f"慢请求:{request.method} {request.url.path} "
                f"耗时:{elapsed:.3f}s"
            )
        response.headers["X-Response-Time"] = f"{elapsed:.3f}s"
        return response
# Application wiring
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# Middleware registration — note: registration order is significant.
app.add_middleware(SecurityMiddleware, rate_limit=100)  # rate limiting
app.add_middleware(PerformanceMiddleware)               # timing / slow-request log
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://example.com"],  # pin concrete origins in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    max_age=600,  # preflight response cache lifetime (seconds)
)
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import FastAPI, Request, Response
from fastapi.routing import APIRoute
import time
from typing import Callable
import psutil
import os
# Prometheus metric definitions
# Counter: total requests served, labelled by method / path / status code.
REQUEST_COUNT = Counter(
'http_requests_total', 'HTTP 请求总数', ['method', 'endpoint', 'status']
)
# Histogram: request latency distribution, in seconds.
REQUEST_DURATION = Histogram(
'http_request_duration_seconds', 'HTTP 请求耗时', ['method', 'endpoint']
)
# Gauge: number of requests currently in flight.
REQUEST_IN_PROGRESS = Gauge(
'http_requests_in_progress', '进行中的 HTTP 请求数', ['method', 'endpoint']
)
# Gauges: process-level resource usage, refreshed by the /metrics endpoint.
MEMORY_USAGE = Gauge('process_memory_usage_bytes', '进程内存使用量')
CPU_USAGE = Gauge('process_cpu_percent', '进程 CPU 使用率')
class MonitoringRoute(APIRoute):
    """APIRoute subclass that records Prometheus metrics for every request.

    Fix over the original: count and duration were only recorded on the
    success path, so requests whose handler raised were invisible in the
    metrics. They are now recorded in `finally` (exceptions reported as
    status 500, since no response object exists at that point).
    """

    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            method = request.method
            endpoint = request.url.path
            start_time = time.time()
            REQUEST_IN_PROGRESS.labels(method=method, endpoint=endpoint).inc()
            status_code = 500  # assume failure unless the handler returns
            try:
                response = await original_route_handler(request)
                status_code = response.status_code
                return response
            finally:
                REQUEST_COUNT.labels(
                    method=method, endpoint=endpoint, status=status_code
                ).inc()
                REQUEST_DURATION.labels(
                    method=method, endpoint=endpoint
                ).observe(time.time() - start_time)
                REQUEST_IN_PROGRESS.labels(method=method, endpoint=endpoint).dec()

        return custom_route_handler
app = FastAPI()
# Set before routes are added so they pick up the monitoring route class.
app.router.route_class = MonitoringRoute

# One Process handle for the whole app. psutil's cpu_percent() measures the
# interval since the *previous* call on the same object, so the original
# code — constructing a fresh Process on every request — always reported
# a CPU percentage of ~0.0.
_PROCESS = psutil.Process(os.getpid())


@app.get("/metrics")
async def get_metrics():
    """Expose all Prometheus metrics in text exposition format."""
    # Refresh the process-level gauges right before scraping.
    MEMORY_USAGE.set(_PROCESS.memory_info().rss)
    CPU_USAGE.set(_PROCESS.cpu_percent())
    return Response(
        content=generate_latest(),
        media_type="text/plain"
    )


@app.get("/health")
async def health_check():
    """Liveness probe with basic resource statistics."""
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "memory_usage_mb": _PROCESS.memory_info().rss / 1024 / 1024,
        "cpu_percent": _PROCESS.cpu_percent()
    }
# config/production.py
from pydantic import BaseSettings
from typing import Optional
import os
class Settings(BaseSettings):
    """Production settings, loaded from the environment / .env file."""

    # Core
    ENV: str = "production"
    DEBUG: bool = False
    SECRET_KEY: str  # required: no default on purpose
    ALLOWED_HOSTS: list = ["yourdomain.com"]

    # Database
    DATABASE_URL: str
    DB_POOL_SIZE: int = 20
    DB_MAX_OVERFLOW: int = 10

    # Redis
    REDIS_URL: str
    REDIS_POOL_SIZE: int = 10

    # JWT
    JWT_SECRET_KEY: str
    JWT_ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

    # Logging
    LOG_LEVEL: str = "INFO"
    LOG_FILE: str = "/var/log/fastapi.log"

    # Monitoring
    ENABLE_METRICS: bool = True
    METRICS_PORT: int = 9090

    class Config:
        env_file = ".env"
# Keyword arguments for each middleware, applied at application start-up.
MIDDLEWARE_CONFIG = {
    "CORSMiddleware": {
        "allow_origins": ["https://yourdomain.com"],
        "allow_credentials": True,
        "allow_methods": ["*"],
        "allow_headers": ["*"],
    },
    "TrustedHostMiddleware": {
        "allowed_hosts": ["yourdomain.com", "api.yourdomain.com"]
    },
    "HTTPSRedirectMiddleware": {},
    "GZipMiddleware": {
        "minimum_size": 1000
    },
}

# Server / worker tuning knobs.
PERFORMANCE_CONFIG = {
    "MAX_WORKERS": 4,  # scale with CPU core count
    "KEEPALIVE": 5,
    "TIMEOUT": 30,
    "LIMIT_CONCURRENCY": 1000,
    "LIMIT_MAX_REQUESTS": 10000,
}
# docker-compose.prod.yml
# Restored hierarchy: the original had all indentation flattened, which is
# structurally invalid YAML.
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      # NOTE(review): move credentials into an env file / secrets store.
      - DATABASE_URL=postgresql://user:pass@db:5432/app
      - REDIS_URL=redis://redis:6379/0
      - ENV=production
    depends_on:
      - db
      - redis
    restart: always
    networks:
      - app-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 1G
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  db:
    image: postgres:13
    environment:
      # NOTE(review): hardcoded credentials — use secrets in production.
      - POSTGRES_DB=app
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - app-network
    restart: always

  redis:
    image: redis:6-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    networks:
      - app-network
    restart: always

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - api
    networks:
      - app-network
    restart: always

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - app-network
    restart: always

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      # NOTE(review): change the default admin password before deploying.
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - app-network
    restart: always

networks:
  app-network:
    driver: bridge

volumes:
  postgres_data:
  redis_data:
  prometheus_data:
  grafana_data:
我在实际项目中踩过的坑:

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online