diff --git a/.cspell.json b/.cspell.json index a091c48706..47fc5d18e8 100644 --- a/.cspell.json +++ b/.cspell.json @@ -32,8 +32,10 @@ "Cijo", "clippy", "codecov", + "datapoint", "deque", "Dirkjan", + "EPYC", "hasher", "isahc", "Isobel", @@ -45,6 +47,7 @@ "msrv", "mykey", "myvalue", + "nocapture", "Ochtman", "opentelemetry", "OTLP", diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index e7a51213b0..ca5076763d 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -145,9 +145,10 @@ mod tests { use crate::metrics::reader::TemporalitySelector; use crate::testing::metrics::InMemoryMetricsExporterBuilder; use crate::{runtime, testing::metrics::InMemoryMetricsExporter}; - use opentelemetry::metrics::{Counter, UpDownCounter}; + use opentelemetry::metrics::{Counter, Meter, UpDownCounter}; use opentelemetry::{metrics::MeterProvider as _, KeyValue}; use std::borrow::Cow; + use std::sync::{Arc, Mutex}; // Run all tests in this mod // cargo test metrics::tests --features=metrics,testing @@ -213,86 +214,94 @@ mod tests { } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn observable_counter_aggregation() { + async fn observable_counter_aggregation_cumulative_non_zero_increment() { // Run this test with stdout enabled to see output. 
- // cargo test observable_counter_aggregation --features=metrics,testing -- --nocapture + // cargo test observable_counter_aggregation_cumulative_non_zero_increment --features=testing -- --nocapture + observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4); + } - // Arrange - let exporter = InMemoryMetricsExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); - let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_counter_aggregation_delta_non_zero_increment() { + // Run this test with stdout enabled to see output. + // cargo test observable_counter_aggregation_delta_non_zero_increment --features=testing -- --nocapture + observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4); + } - // Act - let meter = meter_provider.meter("test"); - let _counter = meter + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_counter_aggregation_cumulative_zero_increment() { + // Run this test with stdout enabled to see output. + // cargo test observable_counter_aggregation_cumulative_zero_increment --features=testing -- --nocapture + observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + #[ignore = "Aggregation bug! https://github.com/open-telemetry/opentelemetry-rust/issues/1517"] + async fn observable_counter_aggregation_delta_zero_increment() { + // Run this test with stdout enabled to see output. 
+ // cargo test observable_counter_aggregation_delta_zero_increment --features=testing -- --nocapture + observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4); + } + + fn observable_counter_aggregation_helper( + temporality: Temporality, + start: u64, + increment: u64, + length: u64, + ) { + // Arrange + let mut test_context = TestContext::new(temporality); + // The Observable counter reports values[0], values[1],....values[n] on each flush. + let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect(); + println!("Testing with observable values: {:?}", values); + let values = Arc::new(values); + let values_clone = values.clone(); + let i = Arc::new(Mutex::new(0)); + let _observable_counter = test_context + .meter() .u64_observable_counter("my_observable_counter") .with_unit("my_unit") - .with_callback(|observer| { - observer.observe(100, &[KeyValue::new("key1", "value1")]); - observer.observe(200, &[KeyValue::new("key1", "value2")]); + .with_callback(move |observer| { + let mut index = i.lock().unwrap(); + if *index < values.len() { + observer.observe(values[*index], &[KeyValue::new("key1", "value1")]); + *index += 1; + } }) .init(); - meter_provider.force_flush().unwrap(); - - // Assert - let resource_metrics = exporter - .get_finished_metrics() - .expect("metrics are expected to be exported."); - assert!(!resource_metrics.is_empty()); - let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; - assert_eq!(metric.name, "my_observable_counter"); - assert_eq!(metric.unit, "my_unit"); - let sum = metric - .data - .as_any() - .downcast_ref::<data::Sum<u64>>() - .expect("Sum aggregation expected for ObservableCounter instruments by default"); - - // Expecting 2 time-series. - assert_eq!(sum.data_points.len(), 2); - assert!(sum.is_monotonic, "Counter should produce monotonic."); - assert_eq!( - sum.temporality, - data::Temporality::Cumulative, - "Should produce cumulative by default."
- ); - - // find and validate key1=value1 datapoint - let mut data_point1 = None; - for datapoint in &sum.data_points { - if datapoint - .attributes - .iter() - .any(|kv| kv.key.as_str() == "key1" && kv.value.as_str() == "value1") - { - data_point1 = Some(datapoint); + for (iter, v) in values_clone.iter().enumerate() { + test_context.flush_metrics(); + let sum = test_context.get_aggregation::<data::Sum<u64>>("my_observable_counter", None); + assert_eq!(sum.data_points.len(), 1); + assert!(sum.is_monotonic, "Counter should produce monotonic."); + if let Temporality::Cumulative = temporality { + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); + } else { + assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); } - } - assert_eq!( - data_point1 - .expect("datapoint with key1=value1 expected") - .value, - 100 - ); - // find and validate key1=value2 datapoint - let mut data_point1 = None; - for datapoint in &sum.data_points { - if datapoint - .attributes - .iter() - .any(|kv| kv.key.as_str() == "key1" && kv.value.as_str() == "value1") - { - data_point1 = Some(datapoint); + // find and validate key1=value1 datapoint + let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + if let Temporality::Cumulative = temporality { + // Cumulative counter should have the value as is. + assert_eq!(data_point.value, *v); + } else { + // Delta counter should have the increment value. + // Except for the first value which should be the start value.
+ if iter == 0 { + assert_eq!(data_point.value, start); + } else { + assert_eq!(data_point.value, increment); + } } + + test_context.reset_metrics(); } - assert_eq!( - data_point1 - .expect("datapoint with key1=value2 expected") - .value, - 200 - ); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -566,7 +575,6 @@ mod tests { } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - // #[ignore = "Spatial aggregation is not yet implemented."] async fn spatial_aggregation_when_view_drops_attributes_observable_counter() { // cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=metrics,testing @@ -1169,6 +1177,10 @@ mod tests { updown_counter_builder.init() } + fn meter(&self) -> Meter { + self.meter_provider.meter("test") + } + fn flush_metrics(&self) { self.meter_provider.force_flush().unwrap(); }