diff --git a/sequencer/src/persistence/fs.rs b/sequencer/src/persistence/fs.rs index 98c6e2472f..38a1c6ca34 100644 --- a/sequencer/src/persistence/fs.rs +++ b/sequencer/src/persistence/fs.rs @@ -221,49 +221,56 @@ impl Inner { fn collect_garbage( &mut self, - view: ViewNumber, - intervals: &[RangeInclusive], + decided_view: ViewNumber, + prune_intervals: &[RangeInclusive], ) -> anyhow::Result<()> { - let view_number = view.u64(); - let prune_view = view.saturating_sub(self.view_retention); + let prune_view = ViewNumber::new(decided_view.saturating_sub(self.view_retention)); - let delete_files = - |intervals: &[RangeInclusive], keep, dir_path: PathBuf| -> anyhow::Result<()> { - if !dir_path.is_dir() { - return Ok(()); - } + self.prune_files(self.da_dir_path(), prune_view, None, prune_intervals)?; + self.prune_files(self.vid_dir_path(), prune_view, None, prune_intervals)?; + self.prune_files( + self.quorum_proposals_dir_path(), + prune_view, + None, + prune_intervals, + )?; - for entry in fs::read_dir(dir_path)? { - let entry = entry?; - let path = entry.path(); - - if let Some(file) = path.file_stem().and_then(|n| n.to_str()) { - if let Ok(v) = file.parse::() { - // If the view is the anchor view, keep it no matter what. - if let Some(keep) = keep { - if keep == v { - continue; - } - } - // Otherwise, delete it if it is time to prune this view _or_ if the - // given intervals, which we've already successfully processed, contain - // the view; in this case we simply don't need it anymore. - if v < prune_view || intervals.iter().any(|i| i.contains(&v)) { - fs::remove_file(&path)?; - } - } - } - } + // Save the most recent leaf as it will be our anchor point if the node restarts. + self.prune_files( + self.decided_leaf_path(), + prune_view, + Some(decided_view), + prune_intervals, + )?; - Ok(()) - }; + Ok(()) + } - delete_files(intervals, None, self.da_dir_path())?; - delete_files(intervals, None, self.vid_dir_path())?; - delete_files(intervals, None, self.quorum_proposals_dir_path())?; + fn prune_files( + &mut self, + dir_path: PathBuf, + prune_view: ViewNumber, + keep_decided_view: Option, + prune_intervals: &[RangeInclusive], + ) -> anyhow::Result<()> { + if !dir_path.is_dir() { + return Ok(()); + } - // Save the most recent leaf as it will be our anchor point if the node restarts. - delete_files(intervals, Some(view_number), self.decided_leaf_path())?; + for (file_view, path) in view_files(dir_path)? { + // If the view is the anchor view, keep it no matter what. + if let Some(decided_view) = keep_decided_view { + if decided_view == file_view { + continue; + } + } + // Otherwise, delete it if it is time to prune this view _or_ if the given intervals, + // which we've already successfully processed, contain the view; in this case we simply + // don't need it anymore. + if file_view < prune_view || prune_intervals.iter().any(|i| i.contains(&file_view)) { + fs::remove_file(&path)?; + } + } Ok(()) } @@ -276,22 +283,13 @@ impl Inner { &self, view: ViewNumber, consumer: &impl EventConsumer, - ) -> anyhow::Result>> { + ) -> anyhow::Result>> { // Generate a decide event for each leaf, to be processed by the event consumer. We make a // separate event for each leaf because it is possible we have non-consecutive leaves in our // storage, which would not be valid as a single decide with a single leaf chain. let mut leaves = BTreeMap::new(); - for entry in fs::read_dir(self.decided_leaf_path())? { - let entry = entry?; - let path = entry.path(); - - let Some(file) = path.file_stem().and_then(|n| n.to_str()) else { - continue; - }; - let Ok(v) = file.parse::() else { - continue; - }; - if v > view.u64() { + for (v, path) in view_files(self.decided_leaf_path())? { + if v > view { continue; } @@ -302,22 +300,20 @@ impl Inner { .context(format!("parsing decided leaf {}", path.display()))?; // Include the VID share if available. - let vid_share = self - .load_vid_share(ViewNumber::new(v))? - .map(|proposal| proposal.data); + let vid_share = self.load_vid_share(v)?.map(|proposal| proposal.data); if vid_share.is_none() { - tracing::debug!(view = v, "VID share not available at decide"); + tracing::debug!(?v, "VID share not available at decide"); } // Fill in the full block payload using the DA proposals we had persisted. - if let Some(proposal) = self.load_da_proposal(ViewNumber::new(v))? { + if let Some(proposal) = self.load_da_proposal(v)? { let payload = Payload::from_bytes( &proposal.data.encoded_transactions, &proposal.data.metadata, ); leaf.fill_block_payload_unchecked(payload); } else { - tracing::debug!(view = v, "DA proposal not available at decide"); + tracing::debug!(?v, "DA proposal not available at decide"); } let info = LeafInfo { @@ -339,7 +335,7 @@ impl Inner { if let Some((oldest_view, _)) = leaves.first_key_value() { // The only exception is when the oldest leaf is the genesis leaf; then there was no // previous decide event. - if *oldest_view > 0 { + if *oldest_view > ViewNumber::genesis() { leaves.pop_first(); } } @@ -350,7 +346,7 @@ impl Inner { let height = leaf.leaf.block_header().block_number(); consumer .handle_event(&Event { - view_number: ViewNumber::new(view), + view_number: view, event: EventType::Decide { qc: Arc::new(qc.to_qc2()), leaf_chain: Arc::new(vec![leaf]), @@ -423,15 +419,12 @@ impl Inner { let mut anchor: Option<(Leaf2, QuorumCertificate2)> = None; // Return the latest decided leaf. - for entry in - fs::read_dir(self.decided_leaf_path()).context("opening decided leaf directory")? - { - let file = entry.context("reading decided leaf directory")?.path(); + for (_, path) in view_files(self.decided_leaf_path())? { let bytes = - fs::read(&file).context(format!("reading decided leaf {}", file.display()))?; + fs::read(&path).context(format!("reading decided leaf {}", path.display()))?; let (leaf, qc) = bincode::deserialize::<(Leaf, QuorumCertificate)>(&bytes) - .context(format!("parsing decided leaf {}", file.display()))?; + .context(format!("parsing decided leaf {}", path.display()))?; if let Some((anchor_leaf, _)) = &anchor { if leaf.view_number() > anchor_leaf.view_number() { let leaf2 = leaf.into(); @@ -760,47 +753,10 @@ impl SequencerPersistence for Persistence { return Ok(Default::default()); } - // Then, we want to get the entries in this directory since they'll be the - // key/value pairs for our map. - let files = fs::read_dir(dir_path.clone())?.filter_map(|entry| { - let entry = entry.ok()?; - if entry.file_type().ok()?.is_file() && entry.path().extension()? == "txt" { - Some(entry.path()) - } else { - None - } - }); - + // Read quorum proposals from every data file in this directory. let mut map = BTreeMap::new(); - for file in files { - // Parse each file into a proposal if possible. We ignore files we don't recognize or - // can't parse, as sometimes we can end up with random extra files (e.g. swap files) in - // the directory. - // - // Get the stem to remove the ".txt" from the end. - let Some(file_name) = file.file_stem() else { - continue; - }; - - // Parse the filename (which corresponds to the view) - let file_name = file_name.to_string_lossy(); - let Ok(view_number) = file_name.parse::() else { - tracing::info!( - %file_name, - "ignoring extraneous file in quorum proposals directory" - ); - continue; - }; - - // Now, we'll try and load the proposal associated with this function. In this case we - // do propagate errors: errors from the file system are more likely to be some transient - // issue (e.g. failure to connect to a network-mounted file system) than an issue with - // the file itself (like a swap file being left over in the directory). Thus, this file - // likely does have good data even if we can't read it, so we do want to propagate the - // error and eventually retry. - let proposal_bytes = fs::read(file)?; - - // Then, deserialize. + for (view, path) in view_files(&dir_path)? { + let proposal_bytes = fs::read(path)?; let proposal: Proposal> = match bincode::deserialize(&proposal_bytes) { Ok(proposal) => proposal, @@ -811,17 +767,14 @@ impl SequencerPersistence for Persistence { // many proposals as we can rather than letting one bad proposal cause the // entire operation to fail, and it is still possible that this was just // some unintended file whose name happened to match the naming convention. - tracing::warn!( - view_number, - "ignoring malformed quorum proposal file: {err:#}" - ); + tracing::warn!(?view, "ignoring malformed quorum proposal file: {err:#}"); continue; } }; let proposal2 = convert_proposal(proposal); // Push to the map and we're done. - map.insert(ViewNumber::new(view_number), proposal2); + map.insert(view, proposal2); } Ok(map) @@ -987,6 +940,31 @@ fn migrate_network_config( Ok(network_config) } +/// Get all paths under `dir` whose name is of the form .txt. +fn view_files( + dir: impl AsRef, +) -> anyhow::Result> { + Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| { + let dir = dir.as_ref().display(); + let entry = entry.ok()?; + if !entry.file_type().ok()?.is_file() { + tracing::debug!(%dir, ?entry, "ignoring non-file in data directory"); + return None; + } + let path = entry.path(); + if path.extension()? != "txt" { + tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory"); + return None; + } + let file_name = path.file_stem()?; + let Ok(view_number) = file_name.to_string_lossy().parse::() else { + tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory"); + return None; + }; + Some((ViewNumber::new(view_number), entry.path().to_owned())) + })) +} + #[cfg(test)] mod testing { use tempfile::TempDir;