Skip to content

Commit

Permalink
feat: track filters in state for eth_getFilterChanges (#448)
Browse files Browse the repository at this point in the history
* Create .deepsource.toml

* feat: implement eth_getFilterLogs

* fixup! fmt

* fix: get_filter_changes return type

* fix merge conflicts

* track filters in state
  • Loading branch information
eshaan7 authored Dec 17, 2024
1 parent cd17cd4 commit ebb92c4
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 29 deletions.
4 changes: 2 additions & 2 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use std::time::Duration;

use alloy::primitives::{Address, Bytes, B256, U256};
use alloy::rpc::types::{Filter, Log, SyncStatus};
use alloy::rpc::types::{Filter, FilterChanges, Log, SyncStatus};
use eyre::Result;
use tracing::{info, warn};

Expand Down Expand Up @@ -131,7 +131,7 @@ impl<N: NetworkSpec, C: Consensus<N::TransactionResponse>> Client<N, C> {
self.node.get_logs(filter).await
}

pub async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>> {
pub async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges> {
self.node.get_filter_changes(filter_id).await
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/client/node.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use alloy::primitives::{Address, Bytes, B256, U256};
use alloy::rpc::types::{Filter, Log, SyncInfo, SyncStatus};
use alloy::rpc::types::{Filter, FilterChanges, Log, SyncInfo, SyncStatus};
use eyre::{eyre, Result};

use crate::consensus::Consensus;
Expand Down Expand Up @@ -166,7 +166,7 @@ impl<N: NetworkSpec, C: Consensus<N::TransactionResponse>> Node<N, C> {
format!("helios-{}", helios_version)
}

pub async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>> {
pub async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges> {
self.execution.get_filter_changes(filter_id).await
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{fmt::Display, net::SocketAddr, sync::Arc};
use alloy::network::{ReceiptResponse, TransactionResponse};
use alloy::primitives::{Address, Bytes, B256, U256, U64};
use alloy::rpc::json_rpc::RpcObject;
use alloy::rpc::types::{Filter, Log, SyncStatus};
use alloy::rpc::types::{Filter, FilterChanges, Log, SyncStatus};
use eyre::Result;
use jsonrpsee::{
core::{async_trait, server::Methods},
Expand Down Expand Up @@ -124,7 +124,7 @@ trait EthRpc<TX: TransactionResponse + RpcObject, TXR: RpcObject, R: ReceiptResp
#[method(name = "getLogs")]
async fn get_logs(&self, filter: Filter) -> Result<Vec<Log>, ErrorObjectOwned>;
#[method(name = "getFilterChanges")]
async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>, ErrorObjectOwned>;
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges, ErrorObjectOwned>;
#[method(name = "getFilterLogs")]
async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>, ErrorObjectOwned>;
#[method(name = "uninstallFilter")]
Expand Down Expand Up @@ -333,7 +333,7 @@ impl<N: NetworkSpec, C: Consensus<N::TransactionResponse>>
convert_err(self.node.get_logs(&filter).await)
}

async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>, ErrorObjectOwned> {
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges, ErrorObjectOwned> {
convert_err(self.node.get_filter_changes(filter_id).await)
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/execution/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub enum ExecutionError {
BlockNotFound(BlockTag),
#[error("receipts root mismatch for block: {0}")]
BlockReceiptsRootMismatch(BlockTag),
#[error("filter not found: 0x{0:x}")]
FilterNotFound(U256),
}

/// Errors that can occur during evm.rs calls
Expand Down
88 changes: 74 additions & 14 deletions core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet};
use alloy::network::ReceiptResponse;
use alloy::primitives::{keccak256, Address, B256, U256};
use alloy::rlp::encode;
use alloy::rpc::types::{Filter, Log};
use alloy::rpc::types::{Filter, FilterChanges, Log};
use constants::{BLOB_BASE_FEE_UPDATE_FRACTION, MIN_BASE_FEE_PER_BLOB_GAS};
use eyre::Result;
use futures::future::try_join_all;
Expand All @@ -18,7 +18,7 @@ use self::constants::MAX_SUPPORTED_LOGS_NUMBER;
use self::errors::ExecutionError;
use self::proof::{encode_account, verify_proof};
use self::rpc::ExecutionRpc;
use self::state::State;
use self::state::{FilterType, State};
use self::types::Account;

pub mod constants;
Expand Down Expand Up @@ -337,15 +337,53 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
Ok(logs)
}

pub async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>> {
let logs = self.rpc.get_filter_changes(filter_id).await?;
if logs.len() > MAX_SUPPORTED_LOGS_NUMBER {
return Err(
ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(),
);
}
self.verify_logs(&logs).await?;
Ok(logs)
pub async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges> {
let filter_type = self.state.get_filter(&filter_id).await;

Ok(match &filter_type {
None => {
// only concerned with filters created via helios
return Err(ExecutionError::FilterNotFound(filter_id).into());
}
Some(FilterType::Logs) => {
// underlying RPC takes care of keeping track of changes
let filter_changes = self.rpc.get_filter_changes(filter_id).await?;
let logs = filter_changes.as_logs().unwrap_or(&[]);
if logs.len() > MAX_SUPPORTED_LOGS_NUMBER {
return Err(ExecutionError::TooManyLogsToProve(
logs.len(),
MAX_SUPPORTED_LOGS_NUMBER,
)
.into());
}
self.verify_logs(logs).await?;
FilterChanges::Logs(logs.to_vec())
}
Some(FilterType::NewBlock(last_block_num)) => {
let blocks = self
.state
.get_blocks_after(BlockTag::Number(*last_block_num))
.await;
if !blocks.is_empty() {
// keep track of the last block number in state
// so next call can filter starting from the prev call's (last block number + 1)
self.state
.push_filter(
filter_id,
FilterType::NewBlock(blocks.last().unwrap().number.to()),
)
.await;
}
let block_hashes = blocks.into_iter().map(|b| b.hash).collect();
FilterChanges::Hashes(block_hashes)
}
Some(FilterType::PendingTransactions) => {
// underlying RPC takes care of keeping track of changes
let filter_changes = self.rpc.get_filter_changes(filter_id).await?;
let tx_hashes = filter_changes.as_hashes().unwrap_or(&[]);
FilterChanges::Hashes(tx_hashes.to_vec())
}
})
}

pub async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>> {
Expand All @@ -360,6 +398,8 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
}

pub async fn uninstall_filter(&self, filter_id: U256) -> Result<bool> {
// remove the filter from the state
self.state.remove_filter(&filter_id).await;
self.rpc.uninstall_filter(filter_id).await
}

Expand All @@ -378,15 +418,35 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
} else {
filter
};
self.rpc.new_filter(&filter).await
let filter_id = self.rpc.new_filter(&filter).await?;

// record the filter in the state
self.state.push_filter(filter_id, FilterType::Logs).await;

Ok(filter_id)
}

pub async fn new_block_filter(&self) -> Result<U256> {
self.rpc.new_block_filter().await
let filter_id = self.rpc.new_block_filter().await?;

// record the filter in the state
let latest_block_num = self.state.latest_block_number().await.unwrap_or(1);
self.state
.push_filter(filter_id, FilterType::NewBlock(latest_block_num))
.await;

Ok(filter_id)
}

pub async fn new_pending_transaction_filter(&self) -> Result<U256> {
self.rpc.new_pending_transaction_filter().await
let filter_id = self.rpc.new_pending_transaction_filter().await?;

// record the filter in the state
self.state
.push_filter(filter_id, FilterType::PendingTransactions)
.await;

Ok(filter_id)
}

async fn verify_logs(&self, logs: &[Log]) -> Result<()> {
Expand Down
10 changes: 6 additions & 4 deletions core/src/execution/rpc/http_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use alloy::eips::BlockNumberOrTag;
use alloy::primitives::{Address, B256, U256};
use alloy::providers::{Provider, ProviderBuilder, RootProvider};
use alloy::rpc::client::ClientBuilder;
use alloy::rpc::types::{BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, Log};
use alloy::rpc::types::{
BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, FilterChanges, Log,
};
use alloy::transports::http::Http;
use alloy::transports::layers::{RetryBackoffLayer, RetryBackoffService};
use async_trait::async_trait;
Expand Down Expand Up @@ -150,10 +152,10 @@ impl<N: NetworkSpec> ExecutionRpc<N> for HttpRpc<N> {
.map_err(|e| RpcError::new("get_logs", e))?)
}

async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>> {
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges> {
Ok(self
.provider
.get_filter_changes(filter_id)
.get_filter_changes_dyn(filter_id)
.await
.map_err(|e| RpcError::new("get_filter_changes", e))?)
}
Expand Down Expand Up @@ -193,7 +195,7 @@ impl<N: NetworkSpec> ExecutionRpc<N> for HttpRpc<N> {
async fn new_pending_transaction_filter(&self) -> Result<U256> {
Ok(self
.provider
.new_pending_transactions_filter(true)
.new_pending_transactions_filter(false)
.await
.map_err(|e| RpcError::new("new_pending_transaction_filter", e))?)
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/execution/rpc/mock_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fs::read_to_string, path::PathBuf, str::FromStr};

use alloy::primitives::{Address, B256, U256};
use alloy::rpc::types::{
AccessList, BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, Log,
AccessList, BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, FilterChanges, Log,
};
use async_trait::async_trait;
use eyre::{eyre, Result};
Expand Down Expand Up @@ -74,7 +74,7 @@ impl<N: NetworkSpec> ExecutionRpc<N> for MockRpc {
Ok(serde_json::from_str(&logs)?)
}

async fn get_filter_changes(&self, _filter_id: U256) -> Result<Vec<Log>> {
async fn get_filter_changes(&self, _filter_id: U256) -> Result<FilterChanges> {
let logs = read_to_string(self.path.join("logs.json"))?;
Ok(serde_json::from_str(&logs)?)
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/execution/rpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloy::primitives::{Address, B256, U256};
use alloy::rpc::types::{
AccessList, BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, Log,
AccessList, BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, FilterChanges, Log,
};
use async_trait::async_trait;
use eyre::Result;
Expand Down Expand Up @@ -37,7 +37,7 @@ pub trait ExecutionRpc<N: NetworkSpec>: Send + Clone + Sync + 'static {
async fn get_block_receipts(&self, block: BlockTag) -> Result<Option<Vec<N::ReceiptResponse>>>;
async fn get_transaction(&self, tx_hash: B256) -> Result<Option<N::TransactionResponse>>;
async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>>;
async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>>;
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges>;
async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>>;
async fn uninstall_filter(&self, filter_id: U256) -> Result<bool>;
async fn new_filter(&self, filter: &Filter) -> Result<U256>;
Expand Down
41 changes: 41 additions & 0 deletions core/src/execution/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> State<N, R> {
.cloned()
}

pub async fn get_blocks_after(&self, tag: BlockTag) -> Vec<Block<N::TransactionResponse>> {
let start_block = self.get_block(tag).await;
if start_block.is_none() {
return vec![];
}
let start_block_num = start_block.unwrap().number.to::<u64>();
let blocks = self
.inner
.read()
.await
.blocks
.range((start_block_num + 1)..)
.map(|(_, v)| v.clone())
.collect();
blocks
}

// transaction fetch

pub async fn get_transaction(&self, hash: B256) -> Option<N::TransactionResponse> {
Expand Down Expand Up @@ -157,6 +174,20 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> State<N, R> {
self.get_block(tag).await.map(|block| block.miner)
}

// filter

pub async fn push_filter(&self, id: U256, filter: FilterType) {
self.inner.write().await.filters.insert(id, filter);
}

pub async fn remove_filter(&self, id: &U256) -> bool {
self.inner.write().await.filters.remove(id).is_some()
}

pub async fn get_filter(&self, id: &U256) -> Option<FilterType> {
self.inner.read().await.filters.get(id).cloned()
}

// misc

pub async fn latest_block_number(&self) -> Option<u64> {
Expand All @@ -175,6 +206,7 @@ struct Inner<N: NetworkSpec, R: ExecutionRpc<N>> {
finalized_block: Option<Block<N::TransactionResponse>>,
hashes: HashMap<B256, u64>,
txs: HashMap<B256, TransactionLocation>,
filters: HashMap<U256, FilterType>,
history_length: u64,
rpc: R,
}
Expand All @@ -187,6 +219,7 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> Inner<N, R> {
finalized_block: None,
hashes: HashMap::default(),
txs: HashMap::default(),
filters: HashMap::default(),
rpc,
}
}
Expand Down Expand Up @@ -320,3 +353,11 @@ struct TransactionLocation {
block: u64,
index: usize,
}

#[derive(Clone)]
pub enum FilterType {
Logs,
// block number when the filter was created or last queried
NewBlock(u64),
PendingTransactions,
}

0 comments on commit ebb92c4

Please sign in to comment.