From 1c4fce6efd298dc496838e7e43d1cf04f2e73f8e Mon Sep 17 00:00:00 2001
From: Sebastian Galkin
Date: Thu, 20 Feb 2025 17:47:20 -0300
Subject: [PATCH] Add optional overrides for nonconforming object stores

---
 Cargo.toml                                    |   1 +
 .../python/icechunk/_icechunk_python.pyi      |  37 +++++-
 icechunk-python/src/config.rs                 |  51 +++++---
 icechunk-python/src/repository.rs             |   4 +-
 icechunk-python/tests/test_config.py          |   2 +-
 icechunk/examples/multithreaded_store.rs      |   1 +
 icechunk/src/storage/mod.rs                   |  42 +++++++
 icechunk/src/storage/object_store.rs          | 114 +++++++++---------
 icechunk/src/storage/s3.rs                    |  82 ++++++++-----
 icechunk/tests/test_storage.rs                |  54 ++++++++-
 icechunk/tests/test_virtual_refs.rs           |   1 +
 11 files changed, 282 insertions(+), 107 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 2d0c0136..7c8f6958 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,7 @@ unwrap_used = "warn"
 panic = "warn"
 todo = "warn"
 unimplemented = "warn"
+dbg_macro = "warn"
 
 [workspace.metadata.release]
 allow-branch = ["main"]
diff --git a/icechunk-python/python/icechunk/_icechunk_python.pyi b/icechunk-python/python/icechunk/_icechunk_python.pyi
index b2564187..7b9fa91c 100644
--- a/icechunk-python/python/icechunk/_icechunk_python.pyi
+++ b/icechunk-python/python/icechunk/_icechunk_python.pyi
@@ -585,7 +585,13 @@ class StorageConcurrencySettings:
 class StorageSettings:
     """Configuration for how Icechunk uses its Storage instance"""
 
-    def __init__(self, concurrency: StorageConcurrencySettings | None = None) -> None:
+    def __init__(
+        self,
+        concurrency: StorageConcurrencySettings | None = None,
+        unsafe_use_conditional_create: bool | None = None,
+        unsafe_use_conditional_update: bool | None = None,
+        unsafe_use_metadata: bool | None = None,
+    ) -> None:
         """
         Create a new `StorageSettings` object
 
@@ -593,18 +599,45 @@ class StorageSettings:
         ----------
         concurrency: StorageConcurrencySettings | None
             The configuration for how Icechunk uses its Storage instance.
+
+        unsafe_use_conditional_update: bool | None
+            If set to False, Icechunk loses some of its consistency guarantees.
+            This is only useful in object stores that don't support the feature.
+            Use at your own risk.
+
+        unsafe_use_conditional_create: bool | None
+            If set to False, Icechunk loses some of its consistency guarantees.
+            This is only useful in object stores that don't support the feature.
+            Use at your own risk.
+
+        unsafe_use_metadata: bool | None
+            If set to False, Icechunk won't write metadata fields in Icechunk files.
+            This is only useful in object stores that don't support the feature.
+            Use at your own risk.
         """
         ...
 
     @property
     def concurrency(self) -> StorageConcurrencySettings | None:
         """
-        The configuration for how Icechunk uses its Storage instance.
+        The configuration for how much concurrency the Icechunk store uses
 
         Returns
         -------
         StorageConcurrencySettings | None
             The configuration for how Icechunk uses its Storage instance.
         """
+
+    @property
+    def unsafe_use_conditional_update(self) -> bool | None:
+        """True if Icechunk will use conditional PUT operations for updates in the object store"""
+        ...
+    @property
+    def unsafe_use_conditional_create(self) -> bool | None:
+        """True if Icechunk will use conditional PUT operations for creation in the object store"""
+        ...
+    @property
+    def unsafe_use_metadata(self) -> bool | None:
+        """True if Icechunk will write object metadata in the object store"""
         ...
 
 
 class RepositoryConfig:
diff --git a/icechunk-python/src/config.rs b/icechunk-python/src/config.rs
index 016ff0df..9d5e0044 100644
--- a/icechunk-python/src/config.rs
+++ b/icechunk-python/src/config.rs
@@ -88,13 +88,13 @@ impl PyS3StaticCredentials {
             r#"S3StaticCredentials(access_key_id="{ak}", secret_access_key="{sk}", session_token={st}, expires_after={ea})"#,
             ak = self.access_key_id.as_str(),
             sk = self.secret_access_key.as_str(),
-            st = format_option_string(self.session_token.as_ref()),
+            st = format_option(self.session_token.as_ref()),
             ea = format_option(self.expires_after.as_ref().map(datetime_repr))
         )
     }
 }
 
-fn format_option_to_string(o: Option) -> String {
+pub(crate) fn format_option_to_string(o: Option) -> String {
     match o.as_ref() {
         None => "None".to_string(),
         Some(s) => s.to_string(),
@@ -108,13 +108,6 @@ fn format_option<'a, T: AsRef + 'a>(o: Option) -> String {
     }
 }
 
-pub(crate) fn format_option_string<'a, T: AsRef + 'a>(o: Option) -> String {
-    match o.as_ref() {
-        None => "None".to_string(),
-        Some(s) => format!(r#""{}""#, s.as_ref()),
-    }
-}
-
 fn format_bool(b: bool) -> &'static str {
     match b {
         true => "True",
@@ -377,8 +370,8 @@ impl PyS3Options {
         // TODO: escape
         format!(
             r#"S3Options(region={region}, endpoint_url={url}, allow_http={http}, anonymous={anon})"#,
-            region = format_option_string(self.region.as_ref()),
-            url = format_option_string(self.endpoint_url.as_ref()),
+            region = format_option(self.region.as_ref()),
+            url = format_option(self.endpoint_url.as_ref()),
             http = format_bool(self.allow_http),
             anon = format_bool(self.anonymous),
         )
@@ -707,6 +700,12 @@ fn storage_concurrency_settings_repr(s: &PyStorageConcurrencySettings) -> String
 pub struct PyStorageSettings {
     #[pyo3(get, set)]
     pub concurrency: Option<Py<PyStorageConcurrencySettings>>,
+    #[pyo3(get, set)]
+    pub unsafe_use_conditional_update: Option<bool>,
+    #[pyo3(get, set)]
+    pub unsafe_use_conditional_create: Option<bool>,
+    #[pyo3(get, set)]
+    pub unsafe_use_metadata: Option<bool>,
 }
 
 impl From<storage::Settings> for PyStorageSettings {
@@ -717,6 +716,9 @@ impl From<storage::Settings> for PyStorageSettings {
                 Py::new(py, Into::<PyStorageConcurrencySettings>::into(c))
                     .expect("Cannot create instance of StorageConcurrencySettings")
             }),
+            unsafe_use_conditional_create: value.unsafe_use_conditional_create,
+            unsafe_use_conditional_update: value.unsafe_use_conditional_update,
+            unsafe_use_metadata: value.unsafe_use_metadata,
         })
     }
 }
@@ -725,6 +727,9 @@ impl From<&PyStorageSettings> for storage::Settings {
     fn from(value: &PyStorageSettings) -> Self {
         Python::with_gil(|py| Self {
             concurrency: value.concurrency.as_ref().map(|c| (&*c.borrow(py)).into()),
+            unsafe_use_conditional_create: value.unsafe_use_conditional_create,
+            unsafe_use_conditional_update: value.unsafe_use_conditional_update,
+            unsafe_use_metadata: value.unsafe_use_metadata,
         })
     }
 }
@@ -741,10 +746,20 @@ impl Eq for PyStorageSettings {}
 
 #[pymethods]
 impl PyStorageSettings {
-    #[pyo3(signature = ( concurrency=None))]
+    #[pyo3(signature = ( concurrency=None, unsafe_use_conditional_create=None, unsafe_use_conditional_update=None, unsafe_use_metadata=None))]
     #[new]
-    pub fn new(concurrency: Option<Py<PyStorageConcurrencySettings>>) -> Self {
-        Self { concurrency }
+    pub fn new(
+        concurrency: Option<Py<PyStorageConcurrencySettings>>,
+        unsafe_use_conditional_create: Option<bool>,
+        unsafe_use_conditional_update: Option<bool>,
+        unsafe_use_metadata: Option<bool>,
+    ) -> Self {
+        Self {
+            concurrency,
+            unsafe_use_conditional_create,
+            unsafe_use_metadata,
+            unsafe_use_conditional_update,
+        }
     }
 
     pub fn __repr__(&self) -> String {
@@ -756,7 +771,13 @@ impl PyStorageSettings {
             }),
         };
 
-        format!(r#"StorageSettings(concurrency={conc})"#, conc = inner)
+        format!(
r#"StorageSettings(concurrency={conc}, unsafe_use_conditional_create={cr}, unsafe_use_conditional_update={up}, unsafe_use_metadata={me})"#, + conc = inner, + cr = format_option(self.unsafe_use_conditional_create.map(format_bool)), + up = format_option(self.unsafe_use_conditional_update.map(format_bool)), + me = format_option(self.unsafe_use_metadata.map(format_bool)) + ) } } diff --git a/icechunk-python/src/repository.rs b/icechunk-python/src/repository.rs index ab9da2e3..d51f1ee6 100644 --- a/icechunk-python/src/repository.rs +++ b/icechunk-python/src/repository.rs @@ -28,7 +28,7 @@ use tokio::sync::{Mutex, RwLock}; use crate::{ config::{ - datetime_repr, format_option_string, PyCredentials, PyRepositoryConfig, + datetime_repr, format_option_to_string, PyCredentials, PyRepositoryConfig, PyStorage, PyStorageSettings, }, errors::PyIcechunkStoreError, @@ -176,7 +176,7 @@ impl PySnapshotInfo { format!( r#"SnapshotInfo(id="{id}", parent_id={parent}, written_at={at}, message="{message}")"#, id = self.id, - parent = format_option_string(self.parent_id.as_ref()), + parent = format_option_to_string(self.parent_id.as_ref()), at = datetime_repr(&self.written_at), message = self.message.chars().take(10).collect::() + "...", ) diff --git a/icechunk-python/tests/test_config.py b/icechunk-python/tests/test_config.py index df509752..dc1b0843 100644 --- a/icechunk-python/tests/test_config.py +++ b/icechunk-python/tests/test_config.py @@ -178,7 +178,7 @@ def test_can_change_deep_config_values() -> None: ) assert re.match( - r"RepositoryConfig\(inline_chunk_threshold_bytes=5, get_partial_values_concurrency=42, compression=CompressionConfig\(algorithm=None, level=2\), caching=CachingConfig\(num_snapshot_nodes=None, num_chunk_refs=8, num_transaction_changes=None, num_bytes_attributes=None, num_bytes_chunks=None\), storage=StorageSettings\(concurrency=StorageConcurrencySettings\(max_concurrent_requests_for_object=5, ideal_concurrent_request_size=1000000\)\), manifest=.*\)", + r"RepositoryConfig\(inline_chunk_threshold_bytes=5, get_partial_values_concurrency=42, compression=CompressionConfig\(algorithm=None, level=2\), caching=CachingConfig\(num_snapshot_nodes=None, num_chunk_refs=8, num_transaction_changes=None, num_bytes_attributes=None, num_bytes_chunks=None\), storage=StorageSettings\(concurrency=StorageConcurrencySettings\(max_concurrent_requests_for_object=5, ideal_concurrent_request_size=1000000\), unsafe_use_conditional_create=None, unsafe_use_conditional_update=None, unsafe_use_metadata=None\), manifest=.*\)", repr(config), ) repo = icechunk.Repository.open( diff --git a/icechunk/examples/multithreaded_store.rs b/icechunk/examples/multithreaded_store.rs index 1e77e9e8..470627ea 100644 --- a/icechunk/examples/multithreaded_store.rs +++ b/icechunk/examples/multithreaded_store.rs @@ -54,6 +54,7 @@ async fn main() -> Result<(), Box> { async fn writer(name: &str, range: Range, store: &Store) { println!("Starting writer {name}."); for i in range { + #[allow(clippy::dbg_macro)] if let Err(err) = store .set( format!("array/c/{i}").as_str(), diff --git a/icechunk/src/storage/mod.rs b/icechunk/src/storage/mod.rs index 67ff4ef3..28768818 100644 --- a/icechunk/src/storage/mod.rs +++ b/icechunk/src/storage/mod.rs @@ -174,6 +174,9 @@ impl ConcurrencySettings { #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)] pub struct Settings { pub concurrency: Option, + pub unsafe_use_conditional_update: Option, + pub unsafe_use_conditional_create: Option, + pub unsafe_use_metadata: Option, } static 
@@ -185,6 +188,18 @@ impl Settings {
             .unwrap_or_else(|| DEFAULT_CONCURRENCY.get_or_init(Default::default))
     }
 
+    pub fn unsafe_use_conditional_create(&self) -> bool {
+        self.unsafe_use_conditional_create.unwrap_or(true)
+    }
+
+    pub fn unsafe_use_conditional_update(&self) -> bool {
+        self.unsafe_use_conditional_update.unwrap_or(true)
+    }
+
+    pub fn unsafe_use_metadata(&self) -> bool {
+        self.unsafe_use_metadata.unwrap_or(true)
+    }
+
     pub fn merge(&self, other: Self) -> Self {
         Self {
             concurrency: match (&self.concurrency, other.concurrency) {
@@ -193,6 +208,33 @@
                 (None, None) => None,
                 (None, Some(c)) => Some(c),
                 (Some(c), None) => Some(c.clone()),
                 (Some(mine), Some(theirs)) => Some(mine.merge(theirs)),
             },
+            unsafe_use_conditional_create: match (
+                &self.unsafe_use_conditional_create,
+                other.unsafe_use_conditional_create,
+            ) {
+                (None, None) => None,
+                (None, Some(c)) => Some(c),
+                (Some(c), None) => Some(*c),
+                (Some(_), Some(theirs)) => Some(theirs),
+            },
+            unsafe_use_conditional_update: match (
+                &self.unsafe_use_conditional_update,
+                other.unsafe_use_conditional_update,
+            ) {
+                (None, None) => None,
+                (None, Some(c)) => Some(c),
+                (Some(c), None) => Some(*c),
+                (Some(_), Some(theirs)) => Some(theirs),
+            },
+            unsafe_use_metadata: match (
+                &self.unsafe_use_metadata,
+                other.unsafe_use_metadata,
+            ) {
+                (None, None) => None,
+                (None, Some(c)) => Some(c),
+                (Some(c), None) => Some(*c),
+                (Some(_), Some(theirs)) => Some(theirs),
+            },
         }
     }
 }
diff --git a/icechunk/src/storage/object_store.rs b/icechunk/src/storage/object_store.rs
index d85ea694..390143f1 100644
--- a/icechunk/src/storage/object_store.rs
+++ b/icechunk/src/storage/object_store.rs
@@ -158,16 +158,6 @@ impl ObjectStorage {
         self.backend.artificially_sort_refs_in_mem()
     }
 
-    /// We need this because object_store's local file implementation doesn't support metadata.
-    pub fn supports_metadata(&self) -> bool {
-        self.backend.supports_metadata()
-    }
-
-    /// We need this because object_store's local file implementation doesn't support it
-    pub fn supports_conditional_put_updates(&self) -> bool {
-        self.backend.supports_conditional_put_updates()
-    }
-
     /// Return all keys in the store
     ///
     /// Intended for testing and debugging purposes only.
@@ -251,8 +241,12 @@ impl ObjectStorage {
             .compat())
     }
 
-    fn metadata_to_attributes(&self, metadata: Vec<(String, String)>) -> Attributes {
-        if self.supports_metadata() {
+    fn metadata_to_attributes(
+        &self,
+        settings: &Settings,
+        metadata: Vec<(String, String)>,
+    ) -> Attributes {
+        if settings.unsafe_use_metadata() {
             Attributes::from_iter(metadata.into_iter().map(|(key, val)| {
                 (
                     Attribute::Metadata(std::borrow::Cow::Owned(key)),
@@ -270,18 +264,24 @@ impl ObjectStorage {
         Some(parent.as_ref().to_string())
     }
 
-    fn get_put_mode(&self, previous_version: &VersionInfo) -> PutMode {
-        let degrade_to_overwrite =
-            !previous_version.is_create() && !self.supports_conditional_put_updates();
-        if degrade_to_overwrite {
-            PutMode::Overwrite
-        } else if previous_version.is_create() {
-            PutMode::Create
-        } else {
-            PutMode::Update(UpdateVersion {
+    fn get_put_mode(
+        &self,
+        settings: &Settings,
+        previous_version: &VersionInfo,
+    ) -> PutMode {
+        match (
+            previous_version.is_create(),
+            settings.unsafe_use_conditional_create(),
+            settings.unsafe_use_conditional_update(),
+        ) {
+            (true, true, _) => PutMode::Create,
+            (true, false, _) => PutMode::Overwrite,
+
+            (false, _, true) => PutMode::Update(UpdateVersion {
                 e_tag: previous_version.etag().cloned(),
                 version: previous_version.generation().cloned(),
-            })
+            }),
+            (false, _, false) => PutMode::Overwrite,
         }
     }
 }
@@ -317,15 +317,15 @@ impl Storage for ObjectStorage {
             Err(err) => Err(err.into()),
         }
     }
-    #[instrument(skip(self, _settings, config))]
+    #[instrument(skip(self, settings, config))]
     async fn update_config(
         &self,
-        _settings: &Settings,
+        settings: &Settings,
         config: Bytes,
         previous_version: &VersionInfo,
     ) -> StorageResult {
         let path = self.get_config_path();
-        let attributes = if self.supports_metadata() {
+        let attributes = if settings.unsafe_use_metadata() {
             Attributes::from_iter(vec![(
                 Attribute::ContentType,
                 AttributeValue::from("application/yaml"),
@@ -334,7 +334,7 @@ impl Storage for ObjectStorage {
             Attributes::new()
         };
 
-        let mode = self.get_put_mode(previous_version);
+        let mode = self.get_put_mode(settings, previous_version);
         let options = PutOptions { mode, attributes, ..PutOptions::default() };
 
         let res = self.get_client().await.put_opts(&path, config.into(), options).await;
@@ -394,48 +394,48 @@ impl Storage for ObjectStorage {
         Ok(Box::new(self.get_object_reader(settings, &path).await?))
     }
 
-    #[instrument(skip(self, _settings, metadata, bytes))]
+    #[instrument(skip(self, settings, metadata, bytes))]
     async fn write_snapshot(
         &self,
-        _settings: &Settings,
+        settings: &Settings,
         id: SnapshotId,
         metadata: Vec<(String, String)>,
         bytes: Bytes,
     ) -> StorageResult<()> {
         let path = self.get_snapshot_path(&id);
-        let attributes = self.metadata_to_attributes(metadata);
+        let attributes = self.metadata_to_attributes(settings, metadata);
         let options = PutOptions { attributes, ..PutOptions::default() };
         // FIXME: use multipart
         self.get_client().await.put_opts(&path, bytes.into(), options).await?;
         Ok(())
     }
 
-    #[instrument(skip(self, _settings, metadata, bytes))]
+    #[instrument(skip(self, settings, metadata, bytes))]
     async fn write_manifest(
         &self,
-        _settings: &Settings,
+        settings: &Settings,
         id: ManifestId,
         metadata: Vec<(String, String)>,
         bytes: Bytes,
     ) -> StorageResult<()> {
         let path = self.get_manifest_path(&id);
-        let attributes = self.metadata_to_attributes(metadata);
+        let attributes = self.metadata_to_attributes(settings, metadata);
         let options = PutOptions { attributes, ..PutOptions::default() };
         // FIXME: use multipart
         self.get_client().await.put_opts(&path, bytes.into(), options).await?;
         Ok(())
     }
 
-    #[instrument(skip(self, _settings, metadata, bytes))]
+    #[instrument(skip(self, settings, metadata, bytes))]
     async fn write_transaction_log(
         &self,
-        _settings: &Settings,
+        settings: &Settings,
         id: SnapshotId,
         metadata: Vec<(String, String)>,
         bytes: Bytes,
     ) -> StorageResult<()> {
         let path = self.get_transaction_path(&id);
-        let attributes = self.metadata_to_attributes(metadata);
+        let attributes = self.metadata_to_attributes(settings, metadata);
         let options = PutOptions { attributes, ..PutOptions::default() };
         // FIXME: use multipart
         self.get_client().await.put_opts(&path, bytes.into(), options).await?;
@@ -506,16 +506,16 @@ impl Storage for ObjectStorage {
             .await?)
     }
 
-    #[instrument(skip(self, _settings, bytes))]
+    #[instrument(skip(self, settings, bytes))]
     async fn write_ref(
         &self,
-        _settings: &Settings,
+        settings: &Settings,
         ref_key: &str,
         bytes: Bytes,
         previous_version: &VersionInfo,
     ) -> StorageResult {
         let key = self.ref_key(ref_key);
-        let mode = self.get_put_mode(previous_version);
+        let mode = self.get_put_mode(settings, previous_version);
         let opts = PutOptions { mode, ..PutOptions::default() };
 
         match self
@@ -634,19 +634,7 @@ pub trait ObjectStoreBackend: Debug + Sync + Send {
         false
     }
 
-    /// We need this because object_store's local file implementation doesn't support metadata.
-    fn supports_metadata(&self) -> bool {
-        true
-    }
-
-    /// We need this because object_store's local file implementation doesn't support it
-    fn supports_conditional_put_updates(&self) -> bool {
-        true
-    }
-
-    fn default_settings(&self) -> Settings {
-        Settings::default()
-    }
+    fn default_settings(&self) -> Settings;
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -674,6 +662,7 @@ impl ObjectStoreBackend for InMemoryObjectStoreBackend {
                     NonZeroU64::new(1).unwrap_or(NonZeroU64::MIN),
                 ),
             }),
+            ..Default::default()
         }
     }
 }
@@ -706,14 +695,6 @@ impl ObjectStoreBackend for LocalFileSystemObjectStoreBackend {
         true
     }
 
-    fn supports_metadata(&self) -> bool {
-        false
-    }
-
-    fn supports_conditional_put_updates(&self) -> bool {
-        false
-    }
-
     fn default_settings(&self) -> Settings {
         Settings {
             concurrency: Some(ConcurrencySettings {
@@ -724,6 +705,9 @@ impl ObjectStoreBackend for LocalFileSystemObjectStoreBackend {
                     NonZeroU64::new(4 * 1024).unwrap_or(NonZeroU64::MIN),
                 ),
             }),
+            unsafe_use_conditional_update: Some(false),
+            unsafe_use_metadata: Some(false),
+            ..Default::default()
         }
     }
 }
@@ -795,6 +779,10 @@ impl ObjectStoreBackend for S3ObjectStoreBackend {
     fn prefix(&self) -> String {
         self.prefix.clone().unwrap_or("".to_string())
     }
+
+    fn default_settings(&self) -> Settings {
+        Default::default()
+    }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -844,6 +832,10 @@ impl ObjectStoreBackend for AzureObjectStoreBackend {
     fn prefix(&self) -> String {
         self.prefix.clone().unwrap_or("".to_string())
     }
+
+    fn default_settings(&self) -> Settings {
+        Default::default()
+    }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -910,6 +902,10 @@ impl ObjectStoreBackend for GcsObjectStoreBackend {
     fn prefix(&self) -> String {
         self.prefix.clone().unwrap_or("".to_string())
     }
+
+    fn default_settings(&self) -> Settings {
+        Default::default()
+    }
 }
 
 #[derive(Debug)]
diff --git a/icechunk/src/storage/s3.rs b/icechunk/src/storage/s3.rs
index 33b2c196..197add32 100644
--- a/icechunk/src/storage/s3.rs
+++ b/icechunk/src/storage/s3.rs
@@ -196,6 +196,7 @@ impl S3Storage {
         I: IntoIterator, impl Into)>,
     >(
         &self,
+        settings: &Settings,
         key: &str,
         content_type: Option>,
         metadata: I,
@@ -208,10 +209,11 @@ impl S3Storage {
             b = b.content_type(ct)
         };
 
-        for (k, v) in metadata {
-            b = b.metadata(k, v);
+        if settings.unsafe_use_metadata() {
+            for (k, v) in metadata {
+                b = b.metadata(k, v);
+            }
         }
-
         b.body(bytes.into()).send().await?;
         Ok(())
     }
@@ -296,10 +298,10 @@ impl Storage for S3Storage {
         }
     }
 
-    #[instrument(skip(self, _settings, config))]
+    #[instrument(skip(self, settings, config))]
     async fn update_config(
         &self,
-        _settings: &Settings,
+        settings: &Settings,
         config: Bytes,
         previous_version: &VersionInfo,
     ) -> StorageResult {
@@ -313,10 +315,14 @@ impl Storage for S3Storage {
             .content_type("application/yaml")
            .body(config.into());
 
-        if let Some(etag) = previous_version.etag() {
-            req = req.if_match(etag)
-        } else {
-            req = req.if_none_match("*")
+        match (
+            previous_version.etag(),
+            settings.unsafe_use_conditional_create(),
+            settings.unsafe_use_conditional_update(),
+        ) {
+            (None, true, _) => req = req.if_none_match("*"),
+            (Some(etag), _, true) => req = req.if_match(etag),
+            (_, _, _) => {}
         }
 
         let res = req.send().await;
@@ -411,53 +417,67 @@ impl Storage for S3Storage {
             .await
     }
 
-    #[instrument(skip(self, _settings, metadata, bytes))]
+    #[instrument(skip(self, settings, metadata, bytes))]
     async fn write_snapshot(
         &self,
-        _settings: &Settings,
+        settings: &Settings,
         id: SnapshotId,
         metadata: Vec<(String, String)>,
         bytes: Bytes,
     ) -> StorageResult<()> {
         let key = self.get_snapshot_path(&id)?;
-        self.put_object(key.as_str(), None::, metadata, bytes).await
+        self.put_object(settings, key.as_str(), None::, metadata, bytes).await
     }
 
-    #[instrument(skip(self, _settings, metadata, bytes))]
+    #[instrument(skip(self, settings, metadata, bytes))]
     async fn write_manifest(
         &self,
-        _settings: &Settings,
+        settings: &Settings,
         id: ManifestId,
         metadata: Vec<(String, String)>,
         bytes: Bytes,
     ) -> StorageResult<()> {
         let key = self.get_manifest_path(&id)?;
-        self.put_object(key.as_str(), None::, metadata.into_iter(), bytes).await
+        self.put_object(
+            settings,
+            key.as_str(),
+            None::,
+            metadata.into_iter(),
+            bytes,
+        )
+        .await
     }
 
-    #[instrument(skip(self, _settings, metadata, bytes))]
+    #[instrument(skip(self, settings, metadata, bytes))]
     async fn write_transaction_log(
         &self,
-        _settings: &Settings,
+        settings: &Settings,
         id: SnapshotId,
         metadata: Vec<(String, String)>,
         bytes: Bytes,
     ) -> StorageResult<()> {
         let key = self.get_transaction_path(&id)?;
-        self.put_object(key.as_str(), None::, metadata.into_iter(), bytes).await
+        self.put_object(
+            settings,
+            key.as_str(),
+            None::,
+            metadata.into_iter(),
+            bytes,
+        )
+        .await
     }
 
-    #[instrument(skip(self, _settings, bytes))]
+    #[instrument(skip(self, settings, bytes))]
     async fn write_chunk(
         &self,
-        _settings: &Settings,
+        settings: &Settings,
         id: ChunkId,
         bytes: bytes::Bytes,
     ) -> Result<(), StorageError> {
         let key = self.get_chunk_path(&id)?;
         //FIXME: use multipart upload
         let metadata: [(String, String); 0] = [];
-        self.put_object(key.as_str(), None::, metadata, bytes).await
+        self.put_object(settings, key.as_str(), None::, metadata, bytes).await
     }
 
     #[instrument(skip(self, _settings))]
@@ -523,10 +543,10 @@ impl Storage for S3Storage {
         Ok(res)
     }
 
-    #[instrument(skip(self, _settings, bytes))]
+    #[instrument(skip(self, settings, bytes))]
     async fn write_ref(
         &self,
-        _settings: &Settings,
+        settings: &Settings,
         ref_key: &str,
         bytes: Bytes,
         previous_version: &VersionInfo,
@@ -539,10 +559,18 @@ impl Storage for S3Storage {
             .bucket(self.bucket.clone())
             .key(key.clone());
 
-        if let Some(etag) = previous_version.etag() {
-            builder = builder.if_match(etag);
-        } else {
-            builder = builder.if_none_match("*");
+        match (
+            previous_version.etag(),
+            settings.unsafe_use_conditional_create(),
+            settings.unsafe_use_conditional_update(),
+        ) {
+            (None, true, _) => {
+                builder = builder.if_none_match("*");
+            }
+            (Some(etag), _, true) => {
+                builder = builder.if_match(etag);
+            }
+            (_, _, _) => {}
         }
 
         let res = builder.body(bytes.into()).send().await;
diff --git a/icechunk/tests/test_storage.rs b/icechunk/tests/test_storage.rs
index 5aeaf28e..5165c119 100644
--- a/icechunk/tests/test_storage.rs
+++ b/icechunk/tests/test_storage.rs
@@ -14,7 +14,7 @@ use icechunk::{
         RefErrorKind,
     },
     storage::{
-        new_in_memory_storage, new_s3_storage, ETag, FetchConfigResult, Generation,
+        self, new_in_memory_storage, new_s3_storage, ETag, FetchConfigResult, Generation,
        StorageResult, UpdateConfigResult, VersionInfo,
     },
     ObjectStorage, Storage,
@@ -488,3 +488,55 @@ pub async fn test_write_config_fails_on_bad_version_when_existing(
     }).await?;
     Ok(())
 }
+
+#[tokio::test]
+#[allow(clippy::panic)]
+pub async fn test_write_config_can_overwrite_with_unsafe_config(
+) -> Result<(), Box> {
+    with_storage(|_, storage| async move {
+        let storage_settings = storage::Settings {
+            unsafe_use_conditional_update: Some(false),
+            unsafe_use_conditional_create: Some(false),
+            ..storage.default_settings()
+        };
+
+        // create the initial version
+        let config = Bytes::copy_from_slice(b"hello");
+        match storage
+            .update_config(
+                &storage_settings,
+                config.clone(),
+                &VersionInfo {
+                    etag: Some(ETag("some-bad-etag".to_string())),
+                    generation: Some(Generation("42".to_string())),
+                }
+            )
+            .await?
+        {
+            UpdateConfigResult::Updated { new_version } => new_version,
+            _ => panic!(),
+        };
+
+        // attempt a bad change that should succeed in this config
+        let update_res = storage
+            .update_config(
+                &storage_settings,
+                Bytes::copy_from_slice(b"bye"),
+                &VersionInfo {
+                    etag: Some(ETag("other-bad-etag".to_string())),
+                    generation: Some(Generation("55".to_string())),
+                },
+            )
+            .await?;
+
+        assert!(matches!(update_res, UpdateConfigResult::Updated { .. }));
+
+        let fetch_res = storage.fetch_config(&storage_settings).await?;
+        assert!(
+            matches!(fetch_res, FetchConfigResult::Found{bytes, ..} if bytes.as_ref() == b"bye")
+        );
+        Ok(())
+    })
+    .await?;
+    Ok(())
+}
diff --git a/icechunk/tests/test_virtual_refs.rs b/icechunk/tests/test_virtual_refs.rs
index 9ab5153f..034f64ba 100644
--- a/icechunk/tests/test_virtual_refs.rs
+++ b/icechunk/tests/test_virtual_refs.rs
@@ -399,6 +399,7 @@ mod tests {
             max_concurrent_requests_for_object: Some(100.try_into()?),
             ideal_concurrent_request_size: Some(1.try_into()?),
         }),
+        ..repo.storage().default_settings()
     });
     let repo = repo.reopen(Some(config), None)?;
     assert_eq!(
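
Usage note (not part of the patch): a minimal sketch of how the new unsafe_* overrides could be set from Python for an object store that lacks conditional writes. The StorageSettings and StorageConcurrencySettings keyword arguments follow the updated _icechunk_python.pyi stub and the repr checked in test_config.py; passing the settings through RepositoryConfig(storage=...) is an assumption and may not match the actual constructor.

    # Hypothetical sketch: field names come from the stub above;
    # RepositoryConfig(storage=...) is assumed, not confirmed by the patch.
    import icechunk

    settings = icechunk.StorageSettings(
        concurrency=icechunk.StorageConcurrencySettings(
            max_concurrent_requests_for_object=5,
            ideal_concurrent_request_size=1_000_000,
        ),
        # Only relax these for stores without conditional PUT / metadata support.
        unsafe_use_conditional_create=False,
        unsafe_use_conditional_update=False,
        unsafe_use_metadata=False,
    )
    config = icechunk.RepositoryConfig(storage=settings)

Leaving any of the three flags as None keeps the default of True, matching the unwrap_or(true) accessors added in icechunk/src/storage/mod.rs.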