Skip to content

Commit

Permalink
For type-erased Python indexes, 1) Don't consolidate parts and ids Arrays 2) Avoid extra Schema open in constructor (#444)
Browse files Browse the repository at this point in the history
  • Loading branch information
jparismorgan authored Jul 12, 2024
1 parent ed04aee commit 4d743b5
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 46 deletions.
59 changes: 35 additions & 24 deletions apis/python/src/tiledb/vector_search/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -2623,32 +2623,43 @@ def consolidate_and_vacuum(
index_group_uri: str,
config: Optional[Mapping[str, Any]] = None,
):
"""
Consolidate fragments. Needed because during ingestion we have multiple workers that write
different fragments.
We don't consolidate CENTROIDS_ARRAY_NAME (and others) because they are only written once.
We also don't consolidate type-erased indexes because they are only written once. If we add
distributed ingestion we should write a C++ method to consolidate them.
"""
with tiledb.Group(index_group_uri) as group:
write_group = tiledb.Group(index_group_uri, "w")
try:
if INPUT_VECTORS_ARRAY_NAME in group:
tiledb.Array.delete_array(group[INPUT_VECTORS_ARRAY_NAME].uri)
write_group.remove(INPUT_VECTORS_ARRAY_NAME)
if EXTERNAL_IDS_ARRAY_NAME in group:
tiledb.Array.delete_array(group[EXTERNAL_IDS_ARRAY_NAME].uri)
write_group.remove(EXTERNAL_IDS_ARRAY_NAME)
except tiledb.TileDBError as err:
message = str(err)
if "does not exist" not in message:
raise err
write_group.close()

modes = ["fragment_meta", "commits", "array_meta"]
for mode in modes:
conf = tiledb.Config(config)
conf["sm.consolidation.mode"] = mode
conf["sm.vacuum.mode"] = mode
ids_uri = group[IDS_ARRAY_NAME].uri
parts_uri = group[PARTS_ARRAY_NAME].uri
tiledb.consolidate(parts_uri, config=conf)
tiledb.vacuum(parts_uri, config=conf)
tiledb.consolidate(ids_uri, config=conf)
tiledb.vacuum(ids_uri, config=conf)

if not is_type_erased_index(index_type):
try:
if INPUT_VECTORS_ARRAY_NAME in group:
tiledb.Array.delete_array(group[INPUT_VECTORS_ARRAY_NAME].uri)
write_group.remove(INPUT_VECTORS_ARRAY_NAME)
if EXTERNAL_IDS_ARRAY_NAME in group:
tiledb.Array.delete_array(group[EXTERNAL_IDS_ARRAY_NAME].uri)
write_group.remove(EXTERNAL_IDS_ARRAY_NAME)
except tiledb.TileDBError as err:
message = str(err)
if "does not exist" not in message:
raise err
write_group.close()
modes = ["fragment_meta", "commits", "array_meta"]
for mode in modes:
conf = tiledb.Config(config)
conf["sm.consolidation.mode"] = mode
conf["sm.vacuum.mode"] = mode
ids_uri = group[IDS_ARRAY_NAME].uri
parts_uri = group[PARTS_ARRAY_NAME].uri
tiledb.consolidate(parts_uri, config=conf)
tiledb.vacuum(parts_uri, config=conf)
tiledb.consolidate(ids_uri, config=conf)
tiledb.vacuum(ids_uri, config=conf)

partial_write_array_exists = PARTIAL_WRITE_ARRAY_DIR in group
if partial_write_array_exists:
with tiledb.Group(index_group_uri, "w") as partial_write_array_group:
Expand Down
13 changes: 2 additions & 11 deletions apis/python/src/tiledb/vector_search/ivf_pq_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(
timestamp=timestamp,
open_for_remote_query_execution=open_for_remote_query_execution,
)
# TODO(SC-48710): Add support for `open_for_remote_query_execution`. We don't leave `self.index` as `None` because we need to be able to call `index.dimensions()`.
self.index = vspy.IndexIVFPQ(self.ctx, uri, to_temporal_policy(timestamp))
# TODO(paris): This is incorrect - should be fixed when we fix consolidation.
self.db_uri = self.group[
Expand All @@ -67,19 +68,9 @@ def __init__(
storage_formats[self.storage_version]["IDS_ARRAY_NAME"]
].uri

schema = tiledb.ArraySchema.load(self.db_uri, ctx=tiledb.Ctx(self.config))
self.dimensions = self.index.dimensions()

self.dtype = np.dtype(self.group.meta.get("dtype", None))
if self.dtype is None:
self.dtype = np.dtype(schema.attr("values").dtype)
else:
self.dtype = np.dtype(self.dtype)

if self.base_size == -1:
self.size = schema.domain.dim(1).domain[1] + 1
else:
self.size = self.base_size
self.size = self.base_size

def get_dimensions(self):
"""
Expand Down
12 changes: 1 addition & 11 deletions apis/python/src/tiledb/vector_search/vamana_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,9 @@ def __init__(
storage_formats[self.storage_version]["IDS_ARRAY_NAME"]
].uri

schema = tiledb.ArraySchema.load(self.db_uri, ctx=tiledb.Ctx(self.config))
self.dimensions = self.index.dimensions()

self.dtype = np.dtype(self.group.meta.get("dtype", None))
if self.dtype is None:
self.dtype = np.dtype(schema.attr("values").dtype)
else:
self.dtype = np.dtype(self.dtype)

if self.base_size == -1:
self.size = schema.domain.dim(1).domain[1] + 1
else:
self.size = self.base_size
self.size = self.base_size

def get_dimensions(self):
"""
Expand Down

0 comments on commit 4d743b5

Please sign in to comment.