Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support real time secondary index #1171

Merged
merged 9 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
442 changes: 262 additions & 180 deletions src/executor/operator/physical_index_scan.cpp

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/executor/operator/physical_index_scan.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ export std::variant<Vector<u32>, Bitmask> SolveSecondaryIndexFilter(const Vector
const HashMap<ColumnID, TableIndexEntry *> &column_index_map,
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count);
const u32 segment_row_actual_count,
const TxnTimeStamp ts);

} // namespace infinity
114 changes: 77 additions & 37 deletions src/storage/buffer/file_worker/secondary_index_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,66 +41,106 @@ void SecondaryIndexFileWorker::AllocateInMemory() {
if (data_) [[unlikely]] {
UnrecoverableError("AllocateInMemory: Already allocated.");
} else if (auto &data_type = column_def_->type(); data_type->CanBuildSecondaryIndex()) [[likely]] {
if (worker_id_ == 0) {
data_ = static_cast<void *>(new SecondaryIndexDataHead(part_capacity_, row_count_, data_type));
} else {
if (u32 previous_rows = (worker_id_ - 1) * part_capacity_; previous_rows < row_count_) [[likely]] {
auto part_size = std::min<u32>(part_capacity_, row_count_ - previous_rows);
data_ = static_cast<void *>(new SecondaryIndexDataPart(worker_id_ - 1, part_size));
} else {
UnrecoverableError(fmt::format("AllocateInMemory: previous_rows: {} >= row_count_: {}.", previous_rows, row_count_));
}
}
LOG_TRACE(fmt::format("Finished AllocateInMemory() by worker_id: {}.", worker_id_));
data_ = static_cast<void *>(GetSecondaryIndexData(data_type, row_count_, true));
LOG_TRACE("Finished AllocateInMemory().");
} else {
UnrecoverableError(fmt::format("Cannot build secondary index on data type: {}", data_type->ToString()));
}
}

void SecondaryIndexFileWorker::FreeInMemory() {
if (data_) [[likely]] {
if (worker_id_ == 0) {
auto index = static_cast<SecondaryIndexDataHead *>(data_);
delete index;
} else {
auto index = static_cast<SecondaryIndexDataPart *>(data_);
delete index;
}
auto index = static_cast<SecondaryIndexData *>(data_);
delete index;
data_ = nullptr;
LOG_TRACE(fmt::format("Finished FreeInMemory() by worker_id: {}, deleted data_ ptr.", worker_id_));
LOG_TRACE("Finished FreeInMemory(), deleted data_ ptr.");
} else {
UnrecoverableError("FreeInMemory: Data is not allocated.");
}
}

void SecondaryIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
if (data_) [[likely]] {
if (worker_id_ == 0) {
auto index = static_cast<SecondaryIndexDataHead *>(data_);
index->SaveIndexInner(*file_handler_);
} else {
auto index = static_cast<SecondaryIndexDataPart *>(data_);
index->SaveIndexInner(*file_handler_);
}
auto index = static_cast<SecondaryIndexData *>(data_);
index->SaveIndexInner(*file_handler_);
prepare_success = true;
LOG_TRACE(fmt::format("Finished WriteToFileImpl(bool &prepare_success) by worker_id: {}.", worker_id_));
LOG_TRACE("Finished WriteToFileImpl(bool &prepare_success).");
} else {
UnrecoverableError("WriteToFileImpl: data_ is nullptr");
}
}

void SecondaryIndexFileWorker::ReadFromFileImpl() {
if (!data_) [[likely]] {
if (worker_id_ == 0) {
auto index = new SecondaryIndexDataHead();
index->ReadIndexInner(*file_handler_);
data_ = static_cast<void *>(index);
} else {
auto index = new SecondaryIndexDataPart();
index->ReadIndexInner(*file_handler_);
data_ = static_cast<void *>(index);
}
LOG_TRACE(fmt::format("Finished ReadFromFileImpl() by worker_id: {}.", worker_id_));
auto index = GetSecondaryIndexData(column_def_->type(), row_count_, false);
index->ReadIndexInner(*file_handler_);
data_ = static_cast<void *>(index);
LOG_TRACE("Finished ReadFromFileImpl().");
} else {
UnrecoverableError("ReadFromFileImpl: data_ is not nullptr");
}
}

