Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add merge method for combining changes from multiple stores #154

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 33 additions & 60 deletions icechunk/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,26 +757,13 @@ impl Repository {
Ok(existing_array_chunks.chain(new_array_chunks))
}

pub async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
pub async fn merge<I: IntoIterator<Item = Repository>>(
&mut self,
other_change_sets: I,
message: &str,
properties: SnapshotProperties,
) -> RepositoryResult<SnapshotId> {
// FIXME: this clone can be avoided
let change_sets = iter::once(self.change_set.clone()).chain(other_change_sets);
let new_snapshot_id = distributed_flush(
self.storage.as_ref(),
self.snapshot_id(),
change_sets,
message,
properties,
)
.await?;

self.snapshot_id = new_snapshot_id.clone();
self.change_set = ChangeSet::default();
Ok(new_snapshot_id)
other_repositories: I,
) -> RepositoryResult<()> {
let change_sets = other_repositories.into_iter().map(|r| r.change_set);
self.change_set.merge_many(change_sets);
Ok(())
}

/// After changes to the repository have been made, this generates and writes to `Storage` the updated datastructures.
Expand All @@ -791,36 +778,36 @@ impl Repository {
message: &str,
properties: SnapshotProperties,
) -> RepositoryResult<SnapshotId> {
self.distributed_flush(iter::empty(), message, properties).await
}
// TODO: can this clone can be avoided? its difficult because
// self is borrows for flush and the change set should only
// be cleared after the flush is successful.
let mut change_set = self.change_set.clone();

pub async fn commit(
&mut self,
update_branch_name: &str,
message: &str,
properties: Option<SnapshotProperties>,
) -> RepositoryResult<SnapshotId> {
self.distributed_commit(update_branch_name, iter::empty(), message, properties)
.await
let new_snapshot_id = flush(
self.storage.as_ref(),
self.snapshot_id(),
&mut change_set,
message,
properties,
)
.await?;

self.snapshot_id = new_snapshot_id.clone();
self.change_set = ChangeSet::default();
Ok(new_snapshot_id)
}

pub async fn distributed_commit<I: IntoIterator<Item = ChangeSet>>(
pub async fn commit(
&mut self,
update_branch_name: &str,
other_change_sets: I,
message: &str,
properties: Option<SnapshotProperties>,
) -> RepositoryResult<SnapshotId> {
let current = fetch_branch_tip(self.storage.as_ref(), update_branch_name).await;

match current {
Err(RefError::RefNotFound(_)) => {
self.do_distributed_commit(
update_branch_name,
other_change_sets,
message,
properties,
)
.await
self.do_commit(update_branch_name, message, properties).await
}
Err(err) => Err(err.into()),
Ok(ref_data) => {
Expand All @@ -831,29 +818,21 @@ impl Repository {
actual_parent: Some(ref_data.snapshot.clone()),
})
} else {
self.do_distributed_commit(
update_branch_name,
other_change_sets,
message,
properties,
)
.await
self.do_commit(update_branch_name, message, properties).await
}
}
}
}

