From 95c9ece51f3459521ccac14883d7a853ef239074 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 25 May 2022 11:56:03 -0700 Subject: [PATCH 1/8] wip Signed-off-by: Eliza Weisman --- async/src/task.rs | 22 ++++++- async/src/task/state.rs | 143 +++++++++++++++++++++++++++++++--------- async/src/util.rs | 2 +- async/src/wait/cell.rs | 8 +-- 4 files changed, 135 insertions(+), 40 deletions(-) diff --git a/async/src/task.rs b/async/src/task.rs index 6331bf95..3479b4fd 100644 --- a/async/src/task.rs +++ b/async/src/task.rs @@ -62,8 +62,8 @@ macro_rules! trace_task { ($ptr:expr, $f:ty, $method:literal) => { tracing::trace!( ptr = ?$ptr, - concat!("Task::::", $method), - type_name::<<$f>::Output>() + output = type_name::<<$f>::Output>(), + concat!("Task::", $method), ); }; } @@ -144,11 +144,27 @@ impl Task { unsafe fn poll(ptr: NonNull
) -> Poll<()> { trace_task!(ptr, F, "poll"); let ptr = ptr.cast::(); + + // try to transition the task to the polling state + let state = &ptr.as_ref().header.state; + match test_dbg!(state.start_poll()) { + // transitioned successfully! + Ok(_) => {} + Err(_) => { + // TODO(eliza): could run the dealloc glue here instead of going + // through a ref cycle? + return Poll::Ready(()); + } + } let waker = Waker::from_raw(Self::raw_waker(ptr.as_ptr())); let cx = Context::from_waker(&waker); let pin = Pin::new_unchecked(ptr.cast::().as_mut()); let poll = pin.poll_inner(cx); - if poll.is_ready() { + let completed = poll.is_ready(); + state + .end_poll(completed) + .expect("must transition out of polling"); + if completed { Self::drop_ref(ptr); } diff --git a/async/src/task/state.rs b/async/src/task/state.rs index 4a7ef8e2..98c5662d 100644 --- a/async/src/task/state.rs +++ b/async/src/task/state.rs @@ -12,45 +12,50 @@ pub(crate) struct State(usize); pub(super) struct StateVar(AtomicUsize); impl State { - const RUNNING: PackUsize = PackUsize::least_significant(1); - const NOTIFIED: PackUsize = Self::RUNNING.next(1); - const COMPLETED: PackUsize = Self::NOTIFIED.next(1); - const REFS: PackUsize = Self::COMPLETED.remaining(); - - const REF_ONE: usize = Self::REFS.first_bit(); - const REF_MAX: usize = Self::REFS.raw_mask(); // const STATE_MASK: usize = - // Self::RUNNING.raw_mask() | Self::NOTIFIED.raw_mask() | Self::COMPLETED.raw_mask(); + // RUNNING.raw_mask() | NOTIFIED.raw_mask() | COMPLETED.raw_mask(); #[inline] - pub(crate) fn is_running(self) -> bool { - Self::RUNNING.contained_in_all(self.0) + pub(crate) fn ref_count(self) -> usize { + REFS.unpack(self.0) } #[inline] - pub(crate) fn is_notified(self) -> bool { - Self::NOTIFIED.contained_in_all(self.0) + pub(crate) fn is(self, field: PackUsize) -> bool { + field.contained_in_all(self.0) } #[inline] - pub(crate) fn is_completed(self) -> bool { - Self::NOTIFIED.contained_in_all(self.0) - } - - #[inline] - pub(crate) fn ref_count(self) -> usize { - Self::REFS.unpack(self.0) + pub(crate) fn set(self, field: PackUsize, value: bool) -> Self { + let value = if value { 1 } else { 0 }; + Self(field.pack(value, self.0)) } } impl fmt::Debug for State { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + struct FlagSet(State); + impl FlagSet { + const POLLING: PackUsize = POLLING; + const WOKEN: PackUsize = WOKEN; + const COMPLETED: PackUsize = COMPLETED; + + #[inline] + pub(crate) fn is(&self, field: PackUsize) -> bool { + self.0.is(field) + } + } + + impl fmt::Debug for FlagSet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut _has_states = false; + fmt_bits!(self, f, _has_states, POLLING, WOKEN, COMPLETED); + Ok(()) + } + } f.debug_struct("State") - .field("running", &self.is_running()) - .field("notified", &self.is_notified()) - .field("completed", &self.is_completed()) + .field("state", &FlagSet(*self)) .field("ref_count", &self.ref_count()) - .field("bits", &format_args!("{:#b}", self.0)) .finish() } } @@ -61,11 +66,51 @@ impl fmt::Binary for State { } } +const POLLING: PackUsize = PackUsize::least_significant(1); +const WOKEN: PackUsize = POLLING.next(1); +const COMPLETED: PackUsize = WOKEN.next(1); +const REFS: PackUsize = COMPLETED.remaining(); + +const REF_ONE: usize = REFS.first_bit(); +const REF_MAX: usize = REFS.raw_mask(); // === impl StateVar === impl StateVar { pub fn new() -> Self { - Self(AtomicUsize::new(State::REF_ONE)) + Self(AtomicUsize::new(REF_ONE)) + } + + pub(super) fn start_poll(&self) -> Result<(), State> { + self.transition(|state| { + // Cannot start polling a task which is being polled on another + // thread. + if test_dbg!(state.is(POLLING)) { + return Err(state); + } + + // Cannot start polling a completed task. + if test_dbg!(state.is(COMPLETED)) { + return Err(state); + } + + let new_state = state + // The task is now being polled. + .set(POLLING, true) + // If the task was woken, consume the wakeup. + .set(WOKEN, false); + Ok(test_dbg!(new_state)) + }) + } + + pub(super) fn end_poll(&self, completed: bool) -> Result<(), State> { + self.transition(|state| { + // Cannot end a poll if a task is not being polled! + debug_assert!(state.is(POLLING)); + debug_assert!(!state.is(COMPLETED)); + + let new_state = state.set(POLLING, false).set(COMPLETED, completed); + Ok(test_dbg!(new_state)) + }) } #[inline] @@ -81,7 +126,7 @@ impl StateVar { // another must already provide any required synchronization. // // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) - let old_refs = test_dbg!(self.0.fetch_add(State::REF_ONE, Relaxed)); + let old_refs = test_dbg!(self.0.fetch_add(REF_ONE, Relaxed)); // However we need to guard against massive refcounts in case someone // is `mem::forget`ing tasks. If we don't do this the count can overflow @@ -92,7 +137,7 @@ impl StateVar { // // We abort because such a program is incredibly degenerate, and we // don't care to support it. - if test_dbg!(old_refs > State::REF_MAX) { + if test_dbg!(old_refs > REF_MAX) { panic!("task reference count overflow"); } } @@ -101,10 +146,10 @@ impl StateVar { pub(super) fn drop_ref(&self) -> bool { // Because `cores` is already atomic, we do not need to synchronize // with other threads unless we are going to delete the task. - let old_refs = test_dbg!(self.0.fetch_sub(State::REF_ONE, Release)); + let old_refs = test_dbg!(self.0.fetch_sub(REF_ONE, Release)); // Did we drop the last ref? - if test_dbg!(old_refs != State::REF_ONE) { + if test_dbg!(old_refs != REF_ONE) { return false; } @@ -115,6 +160,40 @@ impl StateVar { pub(super) fn load(&self, order: Ordering) -> State { State(self.0.load(order)) } + + /// Attempts to advance this task's state by running the provided fallible + /// `transition` function on the current [`State`]. + /// + /// The `transition` function should return an error if the desired state + /// transition is not possible from the task's current state, or return `Ok` + /// with a new [`State`] if the transition is possible. + /// + /// # Returns + /// + /// - `Ok(())` if the task was successfully transitioned. + /// - `Err(E)` with the error returned by the transition function if the + /// state transition is not possible. + fn transition( + &self, + mut transition: impl FnMut(State) -> Result, + ) -> Result<(), E> { + let mut current = self.load(Acquire); + loop { + // Try to run the transition function to transition from `current` + // to the next state. If the transition functiion fails (indicating + // that the requested transition is no longer reachable from the + // current state), bail. + let State(next) = transition(current)?; + + match self + .0 + .compare_exchange_weak(current.0, next, AcqRel, Acquire) + { + Ok(_) => return Ok(()), + Err(actual) => current = State(actual), + } + } + } } impl fmt::Debug for StateVar { @@ -130,10 +209,10 @@ mod tests { #[test] fn packing_specs_valid() { PackUsize::assert_all_valid(&[ - ("RUNNING", State::RUNNING), - ("NOTIFIED", State::NOTIFIED), - ("COMPLETED", State::COMPLETED), - ("REFS", State::REFS), + ("POLLING", POLLING), + ("WOKEN", WOKEN), + ("COMPLETED", COMPLETED), + ("REFS", REFS), ]) } } diff --git a/async/src/util.rs b/async/src/util.rs index 0b55d33a..d1156ecd 100644 --- a/async/src/util.rs +++ b/async/src/util.rs @@ -40,7 +40,7 @@ macro_rules! test_trace { macro_rules! fmt_bits { ($self: expr, $f: expr, $has_states: ident, $($name: ident),+) => { $( - if $self.contains(Self::$name) { + if $self.is(Self::$name) { if $has_states { $f.write_str(" | ")?; } diff --git a/async/src/wait/cell.rs b/async/src/wait/cell.rs index 55c6bac1..034117c4 100644 --- a/async/src/wait/cell.rs +++ b/async/src/wait/cell.rs @@ -62,10 +62,10 @@ impl WaitCell { // this is based on tokio's AtomicWaker synchronization strategy match test_dbg!(self.compare_exchange(State::WAITING, State::PARKING, Acquire)) { // someone else is notifying, so don't wait! - Err(actual) if test_dbg!(actual.contains(State::CLOSED)) => { + Err(actual) if test_dbg!(actual.is(State::CLOSED)) => { return wait::closed(); } - Err(actual) if test_dbg!(actual.contains(State::NOTIFYING)) => { + Err(actual) if test_dbg!(actual.is(State::NOTIFYING)) => { waker.wake_by_ref(); crate::loom::hint::spin_loop(); return wait::notified(); @@ -111,7 +111,7 @@ impl WaitCell { waker.wake(); } - if test_dbg!(state.contains(State::CLOSED)) { + if test_dbg!(state.is(State::CLOSED)) { wait::closed() } else { wait::notified() @@ -197,7 +197,7 @@ impl State { const NOTIFYING: Self = Self(0b10); const CLOSED: Self = Self(0b100); - fn contains(self, Self(state): Self) -> bool { + fn is(self, Self(state): Self) -> bool { self.0 & state == state } } From 1cc527077fd86056ac5f16152d8d7a55b4d2282f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 30 May 2022 11:13:57 -0700 Subject: [PATCH 2/8] use bitfield for task states Signed-off-by: Eliza Weisman --- Cargo.lock | 1 + async/Cargo.toml | 1 + async/src/task/state.rs | 110 ++++++++++++++-------------------------- 3 files changed, 41 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2d73762f..3c6eb93d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -532,6 +532,7 @@ dependencies = [ "cordyceps", "futures-util", "loom", + "mycelium-bitfield", "mycelium-util", "mycotest", "pin-project", diff --git a/async/Cargo.toml b/async/Cargo.toml index 5203394b..30b2301c 100644 --- a/async/Cargo.toml +++ b/async/Cargo.toml @@ -10,6 +10,7 @@ license = "MIT" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +mycelium-bitfield = { path = "../bitfield" } mycelium-util = { path = "../util" } mycotest = { path = "../mycotest", default-features = false } cordyceps = { path = "../cordyceps" } diff --git a/async/src/task/state.rs b/async/src/task/state.rs index 98c5662d..41e8b98a 100644 --- a/async/src/task/state.rs +++ b/async/src/task/state.rs @@ -3,76 +3,47 @@ use crate::loom::sync::atomic::{ Ordering::{self, *}, }; use core::fmt; -use mycelium_util::bits::PackUsize; -#[derive(Clone, Copy)] -pub(crate) struct State(usize); +mycelium_bitfield::bitfield! { + /// A bitfield that represents a task's current state. + #[derive(PartialEq, Eq)] + pub(crate) struct State { + /// If set, this task is currently being polled. + pub(crate) const POLLING: bool; + + /// If set, this task's [`Waker`] has been woken. + /// + /// [`Waker`]: core::task::Waker + pub(crate) const WOKEN: bool; + + /// If set, this task's [`Future`] has completed (i.e., it has returned + /// [`Poll::Ready`]). + /// + /// [`Future`]: core::future::Future + /// [`Poll::Ready`]: core::task::Poll::Ready + pub(crate) const COMPLETED: bool; + + /// The number of currently live references to this task. + /// + /// When this is 0, the task may be deallocated. + const REFS = ..; + } + +} #[repr(transparent)] pub(super) struct StateVar(AtomicUsize); impl State { - // const STATE_MASK: usize = - // RUNNING.raw_mask() | NOTIFIED.raw_mask() | COMPLETED.raw_mask(); - #[inline] pub(crate) fn ref_count(self) -> usize { - REFS.unpack(self.0) - } - - #[inline] - pub(crate) fn is(self, field: PackUsize) -> bool { - field.contained_in_all(self.0) - } - - #[inline] - pub(crate) fn set(self, field: PackUsize, value: bool) -> Self { - let value = if value { 1 } else { 0 }; - Self(field.pack(value, self.0)) - } -} - -impl fmt::Debug for State { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - struct FlagSet(State); - impl FlagSet { - const POLLING: PackUsize = POLLING; - const WOKEN: PackUsize = WOKEN; - const COMPLETED: PackUsize = COMPLETED; - - #[inline] - pub(crate) fn is(&self, field: PackUsize) -> bool { - self.0.is(field) - } - } - - impl fmt::Debug for FlagSet { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut _has_states = false; - fmt_bits!(self, f, _has_states, POLLING, WOKEN, COMPLETED); - Ok(()) - } - } - f.debug_struct("State") - .field("state", &FlagSet(*self)) - .field("ref_count", &self.ref_count()) - .finish() - } -} - -impl fmt::Binary for State { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "State({:#b})", self.0) + self.get(Self::REFS) } } -const POLLING: PackUsize = PackUsize::least_significant(1); -const WOKEN: PackUsize = POLLING.next(1); -const COMPLETED: PackUsize = WOKEN.next(1); -const REFS: PackUsize = COMPLETED.remaining(); +const REF_ONE: usize = State::REFS.first_bit(); +const REF_MAX: usize = State::REFS.raw_mask(); -const REF_ONE: usize = REFS.first_bit(); -const REF_MAX: usize = REFS.raw_mask(); // === impl StateVar === impl StateVar { @@ -84,20 +55,20 @@ impl StateVar { self.transition(|state| { // Cannot start polling a task which is being polled on another // thread. - if test_dbg!(state.is(POLLING)) { + if test_dbg!(state.get(State::POLLING)) { return Err(state); } // Cannot start polling a completed task. - if test_dbg!(state.is(COMPLETED)) { + if test_dbg!(state.get(State::COMPLETED)) { return Err(state); } let new_state = state // The task is now being polled. - .set(POLLING, true) + .with(State::POLLING, true) // If the task was woken, consume the wakeup. - .set(WOKEN, false); + .with(State::WOKEN, false); Ok(test_dbg!(new_state)) }) } @@ -105,10 +76,12 @@ impl StateVar { pub(super) fn end_poll(&self, completed: bool) -> Result<(), State> { self.transition(|state| { // Cannot end a poll if a task is not being polled! - debug_assert!(state.is(POLLING)); - debug_assert!(!state.is(COMPLETED)); + debug_assert!(state.get(State::POLLING)); + debug_assert!(!state.get(State::COMPLETED)); - let new_state = state.set(POLLING, false).set(COMPLETED, completed); + let new_state = state + .with(State::POLLING, false) + .with(State::COMPLETED, completed); Ok(test_dbg!(new_state)) }) } @@ -208,11 +181,6 @@ mod tests { #[test] fn packing_specs_valid() { - PackUsize::assert_all_valid(&[ - ("POLLING", POLLING), - ("WOKEN", WOKEN), - ("COMPLETED", COMPLETED), - ("REFS", REFS), - ]) + State::assert_valid() } } From 7da1faf3c2f161963f5d520453ee72795b9a7f8b Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 30 May 2022 13:11:15 -0700 Subject: [PATCH 3/8] task state transitions now basically work Signed-off-by: Eliza Weisman --- async/src/task.rs | 72 +++++++++++------- async/src/task/state.rs | 160 +++++++++++++++++++++++++++++++--------- 2 files changed, 170 insertions(+), 62 deletions(-) diff --git a/async/src/task.rs b/async/src/task.rs index 3479b4fd..75327c5c 100644 --- a/async/src/task.rs +++ b/async/src/task.rs @@ -9,16 +9,17 @@ use cordyceps::{mpsc_queue, Linked}; pub use core::task::{Context, Poll, Waker}; use core::{ any::type_name, - fmt, future::Future, + mem, pin::Pin, ptr::NonNull, task::{RawWaker, RawWakerVTable}, }; +use mycelium_util::fmt; mod state; -use self::state::StateVar; +use self::state::{OrDrop, ScheduleAction, StateCell}; #[derive(Debug)] pub(crate) struct TaskRef(NonNull
); @@ -27,7 +28,7 @@ pub(crate) struct TaskRef(NonNull
); #[derive(Debug)] pub(crate) struct Header { run_queue: mpsc_queue::Links
, - state: StateVar, + state: StateCell, // task_list: list::Links, vtable: &'static Vtable, } @@ -62,7 +63,7 @@ macro_rules! trace_task { ($ptr:expr, $f:ty, $method:literal) => { tracing::trace!( ptr = ?$ptr, - output = type_name::<<$f>::Output>(), + output = %type_name::<<$f>::Output>(), concat!("Task::", $method), ); }; @@ -86,7 +87,7 @@ impl Task { header: Header { run_queue: mpsc_queue::Links::new(), vtable: &Self::TASK_VTABLE, - state: StateVar::new(), + state: StateCell::new(), }, scheduler, inner: UnsafeCell::new(Cell::Future(future)), @@ -94,14 +95,19 @@ impl Task { } fn raw_waker(this: *const Self) -> RawWaker { - unsafe { (*this).header.state.clone_ref() }; RawWaker::new(this as *const (), &Self::WAKER_VTABLE) } + #[inline] + fn state(&self) -> &StateCell { + &self.header.state + } + unsafe fn clone_waker(ptr: *const ()) -> RawWaker { trace_task!(ptr, F, "clone_waker"); - - Self::raw_waker(ptr as *const Self) + let this = ptr as *const Self; + (*this).state().clone_ref(); + Self::raw_waker(this) } unsafe fn drop_waker(ptr: *const ()) { @@ -115,13 +121,20 @@ impl Task { trace_task!(ptr, F, "wake_by_val"); let this = non_null(ptr as *mut ()).cast::(); - Self::schedule(this); + match this.as_ref().state().wake_by_val() { + OrDrop::Drop => drop(Box::from_raw(this.as_ptr())), + OrDrop::Action(ScheduleAction::Enqueue) => Self::schedule(this), + OrDrop::Action(ScheduleAction::None) => {} + } } unsafe fn wake_by_ref(ptr: *const ()) { trace_task!(ptr, F, "wake_by_ref"); - Self::schedule(non_null(ptr as *mut ()).cast::()) + let this = non_null(ptr as *mut ()).cast::(); + if this.as_ref().state().wake_by_ref() == ScheduleAction::Enqueue { + Self::schedule(this); + } } #[inline(always)] @@ -134,7 +147,7 @@ impl Task { #[inline] unsafe fn drop_ref(this: NonNull) { trace_task!(this, F, "drop_ref"); - if !this.as_ref().header.state.drop_ref() { + if !this.as_ref().state().drop_ref() { return; } @@ -143,29 +156,36 @@ impl Task { unsafe fn poll(ptr: NonNull
) -> Poll<()> { trace_task!(ptr, F, "poll"); - let ptr = ptr.cast::(); - + let mut this = ptr.cast::(); + test_trace!(task = ?fmt::alt(this.as_ref())); // try to transition the task to the polling state - let state = &ptr.as_ref().header.state; + let state = &this.as_ref().state(); match test_dbg!(state.start_poll()) { // transitioned successfully! Ok(_) => {} - Err(_) => { + Err(_state) => { // TODO(eliza): could run the dealloc glue here instead of going // through a ref cycle? return Poll::Ready(()); } } - let waker = Waker::from_raw(Self::raw_waker(ptr.as_ptr())); + + // wrap the waker in `ManuallyDrop` because we're converting it from an + // existing task ref, rather than incrementing the task ref count. if + // this waker is consumed during the poll, we don't want to decrement + // its ref count when the poll ends. + let waker = mem::ManuallyDrop::new(Waker::from_raw(Self::raw_waker(this.as_ptr()))); let cx = Context::from_waker(&waker); - let pin = Pin::new_unchecked(ptr.cast::().as_mut()); + + // actually poll the task + let pin = Pin::new_unchecked(this.as_mut()); let poll = pin.poll_inner(cx); - let completed = poll.is_ready(); - state - .end_poll(completed) - .expect("must transition out of polling"); - if completed { - Self::drop_ref(ptr); + + // post-poll state transition + match test_dbg!(state.end_poll(poll.is_ready())) { + OrDrop::Drop => drop(Box::from_raw(this.as_ptr())), + OrDrop::Action(ScheduleAction::Enqueue) => Self::schedule(this), + OrDrop::Action(ScheduleAction::None) => {} } poll @@ -201,9 +221,9 @@ unsafe impl Sync for Task {} impl fmt::Debug for Task { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Task") - .field("future_type", &type_name::()) - .field("output_type", &type_name::()) - .field("scheduler_type", &type_name::()) + // .field("future_type", &fmt::display(type_name::())) + .field("output_type", &fmt::display(type_name::())) + .field("scheduler_type", &fmt::display(type_name::())) .field("header", &self.header) .field("inner", &self.inner) .finish() diff --git a/async/src/task/state.rs b/async/src/task/state.rs index 41e8b98a..43abc563 100644 --- a/async/src/task/state.rs +++ b/async/src/task/state.rs @@ -5,7 +5,7 @@ use crate::loom::sync::atomic::{ use core::fmt; mycelium_bitfield::bitfield! { - /// A bitfield that represents a task's current state. + /// A snapshot of a task's current state. #[derive(PartialEq, Eq)] pub(crate) struct State { /// If set, this task is currently being polled. @@ -31,37 +31,66 @@ mycelium_bitfield::bitfield! { } +/// An atomic cell that stores a task's current [`State`]. #[repr(transparent)] -pub(super) struct StateVar(AtomicUsize); +pub(super) struct StateCell(AtomicUsize); + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub(super) enum ScheduleAction { + /// The task should be enqueued. + Enqueue, + + /// The task does not need to be enqueued. + None, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub(super) enum OrDrop { + /// Another action should be performed. + Action(T), + + /// The task should be deallocated. + Drop, +} + +pub(super) type WakeAction = OrDrop; impl State { #[inline] pub(crate) fn ref_count(self) -> usize { self.get(Self::REFS) } + + fn drop_ref(self) -> Self { + Self(self.0 - REF_ONE) + } + + fn clone_ref(self) -> Self { + Self(self.0 + REF_ONE) + } } const REF_ONE: usize = State::REFS.first_bit(); const REF_MAX: usize = State::REFS.raw_mask(); -// === impl StateVar === +// === impl StateCell === -impl StateVar { +impl StateCell { pub fn new() -> Self { Self(AtomicUsize::new(REF_ONE)) } - pub(super) fn start_poll(&self) -> Result<(), State> { + pub(super) fn start_poll(&self) -> Result { self.transition(|state| { // Cannot start polling a task which is being polled on another // thread. if test_dbg!(state.get(State::POLLING)) { - return Err(state); + return Err(*state); } // Cannot start polling a completed task. if test_dbg!(state.get(State::COMPLETED)) { - return Err(state); + return Err(*state); } let new_state = state @@ -69,20 +98,84 @@ impl StateVar { .with(State::POLLING, true) // If the task was woken, consume the wakeup. .with(State::WOKEN, false); - Ok(test_dbg!(new_state)) + *state = new_state; + Ok(new_state) }) } - pub(super) fn end_poll(&self, completed: bool) -> Result<(), State> { + pub(super) fn end_poll(&self, completed: bool) -> WakeAction { self.transition(|state| { // Cannot end a poll if a task is not being polled! debug_assert!(state.get(State::POLLING)); debug_assert!(!state.get(State::COMPLETED)); - - let new_state = state + let next_state = state .with(State::POLLING, false) .with(State::COMPLETED, completed); - Ok(test_dbg!(new_state)) + + // Was the task woken during the poll? + if !test_dbg!(completed) && test_dbg!(state.get(State::WOKEN)) { + *state = test_dbg!(next_state); + return OrDrop::Action(ScheduleAction::Enqueue); + } + + let next_state = test_dbg!(next_state.drop_ref()); + *state = next_state; + + if next_state.ref_count() == 0 { + OrDrop::Drop + } else { + OrDrop::Action(ScheduleAction::None) + } + }) + } + + /// Transition to the woken state by value, returning `true` if the task + /// should be enqueued. + pub(super) fn wake_by_val(&self) -> WakeAction { + self.transition(|state| { + // If the task was woken *during* a poll, it will be re-queued by the + // scheduler at the end of the poll if needed, so don't enqueue it now. + if test_dbg!(state.get(State::POLLING)) { + *state = state.with(State::WOKEN, true).drop_ref(); + assert!(state.ref_count() > 0); + + return OrDrop::Action(ScheduleAction::None); + } + + // If the task is already completed or woken, we don't need to + // requeue it, but decrement the ref count for the waker that was + // used for this wakeup. + if test_dbg!(state.get(State::COMPLETED)) || test_dbg!(state.get(State::WOKEN)) { + let new_state = state.drop_ref(); + *state = new_state; + return if new_state.ref_count() == 0 { + OrDrop::Drop + } else { + OrDrop::Action(ScheduleAction::None) + }; + } + + // Otherwise, transition to the notified state and enqueue the task. + *state = state.with(State::WOKEN, true).clone_ref(); + OrDrop::Action(ScheduleAction::Enqueue) + }) + } + + /// Transition to the woken state by ref, returning `true` if the task + /// should be enqueued. + pub(super) fn wake_by_ref(&self) -> ScheduleAction { + self.transition(|state| { + if test_dbg!(state.get(State::COMPLETED)) || test_dbg!(state.get(State::WOKEN)) { + return ScheduleAction::None; + } + + if test_dbg!(state.get(State::POLLING)) { + state.set(State::WOKEN, true); + return ScheduleAction::None; + } + + *state = state.with(State::WOKEN, true).clone_ref(); + ScheduleAction::Enqueue }) } @@ -134,48 +227,37 @@ impl StateVar { State(self.0.load(order)) } - /// Attempts to advance this task's state by running the provided fallible + /// Advance this task's state by running the provided /// `transition` function on the current [`State`]. - /// - /// The `transition` function should return an error if the desired state - /// transition is not possible from the task's current state, or return `Ok` - /// with a new [`State`] if the transition is possible. - /// - /// # Returns - /// - /// - `Ok(())` if the task was successfully transitioned. - /// - `Err(E)` with the error returned by the transition function if the - /// state transition is not possible. - fn transition( - &self, - mut transition: impl FnMut(State) -> Result, - ) -> Result<(), E> { + fn transition(&self, mut transition: impl FnMut(&mut State) -> T) -> T { let mut current = self.load(Acquire); loop { - // Try to run the transition function to transition from `current` - // to the next state. If the transition functiion fails (indicating - // that the requested transition is no longer reachable from the - // current state), bail. - let State(next) = transition(current)?; + let mut next = current; + // Run the transition function. + let res = transition(&mut next); + + if current.0 == next.0 { + return res; + } match self .0 - .compare_exchange_weak(current.0, next, AcqRel, Acquire) + .compare_exchange_weak(current.0, next.0, AcqRel, Acquire) { - Ok(_) => return Ok(()), + Ok(_) => return res, Err(actual) => current = State(actual), } } } } -impl fmt::Debug for StateVar { +impl fmt::Debug for StateCell { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.load(Relaxed).fmt(f) } } -#[cfg(test)] +#[cfg(all(test, not(loom)))] mod tests { use super::*; @@ -183,4 +265,10 @@ mod tests { fn packing_specs_valid() { State::assert_valid() } + + #[test] + fn debug_alt() { + let state = StateCell::new(); + println!("{:#?}", state); + } } From 68c0277e17b48aa1fe0406d3cb231b9c4e1b9cf0 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 30 May 2022 14:05:54 -0700 Subject: [PATCH 4/8] add tests for external wakeups Signed-off-by: Eliza Weisman --- async/Cargo.toml | 6 ++- async/src/scheduler.rs | 113 ++++++++++++++++++++++++++++++++++++++++- async/src/wait.rs | 4 +- async/src/wait/cell.rs | 111 +++++++++++++++++++++++++--------------- 4 files changed, 186 insertions(+), 48 deletions(-) diff --git a/async/Cargo.toml b/async/Cargo.toml index 30b2301c..e29edcbe 100644 --- a/async/Cargo.toml +++ b/async/Cargo.toml @@ -21,8 +21,10 @@ package = "tracing" default_features = false git = "https://github.com/tokio-rs/tracing" +[dev-dependencies] +futures-util = "0.3" + [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.5.5", features = ["futures"] } tracing_01 = { package = "tracing", version = "0.1", default_features = false } -tracing_subscriber_03 = { package = "tracing-subscriber", version = "0.3.11", features = ["fmt"] } -futures-util = "0.3" \ No newline at end of file +tracing_subscriber_03 = { package = "tracing-subscriber", version = "0.3.11", features = ["fmt"] } \ No newline at end of file diff --git a/async/src/scheduler.rs b/async/src/scheduler.rs index 9028d22f..ef7d8f15 100644 --- a/async/src/scheduler.rs +++ b/async/src/scheduler.rs @@ -184,7 +184,7 @@ impl Future for Stub { #[cfg(all(test, not(loom)))] mod tests { - use super::test_util::Yield; + use super::test_util::{Chan, Yield}; use super::*; use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use mycelium_util::sync::Lazy; @@ -229,6 +229,57 @@ mod tests { assert!(!tick.has_remaining); } + #[test] + fn notify_future() { + static SCHEDULER: Lazy = Lazy::new(StaticScheduler::new); + static COMPLETED: AtomicUsize = AtomicUsize::new(0); + + let chan = Chan::new(1); + + SCHEDULER.spawn({ + let chan = chan.clone(); + async move { + chan.wait().await; + COMPLETED.fetch_add(1, Ordering::SeqCst); + } + }); + + SCHEDULER.spawn(async move { + Yield::once().await; + chan.notify(); + }); + + dbg!(SCHEDULER.tick()); + + assert_eq!(COMPLETED.load(Ordering::SeqCst), 1); + } + + #[test] + fn notify_external() { + static SCHEDULER: Lazy = Lazy::new(StaticScheduler::new); + static COMPLETED: AtomicUsize = AtomicUsize::new(0); + + let chan = Chan::new(1); + + SCHEDULER.spawn({ + let chan = chan.clone(); + async move { + chan.wait().await; + COMPLETED.fetch_add(1, Ordering::SeqCst); + } + }); + + dbg!(SCHEDULER.tick()); + + std::thread::spawn(move || { + chan.notify(); + }); + + dbg!(SCHEDULER.tick()); + + assert_eq!(COMPLETED.load(Ordering::SeqCst), 1); + } + #[test] fn many_yields() { static SCHEDULER: Lazy = Lazy::new(StaticScheduler::new); @@ -253,7 +304,7 @@ mod tests { #[cfg(all(test, loom))] mod loom { - use super::test_util::Yield; + use super::test_util::{Chan, Yield}; use super::*; use crate::loom::{ self, @@ -261,6 +312,7 @@ mod loom { atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, }, + thread, }; use core::{ future::Future, @@ -316,6 +368,61 @@ mod loom { }) } + #[test] + fn notify_external() { + loom::model(|| { + let scheduler = Scheduler::new(); + let chan = Chan::new(1); + let it_worked = Arc::new(AtomicBool::new(false)); + + scheduler.spawn({ + let it_worked = it_worked.clone(); + let chan = chan.clone(); + track_future(async move { + chan.wait().await; + it_worked.store(true, Ordering::Release); + }) + }); + + thread::spawn(move || { + chan.notify(); + }); + + while scheduler.tick().completed < 1 { + thread::yield_now(); + } + + assert!(it_worked.load(Ordering::Acquire)); + }) + } + + #[test] + fn notify_future() { + loom::model(|| { + let scheduler = Scheduler::new(); + let chan = Chan::new(1); + let it_worked = Arc::new(AtomicBool::new(false)); + + scheduler.spawn({ + let it_worked = it_worked.clone(); + let chan = chan.clone(); + track_future(async move { + chan.wait().await; + it_worked.store(true, Ordering::Release); + }) + }); + + scheduler.spawn(async move { + Yield::once().await; + chan.notify(); + }); + + test_dbg!(scheduler.tick()); + + assert!(it_worked.load(Ordering::Acquire)); + }) + } + #[test] fn schedule_many() { const TASKS: usize = 10; @@ -391,6 +498,8 @@ mod test_util { task::{Context, Poll}, }; + pub(crate) use crate::wait::cell::test_util::Chan; + pub(crate) struct Yield { yields: usize, } diff --git a/async/src/wait.rs b/async/src/wait.rs index cc4d6841..b7309e52 100644 --- a/async/src/wait.rs +++ b/async/src/wait.rs @@ -3,14 +3,14 @@ //! This module implements two types of structure for waiting: a [`WaitCell`], //! which stores a *single* waiting task, and a wait *queue*, which //! stores a queue of waiting tasks. -mod cell; +pub(crate) mod cell; pub use cell::WaitCell; use core::task::Poll; /// An error indicating that a [`WaitCell`] or queue was closed while attempting /// register a waiter. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct Closed(()); pub type WaitResult = Result<(), Closed>; diff --git a/async/src/wait/cell.rs b/async/src/wait/cell.rs index 034117c4..25f6403e 100644 --- a/async/src/wait/cell.rs +++ b/async/src/wait/cell.rs @@ -56,7 +56,7 @@ impl WaitCell { } impl WaitCell { - pub fn wait(&self, waker: &Waker) -> Poll { + pub fn poll_wait(&self, waker: &Waker) -> Poll { tracing::trace!(wait_cell = ?fmt::ptr(self), ?waker, "registering waker"); // this is based on tokio's AtomicWaker synchronization strategy @@ -238,87 +238,114 @@ impl fmt::Debug for State { } } -#[cfg(all(loom, test))] -mod loom { +#[cfg(test)] +#[allow(dead_code)] +pub(crate) mod test_util { use super::*; - use crate::loom::{ - future, - sync::atomic::{AtomicUsize, Ordering::Relaxed}, - thread, - }; - use core::task::Poll; + + use crate::loom::sync::atomic::{AtomicUsize, Ordering::Relaxed}; use std::sync::Arc; - struct Chan { + #[derive(Debug)] + pub(crate) struct Chan { num: AtomicUsize, task: WaitCell, + num_notify: usize, } - const NUM_NOTIFY: usize = 2; + impl Chan { + pub(crate) fn new(num_notify: usize) -> Arc { + Arc::new(Self { + num: AtomicUsize::new(0), + task: WaitCell::new(), + num_notify, + }) + } - async fn wait_on(chan: Arc) { - futures_util::future::poll_fn(move |cx| { - let res = test_dbg!(chan.task.wait(cx.waker())); + pub(crate) async fn wait(self: Arc) { + let this = Arc::downgrade(&self); + drop(self); + futures_util::future::poll_fn(move |cx| { + let this = match this.upgrade() { + Some(this) => this, + None => return Poll::Ready(()), + }; - if NUM_NOTIFY == chan.num.load(Relaxed) { - return Poll::Ready(()); - } + let res = test_dbg!(this.task.poll_wait(cx.waker())); - if res.is_ready() { - return Poll::Ready(()); - } + if this.num_notify == this.num.load(Relaxed) { + return Poll::Ready(()); + } + + if res.is_ready() { + return Poll::Ready(()); + } + + Poll::Pending + }) + .await + } + + pub(crate) fn notify(&self) { + self.num.fetch_add(1, Relaxed); + self.task.notify(); + } - Poll::Pending - }) - .await + pub(crate) fn close(&self) { + self.num.fetch_add(1, Relaxed); + self.task.close(); + } } + impl Drop for Chan { + fn drop(&mut self) { + tracing::debug!(chan = ?fmt::alt(self), "drop") + } + } +} + +#[cfg(all(loom, test))] +mod loom { + use super::*; + use crate::loom::{future, thread}; + + const NUM_NOTIFY: usize = 2; + #[test] - fn basic_notification() { + fn basic_latch() { crate::loom::model(|| { - let chan = Arc::new(Chan { - num: AtomicUsize::new(0), - task: WaitCell::new(), - }); + let chan = test_util::Chan::new(NUM_NOTIFY); for _ in 0..NUM_NOTIFY { let chan = chan.clone(); - thread::spawn(move || { - chan.num.fetch_add(1, Relaxed); - chan.task.notify(); - }); + thread::spawn(move || chan.notify()); } - future::block_on(wait_on(chan)); + future::block_on(chan.wait()); }); } #[test] fn close() { crate::loom::model(|| { - let chan = Arc::new(Chan { - num: AtomicUsize::new(0), - task: WaitCell::new(), - }); + let chan = test_util::Chan::new(NUM_NOTIFY); thread::spawn({ let chan = chan.clone(); move || { - chan.num.fetch_add(1, Relaxed); - chan.task.notify(); + chan.notify(); } }); thread::spawn({ let chan = chan.clone(); move || { - chan.num.fetch_add(1, Relaxed); - chan.task.close(); + chan.close(); } }); - future::block_on(wait_on(chan)); + future::block_on(chan.wait()); }); } } From 0b3b46405d7ea16c7c12830a3b3b82ed70c18a09 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 30 May 2022 14:06:23 -0700 Subject: [PATCH 5/8] fix task leak in wake_by_val Signed-off-by: Eliza Weisman --- async/src/task.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/async/src/task.rs b/async/src/task.rs index 75327c5c..554c534c 100644 --- a/async/src/task.rs +++ b/async/src/task.rs @@ -123,7 +123,18 @@ impl Task { let this = non_null(ptr as *mut ()).cast::(); match this.as_ref().state().wake_by_val() { OrDrop::Drop => drop(Box::from_raw(this.as_ptr())), - OrDrop::Action(ScheduleAction::Enqueue) => Self::schedule(this), + OrDrop::Action(ScheduleAction::Enqueue) => { + // the task should be enqueued. + // + // in the case that the task is enqueued, the state + // transition does *not* decrement the reference count. this is + // in order to avoid dropping the task while it is being + // scheduled. one reference is consumed by enqueuing the task... + Self::schedule(this); + // now that the task has been enqueued, decrement the reference + // count to drop the waker that performed the `wake_by_val`. + Self::drop_ref(this); + } OrDrop::Action(ScheduleAction::None) => {} } } From 9101c971f8367dccb833a56432e9863736260c65 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 30 May 2022 14:32:14 -0700 Subject: [PATCH 6/8] fix drop_refs not masking out state bits Signed-off-by: Eliza Weisman --- async/src/task/state.rs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/async/src/task/state.rs b/async/src/task/state.rs index 43abc563..36b2fc72 100644 --- a/async/src/task/state.rs +++ b/async/src/task/state.rs @@ -192,7 +192,8 @@ impl StateCell { // another must already provide any required synchronization. // // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) - let old_refs = test_dbg!(self.0.fetch_add(REF_ONE, Relaxed)); + let old_refs = self.0.fetch_add(REF_ONE, Relaxed); + test_dbg!(State::REFS.unpack(old_refs)); // However we need to guard against massive refcounts in case someone // is `mem::forget`ing tasks. If we don't do this the count can overflow @@ -210,12 +211,20 @@ impl StateCell { #[inline] pub(super) fn drop_ref(&self) -> bool { - // Because `cores` is already atomic, we do not need to synchronize - // with other threads unless we are going to delete the task. - let old_refs = test_dbg!(self.0.fetch_sub(REF_ONE, Release)); + // We do not need to synchronize with other cores unless we are going to + // delete the task. + let old_refs = self.0.fetch_sub(REF_ONE, Release); + + // Manually shift over the refcount to clear the state bits. We don't + // use the packing spec here, because it would also mask out any high + // bits, and we can avoid doing the bitwise-and (since there are no + // higher bits that are not part of the ref count). This is probably a + // premature optimization lol. + let old_refs = old_refs >> State::REFS.least_significant_index(); + test_dbg!(State::REFS.unpack(old_refs)); // Did we drop the last ref? - if test_dbg!(old_refs != REF_ONE) { + if test_dbg!(old_refs) > 1 { return false; } @@ -232,7 +241,7 @@ impl StateCell { fn transition(&self, mut transition: impl FnMut(&mut State) -> T) -> T { let mut current = self.load(Acquire); loop { - let mut next = current; + let mut next = test_dbg!(current); // Run the transition function. let res = transition(&mut next); From c44bae3fac0030a6382af8835c21d7cdd5882d2a Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 30 May 2022 14:32:31 -0700 Subject: [PATCH 7/8] trace action in wake_by_val Signed-off-by: Eliza Weisman --- async/src/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async/src/task.rs b/async/src/task.rs index 554c534c..5b44d4b4 100644 --- a/async/src/task.rs +++ b/async/src/task.rs @@ -121,7 +121,7 @@ impl Task { trace_task!(ptr, F, "wake_by_val"); let this = non_null(ptr as *mut ()).cast::(); - match this.as_ref().state().wake_by_val() { + match test_dbg!(this.as_ref().state().wake_by_val()) { OrDrop::Drop => drop(Box::from_raw(this.as_ptr())), OrDrop::Action(ScheduleAction::Enqueue) => { // the task should be enqueued. From 41da2f5b82ed9fce3e3d2e9ae5bb50d8229d3067 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 30 May 2022 15:56:47 -0700 Subject: [PATCH 8/8] wip Signed-off-by: Eliza Weisman --- bitfield/src/bitfield.rs | 92 ++++++++++++++++++++++++++++++++-------- 1 file changed, 75 insertions(+), 17 deletions(-) diff --git a/bitfield/src/bitfield.rs b/bitfield/src/bitfield.rs index db3af151..f2d2ad54 100644 --- a/bitfield/src/bitfield.rs +++ b/bitfield/src/bitfield.rs @@ -169,14 +169,15 @@ /// /// let my_bitfield = TypedBitfield::from_bits(0b0011_0101_1001_1110); /// let formatted = format!("{my_bitfield}"); +/// println!("{formatted}"); /// let expected = r#" -/// 00000000000000000011010110011110 -/// └┬─────┘││└┬───┘└┤ -/// │ ││ │ └ ENUM_VALUE: Baz (10) -/// │ ││ └────── SOME_BITS: 39 (100111) -/// │ │└─────────── FLAG_1: true (1) -/// │ └──────────── FLAG_2: false (0) -/// └─────────────────── A_BYTE: 13 (00001101) +/// 000011010110011110 +/// └┬─────┘││└┬───┘└┤ +/// │ ││ │ └ ENUM_VALUE: Baz (10) +/// │ ││ └────── SOME_BITS: 39 (100111) +/// │ │└─────────── FLAG_1: true (1) +/// │ └──────────── FLAG_2: false (0) +/// └─────────────────── A_BYTE: 13 (00001101) /// "#.trim_start(); /// assert_eq!(formatted, expected); /// ``` @@ -332,10 +333,25 @@ macro_rules! bitfield { #[automatically_derived] impl core::fmt::Display for $Name { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut truncated = self.0.leading_zeros(); + let mut width = $T::BITS - truncated; + let most_sig_field = Self::FIELDS[Self::FIELDS.len() - 1].1; + if width < most_sig_field.least_significant_index() + 1 { + width = most_sig_field.least_significant_index() + 3; + truncated = $T::BITS - (most_sig_field.least_significant_index() + 3); + } + let diff = most_sig_field.most_significant_index() as i32 - (width as i32); + if diff > 5 { + width += 1; + truncated -= 1; + } else if diff > 0 { + width += diff as u32; + truncated -= diff as u32; + } f.pad("")?; - writeln!(f, "{:0width$b}", self.0, width = $T::BITS as usize)?; + writeln!(f, "{:0width$b}", self.0, width = width as usize)?; f.pad("")?; - let mut cur_pos = $T::BITS; + let mut cur_pos = width; let mut max_len = 0; let mut rem = 0; let mut fields = Self::FIELDS.iter().rev().peekable(); @@ -345,6 +361,7 @@ macro_rules! bitfield { cur_pos -= 1; } let bits = field.bits(); + let mut sub_bits = bits; match (name, bits) { (name, bits) if name.starts_with("_") => { for _ in 0..bits { @@ -356,8 +373,15 @@ macro_rules! bitfield { (_, 1) => f.write_str("│")?, (_, 2) => f.write_str("└┤")?, (_, bits) => { - f.write_str("└┬")?; - for _ in 0..(bits - 3) { + let n_underlines = if field.most_significant_index() > cur_pos { + f.write_str("⋯ ┬")?; + sub_bits -= (field.most_significant_index() - width); + 4 + } else { + f.write_str("└┬")?; + 3 + }; + for _ in 0..(sub_bits.saturating_sub(n_underlines)) { f.write_str("─")?; } f.write_str("┘")?; @@ -365,11 +389,11 @@ macro_rules! bitfield { } if fields.peek().is_none() { - rem = cur_pos - (bits - 1); + rem = cur_pos - (sub_bits - 1); } max_len = core::cmp::max(max_len, name.len()); - cur_pos -= field.bits() + cur_pos -= sub_bits; } f.write_str("\n")?; @@ -379,7 +403,7 @@ macro_rules! bitfield { let name = stringify!($Field); if !name.starts_with("_") { f.pad("")?; - cur_pos = $T::BITS; + cur_pos = width; for (cur_name, cur_field) in Self::FIELDS.iter().rev() { while cur_pos > cur_field.most_significant_index() { f.write_str(" ")?; @@ -390,7 +414,13 @@ macro_rules! bitfield { break; } - let bits = cur_field.bits(); + let mut bits = cur_field.bits(); + let whitespace = if cur_field.most_significant_index() > cur_pos { + bits -= (cur_field.most_significant_index() - width); + true + } else { + false + }; match (cur_name, bits) { (name, bits) if name.starts_with("_") => { for _ in 0..bits { @@ -398,6 +428,12 @@ macro_rules! bitfield { } } (_, 1) => f.write_str("│")?, + (_, bits) if whitespace => { + f.write_str(" │")?; + for _ in 0..bits.saturating_sub(3) { + f.write_str(" ")?; + } + } (_, bits) => { f.write_str(" │")?; for _ in 0..(bits - 2) { @@ -409,10 +445,14 @@ macro_rules! bitfield { cur_pos -= bits; } - let field_bits = field.bits(); + let mut field_bits = field.bits(); if field_bits == 1 { f.write_str("└")?; cur_pos -= 1; + } else if field.most_significant_index() > width { + f.write_str(" └")?; + cur_pos -= 3; + field_bits -= truncated; } else { f.write_str(" └")?; cur_pos -= 2; @@ -633,7 +673,7 @@ mod tests { .with(TestBitfield::OF, 0) .with(TestBitfield::FUN, 9); println!("{}", test_bitfield); - + println!("empty:\n{}", TestBitfield::new()); let test_debug = TestDebug { value: 42, bits: test_bitfield, @@ -644,6 +684,24 @@ mod tests { println!("test_debug: {:?}", test_debug) } + #[test] + fn many_leading_zeros() { + bitfield! { + #[allow(dead_code)] + struct ManyLeadingZeros { + const A = 4; + const B: bool; + const C: bool; + const D = ..; + } + } + + let bitfield = ManyLeadingZeros::from_bits(0b1100_1011_0110); + println!("{bitfield}"); + let empty = ManyLeadingZeros::new(); + println!("{empty}"); + } + #[test] fn macro_bitfield_valid() { TestBitfield::assert_valid();