Skip to content

Commit

Permalink
pageserver: add tokio-epoll-uring slots waiters queue depth metrics (#…
Browse files Browse the repository at this point in the history
…9482)

In complement to
neondatabase/tokio-epoll-uring#56.

## Problem

We want to make tokio-epoll-uring slots waiters queue depth observable
via Prometheus.

## Summary of changes

- Add `pageserver_tokio_epoll_uring_slots_submission_queue_depth`
metrics as a `Histogram`.
- Each thread-local tokio-epoll-uring system is given a `LocalHistogram`
to observe the metrics.
- Keep a list of `Arc<ThreadLocalMetrics>` used on-demand to flush data
to the shared histogram.
- Extend `Collector::collect` to report
`pageserver_tokio_epoll_uring_slots_submission_queue_depth`.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
  • Loading branch information
yliang412 and problame authored Oct 25, 2024
1 parent 76328ad commit 85b954f
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 10 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libs/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use once_cell::sync::Lazy;
use prometheus::core::{
Atomic, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec,
};
pub use prometheus::local::LocalHistogram;
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::Error;
Expand Down
115 changes: 112 additions & 3 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3040,13 +3040,111 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
}

pub mod tokio_epoll_uring {
use metrics::{register_int_counter, UIntGauge};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

use metrics::{register_histogram, register_int_counter, Histogram, LocalHistogram, UIntGauge};
use once_cell::sync::Lazy;

/// Shared storage for tokio-epoll-uring thread local metrics.
pub(crate) static THREAD_LOCAL_METRICS_STORAGE: Lazy<ThreadLocalMetricsStorage> =
Lazy::new(|| {
let slots_submission_queue_depth = register_histogram!(
"pageserver_tokio_epoll_uring_slots_submission_queue_depth",
"The slots waiters queue depth of each tokio_epoll_uring system",
vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
)
.expect("failed to define a metric");
ThreadLocalMetricsStorage {
observers: Mutex::new(HashMap::new()),
slots_submission_queue_depth,
}
});

pub struct ThreadLocalMetricsStorage {
/// List of thread local metrics observers.
observers: Mutex<HashMap<u64, Arc<ThreadLocalMetrics>>>,
/// A histogram shared between all thread local systems
/// for collecting slots submission queue depth.
slots_submission_queue_depth: Histogram,
}

/// Each thread-local [`tokio_epoll_uring::System`] gets one of these as its
/// [`tokio_epoll_uring::metrics::PerSystemMetrics`] generic.
///
/// The System makes observations into [`Self`] and periodically, the collector
/// comes along and flushes [`Self`] into the shared storage [`THREAD_LOCAL_METRICS_STORAGE`].
///
/// [`LocalHistogram`] is `!Send`, so, we need to put it behind a [`Mutex`].
/// But except for the periodic flush, the lock is uncontended so there's no waiting
/// for cache coherence protocol to get an exclusive cache line.
pub struct ThreadLocalMetrics {
/// Local observer of thread local tokio-epoll-uring system's slots waiters queue depth.
slots_submission_queue_depth: Mutex<LocalHistogram>,
}

impl ThreadLocalMetricsStorage {
/// Registers a new thread local system. Returns a thread local metrics observer.
pub fn register_system(&self, id: u64) -> Arc<ThreadLocalMetrics> {
let per_system_metrics = Arc::new(ThreadLocalMetrics::new(
self.slots_submission_queue_depth.local(),
));
let mut g = self.observers.lock().unwrap();
g.insert(id, Arc::clone(&per_system_metrics));
per_system_metrics
}

/// Removes metrics observer for a thread local system.
/// This should be called before dropping a thread local system.
pub fn remove_system(&self, id: u64) {
let mut g = self.observers.lock().unwrap();
g.remove(&id);
}

/// Flush all thread local metrics to the shared storage.
pub fn flush_thread_local_metrics(&self) {
let g = self.observers.lock().unwrap();
g.values().for_each(|local| {
local.flush();
});
}
}

impl ThreadLocalMetrics {
pub fn new(slots_submission_queue_depth: LocalHistogram) -> Self {
ThreadLocalMetrics {
slots_submission_queue_depth: Mutex::new(slots_submission_queue_depth),
}
}

/// Flushes the thread local metrics to shared aggregator.
pub fn flush(&self) {
let Self {
slots_submission_queue_depth,
} = self;
slots_submission_queue_depth.lock().unwrap().flush();
}
}

impl tokio_epoll_uring::metrics::PerSystemMetrics for ThreadLocalMetrics {
fn observe_slots_submission_queue_depth(&self, queue_depth: u64) {
let Self {
slots_submission_queue_depth,
} = self;
slots_submission_queue_depth
.lock()
.unwrap()
.observe(queue_depth as f64);
}
}

pub struct Collector {
descs: Vec<metrics::core::Desc>,
systems_created: UIntGauge,
systems_destroyed: UIntGauge,
thread_local_metrics_storage: &'static ThreadLocalMetricsStorage,
}

impl metrics::core::Collector for Collector {
Expand All @@ -3056,20 +3154,29 @@ pub mod tokio_epoll_uring {

fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut mfs = Vec::with_capacity(Self::NMETRICS);
let tokio_epoll_uring::metrics::Metrics {
let tokio_epoll_uring::metrics::GlobalMetrics {
systems_created,
systems_destroyed,
} = tokio_epoll_uring::metrics::global();
self.systems_created.set(systems_created);
mfs.extend(self.systems_created.collect());
self.systems_destroyed.set(systems_destroyed);
mfs.extend(self.systems_destroyed.collect());

self.thread_local_metrics_storage
.flush_thread_local_metrics();

mfs.extend(
self.thread_local_metrics_storage
.slots_submission_queue_depth
.collect(),
);
mfs
}
}

impl Collector {
const NMETRICS: usize = 2;
const NMETRICS: usize = 3;

#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Expand Down Expand Up @@ -3101,6 +3208,7 @@ pub mod tokio_epoll_uring {
descs,
systems_created,
systems_destroyed,
thread_local_metrics_storage: &THREAD_LOCAL_METRICS_STORAGE,
}
}
}
Expand Down Expand Up @@ -3460,6 +3568,7 @@ pub fn preinitialize_metrics() {
Lazy::force(&RECONSTRUCT_TIME);
Lazy::force(&BASEBACKUP_QUERY_TIME);
Lazy::force(&COMPUTE_COMMANDS_COUNTERS);
Lazy::force(&tokio_epoll_uring::THREAD_LOCAL_METRICS_STORAGE);

tenant_throttling::preinitialize_global_metrics();
}
18 changes: 13 additions & 5 deletions pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,24 @@ use tokio_epoll_uring::{System, SystemHandle};

