【C++/Linux实战项目】从零手撸 Json-RPC 框架,吃透 RPC 核心原理与落地实现
目录
5.4.1 rpc_router(服务端):RPC 请求处理与服务管理
5.4.2 Requestor(客户端):请求与响应的映射管理
5.4.3 rpc_caller(客户端):RPC 调用上层接口封装
前言
- RPC 技术的核心价值与应用场景:分布式系统中远程调用的必要性,Json-RPC 轻量、跨语言的优势
- 本文核心目标:基于 C++ 实现轻量 Json-RPC 框架,掌握 RPC 底层原理、网络通信、序列化、服务注册发现等核心技术
- 框架最终能力:支持同步 / 异步 / 回调 RPC 调用、服务注册与发现、主题发布订阅,基于 Muduo+JsonCpp 实现
- 项目Gitee源码链接:Gitee项目源码

内存泄露测试——Valgrind
为了确保代码上线后不会因为内存泄露导致崩溃,我这次用 Valgrind 对项目做了一次最严格的体检。
我没有只跑单连接测试,而是针对性地设计了两个 “极限场景”:
1. 100 连接并发测试启动 100 个客户端同时连接服务端,模拟高并发环境。结果:Valgrind 全程绿灯,服务端和客户端运行稳定,日志显示无内存泄露。
2. 百次重连压力测试让客户端反复断开重连 100 次。这个场景最容易暴露资源未释放的问题(如句柄泄露)。结果:内存占用曲线平稳,Valgrind 报告 registry_leak.log、rpc_server_leak.log 等文件均无异常。
特别是 服务注册发现模块,经过多次测试确认,无论是注册还是下线流程,内存均能精准回收。
结论:通过 Valgrind 检测,证明当前项目内存管理逻辑健壮,不存在内存泄露风险。
具体如图:
registry_leak.log | rpc_client_leak.log
|
rpc_client_long_leak.log
| rpc_server_leak.log![]() |
rpc_server_long_leak.log![]() | topic_leak.log![]() |
Valgrind 检测结果补充:非业务层内存残留说明
其中在rpc_server_leak.log和rpc_server_long_leak.log中有一处内存泄露是因为我手动ctrl+c退出后muduo库中的连接异常关闭时连接资源的存留导致的,是 muduo 库在-强制中断-场景下的非致命残留,不影响生产环境(生产环境不会手动 Ctrl+C 退出),不影响程序结束后会自动释放不属于工程中的内存泄露。
一、RPC 与 Json-RPC 核心认知
1.1 RPC 是什么?远程过程调用的核心思想
- 本地调用 vs 远程调用的本质区别,RPC “像调用本地方法一样调用远程方法” 的核心目标
- RPC 通信的核心流程:客户端请求、网络传输、服务端处理、结果返回
- 一个完整 RPC 框架的核心组成:序列化、通信协议、服务注册 / 发现、负载均衡等
1.2 Json-RPC 框架定位
- Json-RPC 的特性:基于 JSON 序列化,轻量、易解析、跨语言,无需复杂 IDL 代码生成
- 本次实现的 Json-RPC 框架核心功能:同步 / 异步 /future RPC 调用、服务注册与发现、服务上下线通知、主题发布订阅
二、技术选型与开发环境搭建
2.1 核心技术选型思路与理由
- 网络通信:Muduo 库(Reactor 模型、高并发、异步 IO,降低网络开发成本)
- 序列化 / 反序列化:JsonCpp(轻量、易上手,契合 Json-RPC 的 JSON 数据格式)
- 远程调用实现方案:基于函数名 + 参数映射,避开 IDL 代码生成的复杂度
- C++ 特性:C++11 异步操作(std::future/std::promise/std::packaged_task)实现异步调用
- 粘包解决:LV 格式应用层协议,保证消息的完整解析
2.2 跨系统开发环境搭建
- CentOS 7.6 环境搭建:软件源更换、gcc/g++ 高版本安装、JsonCpp/Muduo/CMake 等依赖部署
- Ubuntu 22.04 环境搭建:国内源配置、核心依赖安装、Muduo 库编译与部署
三、核心依赖库核心用法拆解
3.1 JsonCpp:JSON 序列化与反序列化
- JSON 数据格式核心:对象、数组、基本数据类型的表示
- JsonCpp 核心类:Json::Value(数据存储)、StreamWriterBuilder(序列化)、CharReaderBuilder(反序列化)
- 实战:基础序列化 / 反序列化示例,封装通用 Json 工具类(serialize/unserialize)
3.2 Muduo 库:C++ 高并发网络编程核心
- Muduo 库核心设计:Reactor 模型、one loop per thread 线程模型
- 核心类使用:TcpServer/TcpClient(服务端 / 客户端)、EventLoop(事件循环)、TcpConnection(连接管理)、Buffer(数据缓冲区)
- 实战:基于 Muduo 实现简单英译汉 TCP 服务端 / 客户端,吃透 Muduo 核心回调机制
四、Json-RPC 框架整体设计思路
4.1 框架核心设计目标
- 高拓展性:采用抽象层 + 具象层 + 业务层三层设计,支持底层组件的插拔替换,符合开闭原则。
- 高调用性:封装上层接口实现业务层,开发者无需关注网络细节,直接实现业务逻辑。
- 分布式能力:支持服务的发现和注册、服务上线下线通知,结合主题的发布订阅,实现分布式RPC调用。
- 三种调用方式:支持,同步、异步、回调三种RPC调用方式
4.2 三层架构设计
- 抽象层:封装了项目框架的实现接口,网络通信、协议、消息、客户端、服务器等抽象接口的封装。
- 具象层:项目框架的选型,具体使用哪种网络通信模块是muduo还是Libevent或者是Boost.Asio,以及协议上是选LV还是HTTP还是其他协议。继承抽象层完成接口的具体封装实现。
- 业务层:基于抽象层 + 具象层实现RPC调用、服务注册发现、主题发布订阅等核心业务逻辑
4.3 模块划分
- 服务端核心模块:Common、rpc_router、rpc_registry、rpc_topic、rpc_server
- 客户端核心模块:Common、Requestor、rpc_caller、rpc_registry、rpc_topic、rpc_client
- 公共模块Common:Abstract.hpp、Detail.hpp、Dispatcher.hpp、Fields.hpp、Message.hpp、Net.hpp
- 核心调度器 Dispatcher:基于消息类型映射回调函数,实现消息的分发处理
- 核心消息处理器 Requestor:基于UUID类型映射请求与响应,实现请求和响应的一一对应。
五、框架核心模块分步实现
5.1 基础工具类实现
- 日志宏封装:实现 LDBG/LINF/LERR 分级日志,快速定位问题
- UUID 生成工具:生成全局唯一消息 ID,解决请求与响应的映射问题
- 通用常量定义:消息类型、响应码、操作类型等宏定义,统一框架常量
5.2 通信层核心实现
- 应用层 LV 协议实现:解决 TCP 粘包问题,定义 “长度 + 消息类型 + ID 长度 + ID + 正文” 的消息格式
- Muduo 库封装:基于 Muduo 实现 BaseServer/BaseClient 的具象化(MuduoServer/MuduoClient)
- 连接管理:封装 MuduoConnection,统一连接的发送、关闭、状态判断接口
5.3 消息层实现
- 基础消息类:继承 BaseMessage 实现 JsonMessage,封装 JSON 序列化 / 反序列化
- 业务消息类:实现 RpcRequest/RpcResponse、TopicRequest/TopicResponse、ServiceRequest/ServiceResponse,针对不同业务做参数校验
- 消息工厂:基于消息类型动态创建消息对象,解耦消息创建与业务逻辑
5.4 核心业务模块实现
5.4.1 rpc_router(服务端):RPC 请求处理与服务管理
核心类:ServiceDescribe、DecribeFactory、ServiceManager、RpcRouter
ServiceDescribe:方法描述类——业务回调函数、方法名称、参数字段及格式描述、参数校验接口。
SDecribeFactory:工厂类,用于对设置远程调用接口的工厂、便于接口的正确设置,在build函数中会对参数进行校验。
ServiceManager:服务端会提供很多方法服务需要进行良好的管理。
RpcRouter:真正的RPC远程调用,保存了一个ServerManager类,设置onRpcRequest函数设置到epoll的就绪回调函数中。才能真正的跑起来。
具体细节代码如下:
// #include "../commom/net.hpp" #include "/home/hxw/112/112_linux/demo/source/common/net.hpp" #include "/home/hxw/112/112_linux/demo/source/common/message.hpp" // #include "../commom/message.hpp" namespace bitrpc{ namespace server{ enum class VType{ BOOL=0, INTEGRAL, NUMERIC, STREING, ARRAY, OBJECT, }; //方法描述类——业务回调函数、方法名称、参数字段及格式描述、参数校验接口 class ServiceDescribe{ public: using ptr = std::shared_ptr<ServiceDescribe>; //请求和响应 using ServiceCallback = std::function<void(const Json::Value& , Json::Value &)>; //名称+类型 using ParamsDescribe = std::pair<std::string,VType>; //引用折叠+move(右值构造)——节省空间+时间效率更高 ServiceDescribe(std::string && mname,std::vector<ParamsDescribe>&& desc,VType vtype, ServiceCallback&& handler): _method_name(std::move(mname)), _callback(std::move(handler)), _params_desc(std::move(desc)), _return_type(vtype) {} const std::string &method(){return _method_name;} //针对收到的请求中的参数进行校验 bool paramCheck(const Json::Value & params){ //对params进行参数校验---判断所描述的参数字段是否存在,类型是否一致。 for(auto& desc: _params_desc){ //isMember()——检查first是否在——名称是否正确 if(params.isMember(desc.first)==false){ ELOG("参数字段完整性校验失败!%s 字段缺失!",desc.first.c_str()); return false; } //这里为什么是params[desc.first]——解答:类似于map中的key-value,实际上就是类型 if(check(desc.second,params[desc.first])==false){ ELOG("%s 参数类型校验失败!",desc.first.c_str()); return false; } } return true; } bool call(const Json::Value ¶ms,Json::Value& result){ _callback(params,result); //result是结果,然后看结果也就是返回值是否正确 Json::FastWriter writer; DLOG("结果是: %s", writer.write(result).c_str()); if(retypeCheck(result)==false){ ELOG("回调处理函数中的响应信息校验失败!"); return false; } return true; } private: bool retypeCheck(const Json::Value &val){ return check(_return_type,val); } bool check(VType vtype,const Json::Value &val){ switch(vtype){ case VType::BOOL :return val.isBool(); case VType::INTEGRAL :return val.isIntegral(); case VType::NUMERIC :return val.isNumeric(); case VType::STREING :return val.isString(); case VType::ARRAY :return val.isArray(); case VType::OBJECT :return val.isObject(); } return false; } private: std::string _method_name;//方法名称 //实际的业务回调函数中,第一个参数是请求,然后先给第一个参数做校验,接着看是否校验成功,最后整合成响应 ServiceCallback _callback;//实际的业务回调函数 std::vector<ParamsDescribe> _params_desc;//参数字段格式的描述 VType _return_type;//对返回值类型的描述 }; class SDecribeFactory{ public: void setMethodName(const std::string & name){ _method_name=name; } void setReturnType(VType vtype){ _return_type=vtype; } void setParamsDesc(const std::string &pname,VType vtype){ _params_desc.push_back(ServiceDescribe::ParamsDescribe(pname,vtype)); } void setCallback(const ServiceDescribe::ServiceCallback &cb){ _callback=cb; } ServiceDescribe::ptr build(){ return std::make_shared<ServiceDescribe>(std::move(_method_name),std::move(_params_desc),_return_type,std::move(_callback)); } private: std::string _method_name; ServiceDescribe::ServiceCallback _callback;//实际的业务回调函数 std::vector<ServiceDescribe::ParamsDescribe> _params_desc;//参数字段格式的描述 VType _return_type;//对返回值类型的描述 }; //服务端会提供很多方法服务需要进行良好的管理。///直接写到RpcRouter中不好这样直接和业务写在一起了 class ServiceManager{ public: using ptr = std::shared_ptr<ServiceManager>; //对方法管理类的增删查——只要涉及到CURD就会考虑到锁的安全问题 void insert(const ServiceDescribe::ptr& desc){ std::unique_lock<std::mutex> _lock(_mutex); _services.insert(std::make_pair(desc->method(),desc)); } ServiceDescribe::ptr select(const std::string& method_name){ std::unique_lock<std::mutex> _lock(_mutex); auto it=_services.find(method_name); if(_services.end() == it){ return ServiceDescribe::ptr(); } return it->second; } void remove(const std::string& method_name){ std::unique_lock<std::mutex> _lock(_mutex); _services.erase(method_name); } private: std::mutex _mutex; //通过hash_map就能很容易的判断能否提供某个服务。 //方法名——方法 std::unordered_map<std::string,ServiceDescribe::ptr> _services; }; class RpcRouter{ public: using ptr = std::shared_ptr<RpcRouter>; RpcRouter():_service_manager(std::make_shared<ServiceManager>()){} //这是注册到dispatcher模块针对rpc请求进行的回调处理的业务函数 //处理逻辑——收到一个rpc请求后,去取出方法名称,参数信息,通过方法名称找到方法服务的描述对象,先进行参数校验,判断没有问题则调用回调函数进行处理。 void onRpcRequest(const BaseConnection::ptr &conn,RpcRequest::ptr &request){ //1.查询客户群请求的方法描述--判断当前服务端能否提供对应的服务 auto service = _service_manager->select(request->method()); if(nullptr == service.get()){ ELOG("%s 服务未找到!",request->method().c_str()); return response(conn,request,Json::Value(),Rcode::RECODE_NOT_FOUND_SERVICE); } //2.进行参数校验,确定能否提供服务 if(service->paramCheck(request->params())==false){ ELOG("%s 服务参数校验失败!",request->method().c_str()); return response(conn,request,Json::Value(),Rcode::RECODE_INVALID_PARAMS); } //3.调用业务回调接口进行业务处理 Json::Value rsp; bool ret=service->call(request->params(),rsp); if(false == ret){ ELOG("%s 服务参数校验失败!",request->method().c_str()); return response(conn,request,Json::Value(),Rcode::RECODE_INTERNAL_ERROR); } //4.处理完毕得到结果,组织响应,向客户端发送结果。 return response(conn,request,rsp,Rcode::RECODE_OK); } //这里我们是不知道参数个数的作为设计者,所以有两种方法一是给一个工厂让用户构造好描述信息后传进来(也就是下面的方法),方法二是用可变参数来做,虽然有些复杂但是用户的工作量确实变少了。 void registerMethod(const ServiceDescribe::ptr &service){ return _service_manager->insert(service); } private: void response(const BaseConnection::ptr &conn,const RpcRequest::ptr &req,const Json::Value & rsp,Rcode rcode){ auto msg = MessageFactory::create<RpcResponse>(); msg->setId(req->rid()); msg->setMType(bitrpc::MType::RSP_RPC); msg->setRcode(rcode); msg->setResult(rsp); conn->send(msg); } private: ServiceManager::ptr _service_manager; }; } } 5.4.2 Requestor(客户端):请求与响应的映射管理
消息的真正管理器,每一个客户端都需要包含Requestor来确保消息的正确到达和发布。通过UUID的hash映射使用_request_desc对象将请求和响应一一对应起来,确保客户端发出的请求和服务器的响应一一对应,保证了多线程中的数据有序到达。
std::unordered_map<std::string,RequestorDescribe::ptr> _request_desc;
核心数据结构:
struct RequestorDescribe{ using ptr = std::shared_ptr<RequestorDescribe>; //请求的消息 BaseMessage::ptr request; //请求类型 RType rtype; //异步请求 std::promise<BaseMessage::ptr> response; //请求回调 RequestCallback callback; };通过这个RequestorDescribe来实现消息的同步处理、异步处理、回到处理。
通过设置回调函数onResponse来自动处理响应消息。
void onResponse(const BaseConnection::ptr &conn ,BaseMessage::ptr &msg){ std::string rid = msg->rid(); RequestorDescribe::ptr rdp= getDescribe(rid); if(rdp.get() == nullptr){ ELOG("收到响应- %s ,但是未找到对应的请求描述!",rid.c_str()); return ; } if(rdp->rtype == RType::REQ_ASYNC){ rdp->response.set_value(msg);//异步的结果返回得到后需要调用set_value接口去设置结果,那样客户端就接受到了结果返回值了。 }else if(rdp->rtype == RType::REQ_CALLBACK){ if(rdp->callback){ rdp->callback(msg); } } else{//一般不会出现这种情况。 ELOG("收到了一个请求类型未知的响应!"); } delDescribe(rid);//要把已经处理完毕的响应删除。否则会内存泄漏 }因为Requestor主管消息的处理,所以我直接把send的三种处理同步、异步、回调直接封装到这个类中
异步send使用c++11特性中的promise和future来实现,同步send其实就是异步的send只是我在调用异步send后直接get()等待了,最后就是回调send把回调函数封装到了_request_desc。
5.4.3 rpc_caller(客户端):RPC 调用上层接口封装
- 封装同步 / 异步 / 回调三种 RPC 调用接口,屏蔽底层网络、协议、序列化细节
- 结果解析:将服务端返回的 RpcResponse 解析为 JSON 数据,返回给开发者
这个rpc_caller类就是对于rpc调用的客户端而言的一个接口类具体就是对rpc使用者流程而言的,内部的成员变量只有一个Requestor变量。外部使用者直接调用这个call接口就行了。同理call也有同步、异步、回调三种类型。
具体代码如下:
#pragma once #include"requestor.hpp" namespace bitrpc{ namespace client{ class RpcCaller{ public: using ptr = std::shared_ptr<RpcCaller>; using JsonAsyncResponse = std::future<Json::Value>; using JsonResponseCallback = std::function<void(const Json::Value&)>; RpcCaller(const Requestor::ptr &requestor):_requestor(requestor){} //requestor中的send里边的回调是针对BaseMessage进行处理的 //用于在rpccaller中针对结果的处理事针对RpcResonce里边的result进行的 //同步call bool call(const BaseConnection::ptr &conn,const std::string &method,const Json::Value ¶ms,Json::Value &result){ //1.组织请求 auto req_msg = MessageFactory::create<RpcRequest>(); req_msg->setId(UUID::uuid()); req_msg->setMType(MType::REQ_RPC); req_msg->setMethod(method); req_msg->setParams(params); BaseMessage::ptr rsp_msg; //2.发送请求 bool ret=_requestor->send(conn,std::dynamic_pointer_cast<BaseMessage>(req_msg),rsp_msg);//重载时是严格根据类型去重载,所以是不会有多态的,也就是不会有隐士类型转换的 if(false == ret){ ELOG("同步Rpc请求失败!"); return false; } ELOG("收到响应,进行解析,获取结果!"); //3.等待响应 auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponce>(rsp_msg); if(!rpc_rsp_msg){ ELOG("rpc响应,向下类型转换失败!"); return false; } if(rpc_rsp_msg->rcode() != Rcode::RECODE_OK){ ELOG("rpc同步请求出错:%s",errReason(rpc_rsp_msg->rcode())); return false; } result = rpc_rsp_msg->result(); ELOG("结果设置完毕!"); return true; } //异步call bool call(const BaseConnection::ptr &conn,const std::string &method,const Json::Value ¶ms,JsonAsyncResponse &result){ //向服务器发送异步回调请求,设置回调函数,回调函数中会传入一个promise对象,在回调函数中去堆promise设置数据 auto req_msg = MessageFactory::create<RpcRequest>(); req_msg->setId(UUID::uuid()); req_msg->setMType(MType::REQ_RPC); MType(MType::REQ_RPC); req_msg->setMethod(method); req_msg->setParams(params); auto json_promise = std::make_shared<std::promise<Json::Value>>(); result = json_promise->get_future(); Requestor::RequestCallback cb = std::bind(&RpcCaller::Callback,this,json_promise,std::placeholders::_1) //2.发送请求 bool ret=_requestor->send(conn,std::dynamic_pointer_cast<BaseMessage>(req_msg),rsp_msg,cb);//重载时是严格根据类型去重载,所以是不会有多态的,也就是不会有隐士类型转换的 if(false == ret){ ELOG("异步Rpc请求失败!"); return false; } return true; } //回调call bool call(const BaseConnection::ptr &conn,const std::string &method,const Json::Value ¶ms,const JsonResponseCallback &cb){ auto req_msg = MessageFactory::create<RpcRequest>(); req_msg->setId(UUID::uuid()); req_msg->setMType(MType::REQ_RPC); req_msg->setMethod(method); req_msg->setParams(params); Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1,this,cb,std::placeholders::_1); bool ret=_requestor->send(conn,std::dynamic_pointer_cast<BaseMessage>(req_msg),rsp_msg,req_cb); if(false == ret){ ELOG("回调Rpc请求失败!"); return false; } return true; } private: void Callback1(const JsonResponseCallback & cb, const BaseMessage::ptr &msg){ auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponce>(rsp_msg); if(!rpc_rsp_msg){ ELOG("rpc响应,向下类型转换失败!"); return false; } if(rpc_rsp_msg->rcode() != Rcode::RECODE_OK){ ELOG("rpc回调请求出错:%s",errReason(rpc_rsp_msg->rcode())); return false; } cb(rpc_rsp_msg->reslut()); } void Callback(std::shared_ptr<std::promise<Json::Value>> result,const BaseMessage::ptr &msg){ auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponce>(rsp_msg); if(!rpc_rsp_msg){ ELOG("rpc响应,向下类型转换失败!"); return false; } if(rpc_rsp_msg->rcode() != Rcode::RECODE_OK){ ELOG("rpc异步请求出错:%s",errReason(rpc_rsp_msg->rcode())); return false; } result->set_value(rpc_rsp_msg->result()); } private: Requestor::ptr _requestor; }; } }5.4.4 rpc_registry:服务注册与发现
- 服务端:ProviderManager(管理服务提供者)、DiscovererManager(管理服务发现者)、PDManager(两者的总和),实现服务注册、发现、上下线通知
- 客户端:RegistryClient(服务注册)、DiscoveryClient(服务发现),维护服务提供者地址,支持服务上下线动态感知
- 分布式能力:基于注册中心,实现多个服务提供者的负载均衡(简单轮询)
这里的使用流程是这样的,在RPC的调用者这里我只需要启动RPC客户端以及rpc_registry客户端中的Discover类就行了,在服务提供者这里也需要启动这个rpc_registry客户端,只不过只需要打开这里面的Provider类就行了,把自己提供的服务和自己的主机ip和端口号发给服务端的管理者也就是PDManager类,然后Discover客户端只需要调用服务发现接口,发送给服务端,服务端的PDManager接受到后,直接把自己管理的请求方法和Provider管理集,中请求方法对应的Provider管理集发给客户端就行了。这里客户端中假设发现了多个主机提供方法那么就采取RR轮询的方式去选择调用。
具体服务端代码:
#pragma once #include"../commom/net.hpp" #include"../commom/message.hpp" #include<set> namespace bitrpc{ namespace server{ class ProviderManager{ public: using ptr = std::shared_ptr<ProviderManager>; struct Provider{ using ptr = std::shared_ptr<Provider>; std::mutex _mutex; BaseConnection::ptr conn; Address host; std::vector<std::string> methods; Provider(const BaseConnection:ptr &c,cosnt Address & h): conn(c),host(h){} void appendMethod(const std::string &method){ std::unique_lock<std::mutex> lock(_mutex); method.emplace_back(method); } }; //当一个新的服务提供者进行服务注册的时候调用 Provider::ptr addProvider(const BaseConnection:ptr &c,cosnt Address & h,const std::string &method){ Provider::ptr provider; //查找连接所关联的服务提供者对象,找到则获取,找不到则创建,并建立关联 { std::unique_lock<std::mutex> lock(_mutex); auto it = _conns.find(c); if(it !=_conns.end()){ provider = it->second; }else{ provider = std::make_shared<Provider>(c,h); _conn.insert(std::make_pair(c,provider)) } //method方法的提供主机要多出一个,_provider新增数据 auto &providers = _providers[method]; providers.insert(provider); }//这里加域限定是因为下面的新增是不需要加锁的这里加锁不括起来会影响下面新增 //向服务对象中新增一个所能提供的服务名称 provider->appendMethod(method); } //当一个服务提供者断开连接的时候,获取他的信息--用于进行服务下线通知 Provider::ptr getProvider(const BaseConnection:ptr &c){ std::unique_lock<std::mutex> lock(_mutex); auto it = _conns.find(c); if(it !=_conns.end()){ return it->second; } return Provider::ptr(); } //当一个服务提供者断开连接的时候,删除它的关联信息 void delProvider(const BaseConnection::ptr &c){ std::unique_lock<std::mutex> lock(_mutex); auto it = _conns.find(c); if(it ==_conns.end()){ //当前断开连接的不是一个服务提供者 return; } //如果是提供者,看看提供了是吗服务,从服务者提供信息中删除当前服务提供者 for(auto& method: if->second->methods){ auto &providers = _providers[method]; providers.erase(it->second); } //删除链接域服务提供者的关联关系 _conn.erase(it); } std::vector<Address> methodHosts(const std::string &method){ std::unique_lock<std::mutex> lock(_mutex); auto it = _providers.find(method); if(it == _providers.end()){ return std::vector<Address>(); } std::vector<Address> result; for(auto & provider:it->second){ result.push_back(provider->host); } return result; } private: std::mutex _mutex; std::unordered_map<std::string,std::set<Provider::ptr>> _providers; std::unordered_map<BaseConnection::ptr , Provider::ptr> _conns; }; class DiscoverManager{ public: using ptr = std::shared_ptr<DiscoverManager>; struct Discover{ using ptr = std::shared_ptr<Discover>; std::mutex _mutex; BaseConnection::ptr conn;//发现者关联的客户端连接 std::vector<std::string> methods;//发现过的服务名称 Discover(const BaseConnection::ptr &c):conn(c){} void appendMethod(const std::string & method){ std::unique_lock<std::mutex> lock(_mutex); methods.push_back(method); } }; //当每次客户端进行服务发现的时候新增发现者,新增服务名称 Discover::ptr addDiscover(const BaseConnection::ptr &c,const std::string &method){ Discover::ptr discover; { std::unique_lock<std::mutex> lock(_mutex); auto it =_conns.find(c); if(it != _conns.end()){ //找到了 discover = it->second; }else{ discover = std::make_shared<Discover>(c); _conn.insert(std::make_pair(c,discover)); } auto & discovers = _discovers[method]; discovers.insert(discover); } discover->appendMethod(method); return discover; } //发现客户端断开连接时,找到发现者信息,删除关联数据 void delDiscover(const BaseConnection::ptr &c){ std::unique_lock<std::mutex> lock(_mutex); auto it =_conns.find(c); if(it == _conns.end()){ //当前没有找到连接对应发现者信息,代表客户端不是一个服务发现者 return; } for(auto& method : it->second->methods){ auto discovers = _discovers[method]; discovers.erase(it->second); } _conn.erase(it); } //当有一个新的服务提供者上线,则进行上线通知 void onlineNotify(const std::string &metho,const Address& host){ return notify(method,host,ServiceOptype::SERVICE_ONLINE); } //当有一个服务提供者断开连接,则进行下线通知 void offlineNotify(const std::string &method,const Address& host){ return notify(method,host,ServiceOptype::SERVICE_OFFLINE); } private: void notify(const std::string &metho,const Address& host,ServiceOptype optype){ std::unique_lock<std::mutex> lock(_mutex); auto it = _discovers.find(mehtod); if(it == _discovers.end()){ //代表这个服务没有发现者 return; } auto msg_req = MessgaeFactory::create<ServiceRequest>(); msg_req->setId(UUID::uuid()); msg_req->setMType(MType::REQ_SERVICE); msg_req->setMethod(method); msg_req->setHost(host); msg_req->setOptype(optype); for(auto &discover : it->second){ discover->conn->send(); } } private: std::mutex _mutex; std::unordered_map<std::string,std::set<Discover>> _discovers; std::unordered_map<BaseConnection::ptr , Discover::ptr> _conns; }; class PDManager{ public: using ptr = std::shared_ptr<PDManager>; PDManager(): _providers(std::make_shared<ProviderManager>()), _discovers(std::make_shared<DiscoverManager>()) {} //下面是设置到Dispatched模块里的 void onServiceRequest(const BaseConnection::ptr & conn, const ServiceRequest::ptr &msg){ //服务操作请求:服务注册/服务发现/ Ser optype = msg->optype(); if(optype == ServiceOptype::SERVICE_REGISTRY){ //服务注册: // 1.新增服务提供者; 2.进行服务上线的通知; ILOG("%s:%d 注册服务 %s",msg->host().first.c_str(),msg->host().second,msg->method().c_str()); _providers->addProvider(conn,msg->host(),msg->mehtod()); _discovers->onlineNotify(msg->method(msg->host(),msg->mehtod())); return registryResponse(conn,msg); }else if(optype == ServiceOptype::SERVICE_DISCOVERY){ //服务发现: // 1.新增服务发现者 ILOG("客户端要进行%s 服务发现!",msg->method().c_str()); _discovers->addDiscover(conn,msg->method()); return discoveryResponse(conn,msg); }else{ ELOG("收到服务操作请求,但是操作类型错误!"); return errorResponse(conn,msg); } } void onConnShutdown(const BaseConnection::ptr &conn){ auto provider = _providers->getProvider(conn); if(provider.get() != nullprt){ ILOG("%s:%d 服务下线 %s",provider->host().first.c_str(),provider->host().second); for(auto & method : provider->methods){ _discovers->offlineNotify(method,provider->host()); } _providers->delProvider(conn); } _discovers->delDiscover(conn); } private: void errorResponse(const BaseConnection::ptr &conn,const ServiceRequest::ptr &msg){ auto msg_rsp = MessgaeFactory::create<ServiceResponse>(); msg_rsp->setId(msg->rid()); msg_rsp->setMType(MType::RSP_SERVICE); msg_rsp->setRCode(Rcode::RECODE_INVALID_OPTYPE); msg_rsp->setOptype(ServiceOptype::SERVICE_UNKNOW); conn->send(msg_rsp); } void registryResponse(const BaseConnection::ptr &conn,const ServiceRequest::ptr &msg){ auto msg_rsp = MessgaeFactory::create<ServiceResponse>(); msg_rsp->setId(msg->rid()); msg_rsp->setMType(MType::RSP_SERVICE); msg_rsp->setRCode(Rcode::RECODE_OK); msg_rsp->setOptype(ServiceOptype::SERVICE_REGISTRY); conn->send(msg_rsp); } void discoveryResponse(const BaseConnection::ptr &conn,const ServiceRequest::ptr &msg){ auto msg_rsp = MessgaeFactory::create<ServiceResponse>(); msg_rsp->setId(msg->rid()); msg_rsp->setMType(MType::RSP_SERVICE); msg_rsp->setOptype(ServiceOptype::SERVICE_DISCOVERY); std::vector<Address> hosts = _providers->methodHosts(msg->method()); if(hosts.empty()){ msg_rsp->setRCode(Rcode::RECODE_NOT_FOUND_SERVICE); return conn->send(msg_rsp); } msg_rsp->setRCode(Rcode::RECODE_OK); msg_rsp->setMethod(msg->method()); msg_rsp->setHost(hosts); return conn->send(msg_rsp); } private: ProviderManager::ptr _providers; DiscoverManager::ptr _discovers; }; } }具体客户端代码:
#pragma once #include"requestor.hpp" #include<unordered_set> namespace bitrpc{ namespace client{ class Provider{ public: using ptr = std:: shared_ptr<Provider>; Provider(const Requestor::ptr &requestor):_requestor(requestor){} bool registryMethod(const BaseConnection::ptr &conn,const std::string &method,const Address &host){ auto msg_req = MessgaeFactory::create<ServiceRequest>(); msg_req->setId(UUID::uuid()); msg_req->setMType(MType::REQ_SERVICE); msg_req->setMethod(method); msg_req->setHost(host); msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY); BaseMessage::ptr msg_rsq; bool ret = _requestor->send(conn,msg_req,msg_rsq); if(ret == false){ ELOG("%s 服务注册失败!",method.c_str()); return false; } auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp); if(service_rsp.get() == nullptr){ ELOG("响应类型向下转换失败!"); return false; } if(service_rsp->rcode() != Rcode::RECODE_OK){ ELOG("服务注册失败,原因:%s" , errReason(service_rsp->rcode())); return false; } return true; } private: Requestor::ptr _requestor; }; class MethodHost{ public: using ptr = std::shared_ptr<methodHost>(); MethodHost():_idx(0){} MethodHost(const std::vecotr<Address> & hosts):_hosts(hosts.begin(),hosts.end()),_idx(0){} void appendHost(const Address& host){ //中途收到了服务上线请求后被调用 std::unique_lock<std::mutex> lock(_mutex); _hosts.push_back(host); } void removeHost(const Address &host){ std::unique_lock<std::mutex> lock(_mutex); for(auto it =_hosts.begin();it != _hosts.end();it++){ if(*it ==host){ _hosts.erase(it); return; } } } Address chooseHost(){ std::unique_lock<std::mutex> lock(_mutex); size_t pos = _idx++ % _hosts.size(); return _hosts[pos]; } bool empty(){ std::unique_lock<std::mutex> lock(_mutex); return _hosts.empty(); } private: std::mutex _mutex; size_t _idx; //选择主机评率高还是上线下线评率高来选择是vector还是哈希 std::vecotr<Address> _hosts; }; class Discover{ public: using OfflineCallback = std::function<void(const Address&)>; using ptr = std::shared_ptr<Discover>(); Discover(const Requestor::ptr &requestor,const OfflineCallback& cb):_requestor(requestor),_offline_callback(cb){} bool serviceDiscovery(const BaseConnection::ptr &conn,const std::string &mehod,Address &host){ { //当前保管的提供者存在,则直接返回地址。 std::unique_lock<std::mutex> lock(_mutex); auto it =_method_hosts.find(method); if( it != _method_hosts.end()){ //找到了 if(it->second->empty() == false){ host = it->second->chooseHost(); return; } } } //当前服务的提供者为空 auto msg_req = MessgaeFactory::cerate<ServiceRequest>(); msg_req->setId(UUID::uuid()); msg_req->setMType(MType::REQ_SERVICE); msg_req->setMethod(method); msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY); BaseMessage::ptr msg_rsp; bool ret = _requestor->send(conn,msg_req,msg_rsp); if(ret == false){ ELOG("服务发现失败!"); return false; } auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp); if(!service_rsp){ ELOG("向下响应类型转换失败!"); return false; } if(service_rsp->rcode() != Ecode::RECODE_OK){ ELOG("服务发现失败!%s",errReason(service_rsp->rcode()).c_str()); return false; } //能走到这,代表当前是没有对应的服务提供主机的!! std::unique_lock<std::mutex> lock(_mutex); auto method_host = std::make_shared<MethodHost>(service_rsp->hosts()); if(method_host->empty()){ ELOG("%s 服务发现失败! 没有能够提供服务的主机!",method.c_str()); return false; } host = method_host->chooseHost(); // _method_hosts.insert(std::make_pair(method,method_host)); _method_hosts[method]=method_host;//这样更好上面原本有值会出错,这样会替换,更加符合我们实际的情况 return true; } //这个接口是提供给dispatcher模块进行上线下线请求处理的回调函数。 void onServiceRequest(const BaseConnection::ptr &conn,const ServiceRequest::ptr &msg){ //1.判断是上线还是下线请求,如果都不是那就不用处理了 auto optype = msg->optype(); std::string method = msg->method(); std::unique_lock<std::mutex> lock(_mutex); if(optype == ServiceOptype::SERVICE_ONLINE){ //2.上线请求:找到MethodHOST,向其中新增一个主机地址 auto it = _method_hosts.find(method); if(it == _method_hosts.end()){ auto method_host = std::make_shared<MethodHost>(); method_host->appendHost(msg->host()); _method_hosts[method]=method_host; }else{ it->second->appendHost(msg->host()); } }else if(optype == ServiceOptype::SERVICE_OFFLIEN){ //3.下线请求:周到MethodHOST,从其中删除一个主机地址 auto it = _method_hosts.find(method); if(it == _method_hosts.end()){ return; } //找到了 it->second->removeHost(msg->hsot()); _offline_callback(msg->hsot()); } } private: OfflineCallback _offline_callback; std::mutex _mutex; std::unordered_map<std::string ,Methodhost::ptr> _method_hosts; Requestor::ptr _requestor; }; } }5.4.5 rpc_topic:主题发布与订阅
- 服务端:server::TopicManager(管理主题)、Subscriber(管理订阅者),实现主题的创建 / 删除、订阅 / 取消、消息推送
- 客户端:client::TopicManager (封装主题操作接口),支持主题操作与消息接收回调
- 消息推送:服务端接收发布消息后,推送给所有订阅该主题的客户端
在服务端中通过_topics和_subscribers保存主题和主题的主题订阅者,连接和连接订阅的主题。
std::unordered_map<std::string, Topic::ptr> _topics;
std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;
Topic是自定义的一个结构体
struct Topic { using ptr = std::shared_ptr<Topic>; std::mutex _mutex; std::string topic_name; std::unordered_set<Subscriber::ptr> subscribers; //当前主题的订阅者 Topic(const std::string &name) : topic_name(name){} //新增订阅的时候调用 void appendSubscriber(const Subscriber::ptr &subscriber) { std::unique_lock<std::mutex> lock(_mutex); subscribers.insert(subscriber); } //取消订阅 或者 订阅者连接断开 的时候调用 void removeSubscriber(const Subscriber::ptr &subscriber) { std::unique_lock<std::mutex> lock(_mutex); subscribers.erase(subscriber); } //收到消息发布请求的时候调用 void pushMessage(const BaseMessage::ptr &msg) { std::unique_lock<std::mutex> lock(_mutex); for (auto &subscriber : subscribers) { subscriber->conn->send(msg); } } };主要是保存了一个主题的订阅者。以及增删和对订阅者的消息发布操作。
Subscriber是连接着的管理结构体:
struct Subscriber { using ptr = std::shared_ptr<Subscriber>; std::mutex _mutex; BaseConnection::ptr conn; std::unordered_set<std::string> topics;//订阅者所订阅的主题名称 Subscriber(const BaseConnection::ptr &c): conn(c) { } //订阅主题的时候调用 void appendTopic(const std::string &topic_name) { std::unique_lock<std::mutex> lock(_mutex); topics.insert(topic_name); } //主题被删除 或者 取消订阅的时候,调用 void removeTopic(const std::string &topic_name) { std::unique_lock<std::mutex> lock(_mutex); topics.erase(topic_name); } };Subscriber保存的是订阅者所订阅的主题名称。以及增删操作。
设置的回到接口是onTopicRequest——用于主题的增删,主题的订阅/取消订阅,主题消息的发布操作。然后就是订阅者再连接断开时候的回调处理函数onShutdown——主要是把服务端保存的_topics和_subscribers里的信息更新一下。
在客户端这里,保存的是主题映射的回调函数,我接受到主题的消息publish之后我会进行对应的操作,比如就单纯的打印,或者我接收到运算器的主题信息后,我调用discover去进行新运算方法的服务器ip和port然后进行更新我的discover。
std::unordered_map<std::string, SubCallback> _topic_callbacks;
using SubCallback = std::function<void(const std::string &key, const std::string &msg)>;
其中SubCallback中的参数是主题名称和具体发布的消息内容。
5.5 调度器 Dispatcher 实现
- 模板回调封装:基于模板实现通用回调类,支持不同消息类型的回调处理
- 消息类型映射:维护消息类型与回调函数的哈希映射,接收消息后自动分发
- 线程安全:加锁保证映射关系的线程安全,支持高并发场景
通过_handlers:
std::unordered_map<MType, Callback::ptr> _handlers;
来实现六种不同类型消息的回调函数的分发。
其中第二个参数的Callback::ptr设计时考虑了,不同种类的消息回调函数不一样,所以我这里做了一个类似类型擦除的考量。Callback作为基类,再用CallbackT——模版类去继承这个基类,因为CallbackT是模版类,所以我在设置回调函数的时候会把第二个消息类型传进去,从而实现了类型的改变。用 std::function 封装回调函数,让 _handler 能兼容不同形式的可调用对象(普通函数、lambda、成员函数等),实现回调的灵活绑定。两层 “类型擦除”:① 第一层:CallbackT<T>(不同 T)→ Callback 基类(虚函数多态),擦除具体的消息类型 T;② 第二层:std::function 擦除回调函数的具体类型(比如是普通函数还是 lambda)。
六、Json-RPC 框架完整使用示例
6.1 基础 RPC 调用(无服务注册发现)
客户端实例化一个rpc_client中的RpcClient类,传入远程服务器的ip和port,把服务注册发现关闭设为false,然后使用call接口选择同步、异步、回调,三种方式去调用。并且在构造函数中把Requestor中的onResponse设置到Dispatcher中,然后把Dispatcher中的onMessage设置到muduo的回调函数中,通过onMessage的分发来识别是哪一种类型服务的消息(RPC模块的请求/响应、服务注册发现模块的请求/响应、主题发布订阅模块的请求/响应),在call函数中通过在Requestor设置UUID,通过一个UUID的hash映射来确保服务器的响应一一对应我的请求,在发送数据时层层回调到MuduoConnection这个类中把json格式的字段序列化,然后发送数据。
服务端实例化一个rpc_server中的RpcServer类,传入自己的ip和port,把服务注册发现以及服务注册发现服务器的ip和port设为默认(就是禁止),然后把RpcRouter中的onRpcRequest注册到函数调度器Dispatcher中,并设置类型,在RpcRouter类中通过SDecribeFactory工厂去添加新的方法提供RPC调用,通过ServiceDescribe类来进行方法的保证——参数检测、方法名称、实际业务回调函数、返回值类型、参数字段格式的描述。通过ServiceManager来管理多个ServiceDescribe类,通过方法名与ServiceDescribe类的hash映射,我这里采取的是同名方法新的替换旧的,没有做同名方法不同参数的保存。只要有客户端的RPC请求过来,首先会在MuduoServer的OnMessage函数中进行反序列化变成json格式,底层的muduo库就会调用我设置的onRpcRequest回调函数,然后进行RPC调用的参数检测,业务接口的调用,最后返回一个response响应。response函数中再设置好响应消息后调用send函数在层层回调到MuduoConnection进行序列化调用send接口。因为这里的对端绑定的是服务器这个MuduoConnection是服务器管理的客户端连接对象。
6.1 基础RPC调用 + 服务注册发现
客户端分两种,一种是使用RPC调用的客户端,一种是提供RPC调用的客户端,我们分别称为RPC调用客户端和RPC提供客户端。
RPC调用客户端:客户端的流程和基础RPC调用一样。只不过需要在基础的RPC调用上加上服务发现模块也就是DiscoveryClient类,其中Discover会保存方法——提供者的合集的映射关系。通过这个关系保存历史发现的提供的方法和提供者。如果没有这个方法就可以去使用serviceDiscovery接口去主动发现方法,然后把消息发给服务注册和发现的管理服务器,服务器会回显所提供的方法。如果为空代表没有方法提供。
RPC提供客户端:提供者直接实例化一个RegisrtyClient对象,然后使用registryMethod接口设置自己提供的方法和自己的ip和port,注册中心的ip和port可以喝RPC调用的ip和port一样。也可以分开。
服务端:实例化一个RegistryServer对象把PDManager::onServiceRequest回到函数注册到Dispatcher中,在rpc_registry模块中有三个类分别是
- 服务提供者的管理:ProviderManager
- 客户端发现服务的管理:DiscoverManager
- 汇总成一个管理类:PDManager
ProviderManager来管理提供者主机和提供的方法,用两个结构变量:
//方法——>提供者
std::unordered_map<std::string,std::set<Provider::ptr>> _providers;
//连接——>提供者
std::unordered_map<BaseConnection::ptr , Provider::ptr> _conns;
struct Provider{ using ptr = std::shared_ptr<Provider>; std::mutex _mutex; BaseConnection::ptr conn; Address host; std::vector<std::string> methods; Provider(const BaseConnection::ptr &c,const Address & h): conn(c),host(h){} void appendMethod(const std::string &method){ std::unique_lock<std::mutex> lock(_mutex); methods.emplace_back(method); } };使用自定义结构体Provider保存提供者和能提供的方法。
使用两个变量管理的原因是我既要快速返回方法的提供者萌,也需要管理连接对应的提供者,便于我对管理的增删查改。
DiscoverManager来管理哪些方法被客户端发现了,以及对发现连接的管理。为什么这里存方法对发现客户端萌的hash映射呢,主要考量是这是RPC调用,使用方法直接映射主机,便于管理,也符合项目逻辑。
//方法——>发现者
std::unordered_map<std::string,std::set<Discover::ptr>> _discovers;
//连接——>发现者
std::unordered_map<BaseConnection::ptr , Discover::ptr> _conns;
struct Discover{ using ptr = std::shared_ptr<Discover>; std::mutex _mutex; BaseConnection::ptr conn;//发现者关联的客户端连接 std::vector<std::string> methods;//发现过的服务名称 Discover(const BaseConnection::ptr &c):conn(c){} void appendMethod(const std::string & method){ std::unique_lock<std::mutex> lock(_mutex); methods.push_back(method); } }; 使用自定义结构体Discover来管理发现客户端,发现过的服务名称。
PDManager来管理发现和提供者通过onServerReuqest回调函数来区分是服务注册还是服务发现通过ServiceRequest消息的msg->optype()字段。
void onServiceRequest(const BaseConnection::ptr & conn, const ServiceRequest::ptr &msg){ //服务操作请求:服务注册/服务发现/ auto optype = msg->optype(); if(optype == ServiceOptype::SERVICE_REGISTRY){ //服务注册: // 1.新增服务提供者; 2.进行服务上线的通知; ILOG("%s:%d 注册服务 %s",msg->host().first.c_str(),msg->host().second,msg->method().c_str()); _providers->addProvider(conn,msg->host(),msg->method()); _discovers->onlineNotify(msg->method(),msg->host()); return registryResponse(conn,msg); }else if(optype == ServiceOptype::SERVICE_DISCOVERY){ //服务发现: // 1.新增服务发现者 ILOG("客户端要进行%s 服务发现!",msg->method().c_str()); _discovers->addDiscover(conn,msg->method()); return discoveryResponse(conn,msg); }else{ ELOG("收到服务操作请求,但是操作类型错误!"); return errorResponse(conn,msg); } }6.1 基础RPC调用 + 服务注册发现 + 主题发布订阅
主题发布订阅和,基础的RPC调用和服务注册发现是独立的。同时也分为两个客户端,一个服务端管理者两个客户端。两个客户端分别是:订阅者客户端,主题客户端。简单来说是,客户端订阅某个主题,服务端接收到主题订阅请求使用_topics和_subscribers保存新的订阅者客户端,同理主题客户端创建后也可以请求创建自己的主题请求,服务器接受到信息后也是保存新的主题客户端。
//主题——>所有主题订阅者
std::unordered_map<std::string , Topic::ptr> _topics;
//连接——>订阅者
std::unordered_map<BaseConnection::ptr,Subscriber::ptr> _subscribers;
主要通过onTopicRequest回调函数来处理这些逻辑
void onTopicRequest(const BaseConnection::ptr &conn,const TopicRequest::ptr &msg){ TopicOptype topic_optype = msg->optype(); bool ret = true; switch(topic_optype){ //主题的创建 case TopicOptype::TOPIC_CREATE: topicCreate(conn,msg);break; //主题的删除 case TopicOptype::TOPIC_REMOVE: topicRemove(conn,msg);break; //主题的订阅 case TopicOptype::TOPIC_SUBSCRIBE: ret = topicSubscriber(conn,msg);break; //主题的取消订阅 case TopicOptype::TOPIC_CANCEL:topicCancel(conn,msg);break; //主题消息的发布 case TopicOptype::TOPIC_PUBLISH: ret = topicPublish(conn,msg);break; //消息类型错误的响应 default: return errorResponse(conn,msg,Rcode::RECODE_INVALID_OPTYPE); } if(!ret)return errorResponse(conn,msg,Rcode::RECODE_NOT_FOUND_TOPIC); return topicResponse(conn,msg); }这是创建阶段,在主题发布新的消息后,服务端通过msg->optype()字段发现是主题消息的发布,然后把消息发布给所以订阅过这个主题的订阅者客户端。在订阅者客户端中通过自主设置SubCallback回调函数。
std::unordered_map<std::string,SubCallback> _topic_callbacks;
通过_topic_callbacks的主题名字和回调函数的映射来处理。服务端发布消息给订阅者客户端后,订阅者客户端会根据std::string topic_key = msg->topicKey();字段映射对应的回调函数,然后进行回调处理std::string topic_msg = msg->topicMsg();,比如回显topic_msg,或者执行对应的操作,比如我订阅了新闻主题,那么每次消息发过来了我就回显一下,再比如我订阅了电商货物发货主题,我接受到消息后,我就在数据库中把货物变成已发货的状态。
七、框架扩展与优化方向
负载均衡优化
- 基础轮询算法升级:实现源地址 Hash、随机、加权轮询等算法
- 负载感知:实现服务端负载上报系统,基于 CPU / 内存 / 连接数实现动态负载均衡
可靠性提升
- 服务心跳检查:实现服务提供者与注册中心的心跳机制,超时自动下线
- 超时管理:为 RPC 请求添加超时机制,避免客户端无限阻塞
- 重连机制:客户端实现服务端断开后的自动重连,提升框架健壮性
功能扩展
- 发布订阅优化:实现主题的分层、过滤,支持消息持久化,使用WAL日志(追加+IO顺序很快,短期定时销毁)+数据库(长期储存)保证消息持久化。
- 批量调用:支持多个 RPC 请求的批量发送与结果批量返回,提升通信效率
- 异常处理:完善框架的异常捕获与处理机制,提供更友好的错误提示
性能优化
- 序列化优化:支持 Protobuf 序列化,替换 JsonCpp 提升序列化效率
- 内存池:封装内存池,减少框架内的内存分配与释放,提升性能
八、总结
在 JSONRPC 项目的开发与调试过程中,核心感悟可总结为:复杂 RPC 框架的理解核心在于 “分层拆解”——muduo 底层聚焦回调原理,业务层聚焦实际实现逻辑,层层拨开后,整个项目的数据流与逻辑链会清晰可辨。






