From 684b2fd289fbce41631bcee6295615668b1997e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linus=20F=C3=A4rnstrand?= Date: Sun, 2 Feb 2025 23:33:03 +0100 Subject: [PATCH] Add is_closed and has_message to the Receiver Allows polling the receiver for the channel state, without changing it or without pulling any message out. --- CHANGELOG.md | 3 +++ src/lib.rs | 36 ++++++++++++++++++++++++++++++++++++ tests/sync.rs | 16 ++++++++++++++++ 3 files changed, 55 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 885acb6..11ffaf7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Added +- Add `is_closed` and `has_message` to the `Receiver`. Allows polling for the channel + state without modifying the channel or pulling the message from it. ## [0.1.9] - 2025-02-02 diff --git a/src/lib.rs b/src/lib.rs index 6ffd726..d2c1765 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -775,6 +775,42 @@ impl Receiver { }) } + /// Returns true if the associated [`Sender`] was dropped before sending a message. Or if + /// the message has already been received. + /// + /// If `true` is returned, all future calls to receive methods are guaranteed to return + /// a disconnected error. And future calls to this method is guaranteed to also return `true`. + pub fn is_closed(&self) -> bool { + // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver + // is still alive, meaning that even if the sender was dropped then it would have observed + // the fact that we're still alive and left the responsibility of deallocating the + // channel to us, so `self.channel` is valid + let channel = unsafe { self.channel_ptr.as_ref() }; + + // ORDERING: We *chose* a Relaxed ordering here as it is sufficient to + // enforce the method's contract. Once true has been observed, it will remain true. + // However, if false is observed, the sender might have just disconnected but this thread + // has not observed it yet. + channel.state.load(Relaxed) == DISCONNECTED + } + + /// Returns true if there is a message in the channel, ready to be received. + /// + /// If `true` is returned, the next call to a receive method is guaranteed to return + /// a message. + pub fn has_message(&self) -> bool { + // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver + // is still alive, meaning that even if the sender was dropped then it would have observed + // the fact that we're still alive and left the responsibility of deallocating the + // channel to us, so `self.channel` is valid + let channel = unsafe { self.channel_ptr.as_ref() }; + + // ORDERING: An acquire ordering is used to guarantee no subsequent loads is reordered + // before this one. This upholds the contract that if true is returned, the next call to + // a receive method is guaranteed to also abserve the `MESSAGE` state and return a message. + channel.state.load(Acquire) == MESSAGE + } + /// Begins the process of receiving on the channel by reference. If the message is already /// ready, or the sender has disconnected, then this function will return the appropriate /// Result immediately. Otherwise, it will write the waker to memory, check to see if the diff --git a/tests/sync.rs b/tests/sync.rs index ae705e1..cfc75df 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -26,9 +26,12 @@ use helpers::{maybe_loom_model, DropCounter}; fn send_before_try_recv() { maybe_loom_model(|| { let (sender, receiver) = oneshot::channel(); + assert!(!receiver.has_message()); assert!(sender.send(19i128).is_ok()); + assert!(receiver.has_message()); assert_eq!(receiver.try_recv(), Ok(19i128)); + assert!(!receiver.has_message()); assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected)); #[cfg(feature = "std")] { @@ -124,6 +127,7 @@ fn try_recv_with_dropped_sender() { maybe_loom_model(|| { let (sender, receiver) = oneshot::channel::(); mem::drop(sender); + assert!(!receiver.has_message()); receiver.try_recv().unwrap_err(); }) } @@ -347,7 +351,19 @@ fn dropping_receiver_disconnects_sender() { maybe_loom_model(|| { let (sender, receiver) = oneshot::channel::<()>(); assert!(!sender.is_closed()); + assert!(!receiver.is_closed()); drop(receiver); assert!(sender.is_closed()); }); } + +#[test] +fn dropping_sender_disconnects_receiver() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::<()>(); + assert!(!sender.is_closed()); + assert!(!receiver.is_closed()); + drop(sender); + assert!(receiver.is_closed()); + }); +}