diff --git a/src/adapters.rs b/src/adapters.rs index 08a2234..56ab453 100644 --- a/src/adapters.rs +++ b/src/adapters.rs @@ -560,6 +560,13 @@ pin_project_lite::pin_project! { } } +impl Map { + /// Convert to source stream. + pub fn into_inner(self) -> S { + self.stream + } +} + impl OrderedStream for Map where S: OrderedStream, diff --git a/src/join.rs b/src/join.rs index bccc478..4a764c2 100644 --- a/src/join.rs +++ b/src/join.rs @@ -139,6 +139,26 @@ impl From>> for PollState { } } +impl Join +where + A: OrderedStream, + B: OrderedStream, +{ + /// Split into the source streams. + /// + /// This method returns the source streams along with any buffered item and its + /// ordering. + pub fn into_inner(self) -> (A, B, Option<(A::Data, A::Ordering)>) { + let item = match self.state { + JoinState::A(a, o) => Some((a, o)), + JoinState::B(b, o) => Some((b, o)), + _ => None, + }; + + (self.stream_a, self.stream_b, item) + } +} + impl OrderedStream for Join where A: OrderedStream,