Skip to content

Commit

Permalink
Gusinacio/dont crash sender account (#190)
Browse files Browse the repository at this point in the history
* feat(tap-agent): ignore blocked allocation ids

Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai>

* fix(tap-agent): retry last rav forever

Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai>

* fix(tap-agent): never fail sender account, add block allocation when closing it

Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai>

* docs(tap-agent): update comments

Co-authored-by: Alexis Asseman <alexis@semiotic.ai>

---------

Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai>
Co-authored-by: Alexis Asseman <alexis@semiotic.ai>
  • Loading branch information
gusinacio and aasseman authored May 24, 2024
1 parent 11bc9d1 commit d3eafbe
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 27 deletions.
31 changes: 21 additions & 10 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ impl State {

async fn rav_requester_single(&mut self) -> Result<()> {
let Some(allocation_id) = self.sender_fee_tracker.get_heaviest_allocation_id() else {
anyhow::bail!("Error while getting the heaviest allocation because none has unaggregated fees tracked");
anyhow::bail!(
"Error while getting the heaviest allocation because \
no unblocked allocation has enough unaggregated fees tracked"
);
};
let sender_allocation_id = self.format_sender_allocation(&allocation_id);
let allocation = ActorRef::<SenderAllocationMessage>::where_is(sender_allocation_id);
Expand All @@ -147,7 +150,9 @@ impl State {
);
};
// we call and wait for the response so we don't process anymore update
let (fees, rav) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest)?;
let Ok((fees, rav)) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else {
anyhow::bail!("Error while sending and waiting message for actor {allocation_id}");
};

// update rav tracker
self.rav_tracker.update(
Expand Down Expand Up @@ -470,14 +475,13 @@ impl Actor for SenderAccount {
trigger_value = state.config.tap.rav_request_trigger_value,
"Total fee greater than the trigger value. Triggering RAV request"
);
// There are only 3 scenarios where this can fail:
// - No allocation ids
// - The SenderAllocation could not be found
// - The SenderAllocation could not process the message
// In any case, we want to respawn this whole actor
// and respawn all its children actors. Thus, we can safely
// panic the actor by using ?
state.rav_requester_single().await?;
// In case we fail, we want our actor to keep running
if let Err(err) = state.rav_requester_single().await {
tracing::error!(
error = %err,
"There was an error while requesting a RAV."
);
}
}

// Maybe allow the sender right after the potential RAV request. This way, the
Expand All @@ -502,6 +506,9 @@ impl Actor for SenderAccount {
state.format_sender_allocation(allocation_id),
) {
tracing::trace!(%allocation_id, "SenderAccount shutting down SenderAllocation");
// we can not send a rav request to this allocation
// because it's gonna trigger the last rav
state.sender_fee_tracker.block_allocation_id(*allocation_id);
sender_handle.stop(None);
}
}
Expand Down Expand Up @@ -594,6 +601,10 @@ impl Actor for SenderAccount {

let tracker = &mut state.sender_fee_tracker;
tracker.update(allocation_id, 0);
// clean up hashset
state
.sender_fee_tracker
.unblock_allocation_id(allocation_id);
// rav tracker is not updated because it's still not redeemed
}
SupervisionEvent::ActorPanicked(cell, error) => {
Expand Down
26 changes: 9 additions & 17 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,27 +194,18 @@ impl Actor for SenderAllocation {
);
// Request a RAV and mark the allocation as final.
while state.unaggregated_fees.value > 0 {
if state.request_rav().await.is_err() {
break;
if let Err(err) = state.request_rav().await {
error!(error = %err, "There was an error while requesting rav. Retrying in 30 seconds...");
tokio::time::sleep(Duration::from_secs(30)).await;
}
}
if state.unaggregated_fees.value > 0 {
Err(anyhow!(
"There are still pending unaggregated_fees for sender {} and allocation {}.\
Not marking as last.",
state.sender,
state.allocation_id
))?;
}

state.mark_rav_last().await.inspect_err(|e| {
error!(
"Error while marking allocation {} as last for sender {}: {}",
state.allocation_id, state.sender, e
);
})?;
while let Err(err) = state.mark_rav_last().await {
error!(error = %err, %state.allocation_id, %state.sender, "Error while marking allocation last. Retrying in 30 seconds...");
tokio::time::sleep(Duration::from_secs(30)).await;
}

//Since this is only triggered after allocation is closed will be counted here
// Since this is only triggered after allocation is closed will be counted here
CLOSED_SENDER_ALLOCATIONS.inc();

Ok(())
Expand Down Expand Up @@ -250,6 +241,7 @@ impl Actor for SenderAllocation {
);
u128::MAX
});
// it's fine to crash the actor, could not send a message to its parent
state
.sender_account_ref
.cast(SenderAccountMessage::UpdateReceiptFees(
Expand Down
27 changes: 27 additions & 0 deletions tap-agent/src/agent/sender_fee_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ use tracing::error;
pub struct SenderFeeTracker {
id_to_fee: HashMap<Address, u128>,
total_fee: u128,
// there are some allocations that we don't want it to be
// heaviest allocation, because they are already marked for finalization,
// and thus requesting RAVs on their own in their `post_stop` routine.
blocked_addresses: HashSet<Address>,
}

impl SenderFeeTracker {
Expand All @@ -32,10 +36,19 @@ impl SenderFeeTracker {
}
}

pub fn block_allocation_id(&mut self, address: Address) {
self.blocked_addresses.insert(address);
}

pub fn unblock_allocation_id(&mut self, address: Address) {
self.blocked_addresses.remove(&address);
}

pub fn get_heaviest_allocation_id(&self) -> Option<Address> {
// just loop over and get the biggest fee
self.id_to_fee
.iter()
.filter(|(addr, _)| !self.blocked_addresses.contains(*addr))
.fold(None, |acc: Option<(&Address, u128)>, (addr, fee)| {
if let Some((_, max_fee)) = acc {
if *fee > max_fee {
Expand Down Expand Up @@ -82,10 +95,24 @@ mod tests {
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0));
assert_eq!(tracker.get_total_fee(), 10);

tracker.block_allocation_id(allocation_id_0);
assert_eq!(tracker.get_heaviest_allocation_id(), None);
assert_eq!(tracker.get_total_fee(), 10);

tracker.unblock_allocation_id(allocation_id_0);
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0));

tracker.update(allocation_id_2, 20);
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2));
assert_eq!(tracker.get_total_fee(), 30);

tracker.block_allocation_id(allocation_id_2);
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0));
assert_eq!(tracker.get_total_fee(), 30);

tracker.unblock_allocation_id(allocation_id_2);
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2));

tracker.update(allocation_id_1, 30);
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1));
assert_eq!(tracker.get_total_fee(), 60);
Expand Down

0 comments on commit d3eafbe

Please sign in to comment.