Skip to content

Commit

Permalink
feat(repo): Add pointers inside the StreamResponse (#404)
Browse files Browse the repository at this point in the history
feat(repo): add record pointer on stream response
  • Loading branch information
pedronauck authored Feb 17, 2025
1 parent b968281 commit efd339c
Show file tree
Hide file tree
Showing 55 changed files with 1,109 additions and 1,074 deletions.
4 changes: 0 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ version.workspace = true
anyhow.workspace = true
async-nats.workspace = true
async-stream.workspace = true
bytes.workspace = true
dotenvy.workspace = true
fuel-core.workspace = true
fuel-message-broker.workspace = true
Expand Down
105 changes: 102 additions & 3 deletions crates/core/src/server/responses.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
use std::sync::Arc;

use fuel_streams_domains::{
blocks::BlockDbItem,
inputs::InputDbItem,
outputs::OutputDbItem,
receipts::ReceiptDbItem,
transactions::TransactionDbItem,
utxos::UtxoDbItem,
};
use fuel_streams_store::{
db::DbError,
record::{DataEncoder, RecordEntity, RecordEntityError},
db::{DbError, DbItem},
record::{
DataEncoder,
EncoderError,
RecordEntity,
RecordEntityError,
RecordPacket,
RecordPacketError,
RecordPointer,
},
};
use fuel_web_utils::server::api::API_VERSION;
use serde::{Deserialize, Serialize};

use crate::types::*;
Expand Down Expand Up @@ -104,15 +121,50 @@ impl MessagePayload {
}
}

