Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unittest stuck on 0.1.x #1194

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions conf/infinity_conf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,31 +43,31 @@ storage_capacity = "64GB"
# s means seconds, for example "60s", 60 seconds
# m means minutes, for example "60m", 60 minutes
# h means hours, for example "1h", 1 hour
garbage_collection_interval = "60s"
cleanup_interval = "60s"

# storage ratio activates garbage collection:
# 0 means disable,
# 0.1 means, once the storage reach 10% storage capacity, GC is triggered.
garbage_collection_storage_ratio = 0.1

# dump memory index entry when it reachs the capacity
memindex_capacity = 1048576
mem_index_capacity = 1048576

[buffer]
buffer_pool_size = "4GB"
buffer_manager_size = "4GB"
temp_dir = "/var/infinity/tmp"

[wal]
wal_dir = "/var/infinity/wal"
full_checkpoint_interval_sec = 86400
delta_checkpoint_interval_sec = 60
delta_checkpoint_interval_wal_bytes = 1000000000
wal_file_size_threshold = "1GB"
full_checkpoint_interval = "86400s"
delta_checkpoint_interval = "60s"
# delta_checkpoint_threshold = 1000000000
wal_compact_threshold = "1GB"

# flush_at_once: write and flush log each commit
# only_write: write log, OS control when to flush the log, default
# flush_per_second: logs are written after each commit and flushed to disk per second.
flush_at_commit = "only_write"
flush_option = "only_write"

[resource]
dictionary_dir = "/var/infinity/resource"
20 changes: 9 additions & 11 deletions src/main/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import session_manager;
import base_statement;
import parser_result;
import parser_assert;
import plan_fragment;

