Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: restart ws connection to rpc when it was dropped and sync lost blocks #227

Open
wants to merge 1 commit into
base: feat/improved-logs
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion omni-relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::{Context, Result};
use clap::Parser;
use log::{error, info};
use omni_types::ChainKind;
use solana_sdk::signature::Signature;

mod config;
mod startup;
Expand All @@ -29,7 +30,7 @@ struct CliArgs {
arb_start_block: Option<u64>,
/// Start signature for Solana indexer
#[clap(long)]
solana_start_signature: Option<String>,
solana_start_signature: Option<Signature>,
}

#[tokio::main]
Expand Down
137 changes: 81 additions & 56 deletions omni-relayer/src/startup/evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub async fn start_indexer(
config: config::Config,
redis_client: redis::Client,
chain_kind: ChainKind,
start_block: Option<u64>,
mut start_block: Option<u64>,
) -> Result<()> {
let mut redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

Expand All @@ -49,25 +49,97 @@ pub async fn start_indexer(
_ => anyhow::bail!("Unsupported chain kind: {chain_kind:?}"),
};

let http_provider = ProviderBuilder::new().on_http(rpc_http_url.parse().context(format!(
"Failed to parse {chain_kind:?} rpc provider as url",
))?);
let filter = Filter::new()
.address(bridge_token_factory_address)
.event_signature(
[
utils::evm::InitTransfer::SIGNATURE_HASH,
utils::evm::FinTransfer::SIGNATURE_HASH,
utils::evm::DeployToken::SIGNATURE_HASH,
]
.to_vec(),
);

loop {
let http_provider = ProviderBuilder::new().on_http(rpc_http_url.parse().context(
format!("Failed to parse {chain_kind:?} rpc provider as url",),
)?);

process_recent_blocks(
&mut redis_connection,
&http_provider,
&filter,
chain_kind,
start_block,
block_processing_batch_size,
expected_finalization_time,
)
.await?;

info!(
"All historical logs processed, starting {:?} WS subscription",
chain_kind
);

let ws_provider = crate::skip_fail!(
ProviderBuilder::new()
.on_ws(WsConnect::new(&rpc_ws_url))
.await,
format!("{chain_kind:?} WebSocket connection failed"),
5
);

let mut stream = crate::skip_fail!(
ws_provider.subscribe_logs(&filter).await,
format!("{chain_kind:?} WebSocket subscription failed"),
5
)
.into_stream();

info!("Subscribed to {:?} logs", chain_kind);

while let Some(log) = stream.next().await {
process_log(
chain_kind,
&mut redis_connection,
&http_provider,
log,
expected_finalization_time,
)
.await;
}

error!("{chain_kind:?} WebSocket stream closed unexpectedly, reconnecting...");
start_block = None;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done to make sure that first start uses start_block (if it was provided as an argument) and then we need to switch back to using last_processed_block. Let me know if this should be refactored using better code practices

Also, same logic can be found in solana startup files


tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}

async fn process_recent_blocks(
redis_connection: &mut redis::aio::MultiplexedConnection,
http_provider: &RootProvider<Http<Client>>,
filter: &Filter,
chain_kind: ChainKind,
start_block: Option<u64>,
block_processing_batch_size: u64,
expected_finalization_time: i64,
) -> Result<()> {
let last_processed_block_key = utils::redis::get_last_processed_key(chain_kind);
let latest_block = http_provider.get_block_number().await?;
let from_block = match start_block {
Some(block) => block,
None => {
if let Some(block) = utils::redis::get_last_processed::<&str, u64>(
&mut redis_connection,
redis_connection,
&last_processed_block_key,
)
.await
{
block + 1
} else {
utils::redis::update_last_processed(
&mut redis_connection,
redis_connection,
&last_processed_block_key,
latest_block + 1,
)
Expand All @@ -79,17 +151,6 @@ pub async fn start_indexer(

info!("{chain_kind:?} indexer will start from block: {from_block}");

let filter = Filter::new()
.address(bridge_token_factory_address)
.event_signature(
[
utils::evm::InitTransfer::SIGNATURE_HASH,
utils::evm::FinTransfer::SIGNATURE_HASH,
utils::evm::DeployToken::SIGNATURE_HASH,
]
.to_vec(),
);

for current_block in
(from_block..latest_block).step_by(usize::try_from(block_processing_batch_size)?)
{
Expand All @@ -105,52 +166,16 @@ pub async fn start_indexer(
for log in logs {
process_log(
chain_kind,
&mut redis_connection,
&http_provider,
redis_connection,
http_provider,
log,
expected_finalization_time,
)
.await;
}
}

info!(
"All historical logs processed, starting {:?} WS subscription",
chain_kind
);

loop {
let ws_provider = crate::skip_fail!(
ProviderBuilder::new()
.on_ws(WsConnect::new(&rpc_ws_url))
.await,
format!("{chain_kind:?} WebSocket connection failed"),
5
);

let mut stream = crate::skip_fail!(
ws_provider.subscribe_logs(&filter).await,
format!("{chain_kind:?} WebSocket subscription failed"),
5
)
.into_stream();

info!("Subscribed to {:?} logs", chain_kind);

while let Some(log) = stream.next().await {
process_log(
chain_kind,
&mut redis_connection,
&http_provider,
log,
expected_finalization_time,
)
.await;
}

error!("{chain_kind:?} WebSocket stream closed unexpectedly, reconnecting...");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
Ok(())
}

async fn process_log(
Expand Down
50 changes: 27 additions & 23 deletions omni-relayer/src/startup/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub fn get_keypair(file: Option<&String>) -> Keypair {
pub async fn start_indexer(
config: config::Config,
redis_client: redis::Client,
start_signature: Option<String>,
mut start_signature: Option<Signature>,
) -> Result<()> {
let Some(solana_config) = config.solana else {
anyhow::bail!("Failed to get Solana config");
Expand All @@ -43,27 +43,26 @@ pub async fn start_indexer(
let rpc_ws_url = &solana_config.rpc_ws_url;
let program_id = Pubkey::from_str(&solana_config.program_id)?;

let http_client = RpcClient::new(rpc_http_url.to_string());

if let Err(e) = process_recent_signatures(
&mut redis_connection,
&http_client,
&program_id,
start_signature,
)
.await
{
warn!("Failed to fetch recent logs: {}", e);
}
loop {
crate::skip_fail!(
process_recent_signatures(
&mut redis_connection,
rpc_http_url.clone(),
&program_id,
start_signature,
)
.await,
"Failed to process recent signatures",
5
);

info!("All historical logs processed, starting Solana WS subscription");
info!("All historical logs processed, starting Solana WS subscription");

let filter = RpcTransactionLogsFilter::Mentions(vec![program_id.to_string()]);
let config = RpcTransactionLogsConfig {
commitment: Some(CommitmentConfig::processed()),
};
let filter = RpcTransactionLogsFilter::Mentions(vec![program_id.to_string()]);
let config = RpcTransactionLogsConfig {
commitment: Some(CommitmentConfig::processed()),
};

loop {
let ws_client = crate::skip_fail!(
PubsubClient::new(rpc_ws_url).await,
"Solana WebSocket connection failed",
Expand Down Expand Up @@ -96,26 +95,31 @@ pub async fn start_indexer(
}

error!("Solana WebSocket stream closed, reconnecting...");
start_signature = None;

tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}

async fn process_recent_signatures(
redis_connection: &mut redis::aio::MultiplexedConnection,
http_client: &RpcClient,
rpc_http_url: String,
program_id: &Pubkey,
start_signature: Option<String>,
start_signature: Option<Signature>,
) -> Result<()> {
let http_client = RpcClient::new(rpc_http_url);

let from_signature = if let Some(signature) = start_signature {
utils::redis::add_event(
redis_connection,
utils::redis::SOLANA_EVENTS,
signature.clone(),
signature.to_string(),
// TODO: It's better to come up with a solution that wouldn't require storing `Null` value
serde_json::Value::Null,
)
.await;

Signature::from_str(&signature)?
signature
} else {
let Some(signature) = utils::redis::get_last_processed::<&str, String>(
redis_connection,
Expand Down
Loading