From d20ba07a9745f1ca99f869af8d128216518f0204 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 30 Aug 2022 17:29:21 +0200 Subject: [PATCH 1/2] Add benchmark for xtra vs tokio mpsc channels --- Cargo.toml | 4 ++ benches/xtra_vs_tokio_mpsc.rs | 116 ++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+) create mode 100644 benches/xtra_vs_tokio_mpsc.rs diff --git a/Cargo.toml b/Cargo.toml index 7440b3bc..05dd06e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,3 +119,7 @@ rustdoc-args = ["--cfg", "docsrs"] [[bench]] name = "throughput" harness = false + +[[bench]] +name = "xtra_vs_tokio_mpsc" +harness = false diff --git a/benches/xtra_vs_tokio_mpsc.rs b/benches/xtra_vs_tokio_mpsc.rs new file mode 100644 index 00000000..1ab1658e --- /dev/null +++ b/benches/xtra_vs_tokio_mpsc.rs @@ -0,0 +1,116 @@ +use std::future::Future; + +use async_trait::async_trait; +use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion}; +use tokio::runtime::Runtime; +use tokio::sync::mpsc; +use xtra::{Actor, Context, Handler}; + +struct Counter(u64); + +#[async_trait] +impl Actor for Counter { + type Stop = (); + async fn stopped(self) -> Self::Stop {} +} + +#[derive(Debug)] +struct Increment {} +struct Stop; + +#[async_trait::async_trait] +impl Handler for Counter { + type Return = (); + + async fn handle(&mut self, _: Increment, _ctx: &mut Context) { + self.0 += 1; + } +} + +#[async_trait::async_trait] +impl Handler for Counter { + type Return = (); + + async fn handle(&mut self, _: Stop, ctx: &mut Context) { + ctx.stop_self(); + } +} + +fn mpsc_counter() -> (mpsc::UnboundedSender, impl Future) { + let (sender, mut receiver) = mpsc::unbounded_channel(); + + let actor = async move { + let mut _counter = 0; + + while let Some(Increment {}) = receiver.recv().await { + _counter += 1; + } + }; + + (sender, actor) +} + +fn xtra_throughput(c: &mut Criterion) { + let mut group = c.benchmark_group("increment"); + let runtime = Runtime::new().unwrap(); + let _g = runtime.enter(); + + for num_messages in [100, 1000, 10000] { + group.bench_with_input( + BenchmarkId::new("xtra", num_messages), + &num_messages, + |b, &num_messages| { + b.to_async(&runtime).iter_batched( + || { + let (xtra_address, xtra_context) = Context::new(None); + runtime.spawn(xtra_context.run(Counter(0))); + + xtra_address + }, + |xtra_address| async move { + for _ in 0..num_messages - 1 { + let _ = xtra_address.send(Increment {}).await.unwrap(); + } + + xtra_address.send(Stop).await.unwrap(); + }, + BatchSize::SmallInput, + ); + }, + ); + } +} + +fn mpsc_throughput(c: &mut Criterion) { + let mut group = c.benchmark_group("increment"); + let runtime = Runtime::new().unwrap(); + let _g = runtime.enter(); + + for num_messages in [100, 1000, 10000] { + group.bench_with_input( + BenchmarkId::new("mpsc", num_messages), + &num_messages, + |b, &num_messages| { + b.to_async(&runtime).iter_batched( + || { + let (mpsc_address, mpsc_actor) = mpsc_counter(); + runtime.spawn(mpsc_actor); + + mpsc_address + }, + |mpsc_address| async move { + for _ in 0..num_messages - 1 { + mpsc_address.send(Increment {}).unwrap(); + } + + drop(mpsc_address); + }, + BatchSize::SmallInput, + ); + }, + ); + } +} + +criterion_group!(benches, xtra_throughput, mpsc_throughput); +criterion_main!(benches); From b0119b9cb2fe0b3e21d32f597ad63351f6934e29 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 30 Aug 2022 17:37:19 +0200 Subject: [PATCH 2/2] Fix clippy --- benches/xtra_vs_tokio_mpsc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benches/xtra_vs_tokio_mpsc.rs b/benches/xtra_vs_tokio_mpsc.rs index 1ab1658e..e683d2fc 100644 --- a/benches/xtra_vs_tokio_mpsc.rs +++ b/benches/xtra_vs_tokio_mpsc.rs @@ -69,7 +69,7 @@ fn xtra_throughput(c: &mut Criterion) { }, |xtra_address| async move { for _ in 0..num_messages - 1 { - let _ = xtra_address.send(Increment {}).await.unwrap(); + xtra_address.send(Increment {}).await.unwrap(); } xtra_address.send(Stop).await.unwrap();