diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index cf0edeb47c..84d0735053 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -5,12 +5,136 @@ mod last_value; mod sum; use core::fmt; +use std::collections::HashMap; +use std::marker::PhantomData; use std::ops::{Add, AddAssign, Sub}; -use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; -use std::sync::Mutex; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; +use aggregate::is_under_cardinality_limit; pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure}; pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; +use once_cell::sync::Lazy; +use opentelemetry::metrics::MetricsError; +use opentelemetry::{global, KeyValue}; + +use crate::metrics::AttributeSet; + +pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = + Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); + +/// Abstracts the update operation for a measurement. +pub(crate) trait Operation { + fn update_tracker>(tracker: &AT, value: T); +} + +struct Increment; + +impl Operation for Increment { + fn update_tracker>(tracker: &AT, value: T) { + tracker.add(value); + } +} + +struct Assign; + +impl Operation for Assign { + fn update_tracker>(tracker: &AT, value: T) { + tracker.store(value); + } +} + +/// The storage for sums. +/// +/// This structure is parametrized by an `Operation` that indicates how +/// updates to the underlying value trackers should be performed. +pub(crate) struct ValueMap, O> { + /// Trackers store the values associated with different attribute sets. + trackers: RwLock, Arc>>, + /// Number of different attribute set stored in the `trackers` map. + count: AtomicUsize, + /// Indicates whether a value with no attributes has been stored. + has_no_attribute_value: AtomicBool, + /// Tracker for values with no attributes attached. + no_attribute_tracker: T::AtomicTracker, + phantom: PhantomData, +} + +impl, O> Default for ValueMap { + fn default() -> Self { + ValueMap::new() + } +} + +impl, O> ValueMap { + fn new() -> Self { + ValueMap { + trackers: RwLock::new(HashMap::new()), + has_no_attribute_value: AtomicBool::new(false), + no_attribute_tracker: T::new_atomic_tracker(), + count: AtomicUsize::new(0), + phantom: PhantomData, + } + } +} + +impl, O: Operation> ValueMap { + fn measure(&self, measurement: T, attributes: &[KeyValue]) { + if attributes.is_empty() { + O::update_tracker(&self.no_attribute_tracker, measurement); + self.has_no_attribute_value.store(true, Ordering::Release); + return; + } + + let Ok(trackers) = self.trackers.read() else { + return; + }; + + // Try to retrieve and update the tracker with the attributes in the provided order first + if let Some(tracker) = trackers.get(attributes) { + O::update_tracker(&**tracker, measurement); + return; + } + + // Try to retrieve and update the tracker with the attributes sorted. + let sorted_attrs = AttributeSet::from(attributes).into_vec(); + if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + O::update_tracker(&**tracker, measurement); + return; + } + + // Give up the read lock before acquiring the write lock. + drop(trackers); + + let Ok(mut trackers) = self.trackers.write() else { + return; + }; + + // Recheck both the provided and sorted orders after acquiring the write lock + // in case another thread has pushed an update in the meantime. + if let Some(tracker) = trackers.get(attributes) { + O::update_tracker(&**tracker, measurement); + } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + O::update_tracker(&**tracker, measurement); + } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { + let new_tracker = Arc::new(T::new_atomic_tracker()); + O::update_tracker(&*new_tracker, measurement); + + // Insert tracker with the attributes in the provided and sorted orders + trackers.insert(attributes.to_vec(), new_tracker.clone()); + trackers.insert(sorted_attrs, new_tracker); + + self.count.fetch_add(1, Ordering::SeqCst); + } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { + O::update_tracker(&**overflow_value, measurement); + } else { + let new_tracker = T::new_atomic_tracker(); + O::update_tracker(&new_tracker, measurement); + trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker)); + global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into())); + } + } +} /// Marks a type that can have a value added and retrieved atomically. Required since /// different types have different backing atomic mechanisms diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 1ed76fdae9..36108c86dc 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,137 +1,14 @@ use std::collections::HashSet; -use std::marker::PhantomData; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::vec; -use std::{ - collections::HashMap, - sync::{Mutex, RwLock}, - time::SystemTime, -}; +use std::{collections::HashMap, sync::Mutex, time::SystemTime}; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; -use crate::metrics::AttributeSet; -use once_cell::sync::Lazy; use opentelemetry::KeyValue; -use opentelemetry::{global, metrics::MetricsError}; -use super::{aggregate::is_under_cardinality_limit, AtomicTracker, Number}; - -pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = - Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); - -/// Abstracts the update operation for a measurement. -trait Operation { - fn update_tracker>(tracker: &AT, value: T); -} - -struct Increment; - -impl Operation for Increment { - fn update_tracker>(tracker: &AT, value: T) { - tracker.add(value); - } -} - -struct Assign; - -impl Operation for Assign { - fn update_tracker>(tracker: &AT, value: T) { - tracker.store(value); - } -} - -/// The storage for sums. -/// -/// This structure is parametrized by an `Operation` that indicates how -/// updates to the underlying value trackers should be performed. -struct ValueMap, O> { - /// Trackers store the values associated with different attribute sets. - trackers: RwLock, Arc>>, - /// Number of different attribute set stored in the `trackers` map. - count: AtomicUsize, - /// Indicates whether a value with no attributes has been stored. - has_no_attribute_value: AtomicBool, - /// Tracker for values with no attributes attached. - no_attribute_tracker: T::AtomicTracker, - phantom: PhantomData, -} - -impl, O> Default for ValueMap { - fn default() -> Self { - ValueMap::new() - } -} - -impl, O> ValueMap { - fn new() -> Self { - ValueMap { - trackers: RwLock::new(HashMap::new()), - has_no_attribute_value: AtomicBool::new(false), - no_attribute_tracker: T::new_atomic_tracker(), - count: AtomicUsize::new(0), - phantom: PhantomData, - } - } -} - -impl, O: Operation> ValueMap { - fn measure(&self, measurement: T, attributes: &[KeyValue]) { - if attributes.is_empty() { - O::update_tracker(&self.no_attribute_tracker, measurement); - self.has_no_attribute_value.store(true, Ordering::Release); - return; - } - - let Ok(trackers) = self.trackers.read() else { - return; - }; - - // Try to retrieve and update the tracker with the attributes in the provided order first - if let Some(tracker) = trackers.get(attributes) { - O::update_tracker(&**tracker, measurement); - return; - } - - // Try to retrieve and update the tracker with the attributes sorted. - let sorted_attrs = AttributeSet::from(attributes).into_vec(); - if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - O::update_tracker(&**tracker, measurement); - return; - } - - // Give up the read lock before acquiring the write lock. - drop(trackers); - - let Ok(mut trackers) = self.trackers.write() else { - return; - }; - - // Recheck both the provided and sorted orders after acquiring the write lock - // in case another thread has pushed an update in the meantime. - if let Some(tracker) = trackers.get(attributes) { - O::update_tracker(&**tracker, measurement); - } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - O::update_tracker(&**tracker, measurement); - } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { - let new_tracker = Arc::new(T::new_atomic_tracker()); - O::update_tracker(&*new_tracker, measurement); - - // Insert tracker with the attributes in the provided and sorted orders - trackers.insert(attributes.to_vec(), new_tracker.clone()); - trackers.insert(sorted_attrs, new_tracker); - - self.count.fetch_add(1, Ordering::SeqCst); - } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { - O::update_tracker(&**overflow_value, measurement); - } else { - let new_tracker = T::new_atomic_tracker(); - O::update_tracker(&new_tracker, measurement); - trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker)); - global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into())); - } - } -} +use super::{Assign, Increment, ValueMap}; +use super::{AtomicTracker, Number}; /// Summarizes a set of measurements made as their arithmetic sum. pub(crate) struct Sum> {