SecondaryIndexFileWorkerParts::SecondaryIndexFileWorkerParts(SharedPtr<String> file_dir,
SharedPtr<String> file_name,
SharedPtr<IndexBase> index_base,
SharedPtr<ColumnDef> column_def,
u32 row_count,
u32 part_id)
: IndexFileWorker(file_dir, file_name, index_base, column_def), row_count_(row_count), part_id_(part_id) {
data_pair_size_ = GetSecondaryIndexDataPairSize(column_def_->type());
}

SecondaryIndexFileWorkerParts::~SecondaryIndexFileWorkerParts() {
if (data_ != nullptr) {
FreeInMemory();
data_ = nullptr;
}
}

void SecondaryIndexFileWorkerParts::AllocateInMemory() {
if (row_count_ < part_id_ * 8192) {
UnrecoverableError(fmt::format("AllocateInMemory: row_count_: {} < part_id_ * 8192: {}", row_count_, part_id_ * 8192));
}
if (data_) [[unlikely]] {
UnrecoverableError("AllocateInMemory: Already allocated.");
} else if (auto &data_type = column_def_->type(); data_type->CanBuildSecondaryIndex()) [[likely]] {
data_ = static_cast<void *>(new char[part_row_count_ * data_pair_size_]);
LOG_TRACE("Finished AllocateInMemory().");
} else {
UnrecoverableError(fmt::format("Cannot build secondary index on data type: {}", data_type->ToString()));
}
}

void SecondaryIndexFileWorkerParts::FreeInMemory() {
if (data_) [[likely]] {
delete[] static_cast<char *>(data_);
data_ = nullptr;
LOG_TRACE("Finished FreeInMemory(), deleted data_ ptr.");
} else {
UnrecoverableError("FreeInMemory: Data is not allocated.");
}
}

void SecondaryIndexFileWorkerParts::WriteToFileImpl(bool to_spill, bool &prepare_success) {
if (data_) [[likely]] {
file_handler_->Write(data_, part_row_count_ * data_pair_size_);
prepare_success = true;
LOG_TRACE("Finished WriteToFileImpl(bool &prepare_success).");
} else {
UnrecoverableError("WriteToFileImpl: data_ is nullptr");
}
}

void SecondaryIndexFileWorkerParts::ReadFromFileImpl() {
if (row_count_ < part_id_ * 8192) {
UnrecoverableError(fmt::format("ReadFromFileImpl: row_count_: {} < part_id_ * 8192: {}", row_count_, part_id_ * 8192));
}
if (!data_) [[likely]] {
const u32 read_bytes = part_row_count_ * data_pair_size_;
data_ = static_cast<void *>(new char[read_bytes]);
file_handler_->Read(data_, read_bytes);
LOG_TRACE("Finished ReadFromFileImpl().");
} else {
UnrecoverableError("ReadFromFileImpl: data_ is not nullptr");
}
Expand Down
60 changes: 38 additions & 22 deletions src/storage/buffer/file_worker/secondary_index_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -28,44 +28,60 @@ import column_def;
namespace infinity {

export struct CreateSecondaryIndexParam : public CreateIndexParam {
// when create index file worker, we should always use the row_count_
// because the actual_row_count_ will reduce when we delete rows
// which will cause the index file worker count to be inconsistent when we read the index file
const u32 row_count_{}; // rows in the segment, include the deleted rows
const u32 part_capacity_{}; // split sorted index data into parts
CreateSecondaryIndexParam(SharedPtr<IndexBase> index_base, SharedPtr<ColumnDef> column_def, u32 row_count, u32 part_capacity)
: CreateIndexParam(index_base, column_def), row_count_(row_count), part_capacity_(part_capacity) {}
const u32 row_count_{}; // rows in the segment, include the deleted rows
CreateSecondaryIndexParam(SharedPtr<IndexBase> index_base, SharedPtr<ColumnDef> column_def, u32 row_count)
: CreateIndexParam(index_base, column_def), row_count_(row_count) {}
};

// SecondaryIndexFileWorker includes two types of data:
// 1. SecondaryIndexDataHead (when worker_id_ == 0)
// 2. SecondaryIndexDataPart (when worker_id_ > 0)
// pgm index
export class SecondaryIndexFileWorker final : public IndexFileWorker {
public:
explicit SecondaryIndexFileWorker(SharedPtr<String> file_dir,
SharedPtr<String> file_name,
SharedPtr<IndexBase> index_base,
SharedPtr<ColumnDef> column_def,
u32 worker_id,
u32 row_count,
u32 part_capacity)
: IndexFileWorker(file_dir, file_name, index_base, column_def), worker_id_(worker_id), row_count_(row_count), part_capacity_(part_capacity) {}
u32 row_count)
: IndexFileWorker(file_dir, file_name, index_base, column_def), row_count_(row_count) {}

~SecondaryIndexFileWorker() final;
~SecondaryIndexFileWorker() override;

public:
void AllocateInMemory() final;
void AllocateInMemory() override;

void FreeInMemory() final;
void FreeInMemory() override;

protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) final;
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() final;
void ReadFromFileImpl() override;

const u32 worker_id_{};
const u32 row_count_{};
const u32 part_capacity_{};
};

