Skip to content

Commit

Permalink
Better ancestry interface (#574)
Browse files Browse the repository at this point in the history
Now users can use a snapshot, tag or branch to fetch history.

```python

class Repository:
  def ancestry(
      self,
      *,
      branch: str | None = None,
      tag: str | None = None,
      snapshot: str | None = None,
  ) -> list[SnapshotMetadata]: ...

```

Closes: #572
  • Loading branch information
paraseba authored Jan 14, 2025
1 parent b2be6fc commit e33b357
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 64 deletions.
10 changes: 8 additions & 2 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,13 @@ class PyRepository:
@staticmethod
def fetch_config(storage: Storage) -> RepositoryConfig | None: ...
def save_config(self) -> None: ...
def ancestry(self, snapshot_id: str) -> list[SnapshotMetadata]: ...
def ancestry(
self,
*,
branch: str | None = None,
tag: str | None = None,
snapshot: str | None = None,
) -> list[SnapshotMetadata]: ...
def create_branch(self, branch: str, snapshot_id: str) -> None: ...
def list_branches(self) -> set[str]: ...
def lookup_branch(self, branch: str) -> str: ...
Expand All @@ -199,7 +205,7 @@ class PyRepository:
*,
branch: str | None = None,
tag: str | None = None,
snapshot_id: str | None = None,
snapshot: str | None = None,
) -> PySession: ...
def writable_session(self, branch: str) -> PySession: ...

Expand Down
26 changes: 18 additions & 8 deletions icechunk-python/python/icechunk/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,26 @@ def save_config(self) -> None:
"""Save the repository configuration to storage, this configuration will be used in future calls to Repository.open."""
return self._repository.save_config()

def ancestry(self, snapshot_id: str) -> list[SnapshotMetadata]:
def ancestry(
self,
*,
branch: str | None = None,
tag: str | None = None,
snapshot: str | None = None,
) -> list[SnapshotMetadata]:
"""Get the ancestry of a snapshot.
Args:
snapshot_id: The snapshot ID to get the ancestry of.
branch: The branch to get the ancestry of.
tag: The tag to get the ancestry of.
snapshot: The snapshot ID to get the ancestry of.
Returns:
list[SnapshotMetadata]: The ancestry of the snapshot, listing out the snapshots and their metadata
Only one of the arguments can be specified.
"""
return self._repository.ancestry(snapshot_id)
return self._repository.ancestry(branch=branch, tag=tag, snapshot=snapshot)

