From f13552ccd3480cca2363ccfa596e3189981bbec5 Mon Sep 17 00:00:00 2001 From: fys Date: Tue, 24 Dec 2024 13:06:25 +0800 Subject: [PATCH] chore: rand message for test --- Cargo.toml | 1 + tests/common.rs | 15 +++- tests/ringbuf_spsc.rs | 203 ++++++++++++++---------------------------- 3 files changed, 81 insertions(+), 138 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e8c9d90..33d69a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ tracing = { version = "0.1" } [dev-dependencies] criterion = { version = "0.5" } +rand = { version = "0.8" } tempfile = { version = "3.14" } tracing-subscriber = { version = "0.3" } diff --git a/tests/common.rs b/tests/common.rs index 12b6829..cb4f416 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -57,7 +57,7 @@ impl From for DataProcessResult { pub fn msg_num() -> usize { std::env::var("MSG_NUM") - .unwrap_or_else(|_| "100000".to_string()) + .unwrap_or_else(|_| "10000".to_string()) .parse() .unwrap() } @@ -102,3 +102,16 @@ pub async fn wait_consumer_online( Err("wait consumer online timeout".to_string()) } + +pub fn gen_str(max_len: usize) -> String { + const CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789)(*&^%$#@!~"; + let len = rand::random::() % max_len + 1; + + let mut s = String::new(); + + for _ in 0..len { + s.push(CHARSET[rand::random::() % CHARSET.len()] as char); + } + + s +} diff --git a/tests/ringbuf_spsc.rs b/tests/ringbuf_spsc.rs index dc9a183..6bc8d5b 100644 --- a/tests/ringbuf_spsc.rs +++ b/tests/ringbuf_spsc.rs @@ -3,7 +3,9 @@ mod common; use std::sync::Arc; use std::time::Duration; -use common::{msg_num, reserve_with_retry, wait_consumer_online, MsgForward}; +use common::{ + gen_str, msg_num, reserve_with_retry, wait_consumer_online, MsgForward, +}; use shm_ringbuf::consumer::settings::ConsumerSettingsBuilder; use shm_ringbuf::consumer::RingbufConsumer; use shm_ringbuf::producer::settings::ProducerSettingsBuilder; @@ -11,132 +13,31 @@ use shm_ringbuf::producer::RingbufProducer; #[tokio::test] async fn test_ringbuf_spsc_base() { - tracing_subscriber::fmt::init(); - - let (send_msgs, mut recv_msgs) = tokio::sync::mpsc::channel(100); - - let dir = tempfile::tempdir().unwrap(); - let grpc_sock_path = dir.path().join("control.sock"); - let fdpass_sock_path = dir.path().join("sendfd.sock"); - - let settings = ConsumerSettingsBuilder::new() - .grpc_sock_path(grpc_sock_path.clone()) - .fdpass_sock_path(fdpass_sock_path.clone()) - .process_interval(Duration::from_millis(10)) - .build(); - - tokio::spawn(async move { - let string_print = MsgForward { sender: send_msgs }; - RingbufConsumer::new(settings).run(string_print).await; - }); - - // wait for the consumer to start. - tokio::time::sleep(Duration::from_millis(10)).await; - - let settings = ProducerSettingsBuilder::new() - .grpc_sock_path(grpc_sock_path.clone()) - .fdpass_sock_path(fdpass_sock_path.clone()) - .build(); - - let producer = - Arc::new(RingbufProducer::connect_lazy(settings).await.unwrap()); - - let msg_num = msg_num(); - - tokio::spawn(async move { - for i in 0..msg_num { - let mut pre_alloc = - reserve_with_retry(&producer, 20, 3, Duration::from_secs(1)) - .await - .unwrap(); - - let write_str = format!("hello, {}", i); - - wait_consumer_online(&producer, 5, Duration::from_secs(3)) - .await - .unwrap(); - - pre_alloc.write(write_str.as_bytes()).unwrap(); - - pre_alloc.commit(); - } - }); - - for i in 0..msg_num { - let msg = format!("hello, {}", i); - assert_eq!(recv_msgs.recv().await.unwrap(), msg); - } + do_test_ringbuf_spsc(false, Duration::from_millis(10), None).await; } #[tokio::test] async fn test_ringbuf_spsc_with_notify() { - tracing_subscriber::fmt::init(); - - let (send_msgs, mut recv_msgs) = tokio::sync::mpsc::channel(100); - - let dir = tempfile::tempdir().unwrap(); - let grpc_sock_path = dir.path().join("control.sock"); - let fdpass_sock_path = dir.path().join("sendfd.sock"); - - let settings = ConsumerSettingsBuilder::new() - .grpc_sock_path(grpc_sock_path.clone()) - .fdpass_sock_path(fdpass_sock_path.clone()) - .process_interval(Duration::from_millis(10)) - // Set too long interval for testing notify. - .process_interval(Duration::from_millis(1000)) - .build(); - - tokio::spawn(async move { - let string_print = MsgForward { sender: send_msgs }; - RingbufConsumer::new(settings).run(string_print).await; - }); - - // Wait for the consumer to start. - tokio::time::sleep(Duration::from_millis(10)).await; - - let settings = ProducerSettingsBuilder::new() - .grpc_sock_path(grpc_sock_path.clone()) - .fdpass_sock_path(fdpass_sock_path.clone()) - .build(); - - let producer = - Arc::new(RingbufProducer::connect_lazy(settings).await.unwrap()); - - let msg_num = msg_num(); - - tokio::spawn(async move { - for i in 0..msg_num { - let mut pre_alloc = - reserve_with_retry(&producer, 20, 3, Duration::from_secs(1)) - .await - .unwrap(); - - let write_str = format!("hello, {}", i); - - wait_consumer_online(&producer, 5, Duration::from_secs(3)) - .await - .unwrap(); - - pre_alloc.write(write_str.as_bytes()).unwrap(); - - pre_alloc.commit(); - - producer.notify_consumer(Some(1000)).await; - } - }); - - for i in 0..msg_num { - let msg = format!("hello, {}", i); - assert_eq!(recv_msgs.recv().await.unwrap(), msg); - } + // Set too long interval for testing notify. + do_test_ringbuf_spsc(false, Duration::from_secs(100), Some(1000)).await; } #[tokio::test] async fn test_ringbuf_spsc_with_wait_result() { + do_test_ringbuf_spsc(true, Duration::from_millis(10), None).await; +} + +async fn do_test_ringbuf_spsc( + wait_result: bool, + process_interval: Duration, + notify_limit: Option, +) { tracing_subscriber::fmt::init(); let (send_msgs, mut recv_msgs) = tokio::sync::mpsc::channel(100); + let (expected_send, mut expected_recv) = tokio::sync::mpsc::channel(100); + let dir = tempfile::tempdir().unwrap(); let grpc_sock_path = dir.path().join("control.sock"); let fdpass_sock_path = dir.path().join("sendfd.sock"); @@ -144,7 +45,7 @@ async fn test_ringbuf_spsc_with_wait_result() { let settings = ConsumerSettingsBuilder::new() .grpc_sock_path(grpc_sock_path.clone()) .fdpass_sock_path(fdpass_sock_path.clone()) - .process_interval(Duration::from_millis(10)) + .process_interval(process_interval) .build(); tokio::spawn(async move { @@ -166,14 +67,25 @@ async fn test_ringbuf_spsc_with_wait_result() { let msg_num = msg_num(); tokio::spawn(async move { - let mut joins = Vec::with_capacity(100); + let mut joins = if wait_result { + Some(Vec::with_capacity(1000)) + } else { + None + }; + for i in 0..msg_num { - let mut pre_alloc = - reserve_with_retry(&producer, 20, 3, Duration::from_secs(1)) - .await - .unwrap(); + let write_str = gen_str(1000); - let write_str = format!("hello, {}", i); + expected_send.send(write_str.clone()).await.unwrap(); + + let mut pre_alloc = reserve_with_retry( + &producer, + write_str.len(), + 3, + Duration::from_secs(1), + ) + .await + .unwrap(); wait_consumer_online(&producer, 5, Duration::from_secs(3)) .await @@ -183,28 +95,45 @@ async fn test_ringbuf_spsc_with_wait_result() { pre_alloc.commit(); - let join = pre_alloc.wait_result(); + if let Some(limit) = notify_limit { + // If we set a longer process interval, the last batch of messages + // may not be processed quickly, because the data accumulated in + // the ringbuf may be too small and does not exceed the notify limit, + // so the notification will not be triggered. Therefore, we need + // to trigger a notification at the end. + if i == msg_num - 1 { + producer.notify_consumer(None).await; + } else { + producer.notify_consumer(Some(limit)).await; + } + } + + if let Some(joins) = &mut joins { + let join = pre_alloc.wait_result(); - joins.push(join); + joins.push(join); - // Wait the result every 1000 messages. - if i % 1000 == 0 { - for join in joins.drain(..) { - let result = join.await.unwrap(); - assert_eq!(result.status_code, 0); + // Wait the result every 1000 messages. + if i % 1000 == 0 { + for join in joins.drain(..) { + let result = join.await.unwrap(); + assert_eq!(result.status_code, 0); + } } - } - if i == msg_num - 1 { - for join in joins.drain(..) { - let result = join.await.unwrap(); - assert_eq!(result.status_code, 0); + if i == msg_num - 1 { + for join in joins.drain(..) { + let result = join.await.unwrap(); + assert_eq!(result.status_code, 0); + } } } } }); - for i in 0..msg_num { - let msg = format!("hello, {}", i); - assert_eq!(recv_msgs.recv().await.unwrap(), msg); + for _i in 0..msg_num { + let expected = expected_recv.recv().await.unwrap(); + let actual = recv_msgs.recv().await.unwrap(); + + assert_eq!(expected, actual); } }