Skip to content

Commit

Permalink
refactor!: relax manager constraints for ravs
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai>
  • Loading branch information
gusinacio committed Jan 30, 2025
1 parent dbae001 commit 252f456
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 56 deletions.
90 changes: 41 additions & 49 deletions tap_core/src/manager/tap_manager.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::marker::PhantomData;

use alloy::{dyn_abi::Eip712Domain, sol_types::SolStruct};
use tap_receipt::rav::Aggregate;

Expand All @@ -20,7 +18,7 @@ use crate::{
Error,
};

pub struct Manager<E, Rcpt, Rav> {
pub struct Manager<E, Rcpt> {
/// Context that implements adapters
context: E,

Expand All @@ -30,11 +28,9 @@ pub struct Manager<E, Rcpt, Rav> {
/// Struct responsible for doing checks for receipt. Ownership stays with manager allowing manager
/// to update configuration ( like minimum timestamp ).
domain_separator: Eip712Domain,

_phantom: PhantomData<Rav>,
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav> {
impl<E, Rcpt> Manager<E, Rcpt> {
/// Creates new manager with provided `adapters`, any receipts received by this manager
/// will complete all `required_checks` before being accepted or declined from RAV.
/// `starting_min_timestamp` will be used as min timestamp until the first RAV request is created.
Expand All @@ -48,27 +44,40 @@ impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav> {
context,
domain_separator,
checks: checks.into(),
_phantom: PhantomData,
}
}
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
where
E: RavStore<Rav> + SignatureChecker,
Rav: SolStruct + PartialEq<Rav> + Sync + std::fmt::Debug,
{
async fn get_previous_rav<Rav: SolStruct>(
&self,
) -> Result<Option<Eip712SignedMessage<Rav>>, Error>
where
E: RavRead<Rav>,
{
let previous_rav = self
.context
.last_rav()
.await
.map_err(|err| Error::AdapterError {
source_error: anyhow::Error::new(err),
})?;
Ok(previous_rav)
}

/// Verify `signed_rav` matches all values on `expected_rav`, and that `signed_rav` has a valid signer.
///
/// # Errors
///
/// Returns [`Error::AdapterError`] if there are any errors while storing RAV
///
pub async fn verify_and_store_rav(
pub async fn verify_and_store_rav<Rav>(
&self,
expected_rav: Rav,
signed_rav: Eip712SignedMessage<Rav>,
) -> std::result::Result<(), Error> {
) -> std::result::Result<(), Error>
where
E: RavStore<Rav> + SignatureChecker,
Rav: SolStruct + PartialEq<Rav> + Sync + std::fmt::Debug,
{
self.context
.check_signature(&signed_rav, &self.domain_separator)
.await?;
Expand All @@ -91,27 +100,10 @@ where
}
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
impl<E, Rcpt> Manager<E, Rcpt>
where
E: RavRead<Rav>,
Rav: SolStruct,
{
async fn get_previous_rav(&self) -> Result<Option<Eip712SignedMessage<Rav>>, Error> {
let previous_rav = self
.context
.last_rav()
.await
.map_err(|err| Error::AdapterError {
source_error: anyhow::Error::new(err),
})?;
Ok(previous_rav)
}
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
where
E: ReceiptRead<Rcpt> + SignatureChecker,
Rcpt: WithUniqueId + WithValueAndTimestamp + Sync,
E: ReceiptRead<Rcpt>,
Rcpt: WithUniqueId + WithValueAndTimestamp,
{
async fn collect_receipts(
&self,
Expand Down Expand Up @@ -168,14 +160,7 @@ where

Ok((checked_receipts, failed_receipts))
}
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
where
E: ReceiptRead<Rcpt> + RavRead<Rav> + SignatureChecker,
Rav: SolStruct + WithValueAndTimestamp + Clone + Aggregate<Rcpt>,
Rcpt: WithUniqueId + WithValueAndTimestamp + Sync,
{
/// Completes remaining checks on all receipts up to
/// (current time - `timestamp_buffer_ns`). Returns them in two lists
/// (valid receipts and invalid receipts) along with the expected RAV that
Expand All @@ -191,12 +176,16 @@ where
/// previous RAV is greater than the min timestamp. Caused by timestamp
/// buffer being too large, or requests coming too soon.
///
pub async fn create_rav_request(
pub async fn create_rav_request<Rav>(
&self,
ctx: &Context,
timestamp_buffer_ns: u64,
receipts_limit: Option<u64>,
) -> Result<RavRequest<Rcpt, Rav>, Error> {
) -> Result<RavRequest<Rcpt, Rav>, Error>
where
E: RavRead<Rav>,
Rav: SolStruct + WithValueAndTimestamp + Clone + Aggregate<Rcpt>,
{
let previous_rav = self.get_previous_rav().await?;
let min_timestamp_ns = previous_rav
.as_ref()
Expand All @@ -218,10 +207,9 @@ where
}
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
impl<E, Rcpt> Manager<E, Rcpt>
where
E: ReceiptDelete + RavRead<Rav>,
Rav: SolStruct + WithValueAndTimestamp,
E: ReceiptDelete,
{
/// Removes obsolete receipts from storage. Obsolete receipts are receipts
/// that are older than the last RAV, and therefore already aggregated into the RAV.
Expand All @@ -233,7 +221,11 @@ where
/// Returns [`Error::AdapterError`] if there are any errors while retrieving
/// last RAV or removing receipts
///
pub async fn remove_obsolete_receipts(&self) -> Result<(), Error> {
pub async fn remove_obsolete_receipts<Rav>(&self) -> Result<(), Error>
where
E: RavRead<Rav>,
Rav: SolStruct + WithValueAndTimestamp,
{
match self.get_previous_rav().await? {
Some(last_rav) => {
self.context
Expand All @@ -249,7 +241,7 @@ where
}
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
impl<E, Rcpt> Manager<E, Rcpt>
where
E: ReceiptStore<Rcpt>,
{
Expand Down
3 changes: 1 addition & 2 deletions tap_core/tests/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ async fn manager_verify_and_store_varying_initial_checks(
signer,
..
} = context;
let manager =
Manager::<_, _, ReceiptAggregateVoucher>::new(domain_separator.clone(), context, checks);
let manager = Manager::new(domain_separator.clone(), context, checks);

let value = 20u128;
let signed_receipt = Eip712SignedMessage::new(
Expand Down
10 changes: 5 additions & 5 deletions tap_integration_tests/tests/indexer_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ pub trait Rpc {
/// threshold is a limit to which receipt_count can increment, after reaching which RAV request is triggered.
/// aggregator_client is an HTTP client used for making JSON-RPC requests to another server.
pub struct RpcManager<E> {
manager: Arc<Manager<E, SignedReceipt, ReceiptAggregateVoucher>>, // Manager object reference counted with an Arc
receipt_count: Arc<AtomicU64>, // Thread-safe atomic counter for receipts
threshold: u64, // The count at which a RAV request will be triggered
manager: Arc<Manager<E, SignedReceipt>>, // Manager object reference counted with an Arc
receipt_count: Arc<AtomicU64>, // Thread-safe atomic counter for receipts
threshold: u64, // The count at which a RAV request will be triggered
aggregator_client: (HttpClient, String), // HTTP client for sending requests to the aggregator server
}

Expand All @@ -66,7 +66,7 @@ where
aggregate_server_api_version: String,
) -> Result<Self> {
Ok(Self {
manager: Arc::new(Manager::<E, SignedReceipt, ReceiptAggregateVoucher>::new(
manager: Arc::new(Manager::<E, SignedReceipt>::new(
domain_separator,
context,
required_checks,
Expand Down Expand Up @@ -184,7 +184,7 @@ where

// request_rav function creates a request for aggregate receipts (RAV), sends it to another server and verifies the result.
async fn request_rav<E>(
manager: &Arc<Manager<E, SignedReceipt, ReceiptAggregateVoucher>>,
manager: &Arc<Manager<E, SignedReceipt>>,
time_stamp_buffer: u64, // Buffer for timestamping, see tap_core for details
aggregator_client: &(HttpClient, String), // HttpClient for making requests to the tap_aggregator server
threshold: usize,
Expand Down

0 comments on commit 252f456

Please sign in to comment.