From 476f39bcbfa8548d72eed421be8abd725e64f995 Mon Sep 17 00:00:00 2001 From: Maxim Zhiburt Date: Wed, 28 Feb 2024 02:21:57 +0300 Subject: [PATCH] [WIP] Move to trait based version --- Cargo.toml | 9 + examples/bash.rs | 2 +- examples/check.rs | 10 +- examples/expect_line.rs | 6 +- examples/ftp.rs | 2 +- examples/ftp_interact.rs | 37 +-- examples/interact.rs | 4 +- examples/interact_with_callback.rs | 81 ++++--- examples/log.rs | 2 +- examples/ping.rs | 2 +- examples/powershell.rs | 5 +- examples/python.rs | 2 +- examples/shell.rs | 11 +- src/check_macros.rs | 2 +- src/expect.rs | 67 ++++++ src/interact/mod.rs | 2 - src/interact/opts.rs | 175 -------------- src/interact/session.rs | 365 +++++++++++++++++++---------- src/lib.rs | 13 +- src/process/mod.rs | 78 +++++- src/process/unix.rs | 83 +++++-- src/repl.rs | 178 ++++++++++---- src/session/mod.rs | 8 +- src/session/sync_session.rs | 340 +++++++++++++++++---------- src/stream/log.rs | 13 +- tests/check.rs | 2 +- tests/expect.rs | 3 +- tests/interact.rs | 4 +- tests/io.rs | 2 +- tests/repl.rs | 3 +- tests/session.rs | 4 +- 31 files changed, 909 insertions(+), 606 deletions(-) create mode 100644 src/expect.rs delete mode 100644 src/interact/opts.rs diff --git a/Cargo.toml b/Cargo.toml index 3e353cc..8717029 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,3 +36,12 @@ crossbeam-channel = { version = "0.5.6", optional = true } [package.metadata.docs.rs] all-features = false + +[[target.'cfg(unix)'.example]] +name = "log" +path = "examples/log.rs" + +[[target.'cfg(windows)'.example]] +name = "powershell" +path = "examples/powershell.rs" + diff --git a/examples/bash.rs b/examples/bash.rs index 38bab9e..8e0fbbc 100644 --- a/examples/bash.rs +++ b/examples/bash.rs @@ -1,7 +1,7 @@ // An example is based on README.md from https://github.com/philippkeller/rexpect #[cfg(unix)] -use expectrl::{repl::spawn_bash, ControlCode, Regex}; +use expectrl::{repl::spawn_bash, ControlCode, Regex, Expect}; #[cfg(unix)] #[cfg(not(feature = "async"))] diff --git a/examples/check.rs b/examples/check.rs index 9968513..d6806f1 100644 --- a/examples/check.rs +++ b/examples/check.rs @@ -1,19 +1,19 @@ -use expectrl::{check, spawn, Error}; +use expectrl::{check, spawn, Error, Expect}; #[cfg(not(feature = "async"))] fn main() { - let mut session = spawn("python ./tests/source/ansi.py").expect("Can't spawn a session"); + let mut p = spawn("python ./tests/source/ansi.py").expect("Can't spawn a session"); loop { match check!( - &mut session, + &mut p, _ = "Password: " => { println!("Set password to SECURE_PASSWORD"); - session.send_line("SECURE_PASSWORD").unwrap(); + p.send_line("SECURE_PASSWORD").unwrap(); }, _ = "Continue [y/n]:" => { println!("Stop processing"); - session.send_line("n").unwrap(); + p.send_line("n").unwrap(); }, ) { Err(Error::Eof) => break, diff --git a/examples/expect_line.rs b/examples/expect_line.rs index 6a79157..57d8323 100644 --- a/examples/expect_line.rs +++ b/examples/expect_line.rs @@ -1,11 +1,11 @@ -use expectrl::{self, Any, Eof}; +use expectrl::{self, Any, Eof, Expect}; #[cfg(not(feature = "async"))] fn main() { - let mut session = expectrl::spawn("ls -al").expect("Can't spawn a session"); + let mut p = expectrl::spawn("ls -al").expect("Can't spawn a session"); loop { - let m = session + let m = p .expect(Any::boxed(vec![ Box::new("\r"), Box::new("\n"), diff --git a/examples/ftp.rs b/examples/ftp.rs index 0ad9cd2..2b65d22 100644 --- a/examples/ftp.rs +++ b/examples/ftp.rs @@ -1,4 +1,4 @@ -use expectrl::{spawn, ControlCode, Error, Regex}; +use expectrl::{spawn, ControlCode, Error, Regex, Expect}; #[cfg(not(feature = "async"))] fn main() -> Result<(), Error> { diff --git a/examples/ftp_interact.rs b/examples/ftp_interact.rs index 7114aad..2c4918f 100644 --- a/examples/ftp_interact.rs +++ b/examples/ftp_interact.rs @@ -1,32 +1,33 @@ use expectrl::{ - interact::{actions::lookup::Lookup, InteractOptions}, - spawn, - stream::stdin::Stdin, - ControlCode, Error, Regex, + interact::actions::lookup::Lookup, spawn, stream::stdin::Stdin, ControlCode, Error, Expect, + Regex, }; use std::io::stdout; #[cfg(not(all(windows, feature = "polling")))] #[cfg(not(feature = "async"))] fn main() -> Result<(), Error> { + let mut p = spawn("ftp bks4-speedtest-1.tele2.net")?; + let mut auth = false; let mut login_lookup = Lookup::new(); - let opts = InteractOptions::new(&mut auth).on_output(|ctx| { - if login_lookup - .on(ctx.buf, ctx.eof, "Login successful")? - .is_some() - { - **ctx.state = true; - return Ok(true); - } - - Ok(false) - }); + let mut stdin = Stdin::open()?; - let mut p = spawn("ftp bks4-speedtest-1.tele2.net")?; + p.interact(&mut stdin, stdout()) + .set_state(&mut auth) + .on_output(move |ctx| { + if login_lookup + .on(ctx.buf, ctx.eof, "Login successful")? + .is_some() + { + **ctx.state = true; + return Ok(true); + } + + Ok(false) + }) + .spawn()?; - let mut stdin = Stdin::open()?; - p.interact(&mut stdin, stdout()).spawn(opts)?; stdin.close()?; if !auth { diff --git a/examples/interact.rs b/examples/interact.rs index decbac1..8411417 100644 --- a/examples/interact.rs +++ b/examples/interact.rs @@ -1,6 +1,6 @@ //! To run an example run `cargo run --example interact`. -use expectrl::{interact::InteractOptions, spawn, stream::stdin::Stdin}; +use expectrl::{spawn, stream::stdin::Stdin}; use std::io::stdout; #[cfg(unix)] @@ -20,7 +20,7 @@ fn main() { let mut stdin = Stdin::open().expect("Failed to create stdin"); sh.interact(&mut stdin, stdout()) - .spawn(&mut InteractOptions::default()) + .spawn() .expect("Failed to start interact"); stdin.close().expect("Failed to close a stdin"); diff --git a/examples/interact_with_callback.rs b/examples/interact_with_callback.rs index adbfb28..3e91fc0 100644 --- a/examples/interact_with_callback.rs +++ b/examples/interact_with_callback.rs @@ -1,9 +1,4 @@ -use expectrl::{ - interact::{actions::lookup::Lookup, InteractOptions}, - spawn, - stream::stdin::Stdin, - Regex, -}; +use expectrl::{interact::actions::lookup::Lookup, spawn, stream::stdin::Stdin, Regex}; #[derive(Debug, Default)] struct State { @@ -18,53 +13,57 @@ fn main() { let mut output_action = Lookup::new(); let mut input_action = Lookup::new(); let mut state = State::default(); - let opts = InteractOptions::new(&mut state) - .on_output(|mut ctx| { - let m = output_action.on(ctx.buf, ctx.eof, "Continue [y/n]:")?; - if m.is_some() { - ctx.state.wait_for_continue = Some(true); - }; - let m = output_action.on(ctx.buf, ctx.eof, Regex("status:\\s*.*\\w+.*\\r\\n"))?; - if m.is_some() { - ctx.state.stutus_verification_counter = - Some(ctx.state.stutus_verification_counter.map_or(1, |c| c + 1)); - output_action.clear(); - } + let mut session = spawn("python ./tests/source/ansi.py").expect("Can't spawn a session"); - Ok(false) - }) - .on_input(|mut ctx| { - let m = input_action.on(ctx.buf, ctx.eof, "y")?; - if m.is_some() { - if let Some(_a @ true) = ctx.state.wait_for_continue { - ctx.state.pressed_yes_on_continue = Some(true); - } - }; + let mut stdin = Stdin::open().unwrap(); + let stdout = std::io::stdout(); - let m = input_action.on(ctx.buf, ctx.eof, "n")?; - if m.is_some() { - if let Some(_a @ true) = ctx.state.wait_for_continue { - ctx.state.pressed_yes_on_continue = Some(false); + let (is_alive, status) = { + let mut interact = session.interact(&mut stdin, stdout).set_state(&mut state); + interact + .on_output(move |ctx| { + let m = output_action.on(ctx.buf, ctx.eof, "Continue [y/n]:")?; + if m.is_some() { + ctx.state.wait_for_continue = Some(true); + }; + + let m = output_action.on(ctx.buf, ctx.eof, Regex("status:\\s*.*\\w+.*\\r\\n"))?; + if m.is_some() { + ctx.state.stutus_verification_counter = + Some(ctx.state.stutus_verification_counter.map_or(1, |c| c + 1)); + output_action.clear(); } - } - Ok(false) - }); + Ok(false) + }) + .on_input(move |ctx| { + let m = input_action.on(ctx.buf, ctx.eof, "y")?; + if m.is_some() { + if let Some(_a @ true) = ctx.state.wait_for_continue { + ctx.state.pressed_yes_on_continue = Some(true); + } + }; + + let m = input_action.on(ctx.buf, ctx.eof, "n")?; + if m.is_some() { + if let Some(_a @ true) = ctx.state.wait_for_continue { + ctx.state.pressed_yes_on_continue = Some(false); + } + } - let mut session = spawn("python ./tests/source/ansi.py").expect("Can't spawn a session"); + Ok(false) + }); - let mut stdin = Stdin::open().unwrap(); - let stdout = std::io::stdout(); - - let mut interact = session.interact(&mut stdin, stdout); + let is_alive = interact.spawn().expect("Failed to start interact"); - let is_alive = interact.spawn(opts).expect("Failed to start interact"); + (is_alive, interact.get_status()) + }; if !is_alive { println!("The process was exited"); #[cfg(unix)] - println!("Status={:?}", interact.get_status()); + println!("Status={:?}", status); } stdin.close().unwrap(); diff --git a/examples/log.rs b/examples/log.rs index 55156c7..5e6f1a2 100644 --- a/examples/log.rs +++ b/examples/log.rs @@ -1,4 +1,4 @@ -use expectrl::{spawn, Error}; +use expectrl::{spawn, Error, Expect}; fn main() -> Result<(), Error> { let p = spawn("cat")?; diff --git a/examples/ping.rs b/examples/ping.rs index ebb258b..1ea1946 100644 --- a/examples/ping.rs +++ b/examples/ping.rs @@ -1,5 +1,5 @@ #[cfg(unix)] -use expectrl::{repl::spawn_bash, ControlCode, Error}; +use expectrl::{repl::spawn_bash, ControlCode, Error, Expect}; #[cfg(unix)] #[cfg(not(feature = "async"))] diff --git a/examples/powershell.rs b/examples/powershell.rs index cdfa5ab..4337028 100644 --- a/examples/powershell.rs +++ b/examples/powershell.rs @@ -1,7 +1,8 @@ #[cfg(windows)] -fn main() { - use expectrl::{repl::spawn_powershell, ControlCode, Regex}; +use expectrl::{repl::spawn_powershell, ControlCode, Regex}; +#[cfg(windows)] +fn main() { #[cfg(feature = "async")] { futures_lite::future::block_on(async { diff --git a/examples/python.rs b/examples/python.rs index b6f277f..c5d3b8a 100644 --- a/examples/python.rs +++ b/examples/python.rs @@ -1,4 +1,4 @@ -use expectrl::{repl::spawn_python, Regex}; +use expectrl::{repl::spawn_python, Regex, Expect}; #[cfg(not(feature = "async"))] fn main() { diff --git a/examples/shell.rs b/examples/shell.rs index 6adbe37..c07589d 100644 --- a/examples/shell.rs +++ b/examples/shell.rs @@ -1,13 +1,16 @@ use expectrl::repl::ReplSession; use std::io::Result; +use expectrl::process::Termios; #[cfg(all(unix, not(feature = "async")))] fn main() -> Result<()> { - let mut p = expectrl::spawn("sh")?; - p.get_process_mut().set_echo(true, None)?; - let mut shell = ReplSession::new(p, String::from("sh-5.1$"), Some(String::from("exit")), true); + let mut p = expectrl::spawn("sh")?; + p.set_echo(true)?; + let mut shell = ReplSession::new(p, String::from("sh-5.1$")); + shell.set_echo(true); + shell.set_quit_command("exit"); shell.expect_prompt()?; let output = exec(&mut shell, "echo Hello World")?; @@ -20,7 +23,7 @@ fn main() -> Result<()> { } #[cfg(all(unix, not(feature = "async")))] -fn exec(shell: &mut ReplSession, cmd: &str) -> Result { +fn exec(shell: &mut ReplSession, cmd: &str) -> Result { let buf = shell.execute(cmd)?; let mut string = String::from_utf8_lossy(&buf).into_owned(); string = string.replace("\r\n\u{1b}[?2004l\r", ""); diff --git a/src/check_macros.rs b/src/check_macros.rs index 41b6242..4ddbbea 100644 --- a/src/check_macros.rs +++ b/src/check_macros.rs @@ -101,7 +101,7 @@ macro_rules! check { // The question is which solution is more effichient. // I took the following approach because there's no chance we influence user's land via the variable name we pick. (@branch $session:expr, ($var:tt = $exp:expr => $body:tt, $($tail:tt)*), ($($default:tt)*)) => { - match $crate::session::Session::check($session, $exp) { + match $crate::Expect::check($session, $exp) { result if result.as_ref().map(|found| !found.is_empty()).unwrap_or(false) => { let $var = result.unwrap(); $body; diff --git a/src/expect.rs b/src/expect.rs new file mode 100644 index 0000000..ea13f55 --- /dev/null +++ b/src/expect.rs @@ -0,0 +1,67 @@ +use crate::{Captures, Error, Needle}; + +type Result = std::result::Result; + +pub trait Expect { + fn expect(&mut self, needle: N) -> Result + where + N: Needle; + + fn check(&mut self, needle: N) -> Result + where + N: Needle; + + fn is_matched(&mut self, needle: N) -> Result + where + N: Needle; + + /// Send buffer to the stream. + fn send(&mut self, buf: B) -> Result<()> + where + B: AsRef<[u8]>; + + /// Send line to the stream. + fn send_line(&mut self, buf: B) -> Result<()> + where + B: AsRef<[u8]>; +} + +impl Expect for &mut T +where + T: Expect, +{ + fn expect(&mut self, needle: N) -> Result + where + N: Needle, + { + T::expect(self, needle) + } + + fn check(&mut self, needle: N) -> Result + where + N: Needle, + { + T::check(self, needle) + } + + fn is_matched(&mut self, needle: N) -> Result + where + N: Needle, + { + T::is_matched(self, needle) + } + + fn send(&mut self, buf: B) -> Result<()> + where + B: AsRef<[u8]>, + { + T::send(self, buf) + } + + fn send_line(&mut self, buf: B) -> Result<()> + where + B: AsRef<[u8]>, + { + T::send_line(self, buf) + } +} diff --git a/src/interact/mod.rs b/src/interact/mod.rs index 97f14d0..9593d51 100644 --- a/src/interact/mod.rs +++ b/src/interact/mod.rs @@ -48,9 +48,7 @@ pub mod actions; mod context; -mod opts; mod session; pub use context::Context; -pub use opts::{InteractOptions, NoAction, NoFilter}; pub use session::InteractSession; diff --git a/src/interact/opts.rs b/src/interact/opts.rs deleted file mode 100644 index 24e08b3..0000000 --- a/src/interact/opts.rs +++ /dev/null @@ -1,175 +0,0 @@ -use std::borrow::Cow; - -use super::Context; -use crate::{session::OsProcess, Error, Session}; - -type Result = std::result::Result; - -/// Interact options (aka callbacks you can set to be callled being in an interactive mode). -#[derive(Debug)] -pub struct InteractOptions { - pub(crate) state: C, - pub(crate) input_filter: Option, - pub(crate) output_filter: Option, - pub(crate) input_action: Option, - pub(crate) output_action: Option, - pub(crate) idle_action: Option, -} - -type DefaultOps = InteractOptions< - C, - NoFilter, - NoFilter, - NoAction, I, O, C>, - NoAction, I, O, C>, - NoAction, I, O, C>, ->; - -impl Default for DefaultOps { - fn default() -> Self { - Self::new(()) - } -} - -impl DefaultOps { - /// Set a state. - pub fn new(state: C) -> Self { - Self { - state, - input_filter: None, - output_filter: None, - input_action: None, - output_action: None, - idle_action: None, - } - } -} - -impl InteractOptions { - /// Get a reference on state - pub fn get_state(&self) -> &C { - &self.state - } - - /// Get a mut reference on state - pub fn get_state_mut(&mut self) -> &mut C { - &mut self.state - } - - /// Returns a inner state. - pub fn into_inner(self) -> C { - self.state - } - - /// Sets the output filter. - /// The output_filter will be passed all the output from the child process. - /// - /// The filter isn't applied to user's `read` calls through the [`Context`] in callbacks. - pub fn output_filter(self, filter: F) -> InteractOptions - where - F: FnMut(&[u8]) -> Result>, - { - InteractOptions { - input_filter: self.input_filter, - output_filter: Some(filter), - input_action: self.input_action, - output_action: self.output_action, - idle_action: self.idle_action, - state: self.state, - } - } - - /// Sets the input filter. - /// The input_filter will be passed all the keyboard input from the user. - /// - /// The input_filter is run BEFORE the check for the escape_character. - /// The filter is called BEFORE calling a on_input callback if it's set. - pub fn input_filter(self, filter: F) -> InteractOptions - where - F: FnMut(&[u8]) -> Result>, - { - InteractOptions { - input_filter: Some(filter), - output_filter: self.output_filter, - input_action: self.input_action, - output_action: self.output_action, - idle_action: self.idle_action, - state: self.state, - } - } -} - -impl - InteractOptions, I, O, C>, OA, WA> -{ - /// Puts a hanlder which will be called when users input is detected. - /// - /// Be aware that currently async version doesn't take a Session as an argument. - /// See . - pub fn on_input(self, action: F) -> InteractOptions - where - F: FnMut(Context<'_, Session, I, O, C>) -> Result, - { - InteractOptions { - input_filter: self.input_filter, - output_filter: self.output_filter, - input_action: Some(action), - output_action: self.output_action, - idle_action: self.idle_action, - state: self.state, - } - } -} - -impl - InteractOptions, I, O, C>, WA> -{ - /// Puts a hanlder which will be called when process output is detected. - /// - /// IMPORTANT: - /// - /// Please be aware that your use of [Session::expect], [Session::check] and any `read` operation on session - /// will cause the read bytes not to apeard in the output stream! - pub fn on_output(self, action: F) -> InteractOptions - where - F: FnMut(Context<'_, Session, I, O, C>) -> Result, - { - InteractOptions { - input_filter: self.input_filter, - output_filter: self.output_filter, - input_action: self.input_action, - output_action: Some(action), - idle_action: self.idle_action, - state: self.state, - } - } -} - -impl - InteractOptions, I, O, C>> -{ - /// Puts a handler which will be called on each interaction when no input is detected. - pub fn on_idle(self, action: F) -> InteractOptions - where - F: FnMut(Context<'_, Session, I, O, C>) -> Result, - { - InteractOptions { - input_filter: self.input_filter, - output_filter: self.output_filter, - input_action: self.input_action, - output_action: self.output_action, - idle_action: Some(action), - state: self.state, - } - } -} - -/// A helper type to set a default action to [`InteractSession`]. -/// -/// [`InteractSession`]: crate::interact::InteractSession -pub type NoAction = fn(Context<'_, S, I, O, C>) -> Result; - -/// A helper type to set a default filter to [`InteractSession`]. -/// -/// [`InteractSession`]: crate::interact::InteractSession -pub type NoFilter = fn(&[u8]) -> Result>; diff --git a/src/interact/session.rs b/src/interact/session.rs index 9185fdd..ef90050 100644 --- a/src/interact/session.rs +++ b/src/interact/session.rs @@ -1,41 +1,73 @@ //! This module contains a [`InteractSession`] which runs an interact session with IO. +// todo: PtyProcess wait_echo optimize by not looping when timout is none + use std::{ - borrow::{BorrowMut, Cow}, + borrow::Cow, io::{ErrorKind, Write}, }; -use crate::{session::OsProcess, Error, Session}; - #[cfg(not(feature = "async"))] use std::io::Read; -use super::{Context, InteractOptions}; +use crate::{ + process::{Healthcheck, Termios}, + Error, Expect, +}; + +use crate::interact::Context; #[cfg(all(not(feature = "async"), not(feature = "polling")))] use crate::process::NonBlocking; +#[cfg(unix)] +use crate::process::unix::WaitStatus; + +type ExpectResult = Result; + /// InteractConfig represents options of an interactive session. -#[derive(Debug)] -pub struct InteractSession { +pub struct InteractSession { session: Session, input: Input, output: Output, escape_character: u8, #[cfg(unix)] - status: Option, + status: Option, + opts: InteractOptions, } -impl InteractSession { - /// Default escape character. - pub const ESCAPE: u8 = 29; // Ctrl-] +/// Interact options (aka callbacks you can set to be callled being in an interactive mode). +struct InteractOptions { + state: C, + input_filter: Option, + output_filter: Option, + input_action: Option>, + output_action: Option>, + idle_action: Option>, +} + +type OptAction = Box) -> ExpectResult>; + +type OptFilter = Box ExpectResult>>; - /// Creates a new object of [InteractSession]. - pub fn new(session: S, input: I, output: O) -> InteractSession { +impl InteractSession { + /// Default escape character. + pub const ESCAPE: u8 = 29; + + /// Creates a new object of [`InteractSession`]. + pub fn new(session: S, input: I, output: O, state: C) -> InteractSession { InteractSession { input, output, session, escape_character: Self::ESCAPE, + opts: InteractOptions { + state, + input_filter: None, + output_filter: None, + input_action: None, + output_action: None, + idle_action: None, + }, #[cfg(unix)] status: None, } @@ -54,50 +86,126 @@ impl InteractSession { /// Which sometimes happens and it's not considered to be a valid [`WaitStatus`], so None is returned. /// /// [`Self::spawn`]: crate::interact::InteractSession::spawn - /// [`WaitStatus`]: crate::WaitStatus #[cfg(unix)] - pub fn get_status(&self) -> Option { + pub fn get_status(&self) -> Option { self.status } } +impl InteractSession { + /// Set a state + pub fn set_state(self, state: State) -> InteractSession { + let mut s = InteractSession::new(self.session, self.input, self.output, state); + s.escape_character = self.escape_character; + #[cfg(unix)] + { + s.status = self.status; + } + + s + } + + /// Get a reference on state + pub fn get_state(&self) -> &C { + &self.opts.state + } + + /// Get a mut reference on state + pub fn get_state_mut(&mut self) -> &mut C { + &mut self.opts.state + } + + /// Returns a inner state. + pub fn into_state(self) -> C { + self.opts.state + } + + /// Sets the output filter. + /// The output_filter will be passed all the output from the child process. + /// + /// The filter isn't applied to user's `read` calls through the [`Context`] in callbacks. + pub fn output_filter(&mut self, filter: F) -> &mut Self + where + F: FnMut(&[u8]) -> ExpectResult> + 'static, + { + self.opts.output_filter = Some(Box::new(filter)); + self + } + + /// Sets the input filter. + /// The input_filter will be passed all the keyboard input from the user. + /// + /// The input_filter is run BEFORE the check for the escape_character. + /// The filter is called BEFORE calling a on_input callback if it's set. + pub fn input_filter(&mut self, filter: F) -> &mut Self + where + F: FnMut(&[u8]) -> ExpectResult> + 'static, + { + self.opts.input_filter = Some(Box::new(filter)); + self + } + + /// Puts a hanlder which will be called when users input is detected. + /// + /// Be aware that currently async version doesn't take a Session as an argument. + /// See . + pub fn on_input(&mut self, action: F) -> &mut Self + where + F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult + 'static, + { + self.opts.input_action = Some(Box::new(action)); + self + } + + /// Puts a hanlder which will be called when process output is detected. + /// + /// IMPORTANT: + /// + /// Please be aware that your use of [Session::expect], [Session::check] and any `read` operation on session + /// will cause the read bytes not to apeard in the output stream! + pub fn on_output(&mut self, action: F) -> &mut Self + where + F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult + 'static, + { + self.opts.output_action = Some(Box::new(action)); + self + } + + /// Puts a handler which will be called on each interaction when no input is detected. + pub fn on_idle(&mut self, action: F) -> &mut Self + where + F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult + 'static, + { + self.opts.idle_action = Some(Box::new(action)); + self + } +} + #[cfg(not(any(feature = "async", feature = "polling")))] -impl InteractSession<&mut Session, I, O> +impl InteractSession where I: Read, O: Write, - S: NonBlocking + Write + Read, + S: Expect + Termios + Healthcheck + NonBlocking + Write + Read, { /// Runs the session. /// /// See [`Session::interact`]. /// /// [`Session::interact`]: crate::session::Session::interact - pub fn spawn(&mut self, mut ops: OPS) -> Result - where - OPS: BorrowMut>, - IF: FnMut(&[u8]) -> Result, Error>, - OF: FnMut(&[u8]) -> Result, Error>, - IA: FnMut(Context<'_, Session, I, O, C>) -> Result, - OA: FnMut(Context<'_, Session, I, O, C>) -> Result, - WA: FnMut(Context<'_, Session, I, O, C>) -> Result, - { + pub fn spawn(&mut self) -> ExpectResult { #[cfg(unix)] { - let is_echo = self - .session - .get_process() - .get_echo() - .map_err(|e| Error::unknown("failed to get echo", e.to_string()))?; + let is_echo = self.session.is_echo()?; if !is_echo { - let _ = self.session.get_process_mut().set_echo(true, None); + let _ = self.session.set_echo(true); } self.status = None; - let is_alive = interact_buzy_loop(self, ops.borrow_mut())?; + let is_alive = interact_buzy_loop(self)?; if !is_echo { - let _ = self.session.get_process_mut().set_echo(false, None); + let _ = self.session.set_echo(false); } Ok(is_alive) @@ -105,7 +213,7 @@ where #[cfg(windows)] { - interact_buzy_loop(self, ops.borrow_mut()) + interact_buzy_loop(self) } } } @@ -222,98 +330,92 @@ where } } -#[cfg(all(not(feature = "async"), not(feature = "polling")))] -fn interact_buzy_loop( - interact: &mut InteractSession<&mut Session, I, O>, - opts: &mut InteractOptions, -) -> Result +impl std::fmt::Debug for InteractSession where - S: NonBlocking + Write + Read, - I: Read, + S: std::fmt::Debug, + I: std::fmt::Debug, + O: std::fmt::Debug, + C: std::fmt::Debug, +{ + #[rustfmt::skip] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InteractSession") + .field("session", &self.session) + .field("input", &self.input) + .field("output", &self.output) + .field("escape_character", &self.escape_character) + .field("status", &self.status) + .field("state", &std::ptr::addr_of!(self.opts.state)) + .field("opts:on_idle", &get_pointer(&self.opts.idle_action)) + .field("opts:on_input", &get_pointer(&self.opts.input_action)) + .field("opts:on_output", &get_pointer(&self.opts.output_action)) + .field("opts:input_filter", &get_pointer(&self.opts.input_filter)) + .field("opts:output_filter", &get_pointer(&self.opts.output_filter)) + .finish() + } +} + +#[cfg(all(unix, not(feature = "async"), not(feature = "polling")))] +fn interact_buzy_loop(s: &mut InteractSession) -> ExpectResult +where + S: Healthcheck + NonBlocking + Write + Read, O: Write, - IF: FnMut(&[u8]) -> Result, Error>, - OF: FnMut(&[u8]) -> Result, Error>, - IA: FnMut(Context<'_, Session, I, O, C>) -> Result, - OA: FnMut(Context<'_, Session, I, O, C>) -> Result, - WA: FnMut(Context<'_, Session, I, O, C>) -> Result, + I: Read, { let mut buf = [0; 512]; + loop { - #[cfg(unix)] - { - let status = get_status(interact.session)?; - if !matches!(status, Some(crate::WaitStatus::StillAlive)) { - interact.status = status; - return Ok(false); - } + let status = get_status(&s.session)?; + if !matches!(status, Some(WaitStatus::StillAlive)) { + s.status = status; + return Ok(false); } - #[cfg(windows)] - { - if !interact.session.is_alive()? { - return Ok(false); + if let Some(n) = try_read(&mut s.session, &mut buf)? { + let eof = n == 0; + let buf = &buf[..n]; + let buf = call_filter(s.opts.output_filter.as_mut(), buf)?; + + #[rustfmt::skip] + let exit = opt_action( + Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, eof), + &mut s.opts.output_action, + )?; + if eof || exit { + return Ok(true); } - } - match interact.session.try_read(&mut buf) { - Ok(n) => { - let eof = n == 0; - let buf = &buf[..n]; - let buf = call_filter(opts.output_filter.as_mut(), buf)?; - - let exit = call_action( - opts.output_action.as_mut(), - interact.session, - &mut interact.input, - &mut interact.output, - &mut opts.state, - &buf, - eof, - )?; - - if eof || exit { - return Ok(true); - } - - spin_write(&mut interact.output, &buf)?; - spin_flush(&mut interact.output)?; - } - Err(err) if err.kind() == ErrorKind::WouldBlock => {} - Err(err) => return Err(err.into()), + spin_write(&mut s.output, &buf)?; + spin_flush(&mut s.output)?; } // We dont't print user input back to the screen. // In terminal mode it will be ECHOed back automatically. // This way we preserve terminal seetings for example when user inputs password. // The terminal must have been prepared before. - match interact.input.read(&mut buf) { + match s.input.read(&mut buf) { Ok(n) => { let eof = n == 0; let buf = &buf[..n]; - let buf = call_filter(opts.input_filter.as_mut(), buf)?; + let buf = call_filter(s.opts.input_filter.as_mut(), buf)?; - let exit = call_action( - opts.input_action.as_mut(), - interact.session, - &mut interact.input, - &mut interact.output, - &mut opts.state, - &buf, - eof, + #[rustfmt::skip] + let exit = opt_action( + Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, eof), + &mut s.opts.input_action, )?; - if eof | exit { return Ok(true); } - let escape_char_position = buf.iter().position(|c| *c == interact.escape_character); + let escape_char_position = buf.iter().position(|c| *c == s.escape_character); match escape_char_position { Some(pos) => { - interact.session.write_all(&buf[..pos])?; + s.session.write_all(&buf[..pos])?; return Ok(true); } None => { - interact.session.write_all(&buf[..])?; + s.session.write_all(&buf[..])?; } } } @@ -321,16 +423,11 @@ where Err(err) => return Err(err.into()), } - let exit = call_action( - opts.idle_action.as_mut(), - interact.session, - &mut interact.input, - &mut interact.output, - &mut opts.state, - &[], - false, + #[rustfmt::skip] + let exit = opt_action( + Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, false), + &mut s.opts.idle_action, )?; - if exit { return Ok(true); } @@ -770,20 +867,12 @@ where } } -fn call_action( - action: Option, - s: &mut S, - r: &mut I, - w: &mut O, - state: &mut C, - buf: &[u8], - eof: bool, -) -> Result -where - F: FnMut(Context<'_, S, I, O, C>) -> Result, -{ - match action { - Some(mut action) => (action)(Context::new(s, r, w, state, buf, eof)), +fn opt_action( + ctx: Context<'_, S, I, O, C>, + opt: &mut Option>, +) -> ExpectResult { + match opt { + Some(action) => (action)(ctx), None => Ok(false), } } @@ -799,10 +888,38 @@ where } #[cfg(unix)] -fn get_status(session: &Session) -> Result, Error> { - match session.get_process().status() { +fn get_status(session: &S) -> Result, Error> +where + S: Healthcheck, +{ + match session.get_status() { Ok(status) => Ok(Some(status)), - Err(ptyprocess::errno::Errno::ECHILD | ptyprocess::errno::Errno::ESRCH) => Ok(None), - Err(err) => Err(Error::IO(std::io::Error::new(ErrorKind::Other, err))), + Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(None), + Err(err) => Err(Error::IO(err)), } } + +#[cfg(unix)] +fn try_read(session: &mut S, buf: &mut [u8]) -> ExpectResult> +where + S: NonBlocking + Read, +{ + session.set_blocking(false)?; + + let result = session.read(buf); + + session.set_blocking(true)?; + + match result { + Ok(n) => Ok(Some(n)), + Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(None), + Err(err) => Err(Error::IO(err)), + } +} + +fn get_pointer(ptr: &Option>) -> usize +where + T: ?Sized, +{ + ptr.as_ref().map_or(0, |f| std::ptr::addr_of!(f) as usize) +} diff --git a/src/lib.rs b/src/lib.rs index b8b8686..c314732 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -114,6 +114,7 @@ mod captures; mod check_macros; mod control_code; mod error; +mod expect; mod needle; #[cfg(all(windows, feature = "polling"))] @@ -130,11 +131,12 @@ pub use control_code::ControlCode; pub use error::Error; pub use needle::{Any, Eof, NBytes, Needle, Regex}; -#[cfg(unix)] -pub use ptyprocess::{Signal, WaitStatus}; - +pub use expect::Expect; pub use session::Session; +use process::Process; +use session::OsProcess; + /// Spawn spawnes a new session. /// /// It accepts a command and possibly arguments just as string. @@ -159,6 +161,9 @@ pub use session::Session; /// ``` /// /// [`Session::spawn`]: ./struct.Session.html?#spawn -pub fn spawn>(cmd: S) -> Result { +pub fn spawn(cmd: S) -> Result::Stream>, Error> +where + S: AsRef, +{ Session::spawn_cmd(cmd.as_ref()) } diff --git a/src/process/mod.rs b/src/process/mod.rs index 7d6f967..14dca4a 100644 --- a/src/process/mod.rs +++ b/src/process/mod.rs @@ -15,7 +15,9 @@ pub trait Process: Sized { type Stream; /// Spawn parses a given string as a commandline string and spawns it on a process. - fn spawn>(cmd: S) -> Result; + fn spawn(cmd: S) -> Result + where + S: AsRef; /// Spawn_command runs a process with a given command. fn spawn_command(command: Self::Command) -> Result; /// It opens a IO stream with a spawned process. @@ -25,17 +27,81 @@ pub trait Process: Sized { #[allow(clippy::wrong_self_convention)] /// Healthcheck represents a check by which we can determine if a spawned process is still alive. pub trait Healthcheck { + /// A status healthcheck can return. + type Status; + + /// The function returns a status of a process if it still alive and it can operate. + fn get_status(&self) -> Result; + /// The function returns a status of a process if it still alive and it can operate. - fn is_alive(&mut self) -> Result; + fn is_alive(&self) -> Result; +} + +impl Healthcheck for &T +where + T: Healthcheck, +{ + type Status = T::Status; + + fn get_status(&self) -> Result { + T::get_status(self) + } + + fn is_alive(&self) -> Result { + T::is_alive(self) + } +} + +impl Healthcheck for &mut T +where + T: Healthcheck, +{ + type Status = T::Status; + + fn get_status(&self) -> Result { + T::get_status(self) + } + + fn is_alive(&self) -> Result { + T::is_alive(self) + } } /// NonBlocking interface represens a [std::io::Read]er which can be turned in a non blocking mode /// so its read operations will return imideately. pub trait NonBlocking { - /// Sets a [std::io::Read]er into a non blocking mode. - fn set_non_blocking(&mut self) -> Result<()>; - /// Sets a [std::io::Read]er back into a blocking mode. - fn set_blocking(&mut self) -> Result<()>; + /// Sets a [std::io::Read]er into a non/blocking mode. + fn set_blocking(&mut self, on: bool) -> Result<()>; +} + +impl NonBlocking for &mut T +where + T: NonBlocking, +{ + fn set_blocking(&mut self, on: bool) -> Result<()> { + T::set_blocking(self, on) + } +} + +/// Terminal configuration trait, used for IO configuration. +pub trait Termios { + /// Verifies whether a [`std::io::Write`] will be repeated in output stream and be read by [`std::io::Read`]. + fn is_echo(&self) -> Result; + /// Configure a echo logic. + fn set_echo(&mut self, on: bool) -> Result; +} + +impl Termios for &mut T +where + T: Termios, +{ + fn is_echo(&self) -> Result { + T::is_echo(self) + } + + fn set_echo(&mut self, on: bool) -> Result { + T::set_echo(self, on) + } } #[cfg(feature = "async")] diff --git a/src/process/unix.rs b/src/process/unix.rs index b3acb89..b2a7b68 100644 --- a/src/process/unix.rs +++ b/src/process/unix.rs @@ -1,8 +1,18 @@ //! This module contains a Unix implementation of [crate::process::Process]. -use super::{Healthcheck, NonBlocking, Process}; -use crate::error::to_io_error; -use ptyprocess::{stream::Stream, PtyProcess}; +use std::{ + io::{self, ErrorKind, Read, Result, Write}, + ops::{Deref, DerefMut}, + os::unix::prelude::{AsRawFd, RawFd}, + process::Command, +}; + +use crate::{ + error::to_io_error, + process::{Healthcheck, NonBlocking, Process, Termios}, +}; + +use ptyprocess::{errno::Errno, stream::Stream, PtyProcess}; #[cfg(feature = "async")] use super::IntoAsyncStream; @@ -14,12 +24,7 @@ use std::{ task::{Context, Poll}, }; -use std::{ - io::{self, Read, Result, Write}, - ops::{Deref, DerefMut}, - os::unix::prelude::{AsRawFd, RawFd}, - process::Command, -}; +pub use ptyprocess::{Signal, WaitStatus}; /// A Unix representation of a [Process] via [PtyProcess] #[derive(Debug)] @@ -31,13 +36,13 @@ impl Process for UnixProcess { type Command = Command; type Stream = PtyStream; - fn spawn>(cmd: S) -> Result { + fn spawn(cmd: S) -> Result + where + S: AsRef, + { let args = tokenize_command(cmd.as_ref()); if args.is_empty() { - return Err(io::Error::new( - io::ErrorKind::Other, - "Failed to parse a command", - )); + return Err(io_error("failed to parse a command")); } let mut command = std::process::Command::new(&args[0]); @@ -63,10 +68,30 @@ impl Process for UnixProcess { } impl Healthcheck for UnixProcess { - fn is_alive(&mut self) -> Result { + type Status = WaitStatus; + + fn get_status(&self) -> Result { + get_status(&self.proc) + } + + fn is_alive(&self) -> Result { self.proc .is_alive() - .map_err(to_io_error("Failed to call pty.is_alive()")) + .map_err(to_io_error("failed to determine if process is alive")) + } +} + +impl Termios for UnixProcess { + fn is_echo(&self) -> Result { + let value = self.proc.get_echo()?; + + Ok(value) + } + + fn set_echo(&mut self, on: bool) -> Result { + let value = self.proc.set_echo(on, None)?; + + Ok(value) } } @@ -117,14 +142,12 @@ impl Read for PtyStream { } impl NonBlocking for PtyStream { - fn set_non_blocking(&mut self) -> Result<()> { - let fd = self.handle.as_raw_fd(); - make_non_blocking(fd, true) - } - - fn set_blocking(&mut self) -> Result<()> { + fn set_blocking(&mut self, on: bool) -> Result<()> { let fd = self.handle.as_raw_fd(); - make_non_blocking(fd, false) + match on { + true => make_non_blocking(fd, false), + false => make_non_blocking(fd, true), + } } } @@ -223,6 +246,20 @@ fn tokenize_command(program: &str) -> Vec { res } +fn get_status(proc: &PtyProcess) -> std::prelude::v1::Result { + match proc.status() { + Ok(status) => Ok(status), + Err(err) => match err { + Errno::ECHILD | Errno::ESRCH => Err(io::Error::new(ErrorKind::WouldBlock, err)), + err => Err(io::Error::new(ErrorKind::Other, err)), + }, + } +} + +fn io_error(msg: &str) -> io::Error { + io::Error::new(io::ErrorKind::Other, msg) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/repl.rs b/src/repl.rs index f61d7b9..8f1ca20 100644 --- a/src/repl.rs +++ b/src/repl.rs @@ -1,25 +1,32 @@ //! This module contains a list of special Sessions that can be spawned. +use std::{ + io, + ops::{Deref, DerefMut}, +}; + +#[cfg(unix)] +use std::process::Command; + use crate::{ error::Error, + process::{Healthcheck, Termios}, session::{OsProcess, OsProcessStream}, - Captures, Session, + Captures, Expect, Needle, Session, }; -use std::ops::{Deref, DerefMut}; #[cfg(not(feature = "async"))] use crate::process::NonBlocking; #[cfg(not(feature = "async"))] use std::io::{Read, Write}; -#[cfg(unix)] -use std::process::Command; - use crate::spawn; #[cfg(feature = "async")] use futures_lite::{AsyncRead, AsyncWrite}; +type ExpectResult = Result; + /// Spawn a bash session. /// /// It uses a custom prompt to be able to controll shell better. @@ -28,8 +35,9 @@ use futures_lite::{AsyncRead, AsyncWrite}; /// Because we don't handle echoes here (currently). Ideally we need to. #[cfg(unix)] #[cfg(not(feature = "async"))] -pub fn spawn_bash() -> Result { +pub fn spawn_bash() -> ExpectResult>> { const DEFAULT_PROMPT: &str = "EXPECT_PROMPT"; + let mut cmd = Command::new("bash"); let _ = cmd.env("PS1", DEFAULT_PROMPT); // bind 'set enable-bracketed-paste off' turns off paste mode, @@ -43,12 +51,8 @@ pub fn spawn_bash() -> Result { let session = crate::session::Session::spawn(cmd)?; - let mut bash = ReplSession::new( - session, - DEFAULT_PROMPT.to_string(), - Some("quit".to_string()), - false, - ); + let mut bash = ReplSession::new(session, DEFAULT_PROMPT); + bash.set_quit_command("quit"); // read a prompt to make it not available on next read. // @@ -66,7 +70,7 @@ pub fn spawn_bash() -> Result { /// It uses a custom prompt to be able to controll shell better. #[cfg(unix)] #[cfg(feature = "async")] -pub async fn spawn_bash() -> Result { +pub async fn spawn_bash() -> ExpectResult { const DEFAULT_PROMPT: &str = "EXPECT_PROMPT"; let mut cmd = Command::new("bash"); let _ = cmd.env("PS1", DEFAULT_PROMPT); @@ -96,20 +100,22 @@ pub async fn spawn_bash() -> Result { /// Spawn default python's IDLE. #[cfg(not(feature = "async"))] -pub fn spawn_python() -> Result { +pub fn spawn_python() -> ExpectResult>> { // todo: check windows here // If we spawn it as ProcAttr::default().commandline("python") it will spawn processes endlessly.... let session = spawn("python")?; - let mut idle = ReplSession::new(session, ">>> ".to_owned(), Some("quit()".to_owned()), false); + let mut idle = ReplSession::new(session, ">>> "); + idle.set_quit_command("quit()"); idle.expect_prompt()?; + Ok(idle) } /// Spawn default python's IDLE. #[cfg(feature = "async")] -pub async fn spawn_python() -> Result { +pub async fn spawn_python() -> ExpectResult { // todo: check windows here // If we spawn it as ProcAttr::default().commandline("python") it will spawn processes endlessly.... @@ -125,7 +131,7 @@ pub async fn spawn_python() -> Result { /// It uses a custom prompt to be able to controll the shell. #[cfg(windows)] #[cfg(not(feature = "async"))] -pub fn spawn_powershell() -> Result { +pub fn spawn_powershell() -> ExpectResult { const DEFAULT_PROMPT: &str = "EXPECTED_PROMPT>"; let session = spawn("pwsh -NoProfile -NonInteractive -NoLogo")?; let mut powershell = ReplSession::new( @@ -156,7 +162,7 @@ pub fn spawn_powershell() -> Result { /// It uses a custom prompt to be able to controll the shell. #[cfg(windows)] #[cfg(feature = "async")] -pub async fn spawn_powershell() -> Result { +pub async fn spawn_powershell() -> ExpectResult { const DEFAULT_PROMPT: &str = "EXPECTED_PROMPT>"; let session = spawn("pwsh -NoProfile -NonInteractive -NoLogo")?; let mut powershell = ReplSession::new( @@ -190,19 +196,19 @@ pub async fn spawn_powershell() -> Result { /// you have a prompt where a user inputs commands and the shell /// which executes them and manages IO streams. #[derive(Debug)] -pub struct ReplSession

