From 0d81b4889125ac7618d7ecb06a877ba8054649a6 Mon Sep 17 00:00:00 2001 From: Zhichang Yu Date: Fri, 24 May 2024 18:25:16 +0800 Subject: [PATCH] Refactor benchmark scripts (#1243) Refactor benchmark scripts - [x] Documentation Update - [x] Refactoring --- docs/references/benchmark.md | 10 +- python/benchmark/clients/base_client.py | 243 +++++++++++---- .../benchmark/clients/elasticsearch_client.py | 280 ++++-------------- python/benchmark/clients/infinity_client.py | 235 +++------------ python/benchmark/clients/qdrant_client.py | 110 ++----- .../benchmark/configs/infinity_geonames.json | 2 +- python/benchmark/configs/infinity_sift.json | 3 +- ...rate_query_json.py => generate_queries.py} | 0 python/benchmark/preprocess_aol_search_log.py | 53 ++++ .../benchmark/preprocess_aol_search_result.py | 39 +++ python/benchmark/run.py | 121 +++----- python/infinity/remote_thrift/types.py | 4 +- 12 files changed, 457 insertions(+), 643 deletions(-) rename python/benchmark/{generate_query_json.py => generate_queries.py} (100%) create mode 100755 python/benchmark/preprocess_aol_search_log.py create mode 100755 python/benchmark/preprocess_aol_search_result.py diff --git a/docs/references/benchmark.md b/docs/references/benchmark.md index 2af491e185..a443b7be8f 100644 --- a/docs/references/benchmark.md +++ b/docs/references/benchmark.md @@ -57,11 +57,11 @@ sed '1d' datasets/enwiki/enwiki-20120502-lines-1k.txt > datasets/enwiki/enwiki.c mkdir -p $HOME/elasticsearch docker run -d --name elasticsearch --network host -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms16384m -Xmx32000m" -e "xpack.security.enabled=false" -v $HOME/elasticsearch:/usr/share/elasticsearch elasticsearch:8.13.4 -mkdir -p $HOME/qdrant -docker run -d --name qdrant --network host -v $HOME/qdrant:/qdrant qdrant/qdrant:v1.8.2 +mkdir -p $HOME/qdrant/storage +docker run -d --name qdrant --network host -v $HOME/qdrant/storage:/qdrant/storage qdrant/qdrant:v1.9.2 -sudo mkdir -p /var/infinity && sudo chown -R $USER /var/infinity -docker run -d --name infinity -v /var/infinity/:/var/infinity --ulimit nofile=500000:500000 --network=host infiniflow/infinity:0.1.0 +mkdir -p $HOME/infinity +docker run -d --name infinity -v $HOME/infinity:/var/infinity --ulimit nofile=500000:500000 --network=host infiniflow/infinity:0.1.0 ``` 4. Run Benchmark: @@ -101,7 +101,7 @@ Following are commands for engine `infinity` and dataset `enwiki`: ```bash python run.py --generate --engine infinity --dataset enwiki python run.py --import --engine infinity --dataset enwiki -python run.py --query --engine infinity --dataset enwiki +python run.py --query=16 --engine infinity --dataset enwiki python run.py --query-express=16 --engine infinity --dataset enwiki ``` diff --git a/python/benchmark/clients/base_client.py b/python/benchmark/clients/base_client.py index 1df5cadbea..4fc87d36f9 100644 --- a/python/benchmark/clients/base_client.py +++ b/python/benchmark/clients/base_client.py @@ -1,10 +1,14 @@ -import argparse from abc import abstractmethod from typing import Any import subprocess import os import time import logging +import random +import json +import h5py +import numpy as np +import threading class BaseClient: @@ -14,14 +18,18 @@ class BaseClient: Each client reads the required parameters from the JSON configuration file. 
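+    It also keeps the state shared by the multi-threaded search loop: the loaded query
+    list, one client handle per worker thread, and the lock-protected progress counters
+    (next_begin, done_queries, active_threads).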
""" - def __init__( - self, conf_path: str, options: argparse.Namespace, drop_old: bool = True - ) -> None: + def __init__(self, conf_path: str) -> None: """ The conf_path configuration file is parsed to extract the needed parameters, which are then all stored for use by other functions. """ - self.delta = 0 - self.running = True + self.data = None + self.queries = list() + self.clients = list() + self.lock = threading.Lock() + self.next_begin = 0 + self.results = [] + self.done_queries = 0 + self.active_threads = 0 @abstractmethod def upload(self): @@ -31,11 +39,11 @@ def upload(self): pass @abstractmethod - def search(self) -> list[list[Any]]: - """ - Execute the corresponding query tasks (vector search, full-text search, hybrid search) based on the parsed parameters. - The function returns id list. - """ + def setup_clients(self, num_threads=1): + pass + + @abstractmethod + def do_single_query(self, query_id, client_id) -> list[Any]: pass def download_data(self, url, target_path): @@ -52,44 +60,178 @@ def download_data(self, url, target_path): else: subprocess.run(["wget", "-O", target_path, url], check=True) - @abstractmethod - def check_and_save_results(self, results: list[list[Any]]): - """ - The correct results for queries are read from the mode configuration file to compare with the search results and calculate recall. - Record the results (metrics to be measured) and save them in the results folder. - """ - pass + def search(self, is_express=False, num_threads=1): + self.setup_clients(num_threads) - @abstractmethod - def search_express(self, shared_counter, exit_event): - """ - Search in express mode: - - doesn't record per-query latency and result - - call update_shared_counter regularly to update the shared counter and quit when exit_event is set - """ + query_path = os.path.join(self.path_prefix, self.data["query_path"]) + _, ext = os.path.splitext(query_path) + if self.data["mode"] == "fulltext": + assert ext == ".txt" + for line in open(query_path, "r"): + line = line.strip() + self.queries.append(line) + else: + self.data["mode"] == "vector" + if ext == ".hdf5": + with h5py.File(query_path, "r") as f: + self.queries = list(f["test"]) + else: + assert ext == "jsonl" + for line in open(query_path, "r"): + query = json.loads(line)["vector"] + self.queries.append(query) - def search_express_outer(self, shared_counter, exit_event): - try: - self.search_express(shared_counter, exit_event) - except KeyboardInterrupt: - logging.info("Interrupted by user! Exiting...") - except Exception as e: - logging.error(e) - finally: - logging.info("subprocess exited") - - def update_shared_counter(self, shared_counter, exit_event): - """ - update shared_counter (allocated from shared memory) regularly. 
- set self.running to False when exit_event is set - """ - self.delta += 1 - if self.delta >= 100: - with shared_counter.get_lock(): - shared_counter.value += self.delta - self.delta = 0 - self.running = not exit_event.is_set() - return + self.active_threads = num_threads + threads = [] + for i in range(num_threads): + threads.append( + threading.Thread( + target=self.search_thread_mainloop, + args=[is_express, i], + daemon=True, + ) + ) + for i in range(num_threads): + threads[i].start() + + report_qps_sec = 60 + sleep_sec = 10 + sleep_cnt = 0 + done_warm_up = True + if is_express: + logging.info(f"Let database warm-up for {report_qps_sec} seconds") + done_warm_up = False + start = time.time() + start_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start)) + report_prev = start + done_queries_prev = 0 + done_queries_curr = 0 + + while self.active_threads > 0: + time.sleep(sleep_sec) + sleep_cnt += 1 + if sleep_cnt < report_qps_sec / sleep_sec: + continue + sleep_cnt = 0 + now = time.time() + if done_warm_up: + with self.lock: + done_queries_curr = self.done_queries + avg_start = done_queries_curr / (now - start) + avg_interval = (done_queries_curr - done_queries_prev) / ( + now - report_prev + ) + done_queries_prev = done_queries_curr + report_prev = now + logging.info( + f"average QPS since {start_str}: {avg_start}, average QPS of last interval:{avg_interval}" + ) + else: + with self.lock: + self.done_queries = 0 + start = now + start_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start)) + report_prev = now + done_warm_up = True + logging.info( + "Collecting statistics for 30 minutes. Print statistics so far every minute. Type Ctrl+C to quit." + ) + + for i in range(num_threads): + threads[i].join() + if not is_express: + self.save_and_check_results(self.results) + + def search_thread_mainloop(self, is_express: bool, client_id: int): + query_batch = 100 + num_queries = len(self.queries) + if is_express: + local_rng = random.Random() # random number generator per thread + deadline = time.time() + 30 * 60 # 30 minutes + while time.time() < deadline: + for i in range(query_batch): + query_id = local_rng.randrange(0, num_queries) + _ = self.do_single_query(query_id, client_id) + with self.lock: + self.done_queries += query_batch + else: + begin = 0 + end = 0 + local_results = list() + while end < num_queries: + with self.lock: + self.done_queries += end - begin + begin = self.next_begin + end = begin + query_batch + if end > num_queries: + end = num_queries + self.next_begin = end + for query_id in range(begin, end): + start = time.time() + result = self.do_single_query(query_id, client_id) + latency = (time.time() - start) * 1000 + result = [(query_id, latency)] + result + local_results.append(result) + with self.lock: + self.done_queries += end - begin + self.results += local_results + with self.lock: + self.active_threads -= 1 + + def save_and_check_results(self, results: list[list[Any]]): + """ + Compare the search results with ground truth to calculate recall. 
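+        Each element of results is [(query_id, latency_ms), hit, ...]. Latency
+        statistics (mean/std/min/max/p95/p99) are logged, raw results are optionally
+        written to "result_path" as JSON lines, and returned row ids are mapped with
+        ((x >> 32) << 23) + (x & 0xFFFFFFFF) before being intersected with the ground
+        truth (score = |intersection| / topK).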
+ """ + self.results.sort(key=lambda x: x[0][0]) + if "result_path" in self.data: + result_path = self.data["result_path"] + with open(result_path, "w") as f: + for result in results: + line = json.dumps(result) + f.write(line + "\n") + logging.info("query_result_path: {0}".format(result_path)) + latencies = [] + for result in results: + latencies.append(result[0][1]) + logging.info( + f"""mean_time: {np.mean(latencies)}, std_time: {np.std(latencies)}, + max_time: {np.max(latencies)}, min_time: {np.min(latencies)}, + p95_time: {np.percentile(latencies, 95)}, p99_time: {np.percentile(latencies, 99)}""" + ) + if "ground_truth_path" in self.data: + ground_truth_path = self.data["ground_truth_path"] + _, ext = os.path.splitext(ground_truth_path) + precisions = [] + if self.data["mode"] == "vector": + assert ext == ".hdf5" + with h5py.File(ground_truth_path, "r") as f: + expected_result = f["neighbors"] + for i, result in enumerate(results): + ids = [((x >> 32) << 23) + (x & 0xFFFFFFFF) for x in result[1:]] + precision = ( + len(set(ids).intersection(expected_result[i][1:])) + / self.data["topK"] + ) + precisions.append(precision) + else: + assert ext == ".json" or ext == ".jsonl" + with open(ground_truth_path, "r") as f: + for i, line in enumerate(f): + expected_result = json.loads(line) + exp_ids = [ + ((x[0] >> 32) << 23) + (x[0] & 0xFFFFFFFF) + for x in expected_result[1:] + ] + result = results[i] + ids = [ + ((x[0] >> 32) << 23) + (x[0] & 0xFFFFFFFF) + for x in result[1:] + ] + precision = ( + len(set(ids).intersection(exp_ids)) / self.data["topK"] + ) + precisions.append(precision) + logging.info(f"""mean_precisions: {np.mean(precisions)}""") def run_experiment(self, args): """ @@ -100,6 +242,7 @@ def run_experiment(self, args): self.upload() finish_time = time.time() logging.info(f"upload finish, cost time = {finish_time - start_time}") - elif args.query: - results = self.search() - self.check_and_save_results(results) + elif args.query >= 1: + self.search(is_express=False, num_threads=args.query) + elif args.query_express >= 1: + self.search(is_express=True, num_threads=args.query_express) diff --git a/python/benchmark/clients/elasticsearch_client.py b/python/benchmark/clients/elasticsearch_client.py index 86030c665b..067c91a788 100644 --- a/python/benchmark/clients/elasticsearch_client.py +++ b/python/benchmark/clients/elasticsearch_client.py @@ -1,31 +1,25 @@ -import argparse from typing import Any from elasticsearch import Elasticsearch, helpers import json -import time from typing import List import os import h5py import uuid -import numpy as np -import random import logging from .base_client import BaseClient class ElasticsearchClient(BaseClient): - def __init__( - self, conf_path: str, options: argparse.Namespace, drop_old: bool = True - ) -> None: + def __init__(self, conf_path: str) -> None: """ The mode configuration file is parsed to extract the needed parameters, which are then all stored for use by other functions. """ - BaseClient.__init__(self, conf_path, drop_old) + BaseClient.__init__(self, conf_path) with open(conf_path, "r") as f: self.data = json.load(f) self.client = Elasticsearch(self.data["connection_url"]) - self.collection_name = self.data["name"] + self.table_name = self.data["name"] self.path_prefix = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) logging.getLogger("elastic_transport").setLevel(logging.WARNING) @@ -36,9 +30,9 @@ def upload(self): """ Upload data and build indexes (parameters are parsed by __init__). 
""" - if self.client.indices.exists(index=self.collection_name): - self.client.indices.delete(index=self.collection_name) - self.client.indices.create(index=self.collection_name, body=self.data["index"]) + if self.client.indices.exists(index=self.table_name): + self.client.indices.delete(index=self.table_name) + self.client.indices.create(index=self.table_name, body=self.data["index"]) batch_size = self.data["insert_batch_size"] dataset_path = os.path.join(self.path_prefix, self.data["data_path"]) if not os.path.exists(dataset_path): @@ -54,7 +48,7 @@ def upload(self): record = json.loads(line) actions.append( { - "_index": self.collection_name, + "_index": self.table_name, "_id": uuid.UUID(int=i).hex, "_source": record, } @@ -71,7 +65,7 @@ def upload(self): record = {self.data["vector_name"]: line} actions.append( { - "_index": self.collection_name, + "_index": self.table_name, "_id": uuid.UUID(int=i).hex, "_source": record, } @@ -97,7 +91,7 @@ def upload(self): row_dict = {header: value for header, value in zip(headers, row)} current_batch.append( { - "_index": self.collection_name, + "_index": self.table_name, "_id": uuid.UUID(int=i).hex, "_source": row_dict, } @@ -111,216 +105,60 @@ def upload(self): else: raise TypeError("Unsupported file type") - self.client.indices.forcemerge( - index=self.collection_name, wait_for_completion=True - ) - - def parse_fulltext_query(self, query: dict) -> Any: - condition = query["body"]["query"] - key, value = list(condition.items())[0] - ret = {} - if key == "and": - ret = {"query": {"bool": {"must": [{"match": item} for item in value]}}} - elif key == "or": - ret = {"query": {"bool": {"should": [{"match": item} for item in value]}}} - elif key == "match": - ret = {"query": {"match": {list(value.keys())[0]: list(value.values())[0]}}} - return ret + self.client.indices.forcemerge(index=self.table_name, wait_for_completion=True) # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-match-query.html - def parse_fulltext_query_content(self, query: str) -> Any: - ret = {"query": {"match": {"body": query}}} - return ret - - def search_express(self, shared_counter, exit_event): - """ - Search in express mode: doesn't record per-query latency and result. 
- """ - query_path = os.path.join(self.path_prefix, self.data["query_path"]) - logging.info(query_path) - _, ext = os.path.splitext(query_path) - if ext == ".json" or ext == ".jsonl": - assert self.data["mode"] == "fulltext" - with open(query_path, "r") as f: - queries = json.load(f) - while self.running: - query = random.choice(queries) - body = self.parse_fulltext_query(query) - _ = self.client.search(index=self.collection_name, body=body) - self.update_shared_counter(shared_counter, exit_event) - if not self.running: - break - elif ext == ".hdf5" and self.data["mode"] == "vector": - with h5py.File(query_path, "r") as f: - lines = f["test"] - while self.running: - query = random.choice(lines) - knn = { - "field": self.data["vector_name"], - "query_vector": query, - "k": self.data["topK"], - "num_candidates": 200, - } - _ = self.client.search( - index=self.collection_name, - source=["_id", "_score"], - knn=knn, - size=self.data["topK"], - ) - self.update_shared_counter(shared_counter, exit_event) - if not self.running: - break - elif ext == ".txt": - assert self.data["mode"] == "fulltext" - with open(query_path, "r") as f: - lines = f.readlines() - while self.running: - line = random.choice(lines) - body = self.parse_fulltext_query_content(line) - _ = self.client.search( - index=self.collection_name, body=body, size=self.data["topK"] - ) - self.update_shared_counter(shared_counter, exit_event) - if not self.running: - break - else: - raise TypeError("Unsupported file type") - return - - def search(self) -> list[list[Any]]: - """ - Execute the corresponding query tasks (vector search, full-text search, hybrid search) based on the parsed parameters. - The function returns id list. - """ - query_path = os.path.join(self.path_prefix, self.data["query_path"]) - logging.info(query_path) - results = [] - _, ext = os.path.splitext(query_path) - if ext == ".json" or ext == ".jsonl": - with open(query_path, "r") as f: - queries = json.load(f) - if self.data["mode"] == "fulltext": - for query in queries: - body = self.parse_fulltext_query(query) - start = time.time() - result = self.client.search( - index=self.collection_name, body=body - ) - end = time.time() - latency = (end - start) * 1000 - result = [ - (uuid.UUID(hex=hit["_id"]).int, hit["_score"]) - for hit in result["hits"]["hits"] - ] - result.append(latency) - results.append(result) - elif ext == ".hdf5" and self.data["mode"] == "vector": - with h5py.File(query_path, "r") as f: - for query in f["test"]: - knn = { - "field": self.data["vector_name"], - "query_vector": query, - "k": self.data["topK"], - "num_candidates": 200, - } - start = time.time() - result = self.client.search( - index=self.collection_name, - source=["_id", "_score"], - knn=knn, - size=self.data["topK"], - ) - end = time.time() - latency = (end - start) * 1000 - result = [ - (uuid.UUID(hex=hit["_id"]).int, hit["_score"]) - for hit in result["hits"]["hits"] - ] - result.append(latency) - results.append(result) - elif ext == ".txt": - with open(query_path, "r") as f: - if self.data["mode"] == "fulltext": - for line in f: - body = self.parse_fulltext_query_content(line) - start = time.time() - result = self.client.search( - index=self.collection_name, - body=body, - size=self.data["topK"], - ) - end = time.time() - latency = (end - start) * 1000 - result = [ - (uuid.UUID(hex=hit["_id"]).int, hit["_score"]) - for hit in result["hits"]["hits"] - ] - result.append(latency) - # logging.debug(f"{line[:-1]}, {latency}") - results.append(result) + def 
get_fulltext_query_content(self, query: str, is_and: bool = False) -> Any: + ret = None + if is_and: + terms = query.split() + ret = { + "query": { + "bool": {"must": [{"match": {"body": term}} for term in terms]} + } + } else: - raise TypeError("Unsupported file type") - return results + ret = {"query": {"match": {"body": query}}} + return ret - def check_and_save_results(self, results: List[List[Any]]): - if "result_path" in self.data: - result_path = self.data["result_path"] - with open(result_path, "w") as f: - for result in results: - line = json.dumps(result) - f.write(line + "\n") - logging.info("query_result_path: {0}".format(result_path)) - if "ground_truth_path" in self.data: - ground_truth_path = self.data["ground_truth_path"] - _, ext = os.path.splitext(ground_truth_path) - precisions = [] - latencies = [] - if ext == ".hdf5": - with h5py.File(ground_truth_path, "r") as f: - expected_result = f["neighbors"] - for i, result in enumerate(results): - ids = set(x[0] for x in result[:-1]) - precision = ( - len( - ids.intersection( - expected_result[i][: self.data["topK"]] - ) - ) - / self.data["topK"] - ) - precisions.append(precision) - latencies.append(result[-1]) - elif ext == ".json" or ext == ".jsonl": - with open(ground_truth_path, "r") as f: - for i, line in enumerate(f): - expected_result = json.loads(line) - result = results[i] - ids = set(x[0] for x in result[:-1]) - precision = ( - len( - ids.intersection( - expected_result["expected_results"][ - : self.data["topK"] - ] - ) - ) - / self.data["topK"] - ) - precisions.append(precision) - latencies.append(result[-1]) + def setup_clients(self, num_threads=1): + self.clients = list() + for i in range(num_threads): + client = Elasticsearch(self.data["connection_url"]) + self.clients.append(client) - logging.info( - f"""mean_time: {np.mean(latencies)}, mean_precisions: {np.mean(precisions)}, - std_time: {np.std(latencies)}, min_time: {np.min(latencies)}, \n - max_time: {np.max(latencies)}, p95_time: {np.percentile(latencies, 95)}, - p99_time: {np.percentile(latencies, 99)}""" + def do_single_query(self, query_id, client_id) -> list[Any]: + query = self.queries[query_id] + client = self.clients[client_id] + if self.data["mode"] == "vector": + knn = { + "field": self.data["vector_name"], + "query_vector": query, + "k": self.data["topK"], + "num_candidates": 200, + } + result = client.search( + index=self.table_name, + source=["_id", "_score"], + knn=knn, + size=self.data["topK"], ) - else: - latencies = [] - for result in results: - latencies.append(result[-1]) - logging.info( - f"""mean_time: {np.mean(latencies)}, std_time: {np.std(latencies)}, - max_time: {np.max(latencies)}, min_time: {np.min(latencies)}, - p95_time: {np.percentile(latencies, 95)}, p99_time: {np.percentile(latencies, 99)}""" + result = [ + (uuid.UUID(hex=hit["_id"]).int, hit["_score"]) + for hit in result["hits"]["hits"] + ] + return result + elif self.data["mode"] == "fulltext": + body = self.get_fulltext_query_content(query) + result = self.client.search( + index=self.table_name, + body=body, + size=self.data["topK"], ) + result = [ + (uuid.UUID(hex=hit["_id"]).int, hit["_score"]) + for hit in result["hits"]["hits"] + ] + return result + else: + raise TypeError("Unsupported data mode {}".format(self.data["mode"])) diff --git a/python/benchmark/clients/infinity_client.py b/python/benchmark/clients/infinity_client.py index c84b7fed99..e33e88f06f 100644 --- a/python/benchmark/clients/infinity_client.py +++ b/python/benchmark/clients/infinity_client.py @@ -1,11 
+1,7 @@ -import argparse import json import os import h5py -import time -import numpy as np -from typing import Any, List -import random +from typing import Any import logging import infinity @@ -15,18 +11,19 @@ class InfinityClient(BaseClient): - def __init__( - self, conf_path: str, options: argparse.Namespace, drop_old: bool = True - ) -> None: + def __init__(self, conf_path: str) -> None: """ The conf_path configuration file is parsed to extract the needed parameters, which are then all stored for use by other functions. """ - BaseClient.__init__(self, conf_path, drop_old) + BaseClient.__init__(self, conf_path) with open(conf_path, "r") as f: self.data = json.load(f) - self.client = infinity.connect(NetworkAddress("127.0.0.1", 23817)) - self.collection_name = self.data["name"] + host, port = self.data["host"].split(":") + self.client = infinity.connect(NetworkAddress(host, int(port))) + self.table_name = self.data["name"] + self.data_mode = self.data["mode"] self.path_prefix = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + self.table_objs = list() def _parse_index_schema(self, index_schema): indexs = [] @@ -45,9 +42,9 @@ def upload(self): Upload data and build indexes (parameters are parsed by __init__). """ db_obj = self.client.get_database("default_db") - db_obj.drop_table(self.collection_name) - db_obj.create_table(self.collection_name, self.data["schema"]) - table_obj = db_obj.get_table(self.collection_name) + db_obj.drop_table(self.table_name) + db_obj.create_table(self.table_name, self.data["schema"]) + table_obj = db_obj.get_table(self.table_name) dataset_path = os.path.join(self.path_prefix, self.data["data_path"]) if not os.path.exists(dataset_path): self.download_data(self.data["data_link"], dataset_path) @@ -118,180 +115,42 @@ def upload(self): for i, idx in enumerate(indexs): table_obj.create_index(f"index{i}", [idx]) - def parse_fulltext_query(self, query: dict) -> Any: - condition = query["body"]["query"] + def setup_clients(self, num_threads=1): + host, port = self.data["host"].split(":") + self.clients = list() + for i in range(num_threads): + client = infinity.connect(NetworkAddress(host, int(port))) + db_obj = client.get_database("default_db") + table_obj = db_obj.get_table(self.table_name) + self.clients.append(client) + self.table_objs.append(table_obj) - key, value = list(condition.items())[0] - ret = f'{list(value.keys())[0]}:"{list(value.values())[0]}"' - if key == "and": - ret = "&&".join(f'{list(value.keys())[0]}:"{list(value.values())[0]}"') - elif key == "or": - ret = "||".join(f'{list(value.keys())[0]}:"{list(value.values())[0]}"') - - return ret - - def search_express(self, shared_counter, exit_event): - """ - Search in express mode: doesn't record per-query latency and result. 
- """ - db_obj = self.client.get_database("default_db") - table_obj = db_obj.get_table(self.collection_name) - query_path = os.path.join(self.path_prefix, self.data["query_path"]) - _, ext = os.path.splitext(query_path) - if ext == ".hdf5": - with h5py.File(query_path, "r") as f: - lines = f["test"] - while self.running: - line = random.choice(lines) - _, _ = ( - table_obj.output(["_row_id"]) - .knn( - self.data["vector_name"], - line.tolist(), - "float", - self.data["metric_type"], - self.data["topK"], - ) - .to_result() - ) - self.update_shared_counter(shared_counter, exit_event) - if not self.running: - break - elif ext == ".txt": - assert self.data["mode"] == "fulltext" - with open(query_path, "r") as f: - lines = f.readlines() - while self.running: - line = random.choice(lines) - condition = f"{line.strip()}" - res, _ = ( - table_obj.output(["_row_id", "_score"]) - .match( - "", - condition, - f"topn={self.data['topK']};default_field=body", - ) - .to_result() - ) - self.update_shared_counter(shared_counter, exit_event) - if not self.running: - break - else: - raise TypeError("Unsupported file type") - return - - def search(self) -> list[list[Any]]: - """ - Execute the corresponding query tasks (vector search, full-text search, hybrid search) based on the parsed parameters. - The function returns id list. - """ - db_obj = self.client.get_database("default_db") - table_obj = db_obj.get_table(self.collection_name) - query_path = os.path.join(self.path_prefix, self.data["query_path"]) - _, ext = os.path.splitext(query_path) - results = [] - if ext == ".hdf5": - with h5py.File(query_path, "r") as f: - for line in f["test"]: - start = time.time() - res, _ = ( - table_obj.output(["_row_id"]) - .knn( - self.data["vector_name"], - line.tolist(), - "float", - self.data["metric_type"], - self.data["topK"], - ) - .to_result() - ) - latency = (time.time() - start) * 1000 - result = [[x[0] for x in res["ROW_ID"]]] - result.append(latency) - results.append(result) - elif ext == ".txt": - with open(query_path, "r") as f: - if self.data["mode"] == "fulltext": - queries = f.readlines() - for query_line in queries: - condition = f"{query_line}" - start = time.time() - res, _ = ( - table_obj.output(["_row_id", "_score"]) - .match( - "", - condition, - f"topn={self.data['topK']};default_field=body", - ) - .to_result() - ) - latency = (time.time() - start) * 1000 - result = [ - (row_id[0], score) - for row_id, score in zip(res["ROW_ID"], res["SCORE"]) - ] - result.append(latency) - # logging.debug(f"{query}, {latency}") - results.append(result) - else: - raise TypeError("Unsupported file type") - return results - - def check_and_save_results(self, results: List[List[Any]]): - if "result_path" in self.data: - result_path = self.data["result_path"] - with open(result_path, "w") as f: - for result in results: - line = json.dumps(result) - f.write(line + "\n") - logging.info("query_result_path: {0}".format(result_path)) - if "ground_truth_path" in self.data: - ground_truth_path = self.data["ground_truth_path"] - _, ext = os.path.splitext(ground_truth_path) - precisions = [] - latencies = [] - if ext == ".hdf5": - with h5py.File(ground_truth_path, "r") as f: - expected_result = f["neighbors"] - assert len(expected_result) == len(results) - for i, result in enumerate(results): - ids = set(result[0]) - precision = ( - len( - ids.intersection( - expected_result[i][: self.data["topK"]] - ) - ) - / self.data["topK"] - ) - precisions.append(precision) - latencies.append(result[-1]) - elif ext == ".json" or ext == 
".jsonl": - with open(ground_truth_path, "r") as f: - for i, line in enumerate(f): - expected_result = json.loads(line) - exp_ids = set(x[0] for x in expected_result[:-1]) - result = results[i] - ids = set(x[0] for x in result[:-1]) - precision = len(ids.intersection(exp_ids)) / self.data["topK"] - logging.info( - f"expected_ids: {exp_ids}, ids: {ids}, precision: {precision}" - ) - precisions.append(precision) - latencies.append(result[-1]) - - logging.info( - f"""mean_time: {np.mean(latencies)}, mean_precisions: {np.mean(precisions)}, - std_time: {np.std(latencies)}, min_time: {np.min(latencies)}, - max_time: {np.max(latencies)}, p95_time: {np.percentile(latencies, 95)}, - p99_time: {np.percentile(latencies, 99)}""" + def do_single_query(self, query_id, client_id) -> list[Any]: + result = None + query = self.queries[query_id] + table_obj = self.table_objs[client_id] + if self.data_mode == "vector": + res, _ = ( + table_obj.output(["_row_id"]) + .knn( + self.data["vector_name"], + query, + "float", + self.data["metric_type"], + self.data["topK"], + ) + .to_result() ) - else: - latencies = [] - for result in results: - latencies.append(result[-1]) - logging.info( - f"""mean_time: {np.mean(latencies)}, std_time: {np.std(latencies)}, - max_time: {np.max(latencies)}, min_time: {np.min(latencies)}, - p95_time: {np.percentile(latencies, 95)}, p99_time: {np.percentile(latencies, 99)}""" + result = res["ROW_ID"] + elif self.data_mode == "fulltext": + res, _ = ( + table_obj.output(["_row_id", "_score"]) + .match( + "", + query, + f"topn={self.data['topK']};default_field=body", + ) + .to_result() ) + result = list(zip(res["ROW_ID"], res["SCORE"])) + return result diff --git a/python/benchmark/clients/qdrant_client.py b/python/benchmark/clients/qdrant_client.py index 58e8821865..42f45148ac 100644 --- a/python/benchmark/clients/qdrant_client.py +++ b/python/benchmark/clients/qdrant_client.py @@ -1,4 +1,3 @@ -import argparse from qdrant_client import QdrantClient as QC from qdrant_client import models from qdrant_client.models import VectorParams, Distance @@ -7,30 +6,28 @@ import json import h5py from typing import Any -import random import logging from .base_client import BaseClient class QdrantClient(BaseClient): - def __init__( - self, conf_path: str, options: argparse.Namespace, drop_old: bool = True - ) -> None: - BaseClient.__init__(self, conf_path, drop_old) + def __init__(self, conf_path: str) -> None: + BaseClient.__init__(self, conf_path) with open(conf_path, "r") as f: self.data = json.load(f) self.client = QC(self.data["connection_url"]) - self.collection_name = self.data["name"] + self.table_name = self.data["name"] if self.data["distance"] == "cosine": self.distance = Distance.COSINE elif self.data["distance"] == "L2": self.distance = Distance.EUCLID self.path_prefix = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + logging.getLogger("httpx").setLevel(logging.WARNING) def upload_bach(self, ids: list[int], vectors, payloads=None): self.client.upsert( - collection_name=self.collection_name, + collection_name=self.table_name, points=models.Batch(ids=ids, vectors=vectors, payloads=payloads), wait=True, ) @@ -53,7 +50,7 @@ def upload(self): hnsw_config = None self.client.recreate_collection( - collection_name=self.collection_name, + collection_name=self.table_name, vectors_config=VectorParams( size=self.data["vector_size"], distance=self.distance ), @@ -63,7 +60,7 @@ def upload(self): if "payload_index_schema" in self.data: for field_name, field_schema in 
self.data["payload_index_schema"].items(): self.client.create_payload_index( - collection_name=self.collection_name, + collection_name=self.table_name, field_name=field_name, field_schema=field_schema, ) @@ -79,7 +76,7 @@ def upload(self): lowercase=schema.get("lowercase", None), ) self.client.create_payload_index( - collection_name=self.collection_name, + collection_name=self.table_name, field_name=field_name, field_schema=field_schema, ) @@ -129,79 +126,20 @@ def upload(self): else: raise TypeError("Unsupported file type") - def search_express(self, shared_counter, exit_event): - """ - Search in express mode: doesn't record per-query latency and result. - """ - # get the queries path - query_path = os.path.join(self.path_prefix, self.data["query_path"]) - results = [] - _, ext = os.path.splitext(query_path) - if ext == ".json" and self.data["mode"] == "vector": - with open(query_path, "r") as f: - lines = f.readlines() - while self.running: - line = random.choice(lines) - query = json.loads(line) - _ = self.client.search( - collection_name=self.collection_name, - query_vector=query["vector"], - limit=self.data.get("topK", 10), - with_payload=False, - ) - if not self.running: - break - elif ext == ".hdf5" and self.data["mode"] == "vector": - with h5py.File(query_path, "r") as f: - lines = f["test"] - while self.running: - line = random.choice(lines) - _ = self.client.search( - collection_name=self.collection_name, - query_vector=line, - limit=self.data.get("topK", 10), - ) - if not self.running: - break - else: - raise TypeError("Unsupported file type") - return results + def setup_clients(self, num_threads=1): + self.clients = list() + for i in range(num_threads): + client = QC(self.data["connection_url"]) + self.clients.append(client) - def search(self) -> list[list[Any]]: - # get the queries path - query_path = os.path.join(self.path_prefix, self.data["query_path"]) - results = [] - _, ext = os.path.splitext(query_path) - if ext == ".json" and self.data["mode"] == "vector": - with open(query_path, "r") as f: - for line in f.readlines(): - query = json.loads(line) - start = time.time() - result = self.client.search( - collection_name=self.collection_name, - query_vector=query["vector"], - limit=self.data.get("topK", 10), - with_payload=False, - ) - end = time.time() - logging.info( - f"latency of search: {(end - start)*1000:.2f} milliseconds" - ) - results.append(result) - elif ext == ".hdf5" and self.data["mode"] == "vector": - with h5py.File(query_path, "r") as f: - start = time.time() - for line in f["test"]: - result = self.client.search( - collection_name=self.collection_name, - query_vector=line, - limit=self.data.get("topK", 10), - ) - results.append(result) - end = time.time() - logging.info( - f"latency of KNN search: {(end - start)*1000/len(f['test']):.2f} milliseconds" - ) - else: - raise TypeError("Unsupported file type") - return results + def do_single_query(self, query_id, client_id) -> list[Any]: + query = self.queries[query_id] + client = self.clients[client_id] + assert self.data["mode"] == "vector" + res = client.search( + collection_name=self.table_name, + query_vector=query, + limit=self.data.get("topK", 10), + with_payload=False, + ) + return res diff --git a/python/benchmark/configs/infinity_geonames.json b/python/benchmark/configs/infinity_geonames.json index acbe1bae80..5ace7752f8 100644 --- a/python/benchmark/configs/infinity_geonames.json +++ b/python/benchmark/configs/infinity_geonames.json @@ -1,7 +1,7 @@ { "name": "infinity_geonames", "app": "infinity", - 
"connection_url": "http://localhost:23817", + "host": "127.0.0.1:23817", "data_path": "datasets/geonames/geonames.json", "data_link": "https://rally-tracks.elastic.co/geonames/documents-2.json.bz2", "insert_batch_size": 8192, diff --git a/python/benchmark/configs/infinity_sift.json b/python/benchmark/configs/infinity_sift.json index 4eb98a181d..7fd6a25509 100644 --- a/python/benchmark/configs/infinity_sift.json +++ b/python/benchmark/configs/infinity_sift.json @@ -1,9 +1,10 @@ { "name": "infinity_sift_1m", "app": "infinity", - "host": "http://127.0.0.1:23817", + "host": "127.0.0.1:23817", "data_path": "datasets/sift/sift-128-euclidean.hdf5", "data_link": "http://ann-benchmarks.com/sift-128-euclidean.hdf5", + "insert_batch_size": 8192, "topK": 100, "mode": "vector", "schema": { diff --git a/python/benchmark/generate_query_json.py b/python/benchmark/generate_queries.py similarity index 100% rename from python/benchmark/generate_query_json.py rename to python/benchmark/generate_queries.py diff --git a/python/benchmark/preprocess_aol_search_log.py b/python/benchmark/preprocess_aol_search_log.py new file mode 100755 index 0000000000..65bbe4beb9 --- /dev/null +++ b/python/benchmark/preprocess_aol_search_log.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 + +# This script extracts queries from the wikipedia search log. The search log can be downloaded from: +# - https://en.wikipedia.org/wiki/AOL_search_log_release +# - https://www.kaggle.com/datasets/dineshydv/aol-user-session-collection-500k + +import sys +import re + + +def load_terms(enwiki_terms_file): + data = set() + with open(enwiki_terms_file, "r", encoding="utf-8", errors="ignore") as file: + for line in file: + columns = line.split() + data.add(columns[0]) + return data + + +def process_tsv(aol_search_log_file, enwiki_terms_file, output_aol_queries_file): + terms = load_terms(enwiki_terms_file) + unique_queries = set() + with open(aol_search_log_file, "r") as file: + next(file) # skip title line + for line in file: + # split per tab, the second field is the query + fields = line.strip().split("\t") + if len(fields) <= 2: + continue + # find pure alphabet words and lowercase them + words = re.findall(r"\b[a-zA-Z]+\b", fields[1]) + words = [x.lower() for x in words] + # filter out words that are not in the terms file + words = [x for x in words if x in terms] + if words and len(words) <= 5: + unique_queries.add(" ".join(words)) + sorted_queries = sorted(unique_queries) + with open(output_aol_queries_file, "w") as file: + for query in sorted_queries: + file.write(query + "\n") + + +if __name__ == "__main__": + if len(sys.argv) != 4: + print( + "Usage: python preprocess_aol_search_log.py " + ) + sys.exit(1) + + aol_search_log_file = sys.argv[1] + enwiki_terms_file = sys.argv[2] + output_aol_queries_file = sys.argv[3] + process_tsv(aol_search_log_file, enwiki_terms_file, output_aol_queries_file) diff --git a/python/benchmark/preprocess_aol_search_result.py b/python/benchmark/preprocess_aol_search_result.py new file mode 100755 index 0000000000..49c7593cac --- /dev/null +++ b/python/benchmark/preprocess_aol_search_result.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 + +# This script extracts queries from the wikipedia search log. 
The search log can be downloaded from: +# - https://en.wikipedia.org/wiki/AOL_search_log_release +# - https://www.kaggle.com/datasets/dineshydv/aol-user-session-collection-500k + +import sys +import json + + +if __name__ == "__main__": + if len(sys.argv) != 4: + print( + "Usage: python preprocess_aol_search_result.py " + ) + sys.exit(1) + + aol_queries_file = sys.argv[1] + aol_results_file = sys.argv[2] + output_file = sys.argv[3] + queries = open(aol_queries_file, "r").readlines() + selected_queries = [] + results = [] + with open(aol_results_file, "r") as f: + for line in f: + result = json.loads(line) + results.append(result) + results.sort(key=lambda x: x[-1][1], reverse=True) + i = 0 + for result in results: + query = queries[result[0][0]] + terms = query.split() + if len(terms) > 5: + continue + selected_queries.append(query) + i += 1 + if i >= 100000: + break + open(output_file, "w").writelines(selected_queries) diff --git a/python/benchmark/run.py b/python/benchmark/run.py index cefe44edb2..3e2af70c84 100644 --- a/python/benchmark/run.py +++ b/python/benchmark/run.py @@ -2,20 +2,15 @@ import os import logging import sys -import time -import multiprocessing from clients.elasticsearch_client import ElasticsearchClient from clients.infinity_client import InfinityClient from clients.qdrant_client import QdrantClient -from generate_query_json import generate_query_txt +from generate_queries import generate_query_txt ENGINES = ["infinity", "qdrant", "elasticsearch"] DATA_SETS = ["gist", "sift", "geonames", "enwiki"] -WARM_UP_SECONDS = 60 -REPORT_QPS_INTERVAL = 60 - def parse_args() -> argparse.Namespace: parser: argparse.ArgumentParser = argparse.ArgumentParser( @@ -24,67 +19,53 @@ def parse_args() -> argparse.Namespace: parser.add_argument( "--generate", action="store_true", - dest="generate_terms", + dest="generate_queries", help="Generate fulltext queries based on the dataset", ) parser.add_argument( "--import", action="store_true", dest="import_data", - help="Import data set into database engine", + help="Import dataset into database engine", ) parser.add_argument( "--query", - action="store_true", + type=int, + default=0, dest="query", - help="Run single client to benchmark query latency", + help="Run the query set only once using given number of clients with recording the result and latency. This is for result validation and latency analysis", ) parser.add_argument( "--query-express", type=int, default=0, dest="query_express", - help="Run multiple clients in express mode to benchmark QPS", + help="Run the query set randomly using given number of clients without recording the result. 
This is for QPS measurement.", ) parser.add_argument( "--engine", type=str, - default="all", + default="infinity", dest="engine", - help="database engine to benchmark, one of: all, " + ", ".join(ENGINES), + help="database engine to benchmark, one of: " + ", ".join(ENGINES), ) parser.add_argument( "--dataset", type=str, - default="all", + default="enwiki", dest="dataset", - help="data set to benchmark, one of: all, " + ", ".join(DATA_SETS), + help="dataset to benchmark, one of: " + ", ".join(DATA_SETS), ) return parser.parse_args() -def generate_config_paths(kwargs: argparse.Namespace) -> list[tuple[str, str]]: - paths = [] - config_path_prefix = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "configs" - ) - engines = ENGINES if kwargs.engine == "all" else [kwargs.engine] - datasets = DATA_SETS if kwargs.dataset == "all" else [kwargs.dataset] - for engine in engines: - for dataset in datasets: - paths.append( - (os.path.join(config_path_prefix, f"{engine}_{dataset}.json"), engine) - ) - return paths - - -def get_client(engine: str, conf_path: str, options: argparse.Namespace): +def get_client(engine: str, conf_path: str): if engine == "qdrant": - return QdrantClient(conf_path, options) + return QdrantClient(conf_path) elif engine == "elasticsearch": - return ElasticsearchClient(conf_path, options) + return ElasticsearchClient(conf_path) elif engine == "infinity": - return InfinityClient(conf_path, options) + return InfinityClient(conf_path) else: raise ValueError(f"Unknown engine: {engine}") @@ -95,10 +76,14 @@ def main(): format="%(asctime)-15s %(levelname)-8s (%(process)d) %(message)s", ) args = parse_args() - config_paths = generate_config_paths(args) + conf_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "configs", + f"{args.engine}_{args.dataset}.json", + ) - if args.generate_terms: - # TODO: Write a fixed path for the fulltext benchmark, expand or delete it for the general benchmark + if args.generate_queries: + # TODO: Write a fixed path for the fulltext benchmark generate_query_txt( "datasets/enwiki/enwiki.csv", "datasets/enwiki/enwiki-terms.txt", @@ -106,57 +91,17 @@ def main(): ) sys.exit(0) - for conf_path, engine in config_paths: - if not os.path.exists(conf_path): - logging.info("qdrant does not support full text search") - continue - logging.info("Running {} with {}".format(engine, conf_path)) - if args.query_express >= 1: - shared_counter = multiprocessing.Value("i", 0) - exit_event = multiprocessing.Event() - workers = [] - clients = [] - for i in range(args.query_express): - client = get_client(engine, conf_path, args) - clients.append(client) - worker = multiprocessing.Process( - target=client.search_express_outer, - args=(shared_counter, exit_event), - ) - worker.start() - workers.append(worker) - try: - logging.info(f"Let database warm-up for {WARM_UP_SECONDS} seconds") - time.sleep(WARM_UP_SECONDS) - logging.info( - "Collecting statistics for 30 minutes. Print statistics so far every minute. Type Ctrl+C to quit." 
- ) - shared_counter.value = 0 - start = time.time() - start_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start)) - counter_prev = 0 - for i in range(int(1800 / REPORT_QPS_INTERVAL)): - time.sleep(REPORT_QPS_INTERVAL) - now = time.time() - counter = shared_counter.value - avg_start = counter / (now - start) - avg_interval = (counter - counter_prev) / REPORT_QPS_INTERVAL - counter_prev = counter - logging.info( - f"average QPS since {start_str}: {avg_start}, average QPS of last interval:{avg_interval}" - ) - except KeyboardInterrupt: - logging.info("Interrupted by user! Exiting...") - except Exception as e: - logging.error(e) - finally: - exit_event.set() - for worker in workers: - worker.join() - else: - client = get_client(engine, conf_path, args) - client.run_experiment(args) - logging.info("Finished {} with {}".format(engine, conf_path)) + if not os.path.exists(conf_path): + logging.error(f"config file doesn't exist: {conf_path}") + return + logging.info( + f"Running engine {args.engine}, dataset {args.dataset}, config file {conf_path}" + ) + client = get_client(args.engine, conf_path) + client.run_experiment(args) + logging.info( + f"Done engine {args.engine}, dataset {args.dataset}, config file {conf_path}" + ) if __name__ == "__main__": diff --git a/python/infinity/remote_thrift/types.py b/python/infinity/remote_thrift/types.py index 83c0c047b3..82e9d9dfc9 100644 --- a/python/infinity/remote_thrift/types.py +++ b/python/infinity/remote_thrift/types.py @@ -141,9 +141,7 @@ def column_vector_to_list(column_type: ttypes.ColumnType, column_data_type: ttyp case ttypes.ColumnType.ColumnInt16: return list(struct.unpack('<{}h'.format(len(column_vector) // 2), column_vector)) case ttypes.ColumnType.ColumnRowID: - all_list = list(struct.unpack('<{}i'.format( - len(column_vector) // 4), column_vector)) - return [all_list[i:i + 2] for i in range(0, len(all_list), 2)] + return list(struct.unpack('<{}q'.format(len(column_vector) // 8), column_vector)) case ttypes.ColumnType.ColumnEmbedding: dimension = column_data_type.physical_type.embedding_type.dimension # print(dimension)
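
Note on the ColumnRowID change above and the id mapping used in base_client.py: a row id is now decoded as a single little-endian int64, and the recall check converts it to a dataset row number with ((x >> 32) << 23) + (x & 0xFFFFFFFF). The sketch below illustrates that decoding; it assumes the high 32 bits are a segment id, the low 32 bits an in-segment offset, and a segment capacity of 2**23 rows (implied by the shift). The helper names are illustrative only and not part of this patch.

```python
import struct

SEGMENT_CAPACITY_BITS = 23  # assumption: 2**23 rows per segment, implied by base_client.py


def decode_row_ids(column_vector: bytes) -> list[int]:
    # Mirrors the new ColumnRowID branch: one little-endian int64 per row id.
    return list(struct.unpack("<{}q".format(len(column_vector) // 8), column_vector))


def row_id_to_dataset_index(row_id: int) -> int:
    # Same mapping as save_and_check_results: high 32 bits = segment id,
    # low 32 bits = offset inside the segment.
    segment_id = row_id >> 32
    segment_offset = row_id & 0xFFFFFFFF
    return (segment_id << SEGMENT_CAPACITY_BITS) + segment_offset


if __name__ == "__main__":
    # Row 5 of segment 1 encoded by the server, then mapped back for the recall check.
    raw = struct.pack("<q", (1 << 32) | 5)
    (rid,) = decode_row_ids(raw)
    assert row_id_to_dataset_index(rid) == (1 << SEGMENT_CAPACITY_BITS) + 5
```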