diff --git a/devbox.lock b/devbox.lock index e6a24bda..89366eb1 100644 --- a/devbox.lock +++ b/devbox.lock @@ -7,31 +7,11 @@ }, "darwin.apple_sdk.frameworks.Security": { "resolved": "github:NixOS/nixpkgs/75a52265bda7fd25e06e3a67dee3f0354e73243c#darwin.apple_sdk.frameworks.Security", - "source": "nixpkg", - "systems": { - "aarch64-darwin": { - "outputs": [ - { - "path": "/nix/store/9jbhn62ld50znjqxll0q5gq9i4disc1s-apple-framework-Security-11.0.0", - "default": true - } - ] - } - } + "source": "nixpkg" }, "darwin.apple_sdk.frameworks.SystemConfiguration": { "resolved": "github:NixOS/nixpkgs/75a52265bda7fd25e06e3a67dee3f0354e73243c#darwin.apple_sdk.frameworks.SystemConfiguration", - "source": "nixpkg", - "systems": { - "aarch64-darwin": { - "outputs": [ - { - "path": "/nix/store/jaxmz1dvsz8il7nkxwzw8xfxac3sfz5v-apple-framework-SystemConfiguration-11.0.0", - "default": true - } - ] - } - } + "source": "nixpkg" }, "git@latest": { "last_modified": "2024-08-14T11:41:26Z", diff --git a/flmodules/README.md b/flmodules/README.md index 491dce0f..e337c206 100644 --- a/flmodules/README.md +++ b/flmodules/README.md @@ -24,6 +24,11 @@ mode - requesting list of available events from other nodes. the signalling server. - `web_proxy` allows sending a http GET request to another node, using the other node as a proxy. +- `overlay` is an intermediate layer that contains all messages to be implemented +for current and future communication layers. +Currently it's implemented for `random_connections` and `network`. +Future implementations might include a `dht_router` and a `mix_net` communication +layer. # Adding your own modules diff --git a/flmodules/src/gossip_events/broker.rs b/flmodules/src/gossip_events/broker.rs index e4d00924..a1e304c2 100644 --- a/flmodules/src/gossip_events/broker.rs +++ b/flmodules/src/gossip_events/broker.rs @@ -2,15 +2,17 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use flarch::{ broker::{Broker, BrokerError, Subsystem, SubsystemHandler}, - nodeids::{NodeID, U256}, platform_async_trait, + nodeids::{NodeID, U256}, + platform_async_trait, }; use super::{ core::{Category, Event, EventsStorage}, - messages::{Config, GossipEvents, GossipIn, GossipMessage, GossipOut, MessageNode}, + messages::{Config, GossipEvents, GossipIn, GossipMessage, GossipOut}, }; use crate::{ - random_connections::messages::{ModuleMessage, RandomIn, RandomMessage, RandomOut}, + overlay::messages::NetworkWrapper, + random_connections::messages::{RandomIn, RandomMessage, RandomOut}, timer::TimerMessage, }; @@ -23,7 +25,7 @@ pub struct GossipBroker { pub storage: EventsStorage, /// Represents the underlying broker. pub broker: Broker, - /// Is used to pass the EventsStorage structure from the Transalate to the GossipLink. + /// Is used to pass the EventsStorage structure from the Translate to the GossipLink. storage_rx: Receiver, } @@ -116,16 +118,10 @@ impl Translate { fn link_rnd_gossip(msg: RandomMessage) -> Option { if let RandomMessage::Output(msg_out) = msg { match msg_out { - RandomOut::ListUpdate(list) => Some(GossipIn::NodeList(list.into()).into()), - RandomOut::NodeMessageFromNetwork(id, msg) => { - if msg.module == MODULE_NAME { - serde_yaml::from_str::(&msg.msg) - .ok() - .map(|msg_node| GossipIn::Node(id, msg_node).into()) - } else { - None - } - } + RandomOut::NodeIDsConnected(list) => Some(GossipIn::NodeList(list.into()).into()), + RandomOut::NodeMessageFromNetwork(id, msg) => msg + .unwrap_yaml(MODULE_NAME) + .map(|msg| GossipIn::Node(id, msg).into()), _ => None, } } else { @@ -138,10 +134,7 @@ impl Translate { Some( RandomIn::NodeMessageToNetwork( id, - ModuleMessage { - module: MODULE_NAME.into(), - msg: serde_yaml::to_string(&msg_node).unwrap(), - }, + NetworkWrapper::wrap_yaml(MODULE_NAME, &msg_node).unwrap(), ) .into(), ) @@ -191,6 +184,7 @@ mod tests { use std::error::Error; use crate::gossip_events::core::{Category, Event}; + use crate::gossip_events::messages::MessageNode; use flarch::nodeids::NodeID; use flarch::{start_logging, tasks::now}; @@ -207,7 +201,7 @@ mod tests { let id2 = NodeID::rnd(); let (tap_rnd, _) = broker_rnd.get_tap_sync().await?; broker_rnd - .settle_msg(RandomMessage::Output(RandomOut::ListUpdate( + .settle_msg(RandomMessage::Output(RandomOut::NodeIDsConnected( vec![id2].into(), ))) .await?; @@ -224,10 +218,7 @@ mod tests { .settle_msg( RandomOut::NodeMessageFromNetwork( id2, - ModuleMessage { - module: MODULE_NAME.into(), - msg: serde_yaml::to_string(&msg).unwrap(), - }, + NetworkWrapper::wrap_yaml(MODULE_NAME, &msg).unwrap(), ) .into(), ) diff --git a/flmodules/src/lib.rs b/flmodules/src/lib.rs index 23c0be23..cd36484a 100644 --- a/flmodules/src/lib.rs +++ b/flmodules/src/lib.rs @@ -20,3 +20,4 @@ pub mod timer; pub mod ping; pub mod web_proxy; pub mod network; +pub mod overlay; diff --git a/flmodules/src/overlay/README.md b/flmodules/src/overlay/README.md new file mode 100644 index 00000000..0264491a --- /dev/null +++ b/flmodules/src/overlay/README.md @@ -0,0 +1,13 @@ +# Overlay + +Gives an abstraction to allow for different network overlays, e.g., random_connections, dht_network, loopix, and +others. It defines a basic set of messages and a broker, which can then be extended to work with the different +network connection modules. + +Two examples are implemented: +- `OverlayRandom` uses the `RandomConnection` broker to handle the network and forwards messages as appropriate +- `OverlayDirect` uses the `Network` broker to handle the network. One problem with this is that the `Connected` +and `Disconnected` messages from the `Network` broker need to be handled here. + +To implement a new example who has the `Available`, `Connected`, `Disconnected` messages, the simplest way is to +copy `OverlayRandom` into a new broker. \ No newline at end of file diff --git a/flmodules/src/overlay/broker.rs b/flmodules/src/overlay/broker.rs new file mode 100644 index 00000000..68556733 --- /dev/null +++ b/flmodules/src/overlay/broker.rs @@ -0,0 +1,288 @@ +use flarch::{ + broker::{Broker, BrokerError, Subsystem, SubsystemHandler}, + nodeids::U256, + platform_async_trait, +}; + +use super::messages::{OverlayIn, OverlayInternal, OverlayMessage, OverlayOut}; +use crate::{ + network::messages::{NetworkIn, NetworkMessage, NetworkOut}, + nodeconfig::NodeInfo, + random_connections::messages::{RandomIn, RandomMessage, RandomOut}, +}; + +pub struct OverlayRandom {} + +impl OverlayRandom { + pub async fn start( + random: Broker, + ) -> Result, BrokerError> { + let mut b = Broker::new(); + // Translate RandomOut to OverlayOut, and OverlayIn to RandomIn. + // A module connected to the Broker will get translations of the + // RandomOut messages, and can send messages to RandomIn using the Overlay. + b.link_bi( + random, + Box::new(|msg| { + if let RandomMessage::Output(out) = msg { + let ret = match out { + RandomOut::NodeIDsConnected(node_ids) => { + OverlayOut::NodeIDsConnected(node_ids) + } + RandomOut::NodeInfosConnected(infos) => { + OverlayOut::NodeInfosConnected(infos) + } + RandomOut::NodeMessageFromNetwork(id, module_message) => { + OverlayOut::NetworkMapperFromNetwork(id, module_message) + } + _ => return None, + }; + return Some(ret.into()); + } + None + }), + Box::new(|msg| { + if let OverlayMessage::Input(input) = msg { + let ret = match input { + OverlayIn::NetworkWrapperToNetwork(id, module_message) => { + RandomIn::NodeMessageToNetwork(id, module_message) + } + }; + return Some(RandomMessage::Input(ret)); + } + None + }), + ) + .await?; + Ok(b) + } +} + +/** + * Connects directly to the Network broker. + * Because the Network broker only delivers 'Connected' and 'Disconnected' + * messages, this implementation has to do some bookkeeping to send out + * the correct messages. + */ +pub struct OverlayDirect { + nodes_available: Vec, + nodes_connected: Vec, +} + +impl OverlayDirect { + pub async fn start( + direct: Broker, + ) -> Result, BrokerError> { + let mut b = Broker::new(); + // Subsystem for handling Connected and Disconnected messages from the Network broker. + b.add_subsystem(Subsystem::Handler(Box::new(OverlayDirect { + nodes_available: vec![], + nodes_connected: vec![], + }))) + .await?; + + // Translate NetworkOut to OverlayOut, and OverlayIn to NetworkIn. + // A module connected to the Broker will get translations of the + // NetworkOut messages, and can send messages to NetworkIn using the Overlay. + b.link_bi( + direct, + Box::new(|msg| { + if let NetworkMessage::Output(out) = msg { + return match out { + NetworkOut::RcvNodeMessage(id, msg_str) => { + serde_yaml::from_str(&msg_str).ok().map(|module_message| { + OverlayOut::NetworkMapperFromNetwork(id, module_message).into() + }) + } + NetworkOut::RcvWSUpdateList(vec) => { + Some(OverlayInternal::Available(vec).into()) + } + NetworkOut::Connected(id) => Some(OverlayInternal::Connected(id).into()), + NetworkOut::Disconnected(id) => { + Some(OverlayInternal::Disconnected(id).into()) + } + _ => None, + }; + } + None + }), + Box::new(|msg| { + if let OverlayMessage::Input(input) = msg { + let ret = match input { + OverlayIn::NetworkWrapperToNetwork(id, module_message) => { + if let Ok(msg_str) = serde_yaml::to_string(&module_message) { + NetworkIn::SendNodeMessage(id, msg_str) + } else { + return None; + } + } + }; + return Some(NetworkMessage::Input(ret)); + } + None + }), + ) + .await?; + Ok(b) + } + + fn available_connected(&self, id: U256) -> (bool, bool) { + ( + self.nodes_available + .iter() + .find(|info| info.get_id() == id) + .is_some(), + self.nodes_connected.contains(&id), + ) + } +} + +#[platform_async_trait()] +impl SubsystemHandler for OverlayDirect { + async fn messages(&mut self, msgs: Vec) -> Vec { + let mut ret = false; + // Keep track of available and connected / disconnected nodes. + for msg in msgs { + if let OverlayMessage::Internal(internal) = msg { + match internal { + OverlayInternal::Connected(id) => { + if self.available_connected(id) == (true, false) { + self.nodes_connected.push(id); + ret = true; + } + } + OverlayInternal::Disconnected(id) => { + if self.available_connected(id).1 == true { + self.nodes_connected.retain(|&other| other != id); + ret = true; + } + } + OverlayInternal::Available(vec) => { + self.nodes_available = vec; + ret = true; + } + } + } + } + + // If something changed, output all relevant messages. + if ret { + vec![ + OverlayOut::NodeInfoAvailable(self.nodes_available.clone()), + OverlayOut::NodeIDsConnected(self.nodes_connected.clone().into()), + OverlayOut::NodeInfosConnected( + self.nodes_available + .iter() + .filter(|node| self.nodes_connected.contains(&node.get_id())) + .cloned() + .collect(), + ), + ] + .into_iter() + .map(|msg| msg.into()) + .collect() + } else { + vec![] + } + } +} + +#[cfg(test)] +mod test { + use flarch::nodeids::NodeID; + + use crate::nodeconfig::NodeConfig; + + use super::*; + + fn check_msgs(msgs: Vec, available: &[NodeInfo], connected: &[NodeInfo]) { + assert_eq!(3, msgs.len()); + assert_eq!( + OverlayMessage::Output(OverlayOut::NodeInfoAvailable(available.to_vec())), + msgs[0] + ); + assert_eq!( + OverlayMessage::Output(OverlayOut::NodeIDsConnected( + connected + .iter() + .map(|info| info.get_id()) + .collect::>() + .into() + )), + msgs[1] + ); + assert_eq!( + OverlayMessage::Output(OverlayOut::NodeInfosConnected(connected.to_vec())), + msgs[2] + ); + } + + #[tokio::test] + async fn test_direct_dis_connect() -> Result<(), BrokerError> { + let mut od = OverlayDirect { + nodes_available: vec![], + nodes_connected: vec![], + }; + let nodes = [NodeConfig::new().info, NodeConfig::new().info]; + let node_unknown = NodeConfig::new().info.get_id(); + + // Start with two new nodes, but not yet connected. + let msgs = od + .messages(vec![OverlayInternal::Available(vec![ + nodes[0].clone(), + nodes[1].clone(), + ]) + .into()]) + .await; + check_msgs(msgs, &nodes, &[]); + + // Connect first node 0, then node 1 + let msgs = od + .messages(vec![OverlayInternal::Connected(nodes[0].get_id()).into()]) + .await; + check_msgs(msgs, &nodes, &[nodes[0].clone()]); + + let msgs = od + .messages(vec![OverlayInternal::Connected(nodes[1].get_id()).into()]) + .await; + check_msgs(msgs, &nodes, &nodes); + + // Re-connect node 1, should do nothing + let msgs = od + .messages(vec![OverlayInternal::Connected(nodes[1].get_id()).into()]) + .await; + assert_eq!(0, msgs.len()); + + // Connect an unknown node - this should be ignored + let msgs = od + .messages(vec![OverlayInternal::Connected(node_unknown).into()]) + .await; + assert_eq!(0, msgs.len()); + + // Disconnect an unknown node - twice, in case it keeps it and would fail to remove it when + // it isn't here anymore. + let msgs = od + .messages(vec![OverlayInternal::Disconnected(node_unknown).into()]) + .await; + assert_eq!(0, msgs.len()); + + let msgs = od + .messages(vec![OverlayInternal::Disconnected(node_unknown).into()]) + .await; + assert_eq!(0, msgs.len()); + + // Disconnect a node + let msgs = od + .messages(vec![OverlayInternal::Disconnected(nodes[0].get_id()).into()]) + .await; + check_msgs(msgs, &nodes, &[nodes[1].clone()]); + + // Disconnect an unconnected node + let msgs = od + .messages(vec![OverlayInternal::Disconnected(nodes[0].get_id()).into()]) + .await; + assert_eq!(0, msgs.len()); + + Ok(()) + } +} diff --git a/flmodules/src/overlay/messages.rs b/flmodules/src/overlay/messages.rs new file mode 100644 index 00000000..d04fc04c --- /dev/null +++ b/flmodules/src/overlay/messages.rs @@ -0,0 +1,73 @@ +use flarch::nodeids::{NodeID, NodeIDs, U256}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +use crate::nodeconfig::NodeInfo; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct NetworkWrapper { + pub module: String, + pub msg: String, +} + +#[derive(Clone, Debug, PartialEq)] +pub enum OverlayMessage { + Input(OverlayIn), + Output(OverlayOut), + Internal(OverlayInternal), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum OverlayIn { + NetworkWrapperToNetwork(NodeID, NetworkWrapper), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum OverlayOut { + NodeInfoAvailable(Vec), + NodeIDsConnected(NodeIDs), + NodeInfosConnected(Vec), + NetworkMapperFromNetwork(NodeID, NetworkWrapper), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum OverlayInternal { + Connected(U256), + Disconnected(U256), + Available(Vec), +} + +impl From for OverlayMessage { + fn from(value: OverlayOut) -> Self { + OverlayMessage::Output(value) + } +} + +impl From for OverlayMessage { + fn from(value: OverlayIn) -> Self { + OverlayMessage::Input(value) + } +} + +impl From for OverlayMessage { + fn from(value: OverlayInternal) -> Self { + OverlayMessage::Internal(value) + } +} + +impl NetworkWrapper { + pub fn wrap_yaml(module: &str, msg: &T) -> Result { + Ok(Self { + module: module.into(), + msg: serde_yaml::to_string(msg)?, + }) + } + + pub fn unwrap_yaml(&self, module: &str) -> Option { + if self.module == module { + if let Ok(msg) = serde_yaml::from_str(&self.msg) { + return Some(msg); + } + } + None + } +} diff --git a/flmodules/src/overlay/mod.rs b/flmodules/src/overlay/mod.rs new file mode 100644 index 00000000..1ce4ede3 --- /dev/null +++ b/flmodules/src/overlay/mod.rs @@ -0,0 +1,2 @@ +pub mod broker; +pub mod messages; \ No newline at end of file diff --git a/flmodules/src/ping/broker.rs b/flmodules/src/ping/broker.rs index 600f9e9a..79fbb517 100644 --- a/flmodules/src/ping/broker.rs +++ b/flmodules/src/ping/broker.rs @@ -6,13 +6,14 @@ use flarch::{ }; use crate::{ - random_connections::messages::{ModuleMessage, RandomIn, RandomMessage, RandomOut}, + overlay::messages::NetworkWrapper, + random_connections::messages::{RandomIn, RandomMessage, RandomOut}, timer::TimerMessage, }; use super::{ - messages::{MessageNode, Ping, PingConfig, PingIn, PingMessage, PingOut}, core::PingStorage, + messages::{Ping, PingConfig, PingIn, PingMessage, PingOut}, }; const MODULE_NAME: &str = "Ping"; @@ -89,16 +90,10 @@ impl Translate { if let RandomMessage::Output(msg_out) = msg { match msg_out { RandomOut::DisconnectNode(id) => Some(PingIn::DisconnectNode(id).into()), - RandomOut::ListUpdate(list) => Some(PingIn::NodeList(list.into()).into()), - RandomOut::NodeMessageFromNetwork(id, msg) => { - if msg.module == MODULE_NAME { - serde_yaml::from_str::(&msg.msg) - .ok() - .map(|msg_node| PingIn::Message(id, msg_node).into()) - } else { - None - } - } + RandomOut::NodeIDsConnected(list) => Some(PingIn::NodeList(list.into()).into()), + RandomOut::NodeMessageFromNetwork(id, msg) => msg + .unwrap_yaml(MODULE_NAME) + .map(|msg| PingIn::Message(id, msg).into()), _ => None, } } else { @@ -112,10 +107,7 @@ impl Translate { PingOut::Message(id, msg_node) => Some( RandomIn::NodeMessageToNetwork( id, - ModuleMessage { - module: MODULE_NAME.into(), - msg: serde_yaml::to_string(&msg_node).unwrap(), - }, + NetworkWrapper::wrap_yaml(MODULE_NAME, &msg_node).unwrap(), ) .into(), ), diff --git a/flmodules/src/random_connections/messages.rs b/flmodules/src/random_connections/messages.rs index b3ccdd8b..eeac7a4e 100644 --- a/flmodules/src/random_connections/messages.rs +++ b/flmodules/src/random_connections/messages.rs @@ -3,22 +3,16 @@ use serde::{Deserialize, Serialize}; use flarch::nodeids::{NodeID, NodeIDs, U256}; -use crate::nodeconfig::NodeInfo; +use crate::{nodeconfig::NodeInfo, overlay::messages::NetworkWrapper}; use super::core::RandomStorage; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum NodeMessage { - Module(ModuleMessage), + Module(NetworkWrapper), DropConnection, } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct ModuleMessage { - pub module: String, - pub msg: String, -} - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum RandomMessage { Input(RandomIn), @@ -32,7 +26,7 @@ pub enum RandomIn { NodeConnected(NodeID), NodeDisconnected(NodeID), NodeMessageFromNetwork(NodeID, NodeMessage), - NodeMessageToNetwork(NodeID, ModuleMessage), + NodeMessageToNetwork(NodeID, NetworkWrapper), Tick, } @@ -40,10 +34,10 @@ pub enum RandomIn { pub enum RandomOut { ConnectNode(NodeID), DisconnectNode(NodeID), - ListUpdate(NodeIDs), - NodeInfoConnected(Vec), + NodeIDsConnected(NodeIDs), + NodeInfosConnected(Vec), NodeMessageToNetwork(NodeID, NodeMessage), - NodeMessageFromNetwork(NodeID, ModuleMessage), + NodeMessageFromNetwork(NodeID, NetworkWrapper), Storage(RandomStorage), } @@ -113,7 +107,7 @@ impl RandomConnections { ); vec![ RandomOut::DisconnectNode(dst), - RandomOut::ListUpdate(self.storage.connected.get_nodes()), + RandomOut::NodeIDsConnected(self.storage.connected.get_nodes()), ] } } @@ -200,8 +194,8 @@ impl RandomConnections { fn update(&self) -> Vec { vec![ - RandomOut::ListUpdate(self.storage.connected.get_nodes()), - RandomOut::NodeInfoConnected(self.storage.get_connected_info()), + RandomOut::NodeIDsConnected(self.storage.connected.get_nodes()), + RandomOut::NodeInfosConnected(self.storage.get_connected_info()), RandomOut::Storage(self.storage.clone()), ] } diff --git a/flmodules/src/template/broker.rs b/flmodules/src/template/broker.rs index 7d6070e6..cc181ebe 100644 --- a/flmodules/src/template/broker.rs +++ b/flmodules/src/template/broker.rs @@ -2,7 +2,10 @@ use flarch::{data_storage::DataStorage, platform_async_trait, tasks::spawn_local use std::error::Error; use tokio::sync::watch; -use crate::random_connections::messages::{ModuleMessage, RandomIn, RandomMessage, RandomOut}; +use crate::{ + overlay::messages::NetworkWrapper, + random_connections::messages::{RandomIn, RandomMessage, RandomOut}, +}; use flarch::{ broker::{Broker, BrokerError, Subsystem, SubsystemHandler}, nodeids::NodeID, @@ -100,16 +103,12 @@ impl Translate { fn link_rnd_template(msg: RandomMessage) -> Option { if let RandomMessage::Output(msg_out) = msg { match msg_out { - RandomOut::ListUpdate(list) => Some(TemplateIn::UpdateNodeList(list.into()).into()), - RandomOut::NodeMessageFromNetwork(id, msg) => { - if msg.module == MODULE_NAME { - serde_yaml::from_str::(&msg.msg) - .ok() - .map(|msg_node| TemplateIn::Node(id, msg_node).into()) - } else { - None - } + RandomOut::NodeIDsConnected(list) => { + Some(TemplateIn::UpdateNodeList(list.into()).into()) } + RandomOut::NodeMessageFromNetwork(id, msg) => msg + .unwrap_yaml(MODULE_NAME) + .map(|msg| TemplateIn::Node(id, msg).into()), _ => None, } } else { @@ -122,10 +121,7 @@ impl Translate { Some( RandomIn::NodeMessageToNetwork( id, - ModuleMessage { - module: MODULE_NAME.into(), - msg: serde_yaml::to_string(&msg_node).unwrap(), - }, + NetworkWrapper::wrap_yaml(MODULE_NAME, &msg_node).unwrap(), ) .into(), ) @@ -171,7 +167,7 @@ mod tests { let mut tap = rnd.get_tap().await?; assert_eq!(0, tr.get_counter()); - rnd.settle_msg(RandomMessage::Output(RandomOut::ListUpdate( + rnd.settle_msg(RandomMessage::Output(RandomOut::NodeIDsConnected( vec![id1].into(), ))) .await?; diff --git a/flmodules/src/web_proxy/broker.rs b/flmodules/src/web_proxy/broker.rs index d784a786..c5d571a5 100644 --- a/flmodules/src/web_proxy/broker.rs +++ b/flmodules/src/web_proxy/broker.rs @@ -8,7 +8,7 @@ use flarch::{ use thiserror::Error; use tokio::sync::{mpsc::channel, watch}; -use crate::random_connections::messages::{ModuleMessage, RandomIn, RandomMessage, RandomOut}; +use crate::overlay::messages::{NetworkWrapper, OverlayIn, OverlayMessage, OverlayOut}; use flarch::{ broker::{Broker, BrokerError, Subsystem, SubsystemHandler}, nodeids::{NodeID, U256}, @@ -16,7 +16,7 @@ use flarch::{ use super::{ core::{Counters, WebProxyConfig, WebProxyStorage, WebProxyStorageSave}, - messages::{MessageNode, WebProxyIn, WebProxyMessage, WebProxyMessages, WebProxyOut}, + messages::{WebProxyIn, WebProxyMessage, WebProxyMessages, WebProxyOut}, response::Response, }; @@ -45,7 +45,7 @@ impl WebProxy { pub async fn start( mut ds: Box, our_id: NodeID, - rc: Broker, + overlay: Broker, config: WebProxyConfig, ) -> Result { let str = ds.get(MODULE_NAME).unwrap_or("".into()); @@ -53,7 +53,7 @@ impl WebProxy { let mut web_proxy = Broker::new(); let messages = WebProxyMessages::new(storage.clone(), config, our_id, web_proxy.clone())?; - Translate::start(web_proxy.clone(), rc, messages).await?; + Translate::start(web_proxy.clone(), overlay, messages).await?; let (tx, storage) = watch::channel(storage); let (mut tap, _) = web_proxy.get_tap().await?; @@ -112,7 +112,7 @@ struct Translate { impl Translate { async fn start( mut web_proxy: Broker, - random: Broker, + overlay: Broker, messages: WebProxyMessages, ) -> Result<(), WebProxyError> { web_proxy @@ -120,29 +120,23 @@ impl Translate { .await?; web_proxy .link_bi( - random, - Box::new(Self::link_rnd_proxy), - Box::new(Self::link_proxy_rnd), + overlay, + Box::new(Self::link_overlay_proxy), + Box::new(Self::link_proxy_overlay), ) .await?; Ok(()) } - fn link_rnd_proxy(msg: RandomMessage) -> Option { - if let RandomMessage::Output(msg_out) = msg { + fn link_overlay_proxy(msg: OverlayMessage) -> Option { + if let OverlayMessage::Output(msg_out) = msg { match msg_out { - RandomOut::NodeInfoConnected(list) => { + OverlayOut::NodeInfosConnected(list) => { Some(WebProxyIn::NodeInfoConnected(list).into()) } - RandomOut::NodeMessageFromNetwork(id, msg) => { - if msg.module == MODULE_NAME { - serde_yaml::from_str::(&msg.msg) - .ok() - .map(|msg_node| WebProxyIn::Node(id, msg_node).into()) - } else { - None - } - } + OverlayOut::NetworkMapperFromNetwork(id, msg) => msg + .unwrap_yaml(MODULE_NAME) + .map(|msg| WebProxyIn::Node(id, msg).into()), _ => None, } } else { @@ -150,15 +144,12 @@ impl Translate { } } - fn link_proxy_rnd(msg: WebProxyMessage) -> Option { + fn link_proxy_overlay(msg: WebProxyMessage) -> Option { if let WebProxyMessage::Output(WebProxyOut::Node(id, msg_node)) = msg { Some( - RandomIn::NodeMessageToNetwork( + OverlayIn::NetworkWrapperToNetwork( id, - ModuleMessage { - module: MODULE_NAME.into(), - msg: serde_yaml::to_string(&msg_node).unwrap(), - }, + NetworkWrapper::wrap_yaml(MODULE_NAME, &msg_node).unwrap(), ) .into(), ) @@ -214,8 +205,10 @@ mod tests { let (mut wp_tap, _) = wp_rnd.get_tap().await?; let list = vec![cl_in, wp_in]; - cl_rnd.emit_msg(RandomMessage::Output(RandomOut::NodeInfoConnected(list.clone())))?; - wp_rnd.emit_msg(RandomMessage::Output(RandomOut::NodeInfoConnected(list)))?; + cl_rnd.emit_msg(OverlayMessage::Output(OverlayOut::NodeInfosConnected( + list.clone(), + )))?; + wp_rnd.emit_msg(OverlayMessage::Output(OverlayOut::NodeInfosConnected(list)))?; let (tx, mut rx) = channel(1); spawn_local(async move { @@ -232,23 +225,23 @@ mod tests { return Ok(()); } - if let Ok(RandomMessage::Input(RandomIn::NodeMessageToNetwork(dst, msg))) = + if let Ok(OverlayMessage::Input(OverlayIn::NetworkWrapperToNetwork(dst, msg))) = cl_tap.try_recv() { log::debug!("Sending to WP: {msg:?}"); wp_rnd - .emit_msg(RandomMessage::Output(RandomOut::NodeMessageFromNetwork( + .emit_msg(OverlayMessage::Output(OverlayOut::NetworkMapperFromNetwork( dst, msg, ))) .expect("sending to wp"); } - if let Ok(RandomMessage::Input(RandomIn::NodeMessageToNetwork(dst, msg))) = + if let Ok(OverlayMessage::Input(OverlayIn::NetworkWrapperToNetwork(dst, msg))) = wp_tap.try_recv() { log::debug!("Sending to CL: {msg:?}"); cl_rnd - .emit_msg(RandomMessage::Output(RandomOut::NodeMessageFromNetwork( + .emit_msg(OverlayMessage::Output(OverlayOut::NetworkMapperFromNetwork( dst, msg, ))) .expect("sending to wp"); diff --git a/flnode/src/node.rs b/flnode/src/node.rs index 1c865249..a1c8de6d 100644 --- a/flnode/src/node.rs +++ b/flnode/src/node.rs @@ -15,17 +15,10 @@ use flmodules::{ broker::GossipBroker, core::{self, Category, Event}, messages::{GossipIn, GossipMessage}, - }, - network::messages::{NetworkIn, NetworkError, NetworkMessage}, - nodeconfig::{ConfigError, NodeConfig, NodeInfo}, - ping::{broker::PingBroker, messages::PingConfig}, - random_connections::broker::RandomBroker, - timer::{TimerBroker, TimerMessage}, - web_proxy::{ + }, network::messages::{NetworkError, NetworkIn, NetworkMessage}, nodeconfig::{ConfigError, NodeConfig, NodeInfo}, overlay::broker::OverlayRandom, ping::{broker::PingBroker, messages::PingConfig}, random_connections::broker::RandomBroker, timer::{TimerBroker, TimerMessage}, web_proxy::{ broker::{WebProxy, WebProxyError}, core::WebProxyConfig, - }, - Modules, + }, Modules }; use crate::stat::StatBroker; @@ -116,7 +109,7 @@ impl Node { WebProxy::start( storage.clone(), id, - rnd.broker.clone(), + OverlayRandom::start(rnd.broker.clone()).await?, WebProxyConfig::default(), ) .await?,