diff --git a/Cargo.toml b/Cargo.toml index 33d69a2..54481a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,9 @@ [package] name = "shm-ringbuf" version = "0.1.0" +authors = ["fys "] edition = "2021" +keywords = ["ringbuf", "shared-memory", "ipc"] license = "Apache-2.0" [features] diff --git a/examples/multi_thread_producer.rs b/examples/multi_thread_producer.rs index c37bff0..d8bdb46 100644 --- a/examples/multi_thread_producer.rs +++ b/examples/multi_thread_producer.rs @@ -21,7 +21,7 @@ async fn main() { .heartbeat_interval(Duration::from_secs(1)) .build(); - let producer = RingbufProducer::connect_lazy(settings).await.unwrap(); + let producer = RingbufProducer::new(settings).await.unwrap(); let producer = Arc::new(producer); let mut join_handles = Vec::with_capacity(4); diff --git a/examples/producer.rs b/examples/producer.rs index fab69ed..245c31f 100644 --- a/examples/producer.rs +++ b/examples/producer.rs @@ -18,7 +18,7 @@ async fn main() { .heartbeat_interval(Duration::from_secs(1)) .build(); - let producer = RingbufProducer::connect_lazy(settings).await.unwrap(); + let producer = RingbufProducer::new(settings).await.unwrap(); let mut joins = Vec::with_capacity(100); for i in 0..10000 { diff --git a/src/consumer.rs b/src/consumer.rs index 42de026..22602d2 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -25,39 +25,81 @@ use crate::grpc::proto::shm_control_server::ShmControlServer; use crate::grpc::server::ShmCtlHandler; use crate::grpc::server::ShmCtlServer; +/// The consumer of the ringbuf based on shared memory. pub struct RingbufConsumer { session_manager: SessionManagerRef, notify: Arc, settings: ConsumerSettings, cancel: CancellationToken, - detach_grpc: AtomicBool, + grpc_detached: AtomicBool, + started: AtomicBool, } impl RingbufConsumer { + /// Create a [`RingbufConsumer`] by the given settings. pub fn new(settings: ConsumerSettings) -> Self { + let (consumer, _) = Self::new_with_detach_grpc(settings, false); + consumer + } + + /// Create a [`RingbufConsumer`] by the given settings and detach the gRPC + /// server according to the `detached` flag. + /// + /// Since some users have their own grpc services and do not want to occupy + /// an additional uds. + /// + /// Note: If detached, the `grpc_sock_path` in the settings will be ignored. + pub fn new_with_detach_grpc( + settings: ConsumerSettings, + detached: bool, + ) -> (RingbufConsumer, Option>) { let session_manager = Arc::new(SessionManager::new( - settings.max_session_capacity, + settings.max_session_num, settings.session_tti, )); let notify = Arc::new(Notify::new()); let cancel = CancellationToken::new(); - let detach_grpc = AtomicBool::new(false); + let started = AtomicBool::new(false); + let grpc_detached = AtomicBool::new(detached); + + let grpc_server = if detached { + let handler = ShmCtlHandler { + notify: notify.clone(), + session_manager: session_manager.clone(), + }; + Some(ShmControlServer::new(handler)) + } else { + None + }; - RingbufConsumer { + let consumer = RingbufConsumer { session_manager, notify, cancel, settings, - detach_grpc, - } + grpc_detached, + started, + }; + + (consumer, grpc_server) } + /// Run the consumer, which will block the current thread. pub async fn run(&self, processor: P) where P: DataProcess, E: Into + Debug + Send, { - if !self.detach_grpc.load(Ordering::Relaxed) { + if self + .started + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_err() + { + warn!("the consumer has already started."); + return; + } + + if !self.grpc_detached.load(Ordering::Relaxed) { self.start_grpc_server().await; } @@ -68,19 +110,14 @@ impl RingbufConsumer { self.process_loop(&processor, interval, Some(cancel)).await; } - pub fn detach_grpc_server(&self) -> ShmControlServer { - let handler = ShmCtlHandler { - notify: self.notify.clone(), - session_manager: self.session_manager.clone(), - }; - self.detach_grpc.store(true, Ordering::Relaxed); - ShmControlServer::new(handler) - } - + /// Cancel the consumer. pub fn cancel(&self) { self.cancel.cancel(); } + /// Start the gRPC server. + /// 1. receive the notification from the producer. + /// 2. send the execution results to the producer via gRPC stream. async fn start_grpc_server(&self) { let cancel = self.cancel.clone(); let notify = self.notify.clone(); @@ -98,6 +135,7 @@ impl RingbufConsumer { }); } + /// Start the server to receive the file descriptors from the producer. async fn start_fdrecv_server(&self) { let cancel = self.cancel.clone(); let session_manager = self.session_manager.clone(); @@ -113,7 +151,8 @@ impl RingbufConsumer { }); } - pub async fn process_loop( + /// The main loop to process the ringbufs. + async fn process_loop( &self, processor: &P, interval: Duration, diff --git a/src/consumer/settings.rs b/src/consumer/settings.rs index 60e722e..4cb1821 100644 --- a/src/consumer/settings.rs +++ b/src/consumer/settings.rs @@ -10,7 +10,7 @@ pub struct ConsumerSettings { pub(super) grpc_sock_path: PathBuf, pub(super) fdpass_sock_path: PathBuf, pub(super) process_interval: Duration, - pub(super) max_session_capacity: u64, + pub(super) max_session_num: u64, pub(super) session_tti: Duration, } @@ -19,7 +19,7 @@ pub struct ConsumerSettingsBuilder { grpc_sock_path: Option, fdpass_sock_path: Option, process_duration: Option, - max_session_capacity: Option, + max_session_num: Option, session_tti: Option, } @@ -28,14 +28,13 @@ impl ConsumerSettingsBuilder { ConsumerSettingsBuilder::default() } - /// Set the path of the unix socket for gRPC communication. + /// Set the path of the unix socket for gRPC. pub fn grpc_sock_path(mut self, path: impl Into) -> Self { self.grpc_sock_path = Some(path.into()); self } - /// Set the path of the unix socket for passing file descriptor and other - /// information. + /// Set the path of the unix socket for passing file descriptor. pub fn fdpass_sock_path(mut self, path: impl Into) -> Self { self.fdpass_sock_path = Some(path.into()); self @@ -47,11 +46,15 @@ impl ConsumerSettingsBuilder { self } - pub fn max_session_capacity(mut self, capacity: u64) -> Self { - self.max_session_capacity = Some(capacity); + /// Set the maximum number of sessions, which limits the number of producers + /// at the same time. + pub fn max_session_num(mut self, capacity: u64) -> Self { + self.max_session_num = Some(capacity); self } + /// Set the time-to-live of a session. If a session is not used for a long + /// time, it will be purged. pub fn session_tti(mut self, ttl: Duration) -> Self { self.session_tti = Some(ttl); self @@ -68,7 +71,7 @@ impl ConsumerSettingsBuilder { let process_duration = self.process_duration.unwrap_or(DEFAULT_PROCESS_DURATION); - let max_session_capacity = self.max_session_capacity.unwrap_or(10); + let max_session_capacity = self.max_session_num.unwrap_or(10); let session_ttl = self.session_tti.unwrap_or(Duration::from_secs(10)); @@ -76,7 +79,7 @@ impl ConsumerSettingsBuilder { grpc_sock_path, fdpass_sock_path, process_interval: process_duration, - max_session_capacity, + max_session_num: max_session_capacity, session_tti: session_ttl, } } @@ -100,7 +103,7 @@ mod tests { grpc_sock_path, fdpass_sock_path, process_interval: process_duration, - max_session_capacity, + max_session_num: max_session_capacity, session_tti, } = settings; @@ -117,7 +120,7 @@ mod tests { .grpc_sock_path("/tmp/grpc_test.sock") .fdpass_sock_path("/tmp/fd_test.sock") .process_interval(Duration::from_millis(100)) - .max_session_capacity(20) + .max_session_num(20) .session_tti(Duration::from_secs(30)) .build(); @@ -125,7 +128,7 @@ mod tests { grpc_sock_path, fdpass_sock_path, process_interval: process_duration, - max_session_capacity, + max_session_num: max_session_capacity, session_tti, } = settings; diff --git a/src/producer.rs b/src/producer.rs index a148f2e..6797a3a 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -26,6 +26,7 @@ use crate::memfd::create_fd; use crate::memfd::Settings; use crate::ringbuf::Ringbuf; +/// The producer of the ringbuf based on shared memory. pub struct RingbufProducer { ringbuf: RwLock, grpc_client: GrpcClient, @@ -36,9 +37,11 @@ pub struct RingbufProducer { } impl RingbufProducer { - pub async fn connect_lazy( - settings: ProducerSettings, - ) -> Result { + /// Create a [`RingbufProducer`] by the given settings. + /// + /// It will initially try to establish the required connection, and if fails, + /// it will retry in the background. + pub async fn new(settings: ProducerSettings) -> Result { #[cfg(not(any( target_os = "linux", target_os = "android", @@ -128,9 +131,20 @@ impl RingbufProducer { Ok(producer) } - pub fn reserve(&self, size: usize) -> Result { + /// Reserve a [`PreAlloc`] for committing data. + /// + /// # Errors: + /// + /// - If the requested space exceeds the capacity of ringbuf, an + /// [`crate::error::Error::ExceedCapacity`] error will be returned. + /// + /// - If the requested space exceeds the remaining space of ringbuf, and + /// not exceeds the capacity, an [`crate::error::Error::NotEnoughSpace`] + /// error will be returned. + pub fn reserve(&self, bytes: usize) -> Result { let req_id = self.gen_req_id(); - let data_block = self.ringbuf.write().unwrap().reserve(size, req_id)?; + let data_block = + self.ringbuf.write().unwrap().reserve(bytes, req_id)?; let rx = self.result_fetcher.subscribe(req_id); @@ -139,6 +153,10 @@ impl RingbufProducer { Ok(pre) } + /// Notify the consumer to process the data. + /// + /// If the accumulated data in the ringbuf exceeds the notify_threshold, will + /// notify the consumer to process the data. pub async fn notify_consumer(&self, notify_threshold: Option) { let need_notify = notify_threshold.is_none_or(|threshold| { self.ringbuf.read().unwrap().written_bytes() > threshold @@ -160,7 +178,7 @@ impl RingbufProducer { self.online.load(Ordering::Relaxed) } - /// Check if the result fetcher is normal. + /// Check if the gRPC stream which fetch execution results is created . pub fn result_fetch_normal(&self) -> bool { self.result_fetcher.is_normal() } @@ -179,13 +197,13 @@ impl Drop for RingbufProducer { /// The [`SessionHandle`] is used to send the memfd, client id and ringbuf len /// to the consumer. -pub struct SessionHandle { +pub(crate) struct SessionHandle { pub client_id: String, pub fdpass_sock_path: PathBuf, pub memfd: File, } -pub type SessionHandleRef = Arc; +pub(crate) type SessionHandleRef = Arc; impl SessionHandle { pub async fn send(&self) -> Result<()> { diff --git a/src/producer/prealloc.rs b/src/producer/prealloc.rs index 072e7e6..98c9feb 100644 --- a/src/producer/prealloc.rs +++ b/src/producer/prealloc.rs @@ -12,6 +12,7 @@ use crate::error::Result; use crate::ringbuf::data_block::DataBlock; use crate::ringbuf::DropGuard; +/// The pre-allocated data block. pub struct PreAlloc { pub(super) data_block: DataBlock, pub(super) rx: Receiver, @@ -40,6 +41,7 @@ impl PreAlloc { self.data_block.commit(); } + /// Return a [`Handle`] to wait for the result of data processing. pub fn wait_result(self) -> Handle { Handle { rx: self.rx } } diff --git a/tests/ringbuf/mpsc.rs b/tests/ringbuf/mpsc.rs index 5b5baa5..6656a99 100644 --- a/tests/ringbuf/mpsc.rs +++ b/tests/ringbuf/mpsc.rs @@ -80,8 +80,7 @@ async fn do_test_ringbuf_mpsc( let msg_num = msg_num(); - let producer = - Arc::new(RingbufProducer::connect_lazy(settings).await.unwrap()); + let producer = Arc::new(RingbufProducer::new(settings).await.unwrap()); for (i, expected_send) in expected_sends.into_iter().enumerate() { let options = StartProducerOptions { diff --git a/tests/ringbuf/spsc.rs b/tests/ringbuf/spsc.rs index 6e8a84e..c801b83 100644 --- a/tests/ringbuf/spsc.rs +++ b/tests/ringbuf/spsc.rs @@ -71,8 +71,7 @@ async fn do_test_ringbuf_spsc( let msg_num = msg_num(); - let producer = - Arc::new(RingbufProducer::connect_lazy(settings).await.unwrap()); + let producer = Arc::new(RingbufProducer::new(settings).await.unwrap()); let options = StartProducerOptions { producer,