即时通讯系统核心模块实现

即时通讯系统核心模块实现

即时通讯系统核心模块实现:从消息传输到存储检索的全链路设计

在当今数字化时代,即时通讯(IM)系统已成为人们日常沟通、工作协作的基础设施。一个高性能、高可靠的 IM 系统需要妥善解决消息的实时传输、持久化存储、快速检索等核心问题。本文将基于一套实际生产环境的代码实现,详细解析 IM 系统中消息传输服务与存储检索服务的设计思路、技术选型与具体实现,带你深入理解 IM 系统的核心工作原理。

一、系统架构 overview:核心模块与技术栈

在展开具体实现前,我们先梳理这套 IM 系统的核心模块与技术选型。从代码来看,该系统采用微服务架构,将核心功能拆分为消息传输服务消息存储检索服务,通过标准化接口实现模块间通信。

1.1 核心业务流程

IM 系统的核心业务流程可概括为:

  1. 消息发送:用户发送消息后,由消息传输服务负责验证、封装并转发给目标用户
  2. 消息存储:传输服务将消息同步到消息队列,由存储服务消费并持久化到数据库
  3. 消息检索:用户查询历史消息或关键词搜索时,存储服务从数据库 / 搜索引擎中获取数据并返回

1.2 技术栈选型

为满足高并发、低延迟、可扩展的需求,系统选用了以下技术组件:

  • RPC 框架:brpc(百度开源的高性能 RPC 框架,支持高并发场景)
  • 关系型数据库:MySQL(存储消息元数据、用户会话关系等结构化数据)
  • 搜索引擎:Elasticsearch(简称 ES,用于消息全文检索,支持中文分词)
  • 消息队列:RabbitMQ(实现消息异步传输,解耦服务间依赖)
  • 服务治理:etcd(提供服务注册与发现,支持动态扩缩容)
  • 数据序列化:Protobuf(高效的结构化数据序列化协议)
  • 日志系统:自定义 logger(记录系统运行状态与错误信息)

1.3 模块交互关系

各模块通过 “服务注册 - 发现” 机制动态感知对方地址,通过 RPC 接口通信:

  • 消息传输服务依赖用户服务获取发送者信息,依赖 MySQL 获取会话成员列表
  • 消息存储服务依赖文件服务存储图片 / 语音等二进制数据,依赖用户服务获取发送者信息
  • 所有服务通过 etcd 注册自身地址,通过服务发现机制找到依赖服务的地址

二、数据存储层设计:ES 与 MySQL 的协同方案

消息数据的存储是 IM 系统的基础,需要同时满足 “可靠存储” 与 “高效检索” 的需求。系统采用 “MySQL+ES” 的混合存储方案:MySQL 存储消息完整元数据,ES 存储消息索引与文本内容用于检索。

2.1 Elasticsearch 封装:索引设计与操作抽象

ES 作为全文搜索引擎,其核心是索引设计查询语法。代码中通过ESIndexESInsertESRemoveESSearch等类对 ES 操作进行封装,简化上层调用。

2.1.1 ES 索引设计:字段类型与分词策略

索引是 ES 中数据的组织形式,类似 MySQL 的表结构。在 IM 系统中,需要为 “用户” 和 “消息” 分别创建索引,其中消息索引需要支持中文分词检索。

// 用户索引创建示例(ESUser类) bool createIndex() { bool ret = zrt::ESIndex(_es_client, "user") .append("user_id", "keyword", "standard", true) // 用户ID:精确匹配,不分词 .append("nickname") // 昵称:文本类型,默认ik分词 .append("phone", "keyword", "standard", true) // 手机号:精确匹配 .append("description", "text", "standard", false) // 描述:文本类型 .append("avatar_id", "keyword", "standard", false) // 头像ID:精确匹配 .create(); // ... 日志与返回处理 } 

索引设计的核心考量:

  • 字段类型:区分 “keyword”(精确匹配,如 ID、手机号)与 “text”(全文检索,如昵称、消息内容)
  • 分词器:中文场景使用ik_max_word(最大化分词),英文 / 数字使用standard
  • 动态映射:开启dynamic: true,允许新增字段自动映射类型(灵活应对业务扩展)
2.1.2 ES 操作封装:CRUD 接口抽象

