Skip to content

Commit

Permalink
Fix: compact bug. (#1230)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

1. Fix: Add lock in txn_store
2. Fix: physical compact plan.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
  • Loading branch information
small-turtle-1 authored May 22, 2024
1 parent 199083d commit 996fb7b
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 112 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,6 @@ callgrind.out.*
.cache/

# ignore all benchmark datasets
python/benchmark/datasets/
python/benchmark/datasets/

myvenv/
32 changes: 14 additions & 18 deletions src/executor/operator/physical_compact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,18 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat
SizeT group_idx = compact_operator_state->compact_idx_;
CompactStateData *compact_state_data = compact_operator_state->compact_state_data_.get();
RowIDRemap &remapper = compact_state_data->remapper_;
if (group_idx == compact_state_data->segment_data_list_.size()) {
if (group_idx == compact_operator_state->segment_groups_.size()) {
compact_operator_state->SetComplete();
return true;
}
CompactSegmentData &compact_segment_data = compact_state_data->segment_data_list_[group_idx];
compact_segment_data.old_segments_ = compactible_segments_group_[group_idx];
const auto &compactible_segments = compact_segment_data.old_segments_;

for (auto *compactible_segment : compactible_segments) {
if (!compactible_segment->TrySetCompacting(compact_state_data)) {
UnrecoverableError("Segment should be compactible.");
const Vector<SegmentEntry *> &candidate_segments = compact_operator_state->segment_groups_[group_idx];
Vector<SegmentEntry *> compactible_segments;
{

for (auto *candidate_segment : candidate_segments) {
if (candidate_segment->TrySetCompacting(compact_state_data)) {
compactible_segments.push_back(candidate_segment);
}
}
}

Expand All @@ -137,12 +138,10 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat

SizeT column_count = table_entry->ColumnCount();

compact_segment_data.new_segment_ = SegmentEntry::NewSegmentEntry(table_entry, Catalog::GetNextSegmentID(table_entry), txn);
auto *new_segment = compact_segment_data.new_segment_.get();
compact_state_data->AddNewSegment(new_segment, txn);
auto new_segment = SegmentEntry::NewSegmentEntry(table_entry, Catalog::GetNextSegmentID(table_entry), txn);
SegmentID new_segment_id = new_segment->segment_id();

UniquePtr<BlockEntry> new_block = BlockEntry::NewBlockEntry(new_segment, new_segment->GetNextBlockID(), 0 /*checkpoint_ts*/, column_count, txn);
UniquePtr<BlockEntry> new_block = BlockEntry::NewBlockEntry(new_segment.get(), new_segment->GetNextBlockID(), 0 /*checkpoint_ts*/, column_count, txn);
const SizeT block_capacity = new_block->row_capacity();

for (SegmentEntry *segment : compactible_segments) {
Expand Down Expand Up @@ -180,7 +179,7 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat
read_size -= read_size1;
new_segment->AppendBlockEntry(std::move(new_block));

new_block = BlockEntry::NewBlockEntry(new_segment, new_segment->GetNextBlockID(), 0, column_count, txn);
new_block = BlockEntry::NewBlockEntry(new_segment.get(), new_segment->GetNextBlockID(), 0, column_count, txn);
}
block_entry_append(row_begin, read_size);
}
Expand All @@ -189,8 +188,9 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat
if (new_block->row_count() > 0) {
new_segment->AppendBlockEntry(std::move(new_block));
}
compact_state_data->AddNewSegment(new_segment, std::move(compactible_segments), txn);
compact_operator_state->compact_idx_ = ++group_idx;
if (group_idx == compact_state_data->segment_data_list_.size()) {
if (group_idx == compact_operator_state->segment_groups_.size()) {
compact_operator_state->SetComplete();
}
compact_operator_state->SetComplete();
Expand All @@ -199,10 +199,6 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat
}

Vector<Vector<Vector<SegmentEntry *>>> PhysicalCompact::PlanCompact(SizeT parallel_count) {
if (parallel_count > compactible_segments_group_.size()) {
UnrecoverableError(
fmt::format("parallel_count {} is larger than compactible_segments_group_ size {}", parallel_count, compactible_segments_group_.size()));
}
Vector<Vector<Vector<SegmentEntry *>>> result(parallel_count);
for (SizeT i = 0; i < compactible_segments_group_.size(); ++i) {
result[i % parallel_count].push_back(compactible_segments_group_[i]);
Expand Down
5 changes: 3 additions & 2 deletions src/executor/operator/physical_compact_index_prepare.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ bool PhysicalCompactIndexPrepare::Execute(QueryContext *query_context, OperatorS
}

SizeT PhysicalCompactIndexPrepare::TaskletCount() {
auto *index_index = base_table_ref_->index_index_.get();
return index_index->index_snapshots_vec_.size();
// auto *index_index = base_table_ref_->index_index_.get();
// return index_index->index_snapshots_vec_.size();
return 1;
}

} // namespace infinity
13 changes: 10 additions & 3 deletions src/executor/operator_state.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,11 @@ export struct FusionOperatorState : public OperatorState {

// Compact
export struct CompactOperatorState : public OperatorState {
inline explicit CompactOperatorState(SizeT compact_idx, SharedPtr<CompactStateData> compact_state_data)
: OperatorState(PhysicalOperatorType::kCompact), compact_idx_(compact_idx), compact_state_data_(compact_state_data) {}
inline explicit CompactOperatorState(Vector<Vector<SegmentEntry *>> segment_groups, SharedPtr<CompactStateData> compact_state_data)
: OperatorState(PhysicalOperatorType::kCompact), segment_groups_(std::move(segment_groups)), compact_state_data_(compact_state_data) {}

SizeT compact_idx_{};

Vector<Vector<SegmentEntry *>> segment_groups_;
SharedPtr<CompactStateData> compact_state_data_{};
};

Expand Down Expand Up @@ -500,6 +500,13 @@ export struct KnnScanSourceState : public SourceState {
explicit KnnScanSourceState() : SourceState(SourceStateType::kKnnScan) {}
};

export struct CompactSourceState : public SourceState {
explicit CompactSourceState(Vector<Vector<SegmentEntry *>> segment_groups)
: SourceState(SourceStateType::kCompact), segment_groups_(std::move(segment_groups)) {}

Vector<Vector<SegmentEntry *>> segment_groups_;
};

export struct EmptySourceState : public SourceState {
explicit EmptySourceState() : SourceState(SourceStateType::kEmpty) {}
};
Expand Down
7 changes: 5 additions & 2 deletions src/function/table/compact_state_data.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,13 @@ public:

const HashMap<SegmentID, Vector<SegmentOffset>> &GetToDelete() const { return to_delete_; }

void AddNewSegment(SegmentEntry *new_segment, Txn *txn) {
void AddNewSegment(SharedPtr<SegmentEntry> new_segment, Vector<SegmentEntry *> compacted_segments, Txn *txn) {
std::lock_guard lock(mutex2_);
auto *block_index = new_table_ref_->block_index_.get();
block_index->Insert(new_segment, txn);
block_index->Insert(new_segment.get(), txn);

CompactSegmentData data{new_segment, std::move(compacted_segments)};
segment_data_list_.push_back(std::move(data));
}

void AddNewIndex(TableIndexEntry *table_index_entry, Txn *txn) {
Expand Down
34 changes: 0 additions & 34 deletions src/main/bg_query_state.cppm

This file was deleted.

23 changes: 20 additions & 3 deletions src/scheduler/fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,18 @@ UniquePtr<OperatorState> MakeKnnScanState(PhysicalKnnScan *physical_knn_scan, Fr
}

UniquePtr<OperatorState> MakeCompactState(PhysicalCompact *physical_compact, FragmentTask *task, FragmentContext *fragment_ctx) {
SourceState *source_state = task->source_state_.get();
if (source_state->state_type_ != SourceStateType::kCompact) {
UnrecoverableError("Expect compact source state");
}
auto *compact_source_state = static_cast<CompactSourceState *>(source_state);

if (fragment_ctx->ContextType() != FragmentType::kParallelMaterialize) {
UnrecoverableError("Compact operator should be in parallel materialized fragment.");
}
auto *parallel_materialize_fragment_ctx = static_cast<ParallelMaterializedFragmentCtx *>(fragment_ctx);
auto compact_operator_state = MakeUnique<CompactOperatorState>(task->TaskID(), parallel_materialize_fragment_ctx->compact_state_data_);
auto compact_operator_state =
MakeUnique<CompactOperatorState>(std::move(compact_source_state->segment_groups_), parallel_materialize_fragment_ctx->compact_state_data_);
return compact_operator_state;
}

Expand Down Expand Up @@ -689,7 +696,6 @@ SizeT InitCompactFragmentContext(PhysicalCompact *compact_operator, FragmentCont
auto *parent_serial_materialize_fragment_ctx = static_cast<SerialMaterializedFragmentCtx *>(parent_context);

auto &compact_state_data = parent_serial_materialize_fragment_ctx->compact_state_data_;
compact_state_data->segment_data_list_.resize(task_n);
parallel_materialize_fragment_ctx->compact_state_data_ = compact_state_data;
return task_n;
}
Expand Down Expand Up @@ -809,8 +815,19 @@ void FragmentContext::MakeSourceState(i64 parallel_count) {
tasks_[0]->source_state_ = MakeUnique<QueueSourceState>();
break;
}
case PhysicalOperatorType::kCompact: {
if (fragment_type_ != FragmentType::kParallelMaterialize) {
UnrecoverableError(
fmt::format("{} should in parallel materialized fragment", PhysicalOperatorToString(first_operator->operator_type())));
}
auto *physical_compact = static_cast<PhysicalCompact *>(first_operator);
Vector<Vector<Vector<SegmentEntry *>>> segment_groups_list = physical_compact->PlanCompact(parallel_count);
for (i64 task_id = 0; task_id < parallel_count; ++task_id) {
tasks_[task_id]->source_state_ = MakeUnique<CompactSourceState>(std::move(segment_groups_list[task_id]));
}
break;
}
case PhysicalOperatorType::kCreateIndexDo:
case PhysicalOperatorType::kCompact:
case PhysicalOperatorType::kCompactIndexDo: {
if (fragment_type_ != FragmentType::kParallelMaterialize) {
UnrecoverableError(
Expand Down
69 changes: 69 additions & 0 deletions src/storage/bg_query_state.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

export module bg_query_state;

import stl;
import logical_node;
import physical_operator;
import plan_fragment;
import fragment_context;
import query_context;
import infinity_context;
import session_manager;
import session;
import txn;

namespace infinity {

export struct BGQueryState {
Vector<SharedPtr<LogicalNode>> logical_plans{};
Vector<UniquePtr<PhysicalOperator>> physical_plans{};
SharedPtr<PlanFragment> plan_fragment{};
UniquePtr<Notifier> notifier{};
};

export struct BGQueryContextWrapper {
UniquePtr<QueryContext> query_context_;

SessionManager *session_mgr_;
SharedPtr<BaseSession> session_;

BGQueryContextWrapper(BGQueryContextWrapper &&other)
: query_context_(std::move(other.query_context_)), session_mgr_(other.session_mgr_), session_(std::move(other.session_)) {
other.session_mgr_ = nullptr;
}

BGQueryContextWrapper(Txn *txn) : session_mgr_(InfinityContext::instance().session_manager()) {
session_ = session_mgr_->CreateLocalSession();
query_context_ = MakeUnique<QueryContext>(session_.get());
query_context_->Init(InfinityContext::instance().config(),
InfinityContext::instance().task_scheduler(),
InfinityContext::instance().storage(),
InfinityContext::instance().resource_manager(),
InfinityContext::instance().session_manager());
query_context_->SetTxn(txn);
}

~BGQueryContextWrapper() {
if (session_mgr_ != nullptr) {
auto *session = query_context_->current_session();
session_mgr_->RemoveSessionByID(session->session_id());
}
}
};

} // namespace infinity
50 changes: 9 additions & 41 deletions src/storage/compaction_process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,44 +32,13 @@ import bg_query_state;
import query_context;
import infinity_context;
import compact_statement;
import session;
import compilation_config;
import defer_op;
import bg_query_state;

namespace infinity {

struct BGQueryContextWrapper {
UniquePtr<QueryContext> query_context_;

SessionManager *session_mgr_;
SharedPtr<BaseSession> session_;

BGQueryContextWrapper(BGQueryContextWrapper &&other)
: query_context_(std::move(other.query_context_)), session_mgr_(other.session_mgr_), session_(std::move(other.session_)) {
other.session_mgr_ = nullptr;
}

BGQueryContextWrapper(Txn *txn, SessionManager *session_mgr) : session_mgr_(session_mgr) {
session_ = session_mgr_->CreateLocalSession();
query_context_ = MakeUnique<QueryContext>(session_.get());
query_context_->Init(InfinityContext::instance().config(),
InfinityContext::instance().task_scheduler(),
InfinityContext::instance().storage(),
InfinityContext::instance().resource_manager(),
InfinityContext::instance().session_manager());
query_context_->SetTxn(txn);
}

~BGQueryContextWrapper() {
if (session_mgr_ != nullptr) {
auto *session = query_context_->current_session();
session_mgr_->RemoveSessionByID(session->session_id());
}
}
};

CompactionProcessor::CompactionProcessor(Catalog *catalog, TxnManager *txn_mgr)
: catalog_(catalog), txn_mgr_(txn_mgr), session_mgr_(InfinityContext::instance().session_manager()) {}
CompactionProcessor::CompactionProcessor(Catalog *catalog, TxnManager *txn_mgr) : catalog_(catalog), txn_mgr_(txn_mgr) {}

void CompactionProcessor::Start() {
LOG_INFO("Compaction processor is started.");
Expand Down Expand Up @@ -99,37 +68,36 @@ void CompactionProcessor::DoCompact() {
Vector<Pair<UniquePtr<BaseStatement>, Txn *>> statements = this->ScanForCompact(scan_txn);
Vector<Pair<BGQueryContextWrapper, BGQueryState>> wrappers;
for (const auto &[statement, txn] : statements) {
BGQueryContextWrapper wrapper(txn, session_mgr_);
BGQueryContextWrapper wrapper(txn);
BGQueryState state;
bool res = wrapper.query_context_->ExecuteBGStatement(statement.get(), state);
if (res) {
wrappers.emplace_back(std::move(wrapper), std::move(state));
}
}
for (auto &[wrapper, query_state] : wrappers) {
TxnTimeStamp commit_ts = 0;
wrapper.query_context_->JoinBGStatement(query_state, commit_ts);
TxnTimeStamp commit_ts_out = 0;
wrapper.query_context_->JoinBGStatement(query_state, commit_ts_out);
}
txn_mgr_->CommitTxn(scan_txn);
success = true;
}

TxnTimeStamp
CompactionProcessor::ManualDoCompact(const String &schema_name, const String &table_name, bool rollback, Optional<std::function<void()>> mid_func) {
TxnTimeStamp commit_ts = 0;

auto statement = MakeUnique<ManualCompactStatement>(schema_name, table_name);
Txn *txn = txn_mgr_->BeginTxn(MakeUnique<String>("ManualCompact"));
BGQueryContextWrapper wrapper(txn, session_mgr_);
BGQueryContextWrapper wrapper(txn);
BGQueryState state;
bool res = wrapper.query_context_->ExecuteBGStatement(statement.get(), state);
if (mid_func) {
mid_func.value()();
}
TxnTimeStamp out_commit_ts = 0;
if (res) {
wrapper.query_context_->JoinBGStatement(state, commit_ts, rollback);
wrapper.query_context_->JoinBGStatement(state, out_commit_ts, rollback);
}
return commit_ts;
return out_commit_ts;
}

Vector<Pair<UniquePtr<BaseStatement>, Txn *>> CompactionProcessor::ScanForCompact(Txn *scan_txn) {
Expand Down
Loading

0 comments on commit 996fb7b

Please sign in to comment.