Skip to content

Commit

Permalink
feat: support detach grpc server (#12)
Browse files Browse the repository at this point in the history
* feat: support detach grpc server

* expose some structs
  • Loading branch information
fengys1996 authored Oct 31, 2024
1 parent c30b134 commit 193dd7a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 6 deletions.
22 changes: 19 additions & 3 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ pub mod settings;
pub(crate) mod session_manager;

use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -19,13 +21,16 @@ use tracing::warn;

use crate::error::DataProcessResult;
use crate::fd_pass::FdRecvServer;
use crate::grpc::proto::shm_control_server::ShmControlServer;
use crate::grpc::server::ShmCtlHandler;
use crate::grpc::server::ShmCtlServer;

pub struct RingbufConsumer {
session_manager: SessionManagerRef,
notify: Arc<Notify>,
settings: ConsumerSettings,
cancel: CancellationToken,
detach_grpc: AtomicBool,
}

impl RingbufConsumer {
Expand All @@ -34,16 +39,16 @@ impl RingbufConsumer {
settings.max_session_capacity,
settings.session_tti,
));

let notify = Arc::new(Notify::new());

let cancel = CancellationToken::new();
let detach_grpc = AtomicBool::new(false);

RingbufConsumer {
session_manager,
notify,
cancel,
settings,
detach_grpc,
}
}

Expand All @@ -52,7 +57,9 @@ impl RingbufConsumer {
P: DataProcess<Error = E>,
E: Into<DataProcessResult> + Debug + Send,
{
self.start_grpc_server().await;
if !self.detach_grpc.load(Ordering::Relaxed) {
self.start_grpc_server().await;
}

self.start_fdrecv_server().await;

Expand All @@ -61,6 +68,15 @@ impl RingbufConsumer {
self.process_loop(&processor, interval, Some(cancel)).await;
}

pub fn detach_grpc_server(&self) -> ShmControlServer<ShmCtlHandler> {
let handler = ShmCtlHandler {
notify: self.notify.clone(),
session_manager: self.session_manager.clone(),
};
self.detach_grpc.store(true, Ordering::Relaxed);
ShmControlServer::new(handler)
}

pub fn cancel(&self) {
self.cancel.cancel();
}
Expand Down
6 changes: 3 additions & 3 deletions src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ where
}
}

struct ShmCtlHandler {
notify: Arc<Notify>,
session_manager: SessionManagerRef,
pub struct ShmCtlHandler {
pub(crate) notify: Arc<Notify>,
pub(crate) session_manager: SessionManagerRef,
}

#[async_trait::async_trait]
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ mod macros;
mod memfd;
mod ringbuf;

pub use grpc::proto::shm_control_server::ShmControlServer;
pub use grpc::server::ShmCtlHandler;
#[cfg(feature = "benchmark")]
pub use ringbuf::Ringbuf;

0 comments on commit 193dd7a

Please sign in to comment.