Skip to content

Commit

Permalink
Added DictionaryReader (#556)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhichang authored Feb 7, 2024
1 parent 0a30a3a commit fdc2a1b
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/storage/invertedindex/column_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ SharedPtr<DiskIndexSegmentReader> ColumnIndexReader::CreateDiskSegmentReader(con
Path path = Path(root_dir_) / std::to_string(segment.GetSegmentID());
String dict_file = path.string();
dict_file.append(DICT_SUFFIX);
SharedPtr<DictionaryReader> dict_reader = MakeShared<DictionaryReader>(dict_file);
SharedPtr<DictionaryReader> dict_reader = MakeShared<DictionaryReader>(dict_file, index_config_.GetPostingFormatOption());
return MakeShared<DiskIndexSegmentReader>(root_dir_, segment, index_config_, dict_reader);
}

Expand Down
47 changes: 47 additions & 0 deletions src/storage/invertedindex/common/mmap.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
module;

#include <fcntl.h>
#include <filesystem>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
import stl;

export module mmap;

using namespace infinity;
namespace fs = std::filesystem;

namespace infinity {

export int MmapFile(const String &fp, u8 *&data_ptr, SizeT &data_len) {
data_ptr = nullptr;
data_len = 0;
long len_f = fs::file_size(fp);
if (len_f == 0)
return -1;
int f = open(fp.c_str(), O_RDONLY);
void *tmpd = mmap(NULL, len_f, PROT_READ, MAP_SHARED, f, 0);
if (tmpd == MAP_FAILED)
return -1;
close(f);
int rc = madvise(tmpd, len_f, MADV_RANDOM | MADV_DONTDUMP);
if (rc < 0)
return -1;
data_ptr = (u8 *)tmpd;
data_len = len_f;
return 0;
}

export int MunmapFile(u8 *&data_ptr, SizeT &data_len) {
if (data_ptr != nullptr) {
int rc = munmap(data_ptr, data_len);
if (rc < 0)
return -1;
data_ptr = nullptr;
data_len = 0;
}
return 0;
}

} // namespace infinity
61 changes: 61 additions & 0 deletions src/storage/invertedindex/dict_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
module;

import stl;
import term_meta;
import posting_list_format;
import fst;
import mmap;
import infinity_exception;
module dict_reader;

namespace infinity {

DictionaryReader::DictionaryReader(const String &dict_path, const PostingFormatOption &option)
: dict_path_(dict_path), meta_loader_(option), data_ptr_(nullptr), data_len_(0) {
int rc = MmapFile(dict_path, data_ptr_, data_len_);
if (rc < 0) {
throw UnrecoverableException("MmapFile failed");
}
// fst_root_addr + addr_offset(21) == fst_len
SizeT fst_root_addr = ReadU64LE(data_ptr_ + data_len_ - 4 - 8);
SizeT fst_len = fst_root_addr + 21;
u8 *fst_data = data_ptr_ + (data_len_ - fst_len);
fst_ = MakeUnique<Fst>(fst_data, fst_len);
}

DictionaryReader::~DictionaryReader() {
if (data_ptr_ != nullptr) {
int rc = MunmapFile(data_ptr_, data_len_);
if (rc < 0) {
throw UnrecoverableException("MunmapFile failed");
}
}
}

bool DictionaryReader::Lookup(const String &key, TermMeta &term_meta) {
u64 val;
bool found = fst_->Get((u8 *)key.c_str(), key.length(), val);
if (!found)
return false;
u8 *data_cursor = data_ptr_ + val;
SizeT left_size = data_len_ - val;
meta_loader_.Load(data_cursor, left_size, term_meta);
return true;
}

void DictionaryReader::LookupPrefix(const String &prefix, Vector<Pair<String, TermMeta>> &term_metas) {
FstStream s(*fst_, (u8 *)prefix.c_str(), prefix.length());
Vector<u8> key;
u64 val;
String term;
TermMeta term_meta;
while (s.Next(key, val)) {
term = String((char *)key.data(), key.size());
u8 *data_cursor = data_ptr_ + val;
SizeT left_size = data_len_ - val;
meta_loader_.Load(data_cursor, left_size, term_meta);
term_metas.push_back({term, term_meta});
}
}

} // namespace infinity
22 changes: 15 additions & 7 deletions src/storage/invertedindex/dict_reader.cppm
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
module;

import stl;
import memory_pool;
import segment_posting;
import index_defines;
import index_config;
import term_meta;
import posting_list_format;
import fst;
export module dict_reader;

namespace infinity {

export class DictionaryReader {
private:
const String &dict_path_;
TermMetaLoader meta_loader_;
u8 *data_ptr_;
SizeT data_len_;
UniquePtr<Fst> fst_;

public:
DictionaryReader(const String &root_path){};
DictionaryReader(const String &dict_path, const PostingFormatOption &option);

~DictionaryReader();

~DictionaryReader() = default;
bool Lookup(const String &key, TermMeta &term_meta);

bool Lookup(const String &key, u64 &value) { return false; }
void LookupPrefix(const String &prefix, Vector<Pair<String, TermMeta>> &term_metas);
};
} // namespace infinity
5 changes: 3 additions & 2 deletions src/storage/invertedindex/disk_segment_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import index_config;
import segment;
import file_reader;
import dict_reader;
import term_meta;
import posting_list_format;

namespace infinity {
Expand All @@ -25,8 +26,8 @@ DiskIndexSegmentReader::DiskIndexSegmentReader(const String &root_path,
DiskIndexSegmentReader::~DiskIndexSegmentReader() {}

bool DiskIndexSegmentReader::GetSegmentPosting(const String &term, docid_t base_doc_id, SegmentPosting &seg_posting, MemoryPool *session_pool) const {
u64 dict_value;
if (!dict_reader_.get() || !dict_reader_->Lookup(term, dict_value))
TermMeta term_meta;
if (!dict_reader_.get() || !dict_reader_->Lookup(term, term_meta))
return false;
/// TODO
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/invertedindex/fst/bytes.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ u32 ReadU32LE(const u8 *ptr) {
}

/// Read a u64 in little endian format from the beginning of the given slice.
u64 ReadU64LE(const u8 *ptr) {
export u64 ReadU64LE(const u8 *ptr) {
#ifdef HAVE_EFFICIENT_UNALIGNED_ACCESS
u64 result = *(u64 *)ptr;
#else
Expand Down
17 changes: 17 additions & 0 deletions src/storage/invertedindex/fst/fst.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ export struct Bound {
Vector<u8> key_;

Bound() : ty_(kUnbounded){};
Bound(BoundType ty) : ty_(ty){};
Bound(BoundType ty, u8 *key_ptr, SizeT key_len) : ty_(ty), key_(key_ptr, key_ptr + key_len) {}

bool ExceededBy(u8 *inp_ptr, SizeT inp_len) {
Expand Down Expand Up @@ -235,6 +236,22 @@ private:

public:
FstStream(Fst &fst, Bound min = Bound(), Bound max = Bound()) : fst_(fst), end_at_(max) { SeekMin(min); }
FstStream(Fst &fst, u8 *prefix_ptr, SizeT prefix_len) : fst_(fst) {
Bound min(Bound::kIncluded, prefix_ptr, prefix_len);
Bound max(Bound::kExcluded, prefix_ptr, prefix_len);
int i;
for (i = prefix_len - 1; i >= 0; i--) {
prefix_ptr[i]++;
if (prefix_ptr[i] != 0x00) {
break;
}
}
if (i < 0) {
max = Bound();
}
end_at_ = max;
SeekMin(min);
}

void Reset(Bound min = Bound(), Bound max = Bound()) {
end_at_ = max;
Expand Down
1 change: 1 addition & 0 deletions src/storage/invertedindex/fst/mod.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module;
export module fst;
export import :build;
export import :fst;
export import :bytes;
export import :error;
export import :writer;
export import :registry;

0 comments on commit fdc2a1b

Please sign in to comment.