Skip to content

Commit

Permalink
wip: Make connections also to the tpu_forwards port
Browse files Browse the repository at this point in the history
  • Loading branch information
ckamm committed Mar 21, 2024
1 parent 9fabdab commit a67ef1b
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -474,27 +474,27 @@ async fn start_literpc_client_direct_mode(
TpuConnectionManager::new(certificate, key, fanout_slots as usize).await;

// this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
let mut connections_to_keep = HashSet::<(Pubkey, SocketAddr)>::new();
let addr1 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
addr1,
);
));

let addr2 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
addr2,
);
));

// this is the real streamer
connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs);
connections_to_keep.insert((literpc_validator_identity.pubkey(), streamer_listen_addrs));

// get information about the optional validator identity stake
// populated from get_stakes_for_identity()
Expand Down
4 changes: 2 additions & 2 deletions services/src/tpu_utils/quic_proxy_connection_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
Expand Down Expand Up @@ -64,7 +64,7 @@ impl QuicProxyConnectionManager {
&self,
broadcast_receiver: Receiver<SentTransactionInfo>,
// for duration of this slot these tpu nodes will receive the transactions
connections_to_keep: HashMap<Pubkey, SocketAddr>,
connections_to_keep: HashSet<(Pubkey, SocketAddr)>,
connection_parameters: QuicConnectionParameters,
) {
debug!(
Expand Down
21 changes: 11 additions & 10 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use solana_lite_rpc_core::{
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use std::{
collections::HashMap,
collections::HashSet,
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -254,7 +254,7 @@ struct ActiveConnectionWithExitNotifier {

pub struct TpuConnectionManager {
endpoints: RotatingQueue<Endpoint>,
identity_to_active_connection: Arc<DashMap<Pubkey, Arc<ActiveConnectionWithExitNotifier>>>,
active_connections: Arc<DashMap<(Pubkey, SocketAddr), Arc<ActiveConnectionWithExitNotifier>>>,
}

impl TpuConnectionManager {
Expand All @@ -268,21 +268,22 @@ impl TpuConnectionManager {
endpoints: RotatingQueue::new(number_of_clients, || {
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
}),
identity_to_active_connection: Arc::new(DashMap::new()),
active_connections: Arc::new(DashMap::new()),
}
}

pub async fn update_connections(
&self,
broadcast_sender: Arc<Sender<SentTransactionInfo>>,
connections_to_keep: HashMap<Pubkey, SocketAddr>,
connections_to_keep: HashSet<(Pubkey, SocketAddr)>,
identity_stakes: IdentityStakesData,
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
) {
NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
for (identity, socket_addr) in &connections_to_keep {
if self.identity_to_active_connection.get(identity).is_none() {
let connection_key = (*identity, *socket_addr);
if self.active_connections.get(&connection_key).is_none() {
trace!("added a connection for {}, {}", identity, socket_addr);
let active_connection = ActiveConnection::new(
self.endpoints.clone(),
Expand All @@ -300,8 +301,8 @@ impl TpuConnectionManager {
exit_notifier.clone(),
identity_stakes,
);
self.identity_to_active_connection.insert(
*identity,
self.active_connections.insert(
connection_key,
Arc::new(ActiveConnectionWithExitNotifier {
active_connection,
exit_notifier,
Expand All @@ -311,9 +312,9 @@ impl TpuConnectionManager {
}

// remove connections which are no longer needed
self.identity_to_active_connection.retain(|key, value| {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
self.active_connections.retain(|key, value| {
if !connections_to_keep.contains(key) {
trace!("removing a connection for {} {}", key.0, key.1);
// ignore error for exit channel
value
.active_connection
Expand Down
14 changes: 10 additions & 4 deletions services/src/tpu_utils/tpu_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,17 @@ impl TpuService {
(x.pubkey, tpu_port)
})
.filter(|x| x.1.is_some())
.map(|x| {
let mut addr = x.1.unwrap();
.flat_map(|x| {
let mut tpu_addr = x.1.unwrap();
// add quic port offset
addr.set_port(addr.port() + QUIC_PORT_OFFSET);
(x.0, addr)
tpu_addr.set_port(tpu_addr.port() + QUIC_PORT_OFFSET);

// Technically the forwards port could be anywhere and unfortunately getClusterNodes
// does not report it. However it's nearly always directly after the tpu port.
let mut tpu_forwards_addr = tpu_addr.clone();
tpu_forwards_addr.set_port(tpu_addr.port() + 1);

[(x.0, tpu_addr), (x.0, tpu_forwards_addr)]
})
.collect();

Expand Down

0 comments on commit a67ef1b

Please sign in to comment.