Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add changes to fix issues running the Spacewalk client in Kubernetes #519

Merged
merged 28 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3ab512a
Change/remove `backoff::retry()` usage in runner
ebma Apr 18, 2024
2e6cbc4
Remove `backoff` from error.rs
ebma May 2, 2024
8132db0
Manually implement backoff behaviour
ebma May 2, 2024
4c52142
Merge branch 'main' into connection-issues-investigation
ebma May 7, 2024
88c108e
Fix build error
ebma May 7, 2024
e92bb7c
Formatting
ebma May 7, 2024
60d63e9
Refactor error messages
ebma May 7, 2024
a6fe253
Restore original logic with max retry interval
ebma May 8, 2024
bc4c6fc
Remove import
ebma May 8, 2024
32b8d0a
Run `cargo +nightly-2024-02-09 fmt --all`
ebma May 8, 2024
93c9bb2
Run cargo fmt with nightly
ebma May 8, 2024
8784c23
Add logic to reopen the runner websocket
ebma May 13, 2024
e24496d
Add debug logs
ebma May 13, 2024
d553ed7
Fix compile issues
ebma May 13, 2024
585c267
Add test for the backoff/retry logic
ebma May 15, 2024
2bf4dc2
Simplify retry logic
ebma May 15, 2024
8c2f5cb
Use test for both
ebma May 15, 2024
11504ed
Remove unfinished test case and refactor
ebma May 15, 2024
110dceb
fix trailing semicolons
b-yap May 16, 2024
4f3fed9
Refactor error handling around `try_get_release()`
ebma May 17, 2024
c9dcd4a
Merge branch 'main' into connection-issues-investigation
ebma May 22, 2024
18d8746
Remove patch statement for `ahash` again
ebma May 22, 2024
4e8b516
Remove patch statement for `ahash` again
ebma May 22, 2024
287116c
Replace references to `nightly-02-09` with `nightly-04-18`
ebma May 22, 2024
1409157
Don't log message for each retry
ebma May 22, 2024
64c7c9a
Change `max-parallel` to `2`
ebma May 22, 2024
cabd7f0
Add some 'allow' statements for clippy
ebma May 23, 2024
dc2407f
revert version to 2024-02-09
b-yap May 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
488 changes: 254 additions & 234 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,7 @@ xcm = { git = "https://github.com/paritytech//polkadot", branch = "release-v0.9.
[patch.crates-io]
sp-core = { git = "https://github.com/paritytech//substrate", branch = "polkadot-v0.9.42" }
sp-runtime = { git = "https://github.com/paritytech//substrate", branch = "polkadot-v0.9.42" }

# We need to patch this to https://github.com/tkaitchuck/aHash/releases/tag/v0.8.11 to prevent a build error
# 'error[E0635]: unknown feature `stdsimd`' that occurs because this feature was removed in the latest nightly versions
ahash = { git = "https://github.com/tkaitchuck/aHash", rev = "db36e4c4f0606b786bc617eefaffbe4ae9100762" }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think nightly-2024-04-18 would work in Spacewalk too? Or was there another issue? I don't remember.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tried to test with +nightly and it was fine.

Copy link
Member Author

@ebma ebma May 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we define nightly-2024-04-18 in the rust-toolchain file in all references (README/github actions) then and remove this patch for ahash from the Cargo.toml file? Or what do you think @b-yap?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@b-yap any thoughts on this?

Copy link
Contributor

@b-yap b-yap May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmm, I think the patch isn't needed?
I did a cargo update in my previous PR; the ahash in cargo.lock should be ok for now.
I mentioned the minimum nightly version in the readme; but I did not explain why.
https://github.com/pendulum-chain/pendulum?tab=readme-ov-file#how-to-run-tests
We could add the reason over there.

Copy link
Member Author

@ebma ebma May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the patch statement again and updated all references (also in the CI file) to point to the new nightly version. Let's see if the CI passes and then I merge.

6 changes: 3 additions & 3 deletions clients/runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"


[dependencies]
clap = { version = "4.0.17", features = ["derive"]}
clap = { version = "4.0.17", features = ["derive"] }
hex = "0.4.3"
tokio = { version = "1.8", features = ["rt-multi-thread", "macros", "time"] }
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
Expand All @@ -22,8 +22,8 @@ bytes = "1.1.0"
signal-hook = "0.3.14"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
futures = "0.3.21"
backoff = { version = "0.3.0", features = ["tokio"] }
subxt = { version = "0.29.0", default-features = false, features = ["jsonrpsee-ws"] }
exponential-backoff = "1.2.0"
subxt = { version = "0.29.0", default-features = false, features = ["jsonrpsee-ws"] }
sha2 = "0.8.2"

[dev-dependencies]
Expand Down
10 changes: 0 additions & 10 deletions clients/runner/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![allow(clippy::enum_variant_names)]

use backoff::Error as BackoffError;
use codec::Error as CodecError;
use nix::Error as OsError;
use reqwest::Error as ReqwestError;
Expand Down Expand Up @@ -38,12 +37,3 @@ pub enum Error {
#[error("Incorrect Checksum")]
IncorrectChecksum,
}

impl<E: Into<Error> + Sized> From<BackoffError<E>> for Error {
fn from(e: BackoffError<E>) -> Self {
match e {
BackoffError::Permanent(err) => err.into(),
BackoffError::Transient(err) => err.into(),
}
}
}
2 changes: 1 addition & 1 deletion clients/runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn main() -> Result<(), Error> {
let opts: Opts = Opts::parse();
let rpc_client = retry_with_log_async(
|| subxt_api(&opts.parachain_ws).into_future().boxed(),
"Error fetching executable".to_string(),
"Error running RPC client".to_string(),
)
.await?;
log::info!("Connected to the parachain");
Expand Down
68 changes: 41 additions & 27 deletions clients/runner/src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::{error::Error, Opts};
use backoff::{retry, Error as BackoffError, ExponentialBackoff};
use bytes::Bytes;
use codec::Decode;
use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt};
Expand Down Expand Up @@ -71,7 +70,7 @@ pub const RETRY_TIMEOUT: Duration = Duration::from_millis(60_000);
pub const RETRY_INTERVAL: Duration = Duration::from_millis(1_000);

/// Multiplier for the interval in retry utilities: Constant interval retry
pub const RETRY_MULTIPLIER: f64 = 1.0;
pub const RETRY_MULTIPLIER: u32 = 1;

/// Data type assumed to be used by the parachain to store the client release.
/// If this type is different from the on-chain one, decoding will fail.
Expand Down Expand Up @@ -163,7 +162,7 @@ impl Runner {
.into_future()
.boxed()
},
"Error fetching executable".to_string(),
"Error downloading executable".to_string(),
)
.await?;

