Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional overrides for nonconforming object stores #763

Merged
merged 1 commit into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ unwrap_used = "warn"
panic = "warn"
todo = "warn"
unimplemented = "warn"
dbg_macro = "warn"

[workspace.metadata.release]
allow-branch = ["main"]
Expand Down
37 changes: 35 additions & 2 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -585,26 +585,59 @@ class StorageConcurrencySettings:
class StorageSettings:
"""Configuration for how Icechunk uses its Storage instance"""

def __init__(self, concurrency: StorageConcurrencySettings | None = None) -> None:
def __init__(
self,
concurrency: StorageConcurrencySettings | None = None,
unsafe_use_conditional_create: bool | None = None,
unsafe_use_conditional_update: bool | None = None,
unsafe_use_metadata: bool | None = None,
) -> None:
"""
Create a new `StorageSettings` object

Parameters
----------
concurrency: StorageConcurrencySettings | None
The configuration for how much concurrency Icechunk uses when accessing its Storage instance.

unsafe_use_conditional_update: bool | None
If set to False, Icechunk loses some of its consistency guarantees.
This is only useful in object stores that don't support the feature.
Use at your own risk.

unsafe_use_conditional_create: bool | None
If set to False, Icechunk loses some of its consistency guarantees.
This is only useful in object stores that don't support the feature.
Use at your own risk.

unsafe_use_metadata: bool | None
If set to False, Icechunk will not write metadata fields in its files.
This is only useful in object stores that don't support the feature.
Use at your own risk.
"""
...
@property
def concurrency(self) -> StorageConcurrencySettings | None:
"""
The configuration for how Icechunk uses its Storage instance.
The configuration for how much concurrency the Icechunk store uses.

Returns
-------
StorageConcurrencySettings | None
The configuration for how much concurrency Icechunk uses when accessing its Storage instance.
"""

@property
def unsafe_use_conditional_update(self) -> bool | None:
"""True if Icechunk will use conditional PUT operations for updates in the object store"""
...
@property
def unsafe_use_conditional_create(self) -> bool | None:
"""True if Icechunk will use conditional PUT operations for creation in the object store"""
...
@property
def unsafe_use_metadata(self) -> bool | None:
"""True if Icechunk will write object metadata in the object store"""
...

class RepositoryConfig:
Expand Down
51 changes: 36 additions & 15 deletions icechunk-python/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ impl PyS3StaticCredentials {
r#"S3StaticCredentials(access_key_id="{ak}", secret_access_key="{sk}", session_token={st}, expires_after={ea})"#,
ak = self.access_key_id.as_str(),
sk = self.secret_access_key.as_str(),
st = format_option_string(self.session_token.as_ref()),
st = format_option(self.session_token.as_ref()),
ea = format_option(self.expires_after.as_ref().map(datetime_repr))
)
}
}

