Skip to content

Commit

Permalink
Add tests (#12)
Browse files Browse the repository at this point in the history
* add tests

* add tests

* remove running

* improve cron

---------

Co-authored-by: Tuyen Tran <tuytran@axon.com>
  • Loading branch information
tuyentv96 and Tuyen Tran authored Sep 8, 2024
1 parent fde8499 commit 05a7e83
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 67 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: tarpaulin
args: --all --verbose --all-features --out Lcov -- --test-threads 1
args: --all --verbose --all-features --out Lcov -- --test-threads 4

- name: Upload to CodeCov
uses: codecov/codecov-action@v4
Expand Down
13 changes: 4 additions & 9 deletions examples/async_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
let local_tz = Local::from_offset(&FixedOffset::east(7));
let local_tz = Local::from_offset(&FixedOffset::east_opt(7).unwrap());
let mut cron = AsyncCron::new(local_tz);

let first_job_id = cron.add_fn("* * * * * *", print_now).await;

cron.start().await;

let counter = Arc::new(Mutex::new(1));
Expand All @@ -23,14 +21,11 @@ async fn main() {
println!("{} counter value: {}", now, counter);
}
})
.await;
.await
.unwrap();

std::thread::sleep(std::time::Duration::from_secs(10));

// stop cron
cron.stop();
}

async fn print_now() {
println!("now: {}", Local::now().to_string());
cron.stop().await;
}
2 changes: 1 addition & 1 deletion examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use chrono::{FixedOffset, Local, TimeZone};
use cron_tab;

fn main() {
let local_tz = Local::from_offset(&FixedOffset::east(7));
let local_tz = Local::from_offset(&FixedOffset::east_opt(7).unwrap());
let mut cron = cron_tab::Cron::new(local_tz);

let first_job_id = cron.add_fn("* * * * * * *", print_now).unwrap();
Expand Down
50 changes: 39 additions & 11 deletions src/async_cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,38 @@ use tokio::time as tokio_time;
use crate::async_entry::{AsyncEntry, TaskWrapper};
use crate::{Result, MAX_WAIT_SECONDS};

/// The `AsyncCron` struct manages scheduled jobs that run asynchronously.
/// It holds the job entries, keeps track of job IDs, and manages the state
/// of the running cron.
#[derive(Clone)]
pub struct AsyncCron<Z>
where
Z: TimeZone + Send + Sync + 'static,
Z::Offset: Send,
{
/// A thread-safe, asynchronous list of job entries (schedules and tasks).
entries: Arc<Mutex<Vec<AsyncEntry<Z>>>>,

/// A counter for assigning unique IDs to job entries.
next_id: Arc<AtomicUsize>,

/// Indicates whether the cron is currently running.
running: Arc<AtomicBool>,

/// The timezone used for scheduling tasks.
tz: Z,

/// A channel sender for adding new entries to the cron scheduler.
add_tx: Arc<Mutex<Option<mpsc::UnboundedSender<AsyncEntry<Z>>>>>,

/// A channel sender for removing entries from the cron scheduler.
remove_tx: Arc<Mutex<Option<mpsc::UnboundedSender<usize>>>>,

/// A channel sender for stopping the cron scheduler.
stop_tx: Arc<Mutex<Option<mpsc::UnboundedSender<bool>>>>,
}

/// Cron contains and executes the scheduled jobs.
/// Implementation of the `AsyncCron` struct, which provides methods for managing scheduled tasks.
impl<Z> AsyncCron<Z>
where
Z: TimeZone + Send + Sync + 'static,
Expand Down Expand Up @@ -85,8 +101,11 @@ where
}
}

/// Run a blocking loop for schedule jobs
/// Starts the cron scheduler in a blocking loop, processing scheduled jobs.
/// The loop will sleep until the next scheduled job is ready to run, and it
/// will handle adding, removing, and stopping jobs.
pub async fn start_blocking(&mut self) {
// Channels for communicating with the cron loop (adding/removing/stopping jobs).
let (add_tx, mut add_rx) = mpsc::unbounded_channel();
let (remove_tx, mut remove_rx) = mpsc::unbounded_channel();
let (stop_tx, mut stop_rx) = mpsc::unbounded_channel();
Expand All @@ -97,19 +116,20 @@ where
*self.stop_tx.lock().await = Some(stop_tx);
}

self.running.store(true, Ordering::SeqCst);

// Initialize the next scheduled time for all entries.
for entry in self.entries.lock().await.iter_mut() {
entry.next = entry.get_next(self.get_timezone());
}

// default long timer duration
// Set a default long wait duration for sleeping.
let mut wait_duration = Duration::from_secs(MAX_WAIT_SECONDS);

loop {
// Lock and sort entries to prioritize the closest scheduled job.
let mut entries = self.entries.lock().await;
entries.sort_by(|b, a| b.next.cmp(&a.next));

// Determine the wait duration based on the next scheduled job.
if let Some(entry) = entries.first() {
// get first entry from sorted entries for timer duration
let wait_milis = (entry.next.as_ref().unwrap().timestamp_millis() as u64)
Expand All @@ -121,41 +141,46 @@ where
// release lock
drop(entries);

// Use `select!` to handle multiple asynchronous tasks concurrently.
select! {
// sleep and wait until next scheduled time
// Sleep until the next scheduled job is ready to run.
_ = tokio_time::sleep(wait_duration) => {
let now = self.now();
for entry in self.entries.lock().await.iter_mut() {
if entry.next.as_ref().unwrap().gt(&now) {
break;
}

// Spawn the job to run asynchronously.
let run = entry.run.clone();
tokio::spawn(async move {
run.as_ref().get_pinned().await;
});

// Schedule the next run of the job.
entry.next = entry.get_next(self.get_timezone());
}
},
// wait new entry added signal
// Add a new entry to the scheduler.
new_entry = add_rx.recv() => {
let mut entry = new_entry.unwrap();
entry.next = entry.get_next(self.get_timezone());
self.entries.lock().await.push(entry);
},
// wait entry removed signal
// Remove an entry from the scheduler by ID.
id = remove_rx.recv() => {
self.remove_entry(id.unwrap()).await;
},
// wait cron stopped signal
// Stop the cron scheduler.
_ = stop_rx.recv() => {
return;
},
}
}
}

