跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
Javajava

钉钉单聊机器人 Stream 模式实现

综述由AI生成基于钉钉开放平台 Stream 模式实现单聊机器人的方案。Stream 模式通过 WebSocket 连接降低接入门槛,无需公网 IP、域名及防火墙配置。文章详细说明了凭证注册、WebSocket 连接建立流程,提供了 Java SDK 集成代码示例,包括客户端配置和消息回调处理。此外,还总结了重复推送、多实例监听及负载均衡等常见问题的解决方案。

云间漫步发布于 2026/4/6更新于 2026/5/2225 浏览

单聊机器人实现

开发 Stream 模式推送服务端(推荐)
什么是 Stream 模式

Stream 模式是钉钉开放平台提供的一种集成方式,它可以监听机器人回调、事件订阅回调和注册卡片回调。使用 Stream 模式接入,钉钉开放平台将通过 Websocket 连接与应用程序通讯,Stream 模式将极大降低接入门槛和资源依赖,不需要公网服务器、IP、域名等资源,只需集成钉钉开放平台 SDK 即可。

Stream 模式原理

在 Stream 模式下,开发者的应用程序通过集成 SDK 的方式与钉钉开放平台建立一条 WebSocket 连接,建立连接过程中开放平台将对连接进行鉴权。当有卡片回调发生时,开放平台将通过 WebSocket 连接将数据通知到开发者的应用程序。开发者的应用程序可以接收到这些数据并进行相应处理,从而实现与钉钉开放平台的实时通信,参考下图所示。

在这里插入图片描述

Stream 模式优势

在钉钉开放平台向应用程序发送请求的场景中,大部分都是采用 Webhook(注册公网 HTTPS 服务)的方式,包括卡片回调,使用 Webhook 方式开发过程中会遇到较多的问题,包括:

  • 申请公网域名和 TLS 证书
  • 申请公网 IP 并部署接入网关
  • 部署应用防火墙并配置白名单
  • 独立处理请求的鉴权,以及加解密处理
  • 搭建内网穿透环境进行本地开发调试

针对以上问题,Stream 模式将为开发者提供'五零'接入体验,将 1~2 周的接入开发周期降低到 5 分钟,包括:

  • 零公网 IP:不需要依赖公网 IP 或域名,也不需要暴露公网 IP,减少了公网暴露服务的安全风险并降低了开发门槛。
  • 零加解密/签名/TLS 证书管理:使用应用身份对连接进行鉴权,通过反向连接的方式与钉钉开放平台建立 TLS 加密连接,提供了快速、安全的通信体验。
  • 零防火墙白名单:Stream 模式下开发者无需向公网开放提供任何服务端口,无需部署防火墙和配置白名单。
  • 零网关部署:通过反向连接的方式建立通道,开发者只需保证运行环境具备公网访问能力即可,无需部署网关。
  • 零内网穿透:开发者无需在本地搭建内网穿透工具,通过 Stream 模式在本地开发环境中即可接收卡片回调。
接入方式
接入限制
  1. 应用程序所部署环境具备访问公网的能力。
  2. 仅适用于企业内部开发和第三方企业应用。
  3. 每个客户端实例默认启用一条 WebSocket 连接,一个应用默认最多建立 50 条连接。
协议接入步骤
介绍

钉钉 Stream 协议接入主要包括两个步骤:

  1. 注册连接凭证:通过 HTTP POST 方法,获取 WebSocket 通道的 endpoint(协议域名和 Path 信息)和 ticket(URL 中的 Ticket 参数);
  2. 建立 WebSocket 连接:通过步骤一中获取的 endpoint 和 ticket 信息,建立 WebSocket 通道,开始订阅;
步骤一:注册连接凭证

调用以下接口注册 Stream 连接凭证:

请求方法 (HTTP) 示例:

POST /v1.0/gateway/connections/open HTTP/1.1
Host: api.dingtalk.com
Content-Type: application/json
Accept: application/json
{"clientId":"${ClientID}","clientSecret":"${ClientSecret}","localIp":"10.34.22.11","subscriptions":[{"topic":"*","type":"EVENT"},{"topic":"/v1.0/im/bot/messages/get","type":"CALLBACK"}],"ua":"dingtalk-sdk-java/1.0.2"}
步骤二:建立 WebSocket 连接

注册长连接信息成功后,客户端将获取长连的身份标识 ticket 和钉钉开放平台的地址信息,通过此信息客户端和钉钉服务端建立一条 WebSocket 连接,握手请求的路径和参数信息如下所示:

