From e8dae663618ddfd73c42d523ec4b03377a3b5d00 Mon Sep 17 00:00:00 2001 From: Praveen Perera Date: Sun, 2 Feb 2025 13:17:14 -0600 Subject: [PATCH 1/7] Allow `RustyWind` to be cloned --- rustywind-core/src/app.rs | 2 +- rustywind-core/src/sorter.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rustywind-core/src/app.rs b/rustywind-core/src/app.rs index 9bcfc45..bb47b80 100644 --- a/rustywind-core/src/app.rs +++ b/rustywind-core/src/app.rs @@ -10,7 +10,7 @@ use aho_corasick::{Anchored, Input}; use regex::Captures; /// The options to pass to the sorter. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RustyWind { pub regex: FinderRegex, pub sorter: Sorter, diff --git a/rustywind-core/src/sorter.rs b/rustywind-core/src/sorter.rs index d7139e8..a47e33d 100644 --- a/rustywind-core/src/sorter.rs +++ b/rustywind-core/src/sorter.rs @@ -16,7 +16,7 @@ pub(crate) static SORTER_EXTRACTOR_RE: Lazy = Lazy::new(|| Regex::new(r"^(\.[^\s]+)[ ]").unwrap()); /// Use either our default regex in [crate::defaults::RE] or a custom regex. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum FinderRegex { DefaultRegex, CustomRegex(Regex), @@ -34,7 +34,7 @@ impl Deref for FinderRegex { } /// Use either our default sorter in [crate::defaults::SORTER] or a custom sorter. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Sorter { DefaultSorter, CustomSorter(HashMap), From 4ffcd40d8806ebd6a2f7d7acc14bd10a4bc610da Mon Sep 17 00:00:00 2001 From: Praveen Perera Date: Sun, 2 Feb 2025 13:50:01 -0600 Subject: [PATCH 2/7] Try using crossbeam actors --- Cargo.lock | 49 ++++++++++++++++++++++ rustywind-cli/Cargo.toml | 4 ++ rustywind-cli/src/main.rs | 33 ++++++++++----- rustywind-cli/src/par.rs | 87 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 163 insertions(+), 10 deletions(-) create mode 100644 rustywind-cli/src/par.rs diff --git a/Cargo.lock b/Cargo.lock index 013d08d..e4f8ae7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -232,6 +232,28 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -251,6 +273,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -362,6 +393,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "humantime" version = "2.1.0" @@ -595,6 +632,16 @@ dependencies = [ "adler2", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.32.2" @@ -747,11 +794,13 @@ dependencies = [ "clap", "color-eyre", "colored", + "crossbeam", "env_logger", "eyre", "ignore", "indoc", "log", + "num_cpus", "once_cell", "regex", "rustywind_core", diff --git a/rustywind-cli/Cargo.toml b/rustywind-cli/Cargo.toml index a239e44..b1b1a70 100644 --- a/rustywind-cli/Cargo.toml +++ b/rustywind-cli/Cargo.toml @@ -47,3 +47,7 @@ ignore = "0.4" # parsing serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" + +# num +num_cpus = "1.16" +crossbeam = "0.8" diff --git a/rustywind-cli/src/main.rs b/rustywind-cli/src/main.rs index 94eaa68..7a4cafe 100644 --- a/rustywind-cli/src/main.rs +++ b/rustywind-cli/src/main.rs @@ -1,5 +1,6 @@ mod cli; mod options; +pub mod par; use ahash::AHashSet as HashSet; use clap::Parser; @@ -8,12 +9,14 @@ use indoc::indoc; use once_cell::sync::Lazy; use options::Options; use options::WriteMode; +use par::Heard; use rustywind_core::sorter; use std::fs; use std::path::Path; use std::path::PathBuf; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; +use std::sync::Arc; static EXIT_ERROR: Lazy = Lazy::new(|| AtomicBool::new(false)); static GRAY: Lazy = Lazy::new(|| colored::CustomColor::new(120, 120, 120)); @@ -99,7 +102,12 @@ fn main() -> Result<()> { color_eyre::install()?; let cli = Cli::parse(); - let options = Options::new_from_cli(cli)?; + + let mut options = Options::new_from_cli(cli)?; + + let search_paths = std::mem::take(&mut options.search_paths); + + let options = Arc::new(options); let rustywind = &options.rustywind; match &options.write_mode { @@ -132,9 +140,8 @@ fn main() -> Result<()> { eprint!("[WARN] No classes were found in STDIN"); } } else { - for file_path in options.search_paths.iter() { - run_on_file_paths(file_path, &options) - } + let heard = Heard::new(options.clone()); + heard.run_on_file_paths(search_paths); // after running on all files, if there was an error, exit with 1 if EXIT_ERROR.load(Ordering::Relaxed) { @@ -145,7 +152,9 @@ fn main() -> Result<()> { Ok(()) } -fn run_on_file_paths(file_path: &Path, options: &Options) { +pub fn run_on_file_paths(file_path: PathBuf, options: Arc) { + let file_path = &file_path; + // if the file is in the ignored_files list return early if should_ignore_current_file(&options.ignored_files, file_path) { log::debug!("file path {file_path:#?} found in ignored_files, will not sort"); @@ -161,11 +170,15 @@ fn run_on_file_paths(file_path: &Path, options: &Options) { match (contents_changed, &options.write_mode) { (_, WriteMode::ToStdOut) => (), - (_, WriteMode::DryRun) => print_file_name(file_path, contents_changed, options), + (_, WriteMode::DryRun) => { + print_file_name(file_path, contents_changed, &options) + } - (true, WriteMode::ToFile) => write_to_file(file_path, &sorted_content, options), + (true, WriteMode::ToFile) => { + write_to_file(file_path, &sorted_content, &options) + } (false, WriteMode::ToFile) => { - print_file_name(file_path, contents_changed, options) + print_file_name(file_path, contents_changed, &options) } // For now print the file contents to the console even if it hasn't changed to @@ -175,8 +188,8 @@ fn run_on_file_paths(file_path: &Path, options: &Options) { (true, WriteMode::ToConsole) => print_file_contents(&sorted_content), (false, WriteMode::ToConsole) => print_file_contents(&sorted_content), - (_, WriteMode::CheckFormatted) => { - print_changed_files(file_path, contents_changed, options); + (contents_changed, WriteMode::CheckFormatted) => { + print_changed_files(file_path, contents_changed, &options); } } } diff --git a/rustywind-cli/src/par.rs b/rustywind-cli/src/par.rs new file mode 100644 index 0000000..779499b --- /dev/null +++ b/rustywind-cli/src/par.rs @@ -0,0 +1,87 @@ +use std::{path::PathBuf, sync::Arc}; + +use crossbeam::channel::{Receiver, Sender}; + +use crate::options::Options; + +#[derive(Debug, Clone)] +pub struct Actor { + pub index: usize, + pub total: usize, + pub receiver: Receiver, + pub options: Arc, +} + +#[derive(Debug)] +pub struct Heard { + pub senders: Vec>, + pub actors: Vec>, +} + +impl Heard { + pub fn new(options: Arc) -> Self { + let physical_cores = num_cpus::get_physical(); + let mut actors = Vec::with_capacity(physical_cores); + let mut senders = Vec::with_capacity(physical_cores); + + for index in 0..physical_cores { + let (sender, receiver) = crossbeam::channel::bounded(1); + let actor = Actor::new(index, physical_cores, receiver, options.clone()).start(); + + senders.push(sender); + actors.push(actor); + } + + Self { senders, actors } + } + + pub fn run_on_file_paths(self, file_paths: Vec) { + for (index, file_path) in file_paths.into_iter().enumerate() { + let sender_idx = index % self.senders.len(); + let sender = self.senders[sender_idx].clone(); + sender.send(file_path).unwrap(); + } + + self.complete(); + } + + pub fn complete(mut self) { + // droping the senders will close the channels + { + let senders = std::mem::take(&mut self.senders); + drop(senders); + } + + // wait for all the threads to finish + for actor in self.actors.into_iter() { + actor.join().unwrap(); + } + } +} + +impl Actor { + pub fn new( + index: usize, + total: usize, + receiver: Receiver, + options: Arc, + ) -> Self { + Self { + index, + total, + receiver, + options, + } + } + + pub fn start(self) -> std::thread::JoinHandle<()> { + let options = self.options.clone(); + + let receiver = self.receiver.clone(); + std::thread::spawn(move || { + for file_path in receiver.iter() { + crate::run_on_file_paths(file_path, options.clone()); + } + }) + } +} From 525402951911239db1ff8a85eee3813905c0ba3d Mon Sep 17 00:00:00 2001 From: Praveen Perera Date: Sun, 2 Feb 2025 21:25:15 -0600 Subject: [PATCH 3/7] More msgs per channel --- rustywind-cli/src/par.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rustywind-cli/src/par.rs b/rustywind-cli/src/par.rs index 779499b..808baac 100644 --- a/rustywind-cli/src/par.rs +++ b/rustywind-cli/src/par.rs @@ -25,7 +25,7 @@ impl Heard { let mut senders = Vec::with_capacity(physical_cores); for index in 0..physical_cores { - let (sender, receiver) = crossbeam::channel::bounded(1); + let (sender, receiver) = crossbeam::channel::bounded(1000); let actor = Actor::new(index, physical_cores, receiver, options.clone()).start(); senders.push(sender); From 91d43bfaacd996a358c7325aeb14691a70310916 Mon Sep 17 00:00:00 2001 From: Praveen Perera Date: Mon, 3 Feb 2025 07:30:13 -0600 Subject: [PATCH 4/7] Actors should work on large chuks --- rustywind-cli/src/main.rs | 14 +++++--------- rustywind-cli/src/par.rs | 26 ++++++++++++++++---------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/rustywind-cli/src/main.rs b/rustywind-cli/src/main.rs index 7a4cafe..31637de 100644 --- a/rustywind-cli/src/main.rs +++ b/rustywind-cli/src/main.rs @@ -152,7 +152,7 @@ fn main() -> Result<()> { Ok(()) } -pub fn run_on_file_paths(file_path: PathBuf, options: Arc) { +pub fn run_on_file_paths(file_path: PathBuf, options: &Options) { let file_path = &file_path; // if the file is in the ignored_files list return early @@ -170,15 +170,11 @@ pub fn run_on_file_paths(file_path: PathBuf, options: Arc) { match (contents_changed, &options.write_mode) { (_, WriteMode::ToStdOut) => (), - (_, WriteMode::DryRun) => { - print_file_name(file_path, contents_changed, &options) - } + (_, WriteMode::DryRun) => print_file_name(file_path, contents_changed, options), - (true, WriteMode::ToFile) => { - write_to_file(file_path, &sorted_content, &options) - } + (true, WriteMode::ToFile) => write_to_file(file_path, &sorted_content, options), (false, WriteMode::ToFile) => { - print_file_name(file_path, contents_changed, &options) + print_file_name(file_path, contents_changed, options) } // For now print the file contents to the console even if it hasn't changed to @@ -189,7 +185,7 @@ pub fn run_on_file_paths(file_path: PathBuf, options: Arc) { (false, WriteMode::ToConsole) => print_file_contents(&sorted_content), (contents_changed, WriteMode::CheckFormatted) => { - print_changed_files(file_path, contents_changed, &options); + print_changed_files(file_path, contents_changed, options); } } } diff --git a/rustywind-cli/src/par.rs b/rustywind-cli/src/par.rs index 808baac..66aa92d 100644 --- a/rustywind-cli/src/par.rs +++ b/rustywind-cli/src/par.rs @@ -8,13 +8,13 @@ use crate::options::Options; pub struct Actor { pub index: usize, pub total: usize, - pub receiver: Receiver, + pub receiver: Receiver>, pub options: Arc, } #[derive(Debug)] pub struct Heard { - pub senders: Vec>, + pub senders: Vec>>, pub actors: Vec>, } @@ -36,11 +36,16 @@ impl Heard { } pub fn run_on_file_paths(self, file_paths: Vec) { - for (index, file_path) in file_paths.into_iter().enumerate() { - let sender_idx = index % self.senders.len(); - let sender = self.senders[sender_idx].clone(); - sender.send(file_path).unwrap(); - } + let total_chunks = self.senders.len(); + let chunks_of = file_paths.len() / total_chunks; + + file_paths + .chunks(chunks_of) + .enumerate() + .for_each(|(index, chunk)| { + let senders = self.senders[index].clone(); + senders.send(chunk.to_vec()).unwrap(); + }); self.complete(); } @@ -63,7 +68,7 @@ impl Actor { pub fn new( index: usize, total: usize, - receiver: Receiver, + receiver: Receiver>, options: Arc, ) -> Self { Self { @@ -79,8 +84,9 @@ impl Actor { let receiver = self.receiver.clone(); std::thread::spawn(move || { - for file_path in receiver.iter() { - crate::run_on_file_paths(file_path, options.clone()); + let file_paths = receiver.recv().unwrap(); + for file_path in file_paths { + crate::run_on_file_paths(file_path, &options); } }) } From 3ab61561fc246e04f67c8bf24bb9a98366960960 Mon Sep 17 00:00:00 2001 From: Praveen Perera Date: Mon, 3 Feb 2025 07:41:57 -0600 Subject: [PATCH 5/7] Use a scoped_thread instead of actors --- Cargo.lock | 32 ---------------- rustywind-cli/Cargo.toml | 3 +- rustywind-cli/src/main.rs | 4 +- rustywind-cli/src/par.rs | 80 ++++++++------------------------------- 4 files changed, 17 insertions(+), 102 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4f8ae7..0252794 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -232,28 +232,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" -dependencies = [ - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-channel" -version = "0.5.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -273,15 +251,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "crossbeam-queue" -version = "0.3.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -794,7 +763,6 @@ dependencies = [ "clap", "color-eyre", "colored", - "crossbeam", "env_logger", "eyre", "ignore", diff --git a/rustywind-cli/Cargo.toml b/rustywind-cli/Cargo.toml index b1b1a70..36c56dd 100644 --- a/rustywind-cli/Cargo.toml +++ b/rustywind-cli/Cargo.toml @@ -48,6 +48,5 @@ ignore = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -# num +# parallelism num_cpus = "1.16" -crossbeam = "0.8" diff --git a/rustywind-cli/src/main.rs b/rustywind-cli/src/main.rs index 31637de..d16abbf 100644 --- a/rustywind-cli/src/main.rs +++ b/rustywind-cli/src/main.rs @@ -152,9 +152,7 @@ fn main() -> Result<()> { Ok(()) } -pub fn run_on_file_paths(file_path: PathBuf, options: &Options) { - let file_path = &file_path; - +pub fn run_on_file_path(file_path: &Path, options: &Options) { // if the file is in the ignored_files list return early if should_ignore_current_file(&options.ignored_files, file_path) { log::debug!("file path {file_path:#?} found in ignored_files, will not sort"); diff --git a/rustywind-cli/src/par.rs b/rustywind-cli/src/par.rs index 66aa92d..9f81b96 100644 --- a/rustywind-cli/src/par.rs +++ b/rustywind-cli/src/par.rs @@ -1,93 +1,43 @@ use std::{path::PathBuf, sync::Arc}; -use crossbeam::channel::{Receiver, Sender}; - use crate::options::Options; #[derive(Debug, Clone)] pub struct Actor { pub index: usize, pub total: usize, - pub receiver: Receiver>, pub options: Arc, } #[derive(Debug)] pub struct Heard { - pub senders: Vec>>, - pub actors: Vec>, + cpus: usize, + options: Arc, } impl Heard { pub fn new(options: Arc) -> Self { - let physical_cores = num_cpus::get_physical(); - let mut actors = Vec::with_capacity(physical_cores); - let mut senders = Vec::with_capacity(physical_cores); - - for index in 0..physical_cores { - let (sender, receiver) = crossbeam::channel::bounded(1000); - let actor = Actor::new(index, physical_cores, receiver, options.clone()).start(); - - senders.push(sender); - actors.push(actor); - } - - Self { senders, actors } + let cpus = num_cpus::get_physical(); + Self { cpus, options } } pub fn run_on_file_paths(self, file_paths: Vec) { - let total_chunks = self.senders.len(); + let total_chunks = self.cpus; let chunks_of = file_paths.len() / total_chunks; + let options = &self.options; - file_paths - .chunks(chunks_of) - .enumerate() - .for_each(|(index, chunk)| { - let senders = self.senders[index].clone(); - senders.send(chunk.to_vec()).unwrap(); + std::thread::scope(|s| { + file_paths.chunks(chunks_of).for_each(|chunk| { + s.spawn(|| { + run_on_file_paths(chunk, options); + }); }); - - self.complete(); - } - - pub fn complete(mut self) { - // droping the senders will close the channels - { - let senders = std::mem::take(&mut self.senders); - drop(senders); - } - - // wait for all the threads to finish - for actor in self.actors.into_iter() { - actor.join().unwrap(); - } + }) } } -impl Actor { - pub fn new( - index: usize, - total: usize, - receiver: Receiver>, - options: Arc, - ) -> Self { - Self { - index, - total, - receiver, - options, - } - } - - pub fn start(self) -> std::thread::JoinHandle<()> { - let options = self.options.clone(); - - let receiver = self.receiver.clone(); - std::thread::spawn(move || { - let file_paths = receiver.recv().unwrap(); - for file_path in file_paths { - crate::run_on_file_paths(file_path, &options); - } - }) +fn run_on_file_paths(file_paths: &[PathBuf], options: &Options) { + for file_path in file_paths { + crate::run_on_file_path(file_path, options); } } From 68c2da2ae4b6f6ce76a5aceea3cb5c30a784c6bc Mon Sep 17 00:00:00 2001 From: Praveen Perera Date: Mon, 3 Feb 2025 07:44:14 -0600 Subject: [PATCH 6/7] Rename files --- rustywind-cli/src/{par.rs => heard.rs} | 14 +++++--------- rustywind-cli/src/main.rs | 4 ++-- 2 files changed, 7 insertions(+), 11 deletions(-) rename rustywind-cli/src/{par.rs => heard.rs} (76%) diff --git a/rustywind-cli/src/par.rs b/rustywind-cli/src/heard.rs similarity index 76% rename from rustywind-cli/src/par.rs rename to rustywind-cli/src/heard.rs index 9f81b96..19b3a74 100644 --- a/rustywind-cli/src/par.rs +++ b/rustywind-cli/src/heard.rs @@ -1,13 +1,9 @@ -use std::{path::PathBuf, sync::Arc}; - +/// Heard is a struct that handles running the rustywind on a list of files +/// in parallel. It uses the num_cpus crate to determine the number of +/// physical cores on the machine. It then spawns a thread for each core +/// and runs the rustywind on the file paths. use crate::options::Options; - -#[derive(Debug, Clone)] -pub struct Actor { - pub index: usize, - pub total: usize, - pub options: Arc, -} +use std::{path::PathBuf, sync::Arc}; #[derive(Debug)] pub struct Heard { diff --git a/rustywind-cli/src/main.rs b/rustywind-cli/src/main.rs index d16abbf..3146c05 100644 --- a/rustywind-cli/src/main.rs +++ b/rustywind-cli/src/main.rs @@ -1,15 +1,15 @@ mod cli; +mod heard; mod options; -pub mod par; use ahash::AHashSet as HashSet; use clap::Parser; use eyre::Result; +use heard::Heard; use indoc::indoc; use once_cell::sync::Lazy; use options::Options; use options::WriteMode; -use par::Heard; use rustywind_core::sorter; use std::fs; use std::path::Path; From 1e2ef669fd778ca619eb7863e51b582279756d4c Mon Sep 17 00:00:00 2001 From: Praveen Perera Date: Mon, 3 Feb 2025 08:03:26 -0600 Subject: [PATCH 7/7] Add debug log to show how many files its checking --- rustywind-cli/src/heard.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rustywind-cli/src/heard.rs b/rustywind-cli/src/heard.rs index 19b3a74..12492f7 100644 --- a/rustywind-cli/src/heard.rs +++ b/rustywind-cli/src/heard.rs @@ -18,6 +18,8 @@ impl Heard { } pub fn run_on_file_paths(self, file_paths: Vec) { + log::debug!("checking {} files", file_paths.len()); + let total_chunks = self.cpus; let chunks_of = file_paths.len() / total_chunks; let options = &self.options;