Skip to content

Commit 3510cfb

Browse files
authored
Fix: error txn_store pointer when replay wal command. (infiniflow#804)
### What problem does this PR solve? The replay txn will set another `txn_store` when replay wal command, which will make block report error: ``` terminate called after throwing an instance of 'infinity::UnrecoverableException@infinity_exception' what(): Multiple transactions are changing data of Segment: 0, Block: 0@src/storage/meta/entry/block_entry.cpp:165 Signal: SIGABRT (signal SIGABRT) ``` It is because the `txn` before is not cleared. Issue link:infiniflow#802 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
1 parent 94e0b89 commit 3510cfb

File tree

8 files changed

+17
-19
lines changed

8 files changed

+17
-19
lines changed

src/executor/operator/physical_import.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ void PhysicalImport::ImportCSV(QueryContext *query_context, ImportOperatorState
198198
UniquePtr<ZxvParserCtx> parser_context = nullptr;
199199
Txn *txn = query_context->GetTxn();
200200
{
201-
auto *buffer_mgr = txn->GetBufferMgr();
201+
auto *buffer_mgr = txn->buffer_manager();
202202
u64 segment_id = Catalog::GetNextSegmentID(table_entry_);
203203
SharedPtr<SegmentEntry> segment_entry = SegmentEntry::NewImportSegmentEntry(table_entry_, segment_id, txn);
204204
UniquePtr<BlockEntry> block_entry = BlockEntry::NewBlockEntry(segment_entry.get(), 0, 0, table_entry_->ColumnCount(), txn);
@@ -278,7 +278,7 @@ void PhysicalImport::ImportJSONL(QueryContext *query_context, ImportOperatorStat
278278
Vector<ColumnVector> column_vectors;
279279
for (SizeT i = 0; i < table_entry_->ColumnCount(); ++i) {
280280
auto *block_column_entry = block_entry->GetColumnBlockEntry(i);
281-
column_vectors.emplace_back(block_column_entry->GetColumnVector(txn->GetBufferMgr()));
281+
column_vectors.emplace_back(block_column_entry->GetColumnVector(txn->buffer_manager()));
282282
}
283283
while (true) {
284284
SizeT end_pos = jsonl_str.find('\n', start_pos);
@@ -312,7 +312,7 @@ void PhysicalImport::ImportJSONL(QueryContext *query_context, ImportOperatorStat
312312
column_vectors.clear();
313313
for (SizeT i = 0; i < table_entry_->ColumnCount(); ++i) {
314314
auto *block_column_entry = block_entry->GetColumnBlockEntry(i);
315-
column_vectors.emplace_back(block_column_entry->GetColumnVector(txn->GetBufferMgr()));
315+
column_vectors.emplace_back(block_column_entry->GetColumnVector(txn->buffer_manager()));
316316
}
317317
}
318318
}
@@ -361,7 +361,7 @@ void PhysicalImport::CSVRowHandler(void *context) {
361361
SizeT column_count = parser_context->parser_.CellCount();
362362

363363
auto *txn = parser_context->txn_;
364-
auto *buffer_mgr = txn->GetBufferMgr();
364+
auto *buffer_mgr = txn->buffer_manager();
365365

366366
auto segment_entry = parser_context->segment_entry_;
367367
UniquePtr<BlockEntry> block_entry = std::move(parser_context->block_entry_);

src/storage/meta/entry/block_entry.cpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,11 @@ u16 BlockEntry::AppendData(TransactionID txn_id,
162162
BufferManager *buffer_mgr) {
163163
std::unique_lock<std::shared_mutex> lck(this->rw_locker_);
164164
if (this->using_txn_id_ != 0 && this->using_txn_id_ != txn_id) {
165-
UnrecoverableError(
166-
fmt::format("Multiple transactions are changing data of Segment: {}, Block: {}", this->segment_entry_->segment_id(), this->block_id_));
165+
UnrecoverableError(fmt::format("Multiple transactions are changing data of Segment: {}, Block: {}, using_txn_id_: {}, txn_id: {}",
166+
this->segment_entry_->segment_id(),
167+
this->block_id_,
168+
this->using_txn_id_,
169+
txn_id));
167170
}
168171

169172
this->using_txn_id_ = txn_id;

src/storage/meta/entry/segment_index_entry.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ SegmentIndexEntry::SegmentIndexEntry(TableIndexEntry *table_index_entry, Segment
6262

6363
SharedPtr<SegmentIndexEntry>
6464
SegmentIndexEntry::NewIndexEntry(TableIndexEntry *table_index_entry, SegmentID segment_id, Txn *txn, CreateIndexParam *param) {
65-
auto *buffer_mgr = txn->GetBufferMgr();
65+
auto *buffer_mgr = txn->buffer_manager();
6666

6767
// FIXME: estimate index size.
6868
auto vector_file_worker = table_index_entry->CreateFileWorker(param, segment_id);
@@ -138,7 +138,7 @@ Status SegmentIndexEntry::CreateIndexPrepare(const IndexBase *index_base,
138138
bool check_ts) {
139139
TxnTimeStamp begin_ts = txn->BeginTS();
140140

141-
auto *buffer_mgr = txn->GetBufferMgr();
141+
auto *buffer_mgr = txn->buffer_manager();
142142
switch (index_base->index_type_) {
143143
case IndexType::kIVFFlat: {
144144
if (column_def->type()->type() != LogicalType::kEmbedding) {

src/storage/meta/entry/table_index_entry.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ Tuple<FulltextIndexEntry *, Vector<SegmentIndexEntry *>, Status>
245245
TableIndexEntry::CreateIndexPrepare(TableEntry *table_entry, BlockIndex *block_index, Txn *txn, bool prepare, bool is_replay, bool check_ts) {
246246
FulltextIndexEntry *fulltext_index_entry = this->fulltext_index_entry_.get();
247247
if (fulltext_index_entry != nullptr && !IsFulltextIndexHomebrewed()) {
248-
auto *buffer_mgr = txn->GetBufferMgr();
248+
auto *buffer_mgr = txn->buffer_manager();
249249
for (const auto *segment_entry : block_index->segments_) {
250250
fulltext_index_entry->irs_index_->BatchInsert(table_entry, index_base_.get(), segment_entry, buffer_mgr);
251251
}

src/storage/txn/txn.cpp

-2
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,6 @@ void Txn::CheckTxn(const String &db_name) {
171171
}
172172
}
173173

174-
BufferManager *Txn::GetBufferMgr() const { return this->txn_mgr_->GetBufferMgr(); }
175-
176174
// Database OPs
177175
Status Txn::CreateDatabase(const String &db_name, ConflictType conflict_type) {
178176
this->CheckTxnStatus();

src/storage/txn/txn.cppm

-2
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,6 @@ public:
146146
Compact(TableEntry *table_entry, Vector<Pair<SharedPtr<SegmentEntry>, Vector<SegmentEntry *>>> &&segment_data, CompactSegmentsTaskType type);
147147

148148
// Getter
149-
BufferManager *GetBufferMgr() const;
150-
151149
BufferManager *buffer_manager() const { return buffer_mgr_; }
152150

153151
Catalog *GetCatalog() { return catalog_; }

src/storage/txn/txn_store.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ void TxnTableStore::PrepareCommit() {
206206

207207
// Start to append
208208
LOG_TRACE(fmt::format("Transaction local storage table: {}, Start to prepare commit", *table_entry_->GetTableName()));
209-
Catalog::Append(table_entry_, txn_->TxnID(), this, txn_->CommitTS(), txn_->GetBufferMgr());
209+
Catalog::Append(table_entry_, txn_->TxnID(), this, txn_->CommitTS(), txn_->buffer_manager());
210210

211211
// Attention: "compact" needs to be ahead of "delete"
212212
if (compact_state_.task_type_ != CompactSegmentsTaskType::kInvalid) {

src/storage/wal/wal_manager.cpp

+4-5
Original file line numberDiff line numberDiff line change
@@ -742,10 +742,10 @@ void WalManager::WalCmdDeleteReplay(const WalCmdDelete &cmd, TransactionID txn_i
742742
}
743743

744744
auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id);
745-
auto table_store = MakeShared<TxnTableStore>(table_entry, fake_txn.get());
745+
auto table_store = fake_txn->GetTxnTableStore(table_entry);
746746
table_store->Delete(cmd.row_ids_);
747747
fake_txn->FakeCommit(commit_ts);
748-
Catalog::Delete(table_store->table_entry_, fake_txn->TxnID(), (void *)table_store.get(), fake_txn->CommitTS(), table_store->delete_state_);
748+
Catalog::Delete(table_store->table_entry_, fake_txn->TxnID(), (void *)table_store, fake_txn->CommitTS(), table_store->delete_state_);
749749
Catalog::CommitWrite(table_store->table_entry_, fake_txn->TxnID(), commit_ts, table_store->txn_segments());
750750
}
751751

@@ -777,15 +777,14 @@ void WalManager::WalCmdAppendReplay(const WalCmdAppend &cmd, TransactionID txn_i
777777
}
778778

779779
auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id);
780-
781-
auto table_store = MakeShared<TxnTableStore>(table_entry, fake_txn.get());
780+
auto table_store = fake_txn->GetTxnTableStore(table_entry);
782781
table_store->Append(cmd.block_);
783782

784783
auto append_state = MakeUnique<AppendState>(table_store->blocks_);
785784
table_store->append_state_ = std::move(append_state);
786785

787786
fake_txn->FakeCommit(commit_ts);
788-
Catalog::Append(table_store->table_entry_, fake_txn->TxnID(), table_store.get(), commit_ts, storage_->buffer_manager());
787+
Catalog::Append(table_store->table_entry_, fake_txn->TxnID(), table_store, commit_ts, storage_->buffer_manager());
789788
Catalog::CommitWrite(table_store->table_entry_, fake_txn->TxnID(), commit_ts, table_store->txn_segments());
790789
}
791790

0 commit comments

Comments
 (0)