Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic send #276

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,6 @@ path="examples/superstreams/send_super_stream.rs"
name="environment_deserialization"
path="examples/environment_deserialization.rs"

[[bin]]
name = "perf-producer"
path = "src/bin/perf-producer.rs"
175 changes: 175 additions & 0 deletions src/bin/perf-producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#![allow(dead_code)]

use std::{
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

use rabbitmq_stream_client::{
types::{ByteCapacity, Message, OffsetSpecification},
Environment,
};
use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_stream::StreamExt;

static ONE_SECOND: Duration = Duration::from_secs(1);
static ONE_MINUTE: Duration = Duration::from_secs(60);

struct Metric {
created_at: u128,
received_at: SystemTime,
}

#[derive(Debug)]
struct Stats {
average_latency: f32,
messages_received: usize,
}

#[tokio::main]
async fn main() {
let stream_name = "perf-stream";

let environment = Environment::builder().build().await.unwrap();
let _ = environment.delete_stream(stream_name).await;
environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create(stream_name)
.await
.unwrap();

let environment = Arc::new(environment);

let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
let consumer_env = environment.clone();
let consumer_handler = tokio::spawn(async move {
start_consumer(consumer_env, stream_name, sender).await;
});

let produced_messages = AtomicU32::new(0);
let producer_env = environment.clone();
let producer_handler = tokio::spawn(async move {
start_producer(producer_env, stream_name, &produced_messages).await;
});

let run_for = Duration::from_secs(5 * 60);

tokio::spawn(async move {
sleep(run_for).await;
producer_handler.abort();
sleep(Duration::from_secs(1)).await;
consumer_handler.abort();
});

let minutes = run_for.as_secs() / 60;

let mut now = Instant::now();
// 5 minutes of metrics
let mut metrics = Vec::with_capacity(50 * 60 * minutes as usize);
while let Some(metric) = receiver.recv().await {
if now.elapsed() > ONE_MINUTE {
now = Instant::now();

let last_metrics = metrics;
metrics = Vec::with_capacity(50 * 60 * minutes as usize);
tokio::spawn(async move {
let stats = calculate_stats(last_metrics).await;
println!("stats: {:?}", stats);
});
}
metrics.push(metric);
}

let stats = calculate_stats(metrics).await;
println!("stats: {:?}", stats);
}

Check warning on line 89 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L33-L89

Added lines #L33 - L89 were not covered by tests

async fn calculate_stats(metrics: Vec<Metric>) -> Stats {
let mut total_latency = 0;
let metric_count = metrics.len();
for metric in metrics {
let created_at = SystemTime::UNIX_EPOCH + Duration::from_millis(metric.created_at as u64);
let received_at = metric.received_at;
let delta = received_at.duration_since(created_at).unwrap();
total_latency += delta.as_millis();
}

Check warning on line 99 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L91-L99

Added lines #L91 - L99 were not covered by tests

Stats {
average_latency: total_latency as f32 / metric_count as f32,
messages_received: metric_count,
}
}

Check warning on line 105 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L101-L105

Added lines #L101 - L105 were not covered by tests

async fn start_consumer(
environment: Arc<Environment>,
stream_name: &str,
sender: UnboundedSender<Metric>,
) {
let mut consumer = environment
.consumer()
.offset(OffsetSpecification::First)
.build(stream_name)
.await
.unwrap();
while let Some(Ok(delivery)) = consumer.next().await {
let produced_at = delivery
.message()
.data()
.map(|data| {
u128::from_be_bytes([
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15],
])
})
.unwrap();
let metric = Metric {
created_at: produced_at,
received_at: SystemTime::now(),
};
sender.send(metric).unwrap();

Check warning on line 133 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L107-L133

Added lines #L107 - L133 were not covered by tests
}
}

Check warning on line 135 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L135

Added line #L135 was not covered by tests

async fn start_producer(
environment: Arc<Environment>,
stream_name: &str,
produced_messages: &AtomicU32,
) {
let message_per_second = 50_usize;
let producer = environment.producer().build(stream_name).await.unwrap();

Check warning on line 143 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L137-L143

Added lines #L137 - L143 were not covered by tests

loop {
let start = Instant::now();
let messages = create_messages(message_per_second);
let messages_sent = messages.len() as u32;
for message in messages {
producer.send(message, |_| async {}).await.unwrap();

Check warning on line 150 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L146-L150

Added lines #L146 - L150 were not covered by tests
}
produced_messages.fetch_add(messages_sent, Ordering::Relaxed);

let elapsed = start.elapsed();

if ONE_SECOND > elapsed {
sleep(ONE_SECOND - elapsed).await;
}

Check warning on line 158 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L152-L158

Added lines #L152 - L158 were not covered by tests
}
}

fn create_messages(message_count_per_batch: usize) -> Vec<Message> {
(0..message_count_per_batch)
.map(|_| {
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
let since_the_epoch = since_the_epoch.as_millis();
Message::builder()
.body(since_the_epoch.to_be_bytes())
.build()
})
.collect()
}

Check warning on line 175 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L162-L175

Added lines #L162 - L175 were not covered by tests
2 changes: 1 addition & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@
M: FnOnce(u32) -> R,
{
let Some((correlation_id, mut receiver)) = self.dispatcher.response_channel() else {
trace!("Connection si closed here");
trace!("Connection is closed here");

Check warning on line 625 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L625

Added line #L625 was not covered by tests
return Err(ClientError::ConnectionClosed);
};

Expand Down
2 changes: 0 additions & 2 deletions src/environment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;

use crate::types::OffsetSpecification;
use crate::{client::TlsConfiguration, producer::NoDedup};
Expand Down Expand Up @@ -197,7 +196,6 @@ impl Environment {
environment: self.clone(),
name: None,
batch_size: 100,
batch_publishing_delay: Duration::from_millis(100),
data: PhantomData,
filter_value_extractor: None,
client_provided_name: String::from("rust-stream-producer"),
Expand Down
Loading
Loading