diff --git a/lib/saluki-components/src/destinations/datadog/metrics/mod.rs b/lib/saluki-components/src/destinations/datadog/metrics/mod.rs
index df09bdbd..e3918044 100644
--- a/lib/saluki-components/src/destinations/datadog/metrics/mod.rs
+++ b/lib/saluki-components/src/destinations/datadog/metrics/mod.rs
@@ -8,7 +8,7 @@ use saluki_core::{
     pooling::{FixedSizeObjectPool, ObjectPool},
 };
 use saluki_error::GenericError;
-use saluki_event::DataType;
+use saluki_event::{metric::Metric, DataType};
 use saluki_io::buf::{BytesBuffer, FixedSizeVec, FrozenChunkedBytesBuffer};
 use saluki_metrics::MetricsBuilder;
 use serde::Deserialize;
@@ -24,6 +24,10 @@ use self::request_builder::{MetricsEndpoint, RequestBuilder};
 const RB_BUFFER_POOL_COUNT: usize = 128;
 const RB_BUFFER_POOL_BUF_SIZE: usize = 32_768;
 
+const fn default_max_metrics_per_payload() -> usize {
+    10_000
+}
+
 /// Datadog Metrics destination.
 ///
 /// Forwards metrics to the Datadog platform. It can handle both series and sketch metrics, and only utilizes the latest
@@ -45,6 +49,17 @@ pub struct DatadogMetricsConfiguration {
 
     #[serde(skip)]
     config_refresher: Option<RefresherConfiguration>,
+
+    /// Maximum number of input metrics to encode into a single request payload.
+    ///
+    /// This applies both to the series and sketches endpoints.
+    ///
+    /// Defaults to 10,000.
+    #[serde(
+        rename = "serializer_max_metrics_per_payload",
+        default = "default_max_metrics_per_payload"
+    )]
+    max_metrics_per_payload: usize,
 }
 
 impl DatadogMetricsConfiguration {
@@ -78,8 +93,14 @@ impl DestinationBuilder for DatadogMetricsConfiguration {
 
         // Create our request builders.
         let rb_buffer_pool = create_request_builder_buffer_pool();
-        let series_request_builder = RequestBuilder::new(MetricsEndpoint::Series, rb_buffer_pool.clone()).await?;
-        let sketches_request_builder = RequestBuilder::new(MetricsEndpoint::Sketches, rb_buffer_pool).await?;
+        let series_request_builder = RequestBuilder::new(
+            MetricsEndpoint::Series,
+            rb_buffer_pool.clone(),
+            self.max_metrics_per_payload,
+        )
+        .await?;
+        let sketches_request_builder =
+            RequestBuilder::new(MetricsEndpoint::Sketches, rb_buffer_pool, self.max_metrics_per_payload).await?;
 
         Ok(Box::new(DatadogMetrics {
             series_request_builder,
@@ -104,19 +125,17 @@ impl MemoryBounds for DatadogMetricsConfiguration {
             .with_single_value::<DatadogMetrics<FixedSizeObjectPool<BytesBuffer>>>("component struct")
             // Capture the size of our buffer pool.
             .with_fixed_amount("buffer pool", rb_buffer_pool_size)
-            // Capture the size of the scratch buffer which may grow up to the uncompressed limit.
-            .with_fixed_amount(
-                "series scratch buffer",
-                MetricsEndpoint::Series.uncompressed_size_limit(),
-            )
-            .with_fixed_amount(
-                "sketches scratch buffer",
-                MetricsEndpoint::Sketches.uncompressed_size_limit(),
-            )
             // Capture the size of the requests channel.
             //
             // TODO: This type signature is _ugly_, and it would be nice to improve it somehow.
             .with_array::<(usize, Request<FrozenChunkedBytesBuffer>)>("requests channel", 32);
+
+        builder
+            .firm()
+            // Capture the size of the "split re-encode" buffers in the request builders, which is where we keep owned
+            // versions of metrics that we encode in case we need to actually re-encode them during a split operation.
+            .with_array::<Metric>("series metrics split re-encode buffer", self.max_metrics_per_payload)
+            .with_array::<Metric>("sketch metrics split re-encode buffer", self.max_metrics_per_payload);
     }
 }
 
diff --git a/lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs b/lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs
index 368ce7f8..b59f3816 100644
--- a/lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs
+++ b/lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs
@@ -119,6 +119,7 @@ where
     uncompressed_len: usize,
     compressed_len_limit: usize,
    uncompressed_len_limit: usize,
+    max_metrics_per_payload: usize,
    encoded_metrics: Vec<Metric>,
 }
 
@@ -127,7 +128,9 @@ where
     O: ObjectPool + 'static,
 {
     /// Creates a new `RequestBuilder` for the given endpoint.
-    pub async fn new(endpoint: MetricsEndpoint, buffer_pool: O) -> Result<Self, GenericError> {
+    pub async fn new(
+        endpoint: MetricsEndpoint, buffer_pool: O, max_metrics_per_payload: usize,
+    ) -> Result<Self, GenericError> {
         let chunked_buffer_pool = ChunkedBytesBufferObjectPool::new(buffer_pool);
         let compressor = create_compressor(&chunked_buffer_pool).await;
         Ok(Self {
@@ -140,6 +143,7 @@ where
             uncompressed_len: 0,
             compressed_len_limit: endpoint.compressed_size_limit(),
             uncompressed_len_limit: endpoint.uncompressed_size_limit(),
+            max_metrics_per_payload,
             encoded_metrics: Vec::new(),
         })
     }
@@ -172,6 +176,11 @@ where
             });
         }
 
+        // Make sure we haven't hit the maximum number of metrics per payload.
+        if self.encoded_metrics.len() >= self.max_metrics_per_payload {
+            return Ok(Some(metric));
+        }
+
         // Encode the metric and then see if it will fit into the current request payload.
         //
         // If not, we return the original metric, signaling to the caller that they need to flush the current request
@@ -671,7 +680,7 @@ mod tests {
 
         // Create a regular ol' request builder with normal (un)compressed size limits.
         let buffer_pool = create_request_builder_buffer_pool();
-        let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool)
+        let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool, usize::MAX)
            .await
            .expect("should not fail to create request builder");
 
@@ -693,4 +702,41 @@ mod tests {
         let requests = request_builder.flush().await;
         assert_eq!(requests.len(), 2);
     }
+
+    #[tokio::test]
+    async fn obeys_max_metrics_per_payload() {
+        // Generate some simple metrics.
+        let counter1 = Metric::counter(("abcdefg", &["345", "678"][..]), 1.0);
+        let counter2 = Metric::counter(("hijklmn", &["9!@", "#$%"][..]), 1.0);
+        let counter3 = Metric::counter(("opqrstu", &["^&*", "()A"][..]), 1.0);
+
+        // Create a regular ol' request builder with normal (un)compressed size limits, and no limit on the number of
+        // metrics per payload.
+        //
+        // We should be able to encode the three metrics without issue.
+        let buffer_pool = create_request_builder_buffer_pool();
+        let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool, usize::MAX)
+            .await
+            .expect("should not fail to create request builder");
+
+        assert_eq!(None, request_builder.encode(counter1.clone()).await.unwrap());
+        assert_eq!(None, request_builder.encode(counter2.clone()).await.unwrap());
+        assert_eq!(None, request_builder.encode(counter3.clone()).await.unwrap());
+
+        // Now create a request builder with normal (un)compressed size limits, but a limit of 2 metrics per payload.
+        //
+        // We should only be able to encode two of the three metrics before we're signaled to flush.
+        let buffer_pool = create_request_builder_buffer_pool();
+        let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool, 2)
+            .await
+            .expect("should not fail to create request builder");
+
+        assert_eq!(None, request_builder.encode(counter1.clone()).await.unwrap());
+        assert_eq!(None, request_builder.encode(counter2.clone()).await.unwrap());
+        assert_eq!(Some(counter3.clone()), request_builder.encode(counter3).await.unwrap());
+
+        // Since we know we could fit the same three metrics in the first request builder when there was no limit on the
+        // number of metrics per payload, we know we're not being instructed to flush here due to hitting (un)compressed
+        // size limits.
+    }
 }
diff --git a/lib/saluki-event/src/metric/mod.rs b/lib/saluki-event/src/metric/mod.rs
index 2e2926ab..27cc4ff4 100644
--- a/lib/saluki-event/src/metric/mod.rs
+++ b/lib/saluki-event/src/metric/mod.rs
@@ -30,7 +30,7 @@ pub use self::value::{HistogramPoints, HistogramSummary, MetricValues, ScalarPoi
 ///
 /// The metadata contains ancillary data related to the metric, such as the timestamp, sample rate, and origination
 /// information like hostname and sender.
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq)]
 pub struct Metric {
     context: Context,
     values: MetricValues,
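
---

Reviewer notes (illustrative sketches, not part of the patch):

1. The `rename`/`default` serde attributes on `max_metrics_per_payload` mean the
   user-facing key is `serializer_max_metrics_per_payload`, and omitting it falls
   back to 10,000. A minimal in-module check of that behavior could look like the
   sketch below, assuming the remaining fields of `DatadogMetricsConfiguration`
   (not all shown in this diff) also have serde defaults and that `serde_json` is
   available as a dev-dependency:

       // Hypothetical test, not from this PR: exercises only the serde attributes
       // added above. It must live in the same module as
       // `DatadogMetricsConfiguration`, since `max_metrics_per_payload` is private.
       #[test]
       fn max_metrics_per_payload_default_and_rename() {
           // An omitted key falls back to `default_max_metrics_per_payload()`.
           let cfg: DatadogMetricsConfiguration = serde_json::from_str("{}").unwrap();
           assert_eq!(cfg.max_metrics_per_payload, 10_000);

           // The field is exposed under its renamed key.
           let cfg: DatadogMetricsConfiguration =
               serde_json::from_str(r#"{"serializer_max_metrics_per_payload": 2500}"#).unwrap();
           assert_eq!(cfg.max_metrics_per_payload, 2500);
       }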
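2. The new count check in `encode` reuses the existing backpressure contract: the
   metric is handed back untouched via `Ok(Some(metric))` before any encoding
   happens, so the caller can flush and re-submit it verbatim. A sketch of that
   caller-side loop, using only `encode` and `flush` as they appear in this diff
   (the driver function itself and its error handling are assumptions):

       // Illustrative driver loop, not from this PR: shows the flush-and-retry
       // contract implied by `encode` returning `Ok(Some(metric))`.
       async fn drain_metrics<O>(request_builder: &mut RequestBuilder<O>, metrics: Vec<Metric>)
       where
           O: ObjectPool + 'static,
       {
           for metric in metrics {
               // `encode` hands the metric back when the payload is full, whether due
               // to the (un)compressed size limits or the new metrics-per-payload cap.
               if let Some(returned) = request_builder.encode(metric).await.unwrap() {
                   // Flush the full payload into one or more requests, then re-encode
                   // the returned metric into the now-empty payload.
                   let _requests = request_builder.flush().await;
                   let leftover = request_builder.encode(returned).await.unwrap();
                   assert!(leftover.is_none(), "metric should fit in an empty payload");
               }
           }
       }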