Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: truncate leading zeros in bitfield display output #178

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions async/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ license = "MIT"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
mycelium-bitfield = { path = "../bitfield" }
mycelium-util = { path = "../util" }
mycotest = { path = "../mycotest", default-features = false }
cordyceps = { path = "../cordyceps" }
Expand All @@ -20,8 +21,10 @@ package = "tracing"
default_features = false
git = "https://github.com/tokio-rs/tracing"

[dev-dependencies]
futures-util = "0.3"

[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.5.5", features = ["futures"] }
tracing_01 = { package = "tracing", version = "0.1", default_features = false }
tracing_subscriber_03 = { package = "tracing-subscriber", version = "0.3.11", features = ["fmt"] }
futures-util = "0.3"
tracing_subscriber_03 = { package = "tracing-subscriber", version = "0.3.11", features = ["fmt"] }
113 changes: 111 additions & 2 deletions async/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl Future for Stub {

#[cfg(all(test, not(loom)))]
mod tests {
use super::test_util::Yield;
use super::test_util::{Chan, Yield};
use super::*;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use mycelium_util::sync::Lazy;
Expand Down Expand Up @@ -229,6 +229,57 @@ mod tests {
assert!(!tick.has_remaining);
}

#[test]
fn notify_future() {
static SCHEDULER: Lazy<StaticScheduler> = Lazy::new(StaticScheduler::new);
static COMPLETED: AtomicUsize = AtomicUsize::new(0);

let chan = Chan::new(1);

SCHEDULER.spawn({
let chan = chan.clone();
async move {
chan.wait().await;
COMPLETED.fetch_add(1, Ordering::SeqCst);
}
});

SCHEDULER.spawn(async move {
Yield::once().await;
chan.notify();
});

dbg!(SCHEDULER.tick());

assert_eq!(COMPLETED.load(Ordering::SeqCst), 1);
}

#[test]
fn notify_external() {
static SCHEDULER: Lazy<StaticScheduler> = Lazy::new(StaticScheduler::new);
static COMPLETED: AtomicUsize = AtomicUsize::new(0);

let chan = Chan::new(1);

SCHEDULER.spawn({
let chan = chan.clone();
async move {
chan.wait().await;
COMPLETED.fetch_add(1, Ordering::SeqCst);
}
});

dbg!(SCHEDULER.tick());

std::thread::spawn(move || {
chan.notify();
});

dbg!(SCHEDULER.tick());

assert_eq!(COMPLETED.load(Ordering::SeqCst), 1);
}

#[test]
fn many_yields() {
static SCHEDULER: Lazy<StaticScheduler> = Lazy::new(StaticScheduler::new);
Expand All @@ -253,14 +304,15 @@ mod tests {

#[cfg(all(test, loom))]
mod loom {
use super::test_util::Yield;
use super::test_util::{Chan, Yield};
use super::*;
use crate::loom::{
self,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
thread,
};
use core::{
future::Future,
Expand Down Expand Up @@ -316,6 +368,61 @@ mod loom {
})
}

#[test]
fn notify_external() {
loom::model(|| {
let scheduler = Scheduler::new();
let chan = Chan::new(1);
let it_worked = Arc::new(AtomicBool::new(false));

scheduler.spawn({
let it_worked = it_worked.clone();
let chan = chan.clone();
track_future(async move {
chan.wait().await;
it_worked.store(true, Ordering::Release);
})
});

thread::spawn(move || {
chan.notify();
});

while scheduler.tick().completed < 1 {
thread::yield_now();
}

assert!(it_worked.load(Ordering::Acquire));
})
}

#[test]
fn notify_future() {
loom::model(|| {
let scheduler = Scheduler::new();
let chan = Chan::new(1);
let it_worked = Arc::new(AtomicBool::new(false));

scheduler.spawn({
let it_worked = it_worked.clone();
let chan = chan.clone();
track_future(async move {
chan.wait().await;
it_worked.store(true, Ordering::Release);
})
});

scheduler.spawn(async move {
Yield::once().await;
chan.notify();
});

test_dbg!(scheduler.tick());

assert!(it_worked.load(Ordering::Acquire));
})
}

#[test]
fn schedule_many() {
const TASKS: usize = 10;
Expand Down Expand Up @@ -391,6 +498,8 @@ mod test_util {
task::{Context, Poll},
};

pub(crate) use crate::wait::cell::test_util::Chan;

pub(crate) struct Yield {
yields: usize,
}
Expand Down
87 changes: 67 additions & 20 deletions async/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ use cordyceps::{mpsc_queue, Linked};
pub use core::task::{Context, Poll, Waker};
use core::{
any::type_name,
fmt,
future::Future,
mem,
pin::Pin,
ptr::NonNull,
task::{RawWaker, RawWakerVTable},
};

use mycelium_util::fmt;
mod state;

use self::state::StateVar;
use self::state::{OrDrop, ScheduleAction, StateCell};

#[derive(Debug)]
pub(crate) struct TaskRef(NonNull<Header>);
Expand All @@ -27,7 +28,7 @@ pub(crate) struct TaskRef(NonNull<Header>);
#[derive(Debug)]
pub(crate) struct Header {
run_queue: mpsc_queue::Links<Header>,
state: StateVar,
state: StateCell,
// task_list: list::Links<TaskRef>,
vtable: &'static Vtable,
}
Expand Down Expand Up @@ -62,8 +63,8 @@ macro_rules! trace_task {
($ptr:expr, $f:ty, $method:literal) => {
tracing::trace!(
ptr = ?$ptr,
concat!("Task::<Output = {}>::", $method),
type_name::<<$f>::Output>()
output = %type_name::<<$f>::Output>(),
concat!("Task::", $method),
);
};
}
Expand All @@ -86,22 +87,27 @@ impl<S: Schedule, F: Future> Task<S, F> {
header: Header {
run_queue: mpsc_queue::Links::new(),
vtable: &Self::TASK_VTABLE,
state: StateVar::new(),
state: StateCell::new(),
},
scheduler,
inner: UnsafeCell::new(Cell::Future(future)),
})
}

fn raw_waker(this: *const Self) -> RawWaker {
unsafe { (*this).header.state.clone_ref() };
RawWaker::new(this as *const (), &Self::WAKER_VTABLE)
}

#[inline]
fn state(&self) -> &StateCell {
&self.header.state
}

unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
trace_task!(ptr, F, "clone_waker");

Self::raw_waker(ptr as *const Self)
let this = ptr as *const Self;
(*this).state().clone_ref();
Self::raw_waker(this)
}

unsafe fn drop_waker(ptr: *const ()) {
Expand All @@ -115,13 +121,31 @@ impl<S: Schedule, F: Future> Task<S, F> {
trace_task!(ptr, F, "wake_by_val");

let this = non_null(ptr as *mut ()).cast::<Self>();
Self::schedule(this);
match test_dbg!(this.as_ref().state().wake_by_val()) {
OrDrop::Drop => drop(Box::from_raw(this.as_ptr())),
OrDrop::Action(ScheduleAction::Enqueue) => {
// the task should be enqueued.
//
// in the case that the task is enqueued, the state
// transition does *not* decrement the reference count. this is
// in order to avoid dropping the task while it is being
// scheduled. one reference is consumed by enqueuing the task...
Self::schedule(this);
// now that the task has been enqueued, decrement the reference
// count to drop the waker that performed the `wake_by_val`.
Self::drop_ref(this);
}
OrDrop::Action(ScheduleAction::None) => {}
}
}

unsafe fn wake_by_ref(ptr: *const ()) {
trace_task!(ptr, F, "wake_by_ref");

Self::schedule(non_null(ptr as *mut ()).cast::<Self>())
let this = non_null(ptr as *mut ()).cast::<Self>();
if this.as_ref().state().wake_by_ref() == ScheduleAction::Enqueue {
Self::schedule(this);
}
}

#[inline(always)]
Expand All @@ -134,7 +158,7 @@ impl<S: Schedule, F: Future> Task<S, F> {
#[inline]
unsafe fn drop_ref(this: NonNull<Self>) {
trace_task!(this, F, "drop_ref");
if !this.as_ref().header.state.drop_ref() {
if !this.as_ref().state().drop_ref() {
return;
}

Expand All @@ -143,13 +167,36 @@ impl<S: Schedule, F: Future> Task<S, F> {

unsafe fn poll(ptr: NonNull<Header>) -> Poll<()> {
trace_task!(ptr, F, "poll");
let ptr = ptr.cast::<Self>();
let waker = Waker::from_raw(Self::raw_waker(ptr.as_ptr()));
let mut this = ptr.cast::<Self>();
test_trace!(task = ?fmt::alt(this.as_ref()));
// try to transition the task to the polling state
let state = &this.as_ref().state();
match test_dbg!(state.start_poll()) {
// transitioned successfully!
Ok(_) => {}
Err(_state) => {
// TODO(eliza): could run the dealloc glue here instead of going
// through a ref cycle?
return Poll::Ready(());
}
}

// wrap the waker in `ManuallyDrop` because we're converting it from an
// existing task ref, rather than incrementing the task ref count. if
// this waker is consumed during the poll, we don't want to decrement
// its ref count when the poll ends.
let waker = mem::ManuallyDrop::new(Waker::from_raw(Self::raw_waker(this.as_ptr())));
let cx = Context::from_waker(&waker);
let pin = Pin::new_unchecked(ptr.cast::<Self>().as_mut());

// actually poll the task
let pin = Pin::new_unchecked(this.as_mut());
let poll = pin.poll_inner(cx);
if poll.is_ready() {
Self::drop_ref(ptr);

// post-poll state transition
match test_dbg!(state.end_poll(poll.is_ready())) {
OrDrop::Drop => drop(Box::from_raw(this.as_ptr())),
OrDrop::Action(ScheduleAction::Enqueue) => Self::schedule(this),
OrDrop::Action(ScheduleAction::None) => {}
}

poll
Expand Down Expand Up @@ -185,9 +232,9 @@ unsafe impl<S: Sync, F: Future + Sync> Sync for Task<S, F> {}
impl<S, F: Future> fmt::Debug for Task<S, F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Task")
.field("future_type", &type_name::<F>())
.field("output_type", &type_name::<F::Output>())
.field("scheduler_type", &type_name::<S>())
// .field("future_type", &fmt::display(type_name::<F>()))
.field("output_type", &fmt::display(type_name::<F::Output>()))
.field("scheduler_type", &fmt::display(type_name::<S>()))
.field("header", &self.header)
.field("inner", &self.inner)
.finish()
Expand Down
Loading