forked from infiniflow/infinity
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwal_manager.cppm
133 lines (98 loc) · 4.74 KB
/
wal_manager.cppm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
module;
export module wal_manager;
import stl;
import bg_task;
import wal_entry;
import options;
import catalog_delta_entry;
import blocking_queue;
namespace infinity {
// Forward declarations (alphabetical). The declarations in this interface only
// name these types; no full definition is required here.
class BGTaskProcessor;
class SegmentEntry;
class Storage;
class TableEntry;
class Txn;
// Manager for the write-ahead log (WAL): accepts WAL entries, flushes them,
// takes checkpoints, and replays / recycles WAL files.
//
// This is a module interface: only declarations are visible here. The member
// function definitions live elsewhere, so the notes below are limited to what
// the signatures and the existing comments state; anything else is marked as
// an assumption to confirm.
export class WalManager {
public:
// Construct a manager for the WAL directory `wal_dir`.
//   storage                              - raw pointer to the Storage instance (ownership is not visible here).
//   wal_dir                              - WAL directory, taken by value.
//   wal_size_threshold                   - see cfg_wal_size_threshold_.
//   delta_checkpoint_interval_wal_bytes  - see cfg_delta_checkpoint_interval_wal_bytes_.
//   flush_option                         - see flush_option_.
// NOTE(review): the parameter-to-member mapping is inferred from the names;
// confirm against the constructor definition.
WalManager(Storage *storage, String wal_dir, u64 wal_size_threshold, u64 delta_checkpoint_interval_wal_bytes, FlushOption flush_option);
~WalManager();
// Start / stop the manager.
// NOTE(review): presumably these start and join flush_thread_ and toggle
// running_ -- confirm against the implementation.
void Start();
void Stop();
// Called by a session to request that WAL entries be persisted. The txn_id of
// each entry is assumed to have been initialized already. The vector of entry
// pointers is taken by value.
void PutEntries(Vector<WalEntry *> wal_entries);
// Flush is scheduled regularly. It collects a batch of transactions, syncs the
// WAL and commits them in parallel. Each sync costs ~1s and each checkpoint
// costs ~10s, so it is necessary to sync once per batch of transactions and to
// checkpoint once per batch of syncs.
void Flush();
// Try to submit a checkpoint task; the bool result reports whether it was
// accepted.
// NOTE(review): presumably rejected while checkpoint_in_progress_ is set --
// confirm.
bool TrySubmitCheckpointTask(SharedPtr<CheckpointTaskBase> ckp_task);
// Take a checkpoint up to `max_commit_ts`, given the WAL size `wal_size`.
// Two overloads: one selected by the `is_full_checkpoint` flag, one driven by
// a ForceCheckpointTask.
void Checkpoint(bool is_full_checkpoint, TxnTimeStamp max_commit_ts, i64 wal_size);
void Checkpoint(ForceCheckpointTask *ckp_task, TxnTimeStamp max_commit_ts, i64 wal_size);
// Switch to a new WAL file.
// NOTE(review): how `max_commit_ts` is used (e.g. in naming the old file) is
// not visible here.
void SwapWalFile(TxnTimeStamp max_commit_ts);
// Replay the WAL file(s).
// NOTE(review): the meaning of the i64 return value is not visible in this
// interface -- document it once confirmed.
i64 ReplayWalFile();
// Replay a single WAL entry.
void ReplayWalEntry(const WalEntry &entry);
// Recycle WAL files with respect to the full-checkpoint timestamp
// `full_ckp_ts`.
void RecycleWalFile(TxnTimeStamp full_ckp_ts);
// Should only be called from the `Flush` thread.
i64 WalSize() const;
// Accessors for checkpoint bookkeeping (see last_ckp_wal_size_ and the
// checkpoint timestamps below).
i64 GetLastCkpWalSize();
TxnTimeStamp GetCheckpointedTS();
private:
// Checkpoint helpers
void CheckpointInner(bool is_full_checkpoint, Txn *txn, TxnTimeStamp max_commit_ts, i64 wal_size);
void SetLastCkpWalSize(i64 wal_size);
// Replay helpers: one per WAL command type. Each receives the command together
// with the id and commit timestamp of the transaction that logged it.
// NOTE(review): presumably dispatched from ReplayWalEntry -- confirm.
void WalCmdCreateDatabaseReplay(const WalCmdCreateDatabase &cmd, TransactionID txn_id, TxnTimeStamp commit_ts);
void WalCmdDropDatabaseReplay(const WalCmdDropDatabase &cmd, TransactionID txn_id, TxnTimeStamp commit_ts);
void WalCmdCreateTableReplay(const WalCmdCreateTable &cmd, TransactionID txn_id, TxnTimeStamp commit_ts);
void WalCmdDropTableReplay(const WalCmdDropTable &cmd, TransactionID txn_id, TxnTimeStamp commit_ts);
void WalCmdCreateIndexReplay(const WalCmdCreateIndex &cmd, TransactionID txn_id, TxnTimeStamp commit_ts);
void WalCmdDropIndexReplay(const WalCmdDropIndex &cmd, TransactionID txn_id, TxnTimeStamp commit_ts);
void WalCmdAppendReplay(const WalCmdAppend &cmd, TransactionID txn_id, TxnTimeStamp commit_ts);
// Import and compact helper: replays one segment described by `segment_info`
// for `table_entry` and returns the resulting segment entry.
SharedPtr<SegmentEntry> ReplaySegment(TableEntry *table_entry, const WalSegmentInfo &segment_info, TransactionID txn_id, TxnTimeStamp commit_ts);
void WalCmdImportReplay(const WalCmdImport &cmd, TransactionID txn_id, TxnTimeStamp commit_ts);
void WalCmdDeleteReplay(const WalCmdDelete &cmd, TransactionID txn_id, TxnTimeStamp commit_ts);
// Disabled replay handlers, kept commented out:
// void WalCmdSetSegmentStatusSealedReplay(const WalCmdSetSegmentStatusSealed &cmd, TransactionID txn_id, TxnTimeStamp commit_ts);
// void WalCmdUpdateSegmentBloomFilterDataReplay(const WalCmdUpdateSegmentBloomFilterData &cmd, TransactionID txn_id, TxnTimeStamp commit_ts);
void WalCmdCompactReplay(const WalCmdCompact &cmd, TransactionID txn_id, TxnTimeStamp commit_ts);
public:
// Configuration values, publicly readable and writable. Both are
// value-initialized here.
// NOTE(review): presumably set from the same-named constructor parameters --
// confirm.
u64 cfg_wal_size_threshold_{};
u64 cfg_delta_checkpoint_interval_wal_bytes_{};
private:
// Concurrent writing of the WAL is disallowed, so all WAL writes are put into
// a queue and written serially (presumably the wait_flush_ queue below).
String wal_dir_{};
String wal_path_{};
// Raw pointer to the Storage instance; ownership is not visible here.
Storage *storage_{};
// WalManager state
Atomic<bool> running_{};
Thread flush_thread_{};
// The following members are accessed by TxnManager and the Flush thread.
BlockingQueue<WalEntry *> wait_flush_{};
// The following members are accessed only by the Flush thread.
std::ofstream ofs_{};
TxnTimeStamp max_commit_ts_{};
i64 wal_size_{};
// Defaults to FlushOption::kOnlyWrite.
FlushOption flush_option_{FlushOption::kOnlyWrite};
// The following members are accessed by the Flush and Checkpoint threads.
// NOTE(review): presumably mutex2_ guards last_ckp_wal_size_ -- confirm against
// GetLastCkpWalSize / SetLastCkpWalSize.
mutable std::mutex mutex2_{};
i64 last_ckp_wal_size_{};
Atomic<bool> checkpoint_in_progress_{false};
// The following members are accessed only by the Checkpoint/Cleanup thread.
TxnTimeStamp last_ckp_ts_{};
TxnTimeStamp last_full_ckp_ts_{};
};
} // namespace infinity