Skip to content

Commit

Permalink
Move ValueMap to mod file to allow for code reuse (open-telemetry#2012)
Browse files Browse the repository at this point in the history
  • Loading branch information
utpilla authored Aug 12, 2024
1 parent ed82d78 commit d583695
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 129 deletions.
128 changes: 126 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,136 @@ mod last_value;
mod sum;

use core::fmt;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::{Add, AddAssign, Sub};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};

use aggregate::is_under_cardinality_limit;
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use once_cell::sync::Lazy;
use opentelemetry::metrics::MetricsError;
use opentelemetry::{global, KeyValue};

use crate::metrics::AttributeSet;

pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);

/// Abstracts the update operation for a measurement.
pub(crate) trait Operation {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T);
}

struct Increment;

impl Operation for Increment {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
tracker.add(value);
}
}

struct Assign;

impl Operation for Assign {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
tracker.store(value);
}
}

/// The storage for sums.
///
/// This structure is parametrized by an `Operation` that indicates how
/// updates to the underlying value trackers should be performed.
pub(crate) struct ValueMap<T: Number<T>, O> {
/// Trackers store the values associated with different attribute sets.
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<T::AtomicTracker>>>,
/// Number of different attribute set stored in the `trackers` map.
count: AtomicUsize,
/// Indicates whether a value with no attributes has been stored.
has_no_attribute_value: AtomicBool,
/// Tracker for values with no attributes attached.
no_attribute_tracker: T::AtomicTracker,
phantom: PhantomData<O>,
}

impl<T: Number<T>, O> Default for ValueMap<T, O> {
fn default() -> Self {
ValueMap::new()
}
}

impl<T: Number<T>, O> ValueMap<T, O> {
fn new() -> Self {
ValueMap {
trackers: RwLock::new(HashMap::new()),
has_no_attribute_value: AtomicBool::new(false),
no_attribute_tracker: T::new_atomic_tracker(),
count: AtomicUsize::new(0),
phantom: PhantomData,
}
}
}

impl<T: Number<T>, O: Operation> ValueMap<T, O> {
fn measure(&self, measurement: T, attributes: &[KeyValue]) {
if attributes.is_empty() {
O::update_tracker(&self.no_attribute_tracker, measurement);
self.has_no_attribute_value.store(true, Ordering::Release);
return;
}

let Ok(trackers) = self.trackers.read() else {
return;
};

// Try to retrieve and update the tracker with the attributes in the provided order first
if let Some(tracker) = trackers.get(attributes) {
O::update_tracker(&**tracker, measurement);
return;
}

// Try to retrieve and update the tracker with the attributes sorted.
let sorted_attrs = AttributeSet::from(attributes).into_vec();
if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
O::update_tracker(&**tracker, measurement);
return;
}

// Give up the read lock before acquiring the write lock.
drop(trackers);

let Ok(mut trackers) = self.trackers.write() else {
return;
};

// Recheck both the provided and sorted orders after acquiring the write lock
// in case another thread has pushed an update in the meantime.
if let Some(tracker) = trackers.get(attributes) {
O::update_tracker(&**tracker, measurement);
} else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
O::update_tracker(&**tracker, measurement);
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
let new_tracker = Arc::new(T::new_atomic_tracker());
O::update_tracker(&*new_tracker, measurement);

// Insert tracker with the attributes in the provided and sorted orders
trackers.insert(attributes.to_vec(), new_tracker.clone());
trackers.insert(sorted_attrs, new_tracker);

self.count.fetch_add(1, Ordering::SeqCst);
} else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
O::update_tracker(&**overflow_value, measurement);
} else {
let new_tracker = T::new_atomic_tracker();
O::update_tracker(&new_tracker, measurement);
trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
}
}
}

