diff --git a/python/benchmark/clients/base_client.py b/python/benchmark/clients/base_client.py
index 1dfb7a859c..8cd6d11a9a 100644
--- a/python/benchmark/clients/base_client.py
+++ b/python/benchmark/clients/base_client.py
@@ -28,6 +28,7 @@ def __init__(self, conf_path: str) -> None:
         self.clients = list()
         # Following are for multithreading
         self.mt_lock = threading.Lock()
+        self.mt_query_batch = 10
         self.mt_next_begin = 0
         self.mt_done_queries = 0
         self.mt_active_workers = 0
@@ -35,6 +36,7 @@ def __init__(self, conf_path: str) -> None:
         # Following are for multiprocessing
         self.mp_manager = multiprocessing.Manager()
         self.mp_lock = multiprocessing.Lock()
+        self.mp_query_batch = multiprocessing.Value("i", 10, lock=False)
         self.mp_next_begin = multiprocessing.Value("i", 0, lock=False)
         self.mp_done_queries = multiprocessing.Value("i", 0, lock=False)
         self.mp_active_workers = multiprocessing.Value("i", 0, lock=False)
@@ -136,14 +138,22 @@ def search_mt(self, is_express=False, num_workers=1):
                     f"average QPS since {start_str}: {avg_start}, average QPS of last interval: {avg_interval}"
                 )
             else:
+                query_batch = 10
                 with self.mt_lock:
+                    # how many queries a single worker can do in 10ms
+                    query_batch = self.mt_done_queries / (
+                        num_workers * report_qps_sec * 100
+                    )
+                    query_batch = min(100, query_batch)
+                    query_batch = max(int(query_batch), 1)
+                    self.mt_query_batch = query_batch
                     self.mt_done_queries = 0
                 start = now
                 start_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start))
                 report_prev = now
                 done_warm_up = True
                 logging.info(
-                    "Collecting statistics for 30 minutes. Print statistics so far every minute. Type Ctrl+C to quit."
+                    f"Collecting statistics for 30 minutes and print QPS so far every minute. Workers will report every {query_batch} queries. Type Ctrl+C to quit."
                 )

         for i in range(num_workers):
@@ -152,9 +162,11 @@ def search_mt(self, is_express=False, num_workers=1):
         self.save_and_check_results(self.mt_results)

     def search_thread_mainloop(self, is_express: bool, client_id: int):
-        query_batch = 100
         num_queries = len(self.queries)
         if is_express:
+            query_batch = 10
+            with self.mt_lock:
+                query_batch = self.mt_query_batch
             local_rng = random.Random()  # random number generator per thread
             deadline = time.time() + 30 * 60  # 30 minutes
             while time.time() < deadline:
@@ -163,6 +175,7 @@ def search_thread_mainloop(self, is_express: bool, client_id: int):
                     _ = self.do_single_query(query_id, client_id)
                 with self.mt_lock:
                     self.mt_done_queries += query_batch
+                    query_batch = self.mt_query_batch
         else:
             begin = 0
             end = 0
@@ -171,7 +184,7 @@ def search_thread_mainloop(self, is_express: bool, client_id: int):
                 with self.mt_lock:
                     self.mt_done_queries += end - begin
                     begin = self.mt_next_begin
-                    end = begin + query_batch
+                    end = begin + self.mt_query_batch
                     if end > num_queries:
                         end = num_queries
                     self.mt_next_begin = end
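The batch-size heuristic added to search_mt above is repeated verbatim in search_mp below, so it is worth reading in isolation. The following is a minimal sketch, not part of the patch; the function name compute_query_batch and its parameters are stand-ins for the instance state used in the diff (self.mt_done_queries / self.mp_done_queries.value, num_workers, report_qps_sec). The intent is to size each worker's reporting batch to roughly 10 ms of work, clamped to [1, 100], so workers take the shared lock often enough to keep the QPS numbers fresh without the lock becoming a bottleneck.

def compute_query_batch(done_queries: int, num_workers: int, report_interval_sec: float) -> int:
    """Queries a single worker completes in ~10 ms, clamped to [1, 100].

    per-worker QPS    = done_queries / (num_workers * report_interval_sec)
    queries per 10 ms = per-worker QPS / 100
    """
    batch = done_queries / (num_workers * report_interval_sec * 100)
    return max(int(min(100, batch)), 1)


# Worked example (illustrative numbers): 8 workers, a 60 s reporting interval,
# 480_000 queries completed -> 1000 QPS per worker -> 10 queries per 10 ms.
assert compute_query_batch(480_000, 8, 60) == 10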
@@ -257,14 +270,22 @@ def search_mp(self, is_express=False, num_workers=1):
                     f"average QPS since {start_str}: {avg_start}, average QPS of last interval: {avg_interval}"
                 )
             else:
+                query_batch = 10
                 with self.mp_lock:
+                    # how many queries a single worker can do in 10ms
+                    query_batch = self.mp_done_queries.value / (
+                        num_workers * report_qps_sec * 100
+                    )
+                    query_batch = min(100, query_batch)
+                    query_batch = max(int(query_batch), 1)
+                    self.mp_query_batch.value = query_batch
                     self.mp_done_queries.value = 0
                 start = now
                 start_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start))
                 report_prev = now
                 done_warm_up = True
                 logging.info(
-                    "Collecting statistics for 30 minutes. Print statistics so far every minute. Type Ctrl+C to quit."
+                    f"Collecting statistics for 30 minutes and print QPS so far every minute. Workers will report every {query_batch} queries. Type Ctrl+C to quit."
                 )

         for i in range(num_workers):
@@ -274,9 +295,11 @@ def search_mp(self, is_express=False, num_workers=1):

     def search_process_mainloop(self, is_express: bool):
         self.setup_clients(1)  # socket is unsafe to share among workers
-        query_batch = 100
         num_queries = len(self.queries)
         if is_express:
+            query_batch = 10
+            with self.mp_lock:
+                query_batch = self.mp_query_batch.value
             local_rng = random.Random()  # random number generator per thread
             deadline = time.time() + 30 * 60  # 30 minutes
             while time.time() < deadline:
@@ -285,6 +308,7 @@ def search_process_mainloop(self, is_express: bool):
                     _ = self.do_single_query(query_id, 0)
                 with self.mp_lock:
                     self.mp_done_queries.value += query_batch
+                    query_batch = self.mp_query_batch.value
         else:
             begin = 0
             end = 0
@@ -293,7 +317,7 @@ def search_process_mainloop(self, is_express: bool):
                 with self.mp_lock:
                     self.mp_done_queries.value += end - begin
                     begin = self.mp_next_begin.value
-                    end = begin + query_batch
+                    end = begin + self.mp_query_batch.value
                     if end > num_queries:
                         end = num_queries
                     self.mp_next_begin.value = end
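A side note on the multiprocessing path above: mp_query_batch is created with lock=False and is only ever read or written while holding the separate mp_lock, so one plain multiprocessing.Lock guards both the shared counter and the batch size. The sketch below shows that sharing pattern on its own; it is illustrative only (the worker body, process count, and timings are made up and are not part of the patch).

import multiprocessing
import time


def worker(lock, query_batch, done_queries):
    with lock:
        batch = query_batch.value
    deadline = time.time() + 1.0
    while time.time() < deadline:
        time.sleep(0.001 * batch)   # stand-in for running `batch` queries
        with lock:                  # report progress, then pick up the latest batch size
            done_queries.value += batch
            batch = query_batch.value


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    query_batch = multiprocessing.Value("i", 10, lock=False)   # guarded by `lock`, like mp_query_batch
    done_queries = multiprocessing.Value("i", 0, lock=False)

    workers = [
        multiprocessing.Process(target=worker, args=(lock, query_batch, done_queries))
        for _ in range(2)
    ]
    for p in workers:
        p.start()
    time.sleep(0.2)
    with lock:                      # the parent publishes an updated batch size
        query_batch.value = 20
    for p in workers:
        p.join()
    print(f"queries reported: {done_queries.value}")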
diff --git a/src/main/resource_manager.cppm b/src/main/resource_manager.cppm
index 8ed0aaaaf1..582db9bd7c 100644
--- a/src/main/resource_manager.cppm
+++ b/src/main/resource_manager.cppm
@@ -23,14 +23,15 @@ namespace infinity {

 export class ResourceManager : public Singleton<ResourceManager> {
 public:
-    explicit ResourceManager(u64 total_cpu_count, u64 total_memory) : total_cpu_count_(total_cpu_count), total_memory_(total_memory) {}
+    explicit ResourceManager(u64 total_cpu_count, u64 total_memory)
+        : total_cpu_count_(total_cpu_count), total_memory_(total_memory), hardware_concurrency_(Thread::hardware_concurrency()) {}

     inline u64 GetCpuResource(u64 cpu_count) {
         total_cpu_count_ -= cpu_count;
         return cpu_count;
     }

-    inline u64 GetCpuResource() { return GetCpuResource(Thread::hardware_concurrency()); }
+    inline u64 GetCpuResource() { return GetCpuResource(hardware_concurrency_); }
     //    inline u64 GetCpuResource() { return GetCpuResource(4); }

     inline u64 GetMemoryResource(u64 memory_size) {
@@ -46,6 +47,7 @@ public:
 private:
     atomic_u64 total_cpu_count_;
     atomic_u64 total_memory_;
+    u64 hardware_concurrency_;
 };

 } // namespace infinity
diff --git a/src/storage/txn/txn.cpp b/src/storage/txn/txn.cpp
index 1af283a098..eaf8d32281 100644
--- a/src/storage/txn/txn.cpp
+++ b/src/storage/txn/txn.cpp
@@ -67,8 +67,10 @@ Txn::Txn(BufferManager *buffer_mgr, TxnManager *txn_mgr, Catalog *catalog, Trans
     : txn_store_(this, catalog), txn_mgr_(txn_mgr), buffer_mgr_(buffer_mgr), catalog_(catalog), txn_id_(txn_id), txn_context_(begin_ts),
      wal_entry_(MakeShared<WalEntry>()), local_catalog_delta_ops_entry_(MakeUnique<CatalogDeltaEntry>()) {}

-UniquePtr<Txn> Txn::NewReplayTxn(BufferManager *buffer_mgr, TxnManager *txn_mgr, Catalog *catalog, TransactionID txn_id) {
-    auto txn = MakeUnique<Txn>(buffer_mgr, txn_mgr, catalog, txn_id, MAX_TIMESTAMP);
+UniquePtr<Txn> Txn::NewReplayTxn(BufferManager *buffer_mgr, TxnManager *txn_mgr, Catalog *catalog, TransactionID txn_id, TxnTimeStamp begin_ts) {
+    auto txn = MakeUnique<Txn>(buffer_mgr, txn_mgr, catalog, txn_id, begin_ts);
+    txn->txn_context_.commit_ts_ = begin_ts;
+    txn->txn_context_.state_ = TxnState::kCommitted;
     return txn;
 }

@@ -479,13 +481,6 @@ void Txn::CancelCommitBottom() {
     cond_var_.notify_one();
 }

-// Dangerous! only used during replaying wal.
-void Txn::FakeCommit(TxnTimeStamp commit_ts) {
-    txn_context_.begin_ts_ = commit_ts;
-    txn_context_.commit_ts_ = commit_ts;
-    txn_context_.state_ = TxnState::kCommitted;
-}
-
 void Txn::Rollback() {
     auto state = txn_context_.GetTxnState();
     TxnTimeStamp abort_ts = 0;
diff --git a/src/storage/txn/txn.cppm b/src/storage/txn/txn.cppm
index 2b7ada0497..55d3f86a9b 100644
--- a/src/storage/txn/txn.cppm
+++ b/src/storage/txn/txn.cppm
@@ -76,7 +76,7 @@ public:
     // For replay txn
     explicit Txn(BufferManager *buffer_mgr, TxnManager *txn_mgr, Catalog *catalog, TransactionID txn_id, TxnTimeStamp begin_ts);

-    static UniquePtr<Txn> NewReplayTxn(BufferManager *buffer_mgr, TxnManager *txn_mgr, Catalog *catalog, TransactionID txn_id);
+    static UniquePtr<Txn> NewReplayTxn(BufferManager *buffer_mgr, TxnManager *txn_mgr, Catalog *catalog, TransactionID txn_id, TxnTimeStamp begin_ts);

     // Txn steps:
     // 1. CreateTxn
@@ -195,9 +195,6 @@ public:
     void SetTxnWrite() { txn_context_.SetTxnType(TxnType::kWrite); }

     // WAL and replay OPS
-    // Dangerous! only used during replaying wal.
-    void FakeCommit(TxnTimeStamp commit_ts);
-
     void AddWalCmd(const SharedPtr<WalCmd> &cmd);

     bool Checkpoint(const TxnTimeStamp max_commit_ts, bool is_full_checkpoint);
diff --git a/src/storage/txn/txn_context.cppm b/src/storage/txn/txn_context.cppm
index 1a859c47db..cece973fc4 100644
--- a/src/storage/txn/txn_context.cppm
+++ b/src/storage/txn/txn_context.cppm
@@ -31,10 +31,7 @@ public:

     TxnContext(TxnTimeStamp begin_ts) : begin_ts_(begin_ts) {}

-    inline TxnTimeStamp GetBeginTS() {
-        std::shared_lock r_locker(rw_locker_);
-        return begin_ts_;
-    }
+    inline TxnTimeStamp GetBeginTS() const { return begin_ts_; }

     inline TxnTimeStamp GetCommitTS() {
         std::shared_lock r_locker(rw_locker_);
@@ -92,7 +89,7 @@ public:

 private:
     std::shared_mutex rw_locker_{};
-    TxnTimeStamp begin_ts_{};
+    const TxnTimeStamp begin_ts_{};
     TxnTimeStamp commit_ts_{};
     TxnTimeStamp committed_ts_{};
     TxnState state_{TxnState::kStarted};
diff --git a/src/storage/wal/wal_manager.cpp b/src/storage/wal/wal_manager.cpp
index 063619a08e..05bb910db0 100644
--- a/src/storage/wal/wal_manager.cpp
+++ b/src/storage/wal/wal_manager.cpp
@@ -668,10 +668,8 @@ void WalManager::WalCmdCreateIndexReplay(const WalCmdCreateIndex &cmd, Transacti
                                                               txn_id,
                                                               begin_ts);

-    auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id);
-
-    auto txn = MakeUnique<Txn>(nullptr /*buffer_mgr*/, nullptr /*txn_mgr*/, nullptr /*catalog*/, txn_id, begin_ts);
-    auto base_table_ref = MakeShared<BaseTableRef>(table_entry, table_entry->GetBlockIndex(txn.get()));
+    auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id, commit_ts);
+    auto base_table_ref = MakeShared<BaseTableRef>(table_entry, table_entry->GetBlockIndex(fake_txn.get()));
     table_index_entry->CreateIndexPrepare(base_table_ref.get(), fake_txn.get(), false, true);

     auto *txn_store = fake_txn->GetTxnTableStore(table_entry);
@@ -763,10 +761,9 @@ void WalManager::WalCmdDeleteReplay(const WalCmdDelete &cmd, TransactionID txn_i
         UnrecoverableError(fmt::format("Wal Replay: Get table failed {}", table_status.message()));
     }

-    auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id);
+    auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id, commit_ts);
     auto table_store = fake_txn->GetTxnTableStore(table_entry);
     table_store->Delete(cmd.row_ids_);
-    fake_txn->FakeCommit(commit_ts);
     Catalog::Delete(table_store->table_entry_, fake_txn->TxnID(), (void *)table_store, fake_txn->CommitTS(), table_store->delete_state_);
     Catalog::CommitWrite(table_store->table_entry_, fake_txn->TxnID(), commit_ts, table_store->txn_segments(), &table_store->delete_state_);
 }
@@ -800,14 +797,13 @@ void WalManager::WalCmdAppendReplay(const WalCmdAppend &cmd, TransactionID txn_i
         UnrecoverableError(fmt::format("Wal Replay: Get table failed {}", table_status.message()));
     }

-    auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id);
+    auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id, commit_ts);

     auto table_store = fake_txn->GetTxnTableStore(table_entry);
     table_store->Append(cmd.block_);

     auto append_state = MakeUnique<AppendState>(table_store->blocks_);
     table_store->append_state_ = std::move(append_state);
-    fake_txn->FakeCommit(commit_ts);
     Catalog::Append(table_store->table_entry_, fake_txn->TxnID(), table_store, commit_ts, storage_->buffer_manager(), true);
     Catalog::CommitWrite(table_store->table_entry_, fake_txn->TxnID(), commit_ts, table_store->txn_segments(), nullptr);
 }