-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Begin Manifest Sharding #767
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,7 +4,7 @@ use crate::format::flatbuffers::gen; | |||||
use bytes::Bytes; | ||||||
use flatbuffers::VerifierOptions; | ||||||
use futures::{Stream, TryStreamExt}; | ||||||
use itertools::Itertools; | ||||||
use itertools::{multiunzip, repeat_n, Itertools}; | ||||||
use serde::{Deserialize, Serialize}; | ||||||
use thiserror::Error; | ||||||
|
||||||
|
@@ -21,12 +21,6 @@ use super::{ | |||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] | ||||||
pub struct ManifestExtents(Vec<Range<u32>>); | ||||||
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] | ||||||
pub struct ManifestRef { | ||||||
pub object_id: ManifestId, | ||||||
pub extents: ManifestExtents, | ||||||
} | ||||||
|
||||||
impl ManifestExtents { | ||||||
pub fn new(from: &[u32], to: &[u32]) -> Self { | ||||||
let v = from | ||||||
|
@@ -37,9 +31,77 @@ impl ManifestExtents { | |||||
Self(v) | ||||||
} | ||||||
|
||||||
pub fn contains(&self, coord: &[u32]) -> bool { | ||||||
self.iter().zip(coord.iter()).all(|(range, that)| range.contains(that)) | ||||||
} | ||||||
|
||||||
pub fn iter(&self) -> impl Iterator<Item = &Range<u32>> { | ||||||
self.0.iter() | ||||||
} | ||||||
|
||||||
pub fn len(&self) -> usize { | ||||||
self.0.len() | ||||||
} | ||||||
} | ||||||
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] | ||||||
pub struct ManifestRef { | ||||||
pub object_id: ManifestId, | ||||||
pub extents: ManifestExtents, | ||||||
} | ||||||
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)] | ||||||
pub struct ManifestShards(Vec<ManifestExtents>); | ||||||
|
||||||
impl ManifestShards { | ||||||
pub fn default(ndim: usize) -> Self { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like this, but it is certainly tied to |
||||||
Self(vec![ManifestExtents(repeat_n(0..u32::MAX, ndim).collect())]) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess I can do
Suggested change
|
||||||
} | ||||||
|
||||||
pub fn from_edges(iter: impl IntoIterator<Item = Vec<u32>>) -> Self { | ||||||
let res = iter | ||||||
.into_iter() | ||||||
.map(|x| x.into_iter().tuple_windows()) | ||||||
.multi_cartesian_product() | ||||||
.map(|x| multiunzip(x)) | ||||||
.map(|(from, to): (Vec<u32>, Vec<u32>)| { | ||||||
ManifestExtents::new(from.as_slice(), to.as_slice()) | ||||||
}); | ||||||
Self(res.collect()) | ||||||
} | ||||||
|
||||||
// Returns the index of shard_range that includes ChunkIndices | ||||||
// This can be used at write time to split manifests based on the config | ||||||
// and at read time to choose which manifest to query for chunk payload | ||||||
pub fn which(&self, coord: &ChunkIndices) -> Result<usize, IcechunkFormatError> { | ||||||
// shard_range[i] must bound ChunkIndices | ||||||
// 0 <= return value <= shard_range.len() | ||||||
// it is possible that shard_range does not include a coord. say we have 2x2 shard grid | ||||||
// but only shard (0,0) and shard (1,1) are populated with data. | ||||||
// A coord located in (1, 0) should return Err | ||||||
// Since shard_range need not form a regular grid, we must iterate through and find the first result. | ||||||
// ManifestExtents in shard_range MUST NOT overlap with each other. How do we ensure this? | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably should have a better datastructure to maintain this invariant. |
||||||
// ndim must be the same | ||||||
// debug_assert_eq!(coord.0.len(), shard_range[0].len()); | ||||||
// FIXME: could optimize for unbounded single manifest | ||||||
self.iter() | ||||||
.enumerate() | ||||||
.find(|(_, e)| e.contains(coord.0.as_slice())) | ||||||
.map(|(i, _)| i as usize) | ||||||
.ok_or(IcechunkFormatError::from( | ||||||
IcechunkFormatErrorKind::InvalidIndexForSharding { | ||||||
coords: coord.clone(), | ||||||
}, | ||||||
)) | ||||||
} | ||||||
|
||||||
pub fn iter(&self) -> impl Iterator<Item = &ManifestExtents> { | ||||||
self.0.iter() | ||||||
} | ||||||
|
||||||
pub fn len(&self) -> usize { | ||||||
self.0.len() | ||||||
} | ||||||
} | ||||||
|
||||||
#[derive(Debug, Error)] | ||||||
|
@@ -204,7 +266,7 @@ impl Manifest { | |||||
} | ||||||
|
||||||
if array_manifests.is_empty() { | ||||||
// empty manifet | ||||||
// empty manifest | ||||||
return Ok(None); | ||||||
} | ||||||
|
||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to start checking on writes that indexes have the proper size for the metadata