
Commit c69b2d1

feat: Add support for repodata patching in rattler-index, fix silent failures (#1129)
1 parent 8995a5c commit c69b2d1

File tree: 7 files changed, +226 -68 lines

crates/rattler_conda_types/src/repo_data/mod.rs (+1 -1)

@@ -470,7 +470,7 @@ fn determine_subdir(
                 Arch::X86_64 => "64",
                 _ => arch.as_str(),
             };
-            Ok(format!("{}-{}", platform, arch_str))
+            Ok(format!("{platform}-{arch_str}"))
         }
         Err(_) => Err(ConvertSubdirError::NoKnownCombination { platform, arch }),
     }

crates/rattler_index/src/lib.rs (+194 -62)

@@ -2,21 +2,24 @@
 //! files
 #![deny(missing_docs)]

-use anyhow::Result;
+use anyhow::{Context, Result};
 use bytes::buf::Buf;
 use fs_err::{self as fs};
-use futures::future::try_join_all;
+use futures::{stream::FuturesUnordered, StreamExt};
 use fxhash::FxHashMap;
 use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
 use rattler_conda_types::{
     package::{ArchiveType, IndexJson, PackageFile},
-    ChannelInfo, PackageRecord, Platform, RepoData,
+    ChannelInfo, PackageRecord, PatchInstructions, Platform, RepoData,
 };
 use rattler_networking::{Authentication, AuthenticationStorage};
-use rattler_package_streaming::{read, seek};
+use rattler_package_streaming::{
+    read,
+    seek::{self, stream_conda_content},
+};
 use std::{
     collections::{HashMap, HashSet},
-    io::{Cursor, Read},
+    io::{Cursor, Read, Seek},
     path::{Path, PathBuf},
     str::FromStr,
     sync::Arc,
@@ -71,6 +74,46 @@ pub fn package_record_from_index_json<T: Read>(
     Ok(package_record)
 }

+fn repodata_patch_from_conda_package_stream<'a>(
+    package: impl Read + Seek + 'a,
+) -> anyhow::Result<rattler_conda_types::RepoDataPatch> {
+    let mut subdirs = FxHashMap::default();
+
+    let mut content_reader = stream_conda_content(package)?;
+    let entries = content_reader.entries()?;
+    for entry in entries {
+        let mut entry = entry?;
+        if !entry.header().entry_type().is_file() {
+            return Err(anyhow::anyhow!(
+                "Expected repodata patch package to be a file"
+            ));
+        }
+        let mut buf = Vec::new();
+        entry.read_to_end(&mut buf)?;
+        let path = entry.path()?;
+        let components = path.components().collect::<Vec<_>>();
+        let subdir =
+            if components.len() == 2 && components[1].as_os_str() == "patch_instructions.json" {
+                let subdir_str = components[0]
+                    .as_os_str()
+                    .to_str()
+                    .context("Could not convert OsStr to str")?;
+                let _ = Platform::from_str(subdir_str)?;
+                subdir_str.to_string()
+            } else {
+                return Err(anyhow::anyhow!(
+                    "Expected files of form <subdir>/patch_instructions.json, but found {}",
+                    path.display()
+                ));
+            };
+
+        let instructions: PatchInstructions = serde_json::from_slice(&buf)?;
+        subdirs.insert(subdir, instructions);
+    }
+
+    Ok(rattler_conda_types::RepoDataPatch { subdirs })
+}
+
 /// Extract the package record from a `.tar.bz2` package file.
 /// This function will look for the `info/index.json` file in the conda package
 /// and extract the package record from it.
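
Note: `repodata_patch_from_conda_package_stream` above accepts only a `.conda` archive whose content entries are exactly `<subdir>/patch_instructions.json`; each file is deserialized into `PatchInstructions` and keyed by its subdir in the returned `RepoDataPatch`. An illustrative patch-package content layout (subdir names chosen for illustration, not taken from this commit):

    linux-64/patch_instructions.json
    noarch/patch_instructions.json
    win-64/patch_instructions.json
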
@@ -132,12 +175,17 @@ async fn index_subdir(
     subdir: Platform,
     op: Operator,
     force: bool,
+    repodata_patch: Option<PatchInstructions>,
     progress: Option<MultiProgress>,
     semaphore: Arc<Semaphore>,
 ) -> Result<()> {
+    let repodata_path = if repodata_patch.is_some() {
+        format!("{subdir}/repodata_from_packages.json")
+    } else {
+        format!("{subdir}/repodata.json")
+    };
     let mut registered_packages: FxHashMap<String, PackageRecord> = HashMap::default();
     if !force {
-        let repodata_path = format!("{subdir}/repodata.json");
         let repodata_bytes = op.read(&repodata_path).await;
         let repodata: RepoData = match repodata_bytes {
             Ok(bytes) => serde_json::from_slice(&bytes.to_vec())?,
@@ -210,7 +258,7 @@ async fn index_subdir(
         .cloned()
         .collect::<Vec<_>>();

-    tracing::debug!(
+    tracing::info!(
         "Adding {} packages to subdir {}.",
         packages_to_add.len(),
         subdir
@@ -229,53 +277,79 @@ async fn index_subdir(
         .progress_chars("##-");
     pb.set_style(sty);

-    let tasks = packages_to_add
-        .iter()
-        .map(|filename| {
-            tokio::spawn({
-                let op = op.clone();
-                let filename = filename.clone();
-                let pb = pb.clone();
-                let semaphore = semaphore.clone();
-                {
-                    async move {
-                        let _permit = semaphore
-                            .acquire()
-                            .await
-                            .expect("Semaphore was unexpectedly closed");
-                        pb.set_message(format!(
-                            "Indexing {} {}",
-                            subdir.as_str(),
-                            console::style(filename.clone()).dim()
-                        ));
-                        let file_path = format!("{subdir}/{filename}");
-                        let buffer = op.read(&file_path).await?;
-                        let reader = buffer.reader();
-                        // We already know it's not None
-                        let archive_type = ArchiveType::try_from(&filename).unwrap();
-                        let record = match archive_type {
-                            ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader),
-                            ArchiveType::Conda => package_record_from_conda_reader(reader),
-                        }?;
-                        pb.inc(1);
-                        Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record))
-                    }
+    let mut tasks = FuturesUnordered::new();
+    for filename in packages_to_add.iter() {
+        let task = {
+            let op = op.clone();
+            let filename = filename.clone();
+            let pb = pb.clone();
+            let semaphore = semaphore.clone();
+            {
+                async move {
+                    let _permit = semaphore
+                        .acquire()
+                        .await
+                        .expect("Semaphore was unexpectedly closed");
+                    pb.set_message(format!(
+                        "Indexing {} {}",
+                        subdir.as_str(),
+                        console::style(filename.clone()).dim()
+                    ));
+                    let file_path = format!("{subdir}/{filename}");
+                    let buffer = op.read(&file_path).await?;
+                    let reader = buffer.reader();
+                    // We already know it's not None
+                    let archive_type = ArchiveType::try_from(&filename).unwrap();
+                    let record = match archive_type {
+                        ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader),
+                        ArchiveType::Conda => package_record_from_conda_reader(reader),
+                    }?;
+                    pb.inc(1);
+                    Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record))
                 }
-            })
-        })
-        .collect::<Vec<_>>();
-    let results = try_join_all(tasks).await?;
-
-    pb.finish_with_message(format!("Finished {}", subdir.as_str()));
+            }
+        };
+        tasks.push(tokio::spawn(task));
+    }
+    let mut results = Vec::new();
+    while let Some(join_result) = tasks.next().await {
+        match join_result {
+            Ok(Ok(result)) => results.push(result),
+            Ok(Err(e)) => {
+                tasks.clear();
+                tracing::error!("Failed to process package: {}", e);
+                pb.abandon_with_message(format!(
+                    "{} {}",
+                    console::style("Failed to index").red(),
+                    console::style(subdir.as_str()).dim()
+                ));
+                return Err(e.into());
+            }
+            Err(join_err) => {
+                tasks.clear();
+                tracing::error!("Task panicked: {}", join_err);
+                pb.abandon_with_message(format!(
+                    "{} {}",
+                    console::style("Failed to index").red(),
+                    console::style(subdir.as_str()).dim()
+                ));
+                return Err(anyhow::anyhow!("Task panicked: {}", join_err));
+            }
+        }
+    }
+    pb.finish_with_message(format!(
+        "{} {}",
+        console::style("Finished").green(),
+        subdir.as_str()
+    ));

-    tracing::debug!(
+    tracing::info!(
         "Successfully added {} packages to subdir {}.",
         results.len(),
         subdir
     );

-    for result in results {
-        let (filename, record) = result?;
+    for (filename, record) in results {
         registered_packages.insert(filename, record);
     }
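
Note: this hunk, together with the matching change in `index` further down, replaces `try_join_all` over spawned tasks with an explicitly drained `FuturesUnordered`, so task errors and panics (`JoinError`) are reported instead of being silently dropped, which is the "fix silent failures" part of the commit title. A minimal standalone sketch of that pattern follows; the function name and task bodies are hypothetical, not code from this commit:

use futures::{stream::FuturesUnordered, StreamExt};

/// Doubles each input on a spawned task; any task error or panic aborts the whole run.
async fn run_all(inputs: Vec<u32>) -> anyhow::Result<Vec<u32>> {
    let mut tasks = FuturesUnordered::new();
    for input in inputs {
        tasks.push(tokio::spawn(async move {
            if input == 13 {
                anyhow::bail!("unlucky input: {input}"); // simulated per-task failure
            }
            Ok::<u32, anyhow::Error>(input * 2)
        }));
    }

    let mut results = Vec::new();
    while let Some(join_result) = tasks.next().await {
        match join_result {
            Ok(Ok(value)) => results.push(value),
            // A task returned an error: propagate it instead of dropping it.
            Ok(Err(e)) => return Err(e),
            // A task panicked: surface the JoinError as an error as well.
            Err(join_err) => return Err(anyhow::anyhow!("task panicked: {join_err}")),
        }
    }
    Ok(results)
}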

@@ -304,26 +378,46 @@ async fn index_subdir(
         version: Some(2),
     };

-    let repodata_path = format!("{subdir}/repodata.json");
+    tracing::info!("Writing repodata to {}", repodata_path);
     let repodata_bytes = serde_json::to_vec(&repodata)?;
     op.write(&repodata_path, repodata_bytes).await?;
+
+    if let Some(instructions) = repodata_patch {
+        let patched_repodata_path = format!("{subdir}/repodata.json");
+        tracing::info!("Writing patched repodata to {}", patched_repodata_path);
+        let mut patched_repodata = repodata.clone();
+        patched_repodata.apply_patches(&instructions);
+        let patched_repodata_bytes = serde_json::to_vec(&patched_repodata)?;
+        op.write(&patched_repodata_path, patched_repodata_bytes)
+            .await?;
+    }
     // todo: also write repodata.json.bz2, repodata.json.zst, repodata.json.jlap and sharded repodata once available in rattler
     // https://github.com/conda/rattler/issues/1096

     Ok(())
 }

 /// Create a new `repodata.json` for all packages in the channel at the given directory.
+#[allow(clippy::too_many_arguments)]
 pub async fn index_fs(
     channel: impl Into<PathBuf>,
     target_platform: Option<Platform>,
+    repodata_patch: Option<String>,
     force: bool,
     max_parallel: usize,
     multi_progress: Option<MultiProgress>,
 ) -> anyhow::Result<()> {
     let mut config = FsConfig::default();
     config.root = Some(channel.into().canonicalize()?.to_string_lossy().to_string());
-    index(target_platform, config, force, max_parallel, multi_progress).await
+    index(
+        target_platform,
+        config,
+        repodata_patch,
+        force,
+        max_parallel,
+        multi_progress,
+    )
+    .await
 }

 /// Create a new `repodata.json` for all packages in the channel at the given S3 URL.
@@ -337,6 +431,7 @@ pub async fn index_s3(
     secret_access_key: Option<String>,
     session_token: Option<String>,
     target_platform: Option<Platform>,
+    repodata_patch: Option<String>,
     force: bool,
     max_parallel: usize,
     multi_progress: Option<MultiProgress>,
@@ -376,6 +471,7 @@ pub async fn index_s3(
     index(
         target_platform,
         s3_config,
+        repodata_patch,
         force,
         max_parallel,
         multi_progress,
@@ -398,6 +494,7 @@
 pub async fn index<T: Configurator>(
     target_platform: Option<Platform>,
     config: T,
+    repodata_patch: Option<String>,
     force: bool,
     max_parallel: usize,
     multi_progress: Option<MultiProgress>,
@@ -443,22 +540,57 @@ pub async fn index<T: Configurator>(
         subdirs.insert(Platform::NoArch);
     }

+    let repodata_patch = if let Some(path) = repodata_patch {
+        match ArchiveType::try_from(path.clone()) {
+            Some(ArchiveType::Conda) => {}
+            Some(ArchiveType::TarBz2) | None => {
+                return Err(anyhow::anyhow!(
+                    "Only .conda packages are supported for repodata patches. Got: {}",
+                    path
+                ))
+            }
+        }
+        let repodata_patch_path = format!("noarch/{path}");
+        let repodata_patch_bytes = op.read(&repodata_patch_path).await?.to_bytes();
+        let reader = Cursor::new(repodata_patch_bytes);
+        let repodata_patch = repodata_patch_from_conda_package_stream(reader)?;
+        Some(repodata_patch)
+    } else {
+        None
+    };
+
     let semaphore = Semaphore::new(max_parallel);
     let semaphore = Arc::new(semaphore);

-    let tasks = subdirs
-        .iter()
-        .map(|subdir| {
-            tokio::spawn(index_subdir(
-                *subdir,
-                op.clone(),
-                force,
-                multi_progress.clone(),
-                semaphore.clone(),
-            ))
-        })
-        .collect::<Vec<_>>();
-    try_join_all(tasks).await?;
+    let mut tasks = FuturesUnordered::new();
+    for subdir in subdirs.iter() {
+        let task = index_subdir(
+            *subdir,
+            op.clone(),
+            force,
+            repodata_patch
+                .as_ref()
+                .and_then(|p| p.subdirs.get(&subdir.to_string()).cloned()),
+            multi_progress.clone(),
+            semaphore.clone(),
+        );
+        tasks.push(tokio::spawn(task));
+    }

+    while let Some(join_result) = tasks.next().await {
+        match join_result {
+            Ok(Ok(_)) => {}
+            Ok(Err(e)) => {
+                tracing::error!("Failed to process subdir: {}", e);
+                tasks.clear();
+                return Err(e);
+            }
+            Err(join_err) => {
+                tracing::error!("Task panicked: {}", join_err);
+                tasks.clear();
+                return Err(anyhow::anyhow!("Task panicked: {}", join_err));
+            }
+        }
+    }
     Ok(())
 }
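
For context, a minimal caller-side sketch of the new repodata-patching entry point; the channel path and patch package name below are placeholders, and the crate path `rattler_index::index_fs` is assumed from the file location in this commit. Per the diff, the patch archive must be a `.conda` package, it is read from the channel's `noarch/` directory, and when it is supplied the unpatched repodata is written to `repodata_from_packages.json` while the patched result becomes `repodata.json`:

use rattler_index::index_fs;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    index_fs(
        "/path/to/channel",                                // channel root on disk (placeholder)
        None,                                              // target_platform: index all detected subdirs
        Some("example-patches-0.1.0-0.conda".to_string()), // repodata_patch, read from <channel>/noarch/
        false,                                             // force
        8,                                                 // max_parallel
        None,                                              // multi_progress
    )
    .await
}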
