From e3b6478e74da63dede20d01e53ed07a8a4189232 Mon Sep 17 00:00:00 2001 From: pool2win Date: Sat, 5 Oct 2024 11:17:00 +0200 Subject: [PATCH] Use echo broadcast from node as a showcase --- src/node.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/node.rs b/src/node.rs index 2c27d7c..8f61690 100644 --- a/src/node.rs +++ b/src/node.rs @@ -16,7 +16,10 @@ // along with Frost-Federation. If not, see // . +use self::echo_broadcast::{start_echo_broadcast, EchoBroadcastHandle}; use self::{membership::MembershipHandle, protocol::Message}; +use crate::node::echo_broadcast::service::EchoBroadcast; +use crate::node::protocol::PingMessage; use crate::node::reliable_sender::service::ReliableSend; use crate::node::reliable_sender::ReliableNetworkMessage; #[mockall_double::double] @@ -28,6 +31,7 @@ use crate::node::{ }; #[mockall_double::double] use connection::ConnectionHandle; +use protocol::message_id_generator::MessageIdGenerator; use std::error::Error; use tokio::{ net::{ @@ -54,12 +58,15 @@ pub struct Node { pub static_key_pem: String, pub delivery_timeout: u64, pub(crate) state: State, + pub(crate) echo_broadcast_handle: EchoBroadcastHandle, } impl Node { /// Use builder pattern pub async fn new() -> Self { let bind_address = "localhost".to_string(); + let message_id_generator = MessageIdGenerator::new(bind_address.clone()); + let echo_broadcast_handle = start_echo_broadcast(message_id_generator).await; Node { seeds: vec!["localhost:6680".to_string()], bind_address: bind_address.clone(), @@ -68,6 +75,7 @@ impl Node { state: State { membership_handle: MembershipHandle::start(bind_address).await, }, + echo_broadcast_handle, } } @@ -178,7 +186,8 @@ impl Node { return; } let node_id = self.get_node_id(); - let handshake_service = protocol::Protocol::new(node_id); + + let handshake_service = protocol::Protocol::new(node_id.clone()); let reliable_sender_service = ReliableSend::new(handshake_service, reliable_sender_handle); let timeout_layer = tower::timeout::TimeoutLayer::new( @@ -188,6 +197,16 @@ impl Node { .layer(reliable_sender_service) .oneshot(HandshakeMessage::default_as_message()) .await; + + let ping_service = protocol::Protocol::new(node_id.clone()); + let echo_broadcast_service = EchoBroadcast::new( + ping_service, + self.echo_broadcast_handle.clone(), + self.state.clone(), + ); + let _ = echo_broadcast_service + .oneshot(PingMessage::default_as_message()) + .await; } }