Commit
enhancement: batch timestamped (passthrough) metrics for a short period of time before forwarding
tobz committed Jan 16, 2025
1 parent 799f140 commit 1f1fdd8
Showing 1 changed file with 150 additions and 85 deletions.
235 changes: 150 additions & 85 deletions lib/saluki-components/src/transforms/aggregate/mod.rs
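At a high level, the change stops forwarding timestamped ("passthrough") metrics one at a time and instead buffers them briefly, flushing the buffer once a short deadline elapses. The sketch below illustrates that deadline-based batching pattern in isolation using plain tokio primitives; it is not the component's code, and the `batch_with_deadline` function and the channel of `u64` items are purely illustrative.

```rust
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::{sleep, Instant};

// Buffers incoming items and flushes the batch once the deadline elapses,
// or once the input stream ends. Illustrative only.
async fn batch_with_deadline(mut rx: mpsc::Receiver<u64>, flush_after: Duration) {
    let deadline = sleep(flush_after);
    tokio::pin!(deadline);

    let mut pending_flush = false;
    let mut buffer = Vec::new();

    loop {
        tokio::select! {
            // Only poll the deadline while something is actually buffered.
            _ = &mut deadline, if pending_flush => {
                pending_flush = false;
                println!("flushing {} buffered items", buffer.len());
                buffer.clear();
            },
            maybe_item = rx.recv() => match maybe_item {
                Some(item) => {
                    buffer.push(item);
                    // Arm the deadline on the first buffered item, so items wait
                    // at most `flush_after` before being forwarded.
                    if !pending_flush {
                        pending_flush = true;
                        deadline.as_mut().reset(Instant::now() + flush_after);
                    }
                }
                None => break,
            },
        }
    }

    // Final flush of anything still buffered when the input stream ends.
    if !buffer.is_empty() {
        println!("final flush of {} items", buffer.len());
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    tokio::spawn(async move {
        for i in 0..5u64 {
            let _ = tx.send(i).await;
        }
    });
    batch_with_deadline(rx, Duration::from_secs(2)).await;
}
```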
@@ -20,7 +20,10 @@ use saluki_event::{metric::*, DataType, Event};
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use smallvec::SmallVec;
use tokio::{select, time::interval_at};
use tokio::{
select,
time::{interval_at, sleep},
};
use tracing::{debug, error, trace};

mod telemetry;
@@ -33,7 +36,7 @@ const fn default_window_duration() {
Duration::from_secs(10)
}

const fn default_flush_interval() -> Duration {
const fn default_primary_flush_interval() -> Duration {
Duration::from_secs(15)
}

@@ -45,10 +48,14 @@ const fn default_counter_expiry_seconds() -> Option<u64> {
Some(300)
}

const fn default_forward_timestamped_metrics() -> bool {
const fn default_passthrough_timestamped_metrics() -> bool {
true
}

const fn default_passthrough_flush_interval() -> Duration {
Duration::from_secs(2)
}

/// Aggregate transform.
///
/// Aggregates metrics into fixed-size windows, flushing them at a regular interval.
@@ -86,8 +93,8 @@ pub struct AggregateConfiguration {
/// systems, etc) and the frequency of updates (how often updates to a metric are emitted).
///
/// Defaults to 15 seconds.
#[serde(rename = "aggregate_flush_interval", default = "default_flush_interval")]
flush_interval: Duration,
#[serde(rename = "aggregate_flush_interval", default = "default_primary_flush_interval")]
primary_flush_interval: Duration,

/// Maximum number of contexts to aggregate per window.
///
Expand Down Expand Up @@ -131,7 +138,7 @@ pub struct AggregateConfiguration {
#[serde(alias = "dogstatsd_expiry_seconds", default = "default_counter_expiry_seconds")]
counter_expiry_seconds: Option<u64>,

/// Whether or not to immediately forward metrics with pre-defined timestamps.
/// Whether or not to immediately forward (passthrough) metrics with pre-defined timestamps.
///
/// When enabled, this causes the aggregator to immediately forward metrics that already have a timestamp present.
/// Only metrics without a timestamp will be aggregated. This can be useful when metrics are already pre-aggregated
@@ -141,9 +148,22 @@ pub struct AggregateConfiguration {
/// Defaults to `true`.
#[serde(
rename = "dogstatsd_no_aggregation_pipeline",
default = "default_forward_timestamped_metrics"
default = "default_passthrough_timestamped_metrics"
)]
passthrough_timestamped_metrics: bool,

/// How often to flush buffered passthrough metrics.
///
/// While passthrough metrics are not re-aggregated by the transform, they will still be temporarily buffered in
/// order to optimize the efficiency of processing them in the next component. This setting controls the maximum
/// amount of time that passthrough metrics will be buffered before being forwarded.
///
/// Defaults to 2 seconds.
#[serde(
rename = "aggregate_passthrough_flush_interval",
default = "default_passthrough_flush_interval"
)]
forward_timestamped_metrics: bool,
passthrough_flush_interval: Duration,

/// Histogram aggregation configuration.
///
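As context for the `rename`/`default` attributes used on the fields above, the snippet below shows the standard serde behavior they rely on: when a key is absent, the named default function supplies the value. This is a standalone sketch, not the project's actual config loader (which may well use a human-friendly duration format rather than serde's built-in `Duration` representation); `PassthroughSettings` and `default_true` are illustrative names.

```rust
use std::time::Duration;

use serde::Deserialize;

fn default_true() -> bool {
    true
}

const fn default_passthrough_flush_interval() -> Duration {
    Duration::from_secs(2)
}

#[derive(Debug, Deserialize)]
struct PassthroughSettings {
    // Read from the `dogstatsd_no_aggregation_pipeline` key; `true` when absent.
    #[serde(rename = "dogstatsd_no_aggregation_pipeline", default = "default_true")]
    passthrough_timestamped_metrics: bool,

    // Read from the `aggregate_passthrough_flush_interval` key; two seconds when absent.
    #[serde(
        rename = "aggregate_passthrough_flush_interval",
        default = "default_passthrough_flush_interval"
    )]
    passthrough_flush_interval: Duration,
}

fn main() {
    // With neither key present, both defaults apply.
    let settings: PassthroughSettings = serde_json::from_str("{}").unwrap();
    println!("{settings:?}");
}
```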
@@ -163,27 +183,38 @@ impl AggregateConfiguration {
pub fn with_defaults() -> Self {
Self {
window_duration: default_window_duration(),
flush_interval: default_flush_interval(),
primary_flush_interval: default_primary_flush_interval(),
context_limit: default_context_limit(),
flush_open_windows: false,
counter_expiry_seconds: default_counter_expiry_seconds(),
forward_timestamped_metrics: default_forward_timestamped_metrics(),
passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
passthrough_flush_interval: default_passthrough_flush_interval(),
hist_config: HistogramConfiguration::default(),
}
}
}

#[async_trait]
impl TransformBuilder for AggregateConfiguration {
async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
let metrics_builder = MetricsBuilder::from_component_context(context);
let telemetry = Telemetry::new(&metrics_builder);

let state = AggregationState::new(
self.window_duration,
self.context_limit,
self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs),
self.hist_config.clone(),
telemetry.clone(),
);

Ok(Box::new(Aggregate {
window_duration: self.window_duration,
flush_interval: self.flush_interval,
context_limit: self.context_limit,
state,
telemetry,
primary_flush_interval: self.primary_flush_interval,
flush_open_windows: self.flush_open_windows,
counter_expiry_seconds: self.counter_expiry_seconds,
forward_timestamped_metrics: self.forward_timestamped_metrics,
hist_config: self.hist_config.clone(),
passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
passthrough_flush_interval: self.passthrough_flush_interval,
}))
}

@@ -220,55 +251,48 @@ impl MemoryBounds for AggregateConfiguration {
}

pub struct Aggregate {
window_duration: Duration,
flush_interval: Duration,
context_limit: usize,
state: AggregationState,
telemetry: Telemetry,
primary_flush_interval: Duration,
flush_open_windows: bool,
counter_expiry_seconds: Option<u64>,
forward_timestamped_metrics: bool,
hist_config: HistogramConfiguration,
passthrough_timestamped_metrics: bool,
passthrough_flush_interval: Duration,
}

#[async_trait]
impl Transform for Aggregate {
async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
let mut health = context.take_health_handle();

let metrics_builder = MetricsBuilder::from_component_context(context.component_context());
let telemetry = Telemetry::new(&metrics_builder);

let mut state = AggregationState::new(
self.window_duration,
self.context_limit,
self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs),
self.hist_config,
telemetry.clone(),
let mut primary_flush = interval_at(
tokio::time::Instant::now() + self.primary_flush_interval,
self.primary_flush_interval,
);
let mut final_primary_flush = false;

let mut flush = interval_at(tokio::time::Instant::now() + self.flush_interval, self.flush_interval);

let metrics_builder = MetricsBuilder::from_component_context(context.component_context());
let telemetry = Telemetry::new(&metrics_builder);
let passthrough_flush = sleep(self.passthrough_flush_interval);
let mut pending_passthrough_flush = false;
let mut passthrough_event_buffer = context.event_buffer_pool().acquire().await;

health.mark_ready();
debug!("Aggregation transform started.");

let mut final_flush = false;
tokio::pin!(passthrough_flush);

loop {
select! {
_ = health.live() => continue,
_ = flush.tick() => {
_ = primary_flush.tick() => {
// We've reached the end of the current window. Flush our aggregation state and forward the metrics
// onwards. Regardless of whether any metrics were aggregated, we always update the aggregation
// state to track the start time of the current aggregation window.
if !state.is_empty() {
if !self.state.is_empty() {
debug!("Flushing aggregated metrics...");

let should_flush_open_windows = final_flush && self.flush_open_windows;
let should_flush_open_windows = final_primary_flush && self.flush_open_windows;

let mut forwarder = context.forwarder().buffered().expect("default output should always exist");
if let Err(e) = state.flush(get_unix_timestamp(), should_flush_open_windows, &mut forwarder).await {
if let Err(e) = self.state.flush(get_unix_timestamp(), should_flush_open_windows, &mut forwarder).await {
error!(error = %e, "Failed to flush aggregation state.");
}

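One detail worth calling out in the setup above: `interval_at` is used (rather than a plain `interval`) so the first primary flush fires a full interval after startup instead of immediately. A minimal standalone illustration, not the component's code:

```rust
use std::time::Duration;

use tokio::time::{interval_at, Instant};

#[tokio::main]
async fn main() {
    let period = Duration::from_secs(15);

    // `interval(period)` would tick immediately on the first call; `interval_at`
    // schedules the first tick at `start`, i.e. one full period from now.
    let mut primary_flush = interval_at(Instant::now() + period, period);

    for _ in 0..2 {
        primary_flush.tick().await;
        println!("flush window elapsed");
    }
}
```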
@@ -279,95 +303,136 @@ impl Transform for Aggregate {
}

// If this is the final flush, we break out of the loop.
if final_flush {
if final_primary_flush {
debug!("All aggregation complete.");
break
}
},
maybe_events = context.event_stream().next(), if !final_flush => match maybe_events {
_ = &mut passthrough_flush, if pending_passthrough_flush => {
// We've hit our deadline for batching up any timestamped metrics, so forward those now.
pending_passthrough_flush = false;
match forward_events(&mut passthrough_event_buffer, &context).await {
Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."),
Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
}
},
maybe_events = context.event_stream().next(), if !final_primary_flush => match maybe_events {
Some(events) => {
trace!(events_len = events.len(), "Received events.");

let mut forwarder = context.forwarder().buffered().expect("default output should always exist");
let current_time = get_unix_timestamp();

for event in events {
if let Some(metric) = event.try_into_metric() {
let metric = if self.forward_timestamped_metrics {
// If we're configured to forward timestamped metrics immediately, then we need to
// try to handle any timestamped values in this metric. If we get back `Some(...)`,
// it's either the original metric because no values had timestamps _or_ it's a
// modified version of the metric after all timestamped values were split out and
// directly forwarded.
match handle_forward_timestamped_metric(metric, &mut forwarder, &telemetry).await {
Ok(None) => continue,
Ok(Some(metric)) => metric,
Err(e) => {
error!(error = %e, "Failed to handle timestamped metric.");
continue;
let metric = if self.passthrough_timestamped_metrics {
// Try splitting out any timestamped values, and if we have any, we'll buffer them
// separately and process the remaining nontimestamped metric (if any) by aggregating
// it like normal.
let (maybe_timestamped_metric, maybe_nontimestamped_metric) = try_split_timestamped_values(metric);

// If we have a timestamped metric, we'll buffer it for now so that we can
// efficiently batch them up and forward them a short time later.
//
// If the event buffer is full, however, then we'll simply flush immediately before
// continuing.
if let Some(timestamped_metric) = maybe_timestamped_metric {
if let Some(event) = passthrough_event_buffer.try_push(Event::Metric(timestamped_metric)) {
// Our current passthrough event buffer is full, so we need to forward
// it and replace it with a new event buffer.
match forward_events(&mut passthrough_event_buffer, &context).await {
Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."),
Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
}

// Try to push the event again now that we have a new event buffer.
if passthrough_event_buffer.try_push(event).is_some() {
error!("Event buffer is full even after forwarding events. Dropping event.");
}
}

// If we haven't already, reset our passthrough flush timeout to start the clock
// on when we actually forward these passthrough metrics.
if !pending_passthrough_flush {
pending_passthrough_flush = true;
passthrough_flush.as_mut().reset(tokio::time::Instant::now() + self.passthrough_flush_interval);
}

self.telemetry.increment_passthrough_metrics();
}

// If we have a nontimestamped metric, we'll process it like normal. Otherwise,
// continue to the next event.
match maybe_nontimestamped_metric {
Some(metric) => metric,
None => continue,
}
} else {
metric
};

if !state.insert(current_time, metric) {
if !self.state.insert(current_time, metric) {
trace!("Dropping metric due to context limit.");
telemetry.increment_events_dropped();
self.telemetry.increment_events_dropped();
}
}
}

match forwarder.flush().await {
Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."),
Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
}
},
None => {
// We've reached the end of our input stream, so mark ourselves for a final flush and reset the
// interval so it ticks immediately on the next loop iteration.
final_flush = true;

flush.reset_immediately();
final_primary_flush = true;
primary_flush.reset_immediately();

debug!("Aggregation transform stopping...");
}
},
}
}

// Do a final flush of any timestamped metrics that we've buffered up.
match forward_events(&mut passthrough_event_buffer, &context).await {
Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."),
Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
}

debug!("Aggregation transform stopped.");

Ok(())
}
}

async fn handle_forward_timestamped_metric<O>(
mut metric: Metric, forwarder: &mut BufferedForwarder<'_, O>, telemetry: &Telemetry,
) -> Result<Option<Metric>, GenericError>
where
O: ObjectPool<Item = FixedSizeEventBuffer>,
{
fn try_split_timestamped_values(mut metric: Metric) -> (Option<Metric>, Option<Metric>) {
if metric.values().all_timestamped() {
// All the values are timestamped, so take and forward the metric as-is.
forwarder.push(Event::Metric(metric)).await?;

telemetry.increment_passthrough_metrics();

Ok(None)
(Some(metric), None)
} else if metric.values().any_timestamped() {
// Only _some_ of the values are timestamped, so split out those timestamped ones, forward them, and then hand
// back the now-modified original metric.
// Only _some_ of the values are timestamped, so we'll split the timestamped values into a new metric.
let new_metric_values = metric.values_mut().split_timestamped();
let new_metric = Metric::from_parts(metric.context().clone(), new_metric_values, metric.metadata().clone());
forwarder.push(Event::Metric(new_metric)).await?;

telemetry.increment_passthrough_metrics();

Ok(Some(metric))
(Some(new_metric), Some(metric))
} else {
// No timestamped values, so we need to aggregate this metric.
Ok(Some(metric))
(None, Some(metric))
}
}
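To make the three branches above easier to see at a glance, here is the same split logic over a simplified stand-in type; `SplitMetric` and `try_split` are hypothetical and do not reflect the project's `Metric` API.

```rust
// Simplified stand-in for a metric whose values may or may not carry timestamps.
struct SplitMetric {
    timestamped: Vec<(u64, f64)>,
    untimestamped: Vec<f64>,
}

// Returns `(passthrough, to_aggregate)`, mirroring the three cases above:
// all values timestamped, a mix of both, or no timestamped values at all.
fn try_split(metric: SplitMetric) -> (Option<SplitMetric>, Option<SplitMetric>) {
    match (metric.timestamped.is_empty(), metric.untimestamped.is_empty()) {
        // No timestamped values: everything goes through normal aggregation.
        (true, _) => (None, Some(metric)),
        // Only timestamped values: the whole metric is passthrough.
        (false, true) => (Some(metric), None),
        // Mixed: the timestamped values are split into their own metric.
        (false, false) => {
            let passthrough = SplitMetric {
                timestamped: metric.timestamped,
                untimestamped: Vec::new(),
            };
            let to_aggregate = SplitMetric {
                timestamped: Vec::new(),
                untimestamped: metric.untimestamped,
            };
            (Some(passthrough), Some(to_aggregate))
        }
    }
}

fn main() {
    let mixed = SplitMetric {
        timestamped: vec![(1_700_000_000, 1.0)],
        untimestamped: vec![2.0],
    };
    let (passthrough, to_aggregate) = try_split(mixed);
    assert!(passthrough.is_some() && to_aggregate.is_some());
}
```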

async fn forward_events(
event_buffer: &mut FixedSizeEventBuffer, context: &TransformContext,
) -> Result<usize, GenericError> {
if !event_buffer.is_empty() {
let events_len = event_buffer.len();

// Acquire a new event buffer to replace the one we're about to forward, and swap them.
let new_event_buffer = context.event_buffer_pool().acquire().await;
let event_buffer = std::mem::replace(event_buffer, new_event_buffer);

context
.forwarder()
.forward_buffer(event_buffer)
.await
.map(|()| events_len)
} else {
Ok(0)
}
}

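The buffer swap in `forward_events` is the usual `std::mem::replace` idiom: hand the accumulated buffer back by value while leaving a fresh one in its place. A tiny standalone sketch, with `Vec<String>` standing in for the pooled event buffer type:

```rust
use std::mem;

// Swaps a fresh buffer into place and returns the previously accumulated items.
fn swap_and_take(buffer: &mut Vec<String>, fresh: Vec<String>) -> Vec<String> {
    mem::replace(buffer, fresh)
}

fn main() {
    let mut buffer = vec!["metric_a".to_string(), "metric_b".to_string()];

    // The caller keeps ownership of `buffer` (now empty) while the drained
    // events are handed back by value, ready to be forwarded.
    let drained = swap_and_take(&mut buffer, Vec::new());
    assert_eq!(drained.len(), 2);
    assert!(buffer.is_empty());
    println!("forwarding {} buffered events", drained.len());
}
```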
