diff --git a/apis/python/src/tiledb/vector_search/ingestion.py b/apis/python/src/tiledb/vector_search/ingestion.py
index 3644bf0bf..b5652be62 100644
--- a/apis/python/src/tiledb/vector_search/ingestion.py
+++ b/apis/python/src/tiledb/vector_search/ingestion.py
@@ -317,16 +317,11 @@ def ingest(
     EXTERNAL_IDS_ARRAY_NAME = storage_formats[storage_version][
         "EXTERNAL_IDS_ARRAY_NAME"
     ]
-    if index_type == "IVF_PQ":
-        PARTIAL_WRITE_ARRAY_DIR = storage_formats[storage_version][
-            "PARTIAL_WRITE_ARRAY_DIR"
-        ]
-    else:
-        PARTIAL_WRITE_ARRAY_DIR = (
-            storage_formats[storage_version]["PARTIAL_WRITE_ARRAY_DIR"]
-            + "_"
-            + "".join(random.choices(string.ascii_letters, k=10))
-        )
+    PARTIAL_WRITE_ARRAY_DIR = (
+        storage_formats[storage_version]["PARTIAL_WRITE_ARRAY_DIR"]
+        + "_"
+        + "".join(random.choices(string.ascii_letters, k=10))
+    )
     DEFAULT_ATTR_FILTERS = storage_formats[storage_version]["DEFAULT_ATTR_FILTERS"]

     # This is used to auto-configure `input_vectors_per_work_item`
@@ -603,13 +598,26 @@ def create_temp_data_group(
         group: tiledb.Group,
     ) -> tiledb.Group:
         partial_write_array_dir_uri = f"{group.uri}/{PARTIAL_WRITE_ARRAY_DIR}"
-        try:
-            tiledb.group_create(partial_write_array_dir_uri)
-            add_to_group(group, partial_write_array_dir_uri, PARTIAL_WRITE_ARRAY_DIR)
-        except tiledb.TileDBError as err:
-            message = str(err)
-            if "already exists" not in message:
-                raise err
+        if index_type == "IVF_PQ":
+            ctx = vspy.Ctx(config)
+            index = vspy.IndexIVFPQ(
+                ctx,
+                index_group_uri,
+                vspy.IndexLoadStrategy.PQ_INDEX,
+                0,
+                to_temporal_policy(index_timestamp),
+            )
+            index.create_temp_data_group(PARTIAL_WRITE_ARRAY_DIR)
+        else:
+            try:
+                tiledb.group_create(partial_write_array_dir_uri)
+                add_to_group(
+                    group, partial_write_array_dir_uri, PARTIAL_WRITE_ARRAY_DIR
+                )
+            except tiledb.TileDBError as err:
+                message = str(err)
+                if "already exists" not in message:
+                    raise err
         return tiledb.Group(partial_write_array_dir_uri, "w")

     def create_partial_write_array_group(
@@ -779,16 +787,6 @@ def create_arrays(
                 create_index_array=True,
                 asset_creation_threads=asset_creation_threads,
             )
-        elif index_type == "IVF_PQ":
-            ctx = vspy.Ctx(config)
-            index = vspy.IndexIVFPQ(
-                ctx,
-                index_group_uri,
-                vspy.IndexLoadStrategy.PQ_INDEX,
-                0,
-                to_temporal_policy(index_timestamp),
-            )
-            index.create_temp_data_group()
         # Note that we don't create type-erased indexes (i.e. Vamana) here. Instead we create them
         # at very start of ingest() in C++.
         elif not is_type_erased_index(index_type):
@@ -1294,9 +1292,13 @@ def ivf_pq_train_udf(
             else np.empty((0, dimensions), dtype=vector_type)
         )

-        # Filter out the updated vectors from the sample vectors.
+        # NOTE: We add kind='sort' as a workaround to this bug: https://github.com/numpy/numpy/issues/26922
         updates_filter = np.in1d(
-            external_ids, updated_ids, assume_unique=True, invert=True
+            external_ids,
+            updated_ids,
+            assume_unique=True,
+            invert=True,
+            kind="sort",
         )
         sample_vectors = sample_vectors[updates_filter]
@@ -1922,6 +1924,7 @@ def ingest_vectors_udf(
                     start=part,
                     end=part_end,
                     partition_start=part_id * (partitions + 1),
+                    partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
                 )
             else:
                 ivf_index_tdb(
@@ -1979,6 +1982,7 @@ def ingest_vectors_udf(
                     start=part,
                     end=part_end,
                     partition_start=part_id * (partitions + 1),
+                    partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
                 )
             else:
                 ivf_index(
@@ -2069,6 +2073,7 @@ def ingest_additions_udf(
                 start=write_offset,
                 end=0,
                 partition_start=partition_start,
+                partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
             )
         else:
             ivf_index(
@@ -2161,7 +2166,12 @@ def ivf_pq_consolidate_partition_udf(
             to_temporal_policy(index_timestamp),
         )
         index.consolidate_partitions(
-            partitions, work_items, partition_id_start, partition_id_end, batch
+            partitions=partitions,
+            work_items=work_items,
+            partition_id_start=partition_id_start,
+            partition_id_end=partition_id_end,
+            batch=batch,
+            partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
         )

     def consolidate_partition_udf(
diff --git a/apis/python/src/tiledb/vector_search/type_erased_module.cc b/apis/python/src/tiledb/vector_search/type_erased_module.cc
index 6946d4815..03df6f2cc 100644
--- a/apis/python/src/tiledb/vector_search/type_erased_module.cc
+++ b/apis/python/src/tiledb/vector_search/type_erased_module.cc
@@ -542,7 +542,10 @@ void init_type_erased_module(py::module_& m) {
           py::arg("temporal_policy") = std::nullopt)
       .def(
           "create_temp_data_group",
-          [](IndexIVFPQ& index) { index.create_temp_data_group(); })
+          [](IndexIVFPQ& index, const std::string& partial_write_array_dir) {
+            index.create_temp_data_group(partial_write_array_dir);
+          },
+          py::arg("partial_write_array_dir"))
       .def(
           "train",
           [](IndexIVFPQ& index,
@@ -577,21 +580,24 @@ void init_type_erased_module(py::module_& m) {
              const FeatureVector& deleted_ids,
              size_t start,
              size_t end,
-             size_t partition_start) {
+             size_t partition_start,
+             const std::string& partial_write_array_dir) {
             index.ingest_parts(
                 input_vectors,
                 external_ids,
                 deleted_ids,
                 start,
                 end,
-                partition_start);
+                partition_start,
+                partial_write_array_dir);
           },
           py::arg("input_vectors"),
           py::arg("external_ids"),
           py::arg("deleted_ids"),
           py::arg("start"),
           py::arg("end"),
-          py::arg("partition_start"))
+          py::arg("partition_start"),
+          py::arg("partial_write_array_dir"))
       .def(
           "consolidate_partitions",
           [](IndexIVFPQ& index,
@@ -599,19 +605,22 @@ void init_type_erased_module(py::module_& m) {
              size_t work_items,
              size_t partition_id_start,
              size_t partition_id_end,
-             size_t batch) {
+             size_t batch,
+             const std::string& partial_write_array_dir) {
             index.consolidate_partitions(
                 partitions,
                 work_items,
                 partition_id_start,
                 partition_id_end,
-                batch);
+                batch,
+                partial_write_array_dir);
           },
           py::arg("partitions"),
           py::arg("work_items"),
           py::arg("partition_id_start"),
           py::arg("partition_id_end"),
-          py::arg("batch"))
+          py::arg("batch"),
+          py::arg("partial_write_array_dir"))
       .def(
           "ingest",
           [](IndexIVFPQ& index, const FeatureVectorArray& input_vectors) {
diff --git a/src/include/api/ivf_pq_index.h b/src/include/api/ivf_pq_index.h
index 9388c44bb..6d4944d12 100644
--- a/src/include/api/ivf_pq_index.h
+++ b/src/include/api/ivf_pq_index.h
@@ -172,12 +172,12 @@ class IndexIVFPQ {
     dimensions_ = index_->dimensions();
   }

-  void create_temp_data_group() {
+  void create_temp_data_group(const std::string& partial_write_array_dir) {
    if (!index_) {
      throw std::runtime_error(
          "Cannot create_temp_data_group() because there is no index.");
    }
-    index_->create_temp_data_group();
+    index_->create_temp_data_group(partial_write_array_dir);
   }

   /**
@@ -231,7 +231,8 @@ class IndexIVFPQ {
       const FeatureVector& deleted_ids,
       size_t start,
       size_t end,
-      size_t partition_start) {
+      size_t partition_start,
+      const std::string& partial_write_array_dir) {
     if (feature_datatype_ != input_vectors.feature_type()) {
       throw std::runtime_error(
           "[ivf_pq_index@ingest_parts] Feature datatype mismatch: " +
@@ -243,7 +244,13 @@ class IndexIVFPQ {
           "Cannot ingest_parts() because there is no index.");
     }
     index_->ingest_parts(
-        input_vectors, external_ids, deleted_ids, start, end, partition_start);
+        input_vectors,
+        external_ids,
+        deleted_ids,
+        start,
+        end,
+        partition_start,
+        partial_write_array_dir);
   }

   void ingest(
@@ -266,14 +273,20 @@ class IndexIVFPQ {
       size_t work_items,
       size_t partition_id_start,
       size_t partition_id_end,
-      size_t batch) {
+      size_t batch,
+      const std::string& partial_write_array_dir) {
     if (!index_) {
       throw std::runtime_error(
           "[ivf_pq_index@consolidate_partitions] Cannot "
           "consolidate_partitions() because there is no index.");
     }
     index_->consolidate_partitions(
-        partitions, work_items, partition_id_start, partition_id_end, batch);
+        partitions,
+        work_items,
+        partition_id_start,
+        partition_id_end,
+        batch,
+        partial_write_array_dir);
   }

   [[nodiscard]] auto query(
@@ -413,7 +426,8 @@ class IndexIVFPQ {
   struct index_base {
     virtual ~index_base() = default;

-    virtual void create_temp_data_group() = 0;
+    virtual void create_temp_data_group(
+        const std::string& partial_write_array_dir) = 0;

     virtual void train(
         const FeatureVectorArray& training_set,
@@ -426,7 +440,8 @@ class IndexIVFPQ {
         const FeatureVector& deleted_ids,
         size_t start,
         size_t end,
-        size_t partition_start) = 0;
+        size_t partition_start,
+        const std::string& partial_write_array_dir) = 0;

     virtual void ingest(
         const FeatureVectorArray& input_vectors,
@@ -437,7 +452,8 @@ class IndexIVFPQ {
         size_t work_items,
         size_t partition_id_start,
         size_t partition_id_end,
-        size_t batch) = 0;
+        size_t batch,
+        const std::string& partial_write_array_dir) = 0;

     [[nodiscard]] virtual std::tuple<FeatureVectorArray, FeatureVectorArray>
     query(
@@ -499,8 +515,9 @@ class IndexIVFPQ {
           temporal_policy) {
     }

-    void create_temp_data_group() override {
-      impl_index_.create_temp_data_group();
+    void create_temp_data_group(
+        const std::string& partial_write_array_dir) override {
+      impl_index_.create_temp_data_group(partial_write_array_dir);
     }

     void train(
@@ -521,7 +538,8 @@ class IndexIVFPQ {
         const FeatureVector& deleted_ids,
         size_t start,
         size_t end,
-        size_t partition_start) override {
+        size_t partition_start,
+        const std::string& partial_write_array_dir) override {
       using feature_type = typename T::feature_type;
       using id_type = typename T::id_type;
       auto fspan = MatrixView{
@@ -534,7 +552,13 @@ class IndexIVFPQ {
         auto ids = std::vector(::num_vectors(input_vectors));
         std::iota(ids.begin(), ids.end(), start);
         impl_index_.ingest_parts(
-            fspan, ids, deleted_ids_span, start, end, partition_start);
+            fspan,
+            ids,
+            deleted_ids_span,
+            start,
+            end,
+            partition_start,
+            partial_write_array_dir);
       } else {
         auto external_ids_span = std::span(
             (id_type*)external_ids.data(), external_ids.dimensions());
@@ -544,7 +568,8 @@ class IndexIVFPQ {
             deleted_ids_span,
             start,
             end,
-            partition_start);
+            partition_start,
+            partial_write_array_dir);
       }
     }

@@ -573,9 +598,15 @@ class IndexIVFPQ {
         size_t work_items,
         size_t partition_id_start,
         size_t partition_id_end,
-        size_t batch) override {
+        size_t batch,
+        const std::string& partial_write_array_dir) override {
       impl_index_.consolidate_partitions(
-          partitions, work_items, partition_id_start, partition_id_end, batch);
+          partitions,
+          work_items,
+          partition_id_start,
+          partition_id_end,
+          batch,
+          partial_write_array_dir);
     }

     /**
diff --git a/src/include/index/index_group.h b/src/include/index/index_group.h
index 60f5ffe74..125fd41fd 100644
--- a/src/include/index/index_group.h
+++ b/src/include/index/index_group.h
@@ -108,7 +108,6 @@ class base_index_group {
   // `tiledb://foo/edc4656a-3f45-43a1-8ee5-fa692a015c53` which cannot have the
   // array name added as a suffix.
   std::unordered_map<std::string, std::string> array_name_to_uri_;
-  std::unordered_map<std::string, std::string> array_name_to_temp_uri_;

   /** Lookup an array name given an array key */
   constexpr auto array_key_to_array_name(const std::string& array_key) const {
@@ -130,10 +129,6 @@ class base_index_group {
       array_key_to_array_name_[array_key] = array_name;
       array_name_to_uri_[array_name] =
           array_name_to_uri(group_uri_, array_name);
-      array_name_to_temp_uri_[array_name] = array_name_to_temp_uri(
-          group_uri_,
-          storage_formats[version_]["partial_write_array_dir"],
-          array_name);
     }
     static_cast(this)->append_valid_array_names_impl();
   }
@@ -283,16 +278,13 @@ class base_index_group {
   }

   constexpr std::string array_key_to_temp_uri(
-      const std::string& array_key) const {
+      const std::string& array_key,
+      const std::string& partial_write_array_dir) const {
     auto name = array_key_to_array_name(array_key);
-    if (array_name_to_temp_uri_.find(name) == array_name_to_temp_uri_.end()) {
-      throw std::runtime_error(
-          "[index_group@array_key_to_temp_uri] Invalid key when getting the "
-          "URI: " +
-          array_key + ". Name does not exist: " + name);
-    }
-
-    return array_name_to_temp_uri_.at(name);
+    return (std::filesystem::path{group_uri_} /
+            std::filesystem::path{partial_write_array_dir} /
+            std::filesystem::path{name})
+        .string();
   }

   /**
@@ -486,8 +478,9 @@ class base_index_group {
   [[nodiscard]] std::string ids_uri() const {
     return array_key_to_uri("ids_array_name");
   }
-  [[nodiscard]] std::string ids_temp_uri() const {
-    return array_key_to_temp_uri("ids_array_name");
+  [[nodiscard]] std::string ids_temp_uri(
+      const std::string& partial_write_array_dir) const {
+    return array_key_to_temp_uri("ids_array_name", partial_write_array_dir);
   }
   [[nodiscard]] std::string ids_array_name() const {
     return array_key_to_array_name("ids_array_name");
@@ -496,8 +489,9 @@ class base_index_group {
   [[nodiscard]] std::string feature_vectors_uri() const {
     return array_key_to_uri("parts_array_name");
   }
-  [[nodiscard]] std::string feature_vectors_temp_uri() const {
-    return array_key_to_temp_uri("parts_array_name");
+  [[nodiscard]] std::string feature_vectors_temp_uri(
+      const std::string& partial_write_array_dir) const {
+    return array_key_to_temp_uri("parts_array_name", partial_write_array_dir);
   }
   [[nodiscard]] std::string feature_vectors_array_name() const {
     return array_key_to_array_name("parts_array_name");
@@ -506,18 +500,19 @@ class base_index_group {
   [[nodiscard]] std::string feature_vectors_index_uri() const {
     return array_key_to_uri("index_array_name");
   }
-  [[nodiscard]] std::string feature_vectors_index_temp_uri() const {
-    return array_key_to_temp_uri("index_array_name");
+  [[nodiscard]] std::string feature_vectors_index_temp_uri(
+      const std::string& partial_write_array_dir) const {
+    return array_key_to_temp_uri("index_array_name", partial_write_array_dir);
   }
   [[nodiscard]] std::string feature_vectors_index_name() const {
     return array_key_to_array_name("index_array_name");
   }

-  [[nodiscard]] std::string temp_data_uri() const {
-    return array_key_to_uri("partial_write_array_dir");
-  }
-  [[nodiscard]] std::string temp_data_name() const {
-    return array_key_to_array_name("partial_write_array_dir");
+  [[nodiscard]] std::string temp_data_uri(
+      const std::string& partial_write_array_dir) const {
+    return (std::filesystem::path{group_uri_} /
+            std::filesystem::path{partial_write_array_dir})
+        .string();
   }

   [[nodiscard]] tiledb::Context& cached_ctx() {
diff --git a/src/include/index/ivf_pq_group.h b/src/include/index/ivf_pq_group.h
index 047dda71b..89478fdaf 100644
--- a/src/include/index/ivf_pq_group.h
+++ b/src/include/index/ivf_pq_group.h
@@ -61,7 +61,6 @@ class ivf_pq_group : public base_index_group {
   using Base = base_index_group;
   using Base::array_key_to_array_name_;
-  using Base::array_name_to_temp_uri_;
   using Base::array_name_to_uri_;
   using Base::cached_ctx_;
   using Base::group_uri_;
@@ -135,10 +134,6 @@ class ivf_pq_group : public base_index_group {
       array_key_to_array_name_[array_key] = array_name;
       array_name_to_uri_[array_name] =
           array_name_to_uri(group_uri_, array_name);
-      array_name_to_temp_uri_[array_name] = array_name_to_temp_uri(
-          group_uri_,
-          storage_formats[version_]["partial_write_array_dir"],
-          array_name);
     }
   }

@@ -209,8 +204,10 @@ class ivf_pq_group : public base_index_group {
   [[nodiscard]] auto pq_ivf_vectors_uri() const {
     return this->array_key_to_uri("pq_ivf_vectors_array_name");
   }
-  [[nodiscard]] auto pq_ivf_vectors_temp_uri() const {
-    return this->array_key_to_temp_uri("pq_ivf_vectors_array_name");
+  [[nodiscard]] auto pq_ivf_vectors_temp_uri(
+      const std::string& partial_write_array_dir) const {
+    return this->array_key_to_temp_uri(
+        "pq_ivf_vectors_array_name", partial_write_array_dir);
   }
   [[nodiscard]] auto pq_ivf_vectors_array_name() const {
     return this->array_key_to_array_name("pq_ivf_vectors_array_name");
@@ -341,39 +338,41 @@ class ivf_pq_group : public base_index_group {
     tiledb_helpers::add_to_group(
         write_group, flat_ivf_centroids_uri(), flat_ivf_centroids_array_name());

-    create_temp_data_group();
-
     metadata_.store_metadata(write_group);
   }

-  void create_temp_data_group() {
+  void create_temp_data_group(const std::string& partial_write_array_dir) {
     auto write_group = tiledb::Group(
         cached_ctx_, group_uri_, TILEDB_WRITE, cached_ctx_.config());

-    // First remove the temp_data group if it exists. This can happen if we
-    // ingest multiple times.
-    if (tiledb::Object::object(cached_ctx_, this->temp_data_uri()).type() ==
-        tiledb::Object::Type::Group) {
-      tiledb::Object::remove(cached_ctx_, this->temp_data_uri());
-    }
-
-    // Then create the new temp data group.
-    tiledb::Group::create(cached_ctx_, this->temp_data_uri());
+    // Create the new temp data group.
+    tiledb::Group::create(
+        cached_ctx_, this->temp_data_uri(partial_write_array_dir));
     tiledb_helpers::add_to_group(
-        write_group, this->temp_data_uri(), this->temp_data_name());
+        write_group,
+        this->temp_data_uri(partial_write_array_dir),
+        partial_write_array_dir);

-    // Finally create the array's in the temp data group that we will need
+    // Then create the arrays in the temp data group that we will need
     // during ingestion.
     auto temp_group = tiledb::Group(
-        cached_ctx_, this->temp_data_uri(), TILEDB_WRITE, cached_ctx_.config());
+        cached_ctx_,
+        this->temp_data_uri(partial_write_array_dir),
+        TILEDB_WRITE,
+        cached_ctx_.config());

-    create_feature_vectors_matrix(temp_group, this->feature_vectors_temp_uri());
-    create_ids_vector(temp_group, this->ids_temp_uri(), this->ids_array_name());
+    create_feature_vectors_matrix(
+        temp_group, this->feature_vectors_temp_uri(partial_write_array_dir));
+    create_ids_vector(
+        temp_group,
+        this->ids_temp_uri(partial_write_array_dir),
+        this->ids_array_name());
     create_indices_vector(
         temp_group,
-        this->feature_vectors_index_temp_uri(),
+        this->feature_vectors_index_temp_uri(partial_write_array_dir),
         this->feature_vectors_index_name());
-    create_pq_ivf_vectors_matrix(temp_group, pq_ivf_vectors_temp_uri());
+    create_pq_ivf_vectors_matrix(
+        temp_group, pq_ivf_vectors_temp_uri(partial_write_array_dir));
   }

  private:
diff --git a/src/include/index/ivf_pq_index.h b/src/include/index/ivf_pq_index.h
index 157c2cc66..d1197cd10 100644
--- a/src/include/index/ivf_pq_index.h
+++ b/src/include/index/ivf_pq_index.h
@@ -368,7 +368,12 @@ class ivf_pq_index {
     write_group.store_metadata();
   }

-  void create_temp_data_group() {
+  /**
+   * @brief Create the temp data group.
+   *
+   * @param partial_write_array_dir The directory to write the temp data group.
+   */
+  void create_temp_data_group(const std::string& partial_write_array_dir) {
     auto write_group = ivf_pq_group(
         group_->cached_ctx(),
         group_uri_,
@@ -378,7 +383,7 @@ class ivf_pq_index {
         dimensions_,
         num_clusters_,
         num_subspaces_);
-    write_group.create_temp_data_group();
+    write_group.create_temp_data_group(partial_write_array_dir);
   }

   /**
@@ -859,6 +864,8 @@ class ivf_pq_index {
    * @param end The ending index of the training set to ingest.
    * @param partition_start The starting index of the partitioned vectors to
    * write.
+   * @param partial_write_array_dir The directory to write the temp arrays. If
+   * not set, we will write to the main arrays instead of the temp arrays.
    * @param distance The distance function to use.
    * @param write_to_temp_arrays Whether to write to the temp arrays. True if
    * ingesting via Python.
@@ -873,8 +880,8 @@ class ivf_pq_index {
       size_t start,
       size_t end,
       size_t partition_start,
-      Distance distance = Distance{},
-      bool write_to_temp_arrays = true) {
+      const std::string& partial_write_array_dir = "",
+      Distance distance = Distance{}) {
     // 1. pq-encode the vectors.
     // This results in a matrix where we have num_vectors(training_set) columns,
     // and num_subspaces_ rows. So if we had 10 vectors, each with 16
@@ -908,14 +915,18 @@ class ivf_pq_index {
         training_set_ids,
         deleted_ids,
         flat_ivf_centroids_,
-        write_to_temp_arrays ? write_group.feature_vectors_temp_uri() :
-                               write_group.feature_vectors_uri(),
-        write_to_temp_arrays ? write_group.feature_vectors_index_temp_uri() :
-                               write_group.feature_vectors_index_uri(),
-        write_to_temp_arrays ? write_group.ids_temp_uri() :
-                               write_group.ids_uri(),
-        write_to_temp_arrays ? write_group.pq_ivf_vectors_temp_uri() :
-                               write_group.pq_ivf_vectors_uri(),
+        partial_write_array_dir.empty() ?
+            write_group.feature_vectors_uri() :
+            write_group.feature_vectors_temp_uri(partial_write_array_dir),
+        partial_write_array_dir.empty() ?
+            write_group.feature_vectors_index_uri() :
+            write_group.feature_vectors_index_temp_uri(partial_write_array_dir),
+        partial_write_array_dir.empty() ?
+            write_group.ids_uri() :
+            write_group.ids_temp_uri(partial_write_array_dir),
+        partial_write_array_dir.empty() ?
+            write_group.pq_ivf_vectors_uri() :
+            write_group.pq_ivf_vectors_temp_uri(partial_write_array_dir),
         start,
         end,
         num_threads_,
@@ -932,13 +943,15 @@ class ivf_pq_index {
    * @param partition_id_start The starting partition id.
    * @param partition_id_end The ending partition id.
    * @param batch The batch size.
+   * @param partial_write_array_dir The directory to write the temp arrays.
    */
   void consolidate_partitions(
       size_t partitions,
       size_t work_items,
       size_t partition_id_start,
       size_t partition_id_end,
-      size_t batch) {
+      size_t batch,
+      const std::string& partial_write_array_dir) {
     std::vector>> partition_slices(
         partitions);

@@ -946,7 +959,7 @@ class ivf_pq_index {
     std::vector partial_indexes = read_vector(
         group_->cached_ctx(),
-        group_->feature_vectors_index_temp_uri(),
+        group_->feature_vectors_index_temp_uri(partial_write_array_dir),
         0,
         total_partitions,
         temporal_policy_);
@@ -1004,14 +1017,14 @@ class ivf_pq_index {
       // Read data.
      std::vector ids = read_vector(
          group_->cached_ctx(),
-          group_->ids_temp_uri(),
+          group_->ids_temp_uri(partial_write_array_dir),
          read_slices,
          total_slices_size,
          temporal_policy_);

      auto vectors = tdbColMajorMatrixMultiRange(
          group_->cached_ctx(),
-          group_->feature_vectors_temp_uri(),
+          group_->feature_vectors_temp_uri(partial_write_array_dir),
          dimensions_,
          read_slices,
          total_slices_size,
@@ -1021,7 +1034,7 @@ class ivf_pq_index {
      auto pq_vectors = tdbColMajorMatrixMultiRange(
          group_->cached_ctx(),
-          group_->pq_ivf_vectors_temp_uri(),
+          group_->pq_ivf_vectors_temp_uri(partial_write_array_dir),
          num_subspaces_,
          read_slices,
          total_slices_size,
@@ -1097,8 +1110,8 @@ class ivf_pq_index {
        0,
        ::num_vectors(vectors),
        0,
-        distance,
-        false);
+        "",
+        distance);

    auto write_group = ivf_pq_group(
        group_->cached_ctx(),
diff --git a/src/include/test/unit_api_ivf_pq_index.cc b/src/include/test/unit_api_ivf_pq_index.cc
index 93114e120..ab26076c5 100644
--- a/src/include/test/unit_api_ivf_pq_index.cc
+++ b/src/include/test/unit_api_ivf_pq_index.cc
@@ -1154,6 +1154,12 @@ TEST_CASE("ingest_parts testing", "[api_ivf_pq_index]") {
   auto vectors = ColMajorMatrix{
       {{1.0f, 1.1f, 1.2f, 1.3f}, {2.0f, 2.1f, 2.2f, 2.3f}}};

+  std::string partial_write_array_dir = "temp";
+  {
+    auto index = IndexIVFPQ(
+        ctx, index_uri, IndexLoadStrategy::PQ_INDEX, 0, temporal_policy);
+    index.create_temp_data_group(partial_write_array_dir);
+  }
   {
     auto index = IndexIVFPQ(
         ctx, index_uri, IndexLoadStrategy::PQ_INDEX, 0, temporal_policy);
@@ -1171,7 +1177,8 @@ TEST_CASE("ingest_parts testing", "[api_ivf_pq_index]") {
         FeatureVector(0, "uint64"),
         part,
         part_end,
-        part_id);
+        part_id,
+        partial_write_array_dir);
   }
   {
     // Here is where Python does compute_partition_indexes_udf. We simulate that
@@ -1181,7 +1188,7 @@ TEST_CASE("ingest_parts testing", "[api_ivf_pq_index]") {
     auto total_partitions = 2;
     auto indexes = read_vector(
         ctx,
-        group.feature_vectors_index_temp_uri(),
+        group.feature_vectors_index_temp_uri(partial_write_array_dir),
         0,
         total_partitions,
         temporal_policy);
@@ -1203,7 +1210,12 @@ TEST_CASE("ingest_parts testing", "[api_ivf_pq_index]") {
     size_t partition_id_end = 1;
     size_t batch = 33554432;
     index.consolidate_partitions(
-        partitions, work_items, partition_id_start, partition_id_end, batch);
+        partitions,
+        work_items,
+        partition_id_start,
+        partition_id_end,
+        batch,
+        partial_write_array_dir);
   }

   {
@@ -1211,15 +1223,6 @@ TEST_CASE("ingest_parts testing", "[api_ivf_pq_index]") {
     auto&& [scores, ids] = index.query(FeatureVectorArray(vectors), 1, 1);
     check_single_vector_equals(scores, ids, {0, 0}, {0, 1});
   }
-
-  {
-    // If we were to re-ingest, Python would delete the temp data group.
-  }
-
-  {
-    auto index = IndexIVFPQ(ctx, index_uri, IndexLoadStrategy::PQ_INDEX, 0);
-    index.create_temp_data_group();
-  }
 }

 TEST_CASE("train python", "[api_ivf_pq_index]") {
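Reviewer note (not part of the patch): a minimal, runnable Python sketch of the two ingestion-side behaviors the diff standardizes. The base name `"temp_data"` is only a stand-in for `storage_formats[storage_version]["PARTIAL_WRITE_ARRAY_DIR"]`, which this sketch does not import.

```python
import random
import string

import numpy as np

# Every IVF_PQ ingestion now derives a randomized partial-write directory name,
# the same way ingest() does after this change. "temp_data" is a placeholder.
base_dir = "temp_data"
partial_write_array_dir = (
    base_dir + "_" + "".join(random.choices(string.ascii_letters, k=10))
)
print(partial_write_array_dir)  # e.g. temp_data_qWbzKxTrLm

# The updated-vector filter pins kind="sort" so np.in1d takes the sort-based
# code path (workaround for https://github.com/numpy/numpy/issues/26922).
external_ids = np.array([10, 11, 12, 13], dtype=np.uint64)
updated_ids = np.array([11, 13], dtype=np.uint64)
updates_filter = np.in1d(
    external_ids, updated_ids, assume_unique=True, invert=True, kind="sort"
)
print(external_ids[updates_filter])  # [10 12]
```

Because the randomized directory name is generated once in `ingest()` and then passed explicitly through `create_temp_data_group`, `ingest_parts`, and `consolidate_partitions`, each ingestion writes into its own temp group, which is why the remove-if-exists step previously performed in `ivf_pq_group::create_temp_data_group` could be dropped.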