use crate::virtual_file::on_fatal_io_error;

use crate::metrics::tokio_epoll_uring as metrics;
use crate::metrics::tokio_epoll_uring::{self as metrics, THREAD_LOCAL_METRICS_STORAGE};

#[derive(Clone)]
struct ThreadLocalState(Arc<ThreadLocalStateInner>);

struct ThreadLocalStateInner {
cell: tokio::sync::OnceCell<SystemHandle>,
cell: tokio::sync::OnceCell<SystemHandle<metrics::ThreadLocalMetrics>>,
launch_attempts: AtomicU32,
/// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`]
thread_local_state_id: u64,
}

impl Drop for ThreadLocalStateInner {
fn drop(&mut self) {
THREAD_LOCAL_METRICS_STORAGE.remove_system(self.thread_local_state_id);
}
}

impl ThreadLocalState {
pub fn new() -> Self {
Self(Arc::new(ThreadLocalStateInner {
Expand Down Expand Up @@ -71,7 +77,8 @@ pub async fn thread_local_system() -> Handle {
&fake_cancel,
)
.await;
let res = System::launch()
let per_system_metrics = metrics::THREAD_LOCAL_METRICS_STORAGE.register_system(inner.thread_local_state_id);
let res = System::launch_with_metrics(per_system_metrics)
// this might move us to another executor thread => loop outside the get_or_try_init, not inside it
.await;
match res {
Expand All @@ -86,6 +93,7 @@ pub async fn thread_local_system() -> Handle {
emit_launch_failure_process_stats();
});
metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
metrics::THREAD_LOCAL_METRICS_STORAGE.remove_system(inner.thread_local_state_id);
Err(())
}
// abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
Expand Down Expand Up @@ -115,7 +123,7 @@ fn emit_launch_failure_process_stats() {
// number of threads
// rss / system memory usage generally

let tokio_epoll_uring::metrics::Metrics {
let tokio_epoll_uring::metrics::GlobalMetrics {
systems_created,
systems_destroyed,
} = tokio_epoll_uring::metrics::global();
Expand Down Expand Up @@ -182,7 +190,7 @@ fn emit_launch_failure_process_stats() {
pub struct Handle(ThreadLocalState);

impl std::ops::Deref for Handle {
type Target = SystemHandle;
type Target = SystemHandle<metrics::ThreadLocalMetrics>;

fn deref(&self) -> &Self::Target {
self.0
Expand Down
1 change: 1 addition & 0 deletions test_runner/fixtures/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def counter(name: str) -> str:
counter("pageserver_tenant_throttling_count_accounted_finish_global"),
counter("pageserver_tenant_throttling_wait_usecs_sum_global"),
counter("pageserver_tenant_throttling_count_global"),
*histogram("pageserver_tokio_epoll_uring_slots_submission_queue_depth"),
)

PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
Expand Down

1 comment on commit 85b954f

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5344 tests run: 5119 passed, 0 failed, 225 skipped (full report)


Flaky tests (2)

Postgres 17

Code coverage* (full report)

  • functions: 31.3% (7689 of 24545 functions)
  • lines: 48.8% (60461 of 123951 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
85b954f at 2024-10-25T21:51:13.788Z :recycle:

Please sign in to comment.