Expand All @@ -188,7 +187,7 @@ impl Runner {

let bytes = retry_with_log_async(
|| Runner::get_request_bytes(release.uri.clone()).into_future().boxed(),
"Error fetching executable".to_string(),
"Error getting request bytes for executable".to_string(),
)
.await?;

Expand Down Expand Up @@ -291,7 +290,7 @@ impl Runner {
.into_future()
.boxed()
},
"Error fetching executable".to_string(),
"Error reading chain storage for release".to_string(),
)
.await
}
Expand Down Expand Up @@ -589,41 +588,56 @@ pub async fn subxt_api(url: &str) -> Result<OnlineClient<PolkadotConfig>, Error>
Ok(OnlineClient::from_url(url).await?)
}

pub fn custom_retry_config() -> ExponentialBackoff {
ExponentialBackoff {
initial_interval: RETRY_INTERVAL,
max_elapsed_time: Some(RETRY_TIMEOUT),
multiplier: RETRY_MULTIPLIER,
..ExponentialBackoff::default()
}
fn create_backoff_strategy() -> exponential_backoff::Backoff {
let mut backoff = exponential_backoff::Backoff::new(u32::MAX, RETRY_INTERVAL, RETRY_TIMEOUT);
backoff.set_factor(RETRY_MULTIPLIER);
backoff.set_jitter(0.3);
backoff
}

pub fn retry_with_log<T, F>(mut f: F, log_msg: String) -> Result<T, Error>
where
F: FnMut() -> Result<T, Error>,
{
retry(custom_retry_config(), || {
f().map_err(|e| {
log::info!("{}: {}. Retrying...", log_msg, e.to_string());
BackoffError::Transient(e)
})
})
.map_err(Into::into)
let backoff = create_backoff_strategy();

// We store the error to return it if the backoff is exhausted
let mut error = None;
while let Some(duration) = backoff.iter().next() {
match f() {
Ok(result) => return Ok(result),
Err(err) => {
log::info!("{}: {}. Retrying...", log_msg, err.to_string());
std::thread::sleep(duration);
error = Some(err)
},
}
}

Err(error.expect("Error should not be None if we reach here."))
}

pub async fn retry_with_log_async<'a, T, F, E>(f: F, log_msg: String) -> Result<T, Error>
where
F: Fn() -> BoxFuture<'a, Result<T, E>>,
E: Into<Error> + Sized + Display,
{
backoff::future::retry(custom_retry_config(), || async {
f().await.map_err(|e| {
log::info!("{}: {}. Retrying...", log_msg, e.to_string());
BackoffError::Transient(e)
})
})
.await
.map_err(Into::into)
let backoff = create_backoff_strategy();

// We store the error to return it if the backoff is exhausted
let mut error = None;
while let Some(duration) = backoff.iter().next() {
match f().await {
Ok(result) => return Ok(result),
Err(err) => {
log::info!("{}: {}. Retrying...", log_msg, err.to_string());
tokio::time::sleep(duration).await;
error = Some(err)
},
}
}

Err(error.expect("Error should not be None if we reach here.").into())
}

#[cfg(test)]
Expand Down
14 changes: 8 additions & 6 deletions clients/runtime/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ impl Error {
let data_string = custom_error.data().map(ToString::to_string).unwrap_or_default();

if RecoverableError::is_recoverable(&data_string) {
return Some(Recoverability::Recoverable(data_string))
return Some(Recoverability::Recoverable(data_string));
}

return Some(Recoverability::Unrecoverable(data_string))
return Some(Recoverability::Unrecoverable(data_string));
} else {
None
}
Expand All @@ -160,7 +160,7 @@ impl Error {

pub fn is_rpc_disconnect_error(&self) -> bool {
match self {
Error::SubxtRuntimeError(SubxtError::Rpc(RpcError::ClientError(e))) =>
Error::SubxtRuntimeError(SubxtError::Rpc(RpcError::ClientError(e))) => {
match e.downcast_ref::<JsonRpseeError>() {
Some(e) => matches!(e, JsonRpseeError::RestartNeeded(_)),
None => {
Expand All @@ -169,7 +169,8 @@ impl Error {
);
false
},
},
}
},
Error::SubxtRuntimeError(SubxtError::Rpc(RpcError::SubscriptionDropped)) => true,
_ => false,
}
Expand Down Expand Up @@ -203,7 +204,7 @@ impl Error {

pub fn is_timeout_error(&self) -> bool {
match self {
Error::SubxtRuntimeError(SubxtError::Rpc(RpcError::ClientError(e))) =>
Error::SubxtRuntimeError(SubxtError::Rpc(RpcError::ClientError(e))) => {
match e.downcast_ref::<JsonRpseeError>() {
Some(e) => matches!(e, JsonRpseeError::RequestTimeout),
None => {
Expand All @@ -212,7 +213,8 @@ impl Error {
);
false
},
},
}
},
_ => false,
}
}
Expand Down
14 changes: 7 additions & 7 deletions clients/wallet/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ macro_rules! unwrap_or_return {
Ok(result) => result,
Err(e) => {
tracing::warn!("{:?}: {:?}", $log, e);
return $ret
return $ret;
},
}
};
Expand Down Expand Up @@ -66,7 +66,7 @@ impl WalletStateStorage {
let path = self.cursor_path();
let path = Path::new(&path);
if !path.exists() {
return 0
return 0;
}

let Ok(content_from_file) = read_content_from_path(path) else { return 0 };
Expand Down Expand Up @@ -131,7 +131,7 @@ impl WalletStateStorage {
return Err(Error::cache_error_with_seq(
CacheErrorKind::SequenceNumberAlreadyUsed,
sequence,
))
));
}

