Skip to content

Commit

Permalink
Add ready channel into Node::start
Browse files Browse the repository at this point in the history
Use the channel to start nodes in sequence during integration tests
  • Loading branch information
pool2win committed Oct 14, 2024
1 parent 2d634fa commit c7eb076
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 41 deletions.
4 changes: 3 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use clap::Parser;
use frost_federation::node::commands::CommandExecutor;
use std::error::Error;
use tokio::sync::mpsc;

use frost_federation::cli;
use frost_federation::config;
Expand All @@ -42,7 +43,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
.static_key_pem(config.noise.key)
.delivery_timeout(config.peer.delivery_timeout);

node.start(command_rx).await;
let (ready_tx, _ready_rx) = mpsc::channel(1);
node.start(command_rx, ready_tx).await;
Ok(())
}

Expand Down
24 changes: 16 additions & 8 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ use tokio::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpListener, TcpStream,
},
sync::mpsc::Receiver,
sync::mpsc,
sync::oneshot,
};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tower::Layer;
Expand Down Expand Up @@ -116,8 +117,12 @@ impl Node {
}

/// Start node by listening, accepting and connecting to peers
pub async fn start(&mut self, command_rx: Receiver<Command>) {
log::debug!("Starting...");
pub async fn start(
&mut self,
command_rx: mpsc::Receiver<Command>,
accept_ready_tx: mpsc::Sender<()>,
) {
log::debug!("Starting... {}", self.bind_address);
if self.connect_to_seeds().await.is_err() {
log::info!("Connecting to seeds failed.");
return;
Expand All @@ -126,13 +131,15 @@ impl Node {
if listener.is_err() {
log::info!("Error starting listen");
} else {
let accept_task = self.start_accept(listener.unwrap());
let accept_task = self.start_accept(listener.unwrap(), accept_ready_tx);
let command_task = self.start_command_loop(command_rx);
// Stop node when accept returns or Command asks us to stop.
tokio::select! {
_ = accept_task => {
log::debug!("Accept finished");
},
_ = command_task => {
log::debug!("Command finished");
}
};
}
Expand Down Expand Up @@ -177,11 +184,12 @@ impl Node {
}

/// Start accepting connections
pub async fn start_accept(&self, listener: TcpListener) {
pub async fn start_accept(&self, listener: TcpListener, accept_ready_tx: mpsc::Sender<()>) {
log::debug!("Start accepting...");
let initiator = true;
loop {
log::debug!("Waiting on accept...");
let _ = accept_ready_tx.clone().send(()).await;
let (stream, socket_addr) = listener.accept().await.unwrap();
log::info!("Accept connection from {}", socket_addr);
let (reader, writer) = self.build_reader_writer(stream);
Expand Down Expand Up @@ -273,8 +281,8 @@ impl Node {
pub async fn start_reliable_sender_receiver(
&self,
connection_handle: ConnectionHandle,
connection_receiver: Receiver<ReliableNetworkMessage>,
) -> (ReliableSenderHandle, Receiver<Message>) {
connection_receiver: mpsc::Receiver<ReliableNetworkMessage>,
) -> (ReliableSenderHandle, mpsc::Receiver<Message>) {
let (client_receiver, reliable_sender_handle) =
ReliableSenderHandle::start(connection_handle, connection_receiver).await;
(reliable_sender_handle, client_receiver)
Expand All @@ -284,7 +292,7 @@ impl Node {
&self,
addr: String,
reliable_sender_handle: ReliableSenderHandle,
mut client_receiver: Receiver<Message>,
mut client_receiver: mpsc::Receiver<Message>,
echo_broadcast_handle: EchoBroadcastHandle,
) {
let membership_handle = self.state.membership_handle.clone();
Expand Down
4 changes: 3 additions & 1 deletion src/node/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ mod command_tests {
use super::Node;
#[mockall_double::double]
use crate::node::echo_broadcast::EchoBroadcastHandle;
use tokio::sync::mpsc;

#[tokio::test]
async fn it_should_run_node_with_command_rx() {
Expand All @@ -93,7 +94,8 @@ mod command_tests {
.static_key_pem("a key".to_string())
.delivery_timeout(1000);

let node_task = node.start(command_rx);
let (ready_tx, _ready_rx) = mpsc::channel(1);
let node_task = node.start(command_rx, ready_tx);
// Node shuts down on shutdown command
let _ = exector.shutdown().await;
node_task.await;
Expand Down
80 changes: 49 additions & 31 deletions tests/run_nodes_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,49 +17,67 @@
// <https://www.gnu.org/licenses/>.

mod node_integration_tests {
// Use the same noise public key for all nodes in these tests
const KEY: &str = "
-----BEGIN PRIVATE KEY-----
MFECAQEwBQYDK2VwBCIEIJ7pILqR7yBPsVuysfGyofjOEm19skmtqcJYWiVwjKH1
gSEA68zeZuy7PMMQC9ECPmWqDl5AOFj5bi243F823ZVWtXY=
-----END PRIVATE KEY-----
";

use frost_federation::node;
use tokio::sync::mpsc;

#[test]
fn test_start_two_nodes_and_let_them_connect_without_an_error() {
use frost_federation::node::commands::CommandExecutor;
let _ = env_logger::builder().is_test(true).try_init();

tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let mut node_b = node::Node::new()
.await
.seeds(vec!["localhost:6880".into()])
.bind_address("localhost:6881".into())
.static_key_pem("MFECAQEwBQYDK2VwBCIEIPCN4nC8Zn9jEKBc4jiUCPcHNdqQ8WgpyUv09eKJHSxfgSEAtJysJ2e6m4ze8Kz1zYjBByVR4EO/7iGRkTtd7cYOGi0=".into())
.delivery_timeout(100);
.enable_all()
.build()
.unwrap()
.block_on(async {
let mut node = node::Node::new()
.await
.seeds(vec![])
.bind_address("localhost:6880".into())
.static_key_pem(KEY.into())
.delivery_timeout(100);

let (ready_tx, mut ready_rx) = mpsc::channel(1);
let (_executor, command_rx) = CommandExecutor::new();
let node_task = async {
node.start(command_rx, ready_tx).await;
};

let (executor_b, command_rx_b) = CommandExecutor::new();
let node_b_task = node_b.start(command_rx_b);
let mut node_b = node::Node::new()
.await
.seeds(vec!["localhost:6880".into()])
.bind_address("localhost:6881".into())
.static_key_pem(KEY.into())
.delivery_timeout(100);

let mut node = node::Node::new()
.await
.seeds(vec![])
.bind_address("localhost:6880".into())
.static_key_pem("MFECAQEwBQYDK2VwBCIEIJ7pILqR7yBPsVuysfGyofjOEm19skmtqcJYWiVwjKH1gSEA68zeZuy7PMMQC9ECPmWqDl5AOFj5bi243F823ZVWtXY=".into())
.delivery_timeout(100);
let (ready_tx_b, mut _ready_rx_b) = mpsc::channel(1);
let (executor_b, command_rx_b) = CommandExecutor::new();
let node_b_task = async {
let _ = ready_rx.recv().await;
node_b.start(command_rx_b, ready_tx_b).await;
};

let (_executor, command_rx) = CommandExecutor::new();
let node_task = node.start(command_rx);
tokio::spawn(async move {
while let Ok(members) = executor_b.get_members().await {
if members.len() == 1 {
assert_eq!(members.len(), 1);
let _ = executor_b.shutdown().await;
tokio::spawn(async move {
while let Ok(members) = executor_b.get_members().await {
if members.len() == 1 {
assert_eq!(members.len(), 1);
let _ = executor_b.shutdown().await;
}
}
}});
tokio::select! {
_ = node_task => {}
_ = node_b_task => {}
}
});
});

tokio::select! {
_ = node_task => {}
_ = node_b_task => {}
}
});
}
}

0 comments on commit c7eb076

Please sign in to comment.