diff --git a/Cargo.toml b/Cargo.toml index 335afb48..359033c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,27 +11,27 @@ repository = "https://github.com/slowtec/tokio-modbus" edition = "2018" [dependencies] -bytes = "0.5" +bytes = "1" byteorder = "1" futures = { version = "0.3", optional = true } futures-util = { version = "0.3", default-features = false } log = "0.4" net2 = { version = "0.2", optional = true, default-features = false } smallvec = { version = "1", default-features = false } -# rt-core should be enabled only with "server" feature. Waiting for https://github.com/rust-lang/cargo/issues/5954 -tokio = { version = "0.2", features = ["rt-core"] } -tokio-util = { version = "0.2", features = ["codec"] } +# rt should be enabled only with "server" feature. Waiting for https://github.com/rust-lang/cargo/issues/5954 +tokio = { version = "1", features = ["rt", "rt-multi-thread"] } +tokio-util = { version = "0.6", features = ["codec"] } # Disable default-features to exclude unused dependency on libudev -tokio-serial = { version = "4.3.3", optional = true, default-features = false } +serial-io = { version = "0.3", features = ["tokio"], optional = true, default-features = false } [dev-dependencies] env_logger = "0.7" futures = "0.3" -tokio = { version = "0.2", features = ["tcp", "macros", "io-util"] } +tokio = { version = "1", features = ["net", "macros", "io-util"] } [features] default = ["tcp", "rtu", "sync"] -rtu = ["tokio-serial", "futures-util/sink"] +rtu = ["serial-io", "futures-util/sink"] tcp = ["tokio/net", "futures-util/sink"] sync = [] server = ["net2", "futures"] @@ -41,3 +41,38 @@ tcp-server-unstable = ["server"] travis-ci = { repository = "slowtec/tokio-modbus" } coveralls = { repository = "slowtec/tokio-modbus", branch = "master", service = "github" } maintenance = { status = "actively-developed" } + +[[example]] +name = "rtu-client-shared-context" +path = "examples/rtu-client-shared-context.rs" +required-features = ["rtu"] + +[[example]] +name = "rtu-client-sync" +path = "examples/rtu-client-sync.rs" +required-features = ["rtu", "sync"] + +[[example]] +name = "rtu-client" +path = "examples/rtu-client.rs" +required-features = ["rtu"] + +[[example]] +name = "tcp-client-custom-fn" +path = "examples/tcp-client-custom-fn.rs" +required-features = ["tcp"] + +[[example]] +name = "tcp-client-sync" +path = "examples/tcp-client-sync.rs" +required-features = ["tcp", "sync"] + +[[example]] +name = "tcp-client" +path = "examples/tcp-client.rs" +required-features = ["tcp"] + +[[example]] +name = "tcp-server" +path = "examples/tcp-server.rs" +required-features = ["tcp", "server", "tcp-server-unstable"] diff --git a/examples/rtu-client-shared_context.rs b/examples/rtu-client-shared-context.rs similarity index 78% rename from examples/rtu-client-shared_context.rs rename to examples/rtu-client-shared-context.rs index 91353640..c98621c0 100644 --- a/examples/rtu-client-shared_context.rs +++ b/examples/rtu-client-shared-context.rs @@ -1,28 +1,26 @@ -#[cfg(feature = "rtu")] -#[tokio::main] +#[tokio::main(flavor = "current_thread")] pub async fn main() -> Result<(), Box> { use std::{cell::RefCell, future::Future, io::Error, pin::Pin, rc::Rc}; + use serial_io::{build, AsyncSerial, SerialPortBuilder}; use tokio_modbus::client::{ rtu, util::{reconnect_shared_context, NewContext, SharedContext}, Context, }; use tokio_modbus::prelude::*; - use tokio_serial::{Serial, SerialPortSettings}; const SLAVE_1: Slave = Slave(0x01); const SLAVE_2: Slave = Slave(0x02); #[derive(Debug)] struct SerialConfig { - path: String, - settings: SerialPortSettings, + builder: SerialPortBuilder, } impl NewContext for SerialConfig { fn new_context(&self) -> Pin>>> { - let serial = Serial::from_path(&self.path, &self.settings); + let serial = AsyncSerial::from_builder(&self.builder); Box::pin(async { let port = serial?; rtu::connect(port).await @@ -31,11 +29,7 @@ pub async fn main() -> Result<(), Box> { } let serial_config = SerialConfig { - path: "/dev/ttyUSB0".into(), - settings: SerialPortSettings { - baud_rate: 19200, - ..Default::default() - }, + builder: build("/dev/ttyUSB0", 19200), }; println!("Configuration: {:?}", serial_config); @@ -68,9 +62,3 @@ pub async fn main() -> Result<(), Box> { Ok(()) } - -#[cfg(not(feature = "rtu"))] -pub fn main() { - println!("feature `rtu` is required to run this example"); - std::process::exit(1); -} diff --git a/examples/rtu-client-sync.rs b/examples/rtu-client-sync.rs new file mode 100644 index 00000000..53d7101b --- /dev/null +++ b/examples/rtu-client-sync.rs @@ -0,0 +1,17 @@ +pub fn main() -> Result<(), Box> { + use serial_io::build; + + use tokio_modbus::prelude::*; + + let tty_path = "/dev/ttyUSB0"; + let slave = Slave(0x17); + + let builder = build(tty_path, 19200); + + let mut ctx = sync::rtu::connect_slave(&builder, slave)?; + println!("Reading a sensor value"); + let rsp = ctx.read_holding_registers(0x082B, 2)?; + println!("Sensor value is: {:?}", rsp); + + Ok(()) +} diff --git a/examples/rtu-client.rs b/examples/rtu-client.rs index ffc01c20..f2e391db 100644 --- a/examples/rtu-client.rs +++ b/examples/rtu-client.rs @@ -1,16 +1,14 @@ -#[cfg(feature = "rtu")] -#[tokio::main] -async fn main() -> Result<(), Box> { - use tokio_serial::{Serial, SerialPortSettings}; +#[tokio::main(flavor = "current_thread")] +pub async fn main() -> Result<(), Box> { + use serial_io::{build, AsyncSerial}; use tokio_modbus::prelude::*; let tty_path = "/dev/ttyUSB0"; let slave = Slave(0x17); - let mut settings = SerialPortSettings::default(); - settings.baud_rate = 19200; - let port = Serial::from_path(tty_path, &settings).unwrap(); + let builder = build(tty_path, 19200); + let port = AsyncSerial::from_builder(&builder).unwrap(); let mut ctx = rtu::connect_slave(port, slave).await?; println!("Reading a sensor value"); @@ -19,9 +17,3 @@ async fn main() -> Result<(), Box> { Ok(()) } - -#[cfg(not(feature = "rtu"))] -pub fn main() { - println!("feature `rtu` is required to run this example"); - std::process::exit(1); -} diff --git a/examples/tcp-client-custom-fn.rs b/examples/tcp-client-custom-fn.rs index f1a74fa6..8568ab76 100644 --- a/examples/tcp-client-custom-fn.rs +++ b/examples/tcp-client-custom-fn.rs @@ -1,5 +1,4 @@ -#[cfg(feature = "tcp")] -#[tokio::main] +#[tokio::main(flavor = "current_thread")] pub async fn main() -> Result<(), Box> { use tokio_modbus::prelude::*; @@ -15,15 +14,9 @@ pub async fn main() -> Result<(), Box> { println!("Result for function {} is '{:?}'", f, rsp); } _ => { - panic!("unexpeted result"); + panic!("unexpected result"); } } Ok(()) } - -#[cfg(not(feature = "tcp"))] -pub fn main() { - println!("feature `tcp` is required to run this example"); - std::process::exit(1); -} diff --git a/examples/tcp-client.rs b/examples/tcp-client.rs index 62af027a..7231b283 100644 --- a/examples/tcp-client.rs +++ b/examples/tcp-client.rs @@ -1,5 +1,4 @@ -#[cfg(feature = "tcp")] -#[tokio::main] +#[tokio::main(flavor = "current_thread")] pub async fn main() -> Result<(), Box> { use tokio_modbus::prelude::*; @@ -20,9 +19,3 @@ pub async fn main() -> Result<(), Box> { Ok(()) } - -#[cfg(not(feature = "tcp"))] -pub fn main() { - println!("feature `tcp` is required to run this example"); - std::process::exit(1); -} diff --git a/examples/tcp-server.rs b/examples/tcp-server.rs index 667ec6be..91ce38ab 100644 --- a/examples/tcp-server.rs +++ b/examples/tcp-server.rs @@ -1,5 +1,4 @@ -#[cfg(all(feature = "tcp", feature = "server"))] -#[tokio::main] +#[tokio::main(flavor = "current_thread")] pub async fn main() -> Result<(), Box> { use futures::future; use std::{thread, time::Duration}; @@ -44,9 +43,3 @@ pub async fn main() -> Result<(), Box> { Ok(()) } - -#[cfg(not(all(feature = "tcp", feature = "server")))] -pub fn main() { - println!("both `tcp` and `server` features is required to run this example"); - std::process::exit(1); -} diff --git a/src/client/sync/rtu.rs b/src/client/sync/rtu.rs index a65520f0..df0d68f5 100644 --- a/src/client/sync/rtu.rs +++ b/src/client/sync/rtu.rs @@ -3,22 +3,20 @@ use super::{Context, Result}; use crate::client::rtu::connect_slave as async_connect_slave; use crate::slave::Slave; -use tokio_serial::{Serial, SerialPortSettings}; +use serial_io::{AsyncSerial, SerialPortBuilder}; /// Connect to no particular Modbus slave device for sending /// broadcast messages. -pub fn connect(tty_path: &str, settings: &SerialPortSettings) -> Result { - connect_slave(tty_path, settings, Slave::broadcast()) +pub fn connect(builder: &SerialPortBuilder) -> Result { + connect_slave(builder, Slave::broadcast()) } /// Connect to any kind of Modbus slave device. -pub fn connect_slave( - tty_path: &str, - settings: &SerialPortSettings, - slave: Slave, -) -> Result { - let mut rt = tokio::runtime::Runtime::new()?; - let serial = Serial::from_path(tty_path, settings)?; +pub fn connect_slave(builder: &SerialPortBuilder, slave: Slave) -> Result { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_io() + .build()?; + let serial = AsyncSerial::from_builder(builder).unwrap(); let async_ctx = rt.block_on(async_connect_slave(serial, slave))?; let sync_ctx = Context { core: rt, diff --git a/src/client/sync/tcp.rs b/src/client/sync/tcp.rs index 83cdb807..f562bed1 100644 --- a/src/client/sync/tcp.rs +++ b/src/client/sync/tcp.rs @@ -13,7 +13,9 @@ pub fn connect(socket_addr: SocketAddr) -> Result { /// gateway that is forwarding messages to/from the corresponding unit identified /// by the slave parameter. pub fn connect_slave(socket_addr: SocketAddr, slave: Slave) -> Result { - let mut rt = tokio::runtime::Runtime::new()?; + let rt = tokio::runtime::Builder::new_current_thread() + .enable_io() + .build()?; let async_ctx = rt.block_on(async_connect_slave(socket_addr, slave))?; let sync_ctx = Context { core: rt, diff --git a/src/codec/rtu.rs b/src/codec/rtu.rs index 450ce3ff..b029d633 100644 --- a/src/codec/rtu.rs +++ b/src/codec/rtu.rs @@ -343,8 +343,7 @@ impl Decoder for ServerCodec { } } -impl Encoder for ClientCodec { - type Item = RequestAdu; +impl Encoder for ClientCodec { type Error = Error; fn encode(&mut self, adu: RequestAdu, buf: &mut BytesMut) -> Result<()> { @@ -369,8 +368,7 @@ impl Encoder for ClientCodec { } } -impl Encoder for ServerCodec { - type Item = ResponseAdu; +impl Encoder for ServerCodec { type Error = Error; fn encode(&mut self, adu: ResponseAdu, buf: &mut BytesMut) -> Result<()> { diff --git a/src/codec/tcp.rs b/src/codec/tcp.rs index 8ec2eea6..c871033f 100644 --- a/src/codec/tcp.rs +++ b/src/codec/tcp.rs @@ -128,8 +128,7 @@ impl Decoder for ServerCodec { } } -impl Encoder for ClientCodec { - type Item = RequestAdu; +impl Encoder for ClientCodec { type Error = Error; fn encode(&mut self, adu: RequestAdu, buf: &mut BytesMut) -> Result<()> { @@ -154,8 +153,7 @@ impl Encoder for ClientCodec { } } -impl Encoder for ServerCodec { - type Item = ResponseAdu; +impl Encoder for ServerCodec { type Error = Error; fn encode(&mut self, adu: ResponseAdu, buf: &mut BytesMut) -> Result<()> { diff --git a/src/lib.rs b/src/lib.rs index b38d4fef..da360856 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,8 +51,8 @@ //! ## TCP client //! //! ```rust,no_run -//! #[cfg(feature = "tcp")] -//! #[tokio::main] +//! # #[cfg(feature = "tcp")] +//! #[tokio::main(flavor = "current_thread")] //! pub async fn main() -> Result<(), Box> { //! use std::future::Future; //! use tokio::runtime::Runtime; @@ -71,7 +71,7 @@ //! ## Sync TCP client //! //! ```rust,no_run -//! #[cfg(all(feature = "tcp", feature = "sync"))] +//! # #[cfg(all(feature = "tcp", feature = "sync"))] //! pub fn main() -> Result<(), Box> { //! use tokio_modbus::prelude::*; //! @@ -88,19 +88,18 @@ //! ## RTU client //! //! ```rust,no_run -//! #[cfg(feature = "rtu")] -//! #[tokio::main] -//! async fn main() -> Result<(), Box> { -//! use tokio_serial::{Serial, SerialPortSettings}; +//! # #[cfg(feature = "rtu")] +//! #[tokio::main(flavor = "current_thread")] +//! pub async fn main() -> Result<(), Box> { +//! use serial_io::{build, AsyncSerial}; //! //! use tokio_modbus::prelude::*; //! //! let tty_path = "/dev/ttyUSB0"; //! let slave = Slave(0x17); //! -//! let mut settings = SerialPortSettings::default(); -//! settings.baud_rate = 19200; -//! let port = Serial::from_path(tty_path, &settings).unwrap(); +//! let builder = build(tty_path, 19200); +//! let port = AsyncSerial::from_builder(&builder).unwrap(); //! //! let mut ctx = rtu::connect_slave(port, slave).await?; //! println!("Reading a sensor value"); @@ -111,6 +110,29 @@ //! } //! ``` //! +//! ## Sync RTU client +//! +//! ```rust,no_run +//! # #[cfg(all(feature = "rtu", feature = "sync"))] +//! pub fn main() -> Result<(), Box> { +//! use serial_io::{build, AsyncSerial}; +//! +//! use tokio_modbus::prelude::*; +//! +//! let tty_path = "/dev/ttyUSB0"; +//! let slave = Slave(0x17); +//! +//! let builder = build(tty_path, 19200); +//! +//! let mut ctx = sync::rtu::connect_slave(&builder, slave)?; +//! println!("Reading a sensor value"); +//! let rsp = ctx.read_holding_registers(0x082B, 2)?; +//! println!("Sensor value is: {:?}", rsp); +//! +//! Ok(()) +//! } +//! ``` +//! //! More examples can be found in the [examples](https://github.com/slowtec/tokio-modbus/tree/master/examples) folder. //! //! # Protocol-Specification diff --git a/src/server/tcp.rs b/src/server/tcp.rs index 725319b9..7ecda921 100644 --- a/src/server/tcp.rs +++ b/src/server/tcp.rs @@ -80,12 +80,15 @@ where S::Instance: 'static + Send + Sync, Sd: Future + Unpin + Send + Sync + 'static, { - let mut rt = tokio::runtime::Runtime::new().unwrap(); + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_io() + .build() + .unwrap(); let new_service = Arc::new(new_service); let server = async { - let mut listener = listener(&addr, workers).unwrap(); + let listener = listener(&addr, workers).unwrap(); loop { let (stream, _) = listener.accept().await?; @@ -193,7 +196,7 @@ mod tests { #[derive(Clone)] struct DummyService { response: Response, - }; + } impl Service for DummyService { type Request = Request; diff --git a/src/service/rtu.rs b/src/service/rtu.rs index 03a6cf55..72b0d2f0 100644 --- a/src/service/rtu.rs +++ b/src/service/rtu.rs @@ -105,7 +105,7 @@ mod tests { pin::Pin, task::{Context, Poll}, }; - use tokio::io::{AsyncRead, AsyncWrite, Result}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf, Result}; struct MockTransport; @@ -117,9 +117,9 @@ mod tests { fn poll_read( self: Pin<&mut Self>, _: &mut Context, - _: &mut [u8], - ) -> Poll> { - Poll::Ready(Ok(0)) + _: &mut ReadBuf, + ) -> Poll> { + Poll::Ready(Ok(())) } }