为简化 ES 操作,代码通过链式调用封装了索引创建、数据插入、删除、查询等操作:

// 数据插入封装(ESInsert类) template<typename T> ESInsert &append(const std::string &key, const T &val){ _item[key] = val; // 用Json::Value暂存数据 return *this; } bool insert(const std::string) { std::string body; Serialize(_item, body); // 序列化为JSON try { auto rsp = _client->index(_name, _type, id, body); // 调用ES客户端API // 状态码检查与异常处理 } } 

查询操作通过ESSearch类实现,支持组合条件查询(must/should/must_not):

// 消息搜索示例(ESMessage类) std::vector<zrt::Message> search(const std::string &key, const std::string &ssid) { Json::Value json_user = ESSearch(_es_client, "message") .append_must_term("chat_session_id.keyword", ssid) // 必须匹配会话ID .append_must_match("content", key) // 必须匹配关键词 .search(); // 结果解析与转换 } 

这种封装的优势在于:

  • 隐藏 ES 底层 API 细节,上层无需关注 JSON 构建与网络请求
  • 链式调用简化多条件查询的组合,代码可读性更高
  • 统一异常处理与日志记录,减少重复代码

2.2 MySQL 消息存储:结构化数据的可靠持久化

MySQL 主要存储消息的完整元数据(如发送者 ID、会话 ID、时间戳、文件信息等),支持按会话、时间范围的高效查询。代码中通过MessageTable类封装 MySQL 操作(基于 odb ORM 框架)。

// 消息存储示例(MessageServiceImpl类) ret = _mysql_message->insert(msg); // 插入消息到MySQL // 历史消息查询(基于时间范围) auto msg_lists = _mysql_message->range(chat_ssid, stime, etime); // 最近消息查询(按时间倒序取前N条) auto msg_lists = _mysql_message->recent(chat_ssid, msg_count); 

MySQL 表设计的核心字段(对应Message类):

  • message_id:主键,消息唯一标识
  • session_id:会话 ID,用于关联会话
  • user_id:发送者 ID,关联用户信息
  • message_type:消息类型(文本 / 图片 / 文件 / 语音)
  • create_time:发送时间,用于排序与范围查询
  • file_id/file_name/file_size:文件相关元数据(非文本消息)

2.3 存储协同策略:MySQL 与 ES 的分工

为什么需要同时使用 MySQL 和 ES?两者的分工如下:

  • MySQL:存储完整消息数据,支持按会话、时间的精确查询,保证数据可靠性(ACID 特性)
  • ES:存储消息文本内容与索引,支持全文检索、关键词高亮等高级查询,提供毫秒级响应

数据同步流程:

  1. 消息到达后,先写入 MySQL 保证数据不丢失
  2. 仅文本消息同步到 ES(非文本消息无需检索)
  3. 删除消息时,同时删除 MySQL 记录与 ES 文档

三、消息传输服务:从发送到转发的全流程

消息传输服务(TransmiteService)是 IM 系统的 “交通枢纽”,负责接收用户发送的消息、封装消息元数据、获取目标用户列表并转发,同时将消息同步到消息队列供存储服务处理。

3.1 RPC 接口设计:定义消息传输契约

使用 Protobuf 定义 RPC 接口,明确服务端与客户端的交互规范。消息传输服务的核心接口是GetTransmitTarget,用于获取消息转发目标:

// transmit.proto service MsgTransmitService { rpc GetTransmitTarget(NewMessageReq) returns (GetTransmitTargetRsp); } message NewMessageReq { string request_id = 1; // 请求唯一标识 string user_id = 2; // 发送者ID string chat_session_id = 3; // 会话ID MessageContent message = 4; // 消息内容 } message GetTransmitTargetRsp { string request_id = 1; bool success = 2; string errmsg = 3; MessageInfo message = 4; // 封装后的消息 repeated string target_id_list = 5; // 目标用户列表 } 

接口设计的核心原则:

  • 包含request_id用于链路追踪与日志关联
  • 明确successerrmsg字段,便于错误处理
  • 消息内容与元数据分离,MessageInfo包含发送者信息、时间戳等元数据

3.2 消息封装流程:从原始内容到完整消息

用户发送的原始消息(如文本 “你好”)需要封装为包含元数据的MessageInfo,流程如下:

