Skip to content

Commit 6094a9f

Browse files
Fix fullckp & Cleanup temp file (infiniflow#1122)
### What problem does this PR solve?

1. Clean up the temp file when `Save` is called on a buffer_obj.
2. Fix: bug when replaying index creation.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)

---------

Co-authored-by: Jin Hai <haijin.chn@gmail.com>
1 parent 001f774 commit 6094a9f

File tree

9 files changed

+176
-28
lines changed

9 files changed

+176
-28
lines changed

src/storage/buffer/buffer_manager.cpp

+13-3
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,19 @@ BufferObj *BufferManager::GetBufferObject(UniquePtr<FileWorker> file_worker) {
7575

7676
void BufferManager::RemoveClean() {
7777
Vector<BufferObj *> clean_list;
78+
Vector<BufferObj *> clean_temp_list;
7879
{
7980
std::unique_lock lock(clean_locker_);
8081
clean_list.swap(clean_list_);
82+
clean_temp_list.swap(clean_temp_list_);
8183
}
8284

8385
for (auto *buffer_obj : clean_list) {
8486
buffer_obj->CleanupFile();
8587
}
88+
for (auto *buffer_obj : clean_temp_list) {
89+
buffer_obj->CleanupTempFile();
90+
}
8691

8792
{
8893
std::unique_lock lock(gc_locker_);
@@ -147,12 +152,12 @@ bool BufferManager::RemoveFromGCQueue(BufferObj *buffer_obj) {
147152
return RemoveFromGCQueueInner(buffer_obj);
148153
}
149154

150-
void BufferManager::AddToCleanList(BufferObj *buffer_obj, bool free) {
155+
void BufferManager::AddToCleanList(BufferObj *buffer_obj, bool do_free) {
151156
{
152157
std::unique_lock lock(clean_locker_);
153-
clean_list_.push_back(buffer_obj);
158+
clean_list_.emplace_back(buffer_obj);
154159
}
155-
if (free) {
160+
if (do_free) {
156161
std::unique_lock lock(gc_locker_);
157162
current_memory_size_ -= buffer_obj->GetBufferSize();
158163
if (!RemoveFromGCQueueInner(buffer_obj)) {
@@ -161,6 +166,11 @@ void BufferManager::AddToCleanList(BufferObj *buffer_obj, bool free) {
161166
}
162167
}
163168

169+
void BufferManager::AddToCleanTempList(BufferObj *buffer_obj) {
170+
std::unique_lock lock(clean_locker_);
171+
clean_temp_list_.push_back(buffer_obj);
172+
}
173+
164174
bool BufferManager::RemoveFromGCQueueInner(BufferObj *buffer_obj) {
165175
if (auto iter = gc_map_.find(buffer_obj); iter != gc_map_.end()) {
166176
gc_list_.erase(iter->second);

src/storage/buffer/buffer_manager.cppm

+4-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ private:
6868

6969
bool RemoveFromGCQueue(BufferObj *buffer_obj);
7070

71-
void AddToCleanList(BufferObj *buffer_obj, bool free);
71+
void AddToCleanList(BufferObj *buffer_obj, bool do_free);
72+
73+
void AddToCleanTempList(BufferObj *buffer_obj);
7274

7375
private:
7476
bool RemoveFromGCQueueInner(BufferObj *buffer_obj);
@@ -90,6 +92,7 @@ private:
9092

9193
std::mutex clean_locker_{};
9294
Vector<BufferObj *> clean_list_{};
95+
Vector<BufferObj *> clean_temp_list_{};
9396
};
9497

9598
} // namespace infinity

src/storage/buffer/buffer_obj.cpp

+21-9
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ bool BufferObj::Save() {
114114
case BufferStatus::kUnloaded: {
115115
LOG_TRACE(fmt::format("BufferObj::Save file: {}", GetFilename()));
116116
file_worker_->WriteToFile(false);
117+
buffer_mgr_->AddToCleanTempList(this);
117118
write = true;
118119
break;
119120
}
@@ -138,28 +139,37 @@ void BufferObj::PickForCleanup() {
138139
switch (status_) {
139140
// when insert data into table with index, the index buffer_obj
140141
// will remain BufferStatus::kNew, so we should allow this situation
141-
case BufferStatus::kNew:
142+
case BufferStatus::kNew: {
143+
buffer_mgr_->AddToCleanList(this, false /*do_free*/);
144+
break;
145+
}
142146
case BufferStatus::kFreed: {
143-
status_ = BufferStatus::kClean;
144-
if (file_worker_->GetData() != nullptr) {
145-
UnrecoverableError("Buffer is not freed.");
146-
}
147-
buffer_mgr_->AddToCleanList(this, false /*free*/);
147+
buffer_mgr_->AddToCleanList(this, false /*do_free*/);
148148
break;
149149
}
150150
case BufferStatus::kUnloaded: {
151151
file_worker_->FreeInMemory();
152-
status_ = BufferStatus::kClean;
153-
buffer_mgr_->AddToCleanList(this, true /*free*/);
152+
buffer_mgr_->AddToCleanList(this, true /*do_free*/);
154153
break;
155154
}
156155
default: {
157156
UnrecoverableError(fmt::format("Invalid status: {}", BufferStatusToString(status_)));
158157
}
159158
}
159+
status_ = BufferStatus::kClean;
160+
switch (type_) {
161+
case BufferType::kPersistent:
162+
case BufferType::kTemp: {
163+
buffer_mgr_->AddToCleanTempList(this);
164+
break;
165+
}
166+
default: {
167+
break;
168+
}
169+
}
160170
}
161171

162-
void BufferObj::CleanupFile() {
172+
void BufferObj::CleanupFile() const {
163173
if (status_ != BufferStatus::kClean) {
164174
UnrecoverableError("Invalid status.");
165175
}
@@ -169,6 +179,8 @@ void BufferObj::CleanupFile() {
169179
file_worker_->CleanupFile();
170180
}
171181

182+
void BufferObj::CleanupTempFile() const { file_worker_->CleanupTempFile(); }
183+
172184
void BufferObj::LoadInner() {
173185
std::unique_lock<std::mutex> locker(w_locker_);
174186
if (status_ != BufferStatus::kLoaded) {

src/storage/buffer/buffer_obj.cppm

+3-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ public:
7575

7676
void PickForCleanup();
7777

78-
void CleanupFile();
78+
void CleanupFile() const;
79+
80+
void CleanupTempFile() const ;
7981

8082
SizeT GetBufferSize() const { return file_worker_->GetMemoryCost(); }
8183

src/storage/buffer/file_worker/file_worker.cpp

+16-6
Original file line numberDiff line numberDiff line change
@@ -96,17 +96,27 @@ void FileWorker::MoveFile() {
9696
fs.Rename(src_path, dest_path);
9797
}
9898

99-
void FileWorker::CleanupFile() {
99+
void FileWorker::CleanupFile() const {
100100
LocalFileSystem fs;
101+
101102
String path = fmt::format("{}/{}", ChooseFileDir(false), *file_name_);
102103
if (fs.Exists(path)) {
103-
LOG_TRACE(fmt::format("Cleaning up file: {}", path));
104104
fs.DeleteFile(path);
105-
LOG_TRACE(fmt::format("Cleaned file: {}", path));
105+
LOG_INFO(fmt::format("Cleaned file: {}", path));
106+
} else {
107+
// Now, we cannot check whether a buffer obj has been flushed to disk.
108+
LOG_TRACE(fmt::format("Cleanup: File {} not found for deletion", path));
109+
}
110+
}
111+
112+
void FileWorker::CleanupTempFile() const {
113+
LocalFileSystem fs;
114+
String path = fmt::format("{}/{}", ChooseFileDir(true), *file_name_);
115+
if (fs.Exists(path)) {
116+
fs.DeleteFile(path);
117+
LOG_INFO(fmt::format("Cleaned file: {}", path));
106118
} else {
107-
// this may happen the same reason as in "CleanupScanner::CleanupDir"
108-
// It may also happen when cleanup a table not been flushed (need a checkpoint txn),
109-
// at that time there is not data file under dir.
119+
// Now, we cannot check whether a temp file is moved to data
110120
LOG_TRACE(fmt::format("Cleanup: File {} not found for deletion", path));
111121
}
112122
}

src/storage/buffer/file_worker/file_worker.cppm

+3-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ public:
5353
// Get file path. As key of buffer handle.
5454
String GetFilePath() const { return fmt::format("{}/{}", *file_dir_, *file_name_); }
5555

56-
void CleanupFile();
56+
void CleanupFile() const;
57+
58+
void CleanupTempFile() const;
5759

5860
protected:
5961
virtual void WriteToFileImpl(bool to_spill, bool &prepare_success) = 0;

src/storage/meta/entry/table_index_entry.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,9 @@ SharedPtr<TableIndexEntry> TableIndexEntry::Deserialize(const nlohmann::json &in
226226
bool deleted = index_def_entry_json["deleted"];
227227

228228
if (deleted) {
229-
auto table_index_entry = ReplayTableIndexEntry(table_index_meta, true, nullptr, nullptr, txn_id, begin_ts, commit_ts);
229+
auto index_name = table_index_meta->index_name();
230+
auto table_index_entry =
231+
ReplayTableIndexEntry(table_index_meta, true, MakeShared<IndexBase>(index_name), nullptr, txn_id, begin_ts, commit_ts);
230232
table_index_entry->commit_ts_.store(commit_ts);
231233
table_index_entry->begin_ts_ = begin_ts;
232234
return table_index_entry;

src/storage/wal/wal_manager.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -501,11 +501,11 @@ i64 WalManager::ReplayWalFile() {
501501
system_start_ts = replay_entries[replay_count]->commit_ts_;
502502
last_txn_id = replay_entries[replay_count]->txn_id_;
503503

504+
LOG_INFO(replay_entries[replay_count]->ToString());
504505
ReplayWalEntry(*replay_entries[replay_count]);
505-
LOG_TRACE(replay_entries[replay_count]->ToString());
506506
}
507507

508-
LOG_TRACE(fmt::format("System start ts: {}, lastest txn id: {}", system_start_ts, last_txn_id));
508+
LOG_INFO(fmt::format("System start ts: {}, lastest txn id: {}", system_start_ts, last_txn_id));
509509
storage_->catalog()->next_txn_id_ = last_txn_id;
510510
this->max_commit_ts_ = system_start_ts;
511511

src/unit_test/storage/buffer/buffer_parallel.cpp src/unit_test/storage/buffer/buffer_manager.cpp

+111-4
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,31 @@ import config;
2828

2929
using namespace infinity;
3030

31-
class BufferParallelTest : public BaseTest {
31+
class BufferManagerTest : public BaseTest {
32+
private:
33+
LocalFileSystem fs;
34+
35+
Vector<SharedPtr<DirEntry>> ListAllFile(const String &path) {
36+
Vector<SharedPtr<DirEntry>> res;
37+
std::function<void(const String &)> f = [&](const String &path) {
38+
auto entries = fs.ListDirectory(path);
39+
for (auto &entry : entries) {
40+
if (entry->is_directory()) {
41+
f(entry->path());
42+
} else {
43+
res.push_back(entry);
44+
}
45+
}
46+
};
47+
f(path);
48+
return res;
49+
}
50+
3251
protected:
52+
Vector<SharedPtr<DirEntry>> ListAllData() { return ListAllFile(*data_dir_); }
53+
54+
Vector<SharedPtr<DirEntry>> ListAllTemp() { return ListAllFile(*temp_dir_); }
55+
3356
SharedPtr<String> data_dir_;
3457
SharedPtr<String> temp_dir_;
3558

@@ -40,13 +63,13 @@ class BufferParallelTest : public BaseTest {
4063

4164
data_dir_ = MakeShared<String>(std::string(tmp_data_path()) + "/buffer/data");
4265
temp_dir_ = MakeShared<String>(std::string(tmp_data_path()) + "/buffer/temp");
43-
LocalFileSystem fs;
66+
fs.DeleteDirectory(*data_dir_);
67+
fs.DeleteDirectory(*temp_dir_);
4468
fs.CreateDirectory(*data_dir_);
4569
fs.CreateDirectory(*temp_dir_);
4670
}
4771

4872
void TearDown() override {
49-
LocalFileSystem fs;
5073
fs.DeleteDirectory(*data_dir_);
5174
fs.DeleteDirectory(*temp_dir_);
5275

@@ -61,7 +84,90 @@ class BufferParallelTest : public BaseTest {
6184
};
6285
};
6386

64-
TEST_F(BufferParallelTest, test1) {
87+
TEST_F(BufferManagerTest, cleanup_test) {
88+
89+
const SizeT k = 2;
90+
const SizeT file_size = 100;
91+
const SizeT buffer_size = k * file_size;
92+
const SizeT file_num = 100;
93+
EXPECT_GT(file_num, k);
94+
95+
{
96+
BufferManager buffer_mgr(buffer_size, data_dir_, temp_dir_);
97+
Vector<BufferObj *> buffer_objs;
98+
99+
for (SizeT i = 0; i < file_num; ++i) {
100+
auto file_name = MakeShared<String>(fmt::format("file_{}", i));
101+
auto file_worker = MakeUnique<DataFileWorker>(data_dir_, file_name, file_size);
102+
auto *buffer_obj = buffer_mgr.AllocateBufferObject(std::move(file_worker));
103+
buffer_objs.push_back(buffer_obj);
104+
{
105+
auto buffer_handle = buffer_obj->Load();
106+
auto *data = reinterpret_cast<char *>(buffer_handle.GetDataMut());
107+
for (SizeT j = 0; j < file_size; ++j) {
108+
data[j] = 'a' + i % 26;
109+
}
110+
}
111+
}
112+
113+
{
114+
auto datas = ListAllData();
115+
auto temps = ListAllTemp();
116+
EXPECT_EQ(datas.size(), 0ull);
117+
EXPECT_EQ(temps.size(), file_num - k);
118+
}
119+
120+
{
121+
SizeT write_n = 0;
122+
for (auto *buffer_obj : buffer_objs) {
123+
if (buffer_obj->Save()) {
124+
++write_n;
125+
}
126+
}
127+
EXPECT_EQ(write_n, k);
128+
}
129+
{
130+
auto datas = ListAllData();
131+
auto temps = ListAllTemp();
132+
EXPECT_EQ(datas.size(), file_num);
133+
EXPECT_EQ(temps.size(), 0ull);
134+
}
135+
136+
for (SizeT i = 0; i < file_num; ++i) {
137+
auto *buffer_obj = buffer_objs[i];
138+
auto buffer_handle = buffer_obj->Load();
139+
const auto *data = reinterpret_cast<const char *>(buffer_handle.GetData());
140+
for (SizeT j = 0; j < file_size; ++j) {
141+
EXPECT_EQ(data[j], char('a' + i % 26));
142+
}
143+
}
144+
{
145+
auto datas = ListAllData();
146+
auto temps = ListAllTemp();
147+
EXPECT_EQ(datas.size(), file_num);
148+
EXPECT_EQ(temps.size(), 0ull);
149+
}
150+
151+
for (SizeT i = 0; i < file_num; ++i) {
152+
auto *buffer_obj = buffer_objs[i];
153+
auto buffer_handle = buffer_obj->Load();
154+
auto *data = reinterpret_cast<char *>(buffer_handle.GetDataMut());
155+
for (SizeT j = 0; j < file_size; ++j) {
156+
data[j] = 'a' + (i + i) % 26;
157+
}
158+
}
159+
{
160+
auto datas = ListAllData();
161+
auto temps = ListAllTemp();
162+
EXPECT_EQ(datas.size(), file_num);
163+
EXPECT_EQ(temps.size(), file_num - k);
164+
}
165+
}
166+
}
167+
168+
TEST_F(BufferManagerTest, parallel_test) {
169+
LocalFileSystem fs;
170+
65171
const SizeT thread_n = 4;
66172
const SizeT file_n = 100;
67173
const SizeT avg_file_size = 100;
@@ -145,4 +251,5 @@ TEST_F(BufferParallelTest, test1) {
145251
thread.join();
146252
}
147253
}
254+
LOG_INFO("Finished parallel test.");
148255
}

0 commit comments

Comments
 (0)