Skip to content

Commit

Permalink
refactor: modify the ringbuf consumer api
Browse files Browse the repository at this point in the history
  • Loading branch information
fengys1996 committed Dec 27, 2024
1 parent a31f523 commit 4d2ac3e
Showing 1 changed file with 55 additions and 16 deletions.
71 changes: 55 additions & 16 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,39 +25,81 @@ use crate::grpc::proto::shm_control_server::ShmControlServer;
use crate::grpc::server::ShmCtlHandler;
use crate::grpc::server::ShmCtlServer;

/// The comsumer of the ringbuf based on shared memory.

Check warning on line 28 in src/consumer.rs

View workflow job for this annotation

GitHub Actions / Typos Check

"comsumer" should be "consumer".
pub struct RingbufConsumer {
session_manager: SessionManagerRef,
notify: Arc<Notify>,
settings: ConsumerSettings,
cancel: CancellationToken,
detach_grpc: AtomicBool,
grpc_detached: AtomicBool,
started: AtomicBool,
}

impl RingbufConsumer {
/// Create a [`RingbufConsumer`] by the given settings.
pub fn new(settings: ConsumerSettings) -> Self {
let (consumer, _) = Self::new_with_detach_grpc(settings, false);
consumer
}

/// Create a [`RingbufConsumer`] by the given settings and detach the gRPC
/// server.
///
/// Since some users have their own grpc services and do not want to occupy
/// an additional uds.
///
/// Note: If detached, the `grpc_sock_path` in the settings will be ignored.
pub fn new_with_detach_grpc(
settings: ConsumerSettings,
detached: bool,
) -> (RingbufConsumer, Option<ShmControlServer<ShmCtlHandler>>) {
let session_manager = Arc::new(SessionManager::new(
settings.max_session_num,
settings.session_tti,
));
let notify = Arc::new(Notify::new());
let cancel = CancellationToken::new();
let detach_grpc = AtomicBool::new(false);
let started = AtomicBool::new(false);
let grpc_detached = AtomicBool::new(detached);

let grpc_server = if detached {
let handler = ShmCtlHandler {
notify: notify.clone(),
session_manager: session_manager.clone(),
};
Some(ShmControlServer::new(handler))
} else {
None
};

RingbufConsumer {
let consumer = RingbufConsumer {
session_manager,
notify,
cancel,
settings,
detach_grpc,
}
grpc_detached,
started,
};

(consumer, grpc_server)
}

/// Run the consumer, which will block the current thread.
pub async fn run<P, E>(&self, processor: P)
where
P: DataProcess<Error = E>,
E: Into<DataProcessResult> + Debug + Send,
{
if !self.detach_grpc.load(Ordering::Relaxed) {
if self
.started
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
warn!("the consumer has already started.");
return;
}

if !self.grpc_detached.load(Ordering::Relaxed) {
self.start_grpc_server().await;
}

Expand All @@ -68,19 +110,14 @@ impl RingbufConsumer {
self.process_loop(&processor, interval, Some(cancel)).await;
}

pub fn detach_grpc_server(&self) -> ShmControlServer<ShmCtlHandler> {
let handler = ShmCtlHandler {
notify: self.notify.clone(),
session_manager: self.session_manager.clone(),
};
self.detach_grpc.store(true, Ordering::Relaxed);
ShmControlServer::new(handler)
}

/// Cancel the consumer.
pub fn cancel(&self) {
self.cancel.cancel();
}

/// Start the gRPC server.
/// 1. receive the notification from the producer.
/// 2. send the execution results to the producer via gRPC stream.
async fn start_grpc_server(&self) {
let cancel = self.cancel.clone();
let notify = self.notify.clone();
Expand All @@ -98,6 +135,7 @@ impl RingbufConsumer {
});
}

/// Start the server to receive the file descriptors from the producer.
async fn start_fdrecv_server(&self) {
let cancel = self.cancel.clone();
let session_manager = self.session_manager.clone();
Expand All @@ -113,7 +151,8 @@ impl RingbufConsumer {
});
}

pub async fn process_loop<P, E>(
/// The main loop to process the ringbufs.
async fn process_loop<P, E>(
&self,
processor: &P,
interval: Duration,
Expand Down

0 comments on commit 4d2ac3e

Please sign in to comment.