// TransmiteServiceImpl::GetTransmitTarget MessageInfo message; message.set_message_id(uuid()); // 生成唯一消息ID message.set_chat_session_id(chat_ssid); // 关联会话 message.set_timestamp(time(nullptr)); // 记录发送时间 message.mutable_sender()->CopyFrom(rsp.user_info()); // 填充发送者信息(从用户服务获取) message.mutable_message()->CopyFrom(content); // 填充消息内容 

消息 ID 生成采用 UUID,保证全局唯一;发送者信息通过调用用户服务的GetUserInfo接口获取,包含用户昵称、头像等展示所需数据。

3.3 目标用户获取:从会话成员表查询

IM 系统中,消息需要转发给会话中的所有成员(除发送者外)。目标用户列表从 MySQL 的chat_session_member表获取:

// 获取会话成员列表 auto target_list = _mysql_session_member_table->members(chat_ssid); 

ChatSessionMemeberTable封装了会话成员的查询逻辑,返回该会话的所有用户 ID。实际应用中,还需过滤掉发送者自身 ID,避免消息回传。

3.4 消息队列集成:异步存储解耦

为避免消息传输过程被存储操作阻塞,系统通过 RabbitMQ 实现异步存储:传输服务将消息发布到队列,存储服务消费队列消息并持久化。

// 发布消息到RabbitMQ bool ret = _mq_client->publish(_exchange_name, message.SerializeAsString(), _routing_key); 

消息队列的作用:

  • 解耦:传输服务与存储服务无需直接通信,通过队列间接交互
  • 削峰:高并发场景下,队列缓冲消息,避免存储服务被瞬时流量压垮
  • 可靠投递:通过 RabbitMQ 的持久化机制,保证消息不会因服务宕机丢失

四、消息存储检索服务:从持久化到高效查询

消息存储检索服务(MessageService)负责消息的持久化存储与查询,提供历史消息查询、近期消息查询、关键词搜索等核心功能,是 IM 系统中数据访问的入口。

4.1 多类型消息处理:文本、图片、文件与语音

IM 系统支持多种消息类型,不同类型的消息存储方式不同:

  • 文本消息:内容直接存储到 MySQL 与 ES
  • 图片 / 语音 / 文件:二进制数据存储到文件服务,MySQL 仅存储文件 ID 等元数据
// 消息存储处理(MessageServiceImpl::onMessage) switch(message.message().message_type()) { case MessageType::STRING: // 文本消息:直接存储内容 content = message.message().string_message().content(); _es_message->appendData(...); // 同步到ES break; case MessageType::IMAGE: // 图片消息:上传文件到文件服务,存储文件ID ret = _PutFile("", msg.image_content(), ..., file_id); break; // 其他类型消息处理类似 } // 统一存储元数据到MySQL zrt::Message msg(...); msg.file_id(file_id); // 关联文件ID _mysql_message->insert(msg); 

文件上传通过调用文件服务的PutSingleFile接口实现,返回的file_id用于后续下载文件时查询。

4.2 历史消息查询:时间范围与分页

用户查询历史消息时,需支持按会话 ID、时间范围筛选,并分页返回结果。实现流程如下:

// GetHistoryMsg接口实现 // 1. 从MySQL查询时间范围内的消息元数据 auto msg_lists = _mysql_message->range(chat_ssid, stime, etime); // 2. 批量获取消息中的文件数据(如图片、语音) std::unordered_set<std::string> file_id_lists; for (const auto &msg : msg_lists) { if (!msg.file_id().empty()) file_id_lists.insert(msg.file_id()); } std::unordered_map<std::string, std::string> file_data_lists; _GetFile(rid, file_id_lists, file_data_lists); // 调用文件服务批量下载 // 3. 批量获取发送者用户信息 std::unordered_set<std::string> user_id_lists; for (const auto &msg : msg_lists) { user_id_lists.insert(msg.user_id()); } std::unordered_map<std::string, UserInfo> user_lists; _GetUser(rid, user_id_lists, user_lists); // 调用用户服务批量获取 // 4. 组装响应数据 for (const auto &msg : msg_lists) { auto message_info = response->add_msg_list(); // 填充消息元数据、用户信息、文件数据 } 

优化点:

  • 批量查询替代循环单查,减少 RPC 调用次数
  • 只下载当前页需要的文件数据,避免大量无效 IO
  • 用户信息与文件数据缓存,减少重复查询

