跳到主要内容
AIGC 异步回调系统架构设计与实现 | 极客日志
Python AI 算法
AIGC 异步回调系统架构设计与实现 一套用于处理 AIGC 长耗时任务(如视频、图片生成)的通用异步回调架构。系统通过接收请求立即返回 task_id,利用数据库追踪状态,并在任务完成后自动回调后端。核心设计包括分层架构(API、业务、数据、集成层)、处理器分发器模式、双回调地址机制(供应商回调与后端通知)以及完整的时间戳链路追踪。文档涵盖项目结构、核心代码逻辑、配置说明、开发指南及运维监控指标,支持多业务类型扩展与可观测性管理。
SqlMaster 发布于 2026/3/22 更新于 2026/5/11 22 浏览AIGC 异步回调系统架构设计与实现
一、系统概述
1.1 背景与目标
本系统实现了一套通用的异步回调架构,用于处理 AIGC 服务(视频生成、图片生成等)的长耗时任务。
核心目标 :
快速响应 :接收请求后立即返回 task_id,不阻塞
状态管理 :通过数据库追踪任务状态
回调通知 :完成后自动回调后端
统一管理 :一套架构支持多种业务类型
可观测性 :完整的时间戳链路追踪
1.2 两种回调地址(核心概念)
回调地址 说明 示例 配置方式 algorithm_callback_url AIGC 供应商回调我们的地址 https://algorithm.com/api/callback方式 1:提交时发送 方式 2:供应商平台配置 backend_callback_url 我们完成后端通知的地址 https://backend.com/api/notify后端通过请求参数传递
关键理解 :
后端 → 算法端 → AIGC 供应商
发起 → 提交任务 → 异步处理
(1 -5 分钟)
↓ ↓ ↓
←─────────收到回调─┘
↓ ↓ ↓
下载 + 上传 OSS
──────→通知后端
二、核心文件说明
2.1 典型项目结构
异步回调系统通常采用分层架构,典型结构如下:
project/
├── api/ # API 层 - HTTP 接口定义
│ ├── routes/ # 路由模块
│ │ └── callback.py # 回调统一入口
│ └── schemas/ # 请求/响应模型
├── core/ # 核心业务层
│ ├── dispatcher.py # 处理器分发器
│ ├── handlers/ # 回调处理器
│ │ ├── base .py # 处理器基类
│ │ └── video.py # 视频处理器示例
│ └── services/ # 业务逻辑
│ ├── models/ # 数据层
│ │ ├── task.py # 任务模型
│ │ └── database.py # 数据库操作
│ ├── integrations/
│ │ ├── factory.py
│ │ └── providers/
├── config/
│ └── settings.py
├── utils/
│ ├── storage.py
│ └── logger/
# 外部服务集成
# 模型工厂
# 各厂商模型封装对象
# 配置
# 配置管理
# 工具模块
# 存储/OSS 工具
# 日志系统
┌─────────────────────────────────────────────┐
│ API 层 接收请求、参数验证 │
├─────────────────────────────────────────────┤
│ 业务层 回调处理、业务编排 │
├─────────────────────────────────────────────┤
│ 数据层 模型定义、数据持久化 │
├─────────────────────────────────────────────┤
│ 集成层 外部 API 调用 │
├─────────────────────────────────────────────┤
│ 工具层 通用工具函数 │
└─────────────────────────────────────────────┘
2.2 核心文件详解
2.2.1 API 层
回调统一入口
解析回调数据,提取 TaskId(支持多种格式)
根据 TaskId 查询数据库,获取任务信息
根据 business_type 路由到对应处理器
更新回调接收时间戳
@router.post("/callback" )
async def handle_callback (request: Request ) -> JSONResponse:
pass
供应商 A:Event.TaskId(事件嵌套格式)
供应商 A:TaskId(扁平格式)
供应商 B:req_id
通用:task_id
model_task_id = (
data.get("TaskId" ) or
data.get("task_id" ) or
data.get("req_id" ) or
data.get("Event" , {}).get("TaskId" )
)
db = get_db()
task = await db.get_by_model_task_id(model_task_id)
handler = Dispatcher.get_handler(task.business_type)
await handler.process_callback(task, data)
callback.py → models/database.py (数据库查询) → core/dispatcher.py (处理器路由) → core/handlers/*.py (具体处理器)
业务接口路由
POST /api/v1/video/async - 视频异步生成
GET /api/v1/video/status/{task_id} - 查询任务状态
接收 HTTP 请求
参数验证(Pydantic Schema)
调用业务层处理
返回统一格式响应
2.2.2 业务层
处理器分发器 职责 :处理器分发器,根据 business_type 路由到对应处理器
维护处理器注册表
提供 register() 方法注册处理器
提供 get_handler() 方法获取处理器实例
class CallbackDispatcher :
"""回调处理器分发器"""
_handlers: dict [str , Type ] = {}
@classmethod
def register (cls, business_type: str , handler_class: Type ):
"""注册处理器"""
cls._handlers[business_type] = handler_class
logger.info(f"注册处理器:{business_type} -> {handler_class.__name__} " )
@classmethod
def get_handler (cls, business_type: str ):
"""获取对应的处理器实例"""
handler_class = cls._handlers.get(business_type)
if not handler_class:
raise ValueError(f"未知的业务类型:{business_type} " )
return handler_class(business_type)
video_generation → VideoGenerationCallbackHandler
image_generation → ImageGenerationCallbackHandler
(可扩展)
处理器基类
定义 process_callback() 抽象方法(子类必须实现)
提供通用的 OSS 上传方法 download_and_upload_storage()
提供通用的后端通知方法 check_and_notify()
提供通用的 URL 提取方法 extract_result_url()
async def process_callback (task, callback_data ):
result_url = self .extract_result_url(callback_data)
storage_url = await self .download_and_upload_storage(result_url, ...)
await db.update_storage_uploaded(task.model_task_id, storage_url)
await self .check_and_notify(task.task_id)
@abstractmethod
async def process_callback (self, task: Task, callback_data: dict ) -> None :
"""处理回调的核心逻辑"""
pass
async def download_and_upload_storage (self, url: str , task_id: str , ... ) -> str :
"""从 URL 流式上传到存储(不落地)"""
pass
async def check_and_notify (self, task_id: str , results: list = None ) -> None :
"""检查是否所有子任务完成,通知后端"""
pass
def extract_result_url (self, callback_data: dict ) -> str :
"""从回调数据提取结果 URL(支持多种格式)"""
pass
BaseCallbackHandler (抽象基类)
├── VideoGenerationCallbackHandler
├── ImageGenerationCallbackHandler
└── ... (其他业务处理器)
处理器自动注册
from .video_handler import VideoGenerationCallbackHandler
from .image_handler import ImageGenerationCallbackHandler
from core.dispatcher import CallbackDispatcher
CallbackDispatcher.register("video_generation" , VideoGenerationCallbackHandler)
CallbackDispatcher.register("image_generation" , ImageGenerationCallbackHandler)
新增处理器后,必须在此文件中导入和注册
business_type 必须唯一,否则会覆盖
具体业务处理器示例
实现 process_callback() 方法
处理视频生成回调
调用基类的通用方法
class VideoGenerationCallbackHandler (BaseCallbackHandler ):
"""视频生成回调处理器"""
async def process_callback (
self, task: Task, callback_data: Dict [str , Any ]
) -> None :
video_url = self .extract_result_url(callback_data)
storage_url = await self .download_and_upload_storage(
url=video_url, task_id=task.task_id, item_index=task.item_index
)
await db.update_storage_uploaded(task.model_task_id, storage_url)
await self .check_and_notify(task.task_id)
def _build_result_item (self, task: Task ) -> Dict [str , Any ]:
"""构建单个结果项(业务格式)"""
request_data = task.get_request_data()
return {
"index" : task.item_index,
"model_task_id" : task.model_task_id,
"custom_field" : request_data.get("custom_field" , "" ),
"storage_url" : task.storage_url,
"status" : task.status.value,
}
业务逻辑服务
接收 API 层请求
获取回调地址配置
循环提交任务到外部服务
保存任务映射关系到数据库
返回统一格式响应
async def submit_video_generation_task (request: Dict [str , Any ] ) -> Dict [str , Any ]:
"""提交视频生成任务"""
algorithm_callback_url = _get_callback_url()
backend_callback_url = request.get("notify_url" )
for idx, item in enumerate (request.get("items" , [])):
result = await client.create_task(...)
model_task_id = result["TaskId" ]
task_record = Task(
task_id=task_id,
model_task_id=model_task_id,
business_type="video_generation" ,
backend_callback_url=backend_callback_url,
algorithm_callback_url=algorithm_callback_url,
...
)
await db.create_task(task_record)
return {"status" : "accepted" , "task_id" : task_id}
video_service.py → integrations/factory.py (获取 Client) → models/database.py (数据库操作) → models/task.py (任务模型)
2.2.3 数据层
任务数据模型 @dataclass
class Task :
task_id: str
model_task_id: str
business_type: str
provider: str
status: TaskStatus = TaskStatus.PENDING
item_index: int = 0
request_payload: Optional [str ] = None
prompt: Optional [str ] = None
result_url: Optional [str ] = None
storage_url: Optional [str ] = None
callback_payload: Optional [str ] = None
processed_data: Optional [str ] = None
error_message: Optional [str ] = None
backend_callback_url: Optional [str ] = None
algorithm_callback_url: Optional [str ] = None
backend_notified: bool = False
request_time: Optional [datetime] = None
created_at: Optional [datetime] = None
callback_received_at: Optional [datetime] = None
task_completed_time: Optional [datetime] = None
storage_uploaded_at: Optional [datetime] = None
processed_at: Optional [datetime] = None
backend_notified_at: Optional [datetime] = None
class TaskStatus (str , Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
数据库管理 class DatabaseManager :
async def create_task (self, task: Task ) -> int :
"""创建任务记录"""
pass
async def get_by_model_task_id (self, model_task_id: str ) -> Optional [Task]:
"""根据 model_task_id 查询任务"""
pass
async def get_by_task_id (self, task_id: str ) -> List [Task]:
"""根据 task_id 查询所有子任务"""
pass
async def update_status (
self, model_task_id: str , status: TaskStatus, error_message: str = None
) -> bool :
"""更新任务状态"""
pass
async def update_callback_received (
self, model_task_id: str , callback_payload: str , result_url: str , task_completed_time: datetime
) -> bool :
"""更新回调接收状态"""
pass
async def update_storage_uploaded (self, model_task_id: str , storage_url: str ) -> bool :
"""更新存储上传状态"""
pass
async def check_all_completed (self, task_id: str ) -> bool :
"""检查是否所有子任务完成"""
pass
async def update_backend_notified (self, task_id: str ) -> bool :
"""更新后端通知状态"""
pass
CREATE TABLE tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL ,
model_task_id TEXT NOT NULL UNIQUE ,
business_type TEXT NOT NULL ,
provider TEXT NOT NULL ,
status TEXT DEFAULT 'pending' ,
item_index INTEGER DEFAULT 0 ,
request_payload TEXT,
prompt TEXT,
result_url TEXT,
storage_url TEXT,
callback_payload TEXT,
processed_data TEXT,
error_message TEXT,
backend_callback_url TEXT,
algorithm_callback_url TEXT,
backend_notified BOOLEAN DEFAULT 0 ,
request_time TIMESTAMP ,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ,
callback_received_at TIMESTAMP ,
task_completed_time TIMESTAMP ,
storage_uploaded_at TIMESTAMP ,
processed_at TIMESTAMP ,
backend_notified_at TIMESTAMP ,
INDEX idx_task_id (task_id),
INDEX idx_model_task (model_task_id),
INDEX idx_business_type (business_type),
INDEX idx_status (status)
);
2.2.4 集成层
外部服务 Client class ExternalServiceClient (BaseClient ):
async def create_video_task (
self, model: str , prompt: str = "" , ...
) -> Dict [str , Any ]:
"""创建视频生成任务"""
pass
async def query_task (self, task_id: str ) -> Dict [str , Any ]:
"""查询任务状态"""
pass
使用对应服务商的签名方式
支持多种模型
回调地址可通过平台配置,API 参数可选
2.2.5 配置层
统一配置管理 CALLBACK_CONFIG = {
"base_url" : os.getenv("CALLBACK_BASE_URL" , "https://your-domain.com" ),
"callback_path" : "/api/callback" ,
}
def _get_providers ():
return {
"provider_a" : {"api_key" : "..." , "secret_id" : "..." , ...},
"provider_b" : {"api_key" : "..." , ...},
}
CONFIG = {
"env" : os.getenv("ENV" , "dev" ),
"debug" : os.getenv("ENV" , "dev" ) == "dev" ,
"providers" : _get_providers(),
"timeout" : TIMEOUT_CONFIG,
"retry" : RETRY_CONFIG,
"callback" : CALLBACK_CONFIG,
}
CALLBACK_BASE_URL=http://localhost:8000
ENV=dev
CALLBACK_BASE_URL=https://your-domain.com
ENV=prod
三、数据流详解
3.1 任务提交流程
后端服务 :接收用户请求,记录 request_time,验证参数。
配置读取 :获取 algorithm_callback_url(从 config/settings.py 读取)。
任务创建 :创建 Task 对象,包含 task_id, model_task_id, business_type, backend_callback_url, algorithm_callback_url, request_time。
循环提交 :对每个 item 调用 POST CreateTask,获取 TaskId。
返回响应 :返回 202 Accepted 及 task_id。
外部服务商 :异步处理任务(1-5 分钟)。
3.2 回调处理流程
外部服务商 :任务完成后,回调 POST /api/callback。
API 层 :
提取 TaskId
查询数据库
更新 callback_received_at
提取 result_url
业务层 :
根据 business_type 路由到对应处理器
流式上传(不落地)到存储
从数据库读取 backend_callback_url
判断完成 :
若还有子任务未完成:暂不通知,等待其他回调。
若所有子任务完成:构建完整结果列表,POST backend_callback_url。
后端服务 :接收通知,更新 backend_notified 状态。
3.3 字段更新时间线 ┌─────────────────────────────────────────────────────────────┐
│ 时间线 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 10:00:00 ─ request_time ← API 层记录 │
│ ↓ │
│ 10:00:01 ─ created_at ← 数据库记录创建 │
│ ↓ │
│ ─ 提交到外部服务,获得 TaskId │
│ ↓ │
│ ─ 外部服务异步处理中 (1-5 分钟) │
│ ↓ │
│ 10:05:00 ─ callback_received_at ← 收到外部服务回调 │
│ 10:05:00 ─ task_completed_time ← 外部服务任务完成 │
│ ↓ │
│ 10:05:10 ─ storage_uploaded_at ← 存储上传完成 │
│ 10:05:11 ─ processed_at ← 业务处理完成 │
│ ↓ │
│ 10:05:13 ─ backend_notified_at ← 后端通知完成 │
│ │
└─────────────────────────────────────────────────────────────┘
四、配置说明
4.1 回调地址配置
4.1.1 algorithm_callback_url(算法端回调地址) CALLBACK_CONFIG = {
"base_url" : os.getenv("CALLBACK_BASE_URL" , "https://your-domain.com" ),
"callback_path" : "/api/callback" ,
}
def _get_callback_url () -> str :
"""获取算法端回调地址"""
callback_config = CONFIG.get("callback" , {})
base_url = callback_config.get("base_url" , "" )
return f"{base_url} /api/callback"
服务商 配置方式 是否必须 优先级 服务商 A 平台配置 / 提交时发送 否 平台配置优先 服务商 B 提交时发送 是 必须发送 服务商 C 平台配置 否 平台配置
登录服务商控制台
进入产品页面
找到【事件回调】或【Webhook 配置】
添加回调服务器:
回调地址:https://your-domain.com/api/callback
鉴权方式:密钥鉴权
回调事件:勾选【任务完成】
保存配置
4.1.2 backend_callback_url(后端回调地址) POST /api/v1/video/async
{
"notify_url" : "https://backend-service.com/api/notify"
}
保存到数据库 (backend_callback_url 字段)
所有子任务完成后,从数据库读取并回调
4.2 环境变量配置
CALLBACK_BASE_URL=http://localhost:8000
ENV=dev
CALLBACK_BASE_URL=https://algorithm-domain.com
ENV=prod
PROVIDER_A_SECRET_ID=your_secret_id
PROVIDER_A_SECRET_KEY=your_secret_key
PROVIDER_B_API_KEY=your_api_key
五、开发指南
5.1 新增业务处理器
步骤 1:创建处理器文件
from typing import Dict , Any
from .base import BaseCallbackHandler
from models.task import Task, TaskStatus
from models.database import get_db
class MyBusinessCallbackHandler (BaseCallbackHandler ):
"""我的业务回调处理器"""
def __init__ (self, business_type: str = "my_business" ):
super ().__init__(business_type=business_type)
async def process_callback (
self, task: Task, callback_data: Dict [str , Any ]
) -> None :
"""处理回调"""
result_url = self .extract_result_url(callback_data)
if not result_url:
await get_db().update_status(
task.model_task_id, TaskStatus.FAILED, "无法提取结果 URL"
)
return
try :
storage_url = await self .download_and_upload_storage(
url=result_url, task_id=task.task_id, item_index=task.item_index
)
except Exception as e:
await get_db().update_status(
task.model_task_id, TaskStatus.FAILED, f"存储上传失败:{str (e)} "
)
return
await get_db().update_storage_uploaded(task.model_task_id, storage_url)
await self .check_and_notify(task.task_id)
def _build_result_item (self, task: Task ) -> Dict [str , Any ]:
"""构建单个结果项(自定义格式)"""
request_data = task.get_request_data()
return {
"index" : task.item_index,
"model_task_id" : task.model_task_id,
"custom_field" : request_data.get("custom_field" , "" ),
"storage_url" : task.storage_url,
"status" : task.status.value,
}
步骤 2:注册处理器
from .my_business_handler import MyBusinessCallbackHandler
from core.dispatcher import CallbackDispatcher
CallbackDispatcher.register("my_business" , MyBusinessCallbackHandler)
步骤 3:实现业务逻辑
async def submit_my_business_task (request: Dict [str , Any ] ) -> Dict [str , Any ]:
"""提交我的业务任务"""
from config.settings import CONFIG
callback_config = CONFIG.get("callback" , {})
base_url = callback_config.get("base_url" , "" )
algorithm_callback_url = f"{base_url} /api/callback"
backend_callback_url = request.get("notify_url" )
for idx, item in enumerate (request.get("items" , [])):
result = await client.create_task(...)
model_task_id = result["TaskId" ]
task_record = Task(
task_id=task_id,
model_task_id=model_task_id,
business_type="my_business" ,
provider="xxx" ,
backend_callback_url=backend_callback_url,
algorithm_callback_url=algorithm_callback_url,
...
)
await db.create_task(task_record)
return {"status" : "accepted" , "task_id" : task_id}
5.2 调试技巧
5.2.1 日志追踪 from utils.logger.config import get_business_logger
logger = get_business_logger("my_business" )
logger.info("任务提交" , extra={
"task_id" : task_id,
"model_task_id" : model_task_id,
"request" : request
})
5.2.2 数据库查询
SELECT * FROM tasks WHERE status != 'completed' ;
SELECT * FROM tasks WHERE task_id = 'task_001' ;
SELECT * FROM tasks WHERE model_task_id = 'provider_abc123' ;
SELECT status, COUNT (* ) FROM tasks GROUP BY status;
5.2.3 回调测试
import httpx
import json
async def test_manual_callback ():
"""手动测试回调"""
url = "http://localhost:8000/api/callback"
callback_data = {
"Event" : {
"TaskId" : "provider_abc123" ,
"Status" : "FINISH" ,
"Output" : {
"FileUrl" : "https://test.com/video.mp4"
}
}
}
async with httpx.AsyncClient() as client:
response = await client.post(url, json=callback_data)
print (response.status_code)
print (response.json())
六、运维指南
6.1 部署检查清单
6.1.1 环境变量检查
6.1.2 数据库初始化
SELECT name FROM sqlite_master WHERE type= 'table' AND name= 'tasks' ;
SELECT name FROM sqlite_master WHERE type= 'index' AND tbl_name= 'tasks' ;
6.1.3 回调地址测试
curl -X POST https://your-domain.com/api/callback \
-H "Content-Type: application/json" \
-d '{"test": true}'
6.2 监控指标
6.2.1 关键性能指标(KPIs) 指标 说明 目标值 任务提交延迟 从收到请求到返回 202 < 3s 回调处理延迟 从收到回调到处理完成 < 15s 存储上传延迟 从下载到上传完成 < 10s 后端通知延迟 从全部完成到通知后端 < 2s
6.2.2 错误监控 错误类型 监控方式 告警阈值 TaskId 找不到 日志统计 > 10 次/小时 存储上传失败 数据库 status=failed > 5% 后端回调失败 日志统计 > 5 次/小时 任务超时 callback_received_at 为空且 > 30min 手动处理
6.3 常见问题排查
6.3.1 收不到回调
algorithm_callback_url 配置错误
回调地址不是公网可访问
服务商平台未配置回调地址
grep CALLBACK_BASE_URL .env
curl https://your-domain.com/api/callback
sqlite3 database.db "SELECT * FROM tasks WHERE status='pending' LIMIT 10;"
6.3.2 后端通知失败
backend_callback_url 地址错误
后端服务不可用
网络问题
sqlite3 database.db "SELECT task_id, backend_callback_url FROM tasks WHERE task_id='xxx';"
curl -X POST https://backend-service.com/api/notify \
-H "Content-Type: application/json" \
-d '{"task_id": "test", "status": "completed"}'
grep "后端回调失败" logs/*.log
6.3.3 存储上传失败
sqlite3 database.db "SELECT model_task_id, error_message FROM tasks WHERE status='failed';"
python -c "from utils.storage import upload_from_url; print(upload_from_url('https://test.com/video.mp4', 'test_task'))"
curl -I https://provider.com/xxx.mp4
6.4 数据维护
6.4.1 定期清理
DELETE FROM tasks
WHERE status= 'completed'
AND backend_notified = 1
AND backend_notified_at < datetime('now' ,'-30 days' );
6.4.2 数据统计
SELECT status, COUNT (* ) as count, ROUND(COUNT (* )* 100.0 / (SELECT COUNT (* ) FROM tasks),2 ) as percentage
FROM tasks GROUP BY status;
SELECT business_type, COUNT (* ) as count,
COUNT (CASE WHEN status= 'completed' THEN 1 END ) as success_count,
COUNT (CASE WHEN status= 'failed' THEN 1 END ) as failed_count
FROM tasks GROUP BY business_type;
SELECT business_type, AVG ((julianday(storage_uploaded_at)- julianday(callback_received_at))* 86400 ) as avg_processing_seconds
FROM tasks WHERE status= 'completed' AND storage_uploaded_at IS NOT NULL GROUP BY business_type;
附录
A. 完整文件映射表 功能 文件路径 核心类/方法 说明 API 层 回调入口 api/routes/callback.pyhandle_callback()统一回调接收 业务接口 api/routes/video.pyPOST /video/async视频接口 业务层 处理器分发 core/dispatcher.pyCallbackDispatcher处理器注册和路由 处理器基类 core/handlers/base.pyBaseCallbackHandler标准处理流程 处理器注册 core/handlers/__init__.py- 自动注册处理器 视频处理器 core/handlers/video.pyVideoCallbackHandler视频处理器 业务逻辑 core/services/video.pysubmit_video_task()任务提交逻辑 数据层 任务模型 models/task.pyTask任务数据模型 数据库管理 models/database.pyDatabaseManager数据库 CRUD 集成层 外部服务 integrations/providers/provider_a.pyProviderAClient外部 API 配置 统一配置 config/settings.pyCALLBACK_CONFIG回调地址配置 工具 存储工具 utils/storage.pyupload_from_url()存储上传 日志工具 utils/logger/config.pyget_business_logger()业务日志
B. 两种回调地址对比表 维度 algorithm_callback_url (算法端回调地址) backend_callback_url (后端回调地址) 方向 外部服务 → 我们 我们 → 后端 配置方式 平台配置 / 提交时发送 请求参数传递 示例 https://algorithm.com/api/callbackhttps://backend.com/api/notify谁提供 算法端配置 后端请求参数 谁回调谁 外部服务回调算法端 算法端回调后端 配置位置 外部服务商控制台 / API 参数 后端请求的 notify_url 安全性 平台配置更安全 需要后端验证 存储位置 数据库 algorithm_callback_url 字段 数据库 backend_callback_url 字段 使用时机 提交任务时发送给外部服务 所有子任务完成后回调后端
C. 数据库字段说明 字段 类型 说明 示例 task_idTEXT 业务 ID(多个子任务共享) task_001model_task_idTEXT 外部服务 TaskId(唯一) provider_abc123business_typeTEXT 业务类型(路由用) video_generationproviderTEXT 服务商名称 provider_a/provider_bstatusTEXT 任务状态 pending/processing/completed/faileditem_indexINTEGER 子任务序号 0, 1, 2result_urlTEXT 外部服务返回的 URL https://provider.com/xxx.mp4storage_urlTEXT 上传到存储的 URL https://storage.com/xxx.mp4backend_callback_urlTEXT 后端的回调地址 https://backend.com/notifyalgorithm_callback_urlTEXT 算法端的回调地址 https://algorithm.com/callbackbackend_notifiedBOOLEAN 是否已通知后端 0/1request_timeTIMESTAMP HTTP 请求到达时间 2025-01-21 10:00:00created_atTIMESTAMP 数据库记录创建时间 2025-01-21 10:00:01callback_received_atTIMESTAMP 收到外部服务回调时间 2025-01-21 10:05:00task_completed_timeTIMESTAMP 外部服务任务完成时间 2025-01-21 10:05:00storage_uploaded_atTIMESTAMP 存储上传完成时间 2025-01-21 10:05:10processed_atTIMESTAMP 业务处理完成时间 2025-01-21 10:05:11backend_notified_atTIMESTAMP 后端通知时间 2025-01-21 10:05:13
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
随机西班牙地址生成器 随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
Gemini 图片去水印 基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online