From d3eafbe464b37dd633f336350e2a6041df54144f Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Fri, 24 May 2024 19:22:00 -0300 Subject: [PATCH] Gusinacio/dont crash sender account (#190) * feat(tap-agent): ignore blocked allocation ids Signed-off-by: Gustavo Inacio * fix(tap-agent): retry last rav forever Signed-off-by: Gustavo Inacio * fix(tap-agent): never fail sender account, add block allocation when closing it Signed-off-by: Gustavo Inacio * docs(tap-agent): update comments Co-authored-by: Alexis Asseman --------- Signed-off-by: Gustavo Inacio Co-authored-by: Alexis Asseman --- tap-agent/src/agent/sender_account.rs | 31 +++++++++++++++-------- tap-agent/src/agent/sender_allocation.rs | 26 +++++++------------ tap-agent/src/agent/sender_fee_tracker.rs | 27 ++++++++++++++++++++ 3 files changed, 57 insertions(+), 27 deletions(-) diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 8f156033..bee3ae9a 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -136,7 +136,10 @@ impl State { async fn rav_requester_single(&mut self) -> Result<()> { let Some(allocation_id) = self.sender_fee_tracker.get_heaviest_allocation_id() else { - anyhow::bail!("Error while getting the heaviest allocation because none has unaggregated fees tracked"); + anyhow::bail!( + "Error while getting the heaviest allocation because \ + no unblocked allocation has enough unaggregated fees tracked" + ); }; let sender_allocation_id = self.format_sender_allocation(&allocation_id); let allocation = ActorRef::::where_is(sender_allocation_id); @@ -147,7 +150,9 @@ impl State { ); }; // we call and wait for the response so we don't process anymore update - let (fees, rav) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest)?; + let Ok((fees, rav)) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else { + anyhow::bail!("Error while sending and waiting message for actor {allocation_id}"); + }; 
// update rav tracker self.rav_tracker.update( @@ -470,14 +475,13 @@ impl Actor for SenderAccount { trigger_value = state.config.tap.rav_request_trigger_value, "Total fee greater than the trigger value. Triggering RAV request" ); - // There are only 3 scenarios where this can fail: - // - No allocation ids - // - The SenderAllocation could not be found - // - The SenderAllocation could not process the message - // In any case, we want to respawn this whole actor - // and respawn all its children actors. Thus, we can safely - // panic the actor by using ? - state.rav_requester_single().await?; + // In case we fail, we want our actor to keep running + if let Err(err) = state.rav_requester_single().await { + tracing::error!( + error = %err, + "There was an error while requesting a RAV." + ); + } } // Maybe allow the sender right after the potential RAV request. This way, the @@ -502,6 +506,9 @@ impl Actor for SenderAccount { state.format_sender_allocation(allocation_id), ) { tracing::trace!(%allocation_id, "SenderAccount shutting down SenderAllocation"); + // we can not send a rav request to this allocation + // because it's gonna trigger the last rav + state.sender_fee_tracker.block_allocation_id(*allocation_id); sender_handle.stop(None); } } @@ -594,6 +601,10 @@ impl Actor for SenderAccount { let tracker = &mut state.sender_fee_tracker; tracker.update(allocation_id, 0); + // clean up hashset + state + .sender_fee_tracker + .unblock_allocation_id(allocation_id); // rav tracker is not updated because it's still not redeemed } SupervisionEvent::ActorPanicked(cell, error) => { diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 71176414..61cd42d0 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -194,27 +194,18 @@ impl Actor for SenderAllocation { ); // Request a RAV and mark the allocation as final. 
while state.unaggregated_fees.value > 0 { - if state.request_rav().await.is_err() { - break; + if let Err(err) = state.request_rav().await { + error!(error = %err, "There was an error while requesting rav. Retrying in 30 seconds..."); + tokio::time::sleep(Duration::from_secs(30)).await; } } - if state.unaggregated_fees.value > 0 { - Err(anyhow!( - "There are still pending unaggregated_fees for sender {} and allocation {}.\ - Not marking as last.", - state.sender, - state.allocation_id - ))?; - } - state.mark_rav_last().await.inspect_err(|e| { - error!( - "Error while marking allocation {} as last for sender {}: {}", - state.allocation_id, state.sender, e - ); - })?; + while let Err(err) = state.mark_rav_last().await { + error!(error = %err, %state.allocation_id, %state.sender, "Error while marking allocation last. Retrying in 30 seconds..."); + tokio::time::sleep(Duration::from_secs(30)).await; + } - //Since this is only triggered after allocation is closed will be counted here + // Since this is only triggered after allocation is closed will be counted here CLOSED_SENDER_ALLOCATIONS.inc(); Ok(()) @@ -250,6 +241,7 @@ impl Actor for SenderAllocation { ); u128::MAX }); + // it's fine to crash the actor, could not send a message to its parent state .sender_account_ref .cast(SenderAccountMessage::UpdateReceiptFees( diff --git a/tap-agent/src/agent/sender_fee_tracker.rs b/tap-agent/src/agent/sender_fee_tracker.rs index 5ab4aaf9..d03792b6 100644 --- a/tap-agent/src/agent/sender_fee_tracker.rs +++ b/tap-agent/src/agent/sender_fee_tracker.rs @@ -9,6 +9,10 @@ use tracing::error; pub struct SenderFeeTracker { id_to_fee: HashMap<Address, u128>, total_fee: u128, + // there are some allocations that we don't want it to be + // heaviest allocation, because they are already marked for finalization, + // and thus requesting RAVs on their own in their `post_stop` routine. + blocked_addresses: HashSet
<Address>, } impl SenderFeeTracker { @@ -32,10 +36,19 @@ impl SenderFeeTracker { } } + pub fn block_allocation_id(&mut self, address: Address) { + self.blocked_addresses.insert(address); + } + + pub fn unblock_allocation_id(&mut self, address: Address) { + self.blocked_addresses.remove(&address); + } + pub fn get_heaviest_allocation_id(&self) -> Option<Address>
{ // just loop over and get the biggest fee self.id_to_fee .iter() + .filter(|(addr, _)| !self.blocked_addresses.contains(*addr)) .fold(None, |acc: Option<(&Address, u128)>, (addr, fee)| { if let Some((_, max_fee)) = acc { if *fee > max_fee { @@ -82,10 +95,24 @@ mod tests { assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); assert_eq!(tracker.get_total_fee(), 10); + tracker.block_allocation_id(allocation_id_0); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_total_fee(), 10); + + tracker.unblock_allocation_id(allocation_id_0); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + tracker.update(allocation_id_2, 20); assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); assert_eq!(tracker.get_total_fee(), 30); + tracker.block_allocation_id(allocation_id_2); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0)); + assert_eq!(tracker.get_total_fee(), 30); + + tracker.unblock_allocation_id(allocation_id_2); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + tracker.update(allocation_id_1, 30); assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); assert_eq!(tracker.get_total_fee(), 60);