Skip to content

Commit

Permalink
Buffer version file (#1112)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

1. Add: use the buffer manager to cache the version file.
2. Fix: bug when decreasing the scheduler task number.
3. Fix: mem HNSW bug when using MemIndexInserterIter.
4. Test: mem HNSW insert test.
5. Fix: adding a delta op that is already in the full checkpoint.

Issue link: #1042

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Test cases
  • Loading branch information
small-turtle-1 authored Apr 25, 2024
1 parent 2587ce0 commit e7d7a3b
Show file tree
Hide file tree
Showing 40 changed files with 617 additions and 241 deletions.
3 changes: 1 addition & 2 deletions src/executor/operator/physical_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,7 @@ void PhysicalImport::JSONLRowHandler(const nlohmann::json &line_json, Vector<Col
}

void PhysicalImport::SaveSegmentData(TableEntry *table_entry, Txn *txn, SharedPtr<SegmentEntry> segment_entry) {
TxnTimeStamp flush_ts = txn->BeginTS();
segment_entry->FlushNewData(flush_ts);
segment_entry->FlushNewData();

const String &db_name = *table_entry->GetDBName();
const String &table_name = *table_entry->GetTableName();
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ void TaskScheduler::WorkerLoop(FragmentTaskBlockQueue *task_queue, i64 worker_id
--worker_workloads_[worker_id];
iter = task_lists.erase(iter);
}
if (finish) {
if (finish || error) {
fragment_ctx->notifier()->FinishTask(error, fragment_ctx);
} else {
fragment_ctx->notifier()->UnstartTask();
Expand Down
3 changes: 1 addition & 2 deletions src/storage/bg_task/compact_segments_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,9 @@ void CompactSegmentsTask::SaveSegmentsData(CompactSegmentsTaskState &state) {
Vector<WalSegmentInfo> segment_infos;
Vector<SegmentID> old_segment_ids;

TxnTimeStamp flush_ts = txn_->BeginTS();
for (auto &[new_segment, old_segments] : segment_data) {
if (new_segment->row_count() > 0) {
new_segment->FlushNewData(flush_ts);
new_segment->FlushNewData();
segment_infos.push_back(WalSegmentInfo(new_segment.get()));
}

Expand Down
2 changes: 2 additions & 0 deletions src/storage/buffer/buffer_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ BufferManager::BufferManager(u64 memory_limit, SharedPtr<String> data_dir, Share
fs.CleanupDirectory(*temp_dir_);
}

// Destructor: runs RemoveClean() once before the manager goes away.
// NOTE(review): RemoveClean() is defined outside this view; presumably it
// discards buffer objects already queued for cleanup -- confirm.
BufferManager::~BufferManager() {
    RemoveClean();
}

BufferObj *BufferManager::AllocateBufferObject(UniquePtr<FileWorker> file_worker) {
String file_path = file_worker->GetFilePath();
auto buffer_obj = MakeUnique<BufferObj>(this, true, std::move(file_worker));
Expand Down
2 changes: 2 additions & 0 deletions src/storage/buffer/buffer_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ export class BufferManager {
public:
explicit BufferManager(u64 memory_limit, SharedPtr<String> data_dir, SharedPtr<String> temp_dir);

~BufferManager();

public:
// Create a new BufferHandle, or in replay process. (read data block from wal)
BufferObj *AllocateBufferObject(UniquePtr<FileWorker> file_worker);
Expand Down
2 changes: 2 additions & 0 deletions src/storage/buffer/buffer_obj.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public:

// Returns whatever the owned file worker reports from GetFilePath().
// NOTE(review): despite the name, this forwards a file *path*, not a bare
// file name.
String GetFilename() const {
    auto *worker = file_worker_.get();
    return worker->GetFilePath();
}

// Non-owning access to the file worker held by this buffer object.
// Ownership stays with `file_worker_`; callers must not delete the result.
FileWorker *file_worker() {
    return file_worker_.get();
}

private:
// Friend to encapsulate `Unload` interface and to increase `rc_`.
friend class BufferHandle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public:
void FreeInMemory() override;

protected:
void WriteToFileImpl(bool &prepare_success) override;
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;

Expand Down Expand Up @@ -119,7 +119,7 @@ void AnnIVFFlatIndexFileWorker<DataType>::FreeInMemory() {
}

template <typename DataType>
void AnnIVFFlatIndexFileWorker<DataType>::WriteToFileImpl(bool &prepare_success) {
void AnnIVFFlatIndexFileWorker<DataType>::WriteToFileImpl(bool to_spill, bool &prepare_success) {
auto *index = static_cast<AnnIVFFlatIndexData<DataType> *>(data_);
index->SaveIndexInner(*file_handler_);
prepare_success = true;
Expand Down
3 changes: 2 additions & 1 deletion src/storage/buffer/file_worker/data_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ void DataFileWorker::FreeInMemory() {
data_ = nullptr;
}

void DataFileWorker::WriteToFileImpl(bool &prepare_success) {
// FIXME: to_spill
void DataFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
LocalFileSystem fs;
// File structure:
// - header: magic number
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/data_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public:
SizeT GetMemoryCost() const override { return buffer_size_; }

protected:
void WriteToFileImpl(bool &prepare_success) override;
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;

Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void FileWorker::WriteToFile(bool to_spill) {
file_handler_ = nullptr;
});

WriteToFileImpl(prepare_success);
WriteToFileImpl(to_spill, prepare_success);
if (prepare_success) {
if (to_spill) {
LOG_TRACE(fmt::format("Write to spill file {} finished. success {}", write_path, prepare_success));
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public:
void CleanupFile();

protected:
virtual void WriteToFileImpl(bool &prepare_success) = 0;
virtual void WriteToFileImpl(bool to_spill, bool &prepare_success) = 0;

virtual void ReadFromFileImpl() = 0;

Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/hnsw_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void HnswFileWorker::FreeInMemory() {
data_ = nullptr;
}

void HnswFileWorker::WriteToFileImpl(bool &prepare_success) {
void HnswFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
if (!data_) {
UnrecoverableError("WriteToFileImpl: Data is not allocated.");
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/hnsw_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public:
void FreeInMemory() override;

protected:
void WriteToFileImpl(bool &prepare_success) override;
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;

Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/raw_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void RawFileWorker::FreeInMemory() {
data_ = nullptr;
}

void RawFileWorker::WriteToFileImpl(bool &prepare_success) {
void RawFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
assert(data_ != nullptr && buffer_size_ > 0);
LocalFileSystem fs;
i64 nbytes = fs.Write(*file_handler_, data_, buffer_size_);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/raw_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public:
SizeT GetMemoryCost() const override { return buffer_size_; }

protected:
void WriteToFileImpl(bool &prepare_success) override;
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void SecondaryIndexFileWorker::FreeInMemory() {
}
}

void SecondaryIndexFileWorker::WriteToFileImpl(bool &prepare_success) {
void SecondaryIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
if (data_) [[likely]] {
if (worker_id_ == 0) {
auto index = static_cast<SecondaryIndexDataHead *>(data_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public:
void FreeInMemory() final;

protected:
void WriteToFileImpl(bool &prepare_success) final;
void WriteToFileImpl(bool to_spill, bool &prepare_success) final;

void ReadFromFileImpl() final;

Expand Down
79 changes: 79 additions & 0 deletions src/storage/buffer/file_worker/version_file_worker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

module version_file_worker;

import stl;
import file_worker;
import block_version;
import infinity_exception;

namespace infinity {

VersionFileWorker::VersionFileWorker(SharedPtr<String> file_dir, SharedPtr<String> file_name, SizeT capacity)
: FileWorker(std::move(file_dir), std::move(file_name)), capacity_(capacity) {}

// Releases the in-memory BlockVersion, if one is still held.
VersionFileWorker::~VersionFileWorker() {
    // Nothing was allocated (or it was already freed): nothing to do.
    if (data_ == nullptr) {
        return;
    }
    // FreeInMemory() raises an error on a null data_, hence the guard above.
    FreeInMemory();
    data_ = nullptr;
}

void VersionFileWorker::AllocateInMemory() {
if (data_ != nullptr) {
UnrecoverableError("Data is already allocated.");
}
if (capacity_ == 0) {
UnrecoverableError("Capacity is 0.");
}
auto *data = new BlockVersion(capacity_);
data_ = static_cast<void *>(data);
}

// Destroys the BlockVersion referenced by data_ and resets data_ to null.
// Reports an UnrecoverableError if there is nothing to free.
void VersionFileWorker::FreeInMemory() {
    if (data_ == nullptr) {
        UnrecoverableError("Data is already freed.");
    }
    // Cast back to the concrete type so the BlockVersion destructor runs.
    delete static_cast<BlockVersion *>(data_);
    data_ = nullptr;
}

// FIXME: the figure below only counts one TxnTimeStamp per capacity slot.
// NOTE(review): whether BlockVersion holds more than that per slot is not
// visible here -- confirm before relying on this for memory accounting.
SizeT VersionFileWorker::GetMemoryCost() const {
    const SizeT bytes_per_slot = sizeof(TxnTimeStamp);
    return capacity_ * bytes_per_slot;
}

// Writes the in-memory BlockVersion through file_handler_.
//   to_spill        : true  -> BlockVersion::SpillToFile(handler)
//                     false -> BlockVersion::SaveToFile(checkpoint_ts_, handler),
//                              using the timestamp set via SetCheckpointTS().
//   prepare_success : out-parameter; set to true once the write call returns.
//                     FileWorker::WriteToFile() only runs its post-write step
//                     when this flag is true, so it must not be left untouched.
// Reports an UnrecoverableError if no data is allocated.
void VersionFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
    if (data_ == nullptr) {
        UnrecoverableError("Data is not allocated.");
    }
    auto *data = static_cast<BlockVersion *>(data_);
    if (to_spill) {
        data->SpillToFile(*file_handler_);
    } else {
        data->SaveToFile(checkpoint_ts_, *file_handler_);
    }
    // Previously never set, leaving the caller's flag at its initial value.
    prepare_success = true;
}

void VersionFileWorker::ReadFromFileImpl() {
if (data_ != nullptr) {
UnrecoverableError("Data is already allocated.");
}
auto *data = BlockVersion::LoadFromFile(*file_handler_).release();
data_ = static_cast<void *>(data);
}

} // namespace infinity
49 changes: 49 additions & 0 deletions src/storage/buffer/file_worker/version_file_worker.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

export module version_file_worker;

import stl;
import file_worker;

namespace infinity {

export class VersionFileWorker : public FileWorker {
public:
explicit VersionFileWorker(SharedPtr<String> file_dir, SharedPtr<String> file_name, SizeT capacity);

virtual ~VersionFileWorker() override;

public:
void AllocateInMemory() override;

void FreeInMemory() override;

SizeT GetMemoryCost() const override;

void SetCheckpointTS(TxnTimeStamp ts) { checkpoint_ts_ = ts; }

protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;

private:
SizeT capacity_{};
TxnTimeStamp checkpoint_ts_{};
};

} // namespace infinity
7 changes: 5 additions & 2 deletions src/storage/compaction_process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ void CompactionProcessor::ScanAndOptimize() {
table_entry->OptimizeIndex(opt_txn);
}
}

txn_mgr_->CommitTxn(opt_txn);
try {
txn_mgr_->CommitTxn(opt_txn);
} catch (const RecoverableException &e) {
txn_mgr_->RollBackTxn(opt_txn);
}
}

void CompactionProcessor::Process() {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ UniquePtr<CatalogDeltaEntry> Catalog::LoadFromFileDelta(const DeltaCatalogFileIn
}

void Catalog::LoadFromEntryDelta(TxnTimeStamp max_commit_ts, BufferManager *buffer_mgr) {
auto delta_entry = global_catalog_delta_entry_->PickFlushEntry(full_ckp_commit_ts_, max_commit_ts);
auto delta_entry = global_catalog_delta_entry_->PickFlushEntry(max_commit_ts);

auto &delta_ops = delta_entry->operations();
for (auto &op : delta_ops) {
Expand Down Expand Up @@ -913,7 +913,7 @@ bool Catalog::SaveDeltaCatalog(TxnTimeStamp max_commit_ts, String &delta_catalog
delta_catalog_path = fmt::format("{}/{}", *catalog_dir_, CatalogFile::DeltaCheckpointFilename(max_commit_ts));

// Check the SegmentEntry's for flush the data to disk.
UniquePtr<CatalogDeltaEntry> flush_delta_entry = global_catalog_delta_entry_->PickFlushEntry(full_ckp_commit_ts_, max_commit_ts);
UniquePtr<CatalogDeltaEntry> flush_delta_entry = global_catalog_delta_entry_->PickFlushEntry(max_commit_ts);

DeferFn defer_fn([&]() { txn_mgr_->RemoveWaitFlushTxns(flush_delta_entry->txn_ids()); });

Expand Down
11 changes: 6 additions & 5 deletions src/storage/meta/entry/block_column_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void BlockColumnEntry::Append(const ColumnVector *input_column_vector, u16 input
column_vector.AppendWith(*input_column_vector, input_column_vector_offset, append_rows);
}

void BlockColumnEntry::Flush(BlockColumnEntry *block_column_entry, SizeT checkpoint_row_count) {
void BlockColumnEntry::Flush(BlockColumnEntry *block_column_entry, SizeT start_row_count, SizeT checkpoint_row_count) {
// TODO: Opt, Flush certain row_count content
DataType *column_type = block_column_entry->column_type_.get();
switch (column_type->type()) {
Expand Down Expand Up @@ -157,7 +157,7 @@ void BlockColumnEntry::Flush(BlockColumnEntry *block_column_entry, SizeT checkpo

std::shared_lock lock(block_column_entry->mutex_);
for (auto *outline_buffer : block_column_entry->outline_buffers_) {
if(outline_buffer != nullptr) {
if (outline_buffer != nullptr) {
outline_buffer->Save();
}
}
Expand Down Expand Up @@ -223,10 +223,11 @@ BlockColumnEntry::Deserialize(const nlohmann::json &column_data_json, BlockEntry
}

void BlockColumnEntry::CommitColumn(TransactionID txn_id, TxnTimeStamp commit_ts) {
if (!this->Committed()) {
this->txn_id_ = txn_id;
this->Commit(commit_ts);
if (this->Committed()) {
UnrecoverableError("Column already committed");
}
this->txn_id_ = txn_id;
this->Commit(commit_ts);
}

Vector<String> BlockColumnEntry::OutlinePaths() const {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/block_column_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public:
public:
void Append(const ColumnVector *input_column_vector, u16 input_offset, SizeT append_rows, BufferManager *buffer_mgr);

static void Flush(BlockColumnEntry *block_column_entry, SizeT row_count);
static void Flush(BlockColumnEntry *block_column_entry, SizeT start_row_count, SizeT checkpoint_row_count);

void Cleanup();

Expand Down
Loading

0 comments on commit e7d7a3b

Please sign in to comment.