Skip to content

Commit

Permalink
make sure to remove in prog on the right err
Browse files Browse the repository at this point in the history
  • Loading branch information
MujkicA committed Dec 16, 2023
1 parent 21f09df commit 84a126d
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 64 deletions.
2 changes: 1 addition & 1 deletion src/dispense_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl DispenseTracker {
}
}

pub fn is_tracked(&self, address: &Address) -> bool {
pub fn has_tracked(&self, address: &Address) -> bool {
self.tracked.get(address).is_some() || self.in_progress.contains(address)
}
}
154 changes: 91 additions & 63 deletions src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use axum::{

use fuel_core_client::client::FuelClient;
use fuel_tx::UtxoId;
use fuel_types::{Address, AssetId};
use fuels_accounts::{Account, Signer, ViewOnlyAccount};
use fuel_types::{Address, AssetId, Bytes32};
use fuels_accounts::{wallet::WalletUnlocked, Account, Signer, ViewOnlyAccount};
use fuels_core::types::transaction::{Transaction, TxPolicies};
use fuels_core::types::transaction_builders::BuildableTransaction;
use fuels_core::types::{
Expand Down Expand Up @@ -120,20 +120,83 @@ impl IntoResponse for DispenseInfoResponse {
}
}

async fn check_and_mark_dispense_limit(
fn check_and_mark_dispense_limit(
dispense_tracker: &SharedDispenseTracker,
address: Address,
interval: u64,
) -> bool {
let mut dispense_tracker = dispense_tracker.lock().unwrap();
dispense_tracker.evict_expired_entries(interval);
) -> Result<(), DispenseError> {
let mut tracker = dispense_tracker.lock().unwrap();
tracker.evict_expired_entries(interval);

if !dispense_tracker.is_tracked(&address) {
dispense_tracker.mark_in_progress(address);
return false;
if tracker.has_tracked(&address) {
return Err(error(
"Account has already received assets today".to_string(),
StatusCode::TOO_MANY_REQUESTS,
));
}

true
tracker.mark_in_progress(address);
Ok(())
}

async fn get_coin_output(wallet: &WalletUnlocked) -> Result<CoinOutput, DispenseError> {
let resources = wallet
.get_spendable_resources(AssetId::BASE, THE_BIGGEST_AMOUNT)
.await
.map_err(|e| {
error(
format!("Failed to get resources: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
})?;

let coin_output = resources
.into_iter()
.filter_map(|coin| match coin {
CoinType::Coin(coin) => Some(CoinOutput {
utxo_id: coin.utxo_id,
owner: coin.owner.into(),
amount: coin.amount,
}),
_ => None,
})
.last()
.ok_or_else(|| {
error(
"The wallet is empty".to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
)
})?;

Ok(coin_output)
}

async fn submit_tx_with_timeout(
client: &FuelClient,
tx_id: &Bytes32,
timeout: u64,
) -> Result<(), DispenseError> {
tokio::time::timeout(
Duration::from_secs(timeout),
client.await_transaction_commit(tx_id),
)
.await
.map(|r| {
r.map_err(|e| {
error(
format!("Failed to submit transaction with error: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
})
})
.map_err(|e| {
error(
format!("Got a timeout during transaction submission: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
})??;

Ok(())
}

#[tracing::instrument(skip(wallet, config))]
Expand All @@ -158,15 +221,6 @@ pub async fn dispense_tokens(
));
}?;

if check_and_mark_dispense_limit(&dispense_tracker, address, config.dispense_limit_interval)
.await
{
return Err(error(
"Account has already received assets today".to_string(),
StatusCode::TOO_MANY_REQUESTS,
));
};

// verify captcha
if let Some(s) = config.captcha_secret.clone() {
recaptcha::verify(s.expose_secret(), input.captcha.as_str(), None)
Expand All @@ -180,39 +234,26 @@ pub async fn dispense_tokens(
})?;
}

let provider = wallet.provider().expect("client provider");
check_and_mark_dispense_limit(&dispense_tracker, address, config.dispense_limit_interval)?;
let cleanup = || {
dispense_tracker
.lock()
.unwrap()
.remove_in_progress(&address);
};

let provider = wallet.provider().expect("client provider");
let mut tx_id;

loop {
let mut guard = state.lock().await;
let coin_output = if let Some(previous_coin_output) = &guard.last_output {
*previous_coin_output
} else {
wallet
.get_spendable_resources(AssetId::BASE, THE_BIGGEST_AMOUNT)
.await
.map_err(|e| {
dispense_tracker
.lock()
.unwrap()
.remove_in_progress(&address);
error(
format!("Failed to get resources: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
})?
.into_iter()
.filter_map(|coin| match coin {
CoinType::Coin(coin) => Some(CoinOutput {
utxo_id: coin.utxo_id,
owner: coin.owner.into(),
amount: coin.amount,
}),
_ => None,
})
.last()
.expect("The wallet is empty")
get_coin_output(&wallet).await.map_err(|e| {
cleanup();
e
})?
};

let coin_type = CoinType::Coin(Coin {
Expand Down Expand Up @@ -286,25 +327,12 @@ pub async fn dispense_tokens(
};
}

tokio::time::timeout(
Duration::from_secs(config.timeout),
client.await_transaction_commit(&tx_id),
)
.await
.map(|r| {
r.map_err(|e| {
error(
format!("Failed to submit transaction with error: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
})
})
.map_err(|e| {
error(
format!("Got a timeout during transaction submission: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
})??;
submit_tx_with_timeout(&client, &tx_id, config.timeout)
.await
.map_err(|e| {
cleanup();
e
})?;

info!(
"dispensed {} tokens to {:#x}",
Expand Down

0 comments on commit 84a126d

Please sign in to comment.