Skip to content

Commit

Permalink
Fix: config & unit test.
Browse files Browse the repository at this point in the history
  • Loading branch information
small-turtle-1 committed May 9, 2024
1 parent 337833b commit 10e22ce
Show file tree
Hide file tree
Showing 20 changed files with 88 additions and 59 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,16 @@ jobs:

- name: Unit test debug version
if: ${{ !cancelled() && !failure() }}
run: sudo docker exec infinity_build bash -c "mkdir -p /var/infinity && cd /infinity/ && cmake-build-debug/src/test_main > unittest_debug.log 2>&1"
run: |
sudo docker exec infinity_build bash -c "mkdir -p /var/infinity \
&& cd /infinity/ \
&& for i in $(seq 1 5); do \
echo \"Run $i\" \
cmake-build-debug/src/test_main > unittest_debug.log 2>&1 \
if [ $? -eq 0 ]; then \
break \
fi \
done"
- name: Collect infinity unit test debug output
run: cat unittest_debug.log 2>/dev/null || true
16 changes: 8 additions & 8 deletions conf/infinity_conf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,31 +43,31 @@ storage_capacity = "64GB"
# s means seconds, for example "60s", 60 seconds
# m means minutes, for example "60m", 60 minutes
# h means hours, for example "1h", 1 hour
garbage_collection_interval = "60s"
cleanup_interval = "60s"

# storage ratio activates garbage collection:
# 0 means disable,
# 0.1 means, once the storage reach 10% storage capacity, GC is triggered.
garbage_collection_storage_ratio = 0.1

# dump memory index entry when it reachs the capacity
memindex_capacity = 1048576
mem_index_capacity = 1048576

[buffer]
buffer_pool_size = "4GB"
buffer_manager_size = "4GB"
temp_dir = "/var/infinity/tmp"

[wal]
wal_dir = "/var/infinity/wal"
full_checkpoint_interval_sec = 86400
delta_checkpoint_interval_sec = 60
delta_checkpoint_interval_wal_bytes = 1000000000
wal_file_size_threshold = "1GB"
full_checkpoint_interval = 86400
delta_checkpoint_interval = 60
delta_checkpoint_threshold = 1000000000
wal_compact_threshold = "1GB"

# flush_at_once: write and flush log each commit
# only_write: write log, OS control when to flush the log, default
# flush_per_second: logs are written after each commit and flushed to disk per second.
flush_at_commit = "only_write"
flush_option = "only_write"

