Skip to content

Commit

Permalink
chore: rand message for test
Browse files Browse the repository at this point in the history
  • Loading branch information
fengys1996 committed Dec 24, 2024
1 parent b1d67b2 commit f13552c
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 138 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ tracing = { version = "0.1" }

[dev-dependencies]
criterion = { version = "0.5" }
rand = { version = "0.8" }
tempfile = { version = "3.14" }
tracing-subscriber = { version = "0.3" }

Expand Down
15 changes: 14 additions & 1 deletion tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl From<Error> for DataProcessResult {

pub fn msg_num() -> usize {
std::env::var("MSG_NUM")
.unwrap_or_else(|_| "100000".to_string())
.unwrap_or_else(|_| "10000".to_string())
.parse()
.unwrap()
}
Expand Down Expand Up @@ -102,3 +102,16 @@ pub async fn wait_consumer_online(

Err("wait consumer online timeout".to_string())
}

pub fn gen_str(max_len: usize) -> String {
const CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789)(*&^%$#@!~";
let len = rand::random::<usize>() % max_len + 1;

let mut s = String::new();

for _ in 0..len {
s.push(CHARSET[rand::random::<usize>() % CHARSET.len()] as char);
}

s
}
203 changes: 66 additions & 137 deletions tests/ringbuf_spsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,148 +3,49 @@ mod common;
use std::sync::Arc;
use std::time::Duration;

use common::{msg_num, reserve_with_retry, wait_consumer_online, MsgForward};
use common::{
gen_str, msg_num, reserve_with_retry, wait_consumer_online, MsgForward,
};
use shm_ringbuf::consumer::settings::ConsumerSettingsBuilder;
use shm_ringbuf::consumer::RingbufConsumer;
use shm_ringbuf::producer::settings::ProducerSettingsBuilder;
use shm_ringbuf::producer::RingbufProducer;

#[tokio::test]
async fn test_ringbuf_spsc_base() {
tracing_subscriber::fmt::init();

let (send_msgs, mut recv_msgs) = tokio::sync::mpsc::channel(100);

let dir = tempfile::tempdir().unwrap();
let grpc_sock_path = dir.path().join("control.sock");
let fdpass_sock_path = dir.path().join("sendfd.sock");

let settings = ConsumerSettingsBuilder::new()
.grpc_sock_path(grpc_sock_path.clone())
.fdpass_sock_path(fdpass_sock_path.clone())
.process_interval(Duration::from_millis(10))
.build();

tokio::spawn(async move {
let string_print = MsgForward { sender: send_msgs };
RingbufConsumer::new(settings).run(string_print).await;
});

// wait for the consumer to start.
tokio::time::sleep(Duration::from_millis(10)).await;

let settings = ProducerSettingsBuilder::new()
.grpc_sock_path(grpc_sock_path.clone())
.fdpass_sock_path(fdpass_sock_path.clone())
.build();

let producer =
Arc::new(RingbufProducer::connect_lazy(settings).await.unwrap());

let msg_num = msg_num();

tokio::spawn(async move {
for i in 0..msg_num {
let mut pre_alloc =
reserve_with_retry(&producer, 20, 3, Duration::from_secs(1))
.await
.unwrap();

let write_str = format!("hello, {}", i);

wait_consumer_online(&producer, 5, Duration::from_secs(3))
.await
.unwrap();

pre_alloc.write(write_str.as_bytes()).unwrap();

pre_alloc.commit();
}
});

for i in 0..msg_num {
let msg = format!("hello, {}", i);
assert_eq!(recv_msgs.recv().await.unwrap(), msg);
}
do_test_ringbuf_spsc(false, Duration::from_millis(10), None).await;
}

#[tokio::test]
async fn test_ringbuf_spsc_with_notify() {
tracing_subscriber::fmt::init();

let (send_msgs, mut recv_msgs) = tokio::sync::mpsc::channel(100);

let dir = tempfile::tempdir().unwrap();
let grpc_sock_path = dir.path().join("control.sock");
let fdpass_sock_path = dir.path().join("sendfd.sock");

let settings = ConsumerSettingsBuilder::new()
.grpc_sock_path(grpc_sock_path.clone())
.fdpass_sock_path(fdpass_sock_path.clone())
.process_interval(Duration::from_millis(10))
// Set too long interval for testing notify.
.process_interval(Duration::from_millis(1000))
.build();

tokio::spawn(async move {
let string_print = MsgForward { sender: send_msgs };
RingbufConsumer::new(settings).run(string_print).await;
});

// Wait for the consumer to start.
tokio::time::sleep(Duration::from_millis(10)).await;

let settings = ProducerSettingsBuilder::new()
.grpc_sock_path(grpc_sock_path.clone())
.fdpass_sock_path(fdpass_sock_path.clone())
.build();

let producer =
Arc::new(RingbufProducer::connect_lazy(settings).await.unwrap());

let msg_num = msg_num();

tokio::spawn(async move {
for i in 0..msg_num {
let mut pre_alloc =
reserve_with_retry(&producer, 20, 3, Duration::from_secs(1))
.await
.unwrap();

let write_str = format!("hello, {}", i);

wait_consumer_online(&producer, 5, Duration::from_secs(3))
.await
.unwrap();

pre_alloc.write(write_str.as_bytes()).unwrap();

pre_alloc.commit();

producer.notify_consumer(Some(1000)).await;
}
});

for i in 0..msg_num {
let msg = format!("hello, {}", i);
assert_eq!(recv_msgs.recv().await.unwrap(), msg);
}
// Set too long interval for testing notify.
do_test_ringbuf_spsc(false, Duration::from_secs(100), Some(1000)).await;
}

#[tokio::test]
async fn test_ringbuf_spsc_with_wait_result() {
do_test_ringbuf_spsc(true, Duration::from_millis(10), None).await;
}

async fn do_test_ringbuf_spsc(
wait_result: bool,
process_interval: Duration,
notify_limit: Option<u32>,
) {
tracing_subscriber::fmt::init();

let (send_msgs, mut recv_msgs) = tokio::sync::mpsc::channel(100);

let (expected_send, mut expected_recv) = tokio::sync::mpsc::channel(100);

let dir = tempfile::tempdir().unwrap();
let grpc_sock_path = dir.path().join("control.sock");
let fdpass_sock_path = dir.path().join("sendfd.sock");

let settings = ConsumerSettingsBuilder::new()
.grpc_sock_path(grpc_sock_path.clone())
.fdpass_sock_path(fdpass_sock_path.clone())
.process_interval(Duration::from_millis(10))
.process_interval(process_interval)
.build();

tokio::spawn(async move {
Expand All @@ -166,14 +67,25 @@ async fn test_ringbuf_spsc_with_wait_result() {
let msg_num = msg_num();

tokio::spawn(async move {
let mut joins = Vec::with_capacity(100);
let mut joins = if wait_result {
Some(Vec::with_capacity(1000))
} else {
None
};

for i in 0..msg_num {
let mut pre_alloc =
reserve_with_retry(&producer, 20, 3, Duration::from_secs(1))
.await
.unwrap();
let write_str = gen_str(1000);

let write_str = format!("hello, {}", i);
expected_send.send(write_str.clone()).await.unwrap();

let mut pre_alloc = reserve_with_retry(
&producer,
write_str.len(),
3,
Duration::from_secs(1),
)
.await
.unwrap();

wait_consumer_online(&producer, 5, Duration::from_secs(3))
.await
Expand All @@ -183,28 +95,45 @@ async fn test_ringbuf_spsc_with_wait_result() {

pre_alloc.commit();

let join = pre_alloc.wait_result();
if let Some(limit) = notify_limit {
// If we set a longer process interval, the last batch of messages
// may not be processed quickly, because the data accumulated in
// the ringbuf may be too small and does not exceed the notify limit,
// so the notification will not be triggered. Therefore, we need
// to trigger a notification at the end.
if i == msg_num - 1 {
producer.notify_consumer(None).await;
} else {
producer.notify_consumer(Some(limit)).await;
}
}

if let Some(joins) = &mut joins {
let join = pre_alloc.wait_result();

joins.push(join);
joins.push(join);

// Wait the result every 1000 messages.
if i % 1000 == 0 {
for join in joins.drain(..) {
let result = join.await.unwrap();
assert_eq!(result.status_code, 0);
// Wait the result every 1000 messages.
if i % 1000 == 0 {
for join in joins.drain(..) {
let result = join.await.unwrap();
assert_eq!(result.status_code, 0);
}
}
}
if i == msg_num - 1 {
for join in joins.drain(..) {
let result = join.await.unwrap();
assert_eq!(result.status_code, 0);
if i == msg_num - 1 {
for join in joins.drain(..) {
let result = join.await.unwrap();
assert_eq!(result.status_code, 0);
}
}
}
}
});

for i in 0..msg_num {
let msg = format!("hello, {}", i);
assert_eq!(recv_msgs.recv().await.unwrap(), msg);
for _i in 0..msg_num {
let expected = expected_recv.recv().await.unwrap();
let actual = recv_msgs.recv().await.unwrap();

assert_eq!(expected, actual);
}
}

0 comments on commit f13552c

Please sign in to comment.