Partitioned Matrix API (#215)
Partitioned Matrix API (#215)
### Partitioned Matrix
This PR moves the code for the refactored `PartitionedMatrix` from #173. It recapitulates part of #153, which primarily targets the `ivf_index` class.
* The refactored `PartitionedMatrix` is a separate class, used as a base class for `tdbPartitionedMatrix`. `PartitionedMatrix` encapsulates the partitioning indices, the partitioned vectors, and the partitioned ids; these are always self-consistent with one another and are contained entirely in memory.
* All management of out-of-core access is provided by `tdbPartitionedMatrix`.
* This refactoring yields a significantly cleaner API for the query code. The prior API passed the partitioned vectors, ids, and indices as separate parameters; the new API passes a single `PartitionedMatrix`.
* The refactoring also yields significantly simpler implementations (and may allow unification of the finite and infinite RAM cases, though that should be a later PR).
* Moves the updated Python bindings so that the query functions use the new API.
* Moves the siftsmall flat-file test data from #173.

The `PartitionedMatrix` class and the refined API are important for implementing the `ivf_index` class.
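
To make the encapsulation concrete, here is a minimal Python sketch of the idea (illustrative only: the real class is C++, and all names below are hypothetical):

```python
import numpy as np

class PartitionedMatrixSketch:
    """Hypothetical sketch: the partitioned vectors, their ids, and the
    partitioning offsets travel together, so they cannot drift out of sync."""

    def __init__(self, vectors: np.ndarray, ids: np.ndarray, part_index: np.ndarray):
        # part_index holds CSR-style offsets: partition p occupies
        # columns part_index[p] .. part_index[p + 1] of `vectors`.
        assert vectors.shape[1] == ids.shape[0], "one id per vector (column)"
        assert part_index[0] == 0 and part_index[-1] == ids.shape[0]
        self.vectors = vectors  # dimensions x num_vectors, column-major
        self.ids = ids
        self.part_index = part_index

    def num_partitions(self) -> int:
        return len(self.part_index) - 1

    def partition(self, p: int):
        lo, hi = self.part_index[p], self.part_index[p + 1]
        return self.vectors[:, lo:hi], self.ids[lo:hi]

m = PartitionedMatrixSketch(
    np.zeros((3, 5), dtype=np.float32),
    np.arange(5, dtype=np.uint64),
    np.array([0, 2, 5], dtype=np.uint64),
)
vecs, ids = m.partition(1)  # columns 2..4 and their ids
```

A query function can then accept the single object instead of separate vectors/ids/indices parameters, which is the API simplification described above.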
lums658 authored Jan 26, 2024
1 parent d60424d commit ebcf35b
Showing 60 changed files with 6,013 additions and 3,056 deletions.
814 changes: 491 additions & 323 deletions apis/python/src/tiledb/vector_search/module.cc

Large diffs are not rendered by default.

42 changes: 32 additions & 10 deletions apis/python/src/tiledb/vector_search/module.py
@@ -5,6 +5,7 @@
import tiledb
from tiledb.vector_search._tiledbvspy import *

import logging, pdb

def load_as_matrix(
path: str,
@@ -35,15 +36,15 @@ def load_as_matrix(
a = tiledb.ArraySchema.load(path, ctx=tiledb.Ctx(config))
dtype = a.attr(0).dtype
if dtype == np.float32:
-m = tdbColMajorMatrix_f32(ctx, path, 0, 0, 0, size, timestamp)
+m = tdbColMajorMatrix_f32(ctx, path, 0, 0, 0, size, 0, timestamp)
elif dtype == np.float64:
-m = tdbColMajorMatrix_f64(ctx, path, 0, 0, 0, size, timestamp)
+m = tdbColMajorMatrix_f64(ctx, path, 0, 0, 0, size, 0, timestamp)
elif dtype == np.int32:
-m = tdbColMajorMatrix_i32(ctx, path, 0, 0, 0, size, timestamp)
+m = tdbColMajorMatrix_i32(ctx, path, 0, 0, 0, size, 0, timestamp)
elif dtype == np.int64:
-m = tdbColMajorMatrix_i64(ctx, path, 0, 0, 0, size, timestamp)
+m = tdbColMajorMatrix_i64(ctx, path, 0, 0, 0, size, 0, timestamp)
elif dtype == np.uint8:
-m = tdbColMajorMatrix_u8(ctx, path, 0, 0, 0, size, timestamp)
+m = tdbColMajorMatrix_u8(ctx, path, 0, 0, 0, size, 0, timestamp)
# elif dtype == np.uint64:
# return tdbColMajorMatrix_u64(ctx, path, size, timestamp)
else:
@@ -80,6 +81,17 @@ def load_as_array(
return r


def debug_slice(m: "colMajorMatrix", name: str):
dtype = m.dtype
if (dtype == np.float32):
return debug_slice_f32(m, name)
elif (dtype == np.uint8):
return debug_slice_u8(m, name)
elif (dtype == np.uint64):
return debug_slice_u64(m, name)
else:
raise TypeError(f"Unsupported type: {dtype}!")

def query_vq_nth(db: "colMajorMatrix", *args):
"""
Run vector query
@@ -269,6 +281,7 @@ def ivf_query_ram(
if ctx is None:
ctx = Ctx({})


args = tuple(
[
parts_db,
@@ -282,6 +295,9 @@
]
)

# logging.info(f">>>> ivf_query_ram len(indices): {len(indices)}, dtype: {dtype}, use_nuv_implementation: {use_nuv_implementation}")
# pdb.set_trace()

if dtype == np.float32:
if use_nuv_implementation:
return nuv_query_heap_infinite_ram_reg_blocked_f32(*args)
@@ -360,6 +376,8 @@ def ivf_query(
]
)

logging.info(f">>>> module.py: ivf_query_ram len(indices): {len(indices)}, dtype: {dtype}, use_nuv_implementation: {use_nuv_implementation}")

if dtype == np.float32:
if use_nuv_implementation:
return nuv_query_heap_finite_ram_reg_blocked_f32(*args)
@@ -399,17 +417,21 @@ def dist_qv(
ctx = Ctx({})
args = tuple(
[
-ctx,
-parts_uri,
-active_partitions,
-query_vectors,
-active_queries,
+ctx, # 0
+parts_uri, # 1
+StdVector_u64(active_partitions), # 2
+query_vectors, # 3
+active_queries, # 4
StdVector_u64(indices),
ids_uri,
k_nn,
timestamp,
]
)

# logging.info(f">>>> ivf_query_ram len(indices): {len(indices)}, dtype: {dtype},")
# pdb.set_trace()

if dtype == np.float32:
return dist_qv_f32(*args)
elif dtype == np.uint8:
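A pattern worth noting throughout module.py: each Python wrapper inspects the numpy dtype and dispatches to the matching typed C++ binding, and partition/index lists are wrapped in `StdVector_u64` (as in the `dist_qv` hunk above) before crossing into C++. A minimal usage sketch (the array URI is a placeholder and the remaining parameters keep their defaults):

```python
from tiledb.vector_search.module import load_as_matrix

# load_as_matrix reads the attribute dtype from the array schema and picks
# the matching tdbColMajorMatrix_* constructor (f32, f64, i32, i64, u8).
m = load_as_matrix("file:///tmp/sift_base")  # placeholder URI
print(m.dtype)  # dtype of the underlying typed matrix
```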
9 changes: 6 additions & 3 deletions apis/python/test/array_paths.py
@@ -2,11 +2,14 @@

import os
# TODO Use python Pathlib?
-# TODO Get absolute path from cmake / setup / pytest
+# TODO Get absolute path to project root from cmake / setup / pytest

-test_data_path = os.path.join(os.path.dirname(__file__), "../../../")
-vector_search_root = "../../"
+# Use path relative to test file rather than to where pytest is invoked
+this_file_path = os.path.join(os.path.dirname(__file__))
+vector_search_root = this_file_path + "/../../../"
+
+# vector_search_root = "../../"
test_data_root = vector_search_root + "external/test_data/"
test_array_root = test_data_root + "arrays/"
test_file_root = test_data_root + "files/"
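The Pathlib TODO above could be addressed along these lines (a sketch; it reproduces the three-levels-up layout of the code above):

```python
from pathlib import Path

# Resolve the project root relative to this file rather than the CWD,
# so the paths hold no matter where pytest is invoked from.
# array_paths.py lives in apis/python/test, so parents[3] is the repo root.
vector_search_root = Path(__file__).resolve().parents[3]
test_data_root = vector_search_root / "external" / "test_data"
test_array_root = test_data_root / "arrays"
test_file_root = test_data_root / "files"
```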
50 changes: 46 additions & 4 deletions apis/python/test/test_index.py
@@ -1,5 +1,6 @@
import numpy as np
from common import *
from array_paths import *
import pytest

import tiledb.vector_search.index as ind
@@ -56,6 +57,7 @@ def test_flat_index(tmp_path):
def test_ivf_flat_index(tmp_path):
partitions = 10
uri = os.path.join(tmp_path, "array")

index = ivf_flat_index.create(
uri=uri, dimensions=3, vector_type=np.dtype(np.uint8), partitions=partitions
)
@@ -68,9 +70,11 @@ def test_ivf_flat_index(tmp_path):
update_vectors[3] = np.array([3, 3, 3], dtype=np.dtype(np.uint8))
update_vectors[4] = np.array([4, 4, 4], dtype=np.dtype(np.uint8))
index.update_batch(vectors=update_vectors, external_ids=np.array([0, 1, 2, 3, 4]))

query_and_check(index, np.array([[2, 2, 2]], dtype=np.float32), 3, {1, 2, 3}, nprobe=partitions)

index = index.consolidate_updates()

query_and_check(index, np.array([[2, 2, 2]], dtype=np.float32), 3, {1, 2, 3}, nprobe=partitions)

index.delete_batch(external_ids=np.array([1, 3]))
@@ -130,8 +134,8 @@ def test_index_with_incorrect_dimensions(tmp_path):
index.query(np.array([[1, 1, 1]], dtype=np.float32), k=3)

def test_index_with_incorrect_num_of_query_columns_simple(tmp_path):
-siftsmall_uri = "test/data/siftsmall/siftsmall_base.fvecs"
-queries_uri = "test/data/siftsmall/siftsmall_query.fvecs"
+siftsmall_uri = siftsmall_inputs_file
+queries_uri = siftsmall_query_file
indexes = ["FLAT", "IVF_FLAT"]
for index_type in indexes:
index_uri = os.path.join(tmp_path, f"sift10k_flat_{index_type}")
@@ -152,7 +156,7 @@ def test_index_with_incorrect_num_of_query_columns_simple(tmp_path):
index.query(queries, k=10)

def test_index_with_incorrect_num_of_query_columns_complex(tmp_path):
# Tests that we raise a TypeError if the number of columns in the query is not the same as the
# number of columns in the indexed data.
size=1000
indexes = ["FLAT", "IVF_FLAT"]
@@ -164,7 +168,7 @@ def test_index_with_incorrect_num_of_query_columns_complex(tmp_path):
create_random_dataset_f32_only_data(nb=size, d=num_columns, centers=1, path=dataset_dir)
index = ingest(index_type=index_type, index_uri=index_uri, source_uri=os.path.join(dataset_dir, "data.f32bin"))

# We have created a dataset with num_columns in each vector. Let's try creating queries
# with different numbers of columns and confirming incorrect ones will throw.
for num_columns_for_query in range(1, num_columns + 2):
query_shape = (1, num_columns_for_query)
@@ -174,3 +178,41 @@ def test_index_with_incorrect_num_of_query_columns_complex(tmp_path):
else:
with pytest.raises(TypeError):
index.query(query, k=1)


# TODO(paris): This will throw with the following error. Fix and re-enable, then remove
# test_index_with_incorrect_num_of_query_columns_in_single_vector_query:
# def array_to_matrix(array: np.ndarray):
# if array.dtype == np.float32:
# > return pyarray_copyto_matrix_f32(array)
# E RuntimeError: Number of dimensions must be two
# Here we test with a query which is just a vector, i.e. [1, 2, 3].
# query = query[0]
# if num_columns_for_query == num_columns:
# index.query(query, k=1)
# else:
# with pytest.raises(TypeError):
# index.query(query, k=1)

def test_index_with_incorrect_num_of_query_columns_in_single_vector_query(tmp_path):
# Tests that we raise a TypeError if the number of columns in the query is not the same as the
# number of columns in the indexed data, specifically for a single vector query.
# i.e. queries = [1, 2, 3] instead of queries = [[1, 2, 3], [4, 5, 6]].
indexes = [flat_index, ivf_flat_index]
for index_type in indexes:
uri = os.path.join(tmp_path, f"array_{index_type.__name__}")
index = index_type.create(uri=uri, dimensions=3, vector_type=np.dtype(np.uint8))

# Wrong number of columns will raise a TypeError.
with pytest.raises(TypeError):
index.query(np.array([1], dtype=np.float32), k=3)
with pytest.raises(TypeError):
index.query(np.array([1, 1], dtype=np.float32), k=3)
with pytest.raises(TypeError):
index.query(np.array([1, 1, 1, 1], dtype=np.float32), k=3)

# TODO: This also throws a TypeError for incorrect dimension
with pytest.raises(TypeError):
index.query(np.array([1, 1, 1], dtype=np.float32), k=3)
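
The disabled single-vector test above fails because `array_to_matrix` requires two-dimensional input; a caller can normalize the shape before querying (a numpy-only sketch):

```python
import numpy as np

query = np.array([1, 1, 1], dtype=np.float32)  # bare vector, shape (3,)
queries = np.atleast_2d(query)                 # shape (1, 3): one row per query
assert queries.ndim == 2
# index.query(queries, k=3) would now satisfy the two-dimensional contract
```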


71 changes: 37 additions & 34 deletions apis/python/test/test_ingestion.py
@@ -1,6 +1,7 @@
import time
import numpy as np
from common import *
from array_paths import *
import pytest

from tiledb.cloud.dag import Mode
@@ -19,6 +20,7 @@ def query_and_check_equals(index, queries, expected_result_d, expected_result_i)
check_equals(result_d=result_d, result_i=result_i, expected_result_d=expected_result_d, expected_result_i=expected_result_i)



def test_flat_ingestion_u8(tmp_path):
dataset_dir = os.path.join(tmp_path, "dataset")
index_uri = os.path.join(tmp_path, "array")
@@ -196,9 +198,9 @@ def test_ivf_flat_ingestion_f32(tmp_path):


def test_ivf_flat_ingestion_fvec(tmp_path):
-source_uri = "test/data/siftsmall/siftsmall_base.fvecs"
-queries_uri = "test/data/siftsmall/siftsmall_query.fvecs"
-gt_uri = "test/data/siftsmall/siftsmall_groundtruth.ivecs"
+source_uri = siftsmall_inputs_file
+queries_uri = siftsmall_query_file
+gt_uri = siftsmall_groundtruth_file
index_uri = os.path.join(tmp_path, "array")
k = 100
partitions = 100
@@ -236,9 +238,9 @@ def test_ivf_flat_ingestion_fvec(tmp_path):


def test_ivf_flat_ingestion_numpy(tmp_path):
-source_uri = "test/data/siftsmall/siftsmall_base.fvecs"
-queries_uri = "test/data/siftsmall/siftsmall_query.fvecs"
-gt_uri = "test/data/siftsmall/siftsmall_groundtruth.ivecs"
+source_uri = siftsmall_inputs_file
+queries_uri = siftsmall_query_file
+gt_uri = siftsmall_groundtruth_file
index_uri = os.path.join(tmp_path, "array")
k = 100
partitions = 100
@@ -277,9 +279,9 @@ def test_ivf_flat_ingestion_numpy(tmp_path):


def test_ivf_flat_ingestion_multiple_workers(tmp_path):
-source_uri = "test/data/siftsmall/siftsmall_base.fvecs"
-queries_uri = "test/data/siftsmall/siftsmall_query.fvecs"
-gt_uri = "test/data/siftsmall/siftsmall_groundtruth.ivecs"
+source_uri = siftsmall_inputs_file
+queries_uri = siftsmall_query_file
+gt_uri = siftsmall_groundtruth_file
index_uri = os.path.join(tmp_path, "array")
k = 100
partitions = 100
@@ -319,9 +321,9 @@ def test_ivf_flat_ingestion_multiple_workers(tmp_path):


def test_ivf_flat_ingestion_external_ids_numpy(tmp_path):
-source_uri = "test/data/siftsmall/siftsmall_base.fvecs"
-queries_uri = "test/data/siftsmall/siftsmall_query.fvecs"
-gt_uri = "test/data/siftsmall/siftsmall_groundtruth.ivecs"
+source_uri = siftsmall_inputs_file
+queries_uri = siftsmall_query_file
+gt_uri = siftsmall_groundtruth_file
index_uri = os.path.join(tmp_path, "array")
k = 100
partitions = 100
@@ -452,6 +454,7 @@ def test_ivf_flat_ingestion_with_batch_updates(tmp_path):
assert accuracy(result, gt_i, updated_ids=updated_ids) > 0.99



def test_ivf_flat_ingestion_with_updates_and_timetravel(tmp_path):
dataset_dir = os.path.join(tmp_path, "dataset")
index_uri = os.path.join(tmp_path, "array")
@@ -506,16 +509,16 @@ def test_ivf_flat_ingestion_with_updates_and_timetravel(tmp_path):
index = IVFFlatIndex(uri=index_uri, timestamp=(2, 101))
_, result = index.query(queries, k=k, nprobe=index.partitions)
assert (
0.05
<= accuracy(result, gt_i, updated_ids=updated_ids, only_updated_ids=True)
<= 0.15
)
index = IVFFlatIndex(uri=index_uri, timestamp=(2, None))
_, result = index.query(queries, k=k, nprobe=index.partitions)
assert (
0.05
<= accuracy(result, gt_i, updated_ids=updated_ids, only_updated_ids=True)
<= 0.15
)

# Timetravel with partial read from updates table
@@ -532,9 +535,9 @@ def test_ivf_flat_ingestion_with_updates_and_timetravel(tmp_path):
index = IVFFlatIndex(uri=index_uri, timestamp=(2, 51))
_, result = index.query(queries, k=k, nprobe=index.partitions)
assert (
0.02
<= accuracy(result, gt_i, updated_ids=updated_ids, only_updated_ids=True)
<= 0.07
)

# Timetravel at previous ingestion timestamp
@@ -560,16 +563,16 @@ def test_ivf_flat_ingestion_with_updates_and_timetravel(tmp_path):
index = IVFFlatIndex(uri=index_uri, timestamp=(2, 101))
_, result = index.query(queries, k=k, nprobe=index.partitions)
assert (
0.05
<= accuracy(result, gt_i, updated_ids=updated_ids, only_updated_ids=True)
<= 0.15
)
index = IVFFlatIndex(uri=index_uri, timestamp=(2, None))
_, result = index.query(queries, k=k, nprobe=index.partitions)
assert (
0.05
<= accuracy(result, gt_i, updated_ids=updated_ids, only_updated_ids=True)
<= 0.15
)

# Timetravel with partial read from updates table
@@ -586,9 +589,9 @@ def test_ivf_flat_ingestion_with_updates_and_timetravel(tmp_path):
index = IVFFlatIndex(uri=index_uri, timestamp=(2, 51))
_, result = index.query(queries, k=k, nprobe=index.partitions)
assert (
0.02
<= accuracy(result, gt_i, updated_ids=updated_ids, only_updated_ids=True)
<= 0.07
)

# Timetravel at previous ingestion timestamp
@@ -752,9 +755,9 @@ def test_ivf_flat_ingestion_tdb_random_sampling_policy(tmp_path):


def test_ivf_flat_ingestion_fvec_random_sampling_policy(tmp_path):
-source_uri = "test/data/siftsmall/siftsmall_base.fvecs"
-queries_uri = "test/data/siftsmall/siftsmall_query.fvecs"
-gt_uri = "test/data/siftsmall/siftsmall_groundtruth.ivecs"
+source_uri = siftsmall_inputs_file
+queries_uri = siftsmall_query_file
+gt_uri = siftsmall_groundtruth_file
index_uri = os.path.join(tmp_path, "array")
k = 100
partitions = 50
@@ -1172,4 +1175,4 @@ def test_ivf_flat_ingestion_with_training_source_uri_numpy(tmp_path):
index = index.consolidate_updates(retrain_index=True, training_sample_size=3)

queries = np.array([update_vectors[0]], dtype=np.float32)
query_and_check_equals(index=index, queries=queries, expected_result_d=[[0]], expected_result_i=[[1003]])
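
As a footnote to the timetravel tests earlier in this file, the pattern of opening an index at a timestamp range looks roughly like this (a sketch; the URI is a placeholder and the import path is assumed, since the tests pull `IVFFlatIndex` in via their common imports):

```python
import numpy as np
from tiledb.vector_search.ivf_flat_index import IVFFlatIndex  # assumed import path

# (2, 101) bounds which writes are visible; (2, None) leaves the upper end open.
index_at_101 = IVFFlatIndex(uri="file:///tmp/array", timestamp=(2, 101))
index_latest = IVFFlatIndex(uri="file:///tmp/array", timestamp=(2, None))

queries = np.array([[1.0, 2.0, 3.0]], dtype=np.float32)
d, i = index_at_101.query(queries, k=10, nprobe=index_at_101.partitions)
```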