From e7d9221607f6d883d48b80800597af3337b7ee32 Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Thu, 27 Feb 2025 16:09:57 -0700
Subject: [PATCH] [WIP] manifest sharding

---
 icechunk/src/format/manifest.rs |  78 +++++++++++++++++++++---
 icechunk/src/format/mod.rs      |   2 +
 icechunk/src/format/snapshot.rs |   3 +
 icechunk/src/session.rs         | 101 ++++++++++++++++++++++++--------
 4 files changed, 152 insertions(+), 32 deletions(-)

diff --git a/icechunk/src/format/manifest.rs b/icechunk/src/format/manifest.rs
index bcf665f1..aca50bd7 100644
--- a/icechunk/src/format/manifest.rs
+++ b/icechunk/src/format/manifest.rs
@@ -4,7 +4,7 @@ use crate::format::flatbuffers::gen;
 use bytes::Bytes;
 use flatbuffers::VerifierOptions;
 use futures::{Stream, TryStreamExt};
-use itertools::Itertools;
+use itertools::{multiunzip, repeat_n, Itertools};
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
 
@@ -21,12 +21,6 @@ use super::{
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct ManifestExtents(Vec<Range<u32>>);
 
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub struct ManifestRef {
-    pub object_id: ManifestId,
-    pub extents: ManifestExtents,
-}
-
 impl ManifestExtents {
     pub fn new(from: &[u32], to: &[u32]) -> Self {
         let v = from
@@ -37,9 +31,77 @@ impl ManifestExtents {
         Self(v)
     }
 
+    pub fn contains(&self, coord: &[u32]) -> bool {
+        self.iter().zip(coord.iter()).all(|(range, that)| range.contains(that))
+    }
+
     pub fn iter(&self) -> impl Iterator<Item = &Range<u32>> {
         self.0.iter()
     }
+
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct ManifestRef {
+    pub object_id: ManifestId,
+    pub extents: ManifestExtents,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ManifestShards(Vec<ManifestExtents>);
+
+impl ManifestShards {
+    pub fn default(ndim: usize) -> Self {
+        Self(vec![ManifestExtents(repeat_n(0..u32::MAX, ndim).collect())])
+    }
+
+    pub fn from_edges(iter: impl IntoIterator<Item = Vec<u32>>) -> Self {
+        let res = iter
+            .into_iter()
+            .map(|x| x.into_iter().tuple_windows())
+            .multi_cartesian_product()
+            .map(|x| multiunzip(x))
+            .map(|(from, to): (Vec<u32>, Vec<u32>)| {
+                ManifestExtents::new(from.as_slice(), to.as_slice())
+            });
+        Self(res.collect())
+    }
+
+    // Returns the index of the shard extent that includes `coord`.
+    // This can be used at write time to split manifests based on the config,
+    // and at read time to choose which manifest to query for a chunk payload.
+    pub fn which(&self, coord: &ChunkIndices) -> Result<usize, IcechunkFormatError> {
+        // Every shard extent must bound ChunkIndices, so
+        // 0 <= return value < self.len().
+        // It is possible that no shard extent includes a coord: say we have a 2x2 shard grid
+        // but only shards (0, 0) and (1, 1) are populated with data.
+        // A coord located in (1, 0) should then return Err.
+        // Since the shard extents need not form a regular grid, we must iterate through and find the first match.
+        // The ManifestExtents in the shard grid MUST NOT overlap with each other. How do we ensure this?
+        // ndim must be the same
+        // debug_assert_eq!(coord.0.len(), shard_range[0].len());
+        // FIXME: could optimize for the common case of a single unbounded manifest
+        self.iter()
+            .enumerate()
+            .find(|(_, e)| e.contains(coord.0.as_slice()))
+            .map(|(i, _)| i)
+            .ok_or(IcechunkFormatError::from(
+                IcechunkFormatErrorKind::InvalidIndexForSharding {
+                    coords: coord.clone(),
+                },
+            ))
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = &ManifestExtents> {
+        self.0.iter()
+    }
+
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
 }
 
 #[derive(Debug, Error)]
@@ -204,7 +266,7 @@ impl Manifest {
         }
 
         if array_manifests.is_empty() {
-            // empty manifet
+            // empty manifest
             return Ok(None);
         }
 
diff --git a/icechunk/src/format/mod.rs b/icechunk/src/format/mod.rs
index dcb2c877..f02b9cb3 100644
--- a/icechunk/src/format/mod.rs
+++ b/icechunk/src/format/mod.rs
@@ -245,6 +245,8 @@ pub enum IcechunkFormatErrorKind {
     NodeNotFound { path: Path },
     #[error("chunk coordinates not found `{coords:?}`")]
     ChunkCoordinatesNotFound { coords: ChunkIndices },
+    #[error("invalid chunk index for sharding manifests: {coords:?}")]
+    InvalidIndexForSharding { coords: ChunkIndices },
     #[error("manifest information cannot be found in snapshot `{manifest_id}`")]
     ManifestInfoNotFound { manifest_id: ManifestId },
     #[error("invalid magic numbers in file")]
diff --git a/icechunk/src/format/snapshot.rs b/icechunk/src/format/snapshot.rs
index c6ebcbfb..4e90e8f1 100644
--- a/icechunk/src/format/snapshot.rs
+++ b/icechunk/src/format/snapshot.rs
@@ -37,6 +37,9 @@ impl DimensionShape {
 pub struct ArrayShape(Vec<DimensionShape>);
 
 impl ArrayShape {
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
     pub fn new<I>(it: I) -> Option<Self>
     where
         I: IntoIterator<Item = (u64, u64)>,
diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs
index 93988195..e65f57aa 100644
--- a/icechunk/src/session.rs
+++ b/icechunk/src/session.rs
@@ -27,7 +27,7 @@ use crate::{
     format::{
         manifest::{
             ChunkInfo, ChunkPayload, ChunkRef, Manifest, ManifestExtents, ManifestRef,
-            VirtualChunkLocation, VirtualChunkRef, VirtualReferenceError,
+            ManifestShards, VirtualChunkLocation, VirtualChunkRef, VirtualReferenceError,
             VirtualReferenceErrorKind,
         },
         snapshot::{
@@ -1376,32 +1376,58 @@ impl<'a> FlushProcess<'a> {
     ) -> SessionResult<()> {
         let mut from = vec![];
         let mut to = vec![];
-        let chunks = stream::iter(
-            self.change_set
-                .new_array_chunk_iterator(node_id, node_path)
-                .map(Ok::<ChunkInfo, Infallible>),
-        );
-        let chunks = aggregate_extents(&mut from, &mut to, chunks, |ci| &ci.coord);
-
-        if let Some(new_manifest) = Manifest::from_stream(chunks).await.unwrap() {
-            let new_manifest = Arc::new(new_manifest);
-            let new_manifest_size =
-                self.asset_manager.write_manifest(Arc::clone(&new_manifest)).await?;
-            let file_info =
-                ManifestFileInfo::new(new_manifest.as_ref(), new_manifest_size);
-            self.manifest_files.insert(file_info);
+        let mut chunks = self.change_set.new_array_chunk_iterator(node_id, node_path);
+
+        let shards = ManifestShards::from_edges(vec![vec![0, 10, 20], vec![0, 10, 20]]);
+
+        let mut sharded_refs: HashMap<usize, Vec<ChunkInfo>> = HashMap::new();
+        sharded_refs.reserve(shards.len());
+        // TODO: what is a good capacity?
+        let ref_capacity = 8_192;
+        sharded_refs.insert(0, Vec::with_capacity(ref_capacity));
+        while let Some(chunk) = chunks.next() {
+            let shard_index = shards.which(&chunk.coord).unwrap();
+            sharded_refs
+                .entry(shard_index)
+                .or_insert_with(|| Vec::with_capacity(ref_capacity))
+                .push(chunk);
+        }
 
-            let new_ref = ManifestRef {
-                object_id: new_manifest.id().clone(),
-                extents: ManifestExtents::new(&from, &to),
-            };
+        for i in 0..shards.len() {
+            if let Some(shard_chunks) = sharded_refs.remove(&i) {
+                let shard_chunks = stream::iter(
+                    shard_chunks.into_iter().map(Ok::<ChunkInfo, Infallible>),
+                );
+                let shard_chunks =
+                    aggregate_extents(&mut from, &mut to, shard_chunks, |ci| &ci.coord);
+
+                if let Some(new_manifest) =
+                    Manifest::from_stream(shard_chunks).await.unwrap()
+                {
+                    let new_manifest = Arc::new(new_manifest);
+                    let new_manifest_size = self
+                        .asset_manager
+                        .write_manifest(Arc::clone(&new_manifest))
+                        .await?;
+
+                    let file_info =
+                        ManifestFileInfo::new(new_manifest.as_ref(), new_manifest_size);
+                    self.manifest_files.insert(file_info);
+
+                    let new_ref = ManifestRef {
+                        object_id: new_manifest.id().clone(),
+                        extents: ManifestExtents::new(&from, &to),
+                    };
 
-            self.manifest_refs
-                .entry(node_id.clone())
-                .and_modify(|v| v.push(new_ref.clone()))
-                .or_insert_with(|| vec![new_ref]);
+                    self.manifest_refs
+                        .entry(node_id.clone())
+                        .and_modify(|v| v.push(new_ref.clone()))
+                        .or_insert_with(|| vec![new_ref]);
+                }
+            }
         }
+
         Ok(())
     }
 
@@ -1425,6 +1451,15 @@ impl<'a> FlushProcess<'a> {
         let updated_chunks =
             aggregate_extents(&mut from, &mut to, updated_chunks, |ci| &ci.coord);
 
+        // // FIXME
+        // let shards = {
+        //     if let NodeData::Array { shape, .. } = node.node_data.clone() {
+        //         ManifestShards::default(shape.len())
+        //     } else {
+        //         todo!()
+        //     }
+        // };
+
         if let Some(new_manifest) = Manifest::from_stream(updated_chunks).await? {
             let new_manifest = Arc::new(new_manifest);
             let new_manifest_size =
@@ -1742,7 +1777,7 @@ mod tests {
             basic_solver::{BasicConflictSolver, VersionSelection},
             detector::ConflictDetector,
         },
-        format::manifest::ManifestExtents,
+        format::manifest::{ManifestExtents, ManifestShards},
         refs::{fetch_tag, Ref},
         repository::VersionInfo,
         storage::new_in_memory_storage,
@@ -1938,6 +1973,24 @@ mod tests {
         prop_assert_eq!(to, expected_to);
     }
 
+    #[tokio::test]
+    async fn test_which_shard() -> Result<(), Box<dyn std::error::Error>> {
+        let shards = ManifestShards::from_edges(vec![vec![0, 10, 20]]);
+
+        assert_eq!(shards.which(&ChunkIndices(vec![1])).unwrap(), 0);
+        assert_eq!(shards.which(&ChunkIndices(vec![11])).unwrap(), 1);
+
+        let edges = vec![vec![0, 10, 20], vec![0, 10, 20]];
+
+        let shards = ManifestShards::from_edges(edges);
+        assert_eq!(shards.which(&ChunkIndices(vec![1, 1])).unwrap(), 0);
+        assert_eq!(shards.which(&ChunkIndices(vec![1, 10])).unwrap(), 1);
+        assert_eq!(shards.which(&ChunkIndices(vec![1, 11])).unwrap(), 1);
+        assert!(shards.which(&ChunkIndices(vec![21, 21])).is_err());
+
+        Ok(())
+    }
+
     #[tokio::test(flavor = "multi_thread")]
     async fn test_repository_with_updates() -> Result<(), Box<dyn std::error::Error>> {
         let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
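
Note on the open question in ManifestShards::which ("MUST NOT overlap ... How do we ensure this?"):
one option is a pairwise disjointness check over the shard extents when the grid is constructed.
The sketch below is illustrative only; it works on plain Range<u32> slices rather than the crate's
ManifestExtents, and the helper names (extents_overlap, shards_are_disjoint) are hypothetical.

    use std::ops::Range;

    // Two n-dimensional boxes overlap only if their half-open ranges intersect
    // in every dimension; sharing an edge (0..10 vs 10..20) does not count.
    fn extents_overlap(a: &[Range<u32>], b: &[Range<u32>]) -> bool {
        a.iter().zip(b.iter()).all(|(ra, rb)| ra.start < rb.end && rb.start < ra.end)
    }

    // True if no two shard extents overlap (simple O(n^2) pairwise check).
    fn shards_are_disjoint(shards: &[Vec<Range<u32>>]) -> bool {
        shards
            .iter()
            .enumerate()
            .all(|(i, a)| shards.iter().skip(i + 1).all(|b| !extents_overlap(a, b)))
    }

    fn main() {
        // Edges [0, 10, 20] x [0, 10, 20] produce four disjoint 2-D shards,
        // mirroring what ManifestShards::from_edges builds in this patch.
        let shards = vec![
            vec![0..10, 0..10],
            vec![0..10, 10..20],
            vec![10..20, 0..10],
            vec![10..20, 10..20],
        ];
        assert!(shards_are_disjoint(&shards));

        // A grid containing an overlapping extent is rejected.
        let bad = vec![vec![0..10, 0..10], vec![5..15, 0..10]];
        assert!(!shards_are_disjoint(&bad));
    }

Extents built by from_edges are disjoint by construction when each edge vector is sorted and
deduplicated, so a check like this would mostly matter if shard extents ever come from
user-supplied configuration.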
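
The shard grid in the first session.rs hunk is currently hard-coded as
from_edges(vec![vec![0, 10, 20], vec![0, 10, 20]]), and the commented-out FIXME in the second
session.rs hunk hints at deriving it from the array shape instead. One possible way to derive the
edge vectors from the per-dimension chunk count and a configured shard width; num_chunks and
shard_width are hypothetical inputs, not names from this patch.

    // Build per-dimension edge vectors [0, w, 2w, ..., n] for ManifestShards::from_edges.
    // Assumes shard_width > 0.
    fn shard_edges(num_chunks: &[u32], shard_width: u32) -> Vec<Vec<u32>> {
        num_chunks
            .iter()
            .map(|&n| {
                let mut edges: Vec<u32> = (0..n).step_by(shard_width as usize).collect();
                edges.push(n); // close the last (possibly partial) shard
                edges
            })
            .collect()
    }

    fn main() {
        // A 25 x 30 chunk grid with 10-chunk-wide shards yields a 3 x 3 shard grid.
        assert_eq!(
            shard_edges(&[25, 30], 10),
            vec![vec![0, 10, 20, 25], vec![0, 10, 20, 30]]
        );
    }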