Skip to content

Commit

Permalink
Gixing websocket subscriptions for signatures and slot (#403)
Browse files Browse the repository at this point in the history
* Updating send transaction method signature

* fixing issues with pubsub

* remove test code
  • Loading branch information
godmodegalactus authored Jun 28, 2024
1 parent f990f85 commit 129d6a8
Show file tree
Hide file tree
Showing 15 changed files with 200 additions and 187 deletions.
1 change: 1 addition & 0 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ fn create_grpc_multiplex_block_info_task(
.expect("block_time from geyser block meta")
.timestamp
as u64,
parent: block_meta.parent_slot,
};

let send_started_at = Instant::now();
Expand Down
1 change: 1 addition & 0 deletions cluster-endpoints/src/rpc_polling/poll_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,5 +324,6 @@ fn map_block_info(produced_block: &ProducedBlock) -> BlockInfo {
blockhash: produced_block.blockhash,
commitment_config: produced_block.commitment_config,
block_time: produced_block.block_time,
parent: produced_block.parent_slot,
}
}
2 changes: 1 addition & 1 deletion core/src/commitment_utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(C)]
pub enum Commitment {
Processed = 0,
Expand Down
54 changes: 22 additions & 32 deletions core/src/stores/subscription_store.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,38 @@
use crate::commitment_utils::Commitment;
use crate::{structures::produced_block::TransactionInfo, types::SubscptionHanderSink};
use dashmap::DashMap;
use solana_client::rpc_response::{ProcessedSignatureResult, RpcSignatureResult};
use solana_sdk::signature::Signature;
use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
slot_history::Slot,
};
use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};
use std::{sync::Arc, time::Duration};
use tokio::time::Instant;

#[derive(Clone, Default)]
pub struct SubscriptionStore {
pub signature_subscribers:
Arc<DashMap<(Signature, CommitmentConfig), (SubscptionHanderSink, Instant)>>,
Arc<DashMap<(Signature, Commitment), (SubscptionHanderSink, Instant)>>,
}