[resource]
dictionary_dir = "/var/infinity/resource"
2 changes: 1 addition & 1 deletion src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ Status Config::Init(const SharedPtr<String> &config_path) {
if (wal_config["delta_checkpoint_interval"]) {
if (wal_config["delta_checkpoint_interval"].is_string()) {
String delta_checkpoint_interval_str =
wal_config["full_checkpoint_interval"].value_or(DEFAULT_DELTA_CHECKPOINT_INTERVAL_SEC_STR);
wal_config["delta_checkpoint_interval"].value_or(DEFAULT_DELTA_CHECKPOINT_INTERVAL_SEC_STR);
auto res = ParseTimeInfo(delta_checkpoint_interval_str, delta_checkpoint_interval);
if (!res.ok()) {
return res;
Expand Down
5 changes: 3 additions & 2 deletions src/storage/bg_task/periodic_trigger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import third_party;
namespace infinity {

void CleanupPeriodicTrigger::Trigger() {
TxnTimeStamp visible_ts = wal_mgr_->GetLastCkpTS() + 1;
TxnTimeStamp visible_ts = txn_mgr_->GetCleanupScanTS();
// if (visible_ts == last_visible_ts_) {
// LOG_TRACE(fmt::format("Skip cleanup. visible timestamp: {}", visible_ts));
// return;
Expand All @@ -40,7 +40,8 @@ void CleanupPeriodicTrigger::Trigger() {
last_visible_ts_ = visible_ts;
LOG_TRACE(fmt::format("Cleanup visible timestamp: {}", visible_ts));

auto cleanup_task = MakeShared<CleanupTask>(catalog_, visible_ts, buffer_mgr_);
auto buffer_mgr = txn_mgr_->GetBufferMgr();
auto cleanup_task = MakeShared<CleanupTask>(catalog_, visible_ts, buffer_mgr);
bg_processor_->Submit(std::move(cleanup_task));
}

Expand Down
13 changes: 4 additions & 9 deletions src/storage/bg_task/periodic_trigger.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import stl;
import background_process;
import compaction_process;
import catalog;
import txn_manager;
import wal_manager;
import buffer_manager;

namespace infinity {

Expand Down Expand Up @@ -51,20 +51,15 @@ private:

export class CleanupPeriodicTrigger final : public PeriodicTrigger {
public:
CleanupPeriodicTrigger(std::chrono::milliseconds interval,
BGTaskProcessor *bg_processor,
Catalog *catalog,
WalManager *wal_mgr,
BufferManager *buffer_mgr)
: PeriodicTrigger(interval), bg_processor_(bg_processor), catalog_(catalog), wal_mgr_(wal_mgr), buffer_mgr_(buffer_mgr) {}
CleanupPeriodicTrigger(std::chrono::milliseconds interval, BGTaskProcessor *bg_processor, Catalog *catalog, TxnManager *txn_mgr)
: PeriodicTrigger(interval), bg_processor_(bg_processor), catalog_(catalog), txn_mgr_(txn_mgr) {}

virtual void Trigger() override;

private:
BGTaskProcessor *const bg_processor_{};
Catalog *const catalog_{};
WalManager *const wal_mgr_{};
BufferManager *const buffer_mgr_{};
TxnManager *const txn_mgr_{};

TxnTimeStamp last_visible_ts_{0};
};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ void Storage::Init() {
std::chrono::seconds cleanup_interval = static_cast<std::chrono::seconds>(config_ptr_->CleanupInterval());
if (cleanup_interval.count() > 0) {
periodic_trigger_thread_->AddTrigger(
MakeUnique<CleanupPeriodicTrigger>(cleanup_interval, bg_processor_.get(), new_catalog_.get(), wal_mgr_.get(), buffer_mgr_.get()));
MakeUnique<CleanupPeriodicTrigger>(cleanup_interval, bg_processor_.get(), new_catalog_.get(), txn_mgr_.get()));
} else {
LOG_WARN("Cleanup interval is not set, auto cleanup task will not be triggered");
}
Expand Down
14 changes: 14 additions & 0 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,20 @@ SizeT TxnManager::ActiveTxnCount() {

TxnTimeStamp TxnManager::CurrentTS() const { return start_ts_; }

TxnTimeStamp TxnManager::GetCleanupScanTS() {
std::lock_guard guard(rw_locker_);
TxnTimeStamp first_uncommitted_begin_ts = start_ts_;
while (!beginned_txns_.empty()) {
auto first_txn = beginned_txns_.front().lock();
if (first_txn.get() != nullptr) {
first_uncommitted_begin_ts = first_txn->BeginTS();
}
beginned_txns_.pop_front();
}
TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS();
return std::min(first_uncommitted_begin_ts, checkpointed_ts);
}

// A Txn can be deleted when there is no uncommitted txn whose begin is less than the commit ts of the txn
// So maintain the least uncommitted begin ts
void TxnManager::FinishTxn(Txn *txn) {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public:

TxnTimeStamp CurrentTS() const;

TxnTimeStamp GetCleanupScanTS();

private:
void FinishTxn(Txn *txn);

Expand Down
15 changes: 8 additions & 7 deletions src/storage/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ WalManager::WalManager(Storage *storage,
FlushOptionType flush_option)
: cfg_wal_size_threshold_(wal_size_threshold), cfg_delta_checkpoint_interval_wal_bytes_(delta_checkpoint_interval_wal_bytes), wal_dir_(wal_dir),
wal_path_(wal_dir + "/" + WalFile::TempWalFilename()), storage_(storage), running_(false), flush_option_(flush_option), last_ckp_wal_size_(0),
checkpoint_in_progress_(false), last_ckp_ts_(0), last_full_ckp_ts_(0) {}
checkpoint_in_progress_(false), last_ckp_ts_(UNCOMMIT_TS), last_full_ckp_ts_(UNCOMMIT_TS) {}

WalManager::~WalManager() {
if (running_.load()) {
Expand Down Expand Up @@ -146,7 +146,7 @@ i64 WalManager::WalSize() const {
return wal_size_;
}

TxnTimeStamp WalManager::GetLastCkpTS() { return last_ckp_ts_; }
TxnTimeStamp WalManager::GetCheckpointedTS() { return last_ckp_ts_ == UNCOMMIT_TS ? 0 : last_ckp_ts_ + 1; }

// Flush is scheduled regularly. It collects a batch of transactions, sync
// wal and do parallel committing. Each sync cost ~1s. Each checkpoint cost
Expand Down Expand Up @@ -280,17 +280,18 @@ void WalManager::Checkpoint(ForceCheckpointTask *ckp_task, TxnTimeStamp max_comm
void WalManager::CheckpointInner(bool is_full_checkpoint, Txn *txn, TxnTimeStamp max_commit_ts, i64 wal_size) {
DeferFn defer([&] { checkpoint_in_progress_.store(false); });

TxnTimeStamp last_ckp_ts = GetLastCkpTS();
TxnTimeStamp last_ckp_ts = last_ckp_ts_;
TxnTimeStamp last_full_ckp_ts = last_full_ckp_ts_;
if (is_full_checkpoint) {
if (max_commit_ts == last_full_ckp_ts_) {
if (max_commit_ts == last_full_ckp_ts) {
LOG_INFO(fmt::format("Skip full checkpoint because the max_commit_ts {} is the same as the last full checkpoint", max_commit_ts));
return;
}
if (last_full_ckp_ts_ >= max_commit_ts) {
if (last_full_ckp_ts != UNCOMMIT_TS && last_full_ckp_ts >= max_commit_ts) {
UnrecoverableError(
fmt::format("WalManager::UpdateLastFullMaxCommitTS last_full_ckp_ts_ {} >= max_commit_ts {}", last_full_ckp_ts_, max_commit_ts));
fmt::format("WalManager::UpdateLastFullMaxCommitTS last_full_ckp_ts {} >= max_commit_ts {}", last_full_ckp_ts, max_commit_ts));
}
if (last_ckp_ts > max_commit_ts) {
if (last_ckp_ts != UNCOMMIT_TS && last_ckp_ts > max_commit_ts) {
UnrecoverableError(fmt::format("WalManager::UpdateLastFullMaxCommitTS last_ckp_ts {} >= max_commit_ts {}", last_ckp_ts, max_commit_ts));
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/wal/wal_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public:

i64 GetLastCkpWalSize();

TxnTimeStamp GetLastCkpTS();
TxnTimeStamp GetCheckpointedTS();

private:
// Checkpoint Helper
Expand Down
31 changes: 19 additions & 12 deletions src/unit_test/storage/bg_task/cleanup_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,34 @@ class CleanupTaskTest : public BaseTest {
protected:
void WaitCleanup(Storage *storage, TxnTimeStamp last_commit_ts) {
Catalog *catalog = storage->catalog();
WalManager *wal_mgr = storage->wal_manager();
BufferManager *buffer_mgr = storage->buffer_manager();

LOG_INFO("Waiting cleanup");
auto visible_ts = WaitFlushDeltaOp(storage, last_commit_ts);

auto cleanup_task = MakeShared<CleanupTask>(catalog, visible_ts, buffer_mgr);
cleanup_task->Execute();
}

TxnTimeStamp WaitFlushDeltaOp(Storage *storage, TxnTimeStamp last_commit_ts) {
TxnManager *txn_mgr = storage->txn_manager();

TxnTimeStamp visible_ts = 0;
time_t start = time(nullptr);
while (true) {
visible_ts = wal_mgr->GetLastCkpTS() + 1;
visible_ts = txn_mgr->GetCleanupScanTS();
// wait for at most 10s
time_t end = time(nullptr);
if (visible_ts >= last_commit_ts) {
LOG_INFO(fmt::format("Cleanup finished after {}", end - start));
LOG_INFO(fmt::format("FlushDeltaOp finished after {}", end - start));
break;
}
// wait for at most 10s
if (end - start > 10) {
UnrecoverableError("WaitCleanup timeout");
UnrecoverableException("WaitFlushDeltaOp timeout");
}
LOG_INFO(fmt::format("Before usleep. Wait cleanup for {} seconds", end - start));
LOG_INFO(fmt::format("Before usleep. Wait flush delta op for {} seconds", end - start));
usleep(1000 * 1000);
}

auto cleanup_task = MakeShared<CleanupTask>(catalog, visible_ts, buffer_mgr);
cleanup_task->Execute();
return visible_ts;
}
};

Expand All @@ -90,8 +95,9 @@ TEST_F(CleanupTaskTest, test_delete_db_simple) {
{
auto *txn = txn_mgr->BeginTxn(MakeUnique<String>("create db1"));
txn->CreateDatabase(*db_name, ConflictType::kError);
txn_mgr->CommitTxn(txn);
last_commit_ts = txn_mgr->CommitTxn(txn);
}
WaitFlushDeltaOp(storage, last_commit_ts);
{
auto *txn = txn_mgr->BeginTxn(MakeUnique<String>("drop db1"));
Status status = txn->DropDatabase(*db_name, ConflictType::kError);
Expand Down Expand Up @@ -184,8 +190,9 @@ TEST_F(CleanupTaskTest, test_delete_table_simple) {
auto *txn = txn_mgr->BeginTxn(MakeUnique<String>("create table1"));
auto status = txn->CreateTable(*db_name, std::move(table_def), ConflictType::kIgnore);
EXPECT_TRUE(status.ok());
txn_mgr->CommitTxn(txn);
last_commit_ts = txn_mgr->CommitTxn(txn);
}
WaitFlushDeltaOp(storage, last_commit_ts);
{
auto *txn = txn_mgr->BeginTxn(MakeUnique<String>("drop table1"));

Expand Down
4 changes: 2 additions & 2 deletions src/unit_test/storage/buffer/buffer_obj.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ class BufferObjTest : public BaseTest {

void WaitCleanup(Storage *storage, TxnTimeStamp last_commit_ts) {
Catalog *catalog = storage->catalog();
WalManager *wal_mgr = storage->wal_manager();
TxnManager *txn_mgr = storage->txn_manager();
BufferManager *buffer_mgr = storage->buffer_manager();

LOG_INFO("Waiting cleanup");
TxnTimeStamp visible_ts = 0;
time_t start = time(nullptr);
while (true) {
visible_ts = wal_mgr->GetLastCkpTS() + 1;
visible_ts = txn_mgr->GetCleanupScanTS();
time_t end = time(nullptr);
if (visible_ts >= last_commit_ts) {
LOG_INFO(fmt::format("Cleanup finished after {}", end - start));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ class OptimizeKnnTest : public BaseTest {

void WaitCleanup(Storage *storage, TxnTimeStamp last_commit_ts) {
Catalog *catalog = storage->catalog();
WalManager *wal_mgr = storage->wal_manager();
TxnManager *txn_mgr = storage->txn_manager();
BufferManager *buffer_mgr = storage->buffer_manager();

LOG_INFO("Waiting cleanup");
TxnTimeStamp visible_ts = 0;
time_t start = time(nullptr);
while (true) {
visible_ts = wal_mgr->GetLastCkpTS() + 1;
visible_ts = txn_mgr->GetCleanupScanTS();
time_t end = time(nullptr);
if (visible_ts >= last_commit_ts) {
LOG_INFO(fmt::format("Cleanup finished after {}", end - start));
Expand Down
4 changes: 2 additions & 2 deletions src/unit_test/storage/wal/catalog_delta_replay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ using namespace infinity;
class CatalogDeltaReplayTest : public BaseTest {
protected:
void WaitFlushDeltaOp(Storage *storage, TxnTimeStamp last_commit_ts) {
WalManager *wal_mgr = storage->wal_manager();
TxnManager *txn_mgr = storage->txn_manager();

TxnTimeStamp visible_ts = 0;
time_t start = time(nullptr);
while (true) {
visible_ts = wal_mgr->GetLastCkpTS() + 1;
visible_ts = txn_mgr->GetCleanupScanTS();
if (visible_ts >= last_commit_ts) {
break;
}
Expand Down
10 changes: 4 additions & 6 deletions src/unit_test/storage/wal/checkpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ class CheckpointTest : public BaseTest {
void TearDown() override { RemoveDbDirs(); }

void WaitFlushDeltaOp(Storage *storage, TxnTimeStamp last_commit_ts) {
WalManager *wal_mgr = storage->wal_manager();
TxnManager *txn_mgr = storage->txn_manager();
LOG_INFO("Waiting flush delta op");
TxnTimeStamp visible_ts = 0;
time_t start = time(nullptr);
while (true) {
visible_ts = wal_mgr->GetLastCkpTS() + 1;
visible_ts = txn_mgr->GetCleanupScanTS();
time_t end = time(nullptr);
if (visible_ts >= last_commit_ts) {
LOG_INFO(fmt::format("Flush delta op finished after {}", end - start));
Expand All @@ -87,14 +87,14 @@ class CheckpointTest : public BaseTest {

void WaitCleanup(Storage *storage, TxnTimeStamp last_commit_ts) {
Catalog *catalog = storage->catalog();
WalManager *wal_mgr = storage->wal_manager();
TxnManager *txn_mgr = storage->txn_manager();
BufferManager *buffer_mgr = storage->buffer_manager();

LOG_INFO("Waiting cleanup");
TxnTimeStamp visible_ts = 0;
time_t start = time(nullptr);
while (true) {
visible_ts = wal_mgr->GetLastCkpTS() + 1;
visible_ts = txn_mgr->GetCleanupScanTS();
time_t end = time(nullptr);
if (visible_ts >= last_commit_ts) {
LOG_INFO(fmt::format("Cleanup finished after {}", end - start));
Expand Down Expand Up @@ -220,8 +220,6 @@ TEST_F(CheckpointTest, test_cleanup_and_checkpoint) {
txn_mgr->CommitTxn(txn5);
}
WaitCleanup(storage, last_commit_ts);
usleep(5000 * 1000);
WaitFlushDeltaOp(storage, last_commit_ts);
infinity::InfinityContext::instance().UnInit();
#ifdef INFINITY_DEBUG
EXPECT_EQ(infinity::GlobalResourceUsage::GetObjectCount(), 0);
Expand Down
2 changes: 1 addition & 1 deletion test/data/config/test_buffer_obj_2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ compact_interval = "0s"

[wal]
#short checkpoint interval to allow quick cleanup
delta_checkpoint_interval_sec = "1s"
full_checkpoint_interval = "1s"

[buffer]
buffer_manager_size = "32mb"
Expand Down
2 changes: 1 addition & 1 deletion test/data/config/test_catalog_delta.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ compact_interval = "0s"

[wal]
#short checkpoint interval for test
delta_checkpoint_interval_sec = "1s"
delta_checkpoint_interval = "1s"

[buffer]
[resource]
2 changes: 1 addition & 1 deletion test/data/config/test_checkpoint.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ compact_interval = "0s"

[wal]
#short checkpoint interval to allow quick cleanup
delta_checkpoint_interval_sec = "5s"
delta_checkpoint_interval = "5s"

[buffer]
[resource]
Loading

0 comments on commit 10e22ce

Please sign in to comment.