diff --git a/Cargo.lock b/Cargo.lock index 0a503854540b..4ef3ecaa2550 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4224,7 +4224,7 @@ version = "1.1.0" [[package]] name = "exex-wvm-bigquery" version = "1.0.0" -source = "git+https://github.com/weaveVM/exex-templates?b?branch=main#aa1102e1caccb7274f1b183ba339d2fe55eaf703" +source = "git+https://github.com/weaveVM/exex-templates?e?branch=main#b5108ed904a84e04e1099c68d89327b4cbdab1e1" dependencies = [ "alloy-primitives", "chrono", @@ -4241,7 +4241,7 @@ dependencies = [ [[package]] name = "exex-wvm-da" version = "1.0.0" -source = "git+https://github.com/weaveVM/exex-templates?a?branch=main#aa1102e1caccb7274f1b183ba339d2fe55eaf703" +source = "git+https://github.com/weaveVM/exex-templates?a?branch=main#b5108ed904a84e04e1099c68d89327b4cbdab1e1" dependencies = [ "async-trait", "borsh 1.5.1", @@ -11609,8 +11609,11 @@ dependencies = [ "revm", "revm-inspectors", "revm-primitives", + "serde", + "serde_json", "tokio", "tracing", + "wvm-static", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index efb505259864..d3e2e5336dd8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -155,7 +155,7 @@ members = [ "wvm-apps/wvm-exexed/crates/lambda/", "wvm-apps/wvm-exexed/crates/precompiles/", "wvm-apps/wvm-exexed/crates/wvm-borsh/", - "wvm-apps/wvm-exexed/crates/static/", + "crates/wvm-static/", "wvm-apps/wvm-exexed/crates/brotli/", "wvm-apps/wvm-exexed/crates/tx/", ] @@ -396,7 +396,7 @@ reth-rpc-api = { path = "crates/rpc/rpc-api" } reth-rpc-api-testing-util = { path = "crates/rpc/rpc-testing-util" } reth-rpc-builder = { path = "crates/rpc/rpc-builder" } reth-rpc-engine-api = { path = "crates/rpc/rpc-engine-api" } -reth-rpc-eth-api = { path = "crates/rpc/rpc-eth-api" } +reth-rpc-eth-api = { path = "crates/rpc/rpc-eth-api", features = ["client"] } reth-rpc-eth-types = { path = "crates/rpc/rpc-eth-types", default-features = false } reth-rpc-layer = { path = "crates/rpc/rpc-layer" } reth-rpc-server-types = { path = "crates/rpc/rpc-server-types" } @@ -418,6 +418,9 @@ reth-trie-common = { path = "crates/trie/common" } reth-trie-db = { path = "crates/trie/db" } reth-trie-parallel = { path = "crates/trie/parallel" } +# wvm +wvm-static = { path = "crates/wvm-static" } + # revm revm = { git = "https://github.com/weaveVM/wvm-revm", branch = "main", features = [ "std", @@ -608,7 +611,7 @@ brotlic = "0.8.2" # exex-templates exex-wvm-da = { git = "https://github.com/weaveVM/exex-templates?a", branch = "main" } -exex-wvm-bigquery = { git = "https://github.com/weaveVM/exex-templates?b", branch = "main" } +exex-wvm-bigquery = { git = "https://github.com/weaveVM/exex-templates?e", branch = "main" } # fees fees = { git = "https://github.com/weaveVM/miscalleneous?c", branch = "main" } diff --git a/crates/rpc/rpc-eth-api/Cargo.toml b/crates/rpc/rpc-eth-api/Cargo.toml index e59ee39a694b..52c555f4d125 100644 --- a/crates/rpc/rpc-eth-api/Cargo.toml +++ b/crates/rpc/rpc-eth-api/Cargo.toml @@ -56,6 +56,11 @@ auto_impl.workspace = true dyn-clone.workspace = true tracing.workspace = true +# wvm +wvm-static.workspace = true +serde.workspace = true +serde_json.workspace = true + [features] js-tracer = ["revm-inspectors/js-tracer", "reth-rpc-eth-types/js-tracer"] client = ["jsonrpsee/client", "jsonrpsee/async-client"] diff --git a/crates/rpc/rpc-eth-api/src/core.rs b/crates/rpc/rpc-eth-api/src/core.rs index 20edf96d810d..56edbea3d507 100644 --- a/crates/rpc/rpc-eth-api/src/core.rs +++ b/crates/rpc/rpc-eth-api/src/core.rs @@ -16,7 +16,7 @@ use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use reth_primitives::{BlockId, BlockNumberOrTag}; use reth_rpc_server_types::{result::internal_rpc_err, ToRpcResult}; use tracing::trace; - +use reth_rpc_eth_types::wvm::WvmTransactionRequest; use crate::{ helpers::{EthApiSpec, EthBlocks, EthCall, EthFees, EthState, EthTransactions, FullEthApi}, RpcBlock, RpcReceipt, RpcTransaction, @@ -332,6 +332,11 @@ pub trait EthApi { #[method(name = "sendTransaction")] async fn send_transaction(&self, request: TransactionRequest) -> RpcResult; + /// Sends WVM transaction; will block waiting for signer to return the + /// transaction hash. + #[method(name = "sendWvmTransaction")] + async fn send_wvm_transaction(&self, request: WvmTransactionRequest) -> RpcResult; + /// Sends signed transaction, returning its hash. #[method(name = "sendRawTransaction")] async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult; @@ -767,6 +772,12 @@ where Ok(EthTransactions::send_transaction(self, request).await?) } + /// Handler for: `eth_sendWvmTransaction` + async fn send_wvm_transaction(&self, request: WvmTransactionRequest) -> RpcResult { + trace!(target: "rpc::eth", ?request, "Serving eth_sendWvmTransaction"); + Ok(EthTransactions::send_wvm_transaction(self, request).await?) + } + /// Handler for: `eth_sendRawTransaction` async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult { trace!(target: "rpc::eth", ?tx, "Serving eth_sendRawTransaction"); diff --git a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs index 94dd04414874..69b86e92ada0 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs @@ -1,6 +1,7 @@ //! Database access for `eth_` transaction RPC methods. Loads transaction and receipt data w.r.t. //! network. +use std::time::{SystemTime, UNIX_EPOCH}; use alloy_dyn_abi::TypedData; use alloy_eips::eip2718::Encodable2718; use alloy_network::TransactionBuilder; @@ -16,9 +17,11 @@ use reth_rpc_eth_types::{ utils::{binary_search, recover_raw_transaction}, EthApiError, EthStateCache, SignError, TransactionSource, }; +use reth_rpc_eth_types::wvm::WvmTransactionRequest; use reth_rpc_types_compat::transaction::{from_recovered, from_recovered_with_block_context}; use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool}; - +use serde::Serialize; +use wvm_static::PRECOMPILE_WVM_BIGQUERY_CLIENT; use crate::{FromEthApiError, FullEthApiTypes, IntoEthApiError, RpcReceipt, RpcTransaction}; use super::{ @@ -336,6 +339,45 @@ pub trait EthTransactions: LoadTransaction { } } + /// WVM Exclusive + /// Sends a transaction to the blockchain (raw) + /// And saves the transaction with tags in GBQ. + /// Tags will be then be used for easier indexing of the chain transaction itself + fn send_wvm_transaction(&self, mut request: WvmTransactionRequest) -> impl Future> + Send + where + Self: EthApiSpec + LoadBlock + LoadPendingBlock + Call, { + + // WVM + let bq_client = (&*PRECOMPILE_WVM_BIGQUERY_CLIENT).clone(); + + async move { + let tags = request.tags; + let hash = self.send_raw_transaction(request.tx).await?; + let created_at = { + let now = SystemTime::now(); + let since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards"); + since_epoch.as_millis() + }; + + #[derive(Serialize)] + struct TagsTbl { + hash: String, + tags: String, + created_at: u128 + } + + bq_client.insert_generic("tags", None, TagsTbl { + hash: hash.to_string(), + tags: serde_json::to_string(&tags).unwrap(), + created_at + }).await + .map_err(|_| EthApiError::InternalEthError)?; + + Ok(hash) + } + + } + /// Signs transaction with a matching signer, if any and submits the transaction to the pool. /// Returns the hash of the signed transaction. fn send_transaction( diff --git a/crates/rpc/rpc-eth-types/src/lib.rs b/crates/rpc/rpc-eth-types/src/lib.rs index fa36dae4c881..f63ffd41afb1 100644 --- a/crates/rpc/rpc-eth-types/src/lib.rs +++ b/crates/rpc/rpc-eth-types/src/lib.rs @@ -21,6 +21,7 @@ pub mod revm_utils; pub mod simulate; pub mod transaction; pub mod utils; +pub mod wvm; pub use builder::{ config::{EthConfig, EthFilterConfig}, diff --git a/crates/rpc/rpc-eth-types/src/wvm.rs b/crates/rpc/rpc-eth-types/src/wvm.rs new file mode 100644 index 000000000000..e2ead428f327 --- /dev/null +++ b/crates/rpc/rpc-eth-types/src/wvm.rs @@ -0,0 +1,13 @@ +use alloy_primitives::Bytes; +use alloy_rpc_types_eth::TransactionRequest; +use jsonrpsee_core::Serialize; +use serde::Deserialize; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WvmTransactionRequest { + pub tx: Bytes, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub tags: Option> +} + diff --git a/wvm-apps/wvm-exexed/crates/static/Cargo.toml b/crates/wvm-static/Cargo.toml similarity index 79% rename from wvm-apps/wvm-exexed/crates/static/Cargo.toml rename to crates/wvm-static/Cargo.toml index 5a9016ce982c..2a356e6644cd 100644 --- a/wvm-apps/wvm-exexed/crates/static/Cargo.toml +++ b/crates/wvm-static/Cargo.toml @@ -12,5 +12,5 @@ exex-wvm-bigquery.workspace = true tracing.workspace = true serde_json.workspace = true tokio.workspace = true -tokio-util = { version = "0.7.12", features = ["full"] } -once_cell = { version = "1.20.2" } \ No newline at end of file +tokio-util = { workspace = true, features = ["full"] } +once_cell = { workspace = true } \ No newline at end of file diff --git a/wvm-apps/wvm-exexed/crates/static/src/lib.rs b/crates/wvm-static/src/lib.rs similarity index 100% rename from wvm-apps/wvm-exexed/crates/static/src/lib.rs rename to crates/wvm-static/src/lib.rs diff --git a/wvm-apps/wvm-exexed/Cargo.toml b/wvm-apps/wvm-exexed/Cargo.toml index c519d85422f9..21c05eaa145d 100644 --- a/wvm-apps/wvm-exexed/Cargo.toml +++ b/wvm-apps/wvm-exexed/Cargo.toml @@ -38,7 +38,7 @@ lambda = { path = "crates/lambda" } arweave-upload = { path = "crates/arweave-upload" } precompiles = { path = "crates/precompiles" } wvm-borsh = { path = "crates/wvm-borsh" } -wvm-static = { path = "crates/static" } +wvm-static.workspace = true exex-etl = { path = "crates/exex-etl" } rbrotli = { path = "crates/brotli" } wvm-tx = { path = "crates/tx" } diff --git a/wvm-apps/wvm-exexed/crates/precompiles/Cargo.toml b/wvm-apps/wvm-exexed/crates/precompiles/Cargo.toml index 8510abdceab6..8d7f97f128ca 100644 --- a/wvm-apps/wvm-exexed/crates/precompiles/Cargo.toml +++ b/wvm-apps/wvm-exexed/crates/precompiles/Cargo.toml @@ -26,7 +26,7 @@ tokio.workspace = true reqwest-graphql = "1.0.0" rbrotli = { path = "../brotli" } wvm-borsh = { path = "../wvm-borsh", name = "wvm-borsh" } -wvm-static = { path = "../static" } +wvm-static = { path = "../../../../crates/wvm-static" } borsh.workspace = true alloy-primitives.workspace = true reth-evm-ethereum.workspace = true diff --git a/wvm-apps/wvm-exexed/crates/reth-exexed/Cargo.toml b/wvm-apps/wvm-exexed/crates/reth-exexed/Cargo.toml index 825be36c99fa..5970a31d368f 100644 --- a/wvm-apps/wvm-exexed/crates/reth-exexed/Cargo.toml +++ b/wvm-apps/wvm-exexed/crates/reth-exexed/Cargo.toml @@ -22,7 +22,7 @@ lambda = { path = "../lambda" } arweave-upload = { path = "../arweave-upload" } precompiles = { path = "../precompiles" } rbrotli = { path = "../brotli" } -wvm-static = { path = "../static" } +wvm-static = { path = "../../../../crates/wvm-static" } tokio.workspace = true exex-wvm-da.workspace = true exex-wvm-bigquery.workspace = true diff --git a/wvm-apps/wvm-exexed/crates/reth-exexed/src/exex/ar_process.rs b/wvm-apps/wvm-exexed/crates/reth-exexed/src/exex/ar_process.rs new file mode 100644 index 000000000000..639644338e5d --- /dev/null +++ b/wvm-apps/wvm-exexed/crates/reth-exexed/src/exex/ar_process.rs @@ -0,0 +1,177 @@ +use crate::network_tag::get_network_tag; +use crate::new_etl_exex_biguery_client; +use crate::util::check_block_existence; +use arweave_upload::{ArweaveRequest, UploaderProvider}; +use exex_wvm_bigquery::repository::StateRepository; +use exex_wvm_bigquery::BigQueryClient; +use exex_wvm_da::{DefaultWvmDataSettler, WvmDataSettler}; +use reth::primitives::revm_primitives::alloy_primitives::private::serde::Serialize; +use reth::primitives::revm_primitives::alloy_primitives::BlockNumber; +use reth_primitives::SealedBlockWithSenders; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::sync::mpsc::Sender; +use tokio::task::JoinHandle; +use tracing::{error, info}; +use wvm_borsh::block::BorshSealedBlockWithSenders; +use wvm_static::SUPERVISOR_RT; + +pub struct ArProcess { + buffer_size: usize, + pub sender: Sender<(SealedBlockWithSenders, String)>, + thread: JoinHandle<()>, +} + +impl ArProcess { + pub fn new(buffer_size: usize) -> Self { + let (sender, mut receiver) = + tokio::sync::mpsc::channel::<(SealedBlockWithSenders, String)>(buffer_size); + + let thread = SUPERVISOR_RT.spawn(async move { + let big_query_client = new_etl_exex_biguery_client().await; + let big_query_client = Arc::new(big_query_client); + let state_repo = Arc::new(StateRepository::new(big_query_client.clone())); + + while let Some(msg) = receiver.recv().await { + let state_repo = state_repo.clone(); + let big_query_client = big_query_client.clone(); + let ar_uploader_provider = UploaderProvider::new(None); + tokio::spawn(async move { + let sealed_block = msg.0; + let notification_type = msg.1.as_str(); + + let block_number = sealed_block.block.header.header().number; + let block_hash = sealed_block.block.hash().to_string(); + + let sealed_block_clone = sealed_block.clone(); + + let borsh_brotli = match Self::serialize_block(sealed_block_clone) { + Some(value) => value, + None => return, + }; + + let does_block_exist = check_block_existence(block_hash.as_str(), false).await; + + if !does_block_exist { + let provider = ar_uploader_provider.clone(); + let arweave_id = match Self::send_block_to_arweave(&provider, notification_type, block_number, &block_hash, borsh_brotli).await { + Some(ar_id) => ar_id, + None => return, + }; + + info!(target: "wvm::exex", "irys id: {}, for block: {}", arweave_id, block_number); + + let _ = Self::bigquery_tags(big_query_client.clone(), &sealed_block); + let _ = Self::bigquery_task(state_repo.clone(), sealed_block, block_number, arweave_id); + } + }); + } + }); + + Self { sender, buffer_size, thread } + } + + fn bigquery_tags(client: Arc, sealed_block: &SealedBlockWithSenders) { + let hashes: Vec = + sealed_block.body.transactions.iter().map(|e| e.hash.to_string()).collect(); + let block_number = sealed_block.block.header.header().number; + let confirmed_tags_tbl_name = + format!("`{}.{}.{}`", client.project_id, client.dataset_id, "confirmed_tags"); + let tags_tbl_name = format!("`{}.{}.{}`", client.project_id, client.dataset_id, "tags"); + + if hashes.is_empty() { + return; + } + + let query = { + // Generate the WHERE clause + let in_clause = hashes + .into_iter() + .map(|hash| format!("\"{}\"", hash)) + .collect::>() + .join(", "); + + let ms = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis(); + + // Generate the final query + format!( + "INSERT INTO {} (tx_hash, tags, block_id, `timestamp`) SELECT t.hash, t.tags, {}, {} FROM {} t WHERE t.hash IN({}) AND t.created_at <= {}", + confirmed_tags_tbl_name, block_number, ms, tags_tbl_name, in_clause, ms + ) + }; + + tokio::spawn(async move { + let run_q = client.bq_query(query.clone()).await; + if let Err(e) = run_q { + error!(target: "wvm::exex", %e, "Failed to write to bigquery, block {}. Query: {}", block_number, query); + } else { + info!(target: "wvm::exex", "Tags at block {} updated successfully", block_number); + } + }); + } + + fn bigquery_task( + state_repo: Arc, + sealed_block: SealedBlockWithSenders, + block_number: BlockNumber, + arweave_id: String, + ) -> Result, ()> { + Ok(tokio::spawn(async move { + if let Err(err) = exex_wvm_bigquery::save_block( + state_repo, + &sealed_block, + block_number, + arweave_id.clone(), + ) + .await + { + error!(target: "wvm::exex", %err, "Failed to write to bigquery, block {}", block_number); + }; + })) + } + + async fn send_block_to_arweave( + ar_uploader_provider: &UploaderProvider, + notification_type: &str, + block_number: BlockNumber, + block_hash: &String, + brotli_borsh: Vec, + ) -> Option { + let res = ArweaveRequest::new() + .set_tag("Content-Type", "application/octet-stream") + .set_tag("WeaveVM:Encoding", "Borsh-Brotli") + .set_tag("Block-Number", &block_hash) + .set_tag("Block-Hash", &block_hash) + .set_tag("Client-Version", reth_primitives::constants::RETH_CLIENT_VERSION) + .set_tag("Network", get_network_tag().as_str()) + .set_tag("WeaveVM:Internal-Chain", notification_type) + .set_data(brotli_borsh) + .send_with_provider(&ar_uploader_provider) + .await; + + let arweave_id = match res { + Ok(arweave_id) => arweave_id, + Err(err) => { + error!(target: "wvm::exex", %err, "Failed to construct arweave_id for block {}", block_number); + return None; + } + }; + + Some(arweave_id) + } + + fn serialize_block(msg: SealedBlockWithSenders) -> Option> { + let data_settler = DefaultWvmDataSettler; + let block_number = msg.block.header.header().number; + + let borsh_sealed_block = BorshSealedBlockWithSenders(msg); + let brotli_borsh = match data_settler.process_block(&borsh_sealed_block) { + Ok(data) => data, + Err(err) => { + error!(target: "wvm::exex", %err, "Failed to do brotli encoding for block {}", block_number); + return None; + } + }; + Some(brotli_borsh) + } +} diff --git a/wvm-apps/wvm-exexed/crates/reth-exexed/src/exex/mod.rs b/wvm-apps/wvm-exexed/crates/reth-exexed/src/exex/mod.rs new file mode 100644 index 000000000000..70438ed7ccf7 --- /dev/null +++ b/wvm-apps/wvm-exexed/crates/reth-exexed/src/exex/mod.rs @@ -0,0 +1 @@ +pub mod ar_process; diff --git a/wvm-apps/wvm-exexed/crates/reth-exexed/src/main.rs b/wvm-apps/wvm-exexed/crates/reth-exexed/src/main.rs index 5f7da1c6d309..0aa5b1196359 100644 --- a/wvm-apps/wvm-exexed/crates/reth-exexed/src/main.rs +++ b/wvm-apps/wvm-exexed/crates/reth-exexed/src/main.rs @@ -3,6 +3,7 @@ #![doc(issue_tracker_base_url = "https://github.com/weaveVM/wvm-reth/issues/")] mod constant; +mod exex; mod network_tag; mod util; @@ -23,9 +24,7 @@ use wvm_borsh::block::BorshSealedBlockWithSenders; async fn exex_etl_processor( mut ctx: ExExContext, - state_repository: StateRepository, - irys_provider: UploaderProvider, - _state_processor: exex_etl::state_processor::StateProcessor, + ar_process: Arc, ) -> eyre::Result<()> { while let Some(notification_result) = ctx.notifications.next().await { let notification = match notification_result { @@ -70,59 +69,13 @@ async fn exex_etl_processor( } if let Some(committed_chain) = notification.committed_chain() { - let data_settler = DefaultWvmDataSettler; - let sealed_block_with_senders = committed_chain.tip(); - let borsh_sealed_block = BorshSealedBlockWithSenders(sealed_block_with_senders.clone()); - let brotli_borsh = match data_settler.process_block(&borsh_sealed_block) { - Ok(data) => data, - Err(err) => { - error!(target: "wvm::exex", %err, "Failed to do brotli encoding for block {}", sealed_block_with_senders.block.header.header().number); - continue; - } - }; - - let blk_str_hash = sealed_block_with_senders.block.hash().to_string(); - let block_hash = blk_str_hash.as_str(); - let does_block_exist = check_block_existence(block_hash, false).await; - - if !does_block_exist { - let res = ArweaveRequest::new() - .set_tag("Content-Type", "application/octet-stream") - .set_tag("WeaveVM:Encoding", "Borsh-Brotli") - .set_tag( - "Block-Number", - sealed_block_with_senders.block.header.header().number.to_string().as_str(), - ) - .set_tag("Block-Hash", block_hash) - .set_tag("Client-Version", reth_primitives::constants::RETH_CLIENT_VERSION) - .set_tag("Network", get_network_tag().as_str()) - .set_tag("WeaveVM:Internal-Chain", notification_type) - .set_data(brotli_borsh) - .send_with_provider(&irys_provider) - .await; - - let arweave_id = match res { - Ok(arweave_id) => arweave_id, - Err(err) => { - error!(target: "wvm::exex", %err, "Failed to construct arweave_id for block {}", sealed_block_with_senders.block.header.header().number); - continue; - } - }; - - info!(target: "wvm::exex", "irys id: {}, for block: {}", arweave_id, sealed_block_with_senders.block.header.header().number); - - if let Err(err) = exex_wvm_bigquery::save_block( - &state_repository, - &sealed_block_with_senders, - committed_chain.tip().block.header.header().number, - arweave_id.clone(), - ) - .await - { - error!(target: "wvm::exex", %err, "Failed to write to bigquery, block {}", sealed_block_with_senders.block.header.header().number); - continue; - }; - } + let sealed_block_with_senders = committed_chain.tip().clone(); + let _ = ar_process + .sender + .send((sealed_block_with_senders, notification_type.to_string())) + .await; + info!(target: "wvm::exex", "Exex block has been streamed"); + // Handle recovery if `receiver` is dropped } } @@ -133,6 +86,7 @@ async fn exex_etl_processor( fn main() -> eyre::Result<()> { let _rt = &*SUPERVISOR_RT; let _bc = &*PRECOMPILE_WVM_BIGQUERY_CLIENT; + let ar_process = Arc::new(ArProcess::new(10)); reth::cli::Cli::parse_args().run(|builder, _| async move { // Initializations @@ -158,17 +112,9 @@ fn main() -> eyre::Result<()> { let run_exex = (std::env::var("RUN_EXEX").unwrap_or(String::from("false"))).to_lowercase(); if run_exex == "true" { - let big_query_client = new_etl_exex_biguery_client().await; - let state_repo = StateRepository::new(Arc::new(big_query_client)); - - // init state processor - let state_processor = exex_etl::state_processor::StateProcessor::new(); - // init irys provider - let ar_uploader_provider = UploaderProvider::new(None); - handle = handle .install_exex("exex-etl", |ctx| async move { - Ok(exex_etl_processor(ctx, state_repo, ar_uploader_provider, state_processor)) + Ok(exex_etl_processor(ctx, ar_process.clone())) }) .install_exex("exex-lambda", |ctx| async move { Ok(exex_lambda_processor(ctx)) }); } @@ -191,6 +137,7 @@ fn parse_prune_config(prune_conf: &str) -> u64 { SLOT_DURATION.as_secs() * secs } +use crate::exex::ar_process::ArProcess; use exex_wvm_bigquery::{BigQueryClient, BigQueryConfig}; use wvm_static::{PRECOMPILE_WVM_BIGQUERY_CLIENT, SUPERVISOR_RT};