Skip to content

Commit

Permalink
Fix UnrecoverableError: Invalid MergeFlag: 1 (#2416)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Fix `bool TxnTableStore::CheckConflict(const TxnTableStore
*other_table_store) const`

Issue link:#2388

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Refactoring
  • Loading branch information
yangzq50 authored Jan 2, 2025
1 parent e295d00 commit d48860a
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 38 deletions.
19 changes: 11 additions & 8 deletions src/storage/meta/entry/segment_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,8 @@ void SegmentIndexEntry::ReplaceChunkIndexEntries(TxnTableStore *txn_table_store,
ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_table_store, SegmentEntry *segment_entry) {
const auto &index_name = *table_index_entry_->GetIndexName();
Txn *txn = txn_table_store->GetTxn();
if (!TrySetOptimizing(txn)) {
const auto [result_b, add_segment_optimizing] = TrySetOptimizing(txn);
if (!result_b) {
LOG_INFO(fmt::format("Index {} segment {} is optimizing, skip optimize.", index_name, segment_id_));
return nullptr;
}
Expand Down Expand Up @@ -899,6 +900,7 @@ ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_
return nullptr;
}
}
add_segment_optimizing();
RowID base_rowid(segment_id_, 0);
SharedPtr<ChunkIndexEntry> merged_chunk_index_entry = nullptr;
switch (index_base->index_type_) {
Expand Down Expand Up @@ -1225,17 +1227,18 @@ UniquePtr<SegmentIndexEntry> SegmentIndexEntry::Deserialize(const nlohmann::json
return segment_index_entry;
}

bool SegmentIndexEntry::TrySetOptimizing(Txn *txn) {
Pair<bool, std::function<void()>> SegmentIndexEntry::TrySetOptimizing(Txn *txn) {
bool expected = false;
bool success = optimizing_.compare_exchange_strong(expected, true);
if (!success) {
return false;
return {false, nullptr};
}
TableEntry *table_entry = table_index_entry_->table_index_meta()->GetTableEntry();
TxnTableStore *txn_table_store = txn->txn_store()->GetTxnTableStore(table_entry);
TxnIndexStore *txn_index_store = txn_table_store->GetIndexStore(table_index_entry_);
txn_index_store->AddSegmentOptimizing(this);
return true;
return {true, [this, txn] {
TableEntry *table_entry = table_index_entry_->table_index_meta()->GetTableEntry();
TxnTableStore *txn_table_store = txn->txn_store()->GetTxnTableStore(table_entry);
TxnIndexStore *txn_index_store = txn_table_store->GetIndexStore(table_index_entry_);
txn_index_store->AddSegmentOptimizing(this);
}};
}

void SegmentIndexEntry::ResetOptimizing() { optimizing_.store(false); }
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/segment_index_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private:
Atomic<TxnTimeStamp> deprecate_ts_ = UNCOMMIT_TS;

public:
bool TrySetOptimizing(Txn *txn);
Pair<bool, std::function<void()>> TrySetOptimizing(Txn *txn);

void ResetOptimizing();

Expand Down
5 changes: 3 additions & 2 deletions src/storage/meta/entry/table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,8 @@ void TableEntry::OptimizeIndex(Txn *txn) {
const IndexFullText *index_fulltext = static_cast<const IndexFullText *>(index_base);
Map<SegmentID, SharedPtr<SegmentIndexEntry>> index_by_segment = table_index_entry->GetIndexBySegmentSnapshot(this, txn);
for (auto &[segment_id, segment_index_entry] : index_by_segment) {
if (!segment_index_entry->TrySetOptimizing(txn)) {
const auto [result_b, add_segment_optimizing] = segment_index_entry->TrySetOptimizing(txn);
if (!result_b) {
LOG_INFO(fmt::format("Index {} segment {} is optimizing, skip optimize.", index_name, segment_id));
continue;
}
Expand All @@ -937,7 +938,7 @@ void TableEntry::OptimizeIndex(Txn *txn) {
opt_success = true;
continue;
}

add_segment_optimizing();
String msg = fmt::format("merging {}", index_name);
Vector<String> base_names;
Vector<RowID> base_rowids;
Expand Down
8 changes: 4 additions & 4 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,11 +568,11 @@ TxnTimeStamp Txn::Commit() {
txn_store_.PrepareCommit1(); // Only for import and compact, pre-commit segment
// LOG_INFO(fmt::format("Txn {} commit ts: {}", txn_id_, commit_ts));

if (txn_mgr_->CheckTxnConflict(this)) {
LOG_ERROR(fmt::format("Txn: {} is rolled back. rollback ts: {}", txn_id_, commit_ts));
if (const auto conflict_reason = txn_mgr_->CheckTxnConflict(this); conflict_reason) {
LOG_ERROR(fmt::format("Txn: {} is rolled back. rollback ts: {}. Txn conflict reason: {}.", txn_id_, commit_ts, *conflict_reason));
wal_entry_ = nullptr;
txn_mgr_->SendToWAL(this);
RecoverableError(Status::TxnConflict(txn_id_, "Txn conflict reason."));
RecoverableError(Status::TxnConflict(txn_id_, fmt::format("Txn conflict reason: {}.", *conflict_reason)));
}

// Put wal entry to the manager in the same order as commit_ts.
Expand All @@ -595,7 +595,7 @@ TxnTimeStamp Txn::Commit() {

bool Txn::CheckConflict() { return txn_store_.CheckConflict(catalog_); }

bool Txn::CheckConflict(Txn *other_txn) {
Optional<String> Txn::CheckConflict(Txn *other_txn) {
LOG_TRACE(fmt::format("Txn {} check conflict with {}.", txn_id_, other_txn->txn_id_));

return txn_store_.CheckConflict(other_txn->txn_store_);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/txn/txn.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public:

bool CheckConflict();

bool CheckConflict(Txn *txn);
Optional<String> CheckConflict(Txn *txn);

void CommitBottom();

Expand Down
12 changes: 6 additions & 6 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ TxnTimeStamp TxnManager::GetWriteCommitTS(Txn *txn) {
return commit_ts;
}

bool TxnManager::CheckTxnConflict(Txn *txn) {
Optional<String> TxnManager::CheckTxnConflict(Txn *txn) {
TxnTimeStamp commit_ts = txn->CommitTS();
Vector<SharedPtr<Txn>> candidate_txns;
TxnTimeStamp min_checking_ts = UNCOMMIT_TS;
Expand All @@ -160,19 +160,19 @@ bool TxnManager::CheckTxnConflict(Txn *txn) {
}
});
if (txn->CheckConflict()) {
return true;
return "Conflict in txn->CheckConflict()";
}
for (SharedPtr<Txn> &candidate_txn : candidate_txns) {
for (const auto &candidate_txn : candidate_txns) {
// LOG_INFO(fmt::format("Txn {}(commit_ts: {}) check conflict with txn {}(commit_ts: {})",
// txn->TxnID(),
// txn->CommitTS(),
// candidate_txn->TxnID(),
// candidate_txn->CommitTS()));
if (txn->CheckConflict(candidate_txn.get())) {
return true;
if (const auto conflict_reason = txn->CheckConflict(candidate_txn.get()); conflict_reason) {
return fmt::format("Conflict with candidate_txn {}: {}", candidate_txn->TxnID(), *conflict_reason);
}
}
return false;
return None;
}

void TxnManager::SendToWAL(Txn *txn) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public:

TxnTimeStamp GetWriteCommitTS(Txn *txn);

bool CheckTxnConflict(Txn *txn);
Optional<String> CheckTxnConflict(Txn *txn);

void SendToWAL(Txn *txn);

Expand Down
60 changes: 47 additions & 13 deletions src/storage/txn/txn_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

module;

#include <ranges>
#include <string>
#include <vector>

Expand Down Expand Up @@ -357,19 +358,50 @@ bool TxnTableStore::CheckConflict(Catalog *catalog, Txn *txn) const {
return false;
}

bool TxnTableStore::CheckConflict(const TxnTableStore *other_table_store) const {
for (const auto &[index_name, _] : txn_indexes_store_) {
for (const auto [index_entry, _] : other_table_store->txn_indexes_) {
if (index_name == *index_entry->GetIndexName()) {
return true;
Optional<String> TxnTableStore::CheckConflict(const TxnTableStore *other_table_store) const {
{
Set<std::string_view> other_txn_indexes_set;
Map<std::string_view, Set<SegmentID>> other_txn_indexes_store_map;
for (const auto &index_entry : std::views::keys(other_table_store->txn_indexes_)) {
other_txn_indexes_set.insert(*index_entry->GetIndexName());
}
for (const auto &[index_name, index_store] : other_table_store->txn_indexes_store_) {
auto &segment_set = other_txn_indexes_store_map[index_name];
for (const auto segment_id : std::views::keys(index_store->index_entry_map_)) {
segment_set.insert(segment_id);
}
}
for (const auto &index_entry : std::views::keys(txn_indexes_)) {
if (const auto &index_name = *index_entry->GetIndexName(); other_txn_indexes_set.contains(index_name)) {
return fmt::format("{}: txn_indexes_ containing Index {} conflict with other_table_store->txn_indexes_",
__func__,
index_name);
}
}
for (const auto &[index_name, index_store] : txn_indexes_store_) {
if (other_txn_indexes_set.contains(index_name)) {
return fmt::format("{}: txn_indexes_store_ containing Index {} conflict with other_table_store->txn_indexes_",
__func__,
index_name);
}
if (const auto iter = other_txn_indexes_store_map.find(index_name); iter != other_txn_indexes_store_map.end()) {
for (const auto &other_segment_set = iter->second; const auto segment_id : std::views::keys(index_store->index_entry_map_)) {
if (other_segment_set.contains(segment_id)) {
return fmt::format(
"{}: txn_indexes_store_ containing Index {} Segment {} conflict with other_table_store->txn_indexes_store_",
__func__,
index_name,
segment_id);
}
}
}
}
}

const auto &delete_state = delete_state_;
const auto &other_delete_state = other_table_store->delete_state_;
if (delete_state.rows_.empty() || other_delete_state.rows_.empty()) {
return false;
return None;
}
for (const auto &[segment_id, block_map] : delete_state.rows_) {
auto other_iter = other_delete_state.rows_.find(segment_id);
Expand All @@ -391,12 +423,12 @@ bool TxnTableStore::CheckConflict(const TxnTableStore *other_table_store) const
break;
}
if (other_block_offsets[j] == block_offset) {
return true;
return fmt::format("Delete conflict: segment_id: {}, block_id: {}, block_offset: {}", segment_id, block_id, block_offset);
}
}
}
}
return false;
return None;
}

void TxnTableStore::PrepareCommit1(const Vector<WalSegmentInfo *> &segment_infos) const {
Expand Down Expand Up @@ -635,11 +667,11 @@ bool TxnStore::CheckConflict(Catalog *catalog) {
return false;
}

bool TxnStore::CheckConflict(const TxnStore &other_txn_store) {
Optional<String> TxnStore::CheckConflict(const TxnStore &other_txn_store) {
for (const auto &[table_name, table_store] : txn_tables_store_) {
for (const auto [table_entry, _] : other_txn_store.txn_tables_) {
if (table_name == *table_entry->GetTableName()) {
return true;
return fmt::format("txn_tables_store_ containing table_name {} conflict with other_txn_store.txn_tables_", table_name);
}
}

Expand All @@ -649,11 +681,13 @@ bool TxnStore::CheckConflict(const TxnStore &other_txn_store) {
}

const TxnTableStore *other_table_store = other_iter->second.get();
if (table_store->CheckConflict(other_table_store)) {
return true;
if (const auto conflict_reason = table_store->CheckConflict(other_table_store); conflict_reason) {
return fmt::format("txn_tables_store_ containing table_name {} conflict with other_txn_store.txn_tables_store_: {}",
table_name,
*conflict_reason);
}
}
return false;
return None;
}

void TxnStore::PrepareCommit1() {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/txn/txn_store.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public:

bool CheckConflict(Catalog *catalog, Txn *txn) const;

bool CheckConflict(const TxnTableStore *txn_table_store) const;
Optional<String> CheckConflict(const TxnTableStore *txn_table_store) const;

void PrepareCommit1(const Vector<WalSegmentInfo *> &segment_infos) const;

Expand Down Expand Up @@ -241,7 +241,7 @@ public:

bool CheckConflict(Catalog *catalog);

bool CheckConflict(const TxnStore &txn_store);
Optional<String> CheckConflict(const TxnStore &txn_store);

void PrepareCommit1();

Expand Down

0 comments on commit d48860a

Please sign in to comment.