Skip to content

Commit

Permalink
make io stateless and store last block param in updatemanager
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-nguy committed Jan 31, 2025
1 parent ebcd2b4 commit d10c4b2
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 61 deletions.
7 changes: 1 addition & 6 deletions core/node/node_sync/src/external_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub struct ExternalIO {
actions: ActionQueue,
main_node_client: Box<dyn MainNodeClient>,
chain_id: L2ChainId,
pub next_l2_block_param: L2BlockParams,
}

impl ExternalIO {
Expand All @@ -57,7 +56,6 @@ impl ExternalIO {
actions,
main_node_client,
chain_id,
next_l2_block_param: L2BlockParams::default(),
})
}

Expand Down Expand Up @@ -350,7 +348,6 @@ impl StateKeeperIO for ExternalIO {
"L2 block number mismatch: expected {}, got {number}",
cursor.next_l2_block
);
self.next_l2_block_param = params;
return Ok(Some(params));
}
other => {
Expand All @@ -361,9 +358,7 @@ impl StateKeeperIO for ExternalIO {
}
}

fn get_updated_l2_block_params(&mut self) -> L2BlockParams {
self.next_l2_block_param
}
fn update_next_l2_block_timestamp(&mut self, _block_timestamp: &mut u64) {}

async fn wait_for_next_tx(
&mut self,
Expand Down
9 changes: 2 additions & 7 deletions core/node/state_keeper/src/io/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ pub struct MempoolIO {
chain_id: L2ChainId,
l2_da_validator_address: Option<Address>,
pubdata_type: PubdataType,
pub next_l2_block_param: L2BlockParams,
}

impl IoSealCriteria for MempoolIO {
Expand Down Expand Up @@ -289,17 +288,14 @@ impl StateKeeperIO for MempoolIO {
// This value is effectively ignored by the protocol.
virtual_blocks: 1,
};
self.next_l2_block_param = params;
Ok(Some(params))
}

fn get_updated_l2_block_params(&mut self) -> L2BlockParams {
fn update_next_l2_block_timestamp(&mut self, block_timestamp: &mut u64) {
let current_timestamp_millis = millis_since_epoch();
let current_timestamp = (current_timestamp_millis / 1_000) as u64;

let mut updated_params = self.next_l2_block_param;
updated_params.timestamp = current_timestamp;
updated_params
*block_timestamp = current_timestamp;
}

async fn wait_for_next_tx(
Expand Down Expand Up @@ -524,7 +520,6 @@ impl MempoolIO {
chain_id,
l2_da_validator_address,
pubdata_type,
next_l2_block_param: L2BlockParams::default(),
})
}

Expand Down
4 changes: 2 additions & 2 deletions core/node/state_keeper/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ pub trait StateKeeperIO: 'static + Send + Sync + fmt::Debug + IoSealCriteria {
max_wait: Duration,
) -> anyhow::Result<Option<L2BlockParams>>;

/// Get the updated parameters for the next L2 block.
fn get_updated_l2_block_params(&mut self) -> L2BlockParams;
/// Update the next block param timestamp
fn update_next_l2_block_timestamp(&mut self, block_timestamp: &mut u64);

