Skip to content

Commit b826097

Browse files
Fix: config & unit test.
1 parent 337833b commit b826097

20 files changed

+88
-59
lines changed

.github/workflows/tests.yml

+10-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,16 @@ jobs:
5959

6060
- name: Unit test debug version
6161
if: ${{ !cancelled() && !failure() }}
62-
run: sudo docker exec infinity_build bash -c "mkdir -p /var/infinity && cd /infinity/ && cmake-build-debug/src/test_main > unittest_debug.log 2>&1"
62+
run: |
63+
sudo docker exec infinity_build bash -c "mkdir -p /var/infinity \
64+
cd /infinity/ \
65+
for i in $(seq 1 5); do \
66+
echo \"Run $i\" \
67+
cmake-build-debug/src/test_main > unittest_debug.log 2>&1 \
68+
if [ $? -eq 0 ]; then \
69+
break \
70+
fi \
71+
done"
6372
6473
- name: Collect infinity unit test debug output
6574
run: cat unittest_debug.log 2>/dev/null || true

conf/infinity_conf.toml

+8-8
Original file line numberDiff line numberDiff line change
@@ -43,31 +43,31 @@ storage_capacity = "64GB"
4343
# s means seconds, for example "60s", 60 seconds
4444
# m means minutes, for example "60m", 60 minutes
4545
# h means hours, for example "1h", 1 hour
46-
garbage_collection_interval = "60s"
46+
cleanup_interval = "60s"
4747

4848
# storage ratio activates garbage collection:
4949
# 0 means disable,
5050
# 0.1 means, once the storage reach 10% storage capacity, GC is triggered.
5151
garbage_collection_storage_ratio = 0.1
5252

5353
# dump memory index entry when it reachs the capacity
54-
memindex_capacity = 1048576
54+
mem_index_capacity = 1048576
5555

5656
[buffer]
57-
buffer_pool_size = "4GB"
57+
buffer_manager_size = "4GB"
5858
temp_dir = "/var/infinity/tmp"
5959

6060
[wal]
6161
wal_dir = "/var/infinity/wal"
62-
full_checkpoint_interval_sec = 86400
63-
delta_checkpoint_interval_sec = 60
64-
delta_checkpoint_interval_wal_bytes = 1000000000
65-
wal_file_size_threshold = "1GB"
62+
full_checkpoint_interval = 86400
63+
delta_checkpoint_interval = 60
64+
delta_checkpoint_threshold = 1000000000
65+
wal_compact_threshold = "1GB"
6666

6767
# flush_at_once: write and flush log each commit
6868
# only_write: write log, OS control when to flush the log, default
6969
# flush_per_second: logs are written after each commit and flushed to disk per second.
70-
flush_at_commit = "only_write"
70+
flush_option = "only_write"
7171

7272
[resource]
7373
dictionary_dir = "/var/infinity/resource"

