enhancement: allow configuring maximum number of metrics per DD metrics request payloads (#476)
tobz authored Feb 5, 2025
1 parent bf76762 commit 4e8add1
Showing 3 changed files with 80 additions and 15 deletions.
43 changes: 31 additions & 12 deletions lib/saluki-components/src/destinations/datadog/metrics/mod.rs
@@ -8,7 +8,7 @@ use saluki_core::{
pooling::{FixedSizeObjectPool, ObjectPool},
};
use saluki_error::GenericError;
use saluki_event::DataType;
use saluki_event::{metric::Metric, DataType};
use saluki_io::buf::{BytesBuffer, FixedSizeVec, FrozenChunkedBytesBuffer};
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
@@ -24,6 +24,10 @@ use self::request_builder::{MetricsEndpoint, RequestBuilder};
const RB_BUFFER_POOL_COUNT: usize = 128;
const RB_BUFFER_POOL_BUF_SIZE: usize = 32_768;

const fn default_max_metrics_per_payload() -> usize {
10_000
}

/// Datadog Metrics destination.
///
/// Forwards metrics to the Datadog platform. It can handle both series and sketch metrics, and only utilizes the latest
@@ -45,6 +49,17 @@ pub struct DatadogMetricsConfiguration {

#[serde(skip)]
config_refresher: Option<RefreshableConfiguration>,

/// Maximum number of input metrics to encode into a single request payload.
///
/// This applies both to the series and sketches endpoints.
///
/// Defaults to 10,000.
#[serde(
rename = "serializer_max_metrics_per_payload",
default = "default_max_metrics_per_payload"
)]
max_metrics_per_payload: usize,
}
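
The new option rides on standard serde attributes: `rename` maps the field to the `serializer_max_metrics_per_payload` key, and `default` points at the free function added above. A self-contained sketch of that pattern (`serde_json` is used purely for illustration; Saluki's real configuration layer may load values differently):

use serde::Deserialize;

const fn default_max_metrics_per_payload() -> usize {
    10_000
}

#[derive(Deserialize)]
struct SketchConfig {
    // Falls back to `default_max_metrics_per_payload()` when the key is absent.
    #[serde(
        rename = "serializer_max_metrics_per_payload",
        default = "default_max_metrics_per_payload"
    )]
    max_metrics_per_payload: usize,
}

fn main() {
    let cfg: SketchConfig = serde_json::from_str("{}").unwrap();
    assert_eq!(cfg.max_metrics_per_payload, 10_000);

    let cfg: SketchConfig =
        serde_json::from_str(r#"{"serializer_max_metrics_per_payload": 2500}"#).unwrap();
    assert_eq!(cfg.max_metrics_per_payload, 2500);
}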

impl DatadogMetricsConfiguration {
@@ -78,8 +93,14 @@ impl DestinationBuilder for DatadogMetricsConfiguration {

// Create our request builders.
let rb_buffer_pool = create_request_builder_buffer_pool();
let series_request_builder = RequestBuilder::new(MetricsEndpoint::Series, rb_buffer_pool.clone()).await?;
let sketches_request_builder = RequestBuilder::new(MetricsEndpoint::Sketches, rb_buffer_pool).await?;
let series_request_builder = RequestBuilder::new(
MetricsEndpoint::Series,
rb_buffer_pool.clone(),
self.max_metrics_per_payload,
)
.await?;
let sketches_request_builder =
RequestBuilder::new(MetricsEndpoint::Sketches, rb_buffer_pool, self.max_metrics_per_payload).await?;

Ok(Box::new(DatadogMetrics {
series_request_builder,
@@ -104,19 +125,17 @@ impl MemoryBounds for DatadogMetricsConfiguration {
.with_single_value::<DatadogMetrics<FixedSizeObjectPool<BytesBuffer>>>("component struct")
// Capture the size of our buffer pool.
.with_fixed_amount("buffer pool", rb_buffer_pool_size)
// Capture the size of the scratch buffer which may grow up to the uncompressed limit.
.with_fixed_amount(
"series scratch buffer",
MetricsEndpoint::Series.uncompressed_size_limit(),
)
.with_fixed_amount(
"sketches scratch buffer",
MetricsEndpoint::Sketches.uncompressed_size_limit(),
)
// Capture the size of the requests channel.
//
// TODO: This type signature is _ugly_, and it would be nice to improve it somehow.
.with_array::<(usize, Request<FrozenChunkedBytesBuffer>)>("requests channel", 32);

builder
.firm()
// Capture the size of the "split re-encode" buffers in the request builders, which is where we keep owned
// versions of metrics that we encode in case we need to actually re-encode them during a split operation.
.with_array::<Metric>("series metrics split re-encode buffer", self.max_metrics_per_payload)
.with_array::<Metric>("sketch metrics split re-encode buffer", self.max_metrics_per_payload);
}
}
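
Those two `with_array::<Metric>` entries mean the firm memory bound now grows linearly with `max_metrics_per_payload`, once per endpoint. A rough illustration of the scaling (the 128-byte metric size is an assumed stand-in, not the real `size_of::<Metric>()`):

fn main() {
    // Assumed per-metric size; the real accounting would use size_of::<Metric>().
    const ASSUMED_METRIC_SIZE: usize = 128;
    let max_metrics_per_payload = 10_000; // the default
    // Two re-encode buffers: one for the series endpoint, one for sketches.
    let firm_bytes = 2 * max_metrics_per_payload * ASSUMED_METRIC_SIZE;
    println!("re-encode buffers add ~{} KiB to the firm bound", firm_bytes / 1024);
}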

50 changes: 48 additions & 2 deletions lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs
@@ -119,6 +119,7 @@ where
uncompressed_len: usize,
compressed_len_limit: usize,
uncompressed_len_limit: usize,
max_metrics_per_payload: usize,
encoded_metrics: Vec<Metric>,
}

@@ -127,7 +128,9 @@
O: ObjectPool<Item = BytesBuffer> + 'static,
{
/// Creates a new `RequestBuilder` for the given endpoint.
pub async fn new(endpoint: MetricsEndpoint, buffer_pool: O) -> Result<Self, RequestBuilderError> {
pub async fn new(
endpoint: MetricsEndpoint, buffer_pool: O, max_metrics_per_payload: usize,
) -> Result<Self, RequestBuilderError> {
let chunked_buffer_pool = ChunkedBytesBufferObjectPool::new(buffer_pool);
let compressor = create_compressor(&chunked_buffer_pool).await;
Ok(Self {
@@ -140,6 +143,7 @@
uncompressed_len: 0,
compressed_len_limit: endpoint.compressed_size_limit(),
uncompressed_len_limit: endpoint.uncompressed_size_limit(),
max_metrics_per_payload,
encoded_metrics: Vec::new(),
})
}
@@ -172,6 +176,11 @@
});
}

// Make sure we haven't hit the maximum number of metrics per payload.
if self.encoded_metrics.len() >= self.max_metrics_per_payload {
return Ok(Some(metric));
}
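
This early return reuses the builder's existing back-pressure signal: handing the metric back tells the caller to flush and re-submit, exactly as when a size limit is hit. A minimal synchronous sketch of that protocol (hypothetical `BatchBuilder`, not the real request builder):

struct BatchBuilder {
    items: Vec<u64>,
    max_items: usize,
}

impl BatchBuilder {
    // Returns `Some(item)` when the batch is full; the caller flushes and retries.
    fn encode(&mut self, item: u64) -> Option<u64> {
        if self.items.len() >= self.max_items {
            return Some(item);
        }
        self.items.push(item);
        None
    }

    fn flush(&mut self) -> Vec<u64> {
        std::mem::take(&mut self.items)
    }
}

fn main() {
    let mut builder = BatchBuilder { items: Vec::new(), max_items: 2 };
    assert_eq!(builder.encode(1), None);
    assert_eq!(builder.encode(2), None);
    let rejected = builder.encode(3).expect("limit hit, item handed back");
    assert_eq!(builder.flush(), vec![1, 2]); // flush the full batch...
    assert_eq!(builder.encode(rejected), None); // ...then re-submit the rejected item
}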

// Encode the metric and then see if it will fit into the current request payload.
//
// If not, we return the original metric, signaling to the caller that they need to flush the current request
@@ -671,7 +680,7 @@ mod tests {

// Create a regular ol' request builder with normal (un)compressed size limits.
let buffer_pool = create_request_builder_buffer_pool();
let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool)
let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool, usize::MAX)
.await
.expect("should not fail to create request builder");

@@ -693,4 +702,41 @@
let requests = request_builder.flush().await;
assert_eq!(requests.len(), 2);
}

#[tokio::test]
async fn obeys_max_metrics_per_payload() {
// Generate some simple metrics.
let counter1 = Metric::counter(("abcdefg", &["345", "678"][..]), 1.0);
let counter2 = Metric::counter(("hijklmn", &["9!@", "#$%"][..]), 1.0);
let counter3 = Metric::counter(("opqrstu", &["^&*", "()A"][..]), 1.0);

// Create a regular ol' request builder with normal (un)compressed size limits, and no limit on the number of
// metrics per payload.
//
// We should be able to encode the three metrics without issue.
let buffer_pool = create_request_builder_buffer_pool();
let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool, usize::MAX)
.await
.expect("should not fail to create request builder");

assert_eq!(None, request_builder.encode(counter1.clone()).await.unwrap());
assert_eq!(None, request_builder.encode(counter2.clone()).await.unwrap());
assert_eq!(None, request_builder.encode(counter3.clone()).await.unwrap());

// Now create a request builder with normal (un)compressed size limits, but a limit of 2 metrics per payload.
//
// We should only be able to encode two of the three metrics before we're signaled to flush.
let buffer_pool = create_request_builder_buffer_pool();
let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool, 2)
.await
.expect("should not fail to create request builder");

assert_eq!(None, request_builder.encode(counter1.clone()).await.unwrap());
assert_eq!(None, request_builder.encode(counter2.clone()).await.unwrap());
assert_eq!(Some(counter3.clone()), request_builder.encode(counter3).await.unwrap());

// Since we know we could fit the same three metrics in the first request builder when there was no limit on the
// number of metrics per payload, we know we're not being instructed to flush here due to hitting (un)compressed
// size limits.
}
}
2 changes: 1 addition & 1 deletion lib/saluki-event/src/metric/mod.rs
@@ -30,7 +30,7 @@ pub use self::value::{HistogramPoints, HistogramSummary, MetricValues, ScalarPoi
///
/// The metadata contains ancillary data related to the metric, such as the timestamp, sample rate, and origination
/// information like hostname and sender.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub struct Metric {
context: Context,
values: MetricValues,
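Deriving `PartialEq` here is what allows the new test to compare metrics directly: `assert_eq!` needs `PartialEq` for the comparison and `Debug` for the failure output. A stand-in illustration (not the real `Metric`):

#[derive(Clone, Debug, PartialEq)]
struct SketchMetric {
    name: String,
}

fn main() {
    let m = SketchMetric { name: "abcdefg".into() };
    // `assert_eq!(Some(m), encode(...))`-style comparisons require `PartialEq`
    // (for `==`) and `Debug` (for the failure message) on the compared type.
    assert_eq!(Some(m.clone()), Some(m));
}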
