Skip to content

Commit

Permalink
feat: support crc32 checksum (#20)
Browse files Browse the repository at this point in the history
* feat: support set enable_checksum in producer settings

* push checksum error to producer
  • Loading branch information
fengys1996 authored Jan 11, 2025
1 parent 83e43e7 commit 1400aa5
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ benchmark = []

[dependencies]
async-trait = { version = "0.1" }
crc32fast = { version = "1.4" }
dashmap = { version = "6.1" }
futures = { version = "0.3" }
hyper-util = { version = "0.1" }
Expand Down
18 changes: 18 additions & 0 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::error::DataProcessResult;
use crate::error::CHECKSUM_MISMATCH;
use crate::fd_pass::FdRecvServer;
use crate::grpc::proto::shm_control_server::ShmControlServer;
use crate::grpc::server::ShmCtlHandler;
Expand Down Expand Up @@ -200,6 +201,7 @@ where
E: Into<DataProcessResult>,
{
let ringbuf = session.ringbuf();
let enable_checksum = session.enable_checksum();

while let Some(data_block) = ringbuf.peek() {
if data_block.is_busy() {
Expand All @@ -209,6 +211,22 @@ where
let data_slice = data_block.slice().unwrap();
let req_id = data_block.req_id();

if enable_checksum
&& crc32fast::hash(data_slice) != data_block.checksum()
{
let ret = DataProcessResult {
status_code: CHECKSUM_MISMATCH,
message: format!(
"checksum mismatch, client id: {}, req id: {}",
session.client_id(),
req_id
),
};
session.push_result(req_id, ret).await;
unsafe { ringbuf.advance_consume_offset(data_block.total_len()) }
continue;
}

if let Err(e) = processor.process(data_slice).await {
session.push_result(req_id, e).await;
} else {
Expand Down
17 changes: 16 additions & 1 deletion src/consumer/session_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@ use crate::ringbuf::Ringbuf;
/// When each client connects, the server will generate a session to save relevant
/// information.
pub struct Session {
client_id: ClientId,
ringbuf: Ringbuf,
enable_checksum: bool,
/// Send the results of data processing to the producer.
result_sender: RwLock<Option<Sender<StdResult<proto::ResultSet, Status>>>>,
}
pub type SessionRef = Arc<Session>;

impl Session {
/// Create a new session.
pub fn new(ringbuf: Ringbuf) -> Self {
pub fn new(client_id: ClientId, ringbuf: Ringbuf) -> Self {
let enable_checksum = ringbuf.checksum_flag();
Self {
client_id,
ringbuf,
result_sender: RwLock::new(None),
enable_checksum,
}
}

Expand All @@ -37,6 +42,16 @@ impl Session {
&self.ringbuf
}

/// Get the client id of the session.
pub fn client_id(&self) -> &ClientId {
&self.client_id
}

/// Whether to enable checksum.
pub fn enable_checksum(&self) -> bool {
self.enable_checksum
}

/// Push an OK result to the producer.
pub async fn push_ok(&self, request_id: u32) {
self.push_result(request_id, DataProcessResult::ok()).await;
Expand Down
4 changes: 4 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,7 @@ impl DataProcessResult {
}
}
}

/// 100000 - 200000 are error codes used internally by shm-ringbuf and should
/// not be used as business codes.
pub const CHECKSUM_MISMATCH: u32 = 100000;
2 changes: 1 addition & 1 deletion src/fd_pass.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl Handler {
let ringbuf = Ringbuf::from(&file)?;

// 6. Store the ringbuf to ringbuf store.
let session = Arc::new(Session::new(ringbuf));
let session = Arc::new(Session::new(client_id.clone(), ringbuf));
self.session_manager.insert(client_id, session);

Ok(())
Expand Down
16 changes: 14 additions & 2 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct RingbufProducer {
cancel: CancellationToken,
req_id: AtomicU32,
result_fetcher: ResultFetcher,
enable_checksum: bool,
}

impl RingbufProducer {
Expand Down Expand Up @@ -67,6 +68,7 @@ impl RingbufProducer {
fdpass_sock_path,
heartbeat_interval,
result_fetch_retry_interval,
enable_checksum,
} = settings;

let client_id = gen_client_id();
Expand All @@ -92,7 +94,11 @@ impl RingbufProducer {
})?;

let grpc_client = GrpcClient::new(&client_id, grpc_sock_path);
let ringbuf = RwLock::new(Ringbuf::new(&memfd)?);

let ringbuf = Ringbuf::new(&memfd)?;
ringbuf.set_checksum_flag(enable_checksum);
let ringbuf = RwLock::new(ringbuf);

let online = Arc::new(AtomicBool::new(false));
let req_id = AtomicU32::new(0);
let cancel = CancellationToken::new();
Expand Down Expand Up @@ -126,6 +132,7 @@ impl RingbufProducer {
cancel,
req_id,
result_fetcher,
enable_checksum,
};

Ok(producer)
Expand All @@ -147,8 +154,13 @@ impl RingbufProducer {
self.ringbuf.write().unwrap().reserve(bytes, req_id)?;

let rx = self.result_fetcher.subscribe(req_id);
let enable_checksum = self.enable_checksum;

let pre = PreAlloc { data_block, rx };
let pre = PreAlloc {
data_block,
rx,
enable_checksum,
};

Ok(pre)
}
Expand Down
6 changes: 6 additions & 0 deletions src/producer/prealloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::ringbuf::DropGuard;
pub struct PreAlloc {
pub(super) data_block: DataBlock<DropGuard>,
pub(super) rx: Receiver<DataProcessResult>,
pub(super) enable_checksum: bool,
}

impl PreAlloc {
Expand All @@ -38,6 +39,11 @@ impl PreAlloc {
///
/// After commit, the consumer can see the written data.
pub fn commit(&self) {
if self.enable_checksum {
let checksum = crc32fast::hash(self.slice());
self.data_block.set_checksum(checksum);
}

self.data_block.commit();
}

Expand Down
11 changes: 11 additions & 0 deletions src/producer/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct ProducerSettings {
pub(super) ringbuf_len: usize,
pub(super) heartbeat_interval: Duration,
pub(super) result_fetch_retry_interval: Duration,
pub(super) enable_checksum: bool,
#[cfg(not(any(
target_os = "linux",
target_os = "android",
Expand All @@ -29,6 +30,7 @@ pub struct ProducerSettingsBuilder {
ringbuf_len: Option<usize>,
heartbeat_interval: Option<Duration>,
result_fetch_retry_interval: Option<Duration>,
enable_checksum: Option<bool>,
#[cfg(not(any(
target_os = "linux",
target_os = "android",
Expand Down Expand Up @@ -74,6 +76,12 @@ impl ProducerSettingsBuilder {
self
}

/// Enable verify data consistency by checksum.
pub fn enable_checksum(mut self, enable: bool) -> Self {
self.enable_checksum = Some(enable);
self
}

#[cfg(not(any(
target_os = "linux",
target_os = "android",
Expand Down Expand Up @@ -104,6 +112,8 @@ impl ProducerSettingsBuilder {
.result_fetch_retry_interval
.unwrap_or(DEFAULT_RESULT_FETCH_RETRY_INTERVAL);

let enable_checksum = self.enable_checksum.unwrap_or(false);

#[cfg(not(any(
target_os = "linux",
target_os = "android",
Expand Down Expand Up @@ -137,6 +147,7 @@ impl ProducerSettingsBuilder {
fdpass_sock_path,
ringbuf_len,
heartbeat_interval,
enable_checksum,
result_fetch_retry_interval,
};
}
Expand Down
48 changes: 43 additions & 5 deletions src/ringbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl Ringbuf {
// 3. Reset the metadata part.
ringbuf.set_consume_offset(0);
ringbuf.set_produce_offset(0);
ringbuf.set_checksum_flag(false);

Ok(ringbuf)
}
Expand Down Expand Up @@ -378,6 +379,26 @@ impl Ringbuf {
Some((pre + len) % self.data_part_len as u32)
});
}

/// Set the checksum flag.
pub fn set_checksum_flag(&self, flag: bool) {
let ptr = self.metadata.options;

let atomic = unsafe { AtomicU32::from_ptr(ptr) };
if flag {
atomic.fetch_or(0x1, Ordering::Release);
} else {
atomic.fetch_and(!0x1, Ordering::Release);
}
}

/// Get the checksum flag.
pub fn checksum_flag(&self) -> bool {
let ptr = self.metadata.options;

let atomic = unsafe { AtomicU32::from_ptr(ptr) };
atomic.load(Ordering::Acquire) & 0x1 != 0
}
}

pub struct DropGuard {
Expand Down Expand Up @@ -417,11 +438,11 @@ fn sys_page_size() -> u64 {
/// metadata.produce_offset metadata.consume_offset
/// | |
/// v v
/// +-------------------+-------------------+-------------------+
/// | produce_offset | consume_offset | reserved |
/// +-------------------+-------------------+-------------------+
/// | 4 bytes | 4 bytes | n bytes |
/// +-------------------+-------------------+-------------------+
/// +-------------------+-------------------+-------------------+-------------------+
/// | produce_offset | consume_offset | options | reserved |
/// +-------------------+-------------------+-------------------+-------------------+
/// | 4 bytes | 4 bytes | 4 bytes | n bytes |
/// +-------------------+-------------------+-------------------+-------------------+
/// ```
#[derive(Copy, Clone, Debug)]
pub struct RingbufMetadata {
Expand All @@ -430,6 +451,8 @@ pub struct RingbufMetadata {

/// The raw pointer to consume_offset which is the next read position in ringbuf.
pub(super) consume_offset_ptr: *mut u32,

pub(super) options: *mut u32,
}

impl RingbufMetadata {
Expand All @@ -440,10 +463,12 @@ impl RingbufMetadata {
pub unsafe fn new(metadata_ptr: *mut u8) -> Self {
let produce_offset_ptr = metadata_ptr as *mut u32;
let consume_offset_ptr = unsafe { produce_offset_ptr.add(1) };
let options = unsafe { consume_offset_ptr.add(1) };

Self {
produce_offset_ptr,
consume_offset_ptr,
options,
}
}
}
Expand Down Expand Up @@ -598,4 +623,17 @@ mod tests {
pre_alloc.commit();
}
}

#[test]
fn test_checksum_flag() {
let file = tempfile::tempfile().unwrap();
let ringbuf = Ringbuf::new(&file).unwrap();
assert!(!ringbuf.checksum_flag());

ringbuf.set_checksum_flag(true);
assert!(ringbuf.checksum_flag());

ringbuf.set_checksum_flag(false);
assert!(!ringbuf.checksum_flag());
}
}
Loading

0 comments on commit 1400aa5

Please sign in to comment.