Skip to content

Commit 97da405

Browse files
Fix: entry visibility while commiting.
1 parent 96370f1 commit 97da405

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+230
-177
lines changed

src/executor/operator/physical_index_scan.cpp

+9-10
Original file line numberDiff line numberDiff line change
@@ -452,14 +452,13 @@ struct FilterResult {
452452
}
453453

454454
template <typename ColumnValueType>
455-
inline void
456-
ExecuteSingleRangeT(const FilterIntervalRangeT<ColumnValueType> &interval_range, SegmentIndexEntry &index_entry, const TxnTimeStamp ts) {
455+
inline void ExecuteSingleRangeT(const FilterIntervalRangeT<ColumnValueType> &interval_range, SegmentIndexEntry &index_entry, Txn *txn) {
457456
Vector<UniquePtr<TrunkReader<ColumnValueType>>> trunk_readers;
458457
Tuple<Vector<SharedPtr<ChunkIndexEntry>>, SharedPtr<SecondaryIndexInMem>> chunks_snapshot = index_entry.GetSecondaryIndexSnapshot();
459458
const u32 segment_row_count = SegmentRowCount();
460459
auto &[chunk_index_entries, memory_secondary_index] = chunks_snapshot;
461460
for (const auto &chunk_index_entry : chunk_index_entries) {
462-
if (chunk_index_entry->CheckVisible(ts)) {
461+
if (chunk_index_entry->CheckVisible(txn)) {
463462
trunk_readers.emplace_back(MakeUnique<TrunkReaderT<ColumnValueType>>(segment_row_count, chunk_index_entry));
464463
}
465464
}
@@ -492,7 +491,7 @@ struct FilterResult {
492491
inline void ExecuteSingleRange(const HashMap<ColumnID, TableIndexEntry *> &column_index_map,
493492
const FilterExecuteSingleRange &single_range,
494493
SegmentID segment_id,
495-
const TxnTimeStamp ts) {
494+
Txn *txn) {
496495
// step 1. check if range is empty
497496
if (single_range.IsEmpty()) {
498497
return SetEmptyResult();
@@ -504,7 +503,7 @@ struct FilterResult {
504503
// step 3. search index
505504
auto &interval_range_variant = single_range.GetIntervalRange();
506505
std::visit(Overload{[&]<typename ColumnValueType>(const FilterIntervalRangeT<ColumnValueType> &interval_range) {
507-
ExecuteSingleRangeT(interval_range, index_entry, ts);
506+
ExecuteSingleRangeT(interval_range, index_entry, txn);
508507
},
509508
[](const std::monostate &empty) {
510509
UnrecoverableError("FilterResult::ExecuteSingleRange(): class member interval_range_ not initialized!");
@@ -598,7 +597,7 @@ FilterResult SolveSecondaryIndexFilterInner(const Vector<FilterExecuteElem> &fil
598597
const SegmentID segment_id,
599598
const u32 segment_row_count,
600599
const u32 segment_row_actual_count,
601-
const TxnTimeStamp ts) {
600+
Txn *txn) {
602601
Vector<FilterResult> result_stack;
603602
// execute filter_execute_command_ (Reverse Polish notation)
604603
for (auto const &elem : filter_execute_command) {
@@ -628,7 +627,7 @@ FilterResult SolveSecondaryIndexFilterInner(const Vector<FilterExecuteElem> &fil
628627
},
629628
[&](const FilterExecuteSingleRange &single_range) {
630629
result_stack.emplace_back(segment_row_count, segment_row_actual_count);
631-
result_stack.back().ExecuteSingleRange(column_index_map, single_range, segment_id, ts);
630+
result_stack.back().ExecuteSingleRange(column_index_map, single_range, segment_id, txn);
632631
}},
633632
elem);
634633
}
@@ -644,13 +643,13 @@ std::variant<Vector<u32>, Bitmask> SolveSecondaryIndexFilter(const Vector<Filter
644643
const SegmentID segment_id,
645644
const u32 segment_row_count,
646645
const u32 segment_row_actual_count,
647-
const TxnTimeStamp ts) {
646+
Txn *txn) {
648647
if (filter_execute_command.empty()) {
649648
// return all true
650649
return std::variant<Vector<u32>, Bitmask>(std::in_place_type<Bitmask>);
651650
}
652651
auto result =
653-
SolveSecondaryIndexFilterInner(filter_execute_command, column_index_map, segment_id, segment_row_count, segment_row_actual_count, ts);
652+
SolveSecondaryIndexFilterInner(filter_execute_command, column_index_map, segment_id, segment_row_count, segment_row_actual_count, txn);
654653
return std::move(result.selected_rows_);
655654
}
656655

@@ -709,7 +708,7 @@ void PhysicalIndexScan::ExecuteInternal(QueryContext *query_context, IndexScanOp
709708
DeleteFilter delete_filter(segment_entry, begin_ts, segment_entry->row_count(begin_ts));
710709
// output
711710
const auto result =
712-
SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count, begin_ts);
711+
SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count, txn);
713712
result.Output(output_data_blocks, segment_id, delete_filter);
714713

715714
LOG_TRACE(fmt::format("IndexScan: job number: {}, segment_ids.size(): {}, finished", next_idx, segment_ids.size()));

src/executor/operator/physical_index_scan.cppm

+3-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import bitmask;
3939

4040
namespace infinity {
4141

42+
class Txn;
43+
4244
// for int range filter, x > n is equivalent to x >= n + 1
4345
// for float range filter, x > f is equivalent to x >= std::nextafter(f, INFINITY)
4446
// we can use this to simplify the filter
@@ -110,6 +112,6 @@ export std::variant<Vector<u32>, Bitmask> SolveSecondaryIndexFilter(const Vector
110112
const SegmentID segment_id,
111113
const u32 segment_row_count,
112114
const u32 segment_row_actual_count,
113-
const TxnTimeStamp ts);
115+
Txn *txn);
114116

115117
} // namespace infinity

src/executor/operator/physical_knn_scan.cpp

+5-4
Original file line numberDiff line numberDiff line change
@@ -241,9 +241,10 @@ SizeT PhysicalKnnScan::BlockEntryCount() const { return base_table_ref_->block_i
241241

242242
template <typename DataType, template <typename, typename> typename C>
243243
void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperatorState *operator_state) {
244-
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
244+
Txn *txn = query_context->GetTxn();
245+
TxnTimeStamp begin_ts = txn->BeginTS();
245246

246-
if (!common_query_filter_->TryFinishBuild(begin_ts, query_context->GetTxn()->buffer_mgr())) {
247+
if (!common_query_filter_->TryFinishBuild(txn)) {
247248
// not ready, abort and wait for next time
248249
return;
249250
}
@@ -484,7 +485,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
484485
if (block_entry == nullptr) {
485486
UnrecoverableError(
486487
fmt::format("Cannot find segment id: {}, block id: {}, index chunk is {}", segment_id, block_id, chunk_id));
487-
}
488+
} // this is for debug
488489
}
489490
merge_heap->Search(0, d_ptr.get(), row_ids.get(), result_n);
490491
}
@@ -493,7 +494,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
493494
auto [chunk_index_entries, memory_index_entry] = segment_index_entry->GetHnswIndexSnapshot();
494495
int i = 0;
495496
for (auto &chunk_index_entry : chunk_index_entries) {
496-
if (chunk_index_entry->CheckVisible(begin_ts)) {
497+
if (chunk_index_entry->CheckVisible(txn)) {
497498
BufferHandle index_handle = chunk_index_entry->GetIndex();
498499
hnsw_search(index_handle, false, i++);
499500
}

src/executor/operator/physical_match.cpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ module;
2323
module physical_match;
2424

2525
import stl;
26-
26+
import txn;
2727
import query_context;
2828
import operator_state;
2929
import physical_operator;
@@ -556,9 +556,8 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
556556
auto execute_start_time = std::chrono::high_resolution_clock::now();
557557
// 1. build QueryNode tree
558558
// 1.1 populate column2analyzer
559-
TransactionID txn_id = query_context->GetTxn()->TxnID();
560-
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
561-
QueryBuilder query_builder(txn_id, begin_ts, base_table_ref_);
559+
Txn *txn = query_context->GetTxn();
560+
QueryBuilder query_builder(txn, base_table_ref_);
562561
auto finish_init_query_builder_time = std::chrono::high_resolution_clock::now();
563562
TimeDurationType query_builder_init_duration = finish_init_query_builder_time - execute_start_time;
564563
LOG_TRACE(fmt::format("PhysicalMatch Part 0.1: Init QueryBuilder time: {} ms", query_builder_init_duration.count()));
@@ -849,7 +848,8 @@ bool PhysicalMatch::Execute(QueryContext *query_context, OperatorState *operator
849848
auto start_time = std::chrono::high_resolution_clock::now();
850849
assert(common_query_filter_);
851850
{
852-
bool try_result = common_query_filter_->TryFinishBuild(query_context->GetTxn()->BeginTS(), query_context->GetTxn()->buffer_mgr());
851+
Txn *txn = query_context->GetTxn();
852+
bool try_result = common_query_filter_->TryFinishBuild(txn);
853853
auto finish_filter_time = std::chrono::high_resolution_clock::now();
854854
std::chrono::duration<float, std::milli> filter_duration = finish_filter_time - start_time;
855855
LOG_TRACE(fmt::format("PhysicalMatch Prepare: Filter time: {} ms", filter_duration.count()));

src/planner/bound/base_table_ref.cppm

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ export module base_table_ref;
2121
import stl;
2222
import table_ref;
2323
import table_entry;
24-
24+
import txn;
2525
import table_function;
2626
import block_index;
2727
import internal_types;
@@ -48,8 +48,8 @@ public:
4848
explicit BaseTableRef(TableEntry *table_entry, SharedPtr<BlockIndex> block_index)
4949
: TableRef(TableRefType::kTable, ""), table_entry_ptr_(table_entry), block_index_(std::move(block_index)) {}
5050

51-
static SharedPtr<BaseTableRef> FakeTableRef(TableEntry *table_entry, TxnTimeStamp ts) {
52-
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(ts);
51+
static SharedPtr<BaseTableRef> FakeTableRef(TableEntry *table_entry, Txn *txn) {
52+
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(txn);
5353
return MakeShared<BaseTableRef>(table_entry, std::move(block_index));
5454
}
5555

src/planner/query_binder.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ import data_type;
7171
import logical_type;
7272
import base_entry;
7373
import view_entry;
74+
import txn;
7475

7576
namespace infinity {
7677

@@ -425,10 +426,9 @@ SharedPtr<BaseTableRef> QueryBinder::BuildBaseTable(QueryContext *query_context,
425426
columns.emplace_back(idx);
426427
}
427428

428-
// TransactionID txn_id = query_context->GetTxn()->TxnID();
429-
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
429+
Txn *txn = query_context->GetTxn();
430430

431-
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(begin_ts);
431+
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(txn);
432432

433433
u64 table_index = bind_context_ptr_->GenerateTableIndex();
434434
auto table_ref = MakeShared<BaseTableRef>(table_entry, std::move(columns), block_index, alias, table_index, names_ptr, types_ptr);

src/storage/bg_task/compact_segments_task.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ void CompactSegmentsTask::CompactSegments(CompactSegmentsTaskState &state) {
148148
}
149149

150150
auto new_segment = CompactSegmentsToOne(state, to_compact_segments);
151-
block_index->Insert(new_segment.get(), UNCOMMIT_TS, false);
151+
block_index->Insert(new_segment.get(), txn_);
152152

153153
segment_data.emplace_back(new_segment, std::move(to_compact_segments));
154154
old_segments.insert(old_segments.end(), to_compact_segments.begin(), to_compact_segments.end());

src/storage/common/block_index.cpp

+14-10
Original file line numberDiff line numberDiff line change
@@ -14,32 +14,36 @@
1414

1515
module;
1616

17+
#include <vector>
18+
1719
module block_index;
1820

1921
import stl;
2022
import segment_entry;
2123
import global_block_id;
2224
import block_iter;
23-
import segment_iter;
25+
import txn;
2426

2527
namespace infinity {
2628

27-
void BlockIndex::Insert(SegmentEntry *segment_entry, TxnTimeStamp timestamp, bool check_ts) {
28-
if (!check_ts || segment_entry->CheckVisible(timestamp)) {
29+
void BlockIndex::Insert(SegmentEntry *segment_entry, Txn *txn) {
30+
if (segment_entry->CheckVisible(txn)) {
2931
u32 segment_id = segment_entry->segment_id();
3032
segments_.emplace_back(segment_entry);
3133
segment_index_.emplace(segment_id, segment_entry);
3234
BlocksInfo blocks_info;
3335

34-
auto block_entry_iter = BlockEntryIter(segment_entry);
35-
for (auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) {
36-
if (timestamp >= block_entry->min_row_ts()) {
37-
blocks_info.block_map_.emplace(block_entry->block_id(), block_entry);
38-
global_blocks_.emplace_back(GlobalBlockID{segment_id, block_entry->block_id()});
36+
{
37+
auto block_guard = segment_entry->GetBlocksGuard();
38+
for (const auto &block_entry : block_guard.block_entries_) {
39+
if (block_entry->CheckVisible(txn)) {
40+
blocks_info.block_map_.emplace(block_entry->block_id(), block_entry.get());
41+
global_blocks_.emplace_back(GlobalBlockID{segment_id, block_entry->block_id()});
42+
}
3943
}
4044
}
41-
blocks_info.segment_offset_ = segment_entry->row_count(timestamp);
42-
// blocks_info.segment_offset_ = segment_entry->row_count(); // use false row count to pass benchmark
45+
TxnTimeStamp begin_ts = txn->BeginTS();
46+
blocks_info.segment_offset_ = segment_entry->row_count(begin_ts);
4347

4448
segment_block_index_.emplace(segment_id, std::move(blocks_info));
4549
}

src/storage/common/block_index.cppm

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ namespace infinity {
2323

2424
struct BlockEntry;
2525
struct SegmentEntry;
26+
class Txn;
2627

2728
export struct BlockIndex {
2829
private:
@@ -32,7 +33,7 @@ private:
3233
};
3334

3435
public:
35-
void Insert(SegmentEntry *segment_entry, TxnTimeStamp timestamp, bool check_ts = true);
36+
void Insert(SegmentEntry *segment_entry, Txn *txn);
3637

3738
void Reserve(SizeT n);
3839

src/storage/invertedindex/column_index_reader.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,9 @@ void TableIndexReaderCache::UpdateKnownUpdateTs(TxnTimeStamp ts, std::shared_mut
132132
last_known_update_ts_ = std::max(last_known_update_ts_, ts);
133133
}
134134

135-
IndexReader TableIndexReaderCache::GetIndexReader(TransactionID txn_id, TxnTimeStamp begin_ts, TableEntry *self_table_entry_ptr) {
135+
IndexReader TableIndexReaderCache::GetIndexReader(Txn *txn, TableEntry *self_table_entry_ptr) {
136+
TxnTimeStamp begin_ts = txn->BeginTS();
137+
TransactionID txn_id = txn->TxnID();
136138
IndexReader result;
137139
result.session_pool_ = MakeShared<MemoryPool>();
138140
std::scoped_lock lock(mutex_);
@@ -176,7 +178,7 @@ IndexReader TableIndexReaderCache::GetIndexReader(TransactionID txn_id, TxnTimeS
176178
optionflag_t flag = index_full_text->flag_;
177179
String index_dir = *(table_index_entry->index_dir());
178180
Map<SegmentID, SharedPtr<SegmentIndexEntry>> index_by_segment =
179-
table_index_entry->GetIndexBySegmentSnapshot(self_table_entry_ptr, begin_ts);
181+
table_index_entry->GetIndexBySegmentSnapshot(self_table_entry_ptr, txn);
180182
column_index_reader->Open(flag, std::move(index_dir), std::move(index_by_segment));
181183
(*result.column_index_readers_)[column_id] = std::move(column_index_reader);
182184
}

src/storage/invertedindex/column_index_reader.cppm

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ export module column_index_reader;
3131
namespace infinity {
3232
struct TableEntry;
3333
class BlockMaxTermDocIterator;
34+
class Txn;
3435

3536
export class ColumnIndexReader {
3637
public:
@@ -75,7 +76,7 @@ export class TableIndexReaderCache {
7576
public:
7677
void UpdateKnownUpdateTs(TxnTimeStamp ts, std::shared_mutex &segment_update_ts_mutex, TxnTimeStamp &segment_update_ts);
7778

78-
IndexReader GetIndexReader(TransactionID txn_id, TxnTimeStamp begin_ts, TableEntry *table_entry_ptr);
79+
IndexReader GetIndexReader(Txn *txn, TableEntry *table_entry_ptr);
7980

8081
private:
8182
std::mutex mutex_;

src/storage/invertedindex/search/query_builder.cpp

+2-3
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,8 @@ import third_party;
3737

3838
namespace infinity {
3939

40-
QueryBuilder::QueryBuilder(TransactionID txn_id, TxnTimeStamp begin_ts, SharedPtr<BaseTableRef> &base_table_ref)
41-
: txn_id_(txn_id), begin_ts_(begin_ts), table_entry_(base_table_ref->table_entry_ptr_),
42-
index_reader_(table_entry_->GetFullTextIndexReader(txn_id_, begin_ts_)) {
40+
QueryBuilder::QueryBuilder(Txn *txn, SharedPtr<BaseTableRef> &base_table_ref)
41+
: table_entry_(base_table_ref->table_entry_ptr_), index_reader_(table_entry_->GetFullTextIndexReader(txn)) {
4342
u64 total_row_count = 0;
4443
for (SegmentEntry *segment_entry : base_table_ref->block_index_->segments_) {
4544
total_row_count += segment_entry->row_count();

src/storage/invertedindex/search/query_builder.cppm

+2-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import base_table_ref;
2727

2828
namespace infinity {
2929

30+
class Txn;
3031
struct QueryNode;
3132
export struct FullTextQueryContext {
3233
UniquePtr<QueryNode> query_tree_;
@@ -37,7 +38,7 @@ class EarlyTerminateIterator;
3738

3839
export class QueryBuilder {
3940
public:
40-
QueryBuilder(TransactionID txn_id, TxnTimeStamp begin_ts, SharedPtr<BaseTableRef> &base_table_ref);
41+
QueryBuilder(Txn *txn, SharedPtr<BaseTableRef> &base_table_ref);
4142

4243
~QueryBuilder();
4344

@@ -50,8 +51,6 @@ public:
5051
inline float Score(RowID doc_id) { return scorer_.Score(doc_id); }
5152

5253
private:
53-
TransactionID txn_id_{};
54-
TxnTimeStamp begin_ts_{};
5554
TableEntry *table_entry_{nullptr};
5655
IndexReader index_reader_;
5756
Scorer scorer_;

src/storage/meta/catalog.cpp

+10-6
Original file line numberDiff line numberDiff line change
@@ -208,15 +208,17 @@ Tuple<SharedPtr<TableEntry>, Status> Catalog::DropTableByName(const String &db_n
208208
return db_entry->DropTable(table_name, conflict_type, txn_id, begin_ts, txn_mgr);
209209
}
210210

211-
Status Catalog::GetTables(const String &db_name, Vector<TableDetail> &output_table_array, TransactionID txn_id, TxnTimeStamp begin_ts) {
211+
Status Catalog::GetTables(const String &db_name, Vector<TableDetail> &output_table_array, Txn *txn) {
212+
TransactionID txn_id = txn->TxnID();
213+
TxnTimeStamp begin_ts = txn->BeginTS();
212214
// Check the db entries
213215
auto [db_entry, status] = this->GetDatabase(db_name, txn_id, begin_ts);
214216
if (!status.ok()) {
215217
// Error
216218
LOG_ERROR(fmt::format("Database: {} is invalid.", db_name));
217219
return status;
218220
}
219-
return db_entry->GetTablesDetail(txn_id, begin_ts, output_table_array);
221+
return db_entry->GetTablesDetail(txn, output_table_array);
220222
}
221223

222224
Tuple<TableEntry *, Status> Catalog::GetTableByName(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts) {
@@ -231,16 +233,17 @@ Tuple<TableEntry *, Status> Catalog::GetTableByName(const String &db_name, const
231233
return db_entry->GetTableCollection(table_name, txn_id, begin_ts);
232234
}
233235

234-
Tuple<SharedPtr<TableInfo>, Status>
235-
Catalog::GetTableInfo(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts) {
236+
Tuple<SharedPtr<TableInfo>, Status> Catalog::GetTableInfo(const String &db_name, const String &table_name, Txn *txn) {
237+
TransactionID txn_id = txn->TxnID();
238+
TxnTimeStamp begin_ts = txn->BeginTS();
236239
auto [db_entry, status] = this->GetDatabase(db_name, txn_id, begin_ts);
237240
if (!status.ok()) {
238241
// Error
239242
LOG_ERROR(fmt::format("Database: {} is invalid.", db_name));
240243
return {nullptr, status};
241244
}
242245

243-
return db_entry->GetTableInfo(table_name, txn_id, begin_ts);
246+
return db_entry->GetTableInfo(table_name, txn);
244247
}
245248

246249
Status Catalog::RemoveTableEntry(TableEntry *table_entry, TransactionID txn_id) {
@@ -659,7 +662,8 @@ void Catalog::LoadFromEntryDelta(TxnTimeStamp max_commit_ts, BufferManager *buff
659662
commit_ts,
660663
check_point_ts,
661664
check_point_row_count,
662-
buffer_mgr);
665+
buffer_mgr,
666+
txn_id);
663667

664668
if (merge_flag == MergeFlag::kNew) {
665669
if (!block_filter_binary_data.empty()) {

0 commit comments

Comments
 (0)