Skip to content

Commit

Permalink
Stream Keyword Detection (#30)
Browse files Browse the repository at this point in the history
* detect stream keywords and restart or stop jobs based as a result

* Create EXAMPLES.md

* create job templates
  • Loading branch information
lubomirkurcak authored May 28, 2024
1 parent 40bb111 commit 56ada6d
Show file tree
Hide file tree
Showing 6 changed files with 329 additions and 16 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
authors = ["Ľubomír Kurčák <lubomirkurcak@gmail.com>"]
name = "tend"
description = "Quickly spin up/down groups of command-line tasks with automated recovery"
version = "0.2.10"
version = "0.2.11"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/lubomirkurcak/tend"
Expand All @@ -21,7 +21,7 @@ colored = "2.1.0"
dirs-next = "2.0.0"
folktime = "0.2.1"
prettytable-rs = "0.10.0"
serde = { version = "1.0.202", features = ["derive"] }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
tokio = { version = "1.37.0", features = [
"rt-multi-thread",
Expand Down
47 changes: 47 additions & 0 deletions EXAMPLES.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Examples

## Detecting errors and restarting jobs

Detect errors from job command's output. When an error is detected, restart it.

### Create a faulty command

Create a command `count` that prints "error" to `stderr` when the number is 5:

#### sh (Linux)

```sh
tend create --overwrite "count" -- sh -c 'for i in $(seq 1 10); do if [ $i -eq 5 ]; then echo "error" >&2; else echo "hello $i"; fi; sleep 1; done'
```

#### cmd (Windows)

```sh
tend create --overwrite "count" -- cmd /C "for /L %i in (1,1,10) do ((if %i==5 (echo error >&2) else (echo hello %i)) & timeout /t 1 /nobreak >nul)"
```

### Detect errors

Create a hook with name `error-hook` that detects the substring `error` in the `stderr` output of the command `count-err` and restarts the command when the substring is detected:

```sh
tend edit "count" hook create "error-hook" detect-substring --stream stderr "error" restart
```

## Stopping jobs

Stop a job when a certain condition is met.

### Create a command

```sh
tend create --overwrite ping-1111 1.1.1.1
```

### Detect a condition

Create a hook with name `stop-hook` that detects the substring `from 1.1.1.1` in the `stdout` output of the command `ping` and stops the command when the substring is detected:

```sh
tend edit "ping-1111" hook create "stop-hook" detect-substring --stream stdout "from 1.1.1.1" stop
```
53 changes: 53 additions & 0 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,18 @@ pub enum Commands {
default_value = "default"
)]
group: String,
#[arg(long, short = 't', help = "Template to use for job configuration")]
template: Option<crate::job::JobTemplate>,
#[arg(help = "Use -- to separate program arguments from job arguments.")]
args: Vec<String>,
},
#[command(alias = "e", alias = "ed", about = "Edit a job")]
Edit {
#[arg(help = "Name of the job to edit", exclusive = true)]
name: String,
#[command(subcommand)]
command: EditJobCommands,
},
#[command(alias = "d", alias = "rm", about = "Delete jobs")]
#[clap(group(clap::ArgGroup::new("input").required(true).args(&["name", "group", "job", "all"])))]
Delete {
Expand Down Expand Up @@ -151,3 +160,47 @@ pub enum Commands {
confirm: bool,
},
}

#[derive(Clone, Debug, Subcommand)]
pub enum EditJobCommands {
#[command(about = "Change the group of a job")]
Group {
#[arg(help = "New group name")]
group: String,
},
Hook {
#[command(subcommand)]
command: EditJobHookCommands,
},
}

#[derive(Clone, Debug, Subcommand)]
pub enum EditJobHookCommands {
List,
Create {
#[arg(help = "Name of the hook to create")]
hook: String,
#[command(subcommand)]
t: JobHook,
},
Delete {
#[arg(help = "Name of the hook to delete")]
hook: String,
},
}

#[derive(Clone, Debug, Subcommand)]
pub enum JobHook {
DetectSubstring {
substring: String,
#[arg(help = "Action to take when substring is detected.")]
action: crate::job::JobAction,
#[arg(
long,
short,
help = "Stream to detect substring in.",
default_value = "any"
)]
stream: crate::job::Stream,
},
}
139 changes: 131 additions & 8 deletions src/job/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::colors::TendColors;
use anyhow::Result;
use clap::ValueEnum;
use folktime::Folktime;
use prettytable::{format, row, Table};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::{collections::HashMap, path::PathBuf};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
Expand All @@ -17,16 +18,82 @@ pub struct Job {
pub program: String,
pub args: Vec<String>,
pub working_directory: PathBuf,
#[serde(default)]
pub restart: JobRestartBehavior,
#[serde(default)]
pub restart_strategy: JobRestartStrategy,
#[serde(default)]
pub event_hooks: HashMap<String, JobEventHook>,
#[serde(default)]
pub template: Option<JobTemplate>,
}

