《OpenClaw架构与源码解读》· 第 12 章 Cron、Webhooks 与事件驱动自动化

第 12 章 Cron、Webhooks 与事件驱动自动化

前面第 8–10 章介绍的消息处理链路,都是被动响应式的:用户先说话,OpenClaw 才行动。但 OpenClaw 更有价值的地方之一,恰恰是它可以主动出击——在你没有发消息的时候,悄悄把事情做了,再来汇报。

本章介绍三种让 OpenClaw「自己动起来」的机制:Cron 定时任务、Webhooks 外部触发、以及类 Gmail Pub/Sub 的长链路事件源。

12.1 Cron Jobs:让 OpenClaw「记住」该做什么

12.1.1 什么是 Cron Jobs

Cron Jobs 就是定时任务:在指定时间或时间间隔触发一段操作。在 OpenClaw 里,你可以用 Cron Jobs 让它每天早上 8 点给你发今日简报(天气、日历、收件箱摘要),每 2 小时检查一次 CI/CD 状态有失败时主动告警,每周一整理一次你的 GitHub Issue 积压,或者每晚 11 点发一条「今天的未完成 Todo」。

12.1.2 Cron 配置

~/.openclaw/openclaw.json 中,Cron Jobs 的配置格式大致如下:

// ~/.openclaw/openclaw.json(宽松 JSON){cron:[{id:"morning-briefing",name:"早晨简报",schedule:"0 8 * * *",// 每天 8:00message:"给我一个今天的早晨简报:天气、日历安排、未读邮件摘要。",enabled:true},{id:"ci-check",name:"CI 状态检查",schedule:"0 */2 * * *",// 每 2 小时message:"检查最近 2 小时的 CI/CD 状态,有失败的通知我。",enabled:true}]}

schedule 字段使用标准的 cron 表达式(分 时 日 月 星期):

表达式含义
0 8 * * *每天 8:00
0 */2 * * *每 2 小时整点
*/15 * * * *每 15 分钟
0 9 * * 1每周一 9:00

12.1.3 Cron 任务的触发流程

// src/cron/cron-engine.ts(伪代码)classCronEngine{private jobs: Map<string, CronJob>=newMap();asyncinit(configs: CronConfig[], agentPool: AgentPool){for(const config of configs){if(!config.enabled)continue;const job = schedule.createJob(config.schedule,async()=>{awaitthis.triggerJob(config, agentPool);});this.jobs.set(config.id, job);}}privateasynctriggerJob(config: CronConfig, agentPool: AgentPool){const syntheticMsg: InboundMessage ={ id:`cron:${config.id}:${Date.now()}`, channel:"internal:cron", peerId:"system", chatId: config.sessionId, text: config.message, timestamp:newDate(), raw:{ triggerType:"cron", jobId: config.id },};await gateway.dispatchInbound(syntheticMsg);}}

关键设计点在于:Cron 触发的「消息」走的是和用户消息完全一样的分发链路。这意味着 Cron 任务也会经过 Session 解析、Agent 路由、Agent Runtime、Skill 调用、回复生成,最终把结果发回你的 Slack/iMessage。这种「统一入口」设计极大地简化了代码,避免了 Cron 路径和用户消息路径之间的逻辑重复。

12.1.4 管理 Cron Jobs

# 列出所有 Cron Jobs openclaw cron list # 启用/禁用某个 Job openclaw cronenable morning-briefing openclaw cron disable ci-check # 立刻手动触发一次(不等到下次调度时间) openclaw cron trigger morning-briefing # 新增一个 Cron Job openclaw cronadd--id nightly-todo --schedule"0 23 * * *"\--message"给我一个今天的未完成 Todo 总结"--agent personal-assistant 

12.2 Webhooks:让外部系统「推」给 OpenClaw

12.2.1 Webhook 的使用场景

Cron 是「定时触发」,Webhook 是「事件触发」。GitHub 合并了一个 PR 可以触发 OpenClaw 发一条通知,Sentry 检测到线上报错可以触发 OpenClaw 通知你并自动尝试诊断,Stripe 收到一笔付款可以触发收款通知,某个爬虫任务完成后可以触发 OpenClaw 处理数据并发送摘要。Webhook 的核心优势是近实时——事件发生后几秒内就能触发,而不是像 Cron 那样要等到下一个轮询周期才知道。

12.2.2 Webhook 端点的注册与配置

Gateway 会暴露一个统一的 Webhook 端点:

POST http://localhost:18789/webhook/{webhookId} 

~/.openclaw/openclaw.json 里注册一个 Webhook:

