Skip to content

Commit

Permalink
Fix unittest stuck (#1192)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

1. Fix: cleanup processor get visit_ts from txn_manager, visit_ts is
Min(first_uncommitted_begin_ts, last_checkpoint_ts)
2. Fix: config file typo.

Issue link:#1172
### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
  • Loading branch information
small-turtle-1 authored May 9, 2024
1 parent ceae80c commit af24a07
Show file tree
Hide file tree
Showing 22 changed files with 151 additions and 180 deletions.
16 changes: 8 additions & 8 deletions conf/infinity_conf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,31 +43,31 @@ storage_capacity = "64GB"
# s means seconds, for example "60s", 60 seconds
# m means minutes, for example "60m", 60 minutes
# h means hours, for example "1h", 1 hour
garbage_collection_interval = "60s"
cleanup_interval = "60s"

# storage ratio activates garbage collection:
# 0 means disable,
# 0.1 means, once the storage reach 10% storage capacity, GC is triggered.
garbage_collection_storage_ratio = 0.1

# dump memory index entry when it reachs the capacity
memindex_capacity = 1048576
mem_index_capacity = 1048576

[buffer]
buffer_pool_size = "4GB"
buffer_manager_size = "4GB"
temp_dir = "/var/infinity/tmp"

[wal]
wal_dir = "/var/infinity/wal"
full_checkpoint_interval_sec = 86400
delta_checkpoint_interval_sec = 60
delta_checkpoint_interval_wal_bytes = 1000000000
wal_file_size_threshold = "1GB"
full_checkpoint_interval = "86400s"
delta_checkpoint_interval = "60s"
# delta_checkpoint_threshold = 1000000000
wal_compact_threshold = "1GB"

# flush_at_once: write and flush log each commit
# only_write: write log, OS control when to flush the log, default
# flush_per_second: logs are written after each commit and flushed to disk per second.
flush_at_commit = "only_write"
flush_option = "only_write"

[resource]
dictionary_dir = "/var/infinity/resource"
2 changes: 1 addition & 1 deletion src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ Status Config::Init(const SharedPtr<String> &config_path) {
if (wal_config["delta_checkpoint_interval"]) {
if (wal_config["delta_checkpoint_interval"].is_string()) {
String delta_checkpoint_interval_str =
wal_config["full_checkpoint_interval"].value_or(DEFAULT_DELTA_CHECKPOINT_INTERVAL_SEC_STR);
wal_config["delta_checkpoint_interval"].value_or(DEFAULT_DELTA_CHECKPOINT_INTERVAL_SEC_STR);
auto res = ParseTimeInfo(delta_checkpoint_interval_str, delta_checkpoint_interval);
if (!res.ok()) {
return res;
Expand Down
20 changes: 9 additions & 11 deletions src/main/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import session_manager;
import base_statement;
import parser_result;
import parser_assert;
import plan_fragment;

namespace infinity {

Expand Down Expand Up @@ -106,6 +107,10 @@ QueryResult QueryContext::Query(const String &query) {

QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
QueryResult query_result;
SharedPtr<LogicalNode> logical_plan = nullptr;
UniquePtr<PlanFragment> plan_fragment = nullptr;
UniquePtr<PhysicalOperator> physical_plan = nullptr;

// ProfilerStart("Query");
// BaseProfiler profiler;
// profiler.Begin();
Expand All @@ -127,7 +132,7 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
}

current_max_node_id_ = bind_context->GetNewLogicalNodeId();
SharedPtr<LogicalNode> logical_plan = logical_planner_->LogicalPlan();
logical_plan = logical_planner_->LogicalPlan();
StopProfile(QueryPhase::kLogicalPlan);
// LOG_WARN(fmt::format("Before optimizer cost: {}", profiler.ElapsedToString()));
// Apply optimized rule to the logical plan
Expand All @@ -137,13 +142,13 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {

// Build physical plan
StartProfile(QueryPhase::kPhysicalPlan);
UniquePtr<PhysicalOperator> physical_plan = physical_planner_->BuildPhysicalOperator(logical_plan);
physical_plan = physical_planner_->BuildPhysicalOperator(logical_plan);
StopProfile(QueryPhase::kPhysicalPlan);
// LOG_WARN(fmt::format("Before pipeline cost: {}", profiler.ElapsedToString()));
StartProfile(QueryPhase::kPipelineBuild);
// Fragment Builder, only for test now.
// SharedPtr<PlanFragment> plan_fragment = fragment_builder.Build(physical_plan);
auto plan_fragment = fragment_builder_->BuildFragment(physical_plan.get());
plan_fragment = fragment_builder_->BuildFragment(physical_plan.get());
StopProfile(QueryPhase::kPipelineBuild);

auto notifier = MakeUnique<Notifier>();
Expand All @@ -159,14 +164,7 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
StopProfile(QueryPhase::kExecution);
// LOG_WARN(fmt::format("Before commit cost: {}", profiler.ElapsedToString()));
StartProfile(QueryPhase::kCommit);
try {
this->CommitTxn();
} catch (RecoverableException &e) {
StopProfile();
this->RollbackTxn();
query_result.result_table_ = nullptr;
query_result.status_.Init(e.ErrorCode(), e.what());
}
this->CommitTxn();
StopProfile(QueryPhase::kCommit);

} catch (RecoverableException &e) {
Expand Down
5 changes: 2 additions & 3 deletions src/storage/bg_task/periodic_trigger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import third_party;
namespace infinity {

void CleanupPeriodicTrigger::Trigger() {
TxnTimeStamp visible_ts = txn_mgr_->GetMinUnflushedTS();
TxnTimeStamp visible_ts = txn_mgr_->GetCleanupScanTS();
// if (visible_ts == last_visible_ts_) {
// LOG_TRACE(fmt::format("Skip cleanup. visible timestamp: {}", visible_ts));
// return;
Expand All @@ -49,8 +49,7 @@ void CheckpointPeriodicTrigger::Trigger() {
auto checkpoint_task = MakeShared<CheckpointTask>(is_full_checkpoint_);
LOG_INFO(fmt::format("Trigger {} periodic checkpoint.", is_full_checkpoint_ ? "FULL" : "DELTA"));
if (!wal_mgr_->TrySubmitCheckpointTask(std::move(checkpoint_task))) {
LOG_TRACE(
fmt::format("Skip {} checkpoint(time) because there is already a checkpoint task running.", is_full_checkpoint_ ? "FULL" : "DELTA"));
LOG_TRACE(fmt::format("Skip {} checkpoint(time) because there is already a checkpoint task running.", is_full_checkpoint_ ? "FULL" : "DELTA"));
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/storage/buffer/buffer_obj.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export String BufferStatusToString(BufferStatus status) {
return "Freed";
case BufferStatus::kNew:
return "New";
case BufferStatus::kClean:
return "Clean";
default:
return "Invalid";
}
Expand Down Expand Up @@ -77,7 +79,7 @@ public:

void CleanupFile() const;

void CleanupTempFile() const ;
void CleanupTempFile() const;

SizeT GetBufferSize() const { return file_worker_->GetMemoryCost(); }

Expand Down
4 changes: 0 additions & 4 deletions src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ Catalog::~Catalog() {
mem_index_commit_thread_.join();
}

void Catalog::SetTxnMgr(TxnManager *txn_mgr) { txn_mgr_ = txn_mgr; }

// do not only use this method to create database
// it will not record database in transaction, so when you commit transaction
// it will lose operation
Expand Down Expand Up @@ -887,8 +885,6 @@ bool Catalog::SaveDeltaCatalog(TxnTimeStamp max_commit_ts, String &delta_catalog
// Check the SegmentEntry's for flush the data to disk.
UniquePtr<CatalogDeltaEntry> flush_delta_entry = global_catalog_delta_entry_->PickFlushEntry(max_commit_ts);

DeferFn defer_fn([&]() { txn_mgr_->RemoveWaitFlushTxns(flush_delta_entry->txn_ids()); });

if (flush_delta_entry->operations().empty()) {
LOG_TRACE("Save delta catalog ops is empty. Skip flush.");
return true;
Expand Down
4 changes: 0 additions & 4 deletions src/storage/meta/catalog.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ public:

~Catalog();

void SetTxnMgr(TxnManager *txn_mgr);

public:
// Database related functions
Tuple<DBEntry *, Status> CreateDatabase(const String &db_name,
Expand Down Expand Up @@ -291,8 +289,6 @@ public:

ProfileHistory history{DEFAULT_PROFILER_HISTORY_SIZE};

TxnManager *txn_mgr_{nullptr};

private: // TODO: remove this
std::shared_mutex &rw_locker() { return db_meta_map_.rw_locker_; }

Expand Down
64 changes: 16 additions & 48 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ TxnManager::TxnManager(Catalog *catalog,
TxnTimeStamp start_ts,
bool enable_compaction)
: catalog_(catalog), buffer_mgr_(buffer_mgr), bg_task_processor_(bg_task_processor), wal_mgr_(wal_mgr), start_ts_(start_ts), is_running_(false),
enable_compaction_(enable_compaction) {
catalog_->SetTxnMgr(this);
}
enable_compaction_(enable_compaction) {}

Txn *TxnManager::BeginTxn(UniquePtr<String> txn_text) {
// Check if the is_running_ is true
Expand All @@ -66,7 +64,6 @@ Txn *TxnManager::BeginTxn(UniquePtr<String> txn_text) {

// Storage txn in txn manager
txn_map_[new_txn_id] = new_txn;
ts_map_.emplace(ts, new_txn_id);
beginned_txns_.emplace_back(new_txn);
rw_locker_.unlock();

Expand Down Expand Up @@ -182,8 +179,6 @@ void TxnManager::AddDeltaEntry(UniquePtr<CatalogDeltaEntry> delta_entry) {
UnrecoverableError("TxnManager is not running, cannot add delta entry");
}
i64 wal_size = wal_mgr_->WalSize();
const auto &txn_ids = delta_entry->txn_ids();
this->AddWaitFlushTxn(txn_ids);
bg_task_processor_->Submit(MakeShared<AddDeltaEntryTask>(std::move(delta_entry), wal_size));
}

Expand Down Expand Up @@ -235,6 +230,21 @@ SizeT TxnManager::ActiveTxnCount() {

TxnTimeStamp TxnManager::CurrentTS() const { return start_ts_; }

TxnTimeStamp TxnManager::GetCleanupScanTS() {
std::lock_guard guard(rw_locker_);
TxnTimeStamp first_uncommitted_begin_ts = start_ts_;
while (!beginned_txns_.empty()) {
auto first_txn = beginned_txns_.front().lock();
if (first_txn.get() != nullptr) {
first_uncommitted_begin_ts = first_txn->BeginTS();
break;
}
beginned_txns_.pop_front();
}
TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS();
return std::min(first_uncommitted_begin_ts, checkpointed_ts);
}

// A Txn can be deleted when there is no uncommitted txn whose begin is less than the commit ts of the txn
// So maintain the least uncommitted begin ts
void TxnManager::FinishTxn(Txn *txn) {
Expand Down Expand Up @@ -292,46 +302,4 @@ void TxnManager::FinishTxn(Txn *txn) {
}
}

void TxnManager::AddWaitFlushTxn(const Vector<TransactionID> &txn_ids) {
// std::stringstream ss;
// for (auto txn_id : txn_ids) {
// ss << txn_id << " ";
// }
// LOG_INFO(fmt::format("Add txns: {} to wait flush set", ss.str()));
std::unique_lock w_lock(rw_locker_);
wait_flush_txns_.insert(txn_ids.begin(), txn_ids.end());
}

void TxnManager::RemoveWaitFlushTxns(const Vector<TransactionID> &txn_ids) {
// std::stringstream ss2;
// for (auto txn_id : txn_ids) {
// ss2 << txn_id << " ";
// }
// LOG_INFO(fmt::format("Remove txn: {} from wait flush set", ss2.str()));
std::unique_lock w_lock(rw_locker_);
for (auto txn_id : txn_ids) {
if (!wait_flush_txns_.erase(txn_id)) {
UnrecoverableError(fmt::format("Txn: {} not found in wait flush set", txn_id));
}
}
}

TxnTimeStamp TxnManager::GetMinUnflushedTS() {
std::unique_lock w_locker(rw_locker_);
for (auto iter = ts_map_.begin(); iter != ts_map_.end();) {
auto &[ts, txn_id] = *iter;
if (txn_map_.find(txn_id) != txn_map_.end()) {
LOG_TRACE(fmt::format("Txn: {} found in txn map", txn_id));
return ts;
}
if (wait_flush_txns_.find(txn_id) != wait_flush_txns_.end()) {
LOG_TRACE(fmt::format("Txn: {} wait flush", txn_id));
return ts;
}
iter = ts_map_.erase(iter);
}
LOG_TRACE(fmt::format("No txn is active, return the next ts {}", start_ts_));
return start_ts_;
}

} // namespace infinity
10 changes: 2 additions & 8 deletions src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,12 @@ public:

TxnTimeStamp CurrentTS() const;

TxnTimeStamp GetCleanupScanTS();

private:
void FinishTxn(Txn *txn);

void AddWaitFlushTxn(const Vector<TransactionID> &txn_ids);

public:
void RemoveWaitFlushTxns(const Vector<TransactionID> &txn_ids);

TxnTimeStamp GetMinUnflushedTS();

bool enable_compaction() const { return enable_compaction_; }

Expand All @@ -113,9 +110,6 @@ private:
Map<TxnTimeStamp, WalEntry *> wait_conflict_ck_{}; // sorted by commit ts

Atomic<TxnTimeStamp> start_ts_{}; // The next txn ts
// Deque<TxnTimeStamp> ts_queue_{}; // the ts queue
Map<TxnTimeStamp, TransactionID> ts_map_{}; // optimize the data structure
HashSet<TransactionID> wait_flush_txns_{};

// For stop the txn manager
atomic_bool is_running_{false};
Expand Down
6 changes: 4 additions & 2 deletions src/storage/wal/catalog_delta_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -998,15 +998,17 @@ void GlobalCatalogDeltaEntry::AddDeltaEntry(UniquePtr<CatalogDeltaEntry> delta_e
// }
// }
std::lock_guard<std::mutex> lock(catalog_delta_locker_);
if (delta_entry->sequence() != last_sequence_ + 1) {
u64 entry_sequence = delta_entry->sequence();
if (entry_sequence != last_sequence_ + 1) {
// Discontinuous
u64 entry_sequence = delta_entry->sequence();
// LOG_INFO(fmt::format("Add delta entry: {} in to sequence_heap_", entry_sequence));
sequence_heap_.push(entry_sequence);
delta_entry_map_.emplace(entry_sequence, std::move(delta_entry));
} else {
// Continuous
do {
wal_size_ = std::max(wal_size_, wal_size);
// LOG_INFO(fmt::format("Add delta entry: {} in to delta_ops_", entry_sequence));
this->AddDeltaEntryInner(delta_entry.get());

++last_sequence_;
Expand Down
26 changes: 17 additions & 9 deletions src/storage/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ module wal_manager;

namespace infinity {

WalManager::WalManager(Storage *storage, String wal_dir, u64 wal_size_threshold, u64 delta_checkpoint_interval_wal_bytes, FlushOptionType flush_option)
WalManager::WalManager(Storage *storage,
String wal_dir,
u64 wal_size_threshold,
u64 delta_checkpoint_interval_wal_bytes,
FlushOptionType flush_option)
: cfg_wal_size_threshold_(wal_size_threshold), cfg_delta_checkpoint_interval_wal_bytes_(delta_checkpoint_interval_wal_bytes), wal_dir_(wal_dir),
wal_path_(wal_dir + "/" + WalFile::TempWalFilename()), storage_(storage), running_(false), flush_option_(flush_option), last_ckp_wal_size_(0),
checkpoint_in_progress_(false), last_ckp_ts_(UNCOMMIT_TS), last_full_ckp_ts_(UNCOMMIT_TS) {}
Expand Down Expand Up @@ -142,6 +146,8 @@ i64 WalManager::WalSize() const {
return wal_size_;
}

TxnTimeStamp WalManager::GetCheckpointedTS() { return last_ckp_ts_ == UNCOMMIT_TS ? 0 : last_ckp_ts_ + 1; }

// Flush is scheduled regularly. It collects a batch of transactions, sync
// wal and do parallel committing. Each sync cost ~1s. Each checkpoint cost
// ~10s. So it's necessary to sync for a batch of transactions, and to
Expand Down Expand Up @@ -274,25 +280,27 @@ void WalManager::Checkpoint(ForceCheckpointTask *ckp_task, TxnTimeStamp max_comm
void WalManager::CheckpointInner(bool is_full_checkpoint, Txn *txn, TxnTimeStamp max_commit_ts, i64 wal_size) {
DeferFn defer([&] { checkpoint_in_progress_.store(false); });

TxnTimeStamp last_ckp_ts = last_ckp_ts_;
TxnTimeStamp last_full_ckp_ts = last_full_ckp_ts_;
if (is_full_checkpoint) {
if (max_commit_ts == last_full_ckp_ts_) {
if (max_commit_ts == last_full_ckp_ts) {
LOG_TRACE(fmt::format("Skip full checkpoint because the max_commit_ts {} is the same as the last full checkpoint", max_commit_ts));
return;
}
if (last_full_ckp_ts_ != UNCOMMIT_TS && last_full_ckp_ts_ >= max_commit_ts) {
if (last_full_ckp_ts != UNCOMMIT_TS && last_full_ckp_ts >= max_commit_ts) {
UnrecoverableError(
fmt::format("WalManager::UpdateLastFullMaxCommitTS last_full_ckp_ts_ {} >= max_commit_ts {}", last_full_ckp_ts_, max_commit_ts));
fmt::format("WalManager::UpdateLastFullMaxCommitTS last_full_ckp_ts {} >= max_commit_ts {}", last_full_ckp_ts, max_commit_ts));
}
if (last_ckp_ts_ != UNCOMMIT_TS && last_ckp_ts_ > max_commit_ts) {
UnrecoverableError(fmt::format("WalManager::UpdateLastFullMaxCommitTS last_ckp_ts_ {} >= max_commit_ts {}", last_ckp_ts_, max_commit_ts));
if (last_ckp_ts != UNCOMMIT_TS && last_ckp_ts > max_commit_ts) {
UnrecoverableError(fmt::format("WalManager::UpdateLastFullMaxCommitTS last_ckp_ts {} >= max_commit_ts {}", last_ckp_ts, max_commit_ts));
}
} else {
if (max_commit_ts == last_ckp_ts_) {
if (max_commit_ts == last_ckp_ts) {
LOG_TRACE(fmt::format("Skip delta checkpoint because the max_commit_ts {} is the same as the last checkpoint", max_commit_ts));
return;
}
if (last_ckp_ts_ >= max_commit_ts) {
UnrecoverableError(fmt::format("WalManager::UpdateLastMaxCommitTS last_ckp_ts_ {} >= max_commit_ts {}", last_ckp_ts_, max_commit_ts));
if (last_ckp_ts >= max_commit_ts) {
UnrecoverableError(fmt::format("WalManager::UpdateLastMaxCommitTS last_ckp_ts {} >= max_commit_ts {}", last_ckp_ts, max_commit_ts));
}
}
try {
Expand Down
Loading

0 comments on commit af24a07

Please sign in to comment.