跳到主要内容
即时通讯系统核心模块:从传输到存储的全链路设计 | 极客日志
C++ SaaS WeChat 算法
即时通讯系统核心模块:从传输到存储的全链路设计 即时通讯系统需解决消息实时传输、持久化存储及快速检索问题。本文结合生产级代码,解析基于 brpc、MySQL、Elasticsearch 及 RabbitMQ 的架构方案。重点阐述 MySQL 与 ES 混合存储策略、消息传输服务封装流程、以及基于 etcd 的服务治理机制。通过 Builder 模式简化初始化,利用消息队列解耦存储,保障高并发下的可靠性与扩展性。涵盖文本、图片等多类型消息处理,并提供缓存优化与功能扩展建议,为构建高性能 IM 系统提供参考。
RustyLab 发布于 2026/3/23 0 浏览即时通讯系统核心模块:从传输到存储的全链路设计
即时通讯(IM)系统如今已是日常沟通与工作协作的基石。一个高性能、高可靠的 IM 系统需要妥善解决消息的实时传输、持久化存储、快速检索等核心问题。本文结合一套生产级代码,拆解 IM 系统中消息传输与存储检索的核心逻辑。
系统架构概览:核心模块与技术栈
在展开具体实现前,我们先梳理这套 IM 系统的核心模块与技术选型。从代码来看,该系统采用微服务架构,将核心功能拆分为消息传输服务 与消息存储检索服务 ,通过标准化接口实现模块间通信。
核心业务流程
IM 系统的核心业务流程可概括为:
消息发送 :用户发送消息后,由消息传输服务负责验证、封装并转发给目标用户
消息存储 :传输服务将消息同步到消息队列,由存储服务消费并持久化到数据库
消息检索 :用户查询历史消息或关键词搜索时,存储服务从数据库 / 搜索引擎中获取数据并返回
技术栈选型
为满足高并发、低延迟、可扩展的需求,系统选用了以下技术组件:
RPC 框架 :brpc(百度开源的高性能 RPC 框架,支持高并发场景)
关系型数据库 :MySQL(存储消息元数据、用户会话关系等结构化数据)
搜索引擎 :Elasticsearch(简称 ES,用于消息全文检索,支持中文分词)
消息队列 :RabbitMQ(实现消息异步传输,解耦服务间依赖)
服务治理 :etcd(提供服务注册与发现,支持动态扩缩容)
数据序列化 :Protobuf(高效的结构化数据序列化协议)
日志系统 :自定义 logger(记录系统运行状态与错误信息)
模块交互关系
各模块通过'服务注册 - 发现'机制动态感知对方地址,通过 RPC 接口通信:
消息传输服务依赖用户服务获取发送者信息,依赖 MySQL 获取会话成员列表
消息存储服务依赖文件服务存储图片 / 语音等二进制数据,依赖用户服务获取发送者信息
所有服务通过 etcd 注册自身地址,通过服务发现机制找到依赖服务的地址
数据存储层设计:ES 与 MySQL 的协同方案
消息数据的存储是 IM 系统的基础,需要同时满足'可靠存储'与'高效检索'的需求。系统采用'MySQL+ES'的混合存储方案:MySQL 存储消息完整元数据,ES 存储消息索引与文本内容用于检索。
Elasticsearch 封装:索引设计与操作抽象
ES 作为全文搜索引擎,其核心是索引设计 与查询语法 。代码中通过 ESIndex、ESInsert、ESRemove、ESSearch 等类对 ES 操作进行封装,简化上层调用。
ES 索引设计:字段类型与分词策略
索引是 ES 中数据的组织形式,类似 MySQL 的表结构。在 IM 系统中,需要为'用户'和'消息'分别创建索引,其中消息索引需要支持中文分词检索。
bool createIndex () {
bool ret = zrt::ESIndex (_es_client, )
. ( , , , )
. ( )
. ( , , , )
. ( , , , )
. ( , , , )
. ();
}
"user"
append
"user_id"
"keyword"
"standard"
true
append
"nickname"
append
"phone"
"keyword"
"standard"
true
append
"description"
"text"
"standard"
false
append
"avatar_id"
"keyword"
"standard"
false
create
字段类型 :区分'keyword'(精确匹配,如 ID、手机号)与'text'(全文检索,如昵称、消息内容)
分词器 :中文场景使用 ik_max_word(最大化分词),英文 / 数字使用 standard
动态映射 :开启 dynamic: true,允许新增字段自动映射类型(灵活应对业务扩展)
ES 操作封装:CRUD 接口抽象 为简化 ES 操作,代码通过链式调用封装了索引创建、数据插入、删除、查询等操作:
template <typename T>
ESInsert &append (const std::string &key, const T &val) {
_item[key] = val;
return *this ;
}
bool insert (const std::string) {
std::string body;
Serialize (_item, body);
try {
auto rsp = _client->index (_name, _type, id, body);
}
}
查询操作通过 ESSearch 类实现,支持组合条件查询(must/should/must_not):
std::vector<zrt::Message> search (const std::string &key, const std::string &ssid) {
Json::Value json_user = ESSearch (_es_client, "message" )
.append_must_term ("chat_session_id.keyword" , ssid)
.append_must_match ("content" , key)
.search ();
}
隐藏 ES 底层 API 细节,上层无需关注 JSON 构建与网络请求
链式调用简化多条件查询的组合,代码可读性更高
统一异常处理与日志记录,减少重复代码
MySQL 消息存储:结构化数据的可靠持久化 MySQL 主要存储消息的完整元数据(如发送者 ID、会话 ID、时间戳、文件信息等),支持按会话、时间范围的高效查询。代码中通过 MessageTable 类封装 MySQL 操作(基于 odb ORM 框架)。
ret = _mysql_message->insert (msg);
auto msg_lists = _mysql_message->range (chat_ssid, stime, etime);
auto msg_lists = _mysql_message->recent (chat_ssid, msg_count);
MySQL 表设计的核心字段(对应 Message 类):
message_id:主键,消息唯一标识
session_id:会话 ID,用于关联会话
user_id:发送者 ID,关联用户信息
message_type:消息类型(文本 / 图片 / 文件 / 语音)
create_time:发送时间,用于排序与范围查询
file_id/file_name/file_size:文件相关元数据(非文本消息)
存储协同策略:MySQL 与 ES 的分工 为什么需要同时使用 MySQL 和 ES?两者的分工如下:
MySQL :存储完整消息数据,支持按会话、时间的精确查询,保证数据可靠性(ACID 特性)
ES :存储消息文本内容与索引,支持全文检索、关键词高亮等高级查询,提供毫秒级响应
消息到达后,先写入 MySQL 保证数据不丢失
仅文本消息同步到 ES(非文本消息无需检索)
删除消息时,同时删除 MySQL 记录与 ES 文档
消息传输服务:从发送到转发的全流程 消息传输服务(TransmiteService)是 IM 系统的'交通枢纽',负责接收用户发送的消息、封装消息元数据、获取目标用户列表并转发,同时将消息同步到消息队列供存储服务处理。
RPC 接口设计:定义消息传输契约 使用 Protobuf 定义 RPC 接口,明确服务端与客户端的交互规范。消息传输服务的核心接口是 GetTransmitTarget,用于获取消息转发目标:
// transmit.proto
service MsgTransmitService {
rpc GetTransmitTarget(NewMessageReq) returns (GetTransmitTargetRsp);
}
message NewMessageReq {
string request_id = 1; // 请求唯一标识
string user_id = 2; // 发送者 ID
string chat_session_id = 3; // 会话 ID
MessageContent message = 4; // 消息内容
}
message GetTransmitTargetRsp {
string request_id = 1;
bool success = 2;
string errmsg = 3;
MessageInfo message = 4; // 封装后的消息
repeated string target_id_list = 5; // 目标用户列表
}
包含 request_id 用于链路追踪与日志关联
明确 success 与 errmsg 字段,便于错误处理
消息内容与元数据分离,MessageInfo 包含发送者信息、时间戳等元数据
消息封装流程:从原始内容到完整消息 用户发送的原始消息(如文本'你好')需要封装为包含元数据的 MessageInfo,流程如下:
MessageInfo message;
message.set_message_id (uuid ());
message.set_chat_session_id (chat_ssid);
message.set_timestamp (time (nullptr ));
message.mutable_sender ()->CopyFrom (rsp.user_info ());
message.mutable_message ()->CopyFrom (content);
消息 ID 生成采用 UUID,保证全局唯一;发送者信息通过调用用户服务的 GetUserInfo 接口获取,包含用户昵称、头像等展示所需数据。
目标用户获取:从会话成员表查询 IM 系统中,消息需要转发给会话中的所有成员(除发送者外)。目标用户列表从 MySQL 的 chat_session_member 表获取:
auto target_list = _mysql_session_member_table->members (chat_ssid);
ChatSessionMemeberTable 封装了会话成员的查询逻辑,返回该会话的所有用户 ID。实际应用中,还需过滤掉发送者自身 ID,避免消息回传。
消息队列集成:异步存储解耦 为避免消息传输过程被存储操作阻塞,系统通过 RabbitMQ 实现异步存储:传输服务将消息发布到队列,存储服务消费队列消息并持久化。
bool ret = _mq_client->publish (_exchange_name, message.SerializeAsString (), _routing_key);
解耦 :传输服务与存储服务无需直接通信,通过队列间接交互
削峰 :高并发场景下,队列缓冲消息,避免存储服务被瞬时流量压垮
可靠投递 :通过 RabbitMQ 的持久化机制,保证消息不会因服务宕机丢失
消息存储检索服务:从持久化到高效查询 消息存储检索服务(MessageService)负责消息的持久化存储与查询,提供历史消息查询、近期消息查询、关键词搜索等核心功能,是 IM 系统中数据访问的入口。
多类型消息处理:文本、图片、文件与语音 IM 系统支持多种消息类型,不同类型的消息存储方式不同:
文本消息 :内容直接存储到 MySQL 与 ES
图片 / 语音 / 文件 :二进制数据存储到文件服务,MySQL 仅存储文件 ID 等元数据
switch (message.message ().message_type ()) {
case MessageType::STRING:
content = message.message ().string_message ().content ();
_es_message->appendData (...);
break ;
case MessageType::IMAGE:
ret = _PutFile("" , msg.image_content (), ..., file_id);
break ;
}
zrt::Message msg (...) ;
msg.file_id (file_id);
_mysql_message->insert (msg);
文件上传通过调用文件服务的 PutSingleFile 接口实现,返回的 file_id 用于后续下载文件时查询。
历史消息查询:时间范围与分页 用户查询历史消息时,需支持按会话 ID、时间范围筛选,并分页返回结果。实现流程如下:
auto msg_lists = _mysql_message->range (chat_ssid, stime, etime);
std::unordered_set<std::string> file_id_lists;
for (const auto &msg : msg_lists) {
if (!msg.file_id ().empty ()) file_id_lists.insert (msg.file_id ());
}
std::unordered_map<std::string, std::string> file_data_lists;
_GetFile(rid, file_id_lists, file_data_lists);
std::unordered_set<std::string> user_id_lists;
for (const auto &msg : msg_lists) {
user_id_lists.insert (msg.user_id ());
}
std::unordered_map<std::string, UserInfo> user_lists;
_GetUser(rid, user_id_lists, user_lists);
for (const auto &msg : msg_lists) {
auto message_info = response->add_msg_list ();
}
批量查询替代循环单查,减少 RPC 调用次数
只下载当前页需要的文件数据,避免大量无效 IO
用户信息与文件数据缓存,减少重复查询
关键词搜索:基于 ES 的全文检索 消息搜索是 IM 系统的高频需求,要求支持关键词匹配、会话内搜索等功能。基于 ES 的实现流程如下:
auto msg_lists = _es_message->search (skey, chat_ssid);
std::unordered_set<std::string> user_id_lists;
for (const auto &msg : msg_lists) {
user_id_lists.insert (msg.user_id ());
}
std::unordered_map<std::string, UserInfo> user_lists;
_GetUser(rid, user_id_lists, user_lists);
for (const auto &msg : msg_lists) {
auto message_info = response->add_msg_list ();
}
ESSearch (_es_client, "message" )
.append_must_term ("chat_session_id.keyword" , ssid)
.append_must_match ("content" , key);
使用 IK 分词器对消息内容分词(如'即时通讯'分为'即时'、'通讯')
支持同义词扩展(如'消息'与'信息'视为等同)
按匹配度排序,优先返回相关度高的结果
服务治理:注册、发现与连接管理 在微服务架构中,服务的动态注册与发现是保证系统弹性的核心。系统基于 etcd 实现服务治理,通过 ServiceManager 管理服务连接。
服务注册:向 etcd 登记服务地址 服务启动时,将自身地址(IP: 端口)注册到 etcd,便于其他服务发现:
_registry_client = std::make_shared <Registry>(reg_host);
_registry_client->registry (service_name, access_host);
注册信息格式:/services/{service_name}/{instance_id} = {access_host},其中 instance_id 为服务实例唯一标识(如 UUID)。
服务发现:监听 etcd 节点变化 服务启动时,通过 etcd 监听依赖服务的地址变化,动态更新可用节点列表:
auto put_cb = std::bind (&ServiceManager::onServiceOnline, _mm_channels.get (), ...);
auto del_cb = std::bind (&ServiceManager::onServiceOffline, _mm_channels.get (), ...);
_service_discoverer = std::make_shared <Discovery>(reg_host, base_service_name, put_cb, del_cb);
ServiceManager 维护服务节点的在线状态:
当 etcd 新增节点(服务上线),调用 onServiceOnline 添加节点
当 etcd 删除节点(服务下线),调用 onServiceOffline 移除节点
连接管理:负载均衡与故障转移 ServiceManager 的 choose 方法用于从可用节点中选择一个进行 RPC 调用,实现负载均衡:
auto channel = _mm_channels->choose (_user_service_name);
if (!channel) {
LOG_ERROR ("没有可用的用户服务节点" );
return false ;
}
UserService_Stub stub (channel.get()) ;
简单轮询:依次选择节点,适合节点性能相近的场景
故障转移:检测到节点不可用时,自动切换到其他节点
连接池:复用 TCP 连接,减少握手开销
代码设计亮点:可复用与可扩展的实现 这套代码在设计上体现了多个优秀的编程实践,值得借鉴学习。
Builder 模式:复杂对象的分步构建 服务器的初始化涉及多个组件(MySQL、ES、MQ、RPC 等),参数多且依赖复杂。代码通过 TransmiteServerBuilder 与 MessageServerBuilder 实现分步构建:
MessageServerBuilder builder;
builder.make_es_object ({"http://es-node1:9200" , "http://es-node2:9200" });
builder.make_mysql_object ("user" , "pass" , "mysql-host" , "im_db" , "utf8" , 3306 , 10 );
builder.make_discovery_object ("etcd-host:2379" , "/services" , "file-service" , "user-service" );
builder.make_mq_object ("rabbit" , "pass" , "mq-host" , "msg-exchange" , "store-queue" , "store-key" );
builder.make_rpc_server (8002 , 60 , 16 );
auto server = builder.build ();
server->start ();
隐藏对象构建的复杂细节,提供清晰的步骤
支持灵活配置,可根据环境(开发 / 测试 / 生产)调整参数
便于扩展新组件,无需修改核心逻辑
接口封装:隔离底层依赖 代码对第三方库(如 elasticlient、brpc、RabbitMQ 客户端)进行了封装,上层业务逻辑不直接依赖具体库的 API:
ESIndex/ESInsert 封装 ES 操作,更换 ES 客户端时只需修改封装层
MQClient 封装 RabbitMQ 的连接、发布、消费操作,屏蔽 AMQP 协议细节
ServiceManager 封装服务发现与连接管理,更换服务治理组件(如从 etcd 到 Consul)时影响范围小
异常处理与日志:问题排查的关键 系统通过统一的异常处理与日志记录,确保问题可追溯:
try {
auto rsp = _client->index (_name, _type, index_id, body);
if (rsp.status_code < 200 || rsp.status_code >= 300 ) {
LOG_ERROR ("创建 ES 索引失败,状态码:{}" , rsp.status_code);
return false ;
}
} catch (std::exception &e) {
LOG_ERROR ("创建 ES 索引失败:{}" , e.what ());
return false ;
}
包含 request_id,便于追踪单次请求的全链路
区分 DEBUG/INFO/ERROR 等级别,生产环境可调整输出级别
关键操作(如消息存储、服务调用)必须记录日志,包含输入输出参数
优化与扩展:从可用到高性能 这套代码实现了 IM 系统的核心功能,但在高并发、大数据量场景下仍有优化空间。
性能优化方向
缓存热点数据 :
用户信息、会话列表等高频访问数据缓存到 Redis,减少 MySQL 查询
近期消息缓存,减少 ES 与 MySQL 访问
异步化处理 :
非核心流程(如消息已读状态更新)异步处理,降低主流程延迟
使用线程池处理批量操作(如批量消息推送)
索引优化 :
ES 索引按时间分片(如按天),避免单索引过大
MySQL 添加合适的索引(如 session_id+create_time 联合索引)
读写分离 :
MySQL 主从分离,写操作走主库,读操作走从库
ES 配置副本,查询请求分发到副本节点
功能扩展建议
消息已读状态 :
增加 read 字段标记消息已读状态
实现'已读回执'功能,同步用户阅读状态
消息撤回 :
支持指定时间内(如 5 分钟)撤回消息
撤回时同时删除 MySQL 与 ES 中的数据,并通知接收方
离线消息 :
为离线用户缓存消息,上线后批量推送
实现消息同步机制,保证多端数据一致
搜索增强 :
支持按发送者、时间范围过滤搜索结果
实现关键词高亮、模糊搜索等高级功能
总结 本文结合生产级代码,详细解析了 IM 系统中消息传输与存储检索服务的设计与实现。从技术选型来看,'MySQL+ES'的混合存储方案平衡了可靠性与检索效率,'RPC + 消息队列'的通信方式实现了服务解耦与异步处理,'etcd+ServiceManager'的服务治理保证了系统的弹性与可扩展性。
代码中体现的 Builder 模式、接口封装、异常处理等设计思想,不仅保证了当前功能的实现,也为后续扩展奠定了基础。在实际应用中,还需根据业务规模与性能需求,进一步优化缓存策略、索引设计与异步处理机制,才能构建出高性能、高可靠的 IM 系统。
IM 系统的核心价值在于'高效沟通',而支撑这一价值的,正是这些隐藏在代码中的技术细节与架构设计。希望本文能为你理解 IM 系统的工作原理,或设计自己的通讯系统提供有益的参考。
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
JSON 压缩 通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online