Skip to content

Commit

Permalink
feat: allow disable result fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
fengys1996 committed Jan 18, 2025
1 parent 8a8de2e commit 053c098
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 15 deletions.
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Result fetch is disabled"))]
ResultFetchDisabled {
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down
34 changes: 25 additions & 9 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct RingbufProducer {
online: Arc<AtomicBool>,
cancel: CancellationToken,
req_id: AtomicU32,
result_fetcher: ResultFetcher,
result_fetcher: Option<ResultFetcher>,
enable_checksum: bool,
}

Expand All @@ -48,6 +48,7 @@ impl RingbufProducer {
ringbuf_len,
fdpass_sock_path,
heartbeat_interval,
enable_result_fetch,
result_fetch_retry_interval,
enable_checksum,
#[cfg(not(any(
Expand Down Expand Up @@ -97,11 +98,17 @@ impl RingbufProducer {
let cancel_c = cancel.clone();
tokio::spawn(async move { heartbeat.run(cancel_c).await });

let result_fetcher = ResultFetcher::new(
grpc_client.clone(),
result_fetch_retry_interval,
)
.await;
let result_fetcher = if enable_result_fetch {
Some(
ResultFetcher::new(
grpc_client.clone(),
result_fetch_retry_interval,
)
.await,
)
} else {
None
};

let producer = RingbufProducer {
ringbuf,
Expand Down Expand Up @@ -131,7 +138,11 @@ impl RingbufProducer {
let data_block =
self.ringbuf.write().unwrap().reserve(bytes, req_id)?;

let rx = self.result_fetcher.subscribe(req_id);
let rx = self
.result_fetcher
.as_ref()
.map(|fetcher| fetcher.subscribe(req_id));

let enable_checksum = self.enable_checksum;

let pre = PreAlloc {
Expand Down Expand Up @@ -168,9 +179,14 @@ impl RingbufProducer {
self.online.load(Ordering::Relaxed)
}

/// Check if the gRPC stream which fetch execution results is created .
/// Check if the gRPC stream which fetch execution results is created.
/// If disabled the result fetch in producer settings, we also consider it
/// as normal.
pub fn result_fetch_normal(&self) -> bool {
self.result_fetcher.is_normal()
self.result_fetcher
.as_ref()
.map(|fetcher| fetcher.is_normal())
.unwrap_or(true)
}

/// Generate a request id.
Expand Down
17 changes: 11 additions & 6 deletions src/producer/prealloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::ringbuf::DropGuard;
/// The pre-allocated data block.
pub struct PreAlloc {
pub(super) data_block: DataBlock<DropGuard>,
pub(super) rx: Receiver<DataProcessResult>,
pub(super) rx: Option<Receiver<DataProcessResult>>,
pub(super) enable_checksum: bool,
}

Expand Down Expand Up @@ -53,8 +53,9 @@ impl PreAlloc {
}
}

/// The handle to wait for the result of data processing.
pub struct Handle {
rx: Receiver<DataProcessResult>,
rx: Option<Receiver<DataProcessResult>>,
}

impl Future for Handle {
Expand All @@ -64,9 +65,13 @@ impl Future for Handle {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
self.rx.poll_unpin(cx).map_err(|e| error::Error::Recv {
source: e,
location: snafu::location!(),
})
if let Some(rx) = self.rx.as_mut() {
rx.poll_unpin(cx).map_err(|e| error::Error::Recv {
source: e,
location: snafu::location!(),
})
} else {
Poll::Ready(error::ResultFetchDisabledSnafu.fail())
}
}
}
11 changes: 11 additions & 0 deletions src/producer/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct ProducerSettings {
pub(super) fdpass_sock_path: PathBuf,
pub(super) ringbuf_len: usize,
pub(super) heartbeat_interval: Duration,
pub(super) enable_result_fetch: bool,
pub(super) result_fetch_retry_interval: Duration,
pub(super) enable_checksum: bool,
#[cfg(not(any(
Expand All @@ -29,6 +30,7 @@ pub struct ProducerSettingsBuilder {
fdpass_sock_path: Option<PathBuf>,
ringbuf_len: Option<usize>,
heartbeat_interval: Option<Duration>,
enable_result_fetch: Option<bool>,
result_fetch_retry_interval: Option<Duration>,
enable_checksum: Option<bool>,
#[cfg(not(any(
Expand Down Expand Up @@ -70,6 +72,12 @@ impl ProducerSettingsBuilder {
self
}

/// Enable fetching the result of consumer processing data.
pub fn enable_result_fetch(mut self, enable: bool) -> Self {
self.enable_result_fetch = Some(enable);
self
}

/// Set the interval for retrying to fetch the result.
pub fn result_fetch_retry_interval(mut self, interval: Duration) -> Self {
self.result_fetch_retry_interval = Some(interval);
Expand Down Expand Up @@ -108,6 +116,8 @@ impl ProducerSettingsBuilder {
.heartbeat_interval
.unwrap_or(DEFAULT_HEARTBEAT_INTERVAL);

let enable_result_fetch = self.enable_result_fetch.unwrap_or(true);

let result_fetch_retry_interval = self
.result_fetch_retry_interval
.unwrap_or(DEFAULT_RESULT_FETCH_RETRY_INTERVAL);
Expand All @@ -129,6 +139,7 @@ impl ProducerSettingsBuilder {
ringbuf_len,
heartbeat_interval,
enable_checksum,
enable_result_fetch,
result_fetch_retry_interval,
#[cfg(not(any(
target_os = "linux",
Expand Down

0 comments on commit 053c098

Please sign in to comment.