Skip to content

Commit

Permalink
Run all operations within the manager’s thread pool
Browse files Browse the repository at this point in the history
Also allow adjusting the split depth limit
  • Loading branch information
nhusung committed Jul 10, 2024
1 parent 3965fd0 commit 6993705
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 201 deletions.
20 changes: 19 additions & 1 deletion crates/oxidd-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,25 @@ pub trait WorkerManager: Manager + Sync {
/// Get the current number of threads
fn current_num_threads(&self) -> usize;

/// Execute `op_a` and `op_b` in parallel
/// Get the recursion depth up to which operations are split
fn split_depth(&self) -> u32;

/// Set the recursion depth up to which operations are split
///
/// `None` means that the implementation should automatically choose the
/// depth. `Some(0)` means that no operations are split.
fn set_split_depth(&self, depth: Option<u32>);

/// Execute `op` within the thread pool
///
/// If this method is called from another thread pool, it may cooperatively
/// yield execution to that pool until `op` has finished.
fn install<R: Send>(&self, op: impl FnOnce() -> R + Send) -> R;

/// Execute `op_a` and `op_b` in parallel within the thread pool
///
/// Note that the split depth has no influence on this method. Checking
/// whether to split an operation must be done externally.
fn join<RA: Send, RB: Send>(
&self,
op_a: impl FnOnce() -> RA + Send,
Expand Down
65 changes: 41 additions & 24 deletions crates/oxidd-manager-index/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,30 @@
//! | `RC` | Diagram Rules Type Constructor |
//! | `OP` | Operation |
use std::cell::Cell;
use std::cell::UnsafeCell;
use std::cell::{Cell, UnsafeCell};
use std::cmp::Ordering;
use std::hash::Hash;
use std::hash::Hasher;
use std::hash::{Hash, Hasher};
use std::iter::FusedIterator;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering::{Acquire, Relaxed};
use std::sync::Arc;

use bitvec::vec::BitVec;
use crossbeam_utils::CachePadded;
use linear_hashtbl::raw::RawTable;
use oxidd_core::function::EdgeOfFunc;
use oxidd_core::util::GCContainer;
use parking_lot::Condvar;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use parking_lot::RwLock;
use parking_lot::{Condvar, Mutex, MutexGuard, RwLock};
use rayon::ThreadPool;
use rustc_hash::FxHasher;

use oxidd_core::util::AbortOnDrop;
use oxidd_core::util::AllocResult;
use oxidd_core::util::Borrowed;
use oxidd_core::util::DropWith;
use oxidd_core::util::OutOfMemory;
use oxidd_core::DiagramRules;
use oxidd_core::InnerNode;
use oxidd_core::LevelNo;
use oxidd_core::Tag;
use oxidd_core::function::EdgeOfFunc;
use oxidd_core::util::{AbortOnDrop, AllocResult, Borrowed, DropWith, GCContainer, OutOfMemory};
use oxidd_core::{DiagramRules, InnerNode, LevelNo, Tag};

use crate::node::NodeBase;
use crate::terminal_manager::TerminalManager;
use crate::util::Invariant;
use crate::util::TryLock;
use crate::util::{Invariant, TryLock};

// === Type Constructors =======================================================

Expand Down Expand Up @@ -285,6 +272,7 @@ where
reorder_count: u64,
gc_ongoing: TryLock,
workers: ThreadPool,
split_depth: AtomicU32,
}

/// Type "constructor" for the manager from `InnerNodeCons` etc.
Expand Down Expand Up @@ -1088,6 +1076,15 @@ where
}
}

fn auto_split_depth(workers: &rayon::ThreadPool) -> u32 {
let threads = workers.current_num_threads();
if threads > 1 {
(4096 * threads).ilog2()
} else {
0
}
}

impl<'id, N, ET, TM, R, MD, const TERMINALS: usize> oxidd_core::WorkerManager
for Manager<'id, N, ET, TM, R, MD, TERMINALS>
where
Expand All @@ -1102,6 +1099,24 @@ where
self.workers.current_num_threads()
}

#[inline(always)]
fn split_depth(&self) -> u32 {
self.split_depth.load(Relaxed)
}

fn set_split_depth(&self, depth: Option<u32>) {
let depth = match depth {
Some(d) => d,
None => auto_split_depth(&self.workers),
};
self.split_depth.store(depth, Relaxed);
}

#[inline]
fn install<RA: Send>(&self, op: impl FnOnce() -> RA + Send) -> RA {
self.workers.install(op)
}

#[inline]
fn join<RA: Send, RB: Send>(
&self,
Expand All @@ -1112,10 +1127,10 @@ where
}

