Linux C/C++ 学习日记(70):grpc(三):基于grpc编写同步的server、client

Linux C/C++ 学习日记(70):grpc(三):基于grpc编写同步的server、client

注:该文用于个人学习记录和知识交流,如有不足,欢迎指点。

一、proto文件

syntax = "proto3"; package example; // 定义四种 RPC 模式的服务 service ExampleService { // 1. 一元 RPC (Unary):客户端发一个请求,服务端回一个响应 rpc UnaryCall (Request) returns (Response); // 2. 服务端流式 RPC (Server Streaming):客户端发一个请求,服务端回多个响应 rpc ServerStream (Request) returns (stream Response); // 3. 客户端流式 RPC (Client Streaming):客户端发多个请求,服务端回一个响应 rpc ClientStream (stream Request) returns (Response); // 4. 双向流式 RPC (Bidirectional Streaming):双方互相发多个消息 rpc BidiStream (stream Request) returns (stream Response); } // 请求消息 message Request { string data = 1; } // 响应消息 message Response { string data = 1; }

二、server同步

/* * 同步服务端 */ #include <grpcpp/grpcpp.h> #include "./build/example.grpc.pb.h" using grpc::Server; using grpc::ServerBuilder; using grpc::ServerContext; using grpc::ServerReader; using grpc::ServerWriter; using grpc::ServerReaderWriter; using grpc::Status; using example::ExampleService; using example::Request; using example::Response; // 服务实现类 class ExampleServiceImpl final : public ExampleService::Service { // 1. 一元 RPC Status UnaryCall(ServerContext* context, const Request* req, Response* res) override { res->set_data("Server: " + req->data()); return Status::OK; } // 2. 服务端流式 RPC Status ServerStream(ServerContext* context, const Request* req, ServerWriter<Response>* writer) override { for (int i = 0; i < 3; ++i) { Response res; res.set_data("Server Stream " + std::to_string(i) + ": " + req->data()); writer->Write(res); // 每次循环立即发送 1 条数据 } return Status::OK; // 发送结束标记,告诉客户端“发完了” } // 3. 客户端流式 RPC Status ClientStream(ServerContext* context, ServerReader<Request>* reader, Response* res) override { std::string combined; Request req; while (reader->Read(&req)) { // Read()阻塞等待响应,收到数据时返回 true,收到结束标记时返回 false combined += req.data() + " "; } res->set_data("Server Combined: " + combined); return Status::OK; } // 4. 双向流式 RPC Status BidiStream(ServerContext* context, ServerReaderWriter<Response, Request>* stream) override { Request req; while (stream->Read(&req)) { Response res; res.set_data("Server Echo: " + req.data()); stream->Write(res); } return Status::OK; } }; void RunServer() { std::string addr("0.0.0.0:50051"); ExampleServiceImpl service; ServerBuilder builder; builder.AddListeningPort(addr, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr<Server> server(builder.BuildAndStart()); server->Wait(); } int main() { RunServer(); return 0; }

 三、client同步

