diff --git a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs index 06ec9307..62dc6b56 100644 --- a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs +++ b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs @@ -474,27 +474,27 @@ async fn start_literpc_client_direct_mode( TpuConnectionManager::new(certificate, key, fanout_slots as usize).await; // this effectively controls how many connections we will have - let mut connections_to_keep: HashMap = HashMap::new(); + let mut connections_to_keep = HashSet::<(Pubkey, SocketAddr)>::new(); let addr1 = UdpSocket::bind("127.0.0.1:0") .unwrap() .local_addr() .unwrap(); - connections_to_keep.insert( + connections_to_keep.insert(( Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, addr1, - ); + )); let addr2 = UdpSocket::bind("127.0.0.1:0") .unwrap() .local_addr() .unwrap(); - connections_to_keep.insert( + connections_to_keep.insert(( Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, addr2, - ); + )); // this is the real streamer - connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs); + connections_to_keep.insert((literpc_validator_identity.pubkey(), streamer_listen_addrs)); // get information about the optional validator identity stake // populated from get_stakes_for_identity() diff --git a/services/src/tpu_utils/quic_proxy_connection_manager.rs b/services/src/tpu_utils/quic_proxy_connection_manager.rs index 07ee1820..ca33d529 100644 --- a/services/src/tpu_utils/quic_proxy_connection_manager.rs +++ b/services/src/tpu_utils/quic_proxy_connection_manager.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::HashSet; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; @@ -64,7 +64,7 @@ impl QuicProxyConnectionManager { &self, broadcast_receiver: Receiver, // for duration of this slot these tpu nodes will receive the transactions - connections_to_keep: HashMap, + connections_to_keep: HashSet<(Pubkey, SocketAddr)>, connection_parameters: QuicConnectionParameters, ) { debug!( diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index 8f92cb30..beba2c6c 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -14,7 +14,7 @@ use solana_lite_rpc_core::{ use solana_sdk::pubkey::Pubkey; use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams; use std::{ - collections::HashMap, + collections::HashSet, net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, @@ -254,7 +254,7 @@ struct ActiveConnectionWithExitNotifier { pub struct TpuConnectionManager { endpoints: RotatingQueue, - identity_to_active_connection: Arc>>, + active_connections: Arc>>, } impl TpuConnectionManager { @@ -268,21 +268,22 @@ impl TpuConnectionManager { endpoints: RotatingQueue::new(number_of_clients, || { QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone()) }), - identity_to_active_connection: Arc::new(DashMap::new()), + active_connections: Arc::new(DashMap::new()), } } pub async fn update_connections( &self, broadcast_sender: Arc>, - connections_to_keep: HashMap, + connections_to_keep: HashSet<(Pubkey, SocketAddr)>, identity_stakes: IdentityStakesData, data_cache: DataCache, connection_parameters: QuicConnectionParameters, ) { NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64); for (identity, socket_addr) in &connections_to_keep { - if self.identity_to_active_connection.get(identity).is_none() { + let connection_key = (*identity, *socket_addr); + if self.active_connections.get(&connection_key).is_none() { trace!("added a connection for {}, {}", identity, socket_addr); let active_connection = ActiveConnection::new( self.endpoints.clone(), @@ -300,8 +301,8 @@ impl TpuConnectionManager { exit_notifier.clone(), identity_stakes, ); - self.identity_to_active_connection.insert( - *identity, + self.active_connections.insert( + connection_key, Arc::new(ActiveConnectionWithExitNotifier { active_connection, exit_notifier, @@ -311,9 +312,9 @@ impl TpuConnectionManager { } // remove connections which are no longer needed - self.identity_to_active_connection.retain(|key, value| { - if !connections_to_keep.contains_key(key) { - trace!("removing a connection for {}", key.to_string()); + self.active_connections.retain(|key, value| { + if !connections_to_keep.contains(key) { + trace!("removing a connection for {} {}", key.0, key.1); // ignore error for exit channel value .active_connection diff --git a/services/src/tpu_utils/tpu_service.rs b/services/src/tpu_utils/tpu_service.rs index 087a71f0..da2f4203 100644 --- a/services/src/tpu_utils/tpu_service.rs +++ b/services/src/tpu_utils/tpu_service.rs @@ -138,11 +138,17 @@ impl TpuService { (x.pubkey, tpu_port) }) .filter(|x| x.1.is_some()) - .map(|x| { - let mut addr = x.1.unwrap(); + .flat_map(|x| { + let mut tpu_addr = x.1.unwrap(); // add quic port offset - addr.set_port(addr.port() + QUIC_PORT_OFFSET); - (x.0, addr) + tpu_addr.set_port(tpu_addr.port() + QUIC_PORT_OFFSET); + + // Technically the forwards port could be anywhere and unfortunately getClusterNodes + // does not report it. However it's nearly always directly after the tpu port. + let mut tpu_forwards_addr = tpu_addr.clone(); + tpu_forwards_addr.set_port(tpu_addr.port() + 1); + + [(x.0, tpu_addr), (x.0, tpu_forwards_addr)] }) .collect();