AIGC 异步回调系统实现
AIGC 异步回调系统实现
目录
一、系统概述
1.1 背景与目标
本系统实现了一套通用的异步回调架构,用于处理 AIGC 服务(视频生成、图片生成等)的长耗时任务。
核心目标:
- ✅ 快速响应:接收请求后立即返回 task_id,不阻塞
- ✅ 状态管理:通过数据库追踪任务状态
- ✅ 回调通知:完成后自动回调后端
- ✅ 统一管理:一套架构支持多种业务类型
- ✅ 可观测性:完整的时间戳链路追踪
1.2 两种回调地址(核心概念)
| 回调地址 | 说明 | 示例 | 配置方式 |
|---|---|---|---|
| algorithm_callback_url | AIGC 供应商回调我们的地址 | https://algorithm.com/api/callback | 方式1:提交时发送 方式2:供应商平台配置 |
| backend_callback_url | 我们完成后端通知的地址 | https://backend.com/api/notify | 后端通过请求参数传递 |
关键理解:
后端 → 算法端 → AIGC供应商 ↓ ↓ ↓ 发起 提交任务 异步处理 任务 +回调地址 (1-5分钟) ↓ ↓ ↓ ←─────────收到回调─┘ ↓ 下载+上传OSS ↓ ──────→通知后端 二、核心文件说明
2.1 典型项目结构
异步回调系统通常采用分层架构,典型结构如下:
project/ ├── api/ # API 层 - HTTP 接口定义 │ ├── routes/ # 路由模块 │ │ └── callback.py # 回调统一入口 ⭐⭐⭐ │ └── schemas/ # 请求/响应模型 │ ├── core/ # 核心业务层 │ ├── dispatcher.py # 处理器分发器 ⭐⭐⭐ │ ├── handlers/ # 回调处理器 │ │ ├── base.py # 处理器基类 ⭐⭐⭐ │ │ └── video.py # 视频处理器示例 │ └── services/ # 业务逻辑 │ ├── models/ # 数据层 │ ├── task.py # 任务模型 ⭐⭐⭐ │ └── database.py # 数据库操作 ⭐⭐⭐ │ ├── integrations/ # 外部服务集成 │ ├── factory.py # 模型 工厂 │ └── providers/ # 各厂商 模型封装对象 │ ├── config/ # 配置 │ └── settings.py # 配置管理 ⭐⭐ │ └── utils/ # 工具模块 ├── storage.py # 存储/OSS 工具 └── logger/ # 日志系统 架构分层:
┌─────────────────────────────────────────────┐ │ API 层 接收请求、参数验证 │ ├─────────────────────────────────────────────┤ │ 业务层 回调处理、业务编排 │ ├─────────────────────────────────────────────┤ │ 数据层 模型定义、数据持久化 │ ├─────────────────────────────────────────────┤ │ 集成层 外部 API 调用 │ ├─────────────────────────────────────────────┤ │ 工具层 通用工具函数 │ └─────────────────────────────────────────────┘ 2.2 核心文件详解
2.2.1 API 层
回调统一入口 ⭐⭐⭐
职责:统一回调入口,接收所有供应商的回调请求
核心功能:
- 解析回调数据,提取 TaskId(支持多种格式)
- 根据 TaskId 查询数据库,获取任务信息
- 根据 business_type 路由到对应处理器
- 更新回调接收时间戳
核心接口:
@router.post("/callback")asyncdefhandle_callback(request: Request)-> JSONResponse 支持的回调格式:
- 供应商 A:
Event.TaskId(事件嵌套格式) - 供应商 A:
TaskId(扁平格式) - 供应商 B:
req_id - 通用:
task_id
关键代码片段:
# 提取 TaskId(支持多种格式) model_task_id =( data.get("TaskId")or data.get("task_id")or data.get("req_id")or data.get("Event",{}).get("TaskId"))# 查询数据库 db = get_db() task =await db.get_by_model_task_id(model_task_id)# 路由到处理器 handler = Dispatcher.get_handler(task.business_type)await handler.process_callback(task, data)依赖关系:
callback.py → models/database.py (数据库查询) → core/dispatcher.py (处理器路由) → core/handlers/*.py (具体处理器) 业务接口路由
职责:业务接口的路由定义
核心接口:
POST /api/v1/video/async- 视频异步生成GET /api/v1/video/status/{task_id}- 查询任务状态
职责范围:
- 接收 HTTP 请求
- 参数验证(Pydantic Schema)
- 调用业务层处理
- 返回统一格式响应
2.2.2 业务层
处理器分发器 ⭐⭐⭐
职责:处理器分发器,根据 business_type 路由到对应处理器
核心功能:
- 维护处理器注册表
- 提供
register()方法注册处理器 - 提供
get_handler()方法获取处理器实例
设计模式:策略模式 + 工厂模式
关键代码:
classCallbackDispatcher:"""回调处理器分发器"""# 处理器注册表 _handlers:dict[str, Type]={}@classmethoddefregister(cls, business_type:str, handler_class: Type):"""注册处理器""" cls._handlers[business_type]= handler_class logger.info(f"注册处理器: {business_type} -> {handler_class.__name__}")@classmethoddefget_handler(cls, business_type:str):"""获取对应的处理器实例""" handler_class = cls._handlers.get(business_type)ifnot handler_class:raise ValueError(f"未知的业务类型: {business_type}")return handler_class(business_type)已注册的处理器示例:
video_generation→ VideoGenerationCallbackHandlerimage_generation→ ImageGenerationCallbackHandler- (可扩展)
处理器基类 ⭐⭐⭐
职责:回调处理器基类,定义标准处理流程
核心功能:
- 定义
process_callback()抽象方法(子类必须实现) - 提供通用的 OSS 上传方法
download_and_upload_storage() - 提供通用的后端通知方法
check_and_notify() - 提供通用的 URL 提取方法
extract_result_url()
标准处理流程:
asyncdefprocess_callback(task, callback_data):# 1. 提取结果 URL result_url = self.extract_result_url(callback_data)# 2. 下载并上传存储 storage_url =await self.download_and_upload_storage(result_url,...)# 3. 更新数据库状态await db.update_storage_uploaded(task.model_task_id, storage_url)# 4. 检查是否所有子任务完成await self.check_and_notify(task.task_id)关键方法:
# 抽象方法(子类必须实现)@abstractmethodasyncdefprocess_callback(self, task: Task, callback_data:dict)->None:"""处理回调的核心逻辑"""pass# 通用方法asyncdefdownload_and_upload_storage(self, url:str, task_id:str,...)->str:"""从URL流式上传到存储(不落地)"""asyncdefcheck_and_notify(self, task_id:str, results:list=None)->None:"""检查是否所有子任务完成,通知后端"""defextract_result_url(self, callback_data:dict)->str:"""从回调数据提取结果URL(支持多种格式)"""继承关系:
BaseCallbackHandler (抽象基类) ├── VideoGenerationCallbackHandler ├── ImageGenerationCallbackHandler └── ... (其他业务处理器) 处理器自动注册 ⭐⭐
职责:自动注册所有回调处理器
实现方式:模块导入时自动注册
# core/handlers/__init__.pyfrom.video_handler import VideoGenerationCallbackHandler from.image_handler import ImageGenerationCallbackHandler # 导入时自动注册到 Dispatcherfrom core.dispatcher import CallbackDispatcher # 注册处理器 CallbackDispatcher.register("video_generation", VideoGenerationCallbackHandler) CallbackDispatcher.register("image_generation", ImageGenerationCallbackHandler)注意:
- ⚠️ 新增处理器后,必须在此文件中导入和注册
- ⚠️ business_type 必须唯一,否则会覆盖
具体业务处理器示例 ⭐⭐
职责:视频生成回调处理器(示例实现)
核心功能:
- 实现
process_callback()方法 - 处理视频生成回调
- 调用基类的通用方法
关键代码:
classVideoGenerationCallbackHandler(BaseCallbackHandler):"""视频生成回调处理器"""asyncdefprocess_callback( self, task: Task, callback_data: Dict[str, Any])->None:# 1. 提取视频URL video_url = self.extract_result_url(callback_data)# 2. 流式上传到存储 storage_url =await self.download_and_upload_storage( url=video_url, task_id=task.task_id, item_index=task.item_index )# 3. 更新数据库状态await db.update_storage_uploaded(task.model_task_id, storage_url)# 4. 检查是否所有子任务完成并通知后端await self.check_and_notify(task.task_id)def_build_result_item(self, task: Task)-> Dict[str, Any]:"""构建单个结果项(业务格式)""" request_data = task.get_request_data()return{"index": task.item_index,"model_task_id": task.model_task_id,"custom_field": request_data.get("custom_field",""),"storage_url": task.storage_url,"status": task.status.value,}业务逻辑服务 ⭐⭐
职责:视频生成业务逻辑
核心功能:
- 接收 API 层请求
- 获取回调地址配置
- 循环提交任务到外部服务
- 保存任务映射关系到数据库
- 返回统一格式响应
关键方法:
asyncdefsubmit_video_generation_task(request: Dict[str, Any])-> Dict[str, Any]:"""提交视频生成任务"""# 1. 获取算法端回调地址(配置) algorithm_callback_url = _get_callback_url()# 2. 获取后端回调地址(请求参数) backend_callback_url = request.get("notify_url")# 3. 循环提交任务for idx, item inenumerate(request.get("items",[])): result =await client.create_task(...) model_task_id = result["TaskId"]# 4. 保存到数据库 task_record = Task( task_id=task_id, model_task_id=model_task_id, business_type="video_generation", backend_callback_url=backend_callback_url, algorithm_callback_url=algorithm_callback_url,...)await db.create_task(task_record)# 5. 返回响应return{"status":"accepted","task_id": task_id}依赖关系:
video_service.py → integrations/factory.py (获取 Client) → models/database.py (数据库操作) → models/task.py (任务模型) 2.2.3 数据层
任务数据模型 ⭐⭐⭐
职责:任务数据模型定义
核心字段:
@dataclassclassTask:# 核心关联字段 task_id:str# 业务ID(多个子任务共享) model_task_id:str# 外部服务返回的TaskId(唯一)# 业务类型(用于路由到对应处理器) business_type:str# 如:video_generation, image_generation provider:str# 如:provider_a, provider_b# 任务信息 status: TaskStatus = TaskStatus.PENDING # 任务状态 item_index:int=0# 序号(方便排序和聚合)# 请求数据 request_payload: Optional[str]=None# 完整的请求数据(JSON字符串) prompt: Optional[str]=None# 提取的prompt(方便查询)# 结果数据 result_url: Optional[str]=None# 外部服务返回的URL storage_url: Optional[str]=None# 上传到存储的URL callback_payload: Optional[str]=None# 完整回调数据(JSON字符串) processed_data: Optional[str]=None# 业务处理后的数据(JSON字符串)# 错误信息 error_message: Optional[str]=None# 回调地址(重要!区分两个) backend_callback_url: Optional[str]=None# 后端的回调地址(后端传来) algorithm_callback_url: Optional[str]=None# 算法端的回调地址(配置,提交给外部服务)# 后端通知 backend_notified:bool=False# 是否已通知后端# 时间戳(链路追踪) request_time: Optional[datetime]=None# HTTP请求到达时间(API层记录) created_at: Optional[datetime]=None# 数据库记录首次创建时间 callback_received_at: Optional[datetime]=None# 收到外部服务回调时间 task_completed_time: Optional[datetime]=None# 外部服务任务完成时间 storage_uploaded_at: Optional[datetime]=None# 存储上传完成时间 processed_at: Optional[datetime]=None# 业务处理完成时间 backend_notified_at: Optional[datetime]=None# 后端通知时间核心方法:
defto_dict(self)-> Dict[str, Any]:"""转换为字典"""passdefto_db_row(self)->tuple:"""转换为数据库行(用于插入)"""pass@classmethoddeffrom_db_row(cls, row:tuple)->"Task":"""从数据库行创建实例"""passdefget_request_data(self)-> Dict[str, Any]:"""获取解析后的请求数据"""pass状态枚举:
classTaskStatus(str, Enum): PENDING ="pending"# 待处理(已提交任务) PROCESSING ="processing"# 处理中(收到回调) COMPLETED ="completed"# 已完成 FAILED ="failed"# 失败数据库管理 ⭐⭐⭐
职责:数据库管理,提供 CRUD 操作
核心方法:
classDatabaseManager:asyncdefcreate_task(self, task: Task)->int:"""创建任务记录"""passasyncdefget_by_model_task_id(self, model_task_id:str)-> Optional[Task]:"""根据 model_task_id 查询任务"""passasyncdefget_by_task_id(self, task_id:str)-> List[Task]:"""根据 task_id 查询所有子任务"""passasyncdefupdate_status( self, model_task_id:str, status: TaskStatus, error_message:str=None)->bool:"""更新任务状态"""passasyncdefupdate_callback_received( self, model_task_id:str, callback_payload:str, result_url:str, task_completed_time: datetime )->bool:"""更新回调接收状态"""passasyncdefupdate_storage_uploaded( self, model_task_id:str, storage_url:str)->bool:"""更新存储上传状态"""passasyncdefcheck_all_completed(self, task_id:str)->bool:"""检查是否所有子任务完成"""passasyncdefupdate_backend_notified(self, task_id:str)->bool:"""更新后端通知状态"""pass数据库表结构:
CREATETABLE tasks (-- 主键 id INTEGERPRIMARYKEY AUTOINCREMENT,-- 核心关联字段 task_id TEXTNOTNULL,-- 业务ID(多个子任务共享) model_task_id TEXTNOTNULLUNIQUE,-- 外部服务返回的TaskId-- 业务类型(用于路由) business_type TEXTNOTNULL,-- "video_generation", "image_generation", ... provider TEXTNOTNULL,-- "provider_a", "provider_b"-- 任务信息statusTEXTDEFAULT'pending',-- pending, processing, completed, failed item_index INTEGERDEFAULT0,-- 序号(方便排序和聚合)-- 请求数据 request_payload TEXT,-- 完整的请求数据(JSON字符串) prompt TEXT,-- 提取的prompt(方便查询)-- 结果数据 result_url TEXT,-- 外部服务返回的URL storage_url TEXT,-- 上传到存储的URL callback_payload TEXT,-- 完整回调数据(JSON字符串) processed_data TEXT,-- 业务处理后的数据(JSON字符串)-- 错误信息 error_message TEXT,-- 回调地址(重要!区分两个) backend_callback_url TEXT,-- 后端的回调地址(后端传来) algorithm_callback_url TEXT,-- 算法端的回调地址(配置,提交给外部服务)-- 后端通知 backend_notified BOOLEANDEFAULT0,-- 是否已通知后端-- 时间戳(链路追踪) request_time TIMESTAMP,-- HTTP请求到达时间(API层记录) created_at TIMESTAMPDEFAULTCURRENT_TIMESTAMP,-- 数据库记录首次创建时间 callback_received_at TIMESTAMP,-- 收到外部服务回调时间 task_completed_time TIMESTAMP,-- 外部服务任务完成时间 storage_uploaded_at TIMESTAMP,-- 存储上传完成时间 processed_at TIMESTAMP,-- 业务处理完成时间 backend_notified_at TIMESTAMP,-- 后端通知时间-- 索引INDEX idx_task_id (task_id),INDEX idx_model_task (model_task_id),INDEX idx_business_type (business_type),INDEX idx_status (status));2.2.4 集成层
外部服务 Client
职责:外部服务 API 调用封装
核心方法:
classExternalServiceClient(BaseClient):asyncdefcreate_video_task( self, model:str, prompt:str="",...)-> Dict[str, Any]:"""创建视频生成任务"""passasyncdefquery_task(self, task_id:str)-> Dict[str, Any]:"""查询任务状态"""pass说明:
- 使用对应服务商的签名方式
- 支持多种模型
- 回调地址可通过平台配置,API 参数可选
2.2.5 配置层
统一配置管理 ⭐⭐
职责:统一配置管理
核心配置:
CALLBACK_CONFIG ={# 算法端回调地址基础路径(推荐通过环境变量注入)"base_url": os.getenv("CALLBACK_BASE_URL","https://your-domain.com"),"callback_path":"/api/callback",}# 服务商配置(密钥从环境变量读取)def_get_providers():return{"provider_a":{"api_key":"...","secret_id":"...",...},"provider_b":{"api_key":"...",},...} CONFIG ={"env": os.getenv("ENV","dev"),"debug": os.getenv("ENV","dev")=="dev","providers": _get_providers(),# 每次访问都加载最新密钥"timeout": TIMEOUT_CONFIG,"retry": RETRY_CONFIG,"callback": CALLBACK_CONFIG,}环境变量:
# 开发环境 (.env)CALLBACK_BASE_URL=http://localhost:8000 ENV=dev # 生产环境CALLBACK_BASE_URL=https://your-domain.com ENV=prod 三、数据流详解
3.1 任务提交流程
外部服务商integrations/providers/models/database.pycore/services/api/routes/后端服务外部服务商integrations/providers/models/database.pycore/services/api/routes/后端服务1. 记录 request_time2. 验证参数3. 获取 algorithm_callback_url(从 config/settings.py 读取)4. 创建 Task 对象task_id, model_task_id,business_type, backend_callback_url,algorithm_callback_url, request_timeloop[每个item]POST /api/v1/video/async{notify_url, items}submit_video_generation_task(request)async with get_client()POST CreateTask{prompt}{TaskId: "provider_abc123"}TaskIdcreate_task(Task){status: "accepted", task_id}202 Accepted
3.2 回调处理流程
后端服务存储服务core/handlers/video.pycore/dispatcher.pymodels/database.pyapi/routes/callback.py外部服务商后端服务存储服务core/handlers/video.pycore/dispatcher.pymodels/database.pyapi/routes/callback.py外部服务商1. 提取 TaskId2. 查询数据库3. 更新 callback_received_at4. 提取 result_url5. 根据 business_type路由到对应处理器流式上传(不落地)从数据库读取 backend_callback_url暂不通知,等待其他回调alt[所有子任务完成][还有子任务未完成]POST /api/callback{TaskId, Status, Output}get_by_model_task_id(provider_abc123)Task对象update_callback_received(...)get_handler("video_generation")Handler实例process_callback(task, callback_data)extract_result_url()download_and_upload_storage(result_url)storage_urlupdate_storage_uploaded(model_task_id, storage_url)check_all_completed(task_id)true构建完整结果列表POST backend_callback_url{callback_type, task_id, results}200 OKupdate_backend_notified(task_id)false处理完成200 OK
3.3 字段更新时间线
┌─────────────────────────────────────────────────────────────┐ │ 时间线 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 10:00:00 ─ request_time ← API 层记录 │ │ ↓ │ │ 10:00:01 ─ created_at ← 数据库记录创建 │ │ ↓ │ │ ─ 提交到外部服务,获得 TaskId │ │ ↓ │ │ ─ 外部服务异步处理中 (1-5 分钟) │ │ ↓ │ │ 10:05:00 ─ callback_received_at ← 收到外部服务回调 │ │ 10:05:00 ─ task_completed_time ← 外部服务任务完成 │ │ ↓ │ │ 10:05:10 ─ storage_uploaded_at ← 存储上传完成 │ │ 10:05:11 ─ processed_at ← 业务处理完成 │ │ ↓ │ │ 10:05:13 ─ backend_notified_at ← 后端通知完成 │ │ │ └─────────────────────────────────────────────────────────────┘ 四、配置说明
4.1 回调地址配置
4.1.1 algorithm_callback_url(算法端回调地址)
作用:外部服务商回调我们的地址
配置位置:config/settings.py
CALLBACK_CONFIG ={"base_url": os.getenv("CALLBACK_BASE_URL","https://your-domain.com"),"callback_path":"/api/callback",}获取方式:
def_get_callback_url()->str:"""获取算法端回调地址""" callback_config = CONFIG.get("callback",{}) base_url = callback_config.get("base_url","")returnf"{base_url}/api/callback"服务商配置方式对比:
| 服务商 | 配置方式 | 是否必须 | 优先级 |
|---|---|---|---|
| 服务商 A | 平台配置 / 提交时发送 | 否 | 平台配置优先 |
| 服务商 B | 提交时发送 | 是 | 必须发送 |
| 服务商 C | 平台配置 | 否 | 平台配置 |
服务商 A 平台配置示例:
1. 登录服务商控制台 2. 进入产品页面 3. 找到【事件回调】或【Webhook 配置】 4. 添加回调服务器: - 回调地址: https://your-domain.com/api/callback - 鉴权方式: 密钥鉴权 - 回调事件: 勾选【任务完成】 5. 保存配置 4.1.2 backend_callback_url(后端回调地址)
作用:我们完成后端通知的地址
来源:后端通过请求参数传递
示例:
POST/api/v1/video/async {"notify_url":"https://backend-service.com/api/notify"}使用位置:
- 保存到数据库 (
backend_callback_url字段) - 所有子任务完成后,从数据库读取并回调
4.2 环境变量配置
# 开发环境 (.env)CALLBACK_BASE_URL=http://localhost:8000 ENV=dev # 生产环境CALLBACK_BASE_URL=https://algorithm-domain.com ENV=prod # 服务商 APROVIDER_A_SECRET_ID=your_secret_id PROVIDER_A_SECRET_KEY=your_secret_key # 服务商 BPROVIDER_B_API_KEY=your_api_key 五、开发指南
5.1 新增业务处理器
步骤 1:创建处理器文件
# core/handlers/my_business_handler.pyfrom typing import Dict, Any from.base import BaseCallbackHandler from models.task import Task, TaskStatus from models.database import get_db classMyBusinessCallbackHandler(BaseCallbackHandler):"""我的业务回调处理器"""def__init__(self, business_type:str="my_business"):super().__init__(business_type=business_type)asyncdefprocess_callback( self, task: Task, callback_data: Dict[str, Any])->None:"""处理回调"""# 1. 提取结果 URL result_url = self.extract_result_url(callback_data)ifnot result_url:await get_db().update_status( task.model_task_id, TaskStatus.FAILED,"无法提取结果URL")return# 2. 上传到存储try: storage_url =await self.download_and_upload_storage( url=result_url, task_id=task.task_id, item_index=task.item_index )except Exception as e:await get_db().update_status( task.model_task_id, TaskStatus.FAILED,f"存储上传失败: {str(e)}")return# 3. 更新数据库await get_db().update_storage_uploaded(task.model_task_id, storage_url)# 4. 检查是否所有子任务完成并通知后端await self.check_and_notify(task.task_id)def_build_result_item(self, task: Task)-> Dict[str, Any]:"""构建单个结果项(自定义格式)""" request_data = task.get_request_data()return{"index": task.item_index,"model_task_id": task.model_task_id,"custom_field": request_data.get("custom_field",""),"storage_url": task.storage_url,"status": task.status.value,}步骤 2:注册处理器
# core/handlers/__init__.pyfrom.my_business_handler import MyBusinessCallbackHandler from core.dispatcher import CallbackDispatcher # 注册处理器 CallbackDispatcher.register("my_business", MyBusinessCallbackHandler)步骤 3:实现业务逻辑
# core/services/my_business.pyasyncdefsubmit_my_business_task(request: Dict[str, Any])-> Dict[str, Any]:"""提交我的业务任务"""# 1. 获取算法端回调地址from config.settings import CONFIG callback_config = CONFIG.get("callback",{}) base_url = callback_config.get("base_url","") algorithm_callback_url =f"{base_url}/api/callback"# 2. 获取后端回调地址 backend_callback_url = request.get("notify_url")# 3. 提交任务for idx, item inenumerate(request.get("items",[])): result =await client.create_task(...) model_task_id = result["TaskId"]# 4. 保存到数据库 task_record = Task( task_id=task_id, model_task_id=model_task_id, business_type="my_business",# 必须与注册时一致 provider="xxx", backend_callback_url=backend_callback_url, algorithm_callback_url=algorithm_callback_url,...)await db.create_task(task_record)return{"status":"accepted","task_id": task_id}5.2 调试技巧
5.2.1 日志追踪
from utils.logger.config import get_business_logger logger = get_business_logger("my_business") logger.info("任务提交", extra={"task_id": task_id,"model_task_id": model_task_id,"request": request # 完整请求})5.2.2 数据库查询
-- 查询所有未完成的任务SELECT*FROM tasks WHEREstatus!='completed';-- 查询某个业务的所有子任务SELECT*FROM tasks WHERE task_id ='task_001';-- 查询某个 model_task_id 的任务SELECT*FROM tasks WHERE model_task_id ='provider_abc123';-- 统计各状态的任务数量SELECTstatus,COUNT(*)FROM tasks GROUPBYstatus;5.2.3 回调测试
# test/callback/test_manual_callback.pyimport httpx import json asyncdeftest_manual_callback():"""手动测试回调""" url ="http://localhost:8000/api/callback"# 模拟外部服务商回调 callback_data ={"Event":{"TaskId":"provider_abc123",# 替换为真实的 TaskId"Status":"FINISH","Output":{"FileUrl":"https://test.com/video.mp4"}}}asyncwith httpx.AsyncClient()as client: response =await client.post(url, json=callback_data)print(response.status_code)print(response.json())六、运维指南
6.1 部署检查清单
6.1.1 环境变量检查
# 检查环境变量是否配置echo$CALLBACK_BASE_URL6.1.2 数据库初始化
-- 检查表是否存在SELECT name FROM sqlite_master WHEREtype='table'AND name='tasks';-- 检查索引是否存在SELECT name FROM sqlite_master WHEREtype='index'AND tbl_name='tasks';6.1.3 回调地址测试
# 测试算法端回调地址是否可访问curl -X POST https://your-domain.com/api/callback \ -H "Content-Type: application/json"\ -d '{"test": true}'# 预期返回(TaskId 找不到是正常的):# {"status": "error", "message": "Task not found"}6.2 监控指标
6.2.1 关键性能指标(KPIs)
| 指标 | 说明 | 目标值 |
|---|---|---|
| 任务提交延迟 | 从收到请求到返回202 | < 3s |
| 回调处理延迟 | 从收到回调到处理完成 | < 15s |
| 存储上传延迟 | 从下载到上传完成 | < 10s |
| 后端通知延迟 | 从全部完成到通知后端 | < 2s |
6.2.2 错误监控
| 错误类型 | 监控方式 | 告警阈值 |
|---|---|---|
| TaskId 找不到 | 日志统计 | > 10次/小时 |
| 存储上传失败 | 数据库 status=failed | > 5% |
| 后端回调失败 | 日志统计 | > 5次/小时 |
| 任务超时 | callback_received_at 为空且 > 30min | 手动处理 |
6.3 常见问题排查
6.3.1 收不到回调
可能原因:
- algorithm_callback_url 配置错误
- 回调地址不是公网可访问
- 服务商平台未配置回调地址
排查步骤:
# 1. 检查配置grep CALLBACK_BASE_URL .env # 2. 检查回调地址是否可访问curl https://your-domain.com/api/callback # 3. 检查服务商平台配置# 查看控制台回调配置# 4. 检查数据库中是否有记录 sqlite3 database.db "SELECT * FROM tasks WHERE status='pending' LIMIT 10;"6.3.2 后端通知失败
可能原因:
- backend_callback_url 地址错误
- 后端服务不可用
- 网络问题
排查步骤:
# 1. 查询任务的 backend_callback_url sqlite3 database.db "SELECT task_id, backend_callback_url FROM tasks WHERE task_id='xxx';"# 2. 手动测试后端回调地址curl -X POST https://backend-service.com/api/notify \ -H "Content-Type: application/json"\ -d '{"task_id": "test", "status": "completed"}'# 3. 查看日志中的错误信息grep"后端回调失败" logs/*.log 6.3.3 存储上传失败
可能原因:
- 存储配置错误
- 网络问题
- 外部 URL 已过期
排查步骤:
# 1. 查看失败任务 sqlite3 database.db "SELECT model_task_id, error_message FROM tasks WHERE status='failed';"# 2. 手动测试上传 python -c "from utils.storage import upload_from_url; print(upload_from_url('https://test.com/video.mp4', 'test_task'))"# 3. 检查外部 URL 是否可访问curl -I https://provider.com/xxx.mp4 6.4 数据维护
6.4.1 定期清理
-- 清理 30 天前已完成的通知任务DELETEFROM tasks WHEREstatus='completed'AND backend_notified =1AND backend_notified_at <datetime('now','-30 days');6.4.2 数据统计
-- 任务状态统计SELECTstatus,COUNT(*)as count,ROUND(COUNT(*)*100.0/(SELECTCOUNT(*)FROM tasks),2)as percentage FROM tasks GROUPBYstatus;-- 业务类型统计SELECT business_type,COUNT(*)as count,COUNT(CASEWHENstatus='completed'THEN1END)as success_count,COUNT(CASEWHENstatus='failed'THEN1END)as failed_count FROM tasks GROUPBY business_type;-- 平均处理时间SELECT business_type,AVG((julianday(storage_uploaded_at)- julianday(callback_received_at))*86400)as avg_processing_seconds FROM tasks WHEREstatus='completed'AND storage_uploaded_at ISNOTNULLGROUPBY business_type;附录
A. 完整文件映射表
| 功能 | 文件路径 | 核心类/方法 | 说明 |
|---|---|---|---|
| API 层 | |||
| 回调入口 | api/routes/callback.py | handle_callback() | 统一回调接收 |
| 业务接口 | api/routes/video.py | POST /video/async | 视频接口 |
| 业务层 | |||
| 处理器分发 | core/dispatcher.py | CallbackDispatcher | 处理器注册和路由 |
| 处理器基类 | core/handlers/base.py | BaseCallbackHandler | 标准处理流程 |
| 处理器注册 | core/handlers/__init__.py | - | 自动注册处理器 |
| 视频处理器 | core/handlers/video.py | VideoCallbackHandler | 视频处理器 |
| 业务逻辑 | core/services/video.py | submit_video_task() | 任务提交逻辑 |
| 数据层 | |||
| 任务模型 | models/task.py | Task | 任务数据模型 |
| 数据库管理 | models/database.py | DatabaseManager | 数据库 CRUD |
| 集成层 | |||
| 外部服务 | integrations/providers/provider_a.py | ProviderAClient | 外部 API |
| 配置 | |||
| 统一配置 | config/settings.py | CALLBACK_CONFIG | 回调地址配置 |
| 工具 | |||
| 存储工具 | utils/storage.py | upload_from_url() | 存储上传 |
| 日志工具 | utils/logger/config.py | get_business_logger() | 业务日志 |
B. 两种回调地址对比表
| 维度 | algorithm_callback_url 算法端回调地址 | backend_callback_url 后端回调地址 |
|---|---|---|
| 方向 | 外部服务 → 我们 | 我们 → 后端 |
| 配置方式 | 平台配置 / 提交时发送 | 请求参数传递 |
| 示例 | https://algorithm.com/api/callback | https://backend.com/api/notify |
| 谁提供 | 算法端配置 | 后端请求参数 |
| 谁回调谁 | 外部服务回调算法端 | 算法端回调后端 |
| 配置位置 | 外部服务商控制台 / API 参数 | 后端请求的 notify_url |
| 安全性 | 平台配置更安全 | 需要后端验证 |
| 存储位置 | 数据库 algorithm_callback_url 字段 | 数据库 backend_callback_url 字段 |
| 使用时机 | 提交任务时发送给外部服务 | 所有子任务完成后回调后端 |
C. 数据库字段说明
| 字段 | 类型 | 说明 | 示例 |
|---|---|---|---|
task_id | TEXT | 业务ID(多个子任务共享) | task_001 |
model_task_id | TEXT | 外部服务TaskId(唯一) | provider_abc123 |
business_type | TEXT | 业务类型(路由用) | video_generation |
provider | TEXT | 服务商名称 | provider_a/provider_b |
status | TEXT | 任务状态 | pending/processing/completed/failed |
item_index | INTEGER | 子任务序号 | 0, 1, 2 |
result_url | TEXT | 外部服务返回的URL | https://provider.com/xxx.mp4 |
storage_url | TEXT | 上传到存储的URL | https://storage.com/xxx.mp4 |
backend_callback_url | TEXT | 后端的回调地址 | https://backend.com/notify |
algorithm_callback_url | TEXT | 算法端的回调地址 | https://algorithm.com/callback |
backend_notified | BOOLEAN | 是否已通知后端 | 0/1 |
request_time | TIMESTAMP | HTTP请求到达时间 | 2025-01-21 10:00:00 |
created_at | TIMESTAMP | 数据库记录创建时间 | 2025-01-21 10:00:01 |
callback_received_at | TIMESTAMP | 收到外部服务回调时间 | 2025-01-21 10:05:00 |
task_completed_time | TIMESTAMP | 外部服务任务完成时间 | 2025-01-21 10:05:00 |
storage_uploaded_at | TIMESTAMP | 存储上传完成时间 | 2025-01-21 10:05:10 |
processed_at | TIMESTAMP | 业务处理完成时间 | 2025-01-21 10:05:11 |
backend_notified_at | TIMESTAMP | 后端通知时间 | 2025-01-21 10:05:13 |