Get S3 credentials lazily #31

Open · wants to merge 2 commits into base: main
143 changes: 143 additions & 0 deletions src/lazy_buckets.rs
@@ -0,0 +1,143 @@
use std::sync::Mutex;

use s3::bucket::Bucket;

use crate::result::Result;

type VecBucketBuilder = dyn FnOnce() -> Result<Vec<Bucket>> + Send + 'static;

/// A `LazyVecBucket` is a vector of `Bucket`s that is constructed
/// when first needed.
///
/// This avoids the potential I/O overhead of creating a credential
/// object when no S3 call will be made.
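///
/// The builder runs at most once, on the first call to `buckets()`; its
/// result (success or error) is then cached.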
pub(crate) struct LazyVecBucket {
builder: Mutex<Option<Box<VecBucketBuilder>>>,
buckets: quinine::MonoBox<Result<Vec<Bucket>>>,
}

/// Workaround for the fact that rust-s3 doesn't redact credentials in debug
/// impls.
impl std::fmt::Debug for LazyVecBucket {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
#[derive(Debug)]
#[allow(dead_code)] // Dead-code analysis ignores reads from the derived Debug impl.
struct RedactedBucket<'a> {
name: &'a str,
region: &'a awsregion::Region,
}

match self.buckets.as_ref() {
None => write!(fmt, "{:?}", Option::<Vec<Bucket>>::None),
Some(Err(e)) => write!(fmt, "{:?}", Err::<(), std::io::Error>(e.to_io())),
Some(Ok(buckets)) => {
let redacted = buckets
.iter()
.map(|x| RedactedBucket {
name: x.name.as_str(),
region: &x.region,
})
.collect::<Vec<_>>();

write!(fmt, "{:?}", redacted)
}
}
}
}

impl LazyVecBucket {
pub fn new(builder: Box<VecBucketBuilder>) -> Self {
LazyVecBucket {
builder: Mutex::new(Some(builder)),
buckets: Default::default(),
}
}

#[inline(never)]
fn make_buckets(&self) -> std::io::Result<&[Bucket]> {
let mut builder = self.builder.lock().expect("poisoned lock");
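// The `builder` mutex serializes concurrent first calls; the winner publishes
// its result in the write-once `buckets` cell, so callers that find a cached
// value in `buckets()` never take this lock.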

// Only do something if we don't have buckets yet.
if self.buckets.is_none() {
let buckets = builder.take().expect("should have a builder")();
let _ = self.buckets.store_value(buckets);
}

self.buckets()
}

pub fn buckets(&self) -> std::io::Result<&[Bucket]> {
if let Some(cache) = self.buckets.as_ref() {
match cache {
Ok(buckets) => return Ok(buckets),
Err(e) => return Err(e.to_io()),
}
}

self.make_buckets()
}
}

#[test]
fn test_buckets_no_credential() {
let called_flag = std::sync::Arc::new(Mutex::new(false));

let builder_flag = called_flag.clone();
let builder = move || {
*builder_flag.lock().unwrap() = true;
Ok(vec![Bucket::new_public(
"test-bucket",
awsregion::Region::UsEast1,
)
.unwrap()])
};

let lazy_vec = LazyVecBucket::new(Box::new(builder));

{
println!("Delayed buckets: {:?}", lazy_vec);
let debug_output = format!("{:?}", lazy_vec);
assert!(!debug_output.to_lowercase().contains("credential"));
assert!(!debug_output.to_lowercase().contains("key"));
assert!(!debug_output.to_lowercase().contains("token"));
}

// Should still be delayed
assert!(!*called_flag.lock().unwrap());

// Force computation
let _ = lazy_vec.buckets();

// Should have called the builder
assert!(*called_flag.lock().unwrap());

{
println!("Resolved buckets: {:?}", lazy_vec);
let debug_output = format!("{:?}", lazy_vec);
assert!(!debug_output.to_lowercase().contains("credential"));
assert!(!debug_output.to_lowercase().contains("key"));
assert!(!debug_output.to_lowercase().contains("token"));
}
}

#[test]
fn test_buckets_error() {
let called_flag = std::sync::Arc::new(Mutex::new(false));

let builder_flag = called_flag.clone();
let builder = move || {
*builder_flag.lock().unwrap() = true;
Err(crate::fresh_error!("fake error"))
};

let lazy_vec = LazyVecBucket::new(Box::new(builder));

// Force computation
let _ = lazy_vec.buckets();

println!("Erroneous buckets: {:?}", lazy_vec);
let debug_output = format!("{:?}", lazy_vec);
assert!(!debug_output.to_lowercase().contains("credential"));
assert!(!debug_output.to_lowercase().contains("key"));
assert!(!debug_output.to_lowercase().contains("token"));
}
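
A minimal usage sketch of the new type (not part of the diff), assuming rust-s3's `Bucket::new(name, region, credentials)` constructor, this crate's `chain_error!` macro, and a made-up bucket name; the closure body, and therefore the credential lookup, runs only on the first call to `buckets()`:

    let lazy = LazyVecBucket::new(Box::new(|| {
        // Deferred: this only executes when `buckets()` is first called.
        let creds = Credentials::default()
            .map_err(|e| chain_error!(e, "failed to get credentials"))?;
        let bucket = Bucket::new("chunk-bucket", awsregion::Region::UsEast1, creds)
            .map_err(|e| chain_error!(e, "failed to create bucket"))?;
        Ok(vec![bucket])
    }));

    // No credential I/O has happened yet.
    let buckets: &[Bucket] = lazy.buckets()?; // Runs the builder once and caches the result.
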
1 change: 1 addition & 0 deletions src/lib.rs
@@ -2,6 +2,7 @@ mod atomic_kv32;
mod copier;
mod executor;
mod instance_id;
mod lazy_buckets; // For loader
mod loader;
mod manifest_schema;
mod ofd_lock;
75 changes: 35 additions & 40 deletions src/loader.rs
@@ -19,6 +19,7 @@ use crate::chain_warn;
use crate::drop_result;
use crate::executor::block_on_with_executor;
use crate::fresh_error;
use crate::lazy_buckets::LazyVecBucket;
use crate::manifest_schema::fingerprint_file_chunk;
use crate::manifest_schema::hash_file_chunk;
use crate::replication_target::apply_cache_replication_targets;
@@ -83,35 +84,11 @@ impl Chunk {
}
}

/// Workaround for the fact that rust-s3 doesn't redact credentials in debug
/// impls.
#[allow(clippy::ptr_arg)]
fn redacted_bucket_fmt(buckets: &Vec<Bucket>, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
#[derive(Debug)]
#[allow(dead_code)] // Because we disregard Debug in dead code analysis.
struct RedactedBucket<'a> {
name: &'a str,
region: &'a awsregion::Region,
}

let redacted = buckets
.iter()
.map(|x| RedactedBucket {
name: x.name.as_str(),
region: &x.region,
})
.collect::<Vec<_>>();

write!(fmt, "{:?}", redacted)
}

#[derive(derivative::Derivative)]
#[derivative(Debug)]
#[derive(Debug)]
pub(crate) struct Loader {
cache: kismet_cache::Cache,

#[derivative(Debug(format_with = "redacted_bucket_fmt"))]
remote_sources: Vec<Bucket>,
remote_sources: LazyVecBucket,

// These chunks will be returned without involving any cache nor
// `remote_sources`.
@@ -330,19 +307,30 @@ impl Loader {
mut cache_builder: kismet_cache::CacheBuilder,
remote: &[ReplicationTarget],
) -> Result<Loader> {
let mut remote_sources = Vec::new();
// Precompute S3 target specs for `bucket_builder`.
let s3_specs = remote
.iter()
.filter(|x| matches!(x, ReplicationTarget::S3(_)))
.cloned()
.collect::<Vec<_>>();

let bucket_builder = move || {
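// Nothing below runs until `LazyVecBucket::buckets()` is first called;
// in particular, `Credentials::default()` is deferred until then.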
if s3_specs.is_empty() {
return Ok(Vec::new());
}

// We only care about remote S3 sources.
if remote.iter().any(|x| matches!(x, ReplicationTarget::S3(_))) {
let creds =
Credentials::default().map_err(|e| chain_error!(e, "failed to get credentials"))?;

for source in remote {
if let Some(bucket) = create_source(source, &creds, |s3| &s3.chunk_bucket)? {
let mut remote_sources = Vec::new();
for source in s3_specs {
if let Some(bucket) = create_source(&source, &creds, |s3| &s3.chunk_bucket)? {
remote_sources.push(bucket);
}
}
}

Ok(remote_sources)
};

cache_builder = apply_cache_replication_targets(cache_builder, remote);

@@ -358,7 +346,7 @@ impl Loader {

Ok(Loader {
cache: cache_builder.build(),
remote_sources,
remote_sources: LazyVecBucket::new(Box::new(bucket_builder)),
known_chunks: well_known_chunks()
.into_iter()
.map(|chunk| (chunk.fprint(), chunk))
@@ -434,7 +422,7 @@ impl Loader {
use std::io::Error;
use std::io::ErrorKind;

for source in &self.remote_sources {
for source in self.remote_sources.buckets()? {
if let Some(remote) = load_from_source(source, &name)
.map_err(|_| Error::new(ErrorKind::Other, "failed to fetch remote chunk"))?
{
@@ -689,17 +677,24 @@ mod tests {

#[test]
fn test_loader_no_credential() {
let mut loader = Loader {
let builder = move || {
Ok(vec![Bucket::new_public(
"test-bucket",
awsregion::Region::UsEast1,
)
.unwrap()])
};

let loader = Loader {
cache: kismet_cache::CacheBuilder::new().build(),
remote_sources: Vec::new(),
remote_sources: LazyVecBucket::new(Box::new(builder)),
known_chunks: HashMap::new(),
};

loader
.remote_sources
.push(Bucket::new_public("test-bucket", awsregion::Region::UsEast1).unwrap());
// Force computation
let _ = loader.remote_sources.buckets();

println!("Loader: {:?}", loader);
println!("Resolved Loader: {:?}", loader);
let debug_output = format!("{:?}", loader);
assert!(!debug_output.to_lowercase().contains("credential"));
assert!(!debug_output.to_lowercase().contains("key"));