RPC魔法揭秘:从原理到BRPC实战,用C++玩转分布式通信

RPC魔法揭秘:从原理到BRPC实战,用C++玩转分布式通信

文章目录

在这里插入图片描述

本篇摘要

本文从RPC核心概念出发,阐释其“透明远程调用”的本质与工作原理,对比主流框架后聚焦百度开源的C++高性能RPC框架BRPC,详解其安装、Echo服务示例代码(含客户端/服务端实现),并延伸介绍基于ETCD的服务注册发现与信道管理封装,完整呈现分布式通信方案落地过程。

一.什么是rpc

简单理解

RPC(远程过程调用)就是让程序调用远程服务器上的功能,像调用本地函数一样简单,不用管网络传输细节。比如你手机App点“下单”,实际是调用电商服务器的“创建订单”功能(client端调用rpc服务函数请求端),RPC帮你隐藏了网络请求、数据打包等复杂操作,开发者只需写一句类似 createOrder() 的代码(rpc服务端某服务),系统自动完成远程调用并返回结果。

核心特点

  1. 透明性:调用远程服务就像调用本地函数,开发者无需关心网络通信细节。
  2. 跨进程/跨机器通信:RPC通常用于不同进程(甚至不同主机)之间的功能调用。
  3. 支持多种协议和传输方式:比如HTTP、TCP、自定义协议等,常用高性能协议如gRPC(基于HTTP/2和Protocol Buffers)。
  4. 常与序列化技术配合:调用参数和返回结果需要序列化成字节流在网络中传输,常用格式如JSON、Protobuf、Thrift等。

RPC 工作原理

  1. 客户端调用:客户端代码中调用一个看似本地的方法(如 userService.getUser(id))。
  2. 代理/Stub:客户端通过一个代理对象(Stub)把调用转换成网络请求,包括方法名、参数等。
  3. 序列化:将方法名、参数等信息序列化成二进制或文本格式(如JSON/Protobuf)。
  4. 网络传输:通过底层网络协议(如TCP/HTTP)把数据发送到服务端。
  5. 服务端处理:服务端接收请求,反序列化得到方法名和参数,找到对应的函数并执行。
  6. 返回结果:服务端将执行结果序列化后通过网络返回给客户端,客户端再反序列化得到最终结果。

常见 RPC 框架

  • gRPC:Google 开源的高性能 RPC 框架,基于 HTTP/2 和 Protocol Buffers,支持多语言。
  • Thrift:Facebook 开源的跨语言 RPC 框架,支持多种传输协议和数据格式。
  • Dubbo:阿里巴巴开源的 Java RPC 框架,广泛用于微服务架构。
  • brpc:百度开源的高性能 C++ RPC 框架,支持多种协议和多线程模型。
  • JSON-RPC / XML-RPC:基于 JSON 或 XML 的轻量级 RPC 协议,常用于 Web 服务。

典型使用场景

  • 微服务架构服务之间通过 RPC 互相调用,实现功能解耦和分布式部署
  • 分布式系统:比如分布式存储、计算任务调度、数据库中间层等。
  • 前后端分离/服务化:后端将核心功能封装成 RPC 服务,供前端、App 或其他服务调用。

简单概括下:

可以理解成在rpc服务中添加一个或者多个服务,然后把rpc服务部署在某个ip+port处运行(这里也可以分散开来部署在不同主机等,只要用户能拿到对应ip+port并且rpc服务正常注册并运行即可);然后客户就可以通过调用rpc某个服务的rpc请求调用对象通过构建对应信道进行向对应rpc服务发起请求然后收到应答等。

二.BRPC介绍

是什么?

BRPC是百度开源的C++专用RPC框架,让不同服务器上的C++程序像调用本地函数一样快速通信,支撑了百度万亿级请求的核心服务。

比gRPC强在哪?

  1. 更快:延迟更低、每秒能处理的请求更多(尤其适合C++高并发场景)。
  2. 更简单专为C++设计,不用学复杂配置,接口更直观
  3. 更灵活:支持多种协议,调试和功能扩展更方便(比如动态调参数)。
  4. 更省事:依赖少,部署简单,和百度工具链直接兼容。

