Skip to content

Commit

Permalink
Support real time secondary index (#1187)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Support real-time secondary index.

Issue link: #477

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Breaking Change (fix or feature that could cause existing
functionality not to work as expected)
- [x] Refactoring
- [x] Test cases
  • Loading branch information
yangzq50 authored May 8, 2024
1 parent bd2f3a3 commit 73f85c2
Show file tree
Hide file tree
Showing 29 changed files with 1,946 additions and 1,291 deletions.
442 changes: 262 additions & 180 deletions src/executor/operator/physical_index_scan.cpp

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/executor/operator/physical_index_scan.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ export std::variant<Vector<u32>, Bitmask> SolveSecondaryIndexFilter(const Vector
const HashMap<ColumnID, TableIndexEntry *> &column_index_map,
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count);
const u32 segment_row_actual_count,
const TxnTimeStamp ts);

} // namespace infinity
114 changes: 77 additions & 37 deletions src/storage/buffer/file_worker/secondary_index_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,66 +41,106 @@ void SecondaryIndexFileWorker::AllocateInMemory() {
if (data_) [[unlikely]] {
UnrecoverableError("AllocateInMemory: Already allocated.");
} else if (auto &data_type = column_def_->type(); data_type->CanBuildSecondaryIndex()) [[likely]] {
if (worker_id_ == 0) {
data_ = static_cast<void *>(new SecondaryIndexDataHead(part_capacity_, row_count_, data_type));
} else {
if (u32 previous_rows = (worker_id_ - 1) * part_capacity_; previous_rows < row_count_) [[likely]] {
auto part_size = std::min<u32>(part_capacity_, row_count_ - previous_rows);
data_ = static_cast<void *>(new SecondaryIndexDataPart(worker_id_ - 1, part_size));
} else {
UnrecoverableError(fmt::format("AllocateInMemory: previous_rows: {} >= row_count_: {}.", previous_rows, row_count_));
}
}
LOG_TRACE(fmt::format("Finished AllocateInMemory() by worker_id: {}.", worker_id_));
data_ = static_cast<void *>(GetSecondaryIndexData(data_type, row_count_, true));
LOG_TRACE("Finished AllocateInMemory().");
} else {
UnrecoverableError(fmt::format("Cannot build secondary index on data type: {}", data_type->ToString()));
}
}

void SecondaryIndexFileWorker::FreeInMemory() {
if (data_) [[likely]] {
if (worker_id_ == 0) {
auto index = static_cast<SecondaryIndexDataHead *>(data_);
delete index;
} else {
auto index = static_cast<SecondaryIndexDataPart *>(data_);
delete index;
}
auto index = static_cast<SecondaryIndexData *>(data_);
delete index;
data_ = nullptr;
LOG_TRACE(fmt::format("Finished FreeInMemory() by worker_id: {}, deleted data_ ptr.", worker_id_));
LOG_TRACE("Finished FreeInMemory(), deleted data_ ptr.");
} else {
UnrecoverableError("FreeInMemory: Data is not allocated.");
}
}

void SecondaryIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
if (data_) [[likely]] {
if (worker_id_ == 0) {
auto index = static_cast<SecondaryIndexDataHead *>(data_);
index->SaveIndexInner(*file_handler_);
} else {
auto index = static_cast<SecondaryIndexDataPart *>(data_);
index->SaveIndexInner(*file_handler_);
}
auto index = static_cast<SecondaryIndexData *>(data_);
index->SaveIndexInner(*file_handler_);
prepare_success = true;
LOG_TRACE(fmt::format("Finished WriteToFileImpl(bool &prepare_success) by worker_id: {}.", worker_id_));
LOG_TRACE("Finished WriteToFileImpl(bool &prepare_success).");
} else {
UnrecoverableError("WriteToFileImpl: data_ is nullptr");
}
}

