Skip to content

Commit da54fc2

Browse files
authored
fix data race in PhysicalFilter (infiniflow#548)
1 parent b925ebb commit da54fc2

File tree

4 files changed

+10
-29
lines changed

4 files changed

+10
-29
lines changed

src/executor/expression/expression_selector.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import data_type;
3232

3333
import infinity_exception;
3434

35-
3635
namespace infinity {
3736

3837
SizeT ExpressionSelector::Select(const SharedPtr<BaseExpression> &expr,
@@ -48,8 +47,8 @@ SizeT ExpressionSelector::Select(const SharedPtr<BaseExpression> &expr,
4847

4948
Select(expr, state, count, input_select, output_true_select, output_false_select);
5049

51-
output_data_block->UnInit();
5250
// Shrink the input data block into output data block
51+
// this Init function will throw if output_data_block is already initialized before
5352
output_data_block->Init(input_data_block, output_true_select);
5453
return output_true_select->Size();
5554
}

src/executor/operator/physical_filter.cpp

+7-8
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
module;
1616

17+
module physical_filter;
18+
1719
import stl;
1820
import txn;
1921
import query_context;
@@ -23,14 +25,13 @@ import data_table;
2325
import physical_operator_type;
2426
import operator_state;
2527
import expression_state;
28+
import expression_selector;
2629
import data_block;
2730
import logger;
2831
import third_party;
2932

3033
import infinity_exception;
3134

32-
module physical_filter;
33-
3435
namespace infinity {
3536

3637
void PhysicalFilter::Init() {
@@ -66,19 +67,17 @@ bool PhysicalFilter::Execute(QueryContext *, OperatorState *operator_state) {
6667

6768
for(SizeT block_idx = 0; block_idx < input_block_count; ++ block_idx) {
6869

70+
// create uninitialized data block for output
6971
UniquePtr<DataBlock> data_block = DataBlock::MakeUniquePtr();
70-
data_block->Init(*GetOutputTypes());
7172
DataBlock* output_data_block = data_block.get();
7273
operator_state->data_block_array_.emplace_back(std::move(data_block));
7374

7475
SharedPtr<ExpressionState> condition_state = ExpressionState::CreateState(condition_);
7576
DataBlock* input_data_block = prev_op_state->data_block_array_[block_idx].get();
7677

77-
SizeT selected_count = selector_.Select(condition_,
78-
condition_state,
79-
input_data_block,
80-
output_data_block,
81-
input_data_block->row_count());
78+
// selector contains a pointer to input data, which should not be shared by multiple tasks
79+
ExpressionSelector selector;
80+
SizeT selected_count = selector.Select(condition_, condition_state, input_data_block, output_data_block, input_data_block->row_count());
8281

8382
LOG_TRACE(fmt::format("{} rows after filter", selected_count));
8483
}

src/executor/operator/physical_filter.cppm

-5
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ import physical_operator;
2424
import physical_operator_type;
2525
import base_expression;
2626
import data_table;
27-
import expression_evaluator;
28-
import expression_selector;
2927
import load_meta;
3028
import infinity_exception;
3129
import internal_types;
@@ -55,9 +53,6 @@ public:
5553
private:
5654
SharedPtr<BaseExpression> condition_;
5755

58-
ExpressionEvaluator executor_;
59-
ExpressionSelector selector_;
60-
6156
SharedPtr<DataTable> input_table_{};
6257
};
6358

src/executor/operator/physical_index_scan.cpp

+2-14
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,12 @@ module physical_index_scan;
2222
import query_context;
2323
import operator_state;
2424
import default_values;
25-
import base_expression;
26-
import expression_type;
27-
import value_expression;
28-
import column_expression;
29-
import cast_expression;
30-
import function_expression;
31-
import expression_evaluator;
32-
import expression_state;
33-
import column_vector;
25+
import buffer_handle;
3426
import infinity_exception;
3527
import logger;
3628
import third_party;
3729
import txn;
3830
import data_block;
39-
import default_values;
40-
import buffer_handle;
4131
import secondary_index_scan_execute_expression;
4232
import logical_type;
4333
import segment_column_index_entry;
@@ -465,7 +455,6 @@ class FilterResult {
465455
// delete_filter: return false if the row is deleted
466456
std::visit(Overload{[&](const Bitmask &bitmask) {
467457
u32 output_block_row_id = 0;
468-
u32 output_block_idx = 0;
469458
DataBlock *output_block_ptr = output_data_blocks.back().get();
470459
// TODO: 64 bit in a loop?
471460
const u32 segment_row_count = SegmentRowCount();
@@ -495,8 +484,7 @@ class FilterResult {
495484
},
496485
[&](const Vector<u32> &selected_rows) {
497486
u32 output_block_row_id = 0;
498-
u32 output_block_idx = 0;
499-
DataBlock *output_block_ptr = output_data_blocks[output_block_idx++].get();
487+
DataBlock *output_block_ptr = output_data_blocks.back().get();
500488
for (u32 segment_offset : selected_rows) {
501489
if (!delete_filter(segment_offset)) {
502490
// deleted

0 commit comments

Comments
 (0)