
push passthrough logic into dedicated type + better telemetry
tobz committed Jan 17, 2025
1 parent 6c4a343 commit 49022d1
Showing 2 changed files with 122 additions and 91 deletions.
189 changes: 103 additions & 86 deletions lib/saluki-components/src/transforms/aggregate/mod.rs
@@ -304,20 +304,12 @@ impl Transform for Aggregate {
let mut final_primary_flush = false;

let passthrough_flush = interval(Duration::from_secs(2));
let mut pending_passthrough_flush = false;
let mut pending_passthrough_batch_start = Instant::now();
let mut last_passthrough_processed_at = Instant::now();

let passthrough_event_buffer_len = self.passthrough_event_buffer_len;
let (passthrough_event_buffer_pool, shrinker) = ElasticObjectPool::<FixedSizeEventBuffer>::with_builder(
"agg_passthrough_event_buffers",
1,
PASSTHROUGH_EVENT_BUFFERS_MAX,
move || FixedSizeEventBufferInner::with_capacity(passthrough_event_buffer_len),
);
tokio::spawn(shrinker);

let mut passthrough_event_buffer = passthrough_event_buffer_pool.acquire().await;
let mut passthrough_batcher = PassthroughBatcher::new(
self.passthrough_event_buffer_len,
self.passthrough_idle_flush_timeout,
self.telemetry.clone(),
)
.await;

health.mark_ready();
debug!("Aggregation transform started.");
@@ -341,6 +333,8 @@ impl Transform for Aggregate {
error!(error = %e, "Failed to flush aggregation state.");
}

self.telemetry.increment_flushes();

match forwarder.flush().await {
Ok(aggregated_events) => debug!(aggregated_events, "Forwarded events."),
Err(e) => error!(error = %e, "Failed to flush aggregated events."),
@@ -353,22 +347,7 @@
break
}
},
_ = passthrough_flush.tick() => {
// Check if our passthrough processing has been idle long enough to force a flush.
if pending_passthrough_flush && last_passthrough_processed_at.elapsed() >= self.passthrough_idle_flush_timeout {
let batch_duration = pending_passthrough_batch_start.elapsed();
self.telemetry.record_passthrough_batch_duration(batch_duration);

debug!("Passthrough processing exceeded idle flush timeout. Flushing...");

pending_passthrough_flush = false;

match forward_events(&mut passthrough_event_buffer, &passthrough_event_buffer_pool, context.forwarder()).await {
Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."),
Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
}
}
},
_ = passthrough_flush.tick() => passthrough_batcher.try_flush(context.forwarder()).await,
maybe_events = context.event_stream().next(), if !final_primary_flush => match maybe_events {
Some(events) => {
trace!(events_len = events.len(), "Received events.");
@@ -380,39 +359,19 @@
if let Some(metric) = event.try_into_metric() {
let metric = if self.passthrough_timestamped_metrics {
// Try splitting out any timestamped values, and if we have any, we'll buffer them
// separately and process the remaining nontimestamped metric (if any) by aggregating
// it like normal.
// separately and process the remaining nontimestamped metric (if any) by
// aggregating it like normal.
let (maybe_timestamped_metric, maybe_nontimestamped_metric) = try_split_timestamped_values(metric);

// If we have a timestamped metric, we'll buffer it for now so that we can
// efficiently batch them up and forward them a short time later.
//
// If the event buffer is full, however, then we'll simply flush immediately before
// continuing.
// If we have a timestamped metric, then batch it up out-of-band.
if let Some(timestamped_metric) = maybe_timestamped_metric {
if let Some(event) = passthrough_event_buffer.try_push(Event::Metric(timestamped_metric)) {
debug!("Passthrough event buffer was full. Flushing...");

// Our current passthrough event buffer is full, so we need to forward
// it and replace it with a new event buffer.
match forward_events(&mut passthrough_event_buffer, &passthrough_event_buffer_pool, context.forwarder()).await {
Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."),
Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
}

// Try to push the event again now that we have a new event buffer.
if passthrough_event_buffer.try_push(event).is_some() {
error!("Event buffer is full even after forwarding events. Dropping event.");
}
}

passthrough_batcher.push_metric(timestamped_metric, context.forwarder()).await;
processed_passthrough_metrics = true;

self.telemetry.increment_passthrough_metrics();
}

// If we have a nontimestamped metric, we'll process it like normal. Otherwise,
// continue to the next event.
// If we have a nontimestamped metric, we'll process it like normal.
//
// Otherwise, continue to the next event.
match maybe_nontimestamped_metric {
Some(metric) => metric,
None => continue,
@@ -429,13 +388,7 @@
}

if processed_passthrough_metrics {
// If this is the first time we've processed passthrough metrics since we last flushed any
// passthrough metrics, then mark ourselves as pending and track when this batch started.
if !pending_passthrough_flush {
pending_passthrough_batch_start = Instant::now();
pending_passthrough_flush = true;
}
last_passthrough_processed_at = Instant::now();
passthrough_batcher.update_last_processed_at();
}
},
None => {
@@ -451,16 +404,7 @@
}

// Do a final flush of any timestamped metrics that we've buffered up.
match forward_events(
&mut passthrough_event_buffer,
&passthrough_event_buffer_pool,
context.forwarder(),
)
.await
{
Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."),
Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
}
passthrough_batcher.try_flush(context.forwarder()).await;

debug!("Aggregation transform stopped.");

@@ -483,20 +427,93 @@ fn try_split_timestamped_values(mut metric: Metric) -> (Option<Metric>, Option<M
}
}

async fn forward_events(
event_buffer: &mut FixedSizeEventBuffer, object_pool: &ElasticObjectPool<FixedSizeEventBuffer>,
forwarder: &Forwarder,
) -> Result<usize, GenericError> {
if !event_buffer.is_empty() {
let events_len = event_buffer.len();
struct PassthroughBatcher {
buffer_pool: ElasticObjectPool<FixedSizeEventBuffer>,
active_buffer: FixedSizeEventBuffer,
active_buffer_start: Instant,
last_processed_at: Instant,
idle_flush_timeout: Duration,
telemetry: Telemetry,
}

// Acquire a new event buffer to replace the one we're about to forward, and swap them.
let new_event_buffer = object_pool.acquire().await;
let event_buffer = std::mem::replace(event_buffer, new_event_buffer);
impl PassthroughBatcher {
async fn new(event_buffer_len: usize, idle_flush_timeout: Duration, telemetry: Telemetry) -> Self {
let (buffer_pool, pool_shrinker) = ElasticObjectPool::<FixedSizeEventBuffer>::with_builder(
"agg_passthrough_event_buffers",
1,
PASSTHROUGH_EVENT_BUFFERS_MAX,
move || FixedSizeEventBufferInner::with_capacity(event_buffer_len),
);
tokio::spawn(pool_shrinker);

forwarder.forward_buffer(event_buffer).await.map(|()| events_len)
} else {
Ok(0)
let active_buffer = buffer_pool.acquire().await;

Self {
buffer_pool,
active_buffer,
active_buffer_start: Instant::now(),
last_processed_at: Instant::now(),
idle_flush_timeout,
telemetry,
}
}

async fn push_metric(&mut self, metric: Metric, forwarder: &Forwarder) {
// Try pushing the metric into our active buffer.
//
// If our active buffer is full, then we'll flush the buffer, grab a new one, and push the metric into it.
if let Some(event) = self.active_buffer.try_push(Event::Metric(metric)) {
debug!("Passthrough event buffer was full. Flushing...");
self.forward_events(forwarder).await;

if self.active_buffer.try_push(event).is_some() {
error!("Event buffer is full even after forwarding events. Dropping event.");
}
}

// If this is the first metric in the buffer, we've started a new batch, so track when it started.
if self.active_buffer.len() == 1 {
self.active_buffer_start = Instant::now();
}

self.telemetry.increment_passthrough_metrics();
}

fn update_last_processed_at(&mut self) {
// We expose this as a standalone method, rather than just doing it automatically in `push_metric`, because
// otherwise we might be calling this 10-20K times per second, instead of simply doing it after the end of each
// input event buffer in the transform's main loop, which should be much less frequent.
self.last_processed_at = Instant::now();
}

async fn try_flush(&mut self, forwarder: &Forwarder) {
// If our active buffer isn't empty, and we've exceeded our idle flush timeout, then flush the buffer.
if !self.active_buffer.is_empty() && self.last_processed_at.elapsed() >= self.idle_flush_timeout {
debug!("Passthrough processing exceeded idle flush timeout. Flushing...");

self.forward_events(forwarder).await;
}
}

async fn forward_events(&mut self, forwarder: &Forwarder) {
if !self.active_buffer.is_empty() {
let unaggregated_events = self.active_buffer.len();

// Track how long this batch was alive for.
let batch_duration = self.active_buffer_start.elapsed();
self.telemetry.record_passthrough_batch_duration(batch_duration);

self.telemetry.increment_passthrough_flushes();

// Swap our active buffer with a new, empty one, and then forward the old one.
let new_active_buffer = self.buffer_pool.acquire().await;
let old_active_buffer = std::mem::replace(&mut self.active_buffer, new_active_buffer);

match forwarder.forward_buffer(old_active_buffer).await {
Ok(()) => debug!(unaggregated_events, "Forwarded events."),
Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
}
}
}
}

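For orientation, since the batching behavior is spread across several hunks above, here is a self-contained sketch of the same pattern in plain Rust: swap-on-full buffering, batch-start tracking, and an idle flush timeout. The SimpleBatcher type and the flush closure are stand-ins for illustration only; they are not part of the saluki codebase and deliberately omit the async object pool, Forwarder, and telemetry.

use std::time::{Duration, Instant};

// Simplified stand-in for the passthrough batching pattern above. T plays the
// role of a metric event; the flush closure plays the role of the forwarder.
struct SimpleBatcher<T> {
    active: Vec<T>,
    capacity: usize,
    batch_start: Instant,
    last_processed_at: Instant,
    idle_flush_timeout: Duration,
}

impl<T> SimpleBatcher<T> {
    fn new(capacity: usize, idle_flush_timeout: Duration) -> Self {
        Self {
            active: Vec::with_capacity(capacity),
            capacity,
            batch_start: Instant::now(),
            last_processed_at: Instant::now(),
            idle_flush_timeout,
        }
    }

    // Push an item, flushing the active buffer first if it is already full.
    fn push(&mut self, item: T, flush: &mut impl FnMut(Vec<T>)) {
        if self.active.len() == self.capacity {
            self.forward(flush);
        }
        self.active.push(item);

        // The first item in a fresh buffer marks the start of a new batch.
        if self.active.len() == 1 {
            self.batch_start = Instant::now();
        }
    }

    // Called once per input event buffer rather than once per metric, to keep
    // the timestamp update cheap.
    fn update_last_processed_at(&mut self) {
        self.last_processed_at = Instant::now();
    }

    // Flush only if there is pending data and processing has been idle long enough.
    fn try_flush(&mut self, flush: &mut impl FnMut(Vec<T>)) {
        if !self.active.is_empty() && self.last_processed_at.elapsed() >= self.idle_flush_timeout {
            self.forward(flush);
        }
    }

    fn forward(&mut self, flush: &mut impl FnMut(Vec<T>)) {
        if self.active.is_empty() {
            return;
        }
        // In the real transform, this is where the batch duration is recorded
        // and the buffer is swapped with a fresh one from the object pool.
        let _batch_duration = self.batch_start.elapsed();
        let batch = std::mem::replace(&mut self.active, Vec::with_capacity(self.capacity));
        flush(batch);
    }
}

fn main() {
    let mut batcher = SimpleBatcher::new(4, Duration::from_millis(200));
    let mut flush = |batch: Vec<u32>| println!("flushed {} events", batch.len());

    for i in 0..10 {
        batcher.push(i, &mut flush);
    }
    batcher.update_last_processed_at();

    // In the real transform, try_flush runs on a 2-second interval tick.
    std::thread::sleep(Duration::from_millis(250));
    batcher.try_flush(&mut flush);
}

The sketch mirrors the design choice in the diff: the caller drives flushing (on buffer-full, on an interval tick, and once at shutdown), while the batcher only tracks when a batch started and when input was last processed.
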
24 changes: 19 additions & 5 deletions lib/saluki-components/src/transforms/aggregate/telemetry.rs
@@ -54,8 +54,10 @@ impl MetricTypedGauge {
pub struct Telemetry {
active_contexts: Gauge,
active_contexts_by_type: MetricTypedGauge,
passthrough_metrics: Counter,
events_dropped: Counter,
flushes: Counter,
passthrough_metrics: Counter,
passthrough_flushes: Counter,
passthrough_batch_duration: Histogram,
}

@@ -64,9 +66,11 @@ impl Telemetry {
Self {
active_contexts: builder.register_debug_gauge("aggregate_active_contexts"),
active_contexts_by_type: MetricTypedGauge::new(builder, "aggregate_active_contexts_by_type"),
passthrough_metrics: builder.register_debug_counter("aggregate_passthrough_metrics_total"),
events_dropped: builder
.register_debug_counter_with_tags("component_events_dropped_total", ["intentional:true"]),
flushes: builder.register_debug_counter("aggregate_flushes_total"),
passthrough_metrics: builder.register_debug_counter("aggregate_passthrough_metrics_total"),
passthrough_flushes: builder.register_debug_counter("aggregate_passthrough_flushes_total"),
passthrough_batch_duration: builder.register_debug_histogram("aggregate_passthrough_batch_duration_secs"),
}
}
@@ -76,8 +80,10 @@ impl Telemetry {
Self {
active_contexts: Gauge::noop(),
active_contexts_by_type: MetricTypedGauge::noop(),
passthrough_metrics: Counter::noop(),
events_dropped: Counter::noop(),
flushes: Counter::noop(),
passthrough_metrics: Counter::noop(),
passthrough_flushes: Counter::noop(),
passthrough_batch_duration: Histogram::noop(),
}
}
@@ -92,12 +98,20 @@ impl Telemetry {
self.active_contexts_by_type.for_values(values).decrement(1);
}

pub fn increment_events_dropped(&self) {
self.events_dropped.increment(1);
}

pub fn increment_flushes(&self) {
self.flushes.increment(1);
}

pub fn increment_passthrough_metrics(&self) {
self.passthrough_metrics.increment(1);
}

pub fn increment_events_dropped(&self) {
self.events_dropped.increment(1);
pub fn increment_passthrough_flushes(&self) {
self.passthrough_flushes.increment(1);
}

pub fn record_passthrough_batch_duration(&self, duration: Duration) {

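The new counters above follow the existing register-or-noop pattern, where every handle also has a noop variant so callers such as the aggregation transform can increment unconditionally. Below is a minimal, self-contained illustration of that pattern; the Counter and PassthroughTelemetry types here are simplified stand-ins, not the actual saluki metrics API.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Simplified stand-in for the register-or-noop handle pattern: a noop handle
// accepts the same calls and simply discards them, so callers never branch.
#[derive(Clone)]
struct Counter(Option<Arc<AtomicU64>>);

impl Counter {
    fn registered() -> Self {
        Counter(Some(Arc::new(AtomicU64::new(0))))
    }

    fn noop() -> Self {
        Counter(None)
    }

    fn increment(&self, n: u64) {
        if let Some(inner) = &self.0 {
            inner.fetch_add(n, Ordering::Relaxed);
        }
    }

    fn value(&self) -> u64 {
        self.0.as_ref().map(|inner| inner.load(Ordering::Relaxed)).unwrap_or(0)
    }
}

// A pared-down analogue of the Telemetry struct in this diff.
struct PassthroughTelemetry {
    passthrough_metrics: Counter,
    passthrough_flushes: Counter,
}

impl PassthroughTelemetry {
    fn noop() -> Self {
        Self {
            passthrough_metrics: Counter::noop(),
            passthrough_flushes: Counter::noop(),
        }
    }
}

fn main() {
    let telemetry = PassthroughTelemetry {
        passthrough_metrics: Counter::registered(),
        passthrough_flushes: Counter::registered(),
    };

    telemetry.passthrough_metrics.increment(3);
    telemetry.passthrough_flushes.increment(1);
    println!(
        "passthrough_metrics={} passthrough_flushes={}",
        telemetry.passthrough_metrics.value(),
        telemetry.passthrough_flushes.value()
    );

    // The noop variant mirrors Telemetry::noop() in the diff: same call sites,
    // nothing recorded.
    let disabled = PassthroughTelemetry::noop();
    disabled.passthrough_metrics.increment(100);
    assert_eq!(disabled.passthrough_metrics.value(), 0);
}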