From 042dc07fa97d2f268f3bb40ebd02e1ed9f874cd9 Mon Sep 17 00:00:00 2001 From: Ivan Frolov Date: Tue, 4 Feb 2025 18:14:16 -0500 Subject: [PATCH] fix: restart ws connection to rpc when it was dropped and sync lost blocks --- omni-relayer/src/main.rs | 3 +- omni-relayer/src/startup/evm.rs | 137 +++++++++++++++++------------ omni-relayer/src/startup/solana.rs | 50 ++++++----- 3 files changed, 110 insertions(+), 80 deletions(-) diff --git a/omni-relayer/src/main.rs b/omni-relayer/src/main.rs index 00537be7..8026c452 100644 --- a/omni-relayer/src/main.rs +++ b/omni-relayer/src/main.rs @@ -4,6 +4,7 @@ use anyhow::{Context, Result}; use clap::Parser; use log::{error, info}; use omni_types::ChainKind; +use solana_sdk::signature::Signature; mod config; mod startup; @@ -29,7 +30,7 @@ struct CliArgs { arb_start_block: Option, /// Start signature for Solana indexer #[clap(long)] - solana_start_signature: Option, + solana_start_signature: Option, } #[tokio::main] diff --git a/omni-relayer/src/startup/evm.rs b/omni-relayer/src/startup/evm.rs index 4e8adcb3..afdc6c3b 100644 --- a/omni-relayer/src/startup/evm.rs +++ b/omni-relayer/src/startup/evm.rs @@ -32,7 +32,7 @@ pub async fn start_indexer( config: config::Config, redis_client: redis::Client, chain_kind: ChainKind, - start_block: Option, + mut start_block: Option, ) -> Result<()> { let mut redis_connection = redis_client.get_multiplexed_tokio_connection().await?; @@ -49,17 +49,89 @@ pub async fn start_indexer( _ => anyhow::bail!("Unsupported chain kind: {chain_kind:?}"), }; - let http_provider = ProviderBuilder::new().on_http(rpc_http_url.parse().context(format!( - "Failed to parse {chain_kind:?} rpc provider as url", - ))?); + let filter = Filter::new() + .address(bridge_token_factory_address) + .event_signature( + [ + utils::evm::InitTransfer::SIGNATURE_HASH, + utils::evm::FinTransfer::SIGNATURE_HASH, + utils::evm::DeployToken::SIGNATURE_HASH, + ] + .to_vec(), + ); + + loop { + let http_provider = ProviderBuilder::new().on_http(rpc_http_url.parse().context( + format!("Failed to parse {chain_kind:?} rpc provider as url",), + )?); + + process_recent_blocks( + &mut redis_connection, + &http_provider, + &filter, + chain_kind, + start_block, + block_processing_batch_size, + expected_finalization_time, + ) + .await?; + + info!( + "All historical logs processed, starting {:?} WS subscription", + chain_kind + ); + + let ws_provider = crate::skip_fail!( + ProviderBuilder::new() + .on_ws(WsConnect::new(&rpc_ws_url)) + .await, + format!("{chain_kind:?} WebSocket connection failed"), + 5 + ); + + let mut stream = crate::skip_fail!( + ws_provider.subscribe_logs(&filter).await, + format!("{chain_kind:?} WebSocket subscription failed"), + 5 + ) + .into_stream(); + + info!("Subscribed to {:?} logs", chain_kind); + + while let Some(log) = stream.next().await { + process_log( + chain_kind, + &mut redis_connection, + &http_provider, + log, + expected_finalization_time, + ) + .await; + } + error!("{chain_kind:?} WebSocket stream closed unexpectedly, reconnecting..."); + start_block = None; + + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } +} + +async fn process_recent_blocks( + redis_connection: &mut redis::aio::MultiplexedConnection, + http_provider: &RootProvider>, + filter: &Filter, + chain_kind: ChainKind, + start_block: Option, + block_processing_batch_size: u64, + expected_finalization_time: i64, +) -> Result<()> { let last_processed_block_key = utils::redis::get_last_processed_key(chain_kind); let latest_block = http_provider.get_block_number().await?; let from_block = match start_block { Some(block) => block, None => { if let Some(block) = utils::redis::get_last_processed::<&str, u64>( - &mut redis_connection, + redis_connection, &last_processed_block_key, ) .await @@ -67,7 +139,7 @@ pub async fn start_indexer( block + 1 } else { utils::redis::update_last_processed( - &mut redis_connection, + redis_connection, &last_processed_block_key, latest_block + 1, ) @@ -79,17 +151,6 @@ pub async fn start_indexer( info!("{chain_kind:?} indexer will start from block: {from_block}"); - let filter = Filter::new() - .address(bridge_token_factory_address) - .event_signature( - [ - utils::evm::InitTransfer::SIGNATURE_HASH, - utils::evm::FinTransfer::SIGNATURE_HASH, - utils::evm::DeployToken::SIGNATURE_HASH, - ] - .to_vec(), - ); - for current_block in (from_block..latest_block).step_by(usize::try_from(block_processing_batch_size)?) { @@ -105,8 +166,8 @@ pub async fn start_indexer( for log in logs { process_log( chain_kind, - &mut redis_connection, - &http_provider, + redis_connection, + http_provider, log, expected_finalization_time, ) @@ -114,43 +175,7 @@ pub async fn start_indexer( } } - info!( - "All historical logs processed, starting {:?} WS subscription", - chain_kind - ); - - loop { - let ws_provider = crate::skip_fail!( - ProviderBuilder::new() - .on_ws(WsConnect::new(&rpc_ws_url)) - .await, - format!("{chain_kind:?} WebSocket connection failed"), - 5 - ); - - let mut stream = crate::skip_fail!( - ws_provider.subscribe_logs(&filter).await, - format!("{chain_kind:?} WebSocket subscription failed"), - 5 - ) - .into_stream(); - - info!("Subscribed to {:?} logs", chain_kind); - - while let Some(log) = stream.next().await { - process_log( - chain_kind, - &mut redis_connection, - &http_provider, - log, - expected_finalization_time, - ) - .await; - } - - error!("{chain_kind:?} WebSocket stream closed unexpectedly, reconnecting..."); - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - } + Ok(()) } async fn process_log( diff --git a/omni-relayer/src/startup/solana.rs b/omni-relayer/src/startup/solana.rs index eae166b9..f5f4e72c 100644 --- a/omni-relayer/src/startup/solana.rs +++ b/omni-relayer/src/startup/solana.rs @@ -31,7 +31,7 @@ pub fn get_keypair(file: Option<&String>) -> Keypair { pub async fn start_indexer( config: config::Config, redis_client: redis::Client, - start_signature: Option, + mut start_signature: Option, ) -> Result<()> { let Some(solana_config) = config.solana else { anyhow::bail!("Failed to get Solana config"); @@ -43,27 +43,26 @@ pub async fn start_indexer( let rpc_ws_url = &solana_config.rpc_ws_url; let program_id = Pubkey::from_str(&solana_config.program_id)?; - let http_client = RpcClient::new(rpc_http_url.to_string()); - - if let Err(e) = process_recent_signatures( - &mut redis_connection, - &http_client, - &program_id, - start_signature, - ) - .await - { - warn!("Failed to fetch recent logs: {}", e); - } + loop { + crate::skip_fail!( + process_recent_signatures( + &mut redis_connection, + rpc_http_url.clone(), + &program_id, + start_signature, + ) + .await, + "Failed to process recent signatures", + 5 + ); - info!("All historical logs processed, starting Solana WS subscription"); + info!("All historical logs processed, starting Solana WS subscription"); - let filter = RpcTransactionLogsFilter::Mentions(vec![program_id.to_string()]); - let config = RpcTransactionLogsConfig { - commitment: Some(CommitmentConfig::processed()), - }; + let filter = RpcTransactionLogsFilter::Mentions(vec![program_id.to_string()]); + let config = RpcTransactionLogsConfig { + commitment: Some(CommitmentConfig::processed()), + }; - loop { let ws_client = crate::skip_fail!( PubsubClient::new(rpc_ws_url).await, "Solana WebSocket connection failed", @@ -96,26 +95,31 @@ pub async fn start_indexer( } error!("Solana WebSocket stream closed, reconnecting..."); + start_signature = None; + + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } } async fn process_recent_signatures( redis_connection: &mut redis::aio::MultiplexedConnection, - http_client: &RpcClient, + rpc_http_url: String, program_id: &Pubkey, - start_signature: Option, + start_signature: Option, ) -> Result<()> { + let http_client = RpcClient::new(rpc_http_url); + let from_signature = if let Some(signature) = start_signature { utils::redis::add_event( redis_connection, utils::redis::SOLANA_EVENTS, - signature.clone(), + signature.to_string(), // TODO: It's better to come up with a solution that wouldn't require storing `Null` value serde_json::Value::Null, ) .await; - Signature::from_str(&signature)? + signature } else { let Some(signature) = utils::redis::get_last_processed::<&str, String>( redis_connection,