Skip to content

Commit 8763556

Browse files
Fix: check entry visibility when a txn is committing but not committed.
1 parent 3fc933d commit 8763556

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+184
-123
lines changed

src/executor/operator/physical_knn_scan.cpp

+4-3
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,8 @@ SizeT PhysicalKnnScan::BlockEntryCount() const { return base_table_ref_->block_i
241241

242242
template <typename DataType, template <typename, typename> typename C>
243243
void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperatorState *operator_state) {
244-
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
244+
Txn *txn = query_context->GetTxn();
245+
TxnTimeStamp begin_ts = txn->BeginTS();
245246

246247
if (!common_query_filter_->TryFinishBuild(begin_ts, query_context->GetTxn()->buffer_mgr())) {
247248
// not ready, abort and wait for next time
@@ -484,7 +485,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
484485
if (block_entry == nullptr) {
485486
UnrecoverableError(
486487
fmt::format("Cannot find segment id: {}, block id: {}, index chunk is {}", segment_id, block_id, chunk_id));
487-
}
488+
} // this is for debug
488489
}
489490
merge_heap->Search(0, d_ptr.get(), row_ids.get(), result_n);
490491
}
@@ -493,7 +494,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
493494
auto [chunk_index_entries, memory_index_entry] = segment_index_entry->GetHnswIndexSnapshot();
494495
int i = 0;
495496
for (auto &chunk_index_entry : chunk_index_entries) {
496-
if (chunk_index_entry->CheckVisible(begin_ts)) {
497+
if (chunk_index_entry->CheckVisible(txn)) {
497498
BufferHandle index_handle = chunk_index_entry->GetIndex();
498499
hnsw_search(index_handle, false, i++);
499500
}

src/executor/operator/physical_match.cpp

+3-4
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ module;
2323
module physical_match;
2424

2525
import stl;
26-
26+
import txn;
2727
import query_context;
2828
import operator_state;
2929
import physical_operator;
@@ -517,9 +517,8 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
517517
auto execute_start_time = std::chrono::high_resolution_clock::now();
518518
// 1. build QueryNode tree
519519
// 1.1 populate column2analyzer
520-
TransactionID txn_id = query_context->GetTxn()->TxnID();
521-
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
522-
QueryBuilder query_builder(txn_id, begin_ts, base_table_ref_);
520+
Txn *txn = query_context->GetTxn();
521+
QueryBuilder query_builder(txn, base_table_ref_);
523522
auto finish_init_query_builder_time = std::chrono::high_resolution_clock::now();
524523
TimeDurationType query_builder_init_duration = finish_init_query_builder_time - execute_start_time;
525524
LOG_TRACE(fmt::format("PhysicalMatch Part 0.1: Init QueryBuilder time: {} ms", query_builder_init_duration.count()));

src/planner/bound/base_table_ref.cppm

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ export module base_table_ref;
2121
import stl;
2222
import table_ref;
2323
import table_entry;
24-
24+
import txn;
2525
import table_function;
2626
import block_index;
2727
import internal_types;
@@ -48,8 +48,8 @@ public:
4848
explicit BaseTableRef(TableEntry *table_entry, SharedPtr<BlockIndex> block_index)
4949
: TableRef(TableRefType::kTable, ""), table_entry_ptr_(table_entry), block_index_(std::move(block_index)) {}
5050

51-
static SharedPtr<BaseTableRef> FakeTableRef(TableEntry *table_entry, TxnTimeStamp ts) {
52-
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(ts);
51+
static SharedPtr<BaseTableRef> FakeTableRef(TableEntry *table_entry, Txn *txn) {
52+
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(txn);
5353
return MakeShared<BaseTableRef>(table_entry, std::move(block_index));
5454
}
5555

src/planner/query_binder.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ import data_type;
7171
import logical_type;
7272
import base_entry;
7373
import view_entry;
74+
import txn;
7475

7576
namespace infinity {
7677

@@ -425,10 +426,9 @@ SharedPtr<BaseTableRef> QueryBinder::BuildBaseTable(QueryContext *query_context,
425426
columns.emplace_back(idx);
426427
}
427428

428-
// TransactionID txn_id = query_context->GetTxn()->TxnID();
429-
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
429+
Txn *txn = query_context->GetTxn();
430430

431-
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(begin_ts);
431+
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(txn);
432432

433433
u64 table_index = bind_context_ptr_->GenerateTableIndex();
434434
auto table_ref = MakeShared<BaseTableRef>(table_entry, std::move(columns), block_index, alias, table_index, names_ptr, types_ptr);

src/storage/bg_task/compact_segments_task.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ void CompactSegmentsTask::CompactSegments(CompactSegmentsTaskState &state) {
148148
}
149149

150150
auto new_segment = CompactSegmentsToOne(state, to_compact_segments);
151-
block_index->Insert(new_segment.get(), UNCOMMIT_TS, false);
151+
block_index->Insert(new_segment.get(), txn_);
152152

153153
segment_data.emplace_back(new_segment, std::move(to_compact_segments));
154154
old_segments.insert(old_segments.end(), to_compact_segments.begin(), to_compact_segments.end());

src/storage/common/block_index.cpp

+6-5
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ import stl;
2222
import segment_entry;
2323
import global_block_id;
2424
import block_iter;
25-
import segment_iter;
25+
import txn;
2626

2727
namespace infinity {
2828

29-
void BlockIndex::Insert(SegmentEntry *segment_entry, TxnTimeStamp timestamp, bool check_ts) {
30-
if (!check_ts || segment_entry->CheckVisible(timestamp)) {
29+
void BlockIndex::Insert(SegmentEntry *segment_entry, Txn *txn) {
30+
if (segment_entry->CheckVisible(txn)) {
3131
u32 segment_id = segment_entry->segment_id();
3232
segments_.emplace_back(segment_entry);
3333
segment_index_.emplace(segment_id, segment_entry);
@@ -36,13 +36,14 @@ void BlockIndex::Insert(SegmentEntry *segment_entry, TxnTimeStamp timestamp, boo
3636
{
3737
auto block_guard = segment_entry->GetBlocksGuard();
3838
for (const auto &block_entry : block_guard.block_entries_) {
39-
if (timestamp >= block_entry->commit_ts_) {
39+
if (block_entry->CheckVisible(txn)) {
4040
blocks_info.block_map_.emplace(block_entry->block_id(), block_entry.get());
4141
global_blocks_.emplace_back(GlobalBlockID{segment_id, block_entry->block_id()});
4242
}
4343
}
4444
}
45-
blocks_info.segment_offset_ = segment_entry->row_count(timestamp);
45+
TxnTimeStamp begin_ts = txn->BeginTS();
46+
blocks_info.segment_offset_ = segment_entry->row_count(begin_ts);
4647

4748
segment_block_index_.emplace(segment_id, std::move(blocks_info));
4849
}

src/storage/common/block_index.cppm

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ namespace infinity {
2323

2424
struct BlockEntry;
2525
struct SegmentEntry;
26+
class Txn;
2627

2728
export struct BlockIndex {
2829
private:
@@ -32,7 +33,7 @@ private:
3233
};
3334

3435
public:
35-
void Insert(SegmentEntry *segment_entry, TxnTimeStamp timestamp, bool check_ts = true);
36+
void Insert(SegmentEntry *segment_entry, Txn *txn);
3637

3738
void Reserve(SizeT n);
3839

src/storage/invertedindex/column_index_reader.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,9 @@ void TableIndexReaderCache::UpdateKnownUpdateTs(TxnTimeStamp ts, std::shared_mut
132132
last_known_update_ts_ = std::max(last_known_update_ts_, ts);
133133
}
134134

135-
IndexReader TableIndexReaderCache::GetIndexReader(TransactionID txn_id, TxnTimeStamp begin_ts, TableEntry *self_table_entry_ptr) {
135+
IndexReader TableIndexReaderCache::GetIndexReader(Txn *txn, TableEntry *self_table_entry_ptr) {
136+
TxnTimeStamp begin_ts = txn->BeginTS();
137+
TransactionID txn_id = txn->TxnID();
136138
IndexReader result;
137139
result.session_pool_ = MakeShared<MemoryPool>();
138140
std::scoped_lock lock(mutex_);
@@ -176,7 +178,7 @@ IndexReader TableIndexReaderCache::GetIndexReader(TransactionID txn_id, TxnTimeS
176178
optionflag_t flag = index_full_text->flag_;
177179
String index_dir = *(table_index_entry->index_dir());
178180
Map<SegmentID, SharedPtr<SegmentIndexEntry>> index_by_segment =
179-
table_index_entry->GetIndexBySegmentSnapshot(self_table_entry_ptr, begin_ts);
181+
table_index_entry->GetIndexBySegmentSnapshot(self_table_entry_ptr, txn);
180182
column_index_reader->Open(flag, std::move(index_dir), std::move(index_by_segment));
181183
(*result.column_index_readers_)[column_id] = std::move(column_index_reader);
182184
}

src/storage/invertedindex/column_index_reader.cppm

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ export module column_index_reader;
3131
namespace infinity {
3232
struct TableEntry;
3333
class BlockMaxTermDocIterator;
34+
class Txn;
3435

3536
export class ColumnIndexReader {
3637
public:
@@ -75,7 +76,7 @@ export class TableIndexReaderCache {
7576
public:
7677
void UpdateKnownUpdateTs(TxnTimeStamp ts, std::shared_mutex &segment_update_ts_mutex, TxnTimeStamp &segment_update_ts);
7778

78-
IndexReader GetIndexReader(TransactionID txn_id, TxnTimeStamp begin_ts, TableEntry *table_entry_ptr);
79+
IndexReader GetIndexReader(Txn *txn, TableEntry *table_entry_ptr);
7980

8081
private:
8182
std::mutex mutex_;

src/storage/invertedindex/search/query_builder.cpp

+2-3
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,8 @@ import third_party;
3737

3838
namespace infinity {
3939

40-
QueryBuilder::QueryBuilder(TransactionID txn_id, TxnTimeStamp begin_ts, SharedPtr<BaseTableRef> &base_table_ref)
41-
: txn_id_(txn_id), begin_ts_(begin_ts), table_entry_(base_table_ref->table_entry_ptr_),
42-
index_reader_(table_entry_->GetFullTextIndexReader(txn_id_, begin_ts_)) {
40+
QueryBuilder::QueryBuilder(Txn *txn, SharedPtr<BaseTableRef> &base_table_ref)
41+
: table_entry_(base_table_ref->table_entry_ptr_), index_reader_(table_entry_->GetFullTextIndexReader(txn)) {
4342
u64 total_row_count = 0;
4443
for (SegmentEntry *segment_entry : base_table_ref->block_index_->segments_) {
4544
total_row_count += segment_entry->row_count();

src/storage/invertedindex/search/query_builder.cppm

+2-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import base_table_ref;
2727

2828
namespace infinity {
2929

30+
class Txn;
3031
struct QueryNode;
3132
export struct FullTextQueryContext {
3233
UniquePtr<QueryNode> query_tree_;
@@ -37,7 +38,7 @@ class EarlyTerminateIterator;
3738

3839
export class QueryBuilder {
3940
public:
40-
QueryBuilder(TransactionID txn_id, TxnTimeStamp begin_ts, SharedPtr<BaseTableRef> &base_table_ref);
41+
QueryBuilder(Txn *txn, SharedPtr<BaseTableRef> &base_table_ref);
4142

4243
~QueryBuilder();
4344

@@ -50,8 +51,6 @@ public:
5051
inline float Score(RowID doc_id) { return scorer_.Score(doc_id); }
5152

5253
private:
53-
TransactionID txn_id_{};
54-
TxnTimeStamp begin_ts_{};
5554
TableEntry *table_entry_{nullptr};
5655
IndexReader index_reader_;
5756
Scorer scorer_;

src/storage/meta/catalog.cpp

+10-6
Original file line numberDiff line numberDiff line change
@@ -208,15 +208,17 @@ Tuple<SharedPtr<TableEntry>, Status> Catalog::DropTableByName(const String &db_n
208208
return db_entry->DropTable(table_name, conflict_type, txn_id, begin_ts, txn_mgr);
209209
}
210210

211-
Status Catalog::GetTables(const String &db_name, Vector<TableDetail> &output_table_array, TransactionID txn_id, TxnTimeStamp begin_ts) {
211+
Status Catalog::GetTables(const String &db_name, Vector<TableDetail> &output_table_array, Txn *txn) {
212+
TransactionID txn_id = txn->TxnID();
213+
TxnTimeStamp begin_ts = txn->BeginTS();
212214
// Check the db entries
213215
auto [db_entry, status] = this->GetDatabase(db_name, txn_id, begin_ts);
214216
if (!status.ok()) {
215217
// Error
216218
LOG_ERROR(fmt::format("Database: {} is invalid.", db_name));
217219
return status;
218220
}
219-
return db_entry->GetTablesDetail(txn_id, begin_ts, output_table_array);
221+
return db_entry->GetTablesDetail(txn, output_table_array);
220222
}
221223

222224
Tuple<TableEntry *, Status> Catalog::GetTableByName(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts) {
@@ -231,16 +233,17 @@ Tuple<TableEntry *, Status> Catalog::GetTableByName(const String &db_name, const
231233
return db_entry->GetTableCollection(table_name, txn_id, begin_ts);
232234
}
233235

234-
Tuple<SharedPtr<TableInfo>, Status>
235-
Catalog::GetTableInfo(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts) {
236+
Tuple<SharedPtr<TableInfo>, Status> Catalog::GetTableInfo(const String &db_name, const String &table_name, Txn *txn) {
237+
TransactionID txn_id = txn->TxnID();
238+
TxnTimeStamp begin_ts = txn->BeginTS();
236239
auto [db_entry, status] = this->GetDatabase(db_name, txn_id, begin_ts);
237240
if (!status.ok()) {
238241
// Error
239242
LOG_ERROR(fmt::format("Database: {} is invalid.", db_name));
240243
return {nullptr, status};
241244
}
242245

243-
return db_entry->GetTableInfo(table_name, txn_id, begin_ts);
246+
return db_entry->GetTableInfo(table_name, txn);
244247
}
245248

246249
Status Catalog::RemoveTableEntry(TableEntry *table_entry, TransactionID txn_id) {
@@ -659,7 +662,8 @@ void Catalog::LoadFromEntryDelta(TxnTimeStamp max_commit_ts, BufferManager *buff
659662
commit_ts,
660663
check_point_ts,
661664
check_point_row_count,
662-
buffer_mgr);
665+
buffer_mgr,
666+
txn_id);
663667

664668
if (merge_flag == MergeFlag::kNew) {
665669
if (!block_filter_binary_data.empty()) {

src/storage/meta/catalog.cppm

+2-2
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,11 @@ public:
159159
TxnTimeStamp begin_ts,
160160
TxnManager *txn_mgr);
161161

162-
Status GetTables(const String &db_name, Vector<TableDetail> &output_table_array, TransactionID txn_id, TxnTimeStamp begin_ts);
162+
Status GetTables(const String &db_name, Vector<TableDetail> &output_table_array, Txn *txn);
163163

164164
Tuple<TableEntry *, Status> GetTableByName(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts);
165165

166-
Tuple<SharedPtr<TableInfo>, Status> GetTableInfo(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts);
166+
Tuple<SharedPtr<TableInfo>, Status> GetTableInfo(const String &db_name, const String &table_name, Txn *txn);
167167

168168
static Status RemoveTableEntry(TableEntry *table_entry, TransactionID txn_id);
169169

src/storage/meta/entry/base_entry.cpp

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
module;
16+
17+
module base_entry;
18+
19+
import stl;
20+
import txn_manager;
21+
import txn_state;
22+
import txn;
23+
24+
namespace infinity {
25+
26+
/// Decide whether this entry is visible to the reading transaction `txn`.
///
/// The entry is reported visible when any of the following holds:
///   1. its commit timestamp is at or before the reader's begin timestamp;
///   2. it was written by `txn` itself (same transaction id);
///   3. the transaction manager's CheckIfCommitting() returns true for the
///      writer transaction id and the reader's begin timestamp.
///      NOTE(review): presumably this covers a writer that is committing but
///      not yet committed at `begin_ts` -- confirm against TxnManager.
///
/// @param txn  Reading transaction. Dereferenced unconditionally, so it must
///             not be null.
/// @return true if `txn` may read this entry, false otherwise. When the
///         transaction has no manager (the replay path) and neither rule 1
///         nor rule 2 applies, the entry is treated as not visible.
bool BaseEntry::CheckVisible(Txn *txn) const {
    const TxnTimeStamp begin_ts = txn->BeginTS();
    TxnManager *txn_mgr = txn->txn_mgr();
    // Read the atomic writer id once so that every check below works on the
    // same value even if another thread updates txn_id_ concurrently.
    const TransactionID txn_id = txn_id_.load();

    // Snapshot rule: an entry is visible once it has been committed at or
    // before the reader's begin timestamp. The comparison must be `>=`;
    // `begin_ts < commit_ts_` would expose entries committed after the
    // snapshot and hide the ones committed before it.
    if (begin_ts >= commit_ts_ || txn_id == txn->TxnID()) {
        return true;
    }
    if (txn_mgr == nullptr) { // when replay
        return false;
    }
    return txn_mgr->CheckIfCommitting(txn_id, begin_ts);
}
38+
39+
} // namespace infinity

src/storage/meta/entry/base_entry.cppm

+4
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@ export module base_entry;
2020

2121
import stl;
2222
import default_values;
23+
import txn;
2324

2425
namespace infinity {
2526

2627
class Catalog;
28+
class TxnManager;
2729

2830
export enum class EntryType : i8 {
2931
kDatabase,
@@ -64,6 +66,8 @@ public:
6466

6567
SharedPtr<String> encode_ptr() const { return encode_; }
6668

69+
virtual bool CheckVisible(Txn *txn) const;
70+
6771
public:
6872
atomic_u64 txn_id_{0};
6973
TxnTimeStamp begin_ts_{0};

src/storage/meta/entry/block_entry.cpp

+8-4
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,13 @@ UniquePtr<BlockEntry> BlockEntry::NewReplayBlockEntry(const SegmentEntry *segmen
8888
TxnTimeStamp commit_ts,
8989
TxnTimeStamp check_point_ts,
9090
u16 checkpoint_row_count,
91-
BufferManager *buffer_mgr) {
91+
BufferManager *buffer_mgr,
92+
TransactionID txn_id) {
9293

9394
auto block_entry = MakeUnique<BlockEntry>(segment_entry, block_id, 0);
9495

96+
block_entry->txn_id_ = txn_id;
97+
9598
block_entry->row_count_ = row_count;
9699
block_entry->min_row_ts_ = min_row_ts;
97100
block_entry->max_row_ts_ = max_row_ts;
@@ -295,7 +298,6 @@ void BlockEntry::CommitBlock(TransactionID txn_id, TxnTimeStamp commit_ts) {
295298
}
296299
max_row_ts_ = commit_ts;
297300
if (!this->Committed()) {
298-
txn_id_ = txn_id;
299301
this->Commit(commit_ts);
300302
for (auto &column : columns_) {
301303
column->CommitColumn(txn_id, commit_ts);
@@ -405,6 +407,8 @@ UniquePtr<BlockEntry> BlockEntry::Deserialize(const nlohmann::json &block_entry_
405407
auto commit_ts = block_entry_json["commit_ts"];
406408
auto checkpoint_ts = block_entry_json["checkpoint_ts"];
407409

410+
auto txn_id = block_entry_json["txn_id"];
411+
408412
UniquePtr<BlockEntry> block_entry = BlockEntry::NewReplayBlockEntry(segment_entry,
409413
block_id,
410414
row_count,
@@ -414,10 +418,10 @@ UniquePtr<BlockEntry> BlockEntry::Deserialize(const nlohmann::json &block_entry_
414418
commit_ts,
415419
checkpoint_ts,
416420
row_count,
417-
buffer_mgr);
421+
buffer_mgr,
422+
txn_id);
418423

419424
block_entry->begin_ts_ = block_entry_json["begin_ts"];
420-
block_entry->txn_id_ = block_entry_json["txn_id"];
421425

422426
for (const auto &block_column_json : block_entry_json["columns"]) {
423427
block_entry->columns_.emplace_back(BlockColumnEntry::Deserialize(block_column_json, block_entry.get(), buffer_mgr));

0 commit comments

Comments
 (0)