impl SubscriptionStore {
#[allow(deprecated)]
pub fn get_supported_commitment_config(
commitment_config: CommitmentConfig,
) -> CommitmentConfig {
match commitment_config.commitment {
CommitmentLevel::Finalized | CommitmentLevel::Root | CommitmentLevel::Max => {
CommitmentConfig {
commitment: CommitmentLevel::Finalized,
}
}
_ => CommitmentConfig {
commitment: CommitmentLevel::Confirmed,
},
}
}

pub fn signature_subscribe(
&self,
signature: Signature,
commitment_config: CommitmentConfig,
sink: SubscptionHanderSink,
) {
let commitment_config = Self::get_supported_commitment_config(commitment_config);
self.signature_subscribers
.insert((signature, commitment_config), (sink, Instant::now()));
self.signature_subscribers.insert(
(signature, Commitment::from(commitment_config)),
(sink, Instant::now()),
);
}

pub fn signature_un_subscribe(
&self,
signature: Signature,
commitment_config: CommitmentConfig,
) {
let commitment_config = Self::get_supported_commitment_config(commitment_config);
self.signature_subscribers
.remove(&(signature, commitment_config));
.remove(&(signature, Commitment::from(commitment_config)));
}

pub async fn notify(
Expand All @@ -58,13 +41,20 @@ impl SubscriptionStore {
transaction_info: &TransactionInfo,
commitment_config: CommitmentConfig,
) {
if let Some((_sig, (sink, _))) = self
.signature_subscribers
.remove(&(transaction_info.signature, commitment_config))
{
if let Some((_sig, (sink, _))) = self.signature_subscribers.remove(&(
transaction_info.signature,
Commitment::from(commitment_config),
)) {
let signature_result =
RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult {
err: transaction_info.err.clone(),
});
// none if transaction succeeded
sink.send(slot, serde_json::json!({ "err": transaction_info.err }))
.await;
sink.send(
slot,
serde_json::to_value(signature_result).expect("Should be serializable in json"),
)
.await;
}
}

Expand Down
1 change: 1 addition & 0 deletions core/src/structures/block_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use solana_sdk::hash::Hash;
#[derive(Clone, Debug)]
pub struct BlockInfo {
pub slot: u64,
pub parent: u64,
pub block_height: u64,
pub blockhash: Hash,
pub commitment_config: CommitmentConfig,
Expand Down
60 changes: 41 additions & 19 deletions lite-rpc/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use jsonrpsee::core::RpcResult;
use prometheus::{opts, register_int_counter, IntCounter};
use solana_account_decoder::UiAccount;
use solana_lite_rpc_accounts::account_service::AccountService;
use solana_lite_rpc_core::encoding::{BASE58, BASE64};
use solana_lite_rpc_prioritization_fees::account_prio_service::AccountPrioService;
use solana_lite_rpc_prioritization_fees::prioritization_fee_calculation_method::PrioritizationFeeCalculationMethod;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::config::RpcAccountInfoConfig;
use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcSendTransactionConfig};
use solana_rpc_client_api::response::{OptionalContext, RpcKeyedAccount};
use solana_rpc_client_api::{
config::{
Expand All @@ -21,28 +22,27 @@ use solana_rpc_client_api::{
},
};
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::signature::Signature;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot};
use solana_transaction_status::{TransactionStatus, UiConfirmedBlock};
use solana_transaction_status::{
TransactionBinaryEncoding, TransactionStatus, UiConfirmedBlock, UiTransactionEncoding,
};
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use solana_lite_rpc_blockstore::history::History;
use solana_lite_rpc_core::solana_utils::hash_from_str;
use solana_lite_rpc_core::{
encoding,
stores::{block_information_store::BlockInformation, data_cache::DataCache},
use solana_lite_rpc_core::stores::{
block_information_store::BlockInformation, data_cache::DataCache,
};
use solana_lite_rpc_services::{
transaction_service::TransactionService, tx_sender::TXS_IN_CHANNEL,
};

use crate::rpc_errors::RpcErrors;
use crate::{
configs::{IsBlockHashValidConfig, SendTransactionConfig},
rpc::LiteRpcServer,
};
use crate::{configs::IsBlockHashValidConfig, rpc::LiteRpcServer};
use solana_lite_rpc_prioritization_fees::rpc_data::{AccountPrioFeesStats, PrioFeesStats};
use solana_lite_rpc_prioritization_fees::PrioFeesService;

Expand Down Expand Up @@ -347,37 +347,59 @@ impl LiteRpcServer for LiteBridge {
async fn send_transaction(
&self,
tx: String,
send_transaction_config: Option<SendTransactionConfig>,
send_transaction_config: Option<RpcSendTransactionConfig>,
) -> RpcResult<String> {
RPC_SEND_TX.inc();

// Copied these constants from solana labs code
const MAX_BASE58_SIZE: usize = 1683;
const MAX_BASE64_SIZE: usize = 1644;

let SendTransactionConfig {
let RpcSendTransactionConfig {
encoding,
max_retries,
..
} = send_transaction_config.unwrap_or_default();

let encoding = encoding.unwrap_or(UiTransactionEncoding::Base58);
let expected_size = match encoding {
encoding::BinaryEncoding::Base58 => MAX_BASE58_SIZE,
encoding::BinaryEncoding::Base64 => MAX_BASE64_SIZE,
UiTransactionEncoding::Base58 => MAX_BASE58_SIZE,
UiTransactionEncoding::Base64 => MAX_BASE64_SIZE,
_ => usize::MAX,
};
if tx.len() > expected_size {
return Err(jsonrpsee::types::error::ErrorCode::OversizedRequest.into());
}

let raw_tx = match encoding.decode(tx) {
Ok(raw_tx) => raw_tx,
Err(_) => {
return Err(jsonrpsee::types::error::ErrorCode::InvalidParams.into());
let binary_encoding = encoding
.into_binary_encoding()
.ok_or(jsonrpsee::types::error::ErrorCode::InvalidParams)?;

let wire_output = match binary_encoding {
TransactionBinaryEncoding::Base58 => {
if tx.len() > MAX_BASE58_SIZE {
return Err(jsonrpsee::types::error::ErrorCode::OversizedRequest.into());
}
BASE58
.decode(tx)
.map_err(|_| jsonrpsee::types::error::ErrorCode::InvalidParams)?
}
TransactionBinaryEncoding::Base64 => {
if tx.len() > MAX_BASE64_SIZE {
return Err(jsonrpsee::types::error::ErrorCode::OversizedRequest.into());
}
BASE64
.decode(tx)
.map_err(|_| jsonrpsee::types::error::ErrorCode::InvalidParams)?
}
};

if wire_output.len() > PACKET_DATA_SIZE {
return Err(jsonrpsee::types::error::ErrorCode::OversizedRequest.into());
}
let max_retries = max_retries.map(|x| x as u16);
match self
.transaction_service
.send_wire_transaction(raw_tx, max_retries)
.send_wire_transaction(wire_output, max_retries)
.await
{
Ok(sig) => {
Expand Down
29 changes: 16 additions & 13 deletions lite-rpc/src/bridge_pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use prometheus::{opts, register_int_counter, IntCounter};
use solana_lite_rpc_accounts::account_service::AccountService;
use solana_lite_rpc_core::{
commitment_utils::Commitment, stores::data_cache::DataCache,
structures::account_data::AccountNotificationMessage, types::BlockStream,
commitment_utils::Commitment,
stores::data_cache::DataCache,
structures::account_data::AccountNotificationMessage,
types::{BlockInfoStream, BlockStream},
};
use std::{str::FromStr, sync::Arc, time::Duration};
use tokio::sync::broadcast::error::RecvError::{Closed, Lagged};
Expand Down Expand Up @@ -48,7 +50,8 @@ pub struct LitePubSubBridge {
data_cache: DataCache,
prio_fees_service: PrioFeesService,
account_priofees_service: AccountPrioService,
block_stream: BlockStream,
_block_stream: BlockStream,
block_info_stream: BlockInfoStream,
accounts_service: Option<AccountService>,
}

Expand All @@ -58,13 +61,15 @@ impl LitePubSubBridge {
prio_fees_service: PrioFeesService,
account_priofees_service: AccountPrioService,
block_stream: BlockStream,
block_info_stream: BlockInfoStream,
accounts_service: Option<AccountService>,
) -> Self {
Self {
data_cache,
prio_fees_service,
account_priofees_service,
block_stream,
_block_stream: block_stream,
block_info_stream,
accounts_service,
}
}
Expand All @@ -74,17 +79,14 @@ impl LitePubSubBridge {
impl LiteRpcPubSubServer for LitePubSubBridge {
async fn slot_subscribe(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
let sink = pending.accept().await?;
let mut block_stream = self.block_stream.resubscribe();
let mut block_info_stream = self.block_info_stream.resubscribe();
tokio::spawn(async move {
loop {
match block_stream.recv().await {
Ok(produced_block) => {
if !produced_block.commitment_config.is_processed() {
continue;
}
match block_info_stream.recv().await {
Ok(block_info) => {
let slot_info = SlotInfo {
slot: produced_block.slot,
parent: produced_block.parent_slot,
slot: block_info.slot,
parent: block_info.parent,
root: 0,
};
let result_message = jsonrpsee::SubscriptionMessage::from_json(&slot_info);
Expand Down Expand Up @@ -137,10 +139,11 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
async fn signature_subscribe(
&self,
pending: PendingSubscriptionSink,
signature: Signature,
signature: String,
config: RpcSignatureSubscribeConfig,
) -> SubscriptionResult {
RPC_SIGNATURE_SUBSCRIBE.inc();
let signature = Signature::from_str(&signature)?;
let sink = pending.accept().await?;

let jsonrpsee_sink = JsonRpseeSubscriptionHandlerSink::new(sink);
Expand Down
1 change: 1 addition & 0 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
block_priofees_service,
account_priofees_service,
blocks_notifier,
blockinfo_notifier,
accounts_service.clone(),
);

Expand Down
6 changes: 3 additions & 3 deletions lite-rpc/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::configs::{IsBlockHashValidConfig, SendTransactionConfig};
use crate::configs::IsBlockHashValidConfig;
use jsonrpsee::core::RpcResult;
use jsonrpsee::proc_macros::rpc;
use solana_account_decoder::UiAccount;
Expand All @@ -7,7 +7,7 @@ use solana_lite_rpc_prioritization_fees::rpc_data::{AccountPrioFeesStats, PrioFe
use solana_rpc_client_api::config::{
RpcAccountInfoConfig, RpcBlocksConfigWrapper, RpcContextConfig, RpcGetVoteAccountsConfig,
RpcLeaderScheduleConfig, RpcProgramAccountsConfig, RpcRequestAirdropConfig,
RpcSignatureStatusConfig, RpcSignaturesForAddressConfig,
RpcSendTransactionConfig, RpcSignatureStatusConfig, RpcSignaturesForAddressConfig,
};
use solana_rpc_client_api::response::{
OptionalContext, Response as RpcResponse, RpcBlockhash,
Expand Down Expand Up @@ -133,7 +133,7 @@ pub trait LiteRpc {
async fn send_transaction(
&self,
tx: String,
send_transaction_config: Option<SendTransactionConfig>,
send_transaction_config: Option<RpcSendTransactionConfig>,
) -> RpcResult<String>;

// ***********************
Expand Down
Loading

0 comments on commit 129d6a8

Please sign in to comment.