Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure logs returned by RPC match filter #507

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/execution/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub enum ExecutionError {
BlockReceiptsRootMismatch(BlockTag),
#[error("filter not found: 0x{0:x}")]
FilterNotFound(U256),
#[error("log does not match filter")]
LogDoesNotMatchFilter(),
}

/// Errors that can occur during evm.rs calls
Expand Down
77 changes: 67 additions & 10 deletions core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(),
);
}

self.ensure_logs_match_filter(&logs, &filter).await?;
self.verify_logs(&logs).await?;
Ok(logs)
}
Expand All @@ -345,7 +345,7 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
// only concerned with filters created via helios
return Err(ExecutionError::FilterNotFound(filter_id).into());
}
Some(FilterType::Logs) => {
Some(FilterType::Logs(filter)) => {
// underlying RPC takes care of keeping track of changes
let filter_changes = self.rpc.get_filter_changes(filter_id).await?;
let logs = filter_changes.as_logs().unwrap_or(&[]);
Expand All @@ -356,6 +356,7 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
)
.into());
}
self.ensure_logs_match_filter(logs, filter).await?;
self.verify_logs(logs).await?;
FilterChanges::Logs(logs.to_vec())
}
Expand Down Expand Up @@ -387,14 +388,27 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
}

pub async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>> {
let logs = self.rpc.get_filter_logs(filter_id).await?;
if logs.len() > MAX_SUPPORTED_LOGS_NUMBER {
return Err(
ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(),
);
let filter_type = self.state.get_filter(&filter_id).await;

match &filter_type {
Some(FilterType::Logs(filter)) => {
let logs = self.rpc.get_filter_logs(filter_id).await?;
if logs.len() > MAX_SUPPORTED_LOGS_NUMBER {
return Err(ExecutionError::TooManyLogsToProve(
logs.len(),
MAX_SUPPORTED_LOGS_NUMBER,
)
.into());
}
self.ensure_logs_match_filter(&logs, filter).await?;
self.verify_logs(&logs).await?;
Ok(logs)
}
_ => {
// only concerned with filters created via helios
return Err(ExecutionError::FilterNotFound(filter_id).into());
}
}
self.verify_logs(&logs).await?;
Ok(logs)
}

pub async fn uninstall_filter(&self, filter_id: U256) -> Result<bool> {
Expand All @@ -421,7 +435,9 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
let filter_id = self.rpc.new_filter(&filter).await?;

// record the filter in the state
self.state.push_filter(filter_id, FilterType::Logs).await;
self.state
.push_filter(filter_id, FilterType::Logs(filter))
.await;

Ok(filter_id)
}
Expand Down Expand Up @@ -449,6 +465,47 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
Ok(filter_id)
}

/// Ensure that each log entry in the given array of logs match the given filter.
async fn ensure_logs_match_filter(&self, logs: &[Log], filter: &Filter) -> Result<()> {
fn log_matches_filter(log: &Log, filter: &Filter) -> bool {
if let Some(block_hash) = filter.get_block_hash() {
if log.block_hash.unwrap() != block_hash {
return false;
}
}
if let Some(from_block) = filter.get_from_block() {
if log.block_number.unwrap() < from_block {
return false;
}
}
if let Some(to_block) = filter.get_to_block() {
if log.block_number.unwrap() > to_block {
return false;
}
}
if !filter.address.matches(&log.address()) {
return false;
}
for (i, topic) in filter.topics.iter().enumerate() {
if let Some(log_topic) = log.topics().get(i) {
if !topic.matches(log_topic) {
return false;
}
}
}
true
}
for log in logs {
if !log_matches_filter(log, filter) {
return Err(ExecutionError::LogDoesNotMatchFilter().into());
}
}
Ok(())
}

/// Verify the integrity of each log entry in the given array of logs by
/// checking its inclusion in the corresponding transaction receipt
/// and verifying the transaction receipt itself against the block's receipt root.
async fn verify_logs(&self, logs: &[Log]) -> Result<()> {
// Collect all (unique) block numbers
let block_nums = logs
Expand Down
5 changes: 3 additions & 2 deletions core/src/execution/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
consensus::BlockHeader,
network::{primitives::HeaderResponse, BlockResponse},
primitives::{Address, B256, U256},
rpc::types::BlockTransactions,
rpc::types::{BlockTransactions, Filter},
};
use eyre::{eyre, Result};
use tokio::{
Expand Down Expand Up @@ -303,7 +303,7 @@
}

fn prune_before(&mut self, n: u64) {
loop {
[CI annotation — GitHub Actions / clippy — check failure on line 306 in core/src/execution/state.rs: this loop could be written as a `while let` loop]
if let Some((oldest, _)) = self.blocks.first_key_value() {
let oldest = *oldest;
if oldest < n {
Expand Down Expand Up @@ -373,7 +373,8 @@

#[derive(Clone)]
pub enum FilterType {
Logs,
// filter content
Logs(Filter),
// block number when the filter was created or last queried
NewBlock(u64),
PendingTransactions,
Expand Down
Loading