Updates for C++ k-means integration
Squashed from: #147
pick 697c481 Parameterize min heap with comparison function [skip ci]
pick e5a797a Debug zero cluster fix [skip ci]
pick d085f66 Uncomment debug statements [skip ci]
pick 6b07f17 Initial partition-equalization
pick 9867c90 Updates for kmeans and kmeans++
pick ff71e87 Small update
pick 9176a54 clean up warnings, clang-format
pick e5e3690 Add documentation, update unit tests
pick 76b2fe8 Replace std::abs<size_t> with std::labs
pick 28651e6 Supress std::labs warnings
pick f156dfa Small bug fix in predict
pick d8b0871 clang-format [skip ci]
pick ae5fca4 Add documentation, verify build
lums658 authored and ihnorton committed Nov 14, 2023
1 parent 7bbcaa0 commit c394d0b
Showing 45 changed files with 1,578 additions and 777 deletions.
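
Several of the squashed commits above (e.g. 9867c90, 6b07f17) concern k-means and kmeans++ seeding. For context, kmeans++ picks each new center with probability proportional to its squared distance from the nearest center already chosen; the sketch below is a minimal NumPy illustration of that idea only, and is not the repository's C++ implementation (which is not part of the diff shown here).

```python
import numpy as np

def kmeans_pp_seed(data: np.ndarray, k: int, seed: int = 0) -> np.ndarray:
    """Illustrative kmeans++ seeding (not the library's C++ implementation)."""
    rng = np.random.default_rng(seed)
    centers = [data[rng.integers(len(data))]]
    for _ in range(k - 1):
        # Squared distance of every point to its nearest chosen center.
        d2 = ((data[:, None, :] - np.asarray(centers)[None, :, :]) ** 2).sum(-1).min(axis=1)
        # Sample the next center with probability proportional to that distance.
        centers.append(data[rng.choice(len(data), p=d2 / d2.sum())])
    return np.asarray(centers)

# Example: seed 4 centers from 100 random 8-dimensional float32 vectors.
centers = kmeans_pp_seed(np.random.rand(100, 8).astype(np.float32), k=4)
```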
1 change: 1 addition & 0 deletions .github/workflows/build_wheels.yml
@@ -4,6 +4,7 @@ on:
push:
branches:
- release-*
- '*wheel*' # must quote since "*" is a YAML reserved character; we want a string
tags:
- '*'
pull_request:
4 changes: 4 additions & 0 deletions .github/workflows/ci_python.yml
@@ -35,4 +35,8 @@ jobs:
#pip uninstall -y tiledb.vector_search
#pip install -e .
#pytest
pip install -r test/ipynb/requirements.txt
pytest --nbmake test/ipynb
env:
TILEDB_REST_TOKEN: ${{ secrets.TILEDB_CLOUD_HELPER_VAR }}
shell: bash -el {0}
2 changes: 1 addition & 1 deletion Dockerfile
@@ -8,7 +8,7 @@ RUN conda config --prepend channels conda-forge
# Install mamba for faster installations
RUN conda install mamba

RUN mamba install -y -c tiledb 'tiledb>=2.16,<2.17' tiledb-py cmake pybind11 pytest c-compiler cxx-compiler ninja openblas-devel "pip>22"
RUN mamba install -y -c tiledb 'tiledb>=2.17,<2.18' tiledb-py cmake pybind11 pytest c-compiler cxx-compiler ninja openblas-devel "pip>22"

COPY . TileDB-Vector-Search/

12 changes: 12 additions & 0 deletions README.md
@@ -42,3 +42,15 @@ development-build instructions. For large new
features, please open an issue to discuss goals and approach in order
to ensure a smooth PR integration and review process. All contributions
must be licensed under the repository's [MIT License](../LICENSE).

# Testing

* Unit tests: `pytest`
* Demo notebooks:
* ```
pip install -r test/ipynb/requirements.txt
pytest --nbmake test/ipynb
```
* Credentials:
  * Some tests run on TileDB Cloud and read the `TILEDB_REST_TOKEN` environment variable from your current environment; you will need a valid API token for these tests to pass (see the sketch below)
* For continuous integration, the token is configured for the `unittest` user and all tests should pass
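
For local runs, the cloud-backed tests can be gated on that token. The pattern below is an assumed illustration only; the actual skip logic in the test suite is not shown in this diff and may differ.

```python
import os

import pytest

# Assumed pattern, not repository code: skip cloud-backed tests when no token is set.
TOKEN = os.environ.get("TILEDB_REST_TOKEN")

@pytest.mark.skipif(not TOKEN, reason="TILEDB_REST_TOKEN is not set")
def test_cloud_roundtrip():
    ...  # would create and query an index on TileDB Cloud
```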
4 changes: 2 additions & 2 deletions apis/python/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "tiledb-vector-search"
version = "0.0.10"
version = "0.0.14"
#dynamic = ["version"]
description = "TileDB Vector Search Python client"
license = { text = "MIT" }
@@ -26,7 +26,7 @@ dependencies = [
]

[project.optional-dependencies]
test = ["pytest"]
test = ["nbmake", "pytest"]


[project.urls]
11 changes: 7 additions & 4 deletions apis/python/src/tiledb/vector_search/flat_index.py
@@ -36,10 +36,13 @@ def __init__(
ctx=self.ctx,
config=config,
)
self.ids_uri = self.group[
storage_formats[self.storage_version]["IDS_ARRAY_NAME"] + self.index_version
].uri
if tiledb.array_exists(self.ids_uri, self.ctx):

# Check for the existence of the ids array. Previous versions did not use external_ids during
# ingestion, assuming instead that an external_id was the position of the vector in the array.
if storage_formats[self.storage_version]["IDS_ARRAY_NAME"] + self.index_version in self.group:
self.ids_uri = self.group[
storage_formats[self.storage_version]["IDS_ARRAY_NAME"] + self.index_version
].uri
self._ids = read_vector_u64(self.ctx, self.ids_uri, 0, 0)
else:
self._ids = StdVector_u64(np.arange(self.size).astype(np.uint64))
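
In other words, for indexes ingested before external ids were stored, the default ids are simply the vector positions; a tiny illustration of that fallback (not repository code):

```python
import numpy as np

# Illustrative only: with no IDS array present, external ids default to the
# vector positions 0..size-1, so query results report positions as ids.
size = 4
default_ids = np.arange(size).astype(np.uint64)
print(default_ids)  # [0 1 2 3]
```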
148 changes: 99 additions & 49 deletions apis/python/src/tiledb/vector_search/index.py
@@ -1,3 +1,5 @@
import concurrent.futures as futures
import os
import numpy as np
import sys

@@ -20,6 +20,7 @@ class Index:
config: Optional[Mapping[str, Any]]
config dictionary, defaults to None
"""

def __init__(
self,
uri: str,
@@ -36,16 +39,28 @@ def __init__(
self.storage_version = self.group.meta.get("storage_version", "0.1")
self.update_arrays_uri = None
self.index_version = self.group.meta.get("index_version", "")

self.thread_executor = futures.ThreadPoolExecutor()

def query(self, queries: np.ndarray, k, **kwargs):
updated_ids = set(self.read_updated_ids())
retrieval_k = k
if len(updated_ids) > 0:
retrieval_k = 2*k
internal_results_d, internal_results_i = self.query_internal(queries, retrieval_k, **kwargs)
if self.update_arrays_uri is None:
return internal_results_d[:, 0:k], internal_results_i[:, 0:k]
return self.query_internal(queries, k, **kwargs)

# Query with updates
# Perform the queries in parallel
retrieval_k = 2 * k
kwargs["nthreads"] = int(os.cpu_count() / 2)
future = self.thread_executor.submit(
Index.query_additions,
queries,
k,
self.dtype,
self.update_arrays_uri,
int(os.cpu_count() / 2),
)
internal_results_d, internal_results_i = self.query_internal(
queries, retrieval_k, **kwargs
)
addition_results_d, addition_results_i, updated_ids = future.result()

# Filter updated vectors
query_id = 0
@@ -55,119 +70,137 @@ def query(self, queries: np.ndarray, k, **kwargs):
if res in updated_ids:
internal_results_d[query_id, res_id] = MAX_FLOAT_32
internal_results_i[query_id, res_id] = MAX_UINT64
if (
internal_results_d[query_id, res_id] == 0
and internal_results_i[query_id, res_id] == 0
):
internal_results_d[query_id, res_id] = MAX_FLOAT_32
internal_results_i[query_id, res_id] = MAX_UINT64
res_id += 1
query_id += 1
sort_index = np.argsort(internal_results_d, axis=1)
internal_results_d = np.take_along_axis(internal_results_d, sort_index, axis=1)
internal_results_i = np.take_along_axis(internal_results_i, sort_index, axis=1)

# Merge update results
addition_results_d, addition_results_i = self.query_additions(queries, k)
if addition_results_d is None:
return internal_results_d[:, 0:k], internal_results_i[:, 0:k]

query_id = 0
for query in addition_results_d:
res_id = 0
for res in query:
if addition_results_d[query_id, res_id] == 0 and addition_results_i[query_id, res_id] == 0:
if (
addition_results_d[query_id, res_id] == 0
and addition_results_i[query_id, res_id] == 0
):
addition_results_d[query_id, res_id] = MAX_FLOAT_32
addition_results_i[query_id, res_id] = MAX_UINT64
res_id += 1
query_id += 1


results_d = np.hstack((internal_results_d, addition_results_d))
results_i = np.hstack((internal_results_i, addition_results_i))
sort_index = np.argsort(results_d, axis=1)
results_d = np.take_along_axis(results_d, sort_index, axis=1)
results_i = np.take_along_axis(results_i, sort_index, axis=1)
return results_d[:, 0:k], results_i[:, 0:k]

def query_internal(self, queries: np.ndarray, k, **kwargs):
raise NotImplementedError

def query_additions(self, queries: np.ndarray, k):
@staticmethod
def query_additions(
queries: np.ndarray, k, dtype, update_arrays_uri, nthreads=8
):
assert queries.dtype == np.float32
additions_vectors, additions_external_ids = self.read_additions()
additions_vectors, additions_external_ids, updated_ids = Index.read_additions(
update_arrays_uri
)
if additions_vectors is None:
return None, None
return None, None, updated_ids

queries_m = array_to_matrix(np.transpose(queries))
d, i = query_vq_heap_pyarray(
array_to_matrix(np.transpose(additions_vectors).astype(self.dtype)),
array_to_matrix(np.transpose(additions_vectors).astype(dtype)),
queries_m,
StdVector_u64(additions_external_ids),
k,
8)
return np.transpose(np.array(d)), np.transpose(np.array(i))
nthreads,
)
return np.transpose(np.array(d)), np.transpose(np.array(i)), updated_ids

@staticmethod
def read_additions(update_arrays_uri) -> (np.ndarray, np.array, np.array):
if update_arrays_uri is None:
return None, None, np.array([], np.uint64)
updates_array = tiledb.open(update_arrays_uri, mode="r")
q = updates_array.query(attrs=("vector",), coords=True)
data = q[:]
updates_array.close()
updated_ids = data["external_id"]
additions_filter = [len(item) > 0 for item in data["vector"]]
if len(data["external_id"][additions_filter]) > 0:
return (
np.vstack(data["vector"][additions_filter]),
data["external_id"][additions_filter],
updated_ids
)
else:
return None, None, updated_ids

def query_internal(self, queries: np.ndarray, k, **kwargs):
raise NotImplementedError

def update(self, vector: np.array, external_id: np.uint64):
updates_array = self.open_updates_array()
vectors = np.empty((1), dtype='O')
vectors = np.empty((1), dtype="O")
vectors[0] = vector
updates_array[external_id] = {'vector': vectors}
updates_array[external_id] = {"vector": vectors}
updates_array.close()
self.consolidate_update_fragments()

def update_batch(self, vectors: np.ndarray, external_ids: np.array):
updates_array = self.open_updates_array()
updates_array[external_ids] = {'vector': vectors}
updates_array[external_ids] = {"vector": vectors}
updates_array.close()
self.consolidate_update_fragments()

def delete(self, external_id: np.uint64):
updates_array = self.open_updates_array()
deletes = np.empty((1), dtype='O')
deletes = np.empty((1), dtype="O")
deletes[0] = np.array([], dtype=self.dtype)
updates_array[external_id] = {'vector': deletes}
updates_array[external_id] = {"vector": deletes}
updates_array.close()
self.consolidate_update_fragments()

def delete_batch(self, external_ids: np.array):
updates_array = self.open_updates_array()
deletes = np.empty((len(external_ids)), dtype='O')
deletes = np.empty((len(external_ids)), dtype="O")
for i in range(len(external_ids)):
deletes[i] = np.array([], dtype=self.dtype)
updates_array[external_ids] = {'vector': deletes}
updates_array[external_ids] = {"vector": deletes}
updates_array.close()
self.consolidate_update_fragments()

def consolidate_update_fragments(self):
fragments_info = tiledb.array_fragments(self.update_arrays_uri)
if(len(fragments_info) > 10):
if len(fragments_info) > 10:
tiledb.consolidate(self.update_arrays_uri)
tiledb.vacuum(self.update_arrays_uri)

def get_updates_uri(self):
return self.update_arrays_uri

def read_additions(self) -> (np.ndarray, np.array):
if self.update_arrays_uri is None:
return None, None
updates_array = tiledb.open(self.update_arrays_uri, mode="r")
q = updates_array.query(attrs=('vector',), coords=True)
data = q[:]
additions_filter = [len(item) > 0 for item in data["vector"]]
if len(data["external_id"][additions_filter]) > 0:
return np.vstack(data["vector"][additions_filter]), data["external_id"][additions_filter]
else:
return None, None
def read_updated_ids(self) -> np.array:
if self.update_arrays_uri is None:
return np.array([], np.uint64)
updates_array = tiledb.open(self.update_arrays_uri, mode="r")
q = updates_array.query(attrs=('vector',), coords=True)
data = q[:]
return data["external_id"]

def open_updates_array(self):
if self.update_arrays_uri is None:
updates_array_name = storage_formats[self.storage_version]["UPDATES_ARRAY_NAME"]
updates_array_name = storage_formats[self.storage_version][
"UPDATES_ARRAY_NAME"
]
updates_array_uri = f"{self.group.uri}/{updates_array_name}"
if tiledb.array_exists(updates_array_uri):
raise RuntimeError(f"Array {updates_array_uri} already exists.")
external_id_dim = tiledb.Dim(
name="external_id", domain=(0, MAX_UINT64-1), dtype=np.dtype(np.uint64)
name="external_id",
domain=(0, MAX_UINT64 - 1),
dtype=np.dtype(np.uint64),
)
dom = tiledb.Domain(external_id_dim)
vector_attr = tiledb.Attr(name="vector", dtype=self.dtype, var=True)
@@ -188,13 +221,30 @@ def open_updates_array(self):

def consolidate_updates(self):
from tiledb.vector_search.ingestion import ingest

new_index = ingest(
index_type=self.index_type,
index_uri=self.uri,
size=self.size,
source_uri=self.db_uri,
external_ids_uri=self.ids_uri,
updates_uri=self.update_arrays_uri
updates_uri=self.update_arrays_uri,
)
tiledb.Array.delete_array(self.update_arrays_uri)
self.group.close()
self.group = tiledb.Group(self.uri, "w", ctx=tiledb.Ctx(self.config))
self.group.remove(self.update_arrays_uri)
self.group.close()
return new_index

@staticmethod
def delete_index(uri, config):
try:
group = tiledb.Group(uri, "m", config=config)
except tiledb.TileDBError as err:
message = str(err)
if "group does not exist" in message:
return
else:
raise err
group.delete()
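
Taken together, the methods above support an update-then-query flow like the following. This is a hedged usage sketch: the import path and constructor arguments are assumptions, and real code goes through a concrete index class such as FlatIndex rather than the Index base class.

```python
import numpy as np

from tiledb.vector_search.flat_index import FlatIndex  # assumed import path

# Hypothetical usage; exact constructor arguments may differ.
index = FlatIndex(uri="my_index_uri")

# Stage additions and deletions in the updates array.
index.update_batch(
    vectors=np.random.rand(10, 128).astype(np.float32),
    external_ids=np.arange(10).astype(np.uint64),
)
index.delete(external_id=np.uint64(3))  # an empty vector marks a deletion

# query() searches the main index and the staged additions in parallel,
# masks ids that were updated or deleted, then merges and re-sorts to the top k.
distances, ids = index.query(np.random.rand(2, 128).astype(np.float32), k=5)

# Periodically fold the staged updates back into the main index.
index = index.consolidate_updates()
```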