Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(papyrus_network): add broadcast and sqmr metrics structs #4046

Merged
merged 4 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions crates/papyrus_network/src/network_manager/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,47 @@
use starknet_sequencer_metrics::metrics::MetricGauge;
pub struct NetworkMetrics {
pub num_connected_peers: MetricGauge,
use starknet_sequencer_metrics::metrics::{MetricCounter, MetricGauge};

// TODO(alonl): consider splitting the metrics by topic
pub struct BroadcastNetworkMetrics {
pub num_sent_broadcast_messages: MetricCounter,
pub num_received_broadcast_messages: MetricCounter,
}

impl BroadcastNetworkMetrics {
pub fn register(&self) {
self.num_sent_broadcast_messages.register();
self.num_received_broadcast_messages.register();
}
}

pub struct SqmrNetworkMetrics {
pub num_active_inbound_sessions: MetricGauge,
pub num_active_outbound_sessions: MetricGauge,
}

impl NetworkMetrics {
impl SqmrNetworkMetrics {
pub fn register(&self) {
let num_connected_peers_metric = self.num_connected_peers.register();
num_connected_peers_metric.set(0f64);
let num_active_inbound_sessions_metric = self.num_active_inbound_sessions.register();
num_active_inbound_sessions_metric.set(0f64);
let num_active_outbound_sessions_metric = self.num_active_outbound_sessions.register();
num_active_outbound_sessions_metric.set(0f64);
}
}

pub struct NetworkMetrics {
pub num_connected_peers: MetricGauge,
pub broadcast_metrics: Option<BroadcastNetworkMetrics>,
pub sqmr_metrics: Option<SqmrNetworkMetrics>,
}

impl NetworkMetrics {
pub fn register(&self) {
let num_connected_peers_metric = self.num_connected_peers.register();
num_connected_peers_metric.set(0f64);
if let Some(broadcast_metrics) = self.broadcast_metrics.as_ref() {
broadcast_metrics.register();
}
if let Some(sqmr_metrics) = self.sqmr_metrics.as_ref() {
sqmr_metrics.register();
}
}
}
36 changes: 28 additions & 8 deletions crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,10 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
);
return;
};
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_active_inbound_sessions.increment(1);
if let Some(sqmr_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
{
sqmr_metrics.num_active_inbound_sessions.increment(1);
}
let (responses_sender, responses_receiver) = futures::channel::mpsc::channel(
*self
Expand Down Expand Up @@ -515,8 +517,14 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
&mut self,
event: gossipsub_impl::ExternalEvent,
) -> Result<(), NetworkError> {
if let Some(broadcast_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics.as_ref())
{
broadcast_metrics.num_received_broadcast_messages.increment(1);
}
let gossipsub_impl::ExternalEvent::Received { originated_peer_id, message, topic_hash } =
event;
trace!("Received broadcast message with topic hash: {topic_hash:?}");
let broadcasted_message_metadata = BroadcastedMessageMetadata {
originator_id: OpaquePeerId::private_new(originated_peer_id),
encoded_message_length: message.len(),
Expand Down Expand Up @@ -578,28 +586,40 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
) {
let SqmrClientPayload { query, report_receiver, responses_sender } = client_payload;
let outbound_session_id = self.swarm.send_query(query, protocol.clone());
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_active_outbound_sessions.increment(1);
if let Some(sqmr_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
{
sqmr_metrics.num_active_outbound_sessions.increment(1);
}
self.sqmr_outbound_response_senders.insert(outbound_session_id, responses_sender);
self.sqmr_outbound_report_receivers_awaiting_assignment
.insert(outbound_session_id, report_receiver);
}

fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
if let Some(broadcast_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics.as_ref())
{
broadcast_metrics.num_sent_broadcast_messages.increment(1);
}
trace!("Sending broadcast message with topic hash: {topic_hash:?}");
self.swarm.broadcast_message(message, topic_hash);
}

fn report_session_removed_to_metrics(&mut self, session_id: SessionId) {
match session_id {
SessionId::InboundSessionId(_) => {
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_active_inbound_sessions.decrement(1);
if let Some(sqmr_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
{
sqmr_metrics.num_active_inbound_sessions.decrement(1);
}
}
SessionId::OutboundSessionId(_) => {
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_active_outbound_sessions.decrement(1);
if let Some(sqmr_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
{
sqmr_metrics.num_active_outbound_sessions.decrement(1);
}
}
}
Expand Down
13 changes: 8 additions & 5 deletions crates/starknet_consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::metrics::NetworkMetrics;
use papyrus_network::network_manager::metrics::{BroadcastNetworkMetrics, NetworkMetrics};
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_protobuf::consensus::{HeightAndRound, ProposalPart, StreamMessage, Vote};
use starknet_api::block::BlockNumber;
Expand All @@ -21,9 +21,9 @@ use starknet_infra_utils::type_name::short_type_name;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::errors::ComponentError;
use starknet_sequencer_metrics::metric_definitions::{
CONSENSUS_NUM_ACTIVE_INBOUND_SESSIONS,
CONSENSUS_NUM_ACTIVE_OUTBOUND_SESSIONS,
CONSENSUS_NUM_CONNECTED_PEERS,
CONSENSUS_NUM_RECEIVED_MESSAGES,
CONSENSUS_NUM_SENT_MESSAGES,
};
use starknet_state_sync_types::communication::SharedStateSyncClient;
use tracing::{error, info};
Expand Down Expand Up @@ -58,8 +58,11 @@ impl ConsensusManager {

let network_manager_metrics = Some(NetworkMetrics {
num_connected_peers: CONSENSUS_NUM_CONNECTED_PEERS,
num_active_inbound_sessions: CONSENSUS_NUM_ACTIVE_INBOUND_SESSIONS,
num_active_outbound_sessions: CONSENSUS_NUM_ACTIVE_OUTBOUND_SESSIONS,
broadcast_metrics: Some(BroadcastNetworkMetrics {
num_sent_broadcast_messages: CONSENSUS_NUM_SENT_MESSAGES,
num_received_broadcast_messages: CONSENSUS_NUM_RECEIVED_MESSAGES,
}),
sqmr_metrics: None,
});
let mut network_manager =
NetworkManager::new(self.config.network_config.clone(), None, network_manager_metrics);
Expand Down
13 changes: 8 additions & 5 deletions crates/starknet_mempool_p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ pub mod runner;

use futures::FutureExt;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::metrics::NetworkMetrics;
use papyrus_network::network_manager::metrics::{BroadcastNetworkMetrics, NetworkMetrics};
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use starknet_class_manager_types::transaction_converter::TransactionConverter;
use starknet_class_manager_types::SharedClassManagerClient;
use starknet_gateway_types::communication::SharedGatewayClient;
use starknet_sequencer_metrics::metric_definitions::{
MEMPOOL_P2P_NUM_ACTIVE_INBOUND_SESSIONS,
MEMPOOL_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS,
MEMPOOL_P2P_NUM_CONNECTED_PEERS,
MEMPOOL_P2P_NUM_RECEIVED_MESSAGES,
MEMPOOL_P2P_NUM_SENT_MESSAGES,
};

use crate::config::MempoolP2pConfig;
Expand All @@ -32,8 +32,11 @@ pub fn create_p2p_propagator_and_runner(
);
let network_manager_metrics = Some(NetworkMetrics {
num_connected_peers: MEMPOOL_P2P_NUM_CONNECTED_PEERS,
num_active_inbound_sessions: MEMPOOL_P2P_NUM_ACTIVE_INBOUND_SESSIONS,
num_active_outbound_sessions: MEMPOOL_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS,
broadcast_metrics: Some(BroadcastNetworkMetrics {
num_sent_broadcast_messages: MEMPOOL_P2P_NUM_SENT_MESSAGES,
num_received_broadcast_messages: MEMPOOL_P2P_NUM_RECEIVED_MESSAGES,
}),
sqmr_metrics: None,
});
let mut network_manager = NetworkManager::new(
mempool_p2p_config.network_config,
Expand Down
16 changes: 9 additions & 7 deletions crates/starknet_sequencer_metrics/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,10 @@ define_gauge_metrics!(
},
MetricScope::Network => {
{ MEMPOOL_P2P_NUM_CONNECTED_PEERS, "apollo_mempool_p2p_num_connected_peers", "The number of connected peers to the mempool p2p component" },
{ MEMPOOL_P2P_NUM_ACTIVE_INBOUND_SESSIONS, "apollo_mempool_p2p_num_active_inbound_sessions", "The number of inbound sessions to the mempool p2p component" },
{ MEMPOOL_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS, "apollo_mempool_p2p_num_active_outbound_sessions", "The number of outbound sessions to the mempool p2p component" },
{ CONSENSUS_NUM_CONNECTED_PEERS, "apollo_consensus_num_connected_peers", "The number of connected peers to the consensus p2p component" },
{ CONSENSUS_NUM_ACTIVE_INBOUND_SESSIONS, "apollo_consensus_num_active_inbound_sessions", "The number of inbound sessions to the consensus p2p component" },
{ CONSENSUS_NUM_ACTIVE_OUTBOUND_SESSIONS, "apollo_consensus_num_active_outbound_sessions", "The number of outbound sessions to the consensus p2p component" },
{ STATE_SYNC_NUM_CONNECTED_PEERS, "apollo_sync_num_connected_peers", "The number of connected peers to the state sync p2p component" },
{ STATE_SYNC_NUM_ACTIVE_INBOUND_SESSIONS, "apollo_sync_num_active_inbound_sessions", "The number of inbound sessions to the state sync p2p component" },
{ STATE_SYNC_NUM_ACTIVE_OUTBOUND_SESSIONS, "apollo_sync_num_active_outbound_sessions", "The number of outbound sessions to the state sync p2p component" },
{ STATE_SYNC_P2P_NUM_CONNECTED_PEERS, "apollo_sync_num_connected_peers", "The number of connected peers to the state sync p2p component" },
{ STATE_SYNC_P2P_NUM_ACTIVE_INBOUND_SESSIONS, "apollo_sync_num_active_inbound_sessions", "The number of inbound sessions to the state sync p2p component" },
{ STATE_SYNC_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS, "apollo_sync_num_active_outbound_sessions", "The number of outbound sessions to the state sync p2p component" },
}
);

Expand All @@ -104,4 +100,10 @@ define_counter_metrics!(
{ ADDED_TRANSACTIONS_SUCCESS, "ADDED_TRANSACTIONS_SUCCESS", "Number of successfully added transactions", 0 },
{ ADDED_TRANSACTIONS_FAILURE, "ADDED_TRANSACTIONS_FAILURE", "Number of faulty added transactions", 0 }
},
MetricScope::Network => {
{ MEMPOOL_P2P_NUM_SENT_MESSAGES, "apollo_mempool_num_sent_messages", "The number of messages sent by the mempool p2p component", 0 },
{ MEMPOOL_P2P_NUM_RECEIVED_MESSAGES, "apollo_mempool_num_received_messages", "The number of messages received by the mempool p2p component", 0 },
{ CONSENSUS_NUM_SENT_MESSAGES, "apollo_consensus_num_sent_messages", "The number of messages sent by the consensus p2p component", 0 },
{ CONSENSUS_NUM_RECEIVED_MESSAGES, "apollo_consensus_num_received_messages", "The number of messages received by the consensus p2p component", 0 },
}
);
20 changes: 12 additions & 8 deletions crates/starknet_state_sync/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use futures::future::{self, BoxFuture};
use futures::never::Never;
use futures::{FutureExt, StreamExt};
use papyrus_common::pending_classes::PendingClasses;
use papyrus_network::network_manager::{self, metrics, NetworkError, NetworkManager};
use papyrus_network::network_manager::metrics::{NetworkMetrics, SqmrNetworkMetrics};
use papyrus_network::network_manager::{self, NetworkError, NetworkManager};
use papyrus_p2p_sync::client::{
P2pSyncClient,
P2pSyncClientChannels,
Expand All @@ -35,9 +36,9 @@ use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::component_server::WrapperServer;
use starknet_sequencer_infra::errors::ComponentError;
use starknet_sequencer_metrics::metric_definitions::{
STATE_SYNC_NUM_ACTIVE_INBOUND_SESSIONS,
STATE_SYNC_NUM_ACTIVE_OUTBOUND_SESSIONS,
STATE_SYNC_NUM_CONNECTED_PEERS,
STATE_SYNC_P2P_NUM_ACTIVE_INBOUND_SESSIONS,
STATE_SYNC_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS,
STATE_SYNC_P2P_NUM_CONNECTED_PEERS,
};
use starknet_state_sync_types::state_sync_types::SyncBlock;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -90,10 +91,13 @@ impl StateSyncRunner {
network_config,
} = config;

let network_manager_metrics = Some(metrics::NetworkMetrics {
num_connected_peers: STATE_SYNC_NUM_CONNECTED_PEERS,
num_active_inbound_sessions: STATE_SYNC_NUM_ACTIVE_INBOUND_SESSIONS,
num_active_outbound_sessions: STATE_SYNC_NUM_ACTIVE_OUTBOUND_SESSIONS,
let network_manager_metrics = Some(NetworkMetrics {
num_connected_peers: STATE_SYNC_P2P_NUM_CONNECTED_PEERS,
broadcast_metrics: None,
sqmr_metrics: Some(SqmrNetworkMetrics {
num_active_inbound_sessions: STATE_SYNC_P2P_NUM_ACTIVE_INBOUND_SESSIONS,
num_active_outbound_sessions: STATE_SYNC_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS,
}),
});
let mut network_manager = network_manager::NetworkManager::new(
network_config,
Expand Down
Loading