RPC魔法揭秘:从原理到BRPC实战,用C++玩转分布式通信

RPC魔法揭秘:从原理到BRPC实战,用C++玩转分布式通信

文章目录

在这里插入图片描述

本篇摘要

本文从RPC核心概念出发,阐释其“透明远程调用”的本质与工作原理,对比主流框架后聚焦百度开源的C++高性能RPC框架BRPC,详解其安装、Echo服务示例代码(含客户端/服务端实现),并延伸介绍基于ETCD的服务注册发现与信道管理封装,完整呈现分布式通信方案落地过程。

一.什么是rpc

简单理解

RPC(远程过程调用)就是让程序调用远程服务器上的功能,像调用本地函数一样简单,不用管网络传输细节。比如你手机App点“下单”,实际是调用电商服务器的“创建订单”功能(client端调用rpc服务函数请求端),RPC帮你隐藏了网络请求、数据打包等复杂操作,开发者只需写一句类似 createOrder() 的代码(rpc服务端某服务),系统自动完成远程调用并返回结果。

核心特点

  1. 透明性:调用远程服务就像调用本地函数,开发者无需关心网络通信细节。
  2. 跨进程/跨机器通信:RPC通常用于不同进程(甚至不同主机)之间的功能调用。
  3. 支持多种协议和传输方式:比如HTTP、TCP、自定义协议等,常用高性能协议如gRPC(基于HTTP/2和Protocol Buffers)。
  4. 常与序列化技术配合:调用参数和返回结果需要序列化成字节流在网络中传输,常用格式如JSON、Protobuf、Thrift等。

RPC 工作原理

  1. 客户端调用:客户端代码中调用一个看似本地的方法(如 userService.getUser(id))。
  2. 代理/Stub:客户端通过一个代理对象(Stub)把调用转换成网络请求,包括方法名、参数等。
  3. 序列化:将方法名、参数等信息序列化成二进制或文本格式(如JSON/Protobuf)。
  4. 网络传输:通过底层网络协议(如TCP/HTTP)把数据发送到服务端。
  5. 服务端处理:服务端接收请求,反序列化得到方法名和参数,找到对应的函数并执行。
  6. 返回结果:服务端将执行结果序列化后通过网络返回给客户端,客户端再反序列化得到最终结果。

常见 RPC 框架

  • gRPC:Google 开源的高性能 RPC 框架,基于 HTTP/2 和 Protocol Buffers,支持多语言。
  • Thrift:Facebook 开源的跨语言 RPC 框架,支持多种传输协议和数据格式。
  • Dubbo:阿里巴巴开源的 Java RPC 框架,广泛用于微服务架构。
  • brpc:百度开源的高性能 C++ RPC 框架,支持多种协议和多线程模型。
  • JSON-RPC / XML-RPC:基于 JSON 或 XML 的轻量级 RPC 协议,常用于 Web 服务。

典型使用场景

  • 微服务架构服务之间通过 RPC 互相调用,实现功能解耦和分布式部署
  • 分布式系统:比如分布式存储、计算任务调度、数据库中间层等。
  • 前后端分离/服务化:后端将核心功能封装成 RPC 服务,供前端、App 或其他服务调用。

简单概括下:

可以理解成在rpc服务中添加一个或者多个服务,然后把rpc服务部署在某个ip+port处运行(这里也可以分散开来部署在不同主机等,只要用户能拿到对应ip+port并且rpc服务正常注册并运行即可);然后客户就可以通过调用rpc某个服务的rpc请求调用对象通过构建对应信道进行向对应rpc服务发起请求然后收到应答等。

二.BRPC介绍

是什么?

BRPC是百度开源的C++专用RPC框架,让不同服务器上的C++程序像调用本地函数一样快速通信,支撑了百度万亿级请求的核心服务。

比gRPC强在哪?

  1. 更快:延迟更低、每秒能处理的请求更多(尤其适合C++高并发场景)。
  2. 更简单专为C++设计,不用学复杂配置,接口更直观
  3. 更灵活:支持多种协议,调试和功能扩展更方便(比如动态调参数)。
  4. 更省事:依赖少,部署简单,和百度工具链直接兼容。