简单说C++项目追求性能和省心,选BRPC;需要多语言支持,用gRPC。

三.基于brpc实现简单的服务调用

上面说了对应的brpc的优点,因此这里可以看出更适合用brpc而不是grpc来实现。

brpc安装教程

安装步骤:

  1. 安装依赖:
sudoapt-getinstall -y git g++ make libssl-dev libprotobuf-dev librocksdb-dev libprotoc-dev protobuf-compiler libleveldb-dev 
  1. 安装 brpc:
git clone https://github.com/apache/brpc.git cd brpc/ mkdir build &&cd build cmake -DCMAKE_INSTALL_PREFIX=/usr .. cmake --build . -j6 makesudomakeinstall

简单实现客户端向brpc服务端口请求服务完成应答过程(以echo回显为例)

首先先说下对应思路:

  • rpc服务端进行对应echo服务注册(通过Protobuf协议为服务端与客户端生成对应类,然后服务端填充对应的echo功能完成添加进入rpc-server中),最后启动对应server,也就是把rpc服务部署在对应ip+port处(即包含对应echo服务功能)。
  • 然后对应客户端通过约定好的Protobuf协议生成的对应调用相关rpc服务的rpc请求接口进行调用(拿对应rpc服务信道进行初始化(rpc服务部署ip+port以及对应选项来初始化对应信道));调用rpc服务请求接口发送对应请求等。
  • rpc服务端收到对应请求就去拿着请求调用用户注册进来的服务函数进行操作,最后填充好答复以及其他相关信息发送回去。
  • 客户端收到对应答复后进行解析拿到结果以及其他信息,然后调用回调函数进行操作或者通过答复结果进行其他处理操作。

下面看下博主总结的关于这个过程的图:

在这里插入图片描述

简单来说分为rpc服务端与rpc客户端(具体操作如下):

  • 1·rpc服务端:关闭默认日志+构建服务对象向rpc-server中添加+启动rpc服务器。
  • 2·rpc客户端:初始化信道+构建并发送rpc请求对象+调用回调恢复(可选)。

测试效果

在这里插入图片描述
  • rpc-server注册完对应的echo服务后启动在这等待。
在这里插入图片描述
  • 服务端接收到客户端的请求进行相关服务处理(echo服务处理函数)。
在这里插入图片描述
  • rpc-client通过特定接口进行rpc请求,之后拿到对应响应答复后异步调用回调进行操作。

代码汇总

对应代码及详细注释:

1.Protobuf用于后续设定对应rpc服务:

syntax="proto3"; package example; option cc_generic_services =true; message EchoRequest { string message =1;} message EchoResponse { string message =1;}// 搭建对应的服务,然后为这个服务实例化出模版echo基类提供用户填充 service EchoService { rpc Echo(EchoRequest)returns(EchoResponse);}

2.makefile(注意这里链接的所有库及顺序):

all :server client server:server.cc echo.pb.cc g++-g -std=c++17 $^-o $@ -L/usr/local/lib -lspdlog -lfmt -letcd-cpp-api -lcpprest -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb client:client.cc echo.pb.cc g++-g -std=c++17 $^-o $@ -L/usr/local/lib -lspdlog -lfmt -letcd-cpp-api -lcpprest -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb .PHONY:clean clean: rm -r server client #brpc库依赖glags库,故必须把gflags链接在brpc库后面,否则报错(这里链接的Protobuf库的版本必须和protoc的时候是一样的否则报错) #-L/usr/local/lib 当前没有对应库再去系统默认的地方链接找库 

3.rpc-client::

