Skip to content

Commit

Permalink
Merge remote-tracking branch 'zeenix/split-join'
Browse files Browse the repository at this point in the history
  • Loading branch information
danieldg committed Dec 2, 2022
2 parents 0b969fb + 2d9655e commit e39255d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,13 @@ pin_project_lite::pin_project! {
}
}

impl<S, F> Map<S, F> {
/// Convert to source stream.
pub fn into_inner(self) -> S {
self.stream
}
}

impl<S, F, R> OrderedStream for Map<S, F>
where
S: OrderedStream,
Expand Down
20 changes: 20 additions & 0 deletions src/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,26 @@ impl<I, T> From<Poll<PollResult<T, I>>> for PollState<I, T> {
}
}

impl<A, B> Join<A, B>
where
A: OrderedStream,
B: OrderedStream<Data = A::Data, Ordering = A::Ordering>,
{
/// Split into the source streams.
///
/// This method returns the source streams along with any buffered item and its
/// ordering.
pub fn into_inner(self) -> (A, B, Option<(A::Data, A::Ordering)>) {
let item = match self.state {
JoinState::A(a, o) => Some((a, o)),
JoinState::B(b, o) => Some((b, o)),
_ => None,
};

(self.stream_a, self.stream_b, item)
}
}

impl<A, B> OrderedStream for Join<A, B>
where
A: OrderedStream,
Expand Down

0 comments on commit e39255d

Please sign in to comment.