Skip to content

Commit

Permalink
refactor(papyrus_network): fix CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
AlonLStarkWare committed Feb 11, 2025
1 parent 612604d commit e75d412
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 39 deletions.
18 changes: 10 additions & 8 deletions crates/papyrus_network/src/network_manager/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,22 @@ impl BroadcastNetworkMetrics {
}
}

pub struct SqmrNetworkMetrics {}
pub struct SqmrNetworkMetrics {
pub num_active_inbound_sessions: MetricGauge,
pub num_active_outbound_sessions: MetricGauge,
}

impl SqmrNetworkMetrics {
pub fn register(&self) {}
pub fn register(&self) {
let num_active_inbound_sessions_metric = self.num_active_inbound_sessions.register();
num_active_inbound_sessions_metric.set(0f64);
let num_active_outbound_sessions_metric = self.num_active_outbound_sessions.register();
num_active_outbound_sessions_metric.set(0f64);
}
}

pub struct NetworkMetrics {
pub num_connected_peers: MetricGauge,
pub num_active_inbound_sessions: MetricGauge,
pub num_active_outbound_sessions: MetricGauge,
pub broadcast_metrics: Option<BroadcastNetworkMetrics>,
pub sqmr_metrics: Option<SqmrNetworkMetrics>,
}
Expand All @@ -31,10 +37,6 @@ impl NetworkMetrics {
pub fn register(&self) {
let num_connected_peers_metric = self.num_connected_peers.register();
num_connected_peers_metric.set(0f64);
let num_active_inbound_sessions_metric = self.num_active_inbound_sessions.register();
num_active_inbound_sessions_metric.set(0f64);
let num_active_outbound_sessions_metric = self.num_active_outbound_sessions.register();
num_active_outbound_sessions_metric.set(0f64);
if let Some(broadcast_metrics) = self.broadcast_metrics.as_ref() {
broadcast_metrics.register();
}
Expand Down
42 changes: 26 additions & 16 deletions crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,10 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
);
return;
};
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_active_inbound_sessions.increment(1);
if let Some(sqmr_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
{
sqmr_metrics.num_active_inbound_sessions.increment(1);
}
let (responses_sender, responses_receiver) = futures::channel::mpsc::channel(
*self
Expand Down Expand Up @@ -515,13 +517,14 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
&mut self,
event: gossipsub_impl::ExternalEvent,
) -> Result<(), NetworkError> {
if let Some(metrics) = self.metrics.as_ref() {
if let Some(broadcast_metrics) = metrics.broadcast_metrics.as_ref() {
broadcast_metrics.num_received_broadcast_messages.increment(1);
}
if let Some(broadcast_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics.as_ref())
{
broadcast_metrics.num_received_broadcast_messages.increment(1);
}
let gossipsub_impl::ExternalEvent::Received { originated_peer_id, message, topic_hash } =
event;
trace!("Received broadcast message with topic hash: {topic_hash:?}");
let broadcasted_message_metadata = BroadcastedMessageMetadata {
originator_id: OpaquePeerId::private_new(originated_peer_id),
encoded_message_length: message.len(),
Expand Down Expand Up @@ -583,33 +586,40 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
) {
let SqmrClientPayload { query, report_receiver, responses_sender } = client_payload;
let outbound_session_id = self.swarm.send_query(query, protocol.clone());
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_active_outbound_sessions.increment(1);
if let Some(sqmr_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
{
sqmr_metrics.num_active_outbound_sessions.increment(1);
}
self.sqmr_outbound_response_senders.insert(outbound_session_id, responses_sender);
self.sqmr_outbound_report_receivers_awaiting_assignment
.insert(outbound_session_id, report_receiver);
}

fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
if let Some(metrics) = self.metrics.as_ref() {
if let Some(broadcast_metrics) = metrics.broadcast_metrics.as_ref() {
broadcast_metrics.num_sent_broadcast_messages.increment(1);
}
if let Some(broadcast_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics.as_ref())
{
broadcast_metrics.num_sent_broadcast_messages.increment(1);
}
trace!("Sending broadcast message with topic hash: {topic_hash:?}");
self.swarm.broadcast_message(message, topic_hash);
}

fn report_session_removed_to_metrics(&mut self, session_id: SessionId) {
match session_id {
SessionId::InboundSessionId(_) => {
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_active_inbound_sessions.decrement(1);
if let Some(sqmr_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
{
sqmr_metrics.num_active_inbound_sessions.decrement(1);
}
}
SessionId::OutboundSessionId(_) => {
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_active_outbound_sessions.decrement(1);
if let Some(sqmr_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
{
sqmr_metrics.num_active_outbound_sessions.decrement(1);
}
}
}
Expand Down
4 changes: 0 additions & 4 deletions crates/starknet_consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use starknet_infra_utils::type_name::short_type_name;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::errors::ComponentError;
use starknet_sequencer_metrics::metric_definitions::{
CONSENSUS_NUM_ACTIVE_INBOUND_SESSIONS,
CONSENSUS_NUM_ACTIVE_OUTBOUND_SESSIONS,
CONSENSUS_NUM_CONNECTED_PEERS,
CONSENSUS_NUM_RECEIVED_MESSAGES,
CONSENSUS_NUM_SENT_MESSAGES,
Expand Down Expand Up @@ -60,8 +58,6 @@ impl ConsensusManager {

let network_manager_metrics = Some(NetworkMetrics {
num_connected_peers: CONSENSUS_NUM_CONNECTED_PEERS,
num_active_inbound_sessions: CONSENSUS_NUM_ACTIVE_INBOUND_SESSIONS,
num_active_outbound_sessions: CONSENSUS_NUM_ACTIVE_OUTBOUND_SESSIONS,
broadcast_metrics: Some(BroadcastNetworkMetrics {
num_sent_broadcast_messages: CONSENSUS_NUM_SENT_MESSAGES,
num_received_broadcast_messages: CONSENSUS_NUM_RECEIVED_MESSAGES,
Expand Down
4 changes: 0 additions & 4 deletions crates/starknet_mempool_p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use starknet_class_manager_types::transaction_converter::TransactionConverter;
use starknet_class_manager_types::SharedClassManagerClient;
use starknet_gateway_types::communication::SharedGatewayClient;
use starknet_sequencer_metrics::metric_definitions::{
MEMPOOL_P2P_NUM_ACTIVE_INBOUND_SESSIONS,
MEMPOOL_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS,
MEMPOOL_P2P_NUM_CONNECTED_PEERS,
MEMPOOL_P2P_NUM_RECEIVED_MESSAGES,
MEMPOOL_P2P_NUM_SENT_MESSAGES,
Expand All @@ -34,8 +32,6 @@ pub fn create_p2p_propagator_and_runner(
);
let network_manager_metrics = Some(NetworkMetrics {
num_connected_peers: MEMPOOL_P2P_NUM_CONNECTED_PEERS,
num_active_inbound_sessions: MEMPOOL_P2P_NUM_ACTIVE_INBOUND_SESSIONS,
num_active_outbound_sessions: MEMPOOL_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS,
broadcast_metrics: Some(BroadcastNetworkMetrics {
num_sent_broadcast_messages: MEMPOOL_P2P_NUM_SENT_MESSAGES,
num_received_broadcast_messages: MEMPOOL_P2P_NUM_RECEIVED_MESSAGES,
Expand Down
4 changes: 0 additions & 4 deletions crates/starknet_sequencer_metrics/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,7 @@ define_gauge_metrics!(
},
MetricScope::Network => {
{ MEMPOOL_P2P_NUM_CONNECTED_PEERS, "apollo_mempool_p2p_num_connected_peers", "The number of connected peers to the mempool p2p component" },
{ MEMPOOL_P2P_NUM_ACTIVE_INBOUND_SESSIONS, "apollo_mempool_p2p_num_active_inbound_sessions", "The number of inbound sessions to the mempool p2p component" },
{ MEMPOOL_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS, "apollo_mempool_p2p_num_active_outbound_sessions", "The number of outbound sessions to the mempool p2p component" },
{ CONSENSUS_NUM_CONNECTED_PEERS, "apollo_consensus_num_connected_peers", "The number of connected peers to the consensus p2p component" },
{ CONSENSUS_NUM_ACTIVE_INBOUND_SESSIONS, "apollo_consensus_num_active_inbound_sessions", "The number of inbound sessions to the consensus p2p component" },
{ CONSENSUS_NUM_ACTIVE_OUTBOUND_SESSIONS, "apollo_consensus_num_active_outbound_sessions", "The number of outbound sessions to the consensus p2p component" },
{ STATE_SYNC_P2P_NUM_CONNECTED_PEERS, "apollo_sync_num_connected_peers", "The number of connected peers to the state sync p2p component" },
{ STATE_SYNC_P2P_NUM_ACTIVE_INBOUND_SESSIONS, "apollo_sync_num_active_inbound_sessions", "The number of inbound sessions to the state sync p2p component" },
{ STATE_SYNC_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS, "apollo_sync_num_active_outbound_sessions", "The number of outbound sessions to the state sync p2p component" },
Expand Down
7 changes: 4 additions & 3 deletions crates/starknet_state_sync/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ impl StateSyncRunner {

let network_manager_metrics = Some(NetworkMetrics {
num_connected_peers: STATE_SYNC_P2P_NUM_CONNECTED_PEERS,
num_active_inbound_sessions: STATE_SYNC_P2P_NUM_ACTIVE_INBOUND_SESSIONS,
num_active_outbound_sessions: STATE_SYNC_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS,
broadcast_metrics: None,
sqmr_metrics: Some(SqmrNetworkMetrics {}),
sqmr_metrics: Some(SqmrNetworkMetrics {
num_active_inbound_sessions: STATE_SYNC_P2P_NUM_ACTIVE_INBOUND_SESSIONS,
num_active_outbound_sessions: STATE_SYNC_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS,
}),
});
let mut network_manager = network_manager::NetworkManager::new(
network_config,
Expand Down

0 comments on commit e75d412

Please sign in to comment.