#include<brpc/channel.h>#include<thread>#include"echo.pb.h"voidclientcallback(brpc::Controller *cntl,::example::EchoResponse *response){//把指针直接给智能指针对象此时直接被智能指针保存地址即可,智能指针就不用在内部new了,出了函数作用自动帮忙把客户端构建对象析构释放: std::unique_ptr<brpc::Controller>cntl_guard(cntl); std::unique_ptr<example::EchoResponse>resp_guard(response);if(cntl->Failed()==true){ std::cout <<"Rpc调用失败:"<< cntl->ErrorText()<< std::endl;return;} std::cout <<"收到响应: "<< response->message()<< std::endl;}intmain(){// 客户端调用rpc接口通过信道与rpc服务端进行通信 brpc::ChannelOptions options; options.connect_timeout_ms =-1;// 连接等待超时时间,-1表示一直等待 options.timeout_ms =-1;// rpc请求等待超时时间,-1表示一直等待 options.max_retry =3;// 请求重试次数 options.protocol ="baidu_std";// 序列化协议,默认使用baidu_std brpc::Channel channel;auto ret = channel.Init("127.0.0.1:8080",&options);if(ret ==-1){ std::cout <<"初始化信道失败!\n";return-1;}// 构建EchoService_Stub对象用于向服务端rpc发请求: example::EchoService_Stub stub(&channel); brpc::Controller *cntl =new brpc::Controller(); example::EchoResponse *rsp =new example::EchoResponse(); example::EchoRequest req; req.set_message("你好rpc!");auto closure = google::protobuf::NewCallback(clientcallback, cntl, rsp);//异步线程出动走closure调用也就是回调函数的处理,主线程收到答复填充完接着往下走://主线程这里收到对应答复进行填充好对应的内容如rsp cntl等然后如果closure不为空就获取对应回调通知然后派异步线程去执行对应clientcallback回调函数处理即可 stub.Echo(cntl,&req, rsp, closure);//向rpcserver发送对应req,然后接收到response填充rsp及cntl,之后再去调用closure这个可调用对象即调用clientcallback std::cout <<"异步调用!\n"; std::this_thread::sleep_for(std::chrono::seconds(3));return0;}

4.rpc-server:

#include<brpc/server.h>#include<butil/logging.h>#include"echo.pb.h"classEchoService:public example::EchoService{voidEcho(google::protobuf::RpcController *controller,const::example::EchoRequest *request,::example::EchoResponse *response,::google::protobuf::Closure *done){// 创建一个 brpc::ClosureGuard对象,用于管理 done的生命周期。当 rpc_guard对象析构时,会自动调用 done,通知框架RPC处理完成。 brpc::ClosureGuard rpc_guard(done);//这里析构的时候调用对应的done->run()构建对应答复信息来通知请求方可以进行自己的回调处理了 std::cout <<"收到消息:"<< request->message()<< std::endl; std::string str = request->message()+"--这是响应!!"; response->set_message(str);}//调用完对应自定义函数echo后由rpc发送对应答复集合。};intmain(){// 关闭brpc默认输出日志: logging::LoggingSettings logset; logset.logging_dest = logging::LoggingDestination::LOG_TO_NONE; logging::InitLogging(logset);// 创建rpc服务器注册对应服务: brpc::Server server; EchoService echo; server.AddService(&echo, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);// 添加选项并启动rpc服务: brpc::ServerOptions options; options.idle_timeout_sec =-1;// 连接空闲超时时间-超时后连接被关闭 options.num_threads =1;// io线程数量auto ret = server.Start(8080,&options);//rpc服务端对应的echo服务启动了:也就是收到对应echo的resquest就去调用对应Echo函数完成response构建然后自动发送if(ret ==-1){ std::cout <<"启动服务器失败!\n";return-1;} server.RunUntilAskedToQuit();// 修改等待运行结束return0;}

四.封装每个服务的channels及所有服务管理者

rpc 调用这里的封装,因为不同的服务调用使用的是不同的 Stub,这个封装起来的意义不大,因此这里封装的是每个服务占用的信道集合和管理起来所有服务。

封装思想(需要封装的两大类分别是service-channelsservicesmanager):