def create_branch(self, branch: str, snapshot_id: str) -> None:
"""Create a new branch at the given snapshot.
Expand Down Expand Up @@ -189,7 +199,7 @@ def readonly_session(
*,
branch: str | None = None,
tag: str | None = None,
snapshot_id: str | None = None,
snapshot: str | None = None,
) -> Session:
"""Create a read-only session.
Expand All @@ -200,15 +210,15 @@ def readonly_session(
Args:
branch: If provided, the branch to create the session on.
tag: If provided, the tag to create the session on.
snapshot_id: If provided, the snapshot ID to create the session on.
snapshot: If provided, the snapshot ID to create the session on.
Returns:
Session: The read-only session, pointing to the specified snapshot, tag, or branch.
Only one of the arguments can be specified.
"""
return Session(
self._repository.readonly_session(
branch=branch, tag=tag, snapshot_id=snapshot_id
)
self._repository.readonly_session(branch=branch, tag=tag, snapshot=snapshot)
)

def writable_session(self, branch: str) -> Session:
Expand Down
72 changes: 42 additions & 30 deletions icechunk-python/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use crate::{
session::PySession,
};

#[pyclass(name = "SnapshotMetadata")]
#[derive(Clone, Debug)]
#[pyclass(name = "SnapshotMetadata", eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PySnapshotMetadata {
#[pyo3(get)]
id: String,
Expand Down Expand Up @@ -179,24 +179,23 @@ impl PyRepository {
self.0.storage_settings().clone().into()
}

#[pyo3(signature = (*, branch = None, tag = None, snapshot = None))]
pub fn ancestry(
&self,
py: Python<'_>,
snapshot_id: &str,
branch: Option<String>,
tag: Option<String>,
snapshot: Option<String>,
) -> PyResult<Vec<PySnapshotMetadata>> {
// This function calls block_on, so we need to allow other thread python to make progress
py.allow_threads(move || {
let snapshot_id = SnapshotId::try_from(snapshot_id).map_err(|_| {
PyIcechunkStoreError::RepositoryError(RepositoryError::InvalidSnapshotId(
snapshot_id.to_owned(),
))
})?;
let version = args_to_version_info(branch, tag, snapshot)?;

// TODO: this holds everything in memory
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
let ancestry = self
.0
.ancestry(&snapshot_id)
.ancestry(&version)
.await
.map_err(PyIcechunkStoreError::RepositoryError)?
.map_ok(Into::<PySnapshotMetadata>::into)
Expand Down Expand Up @@ -349,35 +348,17 @@ impl PyRepository {
})
}

#[pyo3(signature = (*, branch = None, tag = None, snapshot_id = None))]
#[pyo3(signature = (*, branch = None, tag = None, snapshot = None))]
pub fn readonly_session(
&self,
py: Python<'_>,
branch: Option<String>,
tag: Option<String>,
snapshot_id: Option<String>,
snapshot: Option<String>,
) -> PyResult<PySession> {
// This function calls block_on, so we need to allow other thread python to make progress
py.allow_threads(move || {
let version = if let Some(branch_name) = branch {
VersionInfo::BranchTipRef(branch_name)
} else if let Some(tag_name) = tag {
VersionInfo::TagRef(tag_name)
} else if let Some(snapshot_id) = snapshot_id {
let snapshot_id =
SnapshotId::try_from(snapshot_id.as_str()).map_err(|_| {
PyIcechunkStoreError::RepositoryError(
RepositoryError::InvalidSnapshotId(snapshot_id.to_owned()),
)
})?;

VersionInfo::SnapshotId(snapshot_id)
} else {
return Err(PyValueError::new_err(
"Must provide either branch_name, tag_name, or snapshot_id",
));
};

let version = args_to_version_info(branch, tag, snapshot)?;
let session =
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
self.0
Expand Down Expand Up @@ -414,3 +395,34 @@ fn map_credentials(
})
.unwrap_or_default()
}

fn args_to_version_info(
branch: Option<String>,
tag: Option<String>,
snapshot: Option<String>,
) -> PyResult<VersionInfo> {
let n = [&branch, &tag, &snapshot].iter().filter(|r| !r.is_none()).count();
if n > 1 {
return Err(PyValueError::new_err(
"Must provide one of branch_name, tag_name, or snapshot_id",
));
}

if let Some(branch_name) = branch {
Ok(VersionInfo::BranchTipRef(branch_name))
} else if let Some(tag_name) = tag {
Ok(VersionInfo::TagRef(tag_name))
} else if let Some(snapshot_id) = snapshot {
let snapshot_id = SnapshotId::try_from(snapshot_id.as_str()).map_err(|_| {
PyIcechunkStoreError::RepositoryError(RepositoryError::InvalidSnapshotId(
snapshot_id.to_owned(),
))
})?;

Ok(VersionInfo::SnapshotId(snapshot_id))
} else {
return Err(PyValueError::new_err(
"Must provide either branch_name, tag_name, or snapshot_id",
));
}
}
17 changes: 10 additions & 7 deletions icechunk-python/tests/test_can_read_old.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,23 +170,26 @@ async def test_icechunk_can_read_old_repo() -> None:
"empty structure",
"Repository initialized",
]
assert [p.message for p in repo.ancestry(main_snapshot)] == expected_main_history
assert [
p.message for p in repo.ancestry(snapshot=main_snapshot)
] == expected_main_history

my_branch_snapshot = repo.lookup_branch("my-branch")
expected_branch_history = [
"some more structure",
"delete a chunk",
] + expected_main_history

assert [
p.message for p in repo.ancestry(my_branch_snapshot)
p.message for p in repo.ancestry(branch="my-branch")
] == expected_branch_history

tag_snapshot = repo.lookup_tag("it also works!")
assert [p.message for p in repo.ancestry(tag_snapshot)] == expected_branch_history
assert [
p.message for p in repo.ancestry(tag="it also works!")
] == expected_branch_history

tag_snapshot = repo.lookup_tag("it works!")
assert [p.message for p in repo.ancestry(tag_snapshot)] == expected_branch_history[1:]
assert [p.message for p in repo.ancestry(tag="it works!")] == expected_branch_history[
1:
]

session = repo.writable_session("my-branch")
store = session.store
Expand Down
2 changes: 1 addition & 1 deletion icechunk-python/tests/test_stateful_repo_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def commit(self, message):
@rule(ref=commits)
def checkout_commit(self, ref):
note(f"Checking out commit {ref}")
self.session = self.repo.readonly_session(snapshot_id=ref)
self.session = self.repo.readonly_session(snapshot=ref)
assert self.session.read_only
self.model.checkout_commit(ref)

Expand Down
9 changes: 6 additions & 3 deletions icechunk-python/tests/test_timetravel.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ def test_timetravel() -> None:

new_snapshot_id = session.commit("commit 2")

session = repo.readonly_session(snapshot_id=snapshot_id)
session = repo.readonly_session(snapshot=snapshot_id)
store = session.store
group = zarr.open_group(store=store, mode="r")
air_temp = cast(zarr.core.array.Array, group["air_temp"])
assert store.read_only
assert air_temp[200, 6] == 42

session = repo.readonly_session(snapshot_id=new_snapshot_id)
session = repo.readonly_session(snapshot=new_snapshot_id)
store = session.store
group = zarr.open_group(store=store, mode="r")
air_temp = cast(zarr.core.array.Array, group["air_temp"])
Expand Down Expand Up @@ -86,6 +86,7 @@ def test_timetravel() -> None:
assert branches == set(["main"])

repo.create_tag("v1.0", feature_snapshot_id)
repo.create_branch("feature-not-dead", feature_snapshot_id)
session = repo.readonly_session(tag="v1.0")
store = session.store
assert store._read_only
Expand All @@ -95,7 +96,7 @@ def test_timetravel() -> None:
air_temp = cast(zarr.core.array.Array, group["air_temp"])
assert air_temp[200, 6] == 90

parents = list(repo.ancestry(feature_snapshot_id))
parents = list(repo.ancestry(snapshot=feature_snapshot_id))
assert [snap.message for snap in parents] == [
"commit 3",
"commit 2",
Expand All @@ -104,6 +105,8 @@ def test_timetravel() -> None:
]
assert sorted(parents, key=lambda p: p.written_at) == list(reversed(parents))
assert len(set([snap.id for snap in parents])) == 4
assert list(repo.ancestry(tag="v1.0")) == parents
assert list(repo.ancestry(branch="feature-not-dead")) == parents

tags = repo.list_tags()
assert tags == set(["v1.0"])
Expand Down
31 changes: 23 additions & 8 deletions icechunk/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl Repository {
}

/// Returns the sequence of parents of the current session, in order of latest first.
pub async fn ancestry(
async fn snapshot_ancestry(
&self,
snapshot_id: &SnapshotId,
) -> RepositoryResult<impl Stream<Item = RepositoryResult<SnapshotMetadata>>> {
Expand All @@ -306,6 +306,15 @@ impl Repository {
Ok(futures::stream::iter(iter::once(Ok(last)).chain(it.map(Ok))))
}

/// Returns the sequence of parents of the snapshot pointed by the given version
pub async fn ancestry(
&self,
version: &VersionInfo,
) -> RepositoryResult<impl Stream<Item = RepositoryResult<SnapshotMetadata>>> {
let snapshot_id = self.resolve_version(version).await?;
self.snapshot_ancestry(&snapshot_id).await
}

/// Create a new branch in the repository at the given snapshot id
pub async fn create_branch(
&self,
Expand Down Expand Up @@ -417,24 +426,24 @@ impl Repository {
Ok(ref_data.snapshot)
}

pub async fn readonly_session(
async fn resolve_version(
&self,
version: &VersionInfo,
) -> RepositoryResult<Session> {
let snapshot_id: SnapshotId = match version {
) -> RepositoryResult<SnapshotId> {
match version {
VersionInfo::SnapshotId(sid) => {
raise_if_invalid_snapshot_id(
self.storage.as_ref(),
&self.storage_settings,
sid,
)
.await?;
Ok::<_, RepositoryError>(SnapshotId::from(sid.clone()))
Ok(sid.clone())
}
VersionInfo::TagRef(tag) => {
let ref_data =
fetch_tag(self.storage.as_ref(), &self.storage_settings, tag).await?;
Ok::<_, RepositoryError>(ref_data.snapshot)
Ok(ref_data.snapshot)
}
VersionInfo::BranchTipRef(branch) => {
let ref_data = fetch_branch_tip(
Expand All @@ -443,10 +452,16 @@ impl Repository {
branch,
)
.await?;
Ok::<_, RepositoryError>(ref_data.snapshot)
Ok(ref_data.snapshot)
}
}?;
}
}

pub async fn readonly_session(
&self,
version: &VersionInfo,
) -> RepositoryResult<Session> {
let snapshot_id = self.resolve_version(version).await?;
let session = Session::create_readonly_session(
self.config.clone(),
self.storage_settings.clone(),
Expand Down
20 changes: 15 additions & 5 deletions icechunk/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2257,8 +2257,11 @@ mod tests {
assert_eq!(ref_name, Ref::Branch("main".to_string()));
assert_eq!(new_snapshot_id, ref_data.snapshot);

let parents =
repo.ancestry(&new_snapshot_id).await?.try_collect::<Vec<_>>().await?;
let parents = repo
.ancestry(&VersionInfo::SnapshotId(new_snapshot_id))
.await?
.try_collect::<Vec<_>>()
.await?;
assert_eq!(parents[0].message, "second commit");
assert_eq!(parents[1].message, "first commit");
assert_eq!(parents[2].message, Snapshot::INITIAL_COMMIT_MESSAGE);
Expand Down Expand Up @@ -2314,8 +2317,11 @@ mod tests {
let ds = repository
.readonly_session(&VersionInfo::BranchTipRef("main".to_string()))
.await?;
let parents =
repository.ancestry(&ds.snapshot_id).await?.try_collect::<Vec<_>>().await?;
let parents = repository
.ancestry(&VersionInfo::SnapshotId(ds.snapshot_id))
.await?
.try_collect::<Vec<_>>()
.await?;
assert_eq!(parents.len(), 2);
let msg = parents[0].message.as_str();
assert!(msg == "from 1" || msg == "from 2");
Expand Down Expand Up @@ -2838,7 +2844,11 @@ mod tests {
ds2.get_chunk_ref(&new_array_2_path, &ChunkIndices(vec![0])).await?;
assert_eq!(data, Some(ChunkPayload::Inline("bye0".into())));

let commits = repo.ancestry(&snapshot).await?.try_collect::<Vec<_>>().await?;
let commits = repo
.ancestry(&VersionInfo::SnapshotId(snapshot))
.await?
.try_collect::<Vec<_>>()
.await?;
assert_eq!(commits[0].message, "after conflict");
assert_eq!(commits[1].message, "write two chunks with repo 1");
} else {
Expand Down

0 comments on commit e33b357

Please sign in to comment.