From 9d4ff3c6b6a39975040b7d366155a30e87d14083 Mon Sep 17 00:00:00 2001 From: Mike Aizatsky Date: Wed, 11 Dec 2024 10:49:59 -0800 Subject: [PATCH] traits for tokio channel() --- recapn-rpc/src/connection.rs | 26 ++++++++++++++------------ recapn-rpc/src/lib.rs | 1 + recapn-rpc/src/rt.rs | 32 ++++++++++++++++++++++++++++++++ recapn-rpc/src/tokio.rs | 33 +++++++++++++++++++++++++++++---- 4 files changed, 76 insertions(+), 16 deletions(-) create mode 100644 recapn-rpc/src/rt.rs diff --git a/recapn-rpc/src/connection.rs b/recapn-rpc/src/connection.rs index 186d08e..6ee7ce6 100644 --- a/recapn-rpc/src/connection.rs +++ b/recapn-rpc/src/connection.rs @@ -21,6 +21,7 @@ use crate::chan::{ use crate::client::Client; use crate::gen::capnp_rpc_capnp as rpc_capnp; use crate::pipeline::PipelineOp; +use crate::rt::{Receiver, Sender}; use crate::table::{CapTable, Table}; use crate::{Error, ErrorKind}; use rpc_capnp::promised_answer::Op; @@ -427,7 +428,7 @@ pub struct ConnectionOptions { pub reader_options: ReaderOptions, } -pub struct Connection { +pub struct Connection { exports: ExportTable, questions: ExportTable, answers: ImportTable, @@ -439,8 +440,8 @@ pub struct Connection { conn_events_sender: EventSender, conn_events: EventReceiver, - cap_messages_sender: crate::tokio::Sender, - cap_messages: crate::tokio::Receiver, + cap_messages_sender: Rt::Sender, + cap_messages: Rt::Receiver, call_words_in_flight: usize, @@ -451,10 +452,10 @@ pub struct Connection { outbound: T, } -impl Connection { +impl Connection { pub fn new(outbound: T, bootstrap: Client, options: ConnectionOptions) -> Self { let (conn_events_sender, conn_events) = crate::tokio::unbounded_channel(); - let (cap_messages_sender, cap_messages) = crate::tokio::channel(8); + let (cap_messages_sender, cap_messages) = Rt::channel(8); Connection { exports: ExportTable::new(), questions: ExportTable::new(), @@ -474,7 +475,7 @@ impl Connection { } } -impl Connection { +impl Connection { #[inline] fn same_connection(&self, id: &ConnectionId) -> bool { id.0.same_channel(&self.conn_events_sender) @@ -498,7 +499,7 @@ impl Connection { self.outbound.send(OutboundMessage { message }); let (sender, receiver) = mpsc::channel(chan::RpcChannel::Bootstrap(id)); - let cap_handler = ReceivedCap { + let cap_handler = ReceivedCap:: { target: CapTarget::bootstrap(id), resolution: resolve_receiver, @@ -506,7 +507,8 @@ impl Connection { outbound: self.cap_messages_sender.clone(), }; - crate::tokio::spawn(cap_handler.handle()); + let fut = cap_handler.handle(); + crate::tokio::spawn(fut); Client(sender) } @@ -1134,16 +1136,16 @@ enum CapResolution { } /// Handles forwarding messages from an internal mpsc into a connection mpsc. -struct ReceivedCap { +struct ReceivedCap { target: CapTarget, resolution: crate::tokio::oneshot::Receiver, inbound: chan::Receiver, - outbound: crate::tokio::Sender, + outbound: Rt::Sender, } -impl ReceivedCap { - pub async fn handle(mut self) { +impl ReceivedCap { + pub async fn handle(mut self) -> () { let mut handled_message = false; loop { crate::tokio::select! { diff --git a/recapn-rpc/src/lib.rs b/recapn-rpc/src/lib.rs index a27658e..fcb70ad 100644 --- a/recapn-rpc/src/lib.rs +++ b/recapn-rpc/src/lib.rs @@ -22,6 +22,7 @@ pub mod pipeline; pub mod server; pub mod table; mod tokio; +mod rt; pub(crate) use gen::capnp_rpc_capnp as rpc_capnp; pub use rpc_capnp::exception::Type as ErrorKind; diff --git a/recapn-rpc/src/rt.rs b/recapn-rpc/src/rt.rs new file mode 100644 index 0000000..f95ad93 --- /dev/null +++ b/recapn-rpc/src/rt.rs @@ -0,0 +1,32 @@ +pub trait Rt { + type Sender: Sender; + type Receiver: Receiver; + + fn channel(buffer: usize) -> (Self::Sender, Self::Receiver); +} + +pub trait Sender: Clone + Send { + fn send(&self, value: T) -> impl std::future::Future>> + Send; +} + +/// Error returned by the `Sender`. +#[derive(PartialEq, Eq, Clone, Copy)] +pub struct SendError(pub T); + +impl core::fmt::Debug for SendError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("SendError").finish_non_exhaustive() + } +} + +impl core::fmt::Display for SendError { + fn fmt(&self, fmt: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(fmt, "channel closed") + } +} + +impl core::error::Error for SendError {} + +pub trait Receiver { + async fn recv(&mut self) -> Option; +} diff --git a/recapn-rpc/src/tokio.rs b/recapn-rpc/src/tokio.rs index 9aaf12e..9468119 100644 --- a/recapn-rpc/src/tokio.rs +++ b/recapn-rpc/src/tokio.rs @@ -1,14 +1,10 @@ use tokio::sync::mpsc; -pub use mpsc::channel; pub use mpsc::unbounded_channel; pub type UnboundedSender = mpsc::UnboundedSender; pub type UnboundedReceiver = mpsc::UnboundedReceiver; -pub type Sender = mpsc::Sender; -pub type Receiver = mpsc::Receiver; - pub use tokio::sync::oneshot; pub use tokio::select; @@ -17,3 +13,32 @@ pub use tokio::task::JoinHandle; pub use tokio::task::spawn; pub use tokio::task::spawn_local; + +impl crate::rt::Sender for mpsc::Sender { + async fn send(&self, value: T) -> Result<(), crate::rt::SendError> { + self.send(value).await.map_err(|err| err.into()) + } +} + +impl crate::rt::Receiver for mpsc::Receiver { + async fn recv(&mut self) -> Option { + self.recv().await + } +} + +struct Rt; + +impl crate::rt::Rt for Rt { + type Sender = mpsc::Sender; + type Receiver = mpsc::Receiver; + + fn channel(buffer: usize) -> (Self::Sender, Self::Receiver) { + mpsc::channel(buffer) + } +} + +impl From> for crate::rt::SendError { + fn from(value: mpsc::error::SendError) -> Self { + Self(value.0) + } +}