简单说C++项目追求性能和省心,选BRPC;需要多语言支持,用gRPC。

三.基于brpc实现简单的服务调用

上面说了对应的brpc的优点,因此这里可以看出更适合用brpc而不是grpc来实现。

brpc安装教程

安装步骤:

  1. 安装依赖:
sudoapt-getinstall -y git g++ make libssl-dev libprotobuf-dev librocksdb-dev libprotoc-dev protobuf-compiler libleveldb-dev 
  1. 安装 brpc:
git clone https://github.com/apache/brpc.git cd brpc/ mkdir build &&cd build cmake -DCMAKE_INSTALL_PREFIX=/usr .. cmake --build . -j6 makesudomakeinstall

简单实现客户端向brpc服务端口请求服务完成应答过程(以echo回显为例)

首先先说下对应思路:

  • rpc服务端进行对应echo服务注册(通过Protobuf协议为服务端与客户端生成对应类,然后服务端填充对应的echo功能完成添加进入rpc-server中),最后启动对应server,也就是把rpc服务部署在对应ip+port处(即包含对应echo服务功能)。
  • 然后对应客户端通过约定好的Protobuf协议生成的对应调用相关rpc服务的rpc请求接口进行调用(拿对应rpc服务信道进行初始化(rpc服务部署ip+port以及对应选项来初始化对应信道));调用rpc服务请求接口发送对应请求等。
  • rpc服务端收到对应请求就去拿着请求调用用户注册进来的服务函数进行操作,最后填充好答复以及其他相关信息发送回去。
  • 客户端收到对应答复后进行解析拿到结果以及其他信息,然后调用回调函数进行操作或者通过答复结果进行其他处理操作。

下面看下博主总结的关于这个过程的图:

在这里插入图片描述

简单来说分为rpc服务端与rpc客户端(具体操作如下):

  • 1·rpc服务端:关闭默认日志+构建服务对象向rpc-server中添加+启动rpc服务器。
  • 2·rpc客户端:初始化信道+构建并发送rpc请求对象+调用回调恢复(可选)。

测试效果

在这里插入图片描述
  • rpc-server注册完对应的echo服务后启动在这等待。
在这里插入图片描述
  • 服务端接收到客户端的请求进行相关服务处理(echo服务处理函数)。
在这里插入图片描述
  • rpc-client通过特定接口进行rpc请求,之后拿到对应响应答复后异步调用回调进行操作。

代码汇总

对应代码及详细注释:

1.Protobuf用于后续设定对应rpc服务:

syntax="proto3"; package example; option cc_generic_services =true; message EchoRequest { string message =1;} message EchoResponse { string message =1;}// 搭建对应的服务,然后为这个服务实例化出模版echo基类提供用户填充 service EchoService { rpc Echo(EchoRequest)returns(EchoResponse);}

2.makefile(注意这里链接的所有库及顺序):

all :server client server:server.cc echo.pb.cc g++-g -std=c++17 $^-o $@ -L/usr/local/lib -lspdlog -lfmt -letcd-cpp-api -lcpprest -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb client:client.cc echo.pb.cc g++-g -std=c++17 $^-o $@ -L/usr/local/lib -lspdlog -lfmt -letcd-cpp-api -lcpprest -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb .PHONY:clean clean: rm -r server client #brpc库依赖glags库,故必须把gflags链接在brpc库后面,否则报错(这里链接的Protobuf库的版本必须和protoc的时候是一样的否则报错) #-L/usr/local/lib 当前没有对应库再去系统默认的地方链接找库 

3.rpc-client::

