) -> Result,
// so create the chan and write to it in another task.
diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs
index c7f5165f90c9..c700e18cc7d7 100644
--- a/safekeeper/src/pull_timeline.rs
+++ b/safekeeper/src/pull_timeline.rs
@@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
use std::{
io::{self, ErrorKind},
+ sync::Arc,
use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
use tokio_tar::{Archive, Builder, Header};
@@ -25,8 +26,8 @@ use crate::{
- state::TimelinePersistentState,
- timeline::WalResidentTimeline,
+ state::{EvictionState, TimelinePersistentState},
+ timeline::{Timeline, WalResidentTimeline},
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
@@ -43,18 +44,33 @@ use utils::{
/// Stream tar archive of timeline to tx.
#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
pub async fn stream_snapshot(
- tli: WalResidentTimeline,
+ tli: Arc,
source: NodeId,
destination: NodeId,
tx: mpsc::Sender>,
) {
- if let Err(e) = stream_snapshot_guts(tli, source, destination, tx.clone()).await {
- // Error type/contents don't matter as they won't can't reach the client
- // (hyper likely doesn't do anything with it), but http stream will be
- // prematurely terminated. It would be nice to try to send the error in
- // trailers though.
- tx.send(Err(anyhow!("snapshot failed"))).await.ok();
- error!("snapshot failed: {:#}", e);
+ match tli.try_wal_residence_guard().await {
+ Err(e) => {
+ tx.send(Err(anyhow!("Error checking residence: {:#}", e)))
+ .await
+ .ok();
+ }
+ Ok(maybe_resident_tli) => {
+ if let Err(e) = match maybe_resident_tli {
+ Some(resident_tli) => {
+ stream_snapshot_resident_guts(resident_tli, source, destination, tx.clone())
+ .await
+ }
+ None => stream_snapshot_offloaded_guts(tli, source, destination, tx.clone()).await,
+ } {
+ // Error type/contents don't matter as they can't reach the client
+ // (hyper likely doesn't do anything with it), but http stream will be
+ // prematurely terminated. It would be nice to try to send the error in
+ // trailers though.
+ tx.send(Err(anyhow!("snapshot failed"))).await.ok();
+ error!("snapshot failed: {:#}", e);
+ }
+ }
@@ -80,12 +96,10 @@ impl Drop for SnapshotContext {
-pub async fn stream_snapshot_guts(
- tli: WalResidentTimeline,
- source: NodeId,
- destination: NodeId,
+/// Build a tokio_tar stream that sends encoded bytes into a Bytes channel.
+fn prepare_tar_stream(
tx: mpsc::Sender>,
-) -> Result<()> {
+) -> tokio_tar::Builder {
// tokio-tar wants Write implementor, but we have mpsc tx >;
// use SinkWriter as a Write impl. That is,
// - create Sink from the tx. It returns PollSendError if chan is closed.
@@ -100,12 +114,38 @@ pub async fn stream_snapshot_guts(
// - SinkWriter (not surprisingly) wants sink of &[u8], not bytes, so wrap
// into CopyToBytes. This is a data copy.
let copy_to_bytes = CopyToBytes::new(oksink);
- let mut writer = SinkWriter::new(copy_to_bytes);
- let pinned_writer = std::pin::pin!(writer);
+ let writer = SinkWriter::new(copy_to_bytes);
+ let pinned_writer = Box::pin(writer);
// Note that tokio_tar append_* funcs use tokio::io::copy with 8KB buffer
// which is also likely suboptimal.
- let mut ar = Builder::new_non_terminated(pinned_writer);
+ Builder::new_non_terminated(pinned_writer)
+/// Implementation of snapshot for an offloaded timeline, only reads control file
+pub(crate) async fn stream_snapshot_offloaded_guts(
+ tli: Arc,
+ source: NodeId,
+ destination: NodeId,
+ tx: mpsc::Sender>,
+) -> Result<()> {
+ let mut ar = prepare_tar_stream(tx);
+ tli.snapshot_offloaded(&mut ar, source, destination).await?;
+ ar.finish().await?;
+ Ok(())
+/// Implementation of snapshot for a timeline which is resident (includes some segment data)
+pub async fn stream_snapshot_resident_guts(
+ tli: WalResidentTimeline,
+ source: NodeId,
+ destination: NodeId,
+ tx: mpsc::Sender>,
+) -> Result<()> {
+ let mut ar = prepare_tar_stream(tx);
let bctx = tli.start_snapshot(&mut ar, source, destination).await?;
@@ -138,6 +178,70 @@ pub async fn stream_snapshot_guts(
+impl Timeline {
+ /// Simple snapshot for an offloaded timeline: copy the uploaded partial segment to the remote
+ /// path computed for the destination safekeeper, then append a modified control file to the provided
+ /// tar stream. No data segments are sent: we are offloaded, so there aren't any on disk.
+ async fn snapshot_offloaded(
+ self: &Arc,
+ ar: &mut tokio_tar::Builder,
+ source: NodeId,
+ destination: NodeId,
+ ) -> Result<()> {
+ // Take an initial copy of the control file; the shared state guard is dropped at the end of this block.
+ let mut control_file = {
+ let shared_state = self.write_shared_state().await;
+ let control_file = TimelinePersistentState::clone(shared_state.sk.state());
+ // Rare race: the timeline was un-evicted between entering this function and reading the
+ // control file. Error out and let the API caller retry.
+ if !matches!(control_file.eviction_state, EvictionState::Offloaded(_)) {
+ bail!("Timeline was un-evicted during snapshot, please retry");
+ }
+ control_file
+ };
+ // Modify the partial segment of the in-memory copy of the control file to
+ // point to the destination safekeeper.
+ let replace = control_file
+ .partial_backup
+ .replace_uploaded_segment(source, destination)?;
+ let Some(replace) = replace else {
+ // In Manager::ready_for_eviction, we do not permit eviction unless the timeline
+ // has a partial segment, so an offloaded timeline without one is unexpected.
+ anyhow::bail!("Timeline has no partial segment, cannot generate snapshot");
+ };
+ tracing::info!("Replacing uploaded partial segment in in-mem control file: {replace:?}");
+ // Optimistically try to copy the partial segment to the destination's path: this
+ // can fail if the timeline was un-evicted and modified in the background.
+ let remote_timeline_path = &self.remote_path;
+ wal_backup::copy_partial_segment(
+ &replace.previous.remote_path(remote_timeline_path),
+ &replace.current.remote_path(remote_timeline_path),
+ )
+ .await?;
+ // Since the S3 copy succeeded with the path given in our control file snapshot, and
+ // we are sending that snapshot in our response, we are giving the caller a consistent
+ // snapshot even if our local Timeline was unevicted or otherwise modified in the meantime.
+ let buf = control_file
+ .write_to_buf()
+ .with_context(|| "failed to serialize control store")?;
+ let mut header = Header::new_gnu();
+ header.set_size(buf.len().try_into().expect("never breaches u64"));
+ ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
+ .await
+ .with_context(|| "failed to append to archive")?;
+ Ok(())
+ }
impl WalResidentTimeline {
/// Start streaming tar archive with timeline:
/// 1) stream control file under lock;
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index c737dfcf9b99..f0113978c469 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -797,14 +797,17 @@ impl Timeline {
- /// Get the timeline guard for reading/writing WAL files.
- /// If WAL files are not present on disk (evicted), they will be automatically
- /// downloaded from remote storage. This is done in the manager task, which is
- /// responsible for issuing all guards.
- ///
- /// NB: don't use this function from timeline_manager, it will deadlock.
- /// NB: don't use this function while holding shared_state lock.
- pub async fn wal_residence_guard(self: &Arc) -> Result {
+ /// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`]
+ async fn do_wal_residence_guard(
+ self: &Arc,
+ block: bool,
+ ) -> Result