Skip to content

Commit

Permalink
Fix: committing txn state invisible bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
small-turtle-1 committed May 7, 2024
1 parent c94a9d7 commit 860f64e
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 47 deletions.
17 changes: 9 additions & 8 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,11 @@ Status Txn::GetViews(const String &, Vector<ViewDetail> &output_view_array) {
return {ErrorCode::kNotSupported, "Not Implemented Txn Operation: GetViews"};
}

void Txn::SetTxnCommitted(TxnTimeStamp committed_ts) {
// LOG_INFO(fmt::format("Txn {} is committed, committed_ts: {}", txn_id_, committed_ts));
txn_context_.SetTxnCommitted(committed_ts);
}

void Txn::SetTxnCommitting(TxnTimeStamp commit_ts) {
txn_context_.SetTxnCommitting(commit_ts);
wal_entry_->commit_ts_ = commit_ts;
Expand All @@ -404,13 +409,14 @@ TxnTimeStamp Txn::Commit() {
// Don't need to write empty WalEntry (read-only transactions).
TxnTimeStamp commit_ts = txn_mgr_->GetCommitTimeStampR(this);
this->SetTxnCommitting(commit_ts);
this->SetTxnCommitted();
LOG_TRACE(fmt::format("Txn: {} is committed. commit ts: {}", txn_id_, commit_ts));
this->SetTxnCommitted(commit_ts);
return commit_ts;
}

// register commit ts in wal manager here, define the commit sequence
TxnTimeStamp commit_ts = txn_mgr_->GetCommitTimeStampW(this);
// LOG_INFO(fmt::format("Txn: {} is committing, committing ts: {}", txn_id_, commit_ts));

this->SetTxnCommitting(commit_ts);
// LOG_INFO(fmt::format("Txn {} commit ts: {}", txn_id_, commit_ts));

Expand All @@ -428,7 +434,6 @@ TxnTimeStamp Txn::Commit() {
// Wait until CommitTxnBottom is done.
std::unique_lock<std::mutex> lk(lock_);
cond_var_.wait(lk, [this] { return done_bottom_; });
LOG_TRACE(fmt::format("Txn: {} is committed. commit ts: {}", txn_id_, commit_ts));

if (txn_mgr_->enable_compaction()) {
txn_store_.MaintainCompactionAlg();
Expand All @@ -437,13 +442,11 @@ TxnTimeStamp Txn::Commit() {
txn_mgr_->AddDeltaEntry(std::move(local_catalog_delta_ops_entry_));
}

this->SetTxnCommitted();

return commit_ts;
}

bool Txn::CheckConflict(Txn *txn) {
LOG_INFO(fmt::format("Txn {} check conflict with {}.", txn_id_, txn->txn_id_));
LOG_TRACE(fmt::format("Txn {} check conflict with {}.", txn_id_, txn->txn_id_));

return txn_store_.CheckConflict(txn->txn_store_);
}
Expand Down Expand Up @@ -499,8 +502,6 @@ void Txn::Rollback() {

txn_store_.Rollback(txn_id_, abort_ts);

this->SetTxnRollbacked();

LOG_TRACE(fmt::format("Txn: {} is dropped.", txn_id_));
}

Expand Down
4 changes: 3 additions & 1 deletion src/storage/txn/txn.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,15 @@ public:

inline TxnTimeStamp CommitTS() { return txn_context_.GetCommitTS(); }

TxnTimeStamp CommittedTS() { return txn_context_.GetCommittedTS(); }

inline TxnTimeStamp BeginTS() { return txn_context_.GetBeginTS(); }

inline TxnState GetTxnState() { return txn_context_.GetTxnState(); }

inline TxnType GetTxnType() const { return txn_context_.GetTxnType(); }

void SetTxnCommitted() { txn_context_.SetTxnCommitted(); }
void SetTxnCommitted(TxnTimeStamp committed_ts);

void SetTxnCommitting(TxnTimeStamp commit_ts);

Expand Down
9 changes: 8 additions & 1 deletion src/storage/txn/txn_context.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public:
return commit_ts_;
}

TxnTimeStamp GetCommittedTS() {
std::shared_lock<std::shared_mutex> r_locker(rw_locker_);
return committed_ts_;
}

inline TxnState GetTxnState() {
std::shared_lock<std::shared_mutex> r_locker(rw_locker_);
return state_;
Expand All @@ -63,12 +68,13 @@ public:
state_ = TxnState::kRollbacked;
}

inline void SetTxnCommitted() {
inline void SetTxnCommitted(TxnTimeStamp committed_ts) {
std::unique_lock<std::shared_mutex> w_locker(rw_locker_);
if (state_ != TxnState::kCommitting) {
UnrecoverableError("Transaction isn't in COMMITTING status.");
}
state_ = TxnState::kCommitted;
committed_ts_ = committed_ts;
}

inline void SetTxnCommitting(TxnTimeStamp commit_ts) {
Expand All @@ -88,6 +94,7 @@ private:
std::shared_mutex rw_locker_{};
TxnTimeStamp begin_ts_{};
TxnTimeStamp commit_ts_{};
TxnTimeStamp committed_ts_{};
TxnState state_{TxnState::kStarted};
TxnType type_{TxnType::kInvalid};
};
Expand Down
70 changes: 39 additions & 31 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ Txn *TxnManager::BeginTxn(UniquePtr<String> txn_text) {
beginned_txns_.emplace_back(new_txn);
rw_locker_.unlock();

LOG_TRACE(fmt::format("Txn: {} is Begin. begin ts: {}", new_txn_id, ts));
// LOG_INFO(fmt::format("Txn: {} is Begin. begin ts: {}", new_txn_id, ts));
return new_txn.get();
}

Expand Down Expand Up @@ -100,7 +100,7 @@ TxnTimeStamp TxnManager::GetCommitTimeStampW(Txn *txn) {
std::lock_guard guard(rw_locker_);
TxnTimeStamp commit_ts = ++start_ts_;
wait_conflict_ck_.emplace(commit_ts, nullptr);
finished_txns_.emplace_back(txn);
finishing_txns_.emplace(txn);
txn->SetTxnWrite();
return commit_ts;
}
Expand All @@ -111,18 +111,26 @@ bool TxnManager::CheckConflict(Txn *txn) {
Vector<Txn *> candidate_txns;
{
std::lock_guard guard(rw_locker_);
// use binary search find the first txn whose commit ts is greater than `begin_ts`
auto iter =
std::lower_bound(finished_txns_.begin(), finished_txns_.end(), begin_ts, [](Txn *txn, TxnTimeStamp ts) { return txn->CommitTS() <= ts; });
for (; iter != finished_txns_.end(); ++iter) {
if ((*iter)->CommitTS() >= commit_ts) {
break; // not include itself
// LOG_INFO(fmt::format("Txn {} check conflict", txn->TxnID()));
for (auto *finishing_txn : finishing_txns_) {
// LOG_INFO(fmt::format("Txn {} tries to test txn {}", txn->TxnID(), finishing_txn->TxnID()));
const auto &finishing_state = finishing_txn->GetTxnState();
bool add = false;
if (finishing_state == TxnState::kCommitted) {
TxnTimeStamp committed_ts = finishing_txn->CommittedTS();
if (begin_ts < committed_ts) {
add = true;
}
} else if (finishing_state == TxnState::kCommitting) {
TxnTimeStamp finishing_commit_ts = finishing_txn->CommitTS();
if (commit_ts > finishing_commit_ts) {
add = true;
}
}
auto state = (*iter)->GetTxnState();
if (state == TxnState::kRollbacking || state == TxnState::kRollbacked) {
continue;
if (add) {
// LOG_INFO(fmt::format("Txn {} tests txn {}", txn->TxnID(), finishing_txn->TxnID()));
candidate_txns.push_back(finishing_txn);
}
candidate_txns.push_back(*iter);
}
}
for (auto *candidate_txn : candidate_txns) {
Expand Down Expand Up @@ -211,16 +219,14 @@ void TxnManager::Stop() {
bool TxnManager::Stopped() { return !is_running_.load(); }

TxnTimeStamp TxnManager::CommitTxn(Txn *txn) {
TransactionID txn_id = txn->TxnID();
TxnTimeStamp commit_ts = txn->Commit();
this->FinishTxn(txn_id);
this->FinishTxn(txn);
return commit_ts;
}

void TxnManager::RollBackTxn(Txn *txn) {
TransactionID txn_id = txn->TxnID();
txn->Rollback();
this->FinishTxn(txn_id);
this->FinishTxn(txn);
}

SizeT TxnManager::ActiveTxnCount() {
Expand All @@ -232,22 +238,25 @@ TxnTimeStamp TxnManager::CurrentTS() const { return start_ts_; }

// A Txn can be deleted when there is no uncommitted txn whose begin is less than the commit ts of the txn
// So maintain the least uncommitted begin ts
void TxnManager::FinishTxn(TransactionID txn_id) {
void TxnManager::FinishTxn(Txn *txn) {
std::lock_guard guard(rw_locker_);
auto iter = txn_map_.find(txn_id);
if (iter == txn_map_.end()) {
return;
}
auto *txn = iter->second.get();

if (txn->GetTxnType() == TxnType::kInvalid) {
UnrecoverableError("Txn type is invalid");
}
if (txn->GetTxnType() == TxnType::kRead) {
txn_map_.erase(txn_id);
// LOG_INFO(fmt::format("Erase txn 1: {}", txn_id));
} else if (txn->GetTxnType() == TxnType::kRead) {
txn_map_.erase(txn->TxnID());
return;
}

TxnTimeStamp finished_ts = ++start_ts_;
finished_txns_.emplace_back(finished_ts, txn);
auto state = txn->GetTxnState();
if (state == TxnState::kCommitting) {
txn->SetTxnCommitted(finished_ts);
} else if (state == TxnState::kRollbacking) {
txn->SetTxnRollbacked();
}

TxnTimeStamp least_uncommitted_begin_ts = txn->CommitTS() + 1;
while (!beginned_txns_.empty()) {
auto first_txn = beginned_txns_.front().lock();
Expand All @@ -265,19 +274,18 @@ void TxnManager::FinishTxn(TransactionID txn_id) {
}

while (!finished_txns_.empty()) {
auto *finished_txn = finished_txns_.front();
const auto &[finished_ts, finished_txn] = finished_txns_.front();
auto finished_state = finished_txn->GetTxnState();
if (finished_state != TxnState::kCommitted && finished_state != TxnState::kRollbacked) {
break;
}
TxnTimeStamp finished_commit_ts = finished_txn->CommitTS();
if (finished_commit_ts > least_uncommitted_begin_ts) {
if (finished_ts > least_uncommitted_begin_ts) {
break;
}
auto finished_txn_id = finished_txn->TxnID();
// LOG_INFO(fmt::format("Txn: {} is erased from txn map", finished_txn_id));
finishing_txns_.erase(finished_txn);
// LOG_INFO(fmt::format("Txn: {} is erased", finished_txn_id));
SizeT remove_n = txn_map_.erase(finished_txn_id);
// LOG_INFO(fmt::format("Erase txn 2: {}", finished_txn_id));
if (remove_n == 0) {
UnrecoverableError(fmt::format("Txn: {} not found in txn map", finished_txn_id));
}
Expand Down
9 changes: 5 additions & 4 deletions src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public:
TxnTimeStamp CurrentTS() const;

private:
void FinishTxn(TransactionID txn_id);
void FinishTxn(Txn *txn);

void AddWaitFlushTxn(const Vector<TransactionID> &txn_ids);

Expand All @@ -107,9 +107,10 @@ private:
HashMap<TransactionID, SharedPtr<Txn>> txn_map_{};
WalManager *wal_mgr_;

Deque<WeakPtr<Txn>> beginned_txns_; // sorted by begin ts
Deque<Txn *> finished_txns_; // sorted by commit ts
Map<TxnTimeStamp, WalEntry *> wait_conflict_ck_{};
Deque<WeakPtr<Txn>> beginned_txns_; // sorted by begin ts
HashSet<Txn *> finishing_txns_; // the txns for conflict check
Deque<Pair<TxnTimeStamp, Txn *>> finished_txns_; // sorted by finished ts
Map<TxnTimeStamp, WalEntry *> wait_conflict_ck_{}; // sorted by commit ts

Atomic<TxnTimeStamp> start_ts_{}; // The next txn ts
// Deque<TxnTimeStamp> ts_queue_{}; // the ts queue
Expand Down
3 changes: 1 addition & 2 deletions src/storage/wal/catalog_delta_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1118,8 +1118,7 @@ void GlobalCatalogDeltaEntry::AddDeltaEntryInner(CatalogDeltaEntry *delta_entry)
for (const auto txn_id : delta_entry->txn_ids()) {
ss << txn_id << " ";
}
LOG_INFO(ss.str());
throw e;
UnrecoverableError(ss.str());
}
} else {
PruneFlag prune_flag = CatalogDeltaOperation::ToPrune(None, new_op->merge_flag_);
Expand Down

0 comments on commit 860f64e

Please sign in to comment.