
Commit 89d18c9

Fix: checkpoint recovery (infiniflow#831)
### What problem does this PR solve?

Problem:

1. When serializing a full checkpoint, the service does not check whether an entry's `commit_ts` is lower than or equal to `max_commit_ts`. Also, when loading from the META catalog, `unsealed_segment_` is set without checking `segment_map_`, so it may be `nullptr`.
2. When loading the delta catalog, the service skips any operation whose `txn_id` is lower than the catalog's `next_txn_id_`. But `next_txn_id_` is restored from the full catalog, so it is effectively the `txn_id` of the full checkpoint txn. Transactions that began before the full checkpoint but had not yet committed have smaller `txn_id`s, so their delta catalog operations are wrongly skipped (see the sketch below).

What has been done:

1. Add a restart-service step to the GitHub workflow to test that the service restarts normally.
2. Add corresponding test cases.
3. Fix loading of the delta catalog.

Issue link: infiniflow#825

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Test cases
1 parent: dc042db
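The crux of the fix, sketched below with hypothetical standalone types rather than the real catalog API: transaction ids are assigned when a txn begins, commit timestamps when it commits, so the two orderings disagree exactly for txns that straddle the full checkpoint.

```cpp
// Minimal sketch with hypothetical standalone types (not the real catalog API).
// Txn ids are assigned at txn begin; commit timestamps at commit.
#include <cstdint>
#include <iostream>

using TransactionID = uint64_t;
using TxnTimeStamp = uint64_t;

struct DeltaOp {
    TransactionID txn_id;
    TxnTimeStamp commit_ts;
};

int main() {
    TransactionID next_txn_id = 10;        // restored from the full catalog
    TxnTimeStamp full_ckp_commit_ts = 100; // commit ts of the full checkpoint txn

    // Began before the checkpoint (txn_id 7), committed after it (commit_ts 120),
    // so its changes are absent from the full checkpoint and must be replayed.
    DeltaOp op{7, 120};

    bool old_rule_skips = op.txn_id < next_txn_id;           // true: the op is lost
    bool new_rule_skips = op.commit_ts < full_ckp_commit_ts; // false: the op is replayed
    std::cout << std::boolalpha << "old rule skips: " << old_rule_skips
              << ", new rule skips: " << new_rule_skips << "\n";
}
```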

File tree

9 files changed: +199 -8 lines changed

.github/workflows/tests.yml (+2 -2)

```diff
@@ -88,7 +88,7 @@ jobs:
 
       - name: Stop infinity sqllogictest debug
         if: ${{ !cancelled() }} # always run this step even if previous steps failed
-        run: sudo kill $(pidof cmake-build-debug/src/infinity)
+        run: sudo kill $(pidof cmake-build-debug/src/infinity) && sleep 5s
 
       - name: Collect infinity debug output
         # GitHub Actions interprets output lines starting with "Error" as error messages, and it automatically sets the step status to failed when such lines are detected.
@@ -164,7 +164,7 @@ jobs:
 
       - name: Stop infinity sqllogictest release
         if: ${{ !cancelled() }} # always run this step even if previous steps failed
-        run: sudo kill $(pidof cmake-build-release/src/infinity)
+        run: sudo kill $(pidof cmake-build-release/src/infinity) && sleep 5s
 
       - name: Collect infinity release output
         # GitHub Actions interprets output lines starting with "Error" as error messages, and it automatically sets the step status to failed when such lines are detected.
```

src/storage/meta/catalog.cpp (+4 -1)

```diff
@@ -405,6 +405,7 @@ nlohmann::json Catalog::Serialize(TxnTimeStamp max_commit_ts, bool is_full_check
     std::shared_lock<std::shared_mutex> lck(this->rw_locker());
     json_res["current_dir"] = *this->current_dir_;
     json_res["next_txn_id"] = this->next_txn_id_;
+    json_res["full_ckp_commit_ts"] = this->full_ckp_commit_ts_;
     json_res["catalog_version"] = this->catalog_version_;
     databases.reserve(this->db_meta_map().size());
     for (auto &db_meta : this->db_meta_map()) {
@@ -482,7 +483,7 @@ void Catalog::LoadFromEntry(Catalog *catalog, const String &catalog_path, Buffer
         auto txn_id = op->txn_id();
         auto begin_ts = op->begin_ts();
         auto is_delete = op->is_delete();
-        if (txn_id < catalog->next_txn_id_) {
+        if (op->commit_ts_ < catalog->full_ckp_commit_ts_) {
             // Ignore the old txn
             continue;
         }
@@ -795,6 +796,7 @@ void Catalog::Deserialize(const nlohmann::json &catalog_json, BufferManager *buf
     // FIXME: new catalog need a scheduler, current we use nullptr to represent it.
     catalog = MakeUnique<Catalog>(current_dir);
     catalog->next_txn_id_ = catalog_json["next_txn_id"];
+    catalog->full_ckp_commit_ts_ = catalog_json["full_ckp_commit_ts"];
     catalog->catalog_version_ = catalog_json["catalog_version"];
     if (catalog_json.contains("databases")) {
         for (const auto &db_json : catalog_json["databases"]) {
@@ -809,6 +811,7 @@ UniquePtr<String> Catalog::SaveFullCatalog(const String &catalog_dir, TxnTimeSta
     String catalog_tmp_path = String(fmt::format("{}/_META_CATALOG.{}.json", catalog_dir, max_commit_ts));
 
     // Serialize catalog to string
+    full_ckp_commit_ts_ = max_commit_ts;
     nlohmann::json catalog_json = Serialize(max_commit_ts, true);
     String catalog_str = catalog_json.dump();
```
src/storage/meta/catalog.cppm (+1)

```diff
@@ -265,6 +265,7 @@ public:
     MetaMap<DBMeta> db_meta_map_{};
 
     TransactionID next_txn_id_{};
+    TxnTimeStamp full_ckp_commit_ts_{};
     u64 catalog_version_{}; // TODO seems useless
 
     // Currently, these function or function set can't be changed and also will not be persistent.
```

src/storage/meta/db_meta.cpp (+1 -1)

```diff
@@ -127,7 +127,7 @@ nlohmann::json DBMeta::Serialize(TxnTimeStamp max_commit_ts, bool is_full_checkp
     // Need to find the full history of the entry till given timestamp. Note that GetEntry returns at most one valid entry at given timestamp.
     db_candidates.reserve(this->db_entry_list().size());
     for (auto &db_entry : this->db_entry_list()) {
-        if (db_entry->entry_type_ == EntryType::kDatabase) {
+        if (db_entry->entry_type_ == EntryType::kDatabase && db_entry->commit_ts_ <= max_commit_ts) {
             // Put it to candidate list
             db_candidates.push_back((DBEntry *)db_entry.get());
         }
```
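This guard, repeated below in `SegmentEntry::Serialize` and `TableMeta::Serialize`, enforces a single invariant: a full checkpoint taken at `max_commit_ts` may only serialize entries whose `commit_ts_` is at or below that timestamp. A minimal sketch of the invariant with hypothetical standalone types:

```cpp
// Minimal sketch with hypothetical standalone types: a checkpoint snapshot at
// max_commit_ts keeps only entries already committed at that timestamp; later
// commits are left to the WAL / delta catalog rather than the snapshot.
#include <cstdint>
#include <vector>

using TxnTimeStamp = uint64_t;

struct Entry {
    TxnTimeStamp commit_ts_;
};

std::vector<const Entry *> CheckpointCandidates(const std::vector<Entry> &entries, TxnTimeStamp max_commit_ts) {
    std::vector<const Entry *> candidates;
    for (const auto &entry : entries) {
        if (entry.commit_ts_ <= max_commit_ts) { // the guard added in this PR
            candidates.push_back(&entry);
        }
    }
    return candidates;
}

int main() {
    std::vector<Entry> entries = {{90}, {100}, {120}};
    auto candidates = CheckpointCandidates(entries, 100);
    return candidates.size() == 2 ? 0 : 1; // 120 is excluded from the snapshot
}
```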

src/storage/meta/entry/segment_entry.cpp (+3 -1)

```diff
@@ -411,7 +411,9 @@ nlohmann::json SegmentEntry::Serialize(TxnTimeStamp max_commit_ts, bool is_full_
             LOG_TRACE(fmt::format("SegmentEntry::Serialize: End try to save FastRoughFilter to json file"));
         }
         for (auto &block_entry : this->block_entries_) {
-            json_res["block_entries"].emplace_back(block_entry->Serialize(max_commit_ts));
+            if (block_entry->commit_ts_ <= max_commit_ts) {
+                json_res["block_entries"].emplace_back(block_entry->Serialize(max_commit_ts));
+            }
         }
     }
     return json_res;
```

src/storage/meta/entry/table_entry.cpp (+4 -1)

```diff
@@ -648,7 +648,10 @@ UniquePtr<TableEntry> TableEntry::Deserialize(const nlohmann::json &table_entry_
             SharedPtr<SegmentEntry> segment_entry = SegmentEntry::Deserialize(segment_json, table_entry.get(), buffer_mgr);
             table_entry->segment_map_.emplace(segment_entry->segment_id(), segment_entry);
         }
-        table_entry->unsealed_segment_ = table_entry->segment_map_.at(unsealed_id);
+        // here the unsealed_segment_ may be nullptr
+        if (table_entry->segment_map_.find(unsealed_id) != table_entry->segment_map_.end()) {
+            table_entry->unsealed_segment_ = table_entry->segment_map_.at(unsealed_id);
+        }
     }
 
     table_entry->commit_ts_ = table_entry_json["commit_ts"];
```
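The guard matters because `std::map::at` throws `std::out_of_range` for a missing key, and after the serialization fixes above the unsealed segment can legitimately be absent from the checkpoint (its `commit_ts_` exceeded `max_commit_ts`). A minimal standalone sketch of the failure mode and the guarded lookup; the types here are placeholders, not the real entries:

```cpp
// Minimal standalone sketch: an unguarded map::at aborts recovery when the
// unsealed segment was filtered out of the checkpoint; the guarded lookup
// simply leaves the pointer null.
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>

int main() {
    std::map<uint64_t, std::shared_ptr<int>> segment_map; // empty: segment not checkpointed
    uint64_t unsealed_id = 7;                             // hypothetical id
    std::shared_ptr<int> unsealed_segment;                // stays null if absent

    // segment_map.at(unsealed_id) here would throw std::out_of_range.
    if (auto it = segment_map.find(unsealed_id); it != segment_map.end()) {
        unsealed_segment = it->second;
    }
    std::cout << "unsealed segment " << (unsealed_segment ? "restored" : "absent") << "\n";
}
```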

src/storage/meta/table_meta.cpp (+2 -1)

```diff
@@ -126,6 +126,7 @@ void TableMeta::CreateEntryReplay(std::function<SharedPtr<TableEntry>(TableMeta
 void TableMeta::DropEntryReplay(std::function<SharedPtr<TableEntry>(TableMeta *, SharedPtr<String>, TransactionID, TxnTimeStamp)> &&init_entry,
                                 TransactionID txn_id,
                                 TxnTimeStamp begin_ts) {
+    // LOG_INFO(fmt::format("Drop table entry replay: table name: {}, txn_id:{}, begin_ts:{}", *table_name_, txn_id, begin_ts));
     auto [entry, status] = table_entry_list_.DropEntryReplay(
         [&](TransactionID txn_id, TxnTimeStamp begin_ts) { return init_entry(this, table_name_, txn_id, begin_ts); },
         txn_id,
@@ -162,7 +163,7 @@ nlohmann::json TableMeta::Serialize(TxnTimeStamp max_commit_ts, bool is_full_che
     // Need to find the full history of the entry till given timestamp. Note that GetEntry returns at most one valid entry at given timestamp.
     table_candidates.reserve(this->table_entry_list().size());
     for (auto &table_entry : this->table_entry_list()) {
-        if (table_entry->entry_type_ == EntryType::kTable) {
+        if (table_entry->entry_type_ == EntryType::kTable && table_entry->commit_ts_ <= max_commit_ts) {
             // Put it to candidate list
             table_candidates.push_back((TableEntry *)table_entry.get());
         }
```

src/storage/wal/wal_manager.cpp (-1)

```diff
@@ -709,7 +709,6 @@ void WalManager::ReplaySegment(TableEntry *table_entry, const WalSegmentInfo &se
 
         segment_entry->AppendBlockEntry(std::move(block_entry));
     }
-    BuildFastRoughFilterTask::ExecuteOnNewSealedSegment(segment_entry.get(), storage_->buffer_manager(), commit_ts);
 
     table_entry->WalReplaySegment(segment_entry);
 }
```

src/unit_test/storage/wal/catalog_delta_replay.cpp (+182)

```diff
@@ -40,6 +40,8 @@ import third_party;
 import base_table_ref;
 import index_secondary;
 import data_block;
+import bg_task;
+import logger;
 
 using namespace infinity;
 
@@ -413,3 +415,183 @@ TEST_F(CatalogDeltaReplayTest, replay_append) {
         infinity::InfinityContext::instance().UnInit();
     }
 }
+
+TEST_F(CatalogDeltaReplayTest, replay_with_full_checkpoint) {
+    auto config_path = std::make_shared<std::string>(std::string(test_data_path()) + "/config/test_catalog_delta.toml");
+
+    auto db_name = std::make_shared<std::string>("default");
+
+    auto column_def1 =
+        std::make_shared<ColumnDef>(0, std::make_shared<DataType>(LogicalType::kInteger), "col1", std::unordered_set<ConstraintType>{});
+    auto column_def2 =
+        std::make_shared<ColumnDef>(0, std::make_shared<DataType>(LogicalType::kVarchar), "col2", std::unordered_set<ConstraintType>{});
+    auto table_name = std::make_shared<std::string>("tb1");
+    auto table_name_committed = std::make_shared<std::string>("tb_committed");
+    auto table_name_uncommitted = std::make_shared<std::string>("tb_uncommitted");
+    auto table_def = TableDef::Make(db_name, table_name, {column_def1, column_def2});
+    auto table_def_committed = TableDef::Make(db_name, table_name_committed, {column_def1, column_def2});
+    auto table_def_uncommitted = TableDef::Make(db_name, table_name_uncommitted, {column_def1, column_def2});
+
+    {
+        InfinityContext::instance().Init(config_path);
+        Storage *storage = InfinityContext::instance().storage();
+
+        TxnManager *txn_mgr = storage->txn_manager();
+        TxnTimeStamp last_commit_ts = 0;
+        // create table and insert two records
+        {
+            auto *txn = txn_mgr->CreateTxn();
+            txn->Begin();
+
+            txn->CreateTable(*db_name, table_def, ConflictType::kError);
+
+            auto [table_entry, status] = txn->GetTableByName(*db_name, *table_name);
+            EXPECT_TRUE(status.ok());
+
+            last_commit_ts = txn_mgr->CommitTxn(txn);
+        }
+        {
+            auto *txn = txn_mgr->CreateTxn();
+            txn->Begin();
+
+            Vector<SharedPtr<ColumnVector>> column_vectors;
+            for (SizeT i = 0; i < table_def->columns().size(); ++i) {
+                SharedPtr<DataType> data_type = table_def->columns()[i]->type();
+                column_vectors.push_back(MakeShared<ColumnVector>(data_type));
+                column_vectors.back()->Initialize();
+            }
+            {
+                int v1 = 1;
+                column_vectors[0]->AppendByPtr(reinterpret_cast<const_ptr_t>(&v1));
+            }
+            {
+                std::string v2 = "v2v2v2v2v2v2v2v2v2v2v2v2v2v2v2v2v2v2v2v2";
+                column_vectors[1]->AppendByStringView(v2, ',');
+            }
+            auto data_block = DataBlock::Make();
+            data_block->Init(column_vectors);
+
+            auto status = txn->Append(*db_name, *table_name, data_block);
+            ASSERT_TRUE(status.ok());
+            last_commit_ts = txn_mgr->CommitTxn(txn);
+        }
+        {
+            auto *txn = txn_mgr->CreateTxn();
+            txn->Begin();
+
+            Vector<SharedPtr<ColumnVector>> column_vectors;
+            for (SizeT i = 0; i < table_def->columns().size(); ++i) {
+                SharedPtr<DataType> data_type = table_def->columns()[i]->type();
+                column_vectors.push_back(MakeShared<ColumnVector>(data_type));
+                column_vectors.back()->Initialize();
+            }
+            {
+                int v1 = 2;
+                column_vectors[0]->AppendByPtr(reinterpret_cast<const_ptr_t>(&v1));
+            }
+            {
+                std::string v2 = "askljhfasfhuiwqeriqkldfnramgeasfklhllfjas";
+                column_vectors[1]->AppendByStringView(v2, ',');
+            }
+            auto data_block = DataBlock::Make();
+            data_block->Init(column_vectors);
+
+            auto status = txn->Append(*db_name, *table_name, data_block);
+            ASSERT_TRUE(status.ok());
+            last_commit_ts = txn_mgr->CommitTxn(txn);
+        }
+
+        // 1. remain some uncommitted txn before force full checkpoint
+        // 2. wait delta checkpoint
+        {
+            auto *txn_uncommitted = txn_mgr->CreateTxn();
+            txn_uncommitted->Begin();
+            txn_uncommitted->CreateTable(*db_name, table_def_uncommitted, ConflictType::kError);
+
+            LOG_INFO("BEFORE FULL CKP");
+            auto *txn_force_ckp = txn_mgr->CreateTxn();
+            txn_force_ckp->Begin();
+            auto force_ckp_task = MakeShared<ForceCheckpointTask>(txn_force_ckp, true);
+            storage->bg_processor()->Submit(force_ckp_task);
+            force_ckp_task->Wait();
+            LOG_INFO("AFTER FULL CKP");
+            txn_mgr->CommitTxn(txn_force_ckp);
+
+            txn_mgr->CommitTxn(txn_uncommitted);
+
+            // some txn in delta checkpoint
+            auto *txn_committed = txn_mgr->CreateTxn();
+            txn_committed->Begin();
+            txn_committed->CreateTable(*db_name, table_def_committed, ConflictType::kError);
+            txn_mgr->CommitTxn(txn_committed);
+
+            auto *txn_record3 = txn_mgr->CreateTxn();
+            txn_record3->Begin();
+            Vector<SharedPtr<ColumnVector>> column_vectors;
+            for (SizeT i = 0; i < table_def_committed->columns().size(); ++i) {
+                SharedPtr<DataType> data_type = table_def_committed->columns()[i]->type();
+                column_vectors.push_back(MakeShared<ColumnVector>(data_type));
+                column_vectors.back()->Initialize();
+            }
+            {
+                int v1 = 3;
+                column_vectors[0]->AppendByPtr(reinterpret_cast<const_ptr_t>(&v1));
+            }
+            {
+                std::string v2 = "this is a test for replay with full checkpoint";
+                column_vectors[1]->AppendByStringView(v2, ',');
+            }
+            auto data_block = DataBlock::Make();
+            data_block->Init(column_vectors);
+
+            auto status = txn_record3->Append(*db_name, *table_name, data_block);
+            ASSERT_TRUE(status.ok());
+
+            last_commit_ts = txn_mgr->CommitTxn(txn_record3);
+            WaitFlushDeltaOp(txn_mgr, last_commit_ts);
+
+            infinity::InfinityContext::instance().UnInit();
+        }
+    }
+
+    // now restart and the table `tb_uncommitted` should exist
+    {
+        InfinityContext::instance().Init(config_path);
+        Storage *storage = InfinityContext::instance().storage();
+
+        TxnManager *txn_mgr = storage->txn_manager();
+
+        {
+            auto *txn = txn_mgr->CreateTxn();
+            txn->Begin();
+            {
+                auto [table_uncommitted, uncommitted_status] = txn->GetTableByName(*db_name, *table_name_uncommitted);
+                EXPECT_TRUE(uncommitted_status.ok());
+
+                auto [table_committed, committed_status] = txn->GetTableByName(*db_name, *table_name_committed);
+                EXPECT_TRUE(committed_status.ok());
+
+                auto [table_entry, table_status] = txn->GetTableByName(*db_name, *table_name);
+                EXPECT_TRUE(table_status.ok());
+
+                EXPECT_EQ(table_entry->row_count(), 3ul);
+                ASSERT_EQ(table_entry->segment_map().size(), 1ul);
+                {
+                    auto &segment_entry = table_entry->segment_map().begin()->second;
+                    EXPECT_EQ(segment_entry->row_count(), 3ul);
+                    ASSERT_EQ(segment_entry->block_entries().size(), 1ul);
+                    {
+                        BlockEntry *block_entry = segment_entry->block_entries()[0].get();
+                        EXPECT_EQ(block_entry->row_count(), 3ul);
+                        ASSERT_EQ(block_entry->columns().size(), 2ul);
+                        {
+                            auto &col2 = block_entry->columns()[1];
+                            EXPECT_EQ(col2->OutlineBufferCount(), 3ul);
+                        }
+                    }
+                }
+            }
+        }
+        infinity::InfinityContext::instance().UnInit();
+    }
+}
```