{webhooks:[{id:"github-pr-merged",name:"GitHub PR 合并通知",secret:"my-secret-token",messageTemplate:"GitHub 上有一个 PR 被合并了:{payload.pull_request.title}(仓库:{payload.repository.full_name})",enabled:true}]}

然后在 GitHub 的 Webhook 设置里填入对应的 Payload URL、Secret 和 Content type。

12.2.3 Webhook 处理流程

// src/gateway/server-webhook.ts(伪代码) app.post("/webhook/:webhookId",async(req, res)=>{const config = webhookRegistry.get(req.params.webhookId);if(!config ||!config.enabled){ res.status(404).end();return;}// 1. 验证签名(防止伪造请求)const signature = req.headers["x-hub-signature-256"]asstring;const isValid =verifySignature(req.rawBody, config.secret, signature);if(!isValid){ res.status(401).end();return;}// 2. 立即返回 200(让发起方尽快确认接收,避免超时重发) res.status(200).end();// 3. 异步处理 Webhook 内容setImmediate(async()=>{const payload = req.body;const messageText =renderTemplate(config.messageTemplate,{ payload });const syntheticMsg: InboundMessage ={ id:`webhook:${config.id}:${Date.now()}`, channel:"internal:webhook", peerId:"system", chatId: config.sessionId, text: messageText, timestamp:newDate(), raw:{ triggerType:"webhook", webhookId: config.id, payload },};await gateway.dispatchInbound(syntheticMsg);});});

注意第 2 步:先返回 200,再异步处理。这是 Webhook 处理的最佳实践——大多数发起方(GitHub、Stripe 等)要求在 5 到 10 秒内收到响应,如果处理逻辑太慢就会触发重发。先回 200 确认收到,再慢慢处理,是标准做法。

12.2.4 幂等性:重复触发的防护

由于网络或发送方重试,同一个 Webhook 事件可能被发送多次。为了避免重复触发 Agent,Gateway 需要做幂等去重:

const deliveryId = req.headers["x-github-delivery"]asstring;if(deliveryId){const isDuplicate =await idempotencyStore.check(deliveryId);if(isDuplicate){ res.status(200).end();return;}await idempotencyStore.mark(deliveryId,TTL_24H);}

12.3 Gmail Pub/Sub:长链路事件源处理

对于邮件这类特殊场景,既不适合 Cron(延迟较高),也没有可靠的 Webhook 推送,Gmail 提供了一套基于 Google Cloud Pub/Sub 的实时通知机制。你授权 Gmail 把邮件变更事件推送到某个 Google Cloud Pub/Sub 主题,OpenClaw 订阅该主题,收到通知后拉取最新邮件变更,然后根据规则决定是否触发 Agent(例如「有新的 GitHub 通知邮件时」)。

// src/gateway/gmail-pubsub.ts(伪代码)asyncfunctionstartGmailWatch(config: GmailPubSubConfig){const client =awaitgetGmailClient(config.userId);await client.users.watch({ userId:"me", requestBody:{ topicName: config.pubsubTopicName, labelIds:["INBOX"],},});const pubsub =newPubSub({ projectId: config.gcpProjectId });const subscription = pubsub.subscription(config.subscriptionName); subscription.on("message",async(message)=>{ message.ack();const notification =JSON.parse( Buffer.from(message.data asstring,"base64").toString());awaitprocessGmailHistory(notification.historyId, config);});}asyncfunctionprocessGmailHistory(historyId:string, config: GmailPubSubConfig){const client =awaitgetGmailClient(config.userId);const history =await client.users.history.list({ userId:"me", startHistoryId: historyId, historyTypes:["messageAdded"],});const newMessages = history.data.history?.flatMap(h => h.messagesAdded ??[])??[];for(const{ message }of newMessages){if(matchesRule(message, config.rules)){const syntheticMsg: InboundMessage ={ id:`gmail-pubsub:${message.id}`, channel:"internal:gmail-pubsub", peerId:"system", chatId: config.sessionId, text:`收到一封新邮件,ID: ${message.id}。请检查并根据规则处理。`, timestamp:newDate(), raw:{ messageId: message.id, historyId },};await gateway.dispatchInbound(syntheticMsg);}}}

12.4 事件总线:统一的异步事件分发

除了上面三种机制,Gateway 内部还有一个轻量级的事件总线(EventBus),用于在各模块之间发布和订阅事件,避免直接耦合:

// 发布事件 eventBus.emit("skill:gmail:archive_completed",{ sessionId:"main", count:17, timestamp:newDate(),});// 订阅事件 eventBus.on("skill:gmail:archive_completed",async(data)=>{await dashboard.updateStats({ type:"archive", count: data.count });});

