diff --git a/python/benchmark/clients/base_client.py b/python/benchmark/clients/base_client.py index 83c747885a..1df5cadbea 100644 --- a/python/benchmark/clients/base_client.py +++ b/python/benchmark/clients/base_client.py @@ -10,19 +10,18 @@ class BaseClient: """ Base class for all clients(Qdrant, ES, infinity). - mode is a string that corresponds to a JSON file's address in the configurations. + mode is a string that corresponds to a JSON file's address in the configurations. Each client reads the required parameters from the JSON configuration file. """ - @abstractmethod - def __init__(self, - mode: str, - options: argparse.Namespace, - drop_old: bool = True) -> None: + def __init__( + self, conf_path: str, options: argparse.Namespace, drop_old: bool = True + ) -> None: """ - The mode configuration file is parsed to extract the needed parameters, which are then all stored for use by other functions. + The conf_path configuration file is parsed to extract the needed parameters, which are then all stored for use by other functions. """ - pass + self.delta = 0 + self.running = True @abstractmethod def upload(self): @@ -34,7 +33,7 @@ def upload(self): @abstractmethod def search(self) -> list[list[Any]]: """ - Execute the corresponding query tasks (vector search, full-text search, hybrid search) based on the parsed parameters. + Execute the corresponding query tasks (vector search, full-text search, hybrid search) based on the parsed parameters. The function returns id list. """ pass @@ -44,14 +43,14 @@ def download_data(self, url, target_path): Download dataset and extract it into path. """ _, ext = os.path.splitext(url) - if ext == '.bz2': - bz2_path = target_path + '.bz2' - subprocess.run(['wget', '-O', bz2_path, url], check=True) - subprocess.run(['bunzip2', bz2_path], check=True) + if ext == ".bz2": + bz2_path = target_path + ".bz2" + subprocess.run(["wget", "-O", bz2_path, url], check=True) + subprocess.run(["bunzip2", bz2_path], check=True) extracted_path = os.path.splitext(bz2_path)[0] os.rename(extracted_path, target_path) else: - subprocess.run(['wget', '-O', target_path, url], check=True) + subprocess.run(["wget", "-O", target_path, url], check=True) @abstractmethod def check_and_save_results(self, results: list[list[Any]]): @@ -61,6 +60,37 @@ def check_and_save_results(self, results: list[list[Any]]): """ pass + @abstractmethod + def search_express(self, shared_counter, exit_event): + """ + Search in express mode: + - doesn't record per-query latency and result + - call update_shared_counter regularly to update the shared counter and quit when exit_event is set + """ + + def search_express_outer(self, shared_counter, exit_event): + try: + self.search_express(shared_counter, exit_event) + except KeyboardInterrupt: + logging.info("Interrupted by user! Exiting...") + except Exception as e: + logging.error(e) + finally: + logging.info("subprocess exited") + + def update_shared_counter(self, shared_counter, exit_event): + """ + update shared_counter (allocated from shared memory) regularly. + set self.running to False when exit_event is set + """ + self.delta += 1 + if self.delta >= 100: + with shared_counter.get_lock(): + shared_counter.value += self.delta + self.delta = 0 + self.running = not exit_event.is_set() + return + def run_experiment(self, args): """ run experiment and save results. 
@@ -70,6 +100,6 @@ def run_experiment(self, args): self.upload() finish_time = time.time() logging.info(f"upload finish, cost time = {finish_time - start_time}") - if args.query: + elif args.query: results = self.search() self.check_and_save_results(results) diff --git a/python/benchmark/clients/elasticsearch_client.py b/python/benchmark/clients/elasticsearch_client.py index 0d7cb5a432..69444de76a 100644 --- a/python/benchmark/clients/elasticsearch_client.py +++ b/python/benchmark/clients/elasticsearch_client.py @@ -8,24 +8,26 @@ import h5py import uuid import numpy as np +import random import logging from .base_client import BaseClient class ElasticsearchClient(BaseClient): - def __init__(self, - mode: str, - options: argparse.Namespace, - drop_old: bool = True) -> None: + def __init__( + self, conf_path: str, options: argparse.Namespace, drop_old: bool = True + ) -> None: """ The mode configuration file is parsed to extract the needed parameters, which are then all stored for use by other functions. """ - with open(mode, 'r') as f: + BaseClient.__init__(self, conf_path, drop_old) + with open(conf_path, "r") as f: self.data = json.load(f) - self.client = Elasticsearch(self.data['connection_url']) - self.collection_name = self.data['name'] + self.client = Elasticsearch(self.data["connection_url"]) + self.collection_name = self.data["name"] self.path_prefix = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + logging.getLogger("elastic_transport").setLevel(logging.WARNING) def upload_batch(self, actions: List): helpers.bulk(self.client, actions) @@ -36,48 +38,70 @@ def upload(self): """ if self.client.indices.exists(index=self.collection_name): self.client.indices.delete(index=self.collection_name) - self.client.indices.create(index=self.collection_name, body=self.data['index']) + self.client.indices.create(index=self.collection_name, body=self.data["index"]) batch_size = self.data["insert_batch_size"] dataset_path = os.path.join(self.path_prefix, self.data["data_path"]) if not os.path.exists(dataset_path): self.download_data(self.data["data_link"], dataset_path) _, ext = os.path.splitext(dataset_path) - if ext == '.json': - with open(dataset_path, 'r') as f: + if ext == ".json": + with open(dataset_path, "r") as f: actions = [] for i, line in enumerate(f): if i % batch_size == 0 and i != 0: self.upload_batch(actions) actions = [] record = json.loads(line) - actions.append({"_index": self.collection_name, "_id": uuid.UUID(int=i).hex, "_source": record}) + actions.append( + { + "_index": self.collection_name, + "_id": uuid.UUID(int=i).hex, + "_source": record, + } + ) if actions: self.upload_batch(actions) - elif ext == '.hdf5' and self.data['mode'] == 'vector': - with h5py.File(dataset_path, 'r') as f: + elif ext == ".hdf5" and self.data["mode"] == "vector": + with h5py.File(dataset_path, "r") as f: actions = [] - for i, line in enumerate(f['train']): + for i, line in enumerate(f["train"]): if i % batch_size == 0 and i != 0: self.upload_batch(actions) actions = [] - record = {self.data['vector_name']: line} - actions.append({"_index": self.collection_name, "_id": uuid.UUID(int=i).hex, "_source": record}) + record = {self.data["vector_name"]: line} + actions.append( + { + "_index": self.collection_name, + "_id": uuid.UUID(int=i).hex, + "_source": record, + } + ) if actions: self.upload_batch(actions) - elif ext == '.csv': + elif ext == ".csv": custom_headers = [] headers = self.data["index"]["mappings"]["properties"] for key, value in headers.items(): custom_headers.append(key) - 
with open(dataset_path, 'r', encoding='utf-8', errors='replace') as data_file: + with open( + dataset_path, "r", encoding="utf-8", errors="replace" + ) as data_file: current_batch = [] for i, line in enumerate(data_file): - row = line.strip().split('\t') + row = line.strip().split("\t") if len(row) != len(headers): - logging.info(f"row = {i}, row_len = {len(row)}, not equal headers len, skip") + logging.info( + f"row = {i}, row_len = {len(row)}, not equal headers len, skip" + ) continue row_dict = {header: value for header, value in zip(headers, row)} - current_batch.append({"_index": self.collection_name, "_id": uuid.UUID(int=i).hex, "_source": row_dict}) + current_batch.append( + { + "_index": self.collection_name, + "_id": uuid.UUID(int=i).hex, + "_source": row_dict, + } + ) if len(current_batch) >= batch_size: self.upload_batch(current_batch) current_batch = [] @@ -86,47 +110,83 @@ def upload(self): self.upload_batch(current_batch) else: raise TypeError("Unsupported file type") - - self.client.indices.forcemerge(index=self.collection_name, wait_for_completion=True) + + self.client.indices.forcemerge( + index=self.collection_name, wait_for_completion=True + ) def parse_fulltext_query(self, query: dict) -> Any: condition = query["body"]["query"] key, value = list(condition.items())[0] ret = {} - if key == 'and': - ret = { - "query": { - "bool": { - "must": [{"match": item} for item in value] - } - } - } - elif key == 'or': - ret = { - "query": { - "bool": { - "should": [{"match": item} for item in value] - } - } - } - elif key == 'match': - ret = { - "query": { - "match": { - list(value.keys())[0]: list(value.values())[0] - } - } - } + if key == "and": + ret = {"query": {"bool": {"must": [{"match": item} for item in value]}}} + elif key == "or": + ret = {"query": {"bool": {"should": [{"match": item} for item in value]}}} + elif key == "match": + ret = {"query": {"match": {list(value.keys())[0]: list(value.values())[0]}}} return ret - def parse_fulltext_query_content(self, query_content: dict) -> Any: - ret = { - "query": { - "match": query_content - } - } + # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-match-query.html + def parse_fulltext_query_content(self, query: str) -> Any: + ret = {"query": {"match": {"body": query}}} return ret + def search_express(self, shared_counter, exit_event): + """ + Search in express mode: doesn't record per-query latency and result. 
+ """ + query_path = os.path.join(self.path_prefix, self.data["query_path"]) + logging.info(query_path) + _, ext = os.path.splitext(query_path) + if ext == ".json" or ext == ".jsonl": + assert self.data["mode"] == "fulltext" + with open(query_path, "r") as f: + queries = json.load(f) + while self.running: + query = random.choice(queries) + body = self.parse_fulltext_query(query) + _ = self.client.search(index=self.collection_name, body=body) + self.update_shared_counter(shared_counter, exit_event) + if not self.running: + break + elif ext == ".hdf5" and self.data["mode"] == "vector": + with h5py.File(query_path, "r") as f: + lines = f["test"] + while self.running: + query = random.choice(lines) + knn = { + "field": self.data["vector_name"], + "query_vector": query, + "k": self.data["topK"], + "num_candidates": 200, + } + _ = self.client.search( + index=self.collection_name, + source=["_id", "_score"], + knn=knn, + size=self.data["topK"], + ) + self.update_shared_counter(shared_counter, exit_event) + if not self.running: + break + elif ext == ".txt": + assert self.data["mode"] == "fulltext" + with open(query_path, "r") as f: + lines = f.readlines() + while self.running: + line = random.choice(lines) + body = self.parse_fulltext_query_content(line) + _ = self.client.search( + index=self.collection_name, body=body, size=self.data["topK"] + ) + self.update_shared_counter(shared_counter, exit_event) + if not self.running: + break + else: + raise TypeError("Unsupported file type") + return + def search(self) -> list[list[Any]]: """ Execute the corresponding query tasks (vector search, full-text search, hybrid search) based on the parsed parameters. @@ -136,94 +196,124 @@ def search(self) -> list[list[Any]]: logging.info(query_path) results = [] _, ext = os.path.splitext(query_path) - if ext == '.json' or ext == '.jsonl': - with open(query_path, 'r') as f: + if ext == ".json" or ext == ".jsonl": + with open(query_path, "r") as f: queries = json.load(f) - if self.data['mode'] == 'fulltext': + if self.data["mode"] == "fulltext": for query in queries: body = self.parse_fulltext_query(query) - topk = self.data['topK'] start = time.time() - result = self.client.search(index=self.collection_name, - body=body) + result = self.client.search( + index=self.collection_name, body=body + ) end = time.time() latency = (end - start) * 1000 - result = [(uuid.UUID(hex=hit['_id']).int, hit['_score']) for hit in result['hits']['hits']] + result = [ + (uuid.UUID(hex=hit["_id"]).int, hit["_score"]) + for hit in result["hits"]["hits"] + ] result.append(latency) results.append(result) - elif ext == '.hdf5' and self.data['mode'] == 'vector': - with h5py.File(query_path, 'r') as f: - for query in f['test']: + elif ext == ".hdf5" and self.data["mode"] == "vector": + with h5py.File(query_path, "r") as f: + for query in f["test"]: knn = { "field": self.data["vector_name"], "query_vector": query, "k": self.data["topK"], - "num_candidates": 200 + "num_candidates": 200, } start = time.time() - result = self.client.search(index=self.collection_name, - source=["_id", "_score"], - knn=knn, - size=self.data["topK"]) + result = self.client.search( + index=self.collection_name, + source=["_id", "_score"], + knn=knn, + size=self.data["topK"], + ) end = time.time() latency = (end - start) * 1000 - result = [(uuid.UUID(hex=hit['_id']).int, hit['_score']) for hit in result['hits']['hits']] + result = [ + (uuid.UUID(hex=hit["_id"]).int, hit["_score"]) + for hit in result["hits"]["hits"] + ] result.append(latency) results.append(result) - 
elif ext == '.txt': - with open(query_path, 'r') as f: - if self.data['mode'] == 'fulltext': + elif ext == ".txt": + with open(query_path, "r") as f: + if self.data["mode"] == "fulltext": for line in f: - query = {"body" : line[:-1]} - body = self.parse_fulltext_query_content(query) - topk = self.data['topK'] + body = self.parse_fulltext_query_content(line) start = time.time() - result = self.client.search(index=self.collection_name, - body=body) + result = self.client.search( + index=self.collection_name, + body=body, + size=self.data["topK"], + ) end = time.time() latency = (end - start) * 1000 - result = [(uuid.UUID(hex=hit['_id']).int, hit['_score']) for hit in result['hits']['hits']] + result = [ + (uuid.UUID(hex=hit["_id"]).int, hit["_score"]) + for hit in result["hits"]["hits"] + ] result.append(latency) - logging.info(f"{line[:-1]}, {latency}") + # logging.debug(f"{line[:-1]}, {latency}") results.append(result) else: raise TypeError("Unsupported file type") return results - + def check_and_save_results(self, results: List[List[Any]]): - if 'ground_truth_path' in self.data: - ground_truth_path = self.data['ground_truth_path'] + if "ground_truth_path" in self.data: + ground_truth_path = self.data["ground_truth_path"] _, ext = os.path.splitext(ground_truth_path) precisions = [] latencies = [] - if ext == '.hdf5': - with h5py.File(ground_truth_path, 'r') as f: - expected_result = f['neighbors'] + if ext == ".hdf5": + with h5py.File(ground_truth_path, "r") as f: + expected_result = f["neighbors"] for i, result in enumerate(results): ids = set(x[0] for x in result[:-1]) - precision = len(ids.intersection(expected_result[i][:self.data['topK']])) / self.data['topK'] + precision = ( + len( + ids.intersection( + expected_result[i][: self.data["topK"]] + ) + ) + / self.data["topK"] + ) precisions.append(precision) latencies.append(result[-1]) - elif ext == '.json' or ext == '.jsonl': - with open(ground_truth_path, 'r') as f: + elif ext == ".json" or ext == ".jsonl": + with open(ground_truth_path, "r") as f: for i, line in enumerate(f): expected_result = json.loads(line) result = results[i] ids = set(x[0] for x in result[:-1]) - precision = len(ids.intersection(expected_result['expected_results'][:self.data['topK']])) / self.data['topK'] + precision = ( + len( + ids.intersection( + expected_result["expected_results"][ + : self.data["topK"] + ] + ) + ) + / self.data["topK"] + ) precisions.append(precision) latencies.append(result[-1]) logging.info( - f'''mean_time: {np.mean(latencies)}, mean_precisions: {np.mean(precisions)}, + f"""mean_time: {np.mean(latencies)}, mean_precisions: {np.mean(precisions)}, std_time: {np.std(latencies)}, min_time: {np.min(latencies)}, \n max_time: {np.max(latencies)}, p95_time: {np.percentile(latencies, 95)}, - p99_time: {np.percentile(latencies, 99)}''') + p99_time: {np.percentile(latencies, 99)}""" + ) else: latencies = [] for result in results: latencies.append(result[-1]) logging.info( - f'''mean_time: {np.mean(latencies)}, std_time: {np.std(latencies)}, + f"""mean_time: {np.mean(latencies)}, std_time: {np.std(latencies)}, max_time: {np.max(latencies)}, min_time: {np.min(latencies)}, - p95_time: {np.percentile(latencies, 95)}, p99_time: {np.percentile(latencies, 99)}''') + p95_time: {np.percentile(latencies, 95)}, p99_time: {np.percentile(latencies, 99)}""" + ) diff --git a/python/benchmark/clients/infinity_client.py b/python/benchmark/clients/infinity_client.py index e7e8e056e8..8e7de74bfb 100644 --- a/python/benchmark/clients/infinity_client.py +++ 
b/python/benchmark/clients/infinity_client.py @@ -5,6 +5,7 @@ import time import numpy as np from typing import Any, List +import random import logging import infinity @@ -12,28 +13,29 @@ from infinity import NetworkAddress from .base_client import BaseClient + class InfinityClient(BaseClient): - def __init__(self, - mode: str, - options: argparse.Namespace, - drop_old: bool = True) -> None: + def __init__( + self, conf_path: str, options: argparse.Namespace, drop_old: bool = True + ) -> None: """ - The mode configuration file is parsed to extract the needed parameters, which are then all stored for use by other functions. + The conf_path configuration file is parsed to extract the needed parameters, which are then all stored for use by other functions. """ - with open(mode, 'r') as f: + BaseClient.__init__(self, conf_path, drop_old) + with open(conf_path, "r") as f: self.data = json.load(f) self.client = infinity.connect(NetworkAddress("127.0.0.1", 23817)) - self.collection_name = self.data['name'] + self.collection_name = self.data["name"] self.path_prefix = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) def _parse_index_schema(self, index_schema): indexs = [] for key, value in index_schema.items(): - if value['type'] == 'text': + if value["type"] == "text": indexs.append(index.IndexInfo(key, index.IndexType.FullText, [])) - elif value['type'] == 'HNSW': + elif value["type"] == "HNSW": params = [] - for param, v in value['params'].items(): + for param, v in value["params"].items(): params.append(index.InitParameter(param, str(v))) indexs.append(index.IndexInfo(key, index.IndexType.Hnsw, params)) return indexs @@ -52,8 +54,8 @@ def upload(self): batch_size = self.data["insert_batch_size"] features = list(self.data["schema"].keys()) _, ext = os.path.splitext(dataset_path) - if ext == '.json': - with open(dataset_path, 'r') as f: + if ext == ".json": + with open(dataset_path, "r") as f: actions = [] for i, line in enumerate(f): if i % batch_size == 0 and i != 0: @@ -66,37 +68,43 @@ def upload(self): actions.append(action) if actions: table_obj.insert(actions) - elif ext == '.hdf5': - with h5py.File(dataset_path, 'r') as f: + elif ext == ".hdf5": + with h5py.File(dataset_path, "r") as f: actions = [] # line is vector - for i, line in enumerate(f['train']): + for i, line in enumerate(f["train"]): if i % batch_size == 0 and i != 0: table_obj.insert(actions) actions = [] - record = {self.data['vector_name']: line.tolist()} + record = {self.data["vector_name"]: line.tolist()} actions.append(record) if actions: table_obj.insert(actions) - elif ext == '.csv': + elif ext == ".csv": if self.data["use_import"]: - table_obj.import_data(dataset_path, import_options={"delimiter": "\t"}) + table_obj.import_data(dataset_path, import_options={"delimiter": "\t"}) else: custom_headers = [] headers = self.data["schema"] for key, value in headers.items(): custom_headers.append(key) - with open(dataset_path, 'r', encoding='utf-8', errors='replace') as data_file: + with open( + dataset_path, "r", encoding="utf-8", errors="replace" + ) as data_file: current_batch = [] for i, line in enumerate(data_file): - row = line.strip().split('\t') - if (i % 100000 == 0): + row = line.strip().split("\t") + if i % 100000 == 0: logging.info(f"row {i}") if len(row) != len(headers): - logging.info(f"row = {i}, row_len = {len(row)}, not equal headers len, skip") + logging.info( + f"row = {i}, row_len = {len(row)}, not equal headers len, skip" + ) continue - row_dict = {header: value for header, value in zip(headers, 
row)} + row_dict = { + header: value for header, value in zip(headers, row) + } current_batch.append(row_dict) if len(current_batch) >= batch_size: table_obj.insert(current_batch) @@ -115,13 +123,63 @@ def parse_fulltext_query(self, query: dict) -> Any: key, value = list(condition.items())[0] ret = f'{list(value.keys())[0]}:"{list(value.values())[0]}"' - if key == 'and': - ret = '&&'.join(f'{list(value.keys())[0]}:"{list(value.values())[0]}"') - elif key == 'or': - ret = '||'.join(f'{list(value.keys())[0]}:"{list(value.values())[0]}"') + if key == "and": + ret = "&&".join(f'{list(value.keys())[0]}:"{list(value.values())[0]}"') + elif key == "or": + ret = "||".join(f'{list(value.keys())[0]}:"{list(value.values())[0]}"') return ret + def search_express(self, shared_counter, exit_event): + """ + Search in express mode: doesn't record per-query latency and result. + """ + db_obj = self.client.get_database("default_db") + table_obj = db_obj.get_table(self.collection_name) + query_path = os.path.join(self.path_prefix, self.data["query_path"]) + _, ext = os.path.splitext(query_path) + if ext == ".hdf5": + with h5py.File(query_path, "r") as f: + lines = f["test"] + while self.running: + line = random.choice(lines) + _, _ = ( + table_obj.output(["_row_id"]) + .knn( + self.data["vector_name"], + line.tolist(), + "float", + self.data["metric_type"], + self.data["topK"], + ) + .to_result() + ) + self.update_shared_counter(shared_counter, exit_event) + if not self.running: + break + elif ext == ".txt": + assert self.data["mode"] == "fulltext" + with open(query_path, "r") as f: + lines = f.readlines() + while self.running: + line = random.choice(lines) + condition = f"{line.strip()}" + res, _ = ( + table_obj.output(["_row_id", "_score"]) + .match( + "", + condition, + f"topn={self.data['topK']};default_field=body", + ) + .to_result() + ) + self.update_shared_counter(shared_counter, exit_event) + if not self.running: + break + else: + raise TypeError("Unsupported file type") + return + def search(self) -> list[list[Any]]: """ Execute the corresponding query tasks (vector search, full-text search, hybrid search) based on the parsed parameters. 
@@ -132,80 +190,106 @@ def search(self) -> list[list[Any]]: query_path = os.path.join(self.path_prefix, self.data["query_path"]) _, ext = os.path.splitext(query_path) results = [] - if ext == '.hdf5': - with h5py.File(query_path, 'r') as f: - for line in f['test']: - start = time.time() - res, _ = table_obj.output(["_row_id"]).knn(self.data["vector_name"], line.tolist(), "float", self.data['metric_type'], self.data["topK"]).to_result() - latency = (time.time() - start) * 1000 - result = [[x[0] for x in res['ROW_ID']]] - result.append(latency) - results.append(result) - elif ext == '.json': - with open(query_path, 'r') as f: - queries = json.load(f) - for query in queries: - if self.data['mode'] == 'fulltext': - match_condition = self.parse_fulltext_query(query) + if ext == ".hdf5": + with h5py.File(query_path, "r") as f: + for line in f["test"]: start = time.time() - res, _ = table_obj.output(["_row_id", "_score"]).match("", match_condition, f"topn={self.data['topK']}").to_result() + res, _ = ( + table_obj.output(["_row_id"]) + .knn( + self.data["vector_name"], + line.tolist(), + "float", + self.data["metric_type"], + self.data["topK"], + ) + .to_result() + ) latency = (time.time() - start) * 1000 - result = [(row_id[0], score) for row_id, score in zip(res['ROW_ID'], res['SCORE'])] + result = [[x[0] for x in res["ROW_ID"]]] result.append(latency) results.append(result) - elif ext == '.txt': - with open(query_path, 'r') as f: - if self.data['mode'] == 'fulltext': + elif ext == ".txt": + with open(query_path, "r") as f: + if self.data["mode"] == "fulltext": queries = f.readlines() for query_line in queries: - query = query_line[:-1] - condition = f"body:'{query}'" + condition = f"{query_line}" start = time.time() - res, _ = table_obj.output(["_row_id", "_score"]).match("", condition, f"topn={self.data['topK']}").to_result() + res, _ = ( + table_obj.output(["_row_id", "_score"]) + .match( + "", + condition, + f"topn={self.data['topK']};default_field=body", + ) + .to_result() + ) latency = (time.time() - start) * 1000 - result = [(row_id[0], score) for row_id, score in zip(res['ROW_ID'], res['SCORE'])] + result = [ + (row_id[0], score) + for row_id, score in zip(res["ROW_ID"], res["SCORE"]) + ] result.append(latency) - logging.info(f"{query}, {latency}") + # logging.debug(f"{query}, {latency}") results.append(result) else: raise TypeError("Unsupported file type") return results def check_and_save_results(self, results: List[List[Any]]): - if 'ground_truth_path' in self.data: - ground_truth_path = self.data['ground_truth_path'] + if "ground_truth_path" in self.data: + ground_truth_path = self.data["ground_truth_path"] _, ext = os.path.splitext(ground_truth_path) precisions = [] latencies = [] - if ext == '.hdf5': - with h5py.File(ground_truth_path, 'r') as f: - expected_result = f['neighbors'] + if ext == ".hdf5": + with h5py.File(ground_truth_path, "r") as f: + expected_result = f["neighbors"] assert len(expected_result) == len(results) for i, result in enumerate(results): ids = set(result[0]) - precision = len(ids.intersection(expected_result[i][:self.data['topK']])) / self.data['topK'] + precision = ( + len( + ids.intersection( + expected_result[i][: self.data["topK"]] + ) + ) + / self.data["topK"] + ) precisions.append(precision) latencies.append(result[-1]) - elif ext == '.json' or ext == '.jsonl': - with open(ground_truth_path, 'r') as f: + elif ext == ".json" or ext == ".jsonl": + with open(ground_truth_path, "r") as f: for i, line in enumerate(f): expected_result = json.loads(line) 
result = results[i] ids = set(x[0] for x in result[:-1]) - precision = len(ids.intersection(expected_result['expected_results'][:self.data['topK']])) / self.data['topK'] + precision = ( + len( + ids.intersection( + expected_result["expected_results"][ + : self.data["topK"] + ] + ) + ) + / self.data["topK"] + ) precisions.append(precision) latencies.append(result[-1]) logging.info( - f'''mean_time: {np.mean(latencies)}, mean_precisions: {np.mean(precisions)}, + f"""mean_time: {np.mean(latencies)}, mean_precisions: {np.mean(precisions)}, std_time: {np.std(latencies)}, min_time: {np.min(latencies)}, max_time: {np.max(latencies)}, p95_time: {np.percentile(latencies, 95)}, - p99_time: {np.percentile(latencies, 99)}''') + p99_time: {np.percentile(latencies, 99)}""" + ) else: latencies = [] for result in results: latencies.append(result[-1]) logging.info( - f'''mean_time: {np.mean(latencies)}, std_time: {np.std(latencies)}, + f"""mean_time: {np.mean(latencies)}, std_time: {np.std(latencies)}, max_time: {np.max(latencies)}, min_time: {np.min(latencies)}, - p95_time: {np.percentile(latencies, 95)}, p99_time: {np.percentile(latencies, 99)}''') + p95_time: {np.percentile(latencies, 95)}, p99_time: {np.percentile(latencies, 99)}""" + ) diff --git a/python/benchmark/clients/qdrant_client.py b/python/benchmark/clients/qdrant_client.py index db78665849..58e8821865 100644 --- a/python/benchmark/clients/qdrant_client.py +++ b/python/benchmark/clients/qdrant_client.py @@ -7,42 +7,40 @@ import json import h5py from typing import Any +import random import logging from .base_client import BaseClient + class QdrantClient(BaseClient): - def __init__(self, - mode: str, - options: argparse.Namespace, - drop_old: bool = True) -> None: - with open(mode, 'r') as f: + def __init__( + self, conf_path: str, options: argparse.Namespace, drop_old: bool = True + ) -> None: + BaseClient.__init__(self, conf_path, drop_old) + with open(conf_path, "r") as f: self.data = json.load(f) - self.client = QC(self.data['connection_url']) - self.collection_name = self.data['name'] - if self.data['distance'] == 'cosine': + self.client = QC(self.data["connection_url"]) + self.collection_name = self.data["name"] + if self.data["distance"] == "cosine": self.distance = Distance.COSINE - elif self.data['distance'] == 'L2': + elif self.data["distance"] == "L2": self.distance = Distance.EUCLID self.path_prefix = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - - def upload_bach(self, ids: list[int], vectors, payloads = None): + + def upload_bach(self, ids: list[int], vectors, payloads=None): self.client.upsert( collection_name=self.collection_name, - points=models.Batch( - ids=ids, - vectors=vectors, - payloads=payloads - ), - wait=True + points=models.Batch(ids=ids, vectors=vectors, payloads=payloads), + wait=True, ) def upload(self): # get the dataset (downloading is completed in run.py) - if 'vector_index' in self.data: - index_config = self.data['vector_index'] - if index_config['type'] == "HNSW": - hnsw_params = index_config['index_params'] + if "vector_index" in self.data: + index_config = self.data["vector_index"] + if index_config["type"] == "HNSW": + hnsw_params = index_config["index_params"] hnsw_config = models.HnswConfigDiff( m=hnsw_params.get("M", None), ef_construct=hnsw_params.get("ef_construct", None), @@ -53,105 +51,157 @@ def upload(self): ) else: hnsw_config = None - - self.client.recreate_collection(collection_name=self.collection_name, - vectors_config=VectorParams( - size=self.data['vector_size'], - 
distance=self.distance), - hnsw_config=hnsw_config) + + self.client.recreate_collection( + collection_name=self.collection_name, + vectors_config=VectorParams( + size=self.data["vector_size"], distance=self.distance + ), + hnsw_config=hnsw_config, + ) # set payload index - if 'payload_index_schema' in self.data: - for field_name, field_schema in self.data['payload_index_schema'].items(): - self.client.create_payload_index(collection_name=self.collection_name, - field_name=field_name, - field_schema=field_schema) - + if "payload_index_schema" in self.data: + for field_name, field_schema in self.data["payload_index_schema"].items(): + self.client.create_payload_index( + collection_name=self.collection_name, + field_name=field_name, + field_schema=field_schema, + ) + # set full text index - if 'full_text_index_schema' in self.data: - for field_name, schema in self.data['full_text_index_schema'].items(): + if "full_text_index_schema" in self.data: + for field_name, schema in self.data["full_text_index_schema"].items(): field_schema = models.TextIndexParams( - type="text", - tokenizer=schema.get("tokenizer", None), - min_token_len=schema.get("min_token_len", None), - max_token_len=schema.get("max_token_len", None), - lowercase=schema.get("lowercase", None), - ) - self.client.create_payload_index(collection_name=self.collection_name, - field_name=field_name, - field_schema=field_schema) - dataset_path = os.path.join(self.path_prefix, self.data['data_path']) + type="text", + tokenizer=schema.get("tokenizer", None), + min_token_len=schema.get("min_token_len", None), + max_token_len=schema.get("max_token_len", None), + lowercase=schema.get("lowercase", None), + ) + self.client.create_payload_index( + collection_name=self.collection_name, + field_name=field_name, + field_schema=field_schema, + ) + dataset_path = os.path.join(self.path_prefix, self.data["data_path"]) if not os.path.exists(dataset_path): - self.download_data(self.data['data_link'], dataset_path) - vector_name = self.data['vector_name'] - batch_size=self.data['insert_batch_size'] + self.download_data(self.data["data_link"], dataset_path) + vector_name = self.data["vector_name"] + batch_size = self.data["insert_batch_size"] total_time = 0 _, ext = os.path.splitext(dataset_path) - if ext == '.json': - with open(dataset_path, 'r') as f: - vectors = [] - payloads = [] - for i, line in enumerate(f): - if i % batch_size == 0 and i != 0: - start_time = time.time() - self.upload_bach(list(range(i-batch_size, i)), vectors, payloads) - end_time = time.time() - total_time += end_time - start_time - # reset vectors and payloads for the next batch - vectors = [] - payloads = [] - record = json.loads(line) - vectors.append(record[vector_name]) - del record[vector_name] - payloads.append(record) - if vectors: + if ext == ".json": + with open(dataset_path, "r") as f: + vectors = [] + payloads = [] + for i, line in enumerate(f): + if i % batch_size == 0 and i != 0: start_time = time.time() - self.upload_bach(list(range(i-len(vectors)+1, i+1)), vectors, payloads) + self.upload_bach( + list(range(i - batch_size, i)), vectors, payloads + ) end_time = time.time() total_time += end_time - start_time - elif ext == '.hdf5': + # reset vectors and payloads for the next batch + vectors = [] + payloads = [] + record = json.loads(line) + vectors.append(record[vector_name]) + del record[vector_name] + payloads.append(record) + if vectors: + start_time = time.time() + self.upload_bach( + list(range(i - len(vectors) + 1, i + 1)), vectors, payloads + ) + end_time = 
time.time() + total_time += end_time - start_time + elif ext == ".hdf5": with h5py.File(dataset_path) as f: vectors = [] - for i, line in enumerate(f['train']): + for i, line in enumerate(f["train"]): if i % batch_size == 0 and i != 0: - self.upload_bach(list(range(i-batch_size, i)), vectors) - vectors= [] + self.upload_bach(list(range(i - batch_size, i)), vectors) + vectors = [] vectors.append(line) if vectors: - self.upload_bach(list(range(i-len(vectors)+1, i+1)), vectors) + self.upload_bach(list(range(i - len(vectors) + 1, i + 1)), vectors) else: raise TypeError("Unsupported file type") + def search_express(self, shared_counter, exit_event): + """ + Search in express mode: doesn't record per-query latency and result. + """ + # get the queries path + query_path = os.path.join(self.path_prefix, self.data["query_path"]) + results = [] + _, ext = os.path.splitext(query_path) + if ext == ".json" and self.data["mode"] == "vector": + with open(query_path, "r") as f: + lines = f.readlines() + while self.running: + line = random.choice(lines) + query = json.loads(line) + _ = self.client.search( + collection_name=self.collection_name, + query_vector=query["vector"], + limit=self.data.get("topK", 10), + with_payload=False, + ) + if not self.running: + break + elif ext == ".hdf5" and self.data["mode"] == "vector": + with h5py.File(query_path, "r") as f: + lines = f["test"] + while self.running: + line = random.choice(lines) + _ = self.client.search( + collection_name=self.collection_name, + query_vector=line, + limit=self.data.get("topK", 10), + ) + if not self.running: + break + else: + raise TypeError("Unsupported file type") + return results + def search(self) -> list[list[Any]]: # get the queries path - query_path = os.path.join(self.path_prefix, self.data['query_path']) + query_path = os.path.join(self.path_prefix, self.data["query_path"]) results = [] _, ext = os.path.splitext(query_path) - if ext == '.json' and self.data['mode'] == 'vector': - with open(query_path, 'r') as f: + if ext == ".json" and self.data["mode"] == "vector": + with open(query_path, "r") as f: for line in f.readlines(): query = json.loads(line) start = time.time() result = self.client.search( collection_name=self.collection_name, - query_vector=query['vector'], - limit=self.data.get('topK', 10), - with_payload=False + query_vector=query["vector"], + limit=self.data.get("topK", 10), + with_payload=False, ) end = time.time() - logging.info(f"latency of search: {(end - start)*1000:.2f} milliseconds") + logging.info( + f"latency of search: {(end - start)*1000:.2f} milliseconds" + ) results.append(result) - elif ext == '.hdf5' and self.data['mode'] == 'vector': - with h5py.File(query_path, 'r') as f: + elif ext == ".hdf5" and self.data["mode"] == "vector": + with h5py.File(query_path, "r") as f: start = time.time() - for line in f['test']: + for line in f["test"]: result = self.client.search( collection_name=self.collection_name, query_vector=line, - limit=self.data.get('topK', 10), + limit=self.data.get("topK", 10), ) results.append(result) end = time.time() - logging.info(f"latency of KNN search: {(end - start)*1000/len(f['test']):.2f} milliseconds") + logging.info( + f"latency of KNN search: {(end - start)*1000/len(f['test']):.2f} milliseconds" + ) else: raise TypeError("Unsupported file type") return results diff --git a/python/benchmark/generate_query_json.py b/python/benchmark/generate_query_json.py index 7d27328c94..2020956d39 100644 --- a/python/benchmark/generate_query_json.py +++ 
b/python/benchmark/generate_query_json.py @@ -1,74 +1,43 @@ -import json -import csv import random import os +import re +from collections import Counter +import logging -def load_terms(file_path): - data = [] - _, ext = os.path.splitext(file_path) - if ext == '.csv': - with open(file_path, 'r', encoding='utf-8', errors='replace') as file: - reader = csv.reader(file, delimiter='\t') - line_count = 0 - for row in reader: - if line_count >= 100: - break - if len(row) >= 3: - body = row[2] - terms = body.split() - data.extend(terms) - line_count += 1 - elif ext == '.txt': - with open(file_path, 'r', encoding='utf-8', errors='replace') as file: - for line in file: - columns = line.split() - if columns[0] == "HighTerm:": - continue - if "'" in columns[1]: - continue - data.append(columns[1]) - return data +def generate_terms_txt(src_fp, dst_fp): + word_counts = Counter() + i = 0 + with open(src_fp, 'r', encoding='utf-8', errors='ignore') as file: + for line in file: + words = re.findall(r'\b[a-zA-Z]+\b', line) # find pure alphabet words + words = [x.lower() for x in words] + word_counts.update(words) + i += 1 + word_counts = Counter({key: value for key, value in word_counts.items() if value >= 100}) # remove words which are very very rare + logging.info(f"read {i} lines {len(word_counts)} words from {src_fp} to generate terms file {dst_fp}") + sorted_words = word_counts.most_common() # in descending order + with open(dst_fp, 'w', encoding='utf-8') as file: + for word, count in sorted_words: + file.write(f'{word} {count}\n') -def random_select(data, n): - return random.sample(data, n) - -def generate_query_txt(terms_path, query_cnt = 1, terms_count = 4, operation_path="datasets/enwiki/operations.txt"): - querys = [] - for i in range(query_cnt): - new_value = "" - terms = load_terms(terms_path) - if terms_count > 0: - query_terms = random_select(terms, terms_count) - else: - terms_count = random.randint(1, -terms_count + 1) - query_terms = random_select(terms, terms_count) - new_value = " ".join(query_terms) - querys.append(new_value) - with open(operation_path, 'w') as file: - for query in querys: - file.write(query + '\n') +def load_terms(terms_path): + data = [] + with open(terms_path, 'r', encoding='utf-8', errors='ignore') as file: + for line in file: + columns = line.split() + data.append(columns[0]) + return data -def generate_query_json(terms_path, terms_count = 10, operation_path="datasets/enwiki/operations.json"): - data = [ - { - "name": "term", - "body": { - "query": { - "match": { - "body": "" - } - } - } - } - ] - new_value = "" +def generate_query_txt(dataset_path, terms_path, query_path, query_cnt = 100000): + if not os.path.exists(terms_path): + generate_terms_txt(dataset_path, terms_path) terms = load_terms(terms_path) - query_terms = random_select(terms, terms_count) - new_value = " ".join(query_terms) - data[0]["body"]["query"]["match"]["body"] = new_value - print(f"generate terms = {new_value}") - - with open(operation_path, "w") as json_file: - json.dump(data, json_file, indent=2) \ No newline at end of file + probabilities = [0.03, 0.15, 0.25, 0.25, 0.15, 0.08, 0.04, 0.03, 0.02] + with open(query_path, 'w') as file: + for i in range(query_cnt): + terms_count = 1 + random.choices(population=range(len(probabilities)), weights=probabilities, k=1)[0] + query_terms = random.sample(terms, terms_count) + query = " ".join(query_terms) + file.write(query + '\n') diff --git a/python/benchmark/run.py b/python/benchmark/run.py index 7f13f665d7..d8fc0abc85 100644 --- 
a/python/benchmark/run.py +++ b/python/benchmark/run.py @@ -1,74 +1,163 @@ import argparse import os import logging +import sys +import time +import multiprocessing from clients.elasticsearch_client import ElasticsearchClient from clients.infinity_client import InfinityClient from clients.qdrant_client import QdrantClient from generate_query_json import generate_query_txt -ENGINES = ['infinity', 'qdrant', 'elasticsearch'] -DATA_SETS = ['gist', 'sift', 'geonames', 'enwiki'] +ENGINES = ["infinity", "qdrant", "elasticsearch"] +DATA_SETS = ["gist", "sift", "geonames", "enwiki"] + +WARM_UP_SECONDS = 60 +REPORT_QPS_INTERVAL = 60 -def parse_args() -> argparse.Namespace: - parser: argparse.ArgumentParser = argparse.ArgumentParser(description="Vector Database Benchmark") - parser.add_argument('-e', '--engine', type=str, default='all', dest='engine') - parser.add_argument('-m', '--mode', type=str, default='all', dest='mode') - parser.add_argument('-t', '--threads', type=int, default=1, dest='threads') - parser.add_argument('-r', '--rounds', type=int, default=5, dest='rounds') - parser.add_argument('--hardware', type=str, default='8c_16g', dest='hardware') - parser.add_argument('--limit-ram', type=str, default='16g', dest='limit_ram') - parser.add_argument('--limit-cpu', type=int, default=8, dest='limit_cpu') - parser.add_argument('--generate-query-num', type=int, default=1, dest='query_num') - parser.add_argument('--generate-term-num', type=int, default=4, dest='term_num') - parser.add_argument('--query', action='store_true', dest='query') - parser.add_argument('--import', action='store_true', dest='import_data') - parser.add_argument('--generate', action='store_true', dest='generate_terms') +def parse_args() -> argparse.Namespace: + parser: argparse.ArgumentParser = argparse.ArgumentParser( + description="Vector Database Benchmark" + ) + parser.add_argument( + "--generate", + action="store_true", + dest="generate_terms", + help="Generate fulltext queries based on the dataset", + ) + parser.add_argument( + "--import", + action="store_true", + dest="import_data", + help="Import data set into database engine", + ) + parser.add_argument( + "--query", + action="store_true", + dest="query", + help="Run single client to benchmark query latency", + ) + parser.add_argument( + "--query-express", + type=int, + default=0, + dest="query_express", + help="Run multiple clients in express mode to benchmark QPS", + ) + parser.add_argument( + "--engine", + type=str, + default="all", + dest="engine", + help="database engine to benchmark, one of: all, " + ", ".join(ENGINES), + ) + parser.add_argument( + "--dataset", + type=str, + default="all", + dest="dataset", + help="data set to benchmark, one of: all, " + ", ".join(DATA_SETS), + ) return parser.parse_args() + def generate_config_paths(kwargs: argparse.Namespace) -> list[tuple[str, str]]: paths = [] - config_path_prefix = os.path.join(os.path.dirname(os.path.abspath(__file__)), "configs") - - def add_path(_engine: str, _mode: str): - paths.append((os.path.join(config_path_prefix, f"{_engine}_{_mode}.json"), _engine)) - - engines = ENGINES if kwargs.engine == 'all' else [kwargs.engine] - modes = DATA_SETS if kwargs.mode == 'all' else [kwargs.mode] - + config_path_prefix = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "configs" + ) + engines = ENGINES if kwargs.engine == "all" else [kwargs.engine] + datasets = DATA_SETS if kwargs.dataset == "all" else [kwargs.dataset] for engine in engines: - for mode in modes: - add_path(engine, mode) + for dataset in 
datasets: + paths.append( + (os.path.join(config_path_prefix, f"{engine}_{dataset}.json"), engine) + ) return paths -def get_client(engine: str, config: str, options: argparse.Namespace): - if engine == 'qdrant': - return QdrantClient(config, options) - elif engine == 'elasticsearch': - return ElasticsearchClient(config, options) - elif engine == 'infinity': - return InfinityClient(config, options) + +def get_client(engine: str, conf_path: str, options: argparse.Namespace): + if engine == "qdrant": + return QdrantClient(conf_path, options) + elif engine == "elasticsearch": + return ElasticsearchClient(conf_path, options) + elif engine == "infinity": + return InfinityClient(conf_path, options) else: raise ValueError(f"Unknown engine: {engine}") -if __name__ == '__main__': - logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(levelname)-8s %(message)s') + +def main(): + logging.basicConfig( + level=logging.INFO, + format="%(asctime)-15s %(levelname)-8s (%(process)d) %(message)s", + ) args = parse_args() config_paths = generate_config_paths(args) if args.generate_terms: # TODO: Write a fixed path for the fulltext benchmark, expand or delete it for the general benchmark - generate_query_txt("datasets/enwiki/enwiki-top-terms.txt", - query_cnt=args.query_num, - terms_count=args.term_num, - operation_path="datasets/enwiki/operations.txt") + generate_query_txt( + "datasets/enwiki/enwiki.csv", + "datasets/enwiki/enwiki-terms.txt", + "datasets/enwiki/operations.txt", + ) + sys.exit(0) - for path, engine in config_paths: - if not os.path.exists(path): + for conf_path, engine in config_paths: + if not os.path.exists(conf_path): logging.info("qdrant does not support full text search") continue - logging.info("Running {} with {}".format(engine, path)) - client = get_client(engine, path, args) - client.run_experiment(args) - logging.info("Finished {} with {}".format(engine, path)) + logging.info("Running {} with {}".format(engine, conf_path)) + if args.query_express >= 1: + shared_counter = multiprocessing.Value("i", 0) + exit_event = multiprocessing.Event() + workers = [] + clients = [] + for i in range(args.query_express): + client = get_client(engine, conf_path, args) + clients.append(client) + worker = multiprocessing.Process( + target=client.search_express_outer, + args=(shared_counter, exit_event), + ) + worker.start() + workers.append(worker) + try: + logging.info(f"Let database warm-up for {WARM_UP_SECONDS} seconds") + time.sleep(WARM_UP_SECONDS) + logging.info( + "Collecting statistics for 30 minutes. Print statistics so far every minute. Type Ctrl+C to quit." + ) + shared_counter.value = 0 + start = time.time() + start_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start)) + counter_prev = 0 + for i in range(int(1800 / REPORT_QPS_INTERVAL)): + time.sleep(REPORT_QPS_INTERVAL) + now = time.time() + counter = shared_counter.value + avg_start = counter / (now - start) + avg_interval = (counter - counter_prev) / REPORT_QPS_INTERVAL + counter_prev = counter + logging.info( + f"average QPS since {start_str}: {avg_start}, average QPS of last interval:{avg_interval}" + ) + except KeyboardInterrupt: + logging.info("Interrupted by user! 
Exiting...") + except Exception as e: + logging.error(e) + finally: + exit_event.set() + for worker in workers: + worker.join() + else: + client = get_client(engine, conf_path, args) + client.run_experiment(args) + logging.info("Finished {} with {}".format(engine, conf_path)) + + +if __name__ == "__main__": + main() diff --git a/scripts/Dockerfile_infinity_builder_centos7 b/scripts/Dockerfile_infinity_builder_centos7 index 9ae443cfb8..26bfec2ee6 100644 --- a/scripts/Dockerfile_infinity_builder_centos7 +++ b/scripts/Dockerfile_infinity_builder_centos7 @@ -143,7 +143,7 @@ RUN --mount=type=bind,source=openssl-1.1.1w.tar.gz,target=/root/openssl-1.1.1w.t # packages need by python optional modules: _dbm _gdbm _lzma _sqlite3 _tkinter _uuid # Too old: ncurses-devel sqlite-devel # TODO: failed to build ncurses 5.4, 5.7, 5.9 -RUN yum install -y gdbm-devel sqlite-devel xz-devel tk-devel libffi-devel libuuid-devel bzip2-devel readline-devel tk-devel \ +RUN yum install -y gdbm-devel sqlite-devel xz-devel libffi-devel libuuid-devel bzip2-devel readline-devel tk-devel \ && yum clean all # Install Python 3.12