From 024ddc4f80dbd7922c7370da4296aad152550219 Mon Sep 17 00:00:00 2001 From: shenyushi Date: Thu, 9 May 2024 14:59:50 +0800 Subject: [PATCH] Fix: cleanup visit_ts calculate. --- conf/infinity_conf.toml | 16 ++--- src/main/query_context.cpp | 20 +++--- src/storage/bg_task/periodic_trigger.cpp | 5 +- src/storage/buffer/buffer_obj.cppm | 4 +- src/storage/meta/catalog.cpp | 4 -- src/storage/meta/catalog.cppm | 4 -- src/storage/txn/txn_manager.cpp | 64 +++++-------------- src/storage/txn/txn_manager.cppm | 10 +-- src/storage/wal/catalog_delta_entry.cpp | 6 +- src/storage/wal/wal_manager.cpp | 26 +++++--- src/storage/wal/wal_manager.cppm | 4 +- .../storage/bg_task/cleanup_task.cpp | 54 +++++++++------- src/unit_test/storage/buffer/buffer_obj.cpp | 15 +++-- .../knnindex/merge_optimize/test_optimize.cpp | 20 +++--- .../storage/wal/catalog_delta_replay.cpp | 38 ++++------- src/unit_test/storage/wal/checkpoint.cpp | 27 ++++---- test/data/config/test_cleanup_task.toml | 2 + test/data/config/test_close_all_bgtask.toml | 4 +- 18 files changed, 147 insertions(+), 176 deletions(-) diff --git a/conf/infinity_conf.toml b/conf/infinity_conf.toml index 4a6fbd4e39..434fe19d64 100644 --- a/conf/infinity_conf.toml +++ b/conf/infinity_conf.toml @@ -43,7 +43,7 @@ storage_capacity = "64GB" # s means seconds, for example "60s", 60 seconds # m means minutes, for example "60m", 60 minutes # h means hours, for example "1h", 1 hour -garbage_collection_interval = "60s" +cleanup_interval = "60s" # storage ratio activates garbage collection: # 0 means disable, @@ -51,23 +51,23 @@ garbage_collection_interval = "60s" garbage_collection_storage_ratio = 0.1 # dump memory index entry when it reachs the capacity -memindex_capacity = 1048576 +mem_index_capacity = 1048576 [buffer] -buffer_pool_size = "4GB" +buffer_manager_size = "4GB" temp_dir = "/var/infinity/tmp" [wal] wal_dir = "/var/infinity/wal" -full_checkpoint_interval_sec = 86400 -delta_checkpoint_interval_sec = 60 -delta_checkpoint_interval_wal_bytes = 1000000000 -wal_file_size_threshold = "1GB" +full_checkpoint_interval = "86400s" +delta_checkpoint_interval = "60s" +# delta_checkpoint_threshold = 1000000000 +wal_compact_threshold = "1GB" # flush_at_once: write and flush log each commit # only_write: write log, OS control when to flush the log, default # flush_per_second: logs are written after each commit and flushed to disk per second. -flush_at_commit = "only_write" +flush_option = "only_write" [resource] dictionary_dir = "/var/infinity/resource" diff --git a/src/main/query_context.cpp b/src/main/query_context.cpp index f127864a32..67b26496dc 100644 --- a/src/main/query_context.cpp +++ b/src/main/query_context.cpp @@ -50,6 +50,7 @@ import session_manager; import base_statement; import parser_result; import parser_assert; +import plan_fragment; namespace infinity { @@ -106,6 +107,10 @@ QueryResult QueryContext::Query(const String &query) { QueryResult QueryContext::QueryStatement(const BaseStatement *statement) { QueryResult query_result; + SharedPtr logical_plan = nullptr; + UniquePtr plan_fragment = nullptr; + UniquePtr physical_plan = nullptr; + // ProfilerStart("Query"); // BaseProfiler profiler; // profiler.Begin(); @@ -127,7 +132,7 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) { } current_max_node_id_ = bind_context->GetNewLogicalNodeId(); - SharedPtr logical_plan = logical_planner_->LogicalPlan(); + logical_plan = logical_planner_->LogicalPlan(); StopProfile(QueryPhase::kLogicalPlan); // LOG_WARN(fmt::format("Before optimizer cost: {}", profiler.ElapsedToString())); // Apply optimized rule to the logical plan @@ -137,13 +142,13 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) { // Build physical plan StartProfile(QueryPhase::kPhysicalPlan); - UniquePtr physical_plan = physical_planner_->BuildPhysicalOperator(logical_plan); + physical_plan = physical_planner_->BuildPhysicalOperator(logical_plan); StopProfile(QueryPhase::kPhysicalPlan); // LOG_WARN(fmt::format("Before pipeline cost: {}", profiler.ElapsedToString())); StartProfile(QueryPhase::kPipelineBuild); // Fragment Builder, only for test now. // SharedPtr plan_fragment = fragment_builder.Build(physical_plan); - auto plan_fragment = fragment_builder_->BuildFragment(physical_plan.get()); + plan_fragment = fragment_builder_->BuildFragment(physical_plan.get()); StopProfile(QueryPhase::kPipelineBuild); auto notifier = MakeUnique(); @@ -159,14 +164,7 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) { StopProfile(QueryPhase::kExecution); // LOG_WARN(fmt::format("Before commit cost: {}", profiler.ElapsedToString())); StartProfile(QueryPhase::kCommit); - try { - this->CommitTxn(); - } catch (RecoverableException &e) { - StopProfile(); - this->RollbackTxn(); - query_result.result_table_ = nullptr; - query_result.status_.Init(e.ErrorCode(), e.what()); - } + this->CommitTxn(); StopProfile(QueryPhase::kCommit); } catch (RecoverableException &e) { diff --git a/src/storage/bg_task/periodic_trigger.cpp b/src/storage/bg_task/periodic_trigger.cpp index 8f28cf668c..d378b65208 100644 --- a/src/storage/bg_task/periodic_trigger.cpp +++ b/src/storage/bg_task/periodic_trigger.cpp @@ -28,7 +28,7 @@ import third_party; namespace infinity { void CleanupPeriodicTrigger::Trigger() { - TxnTimeStamp visible_ts = txn_mgr_->GetMinUnflushedTS(); + TxnTimeStamp visible_ts = txn_mgr_->GetCleanupScanTS(); // if (visible_ts == last_visible_ts_) { // LOG_TRACE(fmt::format("Skip cleanup. visible timestamp: {}", visible_ts)); // return; @@ -49,8 +49,7 @@ void CheckpointPeriodicTrigger::Trigger() { auto checkpoint_task = MakeShared(is_full_checkpoint_); LOG_TRACE(fmt::format("Trigger {} periodic checkpoint.", is_full_checkpoint_ ? "FULL" : "DELTA")); if (!wal_mgr_->TrySubmitCheckpointTask(std::move(checkpoint_task))) { - LOG_TRACE( - fmt::format("Skip {} checkpoint(time) because there is already a checkpoint task running.", is_full_checkpoint_ ? "FULL" : "DELTA")); + LOG_TRACE(fmt::format("Skip {} checkpoint(time) because there is already a checkpoint task running.", is_full_checkpoint_ ? "FULL" : "DELTA")); } } diff --git a/src/storage/buffer/buffer_obj.cppm b/src/storage/buffer/buffer_obj.cppm index 43d4c1ab60..982c880fff 100644 --- a/src/storage/buffer/buffer_obj.cppm +++ b/src/storage/buffer/buffer_obj.cppm @@ -48,6 +48,8 @@ export String BufferStatusToString(BufferStatus status) { return "Freed"; case BufferStatus::kNew: return "New"; + case BufferStatus::kClean: + return "Clean"; default: return "Invalid"; } @@ -77,7 +79,7 @@ public: void CleanupFile() const; - void CleanupTempFile() const ; + void CleanupTempFile() const; SizeT GetBufferSize() const { return file_worker_->GetMemoryCost(); } diff --git a/src/storage/meta/catalog.cpp b/src/storage/meta/catalog.cpp index 850f71f780..466e9ed2f3 100644 --- a/src/storage/meta/catalog.cpp +++ b/src/storage/meta/catalog.cpp @@ -77,8 +77,6 @@ Catalog::~Catalog() { mem_index_commit_thread_.join(); } -void Catalog::SetTxnMgr(TxnManager *txn_mgr) { txn_mgr_ = txn_mgr; } - // do not only use this method to create database // it will not record database in transaction, so when you commit transaction // it will lose operation @@ -887,8 +885,6 @@ bool Catalog::SaveDeltaCatalog(TxnTimeStamp max_commit_ts, String &delta_catalog // Check the SegmentEntry's for flush the data to disk. UniquePtr flush_delta_entry = global_catalog_delta_entry_->PickFlushEntry(max_commit_ts); - DeferFn defer_fn([&]() { txn_mgr_->RemoveWaitFlushTxns(flush_delta_entry->txn_ids()); }); - if (flush_delta_entry->operations().empty()) { LOG_TRACE("Save delta catalog ops is empty. Skip flush."); return true; diff --git a/src/storage/meta/catalog.cppm b/src/storage/meta/catalog.cppm index 77d27ca1f3..6d431cd247 100644 --- a/src/storage/meta/catalog.cppm +++ b/src/storage/meta/catalog.cppm @@ -106,8 +106,6 @@ public: ~Catalog(); - void SetTxnMgr(TxnManager *txn_mgr); - public: // Database related functions Tuple CreateDatabase(const String &db_name, @@ -291,8 +289,6 @@ public: ProfileHistory history{DEFAULT_PROFILER_HISTORY_SIZE}; - TxnManager *txn_mgr_{nullptr}; - private: // TODO: remove this std::shared_mutex &rw_locker() { return db_meta_map_.rw_locker_; } diff --git a/src/storage/txn/txn_manager.cpp b/src/storage/txn/txn_manager.cpp index 1a3d21f256..46737a919e 100644 --- a/src/storage/txn/txn_manager.cpp +++ b/src/storage/txn/txn_manager.cpp @@ -43,9 +43,7 @@ TxnManager::TxnManager(Catalog *catalog, TxnTimeStamp start_ts, bool enable_compaction) : catalog_(catalog), buffer_mgr_(buffer_mgr), bg_task_processor_(bg_task_processor), wal_mgr_(wal_mgr), start_ts_(start_ts), is_running_(false), - enable_compaction_(enable_compaction) { - catalog_->SetTxnMgr(this); -} + enable_compaction_(enable_compaction) {} Txn *TxnManager::BeginTxn() { // Check if the is_running_ is true @@ -66,7 +64,6 @@ Txn *TxnManager::BeginTxn() { // Storage txn in txn manager txn_map_[new_txn_id] = new_txn; - ts_map_.emplace(ts, new_txn_id); beginned_txns_.emplace_back(new_txn); rw_locker_.unlock(); @@ -182,8 +179,6 @@ void TxnManager::AddDeltaEntry(UniquePtr delta_entry) { UnrecoverableError("TxnManager is not running, cannot add delta entry"); } i64 wal_size = wal_mgr_->WalSize(); - const auto &txn_ids = delta_entry->txn_ids(); - this->AddWaitFlushTxn(txn_ids); bg_task_processor_->Submit(MakeShared(std::move(delta_entry), wal_size)); } @@ -235,6 +230,21 @@ SizeT TxnManager::ActiveTxnCount() { TxnTimeStamp TxnManager::CurrentTS() const { return start_ts_; } +TxnTimeStamp TxnManager::GetCleanupScanTS() { + std::lock_guard guard(rw_locker_); + TxnTimeStamp first_uncommitted_begin_ts = start_ts_; + while (!beginned_txns_.empty()) { + auto first_txn = beginned_txns_.front().lock(); + if (first_txn.get() != nullptr) { + first_uncommitted_begin_ts = first_txn->BeginTS(); + break; + } + beginned_txns_.pop_front(); + } + TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS(); + return std::min(first_uncommitted_begin_ts, checkpointed_ts); +} + // A Txn can be deleted when there is no uncommitted txn whose begin is less than the commit ts of the txn // So maintain the least uncommitted begin ts void TxnManager::FinishTxn(Txn *txn) { @@ -292,46 +302,4 @@ void TxnManager::FinishTxn(Txn *txn) { } } -void TxnManager::AddWaitFlushTxn(const Vector &txn_ids) { - // std::stringstream ss; - // for (auto txn_id : txn_ids) { - // ss << txn_id << " "; - // } - // LOG_INFO(fmt::format("Add txns: {} to wait flush set", ss.str())); - std::unique_lock w_lock(rw_locker_); - wait_flush_txns_.insert(txn_ids.begin(), txn_ids.end()); -} - -void TxnManager::RemoveWaitFlushTxns(const Vector &txn_ids) { - // std::stringstream ss2; - // for (auto txn_id : txn_ids) { - // ss2 << txn_id << " "; - // } - // LOG_INFO(fmt::format("Remove txn: {} from wait flush set", ss2.str())); - std::unique_lock w_lock(rw_locker_); - for (auto txn_id : txn_ids) { - if (!wait_flush_txns_.erase(txn_id)) { - UnrecoverableError(fmt::format("Txn: {} not found in wait flush set", txn_id)); - } - } -} - -TxnTimeStamp TxnManager::GetMinUnflushedTS() { - std::unique_lock w_locker(rw_locker_); - for (auto iter = ts_map_.begin(); iter != ts_map_.end();) { - auto &[ts, txn_id] = *iter; - if (txn_map_.find(txn_id) != txn_map_.end()) { - LOG_TRACE(fmt::format("Txn: {} found in txn map", txn_id)); - return ts; - } - if (wait_flush_txns_.find(txn_id) != wait_flush_txns_.end()) { - LOG_TRACE(fmt::format("Txn: {} wait flush", txn_id)); - return ts; - } - iter = ts_map_.erase(iter); - } - LOG_TRACE(fmt::format("No txn is active, return the next ts {}", start_ts_)); - return start_ts_; -} - } // namespace infinity diff --git a/src/storage/txn/txn_manager.cppm b/src/storage/txn/txn_manager.cppm index f5d1e1b785..ab549e1e29 100644 --- a/src/storage/txn/txn_manager.cppm +++ b/src/storage/txn/txn_manager.cppm @@ -85,15 +85,12 @@ public: TxnTimeStamp CurrentTS() const; + TxnTimeStamp GetCleanupScanTS(); + private: void FinishTxn(Txn *txn); - void AddWaitFlushTxn(const Vector &txn_ids); - public: - void RemoveWaitFlushTxns(const Vector &txn_ids); - - TxnTimeStamp GetMinUnflushedTS(); bool enable_compaction() const { return enable_compaction_; } @@ -113,9 +110,6 @@ private: Map wait_conflict_ck_{}; // sorted by commit ts Atomic start_ts_{}; // The next txn ts - // Deque ts_queue_{}; // the ts queue - Map ts_map_{}; // optimize the data structure - HashSet wait_flush_txns_{}; // For stop the txn manager atomic_bool is_running_{false}; diff --git a/src/storage/wal/catalog_delta_entry.cpp b/src/storage/wal/catalog_delta_entry.cpp index 055dc49673..5c33542aff 100644 --- a/src/storage/wal/catalog_delta_entry.cpp +++ b/src/storage/wal/catalog_delta_entry.cpp @@ -998,15 +998,17 @@ void GlobalCatalogDeltaEntry::AddDeltaEntry(UniquePtr delta_e // } // } std::lock_guard lock(catalog_delta_locker_); - if (delta_entry->sequence() != last_sequence_ + 1) { + u64 entry_sequence = delta_entry->sequence(); + if (entry_sequence != last_sequence_ + 1) { // Discontinuous - u64 entry_sequence = delta_entry->sequence(); + // LOG_INFO(fmt::format("Add delta entry: {} in to sequence_heap_", entry_sequence)); sequence_heap_.push(entry_sequence); delta_entry_map_.emplace(entry_sequence, std::move(delta_entry)); } else { // Continuous do { wal_size_ = std::max(wal_size_, wal_size); + // LOG_INFO(fmt::format("Add delta entry: {} in to delta_ops_", entry_sequence)); this->AddDeltaEntryInner(delta_entry.get()); ++last_sequence_; diff --git a/src/storage/wal/wal_manager.cpp b/src/storage/wal/wal_manager.cpp index ae758b4584..29f3af66fc 100644 --- a/src/storage/wal/wal_manager.cpp +++ b/src/storage/wal/wal_manager.cpp @@ -58,7 +58,11 @@ module wal_manager; namespace infinity { -WalManager::WalManager(Storage *storage, String wal_dir, u64 wal_size_threshold, u64 delta_checkpoint_interval_wal_bytes, FlushOption flush_option) +WalManager::WalManager(Storage *storage, + String wal_dir, + u64 wal_size_threshold, + u64 delta_checkpoint_interval_wal_bytes, + FlushOptionType flush_option) : cfg_wal_size_threshold_(wal_size_threshold), cfg_delta_checkpoint_interval_wal_bytes_(delta_checkpoint_interval_wal_bytes), wal_dir_(wal_dir), wal_path_(wal_dir + "/" + WalFile::TempWalFilename()), storage_(storage), running_(false), flush_option_(flush_option), last_ckp_wal_size_(0), checkpoint_in_progress_(false), last_ckp_ts_(UNCOMMIT_TS), last_full_ckp_ts_(UNCOMMIT_TS) {} @@ -142,6 +146,8 @@ i64 WalManager::WalSize() const { return wal_size_; } +TxnTimeStamp WalManager::GetCheckpointedTS() { return last_ckp_ts_ == UNCOMMIT_TS ? 0 : last_ckp_ts_ + 1; } + // Flush is scheduled regularly. It collects a batch of transactions, sync // wal and do parallel committing. Each sync cost ~1s. Each checkpoint cost // ~10s. So it's necessary to sync for a batch of transactions, and to @@ -274,25 +280,27 @@ void WalManager::Checkpoint(ForceCheckpointTask *ckp_task, TxnTimeStamp max_comm void WalManager::CheckpointInner(bool is_full_checkpoint, Txn *txn, TxnTimeStamp max_commit_ts, i64 wal_size) { DeferFn defer([&] { checkpoint_in_progress_.store(false); }); + TxnTimeStamp last_ckp_ts = last_ckp_ts_; + TxnTimeStamp last_full_ckp_ts = last_full_ckp_ts_; if (is_full_checkpoint) { - if (max_commit_ts == last_full_ckp_ts_) { + if (max_commit_ts == last_full_ckp_ts) { LOG_TRACE(fmt::format("Skip full checkpoint because the max_commit_ts {} is the same as the last full checkpoint", max_commit_ts)); return; } - if (last_full_ckp_ts_ != UNCOMMIT_TS && last_full_ckp_ts_ >= max_commit_ts) { + if (last_full_ckp_ts != UNCOMMIT_TS && last_full_ckp_ts >= max_commit_ts) { UnrecoverableError( - fmt::format("WalManager::UpdateLastFullMaxCommitTS last_full_ckp_ts_ {} >= max_commit_ts {}", last_full_ckp_ts_, max_commit_ts)); + fmt::format("WalManager::UpdateLastFullMaxCommitTS last_full_ckp_ts {} >= max_commit_ts {}", last_full_ckp_ts, max_commit_ts)); } - if (last_ckp_ts_ != UNCOMMIT_TS && last_ckp_ts_ > max_commit_ts) { - UnrecoverableError(fmt::format("WalManager::UpdateLastFullMaxCommitTS last_ckp_ts_ {} >= max_commit_ts {}", last_ckp_ts_, max_commit_ts)); + if (last_ckp_ts != UNCOMMIT_TS && last_ckp_ts > max_commit_ts) { + UnrecoverableError(fmt::format("WalManager::UpdateLastFullMaxCommitTS last_ckp_ts {} >= max_commit_ts {}", last_ckp_ts, max_commit_ts)); } } else { - if (max_commit_ts == last_ckp_ts_) { + if (max_commit_ts == last_ckp_ts) { LOG_TRACE(fmt::format("Skip delta checkpoint because the max_commit_ts {} is the same as the last checkpoint", max_commit_ts)); return; } - if (last_ckp_ts_ >= max_commit_ts) { - UnrecoverableError(fmt::format("WalManager::UpdateLastMaxCommitTS last_ckp_ts_ {} >= max_commit_ts {}", last_ckp_ts_, max_commit_ts)); + if (last_ckp_ts >= max_commit_ts) { + UnrecoverableError(fmt::format("WalManager::UpdateLastMaxCommitTS last_ckp_ts {} >= max_commit_ts {}", last_ckp_ts, max_commit_ts)); } } try { diff --git a/src/storage/wal/wal_manager.cppm b/src/storage/wal/wal_manager.cppm index a91e765fee..82599f9072 100644 --- a/src/storage/wal/wal_manager.cppm +++ b/src/storage/wal/wal_manager.cppm @@ -70,6 +70,8 @@ public: i64 GetLastCkpWalSize(); + TxnTimeStamp GetCheckpointedTS(); + private: // Checkpoint Helper void CheckpointInner(bool is_full_checkpoint, Txn *txn, TxnTimeStamp max_commit_ts, i64 wal_size); @@ -123,7 +125,7 @@ private: i64 last_ckp_wal_size_{}; Atomic checkpoint_in_progress_{false}; - // Only Checkpoint thread access following members + // Only Checkpoint/Cleanup thread access following members TxnTimeStamp last_ckp_ts_{}; TxnTimeStamp last_full_ckp_ts_{}; }; diff --git a/src/unit_test/storage/bg_task/cleanup_task.cpp b/src/unit_test/storage/bg_task/cleanup_task.cpp index 264a77a030..5b51dca4cc 100644 --- a/src/unit_test/storage/bg_task/cleanup_task.cpp +++ b/src/unit_test/storage/bg_task/cleanup_task.cpp @@ -40,32 +40,42 @@ import third_party; import base_table_ref; import index_secondary; import infinity_exception; +import wal_manager; using namespace infinity; class CleanupTaskTest : public BaseTest { protected: - void WaitCleanup(Catalog *catalog, TxnManager *txn_mgr, TxnTimeStamp last_commit_ts) { - LOG_INFO("Waiting cleanup"); + void WaitCleanup(Storage *storage, TxnTimeStamp last_commit_ts) { + Catalog *catalog = storage->catalog(); + BufferManager *buffer_mgr = storage->buffer_manager(); + + auto visible_ts = WaitFlushDeltaOp(storage, last_commit_ts); + + auto cleanup_task = MakeShared(catalog, visible_ts, buffer_mgr); + cleanup_task->Execute(); + } + + TxnTimeStamp WaitFlushDeltaOp(Storage *storage, TxnTimeStamp last_commit_ts) { + TxnManager *txn_mgr = storage->txn_manager(); + TxnTimeStamp visible_ts = 0; time_t start = time(nullptr); while (true) { - visible_ts = txn_mgr->GetMinUnflushedTS(); + visible_ts = txn_mgr->GetCleanupScanTS(); + // wait for at most 10s time_t end = time(nullptr); if (visible_ts >= last_commit_ts) { - LOG_INFO(fmt::format("Cleanup finished after {}", end - start)); + LOG_INFO(fmt::format("FlushDeltaOp finished after {}", end - start)); break; } - // wait for at most 10s if (end - start > 10) { - UnrecoverableException("WaitCleanup timeout"); + UnrecoverableException("WaitFlushDeltaOp timeout"); } + LOG_INFO(fmt::format("Before usleep. Wait flush delta op for {} seconds", end - start)); usleep(1000 * 1000); } - - auto buffer_mgr = txn_mgr->GetBufferMgr(); - auto cleanup_task = MakeShared(catalog, visible_ts, buffer_mgr); - cleanup_task->Execute(); + return visible_ts; } }; @@ -79,22 +89,22 @@ TEST_F(CleanupTaskTest, test_delete_db_simple) { EXPECT_NE(storage, nullptr); TxnManager *txn_mgr = storage->txn_manager(); - Catalog *catalog = storage->catalog(); TxnTimeStamp last_commit_ts = 0; auto db_name = MakeShared("db1"); { auto *txn = txn_mgr->BeginTxn(); txn->CreateDatabase(*db_name, ConflictType::kError); - txn_mgr->CommitTxn(txn); + last_commit_ts = txn_mgr->CommitTxn(txn); } + WaitFlushDeltaOp(storage, last_commit_ts); { auto *txn = txn_mgr->BeginTxn(); Status status = txn->DropDatabase(*db_name, ConflictType::kError); EXPECT_TRUE(status.ok()); last_commit_ts = txn_mgr->CommitTxn(txn); } - WaitCleanup(catalog, txn_mgr, last_commit_ts); + WaitCleanup(storage, last_commit_ts); { auto *txn = txn_mgr->BeginTxn(); auto [db_entry, status] = txn->GetDatabase(*db_name); @@ -114,7 +124,6 @@ TEST_F(CleanupTaskTest, test_delete_db_complex) { EXPECT_NE(storage, nullptr); TxnManager *txn_mgr = storage->txn_manager(); - Catalog *catalog = storage->catalog(); TxnTimeStamp last_commit_ts = 0; auto db_name = MakeShared("db1"); @@ -143,7 +152,7 @@ TEST_F(CleanupTaskTest, test_delete_db_complex) { auto *txn = txn_mgr->BeginTxn(); Status status = txn->DropDatabase(*db_name, ConflictType::kError); EXPECT_TRUE(status.ok()); - WaitCleanup(catalog, txn_mgr, last_commit_ts); + WaitCleanup(storage, last_commit_ts); txn_mgr->CommitTxn(txn); } { @@ -165,7 +174,6 @@ TEST_F(CleanupTaskTest, test_delete_table_simple) { EXPECT_NE(storage, nullptr); TxnManager *txn_mgr = storage->txn_manager(); - Catalog *catalog = storage->catalog(); TxnTimeStamp last_commit_ts = 0; Vector> column_defs; @@ -182,8 +190,9 @@ TEST_F(CleanupTaskTest, test_delete_table_simple) { auto *txn = txn_mgr->BeginTxn(); auto status = txn->CreateTable(*db_name, std::move(table_def), ConflictType::kIgnore); EXPECT_TRUE(status.ok()); - txn_mgr->CommitTxn(txn); + last_commit_ts = txn_mgr->CommitTxn(txn); } + WaitFlushDeltaOp(storage, last_commit_ts); { auto *txn = txn_mgr->BeginTxn(); @@ -193,7 +202,7 @@ TEST_F(CleanupTaskTest, test_delete_table_simple) { last_commit_ts = txn_mgr->CommitTxn(txn); } - WaitCleanup(catalog, txn_mgr, last_commit_ts); + WaitCleanup(storage, last_commit_ts); { auto *txn = txn_mgr->BeginTxn(); auto [table_entry, status] = txn->GetTableByName(*db_name, *table_name); @@ -215,7 +224,6 @@ TEST_F(CleanupTaskTest, test_delete_table_complex) { EXPECT_NE(storage, nullptr); TxnManager *txn_mgr = storage->txn_manager(); - Catalog *catalog = storage->catalog(); TxnTimeStamp last_commit_ts = 0; Vector> column_defs; @@ -262,7 +270,7 @@ TEST_F(CleanupTaskTest, test_delete_table_complex) { Status status = txn->DropTableCollectionByName(*db_name, *table_name, ConflictType::kError); EXPECT_TRUE(status.ok()); - WaitCleanup(catalog, txn_mgr, last_commit_ts); + WaitCleanup(storage, last_commit_ts); txn_mgr->CommitTxn(txn); } { @@ -290,7 +298,6 @@ TEST_F(CleanupTaskTest, test_compact_and_cleanup) { TxnManager *txn_mgr = storage->txn_manager(); BufferManager *buffer_mgr = storage->buffer_manager(); - Catalog *catalog = storage->catalog(); TxnTimeStamp last_commit_ts = 0; Vector> column_defs; @@ -362,7 +369,7 @@ TEST_F(CleanupTaskTest, test_compact_and_cleanup) { last_commit_ts = txn_mgr->CommitTxn(txn); } - WaitCleanup(catalog, txn_mgr, last_commit_ts); + WaitCleanup(storage, last_commit_ts); InfinityContext::instance().UnInit(); } @@ -381,7 +388,6 @@ TEST_F(CleanupTaskTest, test_with_index_compact_and_cleanup) { TxnManager *txn_mgr = storage->txn_manager(); BufferManager *buffer_mgr = storage->buffer_manager(); - Catalog *catalog = storage->catalog(); TxnTimeStamp last_commit_ts = 0; auto db_name = MakeShared("default_db"); @@ -474,7 +480,7 @@ TEST_F(CleanupTaskTest, test_with_index_compact_and_cleanup) { last_commit_ts = txn_mgr->CommitTxn(txn); } - WaitCleanup(catalog, txn_mgr, last_commit_ts); + WaitCleanup(storage, last_commit_ts); InfinityContext::instance().UnInit(); } \ No newline at end of file diff --git a/src/unit_test/storage/buffer/buffer_obj.cpp b/src/unit_test/storage/buffer/buffer_obj.cpp index 8287205f45..bb9f6f1399 100644 --- a/src/unit_test/storage/buffer/buffer_obj.cpp +++ b/src/unit_test/storage/buffer/buffer_obj.cpp @@ -49,6 +49,7 @@ import embedding_info; import bg_task; import physical_import; import chunk_index_entry; +import wal_manager; using namespace infinity; @@ -77,12 +78,16 @@ class BufferObjTest : public BaseTest { public: void SaveBufferObj(BufferObj *buffer_obj) { buffer_obj->Save(); }; - void WaitCleanup(Catalog *catalog, TxnManager *txn_mgr, TxnTimeStamp last_commit_ts) { + void WaitCleanup(Storage *storage, TxnTimeStamp last_commit_ts) { + Catalog *catalog = storage->catalog(); + TxnManager *txn_mgr = storage->txn_manager(); + BufferManager *buffer_mgr = storage->buffer_manager(); + LOG_INFO("Waiting cleanup"); TxnTimeStamp visible_ts = 0; time_t start = time(nullptr); while (true) { - visible_ts = txn_mgr->GetMinUnflushedTS(); + visible_ts = txn_mgr->GetCleanupScanTS(); time_t end = time(nullptr); if (visible_ts >= last_commit_ts) { LOG_INFO(fmt::format("Cleanup finished after {}", end - start)); @@ -96,7 +101,6 @@ class BufferObjTest : public BaseTest { usleep(1000 * 1000); } - auto buffer_mgr = txn_mgr->GetBufferMgr(); auto cleanup_task = MakeShared(catalog, visible_ts, buffer_mgr); cleanup_task->Execute(); } @@ -493,7 +497,7 @@ TEST_F(BufferObjTest, test1) { // } TEST_F(BufferObjTest, test_hnsw_index_buffer_obj_shutdown) { - GTEST_SKIP(); // FIXME + // GTEST_SKIP(); // FIXME #ifdef INFINITY_DEBUG infinity::InfinityContext::instance().UnInit(); @@ -515,7 +519,6 @@ TEST_F(BufferObjTest, test_hnsw_index_buffer_obj_shutdown) { TxnManager *txn_mgr = storage->txn_manager(); BufferManager *buffer_mgr = storage->buffer_manager(); - Catalog *catalog = storage->catalog(); TxnTimeStamp last_commit_ts = 0; auto db_name = MakeShared("default_db"); @@ -661,7 +664,7 @@ TEST_F(BufferObjTest, test_hnsw_index_buffer_obj_shutdown) { txn->DropTableCollectionByName(*db_name, *table_name, ConflictType::kError); last_commit_ts = txn_mgr->CommitTxn(txn); } - WaitCleanup(catalog, txn_mgr, last_commit_ts); + WaitCleanup(storage, last_commit_ts); } TEST_F(BufferObjTest, test_big_with_gc_and_cleanup) { diff --git a/src/unit_test/storage/knnindex/merge_optimize/test_optimize.cpp b/src/unit_test/storage/knnindex/merge_optimize/test_optimize.cpp index 274285250d..6fd11c6bb1 100644 --- a/src/unit_test/storage/knnindex/merge_optimize/test_optimize.cpp +++ b/src/unit_test/storage/knnindex/merge_optimize/test_optimize.cpp @@ -39,6 +39,8 @@ import catalog; import infinity_exception; import bg_task; import txn_store; +import wal_manager; +import buffer_manager; using namespace infinity; @@ -63,12 +65,16 @@ class OptimizeKnnTest : public BaseTest { RemoveDbDirs(); } - void WaitCleanup(Catalog *catalog, TxnManager *txn_mgr, TxnTimeStamp last_commit_ts) { + void WaitCleanup(Storage *storage, TxnTimeStamp last_commit_ts) { + Catalog *catalog = storage->catalog(); + TxnManager *txn_mgr = storage->txn_manager(); + BufferManager *buffer_mgr = storage->buffer_manager(); + LOG_INFO("Waiting cleanup"); TxnTimeStamp visible_ts = 0; time_t start = time(nullptr); while (true) { - visible_ts = txn_mgr->GetMinUnflushedTS(); + visible_ts = txn_mgr->GetCleanupScanTS(); time_t end = time(nullptr); if (visible_ts >= last_commit_ts) { LOG_INFO(fmt::format("Cleanup finished after {}", end - start)); @@ -76,12 +82,12 @@ class OptimizeKnnTest : public BaseTest { } // wait for at most 10s if (end - start > 10) { - UnrecoverableException("WaitCleanup timeout"); + UnrecoverableError("WaitCleanup timeout"); } + LOG_INFO(fmt::format("Before usleep. Wait cleanup for {} seconds", end - start)); usleep(1000 * 1000); } - auto buffer_mgr = txn_mgr->GetBufferMgr(); auto cleanup_task = MakeShared(catalog, visible_ts, buffer_mgr); cleanup_task->Execute(); } @@ -89,7 +95,6 @@ class OptimizeKnnTest : public BaseTest { TEST_F(OptimizeKnnTest, test1) { Storage *storage = InfinityContext::instance().storage(); - Catalog *catalog = storage->catalog(); TxnManager *txn_mgr = storage->txn_manager(); auto db_name = std::make_shared("default_db"); @@ -195,7 +200,7 @@ TEST_F(OptimizeKnnTest, test1) { last_commit_ts = txn_mgr->CommitTxn(txn); } - WaitCleanup(catalog, txn_mgr, last_commit_ts); + WaitCleanup(storage, last_commit_ts); { auto *txn = txn_mgr->BeginTxn(); @@ -222,7 +227,6 @@ TEST_F(OptimizeKnnTest, test1) { TEST_F(OptimizeKnnTest, test_secondary_index_optimize) { Storage *storage = InfinityContext::instance().storage(); - Catalog *catalog = storage->catalog(); TxnManager *txn_mgr = storage->txn_manager(); auto db_name = std::make_shared("default_db"); @@ -297,7 +301,7 @@ TEST_F(OptimizeKnnTest, test_secondary_index_optimize) { table_entry->OptimizeIndex(txn); last_commit_ts = txn_mgr->CommitTxn(txn); } - WaitCleanup(catalog, txn_mgr, last_commit_ts); + WaitCleanup(storage, last_commit_ts); { auto *txn = txn_mgr->BeginTxn(); auto [table_entry, status1] = txn->GetTableByName(*db_name, *table_name); diff --git a/src/unit_test/storage/wal/catalog_delta_replay.cpp b/src/unit_test/storage/wal/catalog_delta_replay.cpp index f3451fa27c..abd08bfec9 100644 --- a/src/unit_test/storage/wal/catalog_delta_replay.cpp +++ b/src/unit_test/storage/wal/catalog_delta_replay.cpp @@ -50,31 +50,19 @@ import logger; import infinity_exception; import default_values; import block_index; +import wal_manager; using namespace infinity; class CatalogDeltaReplayTest : public BaseTest { protected: - void WaitFlushDeltaOp(TxnManager *txn_mgr, TxnTimeStamp last_commit_ts) { - // TxnTimeStamp visible_ts = 0; - // auto start = std::chrono::steady_clock::now(); - // while (true) { - // visible_ts = txn_mgr->GetMinUnflushedTS(); - // if (visible_ts <= last_commit_ts) { - // break; - // } - // auto end = std::chrono::steady_clock::now(); - // auto duration = std::chrono::duration_cast(end - start); - // if (duration.count() > 10) { - // UnrecoverableException("WaitFlushDeltaOp timeout"); - // } - // std::this_thread::sleep_for(Seconds(1)); - // }; + void WaitFlushDeltaOp(Storage *storage, TxnTimeStamp last_commit_ts) { + TxnManager *txn_mgr = storage->txn_manager(); TxnTimeStamp visible_ts = 0; time_t start = time(nullptr); while (true) { - visible_ts = txn_mgr->GetMinUnflushedTS(); + visible_ts = txn_mgr->GetCleanupScanTS(); if (visible_ts >= last_commit_ts) { break; } @@ -155,7 +143,7 @@ TEST_F(CatalogDeltaReplayTest, replay_db_entry) { txn->CreateDatabase(*db_name3, ConflictType::kError); txn_mgr->RollBackTxn(txn); } - WaitFlushDeltaOp(txn_mgr, last_commit_ts); + WaitFlushDeltaOp(storage, last_commit_ts); infinity::InfinityContext::instance().UnInit(); } @@ -232,7 +220,7 @@ TEST_F(CatalogDeltaReplayTest, replay_table_entry) { txn->CreateTable(*db_name, table_def3, ConflictType::kError); txn_mgr->RollBackTxn(txn); } - WaitFlushDeltaOp(txn_mgr, last_commit_ts); + WaitFlushDeltaOp(storage, last_commit_ts); infinity::InfinityContext::instance().UnInit(); } @@ -331,7 +319,7 @@ TEST_F(CatalogDeltaReplayTest, replay_import) { last_commit_ts = txn_mgr->CommitTxn(txn); } - WaitFlushDeltaOp(txn_mgr, last_commit_ts); + WaitFlushDeltaOp(storage, last_commit_ts); infinity::InfinityContext::instance().UnInit(); } @@ -419,7 +407,7 @@ TEST_F(CatalogDeltaReplayTest, replay_append) { ASSERT_TRUE(status.ok()); last_commit_ts = txn_mgr->CommitTxn(txn); } - WaitFlushDeltaOp(txn_mgr, last_commit_ts); + WaitFlushDeltaOp(storage, last_commit_ts); infinity::InfinityContext::instance().UnInit(); } @@ -525,7 +513,7 @@ TEST_F(CatalogDeltaReplayTest, replay_delete) { txn_mgr->CommitTxn(txn); } - WaitFlushDeltaOp(txn_mgr, last_commit_ts); + WaitFlushDeltaOp(storage, last_commit_ts); infinity::InfinityContext::instance().UnInit(); } } @@ -661,7 +649,7 @@ TEST_F(CatalogDeltaReplayTest, replay_with_full_checkpoint) { ASSERT_TRUE(status.ok()); last_commit_ts = txn_mgr->CommitTxn(txn_record3); - WaitFlushDeltaOp(txn_mgr, last_commit_ts); + WaitFlushDeltaOp(storage, last_commit_ts); } infinity::InfinityContext::instance().UnInit(); } @@ -929,7 +917,7 @@ TEST_F(CatalogDeltaReplayTest, replay_table_single_index) { last_commit_ts = txn_mgr->CommitTxn(txn_idx_drop); } } - WaitFlushDeltaOp(txn_mgr, last_commit_ts); + WaitFlushDeltaOp(storage, last_commit_ts); infinity::InfinityContext::instance().UnInit(); } } @@ -1095,7 +1083,7 @@ TEST_F(CatalogDeltaReplayTest, replay_table_single_index_named_db) { last_commit_ts = txn_mgr->CommitTxn(txn_idx_drop); } } - WaitFlushDeltaOp(txn_mgr, last_commit_ts); + WaitFlushDeltaOp(storage, last_commit_ts); infinity::InfinityContext::instance().UnInit(); } } @@ -1265,7 +1253,7 @@ TEST_F(CatalogDeltaReplayTest, replay_table_single_index_and_compact) { last_commit_ts = txn_mgr->CommitTxn(txn_idx_drop); } } - WaitFlushDeltaOp(txn_mgr, last_commit_ts); + WaitFlushDeltaOp(storage, last_commit_ts); infinity::InfinityContext::instance().UnInit(); } } \ No newline at end of file diff --git a/src/unit_test/storage/wal/checkpoint.cpp b/src/unit_test/storage/wal/checkpoint.cpp index a73530063e..204e1993ca 100644 --- a/src/unit_test/storage/wal/checkpoint.cpp +++ b/src/unit_test/storage/wal/checkpoint.cpp @@ -52,6 +52,7 @@ import default_values; import global_resource_usage; import infinity; import background_process; +import wal_manager; using namespace infinity; @@ -65,12 +66,13 @@ class CheckpointTest : public BaseTest { void TearDown() override { RemoveDbDirs(); } - void WaitFlushDeltaOp(TxnManager *txn_mgr, TxnTimeStamp last_commit_ts) { + void WaitFlushDeltaOp(Storage *storage, TxnTimeStamp last_commit_ts) { + TxnManager *txn_mgr = storage->txn_manager(); LOG_INFO("Waiting flush delta op"); TxnTimeStamp visible_ts = 0; time_t start = time(nullptr); while (true) { - visible_ts = txn_mgr->GetMinUnflushedTS(); + visible_ts = txn_mgr->GetCleanupScanTS(); time_t end = time(nullptr); if (visible_ts >= last_commit_ts) { LOG_INFO(fmt::format("Flush delta op finished after {}", end - start)); @@ -83,12 +85,16 @@ class CheckpointTest : public BaseTest { } } - void WaitCleanup(Catalog *catalog, TxnManager *txn_mgr, TxnTimeStamp last_commit_ts) { + void WaitCleanup(Storage *storage, TxnTimeStamp last_commit_ts) { + Catalog *catalog = storage->catalog(); + TxnManager *txn_mgr = storage->txn_manager(); + BufferManager *buffer_mgr = storage->buffer_manager(); + LOG_INFO("Waiting cleanup"); TxnTimeStamp visible_ts = 0; time_t start = time(nullptr); while (true) { - visible_ts = txn_mgr->GetMinUnflushedTS(); + visible_ts = txn_mgr->GetCleanupScanTS(); time_t end = time(nullptr); if (visible_ts >= last_commit_ts) { LOG_INFO(fmt::format("Cleanup finished after {}", end - start)); @@ -96,12 +102,12 @@ class CheckpointTest : public BaseTest { } // wait for at most 10s if (end - start > 10) { - UnrecoverableException("WaitCleanup timeout"); + UnrecoverableError("WaitCleanup timeout"); } + LOG_INFO(fmt::format("Before usleep. Wait cleanup for {} seconds", end - start)); usleep(1000 * 1000); } - auto buffer_mgr = txn_mgr->GetBufferMgr(); auto cleanup_task = MakeShared(catalog, visible_ts, buffer_mgr); cleanup_task->Execute(); } @@ -155,7 +161,6 @@ TEST_F(CheckpointTest, test_cleanup_and_checkpoint) { Storage *storage = infinity::InfinityContext::instance().storage(); BufferManager *buffer_manager = storage->buffer_manager(); TxnManager *txn_mgr = storage->txn_manager(); - Catalog *catalog = storage->catalog(); TxnTimeStamp last_commit_ts = 0; Vector> columns; @@ -214,9 +219,7 @@ TEST_F(CheckpointTest, test_cleanup_and_checkpoint) { txn_mgr->CommitTxn(txn5); } - WaitCleanup(catalog, txn_mgr, last_commit_ts); - usleep(5000 * 1000); - WaitFlushDeltaOp(txn_mgr, last_commit_ts); + WaitCleanup(storage, last_commit_ts); infinity::InfinityContext::instance().UnInit(); #ifdef INFINITY_DEBUG EXPECT_EQ(infinity::GlobalResourceUsage::GetObjectCount(), 0); @@ -298,7 +301,7 @@ TEST_F(CheckpointTest, test_index_replay_with_full_and_delta_checkpoint1) { auto last_commit_ts = txn_mgr->CommitTxn(txn); - WaitFlushDeltaOp(txn_mgr, last_commit_ts); + WaitFlushDeltaOp(storage, last_commit_ts); infinity::InfinityContext::instance().UnInit(); } @@ -422,7 +425,7 @@ TEST_F(CheckpointTest, test_index_replay_with_full_and_delta_checkpoint2) { } usleep(5000 * 1000); - WaitFlushDeltaOp(txn_mgr, last_commit_ts); + WaitFlushDeltaOp(storage, last_commit_ts); infinity::InfinityContext::instance().UnInit(); } diff --git a/test/data/config/test_cleanup_task.toml b/test/data/config/test_cleanup_task.toml index 1488a19f47..c88400c096 100644 --- a/test/data/config/test_cleanup_task.toml +++ b/test/data/config/test_cleanup_task.toml @@ -9,5 +9,7 @@ cleanup_interval = 0 compaction_interval = 0 [wal] +# close full checkpoint +full_checkpoint_interval = "0s" #short checkpoint interval to allow quick cleanup delta_checkpoint_interval_sec = 1 diff --git a/test/data/config/test_close_all_bgtask.toml b/test/data/config/test_close_all_bgtask.toml index fe68efb12f..43afcb46a1 100644 --- a/test/data/config/test_close_all_bgtask.toml +++ b/test/data/config/test_close_all_bgtask.toml @@ -16,8 +16,8 @@ compact_interval = "0s" [buffer] [wal] # close delta checkpoint -delta_checkpoint_interval_sec = "0s" +delta_checkpoint_interval = "0s" # close full checkpoint -full_checkpoint_interval_sec = "0s" +full_checkpoint_interval = "0s" [resource] \ No newline at end of file