Skip to content

Commit

Permalink
fix timeout issue
Browse files Browse the repository at this point in the history
  • Loading branch information
b-yap committed Oct 7, 2024
1 parent 524342e commit 9b9dfd3
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 56 deletions.
6 changes: 3 additions & 3 deletions clients/vault/src/oracle/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ impl OracleAgent {
}

#[cfg(any(test, feature = "integration"))]
pub fn is_stellar_running(&self) -> bool {
self.message_sender.is_some()
pub async fn is_stellar_running(&self) -> bool {
self.message_sender.is_some() && self.collector.read().await.last_slot_index() > 0
}
}

Expand Down Expand Up @@ -180,7 +180,7 @@ pub async fn start_oracle_agent(

tokio::spawn(listen_for_stellar_messages(cfg, oracle_agent.clone(), secret, shutdown_sender));

while !oracle_agent.read().await.is_stellar_running() {
while !oracle_agent.read().await.is_stellar_running().await {
sleep(Duration::from_millis(500)).await;
}

Expand Down
107 changes: 56 additions & 51 deletions clients/vault/src/oracle/collector/proof_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,61 +237,66 @@ impl ScpMessageCollector {
}
});

if let Some(i) = value {
if let ScpHistoryEntry::V0(scp_entry_v0) = i {
let slot_scp_envelopes = scp_entry_v0.clone().ledger_messages.messages;
let vec_scp = slot_scp_envelopes.get_vec().clone();

// Filter out any envelopes that are not externalize or confirm statements
let relevant_envelopes = vec_scp
.into_iter()
.filter(|scp| match scp.statement.pledges {
ScpStatementPledges::ScpStExternalize(_) |
ScpStatementPledges::ScpStConfirm(_) => true,
_ => false,
})
.collect::<Vec<_>>();

let externalized_envelopes_count = relevant_envelopes
.iter()
.filter(|scp| match scp.statement.pledges {
ScpStatementPledges::ScpStExternalize(_) => true,
_ => false,
})
.count();

// Ensure that at least one envelope is externalized
if externalized_envelopes_count == 0 {
tracing::error!(
let Some(ScpHistoryEntry::V0(scp_entry_v0)) = value else {
tracing::warn!("get_envelopes_from_horizon_archive(): Could not get ScpHistory entry from archive {archive_url} for slot {slot}");
continue;
};

let slot_scp_envelopes = scp_entry_v0.clone().ledger_messages.messages;
let vec_scp = slot_scp_envelopes.get_vec().clone();

// Filter out any envelopes that are not externalize or confirm statements
let mut relevant_envelopes = vec_scp
.into_iter()
.filter(|scp| match scp.statement.pledges {
ScpStatementPledges::ScpStExternalize(_) |
ScpStatementPledges::ScpStConfirm(_) => true,
_ => false,
})
.collect::<Vec<_>>();

let externalized_envelopes_count = relevant_envelopes
.iter()
.filter(|scp| match scp.statement.pledges {
ScpStatementPledges::ScpStExternalize(_) => true,
_ => false,
})
.count();

// Ensure that at least one envelope is externalized
if externalized_envelopes_count == 0 {
tracing::error!(
"get_envelopes_from_horizon_archive(): The contained archive entry fetched from {} for slot {slot} is invalid because it does not contain any externalized envelopes.",
scp_archive_storage.0
);
// remove the file since it's invalid.
scp_archive_storage.remove_file(slot);
continue;
}

let mut envelopes_map = envelopes_map_arc.write();
let mut from_archive_map = env_from_archive_map.write();

if envelopes_map.get(&slot).is_none() {
tracing::info!(
"get_envelopes_from_horizon_archive(): Adding {} archived SCP envelopes for slot {slot} to envelopes map. {} are externalized",
relevant_envelopes.len(),
externalized_envelopes_count
);
envelopes_map.insert(slot, relevant_envelopes);
// indicates that the data was taken from the archive
from_archive_map.insert(slot, ());

// remove the archive file after successfully retrieving envelopes
scp_archive_storage.remove_file(slot);
break;
}
}
} else {
tracing::warn!("get_envelopes_from_horizon_archive(): Could not get ScpHistory entry from archive {archive_url} for slot {slot}");
// remove the file since it's invalid.
scp_archive_storage.remove_file(slot);
continue;
}

let mut envelopes_map = envelopes_map_arc.write();
let mut from_archive_map = env_from_archive_map.write();

tracing::info!(
"get_envelopes_from_horizon_archive(): Adding {} archived SCP envelopes for slot {slot} to envelopes map. {} are externalized",
relevant_envelopes.len(),
externalized_envelopes_count
);
let mut envs = envelopes_map.get(&slot).map(|envs| envs.clone()).unwrap_or(vec![]);

if envs.len() > 0 {
tracing::info!("get_envelopes_from_horizon_archive(): {} envelopes already exist for slot {slot}", envs.len());
}

relevant_envelopes.append(&mut envs);
envelopes_map.insert(slot, relevant_envelopes);

// indicates that the data was taken from the archive
from_archive_map.insert(slot, ());

// remove the archive file after successfully retrieving envelopes
scp_archive_storage.remove_file(slot);
break;
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions clients/vault/tests/vault_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ async fn test_redeem_succeeds_on_network(is_public_network: bool) {
let vault_id_manager =
VaultIdManager::from_map(vault_provider.clone(), vault_wallet.clone(), vault_ids);

// We issue 1/1000 unit
let issue_amount = DecimalsLookupImpl::one(CurrencyId::Native) / 1000;
// We issue 1 (spacewalk-chain) unit
let issue_amount = DecimalsLookupImpl::one(CurrencyId::Native) / 100;
let vault_collateral = get_required_vault_collateral_for_issue(
&vault_provider,
issue_amount,
Expand Down Expand Up @@ -605,6 +605,7 @@ async fn test_issue_cancel_succeeds() {
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_issue_execution_succeeds_from_archive_on_mainnet() {
env_logger::init();
let is_public_network = true;
test_issue_execution_succeeds_from_archive_on_network(is_public_network).await;
}
Expand Down

0 comments on commit 9b9dfd3

Please sign in to comment.