GET /connect?ticket=${ticket} HTTP/1.1
Host: wss-open-connection.dingtalk.com
Upgrade: websocket

以上示例中 Host、Path 信息仅用于示例展示,请用步骤一中返回的 endpoint 信息作为 Host 和 Path 构建 WebSocket 请求。

钉钉服务端收到 WebSocket 握手信息后会通过 ticket 校验客户端身份信息,校验成功后会返回正确的握手信息。

至此,已经完成了推送订阅通道建立,可以实时接收到订阅的消息列表。下一章节将介绍各种类型的消息,以及如何响应(用于通知钉钉服务端已经成功接收,请勿重复推送)。
Java
  • 运行环境:JDK1.8 及以上。
  • 安装 Java SDK:添加依赖项到工程的 pom.xml 文件或下载对应的 jar 包,最新的 SDK 版本可以在这里 查看和下载。
<dependency>
    <groupId>com.dingtalk.open</groupId>
    <artifactId>dingtalk-stream</artifactId>
    <version>{sdk-version}</version>
</dependency>
钉钉 Stream 流的构建

创建 IM 消息的监听

@Configuration
public class DingTalkStreamClientConfiguration {
    @Value("${app.appKey}")
    private String clientId;
    
    @Value("${app.appSecret}")
    private String clientSecret;

    /**
     * 配置 OpenDingTalkClient 客户端并配置初始化方法 (start)
     *
     * @param chatBotCallbackListener
     * @param aiGraphPluginCallbackListener
     * @return
     * @throws Exception
     */
    @Bean(initMethod = "start")
    public OpenDingTalkClient configureStreamClient(
            @Autowired ChatBotCallbackListener chatBotCallbackListener) throws Exception {
        // init stream client
        return OpenDingTalkStreamClientBuilder.custom()
                // 配置应用的身份信息,企业内部应用分别为 appKey 和 appSecret,三方应用为 suiteKey 和 suiteSecret.
                .credential(new AuthClientCredential(clientId, clientSecret))
                // 注册机器人回调
                .registerCallbackListener(DingTalkStreamTopics.BOT_MESSAGE_TOPIC, chatBotCallbackListener)
                .build();
    }
}

机器人消息回调

/**
 * 机器人消息回调
 *
 * @author zeymo
 */
@Slf4j
@Component
public class ChatBotCallbackListener implements OpenDingTalkCallbackListener<ChatbotMessage, JSONObject> {
    private RobotPrivateMessagesService robotPrivateMessagesService;

    @Autowired
    public ChatBotCallbackListener(RobotPrivateMessagesService robotPrivateMessagesService) {
        this.robotPrivateMessagesService = robotPrivateMessagesService;
    }

    /**
     * https://open.dingtalk.com/document/orgapp/the-application-robot-in-the-enterprise-sends-group-chat-messages
     *
     * @param message
     * @return
     */
    @Override
    public JSONObject execute(ChatbotMessage message) {
        try {
            MessageContent text = message.getText();
            if (text != null) {
                String msg = text.getContent();
                log.info("receive bot message from user={}, msg={}", message.getSenderId(), msg);
                String openConversationId = message.getConversationId();
                try {
                    // 发送机器人消息
                    robotPrivateMessagesService.send(openConversationId, "hello");
                } catch (Exception e) {
                    log.error("send group message by robot error:" + e.getMessage(), e);
                }
            }
        } catch (Exception e) {
            log.error("receive group message by robot error:" + e.getMessage(), e);
        }
        return new JSONObject();
    }
}

以上代码实现了这些能力:

  1. 通过命令行参数读取 Client ID 和 Client Secret 选项
  2. 通过 Client ID 和 Client Secret 创建 Stream Client
  3. 在 Stream Client 中注册机器人消息回调方法,实现消息接收能力
  4. 在消息回调方法中,简单 echo 机器人消息回去,实现消息发送 (回复) 能力

在 IDE 中运行 BotEchoMarkdownApplication.java 中 main 函数,当看到这样的日志输出时候表示运行成功 [DingTalk] connection is established, connectionId=...

配置 Stream 配送
前提条件
  1. 拥有所在钉钉组织开发者后台的 开发者权限。
  2. 拥有所在钉钉组织的 企业内部应用。
  3. 已经完成 开发 Stream 模式推送服务端(推荐) 流程。
操作步骤
  1. 登录 开发者后台,单击目标应用,进入应用详情页。
  2. 单击 开发配置 > 事件订阅,选择 Stream 模式推送。
  3. 服务端开发完成后,单击 已完成接入,验证连接通道。
  4. 单击 保存。保存完成后,事件订阅列表才会展示。
