OpenClaw WebSocket Channel开发实战:从零打造自定义 AI 通信通道
🎯 项目背景
为什么做这个项目?
最近 OpenClaw 特别火🔥,这是一个强大的个人 AI 助手网关,支持接入 WhatsApp、Telegram、Discord 等 15+ 个消息平台。作为一个技术爱好者,我决定深入学习一下它的架构设计。
学习目标:
- ✅ 理解多通道 AI 网关的架构模式
- ✅ 掌握 OpenClaw 插件化开发技能
- ✅ 实践 WebSocket 实时双向通信
- ✅ 为社区贡献一个实用的教学案例
项目定位:这不是一个生产级项目,而是一个学习性质的教学案例,帮助其他开发者快速上手 OpenClaw 插件开发。
技术栈
前端层:Vue 3 + WebSocket ↓ 服务端:Python + aiohttp + uv ↓ 通道层:Node.js + ws + OpenClaw Plugin SDK ↓ AI 层:OpenClaw Gateway + LLM Provider 🚀 快速开始
项目结构
openclaw-websocket-channel/ ├── websocket-service/ # Python WebSocket 服务端 │ ├── app.py # aiohttp 主程序 │ └── requirements.txt # Python 依赖 ├── websocket-web/ # Vue 3 前端 │ ├── src/ │ │ └── App.vue # 主界面 │ └── package.json └── websocket-channel/ # OpenClaw 通道插件 ├── index.ts # 插件主逻辑 └── openclaw.plugin.json 1. 启动 Python WebSocket 服务端
# 进入服务端目录cd websocket-service # 使用 uv 安装依赖 uv sync# 启动服务端 python app.py # 默认监听:ws://localhost:87652. 启动 Vue 前端
# 进入前端目录cd websocket-web # 安装依赖npminstall# 开发模式运行npm run dev # 访问:http://localhost:3000前端界面功能:
- 💬 实时聊天窗口
- 🔌 连接状态显示
- ✉️ 消息收发日志
3. 安装 WebSocket Channel
# 进入通道插件目录cd websocket-channel # 安装到 OpenClaw openclaw plugins install.# 验证安装 openclaw plugins list # 应该看到:websocket-channel4. 配置 OpenClaw
编辑 ~/.openclaw/config.json(或通过 Web UI):
{"channels":{"websocket-channel":{"enabled":true,"config":{"enabled":true,"wsUrl":"ws://localhost:8765/openclaw"}}}}配置说明:
enabled: 启用通道wsUrl: WebSocket 服务端地址- 无需
groupPolicy:默认就是开放模式
5. 重启 OpenClaw Gateway
# 如果使用 macOS 应用# 点击菜单栏 OpenClaw → Restart Gateway# 或命令行重启pkill-f openclaw-gateway openclaw gateway run 6. 测试
- 打开浏览器访问前端:
http://localhost:3000 - 点击 “连接” 按钮
- 发送消息:“你好,请介绍一下自己”
- 等待 AI 回复…
预期效果:
你:你好,请介绍一下自己 AI:你好!我是你的个人 AI 助手,基于 OpenClaw 框架运行。 我可以帮助你回答问题、编写代码、分析数据等。 有什么我可以帮你的吗?😊 🏗️ 程序架构
整体架构图
┌─────────────────┐ │ 用户浏览器 │ │ (Vue 前端) │ └────────┬────────┘ │ WebSocket │ ws://localhost:8765 ▼ ┌─────────────────┐ │ Python 服务端 │ │ (aiohttp) │ └────────┬────────┘ │ WebSocket │ 长连接 ▼ ┌─────────────────┐ │ Node.js 通道 │ │ (ws 库) │ └────────┬────────┘ │ OpenClaw Plugin API ▼ ┌─────────────────┐ │ OpenClaw │ │ Gateway │ └────────┬────────┘ │ ▼ ┌─────────────────┐ │ AI Provider │ │ (Qwen/Bailian) │ └─────────────────┘ 数据流详解
入站消息(前端 → AI)
1. 用户在 Vue 界面输入消息 ↓ 2. 前端通过 WebSocket 发送到 Python 服务端 ↓ 3. Python 服务端转发给 Node.js 通道 ↓ 4. Node.js 通道的 ws.on("message") 接收 ↓ 5. 标准化消息格式 ↓ 6. 调用 OpenClaw 框架 API ↓ 7. Gateway 调用 AI Provider 生成回复 出站消息(AI → 前端)
1. AI 生成回复文本 ↓ 2. OpenClaw 调用 deliver 回调 ↓ 3. Node.js 通道通过 WebSocket 发送给 Python 服务端 ↓ 4. Python 服务端转发给 Vue 前端 ↓ 5. 前端界面显示 AI 回复 💻 Channel开发详解
1. 项目初始化
# 创建插件目录mkdir-p openclaw-websocket-channel/websocket-channel cd openclaw-websocket-channel/websocket-channel # 创建基础文件touch index.ts openclaw.plugin.json package.json 2. 定义插件元数据
index.ts:
importtype{ ReplyPayload }from"openclaw/auto-reply/types";importtype{ ChannelPlugin, OpenClawConfig }from"openclaw/plugin-sdk";import{ createDefaultChannelRuntimeState }from"openclaw/plugin-sdk";interfaceWebSocketChannelConnection{ ws:any; accountId:string;}interfaceWebSocketChannelAccount{ accountId:string; wsUrl:string; enabled?:boolean; configured?:boolean; dmPolicy?:"pairing"|"allowlist"|"open"|"disabled";}const connections =newMap<string, WebSocketChannelConnection>();let pluginRuntime:any=null;const WebSocketChannel: ChannelPlugin<WebSocketChannelAccount>={ id:"websocket-channel", meta:{ id:"websocket-channel", label:"Websocket Channel", selectionLabel:"Websocket Channel (Custom)", docsPath:"/channels/websocket-channel", blurb:"WebSocket based messaging channel.", aliases:["ws"],},// ... 其他配置};3. 实现配置适配器
config:{/** * 列出所有配置的账户 ID * @returns 固定返回 ["default"] */listAccountIds:(cfg: OpenClawConfig)=>{return["default"];},/** * 解析账户配置 */resolveAccount:(cfg: OpenClawConfig, accountId:string)=>{const channelCfg = cfg.channels?.["websocket-channel"];if(!channelCfg ||!channelCfg.config){returnundefined;}const config = channelCfg.config asany;return{ accountId:"default", wsUrl: config.wsUrl ||"ws://localhost:8765/openclaw", enabled: config.enabled !==false,};},/** * 检查账户是否已配置 */isConfigured:async(account, cfg)=>{returnBoolean(account.wsUrl && account.wsUrl.trim()!=="");},}4. 实现状态管理适配器 ⭐关键
status:{/** * 默认运行时状态模板 * ⚠️ 必须实现这个方法,否则 UI 会显示 "0/1 connected" */ defaultRuntime:createDefaultChannelRuntimeState("default",{ wsUrl:null, connected:false, groupPolicy:null,}),/** * 构建通道摘要(用于 UI 显示) */buildChannelSummary:({ snapshot })=>({ wsUrl: snapshot.wsUrl ??null, connected: snapshot.connected ??null, groupPolicy: snapshot.groupPolicy ??null,}),/** * 构建账户完整快照 */buildAccountSnapshot:({ account, runtime })=>({ accountId: account.accountId, enabled: account.enabled, configured: account.configured, wsUrl: account.wsUrl, running: runtime?.running ??false, connected: runtime?.connected ??false, groupPolicy: runtime?.groupPolicy ??null, lastStartAt: runtime?.lastStartAt ??null, lastStopAt: runtime?.lastStopAt ??null, lastError: runtime?.lastError ??null,}),}为什么需要 defaultRuntime?
OpenClaw 的 UI 通过读取通道的 defaultRuntime 来知道要跟踪哪些状态字段。如果没有这个配置:
- UI 不知道要显示
connected字段 - 即使你在
startAccount中设置了connected: true - UI 也只会显示 “0/1 connected”
正确做法:
- 在
defaultRuntime中声明要跟踪的字段(包括connected: false) - 在
startAccount开始时调用ctx.setStatus({ connected: true }) - UI 就会正确显示 “1/1 connected”
5. 实现网关适配器(核心)
gateway:{/** * 启动 WebSocket 账户连接 */startAccount:async(ctx)=>{const{ log, account, abortSignal, cfg }= ctx; log?.info(`[websocket-channel] Starting WebSocket Channel for ${account.accountId}`);// 获取 runtime APIconst runtime = pluginRuntime;// ⭐ 关键:设置初始状态为已连接 ctx.setStatus({ accountId: account.accountId, wsUrl: account.wsUrl, running:true, connected:true,}); log?.info(`[websocket-channel] Status set: connected=true, running=true`);// 创建 WebSocket 连接const WebSocketLib =awaitimport("ws");const ws =new(WebSocketLib.default asany)(account.wsUrl);// 存储连接 connections.set(account.accountId,{ ws, accountId: account.accountId });// 监听消息事件 ws.on("message",async(data: Buffer)=>{try{// 1. 解析原始消息const rawData = data.toString();const eventData =JSON.parse(rawData);const innerData = eventData.data ||{};// 2. 标准化消息const normalizedMessage ={ id:`${eventData.source ||"websocket"}-${Date.now()}`, channel:"websocket-channel", accountId: account.accountId, senderId: innerData.source || eventData.source ||"unknown", senderName: innerData.source || eventData.source ||"Unknown", text: innerData.content || innerData.text ||"", timestamp: innerData.timestamp || Date.now().toISOString(), isGroup:false, groupId:undefined, attachments:[], metadata:{},}; log?.info(`[websocket-channel] 📨 Received: "${normalizedMessage.text}" from ${normalizedMessage.senderId}`,);// 3. 解析路由const route = runtime.channel.routing.resolveAgentRoute({ cfg, channel:"websocket-channel", accountId: account.accountId, peer:{ kind:"direct", id: normalizedMessage.senderId,},});// 4. 构建消息上下文const ctxPayload = runtime.channel.reply.finalizeInboundContext({ Body: normalizedMessage.text, BodyForAgent: normalizedMessage.text, From: normalizedMessage.senderId, To:undefined, SessionKey: route.sessionKey, AccountId: route.accountId, ChatType:"direct", SenderName: normalizedMessage.senderName, SenderId: normalizedMessage.senderId, Provider:"websocket-channel", Surface:"websocket-channel", MessageSid: normalizedMessage.id, Timestamp: Date.now(),});// 5. 调用框架调度器await runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg: cfg, dispatcherOptions:{deliver:async(payload: ReplyPayload,{ kind })=>{ log?.info(`[websocket-channel] Delivering ${kind} reply via WebSocket...`);const currentConn = connections.get(account.accountId);if(!currentConn ||!currentConn.ws || currentConn.ws.readyState !==1){thrownewError("No WebSocket connection available");}// 发送 AI 回复 currentConn.ws.send(JSON.stringify({ type:"reply", content: payload.text ||"", kind,}));},onError:(err,{ kind })=>{ log?.error(`[websocket-channel] Delivery error for ${kind}: ${err.message}`);},},}); log?.info(`[websocket-channel] Message dispatched successfully`);}catch(err){ log?.error(`[websocket-channel] Failed to process message: ${err.message}`);}});// 监听错误和关闭 ws.on("error",(err: Error)=>{ log?.error(`[websocket-channel] ❌ WebSocket error: ${err.message}`); connections.delete(account.accountId);reject(err);}); ws.on("close",()=>{ log?.info(`[websocket-channel] 🔴 Connection closed`); connections.delete(account.accountId);resolve();});// 监听中止信号 abortSignal.addEventListener("abort",()=>{ log?.info(`[websocket-channel] ⏹️ Abort requested`); ws.close();resolve();});// 保持连接运行awaitPromise.race([ connectionPromise,newPromise<void>((resolve)=>{ abortSignal.addEventListener("abort",()=>resolve());}),]); connections.delete(account.accountId);},}6. 注册插件入口
/** * 注册插件入口 * @param api - 插件 API */exportdefaultfunctionregister(api:any){console.log("[websocket-channel] Registering WebSocket Channel plugin"); pluginRuntime = api.runtime; api.registerChannel({ plugin: WebSocketChannel });}