Skip to content

Commit

Permalink
Minor improvement of fulltext query (#1250)
Browse files Browse the repository at this point in the history
Optimized ResourceManager::GetCpuResource.
Removed Txn::FakeCommit

- [x] Performance Improvement
  • Loading branch information
yuzhichang authored May 28, 2024
1 parent 0171896 commit 2b10cc3
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 34 deletions.
36 changes: 30 additions & 6 deletions python/benchmark/clients/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ def __init__(self, conf_path: str) -> None:
self.clients = list()
# Following are for multithreading
self.mt_lock = threading.Lock()
self.mt_query_batch = 10
self.mt_next_begin = 0
self.mt_done_queries = 0
self.mt_active_workers = 0
self.mt_results = []
# Following are for multiprocessing
self.mp_manager = multiprocessing.Manager()
self.mp_lock = multiprocessing.Lock()
self.mp_query_batch = multiprocessing.Value("i", 10, lock=False)
self.mp_next_begin = multiprocessing.Value("i", 0, lock=False)
self.mp_done_queries = multiprocessing.Value("i", 0, lock=False)
self.mp_active_workers = multiprocessing.Value("i", 0, lock=False)
Expand Down Expand Up @@ -136,14 +138,22 @@ def search_mt(self, is_express=False, num_workers=1):
f"average QPS since {start_str}: {avg_start}, average QPS of last interval: {avg_interval}"
)
else:
query_batch = 10
with self.mt_lock:
# how many queries a single worker can do in 10ms
query_batch = self.mt_done_queries / (
num_workers * report_qps_sec * 100
)
query_batch = min(100, query_batch)
query_batch = max(query_batch, 1)
self.mt_query_batch = query_batch
self.mt_done_queries = 0
start = now
start_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start))
report_prev = now
done_warm_up = True
logging.info(
"Collecting statistics for 30 minutes. Print statistics so far every minute. Type Ctrl+C to quit."
f"Collecting statistics for 30 minutes and print QPS so far every minute. Workers will report every {query_batch} queries. Type Ctrl+C to quit."
)

for i in range(num_workers):
Expand All @@ -152,9 +162,11 @@ def search_mt(self, is_express=False, num_workers=1):
self.save_and_check_results(self.mt_results)

def search_thread_mainloop(self, is_express: bool, client_id: int):
query_batch = 100
num_queries = len(self.queries)
if is_express:
query_batch = 10
with self.mt_lock:
query_batch = self.mt_query_batch
local_rng = random.Random() # random number generator per thread
deadline = time.time() + 30 * 60 # 30 minutes
while time.time() < deadline:
Expand All @@ -163,6 +175,7 @@ def search_thread_mainloop(self, is_express: bool, client_id: int):
_ = self.do_single_query(query_id, client_id)
with self.mt_lock:
self.mt_done_queries += query_batch
query_batch = self.mt_query_batch
else:
begin = 0
end = 0
Expand All @@ -171,7 +184,7 @@ def search_thread_mainloop(self, is_express: bool, client_id: int):
with self.mt_lock:
self.mt_done_queries += end - begin
begin = self.mt_next_begin
end = begin + query_batch
end = begin + self.mt_query_batch
if end > num_queries:
end = num_queries
self.mt_next_begin = end
Expand Down Expand Up @@ -257,14 +270,22 @@ def search_mp(self, is_express=False, num_workers=1):
f"average QPS since {start_str}: {avg_start}, average QPS of last interval: {avg_interval}"
)
else:
query_batch = 10
with self.mp_lock:
# how many queries a single worker can do in 10ms
query_batch = self.mp_done_queries.value / (
num_workers * report_qps_sec * 100
)
query_batch = min(100, query_batch)
query_batch = max(query_batch, 1)
self.mp_query_batch.value = query_batch
self.mp_done_queries.value = 0
start = now
start_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start))
report_prev = now
done_warm_up = True
logging.info(
"Collecting statistics for 30 minutes. Print statistics so far every minute. Type Ctrl+C to quit."
f"Collecting statistics for 30 minutes and print QPS so far every minute. Workers will report every {query_batch} queries. Type Ctrl+C to quit."
)

for i in range(num_workers):
Expand All @@ -274,9 +295,11 @@ def search_mp(self, is_express=False, num_workers=1):

