# pageserver: notify compaction loop at threshold (#10740)
## Problem

The compaction loop currently runs periodically, so by default it can wait
up to 20 seconds before starting L0 compaction.

Also, when we later separate the semaphores for L0 compaction and image
compaction, we want to give up waiting for the image compaction
semaphore if L0 compaction is needed on any timeline.

Touches #10694.

## Summary of changes

Notify the compaction loop when an L0 flush (on any timeline) causes the L0
layer count to reach `compaction_threshold`.

Also do some opportunistic cleanups in the area.
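
For illustration only (not code from this commit), a minimal runnable sketch of the trigger mechanism, assuming `tokio`'s `Notify`: the flush side calls `notify_one()` once the L0 layer count reaches the threshold, and the compaction loop waits on `notified()` alongside its periodic timer. All names and numbers below are made up.

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let trigger = Arc::new(Notify::new());
    let threshold = 10usize;

    // Flush side (sketch): wake the compaction loop once the L0 layer
    // count reaches the compaction threshold.
    let flush_trigger = trigger.clone();
    tokio::spawn(async move {
        let l0_count = 12; // pretend a flush just produced this many L0 layers
        if l0_count >= threshold {
            flush_trigger.notify_one();
        }
    });

    // Compaction loop (sketch): run when either the periodic timer fires
    // or the L0 trigger is signalled, whichever happens first.
    tokio::select! {
        _ = tokio::time::sleep(Duration::from_secs(20)) => println!("periodic run"),
        _ = trigger.notified() => println!("triggered by L0 flush"),
    }
}
```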
erikgrinaker authored Feb 10, 2025
1 parent c368b0f commit 8c4e941
Showing 5 changed files with 103 additions and 87 deletions.
6 changes: 6 additions & 0 deletions libs/utils/src/backoff.rs
@@ -1,4 +1,5 @@
use std::fmt::{Debug, Display};
use std::time::Duration;

use futures::Future;
use tokio_util::sync::CancellationToken;
@@ -29,6 +30,11 @@ pub async fn exponential_backoff(
}
}

pub fn exponential_backoff_duration(n: u32, base_increment: f64, max_seconds: f64) -> Duration {
    let seconds = exponential_backoff_duration_seconds(n, base_increment, max_seconds);
    Duration::from_secs_f64(seconds)
}

pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
    if n == 0 {
        0.0
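As a usage sketch (not part of the diff), a hypothetical caller of the new `exponential_backoff_duration` wrapper; the `retry_delay` helper is illustrative, and its constants mirror the ones the compaction loop below adopts:

```rust
use std::time::Duration;
use utils::backoff::exponential_backoff_duration;

const BASE_BACKOFF_SECS: f64 = 1.0;
const MAX_BACKOFF_SECS: f64 = 300.0;

/// Illustrative helper: wait time after `error_run` consecutive failures.
/// Zero when `error_run == 0`, otherwise growing and capped at the maximum.
fn retry_delay(error_run: u32) -> Duration {
    exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS)
}
```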
23 changes: 15 additions & 8 deletions pageserver/src/tenant.rs
@@ -56,6 +56,7 @@ use timeline::CompactOptions;
use timeline::ShutdownMode;
use tokio::io::BufReader;
use tokio::sync::watch;
use tokio::sync::Notify;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -350,6 +351,9 @@ pub struct Tenant {
    /// Overhead of mutex is acceptable because compaction is done with a multi-second period.
    compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,

    /// Signals the tenant compaction loop that there is L0 compaction work to be done.
    pub(crate) l0_compaction_trigger: Arc<Notify>,

    /// Scheduled gc-compaction tasks.
    scheduled_compaction_tasks: std::sync::Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,

@@ -1691,12 +1695,7 @@ impl Tenant {
timeline_id,
index_part,
remote_metadata,
TimelineResources {
remote_client,
pagestream_throttle: self.pagestream_throttle.clone(),
pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
},
self.get_timeline_resources_for(remote_client),
LoadTimelineCause::Attach,
ctx,
)
@@ -4112,6 +4111,7 @@ impl Tenant {
// use an extremely long backoff.
Some(Duration::from_secs(3600 * 24)),
)),
l0_compaction_trigger: Arc::new(Notify::new()),
scheduled_compaction_tasks: Mutex::new(Default::default()),
activate_now_sem: tokio::sync::Semaphore::new(0),
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),
@@ -5020,12 +5020,19 @@ impl Tenant {
)
}

/// Call this before constructing a timeline, to build its required structures
/// Builds required resources for a new timeline.
fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
let remote_client = self.build_timeline_remote_client(timeline_id);
self.get_timeline_resources_for(remote_client)
}

/// Builds timeline resources for the given remote client.
fn get_timeline_resources_for(&self, remote_client: RemoteTimelineClient) -> TimelineResources {
TimelineResources {
remote_client: self.build_timeline_remote_client(timeline_id),
remote_client,
pagestream_throttle: self.pagestream_throttle.clone(),
pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(),
l0_compaction_trigger: self.l0_compaction_trigger.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
}
}
133 changes: 67 additions & 66 deletions pageserver/src/tenant/tasks.rs
@@ -22,9 +22,10 @@ use crate::tenant::timeline::compaction::CompactionOutcome;
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
use utils::backoff::exponential_backoff_duration;
use utils::completion::Barrier;
use utils::pausable_failpoint;
use utils::rate_limit::RateLimit;
use utils::{backoff, pausable_failpoint};

/// Semaphore limiting concurrent background tasks (across all tenants).
///
@@ -211,89 +212,93 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>)

/// Compaction task's main loop.
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
const BASE_BACKOFF_SECS: f64 = 1.0;
const MAX_BACKOFF_SECS: f64 = 300.0;
const RECHECK_CONFIG_INTERVAL: Duration = Duration::from_secs(10);

let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
let mut first = true;
let mut period = tenant.get_compaction_period();
let mut error_run = 0; // consecutive errors

// Stagger the compaction loop across tenants.
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
return;
}
if sleep_random(period, &cancel).await.is_err() {
return;
}

loop {
// Recheck that we're still active.
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
return;
}

let period = tenant.get_compaction_period();

// TODO: we shouldn't need to await to find tenant and this could be moved outside of
// loop, #3501. There are also additional "allowed_errors" in tests.
if first {
first = false;
if sleep_random(period, &cancel).await.is_err() {
break;
}
}

let sleep_duration;
// Refresh the period. If compaction is disabled, check again in a bit.
period = tenant.get_compaction_period();
if period == Duration::ZERO {
#[cfg(not(feature = "testing"))]
info!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10)
} else {
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::Compaction,
};

// Run compaction
let IterationResult { output, elapsed } = iteration
.run(tenant.compaction_iteration(&cancel, &ctx))
.await;
match output {
Ok(outcome) => {
error_run = 0;
// schedule the next compaction immediately in case there is a pending compaction task
sleep_duration = if let CompactionOutcome::Pending = outcome {
Duration::from_secs(1)
} else {
period
};
}
Err(err) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
log_compaction_error(&err, error_run, &wait_duration, cancel.is_cancelled());
sleep_duration = wait_duration;
}
tokio::select! {
_ = tokio::time::sleep(RECHECK_CONFIG_INTERVAL) => {},
_ = cancel.cancelled() => return,
}
continue;
}

// the duration is recorded by performance tests by enabling debug in this function
debug!(
elapsed_ms = elapsed.as_millis(),
"compaction iteration complete"
);
// Wait for the next compaction run.
let backoff = exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
tokio::select! {
_ = tokio::time::sleep(backoff), if error_run > 0 => {},
_ = tokio::time::sleep(period), if error_run == 0 => {},
_ = tenant.l0_compaction_trigger.notified(), if error_run == 0 => {},
_ = cancel.cancelled() => return,
}

// Run compaction.
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::Compaction,
};
let IterationResult { output, elapsed } = iteration
.run(tenant.compaction_iteration(&cancel, &ctx))
.await;

match output {
Ok(outcome) => {
error_run = 0;
// If there's more compaction work pending, reschedule immediately. This isn't
// necessarily L0 compaction, but that's fine for now.
//
// TODO: differentiate between L0 compaction and other compaction. The former needs
// to be responsive, the latter doesn't.
if outcome == CompactionOutcome::Pending {
tenant.l0_compaction_trigger.notify_one();
}
}

// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
break;
Err(err) => {
error_run += 1;
let backoff =
exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
log_compaction_error(&err, error_run, backoff, cancel.is_cancelled());
continue;
}
}

// NB: this log entry is recorded by performance tests.
debug!(
elapsed_ms = elapsed.as_millis(),
"compaction iteration complete"
);
}
}

fn log_compaction_error(
err: &CompactionError,
error_count: u32,
sleep_duration: &Duration,
sleep_duration: Duration,
task_cancelled: bool,
) {
use crate::tenant::upload_queue::NotInitialized;
@@ -390,13 +395,9 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
return;
}
Err(e) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
let wait_duration =
exponential_backoff_duration(error_run, 1.0, MAX_BACKOFF_SECS);

if matches!(e, crate::tenant::GcError::TimelineCancelled) {
// Timeline was cancelled during gc. We might either be in an event
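Aside: a self-contained sketch of the conditional `tokio::select!` pattern the new wait step relies on. A branch with an `, if ...` precondition is disabled when the condition is false, so after an error only the backoff timer is armed and the L0 trigger cannot bypass it. The helper below is illustrative, not the pageserver's actual code.

```rust
use std::time::Duration;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;

/// Waits for the next run: after an error only the backoff branch is armed;
/// otherwise the period timer and the L0 trigger race. Returns false on
/// cancellation so the caller can exit its loop.
async fn wait_for_next_run(
    error_run: u32,
    backoff: Duration,
    period: Duration,
    trigger: &Notify,
    cancel: &CancellationToken,
) -> bool {
    tokio::select! {
        _ = tokio::time::sleep(backoff), if error_run > 0 => true,
        _ = tokio::time::sleep(period), if error_run == 0 => true,
        _ = trigger.notified(), if error_run == 0 => true,
        _ = cancel.cancelled() => false,
    }
}
```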
17 changes: 13 additions & 4 deletions pageserver/src/tenant/timeline.rs
@@ -45,11 +45,9 @@ use rand::Rng;
use remote_storage::DownloadError;
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::runtime::Handle;
use tokio::sync::mpsc::Sender;
use tokio::{
runtime::Handle,
sync::{oneshot, watch},
};
use tokio::sync::{oneshot, watch, Notify};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::critical;
@@ -227,6 +225,7 @@ pub struct TimelineResources {
    pub remote_client: RemoteTimelineClient,
    pub pagestream_throttle: Arc<crate::tenant::throttle::Throttle>,
    pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
    pub l0_compaction_trigger: Arc<Notify>,
    pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}

@@ -426,6 +425,9 @@ pub struct Timeline {
    /// If true, the last compaction failed.
    compaction_failed: AtomicBool,

    /// Notifies the tenant compaction loop that there is pending L0 compaction work.
    l0_compaction_trigger: Arc<Notify>,

    /// Make sure we only have one running gc at a time.
    ///
    /// Must only be taken in two places:
@@ -2664,6 +2666,7 @@ impl Timeline {

compaction_lock: tokio::sync::Mutex::default(),
compaction_failed: AtomicBool::default(),
l0_compaction_trigger: resources.l0_compaction_trigger,
gc_lock: tokio::sync::Mutex::default(),

standby_horizon: AtomicLsn::new(0),
@@ -4006,6 +4009,12 @@ impl Timeline {
}
let flush_duration = flush_timer.stop_and_record();

// Notify the tenant compaction loop if L0 compaction is needed.
let l0_count = *watch_l0.borrow();
if l0_count >= self.get_compaction_threshold() {
self.l0_compaction_trigger.notify_one();
}

// Delay the next flush to backpressure if compaction can't keep up. We delay by the
// flush duration such that the flush takes 2x as long. This is propagated up to WAL
// ingestion by having ephemeral layer rolls wait for flushes.
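One property the flush-side trigger relies on, shown standalone: `Notify::notify_one` stores a permit when no task is currently waiting, so a notification fired while the compaction loop is busy is not lost; the next `notified().await` returns immediately. Example program (not from the commit):

```rust
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let trigger = Notify::new();

    // Fired while nobody is waiting: a single permit is stored inside Notify.
    trigger.notify_one();

    // The next wait consumes the stored permit and returns immediately.
    trigger.notified().await;
    println!("woken by the stored permit");
}
```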
11 changes: 2 additions & 9 deletions pageserver/src/tenant/timeline/delete.rs
@@ -17,13 +17,11 @@ use crate::{
metadata::TimelineMetadata,
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, Tenant,
TenantManifestError, TimelineOrOffloaded,
TenantManifestError, Timeline, TimelineOrOffloaded,
},
virtual_file::MaybeFatalIo,
};

use super::{Timeline, TimelineResources};

/// Mark timeline as deleted in S3 so we won't pick it up next time
/// during attach or pageserver restart.
/// See comment in persist_index_part_with_deleted_flag.
@@ -296,12 +294,7 @@ impl DeleteTimelineFlow {
timeline_id,
local_metadata,
None, // Ancestor is not needed for deletion.
TimelineResources {
remote_client,
pagestream_throttle: tenant.pagestream_throttle.clone(),
pagestream_throttle_metrics: tenant.pagestream_throttle_metrics.clone(),
l0_flush_global_state: tenant.l0_flush_global_state.clone(),
},
tenant.get_timeline_resources_for(remote_client),
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.
CreateTimelineCause::Delete,

1 comment on commit 8c4e941

@github-actions

7425 tests run: 7000 passed, 44 failed, 381 skipped (full report)


Failures on Postgres 17

Failures on Postgres 16

Failures on Postgres 15

Failures on Postgres 14

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_forward_compatibility[release-pg14] or test_forward_compatibility[release-pg14] or test_versions_mismatch[release-pg14-combination_ooonn] or test_versions_mismatch[release-pg14-combination_ooonn] or test_versions_mismatch[release-pg14-combination_ononn] or test_versions_mismatch[release-pg14-combination_ononn] or test_versions_mismatch[release-pg14-combination_onnnn] or test_versions_mismatch[release-pg14-combination_onnnn] or test_forward_compatibility[release-pg15] or test_forward_compatibility[release-pg15] or test_versions_mismatch[release-pg15-combination_ooonn] or test_versions_mismatch[release-pg15-combination_ooonn] or test_versions_mismatch[release-pg15-combination_ononn] or test_versions_mismatch[release-pg15-combination_ononn] or test_versions_mismatch[release-pg15-combination_onnnn] or test_versions_mismatch[release-pg15-combination_onnnn] or test_forward_compatibility[release-pg16] or test_forward_compatibility[release-pg16] or test_versions_mismatch[release-pg16-combination_ooonn] or test_versions_mismatch[release-pg16-combination_ooonn] or test_versions_mismatch[release-pg16-combination_ononn] or test_versions_mismatch[release-pg16-combination_ononn] or test_versions_mismatch[release-pg16-combination_onnnn] or test_versions_mismatch[release-pg16-combination_onnnn] or test_forward_compatibility[release-pg17] or test_forward_compatibility[release-pg17] or test_forward_compatibility[debug-pg17] or test_forward_compatibility[release-pg17] or test_forward_compatibility[release-pg17] or test_versions_mismatch[release-pg17-combination_ooonn] or test_versions_mismatch[release-pg17-combination_ooonn] or test_versions_mismatch[debug-pg17-combination_ooonn] or test_versions_mismatch[release-pg17-combination_ooonn] or test_versions_mismatch[release-pg17-combination_ooonn] or test_versions_mismatch[release-pg17-combination_ononn] or test_versions_mismatch[release-pg17-combination_ononn] or test_versions_mismatch[debug-pg17-combination_ononn] or test_versions_mismatch[release-pg17-combination_ononn] or test_versions_mismatch[release-pg17-combination_ononn] or test_versions_mismatch[release-pg17-combination_onnnn] or test_versions_mismatch[release-pg17-combination_onnnn] or test_versions_mismatch[debug-pg17-combination_onnnn] or test_versions_mismatch[release-pg17-combination_onnnn] or test_versions_mismatch[release-pg17-combination_onnnn]"
Flaky tests (3)

Postgres 16

Postgres 15

Test coverage report is not available

The comment gets automatically updated with the latest test results
8c4e941 at 2025-02-10T19:17:28.039Z :recycle:
