Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement table snapshot and save it for recovery #2539

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ add_library(infinity_core
${common_cpp}
${admin_cpp}
${network_cpp}
executor/operator/snapshot/database_snapshot.cpp
)

target_sources(infinity_core
Expand Down
13 changes: 7 additions & 6 deletions src/common/utility/exception.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ namespace infinity {

void PrintTransactionHistory() {
TxnManager *txn_manager = InfinityContext::instance().storage()->txn_manager();
if (txn_manager != nullptr) {
Vector<SharedPtr<TxnContext>> txn_contexts = txn_manager->GetTxnContextHistories();

Vector<SharedPtr<TxnContext>> txn_contexts = txn_manager->GetTxnContextHistories();

SizeT history_count = txn_contexts.size();
for (SizeT idx = 0; idx < history_count; ++idx) {
SharedPtr<TxnContext> txn_history = txn_contexts[idx];
LOG_CRITICAL(txn_history->ToString());
SizeT history_count = txn_contexts.size();
for (SizeT idx = 0; idx < history_count; ++idx) {
SharedPtr<TxnContext> txn_history = txn_contexts[idx];
LOG_CRITICAL(txn_history->ToString());
}
}
}

Expand Down
80 changes: 80 additions & 0 deletions src/executor/operator/snapshot/database_snapshot.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// // Copyright(C) 2025 InfiniFlow, Inc. All rights reserved.
// //
// // Licensed under the Apache License, Version 2.0 (the "License");
// // you may not use this file except in compliance with the License.
// // You may obtain a copy of the License at
// //
// // https://www.apache.org/licenses/LICENSE-2.0
// //
// // Unless required by applicable law or agreed to in writing, software
// // distributed under the License is distributed on an "AS IS" BASIS,
// // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// // See the License for the specific language governing permissions and
// // limitations under the License.
//
// module;
//
// module snapshot;
//
// import stl;
// import txn;
// import query_context;
// import table_entry;
// import status;
// import third_party;
// import config;
// import infinity_exception;
// import snapshot_info;
//
// namespace infinity {
//
// Status Snapshot::CreateDatabaseSnapshot(QueryContext *query_context, const String &snapshot_name) {
// Txn *txn_ptr = query_context->GetTxn();
// const String &db_name = query_context->schema_name();
//
//
// SharedPtr<DatabaseSnapshotInfo> db_snapshot;
// db_snapshot->snapshot_name_ = snapshot_name;
// db_snapshot->db_name_ = db_name;
//
//
// Vector<String> table_names = txn_ptr->GetTableNames();
// for (const auto &table_name : table_names) {
// SharedPtr<TableSnapshotInfo> table_snapshot;
// Status status;
// std::tie(table_snapshot, status) = txn_ptr->GetTableSnapshot(db_name, table_name);
// if (!status.ok()) {
// LOG_WARN(fmt::format("Failed to get table snapshot {}: {}", table_name, status.message()));
// continue;
// }
// table_snapshot->snapshot_name_ = snapshot_name;
// db_snapshot->table_snapshots_[table_name] = table_snapshot;
// }
//
//
// String snapshot_dir = query_context->global_config()->SnapshotDir();
// db_snapshot->Serialize(snapshot_dir);
//
// return Status::OK();
// }
//
// void RestoreDatabaseSnapshot(QueryContext *query_context, const String &snapshot_name) {
// auto snapshot_dir = query_context->global_config()->SnapshotDir();
//
//
// auto [db_snapshot, status] = DatabaseSnapshotInfo::Deserialize(snapshot_dir, snapshot_name);
// if (!status.ok()) {
// LOG_ERROR(fmt::format("Failed to deserialize database snapshot: {}", status.message()));
// return;
// }
//
//
// for (const auto &[table_name, table_snapshot] : db_snapshot->table_snapshots_) {
// auto txn = query_context->GetTxn();
// txn->ApplyTableSnapshot(table_snapshot);
// }
//
// LOG_INFO(fmt::format("Database snapshot restored successfully: {}", snapshot_name));
// }
//
// } // namespace infinity
18 changes: 14 additions & 4 deletions src/executor/operator/snapshot/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,24 @@ namespace infinity {
Status Snapshot::DropSnapshot(QueryContext *query_context, const String &snapshot_name) {

String snapshot_dir = query_context->global_config()->SnapshotDir();
bool found = false;
bool meta_found = false;
bool directory_found = false;
for (const auto &entry : std::filesystem::directory_iterator(snapshot_dir)) {
if (entry.is_directory()) {
// Don't search the directory recursively
LOG_INFO("Check snapshot directory: {}" + entry.path().string());
if (entry.path().stem() == snapshot_name) {
VirtualStore::RemoveDirectory(entry.path().string());
directory_found = true;
}
} else {
// Just the file base name
if (entry.path().stem() == snapshot_name) {
String extension = entry.path().extension();
if (extension == ".json" or extension == ".lz4") {
LOG_INFO(fmt::format("Delete file: {}", entry.path().string()));
VirtualStore::DeleteFile(entry.path().string());
found = true;
meta_found = true;
}
} else {
String filename = entry.path().filename();
Expand All @@ -53,8 +59,12 @@ Status Snapshot::DropSnapshot(QueryContext *query_context, const String &snapsho
}
}

if (!found) {
return Status::NotFound(fmt::format("Snapshot: {} not found", snapshot_name));
if (!meta_found) {
return Status::NotFound(fmt::format("Snapshot meta: {} not found", snapshot_name));
}

if (!directory_found) {
return Status::NotFound(fmt::format("Snapshot directory: {} not found", snapshot_name));
}

return Status::OK();
Expand Down
7 changes: 5 additions & 2 deletions src/executor/operator/snapshot/table_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ Status Snapshot::CreateTableSnapshot(QueryContext *query_context, const String &
}
table_snapshot->snapshot_name_ = snapshot_name;
String snapshot_dir = query_context->global_config()->SnapshotDir();
table_snapshot->Serialize(snapshot_dir);
status = table_snapshot->Serialize(snapshot_dir);
if (!status.ok()) {
return status;
}

return Status::OK();
}
Expand All @@ -52,7 +55,7 @@ Status Snapshot::RestoreTableSnapshot(QueryContext *query_context, const String
SharedPtr<TableSnapshotInfo> table_snapshot;
Status status;
std::tie(table_snapshot, status) = TableSnapshotInfo::Deserialize(snapshot_dir, snapshot_name);
if(!status.ok()) {
if (!status.ok()) {
return status;
}
txn_ptr->ApplyTableSnapshot(table_snapshot);
Expand Down
110 changes: 93 additions & 17 deletions src/storage/common/snapshot_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ import block_version;
import data_type;
import parsed_expr;
import segment_entry;
import create_index_info;

namespace infinity {

nlohmann::json BlockColumnSnapshotInfo::Serialize() {
nlohmann::json json_res;
json_res["column_id"] = column_id_;
json_res["filename"] = filename_;
json_res["last_chunk_offset"] = last_chunk_offset_;
for (const auto &outline_snapshot : outline_snapshots_) {
json_res["outlines"].emplace_back(outline_snapshot->filename_);
}
Expand All @@ -55,6 +57,7 @@ SharedPtr<BlockColumnSnapshotInfo> BlockColumnSnapshotInfo::Deserialize(const nl
auto column_block_snapshot = MakeShared<BlockColumnSnapshotInfo>();
column_block_snapshot->column_id_ = column_block_json["column_id"];
column_block_snapshot->filename_ = column_block_json["filename"];
column_block_snapshot->last_chunk_offset_ = column_block_json["last_chunk_offset"];
if (column_block_json.contains("outlines")) {
for (const auto &outline_snapshot : column_block_json["outlines"]) {
auto outline_snapshot_info = MakeShared<OutlineSnapshotInfo>();
Expand All @@ -69,6 +72,10 @@ nlohmann::json BlockSnapshotInfo::Serialize() {
nlohmann::json json_res;
json_res["block_id"] = block_id_;
json_res["block_dir"] = block_dir_;
json_res["row_count"] = row_count_;
json_res["min_row_ts"] = min_row_ts_;
json_res["max_row_ts"] = max_row_ts_;
json_res["row_capacity"] = row_capacity_;
for (const auto &column_block_snapshot : column_block_snapshots_) {
json_res["columns"].emplace_back(column_block_snapshot->Serialize());
}
Expand All @@ -79,6 +86,9 @@ SharedPtr<BlockSnapshotInfo> BlockSnapshotInfo::Deserialize(const nlohmann::json
auto block_snapshot = MakeShared<BlockSnapshotInfo>();
block_snapshot->block_id_ = block_json["block_id"];
block_snapshot->block_dir_ = block_json["block_dir"];
block_snapshot->min_row_ts_ = block_json["min_row_ts"];
block_snapshot->max_row_ts_ = block_json["max_row_ts"];
block_snapshot->row_capacity_ = block_json["row_capacity"];
for (const auto &column_block_json : block_json["columns"]) {
auto column_block_snapshot = BlockColumnSnapshotInfo::Deserialize(column_block_json);
block_snapshot->column_block_snapshots_.emplace_back(column_block_snapshot);
Expand Down Expand Up @@ -123,11 +133,21 @@ SharedPtr<SegmentSnapshotInfo> SegmentSnapshotInfo::Deserialize(const nlohmann::

nlohmann::json ChunkIndexSnapshotInfo::Serialize() {
nlohmann::json json_res;
json_res["chunk_id"] = chunk_id_;
json_res["base_name"] = base_name_;
for (const auto &file : files_) {
json_res["files"].emplace_back(file);
}
return json_res;
}

SharedPtr<ChunkIndexSnapshotInfo> ChunkIndexSnapshotInfo::Deserialize(const nlohmann::json &chunk_index_json) {
auto chunk_index_snapshot = MakeShared<ChunkIndexSnapshotInfo>();
chunk_index_snapshot->chunk_id_ = chunk_index_json["chunk_id"];
chunk_index_snapshot->base_name_ = chunk_index_json["base_name"];
for (auto &file : chunk_index_snapshot->files_) {
chunk_index_snapshot->files_.emplace_back(file);
}
return chunk_index_snapshot;
}

Expand Down Expand Up @@ -164,21 +184,30 @@ SharedPtr<TableIndexSnapshotInfo> TableIndexSnapshotInfo::Deserialize(const nloh
auto table_index_snapshot = MakeShared<TableIndexSnapshotInfo>();
table_index_snapshot->index_dir_ = MakeShared<String>(table_index_json["index_dir"]);
table_index_snapshot->index_base_ = IndexBase::Deserialize(table_index_json["index_base"]);
for (const auto &segment_index_json : table_index_json["segment_indexes"]) {
auto segment_index_snapshot = SegmentIndexSnapshotInfo::Deserialize(segment_index_json);
table_index_snapshot->index_by_segment_.emplace(segment_index_snapshot->segment_id_, segment_index_snapshot);
if (table_index_json.count("segment_index") > 0) {
for (const auto &segment_index_json : table_index_json["segment_indexes"]) {
auto segment_index_snapshot = SegmentIndexSnapshotInfo::Deserialize(segment_index_json);
table_index_snapshot->index_by_segment_.emplace(segment_index_snapshot->segment_id_, segment_index_snapshot);
}
}
return table_index_snapshot;
}

void TableSnapshotInfo::Serialize(const String &save_dir) {
Status TableSnapshotInfo::Serialize(const String &save_dir) {

Config *config = InfinityContext::instance().config();
PersistenceManager *persistence_manager = InfinityContext::instance().persistence_manager();

// Create compressed file
// String compressed_filename = fmt::format("{}/{}.lz4", save_dir, snapshot_name_);
// std::ofstream output_stream = VirtualStore::BeginCompress(compressed_filename);
String snapshot_meta = fmt::format("{}/{}.json", save_dir, snapshot_name_);
if (VirtualStore::Exists(snapshot_meta)) {
return Status::DuplicatedFile(snapshot_meta);
} else {
String snapshot_dir = fmt::format("{}/{}", save_dir, snapshot_name_);
VirtualStore::MakeDirectory(snapshot_dir);
}

// Get files
Vector<String> original_files = GetFiles();
Expand Down Expand Up @@ -315,14 +344,9 @@ void TableSnapshotInfo::Serialize(const String &save_dir) {

String json_string = json_res.dump();

Path save_path = Path(save_dir) / fmt::format("{}.json", snapshot_name_);

if (!VirtualStore::Exists(save_dir)) {
VirtualStore::MakeDirectory(save_dir);
}
auto [snapshot_file_handle, status] = VirtualStore::Open(save_path.string(), FileAccessMode::kWrite);
auto [snapshot_file_handle, status] = VirtualStore::Open(snapshot_meta, FileAccessMode::kWrite);
if (!status.ok()) {
UnrecoverableError(fmt::format("{}: {}", save_path.string(), status.message()));
UnrecoverableError(fmt::format("{}: {}", snapshot_meta, status.message()));
}

status = snapshot_file_handle->Append(json_string.data(), json_string.size());
Expand All @@ -332,6 +356,8 @@ void TableSnapshotInfo::Serialize(const String &save_dir) {
snapshot_file_handle->Sync();

LOG_INFO(fmt::format("{}", json_res.dump()));

return Status::OK();
}

Vector<String> TableSnapshotInfo::GetFiles() const {
Expand All @@ -351,13 +377,50 @@ Vector<String> TableSnapshotInfo::GetFiles() const {

for (const auto &table_index_snapshot_pair : table_index_snapshots_) {
for (const auto &segment_index_snapshot_pair : table_index_snapshot_pair.second->index_by_segment_) {
SegmentID segment_id = segment_index_snapshot_pair.second->segment_id_;
for (const auto &chunk_index_snapshot : segment_index_snapshot_pair.second->chunk_index_snapshots_) {
if (chunk_index_snapshot->files_.empty()) {
files.emplace_back(
VirtualStore::ConcatenatePath(*table_index_snapshot_pair.second->index_dir_, chunk_index_snapshot->base_name_));
} else {
files.insert(files.end(), chunk_index_snapshot->files_.cbegin(), chunk_index_snapshot->files_.cend());
}
// IndexBase *index_base = table_index_snapshot_pair.second->index_base_.get();

// switch (index_base->index_type_) {
// case IndexType::kIVF: {
// break;
// }
// case IndexType::kHnsw: {
// break;
// }
// case IndexType::kBMP: {
// break;
// }
// case IndexType::kFullText: {
// break;
// }
// case IndexType::kSecondary: {
// files.emplace_back(
// VirtualStore::ConcatenatePath(*table_index_snapshot_pair.second->index_dir_,fmt::format("seg{}_chunk{}.idx", segment_id,
// chunk_index_snapshot->chunk_id_) ));
// break;
// }
// case IndexType::kEMVB: {
// break;
// }
// case IndexType::kDiskAnn: {
// break;
// }
// case IndexType::kInvalid: {
// UnrecoverableError("Invalid index type");
// break;
// }
// }
//
files.emplace_back(VirtualStore::ConcatenatePath(*table_index_snapshot_pair.second->index_dir_,
fmt::format("seg{}_chunk{}.idx", segment_id, chunk_index_snapshot->chunk_id_)));

// if (chunk_index_snapshot->files_.empty()) {
// files.emplace_back(
// VirtualStore::ConcatenatePath(*table_index_snapshot_pair.second->index_dir_, chunk_index_snapshot->index_filename_));
// } else {
// files.insert(files.end(), chunk_index_snapshot->files_.cbegin(), chunk_index_snapshot->files_.cend());
// }
}
}
}
Expand Down Expand Up @@ -454,6 +517,19 @@ Tuple<SharedPtr<TableSnapshotInfo>, Status> TableSnapshotInfo::Deserialize(const
}

// LOG_INFO(table_snapshot->ToString());
// LOG_INFO(fmt::format("Deserialize src data: {}/{}", snapshot_dir, snapshot_name));
Vector<String> original_files = table_snapshot->GetFiles();
Config *config = InfinityContext::instance().config();
String data_dir = config->DataDir();
for (const auto &file : original_files) {
String src_file_path = fmt::format("{}/{}/{}", snapshot_dir, snapshot_name, file);
String dst_file_path = fmt::format("{}/{}", data_dir, file);
// LOG_INFO(fmt::format("Copy from: {} to {}", src_file_path, dst_file_path));
Status copy_status = VirtualStore::Copy(dst_file_path, src_file_path);
if (!copy_status.ok()) {
RecoverableError(copy_status);
}
}

return {table_snapshot, Status::OK()};
}
Expand Down
Loading
Loading