Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Feb 24, 2025
1 parent 08ef8fc commit f400c20
Showing 1 changed file with 82 additions and 23 deletions.
105 changes: 82 additions & 23 deletions crates/engine/tree/src/tree/payload_processor.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
//! Entrypoint for payload processing.
use crate::tree::{
cached_state::{CachedStateMetrics, ProviderCaches, SavedCache},
cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCaches, SavedCache},
root2::*,
StateProviderBuilder,
};
use alloy_consensus::{transaction::Recovered, BlockHeader};
use alloy_primitives::B256;
use reth_evm::ConfigureEvmEnvFor;
use reth_evm::{ConfigureEvm, ConfigureEvmEnvFor};
use reth_primitives_traits::{header::SealedHeaderFor, NodePrimitives, RecoveredBlock};
use reth_provider::{
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateCommitmentProvider,
StateProviderFactory, StateReader,
};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_trie::TrieInput;
use reth_workload_executor::WorkloadExecutor;
use std::{
Expand All @@ -23,6 +24,7 @@ use std::{
Arc, RwLock,
},
};
use tracing::trace;

/// Entrypoint for executing the payload.
pub struct PayloadProcessor<N, Evm> {
Expand All @@ -42,7 +44,10 @@ pub struct PayloadProcessor<N, Evm> {
impl<N, Evm> PayloadProcessor<N, Evm>
where
N: NodePrimitives,
Evm: ConfigureEvmEnvFor<N> + 'static,
Evm: ConfigureEvmEnvFor<N>
+ 'static
+ ConfigureEvm<Header = N::BlockHeader, Transaction = N::SignedTx>
+ 'static,
{
/// Executes the payload based on the configured settings.
pub fn execute(&self) {
Expand Down Expand Up @@ -161,15 +166,8 @@ where
pub struct PayloadTaskHandle {
// TODO should internals be an enum to represent no parallel workload

// needs receiver to await the stateroot from the background task

// need channel to emit `StateUpdates` to the state root task

// On drop this should also terminate the prewarm task

// must include the receiver of the state root wired to the sparse trie
prewarm: Option<Sender<PrewarmTaskEvent>>,

/// Receiver for the state root
state_root: Option<mpsc::Receiver<StateRootResult>>,
}
Expand All @@ -180,14 +178,17 @@ impl PayloadTaskHandle {
todo!()
}

/// Terminates the pre-warming processing
/// Terminates the pre-warming processing.
///
/// This will terminate all inprogress tx pre-warm execution.
pub fn terminate_prewarming(&mut self) {
self.prewarm.take().map(|tx| tx.send(PrewarmTaskEvent::Terminate).ok());
}
}

impl Drop for PayloadTaskHandle {
fn drop(&mut self) {
// Ensure we drop clean up
self.terminate_prewarming();
}
}
Expand Down Expand Up @@ -255,13 +256,13 @@ pub(crate) enum SparseTrieEvent {
}

/// A task that executes transactions individually in parallel.
pub struct PrewarmTask<N: NodePrimitives, P, C> {
pub struct PrewarmTask<N: NodePrimitives, P, Evm> {
/// The executor used to spawn execution tasks.
executor: WorkloadExecutor,
/// Transactions pending execution.
pending: VecDeque<Recovered<N::SignedTx>>,
/// Context provided to execution tasks
ctx: PrewarmContext<N, P, C>,
ctx: PrewarmContext<N, P, Evm>,
/// How many txs are currently in progress
in_progress: usize,
/// How many transactions should be executed in parallel
Expand All @@ -274,16 +275,20 @@ pub struct PrewarmTask<N: NodePrimitives, P, C> {
actions_tx: Sender<PrewarmTaskEvent>,
}

impl<N, P, C> PrewarmTask<N, P, C>
impl<N, P, Evm> PrewarmTask<N, P, Evm>
where
N: NodePrimitives,
P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
Evm: ConfigureEvmEnvFor<N>
+ 'static
+ ConfigureEvm<Header = N::BlockHeader, Transaction = N::SignedTx>
+ 'static,
{
fn new(
executor: WorkloadExecutor,
ctx: PrewarmContext<N, P, C>,
ctx: PrewarmContext<N, P, Evm>,
to_multi_proof: mpsc::Sender<StateRootMessage>,
transactions: VecDeque<N::SignedTx>,
transactions: VecDeque<Recovered<N::SignedTx>>,
) -> Self {
let (actions_tx, actions_rx) = mpsc::channel();
Self {
Expand All @@ -302,14 +307,23 @@ where
/// Spawns the next transactions
fn spawn_next(&mut self) {
while self.in_progress < self.max_concurrency {
if let Some(event) = self.pending.pop_front() {
// TODO spawn the next tx
if let Some(tx) = self.pending.pop_front() {
self.spawn_transaction(tx);
} else {
break
}
}
}

/// Spawns the given transaction as a blocking task.
fn spawn_transaction(&mut self, tx: Recovered<N::SignedTx>) {
let ctx = self.ctx.clone();
let actions_tx = self.actions_tx.clone();
self.executor.spawn_blocking(move || {
ctx.transact(tx);
});
}

fn is_done(&self) -> bool {
self.in_progress == 0 && self.pending.is_empty()
}
Expand Down Expand Up @@ -340,21 +354,66 @@ where

/// Context required by tx execution tasks.
#[derive(Debug, Clone)]
struct PrewarmContext<N: NodePrimitives, P, C> {
struct PrewarmContext<N: NodePrimitives, P, Evm> {
header: SealedHeaderFor<N>,
evm_config: C,
evm_config: Evm,
caches: ProviderCaches,
cache_metrics: CachedStateMetrics,
/// Provider to obtain the state
provider: StateProviderBuilder<N, P>,
}

impl<N: NodePrimitives, P, C> PrewarmContext<N, P, C> {}
impl<N, P, Evm> PrewarmContext<N, P, Evm>
where
N: NodePrimitives,
P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
Evm: ConfigureEvmEnvFor<N>
+ 'static
+ ConfigureEvm<Header = N::BlockHeader, Transaction = N::SignedTx>
+ 'static,
{
/// Transacts the transaction and returns the state outcome.
// TODO: proper error handling
fn transact(mut self, tx: Recovered<N::SignedTx>) -> Option<EvmState> {
let Self { header, evm_config, caches, cache_metrics, provider } = self;
// Create the state provider inside the thread
let state_provider = match provider.build() {
Ok(provider) => provider,
Err(err) => {
trace!(
target: "engine::tree",
%err,
"Failed to build state provider in prewarm thread"
);
return None
}
};

// Use the caches to create a new provider with caching
let state_provider =
CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);

let state_provider = StateProviderDatabase::new(&state_provider);

let mut evm_env = evm_config.evm_env(&header);

evm_env.cfg_env.disable_nonce_check = true;

// create a new executor and disable nonce checks in the env
let mut evm = evm_config.evm_with_env(state_provider, evm_env);

// create the tx env and reset nonce
let tx_env = evm_config.tx_env(&tx);

todo!()
}
}

enum PrewarmTaskEvent {
Terminate,
Outcome {
// Evmstate outcome
/// Returns the state if the transaction
state: Option<EvmState>,
},
}

Expand Down

0 comments on commit f400c20

Please sign in to comment.