Skip to content

Commit

Permalink
Add additional helpers for access to the component streams of Join
Browse files Browse the repository at this point in the history
  • Loading branch information
danieldg committed Dec 2, 2022
1 parent e39255d commit 9fb1536
Showing 1 changed file with 52 additions and 0 deletions.
52 changes: 52 additions & 0 deletions src/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,58 @@ where

(self.stream_a, self.stream_b, item)
}

/// Provide direct access to the underlying stream.
///
/// This may be useful if the stream provides APIs beyond [OrderedStream]. Note that the join
/// itself may be buffering an item from this stream, so you should consult
/// [Self::peek_buffered] and, if needed, [Self::take_buffered] before polling it directly.
pub fn stream_a(self: Pin<&mut Self>) -> Pin<&mut A> {
self.project().stream_a
}

/// Provide direct access to the underlying stream.
///
/// This may be useful if the stream provides APIs beyond [OrderedStream]. Note that the join
/// itself may be buffering an item from this stream, so you should consult
/// [Self::peek_buffered] and, if needed, [Self::take_buffered] before polling it directly.
pub fn stream_b(self: Pin<&mut Self>) -> Pin<&mut B> {
self.project().stream_b
}

/// Allow access to the buffered item, if any.
///
/// At most one of the two sides will be `Some`. The returned item is a candidate for being
/// the next item returned by the joined stream, but it could not be returned by the most
/// recent [`OrderedStream::poll_next_before`] call.
pub fn peek_buffered(
self: Pin<&mut Self>,
) -> (
Option<(&mut A::Data, &A::Ordering)>,
Option<(&mut B::Data, &B::Ordering)>,
) {
match self.project().state {
JoinState::A(a, o) => (Some((a, o)), None),
JoinState::B(b, o) => (None, Some((b, o))),
_ => (None, None),
}
}

/// Remove the buffered item, if one is present.
///
/// This does not poll either underlying stream. See [Self::peek_buffered] for details on why
/// buffering exists.
pub fn take_buffered(self: Pin<&mut Self>) -> Option<(A::Data, A::Ordering)> {
let state = self.project().state;
match mem::replace(state, JoinState::None) {
JoinState::A(a, o) => Some((a, o)),
JoinState::B(b, o) => Some((b, o)),
other => {
*state = other;
None
}
}
}
}

impl<A, B> OrderedStream for Join<A, B>
Expand Down

0 comments on commit 9fb1536

Please sign in to comment.