Skip to content

Commit

Permalink
clippy+fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jun 13, 2024
1 parent c7c192e commit bd0655a
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 83 deletions.
12 changes: 6 additions & 6 deletions cluster-endpoints/src/grpc/grpc_accounts_streaming.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use anyhow::anyhow;
use futures::StreamExt;
use merge_streams::MergeStreams;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use anyhow::anyhow;

use geyser_grpc_connector::yellowstone_grpc_util::{connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig, GeyserGrpcWrappedResult};
use geyser_grpc_connector::{GeyserGrpcClient, GeyserGrpcClientResult, GrpcSourceConfig};
use geyser_grpc_connector::yellowstone_grpc_util::{
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
};
use geyser_grpc_connector::{GeyserGrpcClient, GrpcSourceConfig};
use itertools::Itertools;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
Expand Down Expand Up @@ -219,9 +221,7 @@ async fn create_connection(
},
)
.await
.map_err(|e| {
anyhow!("Failed to connect to grpc source: {e:?}")
})
.map_err(|e| anyhow!("Failed to connect to grpc source: {e:?}"))
}

pub fn create_grpc_account_streaming(
Expand Down
24 changes: 6 additions & 18 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@ use crate::grpc::grpc_accounts_streaming::create_grpc_account_streaming;
use crate::grpc_multiplex::{
create_grpc_multiplex_blocks_subscription, create_grpc_multiplex_processed_slots_subscription,
};
use anyhow::Context;
use futures::StreamExt;
use geyser_grpc_connector::yellowstone_grpc_util::{
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
};
use geyser_grpc_connector::GrpcSourceConfig;
use itertools::Itertools;
use log::trace;
Expand Down Expand Up @@ -35,13 +30,9 @@ use solana_sdk::{
};
use solana_transaction_status::{Reward, RewardType};
use std::cell::OnceCell;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, Notify};
use tokio::sync::Notify;
use tracing::trace_span;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks};

use crate::rpc_polling::vote_accounts_and_cluster_info_polling::{
poll_cluster_info, poll_vote_accounts,
Expand Down Expand Up @@ -222,8 +213,9 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option<u32>,
.program_id(message.static_account_keys())
.eq(&compute_budget::id())
}) {
if let Ok(budget_ins) =
solana_sdk::borsh1::try_from_slice_unchecked::<ComputeBudgetInstruction>(compute_budget_ins.data.as_slice())
if let Ok(budget_ins) = solana_sdk::borsh1::try_from_slice_unchecked::<
ComputeBudgetInstruction,
>(compute_budget_ins.data.as_slice())
{
match budget_ins {
// aka cu requested
Expand All @@ -245,12 +237,8 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option<u32>,
}
}

let cu_requested = cu_requested_cell
.get()
.cloned();
let prioritization_fees = prioritization_fees_cell
.get()
.cloned();
let cu_requested = cu_requested_cell.get().cloned();
let prioritization_fees = prioritization_fees_cell.get().cloned();
(cu_requested, prioritization_fees)
}

Expand Down
28 changes: 15 additions & 13 deletions cluster-endpoints/src/rpc_polling/poll_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ pub fn from_ui_block(
_ => None,
};

