Skip to content

Commit

Permalink
Make CombinedNetworks delay duration configurable (#2726)
Browse files Browse the repository at this point in the history
* Make CombinedNetworks delay duration configurable

* Secondary network delay configurable in HotShotConfig

* Rename CombinedConfig to CombinedNetworkConfig

* Network delay in test network generator

`secondary_network_delay` removed from `HotShotConfig`
because it cannot easily be passed to the test network
generator.

* Temporary pinning to hotshot-types branch

TODO: switch to hotshot-types tag or main branch
before merging

* Pin to hotshot-types tag 0.1.2

* Remove files added back by mistake
  • Loading branch information
lukaszrzasik authored Mar 13, 2024
1 parent d5adb9c commit 078e5b8
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 13 deletions.
27 changes: 18 additions & 9 deletions examples/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use hotshot_orchestrator::config::NetworkConfigSource;
use hotshot_orchestrator::{
self,
client::{BenchResults, OrchestratorClient, ValidatorArgs},
config::{NetworkConfig, NetworkConfigFile, WebServerConfig},
config::{CombinedNetworkConfig, NetworkConfig, NetworkConfigFile, WebServerConfig},
};
use hotshot_types::message::Message;
use hotshot_types::traits::network::ConnectedNetwork;
Expand Down Expand Up @@ -876,6 +876,9 @@ where
wait_between_polls,
}: WebServerConfig = config.clone().da_web_server_config.unwrap();

let CombinedNetworkConfig { delay_duration }: CombinedNetworkConfig =
config.clone().combined_network_config.unwrap();

// Create and wait for underlying webserver network
let web_quorum_network =
webserver_network_from_config::<TYPES>(config.clone(), pub_key.clone());
Expand All @@ -885,14 +888,20 @@ where
web_quorum_network.wait_for_ready().await;

// Combine the two communication channels
let da_channel = CombinedNetworks::new(Arc::new(UnderlyingCombinedNetworks(
web_da_network.clone(),
libp2p_underlying_quorum_network.clone(),
)));
let quorum_channel = CombinedNetworks::new(Arc::new(UnderlyingCombinedNetworks(
web_quorum_network.clone(),
libp2p_underlying_quorum_network.clone(),
)));
let da_channel = CombinedNetworks::new(
Arc::new(UnderlyingCombinedNetworks(
web_da_network.clone(),
libp2p_underlying_quorum_network.clone(),
)),
delay_duration,
);
let quorum_channel = CombinedNetworks::new(
Arc::new(UnderlyingCombinedNetworks(
web_quorum_network.clone(),
libp2p_underlying_quorum_network.clone(),
)),
delay_duration,
);