#include<brpc/channel.h>#include<thread>#include"echo.pb.h"voidclientcallback(brpc::Controller *cntl,::example::EchoResponse *response){//把指针直接给智能指针对象此时直接被智能指针保存地址即可,智能指针就不用在内部new了,出了函数作用自动帮忙把客户端构建对象析构释放: std::unique_ptr<brpc::Controller>cntl_guard(cntl); std::unique_ptr<example::EchoResponse>resp_guard(response);if(cntl->Failed()==true){ std::cout <<"Rpc调用失败:"<< cntl->ErrorText()<< std::endl;return;} std::cout <<"收到响应: "<< response->message()<< std::endl;}intmain(){// 客户端调用rpc接口通过信道与rpc服务端进行通信 brpc::ChannelOptions options; options.connect_timeout_ms =-1;// 连接等待超时时间,-1表示一直等待 options.timeout_ms =-1;// rpc请求等待超时时间,-1表示一直等待 options.max_retry =3;// 请求重试次数 options.protocol ="baidu_std";// 序列化协议,默认使用baidu_std brpc::Channel channel;auto ret = channel.Init("127.0.0.1:8080",&options);if(ret ==-1){ std::cout <<"初始化信道失败!\n";return-1;}// 构建EchoService_Stub对象用于向服务端rpc发请求: example::EchoService_Stub stub(&channel); brpc::Controller *cntl =new brpc::Controller(); example::EchoResponse *rsp =new example::EchoResponse(); example::EchoRequest req; req.set_message("你好rpc!");auto closure = google::protobuf::NewCallback(clientcallback, cntl, rsp);//异步线程出动走closure调用也就是回调函数的处理,主线程收到答复填充完接着往下走://主线程这里收到对应答复进行填充好对应的内容如rsp cntl等然后如果closure不为空就获取对应回调通知然后派异步线程去执行对应clientcallback回调函数处理即可 stub.Echo(cntl,&req, rsp, closure);//向rpcserver发送对应req,然后接收到response填充rsp及cntl,之后再去调用closure这个可调用对象即调用clientcallback std::cout <<"异步调用!\n"; std::this_thread::sleep_for(std::chrono::seconds(3));return0;}

4.rpc-server:

#include<brpc/server.h>#include<butil/logging.h>#include"echo.pb.h"classEchoService:public example::EchoService{voidEcho(google::protobuf::RpcController *controller,const::example::EchoRequest *request,::example::EchoResponse *response,::google::protobuf::Closure *done){// 创建一个 brpc::ClosureGuard对象,用于管理 done的生命周期。当 rpc_guard对象析构时,会自动调用 done,通知框架RPC处理完成。 brpc::ClosureGuard rpc_guard(done);//这里析构的时候调用对应的done->run()构建对应答复信息来通知请求方可以进行自己的回调处理了 std::cout <<"收到消息:"<< request->message()<< std::endl; std::string str = request->message()+"--这是响应!!"; response->set_message(str);}//调用完对应自定义函数echo后由rpc发送对应答复集合。};intmain(){// 关闭brpc默认输出日志: logging::LoggingSettings logset; logset.logging_dest = logging::LoggingDestination::LOG_TO_NONE; logging::InitLogging(logset);// 创建rpc服务器注册对应服务: brpc::Server server; EchoService echo; server.AddService(&echo, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);// 添加选项并启动rpc服务: brpc::ServerOptions options; options.idle_timeout_sec =-1;// 连接空闲超时时间-超时后连接被关闭 options.num_threads =1;// io线程数量auto ret = server.Start(8080,&options);//rpc服务端对应的echo服务启动了:也就是收到对应echo的resquest就去调用对应Echo函数完成response构建然后自动发送if(ret ==-1){ std::cout <<"启动服务器失败!\n";return-1;} server.RunUntilAskedToQuit();// 修改等待运行结束return0;}

四.封装每个服务的channels及所有服务管理者

rpc 调用这里的封装,因为不同的服务调用使用的是不同的 Stub,这个封装起来的意义不大,因此这里封装的是每个服务占用的信道集合和管理起来所有服务。

封装思想(需要封装的两大类分别是service-channelsservicesmanager):

