Skip to content

Commit

Permalink
Cleanup commented out code
Browse files Browse the repository at this point in the history
  • Loading branch information
mpiannucci committed Oct 7, 2024
1 parent a3cf364 commit bc7b2e5
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 107 deletions.
100 changes: 0 additions & 100 deletions icechunk/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1070,106 +1070,6 @@ async fn flush(
}
}

// async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
// storage: &(dyn Storage + Send + Sync),
// parent_id: &SnapshotId,
// change_sets: I,
// message: &str,
// properties: SnapshotProperties,
// ) -> RepositoryResult<SnapshotId> {
// let mut change_set = ChangeSet::default();
// change_set.merge_many(change_sets);
// if change_set.is_empty() {
// return Err(RepositoryError::NoChangesToCommit);
// }

// // We search for the current manifest. We are assuming a single one for now
// let old_snapshot = storage.fetch_snapshot(parent_id).await?;
// let old_snapshot_c = Arc::clone(&old_snapshot);
// let manifest_id = old_snapshot_c.iter_arc().find_map(|node| {
// match node.node_data {
// NodeData::Array(_, man) => {
// // TODO: can we avoid clone
// man.first().map(|manifest| manifest.object_id.clone())
// }
// NodeData::Group => None,
// }
// });

// let old_manifest = match manifest_id {
// Some(ref manifest_id) => storage.fetch_manifests(manifest_id).await?,
// // If there is no previous manifest we create an empty one
// None => Arc::new(Manifest::default()),
// };

// // The manifest update process is CPU intensive, so we want to execute it on a worker
// // thread. Currently it's also destructive of the manifest, so we are also cloning the
// // old manifest data
// //
// // The update process requires reference access to the set_chunks map, since we are running
// // it on blocking task, it wants that reference to be 'static, which we cannot provide.
// // As a solution, we temporarily `take` the map, replacing it with an empty one, run the thread,
// // and at the end we put the map back to where it was, in case there is some later failure.
// // We always want to leave things in the previous state if there was a failure.

// let chunk_changes = Arc::new(change_set.take_chunks());
// let chunk_changes_c = Arc::clone(&chunk_changes);

// let update_task = task::spawn_blocking(move || {
// //FIXME: avoid clone, this one is extremely expensive in memory
// //it's currently needed because we don't want to destroy the manifest in case of later
// //failure
// let mut new_chunks = old_manifest.as_ref().chunks().clone();
// update_manifest(&mut new_chunks, &chunk_changes_c);
// (new_chunks, chunk_changes)
// });

// match update_task.await {
// Ok((new_chunks, chunk_changes)) => {
// // reset the set_chunks map to its previous value
// #[allow(clippy::expect_used)]
// {
// // It's OK to call into_inner here because we created the Arc locally and never
// // shared it with other code
// let chunks =
// Arc::into_inner(chunk_changes).expect("Bug in flush task join");
// change_set.set_chunks(chunks);
// }

// let new_manifest = Arc::new(Manifest::new(new_chunks));
// let new_manifest_id = ObjectId::random();
// storage
// .write_manifests(new_manifest_id.clone(), Arc::clone(&new_manifest))
// .await?;

// let all_nodes =
// updated_nodes(storage, &change_set, parent_id, &new_manifest_id).await?;

// let mut new_snapshot = Snapshot::from_iter(
// old_snapshot.as_ref(),
// Some(properties),
// vec![ManifestFileInfo {
// id: new_manifest_id.clone(),
// format_version: new_manifest.icechunk_manifest_format_version,
// }],
// vec![],
// all_nodes,
// );
// new_snapshot.metadata.message = message.to_string();
// new_snapshot.metadata.written_at = Utc::now();

// let new_snapshot = Arc::new(new_snapshot);
// let new_snapshot_id = &new_snapshot.metadata.id;
// storage
// .write_snapshot(new_snapshot_id.clone(), Arc::clone(&new_snapshot))
// .await?;

// Ok(new_snapshot_id.clone())
// }
// Err(_) => Err(RepositoryError::OtherFlushError),
// }
// }

#[cfg(test)]
#[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)]
mod tests {
Expand Down
7 changes: 0 additions & 7 deletions icechunk/tests/test_distributed_writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,6 @@ async fn test_distributed_writes() -> Result<(), Box<dyn std::error::Error + Sen
let repo3 = write_results.pop().unwrap().unwrap();
let repo4 = write_results.pop().unwrap().unwrap();

// We get the ChangeSet from repos 2, 3 and 4, by converting them into bytes.
// This simulates a marshalling operation from a remote writer.
// let change_sets: Vec<ChangeSet> = vec![repo2.into(), repo3.into(), repo4.into()];
// let change_sets_bytes = change_sets.iter().map(|cs| cs.export_to_bytes().unwrap());
// let change_sets = change_sets_bytes
// .map(|bytes| ChangeSet::import_from_bytes(bytes.as_slice()).unwrap());

// Distributed commit now, using arbitrarily one of the repos as base and the others as extra
// changesets
repo1.merge(vec![repo2, repo3, repo4]).await?;
Expand Down

0 comments on commit bc7b2e5

Please sign in to comment.