/* * 同步客户端 */ #include <grpcpp/grpcpp.h> #include "./build/example.grpc.pb.h" #include <vector> using grpc::Channel; using grpc::ClientContext; using grpc::ClientReader; using grpc::ClientWriter; using grpc::ClientReaderWriter; using grpc::Status; using example::ExampleService; using example::Request; using example::Response; class ExampleClient { public: ExampleClient(std::shared_ptr<Channel> channel) : stub_(ExampleService::NewStub(channel)) {} // 1. 一元 RPC void CallUnary(const std::string& msg) { Request req; req.set_data(msg); Response res; ClientContext ctx; Status status = stub_->UnaryCall(&ctx, req, &res); if (status.ok()) std::cout << "Unary: " << res.data() << std::endl; } // 2. 服务端流式 RPC void CallServerStream(const std::string& msg) { Request req; req.set_data(msg); ClientContext ctx; std::unique_ptr<ClientReader<Response>> reader = stub_->ServerStream(&ctx, req); Response res; while (reader->Read(&res)) { // Read()阻塞等待响应,收到数据时返回 true,收到结束标记时返回 false std::cout << "Server Stream: " << res.data() << std::endl; } } // 3. 客户端流式 RPC void CallClientStream(const std::vector<std::string>& msgs) { ClientContext ctx; Response res; std::unique_ptr<ClientWriter<Request>> writer = stub_->ClientStream(&ctx, &res); for (const auto& msg : msgs) { Request req; req.set_data(msg); writer->Write(req); // 发送数据给服务端 } writer->WritesDone(); // 发送结束标记,告诉服务端“发完了” Status status = writer->Finish(); // 阻塞等待服务端的响应 if (status.ok()) std::cout << "Client Stream: " << res.data() << std::endl; } // 4. 双向流式 RPC void CallBidiStream(const std::vector<std::string>& msgs) { ClientContext ctx; std::unique_ptr<ClientReaderWriter<Request, Response>> stream = stub_->BidiStream(&ctx); // 写请求 for (const auto& msg : msgs) { Request req; req.set_data(msg); stream->Write(req); } stream->WritesDone(); // 读响应 Response res; while (stream->Read(&res)) { std::cout << "Bidi Stream: " << res.data() << std::endl; } } private: std::unique_ptr<ExampleService::Stub> stub_; }; int main() { ExampleClient client(grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials())); client.CallUnary("Hello"); client.CallServerStream("Hi"); client.CallClientStream({"A", "B", "C"}); client.CallBidiStream({"X", "Y", "Z"}); return 0; }

四、消息的发送和接收的时机

同步 API 会阻塞当前线程,直到操作完成

1. 一元 RPC (Unary)

客户端发送时机

调用 stub_->UnaryCall(&ctx, req, &res) 时:

  • 函数内部会将 Request 序列化为二进制,立即发送给服务端
  • 然后阻塞,直到收到服务端的 Response 或出错。
服务端发送时机

UnaryCall 函数中执行 return Status::OK; 时:

  • 之前设置的 res->set_data(...) 会被序列化,Status::OK 一起发送给客户端

2. 服务端流式 RPC (Server Streaming)

客户端发送时机

调用 stub_->ServerStream(&ctx, req) 时:

  • Request立即发送给服务端
  • 函数返回 ClientReader,后续通过 reader->Read(&res) 阻塞接收服务端的流式响应。
服务端发送时机

每次调用 writer->Write(res) 时:

  • 当前的 Response立即发送给客户端(不会等循环结束)。
  • Write 会阻塞直到数据写入传输缓冲区。

3. 客户端流式 RPC (Client Streaming)

客户端发送时机
  • 每次调用 writer->Write(req) 时:当前的 Request立即发送给服务端
  • 调用 writer->WritesDone() 时:向服务端发送「客户端已写完所有请求」的标记(不发数据,只发控制帧)。
  • 调用 writer->Finish() 时:阻塞等待服务端的最终 Response
服务端发送时机

ClientStream 函数中执行 return Status::OK; 时:

  • 之前设置的 res->set_data(...)Status::OK 一起发送给客户端

4. 双向流式 RPC (Bidirectional Streaming)

客户端发送时机
  • 每次调用 stream->Write(req) 时:当前的 Request立即发送给服务端
  • 调用 stream->WritesDone() 时:发送「客户端已写完」的标记。
服务端发送时机

每次调用 stream->Write(res) 时:当前的 Response立即发送给客户端

总结

模式客户端发送数据时机服务端发送数据时机
一元调用 UnaryCallreturn Status::OK
服务端流调用 ServerStream 时(发起请求,无后续主动发送,仅通过 reader->Read() 接收)每次 writer->Write 时;return Status::OK 时发送结束标记
客户端流每次 writer->Write 时;writer->WritesDone() 发送结束标记return Status::OK
双向流每次 stream->Write 时;stream->WritesDone() 发送结束标记每次 stream->Write 时;return Status::OK 时发送结束标记
调用模式客户端接收数据时机服务端接收数据时机
一元 RPC(Unary)调用 stub->UnaryCall(&ctx, req, &res) 阻塞返回时,从 res 中获取服务端响应实现 ExampleService::UnaryCall 方法时,直接从入参 req 中获取客户端请求数据
服务端流(ServerStream)调用 stub->ServerStream(&ctx, req, &reader) 后,循环调用 reader->Read(&res) 接收服务端推送的每一条数据;Read() 返回 false 表示流结束实现 ExampleService::ServerStream 方法时,直接从入参 req 中获取客户端请求数据(仅 1 次)
客户端流(ClientStream)调用 writer->Finish(&res) 阻塞返回时,从 res 中获取服务端最终响应实现 ExampleService::ClientStream 方法时,循环调用 reader->Read(&req) 接收客户端推送的每一条数据;Read() 返回 false 表示客户端发送完毕
双向流(BidiStream)调用 stub->BidiStream(&ctx, &stream) 后,循环调用 stream->Read(&res) 接收服务端推送的每一条数据;Read() 返回 false 表示服务端流结束实现 ExampleService::BidiStream 方法时,循环调用 stream->Read(&req) 接收客户端推送的每一条数据;Read() 返回 false 表示客户端流结束

