From c72d11c9f67e3d3d9f5a064b4d6d07c97f6881e1 Mon Sep 17 00:00:00 2001
From: Paul Khuong
Date: Sun, 6 Oct 2024 13:33:19 -0400
Subject: [PATCH 1/2] NFC loader: stick buckets in a lazily built vector

---
 src/lazy_buckets.rs | 91 +++++++++++++++++++++++++++++++++++++++++++++
 src/lib.rs          |  1 +
 src/loader.rs       | 42 ++++-----------------
 3 files changed, 100 insertions(+), 34 deletions(-)
 create mode 100644 src/lazy_buckets.rs

diff --git a/src/lazy_buckets.rs b/src/lazy_buckets.rs
new file mode 100644
index 0000000..8f50195
--- /dev/null
+++ b/src/lazy_buckets.rs
@@ -0,0 +1,91 @@
+use std::sync::Mutex;
+
+use s3::bucket::Bucket;
+
+use crate::result::Result;
+
+type VecBucketBuilder = dyn FnOnce() -> Result<Vec<Bucket>> + Send + 'static;
+
+/// A `LazyVecBucket` is a vector of `Bucket`s that is constructed
+/// when first needed.
+///
+/// This avoids the potential I/O overhead of creating a credential
+/// object when no S3 call will be made.
+pub(crate) struct LazyVecBucket {
+    builder: Mutex<Option<Box<VecBucketBuilder>>>,
+    buckets: quinine::MonoBox<Result<Vec<Bucket>>>,
+}
+
+/// Workaround for the fact that rust-s3 doesn't redact credentials in debug
+/// impls.
+impl std::fmt::Debug for LazyVecBucket {
+    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+        #[derive(Debug)]
+        #[allow(dead_code)] // Because we disregard Debug in dead code analysis.
+        struct RedactedBucket<'a> {
+            name: &'a str,
+            region: &'a awsregion::Region,
+        }
+
+        match self.buckets.as_ref() {
+            None => write!(fmt, "{:?}", Option::<Vec<Bucket>>::None),
+            Some(Err(e)) => write!(fmt, "{:?}", Err::<(), std::io::Error>(e.to_io())),
+            Some(Ok(buckets)) => {
+                let redacted = buckets
+                    .iter()
+                    .map(|x| RedactedBucket {
+                        name: x.name.as_str(),
+                        region: &x.region,
+                    })
+                    .collect::<Vec<_>>();
+
+                write!(fmt, "{:?}", redacted)
+            }
+        }
+    }
+}
+
+impl LazyVecBucket {
+    pub fn new_from_buckets(buckets: Vec<Bucket>) -> Self {
+        LazyVecBucket {
+            builder: Default::default(),
+            buckets: Box::new(Ok(buckets)).into(),
+        }
+    }
+
+    #[inline(never)]
+    fn make_buckets(&self) -> std::io::Result<&[Bucket]> {
+        let mut builder = self.builder.lock().expect("poisoned lock");
+
+        // Only do something if we don't have buckets yet.
+        if self.buckets.is_none() {
+            let buckets = builder.take().expect("should have a builder")();
+            let _ = self.buckets.store_value(buckets);
+        }
+
+        self.buckets()
+    }
+
+    pub fn buckets(&self) -> std::io::Result<&[Bucket]> {
+        if let Some(cache) = self.buckets.as_ref() {
+            match cache {
+                Ok(buckets) => return Ok(buckets),
+                Err(e) => return Err(e.to_io()),
+            }
+        }
+
+        self.make_buckets()
+    }
+}
+
+#[test]
+fn test_buckets_no_credential() {
+    let bucket = Bucket::new_public("test-bucket", awsregion::Region::UsEast1).unwrap();
+    let lazy_vec = LazyVecBucket::new_from_buckets(vec![bucket]);
+
+    println!("Buckets: {:?}", lazy_vec);
+    let debug_output = format!("{:?}", lazy_vec);
+    assert!(!debug_output.to_lowercase().contains("credential"));
+    assert!(!debug_output.to_lowercase().contains("key"));
+    assert!(!debug_output.to_lowercase().contains("token"));
+}
diff --git a/src/lib.rs b/src/lib.rs
index 08e74e9..7f71645 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,6 +2,7 @@ mod atomic_kv32;
 mod copier;
 mod executor;
 mod instance_id;