namespace infinity {

Expand Down Expand Up @@ -106,6 +107,10 @@ QueryResult QueryContext::Query(const String &query) {

QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
QueryResult query_result;
SharedPtr<LogicalNode> logical_plan = nullptr;
UniquePtr<PlanFragment> plan_fragment = nullptr;
UniquePtr<PhysicalOperator> physical_plan = nullptr;

// ProfilerStart("Query");
// BaseProfiler profiler;
// profiler.Begin();
Expand All @@ -127,7 +132,7 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
}

current_max_node_id_ = bind_context->GetNewLogicalNodeId();
SharedPtr<LogicalNode> logical_plan = logical_planner_->LogicalPlan();
logical_plan = logical_planner_->LogicalPlan();
StopProfile(QueryPhase::kLogicalPlan);
// LOG_WARN(fmt::format("Before optimizer cost: {}", profiler.ElapsedToString()));
// Apply optimized rule to the logical plan
Expand All @@ -137,13 +142,13 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {

// Build physical plan
StartProfile(QueryPhase::kPhysicalPlan);
UniquePtr<PhysicalOperator> physical_plan = physical_planner_->BuildPhysicalOperator(logical_plan);
physical_plan = physical_planner_->BuildPhysicalOperator(logical_plan);
StopProfile(QueryPhase::kPhysicalPlan);
// LOG_WARN(fmt::format("Before pipeline cost: {}", profiler.ElapsedToString()));
StartProfile(QueryPhase::kPipelineBuild);
// Fragment Builder, only for test now.
// SharedPtr<PlanFragment> plan_fragment = fragment_builder.Build(physical_plan);
auto plan_fragment = fragment_builder_->BuildFragment(physical_plan.get());
plan_fragment = fragment_builder_->BuildFragment(physical_plan.get());
StopProfile(QueryPhase::kPipelineBuild);

auto notifier = MakeUnique<Notifier>();
Expand All @@ -159,14 +164,7 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
StopProfile(QueryPhase::kExecution);
// LOG_WARN(fmt::format("Before commit cost: {}", profiler.ElapsedToString()));
StartProfile(QueryPhase::kCommit);
try {
this->CommitTxn();
} catch (RecoverableException &e) {
StopProfile();
this->RollbackTxn();
query_result.result_table_ = nullptr;
query_result.status_.Init(e.ErrorCode(), e.what());
}
this->CommitTxn();
StopProfile(QueryPhase::kCommit);

} catch (RecoverableException &e) {
Expand Down
5 changes: 2 additions & 3 deletions src/storage/bg_task/periodic_trigger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import third_party;
namespace infinity {

void CleanupPeriodicTrigger::Trigger() {
TxnTimeStamp visible_ts = txn_mgr_->GetMinUnflushedTS();
TxnTimeStamp visible_ts = txn_mgr_->GetCleanupScanTS();
// if (visible_ts == last_visible_ts_) {
// LOG_TRACE(fmt::format("Skip cleanup. visible timestamp: {}", visible_ts));
// return;
Expand All @@ -49,8 +49,7 @@ void CheckpointPeriodicTrigger::Trigger() {
auto checkpoint_task = MakeShared<CheckpointTask>(is_full_checkpoint_);
LOG_TRACE(fmt::format("Trigger {} periodic checkpoint.", is_full_checkpoint_ ? "FULL" : "DELTA"));
if (!wal_mgr_->TrySubmitCheckpointTask(std::move(checkpoint_task))) {
LOG_TRACE(
fmt::format("Skip {} checkpoint(time) because there is already a checkpoint task running.", is_full_checkpoint_ ? "FULL" : "DELTA"));
LOG_TRACE(fmt::format("Skip {} checkpoint(time) because there is already a checkpoint task running.", is_full_checkpoint_ ? "FULL" : "DELTA"));
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/storage/buffer/buffer_obj.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export String BufferStatusToString(BufferStatus status) {
return "Freed";
case BufferStatus::kNew:
return "New";
case BufferStatus::kClean:
return "Clean";
default:
return "Invalid";
}
Expand Down Expand Up @@ -77,7 +79,7 @@ public:

void CleanupFile() const;

void CleanupTempFile() const ;
void CleanupTempFile() const;

SizeT GetBufferSize() const { return file_worker_->GetMemoryCost(); }

Expand Down
4 changes: 0 additions & 4 deletions src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ Catalog::~Catalog() {
mem_index_commit_thread_.join();
}

void Catalog::SetTxnMgr(TxnManager *txn_mgr) { txn_mgr_ = txn_mgr; }

// do not only use this method to create database
// it will not record database in transaction, so when you commit transaction
// it will lose operation
Expand Down Expand Up @@ -887,8 +885,6 @@ bool Catalog::SaveDeltaCatalog(TxnTimeStamp max_commit_ts, String &delta_catalog
// Check the SegmentEntry's for flush the data to disk.
UniquePtr<CatalogDeltaEntry> flush_delta_entry = global_catalog_delta_entry_->PickFlushEntry(max_commit_ts);

DeferFn defer_fn([&]() { txn_mgr_->RemoveWaitFlushTxns(flush_delta_entry->txn_ids()); });

if (flush_delta_entry->operations().empty()) {
LOG_TRACE("Save delta catalog ops is empty. Skip flush.");
return true;
Expand Down
4 changes: 0 additions & 4 deletions src/storage/meta/catalog.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ public:

~Catalog();

void SetTxnMgr(TxnManager *txn_mgr);

public:
// Database related functions
Tuple<DBEntry *, Status> CreateDatabase(const String &db_name,
Expand Down Expand Up @@ -291,8 +289,6 @@ public:

ProfileHistory history{DEFAULT_PROFILER_HISTORY_SIZE};

TxnManager *txn_mgr_{nullptr};

private: // TODO: remove this
std::shared_mutex &rw_locker() { return db_meta_map_.rw_locker_; }

Expand Down
64 changes: 16 additions & 48 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ TxnManager::TxnManager(Catalog *catalog,
TxnTimeStamp start_ts,
bool enable_compaction)
: catalog_(catalog), buffer_mgr_(buffer_mgr), bg_task_processor_(bg_task_processor), wal_mgr_(wal_mgr), start_ts_(start_ts), is_running_(false),
enable_compaction_(enable_compaction) {
catalog_->SetTxnMgr(this);
}
enable_compaction_(enable_compaction) {}

Txn *TxnManager::BeginTxn() {
// Check if the is_running_ is true
Expand All @@ -66,7 +64,6 @@ Txn *TxnManager::BeginTxn() {

// Storage txn in txn manager
txn_map_[new_txn_id] = new_txn;
ts_map_.emplace(ts, new_txn_id);
beginned_txns_.emplace_back(new_txn);
rw_locker_.unlock();

Expand Down Expand Up @@ -182,8 +179,6 @@ void TxnManager::AddDeltaEntry(UniquePtr<CatalogDeltaEntry> delta_entry) {
UnrecoverableError("TxnManager is not running, cannot add delta entry");
}
i64 wal_size = wal_mgr_->WalSize();
const auto &txn_ids = delta_entry->txn_ids();
this->AddWaitFlushTxn(txn_ids);
bg_task_processor_->Submit(MakeShared<AddDeltaEntryTask>(std::move(delta_entry), wal_size));
}

Expand Down Expand Up @@ -235,6 +230,21 @@ SizeT TxnManager::ActiveTxnCount() {

TxnTimeStamp TxnManager::CurrentTS() const { return start_ts_; }

TxnTimeStamp TxnManager::GetCleanupScanTS() {
std::lock_guard guard(rw_locker_);
TxnTimeStamp first_uncommitted_begin_ts = start_ts_;
while (!beginned_txns_.empty()) {
auto first_txn = beginned_txns_.front().lock();
if (first_txn.get() != nullptr) {
first_uncommitted_begin_ts = first_txn->BeginTS();
break;
}
beginned_txns_.pop_front();
}
TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS();
return std::min(first_uncommitted_begin_ts, checkpointed_ts);
}

// A Txn can be deleted when there is no uncommitted txn whose begin is less than the commit ts of the txn
// So maintain the least uncommitted begin ts
void TxnManager::FinishTxn(Txn *txn) {
Expand Down Expand Up @@ -292,46 +302,4 @@ void TxnManager::FinishTxn(Txn *txn) {
}
}

void TxnManager::AddWaitFlushTxn(const Vector<TransactionID> &txn_ids) {
// std::stringstream ss;
// for (auto txn_id : txn_ids) {
// ss << txn_id << " ";
// }
// LOG_INFO(fmt::format("Add txns: {} to wait flush set", ss.str()));
std::unique_lock w_lock(rw_locker_);
wait_flush_txns_.insert(txn_ids.begin(), txn_ids.end());
}

void TxnManager::RemoveWaitFlushTxns(const Vector<TransactionID> &txn_ids) {
// std::stringstream ss2;
// for (auto txn_id : txn_ids) {
// ss2 << txn_id << " ";
// }
// LOG_INFO(fmt::format("Remove txn: {} from wait flush set", ss2.str()));
std::unique_lock w_lock(rw_locker_);
for (auto txn_id : txn_ids) {
if (!wait_flush_txns_.erase(txn_id)) {
UnrecoverableError(fmt::format("Txn: {} not found in wait flush set", txn_id));
}
}
}

TxnTimeStamp TxnManager::GetMinUnflushedTS() {
std::unique_lock w_locker(rw_locker_);
for (auto iter = ts_map_.begin(); iter != ts_map_.end();) {
auto &[ts, txn_id] = *iter;
if (txn_map_.find(txn_id) != txn_map_.end()) {
LOG_TRACE(fmt::format("Txn: {} found in txn map", txn_id));
return ts;
}
if (wait_flush_txns_.find(txn_id) != wait_flush_txns_.end()) {
LOG_TRACE(fmt::format("Txn: {} wait flush", txn_id));
return ts;
}
iter = ts_map_.erase(iter);
}
LOG_TRACE(fmt::format("No txn is active, return the next ts {}", start_ts_));
return start_ts_;
}

} // namespace infinity
10 changes: 2 additions & 8 deletions src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,12 @@ public:

TxnTimeStamp CurrentTS() const;

TxnTimeStamp GetCleanupScanTS();

private:
void FinishTxn(Txn *txn);

void AddWaitFlushTxn(const Vector<TransactionID> &txn_ids);

public:
void RemoveWaitFlushTxns(const Vector<TransactionID> &txn_ids);

TxnTimeStamp GetMinUnflushedTS();

bool enable_compaction() const { return enable_compaction_; }

Expand All @@ -113,9 +110,6 @@ private:
Map<TxnTimeStamp, WalEntry *> wait_conflict_ck_{}; // sorted by commit ts

Atomic<TxnTimeStamp> start_ts_{}; // The next txn ts
// Deque<TxnTimeStamp> ts_queue_{}; // the ts queue
Map<TxnTimeStamp, TransactionID> ts_map_{}; // optimize the data structure
HashSet<TransactionID> wait_flush_txns_{};

// For stop the txn manager
atomic_bool is_running_{false};
Expand Down
6 changes: 4 additions & 2 deletions src/storage/wal/catalog_delta_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -998,15 +998,17 @@ void GlobalCatalogDeltaEntry::AddDeltaEntry(UniquePtr<CatalogDeltaEntry> delta_e
// }
// }
std::lock_guard<std::mutex> lock(catalog_delta_locker_);
if (delta_entry->sequence() != last_sequence_ + 1) {
u64 entry_sequence = delta_entry->sequence();
if (entry_sequence != last_sequence_ + 1) {
// Discontinuous
u64 entry_sequence = delta_entry->sequence();
// LOG_INFO(fmt::format("Add delta entry: {} in to sequence_heap_", entry_sequence));
sequence_heap_.push(entry_sequence);
delta_entry_map_.emplace(entry_sequence, std::move(delta_entry));
} else {
// Continuous
do {
wal_size_ = std::max(wal_size_, wal_size);
// LOG_INFO(fmt::format("Add delta entry: {} in to delta_ops_", entry_sequence));
this->AddDeltaEntryInner(delta_entry.get());

++last_sequence_;
Expand Down
26 changes: 17 additions & 9 deletions src/storage/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ module wal_manager;

namespace infinity {

WalManager::WalManager(Storage *storage, String wal_dir, u64 wal_size_threshold, u64 delta_checkpoint_interval_wal_bytes, FlushOption flush_option)
WalManager::WalManager(Storage *storage,
String wal_dir,
u64 wal_size_threshold,
u64 delta_checkpoint_interval_wal_bytes,
FlushOptionType flush_option)
: cfg_wal_size_threshold_(wal_size_threshold), cfg_delta_checkpoint_interval_wal_bytes_(delta_checkpoint_interval_wal_bytes), wal_dir_(wal_dir),
wal_path_(wal_dir + "/" + WalFile::TempWalFilename()), storage_(storage), running_(false), flush_option_(flush_option), last_ckp_wal_size_(0),
checkpoint_in_progress_(false), last_ckp_ts_(UNCOMMIT_TS), last_full_ckp_ts_(UNCOMMIT_TS) {}
Expand Down Expand Up @@ -142,6 +146,8 @@ i64 WalManager::WalSize() const {
return wal_size_;
}

TxnTimeStamp WalManager::GetCheckpointedTS() { return last_ckp_ts_ == UNCOMMIT_TS ? 0 : last_ckp_ts_ + 1; }

// Flush is scheduled regularly. It collects a batch of transactions, sync
// wal and do parallel committing. Each sync cost ~1s. Each checkpoint cost
// ~10s. So it's necessary to sync for a batch of transactions, and to
Expand Down Expand Up @@ -274,25 +280,27 @@ void WalManager::Checkpoint(ForceCheckpointTask *ckp_task, TxnTimeStamp max_comm
void WalManager::CheckpointInner(bool is_full_checkpoint, Txn *txn, TxnTimeStamp max_commit_ts, i64 wal_size) {
DeferFn defer([&] { checkpoint_in_progress_.store(false); });

TxnTimeStamp last_ckp_ts = last_ckp_ts_;
TxnTimeStamp last_full_ckp_ts = last_full_ckp_ts_;
if (is_full_checkpoint) {
if (max_commit_ts == last_full_ckp_ts_) {
if (max_commit_ts == last_full_ckp_ts) {
LOG_TRACE(fmt::format("Skip full checkpoint because the max_commit_ts {} is the same as the last full checkpoint", max_commit_ts));
return;
}
if (last_full_ckp_ts_ != UNCOMMIT_TS && last_full_ckp_ts_ >= max_commit_ts) {
if (last_full_ckp_ts != UNCOMMIT_TS && last_full_ckp_ts >= max_commit_ts) {
UnrecoverableError(
fmt::format("WalManager::UpdateLastFullMaxCommitTS last_full_ckp_ts_ {} >= max_commit_ts {}", last_full_ckp_ts_, max_commit_ts));
fmt::format("WalManager::UpdateLastFullMaxCommitTS last_full_ckp_ts {} >= max_commit_ts {}", last_full_ckp_ts, max_commit_ts));
}
if (last_ckp_ts_ != UNCOMMIT_TS && last_ckp_ts_ > max_commit_ts) {
UnrecoverableError(fmt::format("WalManager::UpdateLastFullMaxCommitTS last_ckp_ts_ {} >= max_commit_ts {}", last_ckp_ts_, max_commit_ts));
if (last_ckp_ts != UNCOMMIT_TS && last_ckp_ts > max_commit_ts) {
UnrecoverableError(fmt::format("WalManager::UpdateLastFullMaxCommitTS last_ckp_ts {} >= max_commit_ts {}", last_ckp_ts, max_commit_ts));
}
} else {
if (max_commit_ts == last_ckp_ts_) {
if (max_commit_ts == last_ckp_ts) {
LOG_TRACE(fmt::format("Skip delta checkpoint because the max_commit_ts {} is the same as the last checkpoint", max_commit_ts));
return;
}
if (last_ckp_ts_ >= max_commit_ts) {
UnrecoverableError(fmt::format("WalManager::UpdateLastMaxCommitTS last_ckp_ts_ {} >= max_commit_ts {}", last_ckp_ts_, max_commit_ts));
if (last_ckp_ts >= max_commit_ts) {
UnrecoverableError(fmt::format("WalManager::UpdateLastMaxCommitTS last_ckp_ts {} >= max_commit_ts {}", last_ckp_ts, max_commit_ts));
}
}
try {
Expand Down
4 changes: 3 additions & 1 deletion src/storage/wal/wal_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public:

i64 GetLastCkpWalSize();

TxnTimeStamp GetCheckpointedTS();

private:
// Checkpoint Helper
void CheckpointInner(bool is_full_checkpoint, Txn *txn, TxnTimeStamp max_commit_ts, i64 wal_size);
Expand Down Expand Up @@ -123,7 +125,7 @@ private:
i64 last_ckp_wal_size_{};
Atomic<bool> checkpoint_in_progress_{false};

// Only Checkpoint thread access following members
// Only Checkpoint/Cleanup thread access following members
TxnTimeStamp last_ckp_ts_{};
TxnTimeStamp last_full_ckp_ts_{};
};
Expand Down
Loading
Loading