1·ServiceChannels:

  • 首先要知道注册中心注册(etcd)注册对应服务是以服务+单例名称+ip-port来注册的,也就是说明一个指定服务可以有多个单例来运行(即多个rpc服务,可以是在不同ip+port处,因此就有了多个信道概念),得出结论是一个服务对应多个服务单例。
  • ServiceChannels就是管理一个服务间所有关于它的实例信道(也就是站在某个服务的所有单例对象的信道管理者角度)。
  • 因此这个类就需要对应信道的增删查等功能,完成对应主机名与服务信道的映射,以及采取RR轮转方式使用对应服务的信道。

2·servicesmanager:

  • 这个其实就是把上面的service-channels管理起来,暴露给外面使用的,并集合对应etcd的watch监控来用,当用户线设定好要关系你的服务后,把上线和下线逻辑函数交给watch进行回调,也就是当有服务上下线后(某个服务的单例)判断是否为当前用户关心来完成对应增加删除对应服务信道而已。
  • 当用户进行选择服务的时候,采取RR轮转选取对应服务的某个信道。

过程图如下:

在这里插入图片描述

实现代码即对应注释(channel.hpp):

#pragmaonce#include<brpc/channel.h>#include<string>#include<vector>#include<unordered_map>#include<mutex>#include"log.hpp"// 使用指针管理对象都::// etcd通知某个服务上线或者下线是通过host:ip+port通知的,然后这边收到就要进行对应的管理,故进行host与对应信道映射。// 管理一个服务间所有关于它的实例信道(也就是站在某个服务的所有单例对象的信道管理者角度)classServiceChannels{public:using Ptr = std::shared_ptr<ServiceChannels>;using channelptr = std::shared_ptr<brpc::Channel>;ServiceChannels(const std::string &name):_service_name(name),_idx(0){}voidAppend(const std::string &host){ std::unique_lock<std::mutex>lock(_mtx);auto it = _hosts.find(host);if(it == _hosts.end()){// 构建host对应的信道进行初始化: std::shared_ptr<brpc::Channel> pchannel = std::make_shared<brpc::Channel>(); brpc::ChannelOptions options; options.connect_timeout_ms =-1; options.timeout_ms =-1; options.max_retry =3; options.protocol ="baidu_std";auto ok = pchannel->Init(host.c_str(),&options);if(ok ==-1){LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);return;}// 完成信道这边的管理: _hosts[host]= pchannel; _channels.push_back(pchannel);}}voidRemove(const std::string &host){ std::unique_lock<std::mutex>lock(_mtx);auto it = _hosts.find(host);if(it == _hosts.end()){LOG_WARN("{}-{}节点删除信道时,没有找到信道信息!", _service_name, host);return;}for(auto vit = _channels.begin(); vit != _channels.end(); vit++){if(*vit == it->second){ _channels.erase(vit);break;}} _hosts.erase(host);} channelptr Choose(){ std::unique_lock<std::mutex>lock(_mtx);if(!_channels.size())returnchannelptr();// 存在进行轮询输出:int32_t index = _idx++% _channels.size();return _channels[index];}private: std::mutex _mtx; std::string _service_name; std::unordered_map<std::string, channelptr> _hosts;// 主机名(ip+port)与brpc信道之间映射关系 std::vector<channelptr> _channels;// 维护所有的channelint32_t _idx;// 用于RR轮转策略};// 所有服务的管理者,负责接收到etcd服务通知进行对应注册删除服务(或者为服务注册单例对象也就是信道等)等classServiceManager{public:using Ptr = std::shared_ptr<ServiceManager>;ServiceManager(){}// 先声明,我关注哪些服务的上下线,不关心的就不需要管理了voidCared(const std::string &service_name){ std::unique_lock<std::mutex>lock(_mtx); _care_services.insert(service_name);}// etcd监视的回调函数voidOnlineService(const std::string &service_instance_name,const std::string &host){const std::string &service_name =GetService(service_instance_name);auto service =ServiceChannels::Ptr();// 空的服务信道集{ std::unique_lock<std::mutex>lock(_mtx);auto fit = _care_services.find(service_name);if(fit == _care_services.end()){LOG_DEBUG("{}-{} 服务上线了,但是当前并不关心!", service_name, host);return;}// 说明关心对应服务,如果服务存在就添加对应信道即可,不存在就创建服务再添加对应信道:auto sit = _services.find(service_name);if(sit == _services.end()){ service = std::make_shared<ServiceChannels>(service_name); _services.insert(std::make_pair(service_name, service));}else{ service = sit->second;}}if(!service){LOG_ERROR("新增 {} 服务管理节点失败!", service_name);return;} service->Append(host);LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);}voidUnonlineService(const std::string &service_instance_name,const std::string &host){const std::string &service_name =GetService(service_instance_name);auto service =ServiceChannels::Ptr();// 空的服务信道集{ std::unique_lock<std::mutex>lock(_mtx);auto fit = _care_services.find(service_name);if(fit == _care_services.end()){LOG_DEBUG("{}-{} 服务下线了,但是当前并不关心!", service_name, host);return;}// 从服务集合中查找进行删除:auto sit = _services.find(service_name);if(sit == _services.end()){LOG_WARN("删除{}服务节点时,没有找到管理对象", service_name);return;}else{ service = sit->second;}} service->Remove(host);LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);} ServiceChannels::channelptr ChooseService(const std::string &servicename){ std::unique_lock<std::mutex>lock(_mtx);auto sit = _services.find(servicename);if(sit == _services.end()){LOG_ERROR("当前没有能够提供 {} 服务的节点!", servicename);returnServiceChannels::channelptr();}return sit->second->Choose();}private:const std::string GetService(const std::string &service_instance_name){auto pos = service_instance_name.find_last_of('/');if(pos == std::string::npos)return service_instance_name;// 一般来说不会找不到,注册进去的都是包含实例return service_instance_name.substr(0, pos);} std::mutex _mtx; std::unordered_map<std::string, ServiceChannels::Ptr> _services;// 添加进来的服务都是与业务有关的,也就是关心的 std::unordered_set<std::string> _care_services;};

