From 7a1bc411b74012bd36245594a78432e4ec414f2f Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 26 Jun 2024 09:49:40 +0200 Subject: [PATCH] parallel writes --- src/postgres.rs | 50 +++++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/src/postgres.rs b/src/postgres.rs index 2f3c024..5dab618 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -17,6 +17,7 @@ use prometheus::{opts, register_int_gauge, IntGauge}; use serde::Serialize; use solana_sdk::clock::Slot; use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signature; use solana_sdk::transaction::TransactionError; use tokio::sync::mpsc::error::SendTimeoutError; use tokio::sync::mpsc::Sender; @@ -936,34 +937,26 @@ impl PostgresSession { // 750ms let _span = tracing::info_span!("save_block", slot = block_info.slot); let instant = Instant::now(); - let signatures = { + let fut_signatures = async { // .3ms let _span = tracing::debug_span!("map_signatures", slot = block_info.slot); block_info .transactions .iter() .map(|transaction| transaction.signature.clone()) - .collect() + .collect_vec() }; - let accounts = { + let fut_accounts = async { // .6ms let _span = tracing::debug_span!("map_accounts", slot = block_info.slot); block_info .heavily_locked_accounts .iter() .map(|acc| acc.key.clone()) - .collect() + .collect::>() }; - let both_started_at = Instant::now(); - let create_tx_ids = self.create_transaction_ids(signatures, slot); - let create_accs = self.create_accounts_for_transaction(accounts, slot); - try_join!(create_tx_ids, create_accs)?; - TIME_TO_SAVE_TRANSACTION.set(both_started_at.elapsed().as_millis() as i64); - ACCOUNT_SAVE_TIME.set(both_started_at.elapsed().as_millis() as i64); - - let instant_acc_tx: Instant = Instant::now(); - let txs_accounts = { + let fut_txs_accounts = async { // 90ms let _span = tracing::debug_span!("map_txs_accounts", slot = block_info.slot); block_info @@ -980,28 +973,37 @@ impl PostgresSession { is_signer: acc.is_signer, is_atl: acc.is_alt, }) - .collect(), + .collect_vec(), }) - .collect_vec() + .collect() }; - if let Err(e) = self.insert_accounts_for_transaction(txs_accounts, slot).await { - error!("Error inserting accounts for transactions : {e:?}"); - } - TIME_TO_STORE_TX_ACCOUNT.set(instant_acc_tx.elapsed().as_millis() as i64); + let (signatures, accounts, txs_accounts) = join!(fut_signatures, fut_accounts, fut_txs_accounts); - // insert transactions - let instant_save_tx = Instant::now(); - self.insert_transactions_for_block(&block_info.transactions, slot) - .await?; - TIME_TO_SAVE_TRANSACTION_DATA.set(instant_save_tx.elapsed().as_millis() as i64); + let both_started_at = Instant::now(); + let fut_create_tx_ids = self.create_transaction_ids(signatures, slot); + let fut_create_accs = self.create_accounts_for_transaction(accounts, slot); + try_join!(fut_create_tx_ids, fut_create_accs)?; + TIME_TO_SAVE_TRANSACTION.set(both_started_at.elapsed().as_millis() as i64); + ACCOUNT_SAVE_TIME.set(both_started_at.elapsed().as_millis() as i64); + + let both_started_at = Instant::now(); + // depends on transactions and accounts mapping tables + let fut_insert_accounts_for_transaction = self.insert_accounts_for_transaction(txs_accounts, slot); + // depends on transactions mapping table + let fut_insert_transactions_for_block = self.insert_transactions_for_block(&block_info.transactions, slot); + let ((), ()) = try_join!(fut_insert_accounts_for_transaction, fut_insert_transactions_for_block)?; + TIME_TO_STORE_TX_ACCOUNT.set(both_started_at.elapsed().as_millis() as i64); + TIME_TO_SAVE_TRANSACTION_DATA.set(both_started_at.elapsed().as_millis() as i64); // save account usage in blocks let ins = Instant::now(); + // depends on accounts mapping table self.save_account_usage_in_block(&block_info).await?; TIME_TO_SAVE_BLOCK_ACCOUNTS.set(ins.elapsed().as_millis() as i64); let inst_block_info = Instant::now(); + // no dependencies self.save_block_info(&block_info).await?; BLOCK_INFO_SAVE_TIME.set(inst_block_info.elapsed().as_millis() as i64);