enhancement: allow configuring maximum number of metrics per DD metrics request payload #476

Merged · 1 commit · Feb 5, 2025
43 changes: 31 additions & 12 deletions lib/saluki-components/src/destinations/datadog/metrics/mod.rs
@@ -8,7 +8,7 @@ use saluki_core::{
    pooling::{FixedSizeObjectPool, ObjectPool},
};
use saluki_error::GenericError;
-use saluki_event::DataType;
+use saluki_event::{metric::Metric, DataType};
use saluki_io::buf::{BytesBuffer, FixedSizeVec, FrozenChunkedBytesBuffer};
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
@@ -24,6 +24,10 @@ use self::request_builder::{MetricsEndpoint, RequestBuilder};
const RB_BUFFER_POOL_COUNT: usize = 128;
const RB_BUFFER_POOL_BUF_SIZE: usize = 32_768;

+const fn default_max_metrics_per_payload() -> usize {
+    10_000
+}
+
/// Datadog Metrics destination.
///
/// Forwards metrics to the Datadog platform. It can handle both series and sketch metrics, and only utilizes the latest
@@ -45,6 +49,17 @@ pub struct DatadogMetricsConfiguration {

    #[serde(skip)]
    config_refresher: Option<RefreshableConfiguration>,
+
+    /// Maximum number of input metrics to encode into a single request payload.
+    ///
+    /// This applies both to the series and sketches endpoints.
+    ///
+    /// Defaults to 10,000.
+    #[serde(
+        rename = "serializer_max_metrics_per_payload",
+        default = "default_max_metrics_per_payload"
+    )]
+    max_metrics_per_payload: usize,
}

impl DatadogMetricsConfiguration {
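Taken together, the serde attributes above surface the option to users as serializer_max_metrics_per_payload and fall back to 10,000 when it is unset. Below is a minimal standalone sketch of that deserialization behavior; DemoConfig and the JSON inputs are hypothetical stand-ins, not the actual saluki configuration plumbing.

use serde::Deserialize;

const fn default_max_metrics_per_payload() -> usize {
    10_000
}

#[derive(Deserialize)]
struct DemoConfig {
    #[serde(
        rename = "serializer_max_metrics_per_payload",
        default = "default_max_metrics_per_payload"
    )]
    max_metrics_per_payload: usize,
}

fn main() {
    // Key absent: serde falls back to the default function, yielding 10,000.
    let cfg: DemoConfig = serde_json::from_str("{}").unwrap();
    assert_eq!(cfg.max_metrics_per_payload, 10_000);

    // Key present under its renamed form: the configured value wins.
    let cfg: DemoConfig =
        serde_json::from_str(r#"{"serializer_max_metrics_per_payload": 5000}"#).unwrap();
    assert_eq!(cfg.max_metrics_per_payload, 5000);
}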
@@ -78,8 +93,14 @@ impl DestinationBuilder for DatadogMetricsConfiguration {

        // Create our request builders.
        let rb_buffer_pool = create_request_builder_buffer_pool();
-        let series_request_builder = RequestBuilder::new(MetricsEndpoint::Series, rb_buffer_pool.clone()).await?;
-        let sketches_request_builder = RequestBuilder::new(MetricsEndpoint::Sketches, rb_buffer_pool).await?;
+        let series_request_builder = RequestBuilder::new(
+            MetricsEndpoint::Series,
+            rb_buffer_pool.clone(),
+            self.max_metrics_per_payload,
+        )
+        .await?;
+        let sketches_request_builder =
+            RequestBuilder::new(MetricsEndpoint::Sketches, rb_buffer_pool, self.max_metrics_per_payload).await?;

Ok(Box::new(DatadogMetrics {
series_request_builder,
@@ -104,19 +125,17 @@ impl MemoryBounds for DatadogMetricsConfiguration {
            .with_single_value::<DatadogMetrics<FixedSizeObjectPool<BytesBuffer>>>("component struct")
            // Capture the size of our buffer pool.
            .with_fixed_amount("buffer pool", rb_buffer_pool_size)
-            // Capture the size of the scratch buffer which may grow up to the uncompressed limit.
-            .with_fixed_amount(
-                "series scratch buffer",
-                MetricsEndpoint::Series.uncompressed_size_limit(),
-            )
-            .with_fixed_amount(
-                "sketches scratch buffer",
-                MetricsEndpoint::Sketches.uncompressed_size_limit(),
-            )
            // Capture the size of the requests channel.
            //
            // TODO: This type signature is _ugly_, and it would be nice to improve it somehow.
            .with_array::<(usize, Request<FrozenChunkedBytesBuffer>)>("requests channel", 32);
+
+        builder
+            .firm()
+            // Capture the size of the "split re-encode" buffers in the request builders, which is where we keep owned
+            // versions of metrics that we encode in case we need to actually re-encode them during a split operation.
+            .with_array::<Metric>("series metrics split re-encode buffer", self.max_metrics_per_payload)
+            .with_array::<Metric>("sketch metrics split re-encode buffer", self.max_metrics_per_payload);
}
}
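For intuition about what these firm bounds amount to: if with_array accounts roughly size_of::<T>() per element (an assumption about saluki's accounting, not verified here), the contribution grows linearly with the configured cap, once per endpoint. A back-of-envelope sketch with a made-up metric size:

use std::mem::size_of;

// Stand-in for saluki_event::metric::Metric; only its size matters here,
// and 128 bytes is an assumed, illustrative figure.
struct FakeMetric {
    _payload: [u8; 128],
}

// Rough firm-bound contribution of the two split re-encode buffers:
// up to `cap` owned metrics for series plus `cap` more for sketches.
fn split_reencode_bound(cap: usize) -> usize {
    2 * cap * size_of::<FakeMetric>()
}

fn main() {
    // Default cap of 10,000: 2 * 10_000 * 128 = 2_560_000 bytes (~2.4 MiB).
    assert_eq!(split_reencode_bound(10_000), 2_560_000);
}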

lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs

@@ -119,6 +119,7 @@ where
uncompressed_len: usize,
compressed_len_limit: usize,
uncompressed_len_limit: usize,
+    max_metrics_per_payload: usize,
encoded_metrics: Vec<Metric>,
}

@@ -127,7 +128,9 @@ where
    O: ObjectPool<Item = BytesBuffer> + 'static,
{
    /// Creates a new `RequestBuilder` for the given endpoint.
-    pub async fn new(endpoint: MetricsEndpoint, buffer_pool: O) -> Result<Self, RequestBuilderError> {
+    pub async fn new(
+        endpoint: MetricsEndpoint, buffer_pool: O, max_metrics_per_payload: usize,
+    ) -> Result<Self, RequestBuilderError> {
let chunked_buffer_pool = ChunkedBytesBufferObjectPool::new(buffer_pool);
let compressor = create_compressor(&chunked_buffer_pool).await;
Ok(Self {
@@ -140,6 +143,7 @@ where
uncompressed_len: 0,
compressed_len_limit: endpoint.compressed_size_limit(),
uncompressed_len_limit: endpoint.uncompressed_size_limit(),
+            max_metrics_per_payload,
encoded_metrics: Vec::new(),
})
}
@@ -172,6 +176,11 @@ where
});
}

+        // Make sure we haven't hit the maximum number of metrics per payload.
+        if self.encoded_metrics.len() >= self.max_metrics_per_payload {
+            return Ok(Some(metric));
+        }
+
// Encode the metric and then see if it will fit into the current request payload.
//
// If not, we return the original metric, signaling to the caller that they need to flush the current request
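Returning Ok(Some(metric)) here reuses the existing flush signal that the (un)compressed size limits already emit, so callers need no new handling for the metric-count cap. Below is a toy, self-contained mock of that caller-side convention; the types, the two-item cap, and the synchronous API are illustrative only, not the actual saluki interfaces.

// Toy model of the "Some(metric) back from encode means flush, then retry"
// convention. Everything here is a stand-in, not the saluki API.
struct Metric(&'static str);

struct MockBuilder {
    cap: usize,                // max items per payload, like max_metrics_per_payload
    current: Vec<Metric>,      // the payload being built
    flushed: Vec<Vec<Metric>>, // completed payloads
}

impl MockBuilder {
    // Mirrors the convention: a returned metric means "no room, flush first".
    fn encode(&mut self, metric: Metric) -> Option<Metric> {
        if self.current.len() >= self.cap {
            return Some(metric);
        }
        self.current.push(metric);
        None
    }

    fn flush(&mut self) {
        self.flushed.push(std::mem::take(&mut self.current));
    }
}

fn main() {
    let mut builder = MockBuilder { cap: 2, current: Vec::new(), flushed: Vec::new() };
    for metric in [Metric("a"), Metric("b"), Metric("c")] {
        if let Some(returned) = builder.encode(metric) {
            builder.flush();
            // Retry against the fresh payload; with a sane cap this must succeed.
            assert!(builder.encode(returned).is_none());
        }
    }
    builder.flush();
    assert_eq!(builder.flushed.len(), 2); // ["a", "b"] and ["c"]
}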
@@ -671,7 +680,7 @@ mod tests {

// Create a regular ol' request builder with normal (un)compressed size limits.
let buffer_pool = create_request_builder_buffer_pool();
-        let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool)
+        let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool, usize::MAX)
.await
.expect("should not fail to create request builder");

@@ -693,4 +702,41 @@
let requests = request_builder.flush().await;
assert_eq!(requests.len(), 2);
}

+    #[tokio::test]
+    async fn obeys_max_metrics_per_payload() {
+        // Generate some simple metrics.
+        let counter1 = Metric::counter(("abcdefg", &["345", "678"][..]), 1.0);
+        let counter2 = Metric::counter(("hijklmn", &["9!@", "#$%"][..]), 1.0);
+        let counter3 = Metric::counter(("opqrstu", &["^&*", "()A"][..]), 1.0);
+
+        // Create a regular ol' request builder with normal (un)compressed size limits, and no limit on the number of
+        // metrics per payload.
+        //
+        // We should be able to encode the three metrics without issue.
+        let buffer_pool = create_request_builder_buffer_pool();
+        let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool, usize::MAX)
+            .await
+            .expect("should not fail to create request builder");
+
+        assert_eq!(None, request_builder.encode(counter1.clone()).await.unwrap());
+        assert_eq!(None, request_builder.encode(counter2.clone()).await.unwrap());
+        assert_eq!(None, request_builder.encode(counter3.clone()).await.unwrap());
+
+        // Now create a request builder with normal (un)compressed size limits, but a limit of 2 metrics per payload.
+        //
+        // We should only be able to encode two of the three metrics before we're signaled to flush.
+        let buffer_pool = create_request_builder_buffer_pool();
+        let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool, 2)
+            .await
+            .expect("should not fail to create request builder");
+
+        assert_eq!(None, request_builder.encode(counter1.clone()).await.unwrap());
+        assert_eq!(None, request_builder.encode(counter2.clone()).await.unwrap());
+        assert_eq!(Some(counter3.clone()), request_builder.encode(counter3).await.unwrap());
+
+        // Since we know we could fit the same three metrics in the first request builder when there was no limit on the
+        // number of metrics per payload, we know we're not being instructed to flush here due to hitting (un)compressed
+        // size limits.
+    }
}
2 changes: 1 addition & 1 deletion lib/saluki-event/src/metric/mod.rs
@@ -30,7 +30,7 @@
///
/// The metadata contains ancillary data related to the metric, such as the timestamp, sample rate, and origination
/// information like hostname and sender.
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq)]
pub struct Metric {
context: Context,
values: MetricValues,
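The PartialEq derive is presumably what lets the new obeys_max_metrics_per_payload test compare encode results by value, e.g. assert_eq!(Some(counter3.clone()), request_builder.encode(counter3).await.unwrap()), since assert_eq! requires PartialEq on both sides.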