Skip to content

Commit

Permalink
Add fulltext index insert and dump.
Browse files Browse the repository at this point in the history
  • Loading branch information
small-turtle-1 committed Feb 26, 2025
1 parent 6a7bdd4 commit 5ae1b64
Show file tree
Hide file tree
Showing 14 changed files with 762 additions and 61 deletions.
68 changes: 68 additions & 0 deletions src/storage/catalog/meta/segment_index_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,21 @@ import third_party;
import infinity_context;
import new_catalog;
import mem_index;
import index_base;
import create_index_info;

namespace infinity {

void SegmentIndexFtInfo::ToJson(nlohmann::json &json) const {
json["ft_column_len_sum"] = ft_column_len_sum_;
json["ft_column_len_cnt"] = ft_column_len_cnt_;
}

void SegmentIndexFtInfo::FromJson(const nlohmann::json &json) {
ft_column_len_sum_ = json["ft_column_len_sum"].get<u64>();
ft_column_len_cnt_ = json["ft_column_len_cnt"].get<u32>();
}

SegmentIndexMeta::SegmentIndexMeta(SegmentID segment_id, TableIndexMeeta &table_index_meta, KVInstance &kv_instance)
: kv_instance_(kv_instance), table_index_meta_(table_index_meta), segment_id_(segment_id) {}

Expand Down Expand Up @@ -69,6 +81,35 @@ Status SegmentIndexMeta::SetNextChunkID(ChunkID chunk_id) {
return Status::OK();
}

Status SegmentIndexMeta::SetFtInfo(const SegmentIndexFtInfo &ft_info) {
ft_info_ = ft_info;
String ft_info_key = GetSegmentIndexTag("ft_info");
nlohmann::json ft_info_json;
ft_info.ToJson(ft_info_json);
String ft_info_str = ft_info_json.dump();
Status status = kv_instance_.Put(ft_info_key, ft_info_str);
if (!status.ok()) {
return status;
}
return Status::OK();
}

Status SegmentIndexMeta::UpdateFtInfo(u64 column_len_sum, u32 column_len_cnt) {
if (!ft_info_) {
Status status = LoadFtInfo();
if (!status.ok()) {
return status;
}
}
ft_info_->ft_column_len_sum_ += column_len_sum;
ft_info_->ft_column_len_cnt_ += column_len_cnt;
Status status = SetFtInfo(*ft_info_);
if (!status.ok()) {
return status;
}
return Status::OK();
}

Status SegmentIndexMeta::InitSet() {
{
Status status = SetChunkIDs(Vector<ChunkID>());
Expand All @@ -90,6 +131,20 @@ Status SegmentIndexMeta::InitSet() {
return status;
}
}
{
SharedPtr<IndexBase> index_def;
Status status = table_index_meta_.GetIndexDef(index_def);
if (!status.ok()) {
return status;
}
if (index_def->index_type_ == IndexType::kFullText) {
SegmentIndexFtInfo ft_info{};
Status status = this->SetFtInfo(ft_info);
if (!status.ok()) {
return status;
}
}
}
return Status::OK();
}

Expand Down Expand Up @@ -125,6 +180,19 @@ Status SegmentIndexMeta::LoadNextChunkID() {
return Status::OK();
}

Status SegmentIndexMeta::LoadFtInfo() {
String ft_info_key = GetSegmentIndexTag("ft_info");
String ft_info_str;
Status status = kv_instance_.Get(ft_info_key, ft_info_str);
if (!status.ok()) {
return status;
}
nlohmann::json ft_info_json = nlohmann::json::parse(ft_info_str);
ft_info_ = SegmentIndexFtInfo();
ft_info_->FromJson(ft_info_json);
return Status::OK();
}

String SegmentIndexMeta::GetSegmentIndexTag(const String &tag) {
const TableMeeta &table_meta = table_index_meta_.table_meta();
return KeyEncode::CatalogIdxSegmentTagKey(table_meta.db_id_str(), table_meta.table_id_str(), table_index_meta_.index_id_str(), segment_id_, tag);
Expand Down
29 changes: 29 additions & 0 deletions src/storage/catalog/meta/segment_index_meta.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,23 @@ export module segment_index_meta;

import stl;
import status;
import third_party;

namespace infinity {

class KVInstance;
class TableIndexMeeta;
class MemIndex;

export struct SegmentIndexFtInfo {
u64 ft_column_len_sum_{}; // increase only
u32 ft_column_len_cnt_{}; // increase only

void ToJson(nlohmann::json &json) const;

void FromJson(const nlohmann::json &json);
};

export class SegmentIndexMeta {
public:
SegmentIndexMeta(SegmentID segment_id, TableIndexMeeta &table_index_meta, KVInstance &kv_instance);
Expand Down Expand Up @@ -57,12 +67,27 @@ public:
return Status::OK();
}

Status GetFtInfo(SegmentIndexFtInfo *&ft_info) {
if (!ft_info_) {
Status status = LoadFtInfo();
if (!status.ok()) {
return status;
}
}
ft_info = &ft_info_.value();
return Status::OK();
}

Status SetChunkIDs(const Vector<ChunkID> &chunk_ids);

Status AddChunkID(ChunkID chunk_id);

Status SetNextChunkID(ChunkID chunk_id);

Status SetFtInfo(const SegmentIndexFtInfo &ft_info);

Status UpdateFtInfo(u64 column_len_sum, u32 column_len_cnt);

Status InitSet();

Status GetMemIndex(SharedPtr<MemIndex> &mem_index);
Expand All @@ -72,6 +97,8 @@ private:

Status LoadNextChunkID();

Status LoadFtInfo();

String GetSegmentIndexTag(const String &tag);

private:
Expand All @@ -81,6 +108,8 @@ private:

Optional<Vector<ChunkID>> chunk_ids_;
Optional<ChunkID> next_chunk_id_;

Optional<SegmentIndexFtInfo> ft_info_;
};

} // namespace infinity
24 changes: 24 additions & 0 deletions src/storage/catalog/meta/table_index_meeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

module;

#include <string>

module table_index_meeta;

import kv_store;
Expand All @@ -26,6 +28,26 @@ namespace infinity {
TableIndexMeeta::TableIndexMeeta(String index_id_str, TableMeeta &table_meta, KVInstance &kv_instance)
: kv_instance_(kv_instance), table_meta_(table_meta), index_id_str_(std::move(index_id_str)) {}

Status TableIndexMeeta::GetTableIndexDir(String &table_index_dir) {
SharedPtr<String> table_dir_ptr{nullptr};
{
auto [table_dir, status] = table_meta_.GetTableDir();
if (!status.ok()) {
return status;
}
table_dir_ptr = table_dir;
}
String *index_dir_ptr = nullptr;
{
Status status = this->GetIndexDir(index_dir_ptr);
if (!status.ok()) {
return status;
}
}
table_index_dir = fmt::format("{}/{}", *table_dir_ptr, *index_dir_ptr);
return Status::OK();
}

Tuple<SharedPtr<ColumnDef>, Status> TableIndexMeeta::GetColumnDef() {
SharedPtr<IndexBase> index_def;
{
Expand Down Expand Up @@ -125,4 +147,6 @@ String TableIndexMeeta::GetTableIndexTag(const String &tag) const {
return KeyEncode::CatalogIndexTagKey(table_meta_.db_id_str(), table_meta_.table_id_str(), index_id_str_, tag);
}

String TableIndexMeeta::FtIndexCacheTag() const { return GetTableIndexTag("ft_cache"); }

} // namespace infinity
5 changes: 5 additions & 0 deletions src/storage/catalog/meta/table_index_meeta.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public:
return Status::OK();
}

Status GetTableIndexDir(String &table_index_dir);

Tuple<SharedPtr<ColumnDef>, Status> GetColumnDef();

Status GetSegmentIDs(Vector<SegmentID> *&segment_ids) {
Expand Down Expand Up @@ -86,6 +88,9 @@ private:

String GetTableIndexTag(const String &tag) const;

public:
String FtIndexCacheTag() const;

private:
KVInstance &kv_instance_;
TableMeeta &table_meta_;
Expand Down
40 changes: 40 additions & 0 deletions src/storage/catalog/new_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import logger;
import infinity_exception;
import default_values;
import mem_index;
import column_index_reader;

namespace infinity {

Expand Down Expand Up @@ -527,4 +528,43 @@ Status NewCatalog::DropMemIndexByMemIndexKey(const String &mem_index_key) {
return Status::OK();
}

Status NewCatalog::AddFtIndexCache(String ft_index_cache_key, SharedPtr<TableIndexReaderCache> ft_index_cache) {
bool insert_success = false;
HashMap<String, SharedPtr<TableIndexReaderCache>>::iterator iter;
{
std::unique_lock lock(ft_index_cache_mtx_);
std::tie(iter, insert_success) = ft_index_cache_map_.emplace(std::move(ft_index_cache_key), std::move(ft_index_cache));
}
if (!insert_success) {
return Status::CatalogError(fmt::format("FtIndexCache key: {} already exists", iter->first));
}
return Status::OK();
}

Status NewCatalog::GetFtIndexCache(const String &ft_index_cache_key, SharedPtr<TableIndexReaderCache> &ft_index_cache) {
ft_index_cache = nullptr;
{
std::shared_lock<std::shared_mutex> lck(ft_index_cache_mtx_);
if (auto iter = ft_index_cache_map_.find(ft_index_cache_key); iter != ft_index_cache_map_.end()) {
ft_index_cache = iter->second;
}
}
if (ft_index_cache == nullptr) {
return Status::CatalogError(fmt::format("FtIndexCache key: {} not found", ft_index_cache_key));
}
return Status::OK();
}

Status NewCatalog::DropFtIndexCacheByFtIndexCacheKey(const String &ft_index_cache_key) {
bool delete_success = false;
{
std::unique_lock lock(ft_index_cache_mtx_);
delete_success = ft_index_cache_map_.erase(ft_index_cache_key) > 0;
}
if (!delete_success) {
return Status::CatalogError(fmt::format("FtIndexCache key: {} not found", ft_index_cache_key));
}
return Status::OK();
}

} // namespace infinity
10 changes: 10 additions & 0 deletions src/storage/catalog/new_catalog.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace infinity {

class NewTxn;
class MemIndex;
class TableIndexReaderCache;

export enum class LockType { kLocking, kLocked, kUnlocking, kUnlocked, kImmutable };

Expand Down Expand Up @@ -116,6 +117,15 @@ public:
private:
std::shared_mutex mem_index_mtx_{};
HashMap<String, SharedPtr<MemIndex>> mem_index_map_{};

public:
Status AddFtIndexCache(String ft_index_cache_key, SharedPtr<TableIndexReaderCache> ft_index_cache);
Status GetFtIndexCache(const String &ft_index_cache_key, SharedPtr<TableIndexReaderCache> &ft_index_cache);
Status DropFtIndexCacheByFtIndexCacheKey(const String &ft_index_cache_key);

private:
std::shared_mutex ft_index_cache_mtx_{};
HashMap<String, SharedPtr<TableIndexReaderCache>> ft_index_cache_map_{};
};

} // namespace infinity
Loading

0 comments on commit 5ae1b64

Please sign in to comment.