Skip to content

Commit 1370e09

Browse files
committed
Add cache key migration
1 parent 1cc32f5 commit 1370e09

File tree

2 files changed

+181
-13
lines changed

2 files changed

+181
-13
lines changed

crates/rattler_cache/src/package_cache/cache_key.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ impl CacheKey {
4343
self.location_hash = Some(format!("{path_hash:x}"));
4444
self
4545
}
46+
47+
/// Return the old cache key format without the location hash
48+
pub fn key_without_location(&self) -> String {
49+
format!("{}-{}-{}", &self.name, &self.version, &self.build_string)
50+
}
4651
}
4752

4853
impl CacheKey {
@@ -84,7 +89,7 @@ impl Display for CacheKey {
8489
"{}-{}-{}-{}",
8590
&self.name, &self.version, &self.build_string, url_hash
8691
),
87-
None => write!(f, "{}-{}-{}", &self.name, &self.version, &self.build_string),
92+
None => write!(f, "{}", self.key_without_location()),
8893
}
8994
}
9095
}

crates/rattler_cache/src/package_cache/mod.rs

+175-12
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//! [`PackageCache`].
33
44
use std::{
5+
collections::VecDeque,
56
error::Error,
67
fmt::Debug,
78
future::Future,
@@ -89,6 +90,10 @@ pub enum PackageCacheError {
8990
#[error("{0}")]
9091
LockError(String, #[source] std::io::Error),
9192

93+
/// A cache key migration error.
94+
#[error("failed to migrate the cache key: {0}")]
95+
CacheKeyMigrationError(tokio::io::Error),
96+
9297
/// The operation was cancelled
9398
#[error("operation was cancelled")]
9499
Cancelled,
@@ -133,7 +138,6 @@ impl PackageCache {
133138
E: std::error::Error + Send + Sync + 'static,
134139
{
135140
let cache_key: CacheKey = pkg.into();
136-
let cache_path = self.inner.path.join(cache_key.to_string());
137141
let cache_entry = self
138142
.inner
139143
.packages
@@ -147,7 +151,8 @@ impl PackageCache {
147151

148152
// Validate the cache entry or fetch the package if it is not valid.
149153
let cache_lock = validate_or_fetch_to_cache(
150-
cache_path,
154+
&cache_key,
155+
&self.inner.path,
151156
fetch,
152157
cache_entry.last_revision,
153158
cache_key.sha256.as_ref(),
@@ -297,10 +302,82 @@ impl PackageCache {
297302
}
298303
}
299304

305+
/// Adds the .lock extension to a given path.
306+
fn get_path_with_lock_extension(path: &Path) -> PathBuf {
307+
// Append the `.lock` extension to the cache path to create the lock file path.
308+
// `Path::with_extension` strips too much from the filename if it contains one
309+
// or more dots.
310+
let mut path_str = path.as_os_str().to_owned();
311+
path_str.push(".lock");
312+
PathBuf::from(path_str)
313+
}
314+
315+
/// Moves a directory with all nested contents.
316+
async fn move_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> tokio::io::Result<()> {
317+
let src = src.as_ref().to_path_buf();
318+
let dst = dst.as_ref().to_path_buf();
319+
let mut dir_queue = VecDeque::new();
320+
dir_queue.push_back((src.clone(), dst.clone()));
321+
322+
while let Some((src_dir, dst_dir)) = dir_queue.pop_front() {
323+
tokio_fs::create_dir_all(&dst_dir).await?;
324+
let mut entries = tokio_fs::read_dir(&src_dir).await?;
325+
326+
while let Some(entry) = entries.next_entry().await? {
327+
let src_path = entry.path();
328+
let dst_path = dst_dir.join(entry.file_name());
329+
330+
if entry.file_type().await?.is_dir() {
331+
dir_queue.push_back((src_path.clone(), dst_path));
332+
} else {
333+
tokio_fs::rename(&src_path, &dst_path).await?;
334+
}
335+
}
336+
}
337+
338+
tokio_fs::remove_dir_all(src).await?;
339+
340+
Ok(())
341+
}
342+
343+
/// Migrates old cache keys that do not contain the location hash.
344+
async fn migrate_cache_entry_to_location_hash(
345+
cache_dir: &Path,
346+
cache_key: &CacheKey,
347+
) -> Result<(), PackageCacheError> {
348+
let old_cache_entry_path = cache_dir.join(cache_key.key_without_location());
349+
350+
// Check if the old cache entry exists or not, because it is not necessary to acquire a lock
351+
// if the entry does not exist.
352+
if old_cache_entry_path.exists() {
353+
let new_cache_entry_path = cache_dir.join(cache_key.to_string());
354+
let new_cache_lock_path = get_path_with_lock_extension(&new_cache_entry_path);
355+
let old_cache_lock_path = get_path_with_lock_extension(&old_cache_entry_path);
356+
let _old_lock = CacheRwLock::acquire_write(&old_cache_lock_path).await?;
357+
let _new_lock = CacheRwLock::acquire_write(&new_cache_lock_path).await?;
358+
359+
// We check if the entries exists again, because there could have been a race condition and
360+
// another process might have already renamed to old cache entries.
361+
if old_cache_entry_path.exists() {
362+
move_dir_all(old_cache_entry_path, new_cache_entry_path)
363+
.await
364+
.map_err(PackageCacheError::CacheKeyMigrationError)?;
365+
}
366+
if old_cache_lock_path.exists() {
367+
tokio_fs::rename(old_cache_lock_path, new_cache_lock_path)
368+
.await
369+
.map_err(PackageCacheError::CacheKeyMigrationError)?;
370+
}
371+
}
372+
373+
Ok(())
374+
}
375+
300376
/// Validates that the package that is currently stored is a valid package and
301377
/// otherwise calls the `fetch` method to populate the cache.
302378
async fn validate_or_fetch_to_cache<F, Fut, E>(
303-
path: PathBuf,
379+
cache_key: &CacheKey,
380+
cache_dir: &Path,
304381
fetch: F,
305382
known_valid_revision: Option<u64>,
306383
given_sha: Option<&Sha256Hash>,
@@ -311,16 +388,10 @@ where
311388
Fut: Future<Output = Result<(), E>> + 'static,
312389
E: Error + Send + Sync + 'static,
313390
{
391+
let path = cache_dir.join(cache_key.to_string());
314392
// Acquire a read lock on the cache entry. This ensures that no other process is
315393
// currently writing to the cache.
316-
let lock_file_path = {
317-
// Append the `.lock` extension to the cache path to create the lock file path.
318-
// `Path::with_extension` strips too much from the filename if it contains one
319-
// or more dots.
320-
let mut path_str = path.as_os_str().to_owned();
321-
path_str.push(".lock");
322-
PathBuf::from(path_str)
323-
};
394+
let lock_file_path = get_path_with_lock_extension(&path);
324395

325396
// Ensure the directory containing the lock-file exists.
326397
if let Some(root_dir) = lock_file_path.parent() {
@@ -334,6 +405,9 @@ where
334405
.await?;
335406
}
336407

408+
// Ensure that all cache keys are migrated to contain the location hash.
409+
migrate_cache_entry_to_location_hash(cache_dir, cache_key).await?;
410+
337411
// The revision of the cache entry that we already know is valid.
338412
let mut validated_revision = known_valid_revision;
339413

@@ -510,12 +584,13 @@ mod test {
510584
use reqwest_retry::RetryTransientMiddleware;
511585
use tempfile::tempdir;
512586
use tokio::sync::Mutex;
587+
use tokio::{fs, io::AsyncWriteExt};
513588
use tokio_stream::StreamExt;
514589
use url::Url;
515590

516591
use super::PackageCache;
517592
use crate::{
518-
package_cache::CacheKey,
593+
package_cache::{migrate_cache_entry_to_location_hash, CacheKey},
519594
validation::{validate_package_directory, ValidationMode},
520595
};
521596

@@ -871,4 +946,92 @@ mod test {
871946
"expected sha256 to be different"
872947
);
873948
}
949+
950+
#[tokio::test]
951+
async fn test_migrate_cache_entry_to_location_hash() {
952+
let package_url: Url ="https://conda.anaconda.org/robostack/linux-64/ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2".parse().unwrap();
953+
let tar_archive_path = tools::download_and_cache_file_async(
954+
package_url.clone(),
955+
"4dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc8",
956+
)
957+
.await
958+
.unwrap();
959+
960+
// Create a temporary directory to store the packages
961+
let packages_dir = tempdir().unwrap().into_path();
962+
963+
let new_cache_key = "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14-c5e00a13f8127a70ed3004ecb6f83eafa98ced9e5602bca19e8d52ae3c9fc881";
964+
let old_cache_key = "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14";
965+
let new_cache_key_lock = "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14-c5e00a13f8127a70ed3004ecb6f83eafa98ced9e5602bca19e8d52ae3c9fc881.lock";
966+
let old_cache_key_lock = "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.lock";
967+
968+
// Set the sha256 of the package
969+
let cache_key: CacheKey = ArchiveIdentifier::try_from_path(&tar_archive_path)
970+
.unwrap()
971+
.into();
972+
let cache_key = cache_key
973+
.with_sha256(
974+
parse_digest_from_hex::<Sha256>(
975+
"4dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc8",
976+
)
977+
.unwrap(),
978+
)
979+
.with_url(package_url);
980+
981+
assert_eq!(&cache_key.key_without_location(), old_cache_key);
982+
assert_eq!(&cache_key.to_string(), new_cache_key);
983+
984+
fs::create_dir(packages_dir.join(old_cache_key))
985+
.await
986+
.unwrap();
987+
fs::create_dir(packages_dir.join(old_cache_key).join("foobar"))
988+
.await
989+
.unwrap();
990+
fs::create_dir(packages_dir.join(old_cache_key).join("baz"))
991+
.await
992+
.unwrap();
993+
let mut file = fs::File::create(
994+
packages_dir
995+
.join(old_cache_key)
996+
.join("foobar")
997+
.join("some_file"),
998+
)
999+
.await
1000+
.unwrap();
1001+
file.write(b"Some random file content").await.unwrap();
1002+
let mut file = fs::File::create(packages_dir.join(old_cache_key_lock))
1003+
.await
1004+
.unwrap();
1005+
file.write(b"Fake lockfile content").await.unwrap();
1006+
1007+
migrate_cache_entry_to_location_hash(&packages_dir, &cache_key)
1008+
.await
1009+
.unwrap();
1010+
1011+
assert!(!packages_dir.join(old_cache_key).exists());
1012+
assert!(!packages_dir.join(old_cache_key_lock).exists());
1013+
assert!(packages_dir.join(new_cache_key).exists());
1014+
assert!(packages_dir.join(new_cache_key).is_dir());
1015+
assert!(packages_dir.join(new_cache_key).join("foobar").exists());
1016+
assert!(packages_dir.join(new_cache_key).join("baz").exists());
1017+
assert!(packages_dir.join(new_cache_key_lock).is_file());
1018+
assert!(packages_dir.join(new_cache_key_lock).exists());
1019+
assert_eq!(
1020+
fs::read_to_string(packages_dir.join(new_cache_key_lock))
1021+
.await
1022+
.unwrap(),
1023+
"Fake lockfile content"
1024+
);
1025+
assert_eq!(
1026+
fs::read_to_string(
1027+
packages_dir
1028+
.join(new_cache_key)
1029+
.join("foobar")
1030+
.join("some_file")
1031+
)
1032+
.await
1033+
.unwrap(),
1034+
"Some random file content"
1035+
);
1036+
}
8741037
}

0 commit comments

Comments
 (0)