// row_count * pair<T, u32>
export class SecondaryIndexFileWorkerParts final : public IndexFileWorker {
public:
explicit SecondaryIndexFileWorkerParts(SharedPtr<String> file_dir,
SharedPtr<String> file_name,
SharedPtr<IndexBase> index_base,
SharedPtr<ColumnDef> column_def,
u32 row_count,
u32 part_id);

~SecondaryIndexFileWorkerParts() override;

void AllocateInMemory() override;

void FreeInMemory() override;

protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;

const u32 row_count_;
const u32 part_id_;
u32 part_row_count_ = std::min<u32>(8192, row_count_ - part_id_ * 8192);
u32 data_pair_size_ = 0;
};

} // namespace infinity
11 changes: 11 additions & 0 deletions src/storage/column_vector/bitmask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,15 @@ SharedPtr<Bitmask> Bitmask::ReadAdv(char *&ptr, i32) {
return bitmask;
}

Bitmask &Bitmask::operator=(Bitmask &&right) {
if (this != &right) {
data_ptr_ = right.data_ptr_;
buffer_ptr = std::move(right.buffer_ptr);
count_ = right.count_;
right.data_ptr_ = nullptr;
right.count_ = 0;
}
return *this;
}

} // namespace infinity
1 change: 1 addition & 0 deletions src/storage/column_vector/bitmask.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public:

bool operator==(const Bitmask &other) const;
bool operator!=(const Bitmask &other) const { return !(*this == other); }
Bitmask &operator=(Bitmask &&right);

// Estimated serialized size in bytes
i32 GetSizeInBytes() const;
Expand Down
63 changes: 63 additions & 0 deletions src/storage/meta/entry/chunk_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import buffer_handle;
import infinity_exception;
import index_defines;
import local_file_system;
import secondary_index_file_worker;
import column_def;

