Skip to content

Commit

Permalink
Add multi-threaded test for Counter (open-telemetry#1845)
Browse files Browse the repository at this point in the history
Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
  • Loading branch information
utpilla and cijothomas authored May 29, 2024
1 parent 3825edc commit e2e6a6e
Showing 1 changed file with 56 additions and 0 deletions.
56 changes: 56 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ mod tests {
use rand::{rngs, Rng, SeedableRng};
use std::borrow::Cow;
use std::sync::{Arc, Mutex};
use std::thread;

// Run all tests in this mod
// cargo test metrics::tests --features=metrics,testing
Expand Down Expand Up @@ -1032,6 +1033,61 @@ mod tests {
assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_multithreaded() {
// Run this test with stdout enabled to see output.
// cargo test counter_multithreaded --features=metrics,testing -- --nocapture

counter_multithreaded_aggregation_helper(Temporality::Delta);
counter_multithreaded_aggregation_helper(Temporality::Cumulative);
}

fn counter_multithreaded_aggregation_helper(temporality: Temporality) {
// Arrange
let mut test_context = TestContext::new(temporality);
let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));

let mut update_threads = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);

update_threads.push(thread::spawn(move || {
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
}));
}

for thread in update_threads {
thread.join().unwrap();
}

test_context.flush_metrics();

// Assert
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
// Expecting 2 time-series.
assert_eq!(sum.data_points.len(), 1);
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(sum.temporality, temporality);
if let Temporality::Cumulative = temporality {
assert_eq!(
sum.temporality,
Temporality::Cumulative,
"Should produce cumulative"
);
} else {
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
}

// find and validate key1=value2 datapoint
let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected");
assert_eq!(data_point1.value, 50); // Each of the 10 update threads record measurements summing up to 5.
}

fn histogram_aggregation_helper(temporality: Temporality) {
// Arrange
let mut test_context = TestContext::new(temporality);
Expand Down

0 comments on commit e2e6a6e

Please sign in to comment.