Skip to content

Commit

Permalink
make flush timeout configurable + add new telemetry for number of eve…
Browse files Browse the repository at this point in the history
…nts per request in DD metrics destination
  • Loading branch information
tobz committed Feb 14, 2025
1 parent 8fc82f9 commit 6fcd8d1
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ async fn run_endpoint_io_loop<S, B>(
debug!(endpoint = endpoint_url, %status, "Request sent.");

telemetry.events_sent().increment(events as u64);
telemetry.events_sent_batch_size().record(events as f64);
} else {
telemetry.http_failed_send().increment(1);
telemetry.events_dropped_http().increment(events as u64);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use metrics::Counter;
use metrics::{Counter, Histogram};
use saluki_metrics::MetricsBuilder;

/// Component-specific telemetry.
Expand All @@ -8,6 +8,7 @@ use saluki_metrics::MetricsBuilder;
#[derive(Clone)]
pub struct ComponentTelemetry {
events_sent: Counter,
events_sent_batch_size: Histogram,
bytes_sent: Counter,
events_dropped_http: Counter,
events_dropped_encoder: Counter,
Expand All @@ -19,6 +20,7 @@ impl ComponentTelemetry {
pub fn from_builder(builder: &MetricsBuilder) -> Self {
Self {
events_sent: builder.register_debug_counter("component_events_sent_total"),
events_sent_batch_size: builder.register_debug_histogram("component_events_sent_batch_size"),
bytes_sent: builder.register_debug_counter("component_bytes_sent_total"),
events_dropped_http: builder.register_debug_counter_with_tags(
"component_events_dropped_total",
Expand All @@ -37,6 +39,10 @@ impl ComponentTelemetry {
&self.events_sent
}

pub fn events_sent_batch_size(&self) -> &Histogram {
&self.events_sent_batch_size
}

pub fn bytes_sent(&self) -> &Counter {
&self.bytes_sent
}
Expand Down
35 changes: 30 additions & 5 deletions lib/saluki-components/src/destinations/datadog/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ use super::common::{config::ForwarderConfiguration, io::TransactionForwarder, te
mod request_builder;
use self::request_builder::{MetricsEndpoint, RequestBuilder};

const FLUSH_IDLE_TIMEOUT: Duration = Duration::from_secs(1);
const RB_BUFFER_POOL_COUNT: usize = 128;
const RB_BUFFER_POOL_BUF_SIZE: usize = 32_768;

const fn default_max_metrics_per_payload() -> usize {
10_000
}

const fn default_flush_timeout_secs() -> u64 {
2
}

/// Datadog Metrics destination.
///
/// Forwards metrics to the Datadog platform. It can handle both series and sketch metrics, and only utilizes the latest
Expand Down Expand Up @@ -63,6 +66,18 @@ pub struct DatadogMetricsConfiguration {
default = "default_max_metrics_per_payload"
)]
max_metrics_per_payload: usize,

/// Flush timeout for pending requests, in seconds.
///
/// When the destination has written metrics to the in-flight request payload, but it has not yet reached the
/// payload size limits that would force the payload to be flushed, the destination will wait for a period of time
/// before flushing the in-flight request payload. This allows for the possibility of other events to be processed
/// and written into the request payload, thereby maximizing the payload size and reducing the number of requests
/// generated and sent overall.
///
/// Defaults to 2 seconds.
#[serde(default = "default_flush_timeout_secs")]
flush_timeout_secs: u64,
}

impl DatadogMetricsConfiguration {
Expand Down Expand Up @@ -105,11 +120,19 @@ impl DestinationBuilder for DatadogMetricsConfiguration {
let sketches_request_builder =
RequestBuilder::new(MetricsEndpoint::Sketches, rb_buffer_pool, self.max_metrics_per_payload).await?;

let flush_timeout = match self.flush_timeout_secs {
// We always give ourselves a minimum flush timeout of 10ms to allow for some very minimal amount of
// batching, while still practically flushing things almost immediately.
0 => Duration::from_millis(10),
secs => Duration::from_secs(secs),
};

Ok(Box::new(DatadogMetrics {
series_request_builder,
sketches_request_builder,
forwarder,
telemetry,
flush_timeout,
}))
}
}
Expand Down Expand Up @@ -150,6 +173,7 @@ where
sketches_request_builder: RequestBuilder<O>,
forwarder: TransactionForwarder<FrozenChunkedBytesBuffer>,
telemetry: ComponentTelemetry,
flush_timeout: Duration,
}

#[allow(unused)]
Expand All @@ -164,6 +188,7 @@ where
mut sketches_request_builder,
forwarder,
telemetry,
flush_timeout,
} = *self;

let mut health = context.take_health_handle();
Expand All @@ -175,8 +200,8 @@ where
debug!("Datadog Metrics destination started.");

let mut pending_flush = false;
let mut flush_timeout = sleep(FLUSH_IDLE_TIMEOUT);
tokio::pin!(flush_timeout);
let mut pending_flush_timeout = sleep(flush_timeout);
tokio::pin!(pending_flush_timeout);

loop {
select! {
Expand Down Expand Up @@ -244,13 +269,13 @@ where

// If we're not already pending a flush, we'll start the countdown.
if !pending_flush {
flush_timeout.as_mut().reset(tokio::time::Instant::now() + FLUSH_IDLE_TIMEOUT);
pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
pending_flush = true;
}
},
None => break,
},
_ = &mut flush_timeout, if pending_flush => {
_ = &mut pending_flush_timeout, if pending_flush => {
debug!("Flushing pending request(s).");

pending_flush = false;
Expand Down

0 comments on commit 6fcd8d1

Please sign in to comment.