Skip to content

Commit

Permalink
Fix: rebase compile fail.
Browse files Browse the repository at this point in the history
  • Loading branch information
small-turtle-1 committed May 8, 2024
1 parent 63c46b8 commit 941c85d
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 19 deletions.
19 changes: 9 additions & 10 deletions src/executor/operator/physical_index_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,14 +452,13 @@ struct FilterResult {
}

template <typename ColumnValueType>
inline void
ExecuteSingleRangeT(const FilterIntervalRangeT<ColumnValueType> &interval_range, SegmentIndexEntry &index_entry, const TxnTimeStamp ts) {
inline void ExecuteSingleRangeT(const FilterIntervalRangeT<ColumnValueType> &interval_range, SegmentIndexEntry &index_entry, Txn *txn) {
Vector<UniquePtr<TrunkReader<ColumnValueType>>> trunk_readers;
Tuple<Vector<SharedPtr<ChunkIndexEntry>>, SharedPtr<SecondaryIndexInMem>> chunks_snapshot = index_entry.GetSecondaryIndexSnapshot();
const u32 segment_row_count = SegmentRowCount();
auto &[chunk_index_entries, memory_secondary_index] = chunks_snapshot;
for (const auto &chunk_index_entry : chunk_index_entries) {
if (chunk_index_entry->CheckVisible(ts)) {
if (chunk_index_entry->CheckVisible(txn)) {
trunk_readers.emplace_back(MakeUnique<TrunkReaderT<ColumnValueType>>(segment_row_count, chunk_index_entry));
}
}
Expand Down Expand Up @@ -492,7 +491,7 @@ struct FilterResult {
inline void ExecuteSingleRange(const HashMap<ColumnID, TableIndexEntry *> &column_index_map,
const FilterExecuteSingleRange &single_range,
SegmentID segment_id,
const TxnTimeStamp ts) {
Txn *txn) {
// step 1. check if range is empty
if (single_range.IsEmpty()) {
return SetEmptyResult();
Expand All @@ -504,7 +503,7 @@ struct FilterResult {
// step 3. search index
auto &interval_range_variant = single_range.GetIntervalRange();
std::visit(Overload{[&]<typename ColumnValueType>(const FilterIntervalRangeT<ColumnValueType> &interval_range) {
ExecuteSingleRangeT(interval_range, index_entry, ts);
ExecuteSingleRangeT(interval_range, index_entry, txn);
},
[](const std::monostate &empty) {
UnrecoverableError("FilterResult::ExecuteSingleRange(): class member interval_range_ not initialized!");
Expand Down Expand Up @@ -598,7 +597,7 @@ FilterResult SolveSecondaryIndexFilterInner(const Vector<FilterExecuteElem> &fil
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count,
const TxnTimeStamp ts) {
Txn *txn) {
Vector<FilterResult> result_stack;
// execute filter_execute_command_ (Reverse Polish notation)
for (auto const &elem : filter_execute_command) {
Expand Down Expand Up @@ -628,7 +627,7 @@ FilterResult SolveSecondaryIndexFilterInner(const Vector<FilterExecuteElem> &fil
},
[&](const FilterExecuteSingleRange &single_range) {
result_stack.emplace_back(segment_row_count, segment_row_actual_count);
result_stack.back().ExecuteSingleRange(column_index_map, single_range, segment_id, ts);
result_stack.back().ExecuteSingleRange(column_index_map, single_range, segment_id, txn);
}},
elem);
}
Expand All @@ -644,13 +643,13 @@ std::variant<Vector<u32>, Bitmask> SolveSecondaryIndexFilter(const Vector<Filter
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count,
const TxnTimeStamp ts) {
Txn *txn) {
if (filter_execute_command.empty()) {
// return all true
return std::variant<Vector<u32>, Bitmask>(std::in_place_type<Bitmask>);
}
auto result =
SolveSecondaryIndexFilterInner(filter_execute_command, column_index_map, segment_id, segment_row_count, segment_row_actual_count, ts);
SolveSecondaryIndexFilterInner(filter_execute_command, column_index_map, segment_id, segment_row_count, segment_row_actual_count, txn);
return std::move(result.selected_rows_);
}

Expand Down Expand Up @@ -709,7 +708,7 @@ void PhysicalIndexScan::ExecuteInternal(QueryContext *query_context, IndexScanOp
DeleteFilter delete_filter(segment_entry, begin_ts, segment_entry->row_count(begin_ts));
// output
const auto result =
SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count, begin_ts);
SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count, txn);
result.Output(output_data_blocks, segment_id, delete_filter);

LOG_TRACE(fmt::format("IndexScan: job number: {}, segment_ids.size(): {}, finished", next_idx, segment_ids.size()));
Expand Down
2 changes: 1 addition & 1 deletion src/executor/operator/physical_index_scan.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,6 @@ export std::variant<Vector<u32>, Bitmask> SolveSecondaryIndexFilter(const Vector
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count,
const TxnTimeStamp ts);
Txn *txn);

} // namespace infinity
2 changes: 1 addition & 1 deletion src/executor/operator/physical_knn_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
Txn *txn = query_context->GetTxn();
TxnTimeStamp begin_ts = txn->BeginTS();

if (!common_query_filter_->TryFinishBuild(begin_ts, query_context->GetTxn()->buffer_mgr())) {
if (!common_query_filter_->TryFinishBuild(txn) {
// not ready, abort and wait for next time
return;
}
Expand Down
3 changes: 2 additions & 1 deletion src/executor/operator/physical_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,8 @@ bool PhysicalMatch::Execute(QueryContext *query_context, OperatorState *operator
auto start_time = std::chrono::high_resolution_clock::now();
assert(common_query_filter_);
{
bool try_result = common_query_filter_->TryFinishBuild(query_context->GetTxn()->BeginTS(), query_context->GetTxn()->buffer_mgr());
Txn *txn = query_context->GetTxn()
bool try_result = common_query_filter_->TryFinishBuild(txn);
auto finish_filter_time = std::chrono::high_resolution_clock::now();
std::chrono::duration<float, std::milli> filter_duration = finish_filter_time - start_time;
LOG_TRACE(fmt::format("PhysicalMatch Prepare: Filter time: {} ms", filter_duration.count()));
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/segment_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_
return nullptr;
}
for (const auto &chunk_index_entry : chunk_index_entries_) {
if (chunk_index_entry->CheckVisible(begin_ts)) {
if (chunk_index_entry->CheckVisible(txn)) {
row_count += chunk_index_entry->GetRowCount();
old_chunks.push_back(chunk_index_entry.get());
}
Expand Down
6 changes: 4 additions & 2 deletions src/storage/secondary_index/common_query_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ CommonQueryFilter::CommonQueryFilter(SharedPtr<BaseExpression> original_filter,
}
}

void CommonQueryFilter::BuildFilter(u32 task_id, TxnTimeStamp begin_ts, BufferManager *buffer_mgr) {
void CommonQueryFilter::BuildFilter(u32 task_id, Txn *txn) {
auto *buffer_mgr = txn->buffer_mgr();
TxnTimeStamp begin_ts = txn->BeginTS();
const HashMap<SegmentID, SegmentEntry *> &segment_index = base_table_ref_->block_index_->segment_index_;
const SegmentID segment_id = tasks_[task_id];
const SegmentEntry *segment_entry = segment_index.at(segment_id);
Expand All @@ -145,7 +147,7 @@ void CommonQueryFilter::BuildFilter(u32 task_id, TxnTimeStamp begin_ts, BufferMa
segment_id,
segment_row_count,
segment_actual_row_count,
begin_ts);
txn);
if (std::visit(Overload{[](const Vector<u32> &v) -> bool { return v.empty(); }, [](const Bitmask &) -> bool { return false; }}, result_elem)) {
// empty result
return;
Expand Down
7 changes: 4 additions & 3 deletions src/storage/secondary_index/common_query_filter.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class BaseTableRef;
class BaseExpression;
class QueryContext;
class BufferManager;
class Txn;
struct TableIndexEntry;

export struct CommonQueryFilter {
Expand Down Expand Up @@ -61,7 +62,7 @@ export struct CommonQueryFilter {
// 2. return true if the filter is available for query
// if other threads are building the filter, the filter is not available for query
// in this case, physical operator should return early and wait for next scheduling
bool TryFinishBuild(TxnTimeStamp begin_ts, BufferManager *buffer_mgr) {
bool TryFinishBuild(Txn *txn) {
if (finish_build_.test(std::memory_order_acquire)) {
return true;
}
Expand All @@ -74,7 +75,7 @@ export struct CommonQueryFilter {
}
task_id = begin_task_num_++;
}
BuildFilter(task_id, begin_ts, buffer_mgr);
BuildFilter(task_id, txn);
if (++end_task_num_ == total_task_num_) {
finish_build_.test_and_set(std::memory_order_release);
break;
Expand All @@ -89,7 +90,7 @@ export struct CommonQueryFilter {
void TryApplySecondaryIndexFilterOptimizer(QueryContext *query_context);

private:
void BuildFilter(u32 task_id, TxnTimeStamp begin_ts, BufferManager *buffer_mgr);
void BuildFilter(u32 task_id, Txn *txn);
};

} // namespace infinity

0 comments on commit 941c85d

Please sign in to comment.