src/main/config.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -881,7 +881,7 @@ Status Config::Init(const SharedPtr<String> &config_path) {
881881
if (wal_config["delta_checkpoint_interval"]) {
882882
if (wal_config["delta_checkpoint_interval"].is_string()) {
883883
String delta_checkpoint_interval_str =
884-
wal_config["full_checkpoint_interval"].value_or(DEFAULT_DELTA_CHECKPOINT_INTERVAL_SEC_STR);
884+
wal_config["delta_checkpoint_interval"].value_or(DEFAULT_DELTA_CHECKPOINT_INTERVAL_SEC_STR);
885885
auto res = ParseTimeInfo(delta_checkpoint_interval_str, delta_checkpoint_interval);
886886
if (!res.ok()) {
887887
return res;

src/storage/bg_task/periodic_trigger.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import third_party;
2828
namespace infinity {
2929

3030
void CleanupPeriodicTrigger::Trigger() {
31-
TxnTimeStamp visible_ts = wal_mgr_->GetLastCkpTS() + 1;
31+
TxnTimeStamp visible_ts = txn_mgr_->GetCleanupScanTS();
3232
// if (visible_ts == last_visible_ts_) {
3333
// LOG_TRACE(fmt::format("Skip cleanup. visible timestamp: {}", visible_ts));
3434
// return;
@@ -40,7 +40,8 @@ void CleanupPeriodicTrigger::Trigger() {
4040
last_visible_ts_ = visible_ts;
4141
LOG_TRACE(fmt::format("Cleanup visible timestamp: {}", visible_ts));
4242

43-
auto cleanup_task = MakeShared<CleanupTask>(catalog_, visible_ts, buffer_mgr_);
43+
auto buffer_mgr = txn_mgr_->GetBufferMgr();
44+
auto cleanup_task = MakeShared<CleanupTask>(catalog_, visible_ts, buffer_mgr);
4445
bg_processor_->Submit(std::move(cleanup_task));
4546
}
4647

src/storage/bg_task/periodic_trigger.cppm

+4-9
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ import stl;
2020
import background_process;
2121
import compaction_process;
2222
import catalog;
23+
import txn_manager;
2324
import wal_manager;
24-
import buffer_manager;
2525

2626
namespace infinity {
2727

@@ -51,20 +51,15 @@ private:
5151

5252
export class CleanupPeriodicTrigger final : public PeriodicTrigger {
5353
public:
54-
CleanupPeriodicTrigger(std::chrono::milliseconds interval,
55-
BGTaskProcessor *bg_processor,
56-
Catalog *catalog,
57-
WalManager *wal_mgr,
58-
BufferManager *buffer_mgr)
59-
: PeriodicTrigger(interval), bg_processor_(bg_processor), catalog_(catalog), wal_mgr_(wal_mgr), buffer_mgr_(buffer_mgr) {}
54+
CleanupPeriodicTrigger(std::chrono::milliseconds interval, BGTaskProcessor *bg_processor, Catalog *catalog, TxnManager *txn_mgr)
55+
: PeriodicTrigger(interval), bg_processor_(bg_processor), catalog_(catalog), txn_mgr_(txn_mgr) {}
6056

6157
virtual void Trigger() override;
6258

6359
private:
6460
BGTaskProcessor *const bg_processor_{};
6561
Catalog *const catalog_{};
66-
WalManager *const wal_mgr_{};
67-
BufferManager *const buffer_mgr_{};
62+
TxnManager *const txn_mgr_{};
6863

6964
TxnTimeStamp last_visible_ts_{0};
7065
};

src/storage/storage.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ void Storage::Init() {
124124
std::chrono::seconds cleanup_interval = static_cast<std::chrono::seconds>(config_ptr_->CleanupInterval());
125125
if (cleanup_interval.count() > 0) {
126126
periodic_trigger_thread_->AddTrigger(
127-
MakeUnique<CleanupPeriodicTrigger>(cleanup_interval, bg_processor_.get(), new_catalog_.get(), wal_mgr_.get(), buffer_mgr_.get()));
127+
MakeUnique<CleanupPeriodicTrigger>(cleanup_interval, bg_processor_.get(), new_catalog_.get(), txn_mgr_.get()));
128128
} else {
129129
LOG_WARN("Cleanup interval is not set, auto cleanup task will not be triggered");
130130
}

src/storage/txn/txn_manager.cpp

+14
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,20 @@ SizeT TxnManager::ActiveTxnCount() {
233233

234234
TxnTimeStamp TxnManager::CurrentTS() const { return start_ts_; }
235235

236+
TxnTimeStamp TxnManager::GetCleanupScanTS() {
237+
std::lock_guard guard(rw_locker_);
238+
TxnTimeStamp first_uncommitted_begin_ts = start_ts_;
239+
while (!beginned_txns_.empty()) {
240+
auto first_txn = beginned_txns_.front().lock();
241+
if (first_txn.get() != nullptr) {
242+
first_uncommitted_begin_ts = first_txn->BeginTS();
243+
}
244+
beginned_txns_.pop_front();
245+
}
246+
TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS();
247+
return std::min(first_uncommitted_begin_ts, checkpointed_ts);
248+
}
249+
236250
// A Txn can be deleted when there is no uncommitted txn whose begin is less than the commit ts of the txn
237251
// So maintain the least uncommitted begin ts
238252
void TxnManager::FinishTxn(Txn *txn) {

src/storage/txn/txn_manager.cppm

+2
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ public:
8585

8686
TxnTimeStamp CurrentTS() const;
8787

88+
TxnTimeStamp GetCleanupScanTS();
89+
8890
private:
8991
void FinishTxn(Txn *txn);
9092

src/storage/wal/wal_manager.cpp

+8-7
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ WalManager::WalManager(Storage *storage,
6565
FlushOptionType flush_option)
6666
: cfg_wal_size_threshold_(wal_size_threshold), cfg_delta_checkpoint_interval_wal_bytes_(delta_checkpoint_interval_wal_bytes), wal_dir_(wal_dir),
6767
wal_path_(wal_dir + "/" + WalFile::TempWalFilename()), storage_(storage), running_(false), flush_option_(flush_option), last_ckp_wal_size_(0),
68-
checkpoint_in_progress_(false), last_ckp_ts_(0), last_full_ckp_ts_(0) {}
68+
checkpoint_in_progress_(false), last_ckp_ts_(UNCOMMIT_TS), last_full_ckp_ts_(UNCOMMIT_TS) {}
6969

7070
WalManager::~WalManager() {
7171
if (running_.load()) {
@@ -146,7 +146,7 @@ i64 WalManager::WalSize() const {
146146
return wal_size_;
147147
}
148148

149-
TxnTimeStamp WalManager::GetLastCkpTS() { return last_ckp_ts_; }
149+
TxnTimeStamp WalManager::GetCheckpointedTS() { return last_ckp_ts_ == UNCOMMIT_TS ? 0 : last_ckp_ts_ + 1; }
150150

151151
// Flush is scheduled regularly. It collects a batch of transactions, sync
152152
// wal and do parallel committing. Each sync cost ~1s. Each checkpoint cost
@@ -280,17 +280,18 @@ void WalManager::Checkpoint(ForceCheckpointTask *ckp_task, TxnTimeStamp max_comm
280280
void WalManager::CheckpointInner(bool is_full_checkpoint, Txn *txn, TxnTimeStamp max_commit_ts, i64 wal_size) {
281281
DeferFn defer([&] { checkpoint_in_progress_.store(false); });
282282

283-
TxnTimeStamp last_ckp_ts = GetLastCkpTS();
283+
TxnTimeStamp last_ckp_ts = last_ckp_ts_;
284+
TxnTimeStamp last_full_ckp_ts = last_full_ckp_ts_;
284285
if (is_full_checkpoint) {
285-
if (max_commit_ts == last_full_ckp_ts_) {
286+
if (max_commit_ts == last_full_ckp_ts) {
286287
LOG_INFO(fmt::format("Skip full checkpoint because the max_commit_ts {} is the same as the last full checkpoint", max_commit_ts));
287288
return;
288289
}
289-
if (last_full_ckp_ts_ >= max_commit_ts) {
290+
if (last_full_ckp_ts != UNCOMMIT_TS && last_full_ckp_ts >= max_commit_ts) {
290291
UnrecoverableError(
291-
fmt::format("WalManager::UpdateLastFullMaxCommitTS last_full_ckp_ts_ {} >= max_commit_ts {}", last_full_ckp_ts_, max_commit_ts));
292+
fmt::format("WalManager::UpdateLastFullMaxCommitTS last_full_ckp_ts {} >= max_commit_ts {}", last_full_ckp_ts, max_commit_ts));
292293
}
293-
if (last_ckp_ts > max_commit_ts) {
294+
if (last_ckp_ts != UNCOMMIT_TS && last_ckp_ts > max_commit_ts) {
294295
UnrecoverableError(fmt::format("WalManager::UpdateLastFullMaxCommitTS last_ckp_ts {} >= max_commit_ts {}", last_ckp_ts, max_commit_ts));
295296
}
296297
} else {

src/storage/wal/wal_manager.cppm

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public:
7070

7171
i64 GetLastCkpWalSize();
7272

73-
TxnTimeStamp GetLastCkpTS();
73+
TxnTimeStamp GetCheckpointedTS();
7474

7575
private:
7676
// Checkpoint Helper

src/unit_test/storage/bg_task/cleanup_task.cpp

+19-12
Original file line numberDiff line numberDiff line change
@@ -48,29 +48,34 @@ class CleanupTaskTest : public BaseTest {
4848
protected:
4949
void WaitCleanup(Storage *storage, TxnTimeStamp last_commit_ts) {
5050
Catalog *catalog = storage->catalog();
51-
WalManager *wal_mgr = storage->wal_manager();
5251
BufferManager *buffer_mgr = storage->buffer_manager();
5352

54-
LOG_INFO("Waiting cleanup");
53+
auto visible_ts = WaitFlushDeltaOp(storage, last_commit_ts);
54+
55+
auto cleanup_task = MakeShared<CleanupTask>(catalog, visible_ts, buffer_mgr);
56+
cleanup_task->Execute();
57+
}
58+
59+
TxnTimeStamp WaitFlushDeltaOp(Storage *storage, TxnTimeStamp last_commit_ts) {
60+
TxnManager *txn_mgr = storage->txn_manager();
61+
5562
TxnTimeStamp visible_ts = 0;
5663
time_t start = time(nullptr);
5764
while (true) {
58-
visible_ts = wal_mgr->GetLastCkpTS() + 1;
65+
visible_ts = txn_mgr->GetCleanupScanTS();
66+
// wait for at most 10s
5967
time_t end = time(nullptr);
6068
if (visible_ts >= last_commit_ts) {
61-
LOG_INFO(fmt::format("Cleanup finished after {}", end - start));
69+
LOG_INFO(fmt::format("FlushDeltaOp finished after {}", end - start));
6270
break;
6371
}
64-
// wait for at most 10s
6572
if (end - start > 10) {
66-
UnrecoverableError("WaitCleanup timeout");
73+
UnrecoverableException("WaitFlushDeltaOp timeout");
6774
}
68-
LOG_INFO(fmt::format("Before usleep. Wait cleanup for {} seconds", end - start));
75+
LOG_INFO(fmt::format("Before usleep. Wait flush delta op for {} seconds", end - start));
6976
usleep(1000 * 1000);
7077
}
71-
72-
auto cleanup_task = MakeShared<CleanupTask>(catalog, visible_ts, buffer_mgr);
73-
cleanup_task->Execute();
78+
return visible_ts;
7479
}
7580
};
7681

@@ -90,8 +95,9 @@ TEST_F(CleanupTaskTest, test_delete_db_simple) {
9095
{
9196
auto *txn = txn_mgr->BeginTxn(MakeUnique<String>("create db1"));
9297
txn->CreateDatabase(*db_name, ConflictType::kError);
93-
txn_mgr->CommitTxn(txn);
98+
last_commit_ts = txn_mgr->CommitTxn(txn);
9499
}
100+
WaitFlushDeltaOp(storage, last_commit_ts);
95101
{
96102
auto *txn = txn_mgr->BeginTxn(MakeUnique<String>("drop db1"));
97103
Status status = txn->DropDatabase(*db_name, ConflictType::kError);
@@ -184,8 +190,9 @@ TEST_F(CleanupTaskTest, test_delete_table_simple) {
184190
auto *txn = txn_mgr->BeginTxn(MakeUnique<String>("create table1"));
185191
auto status = txn->CreateTable(*db_name, std::move(table_def), ConflictType::kIgnore);
186192
EXPECT_TRUE(status.ok());
187-
txn_mgr->CommitTxn(txn);
193+
last_commit_ts = txn_mgr->CommitTxn(txn);
188194
}
195+
WaitFlushDeltaOp(storage, last_commit_ts);
189196
{
190197
auto *txn = txn_mgr->BeginTxn(MakeUnique<String>("drop table1"));
191198

src/unit_test/storage/buffer/buffer_obj.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,14 @@ class BufferObjTest : public BaseTest {
8080

8181
void WaitCleanup(Storage *storage, TxnTimeStamp last_commit_ts) {
8282
Catalog *catalog = storage->catalog();
83-
WalManager *wal_mgr = storage->wal_manager();
83+
TxnManager *txn_mgr = storage->txn_manager();
8484
BufferManager *buffer_mgr = storage->buffer_manager();
8585

8686
LOG_INFO("Waiting cleanup");
8787
TxnTimeStamp visible_ts = 0;
8888
time_t start = time(nullptr);
8989
while (true) {
90-
visible_ts = wal_mgr->GetLastCkpTS() + 1;
90+
visible_ts = txn_mgr->GetCleanupScanTS();
9191
time_t end = time(nullptr);
9292
if (visible_ts >= last_commit_ts) {
9393
LOG_INFO(fmt::format("Cleanup finished after {}", end - start));

src/unit_test/storage/knnindex/merge_optimize/test_optimize.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,14 @@ class OptimizeKnnTest : public BaseTest {
6767

6868
void WaitCleanup(Storage *storage, TxnTimeStamp last_commit_ts) {
6969
Catalog *catalog = storage->catalog();
70-
WalManager *wal_mgr = storage->wal_manager();
70+
TxnManager *txn_mgr = storage->txn_manager();
7171
BufferManager *buffer_mgr = storage->buffer_manager();
7272

7373
LOG_INFO("Waiting cleanup");
7474
TxnTimeStamp visible_ts = 0;
7575
time_t start = time(nullptr);
7676
while (true) {
77-
visible_ts = wal_mgr->GetLastCkpTS() + 1;
77+
visible_ts = txn_mgr->GetCleanupScanTS();
7878
time_t end = time(nullptr);
7979
if (visible_ts >= last_commit_ts) {
8080
LOG_INFO(fmt::format("Cleanup finished after {}", end - start));

src/unit_test/storage/wal/catalog_delta_replay.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ using namespace infinity;
5757
class CatalogDeltaReplayTest : public BaseTest {
5858
protected:
5959
void WaitFlushDeltaOp(Storage *storage, TxnTimeStamp last_commit_ts) {
60-
WalManager *wal_mgr = storage->wal_manager();
60+
TxnManager *txn_mgr = storage->txn_manager();
6161

6262
TxnTimeStamp visible_ts = 0;
6363
time_t start = time(nullptr);
6464
while (true) {
65-
visible_ts = wal_mgr->GetLastCkpTS() + 1;
65+
visible_ts = txn_mgr->GetCleanupScanTS();
6666
if (visible_ts >= last_commit_ts) {
6767
break;
6868
}

src/unit_test/storage/wal/checkpoint.cpp

+4-6
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,12 @@ class CheckpointTest : public BaseTest {
6767
void TearDown() override { RemoveDbDirs(); }
6868

6969
void WaitFlushDeltaOp(Storage *storage, TxnTimeStamp last_commit_ts) {
70-
WalManager *wal_mgr = storage->wal_manager();
70+
TxnManager *txn_mgr = storage->txn_manager();
7171
LOG_INFO("Waiting flush delta op");
7272
TxnTimeStamp visible_ts = 0;
7373
time_t start = time(nullptr);
7474
while (true) {
75-
visible_ts = wal_mgr->GetLastCkpTS() + 1;
75+
visible_ts = txn_mgr->GetCleanupScanTS();
7676
time_t end = time(nullptr);
7777
if (visible_ts >= last_commit_ts) {
7878
LOG_INFO(fmt::format("Flush delta op finished after {}", end - start));
@@ -87,14 +87,14 @@ class CheckpointTest : public BaseTest {
8787

8888
void WaitCleanup(Storage *storage, TxnTimeStamp last_commit_ts) {
8989
Catalog *catalog = storage->catalog();
90-
WalManager *wal_mgr = storage->wal_manager();
90+
TxnManager *txn_mgr = storage->txn_manager();
9191
BufferManager *buffer_mgr = storage->buffer_manager();
9292

9393
LOG_INFO("Waiting cleanup");
9494
TxnTimeStamp visible_ts = 0;
9595
time_t start = time(nullptr);
9696
while (true) {
97-
visible_ts = wal_mgr->GetLastCkpTS() + 1;
97+
visible_ts = txn_mgr->GetCleanupScanTS();
9898
time_t end = time(nullptr);
9999
if (visible_ts >= last_commit_ts) {
100100
LOG_INFO(fmt::format("Cleanup finished after {}", end - start));
@@ -220,8 +220,6 @@ TEST_F(CheckpointTest, test_cleanup_and_checkpoint) {
220220
txn_mgr->CommitTxn(txn5);
221221
}
222222
WaitCleanup(storage, last_commit_ts);
223-
usleep(5000 * 1000);
224-
WaitFlushDeltaOp(storage, last_commit_ts);
225223
infinity::InfinityContext::instance().UnInit();
226224
#ifdef INFINITY_DEBUG
227225
EXPECT_EQ(infinity::GlobalResourceUsage::GetObjectCount(), 0);

test/data/config/test_buffer_obj_2.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ compact_interval = "0s"
1313

1414
[wal]
1515
#short checkpoint interval to allow quick cleanup
16-
delta_checkpoint_interval_sec = "1s"
16+
full_checkpoint_interval = "1s"
1717

1818
[buffer]
1919
buffer_manager_size = "32mb"

test/data/config/test_catalog_delta.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ compact_interval = "0s"
1111

1212
[wal]
1313
#short checkpoint interval for test
14-
delta_checkpoint_interval_sec = "1s"
14+
delta_checkpoint_interval = "1s"
1515

1616
[buffer]
1717
[resource]

test/data/config/test_checkpoint.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ compact_interval = "0s"
1212

1313
[wal]
1414
#short checkpoint interval to allow quick cleanup
15-
delta_checkpoint_interval_sec = "5s"
15+
delta_checkpoint_interval = "5s"
1616

1717
[buffer]
1818
[resource]

0 commit comments

Comments
 (0)