async fn do_distributed_commit<I: IntoIterator<Item = ChangeSet>>(
async fn do_commit(
&mut self,
update_branch_name: &str,
other_change_sets: I,
message: &str,
properties: Option<SnapshotProperties>,
) -> RepositoryResult<SnapshotId> {
let parent_snapshot = self.snapshot_id.clone();
let properties = properties.unwrap_or_default();
let new_snapshot =
self.distributed_flush(other_change_sets, message, properties).await?;
let new_snapshot = self.flush(message, properties).await?;

match update_branch(
self.storage.as_ref(),
Expand Down Expand Up @@ -997,19 +976,13 @@ async fn updated_nodes<'a>(
.chain(change_set.new_nodes_iterator(manifest_id)))
}

async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
async fn flush(
storage: &(dyn Storage + Send + Sync),
parent_id: &SnapshotId,
change_sets: I,
change_set: &mut ChangeSet,
message: &str,
properties: SnapshotProperties,
) -> RepositoryResult<SnapshotId> {
let mut change_set = ChangeSet::default();
change_set.merge_many(change_sets);
if change_set.is_empty() {
return Err(RepositoryError::NoChangesToCommit);
}

// We search for the current manifest. We are assumming a single one for now
let old_snapshot = storage.fetch_snapshot(parent_id).await?;
let old_snapshot_c = Arc::clone(&old_snapshot);
Expand Down Expand Up @@ -1038,8 +1011,8 @@ async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
// As a solution, we temporarily `take` the map, replacing it an empty one, run the thread,
// and at the end we put the map back to where it was, in case there is some later failure.
// We always want to leave things in the previous state if there was a failure.

let chunk_changes = Arc::new(change_set.take_chunks());
let chunks = change_set.take_chunks();
let chunk_changes = Arc::new(chunks);
let chunk_changes_c = Arc::clone(&chunk_changes);

let update_task = task::spawn_blocking(move || {
Expand Down Expand Up @@ -1070,7 +1043,7 @@ async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
.await?;

let all_nodes =
updated_nodes(storage, &change_set, parent_id, &new_manifest_id).await?;
updated_nodes(storage, change_set, parent_id, &new_manifest_id).await?;

let mut new_snapshot = Snapshot::from_iter(
old_snapshot.as_ref(),
Expand Down
59 changes: 36 additions & 23 deletions icechunk/src/zarr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ pub enum StoreError {
NotFound(#[from] KeyNotFoundError),
#[error("unsuccessful repository operation: `{0}`")]
RepositoryError(#[from] RepositoryError),
#[error("error merging stores: `{0}`")]
MergeError(String),
#[error("cannot commit when no snapshot is present")]
NoSnapshot,
#[error("all commits must be made on a branch")]
Expand Down Expand Up @@ -435,33 +437,44 @@ impl Store {
Ok((snapshot_id, version))
}

pub async fn merge<I: IntoIterator<Item = Store>>(
&self,
other_stores: I,
) -> StoreResult<()> {
let repositories = other_stores
.into_iter()
.enumerate()
.map(|(i, store)| {
let repository_lock =
Arc::try_unwrap(store.repository).map_err(|_| {
StoreError::MergeError(format!(
"store at index {i} in merge operation is still in use"
))
})?;
let repository = repository_lock.into_inner();
Ok(repository)
})
.collect::<Result<Vec<_>, StoreError>>()?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is required to check for errors and exit early if there are some. IDK if this is the right approach...

If one fails should the others still merge? What about ones that come before?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the right approach. We never want jobs to successfully commit if one of their workers failed or is still working. This may even be somewhat common, bad concurrent code tries to merge while there is other thread still doing work, and in that case, Arc::try_unwrap will fail.

If they want to commit anyway, they can do it explicitly, by not passing those stores. What is very important is recoverability: We let them know something is still going, they wait and try again. So, I think there are better return types for this function, something like:

-> StoreResult<Vec<(usize, Store)>>

returns the list of Store that are still pending. The user can wait on those somehow and try to merge them again. The ones that succeeded are gone (not really gone, just merged).

Things we should think more about:

  • I don't love the Vec in the return type though, we may want to think some more. I'm a bit worried about the ugly case in which people use dask and every chunk becomes a task, and we have millions of things to merge.
  • How do we help them "wait" until they can merge?
  • There is a possible answer to both points: this function keep retrying until it succeeds. So we don't need a return type, and we are the ones waiting, but that approach sounds quite unsatisfying.

nit: try_collect is usually more readable.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more thoughts, always about this real wold scenario where a thread is trying to do set while other is trying to merge all repos.

  • Should this merge require a &mut instead? It may be more faithful to reality
  • Either way, my "unsatisfying approach" is not only unsatisfying but potentially deadllocking if not done carefully, both threads are trying to write to the repo.
  • I think I have a much better result type
-> Result<(), I::Iterator>

or whatever way you write t hat, there is probably some amount of as IntoItorator missing.

  • The idea is, I start merging, I'll stop as soon as I find one Repository that is not ready to commit (by that we mean: which Arc cannot be unwrapped), and when I stop I'll give you back the iterator of the remaining repos.
  • Calling code can decide what to do next: simply retry with the remaining repos, wait and retry, skip the first repo, etc.
  • Not sure how useful it would be but we can provide a ready_to_merge function that verifies the ref count on the arc == 1

We should talk more about all this, fun stuff.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just had another idea. All the issues arise because we are trying to merge multiple stores. This also makes it harder on the python side, because we need to be very careful using a generator and not a list. I think there is a much easier way, only allow merging one store into self. Let the user deal with gathering all of them and calling merge one by one. I think this is also easier on the user, they just need to get results as soon as they are produced, and call merge on them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also the approach I started doing in Python, so we got to the same place! I wound up reverting it because the lifetimes were driving me nuts, but I totally agree that is the approach we should use


self.repository.write().await.merge(repositories).await?;
Ok(())
}

/// Commit the current changes to the current branch. If the store is not currently
/// on a branch, this will return an error.
pub async fn commit(&mut self, message: &str) -> StoreResult<SnapshotId> {
self.distributed_commit(message, vec![]).await
}
let Some(branch) = &self.current_branch else {
return Err(StoreError::NotOnBranch);
};

pub async fn distributed_commit<'a, I: IntoIterator<Item = Vec<u8>>>(
&mut self,
message: &str,
other_changesets_bytes: I,
) -> StoreResult<SnapshotId> {
if let Some(branch) = &self.current_branch {
let other_change_sets: Vec<ChangeSet> = other_changesets_bytes
.into_iter()
.map(|v| ChangeSet::import_from_bytes(v.as_slice()))
.try_collect()?;
let result = self
.repository
.write()
.await
.deref_mut()
.distributed_commit(branch, other_change_sets, message, None)
.await?;
Ok(result)
} else {
Err(StoreError::NotOnBranch)
}
let result = self
.repository
.write()
.await
.deref_mut()
.commit(branch, message, None)
.await?;
Ok(result)
}

/// Tag the given snapshot with a specified tag
Expand Down
13 changes: 3 additions & 10 deletions icechunk/tests/test_distributed_writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use bytes::Bytes;
use icechunk::{
format::{ByteRange, ChunkIndices, Path, SnapshotId},
metadata::{ChunkKeyEncoding, ChunkShape, DataType, FillValue},
repository::{get_chunk, ChangeSet, ZarrArrayMetadata},
repository::{get_chunk, ZarrArrayMetadata},
storage::s3::{S3Config, S3Credentials, S3Storage},
Repository, Storage,
};
Expand Down Expand Up @@ -171,17 +171,10 @@ async fn test_distributed_writes() -> Result<(), Box<dyn std::error::Error + Sen
let repo3 = write_results.pop().unwrap().unwrap();
let repo4 = write_results.pop().unwrap().unwrap();

// We get the ChangeSet from repos 2, 3 and 4, by converting them into bytes.
// This simulates a marshalling operation from a remote writer.
let change_sets: Vec<ChangeSet> = vec![repo2.into(), repo3.into(), repo4.into()];
let change_sets_bytes = change_sets.iter().map(|cs| cs.export_to_bytes().unwrap());
let change_sets = change_sets_bytes
.map(|bytes| ChangeSet::import_from_bytes(bytes.as_slice()).unwrap());

// Distributed commit now, using arbitrarily one of the repos as base and the others as extra
// changesets
let _new_snapshot =
repo1.distributed_commit("main", change_sets, "distributed commit", None).await?;
repo1.merge(vec![repo2, repo3, repo4]).await?;
let _new_snapshot = repo1.commit("main", "distributed commit", None).await?;

// We check we can read all chunks correctly
verify(repo1).await?;
Expand Down
Loading