void SecondaryIndexFileWorker::ReadFromFileImpl() {
if (!data_) [[likely]] {
if (worker_id_ == 0) {
auto index = new SecondaryIndexDataHead();
index->ReadIndexInner(*file_handler_);
data_ = static_cast<void *>(index);
} else {
auto index = new SecondaryIndexDataPart();
index->ReadIndexInner(*file_handler_);
data_ = static_cast<void *>(index);
}
LOG_TRACE(fmt::format("Finished ReadFromFileImpl() by worker_id: {}.", worker_id_));
auto index = GetSecondaryIndexData(column_def_->type(), row_count_, false);
index->ReadIndexInner(*file_handler_);
data_ = static_cast<void *>(index);
LOG_TRACE("Finished ReadFromFileImpl().");
} else {
UnrecoverableError("ReadFromFileImpl: data_ is not nullptr");
}
}

// Constructs the file worker for one part (`part_id`) of a secondary index
// that covers `row_count` rows. The location and definition arguments are
// forwarded to IndexFileWorker; the per-entry byte size is then looked up
// from the indexed column's data type.
SecondaryIndexFileWorkerParts::SecondaryIndexFileWorkerParts(SharedPtr<String> file_dir,
                                                             SharedPtr<String> file_name,
                                                             SharedPtr<IndexBase> index_base,
                                                             SharedPtr<ColumnDef> column_def,
                                                             u32 row_count,
                                                             u32 part_id)
    : IndexFileWorker(file_dir, file_name, index_base, column_def),
      row_count_(row_count),
      part_id_(part_id) {
    // Size in bytes of a single stored entry for this column type.
    const auto &indexed_type = column_def_->type();
    data_pair_size_ = GetSecondaryIndexDataPairSize(indexed_type);
}

// Releases the in-memory part buffer if one is still held.
SecondaryIndexFileWorkerParts::~SecondaryIndexFileWorkerParts() {
    if (data_ == nullptr) {
        // Nothing was allocated (or it was already freed).
        return;
    }
    FreeInMemory();
    data_ = nullptr;
}

void SecondaryIndexFileWorkerParts::AllocateInMemory() {
if (row_count_ < part_id_ * 8192) {
UnrecoverableError(fmt::format("AllocateInMemory: row_count_: {} < part_id_ * 8192: {}", row_count_, part_id_ * 8192));
}
if (data_) [[unlikely]] {
UnrecoverableError("AllocateInMemory: Already allocated.");
} else if (auto &data_type = column_def_->type(); data_type->CanBuildSecondaryIndex()) [[likely]] {
data_ = static_cast<void *>(new char[part_row_count_ * data_pair_size_]);
LOG_TRACE("Finished AllocateInMemory().");
} else {
UnrecoverableError(fmt::format("Cannot build secondary index on data type: {}", data_type->ToString()));
}
}

void SecondaryIndexFileWorkerParts::FreeInMemory() {
if (data_) [[likely]] {
delete[] static_cast<char *>(data_);
data_ = nullptr;
LOG_TRACE("Finished FreeInMemory(), deleted data_ ptr.");
} else {
UnrecoverableError("FreeInMemory: Data is not allocated.");
}
}

// Writes the whole part buffer (part_row_count_ * data_pair_size_ bytes) through
// file_handler_ and sets `prepare_success` to true on completion.
// `to_spill` is not consulted here. A missing buffer is reported as an error
// and leaves `prepare_success` untouched.
void SecondaryIndexFileWorkerParts::WriteToFileImpl(bool to_spill, bool &prepare_success) {
    if (!data_) [[unlikely]] {
        UnrecoverableError("WriteToFileImpl: data_ is nullptr");
        return;
    }
    file_handler_->Write(data_, part_row_count_ * data_pair_size_);
    prepare_success = true;
    LOG_TRACE("Finished WriteToFileImpl(bool &prepare_success).");
}