1·ServiceChannels:

  • 首先要知道注册中心注册(etcd)注册对应服务是以服务+单例名称+ip-port来注册的,也就是说明一个指定服务可以有多个单例来运行(即多个rpc服务,可以是在不同ip+port处,因此就有了多个信道概念),得出结论是一个服务对应多个服务单例。
  • ServiceChannels就是管理一个服务间所有关于它的实例信道(也就是站在某个服务的所有单例对象的信道管理者角度)。
  • 因此这个类就需要对应信道的增删查等功能,完成对应主机名与服务信道的映射,以及采取RR轮转方式使用对应服务的信道。

2·servicesmanager:

  • 这个其实就是把上面的service-channels管理起来,暴露给外面使用的,并集合对应etcd的watch监控来用,当用户线设定好要关系你的服务后,把上线和下线逻辑函数交给watch进行回调,也就是当有服务上下线后(某个服务的单例)判断是否为当前用户关心来完成对应增加删除对应服务信道而已。
  • 当用户进行选择服务的时候,采取RR轮转选取对应服务的某个信道。

过程图如下:

在这里插入图片描述

实现代码即对应注释(channel.hpp):

#pragmaonce#include<brpc/channel.h>#include<string>#include<vector>#include<unordered_map>#include<mutex>#include"log.hpp"// 使用指针管理对象都::// etcd通知某个服务上线或者下线是通过host:ip+port通知的,然后这边收到就要进行对应的管理,故进行host与对应信道映射。// 管理一个服务间所有关于它的实例信道(也就是站在某个服务的所有单例对象的信道管理者角度)classServiceChannels{public:using Ptr = std::shared_ptr<ServiceChannels>;using channelptr = std::shared_ptr<brpc::Channel>;ServiceChannels(const std::string &name):_service_name(name),_idx(0){}voidAppend(const std::string &host){ std::unique_lock<std::mutex>lock(_mtx);auto it = _hosts.find(host);if(it == _hosts.end()){// 构建host对应的信道进行初始化: std::shared_ptr<brpc::Channel> pchannel = std::make_shared<brpc::Channel>(); brpc::ChannelOptions options; options.connect_timeout_ms =-1; options.timeout_ms =-1; options.max_retry =3; options.protocol ="baidu_std";auto ok = pchannel->Init(host.c_str(),&options);if(ok ==-1){LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);return;}// 完成信道这边的管理: _hosts[host]= pchannel; _channels.push_back(pchannel);}}voidRemove(const std::string &host){ std::unique_lock<std::mutex>lock(_mtx);auto it = _hosts.find(host);if(it == _hosts.end()){LOG_WARN("{}-{}节点删除信道时,没有找到信道信息!", _service_name, host);return;}for(auto vit = _channels.begin(); vit != _channels.end(); vit++){if(*vit == it->second){ _channels.erase(vit);break;}} _hosts.erase(host);} channelptr Choose(){ std::unique_lock<std::mutex>lock(_mtx);if(!_channels.size())returnchannelptr();// 存在进行轮询输出:int32_t index = _idx++% _channels.size();return _channels[index];}private: std::mutex _mtx; std::string _service_name; std::unordered_map<std::string, channelptr> _hosts;// 主机名(ip+port)与brpc信道之间映射关系 std::vector<channelptr> _channels;// 维护所有的channelint32_t _idx;// 用于RR轮转策略};// 所有服务的管理者,负责接收到etcd服务通知进行对应注册删除服务(或者为服务注册单例对象也就是信道等)等classServiceManager{public:using Ptr = std::shared_ptr<ServiceManager>;ServiceManager(){}// 先声明,我关注哪些服务的上下线,不关心的就不需要管理了voidCared(const std::string &service_name){ std::unique_lock<std::mutex>lock(_mtx); _care_services.insert(service_name);}// etcd监视的回调函数voidOnlineService(const std::string &service_instance_name,const std::string &host){const std::string &service_name =GetService(service_instance_name);auto service =ServiceChannels::Ptr();// 空的服务信道集{ std::unique_lock<std::mutex>lock(_mtx);auto fit = _care_services.find(service_name);if(fit == _care_services.end()){LOG_DEBUG("{}-{} 服务上线了,但是当前并不关心!", service_name, host);return;}// 说明关心对应服务,如果服务存在就添加对应信道即可,不存在就创建服务再添加对应信道:auto sit = _services.find(service_name);if(sit == _services.end()){ service = std::make_shared<ServiceChannels>(service_name); _services.insert(std::make_pair(service_name, service));}else{ service = sit->second;}}if(!service){LOG_ERROR("新增 {} 服务管理节点失败!", service_name);return;} service->Append(host);LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);}voidUnonlineService(const std::string &service_instance_name,const std::string &host){const std::string &service_name =GetService(service_instance_name);auto service =ServiceChannels::Ptr();// 空的服务信道集{ std::unique_lock<std::mutex>lock(_mtx);auto fit = _care_services.find(service_name);if(fit == _care_services.end()){LOG_DEBUG("{}-{} 服务下线了,但是当前并不关心!", service_name, host);return;}// 从服务集合中查找进行删除:auto sit = _services.find(service_name);if(sit == _services.end()){LOG_WARN("删除{}服务节点时,没有找到管理对象", service_name);return;}else{ service = sit->second;}} service->Remove(host);LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);} ServiceChannels::channelptr ChooseService(const std::string &servicename){ std::unique_lock<std::mutex>lock(_mtx);auto sit = _services.find(servicename);if(sit == _services.end()){LOG_ERROR("当前没有能够提供 {} 服务的节点!", servicename);returnServiceChannels::channelptr();}return sit->second->Choose();}private:const std::string GetService(const std::string &service_instance_name){auto pos = service_instance_name.find_last_of('/');if(pos == std::string::npos)return service_instance_name;// 一般来说不会找不到,注册进去的都是包含实例return service_instance_name.substr(0, pos);} std::mutex _mtx; std::unordered_map<std::string, ServiceChannels::Ptr> _services;// 添加进来的服务都是与业务有关的,也就是关心的 std::unordered_set<std::string> _care_services;};

