diff --git a/core/node/node_sync/src/external_io.rs b/core/node/node_sync/src/external_io.rs index 5719da4769ea..b0389420134a 100644 --- a/core/node/node_sync/src/external_io.rs +++ b/core/node/node_sync/src/external_io.rs @@ -40,7 +40,6 @@ pub struct ExternalIO { actions: ActionQueue, main_node_client: Box, chain_id: L2ChainId, - pub next_l2_block_param: L2BlockParams, } impl ExternalIO { @@ -57,7 +56,6 @@ impl ExternalIO { actions, main_node_client, chain_id, - next_l2_block_param: L2BlockParams::default(), }) } @@ -350,7 +348,6 @@ impl StateKeeperIO for ExternalIO { "L2 block number mismatch: expected {}, got {number}", cursor.next_l2_block ); - self.next_l2_block_param = params; return Ok(Some(params)); } other => { @@ -361,9 +358,7 @@ impl StateKeeperIO for ExternalIO { } } - fn get_updated_l2_block_params(&mut self) -> L2BlockParams { - self.next_l2_block_param - } + fn update_next_l2_block_timestamp(&mut self, _block_timestamp: &mut u64) {} async fn wait_for_next_tx( &mut self, diff --git a/core/node/state_keeper/src/io/mempool.rs b/core/node/state_keeper/src/io/mempool.rs index aad300ae5b3c..83494c1a09d3 100644 --- a/core/node/state_keeper/src/io/mempool.rs +++ b/core/node/state_keeper/src/io/mempool.rs @@ -59,7 +59,6 @@ pub struct MempoolIO { chain_id: L2ChainId, l2_da_validator_address: Option
, pubdata_type: PubdataType, - pub next_l2_block_param: L2BlockParams, } impl IoSealCriteria for MempoolIO { @@ -289,17 +288,14 @@ impl StateKeeperIO for MempoolIO { // This value is effectively ignored by the protocol. virtual_blocks: 1, }; - self.next_l2_block_param = params; Ok(Some(params)) } - fn get_updated_l2_block_params(&mut self) -> L2BlockParams { + fn update_next_l2_block_timestamp(&mut self, block_timestamp: &mut u64) { let current_timestamp_millis = millis_since_epoch(); let current_timestamp = (current_timestamp_millis / 1_000) as u64; - let mut updated_params = self.next_l2_block_param; - updated_params.timestamp = current_timestamp; - updated_params + *block_timestamp = current_timestamp; } async fn wait_for_next_tx( @@ -524,7 +520,6 @@ impl MempoolIO { chain_id, l2_da_validator_address, pubdata_type, - next_l2_block_param: L2BlockParams::default(), }) } diff --git a/core/node/state_keeper/src/io/mod.rs b/core/node/state_keeper/src/io/mod.rs index 354a6d9afa3d..acf08b747835 100644 --- a/core/node/state_keeper/src/io/mod.rs +++ b/core/node/state_keeper/src/io/mod.rs @@ -135,8 +135,8 @@ pub trait StateKeeperIO: 'static + Send + Sync + fmt::Debug + IoSealCriteria { max_wait: Duration, ) -> anyhow::Result>; - /// Get the updated parameters for the next L2 block. - fn get_updated_l2_block_params(&mut self) -> L2BlockParams; + /// Update the next block param timestamp + fn update_next_l2_block_timestamp(&mut self, block_timestamp: &mut u64); /// Blocks for up to `max_wait` until the next transaction is available for execution. /// Returns `None` if no transaction became available until the timeout. diff --git a/core/node/state_keeper/src/io/persistence.rs b/core/node/state_keeper/src/io/persistence.rs index 8db7fe4120ed..423c1cd56654 100644 --- a/core/node/state_keeper/src/io/persistence.rs +++ b/core/node/state_keeper/src/io/persistence.rs @@ -512,10 +512,11 @@ mod tests { vec![], ); output_handler.handle_l2_block(&updates).await.unwrap(); - updates.push_l2_block(L2BlockParams { + updates.set_next_l2_block_parameters(L2BlockParams { timestamp: 1, virtual_blocks: 1, }); + updates.push_l2_block(); let mut batch_result = FinishedL1Batch::mock(); batch_result.final_execution_state.deduplicated_storage_logs = @@ -623,10 +624,11 @@ mod tests { persistence.submit_l2_block(seal_command).await; // The second command should lead to blocking - updates_manager.push_l2_block(L2BlockParams { + updates_manager.set_next_l2_block_parameters(L2BlockParams { timestamp: 2, virtual_blocks: 1, }); + updates_manager.push_l2_block(); let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); { let submit_future = persistence.submit_l2_block(seal_command); @@ -651,10 +653,11 @@ mod tests { // Check that `wait_for_all_commands()` state is reset after use. persistence.wait_for_all_commands().await; - updates_manager.push_l2_block(L2BlockParams { + updates_manager.set_next_l2_block_parameters(L2BlockParams { timestamp: 3, virtual_blocks: 1, }); + updates_manager.push_l2_block(); let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); persistence.submit_l2_block(seal_command).await; let command = sealer.commands_receiver.recv().await.unwrap(); @@ -675,10 +678,11 @@ mod tests { for i in 1..=5 { let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); - updates_manager.push_l2_block(L2BlockParams { + updates_manager.set_next_l2_block_parameters(L2BlockParams { timestamp: i, virtual_blocks: 1, }); + updates_manager.push_l2_block(); persistence.submit_l2_block(seal_command).await; } diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index a2177bb06f17..bed6d741af7e 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -195,12 +195,8 @@ impl ZkSyncStateKeeper { let new_l2_block_params = self .wait_for_new_l2_block_params(&updates_manager, &stop_receiver) .await?; - Self::start_next_l2_block( - new_l2_block_params, - &mut updates_manager, - &mut *batch_executor, - ) - .await?; + Self::set_l2_block_params(&mut updates_manager, new_l2_block_params); + Self::start_next_l2_block(&mut updates_manager, &mut *batch_executor).await?; } let (finished_batch, _) = batch_executor.finish_batch().await?; @@ -411,6 +407,17 @@ impl ZkSyncStateKeeper { Err(Error::Canceled) } + #[tracing::instrument( + skip_all, + fields( + l1_batch = %updates_manager.l1_batch.number, + l2_block = %updates_manager.l2_block.number, + ) + )] + fn set_l2_block_params(updates_manager: &mut UpdatesManager, l2_block_param: L2BlockParams) { + updates_manager.set_next_l2_block_parameters(l2_block_param); + } + #[tracing::instrument( skip_all, fields( @@ -419,11 +426,10 @@ impl ZkSyncStateKeeper { ) )] async fn start_next_l2_block( - params: L2BlockParams, updates_manager: &mut UpdatesManager, batch_executor: &mut dyn BatchExecutor, ) -> anyhow::Result<()> { - updates_manager.push_l2_block(params); + updates_manager.push_l2_block(); let block_env = updates_manager.l2_block.get_env(); batch_executor .start_next_l2_block(block_env) @@ -475,15 +481,14 @@ impl ZkSyncStateKeeper { for (index, l2_block) in l2_blocks_to_reexecute.into_iter().enumerate() { // Push any non-first L2 block to updates manager. The first one was pushed when `updates_manager` was initialized. if index > 0 { - Self::start_next_l2_block( + Self::set_l2_block_params( + updates_manager, L2BlockParams { timestamp: l2_block.timestamp, virtual_blocks: l2_block.virtual_blocks, }, - updates_manager, - batch_executor, - ) - .await?; + ); + Self::start_next_l2_block(updates_manager, batch_executor).await?; } let l2_block_number = l2_block.number; @@ -550,9 +555,11 @@ impl ZkSyncStateKeeper { // We've processed all the L2 blocks, and right now we're preparing the next *actual* L2 block. // The `wait_for_new_l2_block_params` call is used to initialize the StateKeeperIO with a correct new l2 block params - self.wait_for_new_l2_block_params(updates_manager, stop_receiver) + let new_l2_block_params = self + .wait_for_new_l2_block_params(updates_manager, stop_receiver) .await .map_err(|e| e.context("wait_for_new_l2_block_params"))?; + Self::set_l2_block_params(updates_manager, new_l2_block_params); Ok(true) } @@ -569,10 +576,6 @@ impl ZkSyncStateKeeper { stop_receiver: &watch::Receiver, ) -> Result<(), Error> { let mut is_last_block_sealed = state_restored; - let mut next_l2_block_params = L2BlockParams { - timestamp: updates_manager.l2_block.timestamp, - virtual_blocks: updates_manager.l2_block.virtual_blocks, - }; if let Some(protocol_upgrade_tx) = protocol_upgrade_tx { self.process_upgrade_tx(batch_executor, updates_manager, protocol_upgrade_tx) @@ -593,19 +596,15 @@ impl ZkSyncStateKeeper { // Push the current block if it has not been done yet if is_last_block_sealed { - next_l2_block_params = self.io.get_updated_l2_block_params(); + self.io + .update_next_l2_block_timestamp(updates_manager.next_l2_block_timestamp()); tracing::debug!( "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", updates_manager.l2_block.number + 1, updates_manager.l1_batch.number, - display_timestamp(next_l2_block_params.timestamp) + display_timestamp(updates_manager.next_l2_block_param.timestamp) ); - Self::start_next_l2_block( - next_l2_block_params, - updates_manager, - batch_executor, - ) - .await?; + Self::start_next_l2_block(updates_manager, batch_executor).await?; } return Ok(()); } @@ -620,20 +619,25 @@ impl ZkSyncStateKeeper { is_last_block_sealed = true; // Get a tentative new l2 block parameters - next_l2_block_params = self + let next_l2_block_params = self .wait_for_new_l2_block_params(updates_manager, stop_receiver) .await .map_err(|e| e.context("wait_for_new_l2_block_params"))?; + Self::set_l2_block_params(updates_manager, next_l2_block_params); } let waiting_latency = KEEPER_METRICS.waiting_for_tx.start(); if is_last_block_sealed { // The next block has not started yet, we keep updating the next l2 block parameters with correct timestamp - next_l2_block_params = self.io.get_updated_l2_block_params(); + self.io + .update_next_l2_block_timestamp(updates_manager.next_l2_block_timestamp()); } let Some(tx) = self .io - .wait_for_next_tx(POLL_WAIT_DURATION, next_l2_block_params.timestamp) + .wait_for_next_tx( + POLL_WAIT_DURATION, + updates_manager.next_l2_block_param.timestamp, + ) .instrument(info_span!("wait_for_next_tx")) .await .context("error waiting for next transaction")? @@ -652,10 +656,9 @@ impl ZkSyncStateKeeper { "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", updates_manager.l2_block.number + 1, updates_manager.l1_batch.number, - display_timestamp(next_l2_block_params.timestamp) + display_timestamp(updates_manager.next_l2_block_param.timestamp) ); - Self::start_next_l2_block(next_l2_block_params, updates_manager, batch_executor) - .await?; + Self::start_next_l2_block(updates_manager, batch_executor).await?; is_last_block_sealed = false; } diff --git a/core/node/state_keeper/src/testonly/test_batch_executor.rs b/core/node/state_keeper/src/testonly/test_batch_executor.rs index ab1e6b57519b..3b5eefd2a089 100644 --- a/core/node/state_keeper/src/testonly/test_batch_executor.rs +++ b/core/node/state_keeper/src/testonly/test_batch_executor.rs @@ -562,7 +562,6 @@ pub(crate) struct TestIO { protocol_version: ProtocolVersionId, previous_batch_protocol_version: ProtocolVersionId, protocol_upgrade_txs: HashMap, - pub next_l2_block_param: L2BlockParams, } impl fmt::Debug for TestIO { @@ -610,7 +609,6 @@ impl TestIO { protocol_version: ProtocolVersionId::latest(), previous_batch_protocol_version: ProtocolVersionId::latest(), protocol_upgrade_txs: HashMap::default(), - next_l2_block_param: L2BlockParams::default(), }; (this, OutputHandler::new(Box::new(persistence))) } @@ -719,13 +717,10 @@ impl StateKeeperIO for TestIO { }; self.l2_block_number += 1; self.timestamp += 1; - self.next_l2_block_param = params; Ok(Some(params)) } - fn get_updated_l2_block_params(&mut self) -> L2BlockParams { - self.next_l2_block_param - } + fn update_next_l2_block_timestamp(&mut self, _block_timestamp: &mut u64) {} async fn wait_for_next_tx( &mut self, diff --git a/core/node/state_keeper/src/updates/mod.rs b/core/node/state_keeper/src/updates/mod.rs index b4f548527652..b0b11b9d0168 100644 --- a/core/node/state_keeper/src/updates/mod.rs +++ b/core/node/state_keeper/src/updates/mod.rs @@ -39,6 +39,7 @@ pub struct UpdatesManager { pub l2_block: L2BlockUpdates, pub storage_writes_deduplicator: StorageWritesDeduplicator, pubdata_params: PubdataParams, + pub next_l2_block_param: L2BlockParams, } impl UpdatesManager { @@ -66,6 +67,10 @@ impl UpdatesManager { storage_writes_deduplicator: StorageWritesDeduplicator::new(), storage_view_cache: None, pubdata_params, + next_l2_block_param: L2BlockParams { + timestamp: l1_batch_env.first_l2_block.timestamp, + virtual_blocks: l1_batch_env.first_l2_block.max_virtual_blocks_to_create, + }, } } @@ -77,6 +82,10 @@ impl UpdatesManager { self.base_system_contract_hashes } + pub(crate) fn next_l2_block_timestamp(&mut self) -> &mut u64 { + &mut self.next_l2_block_param.timestamp + } + pub(crate) fn io_cursor(&self) -> IoCursor { IoCursor { next_l2_block: self.l2_block.number + 1, @@ -167,12 +176,12 @@ impl UpdatesManager { /// Pushes a new L2 block with the specified timestamp into this manager. The previously /// held L2 block is considered sealed and is used to extend the L1 batch data. - pub fn push_l2_block(&mut self, l2_block_params: L2BlockParams) { + pub fn push_l2_block(&mut self) { let new_l2_block_updates = L2BlockUpdates::new( - l2_block_params.timestamp, + self.next_l2_block_param.timestamp, self.l2_block.number + 1, self.l2_block.get_l2_block_hash(), - l2_block_params.virtual_blocks, + self.next_l2_block_param.virtual_blocks, self.protocol_version, ); let old_l2_block_updates = std::mem::replace(&mut self.l2_block, new_l2_block_updates); @@ -180,6 +189,10 @@ impl UpdatesManager { .extend_from_sealed_l2_block(old_l2_block_updates); } + pub fn set_next_l2_block_parameters(&mut self, l2_block_param: L2BlockParams) { + self.next_l2_block_param = l2_block_param + } + pub(crate) fn pending_executed_transactions_len(&self) -> usize { self.l1_batch.executed_transactions.len() + self.l2_block.executed_transactions.len() } @@ -243,10 +256,11 @@ mod tests { assert_eq!(updates_manager.l1_batch.executed_transactions.len(), 0); // Seal an L2 block. - updates_manager.push_l2_block(L2BlockParams { + updates_manager.set_next_l2_block_parameters(L2BlockParams { timestamp: 2, virtual_blocks: 1, }); + updates_manager.push_l2_block(); // Check that L1 batch updates are the same with the pending state // and L2 block updates are empty.