From 05a7e830b21f6ced84e2ca57c048f8f474a08a84 Mon Sep 17 00:00:00 2001 From: tuyentv96 Date: Sun, 8 Sep 2024 09:53:02 +0700 Subject: [PATCH] Add tests (#12) * add tests * add tests * remove running * improve cron --------- Co-authored-by: Tuyen Tran --- .github/workflows/build.yml | 2 +- examples/async_simple.rs | 13 ++-- examples/simple.rs | 2 +- src/async_cron.rs | 50 +++++++++++---- src/cron.rs | 64 ++++++------------- tests/async.rs | 113 ++++++++++++++++++++++++++++++++++ tests/sync.rs | 118 ++++++++++++++++++++++++++++++++++++ 7 files changed, 295 insertions(+), 67 deletions(-) create mode 100644 tests/async.rs create mode 100644 tests/sync.rs diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 416ab5b..d192c90 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -53,7 +53,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: tarpaulin - args: --all --verbose --all-features --out Lcov -- --test-threads 1 + args: --all --verbose --all-features --out Lcov -- --test-threads 4 - name: Upload to CodeCov uses: codecov/codecov-action@v4 diff --git a/examples/async_simple.rs b/examples/async_simple.rs index d31d704..2f623c5 100644 --- a/examples/async_simple.rs +++ b/examples/async_simple.rs @@ -6,11 +6,9 @@ use tokio::sync::Mutex; #[tokio::main] async fn main() { - let local_tz = Local::from_offset(&FixedOffset::east(7)); + let local_tz = Local::from_offset(&FixedOffset::east_opt(7).unwrap()); let mut cron = AsyncCron::new(local_tz); - let first_job_id = cron.add_fn("* * * * * *", print_now).await; - cron.start().await; let counter = Arc::new(Mutex::new(1)); @@ -23,14 +21,11 @@ async fn main() { println!("{} counter value: {}", now, counter); } }) - .await; + .await + .unwrap(); std::thread::sleep(std::time::Duration::from_secs(10)); // stop cron - cron.stop(); -} - -async fn print_now() { - println!("now: {}", Local::now().to_string()); + cron.stop().await; } diff --git a/examples/simple.rs b/examples/simple.rs index 98d2650..40fdaed 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -2,7 +2,7 @@ use chrono::{FixedOffset, Local, TimeZone}; use cron_tab; fn main() { - let local_tz = Local::from_offset(&FixedOffset::east(7)); + let local_tz = Local::from_offset(&FixedOffset::east_opt(7).unwrap()); let mut cron = cron_tab::Cron::new(local_tz); let first_job_id = cron.add_fn("* * * * * * *", print_now).unwrap(); diff --git a/src/async_cron.rs b/src/async_cron.rs index 58cffce..fa63906 100644 --- a/src/async_cron.rs +++ b/src/async_cron.rs @@ -12,22 +12,38 @@ use tokio::time as tokio_time; use crate::async_entry::{AsyncEntry, TaskWrapper}; use crate::{Result, MAX_WAIT_SECONDS}; +/// The `AsyncCron` struct manages scheduled jobs that run asynchronously. +/// It holds the job entries, keeps track of job IDs, and manages the state +/// of the running cron. #[derive(Clone)] pub struct AsyncCron where Z: TimeZone + Send + Sync + 'static, Z::Offset: Send, { + /// A thread-safe, asynchronous list of job entries (schedules and tasks). entries: Arc>>>, + + /// A counter for assigning unique IDs to job entries. next_id: Arc, + + /// Indicates whether the cron is currently running. running: Arc, + + /// The timezone used for scheduling tasks. tz: Z, + + /// A channel sender for adding new entries to the cron scheduler. add_tx: Arc>>>>, + + /// A channel sender for removing entries from the cron scheduler. remove_tx: Arc>>>, + + /// A channel sender for stopping the cron scheduler. stop_tx: Arc>>>, } -/// Cron contains and executes the scheduled jobs. +/// Implementation of the `AsyncCron` struct, which provides methods for managing scheduled tasks. impl AsyncCron where Z: TimeZone + Send + Sync + 'static, @@ -85,8 +101,11 @@ where } } - /// Run a blocking loop for schedule jobs + /// Starts the cron scheduler in a blocking loop, processing scheduled jobs. + /// The loop will sleep until the next scheduled job is ready to run, and it + /// will handle adding, removing, and stopping jobs. pub async fn start_blocking(&mut self) { + // Channels for communicating with the cron loop (adding/removing/stopping jobs). let (add_tx, mut add_rx) = mpsc::unbounded_channel(); let (remove_tx, mut remove_rx) = mpsc::unbounded_channel(); let (stop_tx, mut stop_rx) = mpsc::unbounded_channel(); @@ -97,19 +116,20 @@ where *self.stop_tx.lock().await = Some(stop_tx); } - self.running.store(true, Ordering::SeqCst); - + // Initialize the next scheduled time for all entries. for entry in self.entries.lock().await.iter_mut() { entry.next = entry.get_next(self.get_timezone()); } - // default long timer duration + // Set a default long wait duration for sleeping. let mut wait_duration = Duration::from_secs(MAX_WAIT_SECONDS); loop { + // Lock and sort entries to prioritize the closest scheduled job. let mut entries = self.entries.lock().await; entries.sort_by(|b, a| b.next.cmp(&a.next)); + // Determine the wait duration based on the next scheduled job. if let Some(entry) = entries.first() { // get first entry from sorted entries for timer duration let wait_milis = (entry.next.as_ref().unwrap().timestamp_millis() as u64) @@ -121,8 +141,9 @@ where // release lock drop(entries); + // Use `select!` to handle multiple asynchronous tasks concurrently. select! { - // sleep and wait until next scheduled time + // Sleep until the next scheduled job is ready to run. _ = tokio_time::sleep(wait_duration) => { let now = self.now(); for entry in self.entries.lock().await.iter_mut() { @@ -130,25 +151,27 @@ where break; } + // Spawn the job to run asynchronously. let run = entry.run.clone(); tokio::spawn(async move { run.as_ref().get_pinned().await; }); + // Schedule the next run of the job. entry.next = entry.get_next(self.get_timezone()); } }, - // wait new entry added signal + // Add a new entry to the scheduler. new_entry = add_rx.recv() => { let mut entry = new_entry.unwrap(); entry.next = entry.get_next(self.get_timezone()); self.entries.lock().await.push(entry); }, - // wait entry removed signal + // Remove an entry from the scheduler by ID. id = remove_rx.recv() => { self.remove_entry(id.unwrap()).await; }, - // wait cron stopped signal + // Stop the cron scheduler. _ = stop_rx.recv() => { return; }, @@ -156,6 +179,8 @@ where } } + /// Schedules a new job by creating an `AsyncEntry` and adding it to the scheduler. + /// If the scheduler is running, the job is added via the channel; otherwise, it's added directly. async fn schedule(&mut self, schedule: cron::Schedule, f: F) -> Result where F: 'static + Fn() -> T + Send + Sync, @@ -170,8 +195,10 @@ where run: Arc::new(TaskWrapper::new(f)), }; + // Determine the next scheduled time for the job. entry.next = entry.get_next(self.get_timezone()); + // If the cron is running, send the entry via the channel; otherwise, add it directly. match self.add_tx.lock().await.as_ref() { Some(tx) if self.running.load(Ordering::SeqCst) => tx.send(entry).unwrap(), _ => self.entries.lock().await.push(entry), @@ -185,10 +212,11 @@ where self.tz = tz; } - /// Start cron in background. - /// A toki stask will be spawn for schedule jobs + /// Starts the cron scheduler in the background. + /// A separate task is spawned to handle scheduled jobs asynchronously. pub async fn start(&mut self) { let mut cloned = self.clone(); + self.running.store(true, Ordering::SeqCst); tokio::spawn(async move { cloned.start_blocking().await; }); diff --git a/src/cron.rs b/src/cron.rs index 1aff836..5a8a3b1 100644 --- a/src/cron.rs +++ b/src/cron.rs @@ -1,5 +1,5 @@ use std::str::FromStr; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; @@ -19,11 +19,15 @@ where { entries: Arc>>>, next_id: Arc, - running: Arc, tz: Z, - add_tx: Option>>, - remove_tx: Option>, - stop_tx: Option>, + add_channel: ( + crossbeam_channel::Sender>, + crossbeam_channel::Receiver>, + ), + stop_channel: ( + crossbeam_channel::Sender, + crossbeam_channel::Receiver, + ), } /// Cron contains and executes the scheduled jobs. @@ -45,11 +49,9 @@ where Cron { entries: Arc::new(Mutex::new(Vec::new())), next_id: Arc::new(AtomicUsize::new(0)), - running: Arc::new(AtomicBool::new(false)), tz, - add_tx: None, - remove_tx: None, - stop_tx: None, + add_channel: crossbeam_channel::unbounded(), + stop_channel: crossbeam_channel::unbounded(), } } @@ -86,11 +88,7 @@ where }; entry.next = entry.schedule_next(self.get_timezone()); - - match self.add_tx.as_ref() { - Some(tx) if self.running.load(Ordering::SeqCst) => tx.send(entry).unwrap(), - _ => self.entries.lock().unwrap().push(entry), - } + self.add_channel.0.send(entry).unwrap(); Ok(next_id) } @@ -132,15 +130,7 @@ where /// cron.remove(job_id); /// ``` pub fn remove(&self, id: usize) { - if self.running.load(Ordering::SeqCst) { - if let Some(tx) = self.remove_tx.as_ref() { - tx.send(id).unwrap(); - } - - return; - } - - self.remove_entry(id); + self.remove_entry(id) } /// Stop Cron. @@ -154,9 +144,7 @@ where /// cron.stop(); /// ``` pub fn stop(&self) { - if let Some(tx) = self.stop_tx.as_ref() { - tx.send(true).unwrap(); - } + self.stop_channel.0.send(true).unwrap() } /// Start cron. @@ -170,24 +158,15 @@ where /// cron.start(); /// ``` pub fn start(&mut self) { - let mut cloned_cron = self.clone(); + let mut cron = self.clone(); + thread::spawn(move || { - cloned_cron.start_blocking(); + cron.start_blocking(); }); } /// Run a loop for schedule jobs pub fn start_blocking(&mut self) { - let (add_tx, add_rx) = crossbeam_channel::unbounded(); - let (remove_tx, remove_rx) = crossbeam_channel::unbounded(); - let (stop_tx, stop_rx) = crossbeam_channel::unbounded(); - - self.add_tx = Some(add_tx); - self.remove_tx = Some(remove_tx); - self.stop_tx = Some(stop_tx); - - self.running.store(true, Ordering::SeqCst); - for entry in self.entries.lock().unwrap().iter_mut() { entry.next = entry.schedule_next(self.get_timezone()); } @@ -228,17 +207,12 @@ where } }, // wait add new entry - recv(add_rx) -> new_entry => { + recv(self.add_channel.1) -> new_entry => { let mut entry = new_entry.unwrap(); entry.next = entry.schedule_next(self.get_timezone()); self.entries.lock().unwrap().push(entry); }, - // wait remove entry - recv(remove_rx) -> id => { - self.remove_entry(id.unwrap()); - }, - // wait stop cron - recv(stop_rx) -> _ => { + recv(self.stop_channel.1) -> _ => { return; }, } diff --git a/tests/async.rs b/tests/async.rs new file mode 100644 index 0000000..f82ba6a --- /dev/null +++ b/tests/async.rs @@ -0,0 +1,113 @@ +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use chrono::{FixedOffset, Local, TimeZone}; + use cron_tab::AsyncCron; + use tokio::sync::Mutex; + use tokio::time::{sleep, Duration}; + + #[tokio::test] + async fn start_and_stop_cron() { + let local_tz = Local::from_offset(&FixedOffset::east_opt(7).unwrap()); + let mut cron = AsyncCron::new(local_tz); + + cron.start().await; + cron.stop().await; + } + + #[tokio::test] + async fn add_job() { + let local_tz = Local::from_offset(&FixedOffset::east_opt(7).unwrap()); + let mut cron = AsyncCron::new(local_tz); + + cron.start().await; + + let counter = Arc::new(Mutex::new(0)); + let counter1 = Arc::clone(&counter); + + cron.add_fn("* * * * * *", move || { + let counter1 = Arc::clone(&counter1); + async move { + let mut value = counter1.lock().await; + *value += 1; + } + }) + .await + .unwrap(); + + sleep(Duration::from_millis(2001)).await; + let value = *counter.lock().await; + assert_eq!(value, 2) + } + + #[tokio::test] + async fn add_multiple_jobs() { + let local_tz = Local::from_offset(&FixedOffset::east_opt(7).unwrap()); + let mut cron = AsyncCron::new(local_tz); + + cron.start().await; + + let counter1 = Arc::new(Mutex::new(0)); + let c1 = Arc::clone(&counter1); + + cron.add_fn("* * * * * *", move || { + let counter = Arc::clone(&c1); + async move { + let mut value = counter.lock().await; + *value += 1; + } + }) + .await + .unwrap(); + + let counter2 = Arc::new(Mutex::new(0)); + let c2 = Arc::clone(&counter2); + cron.add_fn("*/2 * * * * *", move || { + let counter = Arc::clone(&c2); + async move { + let mut value = counter.lock().await; + *value += 1; + } + }) + .await + .unwrap(); + + sleep(Duration::from_millis(2001)).await; + let value1 = *counter1.lock().await; + let value2 = *counter2.lock().await; + assert_eq!(value1, 2); + assert_eq!(value2, 1); + } + + #[tokio::test] + async fn remove_job() { + let local_tz = Local::from_offset(&FixedOffset::east_opt(7).unwrap()); + let mut cron = AsyncCron::new(local_tz); + + cron.start().await; + + let counter = Arc::new(Mutex::new(0)); + let counter1 = Arc::clone(&counter); + + let job_id = cron + .add_fn("* * * * * *", move || { + let counter1 = Arc::clone(&counter1); + async move { + let mut value = counter1.lock().await; + *value += 1; + } + }) + .await + .unwrap(); + + sleep(Duration::from_millis(1001)).await; + let value = *counter.lock().await; + assert_eq!(value, 1); + cron.remove(job_id).await; + + sleep(Duration::from_millis(1001)).await; + let value = *counter.lock().await; + assert_eq!(value, 1) + } +} diff --git a/tests/sync.rs b/tests/sync.rs new file mode 100644 index 0000000..8871803 --- /dev/null +++ b/tests/sync.rs @@ -0,0 +1,118 @@ +#[cfg(test)] +mod tests { + use std::{ + sync::{Arc, Mutex}, + thread::sleep, + time::Duration, + }; + + use chrono::{FixedOffset, Local, TimeZone}; + use cron_tab::Cron; + + #[test] + fn start_and_stop_cron() { + let local_tz = Local::from_offset(&FixedOffset::east_opt(7).unwrap()); + let mut cron = Cron::new(local_tz); + + cron.start(); + cron.stop(); + } + + #[test] + fn add_job_before_start() { + let local_tz = Local::from_offset(&FixedOffset::east_opt(7).unwrap()); + let mut cron = Cron::new(local_tz); + + let counter = Arc::new(Mutex::new(0)); + let counter1 = Arc::clone(&counter); + + cron.add_fn("* * * * * *", move || { + let mut value = counter1.lock().unwrap(); + *value += 1; + }) + .unwrap(); + + cron.start(); + + sleep(Duration::from_millis(2001)); + let value = *counter.lock().unwrap(); + assert_eq!(value, 2) + } + + #[test] + fn add_job() { + let local_tz = Local::from_offset(&FixedOffset::east_opt(7).unwrap()); + let mut cron = Cron::new(local_tz); + + cron.start(); + + let counter = Arc::new(Mutex::new(0)); + let counter1 = Arc::clone(&counter); + + cron.add_fn("* * * * * *", move || { + let mut value = counter1.lock().unwrap(); + *value += 1; + }) + .unwrap(); + + sleep(Duration::from_millis(2001)); + let value = *counter.lock().unwrap(); + assert_eq!(value, 2) + } + + #[test] + fn add_multiple_jobs() { + let local_tz = Local::from_offset(&FixedOffset::east_opt(7).unwrap()); + let mut cron = Cron::new(local_tz); + + cron.start(); + + let counter1 = Arc::new(Mutex::new(0)); + let c1 = Arc::clone(&counter1); + + cron.add_fn("* * * * * *", move || { + let mut value = c1.lock().unwrap(); + *value += 1; + }) + .unwrap(); + + let counter2 = Arc::new(Mutex::new(0)); + let c2 = Arc::clone(&counter2); + cron.add_fn("*/2 * * * * *", move || { + let mut value = c2.lock().unwrap(); + *value += 1; + }) + .unwrap(); + + sleep(Duration::from_millis(2001)); + let value1 = *counter1.lock().unwrap(); + let value2 = *counter2.lock().unwrap(); + assert_eq!(value1, 2); + assert_eq!(value2, 1); + } + + #[test] + fn remove_job() { + let local_tz = Local::from_offset(&FixedOffset::east_opt(7).unwrap()); + let mut cron = Cron::new(local_tz); + + cron.start(); + + let counter = Arc::new(Mutex::new(0)); + let counter1 = Arc::clone(&counter); + + let job_id = cron + .add_fn("* * * * * *", move || { + *counter1.lock().unwrap() += 1; + }) + .unwrap(); + + sleep(Duration::from_millis(1001)); + assert_eq!(*counter.lock().unwrap(), 1); + cron.remove(job_id); + + sleep(Duration::from_millis(1001)); + let value = *counter.lock().unwrap(); + assert_eq!(value, 1) + } +}