Skip to content

Commit

Permalink
Added 'serial' flag for blocks replay
Browse files Browse the repository at this point in the history
  • Loading branch information
Eagle941 committed Oct 1, 2024
1 parent 7859ee6 commit 0bdeb2a
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 10 deletions.
9 changes: 9 additions & 0 deletions cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,13 @@ pub struct Args {
/// exists.
#[arg(long)]
pub overwrite: bool,

/// Set to perform serial replay of blocks.
///
/// Slower, but forces initial state of block `n+1` consistent with final
/// state of block `n`. This is not ensured with parallel replay because
/// the final state may differ with the final state on the official
/// blockchain.
#[arg(long)]
pub serial: bool,
}
3 changes: 2 additions & 1 deletion cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ fn run(args: Args) -> anyhow::Result<()> {
let txt_out = args.txt_out;
let trace_out = args.trace_out;
let overwrite = args.overwrite;
let serial = args.serial;

check_file(&svg_path, overwrite)?;
check_file(&txt_out, overwrite)?;
Expand All @@ -117,7 +118,7 @@ fn run(args: Args) -> anyhow::Result<()> {
tracing::info!(%start_block, %end_block, "Re-executing blocks");
let start_time = std::time::Instant::now();

let visited_pcs = run_replay(&replay_range, &trace_out, &storage)?;
let visited_pcs = run_replay(&replay_range, &trace_out, &storage, serial)?;

let elapsed = start_time.elapsed();
tracing::info!(?elapsed, "Finished");
Expand Down
60 changes: 53 additions & 7 deletions starknet-replay/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub fn run_replay<T>(
replay_range: &ReplayRange,
trace_out: &Option<PathBuf>,
storage: &T,
serial: bool,
) -> Result<VisitedPcs, RunnerError>
where
T: Storage + Sync + Send,
Expand All @@ -51,7 +52,11 @@ where

// Iterate through each block in `replay_work` and replay all the
// transactions
replay_blocks(storage, trace_out, &replay_work)
if !serial {
replay_blocks_parallel(storage, trace_out, &replay_work)
} else {
replay_blocks_serial(storage, trace_out, &replay_work)
}
}

/// Generates the list of transactions to be replayed.
Expand Down Expand Up @@ -137,31 +142,33 @@ pub fn process_transaction_traces(transaction_simulations: Vec<TransactionOutput
cumulative_visited_pcs
}

/// Re-executes the list of transactions in `replay_work` and return the
/// Re-executes the list of blocks in `replay_work` in parallel and returns the
/// statistics on libfunc usage.
///
/// `replay_work` contains the list of transactions to replay grouped by block.
/// With parallel replay, initial state is always queried from the RPC server.
/// The consequence is that initial state of block `n+1` may be different from
/// final state of block `n`. This has many causes expecially for old blocks.
///
/// # Arguments
///
/// - `storage`: The object to query the starknet blockchain using the RPC
/// protocol.
/// - `trace_out`: The output file of the transaction traces.
/// - `replay_work`: The list of blocks to be replayed.
/// - `replay_work`: The list of transactions to replay grouped by block.
///
/// # Errors
///
/// Returns [`Err`] if the function `execute_block` fails to replay any
/// transaction.
pub fn replay_blocks<T>(
pub fn replay_blocks_parallel<T>(
storage: &T,
trace_out: &Option<PathBuf>,
replay_work: &[ReplayBlock],
) -> Result<VisitedPcs, RunnerError>
where
T: Storage + Sync + Send,
{
info!("Starting transactions replay");
info!("Starting parallel blocks replay");
let (sender, receiver) = channel();
replay_work
.par_iter()
Expand All @@ -170,7 +177,7 @@ where
|(storage, trace_out, sender), block| -> anyhow::Result<()> {
let block_transaction_traces = storage.execute_block(block, trace_out)?;
let block_number = BlockNumber::new(block.header.block_number.0);
info!("Simulation completed block {block_number}");
info!("Replay completed block {block_number}");
let visited_pcs = process_transaction_traces(block_transaction_traces);
sender.send(visited_pcs)?;
Ok(())
Expand All @@ -187,3 +194,42 @@ where

Ok(cumulative_visited_pcs)
}

/// Serially re-executes the list of blocks in `replay_work` and returns the
/// statistics on libfunc usage.
///
/// Serial replay is slower than parallel, however it ensures state consistency
/// between initial state of block `n+1` and final state of block `n`.
///
/// # Arguments
///
/// - `storage`: The object to query the starknet blockchain using the RPC
/// protocol.
/// - `trace_out`: The output file of the transaction traces.
/// - `replay_work`: The list of transactions to replay grouped by block.
///
/// # Errors
///
/// Returns [`Err`] if the function `execute_block` fails to replay any
/// transaction.
pub fn replay_blocks_serial<T>(
storage: &T,
trace_out: &Option<PathBuf>,
replay_work: &[ReplayBlock],
) -> Result<VisitedPcs, RunnerError>
where
T: Storage + Sync + Send,
{
info!("Starting serial blocks replay");

let mut cumulative_visited_pcs = VisitedPcs::default();
for block in replay_work {
let block_transaction_traces = storage.execute_block(block, trace_out)?;
let block_number = BlockNumber::new(block.header.block_number.0);
info!("Replay completed block {block_number}");
let visited_pcs = process_transaction_traces(block_transaction_traces);
cumulative_visited_pcs.extend(visited_pcs.into_iter());
}

Ok(cumulative_visited_pcs)
}
4 changes: 2 additions & 2 deletions starknet-replay/tests/test_replay_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use starknet_replay::block_number::BlockNumber;
use starknet_replay::profiler::analysis::extract_libfuncs_weight;
use starknet_replay::profiler::replay_statistics::ReplayStatistics;
use starknet_replay::runner::replay_block::ReplayBlock;
use starknet_replay::runner::replay_blocks;
use starknet_replay::runner::replay_blocks_parallel;
use starknet_replay::storage::rpc::RpcStorage;
use starknet_replay::storage::Storage;
use test_log::test;
Expand Down Expand Up @@ -47,7 +47,7 @@ fn test_replay_blocks() {
replay_work.push(replay_block);

let trace_out = None;
let visited_pcs = replay_blocks(&storage, &trace_out, &replay_work).unwrap();
let visited_pcs = replay_blocks_parallel(&storage, &trace_out, &replay_work).unwrap();

let libfunc_stats = extract_libfuncs_weight(&visited_pcs, &storage).unwrap();

Expand Down

0 comments on commit 0bdeb2a

Please sign in to comment.