diff --git a/spellcheck.dic b/spellcheck.dic index c30e72ee590..962dce9560b 100644 --- a/spellcheck.dic +++ b/spellcheck.dic @@ -1,4 +1,4 @@ -300 +302 & + < @@ -139,6 +139,7 @@ implementers implementor implementors incrementing +initializer inlining interoperate invariants @@ -299,3 +300,4 @@ Wakers wakeup wakeups workstealing +ZSTs diff --git a/tokio/src/loom/std/atomic_u8.rs b/tokio/src/loom/std/atomic_u8.rs new file mode 100644 index 00000000000..559e71505c2 --- /dev/null +++ b/tokio/src/loom/std/atomic_u8.rs @@ -0,0 +1,35 @@ +use std::cell::UnsafeCell; +use std::fmt; +use std::ops::Deref; +use std::panic; + +/// `AtomicU8` providing an additional `with_mut` function. +pub(crate) struct AtomicU8 { + inner: std::sync::atomic::AtomicU8, +} + +impl AtomicU8 { + pub(crate) const fn new(val: u8) -> AtomicU8 { + let inner = std::sync::atomic::AtomicU8::new(val); + AtomicU8 { inner } + } + + /// Get access to a mutable reference to the inner value. + pub(crate) fn with_mut(&mut self, f: impl FnOnce(&mut u8) -> R) -> R { + f(self.inner.get_mut()) + } +} + +impl Deref for AtomicU8 { + type Target = std::sync::atomic::AtomicU8; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl fmt::Debug for AtomicU8 { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + self.deref().fmt(fmt) + } +} diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index 14e552a9aa5..68006599496 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -3,6 +3,7 @@ mod atomic_u16; mod atomic_u32; mod atomic_u64; +mod atomic_u8; mod atomic_usize; mod barrier; mod mutex; @@ -77,9 +78,10 @@ pub(crate) mod sync { pub(crate) use crate::loom::std::atomic_u16::AtomicU16; pub(crate) use crate::loom::std::atomic_u32::AtomicU32; pub(crate) use crate::loom::std::atomic_u64::{AtomicU64, StaticAtomicU64}; + pub(crate) use crate::loom::std::atomic_u8::AtomicU8; pub(crate) use crate::loom::std::atomic_usize::AtomicUsize; - pub(crate) use std::sync::atomic::{fence, AtomicBool, AtomicPtr, AtomicU8, Ordering}; + pub(crate) use std::sync::atomic::{fence, AtomicBool, AtomicPtr, Ordering}; } pub(crate) use super::barrier::Barrier; diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 3242d3ce2ea..1bb70a51af8 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -144,6 +144,20 @@ macro_rules! cfg_io_driver_impl { } } +macro_rules! cfg_io_driver_or_sync { + ( $( $item:item )* ) => { + $( + #[cfg(any( + feature = "net", + all(unix, feature = "process"), + all(unix, feature = "signal"), + feature = "sync", + ))] + $item + )* + } +} + macro_rules! cfg_not_io_driver { ($($item:item)*) => { $( diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 07120d63411..09069ae8d47 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -193,33 +193,6 @@ impl Semaphore { } } - /// Creates a new closed semaphore with 0 permits. - pub(crate) fn new_closed() -> Self { - Self { - permits: AtomicUsize::new(Self::CLOSED), - waiters: Mutex::new(Waitlist { - queue: LinkedList::new(), - closed: true, - }), - #[cfg(all(tokio_unstable, feature = "tracing"))] - resource_span: tracing::Span::none(), - } - } - - /// Creates a new closed semaphore with 0 permits. - #[cfg(not(all(loom, test)))] - pub(crate) const fn const_new_closed() -> Self { - Self { - permits: AtomicUsize::new(Self::CLOSED), - waiters: Mutex::const_new(Waitlist { - queue: LinkedList::new(), - closed: true, - }), - #[cfg(all(tokio_unstable, feature = "tracing"))] - resource_span: tracing::Span::none(), - } - } - /// Returns the current number of available permits. pub(crate) fn available_permits(&self) -> usize { self.permits.load(Acquire) >> Self::PERMIT_SHIFT diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 1b723048dff..44d7cac3e06 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -1,28 +1,38 @@ -use super::{Semaphore, SemaphorePermit, TryAcquireError}; use crate::loom::cell::UnsafeCell; +use crate::loom::sync::atomic::AtomicU8; +use crate::loom::sync::Mutex; +use crate::util::linked_list::LinkedList; +use crate::util::{linked_list, WakeList}; +use std::convert::Infallible; use std::error::Error; use std::fmt; use std::future::Future; +use std::marker::PhantomPinned; use std::mem::MaybeUninit; use std::ops::Drop; +use std::pin::Pin; use std::ptr; -use std::sync::atomic::{AtomicBool, Ordering}; - -// This file contains an implementation of an OnceCell. The principle -// behind the safety of the cell is that any thread with an `&OnceCell` may -// access the `value` field according the following rules: -// -// 1. When `value_set` is false, the `value` field may be modified by the -// thread holding the permit on the semaphore. -// 2. When `value_set` is true, the `value` field may be accessed immutably by -// any thread. -// -// It is an invariant that if the semaphore is closed, then `value_set` is true. -// The reverse does not necessarily hold — but if not, the semaphore may not -// have any available permits. +use std::ptr::NonNull; +use std::sync::atomic::Ordering; +use std::task::{ready, Context, Poll, Waker}; + +// This file contains an implementation of an OnceCell. Its synchronization relies +// on an atomic state with 3 possible values: +// - STATE_UNSET: the cell is uninitialized +// - STATE_SET: the cell is initialized, its value can be accessed +// - STATE_LOCKED: the cell is locked, its value can be set // -// A thread with a `&mut OnceCell` may modify the value in any way it wants as -// long as the invariants are upheld. +// Initializing the cell is done in 3 steps: +// - acquire the cell lock by setting its state to `STATE_LOCKED` with a CAS +// - writing the cell value +// - setting the state to `STATE_SET` + +/// Cell is uninitialized. +const STATE_UNSET: u8 = 0; +/// Cell is initialized. +const STATE_SET: u8 = 1; +/// Cell is locked for initialization. +const STATE_LOCKED: u8 = 2; /// A thread-safe cell that can be written to only once. /// @@ -68,9 +78,9 @@ use std::sync::atomic::{AtomicBool, Ordering}; /// } /// ``` pub struct OnceCell { - value_set: AtomicBool, + state: AtomicU8, value: UnsafeCell>, - semaphore: Semaphore, + waiters: Mutex::Target>>, } impl Default for OnceCell { @@ -113,23 +123,17 @@ impl Drop for OnceCell { } impl From for OnceCell { + #[track_caller] fn from(value: T) -> Self { - OnceCell { - value_set: AtomicBool::new(true), - value: UnsafeCell::new(MaybeUninit::new(value)), - semaphore: Semaphore::new_closed(), - } + Self::new_with(Some(value)) } } impl OnceCell { /// Creates a new empty `OnceCell` instance. + #[track_caller] pub fn new() -> Self { - OnceCell { - value_set: AtomicBool::new(false), - value: UnsafeCell::new(MaybeUninit::uninit()), - semaphore: Semaphore::new(1), - } + Self::new_with(None) } /// Creates a new empty `OnceCell` instance. @@ -167,9 +171,9 @@ impl OnceCell { #[cfg(not(all(loom, test)))] pub const fn const_new() -> Self { OnceCell { - value_set: AtomicBool::new(false), + state: AtomicU8::new(STATE_UNSET), value: UnsafeCell::new(MaybeUninit::uninit()), - semaphore: Semaphore::const_new(1), + waiters: Mutex::const_new(LinkedList::new()), } } @@ -182,11 +186,16 @@ impl OnceCell { // and tokio MSRV is bumped to the rustc version with it stabilised, // we can make this function available in const context, // by creating `Semaphore::const_new_closed`. + #[track_caller] pub fn new_with(value: Option) -> Self { - if let Some(v) = value { - OnceCell::from(v) - } else { - OnceCell::new() + let (state, value) = match value { + Some(v) => (STATE_SET, MaybeUninit::new(v)), + None => (STATE_UNSET, MaybeUninit::uninit()), + }; + Self { + state: AtomicU8::new(state), + value: UnsafeCell::new(value), + waiters: Mutex::new(LinkedList::new()), } } @@ -222,9 +231,9 @@ impl OnceCell { #[cfg(not(all(loom, test)))] pub const fn const_new_with(value: T) -> Self { OnceCell { - value_set: AtomicBool::new(true), + state: AtomicU8::new(STATE_SET), value: UnsafeCell::new(MaybeUninit::new(value)), - semaphore: Semaphore::const_new_closed(), + waiters: Mutex::const_new(LinkedList::new()), } } @@ -233,13 +242,13 @@ impl OnceCell { pub fn initialized(&self) -> bool { // Using acquire ordering so any threads that read a true from this // atomic is able to read the value. - self.value_set.load(Ordering::Acquire) + self.state.load(Ordering::Acquire) == STATE_SET } /// Returns `true` if the `OnceCell` currently contains a value, and `false` /// otherwise. fn initialized_mut(&mut self) -> bool { - *self.value_set.get_mut() + self.state.with_mut(|s| *s == STATE_SET) } // SAFETY: The OnceCell must not be empty. @@ -252,17 +261,19 @@ impl OnceCell { &mut *self.value.with_mut(|ptr| (*ptr).as_mut_ptr()) } - fn set_value(&self, value: T, permit: SemaphorePermit<'_>) -> &T { - // SAFETY: We are holding the only permit on the semaphore. + /// # Safety + /// + /// `set_value` must be called after having locked the state + unsafe fn set_value(&self, value: T) -> &T { + // SAFETY: the state is locked unsafe { self.value.with_mut(|ptr| (*ptr).as_mut_ptr().write(value)); } // Using release ordering so any threads that read a true from this // atomic is able to read the value we just stored. - self.value_set.store(true, Ordering::Release); - self.semaphore.close(); - permit.forget(); + self.state.store(STATE_SET, Ordering::Release); + self.notify_initialized(); // SAFETY: We just initialized the cell. unsafe { self.get_unchecked() } @@ -304,32 +315,143 @@ impl OnceCell { /// [`SetError::AlreadyInitializedError`]: crate::sync::SetError::AlreadyInitializedError /// [`SetError::InitializingError`]: crate::sync::SetError::InitializingError pub fn set(&self, value: T) -> Result<(), SetError> { - if self.initialized() { - return Err(SetError::AlreadyInitializedError(value)); + // special handling for ZSTs + if std::mem::size_of::() == 0 { + return self.set_zst(value); + } + // Try to lock the state, using acquire `Acquire` ordering for both success + // and failure to have happens-before relation with previous initialization + // attempts. + match self.state.compare_exchange( + STATE_UNSET, + STATE_LOCKED, + Ordering::Acquire, + Ordering::Acquire, + ) { + Ok(_) => { + // SAFETY: state has been locked + unsafe { self.set_value(value) }; + Ok(()) + } + Err(STATE_LOCKED) => Err(SetError::InitializingError(value)), + Err(STATE_SET) => Err(SetError::AlreadyInitializedError(value)), + // SAFETY: all possible values of state are covered + Err(_) => unsafe { std::hint::unreachable_unchecked() }, } + } - // Another task might be initializing the cell, in which case - // `try_acquire` will return an error. If we succeed to acquire the - // permit, then we can set the value. - match self.semaphore.try_acquire() { - Ok(permit) => { - debug_assert!(!self.initialized()); - self.set_value(value, permit); + /// ZSTs don't need a two-phase writing, as they don't even need to be written; + /// `MaybeUninit::assume_init` is indeed always safe and valid for ZSTs. + fn set_zst(&self, value: T) -> Result<(), SetError> { + assert_eq!(std::mem::size_of::(), 0); + // Even if there is no value set, user may expect to have `set` + // happens-before `wait_initialized`, and successful `set` happens-before + // failing `set`, so release ordering is used for store and acquire for load. + match self.state.compare_exchange( + STATE_UNSET, + STATE_SET, + Ordering::Release, + Ordering::Acquire, + ) { + Ok(_) => { + // Forget the value, as `T` could implement `Drop`. + std::mem::forget(value); + self.notify_initialized(); Ok(()) } - Err(TryAcquireError::NoPermits) => { - // Some other task is holding the permit. That task is - // currently trying to initialize the value. - Err(SetError::InitializingError(value)) + Err(STATE_LOCKED) => Err(SetError::InitializingError(value)), + Err(STATE_SET) => Err(SetError::AlreadyInitializedError(value)), + // SAFETY: all possible values of state are covered + Err(_) => unsafe { std::hint::unreachable_unchecked() }, + } + } + + /// Notify all waiting tasks. + fn notify_initialized(&self) { + let mut wakers = WakeList::new(); + let mut waiters = self.waiters.lock(); + loop { + while wakers.can_push() { + if let Some(mut waiter) = waiters.pop_front() { + // Safety: we hold the lock, so we can access the waker. + wakers.push(unsafe { waiter.as_mut().waker.take() }.unwrap()); + } else { + // Release the lock before notifying + drop(waiters); + wakers.wake_all(); + return; + } } - Err(TryAcquireError::Closed) => { - // The semaphore was closed. Some other task has initialized - // the value. - Err(SetError::AlreadyInitializedError(value)) + drop(waiters); + wakers.wake_all(); + waiters = self.waiters.lock(); + } + } + + /// Notify one initializer task. + fn notify_unlocked(&self) { + let mut waiters = self.waiters.lock(); + if let Some(mut waiter) = waiters.drain_filter(|w| w.is_initializer).next() { + // SAFETY: we hold the lock, so we can access the waker. + let waker = unsafe { waiter.as_mut().waker.take() }.unwrap(); + drop(waiters); + waker.wake(); + } + } + + /// Wait for the state to have the required value. + /// If the node is pushed in the waiter + fn poll_wait(&self, cx: &mut Context<'_>, node: Pin<&mut Waiter>) -> Poll { + // Clone the waker before locking, a waker clone can be triggering arbitrary code. + let waker = cx.waker().clone(); + let mut waiters = self.waiters.lock(); + // SAFETY: node is not moved out of pinned reference, + // and it is accessed with the lock held + let node = unsafe { node.get_unchecked_mut() }; + // Checks the state with the lock held; if the waiter has been notified, + // then the state is ensured to have been modified. + // Uses acquire ordering as value may be accessed after in case of success. + let state = self.state.load(Ordering::Acquire); + if state == STATE_SET || node.is_initializer && state == STATE_UNSET { + let prev_waker = node.waker.take(); + // Removes the node from the list, in case the task was + // not awakened by the waker (e.g. in a `select!`). + // SAFETY: The node is contained in the list. + let removed = unsafe { waiters.remove(NonNull::from(node)) }; + debug_assert_eq!(removed.is_some(), prev_waker.is_some()); + drop(waiters); + // Drops previous waker after unlocking + drop(prev_waker); + Poll::Ready(state) + } else { + // If there is no waker, it means the node is not queued, + // so it is pushed in the list. + if node.waker.is_none() { + node.waker = Some(waker); + waiters.push_front(NonNull::from(node)); + // Otherwise, replaces the waker if needed. + } else if !node.waker.as_ref().unwrap().will_wake(&waker) { + let prev_waker = node.waker.replace(waker); + drop(waiters); + // Drops previous waker after unlocking + drop(prev_waker); } + Poll::Pending } } + /// Waits for the `OnceCell` to be initialized, and returns a reference + /// to the value stored. + pub async fn wait_initialized(&self) -> &T { + if let Some(value) = self.get() { + return value; + } + let state = WaitFuture::new(self, false).await; + debug_assert_eq!(state, STATE_SET); + // SAFETY: the cell has been initialized + unsafe { self.get_unchecked() } + } + /// Gets the value currently in the `OnceCell`, or initialize it with the /// given asynchronous operation. /// @@ -348,36 +470,9 @@ impl OnceCell { F: FnOnce() -> Fut, Fut: Future, { - crate::trace::async_trace_leaf().await; - - if self.initialized() { - // SAFETY: The OnceCell has been fully initialized. - unsafe { self.get_unchecked() } - } else { - // Here we try to acquire the semaphore permit. Holding the permit - // will allow us to set the value of the OnceCell, and prevents - // other tasks from initializing the OnceCell while we are holding - // it. - match self.semaphore.acquire().await { - Ok(permit) => { - debug_assert!(!self.initialized()); - - // If `f()` panics or `select!` is called, this - // `get_or_init` call is aborted and the semaphore permit is - // dropped. - let value = f().await; - - self.set_value(value, permit) - } - Err(_) => { - debug_assert!(self.initialized()); - - // SAFETY: The semaphore has been closed. This only happens - // when the OnceCell is fully initialized. - unsafe { self.get_unchecked() } - } - } - } + self.get_or_try_init(|| async move { Ok::<_, Infallible>(f().await) }) + .await + .unwrap() } /// Gets the value currently in the `OnceCell`, or initialize it with the @@ -400,35 +495,52 @@ impl OnceCell { { crate::trace::async_trace_leaf().await; - if self.initialized() { - // SAFETY: The OnceCell has been fully initialized. - unsafe { Ok(self.get_unchecked()) } - } else { - // Here we try to acquire the semaphore permit. Holding the permit - // will allow us to set the value of the OnceCell, and prevents - // other tasks from initializing the OnceCell while we are holding - // it. - match self.semaphore.acquire().await { - Ok(permit) => { - debug_assert!(!self.initialized()); - - // If `f()` panics or `select!` is called, this - // `get_or_try_init` call is aborted and the semaphore - // permit is dropped. - let value = f().await; - - match value { - Ok(value) => Ok(self.set_value(value, permit)), - Err(e) => Err(e), + loop { + // Try to lock the state, using acquire `Acquire` ordering for both success + // and failure to have happens-before relation with previous initialization + // attempts. + match self.state.compare_exchange( + STATE_UNSET, + STATE_LOCKED, + Ordering::Acquire, + Ordering::Acquire, + ) { + // State has been successfully locked, + // execute the initializer and set the result. + Ok(_) => { + // If `f().await` panics, is cancelled, or fails, + // the state will be unlocked by the guard. + struct DropGuard<'a, T>(&'a OnceCell); + impl Drop for DropGuard<'_, T> { + fn drop(&mut self) { + // Use release ordering to for this failed initialization + // attempt to happens-before the next one. + self.0.state.store(STATE_UNSET, Ordering::Release); + self.0.notify_unlocked(); + } } + let guard = DropGuard(self); + let value = f().await?; + std::mem::forget(guard); + // SAFETY: state has been locked + return Ok(unsafe { self.set_value(value) }); } - Err(_) => { - debug_assert!(self.initialized()); - - // SAFETY: The semaphore has been closed. This only happens - // when the OnceCell is fully initialized. - unsafe { Ok(self.get_unchecked()) } + // State is currently locked by another initializer, + // wait for it to succeed or fail. + Err(STATE_LOCKED) => { + let state = WaitFuture::new(self, true).await; + // SAFETY: the cell has been initialized + if state == STATE_SET { + return Ok(unsafe { self.get_unchecked() }); + } + debug_assert_eq!(state, STATE_UNSET); + // The other initializer has failed, try to lock the state again + continue; } + // SAFETY: the cell has been initialized + Err(STATE_SET) => return Ok(unsafe { self.get_unchecked() }), + // SAFETY: all possible values of state are covered + Err(_) => unsafe { std::hint::unreachable_unchecked() }, } } } @@ -438,14 +550,14 @@ impl OnceCell { pub fn into_inner(mut self) -> Option { if self.initialized_mut() { // Set to uninitialized for the destructor of `OnceCell` to work properly - *self.value_set.get_mut() = false; + self.state.with_mut(|s| *s = STATE_UNSET); Some(unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) }) } else { None } } - /// Takes ownership of the current value, leaving the cell empty. Returns + /// Takes ownership of the current value, leaving the cell empty. Returns /// `None` if the cell is empty. pub fn take(&mut self) -> Option { std::mem::take(self).into_inner() @@ -464,6 +576,137 @@ unsafe impl Sync for OnceCell {} // it's safe to send it to another thread unsafe impl Send for OnceCell {} +/// An entry in the wait queue. +/// +/// `Waiter` should be accessed with the wait queue lock acquired. +struct Waiter { + /// If the waiter is a cell initializer. + is_initializer: bool, + /// The waker of the awaiting task. A waker is present + /// if and only if the waiter is queued in the wait queue; + /// the invariant must be upheld when the queue lock is released. + waker: Option, + /// Intrusive linked-list pointers. + pointers: linked_list::Pointers, + /// Should not be `Unpin`. + _p: PhantomPinned, +} + +impl Waiter { + fn new(is_initializer: bool) -> Self { + Self { + is_initializer, + waker: None, + pointers: linked_list::Pointers::new(), + _p: PhantomPinned, + } + } +} + +// SAFETY: `Waiter` is forced to be !Unpin. +unsafe impl linked_list::Link for Waiter { + type Handle = NonNull; + type Target = Waiter; + + fn as_raw(handle: &NonNull) -> NonNull { + *handle + } + + unsafe fn from_raw(ptr: NonNull) -> NonNull { + ptr + } + + unsafe fn pointers(target: NonNull) -> NonNull> { + Waiter::addr_of_pointers(target) + } +} + +generate_addr_of_methods! { + impl<> Waiter { + unsafe fn addr_of_pointers(self: NonNull) -> NonNull> { + &self.pointers + } + } +} + +/// Wait for the cell to be initialized, or the state to be unlocked +/// in the case of an initializer. +struct WaitFuture<'a, T> { + node: Waiter, + cell: &'a OnceCell, + /// If `true`, the node *may* be queued in the wait queue. + /// If `false`, the node is not in the wait queue. + maybe_queued: bool, +} + +impl<'a, T> WaitFuture<'a, T> { + /// Initializes the future. + /// + /// If `is_initializer` is true, then the future will wait the state + /// to be unlocked. Otherwise, it waits the cell to be initialized. + fn new(cell: &'a OnceCell, is_initializer: bool) -> Self { + Self { + cell, + node: Waiter::new(is_initializer), + maybe_queued: false, + } + } + + fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &OnceCell, &mut bool) { + fn is_unpin() {} + // SAFETY: all fields other than `node` are `Unpin` + unsafe { + is_unpin::<&OnceCell>(); + + let this = self.get_unchecked_mut(); + ( + Pin::new_unchecked(&mut this.node), + this.cell, + &mut this.maybe_queued, + ) + } + } +} + +impl Drop for WaitFuture<'_, T> { + fn drop(&mut self) { + // If the node is not in the queue, we can skip acquiring the lock. + if !self.maybe_queued { + return; + } + // This is where we ensure safety. The future is being dropped, + // which means we must ensure that the waiter entry is no longer stored + // in the linked list. + let mut waiters = self.cell.waiters.lock(); + + // remove the entry from the list + let node = NonNull::from(&mut self.node); + // SAFETY: we have locked the wait list. + unsafe { waiters.remove(node) }; + } +} + +impl Future for WaitFuture<'_, T> { + type Output = u8; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (node, cell, maybe_queued) = self.project(); + let coop = ready!(crate::task::coop::poll_proceed(cx)); + + match cell.poll_wait(cx, node) { + Poll::Ready(state) => { + coop.made_progress(); + *maybe_queued = false; + Poll::Ready(state) + } + Poll::Pending => { + *maybe_queued = true; + Poll::Pending + } + } + } +} + /// Errors that can be returned from [`OnceCell::set`]. /// /// [`OnceCell::set`]: crate::sync::OnceCell::set diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 97963afddc2..09da727f790 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -503,25 +503,6 @@ impl Semaphore { } } - /// Creates a new closed semaphore with 0 permits. - pub(crate) fn new_closed() -> Self { - Self { - ll_sem: ll::Semaphore::new_closed(), - #[cfg(all(tokio_unstable, feature = "tracing"))] - resource_span: tracing::Span::none(), - } - } - - /// Creates a new closed semaphore with 0 permits. - #[cfg(not(all(loom, test)))] - pub(crate) const fn const_new_closed() -> Self { - Self { - ll_sem: ll::Semaphore::const_new_closed(), - #[cfg(all(tokio_unstable, feature = "tracing"))] - resource_span: tracing::Span::none(), - } - } - /// Returns the current number of available permits. pub fn available_permits(&self) -> usize { self.ll_sem.available_permits() diff --git a/tokio/src/sync/tests/loom_once_cell.rs b/tokio/src/sync/tests/loom_once_cell.rs new file mode 100644 index 00000000000..ba40035ceb9 --- /dev/null +++ b/tokio/src/sync/tests/loom_once_cell.rs @@ -0,0 +1,106 @@ +use crate::sync::OnceCell; +use loom::future::block_on; +use loom::sync::atomic::{AtomicUsize, Ordering}; +use loom::sync::Arc; +use loom::thread; + +#[test] +fn zst() { + loom::model(|| { + let cell = Arc::new(OnceCell::new()); + let cell2 = cell.clone(); + + let th = thread::spawn(move || { + block_on(async { + cell2.wait_initialized().await; + }); + }); + + cell.set(()).unwrap(); + th.join().unwrap(); + }); +} + +#[test] +fn wait_initialized() { + loom::model(|| { + let cell = Arc::new(OnceCell::new()); + let cell2 = cell.clone(); + + let th = thread::spawn(move || { + block_on(async { + assert_eq!(*cell2.wait_initialized().await, 42); + }); + }); + + cell.set(42).unwrap(); + th.join().unwrap(); + }); +} + +#[test] +fn get_or_init() { + loom::model(|| { + let cell = Arc::new(OnceCell::new()); + let cell2 = cell.clone(); + + let th = + thread::spawn(move || block_on(async { *cell2.get_or_init(|| async { 1 }).await })); + + let value = block_on(async { *cell.get_or_init(|| async { 2 }).await }); + assert!(value == 1 || value == 2); + assert!(th.join().unwrap() == value); + }); +} + +#[test] +fn get_or_try_init() { + loom::model(|| { + let cell = Arc::new(OnceCell::new()); + let cell2 = cell.clone(); + + let th = + thread::spawn(move || block_on(async { *cell2.get_or_init(|| async { 1 }).await })); + + let res = block_on(async { cell.get_or_try_init(|| async { Err(()) }).await }); + assert!(matches!(res, Ok(&1) | Err(()))); + assert!(th.join().unwrap() == 1); + }); +} + +#[test] +fn init_attempt_happens_before() { + loom::model(|| { + let cell = Arc::new(OnceCell::new()); + let cell2 = cell.clone(); + let cell3 = cell.clone(); + let atomic = Arc::new(AtomicUsize::new(0)); + let atomic2 = atomic.clone(); + let atomic3 = atomic.clone(); + + async fn incr_and_fail(atomic: Arc) -> Result { + let atomic_value = atomic.load(Ordering::Relaxed); + atomic.fetch_add(1, Ordering::Relaxed); + Err(atomic_value) + } + + let th1 = thread::spawn(move || { + block_on(async { cell2.get_or_try_init(|| incr_and_fail(atomic2)).await.err() }) + }); + let th2 = thread::spawn(move || { + block_on(async { cell3.get_or_try_init(|| incr_and_fail(atomic3)).await.err() }) + }); + let atomic_value = cell.set(42).ok().map(|_| atomic.load(Ordering::Relaxed)); + + assert!(matches!( + (atomic_value, th1.join().unwrap(), th2.join().unwrap()), + (None, Some(0), Some(1)) + | (None, Some(1), Some(0)) + | (Some(0), None, None) + | (Some(1), Some(0), None) + | (Some(1), None, Some(0)) + | (Some(2), Some(0), Some(1)) + | (Some(2), Some(1), Some(0)), + )); + }); +} diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs index ee76418ac59..597c5aa2e96 100644 --- a/tokio/src/sync/tests/mod.rs +++ b/tokio/src/sync/tests/mod.rs @@ -10,6 +10,7 @@ cfg_loom! { mod loom_list; mod loom_mpsc; mod loom_notify; + mod loom_once_cell; mod loom_oneshot; mod loom_semaphore_batch; mod loom_watch; diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index 3650f87fbb0..cdc94ee03ea 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -263,7 +263,7 @@ impl Default for LinkedList { // ===== impl DrainFilter ===== -cfg_io_driver_impl! { +cfg_io_driver_or_sync! { pub(crate) struct DrainFilter<'a, T: Link, F> { list: &'a mut LinkedList, filter: F, diff --git a/tokio/tests/sync_once_cell.rs b/tokio/tests/sync_once_cell.rs index b662db3add1..214d147da30 100644 --- a/tokio/tests/sync_once_cell.rs +++ b/tokio/tests/sync_once_cell.rs @@ -272,3 +272,28 @@ fn get_or_try_init() { assert_eq!(*result2.unwrap(), 10); }); } + +#[test] +fn wait_initialized() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .unwrap(); + + static ONCE: OnceCell = OnceCell::const_new(); + + rt.block_on(async { + let handle1 = rt.spawn(async { ONCE.wait_initialized().await }); + let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await }); + + time::advance(Duration::from_millis(1)).await; + time::resume(); + + let result1 = handle1.await.unwrap(); + let result2 = handle2.await.unwrap(); + + assert_eq!(*result1, 10); + assert_eq!(*result2, 10); + }); +}