Skip to content

Commit

Permalink
Pickle Support for python IcechunkStore [EAR-1326] (#134)
Browse files Browse the repository at this point in the history
* Start adding consolidated store tracking for serialization

* Tests passing, add new pickle ops without testing

* Add failing test

* linter

* Switch to cow, doesn't help

* filesystem store works

* Working pickling!

* lint

* Clean out deprecated methods

* Get rid of rmp serde for now

* Simplify equality, tests

* Fix lint

* Add fixme comments
  • Loading branch information
mpiannucci authored Oct 4, 2024
1 parent b93a6b9 commit 54493a1
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 111 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions icechunk-python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pyo3-asyncio-0-21 = { version = "0.21.0", features = ["tokio-runtime"] }
async-stream = "0.3.5"
thiserror = "1.0.64"
tokio = "1.40"
serde_json = "1.0.128"

[lints]
workspace = true
83 changes: 18 additions & 65 deletions icechunk-python/python/icechunk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# module
import json
from collections.abc import AsyncGenerator, Iterable
from typing import Any, Self

Expand All @@ -17,14 +16,15 @@
VirtualRefConfig,
pyicechunk_store_create,
pyicechunk_store_exists,
pyicechunk_store_from_json_config,
pyicechunk_store_open_existing,
pyicechunk_store_from_bytes,
)

__all__ = [
"IcechunkStore",
"StorageConfig",
"S3Credentials",
"SnapshotMetadata",
"StoreConfig",
"VirtualRefConfig",
]
Expand Down Expand Up @@ -89,64 +89,6 @@ def __init__(
)
self._store = store

@classmethod
async def from_config(
cls, config: dict, mode: AccessModeLiteral = "r", *args: Any, **kwargs: Any
) -> Self:
"""Create an IcechunkStore from a given configuration.
NOTE: This is deprecated and will be removed in a future release. Use the open_existing or create methods instead.
The configuration should be a dictionary in the following format:
{
"storage": {
"type": "s3", // one of "in_memory", "local_filesystem", "s3", "cached"
"...": "additional storage configuration"
},
"repository": {
// Optional, only required if you want to open an existing repository
"version": {
"branch": "main",
},
},
"config": {
// The threshold at which chunks are stored inline and not written to chunk storage
inline_chunk_threshold_bytes: 512
}
}
The following storage types are supported:
- in_memory: store data in memory
- local_filesystem: store data on the local filesystem
- s3: store data on S3 compatible storage
- cached: store data in memory with a backing storage
The following additional configuration options are supported for each storage type:
- in_memory: {}
- local_filesystem: {"root": "path/to/root/directory"}
- s3: {
"bucket": "bucket-name",
"prefix": "optional-prefix",
"access_key_id": "optional-access-key-id",
"secret_access_key": "optional",
"session_token": "optional",
"endpoint": "optional"
}
- cached: {
"approx_max_memory_bytes": 1_000_000,
"backend": {
"type": "s3",
"...": "additional storage configuration"
}
}
If opened with AccessModeLiteral "r", the store will be read-only. Otherwise the store will be writable.
"""
config_str = json.dumps(config)
read_only = mode == "r"
store = await pyicechunk_store_from_json_config(config_str, read_only=read_only)
return cls(store=store, mode=mode, args=args, kwargs=kwargs)