/// Blocks for up to `max_wait` until the next transaction is available for execution.
/// Returns `None` if no transaction became available until the timeout.
Expand Down
12 changes: 8 additions & 4 deletions core/node/state_keeper/src/io/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,10 +512,11 @@ mod tests {
vec![],
);
output_handler.handle_l2_block(&updates).await.unwrap();
updates.push_l2_block(L2BlockParams {
updates.set_next_l2_block_parameters(L2BlockParams {
timestamp: 1,
virtual_blocks: 1,
});
updates.push_l2_block();

let mut batch_result = FinishedL1Batch::mock();
batch_result.final_execution_state.deduplicated_storage_logs =
Expand Down Expand Up @@ -623,10 +624,11 @@ mod tests {
persistence.submit_l2_block(seal_command).await;

// The second command should lead to blocking
updates_manager.push_l2_block(L2BlockParams {
updates_manager.set_next_l2_block_parameters(L2BlockParams {
timestamp: 2,
virtual_blocks: 1,
});
updates_manager.push_l2_block();
let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false);
{
let submit_future = persistence.submit_l2_block(seal_command);
Expand All @@ -651,10 +653,11 @@ mod tests {
// Check that `wait_for_all_commands()` state is reset after use.
persistence.wait_for_all_commands().await;

updates_manager.push_l2_block(L2BlockParams {
updates_manager.set_next_l2_block_parameters(L2BlockParams {
timestamp: 3,
virtual_blocks: 1,
});
updates_manager.push_l2_block();
let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false);
persistence.submit_l2_block(seal_command).await;
let command = sealer.commands_receiver.recv().await.unwrap();
Expand All @@ -675,10 +678,11 @@ mod tests {
for i in 1..=5 {
let seal_command =
updates_manager.seal_l2_block_command(Some(Address::default()), false);
updates_manager.push_l2_block(L2BlockParams {
updates_manager.set_next_l2_block_parameters(L2BlockParams {
timestamp: i,
virtual_blocks: 1,
});
updates_manager.push_l2_block();
persistence.submit_l2_block(seal_command).await;
}

Expand Down
67 changes: 35 additions & 32 deletions core/node/state_keeper/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,8 @@ impl ZkSyncStateKeeper {
let new_l2_block_params = self
.wait_for_new_l2_block_params(&updates_manager, &stop_receiver)
.await?;
Self::start_next_l2_block(
new_l2_block_params,
&mut updates_manager,
&mut *batch_executor,
)
.await?;
Self::set_l2_block_params(&mut updates_manager, new_l2_block_params);
Self::start_next_l2_block(&mut updates_manager, &mut *batch_executor).await?;
}

let (finished_batch, _) = batch_executor.finish_batch().await?;
Expand Down Expand Up @@ -411,6 +407,17 @@ impl ZkSyncStateKeeper {
Err(Error::Canceled)
}

#[tracing::instrument(
skip_all,
fields(
l1_batch = %updates_manager.l1_batch.number,
l2_block = %updates_manager.l2_block.number,
)
)]
fn set_l2_block_params(updates_manager: &mut UpdatesManager, l2_block_param: L2BlockParams) {
updates_manager.set_next_l2_block_parameters(l2_block_param);
}

#[tracing::instrument(
skip_all,
fields(
Expand All @@ -419,11 +426,10 @@ impl ZkSyncStateKeeper {
)
)]
async fn start_next_l2_block(
params: L2BlockParams,
updates_manager: &mut UpdatesManager,
batch_executor: &mut dyn BatchExecutor<OwnedStorage>,
) -> anyhow::Result<()> {
updates_manager.push_l2_block(params);
updates_manager.push_l2_block();
let block_env = updates_manager.l2_block.get_env();
batch_executor
.start_next_l2_block(block_env)
Expand Down Expand Up @@ -475,15 +481,14 @@ impl ZkSyncStateKeeper {
for (index, l2_block) in l2_blocks_to_reexecute.into_iter().enumerate() {
// Push any non-first L2 block to updates manager. The first one was pushed when `updates_manager` was initialized.
if index > 0 {
Self::start_next_l2_block(
Self::set_l2_block_params(
updates_manager,
L2BlockParams {
timestamp: l2_block.timestamp,
virtual_blocks: l2_block.virtual_blocks,
},
updates_manager,
batch_executor,
)
.await?;
);
Self::start_next_l2_block(updates_manager, batch_executor).await?;
}

let l2_block_number = l2_block.number;
Expand Down Expand Up @@ -550,9 +555,11 @@ impl ZkSyncStateKeeper {

// We've processed all the L2 blocks, and right now we're preparing the next *actual* L2 block.
// The `wait_for_new_l2_block_params` call is used to initialize the StateKeeperIO with a correct new l2 block params
self.wait_for_new_l2_block_params(updates_manager, stop_receiver)
let new_l2_block_params = self
.wait_for_new_l2_block_params(updates_manager, stop_receiver)
.await
.map_err(|e| e.context("wait_for_new_l2_block_params"))?;
Self::set_l2_block_params(updates_manager, new_l2_block_params);
Ok(true)
}

Expand All @@ -569,10 +576,6 @@ impl ZkSyncStateKeeper {
stop_receiver: &watch::Receiver<bool>,
) -> Result<(), Error> {
let mut is_last_block_sealed = state_restored;
let mut next_l2_block_params = L2BlockParams {
timestamp: updates_manager.l2_block.timestamp,
virtual_blocks: updates_manager.l2_block.virtual_blocks,
};

if let Some(protocol_upgrade_tx) = protocol_upgrade_tx {
self.process_upgrade_tx(batch_executor, updates_manager, protocol_upgrade_tx)
Expand All @@ -593,19 +596,15 @@ impl ZkSyncStateKeeper {

// Push the current block if it has not been done yet
if is_last_block_sealed {
next_l2_block_params = self.io.get_updated_l2_block_params();
self.io
.update_next_l2_block_timestamp(updates_manager.next_l2_block_timestamp());
tracing::debug!(
"Initialized new L2 block #{} (L1 batch #{}) with timestamp {}",
updates_manager.l2_block.number + 1,
updates_manager.l1_batch.number,
display_timestamp(next_l2_block_params.timestamp)
display_timestamp(updates_manager.next_l2_block_param.timestamp)
);
Self::start_next_l2_block(
next_l2_block_params,
updates_manager,
batch_executor,
)
.await?;
Self::start_next_l2_block(updates_manager, batch_executor).await?;
}
return Ok(());
}
Expand All @@ -620,20 +619,25 @@ impl ZkSyncStateKeeper {
is_last_block_sealed = true;

// Get a tentative new l2 block parameters
next_l2_block_params = self
let next_l2_block_params = self
.wait_for_new_l2_block_params(updates_manager, stop_receiver)
.await
.map_err(|e| e.context("wait_for_new_l2_block_params"))?;
Self::set_l2_block_params(updates_manager, next_l2_block_params);
}
let waiting_latency = KEEPER_METRICS.waiting_for_tx.start();

if is_last_block_sealed {
// The next block has not started yet, we keep updating the next l2 block parameters with correct timestamp
next_l2_block_params = self.io.get_updated_l2_block_params();
self.io
.update_next_l2_block_timestamp(updates_manager.next_l2_block_timestamp());
}
let Some(tx) = self
.io
.wait_for_next_tx(POLL_WAIT_DURATION, next_l2_block_params.timestamp)
.wait_for_next_tx(
POLL_WAIT_DURATION,
updates_manager.next_l2_block_param.timestamp,
)
.instrument(info_span!("wait_for_next_tx"))
.await
.context("error waiting for next transaction")?
Expand All @@ -652,10 +656,9 @@ impl ZkSyncStateKeeper {
"Initialized new L2 block #{} (L1 batch #{}) with timestamp {}",
updates_manager.l2_block.number + 1,
updates_manager.l1_batch.number,
display_timestamp(next_l2_block_params.timestamp)
display_timestamp(updates_manager.next_l2_block_param.timestamp)
);
Self::start_next_l2_block(next_l2_block_params, updates_manager, batch_executor)
.await?;
Self::start_next_l2_block(updates_manager, batch_executor).await?;
is_last_block_sealed = false;
}

Expand Down
7 changes: 1 addition & 6 deletions core/node/state_keeper/src/testonly/test_batch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,6 @@ pub(crate) struct TestIO {
protocol_version: ProtocolVersionId,
previous_batch_protocol_version: ProtocolVersionId,
protocol_upgrade_txs: HashMap<ProtocolVersionId, ProtocolUpgradeTx>,
pub next_l2_block_param: L2BlockParams,
}

impl fmt::Debug for TestIO {
Expand Down Expand Up @@ -610,7 +609,6 @@ impl TestIO {
protocol_version: ProtocolVersionId::latest(),
previous_batch_protocol_version: ProtocolVersionId::latest(),
protocol_upgrade_txs: HashMap::default(),
next_l2_block_param: L2BlockParams::default(),
};
(this, OutputHandler::new(Box::new(persistence)))
}
Expand Down Expand Up @@ -719,13 +717,10 @@ impl StateKeeperIO for TestIO {
};
self.l2_block_number += 1;
self.timestamp += 1;
self.next_l2_block_param = params;
Ok(Some(params))
}

fn get_updated_l2_block_params(&mut self) -> L2BlockParams {
self.next_l2_block_param
}
fn update_next_l2_block_timestamp(&mut self, _block_timestamp: &mut u64) {}

async fn wait_for_next_tx(
&mut self,
Expand Down
22 changes: 18 additions & 4 deletions core/node/state_keeper/src/updates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct UpdatesManager {
pub l2_block: L2BlockUpdates,
pub storage_writes_deduplicator: StorageWritesDeduplicator,
pubdata_params: PubdataParams,
pub next_l2_block_param: L2BlockParams,
}

impl UpdatesManager {
Expand Down Expand Up @@ -66,6 +67,10 @@ impl UpdatesManager {
storage_writes_deduplicator: StorageWritesDeduplicator::new(),
storage_view_cache: None,
pubdata_params,
next_l2_block_param: L2BlockParams {
timestamp: l1_batch_env.first_l2_block.timestamp,
virtual_blocks: l1_batch_env.first_l2_block.max_virtual_blocks_to_create,
},
}
}

Expand All @@ -77,6 +82,10 @@ impl UpdatesManager {
self.base_system_contract_hashes
}

pub(crate) fn next_l2_block_timestamp(&mut self) -> &mut u64 {
&mut self.next_l2_block_param.timestamp
}

pub(crate) fn io_cursor(&self) -> IoCursor {
IoCursor {
next_l2_block: self.l2_block.number + 1,
Expand Down Expand Up @@ -167,19 +176,23 @@ impl UpdatesManager {

/// Pushes a new L2 block with the specified timestamp into this manager. The previously
/// held L2 block is considered sealed and is used to extend the L1 batch data.
pub fn push_l2_block(&mut self, l2_block_params: L2BlockParams) {
pub fn push_l2_block(&mut self) {
let new_l2_block_updates = L2BlockUpdates::new(
l2_block_params.timestamp,
self.next_l2_block_param.timestamp,
self.l2_block.number + 1,
self.l2_block.get_l2_block_hash(),
l2_block_params.virtual_blocks,
self.next_l2_block_param.virtual_blocks,
self.protocol_version,
);
let old_l2_block_updates = std::mem::replace(&mut self.l2_block, new_l2_block_updates);
self.l1_batch
.extend_from_sealed_l2_block(old_l2_block_updates);
}

pub fn set_next_l2_block_parameters(&mut self, l2_block_param: L2BlockParams) {
self.next_l2_block_param = l2_block_param
}

pub(crate) fn pending_executed_transactions_len(&self) -> usize {
self.l1_batch.executed_transactions.len() + self.l2_block.executed_transactions.len()
}
Expand Down Expand Up @@ -243,10 +256,11 @@ mod tests {
assert_eq!(updates_manager.l1_batch.executed_transactions.len(), 0);

// Seal an L2 block.
updates_manager.push_l2_block(L2BlockParams {
updates_manager.set_next_l2_block_parameters(L2BlockParams {
timestamp: 2,
virtual_blocks: 1,
});
updates_manager.push_l2_block();

// Check that L1 batch updates are the same with the pending state
// and L2 block updates are empty.
Expand Down

0 comments on commit d10c4b2

Please sign in to comment.