From 996fb7ba0a9ff3bb2b8000f3686d5966b50eb71e Mon Sep 17 00:00:00 2001 From: shen yushi Date: Wed, 22 May 2024 13:20:38 +0800 Subject: [PATCH] Fix: compact bug. (#1230) ### What problem does this PR solve? 1. Fix: Add lock in txn_store 2. Fix: physical compact plan. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- .gitignore | 4 +- src/executor/operator/physical_compact.cpp | 32 ++++----- .../physical_compact_index_prepare.cpp | 5 +- src/executor/operator_state.cppm | 13 +++- src/function/table/compact_state_data.cppm | 7 +- src/main/bg_query_state.cppm | 34 --------- src/scheduler/fragment_context.cpp | 23 ++++++- src/storage/bg_query_state.cppm | 69 +++++++++++++++++++ src/storage/compaction_process.cpp | 50 +++----------- src/storage/txn/txn.cpp | 18 +++-- src/storage/txn/txn_store.cppm | 2 + .../storage/wal/catalog_delta_replay.cpp | 3 +- 12 files changed, 148 insertions(+), 112 deletions(-) delete mode 100644 src/main/bg_query_state.cppm create mode 100644 src/storage/bg_query_state.cppm diff --git a/.gitignore b/.gitignore index 30e841ea54..fc12e94290 100644 --- a/.gitignore +++ b/.gitignore @@ -123,4 +123,6 @@ callgrind.out.* .cache/ # ignore all benchmark datasets -python/benchmark/datasets/ \ No newline at end of file +python/benchmark/datasets/ + +myvenv/ \ No newline at end of file diff --git a/src/executor/operator/physical_compact.cpp b/src/executor/operator/physical_compact.cpp index d8e03e57e1..1f8638d998 100644 --- a/src/executor/operator/physical_compact.cpp +++ b/src/executor/operator/physical_compact.cpp @@ -114,17 +114,18 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat SizeT group_idx = compact_operator_state->compact_idx_; CompactStateData *compact_state_data = compact_operator_state->compact_state_data_.get(); RowIDRemap &remapper = compact_state_data->remapper_; - if (group_idx == compact_state_data->segment_data_list_.size()) { + if (group_idx == compact_operator_state->segment_groups_.size()) { compact_operator_state->SetComplete(); return true; } - CompactSegmentData &compact_segment_data = compact_state_data->segment_data_list_[group_idx]; - compact_segment_data.old_segments_ = compactible_segments_group_[group_idx]; - const auto &compactible_segments = compact_segment_data.old_segments_; - - for (auto *compactible_segment : compactible_segments) { - if (!compactible_segment->TrySetCompacting(compact_state_data)) { - UnrecoverableError("Segment should be compactible."); + const Vector &candidate_segments = compact_operator_state->segment_groups_[group_idx]; + Vector compactible_segments; + { + + for (auto *candidate_segment : candidate_segments) { + if (candidate_segment->TrySetCompacting(compact_state_data)) { + compactible_segments.push_back(candidate_segment); + } } } @@ -137,12 +138,10 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat SizeT column_count = table_entry->ColumnCount(); - compact_segment_data.new_segment_ = SegmentEntry::NewSegmentEntry(table_entry, Catalog::GetNextSegmentID(table_entry), txn); - auto *new_segment = compact_segment_data.new_segment_.get(); - compact_state_data->AddNewSegment(new_segment, txn); + auto new_segment = SegmentEntry::NewSegmentEntry(table_entry, Catalog::GetNextSegmentID(table_entry), txn); SegmentID new_segment_id = new_segment->segment_id(); - UniquePtr new_block = BlockEntry::NewBlockEntry(new_segment, new_segment->GetNextBlockID(), 0 /*checkpoint_ts*/, column_count, txn); + UniquePtr new_block = BlockEntry::NewBlockEntry(new_segment.get(), new_segment->GetNextBlockID(), 0 /*checkpoint_ts*/, column_count, txn); const SizeT block_capacity = new_block->row_capacity(); for (SegmentEntry *segment : compactible_segments) { @@ -180,7 +179,7 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat read_size -= read_size1; new_segment->AppendBlockEntry(std::move(new_block)); - new_block = BlockEntry::NewBlockEntry(new_segment, new_segment->GetNextBlockID(), 0, column_count, txn); + new_block = BlockEntry::NewBlockEntry(new_segment.get(), new_segment->GetNextBlockID(), 0, column_count, txn); } block_entry_append(row_begin, read_size); } @@ -189,8 +188,9 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat if (new_block->row_count() > 0) { new_segment->AppendBlockEntry(std::move(new_block)); } + compact_state_data->AddNewSegment(new_segment, std::move(compactible_segments), txn); compact_operator_state->compact_idx_ = ++group_idx; - if (group_idx == compact_state_data->segment_data_list_.size()) { + if (group_idx == compact_operator_state->segment_groups_.size()) { compact_operator_state->SetComplete(); } compact_operator_state->SetComplete(); @@ -199,10 +199,6 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat } Vector>> PhysicalCompact::PlanCompact(SizeT parallel_count) { - if (parallel_count > compactible_segments_group_.size()) { - UnrecoverableError( - fmt::format("parallel_count {} is larger than compactible_segments_group_ size {}", parallel_count, compactible_segments_group_.size())); - } Vector>> result(parallel_count); for (SizeT i = 0; i < compactible_segments_group_.size(); ++i) { result[i % parallel_count].push_back(compactible_segments_group_[i]); diff --git a/src/executor/operator/physical_compact_index_prepare.cpp b/src/executor/operator/physical_compact_index_prepare.cpp index 976c4e4915..3685fc78a5 100644 --- a/src/executor/operator/physical_compact_index_prepare.cpp +++ b/src/executor/operator/physical_compact_index_prepare.cpp @@ -70,8 +70,9 @@ bool PhysicalCompactIndexPrepare::Execute(QueryContext *query_context, OperatorS } SizeT PhysicalCompactIndexPrepare::TaskletCount() { - auto *index_index = base_table_ref_->index_index_.get(); - return index_index->index_snapshots_vec_.size(); + // auto *index_index = base_table_ref_->index_index_.get(); + // return index_index->index_snapshots_vec_.size(); + return 1; } } // namespace infinity \ No newline at end of file diff --git a/src/executor/operator_state.cppm b/src/executor/operator_state.cppm index 1675252471..dcf38a379c 100644 --- a/src/executor/operator_state.cppm +++ b/src/executor/operator_state.cppm @@ -382,11 +382,11 @@ export struct FusionOperatorState : public OperatorState { // Compact export struct CompactOperatorState : public OperatorState { - inline explicit CompactOperatorState(SizeT compact_idx, SharedPtr compact_state_data) - : OperatorState(PhysicalOperatorType::kCompact), compact_idx_(compact_idx), compact_state_data_(compact_state_data) {} + inline explicit CompactOperatorState(Vector> segment_groups, SharedPtr compact_state_data) + : OperatorState(PhysicalOperatorType::kCompact), segment_groups_(std::move(segment_groups)), compact_state_data_(compact_state_data) {} SizeT compact_idx_{}; - + Vector> segment_groups_; SharedPtr compact_state_data_{}; }; @@ -500,6 +500,13 @@ export struct KnnScanSourceState : public SourceState { explicit KnnScanSourceState() : SourceState(SourceStateType::kKnnScan) {} }; +export struct CompactSourceState : public SourceState { + explicit CompactSourceState(Vector> segment_groups) + : SourceState(SourceStateType::kCompact), segment_groups_(std::move(segment_groups)) {} + + Vector> segment_groups_; +}; + export struct EmptySourceState : public SourceState { explicit EmptySourceState() : SourceState(SourceStateType::kEmpty) {} }; diff --git a/src/function/table/compact_state_data.cppm b/src/function/table/compact_state_data.cppm index 127e9afa58..3931c35b8a 100644 --- a/src/function/table/compact_state_data.cppm +++ b/src/function/table/compact_state_data.cppm @@ -95,10 +95,13 @@ public: const HashMap> &GetToDelete() const { return to_delete_; } - void AddNewSegment(SegmentEntry *new_segment, Txn *txn) { + void AddNewSegment(SharedPtr new_segment, Vector compacted_segments, Txn *txn) { std::lock_guard lock(mutex2_); auto *block_index = new_table_ref_->block_index_.get(); - block_index->Insert(new_segment, txn); + block_index->Insert(new_segment.get(), txn); + + CompactSegmentData data{new_segment, std::move(compacted_segments)}; + segment_data_list_.push_back(std::move(data)); } void AddNewIndex(TableIndexEntry *table_index_entry, Txn *txn) { diff --git a/src/main/bg_query_state.cppm b/src/main/bg_query_state.cppm deleted file mode 100644 index b33ef43b81..0000000000 --- a/src/main/bg_query_state.cppm +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -module; - -export module bg_query_state; - -import stl; -import logical_node; -import physical_operator; -import plan_fragment; -import fragment_context; - -namespace infinity { - -export struct BGQueryState { - Vector> logical_plans{}; - Vector> physical_plans{}; - SharedPtr plan_fragment{}; - UniquePtr notifier{}; -}; - -} // namespace infinity diff --git a/src/scheduler/fragment_context.cpp b/src/scheduler/fragment_context.cpp index ca58316c51..b83a80ea26 100644 --- a/src/scheduler/fragment_context.cpp +++ b/src/scheduler/fragment_context.cpp @@ -160,11 +160,18 @@ UniquePtr MakeKnnScanState(PhysicalKnnScan *physical_knn_scan, Fr } UniquePtr MakeCompactState(PhysicalCompact *physical_compact, FragmentTask *task, FragmentContext *fragment_ctx) { + SourceState *source_state = task->source_state_.get(); + if (source_state->state_type_ != SourceStateType::kCompact) { + UnrecoverableError("Expect compact source state"); + } + auto *compact_source_state = static_cast(source_state); + if (fragment_ctx->ContextType() != FragmentType::kParallelMaterialize) { UnrecoverableError("Compact operator should be in parallel materialized fragment."); } auto *parallel_materialize_fragment_ctx = static_cast(fragment_ctx); - auto compact_operator_state = MakeUnique(task->TaskID(), parallel_materialize_fragment_ctx->compact_state_data_); + auto compact_operator_state = + MakeUnique(std::move(compact_source_state->segment_groups_), parallel_materialize_fragment_ctx->compact_state_data_); return compact_operator_state; } @@ -689,7 +696,6 @@ SizeT InitCompactFragmentContext(PhysicalCompact *compact_operator, FragmentCont auto *parent_serial_materialize_fragment_ctx = static_cast(parent_context); auto &compact_state_data = parent_serial_materialize_fragment_ctx->compact_state_data_; - compact_state_data->segment_data_list_.resize(task_n); parallel_materialize_fragment_ctx->compact_state_data_ = compact_state_data; return task_n; } @@ -809,8 +815,19 @@ void FragmentContext::MakeSourceState(i64 parallel_count) { tasks_[0]->source_state_ = MakeUnique(); break; } + case PhysicalOperatorType::kCompact: { + if (fragment_type_ != FragmentType::kParallelMaterialize) { + UnrecoverableError( + fmt::format("{} should in parallel materialized fragment", PhysicalOperatorToString(first_operator->operator_type()))); + } + auto *physical_compact = static_cast(first_operator); + Vector>> segment_groups_list = physical_compact->PlanCompact(parallel_count); + for (i64 task_id = 0; task_id < parallel_count; ++task_id) { + tasks_[task_id]->source_state_ = MakeUnique(std::move(segment_groups_list[task_id])); + } + break; + } case PhysicalOperatorType::kCreateIndexDo: - case PhysicalOperatorType::kCompact: case PhysicalOperatorType::kCompactIndexDo: { if (fragment_type_ != FragmentType::kParallelMaterialize) { UnrecoverableError( diff --git a/src/storage/bg_query_state.cppm b/src/storage/bg_query_state.cppm new file mode 100644 index 0000000000..a5197c68d5 --- /dev/null +++ b/src/storage/bg_query_state.cppm @@ -0,0 +1,69 @@ +// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module; + +export module bg_query_state; + +import stl; +import logical_node; +import physical_operator; +import plan_fragment; +import fragment_context; +import query_context; +import infinity_context; +import session_manager; +import session; +import txn; + +namespace infinity { + +export struct BGQueryState { + Vector> logical_plans{}; + Vector> physical_plans{}; + SharedPtr plan_fragment{}; + UniquePtr notifier{}; +}; + +export struct BGQueryContextWrapper { + UniquePtr query_context_; + + SessionManager *session_mgr_; + SharedPtr session_; + + BGQueryContextWrapper(BGQueryContextWrapper &&other) + : query_context_(std::move(other.query_context_)), session_mgr_(other.session_mgr_), session_(std::move(other.session_)) { + other.session_mgr_ = nullptr; + } + + BGQueryContextWrapper(Txn *txn) : session_mgr_(InfinityContext::instance().session_manager()) { + session_ = session_mgr_->CreateLocalSession(); + query_context_ = MakeUnique(session_.get()); + query_context_->Init(InfinityContext::instance().config(), + InfinityContext::instance().task_scheduler(), + InfinityContext::instance().storage(), + InfinityContext::instance().resource_manager(), + InfinityContext::instance().session_manager()); + query_context_->SetTxn(txn); + } + + ~BGQueryContextWrapper() { + if (session_mgr_ != nullptr) { + auto *session = query_context_->current_session(); + session_mgr_->RemoveSessionByID(session->session_id()); + } + } +}; + +} // namespace infinity diff --git a/src/storage/compaction_process.cpp b/src/storage/compaction_process.cpp index 4f324dcb9a..ce5cf80e27 100644 --- a/src/storage/compaction_process.cpp +++ b/src/storage/compaction_process.cpp @@ -32,44 +32,13 @@ import bg_query_state; import query_context; import infinity_context; import compact_statement; -import session; import compilation_config; import defer_op; +import bg_query_state; namespace infinity { -struct BGQueryContextWrapper { - UniquePtr query_context_; - - SessionManager *session_mgr_; - SharedPtr session_; - - BGQueryContextWrapper(BGQueryContextWrapper &&other) - : query_context_(std::move(other.query_context_)), session_mgr_(other.session_mgr_), session_(std::move(other.session_)) { - other.session_mgr_ = nullptr; - } - - BGQueryContextWrapper(Txn *txn, SessionManager *session_mgr) : session_mgr_(session_mgr) { - session_ = session_mgr_->CreateLocalSession(); - query_context_ = MakeUnique(session_.get()); - query_context_->Init(InfinityContext::instance().config(), - InfinityContext::instance().task_scheduler(), - InfinityContext::instance().storage(), - InfinityContext::instance().resource_manager(), - InfinityContext::instance().session_manager()); - query_context_->SetTxn(txn); - } - - ~BGQueryContextWrapper() { - if (session_mgr_ != nullptr) { - auto *session = query_context_->current_session(); - session_mgr_->RemoveSessionByID(session->session_id()); - } - } -}; - -CompactionProcessor::CompactionProcessor(Catalog *catalog, TxnManager *txn_mgr) - : catalog_(catalog), txn_mgr_(txn_mgr), session_mgr_(InfinityContext::instance().session_manager()) {} +CompactionProcessor::CompactionProcessor(Catalog *catalog, TxnManager *txn_mgr) : catalog_(catalog), txn_mgr_(txn_mgr) {} void CompactionProcessor::Start() { LOG_INFO("Compaction processor is started."); @@ -99,7 +68,7 @@ void CompactionProcessor::DoCompact() { Vector, Txn *>> statements = this->ScanForCompact(scan_txn); Vector> wrappers; for (const auto &[statement, txn] : statements) { - BGQueryContextWrapper wrapper(txn, session_mgr_); + BGQueryContextWrapper wrapper(txn); BGQueryState state; bool res = wrapper.query_context_->ExecuteBGStatement(statement.get(), state); if (res) { @@ -107,8 +76,8 @@ void CompactionProcessor::DoCompact() { } } for (auto &[wrapper, query_state] : wrappers) { - TxnTimeStamp commit_ts = 0; - wrapper.query_context_->JoinBGStatement(query_state, commit_ts); + TxnTimeStamp commit_ts_out = 0; + wrapper.query_context_->JoinBGStatement(query_state, commit_ts_out); } txn_mgr_->CommitTxn(scan_txn); success = true; @@ -116,20 +85,19 @@ void CompactionProcessor::DoCompact() { TxnTimeStamp CompactionProcessor::ManualDoCompact(const String &schema_name, const String &table_name, bool rollback, Optional> mid_func) { - TxnTimeStamp commit_ts = 0; - auto statement = MakeUnique(schema_name, table_name); Txn *txn = txn_mgr_->BeginTxn(MakeUnique("ManualCompact")); - BGQueryContextWrapper wrapper(txn, session_mgr_); + BGQueryContextWrapper wrapper(txn); BGQueryState state; bool res = wrapper.query_context_->ExecuteBGStatement(statement.get(), state); if (mid_func) { mid_func.value()(); } + TxnTimeStamp out_commit_ts = 0; if (res) { - wrapper.query_context_->JoinBGStatement(state, commit_ts, rollback); + wrapper.query_context_->JoinBGStatement(state, out_commit_ts, rollback); } - return commit_ts; + return out_commit_ts; } Vector, Txn *>> CompactionProcessor::ScanForCompact(Txn *scan_txn) { diff --git a/src/storage/txn/txn.cpp b/src/storage/txn/txn.cpp index 97fcedae7c..4882b4947c 100644 --- a/src/storage/txn/txn.cpp +++ b/src/storage/txn/txn.cpp @@ -276,15 +276,19 @@ Txn::CreateIndexPrepare(TableIndexEntry *table_index_entry, BaseTableRef *table_ return {segment_index_entries, status}; } - auto *txn_table_store = txn_store_.GetTxnTableStore(table_entry); - txn_table_store->AddSegmentIndexesStore(table_index_entry, segment_index_entries); - for (auto &segment_index_entry : segment_index_entries) { - Vector> chunk_index_entries; - segment_index_entry->GetChunkIndexEntries(chunk_index_entries); - for (auto &chunk_index_intry : chunk_index_entries) { - txn_table_store->AddChunkIndexStore(table_index_entry, chunk_index_intry.get()); + { + std::lock_guard guard(txn_store_.mtx_); + auto *txn_table_store = txn_store_.GetTxnTableStore(table_entry); + txn_table_store->AddSegmentIndexesStore(table_index_entry, segment_index_entries); + for (auto &segment_index_entry : segment_index_entries) { + Vector> chunk_index_entries; + segment_index_entry->GetChunkIndexEntries(chunk_index_entries); + for (auto &chunk_index_intry : chunk_index_entries) { + txn_table_store->AddChunkIndexStore(table_index_entry, chunk_index_intry.get()); + } } } + return {segment_index_entries, Status::OK()}; } diff --git a/src/storage/txn/txn_store.cppm b/src/storage/txn/txn_store.cppm index 4831ab7070..b8a282f6dc 100644 --- a/src/storage/txn/txn_store.cppm +++ b/src/storage/txn/txn_store.cppm @@ -188,6 +188,8 @@ public: bool Empty() const; + std::mutex mtx_{}; + private: // Txn store Txn *txn_{}; // TODO: remove this diff --git a/src/unit_test/storage/wal/catalog_delta_replay.cpp b/src/unit_test/storage/wal/catalog_delta_replay.cpp index b77bcf2db2..a5a822d778 100644 --- a/src/unit_test/storage/wal/catalog_delta_replay.cpp +++ b/src/unit_test/storage/wal/catalog_delta_replay.cpp @@ -898,6 +898,7 @@ TEST_F(CatalogDeltaReplayTest, replay_table_single_index) { } last_commit_ts = txn_mgr->CommitTxn(txn_idx); } + WaitFlushDeltaOp(storage, last_commit_ts); } { // Drop index (by Name) @@ -909,8 +910,8 @@ TEST_F(CatalogDeltaReplayTest, replay_table_single_index) { } last_commit_ts = txn_mgr->CommitTxn(txn_idx_drop); } + WaitFlushDeltaOp(storage, last_commit_ts); } - WaitFlushDeltaOp(storage, last_commit_ts); infinity::InfinityContext::instance().UnInit(); } }