Skip to content

Commit

Permalink
Add mem index append.
Browse files Browse the repository at this point in the history
  • Loading branch information
small-turtle-1 committed Feb 25, 2025
1 parent e77c099 commit fe39051
Show file tree
Hide file tree
Showing 22 changed files with 815 additions and 43 deletions.
16 changes: 13 additions & 3 deletions src/planner/optimizer/index_scan/index_filter_evaluators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,8 @@ Bitmask IndexFilterEvaluatorFulltext::Evaluate(const SegmentID segment_id, const
result.SetAllFalse();
const RowID begin_rowid(segment_id, 0);
const RowID end_rowid(segment_id, segment_row_count);
const CreateSearchParams params{table_info_, index_reader_.get(), early_term_algo_, ft_similarity_, bm25_params_, minimum_should_match_, 0u, index_names_};
const CreateSearchParams
params{table_info_, index_reader_.get(), early_term_algo_, ft_similarity_, bm25_params_, minimum_should_match_, 0u, index_names_};
auto ft_iter = query_tree_->CreateSearch(params);
if (ft_iter && score_threshold_ > 0.0f) {
auto new_ft_iter = MakeUnique<ScoreThresholdIterator>(std::move(ft_iter), score_threshold_);
Expand Down Expand Up @@ -610,13 +611,22 @@ struct TrunkReaderT final : TrunkReader<ColumnValueType> {
using KeyType = typename TrunkReader<ColumnValueType>::SecondaryIndexOrderedT;
const u32 segment_row_count_;
SharedPtr<ChunkIndexEntry> chunk_index_entry_;
const SecondaryIndexData *index_ = nullptr;
u32 begin_pos_ = 0;
u32 end_pos_ = 0;
TrunkReaderT(const u32 segment_row_count, const SharedPtr<ChunkIndexEntry> &chunk_index_entry)
: segment_row_count_(segment_row_count), chunk_index_entry_(chunk_index_entry) {}
TrunkReaderT(const u32 segment_row_count, const SecondaryIndexData *index)
: segment_row_count_(segment_row_count), index_(index) {}
u32 GetResultCnt(const Pair<KeyType, KeyType> interval_range) override {
const BufferHandle index_handle = chunk_index_entry_->GetIndex();
const auto index = static_cast<const SecondaryIndexData *>(index_handle.GetData());
Optional<BufferHandle> index_handle;
const SecondaryIndexData *index = nullptr;
if (chunk_index_entry_) {
index_handle = chunk_index_entry_->GetIndex();
index = static_cast<const SecondaryIndexData *>(index_handle->GetData());
} else {
index = index_;
}
const u32 index_data_num = index->GetChunkRowCount();
const auto [begin_val, end_val] = interval_range;
// 1. search PGM and get approximate search range
Expand Down
40 changes: 40 additions & 0 deletions src/storage/catalog/mem_index.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

export module mem_index;

import stl;
import secondary_index_in_mem;
import ivf_index_data_in_mem;
import emvb_index_in_mem;
import memory_indexer;
import abstract_hnsw;
import abstract_bmp;

namespace infinity {

export struct MemIndex {
std::mutex mtx_;

SharedPtr<HnswIndexInMem> memory_hnsw_index_{};
SharedPtr<IVFIndexInMem> memory_ivf_index_{};
SharedPtr<MemoryIndexer> memory_indexer_{};
SharedPtr<SecondaryIndexInMem> memory_secondary_index_{};
SharedPtr<EMVBIndexInMem> memory_emvb_index_{};
SharedPtr<BMPIndexInMem> memory_bmp_index_{};
};

} // namespace infinity
21 changes: 21 additions & 0 deletions src/storage/catalog/meta/segment_index_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import kv_store;
import table_index_meeta;
import table_meeta;
import third_party;
import infinity_context;
import new_catalog;
import mem_index;

namespace infinity {

Expand Down Expand Up @@ -79,6 +82,24 @@ Status SegmentIndexMeta::InitSet() {
return status;
}
}
{
String mem_index_key = GetSegmentIndexTag("mem_index");
NewCatalog *new_catalog = InfinityContext::instance().storage()->new_catalog();
Status status = new_catalog->AddMemIndex(std::move(mem_index_key), MakeShared<MemIndex>());
if (!status.ok()) {
return status;
}
}
return Status::OK();
}

Status SegmentIndexMeta::GetMemIndex(SharedPtr<MemIndex> &mem_index) {
NewCatalog *new_catalog = InfinityContext::instance().storage()->new_catalog();
String mem_index_key = GetSegmentIndexTag("mem_index");
Status status = new_catalog->GetMemIndex(mem_index_key, mem_index);
if (!status.ok()) {
return status;
}
return Status::OK();
}

Expand Down
5 changes: 4 additions & 1 deletion src/storage/catalog/meta/segment_index_meta.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace infinity {

class KVInstance;
class TableIndexMeeta;
class MemIndex;

export class SegmentIndexMeta {
public:
Expand All @@ -47,7 +48,7 @@ public:

Status GetNextChunkID(ChunkID &chunk_id) {
if (!next_chunk_id_) {
Status status = LoadChunkIDs();
Status status = LoadNextChunkID();
if (!status.ok()) {
return status;
}
Expand All @@ -64,6 +65,8 @@ public:

Status InitSet();

Status GetMemIndex(SharedPtr<MemIndex> &mem_index);

private:
Status LoadChunkIDs();

Expand Down
46 changes: 32 additions & 14 deletions src/storage/catalog/meta/table_index_meeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,31 +34,37 @@ Status TableIndexMeeta::GetColumnDef(SharedPtr<ColumnDef> &column_def) {
return status;
}
}
SizeT column_idx;
{
Status status = table_meta_.GetColumnIDByColumnName(index_def->column_name(), column_idx);
Status status = table_meta_.GetColumnDefByColumnName(index_def->column_name(), column_def);
if (!status.ok()) {
return status;
}
}
{
Vector<SharedPtr<ColumnDef>> *column_defs = nullptr;
Status status = table_meta_.GetColumnDefs(column_defs);
if (!status.ok()) {
return status;
}
column_def = (*column_defs)[column_idx];
}
return Status::OK();
}

Status TableIndexMeeta::SetSegmentIDs(const Vector<SegmentID> &segment_ids) {
//
String segment_ids_key = GetTableIndexTag("segment_ids");
String segment_ids_str = nlohmann::json(segment_ids).dump();
Status status = kv_instance_.Put(segment_ids_key, segment_ids_str);
if (!status.ok()) {
return status;
}
return Status::OK();
}

Status TableIndexMeeta::AddSegmentID(SegmentID segment_id) {
//
if (!segment_ids_) {
Status status = LoadSegmentIDs();
if (!status.ok()) {
return status;
}
}
segment_ids_->push_back(segment_id);
Status status = SetSegmentIDs(*segment_ids_);
if (!status.ok()) {
return status;
}
return Status::OK();
}

Expand All @@ -75,12 +81,24 @@ Status TableIndexMeeta::LoadIndexDef() {
}

Status TableIndexMeeta::LoadIndexDir() {
//
index_dir_ = MakeShared<String>();
String index_dir_key = GetTableIndexTag("dir");
Status status = kv_instance_.Get(index_dir_key, *index_dir_);
if (!status.ok()) {
return status;
}
return Status::OK();
}

Status TableIndexMeeta::LoadSegmentIDs() {
//
String segment_ids_key = GetTableIndexTag("segment_ids");
String segment_ids_str;
Status status = kv_instance_.Get(segment_ids_key, segment_ids_str);
if (!status.ok()) {
return status;
}
Vector<SegmentID> segment_ids = nlohmann::json::parse(segment_ids_str).get<Vector<SegmentID>>();
segment_ids_ = segment_ids;
return Status::OK();
}

Expand Down
6 changes: 3 additions & 3 deletions src/storage/catalog/meta/table_index_meeta.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@ public:
return Status::OK();
}

Status GetIndexDir(SharedPtr<String> index_dir) {
Status GetIndexDir(String *&index_dir_ptr) {
if (!index_dir_) {
Status status = LoadIndexDir();
if (!status.ok()) {
return status;
}
}
index_dir_ = index_dir;
index_dir_ptr = index_dir_.get();
return Status::OK();
}

Status GetColumnDef(SharedPtr<ColumnDef> &column_def);

Status GetSegmentIds(Vector<SegmentID> *&segment_ids) {
Status GetSegmentIDs(Vector<SegmentID> *&segment_ids) {
if (!segment_ids_) {
Status status = LoadSegmentIDs();
if (!status.ok()) {
Expand Down
25 changes: 21 additions & 4 deletions src/storage/catalog/meta/table_meeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ Status TableMeeta::GetTableDir(String *&table_dir) {
return Status::OK();
}

Status TableMeeta::GetColumnIDByColumnName(const String &column_name, ColumnID &column_id) {
Status TableMeeta::GetColumnDefByColumnName(const String &column_name, SharedPtr<ColumnDef> &column_def) {
String column_key = KeyEncode::TableColumnKey(db_id_str_, table_id_str_, column_name);
String column_id_str;
Status status = kv_instance_.Get(column_key, column_id_str);
String column_def_str;
Status status = kv_instance_.Get(column_key, column_def_str);
if (!status.ok()) {
return status;
}
column_id = std::stoull(column_id_str);
column_def = ColumnDef::FromJson(nlohmann::json::parse(column_def_str));
return Status::OK();
}

Expand Down Expand Up @@ -198,6 +198,23 @@ Status TableMeeta::LoadSegmentIDs() {
return Status::OK();
}

Status TableMeeta::LoadIndexIDs() {
Vector<String> index_id_strs;
String index_prefix = KeyEncode::CatalogTableIndexPrefix(db_id_str_, table_id_str_);
auto iter = kv_instance_.GetIterator();
iter->Seek(index_prefix);
while (iter->Valid() && iter->Key().starts_with(index_prefix)) {
String column_key = iter->Key().ToString();
String column_key1 = column_key.substr(index_prefix.size());
[[maybe_unused]] String index_name = column_key1.substr(0, column_key1.find('|'));
String index_id_str = iter->Value().ToString();
index_id_strs.emplace_back(std::move(index_id_str));
iter->Next();
}
index_id_strs_ = std::move(index_id_strs);
return Status::OK();
}

Status TableMeeta::LoadNextSegmentID() {
String next_id_key = GetTableTag(String(LATEST_SEGMENT_ID));
String next_id_str;
Expand Down
16 changes: 15 additions & 1 deletion src/storage/catalog/meta/table_meeta.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,22 @@ public:

Status GetSegmentIDs(Vector<SegmentID> *&segment_ids);

Status GetIndexIDs(Vector<String> *&index_id_strs) {
if (!index_id_strs_) {
Status status = LoadIndexIDs();
if (!status.ok()) {
return status;
}
}
index_id_strs = &index_id_strs_.value();
return Status::OK();
}

Status GetNextSegmentID(SegmentID &next_segment_id);

Status GetTableDir(String *&table_dir);

Status GetColumnIDByColumnName(const String &column_name, ColumnID &column_id);
Status GetColumnDefByColumnName(const String &column_name, SharedPtr<ColumnDef> &column_def);

Status SetNextSegmentID(SegmentID next_segment_id);

Expand All @@ -66,6 +77,8 @@ private:

Status LoadSegmentIDs();

Status LoadIndexIDs();

Status LoadNextSegmentID();

Status LoadLatestSegmentID();
Expand All @@ -85,6 +98,7 @@ private:

Optional<Vector<SharedPtr<ColumnDef>>> column_defs_;
Optional<Vector<SegmentID>> segment_ids_;
Optional<Vector<String>> index_id_strs_;
Optional<SegmentID> next_segment_id_;
Optional<SegmentID> latest_segment_id_;
Optional<String> table_dir_;
Expand Down
39 changes: 39 additions & 0 deletions src/storage/catalog/new_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import third_party;
import logger;
import infinity_exception;
import default_values;
import mem_index;

namespace infinity {

Expand Down Expand Up @@ -486,4 +487,42 @@ Status NewCatalog::DropBlockLockByBlockKey(const String &block_key) {
return Status::OK();
}

Status NewCatalog::AddMemIndex(String mem_index_key, SharedPtr<MemIndex> mem_index) {
bool insert_success = false;
HashMap<String, SharedPtr<MemIndex>>::iterator iter;
{
std::unique_lock lock(mem_index_mtx_);
std::tie(iter, insert_success) = mem_index_map_.emplace(std::move(mem_index_key), std::move(mem_index));
}
if (!insert_success) {
return Status::CatalogError(fmt::format("MemIndex key: {} already exists", iter->first));
}
return Status::OK();
}

Status NewCatalog::GetMemIndex(const String &mem_index_key, SharedPtr<MemIndex> &mem_index) {
{
std::shared_lock<std::shared_mutex> lck(mem_index_mtx_);
if (auto iter = mem_index_map_.find(mem_index_key); iter != mem_index_map_.end()) {
mem_index = iter->second;
}
}
if (mem_index == nullptr) {
return Status::CatalogError(fmt::format("MemIndex key: {} not found", mem_index_key));
}
return Status::OK();
}

Status NewCatalog::DropMemIndexByMemIndexKey(const String &mem_index_key) {
bool delete_success = false;
{
std::unique_lock lock(mem_index_mtx_);
delete_success = mem_index_map_.erase(mem_index_key) > 0;
}
if (!delete_success) {
return Status::CatalogError(fmt::format("MemIndex key: {} not found", mem_index_key));
}
return Status::OK();
}

} // namespace infinity
Loading

0 comments on commit fe39051

Please sign in to comment.