五.基于etcd实现服务上下线监控来完成brpc服务调用

上面封装好了对应的channel.hpp,下面就使用它结合之前封装的etcd.hpp以及复用下brpc的简单echo服务组合起来使用测试下( 说白了其实就是brpc的echo服务的server与client端套用下然后结合封装etcd代码以及改变下获取channel方式而已。)。

说下大致思路:

  • 实现register与discovery两个程序也就是对应的添加rpc服务启动rpc-server+etcd注册echo服务借助servicesmanager获取对应服务信道+进行rpc服务请求发送

逻辑过程图:

在这里插入图片描述

测试效果

在这里插入图片描述
  • 一开始启动对应的rpc客户端,发现etcd中没有关心的服务即对应实例,只能看到之前在etcd服务器中注册的其他服务,但是是不相关的故不进行调用。
在这里插入图片描述
  • 此时启动对应rpc-server也就是给rpc服务器添加对应服务并部署,给etcd注册进去对应服务后开始运行等待。
在这里插入图片描述
  • 此时服务端收到对应的rpc客户端发来的请求,然后进行调用echo服务进行构建答复发送回去。
在这里插入图片描述
  • 客户端收到服务端发送来的答复进行解析处理。

代码汇总

https://gitee.com/etcd_brpc(点我跳转)

六.本篇小结

RPC通过隐藏网络细节实现跨进程调用,BRPC凭借高性能与C++友好性成为优选;本文从理论到实践,结合ETCD服务治理,为分布式系统通信提供了可落地的参考路径。

Read more

Flutter for OpenHarmony:Flutter 三方库 os_detect — 精准洞察鸿蒙系统的底层脉络(适配鸿蒙 HarmonyOS Next ohos)

