From e2e6a6e2a86f6258511e0a9fdd638b828fe670a4 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Wed, 29 May 2024 16:17:18 -0700 Subject: [PATCH] Add multi-threaded test for Counter (#1845) Co-authored-by: Cijo Thomas --- opentelemetry-sdk/src/metrics/mod.rs | 56 ++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 5e75b0c7c4..e342311c30 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -150,6 +150,7 @@ mod tests { use rand::{rngs, Rng, SeedableRng}; use std::borrow::Cow; use std::sync::{Arc, Mutex}; + use std::thread; // Run all tests in this mod // cargo test metrics::tests --features=metrics,testing @@ -1032,6 +1033,61 @@ mod tests { assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect."); } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn counter_multithreaded() { + // Run this test with stdout enabled to see output. + // cargo test counter_multithreaded --features=metrics,testing -- --nocapture + + counter_multithreaded_aggregation_helper(Temporality::Delta); + counter_multithreaded_aggregation_helper(Temporality::Cumulative); + } + + fn counter_multithreaded_aggregation_helper(temporality: Temporality) { + // Arrange + let mut test_context = TestContext::new(temporality); + let counter = Arc::new(test_context.u64_counter("test", "my_counter", None)); + + let mut update_threads = vec![]; + for _ in 0..10 { + let counter = Arc::clone(&counter); + + update_threads.push(thread::spawn(move || { + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + })); + } + + for thread in update_threads { + thread.join().unwrap(); + } + + test_context.flush_metrics(); + + // Assert + let sum = test_context.get_aggregation::>("my_counter", None); + // Expecting 2 time-series. + assert_eq!(sum.data_points.len(), 1); + assert!(sum.is_monotonic, "Counter should produce monotonic."); + assert_eq!(sum.temporality, temporality); + if let Temporality::Cumulative = temporality { + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); + } else { + assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); + } + + // find and validate key1=value2 datapoint + let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 50); // Each of the 10 update threads record measurements summing up to 5. + } + fn histogram_aggregation_helper(temporality: Temporality) { // Arrange let mut test_context = TestContext::new(temporality);