import type { ReplyPayload } from "openclaw/auto-reply/types";
import type { ChannelPlugin, OpenClawConfig } from "openclaw/plugin-sdk";
import { createDefaultChannelRuntimeState } from "openclaw/plugin-sdk";
/**
 * One live socket tracked in the module-level `connections` map.
 */
interface WebSocketChannelConnection {
  /**
   * The socket instance created when the account is started. Typed as `any`
   * because the `ws` module is loaded through a dynamic import.
   */
  ws: any;
  /** Id of the account that owns this socket; also used as the map key. */
  accountId: string;
}
/**
 * Resolved account settings for the WebSocket channel.
 */
interface WebSocketChannelAccount {
  /** Identifier of the account. */
  accountId: string;
  /** URL handed to the WebSocket constructor when the account is started. */
  wsUrl: string;
  /** Whether the account is enabled. */
  enabled?: boolean;
  /** Reported as-is in the account status snapshot. */
  configured?: boolean;
  /**
   * Direct-message policy.
   * NOTE(review): not read anywhere in the visible part of this file — confirm
   * whether the host consumes it.
   */
  dmPolicy?: "pairing" | "allowlist" | "open" | "disabled";
}
// Live sockets keyed by account id. An entry is added when an account is
// started and removed when its socket errors, closes, or the account stops.
const connections = new Map<string, WebSocketChannelConnection>();
// Host runtime handle used to route inbound messages and dispatch replies.
// NOTE(review): starts as null and nothing in the visible part of this file
// assigns it — presumably set during plugin registration; confirm.
let pluginRuntime: any = null;
/** `readyState` value the `ws` library reports for a socket that is open and able to send. */
const WS_READY_STATE_OPEN = 1;

/**
 * Returns a printable message for a value of unknown type, as received by a
 * `catch` clause or an error callback.
 */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Channel plugin that bridges a single WebSocket endpoint to the agent runtime.
 *
 * Inbound frames are parsed as JSON, turned into a direct-message context and
 * dispatched to the runtime; replies are sent back over the same socket as
 * `{ type: "reply", content, kind }` JSON frames.
 */
