Benchmark fulltext insertion (#968)
### What problem does this PR solve?

Boost fulltext insertion performance.

Issue link: #358

### Type of change

- [x] Refactoring
- [x] Performance Improvement
yuzhichang authored Apr 9, 2024
1 parent 42d7742 commit 58b3217
Showing 26 changed files with 331 additions and 369 deletions.
10 changes: 4 additions & 6 deletions benchmark/local_infinity/fulltext/fulltext_import_benchmark.cpp
@@ -142,7 +142,7 @@ void BenchmarkInsert(SharedPtr<Infinity> infinity, const String &db_name, const

profiler.Begin();
SizeT num_rows = 0;
const int batch_size = 100;
const int batch_size = 1;
Vector<Tuple<char *, char *, char *>> batch;
batch.reserve(batch_size);

@@ -173,12 +173,7 @@ void BenchmarkInsert(SharedPtr<Infinity> infinity, const String &db_name, const
values->push_back(value_list);
}
infinity->Insert(db_name, table_name, columns, values);

// NOTE: ~InsertStatement() has deleted or freed columns, values, value_list, const_expr, const_expr->str_value_
if (batch.size() < batch_size) {
done = true;
break;
}
}
input_file.close();
LOG_INFO(fmt::format("Insert data {} rows cost: {}", num_rows, profiler.ElapsedToString()));
@@ -277,13 +272,16 @@ int main(int argc, char* argv[]) {
}

if (is_import) {
BenchmarkCreateIndex(infinity, db_name, table_name, index_name);
BenchmarkImport(infinity, db_name, table_name, srcfile);
}
if (is_insert) {
BenchmarkCreateIndex(infinity, db_name, table_name, index_name);
BenchmarkInsert(infinity, db_name, table_name, srcfile);
}
if (is_merge) {
BenchmarkCreateIndex(infinity, db_name, table_name, index_name);
BenchmarkInsert(infinity, db_name, table_name, srcfile);
BenchmarkOptimize(infinity, db_name, table_name);
}
sleep(10);
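
As a consolidated view of what the insert benchmark now measures, here is a generic sketch of the loop after this change: batch_size is 1, so every parsed row triggers exactly one insert call against a table whose fulltext index already exists, i.e. the realtime indexing path is exercised per row. `InsertRow` is a hypothetical stand-in for the benchmark's `infinity->Insert(db_name, table_name, columns, values)`, and the three tab-separated fields only mirror the `Tuple<char *, char *, char *>` rows parsed by the real benchmark.

```cpp
#include <chrono>
#include <cstddef>
#include <cstdio>
#include <fstream>
#include <functional>
#include <sstream>
#include <string>

// Generic sketch only: InsertRow stands in for infinity->Insert(...).
void BenchmarkInsertLoop(const std::string &path,
                         const std::function<void(const std::string &, const std::string &, const std::string &)> &InsertRow) {
    std::ifstream input_file(path);
    std::string line;
    std::size_t num_rows = 0;
    const auto begin = std::chrono::steady_clock::now();
    while (std::getline(input_file, line)) {
        // Three tab-separated fields per row, as in the benchmark's Tuple<char *, char *, char *> batch.
        std::istringstream fields(line);
        std::string col1, col2, col3;
        std::getline(fields, col1, '\t');
        std::getline(fields, col2, '\t');
        std::getline(fields, col3, '\t');
        InsertRow(col1, col2, col3); // batch_size == 1: one insert call per row, indexed in realtime
        ++num_rows;
    }
    const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - begin);
    std::printf("Insert data %zu rows cost: %lld ms\n", num_rows, static_cast<long long>(ms.count()));
}
```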
4 changes: 2 additions & 2 deletions src/common/default_values.cppm
@@ -86,10 +86,10 @@ export {
constexpr SizeT DEFAULT_BASE_FILE_SIZE = 8 * 1024;
constexpr SizeT DEFAULT_OUTLINE_FILE_MAX_SIZE = 16 * 1024 * 1024;

constexpr SizeT DEFAULT_WAL_FILE_SIZE_THRESHOLD = 8 * 1024;
constexpr SizeT DEFAULT_WAL_FILE_SIZE_THRESHOLD = 1 * GB;
constexpr SizeT FULL_CHECKPOINT_INTERVAL_SEC = 30; // 30 seconds
constexpr SizeT DELTA_CHECKPOINT_INTERVAL_SEC = 5; // 5 seconds
constexpr SizeT DELTA_CHECKPOINT_INTERVAL_WAL_BYTES = 64 * 1024;
constexpr SizeT DELTA_CHECKPOINT_INTERVAL_WAL_BYTES = 64 * MB;
constexpr std::string_view WAL_FILE_TEMP_FILE = "wal.log";
constexpr std::string_view WAL_FILE_PREFIX = "wal.log";
constexpr std::string_view CATALOG_FILE_DIR = "catalog";
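
For a sense of scale, a minimal standalone sketch of what the old and new thresholds work out to in bytes. The KB/MB/GB constants below are assumptions standing in for whatever unit constants default_values.cppm actually defines; only the `8 * 1024`, `1 * GB`, `64 * 1024` and `64 * MB` expressions come from the diff.

```cpp
#include <cstddef>
#include <cstdio>

// Assumed power-of-two unit constants; the real definitions live elsewhere in the module.
constexpr std::size_t KB = 1024;
constexpr std::size_t MB = 1024 * KB;
constexpr std::size_t GB = 1024 * MB;

int main() {
    // WAL file size threshold: 8 KiB before -> 1 GiB after, before the WAL file is swapped.
    std::printf("old WAL threshold:        %zu bytes\n", 8 * KB);  // 8192
    std::printf("new WAL threshold:        %zu bytes\n", 1 * GB);  // 1073741824
    // Delta checkpoint trigger: 64 KiB -> 64 MiB of accumulated WAL bytes.
    std::printf("old delta ckpt interval:  %zu bytes\n", 64 * KB); // 65536
    std::printf("new delta ckpt interval:  %zu bytes\n", 64 * MB); // 67108864
    return 0;
}
```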
11 changes: 6 additions & 5 deletions src/storage/invertedindex/column_index_merger.cpp
@@ -26,6 +26,7 @@ import third_party;
import file_system;
import file_system_type;
import infinity_exception;
import vector_with_lock;

namespace infinity {
ColumnIndexMerger::ColumnIndexMerger(const String &index_dir, optionflag_t flag, MemoryPool *memory_pool, RecyclePool *buffer_pool)
@@ -34,7 +35,7 @@ ColumnIndexMerger::ColumnIndexMerger(const String &index_dir, optionflag_t flag,
ColumnIndexMerger::~ColumnIndexMerger() {}

SharedPtr<PostingMerger> ColumnIndexMerger::CreatePostingMerger() {
return MakeShared<PostingMerger>(memory_pool_, buffer_pool_, flag_, column_length_mutex_, column_length_array_);
return MakeShared<PostingMerger>(memory_pool_, buffer_pool_, flag_, column_lengths_);
}

void ColumnIndexMerger::Merge(const Vector<String> &base_names, const Vector<RowID> &base_rowids, const String &dst_base_name) {
@@ -69,17 +70,17 @@ void ColumnIndexMerger::Merge(const Vector<String> &base_names, const Vector<Row
// prepare column length info
// the indexes to be merged should be from the same segment
// otherwise the range of row_id will be very large ( >= 2^32)
std::unique_lock<std::shared_mutex> lock(column_length_mutex_);
column_length_array_.clear();
Vector<u32> &unsafe_column_lengths = column_lengths_.UnsafeVec();
unsafe_column_lengths.clear();
for (u32 i = 0; i < base_names.size(); ++i) {
String column_len_file = (Path(index_dir_) / base_names[i]).string() + LENGTH_SUFFIX;
RowID base_row_id = base_rowids[i];
u32 id_offset = base_row_id - merge_base_rowid;
UniquePtr<FileHandler> file_handler = fs_.OpenFile(column_len_file, FileFlags::READ_FLAG, FileLockType::kNoLock);
const u32 file_size = fs_.GetFileSize(*file_handler);
u32 file_read_array_len = file_size / sizeof(u32);
column_length_array_.resize(id_offset + file_read_array_len);
const i64 read_count = fs_.Read(*file_handler, column_length_array_.data() + id_offset, file_size);
unsafe_column_lengths.resize(id_offset + file_read_array_len);
const i64 read_count = fs_.Read(*file_handler, unsafe_column_lengths.data() + id_offset, file_size);
file_handler->Close();
if (read_count != file_size) {
UnrecoverableError("ColumnIndexMerger: when loading column length file, read_count != file_size");
4 changes: 2 additions & 2 deletions src/storage/invertedindex/column_index_merger.cppm
@@ -15,6 +15,7 @@ import column_index_iterator;
import segment_term_posting;
import local_file_system;
import internal_types;
import vector_with_lock;

namespace infinity {
export class ColumnIndexMerger {
@@ -37,7 +38,6 @@ private:
LocalFileSystem fs_;

// for column length info
std::shared_mutex column_length_mutex_;
Vector<u32> column_length_array_;
VectorWithLock<u32> column_lengths_;
};
} // namespace infinity
23 changes: 11 additions & 12 deletions src/storage/invertedindex/column_index_reader.cpp
@@ -47,26 +47,25 @@ void ColumnIndexReader::Open(optionflag_t flag, String &&index_dir, Map<SegmentI
index_by_segment_ = std::move(index_by_segment);
// need to ensure that segment_id is in ascending order
for (const auto &[segment_id, segment_index_entry] : index_by_segment_) {
auto [base_names, base_row_ids, memory_indexer] = segment_index_entry->GetFullTextIndexSnapshot();
auto [chunk_index_entries, memory_indexer] = segment_index_entry->GetFullTextIndexSnapshot();
// segment_readers
for (u32 i = 0; i < base_names.size(); ++i) {
SharedPtr<DiskIndexSegmentReader> segment_reader = MakeShared<DiskIndexSegmentReader>(index_dir_, base_names[i], base_row_ids[i], flag);
for (u32 i = 0; i < chunk_index_entries.size(); ++i) {
SharedPtr<DiskIndexSegmentReader> segment_reader =
MakeShared<DiskIndexSegmentReader>(index_dir_, chunk_index_entries[i]->base_name_, chunk_index_entries[i]->base_rowid_, flag);
segment_readers_.push_back(std::move(segment_reader));
}
// for loading column length files
base_names_.insert(base_names_.end(), std::move_iterator(base_names.begin()), std::move_iterator(base_names.end()));
base_row_ids_.insert(base_row_ids_.end(), base_row_ids.begin(), base_row_ids.end());
if (memory_indexer and memory_indexer->GetDocCount() != 0) {
chunk_index_entries_.insert(chunk_index_entries_.end(),
std::move_iterator(chunk_index_entries.begin()),
std::move_iterator(chunk_index_entries.end()));
if (memory_indexer.get() != nullptr && memory_indexer->GetDocCount() != 0) {
// segment_reader
SharedPtr<InMemIndexSegmentReader> segment_reader = MakeShared<InMemIndexSegmentReader>(memory_indexer);
SharedPtr<InMemIndexSegmentReader> segment_reader = MakeShared<InMemIndexSegmentReader>(memory_indexer.get());
segment_readers_.push_back(std::move(segment_reader));
// for loading column length file
base_names_.push_back(memory_indexer->GetBaseName());
base_row_ids_.push_back(memory_indexer->GetBaseRowId());
assert(memory_indexer_.get() == nullptr);
memory_indexer_ = memory_indexer;
}
}
// put an INVALID_ROWID at the end of base_row_ids_
base_row_ids_.emplace_back(INVALID_ROWID);
}

UniquePtr<PostingIterator> ColumnIndexReader::Lookup(const String &term, MemoryPool *session_pool) {
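
The reader now tracks whole ChunkIndexEntry objects instead of the parallel base_names_ / base_row_ids_ vectors. For orientation, a rough sketch of the minimal shape this diff relies on; only the module name and the `base_name_` / `base_rowid_` members are taken from the code above, and the real chunk_index_entry module certainly carries more state than this.

```cpp
module;

export module chunk_index_entry; // module name as imported by column_index_reader.cppm
import stl;
import internal_types;

namespace infinity {
// Hypothetical minimal stand-in, not the project's actual definition.
export struct ChunkIndexEntry {
    String base_name_; // on-disk base name of a dumped index chunk (used to open its files)
    RowID base_rowid_; // first row id covered by the chunk
};
} // namespace infinity
```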
5 changes: 3 additions & 2 deletions src/storage/invertedindex/column_index_reader.cppm
@@ -24,6 +24,7 @@ import index_defines;
import memory_indexer;
import internal_types;
import segment_index_entry;
import chunk_index_entry;

export module column_index_reader;

@@ -49,8 +50,8 @@ private:
public:
// for loading column length files
String index_dir_;
Vector<String> base_names_;
Vector<RowID> base_row_ids_;
Vector<SharedPtr<ChunkIndexEntry>> chunk_index_entries_;
SharedPtr<MemoryIndexer> memory_indexer_{nullptr};
};

namespace detail {
25 changes: 14 additions & 11 deletions src/storage/invertedindex/column_inverter.cpp
@@ -29,6 +29,7 @@ import term;
import radix_sort;
import index_defines;
import posting_writer;
import vector_with_lock;
import infinity_exception;
import third_party;
import status;
@@ -41,8 +42,8 @@ static u32 Align(u32 unaligned) {
return (unaligned + T - 1) & (-T);
}

ColumnInverter::ColumnInverter(const String &analyzer, PostingWriterProvider posting_writer_provider)
: analyzer_(AnalyzerPool::instance().Get(analyzer)), posting_writer_provider_(posting_writer_provider) {
ColumnInverter::ColumnInverter(const String &analyzer, PostingWriterProvider posting_writer_provider, VectorWithLock<u32> &column_lengths)
: analyzer_(AnalyzerPool::instance().Get(analyzer)), posting_writer_provider_(posting_writer_provider), column_lengths_(column_lengths) {
if (analyzer_.get() == nullptr) {
RecoverableError(Status::UnexpectedError(fmt::format("Invalid analyzer: {}", analyzer)));
}
@@ -52,19 +53,27 @@ ColumnInverter::~ColumnInverter() = default;

bool ColumnInverter::CompareTermRef::operator()(const u32 lhs, const u32 rhs) const { return std::strcmp(GetTerm(lhs), GetTerm(rhs)) < 0; }

void ColumnInverter::InvertColumn(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count, u32 begin_doc_id) {
SizeT ColumnInverter::InvertColumn(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count, u32 begin_doc_id) {
begin_doc_id_ = begin_doc_id;
doc_count_ = row_count;
Vector<u32> column_lengths(row_count);
SizeT term_count_sum = 0;
for (SizeT i = 0; i < row_count; ++i) {
String data = column_vector->ToString(row_offset + i);
InvertColumn(begin_doc_id + i, data);
SizeT term_count = InvertColumn(begin_doc_id + i, data);
column_lengths[i] = term_count;
term_count_sum += term_count;
}
column_lengths_.SetBatch(begin_doc_id, column_lengths);
return term_count_sum;
}

void ColumnInverter::InvertColumn(u32 doc_id, const String &val) {
SizeT ColumnInverter::InvertColumn(u32 doc_id, const String &val) {
auto terms_once_ = MakeUnique<TermList>();
analyzer_->Analyze(val, *terms_once_);
SizeT term_count = terms_once_->size();
terms_per_doc_.push_back(Pair<u32, UniquePtr<TermList>>(doc_id, std::move(terms_once_)));
return term_count;
}

u32 ColumnInverter::AddTerm(StringRef term) {
@@ -125,12 +134,6 @@ void ColumnInverter::Merge(Vector<SharedPtr<ColumnInverter>> &inverters) {
}
}

void ColumnInverter::GetTermListLength(u32 *term_list_length_ptr) const {
for (const auto &[_, term_list] : terms_per_doc_) {
*(term_list_length_ptr++) = term_list->size();
}
}

struct TermRefRadix {
u32 operator()(const u64 v) { return v >> 32; }
};
12 changes: 6 additions & 6 deletions src/storage/invertedindex/column_inverter.cppm
@@ -26,21 +26,20 @@ import term;
import string_ref;
import internal_types;
import posting_writer;
import vector_with_lock;

namespace infinity {

export class ColumnInverter {
public:
ColumnInverter(const String &analyzer, PostingWriterProvider posting_writer_provider);
ColumnInverter(const String &analyzer, PostingWriterProvider posting_writer_provider, VectorWithLock<u32> &column_lengths);
ColumnInverter(const ColumnInverter &) = delete;
ColumnInverter(const ColumnInverter &&) = delete;
ColumnInverter &operator=(const ColumnInverter &) = delete;
ColumnInverter &operator=(const ColumnInverter &&) = delete;
~ColumnInverter();

void InvertColumn(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count, u32 begin_doc_id);

void InvertColumn(u32 doc_id, const String &val);
SizeT InvertColumn(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count, u32 begin_doc_id);

void SortForOfflineDump();

@@ -52,8 +51,6 @@ public:

void GeneratePosting();

void GetTermListLength(u32 *term_list_length_ptr) const;

u32 GetDocCount() { return doc_count_; }

u32 GetMerged() { return merged_; }
@@ -91,6 +88,8 @@ private:
bool operator()(const u32 lhs, const u32 rhs) const;
};

SizeT InvertColumn(u32 doc_id, const String &val);

const char *GetTermFromRef(u32 term_ref) const { return &terms_[term_ref << 2]; }

const char *GetTermFromNum(u32 term_num) const { return GetTermFromRef(term_refs_[term_num]); }
@@ -120,5 +119,6 @@ private:
U32Vec term_refs_;
Vector<Pair<u32, UniquePtr<TermList>>> terms_per_doc_;
PostingWriterProvider posting_writer_provider_{};
VectorWithLock<u32> &column_lengths_;
};
} // namespace infinity
74 changes: 74 additions & 0 deletions src/storage/invertedindex/common/vector_with_lock.cppm
@@ -0,0 +1,74 @@
// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

export module vector_with_lock;
import stl;

namespace infinity {

export template <typename ValueType>
class VectorWithLock {
private:
std::shared_mutex mutex_;
Vector<ValueType> vec_;

public:
VectorWithLock() = default;
VectorWithLock(SizeT count) : vec_(count) {}
VectorWithLock(SizeT count, const ValueType &value) : vec_(count, value) {}

~VectorWithLock() = default;

void Resize(SizeT new_size) {
std::unique_lock<std::shared_mutex> lock(mutex_);
vec_.resize(new_size);
}

ValueType Get(SizeT i) {
std::shared_lock<std::shared_mutex> lock(mutex_);
return vec_[i];
}

ValueType Sum() {
ValueType sum = 0;
std::shared_lock<std::shared_mutex> lock(mutex_);
for (SizeT i = 0; i < vec_.size(); ++i)
sum += vec_[i];
return sum;
}

void Set(SizeT i, const ValueType &value) {
std::unique_lock<std::shared_mutex> lock(mutex_);
vec_[i] = value;
}

void SetBatch(SizeT begin_idx, const Vector<ValueType> &values) {
SizeT required_size = begin_idx + values.size();
std::unique_lock<std::shared_mutex> lock(mutex_);
if (vec_.size() < required_size)
vec_.resize(required_size);
for (SizeT i = 0; i < values.size(); ++i)
vec_[begin_idx + i] = values[i];
}

Vector<ValueType> &UnsafeVec() { return vec_; }

void Clear() {
std::unique_lock<std::shared_mutex> lock(mutex_);
vec_.clear();
}
};
} // namespace infinity
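
As a usage illustration (not part of this commit): the pattern VectorWithLock is meant for is one thread publishing per-document column lengths in batches while other threads read them, e.g. to derive an average length for scoring. The sketch below restates the class with plain standard-library types instead of the project's stl aliases so it compiles on its own.

```cpp
#include <cstdint>
#include <cstdio>
#include <shared_mutex>
#include <thread>
#include <vector>

// Stripped-down restatement of VectorWithLock for a standalone demo.
template <typename T>
class VectorWithLock {
    std::shared_mutex mutex_;
    std::vector<T> vec_;

public:
    void SetBatch(std::size_t begin, const std::vector<T> &values) {
        std::unique_lock lock(mutex_);
        if (vec_.size() < begin + values.size())
            vec_.resize(begin + values.size());
        for (std::size_t i = 0; i < values.size(); ++i)
            vec_[begin + i] = values[i];
    }
    T Sum() {
        std::shared_lock lock(mutex_);
        T s = 0;
        for (const T &v : vec_)
            s += v;
        return s;
    }
    std::size_t Size() {
        std::shared_lock lock(mutex_);
        return vec_.size();
    }
};

int main() {
    VectorWithLock<uint32_t> column_lengths;
    // Writer: an inverter publishing the term count of each document in a batch.
    std::thread writer([&] {
        column_lengths.SetBatch(/*begin_doc_id=*/0, {12, 7, 31, 5});
        column_lengths.SetBatch(/*begin_doc_id=*/4, {9, 14});
    });
    writer.join();
    // Reader: e.g. a scorer that needs the average document length.
    double avg = static_cast<double>(column_lengths.Sum()) / column_lengths.Size();
    std::printf("average column length: %.2f\n", avg); // (12+7+31+5+9+14)/6 = 13.00
    return 0;
}
```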