4.3 关键词搜索:基于 ES 的全文检索

消息搜索是 IM 系统的高频需求,要求支持关键词匹配、会话内搜索等功能。基于 ES 的实现流程如下:

// MsgSearch接口实现 // 1. 调用ES搜索会话内包含关键词的消息 auto msg_lists = _es_message->search(skey, chat_ssid); // 2. 获取发送者用户信息(同历史消息查询) std::unordered_set<std::string> user_id_lists; for (const auto &msg : msg_lists) { user_id_lists.insert(msg.user_id()); } std::unordered_map<std::string, UserInfo> user_lists; _GetUser(rid, user_id_lists, user_lists); // 3. 组装响应 for (const auto &msg : msg_lists) { auto message_info = response->add_msg_list(); // 填充消息与用户信息 } 

ES 查询条件构建(ESSearch类):

// 会话内关键词搜索条件 ESSearch(_es_client, "message") .append_must_term("chat_session_id.keyword", ssid) // 限定会话 .append_must_match("content", key) // 匹配关键词 

中文搜索优化:

  • 使用 IK 分词器对消息内容分词(如 “即时通讯” 分为 “即时”、“通讯”)
  • 支持同义词扩展(如 “消息” 与 “信息” 视为等同)
  • 按匹配度排序,优先返回相关度高的结果

五、服务治理:注册、发现与连接管理

在微服务架构中,服务的动态注册与发现是保证系统弹性的核心。系统基于 etcd 实现服务治理,通过ServiceManager管理服务连接。

5.1 服务注册:向 etcd 登记服务地址

服务启动时,将自身地址(IP: 端口)注册到 etcd,便于其他服务发现:

// TransmiteServerBuilder::make_registry_object _registry_client = std::make_shared<Registry>(reg_host); _registry_client->registry(service_name, access_host); // 注册服务名与地址 

注册信息格式:/services/{service_name}/{instance_id} = {access_host},其中instance_id为服务实例唯一标识(如 UUID)。

5.2 服务发现:监听 etcd 节点变化

服务启动时,通过 etcd 监听依赖服务的地址变化,动态更新可用节点列表:

// MessageServerBuilder::make_discovery_object auto put_cb = std::bind(&ServiceManager::onServiceOnline, _mm_channels.get(), ...); auto del_cb = std::bind(&ServiceManager::onServiceOffline, _mm_channels.get(), ...); _service_discoverer = std::make_shared<Discovery>(reg_host, base_service_name, put_cb, del_cb); 

ServiceManager维护服务节点的在线状态:

  • 当 etcd 新增节点(服务上线),调用onServiceOnline添加节点
  • 当 etcd 删除节点(服务下线),调用onServiceOffline移除节点

5.3 连接管理:负载均衡与故障转移

ServiceManagerchoose方法用于从可用节点中选择一个进行 RPC 调用,实现负载均衡:

// 选择用户服务节点 auto channel = _mm_channels->choose(_user_service_name); if (!channel) { LOG_ERROR("没有可用的用户服务节点"); return false; } UserService_Stub stub(channel.get()); // 创建RPC客户端 

负载均衡策略:

  • 简单轮询:依次选择节点,适合节点性能相近的场景
  • 故障转移:检测到节点不可用时,自动切换到其他节点
  • 连接池:复用 TCP 连接,减少握手开销

六、代码设计亮点:可复用与可扩展的实现

这套代码在设计上体现了多个优秀的编程实践,值得借鉴学习。

6.1 Builder 模式:复杂对象的分步构建

服务器的初始化涉及多个组件(MySQL、ES、MQ、RPC 等),参数多且依赖复杂。代码通过TransmiteServerBuilderMessageServerBuilder实现分步构建:

// 消息存储服务构建示例 MessageServerBuilder builder; // 1. 构建ES客户端 builder.make_es_object({"http://es-node1:9200", "http://es-node2:9200"}); // 2. 构建MySQL客户端 builder.make_mysql_object("user", "pass", "mysql-host", "im_db", "utf8", 3306, 10); // 3. 构建服务发现 builder.make_discovery_object("etcd-host:2379", "/services", "file-service", "user-service"); // 4. 构建消息队列 builder.make_mq_object("rabbit", "pass", "mq-host", "msg-exchange", "store-queue", "store-key"); // 5. 构建RPC服务器 builder.make_rpc_server(8002, 60, 16); // 6. 生成服务器实例 auto server = builder.build(); server->start(); 

