Skip to content

Commit

Permalink
Guard against concurrent cache writes on Windows (#11007)
Browse files Browse the repository at this point in the history
## Summary

On Windows, we have a lot of issues with atomic replacement and such.
There are a bunch of different failure modes, but they generally
involve: trying to persist a fail to a path at which the file already
exists, trying to replace or remove a file while someone else is reading
it, etc.

This PR adds locks to all of the relevant database paths. We already use
these advisory locks when building source distributions; now we use them
when unzipping wheels, storing metadata, etc.

Closes #11002.

## Test Plan

I ran the following script:

```shell
# Define the cache directory path
$cacheDir = "C:\Users\crmar\workspace\uv\cache"

# Clear the cache directory if it exists
if (Test-Path $cacheDir) {
    Remove-Item -Recurse -Force $cacheDir
}

# Create the cache directory again
New-Item -ItemType Directory -Force -Path $cacheDir

# Define the command to run with --cache-dir flag
$command = {
    param ($venvPath)

    # Create a virtual environment in the specified path with --python
    uv venv $venvPath

    # Run the pip install command with --cache-dir flag
    C:\Users\crmar\workspace\uv\target\profiling\uv.exe pip install flask==1.0.4 --no-binary flask --cache-dir C:\Users\crmar\workspace\uv\cache -v --python $venvPath
}

# Define the paths for the different virtual environments
$venv1 = "C:\Users\crmar\workspace\uv\venv1"
$venv2 = "C:\Users\crmar\workspace\uv\venv2"
$venv3 = "C:\Users\crmar\workspace\uv\venv3"
$venv4 = "C:\Users\crmar\workspace\uv\venv4"
$venv5 = "C:\Users\crmar\workspace\uv\venv5"

# Start the command in parallel five times using Start-Job, each with a different venv
$job1 = Start-Job -ScriptBlock $command -ArgumentList $venv1
$job2 = Start-Job -ScriptBlock $command -ArgumentList $venv2
$job3 = Start-Job -ScriptBlock $command -ArgumentList $venv3
$job4 = Start-Job -ScriptBlock $command -ArgumentList $venv4
$job5 = Start-Job -ScriptBlock $command -ArgumentList $venv5

# Wait for all jobs to complete
$jobs = @($job1, $job2, $job3, $job4, $job5)
$jobs | ForEach-Object { Wait-Job $_ }

# Retrieve the results (optional)
$jobs | ForEach-Object { Receive-Job -Job $_ }

# Clean up the jobs
$jobs | ForEach-Object { Remove-Job -Job $_ }
```

And ensured it succeeded in five straight invocations (whereas on
`main`, it consistently fails with a variety of different traces).
  • Loading branch information
charliermarsh authored Jan 28, 2025
1 parent 321f8cc commit f1840c7
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 71 deletions.
14 changes: 13 additions & 1 deletion crates/uv-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tracing::debug;
pub use archive::ArchiveId;
use uv_cache_info::Timestamp;
use uv_distribution_types::InstalledDist;
use uv_fs::{cachedir, directories};
use uv_fs::{cachedir, directories, LockedFile};
use uv_normalize::PackageName;
use uv_pypi_types::ResolutionMetadata;

Expand Down Expand Up @@ -74,6 +74,12 @@ impl CacheEntry {
pub fn with_file(&self, file: impl AsRef<Path>) -> Self {
Self(self.dir().join(file))
}

/// Acquire the [`CacheEntry`] as an exclusive lock.
pub async fn lock(&self) -> Result<LockedFile, io::Error> {
fs_err::create_dir_all(self.dir())?;
LockedFile::acquire(self.path(), self.path().display()).await
}
}

impl AsRef<Path> for CacheEntry {
Expand All @@ -97,6 +103,12 @@ impl CacheShard {
pub fn shard(&self, dir: impl AsRef<Path>) -> Self {
Self(self.0.join(dir.as_ref()))
}

/// Acquire the cache entry as an exclusive lock.
pub async fn lock(&self) -> Result<LockedFile, io::Error> {
fs_err::create_dir_all(self.as_ref())?;
LockedFile::acquire(self.join(".lock"), self.display()).await
}
}

impl AsRef<Path> for CacheShard {
Expand Down
21 changes: 21 additions & 0 deletions crates/uv-client/src/registry_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,13 @@ impl RegistryClient {
Connectivity::Offline => CacheControl::AllowStale,
};

// Acquire an advisory lock, to guard against concurrent writes.
#[cfg(windows)]
let _lock = {
let lock_entry = cache_entry.with_file(format!("{package_name}.lock"));
lock_entry.lock().await.map_err(ErrorKind::CacheWrite)?
};

let result = if matches!(index, IndexUrl::Path(_)) {
self.fetch_local_index(package_name, &url).await
} else {
Expand Down Expand Up @@ -614,6 +621,13 @@ impl RegistryClient {
Connectivity::Offline => CacheControl::AllowStale,
};

// Acquire an advisory lock, to guard against concurrent writes.
#[cfg(windows)]
let _lock = {
let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(ErrorKind::CacheWrite)?
};

let response_callback = |response: Response| async {
let bytes = response
.bytes()
Expand Down Expand Up @@ -677,6 +691,13 @@ impl RegistryClient {
Connectivity::Offline => CacheControl::AllowStale,
};

// Acquire an advisory lock, to guard against concurrent writes.
#[cfg(windows)]
let _lock = {
let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(ErrorKind::CacheWrite)?
};

// Attempt to fetch via a range request.
if index.map_or(true, |index| capabilities.supports_range_requests(index)) {
let req = self
Expand Down
40 changes: 37 additions & 3 deletions crates/uv-distribution/src/distribution_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,19 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
.boxed_local()
.await?;

// Acquire the advisory lock.
#[cfg(windows)]
let _lock = {
let lock_entry = CacheEntry::new(
built_wheel.target.parent().unwrap(),
format!(
"{}.lock",
built_wheel.target.file_name().unwrap().to_str().unwrap()
),
);
lock_entry.lock().await.map_err(Error::CacheWrite)?
};

// If the wheel was unzipped previously, respect it. Source distributions are
// cached under a unique revision ID, so unzipped directories are never stale.
match built_wheel.target.canonicalize() {
Expand Down Expand Up @@ -515,6 +528,13 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
dist: &BuiltDist,
hashes: HashPolicy<'_>,
) -> Result<Archive, Error> {
// Acquire an advisory lock, to guard against concurrent writes.
#[cfg(windows)]
let _lock = {
let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(Error::CacheWrite)?
};

// Create an entry for the HTTP cache.
let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));

Expand Down Expand Up @@ -640,6 +660,13 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
dist: &BuiltDist,
hashes: HashPolicy<'_>,
) -> Result<Archive, Error> {
// Acquire an advisory lock, to guard against concurrent writes.
#[cfg(windows)]
let _lock = {
let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(Error::CacheWrite)?
};

// Create an entry for the HTTP cache.
let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));

Expand Down Expand Up @@ -796,6 +823,12 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
dist: &BuiltDist,
hashes: HashPolicy<'_>,
) -> Result<LocalWheel, Error> {
#[cfg(windows)]
let _lock = {
let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(Error::CacheWrite)?
};

// Determine the last-modified time of the wheel.
let modified = Timestamp::from_path(path).map_err(Error::CacheRead)?;

Expand Down Expand Up @@ -890,10 +923,11 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
let temp_dir = tokio::task::spawn_blocking({
let path = path.to_owned();
let root = self.build_context.cache().root().to_path_buf();
move || -> Result<TempDir, uv_extract::Error> {
move || -> Result<TempDir, Error> {
// Unzip the wheel into a temporary directory.
let temp_dir = tempfile::tempdir_in(root)?;
uv_extract::unzip(fs_err::File::open(path)?, temp_dir.path())?;
let temp_dir = tempfile::tempdir_in(root).map_err(Error::CacheWrite)?;
let reader = fs_err::File::open(path).map_err(Error::CacheWrite)?;
uv_extract::unzip(reader, temp_dir.path())?;
Ok(temp_dir)
}
})
Expand Down
35 changes: 13 additions & 22 deletions crates/uv-distribution/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use uv_distribution_types::{
PathSourceUrl, SourceDist, SourceUrl,
};
use uv_extract::hash::Hasher;
use uv_fs::{rename_with_retry, write_atomic, LockedFile};
use uv_fs::{rename_with_retry, write_atomic};
use uv_git::{GitHubRepository, GitOid};
use uv_metadata::read_archive_metadata;
use uv_normalize::PackageName;
Expand Down Expand Up @@ -392,7 +392,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
hashes: HashPolicy<'_>,
client: &ManagedClient<'_>,
) -> Result<BuiltWheelMetadata, Error> {
let _lock = lock_shard(cache_shard).await?;
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// Fetch the revision for the source distribution.
let revision = self
Expand Down Expand Up @@ -505,7 +505,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
hashes: HashPolicy<'_>,
client: &ManagedClient<'_>,
) -> Result<ArchiveMetadata, Error> {
let _lock = lock_shard(cache_shard).await?;
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// Fetch the revision for the source distribution.
let revision = self
Expand Down Expand Up @@ -753,7 +753,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
tags: &Tags,
hashes: HashPolicy<'_>,
) -> Result<BuiltWheelMetadata, Error> {
let _lock = lock_shard(cache_shard).await?;
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// Fetch the revision for the source distribution.
let LocalRevisionPointer {
Expand Down Expand Up @@ -847,7 +847,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
cache_shard: &CacheShard,
hashes: HashPolicy<'_>,
) -> Result<ArchiveMetadata, Error> {
let _lock = lock_shard(cache_shard).await?;
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// Fetch the revision for the source distribution.
let LocalRevisionPointer { revision, .. } = self
Expand Down Expand Up @@ -1058,7 +1058,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
},
);

let _lock = lock_shard(&cache_shard).await?;
// Acquire the advisory lock.
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// Fetch the revision for the source distribution.
let LocalRevisionPointer {
Expand Down Expand Up @@ -1168,7 +1169,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
},
);

let _lock = lock_shard(&cache_shard).await?;
// Acquire the advisory lock.
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// Fetch the revision for the source distribution.
let LocalRevisionPointer { revision, .. } = self
Expand Down Expand Up @@ -1430,7 +1432,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
);
let metadata_entry = cache_shard.entry(METADATA);

let _lock = lock_shard(&cache_shard).await?;
// Acquire the advisory lock.
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// If there are build settings, we need to scope to a cache shard.
let config_settings = self.build_context.config_settings();
Expand Down Expand Up @@ -1581,7 +1584,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
);
let metadata_entry = cache_shard.entry(METADATA);

