Skip to content

Commit

Permalink
refactor: spawn allocation based on type
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai>
  • Loading branch information
gusinacio committed Feb 4, 2025
1 parent 35046f6 commit 4bf596e
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 106 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ tonic-build = "0.12.3"

[patch.crates-io.tap_core]
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
rev = "e5546a6"
rev = "9fd4beb"

[patch.crates-io.tap_aggregator]
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
rev = "e5546a6"
rev = "9fd4beb"

[patch.crates-io.tap_graph]
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
rev = "e5546a6"
rev = "9fd4beb"
127 changes: 99 additions & 28 deletions crates/tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeV
use ractor::{Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent};
use reqwest::Url;
use sqlx::PgPool;
use tap_aggregator::grpc::tap_aggregator_client::TapAggregatorClient;
use tap_graph::SignedRav;
use tap_aggregator::grpc::{
v1::tap_aggregator_client::TapAggregatorClient as AggregatorV1,
v2::tap_aggregator_client::TapAggregatorClient as AggregatorV2,
};
use thegraph_core::alloy::{
hex::ToHexExt,
primitives::{Address, U256},
Expand All @@ -39,7 +41,7 @@ use crate::{
adaptative_concurrency::AdaptiveLimiter,
agent::unaggregated_receipts::UnaggregatedReceipts,
backoff::BackoffInfo,
tap::context::Legacy,
tap::context::{Horizon, Legacy},
tracker::{SenderFeeTracker, SimpleFeeTracker},
};

Expand Down Expand Up @@ -95,11 +97,44 @@ const INITIAL_RAV_REQUEST_CONCURRENT: usize = 1;
type RavMap = HashMap<Address, u128>;
type Balance = U256;

#[derive(Debug, Default, PartialEq, Eq)]
pub struct RavInformation {
pub allocation_id: Address,
pub value_aggregate: u128,
}

impl From<&tap_graph::SignedRav> for RavInformation {
fn from(value: &tap_graph::SignedRav) -> Self {
RavInformation {
allocation_id: value.message.allocationId,
value_aggregate: value.message.valueAggregate,
}
}
}

impl From<tap_graph::SignedRav> for RavInformation {
fn from(value: tap_graph::SignedRav) -> Self {
RavInformation {
allocation_id: value.message.allocationId,
value_aggregate: value.message.valueAggregate,
}
}
}

impl From<&tap_graph::v2::SignedRav> for RavInformation {
fn from(value: &tap_graph::v2::SignedRav) -> Self {
RavInformation {
allocation_id: value.message.allocationId,
value_aggregate: value.message.valueAggregate,
}
}
}

#[derive(Debug)]
pub enum ReceiptFees {
NewReceipt(u128, u64),
UpdateValue(UnaggregatedReceipts),
RavRequestResponse((UnaggregatedReceipts, anyhow::Result<Option<SignedRav>>)),
RavRequestResponse((UnaggregatedReceipts, anyhow::Result<Option<RavInformation>>)),
Retry,
}

Expand All @@ -110,7 +145,7 @@ pub enum SenderAccountMessage {
NewAllocationId(Address),
UpdateReceiptFees(Address, ReceiptFees),
UpdateInvalidReceiptFees(Address, UnaggregatedReceipts),
UpdateRav(SignedRav),
UpdateRav(RavInformation),
#[cfg(test)]
GetSenderFeeTracker(ractor::RpcReplyPort<SenderFeeTracker>),
#[cfg(test)]
Expand Down Expand Up @@ -174,7 +209,8 @@ pub struct State {

domain_separator: Eip712Domain,
pgpool: PgPool,
sender_aggregator: TapAggregatorClient<Channel>,
aggregator_v1: AggregatorV1<Channel>,
aggregator_v2: AggregatorV2<Channel>,

// Backoff info
backoff_info: BackoffInfo,
Expand Down Expand Up @@ -228,29 +264,49 @@ impl State {
%allocation_id,
"SenderAccount is creating allocation."
);
let args = SenderAllocationArgs::builder()
.pgpool(self.pgpool.clone())
.allocation_id(allocation_id)
.sender(self.sender)
.escrow_accounts(self.escrow_accounts.clone())
.escrow_subgraph(self.escrow_subgraph)
.domain_separator(self.domain_separator.clone())
.sender_account_ref(sender_account_ref.clone())
.sender_aggregator(self.sender_aggregator.clone())
.config(AllocationConfig::from_sender_config(self.config))
.build();

match allocation_type {
AllocationType::Legacy => {
SenderAllocation::spawn_linked(
let args = SenderAllocationArgs::builder()
.pgpool(self.pgpool.clone())
.allocation_id(allocation_id)
.sender(self.sender)
.escrow_accounts(self.escrow_accounts.clone())
.escrow_subgraph(self.escrow_subgraph)
.domain_separator(self.domain_separator.clone())
.sender_account_ref(sender_account_ref.clone())
.sender_aggregator(self.aggregator_v1.clone())
.config(AllocationConfig::from_sender_config(self.config))
.build();
SenderAllocation::<Legacy>::spawn_linked(
Some(self.format_sender_allocation(&allocation_id)),
SenderAllocation::<Legacy>::default(),
SenderAllocation::default(),
args,
sender_account_ref.get_cell(),
)
.await?;
}
AllocationType::Horizon => {
let args = SenderAllocationArgs::builder()
.pgpool(self.pgpool.clone())
.allocation_id(allocation_id)
.sender(self.sender)
.escrow_accounts(self.escrow_accounts.clone())
.escrow_subgraph(self.escrow_subgraph)
.domain_separator(self.domain_separator.clone())
.sender_account_ref(sender_account_ref.clone())
.sender_aggregator(self.aggregator_v2.clone())
.config(AllocationConfig::from_sender_config(self.config))
.build();

SenderAllocation::<Horizon>::spawn_linked(
Some(self.format_sender_allocation(&allocation_id)),
SenderAllocation::default(),
args,
sender_account_ref.get_cell(),
)
.await?;
}
AllocationType::Horizon => unimplemented!(),
}
Ok(())
}
Expand Down Expand Up @@ -308,15 +364,15 @@ impl State {
fn finalize_rav_request(
&mut self,
allocation_id: Address,
rav_response: (UnaggregatedReceipts, anyhow::Result<Option<SignedRav>>),
rav_response: (UnaggregatedReceipts, anyhow::Result<Option<RavInformation>>),
) {
self.sender_fee_tracker.finish_rav_request(allocation_id);
let (fees, rav_result) = rav_response;
match rav_result {
Ok(signed_rav) => {
self.sender_fee_tracker.ok_rav_request(allocation_id);
self.adaptive_limiter.on_success();
let rav_value = signed_rav.map_or(0, |rav| rav.message.valueAggregate);
let rav_value = signed_rav.map_or(0, |rav| rav.value_aggregate);
self.update_rav(allocation_id, rav_value);
}
Err(err) => {
Expand Down Expand Up @@ -620,7 +676,19 @@ impl Actor for SenderAccount {
let endpoint = Endpoint::new(sender_aggregator_endpoint.to_string())
.context("Failed to create an endpoint for the sender aggregator")?;

let sender_aggregator = TapAggregatorClient::connect(endpoint.clone())
let aggregator_v1 = AggregatorV1::connect(endpoint.clone())
.await
.with_context(|| {
format!(
"Failed to connect to the TapAggregator endpoint '{}'",
endpoint.uri()
)
})?;
// wiremock_grpc used for tests doesn't support Zstd compression
#[cfg(not(test))]
let aggregator_v1 = aggregator_v1.send_compressed(tonic::codec::CompressionEncoding::Zstd);

let aggregator_v2 = AggregatorV2::connect(endpoint.clone())
.await
.with_context(|| {
format!(
Expand All @@ -630,8 +698,7 @@ impl Actor for SenderAccount {
})?;
// wiremock_grpc used for tests doesn't support Zstd compression
#[cfg(not(test))]
let sender_aggregator =
sender_aggregator.send_compressed(tonic::codec::CompressionEncoding::Zstd);
let aggregator_v2 = aggregator_v2.send_compressed(tonic::codec::CompressionEncoding::Zstd);
let state = State {
prefix,
sender_fee_tracker: SenderFeeTracker::new(config.rav_request_buffer),
Expand All @@ -651,7 +718,8 @@ impl Actor for SenderAccount {
network_subgraph,
domain_separator,
pgpool,
sender_aggregator,
aggregator_v1,
aggregator_v2,
backoff_info: BackoffInfo::default(),
config,
};
Expand Down Expand Up @@ -692,8 +760,11 @@ impl Actor for SenderAccount {
);

match message {
SenderAccountMessage::UpdateRav(rav) => {
state.update_rav(rav.message.allocationId, rav.message.valueAggregate);
SenderAccountMessage::UpdateRav(RavInformation {
allocation_id,
value_aggregate,
}) => {
state.update_rav(allocation_id, value_aggregate);

let should_deny = !state.denied && state.deny_condition_reached();
if should_deny {
Expand Down
Loading

0 comments on commit 4bf596e

Please sign in to comment.