From 5e3ceb2f3024442e4ae41f7b076733e04d1146c1 Mon Sep 17 00:00:00 2001 From: fys Date: Thu, 26 Dec 2024 21:18:58 +0800 Subject: [PATCH 1/7] add docs for producer and prealloc --- examples/multi_thread_producer.rs | 2 +- examples/producer.rs | 2 +- src/producer.rs | 34 +++++++++++++++++++++++-------- src/producer/prealloc.rs | 2 ++ tests/ringbuf/mpsc.rs | 2 +- tests/ringbuf/spsc.rs | 2 +- 6 files changed, 32 insertions(+), 12 deletions(-) diff --git a/examples/multi_thread_producer.rs b/examples/multi_thread_producer.rs index c37bff0..d8bdb46 100644 --- a/examples/multi_thread_producer.rs +++ b/examples/multi_thread_producer.rs @@ -21,7 +21,7 @@ async fn main() { .heartbeat_interval(Duration::from_secs(1)) .build(); - let producer = RingbufProducer::connect_lazy(settings).await.unwrap(); + let producer = RingbufProducer::new(settings).await.unwrap(); let producer = Arc::new(producer); let mut join_handles = Vec::with_capacity(4); diff --git a/examples/producer.rs b/examples/producer.rs index fab69ed..245c31f 100644 --- a/examples/producer.rs +++ b/examples/producer.rs @@ -18,7 +18,7 @@ async fn main() { .heartbeat_interval(Duration::from_secs(1)) .build(); - let producer = RingbufProducer::connect_lazy(settings).await.unwrap(); + let producer = RingbufProducer::new(settings).await.unwrap(); let mut joins = Vec::with_capacity(100); for i in 0..10000 { diff --git a/src/producer.rs b/src/producer.rs index a148f2e..6797a3a 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -26,6 +26,7 @@ use crate::memfd::create_fd; use crate::memfd::Settings; use crate::ringbuf::Ringbuf; +/// The producer of the ringbuf based on shared memory. pub struct RingbufProducer { ringbuf: RwLock, grpc_client: GrpcClient, @@ -36,9 +37,11 @@ pub struct RingbufProducer { } impl RingbufProducer { - pub async fn connect_lazy( - settings: ProducerSettings, - ) -> Result { + /// Create a [`RingbufProducer`] by the given settings. + /// + /// It will initially try to establish the required connection, and if fails, + /// it will retry in the background. + pub async fn new(settings: ProducerSettings) -> Result { #[cfg(not(any( target_os = "linux", target_os = "android", @@ -128,9 +131,20 @@ impl RingbufProducer { Ok(producer) } - pub fn reserve(&self, size: usize) -> Result { + /// Reserve a [`PreAlloc`] for committing data. + /// + /// # Errors: + /// + /// - If the requested space exceeds the capacity of ringbuf, an + /// [`crate::error::Error::ExceedCapacity`] error will be returned. + /// + /// - If the requested space exceeds the remaining space of ringbuf, and + /// not exceeds the capacity, an [`crate::error::Error::NotEnoughSpace`] + /// error will be returned. + pub fn reserve(&self, bytes: usize) -> Result { let req_id = self.gen_req_id(); - let data_block = self.ringbuf.write().unwrap().reserve(size, req_id)?; + let data_block = + self.ringbuf.write().unwrap().reserve(bytes, req_id)?; let rx = self.result_fetcher.subscribe(req_id); @@ -139,6 +153,10 @@ impl RingbufProducer { Ok(pre) } + /// Notify the consumer to process the data. + /// + /// If the accumulated data in the ringbuf exceeds the notify_threshold, will + /// notify the consumer to process the data. pub async fn notify_consumer(&self, notify_threshold: Option) { let need_notify = notify_threshold.is_none_or(|threshold| { self.ringbuf.read().unwrap().written_bytes() > threshold @@ -160,7 +178,7 @@ impl RingbufProducer { self.online.load(Ordering::Relaxed) } - /// Check if the result fetcher is normal. + /// Check if the gRPC stream which fetch execution results is created . pub fn result_fetch_normal(&self) -> bool { self.result_fetcher.is_normal() } @@ -179,13 +197,13 @@ impl Drop for RingbufProducer { /// The [`SessionHandle`] is used to send the memfd, client id and ringbuf len /// to the consumer. -pub struct SessionHandle { +pub(crate) struct SessionHandle { pub client_id: String, pub fdpass_sock_path: PathBuf, pub memfd: File, } -pub type SessionHandleRef = Arc; +pub(crate) type SessionHandleRef = Arc; impl SessionHandle { pub async fn send(&self) -> Result<()> { diff --git a/src/producer/prealloc.rs b/src/producer/prealloc.rs index 072e7e6..98c9feb 100644 --- a/src/producer/prealloc.rs +++ b/src/producer/prealloc.rs @@ -12,6 +12,7 @@ use crate::error::Result; use crate::ringbuf::data_block::DataBlock; use crate::ringbuf::DropGuard; +/// The pre-allocated data block. pub struct PreAlloc { pub(super) data_block: DataBlock, pub(super) rx: Receiver, @@ -40,6 +41,7 @@ impl PreAlloc { self.data_block.commit(); } + /// Return a [`Handle`] to wait for the result of data processing. pub fn wait_result(self) -> Handle { Handle { rx: self.rx } } diff --git a/tests/ringbuf/mpsc.rs b/tests/ringbuf/mpsc.rs index 5b5baa5..e73d9c9 100644 --- a/tests/ringbuf/mpsc.rs +++ b/tests/ringbuf/mpsc.rs @@ -81,7 +81,7 @@ async fn do_test_ringbuf_mpsc( let msg_num = msg_num(); let producer = - Arc::new(RingbufProducer::connect_lazy(settings).await.unwrap()); + Arc::new(RingbufProducer::new(settings).await.unwrap()); for (i, expected_send) in expected_sends.into_iter().enumerate() { let options = StartProducerOptions { diff --git a/tests/ringbuf/spsc.rs b/tests/ringbuf/spsc.rs index 6e8a84e..32e44fd 100644 --- a/tests/ringbuf/spsc.rs +++ b/tests/ringbuf/spsc.rs @@ -72,7 +72,7 @@ async fn do_test_ringbuf_spsc( let msg_num = msg_num(); let producer = - Arc::new(RingbufProducer::connect_lazy(settings).await.unwrap()); + Arc::new(RingbufProducer::new(settings).await.unwrap()); let options = StartProducerOptions { producer, From 686e1a6fec2df2cb320d7f84f9fb0ffe246a5086 Mon Sep 17 00:00:00 2001 From: fys Date: Thu, 26 Dec 2024 21:31:43 +0800 Subject: [PATCH 2/7] add keywords to Cargo.toml --- Cargo.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 33d69a2..54481a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,9 @@ [package] name = "shm-ringbuf" version = "0.1.0" +authors = ["fys "] edition = "2021" +keywords = ["ringbuf", "shared-memory", "ipc"] license = "Apache-2.0" [features] From ae5bf82bb1351ccb8eff8a0059d9d3f6ca959ab5 Mon Sep 17 00:00:00 2001 From: fys Date: Thu, 26 Dec 2024 21:43:01 +0800 Subject: [PATCH 3/7] add doc for consumer settings --- src/consumer/settings.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/consumer/settings.rs b/src/consumer/settings.rs index 60e722e..4cb1821 100644 --- a/src/consumer/settings.rs +++ b/src/consumer/settings.rs @@ -10,7 +10,7 @@ pub struct ConsumerSettings { pub(super) grpc_sock_path: PathBuf, pub(super) fdpass_sock_path: PathBuf, pub(super) process_interval: Duration, - pub(super) max_session_capacity: u64, + pub(super) max_session_num: u64, pub(super) session_tti: Duration, } @@ -19,7 +19,7 @@ pub struct ConsumerSettingsBuilder { grpc_sock_path: Option, fdpass_sock_path: Option, process_duration: Option, - max_session_capacity: Option, + max_session_num: Option, session_tti: Option, } @@ -28,14 +28,13 @@ impl ConsumerSettingsBuilder { ConsumerSettingsBuilder::default() } - /// Set the path of the unix socket for gRPC communication. + /// Set the path of the unix socket for gRPC. pub fn grpc_sock_path(mut self, path: impl Into) -> Self { self.grpc_sock_path = Some(path.into()); self } - /// Set the path of the unix socket for passing file descriptor and other - /// information. + /// Set the path of the unix socket for passing file descriptor. pub fn fdpass_sock_path(mut self, path: impl Into) -> Self { self.fdpass_sock_path = Some(path.into()); self @@ -47,11 +46,15 @@ impl ConsumerSettingsBuilder { self } - pub fn max_session_capacity(mut self, capacity: u64) -> Self { - self.max_session_capacity = Some(capacity); + /// Set the maximum number of sessions, which limits the number of producers + /// at the same time. + pub fn max_session_num(mut self, capacity: u64) -> Self { + self.max_session_num = Some(capacity); self } + /// Set the time-to-live of a session. If a session is not used for a long + /// time, it will be purged. pub fn session_tti(mut self, ttl: Duration) -> Self { self.session_tti = Some(ttl); self @@ -68,7 +71,7 @@ impl ConsumerSettingsBuilder { let process_duration = self.process_duration.unwrap_or(DEFAULT_PROCESS_DURATION); - let max_session_capacity = self.max_session_capacity.unwrap_or(10); + let max_session_capacity = self.max_session_num.unwrap_or(10); let session_ttl = self.session_tti.unwrap_or(Duration::from_secs(10)); @@ -76,7 +79,7 @@ impl ConsumerSettingsBuilder { grpc_sock_path, fdpass_sock_path, process_interval: process_duration, - max_session_capacity, + max_session_num: max_session_capacity, session_tti: session_ttl, } } @@ -100,7 +103,7 @@ mod tests { grpc_sock_path, fdpass_sock_path, process_interval: process_duration, - max_session_capacity, + max_session_num: max_session_capacity, session_tti, } = settings; @@ -117,7 +120,7 @@ mod tests { .grpc_sock_path("/tmp/grpc_test.sock") .fdpass_sock_path("/tmp/fd_test.sock") .process_interval(Duration::from_millis(100)) - .max_session_capacity(20) + .max_session_num(20) .session_tti(Duration::from_secs(30)) .build(); @@ -125,7 +128,7 @@ mod tests { grpc_sock_path, fdpass_sock_path, process_interval: process_duration, - max_session_capacity, + max_session_num: max_session_capacity, session_tti, } = settings; From a31f523ea227be77dd7341fcc39ccac80eb0e88f Mon Sep 17 00:00:00 2001 From: fys Date: Thu, 26 Dec 2024 21:44:48 +0800 Subject: [PATCH 4/7] fix: cargo build --- src/consumer.rs | 2 +- tests/ringbuf/mpsc.rs | 3 +-- tests/ringbuf/spsc.rs | 3 +-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/consumer.rs b/src/consumer.rs index 42de026..dc2f73c 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -36,7 +36,7 @@ pub struct RingbufConsumer { impl RingbufConsumer { pub fn new(settings: ConsumerSettings) -> Self { let session_manager = Arc::new(SessionManager::new( - settings.max_session_capacity, + settings.max_session_num, settings.session_tti, )); let notify = Arc::new(Notify::new()); diff --git a/tests/ringbuf/mpsc.rs b/tests/ringbuf/mpsc.rs index e73d9c9..6656a99 100644 --- a/tests/ringbuf/mpsc.rs +++ b/tests/ringbuf/mpsc.rs @@ -80,8 +80,7 @@ async fn do_test_ringbuf_mpsc( let msg_num = msg_num(); - let producer = - Arc::new(RingbufProducer::new(settings).await.unwrap()); + let producer = Arc::new(RingbufProducer::new(settings).await.unwrap()); for (i, expected_send) in expected_sends.into_iter().enumerate() { let options = StartProducerOptions { diff --git a/tests/ringbuf/spsc.rs b/tests/ringbuf/spsc.rs index 32e44fd..c801b83 100644 --- a/tests/ringbuf/spsc.rs +++ b/tests/ringbuf/spsc.rs @@ -71,8 +71,7 @@ async fn do_test_ringbuf_spsc( let msg_num = msg_num(); - let producer = - Arc::new(RingbufProducer::new(settings).await.unwrap()); + let producer = Arc::new(RingbufProducer::new(settings).await.unwrap()); let options = StartProducerOptions { producer, From 4d2ac3e55766c9f4733a02372f44a366f82b04c0 Mon Sep 17 00:00:00 2001 From: fys Date: Fri, 27 Dec 2024 11:22:35 +0800 Subject: [PATCH 5/7] refactor: modify the ringbuf consumer api --- src/consumer.rs | 71 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 55 insertions(+), 16 deletions(-) diff --git a/src/consumer.rs b/src/consumer.rs index dc2f73c..8c09e87 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -25,39 +25,81 @@ use crate::grpc::proto::shm_control_server::ShmControlServer; use crate::grpc::server::ShmCtlHandler; use crate::grpc::server::ShmCtlServer; +/// The comsumer of the ringbuf based on shared memory. pub struct RingbufConsumer { session_manager: SessionManagerRef, notify: Arc, settings: ConsumerSettings, cancel: CancellationToken, - detach_grpc: AtomicBool, + grpc_detached: AtomicBool, + started: AtomicBool, } impl RingbufConsumer { + /// Create a [`RingbufConsumer`] by the given settings. pub fn new(settings: ConsumerSettings) -> Self { + let (consumer, _) = Self::new_with_detach_grpc(settings, false); + consumer + } + + /// Create a [`RingbufConsumer`] by the given settings and detach the gRPC + /// server. + /// + /// Since some users have their own grpc services and do not want to occupy + /// an additional uds. + /// + /// Note: If detached, the `grpc_sock_path` in the settings will be ignored. + pub fn new_with_detach_grpc( + settings: ConsumerSettings, + detached: bool, + ) -> (RingbufConsumer, Option>) { let session_manager = Arc::new(SessionManager::new( settings.max_session_num, settings.session_tti, )); let notify = Arc::new(Notify::new()); let cancel = CancellationToken::new(); - let detach_grpc = AtomicBool::new(false); + let started = AtomicBool::new(false); + let grpc_detached = AtomicBool::new(detached); + + let grpc_server = if detached { + let handler = ShmCtlHandler { + notify: notify.clone(), + session_manager: session_manager.clone(), + }; + Some(ShmControlServer::new(handler)) + } else { + None + }; - RingbufConsumer { + let consumer = RingbufConsumer { session_manager, notify, cancel, settings, - detach_grpc, - } + grpc_detached, + started, + }; + + (consumer, grpc_server) } + /// Run the consumer, which will block the current thread. pub async fn run(&self, processor: P) where P: DataProcess, E: Into + Debug + Send, { - if !self.detach_grpc.load(Ordering::Relaxed) { + if self + .started + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_err() + { + warn!("the consumer has already started."); + return; + } + + if !self.grpc_detached.load(Ordering::Relaxed) { self.start_grpc_server().await; } @@ -68,19 +110,14 @@ impl RingbufConsumer { self.process_loop(&processor, interval, Some(cancel)).await; } - pub fn detach_grpc_server(&self) -> ShmControlServer { - let handler = ShmCtlHandler { - notify: self.notify.clone(), - session_manager: self.session_manager.clone(), - }; - self.detach_grpc.store(true, Ordering::Relaxed); - ShmControlServer::new(handler) - } - + /// Cancel the consumer. pub fn cancel(&self) { self.cancel.cancel(); } + /// Start the gRPC server. + /// 1. receive the notification from the producer. + /// 2. send the execution results to the producer via gRPC stream. async fn start_grpc_server(&self) { let cancel = self.cancel.clone(); let notify = self.notify.clone(); @@ -98,6 +135,7 @@ impl RingbufConsumer { }); } + /// Start the server to receive the file descriptors from the producer. async fn start_fdrecv_server(&self) { let cancel = self.cancel.clone(); let session_manager = self.session_manager.clone(); @@ -113,7 +151,8 @@ impl RingbufConsumer { }); } - pub async fn process_loop( + /// The main loop to process the ringbufs. + async fn process_loop( &self, processor: &P, interval: Duration, From b8ffd2a5c8ac9484458738534439ffc9fe624a61 Mon Sep 17 00:00:00 2001 From: fys Date: Fri, 27 Dec 2024 11:25:49 +0800 Subject: [PATCH 6/7] fix: doc --- src/consumer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/consumer.rs b/src/consumer.rs index 8c09e87..b2b5c51 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -43,7 +43,7 @@ impl RingbufConsumer { } /// Create a [`RingbufConsumer`] by the given settings and detach the gRPC - /// server. + /// server according to the `detached` flag. /// /// Since some users have their own grpc services and do not want to occupy /// an additional uds. From ca7fb6c8a1a4b8ea63455077b5d0838a76b91c8d Mon Sep 17 00:00:00 2001 From: fys Date: Fri, 27 Dec 2024 11:31:44 +0800 Subject: [PATCH 7/7] fix: typo --- src/consumer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/consumer.rs b/src/consumer.rs index b2b5c51..22602d2 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -25,7 +25,7 @@ use crate::grpc::proto::shm_control_server::ShmControlServer; use crate::grpc::server::ShmCtlHandler; use crate::grpc::server::ShmCtlServer; -/// The comsumer of the ringbuf based on shared memory. +/// The consumer of the ringbuf based on shared memory. pub struct RingbufConsumer { session_manager: SessionManagerRef, notify: Arc,