#[inline]
fn broadcast<RES: Send>(
fn broadcast<RA: Send>(
&self,
op: impl Fn(oxidd_core::BroadcastContext) -> RES + Sync,
) -> Vec<RES> {
op: impl Fn(oxidd_core::BroadcastContext) -> RA + Sync,
) -> Vec<RA> {
self.workers.broadcast(|ctx| {
op(oxidd_core::BroadcastContext {
index: ctx.index() as u32,
Expand Down Expand Up @@ -2002,6 +2017,7 @@ pub fn new_manager<
.thread_name(|i| format!("oxidd mi {i}")) // "mi" for "manager index"
.build()
.expect("could not build thread pool");
let split_depth = AtomicU32::new(auto_split_depth(&workers));

let gc_lwm = inner_node_capacity / 100 * 90;
let gc_hwm = inner_node_capacity / 100 * 95;
Expand All @@ -2027,6 +2043,7 @@ pub fn new_manager<
reorder_count: 0,
gc_ongoing: TryLock::new(),
workers,
split_depth,
}),
terminal_manager: TMC::T::<'static>::with_capacity(terminal_node_capacity),
gc_signal: (Mutex::new(GCSignal::RunGc), Condvar::new()),
Expand Down
76 changes: 45 additions & 31 deletions crates/oxidd-manager-pointer/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,49 +16,31 @@
use std::cmp::Ordering;
use std::collections::HashMap;
use std::hash::BuildHasherDefault;
use std::hash::Hash;
use std::hash::Hasher;
use std::hash::{BuildHasherDefault, Hash, Hasher};
use std::iter::FusedIterator;
use std::marker::PhantomData;
use std::mem::align_of;
use std::mem::ManuallyDrop;
use std::mem::MaybeUninit;
use std::ptr::addr_of;
use std::ptr::addr_of_mut;
use std::ptr::NonNull;

use arcslab::ArcSlab;
use arcslab::ArcSlabRef;
use arcslab::AtomicRefCounted;
use arcslab::ExtHandle;
use arcslab::IntHandle;
use std::mem::{align_of, ManuallyDrop, MaybeUninit};
use std::ptr::{addr_of, addr_of_mut, NonNull};
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering::Relaxed;

use arcslab::{ArcSlab, ArcSlabRef, AtomicRefCounted, ExtHandle, IntHandle};
use bitvec::bitvec;
use bitvec::vec::BitVec;
use linear_hashtbl::raw::RawTable;
use oxidd_core::util::GCContainer;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use rustc_hash::FxHasher;

use oxidd_core::function::EdgeOfFunc;
use oxidd_core::util::AbortOnDrop;
use oxidd_core::util::AllocResult;
use oxidd_core::util::Borrowed;
use oxidd_core::util::DropWith;
use oxidd_core::DiagramRules;
use oxidd_core::HasApplyCache;
use oxidd_core::InnerNode;
use oxidd_core::LevelNo;
use oxidd_core::Node;
use oxidd_core::Tag;
use oxidd_core::util::{AbortOnDrop, AllocResult, Borrowed, DropWith, GCContainer};
use oxidd_core::{DiagramRules, HasApplyCache, InnerNode, LevelNo, Node, Tag};

use crate::node::NodeBase;
use crate::terminal_manager::TerminalManager;
use crate::util;
use crate::util::rwlock::RwLock;
use crate::util::Invariant;
use crate::util::TryLock;
use crate::util::{Invariant, TryLock};

// === Type Constructors =======================================================

Expand Down Expand Up @@ -179,6 +161,7 @@ where
gc_ongoing: TryLock,
reorder_count: u64,
workers: rayon::ThreadPool,
split_depth: AtomicU32,
phantom: PhantomData<(TM, R)>,
}

Expand Down Expand Up @@ -292,13 +275,15 @@ where
.build()
.expect("Failed to build thread pool");

let split_depth = AtomicU32::new(auto_split_depth(&workers));
let data = RwLock::new(Manager {
unique_table: Vec::new(),
data: ManuallyDrop::new(data),
store_inner: slot,
gc_ongoing: TryLock::new(),
reorder_count: 0,
workers,
split_depth,
phantom: PhantomData,
});
unsafe { std::ptr::write(addr_of_mut!((*slot).manager), data) };
Expand Down Expand Up @@ -777,6 +762,15 @@ where
}
}

fn auto_split_depth(workers: &rayon::ThreadPool) -> u32 {
let threads = workers.current_num_threads();
if threads > 1 {
(4096 * threads).ilog2()
} else {
0
}
}

impl<'id, N, ET, TM, R, MD, const PAGE_SIZE: usize, const TAG_BITS: u32> oxidd_core::WorkerManager
for Manager<'id, N, ET, TM, R, MD, PAGE_SIZE, TAG_BITS>
where
Expand All @@ -786,10 +780,30 @@ where
R: DiagramRules<Edge<'id, N, ET, TAG_BITS>, N, TM::TerminalNode>,
MD: DropWith<Edge<'id, N, ET, TAG_BITS>> + GCContainer<Self> + Send + Sync,
{
#[inline]
fn current_num_threads(&self) -> usize {
self.workers.current_num_threads()
}

#[inline(always)]
fn split_depth(&self) -> u32 {
self.split_depth.load(Relaxed)
}

fn set_split_depth(&self, depth: Option<u32>) {
let depth = match depth {
Some(d) => d,
None => auto_split_depth(&self.workers),
};
self.split_depth.store(depth, Relaxed);
}

#[inline]
fn install<RA: Send>(&self, op: impl FnOnce() -> RA + Send) -> RA {
self.workers.install(op)
}

#[inline]
fn join<RA: Send, RB: Send>(
&self,
op_a: impl FnOnce() -> RA + Send,
Expand All @@ -798,10 +812,10 @@ where
self.workers.join(op_a, op_b)
}

fn broadcast<RES: Send>(
fn broadcast<RA: Send>(
&self,
op: impl Fn(oxidd_core::BroadcastContext) -> RES + Sync,
) -> Vec<RES> {
op: impl Fn(oxidd_core::BroadcastContext) -> RA + Sync,
) -> Vec<RA> {
self.workers.broadcast(|ctx| {
op(oxidd_core::BroadcastContext {
index: ctx.index() as u32,
Expand Down
Loading

0 comments on commit 6993705

Please sign in to comment.