常见问题与解决方案
  1. 重新发送 原因是网络延迟不可控,如果因为互联网的正常抖动导致推送延迟,触发超时重新推送的话,就会出现重复事件。因此无论是否正确的 ACK 了,都需要考虑到收到重复事件的可能性。 备注:机器人消息当前是 fire-forgot 模式,不会因为网络超时而重复推送。

  2. 多个实例,监听同一个机器人。只会有一个消费。确保只能被服务器消费。 解决方案:使用 conditional 控制 IP 地址来阻止 Stream 流连接。

    ConditionalOnIp.java

    @Target({ElementType.TYPE, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Conditional(RobotContion.class)
    public @interface ConditionalOnIp {
        /**
         * 允许的 IP 地址列表,支持通配符和 CIDR 表示法
         * 例如:"192.168.1.*", "10.0.0.0/24", "127.0.0.1"
         */
        String[] allowed() default {};
    
        /**
         * 禁止的 IP 地址列表
         */
        String[] denied() default {};
    
        /**
         * 配置属性名称,从配置文件中读取 IP 列表
         */
        String value() default "";
    
        /**
         * 当无法获取 IP 时是否匹配(默认 false)
         */
        boolean matchIfMissing() default false;
    }
    

    RobotContion.java

    public class RobotContion implements Condition {
        private static final String LOCAL_IP = getLocalIp();
    
        @Override
        public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
            Map<String, Object> annotationAttributes = metadata.getAnnotationAttributes(ConditionalOnIp.class.getName());
            if (annotationAttributes == null) {
                return false;
            }
            String[] allowedIPs = (String[]) annotationAttributes.get("allowed");
            String[] deniedIPs = (String[]) annotationAttributes.get("denied");
            String configProperty = (String) annotationAttributes.get("value");
            boolean matchIfMissing = (boolean) annotationAttributes.get("matchIfMissing");
            if (StringUtils.hasText(configProperty)) {
                Environment env = context.getEnvironment();
                String configValue = env.getProperty(configProperty);
                if (configValue != null) {
                    allowedIPs = configValue.split(",");
                } else {
                    return matchIfMissing;
                }
            }
            String currentIP = LOCAL_IP;
            if (currentIP == null) {
                return matchIfMissing;
            }
            // 检查禁止列表
            if (deniedIPs.length > 0 && isIPInList(currentIP, deniedIPs)) {
                return false;
            }
            // 检查允许列表
            if (allowedIPs.length > 0) {
                return isIPInList(currentIP, allowedIPs);
            }
            // 如果没有设置允许列表,且不在禁止
            return true;
        }
    
        /**
         * 判断 IP 是否在列表中
         */
        private boolean isIPInList(String ip, String[] ipList) {
            for (String ipPattern : ipList) {
                if (matchesIP(ip, ipPattern.trim())) {
                    return true;
                }
            }
            return false;
        }
    
        /**
         * IP 匹配逻辑,支持通配符和 CIDR
         */
        private boolean matchesIP(String ip, String pattern) {
            // 精确匹配
            if (ip.equals(pattern)) {
                return true;
            }
            // 通配符匹配:192.168.1.*
            if (pattern.contains("*")) {
                String regex = pattern.replace(".", "\\.").replace("*", ".*");
                return Pattern.matches(regex, ip);
            }
            // CIDR 表示法匹配:192.168.1.0/24
            if (pattern.contains("/")) {
                try {
                    return isInCIDRRange(ip, pattern);
                } catch (Exception e) {
                    return false;
                }
            }
            // IP 范围匹配:192.168.1.1-192.168.1.100
            if (pattern.contains("-")) {
                try {
                    return isInIPRange(ip, pattern);
                } catch (Exception e) {
                    return false;
                }
            }
            return false;
        }
    
        /**
         * CIDR 范围匹配
         */
        private boolean isInCIDRRange(String ip, String cidr) {
            String[] parts = cidr.split("/");
            String network = parts[0];
            int prefixLength = Integer.parseInt(parts[1]);
            long ipLong = ipToLong(ip);
            long networkLong = ipToLong(network);
            long mask = (0xFFFFFFFFL) << (32 - prefixLength);
            return (ipLong & mask) == (networkLong & mask);
        }
    
        /**
         * IP 范围匹配
         */
        private boolean isInIPRange(String ip, String range) {
            String[] ips = range.split("-");
            long start = ipToLong(ips[0].trim());
            long end = ipToLong(ips[1].trim());
            long ipLong = ipToLong(ip);
            return ipLong >= start && ipLong <= end;
        }
    
        /**
         * IP 转 long
         */
        private long ipToLong(String ip) {
            String[] octets = ip.split("\\.");
            long result = 0;
            for (int i = 0; i < 4; i++) {
                result |= Long.parseLong(octets[i]) << (24 - (8 * i));
            }
            return result;
        }
    
        private static String getLocalIp() {
            // 优先获取回环地址
            try {
                Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
                while (netInterfaces.hasMoreElements()) {
                    NetworkInterface iface = netInterfaces.nextElement();
                    if (iface.isLoopback() || !iface.isUp()) {
                        continue;
                    }
                    Enumeration<InetAddress> addresses = iface.getInetAddresses();
                    while (addresses.hasMoreElements()) {
                        InetAddress addr = addresses.nextElement();
                        if (!addr.isLoopbackAddress() && iface.getDisplayName().indexOf("Virtual") == -1) {
                            return addr.getHostAddress();
                        }
                    }
                }
                // System.out.println(InetAddress.getLocalHost().getHostAddress());
                // 如果没有找到,返回会回环地址
                return InetAddress.getLocalHost().getHostAddress();
            } catch (SocketException e) {
                return "127.0.0.1";
            } catch (UnknownHostException e) {
                return "127.0.0.1";
            }
        }
    }
    

    接下来就可以设置配置文件或者设置 allowed 属性来控制 IP 了。

  3. 响应消息处理是否支持负载均衡方式处理? 补充问题:启动多个程序订阅相同事件进行数据处理,担心将来推送数据量大时可能出现无法及时响应的情况;可能需要考虑方案提供给客户端侧进行参考 支持的。如果事件量较大的话,可以采用多进程,或者单进程下多 Stream Client 实例方式,建立多个 Stream 通道,也即多个 WebSocket 长连接。钉钉服务端每次推送消息时候,通过随机策略选取一个 Stream 通道推送。如果需要支持更多的负载均衡策略,可以通过技术支持提交反馈。

  4. 注册连接凭证中的 localIp 是否可以标记多个 IP? 补充问题:或增加自定义的客户端标识参数【可选】 可以支持多个 IP,采用英文逗号分隔。

目录

  1. 单聊机器人实现
  2. 开发 Stream 模式推送服务端(推荐)
  3. 什么是 Stream 模式
  4. Stream 模式原理
  5. Stream 模式优势
  6. 接入方式
  7. 接入限制
  8. 协议接入步骤
  9. 介绍
  10. 步骤一:注册连接凭证
  11. 步骤二:建立 WebSocket 连接
  12. Java
  13. 钉钉 Stream 流的构建
  14. 配置 Stream 配送
  15. 前提条件
  16. 操作步骤
  17. 常见问题与解决方案
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • 现代前端开发趋势:React 18、Server Components 与 AI 辅助
  • 星露谷农场规划器技术架构:Node.js 后端与 SVG 前端渲染实现
  • Komari 轻量级服务器监控探针部署指南
  • 基于腾讯元器智能体构建专属 AI 聊天工具
  • ES6 数组 some 与 every 方法用法
  • AI 产品经理的核心定义、能力模型与职业发展路径
  • 前端精确数字运算方案:使用 BigNumber.js 解决 JavaScript 精度问题
  • 为什么 AI 难以取代软件工程师?
  • OpenClaw 本地 AI 智能体入门与实战指南
  • LeetCode 热题 100 Python3 算法题解:哈希、双指针与滑动窗口
  • Formality 原语(primitive)概念详解
  • PHP 对接 DeepSeek API 实现指南
  • OpenClaw 对接飞书机器人配置踩坑:消息不回与 Gateway 断开排查
  • VS Code 中 GitHub 扩展登录报错:尚未完成授权此扩展使用 GitHub 的操作
  • 为什么 AI 圈会有这么多带'Llama'的产品?
  • 主流无人机倾斜摄影三维建模服务商盘点
  • 基于 Open3D.Art 与拓竹打印机实现 AI 生成 3D 模型快速打印流程
  • 腾讯云 VOD AIGC 视频生成工具回调实现
  • 微信官方 Bot API 与 ClawBot 插件技术解析
  • FPGA 实现 CAN 总线原理与 Verilog 代码详解

相关免费在线工具

  • Keycode 信息

    查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online

  • Escape 与 Native 编解码

    JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online

  • JavaScript / HTML 格式化

    使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online

  • JavaScript 压缩与混淆

    Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online