EventBus 让不同模块(Skills、Nodes、Automation Engine、Web UI)可以松散地相互感知,而不需要直接调用对方的接口。

12.5 源码走读导向

在阅读自动化相关代码时,可以沿以下路径。Cron Engine 在 src/cron/ 中,关注如何把 cron 表达式转化为定时器以及如何构造合成消息触发 Gateway。Webhook Handler 在 src/gateway/ 中与 webhook 相关的文件(如 server-webhook.ts),以及 src/plugin-sdk/webhook-request-guards.ts(签名验证)和 src/plugin-sdk/persistent-dedupe.ts(幂等去重)。Gmail Pub/Sub 相关代码可能分布在 Skills 或 src/gateway/ 的特定模块中,关注 Watch 注册和 Pub/Sub 消息解析。EventBus 在 src/gateway/ 中事件相关模块。

12.6 小结

本章介绍了 OpenClaw 的三种主动触发机制。Cron 定时触发,构造合成消息走标准分发链路,配置简单直观。Webhook 是外部事件推送,先 200 再异步处理,注意签名验证和幂等去重。Gmail Pub/Sub 专为邮件类实时通知设计,依赖 GCP 基础设施。

三种机制的共同点是:都最终收敛到 Gateway 的 dispatchInbound 这同一个入口,这让自动化任务和用户主动触发的任务在处理逻辑上保持完全一致。

下一章,我们进入本书的「动手实战」章节:从零开始,一步步构建你自己的 Skill。

Read more

从零开始用魔珐星云SDK搭建AI面试官:3D数字人应用的实时性与成本控制

从零开始用魔珐星云SDK搭建AI面试官:3D数字人应用的实时性与成本控制

文章目录 * 引言 * 一、项目背景:具身智能的 iPhone 时刻 * 二、创作目标与体验方式 * 三、星云平台 6 大核心特点 * 四、体验 Part1:星云平台使用流程 * 4.1 账号注册(邀请码有福利!) * 4.2 创建具身智能应用 * 4.3 应用配置流程 * 五、体验 Part2:基于魔珐星云 SDK 开发应用 * 5.1 环境部署 * 5.2 Demo代码详解 * 5.2.1 核心SDK文件 * 5.2.2 配套功能 * 5.2.3

前端学习日记 - 前端函数防抖详解

前端学习日记 - 前端函数防抖详解

前端函数防抖详解 * 为什么使用防抖 * 函数防抖的应用场景 * 函数防抖原理与手写实现 * 原理 * 手写实现 * 使用 Lodash 的 \_.debounce * 完整示例:防抖搜索组件 * 结语 在现代 Web 应用中,函数防抖(debounce)是一种常见且高效的性能优化手段,用于限制高频事件触发下的函数调用次数,从而减少不必要的计算、网络请求或 DOM 操作。本文将从“为什么使用防抖”切入,介绍典型的应用场景,深入解析防抖原理,并给出从零实现到在实际项目中使用 Lodash 的完整代码示例,帮助你快速掌握前端防抖技术。 为什么使用防抖 函数防抖的核心思想是在连续触发的事件停止后,仅执行最后一次调用,以避免频繁触发带来的性能问题 ([MDN Web Docs][1])。 在不使用防抖的情况下,例如在 input 输入事件或 window.resize 事件中直接调用逻辑,页面可能会因短时间内大量调用而出现卡顿或请求风暴 ([GeeksforGeeks]

【Zabbix 自定义监控全流程实战指南(附图文教程):从语法基础到内存传参、PHP-FPM 服务、Web 场景监控配置】

【Zabbix 自定义监控全流程实战指南(附图文教程):从语法基础到内存传参、PHP-FPM 服务、Web 场景监控配置】

提示:本文原创作品,良心制作,干货为主,简洁清晰,一看就会 zabbix自定义监控 * 前言 * 一、自定义监控语法 * 二、监控内存--基础用法 * 三、监控内存--传参用法 * 四、监控php-fpm 服务的状态 * 五、Web场景监控 前言 这篇内容带大家快速上手 Zabbix 的基础用法 关于 Zabbix 的安装步骤或创建监控项,还不太清楚的小伙伴,可以查看这篇文章补充相关知识 https://blog.ZEEKLOG.net/m0_63756214/article/details/156421867?spm=1001.2014.3001.5501 关于 Zabbix 创建触发器,动作,媒介及图形,还不太清楚的小伙伴,可以查看这篇文章补充相关知识https://blog.