Read more

OpenClaw 实战:5 分钟用 AI Agent 自动生成规范测试用例并写入 Excel

OpenClaw 实战:5 分钟用 AI Agent 自动生成规范测试用例并写入 Excel

OpenClaw 实战:5 分钟用 AI Agent 自动生成规范测试用例并写入 Excel 一、核心前提 OpenClaw 是轻量级 Agent 框架,核心聚焦: Skill 注册 → 工具选择 → 任务执行 * 没有 Dify 的可视化界面 * 没有知识库、没有复杂工作流 * 代码极简洁、上手极快 * 适合:测试开发 / 有编程能力的测试工程师 一句话定位: OpenClaw = 极简、轻量、只专注做工具调用的小 Agent 引擎 二、环境准备 1. 安装 OpenClaw 及依赖 # 安装 OpenClaw 核心框架 pip install openclaw # Excel 操作

By Ne0inhk
【AI】大语言模型 (LLM) 产品的开发流程参考

【AI】大语言模型 (LLM) 产品的开发流程参考

🔥小龙报:个人主页 🎬作者简介:C++研发,嵌入式,机器人等方向学习者 ❄️个人专栏:《AI》 ✨ 永远相信美好的事情即将发生 文章目录 * 前言 * 一、个人开发者的大语言模型 (LLM) 产品的开发流程参考 * 1.1 准备工作 * 1.2 构建知识库索引 * 1.3 定制大模型 * 1.4 用户交互界面开发 * 1.5 测试与部署上线 * 1.6 监控结果 * 二、组织/商用级别的大语言模型 (LLM) 产品开发流程参考 * 2.1 准备工作 * 2.2 定制大模型 * 2.3 模型部署与集成 * 2.4

By Ne0inhk
AI Agent 架构:基础组成模块深度解析

AI Agent 架构:基础组成模块深度解析

AI Agent 架构:基础组成模块深度解析 📝 本章学习目标:本章是入门认知部分,帮助零基础读者建立对AI Agent的初步认知。通过本章学习,你将全面掌握"AI Agent 架构:基础组成模块深度解析"这一核心主题。 一、引言:为什么这个话题如此重要 在AI Agent快速发展的今天,AI Agent 架构:基础组成模块深度解析已经成为每个开发者和研究者必须了解的核心知识。无论你是技术背景还是非技术背景,理解这一概念都将帮助你更好地把握AI时代的机遇。 1.1 背景与意义 💡 核心认知:AI Agent正在从"对话工具"进化为"执行引擎",能够主动完成任务、调用工具、与外部世界交互。这一变革正在深刻改变我们的工作和生活方式。 从2023年AutoGPT的横空出世,到如今百花齐放的Agent生态,短短一年多时间,执行式AI已经从概念走向落地。根据最新统计,

By Ne0inhk
Ubuntu 虚拟机安装 OpenClaw

Ubuntu 虚拟机安装 OpenClaw

最近特别火的一个事是OpenClaw,个人AI助手工具,周末空闲,咱也对新技术尝尝鲜。 一、环境与前置准备 1.1虚拟机配置要求 配置项最低要求推荐配置CPU2 vCPU2-4 vCPU内存4 GB RAM8 GB RAM存储20 GB SSD40 GB+ SSD系统Ubuntu 22.04 LTSUbuntu 22.04/24.04 LTS网络公网访问(可选)稳定的外网连接 1.2 系统初始化和更新 更新系统软件包 sudo apt update && sudo apt upgrade -y 安装必要有工具 sudo apt install -y curl ca-certificates git

By Ne0inhk