Skip to content

Commit

Permalink
really, truly try to mimic the Agent's no agg behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz committed Jan 17, 2025
1 parent 3a6c144 commit 3dc98ab
Showing 1 changed file with 35 additions and 23 deletions.
58 changes: 35 additions & 23 deletions lib/saluki-components/src/transforms/aggregate/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{num::NonZeroU64, time::Duration};
use std::{
num::NonZeroU64,
time::{Duration, Instant},
};

use async_trait::async_trait;
use hashbrown::{hash_map::Entry, HashMap};
Expand All @@ -22,7 +25,7 @@ use serde::Deserialize;
use smallvec::SmallVec;
use tokio::{
select,
time::{interval_at, sleep},
time::{interval, interval_at},
};
use tracing::{debug, error, trace};

Expand Down Expand Up @@ -54,8 +57,8 @@ const fn default_passthrough_timestamped_metrics() -> bool {
true
}

const fn default_passthrough_flush_interval() -> Duration {
Duration::from_secs(2)
const fn default_passthrough_idle_flush_timeout() -> Duration {
Duration::from_secs(1)
}

const fn default_passthrough_event_buffer_len() -> usize {
Expand Down Expand Up @@ -166,10 +169,10 @@ pub struct AggregateConfiguration {
///
/// Defaults to 2 seconds.
#[serde(
rename = "aggregate_passthrough_flush_interval",
default = "default_passthrough_flush_interval"
rename = "aggregate_passthrough_idle_flush_timeout",
default = "default_passthrough_idle_flush_timeout"
)]
passthrough_flush_interval: Duration,
passthrough_idle_flush_timeout: Duration,

/// Length of event buffers used exclusive for passthrough metrics.
///
Expand Down Expand Up @@ -207,7 +210,7 @@ impl AggregateConfiguration {
flush_open_windows: false,
counter_expiry_seconds: default_counter_expiry_seconds(),
passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
passthrough_flush_interval: default_passthrough_flush_interval(),
passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(),
passthrough_event_buffer_len: default_passthrough_event_buffer_len(),
hist_config: HistogramConfiguration::default(),
}
Expand All @@ -234,7 +237,7 @@ impl TransformBuilder for AggregateConfiguration {
primary_flush_interval: self.primary_flush_interval,
flush_open_windows: self.flush_open_windows,
passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
passthrough_flush_interval: self.passthrough_flush_interval,
passthrough_idle_flush_timeout: self.passthrough_idle_flush_timeout,
passthrough_event_buffer_len: self.passthrough_event_buffer_len,
}))
}
Expand Down Expand Up @@ -285,7 +288,7 @@ pub struct Aggregate {
primary_flush_interval: Duration,
flush_open_windows: bool,
passthrough_timestamped_metrics: bool,
passthrough_flush_interval: Duration,
passthrough_idle_flush_timeout: Duration,
passthrough_event_buffer_len: usize,
}

Expand All @@ -300,8 +303,9 @@ impl Transform for Aggregate {
);
let mut final_primary_flush = false;

let passthrough_flush = sleep(self.passthrough_flush_interval);
let passthrough_flush = interval(Duration::from_secs(2));
let mut pending_passthrough_flush = false;
let mut last_passthrough_processed_at = Instant::now();

let passthrough_event_buffer_len = self.passthrough_event_buffer_len;
let (passthrough_event_buffer_pool, shrinker) = ElasticObjectPool::<FixedSizeEventBuffer>::with_builder(
Expand Down Expand Up @@ -348,19 +352,25 @@ impl Transform for Aggregate {
break
}
},
_ = &mut passthrough_flush, if pending_passthrough_flush => {
// We've hit our deadline for batching up any timestamped metrics, so forward those now.
pending_passthrough_flush = false;
match forward_events(&mut passthrough_event_buffer, &passthrough_event_buffer_pool, context.forwarder()).await {
Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."),
Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
_ = passthrough_flush.tick() => {
// Check if our passthrough processing has been idle long enough to force a flush.
if pending_passthrough_flush && last_passthrough_processed_at.elapsed() >= self.passthrough_idle_flush_timeout {
debug!("Passthrough processing exceeded idle flush timeout. Flushing...");

pending_passthrough_flush = false;

match forward_events(&mut passthrough_event_buffer, &passthrough_event_buffer_pool, context.forwarder()).await {
Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."),
Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
}
}
},
maybe_events = context.event_stream().next(), if !final_primary_flush => match maybe_events {
Some(events) => {
trace!(events_len = events.len(), "Received events.");

let current_time = get_unix_timestamp();
let mut processed_passthrough_metrics = false;

for event in events {
if let Some(metric) = event.try_into_metric() {
Expand All @@ -377,6 +387,8 @@ impl Transform for Aggregate {
// continuing.
if let Some(timestamped_metric) = maybe_timestamped_metric {
if let Some(event) = passthrough_event_buffer.try_push(Event::Metric(timestamped_metric)) {
debug!("Passthrough event buffer was full. Flushing...");

// Our current passthrough event buffer is full, so we need to forward
// it and replace it with a new event buffer.
match forward_events(&mut passthrough_event_buffer, &passthrough_event_buffer_pool, context.forwarder()).await {
Expand All @@ -390,12 +402,7 @@ impl Transform for Aggregate {
}
}

// If we haven't already, reset our passthrough flush timeout to start the clock
// on when we actually forward these passthrough metrics.
if !pending_passthrough_flush {
pending_passthrough_flush = true;
passthrough_flush.as_mut().reset(tokio::time::Instant::now() + self.passthrough_flush_interval);
}
processed_passthrough_metrics = true;

self.telemetry.increment_passthrough_metrics();
}
Expand All @@ -416,6 +423,11 @@ impl Transform for Aggregate {
}
}
}

if processed_passthrough_metrics {
pending_passthrough_flush = true;
last_passthrough_processed_at = Instant::now();
}
},
None => {
// We've reached the end of our input stream, so mark ourselves for a final flush and reset the
Expand Down

0 comments on commit 3dc98ab

Please sign in to comment.