一、什么是 RPC
简单理解
RPC(远程过程调用)让程序调用远程服务器上的功能,像调用本地函数一样简单,无需关心网络传输细节。例如手机 App 点击'下单',实际是调用电商服务器的创建订单功能,RPC 隐藏了网络请求、数据打包等复杂操作。
RPC 远程过程调用原理通过隐藏网络细节实现跨进程通信,BRPC 作为百度开源的高性能 C++ 框架在此场景下优势明显。文章涵盖 BRPC 安装配置、Echo 服务代码示例、信道封装管理及基于 ETCD 的服务注册发现监控,提供完整的分布式通信落地方案。

RPC(远程过程调用)让程序调用远程服务器上的功能,像调用本地函数一样简单,无需关心网络传输细节。例如手机 App 点击'下单',实际是调用电商服务器的创建订单功能,RPC 隐藏了网络请求、数据打包等复杂操作。
BRPC 是百度开源的 C++ 专用 RPC 框架,让不同服务器上的 C++ 程序像调用本地函数一样快速通信,支撑了百度万亿级请求的核心服务。
安装步骤:
sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev librocksdb-dev libprotoc-dev protobuf-compiler libleveldb-dev
git clone https://github.com/apache/brpc.git
cd brpc/
mkdir build && cd build
cmake -DCMAKE_INSTALL_PREFIX=/usr ..
cmake --build . -j6
sudo make install
思路:

具体操作:



1. Protobuf 用于后续设定对应 rpc 服务:
syntax="proto3";
package example;
option cc_generic_services = true;
message EchoRequest {
string message = 1;
}
message EchoResponse {
string message = 1;
}
service EchoService {
rpc Echo(EchoRequest) returns (EchoResponse);
}
2. Makefile:
all :server client
server:server.cc echo.pb.cc
g++ -g -std=c++17 $^ -o $@ -L/usr/local/lib -lspdlog -lfmt -letcd-cpp-api -lcpprest -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
client:client.cc echo.pb.cc
g++ -g -std=c++17 $^ -o $@ -L/usr/local/lib -lspdlog -lfmt -letcd-cpp-api -lcpprest -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
.PHONY:clean
clean:
rm -r server client
3. rpc-client:
#include <brpc/channel.h>
#include <thread>
#include "echo.pb.h"
void clientcallback(brpc::Controller *cntl, ::example::EchoResponse *response) {
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<example::EchoResponse> resp_guard(response);
if (cntl->Failed() == true) {
std::cout << "Rpc 调用失败:" << cntl->ErrorText() << std::endl;
return;
}
std::cout << "收到响应:" << response->message() << std::endl;
}
int main() {
brpc::ChannelOptions options;
options.connect_timeout_ms = -1;
options.timeout_ms = -1;
options.max_retry = 3;
options.protocol = "baidu_std";
brpc::Channel channel;
auto ret = channel.Init("127.0.0.1:8080", &options);
if (ret == -1) {
std::cout << "初始化信道失败!\n";
return -1;
}
example::EchoService_Stub stub(&channel);
brpc::Controller *cntl = new brpc::Controller();
example::EchoResponse *rsp = new example::EchoResponse();
example::EchoRequest req;
req.set_message("你好 rpc!");
auto closure = google::protobuf::NewCallback(clientcallback, cntl, rsp);
stub.Echo(cntl, &req, rsp, closure);
std::cout << "异步调用!\n";
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;
}
4. rpc-server:
#include <brpc/server.h>
#include <butil/logging.h>
#include "echo.pb.h"
class EchoService : public example::EchoService {
void Echo(google::protobuf::RpcController *controller,
const ::example::EchoRequest *request,
::example::EchoResponse *response,
::google::protobuf::Closure *done) {
brpc::ClosureGuard rpc_guard(done);
std::cout << "收到消息:" << request->message() << std::endl;
std::string str = request->message() + "--这是响应!!";
response->set_message(str);
}
};
int main() {
logging::LoggingSettings logset;
logset.logging_dest = logging::LoggingDestination::LOG_TO_NONE;
logging::InitLogging(logset);
brpc::Server server;
EchoService echo;
server.AddService(&echo, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);
brpc::ServerOptions options;
options.idle_timeout_sec = -1;
options.num_threads = 1;
auto ret = server.Start(8080, &options);
if (ret == -1) {
std::cout << "启动服务器失败!\n";
return -1;
}
server.RunUntilAskedToQuit();
return 0;
}
rpc 调用这里的封装,因为不同的服务调用使用的是不同的 Stub,因此封装的是每个服务占用的信道集合和管理起来所有服务。
封装思想:

