From 6b77e22f30efa4acca39ab741af347bc52239e9a Mon Sep 17 00:00:00 2001 From: Zeeshan Ali Khan Date: Mon, 26 Dec 2022 17:31:25 +0100 Subject: [PATCH 1/6] Add futures_executor and futures_util as dev deps We'll use them in the following commit in tests. --- Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 1a0e7af..3ef4cb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,3 +12,7 @@ keywords = ["async", "stream", "timestamp"] [dependencies] futures-core = "0.3" pin-project-lite = "0.2" + +[dev-dependencies] +futures-executor = "0.3.25" +futures-util = "0.3.25" From a23fbcca2d79982f0dcd3e3982539230e3fcccf3 Mon Sep 17 00:00:00 2001 From: Zeeshan Ali Khan Date: Mon, 26 Dec 2022 17:40:13 +0100 Subject: [PATCH 2/6] Add a test for Join Partially addresses #8. --- src/join.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/join.rs b/src/join.rs index 63d3823..66fc821 100644 --- a/src/join.rs +++ b/src/join.rs @@ -379,3 +379,39 @@ where matches!(self.state, JoinState::Terminated) } } + +#[cfg(test)] +mod test { + use crate::join; + use crate::FromStream; + use crate::OrderedStreamExt; + + pub struct Message { + serial: u32, + } + + #[test] + fn join_two() { + futures_executor::block_on(async { + let stream1 = futures_util::stream::iter([ + Message { serial: 1 }, + Message { serial: 3 }, + Message { serial: 5 }, + ]); + + let stream2 = futures_util::stream::iter([ + Message { serial: 2 }, + Message { serial: 4 }, + Message { serial: 6 }, + ]); + let mut joined = join( + FromStream::with_ordering(stream1, |m| m.serial), + FromStream::with_ordering(stream2, |m| m.serial), + ); + for i in 0..6 { + let msg = joined.next().await.unwrap(); + assert_eq!(msg.serial, i as u32 + 1); + } + }); + } +} From 85cedb4a928e7d0b238bba4f0195f14c58045fc4 Mon Sep 17 00:00:00 2001 From: Zeeshan Ali Khan Date: Mon, 26 Dec 2022 17:33:55 +0100 Subject: [PATCH 3/6] Add a test for JoinMultiple It currently hangs (#10). Partially addresses #8. --- src/multi.rs | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/src/multi.rs b/src/multi.rs index 6e5627d..ed5dd05 100644 --- a/src/multi.rs +++ b/src/multi.rs @@ -127,3 +127,55 @@ where poll_multiple(self.as_pin_mut(), cx, before) } } + +#[cfg(test)] +mod test { + extern crate alloc; + + use crate::FromStream; + use crate::JoinMultiple; + use crate::OrderedStreamExt; + use alloc::boxed::Box; + use alloc::vec::Vec; + use core::pin::Pin; + use futures_core::Stream; + + #[test] + fn join_mutiple() { + futures_executor::block_on(async { + pub struct Message { + serial: u32, + } + + pub struct RemoteLogSource { + stream: Pin>>, + } + + let mut logs = [ + RemoteLogSource { + stream: Box::pin(futures_util::stream::iter([ + Message { serial: 1 }, + Message { serial: 3 }, + Message { serial: 5 }, + ])), + }, + RemoteLogSource { + stream: Box::pin(futures_util::stream::iter([ + Message { serial: 2 }, + Message { serial: 4 }, + Message { serial: 6 }, + ])), + }, + ]; + let streams: Vec<_> = logs + .iter_mut() + .map(|s| FromStream::with_ordering(&mut s.stream, |m| m.serial).peekable()) + .collect(); + let mut joined = JoinMultiple(streams); + for i in 0..6 { + let msg = joined.next().await.unwrap(); + assert_eq!(msg.serial, i as u32 + 1); + } + }); + } +} From e71c8b89d6ec4019d29620d11dc3219311ee4732 Mon Sep 17 00:00:00 2001 From: Zeeshan Ali Khan Date: Mon, 26 Dec 2022 18:04:21 +0100 Subject: [PATCH 4/6] Yield items if we have them This fixes `OrderedStream` impl of `JoinMultiple`. Fixes#10. --- src/multi.rs | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/multi.rs b/src/multi.rs index ed5dd05..ac3cb70 100644 --- a/src/multi.rs +++ b/src/multi.rs @@ -16,7 +16,7 @@ where // The stream with the earliest item that is actually before the given point let mut best: Option> = None; let mut has_data = false; - let mut has_pending = true; + let mut has_pending = false; for mut stream in streams { let best_before = best.as_ref().and_then(|p| p.item().map(|i| &i.0)); let before = match (before, best_before) { @@ -32,24 +32,20 @@ where Poll::Ready(PollResult::NoneBefore) => { has_data = true; } - Poll::Ready(PollResult::Item { ordering, .. }) => { - match before { - // skip the compare if it doesn't matter - _ if has_pending => continue, - Some(max) if max < ordering => continue, - _ => { - best = Some(stream); - } + Poll::Ready(PollResult::Item { ordering, .. }) => match before { + Some(max) if max < ordering => continue, + _ => { + best = Some(stream); } - } + }, } } match best { - _ if has_pending => Poll::Pending, - // This is guaranteed to return PollResult::Item - Some(mut stream) => stream.as_mut().poll_next_before(cx, before), None if has_data => Poll::Ready(PollResult::NoneBefore), + None if has_pending => Poll::Pending, None => Poll::Ready(PollResult::Terminated), + // This is guaranteed to return PollResult::Item + Some(mut stream) => stream.as_mut().poll_next_before(cx, before), } } From 9bfa9e2d3997ed1260ca0bc48fb78b669caa54a2 Mon Sep 17 00:00:00 2001 From: Zeeshan Ali Khan Date: Tue, 27 Dec 2022 00:05:13 +0100 Subject: [PATCH 5/6] impl FusedOrderedStream for Peekable --- src/adapters.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/adapters.rs b/src/adapters.rs index 56ab453..922fb75 100644 --- a/src/adapters.rs +++ b/src/adapters.rs @@ -1006,3 +1006,9 @@ impl OrderedStream for Peekable { } } } + +impl FusedOrderedStream for Peekable { + fn is_terminated(&self) -> bool { + self.stream.is_none() + } +} From 6729e9a8dd7f1c436bfc0e996acafb6ad8368eae Mon Sep 17 00:00:00 2001 From: Zeeshan Ali Khan Date: Tue, 27 Dec 2022 00:05:43 +0100 Subject: [PATCH 6/6] impl FusedOrderedStream for JoinMultiple Fixes #9. --- src/multi.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/multi.rs b/src/multi.rs index ac3cb70..45bf5a7 100644 --- a/src/multi.rs +++ b/src/multi.rs @@ -86,6 +86,17 @@ where } } +impl FusedOrderedStream for JoinMultiple +where + for<'a> &'a mut C: IntoIterator>, + for<'a> &'a C: IntoIterator>, + S: OrderedStream + Unpin, +{ + fn is_terminated(&self) -> bool { + self.0.into_iter().all(|peekable| peekable.is_terminated()) + } +} + pin_project_lite::pin_project! { /// Join a collection of pinned [`OrderedStream`]s. ///