Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of swap files #2624

Merged
merged 2 commits into from
Feb 18, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 86 additions & 108 deletions sequencer/src/persistence/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,49 +221,56 @@ impl Inner {

fn collect_garbage(
&mut self,
view: ViewNumber,
intervals: &[RangeInclusive<u64>],
decided_view: ViewNumber,
prune_intervals: &[RangeInclusive<ViewNumber>],
) -> anyhow::Result<()> {
let view_number = view.u64();
let prune_view = view.saturating_sub(self.view_retention);
let prune_view = ViewNumber::new(decided_view.saturating_sub(self.view_retention));

let delete_files =
|intervals: &[RangeInclusive<u64>], keep, dir_path: PathBuf| -> anyhow::Result<()> {
if !dir_path.is_dir() {
return Ok(());
}
self.prune_files(self.da_dir_path(), prune_view, None, prune_intervals)?;
self.prune_files(self.vid_dir_path(), prune_view, None, prune_intervals)?;
self.prune_files(
self.quorum_proposals_dir_path(),
prune_view,
None,
prune_intervals,
)?;

for entry in fs::read_dir(dir_path)? {
let entry = entry?;
let path = entry.path();

if let Some(file) = path.file_stem().and_then(|n| n.to_str()) {
if let Ok(v) = file.parse::<u64>() {
// If the view is the anchor view, keep it no matter what.
if let Some(keep) = keep {
if keep == v {
continue;
}
}
// Otherwise, delete it if it is time to prune this view _or_ if the
// given intervals, which we've already successfully processed, contain
// the view; in this case we simply don't need it anymore.
if v < prune_view || intervals.iter().any(|i| i.contains(&v)) {
fs::remove_file(&path)?;
}
}
}
}
// Save the most recent leaf as it will be our anchor point if the node restarts.
self.prune_files(
self.decided_leaf_path(),
prune_view,
Some(decided_view),
prune_intervals,
)?;

Ok(())
};
Ok(())
}

delete_files(intervals, None, self.da_dir_path())?;
delete_files(intervals, None, self.vid_dir_path())?;
delete_files(intervals, None, self.quorum_proposals_dir_path())?;
fn prune_files(
&mut self,
dir_path: PathBuf,
prune_view: ViewNumber,
keep_decided_view: Option<ViewNumber>,
prune_intervals: &[RangeInclusive<ViewNumber>],
) -> anyhow::Result<()> {
if !dir_path.is_dir() {
return Ok(());
}

// Save the most recent leaf as it will be our anchor point if the node restarts.
delete_files(intervals, Some(view_number), self.decided_leaf_path())?;
for (file_view, path) in view_files(dir_path)? {
// If the view is the anchor view, keep it no matter what.
if let Some(decided_view) = keep_decided_view {
if decided_view == file_view {
continue;
}
}
// Otherwise, delete it if it is time to prune this view _or_ if the given intervals,
// which we've already successfully processed, contain the view; in this case we simply
// don't need it anymore.
if file_view < prune_view || prune_intervals.iter().any(|i| i.contains(&file_view)) {
fs::remove_file(&path)?;
}
}

Ok(())
}
Expand All @@ -276,22 +283,13 @@ impl Inner {
&self,
view: ViewNumber,
consumer: &impl EventConsumer,
) -> anyhow::Result<Vec<RangeInclusive<u64>>> {
) -> anyhow::Result<Vec<RangeInclusive<ViewNumber>>> {
// Generate a decide event for each leaf, to be processed by the event consumer. We make a
// separate event for each leaf because it is possible we have non-consecutive leaves in our
// storage, which would not be valid as a single decide with a single leaf chain.
let mut leaves = BTreeMap::new();
for entry in fs::read_dir(self.decided_leaf_path())? {
let entry = entry?;
let path = entry.path();

let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
continue;
};
let Ok(v) = file.parse::<u64>() else {
continue;
};
if v > view.u64() {
for (v, path) in view_files(self.decided_leaf_path())? {
Copy link
Collaborator

@sveitser sveitser Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More descriptive name than v? Looking at this PR diff, or file in general, I think it would be nice to be a bit more explicit with names that describe different kinds of view number. I think we should only use the bare term view if that's not ambiguous and that is no longer the case when we are comparing views.

if v > view {
continue;
}

Expand All @@ -302,22 +300,20 @@ impl Inner {
.context(format!("parsing decided leaf {}", path.display()))?;

// Include the VID share if available.
let vid_share = self
.load_vid_share(ViewNumber::new(v))?
.map(|proposal| proposal.data);
let vid_share = self.load_vid_share(v)?.map(|proposal| proposal.data);
if vid_share.is_none() {
tracing::debug!(view = v, "VID share not available at decide");
tracing::debug!(?v, "VID share not available at decide");
}

// Fill in the full block payload using the DA proposals we had persisted.
if let Some(proposal) = self.load_da_proposal(ViewNumber::new(v))? {
if let Some(proposal) = self.load_da_proposal(v)? {
let payload = Payload::from_bytes(
&proposal.data.encoded_transactions,
&proposal.data.metadata,
);
leaf.fill_block_payload_unchecked(payload);
} else {
tracing::debug!(view = v, "DA proposal not available at decide");
tracing::debug!(?v, "DA proposal not available at decide");
}

let info = LeafInfo {
Expand All @@ -339,7 +335,7 @@ impl Inner {
if let Some((oldest_view, _)) = leaves.first_key_value() {
// The only exception is when the oldest leaf is the genesis leaf; then there was no
// previous decide event.
if *oldest_view > 0 {
if *oldest_view > ViewNumber::genesis() {
leaves.pop_first();
}
}
Expand All @@ -350,7 +346,7 @@ impl Inner {
let height = leaf.leaf.block_header().block_number();
consumer
.handle_event(&Event {
view_number: ViewNumber::new(view),
view_number: view,
event: EventType::Decide {
qc: Arc::new(qc.to_qc2()),
leaf_chain: Arc::new(vec![leaf]),
Expand Down Expand Up @@ -423,15 +419,12 @@ impl Inner {
let mut anchor: Option<(Leaf2, QuorumCertificate2<SeqTypes>)> = None;

// Return the latest decided leaf.
for entry in
fs::read_dir(self.decided_leaf_path()).context("opening decided leaf directory")?
{
let file = entry.context("reading decided leaf directory")?.path();
for (_, path) in view_files(self.decided_leaf_path())? {
let bytes =
fs::read(&file).context(format!("reading decided leaf {}", file.display()))?;
fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
let (leaf, qc) =
bincode::deserialize::<(Leaf, QuorumCertificate<SeqTypes>)>(&bytes)
.context(format!("parsing decided leaf {}", file.display()))?;
.context(format!("parsing decided leaf {}", path.display()))?;
if let Some((anchor_leaf, _)) = &anchor {
if leaf.view_number() > anchor_leaf.view_number() {
let leaf2 = leaf.into();
Expand Down Expand Up @@ -760,47 +753,10 @@ impl SequencerPersistence for Persistence {
return Ok(Default::default());
}

// Then, we want to get the entries in this directory since they'll be the
// key/value pairs for our map.
let files = fs::read_dir(dir_path.clone())?.filter_map(|entry| {
let entry = entry.ok()?;
if entry.file_type().ok()?.is_file() && entry.path().extension()? == "txt" {
Some(entry.path())
} else {
None
}
});

// Read quorum proposals from every data file in this directory.
let mut map = BTreeMap::new();
for file in files {
// Parse each file into a proposal if possible. We ignore files we don't recognize or
// can't parse, as sometimes we can end up with random extra files (e.g. swap files) in
// the directory.
//
// Get the stem to remove the ".txt" from the end.
let Some(file_name) = file.file_stem() else {
continue;
};

// Parse the filename (which corresponds to the view)
let file_name = file_name.to_string_lossy();
let Ok(view_number) = file_name.parse::<u64>() else {
tracing::info!(
%file_name,
"ignoring extraneous file in quorum proposals directory"
);
continue;
};

// Now, we'll try and load the proposal associated with this function. In this case we
// do propagate errors: errors from the file system are more likely to be some transient
// issue (e.g. failure to connect to a network-mounted file system) than an issue with
// the file itself (like a swap file being left over in the directory). Thus, this file
// likely does have good data even if we can't read it, so we do want to propagate the
// error and eventually retry.
let proposal_bytes = fs::read(file)?;

// Then, deserialize.
for (view, path) in view_files(&dir_path)? {
let proposal_bytes = fs::read(path)?;
let proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
match bincode::deserialize(&proposal_bytes) {
Ok(proposal) => proposal,
Expand All @@ -811,17 +767,14 @@ impl SequencerPersistence for Persistence {
// many proposals as we can rather than letting one bad proposal cause the
// entire operation to fail, and it is still possible that this was just
// some unintended file whose name happened to match the naming convention.
tracing::warn!(
view_number,
"ignoring malformed quorum proposal file: {err:#}"
);
tracing::warn!(?view, "ignoring malformed quorum proposal file: {err:#}");
continue;
}
};
let proposal2 = convert_proposal(proposal);

// Push to the map and we're done.
map.insert(ViewNumber::new(view_number), proposal2);
map.insert(view, proposal2);
}

Ok(map)
Expand Down Expand Up @@ -987,6 +940,31 @@ fn migrate_network_config(
Ok(network_config)
}

/// Get all paths under `dir` whose name is of the form <view number>.txt.
fn view_files(
dir: impl AsRef<Path>,
) -> anyhow::Result<impl Iterator<Item = (ViewNumber, PathBuf)>> {
Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
let dir = dir.as_ref().display();
let entry = entry.ok()?;
if !entry.file_type().ok()?.is_file() {
tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
return None;
}
let path = entry.path();
if path.extension()? != "txt" {
tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
return None;
}
let file_name = path.file_stem()?;
let Ok(view_number) = file_name.to_string_lossy().parse::<u64>() else {
tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
return None;
};
Some((ViewNumber::new(view_number), entry.path().to_owned()))
}))
}

#[cfg(test)]
mod testing {
use tempfile::TempDir;
Expand Down
Loading