/// Errors that can occur while building or encoding a `StreamResponse`.
///
/// Every variant is `#[error(transparent)]` + `#[from]`, so each underlying
/// error converts automatically via `?` and its message is surfaced
/// unchanged to callers.
#[derive(thiserror::Error, Debug)]
pub enum StreamResponseError {
    /// Failure while encoding/decoding the response (see `DataEncoder`).
    #[error(transparent)]
    Encoder(#[from] EncoderError),
    /// Failure while constructing the message payload from raw bytes.
    #[error(transparent)]
    MessagePayload(#[from] MessagePayloadError),
    /// Subject id did not resolve to a known record entity.
    #[error(transparent)]
    RecordEntity(#[from] RecordEntityError),
    /// Failure while converting a `RecordPacket` into a db item.
    #[error(transparent)]
    RecordPacket(#[from] RecordPacketError),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamResponse {
pub version: String,
#[serde(rename = "type")]
pub ty: String,
pub version: String,
pub subject: String,
pub pointer: RecordPointer,
pub payload: MessagePayload,
}

impl StreamResponse {
pub fn new(
subject: String,
subject_id: String,
value: &[u8],
pointer: RecordPointer,
) -> Result<Self, StreamResponseError> {
let payload = MessagePayload::new(&subject_id, value)?;
Ok(Self {
ty: subject_id,
version: API_VERSION.to_string(),
subject,
payload,
pointer,
})
}
}

// Enables `encode_json`/`decode_json` on responses (used when publishing
// to and subscribing from the broker); encoder failures surface as
// `StreamResponseError::Encoder`.
impl DataEncoder for StreamResponse {
    type Err = StreamResponseError;
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum ServerResponse {
Expand All @@ -121,3 +173,50 @@ pub enum ServerResponse {
Response(StreamResponse),
Error(String),
}

impl<T: DbItem + Into<RecordPointer>> TryFrom<(String, T)> for StreamResponse {
    type Error = StreamResponseError;

    /// Builds a [`StreamResponse`] from a subject id plus a database item.
    ///
    /// Extracts the subject string and encoded bytes *before* consuming the
    /// item into its `RecordPointer`, so only the payload bytes are copied;
    /// the original cloned the entire item via `to_owned()` just to derive
    /// the pointer.
    fn try_from((subject_id, item): (String, T)) -> Result<Self, Self::Error> {
        let subject = item.subject_str();
        let value = item.encoded_value().to_vec();
        let pointer: RecordPointer = item.into();
        StreamResponse::new(subject, subject_id, &value, pointer)
    }
}

impl TryFrom<&RecordPacket> for StreamResponse {
    type Error = StreamResponseError;

    /// Decodes the packet into its entity-specific db item and wraps it in
    /// a [`StreamResponse`] keyed by the packet's subject id.
    ///
    /// # Errors
    /// Fails when the subject id does not map to a known entity, or when
    /// the packet cannot be converted into the matching db item.
    fn try_from(packet: &RecordPacket) -> Result<Self, Self::Error> {
        let subject_id = packet.subject_id();
        match RecordEntity::from_subject_id(&subject_id)? {
            RecordEntity::Block => {
                Self::try_from((subject_id, BlockDbItem::try_from(packet)?))
            }
            RecordEntity::Transaction => Self::try_from((
                subject_id,
                TransactionDbItem::try_from(packet)?,
            )),
            RecordEntity::Input => {
                Self::try_from((subject_id, InputDbItem::try_from(packet)?))
            }
            RecordEntity::Output => {
                Self::try_from((subject_id, OutputDbItem::try_from(packet)?))
            }
            RecordEntity::Receipt => {
                Self::try_from((subject_id, ReceiptDbItem::try_from(packet)?))
            }
            RecordEntity::Utxo => {
                Self::try_from((subject_id, UtxoDbItem::try_from(packet)?))
            }
        }
    }
}
8 changes: 6 additions & 2 deletions crates/core/src/stream/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use async_nats::SubscribeError;
use fuel_message_broker::MessageBrokerError;
use fuel_streams_store::{
db::{DbError, SqlxError},
record::RecordPacketError,
record::{RecordEntityError, RecordPacketError},
store::StoreError,
};

use crate::server::DeliverPolicyError;
use crate::{server::DeliverPolicyError, types::StreamResponseError};

#[derive(Debug, thiserror::Error)]
pub enum StreamError {
Expand All @@ -24,4 +24,8 @@ pub enum StreamError {
RecordPacket(#[from] RecordPacketError),
#[error(transparent)]
Sqlx(#[from] SqlxError),
#[error(transparent)]
StreamResponse(#[from] StreamResponseError),
#[error(transparent)]
RecordEntity(#[from] RecordEntityError),
}
35 changes: 33 additions & 2 deletions crates/core/src/stream/fuel_streams.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::sync::Arc;

use fuel_message_broker::NatsMessageBroker;
use fuel_streams_store::db::Db;
use fuel_streams_store::{
db::Db,
record::{RecordEntity, RecordPacket},
};

use super::Stream;
use super::{Stream, StreamError};
use crate::types::*;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -39,4 +42,32 @@ impl FuelStreams {
/// Returns a shared handle to the underlying NATS message broker
/// (cheap `Arc` clone).
pub fn broker(&self) -> Arc<NatsMessageBroker> {
    self.msg_broker.clone()
}

/// Publishes a record packet on the stream matching its entity kind.
///
/// Resolves the entity from the packet's subject id, converts the packet
/// into a [`StreamResponse`], then dispatches to the corresponding typed
/// stream (blocks, transactions, inputs, receipts, outputs, or utxos).
pub async fn publish_by_entity(
    &self,
    packet: Arc<RecordPacket>,
) -> Result<(), StreamError> {
    let subject = packet.subject_str();
    let subject_id = packet.subject_id();
    let entity = RecordEntity::from_subject_id(&subject_id)?;
    let response = StreamResponse::try_from(&*packet)?;
    match entity {
        RecordEntity::Block => self.blocks.publish(&subject, &response).await,
        RecordEntity::Transaction => {
            self.transactions.publish(&subject, &response).await
        }
        RecordEntity::Input => self.inputs.publish(&subject, &response).await,
        RecordEntity::Receipt => {
            self.receipts.publish(&subject, &response).await
        }
        RecordEntity::Output => {
            self.outputs.publish(&subject, &response).await
        }
        RecordEntity::Utxo => self.utxos.publish(&subject, &response).await,
    }
}
}
80 changes: 65 additions & 15 deletions crates/core/src/stream/stream_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ use std::{sync::Arc, time::Duration};

pub use async_nats::Subscriber as StreamLiveSubscriber;
use fuel_message_broker::NatsMessageBroker;
use fuel_streams_store::{db::Db, record::Record, store::Store};
use fuel_streams_store::{
db::{Db, DbItem},
record::{DataEncoder, QueryOptions, Record},
store::{find_last_block_height, Store},
};
use fuel_streams_subject::subject::IntoSubject;
use fuel_streams_types::BlockHeight;
use futures::{
stream::{BoxStream, Stream as FStream},
StreamExt,
};
use tokio::{sync::OnceCell, time::sleep};

use super::{config, StreamError};
use crate::server::DeliverPolicy;
use crate::{server::DeliverPolicy, types::StreamResponse};

pub type BoxedStoreItem = Result<(String, Vec<u8>), StreamError>;
pub type BoxedStoreItem = Result<StreamResponse, StreamError>;
pub type BoxedStream = Box<dyn FStream<Item = BoxedStoreItem> + Send + Unpin>;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -59,24 +64,34 @@ impl<R: Record> Stream<R> {
pub async fn publish(
&self,
subject: &str,
payload: bytes::Bytes,
response: &StreamResponse,
) -> Result<(), StreamError> {
let broker = self.broker.clone();
broker.publish(subject, payload).await?;
let payload = response.encode_json()?;
broker.publish(subject, payload.into()).await?;
Ok(())
}

/// Subscribes to `subject` under the given delivery policy.
///
/// Thin wrapper over [`Self::subscribe_dynamic`] that erases the concrete
/// subject type behind an `Arc<dyn IntoSubject>`.
pub async fn subscribe<S: IntoSubject>(
    &self,
    subject: S,
    deliver_policy: DeliverPolicy,
) -> BoxStream<'static, Result<StreamResponse, StreamError>> {
    self.subscribe_dynamic(Arc::new(subject), deliver_policy).await
}

pub async fn subscribe_dynamic(
&self,
subject: Arc<dyn IntoSubject>,
deliver_policy: DeliverPolicy,
) -> BoxStream<'static, Result<(String, Vec<u8>), StreamError>> {
) -> BoxStream<'static, Result<StreamResponse, StreamError>> {
let broker = self.broker.clone();
let subject = subject.clone();
let store = self.store.clone();
let store = self.clone();
let stream = async_stream::try_stream! {
if let DeliverPolicy::FromBlock { block_height } = deliver_policy {
let mut historical = store.historical_streaming(subject.to_owned(), Some(block_height), None).await;
let mut historical = store.historical_streaming(subject.to_owned(), Some(block_height), None);
while let Some(result) = historical.next().await {
yield result?;
let throttle_time = *config::STREAM_THROTTLE_HISTORICAL;
Expand All @@ -85,20 +100,55 @@ impl<R: Record> Stream<R> {
}
let mut live = broker.subscribe(&subject.parse()).await?;
while let Some(msg) = live.next().await {
yield msg?;
let msg = msg?;
let stream_response = StreamResponse::decode_json(&msg)?;
yield stream_response;
let throttle_time = *config::STREAM_THROTTLE_LIVE;
sleep(Duration::from_millis(throttle_time as u64)).await;
}
};
Box::pin(stream)
}

pub async fn subscribe<S: IntoSubject>(
pub fn historical_streaming(
&self,
subject: S,
deliver_policy: DeliverPolicy,
) -> BoxStream<'static, Result<(String, Vec<u8>), StreamError>> {
let subject = Arc::new(subject);
self.subscribe_dynamic(subject, deliver_policy).await
subject: Arc<dyn IntoSubject>,
from_block: Option<BlockHeight>,
query_opts: Option<QueryOptions>,
) -> BoxStream<'static, Result<StreamResponse, StreamError>> {
let store = self.store.clone();
let db = self.store.db.clone();
let stream = async_stream::try_stream! {
let mut current_height = from_block.unwrap_or_default();
let mut opts = query_opts.unwrap_or_default().with_from_block(Some(current_height));
let mut last_height = find_last_block_height(&db, opts.clone()).await?;
while current_height <= last_height {
let items = store.find_many_by_subject(&subject, opts.clone()).await?;
for item in items {
let subject = item.subject_str();
let subject_id = item.subject_id();
let value = item.encoded_value().to_vec();
let pointer = item.into();
let response = StreamResponse::new(subject, subject_id, &value, pointer.to_owned())?;
yield response;
current_height = pointer.block_height;
}
opts.increment_offset();
// When we reach the last known height, we need to check if any new blocks
// were produced while we were processing the previous ones
if current_height == last_height {
let new_last_height = find_last_block_height(&db, opts.clone()).await?;
if new_last_height > last_height {
// Reset current_height back to process the blocks we haven't seen yet
current_height = last_height;
last_height = new_last_height;
} else {
tracing::debug!("No new blocks found, stopping historical streaming on block {}", current_height);
break
}
}
}
};
Box::pin(stream)
}
}
1 change: 0 additions & 1 deletion crates/domains/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ serde_json.workspace = true
sqlx.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true

# these dependencies need to update in the future when fuel-core 0.41.4 is on mainnet
[target.'cfg(target_os = "linux")'.dependencies]
Expand Down
Loading

0 comments on commit efd339c

Please sign in to comment.