Skip to content

Commit

Permalink
feat: Make ClientRPC safe for inheritance
Browse files Browse the repository at this point in the history
  • Loading branch information
Tradias committed Jul 29, 2023
1 parent de01be2 commit 79cfd43
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 174 deletions.
13 changes: 7 additions & 6 deletions example/snippets/client_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
namespace asio = boost::asio;

/* [client_rpc-unary] */
asio::awaitable<void> unary(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub)
asio::awaitable<void> client_rpc_unary(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub)
{
using RPC = agrpc::ClientRPC<&example::v1::Example::Stub::PrepareAsyncUnary>;
grpc::ClientContext client_context;
Expand All @@ -38,7 +38,7 @@ asio::awaitable<void> unary(agrpc::GrpcContext& grpc_context, example::v1::Examp
/* [client_rpc-unary] */

/* [client_rpc-client-streaming] */
asio::awaitable<void> client_streaming(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub)
asio::awaitable<void> client_rpc_client_streaming(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub)
{
using RPC = asio::use_awaitable_t<>::as_default_on_t<
agrpc::ClientRPC<&example::v1::Example::Stub::PrepareAsyncClientStreaming>>;
Expand Down Expand Up @@ -74,7 +74,7 @@ asio::awaitable<void> client_streaming(agrpc::GrpcContext& grpc_context, example
/* [client_rpc-client-streaming] */

/* [client_rpc-server-streaming] */
asio::awaitable<void> server_streaming(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub)
asio::awaitable<void> client_rpc_server_streaming(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub)
{
using RPC = asio::use_awaitable_t<>::as_default_on_t<
agrpc::ClientRPC<&example::v1::Example::Stub::PrepareAsyncServerStreaming>>;
Expand Down Expand Up @@ -107,7 +107,8 @@ asio::awaitable<void> server_streaming(agrpc::GrpcContext& grpc_context, example
/* [client_rpc-server-streaming] */

/* [client_rpc-bidi-streaming] */
asio::awaitable<void> bidirectional_streaming(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub)
asio::awaitable<void> client_rpc_bidirectional_streaming(agrpc::GrpcContext& grpc_context,
example::v1::Example::Stub& stub)
{
using RPC = asio::use_awaitable_t<>::as_default_on_t<
agrpc::ClientRPC<&example::v1::Example::Stub::PrepareAsyncBidirectionalStreaming>>;
Expand Down Expand Up @@ -143,7 +144,7 @@ asio::awaitable<void> bidirectional_streaming(agrpc::GrpcContext& grpc_context,
/* [client_rpc-bidi-streaming] */

/* [client_rpc-generic-unary] */
asio::awaitable<void> generic_unary(agrpc::GrpcContext& grpc_context, grpc::GenericStub& stub)
asio::awaitable<void> client_rpc_generic_unary(agrpc::GrpcContext& grpc_context, grpc::GenericStub& stub)
{
grpc::ClientContext client_context;
client_context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(5));
Expand Down Expand Up @@ -179,5 +180,5 @@ asio::awaitable<void> generic_unary(agrpc::GrpcContext& grpc_context, grpc::Gene
/* [client_rpc-generic-unary] */

/* [client_rpc-generic-streaming] */
asio::awaitable<void> client_generic_streaming_request(grpc::GenericStub&) { co_return; }
asio::awaitable<void> client_rpc_generic_streaming(agrpc::GrpcContext&, grpc::GenericStub&) { co_return; }
/* [client_rpc-generic-streaming] */
90 changes: 26 additions & 64 deletions src/agrpc/client_rpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,12 @@
#include <agrpc/detail/initiate_sender_implementation.hpp>
#include <agrpc/detail/name.hpp>
#include <agrpc/detail/rpc_client_context_base.hpp>
#include <agrpc/detail/rpc_executor_base.hpp>
#include <agrpc/detail/rpc_type.hpp>
#include <agrpc/grpc_executor.hpp>

AGRPC_NAMESPACE_BEGIN()

namespace detail
{
/**
* @brief (experimental) RPC's executor base
*
* @since 2.1.0
*/
template <class Executor>
class RPCExecutorBase
{
public:
/**
* @brief The executor type
*/
using executor_type = Executor;

/**
* @brief Get the executor
*
* Thread-safe
*/
[[nodiscard]] const executor_type& get_executor() const noexcept { return executor_; }

protected:
RPCExecutorBase() : executor_(agrpc::GrpcExecutor{}) {}

explicit RPCExecutorBase(const Executor& executor) : executor_(executor) {}

auto& grpc_context() const noexcept { return detail::query_grpc_context(executor_); }

private:
Executor executor_;
};
}

namespace detail
{
/**
Expand Down Expand Up @@ -521,7 +487,7 @@ class ClientRPC<PrepareAsyncClientStreaming, Executor>
template <class CompletionToken = detail::DefaultCompletionTokenT<Executor>>
auto read_initial_metadata(CompletionToken token = detail::DefaultCompletionTokenT<Executor>{})
{
return detail::async_initiate_sender_implementation<detail::ReadInitialMetadataSenderImplementation<ClientRPC>>(
return detail::async_initiate_sender_implementation<detail::ReadInitialMetadataSenderImplementation<Responder>>(
this->grpc_context(), {}, {*this}, token);
}

Expand All @@ -536,7 +502,7 @@ class ClientRPC<PrepareAsyncClientStreaming, Executor>
CompletionToken token = detail::DefaultCompletionTokenT<Executor>{})
{
return detail::async_initiate_sender_implementation<
detail::WriteClientStreamingSenderImplementation<ClientRPC>>(this->grpc_context(), {request, options},
detail::WriteClientStreamingSenderImplementation<Responder>>(this->grpc_context(), {request, options},
{*this}, token);
}

Expand Down Expand Up @@ -591,8 +557,8 @@ class ClientRPC<PrepareAsyncClientStreaming, Executor>
}

private:
friend detail::ReadInitialMetadataSenderImplementation<ClientRPC>;
friend detail::WriteClientStreamingSenderImplementation<ClientRPC>;
friend detail::ReadInitialMetadataSenderImplementation<Responder>;
friend detail::WriteClientStreamingSenderImplementation<Responder>;
friend detail::ClientFinishSenderImplementation<ClientRPC>;
friend detail::ClientStreamingRequestSenderInitiation<PrepareAsyncClientStreaming, Executor>;
friend detail::ClientStreamingRequestSenderImplementation<PrepareAsyncClientStreaming, Executor>;
Expand Down Expand Up @@ -766,9 +732,8 @@ class ClientRPCServerStreamingBase<PrepareAsyncServerStreaming, Executor>
template <class CompletionToken = detail::DefaultCompletionTokenT<Executor>>
auto read_initial_metadata(CompletionToken token = detail::DefaultCompletionTokenT<Executor>{})
{
return detail::async_initiate_sender_implementation<
detail::ReadInitialMetadataSenderImplementation<ClientRPCServerStreamingBase>>(this->grpc_context(), {},
{*this}, token);
return detail::async_initiate_sender_implementation<detail::ReadInitialMetadataSenderImplementation<Responder>>(
this->grpc_context(), {}, {*this}, token);
}

/**
Expand All @@ -785,9 +750,8 @@ class ClientRPCServerStreamingBase<PrepareAsyncServerStreaming, Executor>
template <class CompletionToken = detail::DefaultCompletionTokenT<Executor>>
auto read(ResponseT& response, CompletionToken token = detail::DefaultCompletionTokenT<Executor>{})
{
return detail::async_initiate_sender_implementation<
detail::ReadServerStreamingSenderImplementation<ClientRPCServerStreamingBase>>(this->grpc_context(),
{response}, {*this}, token);
return detail::async_initiate_sender_implementation<detail::ReadServerStreamingSenderImplementation<Responder>>(
this->grpc_context(), {response}, {*this}, token);
}

/**
Expand Down Expand Up @@ -822,14 +786,14 @@ class ClientRPCServerStreamingBase<PrepareAsyncServerStreaming, Executor>
auto finish(CompletionToken token = detail::DefaultCompletionTokenT<Executor>{})
{
return detail::async_initiate_sender_implementation<
detail::ClientFinishServerStreamingSenderImplementation<ClientRPCServerStreamingBase>>(this->grpc_context(),
{}, {*this}, token);
detail::ClientFinishServerStreamingSenderImplementation<Responder>>(this->grpc_context(), {}, {*this},
token);
}

private:
friend detail::ReadInitialMetadataSenderImplementation<ClientRPCServerStreamingBase>;
friend detail::ReadServerStreamingSenderImplementation<ClientRPCServerStreamingBase>;
friend detail::ClientFinishServerStreamingSenderImplementation<ClientRPCServerStreamingBase>;
friend detail::ReadInitialMetadataSenderImplementation<Responder>;
friend detail::ReadServerStreamingSenderImplementation<Responder>;
friend detail::ClientFinishServerStreamingSenderImplementation<Responder>;
friend detail::ClientStreamingRequestSenderInitiation<PrepareAsyncServerStreaming, Executor>;
friend detail::ClientStreamingRequestSenderImplementation<PrepareAsyncServerStreaming, Executor>;
};
Expand Down Expand Up @@ -969,9 +933,8 @@ class ClientRPCBidiStreamingBase<ResponderT<RequestT, ResponseT>, Executor>
template <class CompletionToken = detail::DefaultCompletionTokenT<Executor>>
auto read_initial_metadata(CompletionToken token = detail::DefaultCompletionTokenT<Executor>{})
{
return detail::async_initiate_sender_implementation<
detail::ReadInitialMetadataSenderImplementation<ClientRPCBidiStreamingBase>>(this->grpc_context(), {},
{*this}, token);
return detail::async_initiate_sender_implementation<detail::ReadInitialMetadataSenderImplementation<Responder>>(
this->grpc_context(), {}, {*this}, token);
}

/**
Expand All @@ -990,8 +953,8 @@ class ClientRPCBidiStreamingBase<ResponderT<RequestT, ResponseT>, Executor>
auto read(ResponseT& response, CompletionToken token = detail::DefaultCompletionTokenT<Executor>{})
{
return detail::async_initiate_sender_implementation<
detail::ClientReadBidiStreamingSenderImplementation<Responder, Executor>>(this->grpc_context(), {response},
{*this}, token);
detail::ClientReadBidiStreamingSenderImplementation<Responder>>(this->grpc_context(), {response}, {*this},
token);
}

/**
Expand All @@ -1012,8 +975,8 @@ class ClientRPCBidiStreamingBase<ResponderT<RequestT, ResponseT>, Executor>
CompletionToken token = detail::DefaultCompletionTokenT<Executor>{})
{
return detail::async_initiate_sender_implementation<
detail::ClientWriteBidiStreamingSenderImplementation<Responder, Executor>>(
this->grpc_context(), {request, options}, {*this}, token);
detail::ClientWriteBidiStreamingSenderImplementation<Responder>>(this->grpc_context(), {request, options},
{*this}, token);
}

/**
Expand Down Expand Up @@ -1042,9 +1005,8 @@ class ClientRPCBidiStreamingBase<ResponderT<RequestT, ResponseT>, Executor>
template <class CompletionToken = detail::DefaultCompletionTokenT<Executor>>
auto writes_done(CompletionToken token = detail::DefaultCompletionTokenT<Executor>{})
{
return detail::async_initiate_sender_implementation<
detail::ClientWritesDoneSenderImplementation<Responder, Executor>>(this->grpc_context(), {}, {*this},
token);
return detail::async_initiate_sender_implementation<detail::ClientWritesDoneSenderImplementation<Responder>>(
this->grpc_context(), {}, {*this}, token);
}

/**
Expand Down Expand Up @@ -1084,10 +1046,10 @@ class ClientRPCBidiStreamingBase<ResponderT<RequestT, ResponseT>, Executor>
}

private:
friend detail::ReadInitialMetadataSenderImplementation<ClientRPCBidiStreamingBase>;
friend detail::ClientReadBidiStreamingSenderImplementation<Responder, Executor>;
friend detail::ClientWriteBidiStreamingSenderImplementation<Responder, Executor>;
friend detail::ClientWritesDoneSenderImplementation<Responder, Executor>;
friend detail::ReadInitialMetadataSenderImplementation<Responder>;
friend detail::ClientReadBidiStreamingSenderImplementation<Responder>;
friend detail::ClientWriteBidiStreamingSenderImplementation<Responder>;
friend detail::ClientWritesDoneSenderImplementation<Responder>;
friend detail::ClientFinishSenderImplementation<ClientRPCBidiStreamingBase>;
};
}
Expand Down
4 changes: 2 additions & 2 deletions src/agrpc/detail/alarm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ struct MoveAlarmSenderImplementation

auto& stop_function_arg(const Initiation&) noexcept { return alarm_.alarm_; }

void initiate(agrpc::GrpcContext& grpc_context, const Initiation& deadline, detail::OperationBase* operation)
void initiate(agrpc::GrpcContext& grpc_context, const Initiation& deadline, void* tag)
{
detail::AlarmInitFunction{alarm_.alarm_, deadline}(grpc_context, operation);
detail::AlarmInitFunction{alarm_.alarm_, deadline}(grpc_context, tag);
}

template <class OnDone>
Expand Down
Loading

0 comments on commit 79cfd43

Please sign in to comment.