diff --git a/apis/python/src/tiledb/vector_search/ingestion.py b/apis/python/src/tiledb/vector_search/ingestion.py index 86f8b0c35..60a376426 100644 --- a/apis/python/src/tiledb/vector_search/ingestion.py +++ b/apis/python/src/tiledb/vector_search/ingestion.py @@ -2623,32 +2623,43 @@ def consolidate_and_vacuum( index_group_uri: str, config: Optional[Mapping[str, Any]] = None, ): + """ + Consolidate fragments. Needed because during ingestion we have multiple workers that write + different fragments. + + We don't consolidate CENTROIDS_ARRAY_NAME (and others) because they are only written once. + + We also don't consolidate type-erased indexes because they are only written once. If we add + distributed ingestion we should write a C++ method to consolidate them. + """ with tiledb.Group(index_group_uri) as group: write_group = tiledb.Group(index_group_uri, "w") - try: - if INPUT_VECTORS_ARRAY_NAME in group: - tiledb.Array.delete_array(group[INPUT_VECTORS_ARRAY_NAME].uri) - write_group.remove(INPUT_VECTORS_ARRAY_NAME) - if EXTERNAL_IDS_ARRAY_NAME in group: - tiledb.Array.delete_array(group[EXTERNAL_IDS_ARRAY_NAME].uri) - write_group.remove(EXTERNAL_IDS_ARRAY_NAME) - except tiledb.TileDBError as err: - message = str(err) - if "does not exist" not in message: - raise err - write_group.close() - - modes = ["fragment_meta", "commits", "array_meta"] - for mode in modes: - conf = tiledb.Config(config) - conf["sm.consolidation.mode"] = mode - conf["sm.vacuum.mode"] = mode - ids_uri = group[IDS_ARRAY_NAME].uri - parts_uri = group[PARTS_ARRAY_NAME].uri - tiledb.consolidate(parts_uri, config=conf) - tiledb.vacuum(parts_uri, config=conf) - tiledb.consolidate(ids_uri, config=conf) - tiledb.vacuum(ids_uri, config=conf) + + if not is_type_erased_index(index_type): + try: + if INPUT_VECTORS_ARRAY_NAME in group: + tiledb.Array.delete_array(group[INPUT_VECTORS_ARRAY_NAME].uri) + write_group.remove(INPUT_VECTORS_ARRAY_NAME) + if EXTERNAL_IDS_ARRAY_NAME in group: + tiledb.Array.delete_array(group[EXTERNAL_IDS_ARRAY_NAME].uri) + write_group.remove(EXTERNAL_IDS_ARRAY_NAME) + except tiledb.TileDBError as err: + message = str(err) + if "does not exist" not in message: + raise err + write_group.close() + modes = ["fragment_meta", "commits", "array_meta"] + for mode in modes: + conf = tiledb.Config(config) + conf["sm.consolidation.mode"] = mode + conf["sm.vacuum.mode"] = mode + ids_uri = group[IDS_ARRAY_NAME].uri + parts_uri = group[PARTS_ARRAY_NAME].uri + tiledb.consolidate(parts_uri, config=conf) + tiledb.vacuum(parts_uri, config=conf) + tiledb.consolidate(ids_uri, config=conf) + tiledb.vacuum(ids_uri, config=conf) + partial_write_array_exists = PARTIAL_WRITE_ARRAY_DIR in group if partial_write_array_exists: with tiledb.Group(index_group_uri, "w") as partial_write_array_group: diff --git a/apis/python/src/tiledb/vector_search/ivf_pq_index.py b/apis/python/src/tiledb/vector_search/ivf_pq_index.py index 657108a88..a155e8650 100644 --- a/apis/python/src/tiledb/vector_search/ivf_pq_index.py +++ b/apis/python/src/tiledb/vector_search/ivf_pq_index.py @@ -58,6 +58,7 @@ def __init__( timestamp=timestamp, open_for_remote_query_execution=open_for_remote_query_execution, ) + # TODO(SC-48710): Add support for `open_for_remote_query_execution`. We don't leave `self.index`` as `None` because we need to be able to call index.dimensions(). self.index = vspy.IndexIVFPQ(self.ctx, uri, to_temporal_policy(timestamp)) # TODO(paris): This is incorrect - should be fixed when we fix consolidation. self.db_uri = self.group[ @@ -67,19 +68,9 @@ def __init__( storage_formats[self.storage_version]["IDS_ARRAY_NAME"] ].uri - schema = tiledb.ArraySchema.load(self.db_uri, ctx=tiledb.Ctx(self.config)) self.dimensions = self.index.dimensions() - self.dtype = np.dtype(self.group.meta.get("dtype", None)) - if self.dtype is None: - self.dtype = np.dtype(schema.attr("values").dtype) - else: - self.dtype = np.dtype(self.dtype) - - if self.base_size == -1: - self.size = schema.domain.dim(1).domain[1] + 1 - else: - self.size = self.base_size + self.size = self.base_size def get_dimensions(self): """ diff --git a/apis/python/src/tiledb/vector_search/vamana_index.py b/apis/python/src/tiledb/vector_search/vamana_index.py index 39b6151ae..2f2fd6867 100644 --- a/apis/python/src/tiledb/vector_search/vamana_index.py +++ b/apis/python/src/tiledb/vector_search/vamana_index.py @@ -77,19 +77,9 @@ def __init__( storage_formats[self.storage_version]["IDS_ARRAY_NAME"] ].uri - schema = tiledb.ArraySchema.load(self.db_uri, ctx=tiledb.Ctx(self.config)) self.dimensions = self.index.dimensions() - self.dtype = np.dtype(self.group.meta.get("dtype", None)) - if self.dtype is None: - self.dtype = np.dtype(schema.attr("values").dtype) - else: - self.dtype = np.dtype(self.dtype) - - if self.base_size == -1: - self.size = schema.domain.dim(1).domain[1] + 1 - else: - self.size = self.base_size + self.size = self.base_size def get_dimensions(self): """