/// Marks a type that can have a value added and retrieved atomically. Required since
/// different types have different backing atomic mechanisms
Expand Down
131 changes: 4 additions & 127 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
Original file line number Diff line number Diff line change
@@ -1,137 +1,14 @@
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::vec;
use std::{
collections::HashMap,
sync::{Mutex, RwLock},
time::SystemTime,
};
use std::{collections::HashMap, sync::Mutex, time::SystemTime};

use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
use crate::metrics::AttributeSet;
use once_cell::sync::Lazy;
use opentelemetry::KeyValue;
use opentelemetry::{global, metrics::MetricsError};

use super::{aggregate::is_under_cardinality_limit, AtomicTracker, Number};

pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);

/// Abstracts the update operation for a measurement.
trait Operation {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T);
}

struct Increment;

impl Operation for Increment {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
tracker.add(value);
}
}

struct Assign;

impl Operation for Assign {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
tracker.store(value);
}
}

/// The storage for sums.
///
/// This structure is parametrized by an `Operation` that indicates how
/// updates to the underlying value trackers should be performed.
struct ValueMap<T: Number<T>, O> {
/// Trackers store the values associated with different attribute sets.
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<T::AtomicTracker>>>,
/// Number of different attribute set stored in the `trackers` map.
count: AtomicUsize,
/// Indicates whether a value with no attributes has been stored.
has_no_attribute_value: AtomicBool,
/// Tracker for values with no attributes attached.
no_attribute_tracker: T::AtomicTracker,
phantom: PhantomData<O>,
}

impl<T: Number<T>, O> Default for ValueMap<T, O> {
fn default() -> Self {
ValueMap::new()
}
}

impl<T: Number<T>, O> ValueMap<T, O> {
fn new() -> Self {
ValueMap {
trackers: RwLock::new(HashMap::new()),
has_no_attribute_value: AtomicBool::new(false),
no_attribute_tracker: T::new_atomic_tracker(),
count: AtomicUsize::new(0),
phantom: PhantomData,
}
}
}

impl<T: Number<T>, O: Operation> ValueMap<T, O> {
fn measure(&self, measurement: T, attributes: &[KeyValue]) {
if attributes.is_empty() {
O::update_tracker(&self.no_attribute_tracker, measurement);
self.has_no_attribute_value.store(true, Ordering::Release);
return;
}

let Ok(trackers) = self.trackers.read() else {
return;
};

// Try to retrieve and update the tracker with the attributes in the provided order first
if let Some(tracker) = trackers.get(attributes) {
O::update_tracker(&**tracker, measurement);
return;
}

// Try to retrieve and update the tracker with the attributes sorted.
let sorted_attrs = AttributeSet::from(attributes).into_vec();
if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
O::update_tracker(&**tracker, measurement);
return;
}

// Give up the read lock before acquiring the write lock.
drop(trackers);

let Ok(mut trackers) = self.trackers.write() else {
return;
};

// Recheck both the provided and sorted orders after acquiring the write lock
// in case another thread has pushed an update in the meantime.
if let Some(tracker) = trackers.get(attributes) {
O::update_tracker(&**tracker, measurement);
} else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
O::update_tracker(&**tracker, measurement);
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
let new_tracker = Arc::new(T::new_atomic_tracker());
O::update_tracker(&*new_tracker, measurement);

// Insert tracker with the attributes in the provided and sorted orders
trackers.insert(attributes.to_vec(), new_tracker.clone());
trackers.insert(sorted_attrs, new_tracker);

self.count.fetch_add(1, Ordering::SeqCst);
} else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
O::update_tracker(&**overflow_value, measurement);
} else {
let new_tracker = T::new_atomic_tracker();
O::update_tracker(&new_tracker, measurement);
trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
}
}
}
use super::{Assign, Increment, ValueMap};
use super::{AtomicTracker, Number};

/// Summarizes a set of measurements made as their arithmetic sum.
pub(crate) struct Sum<T: Number<T>> {
Expand Down

0 comments on commit d583695

Please sign in to comment.