Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: adjust api #18

Merged
merged 7 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
[package]
name = "shm-ringbuf"
version = "0.1.0"
authors = ["fys <fengys1996@gmail.com>"]
edition = "2021"
keywords = ["ringbuf", "shared-memory", "ipc"]
license = "Apache-2.0"

[features]
Expand Down
2 changes: 1 addition & 1 deletion examples/multi_thread_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async fn main() {
.heartbeat_interval(Duration::from_secs(1))
.build();

let producer = RingbufProducer::connect_lazy(settings).await.unwrap();
let producer = RingbufProducer::new(settings).await.unwrap();
let producer = Arc::new(producer);

let mut join_handles = Vec::with_capacity(4);
Expand Down
2 changes: 1 addition & 1 deletion examples/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn main() {
.heartbeat_interval(Duration::from_secs(1))
.build();

let producer = RingbufProducer::connect_lazy(settings).await.unwrap();
let producer = RingbufProducer::new(settings).await.unwrap();

let mut joins = Vec::with_capacity(100);
for i in 0..10000 {
Expand Down
73 changes: 56 additions & 17 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,39 +25,81 @@ use crate::grpc::proto::shm_control_server::ShmControlServer;
use crate::grpc::server::ShmCtlHandler;
use crate::grpc::server::ShmCtlServer;

/// The consumer of the ringbuf based on shared memory.
pub struct RingbufConsumer {
session_manager: SessionManagerRef,
notify: Arc<Notify>,
settings: ConsumerSettings,
cancel: CancellationToken,
detach_grpc: AtomicBool,
grpc_detached: AtomicBool,
started: AtomicBool,
}

impl RingbufConsumer {
/// Create a [`RingbufConsumer`] by the given settings.
pub fn new(settings: ConsumerSettings) -> Self {
let (consumer, _) = Self::new_with_detach_grpc(settings, false);
consumer
}

/// Create a [`RingbufConsumer`] by the given settings and detach the gRPC
/// server according to the `detached` flag.
///
/// Since some users have their own grpc services and do not want to occupy
/// an additional uds.
///
/// Note: If detached, the `grpc_sock_path` in the settings will be ignored.
pub fn new_with_detach_grpc(
settings: ConsumerSettings,
detached: bool,
) -> (RingbufConsumer, Option<ShmControlServer<ShmCtlHandler>>) {
let session_manager = Arc::new(SessionManager::new(
settings.max_session_capacity,
settings.max_session_num,
settings.session_tti,
));
let notify = Arc::new(Notify::new());
let cancel = CancellationToken::new();
let detach_grpc = AtomicBool::new(false);
let started = AtomicBool::new(false);
let grpc_detached = AtomicBool::new(detached);

let grpc_server = if detached {
let handler = ShmCtlHandler {
notify: notify.clone(),
session_manager: session_manager.clone(),
};
Some(ShmControlServer::new(handler))
} else {
None
};

RingbufConsumer {
let consumer = RingbufConsumer {
session_manager,
notify,
cancel,
settings,
detach_grpc,
}
grpc_detached,
started,
};

(consumer, grpc_server)
}

/// Run the consumer, which will block the current thread.
pub async fn run<P, E>(&self, processor: P)
where
P: DataProcess<Error = E>,
E: Into<DataProcessResult> + Debug + Send,
{
if !self.detach_grpc.load(Ordering::Relaxed) {
if self
.started
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
warn!("the consumer has already started.");
return;
}

if !self.grpc_detached.load(Ordering::Relaxed) {
self.start_grpc_server().await;
}

Expand All @@ -68,19 +110,14 @@ impl RingbufConsumer {
self.process_loop(&processor, interval, Some(cancel)).await;
}

pub fn detach_grpc_server(&self) -> ShmControlServer<ShmCtlHandler> {
let handler = ShmCtlHandler {
notify: self.notify.clone(),
session_manager: self.session_manager.clone(),
};
self.detach_grpc.store(true, Ordering::Relaxed);
ShmControlServer::new(handler)
}

/// Cancel the consumer.
pub fn cancel(&self) {
self.cancel.cancel();
}

/// Start the gRPC server.
/// 1. receive the notification from the producer.
/// 2. send the execution results to the producer via gRPC stream.
async fn start_grpc_server(&self) {
let cancel = self.cancel.clone();
let notify = self.notify.clone();
Expand All @@ -98,6 +135,7 @@ impl RingbufConsumer {
});
}

/// Start the server to receive the file descriptors from the producer.
async fn start_fdrecv_server(&self) {
let cancel = self.cancel.clone();
let session_manager = self.session_manager.clone();
Expand All @@ -113,7 +151,8 @@ impl RingbufConsumer {
});
}

pub async fn process_loop<P, E>(
/// The main loop to process the ringbufs.
async fn process_loop<P, E>(
&self,
processor: &P,
interval: Duration,
Expand Down
27 changes: 15 additions & 12 deletions src/consumer/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub struct ConsumerSettings {
pub(super) grpc_sock_path: PathBuf,
pub(super) fdpass_sock_path: PathBuf,
pub(super) process_interval: Duration,
pub(super) max_session_capacity: u64,
pub(super) max_session_num: u64,
pub(super) session_tti: Duration,
}

Expand All @@ -19,7 +19,7 @@ pub struct ConsumerSettingsBuilder {
grpc_sock_path: Option<PathBuf>,
fdpass_sock_path: Option<PathBuf>,
process_duration: Option<Duration>,
max_session_capacity: Option<u64>,
max_session_num: Option<u64>,
session_tti: Option<Duration>,
}

Expand All @@ -28,14 +28,13 @@ impl ConsumerSettingsBuilder {
ConsumerSettingsBuilder::default()
}

/// Set the path of the unix socket for gRPC communication.
/// Set the path of the unix socket for gRPC.
pub fn grpc_sock_path(mut self, path: impl Into<PathBuf>) -> Self {
self.grpc_sock_path = Some(path.into());
self
}

/// Set the path of the unix socket for passing file descriptor and other
/// information.
/// Set the path of the unix socket for passing file descriptor.
pub fn fdpass_sock_path(mut self, path: impl Into<PathBuf>) -> Self {
self.fdpass_sock_path = Some(path.into());
self
Expand All @@ -47,11 +46,15 @@ impl ConsumerSettingsBuilder {
self
}

pub fn max_session_capacity(mut self, capacity: u64) -> Self {
self.max_session_capacity = Some(capacity);
/// Set the maximum number of sessions, which limits the number of producers
/// at the same time.
pub fn max_session_num(mut self, capacity: u64) -> Self {
self.max_session_num = Some(capacity);
self
}

/// Set the time-to-live of a session. If a session is not used for a long
/// time, it will be purged.
pub fn session_tti(mut self, ttl: Duration) -> Self {
self.session_tti = Some(ttl);
self
Expand All @@ -68,15 +71,15 @@ impl ConsumerSettingsBuilder {
let process_duration =
self.process_duration.unwrap_or(DEFAULT_PROCESS_DURATION);

let max_session_capacity = self.max_session_capacity.unwrap_or(10);
let max_session_capacity = self.max_session_num.unwrap_or(10);

let session_ttl = self.session_tti.unwrap_or(Duration::from_secs(10));

ConsumerSettings {
grpc_sock_path,
fdpass_sock_path,
process_interval: process_duration,
max_session_capacity,
max_session_num: max_session_capacity,
session_tti: session_ttl,
}
}
Expand All @@ -100,7 +103,7 @@ mod tests {
grpc_sock_path,
fdpass_sock_path,
process_interval: process_duration,
max_session_capacity,
max_session_num: max_session_capacity,
session_tti,
} = settings;

Expand All @@ -117,15 +120,15 @@ mod tests {
.grpc_sock_path("/tmp/grpc_test.sock")
.fdpass_sock_path("/tmp/fd_test.sock")
.process_interval(Duration::from_millis(100))
.max_session_capacity(20)
.max_session_num(20)
.session_tti(Duration::from_secs(30))
.build();

let ConsumerSettings {
grpc_sock_path,
fdpass_sock_path,
process_interval: process_duration,
max_session_capacity,
max_session_num: max_session_capacity,
session_tti,
} = settings;

Expand Down
34 changes: 26 additions & 8 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::memfd::create_fd;
use crate::memfd::Settings;
use crate::ringbuf::Ringbuf;

/// The producer of the ringbuf based on shared memory.
pub struct RingbufProducer {
ringbuf: RwLock<Ringbuf>,
grpc_client: GrpcClient,
Expand All @@ -36,9 +37,11 @@ pub struct RingbufProducer {
}

impl RingbufProducer {
pub async fn connect_lazy(
settings: ProducerSettings,
) -> Result<RingbufProducer> {
/// Create a [`RingbufProducer`] by the given settings.
///
/// It will initially try to establish the required connection, and if fails,
/// it will retry in the background.
pub async fn new(settings: ProducerSettings) -> Result<RingbufProducer> {
#[cfg(not(any(
target_os = "linux",
target_os = "android",
Expand Down Expand Up @@ -128,9 +131,20 @@ impl RingbufProducer {
Ok(producer)
}

pub fn reserve(&self, size: usize) -> Result<PreAlloc> {
/// Reserve a [`PreAlloc`] for committing data.
///
/// # Errors:
///
/// - If the requested space exceeds the capacity of ringbuf, an
/// [`crate::error::Error::ExceedCapacity`] error will be returned.
///
/// - If the requested space exceeds the remaining space of ringbuf, and
/// not exceeds the capacity, an [`crate::error::Error::NotEnoughSpace`]
/// error will be returned.
pub fn reserve(&self, bytes: usize) -> Result<PreAlloc> {
let req_id = self.gen_req_id();
let data_block = self.ringbuf.write().unwrap().reserve(size, req_id)?;
let data_block =
self.ringbuf.write().unwrap().reserve(bytes, req_id)?;

let rx = self.result_fetcher.subscribe(req_id);

Expand All @@ -139,6 +153,10 @@ impl RingbufProducer {
Ok(pre)
}

/// Notify the consumer to process the data.
///
/// If the accumulated data in the ringbuf exceeds the notify_threshold, will
/// notify the consumer to process the data.
pub async fn notify_consumer(&self, notify_threshold: Option<u32>) {
let need_notify = notify_threshold.is_none_or(|threshold| {
self.ringbuf.read().unwrap().written_bytes() > threshold
Expand All @@ -160,7 +178,7 @@ impl RingbufProducer {
self.online.load(Ordering::Relaxed)
}

/// Check if the result fetcher is normal.
/// Check if the gRPC stream which fetch execution results is created .
pub fn result_fetch_normal(&self) -> bool {
self.result_fetcher.is_normal()
}
Expand All @@ -179,13 +197,13 @@ impl Drop for RingbufProducer {

/// The [`SessionHandle`] is used to send the memfd, client id and ringbuf len
/// to the consumer.
pub struct SessionHandle {
pub(crate) struct SessionHandle {
pub client_id: String,
pub fdpass_sock_path: PathBuf,
pub memfd: File,
}

pub type SessionHandleRef = Arc<SessionHandle>;
pub(crate) type SessionHandleRef = Arc<SessionHandle>;

impl SessionHandle {
pub async fn send(&self) -> Result<()> {
Expand Down
2 changes: 2 additions & 0 deletions src/producer/prealloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::error::Result;
use crate::ringbuf::data_block::DataBlock;
use crate::ringbuf::DropGuard;

/// The pre-allocated data block.
pub struct PreAlloc {
pub(super) data_block: DataBlock<DropGuard>,
pub(super) rx: Receiver<DataProcessResult>,
Expand Down Expand Up @@ -40,6 +41,7 @@ impl PreAlloc {
self.data_block.commit();
}

/// Return a [`Handle`] to wait for the result of data processing.
pub fn wait_result(self) -> Handle {
Handle { rx: self.rx }
}
Expand Down
3 changes: 1 addition & 2 deletions tests/ringbuf/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ async fn do_test_ringbuf_mpsc(

let msg_num = msg_num();

let producer =
Arc::new(RingbufProducer::connect_lazy(settings).await.unwrap());
let producer = Arc::new(RingbufProducer::new(settings).await.unwrap());

for (i, expected_send) in expected_sends.into_iter().enumerate() {
let options = StartProducerOptions {
Expand Down
3 changes: 1 addition & 2 deletions tests/ringbuf/spsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ async fn do_test_ringbuf_spsc(

let msg_num = msg_num();

let producer =
Arc::new(RingbufProducer::connect_lazy(settings).await.unwrap());
let producer = Arc::new(RingbufProducer::new(settings).await.unwrap());

let options = StartProducerOptions {
producer,
Expand Down
Loading