{ +pub struct ReplSession { /// The prompt, used for `wait_for_prompt`, /// e.g. ">>> " for python. prompt: String, /// A pseudo-teletype session with a spawned process. - session: Session, + session: S, /// A command which will be called before termination. quit_command: Option, /// Flag to see if a echo is turned on. is_echo_on: bool, } -impl ReplSession { +impl ReplSession { /// Spawn function creates a repl session. /// /// The argument list is: @@ -210,20 +216,25 @@ impl ReplSession { /// - prompt; a string which will identify that the command was run. /// - quit_command; a command which will be called when [ReplSession] instance is dropped. /// - is_echo_on; determines whether the prompt check will be done twice. - pub fn new( - session: Session, - prompt: String, - quit_command: Option, - is_echo: bool, - ) -> Self { + pub fn new(session: S, prompt: impl Into) -> Self { Self { session, - prompt, - quit_command, - is_echo_on: is_echo, + prompt: prompt.into(), + quit_command: None, + is_echo_on: false, } } + /// Set echo settings to be expected. + pub fn set_echo(&mut self, on: bool) { + self.is_echo_on = on; + } + + /// Set quit command which will be called on `exit`. + pub fn set_quit_command(&mut self, cmd: impl Into) { + self.quit_command = Some(cmd.into()); + } + /// Get a used prompt. pub fn get_prompt(&self) -> &str { &self.prompt @@ -240,13 +251,26 @@ impl ReplSession { } /// Get an inner session. - pub fn into_session(self) -> Session { + pub fn into_session(self) -> S { self.session } + + /// Get an inner session. + pub fn get_session(&self) -> &S { + &self.session + } + + /// Get an inner session. + pub fn get_session_mut(&mut self) -> &mut S { + &mut self.session + } } #[cfg(not(feature = "async"))] -impl ReplSession { +impl ReplSession +where + S: Expect, +{ /// Block until prompt is found pub fn expect_prompt(&mut self) -> Result<(), Error> { let _ = self._expect_prompt()?; @@ -272,25 +296,37 @@ impl ReplSession { } #[cfg(not(feature = "async"))] -impl ReplSession { +impl ReplSession +where + S: Expect, +{ /// Send a command to a repl and verifies that it exited. /// Returning it's output. - pub fn execute + Clone>(&mut self, cmd: SS) -> Result, Error> { + pub fn execute(&mut self, cmd: C) -> Result, Error> + where + C: AsRef, + { self.send_line(cmd)?; let found = self._expect_prompt()?; - Ok(found.before().to_vec()) + let out = found.before().to_vec(); + + Ok(out) } /// Sends line to repl (and flush the output). /// /// If echo_on=true wait for the input to appear. #[cfg(not(feature = "async"))] - pub fn send_line>(&mut self, line: Text) -> Result<(), Error> { + pub fn send_line(&mut self, line: L) -> Result<(), Error> + where + L: AsRef, + { let text = line.as_ref(); self.session.send_line(text)?; if self.is_echo_on { - let _ = self.expect(line.as_ref())?; + let _ = self.get_session_mut().expect(line.as_ref())?; } + Ok(()) } @@ -340,16 +376,70 @@ impl ReplSession { } } -impl Deref for ReplSession { - type Target = Session; +impl Healthcheck for ReplSession +where + S: Healthcheck, +{ + type Status = S::Status; - fn deref(&self) -> &Self::Target { - &self.session + fn get_status(&self) -> io::Result { + self.get_session().get_status() + } + + fn is_alive(&self) -> io::Result { + self.get_session().is_alive() } } -impl DerefMut for ReplSession { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.session +impl Termios for ReplSession +where + S: Termios, +{ + fn is_echo(&self) -> io::Result { + self.get_session().is_echo() + } + + fn set_echo(&mut self, on: bool) -> io::Result { + self.get_session_mut().set_echo(on) + } +} + +impl Expect for ReplSession +where + S: Expect, +{ + fn expect(&mut self, needle: N) -> ExpectResult + where + N: Needle, + { + S::expect(self.get_session_mut(), needle) + } + + fn check(&mut self, needle: N) -> ExpectResult + where + N: Needle, + { + S::check(self.get_session_mut(), needle) + } + + fn is_matched(&mut self, needle: N) -> ExpectResult + where + N: Needle, + { + S::is_matched(self.get_session_mut(), needle) + } + + fn send(&mut self, buf: B) -> ExpectResult<()> + where + B: AsRef<[u8]>, + { + S::send(self.get_session_mut(), buf) + } + + fn send_line(&mut self, buf: B) -> ExpectResult<()> + where + B: AsRef<[u8]>, + { + S::send_line(self.get_session_mut(), buf) } } diff --git a/src/session/mod.rs b/src/session/mod.rs index 5b4c961..f07bb5d 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -48,6 +48,8 @@ type OsProcStream = crate::process::windows::AsyncProcessStream; pub type OsProcess = OsProc; /// A type alias for OS process stream which is a default one for [`Session`]. pub type OsProcessStream = OsProcStream; +/// A type alias for OS session. +pub type OsSession = Session; #[cfg(feature = "async")] pub use async_session::Session; @@ -55,7 +57,7 @@ pub use async_session::Session; #[cfg(not(feature = "async"))] pub use sync_session::Session; -impl Session { +impl Session { /// Spawns a session on a platform process. /// /// # Example @@ -141,8 +143,8 @@ impl Session { /// ``` /// /// [`Read`]: std::io::Read - pub fn interact(&mut self, input: I, output: O) -> InteractSession<&mut Self, I, O> { - InteractSession::new(self, input, output) + pub fn interact(&mut self, input: I, output: O) -> InteractSession<&mut Self, I, O, ()> { + InteractSession::new(self, input, output, ()) } } diff --git a/src/session/sync_session.rs b/src/session/sync_session.rs index e185828..cb451f2 100644 --- a/src/session/sync_session.rs +++ b/src/session/sync_session.rs @@ -7,15 +7,16 @@ use std::{ use crate::{ error::Error, + expect::Expect, needle::Needle, - process::{Healthcheck, NonBlocking}, + process::{Healthcheck, NonBlocking, Process, Termios}, Captures, }; /// Session represents a spawned process and its streams. /// It controlls process and communication with it. #[derive(Debug)] -pub struct Session

{ +pub struct Session { proc: P, stream: TryStream, expect_timeout: Option, @@ -29,6 +30,7 @@ where /// Creates a new session. pub fn new(process: P, stream: S) -> io::Result { let stream = TryStream::new(stream)?; + Ok(Self { proc: process, stream, @@ -37,7 +39,7 @@ where }) } - pub(crate) fn swap_stream(mut self, new_stream: F) -> Result, Error> + pub(crate) fn swap_stream(mut self, new: F) -> Result, Error> where F: FnOnce(S) -> R, R: Read, @@ -46,10 +48,11 @@ where let buf = self.stream.get_available().to_owned(); let stream = self.stream.into_inner(); - let new_stream = new_stream(stream); + let stream = new(stream); - let mut session = Session::new(self.proc, new_stream)?; + let mut session = Session::new(self.proc, stream)?; session.stream.keep_in_buffer(&buf); + Ok(session) } } @@ -90,14 +93,10 @@ impl Session { } } -impl Session { - /// Verifies whether process is still alive. - pub fn is_alive(&mut self) -> Result { - self.proc.is_alive().map_err(|err| err.into()) - } -} - -impl Session { +impl Expect for Session +where + S: Write + Read + NonBlocking, +{ /// Expect waits until a pattern is matched. /// /// If the method returns [Ok] it is guaranteed that at least 1 match was found. @@ -136,7 +135,7 @@ impl Session { /// /// It returns an error if timeout is reached. /// You can specify a timeout value by [Session::set_expect_timeout] method. - pub fn expect(&mut self, needle: N) -> Result + fn expect(&mut self, needle: N) -> Result where N: Needle, { @@ -146,98 +145,6 @@ impl Session { } } - /// Expect which fills as much as possible to the buffer. - /// - /// See [Session::expect]. - fn expect_gready(&mut self, needle: N) -> Result - where - N: Needle, - { - let start = time::Instant::now(); - loop { - let eof = self.stream.read_available()?; - let data = self.stream.get_available(); - - let found = needle.check(data, eof)?; - if !found.is_empty() { - let end_index = Captures::right_most_index(&found); - let involved_bytes = data[..end_index].to_vec(); - self.stream.consume_available(end_index); - - return Ok(Captures::new(involved_bytes, found)); - } - - if eof { - return Err(Error::Eof); - } - - if let Some(timeout) = self.expect_timeout { - if start.elapsed() > timeout { - return Err(Error::ExpectTimeout); - } - } - } - } - - /// Expect which reads byte by byte. - /// - /// See [Session::expect]. - fn expect_lazy(&mut self, needle: N) -> Result - where - N: Needle, - { - let mut checking_data_length = 0; - let mut eof = false; - let start = time::Instant::now(); - loop { - let mut available = self.stream.get_available(); - if checking_data_length == available.len() { - // We read by byte to make things as lazy as possible. - // - // It's chose is important in using Regex as a Needle. - // Imagine we have a `\d+` regex. - // Using such buffer will match string `2` imidiately eventhough right after might be other digit. - // - // The second reason is - // if we wouldn't read by byte EOF indication could be lost. - // And next blocking std::io::Read operation could be blocked forever. - // - // We could read all data available via `read_available` to reduce IO operations, - // but in such case we would need to keep a EOF indicator internally in stream, - // which is OK if EOF happens onces, but I am not sure if this is a case. - eof = self.stream.read_available_once(&mut [0; 1])? == Some(0); - available = self.stream.get_available(); - } - - // We intentinally not increase the counter - // and run check one more time even though the data isn't changed. - // Because it may be important for custom implementations of Needle. - if checking_data_length < available.len() { - checking_data_length += 1; - } - - let data = &available[..checking_data_length]; - - let found = needle.check(data, eof)?; - if !found.is_empty() { - let end_index = Captures::right_most_index(&found); - let involved_bytes = data[..end_index].to_vec(); - self.stream.consume_available(end_index); - return Ok(Captures::new(involved_bytes, found)); - } - - if eof { - return Err(Error::Eof); - } - - if let Some(timeout) = self.expect_timeout { - if start.elapsed() > timeout { - return Err(Error::ExpectTimeout); - } - } - } - } - /// Check verifies if a pattern is matched. /// Returns empty found structure if nothing found. /// @@ -260,7 +167,7 @@ impl Session { /// let m = p.check(Regex("\\d+")).unwrap(); /// assert_eq!(m.get(0).unwrap(), b"123"); /// ``` - pub fn check(&mut self, needle: N) -> Result + fn check(&mut self, needle: N) -> Result where N: Needle, { @@ -314,7 +221,7 @@ impl Session { /// let m = p.is_matched(Regex("\\d+")).unwrap(); /// assert_eq!(m, true); /// ``` - pub fn is_matched(&mut self, needle: N) -> Result + fn is_matched(&mut self, needle: N) -> Result where N: Needle, { @@ -332,9 +239,7 @@ impl Session { Ok(false) } -} -impl Session { /// Send text to child’s STDIN. /// /// You can also use methods from [std::io::Write] instead. @@ -350,8 +255,13 @@ impl Session { /// proc.send(b"World"); /// proc.send(ControlCode::try_from("^C").unwrap()); /// ``` - pub fn send>(&mut self, buf: B) -> io::Result<()> { - self.stream.write_all(buf.as_ref()) + fn send(&mut self, buf: B) -> Result<(), Error> + where + B: AsRef<[u8]>, + { + self.stream.write_all(buf.as_ref())?; + + Ok(()) } /// Send a line to child’s STDIN. @@ -367,7 +277,10 @@ impl Session { /// proc.send_line(b"World"); /// proc.send_line(ControlCode::try_from("^C").unwrap()); /// ``` - pub fn send_line>(&mut self, buf: B) -> io::Result<()> { + fn send_line(&mut self, buf: B) -> Result<(), Error> + where + B: AsRef<[u8]>, + { #[cfg(windows)] const LINE_ENDING: &[u8] = b"\r\n"; #[cfg(not(windows))] @@ -380,11 +293,111 @@ impl Session { } } -impl Session { +impl Session +where + S: Read + NonBlocking, +{ + /// Expect which fills as much as possible to the buffer. + /// + /// See [Session::expect]. + fn expect_gready(&mut self, needle: N) -> Result + where + N: Needle, + { + let start = time::Instant::now(); + loop { + let eof = self.stream.read_available()?; + let data = self.stream.get_available(); + + let found = needle.check(data, eof)?; + if !found.is_empty() { + let end_index = Captures::right_most_index(&found); + let involved_bytes = data[..end_index].to_vec(); + self.stream.consume_available(end_index); + + return Ok(Captures::new(involved_bytes, found)); + } + + if eof { + return Err(Error::Eof); + } + + if let Some(timeout) = self.expect_timeout { + if start.elapsed() > timeout { + return Err(Error::ExpectTimeout); + } + } + } + } + + /// Expect which reads byte by byte. + /// + /// See [Session::expect]. + fn expect_lazy(&mut self, needle: N) -> Result + where + N: Needle, + { + let mut checking_data_length = 0; + let mut eof = false; + let start = time::Instant::now(); + loop { + let mut available = self.stream.get_available(); + if checking_data_length == available.len() { + // We read by byte to make things as lazy as possible. + // + // It's chose is important in using Regex as a Needle. + // Imagine we have a `\d+` regex. + // Using such buffer will match string `2` imidiately eventhough right after might be other digit. + // + // The second reason is + // if we wouldn't read by byte EOF indication could be lost. + // And next blocking std::io::Read operation could be blocked forever. + // + // We could read all data available via `read_available` to reduce IO operations, + // but in such case we would need to keep a EOF indicator internally in stream, + // which is OK if EOF happens onces, but I am not sure if this is a case. + eof = self.stream.read_available_once(&mut [0; 1])? == Some(0); + available = self.stream.get_available(); + } + + // We intentinally not increase the counter + // and run check one more time even though the data isn't changed. + // Because it may be important for custom implementations of Needle. + if checking_data_length < available.len() { + checking_data_length += 1; + } + + let data = &available[..checking_data_length]; + + let found = needle.check(data, eof)?; + if !found.is_empty() { + let end_index = Captures::right_most_index(&found); + let involved_bytes = data[..end_index].to_vec(); + self.stream.consume_available(end_index); + return Ok(Captures::new(involved_bytes, found)); + } + + if eof { + return Err(Error::Eof); + } + + if let Some(timeout) = self.expect_timeout { + if start.elapsed() > timeout { + return Err(Error::ExpectTimeout); + } + } + } + } +} + +impl Session +where + S: Read + NonBlocking, +{ /// Try to read in a non-blocking mode. /// - /// Returns `[std::io::ErrorKind::WouldBlock]` - /// in case if there's nothing to read. + /// Returns [`std::io::ErrorKind::WouldBlock`] + /// in case there's nothing to read. pub fn try_read(&mut self, buf: &mut [u8]) -> io::Result { self.stream.try_read(buf) } @@ -395,7 +408,10 @@ impl Session { } } -impl Write for Session { +impl Write for Session +where + S: Write, +{ fn write(&mut self, buf: &[u8]) -> std::io::Result { self.stream.write(buf) } @@ -409,13 +425,19 @@ impl Write for Session { } } -impl Read for Session { +impl Read for Session +where + S: Read, +{ fn read(&mut self, buf: &mut [u8]) -> std::io::Result { self.stream.read(buf) } } -impl BufRead for Session { +impl BufRead for Session +where + S: Read, +{ fn fill_buf(&mut self) -> std::io::Result<&[u8]> { self.stream.fill_buf() } @@ -425,6 +447,43 @@ impl BufRead for Session { } } +impl Healthcheck for Session +where + P: Healthcheck, +{ + type Status = P::Status; + + fn get_status(&self) -> io::Result { + self.get_process().get_status() + } + + fn is_alive(&self) -> io::Result { + self.get_process().is_alive() + } +} + +impl Termios for Session +where + P: Termios, +{ + fn is_echo(&self) -> io::Result { + self.get_process().is_echo() + } + + fn set_echo(&mut self, on: bool) -> io::Result { + self.get_process_mut().set_echo(on) + } +} + +impl NonBlocking for Session +where + S: NonBlocking, +{ + fn set_blocking(&mut self, on: bool) -> io::Result<()> { + S::set_blocking(self.get_stream_mut(), on) + } +} + #[derive(Debug)] struct TryStream { stream: ControlledReader, @@ -444,7 +503,10 @@ impl TryStream { } } -impl TryStream { +impl TryStream +where + S: Read, +{ /// The function returns a new Stream from a file. fn new(stream: S) -> io::Result { Ok(Self { @@ -471,18 +533,21 @@ impl TryStream { } } -impl TryStream { +impl TryStream +where + R: Read + NonBlocking, +{ /// Try to read in a non-blocking mode. /// /// It raises io::ErrorKind::WouldBlock if there's nothing to read. fn try_read(&mut self, buf: &mut [u8]) -> io::Result { - self.stream.get_mut().set_non_blocking()?; + self.stream.get_mut().set_blocking(false)?; let result = self.stream.inner.read(buf); // As file is DUPed changes in one descriptor affects all ones // so we need to make blocking file after we finished. - self.stream.get_mut().set_blocking()?; + self.stream.get_mut().set_blocking(true)?; result } @@ -529,19 +594,22 @@ impl TryStream { // non-buffered && non-blocking read fn try_read_inner(&mut self, buf: &mut [u8]) -> io::Result { - self.stream.get_mut().set_non_blocking()?; + self.stream.get_mut().set_blocking(false)?; let result = self.stream.get_mut().read(buf); // As file is DUPed changes in one descriptor affects all ones // so we need to make blocking file after we finished. - self.stream.get_mut().set_blocking()?; + self.stream.get_mut().set_blocking(true)?; result } } -impl Write for TryStream { +impl Write for TryStream +where + S: Write, +{ fn write(&mut self, buf: &[u8]) -> io::Result { self.stream.inner.get_mut().inner.write(buf) } @@ -555,13 +623,19 @@ impl Write for TryStream { } } -impl Read for TryStream { +impl Read for TryStream +where + R: Read, +{ fn read(&mut self, buf: &mut [u8]) -> io::Result { self.stream.inner.read(buf) } } -impl BufRead for TryStream { +impl BufRead for TryStream +where + R: Read, +{ fn fill_buf(&mut self) -> io::Result<&[u8]> { self.stream.inner.fill_buf() } @@ -576,7 +650,10 @@ struct ControlledReader { inner: BufReader>, } -impl ControlledReader { +impl ControlledReader +where + R: Read, +{ fn new(reader: R) -> Self { Self { inner: BufReader::new(BufferedReader::new(reader)), @@ -627,7 +704,10 @@ impl BufferedReader { } } -impl Read for BufferedReader { +impl Read for BufferedReader +where + R: Read, +{ fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result { if self.buffer.is_empty() { self.inner.read(buf) diff --git a/src/stream/log.rs b/src/stream/log.rs index ffdc7ea..9b19638 100644 --- a/src/stream/log.rs +++ b/src/stream/log.rs @@ -82,13 +82,12 @@ impl Read for LogStream { } } -impl NonBlocking for LogStream { - fn set_non_blocking(&mut self) -> Result<()> { - self.stream.set_non_blocking() - } - - fn set_blocking(&mut self) -> Result<()> { - self.stream.set_blocking() +impl NonBlocking for LogStream +where + S: NonBlocking, +{ + fn set_blocking(&mut self, on: bool) -> Result<()> { + self.stream.set_blocking(on) } } diff --git a/tests/check.rs b/tests/check.rs index 51b8f05..4d26239 100644 --- a/tests/check.rs +++ b/tests/check.rs @@ -1,6 +1,6 @@ #![cfg(unix)] -use expectrl::{spawn, Any, Eof, NBytes, Regex}; +use expectrl::{spawn, Any, Eof, Expect, NBytes, Regex}; use std::thread; use std::time::Duration; diff --git a/tests/expect.rs b/tests/expect.rs index de5c046..435cd4a 100644 --- a/tests/expect.rs +++ b/tests/expect.rs @@ -1,6 +1,7 @@ -use expectrl::{spawn, Eof, NBytes, Regex}; use std::time::Duration; +use expectrl::{spawn, Eof, NBytes, Regex, Expect}; + #[cfg(not(feature = "async"))] use std::io::Read; diff --git a/tests/interact.rs b/tests/interact.rs index 5941ea2..86c43ef 100644 --- a/tests/interact.rs +++ b/tests/interact.rs @@ -5,6 +5,8 @@ use std::{ time::{Duration, Instant}, }; +use expectrl::Expect; + #[cfg(not(feature = "async"))] use std::io::sink; @@ -12,7 +14,7 @@ use std::io::sink; use expectrl::{interact::actions::lookup::Lookup, spawn, stream::stdin::Stdin, NBytes}; #[cfg(not(feature = "async"))] -use expectrl::WaitStatus; +use expectrl::process::unix::WaitStatus; #[cfg(unix)] #[cfg(not(feature = "async"))] diff --git a/tests/io.rs b/tests/io.rs index b28aabe..cfe048b 100644 --- a/tests/io.rs +++ b/tests/io.rs @@ -407,7 +407,7 @@ fn try_read_to_end() { Ok(0) => break, Ok(n) => buf.extend(&b[..n]), Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {} - Err(err) => Err(err).unwrap(), + Err(err) => panic!("{:?}", err), } } diff --git a/tests/repl.rs b/tests/repl.rs index fe18aa1..8fbd69b 100644 --- a/tests/repl.rs +++ b/tests/repl.rs @@ -1,8 +1,9 @@ #![cfg(unix)] use expectrl::{ + process::unix::WaitStatus, repl::{spawn_bash, spawn_python}, - ControlCode, WaitStatus, + ControlCode, Expect, }; #[cfg(feature = "async")] use futures_lite::io::AsyncBufReadExt; diff --git a/tests/session.rs b/tests/session.rs index d1b2238..e0ed279 100644 --- a/tests/session.rs +++ b/tests/session.rs @@ -1,4 +1,4 @@ -use expectrl::{spawn, Session}; +use expectrl::{session::OsSession, spawn, Expect}; #[cfg(feature = "async")] use futures_lite::io::{AsyncReadExt, AsyncWriteExt}; @@ -195,7 +195,7 @@ fn test_session_as_writer() { let _: Box = Box::new(spawn("ls").unwrap()); let _: Box = Box::new(spawn("ls").unwrap()); - fn _io_copy(mut session: Session) { + fn _io_copy(mut session: OsSession) { let _ = std::io::copy(&mut std::io::empty(), &mut session).unwrap(); } }