Skip to content

Commit

Permalink
feat(papyrus_network): add broadcast and sqmr metrics structs (#4046)
Browse files Browse the repository at this point in the history
* feat(papyrus_network): add broadcast and sqmr metrics structs

* feat(papyrus_network): track broadcast metrics

* refactor(papyrus_network): fix CR comments

* fix(starknet_consensus_manager): fix dependency on state_sync_types
  • Loading branch information
AlonLStarkWare authored Feb 12, 2025
1 parent c7124b0 commit 9ec2637
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 39 deletions.
42 changes: 36 additions & 6 deletions crates/papyrus_network/src/network_manager/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,47 @@
use starknet_sequencer_metrics::metrics::MetricGauge;
pub struct NetworkMetrics {
pub num_connected_peers: MetricGauge,
use starknet_sequencer_metrics::metrics::{MetricCounter, MetricGauge};

// TODO(alonl): consider splitting the metrics by topic
pub struct BroadcastNetworkMetrics {
pub num_sent_broadcast_messages: MetricCounter,
pub num_received_broadcast_messages: MetricCounter,
}

impl BroadcastNetworkMetrics {
pub fn register(&self) {
self.num_sent_broadcast_messages.register();
self.num_received_broadcast_messages.register();
}
}

pub struct SqmrNetworkMetrics {
pub num_active_inbound_sessions: MetricGauge,
pub num_active_outbound_sessions: MetricGauge,
}

impl NetworkMetrics {
impl SqmrNetworkMetrics {
pub fn register(&self) {
let num_connected_peers_metric = self.num_connected_peers.register();
num_connected_peers_metric.set(0f64);
let num_active_inbound_sessions_metric = self.num_active_inbound_sessions.register();
num_active_inbound_sessions_metric.set(0f64);
let num_active_outbound_sessions_metric = self.num_active_outbound_sessions.register();
num_active_outbound_sessions_metric.set(0f64);
}
}

pub struct NetworkMetrics {
pub num_connected_peers: MetricGauge,
pub broadcast_metrics: Option<BroadcastNetworkMetrics>,
pub sqmr_metrics: Option<SqmrNetworkMetrics>,
}

impl NetworkMetrics {
pub fn register(&self) {
let num_connected_peers_metric = self.num_connected_peers.register();
num_connected_peers_metric.set(0f64);
if let Some(broadcast_metrics) = self.broadcast_metrics.as_ref() {
broadcast_metrics.register();
}
if let Some(sqmr_metrics) = self.sqmr_metrics.as_ref() {
sqmr_metrics.register();
}
}
}
36 changes: 28 additions & 8 deletions crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,10 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
);
return;
};
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_active_inbound_sessions.increment(1);
if let Some(sqmr_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
{
sqmr_metrics.num_active_inbound_sessions.increment(1);
}
let (responses_sender, responses_receiver) = futures::channel::mpsc::channel(
*self
Expand Down Expand Up @@ -515,8 +517,14 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
&mut self,
event: gossipsub_impl::ExternalEvent,
) -> Result<(), NetworkError> {
if let Some(broadcast_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics.as_ref())
{
broadcast_metrics.num_received_broadcast_messages.increment(1);
}
let gossipsub_impl::ExternalEvent::Received { originated_peer_id, message, topic_hash } =
event;
trace!("Received broadcast message with topic hash: {topic_hash:?}");
let broadcasted_message_metadata = BroadcastedMessageMetadata {
originator_id: OpaquePeerId::private_new(originated_peer_id),
encoded_message_length: message.len(),
Expand Down Expand Up @@ -578,28 +586,40 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
) {
let SqmrClientPayload { query, report_receiver, responses_sender } = client_payload;
let outbound_session_id = self.swarm.send_query(query, protocol.clone());
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_active_outbound_sessions.increment(1);
if let Some(sqmr_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
{
sqmr_metrics.num_active_outbound_sessions.increment(1);
}
self.sqmr_outbound_response_senders.insert(outbound_session_id, responses_sender);
self.sqmr_outbound_report_receivers_awaiting_assignment
.insert(outbound_session_id, report_receiver);
}

fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
if let Some(broadcast_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics.as_ref())
{
broadcast_metrics.num_sent_broadcast_messages.increment(1);
}
trace!("Sending broadcast message with topic hash: {topic_hash:?}");
self.swarm.broadcast_message(message, topic_hash);
}

fn report_session_removed_to_metrics(&mut self, session_id: SessionId) {
match session_id {
SessionId::InboundSessionId(_) => {
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_active_inbound_sessions.decrement(1);
if let Some(sqmr_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
{
sqmr_metrics.num_active_inbound_sessions.decrement(1);
}
}
SessionId::OutboundSessionId(_) => {
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_active_outbound_sessions.decrement(1);
if let Some(sqmr_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
{
sqmr_metrics.num_active_outbound_sessions.decrement(1);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/starknet_consensus_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ validator.workspace = true
mockall.workspace = true
rstest.workspace = true
starknet_batcher_types = { path = "../starknet_batcher_types", features = ["testing"] }
starknet_state_sync_types = { path = "../starknet_state_sync_types", features = ["testing"] }
13 changes: 8 additions & 5 deletions crates/starknet_consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use apollo_reverts::revert_blocks_and_eternal_pending;
use async_trait::async_trait;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::metrics::NetworkMetrics;
use papyrus_network::network_manager::metrics::{BroadcastNetworkMetrics, NetworkMetrics};
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_protobuf::consensus::{HeightAndRound, ProposalPart, StreamMessage, Vote};
use starknet_api::block::BlockNumber;
Expand All @@ -22,9 +22,9 @@ use starknet_infra_utils::type_name::short_type_name;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::errors::ComponentError;
use starknet_sequencer_metrics::metric_definitions::{
CONSENSUS_NUM_ACTIVE_INBOUND_SESSIONS,
CONSENSUS_NUM_ACTIVE_OUTBOUND_SESSIONS,
CONSENSUS_NUM_CONNECTED_PEERS,
CONSENSUS_NUM_RECEIVED_MESSAGES,
CONSENSUS_NUM_SENT_MESSAGES,
};
use starknet_state_sync_types::communication::SharedStateSyncClient;
use tracing::{error, info};
Expand Down Expand Up @@ -56,8 +56,11 @@ impl ConsensusManager {

let network_manager_metrics = Some(NetworkMetrics {
num_connected_peers: CONSENSUS_NUM_CONNECTED_PEERS,
num_active_inbound_sessions: CONSENSUS_NUM_ACTIVE_INBOUND_SESSIONS,
num_active_outbound_sessions: CONSENSUS_NUM_ACTIVE_OUTBOUND_SESSIONS,
broadcast_metrics: Some(BroadcastNetworkMetrics {
num_sent_broadcast_messages: CONSENSUS_NUM_SENT_MESSAGES,
num_received_broadcast_messages: CONSENSUS_NUM_RECEIVED_MESSAGES,
}),
sqmr_metrics: None,
});
let mut network_manager =
NetworkManager::new(self.config.network_config.clone(), None, network_manager_metrics);
Expand Down
13 changes: 8 additions & 5 deletions crates/starknet_mempool_p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ pub mod runner;

use futures::FutureExt;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::metrics::NetworkMetrics;
use papyrus_network::network_manager::metrics::{BroadcastNetworkMetrics, NetworkMetrics};
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use starknet_class_manager_types::transaction_converter::TransactionConverter;
use starknet_class_manager_types::SharedClassManagerClient;
use starknet_gateway_types::communication::SharedGatewayClient;
use starknet_sequencer_metrics::metric_definitions::{
MEMPOOL_P2P_NUM_ACTIVE_INBOUND_SESSIONS,
MEMPOOL_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS,
MEMPOOL_P2P_NUM_CONNECTED_PEERS,
MEMPOOL_P2P_NUM_RECEIVED_MESSAGES,
MEMPOOL_P2P_NUM_SENT_MESSAGES,
};

use crate::config::MempoolP2pConfig;
Expand All @@ -32,8 +32,11 @@ pub fn create_p2p_propagator_and_runner(
);
let network_manager_metrics = Some(NetworkMetrics {
num_connected_peers: MEMPOOL_P2P_NUM_CONNECTED_PEERS,
num_active_inbound_sessions: MEMPOOL_P2P_NUM_ACTIVE_INBOUND_SESSIONS,
num_active_outbound_sessions: MEMPOOL_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS,
broadcast_metrics: Some(BroadcastNetworkMetrics {
num_sent_broadcast_messages: MEMPOOL_P2P_NUM_SENT_MESSAGES,
num_received_broadcast_messages: MEMPOOL_P2P_NUM_RECEIVED_MESSAGES,
}),
sqmr_metrics: None,
});
let mut network_manager = NetworkManager::new(
mempool_p2p_config.network_config,
Expand Down
16 changes: 9 additions & 7 deletions crates/starknet_sequencer_metrics/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,10 @@ define_gauge_metrics!(
},
MetricScope::Network => {
{ MEMPOOL_P2P_NUM_CONNECTED_PEERS, "apollo_mempool_p2p_num_connected_peers", "The number of connected peers to the mempool p2p component" },
{ MEMPOOL_P2P_NUM_ACTIVE_INBOUND_SESSIONS, "apollo_mempool_p2p_num_active_inbound_sessions", "The number of inbound sessions to the mempool p2p component" },
{ MEMPOOL_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS, "apollo_mempool_p2p_num_active_outbound_sessions", "The number of outbound sessions to the mempool p2p component" },
{ CONSENSUS_NUM_CONNECTED_PEERS, "apollo_consensus_num_connected_peers", "The number of connected peers to the consensus p2p component" },
{ CONSENSUS_NUM_ACTIVE_INBOUND_SESSIONS, "apollo_consensus_num_active_inbound_sessions", "The number of inbound sessions to the consensus p2p component" },
{ CONSENSUS_NUM_ACTIVE_OUTBOUND_SESSIONS, "apollo_consensus_num_active_outbound_sessions", "The number of outbound sessions to the consensus p2p component" },
{ STATE_SYNC_NUM_CONNECTED_PEERS, "apollo_sync_num_connected_peers", "The number of connected peers to the state sync p2p component" },
{ STATE_SYNC_NUM_ACTIVE_INBOUND_SESSIONS, "apollo_sync_num_active_inbound_sessions", "The number of inbound sessions to the state sync p2p component" },
{ STATE_SYNC_NUM_ACTIVE_OUTBOUND_SESSIONS, "apollo_sync_num_active_outbound_sessions", "The number of outbound sessions to the state sync p2p component" },
{ STATE_SYNC_P2P_NUM_CONNECTED_PEERS, "apollo_sync_num_connected_peers", "The number of connected peers to the state sync p2p component" },
{ STATE_SYNC_P2P_NUM_ACTIVE_INBOUND_SESSIONS, "apollo_sync_num_active_inbound_sessions", "The number of inbound sessions to the state sync p2p component" },
{ STATE_SYNC_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS, "apollo_sync_num_active_outbound_sessions", "The number of outbound sessions to the state sync p2p component" },
}
);

Expand All @@ -104,4 +100,10 @@ define_counter_metrics!(
{ ADDED_TRANSACTIONS_SUCCESS, "ADDED_TRANSACTIONS_SUCCESS", "Number of successfully added transactions", 0 },
{ ADDED_TRANSACTIONS_FAILURE, "ADDED_TRANSACTIONS_FAILURE", "Number of faulty added transactions", 0 }
},
MetricScope::Network => {
{ MEMPOOL_P2P_NUM_SENT_MESSAGES, "apollo_mempool_num_sent_messages", "The number of messages sent by the mempool p2p component", 0 },
{ MEMPOOL_P2P_NUM_RECEIVED_MESSAGES, "apollo_mempool_num_received_messages", "The number of messages received by the mempool p2p component", 0 },
{ CONSENSUS_NUM_SENT_MESSAGES, "apollo_consensus_num_sent_messages", "The number of messages sent by the consensus p2p component", 0 },
{ CONSENSUS_NUM_RECEIVED_MESSAGES, "apollo_consensus_num_received_messages", "The number of messages received by the consensus p2p component", 0 },
}
);
20 changes: 12 additions & 8 deletions crates/starknet_state_sync/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use futures::future::{self, BoxFuture};
use futures::never::Never;
use futures::{FutureExt, StreamExt};
use papyrus_common::pending_classes::PendingClasses;
use papyrus_network::network_manager::{self, metrics, NetworkError, NetworkManager};
use papyrus_network::network_manager::metrics::{NetworkMetrics, SqmrNetworkMetrics};
use papyrus_network::network_manager::{self, NetworkError, NetworkManager};
use papyrus_p2p_sync::client::{
P2pSyncClient,
P2pSyncClientChannels,
Expand All @@ -35,9 +36,9 @@ use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::component_server::WrapperServer;
use starknet_sequencer_infra::errors::ComponentError;
use starknet_sequencer_metrics::metric_definitions::{
STATE_SYNC_NUM_ACTIVE_INBOUND_SESSIONS,
STATE_SYNC_NUM_ACTIVE_OUTBOUND_SESSIONS,
STATE_SYNC_NUM_CONNECTED_PEERS,
STATE_SYNC_P2P_NUM_ACTIVE_INBOUND_SESSIONS,
STATE_SYNC_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS,
STATE_SYNC_P2P_NUM_CONNECTED_PEERS,
};
use starknet_state_sync_types::state_sync_types::SyncBlock;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -90,10 +91,13 @@ impl StateSyncRunner {
network_config,
} = config;

let network_manager_metrics = Some(metrics::NetworkMetrics {
num_connected_peers: STATE_SYNC_NUM_CONNECTED_PEERS,
num_active_inbound_sessions: STATE_SYNC_NUM_ACTIVE_INBOUND_SESSIONS,
num_active_outbound_sessions: STATE_SYNC_NUM_ACTIVE_OUTBOUND_SESSIONS,
let network_manager_metrics = Some(NetworkMetrics {
num_connected_peers: STATE_SYNC_P2P_NUM_CONNECTED_PEERS,
broadcast_metrics: None,
sqmr_metrics: Some(SqmrNetworkMetrics {
num_active_inbound_sessions: STATE_SYNC_P2P_NUM_ACTIVE_INBOUND_SESSIONS,
num_active_outbound_sessions: STATE_SYNC_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS,
}),
});
let mut network_manager = network_manager::NetworkManager::new(
network_config,
Expand Down

0 comments on commit 9ec2637

Please sign in to comment.