Skip to content

Commit

Permalink
[APR-205] dogstatsd: optimize multi-value distribution decoding (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz authored Jul 31, 2024
1 parent 464a24a commit 90312b4
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 5 deletions.
12 changes: 12 additions & 0 deletions lib/saluki-event/src/metric/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ impl MetricValue {
Self::Distribution { sketch }
}

/// Creates a distribution from values in an iterator.
pub fn distribution_from_iter<I, E>(iter: I) -> Result<Self, E>
where
I: Iterator<Item = Result<f64, E>>,
{
let mut sketch = DDSketch::default();
for value in iter {
sketch.insert(value?);
}
Ok(Self::Distribution { sketch })
}

/// Merges another metric value into this one.
///
/// If both `self` and `other` are the same metric type, their values will be merged appropriately. If the metric
Expand Down
61 changes: 56 additions & 5 deletions lib/saluki-io/src/deser/codec/dogstatsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,13 @@ impl<'a> ValueIter<'a> {

/// Returns the number of values in the iterator.
fn len(&self) -> usize {
if self.kind == ValueKind::Set || self.kind == ValueKind::Distribution {
// For sets, they can't be multi-value, so iterating over the value bytes is pointless. Likewise, for
// distributions, we take an optimization where we just read all of the values in one call and build a
// single distribution, so in both cases, we're only ever emitting a single metric.
return 1;
}

memchr::memchr_iter(b':', self.raw_values).count() + 1
}
}
Expand All @@ -699,6 +706,17 @@ impl<'a> Iterator for ValueIter<'a> {
return Some(value);
}

if self.kind == ValueKind::Distribution {
// For distributions, we try to optimize for when they're multi-value payloads by not generating one for
// each payload. Instead, we just create a lightweight iterator over the values here and then create the
// distribution in a single go.
let float_iter = FloatIter::new(self.raw_values);
let value = MetricValue::distribution_from_iter(float_iter);

self.raw_values = &[];
return Some(value);
}

// For all other metric types, we always parse the value as a double, so we do that first and then figure out
// what kind of `MetricValue` we need to emit.
let (raw_value, tail) = split_at_delimiter(self.raw_values, b':')?;
Expand All @@ -715,12 +733,42 @@ impl<'a> Iterator for ValueIter<'a> {
Some(Ok(match self.kind {
ValueKind::Counter => MetricValue::Counter { value },
ValueKind::Gauge => MetricValue::Gauge { value },
ValueKind::Distribution => MetricValue::distribution_from_value(value),
_ => unreachable!("set values should have been handled above"),
_ => unreachable!("set and distribution values should have been handled above"),
}))
}
}

struct FloatIter<'a> {
raw_values: &'a [u8],
}

impl<'a> FloatIter<'a> {
fn new(raw_values: &'a [u8]) -> Self {
Self { raw_values }
}
}

impl<'a> Iterator for FloatIter<'a> {
type Item = Result<f64, nom::Err<nom::error::Error<&'a [u8]>>>;

fn next(&mut self) -> Option<Self::Item> {
if self.raw_values.is_empty() {
return None;
}

let (raw_value, tail) = split_at_delimiter(self.raw_values, b':')?;
self.raw_values = tail;

// SAFETY: The caller that creates `ValueIter` is responsible for ensuring that the entire byte slice is valid
// UTF-8.
let value_s = unsafe { std::str::from_utf8_unchecked(raw_value) };
match value_s.parse::<f64>() {
Ok(value) => Some(Ok(value)),
Err(_) => Some(Err(nom::Err::Error(Error::new(raw_value, ErrorKind::Float)))),
}
}
}

/// Action to take for a given tag.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum InterceptAction {
Expand Down Expand Up @@ -929,8 +977,8 @@ mod tests {
values.iter().map(|value| gauge(name, *value)).collect()
}

fn distribution_multivalue(name: &str, values: &[f64]) -> Vec<Metric> {
values.iter().map(|value| distribution(name, *value)).collect()
fn distribution_multivalue(name: &str, values: &[f64]) -> Metric {
create_metric(name, MetricValue::distribution_from_values(values))
}

fn parse_dogstatsd_metric_with_default_config(input: &[u8]) -> IResult<&[u8], OneOrMany<Event>> {
Expand Down Expand Up @@ -1178,6 +1226,9 @@ mod tests {

// Special case where we check this for all three variants -- timers, histograms, and distributions -- since we
// treat them all the same when parsing.
//
// Additionally, we have an optimization to return a single distribution metric from multi-value payloads, so we
// also check here that only one metric is generated for multi-value timers/histograms/distributions.
let distribution_name = "my.distribution";
let distribution_values = [27.5, 4.20, 80.085];
let distribution_values_stringified = distribution_values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
Expand All @@ -1191,7 +1242,7 @@ mod tests {
let distributions_expected = distribution_multivalue(distribution_name, &distribution_values);
let (remaining, distributions_actual) =
parse_dogstatsd_metric_with_default_config(distribution_raw.as_bytes()).unwrap();
check_basic_metric_multivalue_eq(distributions_expected, distributions_actual);
check_basic_metric_eq(distributions_expected, distributions_actual);
assert!(remaining.is_empty());
}
}
Expand Down

0 comments on commit 90312b4

Please sign in to comment.