五.基于etcd实现服务上下线监控来完成brpc服务调用

上面封装好了对应的channel.hpp,下面就使用它结合之前封装的etcd.hpp以及复用下brpc的简单echo服务组合起来使用测试下( 说白了其实就是brpc的echo服务的server与client端套用下然后结合封装etcd代码以及改变下获取channel方式而已。)。

说下大致思路:

  • 实现register与discovery两个程序也就是对应的添加rpc服务启动rpc-server+etcd注册echo服务借助servicesmanager获取对应服务信道+进行rpc服务请求发送

逻辑过程图:

在这里插入图片描述

测试效果

在这里插入图片描述
  • 一开始启动对应的rpc客户端,发现etcd中没有关心的服务即对应实例,只能看到之前在etcd服务器中注册的其他服务,但是是不相关的故不进行调用。
在这里插入图片描述
  • 此时启动对应rpc-server也就是给rpc服务器添加对应服务并部署,给etcd注册进去对应服务后开始运行等待。
在这里插入图片描述
  • 此时服务端收到对应的rpc客户端发来的请求,然后进行调用echo服务进行构建答复发送回去。
在这里插入图片描述
  • 客户端收到服务端发送来的答复进行解析处理。

代码汇总

https://gitee.com/etcd_brpc(点我跳转)

六.本篇小结

RPC通过隐藏网络细节实现跨进程调用,BRPC凭借高性能与C++友好性成为优选;本文从理论到实践,结合ETCD服务治理,为分布式系统通信提供了可落地的参考路径。

Read more

飞算JavaAI:专为Java开发者打造的智能编程革命

飞算JavaAI:专为Java开发者打造的智能编程革命