实现代码(channel.hpp):
#pragma once
#include <brpc/channel.h>
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
#include "log.hpp"
class ServiceChannels {
public:
using Ptr = std::shared_ptr<ServiceChannels>;
using channelptr = std::shared_ptr<brpc::Channel>;
ServiceChannels(const std::string &name) : _service_name(name), _idx(0) {}
void Append(const std::string &host) {
std::unique_lock<std::mutex> lock(_mtx);
auto it = _hosts.find(host);
if (it == _hosts.end()) {
std::shared_ptr<brpc::Channel> pchannel = std::make_shared<brpc::Channel>();
brpc::ChannelOptions options;
options.connect_timeout_ms = -1;
options.timeout_ms = -1;
options.max_retry = 3;
options.protocol = "baidu_std";
auto ok = pchannel->Init(host.c_str(), &options);
if (ok == -1) {
LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);
return;
}
_hosts[host] = pchannel;
_channels.push_back(pchannel);
}
}
void Remove(const std::string &host) {
std::unique_lock<std::mutex> lock(_mtx);
auto it = _hosts.find(host);
if (it == _hosts.end()) {
LOG_WARN("{}-{}节点删除信道时,没有找到信道信息!", _service_name, host);
return;
}
for (auto vit = _channels.begin(); vit != _channels.end(); vit++) {
if (*vit == it->second) {
_channels.erase(vit);
break;
}
}
_hosts.erase(host);
}
channelptr Choose() {
std::unique_lock<std::mutex> lock(_mtx);
if (!_channels.size()) return channelptr();
int32_t index = _idx++ % _channels.size();
return _channels[index];
}
private:
std::mutex _mtx;
std::string _service_name;
std::unordered_map<std::string, channelptr> _hosts;
std::vector<channelptr> _channels;
int32_t _idx;
};
class ServiceManager {
public:
using Ptr = std::shared_ptr<ServiceManager>;
ServiceManager() {}
void Cared(const std::string &service_name) {
std::unique_lock<std::mutex> lock(_mtx);
_care_services.insert(service_name);
}
void OnlineService(const std::string &service_instance_name, const std::string &host) {
const std::string &service_name = GetService(service_instance_name);
auto service = ServiceChannels::Ptr();
{
std::unique_lock<std::mutex> lock(_mtx);
auto fit = _care_services.find(service_name);
if (fit == _care_services.end()) {
LOG_DEBUG("{}-{} 服务上线了,但是当前并不关心!", service_name, host);
return;
}
auto sit = _services.find(service_name);
if (sit == _services.end()) {
service = std::make_shared<ServiceChannels>(service_name);
_services.insert(std::make_pair(service_name, service));
} else {
service = sit->second;
}
}
if (!service) {
LOG_ERROR("新增 {} 服务管理节点失败!", service_name);
return;
}
service->Append(host);
LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);
}
void UnonlineService(const std::string &service_instance_name, const std::string &host) {
const std::string &service_name = GetService(service_instance_name);
auto service = ServiceChannels::Ptr();
{
std::unique_lock<std::mutex> lock(_mtx);
auto fit = _care_services.find(service_name);
if (fit == _care_services.end()) {
LOG_DEBUG("{}-{} 服务下线了,但是当前并不关心!", service_name, host);
return;
}
auto sit = _services.find(service_name);
if (sit == _services.end()) {
LOG_WARN("删除{}服务节点时,没有找到管理对象", service_name);
return;
} else {
service = sit->second;
}
}
service->Remove(host);
LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);
}
ServiceChannels::channelptr ChooseService(const std::string &servicename) {
std::unique_lock<std::mutex> lock(_mtx);
auto sit = _services.find(servicename);
if (sit == _services.end()) {
LOG_ERROR("当前没有能够提供 {} 服务的节点!", servicename);
return ServiceChannels::channelptr();
}
return sit->second->Choose();
}
private:
const std::string GetService(const std::string &service_instance_name) {
auto pos = service_instance_name.find_last_of('/');
if (pos == std::string::npos) return service_instance_name;
return service_instance_name.substr(0, pos);
}
std::mutex _mtx;
std::unordered_map<std::string, ServiceChannels::Ptr> _services;
std::unordered_set<std::string> _care_services;
};
上面封装好了对应的 channel.hpp,下面就使用它结合之前封装的 etcd.hpp 以及复用下 brpc 的简单 echo 服务组合起来使用测试下。
大致思路:
register 与 discovery 两个程序,即对应的添加 rpc 服务启动 rpc-server+etcd 注册 echo 服务与借助 servicesmanager 获取对应服务信道 + 进行 rpc 服务请求发送。





微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online