let mut cu_requested = tx.message.instructions().iter().find_map(|i| {
let cu_requested = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
Expand All @@ -235,7 +235,7 @@ pub fn from_ui_block(
None
});

let mut prioritization_fees = tx.message.instructions().iter().find_map(|i| {
let prioritization_fees = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
Expand Down Expand Up @@ -327,17 +327,19 @@ fn map_block_info(produced_block: &ProducedBlock) -> BlockInfo {
}
}

#[inline]
fn calc_prioritization_fees(units: u32, additional_fee: u32) -> u64 {
(units as u64 * 1000) / additional_fee as u64
}
#[cfg(test)]
mod tests {
#[inline]
fn calc_prioritization_fees(units: u32, additional_fee: u32) -> u64 {
(units as u64 * 1000) / additional_fee as u64
}

#[test]
fn overflow_u32() {
// value high enough to overflow u32 if multiplied by 1000
let units: u32 = 4_000_000_000;
let additional_fee: u32 = 100;
let prioritization_fees: u64 = calc_prioritization_fees(units, additional_fee);
#[test]
fn test_calc_prioritization_fees() {
let units: u32 = 100;
let additional_fee: u32 = 10;
let prioritization_fees: u64 = calc_prioritization_fees(units, additional_fee);

assert_eq!(40_000_000_000, prioritization_fees);
assert_eq!(1000, prioritization_fees);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, Signature, Signer};

use solana_sdk::transaction::{Transaction, VersionedTransaction};
use solana_streamer::nonblocking::quic::ConnectionPeerType;
use solana_streamer::nonblocking::quic::{ConnectionPeerType, SpawnNonBlockingServerResult};
use solana_streamer::packet::PacketBatch;
use solana_streamer::quic::StreamStats;
use solana_streamer::streamer::StakedNodes;
Expand Down Expand Up @@ -424,21 +424,23 @@ async fn solana_quic_streamer_start() {
let keypair = Keypair::new();
// gossip_host is used in the server certificate
let gossip_host = "127.0.0.1".parse().unwrap();
let (_, stats, t) = solana_streamer::nonblocking::quic::spawn_server(
"test-quic-server",
sock.try_clone().unwrap(),
&keypair,
gossip_host,
sender,
exit.clone(),
1,
staked_nodes,
10,
10,
Duration::from_millis(1000),
Duration::from_millis(1000),
)
.unwrap();
let SpawnNonBlockingServerResult { stats, thread, .. } =
solana_streamer::nonblocking::quic::spawn_server(
"test-quic-server",
sock.try_clone().unwrap(),
&keypair,
gossip_host,
sender,
exit.clone(),
1,
staked_nodes,
10,
10,
9999, // max_streams_per_ms
Duration::from_millis(1000),
Duration::from_millis(1000),
)
.unwrap();

let addr = sock.local_addr().unwrap().ip();
let port = sock.local_addr().unwrap().port();
Expand All @@ -447,7 +449,7 @@ async fn solana_quic_streamer_start() {
// sleep(Duration::from_millis(500)).await;

exit.store(true, Ordering::Relaxed);
t.await.unwrap();
thread.await.unwrap();

stats.report("test-quic-streamer");
}
Expand Down Expand Up @@ -499,13 +501,14 @@ async fn start_literpc_client_direct_mode(

// get information about the optional validator identity stake
// populated from get_stakes_for_identity()
let stakes = if test_case_params.stake_connection {
30
} else {
0
};
let identity_stakes = IdentityStakesData {
peer_type: ConnectionPeerType::Staked,
stakes: if test_case_params.stake_connection {
30
} else {
0
}, // stake of lite-rpc
peer_type: ConnectionPeerType::Staked(stakes),
stakes, // stake of lite-rpc
min_stakes: 0,
max_stakes: 40,
total_stakes: 100,
Expand Down Expand Up @@ -599,13 +602,14 @@ async fn start_literpc_client_proxy_mode(

// get information about the optional validator identity stake
// populated from get_stakes_for_identity()
let stake = if test_case_params.stake_connection {
30
} else {
0
};
let _identity_stakes = IdentityStakesData {
peer_type: ConnectionPeerType::Staked,
stakes: if test_case_params.stake_connection {
30
} else {
0
}, // stake of lite-rpc
peer_type: ConnectionPeerType::Staked(stake), // not sure if that is correct
stakes: stake, // stake of lite-rpc
min_stakes: 0,
max_stakes: 40,
total_stakes: 100,
Expand Down Expand Up @@ -705,21 +709,23 @@ impl SolanaQuicStreamer {
let keypair = Keypair::new();
// gossip_host is used in the server certificate
let gossip_host = "127.0.0.1".parse().unwrap();
let (_, stats, jh) = solana_streamer::nonblocking::quic::spawn_server(
"test-quic-server",
udp_socket.try_clone().unwrap(),
&keypair,
gossip_host,
sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes,
10,
10,
Duration::from_millis(1000),
Duration::from_millis(1000),
)
.unwrap();
let SpawnNonBlockingServerResult { stats, thread, .. } =
solana_streamer::nonblocking::quic::spawn_server(
"test-quic-server",
udp_socket.try_clone().unwrap(),
&keypair,
gossip_host,
sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes,
10,
10,
9999, // max_streams_per_ms
Duration::from_millis(1000),
Duration::from_millis(1000),
)
.unwrap();

let addr = udp_socket.local_addr().unwrap().ip();
let port = udp_socket.local_addr().unwrap().port();
Expand All @@ -728,7 +734,7 @@ impl SolanaQuicStreamer {
Self {
sock: udp_socket,
exit,
join_handler: jh,
join_handler: thread,
stats,
}
}
Expand Down
4 changes: 3 additions & 1 deletion services/src/transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ impl TransactionService {
.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
let ix_which = solana_sdk::borsh1::try_from_slice_unchecked::<ComputeBudgetInstruction>(ix.data.as_slice());
let ix_which = solana_sdk::borsh1::try_from_slice_unchecked::<
ComputeBudgetInstruction,
>(ix.data.as_slice());
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(fees)) = ix_which {
prioritization_fee = fees;
}
Expand Down

0 comments on commit bd0655a

Please sign in to comment.