Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: Make connections also to the tpu_forwards port #385

Merged
merged 2 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/custom-tpu-send-transactions/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ async fn main() -> anyhow::Result<()> {
number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 5,
prioritization_heap_size,
enable_tpu_forwarding: None,
},
tpu_connection_path: TpuConnectionPath::QuicDirectPath,
};
Expand Down
4 changes: 4 additions & 0 deletions lite-rpc/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,5 +393,9 @@ fn quic_params_from_environment() -> Option<QuicConnectionParameters> {
quic_connection_parameters.unistreams_to_create_new_connection_in_percentage,
);

quic_connection_parameters.enable_tpu_forwarding = env::var("ENABLE_TPU_FORWARDING")
.map(|value| Some(value.parse::<bool>().unwrap()))
.unwrap_or(quic_connection_parameters.enable_tpu_forwarding);

Some(quic_connection_parameters)
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter
number_of_transactions_per_unistream: 10,
unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None,
enable_tpu_forwarding: None,
};

#[test]
Expand Down Expand Up @@ -475,27 +476,27 @@ async fn start_literpc_client_direct_mode(
TpuConnectionManager::new(certificate, key, fanout_slots as usize).await;

// this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
let mut connections_to_keep = HashSet::<(Pubkey, SocketAddr)>::new();
let addr1 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
addr1,
);
));

let addr2 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
addr2,
);
));

// this is the real streamer
connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs);
connections_to_keep.insert((literpc_validator_identity.pubkey(), streamer_listen_addrs));

// get information about the optional validator identity stake
// populated from get_stakes_for_identity()
Expand Down Expand Up @@ -575,27 +576,27 @@ async fn start_literpc_client_proxy_mode(
QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await;

// this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
let mut connections_to_keep: HashSet<(Pubkey, SocketAddr)> = HashSet::new();
let addr1 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
addr1,
);
));

let addr2 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
addr2,
);
));

// this is the real streamer
connections_to_keep.insert(validator_identity.pubkey(), streamer_listen_addrs);
connections_to_keep.insert((validator_identity.pubkey(), streamer_listen_addrs));

// get information about the optional validator identity stake
// populated from get_stakes_for_identity()
Expand Down
2 changes: 2 additions & 0 deletions services/src/quic_connection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub struct QuicConnectionParameters {
pub number_of_transactions_per_unistream: usize,
pub unistreams_to_create_new_connection_in_percentage: u8,
pub prioritization_heap_size: Option<usize>,
pub enable_tpu_forwarding: Option<bool>,
}