def search_process_mainloop(self, is_express: bool):
self.setup_clients(1) # socket is unsafe to share among workers
query_batch = 100
num_queries = len(self.queries)
if is_express:
query_batch = 10
with self.mp_lock:
query_batch = self.mp_query_batch.value
local_rng = random.Random() # random number generator per thread
deadline = time.time() + 30 * 60 # 30 minutes
while time.time() < deadline:
Expand All @@ -285,6 +308,7 @@ def search_process_mainloop(self, is_express: bool):
_ = self.do_single_query(query_id, 0)
with self.mp_lock:
self.mp_done_queries.value += query_batch
query_batch = self.mp_query_batch.value
else:
begin = 0
end = 0
Expand All @@ -293,7 +317,7 @@ def search_process_mainloop(self, is_express: bool):
with self.mp_lock:
self.mp_done_queries.value += end - begin
begin = self.mp_next_begin.value
end = begin + query_batch
end = begin + self.mp_query_batch.value
if end > num_queries:
end = num_queries
self.mp_next_begin.value = end
Expand Down
6 changes: 4 additions & 2 deletions src/main/resource_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ namespace infinity {

export class ResourceManager : public Singleton<ResourceManager> {
public:
explicit ResourceManager(u64 total_cpu_count, u64 total_memory) : total_cpu_count_(total_cpu_count), total_memory_(total_memory) {}
explicit ResourceManager(u64 total_cpu_count, u64 total_memory)
: total_cpu_count_(total_cpu_count), total_memory_(total_memory), hardware_concurrency_(Thread::hardware_concurrency()) {}

inline u64 GetCpuResource(u64 cpu_count) {
total_cpu_count_ -= cpu_count;
return cpu_count;
}

inline u64 GetCpuResource() { return GetCpuResource(Thread::hardware_concurrency()); }
inline u64 GetCpuResource() { return GetCpuResource(hardware_concurrency_); }
// inline u64 GetCpuResource() { return GetCpuResource(4); }

inline u64 GetMemoryResource(u64 memory_size) {
Expand All @@ -46,6 +47,7 @@ public:
private:
atomic_u64 total_cpu_count_;
atomic_u64 total_memory_;
u64 hardware_concurrency_;
};

} // namespace infinity
13 changes: 4 additions & 9 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ Txn::Txn(BufferManager *buffer_mgr, TxnManager *txn_mgr, Catalog *catalog, Trans
: txn_store_(this, catalog), txn_mgr_(txn_mgr), buffer_mgr_(buffer_mgr), catalog_(catalog), txn_id_(txn_id), txn_context_(begin_ts),
wal_entry_(MakeShared<WalEntry>()), local_catalog_delta_ops_entry_(MakeUnique<CatalogDeltaEntry>()) {}

UniquePtr<Txn> Txn::NewReplayTxn(BufferManager *buffer_mgr, TxnManager *txn_mgr, Catalog *catalog, TransactionID txn_id) {
auto txn = MakeUnique<Txn>(buffer_mgr, txn_mgr, catalog, txn_id, MAX_TIMESTAMP);
UniquePtr<Txn> Txn::NewReplayTxn(BufferManager *buffer_mgr, TxnManager *txn_mgr, Catalog *catalog, TransactionID txn_id, TxnTimeStamp begin_ts) {
auto txn = MakeUnique<Txn>(buffer_mgr, txn_mgr, catalog, txn_id, begin_ts);
txn->txn_context_.commit_ts_ = begin_ts;
txn->txn_context_.state_ = TxnState::kCommitted;
return txn;
}

Expand Down Expand Up @@ -479,13 +481,6 @@ void Txn::CancelCommitBottom() {
cond_var_.notify_one();
}

// Dangerous! only used during replaying wal.
void Txn::FakeCommit(TxnTimeStamp commit_ts) {
txn_context_.begin_ts_ = commit_ts;
txn_context_.commit_ts_ = commit_ts;
txn_context_.state_ = TxnState::kCommitted;
}

void Txn::Rollback() {
auto state = txn_context_.GetTxnState();
TxnTimeStamp abort_ts = 0;
Expand Down
5 changes: 1 addition & 4 deletions src/storage/txn/txn.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public:
// For replay txn
explicit Txn(BufferManager *buffer_mgr, TxnManager *txn_mgr, Catalog *catalog, TransactionID txn_id, TxnTimeStamp begin_ts);

static UniquePtr<Txn> NewReplayTxn(BufferManager *buffer_mgr, TxnManager *txn_mgr, Catalog *catalog, TransactionID txn_id);
static UniquePtr<Txn> NewReplayTxn(BufferManager *buffer_mgr, TxnManager *txn_mgr, Catalog *catalog, TransactionID txn_id, TxnTimeStamp begin_ts);

// Txn steps:
// 1. CreateTxn
Expand Down Expand Up @@ -195,9 +195,6 @@ public:
void SetTxnWrite() { txn_context_.SetTxnType(TxnType::kWrite); }

// WAL and replay OPS
// Dangerous! only used during replaying wal.
void FakeCommit(TxnTimeStamp commit_ts);

void AddWalCmd(const SharedPtr<WalCmd> &cmd);

bool Checkpoint(const TxnTimeStamp max_commit_ts, bool is_full_checkpoint);
Expand Down
7 changes: 2 additions & 5 deletions src/storage/txn/txn_context.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ public:

TxnContext(TxnTimeStamp begin_ts) : begin_ts_(begin_ts) {}

inline TxnTimeStamp GetBeginTS() {
std::shared_lock<std::shared_mutex> r_locker(rw_locker_);
return begin_ts_;
}
inline TxnTimeStamp GetBeginTS() const { return begin_ts_; }

inline TxnTimeStamp GetCommitTS() {
std::shared_lock<std::shared_mutex> r_locker(rw_locker_);
Expand Down Expand Up @@ -92,7 +89,7 @@ public:

private:
std::shared_mutex rw_locker_{};
TxnTimeStamp begin_ts_{};
const TxnTimeStamp begin_ts_{};
TxnTimeStamp commit_ts_{};
TxnTimeStamp committed_ts_{};
TxnState state_{TxnState::kStarted};
Expand Down
12 changes: 4 additions & 8 deletions src/storage/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,10 +668,8 @@ void WalManager::WalCmdCreateIndexReplay(const WalCmdCreateIndex &cmd, Transacti
txn_id,
begin_ts);

auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id);

auto txn = MakeUnique<Txn>(nullptr /*buffer_mgr*/, nullptr /*txn_mgr*/, nullptr /*catalog*/, txn_id, begin_ts);
auto base_table_ref = MakeShared<BaseTableRef>(table_entry, table_entry->GetBlockIndex(txn.get()));
auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id, commit_ts);
auto base_table_ref = MakeShared<BaseTableRef>(table_entry, table_entry->GetBlockIndex(fake_txn.get()));
table_index_entry->CreateIndexPrepare(base_table_ref.get(), fake_txn.get(), false, true);

auto *txn_store = fake_txn->GetTxnTableStore(table_entry);
Expand Down Expand Up @@ -763,10 +761,9 @@ void WalManager::WalCmdDeleteReplay(const WalCmdDelete &cmd, TransactionID txn_i
UnrecoverableError(fmt::format("Wal Replay: Get table failed {}", table_status.message()));
}

auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id);
auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id, commit_ts);
auto table_store = fake_txn->GetTxnTableStore(table_entry);
table_store->Delete(cmd.row_ids_);
fake_txn->FakeCommit(commit_ts);
Catalog::Delete(table_store->table_entry_, fake_txn->TxnID(), (void *)table_store, fake_txn->CommitTS(), table_store->delete_state_);
Catalog::CommitWrite(table_store->table_entry_, fake_txn->TxnID(), commit_ts, table_store->txn_segments(), &table_store->delete_state_);
}
Expand Down Expand Up @@ -800,14 +797,13 @@ void WalManager::WalCmdAppendReplay(const WalCmdAppend &cmd, TransactionID txn_i
UnrecoverableError(fmt::format("Wal Replay: Get table failed {}", table_status.message()));
}

auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id);
auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id, commit_ts);
auto table_store = fake_txn->GetTxnTableStore(table_entry);
table_store->Append(cmd.block_);

auto append_state = MakeUnique<AppendState>(table_store->blocks_);
table_store->append_state_ = std::move(append_state);

fake_txn->FakeCommit(commit_ts);
Catalog::Append(table_store->table_entry_, fake_txn->TxnID(), table_store, commit_ts, storage_->buffer_manager(), true);
Catalog::CommitWrite(table_store->table_entry_, fake_txn->TxnID(), commit_ts, table_store->txn_segments(), nullptr);
}
Expand Down

0 comments on commit 2b10cc3

Please sign in to comment.