+mod lazy_buckets; // For loader
 mod loader;
 mod manifest_schema;
 mod ofd_lock;
diff --git a/src/loader.rs b/src/loader.rs
index 2acfe5b..acac1c7 100644
--- a/src/loader.rs
+++ b/src/loader.rs
@@ -19,6 +19,7 @@ use crate::chain_warn;
 use crate::drop_result;
 use crate::executor::block_on_with_executor;
 use crate::fresh_error;
+use crate::lazy_buckets::LazyVecBucket;
 use crate::manifest_schema::fingerprint_file_chunk;
 use crate::manifest_schema::hash_file_chunk;
 use crate::replication_target::apply_cache_replication_targets;
@@ -83,35 +84,11 @@ impl Chunk {
     }
 }
 
-/// Workaround for the fact that rust-s3 doesn't redact credentials in debug
-/// impls.
-#[allow(clippy::ptr_arg)]
-fn redacted_bucket_fmt(buckets: &Vec<Bucket>, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
-    #[derive(Debug)]
-    #[allow(dead_code)] // Because we disregard Debug in dead code analysis.
-    struct RedactedBucket<'a> {
-        name: &'a str,
-        region: &'a awsregion::Region,
-    }
-
-    let redacted = buckets
-        .iter()
-        .map(|x| RedactedBucket {
-            name: x.name.as_str(),
-            region: &x.region,
-        })
-        .collect::<Vec<_>>();
-
-    write!(fmt, "{:?}", redacted)
-}
-
-#[derive(derivative::Derivative)]
-#[derivative(Debug)]
+#[derive(Debug)]
 pub(crate) struct Loader {
     cache: kismet_cache::Cache,
-    #[derivative(Debug(format_with = "redacted_bucket_fmt"))]
-    remote_sources: Vec<Bucket>,
+    remote_sources: LazyVecBucket,
 
     // These chunks will be returned without involving any cache nor
     // `remote_sources`.
@@ -358,7 +335,7 @@ impl Loader {
 
         Ok(Loader {
             cache: cache_builder.build(),
-            remote_sources,
+            remote_sources: LazyVecBucket::new_from_buckets(remote_sources),
             known_chunks: well_known_chunks()
                 .into_iter()
                 .map(|chunk| (chunk.fprint(), chunk))
@@ -434,7 +411,7 @@ impl Loader {
         use std::io::Error;
         use std::io::ErrorKind;
 
-        for source in &self.remote_sources {
+        for source in self.remote_sources.buckets()? {
             if let Some(remote) = load_from_source(source, &name)
                 .map_err(|_| Error::new(ErrorKind::Other, "failed to fetch remote chunk"))?
             {
@@ -689,16 +666,13 @@ mod tests {
 
     #[test]
     fn test_loader_no_credential() {
-        let mut loader = Loader {
+        let bucket = Bucket::new_public("test-bucket", awsregion::Region::UsEast1).unwrap();
+        let loader = Loader {
             cache: kismet_cache::CacheBuilder::new().build(),
-            remote_sources: Vec::new(),
+            remote_sources: LazyVecBucket::new_from_buckets(vec![bucket]),
             known_chunks: HashMap::new(),
         };
 
-        loader
-            .remote_sources
-            .push(Bucket::new_public("test-bucket", awsregion::Region::UsEast1).unwrap());
-
         println!("Loader: {:?}", loader);
         let debug_output = format!("{:?}", loader);
         assert!(!debug_output.to_lowercase().contains("credential"));

From f8d22480827f0fb9cc2aa3d8330f886df3a1e7e4 Mon Sep 17 00:00:00 2001
From: Paul Khuong
Date: Sun, 6 Oct 2024 13:44:09 -0400
Subject: [PATCH 2/2] loader: create S3 bucket objects (which need credentials) on demand

The write VFS (that produces replication data) sometimes needs a
loader, but usually doesn't. Before this commit, creating a loader
could perform (local) network I/O to get credentials. We now wait
until we actually have to make an S3 API call (to fetch a chunk)
before getting the credentials.

Sample debug printer output:

```
cargo test loader -- --nocapture
    Finished `test` profile [unoptimized + debuginfo] target(s) in 0.07s
     Running unittests src/lib.rs (target/debug/deps/verneuil-106bf643236b6261)

running 1 test
Resolved Loader: Loader { cache: Cache { write_side: None, auto_sync: true, read_side: ReadOnlyCache { stack: [] } }, remote_sources: [RedactedBucket { name: "test-bucket", region: UsEast1 }], known_chunks: {} }
test loader::tests::test_loader_no_credential ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 46 filtered out; finished in 0.00s
```
---
 src/lazy_buckets.rs | 64 ++++++++++++++++++++++++++++++++++++++++-----
 src/loader.rs       | 41 ++++++++++++++++++++++-------
 2 files changed, 89 insertions(+), 16 deletions(-)

diff --git a/src/lazy_buckets.rs b/src/lazy_buckets.rs
index 8f50195..aa930fc 100644
--- a/src/lazy_buckets.rs
+++ b/src/lazy_buckets.rs
@@ -46,10 +46,10 @@ impl std::fmt::Debug for LazyVecBucket {
 }
 
 impl LazyVecBucket {
-    pub fn new_from_buckets(buckets: Vec<Bucket>) -> Self {
+    pub fn new(builder: Box<VecBucketBuilder>) -> Self {
         LazyVecBucket {
-            builder: Default::default(),
-            buckets: Box::new(Ok(buckets)).into(),
+            builder: Mutex::new(Some(builder)),
+            buckets: Default::default(),
         }
     }
 
@@ -80,10 +80,62 @@ impl LazyVecBucket {
 
 #[test]
 fn test_buckets_no_credential() {
-    let bucket = Bucket::new_public("test-bucket", awsregion::Region::UsEast1).unwrap();
-    let lazy_vec = LazyVecBucket::new_from_buckets(vec![bucket]);
+    let called_flag = std::sync::Arc::new(Mutex::new(false));
 
-    println!("Buckets: {:?}", lazy_vec);
+    let builder_flag = called_flag.clone();
+    let builder = move || {
+        *builder_flag.lock().unwrap() = true;
+        Ok(vec![Bucket::new_public(
+            "test-bucket",
+            awsregion::Region::UsEast1,
+        )
+        .unwrap()])
+    };
+
+    let lazy_vec = LazyVecBucket::new(Box::new(builder));
+
+    {
+        println!("Delayed buckets: {:?}", lazy_vec);
+        let debug_output = format!("{:?}", lazy_vec);
+        assert!(!debug_output.to_lowercase().contains("credential"));
+        assert!(!debug_output.to_lowercase().contains("key"));
+        assert!(!debug_output.to_lowercase().contains("token"));
+    }
+
+    // Should still be delayed
+    assert!(!*called_flag.lock().unwrap());
+
+    // Force computation
+    let _ = lazy_vec.buckets();
+
+    // Should have called the builder
+    assert!(*called_flag.lock().unwrap());
+
+    {
+        println!("Resolved buckets: {:?}", lazy_vec);
+        let debug_output = format!("{:?}", lazy_vec);
+        assert!(!debug_output.to_lowercase().contains("credential"));
+        assert!(!debug_output.to_lowercase().contains("key"));
+        assert!(!debug_output.to_lowercase().contains("token"));
+    }
+}
+
+#[test]
+fn test_buckets_error() {
+    let called_flag = std::sync::Arc::new(Mutex::new(false));
+
+    let builder_flag = called_flag.clone();
+    let builder = move || {
+        *builder_flag.lock().unwrap() = true;
+        Err(crate::fresh_error!("fake error"))
+    };
+
+    let lazy_vec = LazyVecBucket::new(Box::new(builder));
+
+    // Force computation
+    let _ = lazy_vec.buckets();
+
+    println!("Erroneous buckets: {:?}", lazy_vec);
     let debug_output = format!("{:?}", lazy_vec);
     assert!(!debug_output.to_lowercase().contains("credential"));
     assert!(!debug_output.to_lowercase().contains("key"));
     assert!(!debug_output.to_lowercase().contains("token"));
 }
diff --git a/src/loader.rs b/src/loader.rs
index acac1c7..ce9330c 100644
--- a/src/loader.rs
+++ b/src/loader.rs
@@ -307,19 +307,30 @@ impl Loader {
         mut cache_builder: kismet_cache::CacheBuilder,
         remote: &[ReplicationTarget],
     ) -> Result<Loader> {
-        let mut remote_sources = Vec::new();
+        // Precompute S3 target specs for `bucket_builder`.
+        let s3_specs = remote
+            .iter()
+            .filter(|x| matches!(x, ReplicationTarget::S3(_)))
+            .cloned()
+            .collect::<Vec<_>>();
+
+        let bucket_builder = move || {
+            if s3_specs.is_empty() {
+                return Ok(Vec::new());
+            }
 
-        // We only care about remote S3 sources.
-        if remote.iter().any(|x| matches!(x, ReplicationTarget::S3(_))) {
             let creds =
                 Credentials::default().map_err(|e| chain_error!(e, "failed to get credentials"))?;
 
-            for source in remote {
-                if let Some(bucket) = create_source(source, &creds, |s3| &s3.chunk_bucket)? {
+            let mut remote_sources = Vec::new();
+            for source in s3_specs {
+                if let Some(bucket) = create_source(&source, &creds, |s3| &s3.chunk_bucket)? {
                     remote_sources.push(bucket);
                 }
             }
-        }
+
+            Ok(remote_sources)
+        };
 
         cache_builder = apply_cache_replication_targets(cache_builder, remote);
 
@@ -335,7 +346,7 @@ impl Loader {
 
         Ok(Loader {
             cache: cache_builder.build(),
-            remote_sources: LazyVecBucket::new_from_buckets(remote_sources),
+            remote_sources: LazyVecBucket::new(Box::new(bucket_builder)),
             known_chunks: well_known_chunks()
                 .into_iter()
                 .map(|chunk| (chunk.fprint(), chunk))
@@ -666,14 +677,24 @@ mod tests {
 
     #[test]
     fn test_loader_no_credential() {
-        let bucket = Bucket::new_public("test-bucket", awsregion::Region::UsEast1).unwrap();
+        let builder = move || {
+            Ok(vec![Bucket::new_public(
+                "test-bucket",
+                awsregion::Region::UsEast1,
+            )
+            .unwrap()])
+        };
+
         let loader = Loader {
             cache: kismet_cache::CacheBuilder::new().build(),
-            remote_sources: LazyVecBucket::new_from_buckets(vec![bucket]),
+            remote_sources: LazyVecBucket::new(Box::new(builder)),
            known_chunks: HashMap::new(),
         };
 
-        println!("Loader: {:?}", loader);
+        // Force computation
+        let _ = loader.remote_sources.buckets();
+
+        println!("Resolved Loader: {:?}", loader);
         let debug_output = format!("{:?}", loader);
         assert!(!debug_output.to_lowercase().contains("credential"));
         assert!(!debug_output.to_lowercase().contains("key"));