
Commit

tests and fixes
tobz committed Feb 13, 2025
1 parent 7f38063 commit 2e2c3a1
Showing 4 changed files with 244 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -148,6 +148,7 @@ tikv-jemalloc-ctl = "0.6"
tikv-jemallocator = { version = "0.6", default-features = false }
axum-extra = { version = "0.9", default-features = false }
papaya = { version = "0.1.7", default-features = false }
+tempfile = { version = "3", default-features = false }

[profile.release]
lto = "thin"
1 change: 1 addition & 0 deletions lib/saluki-io/Cargo.toml
@@ -61,4 +61,5 @@ url = { workspace = true, features = ["std"] }
[dev-dependencies]
http-body-util = { workspace = true }
proptest = { workspace = true }
+tempfile = { workspace = true }
tokio-test = { workspace = true }
254 changes: 241 additions & 13 deletions lib/saluki-io/src/net/util/retry/queue/persisted.rs
@@ -4,9 +4,9 @@ use std::{
path::{Path, PathBuf},
};

-use chrono::{NaiveDateTime, Utc};
+use chrono::{DateTime, NaiveDateTime, Utc};
use rand::Rng;
-use saluki_error::{ErrorContext as _, GenericError};
+use saluki_error::{generic_error, ErrorContext as _, GenericError};
use serde::{de::DeserializeOwned, Serialize};
use tracing::warn;

@@ -15,7 +15,7 @@ use tracing::warn;
/// Represents the high-level metadata of a persisted entry, including the path to and size of the entry.
struct PersistedEntry {
path: PathBuf,
-timestamp: u64,
+timestamp: u128,
size_bytes: u64,
}

@@ -32,7 +32,7 @@ impl PersistedEntry {
})
}

-fn from_parts(path: PathBuf, timestamp: u64, size_bytes: u64) -> Self {
+fn from_parts(path: PathBuf, timestamp: u128, size_bytes: u64) -> Self {
Self {
path,
timestamp,
@@ -63,9 +63,9 @@ where
/// shrink the directory to fit the given maximum size, an error is returned.
pub async fn from_root_path(root_path: PathBuf, max_size_bytes: u64) -> Result<Self, GenericError> {
// Make sure the directory exists first.
-tokio::fs::create_dir_all(&root_path)
+create_directory_recursive(root_path.clone())
.await
-.with_error_context(|| format!("Failed to create retry directory '{}'.", root_path.display()))?;
+.error_context("Failed to create retry directory.")?;

let mut persisted_requests = Self {
root_path,
@@ -91,6 +91,10 @@ where
let serialized = serde_json::to_vec(&entry)
.with_error_context(|| format!("Failed to serialize entry for '{}'.", entry_path.display()))?;

+if serialized.len() as u64 > self.max_size_bytes {
+    return Err(generic_error!("Entry is too large to persist."));
+}

// Make sure we have enough space to persist the entry.
self.remove_until_available_space(serialized.len() as u64)
.await
@@ -145,6 +149,11 @@ where
let deserialized = serde_json::from_slice(&serialized)
.with_error_context(|| format!("Failed to deserialize persisted entry '{}'.", entry.path.display()))?;

+// Delete the entry from disk before returning, so that we don't risk sending duplicates.
+tokio::fs::remove_file(&entry.path)
+    .await
+    .with_error_context(|| format!("Failed to remove persisted entry '{}'.", entry.path.display()))?;

return Ok(Some(deserialized));
}
}
@@ -195,7 +204,7 @@ where
/// If there is an error while deleting persisted entries, an error is returned.
async fn remove_until_available_space(&mut self, required_bytes: u64) -> io::Result<()> {
let mut total_size_bytes = self.entries.iter().map(|entry| entry.size_bytes).sum::<u64>();
-while total_size_bytes + required_bytes > self.max_size_bytes {
+while !self.entries.is_empty() && total_size_bytes + required_bytes > self.max_size_bytes {
let entry = self.entries.remove(0);
tokio::fs::remove_file(&entry.path).await?;

@@ -208,17 +217,17 @@
}
}

-fn generate_timestamped_filename() -> (PathBuf, u64) {
+fn generate_timestamped_filename() -> (PathBuf, u128) {
let now = Utc::now();
-let now_ts = now.timestamp() as u64;
+let now_ts = datetime_to_timestamp(now);
let nonce = rand::thread_rng().gen_range(100000000..999999999);

-let filename = format!("retry-{}-{}.json", now.format("%Y%m%d%H%M%S"), nonce).into();
+let filename = format!("retry-{}-{}.json", now.format("%Y%m%d%H%M%S%f"), nonce).into();

(filename, now_ts)
}

-fn decode_timestamped_filename(path: &Path) -> Option<u64> {
+fn decode_timestamped_filename(path: &Path) -> Option<u128> {
let filename = path.file_stem()?.to_str()?;
let mut filename_parts = filename.split('-');

@@ -232,7 +241,226 @@ fn decode_timestamped_filename(path: &Path) -> Option<u64> {
}

// Try and decode the timestamp portion.
-NaiveDateTime::parse_from_str(timestamp_str, "%Y%m%d%H%M%S")
-    .map(|dt| dt.and_utc().timestamp() as u64)
+NaiveDateTime::parse_from_str(timestamp_str, "%Y%m%d%H%M%S%f")
+    .map(|dt| datetime_to_timestamp(dt.and_utc()))
.ok()
}

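/// Converts the given UTC datetime into a Unix timestamp with nanosecond precision.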
fn datetime_to_timestamp(dt: DateTime<Utc>) -> u128 {
let secs = (dt.timestamp() as u128) * 1_000_000_000;
let ns = dt.timestamp_subsec_nanos() as u128;

secs + ns
}

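/// Recursively creates the given directory, restricting permissions to owner-only (0700) on Unix platforms.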
async fn create_directory_recursive(path: PathBuf) -> Result<(), GenericError> {
let mut dir_builder = std::fs::DirBuilder::new();
dir_builder.recursive(true);

// When on Unix platforms, adjust the permissions of the directory to be RWX for the owner only, and nothing for
// group/world.
#[cfg(unix)]
{
use std::os::unix::fs::DirBuilderExt;
dir_builder.mode(0o700);
}

tokio::task::spawn_blocking(move || {
dir_builder
.create(&path)
.with_error_context(|| format!("Failed to create directory '{}'.", path.display()))
})
.await
.error_context("Failed to spawn directory creation blocking task.")?
}

#[cfg(test)]
mod tests {
use rand::distributions::Alphanumeric;
use serde::Deserialize;

use super::*;

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct FakeData {
name: String,
value: u32,
}

impl FakeData {
fn random() -> Self {
Self {
name: rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect(),
value: rand::thread_rng().gen_range(0..100),
}
}
}

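// Counts the regular files directly under `path`.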
async fn files_in_dir(path: &Path) -> usize {
let mut file_count = 0;
let mut dir_reader = tokio::fs::read_dir(path).await.unwrap();
while let Some(entry) = dir_reader.next_entry().await.unwrap() {
if entry.metadata().await.unwrap().is_file() {
file_count += 1;
}
}
file_count
}

#[tokio::test]
async fn basic_push_pop() {
let data = FakeData::random();

// Create our temporary directory and point our persisted queue at it.
let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
let root_path = temp_dir.path().to_path_buf();

let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(root_path.clone(), 1024)
.await
.expect("should not fail to create persisted queue");

// Ensure the directory is empty.
assert_eq!(0, files_in_dir(&root_path).await);

// Push our data to the queue and ensure it persisted it to disk.
persisted_queue
.push(data.clone())
.await
.expect("should not fail to push data");
assert_eq!(1, files_in_dir(&root_path).await);

// Now pop the data back out and ensure it matches what we pushed, and that the file has been removed from disk.
let actual = persisted_queue
.pop()
.await
.expect("should not fail to pop data")
.expect("should not be empty");
assert_eq!(data, actual);
assert_eq!(0, files_in_dir(&root_path).await);
}

#[tokio::test]
async fn entry_too_large() {
let data = FakeData::random();

// Create our temporary directory and point our persisted queue at it.
let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
let root_path = temp_dir.path().to_path_buf();

let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(root_path.clone(), 1)
.await
.expect("should not fail to create persisted queue");

// Ensure the directory is empty.
assert_eq!(0, files_in_dir(&root_path).await);

// Attempt to push our data into the queue, which should fail because it's too large.
assert!(persisted_queue.push(data).await.is_err());

// Ensure the directory is (still) empty.
assert_eq!(0, files_in_dir(&root_path).await);
}

#[tokio::test]
async fn remove_oldest_entry_on_push() {
let data1 = FakeData::random();
let data2 = FakeData::random();

// Create our temporary directory and point our persisted queue at it.
//
// Our queue is sized such that only one entry can be persisted at a time.
let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
let root_path = temp_dir.path().to_path_buf();

let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(root_path.clone(), 32)
.await
.expect("should not fail to create persisted queue");

// Ensure the directory is empty.
assert_eq!(0, files_in_dir(&root_path).await);

// Push our data to the queue and ensure it persisted it to disk.
persisted_queue.push(data1).await.expect("should not fail to push data");
assert_eq!(1, files_in_dir(&root_path).await);

// Push a second data entry, which should cause the first entry to be removed.
persisted_queue
.push(data2.clone())
.await
.expect("should not fail to push data");
assert_eq!(1, files_in_dir(&root_path).await);

// Now pop the data back out and ensure it matches the second item we pushed -- indicating the first item was
// removed -- and that we've consumed it, leaving no files on disk.
let actual = persisted_queue
.pop()
.await
.expect("should not fail to pop data")
.expect("should not be empty");
assert_eq!(data2, actual);
assert_eq!(0, files_in_dir(&root_path).await);
}

#[tokio::test]
async fn trim_excess_entries_on_create() {
let data1 = FakeData::random();
let data2 = FakeData::random();
let data3 = FakeData::random();

// Create our temporary directory and point our persisted queue at it.
let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
let root_path = temp_dir.path().to_path_buf();

let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(root_path.clone(), 1024)
.await
.expect("should not fail to create persisted queue");

// Ensure the directory is empty.
assert_eq!(0, files_in_dir(&root_path).await);

// Push all our data to the queue.
persisted_queue.push(data1).await.expect("should not fail to push data");
persisted_queue
.push(data2.clone())
.await
.expect("should not fail to push data");
persisted_queue
.push(data3.clone())
.await
.expect("should not fail to push data");
assert_eq!(3, files_in_dir(&root_path).await);

// Now recreate the persisted queue with a smaller size, which should cause the oldest entry to be removed, as
// we only have room for two entries based on our reduced maximum size.
drop(persisted_queue);

let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(root_path.clone(), 64)
.await
.expect("should not fail to create persisted queue");

// Ensure we only have two entries left, and that the oldest entry was removed.
assert_eq!(2, files_in_dir(&root_path).await);

let actual = persisted_queue
.pop()
.await
.expect("should not fail to pop data")
.expect("should not be empty");
assert_eq!(data2, actual);

assert_eq!(1, files_in_dir(&root_path).await);

let actual = persisted_queue
.pop()
.await
.expect("should not fail to pop data")
.expect("should not be empty");
assert_eq!(data3, actual);

assert_eq!(0, files_in_dir(&root_path).await);
}
}
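
For context, a minimal sketch of driving the persisted queue added in this commit end to end, mirroring the tests above. The `FakeData` payload reuses the test fixture; the `replay_retries` helper name, the root path, and the 1 MiB size cap are illustrative assumptions, not values from the commit.

    use std::path::PathBuf;

    use saluki_error::GenericError;

    async fn replay_retries() -> Result<(), GenericError> {
        // Entries are persisted as timestamped JSON files under this root; the
        // directory is created (owner-only on Unix) if it does not yet exist.
        let root = PathBuf::from("/tmp/retry-queue");
        let mut queue = PersistedQueue::<FakeData>::from_root_path(root, 1024 * 1024).await?;

        // `push` serializes the entry to disk, evicting the oldest entries first if
        // the configured maximum size would otherwise be exceeded; entries larger
        // than the maximum size are rejected outright.
        queue.push(FakeData::random()).await?;

        // `pop` yields the oldest entry, deleting its backing file before returning
        // it so that a crash cannot replay the same entry twice.
        while let Some(entry) = queue.pop().await? {
            println!("recovered entry: {:?}", entry);
        }

        Ok(())
    }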