Flutter for OpenHarmony:Flutter 三方库 os_detect — 精准洞察鸿蒙系统的底层脉络(适配鸿蒙 HarmonyOS Next ohos)

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net。 Flutter for OpenHarmony:Flutter 三方库 os_detect — 精准洞察鸿蒙系统的底层脉络(适配鸿蒙 HarmonyOS Next ohos) 在进行 Flutter for OpenHarmony 跨平台开发时,我们经常需要处理“差异化”的需求。有的功能可能只在真正的 OpenHarmony 原生环境下运行(如特定的 N-API 调用),而在 Web 或其他桌面模拟器环境下则需要进行降级处理。 传统的 Platform.isAndroid 或 kIsWeb 在处理日渐复杂的鸿蒙生态环境时,往往显得力不从心。os_detect 库提供了一套更轻量、更可靠的系统环境感知方案,能帮助我们精准识别应用正跑在哪个“灵魂”之下。 一、为什么需要系统环境检测?

By Ne0inhk
Flutter for OpenHarmony:dio_cookie_manager 让 Dio 发挥会话管理能力,像浏览器一样自动处理 Cookie 深度解析与鸿蒙适配指南

Flutter for OpenHarmony:dio_cookie_manager 让 Dio 发挥会话管理能力,像浏览器一样自动处理 Cookie 深度解析与鸿蒙适配指南

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net 前言 在移动端开发中,我们通常使用 JWT (Authorization Header) 进行身份验证。但如果你的后端是基于 Session/Cookie 的老系统(如 PHP/Java JSP),或者你需要对接网页爬虫,那么 Cookie 的管理就变得至关重要。 dio 本身是不存储 Cookie 的。dio_cookie_manager 是一个官方推荐的拦截器,它结合 cookie_jar 库,能自动从响应头提取 Set-Cookie,并在下次请求时带上 Cookie,完全模拟浏览器的行为。 一、概念介绍/原理解析 1.1 基础概念 * CookieManager: Dio 的拦截器,

By Ne0inhk
Flutter 第三方库 country_list 的鸿蒙适配之路 - 全球国家数据、旗帜 Emoji 与区号国际化展示实战

Flutter 第三方库 country_list 的鸿蒙适配之路 - 全球国家数据、旗帜 Emoji 与区号国际化展示实战

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 第三方库 country_list 的鸿蒙适配之路 - 全球国家数据、旗帜 Emoji 与区号国际化展示实战 前言 在全球化的今天,无论你的 App 是社交、电商还是金融,都绕不开“国家/地区选择”这一基础功能。无论是获取用户的国际区号,还是根据地理位置展示国家旗帜,都需要一套完整、准确且实时更新的数据集。 country_list 库是一个极简、高效的 Flutter 工具,它内置了 ISO 国家代码、电话前缀、甚至是符合 Unicode 标准的旗帜表情。 在 OpenHarmony 系统的生态建设中,国际化(i18n)是不可忽视的一环。本文将深入探讨如何将 country_

By Ne0inhk
Flutter 组件 edwards25519 适配鸿蒙 HarmonyOS 实战:高性能椭圆曲线加密,构建军工级身份验签与端侧隐私安全保护架构

Flutter 组件 edwards25519 适配鸿蒙 HarmonyOS 实战:高性能椭圆曲线加密,构建军工级身份验签与端侧隐私安全保护架构

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 组件 edwards25519 适配鸿蒙 HarmonyOS 实战:高性能椭圆曲线加密,构建军工级身份验签与端侧隐私安全保护架构 前言 在鸿蒙(OpenHarmony)生态迈向全球化商业安全、涉及极高价值的金融支付验证、去中心化身份(DID)认证及严苛的 IoT 设备通信加密背景下,如何实现一套既能对抗高强度暴力破解、又能在资源受限的微型设备上保持瞬间验签能力的“底层密码学基座”,已成为决定应用数字主权与信任深度的基石。在鸿蒙设备这类强调 AOT 安全加固与分布式可信执行环境(TEE)协同的场景下,如果应用依然依赖陈旧的 RSA 或性能底下的加密库,由于由于算力消耗的非线性增长,极易由于由于“签名验证阻塞”导致鸿蒙应用的任务流发生严重延迟。 我们需要一种能够提供最高安全能效比(Security-per-Watt)、支持 Edwards 曲线算法(Ed25519)且具备抗侧信道攻击特性的密码学工具。 edwards25519 为 Flutte

By Ne0inhk