From 4ffcd40d8806ebd6a2f7d7acc14bd10a4bc610da Mon Sep 17 00:00:00 2001 From: Praveen Perera Date: Sun, 2 Feb 2025 13:50:01 -0600 Subject: [PATCH] Try using crossbeam actors --- Cargo.lock | 49 ++++++++++++++++++++++ rustywind-cli/Cargo.toml | 4 ++ rustywind-cli/src/main.rs | 33 ++++++++++----- rustywind-cli/src/par.rs | 87 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 163 insertions(+), 10 deletions(-) create mode 100644 rustywind-cli/src/par.rs diff --git a/Cargo.lock b/Cargo.lock index 013d08d..e4f8ae7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -232,6 +232,28 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -251,6 +273,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -362,6 +393,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "humantime" version = "2.1.0" @@ -595,6 +632,16 @@ dependencies = [ "adler2", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.32.2" @@ -747,11 +794,13 @@ dependencies = [ "clap", "color-eyre", "colored", + "crossbeam", "env_logger", "eyre", "ignore", "indoc", "log", + "num_cpus", "once_cell", "regex", "rustywind_core", diff --git a/rustywind-cli/Cargo.toml b/rustywind-cli/Cargo.toml index a239e44..b1b1a70 100644 --- a/rustywind-cli/Cargo.toml +++ b/rustywind-cli/Cargo.toml @@ -47,3 +47,7 @@ ignore = "0.4" # parsing serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" + +# num +num_cpus = "1.16" +crossbeam = "0.8" diff --git a/rustywind-cli/src/main.rs b/rustywind-cli/src/main.rs index 94eaa68..7a4cafe 100644 --- a/rustywind-cli/src/main.rs +++ b/rustywind-cli/src/main.rs @@ -1,5 +1,6 @@ mod cli; mod options; +pub mod par; use ahash::AHashSet as HashSet; use clap::Parser; @@ -8,12 +9,14 @@ use indoc::indoc; use once_cell::sync::Lazy; use options::Options; use options::WriteMode; +use par::Heard; use rustywind_core::sorter; use std::fs; use std::path::Path; use std::path::PathBuf; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; +use std::sync::Arc; static EXIT_ERROR: Lazy = Lazy::new(|| AtomicBool::new(false)); static GRAY: Lazy = Lazy::new(|| colored::CustomColor::new(120, 120, 120)); @@ -99,7 +102,12 @@ fn main() -> Result<()> { color_eyre::install()?; let cli = Cli::parse(); - let options = Options::new_from_cli(cli)?; + + let mut options = Options::new_from_cli(cli)?; + + let search_paths = std::mem::take(&mut options.search_paths); + + let options = Arc::new(options); let rustywind = &options.rustywind; match &options.write_mode { @@ -132,9 +140,8 @@ fn main() -> Result<()> { eprint!("[WARN] No classes were found in STDIN"); } } else { - for file_path in options.search_paths.iter() { - run_on_file_paths(file_path, &options) - } + let heard = Heard::new(options.clone()); + heard.run_on_file_paths(search_paths); // after running on all files, if there was an error, exit with 1 if EXIT_ERROR.load(Ordering::Relaxed) { @@ -145,7 +152,9 @@ fn main() -> Result<()> { Ok(()) } -fn run_on_file_paths(file_path: &Path, options: &Options) { +pub fn run_on_file_paths(file_path: PathBuf, options: Arc) { + let file_path = &file_path; + // if the file is in the ignored_files list return early if should_ignore_current_file(&options.ignored_files, file_path) { log::debug!("file path {file_path:#?} found in ignored_files, will not sort"); @@ -161,11 +170,15 @@ fn run_on_file_paths(file_path: &Path, options: &Options) { match (contents_changed, &options.write_mode) { (_, WriteMode::ToStdOut) => (), - (_, WriteMode::DryRun) => print_file_name(file_path, contents_changed, options), + (_, WriteMode::DryRun) => { + print_file_name(file_path, contents_changed, &options) + } - (true, WriteMode::ToFile) => write_to_file(file_path, &sorted_content, options), + (true, WriteMode::ToFile) => { + write_to_file(file_path, &sorted_content, &options) + } (false, WriteMode::ToFile) => { - print_file_name(file_path, contents_changed, options) + print_file_name(file_path, contents_changed, &options) } // For now print the file contents to the console even if it hasn't changed to @@ -175,8 +188,8 @@ fn run_on_file_paths(file_path: &Path, options: &Options) { (true, WriteMode::ToConsole) => print_file_contents(&sorted_content), (false, WriteMode::ToConsole) => print_file_contents(&sorted_content), - (_, WriteMode::CheckFormatted) => { - print_changed_files(file_path, contents_changed, options); + (contents_changed, WriteMode::CheckFormatted) => { + print_changed_files(file_path, contents_changed, &options); } } } diff --git a/rustywind-cli/src/par.rs b/rustywind-cli/src/par.rs new file mode 100644 index 0000000..779499b --- /dev/null +++ b/rustywind-cli/src/par.rs @@ -0,0 +1,87 @@ +use std::{path::PathBuf, sync::Arc}; + +use crossbeam::channel::{Receiver, Sender}; + +use crate::options::Options; + +#[derive(Debug, Clone)] +pub struct Actor { + pub index: usize, + pub total: usize, + pub receiver: Receiver, + pub options: Arc, +} + +#[derive(Debug)] +pub struct Heard { + pub senders: Vec>, + pub actors: Vec>, +} + +impl Heard { + pub fn new(options: Arc) -> Self { + let physical_cores = num_cpus::get_physical(); + let mut actors = Vec::with_capacity(physical_cores); + let mut senders = Vec::with_capacity(physical_cores); + + for index in 0..physical_cores { + let (sender, receiver) = crossbeam::channel::bounded(1); + let actor = Actor::new(index, physical_cores, receiver, options.clone()).start(); + + senders.push(sender); + actors.push(actor); + } + + Self { senders, actors } + } + + pub fn run_on_file_paths(self, file_paths: Vec) { + for (index, file_path) in file_paths.into_iter().enumerate() { + let sender_idx = index % self.senders.len(); + let sender = self.senders[sender_idx].clone(); + sender.send(file_path).unwrap(); + } + + self.complete(); + } + + pub fn complete(mut self) { + // droping the senders will close the channels + { + let senders = std::mem::take(&mut self.senders); + drop(senders); + } + + // wait for all the threads to finish + for actor in self.actors.into_iter() { + actor.join().unwrap(); + } + } +} + +impl Actor { + pub fn new( + index: usize, + total: usize, + receiver: Receiver, + options: Arc, + ) -> Self { + Self { + index, + total, + receiver, + options, + } + } + + pub fn start(self) -> std::thread::JoinHandle<()> { + let options = self.options.clone(); + + let receiver = self.receiver.clone(); + std::thread::spawn(move || { + for file_path in receiver.iter() { + crate::run_on_file_paths(file_path, options.clone()); + } + }) + } +}