CREATE DATABASE IF NOT EXISTS testdb CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE testdb;
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL UNIQUE,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "mysql+aiomysql://user:password@localhost:3306/testdb"
engine = create_engine(DATABASE_URL, echo=True, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(50), nullable=False)
email = Column(String(100), unique=True, nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
from sqlalchemy.orm import Session
from models import User
from database import SessionLocal
from app.schemas import UserCreate, UserUpdate, BatchInOrUpdate
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def create_user(db: Session, user: UserCreate):
db_user = User(name=user.name, email=user.email)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def get_users(db: Session, skip: int = 0, limit: int = 10):
return db.query(User).offset(skip).limit(limit).all()
def get_user(db: Session, user_id: int):
return db.query(User).filter(User.id == user_id).first()
def delete_user(db: Session, user_id: int):
user = db.query(User).filter(User.id == user_id).first()
if user:
db.delete(user)
db.commit()
return user
def batch_insert_update_users(db: Session, users: list[UserUpdate]) -> int:
"""同步批量插入/更新(耗时操作)"""
for u in users:
if u.id:
db.query(User).filter(User.id == u.id).update({"name": u.name, "email": u.email})
elif not u.id:
db.add(User(name=u.name, email=u.email))
db.commit()
return len(users)
from fastapi import FastAPI, Depends, BackgroundTasks, HTTPException
from sqlalchemy.orm import Session
from app.models import Base, User
from app.database import engine
from app.crud import create_user, get_users, get_user, delete_user, get_db, batch_insert_update_users
from app.schemas import UserCreate, UserOut, BatchInOrUpdate, TaskOut, TaskStatus
import uuid
Base.metadata.create_all(bind=engine)
app = FastAPI(title="FastAPI-SQLAlchemy-Tutorial")
@app.post("/users", response_model=UserOut, summary="① 同步 - 创建用户")
def create_user_api(user: UserCreate, db: Session = Depends(get_db)):
return create_user(db, user)
@app.get("/users", response_model=list[UserOut], summary="① 同步 - 列表用户")
def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
return get_users(db, skip, limit)
@app.get("/users/{user_id}", response_model=UserOut, summary="① 同步 - 查询用户")
def read_user(user_id: int, db: Session = Depends(get_db)):
user = get_user(db, user_id)
if not user:
raise HTTPException(404, "User not found")
return user
@app.delete("/users/{user_id}", summary="① 同步 - 删除用户")
def delete_user_api(user_id: int, db: Session = Depends(get_db)):
user = delete_user(db, user_id)
return {"deleted": user is not None}
background_tasks = {}
def run_batch(db: Session, users: list[UserCreate]):
background_tasks["status"] = "running"
total = batch_insert_update_users(db, users)
background_tasks["status"] = "completed"
background_tasks["total"] = total
@app.post("/users/batch", response_model=TaskOut, summary="② 异步 - 批量保存/更新")
def create_batch(
users: BatchInOrUpdate,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db)
):
task_id = str(uuid.uuid4())
background_tasks.add_task(run_batch, db, users.users)
return {"task_id": task_id, "message": "批量任务已提交,后台执行中"}
@app.get("/tasks/{task_id}", response_model=TaskStatus, summary="② 查询任务状态")
def read_task_status(task_id: str):
status = background_tasks.get("status", "not_found")
total = background_tasks.get("total", 0)
return {"task_id": task_id, "status": status, "total": total}