单聊机器人实现
开发 Stream 模式推送服务端(推荐)
什么是 Stream 模式
Stream 模式是钉钉开放平台提供的一种集成方式,它可以监听机器人回调、事件订阅回调和注册卡片回调。使用 Stream 模式接入,钉钉开放平台将通过 Websocket 连接与应用程序通讯,Stream 模式将极大降低接入门槛和资源依赖,不需要公网服务器、IP、域名等资源,只需集成钉钉开放平台 SDK 即可。
Stream 模式原理
在 Stream 模式下,开发者的应用程序通过集成 SDK 的方式与钉钉开放平台建立一条 WebSocket 连接,建立连接过程中开放平台将对连接进行鉴权。当有卡片回调发生时,开放平台将通过 WebSocket 连接将数据通知到开发者的应用程序。开发者的应用程序可以接收到这些数据并进行相应处理,从而实现与钉钉开放平台的实时通信,参考下图所示。
Stream 模式优势
在钉钉开放平台向应用程序发送请求的场景中,大部分都是采用 Webhook(注册公网 HTTPS 服务)的方式,包括卡片回调,使用 Webhook 方式开发过程中会遇到较多的问题,包括:
- 申请公网域名和 TLS 证书
- 申请公网 IP 并部署接入网关
- 部署应用防火墙并配置白名单
- 独立处理请求的鉴权,以及加解密处理
- 搭建内网穿透环境进行本地开发调试
针对以上问题,Stream 模式将为开发者提供'五零'接入体验,将 1~2 周的接入开发周期降低到 5 分钟,包括:
- 零公网 IP:不需要依赖公网 IP 或域名,也不需要暴露公网 IP,减少了公网暴露服务的安全风险并降低了开发门槛。
- 零加解密/签名/TLS 证书管理:使用应用身份对连接进行鉴权,通过反向连接的方式与钉钉开放平台建立 TLS 加密连接,提供了快速、安全的通信体验。
- 零防火墙白名单:Stream 模式下开发者无需向公网开放提供任何服务端口,无需部署防火墙和配置白名单。
- 零网关部署:通过反向连接的方式建立通道,开发者只需保证运行环境具备公网访问能力即可,无需部署网关。
- 零内网穿透:开发者无需在本地搭建内网穿透工具,通过 Stream 模式在本地开发环境中即可接收卡片回调。
接入方式
接入限制
- 应用程序所部署环境具备访问公网的能力。
- 仅适用于企业内部开发和第三方企业应用。
- 每个客户端实例默认启用一条 WebSocket 连接,一个应用默认最多建立 50 条连接。
协议接入步骤
介绍
钉钉 Stream 协议接入主要包括两个步骤:
- 注册连接凭证:通过 HTTP POST 方法,获取 WebSocket 通道的 endpoint(协议域名和 Path 信息)和 ticket(URL 中的 Ticket 参数);
- 建立 WebSocket 连接:通过步骤一中获取的 endpoint 和 ticket 信息,建立 WebSocket 通道,开始订阅;
步骤一:注册连接凭证
调用以下接口注册 Stream 连接凭证:
请求方法 (HTTP) 示例:
POST /v1.0/gateway/connections/open HTTP/1.1
Host: api.dingtalk.com
Content-Type: application/json
Accept: application/json
{"clientId":"${ClientID}","clientSecret":"${ClientSecret}","localIp":"10.34.22.11","subscriptions":[{"topic":"*","type":"EVENT"},{"topic":"/v1.0/im/bot/messages/get","type":"CALLBACK"}],"ua":"dingtalk-sdk-java/1.0.2"}
步骤二:建立 WebSocket 连接
注册长连接信息成功后,客户端将获取长连的身份标识 ticket 和钉钉开放平台的地址信息,通过此信息客户端和钉钉服务端建立一条 WebSocket 连接,握手请求的路径和参数信息如下所示:
GET /connect?ticket=${ticket} HTTP/1.1
Host: wss-open-connection.dingtalk.com
Upgrade: websocket
以上示例中 Host、Path 信息仅用于示例展示,请用步骤一中返回的 endpoint 信息作为 Host 和 Path 构建 WebSocket 请求。
钉钉服务端收到 WebSocket 握手信息后会通过 ticket 校验客户端身份信息,校验成功后会返回正确的握手信息。
至此,已经完成了推送订阅通道建立,可以实时接收到订阅的消息列表。下一章节将介绍各种类型的消息,以及如何响应(用于通知钉钉服务端已经成功接收,请勿重复推送)。
Java
- 运行环境:JDK1.8 及以上。
- 安装 Java SDK:添加依赖项到工程的
pom.xml文件或下载对应的 jar 包,最新的 SDK 版本可以在这里 查看和下载。
<dependency>
<groupId>com.dingtalk.open</groupId>
<artifactId>dingtalk-stream</artifactId>
<version>{sdk-version}</version>
</dependency>
钉钉 Stream 流的构建
创建 IM 消息的监听
@Configuration
public class DingTalkStreamClientConfiguration {
@Value("${app.appKey}")
private String clientId;
@Value("${app.appSecret}")
private String clientSecret;
/**
* 配置 OpenDingTalkClient 客户端并配置初始化方法 (start)
*
* @param chatBotCallbackListener
* @param aiGraphPluginCallbackListener
* @return
* @throws Exception
*/
@Bean(initMethod = "start")
public OpenDingTalkClient configureStreamClient(
@Autowired ChatBotCallbackListener chatBotCallbackListener) throws Exception {
// init stream client
return OpenDingTalkStreamClientBuilder.custom()
// 配置应用的身份信息,企业内部应用分别为 appKey 和 appSecret,三方应用为 suiteKey 和 suiteSecret.
.credential(new AuthClientCredential(clientId, clientSecret))
// 注册机器人回调
.registerCallbackListener(DingTalkStreamTopics.BOT_MESSAGE_TOPIC, chatBotCallbackListener)
.build();
}
}
机器人消息回调
/**
* 机器人消息回调
*
* @author zeymo
*/
@Slf4j
@Component
public class ChatBotCallbackListener implements OpenDingTalkCallbackListener<ChatbotMessage, JSONObject> {
private RobotPrivateMessagesService robotPrivateMessagesService;
@Autowired
public ChatBotCallbackListener(RobotPrivateMessagesService robotPrivateMessagesService) {
this.robotPrivateMessagesService = robotPrivateMessagesService;
}
/**
* https://open.dingtalk.com/document/orgapp/the-application-robot-in-the-enterprise-sends-group-chat-messages
*
* @param message
* @return
*/
@Override
public JSONObject execute(ChatbotMessage message) {
try {
MessageContent text = message.getText();
if (text != null) {
String msg = text.getContent();
log.info("receive bot message from user={}, msg={}", message.getSenderId(), msg);
String openConversationId = message.getConversationId();
try {
// 发送机器人消息
robotPrivateMessagesService.send(openConversationId, "hello");
} catch (Exception e) {
log.error("send group message by robot error:" + e.getMessage(), e);
}
}
} catch (Exception e) {
log.error("receive group message by robot error:" + e.getMessage(), e);
}
return new JSONObject();
}
}
以上代码实现了这些能力:
- 通过命令行参数读取 Client ID 和 Client Secret 选项
- 通过 Client ID 和 Client Secret 创建 Stream Client
- 在 Stream Client 中注册机器人消息回调方法,实现消息接收能力
- 在消息回调方法中,简单 echo 机器人消息回去,实现消息发送 (回复) 能力
在 IDE 中运行 BotEchoMarkdownApplication.java 中 main 函数,当看到这样的日志输出时候表示运行成功 [DingTalk] connection is established, connectionId=...
配置 Stream 配送
前提条件
- 拥有所在钉钉组织开发者后台的 开发者权限。
- 拥有所在钉钉组织的 企业内部应用。
- 已经完成 开发 Stream 模式推送服务端(推荐) 流程。
操作步骤
- 登录 开发者后台,单击目标应用,进入应用详情页。
- 单击 开发配置 > 事件订阅,选择 Stream 模式推送。
- 服务端开发完成后,单击 已完成接入,验证连接通道。
- 单击 保存。保存完成后,事件订阅列表才会展示。
常见问题与解决方案
-
重新发送 原因是网络延迟不可控,如果因为互联网的正常抖动导致推送延迟,触发超时重新推送的话,就会出现重复事件。因此无论是否正确的 ACK 了,都需要考虑到收到重复事件的可能性。 备注:机器人消息当前是 fire-forgot 模式,不会因为网络超时而重复推送。
-
多个实例,监听同一个机器人。只会有一个消费。确保只能被服务器消费。 解决方案:使用 conditional 控制 IP 地址来阻止 Stream 流连接。
ConditionalOnIp.java
@Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented @Conditional(RobotContion.class) public @interface ConditionalOnIp { /** * 允许的 IP 地址列表,支持通配符和 CIDR 表示法 * 例如:"192.168.1.*", "10.0.0.0/24", "127.0.0.1" */ String[] allowed() default {}; /** * 禁止的 IP 地址列表 */ String[] denied() default {}; /** * 配置属性名称,从配置文件中读取 IP 列表 */ String value() default ""; /** * 当无法获取 IP 时是否匹配(默认 false) */ boolean matchIfMissing() default false; }RobotContion.java
public class RobotContion implements Condition { private static final String LOCAL_IP = getLocalIp(); @Override public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { Map<String, Object> annotationAttributes = metadata.getAnnotationAttributes(ConditionalOnIp.class.getName()); if (annotationAttributes == null) { return false; } String[] allowedIPs = (String[]) annotationAttributes.get("allowed"); String[] deniedIPs = (String[]) annotationAttributes.get("denied"); String configProperty = (String) annotationAttributes.get("value"); boolean matchIfMissing = (boolean) annotationAttributes.get("matchIfMissing"); if (StringUtils.hasText(configProperty)) { Environment env = context.getEnvironment(); String configValue = env.getProperty(configProperty); if (configValue != null) { allowedIPs = configValue.split(","); } else { return matchIfMissing; } } String currentIP = LOCAL_IP; if (currentIP == null) { return matchIfMissing; } // 检查禁止列表 if (deniedIPs.length > 0 && isIPInList(currentIP, deniedIPs)) { return false; } // 检查允许列表 if (allowedIPs.length > 0) { return isIPInList(currentIP, allowedIPs); } // 如果没有设置允许列表,且不在禁止 return true; } /** * 判断 IP 是否在列表中 */ private boolean isIPInList(String ip, String[] ipList) { for (String ipPattern : ipList) { if (matchesIP(ip, ipPattern.trim())) { return true; } } return false; } /** * IP 匹配逻辑,支持通配符和 CIDR */ private boolean matchesIP(String ip, String pattern) { // 精确匹配 if (ip.equals(pattern)) { return true; } // 通配符匹配:192.168.1.* if (pattern.contains("*")) { String regex = pattern.replace(".", "\\.").replace("*", ".*"); return Pattern.matches(regex, ip); } // CIDR 表示法匹配:192.168.1.0/24 if (pattern.contains("/")) { try { return isInCIDRRange(ip, pattern); } catch (Exception e) { return false; } } // IP 范围匹配:192.168.1.1-192.168.1.100 if (pattern.contains("-")) { try { return isInIPRange(ip, pattern); } catch (Exception e) { return false; } } return false; } /** * CIDR 范围匹配 */ private boolean isInCIDRRange(String ip, String cidr) { String[] parts = cidr.split("/"); String network = parts[0]; int prefixLength = Integer.parseInt(parts[1]); long ipLong = ipToLong(ip); long networkLong = ipToLong(network); long mask = (0xFFFFFFFFL) << (32 - prefixLength); return (ipLong & mask) == (networkLong & mask); } /** * IP 范围匹配 */ private boolean isInIPRange(String ip, String range) { String[] ips = range.split("-"); long start = ipToLong(ips[0].trim()); long end = ipToLong(ips[1].trim()); long ipLong = ipToLong(ip); return ipLong >= start && ipLong <= end; } /** * IP 转 long */ private long ipToLong(String ip) { String[] octets = ip.split("\\."); long result = 0; for (int i = 0; i < 4; i++) { result |= Long.parseLong(octets[i]) << (24 - (8 * i)); } return result; } private static String getLocalIp() { // 优先获取回环地址 try { Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces(); while (netInterfaces.hasMoreElements()) { NetworkInterface iface = netInterfaces.nextElement(); if (iface.isLoopback() || !iface.isUp()) { continue; } Enumeration<InetAddress> addresses = iface.getInetAddresses(); while (addresses.hasMoreElements()) { InetAddress addr = addresses.nextElement(); if (!addr.isLoopbackAddress() && iface.getDisplayName().indexOf("Virtual") == -1) { return addr.getHostAddress(); } } } // System.out.println(InetAddress.getLocalHost().getHostAddress()); // 如果没有找到,返回会回环地址 return InetAddress.getLocalHost().getHostAddress(); } catch (SocketException e) { return "127.0.0.1"; } catch (UnknownHostException e) { return "127.0.0.1"; } } }接下来就可以设置配置文件或者设置 allowed 属性来控制 IP 了。
-
响应消息处理是否支持负载均衡方式处理? 补充问题:启动多个程序订阅相同事件进行数据处理,担心将来推送数据量大时可能出现无法及时响应的情况;可能需要考虑方案提供给客户端侧进行参考 支持的。如果事件量较大的话,可以采用多进程,或者单进程下多 Stream Client 实例方式,建立多个 Stream 通道,也即多个 WebSocket 长连接。钉钉服务端每次推送消息时候,通过随机策略选取一个 Stream 通道推送。如果需要支持更多的负载均衡策略,可以通过技术支持提交反馈。
-
注册连接凭证中的 localIp 是否可以标记多个 IP? 补充问题:或增加自定义的客户端标识参数【可选】 可以支持多个 IP,采用英文逗号分隔。

