Skip to content

Commit

Permalink
Bmp idx 3 (#1361)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

1. Add KNN query on sparse vectors using the BMP index.
2. Add filtered KNN query for sparse vectors.

Issue link: #1355

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Test cases
  • Loading branch information
small-turtle-1 authored Jun 21, 2024
1 parent 343b4c5 commit 40f512c
Show file tree
Hide file tree
Showing 33 changed files with 2,274 additions and 1,577 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,5 @@ callgrind.out.*
python/benchmark/datasets/

.venv/

dist/
151 changes: 96 additions & 55 deletions benchmark/local_infinity/sparse/bmp_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,80 +49,121 @@ int main(int argc, char *argv[]) {
case ModeType::kImport: {
SparseMatrix<f32, i32> data_mat = DecodeSparseDataset(opt.data_path_);
profiler.Begin();
BMPAlg<f32, i16, BMPCompressType::kCompressed> index(data_mat.ncol_, opt.block_size_);
for (SparseMatrixIter<f32, i32> iter(data_mat); iter.HasNext(); iter.Next()) {
SparseVecRef vec = iter.val();
u32 doc_id = iter.row_id();
Vector<i16> indices(vec.nnz_);
for (i32 i = 0; i < vec.nnz_; i++) {
indices[i] = static_cast<i16>(vec.indices_[i]);

auto inner = [&](auto &index) {
for (SparseMatrixIter<f32, i32> iter(data_mat); iter.HasNext(); iter.Next()) {
SparseVecRef vec = iter.val();
u32 doc_id = iter.row_id();
Vector<i16> indices(vec.nnz_);
for (i32 i = 0; i < vec.nnz_; i++) {
indices[i] = static_cast<i16>(vec.indices_[i]);
}
SparseVecRef<f32, i16> vec1(vec.nnz_, indices.data(), vec.data_);
index.AddDoc(vec1, doc_id);

if (LogInterval != 0 && doc_id % LogInterval == 0) {
std::cout << fmt::format("Imported {} docs\n", doc_id);
}
}
SparseVecRef<f32, i16> vec1(vec.nnz_, indices.data(), vec.data_);
index.AddDoc(vec1, doc_id);
data_mat.Clear();

BMPOptimizeOptions optimize_options{.topk_ = opt.topk_};
std::cout << "Optimizing index...\n";
index.Optimize(optimize_options);
std::cout << "Index built\n";

profiler.End();

if (LogInterval != 0 && doc_id % LogInterval == 0) {
std::cout << fmt::format("Imported {} docs\n", doc_id);
std::cout << fmt::format("Import data time: {}\n", profiler.ElapsedToString(1000));
auto [file_handler, status] =
fs.OpenFile(opt.index_save_path_.string(), FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kNoLock);
if (!status.ok()) {
UnrecoverableError(fmt::format("Failed to open file: {}", opt.index_save_path_.string()));
}
index.Save(*file_handler);
};
switch (opt.type_) {
case BMPCompressType::kCompressed: {
BMPAlg<f32, i16, BMPCompressType::kCompressed> index(data_mat.ncol_, opt.block_size_);
inner(index);
break;
}
case BMPCompressType::kRaw: {
BMPAlg<f32, i16, BMPCompressType::kRaw> index(data_mat.ncol_, opt.block_size_);
inner(index);
break;
}
default: {
UnrecoverableError("Unknown compress type");
}
}
data_mat.Clear();
std::cout << "Optimizing index...\n";
index.Optimize(opt.topk_);
std::cout << "Index built\n";
profiler.End();

std::cout << fmt::format("Import data time: {}\n", profiler.ElapsedToString(1000));
auto [file_handler, status] =
fs.OpenFile(opt.index_save_path_.string(), FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kNoLock);
if (!status.ok()) {
UnrecoverableError(fmt::format("Failed to open file: {}", opt.index_save_path_.string()));
}
index.Save(*file_handler);
break;
}
case ModeType::kQuery: {
i64 query_n = opt.query_n_;
i32 thread_n = opt.thread_n_;
BmpSearchOptions search_options{.alpha_ = opt.alpha_, .beta_ = opt.beta_, .use_tail_ = true, .use_lock_ = false};

auto [file_handler, status] = fs.OpenFile(opt.index_save_path_.string(), FileFlags::READ_FLAG, FileLockType::kNoLock);
if (!status.ok()) {
UnrecoverableError(fmt::format("Failed to open file: {}", opt.index_save_path_.string()));
}
auto index = BMPAlg<f32, i16, BMPCompressType::kCompressed>::Load(*file_handler);

auto [top_k, all_query_n, _1, _2] = DecodeGroundtruth(opt.groundtruth_path_, true);
if ((int)top_k != opt.topk_) {
UnrecoverableError(fmt::format("Topk mismatch: {} vs {}", top_k, opt.topk_));
}
Vector<Pair<Vector<u32>, Vector<f32>>> query_result;
{
SparseMatrix<f32, i32> query_mat = DecodeSparseDataset(opt.query_path_);
if (all_query_n != query_mat.nrow_) {
UnrecoverableError(fmt::format("Query number mismatch: {} vs {}", query_n, query_mat.nrow_));
auto inner = [&](auto &index) {
auto [top_k, all_query_n, _1, _2] = DecodeGroundtruth(opt.groundtruth_path_, true);
if ((int)top_k != opt.topk_) {
UnrecoverableError(fmt::format("Topk mismatch: {} vs {}", top_k, opt.topk_));
}
if (query_n > all_query_n) {
UnrecoverableError(fmt::format("Query number: {} is larger than all query number: {}", query_n, all_query_n));
Vector<Pair<Vector<u32>, Vector<f32>>> query_result;
{
SparseMatrix<f32, i32> query_mat = DecodeSparseDataset(opt.query_path_);
if (all_query_n != query_mat.nrow_) {
UnrecoverableError(fmt::format("Query number mismatch: {} vs {}", query_n, query_mat.nrow_));
}
if (query_n > all_query_n) {
UnrecoverableError(fmt::format("Query number: {} is larger than all query number: {}", query_n, all_query_n));
}
if (query_n == 0) {
query_n = all_query_n;
}

profiler.Begin();
query_result = Search(thread_n,
query_mat,
top_k,
query_n,
[&](const SparseVecRef<f32, i32> &query, u32 topk) -> Pair<Vector<u32>, Vector<f32>> {
Vector<i16> indices(query.nnz_);
for (i32 i = 0; i < query.nnz_; i++) {
indices[i] = static_cast<i16>(query.indices_[i]);
}
SparseVecRef<f32, i16> query1(query.nnz_, indices.data(), query.data_);
return index.SearchKnn(query1, topk, search_options);
});
profiler.End();
std::cout << fmt::format("Search time: {}\n", profiler.ElapsedToString(1000));
}
if (query_n == 0) {
query_n = all_query_n;
{
auto [_1, _2, gt_indices, gt_scores] = DecodeGroundtruth(opt.groundtruth_path_, false);
f32 recall = CheckGroundtruth(gt_indices.get(), gt_scores.get(), query_result, top_k);
std::cout << fmt::format("Recall: {}\n", recall);
}
};

profiler.Begin();
query_result =
Search(thread_n, query_mat, top_k, query_n, [&](const SparseVecRef<f32, i32> &query, u32 topk) -> Pair<Vector<u32>, Vector<f32>> {
Vector<i16> indices(query.nnz_);
for (i32 i = 0; i < query.nnz_; i++) {
indices[i] = static_cast<i16>(query.indices_[i]);
}
SparseVecRef<f32, i16> query1(query.nnz_, indices.data(), query.data_);
return index.SearchKnn(query1, topk, opt.alpha_, opt.beta_);
});
profiler.End();
std::cout << fmt::format("Search time: {}\n", profiler.ElapsedToString(1000));
}
{
auto [_1, _2, gt_indices, gt_scores] = DecodeGroundtruth(opt.groundtruth_path_, false);
f32 recall = CheckGroundtruth(gt_indices.get(), gt_scores.get(), query_result, top_k);
std::cout << fmt::format("Recall: {}\n", recall);
switch (opt.type_) {
case BMPCompressType::kCompressed: {
auto index = BMPAlg<f32, i16, BMPCompressType::kCompressed>::Load(*file_handler);
inner(index);
break;
}
case BMPCompressType::kRaw: {
auto index = BMPAlg<f32, i16, BMPCompressType::kRaw>::Load(*file_handler);
inner(index);
break;
}
default: {
UnrecoverableError("Unknown compress type");
}
}
break;
}
Expand Down
14 changes: 12 additions & 2 deletions benchmark/local_infinity/sparse/sparse_benchmark_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import third_party;
import infinity_exception;
import sparse_util;
import compilation_config;
import bmp_util;

using namespace infinity;

Expand Down Expand Up @@ -248,17 +249,26 @@ struct LinScanOption : public BenchmarkOption {
// Command-line options for the BMP benchmark: compress type, top-k, block size, alpha and beta.
// NOTE(review): this listing is a rendered diff without +/- markers, so some members below
// (IndexName, block_size_) appear twice — presumably the pre-change line followed by its
// replacement. Confirm against the repository before relying on either version.
struct BMPOption : public BenchmarkOption {
public:
// Registers the BMP-specific options on `app_`; every option is optional and falls back to
// the member defaults declared at the bottom of the struct.
void ParseInner(CLI::App &app_) override {
// Accepted spellings for --type (matched with CLI::ignore_case) and the enum value each maps to.
Map<String, BMPCompressType> bmp_compress_type_map = {
{"compress", BMPCompressType::kCompressed},
{"raw", BMPCompressType::kRaw},
};

app_.add_option("--type", type_, "BMP compress type")
->required(false)
->transform(CLI::CheckedTransformer(bmp_compress_type_map, CLI::ignore_case));
app_.add_option("--topk", topk_, "Topk")->required(false)->transform(CLI::Range(1, 1024));
// NOTE(review): the range allows values up to 1024 while block_size_ is declared with what is
// presumably an 8-bit type below — confirm the intended upper bound.
app_.add_option("--block_size", block_size_, "Block size")->required(false)->transform(CLI::Range(1, 1024));
app_.add_option("--alpha", alpha_, "Alpha")->required(false)->transform(CLI::Range(0.0, 100.0));
app_.add_option("--beta", beta_, "Beta")->required(false)->transform(CLI::Range(0.0, 100.0));
}

// Builds the index name from the block size (and, in the second form, the numeric compress type).
String IndexName() const override { return fmt::format("bmp_block{}", block_size_); }
String IndexName() const override { return fmt::format("bmp_block{}_type{}", block_size_, static_cast<i8>(type_)); }

public:
// Defaults used when the corresponding option is not given on the command line.
BMPCompressType type_ = BMPCompressType::kCompressed;
i32 topk_ = 10;
i8 block_size_ = 8;
u8 block_size_ = 8;
f32 alpha_ = 1.0;
f32 beta_ = 1.0;
};
Expand Down
18 changes: 17 additions & 1 deletion src/executor/operator/physical_optimize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ namespace infinity {
void PhysicalOptimize::Init() {}

// Runs the optimize operator, marks `operator_state` complete and returns true.
bool PhysicalOptimize::Execute(QueryContext *query_context, OperatorState *operator_state) {
// NOTE(review): this listing is a rendered diff without +/- markers; the unconditional call
// below appears to be the pre-change line that the if/else replaces — confirm against the repository.
OptimizeIndex(query_context, operator_state);
// An empty index name takes the OptimizeIndex path; a non-empty one takes the OptimizeAIndex path.
if (index_name_.empty()) {
OptimizeIndex(query_context, operator_state);
} else {
OptimizeAIndex(query_context, operator_state);
}

operator_state->SetComplete();
return true;
}
Expand All @@ -58,4 +63,15 @@ void PhysicalOptimize::OptimizeIndex(QueryContext *query_context, OperatorState
LOG_INFO(fmt::format("OptimizeIndex {}.{} end", db_name_, table_name_));
}

// Looks up the index named by db_name_ / table_name_ / index_name_ in the current transaction
// and calls OptimizeIndex on it with opt_params_.
// If the lookup fails, the status is stored in `operator_state->status_` and passed to
// RecoverableError; OptimizeIndex is not called in that case.
void PhysicalOptimize::OptimizeAIndex(QueryContext *query_context, OperatorState *operator_state) {
    auto current_txn = query_context->GetTxn();
    auto [index_entry, lookup_status] = current_txn->GetIndexByName(db_name_, table_name_, index_name_);

    // Success path first: the index was found, so run the optimization and finish.
    if (lookup_status.ok()) {
        index_entry->OptimizeIndex(current_txn, opt_params_);
        return;
    }

    // Lookup failed: record the status on the operator state before reporting it.
    operator_state->status_ = lookup_status;
    RecoverableError(lookup_status);
}

} // namespace infinity
14 changes: 12 additions & 2 deletions src/executor/operator/physical_optimize.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,20 @@ import internal_types;
import optimize_statement;
import data_type;
import logger;
import statement_common;

namespace infinity {

export class PhysicalOptimize final : public PhysicalOperator {
public:
explicit PhysicalOptimize(u64 id, String db_name, String table_name, SharedPtr<Vector<LoadMeta>> load_metas)
explicit PhysicalOptimize(u64 id,
String db_name,
String table_name,
String index_name,
Vector<UniquePtr<InitParameter>> opt_params,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalOperator(PhysicalOperatorType::kOptimize, nullptr, nullptr, id, load_metas), db_name_(std::move(db_name)),
table_name_(std::move(table_name)) {}
table_name_(std::move(table_name)), index_name_(std::move(index_name)), opt_params_(std::move(opt_params)) {}

~PhysicalOptimize() override = default;

Expand All @@ -58,9 +64,13 @@ public:
private:
void OptimizeIndex(QueryContext *query_context, OperatorState *operator_state);

void OptimizeAIndex(QueryContext *query_context, OperatorState *operator_state);

private:
String db_name_{};
String table_name_{};
String index_name_{};
Vector<UniquePtr<InitParameter>> opt_params_;

SharedPtr<Vector<String>> output_names_{};
SharedPtr<Vector<SharedPtr<DataType>>> output_types_{};
Expand Down
Loading

0 comments on commit 40f512c

Please sign in to comment.