diff --git a/Cargo.lock b/Cargo.lock index 0aa0762ebe55..478372734bdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4643,7 +4643,7 @@ dependencies = [ [[package]] name = "pairing_ce" version = "0.28.5" -source = "git+https://github.com/matter-labs/pairing.git?rev=f55393f#f55393fd366596eac792d78525d26e9c4d6ed1ca" +source = "git+https://github.com/matter-labs/pairing.git?rev=f55393fd366596eac792d78525d26e9c4d6ed1ca#f55393fd366596eac792d78525d26e9c4d6ed1ca" dependencies = [ "byteorder", "cfg-if 1.0.0", @@ -8465,7 +8465,7 @@ dependencies = [ [[package]] name = "zksync_concurrency" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=da015d4c94b19962bc11622b6cc256e214256555#da015d4c94b19962bc11622b6cc256e214256555" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=49b1a98f80d0e9f74fdceadece4283e745c71599#49b1a98f80d0e9f74fdceadece4283e745c71599" dependencies = [ "anyhow", "once_cell", @@ -8492,9 +8492,10 @@ dependencies = [ [[package]] name = "zksync_consensus_bft" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=da015d4c94b19962bc11622b6cc256e214256555#da015d4c94b19962bc11622b6cc256e214256555" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=49b1a98f80d0e9f74fdceadece4283e745c71599#49b1a98f80d0e9f74fdceadece4283e745c71599" dependencies = [ "anyhow", + "async-trait", "once_cell", "rand 0.8.5", "thiserror", @@ -8512,14 +8513,14 @@ dependencies = [ [[package]] name = "zksync_consensus_crypto" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=da015d4c94b19962bc11622b6cc256e214256555#da015d4c94b19962bc11622b6cc256e214256555" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=49b1a98f80d0e9f74fdceadece4283e745c71599#49b1a98f80d0e9f74fdceadece4283e745c71599" dependencies = [ "anyhow", "blst", "ed25519-dalek", "ff_ce", "hex", - "pairing_ce 0.28.5 (git+https://github.com/matter-labs/pairing.git?rev=f55393f)", + "pairing_ce 0.28.5 (git+https://github.com/matter-labs/pairing.git?rev=f55393fd366596eac792d78525d26e9c4d6ed1ca)", "rand 0.4.6", "rand 0.8.5", "sha3 0.10.8", @@ -8530,7 +8531,7 @@ dependencies = [ [[package]] name = "zksync_consensus_executor" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=da015d4c94b19962bc11622b6cc256e214256555#da015d4c94b19962bc11622b6cc256e214256555" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=49b1a98f80d0e9f74fdceadece4283e745c71599#49b1a98f80d0e9f74fdceadece4283e745c71599" dependencies = [ "anyhow", "prost", @@ -8552,7 +8553,7 @@ dependencies = [ [[package]] name = "zksync_consensus_network" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=da015d4c94b19962bc11622b6cc256e214256555#da015d4c94b19962bc11622b6cc256e214256555" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=49b1a98f80d0e9f74fdceadece4283e745c71599#49b1a98f80d0e9f74fdceadece4283e745c71599" dependencies = [ "anyhow", "async-trait", @@ -8576,7 +8577,7 @@ dependencies = [ [[package]] name = "zksync_consensus_roles" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=da015d4c94b19962bc11622b6cc256e214256555#da015d4c94b19962bc11622b6cc256e214256555" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=49b1a98f80d0e9f74fdceadece4283e745c71599#49b1a98f80d0e9f74fdceadece4283e745c71599" dependencies = [ "anyhow", "bit-vec", @@ -8596,7 +8597,7 @@ dependencies = [ [[package]] name = 
"zksync_consensus_storage" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=da015d4c94b19962bc11622b6cc256e214256555#da015d4c94b19962bc11622b6cc256e214256555" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=49b1a98f80d0e9f74fdceadece4283e745c71599#49b1a98f80d0e9f74fdceadece4283e745c71599" dependencies = [ "anyhow", "async-trait", @@ -8613,7 +8614,7 @@ dependencies = [ [[package]] name = "zksync_consensus_sync_blocks" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=da015d4c94b19962bc11622b6cc256e214256555#da015d4c94b19962bc11622b6cc256e214256555" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=49b1a98f80d0e9f74fdceadece4283e745c71599#49b1a98f80d0e9f74fdceadece4283e745c71599" dependencies = [ "anyhow", "thiserror", @@ -8628,7 +8629,7 @@ dependencies = [ [[package]] name = "zksync_consensus_utils" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=da015d4c94b19962bc11622b6cc256e214256555#da015d4c94b19962bc11622b6cc256e214256555" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=49b1a98f80d0e9f74fdceadece4283e745c71599#49b1a98f80d0e9f74fdceadece4283e745c71599" dependencies = [ "thiserror", "zksync_concurrency", @@ -8726,6 +8727,7 @@ dependencies = [ "zksync_commitment_utils", "zksync_concurrency", "zksync_config", + "zksync_consensus_bft", "zksync_consensus_executor", "zksync_consensus_roles", "zksync_consensus_storage", @@ -8791,6 +8793,7 @@ dependencies = [ "url", "vise", "zksync_consensus_roles", + "zksync_consensus_storage", "zksync_contracts", "zksync_health_check", "zksync_protobuf", @@ -8963,7 +8966,7 @@ dependencies = [ [[package]] name = "zksync_protobuf" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=da015d4c94b19962bc11622b6cc256e214256555#da015d4c94b19962bc11622b6cc256e214256555" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=49b1a98f80d0e9f74fdceadece4283e745c71599#49b1a98f80d0e9f74fdceadece4283e745c71599" dependencies = [ "anyhow", "bit-vec", @@ -8981,7 +8984,7 @@ dependencies = [ [[package]] name = "zksync_protobuf_build" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=da015d4c94b19962bc11622b6cc256e214256555#da015d4c94b19962bc11622b6cc256e214256555" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=49b1a98f80d0e9f74fdceadece4283e745c71599#49b1a98f80d0e9f74fdceadece4283e745c71599" dependencies = [ "anyhow", "heck 0.4.1", diff --git a/core/lib/dal/Cargo.toml b/core/lib/dal/Cargo.toml index d73837cdc29c..b34d70ac3a97 100644 --- a/core/lib/dal/Cargo.toml +++ b/core/lib/dal/Cargo.toml @@ -18,8 +18,9 @@ zksync_system_constants = { path = "../constants" } zksync_contracts = { path = "../contracts" } zksync_types = { path = "../types" } zksync_health_check = { path = "../health_check" } -zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "da015d4c94b19962bc11622b6cc256e214256555" } -zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "da015d4c94b19962bc11622b6cc256e214256555" } +zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "49b1a98f80d0e9f74fdceadece4283e745c71599" } +zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "49b1a98f80d0e9f74fdceadece4283e745c71599" } 
+zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "49b1a98f80d0e9f74fdceadece4283e745c71599" } itertools = "0.10.1" thiserror = "1.0" @@ -53,4 +54,4 @@ tracing = "0.1" assert_matches = "1.5.0" [build-dependencies] -zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "da015d4c94b19962bc11622b6cc256e214256555" } +zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "49b1a98f80d0e9f74fdceadece4283e745c71599" } diff --git a/core/lib/dal/migrations/20231128123456_create_consensus_replica_state_table.down.sql b/core/lib/dal/migrations/20231128123456_create_consensus_replica_state_table.down.sql new file mode 100644 index 000000000000..f81d2ea929db --- /dev/null +++ b/core/lib/dal/migrations/20231128123456_create_consensus_replica_state_table.down.sql @@ -0,0 +1 @@ +DROP TABLE consensus_replica_state; diff --git a/core/lib/dal/migrations/20231128123456_create_consensus_replica_state_table.up.sql b/core/lib/dal/migrations/20231128123456_create_consensus_replica_state_table.up.sql new file mode 100644 index 000000000000..d0cdc951d232 --- /dev/null +++ b/core/lib/dal/migrations/20231128123456_create_consensus_replica_state_table.up.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS consensus_replica_state ( + state JSONB NOT NULL, + -- artificial primary key ensuring that the table contains at most 1 row. + fake_key BOOLEAN PRIMARY KEY, + CHECK (fake_key) +); diff --git a/core/lib/dal/sqlx-data.json b/core/lib/dal/sqlx-data.json index 625ce574af1f..9ca456eb08dc 100644 --- a/core/lib/dal/sqlx-data.json +++ b/core/lib/dal/sqlx-data.json @@ -13,6 +13,24 @@ }, "query": "UPDATE proof_generation_details SET status=$1, updated_at = now() WHERE l1_batch_number = $2" }, + "002a901f2802beb48e4d15437fba9a04885a5e4f8f17089a09f31edfd7b3d0bb": { + "describe": { + "columns": [ + { + "name": "number", + "ordinal": 0, + "type_info": "Int8" + } + ], + "nullable": [ + false + ], + "parameters": { + "Left": [] + } + }, + "query": "SELECT number FROM miniblocks WHERE consensus IS NOT NULL ORDER BY number DESC LIMIT 1" + }, "00bd80fd83aff559d8d9232c2e98a12a1dd2c8f31792cd915e2cf11f28e583b7": { "describe": { "columns": [ @@ -710,6 +728,24 @@ }, "query": "SELECT l1_batch_number, factory_deps_filepath, storage_logs_filepaths FROM snapshots WHERE l1_batch_number = $1" }, + "166dcd8d504ba3f52a9e44a05305ed00954ab9b5302be4bb5ab05dfd2272afca": { + "describe": { + "columns": [ + { + "name": "state!", + "ordinal": 0, + "type_info": "Jsonb" + } + ], + "nullable": [ + false + ], + "parameters": { + "Left": [] + } + }, + "query": "SELECT state as \"state!\" FROM consensus_replica_state WHERE fake_key" + }, "16bca6f4258ff3db90a26a8550c5fc35e666fb698960486528fceba3e452fd62": { "describe": { "columns": [ @@ -10486,6 +10522,18 @@ }, "query": "UPDATE basic_witness_input_producer_jobs SET status = $1, attempts = attempts + 1, updated_at = now(), processing_started_at = now() WHERE l1_batch_number = ( SELECT l1_batch_number FROM basic_witness_input_producer_jobs WHERE status = $2 OR (status = $1 AND processing_started_at < now() - $4::interval) OR (status = $3 AND attempts < $5) ORDER BY l1_batch_number ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING basic_witness_input_producer_jobs.l1_batch_number" }, + "e577743852337c926e1566c46f4cbab5e6ab1409921fa8e25c9dfa7dfa03daae": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [ + "Jsonb" + ] + } + }, + 
"query": "INSERT INTO consensus_replica_state(fake_key,state) VALUES(true,$1) ON CONFLICT (fake_key) DO UPDATE SET state = excluded.state" + }, "e626aa2efb6ba875a12f2b4e37b0ba8052810e73fa5e2d3280f747f7b89b956f": { "describe": { "columns": [], @@ -10513,6 +10561,26 @@ }, "query": "UPDATE eth_txs SET gas_used = $1, confirmed_eth_tx_history_id = $2 WHERE id = $3" }, + "e8629da5d4269a565f1043969d29df2ccfbebafdce96d58c601860f30d61b4a0": { + "describe": { + "columns": [ + { + "name": "count!", + "ordinal": 0, + "type_info": "Int8" + } + ], + "nullable": [ + null + ], + "parameters": { + "Left": [ + "Int8" + ] + } + }, + "query": "SELECT COUNT(*) as \"count!\" FROM miniblocks WHERE number = $1 AND consensus IS NOT NULL" + }, "e8988deed66ad9d10be89e89966082aeb920c5dc91eb5fad16bd0d3118708c2e": { "describe": { "columns": [ diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index 16e926393fbc..8eeb994fd586 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -396,6 +396,30 @@ impl BlocksDal<'_, '_> { .await?; Ok(()) } + + /// Fetches the number of the last miniblock with consensus fields set. + /// Miniblocks with Consensus fields set constitute a prefix of sealed miniblocks, + /// so it is enough to traverse the miniblocks in descending order to find the last + /// with consensus fields. + /// + /// If better efficiency is needed we can add an index on "miniblocks without consensus fields". + pub async fn get_last_miniblock_number_with_consensus_fields( + &mut self, + ) -> anyhow::Result> { + let Some(row) = sqlx::query!("SELECT number FROM miniblocks WHERE consensus IS NOT NULL ORDER BY number DESC LIMIT 1") + .fetch_optional(self.storage.conn()) + .await? else { return Ok(None) }; + Ok(Some(MiniblockNumber(row.number.try_into()?))) + } + + /// Checks whether the specified miniblock has consensus field set. + pub async fn has_consensus_fields(&mut self, number: MiniblockNumber) -> sqlx::Result { + Ok(sqlx::query!("SELECT COUNT(*) as \"count!\" FROM miniblocks WHERE number = $1 AND consensus IS NOT NULL", number.0 as i64) + .fetch_one(self.storage.conn()) + .await? + .count > 0) + } + /// Sets consensus-related fields for the specified miniblock. pub async fn set_miniblock_consensus_fields( &mut self, diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs new file mode 100644 index 000000000000..a258229fd29c --- /dev/null +++ b/core/lib/dal/src/consensus_dal.rs @@ -0,0 +1,59 @@ +use zksync_consensus_storage::ReplicaState; + +use crate::StorageProcessor; + +#[derive(Debug)] +pub struct ConsensusDal<'a, 'c> { + pub storage: &'a mut StorageProcessor<'c>, +} + +impl ConsensusDal<'_, '_> { + pub async fn replica_state(&mut self) -> anyhow::Result> { + let Some(row) = + sqlx::query!("SELECT state as \"state!\" FROM consensus_replica_state WHERE fake_key") + .fetch_optional(self.storage.conn()) + .await? 
+ else { + return Ok(None); + }; + Ok(Some(zksync_protobuf::serde::deserialize(row.state)?)) + } + + pub async fn put_replica_state(&mut self, state: &ReplicaState) -> sqlx::Result<()> { + let state = + zksync_protobuf::serde::serialize(state, serde_json::value::Serializer).unwrap(); + sqlx::query!("INSERT INTO consensus_replica_state(fake_key,state) VALUES(true,$1) ON CONFLICT (fake_key) DO UPDATE SET state = excluded.state", state) + .execute(self.storage.conn()) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use rand::Rng as _; + use zksync_consensus_storage::ReplicaState; + + use crate::ConnectionPool; + + #[tokio::test] + async fn replica_state_read_write() { + let pool = ConnectionPool::test_pool().await; + let mut conn = pool.access_storage().await.unwrap(); + assert!(conn + .consensus_dal() + .replica_state() + .await + .unwrap() + .is_none()); + let rng = &mut rand::thread_rng(); + for _ in 0..10 { + let want: ReplicaState = rng.gen(); + conn.consensus_dal().put_replica_state(&want).await.unwrap(); + assert_eq!( + Some(want), + conn.consensus_dal().replica_state().await.unwrap() + ); + } + } +} diff --git a/core/lib/dal/src/lib.rs b/core/lib/dal/src/lib.rs index f5c55524d6f3..05bf94271cdd 100644 --- a/core/lib/dal/src/lib.rs +++ b/core/lib/dal/src/lib.rs @@ -7,8 +7,8 @@ pub use crate::connection::ConnectionPool; use crate::{ accounts_dal::AccountsDal, basic_witness_input_producer_dal::BasicWitnessInputProducerDal, blocks_dal::BlocksDal, blocks_web3_dal::BlocksWeb3Dal, connection::holder::ConnectionHolder, - contract_verification_dal::ContractVerificationDal, eth_sender_dal::EthSenderDal, - events_dal::EventsDal, events_web3_dal::EventsWeb3Dal, + consensus_dal::ConsensusDal, contract_verification_dal::ContractVerificationDal, + eth_sender_dal::EthSenderDal, events_dal::EventsDal, events_web3_dal::EventsWeb3Dal, fri_gpu_prover_queue_dal::FriGpuProverQueueDal, fri_proof_compressor_dal::FriProofCompressorDal, fri_protocol_versions_dal::FriProtocolVersionsDal, fri_prover_dal::FriProverDal, @@ -31,6 +31,7 @@ pub mod basic_witness_input_producer_dal; pub mod blocks_dal; pub mod blocks_web3_dal; pub mod connection; +pub mod consensus_dal; pub mod contract_verification_dal; pub mod eth_sender_dal; pub mod events_dal; @@ -145,6 +146,10 @@ impl<'a> StorageProcessor<'a> { BlocksWeb3Dal { storage: self } } + pub fn consensus_dal(&mut self) -> ConsensusDal<'_, 'a> { + ConsensusDal { storage: self } + } + pub fn eth_sender_dal(&mut self) -> EthSenderDal<'_, 'a> { EthSenderDal { storage: self } } diff --git a/core/lib/types/Cargo.toml b/core/lib/types/Cargo.toml index aef8ce52d3a9..9eaab556565f 100644 --- a/core/lib/types/Cargo.toml +++ b/core/lib/types/Cargo.toml @@ -21,8 +21,8 @@ codegen = { git = "https://github.com/matter-labs/solidity_plonk_verifier.git", zkevm_test_harness = { git = "https://github.com/matter-labs/era-zkevm_test_harness.git", branch = "v1.3.3" } zk_evm_1_4_0 = { git = "https://github.com/matter-labs/era-zk_evm.git", branch = "v1.4.0", package = "zk_evm" } zk_evm = { git = "https://github.com/matter-labs/era-zk_evm.git", tag = "v1.3.3-rc2" } -zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "da015d4c94b19962bc11622b6cc256e214256555" } -zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "da015d4c94b19962bc11622b6cc256e214256555" } +zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = 
"49b1a98f80d0e9f74fdceadece4283e745c71599" } +zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "49b1a98f80d0e9f74fdceadece4283e745c71599" } anyhow = "1.0.75" chrono = { version = "0.4", features = ["serde"] } @@ -52,4 +52,4 @@ tokio = { version = "1", features = ["rt", "macros"] } serde_with = { version = "1", features = ["hex"] } [build-dependencies] -zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "da015d4c94b19962bc11622b6cc256e214256555" } +zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "49b1a98f80d0e9f74fdceadece4283e745c71599" } diff --git a/core/lib/types/src/api/en.rs b/core/lib/types/src/api/en.rs index 24c92a458e37..d64e28981e70 100644 --- a/core/lib/types/src/api/en.rs +++ b/core/lib/types/src/api/en.rs @@ -9,7 +9,7 @@ use crate::ProtocolVersionId; /// Protobuf-encoded consensus-related L2 block (= miniblock) fields. /// See `zksync_dal::models::storage_sync::ConsensusBlockFields`. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(transparent)] pub struct ConsensusBlockFields(pub zksync_basic_types::Bytes); diff --git a/core/lib/zksync_core/Cargo.toml b/core/lib/zksync_core/Cargo.toml index 2d313a213675..b2904c0b3de4 100644 --- a/core/lib/zksync_core/Cargo.toml +++ b/core/lib/zksync_core/Cargo.toml @@ -40,11 +40,12 @@ vlog = { path = "../vlog" } multivm = { path = "../multivm" } # Consensus dependenices -zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "da015d4c94b19962bc11622b6cc256e214256555" } -zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "da015d4c94b19962bc11622b6cc256e214256555" } -zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "da015d4c94b19962bc11622b6cc256e214256555" } -zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "da015d4c94b19962bc11622b6cc256e214256555" } -zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "da015d4c94b19962bc11622b6cc256e214256555" } +zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "49b1a98f80d0e9f74fdceadece4283e745c71599" } +zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "49b1a98f80d0e9f74fdceadece4283e745c71599" } +zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "49b1a98f80d0e9f74fdceadece4283e745c71599" } +zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "49b1a98f80d0e9f74fdceadece4283e745c71599" } +zksync_consensus_bft = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "49b1a98f80d0e9f74fdceadece4283e745c71599" } +zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "49b1a98f80d0e9f74fdceadece4283e745c71599" } prost = "0.12.1" serde = { version = "1.0", features = ["derive"] } @@ -98,4 +99,4 @@ tempfile = "3.0.2" test-casing = "0.1.2" [build-dependencies] -zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = 
"da015d4c94b19962bc11622b6cc256e214256555" } +zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "49b1a98f80d0e9f74fdceadece4283e745c71599" } diff --git a/core/lib/zksync_core/src/consensus/mod.rs b/core/lib/zksync_core/src/consensus/mod.rs index a229666e76c8..34c73274fb5a 100644 --- a/core/lib/zksync_core/src/consensus/mod.rs +++ b/core/lib/zksync_core/src/consensus/mod.rs @@ -1,6 +1,63 @@ //! Consensus-related functionality. +use std::sync::Arc; + +use anyhow::Context as _; +use zksync_concurrency::{ctx, scope}; +use zksync_consensus_executor::{ConsensusConfig, Executor, ExecutorConfig}; +use zksync_consensus_roles::{node, validator}; +use zksync_dal::ConnectionPool; +use zksync_types::Address; mod payload; mod proto; +mod storage; + +#[cfg(test)] +pub(crate) mod testonly; +#[cfg(test)] +mod tests; + +pub(crate) use self::{payload::Payload, storage::sync_block_to_consensus_block}; + +#[derive(Debug)] +pub struct Config { + pub executor: ExecutorConfig, + pub consensus: ConsensusConfig, + pub node_key: node::SecretKey, + pub validator_key: validator::SecretKey, + pub operator_address: Address, +} -pub(crate) use self::payload::Payload; +impl Config { + #[allow(dead_code)] + pub async fn run(self, ctx: &ctx::Ctx, pool: ConnectionPool) -> anyhow::Result<()> { + anyhow::ensure!( + self.executor.validators + == validator::ValidatorSet::new(vec![self.validator_key.public()]).unwrap(), + "currently only consensus with just 1 validator is supported" + ); + let store = Arc::new( + storage::SignedBlockStore::new( + ctx, + pool, + &self.executor.genesis_block, + self.operator_address, + ) + .await?, + ); + let mut executor = Executor::new(ctx, self.executor, self.node_key, store.clone()).await?; + executor + .set_validator( + self.consensus, + self.validator_key, + store.clone(), + store.clone(), + ) + .context("executor.set_validator()")?; + scope::run!(&ctx, |ctx, s| async { + s.spawn_bg(store.run_background_tasks(ctx)); + executor.run(ctx).await + }) + .await + } +} diff --git a/core/lib/zksync_core/src/consensus/payload.rs b/core/lib/zksync_core/src/consensus/payload.rs index 8051d87ca585..be7a839c8f83 100644 --- a/core/lib/zksync_core/src/consensus/payload.rs +++ b/core/lib/zksync_core/src/consensus/payload.rs @@ -1,11 +1,14 @@ use anyhow::Context as _; use zksync_consensus_roles::validator; use zksync_protobuf::{required, ProtoFmt}; -use zksync_types::{api::en::SyncBlock, Address, L1BatchNumber, Transaction, H256}; +use zksync_types::{ + api::en::SyncBlock, Address, L1BatchNumber, ProtocolVersionId, Transaction, H256, +}; /// L2 block (= miniblock) payload. 
#[derive(Debug, PartialEq)] pub(crate) struct Payload { + pub protocol_version: ProtocolVersionId, pub hash: H256, pub l1_batch_number: L1BatchNumber, pub timestamp: u64, @@ -30,6 +33,9 @@ impl ProtoFmt for Payload { } Ok(Self { + protocol_version: required(&message.protocol_version) + .and_then(|x| Ok(ProtocolVersionId::try_from(u16::try_from(*x)?)?)) + .context("protocol_version")?, hash: required(&message.hash) .and_then(|bytes| Ok(<[u8; 32]>::try_from(bytes.as_slice())?.into())) .context("hash")?, @@ -49,6 +55,7 @@ } fn build(&self) -> Self::Proto { Self::Proto { + protocol_version: Some((self.protocol_version as u16).into()), hash: Some(self.hash.as_bytes().into()), l1_batch_number: Some(self.l1_batch_number.0), timestamp: Some(self.timestamp), @@ -74,6 +81,7 @@ impl TryFrom<SyncBlock> for Payload { fn try_from(block: SyncBlock) -> anyhow::Result<Self> { Ok(Self { + protocol_version: block.protocol_version, hash: block.hash.unwrap_or_default(), l1_batch_number: block.l1_batch_number, timestamp: block.timestamp, diff --git a/core/lib/zksync_core/src/consensus/proto/mod.proto b/core/lib/zksync_core/src/consensus/proto/mod.proto index 6199585899de..c5f99582875a 100644 --- a/core/lib/zksync_core/src/consensus/proto/mod.proto +++ b/core/lib/zksync_core/src/consensus/proto/mod.proto @@ -10,6 +10,8 @@ message Transaction { } message Payload { + // zksync-era ProtocolVersionId + optional uint32 protocol_version = 9; // required; u16 optional bytes hash = 1; // required; H256 optional uint32 l1_batch_number = 2; // required optional uint64 timestamp = 3; // required; seconds since UNIX epoch diff --git a/core/lib/zksync_core/src/consensus/storage.rs b/core/lib/zksync_core/src/consensus/storage.rs new file mode 100644 index 000000000000..d0feecf77949 --- /dev/null +++ b/core/lib/zksync_core/src/consensus/storage.rs @@ -0,0 +1,419 @@ +//! Storage implementation based on DAL. +use std::ops; + +use anyhow::Context as _; +use zksync_concurrency::{ctx, error::Wrap as _, sync, time}; +use zksync_consensus_bft::PayloadSource; +use zksync_consensus_roles::validator; +use zksync_consensus_storage::{BlockStore, ReplicaState, ReplicaStateStore, WriteBlockStore}; +use zksync_dal::{blocks_dal::ConsensusBlockFields, ConnectionPool}; +use zksync_types::{api::en::SyncBlock, Address, MiniblockNumber}; + +use crate::consensus; + +pub(crate) fn sync_block_to_consensus_block( + block: SyncBlock, +) -> anyhow::Result<validator::FinalBlock> { + let number = validator::BlockNumber(block.number.0.into()); + let consensus = ConsensusBlockFields::decode( + block + .consensus + .as_ref() + .context("Missing consensus fields")?, + ) + .context("ConsensusBlockFields::decode()")?; + + let payload: consensus::Payload = block.try_into()?; + let payload = payload.encode(); + anyhow::ensure!(payload.hash() == consensus.justification.message.proposal.payload); + Ok(validator::FinalBlock { + header: validator::BlockHeader { + parent: consensus.parent, + number, + payload: payload.hash(), + }, + payload, + justification: consensus.justification, + }) +} + +/// Context-aware `zksync_dal::StorageProcessor` wrapper.
+pub(super) struct StorageProcessor<'a>(zksync_dal::StorageProcessor<'a>); + +pub(super) async fn storage<'a>( + ctx: &ctx::Ctx, + pool: &'a ConnectionPool, +) -> ctx::Result<StorageProcessor<'a>> { + Ok(StorageProcessor( + ctx.wait(pool.access_storage_tagged("sync_layer")).await??, + )) +} + +impl<'a> StorageProcessor<'a> { + pub async fn start_transaction<'b, 'c: 'b>( + &'c mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result<StorageProcessor<'b>> { + Ok(StorageProcessor( + ctx.wait(self.0.start_transaction()) + .await? + .context("sqlx")?, + )) + } + + pub async fn commit(self, ctx: &ctx::Ctx) -> ctx::Result<()> { + Ok(ctx.wait(self.0.commit()).await?.context("sqlx")?) + } + + async fn fetch_sync_block( + &mut self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + operator_address: Address, + ) -> ctx::Result<Option<SyncBlock>> { + let number = MiniblockNumber(number.0.try_into().context("MiniblockNumber")?); + Ok(ctx + .wait(self.0.sync_dal().sync_block(number, operator_address, true)) + .await? + .context("sync_block()")?) + } + + pub async fn fetch_block( + &mut self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + operator_address: Address, + ) -> ctx::Result<Option<validator::FinalBlock>> { + let Some(block) = self.fetch_sync_block(ctx, number, operator_address).await? else { + return Ok(None); + }; + if block.consensus.is_none() { + return Ok(None); + } + Ok(Some( + sync_block_to_consensus_block(block).context("sync_block_to_consensus_block()")?, + )) + } + + pub async fn fetch_payload( + &mut self, + ctx: &ctx::Ctx, + block_number: validator::BlockNumber, + operator_address: Address, + ) -> ctx::Result<Option<consensus::Payload>> { + let Some(sync_block) = self + .fetch_sync_block(ctx, block_number, operator_address) + .await? + else { + return Ok(None); + }; + Ok(Some(sync_block.try_into()?)) + } + + pub async fn put_block( + &mut self, + ctx: &ctx::Ctx, + block: &validator::FinalBlock, + operator_address: Address, + ) -> ctx::Result<()> { + let n = MiniblockNumber( + block + .header + .number + .0 + .try_into() + .context("MiniblockNumber")?, + ); + let mut txn = self + .start_transaction(ctx) + .await + .wrap("start_transaction()")?; + + // We require the block to be already stored in Postgres when we set the consensus field. + let sync_block = txn + .fetch_sync_block(ctx, block.header.number, operator_address) + .await? + .context("unknown block")?; + let want = &ConsensusBlockFields { + parent: block.header.parent, + justification: block.justification.clone(), + }; + + // If consensus field is already set, just validate the stored value but don't override it. + if sync_block.consensus.is_some() { + sync_block_to_consensus_block(sync_block) + .context("an invalid block found in storage")?; + return Ok(()); + } + + // Verify that the payload matches the storage. + let want_payload: consensus::Payload = sync_block.try_into()?; + if want_payload.encode() != block.payload { + let got_payload = consensus::Payload::decode(&block.payload)?; + return Err(anyhow::anyhow!( + "payload mismatch: got {got_payload:?}, want {want_payload:?}" + ) + .into()); + } + + ctx.wait(txn.0.blocks_dal().set_miniblock_consensus_fields(n, want)) + .await? + .context("set_miniblock_consensus_fields()")?; + txn.commit(ctx).await.wrap("commit()")?; + Ok(()) + } + + pub async fn find_head_number( + &mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result<validator::BlockNumber> { + let head = ctx + .wait( + self.0 + .blocks_dal() + .get_last_miniblock_number_with_consensus_fields(), + ) + .await? + .context("get_last_miniblock_number_with_consensus_fields()")?
+ .context("head not found")?; + Ok(validator::BlockNumber(head.0.into())) + } + + pub async fn find_head_forward( + &mut self, + ctx: &ctx::Ctx, + start_at: validator::BlockNumber, + ) -> ctx::Result { + let mut head = MiniblockNumber(start_at.0.try_into().context("MiniblockNumber")?); + while ctx + .wait(self.0.blocks_dal().has_consensus_fields(head + 1)) + .await? + .context("has_consensus_fields()")? + { + head += 1; + } + Ok(validator::BlockNumber(head.0.into())) + } +} + +/// Postgres-based [`BlockStore`] implementation, which +/// considers blocks as stored <=> they have consensus field set. +#[derive(Debug)] +pub(super) struct SignedBlockStore { + genesis: validator::BlockNumber, + head: sync::watch::Sender, + pool: ConnectionPool, + operator_address: Address, +} + +impl SignedBlockStore { + /// Creates a new storage handle. `pool` should have multiple connections to work efficiently. + pub async fn new( + ctx: &ctx::Ctx, + pool: ConnectionPool, + genesis: &validator::FinalBlock, + operator_address: Address, + ) -> anyhow::Result { + // Ensure that genesis block has consensus field set in postgres. + let head = { + let mut storage = storage(ctx, &pool).await.wrap("storage()")?; + storage + .put_block(ctx, genesis, operator_address) + .await + .wrap("put_block()")?; + + // Find the last miniblock with consensus field set (aka head). + // We assume here that all blocks in range (genesis,head) also have consensus field set. + // WARNING: genesis should NEVER be moved to an earlier block. + storage + .find_head_number(ctx) + .await + .wrap("find_head_number()")? + }; + Ok(Self { + genesis: genesis.header.number, + head: sync::watch::channel(head).0, + pool, + operator_address, + }) + } +} + +#[async_trait::async_trait] +impl WriteBlockStore for SignedBlockStore { + /// Verify that `payload` is a correct proposal for the block `block_number`. + async fn verify_payload( + &self, + ctx: &ctx::Ctx, + block_number: validator::BlockNumber, + payload: &validator::Payload, + ) -> ctx::Result<()> { + let storage = &mut storage(ctx, &self.pool).await.wrap("storage()")?; + let want = storage + .fetch_payload(ctx, block_number, self.operator_address) + .await + .wrap("fetch_payload()")? + .context("unknown block")?; + let got = consensus::Payload::decode(payload).context("consensus::Payload::decode()")?; + if got != want { + return Err(anyhow::anyhow!("unexpected payload: got {got:?} want {want:?}").into()); + } + Ok(()) + } + + /// Puts a block into this storage. + async fn put_block(&self, ctx: &ctx::Ctx, block: &validator::FinalBlock) -> ctx::Result<()> { + let storage = &mut storage(ctx, &self.pool).await.wrap("storage()")?; + + // Currently main node is the only validator, so it should be the only one creating new + // blocks. To ensure that no gaps in the blocks are created we check here that we always + // insert the next block after the current head block. 
+ let head = *self.head.borrow(); + let head = storage + .find_head_forward(ctx, head) + .await + .wrap("find_head_forward()")?; + if block.header.number != head.next() { + return Err(anyhow::anyhow!( + "expected block with number {}, got {}", + head.next(), + block.header.number + ) + .into()); + } + + storage + .put_block(ctx, block, self.operator_address) + .await + .wrap("put_block()") + } +} + +#[async_trait::async_trait] +impl BlockStore for SignedBlockStore { + async fn head_block(&self, ctx: &ctx::Ctx) -> ctx::Result<validator::FinalBlock> { + let head = self.last_contiguous_block_number(ctx).await?; + Ok(self.block(ctx, head).await?.context("head block missing")?) + } + + async fn first_block(&self, ctx: &ctx::Ctx) -> ctx::Result<validator::FinalBlock> { + Ok(self + .block(ctx, self.genesis) + .await? + .context("Genesis miniblock not present in Postgres")?) + } + + async fn last_contiguous_block_number( + &self, + ctx: &ctx::Ctx, + ) -> ctx::Result<validator::BlockNumber> { + let storage = &mut storage(ctx, &self.pool).await.wrap("storage()")?; + let head = *self.head.borrow(); + storage + .find_head_forward(ctx, head) + .await + .wrap("find_head_forward()") + } + + async fn block( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result<Option<validator::FinalBlock>> { + let storage = &mut storage(ctx, &self.pool).await.wrap("storage()")?; + storage + .fetch_block(ctx, number, self.operator_address) + .await + .wrap("fetch_block()") + } + + async fn missing_block_numbers( + &self, + ctx: &ctx::Ctx, + range: ops::Range<validator::BlockNumber>, + ) -> ctx::Result<Vec<validator::BlockNumber>> { + let last = self.last_contiguous_block_number(ctx).await?; + let mut output = vec![]; + output.extend((range.start.0..self.genesis.0).map(validator::BlockNumber)); + output.extend((last.next().0..range.end.0).map(validator::BlockNumber)); + Ok(output) + } + + fn subscribe_to_block_writes(&self) -> sync::watch::Receiver<validator::BlockNumber> { + self.head.subscribe() + } +} + +#[async_trait::async_trait] +impl ReplicaStateStore for SignedBlockStore { + async fn replica_state(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<ReplicaState>> { + let storage = &mut storage(ctx, &self.pool).await.wrap("storage")?; + Ok(ctx + .wait(storage.0.consensus_dal().replica_state()) + .await? + .context("replica_state()")?) + } + + async fn put_replica_state( + &self, + ctx: &ctx::Ctx, + replica_state: &ReplicaState, + ) -> ctx::Result<()> { + let storage = &mut storage(ctx, &self.pool).await.wrap("storage")?; + Ok(ctx + .wait(storage.0.consensus_dal().put_replica_state(replica_state)) + .await? + .context("put_replica_state()")?) + } +} + +#[async_trait::async_trait] +impl PayloadSource for SignedBlockStore { + async fn propose( + &self, + ctx: &ctx::Ctx, + block_number: validator::BlockNumber, + ) -> ctx::Result<validator::Payload> { + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); + loop { + let storage = &mut storage(ctx, &self.pool).await.wrap("storage()")?; + if let Some(payload) = storage + .fetch_payload(ctx, block_number, self.operator_address) + .await + .wrap("fetch_payload()")?
+ { + return Ok(payload.encode()); + } + ctx.sleep(POLL_INTERVAL).await?; + } + } +} + +impl SignedBlockStore { + pub async fn run_background_tasks(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(100); + let mut head = *self.head.borrow(); + let res: ctx::Result<()> = async { + loop { + let storage = &mut storage(ctx, &self.pool).await.wrap("storage()")?; + head = storage + .find_head_forward(ctx, head) + .await + .wrap("find_head_forward()")?; + self.head.send_if_modified(|x| { + if *x >= head { + return false; + } + *x = head; + true + }); + ctx.sleep(POLL_INTERVAL).await?; + } + } + .await; + match res.err().unwrap() { + ctx::Error::Canceled(_) => Ok(()), + ctx::Error::Internal(err) => Err(err), + } + } +} diff --git a/core/lib/zksync_core/src/consensus/testonly.rs b/core/lib/zksync_core/src/consensus/testonly.rs new file mode 100644 index 000000000000..c2a7a4ec475b --- /dev/null +++ b/core/lib/zksync_core/src/consensus/testonly.rs @@ -0,0 +1,370 @@ +use anyhow::Context as _; +use rand::Rng; +use zksync_concurrency::{ctx, scope, sync, time}; +use zksync_consensus_roles::validator; +use zksync_contracts::{BaseSystemContractsHashes, SystemContractCode}; +use zksync_dal::ConnectionPool; +use zksync_types::{ + api, block::MiniblockHasher, Address, L1BatchNumber, L2ChainId, MiniblockNumber, + ProtocolVersionId, H256, +}; + +use crate::{ + genesis::{ensure_genesis_state, GenesisParams}, + state_keeper::{ + tests::{create_l1_batch_metadata, create_l2_transaction, MockBatchExecutorBuilder}, + MiniblockSealer, ZkSyncStateKeeper, + }, + sync_layer::{ + sync_action::{ActionQueue, ActionQueueSender, SyncAction}, + ExternalIO, MainNodeClient, SyncState, + }, +}; + +#[derive(Debug, Default)] +pub(crate) struct MockMainNodeClient { + prev_miniblock_hash: H256, + l2_blocks: Vec<api::en::SyncBlock>, +} + +impl MockMainNodeClient { + /// `miniblock_count` doesn't include a fictive miniblock. Returns hashes of generated transactions.
pub fn push_l1_batch(&mut self, miniblock_count: u32) -> Vec<H256> { + let l1_batch_number = self + .l2_blocks + .last() + .map_or(L1BatchNumber(0), |block| block.l1_batch_number + 1); + let number_offset = self.l2_blocks.len() as u32; + + let mut tx_hashes = vec![]; + let l2_blocks = (0..=miniblock_count).map(|number| { + let is_fictive = number == miniblock_count; + let number = number + number_offset; + let mut hasher = MiniblockHasher::new( + MiniblockNumber(number), + number.into(), + self.prev_miniblock_hash, + ); + + let transactions = if is_fictive { + vec![] + } else { + let transaction = create_l2_transaction(10, 100); + tx_hashes.push(transaction.hash()); + hasher.push_tx_hash(transaction.hash()); + vec![transaction.into()] + }; + let miniblock_hash = hasher.finalize(if number == 0 { + ProtocolVersionId::Version0 // The genesis block always uses the legacy hashing mode + } else { + ProtocolVersionId::latest() + }); + self.prev_miniblock_hash = miniblock_hash; + + api::en::SyncBlock { + number: MiniblockNumber(number), + l1_batch_number, + last_in_batch: is_fictive, + timestamp: number.into(), + l1_gas_price: 2, + l2_fair_gas_price: 3, + base_system_contracts_hashes: BaseSystemContractsHashes::default(), + operator_address: Address::repeat_byte(2), + transactions: Some(transactions), + virtual_blocks: Some(!is_fictive as u32), + hash: Some(miniblock_hash), + protocol_version: ProtocolVersionId::latest(), + consensus: None, + } + }); + + self.l2_blocks.extend(l2_blocks); + tx_hashes + } +} + +#[async_trait::async_trait] +impl MainNodeClient for MockMainNodeClient { + async fn fetch_system_contract_by_hash( + &self, + _hash: H256, + ) -> anyhow::Result<SystemContractCode> { + anyhow::bail!("Not implemented"); + } + + async fn fetch_genesis_contract_bytecode( + &self, + _address: Address, + ) -> anyhow::Result<Option<Vec<u8>>> { + anyhow::bail!("Not implemented"); + } + + async fn fetch_protocol_version( + &self, + _protocol_version: ProtocolVersionId, + ) -> anyhow::Result<api::ProtocolVersion> { + anyhow::bail!("Not implemented"); + } + + async fn fetch_genesis_l1_batch_hash(&self) -> anyhow::Result<H256> { + anyhow::bail!("Not implemented"); + } + + async fn fetch_l2_block_number(&self) -> anyhow::Result<MiniblockNumber> { + if let Some(number) = self.l2_blocks.len().checked_sub(1) { + Ok(MiniblockNumber(number as u32)) + } else { + anyhow::bail!("Not implemented"); + } + } + + async fn fetch_l2_block( + &self, + number: MiniblockNumber, + with_transactions: bool, + ) -> anyhow::Result<Option<api::en::SyncBlock>> { + let Some(mut block) = self.l2_blocks.get(number.0 as usize).cloned() else { + return Ok(None); + }; + if !with_transactions { + block.transactions = None; + } + Ok(Some(block)) + } +} + +pub(crate) struct StateKeeperHandle { + next_batch: L1BatchNumber, + next_block: MiniblockNumber, + next_timestamp: u64, + batch_sealed: bool, + + fee_per_gas: u64, + gas_per_pubdata: u32, + operator_address: Address, + + actions_sender: ActionQueueSender, +} + +pub(crate) struct StateKeeperRunner { + actions_queue: ActionQueue, + operator_address: Address, +} + +impl StateKeeperHandle { + pub fn new(operator_address: Address) -> (Self, StateKeeperRunner) { + let (actions_sender, actions_queue) = ActionQueue::new(); + ( + Self { + next_batch: L1BatchNumber(1), + next_block: MiniblockNumber(1), + next_timestamp: 124356, + batch_sealed: true, + fee_per_gas: 10, + gas_per_pubdata: 100, + operator_address, + actions_sender, + }, + StateKeeperRunner { + operator_address, + actions_queue, + }, + ) + } + + fn open_block(&mut self) -> SyncAction { + if self.batch_sealed { + let action =
SyncAction::OpenBatch { + number: self.next_batch, + timestamp: self.next_timestamp, + l1_gas_price: 2, + l2_fair_gas_price: 3, + operator_address: self.operator_address, + protocol_version: ProtocolVersionId::latest(), + first_miniblock_info: (self.next_block, 1), + }; + self.next_batch += 1; + self.next_block += 1; + self.next_timestamp += 5; + self.batch_sealed = false; + action + } else { + let action = SyncAction::Miniblock { + number: self.next_block, + timestamp: self.next_timestamp, + virtual_blocks: 0, + }; + self.next_block += 1; + self.next_timestamp += 2; + action + } + } + + pub async fn push_block(&mut self, transactions: usize) { + assert!(transactions > 0); + let mut actions = vec![self.open_block()]; + for _ in 0..transactions { + let tx = create_l2_transaction(self.fee_per_gas, self.gas_per_pubdata); + actions.push(SyncAction::Tx(Box::new(tx.into()))); + } + actions.push(SyncAction::SealMiniblock(None)); + self.actions_sender.push_actions(actions).await; + } + + pub async fn seal_batch(&mut self) { + // Each batch ends with an empty block (aka fictive block). + let mut actions = vec![self.open_block()]; + actions.push(SyncAction::SealBatch { + virtual_blocks: 0, + consensus: None, + }); + self.actions_sender.push_actions(actions).await; + self.batch_sealed = true; + } + + pub async fn push_random_blocks(&mut self, rng: &mut impl Rng, count: usize) { + for _ in 0..count { + // 20% chance to seal an L1 batch. + // seal_batch() also produces a (fictive) block. + if rng.gen_range(0..100) < 20 { + self.seal_batch().await; + } else { + self.push_block(rng.gen_range(3..8)).await; + } + } + } + + // Wait for all pushed miniblocks to be produced. + pub async fn sync(&self, ctx: &ctx::Ctx, pool: &ConnectionPool) -> anyhow::Result<()> { + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(100); + loop { + let mut storage = pool.access_storage().await.context("access_storage()")?; + let head = storage + .blocks_dal() + .get_sealed_miniblock_number() + .await + .context("get_sealed_miniblock_number()")?; + if head.0 >= self.next_block.0 - 1 { + return Ok(()); + } + ctx.sleep(POLL_INTERVAL).await?; + } + } + + // Wait for all pushed miniblocks to have consensus certificate. + pub async fn sync_consensus( + &self, + ctx: &ctx::Ctx, + pool: &ConnectionPool, + ) -> anyhow::Result<()> { + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(100); + loop { + let mut storage = pool.access_storage().await.context("access_storage()")?; + if let Some(head) = storage + .blocks_dal() + .get_last_miniblock_number_with_consensus_fields() + .await + .context("get_last_miniblock_number_with_consensus_fields()")? + { + if head.0 >= self.next_block.0 - 1 { + return Ok(()); + } + } + ctx.sleep(POLL_INTERVAL).await?; + } + } + + /// Validate consensus certificates for all expected miniblocks. + pub async fn validate_consensus( + &self, + ctx: &ctx::Ctx, + pool: &ConnectionPool, + genesis: validator::BlockNumber, + validators: &validator::ValidatorSet, + ) -> anyhow::Result<()> { + let mut storage = super::storage::storage(ctx, pool) + .await + .context("storage()")?; + for i in genesis.0..self.next_block.0 as u64 { + let block = storage + .fetch_block(ctx, validator::BlockNumber(i), self.operator_address) + .await? + .with_context(|| format!("missing block {i}"))?; + block.validate(validators, 1).unwrap(); + } + Ok(()) + } +} + +// Waits for L1 batches to be sealed and then populates them with mock metadata. 
+async fn run_mock_metadata_calculator(ctx: &ctx::Ctx, pool: ConnectionPool) -> anyhow::Result<()> { + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(100); + let mut n = L1BatchNumber(1); + while let Ok(()) = ctx.sleep(POLL_INTERVAL).await { + let mut storage = pool.access_storage().await.context("access_storage()")?; + let last = storage + .blocks_dal() + .get_sealed_l1_batch_number() + .await + .context("get_sealed_l1_batch_number()")?; + + while n <= last { + let metadata = create_l1_batch_metadata(n.0); + storage + .blocks_dal() + .save_l1_batch_metadata(n, &metadata, H256::zero(), false) + .await + .context("save_l1_batch_metadata()")?; + n += 1; + } + } + Ok(()) +} + +impl StateKeeperRunner { + pub async fn run(self, ctx: &ctx::Ctx, pool: &ConnectionPool) -> anyhow::Result<()> { + scope::run!(ctx, |ctx, s| async { + let mut storage = pool.access_storage().await.context("access_storage()")?; + // ensure genesis + if storage + .blocks_dal() + .is_genesis_needed() + .await + .context("is_genesis_needed()")? + { + let mut params = GenesisParams::mock(); + params.first_validator = self.operator_address; + ensure_genesis_state(&mut storage, L2ChainId::default(), &params) + .await + .context("ensure_genesis_state()")?; + } + let (stop_sender, stop_receiver) = sync::watch::channel(false); + let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(pool.clone(), 5); + let io = ExternalIO::new( + miniblock_sealer_handle, + pool.clone(), + self.actions_queue, + SyncState::new(), + Box::<MockMainNodeClient>::default(), + self.operator_address, + u32::MAX, + L2ChainId::default(), + ) + .await; + s.spawn_bg(miniblock_sealer.run()); + s.spawn_bg(run_mock_metadata_calculator(ctx, pool.clone())); + s.spawn_bg( + ZkSyncStateKeeper::without_sealer( + stop_receiver, + Box::new(io), + Box::new(MockBatchExecutorBuilder), + ) + .run(), + ); + ctx.canceled().await; + stop_sender.send_replace(true); + Ok(()) + }) + .await + } +} diff --git a/core/lib/zksync_core/src/consensus/tests.rs b/core/lib/zksync_core/src/consensus/tests.rs new file mode 100644 index 000000000000..be8ff9bacb23 --- /dev/null +++ b/core/lib/zksync_core/src/consensus/tests.rs @@ -0,0 +1,81 @@ +use zksync_concurrency::{ctx, scope}; +use zksync_consensus_executor::testonly::FullValidatorConfig; +use zksync_consensus_roles::validator; +use zksync_dal::ConnectionPool; +use zksync_types::Address; + +use super::*; + +// In the current implementation, consensus certificates are created asynchronously +// for the miniblocks constructed by the StateKeeper. This means that the consensus actor +// is effectively just backfilling the consensus certificates for the miniblocks in storage. +#[tokio::test(flavor = "multi_thread")] +async fn test_backfill() { + const OPERATOR_ADDRESS: Address = Address::repeat_byte(17); + const GENESIS_BLOCK: validator::BlockNumber = validator::BlockNumber(5); + + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + let pool = ConnectionPool::test_pool().await; + + scope::run!(ctx, |ctx, s| async { + // Start state keeper. + let (mut sk, sk_runner) = testonly::StateKeeperHandle::new(OPERATOR_ADDRESS); + s.spawn_bg(sk_runner.run(ctx, &pool)); + + // Populate storage with a bunch of blocks. + sk.push_random_blocks(rng, 10).await; + sk.sync(ctx, &pool).await.context("sk.sync(<1st phase>)")?; + + // Prepare genesis block for consensus.
+ let genesis_payload = { + let mut storage = storage::storage(ctx, &pool).await.context("storage()")?; + storage + .fetch_payload(ctx, GENESIS_BLOCK, OPERATOR_ADDRESS) + .await + .context("fetch_payload()")? + .context("genesis block missing")? + }; + let cfg = FullValidatorConfig::for_single_validator( + &mut ctx.rng(), + genesis_payload.encode(), + GENESIS_BLOCK, + ); + let validators = cfg.node_config.validators.clone(); + + // Start consensus actor and wait for it to catch up. + let cfg = Config { + executor: cfg.node_config, + consensus: cfg.consensus_config, + node_key: cfg.node_key, + validator_key: cfg.validator_key, + operator_address: OPERATOR_ADDRESS, + }; + s.spawn_bg(cfg.run(ctx, pool.clone())); + sk.sync_consensus(ctx, &pool) + .await + .context("sk.sync_consensus(<1st phase>)")?; + + // Generate a couple more blocks and wait for consensus to catch up. + sk.push_random_blocks(rng, 7).await; + sk.sync_consensus(ctx, &pool) + .await + .context("sk.sync_consensus(<2nd phase>)")?; + + // Synchronously produce blocks one by one, and wait for consensus. + for _ in 0..5 { + sk.push_random_blocks(rng, 1).await; + sk.sync_consensus(ctx, &pool) + .await + .context("sk.sync_consensus(<3rd phase>)")?; + } + + sk.validate_consensus(ctx, &pool, GENESIS_BLOCK, &validators) + .await + .context("sk.validate_consensus()")?; + Ok(()) + }) + .await + .unwrap(); +} diff --git a/core/lib/zksync_core/src/state_keeper/tests/mod.rs b/core/lib/zksync_core/src/state_keeper/tests/mod.rs index 7ad0116078ca..26458956af7a 100644 --- a/core/lib/zksync_core/src/state_keeper/tests/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/tests/mod.rs @@ -29,11 +29,13 @@ use zksync_types::{ StorageLogQuery, StorageLogQueryType, Timestamp, Transaction, H256, U256, }; -pub(crate) use self::tester::TestBatchExecutorBuilder; +mod tester; + use self::tester::{ bootloader_tip_out_of_gas, pending_batch_data, random_tx, rejected_exec, successful_exec, successful_exec_with_metrics, TestScenario, }; +pub(crate) use self::tester::{MockBatchExecutorBuilder, TestBatchExecutorBuilder}; use crate::{ gas_tracker::l1_batch_base_cost, state_keeper::{ @@ -47,8 +49,6 @@ use crate::{ }, }; -mod tester; - pub(super) static BASE_SYSTEM_CONTRACTS: Lazy<BaseSystemContracts> = Lazy::new(BaseSystemContracts::load_from_disk); diff --git a/core/lib/zksync_core/src/state_keeper/tests/tester.rs b/core/lib/zksync_core/src/state_keeper/tests/tester.rs index a2dc7f05c5e8..2c26370a22a9 100644 --- a/core/lib/zksync_core/src/state_keeper/tests/tester.rs +++ b/core/lib/zksync_core/src/state_keeper/tests/tester.rs @@ -770,3 +770,35 @@ impl StateKeeperIO for TestIO { None } } + +/// `L1BatchExecutorBuilder` which doesn't check anything at all. +/// Accepts all transactions. +#[derive(Debug)] +pub(crate) struct MockBatchExecutorBuilder; + +#[async_trait] +impl L1BatchExecutorBuilder for MockBatchExecutorBuilder { + async fn init_batch( + &mut self, + _l1batch_params: L1BatchEnv, + _system_env: SystemEnv, + ) -> BatchExecutorHandle { + let (send, recv) = tokio::sync::mpsc::channel(1); + let handle = tokio::task::spawn(async { + let mut recv = recv; + while let Some(cmd) = recv.recv().await { + match cmd { + Command::ExecuteTx(_, resp) => resp.send(successful_exec()).unwrap(), + Command::StartNextMiniblock(_, resp) => resp.send(()).unwrap(), + Command::RollbackLastTx(_) => panic!("unexpected rollback"), + Command::FinishBatch(resp) => { + // Blanket result, it doesn't really matter.
resp.send((default_vm_block_result(), None)).unwrap(); + return; + } + } + } + }); + BatchExecutorHandle::from_raw(handle, send) + } +} diff --git a/core/lib/zksync_core/src/sync_layer/gossip/buffered/mod.rs b/core/lib/zksync_core/src/sync_layer/gossip/buffered/mod.rs index 5f2308930a3f..c4680cc3d7d2 100644 --- a/core/lib/zksync_core/src/sync_layer/gossip/buffered/mod.rs +++ b/core/lib/zksync_core/src/sync_layer/gossip/buffered/mod.rs @@ -9,8 +9,8 @@ use zksync_concurrency::{ ctx, scope, sync::{self, watch, Mutex}, }; -use zksync_consensus_roles::validator::{BlockNumber, FinalBlock}; -use zksync_consensus_storage::{BlockStore, StorageError, StorageResult, WriteBlockStore}; +use zksync_consensus_roles::validator::{BlockNumber, FinalBlock, Payload}; +use zksync_consensus_storage::{BlockStore, WriteBlockStore}; use super::{ metrics::{BlockResponseKind, METRICS}, @@ -38,7 +38,7 @@ pub(super) trait ContiguousBlockStore: BlockStore { /// /// - with the next block (i.e., one immediately after [`BlockStore::head_block()`]) /// - sequentially (i.e., multiple blocks cannot be scheduled at once) - async fn schedule_next_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()>; + async fn schedule_next_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> ctx::Result<()>; } /// In-memory buffer of [`FinalBlock`]s received from peers, but not executed and persisted locally yet. @@ -207,7 +207,7 @@ impl Buffered { /// Schedules blocks in the underlying store as they are pushed to this store. #[tracing::instrument(level = "trace", skip_all, err)] - async fn schedule_blocks(&self, ctx: &ctx::Ctx) -> StorageResult<()> { + async fn schedule_blocks(&self, ctx: &ctx::Ctx) -> ctx::Result<()> { let mut blocks_subscriber = self.block_writes_sender.subscribe(); let mut next_scheduled_block_number = { @@ -248,7 +248,7 @@ impl Buffered { /// Runs background tasks for this store. This method **must** be spawned as a background task /// which should be running as long as the [`Buffered`] is in use; otherwise, it will function incorrectly. - pub async fn run_background_tasks(&self, ctx: &ctx::Ctx) -> StorageResult<()> { + pub async fn run_background_tasks(&self, ctx: &ctx::Ctx) -> ctx::Result<()> { scope::run!(ctx, |ctx, s| { s.spawn(async { self.listen_to_updates(ctx).await; @@ -262,7 +262,7 @@ impl Buffered { #[async_trait] impl BlockStore for Buffered { - async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult<FinalBlock> { + async fn head_block(&self, ctx: &ctx::Ctx) -> ctx::Result<FinalBlock> { let buffered_head_block = sync::lock(ctx, &self.buffer).await?.head_block(); if let Some(block) = buffered_head_block { return Ok(block); @@ -270,22 +270,18 @@ impl BlockStore for Buffered { self.inner.head_block(ctx).await } - async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult<FinalBlock> { + async fn first_block(&self, ctx: &ctx::Ctx) -> ctx::Result<FinalBlock> { // First block is always situated in the underlying store self.inner.first_block(ctx).await } - async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult<BlockNumber> { + async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> ctx::Result<BlockNumber> { Ok(sync::lock(ctx, &self.buffer) .await?
.last_contiguous_block_number()) } - async fn block( - &self, - ctx: &ctx::Ctx, - number: BlockNumber, - ) -> StorageResult<Option<FinalBlock>> { + async fn block(&self, ctx: &ctx::Ctx, number: BlockNumber) -> ctx::Result<Option<FinalBlock>> { let started_at = Instant::now(); { let buffer = sync::lock(ctx, &self.buffer).await?; @@ -305,7 +301,7 @@ impl BlockStore for Buffered { &self, ctx: &ctx::Ctx, range: ops::Range<BlockNumber>, - ) -> StorageResult<Vec<BlockNumber>> { + ) -> ctx::Result<Vec<BlockNumber>> { // By design, the underlying store has no missing blocks. Ok(sync::lock(ctx, &self.buffer) .await? @@ -319,16 +315,32 @@ #[async_trait] impl WriteBlockStore for Buffered { - async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> { + /// Verify that `payload` is a correct proposal for the block `block_number`. + async fn verify_payload( + &self, + _ctx: &ctx::Ctx, + _block_number: BlockNumber, + _payload: &Payload, + ) -> ctx::Result<()> { + // This is storage for non-validator nodes (aka full nodes), + // so verify_payload() won't be called. + // Still, it probably would be better to either + // * move verify_payload() to BlockStore, so that Buffered can just forward the call + // * create another separate trait for verify_payload. + // It will be clear what needs to be done when we implement multi-validator consensus for + // zksync-era. + unimplemented!() + } + + async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> ctx::Result<()> { + let buffer_block_latency = METRICS.buffer_block_latency.start(); { let mut buffer = sync::lock(ctx, &self.buffer).await?; let block_number = block.header.number; if block_number <= buffer.store_block_number { - let err = anyhow::anyhow!( + return Err(anyhow::anyhow!( "Cannot replace a block #{block_number} since it is already present in the underlying storage", - ); - return Err(StorageError::Database(err)); + ).into()); } buffer.put_block(block.clone()); } diff --git a/core/lib/zksync_core/src/sync_layer/gossip/buffered/tests.rs b/core/lib/zksync_core/src/sync_layer/gossip/buffered/tests.rs index c5fd860ab87d..acf15416ab26 100644 --- a/core/lib/zksync_core/src/sync_layer/gossip/buffered/tests.rs +++ b/core/lib/zksync_core/src/sync_layer/gossip/buffered/tests.rs @@ -14,14 +14,14 @@ use zksync_concurrency::{ time, }; use zksync_consensus_roles::validator::{BlockHeader, BlockNumber, FinalBlock, Payload}; -use zksync_consensus_storage::{BlockStore, InMemoryStorage, StorageResult, WriteBlockStore}; +use zksync_consensus_storage::{BlockStore, InMemoryStorage, WriteBlockStore}; use super::*; fn init_store(rng: &mut impl Rng) -> (FinalBlock, InMemoryStorage) { let payload = Payload(vec![]); let genesis_block = FinalBlock { - header: BlockHeader::genesis(payload.hash()), + header: BlockHeader::genesis(payload.hash(), BlockNumber(0)), payload, justification: rng.gen(), }; @@ -66,7 +66,7 @@ impl MockContiguousStore { &self, ctx: &ctx::Ctx, mut block_receiver: channel::UnboundedReceiver<FinalBlock>, - ) -> StorageResult<()> { + ) -> ctx::Result<()> { let rng = &mut ctx.rng(); while let Ok(block) = block_receiver.recv(ctx).await { let head_block_number = self.head_block(ctx).await?.header.number; @@ -82,23 +82,19 @@ #[async_trait] impl BlockStore for MockContiguousStore { - async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult<FinalBlock> { + async fn head_block(&self, ctx: &ctx::Ctx) -> ctx::Result<FinalBlock> { self.inner.head_block(ctx).await } - async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult<FinalBlock> { + async fn first_block(&self, ctx: &ctx::Ctx) -> ctx::Result<FinalBlock> {
         self.inner.first_block(ctx).await
     }
 
-    async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult<BlockNumber> {
+    async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> ctx::Result<BlockNumber> {
         self.inner.last_contiguous_block_number(ctx).await
     }
 
-    async fn block(
-        &self,
-        ctx: &ctx::Ctx,
-        number: BlockNumber,
-    ) -> StorageResult<Option<FinalBlock>> {
+    async fn block(&self, ctx: &ctx::Ctx, number: BlockNumber) -> ctx::Result<Option<FinalBlock>> {
         self.inner.block(ctx, number).await
     }
 
@@ -106,7 +102,7 @@ impl BlockStore for MockContiguousStore {
         &self,
         ctx: &ctx::Ctx,
         range: ops::Range<BlockNumber>,
-    ) -> StorageResult<Vec<BlockNumber>> {
+    ) -> ctx::Result<Vec<BlockNumber>> {
         self.inner.missing_block_numbers(ctx, range).await
     }
 
@@ -117,7 +113,7 @@ impl BlockStore for MockContiguousStore {
 
 #[async_trait]
 impl ContiguousBlockStore for MockContiguousStore {
-    async fn schedule_next_block(&self, _ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> {
+    async fn schedule_next_block(&self, _ctx: &ctx::Ctx, block: &FinalBlock) -> ctx::Result<()> {
         tracing::trace!(block_number = block.header.number.0, "Scheduled next block");
         self.block_sender.send(block.clone());
         Ok(())
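The mock store in these tests decouples scheduling from persistence through a channel: `schedule_next_block` only enqueues the block, and a background task replays it into the inner store. A condensed, tokio-based sketch of that harness pattern (names invented; the real test uses `zksync_concurrency` channels and a random delay):

```rust
use tokio::sync::mpsc;

#[derive(Debug, Clone)]
struct FinalBlock { number: u64 } // toy stand-in for the consensus type

// Background task standing in for `MockContiguousStore::run_updates`:
// asserts blocks arrive strictly sequentially, then "persists" them.
async fn run_updates(mut rx: mpsc::UnboundedReceiver<FinalBlock>, mut head: u64) {
    while let Some(block) = rx.recv().await {
        assert_eq!(block.number, head + 1, "blocks must be scheduled sequentially");
        head = block.number;
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    let updates = tokio::spawn(run_updates(rx, 0));
    for number in 1..=3 {
        tx.send(FinalBlock { number }).unwrap(); // what schedule_next_block does
    }
    drop(tx); // close the channel so the background task exits
    updates.await.unwrap();
}
```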
diff --git a/core/lib/zksync_core/src/sync_layer/gossip/conversions.rs b/core/lib/zksync_core/src/sync_layer/gossip/conversions.rs
index 44baa9e8058a..d991fbb4924b 100644
--- a/core/lib/zksync_core/src/sync_layer/gossip/conversions.rs
+++ b/core/lib/zksync_core/src/sync_layer/gossip/conversions.rs
@@ -1,42 +1,11 @@
 //! Conversion logic between server and consensus types.
-
 use anyhow::Context as _;
-use zksync_consensus_roles::validator::{BlockHeader, BlockNumber, FinalBlock};
+use zksync_consensus_roles::validator::FinalBlock;
 use zksync_dal::blocks_dal::ConsensusBlockFields;
-use zksync_types::{api::en, MiniblockNumber, ProtocolVersionId};
+use zksync_types::MiniblockNumber;
 
 use crate::{consensus, sync_layer::fetcher::FetchedBlock};
 
-pub(super) fn sync_block_to_consensus_block(block: en::SyncBlock) -> anyhow::Result<FinalBlock> {
-    let number = BlockNumber(block.number.0.into());
-    let consensus = block
-        .consensus
-        .as_ref()
-        .context("Missing consensus fields")?;
-    let consensus =
-        ConsensusBlockFields::decode(consensus).context("ConsensusBlockFields::decode()")?;
-    let consensus_protocol_version = consensus.justification.message.protocol_version.as_u32();
-    let block_protocol_version = block.protocol_version as u32;
-    anyhow::ensure!(
-        consensus_protocol_version == block_protocol_version,
-        "Protocol version for justification ({consensus_protocol_version}) differs from \
-         SyncBlock.protocol_version={block_protocol_version}"
-    );
-
-    let payload: consensus::Payload = block.try_into().context("Missing `SyncBlock` data")?;
-    let payload = payload.encode();
-    let header = BlockHeader {
-        parent: consensus.parent,
-        number,
-        payload: payload.hash(),
-    };
-    Ok(FinalBlock {
-        header,
-        payload,
-        justification: consensus.justification,
-    })
-}
-
 impl FetchedBlock {
     pub(super) fn from_gossip_block(
         block: &FinalBlock,
@@ -47,17 +16,11 @@ impl FetchedBlock {
         let payload = consensus::Payload::decode(&block.payload)
             .context("Failed deserializing block payload")?;
 
-        let protocol_version = block.justification.message.protocol_version;
-        let protocol_version =
-            u16::try_from(protocol_version.as_u32()).context("Invalid protocol version")?;
-        let protocol_version = ProtocolVersionId::try_from(protocol_version)
-            .with_context(|| format!("Unsupported protocol version: {protocol_version}"))?;
-
         Ok(Self {
             number: MiniblockNumber(number),
             l1_batch_number: payload.l1_batch_number,
            last_in_batch,
-            protocol_version,
+            protocol_version: payload.protocol_version,
             timestamp: payload.timestamp,
             reference_hash: Some(payload.hash),
             l1_gas_price: payload.l1_gas_price,
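The deleted plumbing existed only to recover the protocol version from the block justification. Once the version is carried inside the payload itself, `from_gossip_block` can read it directly after decoding, roughly like this (toy byte layout and types, not the real `consensus::Payload` encoding):

```rust
// Hypothetical simplified types for illustration only.
#[derive(Debug)]
struct Payload { protocol_version: u16, timestamp: u64 }

#[derive(Debug)]
struct FetchedBlock { protocol_version: u16, timestamp: u64 }

fn from_gossip_payload(raw: &[u8]) -> anyhow::Result<FetchedBlock> {
    let payload = decode_payload(raw)?; // stand-in for consensus::Payload::decode
    Ok(FetchedBlock {
        protocol_version: payload.protocol_version, // no justification needed
        timestamp: payload.timestamp,
    })
}

fn decode_payload(raw: &[u8]) -> anyhow::Result<Payload> {
    anyhow::ensure!(raw.len() >= 10, "payload too short");
    Ok(Payload {
        protocol_version: u16::from_be_bytes([raw[0], raw[1]]),
        timestamp: u64::from_be_bytes(raw[2..10].try_into()?),
    })
}

fn main() -> anyhow::Result<()> {
    let mut raw = vec![0, 18]; // protocol version 18
    raw.extend_from_slice(&1_700_000_000u64.to_be_bytes());
    let block = from_gossip_payload(&raw)?;
    assert_eq!(block.protocol_version, 18);
    Ok(())
}
```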
diff --git a/core/lib/zksync_core/src/sync_layer/gossip/mod.rs b/core/lib/zksync_core/src/sync_layer/gossip/mod.rs
index 9d769ab65f30..9360a169bbed 100644
--- a/core/lib/zksync_core/src/sync_layer/gossip/mod.rs
+++ b/core/lib/zksync_core/src/sync_layer/gossip/mod.rs
@@ -4,10 +4,11 @@ use std::sync::Arc;
 
 use anyhow::Context as _;
 use tokio::sync::watch;
-use zksync_concurrency::{ctx, scope};
+use zksync_concurrency::{ctx, error::Wrap as _, scope};
 use zksync_consensus_executor::{Executor, ExecutorConfig};
 use zksync_consensus_roles::node;
 use zksync_dal::ConnectionPool;
+use zksync_types::Address;
 
 use self::{buffered::Buffered, storage::PostgresBlockStorage};
 use super::{fetcher::FetcherCursor, sync_action::ActionQueueSender};
@@ -27,6 +28,7 @@ pub async fn run_gossip_fetcher(
     executor_config: ExecutorConfig,
     node_key: node::SecretKey,
     mut stop_receiver: watch::Receiver<bool>,
+    operator_address: Address,
 ) -> anyhow::Result<()> {
     scope::run!(&ctx::root(), |ctx, s| async {
         s.spawn_bg(run_gossip_fetcher_inner(
@@ -35,6 +37,7 @@ pub async fn run_gossip_fetcher(
             actions,
             executor_config,
             node_key,
+            operator_address,
         ));
         if stop_receiver.changed().await.is_err() {
             tracing::warn!(
@@ -53,6 +56,7 @@ async fn run_gossip_fetcher_inner(
     actions: ActionQueueSender,
     executor_config: ExecutorConfig,
     node_key: node::SecretKey,
+    operator_address: Address,
 ) -> anyhow::Result<()> {
     tracing::info!(
         "Starting gossip fetcher with {executor_config:?} and node key {:?}",
@@ -63,12 +67,21 @@ async fn run_gossip_fetcher_inner(
         .access_storage_tagged("sync_layer")
         .await
         .context("Failed acquiring Postgres connection for cursor")?;
-    let cursor = FetcherCursor::new(&mut storage).await?;
+    let cursor = FetcherCursor::new(&mut storage)
+        .await
+        .context("FetcherCursor::new()")?;
     drop(storage);
 
-    let store =
-        PostgresBlockStorage::new(ctx, pool, actions, cursor, &executor_config.genesis_block)
-            .await?;
+    let store = PostgresBlockStorage::new(
+        ctx,
+        pool,
+        actions,
+        cursor,
+        &executor_config.genesis_block,
+        operator_address,
+    )
+    .await
+    .wrap("PostgresBlockStorage::new()")?;
     let buffered = Arc::new(Buffered::new(store));
     let store = buffered.inner();
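`run_gossip_fetcher` wraps the fetcher in a scope and winds everything down on the stop signal. A tokio-only sketch of that shutdown shape, using `abort()` where `scope::run!` would cancel the context:

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (stop_sender, mut stop_receiver) = watch::channel(false);

    // Stand-in for run_gossip_fetcher_inner: runs until canceled.
    let fetcher = tokio::spawn(async {
        loop {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    });

    // Somewhere else, the external node requests shutdown.
    tokio::spawn(async move {
        let _ = stop_sender.send(true);
    });

    if stop_receiver.changed().await.is_err() {
        eprintln!("Stop signal sender dropped without sending a signal; shutting down anyway");
    }
    fetcher.abort(); // scope::run! would cancel background tasks here
}
```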
diff --git a/core/lib/zksync_core/src/sync_layer/gossip/storage/mod.rs b/core/lib/zksync_core/src/sync_layer/gossip/storage/mod.rs
index 1e35d17daaf0..bc5aeb946b27 100644
--- a/core/lib/zksync_core/src/sync_layer/gossip/storage/mod.rs
+++ b/core/lib/zksync_core/src/sync_layer/gossip/storage/mod.rs
@@ -6,26 +6,28 @@ use anyhow::Context as _;
 use async_trait::async_trait;
 use zksync_concurrency::{
     ctx,
+    error::Wrap as _,
     sync::{self, watch, Mutex},
     time,
 };
 use zksync_consensus_roles::validator::{BlockNumber, FinalBlock};
-use zksync_consensus_storage::{BlockStore, StorageError, StorageResult};
+use zksync_consensus_storage::BlockStore;
 use zksync_dal::{blocks_dal::ConsensusBlockFields, ConnectionPool, StorageProcessor};
 use zksync_types::{api::en::SyncBlock, Address, MiniblockNumber};
 
-use super::{buffered::ContiguousBlockStore, conversions::sync_block_to_consensus_block};
+#[cfg(test)]
+mod tests;
+
+use super::buffered::ContiguousBlockStore;
 use crate::{
     consensus,
+    consensus::sync_block_to_consensus_block,
     sync_layer::{
         fetcher::{FetchedBlock, FetcherCursor},
         sync_action::{ActionQueueSender, SyncAction},
     },
 };
 
-#[cfg(test)]
-mod tests;
-
 #[derive(Debug)]
 struct CursorWithCachedBlock {
     inner: FetcherCursor,
@@ -73,6 +75,7 @@ pub(super) struct PostgresBlockStorage {
     actions: ActionQueueSender,
     block_sender: watch::Sender<BlockNumber>,
     cursor: Mutex<CursorWithCachedBlock>,
+    operator_address: Address,
 }
 
 impl PostgresBlockStorage {
@@ -83,17 +86,16 @@ impl PostgresBlockStorage {
         actions: ActionQueueSender,
         cursor: FetcherCursor,
         genesis_block: &FinalBlock,
-    ) -> StorageResult<Self> {
-        let mut storage = ctx
-            .wait(pool.access_storage_tagged("sync_layer"))
-            .await?
-            .map_err(StorageError::Database)?;
-        Self::ensure_genesis_block(ctx, &mut storage, genesis_block).await?;
+        operator_address: Address,
+    ) -> ctx::Result<Self> {
+        let mut storage = ctx.wait(pool.access_storage_tagged("sync_layer")).await??;
+        Self::ensure_genesis_block(ctx, &mut storage, genesis_block, operator_address)
+            .await
+            .wrap("ensure_genesis_block()")?;
         drop(storage);
 
         let first_block_number = u32::try_from(genesis_block.header.number.0)
-            .context("Block number overflow for genesis block")
-            .map_err(StorageError::Database)?;
+            .context("Block number overflow for genesis block")?;
         let first_block_number = MiniblockNumber(first_block_number);
 
         Ok(Self::new_unchecked(
@@ -101,6 +103,7 @@ impl PostgresBlockStorage {
             first_block_number,
             actions,
             cursor,
+            operator_address,
         ))
     }
 
@@ -109,6 +112,7 @@ impl PostgresBlockStorage {
         first_block_number: MiniblockNumber,
         actions: ActionQueueSender,
         cursor: FetcherCursor,
+        operator_address: Address,
     ) -> Self {
         let current_block_number = cursor.next_miniblock.0.saturating_sub(1).into();
         Self {
@@ -117,6 +121,7 @@ impl PostgresBlockStorage {
             actions,
             block_sender: watch::channel(BlockNumber(current_block_number)).0,
             cursor: Mutex::new(cursor.into()),
+            operator_address,
         }
     }
 
@@ -124,31 +129,34 @@ impl PostgresBlockStorage {
         ctx: &ctx::Ctx,
         storage: &mut StorageProcessor<'_>,
         genesis_block: &FinalBlock,
-    ) -> StorageResult<()> {
+        operator_address: Address,
+    ) -> ctx::Result<()> {
         let block_number = u32::try_from(genesis_block.header.number.0)
-            .context("Block number overflow for genesis block")
-            .map_err(StorageError::Database)?;
-        let block = Self::sync_block(ctx, storage, MiniblockNumber(block_number)).await?;
+            .context("Block number overflow for genesis block")?;
+        let block = Self::sync_block(
+            ctx,
+            storage,
+            MiniblockNumber(block_number),
+            operator_address,
+        )
+        .await
+        .wrap("sync_block()")?;
         let block = block
             .with_context(|| {
                 format!("Genesis block #{block_number} (first block with consensus data) is not present in Postgres")
-            })
-            .map_err(StorageError::Database)?;
+            })?;
         let actual_consensus_fields = block.consensus.clone();
 
         // Some of the following checks are duplicated in `Executor` initialization, but it's necessary
         // to run them if the genesis consensus block is not present locally.
         let expected_payload = consensus::Payload::decode(&genesis_block.payload)
-            .context("Cannot decode genesis block payload")
-            .map_err(StorageError::Database)?;
-        let actual_payload: consensus::Payload =
-            block.try_into().map_err(StorageError::Database)?;
+            .context("Cannot decode genesis block payload")?;
+        let actual_payload: consensus::Payload = block.try_into()?;
         if actual_payload != expected_payload {
-            let err = anyhow::anyhow!(
+            return Err(anyhow::anyhow!(
                 "Genesis block payload from Postgres {actual_payload:?} does not match the configured one \
                  {expected_payload:?}"
-            );
-            return Err(StorageError::Database(err));
+            ).into());
         }
 
         let expected_consensus_fields = ConsensusBlockFields {
@@ -157,16 +165,14 @@ impl PostgresBlockStorage {
         };
         if let Some(actual_consensus_fields) = &actual_consensus_fields {
             let actual_consensus_fields = ConsensusBlockFields::decode(actual_consensus_fields)
-                .context("ConsensusBlockFields::decode()")
-                .map_err(StorageError::Database)?;
+                .context("ConsensusBlockFields::decode()")?;
             // While justifications may differ among nodes for an arbitrary block, we assume that
             // the genesis block has a hardcoded justification.
             if actual_consensus_fields != expected_consensus_fields {
-                let err = anyhow::anyhow!(
+                return Err(anyhow::anyhow!(
                     "Genesis block consensus fields in Postgres {actual_consensus_fields:?} do not match \
                      the configured ones {expected_consensus_fields:?}"
-                );
-                return Err(StorageError::Database(err));
+                ).into());
             }
         } else {
             tracing::info!(
@@ -177,8 +183,7 @@ impl PostgresBlockStorage {
                 &expected_consensus_fields,
             ))
             .await?
-            .context("Failed saving consensus fields for genesis block")
-            .map_err(StorageError::Database)?;
+            .context("Failed saving consensus fields for genesis block")?;
         }
         Ok(())
     }
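Stripped of the error plumbing, `ensure_genesis_block` boils down to a presence check plus a payload comparison. A condensed sketch with byte slices standing in for `consensus::Payload`:

```rust
// Condensed sketch of the genesis consistency check; byte payloads stand in
// for the real `consensus::Payload` type.
fn ensure_genesis_matches(
    configured_payload: &[u8],
    payload_in_postgres: Option<&[u8]>,
) -> anyhow::Result<()> {
    let actual = payload_in_postgres
        .ok_or_else(|| anyhow::anyhow!("Genesis block is not present in Postgres"))?;
    anyhow::ensure!(
        actual == configured_payload,
        "Genesis block payload from Postgres does not match the configured one"
    );
    Ok(())
}

fn main() {
    assert!(ensure_genesis_matches(b"genesis", Some(b"genesis".as_slice())).is_ok());
    assert!(ensure_genesis_matches(b"genesis", Some(b"bogus".as_slice())).is_err());
    assert!(ensure_genesis_matches(b"genesis", None).is_err());
}
```

The real method additionally compares the stored consensus fields against the hardcoded genesis justification and persists them if they are absent, as the hunk above shows.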
- .with_context(|| format!("Failed getting miniblock #{number} from Postgres")) - .map_err(StorageError::Database) + operator_address: Address, + ) -> ctx::Result> { + Ok(ctx + .wait( + storage + .sync_dal() + .sync_block(number, operator_address, true), + ) + .await? + .with_context(|| format!("Failed getting miniblock #{number} from Postgres"))?) } async fn block( ctx: &ctx::Ctx, storage: &mut StorageProcessor<'_>, number: MiniblockNumber, - ) -> StorageResult> { - let Some(block) = Self::sync_block(ctx, storage, number).await? else { + operator_address: Address, + ) -> ctx::Result> { + let Some(block) = Self::sync_block(ctx, storage, number, operator_address) + .await + .wrap("Self::sync_block()")? + else { return Ok(None); }; - let block = sync_block_to_consensus_block(block).map_err(StorageError::Database)?; + let block = + sync_block_to_consensus_block(block).context("sync_block_to_consensus_block()")?; Ok(Some(block)) } - async fn sealed_miniblock_number(&self, ctx: &ctx::Ctx) -> StorageResult { - let mut storage = self.storage(ctx).await?; + async fn sealed_miniblock_number(&self, ctx: &ctx::Ctx) -> ctx::Result { + let mut storage = self.storage(ctx).await.wrap("storage()")?; let number = ctx .wait(storage.blocks_dal().get_sealed_miniblock_number()) .await? - .context("Failed getting sealed miniblock number") - .map_err(StorageError::Database)?; + .context("Failed getting sealed miniblock number")?; Ok(BlockNumber(number.0.into())) } } #[async_trait] impl BlockStore for PostgresBlockStorage { - async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult { - let mut storage = self.storage(ctx).await?; + async fn head_block(&self, ctx: &ctx::Ctx) -> ctx::Result { + let mut storage = self.storage(ctx).await.wrap("storage()")?; let miniblock_number = ctx .wait(storage.blocks_dal().get_sealed_miniblock_number()) .await? - .context("Failed getting sealed miniblock number") - .map_err(StorageError::Database)?; + .context("Failed getting sealed miniblock number")?; // ^ The number can get stale, but it's OK for our purposes - Ok(Self::block(ctx, &mut storage, miniblock_number) - .await? - .with_context(|| format!("Miniblock #{miniblock_number} disappeared from Postgres")) - .map_err(StorageError::Database)?) + Ok( + Self::block(ctx, &mut storage, miniblock_number, self.operator_address) + .await + .wrap("Self::block()")? + .with_context(|| { + format!("Miniblock #{miniblock_number} disappeared from Postgres") + })?, + ) } - async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult { - let mut storage = self.storage(ctx).await?; - Self::block(ctx, &mut storage, self.first_block_number) - .await? - .context("Genesis miniblock not present in Postgres") - .map_err(StorageError::Database) + async fn first_block(&self, ctx: &ctx::Ctx) -> ctx::Result { + let mut storage = self.storage(ctx).await.wrap("storage()")?; + Ok(Self::block( + ctx, + &mut storage, + self.first_block_number, + self.operator_address, + ) + .await + .wrap("Self::block()")? + .context("Genesis miniblock not present in Postgres")?) 
 
 #[async_trait]
 impl BlockStore for PostgresBlockStorage {
-    async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult<FinalBlock> {
-        let mut storage = self.storage(ctx).await?;
+    async fn head_block(&self, ctx: &ctx::Ctx) -> ctx::Result<FinalBlock> {
+        let mut storage = self.storage(ctx).await.wrap("storage()")?;
         let miniblock_number = ctx
             .wait(storage.blocks_dal().get_sealed_miniblock_number())
             .await?
-            .context("Failed getting sealed miniblock number")
-            .map_err(StorageError::Database)?;
+            .context("Failed getting sealed miniblock number")?;
         // ^ The number can get stale, but it's OK for our purposes
-        Ok(Self::block(ctx, &mut storage, miniblock_number)
-            .await?
-            .with_context(|| format!("Miniblock #{miniblock_number} disappeared from Postgres"))
-            .map_err(StorageError::Database)?)
+        Ok(
+            Self::block(ctx, &mut storage, miniblock_number, self.operator_address)
+                .await
+                .wrap("Self::block()")?
+                .with_context(|| {
+                    format!("Miniblock #{miniblock_number} disappeared from Postgres")
+                })?,
+        )
     }
 
-    async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult<FinalBlock> {
-        let mut storage = self.storage(ctx).await?;
-        Self::block(ctx, &mut storage, self.first_block_number)
-            .await?
-            .context("Genesis miniblock not present in Postgres")
-            .map_err(StorageError::Database)
+    async fn first_block(&self, ctx: &ctx::Ctx) -> ctx::Result<FinalBlock> {
+        let mut storage = self.storage(ctx).await.wrap("storage()")?;
+        Ok(Self::block(
+            ctx,
+            &mut storage,
+            self.first_block_number,
+            self.operator_address,
+        )
+        .await
+        .wrap("Self::block()")?
+        .context("Genesis miniblock not present in Postgres")?)
     }
 
-    async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult<BlockNumber> {
-        self.sealed_miniblock_number(ctx).await
+    async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> ctx::Result<BlockNumber> {
+        self.sealed_miniblock_number(ctx)
+            .await
+            .wrap("sealed_miniblock_number()")
     }
 
-    async fn block(
-        &self,
-        ctx: &ctx::Ctx,
-        number: BlockNumber,
-    ) -> StorageResult<Option<FinalBlock>> {
+    async fn block(&self, ctx: &ctx::Ctx, number: BlockNumber) -> ctx::Result<Option<FinalBlock>> {
         let Ok(number) = u32::try_from(number.0) else {
             return Ok(None);
         };
@@ -294,21 +309,26 @@ impl BlockStore for PostgresBlockStorage {
         if number < self.first_block_number {
             return Ok(None);
         }
-        let mut storage = self.storage(ctx).await?;
-        Self::block(ctx, &mut storage, number).await
+        let mut storage = self.storage(ctx).await.wrap("storage()")?;
+        Self::block(ctx, &mut storage, number, self.operator_address)
+            .await
+            .wrap("Self::block()")
     }
 
     async fn missing_block_numbers(
         &self,
         ctx: &ctx::Ctx,
         range: ops::Range<BlockNumber>,
-    ) -> StorageResult<Vec<BlockNumber>> {
+    ) -> ctx::Result<Vec<BlockNumber>> {
         let mut output = vec![];
         let first_block_number = u64::from(self.first_block_number.0);
         let numbers_before_first_block = (range.start.0..first_block_number).map(BlockNumber);
         output.extend(numbers_before_first_block);
 
-        let last_block_number = self.sealed_miniblock_number(ctx).await?;
+        let last_block_number = self
+            .sealed_miniblock_number(ctx)
+            .await
+            .wrap("sealed_miniblock_number()")?;
         let numbers_after_last_block = (last_block_number.next().0..range.end.0).map(BlockNumber);
         output.extend(numbers_after_last_block);
 
@@ -323,10 +343,10 @@ impl BlockStore for PostgresBlockStorage {
 
 #[async_trait]
 impl ContiguousBlockStore for PostgresBlockStorage {
-    async fn schedule_next_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> {
+    async fn schedule_next_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> ctx::Result<()> {
         // `last_in_batch` is always set to `false` by this call; it is properly set by `CursorWithCachedBlock`.
         let fetched_block =
-            FetchedBlock::from_gossip_block(block, false).map_err(StorageError::Database)?;
+            FetchedBlock::from_gossip_block(block, false).context("from_gossip_block()")?;
         let actions = sync::lock(ctx, &self.cursor).await?.advance(fetched_block);
         for actions_chunk in actions {
             // We don't wrap this in `ctx.wait()` because `PostgresBlockStorage` will get broken
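`missing_block_numbers` exploits the fact that Postgres stores a gapless range of blocks: the only numbers missing from a queried range are those before the first stored block and after the last sealed one. The computation, extracted as a pure function over plain `u64`s (assuming, as the code above does, that the queried range does not end before the first stored block):

```rust
fn missing_block_numbers(range: std::ops::Range<u64>, first_block: u64, last_sealed: u64) -> Vec<u64> {
    // Ranges whose start is >= their end simply yield nothing, so no clamping
    // is needed when the queried range lies entirely inside the stored one.
    let before_first = range.start..first_block;
    let after_last = (last_sealed + 1)..range.end;
    before_first.chain(after_last).collect()
}

fn main() {
    // Store holds blocks #2..=#5; ask about #0..#8.
    assert_eq!(missing_block_numbers(0..8, 2, 5), vec![0, 1, 6, 7]);
    // Fully covered range: nothing is missing.
    assert!(missing_block_numbers(3..5, 2, 5).is_empty());
}
```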
diff --git a/core/lib/zksync_core/src/sync_layer/gossip/storage/tests.rs b/core/lib/zksync_core/src/sync_layer/gossip/storage/tests.rs
index c7e53f6456e1..d8b1eff0e24b 100644
--- a/core/lib/zksync_core/src/sync_layer/gossip/storage/tests.rs
+++ b/core/lib/zksync_core/src/sync_layer/gossip/storage/tests.rs
@@ -13,7 +13,7 @@ use crate::{
             add_consensus_fields, assert_first_block_actions, assert_second_block_actions,
             block_payload, create_genesis_block, load_final_block,
         },
-        tests::run_state_keeper_with_multiple_miniblocks,
+        tests::{run_state_keeper_with_multiple_miniblocks, OPERATOR_ADDRESS},
         ActionQueue,
     },
 };
@@ -36,6 +36,7 @@ async fn block_store_basics_for_postgres() {
         MiniblockNumber(0),
         actions_sender,
         cursor,
+        OPERATOR_ADDRESS,
     );
 
     let ctx = &ctx::test_root(&ctx::RealClock);
@@ -76,6 +77,7 @@ async fn subscribing_to_block_updates_for_postgres() {
         MiniblockNumber(0),
         actions_sender,
         cursor,
+        OPERATOR_ADDRESS,
     );
     let mut subscriber = storage.subscribe_to_block_writes();
@@ -128,6 +130,7 @@ async fn processing_new_blocks() {
         MiniblockNumber(0),
         actions_sender,
         cursor,
+        OPERATOR_ADDRESS,
     );
     let ctx = &ctx::test_root(&ctx::RealClock);
     let ctx = &ctx.with_timeout(TEST_TIMEOUT);
@@ -163,9 +166,16 @@ async fn ensuring_consensus_fields_for_genesis_block() {
     let genesis_block = create_genesis_block(&validator_key, 0, block_payload.clone());
 
     let (actions_sender, _) = ActionQueue::new();
-    PostgresBlockStorage::new(ctx, pool.clone(), actions_sender, cursor, &genesis_block)
-        .await
-        .unwrap();
+    PostgresBlockStorage::new(
+        ctx,
+        pool.clone(),
+        actions_sender,
+        cursor,
+        &genesis_block,
+        OPERATOR_ADDRESS,
+    )
+    .await
+    .unwrap();
 
     // Check that the consensus fields are persisted for the genesis block.
     let mut storage = pool.access_storage().await.unwrap();
@@ -182,9 +192,16 @@ async fn ensuring_consensus_fields_for_genesis_block() {
 
     // Check that the storage can be initialized again.
     let (actions_sender, _) = ActionQueue::new();
-    PostgresBlockStorage::new(ctx, pool.clone(), actions_sender, cursor, &genesis_block)
-        .await
-        .unwrap();
+    PostgresBlockStorage::new(
+        ctx,
+        pool.clone(),
+        actions_sender,
+        cursor,
+        &genesis_block,
+        OPERATOR_ADDRESS,
+    )
+    .await
+    .unwrap();
 
     // Create a genesis block with another validator.
     let validator_key = validator::SecretKey::generate(&mut ctx.rng());
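The `OPERATOR_ADDRESS` plumbing aside, the machinery these tests exercise is `CursorWithCachedBlock`: `schedule_next_block` can pass `last_in_batch: false` unconditionally because each block is held back until its successor arrives, at which point the real flag can be derived by comparing L1 batch numbers. An illustrative sketch with hypothetical field names:

```rust
#[derive(Debug, Clone)]
struct FetchedBlock { number: u64, l1_batch_number: u64, last_in_batch: bool }

#[derive(Default)]
struct Cursor { cached: Option<FetchedBlock> } // one-block look-behind cache

impl Cursor {
    fn advance(&mut self, next: FetchedBlock) -> Option<FetchedBlock> {
        let mut ready = self.cached.take();
        if let Some(block) = &mut ready {
            // The successor is here, so we finally know whether the cached
            // block closed its L1 batch.
            block.last_in_batch = block.l1_batch_number != next.l1_batch_number;
        }
        self.cached = Some(next); // held back until *its* successor arrives
        ready
    }
}

fn main() {
    let mut cursor = Cursor::default();
    let blocks = [
        FetchedBlock { number: 1, l1_batch_number: 1, last_in_batch: false },
        FetchedBlock { number: 2, l1_batch_number: 1, last_in_batch: false },
        FetchedBlock { number: 3, l1_batch_number: 2, last_in_batch: false },
    ];
    for block in blocks {
        if let Some(ready) = cursor.advance(block) {
            println!("block #{}: last_in_batch = {}", ready.number, ready.last_in_batch);
        }
    }
    // Prints: #1 -> false (batch 1 continues), #2 -> true (the batch changes to 2).
}
```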
@@ -198,6 +215,7 @@ async fn ensuring_consensus_fields_for_genesis_block() {
         actions_sender,
         other_cursor,
         &other_genesis_block,
+        OPERATOR_ADDRESS,
     )
     .await
     .unwrap_err();
@@ -222,9 +240,16 @@ async fn genesis_block_payload_mismatch() {
     let genesis_block = create_genesis_block(&validator_key, 0, bogus_block_payload);
 
     let (actions_sender, _) = ActionQueue::new();
-    PostgresBlockStorage::new(ctx, pool.clone(), actions_sender, cursor, &genesis_block)
-        .await
-        .unwrap_err();
+    PostgresBlockStorage::new(
+        ctx,
+        pool.clone(),
+        actions_sender,
+        cursor,
+        &genesis_block,
+        OPERATOR_ADDRESS,
+    )
+    .await
+    .unwrap_err();
 
     let mut bogus_block_payload = block_payload(&mut storage, 0).await;
     bogus_block_payload.timestamp += 1;
@@ -237,6 +262,7 @@ async fn genesis_block_payload_mismatch() {
         actions_sender,
         other_cursor,
         &genesis_block,
+        OPERATOR_ADDRESS,
     )
     .await
     .unwrap_err();
@@ -262,9 +288,16 @@ async fn missing_genesis_block() {
     let genesis_block = create_genesis_block(&validator_key, 2, block_payload.clone());
 
     let (actions_sender, _) = ActionQueue::new();
-    PostgresBlockStorage::new(ctx, pool, actions_sender, cursor, &genesis_block)
-        .await
-        .unwrap_err();
+    PostgresBlockStorage::new(
+        ctx,
+        pool,
+        actions_sender,
+        cursor,
+        &genesis_block,
+        OPERATOR_ADDRESS,
+    )
+    .await
+    .unwrap_err();
 }
 
 #[tokio::test]
@@ -283,9 +316,16 @@ async fn using_non_zero_genesis_block() {
     let genesis_block = create_genesis_block(&validator_key, 2, block_payload.clone());
 
     let (actions_sender, _) = ActionQueue::new();
-    let store = PostgresBlockStorage::new(ctx, pool, actions_sender, cursor, &genesis_block)
-        .await
-        .unwrap();
+    let store = PostgresBlockStorage::new(
+        ctx,
+        pool,
+        actions_sender,
+        cursor,
+        &genesis_block,
+        OPERATOR_ADDRESS,
+    )
+    .await
+    .unwrap();
 
     let head_block = store.head_block(ctx).await.unwrap();
     assert_eq!(head_block.header.number, BlockNumber(2));
diff --git a/core/lib/zksync_core/src/sync_layer/gossip/tests.rs b/core/lib/zksync_core/src/sync_layer/gossip/tests.rs
index 338fc9016f47..ad9b00e9c153 100644
--- a/core/lib/zksync_core/src/sync_layer/gossip/tests.rs
+++ b/core/lib/zksync_core/src/sync_layer/gossip/tests.rs
@@ -9,9 +9,7 @@ use zksync_consensus_executor::testonly::FullValidatorConfig;
 use zksync_consensus_roles::validator::{self, FinalBlock};
 use zksync_consensus_storage::{InMemoryStorage, WriteBlockStore};
 use zksync_dal::{blocks_dal::ConsensusBlockFields, ConnectionPool, StorageProcessor};
-use zksync_types::{
-    api::en::SyncBlock, Address, L1BatchNumber, MiniblockNumber, ProtocolVersionId, H256,
-};
+use zksync_types::{api::en::SyncBlock, L1BatchNumber, MiniblockNumber, H256};
 
 use super::*;
 use crate::{
@@ -20,7 +18,7 @@ use crate::{
         sync_action::SyncAction,
         tests::{
             mock_l1_batch_hash_computation, run_state_keeper_with_multiple_l1_batches,
-            run_state_keeper_with_multiple_miniblocks, StateKeeperHandles,
+            run_state_keeper_with_multiple_miniblocks, StateKeeperHandles, OPERATOR_ADDRESS,
         },
         ActionQueue,
     },
@@ -32,7 +30,7 @@ const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50 * CLOCK_SP
 async fn load_sync_block(storage: &mut StorageProcessor<'_>, number: u32) -> SyncBlock {
     storage
         .sync_dal()
-        .sync_block(MiniblockNumber(number), Address::default(), true)
+        .sync_block(MiniblockNumber(number), OPERATOR_ADDRESS, true)
         .await
         .unwrap()
         .unwrap_or_else(|| panic!("no sync block #{number}"))
@@ -44,13 +42,13 @@ pub(super) async fn load_final_block(
     number: u32,
 ) -> FinalBlock {
     let sync_block = load_sync_block(storage, number).await;
-    conversions::sync_block_to_consensus_block(sync_block).unwrap()
+    consensus::sync_block_to_consensus_block(sync_block).unwrap()
 }
 
 fn convert_sync_blocks(sync_blocks: Vec<SyncBlock>) -> Vec<FinalBlock> {
     sync_blocks
         .into_iter()
-        .map(|sync_block| conversions::sync_block_to_consensus_block(sync_block).unwrap())
+        .map(|sync_block| consensus::sync_block_to_consensus_block(sync_block).unwrap())
         .collect()
 }
 
@@ -62,19 +60,12 @@ pub(super) async fn block_payload(
     consensus::Payload::try_from(sync_block).unwrap()
 }
 
-fn latest_protocol_version() -> validator::ProtocolVersion {
-    (ProtocolVersionId::latest() as u32)
-        .try_into()
-        .expect("latest protocol version is invalid")
-}
-
 /// Adds consensus information for the specified range of miniblocks, starting from the genesis.
 pub(super) async fn add_consensus_fields(
     storage: &mut StorageProcessor<'_>,
     validator_key: &validator::SecretKey,
     block_numbers: ops::Range<u32>,
 ) {
-    let protocol_version = latest_protocol_version();
     let mut prev_block_hash = validator::BlockHeaderHash::from_bytes([0; 32]);
     let validator_set = validator::ValidatorSet::new([validator_key.public()]).unwrap();
     for number in block_numbers {
@@ -85,7 +76,7 @@ pub(super) async fn add_consensus_fields(
             payload: payload.hash(),
         };
         let replica_commit = validator::ReplicaCommit {
-            protocol_version,
+            protocol_version: validator::ProtocolVersion::EARLIEST,
             view: validator::ViewNumber(number.into()),
             proposal: block_header,
         };
@@ -119,7 +110,7 @@ pub(super) fn create_genesis_block(
     };
     let validator_set = validator::ValidatorSet::new([validator_key.public()]).unwrap();
     let replica_commit = validator::ReplicaCommit {
-        protocol_version: latest_protocol_version(),
+        protocol_version: validator::ProtocolVersion::EARLIEST,
         view: validator::ViewNumber(number),
         proposal: block_header,
    };
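`add_consensus_fields` must generate justifications in block-number order because each header commits to its parent's hash via `prev_block_hash`. A toy illustration of that chaining, with std's `DefaultHasher` standing in for the real cryptographic header hash:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[derive(Debug, Hash)]
struct BlockHeader { parent: u64, number: u64, payload: u64 } // toy header

fn header_hash(header: &BlockHeader) -> u64 {
    let mut hasher = DefaultHasher::new();
    header.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let mut prev_block_hash = 0; // genesis parent is a fixed all-zero hash
    for number in 0..3 {
        let header = BlockHeader { parent: prev_block_hash, number, payload: 42 };
        // Each hash feeds into the next header, so order of generation matters.
        prev_block_hash = header_hash(&header);
        println!("block #{number}: hash {prev_block_hash:x}");
    }
}
```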
@@ -188,12 +179,14 @@ async fn syncing_via_gossip_fetcher(delay_first_block: bool, delay_second_block:
     let tx_hashes = run_state_keeper_with_multiple_miniblocks(pool.clone()).await;
 
     let mut storage = pool.access_storage().await.unwrap();
-    let protocol_version = latest_protocol_version();
     let genesis_block_payload = block_payload(&mut storage, 0).await.encode();
     let ctx = &ctx::test_root(&ctx::AffineClock::new(CLOCK_SPEEDUP as f64));
     let rng = &mut ctx.rng();
-    let mut validator =
-        FullValidatorConfig::for_single_validator(rng, protocol_version, genesis_block_payload);
+    let mut validator = FullValidatorConfig::for_single_validator(
+        rng,
+        genesis_block_payload,
+        validator::BlockNumber(0),
+    );
     let validator_set = validator.node_config.validators.clone();
     let external_node = validator.connect_full_node(rng);
 
@@ -237,6 +230,7 @@ async fn syncing_via_gossip_fetcher(delay_first_block: bool, delay_second_block:
             actions_sender,
             external_node.node_config,
             external_node.node_key,
+            OPERATOR_ADDRESS,
         ));
 
         if delay_first_block {
@@ -319,12 +313,14 @@ async fn syncing_via_gossip_fetcher_with_multiple_l1_batches(initial_block_count
     let tx_hashes: Vec<_> = tx_hashes.iter().map(Vec::as_slice).collect();
 
     let mut storage = pool.access_storage().await.unwrap();
-    let protocol_version = latest_protocol_version();
     let genesis_block_payload = block_payload(&mut storage, 0).await.encode();
     let ctx = &ctx::test_root(&ctx::AffineClock::new(CLOCK_SPEEDUP as f64));
     let rng = &mut ctx.rng();
-    let mut validator =
-        FullValidatorConfig::for_single_validator(rng, protocol_version, genesis_block_payload);
+    let mut validator = FullValidatorConfig::for_single_validator(
+        rng,
+        genesis_block_payload,
+        validator::BlockNumber(0),
+    );
     let validator_set = validator.node_config.validators.clone();
     let external_node = validator.connect_full_node(rng);
 
@@ -370,6 +366,7 @@ async fn syncing_via_gossip_fetcher_with_multiple_l1_batches(initial_block_count
             actions_sender,
             external_node.node_config,
             external_node.node_key,
+            OPERATOR_ADDRESS,
         ));
 
         state_keeper
@@ -384,12 +381,12 @@ async fn syncing_via_gossip_fetcher_with_multiple_l1_batches(initial_block_count
     let mut storage = pool.access_storage().await.unwrap();
     for number in [1, 2, 3] {
         let block = load_final_block(&mut storage, number).await;
-        block.justification.verify(&validator_set, 1).unwrap();
+        block.validate(&validator_set, 1).unwrap();
     }
 }
 
 #[test_casing(2, [1, 2])]
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
 async fn syncing_from_non_zero_block(first_block_number: u32) {
     abort_on_panic();
     let pool = ConnectionPool::test_pool().await;
@@ -402,11 +399,10 @@ async fn syncing_from_non_zero_block(first_block_number: u32) {
         .encode();
     let ctx = &ctx::test_root(&ctx::AffineClock::new(CLOCK_SPEEDUP as f64));
     let rng = &mut ctx.rng();
-    let protocol_version = latest_protocol_version();
     let mut validator = FullValidatorConfig::for_single_validator(
         rng,
-        protocol_version,
         genesis_block_payload.clone(),
+        validator::BlockNumber(0),
     );
     // Override the genesis block since it has an incorrect block number.
     let genesis_block = create_genesis_block(
@@ -452,11 +448,15 @@ async fn syncing_from_non_zero_block(first_block_number: u32) {
         )
         .await?;
 
-        s.spawn_bg(validator.run(ctx));
+        s.spawn_bg(async { validator.run(ctx).await.context("validator.run()") });
+
         s.spawn_bg(async {
             for block in &delayed_blocks {
                 ctx.sleep(POLL_INTERVAL).await?;
-                validator_storage.put_block(ctx, block).await?;
+                validator_storage
+                    .put_block(ctx, block)
+                    .await
+                    .wrap("validator_storage.put_block()")?;
             }
             Ok(())
         });
@@ -465,22 +465,27 @@ async fn syncing_from_non_zero_block(first_block_number: u32) {
             // L1 batch #1 will be sealed during the state keeper operation; we need to emulate
             // computing metadata for it.
             s.spawn_bg(async {
-                mock_l1_batch_hash_computation(pool.clone(), 1).await;
+                ctx.wait(mock_l1_batch_hash_computation(pool.clone(), 1))
+                    .await?;
                 Ok(())
             });
         }
 
-        s.spawn_bg(run_gossip_fetcher_inner(
-            ctx,
-            pool.clone(),
-            actions_sender,
-            external_node.node_config,
-            external_node.node_key,
-        ));
+        s.spawn_bg(async {
+            run_gossip_fetcher_inner(
+                ctx,
+                pool.clone(),
+                actions_sender,
+                external_node.node_config,
+                external_node.node_key,
+                OPERATOR_ADDRESS,
+            )
+            .await
+            .context("run_gossip_fetcher_inner()")
+        });
 
-        state_keeper
-            .wait(|state| state.get_local_block() == MiniblockNumber(3))
-            .await;
+        ctx.wait(state_keeper.wait(|state| state.get_local_block() == MiniblockNumber(3)))
+            .await?;
         Ok(())
     })
     .await
diff --git a/core/lib/zksync_core/src/sync_layer/tests.rs b/core/lib/zksync_core/src/sync_layer/tests.rs
index 2b2f2e90658c..37d76aeb142e 100644
--- a/core/lib/zksync_core/src/sync_layer/tests.rs
+++ b/core/lib/zksync_core/src/sync_layer/tests.rs
@@ -6,19 +6,17 @@ use std::{
     time::{Duration, Instant},
 };
 
-use async_trait::async_trait;
 use tokio::{sync::watch, task::JoinHandle};
 use zksync_config::configs::chain::NetworkConfig;
-use zksync_contracts::{BaseSystemContractsHashes, SystemContractCode};
 use zksync_dal::{ConnectionPool, StorageProcessor};
 use zksync_types::{
-    api, block::MiniblockHasher, Address, L1BatchNumber, L2ChainId, MiniblockNumber,
-    ProtocolVersionId, Transaction, H256,
+    Address, L1BatchNumber, L2ChainId, MiniblockNumber, ProtocolVersionId, Transaction, H256,
 };
 
 use super::{fetcher::FetcherCursor, sync_action::SyncAction, *};
 use crate::{
     api_server::web3::tests::spawn_http_server,
+    consensus::testonly::MockMainNodeClient,
     genesis::{ensure_genesis_state, GenesisParams},
     state_keeper::{
         tests::{create_l1_batch_metadata, create_l2_transaction, TestBatchExecutorBuilder},
@@ -28,118 +26,7 @@ use crate::{
 
 const TEST_TIMEOUT: Duration = Duration::from_secs(10);
 const POLL_INTERVAL: Duration = Duration::from_millis(50);
-
-#[derive(Debug, Default)]
-struct MockMainNodeClient {
-    prev_miniblock_hash: H256,
-    l2_blocks: Vec<api::en::SyncBlock>,
-}
-
-impl MockMainNodeClient {
-    /// `miniblock_count` doesn't include a fictive miniblock. Returns hashes of generated transactions.
-    fn push_l1_batch(&mut self, miniblock_count: u32) -> Vec<H256> {
-        let l1_batch_number = self
-            .l2_blocks
-            .last()
-            .map_or(L1BatchNumber(0), |block| block.l1_batch_number + 1);
-        let number_offset = self.l2_blocks.len() as u32;
-
-        let mut tx_hashes = vec![];
-        let l2_blocks = (0..=miniblock_count).map(|number| {
-            let is_fictive = number == miniblock_count;
-            let number = number + number_offset;
-            let mut hasher = MiniblockHasher::new(
-                MiniblockNumber(number),
-                number.into(),
-                self.prev_miniblock_hash,
-            );
-
-            let transactions = if is_fictive {
-                vec![]
-            } else {
-                let transaction = create_l2_transaction(10, 100);
-                tx_hashes.push(transaction.hash());
-                hasher.push_tx_hash(transaction.hash());
-                vec![transaction.into()]
-            };
-            let miniblock_hash = hasher.finalize(if number == 0 {
-                ProtocolVersionId::Version0 // The genesis block always uses the legacy hashing mode
-            } else {
-                ProtocolVersionId::latest()
-            });
-            self.prev_miniblock_hash = miniblock_hash;
-
-            api::en::SyncBlock {
-                number: MiniblockNumber(number),
-                l1_batch_number,
-                last_in_batch: is_fictive,
-                timestamp: number.into(),
-                l1_gas_price: 2,
-                l2_fair_gas_price: 3,
-                base_system_contracts_hashes: BaseSystemContractsHashes::default(),
-                operator_address: Address::repeat_byte(2),
-                transactions: Some(transactions),
-                virtual_blocks: Some(!is_fictive as u32),
-                hash: Some(miniblock_hash),
-                protocol_version: ProtocolVersionId::latest(),
-                consensus: None,
-            }
-        });
-
-        self.l2_blocks.extend(l2_blocks);
-        tx_hashes
-    }
-}
-
-#[async_trait]
-impl MainNodeClient for MockMainNodeClient {
-    async fn fetch_system_contract_by_hash(
-        &self,
-        _hash: H256,
-    ) -> anyhow::Result<SystemContractCode> {
-        anyhow::bail!("Not implemented");
-    }
-
-    async fn fetch_genesis_contract_bytecode(
-        &self,
-        _address: Address,
-    ) -> anyhow::Result<Option<Vec<u8>>> {
-        anyhow::bail!("Not implemented");
-    }
-
-    async fn fetch_protocol_version(
-        &self,
-        _protocol_version: ProtocolVersionId,
-    ) -> anyhow::Result<api::ProtocolVersion> {
-        anyhow::bail!("Not implemented");
-    }
-
-    async fn fetch_genesis_l1_batch_hash(&self) -> anyhow::Result<H256> {
-        anyhow::bail!("Not implemented");
-    }
-
-    async fn fetch_l2_block_number(&self) -> anyhow::Result<MiniblockNumber> {
-        if let Some(number) = self.l2_blocks.len().checked_sub(1) {
-            Ok(MiniblockNumber(number as u32))
-        } else {
-            anyhow::bail!("Not implemented");
-        }
-    }
-
-    async fn fetch_l2_block(
-        &self,
-        number: MiniblockNumber,
-        with_transactions: bool,
-    ) -> anyhow::Result<Option<api::en::SyncBlock>> {
-        let Some(mut block) = self.l2_blocks.get(number.0 as usize).cloned() else {
-            return Ok(None);
-        };
-        if !with_transactions {
-            block.transactions = None;
-        }
-        Ok(Some(block))
-    }
-}
+pub const OPERATOR_ADDRESS: Address = Address::repeat_byte(1);
 
 fn open_l1_batch(number: u32, timestamp: u64, first_miniblock_number: u32) -> SyncAction {
     SyncAction::OpenBatch {
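The relocated `MockMainNodeClient::push_l1_batch` builds batches in the shape the state keeper expects: `miniblock_count` transactional miniblocks followed by one fictive, empty miniblock that seals the L1 batch. A stripped-down sketch of that shape (hypothetical `SyncBlock` stand-in, hashing omitted):

```rust
#[derive(Debug)]
struct SyncBlock { number: u32, tx_count: usize, last_in_batch: bool }

fn push_l1_batch(number_offset: u32, miniblock_count: u32) -> Vec<SyncBlock> {
    (0..=miniblock_count) // note `..=`: one extra miniblock beyond the count
        .map(|number| {
            let is_fictive = number == miniblock_count; // the sealing miniblock
            SyncBlock {
                number: number_offset + number,
                tx_count: if is_fictive { 0 } else { 1 },
                last_in_batch: is_fictive,
            }
        })
        .collect()
}

fn main() {
    let batch = push_l1_batch(0, 2);
    assert_eq!(batch.len(), 3); // 2 real miniblocks + 1 fictive one
    assert!(batch.last().unwrap().last_in_batch);
}
```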
@@ -147,7 +34,7 @@ fn open_l1_batch(number: u32, timestamp: u64, first_miniblock_number: u32) -> Sy
         timestamp,
         l1_gas_price: 2,
         l2_fair_gas_price: 3,
-        operator_address: Default::default(),
+        operator_address: OPERATOR_ADDRESS,
         protocol_version: ProtocolVersionId::latest(),
         first_miniblock_info: (MiniblockNumber(first_miniblock_number), 1),
     }
@@ -178,7 +65,7 @@ impl StateKeeperHandles {
             actions,
             sync_state.clone(),
             Box::<TestBatchExecutorBuilder>::default(),
-            Address::repeat_byte(1),
+            OPERATOR_ADDRESS,
             u32::MAX,
             L2ChainId::default(),
         )
@@ -353,7 +240,7 @@ async fn external_io_with_multiple_miniblocks() {
 
         let sync_block = storage
             .sync_dal()
-            .sync_block(MiniblockNumber(number), Address::repeat_byte(1), true)
+            .sync_block(MiniblockNumber(number), OPERATOR_ADDRESS, true)
             .await
             .unwrap()
            .unwrap_or_else(|| panic!("Sync block #{} is not persisted", number));