Skip to content

Commit

Permalink
Clean up ivf_index() code
Browse files: browse the repository at this point in the history
  • Loading branch information
jparismorgan committed Jul 1, 2024
1 parent 6d908ce commit f4c79ff
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 169 deletions.
150 changes: 53 additions & 97 deletions apis/python/src/tiledb/vector_search/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -1742,37 +1742,25 @@ def ingest_vectors_udf(
)
if source_type == "TILEDB_ARRAY":
logger.debug("Start indexing")
if index_timestamp is None:
ivf_index_tdb(
dtype=vector_type,
db_uri=source_uri,
external_ids_uri=external_ids_uri,
deleted_ids=StdVector_u64(updated_ids),
centroids_uri=centroids_uri,
parts_uri=partial_write_array_parts_uri,
index_array_uri=partial_write_array_index_uri,
id_uri=partial_write_array_ids_uri,
start=part,
end=part_end,
nthreads=threads,
config=config,
)
else:
ivf_index_tdb(
dtype=vector_type,
db_uri=source_uri,
external_ids_uri=external_ids_uri,
deleted_ids=StdVector_u64(updated_ids),
centroids_uri=centroids_uri,
parts_uri=partial_write_array_parts_uri,
index_array_uri=partial_write_array_index_uri,
id_uri=partial_write_array_ids_uri,
start=part,
end=part_end,
nthreads=threads,
timestamp=index_timestamp,
config=config,
)
ivf_index_tdb(
dtype=vector_type,
db_uri=source_uri,
external_ids_uri=external_ids_uri,
deleted_ids=StdVector_u64(updated_ids),
centroids_uri=centroids_uri,
parts_uri=partial_write_array_parts_uri,
index_array_uri=partial_write_array_index_uri,
id_uri=partial_write_array_ids_uri,
start=part,
end=part_end,
nthreads=threads,
**(
{"timestamp": index_timestamp}
if index_timestamp is not None
else {}
),
config=config,
)
else:
in_vectors = read_input_vectors(
source_uri=source_uri,
Expand All @@ -1795,41 +1783,25 @@ def ingest_vectors_udf(
trace_id=trace_id,
)
logger.debug("Start indexing")
if index_timestamp is None:
ivf_index(
dtype=vector_type,
db=array_to_matrix(
np.transpose(in_vectors).astype(vector_type)
),
external_ids=StdVector_u64(external_ids),
deleted_ids=StdVector_u64(updated_ids),
centroids_uri=centroids_uri,
parts_uri=partial_write_array_parts_uri,
index_array_uri=partial_write_array_index_uri,
id_uri=partial_write_array_ids_uri,
start=part,
end=part_end,
nthreads=threads,
config=config,
)
else:
ivf_index(
dtype=vector_type,
db=array_to_matrix(
np.transpose(in_vectors).astype(vector_type)
),
external_ids=StdVector_u64(external_ids),
deleted_ids=StdVector_u64(updated_ids),
centroids_uri=centroids_uri,
parts_uri=partial_write_array_parts_uri,
index_array_uri=partial_write_array_index_uri,
id_uri=partial_write_array_ids_uri,
start=part,
end=part_end,
nthreads=threads,
timestamp=index_timestamp,
config=config,
)
ivf_index(
dtype=vector_type,
db=array_to_matrix(np.transpose(in_vectors).astype(vector_type)),
external_ids=StdVector_u64(external_ids),
deleted_ids=StdVector_u64(updated_ids),
centroids_uri=centroids_uri,
parts_uri=partial_write_array_parts_uri,
index_array_uri=partial_write_array_index_uri,
id_uri=partial_write_array_ids_uri,
start=part,
end=part_end,
nthreads=threads,
**(
{"timestamp": index_timestamp}
if index_timestamp is not None
else {}
),
config=config,
)

def ingest_additions_udf(
index_group_uri: str,
Expand Down Expand Up @@ -1870,37 +1842,21 @@ def ingest_additions_udf(
return

logger.debug(f"Ingesting additions {partial_write_array_index_uri}")
if index_timestamp is None:
ivf_index(
dtype=vector_type,
db=array_to_matrix(np.transpose(additions_vectors).astype(vector_type)),
external_ids=StdVector_u64(additions_external_ids),
deleted_ids=StdVector_u64(np.array([], np.uint64)),
centroids_uri=centroids_uri,
parts_uri=partial_write_array_parts_uri,
index_array_uri=partial_write_array_index_uri,
id_uri=partial_write_array_ids_uri,
start=write_offset,
end=0,
nthreads=threads,
config=config,
)
else:
ivf_index(
dtype=vector_type,
db=array_to_matrix(np.transpose(additions_vectors).astype(vector_type)),
external_ids=StdVector_u64(additions_external_ids),
deleted_ids=StdVector_u64(np.array([], np.uint64)),
centroids_uri=centroids_uri,
parts_uri=partial_write_array_parts_uri,
index_array_uri=partial_write_array_index_uri,
id_uri=partial_write_array_ids_uri,
start=write_offset,
end=0,
nthreads=threads,
timestamp=index_timestamp,
config=config,
)
ivf_index(
dtype=vector_type,
db=array_to_matrix(np.transpose(additions_vectors).astype(vector_type)),
external_ids=StdVector_u64(additions_external_ids),
deleted_ids=StdVector_u64(np.array([], np.uint64)),
centroids_uri=centroids_uri,
parts_uri=partial_write_array_parts_uri,
index_array_uri=partial_write_array_index_uri,
id_uri=partial_write_array_ids_uri,
start=write_offset,
end=0,
nthreads=threads,
**({"timestamp": index_timestamp} if index_timestamp is not None else {}),
config=config,
)

def compute_partition_indexes_udf(
index_group_uri: str,
Expand Down
Loading

0 comments on commit f4c79ff

Please sign in to comment.