Skip to content

Commit

Permalink
Merge branch 'main' into trace-set-resource
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyCpp authored May 30, 2024
2 parents 577a951 + e2e6a6e commit 5d0f0c0
Showing 1 changed file with 81 additions and 22 deletions.
103 changes: 81 additions & 22 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ mod tests {
use rand::{rngs, Rng, SeedableRng};
use std::borrow::Cow;
use std::sync::{Arc, Mutex};
use std::thread;

// Run all tests in this mod
// cargo test metrics::tests --features=metrics,testing
Expand Down Expand Up @@ -217,22 +218,25 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn updown_counter_aggregation_cumulative() {
// Run this test with stdout enabled to see output.
// cargo test counter_aggregation_cumulative --features=metrics,testing -- --nocapture
// cargo test updown_counter_aggregation_cumulative --features=metrics,testing -- --nocapture
updown_counter_aggregation_helper(Temporality::Cumulative);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn updown_counter_aggregation_delta() {
// Run this test with stdout enabled to see output.
// cargo test counter_aggregation_delta --features=metrics,testing -- --nocapture
// cargo test updown_counter_aggregation_delta --features=metrics,testing -- --nocapture
updown_counter_aggregation_helper(Temporality::Delta);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn gauge_aggregation() {
// Run this test with stdout enabled to see output.
// cargo test gauge_aggregation --features=metrics,testing -- --nocapture
gauge_aggregation_helper();

// Gauge should use last value aggregation regardless of the aggregation temporality used.
gauge_aggregation_helper(Temporality::Delta);
gauge_aggregation_helper(Temporality::Cumulative);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down Expand Up @@ -1029,6 +1033,61 @@ mod tests {
assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_multithreaded() {
// Run this test with stdout enabled to see output.
// cargo test counter_multithreaded --features=metrics,testing -- --nocapture

counter_multithreaded_aggregation_helper(Temporality::Delta);
counter_multithreaded_aggregation_helper(Temporality::Cumulative);
}

fn counter_multithreaded_aggregation_helper(temporality: Temporality) {
// Arrange
let mut test_context = TestContext::new(temporality);
let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));

let mut update_threads = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);

update_threads.push(thread::spawn(move || {
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
}));
}

for thread in update_threads {
thread.join().unwrap();
}

test_context.flush_metrics();

// Assert
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
// Expecting 2 time-series.
assert_eq!(sum.data_points.len(), 1);
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(sum.temporality, temporality);
if let Temporality::Cumulative = temporality {
assert_eq!(
sum.temporality,
Temporality::Cumulative,
"Should produce cumulative"
);
} else {
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
}

// find and validate key1=value2 datapoint
let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected");
assert_eq!(data_point1.value, 50); // Each of the 10 update threads record measurements summing up to 5.
}

fn histogram_aggregation_helper(temporality: Temporality) {
// Arrange
let mut test_context = TestContext::new(temporality);
Expand Down Expand Up @@ -1134,9 +1193,9 @@ mod tests {
}
}

fn gauge_aggregation_helper() {
fn gauge_aggregation_helper(temporality: Temporality) {
// Arrange
let mut test_context = TestContext::new(Temporality::Delta);
let mut test_context = TestContext::new(temporality);
let gauge = test_context.meter().i64_gauge("my_gauge").init();

// Act
Expand Down Expand Up @@ -1274,15 +1333,15 @@ mod tests {
let counter = test_context.i64_up_down_counter("test", "my_updown_counter", None);

// Act
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(10, &[KeyValue::new("key1", "value1")]);
counter.add(-1, &[KeyValue::new("key1", "value1")]);
counter.add(-5, &[KeyValue::new("key1", "value1")]);
counter.add(0, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);

counter.add(1, &[KeyValue::new("key1", "value2")]);
counter.add(1, &[KeyValue::new("key1", "value2")]);
counter.add(1, &[KeyValue::new("key1", "value2")]);
counter.add(10, &[KeyValue::new("key1", "value2")]);
counter.add(0, &[KeyValue::new("key1", "value2")]);
counter.add(-3, &[KeyValue::new("key1", "value2")]);

test_context.flush_metrics();

Expand Down Expand Up @@ -1311,19 +1370,19 @@ mod tests {

let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
.expect("datapoint with key1=value2 expected");
assert_eq!(data_point1.value, 3);
assert_eq!(data_point1.value, 7);

// Reset and report more measurements
test_context.reset_metrics();
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(10, &[KeyValue::new("key1", "value1")]);
counter.add(-1, &[KeyValue::new("key1", "value1")]);
counter.add(-5, &[KeyValue::new("key1", "value1")]);
counter.add(0, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);

counter.add(1, &[KeyValue::new("key1", "value2")]);
counter.add(1, &[KeyValue::new("key1", "value2")]);
counter.add(1, &[KeyValue::new("key1", "value2")]);
counter.add(10, &[KeyValue::new("key1", "value2")]);
counter.add(0, &[KeyValue::new("key1", "value2")]);
counter.add(-3, &[KeyValue::new("key1", "value2")]);

test_context.flush_metrics();

Expand All @@ -1340,9 +1399,9 @@ mod tests {
let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
.expect("datapoint with key1=value2 expected");
if temporality == Temporality::Cumulative {
assert_eq!(data_point1.value, 6);
assert_eq!(data_point1.value, 14);
} else {
assert_eq!(data_point1.value, 3);
assert_eq!(data_point1.value, 7);
}
}

Expand Down

0 comments on commit 5d0f0c0

Please sign in to comment.