From bc7b2e5b551162f043e6e5e8accce2af8cc1cab5 Mon Sep 17 00:00:00 2001
From: Matthew Iannucci
Date: Mon, 7 Oct 2024 12:51:11 -0400
Subject: [PATCH] Cleanup commented out code

---
 icechunk/src/repository.rs                | 100 ----------------------
 icechunk/tests/test_distributed_writes.rs |   7 --
 2 files changed, 107 deletions(-)

diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs
index 648c618e..2b3de018 100644
--- a/icechunk/src/repository.rs
+++ b/icechunk/src/repository.rs
@@ -1070,106 +1070,6 @@ async fn flush(
     }
 }
 
-// async fn distributed_flush<I: IntoIterator<Item = ChangeSet>>(
-//     storage: &(dyn Storage + Send + Sync),
-//     parent_id: &SnapshotId,
-//     change_sets: I,
-//     message: &str,
-//     properties: SnapshotProperties,
-// ) -> RepositoryResult<SnapshotId> {
-//     let mut change_set = ChangeSet::default();
-//     change_set.merge_many(change_sets);
-//     if change_set.is_empty() {
-//         return Err(RepositoryError::NoChangesToCommit);
-//     }
-
-//     // We search for the current manifest. We are assumming a single one for now
-//     let old_snapshot = storage.fetch_snapshot(parent_id).await?;
-//     let old_snapshot_c = Arc::clone(&old_snapshot);
-//     let manifest_id = old_snapshot_c.iter_arc().find_map(|node| {
-//         match node.node_data {
-//             NodeData::Array(_, man) => {
-//                 // TODO: can we avoid clone
-//                 man.first().map(|manifest| manifest.object_id.clone())
-//             }
-//             NodeData::Group => None,
-//         }
-//     });
-
-//     let old_manifest = match manifest_id {
-//         Some(ref manifest_id) => storage.fetch_manifests(manifest_id).await?,
-//         // If there is no previous manifest we create an empty one
-//         None => Arc::new(Manifest::default()),
-//     };
-
-//     // The manifest update process is CPU intensive, so we want to executed it on a worker
-//     // thread. Currently it's also destructive of the manifest, so we are also cloning the
-//     // old manifest data
-//     //
-//     // The update process requires reference access to the set_chunks map, since we are running
-//     // it on blocking task, it wants that reference to be 'static, which we cannot provide.
-//     // As a solution, we temporarily `take` the map, replacing it an empty one, run the thread,
-//     // and at the end we put the map back to where it was, in case there is some later failure.
-//     // We always want to leave things in the previous state if there was a failure.
-
-//     let chunk_changes = Arc::new(change_set.take_chunks());
-//     let chunk_changes_c = Arc::clone(&chunk_changes);
-
-//     let update_task = task::spawn_blocking(move || {
-//         //FIXME: avoid clone, this one is extremely expensive en memory
-//         //it's currently needed because we don't want to destroy the manifest in case of later
-//         //failure
-//         let mut new_chunks = old_manifest.as_ref().chunks().clone();
-//         update_manifest(&mut new_chunks, &chunk_changes_c);
-//         (new_chunks, chunk_changes)
-//     });
-
-//     match update_task.await {
-//         Ok((new_chunks, chunk_changes)) => {
-//             // reset the set_chunks map to it's previous value
-//             #[allow(clippy::expect_used)]
-//             {
-//                 // It's OK to call into_inner here because we created the Arc locally and never
-//                 // shared it with other code
-//                 let chunks =
-//                     Arc::into_inner(chunk_changes).expect("Bug in flush task join");
-//                 change_set.set_chunks(chunks);
-//             }
-
-//             let new_manifest = Arc::new(Manifest::new(new_chunks));
-//             let new_manifest_id = ObjectId::random();
-//             storage
-//                 .write_manifests(new_manifest_id.clone(), Arc::clone(&new_manifest))
-//                 .await?;
-
-//             let all_nodes =
-//                 updated_nodes(storage, &change_set, parent_id, &new_manifest_id).await?;
-
-//             let mut new_snapshot = Snapshot::from_iter(
-//                 old_snapshot.as_ref(),
-//                 Some(properties),
-//                 vec![ManifestFileInfo {
-//                     id: new_manifest_id.clone(),
-//                     format_version: new_manifest.icechunk_manifest_format_version,
-//                 }],
-//                 vec![],
-//                 all_nodes,
-//             );
-//             new_snapshot.metadata.message = message.to_string();
-//             new_snapshot.metadata.written_at = Utc::now();
-
-//             let new_snapshot = Arc::new(new_snapshot);
-//             let new_snapshot_id = &new_snapshot.metadata.id;
-//             storage
-//                 .write_snapshot(new_snapshot_id.clone(), Arc::clone(&new_snapshot))
-//                 .await?;
-
-//             Ok(new_snapshot_id.clone())
-//         }
-//         Err(_) => Err(RepositoryError::OtherFlushError),
-//     }
-// }
-
 #[cfg(test)]
 #[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)]
 mod tests {
diff --git a/icechunk/tests/test_distributed_writes.rs b/icechunk/tests/test_distributed_writes.rs
index 7a15be97..c7ce9594 100644
--- a/icechunk/tests/test_distributed_writes.rs
+++ b/icechunk/tests/test_distributed_writes.rs
@@ -171,13 +171,6 @@ async fn test_distributed_writes() -> Result<(), Box
 = vec![repo2.into(), repo3.into(), repo4.into()];
-    // let change_sets_bytes = change_sets.iter().map(|cs| cs.export_to_bytes().unwrap());
-    // let change_sets = change_sets_bytes
-    //     .map(|bytes| ChangeSet::import_from_bytes(bytes.as_slice()).unwrap());
-
     // Distributed commit now, using arbitrarily one of the repos as base and the others as extra
     // changesets
     repo1.merge(vec![repo2, repo3, repo4]).await?;
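
Note on the deleted comments (not part of the patch): they describe a take-and-restore workaround for spawn_blocking's 'static requirement. Because the blocking closure cannot borrow the change set, the chunk map is taken out of it, shared behind an Arc, moved into the blocking task, and put back once the task finishes, with Arc::into_inner recovering sole ownership so the change set is left in its previous state even if a later step fails. The following is a minimal standalone sketch of that pattern, assuming Tokio (with the rt-multi-thread and macros features); the ChangeSet and update_manifest stand-ins below are simplified illustrations, not icechunk's actual types or APIs.

use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-in for icechunk's ChangeSet: only the chunk map matters here.
#[derive(Default)]
struct ChangeSet {
    set_chunks: HashMap<String, Vec<u8>>,
}

impl ChangeSet {
    // Move the map out, leaving an empty one in its place.
    fn take_chunks(&mut self) -> HashMap<String, Vec<u8>> {
        std::mem::take(&mut self.set_chunks)
    }

    // Put the map back so the change set is unchanged if a later step fails.
    fn set_chunks(&mut self, chunks: HashMap<String, Vec<u8>>) {
        self.set_chunks = chunks;
    }
}

// Stand-in for the CPU-heavy manifest update; it only needs read access to the changes.
fn update_manifest(
    manifest: &mut HashMap<String, Vec<u8>>,
    changes: &HashMap<String, Vec<u8>>,
) {
    for (key, payload) in changes {
        manifest.insert(key.clone(), payload.clone());
    }
}

#[tokio::main]
async fn main() {
    let mut change_set = ChangeSet::default();
    change_set
        .set_chunks
        .insert("array/c/0/0".to_string(), vec![1, 2, 3]);

    // Pretend this came from storage; in icechunk it would be the old manifest's chunk map.
    let old_manifest: Arc<HashMap<String, Vec<u8>>> = Arc::new(HashMap::new());

    // spawn_blocking requires a 'static closure, so we cannot borrow `change_set`.
    // Take the map out, share it behind an Arc, and move a clone of the Arc into the task.
    let chunk_changes = Arc::new(change_set.take_chunks());
    let chunk_changes_c = Arc::clone(&chunk_changes);
    let old_manifest_c = Arc::clone(&old_manifest);

    let update_task = tokio::task::spawn_blocking(move || {
        // Clone so the original manifest is untouched if anything fails later.
        let mut new_chunks = old_manifest_c.as_ref().clone();
        update_manifest(&mut new_chunks, &chunk_changes_c);
        new_chunks
    });

    match update_task.await {
        Ok(new_chunks) => {
            // The task's captures are dropped once it finishes, so this Arc is the last
            // one standing and into_inner hands the map back to be restored.
            let chunks = Arc::into_inner(chunk_changes)
                .expect("blocking task still holds the chunk map");
            change_set.set_chunks(chunks);
            println!("updated manifest holds {} chunk refs", new_chunks.len());
        }
        Err(join_error) => eprintln!("flush task panicked or was cancelled: {join_error}"),
    }
}

Because the blocking closure's captures are dropped when the task completes, the Arc's strong count is back to one after the await, so Arc::into_inner returns the map and the change set can be restored even though the task needed an owned, 'static handle to it.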