Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable gzip for prometheus query handlers and ignore NaN values in prometheus response #5576

Merged
merged 2 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use snafu::{ensure, ResultExt};
use tokio::sync::oneshot::{self, Sender};
use tokio::sync::Mutex;
use tower::ServiceBuilder;
use tower_http::compression::CompressionLayer;
use tower_http::cors::{AllowOrigin, Any, CorsLayer};
use tower_http::decompression::RequestDecompressionLayer;
use tower_http::trace::TraceLayer;
Expand Down Expand Up @@ -990,6 +991,7 @@ impl HttpServer {
"/label/{label_name}/values",
routing::get(label_values_query),
)
.layer(ServiceBuilder::new().layer(CompressionLayer::new()))
.with_state(prometheus_handler)
}

Expand Down
34 changes: 20 additions & 14 deletions src/servers/src/http/result/prometheus_resp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,23 +264,29 @@ impl PrometheusJsonResponse {

// assemble rows
for row_index in 0..batch.num_rows() {
// retrieve tags
// TODO(ruihang): push table name `__metric__`
let mut tags = Vec::with_capacity(num_label_columns + 1);
tags.push(metric_name);
for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
// TODO(ruihang): add test for NULL tag
if let Some(tag_value) = tag_column.get_data(row_index) {
tags.push((tag_name, tag_value));
// retrieve value
if let Some(v) = field_column.get_data(row_index) {
// ignore all NaN values to reduce the amount of data to be sent.
if v.is_nan() {
continue;
}
}

// retrieve timestamp
let timestamp_millis: i64 = timestamp_column.get_data(row_index).unwrap().into();
let timestamp = timestamp_millis as f64 / 1000.0;
// retrieve tags
// TODO(ruihang): push table name `__metric__`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: these two TODOs must be fixed. cc @WenyXu @waynexia

let mut tags = Vec::with_capacity(num_label_columns + 1);
tags.push(metric_name);
for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
// TODO(ruihang): add test for NULL tag
if let Some(tag_value) = tag_column.get_data(row_index) {
tags.push((tag_name, tag_value));
}
}

// retrieve timestamp
let timestamp_millis: i64 =
timestamp_column.get_data(row_index).unwrap().into();
let timestamp = timestamp_millis as f64 / 1000.0;

// retrieve value
if let Some(v) = field_column.get_data(row_index) {
buffer
.entry(tags)
.or_default()
Expand Down
Loading