use larger, dedicated event buffers for no-agg data flow
tobz committed Jan 16, 2025
1 parent 1f1fdd8 commit af4d048
Showing 1 changed file with 58 additions and 15 deletions.
--- a/lib/saluki-components/src/transforms/aggregate/mod.rs
+++ b/lib/saluki-components/src/transforms/aggregate/mod.rs
@@ -8,9 +8,9 @@ use saluki_context::Context;
 use saluki_core::{
     components::{transforms::*, ComponentContext},
     observability::ComponentMetricsExt as _,
-    pooling::ObjectPool,
+    pooling::{ElasticObjectPool, ObjectPool},
     topology::{
-        interconnect::{BufferedForwarder, FixedSizeEventBuffer},
+        interconnect::{BufferedForwarder, FixedSizeEventBuffer, FixedSizeEventBufferInner, Forwarder},
         OutputDefinition,
     },
 };
@@ -32,6 +32,8 @@ use self::telemetry::Telemetry;
 mod config;
 use self::config::HistogramConfiguration;
 
+const PASSTHROUGH_EVENT_BUFFERS_MAX: usize = 16;
+
 const fn default_window_duration() -> Duration {
     Duration::from_secs(10)
 }
@@ -56,6 +58,10 @@ const fn default_passthrough_flush_interval() -> Duration {
     Duration::from_secs(2)
 }
 
+const fn default_passthrough_event_buffer_len() -> usize {
+    2048
+}
+
 /// Aggregate transform.
 ///
 /// Aggregates metrics into fixed-size windows, flushing them at a regular interval.
@@ -165,6 +171,19 @@ pub struct AggregateConfiguration {
     )]
     passthrough_flush_interval: Duration,
 
+    /// Length of event buffers used exclusively for passthrough metrics.
+    ///
+    /// While passthrough metrics are not re-aggregated by the transform, they will still be temporarily buffered in
+    /// order to optimize the efficiency of processing them in the next component. This setting controls the maximum
+    /// number of passthrough metrics that can be buffered in a single batch before being forwarded.
+    ///
+    /// Defaults to 2048.
+    #[serde(
+        rename = "dogstatsd_no_aggregation_pipeline_batch_size",
+        default = "default_passthrough_event_buffer_len"
+    )]
+    passthrough_event_buffer_len: usize,
+
     /// Histogram aggregation configuration.
     ///
     /// Controls the aggregates/percentiles that are generated for distributions in "histogram" mode (client-side
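
Note: the serde rename above means user-facing configuration speaks the DogStatsD dialect while the Rust field keeps its descriptive name. A minimal sketch of that mapping, using a pared-down hypothetical Config struct (not the real AggregateConfiguration) and a JSON source, assuming serde and serde_json as dependencies:

use serde::Deserialize;

// Pared-down stand-in for AggregateConfiguration, showing only the rename/default.
#[derive(Deserialize)]
struct Config {
    #[serde(
        rename = "dogstatsd_no_aggregation_pipeline_batch_size",
        default = "default_len"
    )]
    passthrough_event_buffer_len: usize,
}

fn default_len() -> usize {
    2048
}

fn main() {
    // The user-facing key maps onto the internal field name...
    let c: Config =
        serde_json::from_str(r#"{"dogstatsd_no_aggregation_pipeline_batch_size": 4096}"#).unwrap();
    assert_eq!(c.passthrough_event_buffer_len, 4096);

    // ...and omitting it falls back to the default of 2048.
    let d: Config = serde_json::from_str("{}").unwrap();
    assert_eq!(d.passthrough_event_buffer_len, 2048);
}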
@@ -189,6 +208,7 @@ impl AggregateConfiguration {
             counter_expiry_seconds: default_counter_expiry_seconds(),
             passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
             passthrough_flush_interval: default_passthrough_flush_interval(),
+            passthrough_event_buffer_len: default_passthrough_event_buffer_len(),
             hist_config: HistogramConfiguration::default(),
         }
     }
@@ -215,6 +235,7 @@ impl TransformBuilder for AggregateConfiguration {
             flush_open_windows: self.flush_open_windows,
             passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
             passthrough_flush_interval: self.passthrough_flush_interval,
+            passthrough_event_buffer_len: self.passthrough_event_buffer_len,
         }))
     }

@@ -239,14 +260,22 @@ impl MemoryBounds for AggregateConfiguration {
         //
         // However, there could be many more values in a single metric, and we don't account for that.
 
+        let passthrough_event_buffer_min_elements = self.passthrough_event_buffer_len;
+        let passthrough_event_buffer_max_elements =
+            self.passthrough_event_buffer_len * (PASSTHROUGH_EVENT_BUFFERS_MAX - 1);
+
         builder
             .minimum()
             // Capture the size of the heap allocation when the component is built.
-            .with_single_value::<Aggregate>();
+            .with_single_value::<Aggregate>()
+            // Lower bound of our passthrough event buffer object pool.
+            .with_array::<Event>(passthrough_event_buffer_min_elements);
         builder
             .firm()
             // Account for the aggregation state map, where we map contexts to the merged metric.
-            .with_map::<Context, AggregatedMetric>(self.context_limit);
+            .with_map::<Context, AggregatedMetric>(self.context_limit)
+            // Upper bound of our passthrough event buffer object pool.
+            .with_array::<Event>(passthrough_event_buffer_max_elements);
     }
 }
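
As a sanity check on the bounds math, assuming the defaults shown earlier (buffer length 2048, pool ceiling PASSTHROUGH_EVENT_BUFFERS_MAX = 16): the minimum bound covers the single buffer the pool always holds, and the firm bound adds headroom for the other fifteen:

// Worked example of the bounds arithmetic above, using the defaults.
const LEN: usize = 2048; // default_passthrough_event_buffer_len()
const MAX: usize = 16; // PASSTHROUGH_EVENT_BUFFERS_MAX

fn main() {
    let min_elements = LEN; // minimum(): one pooled buffer, always allocated
    let extra_elements = LEN * (MAX - 1); // firm(): headroom for 15 more buffers
    assert_eq!(extra_elements, 30_720);
    // Accounted ceiling across the whole pool: LEN * MAX = 32,768 Event slots.
    assert_eq!(min_elements + extra_elements, LEN * MAX);
}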

@@ -257,6 +286,7 @@ pub struct Aggregate {
     flush_open_windows: bool,
     passthrough_timestamped_metrics: bool,
     passthrough_flush_interval: Duration,
+    passthrough_event_buffer_len: usize,
 }
 
 #[async_trait]
@@ -272,7 +302,17 @@ impl Transform for Aggregate {
 
         let passthrough_flush = sleep(self.passthrough_flush_interval);
         let mut pending_passthrough_flush = false;
-        let mut passthrough_event_buffer = context.event_buffer_pool().acquire().await;
+
+        let passthrough_event_buffer_len = self.passthrough_event_buffer_len;
+        let (passthrough_event_buffer_pool, shrinker) = ElasticObjectPool::<FixedSizeEventBuffer>::with_builder(
+            "agg_passthrough_event_buffers",
+            1,
+            PASSTHROUGH_EVENT_BUFFERS_MAX,
+            move || FixedSizeEventBufferInner::with_capacity(passthrough_event_buffer_len),
+        );
+        tokio::spawn(shrinker);
+
+        let mut passthrough_event_buffer = passthrough_event_buffer_pool.acquire().await;
 
         health.mark_ready();
         debug!("Aggregation transform started.");
@@ -311,7 +351,7 @@ impl Transform for Aggregate {
                 _ = &mut passthrough_flush, if pending_passthrough_flush => {
                     // We've hit our deadline for batching up any timestamped metrics, so forward those now.
                     pending_passthrough_flush = false;
-                    match forward_events(&mut passthrough_event_buffer, &context).await {
+                    match forward_events(&mut passthrough_event_buffer, &passthrough_event_buffer_pool, context.forwarder()).await {
                         Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."),
                         Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
                     }
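
This branch is one half of a batch-or-deadline scheme: the `if pending_passthrough_flush` guard keeps the timer branch dormant until a batch is pending, and firing it resets the flag after forwarding. The arming side isn't visible in this hunk, but the shape is a common tokio pattern; a condensed, self-contained sketch of it, with a hypothetical channel of integers standing in for metrics:

use std::time::Duration;
use tokio::time::{sleep, Instant};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u64>(8);
    tokio::spawn(async move {
        for i in 0..3 {
            let _ = tx.send(i).await;
        }
    });

    let flush_timer = sleep(Duration::from_millis(50));
    tokio::pin!(flush_timer); // select! polls the sleep by &mut, so it must be pinned
    let mut pending_flush = false;
    let mut batch = Vec::new();

    loop {
        tokio::select! {
            // Deadline branch only participates once a batch is pending.
            _ = &mut flush_timer, if pending_flush => {
                pending_flush = false;
                println!("flushing {} items", batch.len());
                batch.clear();
            }
            maybe_item = rx.recv() => match maybe_item {
                Some(item) => {
                    if batch.is_empty() {
                        // First item of a batch: arm the flush deadline.
                        flush_timer.as_mut().reset(Instant::now() + Duration::from_millis(50));
                        pending_flush = true;
                    }
                    batch.push(item);
                }
                None => break, // sender gone: exit and do a final flush
            },
        }
    }
    if !batch.is_empty() {
        println!("final flush of {} items", batch.len());
    }
}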
@@ -339,7 +379,7 @@ impl Transform for Aggregate {
                                         if let Some(event) = passthrough_event_buffer.try_push(Event::Metric(timestamped_metric)) {
                                             // Our current passthrough event buffer is full, so we need to forward
                                             // it and replace it with a new event buffer.
-                                            match forward_events(&mut passthrough_event_buffer, &context).await {
+                                            match forward_events(&mut passthrough_event_buffer, &passthrough_event_buffer_pool, context.forwarder()).await {
                                                 Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."),
                                                 Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
                                             }
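
Note the try_push convention here: a full buffer hands the rejected event back to the caller rather than returning a bool, so it can be re-pushed once the flush swaps in an empty buffer. A generic sketch of that convention with a hypothetical BoundedBuf, inferred only from the call shape shown in this hunk:

// Hypothetical bounded buffer illustrating the try_push convention above:
// on overflow, the item comes back to the caller instead of being dropped.
struct BoundedBuf<T> {
    items: Vec<T>,
    cap: usize,
}

impl<T> BoundedBuf<T> {
    fn new(cap: usize) -> Self {
        Self { items: Vec::with_capacity(cap), cap }
    }

    fn try_push(&mut self, item: T) -> Option<T> {
        if self.items.len() == self.cap {
            Some(item) // full: hand the item back so the caller can flush and retry
        } else {
            self.items.push(item);
            None
        }
    }
}

fn main() {
    let mut buf = BoundedBuf::new(1);
    assert!(buf.try_push(1).is_none());
    assert_eq!(buf.try_push(2), Some(2)); // rejected, caller must flush first
}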
@@ -390,7 +430,13 @@ impl Transform for Aggregate {
         }
 
         // Do a final flush of any timestamped metrics that we've buffered up.
-        match forward_events(&mut passthrough_event_buffer, &context).await {
+        match forward_events(
+            &mut passthrough_event_buffer,
+            &passthrough_event_buffer_pool,
+            context.forwarder(),
+        )
+        .await
+        {
             Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."),
             Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
         }
@@ -417,20 +463,17 @@ fn try_split_timestamped_values(mut metric: Metric) -> (Option<Metric>, Option<Metric>) {
 }
 
 async fn forward_events(
-    event_buffer: &mut FixedSizeEventBuffer, context: &TransformContext,
+    event_buffer: &mut FixedSizeEventBuffer, object_pool: &ElasticObjectPool<FixedSizeEventBuffer>,
+    forwarder: &Forwarder,
 ) -> Result<usize, GenericError> {
     if !event_buffer.is_empty() {
         let events_len = event_buffer.len();
 
         // Acquire a new event buffer to replace the one we're about to forward, and swap them.
-        let new_event_buffer = context.event_buffer_pool().acquire().await;
+        let new_event_buffer = object_pool.acquire().await;
         let event_buffer = std::mem::replace(event_buffer, new_event_buffer);
 
-        context
-            .forwarder()
-            .forward_buffer(event_buffer)
-            .await
-            .map(|()| events_len)
+        forwarder.forward_buffer(event_buffer).await.map(|()| events_len)
     } else {
         Ok(0)
     }
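
The helper's core move is the swap idiom: pull a fresh buffer from the pool, mem::replace it into the caller's slot, and forward the full buffer by value. A standalone sketch of the idiom with plain Vecs and a hypothetical swap_and_take helper:

// Generic sketch of the swap-and-forward idiom from forward_events, using a
// plain Vec<u64> in place of FixedSizeEventBuffer.
fn swap_and_take(slot: &mut Vec<u64>, fresh: Vec<u64>) -> Vec<u64> {
    // Leave the fresh buffer in the caller's slot and take ownership of the
    // full one so it can be handed off (forwarded) by value.
    std::mem::replace(slot, fresh)
}

fn main() {
    let mut active = vec![1, 2, 3];
    let full = swap_and_take(&mut active, Vec::with_capacity(3));
    assert_eq!(full, vec![1, 2, 3]);
    assert!(active.is_empty()); // ready to keep batching without reallocation
}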