diff --git a/src/executor/operator/physical_index_scan.cpp b/src/executor/operator/physical_index_scan.cpp index dabab0e1af..0e47700806 100644 --- a/src/executor/operator/physical_index_scan.cpp +++ b/src/executor/operator/physical_index_scan.cpp @@ -452,14 +452,13 @@ struct FilterResult { } template - inline void - ExecuteSingleRangeT(const FilterIntervalRangeT &interval_range, SegmentIndexEntry &index_entry, const TxnTimeStamp ts) { + inline void ExecuteSingleRangeT(const FilterIntervalRangeT &interval_range, SegmentIndexEntry &index_entry, Txn *txn) { Vector>> trunk_readers; Tuple>, SharedPtr> chunks_snapshot = index_entry.GetSecondaryIndexSnapshot(); const u32 segment_row_count = SegmentRowCount(); auto &[chunk_index_entries, memory_secondary_index] = chunks_snapshot; for (const auto &chunk_index_entry : chunk_index_entries) { - if (chunk_index_entry->CheckVisible(ts)) { + if (chunk_index_entry->CheckVisible(txn)) { trunk_readers.emplace_back(MakeUnique>(segment_row_count, chunk_index_entry)); } } @@ -492,7 +491,7 @@ struct FilterResult { inline void ExecuteSingleRange(const HashMap &column_index_map, const FilterExecuteSingleRange &single_range, SegmentID segment_id, - const TxnTimeStamp ts) { + Txn *txn) { // step 1. check if range is empty if (single_range.IsEmpty()) { return SetEmptyResult(); @@ -504,7 +503,7 @@ struct FilterResult { // step 3. search index auto &interval_range_variant = single_range.GetIntervalRange(); std::visit(Overload{[&](const FilterIntervalRangeT &interval_range) { - ExecuteSingleRangeT(interval_range, index_entry, ts); + ExecuteSingleRangeT(interval_range, index_entry, txn); }, [](const std::monostate &empty) { UnrecoverableError("FilterResult::ExecuteSingleRange(): class member interval_range_ not initialized!"); @@ -598,7 +597,7 @@ FilterResult SolveSecondaryIndexFilterInner(const Vector &fil const SegmentID segment_id, const u32 segment_row_count, const u32 segment_row_actual_count, - const TxnTimeStamp ts) { + Txn *txn) { Vector result_stack; // execute filter_execute_command_ (Reverse Polish notation) for (auto const &elem : filter_execute_command) { @@ -628,7 +627,7 @@ FilterResult SolveSecondaryIndexFilterInner(const Vector &fil }, [&](const FilterExecuteSingleRange &single_range) { result_stack.emplace_back(segment_row_count, segment_row_actual_count); - result_stack.back().ExecuteSingleRange(column_index_map, single_range, segment_id, ts); + result_stack.back().ExecuteSingleRange(column_index_map, single_range, segment_id, txn); }}, elem); } @@ -644,13 +643,13 @@ std::variant, Bitmask> SolveSecondaryIndexFilter(const Vector, Bitmask>(std::in_place_type); } auto result = - SolveSecondaryIndexFilterInner(filter_execute_command, column_index_map, segment_id, segment_row_count, segment_row_actual_count, ts); + SolveSecondaryIndexFilterInner(filter_execute_command, column_index_map, segment_id, segment_row_count, segment_row_actual_count, txn); return std::move(result.selected_rows_); } @@ -709,7 +708,7 @@ void PhysicalIndexScan::ExecuteInternal(QueryContext *query_context, IndexScanOp DeleteFilter delete_filter(segment_entry, begin_ts, segment_entry->row_count(begin_ts)); // output const auto result = - SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count, begin_ts); + SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count, txn); result.Output(output_data_blocks, segment_id, delete_filter); LOG_TRACE(fmt::format("IndexScan: job number: {}, segment_ids.size(): {}, finished", next_idx, segment_ids.size())); diff --git a/src/executor/operator/physical_index_scan.cppm b/src/executor/operator/physical_index_scan.cppm index d96c80a702..a79ca4b827 100644 --- a/src/executor/operator/physical_index_scan.cppm +++ b/src/executor/operator/physical_index_scan.cppm @@ -39,6 +39,8 @@ import bitmask; namespace infinity { +class Txn; + // for int range filter, x > n is equivalent to x >= n + 1 // for float range filter, x > f is equivalent to x >= std::nextafter(f, INFINITY) // we can use this to simplify the filter @@ -110,6 +112,6 @@ export std::variant, Bitmask> SolveSecondaryIndexFilter(const Vector const SegmentID segment_id, const u32 segment_row_count, const u32 segment_row_actual_count, - const TxnTimeStamp ts); + Txn *txn); } // namespace infinity diff --git a/src/executor/operator/physical_knn_scan.cpp b/src/executor/operator/physical_knn_scan.cpp index d39fca356b..39b52f1144 100644 --- a/src/executor/operator/physical_knn_scan.cpp +++ b/src/executor/operator/physical_knn_scan.cpp @@ -241,9 +241,10 @@ SizeT PhysicalKnnScan::BlockEntryCount() const { return base_table_ref_->block_i template typename C> void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperatorState *operator_state) { - TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS(); + Txn *txn = query_context->GetTxn(); + TxnTimeStamp begin_ts = txn->BeginTS(); - if (!common_query_filter_->TryFinishBuild(begin_ts, query_context->GetTxn()->buffer_mgr())) { + if (!common_query_filter_->TryFinishBuild(txn)) { // not ready, abort and wait for next time return; } @@ -484,7 +485,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat if (block_entry == nullptr) { UnrecoverableError( fmt::format("Cannot find segment id: {}, block id: {}, index chunk is {}", segment_id, block_id, chunk_id)); - } + } // this is for debug } merge_heap->Search(0, d_ptr.get(), row_ids.get(), result_n); } @@ -493,7 +494,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat auto [chunk_index_entries, memory_index_entry] = segment_index_entry->GetHnswIndexSnapshot(); int i = 0; for (auto &chunk_index_entry : chunk_index_entries) { - if (chunk_index_entry->CheckVisible(begin_ts)) { + if (chunk_index_entry->CheckVisible(txn)) { BufferHandle index_handle = chunk_index_entry->GetIndex(); hnsw_search(index_handle, false, i++); } diff --git a/src/executor/operator/physical_match.cpp b/src/executor/operator/physical_match.cpp index 99ac922774..e891589d9d 100644 --- a/src/executor/operator/physical_match.cpp +++ b/src/executor/operator/physical_match.cpp @@ -23,7 +23,7 @@ module; module physical_match; import stl; - +import txn; import query_context; import operator_state; import physical_operator; @@ -517,9 +517,8 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator auto execute_start_time = std::chrono::high_resolution_clock::now(); // 1. build QueryNode tree // 1.1 populate column2analyzer - TransactionID txn_id = query_context->GetTxn()->TxnID(); - TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS(); - QueryBuilder query_builder(txn_id, begin_ts, base_table_ref_); + Txn *txn = query_context->GetTxn(); + QueryBuilder query_builder(txn, base_table_ref_); auto finish_init_query_builder_time = std::chrono::high_resolution_clock::now(); TimeDurationType query_builder_init_duration = finish_init_query_builder_time - execute_start_time; LOG_TRACE(fmt::format("PhysicalMatch Part 0.1: Init QueryBuilder time: {} ms", query_builder_init_duration.count())); @@ -805,7 +804,8 @@ bool PhysicalMatch::Execute(QueryContext *query_context, OperatorState *operator auto start_time = std::chrono::high_resolution_clock::now(); assert(common_query_filter_); { - bool try_result = common_query_filter_->TryFinishBuild(query_context->GetTxn()->BeginTS(), query_context->GetTxn()->buffer_mgr()); + Txn *txn = query_context->GetTxn(); + bool try_result = common_query_filter_->TryFinishBuild(txn); auto finish_filter_time = std::chrono::high_resolution_clock::now(); std::chrono::duration filter_duration = finish_filter_time - start_time; LOG_TRACE(fmt::format("PhysicalMatch Prepare: Filter time: {} ms", filter_duration.count())); diff --git a/src/planner/bound/base_table_ref.cppm b/src/planner/bound/base_table_ref.cppm index 0b3771c780..c3cdd6de12 100644 --- a/src/planner/bound/base_table_ref.cppm +++ b/src/planner/bound/base_table_ref.cppm @@ -21,7 +21,7 @@ export module base_table_ref; import stl; import table_ref; import table_entry; - +import txn; import table_function; import block_index; import internal_types; @@ -48,8 +48,8 @@ public: explicit BaseTableRef(TableEntry *table_entry, SharedPtr block_index) : TableRef(TableRefType::kTable, ""), table_entry_ptr_(table_entry), block_index_(std::move(block_index)) {} - static SharedPtr FakeTableRef(TableEntry *table_entry, TxnTimeStamp ts) { - SharedPtr block_index = table_entry->GetBlockIndex(ts); + static SharedPtr FakeTableRef(TableEntry *table_entry, Txn *txn) { + SharedPtr block_index = table_entry->GetBlockIndex(txn); return MakeShared(table_entry, std::move(block_index)); } diff --git a/src/planner/query_binder.cpp b/src/planner/query_binder.cpp index 0cc3aa109b..23a879bd82 100644 --- a/src/planner/query_binder.cpp +++ b/src/planner/query_binder.cpp @@ -71,6 +71,7 @@ import data_type; import logical_type; import base_entry; import view_entry; +import txn; namespace infinity { @@ -425,10 +426,9 @@ SharedPtr QueryBinder::BuildBaseTable(QueryContext *query_context, columns.emplace_back(idx); } -// TransactionID txn_id = query_context->GetTxn()->TxnID(); - TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS(); + Txn *txn = query_context->GetTxn(); - SharedPtr block_index = table_entry->GetBlockIndex(begin_ts); + SharedPtr block_index = table_entry->GetBlockIndex(txn); u64 table_index = bind_context_ptr_->GenerateTableIndex(); auto table_ref = MakeShared(table_entry, std::move(columns), block_index, alias, table_index, names_ptr, types_ptr); diff --git a/src/storage/bg_task/compact_segments_task.cpp b/src/storage/bg_task/compact_segments_task.cpp index 9f2a8d6aa4..5626d93237 100644 --- a/src/storage/bg_task/compact_segments_task.cpp +++ b/src/storage/bg_task/compact_segments_task.cpp @@ -148,7 +148,7 @@ void CompactSegmentsTask::CompactSegments(CompactSegmentsTaskState &state) { } auto new_segment = CompactSegmentsToOne(state, to_compact_segments); - block_index->Insert(new_segment.get(), UNCOMMIT_TS, false); + block_index->Insert(new_segment.get(), txn_); segment_data.emplace_back(new_segment, std::move(to_compact_segments)); old_segments.insert(old_segments.end(), to_compact_segments.begin(), to_compact_segments.end()); diff --git a/src/storage/common/block_index.cpp b/src/storage/common/block_index.cpp index fc7a43d88d..4f6a1b5870 100644 --- a/src/storage/common/block_index.cpp +++ b/src/storage/common/block_index.cpp @@ -14,32 +14,36 @@ module; +#include + module block_index; import stl; import segment_entry; import global_block_id; import block_iter; -import segment_iter; +import txn; namespace infinity { -void BlockIndex::Insert(SegmentEntry *segment_entry, TxnTimeStamp timestamp, bool check_ts) { - if (!check_ts || segment_entry->CheckVisible(timestamp)) { +void BlockIndex::Insert(SegmentEntry *segment_entry, Txn *txn) { + if (segment_entry->CheckVisible(txn)) { u32 segment_id = segment_entry->segment_id(); segments_.emplace_back(segment_entry); segment_index_.emplace(segment_id, segment_entry); BlocksInfo blocks_info; - auto block_entry_iter = BlockEntryIter(segment_entry); - for (auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) { - if (timestamp >= block_entry->min_row_ts()) { - blocks_info.block_map_.emplace(block_entry->block_id(), block_entry); - global_blocks_.emplace_back(GlobalBlockID{segment_id, block_entry->block_id()}); + { + auto block_guard = segment_entry->GetBlocksGuard(); + for (const auto &block_entry : block_guard.block_entries_) { + if (block_entry->CheckVisible(txn)) { + blocks_info.block_map_.emplace(block_entry->block_id(), block_entry.get()); + global_blocks_.emplace_back(GlobalBlockID{segment_id, block_entry->block_id()}); + } } } - blocks_info.segment_offset_ = segment_entry->row_count(timestamp); - // blocks_info.segment_offset_ = segment_entry->row_count(); // use false row count to pass benchmark + TxnTimeStamp begin_ts = txn->BeginTS(); + blocks_info.segment_offset_ = segment_entry->row_count(begin_ts); segment_block_index_.emplace(segment_id, std::move(blocks_info)); } diff --git a/src/storage/common/block_index.cppm b/src/storage/common/block_index.cppm index 61675a7354..11a7b7b9ad 100644 --- a/src/storage/common/block_index.cppm +++ b/src/storage/common/block_index.cppm @@ -23,6 +23,7 @@ namespace infinity { struct BlockEntry; struct SegmentEntry; +class Txn; export struct BlockIndex { private: @@ -32,7 +33,7 @@ private: }; public: - void Insert(SegmentEntry *segment_entry, TxnTimeStamp timestamp, bool check_ts = true); + void Insert(SegmentEntry *segment_entry, Txn *txn); void Reserve(SizeT n); diff --git a/src/storage/invertedindex/column_index_reader.cpp b/src/storage/invertedindex/column_index_reader.cpp index 3d10690d54..3f81eafc57 100644 --- a/src/storage/invertedindex/column_index_reader.cpp +++ b/src/storage/invertedindex/column_index_reader.cpp @@ -132,7 +132,9 @@ void TableIndexReaderCache::UpdateKnownUpdateTs(TxnTimeStamp ts, std::shared_mut last_known_update_ts_ = std::max(last_known_update_ts_, ts); } -IndexReader TableIndexReaderCache::GetIndexReader(TransactionID txn_id, TxnTimeStamp begin_ts, TableEntry *self_table_entry_ptr) { +IndexReader TableIndexReaderCache::GetIndexReader(Txn *txn, TableEntry *self_table_entry_ptr) { + TxnTimeStamp begin_ts = txn->BeginTS(); + TransactionID txn_id = txn->TxnID(); IndexReader result; result.session_pool_ = MakeShared(); std::scoped_lock lock(mutex_); @@ -176,7 +178,7 @@ IndexReader TableIndexReaderCache::GetIndexReader(TransactionID txn_id, TxnTimeS optionflag_t flag = index_full_text->flag_; String index_dir = *(table_index_entry->index_dir()); Map> index_by_segment = - table_index_entry->GetIndexBySegmentSnapshot(self_table_entry_ptr, begin_ts); + table_index_entry->GetIndexBySegmentSnapshot(self_table_entry_ptr, txn); column_index_reader->Open(flag, std::move(index_dir), std::move(index_by_segment)); (*result.column_index_readers_)[column_id] = std::move(column_index_reader); } diff --git a/src/storage/invertedindex/column_index_reader.cppm b/src/storage/invertedindex/column_index_reader.cppm index 0d17227ab9..566488144f 100644 --- a/src/storage/invertedindex/column_index_reader.cppm +++ b/src/storage/invertedindex/column_index_reader.cppm @@ -31,6 +31,7 @@ export module column_index_reader; namespace infinity { struct TableEntry; class BlockMaxTermDocIterator; +class Txn; export class ColumnIndexReader { public: @@ -75,7 +76,7 @@ export class TableIndexReaderCache { public: void UpdateKnownUpdateTs(TxnTimeStamp ts, std::shared_mutex &segment_update_ts_mutex, TxnTimeStamp &segment_update_ts); - IndexReader GetIndexReader(TransactionID txn_id, TxnTimeStamp begin_ts, TableEntry *table_entry_ptr); + IndexReader GetIndexReader(Txn *txn, TableEntry *table_entry_ptr); private: std::mutex mutex_; diff --git a/src/storage/invertedindex/search/query_builder.cpp b/src/storage/invertedindex/search/query_builder.cpp index d423c17f22..4a55b272ed 100644 --- a/src/storage/invertedindex/search/query_builder.cpp +++ b/src/storage/invertedindex/search/query_builder.cpp @@ -37,9 +37,8 @@ import third_party; namespace infinity { -QueryBuilder::QueryBuilder(TransactionID txn_id, TxnTimeStamp begin_ts, SharedPtr &base_table_ref) - : txn_id_(txn_id), begin_ts_(begin_ts), table_entry_(base_table_ref->table_entry_ptr_), - index_reader_(table_entry_->GetFullTextIndexReader(txn_id_, begin_ts_)) { +QueryBuilder::QueryBuilder(Txn *txn, SharedPtr &base_table_ref) + : table_entry_(base_table_ref->table_entry_ptr_), index_reader_(table_entry_->GetFullTextIndexReader(txn)) { u64 total_row_count = 0; for (SegmentEntry *segment_entry : base_table_ref->block_index_->segments_) { total_row_count += segment_entry->row_count(); diff --git a/src/storage/invertedindex/search/query_builder.cppm b/src/storage/invertedindex/search/query_builder.cppm index 24d1cd2d0d..6872907342 100644 --- a/src/storage/invertedindex/search/query_builder.cppm +++ b/src/storage/invertedindex/search/query_builder.cppm @@ -27,6 +27,7 @@ import base_table_ref; namespace infinity { +class Txn; struct QueryNode; export struct FullTextQueryContext { UniquePtr query_tree_; @@ -37,7 +38,7 @@ class EarlyTerminateIterator; export class QueryBuilder { public: - QueryBuilder(TransactionID txn_id, TxnTimeStamp begin_ts, SharedPtr &base_table_ref); + QueryBuilder(Txn *txn, SharedPtr &base_table_ref); ~QueryBuilder(); @@ -50,8 +51,6 @@ public: inline float Score(RowID doc_id) { return scorer_.Score(doc_id); } private: - TransactionID txn_id_{}; - TxnTimeStamp begin_ts_{}; TableEntry *table_entry_{nullptr}; IndexReader index_reader_; Scorer scorer_; diff --git a/src/storage/meta/catalog.cpp b/src/storage/meta/catalog.cpp index 466e9ed2f3..9baffcbfc1 100644 --- a/src/storage/meta/catalog.cpp +++ b/src/storage/meta/catalog.cpp @@ -206,7 +206,9 @@ Tuple, Status> Catalog::DropTableByName(const String &db_n return db_entry->DropTable(table_name, conflict_type, txn_id, begin_ts, txn_mgr); } -Status Catalog::GetTables(const String &db_name, Vector &output_table_array, TransactionID txn_id, TxnTimeStamp begin_ts) { +Status Catalog::GetTables(const String &db_name, Vector &output_table_array, Txn *txn) { + TransactionID txn_id = txn->TxnID(); + TxnTimeStamp begin_ts = txn->BeginTS(); // Check the db entries auto [db_entry, status] = this->GetDatabase(db_name, txn_id, begin_ts); if (!status.ok()) { @@ -214,7 +216,7 @@ Status Catalog::GetTables(const String &db_name, Vector &output_tab LOG_ERROR(fmt::format("Database: {} is invalid.", db_name)); return status; } - return db_entry->GetTablesDetail(txn_id, begin_ts, output_table_array); + return db_entry->GetTablesDetail(txn, output_table_array); } Tuple Catalog::GetTableByName(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts) { @@ -229,8 +231,9 @@ Tuple Catalog::GetTableByName(const String &db_name, const return db_entry->GetTableCollection(table_name, txn_id, begin_ts); } -Tuple, Status> -Catalog::GetTableInfo(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts) { +Tuple, Status> Catalog::GetTableInfo(const String &db_name, const String &table_name, Txn *txn) { + TransactionID txn_id = txn->TxnID(); + TxnTimeStamp begin_ts = txn->BeginTS(); auto [db_entry, status] = this->GetDatabase(db_name, txn_id, begin_ts); if (!status.ok()) { // Error @@ -238,7 +241,7 @@ Catalog::GetTableInfo(const String &db_name, const String &table_name, Transacti return {nullptr, status}; } - return db_entry->GetTableInfo(table_name, txn_id, begin_ts); + return db_entry->GetTableInfo(table_name, txn); } Status Catalog::RemoveTableEntry(TableEntry *table_entry, TransactionID txn_id) { @@ -657,7 +660,8 @@ void Catalog::LoadFromEntryDelta(TxnTimeStamp max_commit_ts, BufferManager *buff commit_ts, check_point_ts, check_point_row_count, - buffer_mgr); + buffer_mgr, + txn_id); if (merge_flag == MergeFlag::kNew) { if (!block_filter_binary_data.empty()) { diff --git a/src/storage/meta/catalog.cppm b/src/storage/meta/catalog.cppm index 6d431cd247..49263264e4 100644 --- a/src/storage/meta/catalog.cppm +++ b/src/storage/meta/catalog.cppm @@ -157,11 +157,11 @@ public: TxnTimeStamp begin_ts, TxnManager *txn_mgr); - Status GetTables(const String &db_name, Vector &output_table_array, TransactionID txn_id, TxnTimeStamp begin_ts); + Status GetTables(const String &db_name, Vector &output_table_array, Txn *txn); Tuple GetTableByName(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts); - Tuple, Status> GetTableInfo(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts); + Tuple, Status> GetTableInfo(const String &db_name, const String &table_name, Txn *txn); static Status RemoveTableEntry(TableEntry *table_entry, TransactionID txn_id); diff --git a/src/storage/meta/entry/base_entry.cppm b/src/storage/meta/entry/base_entry.cppm index ebee90b686..ffd80cfb81 100644 --- a/src/storage/meta/entry/base_entry.cppm +++ b/src/storage/meta/entry/base_entry.cppm @@ -20,6 +20,11 @@ export module base_entry; import stl; import default_values; +import txn; +import txn_manager; +import infinity_exception; +import third_party; +import txn_state; namespace infinity { @@ -64,8 +69,26 @@ public: SharedPtr encode_ptr() const { return encode_; } + // return if this entry is visible to the `txn` + virtual bool CheckVisible(Txn *txn) const { + TxnTimeStamp begin_ts = txn->BeginTS(); + if (begin_ts >= commit_ts_ || txn_id_ == txn->TxnID()) { + return true; + } + TxnManager *txn_mgr = txn->txn_mgr(); + if (txn_mgr == nullptr) { // when replay + UnrecoverableError(fmt::format("Replay should not reach here. begin_ts: {}, commit_ts_: {} txn_id: {}, txn_id_: {}", + begin_ts, + commit_ts_, + txn->TxnID(), + txn_id_)); + } + // Check if the entry is in committing process, because commit_ts of the base_entry is set in the Txn::CommitBottom + return txn_mgr->CheckIfCommitting(txn_id_, begin_ts); + } + public: - atomic_u64 txn_id_{0}; + TransactionID txn_id_{0}; TxnTimeStamp begin_ts_{0}; atomic_u64 commit_ts_{UNCOMMIT_TS}; const bool deleted_; diff --git a/src/storage/meta/entry/block_entry.cpp b/src/storage/meta/entry/block_entry.cpp index 4f1d76c4ad..9ad62ce4f0 100644 --- a/src/storage/meta/entry/block_entry.cpp +++ b/src/storage/meta/entry/block_entry.cpp @@ -88,10 +88,13 @@ UniquePtr BlockEntry::NewReplayBlockEntry(const SegmentEntry *segmen TxnTimeStamp commit_ts, TxnTimeStamp check_point_ts, u16 checkpoint_row_count, - BufferManager *buffer_mgr) { + BufferManager *buffer_mgr, + TransactionID txn_id) { auto block_entry = MakeUnique(segment_entry, block_id, 0); + block_entry->txn_id_ = txn_id; + block_entry->row_count_ = row_count; block_entry->min_row_ts_ = min_row_ts; block_entry->max_row_ts_ = max_row_ts; @@ -295,7 +298,6 @@ void BlockEntry::CommitBlock(TransactionID txn_id, TxnTimeStamp commit_ts) { } max_row_ts_ = commit_ts; if (!this->Committed()) { - txn_id_ = txn_id; this->Commit(commit_ts); for (auto &column : columns_) { column->CommitColumn(txn_id, commit_ts); @@ -405,6 +407,8 @@ UniquePtr BlockEntry::Deserialize(const nlohmann::json &block_entry_ auto commit_ts = block_entry_json["commit_ts"]; auto checkpoint_ts = block_entry_json["checkpoint_ts"]; + auto txn_id = block_entry_json["txn_id"]; + UniquePtr block_entry = BlockEntry::NewReplayBlockEntry(segment_entry, block_id, row_count, @@ -414,10 +418,10 @@ UniquePtr BlockEntry::Deserialize(const nlohmann::json &block_entry_ commit_ts, checkpoint_ts, row_count, - buffer_mgr); + buffer_mgr, + txn_id); block_entry->begin_ts_ = block_entry_json["begin_ts"]; - block_entry->txn_id_ = block_entry_json["txn_id"]; for (const auto &block_column_json : block_entry_json["columns"]) { block_entry->columns_.emplace_back(BlockColumnEntry::Deserialize(block_column_json, block_entry.get(), buffer_mgr)); diff --git a/src/storage/meta/entry/block_entry.cppm b/src/storage/meta/entry/block_entry.cppm index cbda710ad9..5c206500f0 100644 --- a/src/storage/meta/entry/block_entry.cppm +++ b/src/storage/meta/entry/block_entry.cppm @@ -71,7 +71,8 @@ public: TxnTimeStamp commit_ts, TxnTimeStamp check_point_ts, u16 checkpoint_row_count, - BufferManager *buffer_mgr); + BufferManager *buffer_mgr, + TransactionID txn_id); void UpdateBlockReplay(SharedPtr block_entry, String block_filter_binary_data); diff --git a/src/storage/meta/entry/chunk_index_entry.cppm b/src/storage/meta/entry/chunk_index_entry.cppm index ff2bf9ec19..86b3c9df37 100644 --- a/src/storage/meta/entry/chunk_index_entry.cppm +++ b/src/storage/meta/entry/chunk_index_entry.cppm @@ -114,7 +114,7 @@ public: deprecate_ts_.store(commit_ts); } - bool CheckVisible(TxnTimeStamp ts) { + bool CheckVisibleByTS(TxnTimeStamp ts) { // FIXME: should overload BaseEntry::CheckVisible TxnTimeStamp deprecate_ts = deprecate_ts_.load(); TxnTimeStamp commit_ts = commit_ts_.load(); assert(commit_ts == UNCOMMIT_TS || commit_ts < deprecate_ts); diff --git a/src/storage/meta/entry/db_entry.cpp b/src/storage/meta/entry/db_entry.cpp index c7a84f537f..89740d7c1a 100644 --- a/src/storage/meta/entry/db_entry.cpp +++ b/src/storage/meta/entry/db_entry.cpp @@ -55,7 +55,7 @@ DBEntry::DBEntry(DBMeta *db_meta, TxnTimeStamp begin_ts) : BaseEntry(EntryType::kDatabase, is_delete, DBEntry::EncodeIndex(*db_name)), db_meta_(db_meta), db_entry_dir_(db_entry_dir), db_name_(db_name) { begin_ts_ = begin_ts; - txn_id_.store(txn_id); + txn_id_ = txn_id; } SharedPtr DBEntry::NewDBEntry(DBMeta *db_meta, @@ -111,14 +111,14 @@ Tuple DBEntry::GetTableCollection(const String &table_name return table_meta->GetEntry(std::move(r_lock), txn_id, begin_ts); } -Tuple, Status> DBEntry::GetTableInfo(const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts) { +Tuple, Status> DBEntry::GetTableInfo(const String &table_name, Txn *txn) { LOG_TRACE(fmt::format("Get a table entry {}", table_name)); auto [table_meta, status, r_lock] = table_meta_map_.GetExistMeta(table_name, ConflictType::kError); if (table_meta == nullptr) { return {nullptr, status}; } - return table_meta->GetTableInfo(std::move(r_lock), txn_id, begin_ts); + return table_meta->GetTableInfo(std::move(r_lock), txn); } void DBEntry::RemoveTableEntry(const String &table_name, TransactionID txn_id) { @@ -200,7 +200,9 @@ Vector DBEntry::TableCollections(TransactionID txn_id, TxnTimeStam return results; } -Status DBEntry::GetTablesDetail(TransactionID txn_id, TxnTimeStamp begin_ts, Vector &output_table_array) { +Status DBEntry::GetTablesDetail(Txn *txn, Vector &output_table_array) { + TransactionID txn_id = txn->TxnID(); + TxnTimeStamp begin_ts = txn->BeginTS(); Vector table_collection_entries = this->TableCollections(txn_id, begin_ts); output_table_array.reserve(table_collection_entries.size()); for (TableEntry *table_entry : table_collection_entries) { @@ -213,7 +215,7 @@ Status DBEntry::GetTablesDetail(TransactionID txn_id, TxnTimeStamp begin_ts, Vec table_detail.segment_capacity_ = DEFAULT_SEGMENT_CAPACITY; table_detail.block_capacity_ = DEFAULT_BLOCK_CAPACITY; - SharedPtr segment_index = table_entry->GetBlockIndex(begin_ts); + SharedPtr segment_index = table_entry->GetBlockIndex(txn); table_detail.segment_count_ = segment_index->SegmentCount(); table_detail.block_count_ = segment_index->BlockCount(); @@ -236,7 +238,7 @@ nlohmann::json DBEntry::Serialize(TxnTimeStamp max_commit_ts) { { std::shared_lock lck(this->rw_locker()); json_res["db_name"] = *this->db_name_; - json_res["txn_id"] = this->txn_id_.load(); + json_res["txn_id"] = this->txn_id_; json_res["begin_ts"] = this->begin_ts_; json_res["commit_ts"] = this->commit_ts_.load(); json_res["deleted"] = this->deleted_; diff --git a/src/storage/meta/entry/db_entry.cppm b/src/storage/meta/entry/db_entry.cppm index 580bcff87a..0a755f5979 100644 --- a/src/storage/meta/entry/db_entry.cppm +++ b/src/storage/meta/entry/db_entry.cppm @@ -95,7 +95,7 @@ public: Tuple GetTableCollection(const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts); - Tuple, Status> GetTableInfo(const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts); + Tuple, Status> GetTableInfo(const String &table_name, Txn *txn); void RemoveTableEntry(const String &table_collection_name, TransactionID txn_id); @@ -120,7 +120,7 @@ public: Vector TableCollections(TransactionID txn_id, TxnTimeStamp begin_ts); - Status GetTablesDetail(TransactionID txn_id, TxnTimeStamp begin_ts, Vector &output_table_array); + Status GetTablesDetail(Txn *txn, Vector &output_table_array); private: static SharedPtr DetermineDBDir(const String &parent_dir, const String &db_name) { diff --git a/src/storage/meta/entry/segment_entry.cpp b/src/storage/meta/entry/segment_entry.cpp index d2f1f68eb2..7bc11ed3d2 100644 --- a/src/storage/meta/entry/segment_entry.cpp +++ b/src/storage/meta/entry/segment_entry.cpp @@ -233,9 +233,10 @@ bool SegmentEntry::CheckDeleteVisible(HashMap> &blo return true; } -bool SegmentEntry::CheckVisible(TxnTimeStamp check_ts) const { +bool SegmentEntry::CheckVisible(Txn *txn) const { + TxnTimeStamp begin_ts = txn->BeginTS(); std::shared_lock lock(rw_locker_); - return min_row_ts_ <= check_ts && check_ts <= deprecate_ts_; + return begin_ts < deprecate_ts_ && BaseEntry::CheckVisible(txn); } bool SegmentEntry::CheckDeprecate(TxnTimeStamp check_ts) const { @@ -403,7 +404,7 @@ void SegmentEntry::CommitFlushed(TxnTimeStamp commit_ts) { } } -void SegmentEntry::CommitSegment(TransactionID txn_id, TxnTimeStamp commit_ts) { +void SegmentEntry::CommitSegment(TransactionID txn_id, TxnTimeStamp commit_ts, const TxnSegmentStore &segment_store) { std::unique_lock w_lock(rw_locker_); min_row_ts_ = std::min(min_row_ts_, commit_ts); if (commit_ts < max_row_ts_) { @@ -411,9 +412,12 @@ void SegmentEntry::CommitSegment(TransactionID txn_id, TxnTimeStamp commit_ts) { } max_row_ts_ = commit_ts; if (!this->Committed()) { - this->txn_id_ = txn_id; this->Commit(commit_ts); } + // hold the lock here + for (const auto &[block_id, block_entry] : segment_store.block_entries_) { + block_entry->CommitBlock(txn_id, commit_ts); + } } void SegmentEntry::RollbackBlocks(TxnTimeStamp commit_ts, const HashMap &block_entries) { diff --git a/src/storage/meta/entry/segment_entry.cppm b/src/storage/meta/entry/segment_entry.cppm index 72adef0cb5..bb334bada6 100644 --- a/src/storage/meta/entry/segment_entry.cppm +++ b/src/storage/meta/entry/segment_entry.cppm @@ -34,10 +34,16 @@ import cleanup_scanner; namespace infinity { class TxnTableStore; +struct TxnSegmentStore; struct TableEntry; class CompactSegmentsTask; class BlockEntryIter; +export struct BlocksGuard { + const Vector> &block_entries_; + std::shared_lock lock_; +}; + export enum class SegmentStatus : u8 { kUnsealed, kSealed, @@ -109,7 +115,7 @@ public: bool CheckDeleteVisible(HashMap> &block_offsets_map, Txn *txn) const; - bool CheckVisible(TxnTimeStamp check_ts) const; + virtual bool CheckVisible(Txn *txn) const override; bool CheckDeprecate(TxnTimeStamp check_ts) const; @@ -154,9 +160,7 @@ public: } // only used in Serialize(), FullCheckpoint, and no concurrency - SizeT checkpoint_row_count() const { - return checkpoint_row_count_; - } + SizeT checkpoint_row_count() const { return checkpoint_row_count_; } int Room() const { return this->row_capacity_ - this->row_count(); } @@ -174,6 +178,8 @@ public: SharedPtr GetBlockEntryByID(BlockID block_id) const; + BlocksGuard GetBlocksGuard() const { return BlocksGuard{block_entries_, std::shared_lock(rw_locker_)}; } + public: SizeT row_count(TxnTimeStamp check_ts) const; @@ -183,7 +189,7 @@ public: void CommitFlushed(TxnTimeStamp commit_ts); - void CommitSegment(TransactionID txn_id, TxnTimeStamp commit_ts); + void CommitSegment(TransactionID txn_id, TxnTimeStamp commit_ts, const TxnSegmentStore &segment_store); void RollbackBlocks(TxnTimeStamp commit_ts, const HashMap &block_entries); diff --git a/src/storage/meta/entry/segment_index_entry.cpp b/src/storage/meta/entry/segment_index_entry.cpp index b08461c59c..21f7fc6cd5 100644 --- a/src/storage/meta/entry/segment_index_entry.cpp +++ b/src/storage/meta/entry/segment_index_entry.cpp @@ -242,16 +242,18 @@ void SegmentIndexEntry::MemIndexInsert(SharedPtr block_entry, const IndexFullText *index_fulltext = static_cast(index_base.get()); if (memory_indexer_.get() == nullptr) { String base_name = fmt::format("ft_{:016x}", begin_row_id.ToUint64()); - std::unique_lock lck(rw_locker_); - memory_indexer_ = MakeUnique(*table_index_entry_->index_dir(), - base_name, - begin_row_id, - index_fulltext->flag_, - index_fulltext->analyzer_, - table_index_entry_->GetFulltextByteSlicePool(), - table_index_entry_->GetFulltextBufferPool(), - table_index_entry_->GetFulltextInvertingThreadPool(), - table_index_entry_->GetFulltextCommitingThreadPool()); + { + std::unique_lock lck(rw_locker_); + memory_indexer_ = MakeUnique(*table_index_entry_->index_dir(), + base_name, + begin_row_id, + index_fulltext->flag_, + index_fulltext->analyzer_, + table_index_entry_->GetFulltextByteSlicePool(), + table_index_entry_->GetFulltextBufferPool(), + table_index_entry_->GetFulltextInvertingThreadPool(), + table_index_entry_->GetFulltextCommitingThreadPool()); + } table_index_entry_->UpdateFulltextSegmentTs(commit_ts); } else { RowID exp_begin_row_id = memory_indexer_->GetBaseRowId() + memory_indexer_->GetDocCount(); @@ -780,7 +782,7 @@ ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_ return nullptr; } for (const auto &chunk_index_entry : chunk_index_entries_) { - if (chunk_index_entry->CheckVisible(begin_ts)) { + if (chunk_index_entry->CheckVisible(txn)) { row_count += chunk_index_entry->row_count_; old_chunks.push_back(chunk_index_entry.get()); } @@ -829,7 +831,7 @@ ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_ return nullptr; } for (const auto &chunk_index_entry : chunk_index_entries_) { - if (chunk_index_entry->CheckVisible(begin_ts)) { + if (chunk_index_entry->CheckVisible(txn)) { row_count += chunk_index_entry->GetRowCount(); old_chunks.push_back(chunk_index_entry.get()); } diff --git a/src/storage/meta/entry/segment_index_entry.cppm b/src/storage/meta/entry/segment_index_entry.cppm index ead32661ef..5a66069d79 100644 --- a/src/storage/meta/entry/segment_index_entry.cppm +++ b/src/storage/meta/entry/segment_index_entry.cppm @@ -139,7 +139,7 @@ public: SizeT num = chunk_index_entries_.size(); for (SizeT i = 0; i < num; i++) { auto &chunk_index_entry = chunk_index_entries_[i]; - if (chunk_index_entry->CheckVisible(begin_ts)) { + if (chunk_index_entry->CheckVisibleByTS(begin_ts)) { chunk_index_entries.push_back(chunk_index_entry); } } diff --git a/src/storage/meta/entry/table_entry.cpp b/src/storage/meta/entry/table_entry.cpp index 3e374abbd0..4634696f0c 100644 --- a/src/storage/meta/entry/table_entry.cpp +++ b/src/storage/meta/entry/table_entry.cpp @@ -208,7 +208,7 @@ void TableEntry::RemoveIndexEntry(const String &index_name, TransactionID txn_id /// replay void TableEntry::UpdateEntryReplay(const SharedPtr &table_entry) { - txn_id_.store(table_entry->txn_id_); + txn_id_ = table_entry->txn_id_; begin_ts_ = table_entry->begin_ts_; commit_ts_.store(table_entry->commit_ts_); row_count_ = table_entry->row_count(); @@ -474,10 +474,7 @@ Status TableEntry::CommitCompact(TransactionID txn_id, TxnTimeStamp commit_ts, T auto *segment_entry = segment_store.segment_entry_; - segment_entry->CommitSegment(txn_id, commit_ts); - for (auto [block_id, block_entry] : segment_store.block_entries_) { - block_entry->CommitBlock(txn_id, commit_ts); - } + segment_entry->CommitSegment(txn_id, commit_ts, segment_store); for (const auto &old_segment : old_segments) { // old_segment->TrySetDeprecated(commit_ts); @@ -581,10 +578,7 @@ Status TableEntry::RollbackCompact(TransactionID txn_id, TxnTimeStamp commit_ts, Status TableEntry::CommitWrite(TransactionID txn_id, TxnTimeStamp commit_ts, const HashMap &segment_stores) { for (const auto &[segment_id, segment_store] : segment_stores) { auto *segment_entry = segment_store.segment_entry_; - segment_entry->CommitSegment(txn_id, commit_ts); - for (auto [block_id, block_entry] : segment_store.block_entries_) { - block_entry->CommitBlock(txn_id, commit_ts); - } + segment_entry->CommitSegment(txn_id, commit_ts, segment_store); } return Status::OK(); } @@ -882,7 +876,7 @@ SharedPtr TableEntry::GetSegmentByID(SegmentID seg_id, Txn *txn) c return nullptr; } const auto &segment = iter->second; - if (segment->commit_ts_ > txn->BeginTS() && segment->txn_id_ != txn->TxnID()) { + if (!segment->CheckVisible(txn)) { return nullptr; } return segment; @@ -910,7 +904,7 @@ String TableEntry::GetPathNameTail() const { return table_entry_dir_->substr(delimiter_i + 1); } -SharedPtr TableEntry::GetBlockIndex(TxnTimeStamp begin_ts) { +SharedPtr TableEntry::GetBlockIndex(Txn *txn) { // SharedPtr> result = MakeShared>(); SharedPtr result = MakeShared(); std::shared_lock rw_locker(this->rw_locker_); @@ -918,7 +912,7 @@ SharedPtr TableEntry::GetBlockIndex(TxnTimeStamp begin_ts) { // Add segment that is not deprecated for (const auto &segment_pair : this->segment_map_) { - result->Insert(segment_pair.second.get(), begin_ts); + result->Insert(segment_pair.second.get(), txn); } return result; @@ -937,16 +931,6 @@ bool TableEntry::CheckDeleteVisible(DeleteState &delete_state, Txn *txn) { return true; } -bool TableEntry::CheckVisible(SegmentID segment_id, TxnTimeStamp begin_ts) const { - std::shared_lock lock(this->rw_locker_); - auto iter = segment_map_.find(segment_id); - if (iter == segment_map_.end()) { - return false; - } - const auto &segment = iter->second; - return segment->CheckVisible(begin_ts); -} - nlohmann::json TableEntry::Serialize(TxnTimeStamp max_commit_ts) { nlohmann::json json_res; @@ -960,7 +944,7 @@ nlohmann::json TableEntry::Serialize(TxnTimeStamp max_commit_ts) { json_res["table_entry_type"] = this->table_entry_type_; json_res["begin_ts"] = this->begin_ts_; json_res["commit_ts"] = this->commit_ts_.load(); - json_res["txn_id"] = this->txn_id_.load(); + json_res["txn_id"] = this->txn_id_; json_res["deleted"] = this->deleted_; if (!this->deleted_) { json_res["table_entry_dir"] = *this->table_entry_dir_; @@ -1196,8 +1180,6 @@ void TableEntry::Cleanup() { LOG_TRACE(fmt::format("Cleaned dir: {}", *table_entry_dir_)); } -IndexReader TableEntry::GetFullTextIndexReader(TransactionID txn_id, TxnTimeStamp begin_ts) { - return fulltext_column_index_cache_.GetIndexReader(txn_id, begin_ts, this); -} +IndexReader TableEntry::GetFullTextIndexReader(Txn *txn) { return fulltext_column_index_cache_.GetIndexReader(txn, this); } } // namespace infinity diff --git a/src/storage/meta/entry/table_entry.cppm b/src/storage/meta/entry/table_entry.cppm index f3b54c453f..ff8bd5a43f 100644 --- a/src/storage/meta/entry/table_entry.cppm +++ b/src/storage/meta/entry/table_entry.cppm @@ -217,7 +217,7 @@ public: Pair GetSegmentRowCountBySegmentID(u32 seg_id); - SharedPtr GetBlockIndex(TxnTimeStamp begin_ts); + SharedPtr GetBlockIndex(Txn *txn); void GetFulltextAnalyzers(TransactionID txn_id, TxnTimeStamp begin_ts, Map &column2analyzer); @@ -235,7 +235,7 @@ public: const Vector> &column_defs() const { return columns_; } - IndexReader GetFullTextIndexReader(TransactionID txn_id, TxnTimeStamp begin_ts); + IndexReader GetFullTextIndexReader(Txn *txn); void UpdateFullTextSegmentTs(TxnTimeStamp ts, std::shared_mutex &segment_update_ts_mutex, TxnTimeStamp &segment_update_ts) { return fulltext_column_index_cache_.UpdateKnownUpdateTs(ts, segment_update_ts_mutex, segment_update_ts); @@ -243,8 +243,6 @@ public: bool CheckDeleteVisible(DeleteState &delete_state, Txn *txn); - bool CheckVisible(SegmentID segment_id, TxnTimeStamp check_ts) const; - private: TableMeta *const table_meta_{}; diff --git a/src/storage/meta/entry/table_index_entry.cpp b/src/storage/meta/entry/table_index_entry.cpp index 0399d24a14..4b89778f93 100644 --- a/src/storage/meta/entry/table_index_entry.cpp +++ b/src/storage/meta/entry/table_index_entry.cpp @@ -111,13 +111,14 @@ SharedPtr TableIndexEntry::ReplayTableIndexEntry(TableIndexMeta return table_index_entry; } -Map> TableIndexEntry::GetIndexBySegmentSnapshot(const TableEntry *table_entry, TxnTimeStamp begin_ts) { +Map> TableIndexEntry::GetIndexBySegmentSnapshot(const TableEntry *table_entry, Txn *txn) { std::shared_lock lck(this->rw_locker_); Map> index_by_segment_snapshot; for (const auto &[segment_id, segment_index_entry] : this->index_by_segment_) { - bool visible = table_entry->CheckVisible(segment_id, begin_ts); - if (!visible) + auto segment_entry = table_entry->GetSegmentByID(segment_id, txn); + if (segment_entry.get() == nullptr) { continue; + } index_by_segment_snapshot.emplace(segment_id, segment_index_entry); } return index_by_segment_snapshot; @@ -190,7 +191,7 @@ nlohmann::json TableIndexEntry::Serialize(TxnTimeStamp max_commit_ts) { Vector> segment_index_entry_candidates; { std::shared_lock lck(this->rw_locker_); - json["txn_id"] = this->txn_id_.load(); + json["txn_id"] = this->txn_id_; json["begin_ts"] = this->begin_ts_; json["commit_ts"] = this->commit_ts_.load(); json["deleted"] = this->deleted_; @@ -307,12 +308,12 @@ TableIndexEntry::CreateIndexPrepare(TableEntry *table_entry, BlockIndex *block_i return {segment_index_entries, Status::OK()}; } -Status TableIndexEntry::CreateIndexDo(const TableEntry *table_entry, HashMap &create_index_idxes) { +Status TableIndexEntry::CreateIndexDo(const TableEntry *table_entry, HashMap &create_index_idxes, Txn *txn) { if (this->index_base_->column_names_.size() != 1) { // TODO RecoverableError(Status::NotSupport("Not implemented")); } - Map> index_by_segment = GetIndexBySegmentSnapshot(table_entry, MAX_TIMESTAMP); + Map> index_by_segment = GetIndexBySegmentSnapshot(table_entry, txn); for (auto &[segment_id, segment_index_entry] : index_by_segment) { atomic_u64 &create_index_idx = create_index_idxes.at(segment_id); auto status = segment_index_entry->CreateIndexDo(create_index_idx); diff --git a/src/storage/meta/entry/table_index_entry.cppm b/src/storage/meta/entry/table_index_entry.cppm index dcc2c816c9..ce6a1cf44e 100644 --- a/src/storage/meta/entry/table_index_entry.cppm +++ b/src/storage/meta/entry/table_index_entry.cppm @@ -95,7 +95,7 @@ public: inline const SharedPtr &column_def() const { return column_def_; } Map> &index_by_segment() { return index_by_segment_; } - Map> GetIndexBySegmentSnapshot(const TableEntry *table_entry, TxnTimeStamp begin_ts); + Map> GetIndexBySegmentSnapshot(const TableEntry *table_entry, Txn *txn); const SharedPtr &index_dir() const { return index_dir_; } String GetPathNameTail() const; bool GetOrCreateSegment(SegmentID segment_id, Txn *txn, SharedPtr &segment_index_entry); @@ -115,7 +115,7 @@ public: Tuple, Status> CreateIndexPrepare(TableEntry *table_entry, BlockIndex *block_index, Txn *txn, bool prepare, bool is_replay, bool check_ts = true); - Status CreateIndexDo(const TableEntry *table_entry, HashMap &create_index_idxes); + Status CreateIndexDo(const TableEntry *table_entry, HashMap &create_index_idxes, Txn *txn); MemoryPool &GetFulltextByteSlicePool() { return byte_slice_pool_; } RecyclePool &GetFulltextBufferPool() { return buffer_pool_; } diff --git a/src/storage/meta/table_meta.cpp b/src/storage/meta/table_meta.cpp index 0706852e9f..0dbb46f59b 100644 --- a/src/storage/meta/table_meta.cpp +++ b/src/storage/meta/table_meta.cpp @@ -89,7 +89,9 @@ Tuple, Status> TableMeta::DropEntry(std::shared_lock, Status> -TableMeta::GetTableInfo(std::shared_lock &&r_lock, TransactionID txn_id, TxnTimeStamp begin_ts) { +TableMeta::GetTableInfo(std::shared_lock &&r_lock, Txn *txn) { + TransactionID txn_id = txn->TxnID(); + TxnTimeStamp begin_ts = txn->BeginTS(); auto [table_entry, status] = table_entry_list_.GetEntry(std::move(r_lock), txn_id, begin_ts); if (!status.ok()) { return {nullptr, status}; @@ -101,7 +103,7 @@ TableMeta::GetTableInfo(std::shared_lock &&r_lock, Transactio table_info->column_count_ = table_entry->ColumnCount(); table_info->row_count_ = table_entry->row_count(); - SharedPtr segment_index = table_entry->GetBlockIndex(begin_ts); + SharedPtr segment_index = table_entry->GetBlockIndex(txn); table_info->segment_count_ = segment_index->SegmentCount(); return {table_info, status}; diff --git a/src/storage/meta/table_meta.cppm b/src/storage/meta/table_meta.cppm index c9dd61e660..361d64695b 100644 --- a/src/storage/meta/table_meta.cppm +++ b/src/storage/meta/table_meta.cppm @@ -36,6 +36,7 @@ namespace infinity { class DBEntry; class TxnManager; +class Txn; export struct TableMeta : public MetaInterface { using EntryT = TableEntry; @@ -80,7 +81,7 @@ private: const String &table_name, ConflictType conflict_type); - Tuple, Status> GetTableInfo(std::shared_lock &&r_lock, TransactionID txn_id, TxnTimeStamp begin_ts); + Tuple, Status> GetTableInfo(std::shared_lock &&r_lock, Txn *txn); Tuple GetEntry(std::shared_lock &&r_lock, TransactionID txn_id, TxnTimeStamp begin_ts) { return table_entry_list_.GetEntry(std::move(r_lock), txn_id, begin_ts); diff --git a/src/storage/secondary_index/common_query_filter.cpp b/src/storage/secondary_index/common_query_filter.cpp index 7be0893135..2e238e7d26 100644 --- a/src/storage/secondary_index/common_query_filter.cpp +++ b/src/storage/secondary_index/common_query_filter.cpp @@ -130,7 +130,9 @@ CommonQueryFilter::CommonQueryFilter(SharedPtr original_filter, } } -void CommonQueryFilter::BuildFilter(u32 task_id, TxnTimeStamp begin_ts, BufferManager *buffer_mgr) { +void CommonQueryFilter::BuildFilter(u32 task_id, Txn *txn) { + auto *buffer_mgr = txn->buffer_mgr(); + TxnTimeStamp begin_ts = txn->BeginTS(); const HashMap &segment_index = base_table_ref_->block_index_->segment_index_; const SegmentID segment_id = tasks_[task_id]; const SegmentEntry *segment_entry = segment_index.at(segment_id); @@ -145,7 +147,7 @@ void CommonQueryFilter::BuildFilter(u32 task_id, TxnTimeStamp begin_ts, BufferMa segment_id, segment_row_count, segment_actual_row_count, - begin_ts); + txn); if (std::visit(Overload{[](const Vector &v) -> bool { return v.empty(); }, [](const Bitmask &) -> bool { return false; }}, result_elem)) { // empty result return; diff --git a/src/storage/secondary_index/common_query_filter.cppm b/src/storage/secondary_index/common_query_filter.cppm index 654aa3011c..d600febddf 100644 --- a/src/storage/secondary_index/common_query_filter.cppm +++ b/src/storage/secondary_index/common_query_filter.cppm @@ -24,6 +24,7 @@ class BaseTableRef; class BaseExpression; class QueryContext; class BufferManager; +class Txn; struct TableIndexEntry; export struct CommonQueryFilter { @@ -61,7 +62,7 @@ export struct CommonQueryFilter { // 2. return true if the filter is available for query // if other threads are building the filter, the filter is not available for query // in this case, physical operator should return early and wait for next scheduling - bool TryFinishBuild(TxnTimeStamp begin_ts, BufferManager *buffer_mgr) { + bool TryFinishBuild(Txn *txn) { if (finish_build_.test(std::memory_order_acquire)) { return true; } @@ -74,7 +75,7 @@ export struct CommonQueryFilter { } task_id = begin_task_num_++; } - BuildFilter(task_id, begin_ts, buffer_mgr); + BuildFilter(task_id, txn); if (++end_task_num_ == total_task_num_) { finish_build_.test_and_set(std::memory_order_release); break; @@ -89,7 +90,7 @@ export struct CommonQueryFilter { void TryApplySecondaryIndexFilterOptimizer(QueryContext *query_context); private: - void BuildFilter(u32 task_id, TxnTimeStamp begin_ts, BufferManager *buffer_mgr); + void BuildFilter(u32 task_id, Txn *txn); }; } // namespace infinity \ No newline at end of file diff --git a/src/storage/txn/txn.cpp b/src/storage/txn/txn.cpp index f0bbb02baf..b49c52fc52 100644 --- a/src/storage/txn/txn.cpp +++ b/src/storage/txn/txn.cpp @@ -207,9 +207,7 @@ Vector Txn::ListDatabases() { Status Txn::GetTables(const String &db_name, Vector &output_table_array) { this->CheckTxn(db_name); - TxnTimeStamp begin_ts = txn_context_.GetBeginTS(); - - return catalog_->GetTables(db_name, output_table_array, txn_id_, begin_ts); + return catalog_->GetTables(db_name, output_table_array, this); } Status Txn::CreateTable(const String &db_name, const SharedPtr &table_def, ConflictType conflict_type) { @@ -302,7 +300,7 @@ Status Txn::CreateIndexDo(BaseTableRef *table_ref, const String &index_name, Has UnrecoverableError("Index is not created by this txn. Something error happened."); } - return table_index_entry->CreateIndexDo(table_entry, create_index_idxes); + return table_index_entry->CreateIndexDo(table_entry, create_index_idxes, this); } Status Txn::CreateIndexFinish(const TableEntry *table_entry, const TableIndexEntry *table_index_entry) { @@ -351,8 +349,7 @@ Tuple Txn::GetTableByName(const String &db_name, const Str } Tuple, Status> Txn::GetTableInfo(const String &db_name, const String &table_name) { - TxnTimeStamp begin_ts = txn_context_.GetBeginTS(); - return catalog_->GetTableInfo(db_name, table_name, txn_id_, begin_ts); + return catalog_->GetTableInfo(db_name, table_name, this); } Status Txn::CreateCollection(const String &, const String &, ConflictType, BaseEntry *&) { diff --git a/src/storage/txn/txn_manager.cpp b/src/storage/txn/txn_manager.cpp index 80415ffe92..d74e972663 100644 --- a/src/storage/txn/txn_manager.cpp +++ b/src/storage/txn/txn_manager.cpp @@ -83,7 +83,29 @@ SharedPtr TxnManager::GetTxnPtr(TransactionID txn_id) { return res; } -TxnState TxnManager::GetTxnState(TransactionID txn_id) { return GetTxn(txn_id)->GetTxnState(); } +TxnState TxnManager::GetTxnState(TransactionID txn_id) { + std::lock_guard guard(rw_locker_); + auto iter = txn_map_.find(txn_id); + if (iter == txn_map_.end()) { + return TxnState::kCommitted; + } + Txn *txn = iter->second.get(); + return txn->GetTxnState(); +} + +bool TxnManager::CheckIfCommitting(TransactionID txn_id, TxnTimeStamp begin_ts) { + std::lock_guard guard(rw_locker_); + auto iter = txn_map_.find(txn_id); + if (iter == txn_map_.end()) { + return true; // Txn is already committed + } + Txn *txn = iter->second.get(); + auto state = txn->GetTxnState(); + if (state != TxnState::kCommitting && state != TxnState::kCommitted) { + return false; + } + return txn->CommitTS() < begin_ts; +} TxnTimeStamp TxnManager::GetCommitTimeStampR(Txn *txn) { std::lock_guard guard(rw_locker_); diff --git a/src/storage/txn/txn_manager.cppm b/src/storage/txn/txn_manager.cppm index 183443918a..3a25b0db86 100644 --- a/src/storage/txn/txn_manager.cppm +++ b/src/storage/txn/txn_manager.cppm @@ -49,6 +49,8 @@ public: TxnState GetTxnState(TransactionID txn_id); + bool CheckIfCommitting(TransactionID txn_id, TxnTimeStamp begin_ts); + inline void Lock() { rw_locker_.lock(); } inline void UnLock() { rw_locker_.unlock(); } diff --git a/src/storage/wal/wal_manager.cpp b/src/storage/wal/wal_manager.cpp index ed0f521597..4fd6f45716 100644 --- a/src/storage/wal/wal_manager.cpp +++ b/src/storage/wal/wal_manager.cpp @@ -651,7 +651,8 @@ void WalManager::WalCmdCreateIndexReplay(const WalCmdCreateIndex &cmd, Transacti auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id); - auto block_index = table_entry->GetBlockIndex(commit_ts); + auto txn = MakeUnique(nullptr /*buffer_mgr*/, nullptr /*txn_mgr*/, nullptr /*catalog*/, txn_id, begin_ts); + auto block_index = table_entry->GetBlockIndex(txn.get()); table_index_entry->CreateIndexPrepare(table_entry, block_index.get(), fake_txn.get(), false, true); auto *txn_store = fake_txn->GetTxnTableStore(table_entry); @@ -707,7 +708,8 @@ WalManager::ReplaySegment(TableEntry *table_entry, const WalSegmentInfo &segment commit_ts, /*commit_ts*/ commit_ts, /*checkpoint_ts*/ block_info.row_count_, /*checkpoint_row_count*/ - buffer_mgr); + buffer_mgr, + txn_id); for (ColumnID column_id = 0; column_id < (ColumnID)block_info.outline_infos_.size(); ++column_id) { auto [next_idx, last_off] = block_info.outline_infos_[column_id]; auto column_entry = BlockColumnEntry::NewReplayBlockColumnEntry(block_entry.get(), column_id, buffer_mgr, next_idx, last_off, commit_ts); diff --git a/src/unit_test/storage/bg_task/cleanup_task.cpp b/src/unit_test/storage/bg_task/cleanup_task.cpp index c7b2ce422a..655ccb76f6 100644 --- a/src/unit_test/storage/bg_task/cleanup_task.cpp +++ b/src/unit_test/storage/bg_task/cleanup_task.cpp @@ -70,7 +70,7 @@ class CleanupTaskTest : public BaseTest { break; } if (end - start > 10) { - UnrecoverableException("WaitFlushDeltaOp timeout"); + UnrecoverableError("WaitFlushDeltaOp timeout"); } LOG_INFO(fmt::format("Before usleep. Wait flush delta op for {} seconds", end - start)); usleep(1000 * 1000); @@ -456,8 +456,7 @@ TEST_F(CleanupTaskTest, test_with_index_compact_and_cleanup) { auto [table_entry, status1] = txn->GetTableByName(*db_name, *table_name); EXPECT_TRUE(status1.ok()); - TxnTimeStamp begin_ts = txn->BeginTS(); - auto table_ref = BaseTableRef::FakeTableRef(table_entry, begin_ts); + auto table_ref = BaseTableRef::FakeTableRef(table_entry, txn); auto [table_index_entry, status2] = txn->CreateIndexDef(table_entry, index_base, ConflictType::kError); EXPECT_TRUE(status2.ok()); diff --git a/src/unit_test/storage/buffer/buffer_obj.cpp b/src/unit_test/storage/buffer/buffer_obj.cpp index 2adb854ffc..6902d65eeb 100644 --- a/src/unit_test/storage/buffer/buffer_obj.cpp +++ b/src/unit_test/storage/buffer/buffer_obj.cpp @@ -80,29 +80,34 @@ class BufferObjTest : public BaseTest { void WaitCleanup(Storage *storage, TxnTimeStamp last_commit_ts) { Catalog *catalog = storage->catalog(); - TxnManager *txn_mgr = storage->txn_manager(); BufferManager *buffer_mgr = storage->buffer_manager(); - LOG_INFO("Waiting cleanup"); + auto visible_ts = WaitFlushDeltaOp(storage, last_commit_ts); + + auto cleanup_task = MakeShared(catalog, visible_ts, buffer_mgr); + cleanup_task->Execute(); + } + + TxnTimeStamp WaitFlushDeltaOp(Storage *storage, TxnTimeStamp last_commit_ts) { + TxnManager *txn_mgr = storage->txn_manager(); + TxnTimeStamp visible_ts = 0; time_t start = time(nullptr); while (true) { visible_ts = txn_mgr->GetCleanupScanTS(); + // wait for at most 10s time_t end = time(nullptr); if (visible_ts >= last_commit_ts) { - LOG_INFO(fmt::format("Cleanup finished after {}", end - start)); + LOG_INFO(fmt::format("FlushDeltaOp finished after {}", end - start)); break; } - // wait for at most 10s - if (end - start > 10) { - UnrecoverableError("WaitCleanup timeout"); + if (end - start > 5) { + UnrecoverableError("WaitFlushDeltaOp timeout"); } - LOG_INFO(fmt::format("Before usleep. Wait cleanup for {} seconds", end - start)); + LOG_INFO(fmt::format("Before usleep. Wait flush delta op for {} seconds", end - start)); usleep(1000 * 1000); } - - auto cleanup_task = MakeShared(catalog, visible_ts, buffer_mgr); - cleanup_task->Execute(); + return visible_ts; } }; @@ -567,7 +572,7 @@ TEST_F(BufferObjTest, test_hnsw_index_buffer_obj_shutdown) { auto [table_entry, table_status] = txn->GetTableByName(db_name, table_name); EXPECT_EQ(table_status.ok(), true); { - auto table_ref = BaseTableRef::FakeTableRef(table_entry, txn->BeginTS()); + auto table_ref = BaseTableRef::FakeTableRef(table_entry, txn); auto result = txn->CreateIndexDef(table_entry, index_base_hnsw, conflict_type); auto *table_index_entry = std::get<0>(result); auto status = std::get<1>(result); @@ -602,9 +607,10 @@ TEST_F(BufferObjTest, test_hnsw_index_buffer_obj_shutdown) { auto append_status = txn->Append(table_entry, data_block); ASSERT_TRUE(append_status.ok()); - txn_mgr->CommitTxn(txn); + last_commit_ts = txn_mgr->CommitTxn(txn); } } + WaitFlushDeltaOp(storage, last_commit_ts); // Get Index { auto *txn = txn_mgr->BeginTxn(MakeUnique("get index")); diff --git a/src/unit_test/storage/invertedindex/search/query_match.cpp b/src/unit_test/storage/invertedindex/search/query_match.cpp index c1ea674a4e..897a1f22ae 100644 --- a/src/unit_test/storage/invertedindex/search/query_match.cpp +++ b/src/unit_test/storage/invertedindex/search/query_match.cpp @@ -206,8 +206,7 @@ void QueryMatchTest::CreateIndex(const String& db_name, const String& table_name columns.emplace_back(idx); } - TxnTimeStamp begin_ts = txn_idx->BeginTS(); - SharedPtr block_index = table_entry->GetBlockIndex(begin_ts); + SharedPtr block_index = table_entry->GetBlockIndex(txn_idx); u64 table_idx = 0; auto table_ref = MakeShared(table_entry, std::move(columns), block_index, alias, table_idx, names_ptr, types_ptr); @@ -292,12 +291,9 @@ void QueryMatchTest::QueryMatch(const String& db_name, auto [table_index_entry, status_index] = txn->GetIndexByName(db_name, table_name, index_name); EXPECT_TRUE(status_index.ok()); - auto txn_id = txn->TxnID(); - auto begin_ts = txn->BeginTS(); + auto fake_table_ref = BaseTableRef::FakeTableRef(table_entry, txn); - auto fake_table_ref = BaseTableRef::FakeTableRef(table_entry, begin_ts); - - QueryBuilder query_builder(txn_id, begin_ts, fake_table_ref); + QueryBuilder query_builder(txn, fake_table_ref); const Map &column2analyzer = query_builder.GetColumn2Analyzer(); auto match_expr = MakeShared(); diff --git a/src/unit_test/storage/wal/catalog_delta_replay.cpp b/src/unit_test/storage/wal/catalog_delta_replay.cpp index 545b15e2e2..fa0945cced 100644 --- a/src/unit_test/storage/wal/catalog_delta_replay.cpp +++ b/src/unit_test/storage/wal/catalog_delta_replay.cpp @@ -69,7 +69,7 @@ class CatalogDeltaReplayTest : public BaseTest { // wait for at most 10s time_t end = time(nullptr); if (end - start > 10) { - UnrecoverableException("WaitFlushDeltaOp timeout"); + UnrecoverableError("WaitFlushDeltaOp timeout"); } } } @@ -877,8 +877,7 @@ TEST_F(CatalogDeltaReplayTest, replay_table_single_index) { columns.emplace_back(idx); } - TxnTimeStamp begin_ts = txn_idx->BeginTS(); - SharedPtr block_index = table_entry->GetBlockIndex(begin_ts); + SharedPtr block_index = table_entry->GetBlockIndex(txn_idx); u64 table_idx = 0; auto table_ref = MakeShared(table_entry, std::move(columns), block_index, alias, table_idx, names_ptr, types_ptr); @@ -1043,8 +1042,7 @@ TEST_F(CatalogDeltaReplayTest, replay_table_single_index_named_db) { columns.emplace_back(idx); } - TxnTimeStamp begin_ts = txn_idx->BeginTS(); - SharedPtr block_index = table_entry->GetBlockIndex(begin_ts); + SharedPtr block_index = table_entry->GetBlockIndex(txn_idx); u64 table_idx = 0; auto table_ref = MakeShared(table_entry, std::move(columns), block_index, alias, table_idx, names_ptr, types_ptr); @@ -1193,8 +1191,7 @@ TEST_F(CatalogDeltaReplayTest, replay_table_single_index_and_compact) { columns.emplace_back(idx); } - TxnTimeStamp begin_ts = txn_idx->BeginTS(); - SharedPtr block_index = table_entry->GetBlockIndex(begin_ts); + SharedPtr block_index = table_entry->GetBlockIndex(txn_idx); u64 table_idx = 0; auto table_ref = MakeShared(table_entry, std::move(columns), block_index, alias, table_idx, names_ptr, types_ptr); diff --git a/src/unit_test/storage/wal/checkpoint.cpp b/src/unit_test/storage/wal/checkpoint.cpp index 6329990b4d..7159c58956 100644 --- a/src/unit_test/storage/wal/checkpoint.cpp +++ b/src/unit_test/storage/wal/checkpoint.cpp @@ -80,7 +80,7 @@ class CheckpointTest : public BaseTest { } // wait for at most 10s if (end - start > 10) { - UnrecoverableException("WaitFlushDeltaOp timeout"); + UnrecoverableError("WaitFlushDeltaOp timeout"); } } } @@ -274,8 +274,7 @@ TEST_F(CheckpointTest, test_index_replay_with_full_and_delta_checkpoint1) { auto [table_entry, status1] = txn->GetTableByName(*db_name, *table_name); EXPECT_TRUE(status1.ok()); - TxnTimeStamp begin_ts = txn->BeginTS(); - auto table_ref = BaseTableRef::FakeTableRef(table_entry, begin_ts); + auto table_ref = BaseTableRef::FakeTableRef(table_entry, txn); auto [table_index_entry, status2] = txn->CreateIndexDef(table_entry, index_base, ConflictType::kError); EXPECT_TRUE(status2.ok()); auto status3 = txn->CreateIndexPrepare(table_index_entry, table_ref.get(), false); @@ -316,8 +315,7 @@ TEST_F(CheckpointTest, test_index_replay_with_full_and_delta_checkpoint1) { auto [table_entry, status1] = txn->GetTableByName(*db_name, *table_name); EXPECT_TRUE(status1.ok()); - TxnTimeStamp begin_ts = txn->BeginTS(); - auto table_ref = BaseTableRef::FakeTableRef(table_entry, begin_ts); + auto table_ref = BaseTableRef::FakeTableRef(table_entry, txn); auto [table_index_entry, status2] = txn->CreateIndexDef(table_entry, index_base, ConflictType::kError); EXPECT_TRUE(status2.ok()); auto status3 = txn->CreateIndexPrepare(table_index_entry, table_ref.get(), false); @@ -384,8 +382,7 @@ TEST_F(CheckpointTest, test_index_replay_with_full_and_delta_checkpoint2) { auto [table_entry, status1] = txn->GetTableByName(*db_name, *table_name); EXPECT_TRUE(status1.ok()); - TxnTimeStamp begin_ts = txn->BeginTS(); - auto table_ref = BaseTableRef::FakeTableRef(table_entry, begin_ts); + auto table_ref = BaseTableRef::FakeTableRef(table_entry, txn); auto [table_index_entry, status2] = txn->CreateIndexDef(table_entry, index_base, ConflictType::kError); EXPECT_TRUE(status2.ok()); auto status3 = txn->CreateIndexPrepare(table_index_entry, table_ref.get(), false); diff --git a/src/unit_test/storage/wal/wal_replay.cpp b/src/unit_test/storage/wal/wal_replay.cpp index fd571e8870..8da6f5a3af 100644 --- a/src/unit_test/storage/wal/wal_replay.cpp +++ b/src/unit_test/storage/wal/wal_replay.cpp @@ -846,7 +846,7 @@ TEST_F(WalReplayTest, wal_replay_create_index_IvfFlat) { auto [table_entry, table_status] = txn->GetTableByName(db_name, table_name); EXPECT_EQ(table_status.ok(), true); { - auto table_ref = BaseTableRef::FakeTableRef(table_entry, txn->BeginTS()); + auto table_ref = BaseTableRef::FakeTableRef(table_entry, txn); auto result = txn->CreateIndexDef(table_entry, index_base_ivf, conflict_type); auto *table_index_entry = std::get<0>(result); auto status = std::get<1>(result); @@ -953,7 +953,7 @@ TEST_F(WalReplayTest, wal_replay_create_index_hnsw) { auto [table_entry, table_status] = txn->GetTableByName(db_name, table_name); EXPECT_EQ(table_status.ok(), true); { - auto table_ref = BaseTableRef::FakeTableRef(table_entry, txn->BeginTS()); + auto table_ref = BaseTableRef::FakeTableRef(table_entry, txn); auto result = txn->CreateIndexDef(table_entry, index_base_hnsw, conflict_type); auto *table_index_entry = std::get<0>(result); auto status = std::get<1>(result);