Commit 37f2f9d

Update IVF_PQ to use temp directory pattern to support parallel ingestion (#554)

jparismorgan authored Oct 18, 2024
1 parent eaf1a5f · commit 37f2f9d

Showing 7 changed files with 195 additions and 135 deletions.
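The heart of the change, before reading the diff: every ingestion now writes its partial results under a temporary directory whose name carries a random suffix, so two ingestions running in parallel against the same index group never collide. A minimal sketch of the naming pattern, mirroring the first ingestion.py hunk below (the helper name is ours, not part of the codebase):

import random
import string

def make_partial_write_dir(base_name: str) -> str:
    # A random 10-letter suffix makes the partial-write directory
    # unique per ingestion run, so parallel runs never share temp state.
    suffix = "".join(random.choices(string.ascii_letters, k=10))
    return f"{base_name}_{suffix}"

make_partial_write_dir("temp_data")  # e.g. "temp_data_kQzXbWpLmA"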
70 changes: 40 additions & 30 deletions apis/python/src/tiledb/vector_search/ingestion.py
@@ -317,16 +317,11 @@ def ingest(
     EXTERNAL_IDS_ARRAY_NAME = storage_formats[storage_version][
         "EXTERNAL_IDS_ARRAY_NAME"
     ]
-    if index_type == "IVF_PQ":
-        PARTIAL_WRITE_ARRAY_DIR = storage_formats[storage_version][
-            "PARTIAL_WRITE_ARRAY_DIR"
-        ]
-    else:
-        PARTIAL_WRITE_ARRAY_DIR = (
-            storage_formats[storage_version]["PARTIAL_WRITE_ARRAY_DIR"]
-            + "_"
-            + "".join(random.choices(string.ascii_letters, k=10))
-        )
+    PARTIAL_WRITE_ARRAY_DIR = (
+        storage_formats[storage_version]["PARTIAL_WRITE_ARRAY_DIR"]
+        + "_"
+        + "".join(random.choices(string.ascii_letters, k=10))
+    )
     DEFAULT_ATTR_FILTERS = storage_formats[storage_version]["DEFAULT_ATTR_FILTERS"]

     # This is used to auto-configure `input_vectors_per_work_item`
@@ -603,13 +598,26 @@ def create_temp_data_group(
         group: tiledb.Group,
     ) -> tiledb.Group:
         partial_write_array_dir_uri = f"{group.uri}/{PARTIAL_WRITE_ARRAY_DIR}"
-        try:
-            tiledb.group_create(partial_write_array_dir_uri)
-            add_to_group(group, partial_write_array_dir_uri, PARTIAL_WRITE_ARRAY_DIR)
-        except tiledb.TileDBError as err:
-            message = str(err)
-            if "already exists" not in message:
-                raise err
+        if index_type == "IVF_PQ":
+            ctx = vspy.Ctx(config)
+            index = vspy.IndexIVFPQ(
+                ctx,
+                index_group_uri,
+                vspy.IndexLoadStrategy.PQ_INDEX,
+                0,
+                to_temporal_policy(index_timestamp),
+            )
+            index.create_temp_data_group(PARTIAL_WRITE_ARRAY_DIR)
+        else:
+            try:
+                tiledb.group_create(partial_write_array_dir_uri)
+                add_to_group(
+                    group, partial_write_array_dir_uri, PARTIAL_WRITE_ARRAY_DIR
+                )
+            except tiledb.TileDBError as err:
+                message = str(err)
+                if "already exists" not in message:
+                    raise err
         return tiledb.Group(partial_write_array_dir_uri, "w")

     def create_partial_write_array_group(
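Two things to note in the hunk above. For non-IVF_PQ indexes, the familiar idempotent-create idiom is kept: attempt the create and swallow only the "already exists" error, so racing writers both succeed. Distilled into a standalone sketch (the helper name is ours):

import tiledb

def ensure_group(uri: str) -> tiledb.Group:
    # Attempt to create; if a concurrent writer got there first, the
    # "already exists" error is expected and safe to ignore.
    try:
        tiledb.group_create(uri)
    except tiledb.TileDBError as err:
        if "already exists" not in str(err):
            raise
    return tiledb.Group(uri, "w")

For IVF_PQ, creation is instead delegated to the C++ index, which now receives the directory name explicitly (see the type_erased_module.cc and ivf_pq_index.h changes below).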
@@ -779,16 +787,6 @@ def create_arrays(
                 create_index_array=True,
                 asset_creation_threads=asset_creation_threads,
             )
-        elif index_type == "IVF_PQ":
-            ctx = vspy.Ctx(config)
-            index = vspy.IndexIVFPQ(
-                ctx,
-                index_group_uri,
-                vspy.IndexLoadStrategy.PQ_INDEX,
-                0,
-                to_temporal_policy(index_timestamp),
-            )
-            index.create_temp_data_group()
         # Note that we don't create type-erased indexes (i.e. Vamana) here. Instead we create them
         # at very start of ingest() in C++.
         elif not is_type_erased_index(index_type):
@@ -1294,9 +1292,13 @@ def ivf_pq_train_udf(
             else np.empty((0, dimensions), dtype=vector_type)
         )

         # Filter out the updated vectors from the sample vectors.
+        # NOTE: We add kind='sort' as a workaround to this bug: https://github.com/numpy/numpy/issues/26922
         updates_filter = np.in1d(
-            external_ids, updated_ids, assume_unique=True, invert=True
+            external_ids,
+            updated_ids,
+            assume_unique=True,
+            invert=True,
+            kind="sort",
         )
         sample_vectors = sample_vectors[updates_filter]

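The kind="sort" workaround above is easy to verify in isolation; a small self-contained example (the values are illustrative):

import numpy as np

external_ids = np.array([10, 11, 12, 13], dtype=np.uint64)
updated_ids = np.array([11, 13], dtype=np.uint64)

# invert=True keeps the ids NOT present in updated_ids; kind="sort"
# forces the sort-based implementation, sidestepping the fast-path
# bug tracked in numpy/numpy#26922.
keep = np.in1d(
    external_ids, updated_ids, assume_unique=True, invert=True, kind="sort"
)
print(keep)  # [ True False  True False]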
@@ -1922,6 +1924,7 @@ def ingest_vectors_udf(
                     start=part,
                     end=part_end,
                     partition_start=part_id * (partitions + 1),
+                    partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
                 )
             else:
                 ivf_index_tdb(
@@ -1979,6 +1982,7 @@ def ingest_vectors_udf(
                     start=part,
                     end=part_end,
                     partition_start=part_id * (partitions + 1),
+                    partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
                 )
             else:
                 ivf_index(
@@ -2069,6 +2073,7 @@ def ingest_additions_udf(
                     start=write_offset,
                     end=0,
                     partition_start=partition_start,
+                    partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
                 )
             else:
                 ivf_index(
@@ -2161,7 +2166,12 @@ def ivf_pq_consolidate_partition_udf(
             to_temporal_policy(index_timestamp),
         )
         index.consolidate_partitions(
-            partitions, work_items, partition_id_start, partition_id_end, batch
+            partitions=partitions,
+            work_items=work_items,
+            partition_id_start=partition_id_start,
+            partition_id_end=partition_id_end,
+            batch=batch,
+            partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
         )

     def consolidate_partition_udf(
23 changes: 16 additions & 7 deletions apis/python/src/tiledb/vector_search/type_erased_module.cc
@@ -542,7 +542,10 @@ void init_type_erased_module(py::module_& m) {
           py::arg("temporal_policy") = std::nullopt)
       .def(
           "create_temp_data_group",
-          [](IndexIVFPQ& index) { index.create_temp_data_group(); })
+          [](IndexIVFPQ& index, const std::string& partial_write_array_dir) {
+            index.create_temp_data_group(partial_write_array_dir);
+          },
+          py::arg("partial_write_array_dir"))
       .def(
           "train",
           [](IndexIVFPQ& index,
@@ -577,41 +580,47 @@
              const FeatureVector& deleted_ids,
              size_t start,
              size_t end,
-             size_t partition_start) {
+             size_t partition_start,
+             const std::string& partial_write_array_dir) {
            index.ingest_parts(
                input_vectors,
                external_ids,
                deleted_ids,
                start,
                end,
-               partition_start);
+               partition_start,
+               partial_write_array_dir);
           },
           py::arg("input_vectors"),
           py::arg("external_ids"),
           py::arg("deleted_ids"),
           py::arg("start"),
           py::arg("end"),
-          py::arg("partition_start"))
+          py::arg("partition_start"),
+          py::arg("partial_write_array_dir"))
       .def(
           "consolidate_partitions",
           [](IndexIVFPQ& index,
              size_t partitions,
              size_t work_items,
              size_t partition_id_start,
              size_t partition_id_end,
-             size_t batch) {
+             size_t batch,
+             const std::string& partial_write_array_dir) {
            index.consolidate_partitions(
                partitions,
                work_items,
                partition_id_start,
                partition_id_end,
-               batch);
+               batch,
+               partial_write_array_dir);
           },
           py::arg("partitions"),
           py::arg("work_items"),
           py::arg("partition_id_start"),
           py::arg("partition_id_end"),
-          py::arg("batch"))
+          py::arg("batch"),
+          py::arg("partial_write_array_dir"))
       .def(
           "ingest",
           [](IndexIVFPQ& index, const FeatureVectorArray& input_vectors) {
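Because each lambda above is registered with py::arg(...), the new parameter is exposed to Python as a keyword argument. A hypothetical call site (the values are illustrative; the real call sites are in ingestion.py above):

# Assumes `index` is a vspy.IndexIVFPQ opened as in ingestion.py.
temp_dir = "temp_data_AbCdEfGhIj"  # an illustrative suffixed name
index.create_temp_data_group(partial_write_array_dir=temp_dir)
index.consolidate_partitions(
    partitions=100,
    work_items=10,
    partition_id_start=0,
    partition_id_end=10,
    batch=1000,
    partial_write_array_dir=temp_dir,
)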
63 changes: 47 additions & 16 deletions src/include/api/ivf_pq_index.h
@@ -172,12 +172,12 @@ class IndexIVFPQ {
     dimensions_ = index_->dimensions();
   }

-  void create_temp_data_group() {
+  void create_temp_data_group(const std::string& partial_write_array_dir) {
     if (!index_) {
       throw std::runtime_error(
           "Cannot create_temp_data_group() because there is no index.");
     }
-    index_->create_temp_data_group();
+    index_->create_temp_data_group(partial_write_array_dir);
   }

   /**
@@ -231,7 +231,8 @@
       const FeatureVector& deleted_ids,
       size_t start,
       size_t end,
-      size_t partition_start) {
+      size_t partition_start,
+      const std::string& partial_write_array_dir) {
     if (feature_datatype_ != input_vectors.feature_type()) {
       throw std::runtime_error(
           "[ivf_pq_index@ingest_parts] Feature datatype mismatch: " +
@@ -243,7 +244,13 @@
           "Cannot ingest_parts() because there is no index.");
     }
     index_->ingest_parts(
-        input_vectors, external_ids, deleted_ids, start, end, partition_start);
+        input_vectors,
+        external_ids,
+        deleted_ids,
+        start,
+        end,
+        partition_start,
+        partial_write_array_dir);
   }

   void ingest(
@@ -266,14 +273,20 @@
       size_t work_items,
       size_t partition_id_start,
       size_t partition_id_end,
-      size_t batch) {
+      size_t batch,
+      const std::string& partial_write_array_dir) {
     if (!index_) {
       throw std::runtime_error(
           "[ivf_pq_index@consolidate_partitions] Cannot "
           "consolidate_partitions() because there is no index.");
     }
     index_->consolidate_partitions(
-        partitions, work_items, partition_id_start, partition_id_end, batch);
+        partitions,
+        work_items,
+        partition_id_start,
+        partition_id_end,
+        batch,
+        partial_write_array_dir);
   }

   [[nodiscard]] auto query(
@@ -413,7 +426,8 @@
   struct index_base {
     virtual ~index_base() = default;

-    virtual void create_temp_data_group() = 0;
+    virtual void create_temp_data_group(
+        const std::string& partial_write_array_dir) = 0;

     virtual void train(
         const FeatureVectorArray& training_set,
@@ -426,7 +440,8 @@
         const FeatureVector& deleted_ids,
         size_t start,
         size_t end,
-        size_t partition_start) = 0;
+        size_t partition_start,
+        const std::string& partial_write_array_dir) = 0;

     virtual void ingest(
         const FeatureVectorArray& input_vectors,
@@ -437,7 +452,8 @@
         size_t work_items,
         size_t partition_id_start,
         size_t partition_id_end,
-        size_t batch) = 0;
+        size_t batch,
+        const std::string& partial_write_array_dir) = 0;

     [[nodiscard]] virtual std::tuple<FeatureVectorArray, FeatureVectorArray>
     query(
@@ -499,8 +515,9 @@
             temporal_policy) {
     }

-    void create_temp_data_group() override {
-      impl_index_.create_temp_data_group();
+    void create_temp_data_group(
+        const std::string& partial_write_array_dir) override {
+      impl_index_.create_temp_data_group(partial_write_array_dir);
     }

     void train(
@@ -521,7 +538,8 @@
         const FeatureVector& deleted_ids,
         size_t start,
         size_t end,
-        size_t partition_start) override {
+        size_t partition_start,
+        const std::string& partial_write_array_dir) override {
       using feature_type = typename T::feature_type;
       using id_type = typename T::id_type;
       auto fspan = MatrixView<feature_type, stdx::layout_left>{
@@ -534,7 +552,13 @@
         auto ids = std::vector<id_type>(::num_vectors(input_vectors));
         std::iota(ids.begin(), ids.end(), start);
         impl_index_.ingest_parts(
-            fspan, ids, deleted_ids_span, start, end, partition_start);
+            fspan,
+            ids,
+            deleted_ids_span,
+            start,
+            end,
+            partition_start,
+            partial_write_array_dir);
       } else {
         auto external_ids_span = std::span<id_type>(
             (id_type*)external_ids.data(), external_ids.dimensions());
@@ -544,7 +568,8 @@
             deleted_ids_span,
             start,
             end,
-            partition_start);
+            partition_start,
+            partial_write_array_dir);
       }
     }

@@ -573,9 +598,15 @@
         size_t work_items,
         size_t partition_id_start,
         size_t partition_id_end,
-        size_t batch) override {
+        size_t batch,
+        const std::string& partial_write_array_dir) override {
       impl_index_.consolidate_partitions(
-          partitions, work_items, partition_id_start, partition_id_end, batch);
+          partitions,
+          work_items,
+          partition_id_start,
+          partition_id_end,
+          batch,
+          partial_write_array_dir);
     }

     /**
[Diffs for the remaining 4 of the 7 changed files were not loaded in this view.]