Skip to content

Commit

Permalink
Virtual chunk location inspection (#504)
Browse files Browse the repository at this point in the history
  • Loading branch information
paraseba authored Dec 24, 2024
1 parent fb74b71 commit 50e9315
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 3 deletions.
1 change: 1 addition & 0 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ class PySession:
def has_uncommitted_changes(self) -> bool: ...
def discard_changes(self) -> None: ...
def store(self, config: StoreConfig | None = None) -> PyStore: ...
# Return the location URLs of all virtual chunks, as a list of strings.
def all_virtual_chunk_locations(self) -> list[str]: ...
def merge(self, other: PySession) -> None: ...
def commit(self, message: str) -> str: ...
def rebase(self, solver: ConflictSolver) -> None: ...
Expand Down
4 changes: 4 additions & 0 deletions icechunk-python/python/icechunk/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ def store(self, config: StoreConfig | None = None) -> IcechunkStore:
"""Get a zarr Store object for reading and writing data from the repository using zarr python"""
return IcechunkStore(self._session.store(config))

def all_virtual_chunk_locations(self) -> list[str]:
    """List the location URL of every virtual chunk.

    Thin wrapper: the call is forwarded unchanged to the wrapped
    ``self._session`` object and its result is returned as-is.

    Returns
    -------
    list[str]
        The location URLs reported by the underlying session.
    """
    locations = self._session.all_virtual_chunk_locations()
    return locations

def merge(self, other: Self) -> None:
"""Merge the changes for this session with the changes from another session"""
self._session.merge(other._session)
Expand Down
17 changes: 17 additions & 0 deletions icechunk-python/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{borrow::Cow, ops::Deref, sync::Arc};

use futures::TryStreamExt;
use icechunk::{session::Session, Store};
use pyo3::{prelude::*, types::PyType};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -66,6 +67,22 @@ impl PySession {
Ok(PyStore(store))
}

/// Collect the locations of all virtual chunks of the wrapped session into a `Vec<String>`.
///
/// This is a synchronous entry point: it takes a blocking read guard on the
/// inner lock and then drives the async session call to completion on the
/// shared tokio runtime.
///
/// # Errors
///
/// Any session error, either from creating the stream or from one of its
/// items, is wrapped in `PyIcechunkStoreError::SessionError` and returned.
pub fn all_virtual_chunk_locations(&self) -> PyResult<Vec<String>> {
    // Acquire the read guard first; it is moved into the async block so the
    // stream (which borrows from the session) can live for the whole collection.
    let guard = self.0.blocking_read();
    let runtime = pyo3_async_runtimes::tokio::get_runtime();

    runtime.block_on(async move {
        let stream = guard
            .all_virtual_chunk_locations()
            .await
            .map_err(PyIcechunkStoreError::SessionError)?;

        // Stops at the first failing item and reports it as a session error.
        let locations: Vec<String> = stream
            .try_collect()
            .await
            .map_err(PyIcechunkStoreError::SessionError)?;

        Ok(locations)
    })
}

pub fn merge(&self, other: &PySession) -> PyResult<()> {
// TODO: Bad clone
let changes = other.0.blocking_read().deref().changes().clone();
Expand Down
5 changes: 5 additions & 0 deletions icechunk-python/tests/test_virtual_ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ async def test_write_minio_virtual_refs():
# TODO: we should include the key and other info in the exception
await store.get("c/0/0/2", prototype=buffer_prototype)

all_locations = set(session.all_virtual_chunk_locations())
assert "s3://testbucket/path/to/python/non-existing" in all_locations
assert "s3://testbucket/path/to/python/chunk-1" in all_locations
assert "s3://testbucket/path/to/python/chunk-2" in all_locations

_snapshot_id = session.commit("Add virtual refs")


Expand Down
11 changes: 11 additions & 0 deletions icechunk/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,17 @@ impl Session {
all_chunks(self.storage.as_ref(), &self.change_set, self.snapshot_id()).await
}

/// Stream the location string of every virtual chunk reference.
///
/// Built on top of [`Session::all_chunks`]: each chunk whose payload is
/// `ChunkPayload::Virtual` yields its `reference.location.0`; chunks with any
/// other payload are filtered out. Errors produced by the underlying chunk
/// stream are passed through unchanged.
///
/// # Errors
///
/// Returns an error if `all_chunks` itself fails; per-item failures surface
/// as `Err` items of the returned stream.
pub async fn all_virtual_chunk_locations(
    &self,
) -> SessionResult<impl Stream<Item = SessionResult<String>> + '_> {
    let chunks = self.all_chunks().await?;
    let locations = chunks.try_filter_map(|(_, info)| {
        // Only virtual payloads carry a location; everything else is dropped.
        let location = if let ChunkPayload::Virtual(reference) = info.payload {
            Some(reference.location.0)
        } else {
            None
        };
        ready(Ok(location))
    });
    Ok(locations)
}

/// Discard all uncommitted changes and return them as a `ChangeSet`
pub fn discard_changes(&mut self) -> ChangeSet {
std::mem::take(&mut self.change_set)
Expand Down
35 changes: 32 additions & 3 deletions icechunk/tests/test_virtual_refs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[cfg(test)]
#[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used, clippy::expect_fun_call)]
mod tests {
use futures::TryStreamExt;
use icechunk::{
config::{Credentials, S3CompatibleOptions, StaticCredentials},
format::{
Expand All @@ -18,7 +19,13 @@ mod tests {
virtual_chunks::VirtualChunkContainer,
ObjectStoreConfig, Repository, RepositoryConfig, Storage, Store,
};
use std::{collections::HashMap, error::Error, num::NonZeroU64, path::PathBuf, vec};
use std::{
collections::{HashMap, HashSet},
error::Error,
num::NonZeroU64,
path::PathBuf,
vec,
};
use std::{path::Path as StdPath, sync::Arc};
use tempfile::TempDir;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -577,9 +584,9 @@ mod tests {
];
write_chunks_to_minio(chunks.iter().cloned()).await;

let ds = repo.writable_session("main").await?;
let session = repo.writable_session("main").await?;
let store =
Store::from_session(Arc::new(RwLock::new(ds)), StoreConfig::default());
Store::from_session(Arc::new(RwLock::new(session)), StoreConfig::default());

store
.set(
Expand Down Expand Up @@ -741,6 +748,28 @@ mod tests {
))) if location == "s3://earthmover-sample-data/netcdf/oscar_vel2018.nc"
));

let session = store.session();
let locations = session
.read()
.await
.all_virtual_chunk_locations()
.await?
.try_collect::<HashSet<_>>()
.await?;
assert_eq!(
locations,
[
"s3://earthmover-sample-data/netcdf/oscar_vel2018.nc".to_string(),
"s3://testbucket/path/to/chunk-1".to_string(),
"s3://testbucket/path/to/chunk-2".to_string(),
"s3://testbucket/path/to/chunk-3".to_string(),
format!("file://{}", local_chunks[0].0),
format!("file://{}", local_chunks[1].0),
format!("file://{}", local_chunks[2].0),
]
.into()
);

Ok(())
}
}

0 comments on commit 50e9315

Please sign in to comment.