Skip to content

Commit c30d3c6

Browse files
authored
Fix: use checkpoint_row_count to replace row_count (#1136)
### What problem does this PR solve? `SegmentEntry` and `TableEntry` use `row_count_` directly when doing full checkpoint, which will cause error if other txn modified `row_count_` concurrently. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
1 parent 7a00587 commit c30d3c6

File tree

4 files changed

+23
-4
lines changed

4 files changed

+23
-4
lines changed

src/storage/meta/entry/segment_entry.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,8 @@ SharedPtr<BlockEntry> SegmentEntry::GetBlockEntryByID(BlockID block_id) const {
409409
nlohmann::json SegmentEntry::Serialize(TxnTimeStamp max_commit_ts) {
410410
nlohmann::json json_res;
411411

412+
this->checkpoint_row_count_ = 0;
413+
412414
// const field
413415
json_res["segment_dir"] = *this->segment_dir_;
414416
json_res["row_capacity"] = this->row_capacity_;
@@ -420,7 +422,6 @@ nlohmann::json SegmentEntry::Serialize(TxnTimeStamp max_commit_ts) {
420422
json_res["min_row_ts"] = this->min_row_ts_;
421423
json_res["max_row_ts"] = std::min(this->max_row_ts_, max_commit_ts);
422424
json_res["deleted"] = this->deleted_;
423-
json_res["row_count"] = this->row_count_;
424425
json_res["actual_row_count"] = this->actual_row_count_;
425426

426427
json_res["commit_ts"] = TxnTimeStamp(this->commit_ts_);
@@ -436,8 +437,11 @@ nlohmann::json SegmentEntry::Serialize(TxnTimeStamp max_commit_ts) {
436437
if (block_entry->commit_ts_ <= max_commit_ts) {
437438
block_entry->Flush(max_commit_ts);
438439
json_res["block_entries"].emplace_back(block_entry->Serialize(max_commit_ts));
440+
this->checkpoint_row_count_ += block_entry->checkpoint_row_count();
439441
}
440442
}
443+
444+
json_res["row_count"] = this->checkpoint_row_count_;
441445
}
442446
return json_res;
443447
}
@@ -455,6 +459,7 @@ SharedPtr<SegmentEntry> SegmentEntry::Deserialize(const nlohmann::json &segment_
455459
segment_entry->max_row_ts_ = segment_entry_json["max_row_ts"];
456460
segment_entry->row_count_ = segment_entry_json["row_count"];
457461
segment_entry->actual_row_count_ = segment_entry_json["actual_row_count"];
462+
segment_entry->checkpoint_row_count_ = 0;
458463

459464
segment_entry->commit_ts_ = segment_entry_json["commit_ts"];
460465
segment_entry->begin_ts_ = segment_entry_json["begin_ts"];

src/storage/meta/entry/segment_entry.cppm

+6
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ public:
151151
return actual_row_count_;
152152
}
153153

154+
// only used in Serialize(), FullCheckpoint, and no concurrency
155+
SizeT checkpoint_row_count() const {
156+
return checkpoint_row_count_;
157+
}
158+
154159
int Room() const { return this->row_capacity_ - this->row_count(); }
155160

156161
TxnTimeStamp min_row_ts() const { return min_row_ts_; }
@@ -207,6 +212,7 @@ private:
207212

208213
SizeT row_count_{};
209214
SizeT actual_row_count_{}; // not deleted row count
215+
SizeT checkpoint_row_count_{};
210216

211217
TxnTimeStamp min_row_ts_{UNCOMMIT_TS}; // Indicate the commit_ts which create this SegmentEntry
212218
TxnTimeStamp max_row_ts_{0};

src/storage/meta/entry/table_entry.cpp

+9-3
Original file line numberDiff line numberDiff line change
@@ -313,12 +313,10 @@ void TableEntry::Import(SharedPtr<SegmentEntry> segment_entry, Txn *txn) {
313313
{
314314
std::unique_lock lock(this->rw_locker_);
315315
SegmentID segment_id = segment_entry->segment_id();
316-
SizeT row_count = segment_entry->row_count();
317316
auto [_, insert_ok] = this->segment_map_.emplace(segment_id, segment_entry);
318317
if (!insert_ok) {
319318
UnrecoverableError(fmt::format("Insert segment {} failed.", segment_id));
320319
}
321-
this->row_count_ += row_count;
322320
}
323321
// Populate index entirely for the segment
324322
TxnTableStore *txn_table_store = txn->GetTxnTableStore(this);
@@ -369,6 +367,12 @@ void TableEntry::AppendData(TransactionID txn_id, void *txn_store, TxnTimeStamp
369367
AppendState *append_state_ptr = txn_store_ptr->append_state_.get();
370368
Txn *txn = txn_store_ptr->txn_;
371369
if (append_state_ptr->Finished()) {
370+
// Import update row count
371+
if (append_state_ptr->blocks_.empty()) {
372+
for (auto &segment : txn_store_ptr->flushed_segments()) {
373+
this->row_count_ += segment->row_count();
374+
}
375+
}
372376
LOG_TRACE("No append is done.");
373377
return;
374378
}
@@ -904,11 +908,11 @@ nlohmann::json TableEntry::Serialize(TxnTimeStamp max_commit_ts) {
904908
Vector<SegmentEntry *> segment_candidates;
905909
Vector<TableIndexMeta *> table_index_meta_candidates;
906910
Vector<String> table_index_name_candidates;
911+
SizeT checkpoint_row_count = 0;
907912
{
908913
std::shared_lock<std::shared_mutex> lck(this->rw_locker_);
909914
json_res["table_name"] = *this->GetTableName();
910915
json_res["table_entry_type"] = this->table_entry_type_;
911-
json_res["row_count"] = this->row_count_.load();
912916
json_res["begin_ts"] = this->begin_ts_;
913917
json_res["commit_ts"] = this->commit_ts_.load();
914918
json_res["txn_id"] = this->txn_id_.load();
@@ -952,7 +956,9 @@ nlohmann::json TableEntry::Serialize(TxnTimeStamp max_commit_ts) {
952956
// Serialize segments
953957
for (const auto &segment_entry : segment_candidates) {
954958
json_res["segments"].emplace_back(segment_entry->Serialize(max_commit_ts));
959+
checkpoint_row_count += segment_entry->checkpoint_row_count();
955960
}
961+
json_res["row_count"] = checkpoint_row_count;
956962
json_res["unsealed_id"] = unsealed_id_;
957963

958964
// Serialize indexes

src/storage/txn/txn_store.cppm

+2
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ public: // Getter
134134

135135
const HashMap<SegmentID, TxnSegmentStore> &txn_segments() const { return txn_segments_store_; }
136136

137+
const Vector<SegmentEntry *> &flushed_segments() const { return flushed_segments_; }
138+
137139
private:
138140
HashMap<SegmentID, TxnSegmentStore> txn_segments_store_{};
139141
Vector<SegmentEntry *> flushed_segments_{};

0 commit comments

Comments
 (0)