void SecondaryIndexFileWorkerParts::ReadFromFileImpl() {
if (row_count_ < part_id_ * 8192) {
UnrecoverableError(fmt::format("ReadFromFileImpl: row_count_: {} < part_id_ * 8192: {}", row_count_, part_id_ * 8192));
}
if (!data_) [[likely]] {
const u32 read_bytes = part_row_count_ * data_pair_size_;
data_ = static_cast<void *>(new char[read_bytes]);
file_handler_->Read(data_, read_bytes);
LOG_TRACE("Finished ReadFromFileImpl().");
} else {
UnrecoverableError("ReadFromFileImpl: data_ is not nullptr");
}
Expand Down
60 changes: 38 additions & 22 deletions src/storage/buffer/file_worker/secondary_index_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -28,44 +28,60 @@ import column_def;
namespace infinity {

export struct CreateSecondaryIndexParam : public CreateIndexParam {
// when create index file worker, we should always use the row_count_
// because the actual_row_count_ will reduce when we delete rows
// which will cause the index file worker count to be inconsistent when we read the index file
const u32 row_count_{}; // rows in the segment, include the deleted rows
const u32 part_capacity_{}; // split sorted index data into parts
CreateSecondaryIndexParam(SharedPtr<IndexBase> index_base, SharedPtr<ColumnDef> column_def, u32 row_count, u32 part_capacity)
: CreateIndexParam(index_base, column_def), row_count_(row_count), part_capacity_(part_capacity) {}
const u32 row_count_{}; // rows in the segment, include the deleted rows
CreateSecondaryIndexParam(SharedPtr<IndexBase> index_base, SharedPtr<ColumnDef> column_def, u32 row_count)
: CreateIndexParam(index_base, column_def), row_count_(row_count) {}
};

// SecondaryIndexFileWorker includes two types of data:
// 1. SecondaryIndexDataHead (when worker_id_ == 0)
// 2. SecondaryIndexDataPart (when worker_id_ > 0)
// pgm index
export class SecondaryIndexFileWorker final : public IndexFileWorker {
public:
explicit SecondaryIndexFileWorker(SharedPtr<String> file_dir,
SharedPtr<String> file_name,
SharedPtr<IndexBase> index_base,
SharedPtr<ColumnDef> column_def,
u32 worker_id,
u32 row_count,
u32 part_capacity)
: IndexFileWorker(file_dir, file_name, index_base, column_def), worker_id_(worker_id), row_count_(row_count), part_capacity_(part_capacity) {}
u32 row_count)
: IndexFileWorker(file_dir, file_name, index_base, column_def), row_count_(row_count) {}

~SecondaryIndexFileWorker() final;
~SecondaryIndexFileWorker() override;

public:
void AllocateInMemory() final;
void AllocateInMemory() override;

void FreeInMemory() final;
void FreeInMemory() override;

protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) final;
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() final;
void ReadFromFileImpl() override;

const u32 worker_id_{};
const u32 row_count_{};
const u32 part_capacity_{};
};

// File worker for a single part of the secondary index data.
// Per the original note, the payload is laid out as row_count * pair<T, u32>;
// the exact meaning of T and the u32 is defined elsewhere — TODO confirm.
// Each part holds at most 8192 entries; `part_id` selects which slice of the
// `row_count` rows this worker is responsible for.
export class SecondaryIndexFileWorkerParts final : public IndexFileWorker {
public:
// file_dir / file_name / index_base / column_def are forwarded to IndexFileWorker.
// row_count: number of rows covered by the whole index.
// part_id:   zero-based index of the part handled by this worker.
explicit SecondaryIndexFileWorkerParts(SharedPtr<String> file_dir,
SharedPtr<String> file_name,
SharedPtr<IndexBase> index_base,
SharedPtr<ColumnDef> column_def,
u32 row_count,
u32 part_id);

// Frees the in-memory buffer if one is still held.
~SecondaryIndexFileWorkerParts() override;

// Allocates a buffer of part_row_count_ * data_pair_size_ bytes.
void AllocateInMemory() override;

// Releases the buffer and resets the data pointer.
void FreeInMemory() override;

protected:
// Writes the buffer through the file handler and sets prepare_success.
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

// Allocates a buffer and fills it from the file handler.
void ReadFromFileImpl() override;

// Total rows covered by the index (all parts together).
const u32 row_count_;
// Zero-based index of this part.
const u32 part_id_;
// Entries stored in this part: a full 8192, or the remainder for the last part.
// NOTE: the initializer reads row_count_ and part_id_, so those members must stay
// declared above this one. If part_id_ * 8192 exceeds row_count_ the unsigned
// subtraction wraps; the .cpp methods report that case as an error.
u32 part_row_count_ = std::min<u32>(8192, row_count_ - part_id_ * 8192);
// Size in bytes of one stored entry; assigned in the constructor body.
u32 data_pair_size_ = 0;
};

} // namespace infinity
11 changes: 11 additions & 0 deletions src/storage/column_vector/bitmask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,15 @@ SharedPtr<Bitmask> Bitmask::ReadAdv(char *&ptr, i32) {
return bitmask;
}

