跳到主要内容RPC 原理与 BRPC 实战:基于 C++ 的分布式通信实现 | 极客日志C++
RPC 原理与 BRPC 实战:基于 C++ 的分布式通信实现
RPC 实现跨进程透明调用。介绍百度开源 C++ 高性能框架 BRPC,涵盖安装、Echo 服务示例代码及客户端服务端实现。进一步讲解基于 Etcd 的服务注册发现机制,封装 ServiceChannels 与 ServiceManager 管理信道轮转,提供完整的分布式通信落地方案参考。
暖阳29 浏览 本篇摘要
本文从 RPC 核心概念出发,阐释其'透明远程调用'的本质与工作原理,对比主流框架后聚焦百度开源的 C++ 高性能 RPC 框架 BRPC,详解其安装、Echo 服务示例代码(含客户端/服务端实现),并延伸介绍基于 Etcd 的服务注册发现与信道管理封装,完整呈现分布式通信方案落地过程。
一、什么是 RPC
简单理解
RPC(远程过程调用)就是让程序调用远程服务器上的功能,像调用本地函数一样简单,不用管网络传输细节。比如你手机 App 点'下单',实际是调用电商服务器的'创建订单'功能,RPC 帮你隐藏了网络请求、数据打包等复杂操作,开发者只需写一句类似 createOrder() 的代码,系统自动完成远程调用并返回结果。
核心特点
- 透明性:调用远程服务就像调用本地函数,开发者无需关心网络通信细节。
- 跨进程/跨机器通信:RPC 通常用于不同进程(甚至不同主机)之间的功能调用。
- 支持多种协议和传输方式:比如 HTTP、TCP、自定义协议等,常用高性能协议如 gRPC(基于 HTTP/2 和 Protocol Buffers)。
- 常与序列化技术配合:调用参数和返回结果需要序列化成字节流在网络中传输,常用格式如 JSON、Protobuf、Thrift 等。
RPC 工作原理
- 客户端调用:客户端代码中调用一个看似本地的方法(如
userService.getUser(id))。
- 代理/Stub:客户端通过一个代理对象(Stub)把调用转换成网络请求,包括方法名、参数等。
- 序列化:将方法名、参数等信息序列化成二进制或文本格式(如 JSON/Protobuf)。
- 网络传输:通过底层网络协议(如 TCP/HTTP)把数据发送到服务端。
- 服务端处理:服务端接收请求,反序列化得到方法名和参数,找到对应的函数并执行。
- 返回结果:服务端将执行结果序列化后通过网络返回给客户端,客户端再反序列化得到最终结果。
常见 RPC 框架
- gRPC:Google 开源的高性能 RPC 框架,基于 HTTP/2 和 Protocol Buffers,支持多语言。
- Thrift:Facebook 开源的跨语言 RPC 框架,支持多种传输协议和数据格式。
- Dubbo:阿里巴巴开源的 Java RPC 框架,广泛用于微服务架构。
- brpc:百度开源的高性能 C++ RPC 框架,支持多种协议和多线程模型。
- JSON-RPC / XML-RPC:基于 JSON 或 XML 的轻量级 RPC 协议,常用于 Web 服务。
典型使用场景
- 微服务架构:服务之间通过 RPC 互相调用,实现功能解耦和分布式部署。
- 分布式系统:比如分布式存储、计算任务调度、数据库中间层等。
- 前后端分离/服务化:后端将核心功能封装成 RPC 服务,供前端、App 或其他服务调用。
简单概括下:
可以理解成在 rpc 服务中添加一个或者多个服务,然后把 rpc 服务部署在某个 ip+port 处运行;然后客户就可以通过调用 rpc 某个服务的 rpc 请求调用对象通过构建对应信道进行向对应 rpc 服务发起请求然后收到应答等。
二、BRPC 介绍
是什么?
BRPC 是百度开源的 C++ 专用 RPC 框架,让不同服务器上的 C++ 程序像调用本地函数一样快速通信,支撑了百度万亿级请求的核心服务。
比 gRPC 强在哪?
- 更快:延迟更低、每秒能处理的请求更多(尤其适合 C++ 高并发场景)。
更简单:专为 C++ 设计,不用学复杂配置,接口更直观。更灵活:支持多种协议,调试和功能扩展更方便(比如动态调参数)。更省事:依赖少,部署简单,和百度工具链直接兼容。简单说:C++ 项目追求性能和省心,选 BRPC;需要多语言支持,用 gRPC。
三、基于 BRPC 实现简单的服务调用
上面说了对应的 brpc 的优点,因此这里可以看出更适合用 brpc 而不是 grpc 来实现。
BRPC 安装教程
sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev librocksdb-dev libprotoc-dev protobuf-compiler libleveldb-dev
git clone https://github.com/apache/brpc.git
cd brpc/
mkdir build && cd build
cmake -DCMAKE_INSTALL_PREFIX=/usr ..
cmake --build . -j6
sudo make install
简单实现客户端向 brpc 服务端口请求服务完成应答过程(以 echo 回显为例)
- rpc 服务端:进行对应 echo 服务注册(通过 Protobuf 协议为服务端与客户端生成对应类,然后服务端填充对应的 echo 功能完成添加进入 rpc-server 中),最后启动对应 server,也就是把 rpc 服务部署在对应 ip+port 处。
- rpc 客户端:通过约定好的 Protobuf 协议生成的对应调用相关 rpc 服务的 rpc 请求接口进行调用(拿对应 rpc 服务信道进行初始化);调用 rpc 服务请求接口发送对应请求等。
- rpc 服务端:收到对应请求就去拿着请求调用用户注册进来的服务函数进行操作,最后填充好答复以及其他相关信息发送回去。
- rpc 客户端:收到对应答复后进行解析拿到结果以及其他信息,然后调用回调函数进行操作或者通过答复结果进行其他处理操作。
简单来说分为 rpc 服务端与 rpc 客户端(具体操作如下):
- 1·rpc 服务端:关闭默认日志 + 构建服务对象向 rpc-server 中添加 + 启动 rpc 服务器。
- 2·rpc 客户端:初始化信道 + 构建并发送 rpc 请求对象 + 调用回调恢复(可选)。
测试效果
- rpc-server 注册完对应的 echo 服务后启动在这等待。
- 服务端接收到客户端的请求进行相关服务处理(echo 服务处理函数)。
- rpc-client 通过特定接口进行 rpc 请求,之后拿到对应响应答复后异步调用回调进行操作。
代码汇总
1. Protobuf 用于后续设定对应 rpc 服务:
syntax="proto3";
package example;
option cc_generic_services = true;
message EchoRequest {
string message = 1;
}
message EchoResponse {
string message = 1;
}
// 搭建对应的服务,然后为这个服务实例化出模版 echo 基类提供用户填充
service EchoService {
rpc Echo(EchoRequest) returns (EchoResponse);
}
2. Makefile(注意这里链接的所有库及顺序):
all : server client
server: server.cc echo.pb.cc
g++ -g -std=c++17 $^ -o $@ -L/usr/local/lib -lspdlog -lfmt -letcd-cpp-api -lcpprest -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
client: client.cc echo.pb.cc
g++ -g -std=c++17 $^ -o $@ -L/usr/local/lib -lspdlog -lfmt -letcd-cpp-api -lcpprest -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
.PHONY: clean
clean:
rm -r server client
#include <brpc/channel.h>
#include <thread>
#include "echo.pb.h"
void clientcallback(brpc::Controller *cntl, ::example::EchoResponse *response) {
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<example::EchoResponse> resp_guard(response);
if (cntl->Failed() == true) {
std::cout << "Rpc 调用失败:" << cntl->ErrorText() << std::endl;
return;
}
std::cout << "收到响应:" << response->message() << std::endl;
}
int main() {
brpc::ChannelOptions options;
options.connect_timeout_ms = -1;
options.timeout_ms = -1;
options.max_retry = 3;
options.protocol = "baidu_std";
brpc::Channel channel;
auto ret = channel.Init("127.0.0.1:8080", &options);
if (ret == -1) {
std::cout << "初始化信道失败!\n";
return -1;
}
example::EchoService_Stub stub(&channel);
brpc::Controller *cntl = new brpc::Controller();
example::EchoResponse *rsp = new example::EchoResponse();
example::EchoRequest req;
req.set_message("你好 rpc!");
auto closure = google::protobuf::NewCallback(clientcallback, cntl, rsp);
stub.Echo(cntl, &req, rsp, closure);
std::cout << "异步调用!\n";
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;
}
#include <brpc/server.h>
#include <butil/logging.h>
#include "echo.pb.h"
class EchoService : public example::EchoService {
void Echo(google::protobuf::RpcController *controller,
const ::example::EchoRequest *request,
::example::EchoResponse *response,
::google::protobuf::Closure *done) {
brpc::ClosureGuard rpc_guard(done);
std::cout << "收到消息:" << request->message() << std::endl;
std::string str = request->message() + "--这是响应!!";
response->set_message(str);
}
};
int main() {
logging::LoggingSettings logset;
logset.logging_dest = logging::LoggingDestination::LOG_TO_NONE;
logging::InitLogging(logset);
brpc::Server server;
EchoService echo;
server.AddService(&echo, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);
brpc::ServerOptions options;
options.idle_timeout_sec = -1;
options.num_threads = 1;
auto ret = server.Start(8080, &options);
if (ret == -1) {
std::cout << "启动服务器失败!\n";
return -1;
}
server.RunUntilAskedToQuit();
return 0;
}
四、封装每个服务的 channels 及所有服务管理者
rpc 调用这里的封装,因为不同的服务调用使用的是不同的 Stub,这个封装起来的意义不大,因此这里封装的是每个服务占用的信道集合和管理起来所有服务。
封装思想(需要封装的两大类分别是 service-channels 与 servicesmanager):
- 首先要知道注册中心注册(etcd)注册对应服务是以服务 + 单例名称 +ip-port 来注册的,也就是说明一个指定服务可以有多个单例来运行(即多个 rpc 服务,可以是在不同 ip+port 处,因此就有了多个信道概念),得出结论是一个服务对应多个服务单例。
- ServiceChannels 就是管理一个服务间所有关于它的实例信道(也就是站在某个服务的所有单例对象的信道管理者角度)。
- 因此这个类就需要对应信道的增删查等功能,完成对应主机名与服务信道的映射,以及采取 RR 轮转方式使用对应服务的信道。
- 这个其实就是把上面的 service-channels 管理起来,暴露给外面使用的,并集合对应 etcd 的 watch 监控来用,当用户线设定好要关系你的服务后,把上线和下线逻辑函数交给 watch 进行回调,也就是当有服务上下线后(某个服务的单例)判断是否为当前用户关心来完成对应增加删除对应服务信道而已。
- 当用户进行选择服务的时候,采取 RR 轮转选取对应服务的某个信道。
#pragma once
#include <brpc/channel.h>
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
#include "log.hpp"
class ServiceChannels {
public:
using Ptr = std::shared_ptr<ServiceChannels>;
using channelptr = std::shared_ptr<brpc::Channel>;
ServiceChannels(const std::string &name) : _service_name(name), _idx(0) {}
void Append(const std::string &host) {
std::unique_lock<std::mutex> lock(_mtx);
auto it = _hosts.find(host);
if (it == _hosts.end()) {
std::shared_ptr<brpc::Channel> pchannel = std::make_shared<brpc::Channel>();
brpc::ChannelOptions options;
options.connect_timeout_ms = -1;
options.timeout_ms = -1;
options.max_retry = 3;
options.protocol = "baidu_std";
auto ok = pchannel->Init(host.c_str(), &options);
if (ok == -1) {
LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);
return;
}
_hosts[host] = pchannel;
_channels.push_back(pchannel);
}
}
void Remove(const std::string &host) {
std::unique_lock<std::mutex> lock(_mtx);
auto it = _hosts.find(host);
if (it == _hosts.end()) {
LOG_WARN("{}-{}节点删除信道时,没有找到信道信息!", _service_name, host);
return;
}
for (auto vit = _channels.begin(); vit != _channels.end(); vit++) {
if (*vit == it->second) {
_channels.erase(vit);
break;
}
}
_hosts.erase(host);
}
channelptr Choose() {
std::unique_lock<std::mutex> lock(_mtx);
if (!_channels.size()) return channelptr();
int32_t index = _idx++ % _channels.size();
return _channels[index];
}
private:
std::mutex _mtx;
std::string _service_name;
std::unordered_map<std::string, channelptr> _hosts;
std::vector<channelptr> _channels;
int32_t _idx;
};
class ServiceManager {
public:
using Ptr = std::shared_ptr<ServiceManager>;
ServiceManager() {}
void Cared(const std::string &service_name) {
std::unique_lock<std::mutex> lock(_mtx);
_care_services.insert(service_name);
}
void OnlineService(const std::string &service_instance_name, const std::string &host) {
const std::string &service_name = GetService(service_instance_name);
auto service = ServiceChannels::Ptr();
{
std::unique_lock<std::mutex> lock(_mtx);
auto fit = _care_services.find(service_name);
if (fit == _care_services.end()) {
LOG_DEBUG("{}-{} 服务上线了,但是当前并不关心!", service_name, host);
return;
}
auto sit = _services.find(service_name);
if (sit == _services.end()) {
service = std::make_shared<ServiceChannels>(service_name);
_services.insert(std::make_pair(service_name, service));
} else {
service = sit->second;
}
}
if (!service) {
LOG_ERROR("新增 {} 服务管理节点失败!", service_name);
return;
}
service->Append(host);
LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);
}
void UnonlineService(const std::string &service_instance_name, const std::string &host) {
const std::string &service_name = GetService(service_instance_name);
auto service = ServiceChannels::Ptr();
{
std::unique_lock<std::mutex> lock(_mtx);
auto fit = _care_services.find(service_name);
if (fit == _care_services.end()) {
LOG_DEBUG("{}-{} 服务下线了,但是当前并不关心!", service_name, host);
return;
}
auto sit = _services.find(service_name);
if (sit == _services.end()) {
LOG_WARN("删除{}服务节点时,没有找到管理对象", service_name);
return;
} else {
service = sit->second;
}
}
service->Remove(host);
LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);
}
ServiceChannels::channelptr ChooseService(const std::string &servicename) {
std::unique_lock<std::mutex> lock(_mtx);
auto sit = _services.find(servicename);
if (sit == _services.end()) {
LOG_ERROR("当前没有能够提供 {} 服务的节点!", servicename);
return ServiceChannels::channelptr();
}
return sit->second->Choose();
}
private:
const std::string GetService(const std::string &service_instance_name) {
auto pos = service_instance_name.find_last_of('/');
if (pos == std::string::npos) return service_instance_name;
return service_instance_name.substr(0, pos);
}
std::mutex _mtx;
std::unordered_map<std::string, ServiceChannels::Ptr> _services;
std::unordered_set<std::string> _care_services;
};
五、基于 Etcd 实现服务上下线监控来完成 BRPC 服务调用
上面封装好了对应的 channel.hpp,下面就使用它结合之前封装的 etcd.hpp 以及复用下 brpc 的简单 echo 服务组合起来使用测试下(说白了其实就是 brpc 的 echo 服务的 server 与 client 端套用下然后结合封装 etcd 代码以及改变下获取 channel 方式而已)。
- 实现
register 与 discovery 两个程序 也就是对应的添加 rpc 服务启动 rpc-server+etcd 注册 echo 服务与借助 servicesmanager 获取对应服务信道 + 进行 rpc 服务请求发送。
测试效果
- 一开始启动对应的 rpc 客户端,发现 etcd 中没有关心的服务即对应实例,只能看到之前在 etcd 服务器中注册的其他服务,但是是不相关的故不进行调用。
- 此时启动对应 rpc-server 也就是给 rpc 服务器添加对应服务并部署,给 etcd 注册进去对应服务后开始运行等待。
- 此时服务端收到对应的 rpc 客户端发来的请求,然后进行调用 echo 服务进行构建答复发送回去。
- 客户端收到服务端发送来的答复进行解析处理。
六、本篇小结
RPC 通过隐藏网络细节实现跨进程调用,BRPC 凭借高性能与 C++ 友好性成为优选;本文从理论到实践,结合 ETCD 服务治理,为分布式系统通信提供了可落地的参考路径。
相关免费在线工具
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
- JSON美化和格式化
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online