Builder 模式的优势:

  • 隐藏对象构建的复杂细节,提供清晰的步骤
  • 支持灵活配置,可根据环境(开发 / 测试 / 生产)调整参数
  • 便于扩展新组件,无需修改核心逻辑

6.2 接口封装:隔离底层依赖

代码对第三方库(如 elasticlient、brpc、RabbitMQ 客户端)进行了封装,上层业务逻辑不直接依赖具体库的 API:

  • ESIndex/ESInsert封装 ES 操作,更换 ES 客户端时只需修改封装层
  • MQClient封装 RabbitMQ 的连接、发布、消费操作,屏蔽 AMQP 协议细节
  • ServiceManager封装服务发现与连接管理,更换服务治理组件(如从 etcd 到 Consul)时影响范围小

这种封装降低了代码耦合度,提高了可维护性。

6.3 异常处理与日志:问题排查的关键

系统通过统一的异常处理与日志记录,确保问题可追溯:

// 异常处理示例 try { auto rsp = _client->index(_name, _type, index_id, body); if (rsp.status_code < 200 || rsp.status_code >= 300) { LOG_ERROR("创建ES索引失败,状态码: {}", rsp.status_code); return false; } } catch(std::exception &e) { LOG_ERROR("创建ES索引失败: {}", e.what()); return false; } 

日志设计原则:

  • 包含request_id,便于追踪单次请求的全链路
  • 区分DEBUG/INFO/ERROR等级别,生产环境可调整输出级别
  • 关键操作(如消息存储、服务调用)必须记录日志,包含输入输出参数

七、优化与扩展:从可用到高性能

这套代码实现了 IM 系统的核心功能,但在高并发、大数据量场景下仍有优化空间。

7.1 性能优化方向

  1. 缓存热点数据
  • 用户信息、会话列表等高频访问数据缓存到 Redis,减少 MySQL 查询
  • 近期消息缓存,减少 ES 与 MySQL 访问
  1. 异步化处理
  • 非核心流程(如消息已读状态更新)异步处理,降低主流程延迟
  • 使用线程池处理批量操作(如批量消息推送)
  1. 索引优化
  • ES 索引按时间分片(如按天),避免单索引过大
  • MySQL 添加合适的索引(如session_id+create_time联合索引)
  1. 读写分离
  • MySQL 主从分离,写操作走主库,读操作走从库
  • ES 配置副本,查询请求分发到副本节点

7.2 功能扩展建议

  1. 消息已读状态
  • 增加read字段标记消息已读状态
  • 实现 “已读回执” 功能,同步用户阅读状态
  1. 消息撤回
  • 支持指定时间内(如 5 分钟)撤回消息
  • 撤回时同时删除 MySQL 与 ES 中的数据,并通知接收方
  1. 离线消息
  • 为离线用户缓存消息,上线后批量推送
  • 实现消息同步机制,保证多端数据一致
  1. 搜索增强
  • 支持按发送者、时间范围过滤搜索结果
  • 实现关键词高亮、模糊搜索等高级功能

八、总结

本文基于实际代码实现,详细解析了 IM 系统中消息传输与存储检索服务的设计与实现。从技术选型来看,“MySQL+ES” 的混合存储方案平衡了可靠性与检索效率,“RPC + 消息队列” 的通信方式实现了服务解耦与异步处理,“etcd+ServiceManager” 的服务治理保证了系统的弹性与可扩展性。

代码中体现的 Builder 模式、接口封装、异常处理等设计思想,不仅保证了当前功能的实现,也为后续扩展奠定了基础。在实际应用中,还需根据业务规模与性能需求,进一步优化缓存策略、索引设计与异步处理机制,才能构建出高性能、高可靠的 IM 系统。

IM 系统的核心价值在于 “高效沟通”,而支撑这一价值的,正是这些隐藏在代码中的技术细节与架构设计。希望本文能为你理解 IM 系统的工作原理,或设计自己的通讯系统提供有益的参考。

Read more

【DeepSeek应用】100个 DeepSeek 官方推荐的工具箱

【DeepSeek应用】100个 DeepSeek 官方推荐的工具箱