let _lock = lock_shard(&cache_shard).await?;
// Acquire the advisory lock.
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

let path = if let Some(subdirectory) = resource.subdirectory {
Cow::Owned(fetch.path().join(subdirectory))
Expand Down Expand Up @@ -2882,16 +2886,3 @@ fn read_wheel_metadata(
.map_err(|err| Error::WheelMetadata(wheel.to_path_buf(), Box::new(err)))?;
Ok(ResolutionMetadata::parse_metadata(&dist_info)?)
}

/// Apply an advisory lock to a [`CacheShard`] to prevent concurrent builds.
async fn lock_shard(cache_shard: &CacheShard) -> Result<LockedFile, Error> {
let root = cache_shard.as_ref();

fs_err::create_dir_all(root).map_err(Error::CacheWrite)?;

let lock = LockedFile::acquire(root.join(".lock"), root.display())
.await
.map_err(Error::CacheWrite)?;

Ok(lock)
}
11 changes: 9 additions & 2 deletions crates/uv-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ pub async fn read_to_string_transcode(path: impl AsRef<Path>) -> std::io::Result

/// Create a symlink at `dst` pointing to `src`, replacing any existing symlink.
///
/// On Windows, this uses the `junction` crate to create a junction point.
/// Note because junctions are used, the source must be a directory.
/// On Windows, this uses the `junction` crate to create a junction point. The
/// operation is _not_ atomic, as we first delete the junction, then create a
/// junction at the same path.
///
/// Note that because junctions are used, the source must be a directory.
#[cfg(windows)]
pub fn replace_symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
// If the source is a file, we can't create a junction
Expand Down Expand Up @@ -79,6 +82,10 @@ pub fn replace_symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io:
}