impl Default for QuicConnectionParameters {
Expand All @@ -123,6 +124,7 @@ impl Default for QuicConnectionParameters {
number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None,
enable_tpu_forwarding: None,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions services/src/tpu_utils/quic_proxy_connection_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
Expand Down Expand Up @@ -64,7 +64,7 @@ impl QuicProxyConnectionManager {
&self,
broadcast_receiver: Receiver<SentTransactionInfo>,
// for duration of this slot these tpu nodes will receive the transactions
connections_to_keep: HashMap<Pubkey, SocketAddr>,
connections_to_keep: HashSet<(Pubkey, SocketAddr)>,
connection_parameters: QuicConnectionParameters,
) {
debug!(
Expand Down
21 changes: 11 additions & 10 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use solana_lite_rpc_core::{
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::{
broadcast::{self, Receiver, Sender},
Notify,
Expand Down Expand Up @@ -242,7 +242,7 @@ impl ActiveConnection {

pub struct TpuConnectionManager {
endpoints: RotatingQueue<Endpoint>,
identity_to_active_connection: Arc<DashMap<Pubkey, ActiveConnection>>,
active_connections: Arc<DashMap<(Pubkey, SocketAddr), ActiveConnection>>,
}

impl TpuConnectionManager {
Expand All @@ -256,21 +256,22 @@ impl TpuConnectionManager {
endpoints: RotatingQueue::new(number_of_clients, || {
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
}),
identity_to_active_connection: Arc::new(DashMap::new()),
active_connections: Arc::new(DashMap::new()),
}
}

pub async fn update_connections(
&self,
broadcast_sender: Arc<Sender<SentTransactionInfo>>,
connections_to_keep: HashMap<Pubkey, SocketAddr>,
connections_to_keep: HashSet<(Pubkey, SocketAddr)>,
identity_stakes: IdentityStakesData,
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
) {
NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
for (identity, socket_addr) in &connections_to_keep {
if self.identity_to_active_connection.get(identity).is_none() {
let connection_key = (*identity, *socket_addr);
if self.active_connections.get(&connection_key).is_none() {
trace!("added a connection for {}, {}", identity, socket_addr);
let active_connection = ActiveConnection::new(
self.endpoints.clone(),
Expand All @@ -282,15 +283,15 @@ impl TpuConnectionManager {
// using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever
let broadcast_receiver = broadcast_sender.subscribe();
active_connection.start_listening(broadcast_receiver, identity_stakes);
self.identity_to_active_connection
.insert(*identity, active_connection);
self.active_connections
.insert((*identity, *socket_addr), active_connection);
}
}

// remove connections which are no longer needed
self.identity_to_active_connection.retain(|key, value| {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
self.active_connections.retain(|key, value| {
if !connections_to_keep.contains(key) {
trace!("removing a connection for {} {}", key.0, key.1);
// ignore error for exit channel
let _ = value.exit_notifier.send(());
false
Expand Down
40 changes: 33 additions & 7 deletions services/src/tpu_utils/tpu_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Context;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_streamer::nonblocking::quic::ConnectionPeerType;

use super::tpu_connection_manager::TpuConnectionManager;
use crate::quic_connection_utils::QuicConnectionParameters;
Expand All @@ -15,7 +16,7 @@ use solana_lite_rpc_core::types::SlotStream;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::collections::HashMap;
use std::collections::HashSet;
use std::{
net::{IpAddr, Ipv4Addr},
sync::Arc,
Expand Down Expand Up @@ -128,8 +129,21 @@ impl TpuService {
.leader_schedule
.get_slot_leaders(current_slot, last_slot)
.await?;

let identity_stakes = self.data_cache.identity_stakes.get_stakes().await;

let enable_tpu_forwards = {
match identity_stakes.peer_type {
ConnectionPeerType::Unstaked => false,
ConnectionPeerType::Staked => self
.config
.quic_connection_params
.enable_tpu_forwarding
.unwrap_or_default(),
}
};
// get next leader with its tpu port
let connections_to_keep: HashMap<_, _> = next_leaders
let connections_to_keep: HashSet<_, _> = next_leaders
.iter()
.map(|x| {
let contact_info = cluster_nodes.get(&x.pubkey);
Expand All @@ -140,11 +154,23 @@ impl TpuService {
(x.pubkey, tpu_port)
})
.filter(|x| x.1.is_some())
.map(|x| {
let mut addr = x.1.unwrap();
.flat_map(|x| {
let mut addresses = vec![];
let mut tpu_addr = x.1.unwrap();
// add quic port offset
addr.set_port(addr.port() + QUIC_PORT_OFFSET);
(x.0, addr)
tpu_addr.set_port(tpu_addr.port() + QUIC_PORT_OFFSET);

addresses.push((x.0, tpu_addr));

if enable_tpu_forwards {
// Technically the forwards port could be anywhere and unfortunately getClusterNodes
// does not report it. However it's nearly always directly after the tpu port.
let mut tpu_forwards_addr = tpu_addr;
tpu_forwards_addr.set_port(tpu_addr.port() + 1);
addresses.push((x.0, tpu_forwards_addr));
}

addresses
})
.collect();

Expand All @@ -156,7 +182,7 @@ impl TpuService {
.update_connections(
self.broadcast_sender.clone(),
connections_to_keep,
self.data_cache.identity_stakes.get_stakes().await,
identity_stakes,
self.data_cache.clone(),
self.config.quic_connection_params,
)
Expand Down
Loading