Skip to content

Commit

Permalink
Don’t always append larger updates in archive. Add archive-stats comm…
Browse files Browse the repository at this point in the history
…and.
  • Loading branch information
partim committed Dec 6, 2024
1 parent ea03b6a commit 2574af9
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
// `Collector`, `Run`, and `Repository` types.
//
pub use self::base::{Collector, Cleanup, Run, Repository};
pub use self::rrdp::{HttpStatus, SnapshotReason};
pub use self::rrdp::{HttpStatus, RrdpArchive, SnapshotReason};

mod base;
mod rrdp;
Expand Down
7 changes: 3 additions & 4 deletions src/collector/rrdp/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::config::Config;
use crate::error::RunFailed;
use crate::utils::archive;
use crate::utils::archive::{
Archive, ArchiveError, FetchError, OpenError, PublishError
Archive, ArchiveError, ArchiveStats, FetchError, OpenError, PublishError
};
use crate::utils::binio::{Compose, Parse};

Expand Down Expand Up @@ -81,10 +81,9 @@ impl RrdpArchive {
}

impl RrdpArchive {
pub fn verify(path: &Path) -> Result<(), OpenError> {
pub fn verify(path: &Path) -> Result<ArchiveStats, OpenError> {
let archive = archive::Archive::<RrdpObjectMeta>::open(path, false)?;
archive.verify()?;
Ok(())
Ok(archive.verify()?)
}

/// Loads an object from the archive.
Expand Down
2 changes: 1 addition & 1 deletion src/collector/rrdp/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl Collector {
continue;
}
match RrdpArchive::verify(entry.path()) {
Ok(()) | Err(OpenError::NotFound) => { }
Ok(_) | Err(OpenError::NotFound) => { }
Err(OpenError::Archive(ArchiveError::Io(err))) => {
error!(
"Fatal: Failed to read RRDP repository archive\
Expand Down
1 change: 1 addition & 0 deletions src/collector/rrdp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(dead_code)]

pub use self::archive::RrdpArchive;
pub use self::base::{Collector, LoadResult, ReadRepository, Run};
pub use self::http::HttpStatus;
pub use self::update::SnapshotReason;
Expand Down
70 changes: 70 additions & 0 deletions src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub enum Operation {
Update(Update),
PrintConfig(PrintConfig),
Dump(Dump),
ArchiveStats(ArchiveStats),
Man(Man),
}

Expand All @@ -93,6 +94,7 @@ impl Operation {
let app = Update::config_args(app);
let app = PrintConfig::config_args(app);
let app = Dump::config_args(app);
let app = ArchiveStats::config_args(app);
Man::config_args(app)
}

Expand Down Expand Up @@ -131,6 +133,11 @@ impl Operation {
Some(("dump", matches)) => {
Operation::Dump( Dump::from_arg_matches(matches, cur_dir)?)
}
Some(("archive-stats", matches)) => {
Operation::ArchiveStats(
ArchiveStats::from_arg_matches(matches)?
)
}
Some(("man", matches)) => {
Operation::Man(Man::from_arg_matches(matches)?)
}
Expand Down Expand Up @@ -167,6 +174,7 @@ impl Operation {
Operation::Update(cmd) => cmd.run(process),
Operation::PrintConfig(cmd) => cmd.run(process),
Operation::Dump(cmd) => cmd.run(process),
Operation::ArchiveStats(cmd) => cmd.run(process),
Operation::Man(cmd) => cmd.run(process),
}
}
Expand Down Expand Up @@ -252,6 +260,7 @@ impl Server {
let join = thread::spawn(move || {
let mut can_retry = true;
let err = loop {
eprintln!("Starting run");
if let Some(log) = log.as_ref() {
log.start();
}
Expand Down Expand Up @@ -310,6 +319,16 @@ impl Server {
// to recalculate timeout.
let deadline = Instant::now() + timeout;

info!(
"Next validation run scheduled in {} seconds",
timeout.as_secs()
);

eprintln!(
"Next validation run scheduled in {} seconds",
timeout.as_secs()
);

let end = loop {
let timeout = deadline.saturating_duration_since(
Instant::now()
Expand Down Expand Up @@ -1184,6 +1203,57 @@ impl Dump {
}


//------------ ArchiveStats --------------------------------------------------

/// Prints archive statistics.
#[derive(Clone, Debug, Parser)]
pub struct ArchiveStats {
/// Archive file.
#[arg(value_name = "PATH")]
archive: PathBuf,
}

impl ArchiveStats {
/// Adds the command configuration to a clap app.
pub fn config_args<'a: 'b, 'b>(app: clap::Command) -> clap::Command {
// config
app.subcommand(
ArchiveStats::augment_args(
clap::Command::new("archive-stats")
.about("Prints statics for an RRDP archive")
.after_help(AFTER_HELP)
)
)
}

/// Creates a command from clap matches.
pub fn from_arg_matches(matches: &ArgMatches) -> Result<Self, Failed> {
Ok(
<ArchiveStats as FromArgMatches>::from_arg_matches(
matches
).unwrap()
)
}

fn run(self, process: Process) -> Result<(), ExitError> {
use crate::collector::RrdpArchive;

process.switch_logging(false, false)?;
match RrdpArchive::verify(&self.archive) {
Ok(stats) => {
println!("RRDP archive {}:", self.archive.display());
stats.print();
Ok(())
}
Err(err) => {
eprintln!("Archive is corrupt: {err}");
Err(ExitError::Generic)
}
}
}
}


//------------ Man -----------------------------------------------------------

/// Show the manual page.
Expand Down
84 changes: 79 additions & 5 deletions src/utils/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
//! If possible (currently on Unix systems only), the file is memory mapped
//! for faster access.
use std::{fmt, fs, io, mem};
use std::{cmp, fmt, fs, io, mem};
use std::borrow::Cow;
use std::hash::Hasher;
use std::marker::PhantomData;
Expand Down Expand Up @@ -136,12 +136,14 @@ impl<Meta> Archive<Meta> {
/// The method traverses the entire archive and makes sure that the
/// entiry file is covered by objects and that these objects aren’t
/// overlapping.
pub fn verify(&self) -> Result<(), ArchiveError> {
pub fn verify(&self) -> Result<ArchiveStats, ArchiveError> {
// We’re going to collect a list of all encountered objects in here.
// Items are pair of the start position and the length.
// At the end we check that they form a consecutive sequence.
let mut objects = Vec::new();

let mut stats = ArchiveStats::default();

// Step 1. Go over each index bucket and collect all the objects.
// Check that the name hashes correctly.
for idx in 0.. usize_to_u64(self.meta.bucket_count) {
Expand All @@ -154,6 +156,8 @@ impl<Meta> Archive<Meta> {
return Err(ArchiveError::Corrupt)
}
objects.push((u64::from(pos), header.size));
stats.object_count += 1;
stats.object_size += header.size;
start = header.next;
}
}
Expand All @@ -163,6 +167,15 @@ impl<Meta> Archive<Meta> {
while let Some(pos) = start {
let header = ObjectHeader::read(&self.file, pos.into())?;
objects.push((u64::from(pos), header.size));
stats.empty_count += 1;
stats.empty_size += header.size;
if stats.empty_min == 0 {
stats.empty_min = header.size
}
else {
stats.empty_min = cmp::min(stats.empty_min, header.size);
}
stats.empty_max = cmp::max(stats.empty_max, header.size);
start = header.next;
}

Expand All @@ -175,7 +188,7 @@ impl<Meta> Archive<Meta> {
}
}

Ok(())
Ok(stats)
}

/// Returns an iterator over all the objects in the archive.
Expand Down Expand Up @@ -271,11 +284,21 @@ impl<Meta: ObjectMeta> Archive<Meta> {
if self.find(hash, name)?.is_some() {
return Err(PublishError::AlreadyExists)
}
self.publish_not_found(hash, name, meta, data)?;
Ok(())
}

fn publish_not_found(
&mut self,
hash: u64, name: &[u8], meta: &Meta, data: &[u8]
) -> Result<(), ArchiveError> {
match self.find_empty(name, data)? {
Some((empty, pos)) => {
self.publish_replace(hash, name, meta, data, empty, pos)?
}
None => self.publish_append(hash, name, meta, data)?,
None => {
self.publish_append(hash, name, meta, data)?
}
}
Ok(())
}
Expand Down Expand Up @@ -368,7 +391,7 @@ impl<Meta: ObjectMeta> Archive<Meta> {
}
else {
self.delete_found(hash, found)?;
self.publish_append(hash, name, meta, data)?;
self.publish_not_found(hash, name, meta, data)?;
}
Ok(())
}
Expand Down Expand Up @@ -938,6 +961,57 @@ impl FoundObject {
}


//------------ ArchiveStats --------------------------------------------------

/// Statistics for an archive.
///
/// A value of this type is returned by [`Archive::verify`].
#[derive(Clone, Copy, Debug, Default)]
pub struct ArchiveStats {
/// The number of objects in the archive.
pub object_count: u64,

/// The overall size of objects in bytes.
pub object_size: u64,

/// The number of blocks of empty space.
pub empty_count: u64,

/// The overall size of empty space.
pub empty_size: u64,

/// The smallest empty block.
pub empty_min: u64,

/// The largest empty block.
pub empty_max: u64,
}

impl ArchiveStats {
/// Prints the stats to stdout.
///
/// Uses a two space indent.
pub fn print(self) {
println!(" object count: {}", self.object_count);
if self.object_count > 0 {
println!(" object size, sum: {}", self.object_size);
println!(" object size, avg: {}",
self.object_size / self.object_count
);
}
println!(" empty block count: {}", self.empty_count);
if self.empty_count > 0 {
println!(" empty size, sum: {}", self.empty_size);
println!(" empty size, min: {}", self.empty_min);
println!(" empty size, max: {}", self.empty_max);
println!(
" empty size, avg: {}", self.empty_size / self.empty_count
);
}
}
}


//------------ Magic Cookie --------------------------------------------------
//
// The marker we use for a quick file type check.
Expand Down

0 comments on commit 2574af9

Please sign in to comment.