diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 94ab2345f6..e342311c30 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -150,6 +150,7 @@ mod tests { use rand::{rngs, Rng, SeedableRng}; use std::borrow::Cow; use std::sync::{Arc, Mutex}; + use std::thread; // Run all tests in this mod // cargo test metrics::tests --features=metrics,testing @@ -217,14 +218,14 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn updown_counter_aggregation_cumulative() { // Run this test with stdout enabled to see output. - // cargo test counter_aggregation_cumulative --features=metrics,testing -- --nocapture + // cargo test updown_counter_aggregation_cumulative --features=metrics,testing -- --nocapture updown_counter_aggregation_helper(Temporality::Cumulative); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn updown_counter_aggregation_delta() { // Run this test with stdout enabled to see output. - // cargo test counter_aggregation_delta --features=metrics,testing -- --nocapture + // cargo test updown_counter_aggregation_delta --features=metrics,testing -- --nocapture updown_counter_aggregation_helper(Temporality::Delta); } @@ -232,7 +233,10 @@ mod tests { async fn gauge_aggregation() { // Run this test with stdout enabled to see output. // cargo test gauge_aggregation --features=metrics,testing -- --nocapture - gauge_aggregation_helper(); + + // Gauge should use last value aggregation regardless of the aggregation temporality used. + gauge_aggregation_helper(Temporality::Delta); + gauge_aggregation_helper(Temporality::Cumulative); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -1029,6 +1033,61 @@ mod tests { assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect."); } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn counter_multithreaded() { + // Run this test with stdout enabled to see output. + // cargo test counter_multithreaded --features=metrics,testing -- --nocapture + + counter_multithreaded_aggregation_helper(Temporality::Delta); + counter_multithreaded_aggregation_helper(Temporality::Cumulative); + } + + fn counter_multithreaded_aggregation_helper(temporality: Temporality) { + // Arrange + let mut test_context = TestContext::new(temporality); + let counter = Arc::new(test_context.u64_counter("test", "my_counter", None)); + + let mut update_threads = vec![]; + for _ in 0..10 { + let counter = Arc::clone(&counter); + + update_threads.push(thread::spawn(move || { + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + })); + } + + for thread in update_threads { + thread.join().unwrap(); + } + + test_context.flush_metrics(); + + // Assert + let sum = test_context.get_aggregation::>("my_counter", None); + // Expecting 2 time-series. + assert_eq!(sum.data_points.len(), 1); + assert!(sum.is_monotonic, "Counter should produce monotonic."); + assert_eq!(sum.temporality, temporality); + if let Temporality::Cumulative = temporality { + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); + } else { + assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); + } + + // find and validate key1=value2 datapoint + let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 50); // Each of the 10 update threads record measurements summing up to 5. + } + fn histogram_aggregation_helper(temporality: Temporality) { // Arrange let mut test_context = TestContext::new(temporality); @@ -1134,9 +1193,9 @@ mod tests { } } - fn gauge_aggregation_helper() { + fn gauge_aggregation_helper(temporality: Temporality) { // Arrange - let mut test_context = TestContext::new(Temporality::Delta); + let mut test_context = TestContext::new(temporality); let gauge = test_context.meter().i64_gauge("my_gauge").init(); // Act @@ -1274,15 +1333,15 @@ mod tests { let counter = test_context.i64_up_down_counter("test", "my_updown_counter", None); // Act - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(10, &[KeyValue::new("key1", "value1")]); + counter.add(-1, &[KeyValue::new("key1", "value1")]); + counter.add(-5, &[KeyValue::new("key1", "value1")]); + counter.add(0, &[KeyValue::new("key1", "value1")]); counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value2")]); - counter.add(1, &[KeyValue::new("key1", "value2")]); - counter.add(1, &[KeyValue::new("key1", "value2")]); + counter.add(10, &[KeyValue::new("key1", "value2")]); + counter.add(0, &[KeyValue::new("key1", "value2")]); + counter.add(-3, &[KeyValue::new("key1", "value2")]); test_context.flush_metrics(); @@ -1311,19 +1370,19 @@ mod tests { let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2") .expect("datapoint with key1=value2 expected"); - assert_eq!(data_point1.value, 3); + assert_eq!(data_point1.value, 7); // Reset and report more measurements test_context.reset_metrics(); - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(10, &[KeyValue::new("key1", "value1")]); + counter.add(-1, &[KeyValue::new("key1", "value1")]); + counter.add(-5, &[KeyValue::new("key1", "value1")]); + counter.add(0, &[KeyValue::new("key1", "value1")]); counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value2")]); - counter.add(1, &[KeyValue::new("key1", "value2")]); - counter.add(1, &[KeyValue::new("key1", "value2")]); + counter.add(10, &[KeyValue::new("key1", "value2")]); + counter.add(0, &[KeyValue::new("key1", "value2")]); + counter.add(-3, &[KeyValue::new("key1", "value2")]); test_context.flush_metrics(); @@ -1340,9 +1399,9 @@ mod tests { let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2") .expect("datapoint with key1=value2 expected"); if temporality == Temporality::Cumulative { - assert_eq!(data_point1.value, 6); + assert_eq!(data_point1.value, 14); } else { - assert_eq!(data_point1.value, 3); + assert_eq!(data_point1.value, 7); } }