const WebSocketChannel: ChannelPlugin<WebSocketChannelAccount> = {
  id: "websocket-channel",
  meta: {
    id: "websocket-channel",
    label: "Websocket Channel",
    selectionLabel: "Websocket Channel (Custom)",
    docsPath: "/channels/websocket-channel",
    blurb: "WebSocket based messaging channel.",
    aliases: ["ws"],
  },

  config: {
    /** This channel exposes exactly one account, always named "default". */
    listAccountIds: (_cfg: OpenClawConfig) => ["default"],

    /**
     * Builds the account from `cfg.channels["websocket-channel"].config`.
     *
     * @returns The resolved account, or `undefined` when the channel has no
     *   config section. The requested id is ignored: the result is always the
     *   "default" account.
     */
    resolveAccount: (cfg: OpenClawConfig, _accountId: string) => {
      const channelCfg = cfg.channels?.["websocket-channel"];
      if (!channelCfg || !channelCfg.config) {
        return undefined;
      }
      const config: { wsUrl?: string; enabled?: boolean } = channelCfg.config as any;
      return {
        accountId: "default",
        // `||` on purpose: an empty string also falls back to the default URL.
        wsUrl: config.wsUrl || "ws://localhost:8765/openclaw",
        enabled: config.enabled !== false,
      };
    },

    /**
     * An account counts as configured when it has a non-blank `wsUrl`.
     * Tolerates a missing account because `resolveAccount` may return `undefined`.
     */
    isConfigured: async (account, _cfg) => Boolean(account?.wsUrl?.trim()),
  },

  status: {
    // Initial runtime state for the "default" account, before any connection.
    defaultRuntime: createDefaultChannelRuntimeState("default", {
      wsUrl: null,
      connected: false,
      groupPolicy: null,
    }),

    /** Channel-level summary: the snapshot fields with `null` for anything missing. */
    buildChannelSummary: ({ snapshot }) => ({
      wsUrl: snapshot.wsUrl ?? null,
      connected: snapshot.connected ?? null,
      groupPolicy: snapshot.groupPolicy ?? null,
    }),

    /** Per-account snapshot combining account settings with the current runtime state. */
    buildAccountSnapshot: ({ account, runtime }) => ({
      accountId: account.accountId,
      enabled: account.enabled,
      configured: account.configured,
      wsUrl: account.wsUrl,
      running: runtime?.running ?? false,
      connected: runtime?.connected ?? false,
      groupPolicy: runtime?.groupPolicy ?? null,
      lastStartAt: runtime?.lastStartAt ?? null,
      lastStopAt: runtime?.lastStopAt ?? null,
      lastError: runtime?.lastError ?? null,
    }),
  },

  gateway: {
    /**
     * Opens the account's WebSocket, relays every inbound frame to the agent
     * runtime and sends the agent's replies back over the same socket.
     *
     * The returned promise resolves when the socket closes or `abortSignal`
     * fires, and rejects when the socket reports an error. In every case the
     * connection entry is removed and the status is reset before returning.
     */
    startAccount: async (ctx) => {
      const { log, account, abortSignal, cfg } = ctx;
      const { accountId, wsUrl } = account;
      log?.info(`[websocket-channel] Starting WebSocket Channel for ${accountId}`);

      // Running, but not connected until the socket actually opens.
      ctx.setStatus({ accountId, wsUrl, running: true, connected: false });
      log?.info(`[websocket-channel] Status set: connected=false, running=true`);

      try {
        const WebSocketLib = await import("ws");
        const ws = new (WebSocketLib.default as any)(wsUrl);
        connections.set(accountId, { ws, accountId });

        ws.on("open", () => {
          log?.info(`[websocket-channel] Connection established`);
          ctx.setStatus({ accountId, wsUrl, running: true, connected: true });
        });

        ws.on("message", async (data: Buffer) => {
          try {
            // Read the runtime per message so a handle assigned after start-up is picked up.
            const runtime = pluginRuntime;
            if (!runtime) {
              throw new Error("plugin runtime is not initialized");
            }

            // `?? {}` keeps a literal JSON `null` frame from crashing the property reads below.
            const eventData = JSON.parse(data.toString()) ?? {};
            const innerData = eventData.data || {};
            const normalizedMessage = {
              id: `${eventData.source || "websocket"}-${Date.now()}`,
              channel: "websocket-channel",
              accountId,
              senderId: innerData.source || eventData.source || "unknown",
              senderName: innerData.source || eventData.source || "Unknown",
              text: innerData.content || innerData.text || "",
              // `Date.now()` is a number and has no `toISOString`; a Date instance is required.
              timestamp: innerData.timestamp || new Date().toISOString(),
              isGroup: false,
              groupId: undefined,
              attachments: [],
              metadata: {},
            };
            log?.info(
              `[websocket-channel] 📨 Received: "${normalizedMessage.text}" from ${normalizedMessage.senderId}`,
            );

            const route = runtime.channel.routing.resolveAgentRoute({
              cfg,
              channel: "websocket-channel",
              accountId,
              peer: {
                kind: "direct",
                id: normalizedMessage.senderId,
              },
            });

            const ctxPayload = runtime.channel.reply.finalizeInboundContext({
              Body: normalizedMessage.text,
              BodyForAgent: normalizedMessage.text,
              From: normalizedMessage.senderId,
              To: undefined,
              SessionKey: route.sessionKey,
              AccountId: route.accountId,
              ChatType: "direct",
              SenderName: normalizedMessage.senderName,
              SenderId: normalizedMessage.senderId,
              Provider: "websocket-channel",
              Surface: "websocket-channel",
              MessageSid: normalizedMessage.id,
              Timestamp: Date.now(),
            });

            await runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
              ctx: ctxPayload,
              cfg,
              dispatcherOptions: {
                deliver: async (payload: ReplyPayload, { kind }) => {
                  log?.info(`[websocket-channel] Delivering ${kind} reply via WebSocket...`);
                  // Look the socket up again: it may have closed while the agent was replying.
                  const currentConn = connections.get(accountId);
                  if (currentConn?.ws?.readyState !== WS_READY_STATE_OPEN) {
                    throw new Error("No WebSocket connection available");
                  }
                  currentConn.ws.send(
                    JSON.stringify({
                      type: "reply",
                      content: payload.text || "",
                      kind,
                    }),
                  );
                },
                onError: (err, { kind }) => {
                  log?.error(
                    `[websocket-channel] Delivery error for ${kind}: ${describeError(err)}`,
                  );
                },
              },
            });
            log?.info(`[websocket-channel] Message dispatched successfully`);
          } catch (err: unknown) {
            // A bad frame must not take the connection down; log it and keep listening.
            log?.error(`[websocket-channel] Failed to process message: ${describeError(err)}`);
          }
        });

        // Wait until the socket ends or the host asks us to stop.
        await new Promise<void>((resolve, reject) => {
          const settle = (err?: Error) => {
            abortSignal.removeEventListener("abort", onAbort);
            if (err) {
              reject(err);
            } else {
              resolve();
            }
          };
          const onAbort = () => {
            log?.info(`[websocket-channel] ⏹️ Abort requested`);
            // Settle first so an error emitted by closing a half-open socket cannot reject.
            settle();
            ws.close();
          };

          ws.on("error", (err: Error) => {
            log?.error(`[websocket-channel] ❌ WebSocket error: ${err.message}`);
            connections.delete(accountId);
            settle(err);
          });
          ws.on("close", () => {
            log?.info(`[websocket-channel] 🔴 Connection closed`);
            connections.delete(accountId);
            settle();
          });

          // An already-aborted signal never fires "abort" again, so check it explicitly.
          if (abortSignal.aborted) {
            onAbort();
          } else {
            abortSignal.addEventListener("abort", onAbort, { once: true });
          }
        });
      } finally {
        connections.delete(accountId);
        ctx.setStatus({ accountId, wsUrl, running: false, connected: false });
      }
    },
  },
};