From bd0655a3326f000869a4a947d4d255846d524ca2 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Thu, 13 Jun 2024 10:37:20 +0200 Subject: [PATCH] clippy+fmt --- .../src/grpc/grpc_accounts_streaming.rs | 12 +-- cluster-endpoints/src/grpc_subscription.rs | 24 ++--- .../src/rpc_polling/poll_blocks.rs | 28 +++--- .../tests/quic_proxy_tpu_integrationtest.rs | 96 ++++++++++--------- services/src/transaction_service.rs | 4 +- 5 files changed, 81 insertions(+), 83 deletions(-) diff --git a/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs b/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs index f7ebd5b3..dc4a7f5b 100644 --- a/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs +++ b/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs @@ -1,3 +1,4 @@ +use anyhow::anyhow; use futures::StreamExt; use merge_streams::MergeStreams; use std::{ @@ -5,10 +6,11 @@ use std::{ sync::Arc, time::Duration, }; -use anyhow::anyhow; -use geyser_grpc_connector::yellowstone_grpc_util::{connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig, GeyserGrpcWrappedResult}; -use geyser_grpc_connector::{GeyserGrpcClient, GeyserGrpcClientResult, GrpcSourceConfig}; +use geyser_grpc_connector::yellowstone_grpc_util::{ + connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig, +}; +use geyser_grpc_connector::{GeyserGrpcClient, GrpcSourceConfig}; use itertools::Itertools; use solana_lite_rpc_core::{ commitment_utils::Commitment, @@ -219,9 +221,7 @@ async fn create_connection( }, ) .await - .map_err(|e| { - anyhow!("Failed to connect to grpc source: {e:?}") - }) + .map_err(|e| anyhow!("Failed to connect to grpc source: {e:?}")) } pub fn create_grpc_account_streaming( diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index 6eb0f352..57a55f38 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -3,11 +3,6 @@ use crate::grpc::grpc_accounts_streaming::create_grpc_account_streaming; use crate::grpc_multiplex::{ create_grpc_multiplex_blocks_subscription, create_grpc_multiplex_processed_slots_subscription, }; -use anyhow::Context; -use futures::StreamExt; -use geyser_grpc_connector::yellowstone_grpc_util::{ - connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig, -}; use geyser_grpc_connector::GrpcSourceConfig; use itertools::Itertools; use log::trace; @@ -35,13 +30,9 @@ use solana_sdk::{ }; use solana_transaction_status::{Reward, RewardType}; use std::cell::OnceCell; -use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; -use tokio::sync::{broadcast, Notify}; +use tokio::sync::Notify; use tracing::trace_span; -use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; -use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks}; use crate::rpc_polling::vote_accounts_and_cluster_info_polling::{ poll_cluster_info, poll_vote_accounts, @@ -222,8 +213,9 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option, .program_id(message.static_account_keys()) .eq(&compute_budget::id()) }) { - if let Ok(budget_ins) = - solana_sdk::borsh1::try_from_slice_unchecked::(compute_budget_ins.data.as_slice()) + if let Ok(budget_ins) = solana_sdk::borsh1::try_from_slice_unchecked::< + ComputeBudgetInstruction, + >(compute_budget_ins.data.as_slice()) { match budget_ins { // aka cu requested @@ -245,12 +237,8 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option, } } - let cu_requested = cu_requested_cell - .get() - .cloned(); - let prioritization_fees = prioritization_fees_cell - .get() - .cloned(); + let cu_requested = cu_requested_cell.get().cloned(); + let prioritization_fees = prioritization_fees_cell.get().cloned(); (cu_requested, prioritization_fees) } diff --git a/cluster-endpoints/src/rpc_polling/poll_blocks.rs b/cluster-endpoints/src/rpc_polling/poll_blocks.rs index e4607539..7bf513e9 100644 --- a/cluster-endpoints/src/rpc_polling/poll_blocks.rs +++ b/cluster-endpoints/src/rpc_polling/poll_blocks.rs @@ -222,7 +222,7 @@ pub fn from_ui_block( _ => None, }; - let mut cu_requested = tx.message.instructions().iter().find_map(|i| { + let cu_requested = tx.message.instructions().iter().find_map(|i| { if i.program_id(tx.message.static_account_keys()) .eq(&compute_budget::id()) { @@ -235,7 +235,7 @@ pub fn from_ui_block( None }); - let mut prioritization_fees = tx.message.instructions().iter().find_map(|i| { + let prioritization_fees = tx.message.instructions().iter().find_map(|i| { if i.program_id(tx.message.static_account_keys()) .eq(&compute_budget::id()) { @@ -327,17 +327,19 @@ fn map_block_info(produced_block: &ProducedBlock) -> BlockInfo { } } -#[inline] -fn calc_prioritization_fees(units: u32, additional_fee: u32) -> u64 { - (units as u64 * 1000) / additional_fee as u64 -} +#[cfg(test)] +mod tests { + #[inline] + fn calc_prioritization_fees(units: u32, additional_fee: u32) -> u64 { + (units as u64 * 1000) / additional_fee as u64 + } -#[test] -fn overflow_u32() { - // value high enough to overflow u32 if multiplied by 1000 - let units: u32 = 4_000_000_000; - let additional_fee: u32 = 100; - let prioritization_fees: u64 = calc_prioritization_fees(units, additional_fee); + #[test] + fn test_calc_prioritization_fees() { + let units: u32 = 100; + let additional_fee: u32 = 10; + let prioritization_fees: u64 = calc_prioritization_fees(units, additional_fee); - assert_eq!(40_000_000_000, prioritization_fees); + assert_eq!(1000, prioritization_fees); + } } diff --git a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs index b9f61914..5cdc6319 100644 --- a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs +++ b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs @@ -15,7 +15,7 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, Signature, Signer}; use solana_sdk::transaction::{Transaction, VersionedTransaction}; -use solana_streamer::nonblocking::quic::ConnectionPeerType; +use solana_streamer::nonblocking::quic::{ConnectionPeerType, SpawnNonBlockingServerResult}; use solana_streamer::packet::PacketBatch; use solana_streamer::quic::StreamStats; use solana_streamer::streamer::StakedNodes; @@ -424,21 +424,23 @@ async fn solana_quic_streamer_start() { let keypair = Keypair::new(); // gossip_host is used in the server certificate let gossip_host = "127.0.0.1".parse().unwrap(); - let (_, stats, t) = solana_streamer::nonblocking::quic::spawn_server( - "test-quic-server", - sock.try_clone().unwrap(), - &keypair, - gossip_host, - sender, - exit.clone(), - 1, - staked_nodes, - 10, - 10, - Duration::from_millis(1000), - Duration::from_millis(1000), - ) - .unwrap(); + let SpawnNonBlockingServerResult { stats, thread, .. } = + solana_streamer::nonblocking::quic::spawn_server( + "test-quic-server", + sock.try_clone().unwrap(), + &keypair, + gossip_host, + sender, + exit.clone(), + 1, + staked_nodes, + 10, + 10, + 9999, // max_streams_per_ms + Duration::from_millis(1000), + Duration::from_millis(1000), + ) + .unwrap(); let addr = sock.local_addr().unwrap().ip(); let port = sock.local_addr().unwrap().port(); @@ -447,7 +449,7 @@ async fn solana_quic_streamer_start() { // sleep(Duration::from_millis(500)).await; exit.store(true, Ordering::Relaxed); - t.await.unwrap(); + thread.await.unwrap(); stats.report("test-quic-streamer"); } @@ -499,13 +501,14 @@ async fn start_literpc_client_direct_mode( // get information about the optional validator identity stake // populated from get_stakes_for_identity() + let stakes = if test_case_params.stake_connection { + 30 + } else { + 0 + }; let identity_stakes = IdentityStakesData { - peer_type: ConnectionPeerType::Staked, - stakes: if test_case_params.stake_connection { - 30 - } else { - 0 - }, // stake of lite-rpc + peer_type: ConnectionPeerType::Staked(stakes), + stakes, // stake of lite-rpc min_stakes: 0, max_stakes: 40, total_stakes: 100, @@ -599,13 +602,14 @@ async fn start_literpc_client_proxy_mode( // get information about the optional validator identity stake // populated from get_stakes_for_identity() + let stake = if test_case_params.stake_connection { + 30 + } else { + 0 + }; let _identity_stakes = IdentityStakesData { - peer_type: ConnectionPeerType::Staked, - stakes: if test_case_params.stake_connection { - 30 - } else { - 0 - }, // stake of lite-rpc + peer_type: ConnectionPeerType::Staked(stake), // not sure if that is correct + stakes: stake, // stake of lite-rpc min_stakes: 0, max_stakes: 40, total_stakes: 100, @@ -705,21 +709,23 @@ impl SolanaQuicStreamer { let keypair = Keypair::new(); // gossip_host is used in the server certificate let gossip_host = "127.0.0.1".parse().unwrap(); - let (_, stats, jh) = solana_streamer::nonblocking::quic::spawn_server( - "test-quic-server", - udp_socket.try_clone().unwrap(), - &keypair, - gossip_host, - sender, - exit.clone(), - MAX_QUIC_CONNECTIONS_PER_PEER, - staked_nodes, - 10, - 10, - Duration::from_millis(1000), - Duration::from_millis(1000), - ) - .unwrap(); + let SpawnNonBlockingServerResult { stats, thread, .. } = + solana_streamer::nonblocking::quic::spawn_server( + "test-quic-server", + udp_socket.try_clone().unwrap(), + &keypair, + gossip_host, + sender, + exit.clone(), + MAX_QUIC_CONNECTIONS_PER_PEER, + staked_nodes, + 10, + 10, + 9999, // max_streams_per_ms + Duration::from_millis(1000), + Duration::from_millis(1000), + ) + .unwrap(); let addr = udp_socket.local_addr().unwrap().ip(); let port = udp_socket.local_addr().unwrap().port(); @@ -728,7 +734,7 @@ impl SolanaQuicStreamer { Self { sock: udp_socket, exit, - join_handler: jh, + join_handler: thread, stats, } } diff --git a/services/src/transaction_service.rs b/services/src/transaction_service.rs index 78d44d71..ce34caef 100644 --- a/services/src/transaction_service.rs +++ b/services/src/transaction_service.rs @@ -164,7 +164,9 @@ impl TransactionService { .program_id(tx.message.static_account_keys()) .eq(&compute_budget::id()) { - let ix_which = solana_sdk::borsh1::try_from_slice_unchecked::(ix.data.as_slice()); + let ix_which = solana_sdk::borsh1::try_from_slice_unchecked::< + ComputeBudgetInstruction, + >(ix.data.as_slice()); if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(fees)) = ix_which { prioritization_fee = fees; }