本篇摘要
本文从 RPC 核心概念出发,阐释其'透明远程调用'的本质与工作原理,对比主流框架后聚焦百度开源的 C++ 高性能 RPC 框架 BRPC,详解其安装、Echo 服务示例代码(含客户端/服务端实现),并延伸介绍基于 Etcd 的服务注册发现与信道管理封装,完整呈现分布式通信方案落地过程。
RPC 实现跨进程透明调用。本文介绍百度开源 C++ 高性能框架 BRPC,涵盖安装、Echo 服务示例代码及客户端服务端实现。进一步讲解基于 Etcd 的服务注册发现机制,封装 ServiceChannels 与 ServiceManager 管理信道轮转,提供完整的分布式通信落地方案参考。

本文从 RPC 核心概念出发,阐释其'透明远程调用'的本质与工作原理,对比主流框架后聚焦百度开源的 C++ 高性能 RPC 框架 BRPC,详解其安装、Echo 服务示例代码(含客户端/服务端实现),并延伸介绍基于 Etcd 的服务注册发现与信道管理封装,完整呈现分布式通信方案落地过程。
RPC(远程过程调用)就是让程序调用远程服务器上的功能,像调用本地函数一样简单,不用管网络传输细节。比如你手机 App 点'下单',实际是调用电商服务器的'创建订单'功能,RPC 帮你隐藏了网络请求、数据打包等复杂操作,开发者只需写一句类似 createOrder() 的代码,系统自动完成远程调用并返回结果。
userService.getUser(id))。简单概括下: 可以理解成在 rpc 服务中添加一个或者多个服务,然后把 rpc 服务部署在某个 ip+port 处运行;然后客户就可以通过调用 rpc 某个服务的 rpc 请求调用对象通过构建对应信道进行向对应 rpc 服务发起请求然后收到应答等。
BRPC 是百度开源的 C++ 专用 RPC 框架,让不同服务器上的 C++ 程序像调用本地函数一样快速通信,支撑了百度万亿级请求的核心服务。
简单说:C++ 项目追求性能和省心,选 BRPC;需要多语言支持,用 gRPC。
上面说了对应的 brpc 的优点,因此这里可以看出更适合用 brpc 而不是 grpc 来实现。
安装步骤:
sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev librocksdb-dev libprotoc-dev protobuf-compiler libleveldb-dev
git clone https://github.com/apache/brpc.git
cd brpc/
mkdir build && cd build
cmake -DCMAKE_INSTALL_PREFIX=/usr ..
cmake --build . -j6
sudo make install
首先先说下对应思路:
简单来说分为 rpc 服务端与 rpc 客户端(具体操作如下):
对应代码及详细注释:
1. Protobuf 用于后续设定对应 rpc 服务:
syntax="proto3";
package example;
option cc_generic_services = true;
message EchoRequest {
string message = 1;
}
message EchoResponse {
string message = 1;
}
// 搭建对应的服务,然后为这个服务实例化出模版 echo 基类提供用户填充
service EchoService {
rpc Echo(EchoRequest) returns (EchoResponse);
}
2. Makefile(注意这里链接的所有库及顺序):
all : server client
server: server.cc echo.pb.cc
g++ -g -std=c++17 $^ -o $@ -L/usr/local/lib -lspdlog -lfmt -letcd-cpp-api -lcpprest -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
client: client.cc echo.pb.cc
g++ -g -std=c++17 $^ -o $@ -L/usr/local/lib -lspdlog -lfmt -letcd-cpp-api -lcpprest -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
.PHONY: clean
clean:
rm -r server client
# brpc 库依赖 glags 库,故必须把 gflags 链接在 brpc 库后面,否则报错
# 这里链接的 Protobuf 库的版本必须和 protoc 的时候是一样的否则报错
# -L/usr/local/lib 当前没有对应库再去系统默认的地方链接找库
3. rpc-client:
#include <brpc/channel.h>
#include <thread>
#include "echo.pb.h"
void clientcallback(brpc::Controller *cntl, ::example::EchoResponse *response) {
// 把指针直接给智能指针对象此时直接被智能指针保存地址即可,智能指针就不用在内部 new 了,出了函数作用自动帮忙把客户端构建对象析构释放
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<example::EchoResponse> resp_guard(response);
if (cntl->Failed() == true) {
std::cout << "Rpc 调用失败:" << cntl->ErrorText() << std::endl;
return;
}
std::cout << "收到响应:" << response->message() << std::endl;
}
int main() {
// 客户端调用 rpc 接口通过信道与 rpc 服务端进行通信
brpc::ChannelOptions options;
options.connect_timeout_ms = -1; // 连接等待超时时间,-1 表示一直等待
options.timeout_ms = -1; // rpc 请求等待超时时间,-1 表示一直等待
options.max_retry = 3; // 请求重试次数
options.protocol = "baidu_std"; // 序列化协议,默认使用 baidu_std
brpc::Channel channel;
auto ret = channel.Init("127.0.0.1:8080", &options);
if (ret == -1) {
std::cout << "初始化信道失败!\n";
return -1;
}
// 构建 EchoService_Stub 对象用于向服务端 rpc 发请求
example::EchoService_Stub stub(&channel);
brpc::Controller *cntl = new brpc::Controller();
example::EchoResponse *rsp = new example::EchoResponse();
example::EchoRequest req;
req.set_message("你好 rpc!");
auto closure = google::protobuf::NewCallback(clientcallback, cntl, rsp);
// 异步线程出动走 closure 调用也就是回调函数的处理,主线程收到答复填充完接着往下走
// 主线程这里收到对应答复进行填充好对应的内容如 rsp cntl 等然后如果 closure 不为空就获取对应回调通知然后派异步线程去执行对应 clientcallback 回调函数处理即可
stub.Echo(cntl, &req, rsp, closure); // 向 rpcserver 发送对应 req,然后接收到 response 填充 rsp 及 cntl,之后再去调用 closure 这个可调用对象即调用 clientcallback
std::cout << "异步调用!\n";
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;
}
4. rpc-server:
#include <brpc/server.h>
#include <butil/logging.h>
#include "echo.pb.h"
class EchoService : public example::EchoService {
void Echo(google::protobuf::RpcController *controller,
const ::example::EchoRequest *request,
::example::EchoResponse *response,
::google::protobuf::Closure *done) {
// 创建一个 brpc::ClosureGuard 对象,用于管理 done 的生命周期。当 rpc_guard 对象析构时,会自动调用 done,通知框架 RPC 处理完成
brpc::ClosureGuard rpc_guard(done);
// 这里析构的时候调用对应的 done->run()构建对应答复信息来通知请求方可以进行自己的回调处理了
std::cout << "收到消息:" << request->message() << std::endl;
std::string str = request->message() + "--这是响应!!";
response->set_message(str);
// 调用完对应自定义函数 echo 后由 rpc 发送对应答复集合
}
};
int main() {
// 关闭 brpc 默认输出日志
logging::LoggingSettings logset;
logset.logging_dest = logging::LoggingDestination::LOG_TO_NONE;
logging::InitLogging(logset);
// 创建 rpc 服务器注册对应服务
brpc::Server server;
EchoService echo;
server.AddService(&echo, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);
// 添加选项并启动 rpc 服务
brpc::ServerOptions options;
options.idle_timeout_sec = -1; // 连接空闲超时时间 - 超时后连接被关闭
options.num_threads = 1; // io 线程数量
auto ret = server.Start(8080, &options); // rpc 服务端对应的 echo 服务启动了:也就是收到对应 echo 的 resquest 就去调用对应 Echo 函数完成 response 构建然后自动发送
if (ret == -1) {
std::cout << "启动服务器失败!\n";
return -1;
}
server.RunUntilAskedToQuit(); // 修改等待运行结束
return 0;
}
rpc 调用这里的封装,因为不同的服务调用使用的是不同的 Stub,这个封装起来的意义不大,因此这里封装的是每个服务占用的信道集合和管理起来所有服务。
封装思想(需要封装的两大类分别是 service-channels 与 servicesmanager):
1. ServiceChannels:
2. servicesmanager:
实现代码即对应注释(channel.hpp):
#pragma once
#include <brpc/channel.h>
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
#include "log.hpp"
// 使用指针管理对象都::
// etcd 通知某个服务上线或者下线是通过 host:ip+port 通知的,然后这边收到就要进行对应的管理,故进行 host 与对应信道映射。
// 管理一个服务间所有关于它的实例信道(也就是站在某个服务的所有单例对象的信道管理者角度)
class ServiceChannels {
public:
using Ptr = std::shared_ptr<ServiceChannels>;
using channelptr = std::shared_ptr<brpc::Channel>;
ServiceChannels(const std::string &name) : _service_name(name), _idx(0) {}
void Append(const std::string &host) {
std::unique_lock<std::mutex> lock(_mtx);
auto it = _hosts.find(host);
if (it == _hosts.end()) {
// 构建 host 对应的信道进行初始化
std::shared_ptr<brpc::Channel> pchannel = std::make_shared<brpc::Channel>();
brpc::ChannelOptions options;
options.connect_timeout_ms = -1;
options.timeout_ms = -1;
options.max_retry = 3;
options.protocol = "baidu_std";
auto ok = pchannel->Init(host.c_str(), &options);
if (ok == -1) {
LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);
return;
}
// 完成信道这边的管理
_hosts[host] = pchannel;
_channels.push_back(pchannel);
}
}
void Remove(const std::string &host) {
std::unique_lock<std::mutex> lock(_mtx);
auto it = _hosts.find(host);
if (it == _hosts.end()) {
LOG_WARN("{}-{}节点删除信道时,没有找到信道信息!", _service_name, host);
return;
}
for (auto vit = _channels.begin(); vit != _channels.end(); vit++) {
if (*vit == it->second) {
_channels.erase(vit);
break;
}
}
_hosts.erase(host);
}
channelptr Choose() {
std::unique_lock<std::mutex> lock(_mtx);
if (!_channels.size()) return channelptr();
// 存在进行轮询输出
int32_t index = _idx++ % _channels.size();
return _channels[index];
}
private:
std::mutex _mtx;
std::string _service_name;
std::unordered_map<std::string, channelptr> _hosts; // 主机名(ip+port)与 brpc 信道之间映射关系
std::vector<channelptr> _channels; // 维护所有的 channel
int32_t _idx; // 用于 RR 轮转策略
};
// 所有服务的管理者,负责接收到 etcd 服务通知进行对应注册删除服务(或者为服务注册单例对象也就是信道等)等
class ServiceManager {
public:
using Ptr = std::shared_ptr<ServiceManager>;
ServiceManager() {}
// 先声明,我关注哪些服务的上下线,不关心的就不需要管理了
void Cared(const std::string &service_name) {
std::unique_lock<std::mutex> lock(_mtx);
_care_services.insert(service_name);
}
// etcd 监视的回调函数
void OnlineService(const std::string &service_instance_name, const std::string &host) {
const std::string &service_name = GetService(service_instance_name);
auto service = ServiceChannels::Ptr(); // 空的服务信道集
{
std::unique_lock<std::mutex> lock(_mtx);
auto fit = _care_services.find(service_name);
if (fit == _care_services.end()) {
LOG_DEBUG("{}-{} 服务上线了,但是当前并不关心!", service_name, host);
return;
}
// 说明关心对应服务,如果服务存在就添加对应信道即可,不存在就创建服务再添加对应信道
auto sit = _services.find(service_name);
if (sit == _services.end()) {
service = std::make_shared<ServiceChannels>(service_name);
_services.insert(std::make_pair(service_name, service));
} else {
service = sit->second;
}
}
if (!service) {
LOG_ERROR("新增 {} 服务管理节点失败!", service_name);
return;
}
service->Append(host);
LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);
}
void UnonlineService(const std::string &service_instance_name, const std::string &host) {
const std::string &service_name = GetService(service_instance_name);
auto service = ServiceChannels::Ptr(); // 空的服务信道集
{
std::unique_lock<std::mutex> lock(_mtx);
auto fit = _care_services.find(service_name);
if (fit == _care_services.end()) {
LOG_DEBUG("{}-{} 服务下线了,但是当前并不关心!", service_name, host);
return;
}
// 从服务集合中查找进行删除
auto sit = _services.find(service_name);
if (sit == _services.end()) {
LOG_WARN("删除{}服务节点时,没有找到管理对象", service_name);
return;
} else {
service = sit->second;
}
}
service->Remove(host);
LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);
}
ServiceChannels::channelptr ChooseService(const std::string &servicename) {
std::unique_lock<std::mutex> lock(_mtx);
auto sit = _services.find(servicename);
if (sit == _services.end()) {
LOG_ERROR("当前没有能够提供 {} 服务的节点!", servicename);
return ServiceChannels::channelptr();
}
return sit->second->Choose();
}
private:
const std::string GetService(const std::string &service_instance_name) {
auto pos = service_instance_name.find_last_of('/');
if (pos == std::string::npos) return service_instance_name;
// 一般来说不会找不到,注册进去的都是包含实例
return service_instance_name.substr(0, pos);
}
std::mutex _mtx;
std::unordered_map<std::string, ServiceChannels::Ptr> _services; // 添加进来的服务都是与业务有关的,也就是关心的
std::unordered_set<std::string> _care_services;
};
上面封装好了对应的 channel.hpp,下面就使用它结合之前封装的 etcd.hpp 以及复用下 brpc 的简单 echo 服务组合起来使用测试下(说白了其实就是 brpc 的 echo 服务的 server 与 client 端套用下然后结合封装 etcd 代码以及改变下获取 channel 方式而已)。
说下大致思路:
register 与 discovery 两个程序 也就是对应的添加 rpc 服务启动 rpc-server+etcd 注册 echo 服务与借助 servicesmanager 获取对应服务信道 + 进行 rpc 服务请求发送。逻辑过程图:
RPC 通过隐藏网络细节实现跨进程调用,BRPC 凭借高性能与 C++ 友好性成为优选;本文从理论到实践,结合 ETCD 服务治理,为分布式系统通信提供了可落地的参考路径。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online