diff --git a/Cargo.toml b/Cargo.toml index d2ac2c4..6b7fa0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "loop" -version = "0.2.0" +version = "0.3.0" edition = "2021" license = "Apache-2.0/MIT" authors = ["Ivan Ukhov "] diff --git a/README.md b/README.md index 806c6b7..4643ea2 100644 --- a/README.md +++ b/README.md @@ -2,18 +2,22 @@ The package allows for processing iterators in parallel. -# Example +# Examples + +Synchronously: ```rust let double = |value| 2 * value; let _ = r#loop::parallelize(0..10, double, None).collect::>(); ``` +Asynchronously: + ```rust use futures::stream::StreamExt; let double = |value| async move { 2 * value }; -let _ = r#loop::parallelize(0..10, double, None).collect::>().await; +let _ = r#loop::parallelize(0..10, double).collect::>().await; ``` ## Contribution diff --git a/src/asynchronous.rs b/src/asynchronous.rs index 0a84262..eb3ce0d 100644 --- a/src/asynchronous.rs +++ b/src/asynchronous.rs @@ -7,7 +7,6 @@ use tokio_stream::wrappers::ReceiverStream; pub fn parallelize( items: Items, mut map: Map, - workers: Option, ) -> impl futures::stream::Stream where Items: IntoIterator + Send + 'static, @@ -17,7 +16,7 @@ where Future: std::future::Future + Send, Output: Send + 'static, { - let workers = crate::support::workers(workers); + let workers = crate::support::workers(); let (item_sender, item_receiver) = mpsc::channel::(workers); let (output_sender, output_receiver) = mpsc::channel::(workers); let item_receiver = Arc::new(Mutex::new(item_receiver)); @@ -48,9 +47,7 @@ mod tests { #[tokio::test] async fn parallelize() { - let mut values = super::parallelize(0..10, double, None) - .collect::>() - .await; + let mut values = super::parallelize(0..10, double).collect::>().await; values.sort(); assert_eq!(values, &[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]); } diff --git a/src/lib.rs b/src/lib.rs index 4fa96f9..8d38285 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,25 +1,29 @@ //! Processing iterators in parallel. //! -//! # Example +//! # Examples +//! +//! Synchronously: //! //! ``` //! # #[cfg(not(feature = "asynchronous"))] //! fn main() { -//! let multiply = |value| 2 * value; -//! let _ = r#loop::parallelize(0..10, multiply, None).collect::>(); +//! let double = |value| 2 * value; +//! let _ = r#loop::parallelize(0..10, double).collect::>(); //! } //! # #[cfg(feature = "asynchronous")] //! # fn main() {} //!``` //! +//! Asynchronously: +//! //!``` //! # #[cfg(feature = "asynchronous")] //! #[tokio::main] //! async fn main() { //! use futures::stream::StreamExt; //! -//! let multiply = |value| async move { 2 * value }; -//! let _ = r#loop::parallelize(0..10, multiply, None).collect::>().await; +//! let double = |value| async move { 2 * value }; +//! let _ = r#loop::parallelize(0..10, double).collect::>().await; //! } //! # #[cfg(not(feature = "asynchronous"))] //! # fn main() {} diff --git a/src/support.rs b/src/support.rs index 2673225..7354c8a 100644 --- a/src/support.rs +++ b/src/support.rs @@ -1,7 +1,5 @@ -pub fn workers(value: Option) -> usize { - value.unwrap_or_else(|| { - std::thread::available_parallelism() - .map(|value| value.get()) - .unwrap_or(1) - }) +pub fn workers() -> usize { + std::thread::available_parallelism() + .map(|value| value.get()) + .unwrap_or(1) } diff --git a/src/synchronous.rs b/src/synchronous.rs index 912638f..4d6b19c 100644 --- a/src/synchronous.rs +++ b/src/synchronous.rs @@ -4,7 +4,6 @@ use std::sync::{mpsc, Arc, Mutex}; pub fn parallelize( items: Items, mut map: Map, - workers: Option, ) -> impl Iterator where Items: IntoIterator + Send + 'static, @@ -12,7 +11,7 @@ where Map: FnMut(Item) -> Output + Copy + Send + 'static, Output: Send + 'static, { - let workers = crate::support::workers(workers); + let workers = crate::support::workers(); let (item_sender, item_receiver) = mpsc::sync_channel::(workers); let (output_sender, output_receiver) = mpsc::sync_channel::(workers); let item_receiver = Arc::new(Mutex::new(item_receiver)); @@ -41,7 +40,7 @@ where mod tests { #[test] fn parallelize() { - let mut values = super::parallelize(0..10, double, None).collect::>(); + let mut values = super::parallelize(0..10, double).collect::>(); values.sort(); assert_eq!(values, &[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]); }