Skip to content

Commit

Permalink
Remove workers
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanUkhov committed Dec 26, 2024
1 parent ed0d251 commit b2e7bee
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "loop"
version = "0.2.0"
version = "0.3.0"
edition = "2021"
license = "Apache-2.0/MIT"
authors = ["Ivan Ukhov <ivan.ukhov@gmail.com>"]
Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@

The package allows for processing iterators in parallel.

# Example
# Examples

Synchronously:

```rust
let double = |value| 2 * value;
let _ = r#loop::parallelize(0..10, double, None).collect::<Vec<_>>();
```

Asynchronously:

```rust
use futures::stream::StreamExt;

let double = |value| async move { 2 * value };
let _ = r#loop::parallelize(0..10, double, None).collect::<Vec<_>>().await;
let _ = r#loop::parallelize(0..10, double).collect::<Vec<_>>().await;
```

## Contribution
Expand Down
7 changes: 2 additions & 5 deletions src/asynchronous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use tokio_stream::wrappers::ReceiverStream;
pub fn parallelize<Items, Item, Map, Future, Output>(
items: Items,
mut map: Map,
workers: Option<usize>,
) -> impl futures::stream::Stream<Item = Output>
where
Items: IntoIterator<Item = Item> + Send + 'static,
Expand All @@ -17,7 +16,7 @@ where
Future: std::future::Future<Output = Output> + Send,
Output: Send + 'static,
{
let workers = crate::support::workers(workers);
let workers = crate::support::workers();
let (item_sender, item_receiver) = mpsc::channel::<Item>(workers);
let (output_sender, output_receiver) = mpsc::channel::<Output>(workers);
let item_receiver = Arc::new(Mutex::new(item_receiver));
Expand Down Expand Up @@ -48,9 +47,7 @@ mod tests {

#[tokio::test]
async fn parallelize() {
let mut values = super::parallelize(0..10, double, None)
.collect::<Vec<_>>()
.await;
let mut values = super::parallelize(0..10, double).collect::<Vec<_>>().await;
values.sort();
assert_eq!(values, &[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
}
Expand Down
14 changes: 9 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
//! Processing iterators in parallel.
//!
//! # Example
//! # Examples
//!
//! Synchronously:
//!
//! ```
//! # #[cfg(not(feature = "asynchronous"))]
//! fn main() {
//! let multiply = |value| 2 * value;
//! let _ = r#loop::parallelize(0..10, multiply, None).collect::<Vec<_>>();
//! let double = |value| 2 * value;
//! let _ = r#loop::parallelize(0..10, double).collect::<Vec<_>>();
//! }
//! # #[cfg(feature = "asynchronous")]
//! # fn main() {}
//!```
//!
//! Asynchronously:
//!
//!```
//! # #[cfg(feature = "asynchronous")]
//! #[tokio::main]
//! async fn main() {
//! use futures::stream::StreamExt;
//!
//! let multiply = |value| async move { 2 * value };
//! let _ = r#loop::parallelize(0..10, multiply, None).collect::<Vec<_>>().await;
//! let double = |value| async move { 2 * value };
//! let _ = r#loop::parallelize(0..10, double).collect::<Vec<_>>().await;
//! }
//! # #[cfg(not(feature = "asynchronous"))]
//! # fn main() {}
Expand Down
10 changes: 4 additions & 6 deletions src/support.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
pub fn workers(value: Option<usize>) -> usize {
value.unwrap_or_else(|| {
std::thread::available_parallelism()
.map(|value| value.get())
.unwrap_or(1)
})
pub fn workers() -> usize {
std::thread::available_parallelism()
.map(|value| value.get())
.unwrap_or(1)
}
5 changes: 2 additions & 3 deletions src/synchronous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ use std::sync::{mpsc, Arc, Mutex};
pub fn parallelize<Items, Item, Map, Output>(
items: Items,
mut map: Map,
workers: Option<usize>,
) -> impl Iterator<Item = Output>
where
Items: IntoIterator<Item = Item> + Send + 'static,
Item: Send + 'static,
Map: FnMut(Item) -> Output + Copy + Send + 'static,
Output: Send + 'static,
{
let workers = crate::support::workers(workers);
let workers = crate::support::workers();
let (item_sender, item_receiver) = mpsc::sync_channel::<Item>(workers);
let (output_sender, output_receiver) = mpsc::sync_channel::<Output>(workers);
let item_receiver = Arc::new(Mutex::new(item_receiver));
Expand Down Expand Up @@ -41,7 +40,7 @@ where
mod tests {
#[test]
fn parallelize() {
let mut values = super::parallelize(0..10, double, None).collect::<Vec<_>>();
let mut values = super::parallelize(0..10, double).collect::<Vec<_>>();
values.sort();
assert_eq!(values, &[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
}
Expand Down

0 comments on commit b2e7bee

Please sign in to comment.