Linux C/C++ 学习日记(70):grpc(三):基于grpc编写同步的server、client
注:该文用于个人学习记录和知识交流,如有不足,欢迎指点。
一、proto文件
syntax = "proto3"; package example; // 定义四种 RPC 模式的服务 service ExampleService { // 1. 一元 RPC (Unary):客户端发一个请求,服务端回一个响应 rpc UnaryCall (Request) returns (Response); // 2. 服务端流式 RPC (Server Streaming):客户端发一个请求,服务端回多个响应 rpc ServerStream (Request) returns (stream Response); // 3. 客户端流式 RPC (Client Streaming):客户端发多个请求,服务端回一个响应 rpc ClientStream (stream Request) returns (Response); // 4. 双向流式 RPC (Bidirectional Streaming):双方互相发多个消息 rpc BidiStream (stream Request) returns (stream Response); } // 请求消息 message Request { string data = 1; } // 响应消息 message Response { string data = 1; }二、server同步
/* * 同步服务端 */ #include <grpcpp/grpcpp.h> #include "./build/example.grpc.pb.h" using grpc::Server; using grpc::ServerBuilder; using grpc::ServerContext; using grpc::ServerReader; using grpc::ServerWriter; using grpc::ServerReaderWriter; using grpc::Status; using example::ExampleService; using example::Request; using example::Response; // 服务实现类 class ExampleServiceImpl final : public ExampleService::Service { // 1. 一元 RPC Status UnaryCall(ServerContext* context, const Request* req, Response* res) override { res->set_data("Server: " + req->data()); return Status::OK; } // 2. 服务端流式 RPC Status ServerStream(ServerContext* context, const Request* req, ServerWriter<Response>* writer) override { for (int i = 0; i < 3; ++i) { Response res; res.set_data("Server Stream " + std::to_string(i) + ": " + req->data()); writer->Write(res); // 每次循环立即发送 1 条数据 } return Status::OK; // 发送结束标记,告诉客户端“发完了” } // 3. 客户端流式 RPC Status ClientStream(ServerContext* context, ServerReader<Request>* reader, Response* res) override { std::string combined; Request req; while (reader->Read(&req)) { // Read()阻塞等待响应,收到数据时返回 true,收到结束标记时返回 false combined += req.data() + " "; } res->set_data("Server Combined: " + combined); return Status::OK; } // 4. 双向流式 RPC Status BidiStream(ServerContext* context, ServerReaderWriter<Response, Request>* stream) override { Request req; while (stream->Read(&req)) { Response res; res.set_data("Server Echo: " + req.data()); stream->Write(res); } return Status::OK; } }; void RunServer() { std::string addr("0.0.0.0:50051"); ExampleServiceImpl service; ServerBuilder builder; builder.AddListeningPort(addr, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr<Server> server(builder.BuildAndStart()); server->Wait(); } int main() { RunServer(); return 0; }三、client同步
/* * 同步客户端 */ #include <grpcpp/grpcpp.h> #include "./build/example.grpc.pb.h" #include <vector> using grpc::Channel; using grpc::ClientContext; using grpc::ClientReader; using grpc::ClientWriter; using grpc::ClientReaderWriter; using grpc::Status; using example::ExampleService; using example::Request; using example::Response; class ExampleClient { public: ExampleClient(std::shared_ptr<Channel> channel) : stub_(ExampleService::NewStub(channel)) {} // 1. 一元 RPC void CallUnary(const std::string& msg) { Request req; req.set_data(msg); Response res; ClientContext ctx; Status status = stub_->UnaryCall(&ctx, req, &res); if (status.ok()) std::cout << "Unary: " << res.data() << std::endl; } // 2. 服务端流式 RPC void CallServerStream(const std::string& msg) { Request req; req.set_data(msg); ClientContext ctx; std::unique_ptr<ClientReader<Response>> reader = stub_->ServerStream(&ctx, req); Response res; while (reader->Read(&res)) { // Read()阻塞等待响应,收到数据时返回 true,收到结束标记时返回 false std::cout << "Server Stream: " << res.data() << std::endl; } } // 3. 客户端流式 RPC void CallClientStream(const std::vector<std::string>& msgs) { ClientContext ctx; Response res; std::unique_ptr<ClientWriter<Request>> writer = stub_->ClientStream(&ctx, &res); for (const auto& msg : msgs) { Request req; req.set_data(msg); writer->Write(req); // 发送数据给服务端 } writer->WritesDone(); // 发送结束标记,告诉服务端“发完了” Status status = writer->Finish(); // 阻塞等待服务端的响应 if (status.ok()) std::cout << "Client Stream: " << res.data() << std::endl; } // 4. 双向流式 RPC void CallBidiStream(const std::vector<std::string>& msgs) { ClientContext ctx; std::unique_ptr<ClientReaderWriter<Request, Response>> stream = stub_->BidiStream(&ctx); // 写请求 for (const auto& msg : msgs) { Request req; req.set_data(msg); stream->Write(req); } stream->WritesDone(); // 读响应 Response res; while (stream->Read(&res)) { std::cout << "Bidi Stream: " << res.data() << std::endl; } } private: std::unique_ptr<ExampleService::Stub> stub_; }; int main() { ExampleClient client(grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials())); client.CallUnary("Hello"); client.CallServerStream("Hi"); client.CallClientStream({"A", "B", "C"}); client.CallBidiStream({"X", "Y", "Z"}); return 0; }四、消息的发送和接收的时机
同步 API 会阻塞当前线程,直到操作完成。
1. 一元 RPC (Unary)
客户端发送时机
调用 stub_->UnaryCall(&ctx, req, &res) 时:
- 函数内部会将
Request序列化为二进制,立即发送给服务端。 - 然后阻塞,直到收到服务端的
Response或出错。
服务端发送时机
在 UnaryCall 函数中执行 return Status::OK; 时:
- 之前设置的
res->set_data(...)会被序列化,随Status::OK一起发送给客户端。
2. 服务端流式 RPC (Server Streaming)
客户端发送时机
调用 stub_->ServerStream(&ctx, req) 时:
Request被立即发送给服务端。- 函数返回
ClientReader,后续通过reader->Read(&res)阻塞接收服务端的流式响应。
服务端发送时机
每次调用 writer->Write(res) 时:
- 当前的
Response被立即发送给客户端(不会等循环结束)。 Write会阻塞直到数据写入传输缓冲区。
3. 客户端流式 RPC (Client Streaming)
客户端发送时机
- 每次调用
writer->Write(req)时:当前的Request被立即发送给服务端。 - 调用
writer->WritesDone()时:向服务端发送「客户端已写完所有请求」的标记(不发数据,只发控制帧)。 - 调用
writer->Finish()时:阻塞等待服务端的最终Response。
服务端发送时机
在 ClientStream 函数中执行 return Status::OK; 时:
- 之前设置的
res->set_data(...)被随Status::OK一起发送给客户端。
4. 双向流式 RPC (Bidirectional Streaming)
客户端发送时机
- 每次调用
stream->Write(req)时:当前的Request被立即发送给服务端。 - 调用
stream->WritesDone()时:发送「客户端已写完」的标记。
服务端发送时机
每次调用 stream->Write(res) 时:当前的 Response 被立即发送给客户端。
总结
| 模式 | 客户端发送数据时机 | 服务端发送数据时机 |
|---|---|---|
| 一元 | 调用 UnaryCall 时 | return Status::OK 时 |
| 服务端流 | 调用 ServerStream 时(发起请求,无后续主动发送,仅通过 reader->Read() 接收) | 每次 writer->Write 时;return Status::OK 时发送结束标记 |
| 客户端流 | 每次 writer->Write 时;writer->WritesDone() 发送结束标记 | return Status::OK 时 |
| 双向流 | 每次 stream->Write 时;stream->WritesDone() 发送结束标记 | 每次 stream->Write 时;return Status::OK 时发送结束标记 |
| 调用模式 | 客户端接收数据时机 | 服务端接收数据时机 |
|---|---|---|
| 一元 RPC(Unary) | 调用 stub->UnaryCall(&ctx, req, &res) 阻塞返回时,从 res 中获取服务端响应 | 实现 ExampleService::UnaryCall 方法时,直接从入参 req 中获取客户端请求数据 |
| 服务端流(ServerStream) | 调用 stub->ServerStream(&ctx, req, &reader) 后,循环调用 reader->Read(&res) 接收服务端推送的每一条数据;Read() 返回 false 表示流结束 | 实现 ExampleService::ServerStream 方法时,直接从入参 req 中获取客户端请求数据(仅 1 次) |
| 客户端流(ClientStream) | 调用 writer->Finish(&res) 阻塞返回时,从 res 中获取服务端最终响应 | 实现 ExampleService::ClientStream 方法时,循环调用 reader->Read(&req) 接收客户端推送的每一条数据;Read() 返回 false 表示客户端发送完毕 |
| 双向流(BidiStream) | 调用 stub->BidiStream(&ctx, &stream) 后,循环调用 stream->Read(&res) 接收服务端推送的每一条数据;Read() 返回 false 表示服务端流结束 | 实现 ExampleService::BidiStream 方法时,循环调用 stream->Read(&req) 接收客户端推送的每一条数据;Read() 返回 false 表示客户端流结束 |