diff --git a/Cargo.lock b/Cargo.lock index c5af247e8be4..79e5ea7caaef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1288,12 +1288,15 @@ dependencies = [ "flate2", "futures", "hyper 0.14.30", + "metrics", "nix 0.27.1", "notify", "num_cpus", + "once_cell", "opentelemetry", "opentelemetry_sdk", "postgres", + "prometheus", "regex", "remote_storage", "reqwest 0.12.4", diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 91e0b9d5b87c..0bf4ed53d669 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -18,9 +18,11 @@ clap.workspace = true flate2.workspace = true futures.workspace = true hyper0 = { workspace = true, features = ["full"] } +metrics.workspace = true nix.workspace = true notify.workspace = true num_cpus.workspace = true +once_cell.workspace = true opentelemetry.workspace = true opentelemetry_sdk.workspace = true postgres.workspace = true @@ -39,6 +41,7 @@ tracing-subscriber.workspace = true tracing-utils.workspace = true thiserror.workspace = true url.workspace = true +prometheus.workspace = true compute_api.workspace = true utils.workspace = true diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index af35f71bf2d9..3677582c11ed 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -9,6 +9,7 @@ use crate::catalog::SchemaDumpError; use crate::catalog::{get_database_schema, get_dbs_and_roles}; use crate::compute::forward_termination_signal; use crate::compute::{ComputeNode, ComputeState, ParsedSpec}; +use crate::installed_extensions; use compute_api::requests::{ConfigurationRequest, ExtensionInstallRequest, SetRoleGrantsRequest}; use compute_api::responses::{ ComputeStatus, ComputeStatusResponse, ExtensionInstallResult, GenericAPIError, @@ -19,6 +20,8 @@ use anyhow::Result; use hyper::header::CONTENT_TYPE; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use metrics::Encoder; +use metrics::TextEncoder; use tokio::task; use tracing::{debug, error, info, warn}; use tracing_utils::http::OtelName; @@ -65,6 +68,28 @@ async fn routes(req: Request, compute: &Arc) -> Response { + debug!("serving /metrics GET request"); + + let mut buffer = vec![]; + let metrics = installed_extensions::collect(); + let encoder = TextEncoder::new(); + encoder.encode(&metrics, &mut buffer).unwrap(); + + match Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + { + Ok(response) => response, + Err(err) => { + let msg = format!("error handling /metrics request: {err}"); + error!(msg); + render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR) + } + } + } // Collect Postgres current usage insights (&Method::GET, "/insights") => { info!("serving /insights GET request"); diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index 11eee6ccfd44..7b9a62c54569 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -37,6 +37,21 @@ paths: schema: $ref: "#/components/schemas/ComputeMetrics" + /metrics + get: + tags: + - Info + summary: Get compute node metrics in text format. + description: "" + operationId: getComputeMetrics + responses: + 200: + description: ComputeMetrics + content: + text/plain: + schema: + type: string + description: Metrics in text format. /insights: get: tags: diff --git a/compute_tools/src/installed_extensions.rs b/compute_tools/src/installed_extensions.rs index 877f99bff715..6dd55855db29 100644 --- a/compute_tools/src/installed_extensions.rs +++ b/compute_tools/src/installed_extensions.rs @@ -1,4 +1,5 @@ use compute_api::responses::{InstalledExtension, InstalledExtensions}; +use metrics::proto::MetricFamily; use std::collections::HashMap; use std::collections::HashSet; use tracing::info; @@ -8,6 +9,10 @@ use anyhow::Result; use postgres::{Client, NoTls}; use tokio::task; +use metrics::core::Collector; +use metrics::{register_uint_gauge_vec, UIntGaugeVec}; +use once_cell::sync::Lazy; + /// We don't reuse get_existing_dbs() just for code clarity /// and to make database listing query here more explicit. /// @@ -59,6 +64,12 @@ pub async fn get_installed_extensions(connstr: Url) -> Result Result Result<()> { "[NEON_EXT_STAT] {}", serde_json::to_string(&result).expect("failed to serialize extensions list") ); - Ok(()) } + +static INSTALLED_EXTENSIONS: Lazy = Lazy::new(|| { + register_uint_gauge_vec!( + "installed_extensions", + "Number of databases where the version of extension is installed", + &["extension_name", "version"] + ) + .expect("failed to define a metric") +}); + +pub fn collect() -> Vec { + INSTALLED_EXTENSIONS.collect() +} diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index ea8291c1e07f..db3723b7cc9a 100644 --- a/test_runner/fixtures/endpoint/http.py +++ b/test_runner/fixtures/endpoint/http.py @@ -46,3 +46,8 @@ def set_role_grants(self, database: str, role: str, schema: str, privileges: lis ) res.raise_for_status() return res.json() + + def metrics(self) -> str: + res = self.get(f"http://localhost:{self.port}/metrics") + res.raise_for_status() + return res.text diff --git a/test_runner/regress/test_installed_extensions.py b/test_runner/regress/test_installed_extensions.py index 4700db85eedd..54ce7c8340d1 100644 --- a/test_runner/regress/test_installed_extensions.py +++ b/test_runner/regress/test_installed_extensions.py @@ -1,6 +1,14 @@ +from __future__ import annotations + +import time from logging import info +from typing import TYPE_CHECKING + +from fixtures.log_helper import log +from fixtures.metrics import parse_metrics -from fixtures.neon_fixtures import NeonEnv +if TYPE_CHECKING: + from fixtures.neon_fixtures import NeonEnv def test_installed_extensions(neon_simple_env: NeonEnv): @@ -85,3 +93,52 @@ def test_installed_extensions(neon_simple_env: NeonEnv): assert ext["n_databases"] == 2 ext["versions"].sort() assert ext["versions"] == ["1.2", "1.3"] + + # check that /metrics endpoint is available + # ensure that we see the metric before and after restart + res = client.metrics() + info("Metrics: %s", res) + m = parse_metrics(res) + neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.2"}) + assert len(neon_m) == 1 + for sample in neon_m: + assert sample.value == 2 + neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.3"}) + assert len(neon_m) == 1 + for sample in neon_m: + assert sample.value == 1 + + endpoint.stop() + endpoint.start() + + timeout = 10 + while timeout > 0: + try: + res = client.metrics() + timeout = -1 + if len(parse_metrics(res).query_all("installed_extensions")) < 4: + # Assume that not all metrics that are collected yet + time.sleep(1) + timeout -= 1 + continue + except Exception: + log.exception("failed to get metrics, assume they are not collected yet") + time.sleep(1) + timeout -= 1 + continue + + assert ( + len(parse_metrics(res).query_all("installed_extensions")) >= 4 + ), "Not all metrics are collected" + + info("After restart metrics: %s", res) + m = parse_metrics(res) + neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.2"}) + assert len(neon_m) == 1 + for sample in neon_m: + assert sample.value == 1 + + neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.3"}) + assert len(neon_m) == 1 + for sample in neon_m: + assert sample.value == 1