fn format_option_to_string<T: Display>(o: Option<T>) -> String {
pub(crate) fn format_option_to_string<T: Display>(o: Option<T>) -> String {
match o.as_ref() {
None => "None".to_string(),
Some(s) => s.to_string(),
Expand All @@ -108,13 +108,6 @@ fn format_option<'a, T: AsRef<str> + 'a>(o: Option<T>) -> String {
}
}

/// Render an optional string-like value for use inside a `repr`-style string.
///
/// * `Some(s)` becomes `s` wrapped in double quotes (no escaping is applied
///   to the inner text).
/// * `None` becomes the bare text `None`.
pub(crate) fn format_option_string<'a, T: AsRef<str> + 'a>(o: Option<T>) -> String {
    o.map_or_else(
        || "None".to_string(),
        |inner| format!(r#""{}""#, inner.as_ref()),
    )
}

fn format_bool(b: bool) -> &'static str {
match b {
true => "True",
Expand Down Expand Up @@ -377,8 +370,8 @@ impl PyS3Options {
// TODO: escape
format!(
r#"S3Options(region={region}, endpoint_url={url}, allow_http={http}, anonymous={anon})"#,
region = format_option_string(self.region.as_ref()),
url = format_option_string(self.endpoint_url.as_ref()),
region = format_option(self.region.as_ref()),
url = format_option(self.endpoint_url.as_ref()),
http = format_bool(self.allow_http),
anon = format_bool(self.anonymous),
)
Expand Down Expand Up @@ -707,6 +700,12 @@ fn storage_concurrency_settings_repr(s: &PyStorageConcurrencySettings) -> String
// Python-facing counterpart of `storage::Settings`.
// Each field carries `#[pyo3(get, set)]`, so it is readable and writable as
// an attribute from Python. Plain `//` comments are used here on purpose so
// the Python-visible docstrings are left untouched.
pub struct PyStorageSettings {
// Optional nested concurrency settings, held as a Python object handle.
#[pyo3(get, set)]
pub concurrency: Option<Py<PyStorageConcurrencySettings>>,
// Optional override for conditional updates; `None` leaves the choice to
// the core settings.
#[pyo3(get, set)]
pub unsafe_use_conditional_update: Option<bool>,
// Optional override for conditional creates; `None` leaves the choice to
// the core settings.
#[pyo3(get, set)]
pub unsafe_use_conditional_create: Option<bool>,
// Optional override for writing object metadata; `None` leaves the choice
// to the core settings.
#[pyo3(get, set)]
pub unsafe_use_metadata: Option<bool>,
}

impl From<storage::Settings> for PyStorageSettings {
Expand All @@ -717,6 +716,9 @@ impl From<storage::Settings> for PyStorageSettings {
Py::new(py, Into::<PyStorageConcurrencySettings>::into(c))
.expect("Cannot create instance of StorageConcurrencySettings")
}),
unsafe_use_conditional_create: value.unsafe_use_conditional_create,
unsafe_use_conditional_update: value.unsafe_use_conditional_update,
unsafe_use_metadata: value.unsafe_use_metadata,
})
}
}
Expand All @@ -725,6 +727,9 @@ impl From<&PyStorageSettings> for storage::Settings {
/// Build the core `storage::Settings` from the Python-facing settings.
///
/// The GIL is acquired because the nested concurrency settings live behind
/// a `Py<...>` handle that has to be borrowed before it can be converted.
/// The three `unsafe_*` flags are copied over unchanged.
fn from(value: &PyStorageSettings) -> Self {
    Python::with_gil(|py| {
        // Convert the nested Python object (if any) into its core equivalent.
        let concurrency = value
            .concurrency
            .as_ref()
            .map(|handle| (&*handle.borrow(py)).into());

        Self {
            concurrency,
            unsafe_use_conditional_create: value.unsafe_use_conditional_create,
            unsafe_use_conditional_update: value.unsafe_use_conditional_update,
            unsafe_use_metadata: value.unsafe_use_metadata,
        }
    })
}
}
Expand All @@ -741,10 +746,20 @@ impl Eq for PyStorageSettings {}

#[pymethods]
impl PyStorageSettings {
#[pyo3(signature = ( concurrency=None))]
#[pyo3(signature = ( concurrency=None, unsafe_use_conditional_create=None, unsafe_use_conditional_update=None, unsafe_use_metadata=None))]
#[new]
pub fn new(concurrency: Option<Py<PyStorageConcurrencySettings>>) -> Self {
Self { concurrency }
pub fn new(
concurrency: Option<Py<PyStorageConcurrencySettings>>,
unsafe_use_conditional_create: Option<bool>,
unsafe_use_conditional_update: Option<bool>,
unsafe_use_metadata: Option<bool>,
) -> Self {
Self {
concurrency,
unsafe_use_conditional_create,
unsafe_use_metadata,
unsafe_use_conditional_update,
}
}

pub fn __repr__(&self) -> String {
Expand All @@ -756,7 +771,13 @@ impl PyStorageSettings {
}),
};

format!(r#"StorageSettings(concurrency={conc})"#, conc = inner)
format!(
r#"StorageSettings(concurrency={conc}, unsafe_use_conditional_create={cr}, unsafe_use_conditional_update={up}, unsafe_use_metadata={me})"#,
conc = inner,
cr = format_option(self.unsafe_use_conditional_create.map(format_bool)),
up = format_option(self.unsafe_use_conditional_update.map(format_bool)),
me = format_option(self.unsafe_use_metadata.map(format_bool))
)
}
}

Expand Down
4 changes: 2 additions & 2 deletions icechunk-python/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::sync::{Mutex, RwLock};

use crate::{
config::{
datetime_repr, format_option_string, PyCredentials, PyRepositoryConfig,
datetime_repr, format_option_to_string, PyCredentials, PyRepositoryConfig,
PyStorage, PyStorageSettings,
},
errors::PyIcechunkStoreError,
Expand Down Expand Up @@ -176,7 +176,7 @@ impl PySnapshotInfo {
format!(
r#"SnapshotInfo(id="{id}", parent_id={parent}, written_at={at}, message="{message}")"#,
id = self.id,
parent = format_option_string(self.parent_id.as_ref()),
parent = format_option_to_string(self.parent_id.as_ref()),
at = datetime_repr(&self.written_at),
message = self.message.chars().take(10).collect::<String>() + "...",
)
Expand Down
2 changes: 1 addition & 1 deletion icechunk-python/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def test_can_change_deep_config_values() -> None:
)

assert re.match(
r"RepositoryConfig\(inline_chunk_threshold_bytes=5, get_partial_values_concurrency=42, compression=CompressionConfig\(algorithm=None, level=2\), caching=CachingConfig\(num_snapshot_nodes=None, num_chunk_refs=8, num_transaction_changes=None, num_bytes_attributes=None, num_bytes_chunks=None\), storage=StorageSettings\(concurrency=StorageConcurrencySettings\(max_concurrent_requests_for_object=5, ideal_concurrent_request_size=1000000\)\), manifest=.*\)",
r"RepositoryConfig\(inline_chunk_threshold_bytes=5, get_partial_values_concurrency=42, compression=CompressionConfig\(algorithm=None, level=2\), caching=CachingConfig\(num_snapshot_nodes=None, num_chunk_refs=8, num_transaction_changes=None, num_bytes_attributes=None, num_bytes_chunks=None\), storage=StorageSettings\(concurrency=StorageConcurrencySettings\(max_concurrent_requests_for_object=5, ideal_concurrent_request_size=1000000\), unsafe_use_conditional_create=None, unsafe_use_conditional_update=None, unsafe_use_metadata=None\), manifest=.*\)",
repr(config),
)
repo = icechunk.Repository.open(
Expand Down
1 change: 1 addition & 0 deletions icechunk/examples/multithreaded_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn writer(name: &str, range: Range<u64>, store: &Store) {
println!("Starting writer {name}.");
for i in range {
#[allow(clippy::dbg_macro)]
if let Err(err) = store
.set(
format!("array/c/{i}").as_str(),
Expand Down
42 changes: 42 additions & 0 deletions icechunk/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ impl ConcurrencySettings {
/// Settings controlling how Icechunk uses its storage backend.
///
/// Every field is optional. An unset (`None`) value is resolved to a default
/// by the accessor method of the same name on this type.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)]
pub struct Settings {
/// Concurrency configuration; when `None`, a shared default is used.
pub concurrency: Option<ConcurrencySettings>,
/// Whether to use conditional operations for updates. `None` is treated
/// as `true` by [`Settings::unsafe_use_conditional_update`].
pub unsafe_use_conditional_update: Option<bool>,
/// Whether to use conditional operations for creation. `None` is treated
/// as `true` by [`Settings::unsafe_use_conditional_create`].
pub unsafe_use_conditional_create: Option<bool>,
/// Whether to write object metadata. `None` is treated as `true` by
/// [`Settings::unsafe_use_metadata`].
pub unsafe_use_metadata: Option<bool>,
}

static DEFAULT_CONCURRENCY: OnceLock<ConcurrencySettings> = OnceLock::new();
Expand All @@ -185,6 +188,18 @@ impl Settings {
.unwrap_or_else(|| DEFAULT_CONCURRENCY.get_or_init(Default::default))
}

/// Effective value of the `unsafe_use_conditional_create` setting.
///
/// Returns `true` unless the field was explicitly set to `Some(false)`.
pub fn unsafe_use_conditional_create(&self) -> bool {
    // Unset (`None`) counts as enabled.
    self.unsafe_use_conditional_create != Some(false)
}

/// Effective value of the `unsafe_use_conditional_update` setting.
///
/// Returns `true` unless the field was explicitly set to `Some(false)`.
pub fn unsafe_use_conditional_update(&self) -> bool {
    // Unset (`None`) counts as enabled.
    self.unsafe_use_conditional_update != Some(false)
}

/// Effective value of the `unsafe_use_metadata` setting.
///
/// Returns `true` unless the field was explicitly set to `Some(false)`.
pub fn unsafe_use_metadata(&self) -> bool {
    // Unset (`None`) counts as enabled.
    self.unsafe_use_metadata != Some(false)
}

pub fn merge(&self, other: Self) -> Self {
Self {
concurrency: match (&self.concurrency, other.concurrency) {
Expand All @@ -193,6 +208,33 @@ impl Settings {
(Some(c), None) => Some(c.clone()),
(Some(mine), Some(theirs)) => Some(mine.merge(theirs)),
},
unsafe_use_conditional_create: match (
&self.unsafe_use_conditional_create,
other.unsafe_use_conditional_create,
) {
(None, None) => None,
(None, Some(c)) => Some(c),
(Some(c), None) => Some(*c),
(Some(_), Some(theirs)) => Some(theirs),
},
unsafe_use_conditional_update: match (
&self.unsafe_use_conditional_update,
other.unsafe_use_conditional_update,
) {
(None, None) => None,
(None, Some(c)) => Some(c),
(Some(c), None) => Some(*c),
(Some(_), Some(theirs)) => Some(theirs),
},
unsafe_use_metadata: match (
&self.unsafe_use_metadata,
other.unsafe_use_metadata,
) {
(None, None) => None,
(None, Some(c)) => Some(c),
(Some(c), None) => Some(*c),
(Some(_), Some(theirs)) => Some(theirs),
},
}
}
}
Expand Down
Loading
Loading