Add merge method for combining changes from multiple stores #154

Closed · wants to merge 6 commits
icechunk-python/src/lib.rs (37 additions, 35 deletions)

@@ -354,34 +354,35 @@ impl PyIcechunkStore {
         // The commit mechanism is async and calls tokio::spawn so we need to use the
         // pyo3_asyncio_0_21::tokio helper to run the async function in the tokio runtime
         pyo3_asyncio_0_21::tokio::future_into_py(py, async move {
-            let mut writeable_store = store.write().await;
-            let oid = writeable_store
+            let oid = store
+                .read()
+                .await
                 .commit(&message)
                 .await
                 .map_err(PyIcechunkStoreError::from)?;
             Ok(String::from(&oid))
         })
     }
 
-    fn distributed_commit<'py>(
-        &'py self,
-        py: Python<'py>,
-        message: String,
-        other_change_set_bytes: Vec<Vec<u8>>,
-    ) -> PyResult<Bound<'py, PyAny>> {
-        let store = Arc::clone(&self.store);
-
-        // The commit mechanism is async and calls tokio::spawn so we need to use the
-        // pyo3_asyncio_0_21::tokio helper to run the async function in the tokio runtime
-        pyo3_asyncio_0_21::tokio::future_into_py(py, async move {
-            let mut writeable_store = store.write().await;
-            let oid = writeable_store
-                .distributed_commit(&message, other_change_set_bytes)
-                .await
-                .map_err(PyIcechunkStoreError::from)?;
-            Ok(String::from(&oid))
-        })
-    }
+    // fn distributed_commit<'py>(
+    //     &'py self,
+    //     py: Python<'py>,
+    //     message: String,
+    //     other_change_set_bytes: Vec<Vec<u8>>,
+    // ) -> PyResult<Bound<'py, PyAny>> {
+    //     let store = Arc::clone(&self.store);
+
+    //     // The commit mechanism is async and calls tokio::spawn so we need to use the
+    //     // pyo3_asyncio_0_21::tokio helper to run the async function in the tokio runtime
+    //     pyo3_asyncio_0_21::tokio::future_into_py(py, async move {
+    //         let mut writeable_store = store.write().await;
+    //         let oid = writeable_store
+    //             .distributed_commit(&message, other_change_set_bytes)
+    //             .await
+    //             .map_err(PyIcechunkStoreError::from)?;
+    //         Ok(String::from(&oid))
+    //     })
+    // }
 
     fn change_set_bytes(&self) -> PyIcechunkStoreResult<Vec<u8>> {
         let store = self.store.blocking_read();
@@ -410,12 +411,7 @@ impl PyIcechunkStore {
         let store = Arc::clone(&self.store);
 
         pyo3_asyncio_0_21::tokio::future_into_py(py, async move {
-            store
-                .write()
-                .await
-                .reset()
-                .await
-                .map_err(PyIcechunkStoreError::StoreError)?;
+            store.read().await.reset().await.map_err(PyIcechunkStoreError::StoreError)?;
             Ok(())
         })
     }
@@ -450,10 +446,14 @@ impl PyIcechunkStore {
         // The commit mechanism is async and calls tokio::spawn so we need to use the
         // pyo3_asyncio_0_21::tokio helper to run the async function in the tokio runtime
         pyo3_asyncio_0_21::tokio::future_into_py(py, async move {
-            let mut writeable_store = store.write().await;
             let oid = ObjectId::try_from(snapshot_id.as_str())
                 .map_err(|e| PyIcechunkStoreError::UnkownError(e.to_string()))?;
-            writeable_store.tag(&tag, &oid).await.map_err(PyIcechunkStoreError::from)?;
+            store
+                .read()
+                .await
+                .tag(&tag, &oid)
+                .await
+                .map_err(PyIcechunkStoreError::from)?;
             Ok(())
         })
     }
@@ -627,14 +627,16 @@ impl PyIcechunkStore {
     ) -> PyResult<Bound<'py, PyAny>> {
         let store = Arc::clone(&self.store);
 
+        let virtual_ref = VirtualChunkRef {
+            location: VirtualChunkLocation::Absolute(location),
+            offset,
+            length,
+        };
+
         pyo3_asyncio_0_21::tokio::future_into_py(py, async move {
-            let virtual_ref = VirtualChunkRef {
-                location: VirtualChunkLocation::Absolute(location),
-                offset,
-                length,
-            };
-            let mut store = store.write().await;
             store
+                .read()
+                .await
                 .set_virtual_ref(&key, virtual_ref)
                 .await
                 .map_err(PyIcechunkStoreError::from)?;
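Note on the lib.rs changes: every touched binding switches from `store.write().await` to `store.read().await`, so these methods now take a shared read guard instead of an exclusive write guard. A minimal sketch of the locking pattern, assuming `tokio::sync::RwLock`; the `Store` type and `commit` method below are illustrative stand-ins, not the icechunk API:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

struct Store;

impl Store {
    // Takes `&self`, so a shared read guard is enough to call it.
    async fn commit(&self, message: &str) -> String {
        format!("snapshot-id-for-{message}")
    }
}

#[tokio::main]
async fn main() {
    let store = Arc::new(RwLock::new(Store));

    // `read()` grants a shared guard; many tasks can hold one concurrently,
    // whereas the old `write()` guard serialized every caller.
    let oid = store.read().await.commit("initial commit").await;
    println!("{oid}");
}
```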
icechunk/src/repository.rs (46 additions, 60 deletions)

@@ -281,6 +281,11 @@ impl Repository {
         !self.change_set.is_empty()
     }
 
+    /// Discard all uncommitted changes and return them as a `ChangeSet`
+    pub fn discard_changes(&mut self) -> ChangeSet {
+        std::mem::take(&mut self.change_set)
+    }
+
     /// Returns the sequence of parents of the current session, in order of latest first.
     pub async fn ancestry(
         &self,

Collaborator review comment on `discard_changes`: everything in this file lgtm
@@ -757,26 +762,13 @@ impl Repository {
         Ok(existing_array_chunks.chain(new_array_chunks))
     }
 
-    pub async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
+    pub async fn merge<I: IntoIterator<Item = Repository>>(
         &mut self,
-        other_change_sets: I,
-        message: &str,
-        properties: SnapshotProperties,
-    ) -> RepositoryResult<SnapshotId> {
-        // FIXME: this clone can be avoided
-        let change_sets = iter::once(self.change_set.clone()).chain(other_change_sets);
-        let new_snapshot_id = distributed_flush(
-            self.storage.as_ref(),
-            self.snapshot_id(),
-            change_sets,
-            message,
-            properties,
-        )
-        .await?;
-
-        self.snapshot_id = new_snapshot_id.clone();
-        self.change_set = ChangeSet::default();
-        Ok(new_snapshot_id)
+        other_repositories: I,
+    ) -> RepositoryResult<()> {
+        let change_sets = other_repositories.into_iter().map(|r| r.change_set);
+        self.change_set.merge_many(change_sets);
+        Ok(())
     }
 
     /// After changes to the repository have been made, this generates and writes to `Storage` the updated datastructures.
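The new `merge` consumes other `Repository` values and folds their uncommitted change sets into `self`; the snapshot write still happens in a later `commit`. A hedged sketch of the coordinator-side flow this enables (the `coordinated_commit` helper and branch name are hypothetical; `Repository::merge` and `Repository::commit` are the methods from this diff, assumed in scope):

```rust
// Hypothetical coordinator: workers accumulated changes in their own
// `Repository` instances; fold them in, then write a single snapshot.
async fn coordinated_commit(
    mut coordinator: Repository,
    workers: Vec<Repository>,
) -> RepositoryResult<SnapshotId> {
    // Absorb every worker's uncommitted ChangeSet into the coordinator.
    coordinator.merge(workers).await?;

    // One commit now writes the combined changes as one snapshot.
    coordinator.commit("main", "merge worker changes", None).await
}
```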
@@ -791,36 +783,36 @@ impl Repository {
         message: &str,
         properties: SnapshotProperties,
     ) -> RepositoryResult<SnapshotId> {
-        self.distributed_flush(iter::empty(), message, properties).await
-    }
+        // TODO: can this clone can be avoided? its difficult because
+        // self is borrows for flush and the change set should only
+        // be cleared after the flush is successful.
+        let mut change_set = self.change_set.clone();
 
-    pub async fn commit(
-        &mut self,
-        update_branch_name: &str,
-        message: &str,
-        properties: Option<SnapshotProperties>,
-    ) -> RepositoryResult<SnapshotId> {
-        self.distributed_commit(update_branch_name, iter::empty(), message, properties)
-            .await
+        let new_snapshot_id = flush(
+            self.storage.as_ref(),
+            self.snapshot_id(),
+            &mut change_set,
+            message,
+            properties,
+        )
+        .await?;
+
+        self.snapshot_id = new_snapshot_id.clone();
+        self.change_set = ChangeSet::default();
+        Ok(new_snapshot_id)
     }
 
-    pub async fn distributed_commit<I: IntoIterator<Item = ChangeSet>>(
+    pub async fn commit(
         &mut self,
         update_branch_name: &str,
-        other_change_sets: I,
         message: &str,
         properties: Option<SnapshotProperties>,
     ) -> RepositoryResult<SnapshotId> {
         let current = fetch_branch_tip(self.storage.as_ref(), update_branch_name).await;
 
         match current {
             Err(RefError::RefNotFound(_)) => {
-                self.do_distributed_commit(
-                    update_branch_name,
-                    other_change_sets,
-                    message,
-                    properties,
-                )
-                .await
+                self.do_commit(update_branch_name, message, properties).await
             }
             Err(err) => Err(err.into()),
             Ok(ref_data) => {
@@ -831,29 +823,21 @@ impl Repository {
                         actual_parent: Some(ref_data.snapshot.clone()),
                     })
                 } else {
-                    self.do_distributed_commit(
-                        update_branch_name,
-                        other_change_sets,
-                        message,
-                        properties,
-                    )
-                    .await
+                    self.do_commit(update_branch_name, message, properties).await
                 }
             }
         }
     }
 
-    async fn do_distributed_commit<I: IntoIterator<Item = ChangeSet>>(
+    async fn do_commit(
         &mut self,
         update_branch_name: &str,
-        other_change_sets: I,
         message: &str,
         properties: Option<SnapshotProperties>,
     ) -> RepositoryResult<SnapshotId> {
         let parent_snapshot = self.snapshot_id.clone();
         let properties = properties.unwrap_or_default();
-        let new_snapshot =
-            self.distributed_flush(other_change_sets, message, properties).await?;
+        let new_snapshot = self.flush(message, properties).await?;
 
         match update_branch(
             self.storage.as_ref(),
@@ -997,19 +981,13 @@ async fn updated_nodes<'a>(
         .chain(change_set.new_nodes_iterator(manifest_id)))
 }
 
-async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
+async fn flush(
     storage: &(dyn Storage + Send + Sync),
     parent_id: &SnapshotId,
-    change_sets: I,
+    change_set: &mut ChangeSet,
     message: &str,
     properties: SnapshotProperties,
 ) -> RepositoryResult<SnapshotId> {
-    let mut change_set = ChangeSet::default();
-    change_set.merge_many(change_sets);
-    if change_set.is_empty() {
-        return Err(RepositoryError::NoChangesToCommit);
-    }
-
     // We search for the current manifest. We are assumming a single one for now
     let old_snapshot = storage.fetch_snapshot(parent_id).await?;
     let old_snapshot_c = Arc::clone(&old_snapshot);
@@ -1038,8 +1016,8 @@ async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
     // As a solution, we temporarily `take` the map, replacing it an empty one, run the thread,
     // and at the end we put the map back to where it was, in case there is some later failure.
     // We always want to leave things in the previous state if there was a failure.
-
-    let chunk_changes = Arc::new(change_set.take_chunks());
+    let chunks = change_set.take_chunks();
+    let chunk_changes = Arc::new(chunks);
     let chunk_changes_c = Arc::clone(&chunk_changes);
 
     let update_task = task::spawn_blocking(move || {
@@ -1070,7 +1048,7 @@ async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
     .await?;
 
     let all_nodes =
-        updated_nodes(storage, &change_set, parent_id, &new_manifest_id).await?;
+        updated_nodes(storage, change_set, parent_id, &new_manifest_id).await?;
 
     let mut new_snapshot = Snapshot::from_iter(
         old_snapshot.as_ref(),
@@ -1615,6 +1593,14 @@ mod tests {
         // wo commit to test the case of a chunkless array
         let _snapshot_id = ds.flush("commit", SnapshotProperties::default()).await?;
 
+        let new_new_array_path: Path = "/group/array2".try_into().unwrap();
+        ds.add_array(new_new_array_path.clone(), zarr_meta.clone()).await?;
+
+        assert!(ds.has_uncommitted_changes());
+        let changes = ds.discard_changes();
+        assert!(!changes.is_empty());
+        assert!(!ds.has_uncommitted_changes());
+
         // we set a chunk in a new array
         ds.set_chunk_ref(
             new_array_path.clone(),
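The TODO in the new `flush` wrapper explains the clone: the pending change set may only be cleared after the storage write succeeds, so a failed flush leaves the repository unchanged. A simplified, self-contained model of that clear-after-success pattern (every type below is a stand-in, not icechunk's):

```rust
#[derive(Clone, Default)]
struct ChangeSet(Vec<String>);

impl ChangeSet {
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}

struct Repo {
    change_set: ChangeSet,
    snapshot_id: u64,
}

impl Repo {
    // Stand-in for the storage write done by the free `flush` function.
    fn write_snapshot(&self, _changes: &ChangeSet) -> Result<u64, String> {
        Ok(self.snapshot_id + 1)
    }

    fn flush(&mut self) -> Result<u64, String> {
        // Clone so `self.change_set` is untouched if the write fails.
        let pending = self.change_set.clone();
        let new_id = self.write_snapshot(&pending)?;
        // Only after success is it safe to advance and clear state.
        self.snapshot_id = new_id;
        self.change_set = ChangeSet::default();
        Ok(new_id)
    }
}

fn main() {
    let mut repo = Repo { change_set: ChangeSet(vec!["set /array/c0".into()]), snapshot_id: 0 };
    let id = repo.flush().expect("flush succeeds");
    assert_eq!(id, 1);
    assert!(repo.change_set.is_empty());
}
```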