Skip to content

Commit 337833b

Browse files
Fix: refactor wait cleanup alg.
1 parent 6508193 commit 337833b

15 files changed

+106
-159
lines changed

src/main/query_context.cpp

+9-11
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import session_manager;
5050
import base_statement;
5151
import parser_result;
5252
import parser_assert;
53+
import plan_fragment;
5354

5455
namespace infinity {
5556

@@ -106,6 +107,10 @@ QueryResult QueryContext::Query(const String &query) {
106107

107108
QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
108109
QueryResult query_result;
110+
SharedPtr<LogicalNode> logical_plan = nullptr;
111+
UniquePtr<PlanFragment> plan_fragment = nullptr;
112+
UniquePtr<PhysicalOperator> physical_plan = nullptr;
113+
109114
// ProfilerStart("Query");
110115
// BaseProfiler profiler;
111116
// profiler.Begin();
@@ -127,7 +132,7 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
127132
}
128133

129134
current_max_node_id_ = bind_context->GetNewLogicalNodeId();
130-
SharedPtr<LogicalNode> logical_plan = logical_planner_->LogicalPlan();
135+
logical_plan = logical_planner_->LogicalPlan();
131136
StopProfile(QueryPhase::kLogicalPlan);
132137
// LOG_WARN(fmt::format("Before optimizer cost: {}", profiler.ElapsedToString()));
133138
// Apply optimized rule to the logical plan
@@ -137,13 +142,13 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
137142

138143
// Build physical plan
139144
StartProfile(QueryPhase::kPhysicalPlan);
140-
UniquePtr<PhysicalOperator> physical_plan = physical_planner_->BuildPhysicalOperator(logical_plan);
145+
physical_plan = physical_planner_->BuildPhysicalOperator(logical_plan);
141146
StopProfile(QueryPhase::kPhysicalPlan);
142147
// LOG_WARN(fmt::format("Before pipeline cost: {}", profiler.ElapsedToString()));
143148
StartProfile(QueryPhase::kPipelineBuild);
144149
// Fragment Builder, only for test now.
145150
// SharedPtr<PlanFragment> plan_fragment = fragment_builder.Build(physical_plan);
146-
auto plan_fragment = fragment_builder_->BuildFragment(physical_plan.get());
151+
plan_fragment = fragment_builder_->BuildFragment(physical_plan.get());
147152
StopProfile(QueryPhase::kPipelineBuild);
148153

149154
auto notifier = MakeUnique<Notifier>();
@@ -159,14 +164,7 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
159164
StopProfile(QueryPhase::kExecution);
160165
// LOG_WARN(fmt::format("Before commit cost: {}", profiler.ElapsedToString()));
161166
StartProfile(QueryPhase::kCommit);
162-
try {
163-
this->CommitTxn();
164-
} catch (RecoverableException &e) {
165-
StopProfile();
166-
this->RollbackTxn();
167-
query_result.result_table_ = nullptr;
168-
query_result.status_.Init(e.ErrorCode(), e.what());
169-
}
167+
this->CommitTxn();
170168
StopProfile(QueryPhase::kCommit);
171169

172170
} catch (RecoverableException &e) {

src/storage/bg_task/periodic_trigger.cpp

+3-5
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import third_party;
2828
namespace infinity {
2929

3030
void CleanupPeriodicTrigger::Trigger() {
31-
TxnTimeStamp visible_ts = txn_mgr_->GetMinUnflushedTS();
31+
TxnTimeStamp visible_ts = wal_mgr_->GetLastCkpTS() + 1;
3232
// if (visible_ts == last_visible_ts_) {
3333
// LOG_TRACE(fmt::format("Skip cleanup. visible timestamp: {}", visible_ts));
3434
// return;
@@ -40,17 +40,15 @@ void CleanupPeriodicTrigger::Trigger() {
4040
last_visible_ts_ = visible_ts;
4141
LOG_TRACE(fmt::format("Cleanup visible timestamp: {}", visible_ts));
4242

43-
auto buffer_mgr = txn_mgr_->GetBufferMgr();
44-
auto cleanup_task = MakeShared<CleanupTask>(catalog_, visible_ts, buffer_mgr);
43+
auto cleanup_task = MakeShared<CleanupTask>(catalog_, visible_ts, buffer_mgr_);
4544
bg_processor_->Submit(std::move(cleanup_task));
4645
}
4746

4847
void CheckpointPeriodicTrigger::Trigger() {
4948
auto checkpoint_task = MakeShared<CheckpointTask>(is_full_checkpoint_);
5049
LOG_INFO(fmt::format("Trigger {} periodic checkpoint.", is_full_checkpoint_ ? "FULL" : "DELTA"));
5150
if (!wal_mgr_->TrySubmitCheckpointTask(std::move(checkpoint_task))) {
52-
LOG_INFO(
53-
fmt::format("Skip {} checkpoint(time) because there is already a checkpoint task running.", is_full_checkpoint_ ? "FULL" : "DELTA"));
51+
LOG_INFO(fmt::format("Skip {} checkpoint(time) because there is already a checkpoint task running.", is_full_checkpoint_ ? "FULL" : "DELTA"));
5452
}
5553
}
5654

src/storage/bg_task/periodic_trigger.cppm

+9-4
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ import stl;
2020
import background_process;
2121
import compaction_process;
2222
import catalog;
23-
import txn_manager;
2423
import wal_manager;
24+
import buffer_manager;
2525

2626
namespace infinity {
2727

@@ -51,15 +51,20 @@ private:
5151

5252
export class CleanupPeriodicTrigger final : public PeriodicTrigger {
5353
public:
54-
CleanupPeriodicTrigger(std::chrono::milliseconds interval, BGTaskProcessor *bg_processor, Catalog *catalog, TxnManager *txn_mgr)
55-
: PeriodicTrigger(interval), bg_processor_(bg_processor), catalog_(catalog), txn_mgr_(txn_mgr) {}
54+
CleanupPeriodicTrigger(std::chrono::milliseconds interval,
55+
BGTaskProcessor *bg_processor,
56+
Catalog *catalog,
57+
WalManager *wal_mgr,
58+
BufferManager *buffer_mgr)
59+
: PeriodicTrigger(interval), bg_processor_(bg_processor), catalog_(catalog), wal_mgr_(wal_mgr), buffer_mgr_(buffer_mgr) {}
5660

5761
virtual void Trigger() override;
5862

5963
private:
6064
BGTaskProcessor *const bg_processor_{};
6165
Catalog *const catalog_{};
62-
TxnManager *const txn_mgr_{};
66+
WalManager *const wal_mgr_{};
67+
BufferManager *const buffer_mgr_{};
6368

6469
TxnTimeStamp last_visible_ts_{0};
6570
};

src/storage/meta/catalog.cpp

-4
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@ Catalog::~Catalog() {
7777
mem_index_commit_thread_.join();
7878
}
7979

80-
void Catalog::SetTxnMgr(TxnManager *txn_mgr) { txn_mgr_ = txn_mgr; }
81-
8280
// do not only use this method to create database
8381
// it will not record database in transaction, so when you commit transaction
8482
// it will lose operation
@@ -887,8 +885,6 @@ bool Catalog::SaveDeltaCatalog(TxnTimeStamp max_commit_ts, String &delta_catalog
887885
// Check the SegmentEntry's for flush the data to disk.
888886
UniquePtr<CatalogDeltaEntry> flush_delta_entry = global_catalog_delta_entry_->PickFlushEntry(max_commit_ts);
889887

890-
DeferFn defer_fn([&]() { txn_mgr_->RemoveWaitFlushTxns(flush_delta_entry->txn_ids()); });
891-
892888
if (flush_delta_entry->operations().empty()) {
893889
LOG_TRACE("Save delta catalog ops is empty. Skip flush.");
894890
return true;

src/storage/meta/catalog.cppm

-4
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,6 @@ public:
106106

107107
~Catalog();
108108

109-
void SetTxnMgr(TxnManager *txn_mgr);
110-
111109
public:
112110
// Database related functions
113111
Tuple<DBEntry *, Status> CreateDatabase(const String &db_name,
@@ -291,8 +289,6 @@ public:
291289

292290
ProfileHistory history{DEFAULT_PROFILER_HISTORY_SIZE};
293291

294-
TxnManager *txn_mgr_{nullptr};
295-
296292
private: // TODO: remove this
297293
std::shared_mutex &rw_locker() { return db_meta_map_.rw_locker_; }
298294

src/storage/storage.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ void Storage::Init() {
124124
std::chrono::seconds cleanup_interval = static_cast<std::chrono::seconds>(config_ptr_->CleanupInterval());
125125
if (cleanup_interval.count() > 0) {
126126
periodic_trigger_thread_->AddTrigger(
127-
MakeUnique<CleanupPeriodicTrigger>(cleanup_interval, bg_processor_.get(), new_catalog_.get(), txn_mgr_.get()));
127+
MakeUnique<CleanupPeriodicTrigger>(cleanup_interval, bg_processor_.get(), new_catalog_.get(), wal_mgr_.get(), buffer_mgr_.get()));
128128
} else {
129129
LOG_WARN("Cleanup interval is not set, auto cleanup task will not be triggered");
130130
}

src/storage/txn/txn_manager.cpp

+2-48
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@ TxnManager::TxnManager(Catalog *catalog,
4343
TxnTimeStamp start_ts,
4444
bool enable_compaction)
4545
: catalog_(catalog), buffer_mgr_(buffer_mgr), bg_task_processor_(bg_task_processor), wal_mgr_(wal_mgr), start_ts_(start_ts), is_running_(false),
46-
enable_compaction_(enable_compaction) {
47-
catalog_->SetTxnMgr(this);
48-
}
46+
enable_compaction_(enable_compaction) {}
4947

5048
Txn *TxnManager::BeginTxn(UniquePtr<String> txn_text) {
5149
// Check if the is_running_ is true
@@ -68,7 +66,7 @@ Txn *TxnManager::BeginTxn(UniquePtr<String> txn_text) {
6866

6967
// Storage txn in txn manager
7068
txn_map_[new_txn_id] = new_txn;
71-
ts_map_.emplace(ts, new_txn_id);
69+
// ts_map_.emplace(ts, new_txn_id);
7270
beginned_txns_.emplace_back(new_txn);
7371
rw_locker_.unlock();
7472

@@ -184,8 +182,6 @@ void TxnManager::AddDeltaEntry(UniquePtr<CatalogDeltaEntry> delta_entry) {
184182
UnrecoverableError("TxnManager is not running, cannot add delta entry");
185183
}
186184
i64 wal_size = wal_mgr_->WalSize();
187-
const auto &txn_ids = delta_entry->txn_ids();
188-
this->AddWaitFlushTxn(txn_ids);
189185
bg_task_processor_->Submit(MakeShared<AddDeltaEntryTask>(std::move(delta_entry), wal_size));
190186
}
191187

@@ -294,46 +290,4 @@ void TxnManager::FinishTxn(Txn *txn) {
294290
}
295291
}
296292

297-
void TxnManager::AddWaitFlushTxn(const Vector<TransactionID> &txn_ids) {
298-
std::stringstream ss;
299-
for (auto txn_id : txn_ids) {
300-
ss << txn_id << " ";
301-
}
302-
LOG_INFO(fmt::format("Add txns: {} to wait flush set", ss.str()));
303-
std::unique_lock w_lock(rw_locker_);
304-
wait_flush_txns_.insert(txn_ids.begin(), txn_ids.end());
305-
}
306-
307-
void TxnManager::RemoveWaitFlushTxns(const Vector<TransactionID> &txn_ids) {
308-
std::stringstream ss2;
309-
for (auto txn_id : txn_ids) {
310-
ss2 << txn_id << " ";
311-
}
312-
LOG_INFO(fmt::format("Remove txn: {} from wait flush set", ss2.str()));
313-
std::unique_lock w_lock(rw_locker_);
314-
for (auto txn_id : txn_ids) {
315-
if (!wait_flush_txns_.erase(txn_id)) {
316-
UnrecoverableError(fmt::format("Txn: {} not found in wait flush set", txn_id));
317-
}
318-
}
319-
}
320-
321-
TxnTimeStamp TxnManager::GetMinUnflushedTS() {
322-
std::unique_lock w_locker(rw_locker_);
323-
for (auto iter = ts_map_.begin(); iter != ts_map_.end();) {
324-
auto &[ts, txn_id] = *iter;
325-
if (txn_map_.find(txn_id) != txn_map_.end()) {
326-
LOG_INFO(fmt::format("Txn: {} found in txn map", txn_id));
327-
return ts;
328-
}
329-
if (wait_flush_txns_.find(txn_id) != wait_flush_txns_.end()) {
330-
LOG_INFO(fmt::format("Txn: {} wait flush", txn_id));
331-
return ts;
332-
}
333-
iter = ts_map_.erase(iter);
334-
}
335-
LOG_INFO(fmt::format("No txn is active, return the next ts {}", start_ts_));
336-
return start_ts_;
337-
}
338-
339293
} // namespace infinity

src/storage/txn/txn_manager.cppm

-8
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,7 @@ public:
8888
private:
8989
void FinishTxn(Txn *txn);
9090

91-
void AddWaitFlushTxn(const Vector<TransactionID> &txn_ids);
92-
9391
public:
94-
void RemoveWaitFlushTxns(const Vector<TransactionID> &txn_ids);
95-
96-
TxnTimeStamp GetMinUnflushedTS();
9792

9893
bool enable_compaction() const { return enable_compaction_; }
9994

@@ -113,9 +108,6 @@ private:
113108
Map<TxnTimeStamp, WalEntry *> wait_conflict_ck_{}; // sorted by commit ts
114109

115110
Atomic<TxnTimeStamp> start_ts_{}; // The next txn ts
116-
// Deque<TxnTimeStamp> ts_queue_{}; // the ts queue
117-
Map<TxnTimeStamp, TransactionID> ts_map_{}; // optimize the data structure
118-
HashSet<TransactionID> wait_flush_txns_{};
119111

120112
// For stop the txn manager
121113
atomic_bool is_running_{false};

src/storage/wal/wal_manager.cpp

+15-8
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,14 @@ module wal_manager;
5858

5959
namespace infinity {
6060

61-
WalManager::WalManager(Storage *storage, String wal_dir, u64 wal_size_threshold, u64 delta_checkpoint_interval_wal_bytes, FlushOptionType flush_option)
61+
WalManager::WalManager(Storage *storage,
62+
String wal_dir,
63+
u64 wal_size_threshold,
64+
u64 delta_checkpoint_interval_wal_bytes,
65+
FlushOptionType flush_option)
6266
: cfg_wal_size_threshold_(wal_size_threshold), cfg_delta_checkpoint_interval_wal_bytes_(delta_checkpoint_interval_wal_bytes), wal_dir_(wal_dir),
6367
wal_path_(wal_dir + "/" + WalFile::TempWalFilename()), storage_(storage), running_(false), flush_option_(flush_option), last_ckp_wal_size_(0),
64-
checkpoint_in_progress_(false), last_ckp_ts_(UNCOMMIT_TS), last_full_ckp_ts_(UNCOMMIT_TS) {}
68+
checkpoint_in_progress_(false), last_ckp_ts_(0), last_full_ckp_ts_(0) {}
6569

6670
WalManager::~WalManager() {
6771
if (running_.load()) {
@@ -142,6 +146,8 @@ i64 WalManager::WalSize() const {
142146
return wal_size_;
143147
}
144148

149+
TxnTimeStamp WalManager::GetLastCkpTS() { return last_ckp_ts_; }
150+
145151
// Flush is scheduled regularly. It collects a batch of transactions, sync
146152
// wal and do parallel committing. Each sync cost ~1s. Each checkpoint cost
147153
// ~10s. So it's necessary to sync for a batch of transactions, and to
@@ -274,25 +280,26 @@ void WalManager::Checkpoint(ForceCheckpointTask *ckp_task, TxnTimeStamp max_comm
274280
void WalManager::CheckpointInner(bool is_full_checkpoint, Txn *txn, TxnTimeStamp max_commit_ts, i64 wal_size) {
275281
DeferFn defer([&] { checkpoint_in_progress_.store(false); });
276282

283+
TxnTimeStamp last_ckp_ts = GetLastCkpTS();
277284
if (is_full_checkpoint) {
278285
if (max_commit_ts == last_full_ckp_ts_) {
279286
LOG_INFO(fmt::format("Skip full checkpoint because the max_commit_ts {} is the same as the last full checkpoint", max_commit_ts));
280287
return;
281288
}
282-
if (last_full_ckp_ts_ != UNCOMMIT_TS && last_full_ckp_ts_ >= max_commit_ts) {
289+
if (last_full_ckp_ts_ >= max_commit_ts) {
283290
UnrecoverableError(
284291
fmt::format("WalManager::UpdateLastFullMaxCommitTS last_full_ckp_ts_ {} >= max_commit_ts {}", last_full_ckp_ts_, max_commit_ts));
285292
}
286-
if (last_ckp_ts_ != UNCOMMIT_TS && last_ckp_ts_ > max_commit_ts) {
287-
UnrecoverableError(fmt::format("WalManager::UpdateLastFullMaxCommitTS last_ckp_ts_ {} >= max_commit_ts {}", last_ckp_ts_, max_commit_ts));
293+
if (last_ckp_ts > max_commit_ts) {
294+
UnrecoverableError(fmt::format("WalManager::UpdateLastFullMaxCommitTS last_ckp_ts {} >= max_commit_ts {}", last_ckp_ts, max_commit_ts));
288295
}
289296
} else {
290-
if (max_commit_ts == last_ckp_ts_) {
297+
if (max_commit_ts == last_ckp_ts) {
291298
LOG_INFO(fmt::format("Skip delta checkpoint because the max_commit_ts {} is the same as the last checkpoint", max_commit_ts));
292299
return;
293300
}
294-
if (last_ckp_ts_ >= max_commit_ts) {
295-
UnrecoverableError(fmt::format("WalManager::UpdateLastMaxCommitTS last_ckp_ts_ {} >= max_commit_ts {}", last_ckp_ts_, max_commit_ts));
301+
if (last_ckp_ts >= max_commit_ts) {
302+
UnrecoverableError(fmt::format("WalManager::UpdateLastMaxCommitTS last_ckp_ts {} >= max_commit_ts {}", last_ckp_ts, max_commit_ts));
296303
}
297304
}
298305
try {

src/storage/wal/wal_manager.cppm

+3-1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ public:
7070

7171
i64 GetLastCkpWalSize();
7272

73+
TxnTimeStamp GetLastCkpTS();
74+
7375
private:
7476
// Checkpoint Helper
7577
void CheckpointInner(bool is_full_checkpoint, Txn *txn, TxnTimeStamp max_commit_ts, i64 wal_size);
@@ -123,7 +125,7 @@ private:
123125
i64 last_ckp_wal_size_{};
124126
Atomic<bool> checkpoint_in_progress_{false};
125127

126-
// Only Checkpoint thread access following members
128+
// Only Checkpoint/Cleanup thread access following members
127129
TxnTimeStamp last_ckp_ts_{};
128130
TxnTimeStamp last_full_ckp_ts_{};
129131
};

0 commit comments

Comments
 (0)