/// Create a symlink at `dst` pointing to `src`, replacing any existing symlink if necessary.
///
/// On Unix, this method creates a temporary file, then moves it into place.
///
/// TODO(charlie): Consider using the `rust-atomicwrites` crate.
#[cfg(unix)]
pub fn replace_symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
// Attempt to create the symlink directly.
Expand Down
12 changes: 8 additions & 4 deletions crates/uv/tests/it/cache_clean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ fn clean_package_pypi() -> Result<()> {
.filters()
.into_iter()
.chain([
// The cache entry does not have a stable key, so we filter it out
// The cache entry does not have a stable key, so we filter it out.
(
r"\[CACHE_DIR\](\\|\/)(.+)(\\|\/).*",
"[CACHE_DIR]/$2/[ENTRY]",
),
// The file count varies by operating system, so we filter it out.
("Removed \\d+ files?", "Removed [N] files"),
])
.collect();

Expand All @@ -79,7 +81,7 @@ fn clean_package_pypi() -> Result<()> {
----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY]
Removed 12 files ([SIZE])
Removed [N] files ([SIZE])
"###);

// Assert that the `.rkyv` file is removed for `iniconfig`.
Expand Down Expand Up @@ -136,11 +138,13 @@ fn clean_package_index() -> Result<()> {
.filters()
.into_iter()
.chain([
// The cache entry does not have a stable key, so we filter it out
// The cache entry does not have a stable key, so we filter it out.
(
r"\[CACHE_DIR\](\\|\/)(.+)(\\|\/).*",
"[CACHE_DIR]/$2/[ENTRY]",
),
// The file count varies by operating system, so we filter it out.
("Removed \\d+ files?", "Removed [N] files"),
])
.collect();

Expand All @@ -152,7 +156,7 @@ fn clean_package_index() -> Result<()> {
----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY]
Removed 12 files ([SIZE])
Removed [N] files ([SIZE])
"###);

// Assert that the `.rkyv` file is removed for `iniconfig`.
Expand Down
Loading

0 comments on commit f1840c7

Please sign in to comment.