CombinedDARun {
config,
Expand Down
11 changes: 7 additions & 4 deletions hotshot/src/traits/networking/combined_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ impl<TYPES: NodeType> CombinedNetworks<TYPES> {
///
/// Panics if `COMBINED_NETWORK_CACHE_SIZE` is 0
#[must_use]
pub fn new(networks: Arc<UnderlyingCombinedNetworks<TYPES>>) -> Self {
pub fn new(networks: Arc<UnderlyingCombinedNetworks<TYPES>>, delay_duration: Duration) -> Self {
Self {
networks,
message_cache: Arc::new(RwLock::new(LruCache::new(
NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
))),
primary_down: Arc::new(AtomicU64::new(0)),
delayed_tasks: Arc::default(),
delay_duration: Arc::new(RwLock::new(Duration::from_millis(1000))),
delay_duration: Arc::new(RwLock::new(delay_duration)),
}
}

Expand Down Expand Up @@ -187,6 +187,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetwor
da_committee_size: usize,
is_da: bool,
reliability_config: Option<Box<dyn NetworkReliability>>,
secondary_network_delay: Duration,
) -> Box<dyn Fn(u64) -> (Arc<Self>, Arc<Self>) + 'static> {
let generators = (
<WebServerNetwork<
Expand All @@ -198,6 +199,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetwor
da_committee_size,
is_da,
None,
Duration::default(),
),
<Libp2pNetwork<Message<TYPES>, TYPES::SignatureKey> as TestableNetworkingImplementation<_>>::generator(
expected_node_count,
Expand All @@ -206,6 +208,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetwor
da_committee_size,
is_da,
reliability_config,
Duration::default(),
)
);
Box::new(move |node_id| {
Expand All @@ -228,7 +231,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetwor
))),
primary_down: Arc::new(AtomicU64::new(0)),
delayed_tasks: Arc::default(),
delay_duration: Arc::new(RwLock::new(Duration::from_millis(1000))),
delay_duration: Arc::new(RwLock::new(secondary_network_delay)),
};
let da_net = Self {
networks: Arc::new(da_networks),
Expand All @@ -237,7 +240,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetwor
))),
primary_down: Arc::new(AtomicU64::new(0)),
delayed_tasks: Arc::default(),
delay_duration: Arc::new(RwLock::new(Duration::from_millis(1000))),
delay_duration: Arc::new(RwLock::new(secondary_network_delay)),
};
(quorum_net.into(), da_net.into())
})
Expand Down
1 change: 1 addition & 0 deletions hotshot/src/traits/networking/libp2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ where
da_committee_size: usize,
_is_da: bool,
reliability_config: Option<Box<dyn NetworkReliability>>,
_secondary_network_delay: Duration,
) -> Box<dyn Fn(u64) -> (Arc<Self>, Arc<Self>) + 'static> {
assert!(
da_committee_size <= expected_node_count,
Expand Down
2 changes: 2 additions & 0 deletions hotshot/src/traits/networking/memory_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use async_compatibility_layer::{
use async_lock::{Mutex, RwLock};
use async_trait::async_trait;
use bincode::Options;
use core::time::Duration;
use dashmap::DashMap;
use futures::StreamExt;
use hotshot_types::{
Expand Down Expand Up @@ -187,6 +188,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
_da_committee_size: usize,
_is_da: bool,
reliability_config: Option<Box<dyn NetworkReliability>>,
_secondary_network_delay: Duration,
) -> Box<dyn Fn(u64) -> (Arc<Self>, Arc<Self>) + 'static> {
let master: Arc<_> = MasterMap::new();
// We assign known_nodes' public key and stake value rather than read from config file since it's a test
Expand Down
1 change: 1 addition & 0 deletions hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for WebServerNetwo
da_committee_size: usize,
_is_da: bool,
reliability_config: Option<Box<dyn NetworkReliability>>,
_secondary_network_delay: Duration,
) -> Box<dyn Fn(u64) -> (Arc<Self>, Arc<Self>) + 'static> {
let da_gen = Self::single_generator(
expected_node_count,
Expand Down
14 changes: 14 additions & 0 deletions orchestrator/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ pub struct WebServerConfig {
pub wait_between_polls: Duration,
}

/// configuration for combined network
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct CombinedNetworkConfig {
/// delay duration before sending a message through the secondary network
pub delay_duration: Duration,
}

/// a network configuration error
#[derive(Error, Debug)]
pub enum NetworkConfigError {
Expand Down Expand Up @@ -154,6 +161,8 @@ pub struct NetworkConfig<KEY: SignatureKey, ELECTIONCONFIG: ElectionConfig> {
pub web_server_config: Option<WebServerConfig>,
/// the data availability web server config
pub da_web_server_config: Option<WebServerConfig>,
/// combined network config
pub combined_network_config: Option<CombinedNetworkConfig>,
/// the commit this run is based on
pub commit_sha: String,
}
Expand Down Expand Up @@ -388,6 +397,7 @@ impl<K: SignatureKey, E: ElectionConfig> Default for NetworkConfig<K, E> {
election_config_type_name: std::any::type_name::<E>().to_string(),
web_server_config: None,
da_web_server_config: None,
combined_network_config: None,
next_view_timeout: 10,
num_bootrap: 5,
propose_min_round_time: Duration::from_secs(0),
Expand Down Expand Up @@ -432,6 +442,9 @@ pub struct NetworkConfigFile<KEY: SignatureKey> {
/// the data availability web server config
#[serde(default)]
pub da_web_server_config: Option<WebServerConfig>,
/// combined network config
#[serde(default)]
pub combined_network_config: Option<CombinedNetworkConfig>,
}

impl<K: SignatureKey, E: ElectionConfig> From<NetworkConfigFile<K>> for NetworkConfig<K, E> {
Expand Down Expand Up @@ -473,6 +486,7 @@ impl<K: SignatureKey, E: ElectionConfig> From<NetworkConfigFile<K>> for NetworkC
start_delay_seconds: val.start_delay_seconds,
web_server_config: val.web_server_config,
da_web_server_config: val.da_web_server_config,
combined_network_config: val.combined_network_config,
commit_sha: String::new(),
}
}
Expand Down
5 changes: 5 additions & 0 deletions testing/src/test_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub struct TimingData {
pub propose_min_round_time: Duration,
/// The maximum amount of time a leader can wait to start a round
pub propose_max_round_time: Duration,
/// Delay before sending through the secondary network in CombinedNetworks
pub secondary_network_delay: Duration,
}

/// metadata describing a test
Expand Down Expand Up @@ -81,6 +83,7 @@ impl Default for TimingData {
start_delay: 100,
propose_min_round_time: Duration::new(0, 0),
propose_max_round_time: Duration::from_millis(100),
secondary_network_delay: Duration::from_millis(1000),
}
}
}
Expand Down Expand Up @@ -297,6 +300,7 @@ impl TestMetadata {
start_delay,
propose_min_round_time,
propose_max_round_time,
secondary_network_delay,
} = timing_data;
let mod_config =
// TODO this should really be using the timing config struct
Expand All @@ -316,6 +320,7 @@ impl TestMetadata {
num_bootstrap_nodes,
da_staked_committee_size,
unreliable_network,
secondary_network_delay,
),
storage: Box::new(|_| I::construct_tmp_storage().unwrap()),
config,
Expand Down

0 comments on commit 078e5b8

Please sign in to comment.