safekeeper: don't un-evict timelines during snapshot API handler (#9428)
## Problem

When we use the pull_timeline API on an evicted timeline, the timeline gets downloaded (un-evicted) so that the source node can serve the snapshot API request. That means that to evacuate all the timelines from a node, the node needs enough disk space to download the partial segments of all its timelines, which may not be physically available.

Closes: #8833 

## Summary of changes

- Add a "try" variant of acquiring a residence guard, that returns None
if the timeline is offloaded
- During snapshot API handler, take a different code path if the
timeline isn't resident, where we just read the checkpoint and don't try
to read any segments.
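
To make the control flow concrete, here is a small self-contained sketch of the pattern with stand-in types. It is not the actual safekeeper code (that lives in the `pull_timeline.rs`, `timeline.rs` and `timeline_manager.rs` diffs below); it only illustrates how the "try" guard lets the snapshot path avoid un-evicting a timeline:

```rust
/// Stand-in for the residence guard: proof that WAL segments are on local disk
/// for as long as the guard is held.
struct ResidenceGuard;

struct Timeline {
    offloaded: bool,
}

impl Timeline {
    /// Blocking variant: in the real code this asks the manager task for a guard
    /// and triggers an on-demand download (un-eviction) if the timeline is evicted.
    #[allow(dead_code)]
    fn wal_residence_guard(&self) -> ResidenceGuard {
        // ...download segments if offloaded, then hand out the guard...
        ResidenceGuard
    }

    /// "Try" variant: never un-evicts; returns None if the timeline is offloaded.
    fn try_wal_residence_guard(&self) -> Option<ResidenceGuard> {
        if self.offloaded {
            None
        } else {
            Some(ResidenceGuard)
        }
    }
}

/// Snapshot entry point: choose the code path based on residence instead of
/// forcing a download of the partial segment.
fn stream_snapshot(tli: &Timeline) {
    match tli.try_wal_residence_guard() {
        // Resident: stream the control file plus WAL segments from local disk.
        Some(_guard) => println!("resident path: control file + segments"),
        // Offloaded: stream only a (modified) control file; nothing is downloaded.
        None => println!("offloaded path: control file only"),
    }
}

fn main() {
    stream_snapshot(&Timeline { offloaded: false });
    stream_snapshot(&Timeline { offloaded: true });
}
```

The real version is async, returns `Result`s, and routes guard creation through the timeline manager task, but the branching is the same.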
jcsp authored Oct 28, 2024
1 parent e727788 commit 923974d
Showing 6 changed files with 298 additions and 42 deletions.
8 changes: 0 additions & 8 deletions safekeeper/src/http/routes.rs
@@ -262,14 +262,6 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
check_permission(&request, Some(ttid.tenant_id))?;

let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
// Note: with evicted timelines it should work better than de-evicting them and
// stream; probably start_snapshot would copy partial s3 file to dest path
// and stream control file, or return WalResidentTimeline if timeline is not
// evicted.
let tli = tli
.wal_residence_guard()
.await
.map_err(ApiError::InternalServerError)?;

// To stream the body use wrap_stream which wants Stream of Result<Bytes>,
// so create the chan and write to it in another task.
140 changes: 122 additions & 18 deletions safekeeper/src/pull_timeline.rs
@@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
use std::{
cmp::min,
io::{self, ErrorKind},
sync::Arc,
};
use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
use tokio_tar::{Archive, Builder, Header};
@@ -25,8 +26,8 @@ use crate::{
routes::TimelineStatus,
},
safekeeper::Term,
state::TimelinePersistentState,
timeline::WalResidentTimeline,
state::{EvictionState, TimelinePersistentState},
timeline::{Timeline, WalResidentTimeline},
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
wal_backup,
wal_storage::open_wal_file,
@@ -43,18 +44,33 @@ use utils::{
/// Stream tar archive of timeline to tx.
#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
pub async fn stream_snapshot(
tli: WalResidentTimeline,
tli: Arc<Timeline>,
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
) {
if let Err(e) = stream_snapshot_guts(tli, source, destination, tx.clone()).await {
// Error type/contents don't matter as they can't reach the client
// (hyper likely doesn't do anything with it), but http stream will be
// prematurely terminated. It would be nice to try to send the error in
// trailers though.
tx.send(Err(anyhow!("snapshot failed"))).await.ok();
error!("snapshot failed: {:#}", e);
match tli.try_wal_residence_guard().await {
Err(e) => {
tx.send(Err(anyhow!("Error checking residence: {:#}", e)))
.await
.ok();
}
Ok(maybe_resident_tli) => {
if let Err(e) = match maybe_resident_tli {
Some(resident_tli) => {
stream_snapshot_resident_guts(resident_tli, source, destination, tx.clone())
.await
}
None => stream_snapshot_offloaded_guts(tli, source, destination, tx.clone()).await,
} {
// Error type/contents don't matter as they can't reach the client
// (hyper likely doesn't do anything with it), but http stream will be
// prematurely terminated. It would be nice to try to send the error in
// trailers though.
tx.send(Err(anyhow!("snapshot failed"))).await.ok();
error!("snapshot failed: {:#}", e);
}
}
}
}

@@ -80,12 +96,10 @@ impl Drop for SnapshotContext {
}
}

pub async fn stream_snapshot_guts(
tli: WalResidentTimeline,
source: NodeId,
destination: NodeId,
/// Build a tokio_tar stream that sends encoded bytes into a Bytes channel.
fn prepare_tar_stream(
tx: mpsc::Sender<Result<Bytes>>,
) -> Result<()> {
) -> tokio_tar::Builder<impl AsyncWrite + Unpin + Send> {
// tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
// use SinkWriter as a Write impl. That is,
// - create Sink from the tx. It returns PollSendError if chan is closed.
@@ -100,12 +114,38 @@ pub async fn stream_snapshot_guts(
// - SinkWriter (not surprisingly) wants sink of &[u8], not bytes, so wrap
// into CopyToBytes. This is a data copy.
let copy_to_bytes = CopyToBytes::new(oksink);
let mut writer = SinkWriter::new(copy_to_bytes);
let pinned_writer = std::pin::pin!(writer);
let writer = SinkWriter::new(copy_to_bytes);
let pinned_writer = Box::pin(writer);

// Note that tokio_tar append_* funcs use tokio::io::copy with 8KB buffer
// which is also likely suboptimal.
let mut ar = Builder::new_non_terminated(pinned_writer);
Builder::new_non_terminated(pinned_writer)
}

/// Implementation of snapshot for an offloaded timeline, only reads control file
pub(crate) async fn stream_snapshot_offloaded_guts(
tli: Arc<Timeline>,
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
) -> Result<()> {
let mut ar = prepare_tar_stream(tx);

tli.snapshot_offloaded(&mut ar, source, destination).await?;

ar.finish().await?;

Ok(())
}

/// Implementation of snapshot for a timeline which is resident (includes some segment data)
pub async fn stream_snapshot_resident_guts(
tli: WalResidentTimeline,
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
) -> Result<()> {
let mut ar = prepare_tar_stream(tx);

let bctx = tli.start_snapshot(&mut ar, source, destination).await?;
pausable_failpoint!("sk-snapshot-after-list-pausable");
Expand Down Expand Up @@ -138,6 +178,70 @@ pub async fn stream_snapshot_guts(
Ok(())
}

impl Timeline {
/// Simple snapshot for an offloaded timeline: we will only upload a renamed partial segment and
pass a modified control file into the provided tar stream (we don't touch any data segments on disk, since
we are offloaded and there aren't any)
async fn snapshot_offloaded<W: AsyncWrite + Unpin + Send>(
self: &Arc<Timeline>,
ar: &mut tokio_tar::Builder<W>,
source: NodeId,
destination: NodeId,
) -> Result<()> {
// Take initial copy of control file, then release state lock
let mut control_file = {
let shared_state = self.write_shared_state().await;

let control_file = TimelinePersistentState::clone(shared_state.sk.state());

// Rare race: we got unevicted between entering function and reading control file.
// We error out and let API caller retry.
if !matches!(control_file.eviction_state, EvictionState::Offloaded(_)) {
bail!("Timeline was un-evicted during snapshot, please retry");
}

control_file
};

// Modify the partial segment of the in-memory copy of the control file to
// point to the destination safekeeper.
let replace = control_file
.partial_backup
.replace_uploaded_segment(source, destination)?;

let Some(replace) = replace else {
// In Manager::ready_for_eviction, we do not permit eviction unless the timeline
// has a partial segment, so it is unexpected for an offloaded timeline to lack one.
anyhow::bail!("Timeline has no partial segment, cannot generate snapshot");
};

tracing::info!("Replacing uploaded partial segment in in-mem control file: {replace:?}");

// Optimistically try to copy the partial segment to the destination's path: this
// can fail if the timeline was un-evicted and modified in the background.
let remote_timeline_path = &self.remote_path;
wal_backup::copy_partial_segment(
&replace.previous.remote_path(remote_timeline_path),
&replace.current.remote_path(remote_timeline_path),
)
.await?;

// Since the S3 copy succeeded with the path given in our control file snapshot, and
// we are sending that snapshot in our response, we are giving the caller a consistent
// snapshot even if our local Timeline was unevicted or otherwise modified in the meantime.
let buf = control_file
.write_to_buf()
.with_context(|| "failed to serialize control store")?;
let mut header = Header::new_gnu();
header.set_size(buf.len().try_into().expect("never breaches u64"));
ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
.await
.with_context(|| "failed to append to archive")?;

Ok(())
}
}

impl WalResidentTimeline {
/// Start streaming tar archive with timeline:
/// 1) stream control file under lock;
59 changes: 43 additions & 16 deletions safekeeper/src/timeline.rs
@@ -797,14 +797,17 @@ impl Timeline {
state.sk.term_bump(to).await
}

/// Get the timeline guard for reading/writing WAL files.
/// If WAL files are not present on disk (evicted), they will be automatically
/// downloaded from remote storage. This is done in the manager task, which is
/// responsible for issuing all guards.
///
/// NB: don't use this function from timeline_manager, it will deadlock.
/// NB: don't use this function while holding shared_state lock.
pub async fn wal_residence_guard(self: &Arc<Self>) -> Result<WalResidentTimeline> {
/// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`]
async fn do_wal_residence_guard(
self: &Arc<Self>,
block: bool,
) -> Result<Option<WalResidentTimeline>> {
let op_label = if block {
"wal_residence_guard"
} else {
"try_wal_residence_guard"
};

if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
@@ -816,41 +819,65 @@
// Wait 30 seconds for the guard to be acquired. It can time out if someone is
// holding the lock (e.g. during `SafeKeeper::process_msg()`) or manager task
// is stuck.
let res = tokio::time::timeout_at(
started_at + Duration::from_secs(30),
self.manager_ctl.wal_residence_guard(),
)
let res = tokio::time::timeout_at(started_at + Duration::from_secs(30), async {
if block {
self.manager_ctl.wal_residence_guard().await.map(Some)
} else {
self.manager_ctl.try_wal_residence_guard().await
}
})
.await;

let guard = match res {
Ok(Ok(guard)) => {
let finished_at = Instant::now();
let elapsed = finished_at - started_at;
MISC_OPERATION_SECONDS
.with_label_values(&["wal_residence_guard"])
.with_label_values(&[op_label])
.observe(elapsed.as_secs_f64());

guard
}
Ok(Err(e)) => {
warn!(
"error while acquiring WalResidentTimeline guard, statuses {:?} => {:?}",
"error acquiring in {op_label}, statuses {:?} => {:?}",
status_before,
self.mgr_status.get()
);
return Err(e);
}
Err(_) => {
warn!(
"timeout while acquiring WalResidentTimeline guard, statuses {:?} => {:?}",
"timeout acquiring in {op_label} guard, statuses {:?} => {:?}",
status_before,
self.mgr_status.get()
);
anyhow::bail!("timeout while acquiring WalResidentTimeline guard");
}
};

Ok(WalResidentTimeline::new(self.clone(), guard))
Ok(guard.map(|g| WalResidentTimeline::new(self.clone(), g)))
}

/// Get the timeline guard for reading/writing WAL files.
/// If WAL files are not present on disk (evicted), they will be automatically
/// downloaded from remote storage. This is done in the manager task, which is
/// responsible for issuing all guards.
///
/// NB: don't use this function from timeline_manager, it will deadlock.
/// NB: don't use this function while holding shared_state lock.
pub async fn wal_residence_guard(self: &Arc<Self>) -> Result<WalResidentTimeline> {
self.do_wal_residence_guard(true)
.await
.map(|m| m.expect("Always get Some in block=true mode"))
}

/// Get the timeline guard for reading/writing WAL files if the timeline is resident,
/// else return None
pub(crate) async fn try_wal_residence_guard(
self: &Arc<Self>,
) -> Result<Option<WalResidentTimeline>> {
self.do_wal_residence_guard(false).await
}

pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
3 changes: 3 additions & 0 deletions safekeeper/src/timeline_eviction.rs
@@ -56,6 +56,9 @@ impl Manager {
// This also works for the first segment despite last_removed_segno
// being 0 on init because this 0 triggers run of wal_removal_task
// on success of which manager updates the horizon.
//
// **Note** pull_timeline functionality assumes that evicted timelines always have
// a partial segment: if we ever change this condition, we must also update that code.
&& self
.partial_backup_uploaded
.as_ref()
27 changes: 27 additions & 0 deletions safekeeper/src/timeline_manager.rs
@@ -100,6 +100,8 @@ const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
pub enum ManagerCtlMessage {
/// Request to get a guard for WalResidentTimeline, with WAL files available locally.
GuardRequest(tokio::sync::oneshot::Sender<anyhow::Result<ResidenceGuard>>),
/// Get a guard for WalResidentTimeline if the timeline is not currently offloaded, else None
TryGuardRequest(tokio::sync::oneshot::Sender<Option<ResidenceGuard>>),
/// Request to drop the guard.
GuardDrop(GuardId),
/// Request to reset uploaded partial backup state.
@@ -110,6 +112,7 @@ impl std::fmt::Debug for ManagerCtlMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ManagerCtlMessage::GuardRequest(_) => write!(f, "GuardRequest"),
ManagerCtlMessage::TryGuardRequest(_) => write!(f, "TryGuardRequest"),
ManagerCtlMessage::GuardDrop(id) => write!(f, "GuardDrop({:?})", id),
ManagerCtlMessage::BackupPartialReset(_) => write!(f, "BackupPartialReset"),
}
@@ -152,6 +155,19 @@ impl ManagerCtl {
.and_then(std::convert::identity)
}

/// Issue a new guard if the timeline is currently not offloaded, else return None
/// Sends a message to the manager and waits for the response.
/// Can be blocked indefinitely if the manager is stuck.
pub async fn try_wal_residence_guard(&self) -> anyhow::Result<Option<ResidenceGuard>> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.manager_tx
.send(ManagerCtlMessage::TryGuardRequest(tx))?;

// wait for the manager to respond with the guard
rx.await
.map_err(|e| anyhow::anyhow!("response read fail: {:?}", e))
}

/// Request timeline manager to reset uploaded partial segment state and
/// wait for the result.
pub async fn backup_partial_reset(&self) -> anyhow::Result<Vec<String>> {
@@ -674,6 +690,17 @@ impl Manager {
warn!("failed to reply with a guard, receiver dropped");
}
}
Some(ManagerCtlMessage::TryGuardRequest(tx)) => {
let result = if self.is_offloaded {
None
} else {
Some(self.access_service.create_guard())
};

if tx.send(result).is_err() {
warn!("failed to reply with a guard, receiver dropped");
}
}
Some(ManagerCtlMessage::GuardDrop(guard_id)) => {
self.access_service.drop_guard(guard_id);
}

1 comment on commit 923974d

@github-actions commented:
5353 tests run: 5127 passed, 1 failed, 225 skipped (full report)


Failures on Postgres 16

  • test_storage_controller_many_tenants[github-actions-selfhosted]: release-x86-64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_storage_controller_many_tenants[release-pg16-github-actions-selfhosted]"
Flaky tests (1): Postgres 15

Code coverage* (full report)

  • functions: 31.3% (7686 of 24563 functions)
  • lines: 48.7% (60451 of 124034 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
923974d at 2024-10-28T10:42:19.970Z :recycle:
