Fix write conflict #1170

Merged: 18 commits, May 7, 2024
12 changes: 8 additions & 4 deletions .github/workflows/tests.yml
@@ -59,7 +59,10 @@ jobs:

- name: Unit test debug version
if: ${{ !cancelled() && !failure() }}
- run: sudo docker exec infinity_build bash -c "mkdir -p /var/infinity && cd /infinity/ && cmake-build-debug/src/test_main"
+ run: sudo docker exec infinity_build bash -c "mkdir -p /var/infinity && cd /infinity/ && cmake-build-debug/src/test_main > unittest_debug.log 2>&1"

+ - name: Collect infinity unit test debug output
+ run: cat unittest_debug.log 2>/dev/null || true

- name: Install pysdk
if: ${{ !cancelled() && !failure() }}
@@ -113,7 +116,6 @@ jobs:
- name: Collect infinity debug output
# GitHub Actions interprets output lines starting with "Error" as error messages, and it automatically sets the step status to failed when such lines are detected.
if: ${{ !cancelled() }} # always run this step even if previous steps failed
run: cat debug.log 2>/dev/null || true

release_tests:
@@ -154,7 +156,10 @@ jobs:

- name: Unit test release version
if: ${{ !cancelled() && !failure() }}
- run: sudo docker exec infinity_build bash -c "mkdir -p /var/infinity && cd /infinity/ && cmake-build-release/src/test_main"
+ run: sudo docker exec infinity_build bash -c "mkdir -p /var/infinity && cd /infinity/ && cmake-build-release/src/test_main > unittest_release.log 2>&1"

+ - name: Collect infinity unit test release output
+ run: cat unittest_release.log 2>/dev/null || true

- name: Install pysdk
if: ${{ !cancelled() && !failure() }}
@@ -207,7 +212,6 @@ jobs:
- name: Collect infinity release output
# GitHub Actions interprets output lines starting with "Error" as error messages, and it automatically sets the step status to failed when such lines are detected.
if: ${{ !cancelled() }} # always run this step even if previous steps failed
run: cat release.log 2>/dev/null || true

- name: Prepare sift dataset
4 changes: 1 addition & 3 deletions python/parallel_test/test_chaos.py
@@ -21,9 +21,7 @@


class TestIndexParallel:

- @pytest.mark.skip(
- reason="Decrease row count exceed actual row count@src/storage/meta/entry/segment_entry.cppm:184, and update vector fail due to 'Not support to convert Embedding to Embedding'")
+ #@pytest.mark.skip(reason="To pass benchmark, use wrong row count in knn scan")
def test_chaos(self, get_infinity_connection_pool):
data = read_out_data()
connection_pool = get_infinity_connection_pool
2 changes: 0 additions & 2 deletions python/parallel_test/test_ddl_and_insert_delete.py
@@ -17,8 +17,6 @@


class TestInsertDeleteUpdate:
- @pytest.mark.skip(
- reason="#issue 1087 Decrease row count exceed actual row count@src/storage/meta/entry/segment_entry.cppm:184")
def test_insert_delete_ddl_parallel(self, get_infinity_connection_pool):
connection_pool = get_infinity_connection_pool

1 change: 0 additions & 1 deletion python/parallel_test/test_insert_delete_parallel.py
@@ -16,7 +16,6 @@


class TestInsertDeleteParallel:
- # @pytest.mark.skip(reason="varchar bug, No such chunk in heap")
def test_insert_and_delete_parallel(self, get_infinity_connection_pool):
connection_pool = get_infinity_connection_pool
infinity_obj = connection_pool.get_conn()
2 changes: 0 additions & 2 deletions python/parallel_test/test_insert_delete_update.py
@@ -13,8 +13,6 @@


class TestInsertDeleteUpdate:
- @pytest.mark.skip(
- reason="#issue 1087 Decrease row count exceed actual row count@src/storage/meta/entry/segment_entry.cppm:184")
def test_insert_delete_update_parallel_vec(self, get_infinity_connection_pool):
connection_pool = get_infinity_connection_pool
infinity_obj = connection_pool.get_conn()
1 change: 1 addition & 0 deletions src/common/stl.cppm
@@ -89,6 +89,7 @@ export namespace std {
using std::is_same;
using std::fill;
using std::lower_bound;
+ using std::upper_bound;

using std::condition_variable;
using std::condition_variable_any;
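
Note: stl.cppm re-exports standard-library names into the project's module, and the only change is adding std::upper_bound. The diff does not show the call site, so the motivation is an assumption here: upper_bound is the natural primitive for snapshot lookups over data ordered by commit timestamp, e.g. "how many rows were visible at begin_ts". A minimal sketch with invented names:

#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical bookkeeping: one mark per committed append, ordered by timestamp.
struct AppendMark {
    uint64_t commit_ts;
    uint32_t row_count; // rows visible once this append committed
};

// Rows visible at begin_ts = last mark with commit_ts <= begin_ts.
uint32_t VisibleRowCount(const std::vector<AppendMark> &marks, uint64_t begin_ts) {
    auto it = std::upper_bound(marks.begin(), marks.end(), begin_ts,
                               [](uint64_t ts, const AppendMark &m) { return ts < m.commit_ts; });
    return it == marks.begin() ? 0 : (it - 1)->row_count;
}
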
4 changes: 1 addition & 3 deletions src/executor/operator/physical_delete.cpp
@@ -43,8 +43,6 @@ bool PhysicalDelete::Execute(QueryContext *query_context, OperatorState *operato
for(SizeT block_idx = 0; block_idx < data_block_count; ++ block_idx) {
DataBlock *input_data_block_ptr = prev_op_state->data_block_array_[block_idx].get();
auto txn = query_context->GetTxn();
- const String& db_name = *table_entry_ptr_->GetDBName();
- auto table_name = table_entry_ptr_->GetTableName();
Vector<RowID> row_ids;
for (SizeT i = 0; i < input_data_block_ptr->column_count(); i++) {
SharedPtr<ColumnVector> column_vector = input_data_block_ptr->column_vectors[i];
@@ -55,7 +53,7 @@
}
}
if (!row_ids.empty()) {
- txn->Delete(db_name, *table_name, row_ids); // TODO: segment id in `row_ids` is fixed.
+ txn->Delete(table_entry_ptr_, row_ids); // TODO: segment id in `row_ids` is fixed.
DeleteOperatorState* delete_operator_state = static_cast<DeleteOperatorState*>(operator_state);
++ delete_operator_state->count_;
delete_operator_state->sum_ += row_ids.size();
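
Note: this file shows the API shift at the core of the fix: Txn::Delete (and Txn::Append/Txn::Import in the files below) now receives the TableEntry* the plan was bound to instead of database and table names. Re-resolving names inside the transaction can observe a different catalog entry under concurrent DDL or compaction, which is a plausible source of the write conflicts this PR targets. A sketch of the signature change, with shapes inferred from the call sites in this diff rather than the actual headers:

#include <cstdint>
#include <string>
#include <vector>

struct TableEntry; // resolved catalog object
struct RowID { uint32_t segment_id_; uint32_t segment_offset_; };

struct Txn {
    // Before: name-based; the txn looked the table up again by name,
    // which could bind to a different entry than the one the plan read.
    void Delete(const std::string &db_name, const std::string &table_name,
                const std::vector<RowID> &row_ids);
    // After (this PR): entry-based; the write is applied to the exact
    // entry the executor already holds, so no second resolution can diverge.
    void Delete(TableEntry *table_entry, const std::vector<RowID> &row_ids);
};
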
5 changes: 1 addition & 4 deletions src/executor/operator/physical_import.cpp
@@ -619,10 +619,7 @@ void PhysicalImport::JSONLRowHandler(const nlohmann::json &line_json, Vector<Col

void PhysicalImport::SaveSegmentData(TableEntry *table_entry, Txn *txn, SharedPtr<SegmentEntry> segment_entry) {
segment_entry->FlushNewData();

- const String &db_name = *table_entry->GetDBName();
- const String &table_name = *table_entry->GetTableName();
- txn->Import(db_name, table_name, std::move(segment_entry));
+ txn->Import(table_entry, std::move(segment_entry));
}

} // namespace infinity
2 changes: 1 addition & 1 deletion src/executor/operator/physical_index_scan.cpp
@@ -625,7 +625,7 @@ void PhysicalIndexScan::ExecuteInternal(QueryContext *query_context, IndexScanOp
const u32 segment_row_actual_count = segment_entry->actual_row_count(); // count of rows in segment, exclude deleted rows

// prepare filter for deleted rows
- DeleteFilter delete_filter(segment_entry, begin_ts);
+ DeleteFilter delete_filter(segment_entry, begin_ts, segment_entry->row_count(begin_ts));
// output
auto result = SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count);
result.Output(output_data_blocks, segment_id, delete_filter);
4 changes: 1 addition & 3 deletions src/executor/operator/physical_insert.cpp
@@ -86,9 +86,7 @@ bool PhysicalInsert::Execute(QueryContext *query_context, OperatorState *operato
output_block->Finalize();

auto *txn = query_context->GetTxn();
- const String &db_name = *table_entry_->GetDBName();
- const String &table_name = *table_entry_->GetTableName();
- txn->Append(db_name, table_name, output_block);
+ txn->Append(table_entry_, output_block);

UniquePtr<String> result_msg = MakeUnique<String>(fmt::format("INSERTED {} Rows", output_block->row_count()));
if (operator_state == nullptr) {
20 changes: 15 additions & 5 deletions src/executor/operator/physical_knn_scan.cpp
@@ -395,8 +395,9 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
IVFFlatScan(filter);
}
} else {
+ SegmentOffset max_segment_offset = block_index->GetSegmentOffset(segment_id);
if (segment_entry->CheckAnyDelete(begin_ts)) {
- DeleteFilter filter(segment_entry, begin_ts);
+ DeleteFilter filter(segment_entry, begin_ts, max_segment_offset);
IVFFlatScan(filter);
} else {
IVFFlatScan();
@@ -407,7 +408,7 @@
case IndexType::kHnsw: {
const auto *index_hnsw = static_cast<const IndexHnsw *>(segment_index_entry->table_index_entry()->index_base());

- auto hnsw_search = [&](BufferHandle index_handle, bool with_lock) {
+ auto hnsw_search = [&](BufferHandle index_handle, bool with_lock, int chunk_id = -1) {
AbstractHnsw<f32, SegmentOffset> abstract_hnsw(index_handle.GetDataMut(), index_hnsw);

for (const auto &opt_param : knn_scan_shared_data->opt_params_) {
@@ -436,15 +437,16 @@
abstract_hnsw.KnnSearch(query, knn_scan_shared_data->topk_, filter, with_lock);
}
} else {
+ SegmentOffset max_segment_offset = block_index->GetSegmentOffset(segment_id);
if (segment_entry->CheckAnyDelete(begin_ts)) {
- DeleteFilter filter(segment_entry, begin_ts);
+ DeleteFilter filter(segment_entry, begin_ts, max_segment_offset);
std::tie(result_n1, d_ptr, l_ptr) =
abstract_hnsw.KnnSearch(query, knn_scan_shared_data->topk_, filter, with_lock);
} else {
if (!with_lock) {
std::tie(result_n1, d_ptr, l_ptr) = abstract_hnsw.KnnSearch(query, knn_scan_shared_data->topk_, false);
} else {
- AppendFilter filter(block_index->GetSegmentOffset(segment_id));
+ AppendFilter filter(max_segment_offset);
std::tie(result_n1, d_ptr, l_ptr) = abstract_hnsw.KnnSearch(query, knn_scan_shared_data->topk_, filter, true);
}
}
@@ -476,16 +478,24 @@
auto row_ids = MakeUniqueForOverwrite<RowID[]>(result_n);
for (i64 i = 0; i < result_n; ++i) {
row_ids[i] = RowID{segment_id, l_ptr[i]};

+ BlockID block_id = l_ptr[i] / DEFAULT_BLOCK_CAPACITY;
+ auto *block_entry = block_index->GetBlockEntry(segment_id, block_id);
+ if (block_entry == nullptr) {
+ UnrecoverableError(
+ fmt::format("Cannot find segment id: {}, block id: {}, index chunk is {}", segment_id, block_id, chunk_id));
+ }
}
merge_heap->Search(0, d_ptr.get(), row_ids.get(), result_n);
}
};

auto [chunk_index_entries, memory_index_entry] = segment_index_entry->GetHnswIndexSnapshot();
+ int i = 0;
for (auto &chunk_index_entry : chunk_index_entries) {
if (chunk_index_entry->CheckVisible(begin_ts)) {
BufferHandle index_handle = chunk_index_entry->GetIndex();
- hnsw_search(index_handle, false);
+ hnsw_search(index_handle, false, i++);
}
}
if (memory_index_entry.get() != nullptr) {
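
Note: the result loop now verifies that every offset an HNSW chunk returns maps to a block that exists in the query's block index, and the new chunk_id lambda parameter exists only so the UnrecoverableError can name the offending chunk. A small self-contained check of the offset-to-block arithmetic; the block capacity value is an assumption for illustration, not taken from this diff:

#include <cassert>
#include <cstdint>

constexpr uint32_t kBlockCapacity = 8192; // stand-in for DEFAULT_BLOCK_CAPACITY

int main() {
    uint32_t segment_offset = 20000;                          // offset returned by the index
    uint32_t block_id = segment_offset / kBlockCapacity;      // -> 2
    uint32_t block_offset = segment_offset % kBlockCapacity;  // -> 3616
    assert(block_id == 2 && block_offset == 3616);
    // If block_index->GetBlockEntry(segment_id, block_id) is null, the chunk
    // handed back a row the snapshot cannot see; the new check fails fast
    // instead of dereferencing a null BlockEntry further down the line.
    return 0;
}
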
3 changes: 2 additions & 1 deletion src/executor/operator/physical_match.cpp
@@ -152,7 +152,8 @@ class FilterIteratorBase : public QueryIteratorT {
return {false, INVALID_ROWID};
}
if (cache_need_check_delete_) [[unlikely]] {
- DeleteFilter delete_filter(cache_segment_entry_, common_query_filter_->begin_ts_);
+ SegmentOffset max_segment_offset = cache_segment_offset_;
+ DeleteFilter delete_filter(cache_segment_entry_, common_query_filter_->begin_ts_, max_segment_offset);
if (delete_filter(id.segment_offset_)) {
return {true, id};
}
6 changes: 2 additions & 4 deletions src/executor/operator/physical_update.cpp
@@ -47,8 +47,6 @@ bool PhysicalUpdate::Execute(QueryContext *query_context, OperatorState *operato
DataBlock *input_data_block_ptr = prev_op_state->data_block_array_[block_idx].get();

auto txn = query_context->GetTxn();
- const String& db_name = *table_entry_ptr_->GetDBName();
- auto table_name = table_entry_ptr_->GetTableName();
Vector<RowID> row_ids;
Vector<SharedPtr<ColumnVector>> column_vectors;
for (SizeT i = 0; i < input_data_block_ptr->column_count(); i++) {
@@ -76,8 +74,8 @@

SharedPtr<DataBlock> output_data_block = DataBlock::Make();
output_data_block->Init(column_vectors);
- txn->Append(db_name, *table_name, output_data_block);
- txn->Delete(db_name, *table_name, row_ids);
+ txn->Append(table_entry_ptr_, output_data_block);
+ txn->Delete(table_entry_ptr_, row_ids);

UpdateOperatorState* update_operator_state = static_cast<UpdateOperatorState*>(operator_state);
++ update_operator_state->count_;
12 changes: 9 additions & 3 deletions src/function/table/knn_filter.cppm
@@ -47,20 +47,26 @@

export class DeleteFilter final : public FilterBase<SegmentOffset> {
public:
- explicit DeleteFilter(const SegmentEntry *segment, TxnTimeStamp query_ts) : segment_(segment), query_ts_(query_ts) {}
+ explicit DeleteFilter(const SegmentEntry *segment, TxnTimeStamp query_ts, SegmentOffset max_segment_offset)
+ : segment_(segment), query_ts_(query_ts), max_segment_offset_(max_segment_offset) {}

- bool operator()(const SegmentOffset &segment_offset) const final { return segment_->CheckRowVisible(segment_offset, query_ts_); }
+ bool operator()(const SegmentOffset &segment_offset) const final {
+ bool check_append = max_segment_offset_ == 0;
+ return segment_offset <= max_segment_offset_ && segment_->CheckRowVisible(segment_offset, query_ts_, check_append);
+ }

private:
const SegmentEntry *const segment_;

const TxnTimeStamp query_ts_;

+ const SegmentOffset max_segment_offset_;
};

export class DeleteWithBitmaskFilter final : public FilterBase<SegmentOffset> {
public:
explicit DeleteWithBitmaskFilter(const Bitmask &bitmask, const SegmentEntry *segment, TxnTimeStamp query_ts)
- : bitmask_filter_(bitmask), delete_filter_(segment, query_ts) {}
+ : bitmask_filter_(bitmask), delete_filter_(segment, query_ts, 0) {}

bool operator()(const SegmentOffset &segment_offset) const final { return bitmask_filter_(segment_offset) && delete_filter_(segment_offset); }

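
Note: the third DeleteFilter argument does double duty. A nonzero max_segment_offset is the segment's visible row bound at begin_ts (see the call sites above), so offsets past it belong to appends committed after the snapshot and are rejected before any delete check, and check_append is false because everything inside the bound is already known to be an old-enough append. A zero bound, as DeleteWithBitmaskFilter passes, instead sets check_append so CheckRowVisible performs the append-visibility test itself. A self-contained model of the bounded path, with FakeSegment standing in for SegmentEntry:

#include <cstdint>

using SegmentOffset = uint32_t;
using TxnTimeStamp = uint64_t;

struct FakeSegment { // models SegmentEntry for this sketch only
    SegmentOffset first_deleted_; // pretend offsets >= this are deleted
    bool CheckRowVisible(SegmentOffset off, TxnTimeStamp, bool /*check_append*/) const {
        return off < first_deleted_;
    }
};

// Mirrors the operator() added in this diff.
bool Qualifies(const FakeSegment &seg, TxnTimeStamp ts, SegmentOffset max_off, SegmentOffset off) {
    bool check_append = max_off == 0;
    return off <= max_off && seg.CheckRowVisible(off, ts, check_append);
}

int main() {
    FakeSegment seg{900};
    bool live     = Qualifies(seg, 42, 1000, 500);  // true: in bound, not deleted
    bool deleted  = Qualifies(seg, 42, 1000, 950);  // false: deleted at query_ts
    bool appended = Qualifies(seg, 42, 1000, 1200); // false: past the snapshot bound
    return (live && !deleted && !appended) ? 0 : 1;
}
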
45 changes: 13 additions & 32 deletions src/storage/bg_task/compact_segments_task.cpp
@@ -126,30 +126,14 @@ UniquePtr<CompactSegmentsTask> CompactSegmentsTask::MakeTaskWithWholeTable(Table
}

CompactSegmentsTask::CompactSegmentsTask(TableEntry *table_entry, Vector<SegmentEntry *> &&segments, Txn *txn, CompactSegmentsTaskType type)
- : task_type_(type), db_name_(table_entry->GetDBName()), table_name_(table_entry->GetTableName()), commit_ts_(table_entry->commit_ts_),
- segments_(std::move(segments)), txn_(txn) {}

- bool CompactSegmentsTask::Execute() {
- auto [table_entry, status] = txn_->GetTableByName(*db_name_, *table_name_);
- if (!status.ok()) {
- // the table is dropped before the background task is executed.
- if (status.code() == ErrorCode::kTableNotExist) {
- LOG_INFO(fmt::format("Table {} not exist, skip compact", *table_name_));
- return false;
- } else {
- UnrecoverableError("Get table entry failed");
- }
- }
- if (table_entry->commit_ts_ != commit_ts_) {
- // If the table is compacted, the table will be removed.
- return false;
- }
- CompactSegmentsTaskState state(table_entry);
+ : task_type_(type), table_entry_(table_entry), commit_ts_(table_entry->commit_ts_), segments_(std::move(segments)), txn_(txn) {}

+ void CompactSegmentsTask::Execute() {
+ CompactSegmentsTaskState state;
CompactSegments(state);
CreateNewIndex(state);
SaveSegmentsData(state);
ApplyDeletes(state);
- return true;
}

// generate new_table_ref_ to compact
@@ -165,13 +149,7 @@ void CompactSegmentsTask::CompactSegments(CompactSegmentsTaskState &state) {

auto new_segment = CompactSegmentsToOne(state, to_compact_segments);
block_index->Insert(new_segment.get(), UNCOMMIT_TS, false);
- {
- String ss;
- for (auto *segment : to_compact_segments) {
- ss += std::to_string(segment->segment_id()) + " ";
- }
- LOG_TRACE(fmt::format("Table {}, type: {}, compacting segments: {} into {}", *table_name_, (u8)task_type_, ss, new_segment->segment_id()));
- }

segment_data.emplace_back(new_segment, std::move(to_compact_segments));
old_segments.insert(old_segments.end(), to_compact_segments.begin(), to_compact_segments.end());
};
@@ -215,7 +193,7 @@
}

// FIXME: fake table ref here
- state.new_table_ref_ = MakeUnique<BaseTableRef>(state.table_entry_, block_index);
+ state.new_table_ref_ = MakeUnique<BaseTableRef>(table_entry_, block_index);
}

void CompactSegmentsTask::CreateNewIndex(CompactSegmentsTaskState &state) {
@@ -244,7 +222,7 @@
}

void CompactSegmentsTask::SaveSegmentsData(CompactSegmentsTaskState &state) {
- auto *table_entry = state.table_entry_;
+ auto *table_entry = table_entry_;
auto segment_data = std::move(state.segment_data_);

Vector<WalSegmentInfo> segment_infos;
@@ -261,7 +239,8 @@
}
}
txn_->Compact(table_entry, std::move(segment_data), task_type_);
- String db_name = *db_name_, table_name = *table_name_;
+ String db_name = *table_entry->GetDBName();
+ String table_name = *table_entry->GetTableName();
txn_->AddWalCmd(MakeShared<WalCmdCompact>(std::move(db_name), std::move(table_name), std::move(segment_infos), std::move(old_segment_ids)));
}

@@ -281,16 +260,18 @@ void CompactSegmentsTask::ApplyDeletes(CompactSegmentsTaskState &state) {
row_ids.push_back(new_row_id);
}
}
- txn_->Delete(*db_name_, *table_name_, row_ids, false);
+ txn_->Delete(table_entry_, row_ids, false);
}

void CompactSegmentsTask::AddToDelete(SegmentID segment_id, Vector<SegmentOffset> &&delete_offsets) {
std::unique_lock lock(mutex_);
to_deletes_.emplace_back(ToDeleteInfo{segment_id, std::move(delete_offsets)});
}

+ const String &CompactSegmentsTask::table_name() const { return *table_entry_->GetTableName(); }

SharedPtr<SegmentEntry> CompactSegmentsTask::CompactSegmentsToOne(CompactSegmentsTaskState &state, const Vector<SegmentEntry *> &segments) {
- auto *table_entry = state.table_entry_;
+ auto *table_entry = table_entry_;
auto &remapper = state.remapper_;
auto new_segment = SegmentEntry::NewSegmentEntry(table_entry, Catalog::GetNextSegmentID(table_entry), txn_);

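
Note: the task used to carry db_name_/table_name_ plus a commit_ts_ fingerprint, re-resolve the table inside Execute(), and return false when the table had been dropped or replaced in the meantime. It now pins the resolved TableEntry* at construction and Execute() returns void, which removes the name lookup (and the name-based Delete that raced with it) but shifts a lifetime obligation onto the scheduler. A sketch of the new contract, assuming the background-task machinery keeps the entry alive for the task's duration:

#include <utility>
#include <vector>

struct TableEntry;   // resolved catalog object, owned by the catalog
struct SegmentEntry;

class CompactSegmentsTask {
public:
    // The entry is captured once; no name re-resolution at execution time.
    CompactSegmentsTask(TableEntry *entry, std::vector<SegmentEntry *> &&segments)
        : table_entry_(entry), segments_(std::move(segments)) {}

    // void now: "table vanished" can no longer be detected here, so the
    // caller must not run the task past the entry's lifetime.
    void Execute();

private:
    TableEntry *table_entry_; // not owned
    std::vector<SegmentEntry *> segments_;
};
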
12 changes: 3 additions & 9 deletions src/storage/bg_task/compact_segments_task.cppm
@@ -65,11 +65,6 @@ struct ToDeleteInfo {
};

export struct CompactSegmentsTaskState {
- // default copy construct of table ref
- CompactSegmentsTaskState(TableEntry *table_entry) : table_entry_(table_entry) {}

- TableEntry *const table_entry_;

RowIDRemapper remapper_;

Vector<Pair<SharedPtr<SegmentEntry>, Vector<SegmentEntry *>>> segment_data_;
@@ -103,7 +98,7 @@
}
}

- bool Execute();
+ void Execute();

// Called by `SegmentEntry::DeleteData` which is called by wal thread in
// So to_deletes_ is thread-safe.
@@ -124,15 +119,14 @@

public:
// Getter
- const String &table_name() const { return *table_name_; }
+ const String &table_name() const;

private:
SharedPtr<SegmentEntry> CompactSegmentsToOne(CompactSegmentsTaskState &state, const Vector<SegmentEntry *> &segments);

private:
const CompactSegmentsTaskType task_type_;
- SharedPtr<String> db_name_;
- SharedPtr<String> table_name_;
+ TableEntry *table_entry_;
TxnTimeStamp commit_ts_;
Vector<SegmentEntry *> segments_;
