Skip to content

Commit

Permalink
Add optional overrides for nonconforming object stores
Browse files Browse the repository at this point in the history
  • Loading branch information
paraseba committed Feb 20, 2025
1 parent c1dc921 commit 1c4fce6
Show file tree
Hide file tree
Showing 11 changed files with 282 additions and 107 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ unwrap_used = "warn"
panic = "warn"
todo = "warn"
unimplemented = "warn"
dbg_macro = "warn"

[workspace.metadata.release]
allow-branch = ["main"]
Expand Down
37 changes: 35 additions & 2 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -585,26 +585,59 @@ class StorageConcurrencySettings:
class StorageSettings:
"""Configuration for how Icechunk uses its Storage instance"""

def __init__(self, concurrency: StorageConcurrencySettings | None = None) -> None:
def __init__(
self,
concurrency: StorageConcurrencySettings | None = None,
unsafe_use_conditional_create: bool | None = None,
unsafe_use_conditional_update: bool | None = None,
unsafe_use_metadata: bool | None = None,
) -> None:
"""
Create a new `StorageSettings` object
Parameters
----------
concurrency: StorageConcurrencySettings | None
The configuration for how much concurrency Icechunk uses when accessing its Storage instance.
unsafe_use_conditional_update: bool | None
If set to False, Icechunk will not use conditional PUT operations for updates
and loses some of its consistency guarantees.
This is only useful in object stores that don't support the feature.
Use it at your own risk.
unsafe_use_conditional_create: bool | None
If set to False, Icechunk will not use conditional PUT operations for creation
and loses some of its consistency guarantees.
This is only useful in object stores that don't support the feature.
Use it at your own risk.
unsafe_use_metadata: bool | None
If set to False, Icechunk will not write metadata fields in its files.
This is only useful in object stores that don't support the feature.
Use it at your own risk.
"""
...
@property
def concurrency(self) -> StorageConcurrencySettings | None:
"""
The configuration for how Icechunk uses its Storage instance.
The configuration for how much concurrency Icechunk store uses
Returns
-------
StorageConcurrencySettings | None
The configuration for how Icechunk uses its Storage instance.
"""

@property
def unsafe_use_conditional_update(self) -> bool | None:
"""True if Icechunk will use conditional PUT operations for updates in the object store"""
...
@property
def unsafe_use_conditional_create(self) -> bool | None:
"""True if Icechunk will use conditional PUT operations for creation in the object store"""
...
@property
def unsafe_use_metadata(self) -> bool | None:
"""True if Icechunk will write object metadata in the object store"""
...

class RepositoryConfig:
Expand Down
51 changes: 36 additions & 15 deletions icechunk-python/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ impl PyS3StaticCredentials {
r#"S3StaticCredentials(access_key_id="{ak}", secret_access_key="{sk}", session_token={st}, expires_after={ea})"#,
ak = self.access_key_id.as_str(),
sk = self.secret_access_key.as_str(),
st = format_option_string(self.session_token.as_ref()),
st = format_option(self.session_token.as_ref()),
ea = format_option(self.expires_after.as_ref().map(datetime_repr))
)
}
}