@classmethod
async def open_existing(
cls,
Expand Down Expand Up @@ -218,6 +160,22 @@ def with_mode(self, mode: AccessModeLiteral) -> Self:
new_store = self._store.with_mode(read_only)
return self.__class__(new_store, mode=mode)

def __eq__(self, value: object) -> bool:
if not isinstance(value, self.__class__):
return False
return self._store == value._store

def __getstate__(self) -> object:
store_repr = self._store.as_bytes()
return {"store": store_repr, "mode": self.mode}

def __setstate__(self, state: Any) -> None:
    """Pickle support: rebuild the native store from the serialized bytes
    produced by ``__getstate__``; mode "r" restores a read-only store."""
    read_only = state["mode"] == "r"
    self._store = pyicechunk_store_from_bytes(state["store"], read_only)
    self._is_open = True

@property
def snapshot_id(self) -> str:
"""Return the current snapshot id."""
Expand Down Expand Up @@ -494,8 +452,3 @@ def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
# listing methods should not be async, so we need to
# wrap the async method in a sync method.
return self._store.list_dir(prefix)

def __eq__(self, other) -> bool:
if other is self:
return True
raise NotImplementedError
8 changes: 6 additions & 2 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import datetime
from collections.abc import AsyncGenerator

class PyIcechunkStore:
def as_bytes(self) -> bytes: ...
def with_mode(self, read_only: bool) -> PyIcechunkStore: ...
@property
def snapshot_id(self) -> str: ...
Expand Down Expand Up @@ -228,6 +229,9 @@ async def pyicechunk_store_create(
async def pyicechunk_store_open_existing(
storage: StorageConfig, read_only: bool, config: StoreConfig
) -> PyIcechunkStore: ...
async def pyicechunk_store_from_json_config(
config: str, read_only: bool
# async def pyicechunk_store_from_json_config(
# config: str, read_only: bool
# ) -> PyIcechunkStore: ...
def pyicechunk_store_from_bytes(
bytes: bytes, read_only: bool
) -> PyIcechunkStore: ...
107 changes: 67 additions & 40 deletions icechunk-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod errors;
mod storage;
mod streams;

use std::sync::Arc;
use std::{borrow::Cow, sync::Arc};

use ::icechunk::{format::ChunkOffset, Store};
use bytes::Bytes;
Expand All @@ -27,6 +27,7 @@ use tokio::sync::{Mutex, RwLock};

#[pyclass]
struct PyIcechunkStore {
consolidated: ConsolidatedStore,
store: Arc<RwLock<Store>>,
rt: tokio::runtime::Runtime,
}
Expand All @@ -50,6 +51,7 @@ impl From<&PyStoreConfig> for RepositoryConfig {
version: None,
inline_chunk_threshold_bytes: config.inline_chunk_threshold_bytes,
unsafe_overwrite_refs: config.unsafe_overwrite_refs,
change_set_bytes: None,
virtual_ref_config: config
.virtual_ref_config
.as_ref()
Expand Down Expand Up @@ -111,6 +113,10 @@ impl From<SnapshotMetadata> for PySnapshotMetadata {
type KeyRanges = Vec<(String, (Option<ChunkOffset>, Option<ChunkOffset>))>;

impl PyIcechunkStore {
pub(crate) fn consolidated(&self) -> &ConsolidatedStore {
&self.consolidated
}

async fn store_exists(storage: StorageConfig) -> PyIcechunkStoreResult<bool> {
let storage = storage
.make_cached_storage()
Expand All @@ -126,69 +132,55 @@ impl PyIcechunkStore {
repository_config: RepositoryConfig,
store_config: StoreOptions,
) -> Result<Self, String> {
let access_mode = if read_only {
icechunk::zarr::AccessMode::ReadOnly
} else {
icechunk::zarr::AccessMode::ReadWrite
};
let repository = repository_config
.with_version(VersionInfo::BranchTipRef(Ref::DEFAULT_BRANCH.to_string()));
let config =
let consolidated =
ConsolidatedStore { storage, repository, config: Some(store_config) };

let store = Store::from_consolidated(&config, access_mode).await?;
let store = Arc::new(RwLock::new(store));
let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
Ok(Self { store, rt })
PyIcechunkStore::from_consolidated(consolidated, read_only).await
}

async fn create(
storage: StorageConfig,
repository_config: RepositoryConfig,
store_config: StoreOptions,
) -> Result<Self, String> {
let config = ConsolidatedStore {
let consolidated = ConsolidatedStore {
storage,
repository: repository_config,
config: Some(store_config),
};

let store =
Store::from_consolidated(&config, icechunk::zarr::AccessMode::ReadWrite)
.await?;
let store = Arc::new(RwLock::new(store));
let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
Ok(Self { store, rt })
PyIcechunkStore::from_consolidated(consolidated, false).await
}

async fn from_json_config(json: &[u8], read_only: bool) -> Result<Self, String> {
async fn from_consolidated(
consolidated: ConsolidatedStore,
read_only: bool,
) -> Result<Self, String> {
let access_mode = if read_only {
icechunk::zarr::AccessMode::ReadOnly
} else {
icechunk::zarr::AccessMode::ReadWrite
};
let store = Store::from_json(json, access_mode).await?;

let store = Store::from_consolidated(&consolidated, access_mode).await?;
let store = Arc::new(RwLock::new(store));
let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
Ok(Self { store, rt })
Ok(Self { consolidated, store, rt })
}
}

#[pyfunction]
fn pyicechunk_store_from_json_config(
py: Python<'_>,
json: String,
read_only: bool,
) -> PyResult<Bound<'_, PyAny>> {
let json = json.as_bytes().to_owned();
async fn as_consolidated(&self) -> PyIcechunkStoreResult<ConsolidatedStore> {
let consolidated = self.consolidated.clone();

// The commit mechanism is async and calls tokio::spawn so we need to use the
// pyo3_asyncio_0_21::tokio helper to run the async function in the tokio runtime
pyo3_asyncio_0_21::tokio::future_into_py(py, async move {
PyIcechunkStore::from_json_config(&json, read_only)
.await
.map_err(PyValueError::new_err)
})
let store = self.store.read().await;
let version = store.current_version().await;
let change_set = store.change_set_bytes().await?;

let consolidated =
consolidated.with_version(version).with_change_set_bytes(change_set)?;
Ok(consolidated)
}
}

#[pyfunction]
Expand Down Expand Up @@ -240,20 +232,55 @@ fn pyicechunk_store_create<'py>(
})
}

#[pyfunction]
fn pyicechunk_store_from_bytes(
    bytes: Cow<[u8]>,
    read_only: bool,
) -> PyResult<PyIcechunkStore> {
    // Deserialize the consolidated-store description first so a malformed
    // payload fails before we pay for constructing a Tokio runtime.
    // FIXME: Use rmp_serde instead of serde_json to optimize performance
    let consolidated = serde_json::from_slice::<ConsolidatedStore>(bytes.as_ref())
        .map_err(|e| PyValueError::new_err(e.to_string()))?;

    let runtime = tokio::runtime::Runtime::new()
        .map_err(|e| PyIcechunkStoreError::UnkownError(e.to_string()))?;

    // Rehydrate the store on a throwaway runtime; `from_consolidated`
    // builds the PyIcechunkStore (with its own runtime) honoring read_only.
    runtime.block_on(async move {
        PyIcechunkStore::from_consolidated(consolidated, read_only)
            .await
            .map_err(PyValueError::new_err)
    })
}

#[pymethods]
impl PyIcechunkStore {
/// Python `==` for stores: equality is defined purely by the backing
/// storage configuration — repository version and store options are
/// deliberately not compared.
fn __eq__(&self, other: &Self) -> bool {
    let lhs = &self.consolidated().storage;
    let rhs = &other.consolidated().storage;
    lhs == rhs
}

/// Serialize this store's consolidated description (storage config,
/// current version, pending change set) to bytes for Python-side pickling.
fn as_bytes(&self) -> PyResult<Cow<[u8]>> {
    let consolidated = self.rt.block_on(self.as_consolidated())?;

    // FIXME: Use rmp_serde instead of serde_json to optimize performance
    match serde_json::to_vec(&consolidated) {
        Ok(bytes) => Ok(Cow::Owned(bytes)),
        Err(e) => Err(PyValueError::new_err(e.to_string())),
    }
}

fn with_mode(&self, read_only: bool) -> PyResult<PyIcechunkStore> {
// Translate the Python-facing bool into the zarr access-mode enum.
let access_mode = if read_only {
icechunk::zarr::AccessMode::ReadOnly
} else {
icechunk::zarr::AccessMode::ReadWrite
};
// NOTE(review): this span interleaves removed and added diff lines from
// the commit (shadowed `let store` pairs and two trailing `Ok(...)`
// expressions), so it is not compilable as shown — confirm against the
// post-commit file before relying on it.
let store = self.store.blocking_read().with_access_mode(access_mode);
let store = Arc::new(RwLock::new(store));

let readable_store = self.store.blocking_read();
// Capture the consolidated description (current version + change set) so
// the mode-switched clone stays serializable like the original store.
let consolidated = self.rt.block_on(self.as_consolidated())?;
let store = Arc::new(RwLock::new(readable_store.with_access_mode(access_mode)));
let rt = tokio::runtime::Runtime::new()
.map_err(|e| PyValueError::new_err(e.to_string()))?;

Ok(PyIcechunkStore { store, rt })
Ok(PyIcechunkStore { consolidated, store, rt })
}

fn checkout_snapshot<'py>(
Expand Down Expand Up @@ -732,9 +759,9 @@ fn _icechunk_python(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyStoreConfig>()?;
m.add_class::<PySnapshotMetadata>()?;
m.add_class::<PyVirtualRefConfig>()?;
m.add_function(wrap_pyfunction!(pyicechunk_store_from_json_config, m)?)?;
m.add_function(wrap_pyfunction!(pyicechunk_store_exists, m)?)?;
m.add_function(wrap_pyfunction!(pyicechunk_store_create, m)?)?;
m.add_function(wrap_pyfunction!(pyicechunk_store_open_existing, m)?)?;
m.add_function(wrap_pyfunction!(pyicechunk_store_from_bytes, m)?)?;
Ok(())
}
5 changes: 3 additions & 2 deletions icechunk-python/tests/test_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ async def list_store(store, barrier):


async def test_concurrency():
store = await icechunk.IcechunkStore.from_config(
config={"storage": {"type": "in_memory"}, "repository": {}}, mode="w"
store = await icechunk.IcechunkStore.open(
mode="w",
storage=icechunk.StorageConfig.memory(prefix='concurrency'),
)

group = zarr.group(store=store, overwrite=True)
Expand Down
Loading

0 comments on commit 54493a1

Please sign in to comment.