Skip to content

Commit 1f7215e

Browse files
authored
Fix handling of swap files (#2624)
* Fix handling of swap files * Refactor fs garbage collection
1 parent e8ea153 commit 1f7215e

File tree

1 file changed

+86
-108
lines changed
  • sequencer/src/persistence

1 file changed

+86
-108
lines changed

sequencer/src/persistence/fs.rs

+86-108
Original file line numberDiff line numberDiff line change
@@ -221,49 +221,56 @@ impl Inner {
221221

222222
fn collect_garbage(
223223
&mut self,
224-
view: ViewNumber,
225-
intervals: &[RangeInclusive<u64>],
224+
decided_view: ViewNumber,
225+
prune_intervals: &[RangeInclusive<ViewNumber>],
226226
) -> anyhow::Result<()> {
227-
let view_number = view.u64();
228-
let prune_view = view.saturating_sub(self.view_retention);
227+
let prune_view = ViewNumber::new(decided_view.saturating_sub(self.view_retention));
229228

230-
let delete_files =
231-
|intervals: &[RangeInclusive<u64>], keep, dir_path: PathBuf| -> anyhow::Result<()> {
232-
if !dir_path.is_dir() {
233-
return Ok(());
234-
}
229+
self.prune_files(self.da_dir_path(), prune_view, None, prune_intervals)?;
230+
self.prune_files(self.vid_dir_path(), prune_view, None, prune_intervals)?;
231+
self.prune_files(
232+
self.quorum_proposals_dir_path(),
233+
prune_view,
234+
None,
235+
prune_intervals,
236+
)?;
235237

236-
for entry in fs::read_dir(dir_path)? {
237-
let entry = entry?;
238-
let path = entry.path();
239-
240-
if let Some(file) = path.file_stem().and_then(|n| n.to_str()) {
241-
if let Ok(v) = file.parse::<u64>() {
242-
// If the view is the anchor view, keep it no matter what.
243-
if let Some(keep) = keep {
244-
if keep == v {
245-
continue;
246-
}
247-
}
248-
// Otherwise, delete it if it is time to prune this view _or_ if the
249-
// given intervals, which we've already successfully processed, contain
250-
// the view; in this case we simply don't need it anymore.
251-
if v < prune_view || intervals.iter().any(|i| i.contains(&v)) {
252-
fs::remove_file(&path)?;
253-
}
254-
}
255-
}
256-
}
238+
// Save the most recent leaf as it will be our anchor point if the node restarts.
239+
self.prune_files(
240+
self.decided_leaf_path(),
241+
prune_view,
242+
Some(decided_view),
243+
prune_intervals,
244+
)?;
257245

258-
Ok(())
259-
};
246+
Ok(())
247+
}
260248

261-
delete_files(intervals, None, self.da_dir_path())?;
262-
delete_files(intervals, None, self.vid_dir_path())?;
263-
delete_files(intervals, None, self.quorum_proposals_dir_path())?;
249+
fn prune_files(
250+
&mut self,
251+
dir_path: PathBuf,
252+
prune_view: ViewNumber,
253+
keep_decided_view: Option<ViewNumber>,
254+
prune_intervals: &[RangeInclusive<ViewNumber>],
255+
) -> anyhow::Result<()> {
256+
if !dir_path.is_dir() {
257+
return Ok(());
258+
}
264259

265-
// Save the most recent leaf as it will be our anchor point if the node restarts.
266-
delete_files(intervals, Some(view_number), self.decided_leaf_path())?;
260+
for (file_view, path) in view_files(dir_path)? {
261+
// If the view is the anchor view, keep it no matter what.
262+
if let Some(decided_view) = keep_decided_view {
263+
if decided_view == file_view {
264+
continue;
265+
}
266+
}
267+
// Otherwise, delete it if it is time to prune this view _or_ if the given intervals,
268+
// which we've already successfully processed, contain the view; in this case we simply
269+
// don't need it anymore.
270+
if file_view < prune_view || prune_intervals.iter().any(|i| i.contains(&file_view)) {
271+
fs::remove_file(&path)?;
272+
}
273+
}
267274

268275
Ok(())
269276
}
@@ -276,22 +283,13 @@ impl Inner {
276283
&self,
277284
view: ViewNumber,
278285
consumer: &impl EventConsumer,
279-
) -> anyhow::Result<Vec<RangeInclusive<u64>>> {
286+
) -> anyhow::Result<Vec<RangeInclusive<ViewNumber>>> {
280287
// Generate a decide event for each leaf, to be processed by the event consumer. We make a
281288
// separate event for each leaf because it is possible we have non-consecutive leaves in our
282289
// storage, which would not be valid as a single decide with a single leaf chain.
283290
let mut leaves = BTreeMap::new();
284-
for entry in fs::read_dir(self.decided_leaf_path())? {
285-
let entry = entry?;
286-
let path = entry.path();
287-
288-
let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
289-
continue;
290-
};
291-
let Ok(v) = file.parse::<u64>() else {
292-
continue;
293-
};
294-
if v > view.u64() {
291+
for (v, path) in view_files(self.decided_leaf_path())? {
292+
if v > view {
295293
continue;
296294
}
297295

@@ -302,22 +300,20 @@ impl Inner {
302300
.context(format!("parsing decided leaf {}", path.display()))?;
303301

304302
// Include the VID share if available.
305-
let vid_share = self
306-
.load_vid_share(ViewNumber::new(v))?
307-
.map(|proposal| proposal.data);
303+
let vid_share = self.load_vid_share(v)?.map(|proposal| proposal.data);
308304
if vid_share.is_none() {
309-
tracing::debug!(view = v, "VID share not available at decide");
305+
tracing::debug!(?v, "VID share not available at decide");
310306
}
311307

312308
// Fill in the full block payload using the DA proposals we had persisted.
313-
if let Some(proposal) = self.load_da_proposal(ViewNumber::new(v))? {
309+
if let Some(proposal) = self.load_da_proposal(v)? {
314310
let payload = Payload::from_bytes(
315311
&proposal.data.encoded_transactions,
316312
&proposal.data.metadata,
317313
);
318314
leaf.fill_block_payload_unchecked(payload);
319315
} else {
320-
tracing::debug!(view = v, "DA proposal not available at decide");
316+
tracing::debug!(?v, "DA proposal not available at decide");
321317
}
322318

323319
let info = LeafInfo {
@@ -339,7 +335,7 @@ impl Inner {
339335
if let Some((oldest_view, _)) = leaves.first_key_value() {
340336
// The only exception is when the oldest leaf is the genesis leaf; then there was no
341337
// previous decide event.
342-
if *oldest_view > 0 {
338+
if *oldest_view > ViewNumber::genesis() {
343339
leaves.pop_first();
344340
}
345341
}
@@ -350,7 +346,7 @@ impl Inner {
350346
let height = leaf.leaf.block_header().block_number();
351347
consumer
352348
.handle_event(&Event {
353-
view_number: ViewNumber::new(view),
349+
view_number: view,
354350
event: EventType::Decide {
355351
qc: Arc::new(qc.to_qc2()),
356352
leaf_chain: Arc::new(vec![leaf]),
@@ -423,15 +419,12 @@ impl Inner {
423419
let mut anchor: Option<(Leaf2, QuorumCertificate2<SeqTypes>)> = None;
424420

425421
// Return the latest decided leaf.
426-
for entry in
427-
fs::read_dir(self.decided_leaf_path()).context("opening decided leaf directory")?
428-
{
429-
let file = entry.context("reading decided leaf directory")?.path();
422+
for (_, path) in view_files(self.decided_leaf_path())? {
430423
let bytes =
431-
fs::read(&file).context(format!("reading decided leaf {}", file.display()))?;
424+
fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
432425
let (leaf, qc) =
433426
bincode::deserialize::<(Leaf, QuorumCertificate<SeqTypes>)>(&bytes)
434-
.context(format!("parsing decided leaf {}", file.display()))?;
427+
.context(format!("parsing decided leaf {}", path.display()))?;
435428
if let Some((anchor_leaf, _)) = &anchor {
436429
if leaf.view_number() > anchor_leaf.view_number() {
437430
let leaf2 = leaf.into();
@@ -760,47 +753,10 @@ impl SequencerPersistence for Persistence {
760753
return Ok(Default::default());
761754
}
762755

763-
// Then, we want to get the entries in this directory since they'll be the
764-
// key/value pairs for our map.
765-
let files = fs::read_dir(dir_path.clone())?.filter_map(|entry| {
766-
let entry = entry.ok()?;
767-
if entry.file_type().ok()?.is_file() && entry.path().extension()? == "txt" {
768-
Some(entry.path())
769-
} else {
770-
None
771-
}
772-
});
773-
756+
// Read quorum proposals from every data file in this directory.
774757
let mut map = BTreeMap::new();
775-
for file in files {
776-
// Parse each file into a proposal if possible. We ignore files we don't recognize or
777-
// can't parse, as sometimes we can end up with random extra files (e.g. swap files) in
778-
// the directory.
779-
//
780-
// Get the stem to remove the ".txt" from the end.
781-
let Some(file_name) = file.file_stem() else {
782-
continue;
783-
};
784-
785-
// Parse the filename (which corresponds to the view)
786-
let file_name = file_name.to_string_lossy();
787-
let Ok(view_number) = file_name.parse::<u64>() else {
788-
tracing::info!(
789-
%file_name,
790-
"ignoring extraneous file in quorum proposals directory"
791-
);
792-
continue;
793-
};
794-
795-
// Now, we'll try and load the proposal associated with this function. In this case we
796-
// do propagate errors: errors from the file system are more likely to be some transient
797-
// issue (e.g. failure to connect to a network-mounted file system) than an issue with
798-
// the file itself (like a swap file being left over in the directory). Thus, this file
799-
// likely does have good data even if we can't read it, so we do want to propagate the
800-
// error and eventually retry.
801-
let proposal_bytes = fs::read(file)?;
802-
803-
// Then, deserialize.
758+
for (view, path) in view_files(&dir_path)? {
759+
let proposal_bytes = fs::read(path)?;
804760
let proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
805761
match bincode::deserialize(&proposal_bytes) {
806762
Ok(proposal) => proposal,
@@ -811,17 +767,14 @@ impl SequencerPersistence for Persistence {
811767
// many proposals as we can rather than letting one bad proposal cause the
812768
// entire operation to fail, and it is still possible that this was just
813769
// some unintended file whose name happened to match the naming convention.
814-
tracing::warn!(
815-
view_number,
816-
"ignoring malformed quorum proposal file: {err:#}"
817-
);
770+
tracing::warn!(?view, "ignoring malformed quorum proposal file: {err:#}");
818771
continue;
819772
}
820773
};
821774
let proposal2 = convert_proposal(proposal);
822775

823776
// Push to the map and we're done.
824-
map.insert(ViewNumber::new(view_number), proposal2);
777+
map.insert(view, proposal2);
825778
}
826779

827780
Ok(map)
@@ -987,6 +940,31 @@ fn migrate_network_config(
987940
Ok(network_config)
988941
}
989942

943+
/// Get all paths under `dir` whose name is of the form <view number>.txt.
944+
fn view_files(
945+
dir: impl AsRef<Path>,
946+
) -> anyhow::Result<impl Iterator<Item = (ViewNumber, PathBuf)>> {
947+
Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
948+
let dir = dir.as_ref().display();
949+
let entry = entry.ok()?;
950+
if !entry.file_type().ok()?.is_file() {
951+
tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
952+
return None;
953+
}
954+
let path = entry.path();
955+
if path.extension()? != "txt" {
956+
tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
957+
return None;
958+
}
959+
let file_name = path.file_stem()?;
960+
let Ok(view_number) = file_name.to_string_lossy().parse::<u64>() else {
961+
tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
962+
return None;
963+
};
964+
Some((ViewNumber::new(view_number), entry.path().to_owned()))
965+
}))
966+
}
967+
990968
#[cfg(test)]
991969
mod testing {
992970
use tempfile::TempDir;

0 commit comments

Comments
 (0)