Skip to content

Commit

Permalink
test: add mpsc test
Browse files Browse the repository at this point in the history
  • Loading branch information
fengys1996 committed Dec 25, 2024
1 parent 65dfa5a commit c48cef0
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 150 deletions.
96 changes: 95 additions & 1 deletion tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{str::from_utf8, time::Duration};
use std::{str::from_utf8, sync::Arc, time::Duration};

use shm_ringbuf::{
consumer::process::DataProcess,
Expand Down Expand Up @@ -116,3 +116,97 @@ pub fn gen_str(min_len: usize, max_len: usize) -> String {

s
}

pub struct StartProducerOptions {
pub producer: Arc<RingbufProducer>,
pub msg_num: usize,
pub expected_send: Sender<String>,
pub wait_result: bool,
pub min_msg_len: usize,
pub max_msg_len: usize,
pub notify_limit: Option<u32>,
pub msg_prefix: Option<String>,
}

pub async fn start_producer(options: StartProducerOptions) {
let StartProducerOptions {
producer,
msg_num,
expected_send,
wait_result,
min_msg_len,
max_msg_len,
notify_limit,
msg_prefix,
} = options;

tokio::spawn(async move {
let mut joins = if wait_result {
Some(Vec::with_capacity(1000))
} else {
None
};

for i in 0..msg_num {
let write_str = gen_str(min_msg_len, max_msg_len);

let write_str = if let Some(msg_prefix) = &msg_prefix {
format!("{}{}", msg_prefix, write_str)
} else {
write_str
};

expected_send.send(write_str.clone()).await.unwrap();

let mut pre_alloc = reserve_with_retry(
&producer,
write_str.len(),
3,
Duration::from_secs(1),
)
.await
.unwrap();

wait_consumer_online(&producer, 5, Duration::from_secs(3))
.await
.unwrap();

pre_alloc.write(write_str.as_bytes()).unwrap();

pre_alloc.commit();

if let Some(limit) = notify_limit {
// If we set a longer process interval, the last batch of messages
// may not be processed quickly, because the data accumulated in
// the ringbuf may be too small and does not exceed the notify limit,
// so the notification will not be triggered. Therefore, we need
// to trigger a notification at the end.
if i == msg_num - 1 {
producer.notify_consumer(None).await;
} else {
producer.notify_consumer(Some(limit)).await;
}
}

if let Some(joins) = &mut joins {
let join = pre_alloc.wait_result();

joins.push(join);

// Wait the result every 1000 messages.
if i % 1000 == 0 {
for join in joins.drain(..) {
let result = join.await.unwrap();
assert_eq!(result.status_code, 0);
}
}
if i == msg_num - 1 {
for join in joins.drain(..) {
let result = join.await.unwrap();
assert_eq!(result.status_code, 0);
}
}
}
}
});
}
2 changes: 2 additions & 0 deletions tests/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod common;
pub mod ringbuf;
2 changes: 2 additions & 0 deletions tests/ringbuf/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod mpsc;
pub mod spsc;
108 changes: 108 additions & 0 deletions tests/ringbuf/mpsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use std::{sync::Arc, time::Duration};

use crate::common::{
msg_num, start_producer, MsgForward, StartProducerOptions,
};
use shm_ringbuf::{
consumer::{settings::ConsumerSettingsBuilder, RingbufConsumer},
producer::{settings::ProducerSettingsBuilder, RingbufProducer},
};
use tokio::sync::mpsc;

#[tokio::test]
async fn test_ringbuf_mpsc_base() {
do_test_ringbuf_mpsc(false, Duration::from_millis(10), None, 0, 1000).await;
}

#[tokio::test]
async fn test_ringbuf_mpsc_with_notify() {
// Set too long interval for testing notify.
do_test_ringbuf_mpsc(false, Duration::from_secs(100), Some(500), 501, 1000)
.await;
}

#[tokio::test]
async fn test_ringbuf_mpsc_with_wait_result() {
do_test_ringbuf_mpsc(true, Duration::from_millis(10), None, 0, 1000).await;
}

#[tokio::test]
async fn test_ringbuf_mpsc_with_wait_result_and_notify() {
// Set too long interval for testing notify.
do_test_ringbuf_mpsc(true, Duration::from_secs(100), Some(500), 501, 1000)
.await;
}

async fn do_test_ringbuf_mpsc(
wait_result: bool,
process_interval: Duration,
notify_limit: Option<u32>,
min_msg_len: usize,
max_msg_len: usize,
) {
tracing_subscriber::fmt::init();

let (send_msgs, mut recv_msgs) = mpsc::channel(100);

let num_producers = 4;

let mut expected_sends = Vec::with_capacity(4);
let mut expected_recvs = Vec::with_capacity(4);

for _ in 0..num_producers {
let (send, recv) = mpsc::channel(100);
expected_sends.push(send);
expected_recvs.push(recv);
}

let dir = tempfile::tempdir().unwrap();
let grpc_sock_path = dir.path().join("control.sock");
let fdpass_sock_path = dir.path().join("sendfd.sock");

let settings = ConsumerSettingsBuilder::new()
.grpc_sock_path(grpc_sock_path.clone())
.fdpass_sock_path(fdpass_sock_path.clone())
.process_interval(process_interval)
.build();

tokio::spawn(async move {
let string_print = MsgForward { sender: send_msgs };
RingbufConsumer::new(settings).run(string_print).await;
});

// Wait for the consumer to start.
tokio::time::sleep(Duration::from_millis(10)).await;

let settings = ProducerSettingsBuilder::new()
.grpc_sock_path(grpc_sock_path.clone())
.fdpass_sock_path(fdpass_sock_path.clone())
.build();

let msg_num = msg_num();

let producer =
Arc::new(RingbufProducer::connect_lazy(settings).await.unwrap());

for (i, expected_send) in expected_sends.into_iter().enumerate() {
let options = StartProducerOptions {
producer: producer.clone(),
msg_num,
expected_send,
wait_result,
min_msg_len,
max_msg_len,
notify_limit,
msg_prefix: Some(format!("{}-", i)),
};

start_producer(options).await;
}

for _ in 0..num_producers * msg_num {
let actual = recv_msgs.recv().await.unwrap();
let index = actual.split('-').nth(0).unwrap().parse::<usize>().unwrap();
let expected = expected_recvs[index].recv().await.unwrap();

assert_eq!(expected, actual);
}
}
96 changes: 96 additions & 0 deletions tests/ringbuf/spsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::{sync::Arc, time::Duration};

use crate::common::{
msg_num, start_producer, MsgForward, StartProducerOptions,
};
use shm_ringbuf::{
consumer::{settings::ConsumerSettingsBuilder, RingbufConsumer},
producer::{settings::ProducerSettingsBuilder, RingbufProducer},
};
use tokio::sync::mpsc;

#[tokio::test]
async fn test_ringbuf_spsc_base() {
do_test_ringbuf_spsc(false, Duration::from_millis(10), None, 0, 1000).await;
}

#[tokio::test]
async fn test_ringbuf_spsc_with_notify() {
// Set too long interval for testing notify.
do_test_ringbuf_spsc(false, Duration::from_secs(100), Some(500), 501, 1000)
.await;
}

#[tokio::test]
async fn test_ringbuf_spsc_with_wait_result() {
do_test_ringbuf_spsc(true, Duration::from_millis(10), None, 0, 1000).await;
}

#[tokio::test]
async fn test_ringbuf_spsc_with_wait_result_and_notify() {
// Set too long interval for testing notify.
do_test_ringbuf_spsc(true, Duration::from_secs(100), Some(500), 501, 1000)
.await;
}

async fn do_test_ringbuf_spsc(
wait_result: bool,
process_interval: Duration,
notify_limit: Option<u32>,
min_msg_len: usize,
max_msg_len: usize,
) {
tracing_subscriber::fmt::init();

let (send_msgs, mut recv_msgs) = mpsc::channel(100);

let (expected_send, mut expected_recv) = mpsc::channel(100);

let dir = tempfile::tempdir().unwrap();
let grpc_sock_path = dir.path().join("control.sock");
let fdpass_sock_path = dir.path().join("sendfd.sock");

let settings = ConsumerSettingsBuilder::new()
.grpc_sock_path(grpc_sock_path.clone())
.fdpass_sock_path(fdpass_sock_path.clone())
.process_interval(process_interval)
.build();

tokio::spawn(async move {
let string_print = MsgForward { sender: send_msgs };
RingbufConsumer::new(settings).run(string_print).await;
});

// Wait for the consumer to start.
tokio::time::sleep(Duration::from_millis(10)).await;

let settings = ProducerSettingsBuilder::new()
.grpc_sock_path(grpc_sock_path.clone())
.fdpass_sock_path(fdpass_sock_path.clone())
.build();

let msg_num = msg_num();

let producer =
Arc::new(RingbufProducer::connect_lazy(settings).await.unwrap());

let options = StartProducerOptions {
producer,
msg_num,
expected_send,
wait_result,
min_msg_len,
max_msg_len,
notify_limit,
msg_prefix: None,
};

start_producer(options).await;

for _ in 0..msg_num {
let expected = expected_recv.recv().await.unwrap();
let actual = recv_msgs.recv().await.unwrap();

assert_eq!(expected, actual);
}
}
Loading

0 comments on commit c48cef0

Please sign in to comment.