Skip to content

Commit

Permalink
Try using crossbeam actors
Browse files Browse the repository at this point in the history
  • Loading branch information
praveenperera committed Feb 2, 2025
1 parent e8dae66 commit 4ffcd40
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 10 deletions.
49 changes: 49 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions rustywind-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,7 @@ ignore = "0.4"
# parsing
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

# num
num_cpus = "1.16"
crossbeam = "0.8"
33 changes: 23 additions & 10 deletions rustywind-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod cli;
mod options;
pub mod par;

use ahash::AHashSet as HashSet;
use clap::Parser;
Expand All @@ -8,12 +9,14 @@ use indoc::indoc;
use once_cell::sync::Lazy;
use options::Options;
use options::WriteMode;
use par::Heard;
use rustywind_core::sorter;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;

static EXIT_ERROR: Lazy<AtomicBool> = Lazy::new(|| AtomicBool::new(false));
static GRAY: Lazy<colored::CustomColor> = Lazy::new(|| colored::CustomColor::new(120, 120, 120));
Expand Down Expand Up @@ -99,7 +102,12 @@ fn main() -> Result<()> {
color_eyre::install()?;

let cli = Cli::parse();
let options = Options::new_from_cli(cli)?;

let mut options = Options::new_from_cli(cli)?;

let search_paths = std::mem::take(&mut options.search_paths);

let options = Arc::new(options);
let rustywind = &options.rustywind;

match &options.write_mode {
Expand Down Expand Up @@ -132,9 +140,8 @@ fn main() -> Result<()> {
eprint!("[WARN] No classes were found in STDIN");
}
} else {
for file_path in options.search_paths.iter() {
run_on_file_paths(file_path, &options)
}
let heard = Heard::new(options.clone());
heard.run_on_file_paths(search_paths);

// after running on all files, if there was an error, exit with 1
if EXIT_ERROR.load(Ordering::Relaxed) {
Expand All @@ -145,7 +152,9 @@ fn main() -> Result<()> {
Ok(())
}

fn run_on_file_paths(file_path: &Path, options: &Options) {
pub fn run_on_file_paths(file_path: PathBuf, options: Arc<Options>) {
let file_path = &file_path;

// if the file is in the ignored_files list return early
if should_ignore_current_file(&options.ignored_files, file_path) {
log::debug!("file path {file_path:#?} found in ignored_files, will not sort");
Expand All @@ -161,11 +170,15 @@ fn run_on_file_paths(file_path: &Path, options: &Options) {

match (contents_changed, &options.write_mode) {
(_, WriteMode::ToStdOut) => (),
(_, WriteMode::DryRun) => print_file_name(file_path, contents_changed, options),
(_, WriteMode::DryRun) => {
print_file_name(file_path, contents_changed, &options)
}

(true, WriteMode::ToFile) => write_to_file(file_path, &sorted_content, options),
(true, WriteMode::ToFile) => {
write_to_file(file_path, &sorted_content, &options)
}
(false, WriteMode::ToFile) => {
print_file_name(file_path, contents_changed, options)
print_file_name(file_path, contents_changed, &options)
}

// For now print the file contents to the console even if it hasn't changed to
Expand All @@ -175,8 +188,8 @@ fn run_on_file_paths(file_path: &Path, options: &Options) {
(true, WriteMode::ToConsole) => print_file_contents(&sorted_content),
(false, WriteMode::ToConsole) => print_file_contents(&sorted_content),

(_, WriteMode::CheckFormatted) => {
print_changed_files(file_path, contents_changed, options);
(contents_changed, WriteMode::CheckFormatted) => {
print_changed_files(file_path, contents_changed, &options);
}
}
}
Expand Down
87 changes: 87 additions & 0 deletions rustywind-cli/src/par.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::{path::PathBuf, sync::Arc};

use crossbeam::channel::{Receiver, Sender};

use crate::options::Options;

#[derive(Debug, Clone)]
pub struct Actor {
pub index: usize,
pub total: usize,
pub receiver: Receiver<PathBuf>,
pub options: Arc<Options>,
}

#[derive(Debug)]
pub struct Heard {
pub senders: Vec<Sender<PathBuf>>,
pub actors: Vec<std::thread::JoinHandle<()>>,
}

impl Heard {
pub fn new(options: Arc<Options>) -> Self {
let physical_cores = num_cpus::get_physical();
let mut actors = Vec::with_capacity(physical_cores);
let mut senders = Vec::with_capacity(physical_cores);

for index in 0..physical_cores {
let (sender, receiver) = crossbeam::channel::bounded(1);
let actor = Actor::new(index, physical_cores, receiver, options.clone()).start();

senders.push(sender);
actors.push(actor);
}

Self { senders, actors }
}

pub fn run_on_file_paths(self, file_paths: Vec<PathBuf>) {
for (index, file_path) in file_paths.into_iter().enumerate() {
let sender_idx = index % self.senders.len();
let sender = self.senders[sender_idx].clone();
sender.send(file_path).unwrap();
}

self.complete();
}

pub fn complete(mut self) {
// droping the senders will close the channels
{
let senders = std::mem::take(&mut self.senders);
drop(senders);
}

// wait for all the threads to finish
for actor in self.actors.into_iter() {
actor.join().unwrap();
}
}
}

impl Actor {
pub fn new(
index: usize,
total: usize,
receiver: Receiver<PathBuf>,
options: Arc<Options>,
) -> Self {
Self {
index,
total,
receiver,
options,
}
}

pub fn start(self) -> std::thread::JoinHandle<()> {
let options = self.options.clone();

let receiver = self.receiver.clone();
std::thread::spawn(move || {
for file_path in receiver.iter() {
crate::run_on_file_paths(file_path, options.clone());
}
})
}
}

0 comments on commit 4ffcd40

Please sign in to comment.