【DeepSeek应用】Deepseek R1 本地部署(Ollama+Docker+OpenWebUI) 【DeepSeek应用】DeepSeek 搭建个人知识库(Ollama+CherryStudio) 【DeepSeek应用】100个 DeepSeek 官方推荐的工具箱 【DeepSeek应用】Zotero+Deepseek 阅读与分析文献 【DeepSeek应用】100个 DeepSeek 官方推荐的工具箱 * 1. DeepSeek 工具箱:应用程序 * 2. DeepSeek 工具箱:AI Agent 框架 * 3. DeepSeek 工具箱:RAG 框架 * 4. DeepSeek 工具箱:即时通讯软件 * 5. DeepSeek 工具箱:浏览器插件 * 6. DeepSeek 工具箱:

By Ne0inhk
“现在的AI就像1880年的笨重工厂!”微软CSO斯坦福泼冷水:别急着造神

“现在的AI就像1880年的笨重工厂!”微软CSO斯坦福泼冷水:别急着造神

大模型仍未对上商业的齿轮? 编译 | 王启隆 来源 | youtu.be/aWqfH0aSGKI 出品丨AI 科技大本营(ID:rgznai100) 现在的硅谷,空气里都飘着一股“再不上车就晚了”的焦躁感。 最近 OpenClaw 风头正旺,强势登顶 GitHub,终结了 React 神话,许多人更是觉得“AI 自己干活赚钱”的日子就在明天了。 特别是在斯坦福商学院(GSB)这种地方,台下坐着的都是成天琢磨怎么用下一个技术风口搞个独角兽出来的狠人。 微软的首席科学官(CSO)Eric Horvitz 被请到了这个几乎全美最想用 AI 变现的礼堂里。作为从上世纪 80 年代就开始搞 AI 的绝对老炮、也是微软技术底座的“扫地僧”,这位老哥并没有顺着台下的胃口,去吹捧下个月大模型又要颠覆什么行业,而是兜头给大家浇了一盆带点学术味的冷水。 他讲了一个挺有画面感的比喻:大家都在聊

By Ne0inhk
Godot被AI代码“围攻”!维护者崩溃发声:“不知道还能坚持多久”

Godot被AI代码“围攻”!维护者崩溃发声:“不知道还能坚持多久”

整理 | 郑丽媛 出品 | ZEEKLOG(ID:ZEEKLOGnews) 当大模型能在几秒钟内生成一段“看起来像那么回事”的补丁时,开源社区却开始付出另一种代价。 最近,开源游戏引擎 Godot 的核心维护团队公开吐槽:他们正被大量“AI 生成的低质量代码”淹没。那些代码往往结构完整、注释齐全、描述洋洋洒洒,但真正的问题是——提交者可能并不理解自己交上来的内容。 这件事,并不是简单的“有人偷懒用 AI 写代码”。它正在触及开源协作最核心的东西:信任。 一场悄无声息的“AI 洪水” 事情的导火索来自一条 Bluesky 讨论帖。 Godot 主要维护者之一、同时也是 Godot 商业支持公司 W4 Games 联合创始人的 Rémi Verschelde 表示,所谓的“AI slop”

By Ne0inhk
48小时“烧光”56万!三人创业团队濒临破产,仅因Gemini API密钥被盗:“AI账单远超我们的银行余额”

48小时“烧光”56万!三人创业团队濒临破产,仅因Gemini API密钥被盗:“AI账单远超我们的银行余额”

整理 | 苏宓 出品 | ZEEKLOG(ID:ZEEKLOGnews) 「仅过了 48 小时,一笔 8.2 万美元的天价费用凭空出现,较这家小型初创公司的正常月费暴涨近 46000%。」 这不是假设的虚幻故事,而是一家墨西哥初创公司正在经历的真实危机。 近日,一位名为 RatonVaquero 的开发者在 Reddit 发帖求助称,由于他的 Gemini API 密钥被盗用,原本每月仅约 180 美元(约 1242 元)的费用,在短短 48 小时内暴涨到 82,314.44 美元(约 56.8 万元)。对于这家只有三名开发者的小型创业团队来说,这笔突如其来的账单,几乎等同于灭顶之灾。 “我现在整个人都处在震惊和恐慌之中。”RatonVaquero

By Ne0inhk