/// Schedules a new job by creating an `AsyncEntry` and adding it to the scheduler.
/// If the scheduler is running, the job is added via the channel; otherwise, it's added directly.
async fn schedule<F, T>(&mut self, schedule: cron::Schedule, f: F) -> Result<usize>
where
F: 'static + Fn() -> T + Send + Sync,
Expand All @@ -170,8 +195,10 @@ where
run: Arc::new(TaskWrapper::new(f)),
};

// Determine the next scheduled time for the job.
entry.next = entry.get_next(self.get_timezone());

// If the cron is running, send the entry via the channel; otherwise, add it directly.
match self.add_tx.lock().await.as_ref() {
Some(tx) if self.running.load(Ordering::SeqCst) => tx.send(entry).unwrap(),
_ => self.entries.lock().await.push(entry),
Expand All @@ -185,10 +212,11 @@ where
self.tz = tz;
}

/// Start cron in background.
/// A toki stask will be spawn for schedule jobs
/// Starts the cron scheduler in the background.
/// A separate task is spawned to handle scheduled jobs asynchronously.
pub async fn start(&mut self) {
let mut cloned = self.clone();
self.running.store(true, Ordering::SeqCst);
tokio::spawn(async move {
cloned.start_blocking().await;
});
Expand Down
64 changes: 19 additions & 45 deletions src/cron.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
Expand All @@ -19,11 +19,15 @@ where
{
entries: Arc<Mutex<Vec<Entry<Z>>>>,
next_id: Arc<AtomicUsize>,
running: Arc<AtomicBool>,
tz: Z,
add_tx: Option<crossbeam_channel::Sender<Entry<Z>>>,
remove_tx: Option<crossbeam_channel::Sender<usize>>,
stop_tx: Option<crossbeam_channel::Sender<bool>>,
add_channel: (
crossbeam_channel::Sender<Entry<Z>>,
crossbeam_channel::Receiver<Entry<Z>>,
),
stop_channel: (
crossbeam_channel::Sender<bool>,
crossbeam_channel::Receiver<bool>,
),
}

/// Cron contains and executes the scheduled jobs.
Expand All @@ -45,11 +49,9 @@ where
Cron {
entries: Arc::new(Mutex::new(Vec::new())),
next_id: Arc::new(AtomicUsize::new(0)),
running: Arc::new(AtomicBool::new(false)),
tz,
add_tx: None,
remove_tx: None,
stop_tx: None,
add_channel: crossbeam_channel::unbounded(),
stop_channel: crossbeam_channel::unbounded(),
}
}

Expand Down Expand Up @@ -86,11 +88,7 @@ where
};

entry.next = entry.schedule_next(self.get_timezone());

match self.add_tx.as_ref() {
Some(tx) if self.running.load(Ordering::SeqCst) => tx.send(entry).unwrap(),
_ => self.entries.lock().unwrap().push(entry),
}
self.add_channel.0.send(entry).unwrap();

Ok(next_id)
}
Expand Down Expand Up @@ -132,15 +130,7 @@ where
/// cron.remove(job_id);
/// ```
pub fn remove(&self, id: usize) {
if self.running.load(Ordering::SeqCst) {
if let Some(tx) = self.remove_tx.as_ref() {
tx.send(id).unwrap();
}

return;
}

self.remove_entry(id);
self.remove_entry(id)
}

/// Stop Cron.
Expand All @@ -154,9 +144,7 @@ where
/// cron.stop();
/// ```
pub fn stop(&self) {
if let Some(tx) = self.stop_tx.as_ref() {
tx.send(true).unwrap();
}
self.stop_channel.0.send(true).unwrap()
}

/// Start cron.
Expand All @@ -170,24 +158,15 @@ where
/// cron.start();
/// ```
pub fn start(&mut self) {
let mut cloned_cron = self.clone();
let mut cron = self.clone();

thread::spawn(move || {
cloned_cron.start_blocking();
cron.start_blocking();
});
}

/// Run a loop for schedule jobs
pub fn start_blocking(&mut self) {
let (add_tx, add_rx) = crossbeam_channel::unbounded();
let (remove_tx, remove_rx) = crossbeam_channel::unbounded();
let (stop_tx, stop_rx) = crossbeam_channel::unbounded();

self.add_tx = Some(add_tx);
self.remove_tx = Some(remove_tx);
self.stop_tx = Some(stop_tx);

self.running.store(true, Ordering::SeqCst);

for entry in self.entries.lock().unwrap().iter_mut() {
entry.next = entry.schedule_next(self.get_timezone());
}
Expand Down Expand Up @@ -228,17 +207,12 @@ where
}
},
// wait add new entry
recv(add_rx) -> new_entry => {
recv(self.add_channel.1) -> new_entry => {
let mut entry = new_entry.unwrap();
entry.next = entry.schedule_next(self.get_timezone());
self.entries.lock().unwrap().push(entry);
},
// wait remove entry
recv(remove_rx) -> id => {
self.remove_entry(id.unwrap());
},
// wait stop cron
recv(stop_rx) -> _ => {
recv(self.stop_channel.1) -> _ => {
return;
},
}
Expand Down
Loading

0 comments on commit 05a7e83

Please sign in to comment.