diff --git a/lib/saluki-event/src/metric/value.rs b/lib/saluki-event/src/metric/value.rs index ad313b6a..9dcbdb5d 100644 --- a/lib/saluki-event/src/metric/value.rs +++ b/lib/saluki-event/src/metric/value.rs @@ -81,6 +81,18 @@ impl MetricValue { Self::Distribution { sketch } } + /// Creates a distribution from values in an iterator. + pub fn distribution_from_iter(iter: I) -> Result + where + I: Iterator>, + { + let mut sketch = DDSketch::default(); + for value in iter { + sketch.insert(value?); + } + Ok(Self::Distribution { sketch }) + } + /// Merges another metric value into this one. /// /// If both `self` and `other` are the same metric type, their values will be merged appropriately. If the metric diff --git a/lib/saluki-io/src/deser/codec/dogstatsd/mod.rs b/lib/saluki-io/src/deser/codec/dogstatsd/mod.rs index 511129a9..4df6ad42 100644 --- a/lib/saluki-io/src/deser/codec/dogstatsd/mod.rs +++ b/lib/saluki-io/src/deser/codec/dogstatsd/mod.rs @@ -673,6 +673,13 @@ impl<'a> ValueIter<'a> { /// Returns the number of values in the iterator. fn len(&self) -> usize { + if self.kind == ValueKind::Set || self.kind == ValueKind::Distribution { + // For sets, they can't be multi-value, so iterating over the value bytes is pointless. Likewise, for + // distributions, we take an optimization where we just read all of the values in one call and build a + // single distribution, so in both cases, we're only ever emitting a single metric. + return 1; + } + memchr::memchr_iter(b':', self.raw_values).count() + 1 } } @@ -699,6 +706,17 @@ impl<'a> Iterator for ValueIter<'a> { return Some(value); } + if self.kind == ValueKind::Distribution { + // For distributions, we try to optimize for when they're multi-value payloads by not generating one for + // each payload. Instead, we just create a lightweight iterator over the values here and then create the + // distribution in a single go. + let float_iter = FloatIter::new(self.raw_values); + let value = MetricValue::distribution_from_iter(float_iter); + + self.raw_values = &[]; + return Some(value); + } + // For all other metric types, we always parse the value as a double, so we do that first and then figure out // what kind of `MetricValue` we need to emit. let (raw_value, tail) = split_at_delimiter(self.raw_values, b':')?; @@ -715,12 +733,42 @@ impl<'a> Iterator for ValueIter<'a> { Some(Ok(match self.kind { ValueKind::Counter => MetricValue::Counter { value }, ValueKind::Gauge => MetricValue::Gauge { value }, - ValueKind::Distribution => MetricValue::distribution_from_value(value), - _ => unreachable!("set values should have been handled above"), + _ => unreachable!("set and distribution values should have been handled above"), })) } } +struct FloatIter<'a> { + raw_values: &'a [u8], +} + +impl<'a> FloatIter<'a> { + fn new(raw_values: &'a [u8]) -> Self { + Self { raw_values } + } +} + +impl<'a> Iterator for FloatIter<'a> { + type Item = Result>>; + + fn next(&mut self) -> Option { + if self.raw_values.is_empty() { + return None; + } + + let (raw_value, tail) = split_at_delimiter(self.raw_values, b':')?; + self.raw_values = tail; + + // SAFETY: The caller that creates `ValueIter` is responsible for ensuring that the entire byte slice is valid + // UTF-8. + let value_s = unsafe { std::str::from_utf8_unchecked(raw_value) }; + match value_s.parse::() { + Ok(value) => Some(Ok(value)), + Err(_) => Some(Err(nom::Err::Error(Error::new(raw_value, ErrorKind::Float)))), + } + } +} + /// Action to take for a given tag. #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub enum InterceptAction { @@ -929,8 +977,8 @@ mod tests { values.iter().map(|value| gauge(name, *value)).collect() } - fn distribution_multivalue(name: &str, values: &[f64]) -> Vec { - values.iter().map(|value| distribution(name, *value)).collect() + fn distribution_multivalue(name: &str, values: &[f64]) -> Metric { + create_metric(name, MetricValue::distribution_from_values(values)) } fn parse_dogstatsd_metric_with_default_config(input: &[u8]) -> IResult<&[u8], OneOrMany> { @@ -1178,6 +1226,9 @@ mod tests { // Special case where we check this for all three variants -- timers, histograms, and distributions -- since we // treat them all the same when parsing. + // + // Additionally, we have an optimization to return a single distribution metric from multi-value payloads, so we + // also check here that only one metric is generated for multi-value timers/histograms/distributions. let distribution_name = "my.distribution"; let distribution_values = [27.5, 4.20, 80.085]; let distribution_values_stringified = distribution_values.iter().map(|v| v.to_string()).collect::>(); @@ -1191,7 +1242,7 @@ mod tests { let distributions_expected = distribution_multivalue(distribution_name, &distribution_values); let (remaining, distributions_actual) = parse_dogstatsd_metric_with_default_config(distribution_raw.as_bytes()).unwrap(); - check_basic_metric_multivalue_eq(distributions_expected, distributions_actual); + check_basic_metric_eq(distributions_expected, distributions_actual); assert!(remaining.is_empty()); } }