Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[APR-205] dogstatsd: optimize multi-value distribution decoding #135

Merged
merged 3 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions lib/saluki-event/src/metric/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ impl MetricValue {
Self::Distribution { sketch }
}

/// Creates a distribution from values in an iterator.
pub fn distribution_from_iter<I, E>(iter: I) -> Result<Self, E>
where
I: Iterator<Item = Result<f64, E>>,
{
let mut sketch = DDSketch::default();
for value in iter {
sketch.insert(value?);
}
Ok(Self::Distribution { sketch })
}

/// Merges another metric value into this one.
///
/// If both `self` and `other` are the same metric type, their values will be merged appropriately. If the metric
Expand Down
61 changes: 56 additions & 5 deletions lib/saluki-io/src/deser/codec/dogstatsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,13 @@ impl<'a> ValueIter<'a> {

/// Returns the number of values in the iterator.
fn len(&self) -> usize {
if self.kind == ValueKind::Set || self.kind == ValueKind::Distribution {
// For sets, they can't be multi-value, so iterating over the value bytes is pointless. Likewise, for
// distributions, we take an optimization where we just read all of the values in one call and build a
// single distribution, so in both cases, we're only ever emitting a single metric.
return 1;
}

memchr::memchr_iter(b':', self.raw_values).count() + 1
}
}
Expand All @@ -699,6 +706,17 @@ impl<'a> Iterator for ValueIter<'a> {
return Some(value);
}

if self.kind == ValueKind::Distribution {
// For distributions, we try to optimize for when they're multi-value payloads by not generating one for
// each payload. Instead, we just create a lightweight iterator over the values here and then create the
// distribution in a single go.
let float_iter = FloatIter::new(self.raw_values);
let value = MetricValue::distribution_from_iter(float_iter);

self.raw_values = &[];
return Some(value);
}

// For all other metric types, we always parse the value as a double, so we do that first and then figure out
// what kind of `MetricValue` we need to emit.
let (raw_value, tail) = split_at_delimiter(self.raw_values, b':')?;
Expand All @@ -715,12 +733,42 @@ impl<'a> Iterator for ValueIter<'a> {
Some(Ok(match self.kind {
ValueKind::Counter => MetricValue::Counter { value },
ValueKind::Gauge => MetricValue::Gauge { value },
ValueKind::Distribution => MetricValue::distribution_from_value(value),
_ => unreachable!("set values should have been handled above"),
_ => unreachable!("set and distribution values should have been handled above"),
}))
}
}

struct FloatIter<'a> {
raw_values: &'a [u8],
}

impl<'a> FloatIter<'a> {
fn new(raw_values: &'a [u8]) -> Self {
Self { raw_values }
}
}

impl<'a> Iterator for FloatIter<'a> {
type Item = Result<f64, nom::Err<nom::error::Error<&'a [u8]>>>;

fn next(&mut self) -> Option<Self::Item> {
if self.raw_values.is_empty() {
return None;
}

let (raw_value, tail) = split_at_delimiter(self.raw_values, b':')?;
self.raw_values = tail;

// SAFETY: The caller that creates `ValueIter` is responsible for ensuring that the entire byte slice is valid
// UTF-8.
let value_s = unsafe { std::str::from_utf8_unchecked(raw_value) };
match value_s.parse::<f64>() {
Ok(value) => Some(Ok(value)),
Err(_) => Some(Err(nom::Err::Error(Error::new(raw_value, ErrorKind::Float)))),
}
}
}

/// Action to take for a given tag.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum InterceptAction {
Expand Down Expand Up @@ -929,8 +977,8 @@ mod tests {
values.iter().map(|value| gauge(name, *value)).collect()
}

fn distribution_multivalue(name: &str, values: &[f64]) -> Vec<Metric> {
values.iter().map(|value| distribution(name, *value)).collect()
fn distribution_multivalue(name: &str, values: &[f64]) -> Metric {
create_metric(name, MetricValue::distribution_from_values(values))
}

fn parse_dogstatsd_metric_with_default_config(input: &[u8]) -> IResult<&[u8], OneOrMany<Event>> {
Expand Down Expand Up @@ -1178,6 +1226,9 @@ mod tests {

// Special case where we check this for all three variants -- timers, histograms, and distributions -- since we
// treat them all the same when parsing.
//
// Additionally, we have an optimization to return a single distribution metric from multi-value payloads, so we
// also check here that only one metric is generated for multi-value timers/histograms/distributions.
let distribution_name = "my.distribution";
let distribution_values = [27.5, 4.20, 80.085];
let distribution_values_stringified = distribution_values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
Expand All @@ -1191,7 +1242,7 @@ mod tests {
let distributions_expected = distribution_multivalue(distribution_name, &distribution_values);
let (remaining, distributions_actual) =
parse_dogstatsd_metric_with_default_config(distribution_raw.as_bytes()).unwrap();
check_basic_metric_multivalue_eq(distributions_expected, distributions_actual);
check_basic_metric_eq(distributions_expected, distributions_actual);
assert!(remaining.is_empty());
}
}
Expand Down