Skip to content

Commit

Permalink
Add is_closed and has_message to the Receiver
Browse files Browse the repository at this point in the history
Allows polling the receiver for the channel state,
without changing it or without pulling any message out.
  • Loading branch information
faern committed Feb 2, 2025
1 parent ad5d1e0 commit d2c4946
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 0 deletions.
36 changes: 36 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,42 @@ impl<T> Receiver<T> {
})
}

/// Returns true if the associated [`Sender`] was dropped before sending a message. Or if
/// the message has already been received.
///
/// If `true` is returned, all future calls to receive methods are guaranteed to return
/// a disconnected error. And future calls to this method is guaranteed to also return `true`.
pub fn is_closed(&self) -> bool {
// SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
// is still alive, meaning that even if the sender was dropped then it would have observed
// the fact that we're still alive and left the responsibility of deallocating the
// channel to us, so `self.channel` is valid
let channel = unsafe { self.channel_ptr.as_ref() };

// ORDERING: We *chose* a Relaxed ordering here as it is sufficient to
// enforce the method's contract. Once true has been observed, it will remain true.
// However, if false is observed, the sender might have just disconnected but this thread
// has not observed it yet.
channel.state.load(Relaxed) == DISCONNECTED
}

/// Returns true if there is a message in the channel, ready to be received.
///
/// If `true` is returned, the next call to a receive method is guaranteed to return
/// a message.
pub fn has_message(&self) -> bool {
// SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
// is still alive, meaning that even if the sender was dropped then it would have observed
// the fact that we're still alive and left the responsibility of deallocating the
// channel to us, so `self.channel` is valid
let channel = unsafe { self.channel_ptr.as_ref() };

// ORDERING: An acquire ordering is used to guarantee no subsequent loads is reordered
// before this one. This upholds the contract that if true is returned, the next call to
// a receive method is guaranteed to also abserve the `MESSAGE` state and return a message.
channel.state.load(Acquire) == MESSAGE
}

/// Begins the process of receiving on the channel by reference. If the message is already
/// ready, or the sender has disconnected, then this function will return the appropriate
/// Result immediately. Otherwise, it will write the waker to memory, check to see if the
Expand Down
16 changes: 16 additions & 0 deletions tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ use helpers::{maybe_loom_model, DropCounter};
fn send_before_try_recv() {
maybe_loom_model(|| {
let (sender, receiver) = oneshot::channel();
assert!(!receiver.has_message());
assert!(sender.send(19i128).is_ok());

assert!(receiver.has_message());
assert_eq!(receiver.try_recv(), Ok(19i128));
assert!(!receiver.has_message());
assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
#[cfg(feature = "std")]
{
Expand Down Expand Up @@ -124,6 +127,7 @@ fn try_recv_with_dropped_sender() {
maybe_loom_model(|| {
let (sender, receiver) = oneshot::channel::<u128>();
mem::drop(sender);
assert!(!receiver.has_message());
receiver.try_recv().unwrap_err();
})
}
Expand Down Expand Up @@ -347,7 +351,19 @@ fn dropping_receiver_disconnects_sender() {
maybe_loom_model(|| {
let (sender, receiver) = oneshot::channel::<()>();
assert!(!sender.is_closed());
assert!(!receiver.is_closed());
drop(receiver);
assert!(sender.is_closed());
});
}

#[test]
fn dropping_sender_disconnects_receiver() {
maybe_loom_model(|| {
let (sender, receiver) = oneshot::channel::<()>();
assert!(!sender.is_closed());
assert!(!receiver.is_closed());
drop(sender);
assert!(receiver.is_closed());
});
}

0 comments on commit d2c4946

Please sign in to comment.