Skip to content

Commit

Permalink
traits for tokio channel()
Browse files Browse the repository at this point in the history
  • Loading branch information
mikea committed Dec 12, 2024
1 parent f82b719 commit 9d4ff3c
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 16 deletions.
26 changes: 14 additions & 12 deletions recapn-rpc/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::chan::{
use crate::client::Client;
use crate::gen::capnp_rpc_capnp as rpc_capnp;
use crate::pipeline::PipelineOp;
use crate::rt::{Receiver, Sender};
use crate::table::{CapTable, Table};
use crate::{Error, ErrorKind};
use rpc_capnp::promised_answer::Op;
Expand Down Expand Up @@ -427,7 +428,7 @@ pub struct ConnectionOptions {
pub reader_options: ReaderOptions,
}

pub struct Connection<T: ?Sized> {
pub struct Connection<T: ?Sized, Rt: crate::rt::Rt> {
exports: ExportTable<Export>,
questions: ExportTable<Question>,
answers: ImportTable<Answer>,
Expand All @@ -439,8 +440,8 @@ pub struct Connection<T: ?Sized> {
conn_events_sender: EventSender,
conn_events: EventReceiver,

cap_messages_sender: crate::tokio::Sender<CapMessage>,
cap_messages: crate::tokio::Receiver<CapMessage>,
cap_messages_sender: Rt::Sender<CapMessage>,
cap_messages: Rt::Receiver<CapMessage>,

call_words_in_flight: usize,

Expand All @@ -451,10 +452,10 @@ pub struct Connection<T: ?Sized> {
outbound: T,
}

impl<T> Connection<T> {
impl<T, Rt: crate::rt::Rt> Connection<T, Rt> {
pub fn new(outbound: T, bootstrap: Client, options: ConnectionOptions) -> Self {
let (conn_events_sender, conn_events) = crate::tokio::unbounded_channel();
let (cap_messages_sender, cap_messages) = crate::tokio::channel(8);
let (cap_messages_sender, cap_messages) = Rt::channel(8);
Connection {
exports: ExportTable::new(),
questions: ExportTable::new(),
Expand All @@ -474,7 +475,7 @@ impl<T> Connection<T> {
}
}

impl<T: MessageOutbound + ?Sized> Connection<T> {
impl<T: MessageOutbound + ?Sized, Rt: crate::rt::Rt + 'static> Connection<T, Rt> {
#[inline]
fn same_connection(&self, id: &ConnectionId) -> bool {
id.0.same_channel(&self.conn_events_sender)
Expand All @@ -498,15 +499,16 @@ impl<T: MessageOutbound + ?Sized> Connection<T> {
self.outbound.send(OutboundMessage { message });

let (sender, receiver) = mpsc::channel(chan::RpcChannel::Bootstrap(id));
let cap_handler = ReceivedCap {
let cap_handler = ReceivedCap::<Rt> {
target: CapTarget::bootstrap(id),
resolution: resolve_receiver,

inbound: receiver,
outbound: self.cap_messages_sender.clone(),
};

crate::tokio::spawn(cap_handler.handle());
let fut = cap_handler.handle();
crate::tokio::spawn(fut);

Client(sender)
}
Expand Down Expand Up @@ -1134,16 +1136,16 @@ enum CapResolution {
}

/// Handles forwarding messages from an internal mpsc into a connection mpsc.
struct ReceivedCap {
struct ReceivedCap<Rt: crate::rt::Rt> {
target: CapTarget,
resolution: crate::tokio::oneshot::Receiver<CapResolution>,

inbound: chan::Receiver,
outbound: crate::tokio::Sender<CapMessage>,
outbound: Rt::Sender<CapMessage>,
}

impl ReceivedCap {
pub async fn handle(mut self) {
impl<Rt: crate::rt::Rt> ReceivedCap<Rt> {
pub async fn handle(mut self) -> () {
let mut handled_message = false;
loop {
crate::tokio::select! {
Expand Down
1 change: 1 addition & 0 deletions recapn-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod pipeline;
pub mod server;
pub mod table;
mod tokio;
mod rt;

pub(crate) use gen::capnp_rpc_capnp as rpc_capnp;
pub use rpc_capnp::exception::Type as ErrorKind;
Expand Down
32 changes: 32 additions & 0 deletions recapn-rpc/src/rt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
pub trait Rt {
type Sender<T: Send>: Sender<T>;
type Receiver<T: Send>: Receiver<T>;

fn channel<T: Send>(buffer: usize) -> (Self::Sender<T>, Self::Receiver<T>);
}

pub trait Sender<T: Send>: Clone + Send {
fn send(&self, value: T) -> impl std::future::Future<Output = Result<(), SendError<T>>> + Send;
}

/// Error returned by the `Sender`.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T: Send>(pub T);

impl<T: Send> core::fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("SendError").finish_non_exhaustive()
}
}

impl<T: Send> core::fmt::Display for SendError<T> {
fn fmt(&self, fmt: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(fmt, "channel closed")
}
}

impl<T: Send> core::error::Error for SendError<T> {}

pub trait Receiver<T: Send> {
async fn recv(&mut self) -> Option<T>;
}
33 changes: 29 additions & 4 deletions recapn-rpc/src/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use tokio::sync::mpsc;

pub use mpsc::channel;
pub use mpsc::unbounded_channel;

pub type UnboundedSender<T> = mpsc::UnboundedSender<T>;
pub type UnboundedReceiver<T> = mpsc::UnboundedReceiver<T>;

pub type Sender<T> = mpsc::Sender<T>;
pub type Receiver<T> = mpsc::Receiver<T>;

pub use tokio::sync::oneshot;

pub use tokio::select;
Expand All @@ -17,3 +13,32 @@ pub use tokio::task::JoinHandle;

pub use tokio::task::spawn;
pub use tokio::task::spawn_local;

impl<T: Send> crate::rt::Sender<T> for mpsc::Sender<T> {
async fn send(&self, value: T) -> Result<(), crate::rt::SendError<T>> {
self.send(value).await.map_err(|err| err.into())
}
}

impl<T: Send> crate::rt::Receiver<T> for mpsc::Receiver<T> {
async fn recv(&mut self) -> Option<T> {
self.recv().await
}
}

struct Rt;

impl crate::rt::Rt for Rt {
type Sender<T: Send> = mpsc::Sender<T>;
type Receiver<T: Send> = mpsc::Receiver<T>;

fn channel<T: Send>(buffer: usize) -> (Self::Sender<T>, Self::Receiver<T>) {
mpsc::channel(buffer)
}
}

impl<T: Send> From<mpsc::error::SendError<T>> for crate::rt::SendError<T> {
fn from(value: mpsc::error::SendError<T>) -> Self {
Self(value.0)
}
}

0 comments on commit 9d4ff3c

Please sign in to comment.