Skip to content

Commit

Permalink
no-std recapn-channel
Browse files Browse the repository at this point in the history
  • Loading branch information
mikea committed Dec 12, 2024
1 parent 9e80d53 commit 36a77ca
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 49 deletions.
4 changes: 4 additions & 0 deletions recapn-channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ version = "0.0.0"
authors = ["Sydney Acksman <obsidianminor@gmail.com>"]
edition = "2021"

[features]
default = ["std"]
std = []

[dependencies]
pin-project = "1.1"
parking_lot = "0.12"
Expand Down
6 changes: 5 additions & 1 deletion recapn-channel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::hash::Hash;
#![cfg_attr(not(feature = "std"), no_std)]

use core::hash::Hash;

extern crate alloc;

pub mod mpsc;
pub mod request;
Expand Down
36 changes: 18 additions & 18 deletions recapn-channel/src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@
//! this set of types combines the channel queue with the oneshot mechanism and makes it
//! so one request makes one allocation.
use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
use std::future::{poll_fn, Future};
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::process::abort;
use std::ptr::{addr_of_mut, NonNull};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, Waker};
use alloc::boxed::Box;
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
use core::fmt::{self, Debug};
use core::future::{poll_fn, Future};
use core::hash::{Hash, Hasher};
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::pin::Pin;
use core::ptr::{addr_of_mut, NonNull};
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering::Relaxed;
use core::task::{Context, Poll, Waker};

use parking_lot::{Mutex, MutexGuard};
use pin_project::{pin_project, pinned_drop};
Expand Down Expand Up @@ -444,7 +444,7 @@ impl<C: Chan + ?Sized> Eq for Sender<C> {}

impl<C: Chan + ?Sized> Hash for Sender<C> {
fn hash<H: Hasher>(&self, state: &mut H) {
std::ptr::hash(Arc::as_ptr(&self.shared), state);
core::ptr::hash(Arc::as_ptr(&self.shared), state);
}
}

Expand Down Expand Up @@ -500,19 +500,19 @@ impl<C: Chan> Receiver<C> {
let self_ptr = core::ptr::from_ref(this.as_ref());
let other_ptr = core::ptr::from_ref(other_shared.as_ref());
match self_ptr.cmp(&other_ptr) {
std::cmp::Ordering::Less => {
core::cmp::Ordering::Less => {
lock_a = this.state.lock();
lock_b = other_shared.state.lock();
self_lock = &mut lock_a;
other_lock = &mut lock_b;
}
std::cmp::Ordering::Greater => {
core::cmp::Ordering::Greater => {
lock_a = other_shared.state.lock();
lock_b = this.state.lock();
self_lock = &mut lock_b;
other_lock = &mut lock_a;
}
std::cmp::Ordering::Equal => return Err(self),
core::cmp::Ordering::Equal => return Err(self),
};

(other, resolution) = other.most_resolved();
Expand Down Expand Up @@ -745,7 +745,7 @@ impl<C: Chan + ?Sized> ResolutionState<C> {
let old = self.sender_count.fetch_add(1, Relaxed);

if old == usize::MAX {
abort();
panic!("out of sender counters");
}

// Make sure I don't accidentally attempt to re-open the channel.
Expand Down
24 changes: 12 additions & 12 deletions recapn-channel/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
//!
//! Request, Response, Pipeline, all in one allocation.
use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
use std::future::{Future, IntoFuture};
use std::hash::Hash;
use std::mem::{ManuallyDrop, MaybeUninit};
use std::ops::Deref;
use std::pin::Pin;
use std::sync::atomic::Ordering::{Acquire, Relaxed};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use alloc::sync::Arc;
use core::cell::UnsafeCell;
use core::fmt::{self, Debug};
use core::future::{Future, IntoFuture};
use core::hash::Hash;
use core::mem::{ManuallyDrop, MaybeUninit};
use core::ops::Deref;
use core::pin::Pin;
use core::sync::atomic::Ordering::{Acquire, Relaxed};
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll};

use hashbrown::hash_map::RawEntryMut;
use hashbrown::{Equivalent, HashMap};
Expand Down Expand Up @@ -250,7 +250,7 @@ impl<C: Chan> SharedRequest<C> {
let old = self.receivers_count.fetch_add(1, Relaxed);

if old == usize::MAX {
std::process::abort();
panic!("out of receiver counters");
}

// Make sure I don't accidentally attempt to re-open the channel.
Expand Down
2 changes: 1 addition & 1 deletion recapn-channel/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod closed_task;
pub mod linked_list;
pub mod wait_list;

use std::sync::Arc;
use alloc::sync::Arc;

/// A **M**ovable **Arc**.
///
Expand Down
4 changes: 2 additions & 2 deletions recapn-channel/src/util/atomic_state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Atomic state shared by channels and requests.
use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release};
use core::sync::atomic::AtomicU8;
use core::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release};

const STATE_SET: u8 = 0b0000_0001;
const STATE_SEND_CLOSED: u8 = 0b0000_0010;
Expand Down
8 changes: 4 additions & 4 deletions recapn-channel/src/util/closed_task.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::Ordering::Acquire;
use std::task::{Context, Poll, Waker};
use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic::Ordering::Acquire;
use core::task::{Context, Poll, Waker};

use super::atomic_state::AtomicState;

Expand Down
5 changes: 4 additions & 1 deletion recapn-channel/src/util/linked_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,10 @@ impl<L: Link> GuardedLinkedList<L, L::Target> {
pub(crate) mod tests {
use super::*;

use std::pin::Pin;
use alloc::boxed::Box;
use alloc::vec;
use alloc::vec::Vec;
use core::pin::Pin;

#[derive(Debug)]
#[repr(C)]
Expand Down
20 changes: 10 additions & 10 deletions recapn-channel/src/util/wait_list.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::cell::UnsafeCell;
use std::marker::PhantomPinned;
use std::pin::{pin, Pin};
use std::ptr::{addr_of_mut, NonNull};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::task::{Context, Waker};
use core::cell::UnsafeCell;
use core::marker::PhantomPinned;
use core::pin::{pin, Pin};
use core::ptr::{addr_of_mut, NonNull};
use core::sync::atomic::AtomicBool;
use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use core::task::{Context, Waker};

use parking_lot::Mutex;
use pin_project::pin_project;
Expand Down Expand Up @@ -146,7 +146,7 @@ impl WaitList {
// that we will not leave any list entry with a pointer to the local
// guard node after this function returns / panics.
let mut list =
RecvWaitersList::new(std::mem::take(&mut *waiters), pinned_guard.as_ref(), self);
RecvWaitersList::new(core::mem::take(&mut *waiters), pinned_guard.as_ref(), self);

const NUM_WAKERS: usize = 32;

Expand Down Expand Up @@ -305,7 +305,7 @@ impl RecvWaiter {
|| matches!(current, Some(w) if w.will_wake(polling_waker));
if should_update {
old_waker =
std::mem::replace(&mut *current, Some(polling_waker.clone()));
core::mem::replace(&mut *current, Some(polling_waker.clone()));
}
}

Expand Down Expand Up @@ -485,7 +485,7 @@ impl ClosedWaiter {
|| matches!(current, Some(w) if w.will_wake(polling_waker));
if should_update {
old_waker =
std::mem::replace(&mut *current, Some(polling_waker.clone()));
core::mem::replace(&mut *current, Some(polling_waker.clone()));
}
}

Expand Down

0 comments on commit 36a77ca

Please sign in to comment.