// Move assignment: takes over `right`'s data pointer, buffer and count, then
// leaves `right` with a null data pointer and a count of zero.
// Self-assignment is a no-op.
Bitmask &Bitmask::operator=(Bitmask &&right) {
    if (this == &right) {
        return *this;
    }
    // Adopt the source's state.
    data_ptr_ = right.data_ptr_;
    buffer_ptr = std::move(right.buffer_ptr);
    count_ = right.count_;
    // Reset the source so it no longer refers to the transferred data.
    right.data_ptr_ = nullptr;
    right.count_ = 0;
    return *this;
}

} // namespace infinity
1 change: 1 addition & 0 deletions src/storage/column_vector/bitmask.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public:

bool operator==(const Bitmask &other) const;
bool operator!=(const Bitmask &other) const { return !(*this == other); }
Bitmask &operator=(Bitmask &&right);

// Estimated serialized size in bytes
i32 GetSizeInBytes() const;
Expand Down
63 changes: 63 additions & 0 deletions src/storage/meta/entry/chunk_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import buffer_handle;
import infinity_exception;
import index_defines;
import local_file_system;
import secondary_index_file_worker;
import column_def;

namespace infinity {

Expand Down Expand Up @@ -112,6 +114,33 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewFtChunkIndexEntry(SegmentIndexEnt
return chunk_index_entry;
}

// Creates the ChunkIndexEntry for a secondary index chunk.
//
// chunk_id / base_name / base_rowid / row_count are forwarded to the
// ChunkIndexEntry constructor. When `buffer_mgr` is non-null, buffer objects
// are allocated for:
//   - the main index file (SecondaryIndexFileWorker), stored in buffer_obj_;
//   - one SecondaryIndexFileWorkerParts per 8192-row part, stored in order in
//     part_buffer_objs_ and named "<index file name>_part<i>".
// When `buffer_mgr` is null the entry is returned without any buffer objects.
SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewSecondaryIndexChunkIndexEntry(ChunkID chunk_id,
                                                                             SegmentIndexEntry *segment_index_entry,
                                                                             const String &base_name,
                                                                             RowID base_rowid,
                                                                             u32 row_count,
                                                                             BufferManager *buffer_mgr) {
    auto chunk_index_entry = MakeShared<ChunkIndexEntry>(chunk_id, segment_index_entry, base_name, base_rowid, row_count);
    const auto &index_dir = segment_index_entry->index_dir();
    assert(index_dir.get() != nullptr);
    if (buffer_mgr != nullptr) {
        SegmentID segment_id = segment_index_entry->segment_id();
        auto secondary_index_file_name = MakeShared<String>(IndexFileName(segment_id, chunk_id));
        const auto &index_base = segment_index_entry->table_index_entry()->table_index_def();
        const auto &column_def = segment_index_entry->table_index_entry()->column_def();
        auto file_worker = MakeUnique<SecondaryIndexFileWorker>(index_dir, secondary_index_file_name, index_base, column_def, row_count);
        chunk_index_entry->buffer_obj_ = buffer_mgr->AllocateBufferObject(std::move(file_worker));
        // Number of parts needed to cover row_count rows, rounding up.
        const u32 part_cnt = (row_count + 8191) / 8192;
        // The part count is known up front, so size the container once
        // (same approach as LoadPartsReader).
        chunk_index_entry->part_buffer_objs_.reserve(part_cnt);
        for (u32 i = 0; i < part_cnt; ++i) {
            auto part_name = MakeShared<String>(fmt::format("{}_part{}", *secondary_index_file_name, i));
            auto part_file_worker = MakeUnique<SecondaryIndexFileWorkerParts>(index_dir, part_name, index_base, column_def, row_count, i);
            BufferObj *part_ptr = buffer_mgr->AllocateBufferObject(std::move(part_file_worker));
            chunk_index_entry->part_buffer_objs_.push_back(part_ptr);
        }
    }
    return chunk_index_entry;
}

SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewReplayChunkIndexEntry(ChunkID chunk_id,
SegmentIndexEntry *segment_index_entry,
CreateIndexParam *param,
Expand All @@ -127,6 +156,15 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewReplayChunkIndexEntry(ChunkID chu
auto column_length_file_name = MakeShared<String>(base_name + LENGTH_SUFFIX);
auto file_worker = MakeUnique<RawFileWorker>(index_dir, column_length_file_name, row_count * sizeof(u32));
chunk_index_entry->buffer_obj_ = buffer_mgr->GetBufferObject(std::move(file_worker));
} else if (param->index_base_->index_type_ == IndexType::kSecondary) {
const auto &index_dir = segment_index_entry->index_dir();
SegmentID segment_id = segment_index_entry->segment_id();
auto secondary_index_file_name = MakeShared<String>(IndexFileName(segment_id, chunk_id));
const auto &index_base = segment_index_entry->table_index_entry()->table_index_def();
const auto &column_def = segment_index_entry->table_index_entry()->column_def();
auto file_worker = MakeUnique<SecondaryIndexFileWorker>(index_dir, secondary_index_file_name, index_base, column_def, row_count);
chunk_index_entry->buffer_obj_ = buffer_mgr->GetBufferObject(std::move(file_worker));
chunk_index_entry->LoadPartsReader(buffer_mgr);
} else {
const auto &index_dir = segment_index_entry->index_dir();
const auto &index_base = param->index_base_;
Expand Down Expand Up @@ -171,6 +209,9 @@ void ChunkIndexEntry::Cleanup() {
if (buffer_obj_) {
buffer_obj_->PickForCleanup();
}
for (BufferObj *part_buffer_obj : part_buffer_objs_) {
part_buffer_obj->PickForCleanup();
}
TableIndexEntry *table_index_entry = segment_index_entry_->table_index_entry();
const auto &index_dir = segment_index_entry_->index_dir();
const IndexBase *index_base = table_index_entry->index_base();
Expand All @@ -194,6 +235,28 @@ void ChunkIndexEntry::SaveIndexFile() {
return;
}
buffer_obj_->Save();
for (BufferObj *part_buffer_obj : part_buffer_objs_) {
part_buffer_obj->Save();
}
}

// Rebuilds part_buffer_objs_ by fetching, via buffer_mgr->GetBufferObject, one
// buffer object per 8192-row part of this chunk's secondary index.
// Part files are named "<index file name>_part<i>"; any previously held
// part pointers are discarded first.
void ChunkIndexEntry::LoadPartsReader(BufferManager *buffer_mgr) {
    const auto &index_dir = segment_index_entry_->index_dir();
    const auto &index_base = segment_index_entry_->table_index_entry()->table_index_def();
    const auto &column_def = segment_index_entry_->table_index_entry()->column_def();
    SegmentID segment_id = segment_index_entry_->segment_id();
    String secondary_index_file_name = IndexFileName(segment_id, chunk_id_);

    // Parts needed to cover row_count_ rows, rounding up.
    const u32 part_total = (row_count_ + 8191) / 8192;
    part_buffer_objs_.clear();
    part_buffer_objs_.reserve(part_total);
    for (u32 part_idx = 0; part_idx != part_total; ++part_idx) {
        auto part_worker = MakeUnique<SecondaryIndexFileWorkerParts>(index_dir,
                                                                     MakeShared<String>(fmt::format("{}_part{}", secondary_index_file_name, part_idx)),
                                                                     index_base,
                                                                     column_def,
                                                                     row_count_,
                                                                     part_idx);
        part_buffer_objs_.push_back(buffer_mgr->GetBufferObject(std::move(part_worker)));
    }
}

// Loads and returns the buffer handle of the i-th index part.
// The lookup goes through the container's at(), so `i` is subject to
// whatever range check that accessor performs.
BufferHandle ChunkIndexEntry::GetIndexPartAt(u32 i) {
    BufferObj *part_obj = part_buffer_objs_.at(i);
    return part_obj->Load();
}

} // namespace infinity
Loading

0 comments on commit 73f85c2

Please sign in to comment.