namespace infinity {

Expand Down Expand Up @@ -112,6 +114,33 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewFtChunkIndexEntry(SegmentIndexEnt
return chunk_index_entry;
}

SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewSecondaryIndexChunkIndexEntry(ChunkID chunk_id,
SegmentIndexEntry *segment_index_entry,
const String &base_name,
RowID base_rowid,
u32 row_count,
BufferManager *buffer_mgr) {
auto chunk_index_entry = MakeShared<ChunkIndexEntry>(chunk_id, segment_index_entry, base_name, base_rowid, row_count);
const auto &index_dir = segment_index_entry->index_dir();
assert(index_dir.get() != nullptr);
if (buffer_mgr != nullptr) {
SegmentID segment_id = segment_index_entry->segment_id();
auto secondary_index_file_name = MakeShared<String>(IndexFileName(segment_id, chunk_id));
const auto &index_base = segment_index_entry->table_index_entry()->table_index_def();
const auto &column_def = segment_index_entry->table_index_entry()->column_def();
auto file_worker = MakeUnique<SecondaryIndexFileWorker>(index_dir, secondary_index_file_name, index_base, column_def, row_count);
chunk_index_entry->buffer_obj_ = buffer_mgr->AllocateBufferObject(std::move(file_worker));
const u32 part_cnt = (row_count + 8191) / 8192;
for (u32 i = 0; i < part_cnt; ++i) {
auto part_name = MakeShared<String>(fmt::format("{}_part{}", *secondary_index_file_name, i));
auto part_file_worker = MakeUnique<SecondaryIndexFileWorkerParts>(index_dir, part_name, index_base, column_def, row_count, i);
BufferObj *part_ptr = buffer_mgr->AllocateBufferObject(std::move(part_file_worker));
chunk_index_entry->part_buffer_objs_.push_back(part_ptr);
}
}
return chunk_index_entry;
}

SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewReplayChunkIndexEntry(ChunkID chunk_id,
SegmentIndexEntry *segment_index_entry,
CreateIndexParam *param,
Expand All @@ -127,6 +156,15 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewReplayChunkIndexEntry(ChunkID chu
auto column_length_file_name = MakeShared<String>(base_name + LENGTH_SUFFIX);
auto file_worker = MakeUnique<RawFileWorker>(index_dir, column_length_file_name, row_count * sizeof(u32));
chunk_index_entry->buffer_obj_ = buffer_mgr->GetBufferObject(std::move(file_worker));
} else if (param->index_base_->index_type_ == IndexType::kSecondary) {
const auto &index_dir = segment_index_entry->index_dir();
SegmentID segment_id = segment_index_entry->segment_id();
auto secondary_index_file_name = MakeShared<String>(IndexFileName(segment_id, chunk_id));
const auto &index_base = segment_index_entry->table_index_entry()->table_index_def();
const auto &column_def = segment_index_entry->table_index_entry()->column_def();
auto file_worker = MakeUnique<SecondaryIndexFileWorker>(index_dir, secondary_index_file_name, index_base, column_def, row_count);
chunk_index_entry->buffer_obj_ = buffer_mgr->GetBufferObject(std::move(file_worker));
chunk_index_entry->LoadPartsReader(buffer_mgr);
} else {
const auto &index_dir = segment_index_entry->index_dir();
const auto &index_base = param->index_base_;
Expand Down Expand Up @@ -171,6 +209,9 @@ void ChunkIndexEntry::Cleanup() {
if (buffer_obj_) {
buffer_obj_->PickForCleanup();
}
for (BufferObj *part_buffer_obj : part_buffer_objs_) {
part_buffer_obj->PickForCleanup();
}
TableIndexEntry *table_index_entry = segment_index_entry_->table_index_entry();
const auto &index_dir = segment_index_entry_->index_dir();
const IndexBase *index_base = table_index_entry->index_base();
Expand All @@ -194,6 +235,28 @@ void ChunkIndexEntry::SaveIndexFile() {
return;
}
buffer_obj_->Save();
for (BufferObj *part_buffer_obj : part_buffer_objs_) {
part_buffer_obj->Save();
}
}

void ChunkIndexEntry::LoadPartsReader(BufferManager *buffer_mgr) {
const auto &index_dir = segment_index_entry_->index_dir();
SegmentID segment_id = segment_index_entry_->segment_id();
String secondary_index_file_name = IndexFileName(segment_id, chunk_id_);
const auto &index_base = segment_index_entry_->table_index_entry()->table_index_def();
const auto &column_def = segment_index_entry_->table_index_entry()->column_def();
const u32 part_cnt = (row_count_ + 8191) / 8192;
part_buffer_objs_.clear();
part_buffer_objs_.reserve(part_cnt);
for (u32 i = 0; i < part_cnt; ++i) {
auto part_name = MakeShared<String>(fmt::format("{}_part{}", secondary_index_file_name, i));
auto part_file_worker = MakeUnique<SecondaryIndexFileWorkerParts>(index_dir, std::move(part_name), index_base, column_def, row_count_, i);
BufferObj *part_ptr = buffer_mgr->GetBufferObject(std::move(part_file_worker));
part_buffer_objs_.push_back(part_ptr);
}
}

BufferHandle ChunkIndexEntry::GetIndexPartAt(u32 i) { return part_buffer_objs_.at(i)->Load(); }

} // namespace infinity
Loading
Loading