跳到主要内容OpenClaw Cron 系统设计:实现 AI Agent 自主定时任务 | 极客日志TypeScriptNode.jsAI算法
OpenClaw Cron 系统设计:实现 AI Agent 自主定时任务
OpenClaw Cron 系统采用单一定时器架构与持久化存储机制,支持 at、every、cron 三种调度类型,实现 AI Agent 自主定时任务。通过 Main 注入与 Isolated 独立会话两种执行模式,解决上下文隔离与后台执行问题,提供并发控制、状态流转及结果汇报能力,提升 Agent 主动服务能力。
片刻15 浏览 OpenClaw Cron 系统设计:实现 AI Agent 自主定时任务
为什么 Agent 需要定时任务?
想象一下这个场景:你让 AI 助手帮你每天早上 9 点检查一下服务器状态。
传统的做法是什么?你得自己设个闹钟,到点了打开对话框,再敲一遍帮我检查服务器。这跟没有 AI 助手有什么区别?
真正智能的 Agent 应该能够:
- 自主调度:记住用户的需求,到点自动执行
- 后台执行:不打扰用户,悄悄干活
- 主动汇报:干完了告诉你结果
这就是 OpenClaw Cron 系统要解决的问题。它让 Agent 从被动响应升级为主动服务。
系统架构总览
Cron 系统由三个核心组件构成——CronStore 负责持久化、CronOps 处理增删改查、CronTimer 调度执行。最关键的是 executeJob() 执行引擎,它决定任务是注入主会话还是启动独立会话。
整个 Cron 系统的设计思路很清晰:
| 组件 | 职责 | 关键方法 |
|---|
| CronStore | 持久化存储 | load / save |
| CronOps | CRUD 操作 | add / remove / update |
| CronTimer | 定时调度 | armTimer / onTimer |
| executeJob | 任务执行 | main vs isolated |
这套架构的精妙之处在于:一个定时器管所有任务。不是给每个任务都开一个定时器(那样内存会爆),而是只维护一个指向最近要执行的任务的定时器。每次触发后,再计算下一个最近的任务。
三种调度类型
at:一次性定时
{
kind: "at",
atMs: Date.now() + 3600_000
}
这是最简单的调度——在指定时间点执行一次。执行完成后,job.enabled 自动设为 false,任务就算完结了。
适用场景:
every:间隔执行
{
kind: "every",
everyMs: 60_000,
anchorMs?: 1706745600000
}
固定间隔重复执行。有个可选的 anchorMs 参数很有意思——它是对齐锚点。
比如你想让任务每小时整点执行,而不是从现在开始每小时执行,就可以设置一个整点时间戳作为锚点。计算公式是:
nextRun = anchor + Math.ceil((now - anchor) / everyMs) * everyMs
cron:表达式调度
{
kind: "cron",
expr: "0 9 * * 1-5",
tz: "Asia/Shanghai"
}
标准 cron 格式,还支持时区设置。上面这个表达式的意思是:工作日每天早 9 点(上海时间)。
| 位置 | 含义 | 示例 |
|---|
| 1 | 分钟 | 0-59 |
| 2 | 小时 | 0-23 |
| 3 | 日期 | 1-31 |
| 4 | 月份 | 1-12 |
| 5 | 星期 | 0-7 (0 和 7 都是周日) |
OpenClaw 使用 croniter 库解析表达式。这个库在 Python 生态里很成熟,处理各种边界情况(比如闰年、夏令时)都很稳定。
两种执行模式:Main vs Isolated
这是 Cron 系统最有趣的设计。同样是定时任务,执行方式完全不同:
Main Session:注入主会话
{
sessionTarget: "main",
payload: {
kind: "systemEvent",
text: "每日提醒:检查邮件"
},
wakeMode: "now"
}
任务不是执行,而是注入。系统把消息塞进主会话的消息队列,就像有人在对话框里发了一条系统消息。
- 简单提醒(不需要 Agent 做复杂操作)
- 需要用户看到并响应的任务
- 依赖现有上下文的任务
wakeMode 参数控制是否立即触发 Agent 心跳:
now:立刻触发,Agent 马上处理这条消息
next-heartbeat:等下次自然心跳时再处理
Isolated Session:独立会话执行
{
sessionTarget: "isolated",
payload: {
kind: "agentTurn",
message: "检查服务器健康状态并生成报告",
model: "claude-3-5-sonnet",
timeoutSeconds: 300,
deliver: true,
channel: "telegram",
to: "@user123"
}
}
这才是真正的后台执行。系统会启动一个全新的 Agent 会话,专门执行这个任务。执行完后,把结果汇报回主会话。
- 复杂任务(需要多轮思考和工具调用)
- 耗时任务(用户不想等)
- 不需要用户介入的任务
| 参数 | 作用 |
|---|
model | 指定执行任务的模型 |
timeoutSeconds | 超时限制 |
deliver | 是否把结果推送给用户 |
channel | 推送渠道(telegram/email/…) |
to | 收件人 |
定时器核心逻辑
单一定时器模式
const MAX_TIMEOUT_MS = 2**31 - 1;
export function armTimer(state: CronServiceState) {
if (state.timer) clearTimeout(state.timer);
state.timer = null;
const nextAt = nextWakeAtMs(state);
if (!nextAt) return;
const delay = Math.max(nextAt - state.deps.nowMs(), 0);
const clampedDelay = Math.min(delay, MAX_TIMEOUT_MS);
state.timer = setTimeout(() => {
void onTimer(state);
}, clampedDelay);
state.timer.unref?.();
}
- 单一定时器:永远只有一个活跃的定时器,指向最近的任务。这避免了定时器泛滥。
- 延迟上限处理:JavaScript 的
setTimeout 最大只支持约 24.8 天(2^31-1 毫秒)。如果任务在更远的未来,先设一个最大延迟,到时候再重新计算。
- unref 调用:
timer.unref() 让这个定时器不阻止 Node.js 进程退出。如果用户关闭了应用,不会因为还有待执行的定时任务而卡住。
并发控制
export async function onTimer(state: CronServiceState) {
if (state.running) return;
state.running = true;
try {
await locked(state, async () => {
await ensureLoaded(state);
await runDueJobs(state);
await persist(state);
armTimer(state);
});
} finally {
state.running = false;
}
}
用一个简单的 running 标志位防止重入。如果定时器触发时上一次执行还没结束,直接跳过。
任务执行流程
阶段 1:Timer 触发
阶段 2:筛选到期任务
const due = jobs.filter(j =>
j.enabled &&
j.state.runningAtMs === null &&
j.state.nextRunAtMs !== null &&
nowMs >= j.state.nextRunAtMs
);
注意 runningAtMs 检查——如果一个任务正在执行(比如上次还没跑完),不会重复触发。
阶段 3:执行任务
state.deps.enqueueSystemEvent(text, { agentId: job.agentId });
if (job.wakeMode === "now") {
const result = await state.deps.runHeartbeatOnce({ reason: `cron:${job.id}` });
}
const res = await state.deps.runIsolatedAgentJob({ job, message: job.payload.message });
state.deps.enqueueSystemEvent(`${prefix}: ${res.summary}`, { agentId: job.agentId });
阶段 4:更新状态
job.state.lastRunAtMs = startedAt;
job.state.lastStatus = status;
job.state.lastDurationMs = endedAt - startedAt;
if (job.schedule.kind === "at" && status === "ok") {
job.enabled = false;
} else if (job.enabled) {
job.state.nextRunAtMs = computeNextRunAtMs(job.schedule, nowMs);
}
阶段 5:持久化 + 重新调度
保存任务状态到存储,然后 armTimer() 重新设置下一个定时器。
任务状态机
┌─────────────────────────────────────────────────────────────┐
│ │
│ ┌──────────┐ add() ┌──────────┐ │
│ │ 创建 │ ─────>│ enabled │ <────┐ │
│ └──────────┘ └────┬─────┘ │ │
│ │ │ │
│ │ 到期触发 │ │
│ │ │ │
│ v │ │
│ ┌──────────┐ │ │
│ │ running │ │ │
│ └────┬─────┘ │ │
│ │ │ │
│ ┌────────────┼───────────────┐ │
│ │ │ │ │
│ v v v │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ ok │ │ error │ │ skipped │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └───────────┴───────────┘ │
│ │ │
│ ┌───────┴───────┐ │
│ │ │ │
│ v v │
│ ┌──────────┐ ┌──────────┐ │
│ │ disabled │ │ 等待下次执行 │ │
│ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
| 状态字段 | 含义 |
|---|
enabled | 任务是否激活 |
nextRunAtMs | 下次执行时间戳 |
runningAtMs | 当前执行开始时间(null 表示未在执行) |
lastStatus | 上次执行结果 |
lastDurationMs | 上次执行耗时 |
设计要点总结
1. 单一定时器 vs 多定时器
- 内存占用:每个任务一个定时器,1000 个任务就是 1000 个定时器
- 精度问题:大量定时器可能导致事件循环拥堵
- 难以管理:取消、更新操作复杂
- 内存恒定:永远只有一个活跃定时器
- 逻辑清晰:所有调度逻辑集中在
armTimer()
- 易于调试:只需关注一个定时器的行为
2. Main vs Isolated 的权衡
| 维度 | Main | Isolated |
|---|
| 上下文 | 共享主会话 | 独立会话 |
| 适用任务 | 简单提醒 | 复杂操作 |
| 用户感知 | 立即可见 | 执行完再通知 |
| 资源消耗 | 低 | 高(新建会话) |
- 只是提醒用户做某事
- 需要用户确认或响应
- 任务依赖当前对话上下文
- 任务可能耗时较长
- 不需要用户介入
- 需要干净的执行环境
3. 结果汇报机制
Isolated 任务执行完后,通过 enqueueSystemEvent() 把结果注入主会话。用户会看到类似:
这个 Cron: 前缀是可配置的(postToMainPrefix 字段)。
Python 复现建议
如果你想用 Python 实现类似的 Cron 系统,核心依赖是:
1. 调度计算
from croniter import croniter
from zoneinfo import ZoneInfo
def compute_next_run(schedule, now_ms):
if schedule.kind == "cron":
tz = ZoneInfo(schedule.tz) if schedule.tz else timezone.utc
base_time = datetime.fromtimestamp(now_ms / 1000, tz=tz)
cron = croniter(schedule.expr, base_time)
next_time = cron.get_next(datetime)
return int(next_time.timestamp() * 1000)
2. 异步定时器
async def arm_timer(self):
if self.timer_task:
self.timer_task.cancel()
next_at = self.next_wake_at_ms()
if next_at is None:
return
delay = max(0, (next_at - self.now_ms()) / 1000)
self.timer_task = asyncio.create_task(self.timer_tick(delay))
3. 依赖注入
class CronService:
def __init__(
self,
on_system_event: Callable[[str], None],
run_agent_turn: Callable[[Job, str], dict],
run_heartbeat: Callable[[str], dict],
):
self.on_system_event = on_system_event
self.run_agent_turn = run_agent_turn
self.run_heartbeat = run_heartbeat
把执行 Agent、触发心跳等操作作为依赖注入,让 Cron 模块可以独立测试。
思考
这套设计解决了什么问题?
传统的定时任务系统(比如 crontab、APScheduler)只管到点执行。但 Agent 场景下,执行本身是个复杂的过程——需要上下文、需要推理、需要调用工具、还需要汇报结果。
OpenClaw 的 Cron 系统把这些都考虑进去了:
- 上下文隔离:Isolated 模式避免污染主对话
- 结果回传:执行完自动汇报
- 灵活调度:三种调度类型覆盖常见场景
还有什么可以改进?
- 任务依赖:当前任务之间是独立的。如果 A 任务失败了,B 任务是否还执行?没有依赖图的概念。
- 重试机制:任务失败后没有自动重试。对于网络请求类任务,这可能是个问题。
- 优先级调度:所有任务平等。如果同一时刻有多个任务到期,按什么顺序执行?
- 分布式支持:单机单定时器的设计,在分布式场景下需要改造。
实际应用场景
- 个人助理 Agent:每日提醒、定期汇总、自动检查
- 监控 Agent:定时巡检、异常告警
- 内容 Agent:定时抓取、自动发布
想象一下:你对 AI 说每天晚上 10 点帮我总结一下今天的邮件,它就真的每天 10 点自动干活,干完了发个总结给你。这才是真正有用的 AI 助手。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- 随机西班牙地址生成器
随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online