Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[APR-205] dogstatsd: add support for a configurable tag interceptor for filtering/augmenting metadata via tags #132

Merged
merged 2 commits into from
Jul 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 200 additions & 9 deletions lib/saluki-io/src/deser/codec/dogstatsd.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashSet;

use bytes::Buf;
use nom::{
branch::alt,
bytes::complete::{tag, take_while1},
Expand All @@ -18,7 +17,7 @@ use snafu::Snafu;
use saluki_core::topology::interconnect::EventBuffer;
use saluki_event::{metric::*, Event};

use crate::deser::Decoder;
use crate::{buf::ReadIoBuffer, deser::Decoder};

type NomParserError<'a> = nom::Err<nom::error::Error<&'a [u8]>>;

Expand Down Expand Up @@ -95,22 +94,26 @@ impl Default for DogstatsdCodecConfiguration {
///
/// [dsd]: https://docs.datadoghq.com/developers/dogstatsd/
#[derive(Clone, Debug)]
pub struct DogstatsdCodec {
pub struct DogstatsdCodec<TMI = ()> {
config: DogstatsdCodecConfiguration,
context_resolver: ContextResolver,
tag_metadata_interceptor: TMI,
codec_metrics: CodecMetrics,
}

impl DogstatsdCodec {
impl DogstatsdCodec<()> {
/// Creates a new `DogstatsdCodec` with the given context resolver, using a default configuration.
pub fn from_context_resolver(context_resolver: ContextResolver) -> Self {
Self {
config: DogstatsdCodecConfiguration::default(),
context_resolver,
tag_metadata_interceptor: (),
codec_metrics: CodecMetrics::new(),
}
}
}

impl<TMI> DogstatsdCodec<TMI> {
/// Sets the given configuration for the codec.
///
/// Different aspects of the codec's behavior (such as tag length, tag count, and timestamp parsing) can be
Expand All @@ -119,6 +122,24 @@ impl DogstatsdCodec {
Self {
config,
context_resolver: self.context_resolver,
tag_metadata_interceptor: self.tag_metadata_interceptor,
codec_metrics: self.codec_metrics,
}
}

/// Sets the given tag metadata interceptor to use.
///
/// The tag metadata interceptor is used to evaluate and potentially intercept raw tags on a metric prior to context
/// resolving. This can be used to generically drop tags as metrics enter the system, but is generally used to
/// filter out specific tags that are only used to set metadata on the metric, and aren't inherently present as a
/// way to facet the metric itself.
///
/// Defaults to a no-op interceptor, which retains all tags.
pub fn with_tag_metadata_interceptor<TMI2>(self, tag_metadata_interceptor: TMI2) -> DogstatsdCodec<TMI2> {
DogstatsdCodec {
config: self.config,
context_resolver: self.context_resolver,
tag_metadata_interceptor,
codec_metrics: self.codec_metrics,
}
}
Expand All @@ -143,17 +164,23 @@ impl<'a> From<NomParserError<'a>> for ParseError {
}
}

impl Decoder for DogstatsdCodec {
impl<TMI> Decoder for DogstatsdCodec<TMI>
where
TMI: TagMetadataInterceptor,
{
type Error = ParseError;

fn decode<B: Buf>(&mut self, buf: &mut B, events: &mut EventBuffer) -> Result<usize, Self::Error> {
fn decode<B: ReadIoBuffer>(&mut self, buf: &mut B, events: &mut EventBuffer) -> Result<usize, Self::Error> {
let data = buf.chunk();

// Decode the payload and get the representative parts of the metric.
let (remaining, (metric_name, tags_iter, values_iter, metadata)) = parse_dogstatsd(data, &self.config)?;
let (remaining, (metric_name, tags_iter, values_iter, mut metadata)) = parse_dogstatsd(data, &self.config)?;

// Build our filtered tag iterator, which we'll use to skip intercepted/dropped tags when building the context.
let filtered_tags_iter = TagFilterer::new(tags_iter.clone(), &self.tag_metadata_interceptor);

// Try resolving the context first, since we might need to bail if we can't.
let context_ref = ContextRef::from_name_and_tags(metric_name, tags_iter);
let context_ref = ContextRef::from_name_and_tags(metric_name, filtered_tags_iter);
let context = match self.context_resolver.resolve(context_ref) {
Some(context) => context,
None => {
Expand All @@ -163,6 +190,9 @@ impl Decoder for DogstatsdCodec {
}
};

// Update our metric metadata based on any tags we're configured to intercept.
update_metadata_from_tags(tags_iter.into_iter(), &self.tag_metadata_interceptor, &mut metadata);

// For each value we parsed, create a metric from it and add it to the events buffer.
//
// We reserve enough capacity in the event buffer for however many events we're going to add, since we can't
Expand Down Expand Up @@ -551,20 +581,156 @@ impl<'a> Iterator for ValueIter<'a> {
}
}

/// Disposition chosen for a single raw tag by a [`TagMetadataInterceptor`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum InterceptAction {
    /// Leave the tag untouched so it stays on the metric.
    Pass,

    /// Consume the tag as a source of metric metadata rather than keeping it as a regular tag.
    Intercept,

    /// Discard the tag outright.
    Drop,
}

/// Per-tag policy consulted before context resolution to decide what happens to each raw tag.
pub trait TagMetadataInterceptor: std::fmt::Debug {
    /// Classifies `tag`, returning the [`InterceptAction`] that should be applied to it.
    fn evaluate(&self, tag: &str) -> InterceptAction;

    /// Applies an intercepted `tag` to `metadata`, updating the metric metadata from it.
    fn intercept(&self, tag: &str, metadata: &mut MetricMetadata);
}

/// No-op interceptor: every tag is passed through and metadata is never modified.
impl TagMetadataInterceptor for () {
    fn evaluate(&self, _: &str) -> InterceptAction {
        InterceptAction::Pass
    }

    fn intercept(&self, _: &str, _: &mut MetricMetadata) {
        // Intentionally empty: this interceptor never derives metadata from tags.
    }
}

/// Forwarding implementation: a shared reference to an interceptor behaves exactly like the
/// interceptor it points to.
impl<'a, T: TagMetadataInterceptor> TagMetadataInterceptor for &'a T {
    fn evaluate(&self, tag: &str) -> InterceptAction {
        // `self` is `&&T`; peel one layer of reference and delegate to the underlying impl.
        T::evaluate(*self, tag)
    }

    fn intercept(&self, tag: &str, metadata: &mut MetricMetadata) {
        T::intercept(*self, tag, metadata)
    }
}

/// Pairs a source of tags with the interceptor that will be used to filter them once iterated.
#[derive(Clone, Debug)]
struct TagFilterer<I, TMI> {
    /// Underlying tag source.
    iter: I,
    /// Interceptor consulted for every tag.
    interceptor: TMI,
}

impl<I, TMI> TagFilterer<I, TMI> {
    /// Wraps `iter` so that its tags are filtered through `interceptor`.
    fn new(iter: I, interceptor: TMI) -> Self {
        TagFilterer { iter, interceptor }
    }
}

/// Converts the filterer into a [`TagFiltererIter`] over the wrapped tag source, carrying the
/// interceptor along with it.
impl<'a, I, TMI> IntoIterator for TagFilterer<I, TMI>
where
    I: IntoIterator<Item = &'a str> + Clone,
    TMI: TagMetadataInterceptor,
{
    type Item = &'a str;
    type IntoIter = TagFiltererIter<I::IntoIter, TMI>;

    fn into_iter(self) -> Self::IntoIter {
        let Self {
            iter: tags,
            interceptor,
        } = self;

        TagFiltererIter {
            iter: tags.into_iter(),
            interceptor,
        }
    }
}

/// Iterator adaptor that yields only the tags its interceptor evaluates as
/// [`InterceptAction::Pass`].
struct TagFiltererIter<I, TMI> {
    /// Underlying tag iterator.
    iter: I,
    /// Interceptor consulted for every tag pulled from `iter`.
    interceptor: TMI,
}

impl<'a, I, TMI> Iterator for TagFiltererIter<I, TMI>
where
    I: Iterator<Item = &'a str>,
    TMI: TagMetadataInterceptor,
{
    type Item = &'a str;

    /// Advances the underlying iterator until a tag evaluates to `Pass`, skipping tags that
    /// evaluate to `Intercept` or `Drop`. Returns `None` once the underlying iterator is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        // Borrow the interceptor separately so the closure does not need to capture `self`
        // while `self.iter` is mutably borrowed.
        let interceptor = &self.interceptor;
        self.iter
            .find(|tag| matches!(interceptor.evaluate(tag), InterceptAction::Pass))
    }
}

/// Walks `tags_iter` and, for every tag that `interceptor` evaluates as
/// [`InterceptAction::Intercept`], lets the interceptor apply that tag to `metadata`.
///
/// Tags evaluated as `Pass` or `Drop` are ignored here.
fn update_metadata_from_tags<'a, I, TMI>(tags_iter: I, interceptor: &TMI, metadata: &mut MetricMetadata)
where
    I: Iterator<Item = &'a str>,
    TMI: TagMetadataInterceptor,
{
    tags_iter
        .filter(|tag| matches!(interceptor.evaluate(tag), InterceptAction::Intercept))
        .for_each(|tag| interceptor.intercept(tag, metadata));
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use nom::IResult;
use proptest::{collection::vec as arb_vec, prelude::*};
use saluki_context::{ContextRef, ContextResolver};
use saluki_core::{pooling::helpers::get_pooled_object_via_default, topology::interconnect::EventBuffer};
use saluki_event::{metric::*, Event};

use super::{parse_dogstatsd, DogstatsdCodecConfiguration};
use crate::deser::{codec::DogstatsdCodec, Decoder};

use super::{parse_dogstatsd, DogstatsdCodecConfiguration, InterceptAction, TagMetadataInterceptor};

enum OneOrMany<T> {
Single(T),
Multiple(Vec<T>),
}

#[derive(Debug)]
struct StaticInterceptor;

impl TagMetadataInterceptor for StaticInterceptor {
fn evaluate(&self, tag: &str) -> InterceptAction {
if tag.starts_with("host") {
InterceptAction::Intercept
} else if tag.starts_with("deprecated") {
InterceptAction::Drop
} else {
InterceptAction::Pass
}
}

fn intercept(&self, tag: &str, metadata: &mut MetricMetadata) {
if let Some((key, value)) = tag.split_once(':') {
if key == "host" {
metadata.set_hostname(Arc::from(value));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated question: why is hostname the only field that needs to be wrapped in an `Arc` for `MetricMetadata`?

Copy link
Member Author

@tobz tobz Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the only string field, and to make MetricMetadata cloneable, at least cheaply... we wrap the hostname in Arc<T>.

Realistically, it should actually be MetaString so that we can intern it or inline it where possible, skipping a heap allocation entirely.

}
}
}
}

/// Test helper: builds a metric with the given name and value and no tags, by delegating to
/// `create_metric_with_tags` with an empty tag slice.
fn create_metric(name: &str, value: MetricValue) -> Metric {
create_metric_with_tags(name, &[], value)
}
Expand Down Expand Up @@ -957,6 +1123,31 @@ mod tests {
}
}

/// Decodes a payload carrying one tag for each [`InterceptAction`] that `StaticInterceptor`
/// produces, and checks that only the passed tag survives on the context while the intercepted
/// tag ends up as the metric's hostname.
#[test]
fn tag_interceptor() {
    // `tag_a` passes, `deprecated_tag_b` is dropped, `host` is intercepted.
    let payload = b"some_metric:1|c|#tag_a:should_pass,deprecated_tag_b:should_drop,host:should_intercept";

    let resolver = ContextResolver::with_noop_interner();
    let mut codec = DogstatsdCodec::from_context_resolver(resolver)
        .with_configuration(DogstatsdCodecConfiguration::default())
        .with_tag_metadata_interceptor(StaticInterceptor);

    let mut event_buffer = get_pooled_object_via_default::<EventBuffer>();
    let mut reader = &payload[..];
    let events_decoded = codec
        .decode(&mut reader, &mut event_buffer)
        .expect("should not fail to decode");
    assert_eq!(events_decoded, 1);

    let first_event = event_buffer.into_iter().next().expect("should have an event");
    let metric = match first_event {
        Event::Metric(metric) => metric,
        _ => unreachable!("should only have a single metric"),
    };

    let tags = metric.context().tags().into_iter().collect::<Vec<_>>();
    assert_eq!(tags.len(), 1);
    assert_eq!(tags[0], "tag_a:should_pass");
    assert_eq!(metric.metadata().hostname(), Some("should_intercept"));
}

proptest! {
#![proptest_config(ProptestConfig::with_cases(1000))]
#[test]
Expand Down