fn format_option_to_string<T: Display>(o: Option<T>) -> String {
pub(crate) fn format_option_to_string<T: Display>(o: Option<T>) -> String {
match o.as_ref() {
None => "None".to_string(),
Some(s) => s.to_string(),
Expand All @@ -108,13 +108,6 @@ fn format_option<'a, T: AsRef<str> + 'a>(o: Option<T>) -> String {
}
}

/// Renders an optional string-like value for use inside a Python-style repr.
///
/// A missing value is rendered as the bare text `None`; a present value is
/// rendered wrapped in double quotes. The contents are not escaped.
pub(crate) fn format_option_string<'a, T: AsRef<str> + 'a>(o: Option<T>) -> String {
    o.map_or_else(
        || "None".to_string(),
        |text| format!(r#""{}""#, text.as_ref()),
    )
}

fn format_bool(b: bool) -> &'static str {
match b {
true => "True",
Expand Down Expand Up @@ -377,8 +370,8 @@ impl PyS3Options {
// TODO: escape
format!(
r#"S3Options(region={region}, endpoint_url={url}, allow_http={http}, anonymous={anon})"#,
region = format_option_string(self.region.as_ref()),
url = format_option_string(self.endpoint_url.as_ref()),
region = format_option(self.region.as_ref()),
url = format_option(self.endpoint_url.as_ref()),
http = format_bool(self.allow_http),
anon = format_bool(self.anonymous),
)
Expand Down Expand Up @@ -707,6 +700,12 @@ fn storage_concurrency_settings_repr(s: &PyStorageConcurrencySettings) -> String
/// Python-facing counterpart of `storage::Settings`.
///
/// Every field is optional and is exposed to Python for both reading and
/// writing through `#[pyo3(get, set)]`.
pub struct PyStorageSettings {
/// Optional concurrency configuration for storage access.
#[pyo3(get, set)]
pub concurrency: Option<Py<PyStorageConcurrencySettings>>,
/// Whether conditional PUT operations are used for updates.
/// Setting it to `false` gives up some consistency guarantees.
#[pyo3(get, set)]
pub unsafe_use_conditional_update: Option<bool>,
/// Whether conditional PUT operations are used for creation.
/// Setting it to `false` gives up some consistency guarantees.
#[pyo3(get, set)]
pub unsafe_use_conditional_create: Option<bool>,
/// Whether object metadata is written to the object store.
#[pyo3(get, set)]
pub unsafe_use_metadata: Option<bool>,
}

impl From<storage::Settings> for PyStorageSettings {
Expand All @@ -717,6 +716,9 @@ impl From<storage::Settings> for PyStorageSettings {
Py::new(py, Into::<PyStorageConcurrencySettings>::into(c))
.expect("Cannot create instance of StorageConcurrencySettings")
}),
unsafe_use_conditional_create: value.unsafe_use_conditional_create,
unsafe_use_conditional_update: value.unsafe_use_conditional_update,
unsafe_use_metadata: value.unsafe_use_metadata,
})
}
}
Expand All @@ -725,6 +727,9 @@ impl From<&PyStorageSettings> for storage::Settings {
/// Builds the core `storage::Settings` from the Python-facing settings.
///
/// The three `unsafe_use_*` flags are copied over unchanged.
fn from(value: &PyStorageSettings) -> Self {
// The GIL token `py` is needed to borrow the `Py<...>` concurrency object.
Python::with_gil(|py| Self {
// Borrow the Python object and convert the borrowed settings by reference.
concurrency: value.concurrency.as_ref().map(|c| (&*c.borrow(py)).into()),
unsafe_use_conditional_create: value.unsafe_use_conditional_create,
unsafe_use_conditional_update: value.unsafe_use_conditional_update,
unsafe_use_metadata: value.unsafe_use_metadata,
})
}
}
Expand All @@ -741,10 +746,20 @@ impl Eq for PyStorageSettings {}

#[pymethods]
impl PyStorageSettings {
#[pyo3(signature = ( concurrency=None))]
#[pyo3(signature = ( concurrency=None, unsafe_use_conditional_create=None, unsafe_use_conditional_update=None, unsafe_use_metadata=None))]
#[new]
pub fn new(concurrency: Option<Py<PyStorageConcurrencySettings>>) -> Self {
Self { concurrency }
/// Creates a `PyStorageSettings` from its four optional components.
///
/// Each argument is stored as given; no defaults are filled in here.
pub fn new(
concurrency: Option<Py<PyStorageConcurrencySettings>>,
unsafe_use_conditional_create: Option<bool>,
unsafe_use_conditional_update: Option<bool>,
unsafe_use_metadata: Option<bool>,
) -> Self {
// Fields are bound by name, so their order in this literal does not matter.
Self {
concurrency,
unsafe_use_conditional_create,
unsafe_use_metadata,
unsafe_use_conditional_update,
}
}

pub fn __repr__(&self) -> String {
Expand All @@ -756,7 +771,13 @@ impl PyStorageSettings {
}),
};

format!(r#"StorageSettings(concurrency={conc})"#, conc = inner)
format!(
r#"StorageSettings(concurrency={conc}, unsafe_use_conditional_create={cr}, unsafe_use_conditional_update={up}, unsafe_use_metadata={me})"#,
conc = inner,
cr = format_option(self.unsafe_use_conditional_create.map(format_bool)),
up = format_option(self.unsafe_use_conditional_update.map(format_bool)),
me = format_option(self.unsafe_use_metadata.map(format_bool))
)
}
}

Expand Down
4 changes: 2 additions & 2 deletions icechunk-python/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::sync::{Mutex, RwLock};

use crate::{
config::{
datetime_repr, format_option_string, PyCredentials, PyRepositoryConfig,
datetime_repr, format_option_to_string, PyCredentials, PyRepositoryConfig,
PyStorage, PyStorageSettings,
},
errors::PyIcechunkStoreError,
Expand Down Expand Up @@ -176,7 +176,7 @@ impl PySnapshotInfo {
format!(
r#"SnapshotInfo(id="{id}", parent_id={parent}, written_at={at}, message="{message}")"#,
id = self.id,
parent = format_option_string(self.parent_id.as_ref()),
parent = format_option_to_string(self.parent_id.as_ref()),
at = datetime_repr(&self.written_at),
message = self.message.chars().take(10).collect::<String>() + "...",
)
Expand Down
2 changes: 1 addition & 1 deletion icechunk-python/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def test_can_change_deep_config_values() -> None:
)

assert re.match(
r"RepositoryConfig\(inline_chunk_threshold_bytes=5, get_partial_values_concurrency=42, compression=CompressionConfig\(algorithm=None, level=2\), caching=CachingConfig\(num_snapshot_nodes=None, num_chunk_refs=8, num_transaction_changes=None, num_bytes_attributes=None, num_bytes_chunks=None\), storage=StorageSettings\(concurrency=StorageConcurrencySettings\(max_concurrent_requests_for_object=5, ideal_concurrent_request_size=1000000\)\), manifest=.*\)",
r"RepositoryConfig\(inline_chunk_threshold_bytes=5, get_partial_values_concurrency=42, compression=CompressionConfig\(algorithm=None, level=2\), caching=CachingConfig\(num_snapshot_nodes=None, num_chunk_refs=8, num_transaction_changes=None, num_bytes_attributes=None, num_bytes_chunks=None\), storage=StorageSettings\(concurrency=StorageConcurrencySettings\(max_concurrent_requests_for_object=5, ideal_concurrent_request_size=1000000\), unsafe_use_conditional_create=None, unsafe_use_conditional_update=None, unsafe_use_metadata=None\), manifest=.*\)",
repr(config),
)
repo = icechunk.Repository.open(
Expand Down
1 change: 1 addition & 0 deletions icechunk/examples/multithreaded_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn writer(name: &str, range: Range<u64>, store: &Store) {
println!("Starting writer {name}.");
for i in range {
#[allow(clippy::dbg_macro)]
if let Err(err) = store
.set(
format!("array/c/{i}").as_str(),
Expand Down
42 changes: 42 additions & 0 deletions icechunk/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ impl ConcurrencySettings {
/// Optional storage settings.
///
/// All fields are `Option`s, so the derived `Default` leaves every one of
/// them as `None`.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)]
pub struct Settings {
/// Optional concurrency configuration.
pub concurrency: Option<ConcurrencySettings>,
/// Optional override for conditional updates; the accessor of the same
/// name treats `None` as `true`.
pub unsafe_use_conditional_update: Option<bool>,
/// Optional override for conditional creates; the accessor of the same
/// name treats `None` as `true`.
pub unsafe_use_conditional_create: Option<bool>,
/// Optional override for writing object metadata; the accessor of the
/// same name treats `None` as `true`.
pub unsafe_use_metadata: Option<bool>,
}

static DEFAULT_CONCURRENCY: OnceLock<ConcurrencySettings> = OnceLock::new();
Expand All @@ -185,6 +188,18 @@ impl Settings {
.unwrap_or_else(|| DEFAULT_CONCURRENCY.get_or_init(Default::default))
}

/// Reports whether conditional create operations should be used.
///
/// Returns `false` only when the setting is explicitly `Some(false)`;
/// an unset (`None`) value counts as enabled.
pub fn unsafe_use_conditional_create(&self) -> bool {
    self.unsafe_use_conditional_create != Some(false)
}

/// Reports whether conditional update operations should be used.
///
/// Returns `false` only when the setting is explicitly `Some(false)`;
/// an unset (`None`) value counts as enabled.
pub fn unsafe_use_conditional_update(&self) -> bool {
    self.unsafe_use_conditional_update != Some(false)
}

/// Reports whether object metadata should be written.
///
/// Returns `false` only when the setting is explicitly `Some(false)`;
/// an unset (`None`) value counts as enabled.
pub fn unsafe_use_metadata(&self) -> bool {
    self.unsafe_use_metadata != Some(false)
}

pub fn merge(&self, other: Self) -> Self {
Self {
concurrency: match (&self.concurrency, other.concurrency) {
Expand All @@ -193,6 +208,33 @@ impl Settings {
(Some(c), None) => Some(c.clone()),
(Some(mine), Some(theirs)) => Some(mine.merge(theirs)),
},
unsafe_use_conditional_create: match (
&self.unsafe_use_conditional_create,
other.unsafe_use_conditional_create,
) {
(None, None) => None,
(None, Some(c)) => Some(c),
(Some(c), None) => Some(*c),
(Some(_), Some(theirs)) => Some(theirs),
},
unsafe_use_conditional_update: match (
&self.unsafe_use_conditional_update,
other.unsafe_use_conditional_update,
) {
(None, None) => None,
(None, Some(c)) => Some(c),
(Some(c), None) => Some(*c),
(Some(_), Some(theirs)) => Some(theirs),
},
unsafe_use_metadata: match (
&self.unsafe_use_metadata,
other.unsafe_use_metadata,
) {
(None, None) => None,
(None, Some(c)) => Some(c),
(Some(c), None) => Some(*c),
(Some(_), Some(theirs)) => Some(theirs),
},
}
}
}
Expand Down
Loading

0 comments on commit 1c4fce6

Please sign in to comment.