#[derive(Copy, Clone, Debug, ValueEnum, Serialize, Deserialize)]
pub enum JobTemplate {
PortForward,
}

enum JobControlFlow {
Nothing,
RestartCommand,
StopJob,
}

impl Job {
fn stdout_line_callback(&self, line: &str) -> JobControlFlow {
for hook in self.event_hooks.values() {
let JobEventHook {
event: JobEvent::DetectSubstring { stream, contains },
action,
} = hook;

let detection = match stream {
Stream::Stdout => line.contains(contains),
Stream::Stderr => false,
Stream::Any => line.contains(contains),
};

if detection {
match action {
JobAction::Restart => return JobControlFlow::RestartCommand,
JobAction::Stop => return JobControlFlow::StopJob,
}
}
}

JobControlFlow::Nothing
}

fn stderr_line_callback(&self, line: &str) -> JobControlFlow {
for hook in self.event_hooks.values() {
let JobEventHook {
event: JobEvent::DetectSubstring { stream, contains },
action,
} = hook;

let detection = match stream {
Stream::Stdout => false,
Stream::Stderr => line.contains(contains),
Stream::Any => line.contains(contains),
};

if detection {
match action {
JobAction::Restart => return JobControlFlow::RestartCommand,
JobAction::Stop => return JobControlFlow::StopJob,
}
}
}

JobControlFlow::Nothing
}
}

#[derive(Default, Debug, Clone, Serialize, Deserialize, clap::ValueEnum, Copy, PartialEq, Eq)]
pub enum JobRestartBehavior {
#[default]
Always,
OnSuccess,
#[default]
OnFailure,
Never,
}
Expand All @@ -50,6 +117,34 @@ impl JobRestartStrategy {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, ValueEnum, Default)]
pub enum Stream {
Stdout,
Stderr,
#[default]
Any,
}

/// TODO: Rework [Job::restart] to use this instead of [JobRestartStrategy]
#[derive(Debug, Clone, Serialize, Deserialize, clap::Parser)]
pub enum JobEvent {
// FinishedSuccess,
// FinishedFailure,
DetectSubstring { stream: Stream, contains: String },
}

#[derive(Debug, Clone, Serialize, Deserialize, ValueEnum)]
pub enum JobAction {
Restart,
Stop,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobEventHook {
pub event: JobEvent,
pub action: JobAction,
}

pub enum JobFilter {
All {
exclude: Vec<String>,
Expand Down Expand Up @@ -102,13 +197,13 @@ impl Job {
Ok(())
}

// pub fn load(name: &str) -> Result<Self> {
// let jobs = Self::jobs_dir()?;
// let file = std::fs::File::open(jobs.join(name))?;
// let job: Job = serde_json::from_reader(file)?;
pub fn load(name: &str) -> Result<Self> {
let jobs = Self::jobs_dir()?;
let file = std::fs::File::open(jobs.join(name))?;
let job: Job = serde_json::from_reader(file)?;

// Ok(job)
// }
Ok(job)
}

pub fn delete(&self) -> Result<()> {
let jobs = Self::jobs_dir()?;
Expand Down Expand Up @@ -226,12 +321,40 @@ impl Job {
stdout_line = stdout.next_line() => {
if let Some(line) = stdout_line? {
println!("{}{}", format!("{}: ", self.name).job(), line);

match self.stdout_line_callback(&line) {
JobControlFlow::Nothing => (),
JobControlFlow::RestartCommand => {
println!("{} restarting", self.name.job());
process.kill().await?;
break 'process;
},
JobControlFlow::StopJob => {
println!("{} stopping", self.name.job());
process.kill().await?;
break 'job;
},
};
}
continue 'process;
}
stderr_line = stderr.next_line() => {
if let Some(line) = stderr_line? {
println!("{}{}{}{}", self.name.job(), " (stderr)".failure(), ": ".job(), line);

match self.stderr_line_callback(&line) {
JobControlFlow::Nothing => (),
JobControlFlow::RestartCommand => {
println!("{} restarting", self.name.job());
process.kill().await?;
break 'process;
},
JobControlFlow::StopJob => {
println!("{} stopping", self.name.job());
process.kill().await?;
break 'job;
},
};
}
continue 'process;
}
Expand Down
Loading

0 comments on commit 56ada6d

Please sign in to comment.