From e33b357a854c69820fa6eee932adaafc34ec41f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebasti=C3=A1n=20Galkin?= Date: Mon, 13 Jan 2025 23:54:40 -0300 Subject: [PATCH] Better ancestry interface (#574) Now users can use a snapshot, tag or branch to fetch history. ```python class Repository: def ancestry( self, *, branch: str | None = None, tag: str | None = None, snapshot: str | None = None, ) -> list[SnapshotMetadata]: ... ``` Closes: #572 --- .../python/icechunk/_icechunk_python.pyi | 10 ++- icechunk-python/python/icechunk/repository.py | 26 ++++--- icechunk-python/src/repository.rs | 72 +++++++++++-------- icechunk-python/tests/test_can_read_old.py | 17 +++-- .../tests/test_stateful_repo_ops.py | 2 +- icechunk-python/tests/test_timetravel.py | 9 ++- icechunk/src/repository.rs | 31 +++++--- icechunk/src/session.rs | 20 ++++-- 8 files changed, 123 insertions(+), 64 deletions(-) diff --git a/icechunk-python/python/icechunk/_icechunk_python.pyi b/icechunk-python/python/icechunk/_icechunk_python.pyi index f759edd0..27f0c2a1 100644 --- a/icechunk-python/python/icechunk/_icechunk_python.pyi +++ b/icechunk-python/python/icechunk/_icechunk_python.pyi @@ -185,7 +185,13 @@ class PyRepository: @staticmethod def fetch_config(storage: Storage) -> RepositoryConfig | None: ... def save_config(self) -> None: ... - def ancestry(self, snapshot_id: str) -> list[SnapshotMetadata]: ... + def ancestry( + self, + *, + branch: str | None = None, + tag: str | None = None, + snapshot: str | None = None, + ) -> list[SnapshotMetadata]: ... def create_branch(self, branch: str, snapshot_id: str) -> None: ... def list_branches(self) -> set[str]: ... def lookup_branch(self, branch: str) -> str: ... @@ -199,7 +205,7 @@ class PyRepository: *, branch: str | None = None, tag: str | None = None, - snapshot_id: str | None = None, + snapshot: str | None = None, ) -> PySession: ... def writable_session(self, branch: str) -> PySession: ... diff --git a/icechunk-python/python/icechunk/repository.py b/icechunk-python/python/icechunk/repository.py index aa359b50..7dd74181 100644 --- a/icechunk-python/python/icechunk/repository.py +++ b/icechunk-python/python/icechunk/repository.py @@ -105,16 +105,26 @@ def save_config(self) -> None: """Save the repository configuration to storage, this configuration will be used in future calls to Repository.open.""" return self._repository.save_config() - def ancestry(self, snapshot_id: str) -> list[SnapshotMetadata]: + def ancestry( + self, + *, + branch: str | None = None, + tag: str | None = None, + snapshot: str | None = None, + ) -> list[SnapshotMetadata]: """Get the ancestry of a snapshot. Args: - snapshot_id: The snapshot ID to get the ancestry of. + branch: The branch to get the ancestry of. + tag: The tag to get the ancestry of. + snapshot: The snapshot ID to get the ancestry of. Returns: list[SnapshotMetadata]: The ancestry of the snapshot, listing out the snapshots and their metadata + + Only one of the arguments can be specified. """ - return self._repository.ancestry(snapshot_id) + return self._repository.ancestry(branch=branch, tag=tag, snapshot=snapshot) def create_branch(self, branch: str, snapshot_id: str) -> None: """Create a new branch at the given snapshot. @@ -189,7 +199,7 @@ def readonly_session( *, branch: str | None = None, tag: str | None = None, - snapshot_id: str | None = None, + snapshot: str | None = None, ) -> Session: """Create a read-only session. @@ -200,15 +210,15 @@ def readonly_session( Args: branch: If provided, the branch to create the session on. tag: If provided, the tag to create the session on. - snapshot_id: If provided, the snapshot ID to create the session on. + snapshot: If provided, the snapshot ID to create the session on. Returns: Session: The read-only session, pointing to the specified snapshot, tag, or branch. + + Only one of the arguments can be specified. """ return Session( - self._repository.readonly_session( - branch=branch, tag=tag, snapshot_id=snapshot_id - ) + self._repository.readonly_session(branch=branch, tag=tag, snapshot=snapshot) ) def writable_session(self, branch: str) -> Session: diff --git a/icechunk-python/src/repository.rs b/icechunk-python/src/repository.rs index 047a418a..d16806a7 100644 --- a/icechunk-python/src/repository.rs +++ b/icechunk-python/src/repository.rs @@ -20,8 +20,8 @@ use crate::{ session::PySession, }; -#[pyclass(name = "SnapshotMetadata")] -#[derive(Clone, Debug)] +#[pyclass(name = "SnapshotMetadata", eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct PySnapshotMetadata { #[pyo3(get)] id: String, @@ -179,24 +179,23 @@ impl PyRepository { self.0.storage_settings().clone().into() } + #[pyo3(signature = (*, branch = None, tag = None, snapshot = None))] pub fn ancestry( &self, py: Python<'_>, - snapshot_id: &str, + branch: Option, + tag: Option, + snapshot: Option, ) -> PyResult> { // This function calls block_on, so we need to allow other thread python to make progress py.allow_threads(move || { - let snapshot_id = SnapshotId::try_from(snapshot_id).map_err(|_| { - PyIcechunkStoreError::RepositoryError(RepositoryError::InvalidSnapshotId( - snapshot_id.to_owned(), - )) - })?; + let version = args_to_version_info(branch, tag, snapshot)?; // TODO: this holds everything in memory pyo3_async_runtimes::tokio::get_runtime().block_on(async move { let ancestry = self .0 - .ancestry(&snapshot_id) + .ancestry(&version) .await .map_err(PyIcechunkStoreError::RepositoryError)? .map_ok(Into::::into) @@ -349,35 +348,17 @@ impl PyRepository { }) } - #[pyo3(signature = (*, branch = None, tag = None, snapshot_id = None))] + #[pyo3(signature = (*, branch = None, tag = None, snapshot = None))] pub fn readonly_session( &self, py: Python<'_>, branch: Option, tag: Option, - snapshot_id: Option, + snapshot: Option, ) -> PyResult { // This function calls block_on, so we need to allow other thread python to make progress py.allow_threads(move || { - let version = if let Some(branch_name) = branch { - VersionInfo::BranchTipRef(branch_name) - } else if let Some(tag_name) = tag { - VersionInfo::TagRef(tag_name) - } else if let Some(snapshot_id) = snapshot_id { - let snapshot_id = - SnapshotId::try_from(snapshot_id.as_str()).map_err(|_| { - PyIcechunkStoreError::RepositoryError( - RepositoryError::InvalidSnapshotId(snapshot_id.to_owned()), - ) - })?; - - VersionInfo::SnapshotId(snapshot_id) - } else { - return Err(PyValueError::new_err( - "Must provide either branch_name, tag_name, or snapshot_id", - )); - }; - + let version = args_to_version_info(branch, tag, snapshot)?; let session = pyo3_async_runtimes::tokio::get_runtime().block_on(async move { self.0 @@ -414,3 +395,34 @@ fn map_credentials( }) .unwrap_or_default() } + +fn args_to_version_info( + branch: Option, + tag: Option, + snapshot: Option, +) -> PyResult { + let n = [&branch, &tag, &snapshot].iter().filter(|r| !r.is_none()).count(); + if n > 1 { + return Err(PyValueError::new_err( + "Must provide one of branch_name, tag_name, or snapshot_id", + )); + } + + if let Some(branch_name) = branch { + Ok(VersionInfo::BranchTipRef(branch_name)) + } else if let Some(tag_name) = tag { + Ok(VersionInfo::TagRef(tag_name)) + } else if let Some(snapshot_id) = snapshot { + let snapshot_id = SnapshotId::try_from(snapshot_id.as_str()).map_err(|_| { + PyIcechunkStoreError::RepositoryError(RepositoryError::InvalidSnapshotId( + snapshot_id.to_owned(), + )) + })?; + + Ok(VersionInfo::SnapshotId(snapshot_id)) + } else { + return Err(PyValueError::new_err( + "Must provide either branch_name, tag_name, or snapshot_id", + )); + } +} diff --git a/icechunk-python/tests/test_can_read_old.py b/icechunk-python/tests/test_can_read_old.py index edb171cb..a2a9d4d7 100644 --- a/icechunk-python/tests/test_can_read_old.py +++ b/icechunk-python/tests/test_can_read_old.py @@ -170,23 +170,26 @@ async def test_icechunk_can_read_old_repo() -> None: "empty structure", "Repository initialized", ] - assert [p.message for p in repo.ancestry(main_snapshot)] == expected_main_history + assert [ + p.message for p in repo.ancestry(snapshot=main_snapshot) + ] == expected_main_history - my_branch_snapshot = repo.lookup_branch("my-branch") expected_branch_history = [ "some more structure", "delete a chunk", ] + expected_main_history assert [ - p.message for p in repo.ancestry(my_branch_snapshot) + p.message for p in repo.ancestry(branch="my-branch") ] == expected_branch_history - tag_snapshot = repo.lookup_tag("it also works!") - assert [p.message for p in repo.ancestry(tag_snapshot)] == expected_branch_history + assert [ + p.message for p in repo.ancestry(tag="it also works!") + ] == expected_branch_history - tag_snapshot = repo.lookup_tag("it works!") - assert [p.message for p in repo.ancestry(tag_snapshot)] == expected_branch_history[1:] + assert [p.message for p in repo.ancestry(tag="it works!")] == expected_branch_history[ + 1: + ] session = repo.writable_session("my-branch") store = session.store diff --git a/icechunk-python/tests/test_stateful_repo_ops.py b/icechunk-python/tests/test_stateful_repo_ops.py index b113ccf3..db2a1be6 100644 --- a/icechunk-python/tests/test_stateful_repo_ops.py +++ b/icechunk-python/tests/test_stateful_repo_ops.py @@ -258,7 +258,7 @@ def commit(self, message): @rule(ref=commits) def checkout_commit(self, ref): note(f"Checking out commit {ref}") - self.session = self.repo.readonly_session(snapshot_id=ref) + self.session = self.repo.readonly_session(snapshot=ref) assert self.session.read_only self.model.checkout_commit(ref) diff --git a/icechunk-python/tests/test_timetravel.py b/icechunk-python/tests/test_timetravel.py index b5720f18..60edd047 100644 --- a/icechunk-python/tests/test_timetravel.py +++ b/icechunk-python/tests/test_timetravel.py @@ -38,14 +38,14 @@ def test_timetravel() -> None: new_snapshot_id = session.commit("commit 2") - session = repo.readonly_session(snapshot_id=snapshot_id) + session = repo.readonly_session(snapshot=snapshot_id) store = session.store group = zarr.open_group(store=store, mode="r") air_temp = cast(zarr.core.array.Array, group["air_temp"]) assert store.read_only assert air_temp[200, 6] == 42 - session = repo.readonly_session(snapshot_id=new_snapshot_id) + session = repo.readonly_session(snapshot=new_snapshot_id) store = session.store group = zarr.open_group(store=store, mode="r") air_temp = cast(zarr.core.array.Array, group["air_temp"]) @@ -86,6 +86,7 @@ def test_timetravel() -> None: assert branches == set(["main"]) repo.create_tag("v1.0", feature_snapshot_id) + repo.create_branch("feature-not-dead", feature_snapshot_id) session = repo.readonly_session(tag="v1.0") store = session.store assert store._read_only @@ -95,7 +96,7 @@ def test_timetravel() -> None: air_temp = cast(zarr.core.array.Array, group["air_temp"]) assert air_temp[200, 6] == 90 - parents = list(repo.ancestry(feature_snapshot_id)) + parents = list(repo.ancestry(snapshot=feature_snapshot_id)) assert [snap.message for snap in parents] == [ "commit 3", "commit 2", @@ -104,6 +105,8 @@ def test_timetravel() -> None: ] assert sorted(parents, key=lambda p: p.written_at) == list(reversed(parents)) assert len(set([snap.id for snap in parents])) == 4 + assert list(repo.ancestry(tag="v1.0")) == parents + assert list(repo.ancestry(branch="feature-not-dead")) == parents tags = repo.list_tags() assert tags == set(["v1.0"]) diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs index 537fe438..9e15011e 100644 --- a/icechunk/src/repository.rs +++ b/icechunk/src/repository.rs @@ -287,7 +287,7 @@ impl Repository { } /// Returns the sequence of parents of the current session, in order of latest first. - pub async fn ancestry( + async fn snapshot_ancestry( &self, snapshot_id: &SnapshotId, ) -> RepositoryResult>> { @@ -306,6 +306,15 @@ impl Repository { Ok(futures::stream::iter(iter::once(Ok(last)).chain(it.map(Ok)))) } + /// Returns the sequence of parents of the snapshot pointed by the given version + pub async fn ancestry( + &self, + version: &VersionInfo, + ) -> RepositoryResult>> { + let snapshot_id = self.resolve_version(version).await?; + self.snapshot_ancestry(&snapshot_id).await + } + /// Create a new branch in the repository at the given snapshot id pub async fn create_branch( &self, @@ -417,11 +426,11 @@ impl Repository { Ok(ref_data.snapshot) } - pub async fn readonly_session( + async fn resolve_version( &self, version: &VersionInfo, - ) -> RepositoryResult { - let snapshot_id: SnapshotId = match version { + ) -> RepositoryResult { + match version { VersionInfo::SnapshotId(sid) => { raise_if_invalid_snapshot_id( self.storage.as_ref(), @@ -429,12 +438,12 @@ impl Repository { sid, ) .await?; - Ok::<_, RepositoryError>(SnapshotId::from(sid.clone())) + Ok(sid.clone()) } VersionInfo::TagRef(tag) => { let ref_data = fetch_tag(self.storage.as_ref(), &self.storage_settings, tag).await?; - Ok::<_, RepositoryError>(ref_data.snapshot) + Ok(ref_data.snapshot) } VersionInfo::BranchTipRef(branch) => { let ref_data = fetch_branch_tip( @@ -443,10 +452,16 @@ impl Repository { branch, ) .await?; - Ok::<_, RepositoryError>(ref_data.snapshot) + Ok(ref_data.snapshot) } - }?; + } + } + pub async fn readonly_session( + &self, + version: &VersionInfo, + ) -> RepositoryResult { + let snapshot_id = self.resolve_version(version).await?; let session = Session::create_readonly_session( self.config.clone(), self.storage_settings.clone(), diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs index 9e99327c..1434d35b 100644 --- a/icechunk/src/session.rs +++ b/icechunk/src/session.rs @@ -2257,8 +2257,11 @@ mod tests { assert_eq!(ref_name, Ref::Branch("main".to_string())); assert_eq!(new_snapshot_id, ref_data.snapshot); - let parents = - repo.ancestry(&new_snapshot_id).await?.try_collect::>().await?; + let parents = repo + .ancestry(&VersionInfo::SnapshotId(new_snapshot_id)) + .await? + .try_collect::>() + .await?; assert_eq!(parents[0].message, "second commit"); assert_eq!(parents[1].message, "first commit"); assert_eq!(parents[2].message, Snapshot::INITIAL_COMMIT_MESSAGE); @@ -2314,8 +2317,11 @@ mod tests { let ds = repository .readonly_session(&VersionInfo::BranchTipRef("main".to_string())) .await?; - let parents = - repository.ancestry(&ds.snapshot_id).await?.try_collect::>().await?; + let parents = repository + .ancestry(&VersionInfo::SnapshotId(ds.snapshot_id)) + .await? + .try_collect::>() + .await?; assert_eq!(parents.len(), 2); let msg = parents[0].message.as_str(); assert!(msg == "from 1" || msg == "from 2"); @@ -2838,7 +2844,11 @@ mod tests { ds2.get_chunk_ref(&new_array_2_path, &ChunkIndices(vec![0])).await?; assert_eq!(data, Some(ChunkPayload::Inline("bye0".into()))); - let commits = repo.ancestry(&snapshot).await?.try_collect::>().await?; + let commits = repo + .ancestry(&VersionInfo::SnapshotId(snapshot)) + .await? + .try_collect::>() + .await?; assert_eq!(commits[0].message, "after conflict"); assert_eq!(commits[1].message, "write two chunks with repo 1"); } else {