[WIP] Begin Manifest Sharding #767

Draft · wants to merge 1 commit into main
78 changes: 70 additions & 8 deletions icechunk/src/format/manifest.rs
@@ -4,7 +4,7 @@ use crate::format::flatbuffers::gen;
use bytes::Bytes;
use flatbuffers::VerifierOptions;
use futures::{Stream, TryStreamExt};
use itertools::Itertools;
use itertools::{multiunzip, repeat_n, Itertools};
use serde::{Deserialize, Serialize};
use thiserror::Error;

@@ -21,12 +21,6 @@ use super::{
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestExtents(Vec<Range<u32>>);

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestRef {
pub object_id: ManifestId,
pub extents: ManifestExtents,
}

impl ManifestExtents {
pub fn new(from: &[u32], to: &[u32]) -> Self {
let v = from
Expand All @@ -37,9 +31,77 @@ impl ManifestExtents {
Self(v)
}

pub fn contains(&self, coord: &[u32]) -> bool {
self.iter().zip(coord.iter()).all(|(range, that)| range.contains(that))
Collaborator: We need to start checking on writes that indexes have the proper size for the metadata.
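A minimal sketch of the kind of write-time check that comment asks for, using an assumed free-standing helper that is not part of this diff:

// Illustration only: a coordinate can only be validated against a set of
// extents if it has the same number of dimensions.
fn coord_matches_ndim(extents: &[std::ops::Range<u32>], coord: &[u32]) -> bool {
    extents.len() == coord.len()
}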

}

pub fn iter(&self) -> impl Iterator<Item = &Range<u32>> {
self.0.iter()
}

pub fn len(&self) -> usize {
self.0.len()
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestRef {
pub object_id: ManifestId,
pub extents: ManifestExtents,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestShards(Vec<ManifestExtents>);

impl ManifestShards {
pub fn default(ndim: usize) -> Self {
Author: I don't like this, but it is certainly tied to ndim.

Self(vec![ManifestExtents(repeat_n(0..u32::MAX, ndim).collect())])
Author: I guess I can do

Suggested change:
- Self(vec![ManifestExtents(repeat_n(0..u32::MAX, ndim).collect())])
+ Self(vec![ManifestExtents(repeat_n(0.., ndim).collect())])

}
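For illustration, this default yields a single shard covering the entire index space: for ndim = 2 it expands to ManifestShards(vec![ManifestExtents(vec![0..u32::MAX, 0..u32::MAX])]).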

pub fn from_edges(iter: impl IntoIterator<Item = Vec<u32>>) -> Self {
let res = iter
.into_iter()
.map(|x| x.into_iter().tuple_windows())
.multi_cartesian_product()
.map(|x| multiunzip(x))
.map(|(from, to): (Vec<u32>, Vec<u32>)| {
ManifestExtents::new(from.as_slice(), to.as_slice())
});
Self(res.collect())
}
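As a worked example (derived from the code above, not part of the diff): edges of vec![vec![0, 10, 20], vec![0, 10, 20]] produce the four extents [0..10, 0..10], [0..10, 10..20], [10..20, 0..10] and [10..20, 10..20], i.e. a 2x2 shard grid.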

// Returns the index of shard_range that includes ChunkIndices
// This can be used at write time to split manifests based on the config
// and at read time to choose which manifest to query for chunk payload
pub fn which(&self, coord: &ChunkIndices) -> Result<usize, IcechunkFormatError> {
// shard_range[i] must bound ChunkIndices
// 0 <= return value < shard_range.len()
// it is possible that shard_range does not include a coord. say we have 2x2 shard grid
// but only shard (0,0) and shard (1,1) are populated with data.
// A coord located in (1, 0) should return Err
// Since shard_range need not form a regular grid, we must iterate through and find the first result.
// ManifestExtents in shard_range MUST NOT overlap with each other. How do we ensure this?
Collaborator: We probably should have a better datastructure to maintain this invariant.
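One way to state that invariant as a check, sketched here with an assumed free function (nothing equivalent exists in this PR): two extents conflict only if their ranges intersect on every axis.

// Illustration only: true if two equal-dimension extents overlap.
fn extents_overlap(a: &[std::ops::Range<u32>], b: &[std::ops::Range<u32>]) -> bool {
    a.len() == b.len()
        && a.iter().zip(b.iter()).all(|(ra, rb)| ra.start < rb.end && rb.start < ra.end)
}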

// ndim must be the same
// debug_assert_eq!(coord.0.len(), shard_range[0].len());
// FIXME: could optimize for unbounded single manifest
self.iter()
.enumerate()
.find(|(_, e)| e.contains(coord.0.as_slice()))
.map(|(i, _)| i as usize)
.ok_or(IcechunkFormatError::from(
IcechunkFormatErrorKind::InvalidIndexForSharding {
coords: coord.clone(),
},
))
}

pub fn iter(&self) -> impl Iterator<Item = &ManifestExtents> {
self.0.iter()
}

pub fn len(&self) -> usize {
self.0.len()
}
}

#[derive(Debug, Error)]
@@ -204,7 +266,7 @@ impl Manifest {
}

if array_manifests.is_empty() {
// empty manifet
// empty manifest
return Ok(None);
}

2 changes: 2 additions & 0 deletions icechunk/src/format/mod.rs
@@ -245,6 +245,8 @@ pub enum IcechunkFormatErrorKind {
NodeNotFound { path: Path },
#[error("chunk coordinates not found `{coords:?}`")]
ChunkCoordinatesNotFound { coords: ChunkIndices },
#[error("invalid chunk index for sharding manifests: {coords:?}")]
InvalidIndexForSharding { coords: ChunkIndices },
#[error("manifest information cannot be found in snapshot `{manifest_id}`")]
ManifestInfoNotFound { manifest_id: ManifestId },
#[error("invalid magic numbers in file")]
3 changes: 3 additions & 0 deletions icechunk/src/format/snapshot.rs
@@ -37,6 +37,9 @@ impl DimensionShape {
pub struct ArrayShape(Vec<DimensionShape>);

impl ArrayShape {
pub fn len(&self) -> usize {
self.0.len()
}
pub fn new<I>(it: I) -> Option<Self>
where
I: IntoIterator<Item = (u64, u64)>,
101 changes: 77 additions & 24 deletions icechunk/src/session.rs
@@ -27,7 +27,7 @@ use crate::{
format::{
manifest::{
ChunkInfo, ChunkPayload, ChunkRef, Manifest, ManifestExtents, ManifestRef,
VirtualChunkLocation, VirtualChunkRef, VirtualReferenceError,
ManifestShards, VirtualChunkLocation, VirtualChunkRef, VirtualReferenceError,
VirtualReferenceErrorKind,
},
snapshot::{
@@ -1376,32 +1376,58 @@ impl<'a> FlushProcess<'a> {
) -> SessionResult<()> {
let mut from = vec![];
let mut to = vec![];
let chunks = stream::iter(
self.change_set
.new_array_chunk_iterator(node_id, node_path)
.map(Ok::<ChunkInfo, Infallible>),
);
let chunks = aggregate_extents(&mut from, &mut to, chunks, |ci| &ci.coord);

if let Some(new_manifest) = Manifest::from_stream(chunks).await.unwrap() {
let new_manifest = Arc::new(new_manifest);
let new_manifest_size =
self.asset_manager.write_manifest(Arc::clone(&new_manifest)).await?;

let file_info =
ManifestFileInfo::new(new_manifest.as_ref(), new_manifest_size);
self.manifest_files.insert(file_info);
let mut chunks = self.change_set.new_array_chunk_iterator(node_id, node_path);

let shards = ManifestShards::from_edges(vec![vec![0, 10, 20], vec![0, 10, 20]]);

let mut sharded_refs: HashMap<usize, Vec<_>> = HashMap::new();
sharded_refs.reserve(shards.len());
// TODO: what is a good capacity
let ref_capacity = 8_192;
sharded_refs.insert(0, Vec::with_capacity(ref_capacity));
while let Some(chunk) = chunks.next() {
let shard_index = shards.which(&chunk.coord).unwrap();
sharded_refs
.entry(shard_index)
.or_insert_with(|| Vec::with_capacity(ref_capacity))
.push(chunk);
}

let new_ref = ManifestRef {
object_id: new_manifest.id().clone(),
extents: ManifestExtents::new(&from, &to),
};
for i in 0..shards.len() {
if let Some(shard_chunks) = sharded_refs.remove(&i) {
let shard_chunks = stream::iter(
shard_chunks.into_iter().map(Ok::<ChunkInfo, Infallible>),
);
let shard_chunks =
aggregate_extents(&mut from, &mut to, shard_chunks, |ci| &ci.coord);

if let Some(new_manifest) =
Manifest::from_stream(shard_chunks).await.unwrap()
{
let new_manifest = Arc::new(new_manifest);
let new_manifest_size = self
.asset_manager
.write_manifest(Arc::clone(&new_manifest))
.await?;

let file_info =
ManifestFileInfo::new(new_manifest.as_ref(), new_manifest_size);
self.manifest_files.insert(file_info);

let new_ref = ManifestRef {
object_id: new_manifest.id().clone(),
extents: ManifestExtents::new(&from, &to),
};

self.manifest_refs
.entry(node_id.clone())
.and_modify(|v| v.push(new_ref.clone()))
.or_insert_with(|| vec![new_ref]);
self.manifest_refs
.entry(node_id.clone())
.and_modify(|v| v.push(new_ref.clone()))
.or_insert_with(|| vec![new_ref]);
}
}
}

Ok(())
}

@@ -1425,6 +1451,15 @@ impl<'a> FlushProcess<'a> {
let updated_chunks =
aggregate_extents(&mut from, &mut to, updated_chunks, |ci| &ci.coord);

// // FIXME
// let shards = {
// if let NodeData::Array { shape, .. } = node.node_data.clone() {
// ManifestShards::default(shape.len())
// } else {
// todo!()
// }
// };

if let Some(new_manifest) = Manifest::from_stream(updated_chunks).await? {
let new_manifest = Arc::new(new_manifest);
let new_manifest_size =
@@ -1742,7 +1777,7 @@ mod tests {
basic_solver::{BasicConflictSolver, VersionSelection},
detector::ConflictDetector,
},
format::manifest::ManifestExtents,
format::manifest::{ManifestExtents, ManifestShards},
refs::{fetch_tag, Ref},
repository::VersionInfo,
storage::new_in_memory_storage,
@@ -1938,6 +1973,24 @@
prop_assert_eq!(to, expected_to);
}

#[tokio::test]
async fn test_which_shard() -> Result<(), Box<dyn Error>> {
let shards = ManifestShards::from_edges(vec![vec![0, 10, 20]]);

assert_eq!(shards.which(&ChunkIndices(vec![1])).unwrap(), 0);
assert_eq!(shards.which(&ChunkIndices(vec![11])).unwrap(), 1);

let edges = vec![vec![0, 10, 20], vec![0, 10, 20]];

let shards = ManifestShards::from_edges(edges);
assert_eq!(shards.which(&ChunkIndices(vec![1, 1])).unwrap(), 0);
assert_eq!(shards.which(&ChunkIndices(vec![1, 10])).unwrap(), 1);
assert_eq!(shards.which(&ChunkIndices(vec![1, 11])).unwrap(), 1);
assert!(shards.which(&ChunkIndices(vec![21, 21])).is_err());

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_repository_with_updates() -> Result<(), Box<dyn Error>> {
let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;