let mut file = OpenOptions::new()
Expand Down Expand Up @@ -189,7 +189,7 @@ impl WalletStateStorage {
if let Err(e) = create_dir_all(&full_path) {
tracing::warn!("Failed to create directory of {full_path}: {:?}", e);
}
return
return;
};

for entry in directory.flatten() {
Expand All @@ -207,7 +207,7 @@ impl WalletStateStorage {
let path = Path::new(&full_file_path);

if !path.exists() {
return Err(Error::cache_error_with_seq(CacheErrorKind::FileDoesNotExist, sequence))
return Err(Error::cache_error_with_seq(CacheErrorKind::FileDoesNotExist, sequence));
}

extract_tx_envelope_from_path(path).map(|(tx, _)| tx)
Expand Down Expand Up @@ -251,7 +251,7 @@ impl WalletStateStorage {

// return an error if all the files have errors.
if tx_envelopes.is_empty() && !errors.is_empty() {
return Err(errors)
return Err(errors);
}

// sort in ascending order, based on the sequence number.
Expand Down Expand Up @@ -290,7 +290,7 @@ fn extract_tx_envelope_from_path<P: AsRef<Path> + std::fmt::Debug + Clone>(

// convert the content into `Vec<u8>`
let Some(content_as_vec_u8) = parse_xdr_string_to_vec_u8(&content_from_file) else {
return Err(Error::cache_error_with_path(CacheErrorKind::InvalidFile, format!("{path:?}")))
return Err(Error::cache_error_with_path(CacheErrorKind::InvalidFile, format!("{path:?}")));
};

// convert the content to TransactionEnvelope
Expand Down
4 changes: 2 additions & 2 deletions pallets/currency/src/amount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ mod math {

pub fn ensure_is_compatible_with_target_chain(&self) -> Result<(), DispatchError> {
if !T::AmountCompatibility::is_compatible_with_target(self.amount) {
return Err(Error::<T>::IncompatibleAmount.into())
return Err(Error::<T>::IncompatibleAmount.into());
}
Ok(())
}
Expand All @@ -115,7 +115,7 @@ mod math {
F: Fn(&BalanceOf<T>, &BalanceOf<T>) -> Option<BalanceOf<T>>,
{
if self.currency_id != other.currency_id {
return Err(Error::<T>::InvalidCurrency.into())
return Err(Error::<T>::InvalidCurrency.into());
}
let amount = f(&self.amount, &other.amount).ok_or(err)?;

Expand Down
10 changes: 5 additions & 5 deletions pallets/oracle/src/dia.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ impl<T: NativeCurrencyKey + XCMCurrencyConversion> Convert<OracleKey, Option<(Ve

Some((FIAT_DIA_BLOCKCHAIN.as_bytes().to_vec(), fiat_quote))
},
CurrencyId::Stellar(primitives::Asset::AlphaNum12 { .. }) |
CurrencyId::ZenlinkLPToken(_, _, _, _) => None,
CurrencyId::Stellar(primitives::Asset::AlphaNum12 { .. })
| CurrencyId::ZenlinkLPToken(_, _, _, _) => None,
CurrencyId::Token(_) => None,
},
}
Expand All @@ -93,8 +93,8 @@ impl<T: NativeCurrencyKey + XCMCurrencyConversion> Convert<(Vec<u8>, Vec<u8>), O
Some(OracleKey::ExchangeRate(CurrencyId::XCM(xcm_currency_id)))
} else if blockchain == T::native_chain() && symbol == T::native_symbol() {
Some(OracleKey::ExchangeRate(CurrencyId::Native))
} else if blockchain == STELLAR_DIA_BLOCKCHAIN.as_bytes().to_vec() &&
symbol == STELLAR_DIA_SYMBOL.as_bytes().to_vec()
} else if blockchain == STELLAR_DIA_BLOCKCHAIN.as_bytes().to_vec()
&& symbol == STELLAR_DIA_SYMBOL.as_bytes().to_vec()
{
Some(OracleKey::ExchangeRate(CurrencyId::StellarNative))
} else if blockchain == FIAT_DIA_BLOCKCHAIN.as_bytes().to_vec() {
Expand Down Expand Up @@ -143,7 +143,7 @@ where

let value = ConvertPrice::convert(coin_info.price)?;
let Some(timestamp) = ConvertMoment::convert(coin_info.last_update_timestamp) else {
return None
return None;
};

Some(TimestampedValue { value, timestamp })
Expand Down
10 changes: 5 additions & 5 deletions pallets/oracle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ impl<T: Config> Pallet<T> {
}

let current_status_is_online = Self::is_oracle_online();
let new_status_is_online = oracle_keys.len() > 0 &&
updated_items_len > 0 &&
updated_items_len == oracle_keys.len();
let new_status_is_online = oracle_keys.len() > 0
&& updated_items_len > 0
&& updated_items_len == oracle_keys.len();

if current_status_is_online != new_status_is_online {
if new_status_is_online {
Expand Down Expand Up @@ -294,7 +294,7 @@ impl<T: Config> Pallet<T> {
ext::security::ensure_parachain_status_running::<T>()?;

let Some(price) = T::DataProvider::get_no_op(&key) else {
return Err(Error::<T>::MissingExchangeRate.into())
return Err(Error::<T>::MissingExchangeRate.into());
};
Ok(price.value)
}
Expand Down Expand Up @@ -360,7 +360,7 @@ impl<T: Config> Pallet<T> {
to_decimals: u32,
) -> Result<BalanceOf<T>, DispatchError> {
if from_amount.is_zero() {
return Ok(Zero::zero())
return Ok(Zero::zero());
}

let from_amount = T::UnsignedFixedPoint::from_inner(from_amount);
Expand Down
15 changes: 9 additions & 6 deletions pallets/oracle/src/oracle_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ impl Convert<Key, Option<(Vec<u8>, Vec<u8>)>> for MockOracleKeyConvertor {
CurrencyId::XCM(token_symbol) => Some((vec![0u8], vec![token_symbol])),
CurrencyId::Native => Some((vec![2u8], vec![])),
CurrencyId::StellarNative => Some((vec![3u8], vec![])),
CurrencyId::Stellar(Asset::AlphaNum4 { code, .. }) =>
Some((vec![4u8], code.to_vec())),
CurrencyId::Stellar(Asset::AlphaNum12 { code, .. }) =>
Some((vec![5u8], code.to_vec())),
CurrencyId::ZenlinkLPToken(token1_id, token1_type, token2_id, token2_type) =>
Some((vec![6], vec![token1_id, token1_type, token2_id, token2_type])),
CurrencyId::Stellar(Asset::AlphaNum4 { code, .. }) => {
Some((vec![4u8], code.to_vec()))
},
CurrencyId::Stellar(Asset::AlphaNum12 { code, .. }) => {
Some((vec![5u8], code.to_vec()))
},
CurrencyId::ZenlinkLPToken(token1_id, token1_type, token2_id, token2_type) => {
Some((vec![6], vec![token1_id, token1_type, token2_id, token2_type]))
},
CurrencyId::Token(token_symbol) => {
let token_symbol = token_symbol.to_be_bytes().to_vec();
Some((vec![7], token_symbol))
Expand Down
Loading
Loading