diff --git a/example/snippets/client_rpc.cpp b/example/snippets/client_rpc.cpp index f781ea5f..c368f792 100644 --- a/example/snippets/client_rpc.cpp +++ b/example/snippets/client_rpc.cpp @@ -20,7 +20,7 @@ namespace asio = boost::asio; /* [client_rpc-unary] */ -asio::awaitable unary(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub) +asio::awaitable client_rpc_unary(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub) { using RPC = agrpc::ClientRPC<&example::v1::Example::Stub::PrepareAsyncUnary>; grpc::ClientContext client_context; @@ -38,7 +38,7 @@ asio::awaitable unary(agrpc::GrpcContext& grpc_context, example::v1::Examp /* [client_rpc-unary] */ /* [client_rpc-client-streaming] */ -asio::awaitable client_streaming(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub) +asio::awaitable client_rpc_client_streaming(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub) { using RPC = asio::use_awaitable_t<>::as_default_on_t< agrpc::ClientRPC<&example::v1::Example::Stub::PrepareAsyncClientStreaming>>; @@ -74,7 +74,7 @@ asio::awaitable client_streaming(agrpc::GrpcContext& grpc_context, example /* [client_rpc-client-streaming] */ /* [client_rpc-server-streaming] */ -asio::awaitable server_streaming(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub) +asio::awaitable client_rpc_server_streaming(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub) { using RPC = asio::use_awaitable_t<>::as_default_on_t< agrpc::ClientRPC<&example::v1::Example::Stub::PrepareAsyncServerStreaming>>; @@ -107,7 +107,8 @@ asio::awaitable server_streaming(agrpc::GrpcContext& grpc_context, example /* [client_rpc-server-streaming] */ /* [client_rpc-bidi-streaming] */ -asio::awaitable bidirectional_streaming(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub) +asio::awaitable client_rpc_bidirectional_streaming(agrpc::GrpcContext& grpc_context, + example::v1::Example::Stub& stub) { using RPC = asio::use_awaitable_t<>::as_default_on_t< agrpc::ClientRPC<&example::v1::Example::Stub::PrepareAsyncBidirectionalStreaming>>; @@ -143,7 +144,7 @@ asio::awaitable bidirectional_streaming(agrpc::GrpcContext& grpc_context, /* [client_rpc-bidi-streaming] */ /* [client_rpc-generic-unary] */ -asio::awaitable generic_unary(agrpc::GrpcContext& grpc_context, grpc::GenericStub& stub) +asio::awaitable client_rpc_generic_unary(agrpc::GrpcContext& grpc_context, grpc::GenericStub& stub) { grpc::ClientContext client_context; client_context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(5)); @@ -179,5 +180,5 @@ asio::awaitable generic_unary(agrpc::GrpcContext& grpc_context, grpc::Gene /* [client_rpc-generic-unary] */ /* [client_rpc-generic-streaming] */ -asio::awaitable client_generic_streaming_request(grpc::GenericStub&) { co_return; } +asio::awaitable client_rpc_generic_streaming(agrpc::GrpcContext&, grpc::GenericStub&) { co_return; } /* [client_rpc-generic-streaming] */ diff --git a/src/agrpc/client_rpc.hpp b/src/agrpc/client_rpc.hpp index 41eda736..fe4aa7ee 100644 --- a/src/agrpc/client_rpc.hpp +++ b/src/agrpc/client_rpc.hpp @@ -23,46 +23,12 @@ #include #include #include +#include #include #include AGRPC_NAMESPACE_BEGIN() -namespace detail -{ -/** - * @brief (experimental) RPC's executor base - * - * @since 2.1.0 - */ -template -class RPCExecutorBase -{ - public: - /** - * @brief The executor type - */ - using executor_type = Executor; - - /** - * @brief Get the executor - * - * Thread-safe - */ - [[nodiscard]] const executor_type& get_executor() const noexcept { return executor_; } - - protected: - RPCExecutorBase() : executor_(agrpc::GrpcExecutor{}) {} - - explicit RPCExecutorBase(const Executor& executor) : executor_(executor) {} - - auto& grpc_context() const noexcept { return detail::query_grpc_context(executor_); } - - private: - Executor executor_; -}; -} - namespace detail { /** @@ -521,7 +487,7 @@ class ClientRPC template > auto read_initial_metadata(CompletionToken token = detail::DefaultCompletionTokenT{}) { - return detail::async_initiate_sender_implementation>( + return detail::async_initiate_sender_implementation>( this->grpc_context(), {}, {*this}, token); } @@ -536,7 +502,7 @@ class ClientRPC CompletionToken token = detail::DefaultCompletionTokenT{}) { return detail::async_initiate_sender_implementation< - detail::WriteClientStreamingSenderImplementation>(this->grpc_context(), {request, options}, + detail::WriteClientStreamingSenderImplementation>(this->grpc_context(), {request, options}, {*this}, token); } @@ -591,8 +557,8 @@ class ClientRPC } private: - friend detail::ReadInitialMetadataSenderImplementation; - friend detail::WriteClientStreamingSenderImplementation; + friend detail::ReadInitialMetadataSenderImplementation; + friend detail::WriteClientStreamingSenderImplementation; friend detail::ClientFinishSenderImplementation; friend detail::ClientStreamingRequestSenderInitiation; friend detail::ClientStreamingRequestSenderImplementation; @@ -766,9 +732,8 @@ class ClientRPCServerStreamingBase template > auto read_initial_metadata(CompletionToken token = detail::DefaultCompletionTokenT{}) { - return detail::async_initiate_sender_implementation< - detail::ReadInitialMetadataSenderImplementation>(this->grpc_context(), {}, - {*this}, token); + return detail::async_initiate_sender_implementation>( + this->grpc_context(), {}, {*this}, token); } /** @@ -785,9 +750,8 @@ class ClientRPCServerStreamingBase template > auto read(ResponseT& response, CompletionToken token = detail::DefaultCompletionTokenT{}) { - return detail::async_initiate_sender_implementation< - detail::ReadServerStreamingSenderImplementation>(this->grpc_context(), - {response}, {*this}, token); + return detail::async_initiate_sender_implementation>( + this->grpc_context(), {response}, {*this}, token); } /** @@ -822,14 +786,14 @@ class ClientRPCServerStreamingBase auto finish(CompletionToken token = detail::DefaultCompletionTokenT{}) { return detail::async_initiate_sender_implementation< - detail::ClientFinishServerStreamingSenderImplementation>(this->grpc_context(), - {}, {*this}, token); + detail::ClientFinishServerStreamingSenderImplementation>(this->grpc_context(), {}, {*this}, + token); } private: - friend detail::ReadInitialMetadataSenderImplementation; - friend detail::ReadServerStreamingSenderImplementation; - friend detail::ClientFinishServerStreamingSenderImplementation; + friend detail::ReadInitialMetadataSenderImplementation; + friend detail::ReadServerStreamingSenderImplementation; + friend detail::ClientFinishServerStreamingSenderImplementation; friend detail::ClientStreamingRequestSenderInitiation; friend detail::ClientStreamingRequestSenderImplementation; }; @@ -969,9 +933,8 @@ class ClientRPCBidiStreamingBase, Executor> template > auto read_initial_metadata(CompletionToken token = detail::DefaultCompletionTokenT{}) { - return detail::async_initiate_sender_implementation< - detail::ReadInitialMetadataSenderImplementation>(this->grpc_context(), {}, - {*this}, token); + return detail::async_initiate_sender_implementation>( + this->grpc_context(), {}, {*this}, token); } /** @@ -990,8 +953,8 @@ class ClientRPCBidiStreamingBase, Executor> auto read(ResponseT& response, CompletionToken token = detail::DefaultCompletionTokenT{}) { return detail::async_initiate_sender_implementation< - detail::ClientReadBidiStreamingSenderImplementation>(this->grpc_context(), {response}, - {*this}, token); + detail::ClientReadBidiStreamingSenderImplementation>(this->grpc_context(), {response}, {*this}, + token); } /** @@ -1012,8 +975,8 @@ class ClientRPCBidiStreamingBase, Executor> CompletionToken token = detail::DefaultCompletionTokenT{}) { return detail::async_initiate_sender_implementation< - detail::ClientWriteBidiStreamingSenderImplementation>( - this->grpc_context(), {request, options}, {*this}, token); + detail::ClientWriteBidiStreamingSenderImplementation>(this->grpc_context(), {request, options}, + {*this}, token); } /** @@ -1042,9 +1005,8 @@ class ClientRPCBidiStreamingBase, Executor> template > auto writes_done(CompletionToken token = detail::DefaultCompletionTokenT{}) { - return detail::async_initiate_sender_implementation< - detail::ClientWritesDoneSenderImplementation>(this->grpc_context(), {}, {*this}, - token); + return detail::async_initiate_sender_implementation>( + this->grpc_context(), {}, {*this}, token); } /** @@ -1084,10 +1046,10 @@ class ClientRPCBidiStreamingBase, Executor> } private: - friend detail::ReadInitialMetadataSenderImplementation; - friend detail::ClientReadBidiStreamingSenderImplementation; - friend detail::ClientWriteBidiStreamingSenderImplementation; - friend detail::ClientWritesDoneSenderImplementation; + friend detail::ReadInitialMetadataSenderImplementation; + friend detail::ClientReadBidiStreamingSenderImplementation; + friend detail::ClientWriteBidiStreamingSenderImplementation; + friend detail::ClientWritesDoneSenderImplementation; friend detail::ClientFinishSenderImplementation; }; } diff --git a/src/agrpc/detail/alarm.hpp b/src/agrpc/detail/alarm.hpp index 3d72e0c3..0fff5474 100644 --- a/src/agrpc/detail/alarm.hpp +++ b/src/agrpc/detail/alarm.hpp @@ -38,9 +38,9 @@ struct MoveAlarmSenderImplementation auto& stop_function_arg(const Initiation&) noexcept { return alarm_.alarm_; } - void initiate(agrpc::GrpcContext& grpc_context, const Initiation& deadline, detail::OperationBase* operation) + void initiate(agrpc::GrpcContext& grpc_context, const Initiation& deadline, void* tag) { - detail::AlarmInitFunction{alarm_.alarm_, deadline}(grpc_context, operation); + detail::AlarmInitFunction{alarm_.alarm_, deadline}(grpc_context, tag); } template diff --git a/src/agrpc/detail/client_rpc_sender.hpp b/src/agrpc/detail/client_rpc_sender.hpp index 0864c217..69d2ec6e 100644 --- a/src/agrpc/detail/client_rpc_sender.hpp +++ b/src/agrpc/detail/client_rpc_sender.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -26,31 +27,12 @@ AGRPC_NAMESPACE_BEGIN() -/** - * @brief (experimental) Primary ClientRPC template - * - * This is the main entrypoint into the recommended API for writing asynchronous gRPC clients. - * - * @see - * @c agrpc::ClientRPC
- * @c agrpc::ClientRPC
- * @c agrpc::ClientRPC
- * @c agrpc::ClientRPC
- * @c agrpc::ClientRPC
- * @c agrpc::ClientRPC
- * - * @since 2.6.0 - */ -template -class ClientRPC; - namespace detail { template class ClientRPCUnaryBase; -template -class ClientRPCServerStreamingBase; +using ClientRPCAccess = AutoCancelClientContextAndResponderAccess; struct ClientContextCancellationFunction { @@ -96,10 +78,10 @@ struct ClientUnaryRequestSenderImplementationBase> grpc::ClientContext& stop_function_arg(const Initiation& initiation) noexcept { return initiation.client_context_; } - void initiate(const agrpc::GrpcContext&, const Initiation& initiation, detail::OperationBase* operation) noexcept + void initiate(const agrpc::GrpcContext&, const Initiation& initiation, void* tag) noexcept { responder_->StartCall(); - responder_->Finish(&initiation.response_, &status_, operation); + responder_->Finish(&initiation.response_, &status_, tag); } template @@ -150,12 +132,14 @@ struct ClientStreamingRequestSenderInitiation { using RPC = agrpc::ClientRPC; - RPC& rpc_; - ClientStreamingRequestSenderInitiation(RPC& rpc, Stub& stub, Response& response) : rpc_(rpc) { - rpc.set_responder((stub.*PrepareAsync)(&rpc.context(), &response, rpc.grpc_context().get_completion_queue())); + ClientRPCAccess::set_responder( + rpc, (stub.*PrepareAsync)(&rpc.context(), &response, + RPCExecutorBaseAccess::grpc_context(rpc).get_completion_queue())); } + + RPC& rpc_; }; template class Responder, @@ -165,12 +149,14 @@ struct ClientStreamingRequestSenderInitiation { using RPC = detail::ClientRPCServerStreamingBase; - RPC& rpc_; - ClientStreamingRequestSenderInitiation(RPC& rpc, Stub& stub, const Request& req) : rpc_(rpc) { - rpc.set_responder((stub.*PrepareAsync)(&rpc.context(), req, rpc.grpc_context().get_completion_queue())); + ClientRPCAccess::set_responder( + rpc, + (stub.*PrepareAsync)(&rpc.context(), req, RPCExecutorBaseAccess::grpc_context(rpc).get_completion_queue())); } + + RPC& rpc_; }; template class Responder, @@ -180,12 +166,13 @@ struct ClientStreamingRequestSenderInitiation { using RPC = agrpc::ClientRPC; - RPC& rpc_; - ClientStreamingRequestSenderInitiation(RPC& rpc, Stub& stub) : rpc_(rpc) { - rpc.set_responder((stub.*PrepareAsync)(&rpc.context(), rpc.grpc_context().get_completion_queue())); + ClientRPCAccess::set_responder( + rpc, (stub.*PrepareAsync)(&rpc.context(), RPCExecutorBaseAccess::grpc_context(rpc).get_completion_queue())); } + + RPC& rpc_; }; template @@ -193,12 +180,14 @@ struct ClientStreamingRequestSenderInitiation; - RPC& rpc_; - ClientStreamingRequestSenderInitiation(RPC& rpc, const std::string& method, grpc::GenericStub& stub) : rpc_(rpc) { - rpc.set_responder(stub.PrepareCall(&rpc.context(), method, rpc.grpc_context().get_completion_queue())); + ClientRPCAccess::set_responder( + rpc, + stub.PrepareCall(&rpc.context(), method, RPCExecutorBaseAccess::grpc_context(rpc).get_completion_queue())); } + + RPC& rpc_; }; template @@ -209,9 +198,9 @@ struct ClientStreamingRequestSenderImplementation : detail::GrpcSenderImplementa auto& stop_function_arg(const Initiation& initiation) noexcept { return initiation.rpc_.context(); } - static void initiate(const agrpc::GrpcContext&, const Initiation& initiation, void* self) noexcept + static void initiate(const agrpc::GrpcContext&, const Initiation& initiation, void* tag) noexcept { - initiation.rpc_.responder().StartCall(self); + ClientRPCAccess::responder(initiation.rpc_).StartCall(tag); } template @@ -221,9 +210,10 @@ struct ClientStreamingRequestSenderImplementation : detail::GrpcSenderImplementa } }; -template +template struct ReadInitialMetadataSenderImplementation : detail::GrpcSenderImplementationBase { + using RPC = detail::AutoCancelClientContextAndResponder; using Initiation = detail::Empty; using StopFunction = detail::ClientContextCancellationFunction; @@ -231,9 +221,9 @@ struct ReadInitialMetadataSenderImplementation : detail::GrpcSenderImplementatio grpc::ClientContext& stop_function_arg(const Initiation&) noexcept { return rpc_.context(); } - void initiate(const agrpc::GrpcContext&, const Initiation&, void* self) noexcept + void initiate(const agrpc::GrpcContext&, const Initiation&, void* tag) noexcept { - rpc_.responder().ReadInitialMetadata(self); + ClientRPCAccess::responder(rpc_).ReadInitialMetadata(tag); } template @@ -245,23 +235,27 @@ struct ReadInitialMetadataSenderImplementation : detail::GrpcSenderImplementatio RPC& rpc_; }; -template -struct ReadServerStreamingSenderImplementation : detail::GrpcSenderImplementationBase +template +struct ReadServerStreamingSenderImplementation; + +template