Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parallel readwrite #1191

Merged
merged 5 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions src/executor/operator/physical_index_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,14 +452,13 @@ struct FilterResult {
}

template <typename ColumnValueType>
inline void
ExecuteSingleRangeT(const FilterIntervalRangeT<ColumnValueType> &interval_range, SegmentIndexEntry &index_entry, const TxnTimeStamp ts) {
inline void ExecuteSingleRangeT(const FilterIntervalRangeT<ColumnValueType> &interval_range, SegmentIndexEntry &index_entry, Txn *txn) {
Vector<UniquePtr<TrunkReader<ColumnValueType>>> trunk_readers;
Tuple<Vector<SharedPtr<ChunkIndexEntry>>, SharedPtr<SecondaryIndexInMem>> chunks_snapshot = index_entry.GetSecondaryIndexSnapshot();
const u32 segment_row_count = SegmentRowCount();
auto &[chunk_index_entries, memory_secondary_index] = chunks_snapshot;
for (const auto &chunk_index_entry : chunk_index_entries) {
if (chunk_index_entry->CheckVisible(ts)) {
if (chunk_index_entry->CheckVisible(txn)) {
trunk_readers.emplace_back(MakeUnique<TrunkReaderT<ColumnValueType>>(segment_row_count, chunk_index_entry));
}
}
Expand Down Expand Up @@ -492,7 +491,7 @@ struct FilterResult {
inline void ExecuteSingleRange(const HashMap<ColumnID, TableIndexEntry *> &column_index_map,
const FilterExecuteSingleRange &single_range,
SegmentID segment_id,
const TxnTimeStamp ts) {
Txn *txn) {
// step 1. check if range is empty
if (single_range.IsEmpty()) {
return SetEmptyResult();
Expand All @@ -504,7 +503,7 @@ struct FilterResult {
// step 3. search index
auto &interval_range_variant = single_range.GetIntervalRange();
std::visit(Overload{[&]<typename ColumnValueType>(const FilterIntervalRangeT<ColumnValueType> &interval_range) {
ExecuteSingleRangeT(interval_range, index_entry, ts);
ExecuteSingleRangeT(interval_range, index_entry, txn);
},
[](const std::monostate &empty) {
UnrecoverableError("FilterResult::ExecuteSingleRange(): class member interval_range_ not initialized!");
Expand Down Expand Up @@ -598,7 +597,7 @@ FilterResult SolveSecondaryIndexFilterInner(const Vector<FilterExecuteElem> &fil
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count,
const TxnTimeStamp ts) {
Txn *txn) {
Vector<FilterResult> result_stack;
// execute filter_execute_command_ (Reverse Polish notation)
for (auto const &elem : filter_execute_command) {
Expand Down Expand Up @@ -628,7 +627,7 @@ FilterResult SolveSecondaryIndexFilterInner(const Vector<FilterExecuteElem> &fil
},
[&](const FilterExecuteSingleRange &single_range) {
result_stack.emplace_back(segment_row_count, segment_row_actual_count);
result_stack.back().ExecuteSingleRange(column_index_map, single_range, segment_id, ts);
result_stack.back().ExecuteSingleRange(column_index_map, single_range, segment_id, txn);
}},
elem);
}
Expand All @@ -644,13 +643,13 @@ std::variant<Vector<u32>, Bitmask> SolveSecondaryIndexFilter(const Vector<Filter
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count,
const TxnTimeStamp ts) {
Txn *txn) {
if (filter_execute_command.empty()) {
// return all true
return std::variant<Vector<u32>, Bitmask>(std::in_place_type<Bitmask>);
}
auto result =
SolveSecondaryIndexFilterInner(filter_execute_command, column_index_map, segment_id, segment_row_count, segment_row_actual_count, ts);
SolveSecondaryIndexFilterInner(filter_execute_command, column_index_map, segment_id, segment_row_count, segment_row_actual_count, txn);
return std::move(result.selected_rows_);
}

Expand Down Expand Up @@ -709,7 +708,7 @@ void PhysicalIndexScan::ExecuteInternal(QueryContext *query_context, IndexScanOp
DeleteFilter delete_filter(segment_entry, begin_ts, segment_entry->row_count(begin_ts));
// output
const auto result =
SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count, begin_ts);
SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count, txn);
result.Output(output_data_blocks, segment_id, delete_filter);

LOG_TRACE(fmt::format("IndexScan: job number: {}, segment_ids.size(): {}, finished", next_idx, segment_ids.size()));
Expand Down
4 changes: 3 additions & 1 deletion src/executor/operator/physical_index_scan.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import bitmask;

namespace infinity {

class Txn;

// for int range filter, x > n is equivalent to x >= n + 1
// for float range filter, x > f is equivalent to x >= std::nextafter(f, INFINITY)
// we can use this to simplify the filter
Expand Down Expand Up @@ -110,6 +112,6 @@ export std::variant<Vector<u32>, Bitmask> SolveSecondaryIndexFilter(const Vector
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count,
const TxnTimeStamp ts);
Txn *txn);

} // namespace infinity
9 changes: 5 additions & 4 deletions src/executor/operator/physical_knn_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,10 @@ SizeT PhysicalKnnScan::BlockEntryCount() const { return base_table_ref_->block_i

template <typename DataType, template <typename, typename> typename C>
void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperatorState *operator_state) {
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
Txn *txn = query_context->GetTxn();
TxnTimeStamp begin_ts = txn->BeginTS();

if (!common_query_filter_->TryFinishBuild(begin_ts, query_context->GetTxn()->buffer_mgr())) {
if (!common_query_filter_->TryFinishBuild(txn)) {
// not ready, abort and wait for next time
return;
}
Expand Down Expand Up @@ -484,7 +485,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
if (block_entry == nullptr) {
UnrecoverableError(
fmt::format("Cannot find segment id: {}, block id: {}, index chunk is {}", segment_id, block_id, chunk_id));
}
} // this is for debug
}
merge_heap->Search(0, d_ptr.get(), row_ids.get(), result_n);
}
Expand All @@ -493,7 +494,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
auto [chunk_index_entries, memory_index_entry] = segment_index_entry->GetHnswIndexSnapshot();
int i = 0;
for (auto &chunk_index_entry : chunk_index_entries) {
if (chunk_index_entry->CheckVisible(begin_ts)) {
if (chunk_index_entry->CheckVisible(txn)) {
BufferHandle index_handle = chunk_index_entry->GetIndex();
hnsw_search(index_handle, false, i++);
}
Expand Down
10 changes: 5 additions & 5 deletions src/executor/operator/physical_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module;
module physical_match;

import stl;

import txn;
import query_context;
import operator_state;
import physical_operator;
Expand Down Expand Up @@ -517,9 +517,8 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
auto execute_start_time = std::chrono::high_resolution_clock::now();
// 1. build QueryNode tree
// 1.1 populate column2analyzer
TransactionID txn_id = query_context->GetTxn()->TxnID();
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
QueryBuilder query_builder(txn_id, begin_ts, base_table_ref_);
Txn *txn = query_context->GetTxn();
QueryBuilder query_builder(txn, base_table_ref_);
auto finish_init_query_builder_time = std::chrono::high_resolution_clock::now();
TimeDurationType query_builder_init_duration = finish_init_query_builder_time - execute_start_time;
LOG_TRACE(fmt::format("PhysicalMatch Part 0.1: Init QueryBuilder time: {} ms", query_builder_init_duration.count()));
Expand Down Expand Up @@ -805,7 +804,8 @@ bool PhysicalMatch::Execute(QueryContext *query_context, OperatorState *operator
auto start_time = std::chrono::high_resolution_clock::now();
assert(common_query_filter_);
{
bool try_result = common_query_filter_->TryFinishBuild(query_context->GetTxn()->BeginTS(), query_context->GetTxn()->buffer_mgr());
Txn *txn = query_context->GetTxn();
bool try_result = common_query_filter_->TryFinishBuild(txn);
auto finish_filter_time = std::chrono::high_resolution_clock::now();
std::chrono::duration<float, std::milli> filter_duration = finish_filter_time - start_time;
LOG_TRACE(fmt::format("PhysicalMatch Prepare: Filter time: {} ms", filter_duration.count()));
Expand Down
6 changes: 3 additions & 3 deletions src/planner/bound/base_table_ref.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export module base_table_ref;
import stl;
import table_ref;
import table_entry;

import txn;
import table_function;
import block_index;
import internal_types;
Expand All @@ -48,8 +48,8 @@ public:
explicit BaseTableRef(TableEntry *table_entry, SharedPtr<BlockIndex> block_index)
: TableRef(TableRefType::kTable, ""), table_entry_ptr_(table_entry), block_index_(std::move(block_index)) {}

static SharedPtr<BaseTableRef> FakeTableRef(TableEntry *table_entry, TxnTimeStamp ts) {
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(ts);
static SharedPtr<BaseTableRef> FakeTableRef(TableEntry *table_entry, Txn *txn) {
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(txn);
return MakeShared<BaseTableRef>(table_entry, std::move(block_index));
}

Expand Down
6 changes: 3 additions & 3 deletions src/planner/query_binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import data_type;
import logical_type;
import base_entry;
import view_entry;
import txn;

namespace infinity {

Expand Down Expand Up @@ -425,10 +426,9 @@ SharedPtr<BaseTableRef> QueryBinder::BuildBaseTable(QueryContext *query_context,
columns.emplace_back(idx);
}

// TransactionID txn_id = query_context->GetTxn()->TxnID();
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
Txn *txn = query_context->GetTxn();

SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(begin_ts);
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(txn);

u64 table_index = bind_context_ptr_->GenerateTableIndex();
auto table_ref = MakeShared<BaseTableRef>(table_entry, std::move(columns), block_index, alias, table_index, names_ptr, types_ptr);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/bg_task/compact_segments_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void CompactSegmentsTask::CompactSegments(CompactSegmentsTaskState &state) {
}

auto new_segment = CompactSegmentsToOne(state, to_compact_segments);
block_index->Insert(new_segment.get(), UNCOMMIT_TS, false);
block_index->Insert(new_segment.get(), txn_);

segment_data.emplace_back(new_segment, std::move(to_compact_segments));
old_segments.insert(old_segments.end(), to_compact_segments.begin(), to_compact_segments.end());
Expand Down
24 changes: 14 additions & 10 deletions src/storage/common/block_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,36 @@

module;

#include <vector>

module block_index;

import stl;
import segment_entry;
import global_block_id;
import block_iter;
import segment_iter;
import txn;

namespace infinity {

void BlockIndex::Insert(SegmentEntry *segment_entry, TxnTimeStamp timestamp, bool check_ts) {
if (!check_ts || segment_entry->CheckVisible(timestamp)) {
void BlockIndex::Insert(SegmentEntry *segment_entry, Txn *txn) {
if (segment_entry->CheckVisible(txn)) {
u32 segment_id = segment_entry->segment_id();
segments_.emplace_back(segment_entry);
segment_index_.emplace(segment_id, segment_entry);
BlocksInfo blocks_info;

auto block_entry_iter = BlockEntryIter(segment_entry);
for (auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) {
if (timestamp >= block_entry->min_row_ts()) {
blocks_info.block_map_.emplace(block_entry->block_id(), block_entry);
global_blocks_.emplace_back(GlobalBlockID{segment_id, block_entry->block_id()});
{
auto block_guard = segment_entry->GetBlocksGuard();
for (const auto &block_entry : block_guard.block_entries_) {
if (block_entry->CheckVisible(txn)) {
blocks_info.block_map_.emplace(block_entry->block_id(), block_entry.get());
global_blocks_.emplace_back(GlobalBlockID{segment_id, block_entry->block_id()});
}
}
}
blocks_info.segment_offset_ = segment_entry->row_count(timestamp);
// blocks_info.segment_offset_ = segment_entry->row_count(); // use false row count to pass benchmark
TxnTimeStamp begin_ts = txn->BeginTS();
blocks_info.segment_offset_ = segment_entry->row_count(begin_ts);

segment_block_index_.emplace(segment_id, std::move(blocks_info));
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/common/block_index.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace infinity {

struct BlockEntry;
struct SegmentEntry;
class Txn;

export struct BlockIndex {
private:
Expand All @@ -32,7 +33,7 @@ private:
};

public:
void Insert(SegmentEntry *segment_entry, TxnTimeStamp timestamp, bool check_ts = true);
void Insert(SegmentEntry *segment_entry, Txn *txn);

void Reserve(SizeT n);

Expand Down
6 changes: 4 additions & 2 deletions src/storage/invertedindex/column_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ void TableIndexReaderCache::UpdateKnownUpdateTs(TxnTimeStamp ts, std::shared_mut
last_known_update_ts_ = std::max(last_known_update_ts_, ts);
}

IndexReader TableIndexReaderCache::GetIndexReader(TransactionID txn_id, TxnTimeStamp begin_ts, TableEntry *self_table_entry_ptr) {
IndexReader TableIndexReaderCache::GetIndexReader(Txn *txn, TableEntry *self_table_entry_ptr) {
TxnTimeStamp begin_ts = txn->BeginTS();
TransactionID txn_id = txn->TxnID();
IndexReader result;
result.session_pool_ = MakeShared<MemoryPool>();
std::scoped_lock lock(mutex_);
Expand Down Expand Up @@ -176,7 +178,7 @@ IndexReader TableIndexReaderCache::GetIndexReader(TransactionID txn_id, TxnTimeS
optionflag_t flag = index_full_text->flag_;
String index_dir = *(table_index_entry->index_dir());
Map<SegmentID, SharedPtr<SegmentIndexEntry>> index_by_segment =
table_index_entry->GetIndexBySegmentSnapshot(self_table_entry_ptr, begin_ts);
table_index_entry->GetIndexBySegmentSnapshot(self_table_entry_ptr, txn);
column_index_reader->Open(flag, std::move(index_dir), std::move(index_by_segment));
(*result.column_index_readers_)[column_id] = std::move(column_index_reader);
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/invertedindex/column_index_reader.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export module column_index_reader;
namespace infinity {
struct TableEntry;
class BlockMaxTermDocIterator;
class Txn;

export class ColumnIndexReader {
public:
Expand Down Expand Up @@ -75,7 +76,7 @@ export class TableIndexReaderCache {
public:
void UpdateKnownUpdateTs(TxnTimeStamp ts, std::shared_mutex &segment_update_ts_mutex, TxnTimeStamp &segment_update_ts);

IndexReader GetIndexReader(TransactionID txn_id, TxnTimeStamp begin_ts, TableEntry *table_entry_ptr);
IndexReader GetIndexReader(Txn *txn, TableEntry *table_entry_ptr);

private:
std::mutex mutex_;
Expand Down
5 changes: 2 additions & 3 deletions src/storage/invertedindex/search/query_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ import third_party;

namespace infinity {

QueryBuilder::QueryBuilder(TransactionID txn_id, TxnTimeStamp begin_ts, SharedPtr<BaseTableRef> &base_table_ref)
: txn_id_(txn_id), begin_ts_(begin_ts), table_entry_(base_table_ref->table_entry_ptr_),
index_reader_(table_entry_->GetFullTextIndexReader(txn_id_, begin_ts_)) {
QueryBuilder::QueryBuilder(Txn *txn, SharedPtr<BaseTableRef> &base_table_ref)
: table_entry_(base_table_ref->table_entry_ptr_), index_reader_(table_entry_->GetFullTextIndexReader(txn)) {
u64 total_row_count = 0;
for (SegmentEntry *segment_entry : base_table_ref->block_index_->segments_) {
total_row_count += segment_entry->row_count();
Expand Down
5 changes: 2 additions & 3 deletions src/storage/invertedindex/search/query_builder.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import base_table_ref;

namespace infinity {

class Txn;
struct QueryNode;
export struct FullTextQueryContext {
UniquePtr<QueryNode> query_tree_;
Expand All @@ -37,7 +38,7 @@ class EarlyTerminateIterator;

export class QueryBuilder {
public:
QueryBuilder(TransactionID txn_id, TxnTimeStamp begin_ts, SharedPtr<BaseTableRef> &base_table_ref);
QueryBuilder(Txn *txn, SharedPtr<BaseTableRef> &base_table_ref);

~QueryBuilder();

Expand All @@ -50,8 +51,6 @@ public:
inline float Score(RowID doc_id) { return scorer_.Score(doc_id); }

private:
TransactionID txn_id_{};
TxnTimeStamp begin_ts_{};
TableEntry *table_entry_{nullptr};
IndexReader index_reader_;
Scorer scorer_;
Expand Down
16 changes: 10 additions & 6 deletions src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,17 @@ Tuple<SharedPtr<TableEntry>, Status> Catalog::DropTableByName(const String &db_n
return db_entry->DropTable(table_name, conflict_type, txn_id, begin_ts, txn_mgr);
}

Status Catalog::GetTables(const String &db_name, Vector<TableDetail> &output_table_array, TransactionID txn_id, TxnTimeStamp begin_ts) {
Status Catalog::GetTables(const String &db_name, Vector<TableDetail> &output_table_array, Txn *txn) {
TransactionID txn_id = txn->TxnID();
TxnTimeStamp begin_ts = txn->BeginTS();
// Check the db entries
auto [db_entry, status] = this->GetDatabase(db_name, txn_id, begin_ts);
if (!status.ok()) {
// Error
LOG_ERROR(fmt::format("Database: {} is invalid.", db_name));
return status;
}
return db_entry->GetTablesDetail(txn_id, begin_ts, output_table_array);
return db_entry->GetTablesDetail(txn, output_table_array);
}

Tuple<TableEntry *, Status> Catalog::GetTableByName(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts) {
Expand All @@ -229,16 +231,17 @@ Tuple<TableEntry *, Status> Catalog::GetTableByName(const String &db_name, const
return db_entry->GetTableCollection(table_name, txn_id, begin_ts);
}

Tuple<SharedPtr<TableInfo>, Status>
Catalog::GetTableInfo(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts) {
Tuple<SharedPtr<TableInfo>, Status> Catalog::GetTableInfo(const String &db_name, const String &table_name, Txn *txn) {
TransactionID txn_id = txn->TxnID();
TxnTimeStamp begin_ts = txn->BeginTS();
auto [db_entry, status] = this->GetDatabase(db_name, txn_id, begin_ts);
if (!status.ok()) {
// Error
LOG_ERROR(fmt::format("Database: {} is invalid.", db_name));
return {nullptr, status};
}

return db_entry->GetTableInfo(table_name, txn_id, begin_ts);
return db_entry->GetTableInfo(table_name, txn);
}

Status Catalog::RemoveTableEntry(TableEntry *table_entry, TransactionID txn_id) {
Expand Down Expand Up @@ -657,7 +660,8 @@ void Catalog::LoadFromEntryDelta(TxnTimeStamp max_commit_ts, BufferManager *buff
commit_ts,
check_point_ts,
check_point_row_count,
buffer_mgr);
buffer_mgr,
txn_id);

if (merge_flag == MergeFlag::kNew) {
if (!block_filter_binary_data.empty()) {
Expand Down
Loading
Loading