目录 * 飞算JavaAI:专为Java开发者打造的智能编程革命 * 一、前言与背景 * 二、什么是飞算JavaAI? * 主要特点 * 三、安装与配置 * 1. 从IDEA插件市场安装 * 2. 离线安装(适用于内网环境) * 3. 配置与激活 * 四、核心功能与使用体验 * 1. 智能开发全流程引导 * (1) 合并项目场景下的智能引导 * (2) 一键生成完整工程代码 * 2. 其他实用功能 * (1) Java Chat:你的24小时编程顾问 * (2) 智能问答:快速解决疑难杂症 * (3) SQL Chat:自然语言转高效查询 * 五、与主流AI编程助手对比 * 六、个人体验与建议 * 结语 飞算JavaAI:专为Java开发者打造的智能编程革命 一、前言与背景 在当今快节奏的软件开发领域,效率和质量成为了开发者面临的双重挑战。作为一名长期奋战在Java开发一线的程序员,

By Ne0inhk

AgentScope Java多智能体框架

1. 技术架构与功能介绍 AgentScope Java 的核心设计理念是 “Agent-Oriented Programming” (面向智能体编程)。 核心功能 * ReAct 范式驱动:内置推理-行动(Reasoning-Acting)循环,智能体能自主规划步骤并调用工具。 * 响应式内核:基于 Project Reactor (Mono/Flux),天然支持非阻塞 I/O,适合处理高并发的 Agent 请求。 * 人类在环 (HITL):支持随时暂停 Agent 执行,接入人工干预后再恢复,这在企业级应用中至关重要。 * 多协议集成:支持 MCP (Model Context Protocol) 协议,可以无缝调用外部各种工具服务。 架构图示 源码级组件解析 从源码结构看,agentscope-java 主要由以下四大基石组成: 1. Msg (消息对象)

By Ne0inhk
Java毕设项目推荐-基于springboot的某农业公司基地种植管理系统基于SpringBoot+Vue的种植基地农业信息管理系统【附源码+文档,调试定制服务】

Java毕设项目推荐-基于springboot的某农业公司基地种植管理系统基于SpringBoot+Vue的种植基地农业信息管理系统【附源码+文档,调试定制服务】

博主介绍:✌️码农一枚 ,专注于大学生项目实战开发、讲解和毕业🚢文撰写修改等。全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围::小程序、SpringBoot、SSM、JSP、Vue、PHP、Java、python、爬虫、数据可视化、大数据、物联网、机器学习等设计与开发。 主要内容:免费开题报告、任务书、全bao定制+中期检查PPT、代码编写、🚢文编写和辅导、🚢文降重、长期答辩答疑辅导、一对一专业代码讲解辅导答辩、模拟答辩演练、和理解代码逻辑思路。 特色服务内容:答辩必过班 (全程一对一技术交流,帮助大家顺利完成答辩,小白必选) 全网粉丝50W+,累计帮助2000+完成优秀毕设 🍅文末获取源码🍅 感兴趣的可以先收藏起来,还有大家在毕设选题,

By Ne0inhk

基于Spring Cloud的Java毕设入门实战:从单体到微服务的平滑迁移指南

最近在帮学弟学妹们看Java毕业设计,发现一个挺普遍的现象:很多同学的项目还是传统的单体架构,一个Spring Boot应用包打天下。答辩时老师一问“为什么不用微服务?”,往往就答不上来了。其实,对于Java方向的毕设来说,引入Spring Cloud实现一个简单的微服务架构,不仅能显著提升项目的技术含量和答辩印象分,更是一次非常好的学习实践。 今天,我就结合自己带项目的经验,聊聊如何从零开始,把一个单体应用平滑迁移成基于Spring Cloud的微服务毕设项目。我们会避开复杂的理论,聚焦于“跑起来”和“用明白”。 1. 为什么毕设需要从单体走向微服务? 你可能觉得,一个毕业设计,业务逻辑简单,用户量几乎为零,用单体应用不是更省事吗?这话没错,但毕设的目的不仅仅是“完成功能”,更是“展示能力”和“学习新知”。 单体应用的局限性在毕设中主要体现在: * 技术栈单一:难以体现你对分布式系统、服务治理等现代后端架构的理解。 * 扩展性差:答辩时如果被问到“如果用户量激增,你的系统如何扩展?”,单体架构很难给出有说服力的答案。 * 耦合度高:所有功能模块在一起,修改一处可能影响

By Ne0inhk