diff --git a/lib/saluki-components/src/transforms/aggregate/mod.rs b/lib/saluki-components/src/transforms/aggregate/mod.rs
index 301a4239..615e4877 100644
--- a/lib/saluki-components/src/transforms/aggregate/mod.rs
+++ b/lib/saluki-components/src/transforms/aggregate/mod.rs
@@ -1,4 +1,7 @@
-use std::{num::NonZeroU64, time::Duration};
+use std::{
+    num::NonZeroU64,
+    time::{Duration, Instant},
+};
 
 use async_trait::async_trait;
 use hashbrown::{hash_map::Entry, HashMap};
@@ -8,9 +11,9 @@ use saluki_context::Context;
 use saluki_core::{
     components::{transforms::*, ComponentContext},
     observability::ComponentMetricsExt as _,
-    pooling::ObjectPool,
+    pooling::{ElasticObjectPool, ObjectPool},
     topology::{
-        interconnect::{BufferedForwarder, FixedSizeEventBuffer},
+        interconnect::{BufferedForwarder, FixedSizeEventBuffer, FixedSizeEventBufferInner, Forwarder},
         OutputDefinition,
     },
 };
@@ -20,7 +23,10 @@ use saluki_event::{metric::*, DataType, Event};
 use saluki_metrics::MetricsBuilder;
 use serde::Deserialize;
 use smallvec::SmallVec;
-use tokio::{select, time::interval_at};
+use tokio::{
+    select,
+    time::{interval, interval_at},
+};
 use tracing::{debug, error, trace};
 
 mod telemetry;
@@ -29,11 +35,14 @@ use self::telemetry::Telemetry;
 mod config;
 use self::config::HistogramConfiguration;
 
+const PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL: Duration = Duration::from_secs(2);
+const PASSTHROUGH_EVENT_BUFFERS_MAX: usize = 16;
+
 const fn default_window_duration() -> Duration {
     Duration::from_secs(10)
 }
 
-const fn default_flush_interval() -> Duration {
+const fn default_primary_flush_interval() -> Duration {
     Duration::from_secs(15)
 }
 
@@ -45,10 +54,18 @@ const fn default_counter_expiry_seconds() -> Option<u64> {
     Some(300)
 }
 
-const fn default_forward_timestamped_metrics() -> bool {
+const fn default_passthrough_timestamped_metrics() -> bool {
     true
 }
 
+const fn default_passthrough_idle_flush_timeout() -> Duration {
+    Duration::from_secs(1)
+}
+
+const fn default_passthrough_event_buffer_len() -> usize {
+    2048
+}
+
 /// Aggregate transform.
 ///
 /// Aggregates metrics into fixed-size windows, flushing them at a regular interval.
@@ -86,8 +103,8 @@ pub struct AggregateConfiguration {
     /// systems, etc) and the frequency of updates (how often updates to a metric are emitted).
     ///
     /// Defaults to 15 seconds.
-    #[serde(rename = "aggregate_flush_interval", default = "default_flush_interval")]
-    flush_interval: Duration,
+    #[serde(rename = "aggregate_flush_interval", default = "default_primary_flush_interval")]
+    primary_flush_interval: Duration,
 
     /// Maximum number of contexts to aggregate per window.
     ///
@@ -131,7 +148,7 @@ pub struct AggregateConfiguration {
     #[serde(alias = "dogstatsd_expiry_seconds", default = "default_counter_expiry_seconds")]
     counter_expiry_seconds: Option<u64>,
 
-    /// Whether or not to immediately forward metrics with pre-defined timestamps.
+    /// Whether or not to immediately forward (passthrough) metrics with pre-defined timestamps.
     ///
     /// When enabled, this causes the aggregator to immediately forward metrics that already have a timestamp present.
     /// Only metrics without a timestamp will be aggregated. This can be useful when metrics are already pre-aggregated
@@ -141,9 +158,35 @@ pub struct AggregateConfiguration {
     /// Defaults to `true`.
     #[serde(
         rename = "dogstatsd_no_aggregation_pipeline",
-        default = "default_forward_timestamped_metrics"
+        default = "default_passthrough_timestamped_metrics"
+    )]
+    passthrough_timestamped_metrics: bool,
+
+    /// How often to flush buffered passthrough metrics.
+    ///
+    /// While passthrough metrics are not re-aggregated by the transform, they will still be temporarily buffered in
+    /// order to optimize the efficiency of processing them in the next component. This setting controls the maximum
+    /// amount of time that passthrough metrics will be buffered before being forwarded.
+    ///
+    /// Defaults to 1 second.
+    #[serde(
+        rename = "aggregate_passthrough_idle_flush_timeout",
+        default = "default_passthrough_idle_flush_timeout"
     )]
-    forward_timestamped_metrics: bool,
+    passthrough_idle_flush_timeout: Duration,
+
+    /// Length of event buffers used exclusively for passthrough metrics.
+    ///
+    /// While passthrough metrics are not re-aggregated by the transform, they will still be temporarily buffered in
+    /// order to optimize the efficiency of processing them in the next component. This setting controls the maximum
+    /// number of passthrough metrics that can be buffered in a single batch before being forwarded.
+    ///
+    /// Defaults to 2048.
+    #[serde(
+        rename = "dogstatsd_no_aggregation_pipeline_batch_size",
+        default = "default_passthrough_event_buffer_len"
+    )]
+    passthrough_event_buffer_len: usize,
 
     /// Histogram aggregation configuration.
     ///
@@ -163,11 +206,13 @@ impl AggregateConfiguration {
     pub fn with_defaults() -> Self {
         Self {
             window_duration: default_window_duration(),
-            flush_interval: default_flush_interval(),
+            primary_flush_interval: default_primary_flush_interval(),
             context_limit: default_context_limit(),
             flush_open_windows: false,
             counter_expiry_seconds: default_counter_expiry_seconds(),
-            forward_timestamped_metrics: default_forward_timestamped_metrics(),
+            passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
+            passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(),
+            passthrough_event_buffer_len: default_passthrough_event_buffer_len(),
             hist_config: HistogramConfiguration::default(),
         }
     }
@@ -175,15 +220,26 @@ impl AggregateConfiguration {
 
 #[async_trait]
 impl TransformBuilder for AggregateConfiguration {
-    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
+    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
+        let metrics_builder = MetricsBuilder::from_component_context(context);
+        let telemetry = Telemetry::new(&metrics_builder);
+
+        let state = AggregationState::new(
+            self.window_duration,
+            self.context_limit,
+            self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs),
+            self.hist_config.clone(),
+            telemetry.clone(),
+        );
+
         Ok(Box::new(Aggregate {
-            window_duration: self.window_duration,
-            flush_interval: self.flush_interval,
-            context_limit: self.context_limit,
+            state,
+            telemetry,
+            primary_flush_interval: self.primary_flush_interval,
             flush_open_windows: self.flush_open_windows,
-            counter_expiry_seconds: self.counter_expiry_seconds,
-            forward_timestamped_metrics: self.forward_timestamped_metrics,
-            hist_config: self.hist_config.clone(),
+            passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
+            passthrough_idle_flush_timeout: self.passthrough_idle_flush_timeout,
+            passthrough_event_buffer_len: self.passthrough_event_buffer_len,
         }))
     }
 
@@ -208,10 +264,18 @@ impl MemoryBounds for AggregateConfiguration {
         //
         // However, there could be many more values in a single metric, and we don't account for that.
+        let passthrough_event_buffer_min_elements = self.passthrough_event_buffer_len;
+        let passthrough_event_buffer_max_elements =
+            self.passthrough_event_buffer_len * (PASSTHROUGH_EVENT_BUFFERS_MAX - 1);
+
         builder
             .minimum()
             // Capture the size of the heap allocation when the component is built.
-            .with_single_value::("component struct");
+            .with_single_value::("component struct")
+            .with_array::(
+                "passthrough event buffer pool (minimum)",
+                passthrough_event_buffer_min_elements,
+            );
 
         builder
             .firm()
             // Account for the aggregation state map, where we map contexts to the merged metric.
             .with_expr(UsageExpr::product(
                 "aggregation state map",
                 UsageExpr::sum(
                     UsageExpr::struct_size::("aggregated metric"),
                 ),
                 UsageExpr::config("aggregate_context_limit", self.context_limit),
-            ));
+            ))
+            // Upper bound of our passthrough event buffer object pool.
+            .with_array::("passthrough event buffer pool", passthrough_event_buffer_max_elements);
     }
 }
 
 pub struct Aggregate {
-    window_duration: Duration,
-    flush_interval: Duration,
-    context_limit: usize,
+    state: AggregationState,
+    telemetry: Telemetry,
+    primary_flush_interval: Duration,
     flush_open_windows: bool,
-    counter_expiry_seconds: Option<u64>,
-    forward_timestamped_metrics: bool,
-    hist_config: HistogramConfiguration,
+    passthrough_timestamped_metrics: bool,
+    passthrough_idle_flush_timeout: Duration,
+    passthrough_event_buffer_len: usize,
 }
 
 #[async_trait]
@@ -242,44 +308,44 @@ impl Transform for Aggregate {
     async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
         let mut health = context.take_health_handle();
 
-        let metrics_builder = MetricsBuilder::from_component_context(context.component_context());
-        let telemetry = Telemetry::new(&metrics_builder);
-
-        let mut state = AggregationState::new(
-            self.window_duration,
-            self.context_limit,
-            self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs),
-            self.hist_config,
-            telemetry.clone(),
+        let mut primary_flush = interval_at(
+            tokio::time::Instant::now() + self.primary_flush_interval,
+            self.primary_flush_interval,
         );
+        let mut final_primary_flush = false;
 
-        let mut flush = interval_at(tokio::time::Instant::now() + self.flush_interval, self.flush_interval);
-
-        let metrics_builder = MetricsBuilder::from_component_context(context.component_context());
-        let telemetry = Telemetry::new(&metrics_builder);
+        let passthrough_flush = interval(PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL);
+        let mut passthrough_batcher = PassthroughBatcher::new(
+            self.passthrough_event_buffer_len,
+            self.passthrough_idle_flush_timeout,
+            self.telemetry.clone(),
+        )
+        .await;
 
         health.mark_ready();
         debug!("Aggregation transform started.");
 
-        let mut final_flush = false;
+        tokio::pin!(passthrough_flush);
 
         loop {
             select! {
                 _ = health.live() => continue,
-                _ = flush.tick() => {
+                _ = primary_flush.tick() => {
                     // We've reached the end of the current window. Flush our aggregation state and forward the metrics
                     // onwards. Regardless of whether any metrics were aggregated, we always update the aggregation
                     // state to track the start time of the current aggregation window.
-                    if !state.is_empty() {
+                    if !self.state.is_empty() {
                         debug!("Flushing aggregated metrics...");
 
-                        let should_flush_open_windows = final_flush && self.flush_open_windows;
+                        let should_flush_open_windows = final_primary_flush && self.flush_open_windows;
 
                         let mut forwarder = context.forwarder().buffered().expect("default output should always exist");
-                        if let Err(e) = state.flush(get_unix_timestamp(), should_flush_open_windows, &mut forwarder).await {
+                        if let Err(e) = self.state.flush(get_unix_timestamp(), should_flush_open_windows, &mut forwarder).await {
                             error!(error = %e, "Failed to flush aggregation state.");
                         }
 
+                        self.telemetry.increment_flushes();
+
                         match forwarder.flush().await {
                             Ok(aggregated_events) => debug!(aggregated_events, "Forwarded events."),
                             Err(e) => error!(error = %e, "Failed to flush aggregated events."),
@@ -287,56 +353,60 @@ impl Transform for Aggregate {
                     }
 
                     // If this is the final flush, we break out of the loop.
-                    if final_flush {
+                    if final_primary_flush {
                         debug!("All aggregation complete.");
                         break
                     }
                 },
-                maybe_events = context.event_stream().next(), if !final_flush => match maybe_events {
+                _ = passthrough_flush.tick() => passthrough_batcher.try_flush(context.forwarder()).await,
+                maybe_events = context.event_stream().next(), if !final_primary_flush => match maybe_events {
                     Some(events) => {
                         trace!(events_len = events.len(), "Received events.");
 
-                        let mut forwarder = context.forwarder().buffered().expect("default output should always exist");
                         let current_time = get_unix_timestamp();
+                        let mut processed_passthrough_metrics = false;
 
                         for event in events {
                             if let Some(metric) = event.try_into_metric() {
-                                let metric = if self.forward_timestamped_metrics {
-                                    // If we're configured to forward timestamped metrics immediately, then we need to
-                                    // try to handle any timestamped values in this metric. If we get back `Some(...)`,
-                                    // it's either the original metric because no values had timestamps _or_ it's a
-                                    // modified version of the metric after all timestamped values were split out and
-                                    // directly forwarded.
-                                    match handle_forward_timestamped_metric(metric, &mut forwarder, &telemetry).await {
-                                        Ok(None) => continue,
-                                        Ok(Some(metric)) => metric,
-                                        Err(e) => {
-                                            error!(error = %e, "Failed to handle timestamped metric.");
-                                            continue;
-                                        }
+                                let metric = if self.passthrough_timestamped_metrics {
+                                    // Try splitting out any timestamped values, and if we have any, we'll buffer them
+                                    // separately and process the remaining non-timestamped metric (if any) by
+                                    // aggregating it like normal.
+                                    let (maybe_timestamped_metric, maybe_nontimestamped_metric) = try_split_timestamped_values(metric);
+
+                                    // If we have a timestamped metric, then batch it up out-of-band.
+                                    if let Some(timestamped_metric) = maybe_timestamped_metric {
+                                        passthrough_batcher.push_metric(timestamped_metric, context.forwarder()).await;
+                                        processed_passthrough_metrics = true;
+                                    }
+
+                                    // If we have a non-timestamped metric, we'll process it like normal.
+                                    //
+                                    // Otherwise, continue to the next event.
+                                    match maybe_nontimestamped_metric {
+                                        Some(metric) => metric,
+                                        None => continue,
                                     }
                                 } else {
                                     metric
                                 };
 
-                                if !state.insert(current_time, metric) {
+                                if !self.state.insert(current_time, metric) {
                                     trace!("Dropping metric due to context limit.");
-                                    telemetry.increment_events_dropped();
+                                    self.telemetry.increment_events_dropped();
                                 }
                             }
                         }
 
-                        match forwarder.flush().await {
-                            Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."),
-                            Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
+                        if processed_passthrough_metrics {
+                            passthrough_batcher.update_last_processed_at();
                         }
                     },
                     None => {
                         // We've reached the end of our input stream, so mark ourselves for a final flush and reset the
                         // interval so it ticks immediately on the next loop iteration.
-                        final_flush = true;
-
-                        flush.reset_immediately();
+                        final_primary_flush = true;
+                        primary_flush.reset_immediately();
 
                         debug!("Aggregation transform stopping...");
                     }
@@ -344,38 +414,119 @@
             }
         }
 
+        // Do a final flush of any timestamped metrics that we've buffered up.
+        passthrough_batcher.try_flush(context.forwarder()).await;
+
        debug!("Aggregation transform stopped.");
 
         Ok(())
     }
 }
 
-async fn handle_forward_timestamped_metric<O>(
-    mut metric: Metric, forwarder: &mut BufferedForwarder<'_, O>, telemetry: &Telemetry,
-) -> Result<Option<Metric>, GenericError>
-where
-    O: ObjectPool,
-{
+fn try_split_timestamped_values(mut metric: Metric) -> (Option<Metric>, Option<Metric>) {
     if metric.values().all_timestamped() {
-        // All the values are timestamped, so take and forward the metric as-is.
-        forwarder.push(Event::Metric(metric)).await?;
-
-        telemetry.increment_passthrough_metrics();
-
-        Ok(None)
+        (Some(metric), None)
     } else if metric.values().any_timestamped() {
-        // Only _some_ of the values are timestamped, so split out those timestamped ones, forward them, and then hand
-        // back the now-modified original metric.
+        // Only _some_ of the values are timestamped, so we'll split the timestamped values into a new metric.
         let new_metric_values = metric.values_mut().split_timestamped();
         let new_metric = Metric::from_parts(metric.context().clone(), new_metric_values, metric.metadata().clone());
 
-        forwarder.push(Event::Metric(new_metric)).await?;
-        telemetry.increment_passthrough_metrics();
-
-        Ok(Some(metric))
+        (Some(new_metric), Some(metric))
     } else {
         // No timestamped values, so we need to aggregate this metric.
-        Ok(Some(metric))
+        (None, Some(metric))
     }
 }
+
+struct PassthroughBatcher {
+    buffer_pool: ElasticObjectPool<FixedSizeEventBuffer>,
+    active_buffer: FixedSizeEventBuffer,
+    active_buffer_start: Instant,
+    last_processed_at: Instant,
+    idle_flush_timeout: Duration,
+    telemetry: Telemetry,
+}
+
+impl PassthroughBatcher {
+    async fn new(event_buffer_len: usize, idle_flush_timeout: Duration, telemetry: Telemetry) -> Self {
+        let (buffer_pool, pool_shrinker) = ElasticObjectPool::<FixedSizeEventBuffer>::with_builder(
+            "agg_passthrough_event_buffers",
+            1,
+            PASSTHROUGH_EVENT_BUFFERS_MAX,
+            move || FixedSizeEventBufferInner::with_capacity(event_buffer_len),
+        );
+        tokio::spawn(pool_shrinker);
+
+        let active_buffer = buffer_pool.acquire().await;
+
+        Self {
+            buffer_pool,
+            active_buffer,
+            active_buffer_start: Instant::now(),
+            last_processed_at: Instant::now(),
+            idle_flush_timeout,
+            telemetry,
+        }
+    }
+
+    async fn push_metric(&mut self, metric: Metric, forwarder: &Forwarder) {
+        // Try pushing the metric into our active buffer.
+        //
+        // If our active buffer is full, then we'll flush the buffer, grab a new one, and push the metric into it.
+        if let Some(event) = self.active_buffer.try_push(Event::Metric(metric)) {
+            debug!("Passthrough event buffer was full. Flushing...");
+            self.forward_events(forwarder).await;
+
+            if self.active_buffer.try_push(event).is_some() {
+                error!("Event buffer is full even after forwarding events. Dropping event.");
+                self.telemetry.increment_events_dropped();
+                return;
+            }
+        }
+
+        // If this is the first metric in the buffer, we've started a new batch, so track when it started.
+        if self.active_buffer.len() == 1 {
+            self.active_buffer_start = Instant::now();
+        }
+
+        self.telemetry.increment_passthrough_metrics();
+    }
+
+    fn update_last_processed_at(&mut self) {
+        // We expose this as a standalone method, rather than just doing it automatically in `push_metric`, because
+        // otherwise we might be calling this 10-20K times per second, instead of simply doing it after the end of each
+        // input event buffer in the transform's main loop, which should be much less frequent.
+        self.last_processed_at = Instant::now();
+    }
+
+    async fn try_flush(&mut self, forwarder: &Forwarder) {
+        // If our active buffer isn't empty, and we've exceeded our idle flush timeout, then flush the buffer.
+        if !self.active_buffer.is_empty() && self.last_processed_at.elapsed() >= self.idle_flush_timeout {
+            debug!("Passthrough processing exceeded idle flush timeout. Flushing...");
+
+            self.forward_events(forwarder).await;
+        }
+    }
+
+    async fn forward_events(&mut self, forwarder: &Forwarder) {
+        if !self.active_buffer.is_empty() {
+            let unaggregated_events = self.active_buffer.len();
+
+            // Track how long this batch was alive for.
+            let batch_duration = self.active_buffer_start.elapsed();
+            self.telemetry.record_passthrough_batch_duration(batch_duration);
+
+            self.telemetry.increment_passthrough_flushes();
+
+            // Swap our active buffer with a new, empty one, and then forward the old one.
+            let new_active_buffer = self.buffer_pool.acquire().await;
+            let old_active_buffer = std::mem::replace(&mut self.active_buffer, new_active_buffer);
+
+            match forwarder.forward_buffer(old_active_buffer).await {
+                Ok(()) => debug!(unaggregated_events, "Forwarded events."),
+                Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
+            }
+        }
+    }
+}
diff --git a/lib/saluki-components/src/transforms/aggregate/telemetry.rs b/lib/saluki-components/src/transforms/aggregate/telemetry.rs
index c71ea823..c022ea32 100644
--- a/lib/saluki-components/src/transforms/aggregate/telemetry.rs
+++ b/lib/saluki-components/src/transforms/aggregate/telemetry.rs
@@ -1,4 +1,6 @@
-use metrics::{Counter, Gauge};
+use std::time::Duration;
+
+use metrics::{Counter, Gauge, Histogram};
 use saluki_event::metric::MetricValues;
 use saluki_metrics::MetricsBuilder;
 
@@ -52,8 +54,11 @@ impl MetricTypedGauge {
 pub struct Telemetry {
     active_contexts: Gauge,
     active_contexts_by_type: MetricTypedGauge,
-    passthrough_metrics: Counter,
     events_dropped: Counter,
+    flushes: Counter,
+    passthrough_metrics: Counter,
+    passthrough_flushes: Counter,
+    passthrough_batch_duration: Histogram,
 }
 
 impl Telemetry {
@@ -61,9 +66,12 @@ impl Telemetry {
         Self {
             active_contexts: builder.register_debug_gauge("aggregate_active_contexts"),
             active_contexts_by_type: MetricTypedGauge::new(builder, "aggregate_active_contexts_by_type"),
-            passthrough_metrics: builder.register_debug_counter("aggregate_passthrough_metrics_total"),
             events_dropped: builder
                 .register_debug_counter_with_tags("component_events_dropped_total", ["intentional:true"]),
+            flushes: builder.register_debug_counter("aggregate_flushes_total"),
+            passthrough_metrics: builder.register_debug_counter("aggregate_passthrough_metrics_total"),
+            passthrough_flushes: builder.register_debug_counter("aggregate_passthrough_flushes_total"),
+            passthrough_batch_duration: builder.register_debug_histogram("aggregate_passthrough_batch_duration_secs"),
         }
     }
 
@@ -72,8 +80,11 @@ impl Telemetry {
         Self {
             active_contexts: Gauge::noop(),
             active_contexts_by_type: MetricTypedGauge::noop(),
-            passthrough_metrics: Counter::noop(),
             events_dropped: Counter::noop(),
+            flushes: Counter::noop(),
+            passthrough_metrics: Counter::noop(),
+            passthrough_flushes: Counter::noop(),
+            passthrough_batch_duration: Histogram::noop(),
         }
     }
 
@@ -87,11 +98,23 @@ impl Telemetry {
         self.active_contexts_by_type.for_values(values).decrement(1);
     }
 
+    pub fn increment_events_dropped(&self) {
+        self.events_dropped.increment(1);
+    }
+
+    pub fn increment_flushes(&self) {
+        self.flushes.increment(1);
+    }
+
     pub fn increment_passthrough_metrics(&self) {
         self.passthrough_metrics.increment(1);
    }
 
-    pub fn increment_events_dropped(&self) {
-        self.events_dropped.increment(1);
+    pub fn increment_passthrough_flushes(&self) {
+        self.passthrough_flushes.increment(1);
+    }
+
+    pub fn record_passthrough_batch_duration(&self, duration: Duration) {
+        self.passthrough_batch_duration.record(duration.as_secs_f64());
+    }
 }
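
Editor's note on the batching strategy this patch introduces: `PassthroughBatcher` couples a size bound (one pooled `FixedSizeEventBuffer` holding up to `passthrough_event_buffer_len` events) with an idle-flush timeout that a periodic tick (`PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL`) checks. The sketch below reduces that policy to std-only types so it can be read and compiled in isolation; `IdleFlushBatcher` and its methods are hypothetical names for illustration, not part of this patch or of saluki-core, and the sketch omits the buffer pool and the once-per-input-buffer `last_processed_at` bookkeeping that the real implementation uses.

use std::time::{Duration, Instant};

// Hypothetical, simplified stand-in for the batching policy used by `PassthroughBatcher`.
struct IdleFlushBatcher<T> {
    buf: Vec<T>,
    capacity: usize,
    last_push: Instant,
    idle_flush_timeout: Duration,
}

impl<T> IdleFlushBatcher<T> {
    fn new(capacity: usize, idle_flush_timeout: Duration) -> Self {
        Self {
            buf: Vec::with_capacity(capacity),
            capacity,
            last_push: Instant::now(),
            idle_flush_timeout,
        }
    }

    /// Pushes an item, returning the whole batch if this push filled the buffer.
    fn push(&mut self, item: T) -> Option<Vec<T>> {
        self.buf.push(item);
        self.last_push = Instant::now();
        (self.buf.len() >= self.capacity).then(|| std::mem::take(&mut self.buf))
    }

    /// Returns the pending batch if it is non-empty and has sat idle past the timeout.
    /// In the transform, the periodic passthrough flush tick drives this check.
    fn try_flush_idle(&mut self) -> Option<Vec<T>> {
        (!self.buf.is_empty() && self.last_push.elapsed() >= self.idle_flush_timeout)
            .then(|| std::mem::take(&mut self.buf))
    }
}

fn main() {
    let mut batcher = IdleFlushBatcher::new(2048, Duration::from_secs(1));

    // A single metric does not fill the buffer, so nothing is flushed yet.
    assert!(batcher.push("timestamped metric").is_none());

    // After sitting idle for the timeout, the next idle check hands the batch back.
    std::thread::sleep(Duration::from_secs(1));
    assert_eq!(batcher.try_flush_idle(), Some(vec!["timestamped metric"]));
}

The idle flush is what keeps sparse passthrough traffic from sitting in a partially filled buffer for more than roughly one second, while steady traffic is still forwarded in full batches of `passthrough_event_buffer_len` events.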