diff --git a/src/main.rs b/src/main.rs index 503466e..6efbf4b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,6 +19,7 @@ use clap::Parser; use frost_federation::node::commands::CommandExecutor; use std::error::Error; +use tokio::sync::mpsc; use frost_federation::cli; use frost_federation::config; @@ -42,7 +43,8 @@ async fn main() -> Result<(), Box> { .static_key_pem(config.noise.key) .delivery_timeout(config.peer.delivery_timeout); - node.start(command_rx).await; + let (ready_tx, _ready_rx) = mpsc::channel(1); + node.start(command_rx, ready_tx).await; Ok(()) } diff --git a/src/node.rs b/src/node.rs index 099ed8c..8f91fc7 100644 --- a/src/node.rs +++ b/src/node.rs @@ -39,7 +39,8 @@ use tokio::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpListener, TcpStream, }, - sync::mpsc::Receiver, + sync::mpsc, + sync::oneshot, }; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use tower::Layer; @@ -116,8 +117,12 @@ impl Node { } /// Start node by listening, accepting and connecting to peers - pub async fn start(&mut self, command_rx: Receiver) { - log::debug!("Starting..."); + pub async fn start( + &mut self, + command_rx: mpsc::Receiver, + accept_ready_tx: mpsc::Sender<()>, + ) { + log::debug!("Starting... {}", self.bind_address); if self.connect_to_seeds().await.is_err() { log::info!("Connecting to seeds failed."); return; @@ -126,13 +131,15 @@ impl Node { if listener.is_err() { log::info!("Error starting listen"); } else { - let accept_task = self.start_accept(listener.unwrap()); + let accept_task = self.start_accept(listener.unwrap(), accept_ready_tx); let command_task = self.start_command_loop(command_rx); // Stop node when accept returns or Command asks us to stop. tokio::select! { _ = accept_task => { + log::debug!("Accept finished"); }, _ = command_task => { + log::debug!("Command finished"); } }; } @@ -177,11 +184,12 @@ impl Node { } /// Start accepting connections - pub async fn start_accept(&self, listener: TcpListener) { + pub async fn start_accept(&self, listener: TcpListener, accept_ready_tx: mpsc::Sender<()>) { log::debug!("Start accepting..."); let initiator = true; loop { log::debug!("Waiting on accept..."); + let _ = accept_ready_tx.clone().send(()).await; let (stream, socket_addr) = listener.accept().await.unwrap(); log::info!("Accept connection from {}", socket_addr); let (reader, writer) = self.build_reader_writer(stream); @@ -273,8 +281,8 @@ impl Node { pub async fn start_reliable_sender_receiver( &self, connection_handle: ConnectionHandle, - connection_receiver: Receiver, - ) -> (ReliableSenderHandle, Receiver) { + connection_receiver: mpsc::Receiver, + ) -> (ReliableSenderHandle, mpsc::Receiver) { let (client_receiver, reliable_sender_handle) = ReliableSenderHandle::start(connection_handle, connection_receiver).await; (reliable_sender_handle, client_receiver) @@ -284,7 +292,7 @@ impl Node { &self, addr: String, reliable_sender_handle: ReliableSenderHandle, - mut client_receiver: Receiver, + mut client_receiver: mpsc::Receiver, echo_broadcast_handle: EchoBroadcastHandle, ) { let membership_handle = self.state.membership_handle.clone(); diff --git a/src/node/commands.rs b/src/node/commands.rs index 7fc8b35..ce49a04 100644 --- a/src/node/commands.rs +++ b/src/node/commands.rs @@ -79,6 +79,7 @@ mod command_tests { use super::Node; #[mockall_double::double] use crate::node::echo_broadcast::EchoBroadcastHandle; + use tokio::sync::mpsc; #[tokio::test] async fn it_should_run_node_with_command_rx() { @@ -93,7 +94,8 @@ mod command_tests { .static_key_pem("a key".to_string()) .delivery_timeout(1000); - let node_task = node.start(command_rx); + let (ready_tx, _ready_rx) = mpsc::channel(1); + let node_task = node.start(command_rx, ready_tx); // Node shuts down on shutdown command let _ = exector.shutdown().await; node_task.await; diff --git a/tests/run_nodes_test.rs b/tests/run_nodes_test.rs index c497b23..2cd604c 100644 --- a/tests/run_nodes_test.rs +++ b/tests/run_nodes_test.rs @@ -17,49 +17,67 @@ // . mod node_integration_tests { + // Use the same noise public key for all nodes in these tests + const KEY: &str = " +-----BEGIN PRIVATE KEY----- +MFECAQEwBQYDK2VwBCIEIJ7pILqR7yBPsVuysfGyofjOEm19skmtqcJYWiVwjKH1 +gSEA68zeZuy7PMMQC9ECPmWqDl5AOFj5bi243F823ZVWtXY= +-----END PRIVATE KEY----- +"; use frost_federation::node; + use tokio::sync::mpsc; #[test] fn test_start_two_nodes_and_let_them_connect_without_an_error() { use frost_federation::node::commands::CommandExecutor; + let _ = env_logger::builder().is_test(true).try_init(); tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(async { - let mut node_b = node::Node::new() - .await - .seeds(vec!["localhost:6880".into()]) - .bind_address("localhost:6881".into()) - .static_key_pem("MFECAQEwBQYDK2VwBCIEIPCN4nC8Zn9jEKBc4jiUCPcHNdqQ8WgpyUv09eKJHSxfgSEAtJysJ2e6m4ze8Kz1zYjBByVR4EO/7iGRkTtd7cYOGi0=".into()) - .delivery_timeout(100); + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut node = node::Node::new() + .await + .seeds(vec![]) + .bind_address("localhost:6880".into()) + .static_key_pem(KEY.into()) + .delivery_timeout(100); + let (ready_tx, mut ready_rx) = mpsc::channel(1); + let (_executor, command_rx) = CommandExecutor::new(); + let node_task = async { + node.start(command_rx, ready_tx).await; + }; - let (executor_b, command_rx_b) = CommandExecutor::new(); - let node_b_task = node_b.start(command_rx_b); + let mut node_b = node::Node::new() + .await + .seeds(vec!["localhost:6880".into()]) + .bind_address("localhost:6881".into()) + .static_key_pem(KEY.into()) + .delivery_timeout(100); - let mut node = node::Node::new() - .await - .seeds(vec![]) - .bind_address("localhost:6880".into()) - .static_key_pem("MFECAQEwBQYDK2VwBCIEIJ7pILqR7yBPsVuysfGyofjOEm19skmtqcJYWiVwjKH1gSEA68zeZuy7PMMQC9ECPmWqDl5AOFj5bi243F823ZVWtXY=".into()) - .delivery_timeout(100); + let (ready_tx_b, mut _ready_rx_b) = mpsc::channel(1); + let (executor_b, command_rx_b) = CommandExecutor::new(); + let node_b_task = async { + let _ = ready_rx.recv().await; + node_b.start(command_rx_b, ready_tx_b).await; + }; - let (_executor, command_rx) = CommandExecutor::new(); - let node_task = node.start(command_rx); - tokio::spawn(async move { - while let Ok(members) = executor_b.get_members().await { - if members.len() == 1 { - assert_eq!(members.len(), 1); - let _ = executor_b.shutdown().await; + tokio::spawn(async move { + while let Ok(members) = executor_b.get_members().await { + if members.len() == 1 { + assert_eq!(members.len(), 1); + let _ = executor_b.shutdown().await; + } } - }}); - tokio::select! { - _ = node_task => {} - _ = node_b_task => {} - } - }); + }); + + tokio::select! { + _ = node_task => {} + _ = node_b_task => {} + } + }); } }