Skip to content

Commit

Permalink
Bring back workers
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanUkhov committed Dec 28, 2024
1 parent 2e2b176 commit b363be2
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "loop"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
license = "Apache-2.0/MIT"
authors = ["Ivan Ukhov <ivan.ukhov@gmail.com>"]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures::stream::StreamExt;
use r#loop::asynchronous::parallelize;

let double = |value| async move { 2 * value };
let _ = parallelize(0..10, double).collect::<Vec<_>>().await;
let _ = parallelize(0..10, double, None).collect::<Vec<_>>().await;
```

## Contribution
Expand Down
7 changes: 5 additions & 2 deletions src/asynchronous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tokio_stream::wrappers::ReceiverStream;
pub fn parallelize<Items, Item, Map, Future, Output>(
items: Items,
mut map: Map,
workers: Option<usize>,
) -> impl futures::stream::Stream<Item = Output>
where
Items: IntoIterator<Item = Item> + Send + 'static,
Expand All @@ -18,7 +19,7 @@ where
Future: std::future::Future<Output = Output> + Send,
Output: Send + 'static,
{
let workers = crate::support::workers();
let workers = crate::support::workers(workers);
let (item_sender, item_receiver) = mpsc::channel::<Item>(workers);
let (output_sender, output_receiver) = mpsc::channel::<Output>(workers);
let item_receiver = Arc::new(Mutex::new(item_receiver));
Expand Down Expand Up @@ -49,7 +50,9 @@ mod tests {

#[tokio::test]
async fn parallelize() {
let mut values = super::parallelize(0..10, double).collect::<Vec<_>>().await;
let mut values = super::parallelize(0..10, double, None)
.collect::<Vec<_>>()
.await;
values.sort();
assert_eq!(values, &[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
}
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! use r#loop::parallelize;
//!
//! let double = |value| 2 * value;
//! let _ = parallelize(0..10, double).collect::<Vec<_>>();
//! let _ = parallelize(0..10, double, None).collect::<Vec<_>>();
//! }
//!```
//!
Expand All @@ -23,7 +23,7 @@
//! use r#loop::asynchronous::parallelize;
//!
//! let double = |value| async move { 2 * value };
//! let _ = parallelize(0..10, double).collect::<Vec<_>>().await;
//! let _ = parallelize(0..10, double, None).collect::<Vec<_>>().await;
//! }
//! # #[cfg(not(feature = "asynchronous"))]
//! # fn main() {}
Expand Down
12 changes: 8 additions & 4 deletions src/support.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
pub fn workers() -> usize {
std::thread::available_parallelism()
.map(|value| value.get())
.unwrap_or(1)
pub fn workers(value: Option<usize>) -> usize {
value
.map(|value| std::cmp::min(value, 1))
.unwrap_or_else(|| {
std::thread::available_parallelism()
.map(|value| value.get())
.unwrap_or(1)
})
}
5 changes: 3 additions & 2 deletions src/synchronous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use std::sync::{mpsc, Arc, Mutex};
pub fn parallelize<Items, Item, Map, Output>(
items: Items,
mut map: Map,
workers: Option<usize>,
) -> impl Iterator<Item = Output>
where
Items: IntoIterator<Item = Item> + Send + 'static,
Item: Send + 'static,
Map: FnMut(Item) -> Output + Copy + Send + 'static,
Output: Send + 'static,
{
let workers = crate::support::workers();
let workers = crate::support::workers(workers);
let (item_sender, item_receiver) = mpsc::sync_channel::<Item>(workers);
let (output_sender, output_receiver) = mpsc::sync_channel::<Output>(workers);
let item_receiver = Arc::new(Mutex::new(item_receiver));
Expand Down Expand Up @@ -42,7 +43,7 @@ where
mod tests {
#[test]
fn parallelize() {
let mut values = super::parallelize(0..10, double).collect::<Vec<_>>();
let mut values = super::parallelize(0..10, double, None).collect::<Vec<_>>();
values.sort();
assert_eq!(values, &[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
}
Expand Down

0 comments on commit b363be2

Please sign in to comment.