From efd339cb89dbd82bb42689c62fadb8ab2f3c8f82 Mon Sep 17 00:00:00 2001 From: Pedro Nauck Date: Mon, 17 Feb 2025 13:48:55 -0300 Subject: [PATCH] feat(repo): Add pointers inside the StreamResponse (#404) feat(repo): add record pointer on stream response --- Cargo.lock | 4 - crates/core/Cargo.toml | 1 - crates/core/src/server/responses.rs | 105 +++++- crates/core/src/stream/error.rs | 8 +- crates/core/src/stream/fuel_streams.rs | 35 +- crates/core/src/stream/stream_impl.rs | 80 ++++- crates/domains/Cargo.toml | 1 - crates/domains/src/blocks/db_item.rs | 27 +- crates/domains/src/blocks/packets.rs | 1 - crates/domains/src/blocks/record_impl.rs | 5 +- crates/domains/src/inputs/db_item.rs | 33 +- crates/domains/src/inputs/mod.rs | 1 + crates/domains/src/inputs/packets.rs | 90 ++--- crates/domains/src/inputs/record_impl.rs | 5 +- crates/domains/src/outputs/db_item.rs | 35 +- crates/domains/src/outputs/mod.rs | 1 + crates/domains/src/outputs/packets.rs | 123 ++++--- crates/domains/src/outputs/record_impl.rs | 5 +- crates/domains/src/receipts/db_item.rs | 43 ++- crates/domains/src/receipts/mod.rs | 1 + crates/domains/src/receipts/packets.rs | 307 +++++++++--------- crates/domains/src/receipts/record_impl.rs | 5 +- crates/domains/src/subjects.rs | 14 - crates/domains/src/transactions/db_item.rs | 27 +- crates/domains/src/transactions/packets.rs | 1 - .../domains/src/transactions/record_impl.rs | 5 +- crates/domains/src/utxos/db_item.rs | 27 +- crates/domains/src/utxos/mod.rs | 1 + crates/domains/src/utxos/packets.rs | 172 +++++----- crates/domains/src/utxos/record_impl.rs | 5 +- crates/message-broker/src/msg_broker.rs | 4 +- crates/message-broker/src/nats.rs | 5 +- crates/store/Cargo.toml | 2 - crates/store/src/db/db_item.rs | 3 +- crates/store/src/record/record_impl.rs | 12 +- crates/store/src/record/record_packet.rs | 75 ++--- crates/store/src/store/store_impl.rs | 55 +--- services/consumer/src/errors.rs | 7 +- .../consumer/src/executor/block_executor.rs | 71 ++-- services/consumer/src/executor/mod.rs | 2 - .../consumer/src/executor/process_store.rs | 94 ------ .../consumer/src/executor/process_stream.rs | 48 --- services/consumer/src/fuel_stores.rs | 64 +++- .../src/server/websocket/subscribe.rs | 22 +- tests/src/lib.rs | 36 +- tests/tests/services/consumer.rs | 37 ++- tests/tests/store/blocks.rs | 8 +- tests/tests/store/inputs.rs | 62 ++-- tests/tests/store/outputs.rs | 90 ++--- tests/tests/store/pattern_matching.rs | 2 +- tests/tests/store/receipts.rs | 162 ++------- tests/tests/store/record.rs | 14 +- tests/tests/store/transactions.rs | 23 +- tests/tests/store/utxo.rs | 97 ++---- tests/tests/stream/live_data.rs | 20 +- 55 files changed, 1109 insertions(+), 1074 deletions(-) delete mode 100644 services/consumer/src/executor/process_store.rs delete mode 100644 services/consumer/src/executor/process_stream.rs diff --git a/Cargo.lock b/Cargo.lock index 71ef6fa5..abd88138 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3805,7 +3805,6 @@ dependencies = [ "anyhow", "async-nats", "async-stream", - "bytes", "dotenvy", "fuel-core", "fuel-message-broker", @@ -3846,7 +3845,6 @@ dependencies = [ "test-case", "thiserror 2.0.11", "tokio", - "tracing", ] [[package]] @@ -3863,13 +3861,11 @@ dependencies = [ name = "fuel-streams-store" version = "0.0.25" dependencies = [ - "async-stream", "async-trait", "dotenvy", "fuel-data-parser", "fuel-streams-subject", "fuel-streams-types", - "futures", "serde", "sqlx", "test-case", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 16c66801..c08b936e 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -14,7 +14,6 @@ version.workspace = true anyhow.workspace = true async-nats.workspace = true async-stream.workspace = true -bytes.workspace = true dotenvy.workspace = true fuel-core.workspace = true fuel-message-broker.workspace = true diff --git a/crates/core/src/server/responses.rs b/crates/core/src/server/responses.rs index 0229bac9..ffee19ab 100644 --- a/crates/core/src/server/responses.rs +++ b/crates/core/src/server/responses.rs @@ -1,9 +1,26 @@ use std::sync::Arc; +use fuel_streams_domains::{ + blocks::BlockDbItem, + inputs::InputDbItem, + outputs::OutputDbItem, + receipts::ReceiptDbItem, + transactions::TransactionDbItem, + utxos::UtxoDbItem, +}; use fuel_streams_store::{ - db::DbError, - record::{DataEncoder, RecordEntity, RecordEntityError}, + db::{DbError, DbItem}, + record::{ + DataEncoder, + EncoderError, + RecordEntity, + RecordEntityError, + RecordPacket, + RecordPacketError, + RecordPointer, + }, }; +use fuel_web_utils::server::api::API_VERSION; use serde::{Deserialize, Serialize}; use crate::types::*; @@ -104,15 +121,50 @@ impl MessagePayload { } } +#[derive(thiserror::Error, Debug)] +pub enum StreamResponseError { + #[error(transparent)] + Encoder(#[from] EncoderError), + #[error(transparent)] + MessagePayload(#[from] MessagePayloadError), + #[error(transparent)] + RecordEntity(#[from] RecordEntityError), + #[error(transparent)] + RecordPacket(#[from] RecordPacketError), +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StreamResponse { + pub version: String, #[serde(rename = "type")] pub ty: String, - pub version: String, pub subject: String, + pub pointer: RecordPointer, pub payload: MessagePayload, } +impl StreamResponse { + pub fn new( + subject: String, + subject_id: String, + value: &[u8], + pointer: RecordPointer, + ) -> Result { + let payload = MessagePayload::new(&subject_id, value)?; + Ok(Self { + ty: subject_id, + version: API_VERSION.to_string(), + subject, + payload, + pointer, + }) + } +} + +impl DataEncoder for StreamResponse { + type Err = StreamResponseError; +} + #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub enum ServerResponse { @@ -121,3 +173,50 @@ pub enum ServerResponse { Response(StreamResponse), Error(String), } + +impl> TryFrom<(String, T)> for StreamResponse { + type Error = StreamResponseError; + fn try_from((subject_id, item): (String, T)) -> Result { + let pointer: RecordPointer = item.to_owned().into(); + StreamResponse::new( + item.subject_str(), + subject_id, + item.encoded_value(), + pointer, + ) + } +} + +impl TryFrom<&RecordPacket> for StreamResponse { + type Error = StreamResponseError; + fn try_from(packet: &RecordPacket) -> Result { + let subject_id = packet.subject_id(); + let entity = RecordEntity::from_subject_id(&subject_id)?; + match entity { + RecordEntity::Block => { + let db_item = BlockDbItem::try_from(packet)?; + StreamResponse::try_from((subject_id, db_item)) + } + RecordEntity::Transaction => { + let db_item = TransactionDbItem::try_from(packet)?; + StreamResponse::try_from((subject_id, db_item)) + } + RecordEntity::Input => { + let db_item = InputDbItem::try_from(packet)?; + StreamResponse::try_from((subject_id, db_item)) + } + RecordEntity::Output => { + let db_item = OutputDbItem::try_from(packet)?; + StreamResponse::try_from((subject_id, db_item)) + } + RecordEntity::Receipt => { + let db_item = ReceiptDbItem::try_from(packet)?; + StreamResponse::try_from((subject_id, db_item)) + } + RecordEntity::Utxo => { + let db_item = UtxoDbItem::try_from(packet)?; + StreamResponse::try_from((subject_id, db_item)) + } + } + } +} diff --git a/crates/core/src/stream/error.rs b/crates/core/src/stream/error.rs index 4487bb80..7c88d10d 100644 --- a/crates/core/src/stream/error.rs +++ b/crates/core/src/stream/error.rs @@ -2,11 +2,11 @@ use async_nats::SubscribeError; use fuel_message_broker::MessageBrokerError; use fuel_streams_store::{ db::{DbError, SqlxError}, - record::RecordPacketError, + record::{RecordEntityError, RecordPacketError}, store::StoreError, }; -use crate::server::DeliverPolicyError; +use crate::{server::DeliverPolicyError, types::StreamResponseError}; #[derive(Debug, thiserror::Error)] pub enum StreamError { @@ -24,4 +24,8 @@ pub enum StreamError { RecordPacket(#[from] RecordPacketError), #[error(transparent)] Sqlx(#[from] SqlxError), + #[error(transparent)] + StreamResponse(#[from] StreamResponseError), + #[error(transparent)] + RecordEntity(#[from] RecordEntityError), } diff --git a/crates/core/src/stream/fuel_streams.rs b/crates/core/src/stream/fuel_streams.rs index 23879a3b..377b3265 100644 --- a/crates/core/src/stream/fuel_streams.rs +++ b/crates/core/src/stream/fuel_streams.rs @@ -1,9 +1,12 @@ use std::sync::Arc; use fuel_message_broker::NatsMessageBroker; -use fuel_streams_store::db::Db; +use fuel_streams_store::{ + db::Db, + record::{RecordEntity, RecordPacket}, +}; -use super::Stream; +use super::{Stream, StreamError}; use crate::types::*; #[derive(Clone, Debug)] @@ -39,4 +42,32 @@ impl FuelStreams { pub fn broker(&self) -> Arc { self.msg_broker.clone() } + + pub async fn publish_by_entity( + &self, + packet: Arc, + ) -> Result<(), StreamError> { + let subject = (*packet).subject_str(); + let subject_id = (*packet).subject_id(); + let entity = RecordEntity::from_subject_id(&subject_id)?; + let response = StreamResponse::try_from(&*packet)?; + match entity { + RecordEntity::Block => { + self.blocks.publish(&subject, &response).await + } + RecordEntity::Transaction => { + self.transactions.publish(&subject, &response).await + } + RecordEntity::Input => { + self.inputs.publish(&subject, &response).await + } + RecordEntity::Receipt => { + self.receipts.publish(&subject, &response).await + } + RecordEntity::Output => { + self.outputs.publish(&subject, &response).await + } + RecordEntity::Utxo => self.utxos.publish(&subject, &response).await, + } + } } diff --git a/crates/core/src/stream/stream_impl.rs b/crates/core/src/stream/stream_impl.rs index a562149f..842d587f 100644 --- a/crates/core/src/stream/stream_impl.rs +++ b/crates/core/src/stream/stream_impl.rs @@ -2,8 +2,13 @@ use std::{sync::Arc, time::Duration}; pub use async_nats::Subscriber as StreamLiveSubscriber; use fuel_message_broker::NatsMessageBroker; -use fuel_streams_store::{db::Db, record::Record, store::Store}; +use fuel_streams_store::{ + db::{Db, DbItem}, + record::{DataEncoder, QueryOptions, Record}, + store::{find_last_block_height, Store}, +}; use fuel_streams_subject::subject::IntoSubject; +use fuel_streams_types::BlockHeight; use futures::{ stream::{BoxStream, Stream as FStream}, StreamExt, @@ -11,9 +16,9 @@ use futures::{ use tokio::{sync::OnceCell, time::sleep}; use super::{config, StreamError}; -use crate::server::DeliverPolicy; +use crate::{server::DeliverPolicy, types::StreamResponse}; -pub type BoxedStoreItem = Result<(String, Vec), StreamError>; +pub type BoxedStoreItem = Result; pub type BoxedStream = Box + Send + Unpin>; #[derive(Debug, Clone)] @@ -59,24 +64,34 @@ impl Stream { pub async fn publish( &self, subject: &str, - payload: bytes::Bytes, + response: &StreamResponse, ) -> Result<(), StreamError> { let broker = self.broker.clone(); - broker.publish(subject, payload).await?; + let payload = response.encode_json()?; + broker.publish(subject, payload.into()).await?; Ok(()) } + pub async fn subscribe( + &self, + subject: S, + deliver_policy: DeliverPolicy, + ) -> BoxStream<'static, Result> { + let subject = Arc::new(subject); + self.subscribe_dynamic(subject, deliver_policy).await + } + pub async fn subscribe_dynamic( &self, subject: Arc, deliver_policy: DeliverPolicy, - ) -> BoxStream<'static, Result<(String, Vec), StreamError>> { + ) -> BoxStream<'static, Result> { let broker = self.broker.clone(); let subject = subject.clone(); - let store = self.store.clone(); + let store = self.clone(); let stream = async_stream::try_stream! { if let DeliverPolicy::FromBlock { block_height } = deliver_policy { - let mut historical = store.historical_streaming(subject.to_owned(), Some(block_height), None).await; + let mut historical = store.historical_streaming(subject.to_owned(), Some(block_height), None); while let Some(result) = historical.next().await { yield result?; let throttle_time = *config::STREAM_THROTTLE_HISTORICAL; @@ -85,7 +100,9 @@ impl Stream { } let mut live = broker.subscribe(&subject.parse()).await?; while let Some(msg) = live.next().await { - yield msg?; + let msg = msg?; + let stream_response = StreamResponse::decode_json(&msg)?; + yield stream_response; let throttle_time = *config::STREAM_THROTTLE_LIVE; sleep(Duration::from_millis(throttle_time as u64)).await; } @@ -93,12 +110,45 @@ impl Stream { Box::pin(stream) } - pub async fn subscribe( + pub fn historical_streaming( &self, - subject: S, - deliver_policy: DeliverPolicy, - ) -> BoxStream<'static, Result<(String, Vec), StreamError>> { - let subject = Arc::new(subject); - self.subscribe_dynamic(subject, deliver_policy).await + subject: Arc, + from_block: Option, + query_opts: Option, + ) -> BoxStream<'static, Result> { + let store = self.store.clone(); + let db = self.store.db.clone(); + let stream = async_stream::try_stream! { + let mut current_height = from_block.unwrap_or_default(); + let mut opts = query_opts.unwrap_or_default().with_from_block(Some(current_height)); + let mut last_height = find_last_block_height(&db, opts.clone()).await?; + while current_height <= last_height { + let items = store.find_many_by_subject(&subject, opts.clone()).await?; + for item in items { + let subject = item.subject_str(); + let subject_id = item.subject_id(); + let value = item.encoded_value().to_vec(); + let pointer = item.into(); + let response = StreamResponse::new(subject, subject_id, &value, pointer.to_owned())?; + yield response; + current_height = pointer.block_height; + } + opts.increment_offset(); + // When we reach the last known height, we need to check if any new blocks + // were produced while we were processing the previous ones + if current_height == last_height { + let new_last_height = find_last_block_height(&db, opts.clone()).await?; + if new_last_height > last_height { + // Reset current_height back to process the blocks we haven't seen yet + current_height = last_height; + last_height = new_last_height; + } else { + tracing::debug!("No new blocks found, stopping historical streaming on block {}", current_height); + break + } + } + } + }; + Box::pin(stream) } } diff --git a/crates/domains/Cargo.toml b/crates/domains/Cargo.toml index a60e6ab8..993c7139 100644 --- a/crates/domains/Cargo.toml +++ b/crates/domains/Cargo.toml @@ -25,7 +25,6 @@ serde_json.workspace = true sqlx.workspace = true thiserror.workspace = true tokio.workspace = true -tracing.workspace = true # these dependencies need to update in the future when fuel-core 0.41.4 is on mainnet [target.'cfg(target_os = "linux")'.dependencies] diff --git a/crates/domains/src/blocks/db_item.rs b/crates/domains/src/blocks/db_item.rs index f803e8e6..5e803d78 100644 --- a/crates/domains/src/blocks/db_item.rs +++ b/crates/domains/src/blocks/db_item.rs @@ -2,11 +2,17 @@ use std::cmp::Ordering; use fuel_streams_store::{ db::{DbError, DbItem}, - record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError}, + record::{ + DataEncoder, + RecordEntity, + RecordPacket, + RecordPacketError, + RecordPointer, + }, }; -use fuel_streams_types::BlockHeight; use serde::{Deserialize, Serialize}; +use super::BlocksSubject; use crate::Subjects; #[derive( @@ -36,8 +42,8 @@ impl DbItem for BlockDbItem { self.subject.clone() } - fn get_block_height(&self) -> BlockHeight { - self.block_height.into() + fn subject_id(&self) -> String { + BlocksSubject::ID.to_string() } } @@ -45,6 +51,7 @@ impl TryFrom<&RecordPacket> for BlockDbItem { type Error = RecordPacketError; fn try_from(packet: &RecordPacket) -> Result { let subject: Subjects = packet + .subject_payload .to_owned() .try_into() .map_err(|_| RecordPacketError::SubjectMismatch)?; @@ -72,3 +79,15 @@ impl Ord for BlockDbItem { self.block_height.cmp(&other.block_height) } } + +impl From for RecordPointer { + fn from(val: BlockDbItem) -> Self { + RecordPointer { + block_height: val.block_height.into(), + tx_index: None, + input_index: None, + output_index: None, + receipt_index: None, + } + } +} diff --git a/crates/domains/src/blocks/packets.rs b/crates/domains/src/blocks/packets.rs index 559b184e..26fe28bc 100644 --- a/crates/domains/src/blocks/packets.rs +++ b/crates/domains/src/blocks/packets.rs @@ -16,7 +16,6 @@ impl PacketBuilder for Block { producer: Some(block_producer), } .dyn_arc(); - let packet = block.to_packet(&subject); let packet = match msg_payload.namespace.clone() { Some(ns) => packet.with_namespace(&ns), diff --git a/crates/domains/src/blocks/record_impl.rs b/crates/domains/src/blocks/record_impl.rs index f0bb9a0e..010182c3 100644 --- a/crates/domains/src/blocks/record_impl.rs +++ b/crates/domains/src/blocks/record_impl.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use fuel_streams_store::{ db::{DbError, DbResult}, - record::{DataEncoder, Record, RecordEntity, RecordPacket}, + record::{DataEncoder, Record, RecordEntity}, }; use sqlx::PgExecutor; @@ -20,13 +20,12 @@ impl Record for Block { async fn insert<'e, 'c: 'e, E>( executor: E, - packet: &RecordPacket, + db_item: Self::DbItem, ) -> DbResult where 'c: 'e, E: PgExecutor<'c>, { - let db_item = BlockDbItem::try_from(packet)?; let record = sqlx::query_as::<_, BlockDbItem>( "WITH upsert AS ( INSERT INTO blocks (subject, producer_address, block_height, value) diff --git a/crates/domains/src/inputs/db_item.rs b/crates/domains/src/inputs/db_item.rs index e2d0fac4..a3bdcdac 100644 --- a/crates/domains/src/inputs/db_item.rs +++ b/crates/domains/src/inputs/db_item.rs @@ -2,11 +2,17 @@ use std::cmp::Ordering; use fuel_streams_store::{ db::{DbError, DbItem}, - record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError}, + record::{ + DataEncoder, + RecordEntity, + RecordPacket, + RecordPacketError, + RecordPointer, + }, }; -use fuel_streams_types::BlockHeight; use serde::{Deserialize, Serialize}; +use super::subjects::*; use crate::Subjects; #[derive( @@ -44,8 +50,14 @@ impl DbItem for InputDbItem { self.subject.clone() } - fn get_block_height(&self) -> BlockHeight { - self.block_height.into() + fn subject_id(&self) -> String { + match self.input_type.as_str() { + "coin" => InputsCoinSubject::ID, + "contract" => InputsContractSubject::ID, + "message" => InputsMessageSubject::ID, + _ => InputsSubject::ID, + } + .to_string() } } @@ -53,6 +65,7 @@ impl TryFrom<&RecordPacket> for InputDbItem { type Error = RecordPacketError; fn try_from(packet: &RecordPacket) -> Result { let subject: Subjects = packet + .subject_payload .to_owned() .try_into() .map_err(|_| RecordPacketError::SubjectMismatch)?; @@ -122,3 +135,15 @@ impl Ord for InputDbItem { .then(self.input_index.cmp(&other.input_index)) } } + +impl From for RecordPointer { + fn from(val: InputDbItem) -> Self { + RecordPointer { + block_height: val.block_height.into(), + tx_index: Some(val.tx_index as u32), + input_index: Some(val.input_index as u32), + output_index: None, + receipt_index: None, + } + } +} diff --git a/crates/domains/src/inputs/mod.rs b/crates/domains/src/inputs/mod.rs index 8921f0aa..70ceed28 100644 --- a/crates/domains/src/inputs/mod.rs +++ b/crates/domains/src/inputs/mod.rs @@ -5,5 +5,6 @@ pub mod subjects; pub mod types; pub use db_item::*; +pub use packets::*; pub use subjects::*; pub use types::*; diff --git a/crates/domains/src/inputs/packets.rs b/crates/domains/src/inputs/packets.rs index 91d31185..6ace5ed5 100644 --- a/crates/domains/src/inputs/packets.rs +++ b/crates/domains/src/inputs/packets.rs @@ -20,14 +20,15 @@ impl PacketBuilder for Input { .par_iter() .enumerate() .map(move |(input_index, input)| { - let subject = main_subject( + let subject = DynInputSubject::from(( + input, msg_payload.block_height(), + tx_id.clone(), *tx_index as u32, input_index as u32, - tx_id.clone(), - input, - ); - let packet = input.to_packet(&subject); + )); + + let packet = input.to_packet(&subject.into()); match msg_payload.namespace.clone() { Some(ns) => packet.with_namespace(&ns), _ => packet, @@ -37,39 +38,50 @@ impl PacketBuilder for Input { } } -fn main_subject( - block_height: BlockHeight, - tx_index: u32, - input_index: u32, - tx_id: TxId, - input: &Input, -) -> Arc { - match input { - Input::Contract(contract) => InputsContractSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - input_index: Some(input_index), - contract: Some(contract.contract_id.to_owned().into()), - } - .arc(), - Input::Coin(coin) => InputsCoinSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - input_index: Some(input_index), - owner: Some(coin.owner.to_owned()), - asset: Some(coin.asset_id.to_owned()), - } - .arc(), - Input::Message(message) => InputsMessageSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - input_index: Some(input_index), - sender: Some(message.sender.to_owned()), - recipient: Some(message.recipient.to_owned()), - } - .arc(), +pub struct DynInputSubject(Arc); +impl From<(&Input, BlockHeight, TxId, u32, u32)> for DynInputSubject { + fn from( + (input, block_height, tx_id, tx_index, input_index): ( + &Input, + BlockHeight, + TxId, + u32, + u32, + ), + ) -> Self { + DynInputSubject(match input { + Input::Contract(contract) => InputsContractSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + input_index: Some(input_index), + contract: Some(contract.contract_id.to_owned().into()), + } + .arc(), + Input::Coin(coin) => InputsCoinSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + input_index: Some(input_index), + owner: Some(coin.owner.to_owned()), + asset: Some(coin.asset_id.to_owned()), + } + .arc(), + Input::Message(message) => InputsMessageSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + input_index: Some(input_index), + sender: Some(message.sender.to_owned()), + recipient: Some(message.recipient.to_owned()), + } + .arc(), + }) + } +} + +impl From for Arc { + fn from(subject: DynInputSubject) -> Self { + subject.0 } } diff --git a/crates/domains/src/inputs/record_impl.rs b/crates/domains/src/inputs/record_impl.rs index 2345e2c0..4c21380f 100644 --- a/crates/domains/src/inputs/record_impl.rs +++ b/crates/domains/src/inputs/record_impl.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use fuel_streams_store::{ db::{DbError, DbResult}, - record::{DataEncoder, Record, RecordEntity, RecordPacket}, + record::{DataEncoder, Record, RecordEntity}, }; use sqlx::PgExecutor; @@ -20,13 +20,12 @@ impl Record for Input { async fn insert<'e, 'c: 'e, E>( executor: E, - packet: &RecordPacket, + db_item: Self::DbItem, ) -> DbResult where 'c: 'e, E: PgExecutor<'c>, { - let db_item = InputDbItem::try_from(packet)?; let record = sqlx::query_as::<_, InputDbItem>( "WITH upsert AS ( INSERT INTO inputs ( diff --git a/crates/domains/src/outputs/db_item.rs b/crates/domains/src/outputs/db_item.rs index 9f284859..028b7b7e 100644 --- a/crates/domains/src/outputs/db_item.rs +++ b/crates/domains/src/outputs/db_item.rs @@ -2,11 +2,17 @@ use std::cmp::Ordering; use fuel_streams_store::{ db::{DbError, DbItem}, - record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError}, + record::{ + DataEncoder, + RecordEntity, + RecordPacket, + RecordPacketError, + RecordPointer, + }, }; -use fuel_streams_types::BlockHeight; use serde::{Deserialize, Serialize}; +use super::subjects::*; use crate::Subjects; #[derive( @@ -42,8 +48,16 @@ impl DbItem for OutputDbItem { self.subject.clone() } - fn get_block_height(&self) -> BlockHeight { - self.block_height.into() + fn subject_id(&self) -> String { + match self.output_type.as_str() { + "coin" => OutputsCoinSubject::ID, + "contract" => OutputsContractSubject::ID, + "change" => OutputsChangeSubject::ID, + "variable" => OutputsVariableSubject::ID, + "contract_created" => OutputsContractCreatedSubject::ID, + _ => OutputsSubject::ID, + } + .to_string() } } @@ -51,6 +65,7 @@ impl TryFrom<&RecordPacket> for OutputDbItem { type Error = RecordPacketError; fn try_from(packet: &RecordPacket) -> Result { let subject: Subjects = packet + .subject_payload .to_owned() .try_into() .map_err(|_| RecordPacketError::SubjectMismatch)?; @@ -138,3 +153,15 @@ impl Ord for OutputDbItem { .then(self.output_index.cmp(&other.output_index)) } } + +impl From for RecordPointer { + fn from(val: OutputDbItem) -> Self { + RecordPointer { + block_height: val.block_height.into(), + tx_index: Some(val.tx_index as u32), + input_index: None, + output_index: Some(val.output_index as u32), + receipt_index: None, + } + } +} diff --git a/crates/domains/src/outputs/mod.rs b/crates/domains/src/outputs/mod.rs index 8921f0aa..70ceed28 100644 --- a/crates/domains/src/outputs/mod.rs +++ b/crates/domains/src/outputs/mod.rs @@ -5,5 +5,6 @@ pub mod subjects; pub mod types; pub use db_item::*; +pub use packets::*; pub use subjects::*; pub use types::*; diff --git a/crates/domains/src/outputs/packets.rs b/crates/domains/src/outputs/packets.rs index f7b53589..859eab32 100644 --- a/crates/domains/src/outputs/packets.rs +++ b/crates/domains/src/outputs/packets.rs @@ -25,15 +25,15 @@ impl PacketBuilder for Output { .par_iter() .enumerate() .map(|(output_index, output)| { - let subject = main_subject( + let subject = DynOutputSubject::from(( + output, msg_payload.block_height(), + tx_id.to_owned(), *tx_index as u32, output_index as u32, - tx_id.to_owned(), tx, - output, - ); - let packet = output.to_packet(&subject); + )); + let packet = output.to_packet(&subject.into()); match msg_payload.namespace.clone() { Some(ns) => packet.with_namespace(&ns), _ => packet, @@ -43,79 +43,76 @@ impl PacketBuilder for Output { } } -fn main_subject( - block_height: BlockHeight, - tx_index: u32, - output_index: u32, - tx_id: TxId, - transaction: &Transaction, - output: &Output, -) -> Arc { - match output { - Output::Coin(OutputCoin { to, asset_id, .. }) => OutputsCoinSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - output_index: Some(output_index), - to: Some(to.to_owned()), - asset: Some(asset_id.to_owned()), - } - .arc(), - Output::Contract(contract) => { - let contract_id = - match find_output_contract_id(transaction, contract) { - Some(contract_id) => contract_id, - None => { - tracing::warn!( - "Contract ID not found for output: {:?}", - output - ); - - Default::default() - } - }; - - OutputsContractSubject { +pub struct DynOutputSubject(Arc); +impl From<(&Output, BlockHeight, TxId, u32, u32, &Transaction)> + for DynOutputSubject +{ + fn from( + (output, block_height, tx_id, tx_index, output_index, transaction): ( + &Output, + BlockHeight, + TxId, + u32, + u32, + &Transaction, + ), + ) -> Self { + DynOutputSubject(match output { + Output::Coin(coin) => OutputsCoinSubject { block_height: Some(block_height), tx_id: Some(tx_id), tx_index: Some(tx_index), output_index: Some(output_index), - contract: Some(contract_id), + to: Some(coin.to.to_owned()), + asset: Some(coin.asset_id.to_owned()), } - .arc() - } - Output::Change(OutputChange { to, asset_id, .. }) => { - OutputsChangeSubject { + .arc(), + Output::Contract(contract) => { + let contract_id = + find_output_contract_id(transaction, contract) + .unwrap_or_default(); + OutputsContractSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + output_index: Some(output_index), + contract: Some(contract_id), + } + .arc() + } + Output::Change(change) => OutputsChangeSubject { block_height: Some(block_height), tx_id: Some(tx_id), tx_index: Some(tx_index), output_index: Some(output_index), - to: Some(to.to_owned()), - asset: Some(asset_id.to_owned()), + to: Some(change.to.to_owned()), + asset: Some(change.asset_id.to_owned()), } - .arc() - } - Output::Variable(OutputVariable { to, asset_id, .. }) => { - OutputsVariableSubject { + .arc(), + Output::Variable(variable) => OutputsVariableSubject { block_height: Some(block_height), tx_id: Some(tx_id), tx_index: Some(tx_index), output_index: Some(output_index), - to: Some(to.to_owned()), - asset: Some(asset_id.to_owned()), + to: Some(variable.to.to_owned()), + asset: Some(variable.asset_id.to_owned()), } - .arc() - } - Output::ContractCreated(OutputContractCreated { - contract_id, .. - }) => OutputsContractCreatedSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - output_index: Some(output_index), - contract: Some(contract_id.to_owned()), - } - .arc(), + .arc(), + Output::ContractCreated(created) => OutputsContractCreatedSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + output_index: Some(output_index), + contract: Some(created.contract_id.to_owned()), + } + .arc(), + }) + } +} + +impl From for Arc { + fn from(subject: DynOutputSubject) -> Self { + subject.0 } } diff --git a/crates/domains/src/outputs/record_impl.rs b/crates/domains/src/outputs/record_impl.rs index 65fdafae..e46aa9de 100644 --- a/crates/domains/src/outputs/record_impl.rs +++ b/crates/domains/src/outputs/record_impl.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use fuel_streams_store::{ db::{DbError, DbResult}, - record::{DataEncoder, Record, RecordEntity, RecordPacket}, + record::{DataEncoder, Record, RecordEntity}, }; use sqlx::PgExecutor; @@ -20,13 +20,12 @@ impl Record for Output { async fn insert<'e, 'c: 'e, E>( executor: E, - packet: &RecordPacket, + db_item: Self::DbItem, ) -> DbResult where 'c: 'e, E: PgExecutor<'c>, { - let db_item = OutputDbItem::try_from(packet)?; let record = sqlx::query_as::<_, OutputDbItem>( "WITH upsert AS ( INSERT INTO outputs ( diff --git a/crates/domains/src/receipts/db_item.rs b/crates/domains/src/receipts/db_item.rs index 54b96729..0f517f15 100644 --- a/crates/domains/src/receipts/db_item.rs +++ b/crates/domains/src/receipts/db_item.rs @@ -2,11 +2,17 @@ use std::cmp::Ordering; use fuel_streams_store::{ db::{DbError, DbItem}, - record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError}, + record::{ + DataEncoder, + RecordEntity, + RecordPacket, + RecordPacketError, + RecordPointer, + }, }; -use fuel_streams_types::BlockHeight; use serde::{Deserialize, Serialize}; +use super::subjects::*; use crate::Subjects; #[derive( @@ -47,8 +53,24 @@ impl DbItem for ReceiptDbItem { self.subject.clone() } - fn get_block_height(&self) -> BlockHeight { - self.block_height.into() + fn subject_id(&self) -> String { + match self.receipt_type.as_str() { + "call" => ReceiptsCallSubject::ID, + "return" => ReceiptsReturnSubject::ID, + "return_data" => ReceiptsReturnDataSubject::ID, + "panic" => ReceiptsPanicSubject::ID, + "revert" => ReceiptsRevertSubject::ID, + "log" => ReceiptsLogSubject::ID, + "log_data" => ReceiptsLogDataSubject::ID, + "transfer" => ReceiptsTransferSubject::ID, + "transfer_out" => ReceiptsTransferOutSubject::ID, + "script_result" => ReceiptsScriptResultSubject::ID, + "message_out" => ReceiptsMessageOutSubject::ID, + "mint" => ReceiptsMintSubject::ID, + "burn" => ReceiptsBurnSubject::ID, + _ => ReceiptsSubject::ID, + } + .to_string() } } @@ -56,6 +78,7 @@ impl TryFrom<&RecordPacket> for ReceiptDbItem { type Error = RecordPacketError; fn try_from(packet: &RecordPacket) -> Result { let subject: Subjects = packet + .subject_payload .to_owned() .try_into() .map_err(|_| RecordPacketError::SubjectMismatch)?; @@ -304,3 +327,15 @@ impl Ord for ReceiptDbItem { .then(self.receipt_index.cmp(&other.receipt_index)) } } + +impl From for RecordPointer { + fn from(val: ReceiptDbItem) -> Self { + RecordPointer { + block_height: val.block_height.into(), + tx_index: Some(val.tx_index as u32), + input_index: None, + output_index: None, + receipt_index: Some(val.receipt_index as u32), + } + } +} diff --git a/crates/domains/src/receipts/mod.rs b/crates/domains/src/receipts/mod.rs index 8921f0aa..70ceed28 100644 --- a/crates/domains/src/receipts/mod.rs +++ b/crates/domains/src/receipts/mod.rs @@ -5,5 +5,6 @@ pub mod subjects; pub mod types; pub use db_item::*; +pub use packets::*; pub use subjects::*; pub use types::*; diff --git a/crates/domains/src/receipts/packets.rs b/crates/domains/src/receipts/packets.rs index 3aeb7e40..79a2ce31 100644 --- a/crates/domains/src/receipts/packets.rs +++ b/crates/domains/src/receipts/packets.rs @@ -21,14 +21,14 @@ impl PacketBuilder for Receipt { .par_iter() .enumerate() .map(|(receipt_index, receipt)| { - let subject = main_subject( + let subject = DynReceiptSubject::from(( + receipt, msg_payload.block_height(), + tx_id.clone(), *tx_index as u32, receipt_index as u32, - tx_id.clone(), - receipt, - ); - let packet = receipt.to_packet(&subject); + )); + let packet = receipt.to_packet(&subject.into()); match msg_payload.namespace.clone() { Some(ns) => packet.with_namespace(&ns), _ => packet, @@ -38,156 +38,173 @@ impl PacketBuilder for Receipt { } } -fn main_subject( - block_height: BlockHeight, - tx_index: u32, - receipt_index: u32, - tx_id: TxId, - receipt: &Receipt, -) -> Arc { - match receipt { - Receipt::Call(CallReceipt { - id: from, - to, - asset_id, - .. - }) => ReceiptsCallSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - receipt_index: Some(receipt_index), - from: Some(from.to_owned()), - to: Some(to.to_owned()), - asset: Some(asset_id.to_owned()), - } - .arc(), - Receipt::Return(ReturnReceipt { id, .. }) => ReceiptsReturnSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - receipt_index: Some(receipt_index), - contract: Some(id.to_owned()), - } - .arc(), - Receipt::ReturnData(ReturnDataReceipt { id, .. }) => { - ReceiptsReturnDataSubject { +pub struct DynReceiptSubject(Arc); +impl From<(&Receipt, BlockHeight, TxId, u32, u32)> for DynReceiptSubject { + fn from( + (receipt, block_height, tx_id, tx_index, receipt_index): ( + &Receipt, + BlockHeight, + TxId, + u32, + u32, + ), + ) -> Self { + DynReceiptSubject(match receipt { + Receipt::Call(CallReceipt { + id: from, + to, + asset_id, + .. + }) => ReceiptsCallSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + receipt_index: Some(receipt_index), + from: Some(from.to_owned()), + to: Some(to.to_owned()), + asset: Some(asset_id.to_owned()), + } + .arc(), + Receipt::Return(ReturnReceipt { id, .. }) => { + ReceiptsReturnSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + receipt_index: Some(receipt_index), + contract: Some(id.to_owned()), + } + .arc() + } + Receipt::ReturnData(ReturnDataReceipt { id, .. }) => { + ReceiptsReturnDataSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + receipt_index: Some(receipt_index), + contract: Some(id.to_owned()), + } + .arc() + } + Receipt::Panic(PanicReceipt { id, .. }) => ReceiptsPanicSubject { block_height: Some(block_height), tx_id: Some(tx_id), tx_index: Some(tx_index), receipt_index: Some(receipt_index), contract: Some(id.to_owned()), } - .arc() - } - Receipt::Panic(PanicReceipt { id, .. }) => ReceiptsPanicSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - receipt_index: Some(receipt_index), - contract: Some(id.to_owned()), - } - .arc(), - Receipt::Revert(RevertReceipt { id, .. }) => ReceiptsRevertSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - receipt_index: Some(receipt_index), - contract: Some(id.to_owned()), - } - .arc(), - Receipt::Log(LogReceipt { id, .. }) => ReceiptsLogSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - receipt_index: Some(receipt_index), - contract: Some(id.to_owned()), - } - .arc(), - Receipt::LogData(LogDataReceipt { id, .. }) => ReceiptsLogDataSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - receipt_index: Some(receipt_index), - contract: Some(id.to_owned()), - } - .arc(), - Receipt::Transfer(TransferReceipt { - id: from, - to, - asset_id, - .. - }) => ReceiptsTransferSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - receipt_index: Some(receipt_index), - from: Some(from.to_owned()), - to: Some(to.to_owned()), - asset: Some(asset_id.to_owned()), - } - .arc(), - - Receipt::TransferOut(TransferOutReceipt { - id: from, - to, - asset_id, - .. - }) => ReceiptsTransferOutSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - receipt_index: Some(receipt_index), - from: Some(from.to_owned()), - to_address: Some(to.to_owned()), - asset: Some(asset_id.to_owned()), - } - .arc(), - - Receipt::ScriptResult(ScriptResultReceipt { .. }) => { - ReceiptsScriptResultSubject { + .arc(), + Receipt::Revert(RevertReceipt { id, .. }) => { + ReceiptsRevertSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + receipt_index: Some(receipt_index), + contract: Some(id.to_owned()), + } + .arc() + } + Receipt::Log(LogReceipt { id, .. }) => ReceiptsLogSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + receipt_index: Some(receipt_index), + contract: Some(id.to_owned()), + } + .arc(), + Receipt::LogData(LogDataReceipt { id, .. }) => { + ReceiptsLogDataSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + receipt_index: Some(receipt_index), + contract: Some(id.to_owned()), + } + .arc() + } + Receipt::Transfer(TransferReceipt { + id: from, + to, + asset_id, + .. + }) => ReceiptsTransferSubject { block_height: Some(block_height), tx_id: Some(tx_id), tx_index: Some(tx_index), receipt_index: Some(receipt_index), + from: Some(from.to_owned()), + to: Some(to.to_owned()), + asset: Some(asset_id.to_owned()), } - .arc() - } - Receipt::MessageOut(MessageOutReceipt { - sender, recipient, .. - }) => ReceiptsMessageOutSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - receipt_index: Some(receipt_index), - sender: Some(sender.to_owned()), - recipient: Some(recipient.to_owned()), - } - .arc(), - Receipt::Mint(MintReceipt { - contract_id, - sub_id, - .. - }) => ReceiptsMintSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - receipt_index: Some(receipt_index), - contract: Some(contract_id.to_owned()), - sub_id: Some((*sub_id).to_owned()), - } - .arc(), - Receipt::Burn(BurnReceipt { - contract_id, - sub_id, - .. - }) => ReceiptsBurnSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - receipt_index: Some(receipt_index), - contract: Some(contract_id.to_owned()), - sub_id: Some((*sub_id).to_owned()), - } - .arc(), + .arc(), + Receipt::TransferOut(TransferOutReceipt { + id: from, + to, + asset_id, + .. + }) => ReceiptsTransferOutSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + receipt_index: Some(receipt_index), + from: Some(from.to_owned()), + to_address: Some(to.to_owned()), + asset: Some(asset_id.to_owned()), + } + .arc(), + Receipt::ScriptResult(ScriptResultReceipt { .. }) => { + ReceiptsScriptResultSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + receipt_index: Some(receipt_index), + } + .arc() + } + Receipt::MessageOut(MessageOutReceipt { + sender, + recipient, + .. + }) => ReceiptsMessageOutSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + receipt_index: Some(receipt_index), + sender: Some(sender.to_owned()), + recipient: Some(recipient.to_owned()), + } + .arc(), + Receipt::Mint(MintReceipt { + contract_id, + sub_id, + .. + }) => ReceiptsMintSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + receipt_index: Some(receipt_index), + contract: Some(contract_id.to_owned()), + sub_id: Some((*sub_id).to_owned()), + } + .arc(), + Receipt::Burn(BurnReceipt { + contract_id, + sub_id, + .. + }) => ReceiptsBurnSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + receipt_index: Some(receipt_index), + contract: Some(contract_id.to_owned()), + sub_id: Some((*sub_id).to_owned()), + } + .arc(), + }) + } +} + +impl From for Arc { + fn from(subject: DynReceiptSubject) -> Self { + subject.0 } } diff --git a/crates/domains/src/receipts/record_impl.rs b/crates/domains/src/receipts/record_impl.rs index a7b32d36..b0ecc74a 100644 --- a/crates/domains/src/receipts/record_impl.rs +++ b/crates/domains/src/receipts/record_impl.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use fuel_streams_store::{ db::{DbError, DbResult}, - record::{DataEncoder, Record, RecordEntity, RecordPacket}, + record::{DataEncoder, Record, RecordEntity}, }; use sqlx::PgExecutor; @@ -20,13 +20,12 @@ impl Record for Receipt { async fn insert<'e, 'c: 'e, E>( executor: E, - packet: &RecordPacket, + db_item: Self::DbItem, ) -> DbResult where 'c: 'e, E: PgExecutor<'c>, { - let db_item = ReceiptDbItem::try_from(packet)?; let record = sqlx::query_as::<_, ReceiptDbItem>( "WITH upsert AS ( INSERT INTO receipts ( diff --git a/crates/domains/src/subjects.rs b/crates/domains/src/subjects.rs index 342bea41..735a8d80 100644 --- a/crates/domains/src/subjects.rs +++ b/crates/domains/src/subjects.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use fuel_streams_store::record::RecordPacket; use fuel_streams_subject::subject::{ IntoSubject, SubjectPayload, @@ -91,19 +90,6 @@ impl From for Arc { macro_rules! impl_try_from_subjects { ($(($subject_type:ty, $variant:ident)),+ $(,)?) => { - // Implementation for RecordPacket - impl TryFrom for Subjects { - type Error = SubjectsError; - fn try_from(packet: RecordPacket) -> Result { - $( - if let Ok(subject) = packet.subject_matches::<$subject_type>() { - return Ok(Subjects::$variant(subject)); - } - )+ - Err(SubjectsError::UnknownSubject(packet.subject_str())) - } - } - // Implementation for SubjectPayload impl TryFrom for Subjects { type Error = SubjectsError; diff --git a/crates/domains/src/transactions/db_item.rs b/crates/domains/src/transactions/db_item.rs index 20716cee..d46b91d3 100644 --- a/crates/domains/src/transactions/db_item.rs +++ b/crates/domains/src/transactions/db_item.rs @@ -2,11 +2,17 @@ use std::cmp::Ordering; use fuel_streams_store::{ db::{DbError, DbItem}, - record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError}, + record::{ + DataEncoder, + RecordEntity, + RecordPacket, + RecordPacketError, + RecordPointer, + }, }; -use fuel_streams_types::BlockHeight; use serde::{Deserialize, Serialize}; +use super::subjects::*; use crate::Subjects; #[derive( @@ -39,8 +45,8 @@ impl DbItem for TransactionDbItem { self.subject.clone() } - fn get_block_height(&self) -> BlockHeight { - self.block_height.into() + fn subject_id(&self) -> String { + TransactionsSubject::ID.to_string() } } @@ -48,6 +54,7 @@ impl TryFrom<&RecordPacket> for TransactionDbItem { type Error = RecordPacketError; fn try_from(packet: &RecordPacket) -> Result { let subject: Subjects = packet + .subject_payload .to_owned() .try_into() .map_err(|_| RecordPacketError::SubjectMismatch)?; @@ -82,3 +89,15 @@ impl Ord for TransactionDbItem { .then(self.tx_index.cmp(&other.tx_index)) } } + +impl From for RecordPointer { + fn from(val: TransactionDbItem) -> Self { + RecordPointer { + block_height: val.block_height.into(), + tx_index: Some(val.tx_index as u32), + input_index: None, + output_index: None, + receipt_index: None, + } + } +} diff --git a/crates/domains/src/transactions/packets.rs b/crates/domains/src/transactions/packets.rs index 5c63ad9a..af67ee5a 100644 --- a/crates/domains/src/transactions/packets.rs +++ b/crates/domains/src/transactions/packets.rs @@ -54,7 +54,6 @@ fn main_packet( kind: Some(tx.kind.to_owned()), } .dyn_arc(); - let packet = tx.to_packet(&subject); let packet = match msg_payload.namespace.clone() { Some(ns) => packet.with_namespace(&ns), diff --git a/crates/domains/src/transactions/record_impl.rs b/crates/domains/src/transactions/record_impl.rs index 5583b898..28ed085d 100644 --- a/crates/domains/src/transactions/record_impl.rs +++ b/crates/domains/src/transactions/record_impl.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use fuel_streams_store::{ db::{DbError, DbResult}, - record::{DataEncoder, Record, RecordEntity, RecordPacket}, + record::{DataEncoder, Record, RecordEntity}, }; use sqlx::PgExecutor; @@ -20,13 +20,12 @@ impl Record for Transaction { async fn insert<'e, 'c: 'e, E>( executor: E, - packet: &RecordPacket, + db_item: Self::DbItem, ) -> DbResult where 'c: 'e, E: PgExecutor<'c>, { - let db_item = TransactionDbItem::try_from(packet)?; let record = sqlx::query_as::<_, TransactionDbItem>( "WITH upsert AS ( INSERT INTO transactions ( diff --git a/crates/domains/src/utxos/db_item.rs b/crates/domains/src/utxos/db_item.rs index c5a2c9ac..53f06be1 100644 --- a/crates/domains/src/utxos/db_item.rs +++ b/crates/domains/src/utxos/db_item.rs @@ -2,11 +2,17 @@ use std::cmp::Ordering; use fuel_streams_store::{ db::{DbError, DbItem}, - record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError}, + record::{ + DataEncoder, + RecordEntity, + RecordPacket, + RecordPacketError, + RecordPointer, + }, }; -use fuel_streams_types::BlockHeight; use serde::{Deserialize, Serialize}; +use super::subjects::*; use crate::Subjects; #[derive( @@ -40,8 +46,8 @@ impl DbItem for UtxoDbItem { self.subject.clone() } - fn get_block_height(&self) -> BlockHeight { - self.block_height.into() + fn subject_id(&self) -> String { + UtxosSubject::ID.to_string() } } @@ -49,6 +55,7 @@ impl TryFrom<&RecordPacket> for UtxoDbItem { type Error = RecordPacketError; fn try_from(packet: &RecordPacket) -> Result { let subject: Subjects = packet + .subject_payload .to_owned() .try_into() .map_err(|_| RecordPacketError::SubjectMismatch)?; @@ -86,3 +93,15 @@ impl Ord for UtxoDbItem { .then(self.input_index.cmp(&other.input_index)) } } + +impl From for RecordPointer { + fn from(val: UtxoDbItem) -> Self { + RecordPointer { + block_height: val.block_height.into(), + tx_index: Some(val.tx_index as u32), + input_index: Some(val.input_index as u32), + output_index: None, + receipt_index: None, + } + } +} diff --git a/crates/domains/src/utxos/mod.rs b/crates/domains/src/utxos/mod.rs index 8921f0aa..70ceed28 100644 --- a/crates/domains/src/utxos/mod.rs +++ b/crates/domains/src/utxos/mod.rs @@ -5,5 +5,6 @@ pub mod subjects; pub mod types; pub use db_item::*; +pub use packets::*; pub use subjects::*; pub use types::*; diff --git a/crates/domains/src/utxos/packets.rs b/crates/domains/src/utxos/packets.rs index c0a27414..7cc28f88 100644 --- a/crates/domains/src/utxos/packets.rs +++ b/crates/domains/src/utxos/packets.rs @@ -25,14 +25,14 @@ impl PacketBuilder for Utxo { .par_iter() .enumerate() .map(|(input_index, input)| { - let (utxo, subject) = main_subject( + let packet: RecordPacket = DynUtxoSubject::from(( + input, msg_payload.block_height(), + tx_id.clone(), *tx_index as u32, input_index as u32, - tx_id.clone(), - input, - ); - let packet = utxo.to_packet(&subject); + )) + .into(); match msg_payload.namespace.clone() { Some(ns) => packet.with_namespace(&ns), _ => packet, @@ -42,81 +42,99 @@ impl PacketBuilder for Utxo { } } -fn main_subject( - block_height: BlockHeight, - tx_index: u32, - input_index: u32, - tx_id: TxId, - input: &Input, -) -> (Utxo, Arc) { - match input { - Input::Contract(InputContract { utxo_id, .. }) => { - let utxo = Utxo { - utxo_id: utxo_id.to_owned(), - tx_id: tx_id.to_owned(), - ..Default::default() - }; - let subject = UtxosSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - input_index: Some(input_index), - utxo_type: Some(UtxoType::Contract), - utxo_id: Some(utxo_id.into()), +#[derive(Debug, Clone)] +pub struct DynUtxoSubject(Utxo, Arc); +impl From<(&Input, BlockHeight, TxId, u32, u32)> for DynUtxoSubject { + fn from( + (input, block_height, tx_id, tx_index, input_index): ( + &Input, + BlockHeight, + TxId, + u32, + u32, + ), + ) -> Self { + match input { + Input::Contract(InputContract { utxo_id, .. }) => { + let utxo = Utxo { + utxo_id: utxo_id.to_owned(), + tx_id: tx_id.to_owned(), + ..Default::default() + }; + let subject = UtxosSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + input_index: Some(input_index), + utxo_type: Some(UtxoType::Contract), + utxo_id: Some(utxo_id.into()), + } + .arc(); + DynUtxoSubject(utxo, subject) } - .arc(); - (utxo, subject) - } - Input::Coin(InputCoin { - utxo_id, amount, .. - }) => { - let utxo = Utxo { - utxo_id: utxo_id.to_owned(), - amount: Some(*amount), - tx_id: tx_id.to_owned(), - ..Default::default() - }; - let subject = UtxosSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - input_index: Some(input_index), - utxo_type: Some(UtxoType::Coin), - utxo_id: Some(utxo_id.into()), + Input::Coin(InputCoin { + utxo_id, amount, .. + }) => { + let utxo = Utxo { + utxo_id: utxo_id.to_owned(), + amount: Some(*amount), + tx_id: tx_id.to_owned(), + ..Default::default() + }; + let subject = UtxosSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + input_index: Some(input_index), + utxo_type: Some(UtxoType::Coin), + utxo_id: Some(utxo_id.into()), + } + .arc(); + DynUtxoSubject(utxo, subject) } - .arc(); - (utxo, subject) - } - Input::Message( - input @ InputMessage { - amount, - nonce, - recipient, - sender, - data, - .. - }, - ) => { - let utxo_id = input.computed_utxo_id(); - let utxo = Utxo { - tx_id: tx_id.to_owned(), - utxo_id: utxo_id.to_owned(), - sender: Some(sender.to_owned()), - recipient: Some(recipient.to_owned()), - nonce: Some(nonce.to_owned()), - amount: Some(*amount), - data: Some(data.to_owned()), - }; - let subject = UtxosSubject { - block_height: Some(block_height), - tx_id: Some(tx_id), - tx_index: Some(tx_index), - input_index: Some(input_index), - utxo_type: Some(UtxoType::Message), - utxo_id: Some(utxo_id.into()), + Input::Message( + input @ InputMessage { + amount, + nonce, + recipient, + sender, + data, + .. + }, + ) => { + let utxo_id = input.computed_utxo_id(); + let utxo = Utxo { + tx_id: tx_id.to_owned(), + utxo_id: utxo_id.to_owned(), + sender: Some(sender.to_owned()), + recipient: Some(recipient.to_owned()), + nonce: Some(nonce.to_owned()), + amount: Some(*amount), + data: Some(data.to_owned()), + }; + let subject = UtxosSubject { + block_height: Some(block_height), + tx_id: Some(tx_id), + tx_index: Some(tx_index), + input_index: Some(input_index), + utxo_type: Some(UtxoType::Message), + utxo_id: Some(utxo_id.into()), + } + .arc(); + DynUtxoSubject(utxo, subject) } - .arc(); - (utxo, subject) } } } + +impl DynUtxoSubject { + pub fn packet(&self) -> RecordPacket { + self.0.to_packet(&self.1.clone()) + } +} + +impl From for RecordPacket { + fn from(subject: DynUtxoSubject) -> Self { + subject.packet() + } +} diff --git a/crates/domains/src/utxos/record_impl.rs b/crates/domains/src/utxos/record_impl.rs index 71f7ad9a..338efaa2 100644 --- a/crates/domains/src/utxos/record_impl.rs +++ b/crates/domains/src/utxos/record_impl.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use fuel_streams_store::{ db::{DbError, DbResult}, - record::{DataEncoder, Record, RecordEntity, RecordPacket}, + record::{DataEncoder, Record, RecordEntity}, }; use sqlx::PgExecutor; @@ -20,13 +20,12 @@ impl Record for Utxo { async fn insert<'e, 'c: 'e, E>( executor: E, - packet: &RecordPacket, + db_item: Self::DbItem, ) -> DbResult where 'c: 'e, E: PgExecutor<'c>, { - let db_item = UtxoDbItem::try_from(packet)?; let record = sqlx::query_as::<_, UtxoDbItem>( "WITH upsert AS ( INSERT INTO utxos ( diff --git a/crates/message-broker/src/msg_broker.rs b/crates/message-broker/src/msg_broker.rs index 4dfa8c3c..edca9cfd 100644 --- a/crates/message-broker/src/msg_broker.rs +++ b/crates/message-broker/src/msg_broker.rs @@ -78,7 +78,5 @@ pub type MessageBlockStream = Box< >; pub type MessageStream = Box< - dyn Stream), MessageBrokerError>> - + Send - + Unpin, + dyn Stream> + Send + Unpin, >; diff --git a/crates/message-broker/src/nats.rs b/crates/message-broker/src/nats.rs index 8a4ac757..4590bd15 100644 --- a/crates/message-broker/src/nats.rs +++ b/crates/message-broker/src/nats.rs @@ -119,7 +119,7 @@ impl NatsMessageBroker { .subscribe(subject) .await .map_err(|e| MessageBrokerError::Subscription(e.to_string()))? - .map(|msg| Ok((msg.subject.to_string(), msg.payload.to_vec()))); + .map(|msg| Ok(bytes::Bytes::from(msg.payload.to_vec()))); Ok(Box::new(stream)) } @@ -239,8 +239,7 @@ mod tests { broker.publish("test.topic", vec![4, 5, 6].into()).await?; let result = receiver.await.expect("receiver task panicked")?; - let topic = format!("{}.{}", broker.namespace(), "test.topic"); - assert_eq!(result, (topic, vec![4, 5, 6])); + assert_eq!(result, bytes::Bytes::from(vec![4, 5, 6])); Ok(()) } diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index 600a0099..2b386ed7 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -11,13 +11,11 @@ rust-version.workspace = true version.workspace = true [dependencies] -async-stream.workspace = true async-trait.workspace = true dotenvy.workspace = true fuel-data-parser.workspace = true fuel-streams-subject.workspace = true fuel-streams-types.workspace = true -futures.workspace = true serde.workspace = true sqlx = { workspace = true, default-features = false, features = [ "any", diff --git a/crates/store/src/db/db_item.rs b/crates/store/src/db/db_item.rs index 5a766604..3c2822cf 100644 --- a/crates/store/src/db/db_item.rs +++ b/crates/store/src/db/db_item.rs @@ -1,6 +1,5 @@ use async_trait::async_trait; use fuel_data_parser::DataEncoder; -use fuel_streams_types::BlockHeight; use sqlx::postgres::PgRow; use super::DbError; @@ -24,5 +23,5 @@ pub trait DbItem: fn entity(&self) -> &RecordEntity; fn encoded_value(&self) -> &[u8]; fn subject_str(&self) -> String; - fn get_block_height(&self) -> BlockHeight; + fn subject_id(&self) -> String; } diff --git a/crates/store/src/record/record_impl.rs b/crates/store/src/record/record_impl.rs index 578aa87d..b358aa93 100644 --- a/crates/store/src/record/record_impl.rs +++ b/crates/store/src/record/record_impl.rs @@ -5,7 +5,7 @@ pub use fuel_data_parser::{DataEncoder, DataParserError as EncoderError}; use fuel_streams_subject::subject::IntoSubject; use sqlx::{PgConnection, PgExecutor, Postgres, QueryBuilder}; -use super::{QueryOptions, RecordEntity, RecordPacket}; +use super::{QueryOptions, RecordEntity, RecordPacket, RecordPointer}; use crate::db::{DbError, DbItem, DbResult}; pub trait RecordEncoder: DataEncoder {} @@ -16,14 +16,14 @@ pub type DbConnection = PgConnection; #[async_trait] pub trait Record: RecordEncoder + 'static { - type DbItem: DbItem; + type DbItem: DbItem + Into; const ENTITY: RecordEntity; const ORDER_PROPS: &'static [&'static str]; async fn insert<'e, 'c: 'e, E>( executor: E, - packet: &RecordPacket, + db_item: Self::DbItem, ) -> DbResult where 'c: 'e, @@ -31,16 +31,16 @@ pub trait Record: RecordEncoder + 'static { async fn insert_with_transaction( tx: &mut DbTransaction, - packet: &RecordPacket, + db_item: &Self::DbItem, ) -> DbResult { - Self::insert(&mut **tx, packet).await + Self::insert(&mut **tx, db_item.to_owned()).await } fn to_packet(&self, subject: &Arc) -> RecordPacket { let value = self .encode_json() .unwrap_or_else(|_| panic!("Encode failed for {}", Self::ENTITY)); - RecordPacket::new(subject.to_owned(), value) + RecordPacket::new(subject.parse(), subject.to_payload(), value) } fn from_db_item(record: &Self::DbItem) -> DbResult { diff --git a/crates/store/src/record/record_packet.rs b/crates/store/src/record/record_packet.rs index f8fb2d0b..cb350816 100644 --- a/crates/store/src/record/record_packet.rs +++ b/crates/store/src/record/record_packet.rs @@ -1,8 +1,9 @@ use std::{fmt::Debug, sync::Arc}; -use fuel_streams_subject::subject::IntoSubject; - -use super::{Record, RecordEntity, RecordEntityError}; +use fuel_data_parser::{DataEncoder, DataParserError as EncoderError}; +use fuel_streams_subject::subject::SubjectPayload; +use fuel_streams_types::BlockHeight; +use serde::{Deserialize, Serialize}; #[derive(Debug, thiserror::Error)] pub enum RecordPacketError { @@ -11,7 +12,7 @@ pub enum RecordPacketError { #[error("Subject mismatch")] SubjectMismatch, #[error(transparent)] - RecordEntity(#[from] RecordEntityError), + EncodeError(#[from] EncoderError), } pub trait PacketBuilder: Send + Sync + 'static { @@ -19,18 +20,41 @@ pub trait PacketBuilder: Send + Sync + 'static { fn build_packets(opts: &Self::Opts) -> Vec; } -#[derive(Debug, Clone)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct RecordPointer { + pub block_height: BlockHeight, + #[serde(skip_serializing_if = "Option::is_none")] + pub tx_index: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub input_index: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub output_index: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub receipt_index: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct RecordPacket { pub value: Vec, - pub subject: Arc, + pub subject: String, + pub subject_payload: SubjectPayload, namespace: Option, } +impl DataEncoder for RecordPacket { + type Err = RecordPacketError; +} + impl RecordPacket { - pub fn new(subject: Arc, value: Vec) -> Self { + pub fn new( + subject: impl ToString, + subject_payload: SubjectPayload, + value: Vec, + ) -> Self { Self { value, - subject, + subject: subject.to_string(), + subject_payload, namespace: None, } } @@ -39,51 +63,24 @@ impl RecordPacket { Arc::new(self.to_owned()) } - pub fn to_record(&self) -> R { - R::decode_json(&self.value) - .unwrap_or_else(|_| panic!("Decoded failed for {}", R::ENTITY)) - } - pub fn with_namespace(mut self, namespace: &str) -> Self { self.namespace = Some(namespace.to_string()); self } - pub fn subject_matches( - &self, - ) -> Result { - if let Some(subject) = self.subject.downcast_ref::() { - Ok(subject.clone()) - } else { - Err(RecordPacketError::DowncastError) - } - } - - pub fn get_entity(&self) -> Result { - let subject_str = self.subject_str(); - let first_part = match self.namespace { - Some(_) => subject_str.split('.').nth(1), - None => subject_str.split('.').next(), - }; - match first_part { - Some(value) => { - RecordEntity::try_from(value).map_err(RecordPacketError::from) - } - _ => Err(RecordPacketError::RecordEntity( - RecordEntityError::UnknownSubject("not_defined".to_string()), - )), - } + pub fn subject_id(&self) -> String { + self.subject_payload.subject.to_string() } pub fn subject_str(&self) -> String { if cfg!(any(test, feature = "test-helpers")) { - let mut subject = self.subject.parse(); + let mut subject = self.subject.to_owned(); if let Some(namespace) = &self.namespace { subject = format!("{}.{}", namespace, subject); } subject } else { - self.subject.parse() + self.subject.to_owned() } } diff --git a/crates/store/src/store/store_impl.rs b/crates/store/src/store/store_impl.rs index 45dd6d33..e9310dec 100644 --- a/crates/store/src/store/store_impl.rs +++ b/crates/store/src/store/store_impl.rs @@ -3,12 +3,11 @@ use std::sync::Arc; use fuel_data_parser::DataEncoder; use fuel_streams_subject::subject::IntoSubject; use fuel_streams_types::BlockHeight; -use futures::stream::BoxStream; use super::StoreError; use crate::{ - db::{Db, DbItem}, - record::{DbTransaction, QueryOptions, Record, RecordPacket}, + db::Db, + record::{DbTransaction, QueryOptions, Record}, }; pub type StoreResult = Result; @@ -41,24 +40,24 @@ impl Store { pub async fn insert_record( &self, - packet: &RecordPacket, + db_item: &R::DbItem, ) -> StoreResult { - let record = R::insert(&self.db.pool, packet).await?; + let record = R::insert(&self.db.pool, db_item.to_owned()).await?; Ok(record) } pub async fn insert_record_with_transaction( &self, tx: &mut DbTransaction, - packet: &RecordPacket, + db_item: &R::DbItem, ) -> StoreResult { - let record = R::insert_with_transaction(tx, packet).await?; + let record = R::insert_with_transaction(tx, db_item).await?; Ok(record) } pub async fn find_many_by_subject( &self, - subject: &Arc, + subject: &Arc, mut options: QueryOptions, ) -> StoreResult> { options = options.with_namespace(self.namespace.clone()); @@ -70,46 +69,6 @@ impl Store { .await .map_err(StoreError::from) } - - pub async fn historical_streaming( - &self, - subject: Arc, - from_block: Option, - query_opts: Option, - ) -> BoxStream<'static, Result<(String, Vec), StoreError>> { - let store = self.clone(); - let db = self.db.clone(); - let stream = async_stream::try_stream! { - let mut current_height = from_block.unwrap_or_default(); - let mut opts = query_opts.unwrap_or_default().with_from_block(Some(current_height)); - let mut last_height = find_last_block_height(&db, opts.clone()).await?; - while current_height <= last_height { - let items = store.find_many_by_subject(&subject, opts.clone()).await?; - for item in items { - let subject = item.subject_str(); - let value = item.encoded_value().to_vec(); - yield (subject, value); - let block_height = item.get_block_height(); - current_height = block_height; - } - opts.increment_offset(); - // When we reach the last known height, we need to check if any new blocks - // were produced while we were processing the previous ones - if current_height == last_height { - let new_last_height = find_last_block_height(&db, opts.clone()).await?; - if new_last_height > last_height { - // Reset current_height back to process the blocks we haven't seen yet - current_height = last_height; - last_height = new_last_height; - } else { - tracing::debug!("No new blocks found, stopping historical streaming on block {}", current_height); - break - } - } - } - }; - Box::pin(stream) - } } pub async fn find_last_block_height( diff --git a/services/consumer/src/errors.rs b/services/consumer/src/errors.rs index d1329a2f..9721c95d 100644 --- a/services/consumer/src/errors.rs +++ b/services/consumer/src/errors.rs @@ -1,6 +1,9 @@ use fuel_streams_core::StreamError; use fuel_streams_domains::MsgPayloadError; -use fuel_streams_store::{record::RecordPacketError, store::StoreError}; +use fuel_streams_store::{ + record::{RecordEntityError, RecordPacketError}, + store::StoreError, +}; #[derive(thiserror::Error, Debug)] pub enum ConsumerError { @@ -32,6 +35,8 @@ pub enum ConsumerError { MessageBrokerClient(#[from] fuel_message_broker::MessageBrokerError), #[error(transparent)] Sqlx(#[from] sqlx::Error), + #[error(transparent)] + RecordEntity(#[from] RecordEntityError), #[error("Database operation timed out")] DatabaseTimeout, } diff --git a/services/consumer/src/executor/block_executor.rs b/services/consumer/src/executor/block_executor.rs index 9366ddcc..d78a8a66 100644 --- a/services/consumer/src/executor/block_executor.rs +++ b/services/consumer/src/executor/block_executor.rs @@ -14,7 +14,7 @@ use fuel_web_utils::{ shutdown::shutdown_broker_with_timeout, telemetry::Telemetry, }; -use futures::StreamExt; +use futures::{future::try_join_all, StreamExt}; use tokio::{ sync::Semaphore, task::{JoinError, JoinSet}, @@ -22,9 +22,8 @@ use tokio::{ use tokio_util::sync::CancellationToken; use super::{ - block_stats::BlockStats, - process_store::handle_store_insertions, - process_stream::handle_stream_publishes, + block_stats::{ActionType, BlockStats}, + retry::RetryService, }; use crate::{errors::ConsumerError, metrics::Metrics, FuelStores}; @@ -122,7 +121,6 @@ impl BlockExecutor { let payload = msg.payload(); let msg_payload = MsgPayload::decode(&payload).await?.arc(); let packets = Self::build_packets(&msg_payload); - join_set.spawn({ let semaphore = semaphore.clone(); let packets = packets.clone(); @@ -131,13 +129,9 @@ impl BlockExecutor { let fuel_stores = fuel_stores.clone(); async move { let _permit = semaphore.acquire().await?; - let result = handle_store_insertions( - &db, - &fuel_stores, - &packets, - &msg_payload, - ) - .await; + let result = + handle_stores(&db, &fuel_stores, &packets, &msg_payload) + .await; Ok::<_, ConsumerError>(ProcessResult::Store(result)) } }); @@ -149,12 +143,8 @@ impl BlockExecutor { let fuel_streams = fuel_streams.clone(); async move { let _permit = semaphore.acquire_owned().await?; - let result = handle_stream_publishes( - &fuel_streams, - &packets, - &msg_payload, - ) - .await; + let result = + handle_streams(&fuel_streams, &packets, &msg_payload).await; Ok(ProcessResult::Stream(result)) } }); @@ -174,7 +164,6 @@ impl BlockExecutor { match result { Ok(Ok(ProcessResult::Store(store_result))) => { let store_stats = store_result?; - if let Some(metrics) = telemetry.base_metrics() { metrics.update_from_stats(&store_stats) } @@ -207,3 +196,47 @@ impl BlockExecutor { Arc::new(packets) } } + +async fn handle_stores( + db: &Arc, + fuel_stores: &Arc, + packets: &Arc>, + msg_payload: &Arc, +) -> Result { + let block_height = msg_payload.block_height(); + let stats = BlockStats::new(block_height.to_owned(), ActionType::Store); + let retry_service = RetryService::default(); + let result = retry_service + .with_retry("store_insertions", || async { + let mut tx = db.pool.begin().await?; + for packet in packets.iter() { + fuel_stores.insert_by_entity(&mut tx, packet).await?; + } + tx.commit().await?; + Ok(packets.len()) + }) + .await; + + match result { + Ok(packet_count) => Ok(stats.finish(packet_count)), + Err(e) => Ok(stats.finish_with_error(e)), + } +} + +async fn handle_streams( + fuel_streams: &Arc, + packets: &Arc>, + msg_payload: &Arc, +) -> Result { + let block_height = msg_payload.block_height(); + let stats = BlockStats::new(block_height.to_owned(), ActionType::Stream); + let publish_futures = packets.iter().map(|packet| { + let packet = packet.to_owned(); + fuel_streams.publish_by_entity(packet.arc()) + }); + + match try_join_all(publish_futures).await { + Ok(_) => Ok(stats.finish(packets.len())), + Err(e) => Ok(stats.finish_with_error(ConsumerError::from(e))), + } +} diff --git a/services/consumer/src/executor/mod.rs b/services/consumer/src/executor/mod.rs index 99070df4..00605374 100644 --- a/services/consumer/src/executor/mod.rs +++ b/services/consumer/src/executor/mod.rs @@ -1,7 +1,5 @@ mod block_executor; pub(crate) mod block_stats; -mod process_store; -mod process_stream; mod retry; pub use block_executor::*; diff --git a/services/consumer/src/executor/process_store.rs b/services/consumer/src/executor/process_store.rs deleted file mode 100644 index b6e483b2..00000000 --- a/services/consumer/src/executor/process_store.rs +++ /dev/null @@ -1,94 +0,0 @@ -use std::sync::Arc; - -use fuel_streams_domains::MsgPayload; -use fuel_streams_store::{ - db::Db, - record::{DbTransaction, RecordEntity, RecordPacket}, -}; - -use super::{ - block_stats::{ActionType, BlockStats}, - retry::RetryService, -}; -use crate::{errors::ConsumerError, FuelStores}; - -pub async fn handle_store_insertions( - db: &Arc, - fuel_stores: &Arc, - packets: &Arc>, - msg_payload: &Arc, -) -> Result { - let block_height = msg_payload.block_height(); - let stats = BlockStats::new(block_height.to_owned(), ActionType::Store); - let retry_service = RetryService::default(); - let result = retry_service - .with_retry("store_insertions", || { - process_store_packets(db, fuel_stores, packets) - }) - .await; - - match result { - Ok(packet_count) => Ok(stats.finish(packet_count)), - Err(e) => Ok(stats.finish_with_error(e)), - } -} - -async fn process_store_packets( - db: &Db, - fuel_stores: &FuelStores, - packets: &[RecordPacket], -) -> Result { - let mut tx = db.pool.begin().await?; - for packet in packets { - process_packet(fuel_stores, &mut tx, packet).await?; - } - tx.commit().await?; - Ok(packets.len()) -} - -async fn process_packet( - fuel_stores: &FuelStores, - db_tx: &mut DbTransaction, - packet: &RecordPacket, -) -> Result<(), ConsumerError> { - let entity = packet.get_entity()?; - match entity { - RecordEntity::Block => { - fuel_stores - .blocks - .insert_record_with_transaction(db_tx, packet) - .await?; - } - RecordEntity::Transaction => { - fuel_stores - .transactions - .insert_record_with_transaction(db_tx, packet) - .await?; - } - RecordEntity::Input => { - fuel_stores - .inputs - .insert_record_with_transaction(db_tx, packet) - .await?; - } - RecordEntity::Output => { - fuel_stores - .outputs - .insert_record_with_transaction(db_tx, packet) - .await?; - } - RecordEntity::Receipt => { - fuel_stores - .receipts - .insert_record_with_transaction(db_tx, packet) - .await?; - } - RecordEntity::Utxo => { - fuel_stores - .utxos - .insert_record_with_transaction(db_tx, packet) - .await?; - } - } - Ok(()) -} diff --git a/services/consumer/src/executor/process_stream.rs b/services/consumer/src/executor/process_stream.rs deleted file mode 100644 index a29fca20..00000000 --- a/services/consumer/src/executor/process_stream.rs +++ /dev/null @@ -1,48 +0,0 @@ -use std::sync::Arc; - -use fuel_streams_core::FuelStreams; -use fuel_streams_domains::MsgPayload; -use fuel_streams_store::record::{RecordEntity, RecordPacket}; -use futures::future::try_join_all; - -use super::block_stats::{ActionType, BlockStats}; -use crate::errors::ConsumerError; - -pub async fn handle_stream_publishes( - fuel_streams: &Arc, - packets: &Arc>, - msg_payload: &Arc, -) -> Result { - let block_height = msg_payload.block_height(); - let stats = BlockStats::new(block_height.to_owned(), ActionType::Stream); - let publish_futures = packets.iter().map(|packet| async { - let entity = packet.get_entity()?; - let subject = packet.subject_str(); - let payload = packet.to_owned().value.into(); - match entity { - RecordEntity::Block => { - fuel_streams.blocks.publish(&subject, payload).await - } - RecordEntity::Transaction => { - fuel_streams.transactions.publish(&subject, payload).await - } - RecordEntity::Input => { - fuel_streams.inputs.publish(&subject, payload).await - } - RecordEntity::Output => { - fuel_streams.outputs.publish(&subject, payload).await - } - RecordEntity::Receipt => { - fuel_streams.receipts.publish(&subject, payload).await - } - RecordEntity::Utxo => { - fuel_streams.utxos.publish(&subject, payload).await - } - } - }); - - match try_join_all(publish_futures).await { - Ok(_) => Ok(stats.finish(packets.len())), - Err(e) => Ok(stats.finish_with_error(ConsumerError::from(e))), - } -} diff --git a/services/consumer/src/fuel_stores.rs b/services/consumer/src/fuel_stores.rs index 02a2ac58..a0456201 100644 --- a/services/consumer/src/fuel_stores.rs +++ b/services/consumer/src/fuel_stores.rs @@ -8,7 +8,21 @@ use fuel_streams_core::types::{ Transaction, Utxo, }; -use fuel_streams_store::{db::Db, store::Store}; +use fuel_streams_domains::{ + blocks::BlockDbItem, + inputs::InputDbItem, + outputs::OutputDbItem, + receipts::ReceiptDbItem, + transactions::TransactionDbItem, + utxos::UtxoDbItem, +}; +use fuel_streams_store::{ + db::Db, + record::{DbTransaction, RecordEntity, RecordPacket}, + store::Store, +}; + +use crate::errors::ConsumerError; #[derive(Debug, Clone)] pub struct FuelStores { @@ -47,4 +61,52 @@ impl FuelStores { pub fn arc(&self) -> Arc { Arc::new(self.clone()) } + + pub async fn insert_by_entity( + &self, + db_tx: &mut DbTransaction, + packet: &RecordPacket, + ) -> Result<(), ConsumerError> { + let subject_id = packet.subject_id(); + let entity = RecordEntity::from_subject_id(&subject_id)?; + match entity { + RecordEntity::Block => { + let db_item: BlockDbItem = packet.try_into()?; + self.blocks + .insert_record_with_transaction(db_tx, &db_item) + .await?; + } + RecordEntity::Transaction => { + let db_item: TransactionDbItem = packet.try_into()?; + self.transactions + .insert_record_with_transaction(db_tx, &db_item) + .await?; + } + RecordEntity::Input => { + let db_item: InputDbItem = packet.try_into()?; + self.inputs + .insert_record_with_transaction(db_tx, &db_item) + .await?; + } + RecordEntity::Output => { + let db_item: OutputDbItem = packet.try_into()?; + self.outputs + .insert_record_with_transaction(db_tx, &db_item) + .await?; + } + RecordEntity::Receipt => { + let db_item: ReceiptDbItem = packet.try_into()?; + self.receipts + .insert_record_with_transaction(db_tx, &db_item) + .await?; + } + RecordEntity::Utxo => { + let db_item: UtxoDbItem = packet.try_into()?; + self.utxos + .insert_record_with_transaction(db_tx, &db_item) + .await?; + } + }; + Ok(()) + } } diff --git a/services/webserver/src/server/websocket/subscribe.rs b/services/webserver/src/server/websocket/subscribe.rs index dbb3b485..ed14c05b 100644 --- a/services/webserver/src/server/websocket/subscribe.rs +++ b/services/webserver/src/server/websocket/subscribe.rs @@ -2,15 +2,14 @@ use std::sync::Arc; use actix_ws::{CloseReason, Session}; use fuel_streams_core::{ - prelude::{IntoSubject, SubjectPayload}, + prelude::IntoSubject, server::{ServerResponse, Subscription}, - types::{MessagePayload, ServerRequest, StreamResponse}, + types::ServerRequest, BoxedStream, FuelStreams, }; use fuel_streams_domains::Subjects; use fuel_streams_store::record::RecordEntity; -use fuel_web_utils::server::api::API_VERSION; use futures::{future::try_join_all, StreamExt}; use crate::server::{ @@ -115,7 +114,7 @@ async fn process_subscription( Some(result) = sub.next() => { let result = result?; tracing::debug!(?payload, ?result, "Received message from stream"); - let payload = decode_and_respond(subscription.payload.clone(), result).await?; + let payload = ServerResponse::Response(result); tracing::debug!("Sending message to client: {:?}", payload); ctx.send_message(session, payload).await?; } @@ -183,18 +182,3 @@ async fn create_subscriber( }; Ok(Box::new(stream)) } - -pub async fn decode_and_respond( - subject_payload: SubjectPayload, - (subject, data): (String, Vec), -) -> Result { - let subject_id = subject_payload.subject.as_str(); - let data = MessagePayload::new(subject_id, &data)?; - let response_message = StreamResponse { - subject, - ty: subject_id.to_string(), - version: API_VERSION.to_string(), - payload: data, - }; - Ok(ServerResponse::Response(response_message)) -} diff --git a/tests/src/lib.rs b/tests/src/lib.rs index a77d6cc4..0e9014dc 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -12,7 +12,7 @@ use fuel_streams_domains::blocks::{ }; use fuel_streams_store::{ db::{Db, DbConnectionOpts, DbResult}, - record::{DbTransaction, Record}, + record::{DbTransaction, Record, RecordPacket}, store::Store, }; use rand::Rng; @@ -49,30 +49,37 @@ pub async fn setup_stream( // Test data // ----------------------------------------------------------------------------- -pub fn create_record(height: u32) -> (Arc, Block) { +pub fn create_record( + height: u32, + prefix: &str, +) -> (Arc, Block, RecordPacket) { let block = MockBlock::build(height); - let subject = BlocksSubject::from(&block).dyn_arc(); - (subject, block) + let subject = BlocksSubject::from(&block); + let subject = subject.dyn_arc(); + let packet = block.to_packet(&subject).with_namespace(prefix); + (subject, block, packet) } pub fn create_multiple_records( count: usize, start_height: u32, -) -> Vec<(Arc, Block)> { + prefix: &str, +) -> Vec<(Arc, Block, RecordPacket)> { (0..count) - .map(|idx| create_record(start_height + idx as u32)) + .map(|idx| create_record(start_height + idx as u32, prefix)) .collect() } pub async fn insert_records( store: &Store, prefix: &str, - records: &[(Arc, Block)], + records: &[(Arc, Block, RecordPacket)], ) -> anyhow::Result> { let mut final_records = vec![]; - for (subject, block) in records { - let packet = block.to_packet(subject).with_namespace(prefix); - let record = store.insert_record(&packet).await?; + for record in records { + let packet = record.2.to_owned().with_namespace(prefix); + let db_item: BlockDbItem = (&packet).try_into()?; + let record = store.insert_record(&db_item).await?; final_records.push(record); } Ok(final_records) @@ -82,12 +89,13 @@ pub async fn insert_records_with_transaction( store: &Store, tx: &mut DbTransaction, prefix: &str, - records: &[(Arc, Block)], + records: &[(Arc, Block, RecordPacket)], ) -> anyhow::Result<()> { let mut final_records = vec![]; - for (subject, block) in records { - let packet = block.to_packet(subject).with_namespace(prefix); - let record = store.insert_record_with_transaction(tx, &packet).await?; + for record in records { + let packet = record.2.to_owned().with_namespace(prefix); + let db_item: BlockDbItem = (&packet).try_into()?; + let record = store.insert_record_with_transaction(tx, &db_item).await?; final_records.push(record); } Ok(()) diff --git a/tests/tests/services/consumer.rs b/tests/tests/services/consumer.rs index a30f3140..4264ddf3 100644 --- a/tests/tests/services/consumer.rs +++ b/tests/tests/services/consumer.rs @@ -22,16 +22,18 @@ use pretty_assertions::assert_eq; use sv_consumer::{BlockExecutor, FuelStores}; async fn verify_blocks( + prefix: &str, fuel_stores: &Arc, msg_payload: &MsgPayload, ) -> anyhow::Result<()> { let block_subject = BlocksSubject::new() .with_height(Some(msg_payload.block_height())) .dyn_arc(); - + let options = + QueryOptions::default().with_namespace(Some(prefix.to_string())); let blocks = fuel_stores .blocks - .find_many_by_subject(&block_subject, QueryOptions::default()) + .find_many_by_subject(&block_subject, options) .await?; assert!(!blocks.is_empty(), "Expected blocks to be inserted"); @@ -43,16 +45,18 @@ async fn verify_blocks( } async fn verify_transactions( + prefix: &str, fuel_stores: &Arc, msg_payload: &MsgPayload, ) -> anyhow::Result<()> { let tx_subject = TransactionsSubject::new() .with_block_height(Some(msg_payload.block_height())) .dyn_arc(); - + let options = + QueryOptions::default().with_namespace(Some(prefix.to_string())); let transactions = fuel_stores .transactions - .find_many_by_subject(&tx_subject, QueryOptions::default()) + .find_many_by_subject(&tx_subject, options) .await?; assert!( !transactions.is_empty(), @@ -95,16 +99,18 @@ async fn verify_transactions( } async fn verify_receipts( + prefix: &str, fuel_stores: &Arc, msg_payload: &MsgPayload, ) -> anyhow::Result<()> { let receipts_subject = ReceiptsSubject::new() .with_block_height(Some(msg_payload.block_height())) .dyn_arc(); - + let options = + QueryOptions::default().with_namespace(Some(prefix.to_string())); let receipts = fuel_stores .receipts - .find_many_by_subject(&receipts_subject, QueryOptions::default()) + .find_many_by_subject(&receipts_subject, options) .await?; let expected_receipts_count: usize = msg_payload @@ -123,6 +129,7 @@ async fn verify_receipts( } async fn verify_inputs_outputs_utxos( + prefix: &str, fuel_stores: &Arc, msg_payload: &MsgPayload, ) -> anyhow::Result<()> { @@ -142,9 +149,11 @@ async fn verify_inputs_outputs_utxos( let inputs_subject = InputsSubject::new() .with_block_height(Some(msg_payload.block_height())) .dyn_arc(); + let options = + QueryOptions::default().with_namespace(Some(prefix.to_string())); let inputs = fuel_stores .inputs - .find_many_by_subject(&inputs_subject, QueryOptions::default()) + .find_many_by_subject(&inputs_subject, options.clone()) .await?; assert_eq!( inputs.len(), @@ -158,7 +167,7 @@ async fn verify_inputs_outputs_utxos( .dyn_arc(); let outputs = fuel_stores .outputs - .find_many_by_subject(&outputs_subject, QueryOptions::default()) + .find_many_by_subject(&outputs_subject, options.clone()) .await?; assert_eq!( outputs.len(), @@ -172,7 +181,7 @@ async fn verify_inputs_outputs_utxos( .dyn_arc(); let utxos = fuel_stores .utxos - .find_many_by_subject(&utxos_subject, QueryOptions::default()) + .find_many_by_subject(&utxos_subject, options) .await?; assert_eq!( utxos.len(), @@ -219,13 +228,13 @@ async fn test_consumer_inserting_records() -> anyhow::Result<()> { }); // Give some time for processing - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + tokio::time::sleep(std::time::Duration::from_secs(2)).await; // Verify all records were inserted correctly - verify_blocks(&fuel_stores, &msg_payload).await?; - verify_transactions(&fuel_stores, &msg_payload).await?; - verify_receipts(&fuel_stores, &msg_payload).await?; - verify_inputs_outputs_utxos(&fuel_stores, &msg_payload).await?; + verify_blocks(&prefix, &fuel_stores, &msg_payload).await?; + verify_transactions(&prefix, &fuel_stores, &msg_payload).await?; + verify_receipts(&prefix, &fuel_stores, &msg_payload).await?; + verify_inputs_outputs_utxos(&prefix, &fuel_stores, &msg_payload).await?; Ok(()) } diff --git a/tests/tests/store/blocks.rs b/tests/tests/store/blocks.rs index 0a27df9a..02aee8eb 100644 --- a/tests/tests/store/blocks.rs +++ b/tests/tests/store/blocks.rs @@ -35,10 +35,12 @@ async fn store_can_record_blocks() -> anyhow::Result<()> { let store = setup_store::().await?; let block = MockBlock::build(1); let subject = BlocksSubject::from(&block).dyn_arc(); + let packet = block.to_packet(&subject); let prefix = create_random_db_name(); - let packet = block.to_packet(&subject).with_namespace(&prefix); - - let db_record: BlockDbItem = store.insert_record(&packet).await?; + let packet = packet.with_namespace(&prefix); + let db_item = BlockDbItem::try_from(&packet)?; + let db_record: BlockDbItem = store.insert_record(&db_item).await?; + assert_eq!(db_record, db_item); assert_eq!(db_record.subject, packet.subject_str()); assert_eq!(Block::from_db_item(&db_record)?, block); diff --git a/tests/tests/store/inputs.rs b/tests/tests/store/inputs.rs index 88d9fecb..053f5595 100644 --- a/tests/tests/store/inputs.rs +++ b/tests/tests/store/inputs.rs @@ -1,12 +1,6 @@ -use std::sync::Arc; - -use fuel_streams_core::{ - inputs::{InputsCoinSubject, InputsContractSubject, InputsMessageSubject}, - subjects::IntoSubject, - types::{Input, Transaction}, -}; +use fuel_streams_core::types::{Input, Transaction}; use fuel_streams_domains::{ - inputs::{types::MockInput, InputDbItem}, + inputs::{types::MockInput, DynInputSubject, InputDbItem}, transactions::types::MockTransaction, Subjects, }; @@ -36,7 +30,8 @@ async fn insert_input(input: Input) -> anyhow::Result<()> { packet ); - let db_record = store.insert_record(&packet).await?; + let db_item = db_item.unwrap(); + let db_record = store.insert_record(&db_item).await?; assert_eq!(db_record.subject, packet.subject_str()); Ok(()) @@ -58,35 +53,14 @@ fn create_packets( .into_iter() .enumerate() .map(|(input_index, input)| { - let subject: Arc = match &input { - Input::Coin(coin) => InputsCoinSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - input_index: Some(input_index as u32), - owner: Some(coin.owner.to_owned()), - asset: Some(coin.asset_id.to_owned()), - } - .arc(), - Input::Contract(contract) => InputsContractSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - input_index: Some(input_index as u32), - contract: Some(contract.contract_id.to_owned().into()), - } - .arc(), - Input::Message(message) => InputsMessageSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - input_index: Some(input_index as u32), - sender: Some(message.sender.to_owned()), - recipient: Some(message.recipient.to_owned()), - } - .arc(), - }; - input.to_packet(&subject).with_namespace(prefix) + let subject = DynInputSubject::from(( + &input, + 1.into(), + tx_id.clone(), + 0, + input_index as u32, + )); + input.to_packet(&subject.into()).with_namespace(prefix) }) .collect::>() } @@ -120,8 +94,11 @@ async fn find_many_by_subject_with_sql_columns() -> anyhow::Result<()> { ]); let packets = create_packets(&tx, &tx_id, &prefix); for packet in packets { + let payload = packet.subject_payload.clone(); + let subject: Subjects = payload.try_into()?; + let subject = subject.into(); let _ = store - .find_many_by_subject(&packet.subject, QueryOptions::default()) + .find_many_by_subject(&subject, QueryOptions::default()) .await?; } @@ -145,11 +122,10 @@ async fn test_input_subject_to_db_item_conversion() -> anyhow::Result<()> { let packets = create_packets(&tx, &tx_id, &prefix); for (idx, packet) in packets.into_iter().enumerate() { - let subject: Subjects = packet.clone().try_into()?; + let payload = packet.subject_payload.clone(); + let subject: Subjects = payload.try_into()?; let db_item = InputDbItem::try_from(&packet)?; - - // Assert store insert - let inserted = store.insert_record(&packet).await?; + let inserted = store.insert_record(&db_item).await?; assert_eq!(db_item, inserted); // Verify common fields diff --git a/tests/tests/store/outputs.rs b/tests/tests/store/outputs.rs index ead2fe50..4f9d6ab1 100644 --- a/tests/tests/store/outputs.rs +++ b/tests/tests/store/outputs.rs @@ -1,18 +1,6 @@ -use std::sync::Arc; - -use fuel_streams_core::{ - outputs::{ - OutputsChangeSubject, - OutputsCoinSubject, - OutputsContractCreatedSubject, - OutputsContractSubject, - OutputsVariableSubject, - }, - subjects::IntoSubject, - types::{Output, Transaction}, -}; +use fuel_streams_core::types::{Output, Transaction}; use fuel_streams_domains::{ - outputs::{types::MockOutput, OutputDbItem}, + outputs::{types::MockOutput, DynOutputSubject, OutputDbItem}, transactions::types::MockTransaction, Subjects, }; @@ -21,7 +9,7 @@ use fuel_streams_store::{ store::Store, }; use fuel_streams_test::{create_random_db_name, setup_db, setup_store}; -use fuel_streams_types::{ContractId, TxId}; +use fuel_streams_types::TxId; use pretty_assertions::assert_eq; async fn insert_output(output: Output) -> anyhow::Result<()> { @@ -41,7 +29,8 @@ async fn insert_output(output: Output) -> anyhow::Result<()> { packet ); - let db_record = store.insert_record(&packet).await?; + let db_item = db_item.unwrap(); + let db_record = store.insert_record(&db_item).await?; assert_eq!(db_record.subject, packet.subject_str()); Ok(()) @@ -63,54 +52,15 @@ fn create_packets( .into_iter() .enumerate() .map(|(output_index, output)| { - let subject: Arc = match &output { - Output::Coin(coin) => OutputsCoinSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - output_index: Some(output_index as u32), - to: Some(coin.to.to_owned()), - asset: Some(coin.asset_id.to_owned()), - } - .arc(), - Output::Contract(_) => OutputsContractSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - output_index: Some(output_index as u32), - contract: Some(ContractId::default()), - } - .arc(), - Output::Change(change) => OutputsChangeSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - output_index: Some(output_index as u32), - to: Some(change.to.to_owned()), - asset: Some(change.asset_id.to_owned()), - } - .arc(), - Output::Variable(variable) => OutputsVariableSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - output_index: Some(output_index as u32), - to: Some(variable.to.to_owned()), - asset: Some(variable.asset_id.to_owned()), - } - .arc(), - Output::ContractCreated(contract_created) => { - OutputsContractCreatedSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - output_index: Some(output_index as u32), - contract: Some(contract_created.contract_id.to_owned()), - } - .arc() - } - }; - output.to_packet(&subject).with_namespace(prefix) + let subject = DynOutputSubject::from(( + &output, + 1.into(), + tx_id.clone(), + 0, + output_index as u32, + tx, + )); + output.to_packet(&subject.into()).with_namespace(prefix) }) .collect() } @@ -156,8 +106,11 @@ async fn find_many_by_subject_with_sql_columns() -> anyhow::Result<()> { ]); let packets = create_packets(&tx, &tx_id, &prefix); for packet in packets { + let payload = packet.subject_payload.clone(); + let subject: Subjects = payload.try_into()?; + let subject = subject.into(); let _ = store - .find_many_by_subject(&packet.subject, QueryOptions::default()) + .find_many_by_subject(&subject, QueryOptions::default()) .await?; } @@ -183,11 +136,10 @@ async fn test_output_subject_to_db_item_conversion() -> anyhow::Result<()> { let packets = create_packets(&tx, &tx_id, &prefix); for (idx, packet) in packets.into_iter().enumerate() { - let subject: Subjects = packet.clone().try_into()?; + let payload = packet.subject_payload.clone(); + let subject: Subjects = payload.try_into()?; let db_item = OutputDbItem::try_from(&packet)?; - - // Assert store insert - let inserted = store.insert_record(&packet).await?; + let inserted = store.insert_record(&db_item).await?; assert_eq!(db_item, inserted); // Verify common fields diff --git a/tests/tests/store/pattern_matching.rs b/tests/tests/store/pattern_matching.rs index 3b48534f..9c9103a0 100644 --- a/tests/tests/store/pattern_matching.rs +++ b/tests/tests/store/pattern_matching.rs @@ -16,7 +16,7 @@ async fn test_asterisk_subject_string() -> anyhow::Result<()> { store.with_namespace(&prefix); // Create and insert test blocks with different subjects - let records = create_multiple_records(3, 1); + let records = create_multiple_records(3, 1, &prefix); insert_records(&store, &prefix, &records).await?; // Test subject matching diff --git a/tests/tests/store/receipts.rs b/tests/tests/store/receipts.rs index de968bed..532b46c7 100644 --- a/tests/tests/store/receipts.rs +++ b/tests/tests/store/receipts.rs @@ -1,26 +1,6 @@ -use std::sync::Arc; - -use fuel_streams_core::{ - subjects::{ - IntoSubject, - ReceiptsBurnSubject, - ReceiptsCallSubject, - ReceiptsLogDataSubject, - ReceiptsLogSubject, - ReceiptsMessageOutSubject, - ReceiptsMintSubject, - ReceiptsPanicSubject, - ReceiptsReturnDataSubject, - ReceiptsReturnSubject, - ReceiptsRevertSubject, - ReceiptsScriptResultSubject, - ReceiptsTransferOutSubject, - ReceiptsTransferSubject, - }, - types::{MockReceipt, Receipt, Transaction}, -}; +use fuel_streams_core::types::{MockReceipt, Receipt, Transaction}; use fuel_streams_domains::{ - receipts::ReceiptDbItem, + receipts::{DynReceiptSubject, ReceiptDbItem}, transactions::types::MockTransaction, Subjects, }; @@ -49,7 +29,8 @@ async fn insert_receipt(receipt: Receipt) -> anyhow::Result<()> { packet ); - let db_record = store.insert_record(&packet).await?; + let db_item = db_item.unwrap(); + let db_record = store.insert_record(&db_item).await?; assert_eq!(db_record.subject, packet.subject_str()); Ok(()) @@ -71,121 +52,14 @@ fn create_packets( .into_iter() .enumerate() .map(|(receipt_index, receipt)| { - let subject: Arc = match &receipt { - Receipt::Call(data) => ReceiptsCallSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - receipt_index: Some(receipt_index as u32), - from: Some(data.id.to_owned()), - to: Some(data.to.to_owned()), - asset: Some(data.asset_id.to_owned()), - } - .arc(), - Receipt::Return(data) => ReceiptsReturnSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - receipt_index: Some(receipt_index as u32), - contract: Some(data.id.to_owned()), - } - .arc(), - Receipt::ReturnData(data) => ReceiptsReturnDataSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - receipt_index: Some(receipt_index as u32), - contract: Some(data.id.to_owned()), - } - .arc(), - Receipt::Panic(data) => ReceiptsPanicSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - receipt_index: Some(receipt_index as u32), - contract: Some(data.id.to_owned()), - } - .arc(), - Receipt::Revert(data) => ReceiptsRevertSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - receipt_index: Some(receipt_index as u32), - contract: Some(data.id.to_owned()), - } - .arc(), - Receipt::Log(data) => ReceiptsLogSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - receipt_index: Some(receipt_index as u32), - contract: Some(data.id.to_owned()), - } - .arc(), - Receipt::LogData(data) => ReceiptsLogDataSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - receipt_index: Some(receipt_index as u32), - contract: Some(data.id.to_owned()), - } - .arc(), - Receipt::Transfer(data) => ReceiptsTransferSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - receipt_index: Some(receipt_index as u32), - from: Some(data.id.to_owned()), - to: Some(data.to.to_owned()), - asset: Some(data.asset_id.to_owned()), - } - .arc(), - Receipt::TransferOut(data) => ReceiptsTransferOutSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - receipt_index: Some(receipt_index as u32), - from: Some(data.id.to_owned()), - to_address: Some(data.to.to_owned()), - asset: Some(data.asset_id.to_owned()), - } - .arc(), - Receipt::ScriptResult(_) => ReceiptsScriptResultSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - receipt_index: Some(receipt_index as u32), - } - .arc(), - Receipt::MessageOut(data) => ReceiptsMessageOutSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - receipt_index: Some(receipt_index as u32), - sender: Some(data.sender.to_owned()), - recipient: Some(data.recipient.to_owned()), - } - .arc(), - Receipt::Mint(data) => ReceiptsMintSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - receipt_index: Some(receipt_index as u32), - contract: Some(data.contract_id.to_owned()), - sub_id: Some(data.sub_id.to_owned()), - } - .arc(), - Receipt::Burn(data) => ReceiptsBurnSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - receipt_index: Some(receipt_index as u32), - contract: Some(data.contract_id.to_owned()), - sub_id: Some(data.sub_id.to_owned()), - } - .arc(), - }; - receipt.to_packet(&subject).with_namespace(prefix) + let subject = DynReceiptSubject::from(( + &receipt, + 1.into(), + tx_id.clone(), + 0, + receipt_index as u32, + )); + receipt.to_packet(&subject.into()).with_namespace(prefix) }) .collect() } @@ -281,8 +155,11 @@ async fn find_many_by_subject_with_sql_columns() -> anyhow::Result<()> { let packets = create_packets(&tx, &tx_id, &prefix); for packet in packets { + let payload = packet.subject_payload.clone(); + let subject: Subjects = payload.try_into()?; + let subject = subject.into(); let _ = store - .find_many_by_subject(&packet.subject, QueryOptions::default()) + .find_many_by_subject(&subject, QueryOptions::default()) .await?; } @@ -316,11 +193,10 @@ async fn test_receipt_subject_to_db_item_conversion() -> anyhow::Result<()> { let packets = create_packets(&tx, &tx_id, &prefix); for (idx, packet) in packets.into_iter().enumerate() { - let subject: Subjects = packet.clone().try_into()?; + let payload = packet.subject_payload.clone(); + let subject: Subjects = payload.try_into()?; let db_item = ReceiptDbItem::try_from(&packet)?; - - // Add store insert verification - let inserted = store.insert_record(&packet).await?; + let inserted = store.insert_record(&db_item).await?; assert_eq!(db_item, inserted); // Verify common fields diff --git a/tests/tests/store/record.rs b/tests/tests/store/record.rs index a1add8a0..556303a2 100644 --- a/tests/tests/store/record.rs +++ b/tests/tests/store/record.rs @@ -16,7 +16,7 @@ async fn test_multiple_inserts() -> anyhow::Result<()> { let mut store = setup_store::().await?; store.with_namespace(&prefix); - let blocks = create_multiple_records(2, 1); + let blocks = create_multiple_records(2, 1, &prefix); let db_items = insert_records(&store, &prefix, &blocks).await?; // Verify both records exist and are correct @@ -43,7 +43,7 @@ async fn test_find_many_by_subject() -> anyhow::Result<()> { let mut store = setup_store::().await?; store.with_namespace(&prefix); - let blocks = create_multiple_records(2, 1); + let blocks = create_multiple_records(2, 1, &prefix); let _ = insert_records(&store, &prefix, &blocks).await?; let block1 = &blocks.first().unwrap().1; let block2 = &blocks.get(1).unwrap().1; @@ -72,11 +72,7 @@ async fn test_subject_matching() -> anyhow::Result<()> { let block = MockBlock::build(1); let subject = BlocksSubject::from(&block).dyn_arc(); let packet = block.to_packet(&subject); - - // Test subject matching - let matched_subject: BlocksSubject = packet - .subject_matches() - .expect("Failed to match BlocksSubject"); + let matched_subject: BlocksSubject = packet.subject_payload.into(); assert_eq!(matched_subject.parse(), subject.parse()); Ok(()) } @@ -89,7 +85,7 @@ async fn test_insert_with_transaction() -> anyhow::Result<()> { // Start a transaction let mut tx = store.db.pool.begin().await?; - let blocks = create_multiple_records(4, 1); + let blocks = create_multiple_records(4, 1, &prefix); insert_records_with_transaction(&store, &mut tx, &prefix, &blocks).await?; tx.commit().await?; @@ -102,7 +98,7 @@ async fn test_insert_with_transaction() -> anyhow::Result<()> { // Verify the records match the original blocks for (record, item) in found_records.iter().zip(blocks.iter()) { - let (_, block) = item; + let (_, block, _) = item; assert_eq!(&Block::from_db_item(record)?, block); } diff --git a/tests/tests/store/transactions.rs b/tests/tests/store/transactions.rs index dbf089e6..504ebc3a 100644 --- a/tests/tests/store/transactions.rs +++ b/tests/tests/store/transactions.rs @@ -1,7 +1,5 @@ -use std::sync::Arc; - use fuel_streams_core::{ - subjects::{IntoSubject, SubjectBuildable, TransactionsSubject}, + subjects::{SubjectBuildable, TransactionsSubject}, types::{MockInput, MockOutput, MockReceipt, MockTransaction, Transaction}, }; use fuel_streams_domains::{transactions::TransactionDbItem, Subjects}; @@ -28,21 +26,21 @@ async fn insert_transaction(tx: &Transaction) -> anyhow::Result<()> { packet ); - let db_record = store.insert_record(&packet).await?; + let db_item = db_item.unwrap(); + let db_record = store.insert_record(&db_item).await?; assert_eq!(db_record.subject, packet.subject_str()); Ok(()) } fn create_packets(tx: &Transaction, prefix: &str) -> Vec { - let subject: Arc = TransactionsSubject::new() + let subject = TransactionsSubject::new() .with_block_height(Some(1.into())) .with_tx_id(Some(tx.id.clone())) .with_tx_index(Some(0)) .with_tx_status(Some(tx.status.clone())) .with_kind(Some(tx.kind.clone())) .dyn_arc(); - vec![tx.to_packet(&subject).with_namespace(prefix)] } @@ -149,8 +147,11 @@ async fn find_many_by_subject_with_sql_columns() -> anyhow::Result<()> { for tx in transactions { let packets = create_packets(&tx, &prefix); for packet in packets { + let payload = packet.subject_payload.clone(); + let subject: Subjects = payload.try_into()?; + let subject = subject.into(); let _ = store - .find_many_by_subject(&packet.subject, QueryOptions::default()) + .find_many_by_subject(&subject, QueryOptions::default()) .await?; } } @@ -202,12 +203,10 @@ async fn test_transaction_subject_to_db_item_conversion() -> anyhow::Result<()> for tx in transactions { let packets = create_packets(&tx, &prefix); let packet = packets.first().unwrap(); - - let subject: Subjects = packet.clone().try_into()?; + let payload = packet.subject_payload.clone(); + let subject: Subjects = payload.try_into()?; let db_item = TransactionDbItem::try_from(packet)?; - - // Assert store insert - let inserted = store.insert_record(packet).await?; + let inserted = store.insert_record(&db_item).await?; assert_eq!(db_item, inserted); // Verify common fields diff --git a/tests/tests/store/utxo.rs b/tests/tests/store/utxo.rs index 3367838a..526f8727 100644 --- a/tests/tests/store/utxo.rs +++ b/tests/tests/store/utxo.rs @@ -1,30 +1,21 @@ -use std::sync::Arc; - -use fuel_streams_core::{ - subjects::IntoSubject, - types::{Transaction, Utxo}, -}; +use fuel_streams_core::types::{Input, MockInput, Transaction, Utxo}; use fuel_streams_domains::{ transactions::types::MockTransaction, - utxos::{ - subjects::UtxosSubject, - types::{MockUtxo, UtxoType}, - UtxoDbItem, - }, + utxos::{DynUtxoSubject, UtxoDbItem}, Subjects, }; use fuel_streams_store::{ - record::{QueryOptions, Record, RecordPacket}, + record::{QueryOptions, RecordPacket}, store::Store, }; use fuel_streams_test::{create_random_db_name, setup_db, setup_store}; -use fuel_streams_types::{Address, HexData, TxId}; +use fuel_streams_types::TxId; use pretty_assertions::assert_eq; -async fn insert_utxo(utxo: Utxo, utxo_type: UtxoType) -> anyhow::Result<()> { +async fn insert_utxo(input: &Input) -> anyhow::Result<()> { let prefix = create_random_db_name(); let (_, tx_id) = create_tx(); - let packets = create_packets(&tx_id, utxo, utxo_type, &prefix); + let packets = create_packets(input, &tx_id, &prefix, 0); assert_eq!(packets.len(), 1); let mut store = setup_store::().await?; @@ -38,7 +29,8 @@ async fn insert_utxo(utxo: Utxo, utxo_type: UtxoType) -> anyhow::Result<()> { packet ); - let db_record = store.insert_record(&packet).await?; + let db_item = db_item.unwrap(); + let db_record = store.insert_record(&db_item).await?; assert_eq!(db_record.subject, packet.subject_str()); Ok(()) @@ -51,42 +43,29 @@ fn create_tx() -> (Transaction, TxId) { } fn create_packets( + input: &Input, tx_id: &TxId, - utxo: Utxo, - utxo_type: UtxoType, prefix: &str, + input_index: u32, ) -> Vec { - let subject: Arc = UtxosSubject { - block_height: Some(1.into()), - tx_id: Some(tx_id.clone()), - tx_index: Some(0), - input_index: Some(0), - utxo_type: Some(utxo_type), - utxo_id: Some(HexData::default()), - } - .dyn_arc(); - - vec![utxo.to_packet(&subject).with_namespace(prefix)] + let subject = + DynUtxoSubject::from((input, 1.into(), tx_id.clone(), 0, input_index)); + vec![subject.packet().with_namespace(prefix)] } #[tokio::test] async fn store_can_record_coin_utxo() -> anyhow::Result<()> { - let recipient = Address::default(); - insert_utxo(MockUtxo::coin(100, recipient), UtxoType::Coin).await + insert_utxo(&MockInput::coin_signed()).await } #[tokio::test] async fn store_can_record_contract_utxo() -> anyhow::Result<()> { - let contract_id = Address::default(); - insert_utxo(MockUtxo::contract(contract_id), UtxoType::Contract).await + insert_utxo(&MockInput::contract()).await } #[tokio::test] async fn store_can_record_message_utxo() -> anyhow::Result<()> { - let sender = Address::default(); - let recipient = Address::default(); - insert_utxo(MockUtxo::message(100, sender, recipient), UtxoType::Message) - .await + insert_utxo(&MockInput::message_data_signed()).await } #[tokio::test] @@ -96,20 +75,20 @@ async fn find_many_by_subject_with_sql_columns() -> anyhow::Result<()> { store.with_namespace(&prefix); let (_, tx_id) = create_tx(); - let utxos = vec![ - (MockUtxo::coin(100, Address::default()), UtxoType::Coin), - (MockUtxo::contract(Address::default()), UtxoType::Contract), - ( - MockUtxo::message(100, Address::default(), Address::default()), - UtxoType::Message, - ), + let inputs = vec![ + (MockInput::coin_signed(), 0), + (MockInput::contract(), 1), + (MockInput::message_data_signed(), 2), ]; - for (utxo, utxo_type) in utxos { - let packets = create_packets(&tx_id, utxo, utxo_type, &prefix); + for (input, input_index) in inputs { + let packets = create_packets(&input, &tx_id, &prefix, input_index); for packet in packets { + let payload = packet.subject_payload.clone(); + let subject: Subjects = payload.try_into()?; + let subject = subject.into(); let _ = store - .find_many_by_subject(&packet.subject, QueryOptions::default()) + .find_many_by_subject(&subject, QueryOptions::default()) .await?; } } @@ -124,31 +103,27 @@ async fn test_utxo_subject_to_db_item_conversion() -> anyhow::Result<()> { let mut store = Store::::new(&db.arc()); store.with_namespace(&prefix); - let utxos = vec![ - (MockUtxo::coin(100, Address::default()), UtxoType::Coin), - (MockUtxo::contract(Address::default()), UtxoType::Contract), - ( - MockUtxo::message(100, Address::default(), Address::default()), - UtxoType::Message, - ), + let inputs = vec![ + (MockInput::coin_signed(), 0), + (MockInput::contract(), 1), + (MockInput::message_data_signed(), 2), ]; - for (utxo, utxo_type) in utxos { + for (input, input_index) in inputs { let (_, tx_id) = create_tx(); - let packets = create_packets(&tx_id, utxo, utxo_type, &prefix); + let packets = create_packets(&input, &tx_id, &prefix, input_index); let packet = packets.first().unwrap(); - let subject: Subjects = packet.clone().try_into()?; + let payload = packet.subject_payload.clone(); + let subject: Subjects = payload.try_into()?; let db_item = UtxoDbItem::try_from(packet)?; - - // Assert store insert - let inserted = store.insert_record(packet).await?; + let inserted = store.insert_record(&db_item).await?; assert_eq!(db_item, inserted); // Verify common fields assert_eq!(db_item.block_height, 1); assert_eq!(db_item.tx_id, tx_id.to_string()); assert_eq!(db_item.tx_index, 0); - assert_eq!(db_item.input_index, 0); + assert_eq!(db_item.input_index, input_index as i32); assert_eq!(db_item.subject, packet.subject_str()); match subject { diff --git a/tests/tests/stream/live_data.rs b/tests/tests/stream/live_data.rs index d4faa2af..cff1e3ed 100644 --- a/tests/tests/stream/live_data.rs +++ b/tests/tests/stream/live_data.rs @@ -1,5 +1,8 @@ -use fuel_streams_core::{server::DeliverPolicy, subjects::*, types::Block}; -use fuel_streams_store::record::{DataEncoder, Record}; +use fuel_streams_core::{ + server::DeliverPolicy, + subjects::*, + types::StreamResponse, +}; use fuel_streams_test::{ create_multiple_records, create_random_db_name, @@ -14,7 +17,7 @@ const NATS_URL: &str = "nats://localhost:4222"; async fn test_streaming_live_data() -> anyhow::Result<()> { let prefix = create_random_db_name(); let stream = setup_stream(NATS_URL, &prefix).await?; - let data = create_multiple_records(10, 0); + let data = create_multiple_records(10, 0, &prefix); tokio::spawn({ let data = data.clone(); @@ -29,8 +32,8 @@ async fn test_streaming_live_data() -> anyhow::Result<()> { while let Some((index, record)) = subscriber.next().await { let record = record.unwrap(); let expected_block = &data[index].1; - let decoded_block = Block::decode(&record.1).await.unwrap(); - assert_eq!(decoded_block, *expected_block); + let block = record.payload.as_block().unwrap(); + assert_eq!((*block).clone(), *expected_block); if index == data.len() - 1 { break; } @@ -38,10 +41,11 @@ async fn test_streaming_live_data() -> anyhow::Result<()> { } }); - for (subject, block) in data { - let packet = block.to_packet(&subject); + for record in data { + let packet = record.2.to_owned().with_namespace(&prefix); let subject = packet.subject_str(); - stream.publish(&subject, packet.value.into()).await?; + let response = StreamResponse::try_from(&packet)?; + stream.publish(&subject, &response).await?; } Ok(())